修改真机运动方式,

This commit is contained in:
q434343
2026-03-31 14:33:50 +08:00
parent afddc6e40c
commit 6b3f9756a0
3 changed files with 285 additions and 99 deletions

View File

@@ -215,17 +215,38 @@ class LiquidHandlerMiddleware(LiquidHandler):
if spread == "": if spread == "":
spread = "custom" spread = "custom"
for res in resources: for i, res in enumerate(resources):
tracker = getattr(res, "tracker", None) tracker = getattr(res, "tracker", None)
if tracker is None or getattr(tracker, "is_disabled", False): if tracker is None or getattr(tracker, "is_disabled", False):
continue continue
history = getattr(tracker, "liquid_history", None) need = float(vols[i]) if i < len(vols) else 0.0
if tracker.get_used_volume() <= 0 and isinstance(history, list) and len(history) == 0: if blow_out_air_volume and i < len(blow_out_air_volume) and blow_out_air_volume[i] is not None:
fill_vol = tracker.max_volume if tracker.max_volume > 0 else 50000 need += float(blow_out_air_volume[i] or 0.0)
if need <= 0:
continue
try:
used = float(tracker.get_used_volume())
except Exception:
used = 0.0
if used >= need:
continue
mv = float(getattr(tracker, "max_volume", 0) or 0)
if used <= 0:
# 与旧逻辑一致:空孔优先加满(或极大默认),避免仅有 history 记录但 used=0 时不补液
fill_vol = mv if mv > 0 else max(need, 50000.0)
else:
fill_vol = need - used
if mv > 0:
fill_vol = min(fill_vol, max(0.0, mv - used))
try:
tracker.add_liquid(fill_vol)
except Exception:
try: try:
tracker.add_liquid(fill_vol) tracker.add_liquid(max(need - used, 1.0))
except Exception: except Exception:
tracker.liquid_history.append(("auto_init", fill_vol)) history = getattr(tracker, "liquid_history", None)
if isinstance(history, list):
history.append(("auto_init", max(fill_vol, need, 1.0)))
if self._simulator: if self._simulator:
try: try:
@@ -277,6 +298,37 @@ class LiquidHandlerMiddleware(LiquidHandler):
spread, spread,
**backend_kwargs, **backend_kwargs,
) )
except (TooLittleLiquidError, TooLittleVolumeError) as e:
tracker_info = []
for r in resources:
t = getattr(r, "tracker", None)
if t is None:
tracker_info.append(f"{r.name}(no_tracker)")
else:
try:
tracker_info.append(
f"{r.name}(used={t.get_used_volume():.1f}, "
f"free={t.get_free_volume():.1f}, max={getattr(r, 'max_volume', '?')})"
)
except Exception:
tracker_info.append(f"{r.name}(tracker_err)")
if hasattr(self, "_ros_node") and self._ros_node is not None:
self._ros_node.lab_logger().warning(
f"[aspirate] hardware tracker shortfall, retry without volume tracking. "
f"error={e}, vols={vols}, trackers={tracker_info}"
)
with no_volume_tracking():
await super().aspirate(
resources,
vols,
use_channels,
flow_rates,
offsets,
liquid_height,
blow_out_air_volume,
spread,
**backend_kwargs,
)
except ValueError as e: except ValueError as e:
if "Resource is too small to space channels" in str(e) and spread != "custom": if "Resource is too small to space channels" in str(e) and spread != "custom":
await super().aspirate( await super().aspirate(
@@ -1620,25 +1672,25 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
use_channels=use_channels, use_channels=use_channels,
) )
if blow_out_air_volume_before_vol > 0: # if blow_out_air_volume_before_vol > 0:
source_tracker = getattr(sources[0], "tracker", None) # source_tracker = getattr(sources[0], "tracker", None)
source_tracker_was_disabled = bool(getattr(source_tracker, "is_disabled", False)) # source_tracker_was_disabled = bool(getattr(source_tracker, "is_disabled", False))
try: # try:
if source_tracker is not None and hasattr(source_tracker, "disable"): # if source_tracker is not None and hasattr(source_tracker, "disable"):
source_tracker.disable() # source_tracker.disable()
await self.aspirate( # await self.aspirate(
resources=[sources[0]], # resources=[sources[0]],
vols=[blow_out_air_volume_before_vol], # vols=[blow_out_air_volume_before_vol],
use_channels=use_channels, # use_channels=use_channels,
flow_rates=None, # flow_rates=None,
offsets=[Coordinate(x=0, y=0, z=sources[0].get_size_z())], # offsets=[Coordinate(x=0, y=0, z=sources[0].get_size_z())],
liquid_height=None, # liquid_height=None,
blow_out_air_volume=None, # blow_out_air_volume=None,
spread="custom", # spread="custom",
) # )
finally: # finally:
if source_tracker is not None: # if source_tracker is not None:
source_tracker.enable() # source_tracker.enable()
await self.aspirate( await self.aspirate(
resources=[sources[0]], resources=[sources[0]],

View File

@@ -86,6 +86,23 @@ class MatrixInfo(TypedDict):
WorkTablets: list[WorkTablets] WorkTablets: list[WorkTablets]
def _get_slot_number(resource) -> Optional[int]:
"""从 resource 的 unilabos_extra["update_resource_site"](如 "T13")或位置反算槽位号。"""
extra = getattr(resource, "unilabos_extra", {}) or {}
site = extra.get("update_resource_site", "")
if site:
digits = "".join(c for c in str(site) if c.isdigit())
return int(digits) if digits else None
loc = getattr(resource, "location", None)
if loc is not None and loc.x is not None and loc.y is not None:
col = round((loc.x - 5) / 137.5)
row = round(3 - (loc.y - 13) / 96)
idx = row * 4 + col
if 0 <= idx < 16:
return idx + 1
return None
class PRCXI9300Deck(Deck): class PRCXI9300Deck(Deck):
"""PRCXI 9300 的专用 Deck 类,继承自 Deck。 """PRCXI 9300 的专用 Deck 类,继承自 Deck。
@@ -837,22 +854,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
@staticmethod @staticmethod
def _get_slot_number(resource) -> Optional[int]: def _get_slot_number(resource) -> Optional[int]:
"""从 resource 的 unilabos_extra["update_resource_site"](如 "T13")或位置反算槽位号。""" """从 resource 的 unilabos_extra["update_resource_site"](如 "T13")或位置反算槽位号。"""
extra = getattr(resource, "unilabos_extra", {}) or {} return _get_slot_number(resource)
site = extra.get("update_resource_site", "")
if site:
digits = "".join(c for c in str(site) if c.isdigit())
return int(digits) if digits else None
# 使用 resource.location.x 和 resource.location.y 反算槽位号
# 参考 _DEFAULT_SITE_POSITIONS: x = (i%4)*137.5+5, y = (int(i/4))*96+13
loc = getattr(resource, "location", None)
if loc is not None and loc.x is not None and loc.y is not None:
col = round((loc.x - 5) / 137.5) # 0-3
row = round(3-(loc.y - 13) / 96) # 0-3
idx = row * 4 + col # 0-15
if 0 <= idx < 16:
return idx + 1 # 槽位号从 1 开始
return None
def _match_and_create_matrix(self): def _match_and_create_matrix(self):
"""首次 transfer_liquid 时,根据 deck 上的 resource 自动匹配耗材并创建 WorkTabletMatrix。""" """首次 transfer_liquid 时,根据 deck 上的 resource 自动匹配耗材并创建 WorkTabletMatrix。"""
@@ -972,7 +974,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
if child.children: if child.children:
pip_pos = self.plr_pos_to_prcxi(child.children[0], self.left_2_claw) pip_pos = self.plr_pos_to_prcxi(child.children[0], self.left_2_claw)
else: else:
pip_pos = self.plr_pos_to_prcxi(child, Coordinate(50, self.left_2_claw.y, self.left_2_claw.z)) pip_pos = self.plr_pos_to_prcxi(child, Coordinate(-100, self.left_2_claw.y, self.left_2_claw.z))
half_x = child.get_size_x() / 2 * abs(1 + self.x_increase) half_x = child.get_size_x() / 2 * abs(1 + self.x_increase)
z_wall = child.get_size_z() z_wall = child.get_size_z()
@@ -1006,7 +1008,10 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
raise PRCXIError(f"Failed to create auto-matched matrix: {res.get('Message', 'Unknown error')}") raise PRCXIError(f"Failed to create auto-matched matrix: {res.get('Message', 'Unknown error')}")
def plr_pos_to_prcxi(self, resource: Resource, offset: Coordinate = Coordinate(0, 0, 0)): def plr_pos_to_prcxi(self, resource: Resource, offset: Coordinate = Coordinate(0, 0, 0)):
resource_pos = resource.get_absolute_location(x="c",y="c",z="t") z_pos = 'c'
if isinstance(resource, Tip):
z_pos = 'b'
resource_pos = resource.get_absolute_location(x="c",y="c",z=z_pos)
x = resource_pos.x x = resource_pos.x
y = resource_pos.y y = resource_pos.y
z = resource_pos.z z = resource_pos.z
@@ -1437,6 +1442,24 @@ class PRCXI9300Backend(LiquidHandlerBackend):
self.deck_z = deck_z self.deck_z = deck_z
self.tip_length = 0 self.tip_length = 0
@staticmethod
def _deck_plate_slot_no(plate, deck) -> int:
"""台面板位槽号116与 PRCXI9300Handler._get_slot_number 一致;无法解析时退回 deck 子项顺序 +1。"""
sn = PRCXI9300Handler._get_slot_number(plate)
if sn is not None:
return sn
return deck.children.index(plate) + 1
@staticmethod
def _resource_num_items_y(resource) -> int:
"""板/TipRack 等在 Y 向孔位数;无 ``num_items_y`` 或非正数时返回 1。"""
ny = getattr(resource, "num_items_y", None)
try:
n = int(ny) if ny is not None else 1
except (TypeError, ValueError):
n = 1
return n if n >= 1 else 1
async def shaker_action(self, time: int, module_no: int, amplitude: int, is_wait: bool): async def shaker_action(self, time: int, module_no: int, amplitude: int, is_wait: bool):
step = self.api_client.shaker_action( step = self.api_client.shaker_action(
time=time, time=time,
@@ -1610,33 +1633,33 @@ class PRCXI9300Backend(LiquidHandlerBackend):
axis = "Right" axis = "Right"
else: else:
raise ValueError("Invalid use channels: " + str(_use_channels)) raise ValueError("Invalid use channels: " + str(_use_channels))
plate_indexes = [] plate_slots = []
for op in ops: for op in ops:
plate = op.resource.parent plate = op.resource.parent
deck = plate.parent deck = plate.parent
plate_index = deck.children.index(plate) plate_slots.append(self._deck_plate_slot_no(plate, deck))
# print(f"Plate index: {plate_index}, Plate name: {plate.name}")
# print(f"Number of children in deck: {len(deck.children)}")
plate_indexes.append(plate_index) if len(set(plate_slots)) != 1:
raise ValueError("All pickups must be from the same plate (slot). Found different slots: " + str(plate_slots))
if len(set(plate_indexes)) != 1:
raise ValueError("All pickups must be from the same plate. Found different plates: " + str(plate_indexes))
_rack = ops[0].resource.parent
ny = self._resource_num_items_y(_rack)
tip_columns = [] tip_columns = []
for op in ops: for op in ops:
tipspot = op.resource tipspot = op.resource
if self._resource_num_items_y(tipspot.parent) != ny:
raise ValueError("All pickups must use tip racks with the same num_items_y")
tipspot_index = tipspot.parent.children.index(tipspot) tipspot_index = tipspot.parent.children.index(tipspot)
tip_columns.append(tipspot_index // 8) tip_columns.append(tipspot_index // ny)
if len(set(tip_columns)) != 1: if len(set(tip_columns)) != 1:
raise ValueError( raise ValueError(
"All pickups must be from the same tip column. Found different columns: " + str(tip_columns) "All pickups must be from the same tip column. Found different columns: " + str(tip_columns)
) )
PlateNo = plate_indexes[0] + 1 PlateNo = plate_slots[0]
hole_col = tip_columns[0] + 1 hole_col = tip_columns[0] + 1
hole_row = 1 hole_row = 1
if self.num_channels != 8: if self.num_channels != 8:
hole_row = tipspot_index % 8 + 1 hole_row = tipspot_index % ny + 1
step = self.api_client.Load( step = self.api_client.Load(
axis=axis, axis=axis,
@@ -1647,8 +1670,8 @@ class PRCXI9300Backend(LiquidHandlerBackend):
hole_col=hole_col, hole_col=hole_col,
blending_times=0, blending_times=0,
balance_height=0, balance_height=0,
plate_or_hole=f"H{hole_col}-8,T{PlateNo}", plate_or_hole=f"H{hole_col}-{ny},T{PlateNo}",
hole_numbers=f"{(hole_col - 1) * 8 + hole_row}" if self._num_channels != 8 else "1,2,3,4,5", hole_numbers=f"{(hole_col - 1) * ny + hole_row}" if self._num_channels != 8 else "1,2,3,4,5",
) )
self.steps_todo_list.append(step) self.steps_todo_list.append(step)
@@ -1666,8 +1689,9 @@ class PRCXI9300Backend(LiquidHandlerBackend):
raise ValueError("Invalid use channels: " + str(_use_channels)) raise ValueError("Invalid use channels: " + str(_use_channels))
# 检查trash # # 检查trash #
if ops[0].resource.name == "trash": if ops[0].resource.name == "trash":
_plate = ops[0].resource
PlateNo = ops[0].resource.parent.parent.children.index(ops[0].resource.parent) + 1 _deck = _plate.parent
PlateNo = self._deck_plate_slot_no(_plate, _deck)
step = self.api_client.UnLoad( step = self.api_client.UnLoad(
axis=axis, axis=axis,
@@ -1685,32 +1709,35 @@ class PRCXI9300Backend(LiquidHandlerBackend):
return return
# print(ops[0].resource.parent.children.index(ops[0].resource)) # print(ops[0].resource.parent.children.index(ops[0].resource))
plate_indexes = [] plate_slots = []
for op in ops: for op in ops:
plate = op.resource.parent plate = op.resource.parent
deck = plate.parent deck = plate.parent
plate_index = deck.children.index(plate) plate_slots.append(self._deck_plate_slot_no(plate, deck))
plate_indexes.append(plate_index) if len(set(plate_slots)) != 1:
if len(set(plate_indexes)) != 1:
raise ValueError( raise ValueError(
"All drop_tips must be from the same plate. Found different plates: " + str(plate_indexes) "All drop_tips must be from the same plate (slot). Found different slots: " + str(plate_slots)
) )
_rack = ops[0].resource.parent
ny = self._resource_num_items_y(_rack)
tip_columns = [] tip_columns = []
for op in ops: for op in ops:
tipspot = op.resource tipspot = op.resource
if self._resource_num_items_y(tipspot.parent) != ny:
raise ValueError("All drop_tips must use tip racks with the same num_items_y")
tipspot_index = tipspot.parent.children.index(tipspot) tipspot_index = tipspot.parent.children.index(tipspot)
tip_columns.append(tipspot_index // 8) tip_columns.append(tipspot_index // ny)
if len(set(tip_columns)) != 1: if len(set(tip_columns)) != 1:
raise ValueError( raise ValueError(
"All drop_tips must be from the same tip column. Found different columns: " + str(tip_columns) "All drop_tips must be from the same tip column. Found different columns: " + str(tip_columns)
) )
PlateNo = plate_indexes[0] + 1 PlateNo = plate_slots[0]
hole_col = tip_columns[0] + 1 hole_col = tip_columns[0] + 1
hole_row = 1
if self.num_channels != 8: if self.num_channels != 8:
hole_row = tipspot_index % 8 + 1 hole_row = tipspot_index % ny + 1
step = self.api_client.UnLoad( step = self.api_client.UnLoad(
axis=axis, axis=axis,
@@ -1721,7 +1748,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
hole_col=hole_col, hole_col=hole_col,
blending_times=0, blending_times=0,
balance_height=0, balance_height=0,
plate_or_hole=f"H{hole_col}-8,T{PlateNo}", plate_or_hole=f"H{hole_col}-{ny},T{PlateNo}",
hole_numbers="1,2,3,4,5,6,7,8", hole_numbers="1,2,3,4,5,6,7,8",
) )
self.steps_todo_list.append(step) self.steps_todo_list.append(step)
@@ -1744,31 +1771,34 @@ class PRCXI9300Backend(LiquidHandlerBackend):
axis = "Right" axis = "Right"
else: else:
raise ValueError("Invalid use channels: " + str(use_channels)) raise ValueError("Invalid use channels: " + str(use_channels))
plate_indexes = [] plate_slots = []
for op in targets: for op in targets:
deck = op.parent.parent.parent deck = op.parent.parent.parent
plate = op.parent plate = op.parent
plate_index = deck.children.index(plate) plate_slots.append(self._deck_plate_slot_no(plate, deck))
plate_indexes.append(plate_index)
if len(set(plate_indexes)) != 1: if len(set(plate_slots)) != 1:
raise ValueError("All pickups must be from the same plate. Found different plates: " + str(plate_indexes)) raise ValueError("All mix targets must be from the same plate (slot). Found different slots: " + str(plate_slots))
_plate0 = targets[0].parent
ny = self._resource_num_items_y(_plate0)
tip_columns = [] tip_columns = []
for op in targets: for op in targets:
if self._resource_num_items_y(op.parent) != ny:
raise ValueError("All mix targets must be on plates with the same num_items_y")
tipspot_index = op.parent.children.index(op) tipspot_index = op.parent.children.index(op)
tip_columns.append(tipspot_index // 8) tip_columns.append(tipspot_index // ny)
if len(set(tip_columns)) != 1: if len(set(tip_columns)) != 1:
raise ValueError( raise ValueError(
"All pickups must be from the same tip column. Found different columns: " + str(tip_columns) "All mix targets must be in the same column group. Found different columns: " + str(tip_columns)
) )
PlateNo = plate_indexes[0] + 1 PlateNo = plate_slots[0]
hole_col = tip_columns[0] + 1 hole_col = tip_columns[0] + 1
hole_row = 1 hole_row = 1
if self.num_channels != 8: if self.num_channels != 8:
hole_row = tipspot_index % 8 + 1 hole_row = tipspot_index % ny + 1
assert mix_time > 0 assert mix_time > 0
step = self.api_client.Blending( step = self.api_client.Blending(
@@ -1779,7 +1809,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
hole_col=hole_col, hole_col=hole_col,
blending_times=mix_time, blending_times=mix_time,
balance_height=0, balance_height=0,
plate_or_hole=f"H{hole_col}-8,T{PlateNo}", plate_or_hole=f"H{hole_col}-{ny},T{PlateNo}",
hole_numbers="1,2,3,4,5,6,7,8", hole_numbers="1,2,3,4,5,6,7,8",
) )
self.steps_todo_list.append(step) self.steps_todo_list.append(step)
@@ -1796,36 +1826,39 @@ class PRCXI9300Backend(LiquidHandlerBackend):
axis = "Right" axis = "Right"
else: else:
raise ValueError("Invalid use channels: " + str(_use_channels)) raise ValueError("Invalid use channels: " + str(_use_channels))
plate_indexes = [] plate_slots = []
for op in ops: for op in ops:
plate = op.resource.parent plate = op.resource.parent
deck = plate.parent deck = plate.parent
plate_index = deck.children.index(plate) plate_slots.append(self._deck_plate_slot_no(plate, deck))
plate_indexes.append(plate_index)
if len(set(plate_indexes)) != 1: if len(set(plate_slots)) != 1:
raise ValueError("All pickups must be from the same plate. Found different plates: " + str(plate_indexes)) raise ValueError("All aspirate must be from the same plate (slot). Found different slots: " + str(plate_slots))
_plate0 = ops[0].resource.parent
ny = self._resource_num_items_y(_plate0)
tip_columns = [] tip_columns = []
for op in ops: for op in ops:
tipspot = op.resource tipspot = op.resource
if self._resource_num_items_y(tipspot.parent) != ny:
raise ValueError("All aspirate wells must be on plates with the same num_items_y")
tipspot_index = tipspot.parent.children.index(tipspot) tipspot_index = tipspot.parent.children.index(tipspot)
tip_columns.append(tipspot_index // 8) tip_columns.append(tipspot_index // ny)
if len(set(tip_columns)) != 1: if len(set(tip_columns)) != 1:
raise ValueError( raise ValueError(
"All pickups must be from the same tip column. Found different columns: " + str(tip_columns) "All aspirate must be from the same tip column. Found different columns: " + str(tip_columns)
) )
volumes = [op.volume for op in ops] volumes = [op.volume for op in ops]
if len(set(volumes)) != 1: if len(set(volumes)) != 1:
raise ValueError("All aspirate volumes must be the same. Found different volumes: " + str(volumes)) raise ValueError("All aspirate volumes must be the same. Found different volumes: " + str(volumes))
PlateNo = plate_indexes[0] + 1 PlateNo = plate_slots[0]
hole_col = tip_columns[0] + 1 hole_col = tip_columns[0] + 1
hole_row = 1 hole_row = 1
if self.num_channels != 8: if self.num_channels != 8:
hole_row = tipspot_index % 8 + 1 hole_row = tipspot_index % ny + 1
step = self.api_client.Imbibing( step = self.api_client.Imbibing(
axis=axis, axis=axis,
@@ -1836,7 +1869,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
hole_col=hole_col, hole_col=hole_col,
blending_times=0, blending_times=0,
balance_height=0, balance_height=0,
plate_or_hole=f"H{hole_col}-8,T{PlateNo}", plate_or_hole=f"H{hole_col}-{ny},T{PlateNo}",
hole_numbers="1,2,3,4,5,6,7,8", hole_numbers="1,2,3,4,5,6,7,8",
) )
self.steps_todo_list.append(step) self.steps_todo_list.append(step)
@@ -1853,21 +1886,24 @@ class PRCXI9300Backend(LiquidHandlerBackend):
axis = "Right" axis = "Right"
else: else:
raise ValueError("Invalid use channels: " + str(_use_channels)) raise ValueError("Invalid use channels: " + str(_use_channels))
plate_indexes = [] plate_slots = []
for op in ops: for op in ops:
plate = op.resource.parent plate = op.resource.parent
deck = plate.parent deck = plate.parent
plate_index = deck.children.index(plate) plate_slots.append(self._deck_plate_slot_no(plate, deck))
plate_indexes.append(plate_index)
if len(set(plate_indexes)) != 1: if len(set(plate_slots)) != 1:
raise ValueError("All dispense must be from the same plate. Found different plates: " + str(plate_indexes)) raise ValueError("All dispense must be from the same plate (slot). Found different slots: " + str(plate_slots))
_plate0 = ops[0].resource.parent
ny = self._resource_num_items_y(_plate0)
tip_columns = [] tip_columns = []
for op in ops: for op in ops:
tipspot = op.resource tipspot = op.resource
if self._resource_num_items_y(tipspot.parent) != ny:
raise ValueError("All dispense wells must be on plates with the same num_items_y")
tipspot_index = tipspot.parent.children.index(tipspot) tipspot_index = tipspot.parent.children.index(tipspot)
tip_columns.append(tipspot_index // 8) tip_columns.append(tipspot_index // ny)
if len(set(tip_columns)) != 1: if len(set(tip_columns)) != 1:
raise ValueError( raise ValueError(
@@ -1878,12 +1914,12 @@ class PRCXI9300Backend(LiquidHandlerBackend):
if len(set(volumes)) != 1: if len(set(volumes)) != 1:
raise ValueError("All dispense volumes must be the same. Found different volumes: " + str(volumes)) raise ValueError("All dispense volumes must be the same. Found different volumes: " + str(volumes))
PlateNo = plate_indexes[0] + 1 PlateNo = plate_slots[0]
hole_col = tip_columns[0] + 1 hole_col = tip_columns[0] + 1
hole_row = 1 hole_row = 1
if self.num_channels != 8: if self.num_channels != 8:
hole_row = tipspot_index % 8 + 1 hole_row = tipspot_index % ny + 1
step = self.api_client.Tapping( step = self.api_client.Tapping(
axis=axis, axis=axis,
@@ -1894,7 +1930,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
hole_col=hole_col, hole_col=hole_col,
blending_times=0, blending_times=0,
balance_height=0, balance_height=0,
plate_or_hole=f"H{hole_col}-8,T{PlateNo}", plate_or_hole=f"H{hole_col}-{ny},T{PlateNo}",
hole_numbers="1,2,3,4,5,6,7,8", hole_numbers="1,2,3,4,5,6,7,8",
) )
self.steps_todo_list.append(step) self.steps_todo_list.append(step)

View File

@@ -1,4 +1,4 @@
from typing import Optional from typing import Any, Callable, Dict, List, Optional, Tuple
from pylabrobot.resources import Tube, Coordinate from pylabrobot.resources import Tube, Coordinate
from pylabrobot.resources.well import Well, WellBottomType, CrossSectionType from pylabrobot.resources.well import Well, WellBottomType, CrossSectionType
from pylabrobot.resources.tip import Tip, TipCreator from pylabrobot.resources.tip import Tip, TipCreator
@@ -838,4 +838,102 @@ def PRCXI_30mm_Adapter(name: str) -> PRCXI9300PlateAdapter:
"Name": "30mm适配器", "Name": "30mm适配器",
"SupplyType": 2 "SupplyType": 2
} }
) )
# ---------------------------------------------------------------------------
# 协议上传 / workflow 用:与设备端耗材字典字段对齐的模板描述(供 common 自动匹配)
# ---------------------------------------------------------------------------
_PRCXI_TEMPLATE_SPECS_CACHE: Optional[List[Dict[str, Any]]] = None
def _probe_prcxi_resource(factory: Callable[..., Any]) -> Any:
probe = "__unilab_template_probe__"
if factory.__name__ == "PRCXI_trash":
return factory()
return factory(probe)
def _first_child_capacity_for_match(resource: Any) -> float:
"""Well max_volume 或 Tip 的 maximal_volume用于与设备端 Volume 类似的打分。"""
ch = getattr(resource, "children", None) or []
if not ch:
return 0.0
c0 = ch[0]
mv = getattr(c0, "max_volume", None)
if mv is not None:
return float(mv)
tip = getattr(c0, "tip", None)
if tip is not None:
mv2 = getattr(tip, "maximal_volume", None)
if mv2 is not None:
return float(mv2)
return 0.0
# (factory, kind) — 不含各类 Adapter避免与真实板子误匹配
PRCXI_TEMPLATE_FACTORY_KINDS: List[Tuple[Callable[..., Any], str]] = [
(PRCXI_BioER_96_wellplate, "plate"),
(PRCXI_nest_1_troughplate, "plate"),
(PRCXI_BioRad_384_wellplate, "plate"),
(PRCXI_AGenBio_4_troughplate, "plate"),
(PRCXI_nest_12_troughplate, "plate"),
(PRCXI_CellTreat_96_wellplate, "plate"),
(PRCXI_10ul_eTips, "tip_rack"),
(PRCXI_300ul_Tips, "tip_rack"),
(PRCXI_PCR_Plate_200uL_nonskirted, "plate"),
(PRCXI_PCR_Plate_200uL_semiskirted, "plate"),
(PRCXI_PCR_Plate_200uL_skirted, "plate"),
(PRCXI_trash, "trash"),
(PRCXI_96_DeepWell, "plate"),
(PRCXI_EP_Adapter, "tube_rack"),
(PRCXI_1250uL_Tips, "tip_rack"),
(PRCXI_10uL_Tips, "tip_rack"),
(PRCXI_1000uL_Tips, "tip_rack"),
(PRCXI_200uL_Tips, "tip_rack"),
(PRCXI_48_DeepWell, "plate"),
]
def get_prcxi_labware_template_specs() -> List[Dict[str, Any]]:
"""返回与 ``prcxi._match_and_create_matrix`` 中耗材字段兼容的模板列表,用于按孔数+容量打分。"""
global _PRCXI_TEMPLATE_SPECS_CACHE
if _PRCXI_TEMPLATE_SPECS_CACHE is not None:
return _PRCXI_TEMPLATE_SPECS_CACHE
out: List[Dict[str, Any]] = []
for factory, kind in PRCXI_TEMPLATE_FACTORY_KINDS:
try:
r = _probe_prcxi_resource(factory)
except Exception:
continue
nx = int(getattr(r, "num_items_x", None) or 0)
ny = int(getattr(r, "num_items_y", None) or 0)
nchild = len(getattr(r, "children", []) or [])
hole_count = nx * ny if nx > 0 and ny > 0 else nchild
hole_row = ny if nx > 0 and ny > 0 else 0
hole_col = nx if nx > 0 and ny > 0 else 0
mi = getattr(r, "material_info", None) or {}
vol = _first_child_capacity_for_match(r)
menum = mi.get("materialEnum")
if menum is None and kind == "tip_rack":
menum = 1
elif menum is None and kind == "trash":
menum = 6
out.append(
{
"class_name": factory.__name__,
"kind": kind,
"materialEnum": menum,
"HoleRow": hole_row,
"HoleColum": hole_col,
"Volume": vol,
"hole_count": hole_count,
"material_uuid": mi.get("uuid"),
"material_code": mi.get("Code"),
}
)
_PRCXI_TEMPLATE_SPECS_CACHE = out
return out