Compare commits

...

13 Commits

Author SHA1 Message Date
Xuwznln
6d319d91ff correct raise create resource error 2026-03-10 16:26:37 +08:00
Xuwznln
3155b2f97e ret info fix revert 2026-03-10 16:04:27 +08:00
Xuwznln
e5e30a1c7d ret info fix 2026-03-10 16:00:24 +08:00
Xuwznln
4e82f62327 fix prcxi check 2026-03-10 15:57:27 +08:00
Xuwznln
95d3456214 add create_resource schema 2026-03-10 15:27:39 +08:00
Xuwznln
38bf95b13c re signal host ready event 2026-03-10 14:13:06 +08:00
Xuwznln
f2c0bec02c add websocket connection timeout and improve reconnection logic
add open_timeout parameter to websocket connection
add TimeoutError and InvalidStatus exception handling
implement exponential backoff for reconnection attempts
simplify reconnection logic flow
2026-03-07 04:40:56 +08:00
Xuwznln
e0394bf414 Merge remote-tracking branch 'origin/dev' into dev 2026-03-04 19:18:55 +08:00
Xuwznln
975a56415a import gzip 2026-03-04 19:18:36 +08:00
Xuwznln
cadbe87e3f add gzip 2026-03-04 19:18:19 +08:00
Xuwznln
b993c1f590 add gzip 2026-03-04 19:18:09 +08:00
Xuwznln
e0fae94c10 change pose extra to any 2026-03-04 19:06:58 +08:00
Xuwznln
b5cd181ac1 add isFlapY 2026-03-04 18:59:45 +08:00
10 changed files with 126 additions and 76 deletions

View File

@@ -1340,5 +1340,5 @@ def setup_api_routes(app):
# 启动广播任务 # 启动广播任务
@app.on_event("startup") @app.on_event("startup")
async def startup_event(): async def startup_event():
asyncio.create_task(broadcast_device_status()) asyncio.create_task(broadcast_device_status(), name="web-api-startup-device")
asyncio.create_task(broadcast_status_page_data()) asyncio.create_task(broadcast_status_page_data(), name="web-api-startup-status")

View File

@@ -3,7 +3,7 @@ HTTP客户端模块
提供与远程服务器通信的客户端功能只有host需要用 提供与远程服务器通信的客户端功能只有host需要用
""" """
import gzip
import json import json
import os import os
from typing import List, Dict, Any, Optional from typing import List, Dict, Any, Optional
@@ -290,10 +290,17 @@ class HTTPClient:
Returns: Returns:
Response: API响应对象 Response: API响应对象
""" """
compressed_body = gzip.compress(
json.dumps(registry_data, ensure_ascii=False, default=str).encode("utf-8")
)
response = requests.post( response = requests.post(
f"{self.remote_addr}/lab/resource", f"{self.remote_addr}/lab/resource",
json=registry_data, data=compressed_body,
headers={"Authorization": f"Lab {self.auth}"}, headers={
"Authorization": f"Lab {self.auth}",
"Content-Type": "application/json",
"Content-Encoding": "gzip",
},
timeout=30, timeout=30,
) )
if response.status_code not in [200, 201]: if response.status_code not in [200, 201]:

View File

