Compare commits

...

5 Commits

Author SHA1 Message Date
q434343
5d208c832b 修改工作流上传以及lh的物料初步判定 2026-03-02 18:32:44 +08:00
q434343
786498904d 修改上传方式,添加tip_rack的连线 2026-03-02 18:32:18 +08:00
q434343
a9ea9f425d 添加单枪头的多对多移液判定 2026-03-02 18:31:28 +08:00
Xuwznln
b3bc951cae registry update & workflow update 2026-03-02 18:31:26 +08:00
Xuwznln
01df4f1115 add resource 2026-03-02 18:30:07 +08:00
7 changed files with 10900 additions and 102 deletions

View File

@@ -1159,11 +1159,19 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
Number of mix cycles. If *None* (default) no mixing occurs regardless of
mix_stage.
"""
num_sources = len(sources)
num_targets = len(targets)
len_asp_vols = len(asp_vols)
len_dis_vols = len(dis_vols)
# 确保 use_channels 有默认值
if use_channels is None:
# 默认使用设备所有通道(例如 8 通道移液站默认就是 0-7
use_channels = list(range(self.channel_num)) if self.channel_num > 0 else [0]
use_channels = list(range(self.channel_num)) if self.channel_num == 8 else [0]
elif len(use_channels) == 8:
if self.channel_num != 8:
raise ValueError(f"if channel_num is 8, use_channels length must be 8, but got {len(use_channels)}")
if num_sources%8 != 0 or num_targets%8 != 0 or len_asp_vols%8 != 0 or len_dis_vols%8 != 0:
raise ValueError(f"if channel_num is 8, sources, targets, asp_vols, and dis_vols length must be divisible by 8, but got {num_sources}, {num_targets}, {len_asp_vols}, and {len_dis_vols}")
if is_96_well:
pass # This mode is not verified.
@@ -1191,90 +1199,234 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
if mix_times is not None:
mix_times = int(mix_times)
# 设置tip racks
self.set_tiprack(tip_racks)
# 识别传输模式mix_times 为 None 也应该能正常移液,只是不做 mix
num_sources = len(sources)
num_targets = len(targets)
len_asp_vols = len(asp_vols)
len_dis_vols = len(dis_vols)
if num_sources == 1 and num_targets > 1:
# 模式1: 一对多 (1 source -> N targets)
await self._transfer_one_to_many(
sources[0],
targets,
tip_racks,
use_channels,
asp_vols,
dis_vols,
asp_flow_rates,
dis_flow_rates,
offsets,
touch_tip,
liquid_height,
blow_out_air_volume,
spread,
mix_stage,
mix_times,
mix_vol,
mix_rate,
mix_liquid_height,
delays,
)
elif num_sources > 1 and num_targets == 1:
# 模式2: 多对一 (N sources -> 1 target)
await self._transfer_many_to_one(
sources,
targets[0],
tip_racks,
use_channels,
asp_vols,
dis_vols,
asp_flow_rates,
dis_flow_rates,
offsets,
touch_tip,
liquid_height,
blow_out_air_volume,
spread,
mix_stage,
mix_times,
mix_vol,
mix_rate,
mix_liquid_height,
delays,
)
elif num_sources == num_targets:
# 模式3: 一对一 (N sources -> N targets)
await self._transfer_one_to_one(
sources,
targets,
tip_racks,
use_channels,
asp_vols,
dis_vols,
asp_flow_rates,
dis_flow_rates,
offsets,
touch_tip,
liquid_height,
blow_out_air_volume,
spread,
mix_stage,
mix_times,
mix_vol,
mix_rate,
mix_liquid_height,
delays,
)
else:
raise ValueError(
f"Unsupported transfer mode: {num_sources} sources -> {num_targets} targets. "
"Supported modes: 1->N, N->1, or N->N."
)
# if num_targets != 1 and num_sources != 1:
# if len_asp_vols != num_sources and len_asp_vols != num_targets:
# raise ValueError(f"asp_vols length must be equal to sources or targets length, but got {len_asp_vols} and {num_sources} and {num_targets}")
# if len_dis_vols != num_sources and len_dis_vols != num_targets:
# raise ValueError(f"dis_vols length must be equal to sources or targets length, but got {len_dis_vols} and {num_sources} and {num_targets}")
if len(use_channels) == 1:
max_len = max(num_sources, num_targets)
for i in range(max_len):
# 辅助函数安全地从列表中获取元素如果列表为空则返回None
def safe_get(lst, idx, default=None):
return [lst[idx]] if lst else default
# 动态构建参数字典,只传递实际提供的参数
kwargs = {
'sources': [sources[i%num_sources]],
'targets': [targets[i%num_targets]],
'tip_racks': tip_racks,
'use_channels': use_channels,
'asp_vols': [asp_vols[i%len_asp_vols]],
'dis_vols': [dis_vols[i%len_dis_vols]],
}
# 条件性添加可选参数
if asp_flow_rates is not None:
kwargs['asp_flow_rates'] = [asp_flow_rates[i%len_asp_vols]]
if dis_flow_rates is not None:
kwargs['dis_flow_rates'] = [dis_flow_rates[i%len_dis_vols]]
if offsets is not None:
kwargs['offsets'] = safe_get(offsets, i)
if touch_tip is not None:
kwargs['touch_tip'] = touch_tip if touch_tip else False
if liquid_height is not None:
kwargs['liquid_height'] = safe_get(liquid_height, i)
if blow_out_air_volume is not None:
kwargs['blow_out_air_volume'] = safe_get(blow_out_air_volume, i)
if spread is not None:
kwargs['spread'] = spread
if mix_stage is not None:
kwargs['mix_stage'] = safe_get(mix_stage, i)
if mix_times is not None:
kwargs['mix_times'] = safe_get(mix_times, i)
if mix_vol is not None:
kwargs['mix_vol'] = safe_get(mix_vol, i)
if mix_rate is not None:
kwargs['mix_rate'] = safe_get(mix_rate, i)
if mix_liquid_height is not None:
kwargs['mix_liquid_height'] = safe_get(mix_liquid_height, i)
if delays is not None:
kwargs['delays'] = safe_get(delays, i)
await self._transfer_base_method(**kwargs)
# if num_sources == 1 and num_targets > 1:
# # 模式1: 一对多 (1 source -> N targets)
# await self._transfer_one_to_many(
# sources,
# targets,
# tip_racks,
# use_channels,
# asp_vols,
# dis_vols,
# asp_flow_rates,
# dis_flow_rates,
# offsets,
# touch_tip,
# liquid_height,
# blow_out_air_volume,
# spread,
# mix_stage,
# mix_times,
# mix_vol,
# mix_rate,
# mix_liquid_height,
# delays,
# )
# elif num_sources > 1 and num_targets == 1:
# # 模式2: 多对一 (N sources -> 1 target)
# await self._transfer_many_to_one(
# sources,
# targets[0],
# tip_racks,
# use_channels,
# asp_vols,
# dis_vols,
# asp_flow_rates,
# dis_flow_rates,
# offsets,
# touch_tip,
# liquid_height,
# blow_out_air_volume,
# spread,
# mix_stage,
# mix_times,
# mix_vol,
# mix_rate,
# mix_liquid_height,
# delays,
# )
# elif num_sources == num_targets:
# # 模式3: 一对一 (N sources -> N targets)
# await self._transfer_one_to_one(
# sources,
# targets,
# tip_racks,
# use_channels,
# asp_vols,
# dis_vols,
# asp_flow_rates,
# dis_flow_rates,
# offsets,
# touch_tip,
# liquid_height,
# blow_out_air_volume,
# spread,
# mix_stage,
# mix_times,
# mix_vol,
# mix_rate,
# mix_liquid_height,
# delays,
# )
# else:
# raise ValueError(
# f"Unsupported transfer mode: {num_sources} sources -> {num_targets} targets. "
# "Supported modes: 1->N, N->1, or N->N."
# )
return TransferLiquidReturn(
sources=ResourceTreeSet.from_plr_resources(list(sources), known_newly_created=False).dump(), # type: ignore
targets=ResourceTreeSet.from_plr_resources(list(targets), known_newly_created=False).dump(), # type: ignore
)
async def _transfer_base_method(
self,
sources: Sequence[Container],
targets: Sequence[Container],
tip_racks: Sequence[TipRack],
use_channels: List[int],
asp_vols: List[float],
dis_vols: List[float],
**kwargs
):
# 从kwargs中提取参数提供默认值
asp_flow_rates = kwargs.get('asp_flow_rates')
dis_flow_rates = kwargs.get('dis_flow_rates')
offsets = kwargs.get('offsets')
touch_tip = kwargs.get('touch_tip', False)
liquid_height = kwargs.get('liquid_height')
blow_out_air_volume = kwargs.get('blow_out_air_volume')
spread = kwargs.get('spread', 'wide')
mix_stage = kwargs.get('mix_stage')
mix_times = kwargs.get('mix_times')
mix_vol = kwargs.get('mix_vol')
mix_rate = kwargs.get('mix_rate')
mix_liquid_height = kwargs.get('mix_liquid_height')
delays = kwargs.get('delays')
tip = []
tip.extend(next(self.current_tip))
await self.pick_up_tips(tip)
if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0:
await self.mix(
targets=[targets[0]],
mix_time=mix_times,
mix_vol=mix_vol,
offsets=offsets if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
use_channels=use_channels,
)
await self.aspirate(
resources=[sources[0]],
vols=[asp_vols[0]],
use_channels=use_channels,
flow_rates=[asp_flow_rates[0]] if asp_flow_rates and len(asp_flow_rates) > 0 else None,
offsets=[offsets[0]] if offsets and len(offsets) > 0 else None,
liquid_height=[liquid_height[0]] if liquid_height and len(liquid_height) > 0 else None,
blow_out_air_volume=(
[blow_out_air_volume[0]] if blow_out_air_volume and len(blow_out_air_volume) > 0 else None
),
spread=spread,
)
if delays is not None:
await self.custom_delay(seconds=delays[0])
await self.dispense(
resources=[targets[0]],
vols=[dis_vols[0]],
use_channels=use_channels,
flow_rates=[dis_flow_rates[0]] if dis_flow_rates and len(dis_flow_rates) > 0 else None,
offsets=[offsets[0]] if offsets and len(offsets) > 0 else None,
blow_out_air_volume=(
[blow_out_air_volume[0]] if blow_out_air_volume and len(blow_out_air_volume) > 0 else None
),
liquid_height=[liquid_height[0]] if liquid_height and len(liquid_height) > 0 else None,
spread=spread,
)
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1])
if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0:
await self.mix(
targets=[targets[0]],
mix_time=mix_times,
mix_vol=mix_vol,
offsets=offsets if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
use_channels=use_channels,
)
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[0])
await self.touch_tip(targets[0])
await self.discard_tips(use_channels=use_channels)
async def _transfer_one_to_one(
self,
sources: Sequence[Container],

View File

@@ -804,7 +804,9 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
delays: Optional[List[int]] = None,
none_keys: List[str] = [],
) -> TransferLiquidReturn:
return await super().transfer_liquid(
if self.step_mode:
await self.create_protocol(f"transfer_liquid{time.time()}")
res = await super().transfer_liquid(
sources,
targets,
tip_racks,
@@ -827,6 +829,9 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
delays=delays,
none_keys=none_keys,
)
if self.step_mode:
await self.run_protocol()
return res
async def custom_delay(self, seconds=0, msg=None):
return await super().custom_delay(seconds, msg)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because one or more lines are too long

View File

@@ -581,6 +581,71 @@ class ResourceTreeSet(object):
d["model"] = res.config.get("model", None)
return d
# deserialize 会单独处理的元数据 key不传给构造函数
_META_KEYS = {"type", "parent_name", "location", "children", "rotation", "barcode"}
# deserialize 自定义逻辑使用的 key如 TipSpot 用 prototype_tip 构建 make_tip需保留
_DESERIALIZE_PRESERVED_KEYS = {"prototype_tip"}
def remove_incompatible_params(plr_d: dict) -> None:
"""递归移除 PLR 类不接受的参数,避免 deserialize 报错。
- 移除构造函数不接受的参数(如 compute_height_from_volume、ordering、category
- 对 TubeRack将 ordering 转为 ordered_items
- 保留 deserialize 自定义逻辑需要的 key如 prototype_tip
"""
if "type" in plr_d:
sub_cls = find_subclass(plr_d["type"], PLRResource)
if sub_cls is not None:
spec = inspect.signature(sub_cls)
valid_params = set(spec.parameters.keys())
# TubeRack 特殊处理:先转换 ordering再参与后续过滤
if "ordering" not in valid_params and "ordering" in plr_d:
ordering = plr_d.pop("ordering", None)
if sub_cls.__name__ == "TubeRack":
plr_d["ordered_items"] = (
_ordering_to_ordered_items(plr_d, ordering)
if ordering
else {}
)
# 移除构造函数不接受的参数(保留 META 和 deserialize 自定义逻辑需要的 key
for key in list(plr_d.keys()):
if (
key not in _META_KEYS
and key not in _DESERIALIZE_PRESERVED_KEYS
and key not in valid_params
):
plr_d.pop(key, None)
for child in plr_d.get("children", []):
remove_incompatible_params(child)
def _ordering_to_ordered_items(plr_d: dict, ordering: dict) -> dict:
"""将 ordering 转为 ordered_items从 children 构建 Tube 对象"""
from pylabrobot.resources import Tube, Coordinate
from pylabrobot.serializer import deserialize as plr_deserialize
children = plr_d.get("children", [])
ordered_items = {}
for idx, (ident, child_name) in enumerate(ordering.items()):
child_data = children[idx] if idx < len(children) else None
if child_data is None:
continue
loc_data = child_data.get("location")
loc = (
plr_deserialize(loc_data)
if loc_data
else Coordinate(0, 0, 0)
)
tube = Tube(
name=child_data.get("name", child_name or ident),
size_x=child_data.get("size_x", 10),
size_y=child_data.get("size_y", 10),
size_z=child_data.get("size_z", 50),
max_volume=child_data.get("max_volume", 1000),
)
tube.location = loc
ordered_items[ident] = tube
plr_d["children"] = [] # 已并入 ordered_items避免重复反序列化
return ordered_items
plr_resources = []
tracker = DeviceNodeResourceTracker()
@@ -600,9 +665,7 @@ class ResourceTreeSet(object):
raise ValueError(
f"无法找到类型 {plr_dict['type']} 对应的 PLR 资源类。原始信息:{tree.root_node.res_content}"
)
spec = inspect.signature(sub_cls)
if "category" not in spec.parameters:
plr_dict.pop("category", None)
remove_incompatible_params(plr_dict)
plr_resource = sub_cls.deserialize(plr_dict, allow_marshal=True)
from pylabrobot.resources import Coordinate
from pylabrobot.serializer import deserialize

View File

@@ -119,11 +119,14 @@ DEVICE_NAME_DEFAULT = "PRCXI" # transfer_liquid, set_liquid_from_plate 等动
# 节点类型
NODE_TYPE_DEFAULT = "ILab" # 所有节点的默认类型
CLASS_NAMES_MAPPING = {
"plate": "PRCXI_BioER_96_wellplate",
"tip_rack": "PRCXI_300ul_Tips",
}
# create_resource 节点默认参数
CREATE_RESOURCE_DEFAULTS = {
"device_id": "/PRCXI",
"parent_template": "/PRCXI/PRCXI_Deck",
"class_name": "PRCXI_BioER_96_wellplate",
}
# 默认液体体积 (uL)
@@ -367,11 +370,10 @@ def build_protocol_graph(
"""统一的协议图构建函数,根据设备类型自动选择构建逻辑
Args:
labware_info: reagent 信息字典,格式为 {name: {slot, well}, ...},用于 set_liquid 和 well 查找
labware_info: labware 信息字典,格式为 {name: {slot, well, labware, ...}, ...}
protocol_steps: 协议步骤列表
workstation_name: 工作站名称
action_resource_mapping: action 到 resource_name 的映射字典,可选
labware_defs: labware 定义列表,格式为 [{"name": "...", "slot": "1", "type": "lab_xxx"}, ...]
"""
G = WorkflowGraph()
resource_last_writer = {} # reagent_name -> "node_id:port"
@@ -379,7 +381,19 @@ def build_protocol_graph(
protocol_steps = refactor_data(protocol_steps, action_resource_mapping)
# ==================== 第一步:按 slot 创建 create_resource 节点 ====================
# ==================== 第一步:按 slot 去重创建 create_resource 节点 ====================
# 收集所有唯一的 slot
slots_info = {} # slot -> {labware, res_id}
for labware_id, item in labware_info.items():
slot = str(item.get("slot", ""))
if slot and slot not in slots_info:
res_id = f"plate_slot_{slot}"
slots_info[slot] = {
"labware": item.get("labware", ""),
"res_id": res_id,
"labware_id": labware_id,
}
# 创建 Group 节点,包含所有 create_resource 节点
group_node_id = str(uuid.uuid4())
G.add_node(
@@ -395,41 +409,38 @@ def build_protocol_graph(
param=None,
)
# 直接使用 JSON 中的 labware 定义,每个 slot 一条记录type 即 class_name
res_index = 0
for lw in (labware_defs or []):
slot = str(lw.get("slot", ""))
if not slot or slot in slot_to_create_resource:
continue # 跳过空 slot 或已处理的 slot
lw_name = lw.get("name", f"slot {slot}")
lw_type = lw.get("type", CREATE_RESOURCE_DEFAULTS["class_name"])
res_id = f"plate_slot_{slot}"
res_index += 1
# 为每个唯一的 slot 创建 create_resource 节点
for slot, info in slots_info.items():
node_id = str(uuid.uuid4())
res_id = info["res_id"]
res_type_name = info["labware"].lower().replace(".", "point")
res_type_name = f"lab_{res_type_name}"
G.add_node(
node_id,
template_name="create_resource",
resource_name="host_node",
name=lw_name,
description=f"Create {lw_name}",
name=f"{res_type_name}_slot{slot}",
description=f"Create plate on slot {slot}",
lab_node_type="Labware",
footer="create_resource-host_node",
device_name=DEVICE_NAME_HOST,
type=NODE_TYPE_DEFAULT,
parent_uuid=group_node_id,
minimized=True,
parent_uuid=group_node_id, # 指向 Group 节点
minimized=True, # 折叠显示
param={
"res_id": res_id,
"device_id": CREATE_RESOURCE_DEFAULTS["device_id"],
"class_name": lw_type,
"parent": CREATE_RESOURCE_DEFAULTS["parent_template"],
"class_name": res_type_name,
"parent": CREATE_RESOURCE_DEFAULTS["parent_template"].format(slot=slot),
"bind_locations": {"x": 0.0, "y": 0.0, "z": 0.0},
"slot_on_deck": slot,
},
)
slot_to_create_resource[slot] = node_id
if "tip" in res_type_name and "rack" in res_type_name:
resource_last_writer[info["labware_id"]] = f"{node_id}:labware"
# create_resource 之间不需要 ready 连接
# ==================== 第二步:为每个 reagent 创建 set_liquid_from_plate 节点 ====================
# 创建 Group 节点,包含所有 set_liquid_from_plate 节点
@@ -511,6 +522,7 @@ def build_protocol_graph(
"reagent": "reagent",
"solvent": "solvent",
"compound": "compound",
"tip_racks": "tip_rack_identifier",
}
OUTPUT_PORT_MAPPING = {