演示时修改的部分代码

This commit is contained in:
q434343
2026-05-14 17:49:48 +08:00
parent 3aed75bc8b
commit 6288e37464
6 changed files with 361 additions and 24 deletions

View File

@@ -61,6 +61,7 @@ class TransferLiquidReturn(TypedDict):
class LiquidHandlerMiddleware(LiquidHandler):
_ros_node: ROS2DeviceNode
def __init__(
self, backend: LiquidHandlerBackend, deck: Deck, simulator: bool = False, channel_num: int = 8, **kwargs
):
@@ -79,6 +80,11 @@ class LiquidHandlerMiddleware(LiquidHandler):
self._simulate_handler = LiquidHandlerAbstract(self._simulate_backend, deck, False)
super().__init__(backend, deck)
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
if getattr(self, "_simulator", False) and getattr(self, "_simulate_handler", None) is not None:
self._simulate_handler._ros_node = ros_node
async def setup(self, **backend_kwargs):
if self._simulator:
return await self._simulate_handler.setup(**backend_kwargs)
@@ -152,6 +158,14 @@ class LiquidHandlerMiddleware(LiquidHandler):
if self._simulator:
return await self._simulate_handler.pick_up_tips(tip_spots, use_channels, offsets, **backend_kwargs)
if hasattr(self, "_ros_node") and self._ros_node is not None:
task = ROS2DeviceNode.run_async_func(self._ros_node.update_resource, True, **{"resources": tip_spots})
submit_time = time.time()
while not task.done():
if time.time() - submit_time > 10:
self._ros_node.lab_logger().info(f"pick_up_tips {tip_spots} 超时")
break
time.sleep(0.01)
return await super().pick_up_tips(tip_spots, use_channels, offsets, **backend_kwargs)
async def drop_tips(
@@ -360,6 +374,16 @@ class LiquidHandlerMiddleware(LiquidHandler):
EXTRA_SAMPLE_UUID: sample_uuid_value,
"volume": volume,
}
if hasattr(self, "_ros_node") and self._ros_node is not None:
task = ROS2DeviceNode.run_async_func(self._ros_node.update_resource, True, **{"resources": resources})
submit_time = time.time()
while not task.done():
if time.time() - submit_time > 10:
self._ros_node.lab_logger().info(f"aspirate {resources} 超时")
break
time.sleep(0.01)
return SimpleReturn(samples=res_samples, volumes=res_volumes)
async def dispense(
@@ -495,6 +519,15 @@ class LiquidHandlerMiddleware(LiquidHandler):
res_samples.append({"name": resource.name, EXTRA_SAMPLE_UUID: res_uuid})
res_volumes.append(volume)
if hasattr(self, "_ros_node") and self._ros_node is not None:
task = ROS2DeviceNode.run_async_func(self._ros_node.update_resource, True, **{"resources": resources})
submit_time = time.time()
while not task.done():
if time.time() - submit_time > 10:
self._ros_node.lab_logger().info(f"dispense {resources} 超时")
break
time.sleep(0.01)
return SimpleReturn(samples=res_samples, volumes=res_volumes)
async def transfer(
@@ -880,7 +913,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
super().__init__(backend_type, deck, simulator, channel_num, total_height=total_height, **kwargs)
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
super().post_init(ros_node)
ROS2DeviceNode.run_async_func(self._ros_node.update_resource, True, **{
"resources": [self.deck]
})
@@ -1036,13 +1069,14 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
well.set_liquids([(liquid_name, safe_volume)]) # type: ignore
res_volumes.append(safe_volume)
task = ROS2DeviceNode.run_async_func(self._ros_node.update_resource, True, **{"resources": wells})
submit_time = time.time()
while not task.done():
if time.time() - submit_time > 10:
self._ros_node.lab_logger().info(f"set_liquid_from_plate {plate} 超时")
break
time.sleep(0.01)
if hasattr(self, "_ros_node") and self._ros_node is not None:
task = ROS2DeviceNode.run_async_func(self._ros_node.update_resource, True, **{"resources": wells})
submit_time = time.time()
while not task.done():
if time.time() - submit_time > 10:
self._ros_node.lab_logger().info(f"set_liquid_from_plate {plate} 超时")
break
time.sleep(0.01)
return SetLiquidFromPlateReturn(
plate=ResourceTreeSet.from_plr_resources([plate], known_newly_created=False).dump(), # type: ignore
@@ -1449,6 +1483,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
mix_rate: Optional[int] = None,
mix_liquid_height: Optional[float] = None,
delays: Optional[List[int]] = None,
pre_aspirate_from_target: Optional[float] = None,
none_keys: List[str] = [],
) -> TransferLiquidReturn:
"""Transfer liquid with automatic mode detection.
@@ -1600,6 +1635,8 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
kwargs['mix_liquid_height'] = safe_get(mix_liquid_height, i, wrap=False)
if delays is not None:
kwargs['delays'] = safe_get(delays, i)
if pre_aspirate_from_target is not None:
kwargs['pre_aspirate_from_target'] = safe_get(pre_aspirate_from_target, i)
cur_source = sources[i % num_sources]
cur_target = targets[i % num_targets]
@@ -1659,6 +1696,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
mix_rate = kwargs.get('mix_rate')
mix_liquid_height = kwargs.get('mix_liquid_height')
delays = kwargs.get('delays')
pre_aspirate_from_target = kwargs.get('pre_aspirate_from_target')
tip = []
if pick_up:

View File

@@ -149,6 +149,40 @@ class PRCXI9300Deck(Deck):
pos = self.sites[idx]["position"]
return Coordinate(pos["x"], pos["y"], pos["z"])
def get_slot_location(self, slot: Union[int, str]) -> Coordinate:
"""根据 slot 标识返回该 slot 的坐标。
支持的输入:
- int: 1-based slot 序号(与 ``assign_child_at_slot`` 一致1 → sites[0]
- str: 纯数字字符串 ``"3"``,或带前缀的 label ``"T3"``(不区分大小写)
Raises:
ValueError: slot 解析失败或越界
"""
idx: Optional[int] = None
if isinstance(slot, int):
idx = slot - 1
elif isinstance(slot, str):
s = slot.strip()
if not s:
raise ValueError(f"空 slot 标识")
digits = s[1:] if s[0].isalpha() else s
try:
idx = int(digits) - 1
except ValueError:
# 退而求其次:直接按 label 全等匹配
for i, site in enumerate(self.sites):
if site.get("label") == s:
idx = i
break
if idx is None:
raise ValueError(f"无法解析 slot 标识: {slot!r}")
if idx < 0 or idx >= len(self.sites):
raise ValueError(
f"slot {slot!r} 超出范围 [1, {len(self.sites)}] (解析为 idx={idx})"
)
return self._get_site_location(idx)
def _get_site_resource(self, idx: int) -> Optional[Resource]:
site_loc = self._get_site_location(idx)
for child in self.children:
@@ -444,7 +478,7 @@ class PRCXI9300Trash(Trash):
if name != "trash":
print(f"Warning: PRCXI9300Trash usually expects name='trash' for backend logic, but got '{name}'.")
super().__init__(name, size_x, size_y, size_z, **kwargs)
super().__init__(name, size_x, size_y, size_z, category=category, **kwargs)
self._unilabos_state = {}
# 初始化时注入 UUID
if material_info:
@@ -533,12 +567,16 @@ class PRCXI9300TubeRack(TubeRack):
# 根据情况传递不同的参数
if items_to_pass is not None:
super().__init__(name, size_x, size_y, size_z, ordered_items=items_to_pass, model=model, **kwargs)
super().__init__(
name, size_x, size_y, size_z, ordered_items=items_to_pass, category=category, model=model, **kwargs
)
elif ordering_param is not None:
# 传递 ordering 参数,让 TubeRack 自己创建 Tube 对象
super().__init__(name, size_x, size_y, size_z, ordering=ordering_param, model=model, **kwargs)
super().__init__(
name, size_x, size_y, size_z, ordering=ordering_param, category=category, model=model, **kwargs
)
else:
super().__init__(name, size_x, size_y, size_z, model=model, **kwargs)
super().__init__(name, size_x, size_y, size_z, category=category, model=model, **kwargs)
self._unilabos_state = {}
if material_info:
@@ -716,6 +754,7 @@ class PRCXI9300PlateAdapter(PlateAdapter):
adapter_hole_size_x=adapter_hole_size_x,
adapter_hole_size_y=adapter_hole_size_y,
adapter_hole_size_z=adapter_hole_size_z,
category=category,
model=model,
**kwargs,
)
@@ -803,6 +842,8 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
self.xy_coupling = xy_coupling
self._slot_prcxi_positions: Dict[int, Tuple[float, float]] = {}
self.calibration_labware_type = calibration_labware_type
self.max_z_pipetting = 185
self.max_z_claw = 170
if calibration_points is not None:
self.calibrate_from_points(calibration_points, labware_type=self.calibration_labware_type)
@@ -839,12 +880,65 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
)
super().__init__(backend=self._unilabos_backend, deck=deck, simulator=simulator, channel_num=channel_num)
self._first_transfer_done = False
# backend 在做槽位反查时若拿不到 deck需要回退到 handler.deck这里建立反向引用
self._unilabos_backend._handler = self
@staticmethod
def _get_slot_number(resource) -> Optional[int]:
"""从 resource 的 unilabos_extra["update_resource_site"](如 "T13")或位置反算槽位号。"""
return _get_slot_number(resource)
def _top_level_consumable(self, resource):
"""从任意 PLR 资源沿 parent 向上找"放在 deck 上的那一层耗材""""
if resource is None:
return None
cur = resource
while cur is not None:
parent = getattr(cur, "parent", None)
if isinstance(parent, PRCXI9300Deck):
return cur
if parent is None:
# 已到顶;若 cur 本身就是 deck没有"耗材"层
if isinstance(cur, PRCXI9300Deck):
return None
return cur
cur = parent
return None
def _attach_resources_to_deck_if_needed(self, items: Sequence[Resource]) -> None:
"""把通过 _resolve_to_plr_resources 拿回的"游离"耗材自动挂到 self.deck。
- 已经在 PRCXI9300Deck 上(含 name 同名)的跳过;
- 优先按 ``unilabos_extra.update_resource_site`` 的 Tn 解析槽位;
- 否则交给 ``Deck.assign_child_resource`` 找空槽。
- 任意失败仅打印告警不中断主流程backend 仍可走名字兜底)。
"""
deck = getattr(self, "deck", None)
if not isinstance(deck, PRCXI9300Deck):
return
existing_names = {getattr(c, "name", None) for c in deck.children}
for item in items:
top = self._top_level_consumable(item)
if top is None or not isinstance(top, Resource):
continue
if isinstance(getattr(top, "parent", None), PRCXI9300Deck):
continue
top_name = getattr(top, "name", None)
if top_name in existing_names:
continue
spot_idx: Optional[int] = None
extra = getattr(top, "unilabos_extra", {}) or {}
site = str(extra.get("update_resource_site", ""))
if site:
digits = "".join(c for c in site if c.isdigit())
if digits:
spot_idx = int(digits) - 1
try:
deck.assign_child_resource(top, spot=spot_idx, reassign=False)
existing_names.add(top_name)
except Exception as e:
print(f"[PRCXI] 自动挂载到 deck 失败: name={top_name}, site={site or '?'}, err={e}")
def _match_and_create_matrix(self):
"""首次 transfer_liquid 时,根据 deck 上的 resource 自动匹配耗材并创建 WorkTabletMatrix。"""
backend = self._unilabos_backend
@@ -962,7 +1056,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
slot_pos = self._slot_prcxi_positions[number]
pos.x = slot_pos[0] - child.get_size_x() / 2 + self.left_2_claw.x
pos.y = slot_pos[1] - child.get_size_y() / 2 + self.left_2_claw.y
claw_positions.append({"Number": number, "XPos": pos.x, "YPos": pos.y, "ZPos": pos.z})
claw_positions.append({"Number": number, "XPos": pos.x, "YPos": pos.y, "ZPos": max(min(pos.z, self.max_z_claw),0)})
if child.children:
pip_pos = self.plr_pos_to_prcxi(child.children[0])
@@ -978,13 +1072,13 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
"Number": number,
"XPos": pip_pos.x,
"YPos": pip_pos.y,
"ZPos": pip_pos.z,
"ZPos": max(min(pip_pos.z, self.max_z_pipetting),0),
"X_Left": half_x,
"X_Right": half_x,
"ZAgainstTheWall": pip_pos.z - z_wall,
"X2Pos": pip_pos.x + self.right_2_left.x,
"Y2Pos": pip_pos.y + self.right_2_left.y,
"Z2Pos": pip_pos.z + self.right_2_left.z,
"Z2Pos": max(min(pip_pos.z + self.right_2_left.z, self.max_z_pipetting),0),
"X2_Left": half_x,
"X2_Right": half_x,
"ZAgainstTheWall2": pip_pos.z - z_wall,
@@ -1241,6 +1335,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
mix_rate: Optional[int] = None,
mix_liquid_height: Optional[float] = None,
delays: Optional[List[int]] = None,
pre_aspirate_from_target: Optional[float] = None,
none_keys: List[str] = [],
) -> TransferLiquidReturn:
if not self._first_transfer_done:
@@ -1254,6 +1349,8 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
sources = await self._resolve_to_plr_resources(sources)
targets = await self._resolve_to_plr_resources(targets)
tip_racks = list(await self._resolve_to_plr_resources(tip_racks))
# 远端解析回来的 PLR 实例可能未挂到 self.deck主动绑定一次避免 backend 取 plate.parent==None
self._attach_resources_to_deck_if_needed(list(sources) + list(targets) + list(tip_racks))
if isinstance(tip_racks[0], TipRack):
tip_rack = tip_racks[0]
else:
@@ -1320,6 +1417,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
mix_rate=mix_rate,
mix_liquid_height=mix_liquid_height,
delays=delays,
pre_aspirate_from_target=pre_aspirate_from_target,
none_keys=none_keys,
)
if self.step_mode:
@@ -1475,6 +1573,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
_num_channels = 8 # 默认通道数为 8
_is_reset_ok = False
_ros_node: BaseROS2DeviceNode
_handler: Optional["PRCXI9300Handler"] = None # 由 PRCXI9300Handler.__init__ 注入
@property
def is_reset_ok(self) -> bool:
@@ -1508,13 +1607,52 @@ class PRCXI9300Backend(LiquidHandlerBackend):
self.debug = debug
self.axis = "Left"
@staticmethod
def _deck_plate_slot_no(plate, deck) -> int:
"""台面板位槽号116与 PRCXI9300Handler._get_slot_number 一致;无法解析时退回 deck 子项顺序 +1。"""
def _resolve_deck(self, plate, deck=None) -> Optional["PRCXI9300Deck"]:
"""定位 plate 所属的 PRCXI9300Deck按 deck 入参 → plate 的祖先链 → handler.deck 顺序回退。"""
if isinstance(deck, PRCXI9300Deck):
return deck
cur = plate
while cur is not None:
if isinstance(cur, PRCXI9300Deck):
return cur
cur = getattr(cur, "parent", None)
if self._handler is not None:
handler_deck = getattr(self._handler, "deck", None)
if isinstance(handler_deck, PRCXI9300Deck):
return handler_deck
return None
def _deck_plate_slot_no(self, plate, deck=None) -> int:
"""台面板位槽号116优先 _get_slot_number否则沿父链/handler.deck 找到 deck 后取序号+1。"""
sn = PRCXI9300Handler._get_slot_number(plate)
if sn is not None:
return sn
return deck.children.index(plate) + 1
actual_deck = self._resolve_deck(plate, deck)
if actual_deck is None:
raise RuntimeError(
f"无法定位 {getattr(plate, 'name', '?')} 所在的 PRCXI9300Deck"
"请确认 tip_rack/plate 已挂到 self.deck或在 unilabos_extra 中提供 update_resource_site=Tn。"
)
if plate in actual_deck.children:
index = actual_deck.children.index(plate)
plate_new = actual_deck.children[index]
sn = PRCXI9300Handler._get_slot_number(plate_new)
if sn is not None:
return sn
else:
raise RuntimeError(
f"无法定位 {getattr(plate_new, 'name', '?')} 所在的 PRCXI9300Deck"
f"x: {plate_new.location}"
)
# 名字兜底(远端解析回来的实例与 deck 上的不是同一对象)
plate_name = getattr(plate, "name", None)
if plate_name is not None:
for i, c in enumerate(actual_deck.children):
if getattr(c, "name", None) == plate_name:
return i + 1
raise RuntimeError(
f"{getattr(plate, 'name', '?')} 不在 deck.children 中且无可解析的槽位号"
)
@staticmethod
def _resource_num_items_y(resource) -> int:
@@ -1977,7 +2115,9 @@ class PRCXI9300Backend(LiquidHandlerBackend):
assist_fun1 = ""
if ops[0].blow_out_air_volume is not None:
assist_fun1 = f"吹样({float(min(max(ops[0].blow_out_air_volume,0),10))}ul)"
assist_fun1 = f"吹样({float(min(max(ops[0].blow_out_air_volume,5),10))}ul)"
else :
assist_fun1 = f"吹样({5.0}ul)"
step = self.api_client.Tapping(
axis=axis,