Support notebook ID: add `notebook_id` to job requests, queue/job records, and action-state and job-status messages

This commit is contained in:
Xuwznln
2026-05-20 18:14:13 +08:00
committed by Andy6M
parent 1abbcccb53
commit e5462f748e
3 changed files with 53 additions and 5 deletions

View File

@@ -59,6 +59,7 @@ class JobAddReq(BaseModel):
task_id: str = Field(examples=["task_id"], description="task uuid (auto-generated if empty)", default="")
job_id: str = Field(examples=["job_id"], description="goal uuid (auto-generated if empty)", default="")
node_id: str = Field(examples=["node_id"], description="node uuid", default="")
notebook_id: str = Field(examples=["notebook_id"], description="notebook uuid", default="")
server_info: dict = Field(
examples=[{"send_timestamp": 1717000000.0}],
description="server info (auto-generated if empty)",

View File

@@ -320,6 +320,7 @@ def job_add(req: JobAddReq) -> JobData:
action_name=action_name,
task_id=task_id,
job_id=job_id,
notebook_id=req.notebook_id,
device_action_key=device_action_key,
)

View File

@@ -59,6 +59,7 @@ class QueueItem:
action_name: str
task_id: str
job_id: str
notebook_id: str
device_action_key: str
next_run_time: float = 0 # 下次执行时间戳
retry_count: int = 0 # 重试次数
@@ -71,6 +72,7 @@ class JobInfo:
job_id: str
task_id: str
device_id: str
notebook_id: str
action_name: str
device_action_key: str
status: JobStatus
@@ -539,7 +541,10 @@ class MessageProcessor:
self.reconnect_count += 1
backoff = WSConfig.reconnect_interval
logger.info(
f"[MessageProcessor] 即将在 {backoff} 秒后重连 (已尝试 {self.reconnect_count}/{WSConfig.max_reconnect_attempts})"
"[MessageProcessor] 即将在 %s 秒后重连 (已尝试 %s/%s)",
backoff,
self.reconnect_count,
WSConfig.max_reconnect_attempts,
)
await asyncio.sleep(backoff)
else:
@@ -703,6 +708,7 @@ class MessageProcessor:
action_name = data.get("action_name", "")
task_id = data.get("task_id", "")
job_id = data.get("job_id", "")
notebook_id = data.get("notebook_id", "")
if not all([device_id, action_name, task_id, job_id]):
logger.error("[MessageProcessor] Missing required fields in query_action_state")
@@ -718,6 +724,7 @@ class MessageProcessor:
job_id=job_id,
task_id=task_id,
device_id=device_id,
notebook_id=notebook_id,
action_name=action_name,
device_action_key=device_action_key,
status=JobStatus.QUEUE,
@@ -732,13 +739,27 @@ class MessageProcessor:
if can_start_immediately:
# 可以立即开始
await self._send_action_state_response(
device_id, action_name, task_id, job_id, "query_action_status", True, 0
device_id,
action_name,
task_id,
job_id,
"query_action_status",
True,
0,
notebook_id=notebook_id,
)
logger.trace(f"[MessageProcessor] Job {job_log} can start immediately")
else:
# 需要排队
await self._send_action_state_response(
device_id, action_name, task_id, job_id, "query_action_status", False, 10
device_id,
action_name,
task_id,
job_id,
"query_action_status",
False,
10,
notebook_id=notebook_id,
)
logger.trace(f"[MessageProcessor] Job {job_log} queued")
@@ -768,6 +789,7 @@ class MessageProcessor:
job_id=req.job_id,
task_id=req.task_id,
device_id=req.device_id,
notebook_id=req.notebook_id,
action_name=action_name,
device_action_key=device_action_key,
status=JobStatus.QUEUE,
@@ -775,11 +797,16 @@ class MessageProcessor:
always_free=True,
)
self.device_manager.add_queue_request(job_info)
existing_job = job_info
logger.info(f"[MessageProcessor] Job {job_log} always_free, auto-registered from direct job_start")
else:
logger.error(f"[MessageProcessor] Job {job_log} not registered (missing query_action_state)")
return
if existing_job and req.notebook_id and not existing_job.notebook_id:
existing_job.notebook_id = req.notebook_id
notebook_id = req.notebook_id or (existing_job.notebook_id if existing_job else "")
success = self.device_manager.start_job(req.job_id)
if not success:
logger.error(f"[MessageProcessor] Failed to start job {job_log}")
@@ -795,6 +822,7 @@ class MessageProcessor:
action_name=req.action,
task_id=req.task_id,
job_id=req.job_id,
notebook_id=notebook_id,
device_action_key=device_action_key,
)
@@ -834,6 +862,7 @@ class MessageProcessor:
"job_id": req.job_id,
"task_id": req.task_id,
"device_id": req.device_id,
"notebook_id": queue_item.notebook_id,
"action_name": req.action,
"status": "failed",
"feedback_data": {},
@@ -855,6 +884,7 @@ class MessageProcessor:
"query_action_status",
True,
0,
notebook_id=next_job.notebook_id,
)
next_job_log = format_job_log(
next_job.job_id, next_job.task_id, next_job.device_id, next_job.action_name
@@ -1101,7 +1131,15 @@ class MessageProcessor:
logger.info(f"[MessageProcessor] Restart cleanup scheduled")
async def _send_action_state_response(
self, device_id: str, action_name: str, task_id: str, job_id: str, typ: str, free: bool, need_more: int
self,
device_id: str,
action_name: str,
task_id: str,
job_id: str,
typ: str,
free: bool,
need_more: int,
notebook_id: str = "",
):
"""发送动作状态响应"""
message = {
@@ -1112,6 +1150,7 @@ class MessageProcessor:
"action_name": action_name,
"task_id": task_id,
"job_id": job_id,
"notebook_id": notebook_id,
"free": free,
"need_more": need_more + 1,
},
@@ -1194,6 +1233,7 @@ class QueueProcessor:
action_name=timeout_job.action_name,
task_id=timeout_job.task_id,
job_id=timeout_job.job_id,
notebook_id=timeout_job.notebook_id,
device_action_key=timeout_job.device_action_key,
)
# 发布超时失败状态这会触发正常的job完成流程
@@ -1252,6 +1292,7 @@ class QueueProcessor:
"action_name": job_info.action_name,
"task_id": job_info.task_id,
"job_id": job_info.job_id,
"notebook_id": job_info.notebook_id,
"free": False,
"need_more": 10 + 1,
},
@@ -1291,6 +1332,7 @@ class QueueProcessor:
"action_name": job_info.action_name,
"task_id": job_info.task_id,
"job_id": job_info.job_id,
"notebook_id": job_info.notebook_id,
"free": False,
"need_more": 10 + 1,
},
@@ -1336,12 +1378,15 @@ class QueueProcessor:
"action_name": next_job.action_name,
"task_id": next_job.task_id,
"job_id": next_job.job_id,
"notebook_id": next_job.notebook_id,
"free": True,
"need_more": 0,
},
}
self.message_processor.send_message(message)
# next_job_log = format_job_log(next_job.job_id, next_job.task_id, next_job.device_id, next_job.action_name)
# next_job_log = format_job_log(
# next_job.job_id, next_job.task_id, next_job.device_id, next_job.action_name
# )
# logger.debug(f"[QueueProcessor] Notified next job {next_job_log} can start")
# 立即触发下一轮状态检查
@@ -1510,6 +1555,7 @@ class WebSocketClient(BaseCommunicationClient):
"job_id": item.job_id,
"task_id": item.task_id,
"device_id": item.device_id,
"notebook_id": item.notebook_id,
"action_name": item.action_name,
"status": status,
"feedback_data": feedback_data,