更新prcxi的版面更新与工作流上传方法

This commit is contained in:
q434343
2026-03-25 19:42:25 +08:00
parent 04c0564366
commit ad2e5a1c04
4 changed files with 237 additions and 49 deletions

View File

@@ -793,8 +793,8 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
rail_interval=0,
x_increase = -0.003636,
y_increase = -0.003636,
x_offset = -0.8,
y_offset = -37.98,
x_offset = 9.2,
y_offset = -27.98,
deck_z = 300,
deck_y = 400,
rail_width=27.5,
@@ -809,28 +809,10 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
self.x_offset = x_offset
self.y_offset = y_offset
self.xy_coupling = xy_coupling
self.left_2_claw = Coordinate(-130.2, 34, -134)
self.right_2_left = Coordinate(22,-1, 8)
tablets_info = {}
plate_positions = []
for child in deck.children:
number = int(child.name.replace("T", ""))
if child.children:
if "Material" in child.children[0]._unilabos_state:
tablets_info[number] = child.children[0]._unilabos_state["Material"].get("uuid", "730067cf07ae43849ddf4034299030e9")
else:
tablets_info[number] = "730067cf07ae43849ddf4034299030e9"
else:
tablets_info[number] = "730067cf07ae43849ddf4034299030e9"
pos = self.plr_pos_to_prcxi(child)
plate_positions.append(
{
"Number": number,
"XPos": pos.x,
"YPos": pos.y,
"ZPos": pos.z
}
)
if is_9320:
@@ -850,8 +832,180 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
)
super().__init__(backend=self._unilabos_backend, deck=deck, simulator=simulator, channel_num=channel_num)
def plr_pos_to_prcxi(self, resource: Resource):
self._first_transfer_done = False
@staticmethod
def _get_slot_number(resource) -> Optional[int]:
"""从 resource 的 unilabos_extra["update_resource_site"](如 "T13")或位置反算槽位号。"""
extra = getattr(resource, "unilabos_extra", {}) or {}
site = extra.get("update_resource_site", "")
if site:
digits = "".join(c for c in str(site) if c.isdigit())
return int(digits) if digits else None
# 使用 resource.location.x 和 resource.location.y 反算槽位号
# 参考 _DEFAULT_SITE_POSITIONS: x = (i%4)*137.5+5, y = (int(i/4))*96+13
loc = getattr(resource, "location", None)
if loc is not None and loc.x is not None and loc.y is not None:
col = round((loc.x - 5) / 137.5) # 0-3
row = round(3-(loc.y - 13) / 96) # 0-3
idx = row * 4 + col # 0-15
if 0 <= idx < 16:
return idx + 1 # 槽位号从 1 开始
return None
def _match_and_create_matrix(self):
"""首次 transfer_liquid 时,根据 deck 上的 resource 自动匹配耗材并创建 WorkTabletMatrix。"""
backend = self._unilabos_backend
api = backend.api_client
if backend.matrix_id:
return
material_list = api.get_all_materials()
if not material_list:
return
# 按 materialEnum 分组: {enum_value: [material, ...]}
material_dict = {}
material_uuid_map = {}
for m in material_list:
enum_key = m.get("materialEnum")
material_dict.setdefault(enum_key, []).append(m)
if "uuid" in m:
material_uuid_map[m["uuid"]] = m
work_tablets = []
slot_none = [i for i in range(1, 17)]
for child in self.deck.children:
resource = child
number = self._get_slot_number(resource)
if number is None:
continue
# 如果 resource 已有 Material UUID直接使用
if hasattr(resource, "_unilabos_state") and "Material" in getattr(resource, "_unilabos_state", {}):
mat_uuid = resource._unilabos_state["Material"].get("uuid")
if mat_uuid and mat_uuid in material_uuid_map:
work_tablets.append({"Number": number, "Material": material_uuid_map[mat_uuid]})
continue
# 根据 resource 类型推断 materialEnum
# MaterialEnum: Other=0, Tips=1, DeepWellPlate=2, PCRPlate=3, ELISAPlate=4, Reservoir=5, WasteBox=6
expected_enum = None
if isinstance(resource, PRCXI9300TipRack) or isinstance(resource, TipRack):
expected_enum = 1 # Tips
elif isinstance(resource, PRCXI9300Trash) or isinstance(resource, Trash):
expected_enum = 6 # WasteBox
elif isinstance(resource, (PRCXI9300Plate, Plate)):
expected_enum = None # Plate 可能是 DeepWellPlate/PCRPlate/ELISAPlate不限定
# 根据 expected_enum 筛选候选耗材列表
if expected_enum is not None:
candidates = material_dict.get(expected_enum, [])
else:
# expected_enum 未确定时,搜索所有耗材
candidates = material_list
# 根据 children 个数和容量匹配最相似的耗材
num_children = len(resource.children)
child_max_volume = None
if resource.children:
first_child = resource.children[0]
if hasattr(first_child, "max_volume") and first_child.max_volume is not None:
child_max_volume = first_child.max_volume
best_material = None
best_score = float("inf")
for material in candidates:
hole_count = (material.get("HoleRow", 0) or 0) * (material.get("HoleColum", 0) or 0)
material_volume = material.get("Volume", 0) or 0
# 孔数差异(高权重优先匹配孔数)
hole_diff = abs(num_children - hole_count)
# 容量差异(归一化)
if child_max_volume is not None and material_volume > 0:
vol_diff = abs(child_max_volume - material_volume) / material_volume
else:
vol_diff = 0
score = hole_diff * 1000 + vol_diff
if score < best_score:
best_score = score
best_material = material
if best_material:
work_tablets.append({"Number": number, "Material": best_material})
slot_none.remove(number)
if not work_tablets:
return
matrix_id = str(uuid.uuid4())
matrix_info = {
"MatrixId": matrix_id,
"MatrixName": matrix_id,
"WorkTablets": work_tablets +
[{"Number": number, "Material": {"uuid": "730067cf07ae43849ddf4034299030e9"}} for number in slot_none],
}
res = api.add_WorkTablet_Matrix(matrix_info)
if res.get("Success"):
backend.matrix_id = matrix_id
backend.matrix_info = matrix_info
# 重新计算所有槽位的位置(初始化时 deck 可能为空,此时才有资源)
pipetting_positions = []
plate_positions = []
for child in self.deck.children:
number = self._get_slot_number(child)
if number is None:
continue
pos = self.plr_pos_to_prcxi(child)
plate_positions.append({"Number": number, "XPos": pos.x, "YPos": pos.y, "ZPos": pos.z})
if child.children:
pip_pos = self.plr_pos_to_prcxi(child.children[0], self.left_2_claw)
else:
pip_pos = self.plr_pos_to_prcxi(child, Coordinate(50, self.left_2_claw.y, self.left_2_claw.z))
half_x = child.get_size_x() / 2 * abs(1 + self.x_increase)
z_wall = child.get_size_z()
pipetting_positions.append({
"Number": number,
"XPos": pip_pos.x,
"YPos": pip_pos.y,
"ZPos": pip_pos.z,
"X_Left": half_x,
"X_Right": half_x,
"ZAgainstTheWall": pip_pos.z - z_wall,
"X2Pos": pip_pos.x + self.right_2_left.x,
"Y2Pos": pip_pos.y + self.right_2_left.y,
"Z2Pos": pip_pos.z + self.right_2_left.z,
"X2_Left": half_x,
"X2_Right": half_x,
"ZAgainstTheWall2": pip_pos.z - z_wall,
})
if pipetting_positions:
api.update_pipetting_position(matrix_id, pipetting_positions)
# 更新 backend 中的 plate_positions
backend.plate_positions = plate_positions
if plate_positions:
api.update_clamp_jaw_position(matrix_id, plate_positions)
print(f"Auto-matched materials and created matrix: {matrix_id}")
else:
raise PRCXIError(f"Failed to create auto-matched matrix: {res.get('Message', 'Unknown error')}")
def plr_pos_to_prcxi(self, resource: Resource, offset: Coordinate = Coordinate(0, 0, 0)):
resource_pos = resource.get_absolute_location(x="c",y="c",z="t")
x = resource_pos.x
y = resource_pos.y
@@ -869,9 +1023,9 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
prcxi_y = (self.deck_y - y)*(1+self.y_increase) + self.y_offset
prcxi_z = self.deck_z - z
prcxi_x = min(max(0, prcxi_x),self.deck_x)
prcxi_y = min(max(0, prcxi_y),self.deck_y)
prcxi_z = min(max(0, prcxi_z),self.deck_z)
prcxi_x = min(max(0, prcxi_x+offset.x),self.deck_x)
prcxi_y = min(max(0, prcxi_y+offset.y),self.deck_y)
prcxi_z = min(max(0, prcxi_z+offset.z),self.deck_z)
return Coordinate(prcxi_x, prcxi_y, prcxi_z)
@@ -1008,6 +1162,9 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
delays: Optional[List[int]] = None,
none_keys: List[str] = [],
) -> TransferLiquidReturn:
if not self._first_transfer_done:
self._match_and_create_matrix()
self._first_transfer_done = True
if self.step_mode:
await self.create_protocol(f"transfer_liquid{time.time()}")
res = await super().transfer_liquid(
@@ -1069,10 +1226,6 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
offsets: Optional[List[Coordinate]] = None,
**backend_kwargs,
):
if self.step_mode:
await self.create_protocol(f"单点动作{time.time()}")
await super().pick_up_tips(tip_spots, use_channels, offsets, **backend_kwargs)
await self.run_protocol()
return await super().pick_up_tips(tip_spots, use_channels, offsets, **backend_kwargs)
async def aspirate(
@@ -1432,7 +1585,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
await asyncio.sleep(1)
print("PRCXI9300 reset successfully.")
self.api_client.update_clamp_jaw_position(self.matrix_id, self.plate_positions)
# self.api_client.update_clamp_jaw_position(self.matrix_id, self.plate_positions)
except ConnectionRefusedError as e:
raise RuntimeError(
@@ -1460,8 +1613,8 @@ class PRCXI9300Backend(LiquidHandlerBackend):
plate_indexes = []
for op in ops:
plate = op.resource.parent
deck = plate.parent.parent
plate_index = deck.children.index(plate.parent)
deck = plate.parent
plate_index = deck.children.index(plate)
# print(f"Plate index: {plate_index}, Plate name: {plate.name}")
# print(f"Number of children in deck: {len(deck.children)}")
@@ -1535,8 +1688,8 @@ class PRCXI9300Backend(LiquidHandlerBackend):
plate_indexes = []
for op in ops:
plate = op.resource.parent
deck = plate.parent.parent
plate_index = deck.children.index(plate.parent)
deck = plate.parent
plate_index = deck.children.index(plate)
plate_indexes.append(plate_index)
if len(set(plate_indexes)) != 1:
raise ValueError(
@@ -1595,7 +1748,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
for op in targets:
deck = op.parent.parent.parent
plate = op.parent
plate_index = deck.children.index(plate.parent)
plate_index = deck.children.index(plate)
plate_indexes.append(plate_index)
if len(set(plate_indexes)) != 1:
@@ -1646,8 +1799,8 @@ class PRCXI9300Backend(LiquidHandlerBackend):
plate_indexes = []
for op in ops:
plate = op.resource.parent
deck = plate.parent.parent
plate_index = deck.children.index(plate.parent)
deck = plate.parent
plate_index = deck.children.index(plate)
plate_indexes.append(plate_index)
if len(set(plate_indexes)) != 1:
@@ -1703,8 +1856,8 @@ class PRCXI9300Backend(LiquidHandlerBackend):
plate_indexes = []
for op in ops:
plate = op.resource.parent
deck = plate.parent.parent
plate_index = deck.children.index(plate.parent)
deck = plate.parent
plate_index = deck.children.index(plate)
plate_indexes.append(plate_index)
if len(set(plate_indexes)) != 1:
@@ -1939,11 +2092,19 @@ class PRCXI9300Api:
def update_clamp_jaw_position(self, target_matrix_id: str, plate_positions: List[Dict[str, Any]]):
position_params = {
"MatrixId": target_matrix_id,
"MatrixId": target_matrix_id,
"WorkTablets": plate_positions
}
return self.call("IMatrix", "UpdateClampJawPosition", [position_params])
def update_pipetting_position(self, target_matrix_id: str, pipetting_positions: List[Dict[str, Any]]):
"""UpdatePipettingPosition - 更新移液位置"""
position_params = {
"MatrixId": target_matrix_id,
"WorkTablets": pipetting_positions
}
return self.call("IMatrix", "UpdatePipettingPosition", [position_params])
def add_WorkTablet_Matrix(self, matrix: MatrixInfo):
return self.call("IMatrix", "AddWorkTabletMatrix2" if self.is_9320 else "AddWorkTabletMatrix", [matrix])