Compare commits

...

6 Commits

Author SHA1 Message Date
Xuwznln
6d319d91ff correct raise create resource error 2026-03-10 16:26:37 +08:00
Xuwznln
3155b2f97e ret info fix revert 2026-03-10 16:04:27 +08:00
Xuwznln
e5e30a1c7d ret info fix 2026-03-10 16:00:24 +08:00
Xuwznln
4e82f62327 fix prcxi check 2026-03-10 15:57:27 +08:00
Xuwznln
95d3456214 add create_resource schema 2026-03-10 15:27:39 +08:00
Xuwznln
38bf95b13c re signal host ready event 2026-03-10 14:13:06 +08:00
8 changed files with 82 additions and 58 deletions

View File

@@ -1340,5 +1340,5 @@ def setup_api_routes(app):
# 启动广播任务
@app.on_event("startup")
async def startup_event():
asyncio.create_task(broadcast_device_status())
asyncio.create_task(broadcast_status_page_data())
asyncio.create_task(broadcast_device_status(), name="web-api-startup-device")
asyncio.create_task(broadcast_status_page_data(), name="web-api-startup-status")

View File

@@ -469,6 +469,7 @@ class MessageProcessor:
open_timeout=20,
ping_interval=WSConfig.ping_interval,
ping_timeout=10,
close_timeout=5,
additional_headers={
"Authorization": f"Lab {BasicConfig.auth_secret()}",
"EdgeSession": f"{self.session_id}",
@@ -479,42 +480,45 @@ class MessageProcessor:
self.connected = True
self.reconnect_count = 0
logger.trace(f"[MessageProcessor] Connected to {self.websocket_url}")
logger.info(f"[MessageProcessor] 已连接到 {self.websocket_url}")
# 启动发送协程
send_task = asyncio.create_task(self._send_handler())
send_task = asyncio.create_task(self._send_handler(), name="websocket-send_task")
# 每次连接(含重连)后重新向服务端注册,
# 否则服务端不知道客户端已上线,不会推送消息。
if self.websocket_client:
self.websocket_client.publish_host_ready()
try:
# 接收消息循环
await self._message_handler()
finally:
# 必须在 async with __aexit__ 之前停止 send_task
# 否则 send_task 会在关闭握手期间继续发送数据,
# 干扰 websockets 库的内部清理,导致 task 泄漏。
self.connected = False
send_task.cancel()
try:
await send_task
except asyncio.CancelledError:
pass
self.connected = False
except websockets.exceptions.ConnectionClosed:
logger.warning("[MessageProcessor] Connection closed")
self.connected = False
logger.warning("[MessageProcessor] 与服务端连接中断")
except TimeoutError:
logger.warning(
f"[MessageProcessor] Connection timeout (attempt {self.reconnect_count + 1}), "
f"server may be temporarily unavailable"
f"[MessageProcessor] 与服务端连接通信超时 (已尝试 {self.reconnect_count + 1} 次),请检查您的网络状况"
)
self.connected = False
except websockets.exceptions.InvalidStatus as e:
logger.warning(
f"[MessageProcessor] Server returned unexpected HTTP status {e.response.status_code}, "
f"WebSocket endpoint may not be ready yet"
f"[MessageProcessor] 收到服务端注册码 {e.response.status_code}, 上一进程可能还未退出"
)
self.connected = False
except Exception as e:
logger.error(f"[MessageProcessor] Connection error: {str(e)}")
logger.error(traceback.format_exc())
self.connected = False
logger.error(f"[MessageProcessor] 尝试重连时出错 {str(e)}")
finally:
self.connected = False
self.websocket = None
# 重连逻辑
@@ -522,10 +526,9 @@ class MessageProcessor:
break
if self.reconnect_count < WSConfig.max_reconnect_attempts:
self.reconnect_count += 1
backoff = min(WSConfig.reconnect_interval * (2 ** (self.reconnect_count - 1)), 60)
backoff = WSConfig.reconnect_interval
logger.info(
f"[MessageProcessor] Reconnecting in {backoff}s "
f"(attempt {self.reconnect_count}/{WSConfig.max_reconnect_attempts})"
f"[MessageProcessor] 即将在 {backoff} 秒后重连 (已尝试 {self.reconnect_count}/{WSConfig.max_reconnect_attempts})"
)
await asyncio.sleep(backoff)
else:
@@ -533,40 +536,38 @@ class MessageProcessor:
break
async def _message_handler(self):
"""处理接收到的消息"""
"""处理接收到的消息
ConnectionClosed 不在此处捕获,让其向上传播到 _connection_handler
以便 async with websockets.connect() 的 __aexit__ 能感知连接已断,
正确清理内部 task避免 task 泄漏。
"""
if not self.websocket:
logger.error("[MessageProcessor] WebSocket connection is None")
return
try:
async for message in self.websocket:
try:
data = json.loads(message)
message_type = data.get("action", "")
message_data = data.get("data")
if self.session_id and self.session_id == data.get("edge_session"):
await self._process_message(message_type, message_data)
async for message in self.websocket:
try:
data = json.loads(message)
message_type = data.get("action", "")
message_data = data.get("data")
if self.session_id and self.session_id == data.get("edge_session"):
await self._process_message(message_type, message_data)
else:
if message_type.endswith("_material"):
logger.trace(
f"[MessageProcessor] 收到一条归属 {data.get('edge_session')} 的旧消息:{data}"
)
logger.debug(
f"[MessageProcessor] 跳过了一条归属 {data.get('edge_session')} 的旧消息: {data.get('action')}"
)
else:
if message_type.endswith("_material"):
logger.trace(
f"[MessageProcessor] 收到一条归属 {data.get('edge_session')} 的旧消息:{data}"
)
logger.debug(
f"[MessageProcessor] 跳过了一条归属 {data.get('edge_session')} 的旧消息: {data.get('action')}"
)
else:
await self._process_message(message_type, message_data)
except json.JSONDecodeError:
logger.error(f"[MessageProcessor] Invalid JSON received: {message}")
except Exception as e:
logger.error(f"[MessageProcessor] Error processing message: {str(e)}")
logger.error(traceback.format_exc())
except websockets.exceptions.ConnectionClosed:
logger.info("[MessageProcessor] Message handler stopped - connection closed")
except Exception as e:
logger.error(f"[MessageProcessor] Message handler error: {str(e)}")
logger.error(traceback.format_exc())
await self._process_message(message_type, message_data)
except json.JSONDecodeError:
logger.error(f"[MessageProcessor] Invalid JSON received: {message}")
except Exception as e:
logger.error(f"[MessageProcessor] Error processing message: {str(e)}")
logger.error(traceback.format_exc())
async def _send_handler(self):
"""处理发送队列中的消息"""
@@ -615,6 +616,7 @@ class MessageProcessor:
except asyncio.CancelledError:
logger.debug("[MessageProcessor] Send handler cancelled")
raise
except Exception as e:
logger.error(f"[MessageProcessor] Fatal error in send handler: {str(e)}")
logger.error(traceback.format_exc())

View File

@@ -40,7 +40,7 @@ class BasicConfig:
class WSConfig:
reconnect_interval = 5 # 重连间隔(秒)
max_reconnect_attempts = 999 # 最大重连次数
ping_interval = 30 # ping间隔
ping_interval = 20 # ping间隔
# HTTP配置

View File

@@ -634,7 +634,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
def __init__(
self,
deck: Deck,
deck: PRCXI9300Deck,
host: str,
port: int,
timeout: float,
@@ -648,11 +648,11 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
is_9320=False,
):
tablets_info = []
count = 0
for child in deck.children:
for site_id in range(len(deck.sites)):
child = deck._get_site_resource(site_id)
# 如果放其他类型的物料,是不可以的
if hasattr(child, "_unilabos_state") and "Material" in child._unilabos_state:
number = int(child.name.replace("T", ""))
number = site_id + 1
tablets_info.append(
WorkTablets(
Number=number, Code=f"T{number}", Material=child._unilabos_state["Material"]

View File

@@ -97,6 +97,18 @@ class Registry:
)
test_resource_schema["description"] = "用于测试物料、设备和样本。"
create_resource_method_info = host_node_enhanced_info.get("action_methods", {}).get("create_resource", {})
create_resource_schema = self._generate_unilab_json_command_schema(
create_resource_method_info.get("args", []),
"create_resource",
create_resource_method_info.get("return_annotation"),
)
create_resource_schema["description"] = "用于创建物料"
raw_create_resource_schema = ros_action_to_json_schema(
self.ResourceCreateFromOuterEasy, "用于创建或更新物料资源,每次传入一个物料信息。"
)
raw_create_resource_schema["properties"]["result"] = create_resource_schema["properties"]["result"]
self.device_type_registry.update(
{
"host_node": {
@@ -140,9 +152,7 @@ class Registry:
},
"feedback": {},
"result": {"success": "success"},
"schema": ros_action_to_json_schema(
self.ResourceCreateFromOuterEasy, "用于创建或更新物料资源,每次传入一个物料信息。"
),
"schema": raw_create_resource_schema,
"goal_default": yaml.safe_load(
io.StringIO(get_yaml_from_goal_type(self.ResourceCreateFromOuterEasy.Goal))
),

View File

@@ -12,9 +12,11 @@ class RegularContainer(Container):
kwargs["size_y"] = 0
if "size_z" not in kwargs:
kwargs["size_z"] = 0
if "category" not in kwargs:
kwargs["category"] = "container"
self.kwargs = kwargs
super().__init__(*args, category="container", **kwargs)
super().__init__(*args, **kwargs)
def load_state(self, state: Dict[str, Any]):
super().load_state(state)

View File

@@ -569,9 +569,11 @@ class BaseROS2DeviceNode(Node, Generic[T]):
future.add_done_callback(done_cb)
except ImportError:
self.lab_logger().error("Host请求添加物料时本环境并不存在pylabrobot")
res.response = get_result_info_str(traceback.format_exc(), False, {})
except Exception as e:
self.lab_logger().error("Host请求添加物料时出错")
self.lab_logger().error(traceback.format_exc())
res.response = get_result_info_str(traceback.format_exc(), False, {})
return res
# noinspection PyTypeChecker

View File

@@ -65,7 +65,13 @@ class DeviceActionStatus:
class TestResourceReturn(TypedDict):
resources: List[List[ResourceDict]]
devices: List[Dict[str, Any]]
unilabos_samples: List[LabSample]
# unilabos_samples: List[LabSample]
class CreateResourceReturn(TypedDict):
created_resource_tree: List[List[ResourceDict]]
liquid_input_resource_tree: List[Dict[str, Any]]
# unilabos_samples: List[LabSample]
class TestLatencyReturn(TypedDict):
@@ -556,7 +562,7 @@ class HostNode(BaseROS2DeviceNode):
liquid_type: list[str] = [],
liquid_volume: list[int] = [],
slot_on_deck: str = "",
):
) -> CreateResourceReturn:
# 暂不支持多对同名父子同时存在
res_creation_input = {
"id": res_id.split("/")[-1],
@@ -609,6 +615,8 @@ class HostNode(BaseROS2DeviceNode):
assert len(response) == 1, "Create Resource应当只返回一个结果"
for i in response:
res = json.loads(i)
if "suc" in res:
raise ValueError(res.get("error"))
return res
except Exception as ex:
pass