feat: RNA refine Bioyond siRNA Experiment 1 submission

This commit is contained in:
yxz321
2026-05-07 10:56:32 +08:00
parent 1519a7d985
commit 18c3263e92
2 changed files with 477 additions and 64 deletions

View File

@@ -10,10 +10,21 @@ import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Literal, Optional, Tuple
from typing import Annotated, Any, Dict, Iterable, List, Literal, Optional, Tuple
from urllib import error, request
from uuid import UUID
try:
from typing_extensions import TypedDict
except ImportError: # pragma: no cover - 仅用于轻量环境导入
from typing import TypedDict # type: ignore
try:
from pydantic import Field
except Exception: # pragma: no cover - 仅用于无 pydantic 的轻量环境导入
def Field(*args: Any, **kwargs: Any) -> Dict[str, Any]:
return kwargs
if __package__ in {None, ""}:
repo_root = Path(__file__).resolve().parents[5]
if str(repo_root) not in sys.path:
@@ -67,6 +78,23 @@ WORKFLOW_LIST_ENDPOINT = "/api/lims/workflow/work-flow-list"
SUPPORTED_WORKFLOW_TYPES = {0, 1, 2}
DEFAULT_READY_SIGNAL = "READY"
DEFAULT_RESET_OPERATIONS = ("scheduler_reset", "reset_order_status", "reset_location")
DEFAULT_SIRNA_MATERIAL_TYPE_MAPPINGS = {
"bioyond_sirna_g3_200ul_tip_rack": ["G3-200ul枪头盒", ""],
"bioyond_sirna_g3_50ul_tip_rack": ["G3-50ul枪头盒", ""],
"bioyond_sirna_384_well_plate": ["384孔板", ""],
"bioyond_sirna_cell_culture_plate": ["细胞培养板", ""],
"bioyond_sirna_reagent_trough": ["试剂槽RiboGreen", ""],
}
class Experiment1RequiredParams(TypedDict):
sample_throughput: Annotated[int, Field(description="样品通量1-96必填表示一次实验处理的样品数量。")]
class Experiment1OptionalParams(TypedDict, total=False):
order_name: Annotated[str, Field(description="订单名称(可选,自动生成)")]
parameter_overrides: Annotated[str, Field(description="参数覆盖(文本格式)")]
auto_register_materials: Annotated[bool, Field(default=True, description="是否自动注册物料默认True")]
def _utc_now_iso8601_ms() -> str:
@@ -90,6 +118,15 @@ def _workflow_list_data(
}
def _apply_default_sirna_material_type_mappings(config: Dict[str, Any]) -> None:
configured = config.get("material_type_mappings")
if not isinstance(configured, dict):
configured = {}
merged = dict(DEFAULT_SIRNA_MATERIAL_TYPE_MAPPINGS)
merged.update(configured)
config["material_type_mappings"] = merged
def load_sirna_config(config_path: str | Path) -> Dict[str, Any]:
"""从 JSON 文件读取小核酸站配置。"""
path = Path(config_path)
@@ -197,6 +234,7 @@ class BioyondSirnaStation(BioyondWorkstation):
if bioyond_config:
merged_config.update(bioyond_config)
merged_config.update(kwargs)
_apply_default_sirna_material_type_mappings(merged_config)
self._apply_env_api_config(merged_config)
self.protocol_type = protocol_type
@@ -294,53 +332,80 @@ class BioyondSirnaStation(BioyondWorkstation):
always_free=True,
node_type=NodeType.MANUAL_CONFIRM,
placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"},
goal_default={"timeout_seconds": 3600, "assignee_user_ids": []},
goal_default={
"optional_params": {"auto_register_materials": True},
"timeout_seconds": 3600,
"assignee_user_ids": [],
},
feedback_interval=300,
description="提交小核酸实验",
description="提交小核酸实验1报告基因检测",
)
def submit_experiment(
def submit_experiment_1(
self,
workflow_name: str = "",
sub_workflow_name: str = "",
order_code: str = "",
order_name: str = "",
sample_throughput: int = 4,
parameter_overrides: Optional[Dict[str, Any]] = None,
include_all_task_displayable: bool = False,
reset_before_create: bool = True,
reset_operations: Optional[
List[Literal["scheduler_reset", "reset_order_status", "reset_location"]]
] = None,
reset_order_id: str = "",
reset_location_id: str = "",
cleanup_order_code: str = "",
api_host: str = "",
api_key: str = "",
ready_signal: str = "READY",
required_params: Experiment1RequiredParams,
optional_params: Optional[Experiment1OptionalParams] = None,
timeout_seconds: int = 3600,
assignee_user_ids: Optional[List[str]] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""实时查询 LIMS 工作流和步骤参数,构造并提交 create-order。"""
"""提交小核酸实验1报告基因检测到 Bioyond LIMS。
自动查询实验1工作流参数使用 API 默认值填充所有参数,
创建订单并分配物料,最后将物料注册到 UniLabOS 资源树。
Args:
required_params: 必填参数组
sample_throughput: 样品通量1-96必填表示一次实验处理的样品数量。
optional_params: 可选参数组
order_name: 订单名称(可选,自动生成)
parameter_overrides: 参数覆盖(文本格式)
auto_register_materials: 是否自动注册物料默认True
timeout_seconds: 超时时间(秒,框架参数)
assignee_user_ids: 分配用户ID列表框架参数
Returns:
包含以下字段的字典:
- success (bool): 是否成功
- order_code (str): 订单编号
- order_name (str): 订单名称
- order_ids (List[str]): 订单ID列表
- materials (List[Dict]): 物料记录列表
- materials_by_type (Dict): 按类型分组的物料
- confirmation_message (str): 确认消息
- registration_result (Dict): 物料注册结果
"""
if isinstance(required_params, dict):
sample_throughput = required_params.get("sample_throughput")
else:
sample_throughput = required_params
sample_throughput = int(sample_throughput)
if optional_params is None:
optional_params = {}
order_name = optional_params.get("order_name", "")
parameter_overrides = optional_params.get("parameter_overrides", "")
auto_register_materials = optional_params.get("auto_register_materials", True)
del timeout_seconds, assignee_user_ids, kwargs
self._update_runtime_api_config(api_host=api_host, api_key=api_key)
self._require_ready_signal(ready_signal)
rpc = self._require_hardware_interface("create_order")
workflow = self._resolve_experiment_workflow(
rpc,
workflow_name=workflow_name,
sub_workflow_name=sub_workflow_name,
)
# 自动解析实验1工作流无需用户指定workflow_name
workflow = self._resolve_experiment_1_workflow(rpc)
step_data = rpc.workflow_step_query(workflow["sub_workflow_id"])
# Parse parameter_overrides from text format
parsed_overrides = self._parse_parameter_overrides_text(parameter_overrides)
param_values, parameter_template = self._build_param_values_from_step_data(
step_data,
parameter_overrides=parameter_overrides or [],
include_all_task_displayable=include_all_task_displayable,
parameter_overrides=parsed_overrides,
include_all_task_displayable=True,
)
if not param_values:
raise RuntimeError("未从 LIMS 子工作流参数中提取到 create_order paramValues")
resolved_order_code, resolved_order_name = self._build_bioyond_order_identity(order_code, order_name)
resolved_order_code, resolved_order_name = self._build_bioyond_order_identity("", order_name)
order_payload = [
{
"orderCode": resolved_order_code,
@@ -352,17 +417,7 @@ class BioyondSirnaStation(BioyondWorkstation):
}
]
reset_result = None
if reset_before_create:
reset_result = self._run_reset_operations(
rpc,
reset_operations=reset_operations,
reset_order_id=reset_order_id,
reset_location_id=reset_location_id,
cleanup_order_code=cleanup_order_code,
)
logger.info(f"正在提交小核酸实验: {resolved_order_name} ({resolved_order_code})")
logger.info(f"正在提交小核酸实验1: {resolved_order_name} ({resolved_order_code})")
raw_result = rpc.create_order(json.dumps(copy.deepcopy(order_payload), ensure_ascii=False))
parsed_result = self._parse_lims_result(raw_result)
material_records = self._extract_create_order_materials(parsed_result)
@@ -376,6 +431,24 @@ class BioyondSirnaStation(BioyondWorkstation):
"order_name": resolved_order_name,
"workflow": workflow,
}
confirmation_data = self._format_create_order_confirmation(
order_code=resolved_order_code,
order_name=resolved_order_name,
workflow=workflow,
order_ids=order_ids,
material_records=material_records,
suggested_locations=suggested_locations,
)
registration_result = None
if auto_register_materials and material_records:
try:
registration_result = self._register_materials_to_tree(material_records)
logger.info(f"物料注册完成: {len(registration_result.get('registered', []))} 个物料已添加到资源树")
except Exception as e:
logger.error(f"物料注册失败: {e}")
registration_result = {"error": str(e)}
result = {
"success": self._create_result_success(parsed_result, order_ids, material_records),
"order_code": resolved_order_code,
@@ -383,23 +456,54 @@ class BioyondSirnaStation(BioyondWorkstation):
"order_ids": order_ids,
"workflow": workflow,
"sample_throughput": int(sample_throughput),
"reset_result": reset_result,
"payload": order_payload,
"parameter_template": parameter_template,
"create_order_result": parsed_result,
"materials": material_records,
"materials_by_type": confirmation_data.get("materials_by_type", {}),
"suggested_locations": suggested_locations,
"start_experiment": start_experiment_info,
"confirmation_message": self._format_create_order_confirmation(
order_code=resolved_order_code,
order_name=resolved_order_name,
workflow=workflow,
order_ids=order_ids,
material_records=material_records,
suggested_locations=suggested_locations,
),
"confirmation_message": confirmation_data.get("confirmation_message", ""),
"registration_result": registration_result,
}
return self._with_ready_signal(result)
return result
def _parse_parameter_overrides_text(self, text: str) -> Dict[str, Any]:
"""Parse parameter overrides from text format.
Supports two formats:
1. Key-value pairs: "key1=value1,key2=value2"
2. JSON string: '{"key1": "value1", "key2": "value2"}'
Args:
text: Parameter overrides as text
Returns:
Dict of parameter overrides
"""
if not text or not text.strip():
return {}
text = text.strip()
# Try JSON format first
if text.startswith("{"):
try:
return json.loads(text)
except json.JSONDecodeError:
logger.warning(f"无法解析 JSON 格式的参数覆盖: {text}")
return {}
# Parse key=value,key=value format
result = {}
for pair in text.split(","):
pair = pair.strip()
if "=" not in pair:
continue
key, value = pair.split("=", 1)
result[key.strip()] = value.strip()
return result
@action(
always_free=True,
@@ -1164,31 +1268,188 @@ class BioyondSirnaStation(BioyondWorkstation):
order_ids: List[str],
material_records: List[Dict[str, Any]],
suggested_locations: List[Dict[str, str]],
) -> str:
) -> Dict[str, Any]:
"""Format create order confirmation message with grouped materials.
Returns:
Dict with 'confirmation_message' (str) and 'materials_by_type' (dict)
"""
lines = [
f"实验已提交: {order_name} ({order_code})",
f"工作流: {workflow.get('workflow_name', '')} / {workflow.get('sub_workflow_name', '')}",
]
if order_ids:
lines.append(f"实验ID: {', '.join(order_ids)}")
# Group materials by type for better readability
grouped = {}
if material_records:
lines.append("所需物料与建议库位:")
for index, record in enumerate(material_records, 1):
name = record.get("materialName") or "未命名物料"
code = record.get("materialCode") or "-"
quantity = record.get("quantity") or "-"
material_type = record.get("materialTypeName") or record.get("materialTypeMode") or "-"
location = record.get("locationShowName") or record.get("locationCode") or "-"
lines.append(f"{index}. {name} [{code}], {quantity}, {material_type}, 建议库位 {location}")
lines.append("\n实验物料分配确认:")
grouped = self._group_materials_by_type(material_records)
for mode in ["Sample", "Consumables", "Reagent"]:
materials = grouped.get(mode, [])
if materials:
lines.append(f"\n{mode}")
for i, mat in enumerate(materials, 1):
name = mat.get("materialName") or "未命名物料"
code = mat.get("materialCode") or "-"
quantity = mat.get("quantity") or "-"
location = mat.get("locationShowName") or mat.get("locationCode") or "-"
lines.append(f" {i}. {name} ({code})")
lines.append(f" 数量: {quantity}, 位置: {location}")
else:
lines.append("所需物料与建议库位: LIMS 未返回预分配记录")
if suggested_locations:
lines.append("库位汇总:")
lines.append("\n库位汇总:")
for index, location in enumerate(suggested_locations, 1):
material = location.get("material_name") or "未命名物料"
code = location.get("location_code") or location.get("location_id") or "-"
lines.append(f"{index}. {material} -> {code}")
return "\n".join(lines)
return {
"confirmation_message": "\n".join(lines),
"materials_by_type": grouped,
}
def _group_materials_by_type(self, materials: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
"""Group materials by materialTypeMode (Sample/Consumables/Reagent)."""
grouped: Dict[str, List[Dict[str, Any]]] = {
"Sample": [],
"Consumables": [],
"Reagent": [],
}
for mat in materials:
mode = mat.get("materialTypeMode", "Unknown")
if mode not in grouped:
grouped[mode] = []
grouped[mode].append(mat)
return grouped
def _register_materials_to_tree(self, material_records: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Register Bioyond materials to UniLabOS resource tree after user confirmation."""
from unilabos.resources.bioyond.sirna_materials import get_material_class_by_type_code
deck = getattr(self, "deck", None)
if deck is None:
logger.warning("deck 未初始化,跳过 resource tree 注册")
return {"registered": [], "skipped": material_records, "reason": "no_deck"}
registered = []
skipped = []
for mat in material_records:
type_code = mat.get("materialTypeCode", "")
resource_class = get_material_class_by_type_code(type_code)
if resource_class is None:
logger.warning(f"未知 materialTypeCode {type_code},跳过: {mat.get('materialName')}")
skipped.append(mat)
continue
location_code = mat.get("locationShowName") or mat.get("locationCode") or ""
if not location_code:
skipped.append(mat)
continue
try:
warehouse, idx = self._resolve_location_to_warehouse(location_code)
except (ValueError, RuntimeError) as exc:
logger.warning(f"解析库位 {location_code} 失败: {exc}")
skipped.append(mat)
continue
material_code = mat.get("materialCode") or f"mat_{type_code}_{location_code}"
plr_resource = resource_class(name=material_code)
plr_resource.unilabos_extra = {
"material_bioyond_id": mat.get("materialId", ""),
"material_bioyond_name": mat.get("materialName", ""),
"material_bioyond_type": mat.get("materialTypeName", ""),
"material_bioyond_type_code": type_code,
"material_bioyond_type_mode": mat.get("materialTypeMode", ""),
"location_code": location_code,
}
try:
warehouse[idx] = plr_resource
registered.append({
"material_code": material_code,
"material_name": mat.get("materialName", ""),
"location_code": location_code,
"warehouse": warehouse.name,
"index": idx,
})
except (IndexError, TypeError) as exc:
logger.warning(f"放置物料 {material_code}{warehouse.name}[{idx}] 失败: {exc}")
skipped.append(mat)
self._publish_resource_tree_update()
return {"registered": registered, "skipped": skipped}
def _resolve_location_to_warehouse(self, location_code: str) -> Tuple[Any, int]:
"""Map Bioyond location code to (warehouse, index).
Location codes come from API response (e.g., "10-1", "4-13").
The deck defines warehouses with known row/col layouts:
- G3移液站: row 1, cols 1-14 (code "1-X" where X <= 14 and in G3 range)
- 自动化堆栈: rows 1-10, cols 1-17 (code "Y-X")
- 离心机配平板堆栈: rows 1-2, cols 1-1
Since both G3移液站 and 自动化堆栈 share row-based codes, we rely on
the deck warehouse configuration to determine which warehouse owns which
row/col range. The live API allocates materials using the same site codes
as defined in the deck setup.
"""
deck = getattr(self, "deck", None)
if deck is None:
raise RuntimeError("deck 未初始化")
parts = location_code.replace("-", "-").split("-")
if len(parts) != 2:
raise ValueError(f"无法解析库位代码: {location_code!r}")
row = int(parts[0])
col = int(parts[1])
# Try to find the matching warehouse by checking site keys
for child in deck.children:
warehouse = child
if not hasattr(warehouse, "sites"):
continue
site_key = f"{row}-{col}"
sites = warehouse.sites if hasattr(warehouse, "sites") else {}
if isinstance(sites, dict) and site_key in sites:
idx = list(sites.keys()).index(site_key)
return warehouse, idx
# Fallback: use row/col based heuristic matching deck dimensions
for child in deck.children:
warehouse = child
num_sites = getattr(warehouse, "num_items", 0) or len(getattr(warehouse, "children", []))
if num_sites == 0:
continue
num_cols = getattr(warehouse, "_num_items_y", None) or getattr(warehouse, "num_items_y", 1)
num_rows = getattr(warehouse, "_num_items_x", None) or getattr(warehouse, "num_items_x", 1)
if 1 <= row <= num_rows and 1 <= col <= num_cols:
idx = (row - 1) * num_cols + (col - 1)
if idx < num_sites:
return warehouse, idx
raise ValueError(f"未找到与库位 {location_code} 匹配的 warehouse")
def _publish_resource_tree_update(self) -> None:
"""Trigger ROS2 resource tree update for frontend refresh."""
ros_node = getattr(self, "_ros_node", None)
if ros_node is None:
return
try:
if hasattr(ros_node, "update_resource"):
deck = getattr(self, "deck", None)
if deck is not None:
ros_node.update_resource(
resource_name=deck.name,
resource_data=deck.serialize() if hasattr(deck, "serialize") else {},
)
except Exception as exc:
logger.warning(f"resource tree 更新失败 (非阻塞): {exc}")
def _record_name(self, record: Optional[Dict[str, Any]]) -> str:
if not isinstance(record, dict):

View File

@@ -0,0 +1,152 @@
"""Sirna Station Material Resource Definitions
Defines PyLabRobot resource classes for Bioyond Sirna station materials.
Each class is decorated with @resource for AST-based registry discovery.
"""
from typing import Optional
from collections import OrderedDict
from pylabrobot.resources import Plate, TipRack, Container
from unilabos.registry.decorators import resource
@resource(
id="bioyond_sirna_g3_200ul_tip_rack",
category=["labware", "tip_rack"],
description="G3-200ul枪头盒 for Sirna station",
)
class BioyondSirna_G3_200ul_TipRack(TipRack):
"""G3-200ul tip rack for Sirna liquid handling."""
def __init__(
self,
name: str,
with_tips: bool = True,
):
super().__init__(
name=name,
size_x=127.76,
size_y=85.48,
size_z=64.0,
model="bioyond_sirna_g3_200ul_tip_rack",
with_tips=with_tips,
ordering=OrderedDict(), # Empty ordering to satisfy PyLabRobot requirement
)
@resource(
id="bioyond_sirna_g3_50ul_tip_rack",
category=["labware", "tip_rack"],
description="G3-50ul枪头盒 for Sirna station",
)
class BioyondSirna_G3_50ul_TipRack(TipRack):
"""G3-50ul tip rack for Sirna liquid handling."""
def __init__(
self,
name: str,
with_tips: bool = True,
):
super().__init__(
name=name,
size_x=127.76,
size_y=85.48,
size_z=64.0,
model="bioyond_sirna_g3_50ul_tip_rack",
with_tips=with_tips,
ordering=OrderedDict(), # Empty ordering to satisfy PyLabRobot requirement
)
@resource(
id="bioyond_sirna_384_well_plate",
category=["labware", "plate"],
description="384孔板 for Sirna assays",
)
class BioyondSirna_384WellPlate(Plate):
"""384-well plate for Sirna reporter gene detection."""
def __init__(
self,
name: str,
lid: Optional[object] = None,
):
super().__init__(
name=name,
size_x=127.76,
size_y=85.48,
size_z=14.35,
lid=lid,
model="bioyond_sirna_384_well_plate",
plate_type="skirted",
)
@resource(
id="bioyond_sirna_cell_culture_plate",
category=["labware", "plate"],
description="细胞培养板 for Sirna cell culture",
)
class BioyondSirna_CellCulturePlate(Plate):
"""Cell culture plate for Sirna experiments."""
def __init__(
self,
name: str,
lid: Optional[object] = None,
):
super().__init__(
name=name,
size_x=127.76,
size_y=85.48,
size_z=14.35,
lid=lid,
model="bioyond_sirna_cell_culture_plate",
plate_type="skirted",
)
@resource(
id="bioyond_sirna_reagent_trough",
category=["labware", "trough"],
description="试剂槽 for Sirna reagents",
)
class BioyondSirna_ReagentTrough(Container):
"""Reagent trough for Sirna station reagents (RiboGreen, etc.)."""
def __init__(
self,
name: str,
max_volume: float = 300000.0, # 300mL default
):
super().__init__(
name=name,
size_x=127.76,
size_y=85.48,
size_z=44.0,
max_volume=max_volume,
model="bioyond_sirna_reagent_trough",
)
# Material type code mapping for dynamic instantiation
MATERIAL_TYPE_CODE_TO_CLASS = {
"0016": BioyondSirna_G3_200ul_TipRack,
"0017": BioyondSirna_G3_50ul_TipRack,
"0015": BioyondSirna_384WellPlate,
"0001": BioyondSirna_CellCulturePlate,
"0006": BioyondSirna_ReagentTrough,
}
def get_material_class_by_type_code(type_code: str):
"""Get resource class by Bioyond material type code.
Args:
type_code: Bioyond materialTypeCode (e.g., "0016", "0017")
Returns:
Resource class or None if not found
"""
return MATERIAL_TYPE_CODE_TO_CLASS.get(type_code)