mirror of
https://github.com/deepmodeling/Uni-Lab-OS
synced 2026-03-27 14:03:05 +00:00
Compare commits
6 Commits
feature/or
...
6d319d91ff
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6d319d91ff | ||
|
|
3155b2f97e | ||
|
|
e5e30a1c7d | ||
|
|
4e82f62327 | ||
|
|
95d3456214 | ||
|
|
38bf95b13c |
@@ -1340,5 +1340,5 @@ def setup_api_routes(app):
|
|||||||
# 启动广播任务
|
# 启动广播任务
|
||||||
@app.on_event("startup")
|
@app.on_event("startup")
|
||||||
async def startup_event():
|
async def startup_event():
|
||||||
asyncio.create_task(broadcast_device_status())
|
asyncio.create_task(broadcast_device_status(), name="web-api-startup-device")
|
||||||
asyncio.create_task(broadcast_status_page_data())
|
asyncio.create_task(broadcast_status_page_data(), name="web-api-startup-status")
|
||||||
|
|||||||
@@ -469,6 +469,7 @@ class MessageProcessor:
|
|||||||
open_timeout=20,
|
open_timeout=20,
|
||||||
ping_interval=WSConfig.ping_interval,
|
ping_interval=WSConfig.ping_interval,
|
||||||
ping_timeout=10,
|
ping_timeout=10,
|
||||||
|
close_timeout=5,
|
||||||
additional_headers={
|
additional_headers={
|
||||||
"Authorization": f"Lab {BasicConfig.auth_secret()}",
|
"Authorization": f"Lab {BasicConfig.auth_secret()}",
|
||||||
"EdgeSession": f"{self.session_id}",
|
"EdgeSession": f"{self.session_id}",
|
||||||
@@ -479,42 +480,45 @@ class MessageProcessor:
|
|||||||
self.connected = True
|
self.connected = True
|
||||||
self.reconnect_count = 0
|
self.reconnect_count = 0
|
||||||
|
|
||||||
logger.trace(f"[MessageProcessor] Connected to {self.websocket_url}")
|
logger.info(f"[MessageProcessor] 已连接到 {self.websocket_url}")
|
||||||
|
|
||||||
# 启动发送协程
|
# 启动发送协程
|
||||||
send_task = asyncio.create_task(self._send_handler())
|
send_task = asyncio.create_task(self._send_handler(), name="websocket-send_task")
|
||||||
|
|
||||||
|
# 每次连接(含重连)后重新向服务端注册,
|
||||||
|
# 否则服务端不知道客户端已上线,不会推送消息。
|
||||||
|
if self.websocket_client:
|
||||||
|
self.websocket_client.publish_host_ready()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# 接收消息循环
|
# 接收消息循环
|
||||||
await self._message_handler()
|
await self._message_handler()
|
||||||
finally:
|
finally:
|
||||||
|
# 必须在 async with __aexit__ 之前停止 send_task,
|
||||||
|
# 否则 send_task 会在关闭握手期间继续发送数据,
|
||||||
|
# 干扰 websockets 库的内部清理,导致 task 泄漏。
|
||||||
|
self.connected = False
|
||||||
send_task.cancel()
|
send_task.cancel()
|
||||||
try:
|
try:
|
||||||
await send_task
|
await send_task
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
pass
|
pass
|
||||||
self.connected = False
|
|
||||||
|
|
||||||
except websockets.exceptions.ConnectionClosed:
|
except websockets.exceptions.ConnectionClosed:
|
||||||
logger.warning("[MessageProcessor] Connection closed")
|
logger.warning("[MessageProcessor] 与服务端连接中断")
|
||||||
self.connected = False
|
|
||||||
except TimeoutError:
|
except TimeoutError:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
f"[MessageProcessor] Connection timeout (attempt {self.reconnect_count + 1}), "
|
f"[MessageProcessor] 与服务端连接通信超时 (已尝试 {self.reconnect_count + 1} 次),请检查您的网络状况"
|
||||||
f"server may be temporarily unavailable"
|
|
||||||
)
|
)
|
||||||
self.connected = False
|
|
||||||
except websockets.exceptions.InvalidStatus as e:
|
except websockets.exceptions.InvalidStatus as e:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
f"[MessageProcessor] Server returned unexpected HTTP status {e.response.status_code}, "
|
f"[MessageProcessor] 收到服务端注册码 {e.response.status_code}, 上一进程可能还未退出"
|
||||||
f"WebSocket endpoint may not be ready yet"
|
|
||||||
)
|
)
|
||||||
self.connected = False
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"[MessageProcessor] Connection error: {str(e)}")
|
|
||||||
logger.error(traceback.format_exc())
|
logger.error(traceback.format_exc())
|
||||||
self.connected = False
|
logger.error(f"[MessageProcessor] 尝试重连时出错 {str(e)}")
|
||||||
finally:
|
finally:
|
||||||
|
self.connected = False
|
||||||
self.websocket = None
|
self.websocket = None
|
||||||
|
|
||||||
# 重连逻辑
|
# 重连逻辑
|
||||||
@@ -522,10 +526,9 @@ class MessageProcessor:
|
|||||||
break
|
break
|
||||||
if self.reconnect_count < WSConfig.max_reconnect_attempts:
|
if self.reconnect_count < WSConfig.max_reconnect_attempts:
|
||||||
self.reconnect_count += 1
|
self.reconnect_count += 1
|
||||||
backoff = min(WSConfig.reconnect_interval * (2 ** (self.reconnect_count - 1)), 60)
|
backoff = WSConfig.reconnect_interval
|
||||||
logger.info(
|
logger.info(
|
||||||
f"[MessageProcessor] Reconnecting in {backoff}s "
|
f"[MessageProcessor] 即将在 {backoff} 秒后重连 (已尝试 {self.reconnect_count}/{WSConfig.max_reconnect_attempts})"
|
||||||
f"(attempt {self.reconnect_count}/{WSConfig.max_reconnect_attempts})"
|
|
||||||
)
|
)
|
||||||
await asyncio.sleep(backoff)
|
await asyncio.sleep(backoff)
|
||||||
else:
|
else:
|
||||||
@@ -533,40 +536,38 @@ class MessageProcessor:
|
|||||||
break
|
break
|
||||||
|
|
||||||
async def _message_handler(self):
|
async def _message_handler(self):
|
||||||
"""处理接收到的消息"""
|
"""处理接收到的消息。
|
||||||
|
|
||||||
|
ConnectionClosed 不在此处捕获,让其向上传播到 _connection_handler,
|
||||||
|
以便 async with websockets.connect() 的 __aexit__ 能感知连接已断,
|
||||||
|
正确清理内部 task,避免 task 泄漏。
|
||||||
|
"""
|
||||||
if not self.websocket:
|
if not self.websocket:
|
||||||
logger.error("[MessageProcessor] WebSocket connection is None")
|
logger.error("[MessageProcessor] WebSocket connection is None")
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
async for message in self.websocket:
|
||||||
async for message in self.websocket:
|
try:
|
||||||
try:
|
data = json.loads(message)
|
||||||
data = json.loads(message)
|
message_type = data.get("action", "")
|
||||||
message_type = data.get("action", "")
|
message_data = data.get("data")
|
||||||
message_data = data.get("data")
|
if self.session_id and self.session_id == data.get("edge_session"):
|
||||||
if self.session_id and self.session_id == data.get("edge_session"):
|
await self._process_message(message_type, message_data)
|
||||||
await self._process_message(message_type, message_data)
|
else:
|
||||||
|
if message_type.endswith("_material"):
|
||||||
|
logger.trace(
|
||||||
|
f"[MessageProcessor] 收到一条归属 {data.get('edge_session')} 的旧消息:{data}"
|
||||||
|
)
|
||||||
|
logger.debug(
|
||||||
|
f"[MessageProcessor] 跳过了一条归属 {data.get('edge_session')} 的旧消息: {data.get('action')}"
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
if message_type.endswith("_material"):
|
await self._process_message(message_type, message_data)
|
||||||
logger.trace(
|
except json.JSONDecodeError:
|
||||||
f"[MessageProcessor] 收到一条归属 {data.get('edge_session')} 的旧消息:{data}"
|
logger.error(f"[MessageProcessor] Invalid JSON received: {message}")
|
||||||
)
|
except Exception as e:
|
||||||
logger.debug(
|
logger.error(f"[MessageProcessor] Error processing message: {str(e)}")
|
||||||
f"[MessageProcessor] 跳过了一条归属 {data.get('edge_session')} 的旧消息: {data.get('action')}"
|
logger.error(traceback.format_exc())
|
||||||
)
|
|
||||||
else:
|
|
||||||
await self._process_message(message_type, message_data)
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
logger.error(f"[MessageProcessor] Invalid JSON received: {message}")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"[MessageProcessor] Error processing message: {str(e)}")
|
|
||||||
logger.error(traceback.format_exc())
|
|
||||||
|
|
||||||
except websockets.exceptions.ConnectionClosed:
|
|
||||||
logger.info("[MessageProcessor] Message handler stopped - connection closed")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"[MessageProcessor] Message handler error: {str(e)}")
|
|
||||||
logger.error(traceback.format_exc())
|
|
||||||
|
|
||||||
async def _send_handler(self):
|
async def _send_handler(self):
|
||||||
"""处理发送队列中的消息"""
|
"""处理发送队列中的消息"""
|
||||||
@@ -615,6 +616,7 @@ class MessageProcessor:
|
|||||||
|
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
logger.debug("[MessageProcessor] Send handler cancelled")
|
logger.debug("[MessageProcessor] Send handler cancelled")
|
||||||
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"[MessageProcessor] Fatal error in send handler: {str(e)}")
|
logger.error(f"[MessageProcessor] Fatal error in send handler: {str(e)}")
|
||||||
logger.error(traceback.format_exc())
|
logger.error(traceback.format_exc())
|
||||||
|
|||||||
@@ -40,7 +40,7 @@ class BasicConfig:
|
|||||||
class WSConfig:
|
class WSConfig:
|
||||||
reconnect_interval = 5 # 重连间隔(秒)
|
reconnect_interval = 5 # 重连间隔(秒)
|
||||||
max_reconnect_attempts = 999 # 最大重连次数
|
max_reconnect_attempts = 999 # 最大重连次数
|
||||||
ping_interval = 30 # ping间隔(秒)
|
ping_interval = 20 # ping间隔(秒)
|
||||||
|
|
||||||
|
|
||||||
# HTTP配置
|
# HTTP配置
|
||||||
|
|||||||
@@ -634,7 +634,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
|
|||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
deck: Deck,
|
deck: PRCXI9300Deck,
|
||||||
host: str,
|
host: str,
|
||||||
port: int,
|
port: int,
|
||||||
timeout: float,
|
timeout: float,
|
||||||
@@ -648,11 +648,11 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
|
|||||||
is_9320=False,
|
is_9320=False,
|
||||||
):
|
):
|
||||||
tablets_info = []
|
tablets_info = []
|
||||||
count = 0
|
for site_id in range(len(deck.sites)):
|
||||||
for child in deck.children:
|
child = deck._get_site_resource(site_id)
|
||||||
# 如果放其他类型的物料,是不可以的
|
# 如果放其他类型的物料,是不可以的
|
||||||
if hasattr(child, "_unilabos_state") and "Material" in child._unilabos_state:
|
if hasattr(child, "_unilabos_state") and "Material" in child._unilabos_state:
|
||||||
number = int(child.name.replace("T", ""))
|
number = site_id + 1
|
||||||
tablets_info.append(
|
tablets_info.append(
|
||||||
WorkTablets(
|
WorkTablets(
|
||||||
Number=number, Code=f"T{number}", Material=child._unilabos_state["Material"]
|
Number=number, Code=f"T{number}", Material=child._unilabos_state["Material"]
|
||||||
|
|||||||
@@ -97,6 +97,18 @@ class Registry:
|
|||||||
)
|
)
|
||||||
test_resource_schema["description"] = "用于测试物料、设备和样本。"
|
test_resource_schema["description"] = "用于测试物料、设备和样本。"
|
||||||
|
|
||||||
|
create_resource_method_info = host_node_enhanced_info.get("action_methods", {}).get("create_resource", {})
|
||||||
|
create_resource_schema = self._generate_unilab_json_command_schema(
|
||||||
|
create_resource_method_info.get("args", []),
|
||||||
|
"create_resource",
|
||||||
|
create_resource_method_info.get("return_annotation"),
|
||||||
|
)
|
||||||
|
create_resource_schema["description"] = "用于创建物料"
|
||||||
|
raw_create_resource_schema = ros_action_to_json_schema(
|
||||||
|
self.ResourceCreateFromOuterEasy, "用于创建或更新物料资源,每次传入一个物料信息。"
|
||||||
|
)
|
||||||
|
raw_create_resource_schema["properties"]["result"] = create_resource_schema["properties"]["result"]
|
||||||
|
|
||||||
self.device_type_registry.update(
|
self.device_type_registry.update(
|
||||||
{
|
{
|
||||||
"host_node": {
|
"host_node": {
|
||||||
@@ -140,9 +152,7 @@ class Registry:
|
|||||||
},
|
},
|
||||||
"feedback": {},
|
"feedback": {},
|
||||||
"result": {"success": "success"},
|
"result": {"success": "success"},
|
||||||
"schema": ros_action_to_json_schema(
|
"schema": raw_create_resource_schema,
|
||||||
self.ResourceCreateFromOuterEasy, "用于创建或更新物料资源,每次传入一个物料信息。"
|
|
||||||
),
|
|
||||||
"goal_default": yaml.safe_load(
|
"goal_default": yaml.safe_load(
|
||||||
io.StringIO(get_yaml_from_goal_type(self.ResourceCreateFromOuterEasy.Goal))
|
io.StringIO(get_yaml_from_goal_type(self.ResourceCreateFromOuterEasy.Goal))
|
||||||
),
|
),
|
||||||
|
|||||||
@@ -12,9 +12,11 @@ class RegularContainer(Container):
|
|||||||
kwargs["size_y"] = 0
|
kwargs["size_y"] = 0
|
||||||
if "size_z" not in kwargs:
|
if "size_z" not in kwargs:
|
||||||
kwargs["size_z"] = 0
|
kwargs["size_z"] = 0
|
||||||
|
if "category" not in kwargs:
|
||||||
|
kwargs["category"] = "container"
|
||||||
|
|
||||||
self.kwargs = kwargs
|
self.kwargs = kwargs
|
||||||
super().__init__(*args, category="container", **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
|
|
||||||
def load_state(self, state: Dict[str, Any]):
|
def load_state(self, state: Dict[str, Any]):
|
||||||
super().load_state(state)
|
super().load_state(state)
|
||||||
|
|||||||
@@ -569,9 +569,11 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
future.add_done_callback(done_cb)
|
future.add_done_callback(done_cb)
|
||||||
except ImportError:
|
except ImportError:
|
||||||
self.lab_logger().error("Host请求添加物料时,本环境并不存在pylabrobot")
|
self.lab_logger().error("Host请求添加物料时,本环境并不存在pylabrobot")
|
||||||
|
res.response = get_result_info_str(traceback.format_exc(), False, {})
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.lab_logger().error("Host请求添加物料时出错")
|
self.lab_logger().error("Host请求添加物料时出错")
|
||||||
self.lab_logger().error(traceback.format_exc())
|
self.lab_logger().error(traceback.format_exc())
|
||||||
|
res.response = get_result_info_str(traceback.format_exc(), False, {})
|
||||||
return res
|
return res
|
||||||
|
|
||||||
# noinspection PyTypeChecker
|
# noinspection PyTypeChecker
|
||||||
|
|||||||
@@ -65,7 +65,13 @@ class DeviceActionStatus:
|
|||||||
class TestResourceReturn(TypedDict):
|
class TestResourceReturn(TypedDict):
|
||||||
resources: List[List[ResourceDict]]
|
resources: List[List[ResourceDict]]
|
||||||
devices: List[Dict[str, Any]]
|
devices: List[Dict[str, Any]]
|
||||||
unilabos_samples: List[LabSample]
|
# unilabos_samples: List[LabSample]
|
||||||
|
|
||||||
|
|
||||||
|
class CreateResourceReturn(TypedDict):
|
||||||
|
created_resource_tree: List[List[ResourceDict]]
|
||||||
|
liquid_input_resource_tree: List[Dict[str, Any]]
|
||||||
|
# unilabos_samples: List[LabSample]
|
||||||
|
|
||||||
|
|
||||||
class TestLatencyReturn(TypedDict):
|
class TestLatencyReturn(TypedDict):
|
||||||
@@ -556,7 +562,7 @@ class HostNode(BaseROS2DeviceNode):
|
|||||||
liquid_type: list[str] = [],
|
liquid_type: list[str] = [],
|
||||||
liquid_volume: list[int] = [],
|
liquid_volume: list[int] = [],
|
||||||
slot_on_deck: str = "",
|
slot_on_deck: str = "",
|
||||||
):
|
) -> CreateResourceReturn:
|
||||||
# 暂不支持多对同名父子同时存在
|
# 暂不支持多对同名父子同时存在
|
||||||
res_creation_input = {
|
res_creation_input = {
|
||||||
"id": res_id.split("/")[-1],
|
"id": res_id.split("/")[-1],
|
||||||
@@ -609,6 +615,8 @@ class HostNode(BaseROS2DeviceNode):
|
|||||||
assert len(response) == 1, "Create Resource应当只返回一个结果"
|
assert len(response) == 1, "Create Resource应当只返回一个结果"
|
||||||
for i in response:
|
for i in response:
|
||||||
res = json.loads(i)
|
res = json.loads(i)
|
||||||
|
if "suc" in res:
|
||||||
|
raise ValueError(res.get("error"))
|
||||||
return res
|
return res
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
pass
|
pass
|
||||||
|
|||||||
Reference in New Issue
Block a user