mirror of
https://github.com/deepmodeling/Uni-Lab-OS
synced 2026-03-24 11:49:14 +00:00
fix container volume
update materials 更新prcxi deck & 新增 unilabos_resource_slot new workflow & prcxi slot removal fix size change
This commit is contained in:
@@ -55,6 +55,7 @@ from unilabos.devices.liquid_handling.liquid_handler_abstract import (
|
||||
TransferLiquidReturn,
|
||||
)
|
||||
from unilabos.registry.placeholder_type import ResourceSlot
|
||||
from unilabos.resources.resource_tracker import ResourceTreeSet
|
||||
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
|
||||
|
||||
|
||||
@@ -90,20 +91,103 @@ class PRCXI9300Deck(Deck):
|
||||
该类定义了 PRCXI 9300 的工作台布局和槽位信息。
|
||||
"""
|
||||
|
||||
def __init__(self, name: str, size_x: float, size_y: float, size_z: float, **kwargs):
|
||||
# T1-T16 默认位置 (4列×4行)
|
||||
_DEFAULT_SITE_POSITIONS = [
|
||||
(0, 0, 0), (138, 0, 0), (276, 0, 0), (414, 0, 0), # T1-T4
|
||||
(0, 96, 0), (138, 96, 0), (276, 96, 0), (414, 96, 0), # T5-T8
|
||||
(0, 192, 0), (138, 192, 0), (276, 192, 0), (414, 192, 0), # T9-T12
|
||||
(0, 288, 0), (138, 288, 0), (276, 288, 0), (414, 288, 0), # T13-T16
|
||||
]
|
||||
_DEFAULT_SITE_SIZE = {"width": 128.0, "height": 86, "depth": 0}
|
||||
_DEFAULT_CONTENT_TYPE = ["plate", "tip_rack", "plates", "tip_racks", "tube_rack", "adaptor"]
|
||||
|
||||
def __init__(self, name: str, size_x: float, size_y: float, size_z: float,
|
||||
sites: Optional[List[Dict[str, Any]]] = None, **kwargs):
|
||||
super().__init__(size_x, size_y, size_z, name)
|
||||
self.slots = [None] * 16 # PRCXI 9300/9320 最大有 16 个槽位
|
||||
self.slot_locations = [Coordinate(0, 0, 0)] * 16
|
||||
if sites is not None:
|
||||
self.sites: List[Dict[str, Any]] = [dict(s) for s in sites]
|
||||
else:
|
||||
self.sites = []
|
||||
for i, (x, y, z) in enumerate(self._DEFAULT_SITE_POSITIONS):
|
||||
self.sites.append({
|
||||
"label": f"T{i + 1}",
|
||||
"visible": True,
|
||||
"position": {"x": x, "y": y, "z": z},
|
||||
"size": dict(self._DEFAULT_SITE_SIZE),
|
||||
"content_type": list(self._DEFAULT_CONTENT_TYPE),
|
||||
})
|
||||
# _ordering: label -> None, 用于外部通过 list(keys()).index(site) 将 Tn 转换为 spot index
|
||||
self._ordering = collections.OrderedDict(
|
||||
(site["label"], None) for site in self.sites
|
||||
)
|
||||
|
||||
def _get_site_location(self, idx: int) -> Coordinate:
|
||||
pos = self.sites[idx]["position"]
|
||||
return Coordinate(pos["x"], pos["y"], pos["z"])
|
||||
|
||||
def _get_site_resource(self, idx: int) -> Optional[Resource]:
|
||||
site_loc = self._get_site_location(idx)
|
||||
for child in self.children:
|
||||
if child.location == site_loc:
|
||||
return child
|
||||
return None
|
||||
|
||||
def assign_child_resource(
|
||||
self,
|
||||
resource: Resource,
|
||||
location: Optional[Coordinate] = None,
|
||||
reassign: bool = True,
|
||||
spot: Optional[int] = None,
|
||||
):
|
||||
idx = spot
|
||||
if spot is not None:
|
||||
idx = spot
|
||||
else:
|
||||
for i, site in enumerate(self.sites):
|
||||
site_loc = self._get_site_location(i)
|
||||
if site.get("label") == resource.name:
|
||||
idx = i
|
||||
break
|
||||
if location is not None and site_loc == location:
|
||||
idx = i
|
||||
break
|
||||
|
||||
if idx is None:
|
||||
for i in range(len(self.sites)):
|
||||
if self._get_site_resource(i) is None:
|
||||
idx = i
|
||||
break
|
||||
|
||||
if idx is None:
|
||||
raise ValueError(f"No available site on deck '{self.name}' for resource '{resource.name}'")
|
||||
|
||||
if not reassign and self._get_site_resource(idx) is not None:
|
||||
raise ValueError(f"Site {idx} ('{self.sites[idx]['label']}') is already occupied")
|
||||
|
||||
loc = self._get_site_location(idx)
|
||||
super().assign_child_resource(resource, location=loc, reassign=reassign)
|
||||
|
||||
def assign_child_at_slot(self, resource: Resource, slot: int, reassign: bool = False) -> None:
|
||||
if self.slots[slot - 1] is not None and not reassign:
|
||||
raise ValueError(f"Spot {slot} is already occupied")
|
||||
self.assign_child_resource(resource, spot=slot - 1, reassign=reassign)
|
||||
|
||||
self.slots[slot - 1] = resource
|
||||
super().assign_child_resource(resource, location=self.slot_locations[slot - 1])
|
||||
def serialize(self) -> dict:
|
||||
data = super().serialize()
|
||||
sites_out = []
|
||||
for i, site in enumerate(self.sites):
|
||||
occupied = self._get_site_resource(i)
|
||||
sites_out.append({
|
||||
"label": site["label"],
|
||||
"visible": site.get("visible", True),
|
||||
"occupied_by": occupied.name if occupied is not None else None,
|
||||
"position": site["position"],
|
||||
"size": site["size"],
|
||||
"content_type": site["content_type"],
|
||||
})
|
||||
data["sites"] = sites_out
|
||||
return data
|
||||
|
||||
|
||||
class PRCXI9300Container(Plate):
|
||||
class PRCXI9300Container(Container):
|
||||
"""PRCXI 9300 的专用 Container 类,继承自 Plate,用于槽位定位和未知模块。
|
||||
|
||||
该类定义了 PRCXI 9300 的工作台布局和槽位信息。
|
||||
@@ -116,11 +200,10 @@ class PRCXI9300Container(Plate):
|
||||
size_y: float,
|
||||
size_z: float,
|
||||
category: str,
|
||||
ordering: collections.OrderedDict,
|
||||
model: Optional[str] = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(name, size_x, size_y, size_z, category=category, ordering=ordering, model=model)
|
||||
super().__init__(name, size_x, size_y, size_z, category=category, model=model)
|
||||
self._unilabos_state = {}
|
||||
|
||||
def load_state(self, state: Dict[str, Any]) -> None:
|
||||
@@ -567,14 +650,14 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
|
||||
tablets_info = []
|
||||
count = 0
|
||||
for child in deck.children:
|
||||
if child.children:
|
||||
if "Material" in child.children[0]._unilabos_state:
|
||||
number = int(child.name.replace("T", ""))
|
||||
tablets_info.append(
|
||||
WorkTablets(
|
||||
Number=number, Code=f"T{number}", Material=child.children[0]._unilabos_state["Material"]
|
||||
)
|
||||
# 如果放其他类型的物料,是不可以的
|
||||
if hasattr(child, "_unilabos_state") and "Material" in child._unilabos_state:
|
||||
number = int(child.name.replace("T", ""))
|
||||
tablets_info.append(
|
||||
WorkTablets(
|
||||
Number=number, Code=f"T{number}", Material=child._unilabos_state["Material"]
|
||||
)
|
||||
)
|
||||
if is_9320:
|
||||
print("当前设备是9320")
|
||||
# 始终初始化 step_mode 属性
|
||||
|
||||
Reference in New Issue
Block a user