feat: RNA add per-action raw HTTP call log for Bioyond station

- Add debug_call_log module: contextvar-scoped session(), idempotent wrap_rpc_http, lazy markdown writer with apiKey redaction, source attribution via stack walk, raw_text fallback.
- Centralize Bioyond RPC creation in BioyondWorkstation via _create_bioyond_rpc / _set_hardware_interface / _debug_call_session helpers and _DEBUG_LOG_DEFAULT_DIR.
- Wrap reset, submit_experiment_1, start_experiment, get_order_list action bodies in _debug_call_session for opt-in per-action capture.
- ConnectionMonitor polling stays outside debug sessions and is not logged.
This commit is contained in:
yxz321
2026-05-08 23:22:00 +08:00
parent 5b9f77e81f
commit cae828ce74
3 changed files with 746 additions and 214 deletions

View File

@@ -0,0 +1,459 @@
"""Per-action raw call/response log for Bioyond stations.
When a debug session is active, ``wrap_rpc_http`` replaces a ``BioyondV1RPC``
instance's ``post`` / ``get`` methods with closures that perform the HTTP
transport themselves, capture the request/response details, and append a record
to the active session before returning exactly what ``BaseRequest`` would have
returned. Outside of an active session the wrapped method delegates to the
original (unwrapped) implementation, leaving non-debug behavior intact.
The session writes a Markdown file under ``out_dir`` mirroring the format of
``temp_benyao/peptide/_logs/2026-04-30_160316_day3_samplefile_only_raw_calls.md``
minus the "Raw Payload Argument" section.
This module has no dependency on ``BioyondV1RPC`` itself; the only contract is
that the wrapped instance descends from ``BaseRequest`` (i.e. has a logger
returned by ``self.get_logger()``).
"""
from __future__ import annotations
import contextvars
import copy
import inspect
import json
import re
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Iterator, List, Optional
import requests
__all__ = [
"CallRecord",
"CallLogContext",
"session",
"wrap_rpc_http",
"active_session",
]
_DEFAULT_TIMEOUT_GET = 30
_DEFAULT_TIMEOUT_POST = 120
@dataclass
class CallRecord:
"""One captured HTTP call inside a debug session."""
index: int
method: str
url: str
path: str
source: str
transport: str
http_status: Optional[int]
request_body: Any
response_body: Any
error: Optional[str] = None
@dataclass
class CallLogContext:
"""State for a single ``session()`` block.
A session lazily creates its file on the first appended record. Actions
that abort before any RPC produce no file.
"""
action: str
out_dir: Path
started_at: datetime
calls: List[CallRecord] = field(default_factory=list)
file_path: Optional[Path] = None
def append(self, record: CallRecord) -> None:
record.index = len(self.calls) + 1
self.calls.append(record)
self._write_file()
# -- file I/O -------------------------------------------------------------
def _resolve_file_path(self) -> Path:
if self.file_path is not None:
return self.file_path
timestamp = self.started_at.strftime("%Y-%m-%d_%H%M%S")
slug = _slugify_action(self.action)
candidate = self.out_dir / f"{timestamp}_{slug}_raw_calls.md"
suffix = 2
while candidate.exists():
candidate = (
self.out_dir
/ f"{timestamp}_{slug}_raw_calls_{suffix:02d}.md"
)
suffix += 1
self.file_path = candidate
return self.file_path
def _write_file(self) -> None:
path = self._resolve_file_path()
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(_render_markdown(self), encoding="utf-8")
_active_session: contextvars.ContextVar[Optional[CallLogContext]] = (
contextvars.ContextVar("_active_session", default=None)
)
def active_session() -> Optional[CallLogContext]:
"""Return the currently active :class:`CallLogContext`, if any."""
return _active_session.get()
@contextmanager
def session(action: str, out_dir: Path) -> Iterator[CallLogContext]:
"""Open a per-action debug session.
On entry, sets the module-level ``_active_session`` ContextVar so any
``wrap_rpc_http``'d clients on the same thread/task record their calls.
On exit, the previous active session (if any) is restored.
"""
ctx = CallLogContext(
action=str(action),
out_dir=Path(out_dir),
started_at=datetime.now(),
)
token = _active_session.set(ctx)
try:
yield ctx
finally:
_active_session.reset(token)
def wrap_rpc_http(rpc: Any) -> None:
"""Idempotently wrap ``rpc.post`` / ``rpc.get``.
When a session is active (``_active_session.get() is not None``), the
wrapped methods perform the HTTP call themselves with ``requests`` and
record the call before returning the same value ``BaseRequest`` would have
returned. When no session is active, the wrapped methods delegate to the
original implementation, preserving stock ``BaseRequest`` behavior.
Calling this twice on the same instance is a no-op. The wrapper does not
alter ``rpc.form_post`` (no Sirna action calls it as of plan 3).
"""
if rpc is None:
return
if getattr(rpc, "_debug_call_log_wrapped", False):
return
rpc._orig_post = rpc.post
rpc._orig_get = rpc.get
def _wrapped_post(
url: str,
params: Any = None,
files: Any = None,
headers: Optional[dict] = None,
) -> Any:
ctx = _active_session.get()
if ctx is None:
kwargs = {}
if params is not None:
kwargs["params"] = params
if files is not None:
kwargs["files"] = files
if headers is not None:
kwargs["headers"] = headers
return rpc._orig_post(url, **kwargs)
effective_params = params if params is not None else {}
effective_headers = (
headers
if headers is not None
else {"Content-Type": "application/json"}
)
source = _detect_source(rpc)
request_body = _redact(effective_params)
record = CallRecord(
index=0,
method="POST",
url=str(url),
path=_url_path(url),
source=source,
transport=_pick_transport(effective_params),
http_status=None,
request_body=request_body,
response_body=None,
error=None,
)
return_value: Any = None
try:
response = requests.post(
url,
data=json.dumps(effective_params) if effective_params else None,
headers=effective_headers,
timeout=_DEFAULT_TIMEOUT_POST,
files=files,
)
except Exception as exc: # pragma: no cover - delegated to logger
record.error = f"transport error: {exc}"
try:
rpc.get_logger().error(f"Request ERROR: {exc}")
except Exception:
pass
ctx.append(record)
return None
record.http_status = response.status_code
record.response_body, parse_error = _decode_response_body(response)
try:
rpc.get_logger().debug(
f"Request >>> : {response.request.body} "
f"{response.status_code} {response.text}"
)
except Exception:
pass
if response.status_code == 200:
if parse_error is not None:
record.error = f"json parse error: {parse_error}"
return_value = None
else:
return_value = record.response_body
else:
record.error = f"HTTP {response.status_code}: {response.text}"
try:
rpc.get_logger().error(
f"Request ERROR: ('Request ERROR:', {response.text!r})"
)
except Exception:
pass
return_value = None
ctx.append(record)
return return_value
def _wrapped_get(
url: str,
params: Any = None,
headers: Optional[dict] = None,
) -> Any:
ctx = _active_session.get()
if ctx is None:
kwargs = {}
if params is not None:
kwargs["params"] = params
if headers is not None:
kwargs["headers"] = headers
return rpc._orig_get(url, **kwargs)
effective_params = params if params is not None else {}
effective_headers = (
headers
if headers is not None
else {"Content-Type": "application/json"}
)
source = _detect_source(rpc)
request_body = _redact(effective_params)
record = CallRecord(
index=0,
method="GET",
url=str(url),
path=_url_path(url),
source=source,
transport="params",
http_status=None,
request_body=request_body,
response_body=None,
error=None,
)
return_value: Any = None
try:
response = requests.get(
url,
params=effective_params,
headers=effective_headers,
timeout=_DEFAULT_TIMEOUT_GET,
)
except Exception as exc: # pragma: no cover - delegated to logger
record.error = f"transport error: {exc}"
try:
rpc.get_logger().error(f"Request ERROR: {exc}")
except Exception:
pass
ctx.append(record)
return None
record.http_status = response.status_code
record.response_body, parse_error = _decode_response_body(response)
try:
rpc.get_logger().debug(
f"Request >>> : {effective_params} "
f"{response.status_code} {response.text}"
)
except Exception:
pass
if response.status_code == 200:
if parse_error is not None:
record.error = f"json parse error: {parse_error}"
return_value = None
else:
return_value = record.response_body
ctx.append(record)
return return_value
rpc.post = _wrapped_post
rpc.get = _wrapped_get
rpc._debug_call_log_wrapped = True
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
_URL_PATH_RE = re.compile(r"https?://[^/]+(/.*)?$")
_SLUG_RE = re.compile(r"[^A-Za-z0-9._-]+")
def _slugify_action(action: str) -> str:
slug = _SLUG_RE.sub("_", str(action)).strip("_")
return slug or "action"
def _url_path(url: Any) -> str:
text = str(url or "")
match = _URL_PATH_RE.match(text)
if match and match.group(1):
return match.group(1)
if text.startswith("/"):
return text
return text
def _pick_transport(params: Any) -> str:
if isinstance(params, dict) and "data" in params:
return "data"
return "params"
def _detect_source(rpc: Any) -> str:
"""Walk the call stack to find the outermost frame whose ``self`` is rpc."""
try:
stack = inspect.stack()
except Exception:
return ""
candidate = ""
try:
for frame_info in stack:
frame = frame_info.frame
if frame.f_locals.get("self", None) is rpc:
candidate = frame_info.function
return candidate
finally:
del stack
def _redact(params: Any) -> Any:
"""Return a copy of ``params`` with ``apiKey`` redacted."""
try:
cloned = copy.deepcopy(params)
except Exception:
return params
_redact_in_place(cloned)
return cloned
def _redact_in_place(value: Any) -> None:
if isinstance(value, dict):
for key in list(value.keys()):
if isinstance(key, str) and key.lower() == "apikey":
value[key] = "<redacted>"
else:
_redact_in_place(value[key])
elif isinstance(value, list):
for item in value:
_redact_in_place(item)
def _decode_response_body(response: Any) -> tuple[Any, Optional[str]]:
"""Best-effort response decoding used for both record + return value."""
text = getattr(response, "text", "")
try:
return response.json(), None
except Exception as exc:
if text:
return {"raw_text": text}, str(exc)
return None, str(exc)
# ---------------------------------------------------------------------------
# Markdown rendering
# ---------------------------------------------------------------------------
def _render_markdown(ctx: CallLogContext) -> str:
title = f"# {ctx.action} Raw Call/Response Log"
parts: List[str] = [title, ""]
parts.append("## LIMS Calls")
parts.append("")
parts.append("| # | Method | Path | Source | HTTP |")
parts.append("|---|---|---|---|---|")
for record in ctx.calls:
anchor = _row_anchor(record)
http = (
f"`{record.http_status}`"
if record.http_status is not None
else "`-`"
)
parts.append(
f"| [{record.index}](#{anchor}) | `{record.method}` | "
f"`{record.path}` | `{record.source}` | {http} |"
)
parts.append("")
for record in ctx.calls:
parts.append(f"## {record.index} {record.method} {record.path}")
parts.append("")
parts.append(f"- Source: `{record.source}`")
parts.append(f"- Transport: `{record.transport}`")
if record.http_status is not None:
parts.append(f"- HTTP status: `{record.http_status}`")
else:
parts.append("- HTTP status: `-`")
if record.error:
parts.append(f"- Error: {record.error}")
parts.append("")
parts.append("### Request Body")
parts.append("")
parts.append("```json")
parts.append(_to_json_block(record.request_body))
parts.append("```")
parts.append("")
parts.append("### Response Body")
parts.append("")
parts.append("```json")
parts.append(_to_json_block(record.response_body))
parts.append("```")
parts.append("")
return "\n".join(parts).rstrip() + "\n"
def _row_anchor(record: CallRecord) -> str:
"""Build a GitHub-style anchor matching ``## N METHOD /path``."""
raw = f"{record.index}-{record.method}-{record.path}"
raw = raw.lower()
raw = re.sub(r"[^a-z0-9]+", "-", raw)
return raw.strip("-")
def _to_json_block(value: Any) -> str:
try:
return json.dumps(value, ensure_ascii=False, indent=2, sort_keys=True)
except TypeError:
return json.dumps(str(value), ensure_ascii=False, indent=2)

