diff --git a/unilabos/app/web/api.py b/unilabos/app/web/api.py index 0f6077c8..a67d09d2 100644 --- a/unilabos/app/web/api.py +++ b/unilabos/app/web/api.py @@ -1340,5 +1340,5 @@ def setup_api_routes(app): # 启动广播任务 @app.on_event("startup") async def startup_event(): - asyncio.create_task(broadcast_device_status()) - asyncio.create_task(broadcast_status_page_data()) + asyncio.create_task(broadcast_device_status(), name="web-api-startup-device") + asyncio.create_task(broadcast_status_page_data(), name="web-api-startup-status") diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py index 35b4766a..faaa3075 100644 --- a/unilabos/app/ws_client.py +++ b/unilabos/app/ws_client.py @@ -469,6 +469,7 @@ class MessageProcessor: open_timeout=20, ping_interval=WSConfig.ping_interval, ping_timeout=10, + close_timeout=5, additional_headers={ "Authorization": f"Lab {BasicConfig.auth_secret()}", "EdgeSession": f"{self.session_id}", @@ -479,42 +480,45 @@ class MessageProcessor: self.connected = True self.reconnect_count = 0 - logger.trace(f"[MessageProcessor] Connected to {self.websocket_url}") + logger.info(f"[MessageProcessor] 已连接到 {self.websocket_url}") # 启动发送协程 - send_task = asyncio.create_task(self._send_handler()) + send_task = asyncio.create_task(self._send_handler(), name="websocket-send_task") + + # 每次连接(含重连)后重新向服务端注册, + # 否则服务端不知道客户端已上线,不会推送消息。 + if self.websocket_client: + self.websocket_client.publish_host_ready() try: # 接收消息循环 await self._message_handler() finally: + # 必须在 async with __aexit__ 之前停止 send_task, + # 否则 send_task 会在关闭握手期间继续发送数据, + # 干扰 websockets 库的内部清理,导致 task 泄漏。 + self.connected = False send_task.cancel() try: await send_task except asyncio.CancelledError: pass - self.connected = False except websockets.exceptions.ConnectionClosed: - logger.warning("[MessageProcessor] Connection closed") - self.connected = False + logger.warning("[MessageProcessor] 与服务端连接中断") except TimeoutError: logger.warning( - f"[MessageProcessor] Connection timeout (attempt {self.reconnect_count + 1}), " - f"server may be temporarily unavailable" + f"[MessageProcessor] 与服务端连接通信超时 (已尝试 {self.reconnect_count + 1} 次),请检查您的网络状况" ) - self.connected = False except websockets.exceptions.InvalidStatus as e: logger.warning( - f"[MessageProcessor] Server returned unexpected HTTP status {e.response.status_code}, " - f"WebSocket endpoint may not be ready yet" + f"[MessageProcessor] 收到服务端注册码 {e.response.status_code}, 上一进程可能还未退出" ) - self.connected = False except Exception as e: - logger.error(f"[MessageProcessor] Connection error: {str(e)}") logger.error(traceback.format_exc()) - self.connected = False + logger.error(f"[MessageProcessor] 尝试重连时出错 {str(e)}") finally: + self.connected = False self.websocket = None # 重连逻辑 @@ -522,10 +526,9 @@ class MessageProcessor: break if self.reconnect_count < WSConfig.max_reconnect_attempts: self.reconnect_count += 1 - backoff = min(WSConfig.reconnect_interval * (2 ** (self.reconnect_count - 1)), 60) + backoff = WSConfig.reconnect_interval logger.info( - f"[MessageProcessor] Reconnecting in {backoff}s " - f"(attempt {self.reconnect_count}/{WSConfig.max_reconnect_attempts})" + f"[MessageProcessor] 即将在 {backoff} 秒后重连 (已尝试 {self.reconnect_count}/{WSConfig.max_reconnect_attempts})" ) await asyncio.sleep(backoff) else: @@ -533,40 +536,38 @@ class MessageProcessor: break async def _message_handler(self): - """处理接收到的消息""" + """处理接收到的消息。 + + ConnectionClosed 不在此处捕获,让其向上传播到 _connection_handler, + 以便 async with websockets.connect() 的 __aexit__ 能感知连接已断, + 正确清理内部 task,避免 task 泄漏。 + """ if not self.websocket: logger.error("[MessageProcessor] WebSocket connection is None") return - try: - async for message in self.websocket: - try: - data = json.loads(message) - message_type = data.get("action", "") - message_data = data.get("data") - if self.session_id and self.session_id == data.get("edge_session"): - await self._process_message(message_type, message_data) + async for message in self.websocket: + try: + data = json.loads(message) + message_type = data.get("action", "") + message_data = data.get("data") + if self.session_id and self.session_id == data.get("edge_session"): + await self._process_message(message_type, message_data) + else: + if message_type.endswith("_material"): + logger.trace( + f"[MessageProcessor] 收到一条归属 {data.get('edge_session')} 的旧消息:{data}" + ) + logger.debug( + f"[MessageProcessor] 跳过了一条归属 {data.get('edge_session')} 的旧消息: {data.get('action')}" + ) else: - if message_type.endswith("_material"): - logger.trace( - f"[MessageProcessor] 收到一条归属 {data.get('edge_session')} 的旧消息:{data}" - ) - logger.debug( - f"[MessageProcessor] 跳过了一条归属 {data.get('edge_session')} 的旧消息: {data.get('action')}" - ) - else: - await self._process_message(message_type, message_data) - except json.JSONDecodeError: - logger.error(f"[MessageProcessor] Invalid JSON received: {message}") - except Exception as e: - logger.error(f"[MessageProcessor] Error processing message: {str(e)}") - logger.error(traceback.format_exc()) - - except websockets.exceptions.ConnectionClosed: - logger.info("[MessageProcessor] Message handler stopped - connection closed") - except Exception as e: - logger.error(f"[MessageProcessor] Message handler error: {str(e)}") - logger.error(traceback.format_exc()) + await self._process_message(message_type, message_data) + except json.JSONDecodeError: + logger.error(f"[MessageProcessor] Invalid JSON received: {message}") + except Exception as e: + logger.error(f"[MessageProcessor] Error processing message: {str(e)}") + logger.error(traceback.format_exc()) async def _send_handler(self): """处理发送队列中的消息""" @@ -615,6 +616,7 @@ class MessageProcessor: except asyncio.CancelledError: logger.debug("[MessageProcessor] Send handler cancelled") + raise except Exception as e: logger.error(f"[MessageProcessor] Fatal error in send handler: {str(e)}") logger.error(traceback.format_exc()) diff --git a/unilabos/config/config.py b/unilabos/config/config.py index 4b7d91a4..d66b399d 100644 --- a/unilabos/config/config.py +++ b/unilabos/config/config.py @@ -40,7 +40,7 @@ class BasicConfig: class WSConfig: reconnect_interval = 5 # 重连间隔(秒) max_reconnect_attempts = 999 # 最大重连次数 - ping_interval = 30 # ping间隔(秒) + ping_interval = 20 # ping间隔(秒) # HTTP配置 diff --git a/unilabos/resources/container.py b/unilabos/resources/container.py index ed3871d3..08d40af0 100644 --- a/unilabos/resources/container.py +++ b/unilabos/resources/container.py @@ -12,9 +12,11 @@ class RegularContainer(Container): kwargs["size_y"] = 0 if "size_z" not in kwargs: kwargs["size_z"] = 0 + if "category" not in kwargs: + kwargs["category"] = "container" self.kwargs = kwargs - super().__init__(*args, category="container", **kwargs) + super().__init__(*args, **kwargs) def load_state(self, state: Dict[str, Any]): super().load_state(state)