diff --git a/unilabos/devices/liquid_handling/liquid_handler_abstract.py b/unilabos/devices/liquid_handling/liquid_handler_abstract.py index a88df4b9..61e0918c 100644 --- a/unilabos/devices/liquid_handling/liquid_handler_abstract.py +++ b/unilabos/devices/liquid_handling/liquid_handler_abstract.py @@ -10,6 +10,7 @@ from pylabrobot.liquid_handling import LiquidHandler, LiquidHandlerBackend, Liqu from pylabrobot.liquid_handling.liquid_handler import TipPresenceProbingMethod from pylabrobot.liquid_handling.standard import GripDirection from pylabrobot.resources.errors import TooLittleLiquidError, TooLittleVolumeError +from pylabrobot.resources.volume_tracker import no_volume_tracking from pylabrobot.resources import ( Resource, TipRack, @@ -213,60 +214,60 @@ class LiquidHandlerMiddleware(LiquidHandler): if spread == "": spread = "custom" - def _safe_aspirate_volumes(_resources: Sequence[Container], _vols: List[float]) -> List[float]: - """将 aspirate 体积裁剪到源容器当前液量及 tip 剩余容量范围内,避免 volume tracker 报错。""" - safe: List[float] = [] - _head_owner = getattr(self, '_simulate_handler', None) or self - _head = getattr(_head_owner, 'head', {}) - _channels = use_channels or list(range(len(_resources))) - for i, (res, vol) in enumerate(zip(_resources, _vols)): - req = max(float(vol), 0.0) - tracker_disabled = False + for res in resources: + tracker = getattr(res, "tracker", None) + if tracker is None or getattr(tracker, "is_disabled", False): + continue + history = getattr(tracker, "liquid_history", None) + if tracker.get_used_volume() <= 0 and isinstance(history, list) and len(history) == 0: + fill_vol = tracker.max_volume if tracker.max_volume > 0 else 50000 try: - tracker = getattr(res, "tracker", None) - tracker_disabled = bool(getattr(tracker, "is_disabled", False)) - if not tracker_disabled: - get_used = getattr(tracker, "get_used_volume", None) - if callable(get_used): - used_volume = get_used() - if isinstance(used_volume, (int, float)) and used_volume > 0: - req = min(req, float(used_volume)) + tracker.add_liquid(fill_vol) except Exception: - pass + tracker.liquid_history.append(("auto_init", fill_vol)) - if not tracker_disabled: - try: - ch = _channels[i] if i < len(_channels) else i - if ch in _head and _head[ch].has_tip: - _tip = _head[ch].get_tip() - _tip_free = _tip.maximal_volume - _tip.tracker.get_used_volume() - if _tip_free >= 0: - req = min(req, _tip_free) - except Exception: - pass - - safe.append(req) - return safe - - actual_vols = _safe_aspirate_volumes(resources, vols) - if actual_vols != vols and hasattr(self, "_ros_node") and self._ros_node is not None: - self._ros_node.lab_logger().warning(f"[aspirate] volume adjusted, requested_vols={vols}, actual_vols={actual_vols}") if self._simulator: - return await self._simulate_handler.aspirate( - resources, - actual_vols, - use_channels, - flow_rates, - offsets, - liquid_height, - blow_out_air_volume, - spread, - **backend_kwargs, - ) + try: + return await self._simulate_handler.aspirate( + resources, + vols, + use_channels, + flow_rates, + offsets, + liquid_height, + blow_out_air_volume, + spread, + **backend_kwargs, + ) + except (TooLittleLiquidError, TooLittleVolumeError) as e: + tracker_info = [] + for r in resources: + t = r.tracker + tracker_info.append( + f"{r.name}(used={t.get_used_volume():.1f}, " + f"free={t.get_free_volume():.1f}, max={r.max_volume})" + ) + if hasattr(self, "_ros_node") and self._ros_node is not None: + self._ros_node.lab_logger().warning( + f"[aspirate] volume tracker error, bypassing tracking. " + f"error={e}, vols={vols}, trackers={tracker_info}" + ) + with no_volume_tracking(): + return await self._simulate_handler.aspirate( + resources, + vols, + use_channels, + flow_rates, + offsets, + liquid_height, + blow_out_air_volume, + spread, + **backend_kwargs, + ) try: await super().aspirate( resources, - actual_vols, + vols, use_channels, flow_rates, offsets, @@ -279,7 +280,7 @@ class LiquidHandlerMiddleware(LiquidHandler): if "Resource is too small to space channels" in str(e) and spread != "custom": await super().aspirate( resources, - actual_vols, + vols, use_channels, flow_rates, offsets, @@ -290,35 +291,15 @@ class LiquidHandlerMiddleware(LiquidHandler): ) else: raise - except TooLittleLiquidError: - # 再兜底一次:按实时可用液量重算后重试,避免状态更新竞争导致的瞬时不足 - retry_vols = _safe_aspirate_volumes(resources, actual_vols) - if any(v > 0 for v in retry_vols): - await super().aspirate( - resources, - retry_vols, - use_channels, - flow_rates, - offsets, - liquid_height, - blow_out_air_volume, - spread, - **backend_kwargs, - ) - actual_vols = retry_vols - else: - actual_vols = retry_vols res_samples = [] res_volumes = [] - # 处理 use_channels 为 None 的情况(通常用于单通道操作) if use_channels is None: - # 对于单通道操作,推断通道为 [0] channels_to_use = [0] * len(resources) else: channels_to_use = use_channels - for resource, volume, channel in zip(resources, actual_vols, channels_to_use): + for resource, volume, channel in zip(resources, vols, channels_to_use): sample_uuid_value = getattr(resource, "unilabos_extra", {}).get(EXTRA_SAMPLE_UUID, None) res_samples.append({"name": resource.name, "sample_uuid": sample_uuid_value}) res_volumes.append(volume) @@ -365,17 +346,43 @@ class LiquidHandlerMiddleware(LiquidHandler): actual_vols = _safe_dispense_volumes(resources, vols) if self._simulator: - return await self._simulate_handler.dispense( - resources, - actual_vols, - use_channels, - flow_rates, - offsets, - liquid_height, - blow_out_air_volume, - spread, - **backend_kwargs, - ) + try: + return await self._simulate_handler.dispense( + resources, + actual_vols, + use_channels, + flow_rates, + offsets, + liquid_height, + blow_out_air_volume, + spread, + **backend_kwargs, + ) + except (TooLittleLiquidError, TooLittleVolumeError) as e: + tracker_info = [] + for r in resources: + t = r.tracker + tracker_info.append( + f"{r.name}(used={t.get_used_volume():.1f}, " + f"free={t.get_free_volume():.1f}, max={r.max_volume})" + ) + if hasattr(self, "_ros_node") and self._ros_node is not None: + self._ros_node.lab_logger().warning( + f"[dispense] volume tracker error, bypassing tracking. " + f"error={e}, vols={actual_vols}, trackers={tracker_info}" + ) + with no_volume_tracking(): + return await self._simulate_handler.dispense( + resources, + actual_vols, + use_channels, + flow_rates, + offsets, + liquid_height, + blow_out_air_volume, + spread, + **backend_kwargs, + ) try: await super().dispense( resources, @@ -859,6 +866,14 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): local = cast(Union[Container, TipRack], plr) if hasattr(plr, "unilabos_extra") and hasattr(local, "unilabos_extra"): local.unilabos_extra = getattr(plr, "unilabos_extra", {}).copy() + if local is not plr and hasattr(plr, "tracker") and hasattr(local, "tracker"): + local_tracker = local.tracker + plr_tracker = plr.tracker + local_history = getattr(local_tracker, "liquid_history", None) + plr_history = getattr(plr_tracker, "liquid_history", None) + if (isinstance(local_history, list) and len(local_history) == 0 + and isinstance(plr_history, list) and len(plr_history) > 0): + local_tracker.liquid_history = list(plr_history) resolved.append(local) if len(resolved) != len(uuids): raise ValueError( @@ -868,8 +883,18 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): resolved = _resolve_from_local_by_uuids() result = list(items) - for (idx, _), plr in zip(dict_items, resolved): - result[idx] = plr + for (idx, orig_dict), res in zip(dict_items, resolved): + if isinstance(orig_dict, dict) and hasattr(res, "tracker"): + tracker = res.tracker + local_history = getattr(tracker, "liquid_history", None) + if isinstance(local_history, list) and len(local_history) == 0: + data = orig_dict.get("data") or {} + dict_history = data.get("liquid_history") + if isinstance(dict_history, list) and len(dict_history) > 0: + tracker.liquid_history = [ + (name, float(vol)) for name, vol in dict_history + ] + result[idx] = res return result @classmethod @@ -1070,7 +1095,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): for _ in range(len(sources)): tip = [] for __ in range(len(use_channels)): - tip.extend(self._get_next_tip()) + tip.append(self._get_next_tip()) await self.pick_up_tips(tip) await self.aspirate( resources=[sources[_]], @@ -1110,7 +1135,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): for i in range(0, len(sources), 8): tip = [] for _ in range(len(use_channels)): - tip.extend(self._get_next_tip()) + tip.append(self._get_next_tip()) await self.pick_up_tips(tip) current_targets = waste_liquid[i : i + 8] current_reagent_sources = sources[i : i + 8] @@ -1204,7 +1229,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): for _ in range(len(targets)): tip = [] for x in range(len(use_channels)): - tip.extend(self._get_next_tip()) + tip.append(self._get_next_tip()) await self.pick_up_tips(tip) await self.aspirate( @@ -1256,7 +1281,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): for i in range(0, len(targets), 8): tip = [] for _ in range(len(use_channels)): - tip.extend(self._get_next_tip()) + tip.append(self._get_next_tip()) await self.pick_up_tips(tip) current_targets = targets[i : i + 8] current_reagent_sources = reagent_sources[i : i + 8] @@ -1538,7 +1563,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): delays = kwargs.get('delays') tip = [] - tip.extend(self._get_next_tip()) + tip.append(self._get_next_tip()) await self.pick_up_tips(tip) blow_out_air_volume_before_vol = 0.0 if blow_out_air_volume_before is not None and len(blow_out_air_volume_before) > 0: @@ -1741,9 +1766,13 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): def iter_tips(self, tip_racks: Sequence[TipRack]) -> Iterator[Resource]: """Yield tips from a list of TipRacks one-by-one until depleted.""" for rack in tip_racks: - for tip in rack: - yield tip - # raise RuntimeError("Out of tips!") + if isinstance(rack, TipSpot): + yield rack + elif hasattr(rack, "get_all_items"): + yield from rack.get_all_items() + else: + for tip in rack: + yield tip def _get_next_tip(self): """从 current_tip 迭代器获取下一个 tip,耗尽时抛出明确错误而非 StopIteration"""