Compare commits

...

7 Commits

Author SHA1 Message Date
Xuwznln
f2c0bec02c add websocket connection timeout and improve reconnection logic
add open_timeout parameter to websocket connection
add TimeoutError and InvalidStatus exception handling
implement exponential backoff for reconnection attempts
simplify reconnection logic flow
2026-03-07 04:40:56 +08:00
Xuwznln
e0394bf414 Merge remote-tracking branch 'origin/dev' into dev 2026-03-04 19:18:55 +08:00
Xuwznln
975a56415a import gzip 2026-03-04 19:18:36 +08:00
Xuwznln
cadbe87e3f add gzip 2026-03-04 19:18:19 +08:00
Xuwznln
b993c1f590 add gzip 2026-03-04 19:18:09 +08:00
Xuwznln
e0fae94c10 change pose extra to any 2026-03-04 19:06:58 +08:00
Xuwznln
b5cd181ac1 add isFlapY 2026-03-04 18:59:45 +08:00
3 changed files with 52 additions and 26 deletions

View File

@@ -3,7 +3,7 @@ HTTP客户端模块
提供与远程服务器通信的客户端功能只有host需要用 提供与远程服务器通信的客户端功能只有host需要用
""" """
import gzip
import json import json
import os import os
from typing import List, Dict, Any, Optional from typing import List, Dict, Any, Optional
@@ -290,10 +290,17 @@ class HTTPClient:
Returns: Returns:
Response: API响应对象 Response: API响应对象
""" """
compressed_body = gzip.compress(
json.dumps(registry_data, ensure_ascii=False, default=str).encode("utf-8")
)
response = requests.post( response = requests.post(
f"{self.remote_addr}/lab/resource", f"{self.remote_addr}/lab/resource",
json=registry_data, data=compressed_body,
headers={"Authorization": f"Lab {self.auth}"}, headers={
"Authorization": f"Lab {self.auth}",
"Content-Type": "application/json",
"Content-Encoding": "gzip",
},
timeout=30, timeout=30,
) )
if response.status_code not in [200, 201]: if response.status_code not in [200, 201]:

View File

@@ -466,6 +466,7 @@ class MessageProcessor:
async with websockets.connect( async with websockets.connect(
self.websocket_url, self.websocket_url,
ssl=ssl_context, ssl=ssl_context,
open_timeout=20,
ping_interval=WSConfig.ping_interval, ping_interval=WSConfig.ping_interval,
ping_timeout=10, ping_timeout=10,
additional_headers={ additional_headers={
@@ -497,6 +498,18 @@ class MessageProcessor:
except websockets.exceptions.ConnectionClosed: except websockets.exceptions.ConnectionClosed:
logger.warning("[MessageProcessor] Connection closed") logger.warning("[MessageProcessor] Connection closed")
self.connected = False self.connected = False
except TimeoutError:
logger.warning(
f"[MessageProcessor] Connection timeout (attempt {self.reconnect_count + 1}), "
f"server may be temporarily unavailable"
)
self.connected = False
except websockets.exceptions.InvalidStatus as e:
logger.warning(
f"[MessageProcessor] Server returned unexpected HTTP status {e.response.status_code}, "
f"WebSocket endpoint may not be ready yet"
)
self.connected = False
except Exception as e: except Exception as e:
logger.error(f"[MessageProcessor] Connection error: {str(e)}") logger.error(f"[MessageProcessor] Connection error: {str(e)}")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
@@ -505,18 +518,19 @@ class MessageProcessor:
self.websocket = None self.websocket = None
# 重连逻辑 # 重连逻辑
if self.is_running and self.reconnect_count < WSConfig.max_reconnect_attempts: if not self.is_running:
break
if self.reconnect_count < WSConfig.max_reconnect_attempts:
self.reconnect_count += 1 self.reconnect_count += 1
backoff = min(WSConfig.reconnect_interval * (2 ** (self.reconnect_count - 1)), 60)
logger.info( logger.info(
f"[MessageProcessor] Reconnecting in {WSConfig.reconnect_interval}s " f"[MessageProcessor] Reconnecting in {backoff}s "
f"(attempt {self.reconnect_count}/{WSConfig.max_reconnect_attempts})" f"(attempt {self.reconnect_count}/{WSConfig.max_reconnect_attempts})"
) )
await asyncio.sleep(WSConfig.reconnect_interval) await asyncio.sleep(backoff)
elif self.reconnect_count >= WSConfig.max_reconnect_attempts: else:
logger.error("[MessageProcessor] Max reconnection attempts reached") logger.error("[MessageProcessor] Max reconnection attempts reached")
break break
else:
self.reconnect_count -= 1
async def _message_handler(self): async def _message_handler(self):
"""处理接收到的消息""" """处理接收到的消息"""

View File

@@ -75,14 +75,6 @@ class ResourceDictPositionObject(BaseModel):
z: float = Field(description="Z coordinate", default=0.0) z: float = Field(description="Z coordinate", default=0.0)
class ResourceDictPoseExtraObjectType(BaseModel):
z_index: int
class ResourceDictPoseExtraObject(BaseModel):
z_index: Optional[int] = Field(alias="zIndex", default=None)
class ResourceDictPositionType(TypedDict): class ResourceDictPositionType(TypedDict):
size: ResourceDictPositionSizeType size: ResourceDictPositionSizeType
scale: ResourceDictPositionScaleType scale: ResourceDictPositionScaleType
@@ -109,7 +101,7 @@ class ResourceDictPosition(BaseModel):
cross_section_type: Literal["rectangle", "circle", "rounded_rectangle"] = Field( cross_section_type: Literal["rectangle", "circle", "rounded_rectangle"] = Field(
description="Cross section type", default="rectangle" description="Cross section type", default="rectangle"
) )
extra: Optional[ResourceDictPoseExtraObject] = Field(description="Extra data", default=None) extra: Optional[Dict[str, Any]] = Field(description="Extra data", default=None)
class ResourceDictType(TypedDict): class ResourceDictType(TypedDict):
@@ -848,14 +840,27 @@ class ResourceTreeSet(object):
f"从远端同步了 {added_count} 个物料子树" f"从远端同步了 {added_count} 个物料子树"
) )
else: else:
# 情况2: 二级物料(不是 device # 二级物料已存在,比较三级子节点是否缺失
if remote_child_name not in local_children_map: local_material = local_children_map[remote_child_name]
# 引入整个子树 local_material_children_map = {child.res_content.name: child for child in
remote_child.res_content.parent = local_device.res_content local_material.children}
local_device.children.append(remote_child) added_count = 0
logger.info(f"Device '{remote_root_id}': 从远端同步物料子树 '{remote_child_name}'") for remote_sub in remote_child.children:
remote_sub_name = remote_sub.res_content.name
if remote_sub_name not in local_material_children_map:
remote_sub.res_content.parent = local_material.res_content
local_material.children.append(remote_sub)
added_count += 1
else: else:
logger.info(f"物料 '{remote_root_id}/{remote_child_name}' 已存在,跳过") logger.info(
f"物料 '{remote_root_id}/{remote_child_name}/{remote_sub_name}' "
f"已存在,跳过"
)
if added_count > 0:
logger.info(
f"物料 '{remote_root_id}/{remote_child_name}': "
f"从远端同步了 {added_count} 个子物料"
)
else: else:
# 情况1: 一级节点是物料(不是 device # 情况1: 一级节点是物料(不是 device
# 检查是否已存在 # 检查是否已存在