diff --git a/unilabos/devices/workstation/bioyond_studio/sirna_station/sirna_station.py b/unilabos/devices/workstation/bioyond_studio/sirna_station/sirna_station.py index 8be5e576..52a36e17 100644 --- a/unilabos/devices/workstation/bioyond_studio/sirna_station/sirna_station.py +++ b/unilabos/devices/workstation/bioyond_studio/sirna_station/sirna_station.py @@ -10,10 +10,21 @@ import os import sys from datetime import datetime, timezone from pathlib import Path -from typing import Any, Dict, Iterable, List, Literal, Optional, Tuple +from typing import Annotated, Any, Dict, Iterable, List, Literal, Optional, Tuple from urllib import error, request from uuid import UUID +try: + from typing_extensions import TypedDict +except ImportError: # pragma: no cover - 仅用于轻量环境导入 + from typing import TypedDict # type: ignore + +try: + from pydantic import Field +except Exception: # pragma: no cover - 仅用于无 pydantic 的轻量环境导入 + def Field(*args: Any, **kwargs: Any) -> Dict[str, Any]: + return kwargs + if __package__ in {None, ""}: repo_root = Path(__file__).resolve().parents[5] if str(repo_root) not in sys.path: @@ -67,6 +78,23 @@ WORKFLOW_LIST_ENDPOINT = "/api/lims/workflow/work-flow-list" SUPPORTED_WORKFLOW_TYPES = {0, 1, 2} DEFAULT_READY_SIGNAL = "READY" DEFAULT_RESET_OPERATIONS = ("scheduler_reset", "reset_order_status", "reset_location") +DEFAULT_SIRNA_MATERIAL_TYPE_MAPPINGS = { + "bioyond_sirna_g3_200ul_tip_rack": ["G3-200ul枪头盒", ""], + "bioyond_sirna_g3_50ul_tip_rack": ["G3-50ul枪头盒", ""], + "bioyond_sirna_384_well_plate": ["384孔板", ""], + "bioyond_sirna_cell_culture_plate": ["细胞培养板", ""], + "bioyond_sirna_reagent_trough": ["试剂槽RiboGreen", ""], +} + + +class Experiment1RequiredParams(TypedDict): + sample_throughput: Annotated[int, Field(description="样品通量(1-96,必填),表示一次实验处理的样品数量。")] + + +class Experiment1OptionalParams(TypedDict, total=False): + order_name: Annotated[str, Field(description="订单名称(可选,自动生成)")] + parameter_overrides: Annotated[str, Field(description="参数覆盖(文本格式)")] + auto_register_materials: Annotated[bool, Field(default=True, description="是否自动注册物料(默认True)")] def _utc_now_iso8601_ms() -> str: @@ -90,6 +118,15 @@ def _workflow_list_data( } +def _apply_default_sirna_material_type_mappings(config: Dict[str, Any]) -> None: + configured = config.get("material_type_mappings") + if not isinstance(configured, dict): + configured = {} + merged = dict(DEFAULT_SIRNA_MATERIAL_TYPE_MAPPINGS) + merged.update(configured) + config["material_type_mappings"] = merged + + def load_sirna_config(config_path: str | Path) -> Dict[str, Any]: """从 JSON 文件读取小核酸站配置。""" path = Path(config_path) @@ -197,6 +234,7 @@ class BioyondSirnaStation(BioyondWorkstation): if bioyond_config: merged_config.update(bioyond_config) merged_config.update(kwargs) + _apply_default_sirna_material_type_mappings(merged_config) self._apply_env_api_config(merged_config) self.protocol_type = protocol_type @@ -294,53 +332,80 @@ class BioyondSirnaStation(BioyondWorkstation): always_free=True, node_type=NodeType.MANUAL_CONFIRM, placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"}, - goal_default={"timeout_seconds": 3600, "assignee_user_ids": []}, + goal_default={ + "optional_params": {"auto_register_materials": True}, + "timeout_seconds": 3600, + "assignee_user_ids": [], + }, feedback_interval=300, - description="提交小核酸实验", + description="提交小核酸实验1(报告基因检测)", ) - def submit_experiment( + def submit_experiment_1( self, - workflow_name: str = "", - sub_workflow_name: str = "", - order_code: str = "", - order_name: str = "", - sample_throughput: int = 4, - parameter_overrides: Optional[Dict[str, Any]] = None, - include_all_task_displayable: bool = False, - reset_before_create: bool = True, - reset_operations: Optional[ - List[Literal["scheduler_reset", "reset_order_status", "reset_location"]] - ] = None, - reset_order_id: str = "", - reset_location_id: str = "", - cleanup_order_code: str = "", - api_host: str = "", - api_key: str = "", - ready_signal: str = "READY", + required_params: Experiment1RequiredParams, + optional_params: Optional[Experiment1OptionalParams] = None, timeout_seconds: int = 3600, assignee_user_ids: Optional[List[str]] = None, **kwargs: Any, ) -> Dict[str, Any]: - """实时查询 LIMS 工作流和步骤参数,构造并提交 create-order。""" + """提交小核酸实验1(报告基因检测)到 Bioyond LIMS。 + + 自动查询实验1工作流参数,使用 API 默认值填充所有参数, + 创建订单并分配物料,最后将物料注册到 UniLabOS 资源树。 + + Args: + required_params: 必填参数组 + sample_throughput: 样品通量(1-96,必填),表示一次实验处理的样品数量。 + optional_params: 可选参数组 + order_name: 订单名称(可选,自动生成) + parameter_overrides: 参数覆盖(文本格式) + auto_register_materials: 是否自动注册物料(默认True) + timeout_seconds: 超时时间(秒,框架参数) + assignee_user_ids: 分配用户ID列表(框架参数) + + Returns: + 包含以下字段的字典: + - success (bool): 是否成功 + - order_code (str): 订单编号 + - order_name (str): 订单名称 + - order_ids (List[str]): 订单ID列表 + - materials (List[Dict]): 物料记录列表 + - materials_by_type (Dict): 按类型分组的物料 + - confirmation_message (str): 确认消息 + - registration_result (Dict): 物料注册结果 + """ + if isinstance(required_params, dict): + sample_throughput = required_params.get("sample_throughput") + else: + sample_throughput = required_params + sample_throughput = int(sample_throughput) + + if optional_params is None: + optional_params = {} + order_name = optional_params.get("order_name", "") + parameter_overrides = optional_params.get("parameter_overrides", "") + auto_register_materials = optional_params.get("auto_register_materials", True) + del timeout_seconds, assignee_user_ids, kwargs - self._update_runtime_api_config(api_host=api_host, api_key=api_key) - self._require_ready_signal(ready_signal) rpc = self._require_hardware_interface("create_order") - workflow = self._resolve_experiment_workflow( - rpc, - workflow_name=workflow_name, - sub_workflow_name=sub_workflow_name, - ) + + # 自动解析实验1工作流(无需用户指定workflow_name) + workflow = self._resolve_experiment_1_workflow(rpc) + step_data = rpc.workflow_step_query(workflow["sub_workflow_id"]) + + # Parse parameter_overrides from text format + parsed_overrides = self._parse_parameter_overrides_text(parameter_overrides) + param_values, parameter_template = self._build_param_values_from_step_data( step_data, - parameter_overrides=parameter_overrides or [], - include_all_task_displayable=include_all_task_displayable, + parameter_overrides=parsed_overrides, + include_all_task_displayable=True, ) if not param_values: raise RuntimeError("未从 LIMS 子工作流参数中提取到 create_order paramValues") - resolved_order_code, resolved_order_name = self._build_bioyond_order_identity(order_code, order_name) + resolved_order_code, resolved_order_name = self._build_bioyond_order_identity("", order_name) order_payload = [ { "orderCode": resolved_order_code, @@ -352,17 +417,7 @@ class BioyondSirnaStation(BioyondWorkstation): } ] - reset_result = None - if reset_before_create: - reset_result = self._run_reset_operations( - rpc, - reset_operations=reset_operations, - reset_order_id=reset_order_id, - reset_location_id=reset_location_id, - cleanup_order_code=cleanup_order_code, - ) - - logger.info(f"正在提交小核酸实验: {resolved_order_name} ({resolved_order_code})") + logger.info(f"正在提交小核酸实验1: {resolved_order_name} ({resolved_order_code})") raw_result = rpc.create_order(json.dumps(copy.deepcopy(order_payload), ensure_ascii=False)) parsed_result = self._parse_lims_result(raw_result) material_records = self._extract_create_order_materials(parsed_result) @@ -376,6 +431,24 @@ class BioyondSirnaStation(BioyondWorkstation): "order_name": resolved_order_name, "workflow": workflow, } + confirmation_data = self._format_create_order_confirmation( + order_code=resolved_order_code, + order_name=resolved_order_name, + workflow=workflow, + order_ids=order_ids, + material_records=material_records, + suggested_locations=suggested_locations, + ) + + registration_result = None + if auto_register_materials and material_records: + try: + registration_result = self._register_materials_to_tree(material_records) + logger.info(f"物料注册完成: {len(registration_result.get('registered', []))} 个物料已添加到资源树") + except Exception as e: + logger.error(f"物料注册失败: {e}") + registration_result = {"error": str(e)} + result = { "success": self._create_result_success(parsed_result, order_ids, material_records), "order_code": resolved_order_code, @@ -383,23 +456,54 @@ class BioyondSirnaStation(BioyondWorkstation): "order_ids": order_ids, "workflow": workflow, "sample_throughput": int(sample_throughput), - "reset_result": reset_result, "payload": order_payload, "parameter_template": parameter_template, "create_order_result": parsed_result, "materials": material_records, + "materials_by_type": confirmation_data.get("materials_by_type", {}), "suggested_locations": suggested_locations, "start_experiment": start_experiment_info, - "confirmation_message": self._format_create_order_confirmation( - order_code=resolved_order_code, - order_name=resolved_order_name, - workflow=workflow, - order_ids=order_ids, - material_records=material_records, - suggested_locations=suggested_locations, - ), + "confirmation_message": confirmation_data.get("confirmation_message", ""), + "registration_result": registration_result, } - return self._with_ready_signal(result) + return result + + def _parse_parameter_overrides_text(self, text: str) -> Dict[str, Any]: + """Parse parameter overrides from text format. + + Supports two formats: + 1. Key-value pairs: "key1=value1,key2=value2" + 2. JSON string: '{"key1": "value1", "key2": "value2"}' + + Args: + text: Parameter overrides as text + + Returns: + Dict of parameter overrides + """ + if not text or not text.strip(): + return {} + + text = text.strip() + + # Try JSON format first + if text.startswith("{"): + try: + return json.loads(text) + except json.JSONDecodeError: + logger.warning(f"无法解析 JSON 格式的参数覆盖: {text}") + return {} + + # Parse key=value,key=value format + result = {} + for pair in text.split(","): + pair = pair.strip() + if "=" not in pair: + continue + key, value = pair.split("=", 1) + result[key.strip()] = value.strip() + + return result @action( always_free=True, @@ -1164,31 +1268,188 @@ class BioyondSirnaStation(BioyondWorkstation): order_ids: List[str], material_records: List[Dict[str, Any]], suggested_locations: List[Dict[str, str]], - ) -> str: + ) -> Dict[str, Any]: + """Format create order confirmation message with grouped materials. + + Returns: + Dict with 'confirmation_message' (str) and 'materials_by_type' (dict) + """ lines = [ f"实验已提交: {order_name} ({order_code})", f"工作流: {workflow.get('workflow_name', '')} / {workflow.get('sub_workflow_name', '')}", ] if order_ids: lines.append(f"实验ID: {', '.join(order_ids)}") + + # Group materials by type for better readability + grouped = {} if material_records: - lines.append("所需物料与建议库位:") - for index, record in enumerate(material_records, 1): - name = record.get("materialName") or "未命名物料" - code = record.get("materialCode") or "-" - quantity = record.get("quantity") or "-" - material_type = record.get("materialTypeName") or record.get("materialTypeMode") or "-" - location = record.get("locationShowName") or record.get("locationCode") or "-" - lines.append(f"{index}. {name} [{code}], {quantity}, {material_type}, 建议库位 {location}") + lines.append("\n实验物料分配确认:") + grouped = self._group_materials_by_type(material_records) + + for mode in ["Sample", "Consumables", "Reagent"]: + materials = grouped.get(mode, []) + if materials: + lines.append(f"\n【{mode}】") + for i, mat in enumerate(materials, 1): + name = mat.get("materialName") or "未命名物料" + code = mat.get("materialCode") or "-" + quantity = mat.get("quantity") or "-" + location = mat.get("locationShowName") or mat.get("locationCode") or "-" + lines.append(f" {i}. {name} ({code})") + lines.append(f" 数量: {quantity}, 位置: {location}") else: lines.append("所需物料与建议库位: LIMS 未返回预分配记录") + if suggested_locations: - lines.append("库位汇总:") + lines.append("\n库位汇总:") for index, location in enumerate(suggested_locations, 1): material = location.get("material_name") or "未命名物料" code = location.get("location_code") or location.get("location_id") or "-" lines.append(f"{index}. {material} -> {code}") - return "\n".join(lines) + + return { + "confirmation_message": "\n".join(lines), + "materials_by_type": grouped, + } + + def _group_materials_by_type(self, materials: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]: + """Group materials by materialTypeMode (Sample/Consumables/Reagent).""" + grouped: Dict[str, List[Dict[str, Any]]] = { + "Sample": [], + "Consumables": [], + "Reagent": [], + } + for mat in materials: + mode = mat.get("materialTypeMode", "Unknown") + if mode not in grouped: + grouped[mode] = [] + grouped[mode].append(mat) + return grouped + + def _register_materials_to_tree(self, material_records: List[Dict[str, Any]]) -> Dict[str, Any]: + """Register Bioyond materials to UniLabOS resource tree after user confirmation.""" + from unilabos.resources.bioyond.sirna_materials import get_material_class_by_type_code + + deck = getattr(self, "deck", None) + if deck is None: + logger.warning("deck 未初始化,跳过 resource tree 注册") + return {"registered": [], "skipped": material_records, "reason": "no_deck"} + + registered = [] + skipped = [] + for mat in material_records: + type_code = mat.get("materialTypeCode", "") + resource_class = get_material_class_by_type_code(type_code) + if resource_class is None: + logger.warning(f"未知 materialTypeCode {type_code},跳过: {mat.get('materialName')}") + skipped.append(mat) + continue + + location_code = mat.get("locationShowName") or mat.get("locationCode") or "" + if not location_code: + skipped.append(mat) + continue + + try: + warehouse, idx = self._resolve_location_to_warehouse(location_code) + except (ValueError, RuntimeError) as exc: + logger.warning(f"解析库位 {location_code} 失败: {exc}") + skipped.append(mat) + continue + + material_code = mat.get("materialCode") or f"mat_{type_code}_{location_code}" + plr_resource = resource_class(name=material_code) + plr_resource.unilabos_extra = { + "material_bioyond_id": mat.get("materialId", ""), + "material_bioyond_name": mat.get("materialName", ""), + "material_bioyond_type": mat.get("materialTypeName", ""), + "material_bioyond_type_code": type_code, + "material_bioyond_type_mode": mat.get("materialTypeMode", ""), + "location_code": location_code, + } + + try: + warehouse[idx] = plr_resource + registered.append({ + "material_code": material_code, + "material_name": mat.get("materialName", ""), + "location_code": location_code, + "warehouse": warehouse.name, + "index": idx, + }) + except (IndexError, TypeError) as exc: + logger.warning(f"放置物料 {material_code} 到 {warehouse.name}[{idx}] 失败: {exc}") + skipped.append(mat) + + self._publish_resource_tree_update() + return {"registered": registered, "skipped": skipped} + + def _resolve_location_to_warehouse(self, location_code: str) -> Tuple[Any, int]: + """Map Bioyond location code to (warehouse, index). + + Location codes come from API response (e.g., "10-1", "4-13"). + The deck defines warehouses with known row/col layouts: + - G3移液站: row 1, cols 1-14 (code "1-X" where X <= 14 and in G3 range) + - 自动化堆栈: rows 1-10, cols 1-17 (code "Y-X") + - 离心机配平板堆栈: rows 1-2, cols 1-1 + + Since both G3移液站 and 自动化堆栈 share row-based codes, we rely on + the deck warehouse configuration to determine which warehouse owns which + row/col range. The live API allocates materials using the same site codes + as defined in the deck setup. + """ + deck = getattr(self, "deck", None) + if deck is None: + raise RuntimeError("deck 未初始化") + + parts = location_code.replace("-", "-").split("-") + if len(parts) != 2: + raise ValueError(f"无法解析库位代码: {location_code!r}") + row = int(parts[0]) + col = int(parts[1]) + + # Try to find the matching warehouse by checking site keys + for child in deck.children: + warehouse = child + if not hasattr(warehouse, "sites"): + continue + site_key = f"{row}-{col}" + sites = warehouse.sites if hasattr(warehouse, "sites") else {} + if isinstance(sites, dict) and site_key in sites: + idx = list(sites.keys()).index(site_key) + return warehouse, idx + + # Fallback: use row/col based heuristic matching deck dimensions + for child in deck.children: + warehouse = child + num_sites = getattr(warehouse, "num_items", 0) or len(getattr(warehouse, "children", [])) + if num_sites == 0: + continue + num_cols = getattr(warehouse, "_num_items_y", None) or getattr(warehouse, "num_items_y", 1) + num_rows = getattr(warehouse, "_num_items_x", None) or getattr(warehouse, "num_items_x", 1) + if 1 <= row <= num_rows and 1 <= col <= num_cols: + idx = (row - 1) * num_cols + (col - 1) + if idx < num_sites: + return warehouse, idx + + raise ValueError(f"未找到与库位 {location_code} 匹配的 warehouse") + + def _publish_resource_tree_update(self) -> None: + """Trigger ROS2 resource tree update for frontend refresh.""" + ros_node = getattr(self, "_ros_node", None) + if ros_node is None: + return + try: + if hasattr(ros_node, "update_resource"): + deck = getattr(self, "deck", None) + if deck is not None: + ros_node.update_resource( + resource_name=deck.name, + resource_data=deck.serialize() if hasattr(deck, "serialize") else {}, + ) + except Exception as exc: + logger.warning(f"resource tree 更新失败 (非阻塞): {exc}") def _record_name(self, record: Optional[Dict[str, Any]]) -> str: if not isinstance(record, dict): diff --git a/unilabos/resources/bioyond/sirna_materials.py b/unilabos/resources/bioyond/sirna_materials.py new file mode 100644 index 00000000..9ec44b06 --- /dev/null +++ b/unilabos/resources/bioyond/sirna_materials.py @@ -0,0 +1,152 @@ +"""Sirna Station Material Resource Definitions + +Defines PyLabRobot resource classes for Bioyond Sirna station materials. +Each class is decorated with @resource for AST-based registry discovery. +""" + +from typing import Optional +from collections import OrderedDict +from pylabrobot.resources import Plate, TipRack, Container + +from unilabos.registry.decorators import resource + + +@resource( + id="bioyond_sirna_g3_200ul_tip_rack", + category=["labware", "tip_rack"], + description="G3-200ul枪头盒 for Sirna station", +) +class BioyondSirna_G3_200ul_TipRack(TipRack): + """G3-200ul tip rack for Sirna liquid handling.""" + + def __init__( + self, + name: str, + with_tips: bool = True, + ): + super().__init__( + name=name, + size_x=127.76, + size_y=85.48, + size_z=64.0, + model="bioyond_sirna_g3_200ul_tip_rack", + with_tips=with_tips, + ordering=OrderedDict(), # Empty ordering to satisfy PyLabRobot requirement + ) + + +@resource( + id="bioyond_sirna_g3_50ul_tip_rack", + category=["labware", "tip_rack"], + description="G3-50ul枪头盒 for Sirna station", +) +class BioyondSirna_G3_50ul_TipRack(TipRack): + """G3-50ul tip rack for Sirna liquid handling.""" + + def __init__( + self, + name: str, + with_tips: bool = True, + ): + super().__init__( + name=name, + size_x=127.76, + size_y=85.48, + size_z=64.0, + model="bioyond_sirna_g3_50ul_tip_rack", + with_tips=with_tips, + ordering=OrderedDict(), # Empty ordering to satisfy PyLabRobot requirement + ) + + +@resource( + id="bioyond_sirna_384_well_plate", + category=["labware", "plate"], + description="384孔板 for Sirna assays", +) +class BioyondSirna_384WellPlate(Plate): + """384-well plate for Sirna reporter gene detection.""" + + def __init__( + self, + name: str, + lid: Optional[object] = None, + ): + super().__init__( + name=name, + size_x=127.76, + size_y=85.48, + size_z=14.35, + lid=lid, + model="bioyond_sirna_384_well_plate", + plate_type="skirted", + ) + + +@resource( + id="bioyond_sirna_cell_culture_plate", + category=["labware", "plate"], + description="细胞培养板 for Sirna cell culture", +) +class BioyondSirna_CellCulturePlate(Plate): + """Cell culture plate for Sirna experiments.""" + + def __init__( + self, + name: str, + lid: Optional[object] = None, + ): + super().__init__( + name=name, + size_x=127.76, + size_y=85.48, + size_z=14.35, + lid=lid, + model="bioyond_sirna_cell_culture_plate", + plate_type="skirted", + ) + + +@resource( + id="bioyond_sirna_reagent_trough", + category=["labware", "trough"], + description="试剂槽 for Sirna reagents", +) +class BioyondSirna_ReagentTrough(Container): + """Reagent trough for Sirna station reagents (RiboGreen, etc.).""" + + def __init__( + self, + name: str, + max_volume: float = 300000.0, # 300mL default + ): + super().__init__( + name=name, + size_x=127.76, + size_y=85.48, + size_z=44.0, + max_volume=max_volume, + model="bioyond_sirna_reagent_trough", + ) + + +# Material type code mapping for dynamic instantiation +MATERIAL_TYPE_CODE_TO_CLASS = { + "0016": BioyondSirna_G3_200ul_TipRack, + "0017": BioyondSirna_G3_50ul_TipRack, + "0015": BioyondSirna_384WellPlate, + "0001": BioyondSirna_CellCulturePlate, + "0006": BioyondSirna_ReagentTrough, +} + + +def get_material_class_by_type_code(type_code: str): + """Get resource class by Bioyond material type code. + + Args: + type_code: Bioyond materialTypeCode (e.g., "0016", "0017") + + Returns: + Resource class or None if not found + """ + return MATERIAL_TYPE_CODE_TO_CLASS.get(type_code)