Compare commits

..

1 Commits

Author SHA1 Message Date
dependabot[bot]
c61e410362 ci(deps): bump conda-incubator/setup-miniconda from 3 to 4
Bumps [conda-incubator/setup-miniconda](https://github.com/conda-incubator/setup-miniconda) from 3 to 4.
- [Release notes](https://github.com/conda-incubator/setup-miniconda/releases)
- [Changelog](https://github.com/conda-incubator/setup-miniconda/blob/main/CHANGELOG.md)
- [Commits](https://github.com/conda-incubator/setup-miniconda/compare/v3...v4)

---
updated-dependencies:
- dependency-name: conda-incubator/setup-miniconda
  dependency-version: '4'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-05-08 17:24:00 +00:00
14 changed files with 22 additions and 5563 deletions

View File

@@ -25,7 +25,7 @@ jobs:
fetch-depth: 0
- name: Setup Miniforge
uses: conda-incubator/setup-miniconda@v3
uses: conda-incubator/setup-miniconda@v4
with:
miniforge-version: latest
use-mamba: true

View File

@@ -86,7 +86,7 @@ jobs:
- name: Setup Miniforge (with mamba)
if: steps.should_build.outputs.should_build == 'true'
uses: conda-incubator/setup-miniconda@v3
uses: conda-incubator/setup-miniconda@v4
with:
miniforge-version: latest
use-mamba: true

View File

@@ -51,7 +51,7 @@ jobs:
fetch-depth: 0
- name: Setup Miniforge (with mamba)
uses: conda-incubator/setup-miniconda@v3
uses: conda-incubator/setup-miniconda@v4
with:
miniforge-version: latest
use-mamba: true

View File

@@ -101,7 +101,7 @@ jobs:
- name: Setup Miniforge
if: steps.should_build.outputs.should_build == 'true'
uses: conda-incubator/setup-miniconda@v3
uses: conda-incubator/setup-miniconda@v4
with:
miniforge-version: latest
use-mamba: true

View File

@@ -94,7 +94,7 @@ jobs:
- name: Setup Miniforge
if: steps.should_build.outputs.should_build == 'true'
uses: conda-incubator/setup-miniconda@v3
uses: conda-incubator/setup-miniconda@v4
with:
miniforge-version: latest
use-mamba: true

View File

@@ -1,459 +0,0 @@
"""Per-action raw call/response log for Bioyond stations.
When a debug session is active, ``wrap_rpc_http`` replaces a ``BioyondV1RPC``
instance's ``post`` / ``get`` methods with closures that perform the HTTP
transport themselves, capture the request/response details, and append a record
to the active session before returning exactly what ``BaseRequest`` would have
returned. Outside of an active session the wrapped method delegates to the
original (unwrapped) implementation, leaving non-debug behavior intact.
The session writes a Markdown file under ``out_dir`` mirroring the format of
``temp_benyao/peptide/_logs/2026-04-30_160316_day3_samplefile_only_raw_calls.md``
minus the "Raw Payload Argument" section.
This module has no dependency on ``BioyondV1RPC`` itself; the only contract is
that the wrapped instance descends from ``BaseRequest`` (i.e. has a logger
returned by ``self.get_logger()``).
"""
from __future__ import annotations
import contextvars
import copy
import inspect
import json
import re
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Iterator, List, Optional
import requests
__all__ = [
"CallRecord",
"CallLogContext",
"session",
"wrap_rpc_http",
"active_session",
]
_DEFAULT_TIMEOUT_GET = 30
_DEFAULT_TIMEOUT_POST = 120
@dataclass
class CallRecord:
"""One captured HTTP call inside a debug session."""
index: int
method: str
url: str
path: str
source: str
transport: str
http_status: Optional[int]
request_body: Any
response_body: Any
error: Optional[str] = None
@dataclass
class CallLogContext:
"""State for a single ``session()`` block.
A session lazily creates its file on the first appended record. Actions
that abort before any RPC produce no file.
"""
action: str
out_dir: Path
started_at: datetime
calls: List[CallRecord] = field(default_factory=list)
file_path: Optional[Path] = None
def append(self, record: CallRecord) -> None:
record.index = len(self.calls) + 1
self.calls.append(record)
self._write_file()
# -- file I/O -------------------------------------------------------------
def _resolve_file_path(self) -> Path:
if self.file_path is not None:
return self.file_path
timestamp = self.started_at.strftime("%Y-%m-%d_%H%M%S")
slug = _slugify_action(self.action)
candidate = self.out_dir / f"{timestamp}_{slug}_raw_calls.md"
suffix = 2
while candidate.exists():
candidate = (
self.out_dir
/ f"{timestamp}_{slug}_raw_calls_{suffix:02d}.md"
)
suffix += 1
self.file_path = candidate
return self.file_path
def _write_file(self) -> None:
path = self._resolve_file_path()
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(_render_markdown(self), encoding="utf-8")
_active_session: contextvars.ContextVar[Optional[CallLogContext]] = (
contextvars.ContextVar("_active_session", default=None)
)
def active_session() -> Optional[CallLogContext]:
"""Return the currently active :class:`CallLogContext`, if any."""
return _active_session.get()
@contextmanager
def session(action: str, out_dir: Path) -> Iterator[CallLogContext]:
"""Open a per-action debug session.
On entry, sets the module-level ``_active_session`` ContextVar so any
``wrap_rpc_http``'d clients on the same thread/task record their calls.
On exit, the previous active session (if any) is restored.
"""
ctx = CallLogContext(
action=str(action),
out_dir=Path(out_dir),
started_at=datetime.now(),
)
token = _active_session.set(ctx)
try:
yield ctx
finally:
_active_session.reset(token)
def wrap_rpc_http(rpc: Any) -> None:
"""Idempotently wrap ``rpc.post`` / ``rpc.get``.
When a session is active (``_active_session.get() is not None``), the
wrapped methods perform the HTTP call themselves with ``requests`` and
record the call before returning the same value ``BaseRequest`` would have
returned. When no session is active, the wrapped methods delegate to the
original implementation, preserving stock ``BaseRequest`` behavior.
Calling this twice on the same instance is a no-op. The wrapper does not
alter ``rpc.form_post`` (no Sirna action calls it as of plan 3).
"""
if rpc is None:
return
if getattr(rpc, "_debug_call_log_wrapped", False):
return
rpc._orig_post = rpc.post
rpc._orig_get = rpc.get
def _wrapped_post(
url: str,
params: Any = None,
files: Any = None,
headers: Optional[dict] = None,
) -> Any:
ctx = _active_session.get()
if ctx is None:
kwargs = {}
if params is not None:
kwargs["params"] = params
if files is not None:
kwargs["files"] = files
if headers is not None:
kwargs["headers"] = headers
return rpc._orig_post(url, **kwargs)
effective_params = params if params is not None else {}
effective_headers = (
headers
if headers is not None
else {"Content-Type": "application/json"}
)
source = _detect_source(rpc)
request_body = _redact(effective_params)
record = CallRecord(
index=0,
method="POST",
url=str(url),
path=_url_path(url),
source=source,
transport=_pick_transport(effective_params),
http_status=None,
request_body=request_body,
response_body=None,
error=None,
)
return_value: Any = None
try:
response = requests.post(
url,
data=json.dumps(effective_params) if effective_params else None,
headers=effective_headers,
timeout=_DEFAULT_TIMEOUT_POST,
files=files,
)
except Exception as exc: # pragma: no cover - delegated to logger
record.error = f"transport error: {exc}"
try:
rpc.get_logger().error(f"Request ERROR: {exc}")
except Exception:
pass
ctx.append(record)
return None
record.http_status = response.status_code
record.response_body, parse_error = _decode_response_body(response)
try:
rpc.get_logger().debug(
f"Request >>> : {response.request.body} "
f"{response.status_code} {response.text}"
)
except Exception:
pass
if response.status_code == 200:
if parse_error is not None:
record.error = f"json parse error: {parse_error}"
return_value = None
else:
return_value = record.response_body
else:
record.error = f"HTTP {response.status_code}: {response.text}"
try:
rpc.get_logger().error(
f"Request ERROR: ('Request ERROR:', {response.text!r})"
)
except Exception:
pass
return_value = None
ctx.append(record)
return return_value
def _wrapped_get(
url: str,
params: Any = None,
headers: Optional[dict] = None,
) -> Any:
ctx = _active_session.get()
if ctx is None:
kwargs = {}
if params is not None:
kwargs["params"] = params
if headers is not None:
kwargs["headers"] = headers
return rpc._orig_get(url, **kwargs)
effective_params = params if params is not None else {}
effective_headers = (
headers
if headers is not None
else {"Content-Type": "application/json"}
)
source = _detect_source(rpc)
request_body = _redact(effective_params)
record = CallRecord(
index=0,
method="GET",
url=str(url),
path=_url_path(url),
source=source,
transport="params",
http_status=None,
request_body=request_body,
response_body=None,
error=None,
)
return_value: Any = None
try:
response = requests.get(
url,
params=effective_params,
headers=effective_headers,
timeout=_DEFAULT_TIMEOUT_GET,
)
except Exception as exc: # pragma: no cover - delegated to logger
record.error = f"transport error: {exc}"
try:
rpc.get_logger().error(f"Request ERROR: {exc}")
except Exception:
pass
ctx.append(record)
return None
record.http_status = response.status_code
record.response_body, parse_error = _decode_response_body(response)
try:
rpc.get_logger().debug(
f"Request >>> : {effective_params} "
f"{response.status_code} {response.text}"
)
except Exception:
pass
if response.status_code == 200:
if parse_error is not None:
record.error = f"json parse error: {parse_error}"
return_value = None
else:
return_value = record.response_body
ctx.append(record)
return return_value
rpc.post = _wrapped_post
rpc.get = _wrapped_get
rpc._debug_call_log_wrapped = True
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
_URL_PATH_RE = re.compile(r"https?://[^/]+(/.*)?$")
_SLUG_RE = re.compile(r"[^A-Za-z0-9._-]+")
def _slugify_action(action: str) -> str:
slug = _SLUG_RE.sub("_", str(action)).strip("_")
return slug or "action"
def _url_path(url: Any) -> str:
text = str(url or "")
match = _URL_PATH_RE.match(text)
if match and match.group(1):
return match.group(1)
if text.startswith("/"):
return text
return text
def _pick_transport(params: Any) -> str:
if isinstance(params, dict) and "data" in params:
return "data"
return "params"
def _detect_source(rpc: Any) -> str:
"""Walk the call stack to find the outermost frame whose ``self`` is rpc."""
try:
stack = inspect.stack()
except Exception:
return ""
candidate = ""
try:
for frame_info in stack:
frame = frame_info.frame
if frame.f_locals.get("self", None) is rpc:
candidate = frame_info.function
return candidate
finally:
del stack
def _redact(params: Any) -> Any:
"""Return a copy of ``params`` with ``apiKey`` redacted."""
try:
cloned = copy.deepcopy(params)
except Exception:
return params
_redact_in_place(cloned)
return cloned
def _redact_in_place(value: Any) -> None:
if isinstance(value, dict):
for key in list(value.keys()):
if isinstance(key, str) and key.lower() == "apikey":
value[key] = "<redacted>"
else:
_redact_in_place(value[key])
elif isinstance(value, list):
for item in value:
_redact_in_place(item)
def _decode_response_body(response: Any) -> tuple[Any, Optional[str]]:
"""Best-effort response decoding used for both record + return value."""
text = getattr(response, "text", "")
try:
return response.json(), None
except Exception as exc:
if text:
return {"raw_text": text}, str(exc)
return None, str(exc)
# ---------------------------------------------------------------------------
# Markdown rendering
# ---------------------------------------------------------------------------
def _render_markdown(ctx: CallLogContext) -> str:
title = f"# {ctx.action} Raw Call/Response Log"
parts: List[str] = [title, ""]
parts.append("## LIMS Calls")
parts.append("")
parts.append("| # | Method | Path | Source | HTTP |")
parts.append("|---|---|---|---|---|")
for record in ctx.calls:
anchor = _row_anchor(record)
http = (
f"`{record.http_status}`"
if record.http_status is not None
else "`-`"
)
parts.append(
f"| [{record.index}](#{anchor}) | `{record.method}` | "
f"`{record.path}` | `{record.source}` | {http} |"
)
parts.append("")
for record in ctx.calls:
parts.append(f"## {record.index} {record.method} {record.path}")
parts.append("")
parts.append(f"- Source: `{record.source}`")
parts.append(f"- Transport: `{record.transport}`")
if record.http_status is not None:
parts.append(f"- HTTP status: `{record.http_status}`")
else:
parts.append("- HTTP status: `-`")
if record.error:
parts.append(f"- Error: {record.error}")
parts.append("")
parts.append("### Request Body")
parts.append("")
parts.append("```json")
parts.append(_to_json_block(record.request_body))
parts.append("```")
parts.append("")
parts.append("### Response Body")
parts.append("")
parts.append("```json")
parts.append(_to_json_block(record.response_body))
parts.append("```")
parts.append("")
return "\n".join(parts).rstrip() + "\n"
def _row_anchor(record: CallRecord) -> str:
"""Build a GitHub-style anchor matching ``## N METHOD /path``."""
raw = f"{record.index}-{record.method}-{record.path}"
raw = raw.lower()
raw = re.sub(r"[^a-z0-9]+", "-", raw)
return raw.strip("-")
def _to_json_block(value: Any) -> str:
try:
return json.dumps(value, ensure_ascii=False, indent=2, sort_keys=True)
except TypeError:
return json.dumps(str(value), ensure_ascii=False, indent=2)

View File

@@ -1,3 +0,0 @@
from .sirna_station import BioyondSirnaStation, fetch_workflow_list, load_sirna_config
__all__ = ["BioyondSirnaStation", "fetch_workflow_list", "load_sirna_config"]

View File

@@ -7,7 +7,6 @@ Bioyond Workstation Implementation
import time
import traceback
import threading
from contextlib import contextmanager
from datetime import datetime
from typing import Dict, Any, List, Optional, Union
import json
@@ -15,7 +14,6 @@ from pathlib import Path
from unilabos.devices.workstation.workstation_base import WorkstationBase, ResourceSynchronizer
from unilabos.devices.workstation.bioyond_studio.bioyond_rpc import BioyondV1RPC
from unilabos.devices.workstation.bioyond_studio import debug_call_log
from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
from unilabos.resources.warehouse import WareHouse
from unilabos.utils.log import logger
@@ -176,8 +174,6 @@ class BioyondResourceSynchronizer(ResourceSynchronizer):
logger.warning("从Bioyond获取的物料数据为空")
return False
self._update_material_cache_from_stock(all_bioyond_data)
# 转换为UniLab格式
unilab_resources = resource_bioyond_to_plr(
all_bioyond_data,
@@ -191,29 +187,6 @@ class BioyondResourceSynchronizer(ResourceSynchronizer):
logger.error(f"从Bioyond同步物料数据失败: {e}")
return False
def _update_material_cache_from_stock(self, materials: List[Dict[str, Any]]) -> None:
"""用本次库存查询结果同步 RPC 的 name -> material id 缓存。"""
material_cache = getattr(self.bioyond_api_client, "material_cache", None)
if not isinstance(material_cache, dict):
return
before_count = len(material_cache)
for material in materials:
material_name = material.get("name")
material_id = material.get("id")
if material_name and material_id:
material_cache[material_name] = material_id
for detail_material in material.get("detail", []) or []:
detail_name = detail_material.get("name")
detail_id = detail_material.get("detailMaterialId") or detail_material.get("id")
if detail_name and detail_id:
material_cache[detail_name] = detail_id
logger.debug(
f"已用Bioyond库存同步物料缓存: {before_count} -> {len(material_cache)}"
)
def sync_to_external(self, resource: Any) -> bool:
"""将本地物料数据变更同步到Bioyond系统"""
try:
@@ -705,70 +678,6 @@ class BioyondWorkstation(WorkstationBase):
集成Bioyond物料管理的工作站实现
"""
# 子类(如 sirna / peptide覆写以指定默认 raw-call 日志目录。
# 路径相对仓库根;为 None 时若 debug_log=True 仍会写入临时位置。
_DEBUG_LOG_DEFAULT_DIR: Optional[str] = None
def _create_bioyond_rpc(self, config: Dict[str, Any]) -> BioyondV1RPC:
"""创建 Bioyond RPC 客户端并应用调试包装。
所有创建 ``BioyondV1RPC`` 的路径饿汉初始化、Sirna 延迟初始化、
以及未来的前端重新配置路径)都应通过该 helper
以确保 debug_log 包装与命名/日志策略保持一致。
"""
rpc = BioyondV1RPC(config)
debug_call_log.wrap_rpc_http(rpc)
return rpc
def _set_hardware_interface(self, rpc: BioyondV1RPC) -> BioyondV1RPC:
"""将已构造的 RPC 客户端设置到 ``self.hardware_interface``,并应用调试包装。"""
debug_call_log.wrap_rpc_http(rpc)
self.hardware_interface = rpc
return rpc
def _debug_log_resolved_dir(self) -> Path:
"""解析 ``debug_log_dir`` 为绝对路径。"""
configured = (getattr(self, "bioyond_config", {}) or {}).get("debug_log_dir")
default_dir = getattr(self, "_DEBUG_LOG_DEFAULT_DIR", None)
candidate = configured or default_dir or "temp_benyao/_logs/bioyond_debug"
path = Path(candidate)
if not path.is_absolute():
repo_root = Path(__file__).resolve().parents[4]
path = repo_root / path
return path
def _ensure_debug_log_state(self) -> None:
"""从 ``self.bioyond_config`` 派生 ``_debug_log_enabled`` / ``_debug_log_dir``。
每次进入 ``_debug_call_session`` 时都重新解析,以兼容前端在运行时
修改 ``bioyond_config['debug_log']`` 或目录的场景;同时也容忍
子类(如 Sirna 延迟初始化)在 ``__init__`` 早期未触发本方法。
"""
cfg = getattr(self, "bioyond_config", {}) or {}
self._debug_log_enabled = bool(cfg.get("debug_log"))
self._debug_log_dir = self._debug_log_resolved_dir()
@contextmanager
def _debug_call_session(self, action_name: str):
"""在 action 体外加一层 debug 会话上下文。
- ``debug_log`` 关闭时是空上下文,开销为 0。
- ``debug_log`` 开启时进入 :func:`debug_call_log.session`,所有
已被 ``wrap_rpc_http`` 包装过的 RPC 客户端都会捕获本次 action
产生的 HTTP 调用并写入 Markdown 文件。
子类(如 ``end_experiment``、``manual_unload`` 等)可以直接在
action 体里以 ``with self._debug_call_session("action_name"):`` 包裹。
"""
cfg = getattr(self, "bioyond_config", {}) or {}
enabled = bool(cfg.get("debug_log"))
if not enabled:
yield None
return
out_dir = BioyondWorkstation._debug_log_resolved_dir(self)
with debug_call_log.session(action_name, out_dir) as ctx:
yield ctx
def _publish_task_status(
self,
task_id: str,
@@ -953,7 +862,7 @@ class BioyondWorkstation(WorkstationBase):
self.bioyond_config = {}
print("警告: 未提供 bioyond_config请确保在 JSON 配置文件中提供完整配置")
self.hardware_interface = self._create_bioyond_rpc(self.bioyond_config)
self.hardware_interface = BioyondV1RPC(self.bioyond_config)
def resource_tree_add(self, resources: List[ResourcePLR]) -> None:
"""添加资源到资源树并更新ROS节点
@@ -1429,7 +1338,11 @@ class BioyondWorkstation(WorkstationBase):
if self.hardware_interface:
self.hardware_interface.scheduler_reset()
# 重新同步资源,并用同一次库存查询结果更新物料缓存
# 新物料缓存
if self.hardware_interface:
self.hardware_interface.refresh_material_cache()
# 重新同步资源
if self.resource_synchronizer:
self.resource_synchronizer.sync_from_external()

View File

@@ -1 +0,0 @@
from . import sirna_materials # noqa: F401 ensure @resource classes are importable for PLR deserialize

View File

@@ -1,8 +1,6 @@
from os import name
from pylabrobot.resources import Deck, Coordinate, Rotation
from unilabos.registry.decorators import resource
from unilabos.resources.bioyond.YB_warehouses import (
bioyond_warehouse_1x4x4,
bioyond_warehouse_1x4x4_right, # 新增:右侧仓库 (A05D08)
@@ -25,11 +23,6 @@ from unilabos.resources.bioyond.YB_warehouses import (
from unilabos.resources.bioyond.warehouses import (
bioyond_warehouse_tipbox_storage_left, # 新增Tip盒堆栈(左)
bioyond_warehouse_tipbox_storage_right, # 新增Tip盒堆栈(右)
bioyond_warehouse_sirna_automation_stack,
bioyond_warehouse_sirna_centrifuge_balance_plate_stack,
bioyond_warehouse_sirna_g3_liquid_handler,
bioyond_warehouse_numeric_stack, # 新增:数字编码堆栈 (用于多肽站)
bioyond_warehouse_live_grid,
)
@@ -108,83 +101,6 @@ class BIOYOND_PolymerPreparationStation_Deck(Deck):
for warehouse_name, warehouse in self.warehouses.items():
self.assign_child_resource(warehouse, location=self.warehouse_locations[warehouse_name])
@resource(
id="BIOYOND_SirnaStation_Deck",
category=["deck"],
description="BIOYOND 小核酸工作站 Deck",
icon="配液站.webp",
)
class BIOYOND_SirnaStation_Deck(Deck):
WAREHOUSE_BIOYOND_AXIS = {
"G3移液站": "xy_col_row",
"自动化堆栈": "xy_col_row",
"离心机配平板堆栈": "xy_col_row",
}
WAREHOUSE_BIOYOND_KEY_AXIS = {
"G3移液站": "col_row",
"自动化堆栈": "col_row",
"离心机配平板堆栈": "col_row",
}
# Bioyond warehouse UUID -> 本地仓库名称 映射。
# 留空时由配置station config 的 ``warehouse_bioyond_ids``)注入。
# graph 节点也可在 deck.config.warehouse_bioyond_ids 覆盖。
WAREHOUSE_BIOYOND_IDS: dict = {}
def __init__(
self,
name: str = "SirnaStation_Deck",
size_x: float = 2700.0,
size_y: float = 1080.0,
size_z: float = 1500.0,
category: str = "deck",
setup: bool = False,
warehouse_bioyond_ids: dict | None = None,
**kwargs,
) -> None:
super().__init__(name=name, size_x=size_x, size_y=size_y, size_z=size_z)
# 按需写入实例级覆盖;保留默认空 mapping避免改动模型常量。
self.warehouse_bioyond_ids: dict = dict(self.WAREHOUSE_BIOYOND_IDS)
if warehouse_bioyond_ids:
self.warehouse_bioyond_ids.update(warehouse_bioyond_ids)
if setup:
self.setup()
@classmethod
def deserialize(cls, data: dict, allow_marshal: bool = False):
if data.get("children") and data.get("setup") is True:
data = data.copy()
data["setup"] = False
result = super().deserialize(data, allow_marshal=allow_marshal)
result._ensure_sirna_warehouse_metadata()
return result
def _ensure_sirna_warehouse_metadata(self) -> None:
for child in getattr(self, "children", []):
name = getattr(child, "name", "")
axis = self.WAREHOUSE_BIOYOND_AXIS.get(name)
if axis and not hasattr(child, "bioyond_axis"):
child.bioyond_axis = axis
key_axis = self.WAREHOUSE_BIOYOND_KEY_AXIS.get(name)
if key_axis and not hasattr(child, "bioyond_key_axis"):
child.bioyond_key_axis = key_axis
def setup(self) -> None:
# Sirna 读接口 /api/storage/location/locations-by-type 返回完整固定堆栈清单。
# LIMS 在库物料接口仍使用相同的 自动化堆栈 名称和数字库位编码。
self.warehouses = {
"G3移液站": bioyond_warehouse_sirna_g3_liquid_handler(),
"自动化堆栈": bioyond_warehouse_sirna_automation_stack(),
"离心机配平板堆栈": bioyond_warehouse_sirna_centrifuge_balance_plate_stack(),
}
self.warehouse_locations = {
"G3移液站": Coordinate(0.0, 0.0, 0.0),
"自动化堆栈": Coordinate(220.0, 0.0, 0.0),
"离心机配平板堆栈": Coordinate(1740.0, 0.0, 0.0),
}
for warehouse_name, warehouse in self.warehouses.items():
self.assign_child_resource(warehouse, location=self.warehouse_locations[warehouse_name])
class BIOYOND_YB_Deck(Deck):
def __init__(
self,
@@ -234,146 +150,12 @@ class BIOYOND_YB_Deck(Deck):
for warehouse_name, warehouse in self.warehouses.items():
self.assign_child_resource(warehouse, location=self.warehouse_locations[warehouse_name])
@resource(
id="BIOYOND_PeptideStation_Deck",
category=["deck"],
description="BIOYOND 多肽工作站 Deck",
icon="preparation_station.webp",
)
class BIOYOND_PeptideStation_Deck(Deck):
WAREHOUSE_BIOYOND_AXIS = dict.fromkeys(
[
"自动化堆栈",
"低温冰箱仓库",
"Tecan移液站库",
"G3移液站库",
"IDOT移液站库",
"G3缓冲库",
"盖板缓冲库",
"配平板缓冲库",
"IDOT缓冲库",
"固相合成板底座缓冲位",
"离心机库位",
"热封膜机位",
],
"xy_col_row",
)
WAREHOUSE_BIOYOND_KEY_AXIS = dict.fromkeys(WAREHOUSE_BIOYOND_AXIS, "row_col")
def __init__(
self,
name: str = "PeptideStation_Deck",
size_x: float = 3500.0,
size_y: float = 1800.0,
size_z: float = 1500.0,
category: str = "deck",
setup: bool = False
) -> None:
super().__init__(name=name, size_x=size_x, size_y=size_y, size_z=size_z)
if setup:
self.setup()
@classmethod
def deserialize(cls, data: dict, allow_marshal: bool = False):
if data.get("children") and data.get("setup") is True:
# 已有序列化子资源,跳过 setup 避免重复创建
result = super(BIOYOND_PeptideStation_Deck, cls).deserialize(data, allow_marshal=allow_marshal)
else:
result = super(BIOYOND_PeptideStation_Deck, cls).deserialize(data, allow_marshal=allow_marshal)
result._ensure_peptide_warehouse_metadata()
return result
def _ensure_peptide_warehouse_metadata(self) -> None:
for child in getattr(self, "children", []):
name = getattr(child, "name", "")
axis = self.WAREHOUSE_BIOYOND_AXIS.get(name)
if axis and not hasattr(child, "bioyond_axis"):
child.bioyond_axis = axis
key_axis = self.WAREHOUSE_BIOYOND_KEY_AXIS.get(name)
if key_axis and not hasattr(child, "bioyond_key_axis"):
child.bioyond_key_axis = key_axis
def setup(self) -> None:
# 多肽工作站仓库配置
# 基于 2026-05-09 live API probe 发现的实际仓库拓扑 (12个仓库)
# 数据来源: temp_benyao/peptide/_logs/warehouse_discovery_raw_live_2026-05-09.json
self.warehouses = {
# 主自动化堆栈 - live API: code 10-17 -> x=17, y=10显示为 10 行×17 列
"自动化堆栈": bioyond_warehouse_numeric_stack(
"自动化堆栈", rows=10, columns=17, bioyond_axis="xy_col_row", bioyond_key_axis="row_col"
),
# 低温存储
"低温冰箱仓库": bioyond_warehouse_live_grid(
"低温冰箱仓库", rows=2, columns=3, slot_keys=["1", "2", "3", "4", "5", "6"]
),
# 移液站库位
"Tecan移液站库": bioyond_warehouse_live_grid(
"Tecan移液站库", rows=1, columns=18, slot_keys=[str(index) for index in range(1, 19)]
),
"G3移液站库": bioyond_warehouse_live_grid(
"G3移液站库",
rows=1,
columns=18,
slot_keys=["1", "2", "3", "4", "垃圾桶", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18"],
),
"IDOT移液站库": bioyond_warehouse_live_grid(
"IDOT移液站库",
rows=1,
columns=12,
slot_keys=[f"0009-{index:04d}" for index in range(1, 13)],
),
# 缓冲库位
"G3缓冲库": bioyond_warehouse_live_grid(
"G3缓冲库", rows=1, columns=5, slot_keys=[str(index) for index in range(1, 6)]
),
"盖板缓冲库": bioyond_warehouse_live_grid(
"盖板缓冲库", rows=1, columns=7, slot_keys=[str(index) for index in range(1, 8)]
),
"配平板缓冲库": bioyond_warehouse_live_grid(
"配平板缓冲库", rows=1, columns=3, slot_keys=[str(index) for index in range(1, 4)]
),
"IDOT缓冲库": bioyond_warehouse_live_grid(
"IDOT缓冲库", rows=1, columns=2, slot_keys=["1", "1"]
),
"固相合成板底座缓冲位": bioyond_warehouse_live_grid(
"固相合成板底座缓冲位",
rows=1,
columns=4,
slot_keys=[f"0015-{index:04d}" for index in range(1, 5)],
),
# 设备库位
"离心机库位": bioyond_warehouse_live_grid(
"离心机库位", rows=1, columns=4, slot_keys=[f"0017-{index:04d}" for index in range(1, 5)]
),
"热封膜机位": bioyond_warehouse_live_grid(
"热封膜机位", rows=1, columns=2, slot_keys=[f"0016-{index:04d}" for index in range(1, 3)]
),
}
# 仓库位置布局 (需根据实际硬件布局调整)
self.warehouse_locations = {
"自动化堆栈": Coordinate(0.0, 0.0, 0.0),
"Tecan移液站库": Coordinate(0.0, 1150.0, 0.0),
"G3移液站库": Coordinate(0.0, 1300.0, 0.0),
"IDOT移液站库": Coordinate(0.0, 1450.0, 0.0),
"G3缓冲库": Coordinate(0.0, 1600.0, 0.0),
"盖板缓冲库": Coordinate(850.0, 1600.0, 0.0),
"低温冰箱仓库": Coordinate(2700.0, 0.0, 0.0),
"配平板缓冲库": Coordinate(2700.0, 300.0, 0.0),
"IDOT缓冲库": Coordinate(2700.0, 450.0, 0.0),
"固相合成板底座缓冲位": Coordinate(2700.0, 600.0, 0.0),
"离心机库位": Coordinate(2700.0, 750.0, 0.0),
"热封膜机位": Coordinate(2700.0, 900.0, 0.0),
}
for warehouse_name, warehouse in self.warehouses.items():
self.assign_child_resource(warehouse, location=self.warehouse_locations[warehouse_name])
def YB_Deck(name: str) -> Deck:
by=BIOYOND_YB_Deck(name=name)
by.setup()
return by

View File

@@ -1,126 +0,0 @@
"""Sirna Station Material Resource Definitions
Defines PyLabRobot resource classes for Bioyond Sirna station materials.
Each class is decorated with @resource for AST-based registry discovery.
"""
from collections import OrderedDict
from pylabrobot.resources import Plate, TipRack, Container
from unilabos.registry.decorators import resource
@resource(
id="bioyond_sirna_g3_200ul_tip_rack",
category=["labware", "tip_rack"],
description="G3-200ul枪头盒 for Sirna station",
)
class BioyondSirna_G3_200ul_TipRack(TipRack):
"""G3-200ul tip rack for Sirna liquid handling."""
def __init__(self, *args, **kwargs):
kwargs.setdefault("size_x", 127.76)
kwargs.setdefault("size_y", 85.48)
kwargs.setdefault("size_z", 64.0)
kwargs.setdefault("model", "bioyond_sirna_g3_200ul_tip_rack")
kwargs.setdefault("with_tips", True)
if kwargs.get("ordering") is None and kwargs.get("ordered_items") is None:
kwargs["ordering"] = OrderedDict()
super().__init__(*args, **kwargs)
@resource(
id="bioyond_sirna_g3_50ul_tip_rack",
category=["labware", "tip_rack"],
description="G3-50ul枪头盒 for Sirna station",
)
class BioyondSirna_G3_50ul_TipRack(TipRack):
"""G3-50ul tip rack for Sirna liquid handling."""
def __init__(self, *args, **kwargs):
kwargs.setdefault("size_x", 127.76)
kwargs.setdefault("size_y", 85.48)
kwargs.setdefault("size_z", 64.0)
kwargs.setdefault("model", "bioyond_sirna_g3_50ul_tip_rack")
kwargs.setdefault("with_tips", True)
if kwargs.get("ordering") is None and kwargs.get("ordered_items") is None:
kwargs["ordering"] = OrderedDict()
super().__init__(*args, **kwargs)
@resource(
id="bioyond_sirna_384_well_plate",
category=["labware", "plate"],
description="384孔板 for Sirna assays",
)
class BioyondSirna_384WellPlate(Plate):
"""384-well plate for Sirna reporter gene detection."""
def __init__(self, *args, **kwargs):
kwargs.setdefault("size_x", 127.76)
kwargs.setdefault("size_y", 85.48)
kwargs.setdefault("size_z", 14.35)
kwargs.setdefault("model", "bioyond_sirna_384_well_plate")
kwargs.setdefault("plate_type", "skirted")
if kwargs.get("ordering") is None and kwargs.get("ordered_items") is None:
kwargs["ordering"] = OrderedDict()
super().__init__(*args, **kwargs)
@resource(
id="bioyond_sirna_cell_culture_plate",
category=["labware", "plate"],
description="细胞培养板 for Sirna cell culture",
)
class BioyondSirna_CellCulturePlate(Plate):
"""Cell culture plate for Sirna experiments."""
def __init__(self, *args, **kwargs):
kwargs.setdefault("size_x", 127.76)
kwargs.setdefault("size_y", 85.48)
kwargs.setdefault("size_z", 14.35)
kwargs.setdefault("model", "bioyond_sirna_cell_culture_plate")
kwargs.setdefault("plate_type", "skirted")
if kwargs.get("ordering") is None and kwargs.get("ordered_items") is None:
kwargs["ordering"] = OrderedDict()
super().__init__(*args, **kwargs)
@resource(
id="bioyond_sirna_reagent_trough",
category=["labware", "trough"],
description="试剂槽 for Sirna reagents",
)
class BioyondSirna_ReagentTrough(Container):
"""Reagent trough for Sirna station reagents (RiboGreen, etc.)."""
def __init__(self, *args, **kwargs):
kwargs.setdefault("size_x", 127.76)
kwargs.setdefault("size_y", 85.48)
kwargs.setdefault("size_z", 44.0)
kwargs.setdefault("max_volume", 300000.0)
kwargs.setdefault("model", "bioyond_sirna_reagent_trough")
super().__init__(*args, **kwargs)
# Material type code mapping for dynamic instantiation
MATERIAL_TYPE_CODE_TO_CLASS = {
"0016": BioyondSirna_G3_200ul_TipRack,
"0017": BioyondSirna_G3_50ul_TipRack,
"0015": BioyondSirna_384WellPlate,
"0001": BioyondSirna_CellCulturePlate,
"0006": BioyondSirna_ReagentTrough,
}
def get_material_class_by_type_code(type_code: str):
"""Get resource class by Bioyond material type code.
Args:
type_code: Bioyond materialTypeCode (e.g., "0016", "0017")
Returns:
Resource class or None if not found
"""
return MATERIAL_TYPE_CODE_TO_CLASS.get(type_code)

View File

@@ -1,180 +1,5 @@
from pylabrobot.resources import Coordinate
from pylabrobot.resources.carrier import ResourceHolder, create_homogeneous_resources
from unilabos.resources.warehouse import WareHouse, warehouse_factory
class BioyondWareHouse(WareHouse):
"""Bioyond 仓库,额外保存服务端 x/y 坐标和库位标签语义。"""
def __init__(self, *args, bioyond_axis: str = "xy_row_col", bioyond_key_axis: str = "row_col", **kwargs):
super().__init__(*args, **kwargs)
self.bioyond_axis = bioyond_axis
self.bioyond_key_axis = bioyond_key_axis
def serialize(self) -> dict:
data = super().serialize()
data["bioyond_axis"] = self.bioyond_axis
data["bioyond_key_axis"] = self.bioyond_key_axis
return data
def bioyond_warehouse_numeric_stack(
name: str,
rows: int = 10,
columns: int = 17,
bioyond_axis: str = "xy_row_col",
bioyond_key_axis: str = "row_col",
) -> WareHouse:
"""创建 Bioyond 数字库位堆栈,库位名使用服务端返回的 行-列 格式。
bioyond_axis: 仓库级别的 Bioyond 坐标轴约定,供 graphio 的坐标映射使用。
- "xy_row_col" (default): Bioyond x→row, y→col (reaction/peptide 历史约定).
- "xy_col_row": Bioyond x→col, y→row (Sirna live API 实测约定).
bioyond_key_axis: 库位标签生成约定。
- "row_col" (default): 视觉行列和标签行列一致,例如 10 行 x 17 列 → 1-1..10-17。
- "col_row": 视觉行列转置,但标签仍保持 Bioyond row-col例如
17 行 x 10 列 → 1-1..10-17。
未设置时 graphio 回退到默认 "xy_row_col",其他调用方保持原行为。
"""
num_items_x = columns
num_items_y = rows
num_items_z = 1
dx = 10.0
dy = 10.0
dz = 10.0
item_dx = 147.0
item_dy = 106.0
item_dz = 130.0
locations = [
Coordinate(dx + col * item_dx, dy + row * item_dy, dz)
for row in range(num_items_y)
for col in range(num_items_x)
]
holders = create_homogeneous_resources(
klass=ResourceHolder,
locations=locations,
resource_size_x=127.0,
resource_size_y=86.0,
resource_size_z=25.0,
name_prefix=name,
)
if bioyond_key_axis == "row_col":
keys = [
f"{row + 1}-{col + 1}"
for row in range(num_items_y)
for col in range(num_items_x)
]
elif bioyond_key_axis == "col_row":
keys = [
f"{col + 1}-{row + 1}"
for row in range(num_items_y)
for col in range(num_items_x)
]
else:
raise ValueError(f"未知 Bioyond 库位标签约定: {bioyond_key_axis!r}")
warehouse = BioyondWareHouse(
name=name,
size_x=dx + item_dx * num_items_x,
size_y=dy + item_dy * num_items_y,
size_z=dz + item_dz * num_items_z,
num_items_x=num_items_x,
num_items_y=num_items_y,
num_items_z=num_items_z,
ordering_layout="row-major",
sites={key: holder for key, holder in zip(keys, holders.values())},
category="warehouse",
bioyond_axis=bioyond_axis,
bioyond_key_axis=bioyond_key_axis,
)
return warehouse
def bioyond_warehouse_live_grid(
name: str,
rows: int,
columns: int,
slot_keys: list[str] | None = None,
bioyond_axis: str = "xy_col_row",
bioyond_key_axis: str = "row_col",
) -> WareHouse:
"""创建 Bioyond 实测库位网格,按服务端 code 保存位点标签。
默认用于 Peptide live API 返回的坐标x 是视觉列y 是视觉行。
当服务端 code 重复时,为保持 PLR ordering 唯一性,会给后续重复项追加 ``#N``。
"""
num_items_x = columns
num_items_y = rows
num_items_z = 1
dx = 10.0
dy = 10.0
dz = 10.0
item_dx = 147.0
item_dy = 106.0
item_dz = 130.0
locations = [
Coordinate(dx + col * item_dx, dy + row * item_dy, dz)
for row in range(num_items_y)
for col in range(num_items_x)
]
holders = create_homogeneous_resources(
klass=ResourceHolder,
locations=locations,
resource_size_x=127.0,
resource_size_y=86.0,
resource_size_z=25.0,
name_prefix=name,
)
keys = slot_keys or [str(index + 1) for index in range(num_items_x * num_items_y)]
if len(keys) != len(holders):
raise ValueError(f"{name} 库位数量不匹配: keys={len(keys)}, holders={len(holders)}")
seen: dict[str, int] = {}
unique_keys: list[str] = []
for key in keys:
count = seen.get(key, 0) + 1
seen[key] = count
unique_keys.append(key if count == 1 else f"{key}#{count}")
return BioyondWareHouse(
name=name,
size_x=dx + item_dx * num_items_x,
size_y=dy + item_dy * num_items_y,
size_z=dz + item_dz * num_items_z,
num_items_x=num_items_x,
num_items_y=num_items_y,
num_items_z=num_items_z,
ordering_layout="row-major",
sites={key: holder for key, holder in zip(unique_keys, holders.values())},
category="warehouse",
bioyond_axis=bioyond_axis,
bioyond_key_axis=bioyond_key_axis,
)
# ================ 小核酸工作站相关堆栈 ================
def bioyond_warehouse_sirna_g3_liquid_handler(name: str = "G3移液站") -> WareHouse:
"""创建小核酸 G3 移液站库位堆栈:显示为 14 行 x 1 列,标签保持 1-1..1-14。"""
return bioyond_warehouse_numeric_stack(
name, rows=14, columns=1, bioyond_axis="xy_col_row", bioyond_key_axis="col_row"
)
def bioyond_warehouse_sirna_automation_stack(name: str = "自动化堆栈") -> WareHouse:
"""创建小核酸自动化堆栈:显示为 17 行 x 10 列,标签保持 1-1..10-17。"""
return bioyond_warehouse_numeric_stack(
name, rows=17, columns=10, bioyond_axis="xy_col_row", bioyond_key_axis="col_row"
)
def bioyond_warehouse_sirna_centrifuge_balance_plate_stack(name: str = "离心机配平板堆栈") -> WareHouse:
"""创建小核酸离心机配平板堆栈:显示为 1 行 x 2 列,标签保持 1-1、2-1。"""
return bioyond_warehouse_numeric_stack(
name, rows=1, columns=2, bioyond_axis="xy_col_row", bioyond_key_axis="col_row"
)
# ================ 反应站相关堆栈 ================
def bioyond_warehouse_1x4x4(name: str) -> WareHouse:

View File

@@ -736,7 +736,7 @@ def resource_bioyond_to_plr(bioyond_materials: list[dict], type_mapping: Dict[st
logger.warning(f"物料 {unique_name} 不是有效的 ResourcePLR 实例,类型: {type(plr_material)}")
continue
plr_material.code = material.get("barCode") or material.get("code") or ""
plr_material.code = material.get("code", "") and material.get("barCode", "") or ""
plr_material.unilabos_uuid = str(uuid.uuid4())
# ⭐ 保存 Bioyond 原始信息到 unilabos_extra用于出库时查询
@@ -864,22 +864,11 @@ def resource_bioyond_to_plr(bioyond_materials: list[dict], type_mapping: Dict[st
warehouse = deck.warehouses[wh_name]
logger.debug(f"[Warehouse匹配] 找到warehouse: {wh_name} (容量: {warehouse.capacity}, 行×列: {warehouse.num_items_x}×{warehouse.num_items_y})")
# Bioyond坐标映射:
# - 历史 row_col 仓库中 x/y 直接按行/列参与索引。
# - Sirna 的库位标签为 col-rowstock-material 返回 x=标签第二段、y=标签第一段。
# 因此 x=13,y=4 应落到 key=4-13而不是交换后落到 3-5。
x = loc.get("x", 1)
y = loc.get("y", 1)
# Bioyond坐标映射 (重要!): x→行(1=A,2=B...), y→列(1=01,2=02...), z→层(通常=1)
x = loc.get("x", 1) # 行号 (1-based: 1=A, 2=B, 3=C, 4=D)
y = loc.get("y", 1) # 列号 (1-based: 1=01, 2=02, 3=03...)
z = loc.get("z", 1) # 层号 (1-based, 通常为1)
# 仓库级别的轴约定覆盖。
# 对旧的 row-col 视觉标签bioyond_axis="xy_col_row" 需要交换 x/y。
# 对 Sirna 的 col-row 库位标签,原始 x/y 已能直接索引到 code 对应位置,不再交换。
bioyond_axis = getattr(warehouse, "bioyond_axis", "xy_row_col")
bioyond_key_axis = getattr(warehouse, "bioyond_key_axis", "row_col")
if bioyond_axis == "xy_col_row" and bioyond_key_axis != "col_row":
x, y = y, x
# 如果是右侧堆栈,需要调整列号 (5→1, 6→2, 7→3, 8→4)
if wh_name == "堆栈1右":
y = y - 4 # 将5-8映射到1-4
@@ -923,43 +912,10 @@ def resource_bioyond_to_plr(bioyond_materials: list[dict], type_mapping: Dict[st
logger.debug(f"列优先warehouse {wh_name}: x={x}(行),y={y}(列) → row={row_idx},col={col_idx} → idx={idx}")
if 0 <= idx < warehouse.capacity:
slot_key = None
ordering = getattr(warehouse, "_ordering", {})
sites = getattr(warehouse, "sites", [])
if isinstance(ordering, dict) and idx < len(sites):
site_at_idx = sites[idx]
slot_key = next(
(key for key, site in ordering.items() if site is site_at_idx),
None,
)
current_resource = warehouse[idx]
if current_resource is None or isinstance(current_resource, (ResourceHolder, str)):
if isinstance(current_resource, str):
logger.warning(
f"⚠️ 物料 {unique_name} 覆盖 {wh_name}[{idx}]"
f"{f'({slot_key})' if slot_key else ''} 的旧占位 occupied_by={current_resource!r}"
)
if warehouse[idx] is None or isinstance(warehouse[idx], ResourceHolder):
# 物料尺寸已在放入warehouse前根据需要进行了交换
warehouse[idx] = plr_material
logger.debug(
f"✅ 物料 {unique_name} 放置到 {wh_name}[{idx}]"
f"{f'({slot_key})' if slot_key else ''} "
f"(Bioyond坐标: x={loc.get('x')}, y={loc.get('y')})"
)
else:
parent = getattr(current_resource, "parent", None)
current_repr = repr(current_resource)
current_len = len(current_resource) if isinstance(current_resource, str) else None
logger.warning(
f"⚠️ 物料 {unique_name} 跳过放置到 {wh_name}[{idx}]"
f"{f'({slot_key})' if slot_key else ''}:目标库位已有 "
f"{type(current_resource).__name__}"
f"(value={current_repr}, len={current_len})"
f"(name={getattr(current_resource, 'name', None)}, "
f"parent={getattr(parent, 'name', None)}, "
f"uuid={getattr(current_resource, 'unilabos_uuid', None)})"
)
logger.debug(f"✅ 物料 {unique_name} 放置到 {wh_name}[{idx}] (Bioyond坐标: x={loc.get('x')}, y={loc.get('y')})")
else:
logger.warning(f"❌ 物料 {unique_name} 的索引 {idx} 超出仓库 {wh_name} 容量 {warehouse.capacity}")
else: