feat: PEP add Bioyond peptide station runtime

- Add the Bioyond peptide station package with the station-facing Day2 submission flow inlined into BioyondPeptideStation.
- Add LIMS sample Excel upload, Day2/Day3 order creation helpers, scheduler/reset controls, and manual-confirm start/reset actions.
- Register peptide material PLR resource classes and default peptide material type mappings for runtime resource synchronization.
- Add the Bioyond peptide deck definition and warehouse axis/key-axis metadata needed for peptide layout conversion.
- Update shared Bioyond warehouse/resource conversion helpers so peptide deck coordinates round-trip correctly.
- Include shared Bioyond raw-call debug logging support used by station actions, with a generic local debug output default.
- Register the peptide deck in PLR additional resources for deserialization/import visibility.
- Exclude private temp_benyao docs, HAR/API inputs, live diagnostics, and siRNA-only station/material files from this handoff commit.
This commit is contained in:
yxz321
2026-05-13 19:43:57 +08:00
parent 927c7e95f5
commit 26155b8343
10 changed files with 4453 additions and 17 deletions

View File

@@ -0,0 +1,459 @@
"""Per-action raw call/response log for Bioyond stations.
When a debug session is active, ``wrap_rpc_http`` replaces a ``BioyondV1RPC``
instance's ``post`` / ``get`` methods with closures that perform the HTTP
transport themselves, capture the request/response details, and append a record
to the active session before returning exactly what ``BaseRequest`` would have
returned. Outside of an active session the wrapped method delegates to the
original (unwrapped) implementation, leaving non-debug behavior intact.
The session writes a Markdown file under ``out_dir`` mirroring the format of
``bioyond_debug_records/2026-04-30_160316_day3_samplefile_only_raw_calls.md``
minus the "Raw Payload Argument" section.
This module has no dependency on ``BioyondV1RPC`` itself; the only contract is
that the wrapped instance descends from ``BaseRequest`` (i.e. has a logger
returned by ``self.get_logger()``).
"""
from __future__ import annotations
import contextvars
import copy
import inspect
import json
import re
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Iterator, List, Optional
import requests
__all__ = [
"CallRecord",
"CallLogContext",
"session",
"wrap_rpc_http",
"active_session",
]
_DEFAULT_TIMEOUT_GET = 30
_DEFAULT_TIMEOUT_POST = 120
@dataclass
class CallRecord:
"""One captured HTTP call inside a debug session."""
index: int
method: str
url: str
path: str
source: str
transport: str
http_status: Optional[int]
request_body: Any
response_body: Any
error: Optional[str] = None
@dataclass
class CallLogContext:
"""State for a single ``session()`` block.
A session lazily creates its file on the first appended record. Actions
that abort before any RPC produce no file.
"""
action: str
out_dir: Path
started_at: datetime
calls: List[CallRecord] = field(default_factory=list)
file_path: Optional[Path] = None
def append(self, record: CallRecord) -> None:
record.index = len(self.calls) + 1
self.calls.append(record)
self._write_file()
# -- file I/O -------------------------------------------------------------
def _resolve_file_path(self) -> Path:
if self.file_path is not None:
return self.file_path
timestamp = self.started_at.strftime("%Y-%m-%d_%H%M%S")
slug = _slugify_action(self.action)
candidate = self.out_dir / f"{timestamp}_{slug}_raw_calls.md"
suffix = 2
while candidate.exists():
candidate = (
self.out_dir
/ f"{timestamp}_{slug}_raw_calls_{suffix:02d}.md"
)
suffix += 1
self.file_path = candidate
return self.file_path
def _write_file(self) -> None:
path = self._resolve_file_path()
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(_render_markdown(self), encoding="utf-8")
_active_session: contextvars.ContextVar[Optional[CallLogContext]] = (
contextvars.ContextVar("_active_session", default=None)
)
def active_session() -> Optional[CallLogContext]:
"""Return the currently active :class:`CallLogContext`, if any."""
return _active_session.get()
@contextmanager
def session(action: str, out_dir: Path) -> Iterator[CallLogContext]:
"""Open a per-action debug session.
On entry, sets the module-level ``_active_session`` ContextVar so any
``wrap_rpc_http``'d clients on the same thread/task record their calls.
On exit, the previous active session (if any) is restored.
"""
ctx = CallLogContext(
action=str(action),
out_dir=Path(out_dir),
started_at=datetime.now(),
)
token = _active_session.set(ctx)
try:
yield ctx
finally:
_active_session.reset(token)
def wrap_rpc_http(rpc: Any) -> None:
"""Idempotently wrap ``rpc.post`` / ``rpc.get``.
When a session is active (``_active_session.get() is not None``), the
wrapped methods perform the HTTP call themselves with ``requests`` and
record the call before returning the same value ``BaseRequest`` would have
returned. When no session is active, the wrapped methods delegate to the
original implementation, preserving stock ``BaseRequest`` behavior.
Calling this twice on the same instance is a no-op. The wrapper does not
alter ``rpc.form_post`` (no Sirna action calls it as of plan 3).
"""
if rpc is None:
return
if getattr(rpc, "_debug_call_log_wrapped", False):
return
rpc._orig_post = rpc.post
rpc._orig_get = rpc.get
def _wrapped_post(
url: str,
params: Any = None,
files: Any = None,
headers: Optional[dict] = None,
) -> Any:
ctx = _active_session.get()
if ctx is None:
kwargs = {}
if params is not None:
kwargs["params"] = params
if files is not None:
kwargs["files"] = files
if headers is not None:
kwargs["headers"] = headers
return rpc._orig_post(url, **kwargs)
effective_params = params if params is not None else {}
effective_headers = (
headers
if headers is not None
else {"Content-Type": "application/json"}
)
source = _detect_source(rpc)
request_body = _redact(effective_params)
record = CallRecord(
index=0,
method="POST",
url=str(url),
path=_url_path(url),
source=source,
transport=_pick_transport(effective_params),
http_status=None,
request_body=request_body,
response_body=None,
error=None,
)
return_value: Any = None
try:
response = requests.post(
url,
data=json.dumps(effective_params) if effective_params else None,
headers=effective_headers,
timeout=_DEFAULT_TIMEOUT_POST,
files=files,
)
except Exception as exc: # pragma: no cover - delegated to logger
record.error = f"transport error: {exc}"
try:
rpc.get_logger().error(f"Request ERROR: {exc}")
except Exception:
pass
ctx.append(record)
return None
record.http_status = response.status_code
record.response_body, parse_error = _decode_response_body(response)
try:
rpc.get_logger().debug(
f"Request >>> : {response.request.body} "
f"{response.status_code} {response.text}"
)
except Exception:
pass
if response.status_code == 200:
if parse_error is not None:
record.error = f"json parse error: {parse_error}"
return_value = None
else:
return_value = record.response_body
else:
record.error = f"HTTP {response.status_code}: {response.text}"
try:
rpc.get_logger().error(
f"Request ERROR: ('Request ERROR:', {response.text!r})"
)
except Exception:
pass
return_value = None
ctx.append(record)
return return_value
def _wrapped_get(
url: str,
params: Any = None,
headers: Optional[dict] = None,
) -> Any:
ctx = _active_session.get()
if ctx is None:
kwargs = {}
if params is not None:
kwargs["params"] = params
if headers is not None:
kwargs["headers"] = headers
return rpc._orig_get(url, **kwargs)
effective_params = params if params is not None else {}
effective_headers = (
headers
if headers is not None
else {"Content-Type": "application/json"}
)
source = _detect_source(rpc)
request_body = _redact(effective_params)
record = CallRecord(
index=0,
method="GET",
url=str(url),
path=_url_path(url),
source=source,
transport="params",
http_status=None,
request_body=request_body,
response_body=None,
error=None,
)
return_value: Any = None
try:
response = requests.get(
url,
params=effective_params,
headers=effective_headers,
timeout=_DEFAULT_TIMEOUT_GET,
)
except Exception as exc: # pragma: no cover - delegated to logger
record.error = f"transport error: {exc}"
try:
rpc.get_logger().error(f"Request ERROR: {exc}")
except Exception:
pass
ctx.append(record)
return None
record.http_status = response.status_code
record.response_body, parse_error = _decode_response_body(response)
try:
rpc.get_logger().debug(
f"Request >>> : {effective_params} "
f"{response.status_code} {response.text}"
)
except Exception:
pass
if response.status_code == 200:
if parse_error is not None:
record.error = f"json parse error: {parse_error}"
return_value = None
else:
return_value = record.response_body
ctx.append(record)
return return_value
rpc.post = _wrapped_post
rpc.get = _wrapped_get
rpc._debug_call_log_wrapped = True
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
_URL_PATH_RE = re.compile(r"https?://[^/]+(/.*)?$")
_SLUG_RE = re.compile(r"[^A-Za-z0-9._-]+")
def _slugify_action(action: str) -> str:
slug = _SLUG_RE.sub("_", str(action)).strip("_")
return slug or "action"
def _url_path(url: Any) -> str:
text = str(url or "")
match = _URL_PATH_RE.match(text)
if match and match.group(1):
return match.group(1)
if text.startswith("/"):
return text
return text
def _pick_transport(params: Any) -> str:
if isinstance(params, dict) and "data" in params:
return "data"
return "params"
def _detect_source(rpc: Any) -> str:
"""Walk the call stack to find the outermost frame whose ``self`` is rpc."""
try:
stack = inspect.stack()
except Exception:
return ""
candidate = ""
try:
for frame_info in stack:
frame = frame_info.frame
if frame.f_locals.get("self", None) is rpc:
candidate = frame_info.function
return candidate
finally:
del stack
def _redact(params: Any) -> Any:
"""Return a copy of ``params`` with ``apiKey`` redacted."""
try:
cloned = copy.deepcopy(params)
except Exception:
return params
_redact_in_place(cloned)
return cloned
def _redact_in_place(value: Any) -> None:
if isinstance(value, dict):
for key in list(value.keys()):
if isinstance(key, str) and key.lower() == "apikey":
value[key] = "<redacted>"
else:
_redact_in_place(value[key])
elif isinstance(value, list):
for item in value:
_redact_in_place(item)
def _decode_response_body(response: Any) -> tuple[Any, Optional[str]]:
"""Best-effort response decoding used for both record + return value."""
text = getattr(response, "text", "")
try:
return response.json(), None
except Exception as exc:
if text:
return {"raw_text": text}, str(exc)
return None, str(exc)
# ---------------------------------------------------------------------------
# Markdown rendering
# ---------------------------------------------------------------------------
def _render_markdown(ctx: CallLogContext) -> str:
title = f"# {ctx.action} Raw Call/Response Log"
parts: List[str] = [title, ""]
parts.append("## LIMS Calls")
parts.append("")
parts.append("| # | Method | Path | Source | HTTP |")
parts.append("|---|---|---|---|---|")
for record in ctx.calls:
anchor = _row_anchor(record)
http = (
f"`{record.http_status}`"
if record.http_status is not None
else "`-`"
)
parts.append(
f"| [{record.index}](#{anchor}) | `{record.method}` | "
f"`{record.path}` | `{record.source}` | {http} |"
)
parts.append("")
for record in ctx.calls:
parts.append(f"## {record.index} {record.method} {record.path}")
parts.append("")
parts.append(f"- Source: `{record.source}`")
parts.append(f"- Transport: `{record.transport}`")
if record.http_status is not None:
parts.append(f"- HTTP status: `{record.http_status}`")
else:
parts.append("- HTTP status: `-`")
if record.error:
parts.append(f"- Error: {record.error}")
parts.append("")
parts.append("### Request Body")
parts.append("")
parts.append("```json")
parts.append(_to_json_block(record.request_body))
parts.append("```")
parts.append("")
parts.append("### Response Body")
parts.append("")
parts.append("```json")
parts.append(_to_json_block(record.response_body))
parts.append("```")
parts.append("")
return "\n".join(parts).rstrip() + "\n"
def _row_anchor(record: CallRecord) -> str:
"""Build a GitHub-style anchor matching ``## N METHOD /path``."""
raw = f"{record.index}-{record.method}-{record.path}"
raw = raw.lower()
raw = re.sub(r"[^a-z0-9]+", "-", raw)
return raw.strip("-")
def _to_json_block(value: Any) -> str:
try:
return json.dumps(value, ensure_ascii=False, indent=2, sort_keys=True)
except TypeError:
return json.dumps(str(value), ensure_ascii=False, indent=2)

