mirror of
https://github.com/deepmodeling/Uni-Lab-OS
synced 2026-04-01 19:55:26 +00:00
add running status debounce
This commit is contained in:
@@ -1369,6 +1369,10 @@ class WebSocketClient(BaseCommunicationClient):
|
||||
self.message_processor = MessageProcessor(self.websocket_url, self.send_queue, self.device_manager)
|
||||
self.queue_processor = QueueProcessor(self.device_manager, self.message_processor)
|
||||
|
||||
# running状态debounce缓存: {job_id: (last_send_timestamp, last_feedback_data)}
|
||||
self._job_running_last_sent: Dict[str, tuple] = {}
|
||||
self._job_running_debounce_interval: float = 10.0 # 秒
|
||||
|
||||
# 设置相互引用
|
||||
self.message_processor.set_queue_processor(self.queue_processor)
|
||||
self.message_processor.set_websocket_client(self)
|
||||
@@ -1468,22 +1472,32 @@ class WebSocketClient(BaseCommunicationClient):
|
||||
logger.debug(f"[WebSocketClient] Not connected, cannot publish job status for job_id: {item.job_id}")
|
||||
return
|
||||
|
||||
job_log = format_job_log(item.job_id, item.task_id, item.device_id, item.action_name)
|
||||
|
||||
# 拦截最终结果状态,与原版本逻辑一致
|
||||
if status in ["success", "failed"]:
|
||||
self._job_running_last_sent.pop(item.job_id, None)
|
||||
|
||||
host_node = HostNode.get_instance(0)
|
||||
if host_node:
|
||||
# 从HostNode的device_action_status中移除job_id
|
||||
try:
|
||||
host_node._device_action_status[item.device_action_key].job_ids.pop(item.job_id, None)
|
||||
except (KeyError, AttributeError):
|
||||
logger.warning(f"[WebSocketClient] Failed to remove job {item.job_id} from HostNode status")
|
||||
|
||||
# logger.debug(f"[WebSocketClient] Intercepting final status for job_id: {item.job_id} - {status}")
|
||||
|
||||
# 通知队列处理器job完成(包括timeout的job)
|
||||
self.queue_processor.handle_job_completed(item.job_id, status)
|
||||
|
||||
# 发送job状态消息
|
||||
# running状态按job_id做debounce,内容变化时仍然上报
|
||||
if status == "running":
|
||||
now = time.time()
|
||||
cached = self._job_running_last_sent.get(item.job_id)
|
||||
if cached is not None:
|
||||
last_ts, last_data = cached
|
||||
if now - last_ts < self._job_running_debounce_interval and last_data == feedback_data:
|
||||
logger.trace(f"[WebSocketClient] Job status debounced (skip): {job_log} - {status}")
|
||||
return
|
||||
self._job_running_last_sent[item.job_id] = (now, feedback_data)
|
||||
|
||||
message = {
|
||||
"action": "job_status",
|
||||
"data": {
|
||||
@@ -1499,7 +1513,6 @@ class WebSocketClient(BaseCommunicationClient):
|
||||
}
|
||||
self.message_processor.send_message(message)
|
||||
|
||||
job_log = format_job_log(item.job_id, item.task_id, item.device_id, item.action_name)
|
||||
logger.trace(f"[WebSocketClient] Job status published: {job_log} - {status}")
|
||||
|
||||
def send_ping(self, ping_id: str, timestamp: float) -> None:
|
||||
|
||||
Reference in New Issue
Block a user