View File

@@ -250,6 +250,8 @@ def fetch_workflow_list(
class BioyondSirnaStation(BioyondWorkstation):
"""小核酸工作站最小运行时实现。"""
_DEBUG_LOG_DEFAULT_DIR = "temp_benyao/sirna/_logs"
def __init__(
self,
bioyond_config: Optional[Dict[str, Any]] = None,
@@ -395,24 +397,25 @@ class BioyondSirnaStation(BioyondWorkstation):
**kwargs: Any,
) -> Dict[str, Any]:
"""复位调度器、订单状态和库位,并按清理读回结果决定是否 take-out。"""
reset_order_id = self._kwarg_text(kwargs, "reset_order_id")
reset_location_id = self._kwarg_text(kwargs, "reset_location_id")
cleanup_order_code = self._kwarg_text(kwargs, "cleanup_order_code")
api_host = self._kwarg_text(kwargs, "api_host")
api_key = self._kwarg_text(kwargs, "api_key")
ready_signal = self._kwarg_text(kwargs, "ready_signal") or DEFAULT_READY_SIGNAL
del timeout_seconds, assignee_user_ids
self._update_runtime_api_config(api_host=api_host, api_key=api_key)
self._require_ready_signal(ready_signal)
rpc = self._require_hardware_interface_for_reset()
result = self._run_reset_operations(
rpc,
reset_operations=reset_operations,
reset_order_id=reset_order_id,
reset_location_id=reset_location_id,
cleanup_order_code=cleanup_order_code,
)
return self._with_ready_signal(result)
with self._debug_call_session("reset"):
reset_order_id = self._kwarg_text(kwargs, "reset_order_id")
reset_location_id = self._kwarg_text(kwargs, "reset_location_id")
cleanup_order_code = self._kwarg_text(kwargs, "cleanup_order_code")
api_host = self._kwarg_text(kwargs, "api_host")
api_key = self._kwarg_text(kwargs, "api_key")
ready_signal = self._kwarg_text(kwargs, "ready_signal") or DEFAULT_READY_SIGNAL
del timeout_seconds, assignee_user_ids
self._update_runtime_api_config(api_host=api_host, api_key=api_key)
self._require_ready_signal(ready_signal)
rpc = self._require_hardware_interface_for_reset()
result = self._run_reset_operations(
rpc,
reset_operations=reset_operations,
reset_order_id=reset_order_id,
reset_location_id=reset_location_id,
cleanup_order_code=cleanup_order_code,
)
return self._with_ready_signal(result)
@action(
always_free=True,
@@ -491,113 +494,114 @@ class BioyondSirnaStation(BioyondWorkstation):
- confirmation_message (str): 确认消息
- registration_result (Dict): 物料注册结果
"""
if isinstance(required_params, dict):
sample_throughput = required_params.get("sample_throughput")
else:
sample_throughput = required_params
sample_throughput = int(sample_throughput)
with self._debug_call_session("submit_experiment_1"):
if isinstance(required_params, dict):
sample_throughput = required_params.get("sample_throughput")
else:
sample_throughput = required_params
sample_throughput = int(sample_throughput)
if optional_params is None:
optional_params = {}
order_name = optional_params.get("order_name", "")
parameter_overrides = optional_params.get("parameter_overrides", "")
auto_register_materials = optional_params.get("auto_register_materials", True)
target_device = (
self._kwarg_text(kwargs, "unilabos_device_id")
or self._kwarg_text(kwargs, "device_id")
or "bioyond_sirna_station"
)
if optional_params is None:
optional_params = {}
order_name = optional_params.get("order_name", "")
parameter_overrides = optional_params.get("parameter_overrides", "")
auto_register_materials = optional_params.get("auto_register_materials", True)
target_device = (
self._kwarg_text(kwargs, "unilabos_device_id")
or self._kwarg_text(kwargs, "device_id")
or "bioyond_sirna_station"
)
del timeout_seconds, assignee_user_ids, kwargs
rpc = self._require_hardware_interface("create_order")
del timeout_seconds, assignee_user_ids, kwargs
rpc = self._require_hardware_interface("create_order")
# 自动解析实验1工作流无需用户指定workflow_name
workflow = self._resolve_experiment_1_workflow(rpc)
# 自动解析实验1工作流无需用户指定workflow_name
workflow = self._resolve_experiment_1_workflow(rpc)
step_data = rpc.workflow_step_query(workflow["sub_workflow_id"])
step_data = rpc.workflow_step_query(workflow["sub_workflow_id"])
# Parse parameter_overrides from text format
parsed_overrides = self._parse_parameter_overrides_text(parameter_overrides)
# Parse parameter_overrides from text format
parsed_overrides = self._parse_parameter_overrides_text(parameter_overrides)
param_values, parameter_template = self._build_param_values_from_step_data(
step_data,
parameter_overrides=parsed_overrides,
include_all_task_displayable=True,
)
if not param_values:
raise RuntimeError("未从 LIMS 子工作流参数中提取到 create_order paramValues")
param_values, parameter_template = self._build_param_values_from_step_data(
step_data,
parameter_overrides=parsed_overrides,
include_all_task_displayable=True,
)
if not param_values:
raise RuntimeError("未从 LIMS 子工作流参数中提取到 create_order paramValues")
resolved_order_code, resolved_order_name = self._build_bioyond_order_identity("", order_name)
order_payload = [
{
"orderCode": resolved_order_code,
"orderName": resolved_order_name,
"borderNumber": int(sample_throughput),
"workFlowId": workflow["sub_workflow_id"],
"paramValues": param_values,
"extendProperties": "",
resolved_order_code, resolved_order_name = self._build_bioyond_order_identity("", order_name)
order_payload = [
{
"orderCode": resolved_order_code,
"orderName": resolved_order_name,
"borderNumber": int(sample_throughput),
"workFlowId": workflow["sub_workflow_id"],
"paramValues": param_values,
"extendProperties": "",
}
]
logger.info(f"正在提交小核酸实验1: {resolved_order_name} ({resolved_order_code})")
raw_result = rpc.create_order(json.dumps(copy.deepcopy(order_payload), ensure_ascii=False))
parsed_result = self._parse_lims_result(raw_result)
material_records = self._extract_create_order_materials(parsed_result)
suggested_locations = self._extract_suggested_locations(material_records)
order_ids = self._extract_created_order_ids(parsed_result)
self._last_submitted_order_ids = list(order_ids)
self._last_submitted_order_code = resolved_order_code
start_experiment_info = {
"order_ids": order_ids,
"order_code": resolved_order_code,
"order_name": resolved_order_name,
"workflow": workflow,
}
]
confirmation_data = self._format_create_order_confirmation(
order_code=resolved_order_code,
order_name=resolved_order_name,
workflow=workflow,
order_ids=order_ids,
material_records=material_records,
suggested_locations=suggested_locations,
)
logger.info(f"正在提交小核酸实验1: {resolved_order_name} ({resolved_order_code})")
raw_result = rpc.create_order(json.dumps(copy.deepcopy(order_payload), ensure_ascii=False))
parsed_result = self._parse_lims_result(raw_result)
material_records = self._extract_create_order_materials(parsed_result)
suggested_locations = self._extract_suggested_locations(material_records)
order_ids = self._extract_created_order_ids(parsed_result)
self._last_submitted_order_ids = list(order_ids)
self._last_submitted_order_code = resolved_order_code
start_experiment_info = {
"order_ids": order_ids,
"order_code": resolved_order_code,
"order_name": resolved_order_name,
"workflow": workflow,
}
confirmation_data = self._format_create_order_confirmation(
order_code=resolved_order_code,
order_name=resolved_order_name,
workflow=workflow,
order_ids=order_ids,
material_records=material_records,
suggested_locations=suggested_locations,
)
registration_result = None
if auto_register_materials and material_records:
try:
registration_result = self._register_materials_to_tree(material_records)
logger.info(f"物料注册完成: {len(registration_result.get('registered', []))} 个物料已添加到资源树")
except Exception as e:
logger.error(f"物料注册失败: {e}")
registration_result = {"error": str(e)}
registration_result = None
if auto_register_materials and material_records:
try:
registration_result = self._register_materials_to_tree(material_records)
logger.info(f"物料注册完成: {len(registration_result.get('registered', []))} 个物料已添加到资源树")
except Exception as e:
logger.error(f"物料注册失败: {e}")
registration_result = {"error": str(e)}
result = {
"success": self._create_result_success(parsed_result, order_ids, material_records),
"order_code": resolved_order_code,
"order_name": resolved_order_name,
"order_id": order_ids[0] if order_ids else "",
"order_ids": order_ids,
"target_device": target_device,
"workflow": workflow,
"sample_throughput": int(sample_throughput),
"payload": order_payload,
"parameter_template": parameter_template,
"create_order_result": parsed_result,
"materials": material_records,
"materials_by_type": confirmation_data.get("materials_by_type", {}),
"manual_load_tables": self._build_manual_load_tables(
confirmation_data.get("materials_by_type", {})
),
"manual_load_probe": self._build_manual_load_probe(
confirmation_data.get("materials_by_type", {})
),
"suggested_locations": suggested_locations,
"start_experiment": start_experiment_info,
"confirmation_message": confirmation_data.get("confirmation_message", ""),
"registration_result": registration_result,
}
result.update(result["manual_load_probe"])
return result
result = {
"success": self._create_result_success(parsed_result, order_ids, material_records),
"order_code": resolved_order_code,
"order_name": resolved_order_name,
"order_id": order_ids[0] if order_ids else "",
"order_ids": order_ids,
"target_device": target_device,
"workflow": workflow,
"sample_throughput": int(sample_throughput),
"payload": order_payload,
"parameter_template": parameter_template,
"create_order_result": parsed_result,
"materials": material_records,
"materials_by_type": confirmation_data.get("materials_by_type", {}),
"manual_load_tables": self._build_manual_load_tables(
confirmation_data.get("materials_by_type", {})
),
"manual_load_probe": self._build_manual_load_probe(
confirmation_data.get("materials_by_type", {})
),
"suggested_locations": suggested_locations,
"start_experiment": start_experiment_info,
"confirmation_message": confirmation_data.get("confirmation_message", ""),
"registration_result": registration_result,
}
result.update(result["manual_load_probe"])
return result
def _parse_parameter_overrides_text(self, text: str) -> Dict[str, Any]:
"""Parse parameter overrides from text format.
@@ -704,51 +708,52 @@ class BioyondSirnaStation(BioyondWorkstation):
timeout_seconds: 超时时间(秒,框架参数)。
assignee_user_ids: 分配用户 ID 列表(框架参数)。
"""
resource = kwargs.get("resource")
coin_cell_code = kwargs.get("coin_cell_code")
mount_resource = kwargs.get("mount_resource")
order_ids = kwargs.get("order_ids")
submit_experiment_result = kwargs.get("submit_experiment_result")
api_host = self._kwarg_text(kwargs, "api_host")
api_key = self._kwarg_text(kwargs, "api_key")
ready_signal = self._kwarg_text(kwargs, "ready_signal") or DEFAULT_READY_SIGNAL
del timeout_seconds, assignee_user_ids
self._update_runtime_api_config(api_host=api_host, api_key=api_key)
self._require_ready_signal(ready_signal)
with self._debug_call_session("start_experiment"):
resource = kwargs.get("resource")
coin_cell_code = kwargs.get("coin_cell_code")
mount_resource = kwargs.get("mount_resource")
order_ids = kwargs.get("order_ids")
submit_experiment_result = kwargs.get("submit_experiment_result")
api_host = self._kwarg_text(kwargs, "api_host")
api_key = self._kwarg_text(kwargs, "api_key")
ready_signal = self._kwarg_text(kwargs, "ready_signal") or DEFAULT_READY_SIGNAL
del timeout_seconds, assignee_user_ids
self._update_runtime_api_config(api_host=api_host, api_key=api_key)
self._require_ready_signal(ready_signal)
category_arrays = {
"materials_loaded": (
"物料",
self._as_manual_gate(materials_loaded),
[resource, coin_cell_code, mount_resource],
),
}
gates: Dict[str, Dict[str, Any]] = {}
missing_labels: List[str] = []
for gate_key, (label, ticked, arrays) in category_arrays.items():
required = any(bool(arr) for arr in arrays)
gates[gate_key] = {"label": label, "required": required, "ticked": bool(ticked)}
if required and not ticked:
missing_labels.append(label)
if missing_labels:
raise RuntimeError(
f"以下分类装载尚未确认,无法启动调度: {', '.join(missing_labels)}"
category_arrays = {
"materials_loaded": (
"物料",
self._as_manual_gate(materials_loaded),
[resource, coin_cell_code, mount_resource],
),
}
gates: Dict[str, Dict[str, Any]] = {}
missing_labels: List[str] = []
for gate_key, (label, ticked, arrays) in category_arrays.items():
required = any(bool(arr) for arr in arrays)
gates[gate_key] = {"label": label, "required": required, "ticked": bool(ticked)}
if required and not ticked:
missing_labels.append(label)
if missing_labels:
raise RuntimeError(
f"以下分类装载尚未确认,无法启动调度: {', '.join(missing_labels)}"
)
start_info = self._resolve_start_experiment_info(
submit_experiment_result, order_id, order_ids
)
start_info = self._resolve_start_experiment_info(
submit_experiment_result, order_id, order_ids
)
rpc = self._require_hardware_interface("scheduler_start")
logger.info("正在启动小核酸调度器")
result = rpc.scheduler_start()
return self._with_ready_signal({
"success": result == 1,
"return_info": result,
"scheduler_start_result": result,
"start_experiment": start_info,
"gates": gates,
"confirmation_message": "调度器启动成功" if result == 1 else "调度器启动失败,请检查 LIMS 状态",
})
rpc = self._require_hardware_interface("scheduler_start")
logger.info("正在启动小核酸调度器")
result = rpc.scheduler_start()
return self._with_ready_signal({
"success": result == 1,
"return_info": result,
"scheduler_start_result": result,
"start_experiment": start_info,
"gates": gates,
"confirmation_message": "调度器启动成功" if result == 1 else "调度器启动失败,请检查 LIMS 状态",
})
@action(
always_free=True,
@@ -806,63 +811,64 @@ class BioyondSirnaStation(BioyondWorkstation):
Returns:
``{"success": bool, "orders": [...], "order_id": str, "order_ids": [...], "query": {...}}``。
"""
del timeout_seconds, assignee_user_ids, kwargs
try:
normalized_max = int(max_results)
except (TypeError, ValueError):
normalized_max = 20
if normalized_max <= 0:
normalized_max = 20
rpc = self._require_hardware_interface("order_query")
query_payload = {
"timeType": "",
"beginTime": None,
"endTime": None,
"status": str(status or ""),
"filter": str(filter_text or ""),
"skipCount": 0,
"pageCount": normalized_max,
"sorting": "creationTime desc",
}
logger.info(
"正在查询 Bioyond LIMS 订单列表 filter=%r status=%r latest_only=%s",
filter_text,
status,
latest_only,
)
raw_result = rpc.order_query(json.dumps(query_payload, ensure_ascii=False))
items = self._order_items(raw_result)
with self._debug_call_session("get_order_list"):
del timeout_seconds, assignee_user_ids, kwargs
try:
normalized_max = int(max_results)
except (TypeError, ValueError):
normalized_max = 20
if normalized_max <= 0:
normalized_max = 20
rpc = self._require_hardware_interface("order_query")
query_payload = {
"timeType": "",
"beginTime": None,
"endTime": None,
"status": str(status or ""),
"filter": str(filter_text or ""),
"skipCount": 0,
"pageCount": normalized_max,
"sorting": "creationTime desc",
}
logger.info(
"正在查询 Bioyond LIMS 订单列表 filter=%r status=%r latest_only=%s",
filter_text,
status,
latest_only,
)
raw_result = rpc.order_query(json.dumps(query_payload, ensure_ascii=False))
items = self._order_items(raw_result)
orders: List[Dict[str, Any]] = []
for item in items:
order_id = str(item.get("id") or "")
if not order_id:
continue
orders.append({
"order_id": order_id,
"order_code": str(item.get("orderCode") or ""),
"order_name": str(item.get("name") or item.get("orderName") or ""),
"status": str(item.get("status") or item.get("statusName") or ""),
"created_at": str(item.get("creationTime") or item.get("createTime") or ""),
"raw": item,
})
orders: List[Dict[str, Any]] = []
for item in items:
order_id = str(item.get("id") or "")
if not order_id:
continue
orders.append({
"order_id": order_id,
"order_code": str(item.get("orderCode") or ""),
"order_name": str(item.get("name") or item.get("orderName") or ""),
"status": str(item.get("status") or item.get("statusName") or ""),
"created_at": str(item.get("creationTime") or item.get("createTime") or ""),
"raw": item,
})
order_ids = [order["order_id"] for order in orders]
if latest_only and orders:
chosen = orders[0]
order_id_value = chosen["order_id"]
elif len(order_ids) == 1:
order_id_value = order_ids[0]
else:
order_id_value = ""
order_ids = [order["order_id"] for order in orders]
if latest_only and orders:
chosen = orders[0]
order_id_value = chosen["order_id"]
elif len(order_ids) == 1:
order_id_value = order_ids[0]
else:
order_id_value = ""
return {
"success": bool(orders),
"orders": orders,
"order_id": order_id_value,
"order_ids": order_ids,
"query": query_payload,
}
return {
"success": bool(orders),
"orders": orders,
"order_id": order_id_value,
"order_ids": order_ids,
"query": query_payload,
}
def _require_hardware_interface(self, method_name: str) -> Any:
rpc = getattr(self, "hardware_interface", None)
@@ -898,7 +904,8 @@ class BioyondSirnaStation(BioyondWorkstation):
raise RuntimeError("\n".join(lines))
from unilabos.devices.workstation.bioyond_studio.bioyond_rpc import BioyondV1RPC
self.hardware_interface = BioyondV1RPC(self.bioyond_config)
rpc = BioyondV1RPC(self.bioyond_config)
self._set_hardware_interface(rpc)
return self.hardware_interface
def _has_required_api_config(self, config: Dict[str, Any]) -> bool:

View File

@@ -7,6 +7,7 @@ Bioyond Workstation Implementation
import time
import traceback
import threading
from contextlib import contextmanager
from datetime import datetime
from typing import Dict, Any, List, Optional, Union
import json
@@ -14,6 +15,7 @@ from pathlib import Path
from unilabos.devices.workstation.workstation_base import WorkstationBase, ResourceSynchronizer
from unilabos.devices.workstation.bioyond_studio.bioyond_rpc import BioyondV1RPC
from unilabos.devices.workstation.bioyond_studio import debug_call_log
from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
from unilabos.resources.warehouse import WareHouse
from unilabos.utils.log import logger
@@ -678,6 +680,70 @@ class BioyondWorkstation(WorkstationBase):
集成Bioyond物料管理的工作站实现
"""
# 子类(如 sirna / peptide覆写以指定默认 raw-call 日志目录。
# 路径相对仓库根;为 None 时若 debug_log=True 仍会写入临时位置。
_DEBUG_LOG_DEFAULT_DIR: Optional[str] = None
def _create_bioyond_rpc(self, config: Dict[str, Any]) -> BioyondV1RPC:
"""创建 Bioyond RPC 客户端并应用调试包装。
所有创建 ``BioyondV1RPC`` 的路径饿汉初始化、Sirna 延迟初始化、
以及未来的前端重新配置路径)都应通过该 helper
以确保 debug_log 包装与命名/日志策略保持一致。
"""
rpc = BioyondV1RPC(config)
debug_call_log.wrap_rpc_http(rpc)
return rpc
def _set_hardware_interface(self, rpc: BioyondV1RPC) -> BioyondV1RPC:
"""将已构造的 RPC 客户端设置到 ``self.hardware_interface``,并应用调试包装。"""
debug_call_log.wrap_rpc_http(rpc)
self.hardware_interface = rpc
return rpc
def _debug_log_resolved_dir(self) -> Path:
"""解析 ``debug_log_dir`` 为绝对路径。"""
configured = (getattr(self, "bioyond_config", {}) or {}).get("debug_log_dir")
default_dir = getattr(self, "_DEBUG_LOG_DEFAULT_DIR", None)
candidate = configured or default_dir or "temp_benyao/_logs/bioyond_debug"
path = Path(candidate)
if not path.is_absolute():
repo_root = Path(__file__).resolve().parents[4]
path = repo_root / path
return path
def _ensure_debug_log_state(self) -> None:
"""从 ``self.bioyond_config`` 派生 ``_debug_log_enabled`` / ``_debug_log_dir``。
每次进入 ``_debug_call_session`` 时都重新解析,以兼容前端在运行时
修改 ``bioyond_config['debug_log']`` 或目录的场景;同时也容忍
子类(如 Sirna 延迟初始化)在 ``__init__`` 早期未触发本方法。
"""
cfg = getattr(self, "bioyond_config", {}) or {}
self._debug_log_enabled = bool(cfg.get("debug_log"))
self._debug_log_dir = self._debug_log_resolved_dir()
@contextmanager
def _debug_call_session(self, action_name: str):
"""在 action 体外加一层 debug 会话上下文。
- ``debug_log`` 关闭时是空上下文,开销为 0。
- ``debug_log`` 开启时进入 :func:`debug_call_log.session`,所有
已被 ``wrap_rpc_http`` 包装过的 RPC 客户端都会捕获本次 action
产生的 HTTP 调用并写入 Markdown 文件。
子类(如 ``end_experiment``、``manual_unload`` 等)可以直接在
action 体里以 ``with self._debug_call_session("action_name"):`` 包裹。
"""
cfg = getattr(self, "bioyond_config", {}) or {}
enabled = bool(cfg.get("debug_log"))
if not enabled:
yield None
return
out_dir = BioyondWorkstation._debug_log_resolved_dir(self)
with debug_call_log.session(action_name, out_dir) as ctx:
yield ctx
def _publish_task_status(
self,
task_id: str,
@@ -862,7 +928,7 @@ class BioyondWorkstation(WorkstationBase):
self.bioyond_config = {}
print("警告: 未提供 bioyond_config请确保在 JSON 配置文件中提供完整配置")
self.hardware_interface = BioyondV1RPC(self.bioyond_config)
self.hardware_interface = self._create_bioyond_rpc(self.bioyond_config)
def resource_tree_add(self, resources: List[ResourcePLR]) -> None:
"""添加资源到资源树并更新ROS节点