完成mix,liquid_hight,touch_tip,delay等参数的传递

This commit is contained in:
q434343
2026-03-12 13:58:06 +08:00
parent 5f45a0b81b
commit d85ff540c4
4 changed files with 306 additions and 55 deletions

View File

@@ -9,6 +9,7 @@ from typing import List, Sequence, Optional, Literal, Union, Iterator, Dict, Any
from pylabrobot.liquid_handling import LiquidHandler, LiquidHandlerBackend, LiquidHandlerChatterboxBackend, Strictness
from pylabrobot.liquid_handling.liquid_handler import TipPresenceProbingMethod
from pylabrobot.liquid_handling.standard import GripDirection
from pylabrobot.resources.errors import TooLittleLiquidError, TooLittleVolumeError
from pylabrobot.resources import (
Resource,
TipRack,
@@ -211,10 +212,37 @@ class LiquidHandlerMiddleware(LiquidHandler):
):
if spread == "":
spread = "wide"
def _safe_aspirate_volumes(_resources: Sequence[Container], _vols: List[float]) -> List[float]:
"""将 aspirate 体积裁剪到源容器当前液量范围内,避免 volume tracker 报错。"""
safe: List[float] = []
for res, vol in zip(_resources, _vols):
req = max(float(vol), 0.0)
used_volume = None
try:
tracker = getattr(res, "tracker", None)
if bool(getattr(tracker, "is_disabled", False)):
# tracker 关闭时(例如预吸空气),不按液体体积裁剪
safe.append(req)
continue
get_used = getattr(tracker, "get_used_volume", None)
if callable(get_used):
used_volume = get_used()
except Exception:
used_volume = None
if isinstance(used_volume, (int, float)):
req = min(req, max(float(used_volume), 0.0))
safe.append(req)
return safe
actual_vols = _safe_aspirate_volumes(resources, vols)
if actual_vols != vols and hasattr(self, "_ros_node") and self._ros_node is not None:
self._ros_node.lab_logger().warning(f"[aspirate] volume adjusted, requested_vols={vols}, actual_vols={actual_vols}")
if self._simulator:
return await self._simulate_handler.aspirate(
resources,
vols,
actual_vols,
use_channels,
flow_rates,
offsets,
@@ -226,7 +254,7 @@ class LiquidHandlerMiddleware(LiquidHandler):
try:
await super().aspirate(
resources,
vols,
actual_vols,
use_channels,
flow_rates,
offsets,
@@ -239,7 +267,7 @@ class LiquidHandlerMiddleware(LiquidHandler):
if "Resource is too small to space channels" in str(e) and spread != "custom":
await super().aspirate(
resources,
vols,
actual_vols,
use_channels,
flow_rates,
offsets,
@@ -250,6 +278,24 @@ class LiquidHandlerMiddleware(LiquidHandler):
)
else:
raise
except TooLittleLiquidError:
# 再兜底一次:按实时可用液量重算后重试,避免状态更新竞争导致的瞬时不足
retry_vols = _safe_aspirate_volumes(resources, actual_vols)
if any(v > 0 for v in retry_vols):
await super().aspirate(
resources,
retry_vols,
use_channels,
flow_rates,
offsets,
liquid_height,
blow_out_air_volume,
spread,
**backend_kwargs,
)
actual_vols = retry_vols
else:
actual_vols = retry_vols
res_samples = []
res_volumes = []
@@ -260,7 +306,7 @@ class LiquidHandlerMiddleware(LiquidHandler):
else:
channels_to_use = use_channels
for resource, volume, channel in zip(resources, vols, channels_to_use):
for resource, volume, channel in zip(resources, actual_vols, channels_to_use):
sample_uuid_value = getattr(resource, "unilabos_extra", {}).get(EXTRA_SAMPLE_UUID, None)
res_samples.append({"name": resource.name, "sample_uuid": sample_uuid_value})
res_volumes.append(volume)
@@ -284,10 +330,32 @@ class LiquidHandlerMiddleware(LiquidHandler):
) -> SimpleReturn:
if spread == "":
spread = "wide"
def _safe_dispense_volumes(_resources: Sequence[Container], _vols: List[float]) -> List[float]:
"""将 dispense 体积裁剪到目标容器可用体积范围内,避免 volume tracker 报错。"""
safe: List[float] = []
for res, vol in zip(_resources, _vols):
req = max(float(vol), 0.0)
free_volume = None
try:
tracker = getattr(res, "tracker", None)
get_free = getattr(tracker, "get_free_volume", None)
if callable(get_free):
free_volume = get_free()
except Exception:
free_volume = None
if isinstance(free_volume, (int, float)):
req = min(req, max(float(free_volume), 0.0))
safe.append(req)
return safe
actual_vols = _safe_dispense_volumes(resources, vols)
if self._simulator:
return await self._simulate_handler.dispense(
resources,
vols,
actual_vols,
use_channels,
flow_rates,
offsets,
@@ -299,7 +367,7 @@ class LiquidHandlerMiddleware(LiquidHandler):
try:
await super().dispense(
resources,
vols,
actual_vols,
use_channels,
flow_rates,
offsets,
@@ -312,7 +380,7 @@ class LiquidHandlerMiddleware(LiquidHandler):
if "Resource is too small to space channels" in str(e) and spread != "custom":
await super().dispense(
resources,
vols,
actual_vols,
use_channels,
flow_rates,
offsets,
@@ -323,9 +391,31 @@ class LiquidHandlerMiddleware(LiquidHandler):
)
else:
raise
except TooLittleVolumeError:
# 再兜底一次:按实时 free volume 重新裁剪后重试,避免并发状态更新导致的瞬时超量
retry_vols = _safe_dispense_volumes(resources, actual_vols)
if any(v > 0 for v in retry_vols):
await super().dispense(
resources,
retry_vols,
use_channels,
flow_rates,
offsets,
liquid_height,
blow_out_air_volume,
spread,
**backend_kwargs,
)
actual_vols = retry_vols
else:
actual_vols = retry_vols
res_samples = []
res_volumes = []
for resource, volume, channel in zip(resources, vols, use_channels):
if use_channels is None:
channels_to_use = [0] * len(resources)
else:
channels_to_use = use_channels
for resource, volume, channel in zip(resources, actual_vols, channels_to_use):
res_uuid = self.pending_liquids_dict[channel][EXTRA_SAMPLE_UUID]
self.pending_liquids_dict[channel]["volume"] -= volume
resource.unilabos_extra[EXTRA_SAMPLE_UUID] = res_uuid
@@ -728,17 +818,43 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
uuids = [x.get("uuid") or x.get("unilabos_uuid") for _, x in dict_items]
if any(u is None for u in uuids):
raise ValueError("dict 格式的资源必须包含 uuid 或 unilabos_uuid 字段")
resource_tree = await self._ros_node.get_resource(uuids)
plr_list = resource_tree.to_plr_resources(requested_uuids=uuids)
# 映射到设备本地的 resource_tracker 实例(与 deck 一致)
def _resolve_from_local_by_uuids() -> List[Union[Container, TipRack]]:
resolved_locals: List[Union[Container, TipRack]] = []
missing: List[str] = []
for uid in uuids:
matches = self._ros_node.resource_tracker.figure_resource({"uuid": uid}, try_mode=True)
if matches:
resolved_locals.append(cast(Union[Container, TipRack], matches[0]))
else:
missing.append(str(uid))
if missing:
raise ValueError(
f"远端资源树未返回且本地资源也未命中,缺失 UUID: {missing}"
)
return resolved_locals
# 优先走远端资源树查询;若远端为空或 requested_uuids 无法解析,则降级到本地 tracker 按 UUID 解析。
resolved = []
for plr in plr_list:
local = self._ros_node.resource_tracker.figure_resource(
{"name": plr.name}, try_mode=False
)
if hasattr(plr, "unilabos_extra") and hasattr(local, "unilabos_extra"):
local.unilabos_extra = getattr(plr, "unilabos_extra", {}).copy()
resolved.append(local)
try:
resource_tree = await self._ros_node.get_resource(uuids)
plr_list = resource_tree.to_plr_resources(requested_uuids=uuids)
for uid, plr in zip(uuids, plr_list):
local_matches = self._ros_node.resource_tracker.figure_resource({"uuid": uid}, try_mode=True)
if local_matches:
local = cast(Union[Container, TipRack], local_matches[0])
else:
local = cast(Union[Container, TipRack], plr)
if hasattr(plr, "unilabos_extra") and hasattr(local, "unilabos_extra"):
local.unilabos_extra = getattr(plr, "unilabos_extra", {}).copy()
resolved.append(local)
if len(resolved) != len(uuids):
raise ValueError(
f"远端资源解析数量不匹配: requested={len(uuids)}, resolved={len(resolved)}"
)
except Exception:
resolved = _resolve_from_local_by_uuids()
result = list(items)
for (idx, _), plr in zip(dict_items, resolved):
result[idx] = plr
@@ -757,9 +873,18 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
wells=ResourceTreeSet.from_plr_resources(wells, known_newly_created=False).dump(), volumes=res_volumes # type: ignore
)
def _clamp_volume(resource: Union[Well, Container], volume: float) -> float:
# 防止初始化液量超过容器容量,导致后续 dispense 时 free volume 为负
clamped = max(float(volume), 0.0)
max_volume = getattr(resource, "max_volume", None)
if isinstance(max_volume, (int, float)) and max_volume > 0:
clamped = min(clamped, float(max_volume))
return clamped
for well, liquid_name, volume in zip(wells, liquid_names, volumes):
well.set_liquids([(liquid_name, volume)]) # type: ignore
res_volumes.append(volume)
safe_volume = _clamp_volume(well, volume)
well.set_liquids([(liquid_name, safe_volume)]) # type: ignore
res_volumes.append(safe_volume)
return SetLiquidReturn(
wells=ResourceTreeSet.from_plr_resources(wells, known_newly_created=False).dump(), volumes=res_volumes # type: ignore
@@ -789,9 +914,18 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
volumes=res_volumes,
)
def _clamp_volume(resource: Union[Well, Container], volume: float) -> float:
# 防止初始化液量超过容器容量,导致后续 dispense 时 free volume 为负
clamped = max(float(volume), 0.0)
max_volume = getattr(resource, "max_volume", None)
if isinstance(max_volume, (int, float)) and max_volume > 0:
clamped = min(clamped, float(max_volume))
return clamped
for well, liquid_name, volume in zip(wells, liquid_names, volumes):
well.set_liquids([(liquid_name, volume)]) # type: ignore
res_volumes.append(volume)
safe_volume = _clamp_volume(well, volume)
well.set_liquids([(liquid_name, safe_volume)]) # type: ignore
res_volumes.append(safe_volume)
task = ROS2DeviceNode.run_async_func(self._ros_node.update_resource, True, **{"resources": wells})
submit_time = time.time()
@@ -1300,11 +1434,21 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
max_len = max(num_sources, num_targets)
for i in range(max_len):
# 辅助函数:安全地从列表中获取元素如果列表为空则返回None
def safe_get(lst, idx, default=None):
# 辅助函数:
# - wrap=True: 返回 [value](用于 liquid_height 等列表参数)
# - wrap=False: 返回 value用于 mix_* 标量参数)
def safe_get(value, idx, default=None, wrap: bool = True):
if value is None:
return default
try:
return [lst[idx]] if lst else default
except Exception as e:
if isinstance(value, (list, tuple)):
if len(value) == 0:
return default
item = value[idx % len(value)]
else:
item = value
return [item] if wrap else item
except Exception:
return default
# 动态构建参数字典,只传递实际提供的参数
@@ -1335,15 +1479,15 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
if spread is not None:
kwargs['spread'] = spread
if mix_stage is not None:
kwargs['mix_stage'] = safe_get(mix_stage, i)
kwargs['mix_stage'] = safe_get(mix_stage, i, wrap=False)
if mix_times is not None:
kwargs['mix_times'] = safe_get(mix_times, i)
kwargs['mix_times'] = safe_get(mix_times, i, wrap=False)
if mix_vol is not None:
kwargs['mix_vol'] = safe_get(mix_vol, i)
kwargs['mix_vol'] = safe_get(mix_vol, i, wrap=False)
if mix_rate is not None:
kwargs['mix_rate'] = safe_get(mix_rate, i)
kwargs['mix_rate'] = safe_get(mix_rate, i, wrap=False)
if mix_liquid_height is not None:
kwargs['mix_liquid_height'] = safe_get(mix_liquid_height, i)
kwargs['mix_liquid_height'] = safe_get(mix_liquid_height, i, wrap=False)
if delays is not None:
kwargs['delays'] = safe_get(delays, i)
@@ -1384,7 +1528,14 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
tip = []
tip.extend(self._get_next_tip())
await self.pick_up_tips(tip)
blow_out_air_volume_before_vol = 0.0
if blow_out_air_volume_before is not None and len(blow_out_air_volume_before) > 0:
blow_out_air_volume_before_vol = float(blow_out_air_volume_before[0] or 0.0)
blow_out_air_volume_vol = 0.0
if blow_out_air_volume is not None and len(blow_out_air_volume) > 0:
blow_out_air_volume_vol = float(blow_out_air_volume[0] or 0.0)
# PLR 的 blow_out_air_volume 是空气参数,不计入液体体积。
# before 空气通过单独预吸实现after 空气通过 blow_out_air_volume 参数实现。
if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0:
await self.mix(
@@ -1397,16 +1548,25 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
use_channels=use_channels,
)
if blow_out_air_volume_before is not None and len(blow_out_air_volume_before) > 0:
await self.aspirate(
resources=[sources[0]],
vols=[blow_out_air_volume_before[0]],
use_channels=use_channels,
flow_rates=None,
offsets=[Coordinate(x=0, y=0, z=sources[0].get_size_z())],
liquid_height=None,
blow_out_air_volume=None,
)
if blow_out_air_volume_before_vol > 0:
source_tracker = getattr(sources[0], "tracker", None)
source_tracker_was_disabled = bool(getattr(source_tracker, "is_disabled", False))
try:
if source_tracker is not None and hasattr(source_tracker, "disable"):
source_tracker.disable()
await self.aspirate(
resources=[sources[0]],
vols=[blow_out_air_volume_before_vol],
use_channels=use_channels,
flow_rates=None,
offsets=[Coordinate(x=0, y=0, z=sources[0].get_size_z())],
liquid_height=None,
blow_out_air_volume=None,
spread="custom",
)
finally:
if source_tracker is not None:
source_tracker.enable()
await self.aspirate(
resources=[sources[0]],
@@ -1416,7 +1576,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
offsets=[offsets[0]] if offsets and len(offsets) > 0 else None,
liquid_height=[liquid_height[0]] if liquid_height and len(liquid_height) > 0 else None,
blow_out_air_volume=(
[blow_out_air_volume[0]] if blow_out_air_volume and len(blow_out_air_volume) > 0 else None
[blow_out_air_volume_vol] if blow_out_air_volume_vol > 0 else None
),
spread=spread,
)
@@ -1429,7 +1589,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
flow_rates=[dis_flow_rates[0]] if dis_flow_rates and len(dis_flow_rates) > 0 else None,
offsets=[offsets[0]] if offsets and len(offsets) > 0 else None,
blow_out_air_volume=(
[blow_out_air_volume[0]] if blow_out_air_volume and len(blow_out_air_volume) > 0 else None
[blow_out_air_volume_vol] if blow_out_air_volume_vol > 0 else None
),
liquid_height=[liquid_height[0]] if liquid_height and len(liquid_height) > 0 else None,
spread=spread,

View File

@@ -854,9 +854,10 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
offsets: Optional[Coordinate] = None,
mix_rate: Optional[float] = None,
none_keys: List[str] = [],
use_channels: Optional[List[int]] = [0],
):
return await self._unilabos_backend.mix(
targets, mix_time, mix_vol, height_to_bottom, offsets, mix_rate, none_keys
targets, mix_time, mix_vol, height_to_bottom, offsets, mix_rate, none_keys, use_channels
)
def iter_tips(self, tip_racks: Sequence[TipRack]) -> Iterator[Resource]:
@@ -1285,9 +1286,15 @@ class PRCXI9300Backend(LiquidHandlerBackend):
offsets: Optional[Coordinate] = None,
mix_rate: Optional[float] = None,
none_keys: List[str] = [],
use_channels: Optional[List[int]] = [0],
):
"""Mix liquid in the specified resources."""
if use_channels == [0]:
axis = "Left"
elif use_channels == [1]:
axis = "Right"
else:
raise ValueError("Invalid use channels: " + str(use_channels))
plate_indexes = []
for op in targets:
deck = op.parent.parent.parent

