add websocket connection timeout and improve reconnection logic

add open_timeout parameter to websocket connection
add TimeoutError and InvalidStatus exception handling
implement exponential backoff for reconnection attempts
simplify reconnection logic flow
This commit is contained in:
Xuwznln
2026-03-07 04:40:34 +08:00
parent e0394bf414
commit f2c0bec02c

View File

@@ -466,6 +466,7 @@ class MessageProcessor:
async with websockets.connect( async with websockets.connect(
self.websocket_url, self.websocket_url,
ssl=ssl_context, ssl=ssl_context,
open_timeout=20,
ping_interval=WSConfig.ping_interval, ping_interval=WSConfig.ping_interval,
ping_timeout=10, ping_timeout=10,
additional_headers={ additional_headers={
@@ -497,6 +498,18 @@ class MessageProcessor:
except websockets.exceptions.ConnectionClosed: except websockets.exceptions.ConnectionClosed:
logger.warning("[MessageProcessor] Connection closed") logger.warning("[MessageProcessor] Connection closed")
self.connected = False self.connected = False
except TimeoutError:
logger.warning(
f"[MessageProcessor] Connection timeout (attempt {self.reconnect_count + 1}), "
f"server may be temporarily unavailable"
)
self.connected = False
except websockets.exceptions.InvalidStatus as e:
logger.warning(
f"[MessageProcessor] Server returned unexpected HTTP status {e.response.status_code}, "
f"WebSocket endpoint may not be ready yet"
)
self.connected = False
except Exception as e: except Exception as e:
logger.error(f"[MessageProcessor] Connection error: {str(e)}") logger.error(f"[MessageProcessor] Connection error: {str(e)}")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
@@ -505,18 +518,19 @@ class MessageProcessor:
self.websocket = None self.websocket = None
# 重连逻辑 # 重连逻辑
if self.is_running and self.reconnect_count < WSConfig.max_reconnect_attempts: if not self.is_running:
break
if self.reconnect_count < WSConfig.max_reconnect_attempts:
self.reconnect_count += 1 self.reconnect_count += 1
backoff = min(WSConfig.reconnect_interval * (2 ** (self.reconnect_count - 1)), 60)
logger.info( logger.info(
f"[MessageProcessor] Reconnecting in {WSConfig.reconnect_interval}s " f"[MessageProcessor] Reconnecting in {backoff}s "
f"(attempt {self.reconnect_count}/{WSConfig.max_reconnect_attempts})" f"(attempt {self.reconnect_count}/{WSConfig.max_reconnect_attempts})"
) )
await asyncio.sleep(WSConfig.reconnect_interval) await asyncio.sleep(backoff)
elif self.reconnect_count >= WSConfig.max_reconnect_attempts: else:
logger.error("[MessageProcessor] Max reconnection attempts reached") logger.error("[MessageProcessor] Max reconnection attempts reached")
break break
else:
self.reconnect_count -= 1
async def _message_handler(self): async def _message_handler(self):
"""处理接收到的消息""" """处理接收到的消息"""