Re-signal host ready event

This commit is contained in:
Xuwznln
2026-03-10 14:12:44 +08:00
parent f2c0bec02c
commit 38bf95b13c
4 changed files with 53 additions and 49 deletions

View File

@@ -1340,5 +1340,5 @@ def setup_api_routes(app):
# 启动广播任务 # 启动广播任务
@app.on_event("startup") @app.on_event("startup")
async def startup_event(): async def startup_event():
asyncio.create_task(broadcast_device_status()) asyncio.create_task(broadcast_device_status(), name="web-api-startup-device")
asyncio.create_task(broadcast_status_page_data()) asyncio.create_task(broadcast_status_page_data(), name="web-api-startup-status")

View File

@@ -469,6 +469,7 @@ class MessageProcessor:
open_timeout=20, open_timeout=20,
ping_interval=WSConfig.ping_interval, ping_interval=WSConfig.ping_interval,
ping_timeout=10, ping_timeout=10,
close_timeout=5,
additional_headers={ additional_headers={
"Authorization": f"Lab {BasicConfig.auth_secret()}", "Authorization": f"Lab {BasicConfig.auth_secret()}",
"EdgeSession": f"{self.session_id}", "EdgeSession": f"{self.session_id}",
@@ -479,42 +480,45 @@ class MessageProcessor:
self.connected = True self.connected = True
self.reconnect_count = 0 self.reconnect_count = 0
logger.trace(f"[MessageProcessor] Connected to {self.websocket_url}") logger.info(f"[MessageProcessor] 已连接到 {self.websocket_url}")
# 启动发送协程 # 启动发送协程
send_task = asyncio.create_task(self._send_handler()) send_task = asyncio.create_task(self._send_handler(), name="websocket-send_task")
# 每次连接(含重连)后重新向服务端注册,
# 否则服务端不知道客户端已上线,不会推送消息。
if self.websocket_client:
self.websocket_client.publish_host_ready()
try: try:
# 接收消息循环 # 接收消息循环
await self._message_handler() await self._message_handler()
finally: finally:
# 必须在 async with __aexit__ 之前停止 send_task
# 否则 send_task 会在关闭握手期间继续发送数据,
# 干扰 websockets 库的内部清理,导致 task 泄漏。
self.connected = False
send_task.cancel() send_task.cancel()
try: try:
await send_task await send_task
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
self.connected = False
except websockets.exceptions.ConnectionClosed: except websockets.exceptions.ConnectionClosed:
logger.warning("[MessageProcessor] Connection closed") logger.warning("[MessageProcessor] 与服务端连接中断")
self.connected = False
except TimeoutError: except TimeoutError:
logger.warning( logger.warning(
f"[MessageProcessor] Connection timeout (attempt {self.reconnect_count + 1}), " f"[MessageProcessor] 与服务端连接通信超时 (已尝试 {self.reconnect_count + 1} 次),请检查您的网络状况"
f"server may be temporarily unavailable"
) )
self.connected = False
except websockets.exceptions.InvalidStatus as e: except websockets.exceptions.InvalidStatus as e:
logger.warning( logger.warning(
f"[MessageProcessor] Server returned unexpected HTTP status {e.response.status_code}, " f"[MessageProcessor] 收到服务端注册码 {e.response.status_code}, 上一进程可能还未退出"
f"WebSocket endpoint may not be ready yet"
) )
self.connected = False
except Exception as e: except Exception as e:
logger.error(f"[MessageProcessor] Connection error: {str(e)}")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
self.connected = False logger.error(f"[MessageProcessor] 尝试重连时出错 {str(e)}")
finally: finally:
self.connected = False
self.websocket = None self.websocket = None
# 重连逻辑 # 重连逻辑
@@ -522,10 +526,9 @@ class MessageProcessor:
break break
if self.reconnect_count < WSConfig.max_reconnect_attempts: if self.reconnect_count < WSConfig.max_reconnect_attempts:
self.reconnect_count += 1 self.reconnect_count += 1
backoff = min(WSConfig.reconnect_interval * (2 ** (self.reconnect_count - 1)), 60) backoff = WSConfig.reconnect_interval
logger.info( logger.info(
f"[MessageProcessor] Reconnecting in {backoff}s " f"[MessageProcessor] 即将在 {backoff} 秒后重连 (已尝试 {self.reconnect_count}/{WSConfig.max_reconnect_attempts})"
f"(attempt {self.reconnect_count}/{WSConfig.max_reconnect_attempts})"
) )
await asyncio.sleep(backoff) await asyncio.sleep(backoff)
else: else:
@@ -533,40 +536,38 @@ class MessageProcessor:
break break
async def _message_handler(self): async def _message_handler(self):
"""处理接收到的消息""" """处理接收到的消息
ConnectionClosed 不在此处捕获,让其向上传播到 _connection_handler
以便 async with websockets.connect() 的 __aexit__ 能感知连接已断,
正确清理内部 task避免 task 泄漏。
"""
if not self.websocket: if not self.websocket:
logger.error("[MessageProcessor] WebSocket connection is None") logger.error("[MessageProcessor] WebSocket connection is None")
return return
try: async for message in self.websocket:
async for message in self.websocket: try:
try: data = json.loads(message)
data = json.loads(message) message_type = data.get("action", "")
message_type = data.get("action", "") message_data = data.get("data")
message_data = data.get("data") if self.session_id and self.session_id == data.get("edge_session"):
if self.session_id and self.session_id == data.get("edge_session"): await self._process_message(message_type, message_data)
await self._process_message(message_type, message_data) else:
if message_type.endswith("_material"):
logger.trace(
f"[MessageProcessor] 收到一条归属 {data.get('edge_session')} 的旧消息:{data}"
)
logger.debug(
f"[MessageProcessor] 跳过了一条归属 {data.get('edge_session')} 的旧消息: {data.get('action')}"
)
else: else:
if message_type.endswith("_material"): await self._process_message(message_type, message_data)
logger.trace( except json.JSONDecodeError:
f"[MessageProcessor] 收到一条归属 {data.get('edge_session')} 的旧消息:{data}" logger.error(f"[MessageProcessor] Invalid JSON received: {message}")
) except Exception as e:
logger.debug( logger.error(f"[MessageProcessor] Error processing message: {str(e)}")
f"[MessageProcessor] 跳过了一条归属 {data.get('edge_session')} 的旧消息: {data.get('action')}" logger.error(traceback.format_exc())
)
else:
await self._process_message(message_type, message_data)
except json.JSONDecodeError:
logger.error(f"[MessageProcessor] Invalid JSON received: {message}")
except Exception as e:
logger.error(f"[MessageProcessor] Error processing message: {str(e)}")
logger.error(traceback.format_exc())
except websockets.exceptions.ConnectionClosed:
logger.info("[MessageProcessor] Message handler stopped - connection closed")
except Exception as e:
logger.error(f"[MessageProcessor] Message handler error: {str(e)}")
logger.error(traceback.format_exc())
async def _send_handler(self): async def _send_handler(self):
"""处理发送队列中的消息""" """处理发送队列中的消息"""
@@ -615,6 +616,7 @@ class MessageProcessor:
except asyncio.CancelledError: except asyncio.CancelledError:
logger.debug("[MessageProcessor] Send handler cancelled") logger.debug("[MessageProcessor] Send handler cancelled")
raise
except Exception as e: except Exception as e:
logger.error(f"[MessageProcessor] Fatal error in send handler: {str(e)}") logger.error(f"[MessageProcessor] Fatal error in send handler: {str(e)}")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())

View File

@@ -40,7 +40,7 @@ class BasicConfig:
class WSConfig: class WSConfig:
reconnect_interval = 5 # 重连间隔(秒) reconnect_interval = 5 # 重连间隔(秒)
max_reconnect_attempts = 999 # 最大重连次数 max_reconnect_attempts = 999 # 最大重连次数
ping_interval = 30 # ping间隔 ping_interval = 20 # ping间隔
# HTTP配置 # HTTP配置

View File

@@ -12,9 +12,11 @@ class RegularContainer(Container):
kwargs["size_y"] = 0 kwargs["size_y"] = 0
if "size_z" not in kwargs: if "size_z" not in kwargs:
kwargs["size_z"] = 0 kwargs["size_z"] = 0
if "category" not in kwargs:
kwargs["category"] = "container"
self.kwargs = kwargs self.kwargs = kwargs
super().__init__(*args, category="container", **kwargs) super().__init__(*args, **kwargs)
def load_state(self, state: Dict[str, Any]): def load_state(self, state: Dict[str, Any]):
super().load_state(state) super().load_state(state)