diff --git a/unilabos/devices/liquid_handling/liquid_handler_abstract.py b/unilabos/devices/liquid_handling/liquid_handler_abstract.py index 47eda93d..3292e118 100644 --- a/unilabos/devices/liquid_handling/liquid_handler_abstract.py +++ b/unilabos/devices/liquid_handling/liquid_handler_abstract.py @@ -9,6 +9,7 @@ from typing import List, Sequence, Optional, Literal, Union, Iterator, Dict, Any from pylabrobot.liquid_handling import LiquidHandler, LiquidHandlerBackend, LiquidHandlerChatterboxBackend, Strictness from pylabrobot.liquid_handling.liquid_handler import TipPresenceProbingMethod from pylabrobot.liquid_handling.standard import GripDirection +from pylabrobot.resources.errors import TooLittleLiquidError, TooLittleVolumeError from pylabrobot.resources import ( Resource, TipRack, @@ -211,10 +212,37 @@ class LiquidHandlerMiddleware(LiquidHandler): ): if spread == "": spread = "wide" + + def _safe_aspirate_volumes(_resources: Sequence[Container], _vols: List[float]) -> List[float]: + """将 aspirate 体积裁剪到源容器当前液量范围内,避免 volume tracker 报错。""" + safe: List[float] = [] + for res, vol in zip(_resources, _vols): + req = max(float(vol), 0.0) + used_volume = None + try: + tracker = getattr(res, "tracker", None) + if bool(getattr(tracker, "is_disabled", False)): + # tracker 关闭时(例如预吸空气),不按液体体积裁剪 + safe.append(req) + continue + get_used = getattr(tracker, "get_used_volume", None) + if callable(get_used): + used_volume = get_used() + except Exception: + used_volume = None + + if isinstance(used_volume, (int, float)): + req = min(req, max(float(used_volume), 0.0)) + safe.append(req) + return safe + + actual_vols = _safe_aspirate_volumes(resources, vols) + if actual_vols != vols and hasattr(self, "_ros_node") and self._ros_node is not None: + self._ros_node.lab_logger().warning(f"[aspirate] volume adjusted, requested_vols={vols}, actual_vols={actual_vols}") if self._simulator: return await self._simulate_handler.aspirate( resources, - vols, + actual_vols, use_channels, flow_rates, offsets, @@ -226,7 +254,7 @@ class LiquidHandlerMiddleware(LiquidHandler): try: await super().aspirate( resources, - vols, + actual_vols, use_channels, flow_rates, offsets, @@ -239,7 +267,7 @@ class LiquidHandlerMiddleware(LiquidHandler): if "Resource is too small to space channels" in str(e) and spread != "custom": await super().aspirate( resources, - vols, + actual_vols, use_channels, flow_rates, offsets, @@ -250,6 +278,24 @@ class LiquidHandlerMiddleware(LiquidHandler): ) else: raise + except TooLittleLiquidError: + # 再兜底一次:按实时可用液量重算后重试,避免状态更新竞争导致的瞬时不足 + retry_vols = _safe_aspirate_volumes(resources, actual_vols) + if any(v > 0 for v in retry_vols): + await super().aspirate( + resources, + retry_vols, + use_channels, + flow_rates, + offsets, + liquid_height, + blow_out_air_volume, + spread, + **backend_kwargs, + ) + actual_vols = retry_vols + else: + actual_vols = retry_vols res_samples = [] res_volumes = [] @@ -260,7 +306,7 @@ class LiquidHandlerMiddleware(LiquidHandler): else: channels_to_use = use_channels - for resource, volume, channel in zip(resources, vols, channels_to_use): + for resource, volume, channel in zip(resources, actual_vols, channels_to_use): sample_uuid_value = getattr(resource, "unilabos_extra", {}).get(EXTRA_SAMPLE_UUID, None) res_samples.append({"name": resource.name, "sample_uuid": sample_uuid_value}) res_volumes.append(volume) @@ -284,10 +330,32 @@ class LiquidHandlerMiddleware(LiquidHandler): ) -> SimpleReturn: if spread == "": spread = "wide" + + def _safe_dispense_volumes(_resources: Sequence[Container], _vols: List[float]) -> List[float]: + """将 dispense 体积裁剪到目标容器可用体积范围内,避免 volume tracker 报错。""" + safe: List[float] = [] + for res, vol in zip(_resources, _vols): + req = max(float(vol), 0.0) + free_volume = None + try: + tracker = getattr(res, "tracker", None) + get_free = getattr(tracker, "get_free_volume", None) + if callable(get_free): + free_volume = get_free() + except Exception: + free_volume = None + + if isinstance(free_volume, (int, float)): + req = min(req, max(float(free_volume), 0.0)) + safe.append(req) + return safe + + actual_vols = _safe_dispense_volumes(resources, vols) + if self._simulator: return await self._simulate_handler.dispense( resources, - vols, + actual_vols, use_channels, flow_rates, offsets, @@ -299,7 +367,7 @@ class LiquidHandlerMiddleware(LiquidHandler): try: await super().dispense( resources, - vols, + actual_vols, use_channels, flow_rates, offsets, @@ -312,7 +380,7 @@ class LiquidHandlerMiddleware(LiquidHandler): if "Resource is too small to space channels" in str(e) and spread != "custom": await super().dispense( resources, - vols, + actual_vols, use_channels, flow_rates, offsets, @@ -323,9 +391,31 @@ class LiquidHandlerMiddleware(LiquidHandler): ) else: raise + except TooLittleVolumeError: + # 再兜底一次:按实时 free volume 重新裁剪后重试,避免并发状态更新导致的瞬时超量 + retry_vols = _safe_dispense_volumes(resources, actual_vols) + if any(v > 0 for v in retry_vols): + await super().dispense( + resources, + retry_vols, + use_channels, + flow_rates, + offsets, + liquid_height, + blow_out_air_volume, + spread, + **backend_kwargs, + ) + actual_vols = retry_vols + else: + actual_vols = retry_vols res_samples = [] res_volumes = [] - for resource, volume, channel in zip(resources, vols, use_channels): + if use_channels is None: + channels_to_use = [0] * len(resources) + else: + channels_to_use = use_channels + for resource, volume, channel in zip(resources, actual_vols, channels_to_use): res_uuid = self.pending_liquids_dict[channel][EXTRA_SAMPLE_UUID] self.pending_liquids_dict[channel]["volume"] -= volume resource.unilabos_extra[EXTRA_SAMPLE_UUID] = res_uuid @@ -728,17 +818,43 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): uuids = [x.get("uuid") or x.get("unilabos_uuid") for _, x in dict_items] if any(u is None for u in uuids): raise ValueError("dict 格式的资源必须包含 uuid 或 unilabos_uuid 字段") - resource_tree = await self._ros_node.get_resource(uuids) - plr_list = resource_tree.to_plr_resources(requested_uuids=uuids) - # 映射到设备本地的 resource_tracker 实例(与 deck 一致) + + def _resolve_from_local_by_uuids() -> List[Union[Container, TipRack]]: + resolved_locals: List[Union[Container, TipRack]] = [] + missing: List[str] = [] + for uid in uuids: + matches = self._ros_node.resource_tracker.figure_resource({"uuid": uid}, try_mode=True) + if matches: + resolved_locals.append(cast(Union[Container, TipRack], matches[0])) + else: + missing.append(str(uid)) + if missing: + raise ValueError( + f"远端资源树未返回且本地资源也未命中,缺失 UUID: {missing}" + ) + return resolved_locals + + # 优先走远端资源树查询;若远端为空或 requested_uuids 无法解析,则降级到本地 tracker 按 UUID 解析。 resolved = [] - for plr in plr_list: - local = self._ros_node.resource_tracker.figure_resource( - {"name": plr.name}, try_mode=False - ) - if hasattr(plr, "unilabos_extra") and hasattr(local, "unilabos_extra"): - local.unilabos_extra = getattr(plr, "unilabos_extra", {}).copy() - resolved.append(local) + try: + resource_tree = await self._ros_node.get_resource(uuids) + plr_list = resource_tree.to_plr_resources(requested_uuids=uuids) + for uid, plr in zip(uuids, plr_list): + local_matches = self._ros_node.resource_tracker.figure_resource({"uuid": uid}, try_mode=True) + if local_matches: + local = cast(Union[Container, TipRack], local_matches[0]) + else: + local = cast(Union[Container, TipRack], plr) + if hasattr(plr, "unilabos_extra") and hasattr(local, "unilabos_extra"): + local.unilabos_extra = getattr(plr, "unilabos_extra", {}).copy() + resolved.append(local) + if len(resolved) != len(uuids): + raise ValueError( + f"远端资源解析数量不匹配: requested={len(uuids)}, resolved={len(resolved)}" + ) + except Exception: + resolved = _resolve_from_local_by_uuids() + result = list(items) for (idx, _), plr in zip(dict_items, resolved): result[idx] = plr @@ -757,9 +873,18 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): wells=ResourceTreeSet.from_plr_resources(wells, known_newly_created=False).dump(), volumes=res_volumes # type: ignore ) + def _clamp_volume(resource: Union[Well, Container], volume: float) -> float: + # 防止初始化液量超过容器容量,导致后续 dispense 时 free volume 为负 + clamped = max(float(volume), 0.0) + max_volume = getattr(resource, "max_volume", None) + if isinstance(max_volume, (int, float)) and max_volume > 0: + clamped = min(clamped, float(max_volume)) + return clamped + for well, liquid_name, volume in zip(wells, liquid_names, volumes): - well.set_liquids([(liquid_name, volume)]) # type: ignore - res_volumes.append(volume) + safe_volume = _clamp_volume(well, volume) + well.set_liquids([(liquid_name, safe_volume)]) # type: ignore + res_volumes.append(safe_volume) return SetLiquidReturn( wells=ResourceTreeSet.from_plr_resources(wells, known_newly_created=False).dump(), volumes=res_volumes # type: ignore @@ -789,9 +914,18 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): volumes=res_volumes, ) + def _clamp_volume(resource: Union[Well, Container], volume: float) -> float: + # 防止初始化液量超过容器容量,导致后续 dispense 时 free volume 为负 + clamped = max(float(volume), 0.0) + max_volume = getattr(resource, "max_volume", None) + if isinstance(max_volume, (int, float)) and max_volume > 0: + clamped = min(clamped, float(max_volume)) + return clamped + for well, liquid_name, volume in zip(wells, liquid_names, volumes): - well.set_liquids([(liquid_name, volume)]) # type: ignore - res_volumes.append(volume) + safe_volume = _clamp_volume(well, volume) + well.set_liquids([(liquid_name, safe_volume)]) # type: ignore + res_volumes.append(safe_volume) task = ROS2DeviceNode.run_async_func(self._ros_node.update_resource, True, **{"resources": wells}) submit_time = time.time() @@ -1300,11 +1434,21 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): max_len = max(num_sources, num_targets) for i in range(max_len): - # 辅助函数:安全地从列表中获取元素,如果列表为空则返回None - def safe_get(lst, idx, default=None): + # 辅助函数: + # - wrap=True: 返回 [value](用于 liquid_height 等列表参数) + # - wrap=False: 返回 value(用于 mix_* 标量参数) + def safe_get(value, idx, default=None, wrap: bool = True): + if value is None: + return default try: - return [lst[idx]] if lst else default - except Exception as e: + if isinstance(value, (list, tuple)): + if len(value) == 0: + return default + item = value[idx % len(value)] + else: + item = value + return [item] if wrap else item + except Exception: return default # 动态构建参数字典,只传递实际提供的参数 @@ -1335,15 +1479,15 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): if spread is not None: kwargs['spread'] = spread if mix_stage is not None: - kwargs['mix_stage'] = safe_get(mix_stage, i) + kwargs['mix_stage'] = safe_get(mix_stage, i, wrap=False) if mix_times is not None: - kwargs['mix_times'] = safe_get(mix_times, i) + kwargs['mix_times'] = safe_get(mix_times, i, wrap=False) if mix_vol is not None: - kwargs['mix_vol'] = safe_get(mix_vol, i) + kwargs['mix_vol'] = safe_get(mix_vol, i, wrap=False) if mix_rate is not None: - kwargs['mix_rate'] = safe_get(mix_rate, i) + kwargs['mix_rate'] = safe_get(mix_rate, i, wrap=False) if mix_liquid_height is not None: - kwargs['mix_liquid_height'] = safe_get(mix_liquid_height, i) + kwargs['mix_liquid_height'] = safe_get(mix_liquid_height, i, wrap=False) if delays is not None: kwargs['delays'] = safe_get(delays, i) @@ -1384,7 +1528,14 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): tip = [] tip.extend(self._get_next_tip()) await self.pick_up_tips(tip) - + blow_out_air_volume_before_vol = 0.0 + if blow_out_air_volume_before is not None and len(blow_out_air_volume_before) > 0: + blow_out_air_volume_before_vol = float(blow_out_air_volume_before[0] or 0.0) + blow_out_air_volume_vol = 0.0 + if blow_out_air_volume is not None and len(blow_out_air_volume) > 0: + blow_out_air_volume_vol = float(blow_out_air_volume[0] or 0.0) + # PLR 的 blow_out_air_volume 是空气参数,不计入液体体积。 + # before 空气通过单独预吸实现,after 空气通过 blow_out_air_volume 参数实现。 if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0: await self.mix( @@ -1397,17 +1548,26 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): use_channels=use_channels, ) - if blow_out_air_volume_before is not None and len(blow_out_air_volume_before) > 0: - await self.aspirate( - resources=[sources[0]], - vols=[blow_out_air_volume_before[0]], - use_channels=use_channels, - flow_rates=None, - offsets=[Coordinate(x=0, y=0, z=sources[0].get_size_z())], - liquid_height=None, - blow_out_air_volume=None, - ) - + if blow_out_air_volume_before_vol > 0: + source_tracker = getattr(sources[0], "tracker", None) + source_tracker_was_disabled = bool(getattr(source_tracker, "is_disabled", False)) + try: + if source_tracker is not None and hasattr(source_tracker, "disable"): + source_tracker.disable() + await self.aspirate( + resources=[sources[0]], + vols=[blow_out_air_volume_before_vol], + use_channels=use_channels, + flow_rates=None, + offsets=[Coordinate(x=0, y=0, z=sources[0].get_size_z())], + liquid_height=None, + blow_out_air_volume=None, + spread="custom", + ) + finally: + if source_tracker is not None: + source_tracker.enable() + await self.aspirate( resources=[sources[0]], vols=[asp_vols[0]], @@ -1416,7 +1576,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): offsets=[offsets[0]] if offsets and len(offsets) > 0 else None, liquid_height=[liquid_height[0]] if liquid_height and len(liquid_height) > 0 else None, blow_out_air_volume=( - [blow_out_air_volume[0]] if blow_out_air_volume and len(blow_out_air_volume) > 0 else None + [blow_out_air_volume_vol] if blow_out_air_volume_vol > 0 else None ), spread=spread, ) @@ -1429,7 +1589,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): flow_rates=[dis_flow_rates[0]] if dis_flow_rates and len(dis_flow_rates) > 0 else None, offsets=[offsets[0]] if offsets and len(offsets) > 0 else None, blow_out_air_volume=( - [blow_out_air_volume[0]] if blow_out_air_volume and len(blow_out_air_volume) > 0 else None + [blow_out_air_volume_vol] if blow_out_air_volume_vol > 0 else None ), liquid_height=[liquid_height[0]] if liquid_height and len(liquid_height) > 0 else None, spread=spread, diff --git a/unilabos/devices/liquid_handling/prcxi/prcxi.py b/unilabos/devices/liquid_handling/prcxi/prcxi.py index 397aca49..ba58e806 100644 --- a/unilabos/devices/liquid_handling/prcxi/prcxi.py +++ b/unilabos/devices/liquid_handling/prcxi/prcxi.py @@ -854,9 +854,10 @@ class PRCXI9300Handler(LiquidHandlerAbstract): offsets: Optional[Coordinate] = None, mix_rate: Optional[float] = None, none_keys: List[str] = [], + use_channels: Optional[List[int]] = [0], ): return await self._unilabos_backend.mix( - targets, mix_time, mix_vol, height_to_bottom, offsets, mix_rate, none_keys + targets, mix_time, mix_vol, height_to_bottom, offsets, mix_rate, none_keys, use_channels ) def iter_tips(self, tip_racks: Sequence[TipRack]) -> Iterator[Resource]: @@ -1285,9 +1286,15 @@ class PRCXI9300Backend(LiquidHandlerBackend): offsets: Optional[Coordinate] = None, mix_rate: Optional[float] = None, none_keys: List[str] = [], + use_channels: Optional[List[int]] = [0], ): """Mix liquid in the specified resources.""" - + if use_channels == [0]: + axis = "Left" + elif use_channels == [1]: + axis = "Right" + else: + raise ValueError("Invalid use channels: " + str(use_channels)) plate_indexes = [] for op in targets: deck = op.parent.parent.parent diff --git a/unilabos/resources/resource_tracker.py b/unilabos/resources/resource_tracker.py index 6555d041..4798fd1a 100644 --- a/unilabos/resources/resource_tracker.py +++ b/unilabos/resources/resource_tracker.py @@ -706,15 +706,38 @@ class ResourceTreeSet(object): if requested_uuids: # 按请求的 UUID 顺序返回对应资源(从整棵树中按 uuid 提取) + # 优先使用 tracker.uuid_to_resources;若映射缺失,再递归遍历 PLR 树兜底搜索。 + def _find_plr_by_uuid(roots: List["PLRResource"], uid: str) -> Optional["PLRResource"]: + stack = list(roots) + while stack: + node = stack.pop() + node_uid = getattr(node, "unilabos_uuid", None) + if node_uid == uid: + return node + children = getattr(node, "children", None) or [] + stack.extend(children) + return None + result = [] + missing_uuids = [] for uid in requested_uuids: - if uid in tracker.uuid_to_resources: - result.append(tracker.uuid_to_resources[uid]) + found = tracker.uuid_to_resources.get(uid) + if found is None: + found = _find_plr_by_uuid(plr_resources, uid) + if found is not None: + # 回填缓存,后续相同 uuid 可直接命中 + tracker.uuid_to_resources[uid] = found + if found is None: + missing_uuids.append(uid) else: - raise ValueError( - f"请求的 UUID {uid} 在资源树中未找到。" - f"可用 UUID 数量: {len(tracker.uuid_to_resources)}" - ) + result.append(found) + + if missing_uuids: + raise ValueError( + f"请求的 UUID 未在资源树中找到: {missing_uuids}。" + f"可用 UUID 数量: {len(tracker.uuid_to_resources)}," + f"资源树数量: {len(self.trees)}" + ) return result return plr_resources diff --git a/unilabos/workflow/common.py b/unilabos/workflow/common.py index 6edb6e8d..43a854c1 100644 --- a/unilabos/workflow/common.py +++ b/unilabos/workflow/common.py @@ -51,6 +51,7 @@ -------------------------------------------------------------------------------- - 遍历 workflow 数组,为每个动作创建步骤节点 - 参数重命名: asp_vol -> asp_vols, dis_vol -> dis_vols, asp_flow_rate -> asp_flow_rates, dis_flow_rate -> dis_flow_rates +- 参数输入转换: liquid_height(按 wells 扩展);mix_stage/mix_times/mix_vol/mix_rate/mix_liquid_height 保持标量 - 参数扩展: 根据 targets 的 wells 数量,将单值扩展为数组 例: asp_vol=100.0, targets 有 3 个 wells -> asp_vols=[100.0, 100.0, 100.0] - 连接处理: 如果 sources/targets 已通过 set_liquid_from_plate 连接,参数值改为 [] @@ -543,8 +544,17 @@ def build_protocol_graph( "compound": "compound", } - # 需要根据 wells 数量扩展的参数列表(复数形式) - EXPAND_BY_WELLS_PARAMS = ["asp_vols", "dis_vols", "asp_flow_rates", "dis_flow_rates"] + # 需要根据 wells 数量扩展的参数列表: + # - 复数参数(asp_vols 等)支持单值自动扩展 + # - liquid_height 按 wells 扩展为数组 + # - mix_* 参数保持标量,避免被转换为 list + EXPAND_BY_WELLS_PARAMS = [ + "asp_vols", + "dis_vols", + "asp_flow_rates", + "dis_flow_rates", + "liquid_height", + ] # 处理协议步骤 for step in protocol_steps: @@ -558,6 +568,57 @@ def build_protocol_graph( if old_name in params: params[new_name] = params.pop(old_name) + # touch_tip 输入归一化: + # - 支持 bool / 0/1 / "true"/"false" / 单元素 list + # - 最终统一为 bool 标量,避免被下游误当作序列处理 + if "touch_tip" in params: + touch_tip_value = params.get("touch_tip") + if isinstance(touch_tip_value, list): + if len(touch_tip_value) == 1: + touch_tip_value = touch_tip_value[0] + elif len(touch_tip_value) == 0: + touch_tip_value = False + else: + warnings.append(f"touch_tip 期望标量,但收到长度为 {len(touch_tip_value)} 的列表,使用首个值") + touch_tip_value = touch_tip_value[0] + if isinstance(touch_tip_value, str): + norm = touch_tip_value.strip().lower() + if norm in {"true", "1", "yes", "y", "on"}: + touch_tip_value = True + elif norm in {"false", "0", "no", "n", "off", ""}: + touch_tip_value = False + else: + warnings.append(f"touch_tip 字符串值无法识别: {touch_tip_value},按 True 处理") + touch_tip_value = True + elif isinstance(touch_tip_value, (int, float)): + touch_tip_value = bool(touch_tip_value) + elif touch_tip_value is None: + touch_tip_value = False + else: + touch_tip_value = bool(touch_tip_value) + params["touch_tip"] = touch_tip_value + + # delays 输入归一化: + # - 支持标量(int/float/字符串数字)与 list + # - 最终统一为数字列表,供下游按 delays[0]/delays[1] 使用 + if "delays" in params: + delays_value = params.get("delays") + if delays_value is None or delays_value == "": + params["delays"] = [] + else: + raw_list = delays_value if isinstance(delays_value, list) else [delays_value] + normalized_delays = [] + for delay_item in raw_list: + if isinstance(delay_item, str): + delay_item = delay_item.strip() + if delay_item == "": + continue + try: + normalized_delays.append(float(delay_item)) + except (TypeError, ValueError): + warnings.append(f"delays 包含无法转换为数字的值: {delay_item},已忽略") + params["delays"] = normalized_delays + # 处理输入连接 for param_key, target_port in INPUT_PORT_MAPPING.items(): resource_name = params.get(param_key)