support notebook id

This commit is contained in:
Xuwznln
2026-05-20 18:14:13 +08:00
parent be5ff9bc5c
commit 81e9068597
3 changed files with 53 additions and 5 deletions

View File

@@ -59,6 +59,7 @@ class JobAddReq(BaseModel):
task_id: str = Field(examples=["task_id"], description="task uuid (auto-generated if empty)", default="") task_id: str = Field(examples=["task_id"], description="task uuid (auto-generated if empty)", default="")
job_id: str = Field(examples=["job_id"], description="goal uuid (auto-generated if empty)", default="") job_id: str = Field(examples=["job_id"], description="goal uuid (auto-generated if empty)", default="")
node_id: str = Field(examples=["node_id"], description="node uuid", default="") node_id: str = Field(examples=["node_id"], description="node uuid", default="")
notebook_id: str = Field(examples=["notebook_id"], description="notebook uuid", default="")
server_info: dict = Field( server_info: dict = Field(
examples=[{"send_timestamp": 1717000000.0}], examples=[{"send_timestamp": 1717000000.0}],
description="server info (auto-generated if empty)", description="server info (auto-generated if empty)",

View File

@@ -320,6 +320,7 @@ def job_add(req: JobAddReq) -> JobData:
action_name=action_name, action_name=action_name,
task_id=task_id, task_id=task_id,
job_id=job_id, job_id=job_id,
notebook_id=req.notebook_id,
device_action_key=device_action_key, device_action_key=device_action_key,
) )

View File

