mirror of
https://github.com/deepmodeling/Uni-Lab-OS
synced 2026-03-31 18:24:23 +00:00
修改上传工作流部分代码
This commit is contained in:
@@ -152,6 +152,253 @@ def _map_deck_slot(raw_slot: str, object_type: str = "") -> str:
|
||||
return {"4": "13", "8": "14"}.get(s, s)
|
||||
|
||||
|
||||
def _labware_def_index(labware_defs: Optional[List[Dict[str, Any]]]) -> Dict[str, Dict[str, Any]]:
|
||||
m: Dict[str, Dict[str, Any]] = {}
|
||||
for d in labware_defs or []:
|
||||
for k in ("id", "name", "reagent_id", "reagent"):
|
||||
key = d.get(k)
|
||||
if key is not None and str(key):
|
||||
m[str(key)] = d
|
||||
return m
|
||||
|
||||
|
||||
def _labware_hint_text(labware_id: str, item: Dict[str, Any]) -> str:
|
||||
"""合并 id 与协议里的 labware 描述(OpenTrons 全名常在 labware 字段)。"""
|
||||
parts = [str(labware_id), str(item.get("labware", "") or "")]
|
||||
return " ".join(parts).lower()
|
||||
|
||||
|
||||
def _infer_reagent_kind(labware_id: str, item: Dict[str, Any]) -> str:
|
||||
ot = (item.get("object") or "").strip().lower()
|
||||
if ot == "trash":
|
||||
return "trash"
|
||||
if ot == "tiprack":
|
||||
return "tip_rack"
|
||||
lid = _labware_hint_text(labware_id, item)
|
||||
if "trash" in lid:
|
||||
return "trash"
|
||||
# tiprack / tip + rack(顺序在 tuberack 之前)
|
||||
if "tiprack" in lid or ("tip" in lid and "rack" in lid):
|
||||
return "tip_rack"
|
||||
# 离心管架 / OpenTrons tuberack(勿与 96 tiprack 混淆)
|
||||
if "tuberack" in lid or "tube_rack" in lid:
|
||||
return "tube_rack"
|
||||
if "eppendorf" in lid and "rack" in lid:
|
||||
return "tube_rack"
|
||||
if "safelock" in lid and "rack" in lid:
|
||||
return "tube_rack"
|
||||
if "rack" in lid and "tip" not in lid:
|
||||
return "tube_rack"
|
||||
return "plate"
|
||||
|
||||
|
||||
def _infer_tube_rack_num_positions(labware_id: str, item: Dict[str, Any]) -> int:
|
||||
"""从 ``24_tuberack`` 等命名中解析孔位数;解析不到则默认 24(与 PRCXI_EP_Adapter 4×6 一致)。"""
|
||||
hint = _labware_hint_text(labware_id, item)
|
||||
m = re.search(r"(\d+)_tuberack", hint)
|
||||
if m:
|
||||
return int(m.group(1))
|
||||
m = re.search(r"tuberack[_\s]*(\d+)", hint)
|
||||
if m:
|
||||
return int(m.group(1))
|
||||
m = re.search(r"(\d+)\s*[-_]?\s*pos(?:ition)?s?", hint)
|
||||
if m:
|
||||
return int(m.group(1))
|
||||
return 96
|
||||
|
||||
|
||||
def _tip_volume_hint(item: Dict[str, Any], labware_id: str) -> Optional[float]:
|
||||
s = _labware_hint_text(labware_id, item)
|
||||
for v in (1250, 1000, 300, 200, 10):
|
||||
if f"{v}ul" in s or f"{v}μl" in s or f"{v}u" in s:
|
||||
return float(v)
|
||||
if f" {v} " in f" {s} ":
|
||||
return float(v)
|
||||
return None
|
||||
|
||||
|
||||
def _volume_template_covers_requirement(template: Dict[str, Any], req: Optional[float], kind: str) -> bool:
|
||||
"""有明确需求体积时,模板标称 Volume 必须 >= 需求;无 Volume 的模板不参与(trash 除外)。"""
|
||||
if kind == "trash":
|
||||
return True
|
||||
if req is None or req <= 0:
|
||||
return True
|
||||
mv = float(template.get("Volume") or 0)
|
||||
if mv <= 0:
|
||||
return False
|
||||
return mv >= req
|
||||
|
||||
|
||||
def _direct_labware_class_name(item: Dict[str, Any]) -> str:
|
||||
"""仅用于 tip_rack 且 ``preserve_tip_rack_incoming_class=True``:``class_name``/``class`` 原样;否则 ``labware`` → ``lab_*``。"""
|
||||
explicit = item.get("class_name") or item.get("class")
|
||||
if explicit is not None and str(explicit).strip() != "":
|
||||
return str(explicit).strip()
|
||||
lw = str(item.get("labware", "") or "").strip()
|
||||
if lw:
|
||||
return f"lab_{lw.lower().replace('.', 'point').replace(' ', '_')}"
|
||||
return ""
|
||||
|
||||
|
||||
def _match_score_prcxi_template(
|
||||
template: Dict[str, Any],
|
||||
num_children: int,
|
||||
child_max_volume: Optional[float],
|
||||
) -> float:
|
||||
"""孔数差主导;有需求体积且模板已满足 >= 时,余量比例 (模板-需求)/需求 越小越好(优先选刚好够的)。"""
|
||||
hole_count = int(template.get("hole_count") or 0)
|
||||
hole_diff = abs(num_children - hole_count)
|
||||
material_volume = float(template.get("Volume") or 0)
|
||||
req = child_max_volume
|
||||
if req is not None and req > 0 and material_volume > 0:
|
||||
vol_diff = (material_volume - req) / max(req, 1e-9)
|
||||
elif material_volume > 0 and req is not None:
|
||||
vol_diff = abs(float(req) - material_volume) / material_volume
|
||||
else:
|
||||
vol_diff = 0.0
|
||||
return hole_diff * 1000 + vol_diff
|
||||
|
||||
|
||||
def _apply_prcxi_labware_auto_match(
|
||||
labware_info: Dict[str, Dict[str, Any]],
|
||||
labware_defs: Optional[List[Dict[str, Any]]] = None,
|
||||
*,
|
||||
preserve_tip_rack_incoming_class: bool = True,
|
||||
) -> None:
|
||||
"""上传构建图前:按孔数+容量将 reagent 条目匹配到 ``prcxi_labware`` 注册类名,写入 ``prcxi_class_name``。
|
||||
若给出需求体积,仅选用模板标称 Volume >= 该值的物料,并在满足条件的模板中选余量最小者。
|
||||
|
||||
``preserve_tip_rack_incoming_class=True``(默认)时:**仅 tip_rack** 不做模板匹配,类名由 ``class_name``/``class`` 或
|
||||
``labware``(``lab_*``)直接给出;**plate / tube_rack / trash 等**仍按注册模板匹配。
|
||||
``False`` 时 **全部**(含 tip_rack)走模板匹配。"""
|
||||
if not labware_info:
|
||||
return
|
||||
|
||||
default_prcxi_tip_class = CLASS_NAMES_MAPPING.get("tip_rack", "PRCXI_300ul_Tips")
|
||||
|
||||
try:
|
||||
from unilabos.devices.liquid_handling.prcxi.prcxi_labware import get_prcxi_labware_template_specs
|
||||
except Exception:
|
||||
return
|
||||
|
||||
templates = get_prcxi_labware_template_specs()
|
||||
if not templates:
|
||||
return
|
||||
|
||||
def_map = _labware_def_index(labware_defs)
|
||||
|
||||
for labware_id, item in labware_info.items():
|
||||
if item.get("prcxi_class_name"):
|
||||
continue
|
||||
|
||||
kind = _infer_reagent_kind(labware_id, item)
|
||||
|
||||
if preserve_tip_rack_incoming_class and kind == "tip_rack":
|
||||
inc_s = _direct_labware_class_name(item)
|
||||
if inc_s == default_prcxi_tip_class:
|
||||
inc_s = ""
|
||||
if inc_s:
|
||||
item["prcxi_class_name"] = inc_s
|
||||
continue
|
||||
|
||||
explicit = item.get("class_name") or item.get("class")
|
||||
if explicit and str(explicit).startswith("PRCXI_"):
|
||||
item["prcxi_class_name"] = str(explicit)
|
||||
continue
|
||||
|
||||
extra = def_map.get(str(labware_id), {})
|
||||
|
||||
wells = item.get("well") or []
|
||||
well_n = len(wells) if isinstance(wells, list) else 0
|
||||
num_from_def = int(extra.get("num_wells") or extra.get("well_count") or item.get("num_wells") or 0)
|
||||
|
||||
if kind == "trash":
|
||||
num_children = 0
|
||||
elif kind == "tip_rack":
|
||||
num_children = num_from_def if num_from_def > 0 else 96
|
||||
elif kind == "tube_rack":
|
||||
if num_from_def > 0:
|
||||
num_children = num_from_def
|
||||
elif well_n > 0:
|
||||
num_children = well_n
|
||||
else:
|
||||
num_children = _infer_tube_rack_num_positions(labware_id, item)
|
||||
else:
|
||||
num_children = num_from_def if num_from_def > 0 else 96
|
||||
|
||||
child_max_volume = item.get("max_volume")
|
||||
if child_max_volume is None:
|
||||
child_max_volume = extra.get("max_volume")
|
||||
try:
|
||||
child_max_volume_f = float(child_max_volume) if child_max_volume is not None else None
|
||||
except (TypeError, ValueError):
|
||||
child_max_volume_f = None
|
||||
|
||||
if kind == "tip_rack" and child_max_volume_f is None:
|
||||
child_max_volume_f = _tip_volume_hint(item, labware_id) or 300.0
|
||||
|
||||
candidates = [t for t in templates if t["kind"] == kind]
|
||||
if not candidates:
|
||||
continue
|
||||
|
||||
best = None
|
||||
best_score = float("inf")
|
||||
for t in candidates:
|
||||
if kind != "trash" and int(t.get("hole_count") or 0) <= 0:
|
||||
continue
|
||||
if not _volume_template_covers_requirement(t, child_max_volume_f, kind):
|
||||
continue
|
||||
sc = _match_score_prcxi_template(t, num_children, child_max_volume_f)
|
||||
if sc < best_score:
|
||||
best_score = sc
|
||||
best = t
|
||||
|
||||
if best:
|
||||
item["prcxi_class_name"] = best["class_name"]
|
||||
|
||||
|
||||
def _reconcile_slot_carrier_prcxi_class(
|
||||
labware_info: Dict[str, Dict[str, Any]],
|
||||
*,
|
||||
preserve_tip_rack_incoming_class: bool = False,
|
||||
) -> None:
|
||||
"""同一 deck 槽位上多条 reagent 时,按载体类型优先级统一 ``prcxi_class_name``,避免先遍历到 96 板后槽位被错误绑定。
|
||||
|
||||
``preserve_tip_rack_incoming_class=True`` 时:tip_rack 条目不参与同槽类名合并(不被覆盖、也不把 tip 类名扩散到同槽其它条目)。"""
|
||||
by_slot: Dict[str, List[Tuple[str, Dict[str, Any]]]] = {}
|
||||
for lid, item in labware_info.items():
|
||||
ot = item.get("object", "") or ""
|
||||
slot = _map_deck_slot(str(item.get("slot", "")), ot)
|
||||
if not slot:
|
||||
continue
|
||||
by_slot.setdefault(str(slot), []).append((lid, item))
|
||||
|
||||
priority = {"trash": 0, "tube_rack": 1, "tip_rack": 2, "plate": 3}
|
||||
|
||||
for _slot, pairs in by_slot.items():
|
||||
if len(pairs) < 2:
|
||||
continue
|
||||
|
||||
def _rank(p: Tuple[str, Dict[str, Any]]) -> int:
|
||||
return priority.get(_infer_reagent_kind(p[0], p[1]), 9)
|
||||
|
||||
pairs_sorted = sorted(pairs, key=_rank)
|
||||
best_cls = None
|
||||
for lid, it in pairs_sorted:
|
||||
if preserve_tip_rack_incoming_class and _infer_reagent_kind(lid, it) == "tip_rack":
|
||||
continue
|
||||
c = it.get("prcxi_class_name")
|
||||
if c:
|
||||
best_cls = c
|
||||
break
|
||||
if not best_cls:
|
||||
continue
|
||||
for lid, it in pairs:
|
||||
if preserve_tip_rack_incoming_class and _infer_reagent_kind(lid, it) == "tip_rack":
|
||||
continue
|
||||
it["prcxi_class_name"] = best_cls
|
||||
|
||||
|
||||
# ---------------- Graph ----------------
|
||||
|
||||
|
||||
@@ -377,6 +624,7 @@ def build_protocol_graph(
|
||||
workstation_name: str,
|
||||
action_resource_mapping: Optional[Dict[str, str]] = None,
|
||||
labware_defs: Optional[List[Dict[str, Any]]] = None,
|
||||
preserve_tip_rack_incoming_class: bool = True,
|
||||
) -> WorkflowGraph:
|
||||
"""统一的协议图构建函数,根据设备类型自动选择构建逻辑
|
||||
|
||||
@@ -385,28 +633,67 @@ def build_protocol_graph(
|
||||
protocol_steps: 协议步骤列表
|
||||
workstation_name: 工作站名称
|
||||
action_resource_mapping: action 到 resource_name 的映射字典,可选
|
||||
labware_defs: 可选,``[{"id": "...", "num_wells": 96, "max_volume": 2200}, ...]`` 等,辅助 PRCXI 模板匹配
|
||||
preserve_tip_rack_incoming_class: 默认 True 时**仅 tip_rack** 不跑模板匹配(类名由传入的 class/labware 决定);
|
||||
**其它载体**仍按 PRCXI 模板匹配。False 时 **全部**(含 tip_rack)都走模板匹配。
|
||||
"""
|
||||
G = WorkflowGraph()
|
||||
resource_last_writer = {} # reagent_name -> "node_id:port"
|
||||
slot_to_create_resource = {} # slot -> create_resource node_id
|
||||
|
||||
_apply_prcxi_labware_auto_match(
|
||||
labware_info,
|
||||
labware_defs,
|
||||
preserve_tip_rack_incoming_class=preserve_tip_rack_incoming_class,
|
||||
)
|
||||
_reconcile_slot_carrier_prcxi_class(
|
||||
labware_info,
|
||||
preserve_tip_rack_incoming_class=preserve_tip_rack_incoming_class,
|
||||
)
|
||||
|
||||
protocol_steps = refactor_data(protocol_steps, action_resource_mapping)
|
||||
|
||||
# ==================== 第一步:按 slot 去重创建 create_resource 节点 ====================
|
||||
# 收集所有唯一的 slot
|
||||
slots_info = {} # slot -> {labware, res_id}
|
||||
# 按槽聚合:同一 slot 多条 reagent 时不能只取遍历顺序第一条,否则 tip 的 prcxi_class_name / object 会被其它条目盖住
|
||||
by_slot: Dict[str, List[Tuple[str, Dict[str, Any]]]] = {}
|
||||
for labware_id, item in labware_info.items():
|
||||
object_type = item.get("object", "") or ""
|
||||
slot = _map_deck_slot(str(item.get("slot", "")), object_type)
|
||||
labware = item.get("labware", "")
|
||||
if slot and slot not in slots_info:
|
||||
res_id = f"{labware}_slot_{slot}"
|
||||
slots_info[slot] = {
|
||||
"labware": labware,
|
||||
"res_id": res_id,
|
||||
"labware_id": labware_id,
|
||||
"object": object_type,
|
||||
}
|
||||
if not slot:
|
||||
continue
|
||||
by_slot.setdefault(slot, []).append((labware_id, item))
|
||||
|
||||
slots_info: Dict[str, Dict[str, Any]] = {}
|
||||
for slot, pairs in by_slot.items():
|
||||
def _ot_tip(it: Dict[str, Any]) -> bool:
|
||||
return str(it.get("object", "") or "").strip().lower() == "tiprack"
|
||||
|
||||
tip_pairs = [(lid, it) for lid, it in pairs if _ot_tip(it)]
|
||||
chosen_lid = ""
|
||||
chosen_item: Dict[str, Any] = {}
|
||||
prcxi_val: Optional[str] = None
|
||||
|
||||
scan = tip_pairs if tip_pairs else pairs
|
||||
for lid, it in scan:
|
||||
c = it.get("prcxi_class_name")
|
||||
if c:
|
||||
chosen_lid, chosen_item, prcxi_val = lid, it, str(c)
|
||||
break
|
||||
if not chosen_lid and scan:
|
||||
chosen_lid, chosen_item = scan[0]
|
||||
pv = chosen_item.get("prcxi_class_name")
|
||||
prcxi_val = str(pv) if pv else None
|
||||
|
||||
labware = str(chosen_item.get("labware", "") or "")
|
||||
res_id = f"{labware}_slot_{slot}" if labware.strip() else f"{chosen_lid}_slot_{slot}"
|
||||
res_id = res_id.replace(" ", "_")
|
||||
slots_info[slot] = {
|
||||
"labware": labware,
|
||||
"res_id": res_id,
|
||||
"labware_id": chosen_lid,
|
||||
"object": chosen_item.get("object", "") or "",
|
||||
"prcxi_class_name": prcxi_val,
|
||||
}
|
||||
|
||||
# 创建 Group 节点,包含所有 create_resource 节点
|
||||
group_node_id = str(uuid.uuid4())
|
||||
@@ -429,11 +716,21 @@ def build_protocol_graph(
|
||||
for slot, info in slots_info.items():
|
||||
node_id = str(uuid.uuid4())
|
||||
res_id = info["res_id"]
|
||||
res_type_name = info["labware"].lower().replace(".", "point")
|
||||
object_type = info.get("object", "")
|
||||
res_type_name = f"lab_{res_type_name}"
|
||||
if object_type == "trash":
|
||||
object_type = info.get("object", "") or ""
|
||||
ot_lo = str(object_type).strip().lower()
|
||||
matched = info.get("prcxi_class_name")
|
||||
if ot_lo == "trash":
|
||||
res_type_name = "PRCXI_trash"
|
||||
elif matched:
|
||||
res_type_name = matched
|
||||
elif ot_lo == "tiprack":
|
||||
if preserve_tip_rack_incoming_class:
|
||||
lid = str(info.get("labware_id") or "").strip() or "tip_rack"
|
||||
res_type_name = f"lab_{lid.lower().replace('.', 'point').replace(' ', '_')}"
|
||||
else:
|
||||
res_type_name = CLASS_NAMES_MAPPING.get("tip_rack", "PRCXI_300ul_Tips")
|
||||
else:
|
||||
res_type_name = f"lab_{info['labware'].lower().replace('.', 'point')}"
|
||||
G.add_node(
|
||||
node_id,
|
||||
template_name="create_resource",
|
||||
@@ -456,9 +753,9 @@ def build_protocol_graph(
|
||||
},
|
||||
)
|
||||
slot_to_create_resource[slot] = node_id
|
||||
if object_type == "tiprack":
|
||||
if ot_lo == "tiprack":
|
||||
resource_last_writer[info["labware_id"]] = f"{node_id}:labware"
|
||||
if object_type == "trash":
|
||||
if ot_lo == "trash":
|
||||
trash_create_node_id = node_id
|
||||
# create_resource 之间不需要 ready 连接
|
||||
|
||||
|
||||
Reference in New Issue
Block a user