From 25c94af755ff438cfb9f9ec81ff56401902f0c80 Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Wed, 1 Apr 2026 16:01:22 +0800 Subject: [PATCH] add running status debounce --- unilabos/app/ws_client.py | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py index a4fb6433..eed32e1b 100644 --- a/unilabos/app/ws_client.py +++ b/unilabos/app/ws_client.py @@ -1369,6 +1369,10 @@ class WebSocketClient(BaseCommunicationClient): self.message_processor = MessageProcessor(self.websocket_url, self.send_queue, self.device_manager) self.queue_processor = QueueProcessor(self.device_manager, self.message_processor) + # running状态debounce缓存: {job_id: (last_send_timestamp, last_feedback_data)} + self._job_running_last_sent: Dict[str, tuple] = {} + self._job_running_debounce_interval: float = 10.0 # 秒 + # 设置相互引用 self.message_processor.set_queue_processor(self.queue_processor) self.message_processor.set_websocket_client(self) @@ -1468,22 +1472,32 @@ class WebSocketClient(BaseCommunicationClient): logger.debug(f"[WebSocketClient] Not connected, cannot publish job status for job_id: {item.job_id}") return + job_log = format_job_log(item.job_id, item.task_id, item.device_id, item.action_name) + # 拦截最终结果状态,与原版本逻辑一致 if status in ["success", "failed"]: + self._job_running_last_sent.pop(item.job_id, None) + host_node = HostNode.get_instance(0) if host_node: - # 从HostNode的device_action_status中移除job_id try: host_node._device_action_status[item.device_action_key].job_ids.pop(item.job_id, None) except (KeyError, AttributeError): logger.warning(f"[WebSocketClient] Failed to remove job {item.job_id} from HostNode status") - # logger.debug(f"[WebSocketClient] Intercepting final status for job_id: {item.job_id} - {status}") - - # 通知队列处理器job完成(包括timeout的job) self.queue_processor.handle_job_completed(item.job_id, status) - # 发送job状态消息 + # running状态按job_id做debounce,内容变化时仍然上报 + if status == "running": + now = time.time() + cached = self._job_running_last_sent.get(item.job_id) + if cached is not None: + last_ts, last_data = cached + if now - last_ts < self._job_running_debounce_interval and last_data == feedback_data: + logger.trace(f"[WebSocketClient] Job status debounced (skip): {job_log} - {status}") + return + self._job_running_last_sent[item.job_id] = (now, feedback_data) + message = { "action": "job_status", "data": { @@ -1499,7 +1513,6 @@ class WebSocketClient(BaseCommunicationClient): } self.message_processor.send_message(message) - job_log = format_job_log(item.job_id, item.task_id, item.device_id, item.action_name) logger.trace(f"[WebSocketClient] Job status published: {job_log} - {status}") def send_ping(self, ping_id: str, timestamp: float) -> None: