From cae828ce7463c4a6e6caab005fe91ee432e159c2 Mon Sep 17 00:00:00 2001 From: yxz321 Date: Fri, 8 May 2026 23:22:00 +0800 Subject: [PATCH] feat: RNA add per-action raw HTTP call log for Bioyond station - Add debug_call_log module: contextvar-scoped session(), idempotent wrap_rpc_http, lazy markdown writer with apiKey redaction, source attribution via stack walk, raw_text fallback. - Centralize Bioyond RPC creation in BioyondWorkstation via _create_bioyond_rpc / _set_hardware_interface / _debug_call_session helpers and _DEBUG_LOG_DEFAULT_DIR. - Wrap reset, submit_experiment_1, start_experiment, get_order_list action bodies in _debug_call_session for opt-in per-action capture. - ConnectionMonitor polling stays outside debug sessions and is not logged. --- .../bioyond_studio/debug_call_log.py | 459 ++++++++++++++++++ .../sirna_station/sirna_station.py | 433 +++++++++-------- .../workstation/bioyond_studio/station.py | 68 ++- 3 files changed, 746 insertions(+), 214 deletions(-) create mode 100644 unilabos/devices/workstation/bioyond_studio/debug_call_log.py diff --git a/unilabos/devices/workstation/bioyond_studio/debug_call_log.py b/unilabos/devices/workstation/bioyond_studio/debug_call_log.py new file mode 100644 index 00000000..f36b43d8 --- /dev/null +++ b/unilabos/devices/workstation/bioyond_studio/debug_call_log.py @@ -0,0 +1,459 @@ +"""Per-action raw call/response log for Bioyond stations. + +When a debug session is active, ``wrap_rpc_http`` replaces a ``BioyondV1RPC`` +instance's ``post`` / ``get`` methods with closures that perform the HTTP +transport themselves, capture the request/response details, and append a record +to the active session before returning exactly what ``BaseRequest`` would have +returned. Outside of an active session the wrapped method delegates to the +original (unwrapped) implementation, leaving non-debug behavior intact. + +The session writes a Markdown file under ``out_dir`` mirroring the format of +``temp_benyao/peptide/_logs/2026-04-30_160316_day3_samplefile_only_raw_calls.md`` +minus the "Raw Payload Argument" section. + +This module has no dependency on ``BioyondV1RPC`` itself; the only contract is +that the wrapped instance descends from ``BaseRequest`` (i.e. has a logger +returned by ``self.get_logger()``). +""" + +from __future__ import annotations + +import contextvars +import copy +import inspect +import json +import re +from contextlib import contextmanager +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path +from typing import Any, Iterator, List, Optional + +import requests + +__all__ = [ + "CallRecord", + "CallLogContext", + "session", + "wrap_rpc_http", + "active_session", +] + + +_DEFAULT_TIMEOUT_GET = 30 +_DEFAULT_TIMEOUT_POST = 120 + + +@dataclass +class CallRecord: + """One captured HTTP call inside a debug session.""" + + index: int + method: str + url: str + path: str + source: str + transport: str + http_status: Optional[int] + request_body: Any + response_body: Any + error: Optional[str] = None + + +@dataclass +class CallLogContext: + """State for a single ``session()`` block. + + A session lazily creates its file on the first appended record. Actions + that abort before any RPC produce no file. + """ + + action: str + out_dir: Path + started_at: datetime + calls: List[CallRecord] = field(default_factory=list) + file_path: Optional[Path] = None + + def append(self, record: CallRecord) -> None: + record.index = len(self.calls) + 1 + self.calls.append(record) + self._write_file() + + # -- file I/O ------------------------------------------------------------- + + def _resolve_file_path(self) -> Path: + if self.file_path is not None: + return self.file_path + timestamp = self.started_at.strftime("%Y-%m-%d_%H%M%S") + slug = _slugify_action(self.action) + candidate = self.out_dir / f"{timestamp}_{slug}_raw_calls.md" + suffix = 2 + while candidate.exists(): + candidate = ( + self.out_dir + / f"{timestamp}_{slug}_raw_calls_{suffix:02d}.md" + ) + suffix += 1 + self.file_path = candidate + return self.file_path + + def _write_file(self) -> None: + path = self._resolve_file_path() + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(_render_markdown(self), encoding="utf-8") + + +_active_session: contextvars.ContextVar[Optional[CallLogContext]] = ( + contextvars.ContextVar("_active_session", default=None) +) + + +def active_session() -> Optional[CallLogContext]: + """Return the currently active :class:`CallLogContext`, if any.""" + return _active_session.get() + + +@contextmanager +def session(action: str, out_dir: Path) -> Iterator[CallLogContext]: + """Open a per-action debug session. + + On entry, sets the module-level ``_active_session`` ContextVar so any + ``wrap_rpc_http``'d clients on the same thread/task record their calls. + On exit, the previous active session (if any) is restored. + """ + ctx = CallLogContext( + action=str(action), + out_dir=Path(out_dir), + started_at=datetime.now(), + ) + token = _active_session.set(ctx) + try: + yield ctx + finally: + _active_session.reset(token) + + +def wrap_rpc_http(rpc: Any) -> None: + """Idempotently wrap ``rpc.post`` / ``rpc.get``. + + When a session is active (``_active_session.get() is not None``), the + wrapped methods perform the HTTP call themselves with ``requests`` and + record the call before returning the same value ``BaseRequest`` would have + returned. When no session is active, the wrapped methods delegate to the + original implementation, preserving stock ``BaseRequest`` behavior. + + Calling this twice on the same instance is a no-op. The wrapper does not + alter ``rpc.form_post`` (no Sirna action calls it as of plan 3). + """ + if rpc is None: + return + if getattr(rpc, "_debug_call_log_wrapped", False): + return + + rpc._orig_post = rpc.post + rpc._orig_get = rpc.get + + def _wrapped_post( + url: str, + params: Any = None, + files: Any = None, + headers: Optional[dict] = None, + ) -> Any: + ctx = _active_session.get() + if ctx is None: + kwargs = {} + if params is not None: + kwargs["params"] = params + if files is not None: + kwargs["files"] = files + if headers is not None: + kwargs["headers"] = headers + return rpc._orig_post(url, **kwargs) + effective_params = params if params is not None else {} + effective_headers = ( + headers + if headers is not None + else {"Content-Type": "application/json"} + ) + source = _detect_source(rpc) + request_body = _redact(effective_params) + record = CallRecord( + index=0, + method="POST", + url=str(url), + path=_url_path(url), + source=source, + transport=_pick_transport(effective_params), + http_status=None, + request_body=request_body, + response_body=None, + error=None, + ) + return_value: Any = None + try: + response = requests.post( + url, + data=json.dumps(effective_params) if effective_params else None, + headers=effective_headers, + timeout=_DEFAULT_TIMEOUT_POST, + files=files, + ) + except Exception as exc: # pragma: no cover - delegated to logger + record.error = f"transport error: {exc}" + try: + rpc.get_logger().error(f"Request ERROR: {exc}") + except Exception: + pass + ctx.append(record) + return None + + record.http_status = response.status_code + record.response_body, parse_error = _decode_response_body(response) + try: + rpc.get_logger().debug( + f"Request >>> : {response.request.body} " + f"{response.status_code} {response.text}" + ) + except Exception: + pass + + if response.status_code == 200: + if parse_error is not None: + record.error = f"json parse error: {parse_error}" + return_value = None + else: + return_value = record.response_body + else: + record.error = f"HTTP {response.status_code}: {response.text}" + try: + rpc.get_logger().error( + f"Request ERROR: ('Request ERROR:', {response.text!r})" + ) + except Exception: + pass + return_value = None + + ctx.append(record) + return return_value + + def _wrapped_get( + url: str, + params: Any = None, + headers: Optional[dict] = None, + ) -> Any: + ctx = _active_session.get() + if ctx is None: + kwargs = {} + if params is not None: + kwargs["params"] = params + if headers is not None: + kwargs["headers"] = headers + return rpc._orig_get(url, **kwargs) + effective_params = params if params is not None else {} + effective_headers = ( + headers + if headers is not None + else {"Content-Type": "application/json"} + ) + source = _detect_source(rpc) + request_body = _redact(effective_params) + record = CallRecord( + index=0, + method="GET", + url=str(url), + path=_url_path(url), + source=source, + transport="params", + http_status=None, + request_body=request_body, + response_body=None, + error=None, + ) + return_value: Any = None + try: + response = requests.get( + url, + params=effective_params, + headers=effective_headers, + timeout=_DEFAULT_TIMEOUT_GET, + ) + except Exception as exc: # pragma: no cover - delegated to logger + record.error = f"transport error: {exc}" + try: + rpc.get_logger().error(f"Request ERROR: {exc}") + except Exception: + pass + ctx.append(record) + return None + + record.http_status = response.status_code + record.response_body, parse_error = _decode_response_body(response) + try: + rpc.get_logger().debug( + f"Request >>> : {effective_params} " + f"{response.status_code} {response.text}" + ) + except Exception: + pass + + if response.status_code == 200: + if parse_error is not None: + record.error = f"json parse error: {parse_error}" + return_value = None + else: + return_value = record.response_body + + ctx.append(record) + return return_value + + rpc.post = _wrapped_post + rpc.get = _wrapped_get + rpc._debug_call_log_wrapped = True + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +_URL_PATH_RE = re.compile(r"https?://[^/]+(/.*)?$") +_SLUG_RE = re.compile(r"[^A-Za-z0-9._-]+") + + +def _slugify_action(action: str) -> str: + slug = _SLUG_RE.sub("_", str(action)).strip("_") + return slug or "action" + + +def _url_path(url: Any) -> str: + text = str(url or "") + match = _URL_PATH_RE.match(text) + if match and match.group(1): + return match.group(1) + if text.startswith("/"): + return text + return text + + +def _pick_transport(params: Any) -> str: + if isinstance(params, dict) and "data" in params: + return "data" + return "params" + + +def _detect_source(rpc: Any) -> str: + """Walk the call stack to find the outermost frame whose ``self`` is rpc.""" + try: + stack = inspect.stack() + except Exception: + return "" + candidate = "" + try: + for frame_info in stack: + frame = frame_info.frame + if frame.f_locals.get("self", None) is rpc: + candidate = frame_info.function + return candidate + finally: + del stack + + +def _redact(params: Any) -> Any: + """Return a copy of ``params`` with ``apiKey`` redacted.""" + try: + cloned = copy.deepcopy(params) + except Exception: + return params + _redact_in_place(cloned) + return cloned + + +def _redact_in_place(value: Any) -> None: + if isinstance(value, dict): + for key in list(value.keys()): + if isinstance(key, str) and key.lower() == "apikey": + value[key] = "" + else: + _redact_in_place(value[key]) + elif isinstance(value, list): + for item in value: + _redact_in_place(item) + + +def _decode_response_body(response: Any) -> tuple[Any, Optional[str]]: + """Best-effort response decoding used for both record + return value.""" + text = getattr(response, "text", "") + try: + return response.json(), None + except Exception as exc: + if text: + return {"raw_text": text}, str(exc) + return None, str(exc) + + +# --------------------------------------------------------------------------- +# Markdown rendering +# --------------------------------------------------------------------------- + + +def _render_markdown(ctx: CallLogContext) -> str: + title = f"# {ctx.action} Raw Call/Response Log" + parts: List[str] = [title, ""] + parts.append("## LIMS Calls") + parts.append("") + parts.append("| # | Method | Path | Source | HTTP |") + parts.append("|---|---|---|---|---|") + for record in ctx.calls: + anchor = _row_anchor(record) + http = ( + f"`{record.http_status}`" + if record.http_status is not None + else "`-`" + ) + parts.append( + f"| [{record.index}](#{anchor}) | `{record.method}` | " + f"`{record.path}` | `{record.source}` | {http} |" + ) + parts.append("") + + for record in ctx.calls: + parts.append(f"## {record.index} {record.method} {record.path}") + parts.append("") + parts.append(f"- Source: `{record.source}`") + parts.append(f"- Transport: `{record.transport}`") + if record.http_status is not None: + parts.append(f"- HTTP status: `{record.http_status}`") + else: + parts.append("- HTTP status: `-`") + if record.error: + parts.append(f"- Error: {record.error}") + parts.append("") + parts.append("### Request Body") + parts.append("") + parts.append("```json") + parts.append(_to_json_block(record.request_body)) + parts.append("```") + parts.append("") + parts.append("### Response Body") + parts.append("") + parts.append("```json") + parts.append(_to_json_block(record.response_body)) + parts.append("```") + parts.append("") + + return "\n".join(parts).rstrip() + "\n" + + +def _row_anchor(record: CallRecord) -> str: + """Build a GitHub-style anchor matching ``## N METHOD /path``.""" + raw = f"{record.index}-{record.method}-{record.path}" + raw = raw.lower() + raw = re.sub(r"[^a-z0-9]+", "-", raw) + return raw.strip("-") + + +def _to_json_block(value: Any) -> str: + try: + return json.dumps(value, ensure_ascii=False, indent=2, sort_keys=True) + except TypeError: + return json.dumps(str(value), ensure_ascii=False, indent=2) diff --git a/unilabos/devices/workstation/bioyond_studio/sirna_station/sirna_station.py b/unilabos/devices/workstation/bioyond_studio/sirna_station/sirna_station.py index b50a755c..d3c6e2f5 100644 --- a/unilabos/devices/workstation/bioyond_studio/sirna_station/sirna_station.py +++ b/unilabos/devices/workstation/bioyond_studio/sirna_station/sirna_station.py @@ -250,6 +250,8 @@ def fetch_workflow_list( class BioyondSirnaStation(BioyondWorkstation): """小核酸工作站最小运行时实现。""" + _DEBUG_LOG_DEFAULT_DIR = "temp_benyao/sirna/_logs" + def __init__( self, bioyond_config: Optional[Dict[str, Any]] = None, @@ -395,24 +397,25 @@ class BioyondSirnaStation(BioyondWorkstation): **kwargs: Any, ) -> Dict[str, Any]: """复位调度器、订单状态和库位,并按清理读回结果决定是否 take-out。""" - reset_order_id = self._kwarg_text(kwargs, "reset_order_id") - reset_location_id = self._kwarg_text(kwargs, "reset_location_id") - cleanup_order_code = self._kwarg_text(kwargs, "cleanup_order_code") - api_host = self._kwarg_text(kwargs, "api_host") - api_key = self._kwarg_text(kwargs, "api_key") - ready_signal = self._kwarg_text(kwargs, "ready_signal") or DEFAULT_READY_SIGNAL - del timeout_seconds, assignee_user_ids - self._update_runtime_api_config(api_host=api_host, api_key=api_key) - self._require_ready_signal(ready_signal) - rpc = self._require_hardware_interface_for_reset() - result = self._run_reset_operations( - rpc, - reset_operations=reset_operations, - reset_order_id=reset_order_id, - reset_location_id=reset_location_id, - cleanup_order_code=cleanup_order_code, - ) - return self._with_ready_signal(result) + with self._debug_call_session("reset"): + reset_order_id = self._kwarg_text(kwargs, "reset_order_id") + reset_location_id = self._kwarg_text(kwargs, "reset_location_id") + cleanup_order_code = self._kwarg_text(kwargs, "cleanup_order_code") + api_host = self._kwarg_text(kwargs, "api_host") + api_key = self._kwarg_text(kwargs, "api_key") + ready_signal = self._kwarg_text(kwargs, "ready_signal") or DEFAULT_READY_SIGNAL + del timeout_seconds, assignee_user_ids + self._update_runtime_api_config(api_host=api_host, api_key=api_key) + self._require_ready_signal(ready_signal) + rpc = self._require_hardware_interface_for_reset() + result = self._run_reset_operations( + rpc, + reset_operations=reset_operations, + reset_order_id=reset_order_id, + reset_location_id=reset_location_id, + cleanup_order_code=cleanup_order_code, + ) + return self._with_ready_signal(result) @action( always_free=True, @@ -491,113 +494,114 @@ class BioyondSirnaStation(BioyondWorkstation): - confirmation_message (str): 确认消息 - registration_result (Dict): 物料注册结果 """ - if isinstance(required_params, dict): - sample_throughput = required_params.get("sample_throughput") - else: - sample_throughput = required_params - sample_throughput = int(sample_throughput) + with self._debug_call_session("submit_experiment_1"): + if isinstance(required_params, dict): + sample_throughput = required_params.get("sample_throughput") + else: + sample_throughput = required_params + sample_throughput = int(sample_throughput) - if optional_params is None: - optional_params = {} - order_name = optional_params.get("order_name", "") - parameter_overrides = optional_params.get("parameter_overrides", "") - auto_register_materials = optional_params.get("auto_register_materials", True) - target_device = ( - self._kwarg_text(kwargs, "unilabos_device_id") - or self._kwarg_text(kwargs, "device_id") - or "bioyond_sirna_station" - ) + if optional_params is None: + optional_params = {} + order_name = optional_params.get("order_name", "") + parameter_overrides = optional_params.get("parameter_overrides", "") + auto_register_materials = optional_params.get("auto_register_materials", True) + target_device = ( + self._kwarg_text(kwargs, "unilabos_device_id") + or self._kwarg_text(kwargs, "device_id") + or "bioyond_sirna_station" + ) - del timeout_seconds, assignee_user_ids, kwargs - rpc = self._require_hardware_interface("create_order") + del timeout_seconds, assignee_user_ids, kwargs + rpc = self._require_hardware_interface("create_order") - # 自动解析实验1工作流(无需用户指定workflow_name) - workflow = self._resolve_experiment_1_workflow(rpc) + # 自动解析实验1工作流(无需用户指定workflow_name) + workflow = self._resolve_experiment_1_workflow(rpc) - step_data = rpc.workflow_step_query(workflow["sub_workflow_id"]) + step_data = rpc.workflow_step_query(workflow["sub_workflow_id"]) - # Parse parameter_overrides from text format - parsed_overrides = self._parse_parameter_overrides_text(parameter_overrides) + # Parse parameter_overrides from text format + parsed_overrides = self._parse_parameter_overrides_text(parameter_overrides) - param_values, parameter_template = self._build_param_values_from_step_data( - step_data, - parameter_overrides=parsed_overrides, - include_all_task_displayable=True, - ) - if not param_values: - raise RuntimeError("未从 LIMS 子工作流参数中提取到 create_order paramValues") + param_values, parameter_template = self._build_param_values_from_step_data( + step_data, + parameter_overrides=parsed_overrides, + include_all_task_displayable=True, + ) + if not param_values: + raise RuntimeError("未从 LIMS 子工作流参数中提取到 create_order paramValues") - resolved_order_code, resolved_order_name = self._build_bioyond_order_identity("", order_name) - order_payload = [ - { - "orderCode": resolved_order_code, - "orderName": resolved_order_name, - "borderNumber": int(sample_throughput), - "workFlowId": workflow["sub_workflow_id"], - "paramValues": param_values, - "extendProperties": "", + resolved_order_code, resolved_order_name = self._build_bioyond_order_identity("", order_name) + order_payload = [ + { + "orderCode": resolved_order_code, + "orderName": resolved_order_name, + "borderNumber": int(sample_throughput), + "workFlowId": workflow["sub_workflow_id"], + "paramValues": param_values, + "extendProperties": "", + } + ] + + logger.info(f"正在提交小核酸实验1: {resolved_order_name} ({resolved_order_code})") + raw_result = rpc.create_order(json.dumps(copy.deepcopy(order_payload), ensure_ascii=False)) + parsed_result = self._parse_lims_result(raw_result) + material_records = self._extract_create_order_materials(parsed_result) + suggested_locations = self._extract_suggested_locations(material_records) + order_ids = self._extract_created_order_ids(parsed_result) + self._last_submitted_order_ids = list(order_ids) + self._last_submitted_order_code = resolved_order_code + start_experiment_info = { + "order_ids": order_ids, + "order_code": resolved_order_code, + "order_name": resolved_order_name, + "workflow": workflow, } - ] + confirmation_data = self._format_create_order_confirmation( + order_code=resolved_order_code, + order_name=resolved_order_name, + workflow=workflow, + order_ids=order_ids, + material_records=material_records, + suggested_locations=suggested_locations, + ) - logger.info(f"正在提交小核酸实验1: {resolved_order_name} ({resolved_order_code})") - raw_result = rpc.create_order(json.dumps(copy.deepcopy(order_payload), ensure_ascii=False)) - parsed_result = self._parse_lims_result(raw_result) - material_records = self._extract_create_order_materials(parsed_result) - suggested_locations = self._extract_suggested_locations(material_records) - order_ids = self._extract_created_order_ids(parsed_result) - self._last_submitted_order_ids = list(order_ids) - self._last_submitted_order_code = resolved_order_code - start_experiment_info = { - "order_ids": order_ids, - "order_code": resolved_order_code, - "order_name": resolved_order_name, - "workflow": workflow, - } - confirmation_data = self._format_create_order_confirmation( - order_code=resolved_order_code, - order_name=resolved_order_name, - workflow=workflow, - order_ids=order_ids, - material_records=material_records, - suggested_locations=suggested_locations, - ) + registration_result = None + if auto_register_materials and material_records: + try: + registration_result = self._register_materials_to_tree(material_records) + logger.info(f"物料注册完成: {len(registration_result.get('registered', []))} 个物料已添加到资源树") + except Exception as e: + logger.error(f"物料注册失败: {e}") + registration_result = {"error": str(e)} - registration_result = None - if auto_register_materials and material_records: - try: - registration_result = self._register_materials_to_tree(material_records) - logger.info(f"物料注册完成: {len(registration_result.get('registered', []))} 个物料已添加到资源树") - except Exception as e: - logger.error(f"物料注册失败: {e}") - registration_result = {"error": str(e)} - - result = { - "success": self._create_result_success(parsed_result, order_ids, material_records), - "order_code": resolved_order_code, - "order_name": resolved_order_name, - "order_id": order_ids[0] if order_ids else "", - "order_ids": order_ids, - "target_device": target_device, - "workflow": workflow, - "sample_throughput": int(sample_throughput), - "payload": order_payload, - "parameter_template": parameter_template, - "create_order_result": parsed_result, - "materials": material_records, - "materials_by_type": confirmation_data.get("materials_by_type", {}), - "manual_load_tables": self._build_manual_load_tables( - confirmation_data.get("materials_by_type", {}) - ), - "manual_load_probe": self._build_manual_load_probe( - confirmation_data.get("materials_by_type", {}) - ), - "suggested_locations": suggested_locations, - "start_experiment": start_experiment_info, - "confirmation_message": confirmation_data.get("confirmation_message", ""), - "registration_result": registration_result, - } - result.update(result["manual_load_probe"]) - return result + result = { + "success": self._create_result_success(parsed_result, order_ids, material_records), + "order_code": resolved_order_code, + "order_name": resolved_order_name, + "order_id": order_ids[0] if order_ids else "", + "order_ids": order_ids, + "target_device": target_device, + "workflow": workflow, + "sample_throughput": int(sample_throughput), + "payload": order_payload, + "parameter_template": parameter_template, + "create_order_result": parsed_result, + "materials": material_records, + "materials_by_type": confirmation_data.get("materials_by_type", {}), + "manual_load_tables": self._build_manual_load_tables( + confirmation_data.get("materials_by_type", {}) + ), + "manual_load_probe": self._build_manual_load_probe( + confirmation_data.get("materials_by_type", {}) + ), + "suggested_locations": suggested_locations, + "start_experiment": start_experiment_info, + "confirmation_message": confirmation_data.get("confirmation_message", ""), + "registration_result": registration_result, + } + result.update(result["manual_load_probe"]) + return result def _parse_parameter_overrides_text(self, text: str) -> Dict[str, Any]: """Parse parameter overrides from text format. @@ -704,51 +708,52 @@ class BioyondSirnaStation(BioyondWorkstation): timeout_seconds: 超时时间(秒,框架参数)。 assignee_user_ids: 分配用户 ID 列表(框架参数)。 """ - resource = kwargs.get("resource") - coin_cell_code = kwargs.get("coin_cell_code") - mount_resource = kwargs.get("mount_resource") - order_ids = kwargs.get("order_ids") - submit_experiment_result = kwargs.get("submit_experiment_result") - api_host = self._kwarg_text(kwargs, "api_host") - api_key = self._kwarg_text(kwargs, "api_key") - ready_signal = self._kwarg_text(kwargs, "ready_signal") or DEFAULT_READY_SIGNAL - del timeout_seconds, assignee_user_ids - self._update_runtime_api_config(api_host=api_host, api_key=api_key) - self._require_ready_signal(ready_signal) + with self._debug_call_session("start_experiment"): + resource = kwargs.get("resource") + coin_cell_code = kwargs.get("coin_cell_code") + mount_resource = kwargs.get("mount_resource") + order_ids = kwargs.get("order_ids") + submit_experiment_result = kwargs.get("submit_experiment_result") + api_host = self._kwarg_text(kwargs, "api_host") + api_key = self._kwarg_text(kwargs, "api_key") + ready_signal = self._kwarg_text(kwargs, "ready_signal") or DEFAULT_READY_SIGNAL + del timeout_seconds, assignee_user_ids + self._update_runtime_api_config(api_host=api_host, api_key=api_key) + self._require_ready_signal(ready_signal) - category_arrays = { - "materials_loaded": ( - "物料", - self._as_manual_gate(materials_loaded), - [resource, coin_cell_code, mount_resource], - ), - } - gates: Dict[str, Dict[str, Any]] = {} - missing_labels: List[str] = [] - for gate_key, (label, ticked, arrays) in category_arrays.items(): - required = any(bool(arr) for arr in arrays) - gates[gate_key] = {"label": label, "required": required, "ticked": bool(ticked)} - if required and not ticked: - missing_labels.append(label) - if missing_labels: - raise RuntimeError( - f"以下分类装载尚未确认,无法启动调度: {', '.join(missing_labels)}" + category_arrays = { + "materials_loaded": ( + "物料", + self._as_manual_gate(materials_loaded), + [resource, coin_cell_code, mount_resource], + ), + } + gates: Dict[str, Dict[str, Any]] = {} + missing_labels: List[str] = [] + for gate_key, (label, ticked, arrays) in category_arrays.items(): + required = any(bool(arr) for arr in arrays) + gates[gate_key] = {"label": label, "required": required, "ticked": bool(ticked)} + if required and not ticked: + missing_labels.append(label) + if missing_labels: + raise RuntimeError( + f"以下分类装载尚未确认,无法启动调度: {', '.join(missing_labels)}" + ) + + start_info = self._resolve_start_experiment_info( + submit_experiment_result, order_id, order_ids ) - - start_info = self._resolve_start_experiment_info( - submit_experiment_result, order_id, order_ids - ) - rpc = self._require_hardware_interface("scheduler_start") - logger.info("正在启动小核酸调度器") - result = rpc.scheduler_start() - return self._with_ready_signal({ - "success": result == 1, - "return_info": result, - "scheduler_start_result": result, - "start_experiment": start_info, - "gates": gates, - "confirmation_message": "调度器启动成功" if result == 1 else "调度器启动失败,请检查 LIMS 状态", - }) + rpc = self._require_hardware_interface("scheduler_start") + logger.info("正在启动小核酸调度器") + result = rpc.scheduler_start() + return self._with_ready_signal({ + "success": result == 1, + "return_info": result, + "scheduler_start_result": result, + "start_experiment": start_info, + "gates": gates, + "confirmation_message": "调度器启动成功" if result == 1 else "调度器启动失败,请检查 LIMS 状态", + }) @action( always_free=True, @@ -806,63 +811,64 @@ class BioyondSirnaStation(BioyondWorkstation): Returns: ``{"success": bool, "orders": [...], "order_id": str, "order_ids": [...], "query": {...}}``。 """ - del timeout_seconds, assignee_user_ids, kwargs - try: - normalized_max = int(max_results) - except (TypeError, ValueError): - normalized_max = 20 - if normalized_max <= 0: - normalized_max = 20 - rpc = self._require_hardware_interface("order_query") - query_payload = { - "timeType": "", - "beginTime": None, - "endTime": None, - "status": str(status or ""), - "filter": str(filter_text or ""), - "skipCount": 0, - "pageCount": normalized_max, - "sorting": "creationTime desc", - } - logger.info( - "正在查询 Bioyond LIMS 订单列表 filter=%r status=%r latest_only=%s", - filter_text, - status, - latest_only, - ) - raw_result = rpc.order_query(json.dumps(query_payload, ensure_ascii=False)) - items = self._order_items(raw_result) + with self._debug_call_session("get_order_list"): + del timeout_seconds, assignee_user_ids, kwargs + try: + normalized_max = int(max_results) + except (TypeError, ValueError): + normalized_max = 20 + if normalized_max <= 0: + normalized_max = 20 + rpc = self._require_hardware_interface("order_query") + query_payload = { + "timeType": "", + "beginTime": None, + "endTime": None, + "status": str(status or ""), + "filter": str(filter_text or ""), + "skipCount": 0, + "pageCount": normalized_max, + "sorting": "creationTime desc", + } + logger.info( + "正在查询 Bioyond LIMS 订单列表 filter=%r status=%r latest_only=%s", + filter_text, + status, + latest_only, + ) + raw_result = rpc.order_query(json.dumps(query_payload, ensure_ascii=False)) + items = self._order_items(raw_result) - orders: List[Dict[str, Any]] = [] - for item in items: - order_id = str(item.get("id") or "") - if not order_id: - continue - orders.append({ - "order_id": order_id, - "order_code": str(item.get("orderCode") or ""), - "order_name": str(item.get("name") or item.get("orderName") or ""), - "status": str(item.get("status") or item.get("statusName") or ""), - "created_at": str(item.get("creationTime") or item.get("createTime") or ""), - "raw": item, - }) + orders: List[Dict[str, Any]] = [] + for item in items: + order_id = str(item.get("id") or "") + if not order_id: + continue + orders.append({ + "order_id": order_id, + "order_code": str(item.get("orderCode") or ""), + "order_name": str(item.get("name") or item.get("orderName") or ""), + "status": str(item.get("status") or item.get("statusName") or ""), + "created_at": str(item.get("creationTime") or item.get("createTime") or ""), + "raw": item, + }) - order_ids = [order["order_id"] for order in orders] - if latest_only and orders: - chosen = orders[0] - order_id_value = chosen["order_id"] - elif len(order_ids) == 1: - order_id_value = order_ids[0] - else: - order_id_value = "" + order_ids = [order["order_id"] for order in orders] + if latest_only and orders: + chosen = orders[0] + order_id_value = chosen["order_id"] + elif len(order_ids) == 1: + order_id_value = order_ids[0] + else: + order_id_value = "" - return { - "success": bool(orders), - "orders": orders, - "order_id": order_id_value, - "order_ids": order_ids, - "query": query_payload, - } + return { + "success": bool(orders), + "orders": orders, + "order_id": order_id_value, + "order_ids": order_ids, + "query": query_payload, + } def _require_hardware_interface(self, method_name: str) -> Any: rpc = getattr(self, "hardware_interface", None) @@ -898,7 +904,8 @@ class BioyondSirnaStation(BioyondWorkstation): raise RuntimeError("\n".join(lines)) from unilabos.devices.workstation.bioyond_studio.bioyond_rpc import BioyondV1RPC - self.hardware_interface = BioyondV1RPC(self.bioyond_config) + rpc = BioyondV1RPC(self.bioyond_config) + self._set_hardware_interface(rpc) return self.hardware_interface def _has_required_api_config(self, config: Dict[str, Any]) -> bool: diff --git a/unilabos/devices/workstation/bioyond_studio/station.py b/unilabos/devices/workstation/bioyond_studio/station.py index 327d8195..d9e9f166 100644 --- a/unilabos/devices/workstation/bioyond_studio/station.py +++ b/unilabos/devices/workstation/bioyond_studio/station.py @@ -7,6 +7,7 @@ Bioyond Workstation Implementation import time import traceback import threading +from contextlib import contextmanager from datetime import datetime from typing import Dict, Any, List, Optional, Union import json @@ -14,6 +15,7 @@ from pathlib import Path from unilabos.devices.workstation.workstation_base import WorkstationBase, ResourceSynchronizer from unilabos.devices.workstation.bioyond_studio.bioyond_rpc import BioyondV1RPC +from unilabos.devices.workstation.bioyond_studio import debug_call_log from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot from unilabos.resources.warehouse import WareHouse from unilabos.utils.log import logger @@ -678,6 +680,70 @@ class BioyondWorkstation(WorkstationBase): 集成Bioyond物料管理的工作站实现 """ + # 子类(如 sirna / peptide)覆写以指定默认 raw-call 日志目录。 + # 路径相对仓库根;为 None 时若 debug_log=True 仍会写入临时位置。 + _DEBUG_LOG_DEFAULT_DIR: Optional[str] = None + + def _create_bioyond_rpc(self, config: Dict[str, Any]) -> BioyondV1RPC: + """创建 Bioyond RPC 客户端并应用调试包装。 + + 所有创建 ``BioyondV1RPC`` 的路径(饿汉初始化、Sirna 延迟初始化、 + 以及未来的前端重新配置路径)都应通过该 helper, + 以确保 debug_log 包装与命名/日志策略保持一致。 + """ + rpc = BioyondV1RPC(config) + debug_call_log.wrap_rpc_http(rpc) + return rpc + + def _set_hardware_interface(self, rpc: BioyondV1RPC) -> BioyondV1RPC: + """将已构造的 RPC 客户端设置到 ``self.hardware_interface``,并应用调试包装。""" + debug_call_log.wrap_rpc_http(rpc) + self.hardware_interface = rpc + return rpc + + def _debug_log_resolved_dir(self) -> Path: + """解析 ``debug_log_dir`` 为绝对路径。""" + configured = (getattr(self, "bioyond_config", {}) or {}).get("debug_log_dir") + default_dir = getattr(self, "_DEBUG_LOG_DEFAULT_DIR", None) + candidate = configured or default_dir or "temp_benyao/_logs/bioyond_debug" + path = Path(candidate) + if not path.is_absolute(): + repo_root = Path(__file__).resolve().parents[4] + path = repo_root / path + return path + + def _ensure_debug_log_state(self) -> None: + """从 ``self.bioyond_config`` 派生 ``_debug_log_enabled`` / ``_debug_log_dir``。 + + 每次进入 ``_debug_call_session`` 时都重新解析,以兼容前端在运行时 + 修改 ``bioyond_config['debug_log']`` 或目录的场景;同时也容忍 + 子类(如 Sirna 延迟初始化)在 ``__init__`` 早期未触发本方法。 + """ + cfg = getattr(self, "bioyond_config", {}) or {} + self._debug_log_enabled = bool(cfg.get("debug_log")) + self._debug_log_dir = self._debug_log_resolved_dir() + + @contextmanager + def _debug_call_session(self, action_name: str): + """在 action 体外加一层 debug 会话上下文。 + + - ``debug_log`` 关闭时是空上下文,开销为 0。 + - ``debug_log`` 开启时进入 :func:`debug_call_log.session`,所有 + 已被 ``wrap_rpc_http`` 包装过的 RPC 客户端都会捕获本次 action + 产生的 HTTP 调用并写入 Markdown 文件。 + + 子类(如 ``end_experiment``、``manual_unload`` 等)可以直接在 + action 体里以 ``with self._debug_call_session("action_name"):`` 包裹。 + """ + cfg = getattr(self, "bioyond_config", {}) or {} + enabled = bool(cfg.get("debug_log")) + if not enabled: + yield None + return + out_dir = BioyondWorkstation._debug_log_resolved_dir(self) + with debug_call_log.session(action_name, out_dir) as ctx: + yield ctx + def _publish_task_status( self, task_id: str, @@ -862,7 +928,7 @@ class BioyondWorkstation(WorkstationBase): self.bioyond_config = {} print("警告: 未提供 bioyond_config,请确保在 JSON 配置文件中提供完整配置") - self.hardware_interface = BioyondV1RPC(self.bioyond_config) + self.hardware_interface = self._create_bioyond_rpc(self.bioyond_config) def resource_tree_add(self, resources: List[ResourcePLR]) -> None: """添加资源到资源树并更新ROS节点