View File

@@ -706,15 +706,38 @@ class ResourceTreeSet(object):
if requested_uuids:
# 按请求的 UUID 顺序返回对应资源(从整棵树中按 uuid 提取)
# 优先使用 tracker.uuid_to_resources若映射缺失再递归遍历 PLR 树兜底搜索。
def _find_plr_by_uuid(roots: List["PLRResource"], uid: str) -> Optional["PLRResource"]:
stack = list(roots)
while stack:
node = stack.pop()
node_uid = getattr(node, "unilabos_uuid", None)
if node_uid == uid:
return node
children = getattr(node, "children", None) or []
stack.extend(children)
return None
result = []
missing_uuids = []
for uid in requested_uuids:
if uid in tracker.uuid_to_resources:
result.append(tracker.uuid_to_resources[uid])
found = tracker.uuid_to_resources.get(uid)
if found is None:
found = _find_plr_by_uuid(plr_resources, uid)
if found is not None:
# 回填缓存,后续相同 uuid 可直接命中
tracker.uuid_to_resources[uid] = found
if found is None:
missing_uuids.append(uid)
else:
raise ValueError(
f"请求的 UUID {uid} 在资源树中未找到。"
f"可用 UUID 数量: {len(tracker.uuid_to_resources)}"
)
result.append(found)
if missing_uuids:
raise ValueError(
f"请求的 UUID 未在资源树中找到: {missing_uuids}"
f"可用 UUID 数量: {len(tracker.uuid_to_resources)}"
f"资源树数量: {len(self.trees)}"
)
return result
return plr_resources

