From 81e9068597d835d9852c8c2b8b2b3d19c62f8e8b Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Wed, 20 May 2026 18:14:13 +0800 Subject: [PATCH] support notebook id --- unilabos/app/model.py | 1 + unilabos/app/web/controller.py | 1 + unilabos/app/ws_client.py | 56 +++++++++++++++++++++++++++++++--- 3 files changed, 53 insertions(+), 5 deletions(-) diff --git a/unilabos/app/model.py b/unilabos/app/model.py index f80ce35a..3a031aaa 100644 --- a/unilabos/app/model.py +++ b/unilabos/app/model.py @@ -59,6 +59,7 @@ class JobAddReq(BaseModel): task_id: str = Field(examples=["task_id"], description="task uuid (auto-generated if empty)", default="") job_id: str = Field(examples=["job_id"], description="goal uuid (auto-generated if empty)", default="") node_id: str = Field(examples=["node_id"], description="node uuid", default="") + notebook_id: str = Field(examples=["notebook_id"], description="notebook uuid", default="") server_info: dict = Field( examples=[{"send_timestamp": 1717000000.0}], description="server info (auto-generated if empty)", diff --git a/unilabos/app/web/controller.py b/unilabos/app/web/controller.py index 6a01645c..147b4d20 100644 --- a/unilabos/app/web/controller.py +++ b/unilabos/app/web/controller.py @@ -320,6 +320,7 @@ def job_add(req: JobAddReq) -> JobData: action_name=action_name, task_id=task_id, job_id=job_id, + notebook_id=req.notebook_id, device_action_key=device_action_key, ) diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py index 4823a232..fbe19b43 100644 --- a/unilabos/app/ws_client.py +++ b/unilabos/app/ws_client.py @@ -59,6 +59,7 @@ class QueueItem: action_name: str task_id: str job_id: str + notebook_id: str device_action_key: str next_run_time: float = 0 # 下次执行时间戳 retry_count: int = 0 # 重试次数 @@ -71,6 +72,7 @@ class JobInfo: job_id: str task_id: str device_id: str + notebook_id: str action_name: str device_action_key: str status: JobStatus @@ -539,7 +541,10 @@ class MessageProcessor: self.reconnect_count += 1 backoff = WSConfig.reconnect_interval logger.info( - f"[MessageProcessor] 即将在 {backoff} 秒后重连 (已尝试 {self.reconnect_count}/{WSConfig.max_reconnect_attempts})" + "[MessageProcessor] 即将在 %s 秒后重连 (已尝试 %s/%s)", + backoff, + self.reconnect_count, + WSConfig.max_reconnect_attempts, ) await asyncio.sleep(backoff) else: @@ -703,6 +708,7 @@ class MessageProcessor: action_name = data.get("action_name", "") task_id = data.get("task_id", "") job_id = data.get("job_id", "") + notebook_id = data.get("notebook_id", "") if not all([device_id, action_name, task_id, job_id]): logger.error("[MessageProcessor] Missing required fields in query_action_state") @@ -718,6 +724,7 @@ class MessageProcessor: job_id=job_id, task_id=task_id, device_id=device_id, + notebook_id=notebook_id, action_name=action_name, device_action_key=device_action_key, status=JobStatus.QUEUE, @@ -732,13 +739,27 @@ class MessageProcessor: if can_start_immediately: # 可以立即开始 await self._send_action_state_response( - device_id, action_name, task_id, job_id, "query_action_status", True, 0 + device_id, + action_name, + task_id, + job_id, + "query_action_status", + True, + 0, + notebook_id=notebook_id, ) logger.trace(f"[MessageProcessor] Job {job_log} can start immediately") else: # 需要排队 await self._send_action_state_response( - device_id, action_name, task_id, job_id, "query_action_status", False, 10 + device_id, + action_name, + task_id, + job_id, + "query_action_status", + False, + 10, + notebook_id=notebook_id, ) logger.trace(f"[MessageProcessor] Job {job_log} queued") @@ -768,6 +789,7 @@ class MessageProcessor: job_id=req.job_id, task_id=req.task_id, device_id=req.device_id, + notebook_id=req.notebook_id, action_name=action_name, device_action_key=device_action_key, status=JobStatus.QUEUE, @@ -775,11 +797,16 @@ class MessageProcessor: always_free=True, ) self.device_manager.add_queue_request(job_info) + existing_job = job_info logger.info(f"[MessageProcessor] Job {job_log} always_free, auto-registered from direct job_start") else: logger.error(f"[MessageProcessor] Job {job_log} not registered (missing query_action_state)") return + if existing_job and req.notebook_id and not existing_job.notebook_id: + existing_job.notebook_id = req.notebook_id + notebook_id = req.notebook_id or (existing_job.notebook_id if existing_job else "") + success = self.device_manager.start_job(req.job_id) if not success: logger.error(f"[MessageProcessor] Failed to start job {job_log}") @@ -795,6 +822,7 @@ class MessageProcessor: action_name=req.action, task_id=req.task_id, job_id=req.job_id, + notebook_id=notebook_id, device_action_key=device_action_key, ) @@ -834,6 +862,7 @@ class MessageProcessor: "job_id": req.job_id, "task_id": req.task_id, "device_id": req.device_id, + "notebook_id": queue_item.notebook_id, "action_name": req.action, "status": "failed", "feedback_data": {}, @@ -855,6 +884,7 @@ class MessageProcessor: "query_action_status", True, 0, + notebook_id=next_job.notebook_id, ) next_job_log = format_job_log( next_job.job_id, next_job.task_id, next_job.device_id, next_job.action_name @@ -1101,7 +1131,15 @@ class MessageProcessor: logger.info(f"[MessageProcessor] Restart cleanup scheduled") async def _send_action_state_response( - self, device_id: str, action_name: str, task_id: str, job_id: str, typ: str, free: bool, need_more: int + self, + device_id: str, + action_name: str, + task_id: str, + job_id: str, + typ: str, + free: bool, + need_more: int, + notebook_id: str = "", ): """发送动作状态响应""" message = { @@ -1112,6 +1150,7 @@ class MessageProcessor: "action_name": action_name, "task_id": task_id, "job_id": job_id, + "notebook_id": notebook_id, "free": free, "need_more": need_more + 1, }, @@ -1194,6 +1233,7 @@ class QueueProcessor: action_name=timeout_job.action_name, task_id=timeout_job.task_id, job_id=timeout_job.job_id, + notebook_id=timeout_job.notebook_id, device_action_key=timeout_job.device_action_key, ) # 发布超时失败状态,这会触发正常的job完成流程 @@ -1252,6 +1292,7 @@ class QueueProcessor: "action_name": job_info.action_name, "task_id": job_info.task_id, "job_id": job_info.job_id, + "notebook_id": job_info.notebook_id, "free": False, "need_more": 10 + 1, }, @@ -1291,6 +1332,7 @@ class QueueProcessor: "action_name": job_info.action_name, "task_id": job_info.task_id, "job_id": job_info.job_id, + "notebook_id": job_info.notebook_id, "free": False, "need_more": 10 + 1, }, @@ -1336,12 +1378,15 @@ class QueueProcessor: "action_name": next_job.action_name, "task_id": next_job.task_id, "job_id": next_job.job_id, + "notebook_id": next_job.notebook_id, "free": True, "need_more": 0, }, } self.message_processor.send_message(message) - # next_job_log = format_job_log(next_job.job_id, next_job.task_id, next_job.device_id, next_job.action_name) + # next_job_log = format_job_log( + # next_job.job_id, next_job.task_id, next_job.device_id, next_job.action_name + # ) # logger.debug(f"[QueueProcessor] Notified next job {next_job_log} can start") # 立即触发下一轮状态检查 @@ -1510,6 +1555,7 @@ class WebSocketClient(BaseCommunicationClient): "job_id": item.job_id, "task_id": item.task_id, "device_id": item.device_id, + "notebook_id": item.notebook_id, "action_name": item.action_name, "status": status, "feedback_data": feedback_data,