@@ -59,6 +59,7 @@ class QueueItem:
action_name: str action_name: str
task_id: str task_id: str
job_id: str job_id: str
notebook_id: str
device_action_key: str device_action_key: str
next_run_time: float = 0 # 下次执行时间戳 next_run_time: float = 0 # 下次执行时间戳
retry_count: int = 0 # 重试次数 retry_count: int = 0 # 重试次数
@@ -71,6 +72,7 @@ class JobInfo:
job_id: str job_id: str
task_id: str task_id: str
device_id: str device_id: str
notebook_id: str
action_name: str action_name: str
device_action_key: str device_action_key: str
status: JobStatus status: JobStatus
@@ -539,7 +541,10 @@ class MessageProcessor:
self.reconnect_count += 1 self.reconnect_count += 1
backoff = WSConfig.reconnect_interval backoff = WSConfig.reconnect_interval
logger.info( logger.info(
f"[MessageProcessor] 即将在 {backoff} 秒后重连 (已尝试 {self.reconnect_count}/{WSConfig.max_reconnect_attempts})" "[MessageProcessor] 即将在 %s 秒后重连 (已尝试 %s/%s)",
backoff,
self.reconnect_count,
WSConfig.max_reconnect_attempts,
) )
await asyncio.sleep(backoff) await asyncio.sleep(backoff)
else: else:
@@ -703,6 +708,7 @@ class MessageProcessor:
action_name = data.get("action_name", "") action_name = data.get("action_name", "")
task_id = data.get("task_id", "") task_id = data.get("task_id", "")
job_id = data.get("job_id", "") job_id = data.get("job_id", "")
notebook_id = data.get("notebook_id", "")
if not all([device_id, action_name, task_id, job_id]): if not all([device_id, action_name, task_id, job_id]):
logger.error("[MessageProcessor] Missing required fields in query_action_state") logger.error("[MessageProcessor] Missing required fields in query_action_state")
@@ -718,6 +724,7 @@ class MessageProcessor:
job_id=job_id, job_id=job_id,
task_id=task_id, task_id=task_id,
device_id=device_id, device_id=device_id,
notebook_id=notebook_id,
action_name=action_name, action_name=action_name,
device_action_key=device_action_key, device_action_key=device_action_key,
status=JobStatus.QUEUE, status=JobStatus.QUEUE,
@@ -732,13 +739,27 @@ class MessageProcessor:
if can_start_immediately: if can_start_immediately:
# 可以立即开始 # 可以立即开始
await self._send_action_state_response( await self._send_action_state_response(
device_id, action_name, task_id, job_id, "query_action_status", True, 0 device_id,
action_name,
task_id,
job_id,
"query_action_status",
True,
0,
notebook_id=notebook_id,
) )
logger.trace(f"[MessageProcessor] Job {job_log} can start immediately") logger.trace(f"[MessageProcessor] Job {job_log} can start immediately")
else: else:
# 需要排队 # 需要排队
await self._send_action_state_response( await self._send_action_state_response(
device_id, action_name, task_id, job_id, "query_action_status", False, 10 device_id,
action_name,
task_id,
job_id,
"query_action_status",
False,
10,
notebook_id=notebook_id,
) )
logger.trace(f"[MessageProcessor] Job {job_log} queued") logger.trace(f"[MessageProcessor] Job {job_log} queued")
@@ -768,6 +789,7 @@ class MessageProcessor:
job_id=req.job_id, job_id=req.job_id,
task_id=req.task_id, task_id=req.task_id,
device_id=req.device_id, device_id=req.device_id,
notebook_id=req.notebook_id,
action_name=action_name, action_name=action_name,
device_action_key=device_action_key, device_action_key=device_action_key,
status=JobStatus.QUEUE, status=JobStatus.QUEUE,
@@ -775,11 +797,16 @@ class MessageProcessor:
always_free=True, always_free=True,
) )
self.device_manager.add_queue_request(job_info) self.device_manager.add_queue_request(job_info)
existing_job = job_info
logger.info(f"[MessageProcessor] Job {job_log} always_free, auto-registered from direct job_start") logger.info(f"[MessageProcessor] Job {job_log} always_free, auto-registered from direct job_start")
else: else:
logger.error(f"[MessageProcessor] Job {job_log} not registered (missing query_action_state)") logger.error(f"[MessageProcessor] Job {job_log} not registered (missing query_action_state)")
return return
if existing_job and req.notebook_id and not existing_job.notebook_id:
existing_job.notebook_id = req.notebook_id
notebook_id = req.notebook_id or (existing_job.notebook_id if existing_job else "")
success = self.device_manager.start_job(req.job_id) success = self.device_manager.start_job(req.job_id)
if not success: if not success:
logger.error(f"[MessageProcessor] Failed to start job {job_log}") logger.error(f"[MessageProcessor] Failed to start job {job_log}")
@@ -795,6 +822,7 @@ class MessageProcessor:
action_name=req.action, action_name=req.action,
task_id=req.task_id, task_id=req.task_id,
job_id=req.job_id, job_id=req.job_id,
notebook_id=notebook_id,
device_action_key=device_action_key, device_action_key=device_action_key,
) )
@@ -834,6 +862,7 @@ class MessageProcessor:
"job_id": req.job_id, "job_id": req.job_id,
"task_id": req.task_id, "task_id": req.task_id,
"device_id": req.device_id, "device_id": req.device_id,
"notebook_id": queue_item.notebook_id,
"action_name": req.action, "action_name": req.action,
"status": "failed", "status": "failed",
"feedback_data": {}, "feedback_data": {},
@@ -855,6 +884,7 @@ class MessageProcessor:
"query_action_status", "query_action_status",
True, True,
0, 0,
notebook_id=next_job.notebook_id,
) )
next_job_log = format_job_log( next_job_log = format_job_log(
next_job.job_id, next_job.task_id, next_job.device_id, next_job.action_name next_job.job_id, next_job.task_id, next_job.device_id, next_job.action_name
@@ -1101,7 +1131,15 @@ class MessageProcessor:
logger.info(f"[MessageProcessor] Restart cleanup scheduled") logger.info(f"[MessageProcessor] Restart cleanup scheduled")
async def _send_action_state_response( async def _send_action_state_response(
self, device_id: str, action_name: str, task_id: str, job_id: str, typ: str, free: bool, need_more: int self,
device_id: str,
action_name: str,
task_id: str,
job_id: str,
typ: str,
free: bool,
need_more: int,
notebook_id: str = "",
): ):
"""发送动作状态响应""" """发送动作状态响应"""
message = { message = {
@@ -1112,6 +1150,7 @@ class MessageProcessor:
"action_name": action_name, "action_name": action_name,
"task_id": task_id, "task_id": task_id,
"job_id": job_id, "job_id": job_id,
"notebook_id": notebook_id,
"free": free, "free": free,
"need_more": need_more + 1, "need_more": need_more + 1,
}, },
@@ -1194,6 +1233,7 @@ class QueueProcessor:
action_name=timeout_job.action_name, action_name=timeout_job.action_name,
task_id=timeout_job.task_id, task_id=timeout_job.task_id,
job_id=timeout_job.job_id, job_id=timeout_job.job_id,
notebook_id=timeout_job.notebook_id,
device_action_key=timeout_job.device_action_key, device_action_key=timeout_job.device_action_key,
) )
# 发布超时失败状态这会触发正常的job完成流程 # 发布超时失败状态这会触发正常的job完成流程
@@ -1252,6 +1292,7 @@ class QueueProcessor:
"action_name": job_info.action_name, "action_name": job_info.action_name,
"task_id": job_info.task_id, "task_id": job_info.task_id,
"job_id": job_info.job_id, "job_id": job_info.job_id,
"notebook_id": job_info.notebook_id,
"free": False, "free": False,
"need_more": 10 + 1, "need_more": 10 + 1,
}, },
@@ -1291,6 +1332,7 @@ class QueueProcessor:
"action_name": job_info.action_name, "action_name": job_info.action_name,
"task_id": job_info.task_id, "task_id": job_info.task_id,
"job_id": job_info.job_id, "job_id": job_info.job_id,
"notebook_id": job_info.notebook_id,
"free": False, "free": False,
"need_more": 10 + 1, "need_more": 10 + 1,
}, },
@@ -1336,12 +1378,15 @@ class QueueProcessor:
"action_name": next_job.action_name, "action_name": next_job.action_name,
"task_id": next_job.task_id, "task_id": next_job.task_id,
"job_id": next_job.job_id, "job_id": next_job.job_id,
"notebook_id": next_job.notebook_id,
"free": True, "free": True,
"need_more": 0, "need_more": 0,
}, },
} }
self.message_processor.send_message(message) self.message_processor.send_message(message)
# next_job_log = format_job_log(next_job.job_id, next_job.task_id, next_job.device_id, next_job.action_name) # next_job_log = format_job_log(
# next_job.job_id, next_job.task_id, next_job.device_id, next_job.action_name
# )
# logger.debug(f"[QueueProcessor] Notified next job {next_job_log} can start") # logger.debug(f"[QueueProcessor] Notified next job {next_job_log} can start")
# 立即触发下一轮状态检查 # 立即触发下一轮状态检查
@@ -1510,6 +1555,7 @@ class WebSocketClient(BaseCommunicationClient):
"job_id": item.job_id, "job_id": item.job_id,
"task_id": item.task_id, "task_id": item.task_id,
"device_id": item.device_id, "device_id": item.device_id,
"notebook_id": item.notebook_id,
"action_name": item.action_name, "action_name": item.action_name,
"status": status, "status": status,
"feedback_data": feedback_data, "feedback_data": feedback_data,