From afddc6e40caf332afa8c866542250cb2f5015c42 Mon Sep 17 00:00:00 2001 From: q434343 <554662886@qq.com> Date: Tue, 31 Mar 2026 14:32:48 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=B8=8A=E4=BC=A0=E5=B7=A5?= =?UTF-8?q?=E4=BD=9C=E6=B5=81=E9=83=A8=E5=88=86=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- unilabos/workflow/common.py | 331 +++++++++++++++++- unilabos/workflow/convert_from_json.py | 22 +- .../legacy/convert_from_json_legacy.py | 7 + 3 files changed, 341 insertions(+), 19 deletions(-) diff --git a/unilabos/workflow/common.py b/unilabos/workflow/common.py index 7614e8a1..5965a460 100644 --- a/unilabos/workflow/common.py +++ b/unilabos/workflow/common.py @@ -152,6 +152,253 @@ def _map_deck_slot(raw_slot: str, object_type: str = "") -> str: return {"4": "13", "8": "14"}.get(s, s) +def _labware_def_index(labware_defs: Optional[List[Dict[str, Any]]]) -> Dict[str, Dict[str, Any]]: + m: Dict[str, Dict[str, Any]] = {} + for d in labware_defs or []: + for k in ("id", "name", "reagent_id", "reagent"): + key = d.get(k) + if key is not None and str(key): + m[str(key)] = d + return m + + +def _labware_hint_text(labware_id: str, item: Dict[str, Any]) -> str: + """合并 id 与协议里的 labware 描述(OpenTrons 全名常在 labware 字段)。""" + parts = [str(labware_id), str(item.get("labware", "") or "")] + return " ".join(parts).lower() + + +def _infer_reagent_kind(labware_id: str, item: Dict[str, Any]) -> str: + ot = (item.get("object") or "").strip().lower() + if ot == "trash": + return "trash" + if ot == "tiprack": + return "tip_rack" + lid = _labware_hint_text(labware_id, item) + if "trash" in lid: + return "trash" + # tiprack / tip + rack(顺序在 tuberack 之前) + if "tiprack" in lid or ("tip" in lid and "rack" in lid): + return "tip_rack" + # 离心管架 / OpenTrons tuberack(勿与 96 tiprack 混淆) + if "tuberack" in lid or "tube_rack" in lid: + return "tube_rack" + if "eppendorf" in lid and "rack" in lid: + return "tube_rack" + if "safelock" in lid and "rack" in lid: + return "tube_rack" + if "rack" in lid and "tip" not in lid: + return "tube_rack" + return "plate" + + +def _infer_tube_rack_num_positions(labware_id: str, item: Dict[str, Any]) -> int: + """从 ``24_tuberack`` 等命名中解析孔位数;解析不到则默认 24(与 PRCXI_EP_Adapter 4×6 一致)。""" + hint = _labware_hint_text(labware_id, item) + m = re.search(r"(\d+)_tuberack", hint) + if m: + return int(m.group(1)) + m = re.search(r"tuberack[_\s]*(\d+)", hint) + if m: + return int(m.group(1)) + m = re.search(r"(\d+)\s*[-_]?\s*pos(?:ition)?s?", hint) + if m: + return int(m.group(1)) + return 96 + + +def _tip_volume_hint(item: Dict[str, Any], labware_id: str) -> Optional[float]: + s = _labware_hint_text(labware_id, item) + for v in (1250, 1000, 300, 200, 10): + if f"{v}ul" in s or f"{v}μl" in s or f"{v}u" in s: + return float(v) + if f" {v} " in f" {s} ": + return float(v) + return None + + +def _volume_template_covers_requirement(template: Dict[str, Any], req: Optional[float], kind: str) -> bool: + """有明确需求体积时,模板标称 Volume 必须 >= 需求;无 Volume 的模板不参与(trash 除外)。""" + if kind == "trash": + return True + if req is None or req <= 0: + return True + mv = float(template.get("Volume") or 0) + if mv <= 0: + return False + return mv >= req + + +def _direct_labware_class_name(item: Dict[str, Any]) -> str: + """仅用于 tip_rack 且 ``preserve_tip_rack_incoming_class=True``:``class_name``/``class`` 原样;否则 ``labware`` → ``lab_*``。""" + explicit = item.get("class_name") or item.get("class") + if explicit is not None and str(explicit).strip() != "": + return str(explicit).strip() + lw = str(item.get("labware", "") or "").strip() + if lw: + return f"lab_{lw.lower().replace('.', 'point').replace(' ', '_')}" + return "" + + +def _match_score_prcxi_template( + template: Dict[str, Any], + num_children: int, + child_max_volume: Optional[float], +) -> float: + """孔数差主导;有需求体积且模板已满足 >= 时,余量比例 (模板-需求)/需求 越小越好(优先选刚好够的)。""" + hole_count = int(template.get("hole_count") or 0) + hole_diff = abs(num_children - hole_count) + material_volume = float(template.get("Volume") or 0) + req = child_max_volume + if req is not None and req > 0 and material_volume > 0: + vol_diff = (material_volume - req) / max(req, 1e-9) + elif material_volume > 0 and req is not None: + vol_diff = abs(float(req) - material_volume) / material_volume + else: + vol_diff = 0.0 + return hole_diff * 1000 + vol_diff + + +def _apply_prcxi_labware_auto_match( + labware_info: Dict[str, Dict[str, Any]], + labware_defs: Optional[List[Dict[str, Any]]] = None, + *, + preserve_tip_rack_incoming_class: bool = True, +) -> None: + """上传构建图前:按孔数+容量将 reagent 条目匹配到 ``prcxi_labware`` 注册类名,写入 ``prcxi_class_name``。 + 若给出需求体积,仅选用模板标称 Volume >= 该值的物料,并在满足条件的模板中选余量最小者。 + + ``preserve_tip_rack_incoming_class=True``(默认)时:**仅 tip_rack** 不做模板匹配,类名由 ``class_name``/``class`` 或 + ``labware``(``lab_*``)直接给出;**plate / tube_rack / trash 等**仍按注册模板匹配。 + ``False`` 时 **全部**(含 tip_rack)走模板匹配。""" + if not labware_info: + return + + default_prcxi_tip_class = CLASS_NAMES_MAPPING.get("tip_rack", "PRCXI_300ul_Tips") + + try: + from unilabos.devices.liquid_handling.prcxi.prcxi_labware import get_prcxi_labware_template_specs + except Exception: + return + + templates = get_prcxi_labware_template_specs() + if not templates: + return + + def_map = _labware_def_index(labware_defs) + + for labware_id, item in labware_info.items(): + if item.get("prcxi_class_name"): + continue + + kind = _infer_reagent_kind(labware_id, item) + + if preserve_tip_rack_incoming_class and kind == "tip_rack": + inc_s = _direct_labware_class_name(item) + if inc_s == default_prcxi_tip_class: + inc_s = "" + if inc_s: + item["prcxi_class_name"] = inc_s + continue + + explicit = item.get("class_name") or item.get("class") + if explicit and str(explicit).startswith("PRCXI_"): + item["prcxi_class_name"] = str(explicit) + continue + + extra = def_map.get(str(labware_id), {}) + + wells = item.get("well") or [] + well_n = len(wells) if isinstance(wells, list) else 0 + num_from_def = int(extra.get("num_wells") or extra.get("well_count") or item.get("num_wells") or 0) + + if kind == "trash": + num_children = 0 + elif kind == "tip_rack": + num_children = num_from_def if num_from_def > 0 else 96 + elif kind == "tube_rack": + if num_from_def > 0: + num_children = num_from_def + elif well_n > 0: + num_children = well_n + else: + num_children = _infer_tube_rack_num_positions(labware_id, item) + else: + num_children = num_from_def if num_from_def > 0 else 96 + + child_max_volume = item.get("max_volume") + if child_max_volume is None: + child_max_volume = extra.get("max_volume") + try: + child_max_volume_f = float(child_max_volume) if child_max_volume is not None else None + except (TypeError, ValueError): + child_max_volume_f = None + + if kind == "tip_rack" and child_max_volume_f is None: + child_max_volume_f = _tip_volume_hint(item, labware_id) or 300.0 + + candidates = [t for t in templates if t["kind"] == kind] + if not candidates: + continue + + best = None + best_score = float("inf") + for t in candidates: + if kind != "trash" and int(t.get("hole_count") or 0) <= 0: + continue + if not _volume_template_covers_requirement(t, child_max_volume_f, kind): + continue + sc = _match_score_prcxi_template(t, num_children, child_max_volume_f) + if sc < best_score: + best_score = sc + best = t + + if best: + item["prcxi_class_name"] = best["class_name"] + + +def _reconcile_slot_carrier_prcxi_class( + labware_info: Dict[str, Dict[str, Any]], + *, + preserve_tip_rack_incoming_class: bool = False, +) -> None: + """同一 deck 槽位上多条 reagent 时,按载体类型优先级统一 ``prcxi_class_name``,避免先遍历到 96 板后槽位被错误绑定。 + + ``preserve_tip_rack_incoming_class=True`` 时:tip_rack 条目不参与同槽类名合并(不被覆盖、也不把 tip 类名扩散到同槽其它条目)。""" + by_slot: Dict[str, List[Tuple[str, Dict[str, Any]]]] = {} + for lid, item in labware_info.items(): + ot = item.get("object", "") or "" + slot = _map_deck_slot(str(item.get("slot", "")), ot) + if not slot: + continue + by_slot.setdefault(str(slot), []).append((lid, item)) + + priority = {"trash": 0, "tube_rack": 1, "tip_rack": 2, "plate": 3} + + for _slot, pairs in by_slot.items(): + if len(pairs) < 2: + continue + + def _rank(p: Tuple[str, Dict[str, Any]]) -> int: + return priority.get(_infer_reagent_kind(p[0], p[1]), 9) + + pairs_sorted = sorted(pairs, key=_rank) + best_cls = None + for lid, it in pairs_sorted: + if preserve_tip_rack_incoming_class and _infer_reagent_kind(lid, it) == "tip_rack": + continue + c = it.get("prcxi_class_name") + if c: + best_cls = c + break + if not best_cls: + continue + for lid, it in pairs: + if preserve_tip_rack_incoming_class and _infer_reagent_kind(lid, it) == "tip_rack": + continue + it["prcxi_class_name"] = best_cls + + # ---------------- Graph ---------------- @@ -377,6 +624,7 @@ def build_protocol_graph( workstation_name: str, action_resource_mapping: Optional[Dict[str, str]] = None, labware_defs: Optional[List[Dict[str, Any]]] = None, + preserve_tip_rack_incoming_class: bool = True, ) -> WorkflowGraph: """统一的协议图构建函数,根据设备类型自动选择构建逻辑 @@ -385,28 +633,67 @@ def build_protocol_graph( protocol_steps: 协议步骤列表 workstation_name: 工作站名称 action_resource_mapping: action 到 resource_name 的映射字典,可选 + labware_defs: 可选,``[{"id": "...", "num_wells": 96, "max_volume": 2200}, ...]`` 等,辅助 PRCXI 模板匹配 + preserve_tip_rack_incoming_class: 默认 True 时**仅 tip_rack** 不跑模板匹配(类名由传入的 class/labware 决定); + **其它载体**仍按 PRCXI 模板匹配。False 时 **全部**(含 tip_rack)都走模板匹配。 """ G = WorkflowGraph() resource_last_writer = {} # reagent_name -> "node_id:port" slot_to_create_resource = {} # slot -> create_resource node_id + _apply_prcxi_labware_auto_match( + labware_info, + labware_defs, + preserve_tip_rack_incoming_class=preserve_tip_rack_incoming_class, + ) + _reconcile_slot_carrier_prcxi_class( + labware_info, + preserve_tip_rack_incoming_class=preserve_tip_rack_incoming_class, + ) + protocol_steps = refactor_data(protocol_steps, action_resource_mapping) # ==================== 第一步:按 slot 去重创建 create_resource 节点 ==================== - # 收集所有唯一的 slot - slots_info = {} # slot -> {labware, res_id} + # 按槽聚合:同一 slot 多条 reagent 时不能只取遍历顺序第一条,否则 tip 的 prcxi_class_name / object 会被其它条目盖住 + by_slot: Dict[str, List[Tuple[str, Dict[str, Any]]]] = {} for labware_id, item in labware_info.items(): object_type = item.get("object", "") or "" slot = _map_deck_slot(str(item.get("slot", "")), object_type) - labware = item.get("labware", "") - if slot and slot not in slots_info: - res_id = f"{labware}_slot_{slot}" - slots_info[slot] = { - "labware": labware, - "res_id": res_id, - "labware_id": labware_id, - "object": object_type, - } + if not slot: + continue + by_slot.setdefault(slot, []).append((labware_id, item)) + + slots_info: Dict[str, Dict[str, Any]] = {} + for slot, pairs in by_slot.items(): + def _ot_tip(it: Dict[str, Any]) -> bool: + return str(it.get("object", "") or "").strip().lower() == "tiprack" + + tip_pairs = [(lid, it) for lid, it in pairs if _ot_tip(it)] + chosen_lid = "" + chosen_item: Dict[str, Any] = {} + prcxi_val: Optional[str] = None + + scan = tip_pairs if tip_pairs else pairs + for lid, it in scan: + c = it.get("prcxi_class_name") + if c: + chosen_lid, chosen_item, prcxi_val = lid, it, str(c) + break + if not chosen_lid and scan: + chosen_lid, chosen_item = scan[0] + pv = chosen_item.get("prcxi_class_name") + prcxi_val = str(pv) if pv else None + + labware = str(chosen_item.get("labware", "") or "") + res_id = f"{labware}_slot_{slot}" if labware.strip() else f"{chosen_lid}_slot_{slot}" + res_id = res_id.replace(" ", "_") + slots_info[slot] = { + "labware": labware, + "res_id": res_id, + "labware_id": chosen_lid, + "object": chosen_item.get("object", "") or "", + "prcxi_class_name": prcxi_val, + } # 创建 Group 节点,包含所有 create_resource 节点 group_node_id = str(uuid.uuid4()) @@ -429,11 +716,21 @@ def build_protocol_graph( for slot, info in slots_info.items(): node_id = str(uuid.uuid4()) res_id = info["res_id"] - res_type_name = info["labware"].lower().replace(".", "point") - object_type = info.get("object", "") - res_type_name = f"lab_{res_type_name}" - if object_type == "trash": + object_type = info.get("object", "") or "" + ot_lo = str(object_type).strip().lower() + matched = info.get("prcxi_class_name") + if ot_lo == "trash": res_type_name = "PRCXI_trash" + elif matched: + res_type_name = matched + elif ot_lo == "tiprack": + if preserve_tip_rack_incoming_class: + lid = str(info.get("labware_id") or "").strip() or "tip_rack" + res_type_name = f"lab_{lid.lower().replace('.', 'point').replace(' ', '_')}" + else: + res_type_name = CLASS_NAMES_MAPPING.get("tip_rack", "PRCXI_300ul_Tips") + else: + res_type_name = f"lab_{info['labware'].lower().replace('.', 'point')}" G.add_node( node_id, template_name="create_resource", @@ -456,9 +753,9 @@ def build_protocol_graph( }, ) slot_to_create_resource[slot] = node_id - if object_type == "tiprack": + if ot_lo == "tiprack": resource_last_writer[info["labware_id"]] = f"{node_id}:labware" - if object_type == "trash": + if ot_lo == "trash": trash_create_node_id = node_id # create_resource 之间不需要 ready 连接 diff --git a/unilabos/workflow/convert_from_json.py b/unilabos/workflow/convert_from_json.py index acd0d71a..cd88552b 100644 --- a/unilabos/workflow/convert_from_json.py +++ b/unilabos/workflow/convert_from_json.py @@ -210,6 +210,7 @@ def convert_from_json( data: Union[str, PathLike, Dict[str, Any]], workstation_name: str = DEFAULT_WORKSTATION, validate: bool = True, + preserve_tip_rack_incoming_class: bool = True, ) -> WorkflowGraph: """ 从 JSON 数据或文件转换为 WorkflowGraph @@ -221,6 +222,8 @@ def convert_from_json( data: JSON 文件路径、字典数据、或 JSON 字符串 workstation_name: 工作站名称,默认 "PRCXi" validate: 是否校验句柄配置,默认 True + preserve_tip_rack_incoming_class: True(默认)时仅 tip_rack 不跑模板、按传入类名/labware;其它载体仍自动匹配。 + False 时全部走模板。JSON 根 ``preserve_tip_rack_incoming_class`` 可覆盖此参数。 Returns: WorkflowGraph: 构建好的工作流图 @@ -263,6 +266,10 @@ def convert_from_json( # reagent 已经是字典格式,用于 set_liquid 和 well 数量查找 labware_info = reagent + preserve = preserve_tip_rack_incoming_class + if "preserve_tip_rack_incoming_class" in json_data: + preserve = bool(json_data["preserve_tip_rack_incoming_class"]) + # 构建工作流图 graph = build_protocol_graph( labware_info=labware_info, @@ -270,6 +277,7 @@ def convert_from_json( workstation_name=workstation_name, action_resource_mapping=ACTION_RESOURCE_MAPPING, labware_defs=labware_defs, + preserve_tip_rack_incoming_class=preserve, ) # 校验句柄配置 @@ -287,6 +295,7 @@ def convert_from_json( def convert_json_to_node_link( data: Union[str, PathLike, Dict[str, Any]], workstation_name: str = DEFAULT_WORKSTATION, + preserve_tip_rack_incoming_class: bool = True, ) -> Dict[str, Any]: """ 将 JSON 数据转换为 node-link 格式的字典 @@ -298,13 +307,18 @@ def convert_json_to_node_link( Returns: Dict: node-link 格式的工作流数据 """ - graph = convert_from_json(data, workstation_name) + graph = convert_from_json( + data, + workstation_name, + preserve_tip_rack_incoming_class=preserve_tip_rack_incoming_class, + ) return graph.to_node_link_dict() def convert_json_to_workflow_list( data: Union[str, PathLike, Dict[str, Any]], workstation_name: str = DEFAULT_WORKSTATION, + preserve_tip_rack_incoming_class: bool = True, ) -> List[Dict[str, Any]]: """ 将 JSON 数据转换为工作流列表格式 @@ -316,5 +330,9 @@ def convert_json_to_workflow_list( Returns: List: 工作流节点列表 """ - graph = convert_from_json(data, workstation_name) + graph = convert_from_json( + data, + workstation_name, + preserve_tip_rack_incoming_class=preserve_tip_rack_incoming_class, + ) return graph.to_dict() diff --git a/unilabos/workflow/legacy/convert_from_json_legacy.py b/unilabos/workflow/legacy/convert_from_json_legacy.py index 7a6d2b40..3d830d94 100644 --- a/unilabos/workflow/legacy/convert_from_json_legacy.py +++ b/unilabos/workflow/legacy/convert_from_json_legacy.py @@ -234,6 +234,7 @@ def convert_from_json( data: Union[str, PathLike, Dict[str, Any]], workstation_name: str = "PRCXi", validate: bool = True, + preserve_tip_rack_incoming_class: bool = True, ) -> WorkflowGraph: """ 从 JSON 数据或文件转换为 WorkflowGraph @@ -246,6 +247,7 @@ def convert_from_json( data: JSON 文件路径、字典数据、或 JSON 字符串 workstation_name: 工作站名称,默认 "PRCXi" validate: 是否校验句柄配置,默认 True + preserve_tip_rack_incoming_class: True 时仅 tip 不跑模板;False 时全部匹配;JSON 根字段同名可覆盖 Returns: WorkflowGraph: 构建好的工作流图 @@ -295,12 +297,17 @@ def convert_from_json( "3. {'steps': [...], 'labware': [...]}" ) + preserve = preserve_tip_rack_incoming_class + if "preserve_tip_rack_incoming_class" in json_data: + preserve = bool(json_data["preserve_tip_rack_incoming_class"]) + # 构建工作流图 graph = build_protocol_graph( labware_info=labware_info, protocol_steps=protocol_steps, workstation_name=workstation_name, action_resource_mapping=ACTION_RESOURCE_MAPPING, + preserve_tip_rack_incoming_class=preserve, ) # 校验句柄配置