@@ -466,8 +466,10 @@ class MessageProcessor:
async with websockets.connect( async with websockets.connect(
self.websocket_url, self.websocket_url,
ssl=ssl_context, ssl=ssl_context,
open_timeout=20,
ping_interval=WSConfig.ping_interval, ping_interval=WSConfig.ping_interval,
ping_timeout=10, ping_timeout=10,
close_timeout=5,
additional_headers={ additional_headers={
"Authorization": f"Lab {BasicConfig.auth_secret()}", "Authorization": f"Lab {BasicConfig.auth_secret()}",
"EdgeSession": f"{self.session_id}", "EdgeSession": f"{self.session_id}",
@@ -478,81 +480,94 @@ class MessageProcessor:
self.connected = True self.connected = True
self.reconnect_count = 0 self.reconnect_count = 0
logger.trace(f"[MessageProcessor] Connected to {self.websocket_url}") logger.info(f"[MessageProcessor] 已连接到 {self.websocket_url}")
# 启动发送协程 # 启动发送协程
send_task = asyncio.create_task(self._send_handler()) send_task = asyncio.create_task(self._send_handler(), name="websocket-send_task")
# 每次连接(含重连)后重新向服务端注册,
# 否则服务端不知道客户端已上线,不会推送消息。
if self.websocket_client:
self.websocket_client.publish_host_ready()
try: try:
# 接收消息循环 # 接收消息循环
await self._message_handler() await self._message_handler()
finally: finally:
# 必须在 async with __aexit__ 之前停止 send_task
# 否则 send_task 会在关闭握手期间继续发送数据,
# 干扰 websockets 库的内部清理,导致 task 泄漏。
self.connected = False
send_task.cancel() send_task.cancel()
try: try:
await send_task await send_task
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
self.connected = False
except websockets.exceptions.ConnectionClosed: except websockets.exceptions.ConnectionClosed:
logger.warning("[MessageProcessor] Connection closed") logger.warning("[MessageProcessor] 与服务端连接中断")
self.connected = False except TimeoutError:
logger.warning(
f"[MessageProcessor] 与服务端连接通信超时 (已尝试 {self.reconnect_count + 1} 次),请检查您的网络状况"
)
except websockets.exceptions.InvalidStatus as e:
logger.warning(
f"[MessageProcessor] 收到服务端注册码 {e.response.status_code}, 上一进程可能还未退出"
)
except Exception as e: except Exception as e:
logger.error(f"[MessageProcessor] Connection error: {str(e)}")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
self.connected = False logger.error(f"[MessageProcessor] 尝试重连时出错 {str(e)}")
finally: finally:
self.connected = False
self.websocket = None self.websocket = None
# 重连逻辑 # 重连逻辑
if self.is_running and self.reconnect_count < WSConfig.max_reconnect_attempts: if not self.is_running:
break
if self.reconnect_count < WSConfig.max_reconnect_attempts:
self.reconnect_count += 1 self.reconnect_count += 1
backoff = WSConfig.reconnect_interval
logger.info( logger.info(
f"[MessageProcessor] Reconnecting in {WSConfig.reconnect_interval}s " f"[MessageProcessor] 即将在 {backoff} 秒后重连 (已尝试 {self.reconnect_count}/{WSConfig.max_reconnect_attempts})"
f"(attempt {self.reconnect_count}/{WSConfig.max_reconnect_attempts})"
) )
await asyncio.sleep(WSConfig.reconnect_interval) await asyncio.sleep(backoff)
elif self.reconnect_count >= WSConfig.max_reconnect_attempts: else:
logger.error("[MessageProcessor] Max reconnection attempts reached") logger.error("[MessageProcessor] Max reconnection attempts reached")
break break
else:
self.reconnect_count -= 1
async def _message_handler(self): async def _message_handler(self):
"""处理接收到的消息""" """处理接收到的消息
ConnectionClosed 不在此处捕获,让其向上传播到 _connection_handler
以便 async with websockets.connect() 的 __aexit__ 能感知连接已断,
正确清理内部 task避免 task 泄漏。
"""
if not self.websocket: if not self.websocket:
logger.error("[MessageProcessor] WebSocket connection is None") logger.error("[MessageProcessor] WebSocket connection is None")
return return
try: async for message in self.websocket:
async for message in self.websocket: try:
try: data = json.loads(message)
data = json.loads(message) message_type = data.get("action", "")
message_type = data.get("action", "") message_data = data.get("data")
message_data = data.get("data") if self.session_id and self.session_id == data.get("edge_session"):
if self.session_id and self.session_id == data.get("edge_session"): await self._process_message(message_type, message_data)
await self._process_message(message_type, message_data) else:
if message_type.endswith("_material"):
logger.trace(
f"[MessageProcessor] 收到一条归属 {data.get('edge_session')} 的旧消息:{data}"
)
logger.debug(
f"[MessageProcessor] 跳过了一条归属 {data.get('edge_session')} 的旧消息: {data.get('action')}"
)
else: else:
if message_type.endswith("_material"): await self._process_message(message_type, message_data)
logger.trace( except json.JSONDecodeError:
f"[MessageProcessor] 收到一条归属 {data.get('edge_session')} 的旧消息:{data}" logger.error(f"[MessageProcessor] Invalid JSON received: {message}")
) except Exception as e:
logger.debug( logger.error(f"[MessageProcessor] Error processing message: {str(e)}")
f"[MessageProcessor] 跳过了一条归属 {data.get('edge_session')} 的旧消息: {data.get('action')}" logger.error(traceback.format_exc())
)
else:
await self._process_message(message_type, message_data)
except json.JSONDecodeError:
logger.error(f"[MessageProcessor] Invalid JSON received: {message}")
except Exception as e:
logger.error(f"[MessageProcessor] Error processing message: {str(e)}")
logger.error(traceback.format_exc())
except websockets.exceptions.ConnectionClosed:
logger.info("[MessageProcessor] Message handler stopped - connection closed")
except Exception as e:
logger.error(f"[MessageProcessor] Message handler error: {str(e)}")
logger.error(traceback.format_exc())
async def _send_handler(self): async def _send_handler(self):
"""处理发送队列中的消息""" """处理发送队列中的消息"""
@@ -601,6 +616,7 @@ class MessageProcessor:
except asyncio.CancelledError: except asyncio.CancelledError:
logger.debug("[MessageProcessor] Send handler cancelled") logger.debug("[MessageProcessor] Send handler cancelled")
raise
except Exception as e: except Exception as e:
logger.error(f"[MessageProcessor] Fatal error in send handler: {str(e)}") logger.error(f"[MessageProcessor] Fatal error in send handler: {str(e)}")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())

View File

@@ -40,7 +40,7 @@ class BasicConfig:
class WSConfig: class WSConfig:
reconnect_interval = 5 # 重连间隔(秒) reconnect_interval = 5 # 重连间隔(秒)
max_reconnect_attempts = 999 # 最大重连次数 max_reconnect_attempts = 999 # 最大重连次数
ping_interval = 30 # ping间隔 ping_interval = 20 # ping间隔
# HTTP配置 # HTTP配置

View File

@@ -634,7 +634,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
def __init__( def __init__(
self, self,
deck: Deck, deck: PRCXI9300Deck,
host: str, host: str,
port: int, port: int,
timeout: float, timeout: float,
@@ -648,11 +648,11 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
is_9320=False, is_9320=False,
): ):
tablets_info = [] tablets_info = []
count = 0 for site_id in range(len(deck.sites)):
for child in deck.children: child = deck._get_site_resource(site_id)
# 如果放其他类型的物料,是不可以的 # 如果放其他类型的物料,是不可以的
if hasattr(child, "_unilabos_state") and "Material" in child._unilabos_state: if hasattr(child, "_unilabos_state") and "Material" in child._unilabos_state:
number = int(child.name.replace("T", "")) number = site_id + 1
tablets_info.append( tablets_info.append(
WorkTablets( WorkTablets(
Number=number, Code=f"T{number}", Material=child._unilabos_state["Material"] Number=number, Code=f"T{number}", Material=child._unilabos_state["Material"]

View File

@@ -97,6 +97,18 @@ class Registry:
) )
test_resource_schema["description"] = "用于测试物料、设备和样本。" test_resource_schema["description"] = "用于测试物料、设备和样本。"
create_resource_method_info = host_node_enhanced_info.get("action_methods", {}).get("create_resource", {})
create_resource_schema = self._generate_unilab_json_command_schema(
create_resource_method_info.get("args", []),
"create_resource",
create_resource_method_info.get("return_annotation"),
)
create_resource_schema["description"] = "用于创建物料"
raw_create_resource_schema = ros_action_to_json_schema(
self.ResourceCreateFromOuterEasy, "用于创建或更新物料资源,每次传入一个物料信息。"
)
raw_create_resource_schema["properties"]["result"] = create_resource_schema["properties"]["result"]
self.device_type_registry.update( self.device_type_registry.update(
{ {
"host_node": { "host_node": {
@@ -140,9 +152,7 @@ class Registry:
}, },
"feedback": {}, "feedback": {},
"result": {"success": "success"}, "result": {"success": "success"},
"schema": ros_action_to_json_schema( "schema": raw_create_resource_schema,
self.ResourceCreateFromOuterEasy, "用于创建或更新物料资源,每次传入一个物料信息。"
),
"goal_default": yaml.safe_load( "goal_default": yaml.safe_load(
io.StringIO(get_yaml_from_goal_type(self.ResourceCreateFromOuterEasy.Goal)) io.StringIO(get_yaml_from_goal_type(self.ResourceCreateFromOuterEasy.Goal))
), ),

View File

@@ -12,9 +12,11 @@ class RegularContainer(Container):
kwargs["size_y"] = 0 kwargs["size_y"] = 0
if "size_z" not in kwargs: if "size_z" not in kwargs:
kwargs["size_z"] = 0 kwargs["size_z"] = 0
if "category" not in kwargs:
kwargs["category"] = "container"
self.kwargs = kwargs self.kwargs = kwargs
super().__init__(*args, category="container", **kwargs) super().__init__(*args, **kwargs)
def load_state(self, state: Dict[str, Any]): def load_state(self, state: Dict[str, Any]):
super().load_state(state) super().load_state(state)

View File

@@ -75,14 +75,6 @@ class ResourceDictPositionObject(BaseModel):
z: float = Field(description="Z coordinate", default=0.0) z: float = Field(description="Z coordinate", default=0.0)
class ResourceDictPoseExtraObjectType(BaseModel):
z_index: int
class ResourceDictPoseExtraObject(BaseModel):
z_index: Optional[int] = Field(alias="zIndex", default=None)
class ResourceDictPositionType(TypedDict): class ResourceDictPositionType(TypedDict):
size: ResourceDictPositionSizeType size: ResourceDictPositionSizeType
scale: ResourceDictPositionScaleType scale: ResourceDictPositionScaleType
@@ -109,7 +101,7 @@ class ResourceDictPosition(BaseModel):
cross_section_type: Literal["rectangle", "circle", "rounded_rectangle"] = Field( cross_section_type: Literal["rectangle", "circle", "rounded_rectangle"] = Field(
description="Cross section type", default="rectangle" description="Cross section type", default="rectangle"
) )
extra: Optional[ResourceDictPoseExtraObject] = Field(description="Extra data", default=None) extra: Optional[Dict[str, Any]] = Field(description="Extra data", default=None)
class ResourceDictType(TypedDict): class ResourceDictType(TypedDict):
@@ -848,14 +840,27 @@ class ResourceTreeSet(object):
f"从远端同步了 {added_count} 个物料子树" f"从远端同步了 {added_count} 个物料子树"
) )
else: else:
# 情况2: 二级物料(不是 device # 二级物料已存在,比较三级子节点是否缺失
if remote_child_name not in local_children_map: local_material = local_children_map[remote_child_name]
# 引入整个子树 local_material_children_map = {child.res_content.name: child for child in
remote_child.res_content.parent = local_device.res_content local_material.children}
local_device.children.append(remote_child) added_count = 0
logger.info(f"Device '{remote_root_id}': 从远端同步物料子树 '{remote_child_name}'") for remote_sub in remote_child.children:
else: remote_sub_name = remote_sub.res_content.name
logger.info(f"物料 '{remote_root_id}/{remote_child_name}' 已存在,跳过") if remote_sub_name not in local_material_children_map:
remote_sub.res_content.parent = local_material.res_content
local_material.children.append(remote_sub)
added_count += 1
else:
logger.info(
f"物料 '{remote_root_id}/{remote_child_name}/{remote_sub_name}' "
f"已存在,跳过"
)
if added_count > 0:
logger.info(
f"物料 '{remote_root_id}/{remote_child_name}': "
f"从远端同步了 {added_count} 个子物料"
)
else: else:
# 情况1: 一级节点是物料(不是 device # 情况1: 一级节点是物料(不是 device
# 检查是否已存在 # 检查是否已存在

View File

@@ -569,9 +569,11 @@ class BaseROS2DeviceNode(Node, Generic[T]):
future.add_done_callback(done_cb) future.add_done_callback(done_cb)
except ImportError: except ImportError:
self.lab_logger().error("Host请求添加物料时本环境并不存在pylabrobot") self.lab_logger().error("Host请求添加物料时本环境并不存在pylabrobot")
res.response = get_result_info_str(traceback.format_exc(), False, {})
except Exception as e: except Exception as e:
self.lab_logger().error("Host请求添加物料时出错") self.lab_logger().error("Host请求添加物料时出错")
self.lab_logger().error(traceback.format_exc()) self.lab_logger().error(traceback.format_exc())
res.response = get_result_info_str(traceback.format_exc(), False, {})
return res return res
# noinspection PyTypeChecker # noinspection PyTypeChecker

View File

@@ -65,7 +65,13 @@ class DeviceActionStatus:
class TestResourceReturn(TypedDict): class TestResourceReturn(TypedDict):
resources: List[List[ResourceDict]] resources: List[List[ResourceDict]]
devices: List[Dict[str, Any]] devices: List[Dict[str, Any]]
unilabos_samples: List[LabSample] # unilabos_samples: List[LabSample]
class CreateResourceReturn(TypedDict):
created_resource_tree: List[List[ResourceDict]]
liquid_input_resource_tree: List[Dict[str, Any]]
# unilabos_samples: List[LabSample]
class TestLatencyReturn(TypedDict): class TestLatencyReturn(TypedDict):
@@ -556,7 +562,7 @@ class HostNode(BaseROS2DeviceNode):
liquid_type: list[str] = [], liquid_type: list[str] = [],
liquid_volume: list[int] = [], liquid_volume: list[int] = [],
slot_on_deck: str = "", slot_on_deck: str = "",
): ) -> CreateResourceReturn:
# 暂不支持多对同名父子同时存在 # 暂不支持多对同名父子同时存在
res_creation_input = { res_creation_input = {
"id": res_id.split("/")[-1], "id": res_id.split("/")[-1],
@@ -609,6 +615,8 @@ class HostNode(BaseROS2DeviceNode):
assert len(response) == 1, "Create Resource应当只返回一个结果" assert len(response) == 1, "Create Resource应当只返回一个结果"
for i in response: for i in response:
res = json.loads(i) res = json.loads(i)
if "suc" in res:
raise ValueError(res.get("error"))
return res return res
except Exception as ex: except Exception as ex:
pass pass