mirror of
https://github.com/deepmodeling/Uni-Lab-OS
synced 2026-03-24 11:24:19 +00:00
1888 lines
82 KiB
Python
1888 lines
82 KiB
Python
from __future__ import annotations
|
||
|
||
from math import e
|
||
import time
|
||
import traceback
|
||
from collections import Counter
|
||
from typing import List, Sequence, Optional, Literal, Union, Iterator, Dict, Any, Callable, Set, cast
|
||
|
||
from pylabrobot.liquid_handling import LiquidHandler, LiquidHandlerBackend, LiquidHandlerChatterboxBackend, Strictness
|
||
from pylabrobot.liquid_handling.liquid_handler import TipPresenceProbingMethod
|
||
from pylabrobot.liquid_handling.standard import GripDirection
|
||
from pylabrobot.resources.errors import TooLittleLiquidError, TooLittleVolumeError
|
||
from pylabrobot.resources.volume_tracker import no_volume_tracking
|
||
from pylabrobot.resources import (
|
||
Resource,
|
||
TipRack,
|
||
Container,
|
||
Coordinate,
|
||
Well,
|
||
Deck,
|
||
TipSpot,
|
||
Plate,
|
||
ResourceStack,
|
||
ResourceHolder,
|
||
Lid,
|
||
Trash,
|
||
Tip, TubeRack,
|
||
)
|
||
from typing_extensions import TypedDict
|
||
|
||
from unilabos.devices.liquid_handling.rviz_backend import UniLiquidHandlerRvizBackend
|
||
from unilabos.registry.placeholder_type import ResourceSlot
|
||
from unilabos.resources.resource_tracker import (
|
||
ResourceTreeSet,
|
||
ResourceDict,
|
||
EXTRA_SAMPLE_UUID,
|
||
EXTRA_UNILABOS_SAMPLE_UUID,
|
||
)
|
||
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode
|
||
|
||
|
||
class SimpleReturn(TypedDict):
|
||
samples: List[List[ResourceDict]]
|
||
volumes: List[float]
|
||
|
||
|
||
class SetLiquidReturn(TypedDict):
|
||
wells: List[List[ResourceDict]]
|
||
volumes: List[float]
|
||
|
||
|
||
class SetLiquidFromPlateReturn(TypedDict):
|
||
plate: List[List[ResourceDict]]
|
||
wells: List[List[ResourceDict]]
|
||
volumes: List[float]
|
||
|
||
|
||
class TransferLiquidReturn(TypedDict):
|
||
sources: List[List[ResourceDict]]
|
||
targets: List[List[ResourceDict]]
|
||
|
||
|
||
class LiquidHandlerMiddleware(LiquidHandler):
|
||
def __init__(
|
||
self, backend: LiquidHandlerBackend, deck: Deck, simulator: bool = False, channel_num: int = 8, **kwargs
|
||
):
|
||
self._simulator = simulator
|
||
self.channel_num = channel_num
|
||
self.pending_liquids_dict = {}
|
||
joint_config = kwargs.get("joint_config", None)
|
||
if simulator:
|
||
if joint_config:
|
||
self._simulate_backend = UniLiquidHandlerRvizBackend(
|
||
channel_num, kwargs["total_height"], joint_config=joint_config, lh_device_id=deck.name
|
||
)
|
||
else:
|
||
self._simulate_backend = LiquidHandlerChatterboxBackend(channel_num)
|
||
self._simulate_handler = LiquidHandlerAbstract(self._simulate_backend, deck, False)
|
||
super().__init__(backend, deck)
|
||
|
||
async def setup(self, **backend_kwargs):
|
||
if self._simulator:
|
||
return await self._simulate_handler.setup(**backend_kwargs)
|
||
return await super().setup(**backend_kwargs)
|
||
|
||
def serialize_state(self) -> Dict[str, Any]:
|
||
if self._simulator:
|
||
self._simulate_handler.serialize_state()
|
||
return super().serialize_state()
|
||
|
||
def load_state(self, state: Dict[str, Any]):
|
||
if self._simulator:
|
||
self._simulate_handler.load_state(state)
|
||
super().load_state(state)
|
||
|
||
def update_head_state(self, state: Dict[int, Optional[Tip]]):
|
||
if self._simulator:
|
||
self._simulate_handler.update_head_state(state)
|
||
super().update_head_state(state)
|
||
|
||
def clear_head_state(self):
|
||
if self._simulator:
|
||
self._simulate_handler.clear_head_state()
|
||
super().clear_head_state()
|
||
|
||
def _run_async_in_thread(self, func, *args, **kwargs):
|
||
super()._run_async_in_thread(func, *args, **kwargs)
|
||
|
||
def _send_assigned_resource_to_backend(self, resource: Resource):
|
||
if self._simulator:
|
||
self._simulate_handler._send_assigned_resource_to_backend(resource)
|
||
super()._send_assigned_resource_to_backend(resource)
|
||
|
||
def _send_unassigned_resource_to_backend(self, resource: Resource):
|
||
if self._simulator:
|
||
self._simulate_handler._send_unassigned_resource_to_backend(resource)
|
||
super()._send_unassigned_resource_to_backend(resource)
|
||
|
||
def summary(self):
|
||
if self._simulator:
|
||
self._simulate_handler.summary()
|
||
super().summary()
|
||
|
||
def _assert_positions_unique(self, positions: List[str]):
|
||
super()._assert_positions_unique(positions)
|
||
|
||
def _assert_resources_exist(self, resources: Sequence[Resource]):
|
||
super()._assert_resources_exist(resources)
|
||
|
||
def _check_args(
|
||
self, method: Callable, backend_kwargs: Dict[str, Any], default: Set[str], strictness: Strictness
|
||
) -> Set[str]:
|
||
return super()._check_args(method, backend_kwargs, default, strictness)
|
||
|
||
def _make_sure_channels_exist(self, channels: List[int]):
|
||
super()._make_sure_channels_exist(channels)
|
||
|
||
def _format_param(self, value: Any) -> Any:
|
||
return super()._format_param(value)
|
||
|
||
def _log_command(self, name: str, **kwargs) -> None:
|
||
super()._log_command(name, **kwargs)
|
||
|
||
async def pick_up_tips(
|
||
self,
|
||
tip_spots: List[TipSpot],
|
||
use_channels: Optional[List[int]] = None,
|
||
offsets: Optional[List[Coordinate]] = None,
|
||
**backend_kwargs,
|
||
):
|
||
|
||
if self._simulator:
|
||
return await self._simulate_handler.pick_up_tips(tip_spots, use_channels, offsets, **backend_kwargs)
|
||
return await super().pick_up_tips(tip_spots, use_channels, offsets, **backend_kwargs)
|
||
|
||
async def drop_tips(
|
||
self,
|
||
tip_spots: Sequence[Union[TipSpot, Trash]],
|
||
use_channels: Optional[List[int]] = None,
|
||
offsets: Optional[List[Coordinate]] = None,
|
||
allow_nonzero_volume: bool = False,
|
||
**backend_kwargs,
|
||
):
|
||
if self._simulator:
|
||
return await self._simulate_handler.drop_tips(
|
||
tip_spots, use_channels, offsets, allow_nonzero_volume, **backend_kwargs
|
||
)
|
||
await super().drop_tips(tip_spots, use_channels, offsets, allow_nonzero_volume, **backend_kwargs)
|
||
self.pending_liquids_dict = {}
|
||
return
|
||
|
||
async def return_tips(
|
||
self, use_channels: Optional[list[int]] = None, allow_nonzero_volume: bool = False, **backend_kwargs
|
||
):
|
||
if self._simulator:
|
||
return await self._simulate_handler.return_tips(use_channels, allow_nonzero_volume, **backend_kwargs)
|
||
return await super().return_tips(use_channels, allow_nonzero_volume, **backend_kwargs)
|
||
|
||
async def discard_tips(
|
||
self,
|
||
use_channels: Optional[List[int]] = None,
|
||
allow_nonzero_volume: bool = True,
|
||
offsets: Optional[List[Coordinate]] = None,
|
||
**backend_kwargs,
|
||
):
|
||
# 如果 use_channels 为 None,使用默认值(所有通道)
|
||
if use_channels is None:
|
||
use_channels = list(range(self.channel_num))
|
||
if not offsets or (isinstance(offsets, list) and len(offsets) != len(use_channels)):
|
||
offsets = [Coordinate.zero()] * len(use_channels)
|
||
if self._simulator:
|
||
return await self._simulate_handler.discard_tips(
|
||
use_channels, allow_nonzero_volume, offsets, **backend_kwargs
|
||
)
|
||
await super().discard_tips(use_channels, allow_nonzero_volume, offsets, **backend_kwargs)
|
||
self.pending_liquids_dict = {}
|
||
return
|
||
|
||
def _check_containers(self, resources: Sequence[Resource]):
|
||
super()._check_containers(resources)
|
||
|
||
async def aspirate(
|
||
self,
|
||
resources: Sequence[Container],
|
||
vols: List[float],
|
||
use_channels: Optional[List[int]] = None,
|
||
flow_rates: Optional[List[Optional[float]]] = None,
|
||
offsets: Optional[List[Coordinate]] = None,
|
||
liquid_height: Optional[List[Optional[float]]] = None,
|
||
blow_out_air_volume: Optional[List[Optional[float]]] = None,
|
||
spread: Literal["wide", "tight", "custom"] = "custom",
|
||
**backend_kwargs,
|
||
):
|
||
if spread == "":
|
||
spread = "custom"
|
||
|
||
for res in resources:
|
||
tracker = getattr(res, "tracker", None)
|
||
if tracker is None or getattr(tracker, "is_disabled", False):
|
||
continue
|
||
history = getattr(tracker, "liquid_history", None)
|
||
if tracker.get_used_volume() <= 0 and isinstance(history, list) and len(history) == 0:
|
||
fill_vol = tracker.max_volume if tracker.max_volume > 0 else 50000
|
||
try:
|
||
tracker.add_liquid(fill_vol)
|
||
except Exception:
|
||
tracker.liquid_history.append(("auto_init", fill_vol))
|
||
|
||
if self._simulator:
|
||
try:
|
||
return await self._simulate_handler.aspirate(
|
||
resources,
|
||
vols,
|
||
use_channels,
|
||
flow_rates,
|
||
offsets,
|
||
liquid_height,
|
||
blow_out_air_volume,
|
||
spread,
|
||
**backend_kwargs,
|
||
)
|
||
except (TooLittleLiquidError, TooLittleVolumeError) as e:
|
||
tracker_info = []
|
||
for r in resources:
|
||
t = r.tracker
|
||
tracker_info.append(
|
||
f"{r.name}(used={t.get_used_volume():.1f}, "
|
||
f"free={t.get_free_volume():.1f}, max={r.max_volume})"
|
||
)
|
||
if hasattr(self, "_ros_node") and self._ros_node is not None:
|
||
self._ros_node.lab_logger().warning(
|
||
f"[aspirate] volume tracker error, bypassing tracking. "
|
||
f"error={e}, vols={vols}, trackers={tracker_info}"
|
||
)
|
||
with no_volume_tracking():
|
||
return await self._simulate_handler.aspirate(
|
||
resources,
|
||
vols,
|
||
use_channels,
|
||
flow_rates,
|
||
offsets,
|
||
liquid_height,
|
||
blow_out_air_volume,
|
||
spread,
|
||
**backend_kwargs,
|
||
)
|
||
try:
|
||
await super().aspirate(
|
||
resources,
|
||
vols,
|
||
use_channels,
|
||
flow_rates,
|
||
offsets,
|
||
liquid_height,
|
||
blow_out_air_volume,
|
||
spread,
|
||
**backend_kwargs,
|
||
)
|
||
except ValueError as e:
|
||
if "Resource is too small to space channels" in str(e) and spread != "custom":
|
||
await super().aspirate(
|
||
resources,
|
||
vols,
|
||
use_channels,
|
||
flow_rates,
|
||
offsets,
|
||
liquid_height,
|
||
blow_out_air_volume,
|
||
spread="custom",
|
||
**backend_kwargs,
|
||
)
|
||
else:
|
||
raise
|
||
|
||
res_samples = []
|
||
res_volumes = []
|
||
if use_channels is None:
|
||
channels_to_use = [0] * len(resources)
|
||
else:
|
||
channels_to_use = use_channels
|
||
|
||
for resource, volume, channel in zip(resources, vols, channels_to_use):
|
||
sample_uuid_value = getattr(resource, "unilabos_extra", {}).get(EXTRA_SAMPLE_UUID, None)
|
||
res_samples.append({"name": resource.name, "sample_uuid": sample_uuid_value})
|
||
res_volumes.append(volume)
|
||
self.pending_liquids_dict[channel] = {
|
||
EXTRA_SAMPLE_UUID: sample_uuid_value,
|
||
"volume": volume,
|
||
}
|
||
return SimpleReturn(samples=res_samples, volumes=res_volumes)
|
||
|
||
async def dispense(
|
||
self,
|
||
resources: Sequence[Container],
|
||
vols: List[float],
|
||
use_channels: Optional[List[int]] = None,
|
||
flow_rates: Optional[List[Optional[float]]] = None,
|
||
offsets: Optional[List[Coordinate]] = None,
|
||
liquid_height: Optional[List[Optional[float]]] = None,
|
||
blow_out_air_volume: Optional[List[Optional[float]]] = None,
|
||
spread: Literal["wide", "tight", "custom"] = "wide",
|
||
**backend_kwargs,
|
||
) -> SimpleReturn:
|
||
if spread == "":
|
||
spread = "wide"
|
||
|
||
def _safe_dispense_volumes(_resources: Sequence[Container], _vols: List[float]) -> List[float]:
|
||
"""将 dispense 体积裁剪到目标容器可用体积范围内,避免 volume tracker 报错。"""
|
||
safe: List[float] = []
|
||
for res, vol in zip(_resources, _vols):
|
||
req = max(float(vol), 0.0)
|
||
free_volume = None
|
||
try:
|
||
tracker = getattr(res, "tracker", None)
|
||
get_free = getattr(tracker, "get_free_volume", None)
|
||
if callable(get_free):
|
||
free_volume = get_free()
|
||
except Exception:
|
||
free_volume = None
|
||
|
||
if isinstance(free_volume, (int, float)):
|
||
req = min(req, max(float(free_volume), 0.0))
|
||
safe.append(req)
|
||
return safe
|
||
|
||
actual_vols = _safe_dispense_volumes(resources, vols)
|
||
|
||
if self._simulator:
|
||
try:
|
||
return await self._simulate_handler.dispense(
|
||
resources,
|
||
actual_vols,
|
||
use_channels,
|
||
flow_rates,
|
||
offsets,
|
||
liquid_height,
|
||
blow_out_air_volume,
|
||
spread,
|
||
**backend_kwargs,
|
||
)
|
||
except (TooLittleLiquidError, TooLittleVolumeError) as e:
|
||
tracker_info = []
|
||
for r in resources:
|
||
t = r.tracker
|
||
tracker_info.append(
|
||
f"{r.name}(used={t.get_used_volume():.1f}, "
|
||
f"free={t.get_free_volume():.1f}, max={r.max_volume})"
|
||
)
|
||
if hasattr(self, "_ros_node") and self._ros_node is not None:
|
||
self._ros_node.lab_logger().warning(
|
||
f"[dispense] volume tracker error, bypassing tracking. "
|
||
f"error={e}, vols={actual_vols}, trackers={tracker_info}"
|
||
)
|
||
with no_volume_tracking():
|
||
return await self._simulate_handler.dispense(
|
||
resources,
|
||
actual_vols,
|
||
use_channels,
|
||
flow_rates,
|
||
offsets,
|
||
liquid_height,
|
||
blow_out_air_volume,
|
||
spread,
|
||
**backend_kwargs,
|
||
)
|
||
try:
|
||
await super().dispense(
|
||
resources,
|
||
actual_vols,
|
||
use_channels,
|
||
flow_rates,
|
||
offsets,
|
||
liquid_height,
|
||
blow_out_air_volume,
|
||
spread,
|
||
**backend_kwargs,
|
||
)
|
||
except ValueError as e:
|
||
if "Resource is too small to space channels" in str(e) and spread != "custom":
|
||
await super().dispense(
|
||
resources,
|
||
actual_vols,
|
||
use_channels,
|
||
flow_rates,
|
||
offsets,
|
||
liquid_height,
|
||
blow_out_air_volume,
|
||
"custom",
|
||
**backend_kwargs,
|
||
)
|
||
else:
|
||
raise
|
||
except TooLittleVolumeError:
|
||
# 再兜底一次:按实时 free volume 重新裁剪后重试,避免并发状态更新导致的瞬时超量
|
||
retry_vols = _safe_dispense_volumes(resources, actual_vols)
|
||
if any(v > 0 for v in retry_vols):
|
||
await super().dispense(
|
||
resources,
|
||
retry_vols,
|
||
use_channels,
|
||
flow_rates,
|
||
offsets,
|
||
liquid_height,
|
||
blow_out_air_volume,
|
||
spread,
|
||
**backend_kwargs,
|
||
)
|
||
actual_vols = retry_vols
|
||
else:
|
||
actual_vols = retry_vols
|
||
res_samples = []
|
||
res_volumes = []
|
||
if use_channels is None:
|
||
channels_to_use = [0] * len(resources)
|
||
else:
|
||
channels_to_use = use_channels
|
||
for resource, volume, channel in zip(resources, actual_vols, channels_to_use):
|
||
res_uuid = self.pending_liquids_dict[channel][EXTRA_SAMPLE_UUID]
|
||
self.pending_liquids_dict[channel]["volume"] -= volume
|
||
resource.unilabos_extra[EXTRA_SAMPLE_UUID] = res_uuid
|
||
res_samples.append({"name": resource.name, EXTRA_SAMPLE_UUID: res_uuid})
|
||
res_volumes.append(volume)
|
||
|
||
return SimpleReturn(samples=res_samples, volumes=res_volumes)
|
||
|
||
async def transfer(
|
||
self,
|
||
source: Well,
|
||
targets: List[Well],
|
||
source_vol: Optional[float] = None,
|
||
ratios: Optional[List[float]] = None,
|
||
target_vols: Optional[List[float]] = None,
|
||
aspiration_flow_rate: Optional[float] = None,
|
||
dispense_flow_rates: Optional[List[Optional[float]]] = None,
|
||
**backend_kwargs,
|
||
):
|
||
if self._simulator:
|
||
return await self._simulate_handler.transfer(
|
||
source,
|
||
targets,
|
||
source_vol,
|
||
ratios,
|
||
target_vols,
|
||
aspiration_flow_rate,
|
||
dispense_flow_rates,
|
||
**backend_kwargs,
|
||
)
|
||
return await super().transfer(
|
||
source,
|
||
targets,
|
||
source_vol,
|
||
ratios,
|
||
target_vols,
|
||
aspiration_flow_rate,
|
||
dispense_flow_rates,
|
||
**backend_kwargs,
|
||
)
|
||
|
||
def use_channels(self, channels: List[int]):
|
||
if self._simulator:
|
||
self._simulate_handler.use_channels(channels)
|
||
return super().use_channels(channels)
|
||
|
||
async def pick_up_tips96(self, tip_rack: TipRack, offset: Coordinate = Coordinate.zero(), **backend_kwargs):
|
||
if self._simulator:
|
||
return await self._simulate_handler.pick_up_tips96(tip_rack, offset, **backend_kwargs)
|
||
return await super().pick_up_tips96(tip_rack, offset, **backend_kwargs)
|
||
|
||
async def drop_tips96(
|
||
self,
|
||
resource: Union[TipRack, Trash],
|
||
offset: Coordinate = Coordinate.zero(),
|
||
allow_nonzero_volume: bool = False,
|
||
**backend_kwargs,
|
||
):
|
||
if self._simulator:
|
||
return await self._simulate_handler.drop_tips96(resource, offset, allow_nonzero_volume, **backend_kwargs)
|
||
return await super().drop_tips96(resource, offset, allow_nonzero_volume, **backend_kwargs)
|
||
|
||
def _get_96_head_origin_tip_rack(self) -> Optional[TipRack]:
|
||
return super()._get_96_head_origin_tip_rack()
|
||
|
||
async def return_tips96(self, allow_nonzero_volume: bool = False, **backend_kwargs):
|
||
if self._simulator:
|
||
return await self._simulate_handler.return_tips96(allow_nonzero_volume, **backend_kwargs)
|
||
return await super().return_tips96(allow_nonzero_volume, **backend_kwargs)
|
||
|
||
async def discard_tips96(self, allow_nonzero_volume: bool = True, **backend_kwargs):
|
||
if self._simulator:
|
||
return await self._simulate_handler.discard_tips96(allow_nonzero_volume, **backend_kwargs)
|
||
return await super().discard_tips96(allow_nonzero_volume, **backend_kwargs)
|
||
|
||
async def aspirate96(
|
||
self,
|
||
resource: Union[Plate, Container, List[Well]],
|
||
volume: float,
|
||
offset: Coordinate = Coordinate.zero(),
|
||
flow_rate: Optional[float] = None,
|
||
blow_out_air_volume: Optional[float] = None,
|
||
**backend_kwargs,
|
||
):
|
||
if self._simulator:
|
||
return await self._simulate_handler.aspirate96(
|
||
resource, volume, offset, flow_rate, blow_out_air_volume, **backend_kwargs
|
||
)
|
||
return await super().aspirate96(resource, volume, offset, flow_rate, blow_out_air_volume, **backend_kwargs)
|
||
|
||
async def dispense96(
|
||
self,
|
||
resource: Union[Plate, Container, List[Well]],
|
||
volume: float,
|
||
offset: Coordinate = Coordinate.zero(),
|
||
flow_rate: Optional[float] = None,
|
||
blow_out_air_volume: Optional[float] = None,
|
||
**backend_kwargs,
|
||
):
|
||
if self._simulator:
|
||
return await self._simulate_handler.dispense96(
|
||
resource, volume, offset, flow_rate, blow_out_air_volume, **backend_kwargs
|
||
)
|
||
return await super().dispense96(resource, volume, offset, flow_rate, blow_out_air_volume, **backend_kwargs)
|
||
|
||
async def stamp(
|
||
self,
|
||
source: Plate,
|
||
target: Plate,
|
||
volume: float,
|
||
aspiration_flow_rate: Optional[float] = None,
|
||
dispense_flow_rate: Optional[float] = None,
|
||
):
|
||
if self._simulator:
|
||
return await self._simulate_handler.stamp(source, target, volume, aspiration_flow_rate, dispense_flow_rate)
|
||
return await super().stamp(source, target, volume, aspiration_flow_rate, dispense_flow_rate)
|
||
|
||
async def pick_up_resource(
|
||
self,
|
||
resource: Resource,
|
||
offset: Coordinate = Coordinate.zero(),
|
||
pickup_distance_from_top: float = 0,
|
||
direction: GripDirection = GripDirection.FRONT,
|
||
**backend_kwargs,
|
||
):
|
||
if self._simulator:
|
||
return await self._simulate_handler.pick_up_resource(
|
||
resource, offset, pickup_distance_from_top, direction, **backend_kwargs
|
||
)
|
||
return await super().pick_up_resource(resource, offset, pickup_distance_from_top, direction, **backend_kwargs)
|
||
|
||
async def move_picked_up_resource(
|
||
self,
|
||
to: Coordinate,
|
||
offset: Coordinate = Coordinate.zero(),
|
||
direction: Optional[GripDirection] = None,
|
||
**backend_kwargs,
|
||
):
|
||
if self._simulator:
|
||
return await self._simulate_handler.move_picked_up_resource(to, offset, direction, **backend_kwargs)
|
||
return await super().move_picked_up_resource(to, offset, direction, **backend_kwargs)
|
||
|
||
async def drop_resource(
|
||
self,
|
||
destination: Union[ResourceStack, ResourceHolder, Resource, Coordinate],
|
||
offset: Coordinate = Coordinate.zero(),
|
||
direction: GripDirection = GripDirection.FRONT,
|
||
**backend_kwargs,
|
||
):
|
||
if self._simulator:
|
||
return await self._simulate_handler.drop_resource(destination, offset, direction, **backend_kwargs)
|
||
return await super().drop_resource(destination, offset, direction, **backend_kwargs)
|
||
|
||
async def move_resource(
|
||
self,
|
||
resource: Resource,
|
||
to: Union[ResourceStack, ResourceHolder, Resource, Coordinate],
|
||
intermediate_locations: Optional[List[Coordinate]] = None,
|
||
pickup_offset: Coordinate = Coordinate.zero(),
|
||
destination_offset: Coordinate = Coordinate.zero(),
|
||
pickup_distance_from_top: float = 0,
|
||
pickup_direction: GripDirection = GripDirection.FRONT,
|
||
drop_direction: GripDirection = GripDirection.FRONT,
|
||
**backend_kwargs,
|
||
):
|
||
if self._simulator:
|
||
return await self._simulate_handler.move_resource(
|
||
resource,
|
||
to,
|
||
intermediate_locations,
|
||
pickup_offset,
|
||
destination_offset,
|
||
pickup_distance_from_top,
|
||
pickup_direction,
|
||
drop_direction,
|
||
**backend_kwargs,
|
||
)
|
||
return await super().move_resource(
|
||
resource,
|
||
to,
|
||
intermediate_locations,
|
||
pickup_offset,
|
||
destination_offset,
|
||
pickup_distance_from_top,
|
||
pickup_direction,
|
||
drop_direction,
|
||
**backend_kwargs,
|
||
)
|
||
|
||
async def move_lid(
|
||
self,
|
||
lid: Lid,
|
||
to: Union[Plate, ResourceStack, Coordinate],
|
||
intermediate_locations: Optional[List[Coordinate]] = None,
|
||
pickup_offset: Coordinate = Coordinate.zero(),
|
||
destination_offset: Coordinate = Coordinate.zero(),
|
||
pickup_direction: GripDirection = GripDirection.FRONT,
|
||
drop_direction: GripDirection = GripDirection.FRONT,
|
||
pickup_distance_from_top: float = 5.7 - 3.33,
|
||
**backend_kwargs,
|
||
):
|
||
if self._simulator:
|
||
return await self._simulate_handler.move_lid(
|
||
lid,
|
||
to,
|
||
intermediate_locations,
|
||
pickup_offset,
|
||
destination_offset,
|
||
pickup_direction,
|
||
drop_direction,
|
||
pickup_distance_from_top,
|
||
**backend_kwargs,
|
||
)
|
||
return await super().move_lid(
|
||
lid,
|
||
to,
|
||
intermediate_locations,
|
||
pickup_offset,
|
||
destination_offset,
|
||
pickup_direction,
|
||
drop_direction,
|
||
pickup_distance_from_top,
|
||
**backend_kwargs,
|
||
)
|
||
|
||
async def move_plate(
|
||
self,
|
||
plate: Plate,
|
||
to: Union[ResourceStack, ResourceHolder, Resource, Coordinate],
|
||
intermediate_locations: Optional[List[Coordinate]] = None,
|
||
pickup_offset: Coordinate = Coordinate.zero(),
|
||
destination_offset: Coordinate = Coordinate.zero(),
|
||
drop_direction: GripDirection = GripDirection.FRONT,
|
||
pickup_direction: GripDirection = GripDirection.FRONT,
|
||
pickup_distance_from_top: float = 13.2 - 3.33,
|
||
**backend_kwargs,
|
||
):
|
||
if self._simulator:
|
||
return await self._simulate_handler.move_plate(
|
||
plate,
|
||
to,
|
||
intermediate_locations,
|
||
pickup_offset,
|
||
destination_offset,
|
||
drop_direction,
|
||
pickup_direction,
|
||
pickup_distance_from_top,
|
||
**backend_kwargs,
|
||
)
|
||
return await super().move_plate(
|
||
plate,
|
||
to,
|
||
intermediate_locations,
|
||
pickup_offset,
|
||
destination_offset,
|
||
drop_direction,
|
||
pickup_direction,
|
||
pickup_distance_from_top,
|
||
**backend_kwargs,
|
||
)
|
||
|
||
def serialize(self):
|
||
if self._simulator:
|
||
self._simulate_handler.serialize()
|
||
return super().serialize()
|
||
|
||
@classmethod
|
||
def deserialize(cls, data: dict, allow_marshal: bool = False) -> LiquidHandler:
|
||
return super().deserialize(data, allow_marshal)
|
||
|
||
@classmethod
|
||
def load(cls, path: str) -> LiquidHandler:
|
||
return super().load(path)
|
||
|
||
async def prepare_for_manual_channel_operation(self, channel: int):
|
||
if self._simulator:
|
||
return await self._simulate_handler.prepare_for_manual_channel_operation(channel)
|
||
return await super().prepare_for_manual_channel_operation(channel)
|
||
|
||
async def move_channel_x(self, channel: int, x: float):
|
||
if self._simulator:
|
||
return await self._simulate_handler.move_channel_x(channel, x)
|
||
return await super().move_channel_x(channel, x)
|
||
|
||
async def move_channel_y(self, channel: int, y: float):
|
||
if self._simulator:
|
||
return await self._simulate_handler.move_channel_y(channel, y)
|
||
return await super().move_channel_y(channel, y)
|
||
|
||
async def move_channel_z(self, channel: int, z: float):
|
||
if self._simulator:
|
||
return await self._simulate_handler.move_channel_z(channel, z)
|
||
return await super().move_channel_z(channel, z)
|
||
|
||
def assign_child_resource(self, resource: Resource, location: Optional[Coordinate], reassign: bool = True):
|
||
if self._simulator:
|
||
self._simulate_handler.assign_child_resource(resource, location, reassign)
|
||
pass
|
||
|
||
async def probe_tip_presence_via_pickup(
|
||
self, tip_spots: List[TipSpot], use_channels: Optional[List[int]] = None
|
||
) -> Dict[str, bool]:
|
||
if self._simulator:
|
||
return await self._simulate_handler.probe_tip_presence_via_pickup(tip_spots, use_channels)
|
||
return await super().probe_tip_presence_via_pickup(tip_spots, use_channels)
|
||
|
||
async def probe_tip_inventory(
|
||
self,
|
||
tip_spots: List[TipSpot],
|
||
probing_fn: Optional[TipPresenceProbingMethod] = None,
|
||
use_channels: Optional[List[int]] = None,
|
||
) -> Dict[str, bool]:
|
||
if self._simulator:
|
||
return await self._simulate_handler.probe_tip_inventory(tip_spots, probing_fn, use_channels)
|
||
return await super().probe_tip_inventory(tip_spots, probing_fn, use_channels)
|
||
|
||
async def consolidate_tip_inventory(self, tip_racks: List[TipRack], use_channels: Optional[List[int]] = None):
|
||
if self._simulator:
|
||
return await self._simulate_handler.consolidate_tip_inventory(tip_racks, use_channels)
|
||
return await super().consolidate_tip_inventory(tip_racks, use_channels)
|
||
|
||
|
||
class LiquidHandlerAbstract(LiquidHandlerMiddleware):
|
||
"""Extended LiquidHandler with additional operations."""
|
||
|
||
support_touch_tip = True
|
||
_ros_node: BaseROS2DeviceNode
|
||
|
||
def __init__(
|
||
self,
|
||
backend: LiquidHandlerBackend,
|
||
deck: Deck,
|
||
simulator: bool = False,
|
||
channel_num: int = 8,
|
||
total_height: float = 310,
|
||
):
|
||
"""Initialize a LiquidHandler.
|
||
|
||
Args:
|
||
backend: Backend to use.
|
||
deck: Deck to use.
|
||
"""
|
||
backend_type = None
|
||
if isinstance(backend, dict) and "type" in backend:
|
||
backend_dict = backend.copy()
|
||
type_str = backend_dict.pop("type")
|
||
try:
|
||
# Try to get class from string using globals (current module), or fallback to pylabrobot or unilabos namespaces
|
||
backend_cls = None
|
||
if type_str in globals():
|
||
backend_cls = globals()[type_str]
|
||
else:
|
||
# Try resolving dotted notation, e.g. "xxx.yyy.ClassName"
|
||
components = type_str.split(".")
|
||
mod = None
|
||
if len(components) > 1:
|
||
module_name = ".".join(components[:-1])
|
||
try:
|
||
import importlib
|
||
|
||
mod = importlib.import_module(module_name)
|
||
except ImportError:
|
||
mod = None
|
||
if mod is not None:
|
||
backend_cls = getattr(mod, components[-1], None)
|
||
if backend_cls is None:
|
||
# Try pylabrobot style import (if available)
|
||
try:
|
||
import pylabrobot
|
||
|
||
backend_cls = getattr(pylabrobot, type_str, None)
|
||
except Exception:
|
||
backend_cls = None
|
||
if backend_cls is not None and isinstance(backend_cls, type):
|
||
backend_type = backend_cls(**backend_dict) # pass the rest of dict as kwargs
|
||
except Exception as exc:
|
||
raise RuntimeError(f"Failed to convert backend type '{type_str}' to class: {exc}")
|
||
else:
|
||
backend_type = backend
|
||
self._simulator = simulator
|
||
self.group_info = dict()
|
||
super().__init__(backend_type, deck, simulator, channel_num)
|
||
|
||
def post_init(self, ros_node: BaseROS2DeviceNode):
|
||
self._ros_node = ros_node
|
||
|
||
async def _resolve_to_plr_resources(
|
||
self,
|
||
items: Sequence[Union[Container, TipRack, Dict[str, Any]]],
|
||
) -> List[Union[Container, TipRack]]:
|
||
"""将 dict 格式的资源解析为 PLR 实例。若全部已是 PLR,直接返回。"""
|
||
dict_items = [(i, x) for i, x in enumerate(items) if isinstance(x, dict)]
|
||
if not dict_items:
|
||
return list(items)
|
||
if not hasattr(self, "_ros_node") or self._ros_node is None:
|
||
raise ValueError(
|
||
"传入 dict 格式的 sources/targets/tip_racks 时,需通过 post_init 注入 _ros_node,"
|
||
"才能从物料系统按 uuid 解析为 PLR 资源。"
|
||
)
|
||
uuids = [x.get("uuid") or x.get("unilabos_uuid") for _, x in dict_items]
|
||
if any(u is None for u in uuids):
|
||
raise ValueError("dict 格式的资源必须包含 uuid 或 unilabos_uuid 字段")
|
||
|
||
def _resolve_from_local_by_uuids() -> List[Union[Container, TipRack]]:
|
||
resolved_locals: List[Union[Container, TipRack]] = []
|
||
missing: List[str] = []
|
||
for uid in uuids:
|
||
matches = self._ros_node.resource_tracker.figure_resource({"uuid": uid}, try_mode=True)
|
||
if matches:
|
||
resolved_locals.append(cast(Union[Container, TipRack], matches[0]))
|
||
else:
|
||
missing.append(str(uid))
|
||
if missing:
|
||
raise ValueError(
|
||
f"远端资源树未返回且本地资源也未命中,缺失 UUID: {missing}"
|
||
)
|
||
return resolved_locals
|
||
|
||
# 优先走远端资源树查询;若远端为空或 requested_uuids 无法解析,则降级到本地 tracker 按 UUID 解析。
|
||
resolved = []
|
||
try:
|
||
resource_tree = await self._ros_node.get_resource(uuids)
|
||
plr_list = resource_tree.to_plr_resources(requested_uuids=uuids)
|
||
for uid, plr in zip(uuids, plr_list):
|
||
local_matches = self._ros_node.resource_tracker.figure_resource({"uuid": uid}, try_mode=True)
|
||
if local_matches:
|
||
local = cast(Union[Container, TipRack], local_matches[0])
|
||
else:
|
||
local = cast(Union[Container, TipRack], plr)
|
||
if hasattr(plr, "unilabos_extra") and hasattr(local, "unilabos_extra"):
|
||
local.unilabos_extra = getattr(plr, "unilabos_extra", {}).copy()
|
||
if local is not plr and hasattr(plr, "tracker") and hasattr(local, "tracker"):
|
||
local_tracker = local.tracker
|
||
plr_tracker = plr.tracker
|
||
local_history = getattr(local_tracker, "liquid_history", None)
|
||
plr_history = getattr(plr_tracker, "liquid_history", None)
|
||
if (isinstance(local_history, list) and len(local_history) == 0
|
||
and isinstance(plr_history, list) and len(plr_history) > 0):
|
||
local_tracker.liquid_history = list(plr_history)
|
||
resolved.append(local)
|
||
if len(resolved) != len(uuids):
|
||
raise ValueError(
|
||
f"远端资源解析数量不匹配: requested={len(uuids)}, resolved={len(resolved)}"
|
||
)
|
||
except Exception:
|
||
resolved = _resolve_from_local_by_uuids()
|
||
|
||
result = list(items)
|
||
for (idx, orig_dict), res in zip(dict_items, resolved):
|
||
if isinstance(orig_dict, dict) and hasattr(res, "tracker"):
|
||
tracker = res.tracker
|
||
local_history = getattr(tracker, "liquid_history", None)
|
||
if isinstance(local_history, list) and len(local_history) == 0:
|
||
data = orig_dict.get("data") or {}
|
||
dict_history = data.get("liquid_history")
|
||
if isinstance(dict_history, list) and len(dict_history) > 0:
|
||
tracker.liquid_history = [
|
||
(name, float(vol)) for name, vol in dict_history
|
||
]
|
||
result[idx] = res
|
||
return result
|
||
|
||
@classmethod
|
||
def set_liquid(cls, wells: list[Well], liquid_names: list[str], volumes: list[float]) -> SetLiquidReturn:
|
||
"""Set the liquid in a well.
|
||
|
||
如果 liquid_names 和 volumes 为空,但 wells 不为空,直接返回 wells。
|
||
"""
|
||
res_volumes = []
|
||
# 如果 liquid_names 和 volumes 都为空,直接返回 wells
|
||
if not liquid_names and not volumes:
|
||
return SetLiquidReturn(
|
||
wells=ResourceTreeSet.from_plr_resources(wells, known_newly_created=False).dump(), volumes=res_volumes # type: ignore
|
||
)
|
||
|
||
def _clamp_volume(resource: Union[Well, Container], volume: float) -> float:
|
||
# 防止初始化液量超过容器容量,导致后续 dispense 时 free volume 为负
|
||
clamped = max(float(volume), 0.0)
|
||
max_volume = getattr(resource, "max_volume", None)
|
||
if isinstance(max_volume, (int, float)) and max_volume > 0:
|
||
clamped = min(clamped, float(max_volume))
|
||
return clamped
|
||
|
||
for well, liquid_name, volume in zip(wells, liquid_names, volumes):
|
||
safe_volume = _clamp_volume(well, volume)
|
||
well.set_liquids([(liquid_name, safe_volume)]) # type: ignore
|
||
res_volumes.append(safe_volume)
|
||
|
||
return SetLiquidReturn(
|
||
wells=ResourceTreeSet.from_plr_resources(wells, known_newly_created=False).dump(), volumes=res_volumes # type: ignore
|
||
)
|
||
|
||
def set_liquid_from_plate(
|
||
self, plate: ResourceSlot, well_names: list[str], liquid_names: list[str], volumes: list[float]
|
||
) -> SetLiquidFromPlateReturn:
|
||
"""Set the liquid in wells of a plate by well names (e.g., A1, A2, B3).
|
||
|
||
如果 liquid_names 和 volumes 为空,但 plate 和 well_names 不为空,直接返回 plate 和 wells。
|
||
"""
|
||
assert issubclass(plate.__class__, Plate) or issubclass(plate.__class__, TubeRack) , f"plate must be a Plate, now: {type(plate)}"
|
||
plate: Union[Plate, TubeRack]
|
||
# 根据 well_names 获取对应的 Well 对象
|
||
if issubclass(plate.__class__, Plate):
|
||
wells = [plate.get_well(name) for name in well_names]
|
||
elif issubclass(plate.__class__, TubeRack):
|
||
wells = [plate.get_tube(name) for name in well_names]
|
||
res_volumes = []
|
||
|
||
# 如果 liquid_names 和 volumes 都为空,直接返回
|
||
if not liquid_names and not volumes:
|
||
return SetLiquidFromPlateReturn(
|
||
plate=ResourceTreeSet.from_plr_resources([plate], known_newly_created=False).dump(), # type: ignore
|
||
wells=ResourceTreeSet.from_plr_resources(wells, known_newly_created=False).dump(), # type: ignore
|
||
volumes=res_volumes,
|
||
)
|
||
|
||
def _clamp_volume(resource: Union[Well, Container], volume: float) -> float:
|
||
# 防止初始化液量超过容器容量,导致后续 dispense 时 free volume 为负
|
||
clamped = max(float(volume), 0.0)
|
||
max_volume = getattr(resource, "max_volume", None)
|
||
if isinstance(max_volume, (int, float)) and max_volume > 0:
|
||
clamped = min(clamped, float(max_volume))
|
||
return clamped
|
||
|
||
for well, liquid_name, volume in zip(wells, liquid_names, volumes):
|
||
safe_volume = _clamp_volume(well, volume)
|
||
well.set_liquids([(liquid_name, safe_volume)]) # type: ignore
|
||
res_volumes.append(safe_volume)
|
||
|
||
task = ROS2DeviceNode.run_async_func(self._ros_node.update_resource, True, **{"resources": wells})
|
||
submit_time = time.time()
|
||
while not task.done():
|
||
if time.time() - submit_time > 10:
|
||
self._ros_node.lab_logger().info(f"set_liquid_from_plate {plate} 超时")
|
||
break
|
||
time.sleep(0.01)
|
||
|
||
return SetLiquidFromPlateReturn(
|
||
plate=ResourceTreeSet.from_plr_resources([plate], known_newly_created=False).dump(), # type: ignore
|
||
wells=ResourceTreeSet.from_plr_resources(wells, known_newly_created=False).dump(), # type: ignore
|
||
volumes=res_volumes,
|
||
)
|
||
|
||
# ---------------------------------------------------------------
|
||
# REMOVE LIQUID --------------------------------------------------
|
||
# ---------------------------------------------------------------
|
||
|
||
def set_group(self, group_name: str, wells: List[Well], volumes: List[float]):
|
||
if self.channel_num == 8 and len(wells) != 8:
|
||
raise RuntimeError(f"Expected 8 wells, got {len(wells)}")
|
||
self.group_info[group_name] = wells
|
||
self.set_liquid(wells, [group_name] * len(wells), volumes)
|
||
|
||
async def transfer_group(self, source_group_name: str, target_group_name: str, unit_volume: float):
|
||
|
||
source_wells = self.group_info.get(source_group_name, [])
|
||
target_wells = self.group_info.get(target_group_name, [])
|
||
|
||
rack_info = dict()
|
||
for child in self.deck.children:
|
||
if issubclass(child.__class__, TipRack):
|
||
rack: TipRack = cast(TipRack, child)
|
||
if "plate" not in rack.name.lower():
|
||
for tip in rack.get_all_tips():
|
||
if unit_volume > tip.maximal_volume:
|
||
break
|
||
else:
|
||
rack_info[rack.name] = (rack, tip.maximal_volume - unit_volume)
|
||
|
||
if len(rack_info) == 0:
|
||
raise ValueError(f"No tip rack can support volume {unit_volume}.")
|
||
|
||
rack_info = sorted(rack_info.items(), key=lambda x: x[1][1])
|
||
for child in self.deck.children:
|
||
if child.name == rack_info[0][0]:
|
||
target_rack = child
|
||
target_rack = cast(TipRack, target_rack)
|
||
available_tips = {}
|
||
for idx, tipSpot in enumerate(target_rack.get_all_items()):
|
||
if tipSpot.has_tip():
|
||
available_tips[idx] = tipSpot
|
||
continue
|
||
# 一般移动液体有两种方式,一对多和多对多
|
||
print("channel_num", self.channel_num)
|
||
if self.channel_num == 8:
|
||
|
||
tip_prefix = list(available_tips.values())[0].name.split("_")[0]
|
||
colnum_list = [int(tip.name.split("_")[-1][1:]) for tip in available_tips.values()]
|
||
available_cols = [colnum for colnum, count in dict(Counter(colnum_list)).items() if count == 8]
|
||
available_cols.sort()
|
||
available_tips_dict = {tip.name: tip for tip in available_tips.values()}
|
||
tips_to_use = [available_tips_dict[f"{tip_prefix}_{chr(65 + i)}{available_cols[0]}"] for i in range(8)]
|
||
print("tips_to_use", tips_to_use)
|
||
await self.pick_up_tips(tips_to_use, use_channels=list(range(0, 8)))
|
||
print("source_wells", source_wells)
|
||
await self.aspirate(source_wells, [unit_volume] * 8, use_channels=list(range(0, 8)))
|
||
print("target_wells", target_wells)
|
||
await self.dispense(target_wells, [unit_volume] * 8, use_channels=list(range(0, 8)))
|
||
await self.discard_tips(use_channels=list(range(0, 8)))
|
||
|
||
elif self.channel_num == 1:
|
||
|
||
for num_well in range(len(target_wells)):
|
||
tip_to_use = available_tips[list(available_tips.keys())[num_well]]
|
||
print("tip_to_use", tip_to_use)
|
||
await self.pick_up_tips([tip_to_use], use_channels=[0])
|
||
print("source_wells", source_wells)
|
||
print("target_wells", target_wells)
|
||
if len(source_wells) == 1:
|
||
await self.aspirate([source_wells[0]], [unit_volume], use_channels=[0])
|
||
else:
|
||
await self.aspirate([source_wells[num_well]], [unit_volume], use_channels=[0])
|
||
await self.dispense([target_wells[num_well]], [unit_volume], use_channels=[0])
|
||
await self.discard_tips(use_channels=[0])
|
||
|
||
else:
|
||
raise ValueError(f"Unsupported channel number {self.channel_num}.")
|
||
|
||
async def create_protocol(
|
||
self,
|
||
protocol_name: str,
|
||
protocol_description: str,
|
||
protocol_version: str,
|
||
protocol_author: str,
|
||
protocol_date: str,
|
||
protocol_type: str,
|
||
none_keys: List[str] = [],
|
||
):
|
||
"""Create a new protocol with the given metadata."""
|
||
pass
|
||
|
||
async def remove_liquid(
|
||
self,
|
||
vols: List[float],
|
||
sources: Sequence[Container],
|
||
waste_liquid: Optional[Container] = None,
|
||
*,
|
||
use_channels: Optional[List[int]] = None,
|
||
flow_rates: Optional[List[Optional[float]]] = None,
|
||
offsets: Optional[List[Coordinate]] = None,
|
||
liquid_height: Optional[List[Optional[float]]] = None,
|
||
blow_out_air_volume: Optional[List[Optional[float]]] = None,
|
||
spread: Optional[Literal["wide", "tight", "custom"]] = "wide",
|
||
delays: Optional[List[int]] = None,
|
||
is_96_well: Optional[bool] = False,
|
||
top: Optional[List[float]] = None,
|
||
none_keys: List[str] = [],
|
||
):
|
||
"""A complete *remove* (aspirate → waste) operation."""
|
||
|
||
try:
|
||
if is_96_well:
|
||
pass # This mode is not verified.
|
||
else:
|
||
# 首先应该对任务分组,然后每次1个/8个进行操作处理
|
||
if len(use_channels) == 1 and self.backend.num_channels == 1:
|
||
|
||
for _ in range(len(sources)):
|
||
tip = []
|
||
for __ in range(len(use_channels)):
|
||
tip.append(self._get_next_tip())
|
||
await self.pick_up_tips(tip)
|
||
await self.aspirate(
|
||
resources=[sources[_]],
|
||
vols=[vols[_]],
|
||
use_channels=use_channels,
|
||
flow_rates=[flow_rates[0]] if flow_rates else None,
|
||
offsets=[offsets[0]] if offsets else None,
|
||
liquid_height=[liquid_height[0]] if liquid_height else None,
|
||
blow_out_air_volume=[blow_out_air_volume[0]] if blow_out_air_volume else None,
|
||
spread=spread,
|
||
)
|
||
if delays is not None:
|
||
await self.custom_delay(seconds=delays[0])
|
||
|
||
await self.dispense(
|
||
resources=[waste_liquid],
|
||
vols=[vols[_]],
|
||
use_channels=use_channels,
|
||
flow_rates=[flow_rates[1]] if flow_rates else None,
|
||
offsets=[offsets[1]] if offsets else None,
|
||
blow_out_air_volume=[blow_out_air_volume[1]] if blow_out_air_volume else None,
|
||
liquid_height=[liquid_height[1]] if liquid_height else None,
|
||
spread=spread,
|
||
)
|
||
await self.discard_tips()
|
||
|
||
elif len(use_channels) == 8 and self.backend.num_channels == 8:
|
||
|
||
# 对于8个的情况,需要判断此时任务是不是能被8通道移液站来成功处理
|
||
if len(sources) % 8 != 0:
|
||
raise ValueError(
|
||
f"Length of `sources` {len(sources)} must be a multiple of 8 for 8-channel mode."
|
||
)
|
||
|
||
# 8个8个来取任务序列
|
||
|
||
for i in range(0, len(sources), 8):
|
||
tip = []
|
||
for _ in range(len(use_channels)):
|
||
tip.append(self._get_next_tip())
|
||
await self.pick_up_tips(tip)
|
||
current_targets = waste_liquid[i : i + 8]
|
||
current_reagent_sources = sources[i : i + 8]
|
||
current_asp_vols = vols[i : i + 8]
|
||
current_dis_vols = vols[i : i + 8]
|
||
current_asp_flow_rates = flow_rates[i : i + 8] if flow_rates else [None] * 8
|
||
current_dis_flow_rates = (
|
||
flow_rates[-i * 8 - 8 : len(flow_rates) - i * 8] if flow_rates else [None] * 8
|
||
)
|
||
current_asp_offset = offsets[i : i + 8] if offsets else [None] * 8
|
||
current_dis_offset = offsets[-i * 8 - 8 : len(offsets) - i * 8] if offsets else [None] * 8
|
||
current_asp_liquid_height = liquid_height[i : i + 8] if liquid_height else [None] * 8
|
||
current_dis_liquid_height = (
|
||
liquid_height[-i * 8 - 8 : len(liquid_height) - i * 8] if liquid_height else [None] * 8
|
||
)
|
||
current_asp_blow_out_air_volume = (
|
||
blow_out_air_volume[i : i + 8] if blow_out_air_volume else [None] * 8
|
||
)
|
||
current_dis_blow_out_air_volume = (
|
||
blow_out_air_volume[-i * 8 - 8 : len(blow_out_air_volume) - i * 8]
|
||
if blow_out_air_volume
|
||
else [None] * 8
|
||
)
|
||
|
||
await self.aspirate(
|
||
resources=current_reagent_sources,
|
||
vols=current_asp_vols,
|
||
use_channels=use_channels,
|
||
flow_rates=current_asp_flow_rates,
|
||
offsets=current_asp_offset,
|
||
liquid_height=current_asp_liquid_height,
|
||
blow_out_air_volume=current_asp_blow_out_air_volume,
|
||
spread=spread,
|
||
)
|
||
if delays is not None:
|
||
await self.custom_delay(seconds=delays[0])
|
||
await self.dispense(
|
||
resources=current_targets,
|
||
vols=current_dis_vols,
|
||
use_channels=use_channels,
|
||
flow_rates=current_dis_flow_rates,
|
||
offsets=current_dis_offset,
|
||
liquid_height=current_dis_liquid_height,
|
||
blow_out_air_volume=current_dis_blow_out_air_volume,
|
||
spread=spread,
|
||
)
|
||
if delays is not None and len(delays) > 1:
|
||
await self.custom_delay(seconds=delays[1])
|
||
await self.touch_tip(current_targets)
|
||
await self.discard_tips()
|
||
|
||
except Exception as e:
|
||
traceback.print_exc()
|
||
raise RuntimeError(f"Liquid addition failed: {e}") from e
|
||
|
||
# ---------------------------------------------------------------
|
||
# ADD LIQUID -----------------------------------------------------
|
||
# ---------------------------------------------------------------
|
||
|
||
async def add_liquid(
|
||
self,
|
||
asp_vols: Union[List[float], float],
|
||
dis_vols: Union[List[float], float],
|
||
reagent_sources: Sequence[Container],
|
||
targets: Sequence[Container],
|
||
*,
|
||
use_channels: Optional[List[int]] = None,
|
||
flow_rates: Optional[List[Optional[float]]] = None,
|
||
offsets: Optional[List[Coordinate]] = None,
|
||
liquid_height: Optional[List[Optional[float]]] = None,
|
||
blow_out_air_volume: Optional[List[Optional[float]]] = None,
|
||
spread: Optional[Literal["wide", "tight", "custom"]] = "wide",
|
||
is_96_well: bool = False,
|
||
delays: Optional[List[int]] = None,
|
||
mix_time: Optional[int] = None,
|
||
mix_vol: Optional[int] = None,
|
||
mix_rate: Optional[int] = None,
|
||
mix_liquid_height: Optional[float] = None,
|
||
none_keys: List[str] = [],
|
||
):
|
||
# """A complete *add* (aspirate reagent → dispense into targets) operation."""
|
||
|
||
# # try:
|
||
if is_96_well:
|
||
pass # This mode is not verified.
|
||
else:
|
||
if len(asp_vols) != len(targets):
|
||
raise ValueError(f"Length of `asp_vols` {len(asp_vols)} must match `targets` {len(targets)}.")
|
||
# 首先应该对任务分组,然后每次1个/8个进行操作处理
|
||
if len(use_channels) == 1:
|
||
for _ in range(len(targets)):
|
||
tip = []
|
||
for x in range(len(use_channels)):
|
||
tip.append(self._get_next_tip())
|
||
await self.pick_up_tips(tip)
|
||
|
||
await self.aspirate(
|
||
resources=[reagent_sources[_]],
|
||
vols=[asp_vols[_]],
|
||
use_channels=use_channels,
|
||
flow_rates=[flow_rates[0]] if flow_rates else None,
|
||
offsets=[offsets[0]] if offsets else None,
|
||
liquid_height=[liquid_height[0]] if liquid_height else None,
|
||
blow_out_air_volume=[blow_out_air_volume[0]] if blow_out_air_volume else None,
|
||
spread=spread,
|
||
)
|
||
|
||
if delays is not None:
|
||
await self.custom_delay(seconds=delays[0])
|
||
await self.dispense(
|
||
resources=[targets[_]],
|
||
vols=[dis_vols[_]],
|
||
use_channels=use_channels,
|
||
flow_rates=[flow_rates[1]] if flow_rates else None,
|
||
offsets=[offsets[1]] if offsets else None,
|
||
blow_out_air_volume=[blow_out_air_volume[1]] if blow_out_air_volume else None,
|
||
liquid_height=[liquid_height[1]] if liquid_height else None,
|
||
spread=spread,
|
||
)
|
||
|
||
if delays is not None and len(delays) > 1:
|
||
await self.custom_delay(seconds=delays[1])
|
||
# 只有在 mix_time 有效时才调用 mix
|
||
if mix_time is not None and mix_time > 0:
|
||
await self.mix(
|
||
targets=[targets[_]],
|
||
mix_time=mix_time,
|
||
mix_vol=mix_vol,
|
||
offsets=offsets if offsets else None,
|
||
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
|
||
mix_rate=mix_rate if mix_rate else None,
|
||
)
|
||
if delays is not None and len(delays) > 1:
|
||
await self.custom_delay(seconds=delays[1])
|
||
await self.touch_tip(targets[_])
|
||
await self.discard_tips()
|
||
|
||
elif len(use_channels) == 8:
|
||
# 对于8个的情况,需要判断此时任务是不是能被8通道移液站来成功处理
|
||
if len(targets) % 8 != 0:
|
||
raise ValueError(f"Length of `targets` {len(targets)} must be a multiple of 8 for 8-channel mode.")
|
||
|
||
for i in range(0, len(targets), 8):
|
||
tip = []
|
||
for _ in range(len(use_channels)):
|
||
tip.append(self._get_next_tip())
|
||
await self.pick_up_tips(tip)
|
||
current_targets = targets[i : i + 8]
|
||
current_reagent_sources = reagent_sources[i : i + 8]
|
||
current_asp_vols = asp_vols[i : i + 8]
|
||
current_dis_vols = dis_vols[i : i + 8]
|
||
current_asp_flow_rates = flow_rates[i : i + 8] if flow_rates else [None] * 8
|
||
current_dis_flow_rates = (
|
||
flow_rates[-i * 8 - 8 : len(flow_rates) - i * 8] if flow_rates else [None] * 8
|
||
)
|
||
current_asp_offset = offsets[i : i + 8] if offsets else [None] * 8
|
||
current_dis_offset = offsets[-i * 8 - 8 : len(offsets) - i * 8] if offsets else [None] * 8
|
||
current_asp_liquid_height = liquid_height[i : i + 8] if liquid_height else [None] * 8
|
||
current_dis_liquid_height = (
|
||
liquid_height[-i * 8 - 8 : len(liquid_height) - i * 8] if liquid_height else [None] * 8
|
||
)
|
||
current_asp_blow_out_air_volume = (
|
||
blow_out_air_volume[i : i + 8] if blow_out_air_volume else [None] * 8
|
||
)
|
||
current_dis_blow_out_air_volume = (
|
||
blow_out_air_volume[-i * 8 - 8 : len(blow_out_air_volume) - i * 8]
|
||
if blow_out_air_volume
|
||
else [None] * 8
|
||
)
|
||
|
||
await self.aspirate(
|
||
resources=current_reagent_sources,
|
||
vols=current_asp_vols,
|
||
use_channels=use_channels,
|
||
flow_rates=current_asp_flow_rates,
|
||
offsets=current_asp_offset,
|
||
liquid_height=current_asp_liquid_height,
|
||
blow_out_air_volume=current_asp_blow_out_air_volume,
|
||
spread=spread,
|
||
)
|
||
if delays is not None:
|
||
await self.custom_delay(seconds=delays[0])
|
||
await self.dispense(
|
||
resources=current_targets,
|
||
vols=current_dis_vols,
|
||
use_channels=use_channels,
|
||
flow_rates=current_dis_flow_rates,
|
||
offsets=current_dis_offset,
|
||
liquid_height=current_dis_liquid_height,
|
||
blow_out_air_volume=current_dis_blow_out_air_volume,
|
||
spread=spread,
|
||
)
|
||
if delays is not None and len(delays) > 1:
|
||
await self.custom_delay(seconds=delays[1])
|
||
|
||
# 只有在 mix_time 有效时才调用 mix
|
||
if mix_time is not None and mix_time > 0:
|
||
await self.mix(
|
||
targets=current_targets,
|
||
mix_time=mix_time,
|
||
mix_vol=mix_vol,
|
||
offsets=offsets if offsets else None,
|
||
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
|
||
mix_rate=mix_rate if mix_rate else None,
|
||
)
|
||
if delays is not None and len(delays) > 1:
|
||
await self.custom_delay(seconds=delays[1])
|
||
await self.touch_tip(current_targets)
|
||
await self.discard_tips()
|
||
|
||
# except Exception as e:
|
||
# traceback.print_exc()
|
||
# raise RuntimeError(f"Liquid addition failed: {e}") from e
|
||
|
||
# ---------------------------------------------------------------
|
||
# TRANSFER LIQUID ------------------------------------------------
|
||
# ---------------------------------------------------------------
|
||
async def transfer_liquid(
|
||
self,
|
||
sources: Sequence[Union[Container, Dict[str, Any]]],
|
||
targets: Sequence[Union[Container, Dict[str, Any]]],
|
||
tip_racks: Sequence[Union[TipRack, Dict[str, Any]]],
|
||
*,
|
||
use_channels: Optional[List[int]] = None,
|
||
asp_vols: Union[List[float], float],
|
||
dis_vols: Union[List[float], float],
|
||
asp_flow_rates: Optional[List[Optional[float]]] = None,
|
||
dis_flow_rates: Optional[List[Optional[float]]] = None,
|
||
offsets: Optional[List[Coordinate]] = None,
|
||
touch_tip: bool = False,
|
||
liquid_height: Optional[List[Optional[float]]] = None,
|
||
blow_out_air_volume: Optional[List[Optional[float]]] = None,
|
||
blow_out_air_volume_before: Optional[List[Optional[float]]] = None,
|
||
spread: Literal["wide", "tight", "custom"] = "wide",
|
||
is_96_well: bool = False,
|
||
mix_stage: Optional[Literal["none", "before", "after", "both"]] = "none",
|
||
mix_times: Optional[int] = None,
|
||
mix_vol: Optional[int] = None,
|
||
mix_rate: Optional[int] = None,
|
||
mix_liquid_height: Optional[float] = None,
|
||
delays: Optional[List[int]] = None,
|
||
none_keys: List[str] = [],
|
||
) -> TransferLiquidReturn:
|
||
"""Transfer liquid with automatic mode detection.
|
||
|
||
Supports three transfer modes:
|
||
1. One-to-many (1 source -> N targets): Distribute from one source to multiple targets
|
||
2. One-to-one (N sources -> N targets): Standard transfer, each source to corresponding target
|
||
3. Many-to-one (N sources -> 1 target): Combine multiple sources into one target
|
||
|
||
Parameters
|
||
----------
|
||
asp_vols, dis_vols
|
||
Single volume (µL) or list. Automatically expanded based on transfer mode.
|
||
sources, targets
|
||
Containers (wells or plates),可为 PLR 实例或 dict(含 uuid 字段,将自动解析)。
|
||
Length determines transfer mode:
|
||
- len(sources) == 1, len(targets) > 1: One-to-many mode
|
||
- len(sources) == len(targets): One-to-one mode
|
||
- len(sources) > 1, len(targets) == 1: Many-to-one mode
|
||
tip_racks
|
||
One or more TipRacks(可为 PLR 实例或含 uuid 的 dict)providing fresh tips.
|
||
is_96_well
|
||
Set *True* to use the 96‑channel head.
|
||
mix_stage
|
||
When to mix the target wells relative to dispensing. Default "none" means
|
||
no mixing occurs even if mix_times is provided. Use "before", "after", or
|
||
"both" to mix at the corresponding stage(s).
|
||
mix_times
|
||
Number of mix cycles. If *None* (default) no mixing occurs regardless of
|
||
mix_stage.
|
||
"""
|
||
# 若传入 dict(含 uuid),解析为 PLR Container/TipRack
|
||
sources = await self._resolve_to_plr_resources(sources)
|
||
targets = await self._resolve_to_plr_resources(targets)
|
||
tip_racks = list(await self._resolve_to_plr_resources(tip_racks))
|
||
num_sources = len(sources)
|
||
num_targets = len(targets)
|
||
len_asp_vols = len(asp_vols)
|
||
len_dis_vols = len(dis_vols)
|
||
# 确保 use_channels 有默认值
|
||
if use_channels is None or len(use_channels) == 0:
|
||
# 默认使用设备所有通道(例如 8 通道移液站默认就是 0-7)
|
||
use_channels = list(range(self.channel_num)) if self.channel_num == 8 else [0]
|
||
elif len(use_channels) == 8:
|
||
if self.channel_num != 8:
|
||
raise ValueError(f"if channel_num is 8, use_channels length must be 8, but got {len(use_channels)}")
|
||
if num_sources%8 != 0 or num_targets%8 != 0 or len_asp_vols%8 != 0 or len_dis_vols%8 != 0:
|
||
raise ValueError(f"if channel_num is 8, sources, targets, asp_vols, and dis_vols length must be divisible by 8, but got {num_sources}, {num_targets}, {len_asp_vols}, and {len_dis_vols}")
|
||
|
||
if is_96_well:
|
||
pass # This mode is not verified.
|
||
else:
|
||
# 转换体积参数为列表
|
||
if isinstance(asp_vols, (int, float)):
|
||
asp_vols = [float(asp_vols)]
|
||
else:
|
||
asp_vols = [float(v) for v in asp_vols]
|
||
|
||
if isinstance(dis_vols, (int, float)):
|
||
dis_vols = [float(dis_vols)]
|
||
else:
|
||
dis_vols = [float(v) for v in dis_vols]
|
||
|
||
# 统一混合次数为标量,防止数组/列表与 int 比较时报错
|
||
if mix_times is not None and not isinstance(mix_times, (int, float)):
|
||
try:
|
||
mix_times = mix_times[0] if len(mix_times) > 0 else None
|
||
except Exception:
|
||
try:
|
||
mix_times = next(iter(mix_times))
|
||
except Exception:
|
||
pass
|
||
if mix_times is not None:
|
||
mix_times = int(mix_times)
|
||
|
||
# 设置tip racks
|
||
self.set_tiprack(tip_racks)
|
||
|
||
# 识别传输模式(mix_times 为 None 也应该能正常移液,只是不做 mix)
|
||
num_sources = len(sources)
|
||
num_targets = len(targets)
|
||
len_asp_vols = len(asp_vols)
|
||
len_dis_vols = len(dis_vols)
|
||
|
||
# if num_targets != 1 and num_sources != 1:
|
||
# if len_asp_vols != num_sources and len_asp_vols != num_targets:
|
||
# raise ValueError(f"asp_vols length must be equal to sources or targets length, but got {len_asp_vols} and {num_sources} and {num_targets}")
|
||
# if len_dis_vols != num_sources and len_dis_vols != num_targets:
|
||
# raise ValueError(f"dis_vols length must be equal to sources or targets length, but got {len_dis_vols} and {num_sources} and {num_targets}")
|
||
|
||
if len(use_channels) != 8:
|
||
max_len = max(num_sources, num_targets)
|
||
prev_dropped = True # 循环开始前通道上无 tip
|
||
for i in range(max_len):
|
||
|
||
# 辅助函数:
|
||
# - wrap=True: 返回 [value](用于 liquid_height 等列表参数)
|
||
# - wrap=False: 返回 value(用于 mix_* 标量参数)
|
||
def safe_get(value, idx, default=None, wrap: bool = True):
|
||
if value is None:
|
||
return default
|
||
try:
|
||
if isinstance(value, (list, tuple)):
|
||
if len(value) == 0:
|
||
return default
|
||
item = value[idx % len(value)]
|
||
else:
|
||
item = value
|
||
return [item] if wrap else item
|
||
except Exception:
|
||
return default
|
||
|
||
# 动态构建参数字典,只传递实际提供的参数
|
||
kwargs = {
|
||
'sources': [sources[i%num_sources]],
|
||
'targets': [targets[i%num_targets]],
|
||
'tip_racks': tip_racks,
|
||
'use_channels': use_channels,
|
||
'asp_vols': [asp_vols[i%len_asp_vols]],
|
||
'dis_vols': [dis_vols[i%len_dis_vols]],
|
||
}
|
||
|
||
# 条件性添加可选参数
|
||
if asp_flow_rates is not None:
|
||
kwargs['asp_flow_rates'] = [asp_flow_rates[i%len_asp_vols]]
|
||
if dis_flow_rates is not None:
|
||
kwargs['dis_flow_rates'] = [dis_flow_rates[i%len_dis_vols]]
|
||
if offsets is not None:
|
||
kwargs['offsets'] = safe_get(offsets, i)
|
||
if touch_tip is not None:
|
||
kwargs['touch_tip'] = touch_tip if touch_tip else False
|
||
if liquid_height is not None:
|
||
kwargs['liquid_height'] = safe_get(liquid_height, i)
|
||
if blow_out_air_volume is not None:
|
||
kwargs['blow_out_air_volume'] = safe_get(blow_out_air_volume, i)
|
||
if blow_out_air_volume_before is not None:
|
||
kwargs['blow_out_air_volume_before'] = safe_get(blow_out_air_volume_before, i)
|
||
if spread is not None:
|
||
kwargs['spread'] = spread
|
||
if mix_stage is not None:
|
||
kwargs['mix_stage'] = safe_get(mix_stage, i, wrap=False)
|
||
if mix_times is not None:
|
||
kwargs['mix_times'] = safe_get(mix_times, i, wrap=False)
|
||
if mix_vol is not None:
|
||
kwargs['mix_vol'] = safe_get(mix_vol, i, wrap=False)
|
||
if mix_rate is not None:
|
||
kwargs['mix_rate'] = safe_get(mix_rate, i, wrap=False)
|
||
if mix_liquid_height is not None:
|
||
kwargs['mix_liquid_height'] = safe_get(mix_liquid_height, i, wrap=False)
|
||
if delays is not None:
|
||
kwargs['delays'] = safe_get(delays, i)
|
||
|
||
cur_source = sources[i % num_sources]
|
||
cur_target = targets[i % num_targets]
|
||
|
||
# drop: 仅当下一轮的 source 和 target 都相同时才保留 tip(下一轮可以复用)
|
||
drop_tip = True
|
||
if i < max_len - 1:
|
||
next_source = sources[(i + 1) % num_sources]
|
||
next_target = targets[(i + 1) % num_targets]
|
||
if cur_target is next_target and cur_source is next_source:
|
||
drop_tip = False
|
||
|
||
# pick_up: 仅当上一轮保留了 tip(未 drop)且 source 相同时才复用
|
||
pick_up_tip = True
|
||
if i > 0 and not prev_dropped:
|
||
prev_source = sources[(i - 1) % num_sources]
|
||
if cur_source is prev_source:
|
||
pick_up_tip = False
|
||
|
||
prev_dropped = drop_tip
|
||
|
||
kwargs['pick_up'] = pick_up_tip
|
||
kwargs['drop'] = drop_tip
|
||
|
||
await self._transfer_base_method(**kwargs)
|
||
|
||
|
||
return TransferLiquidReturn(
|
||
sources=ResourceTreeSet.from_plr_resources(list(sources), known_newly_created=False).dump(), # type: ignore
|
||
targets=ResourceTreeSet.from_plr_resources(list(targets), known_newly_created=False).dump(), # type: ignore
|
||
)
|
||
async def _transfer_base_method(
|
||
self,
|
||
sources: Sequence[Container],
|
||
targets: Sequence[Container],
|
||
tip_racks: Sequence[TipRack],
|
||
use_channels: List[int],
|
||
asp_vols: List[float],
|
||
dis_vols: List[float],
|
||
pick_up: bool = True,
|
||
drop: bool = True,
|
||
**kwargs
|
||
):
|
||
|
||
# 从kwargs中提取参数,提供默认值
|
||
asp_flow_rates = kwargs.get('asp_flow_rates')
|
||
dis_flow_rates = kwargs.get('dis_flow_rates')
|
||
offsets = kwargs.get('offsets')
|
||
touch_tip = kwargs.get('touch_tip', False)
|
||
liquid_height = kwargs.get('liquid_height')
|
||
blow_out_air_volume = kwargs.get('blow_out_air_volume')
|
||
blow_out_air_volume_before = kwargs.get('blow_out_air_volume_before')
|
||
spread = kwargs.get('spread', 'wide')
|
||
mix_stage = kwargs.get('mix_stage')
|
||
mix_times = kwargs.get('mix_times')
|
||
mix_vol = kwargs.get('mix_vol')
|
||
mix_rate = kwargs.get('mix_rate')
|
||
mix_liquid_height = kwargs.get('mix_liquid_height')
|
||
delays = kwargs.get('delays')
|
||
|
||
tip = []
|
||
if pick_up:
|
||
tip.append(self._get_next_tip())
|
||
await self.pick_up_tips(tip)
|
||
blow_out_air_volume_before_vol = 0.0
|
||
if blow_out_air_volume_before is not None and len(blow_out_air_volume_before) > 0:
|
||
blow_out_air_volume_before_vol = float(blow_out_air_volume_before[0] or 0.0)
|
||
blow_out_air_volume_vol = 0.0
|
||
if blow_out_air_volume is not None and len(blow_out_air_volume) > 0:
|
||
blow_out_air_volume_vol = float(blow_out_air_volume[0] or 0.0)
|
||
# PLR 的 blow_out_air_volume 是空气参数,不计入液体体积。
|
||
# before 空气通过单独预吸实现,after 空气通过 blow_out_air_volume 参数实现。
|
||
|
||
if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0:
|
||
await self.mix(
|
||
targets=[targets[0]],
|
||
mix_time=mix_times,
|
||
mix_vol=mix_vol,
|
||
offsets=offsets if offsets else None,
|
||
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
|
||
mix_rate=mix_rate if mix_rate else None,
|
||
use_channels=use_channels,
|
||
)
|
||
|
||
if blow_out_air_volume_before_vol > 0:
|
||
source_tracker = getattr(sources[0], "tracker", None)
|
||
source_tracker_was_disabled = bool(getattr(source_tracker, "is_disabled", False))
|
||
try:
|
||
if source_tracker is not None and hasattr(source_tracker, "disable"):
|
||
source_tracker.disable()
|
||
await self.aspirate(
|
||
resources=[sources[0]],
|
||
vols=[blow_out_air_volume_before_vol],
|
||
use_channels=use_channels,
|
||
flow_rates=None,
|
||
offsets=[Coordinate(x=0, y=0, z=sources[0].get_size_z())],
|
||
liquid_height=None,
|
||
blow_out_air_volume=None,
|
||
spread="custom",
|
||
)
|
||
finally:
|
||
if source_tracker is not None:
|
||
source_tracker.enable()
|
||
|
||
await self.aspirate(
|
||
resources=[sources[0]],
|
||
vols=[asp_vols[0]],
|
||
use_channels=use_channels,
|
||
flow_rates=[asp_flow_rates[0]] if asp_flow_rates and len(asp_flow_rates) > 0 else None,
|
||
offsets=[offsets[0]] if offsets and len(offsets) > 0 else None,
|
||
liquid_height=[liquid_height[0]] if liquid_height and len(liquid_height) > 0 else None,
|
||
blow_out_air_volume=(
|
||
[blow_out_air_volume_vol] if blow_out_air_volume_vol > 0 else None
|
||
),
|
||
spread=spread,
|
||
)
|
||
if delays is not None:
|
||
await self.custom_delay(seconds=delays[0])
|
||
await self.dispense(
|
||
resources=[targets[0]],
|
||
vols=[dis_vols[0]],
|
||
use_channels=use_channels,
|
||
flow_rates=[dis_flow_rates[0]] if dis_flow_rates and len(dis_flow_rates) > 0 else None,
|
||
offsets=[offsets[0]] if offsets and len(offsets) > 0 else None,
|
||
blow_out_air_volume=(
|
||
[blow_out_air_volume_vol] if blow_out_air_volume_vol > 0 else None
|
||
),
|
||
liquid_height=[liquid_height[0]] if liquid_height and len(liquid_height) > 0 else None,
|
||
spread=spread,
|
||
)
|
||
if delays is not None and len(delays) > 1:
|
||
await self.custom_delay(seconds=delays[1])
|
||
if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0:
|
||
await self.mix(
|
||
targets=[targets[0]],
|
||
mix_time=mix_times,
|
||
mix_vol=mix_vol,
|
||
offsets=offsets if offsets else None,
|
||
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
|
||
mix_rate=mix_rate if mix_rate else None,
|
||
use_channels=use_channels,
|
||
)
|
||
if delays is not None and len(delays) > 1:
|
||
await self.custom_delay(seconds=delays[0])
|
||
await self.touch_tip(targets[0])
|
||
if drop:
|
||
await self.discard_tips(use_channels=use_channels)
|
||
|
||
# except Exception as e:
|
||
# traceback.print_exc()
|
||
# raise RuntimeError(f"Liquid addition failed: {e}") from e
|
||
|
||
# ---------------------------------------------------------------
|
||
# Helper utilities
|
||
# ---------------------------------------------------------------
|
||
|
||
async def custom_delay(self, seconds=0, msg=None):
|
||
"""
|
||
seconds: seconds to wait
|
||
msg: information to be printed
|
||
"""
|
||
if seconds != None and seconds > 0:
|
||
if msg:
|
||
print(f"Waiting time: {msg}")
|
||
print(f"Current time: {time.strftime('%H:%M:%S')}")
|
||
print(f"Time to finish: {time.strftime('%H:%M:%S', time.localtime(time.time() + seconds))}")
|
||
# Use ROS node sleep if available, otherwise use asyncio.sleep
|
||
if hasattr(self, '_ros_node') and self._ros_node is not None:
|
||
await self._ros_node.sleep(seconds)
|
||
else:
|
||
import asyncio
|
||
await asyncio.sleep(seconds)
|
||
if msg:
|
||
print(f"Done: {msg}")
|
||
print(f"Current time: {time.strftime('%H:%M:%S')}")
|
||
|
||
async def touch_tip(self, targets: Sequence[Container]):
|
||
"""Touch the tip to the side of the well."""
|
||
|
||
if not self.support_touch_tip:
|
||
return
|
||
await self.aspirate(
|
||
resources=[targets],
|
||
vols=[0],
|
||
use_channels=None,
|
||
flow_rates=None,
|
||
offsets=[Coordinate(x=-targets.get_size_x() / 2, y=0, z=0)],
|
||
liquid_height=None,
|
||
blow_out_air_volume=None,
|
||
)
|
||
# await self.custom_delay(seconds=1) # In the simulation, we do not need to wait
|
||
await self.aspirate(
|
||
resources=[targets],
|
||
vols=[0],
|
||
use_channels=None,
|
||
flow_rates=None,
|
||
offsets=[Coordinate(x=targets.get_size_x() / 2, y=0, z=0)],
|
||
liquid_height=None,
|
||
blow_out_air_volume=None,
|
||
)
|
||
|
||
async def mix(
|
||
self,
|
||
targets: Sequence[Container],
|
||
mix_time: int = None,
|
||
mix_vol: Optional[int] = None,
|
||
height_to_bottom: Optional[float] = None,
|
||
offsets: Optional[Coordinate] = None,
|
||
mix_rate: Optional[float] = None,
|
||
use_channels: Optional[List[int]] = None,
|
||
none_keys: List[str] = [],
|
||
):
|
||
if mix_time is None or mix_time <= 0: # No mixing required
|
||
return
|
||
"""Mix the liquid in the target wells."""
|
||
if mix_vol is None:
|
||
raise ValueError("`mix_vol` must be provided when `mix_time` is set.")
|
||
|
||
targets_list: List[Container] = list(targets)
|
||
if len(targets_list) == 0:
|
||
return
|
||
|
||
def _expand(value, count: int):
|
||
if value is None:
|
||
return [None] * count
|
||
if isinstance(value, (list, tuple)):
|
||
if len(value) != count:
|
||
raise ValueError("Length of per-target parameters must match targets.")
|
||
return list(value)
|
||
return [value] * count
|
||
|
||
offsets_list = _expand(offsets, len(targets_list))
|
||
heights_list = _expand(height_to_bottom, len(targets_list))
|
||
rates_list = _expand(mix_rate, len(targets_list))
|
||
|
||
for _ in range(mix_time):
|
||
for idx, target in enumerate(targets_list):
|
||
offset_arg = (
|
||
[offsets_list[idx]] if offsets_list[idx] is not None else None
|
||
)
|
||
height_arg = (
|
||
[heights_list[idx]] if heights_list[idx] is not None else None
|
||
)
|
||
rate_arg = [rates_list[idx]] if rates_list[idx] is not None else None
|
||
|
||
await self.aspirate(
|
||
resources=[target],
|
||
vols=[mix_vol],
|
||
use_channels=use_channels,
|
||
flow_rates=rate_arg,
|
||
offsets=offset_arg,
|
||
liquid_height=height_arg,
|
||
)
|
||
await self.custom_delay(seconds=1)
|
||
await self.dispense(
|
||
resources=[target],
|
||
vols=[mix_vol],
|
||
use_channels=use_channels,
|
||
flow_rates=rate_arg,
|
||
offsets=offset_arg,
|
||
liquid_height=height_arg,
|
||
)
|
||
|
||
def iter_tips(self, tip_racks: Sequence[TipRack]) -> Iterator[Resource]:
|
||
"""Yield tips from a list of TipRacks one-by-one until depleted."""
|
||
for rack in tip_racks:
|
||
if isinstance(rack, TipSpot):
|
||
yield rack
|
||
elif isinstance(rack, TipRack):
|
||
for item in rack:
|
||
if isinstance(item, list):
|
||
yield from item
|
||
else:
|
||
yield item
|
||
|
||
def _get_next_tip(self):
|
||
"""从 current_tip 迭代器获取下一个 tip,耗尽时抛出明确错误而非 StopIteration"""
|
||
try:
|
||
return next(self.current_tip)
|
||
except StopIteration as e:
|
||
diag_parts = []
|
||
tip_racks = getattr(self, 'tip_racks', None)
|
||
if tip_racks is not None:
|
||
for idx, rack in enumerate(tip_racks):
|
||
r_name = getattr(rack, 'name', '?')
|
||
r_type = type(rack).__name__
|
||
is_tr = isinstance(rack, TipRack)
|
||
is_ts = isinstance(rack, TipSpot)
|
||
n_children = len(getattr(rack, 'children', []))
|
||
diag_parts.append(
|
||
f"rack[{idx}] name={r_name}, type={r_type}, "
|
||
f"is_TipRack={is_tr}, is_TipSpot={is_ts}, children={n_children}"
|
||
)
|
||
else:
|
||
diag_parts.append("tip_racks=None")
|
||
by_type = getattr(self, '_tip_racks_by_type', {})
|
||
diag_parts.append(f"_tip_racks_by_type keys={list(by_type.keys())}")
|
||
raise RuntimeError(
|
||
f"Tip rack exhausted: no more tips available for transfer. "
|
||
f"Diagnostics: {'; '.join(diag_parts)}"
|
||
) from e
|
||
|
||
def set_tiprack(self, tip_racks: Sequence[TipRack]):
|
||
"""Set the tip racks for the liquid handler.
|
||
|
||
Groups tip racks by type name (``type(rack).__name__``).
|
||
- Only actual TipRack / TipSpot instances are registered.
|
||
- If a rack has already been registered (by ``name``), it is skipped.
|
||
- If a rack is new and its type already exists, it is appended to that type's list.
|
||
- If the type is new, a new key-value pair is created.
|
||
|
||
If the current ``tip_racks`` contain no valid TipRack/TipSpot (e.g. a
|
||
Plate was passed by mistake), the iterator falls back to all previously
|
||
registered racks.
|
||
"""
|
||
if not hasattr(self, '_tip_racks_by_type'):
|
||
self._tip_racks_by_type: Dict[str, List[TipRack]] = {}
|
||
self._seen_rack_names: Set[str] = set()
|
||
|
||
for rack in tip_racks:
|
||
if not isinstance(rack, (TipRack, TipSpot)):
|
||
continue
|
||
rack_name = rack.name if hasattr(rack, 'name') else str(id(rack))
|
||
if rack_name in self._seen_rack_names:
|
||
continue
|
||
self._seen_rack_names.add(rack_name)
|
||
type_key = type(rack).__name__
|
||
if type_key not in self._tip_racks_by_type:
|
||
self._tip_racks_by_type[type_key] = []
|
||
self._tip_racks_by_type[type_key].append(rack)
|
||
|
||
valid_racks = [r for r in tip_racks if isinstance(r, (TipRack, TipSpot))]
|
||
if not valid_racks:
|
||
valid_racks = [r for racks in self._tip_racks_by_type.values() for r in racks]
|
||
|
||
self.tip_racks = tip_racks
|
||
self.current_tip = self.iter_tips(valid_racks)
|
||
|
||
async def move_to(self, well: Well, dis_to_top: float = 0, channel: int = 0):
|
||
"""
|
||
Move a single channel to a specific well with a given z-height.
|
||
|
||
Parameters
|
||
----------
|
||
well : Well
|
||
The target well.
|
||
dis_to_top : float
|
||
Height in mm to move to relative to the well top.
|
||
channel : int
|
||
Pipetting channel to move (default: 0).
|
||
"""
|
||
await self.prepare_for_manual_channel_operation(channel=channel)
|
||
abs_loc = well.get_absolute_location()
|
||
well_height = well.get_absolute_size_z()
|
||
await self.move_channel_x(channel, abs_loc.x)
|
||
await self.move_channel_y(channel, abs_loc.y)
|
||
await self.move_channel_z(channel, abs_loc.z + well_height + dis_to_top)
|