View File

@@ -0,0 +1,3 @@
from .peptide_station import BioyondPeptideStation, fetch_workflow_list, load_peptide_config
__all__ = ["BioyondPeptideStation", "fetch_workflow_list", "load_peptide_config"]

File diff suppressed because it is too large Load Diff

View File

@@ -7,6 +7,7 @@ Bioyond Workstation Implementation
import time
import traceback
import threading
from contextlib import contextmanager
from datetime import datetime
from typing import Dict, Any, List, Optional, Union
import json
@@ -14,6 +15,7 @@ from pathlib import Path
from unilabos.devices.workstation.workstation_base import WorkstationBase, ResourceSynchronizer
from unilabos.devices.workstation.bioyond_studio.bioyond_rpc import BioyondV1RPC
from unilabos.devices.workstation.bioyond_studio import debug_call_log
from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
from unilabos.resources.warehouse import WareHouse
from unilabos.utils.log import logger
@@ -174,6 +176,8 @@ class BioyondResourceSynchronizer(ResourceSynchronizer):
logger.warning("从Bioyond获取的物料数据为空")
return False
self._update_material_cache_from_stock(all_bioyond_data)
# 转换为UniLab格式
unilab_resources = resource_bioyond_to_plr(
all_bioyond_data,
@@ -187,6 +191,29 @@ class BioyondResourceSynchronizer(ResourceSynchronizer):
logger.error(f"从Bioyond同步物料数据失败: {e}")
return False
def _update_material_cache_from_stock(self, materials: List[Dict[str, Any]]) -> None:
"""用本次库存查询结果同步 RPC 的 name -> material id 缓存。"""
material_cache = getattr(self.bioyond_api_client, "material_cache", None)
if not isinstance(material_cache, dict):
return
before_count = len(material_cache)
for material in materials:
material_name = material.get("name")
material_id = material.get("id")
if material_name and material_id:
material_cache[material_name] = material_id
for detail_material in material.get("detail", []) or []:
detail_name = detail_material.get("name")
detail_id = detail_material.get("detailMaterialId") or detail_material.get("id")
if detail_name and detail_id:
material_cache[detail_name] = detail_id
logger.debug(
f"已用Bioyond库存同步物料缓存: {before_count} -> {len(material_cache)}"
)
def sync_to_external(self, resource: Any) -> bool:
"""将本地物料数据变更同步到Bioyond系统"""
try:
@@ -678,6 +705,70 @@ class BioyondWorkstation(WorkstationBase):
集成Bioyond物料管理的工作站实现
"""
# 子类(如 sirna / peptide覆写以指定默认 raw-call 日志目录。
# 路径相对仓库根;为 None 时若 debug_log=True 仍会写入临时位置。
_DEBUG_LOG_DEFAULT_DIR: Optional[str] = None
def _create_bioyond_rpc(self, config: Dict[str, Any]) -> BioyondV1RPC:
"""创建 Bioyond RPC 客户端并应用调试包装。
所有创建 ``BioyondV1RPC`` 的路径饿汉初始化、Sirna 延迟初始化、
以及未来的前端重新配置路径)都应通过该 helper
以确保 debug_log 包装与命名/日志策略保持一致。
"""
rpc = BioyondV1RPC(config)
debug_call_log.wrap_rpc_http(rpc)
return rpc
def _set_hardware_interface(self, rpc: BioyondV1RPC) -> BioyondV1RPC:
"""将已构造的 RPC 客户端设置到 ``self.hardware_interface``,并应用调试包装。"""
debug_call_log.wrap_rpc_http(rpc)
self.hardware_interface = rpc
return rpc
def _debug_log_resolved_dir(self) -> Path:
"""解析 ``debug_log_dir`` 为绝对路径。"""
configured = (getattr(self, "bioyond_config", {}) or {}).get("debug_log_dir")
default_dir = getattr(self, "_DEBUG_LOG_DEFAULT_DIR", None)
candidate = configured or default_dir or "bioyond_debug_records"
path = Path(candidate)
if not path.is_absolute():
repo_root = Path(__file__).resolve().parents[4]
path = repo_root / path
return path
def _ensure_debug_log_state(self) -> None:
"""从 ``self.bioyond_config`` 派生 ``_debug_log_enabled`` / ``_debug_log_dir``。
每次进入 ``_debug_call_session`` 时都重新解析,以兼容前端在运行时
修改 ``bioyond_config['debug_log']`` 或目录的场景;同时也容忍
子类(如 Sirna 延迟初始化)在 ``__init__`` 早期未触发本方法。
"""
cfg = getattr(self, "bioyond_config", {}) or {}
self._debug_log_enabled = bool(cfg.get("debug_log"))
self._debug_log_dir = self._debug_log_resolved_dir()
@contextmanager
def _debug_call_session(self, action_name: str):
"""在 action 体外加一层 debug 会话上下文。
- ``debug_log`` 关闭时是空上下文,开销为 0。
- ``debug_log`` 开启时进入 :func:`debug_call_log.session`,所有
已被 ``wrap_rpc_http`` 包装过的 RPC 客户端都会捕获本次 action
产生的 HTTP 调用并写入 Markdown 文件。
子类(如 ``end_experiment``、``manual_unload`` 等)可以直接在
action 体里以 ``with self._debug_call_session("action_name"):`` 包裹。
"""
cfg = getattr(self, "bioyond_config", {}) or {}
enabled = bool(cfg.get("debug_log"))
if not enabled:
yield None
return
out_dir = BioyondWorkstation._debug_log_resolved_dir(self)
with debug_call_log.session(action_name, out_dir) as ctx:
yield ctx
def _publish_task_status(
self,
task_id: str,
@@ -862,7 +953,7 @@ class BioyondWorkstation(WorkstationBase):
self.bioyond_config = {}
print("警告: 未提供 bioyond_config请确保在 JSON 配置文件中提供完整配置")
self.hardware_interface = BioyondV1RPC(self.bioyond_config)
self.hardware_interface = self._create_bioyond_rpc(self.bioyond_config)
def resource_tree_add(self, resources: List[ResourcePLR]) -> None:
"""添加资源到资源树并更新ROS节点
@@ -1338,11 +1429,7 @@ class BioyondWorkstation(WorkstationBase):
if self.hardware_interface:
self.hardware_interface.scheduler_reset()
# 新物料缓存
if self.hardware_interface:
self.hardware_interface.refresh_material_cache()
# 重新同步资源
# 重新同步资源,并用同一次库存查询结果更新物料缓存
if self.resource_synchronizer:
self.resource_synchronizer.sync_from_external()