udpate reset

This commit is contained in:
hanhua@dp.tech
2026-05-18 18:26:16 +08:00
parent 72495bfc74
commit a4678b7aa8
2 changed files with 83 additions and 26 deletions

View File

@@ -11,7 +11,7 @@ import sys
from contextlib import nullcontext
from datetime import datetime, timezone
from pathlib import Path
from typing import Annotated, Any, Dict, Iterable, List, Optional, Tuple
from typing import Annotated, Any, Dict, Iterable, List, Literal, Optional, Tuple
from uuid import UUID
import requests
@@ -743,40 +743,58 @@ class BioyondPeptideStation(BioyondWorkstation):
result["resultTable"] = resultTable or {}
return result
@action(always_free=True, description="复位调度器/订单/库位")
@action(
always_free=True,
goal_default={
"reset_operations": ["scheduler_reset", "reset_order_status", "reset_location"],
},
description="复位调度器/订单/库位",
)
def reset(
self,
reset_operations: Optional[List[str]] = None,
dry_run: bool = False,
order_id: str = "",
location_id: str = "",
reset_operations: Optional[
List[Literal["scheduler_reset", "reset_order_status", "reset_location"]]
] = None,
**kwargs: Any,
) -> Dict[str, Any]:
with self._debug_call_session("reset"):
operations = self._normalize_reset_operations(reset_operations)
planned = [{"operation": op, "endpoint": self._reset_operation_endpoint(op)} for op in operations]
result: Dict[str, Any] = {"dry_run": bool(dry_run), "planned_calls": planned, "executed_calls": [], "skipped_operations": []}
if dry_run:
return result
result: Dict[str, Any] = {
"selected_operations": operations,
"executed_calls": [],
"skipped_operations": [],
}
rpc = self._require_hardware_interface()
for operation in operations:
if operation == "scheduler_reset":
code = rpc.scheduler_reset()
result["executed_calls"].append({"operation": operation, "result": {"code": code}})
elif operation == "reset_order_status":
resolved = str(kwargs.get("reset_order_id") or order_id or kwargs.get("order_id") or "").strip()
resolved = str(
kwargs.get("reset_order_id") or kwargs.get("order_id") or ""
).strip()
if not resolved:
result["skipped_operations"].append({"operation": operation, "reason": "缺少 order_id/reset_order_id"})
result["skipped_operations"].append(
{"operation": operation, "reason": "缺少 order_id/reset_order_id"}
)
continue
code = rpc.reset_order_status(resolved)
result["executed_calls"].append({"operation": operation, "order_id": resolved, "result": {"code": code}})
result["executed_calls"].append(
{"operation": operation, "order_id": resolved, "result": {"code": code}}
)
elif operation == "reset_location":
resolved = str(kwargs.get("reset_location_id") or location_id or kwargs.get("location_id") or "").strip()
resolved = str(
kwargs.get("reset_location_id") or kwargs.get("location_id") or ""
).strip()
if not resolved:
result["skipped_operations"].append({"operation": operation, "reason": "缺少 location_id/reset_location_id"})
result["skipped_operations"].append(
{"operation": operation, "reason": "缺少 location_id/reset_location_id"}
)
continue
code = rpc.reset_location(resolved)
result["executed_calls"].append({"operation": operation, "location_id": resolved, "result": {"code": code}})
result["executed_calls"].append(
{"operation": operation, "location_id": resolved, "result": {"code": code}}
)
else:
raise ValueError(f"未知 reset operation: {operation}")
return result

View File

@@ -616,13 +616,33 @@ def test_start_experiment_starts_when_table_empty() -> None:
# ---------------------------------------------------------------------------
def test_reset_dry_run_default_false_and_planned_calls() -> None:
station = _make_station()
sig = inspect.signature(station.reset)
assert sig.parameters["dry_run"].default is False
out = station.reset(reset_operations=["scheduler_reset"], dry_run=True)
assert out["dry_run"] is True
assert out["planned_calls"][0]["endpoint"] == "/api/lims/scheduler/reset"
def test_reset_signature_drops_legacy_params_and_uses_literal() -> None:
"""plan 调整:删除 dry_run/order_id/location_idreset_operations 用 Literal 注解。"""
cls = getattr(_import_module(), CLASS_NAME)
sig = inspect.signature(cls.reset)
params = sig.parameters
for legacy in ("dry_run", "order_id", "location_id"):
assert legacy not in params, f"reset 不应再有 {legacy} 入参"
assert "reset_operations" in params
assert any(p.kind == inspect.Parameter.VAR_KEYWORD for p in params.values()), \
"reset 必须保留 **kwargs 以兜底 reset_order_id/reset_location_id"
annotation = params["reset_operations"].annotation
rendered = annotation if isinstance(annotation, str) else repr(annotation)
for op in ("scheduler_reset", "reset_order_status", "reset_location"):
assert op in rendered, f"reset_operations 的 Literal 必须包含 {op}"
def test_reset_goal_default_contains_all_operations() -> None:
"""像 sirna 一样goal_default 默认勾选全部三个 reset 操作。"""
cls = getattr(_import_module(), CLASS_NAME)
meta = getattr(cls.reset, "_action_registry_meta", {})
goal_default = meta.get("goal_default") or {}
assert goal_default.get("reset_operations") == [
"scheduler_reset",
"reset_order_status",
"reset_location",
]
def test_reset_executes_typed_rpc_calls() -> None:
@@ -632,11 +652,30 @@ def test_reset_executes_typed_rpc_calls() -> None:
station.hardware_interface.reset_location.return_value = 1
out = station.reset(
reset_operations=["scheduler_reset", "reset_order_status", "reset_location"],
dry_run=False,
order_id=ORDER_GUID,
location_id="loc-1",
reset_order_id=ORDER_GUID,
reset_location_id="loc-1",
)
station.hardware_interface.scheduler_reset.assert_called_once_with()
station.hardware_interface.reset_order_status.assert_called_once_with(ORDER_GUID)
station.hardware_interface.reset_location.assert_called_once_with("loc-1")
assert out["selected_operations"] == [
"scheduler_reset",
"reset_order_status",
"reset_location",
]
assert len(out["executed_calls"]) == 3
assert out["skipped_operations"] == []
def test_reset_skips_when_ids_missing() -> None:
"""没有 order_id / location_id 时应该 skip 而不是抛错。"""
station = _make_station()
station.hardware_interface.scheduler_reset.return_value = 1
out = station.reset(
reset_operations=["scheduler_reset", "reset_order_status", "reset_location"],
)
station.hardware_interface.scheduler_reset.assert_called_once_with()
station.hardware_interface.reset_order_status.assert_not_called()
station.hardware_interface.reset_location.assert_not_called()
skipped_ops = {item["operation"] for item in out["skipped_operations"]}
assert skipped_ops == {"reset_order_status", "reset_location"}