Merge branch 'prcix9320' into sjs_middle_school

This commit is contained in:
q434343
2026-03-31 18:48:20 +08:00
parent a48985720c
commit 5c9c8a4ee9
10 changed files with 1751 additions and 324 deletions

View File

@@ -57,7 +57,8 @@ from unilabos.devices.liquid_handling.liquid_handler_abstract import (
)
from unilabos.registry.placeholder_type import ResourceSlot
from unilabos.resources.itemized_carrier import ItemizedCarrier
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode
from unilabos.resources.resource_tracker import ResourceTreeSet
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class PRCXIError(RuntimeError):
@@ -109,11 +110,19 @@ class PRCXI9300Deck(Deck):
该类定义了 PRCXI 9300 的工作台布局和槽位信息。
"""
# T1-T16 默认位置 (4列×4行)
_DEFAULT_SITE_POSITIONS = [((i%4)*137.5+5, (3-int(i/4))*96+13, 0) for i in range(0, 16)]
_9320_SITE_POSITIONS = [((i%4)*137.5+5, (3-int(i/4))*96+13, 0) for i in range(0, 16)]
# 9300: 3列×2行 = 6 slots间距与9320相同X: 138mm, Y: 96mm
_9300_SITE_POSITIONS = [
(0, 96, 0), (138, 96, 0), (276, 96, 0), # T1-T3 (第1行, 上)
(0, 0, 0), (138, 0, 0), (276, 0, 0), # T4-T6 (第2行, 下)
]
# 向后兼容别名
_DEFAULT_SITE_POSITIONS = _9320_SITE_POSITIONS
_DEFAULT_SITE_SIZE = {"width": 128.0, "height": 86, "depth": 0}
_DEFAULT_CONTENT_TYPE = ["plate", "tip_rack", "plates", "tip_racks", "tube_rack", "adaptor"]
_DEFAULT_CONTENT_TYPE = ["plate", "tip_rack", "plates", "tip_racks", "tube_rack", "adaptor", "plateadapter", "module", "trash"]
def __init__(self, name: str, size_x: float, size_y: float, size_z: float,
sites: Optional[List[Dict[str, Any]]] = None, **kwargs):
@@ -190,6 +199,7 @@ class PRCXI9300Deck(Deck):
def serialize(self) -> dict:
data = super().serialize()
data["model"] = self.model
sites_out = []
for i, site in enumerate(self.sites):
occupied = self._get_site_resource(i)
@@ -276,30 +286,9 @@ class PRCXI9300Plate(Plate):
elif value is None:
ordering_param = ordering
else:
# ordering 的值是对象(可能是 Well 对象),检查是否有有效的 location
# 如果是反序列化过程Well 对象可能没有正确的 location需要让 Plate 重新创建
sample_value = next(iter(ordering.values()), None)
if sample_value is not None and hasattr(sample_value, 'location'):
# 如果是 Well 对象但 location 为 None说明是反序列化过程
# 让 Plate 自己创建 Well 对象
if sample_value.location is None:
items = None
ordering_param = collections.OrderedDict((k, None) for k in ordering.keys())
else:
# Well 对象有有效的 location可以直接使用
items = ordering
ordering_param = None
elif sample_value is None:
# ordering 的值都是 None让 Plate 自己创建 Well 对象
items = None
ordering_param = collections.OrderedDict((k, None) for k in ordering.keys())
else:
# 其他情况,直接使用
items = ordering
ordering_param = None
else:
items = None
ordering_param = collections.OrderedDict() # 提供空的 ordering
# ordering 的值已经是对象,可以直接使用
items = ordering
ordering_param = None
# 根据情况传递不同的参数
if items is not None:
@@ -381,16 +370,9 @@ class PRCXI9300TipRack(TipRack):
items = None
ordering_param = collections.OrderedDict((k, None) for k in ordering.keys())
else:
# ordering 的值已经是对象,需要过滤掉 None 值
# 只保留有效的对象,用于 ordered_items 参数
valid_items = {k: v for k, v in ordering.items() if v is not None}
if valid_items:
items = valid_items
ordering_param = None
else:
# 如果没有有效对象,使用 ordering 参数
items = None
ordering_param = collections.OrderedDict((k, None) for k in ordering.keys())
# ordering 的值已经是对象,可以直接使用
items = ordering
ordering_param = None
else:
items = None
ordering_param = None
@@ -449,11 +431,17 @@ class PRCXI9300Trash(Trash):
该类定义了 PRCXI 9300 的工作台布局和槽位信息。
"""
def __init__(self, name: str, size_x: float, size_y: float, size_z: float,
category: str = "plate",
material_info: Optional[Dict[str, Any]] = None,
**kwargs):
def __init__(
self,
name: str,
size_x: float,
size_y: float,
size_z: float,
category: str = "trash",
material_info: Optional[Dict[str, Any]] = None,
**kwargs,
):
if name != "trash":
print(f"Warning: PRCXI9300Trash usually expects name='trash' for backend logic, but got '{name}'.")
super().__init__(name, size_x, size_y, size_z, **kwargs)
@@ -521,25 +509,20 @@ class PRCXI9300TubeRack(TubeRack):
items_to_pass = ordered_items
ordering_param = None
elif ordering is not None:
# 检查 ordering 中的值是否是字符串(从 JSON 反序列化时的情况)
if ordering and isinstance(next(iter(ordering.values()), None), str):
# ordering 的值是字符串,这种情况下我们让 TubeRack 使用默认行为
# 不在初始化时创建 items而是在 deserialize 后处理
# 检查 ordering 中的值类型来决定如何处理:
# - 字符串值(从 JSON 反序列化): 只用键创建 ordering_param
# - None 值(从第二次往返序列化): 同样只用键创建 ordering_param
# - 对象值(已经是实际的 Resource 对象): 直接作为 ordered_items 使用
first_val = next(iter(ordering.values()), None) if ordering else None
if not ordering or first_val is None or isinstance(first_val, str):
# ordering 的值是字符串或 None只使用键位置信息创建新的 OrderedDict
# 传递 ordering 参数而不是 ordered_items让 TubeRack 自己创建 Tube 对象
items_to_pass = None
ordering_param = collections.OrderedDict((k, None) for k in ordering.keys()) # 提供空的 ordering 来满足要求
# 保存 ordering 信息以便后续处理
self._temp_ordering = ordering
ordering_param = collections.OrderedDict((k, None) for k in ordering.keys())
else:
# ordering 的值已经是对象,需要过滤掉 None 值
# 只保留有效的对象,用于 ordered_items 参数
valid_items = {k: v for k, v in ordering.items() if v is not None}
if valid_items:
items_to_pass = valid_items
ordering_param = None
else:
# 如果没有有效对象,创建空的 ordered_items
items_to_pass = {}
ordering_param = None
# ordering 的值已经是对象,可以直接使用
items_to_pass = ordering
ordering_param = None
elif items is not None:
# 兼容旧的 items 参数
items_to_pass = items
@@ -561,29 +544,6 @@ class PRCXI9300TubeRack(TubeRack):
if material_info:
self._unilabos_state["Material"] = material_info
# 如果有临时 ordering 信息,在初始化完成后处理
if hasattr(self, '_temp_ordering') and self._temp_ordering:
self._process_temp_ordering()
def _process_temp_ordering(self):
"""处理临时的 ordering 信息,创建相应的 Tube 对象"""
from pylabrobot.resources import Tube, Coordinate
for location, item_type in self._temp_ordering.items():
if item_type == 'Tube' or item_type == 'tube':
# 为每个位置创建 Tube 对象
tube = Tube(name=f"{self.name}_{location}", size_x=10, size_y=10, size_z=50, max_volume=2000.0)
# 使用 assign_child_resource 添加到 rack 中
self.assign_child_resource(tube, location=Coordinate(0, 0, 0))
# 清理临时数据
del self._temp_ordering
def load_state(self, state: Dict[str, Any]) -> None:
"""从给定的状态加载工作台信息。"""
# super().load_state(state)
self._unilabos_state = state
def serialize_state(self) -> Dict[str, Dict[str, Any]]:
try:
data = super().serialize_state()
@@ -610,62 +570,66 @@ class PRCXI9300TubeRack(TubeRack):
data.update(safe_state)
return data
class PRCXI9300PlateAdapterSite(ItemizedCarrier):
class PRCXI9300ModuleSite(ItemizedCarrier):
"""
PRCXI 功能模块的基础站点类(加热/冷却/震荡/磁吸等)。
- 继承 ItemizedCarrier可被拖放到 Deck 槽位上
- 顶面有一个 ResourceHolder 站点,可吸附板类资源(叠放)
- content_type 包含 "plateadapter" 以支持适配器叠放
- 支持 material_info 注入
"""
def __init__(self, name: str, size_x: float, size_y: float, size_z: float,
material_info: Optional[Dict[str, Any]] = None, **kwargs):
# 处理 sites 参数的不同格式
sites = create_homogeneous_resources(
klass=ResourceHolder,
locations=[Coordinate(0, 0, 0)],
resource_size_x=size_x,
resource_size_y=size_y,
resource_size_z=size_z,
name_prefix=name,
klass=ResourceHolder,
locations=[Coordinate(0, 0, 0)],
resource_size_x=size_x,
resource_size_y=size_y,
resource_size_z=size_z,
name_prefix=name,
)[0]
# 确保不传递重复的参数
kwargs.pop('layout', None)
sites_in = kwargs.pop('sites', None)
# 创建默认的sites字典
sites_dict = {name: sites}
# 优先从 sites_in 读取 'content_type',否则使用默认值
content_type = [
"plate",
"tip_rack",
"plates",
"tip_racks",
"tube_rack"
]
# 如果提供了sites参数则用sites_in中的值替换sites_dict中对应的元素
"plate",
"tip_rack",
"plates",
"tip_racks",
"tube_rack",
"plateadapter",
]
if sites_in is not None and isinstance(sites_in, dict):
for site_key, site_value in sites_in.items():
if site_key in sites_dict:
sites_dict[site_key] = site_value
super().__init__(name, size_x, size_y, size_z,
sites=sites_dict,
num_items_x=kwargs.pop('num_items_x', 1),
num_items_y=kwargs.pop('num_items_y', 1),
num_items_z=kwargs.pop('num_items_z', 1),
content_type=content_type,
**kwargs)
super().__init__(
name, size_x, size_y, size_z,
sites=sites_dict,
num_items_x=kwargs.pop('num_items_x', 1),
num_items_y=kwargs.pop('num_items_y', 1),
num_items_z=kwargs.pop('num_items_z', 1),
content_type=content_type,
**kwargs,
)
self._unilabos_state = {}
if material_info:
self._unilabos_state["Material"] = material_info
def assign_child_resource(self, resource, location=Coordinate(0, 0, 0), reassign=True, spot=None):
"""重写 assign_child_resource 方法,对于适配器位置,不使用索引分配"""
# 直接调用 Resource 的 assign_child_resource避免 ItemizedCarrier 的索引逻辑
from pylabrobot.resources.resource import Resource
Resource.assign_child_resource(self, resource, location=location, reassign=reassign)
def unassign_child_resource(self, resource):
"""重写 unassign_child_resource 方法,对于适配器位置,不使用 sites 列表"""
# 直接调用 Resource 的 unassign_child_resource避免 ItemizedCarrier 的 sites 逻辑
from pylabrobot.resources.resource import Resource
Resource.unassign_child_resource(self, resource)
@@ -675,13 +639,10 @@ class PRCXI9300PlateAdapterSite(ItemizedCarrier):
except AttributeError:
data = {}
# 包含 sites 配置信息,但避免序列化 ResourceHolder 对象
if hasattr(self, 'sites') and self.sites:
# 只保存 sites 的基本信息,不保存 ResourceHolder 对象本身
sites_info = []
for site in self.sites:
if hasattr(site, '__class__') and 'pylabrobot' in str(site.__class__.__module__):
# 对于 pylabrobot 对象,只保存基本信息
sites_info.append({
"__pylabrobot_object__": True,
"class": site.__class__.__name__,
@@ -692,13 +653,23 @@ class PRCXI9300PlateAdapterSite(ItemizedCarrier):
sites_info.append(site)
data['sites'] = sites_info
if hasattr(self, "_unilabos_state") and self._unilabos_state:
safe_state: Dict[str, Any] = {}
for k, v in self._unilabos_state.items():
if k == "Material" and isinstance(v, dict):
safe_material: Dict[str, Any] = {}
for mk, mv in v.items():
if isinstance(mv, (str, int, float, bool, list, dict, type(None))):
safe_material[mk] = mv
safe_state[k] = safe_material
elif isinstance(v, (str, int, float, bool, list, dict, type(None))):
safe_state[k] = v
data.update(safe_state)
return data
def load_state(self, state: Dict[str, Any]) -> None:
"""加载状态,包括 sites 配置信息"""
super().load_state(state)
# 从状态中恢复 sites 配置信息
if 'sites' in state:
self.sites = [state['sites']]
@@ -793,7 +764,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
def __init__(
self,
deck: Deck,
deck: PRCXI9300Deck,
host: str,
port: int,
timeout: float,
@@ -828,12 +799,25 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
self.xy_coupling = xy_coupling
self.left_2_claw = Coordinate(-130.2, 34, -134)
self.right_2_left = Coordinate(22,-1, 8)
tablets_info = {}
plate_positions = []
tablets_info = []
if is_9320 is None:
is_9320 = getattr(deck, 'model', '9300') == '9320'
if is_9320:
print("当前设备是9320")
else:
for site_id in range(len(deck.sites)):
child = deck._get_site_resource(site_id)
# 如果放其他类型的物料,是不可以的
if hasattr(child, "_unilabos_state") and "Material" in child._unilabos_state:
number = site_id + 1
tablets_info.append(
WorkTablets(
Number=number, Code=f"T{number}", Material=child._unilabos_state["Material"]
)
)
# 始终初始化 step_mode 属性
self.step_mode = False
if step_mode:
@@ -841,13 +825,9 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
self.step_mode = step_mode
else:
print("9300设备不支持 单点动作模式")
self._unilabos_backend = PRCXI9300Backend(
tablets_info, plate_positions, host, port, timeout, channel_num, axis, setup, debug, matrix_id, is_9320,
x_increase, y_increase, x_offset, y_offset,
deck_z, deck_x=self.deck_x, deck_y=self.deck_y, xy_coupling=xy_coupling
tablets_info, host, port, timeout, channel_num, axis, setup, debug, matrix_id, is_9320
)
super().__init__(backend=self._unilabos_backend, deck=deck, simulator=simulator, channel_num=channel_num)
self._first_transfer_done = False
@@ -1245,32 +1225,18 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
spread: Literal["wide", "tight", "custom"] = "wide",
**backend_kwargs,
):
try:
return await super().aspirate(
resources,
vols,
use_channels,
flow_rates,
offsets,
liquid_height,
blow_out_air_volume,
spread,
**backend_kwargs,
)
except ValueError as e:
if "Resource is too small to space channels" in str(e) and spread != "custom":
return await super().aspirate(
resources,
vols,
use_channels,
flow_rates,
offsets,
liquid_height,
blow_out_air_volume,
spread="custom",
**backend_kwargs,
)
raise
return await super().aspirate(
resources,
vols,
use_channels,
flow_rates,
offsets,
liquid_height,
blow_out_air_volume,
spread,
**backend_kwargs,
)
async def drop_tips(
self,
@@ -1294,33 +1260,17 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
spread: Literal["wide", "tight", "custom"] = "wide",
**backend_kwargs,
):
try:
return await super().dispense(
resources,
vols,
use_channels,
flow_rates,
offsets,
liquid_height,
blow_out_air_volume,
spread,
**backend_kwargs,
)
except ValueError as e:
if "Resource is too small to space channels" in str(e) and spread != "custom":
# 目标资源过小无法分布多通道时,退化为 custom所有通道对准中心
return await super().dispense(
resources,
vols,
use_channels,
flow_rates,
offsets,
liquid_height,
blow_out_air_volume,
"custom",
**backend_kwargs,
)
raise
return await super().dispense(
resources,
vols,
use_channels,
flow_rates,
offsets,
liquid_height,
blow_out_air_volume,
spread,
**backend_kwargs,
)
async def discard_tips(
self,
@@ -1340,11 +1290,6 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
async def shaker_action(self, time: int, module_no: int, amplitude: int, is_wait: bool):
return await self._unilabos_backend.shaker_action(time, module_no, amplitude, is_wait)
async def magnetic_action(self, time: int, module_no: int, height: int, is_wait: bool):
return await self._unilabos_backend.magnetic_action(time, module_no, height, is_wait)
async def shaking_incubation_action(self, time: int, module_no: int, amplitude: int, is_wait: bool, temperature: int):
return await self._unilabos_backend.shaking_incubation_action(time, module_no, amplitude, is_wait, temperature)
async def heater_action(self, temperature: float, time: int):
return await self._unilabos_backend.heater_action(temperature, time)
@@ -1361,7 +1306,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
**backend_kwargs,
):
res = await super().move_plate(
return await super().move_plate(
plate,
to,
intermediate_locations,
@@ -1373,12 +1318,6 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
target_plate_number=to,
**backend_kwargs,
)
plate.unassign()
to.assign_child_resource(plate, location=Coordinate(0, 0, 0))
ROS2DeviceNode.run_async_func(self._ros_node.update_resource, True, **{
"resources": [self.deck]
})
return res
class PRCXI9300Backend(LiquidHandlerBackend):
@@ -1403,7 +1342,6 @@ class PRCXI9300Backend(LiquidHandlerBackend):
def __init__(
self,
tablets_info: list[WorkTablets],
plate_positions: dict[int, Coordinate],
host: str = "127.0.0.1",
port: int = 9999,
timeout: float = 10.0,
@@ -1412,19 +1350,10 @@ class PRCXI9300Backend(LiquidHandlerBackend):
setup=True,
debug=False,
matrix_id="",
is_9320=False,
x_increase = 0,
y_increase = 0,
x_offset = 0,
y_offset = 0,
deck_z = 300,
deck_x = 0,
deck_y = 0,
xy_coupling = 0.0,
is_9320=False,
) -> None:
super().__init__()
self.tablets_info = tablets_info
self.plate_positions = plate_positions
self.matrix_id = matrix_id
self.api_client = PRCXI9300Api(host, port, timeout, axis, debug, is_9320)
self.host, self.port, self.timeout = host, port, timeout
@@ -1432,15 +1361,6 @@ class PRCXI9300Backend(LiquidHandlerBackend):
self._execute_setup = setup
self.debug = debug
self.axis = "Left"
self.x_increase = x_increase
self.y_increase = y_increase
self.xy_coupling = xy_coupling
self.x_offset = x_offset
self.y_offset = y_offset
self.deck_x = deck_x
self.deck_y = deck_y
self.deck_z = deck_z
self.tip_length = 0
@staticmethod
def _deck_plate_slot_no(plate, deck) -> int:
@@ -1470,27 +1390,6 @@ class PRCXI9300Backend(LiquidHandlerBackend):
self.steps_todo_list.append(step)
return step
async def shaking_incubation_action(self, time: int, module_no: int, amplitude: int, is_wait: bool, temperature: int):
step = self.api_client.shaking_incubation_action(
time=time,
module_no=module_no,
amplitude=amplitude,
is_wait=is_wait,
temperature=temperature,
)
self.steps_todo_list.append(step)
return step
async def magnetic_action(self, time: int, module_no: int, height: int, is_wait: bool):
step = self.api_client.magnetic_action(
time=time,
module_no=module_no,
height=height,
is_wait=is_wait,
)
self.steps_todo_list.append(step)
return step
async def pick_up_resource(self, pickup: ResourcePickup, **backend_kwargs):
resource = pickup.resource
@@ -1529,8 +1428,6 @@ class PRCXI9300Backend(LiquidHandlerBackend):
self._ros_node = ros_node
def create_protocol(self, protocol_name):
if protocol_name == "":
protocol_name = f"protocol_{time.time()}"
self.protocol_name = protocol_name
self.steps_todo_list = []
@@ -2047,10 +1944,10 @@ class PRCXI9300Api:
start = False
while not success:
status = self.step_state_list()
if status is None:
break
if len(status) == 1:
start = True
if status is None:
break
if len(status) == 0:
break
if status[-1]["State"] == 2 and start:
@@ -2386,26 +2283,6 @@ class PRCXI9300Api:
"AssistFun4": is_wait,
}
def shaking_incubation_action(self, time: int, module_no: int, amplitude: int, is_wait: bool, temperature: int):
return {
"StepAxis": "Left",
"Function": "Shaking_Incubation",
"AssistFun1": time,
"AssistFun2": module_no,
"AssistFun3": amplitude,
"AssistFun4": is_wait,
"AssistFun5": temperature,
}
def magnetic_action(self, time: int, module_no: int, height: int, is_wait: bool):
return {
"StepAxis": "Left",
"Function": "Magnetic",
"AssistFun1": time,
"AssistFun2": module_no,
"AssistFun3": height,
"AssistFun4": is_wait,
}
class DefaultLayout:
@@ -2420,8 +2297,20 @@ class DefaultLayout:
self.rows = 2
self.columns = 3
self.layout = [1, 2, 3, 4, 5, 6]
self.trash_slot = 3
self.waste_liquid_slot = 6
self.trash_slot = 6
self.default_layout = {
"MatrixId": f"{time.time()}",
"MatrixName": f"{time.time()}",
"MatrixCount": 6,
"WorkTablets": [
{"Number": 1, "Code": "T1", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 2, "Code": "T2", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 3, "Code": "T3", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 4, "Code": "T4", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 5, "Code": "T5", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 6, "Code": "T6", "Material": {"uuid": "730067cf07ae43849ddf4034299030e9", "materialEnum": 0}}, # trash
],
}
elif product_name == "PRCXI9320":
self.rows = 4
@@ -2437,94 +2326,96 @@ class DefaultLayout:
{
"Number": 1,
"Code": "T1",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f"},
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 2,
"Code": "T2",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f"},
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 3,
"Code": "T3",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f"},
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 4,
"Code": "T4",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f"},
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 5,
"Code": "T5",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f"},
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 6,
"Code": "T6",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f"},
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 7,
"Code": "T7",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f"},
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 8,
"Code": "T8",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f"},
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 9,
"Code": "T9",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f"},
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 10,
"Code": "T10",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f"},
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 11,
"Code": "T11",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f"},
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 12,
"Code": "T12",
"Material": {"uuid": "730067cf07ae43849ddf4034299030e9"},
"Material": {"uuid": "730067cf07ae43849ddf4034299030e9", "materialEnum": 0},
}, # 这个设置成废液槽,用储液槽表示
{
"Number": 13,
"Code": "T13",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f"},
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 14,
"Code": "T14",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f"},
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 15,
"Code": "T15",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f"},
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 16,
"Code": "T16",
"Material": {"uuid": "730067cf07ae43849ddf4034299030e9"},
"Material": {"uuid": "730067cf07ae43849ddf4034299030e9", "materialEnum": 0},
}, # 这个设置成垃圾桶,用储液槽表示
],
}
def get_layout(self) -> Dict[str, Any]:
return {
result = {
"rows": self.rows,
"columns": self.columns,
"layout": self.layout,
"trash_slot": self.trash_slot,
"waste_liquid_slot": self.waste_liquid_slot,
}
if hasattr(self, 'waste_liquid_slot'):
result["waste_liquid_slot"] = self.waste_liquid_slot
return result
def get_trash_slot(self) -> int:
return self.trash_slot
@@ -2542,15 +2433,18 @@ class DefaultLayout:
if material_name not in self.labresource:
raise ValueError(f"Material {reagent_name} not found in lab resources.")
# 预留位置12和16不
reserved_positions = {12, 16}
available_positions = [i for i in range(1, 17) if i not in reserved_positions]
# 预留位置动态计算
reserved_positions = {self.trash_slot}
if hasattr(self, 'waste_liquid_slot'):
reserved_positions.add(self.waste_liquid_slot)
total_slots = self.rows * self.columns
available_positions = [i for i in range(1, total_slots + 1) if i not in reserved_positions]
# 计算总需求
total_needed = sum(count for _, _, count in needs)
if total_needed > len(available_positions):
raise ValueError(
f"需要 {total_needed} 个位置,但只有 {len(available_positions)} 个可用位置(排除位置12和16"
f"需要 {total_needed} 个位置,但只有 {len(available_positions)} 个可用位置(排除预留位置 {reserved_positions}"
)
# 依次分配位置