View File

@@ -51,6 +51,7 @@
--------------------------------------------------------------------------------
- 遍历 workflow 数组,为每个动作创建步骤节点
- 参数重命名: asp_vol -> asp_vols, dis_vol -> dis_vols, asp_flow_rate -> asp_flow_rates, dis_flow_rate -> dis_flow_rates
- 参数输入转换: liquid_height按 wells 扩展mix_stage/mix_times/mix_vol/mix_rate/mix_liquid_height 保持标量
- 参数扩展: 根据 targets 的 wells 数量,将单值扩展为数组
例: asp_vol=100.0, targets 有 3 个 wells -> asp_vols=[100.0, 100.0, 100.0]
- 连接处理: 如果 sources/targets 已通过 set_liquid_from_plate 连接,参数值改为 []
@@ -543,8 +544,17 @@ def build_protocol_graph(
"compound": "compound",
}
# 需要根据 wells 数量扩展的参数列表(复数形式)
EXPAND_BY_WELLS_PARAMS = ["asp_vols", "dis_vols", "asp_flow_rates", "dis_flow_rates"]
# 需要根据 wells 数量扩展的参数列表
# - 复数参数asp_vols 等)支持单值自动扩展
# - liquid_height 按 wells 扩展为数组
# - mix_* 参数保持标量,避免被转换为 list
EXPAND_BY_WELLS_PARAMS = [
"asp_vols",
"dis_vols",
"asp_flow_rates",
"dis_flow_rates",
"liquid_height",
]
# 处理协议步骤
for step in protocol_steps:
@@ -558,6 +568,57 @@ def build_protocol_graph(
if old_name in params:
params[new_name] = params.pop(old_name)
# touch_tip 输入归一化:
# - 支持 bool / 0/1 / "true"/"false" / 单元素 list
# - 最终统一为 bool 标量,避免被下游误当作序列处理
if "touch_tip" in params:
touch_tip_value = params.get("touch_tip")
if isinstance(touch_tip_value, list):
if len(touch_tip_value) == 1:
touch_tip_value = touch_tip_value[0]
elif len(touch_tip_value) == 0:
touch_tip_value = False
else:
warnings.append(f"touch_tip 期望标量,但收到长度为 {len(touch_tip_value)} 的列表,使用首个值")
touch_tip_value = touch_tip_value[0]
if isinstance(touch_tip_value, str):
norm = touch_tip_value.strip().lower()
if norm in {"true", "1", "yes", "y", "on"}:
touch_tip_value = True
elif norm in {"false", "0", "no", "n", "off", ""}:
touch_tip_value = False
else:
warnings.append(f"touch_tip 字符串值无法识别: {touch_tip_value},按 True 处理")
touch_tip_value = True
elif isinstance(touch_tip_value, (int, float)):
touch_tip_value = bool(touch_tip_value)
elif touch_tip_value is None:
touch_tip_value = False
else:
touch_tip_value = bool(touch_tip_value)
params["touch_tip"] = touch_tip_value
# delays 输入归一化:
# - 支持标量int/float/字符串数字)与 list
# - 最终统一为数字列表,供下游按 delays[0]/delays[1] 使用
if "delays" in params:
delays_value = params.get("delays")
if delays_value is None or delays_value == "":
params["delays"] = []
else:
raw_list = delays_value if isinstance(delays_value, list) else [delays_value]
normalized_delays = []
for delay_item in raw_list:
if isinstance(delay_item, str):
delay_item = delay_item.strip()
if delay_item == "":
continue
try:
normalized_delays.append(float(delay_item))
except (TypeError, ValueError):
warnings.append(f"delays 包含无法转换为数字的值: {delay_item},已忽略")
params["delays"] = normalized_delays
# 处理输入连接
for param_key, target_port in INPUT_PORT_MAPPING.items():
resource_name = params.get(param_key)