Compare commits

..

12 Commits

Author SHA1 Message Date
q434343
68ef739f4a Merge pull request #253 from ALITTLELZ/9300
Add PRCXI 9300 (3x2) deck layout support
2026-03-31 17:28:52 +08:00
ALITTLELZ
29a484f16f Add "trash" to site content_type in Deck and experiment JSONs
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-31 17:08:20 +08:00
ALITTLELZ
14cf4ddc0d Add PRCXI 9300 (3x2) deck layout support via model parameter
PRCXI9300Deck now accepts model="9300"|"9320" to auto-select 6-slot or
16-slot layout. DefaultLayout gains default_layout for 9300 with T6 as
trash. PRCXI9300Handler auto-derives is_9320 from deck.model when not
explicitly passed. Includes 9300 slim experiment JSON and test fixes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-31 16:00:26 +08:00
q434343
d13d3f7dfe Merge pull request #250 from ALITTLELZ/adaptors
Add PRCXI functional modules and fix Deck layout
2026-03-26 12:27:06 +08:00
ALITTLELZ
71d35d31af Register PRCXI9300ModuleSite/FunctionalModule for PLR deserialization
Added PRCXI9300ModuleSite and PRCXI9300FunctionalModule to the PLR
class registration in plr_additional_res_reg.py so find_subclass can
locate them during deserialization of cached cloud data. Also added
"module" and "carrier" to replace_plr_type and TYPE_MAP in
resource_tracker.py to suppress unknown type warnings.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-25 18:32:01 +08:00
ALITTLELZ
7f4b57f589 Fix Deck slot Y-axis inversion: T1 should be top-left, not bottom-left
Upstream rewrite of PRCXI9300Deck lost the Y-axis flip logic from the
original `(3-row)*96+13` formula. T1-T4 were rendered at the bottom
instead of the top. Reversed _DEFAULT_SITE_POSITIONS Y coordinates and
updated prcxi_9320_slim.json accordingly. Also added "plateadapter" and
"module" to slim JSON content_type entries.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-25 17:16:04 +08:00
ALITTLELZ
0c667e68e6 Remove deprecated PRCXI9300PlateAdapterSite, replaced by PRCXI9300ModuleSite
PRCXI9300PlateAdapterSite was already removed by upstream/prcix9320.
Its functionality is now provided by PRCXI9300ModuleSite which serves
as the base class for functional modules (heating/cooling/shaking/magnetic).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-25 16:22:39 +08:00
ALITTLELZ
9430be51a4 Merge remote-tracking branch 'upstream/prcix9320' into adaptors
# Conflicts:
#	unilabos/devices/liquid_handling/prcxi/prcxi.py
2026-03-25 16:04:17 +08:00
ALITTLELZ
a187a57430 Add PRCXI functional modules (heating/cooling/shaking/magnetic) and registry config
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-25 15:19:48 +08:00
q434343
68029217de Merge branch 'dev' into prcix9320 2026-03-25 14:44:52 +08:00
q434343
792504e08c Update .gitignore 2026-03-25 14:39:02 +08:00
ALITTLELZ
ca985f92ab Add 'plateadapter' to device and test configurations 2026-03-02 14:35:12 +08:00
18 changed files with 2624 additions and 5228 deletions

View File

@@ -2,7 +2,7 @@
package: package:
name: unilabos-env name: unilabos-env
version: 0.10.19 version: 0.10.17
build: build:
noarch: generic noarch: generic

4
.gitignore vendored
View File

@@ -253,4 +253,8 @@ test_config.py
/.claude /.claude
/.conda
/.cursor /.cursor
/.github
/.conda/base
.conda/base/recipe.yaml

View File

@@ -1,95 +1,4 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. Please follow the rules defined in:
## Build & Development @AGENTS.md
```bash
# Install (requires mamba env with python 3.11)
mamba create -n unilab python=3.11.14
mamba activate unilab
mamba install uni-lab::unilabos-env -c robostack-staging -c conda-forge
pip install -e .
uv pip install -r unilabos/utils/requirements.txt
# Run with a device graph
unilab --graph <graph.json> --config <config.py> --backend ros
unilab --graph <graph.json> --config <config.py> --backend simple # no ROS2 needed
# Common CLI flags
unilab --app_bridges websocket fastapi # communication bridges
unilab --test_mode # simulate hardware, no real execution
unilab --check_mode # CI validation of registry imports (AST-based)
unilab --skip_env_check # skip auto-install of dependencies
unilab --visual rviz|web|disable # visualization mode
unilab --is_slave # run as slave node
unilab --restart_mode # auto-restart on config changes (supervisor/child process)
# Workflow upload subcommand
unilab workflow_upload -f <workflow.json> -n <name> --tags tag1 tag2
# Tests
pytest tests/ # all tests
pytest tests/resources/test_resourcetreeset.py # single test file
pytest tests/resources/test_resourcetreeset.py::TestClassName::test_method # single test
# CI check (matches .github/workflows/ci-check.yml)
python -m unilabos --check_mode --skip_env_check
```
## Architecture
### Startup Flow
`unilab` CLI (entry point in `setup.py`) → `unilabos/app/main.py:main()` → loads config → builds registry → reads device graph (JSON/GraphML) → starts backend thread (ROS2/simple) → starts FastAPI web server + WebSocket client.
### Core Layers
**Registry** (`unilabos/registry/`): Singleton `Registry` class discovers and catalogs all device types, resource types, and communication devices. Two registration mechanisms: YAML definitions in `registry/devices/*.yaml` and Python decorators (`@device`, `@action`, `@resource` in `registry/decorators.py`). AST scanning discovers decorated classes without importing them. Class paths resolved to Python classes via `utils/import_manager.py`.
**Resource Tracking** (`unilabos/resources/resource_tracker.py`): Pydantic-based `ResourceDict``ResourceDictInstance``ResourceTreeSet` hierarchy. `ResourceTreeSet` is the canonical in-memory representation of all devices and resources. Graph I/O in `resources/graphio.py` reads JSON/GraphML device topology files into `nx.Graph` + `ResourceTreeSet`.
**Device Drivers** (`unilabos/devices/`): 30+ hardware drivers organized by category (liquid_handling, hplc, balance, arm, etc.). Each driver class gets wrapped by `ros/device_node_wrapper.py:ros2_device_node()` into a `ROS2DeviceNode` (defined in `ros/nodes/base_device_node.py`) with publishers, subscribers, and action servers.
**ROS2 Layer** (`unilabos/ros/`): Preset node types in `ros/nodes/presets/``host_node` (main orchestrator, ~90KB), `controller_node`, `workstation`, `serial_node`, `camera`, `resource_mesh_manager`. Custom messages in `unilabos_msgs/` (80+ action types, pre-built via conda `ros-humble-unilabos-msgs`).
**Protocol Compilation** (`unilabos/compile/`): 20+ protocol compilers (add, centrifuge, dissolve, filter, heatchill, stir, pump, etc.) registered in `__init__.py:action_protocol_generators` dict. Utility parsers in `compile/utils/` (vessel, unit, logger).
**Workflow** (`unilabos/workflow/`): Converts workflow definitions from multiple formats — JSON (`convert_from_json.py`, `common.py`), Python scripts (`from_python_script.py`), XDL (`from_xdl.py`) — into executable `WorkflowGraph`. Legacy converters in `workflow/legacy/`.
**Communication** (`unilabos/device_comms/`): Hardware adapters — OPC-UA, Modbus PLC, RPC, universal driver. `app/communication.py` provides factory pattern for WebSocket connections.
**Web/API** (`unilabos/app/web/`): FastAPI server with REST API (`api.py`), Jinja2 templates (`pages.py`), HTTP client (`client.py`). Default port 8002.
### Configuration System
- **Config classes** in `unilabos/config/config.py`: `BasicConfig`, `WSConfig`, `HTTPConfig`, `ROSConfig` — class-level attributes, loaded from Python `.py` config files (see `config/example_config.py`)
- Environment variable overrides with prefix `UNILABOS_` (e.g., `UNILABOS_BASICCONFIG_PORT=9000`)
- Device topology defined in graph files (JSON node-link format or GraphML)
### Key Data Flow
1. Graph file → `graphio.read_node_link_json()``(nx.Graph, ResourceTreeSet, resource_links)`
2. `ResourceTreeSet` + `Registry``initialize_device.initialize_device_from_dict()``ROS2DeviceNode` instances
3. Device nodes communicate via ROS2 topics/actions or direct Python calls (simple backend)
4. Cloud sync via WebSocket (`app/ws_client.py`) and HTTP (`app/web/client.py`)
### Test Data
Example device graphs and experiment configs are in `unilabos/test/experiments/` (not `tests/`). Registry test fixtures in `unilabos/test/registry/`.
## Code Conventions
- Code comments and log messages in **simplified Chinese**
- Python 3.11+, type hints expected
- Pydantic models for data validation (`resource_tracker.py`)
- Singleton pattern via `@singleton` decorator (`utils/decorator.py`)
- Dynamic class loading via `utils/import_manager.py` — device classes resolved at runtime from registry YAML paths
- CLI argument dashes auto-converted to underscores for consistency
- No linter/formatter configuration enforced (no ruff, black, flake8, mypy configs)
- Documentation built with Sphinx (Chinese language, `sphinx_rtd_theme`, `myst_parser`)
## Licensing
- Framework code: GPL-3.0
- Device drivers (`unilabos/devices/`): DP Technology Proprietary License — do not redistribute

File diff suppressed because it is too large Load Diff

View File

@@ -201,42 +201,17 @@ class ResourceVisualization:
self.moveit_controllers_yaml['moveit_simple_controller_manager'][f"{name}_{controller_name}"] = moveit_dict['moveit_simple_controller_manager'][controller_name] self.moveit_controllers_yaml['moveit_simple_controller_manager'][f"{name}_{controller_name}"] = moveit_dict['moveit_simple_controller_manager'][controller_name]
@staticmethod
def _ensure_ros2_env() -> dict:
"""确保 ROS2 环境变量正确设置,返回可用于子进程的 env dict"""
import sys
env = dict(os.environ)
conda_prefix = os.path.dirname(os.path.dirname(sys.executable))
if "AMENT_PREFIX_PATH" not in env or not env["AMENT_PREFIX_PATH"].strip():
candidate = os.pathsep.join([conda_prefix, os.path.join(conda_prefix, "Library")])
env["AMENT_PREFIX_PATH"] = candidate
os.environ["AMENT_PREFIX_PATH"] = candidate
extra_bin_dirs = [
os.path.join(conda_prefix, "Library", "bin"),
os.path.join(conda_prefix, "Library", "lib"),
os.path.join(conda_prefix, "Scripts"),
conda_prefix,
]
current_path = env.get("PATH", "")
for d in extra_bin_dirs:
if d not in current_path:
current_path = d + os.pathsep + current_path
env["PATH"] = current_path
os.environ["PATH"] = current_path
return env
def create_launch_description(self) -> LaunchDescription: def create_launch_description(self) -> LaunchDescription:
""" """
创建launch描述包含robot_state_publisher和move_group节点 创建launch描述包含robot_state_publisher和move_group节点
Args:
urdf_str: URDF文本
Returns: Returns:
LaunchDescription: launch描述对象 LaunchDescription: launch描述对象
""" """
launch_env = self._ensure_ros2_env() # 检查ROS 2环境变量
if "AMENT_PREFIX_PATH" not in os.environ: if "AMENT_PREFIX_PATH" not in os.environ:
raise OSError( raise OSError(
"ROS 2环境未正确设置。需要设置 AMENT_PREFIX_PATH 环境变量。\n" "ROS 2环境未正确设置。需要设置 AMENT_PREFIX_PATH 环境变量。\n"
@@ -315,7 +290,7 @@ class ResourceVisualization:
{"robot_description": robot_description}, {"robot_description": robot_description},
ros2_controllers, ros2_controllers,
], ],
env=launch_env, env=dict(os.environ)
) )
) )
for controller in self.moveit_controllers_yaml['moveit_simple_controller_manager']['controller_names']: for controller in self.moveit_controllers_yaml['moveit_simple_controller_manager']['controller_names']:
@@ -325,7 +300,7 @@ class ResourceVisualization:
executable="spawner", executable="spawner",
arguments=[f"{controller}", "--controller-manager", f"controller_manager"], arguments=[f"{controller}", "--controller-manager", f"controller_manager"],
output="screen", output="screen",
env=launch_env, env=dict(os.environ)
) )
) )
controllers.append( controllers.append(
@@ -334,7 +309,7 @@ class ResourceVisualization:
executable="spawner", executable="spawner",
arguments=["joint_state_broadcaster", "--controller-manager", f"controller_manager"], arguments=["joint_state_broadcaster", "--controller-manager", f"controller_manager"],
output="screen", output="screen",
env=launch_env, env=dict(os.environ)
) )
) )
for i in controllers: for i in controllers:
@@ -342,6 +317,7 @@ class ResourceVisualization:
else: else:
ros2_controllers = None ros2_controllers = None
# 创建robot_state_publisher节点
robot_state_publisher = nd( robot_state_publisher = nd(
package='robot_state_publisher', package='robot_state_publisher',
executable='robot_state_publisher', executable='robot_state_publisher',
@@ -351,8 +327,9 @@ class ResourceVisualization:
'robot_description': robot_description, 'robot_description': robot_description,
'use_sim_time': False 'use_sim_time': False
}, },
# kinematics_dict
], ],
env=launch_env, env=dict(os.environ)
) )
@@ -384,7 +361,7 @@ class ResourceVisualization:
executable='move_group', executable='move_group',
output='screen', output='screen',
parameters=moveit_params, parameters=moveit_params,
env=launch_env, env=dict(os.environ)
) )
@@ -402,11 +379,13 @@ class ResourceVisualization:
arguments=['-d', f"{str(self.mesh_path)}/view_robot.rviz"], arguments=['-d', f"{str(self.mesh_path)}/view_robot.rviz"],
output='screen', output='screen',
parameters=[ parameters=[
{'robot_description_kinematics': kinematics_dict}, {'robot_description_kinematics': kinematics_dict,
},
robot_description_planning, robot_description_planning,
planning_pipelines, planning_pipelines,
], ],
env=launch_env, env=dict(os.environ)
) )
self.launch_description.add_action(rviz_node) self.launch_description.add_action(rviz_node)

File diff suppressed because it is too large Load Diff

View File

@@ -87,31 +87,19 @@ class MatrixInfo(TypedDict):
WorkTablets: list[WorkTablets] WorkTablets: list[WorkTablets]
def _get_slot_number(resource) -> Optional[int]:
"""从 resource 的 unilabos_extra["update_resource_site"](如 "T13")或位置反算槽位号。"""
extra = getattr(resource, "unilabos_extra", {}) or {}
site = extra.get("update_resource_site", "")
if site:
digits = "".join(c for c in str(site) if c.isdigit())
return int(digits) if digits else None
loc = getattr(resource, "location", None)
if loc is not None and loc.x is not None and loc.y is not None:
col = round((loc.x - 5) / 137.5)
row = round(3 - (loc.y - 13) / 96)
idx = row * 4 + col
if 0 <= idx < 16:
return idx + 1
return None
class PRCXI9300Deck(Deck): class PRCXI9300Deck(Deck):
"""PRCXI 9300 的专用 Deck 类,继承自 Deck。 """PRCXI 9300 的专用 Deck 类,继承自 Deck。
该类定义了 PRCXI 9300 的工作台布局和槽位信息。 该类定义了 PRCXI 9300 的工作台布局和槽位信息。
""" """
_9320_SITE_POSITIONS = [((i%4)*137.5+5, (3-int(i/4))*96+13, 0) for i in range(0, 16)] # 9320: 4列×4行 = 16 slotsY轴从上往下递减, T1在左上角
_9320_SITE_POSITIONS = [
(0, 288, 0), (138, 288, 0), (276, 288, 0), (414, 288, 0), # T1-T4 (第1行, 最上)
(0, 192, 0), (138, 192, 0), (276, 192, 0), (414, 192, 0), # T5-T8 (第2行)
(0, 96, 0), (138, 96, 0), (276, 96, 0), (414, 96, 0), # T9-T12 (第3行)
(0, 0, 0), (138, 0, 0), (276, 0, 0), (414, 0, 0), # T13-T16 (第4行, 最下)
]
# 9300: 3列×2行 = 6 slots间距与9320相同X: 138mm, Y: 96mm # 9300: 3列×2行 = 6 slots间距与9320相同X: 138mm, Y: 96mm
_9300_SITE_POSITIONS = [ _9300_SITE_POSITIONS = [
@@ -125,13 +113,15 @@ class PRCXI9300Deck(Deck):
_DEFAULT_CONTENT_TYPE = ["plate", "tip_rack", "plates", "tip_racks", "tube_rack", "adaptor", "plateadapter", "module", "trash"] _DEFAULT_CONTENT_TYPE = ["plate", "tip_rack", "plates", "tip_racks", "tube_rack", "adaptor", "plateadapter", "module", "trash"]
def __init__(self, name: str, size_x: float, size_y: float, size_z: float, def __init__(self, name: str, size_x: float, size_y: float, size_z: float,
sites: Optional[List[Dict[str, Any]]] = None, **kwargs): sites: Optional[List[Dict[str, Any]]] = None, model: str = "9320", **kwargs):
super().__init__( size_x, size_y, size_z, name=name) super().__init__(size_x, size_y, size_z, name)
self.model = model
if sites is not None: if sites is not None:
self.sites: List[Dict[str, Any]] = [dict(s) for s in sites] self.sites: List[Dict[str, Any]] = [dict(s) for s in sites]
else: else:
positions = self._9300_SITE_POSITIONS if model == "9300" else self._9320_SITE_POSITIONS
self.sites = [] self.sites = []
for i, (x, y, z) in enumerate(self._DEFAULT_SITE_POSITIONS): for i, (x, y, z) in enumerate(positions):
self.sites.append({ self.sites.append({
"label": f"T{i + 1}", "label": f"T{i + 1}",
"visible": True, "visible": True,
@@ -143,7 +133,6 @@ class PRCXI9300Deck(Deck):
self._ordering = collections.OrderedDict( self._ordering = collections.OrderedDict(
(site["label"], None) for site in self.sites (site["label"], None) for site in self.sites
) )
self.root = self.get_root()
def _get_site_location(self, idx: int) -> Coordinate: def _get_site_location(self, idx: int) -> Coordinate:
pos = self.sites[idx]["position"] pos = self.sites[idx]["position"]
@@ -186,10 +175,7 @@ class PRCXI9300Deck(Deck):
raise ValueError(f"No available site on deck '{self.name}' for resource '{resource.name}'") raise ValueError(f"No available site on deck '{self.name}' for resource '{resource.name}'")
if not reassign and self._get_site_resource(idx) is not None: if not reassign and self._get_site_resource(idx) is not None:
existing = self.root.get_resource(resource.name) raise ValueError(f"Site {idx} ('{self.sites[idx]['label']}') is already occupied")
if existing is not resource and existing.parent is not None:
existing.parent.unassign_child_resource(existing)
loc = self._get_site_location(idx) loc = self._get_site_location(idx)
super().assign_child_resource(resource, location=loc, reassign=reassign) super().assign_child_resource(resource, location=loc, reassign=reassign)
@@ -775,49 +761,23 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
simulator=False, simulator=False,
step_mode=False, step_mode=False,
matrix_id="", matrix_id="",
is_9320=False, is_9320=None,
start_rail=2,
rail_nums=4,
rail_interval=0,
x_increase = -0.003636,
y_increase = -0.003636,
x_offset = 9.2,
y_offset = -27.98,
deck_z = 300,
deck_y = 400,
rail_width=27.5,
xy_coupling = -0.0045,
): ):
self.deck_x = (start_rail + rail_nums*5 + (rail_nums-1)*rail_interval) * rail_width
self.deck_y = deck_y
self.deck_z = deck_z
self.x_increase = x_increase
self.y_increase = y_increase
self.x_offset = x_offset
self.y_offset = y_offset
self.xy_coupling = xy_coupling
self.left_2_claw = Coordinate(-130.2, 34, -134)
self.right_2_left = Coordinate(22,-1, 8)
plate_positions = []
tablets_info = [] tablets_info = []
for site_id in range(len(deck.sites)):
child = deck._get_site_resource(site_id)
# 如果放其他类型的物料,是不可以的
if hasattr(child, "_unilabos_state") and "Material" in child._unilabos_state:
number = site_id + 1
tablets_info.append(
WorkTablets(
Number=number, Code=f"T{number}", Material=child._unilabos_state["Material"]
)
)
if is_9320 is None: if is_9320 is None:
is_9320 = getattr(deck, 'model', '9300') == '9320' is_9320 = getattr(deck, 'model', '9300') == '9320'
if is_9320: if is_9320:
print("当前设备是9320") print("当前设备是9320")
else:
for site_id in range(len(deck.sites)):
child = deck._get_site_resource(site_id)
# 如果放其他类型的物料,是不可以的
if hasattr(child, "_unilabos_state") and "Material" in child._unilabos_state:
number = site_id + 1
tablets_info.append(
WorkTablets(
Number=number, Code=f"T{number}", Material=child._unilabos_state["Material"]
)
)
# 始终初始化 step_mode 属性 # 始终初始化 step_mode 属性
self.step_mode = False self.step_mode = False
if step_mode: if step_mode:
@@ -829,190 +789,6 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
tablets_info, host, port, timeout, channel_num, axis, setup, debug, matrix_id, is_9320 tablets_info, host, port, timeout, channel_num, axis, setup, debug, matrix_id, is_9320
) )
super().__init__(backend=self._unilabos_backend, deck=deck, simulator=simulator, channel_num=channel_num) super().__init__(backend=self._unilabos_backend, deck=deck, simulator=simulator, channel_num=channel_num)
self._first_transfer_done = False
@staticmethod
def _get_slot_number(resource) -> Optional[int]:
"""从 resource 的 unilabos_extra["update_resource_site"](如 "T13")或位置反算槽位号。"""
return _get_slot_number(resource)
def _match_and_create_matrix(self):
"""首次 transfer_liquid 时,根据 deck 上的 resource 自动匹配耗材并创建 WorkTabletMatrix。"""
backend = self._unilabos_backend
api = backend.api_client
if backend.matrix_id:
return
material_list = api.get_all_materials()
if not material_list:
return
# 按 materialEnum 分组: {enum_value: [material, ...]}
material_dict = {}
material_uuid_map = {}
for m in material_list:
enum_key = m.get("materialEnum")
material_dict.setdefault(enum_key, []).append(m)
if "uuid" in m:
material_uuid_map[m["uuid"]] = m
work_tablets = []
slot_none = [i for i in range(1, 17)]
for child in self.deck.children:
resource = child
number = self._get_slot_number(resource)
if number is None:
continue
# 如果 resource 已有 Material UUID直接使用
if hasattr(resource, "_unilabos_state") and "Material" in getattr(resource, "_unilabos_state", {}):
mat_uuid = resource._unilabos_state["Material"].get("uuid")
if mat_uuid and mat_uuid in material_uuid_map:
work_tablets.append({"Number": number, "Material": material_uuid_map[mat_uuid]})
continue
# 根据 resource 类型推断 materialEnum
# MaterialEnum: Other=0, Tips=1, DeepWellPlate=2, PCRPlate=3, ELISAPlate=4, Reservoir=5, WasteBox=6
expected_enum = None
if isinstance(resource, PRCXI9300TipRack) or isinstance(resource, TipRack):
expected_enum = 1 # Tips
elif isinstance(resource, PRCXI9300Trash) or isinstance(resource, Trash):
expected_enum = 6 # WasteBox
elif isinstance(resource, (PRCXI9300Plate, Plate)):
expected_enum = None # Plate 可能是 DeepWellPlate/PCRPlate/ELISAPlate不限定
# 根据 expected_enum 筛选候选耗材列表
if expected_enum is not None:
candidates = material_dict.get(expected_enum, [])
else:
# expected_enum 未确定时,搜索所有耗材
candidates = material_list
# 根据 children 个数和容量匹配最相似的耗材
num_children = len(resource.children)
child_max_volume = None
if resource.children:
first_child = resource.children[0]
if hasattr(first_child, "max_volume") and first_child.max_volume is not None:
child_max_volume = first_child.max_volume
best_material = None
best_score = float("inf")
for material in candidates:
hole_count = (material.get("HoleRow", 0) or 0) * (material.get("HoleColum", 0) or 0)
material_volume = material.get("Volume", 0) or 0
# 孔数差异(高权重优先匹配孔数)
hole_diff = abs(num_children - hole_count)
# 容量差异(归一化)
if child_max_volume is not None and material_volume > 0:
vol_diff = abs(child_max_volume - material_volume) / material_volume
else:
vol_diff = 0
score = hole_diff * 1000 + vol_diff
if score < best_score:
best_score = score
best_material = material
if best_material:
work_tablets.append({"Number": number, "Material": best_material})
slot_none.remove(number)
if not work_tablets:
return
matrix_id = str(uuid.uuid4())
matrix_info = {
"MatrixId": matrix_id,
"MatrixName": matrix_id,
"WorkTablets": work_tablets +
[{"Number": number, "Material": {"uuid": "730067cf07ae43849ddf4034299030e9"}} for number in slot_none],
}
res = api.add_WorkTablet_Matrix(matrix_info)
if res.get("Success"):
backend.matrix_id = matrix_id
backend.matrix_info = matrix_info
# 重新计算所有槽位的位置(初始化时 deck 可能为空,此时才有资源)
pipetting_positions = []
plate_positions = []
for child in self.deck.children:
number = self._get_slot_number(child)
if number is None:
continue
pos = self.plr_pos_to_prcxi(child)
plate_positions.append({"Number": number, "XPos": pos.x, "YPos": pos.y, "ZPos": pos.z})
if child.children:
pip_pos = self.plr_pos_to_prcxi(child.children[0], self.left_2_claw)
else:
pip_pos = self.plr_pos_to_prcxi(child, Coordinate(-100, self.left_2_claw.y, self.left_2_claw.z))
half_x = child.get_size_x() / 2 * abs(1 + self.x_increase)
z_wall = child.get_size_z()
pipetting_positions.append({
"Number": number,
"XPos": pip_pos.x,
"YPos": pip_pos.y,
"ZPos": pip_pos.z,
"X_Left": half_x,
"X_Right": half_x,
"ZAgainstTheWall": pip_pos.z - z_wall,
"X2Pos": pip_pos.x + self.right_2_left.x,
"Y2Pos": pip_pos.y + self.right_2_left.y,
"Z2Pos": pip_pos.z + self.right_2_left.z,
"X2_Left": half_x,
"X2_Right": half_x,
"ZAgainstTheWall2": pip_pos.z - z_wall,
})
if pipetting_positions:
api.update_pipetting_position(matrix_id, pipetting_positions)
# 更新 backend 中的 plate_positions
backend.plate_positions = plate_positions
if plate_positions:
api.update_clamp_jaw_position(matrix_id, plate_positions)
print(f"Auto-matched materials and created matrix: {matrix_id}")
else:
raise PRCXIError(f"Failed to create auto-matched matrix: {res.get('Message', 'Unknown error')}")
def plr_pos_to_prcxi(self, resource: Resource, offset: Coordinate = Coordinate(0, 0, 0)):
z_pos = 'c'
if isinstance(resource, Tip):
z_pos = 'b'
resource_pos = resource.get_absolute_location(x="c",y="c",z=z_pos)
x = resource_pos.x
y = resource_pos.y
z = resource_pos.z
# 如果z等于0则递归resource.parent的高度并向z加使用get_size_z方法
parent = resource.parent
res_z = resource.location.z
while not isinstance(parent, LiquidHandlerAbstract) and (res_z == 0) and parent is not None:
z += parent.get_size_z()
res_z = parent.location.z
parent = getattr(parent, "parent", None)
prcxi_x = (self.deck_x - x)*(1+self.x_increase) + self.x_offset + self.xy_coupling * (self.deck_y - y)
prcxi_y = (self.deck_y - y)*(1+self.y_increase) + self.y_offset
prcxi_z = self.deck_z - z
prcxi_x = min(max(0, prcxi_x+offset.x),self.deck_x)
prcxi_y = min(max(0, prcxi_y+offset.y),self.deck_y)
prcxi_z = min(max(0, prcxi_z+offset.z),self.deck_z)
return Coordinate(prcxi_x, prcxi_y, prcxi_z)
def post_init(self, ros_node: BaseROS2DeviceNode): def post_init(self, ros_node: BaseROS2DeviceNode):
super().post_init(ros_node) super().post_init(ros_node)
@@ -1044,8 +820,8 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
): ):
self._unilabos_backend.create_protocol(protocol_name) self._unilabos_backend.create_protocol(protocol_name)
async def run_protocol(self, protocol_id: str = None): async def run_protocol(self):
return self._unilabos_backend.run_protocol(protocol_id) return self._unilabos_backend.run_protocol()
async def remove_liquid( async def remove_liquid(
self, self,
@@ -1136,7 +912,6 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
touch_tip: bool = False, touch_tip: bool = False,
liquid_height: Optional[List[Optional[float]]] = None, liquid_height: Optional[List[Optional[float]]] = None,
blow_out_air_volume: Optional[List[Optional[float]]] = None, blow_out_air_volume: Optional[List[Optional[float]]] = None,
blow_out_air_volume_before: Optional[List[Optional[float]]] = None,
spread: Literal["wide", "tight", "custom"] = "wide", spread: Literal["wide", "tight", "custom"] = "wide",
is_96_well: bool = False, is_96_well: bool = False,
mix_stage: Optional[Literal["none", "before", "after", "both"]] = "none", mix_stage: Optional[Literal["none", "before", "after", "both"]] = "none",
@@ -1147,12 +922,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
delays: Optional[List[int]] = None, delays: Optional[List[int]] = None,
none_keys: List[str] = [], none_keys: List[str] = [],
) -> TransferLiquidReturn: ) -> TransferLiquidReturn:
if not self._first_transfer_done: return await super().transfer_liquid(
self._match_and_create_matrix()
self._first_transfer_done = True
if self.step_mode:
await self.create_protocol(f"transfer_liquid{time.time()}")
res = await super().transfer_liquid(
sources, sources,
targets, targets,
tip_racks, tip_racks,
@@ -1165,7 +935,6 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
touch_tip=touch_tip, touch_tip=touch_tip,
liquid_height=liquid_height, liquid_height=liquid_height,
blow_out_air_volume=blow_out_air_volume, blow_out_air_volume=blow_out_air_volume,
blow_out_air_volume_before=blow_out_air_volume_before,
spread=spread, spread=spread,
is_96_well=is_96_well, is_96_well=is_96_well,
mix_stage=mix_stage, mix_stage=mix_stage,
@@ -1176,9 +945,6 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
delays=delays, delays=delays,
none_keys=none_keys, none_keys=none_keys,
) )
if self.step_mode:
await self.run_protocol()
return res
async def custom_delay(self, seconds=0, msg=None): async def custom_delay(self, seconds=0, msg=None):
return await super().custom_delay(seconds, msg) return await super().custom_delay(seconds, msg)
@@ -1195,10 +961,9 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
offsets: Optional[Coordinate] = None, offsets: Optional[Coordinate] = None,
mix_rate: Optional[float] = None, mix_rate: Optional[float] = None,
none_keys: List[str] = [], none_keys: List[str] = [],
use_channels: Optional[List[int]] = [0],
): ):
return await self._unilabos_backend.mix( return await self._unilabos_backend.mix(
targets, mix_time, mix_vol, height_to_bottom, offsets, mix_rate, none_keys, use_channels targets, mix_time, mix_vol, height_to_bottom, offsets, mix_rate, none_keys
) )
def iter_tips(self, tip_racks: Sequence[TipRack]) -> Iterator[Resource]: def iter_tips(self, tip_racks: Sequence[TipRack]) -> Iterator[Resource]:
@@ -1211,6 +976,10 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
offsets: Optional[List[Coordinate]] = None, offsets: Optional[List[Coordinate]] = None,
**backend_kwargs, **backend_kwargs,
): ):
if self.step_mode:
await self.create_protocol(f"单点动作{time.time()}")
await super().pick_up_tips(tip_spots, use_channels, offsets, **backend_kwargs)
await self.run_protocol()
return await super().pick_up_tips(tip_spots, use_channels, offsets, **backend_kwargs) return await super().pick_up_tips(tip_spots, use_channels, offsets, **backend_kwargs)
async def aspirate( async def aspirate(
@@ -1362,24 +1131,6 @@ class PRCXI9300Backend(LiquidHandlerBackend):
self.debug = debug self.debug = debug
self.axis = "Left" self.axis = "Left"
@staticmethod
def _deck_plate_slot_no(plate, deck) -> int:
"""台面板位槽号116与 PRCXI9300Handler._get_slot_number 一致;无法解析时退回 deck 子项顺序 +1。"""
sn = PRCXI9300Handler._get_slot_number(plate)
if sn is not None:
return sn
return deck.children.index(plate) + 1
@staticmethod
def _resource_num_items_y(resource) -> int:
"""板/TipRack 等在 Y 向孔位数;无 ``num_items_y`` 或非正数时返回 1。"""
ny = getattr(resource, "num_items_y", None)
try:
n = int(ny) if ny is not None else 1
except (TypeError, ValueError):
n = 1
return n if n >= 1 else 1
async def shaker_action(self, time: int, module_no: int, amplitude: int, is_wait: bool): async def shaker_action(self, time: int, module_no: int, amplitude: int, is_wait: bool):
step = self.api_client.shaker_action( step = self.api_client.shaker_action(
time=time, time=time,
@@ -1431,40 +1182,26 @@ class PRCXI9300Backend(LiquidHandlerBackend):
self.protocol_name = protocol_name self.protocol_name = protocol_name
self.steps_todo_list = [] self.steps_todo_list = []
if not len(self.matrix_id): def run_protocol(self):
self.matrix_id = str(uuid.uuid4())
material_list = self.api_client.get_all_materials()
material_dict = {material["uuid"]: material for material in material_list}
work_tablets = []
for num, material_id in self.tablets_info.items():
work_tablets.append({
"Number": num,
"Material": material_dict[material_id]
})
self.matrix_info = {
"MatrixId": self.matrix_id,
"MatrixName": self.matrix_id,
"WorkTablets": work_tablets,
}
# print(json.dumps(self.matrix_info, indent=2))
res = self.api_client.add_WorkTablet_Matrix(self.matrix_info)
if not res["Success"]:
self.matrix_id = ""
raise AssertionError(f"Failed to create matrix: {res.get('Message', 'Unknown error')}")
print(f"PRCXI9300Backend created matrix with ID: {self.matrix_info['MatrixId']}, result: {res}")
def run_protocol(self, protocol_id: str = None):
assert self.is_reset_ok, "PRCXI9300Backend is not reset successfully. Please call setup() first." assert self.is_reset_ok, "PRCXI9300Backend is not reset successfully. Please call setup() first."
run_time = time.time() run_time = time.time()
if protocol_id == "" or protocol_id is None: self.matrix_info = MatrixInfo(
MatrixId=f"{int(run_time)}",
MatrixName=f"protocol_{run_time}",
MatrixCount=len(self.tablets_info),
WorkTablets=self.tablets_info,
)
# print(json.dumps(self.matrix_info, indent=2))
if not len(self.matrix_id):
res = self.api_client.add_WorkTablet_Matrix(self.matrix_info)
assert res["Success"], f"Failed to create matrix: {res.get('Message', 'Unknown error')}"
print(f"PRCXI9300Backend created matrix with ID: {self.matrix_info['MatrixId']}, result: {res}")
solution_id = self.api_client.add_solution( solution_id = self.api_client.add_solution(
f"protocol_{run_time}", self.matrix_id, self.steps_todo_list f"protocol_{run_time}", self.matrix_info["MatrixId"], self.steps_todo_list
) )
else: else:
solution_id = protocol_id print(f"PRCXI9300Backend using predefined worktable {self.matrix_id}, skipping matrix creation.")
solution_id = self.api_client.add_solution(f"protocol_{run_time}", self.matrix_id, self.steps_todo_list)
print(f"PRCXI9300Backend created solution with ID: {solution_id}") print(f"PRCXI9300Backend created solution with ID: {solution_id}")
self.api_client.load_solution(solution_id) self.api_client.load_solution(solution_id)
print(json.dumps(self.steps_todo_list, indent=2)) print(json.dumps(self.steps_todo_list, indent=2))
@@ -1507,9 +1244,6 @@ class PRCXI9300Backend(LiquidHandlerBackend):
else: else:
await asyncio.sleep(1) await asyncio.sleep(1)
print("PRCXI9300 reset successfully.") print("PRCXI9300 reset successfully.")
# self.api_client.update_clamp_jaw_position(self.matrix_id, self.plate_positions)
except ConnectionRefusedError as e: except ConnectionRefusedError as e:
raise RuntimeError( raise RuntimeError(
f"Failed to connect to PRCXI9300 API at {self.host}:{self.port}. " f"Failed to connect to PRCXI9300 API at {self.host}:{self.port}. "
@@ -1533,33 +1267,33 @@ class PRCXI9300Backend(LiquidHandlerBackend):
axis = "Right" axis = "Right"
else: else:
raise ValueError("Invalid use channels: " + str(_use_channels)) raise ValueError("Invalid use channels: " + str(_use_channels))
plate_slots = [] plate_indexes = []
for op in ops: for op in ops:
plate = op.resource.parent plate = op.resource.parent
deck = plate.parent deck = plate.parent.parent
plate_slots.append(self._deck_plate_slot_no(plate, deck)) plate_index = deck.children.index(plate.parent)
# print(f"Plate index: {plate_index}, Plate name: {plate.name}")
# print(f"Number of children in deck: {len(deck.children)}")
if len(set(plate_slots)) != 1: plate_indexes.append(plate_index)
raise ValueError("All pickups must be from the same plate (slot). Found different slots: " + str(plate_slots))
if len(set(plate_indexes)) != 1:
raise ValueError("All pickups must be from the same plate. Found different plates: " + str(plate_indexes))
_rack = ops[0].resource.parent
ny = self._resource_num_items_y(_rack)
tip_columns = [] tip_columns = []
for op in ops: for op in ops:
tipspot = op.resource tipspot = op.resource
if self._resource_num_items_y(tipspot.parent) != ny:
raise ValueError("All pickups must use tip racks with the same num_items_y")
tipspot_index = tipspot.parent.children.index(tipspot) tipspot_index = tipspot.parent.children.index(tipspot)
tip_columns.append(tipspot_index // ny) tip_columns.append(tipspot_index // 8)
if len(set(tip_columns)) != 1: if len(set(tip_columns)) != 1:
raise ValueError( raise ValueError(
"All pickups must be from the same tip column. Found different columns: " + str(tip_columns) "All pickups must be from the same tip column. Found different columns: " + str(tip_columns)
) )
PlateNo = plate_slots[0] PlateNo = plate_indexes[0] + 1
hole_col = tip_columns[0] + 1 hole_col = tip_columns[0] + 1
hole_row = 1 hole_row = 1
if self.num_channels != 8: if self._num_channels == 1:
hole_row = tipspot_index % ny + 1 hole_row = tipspot_index % 8 + 1
step = self.api_client.Load( step = self.api_client.Load(
axis=axis, axis=axis,
@@ -1570,8 +1304,8 @@ class PRCXI9300Backend(LiquidHandlerBackend):
hole_col=hole_col, hole_col=hole_col,
blending_times=0, blending_times=0,
balance_height=0, balance_height=0,
plate_or_hole=f"H{hole_col}-{ny},T{PlateNo}", plate_or_hole=f"H{hole_col}-8,T{PlateNo}",
hole_numbers=f"{(hole_col - 1) * ny + hole_row}" if self._num_channels != 8 else "1,2,3,4,5", hole_numbers=f"{(hole_col - 1) * 8 + hole_row}" if self._num_channels == 1 else "1,2,3,4,5",
) )
self.steps_todo_list.append(step) self.steps_todo_list.append(step)
@@ -1589,9 +1323,8 @@ class PRCXI9300Backend(LiquidHandlerBackend):
raise ValueError("Invalid use channels: " + str(_use_channels)) raise ValueError("Invalid use channels: " + str(_use_channels))
# 检查trash # # 检查trash #
if ops[0].resource.name == "trash": if ops[0].resource.name == "trash":
_plate = ops[0].resource
_deck = _plate.parent PlateNo = ops[0].resource.parent.parent.children.index(ops[0].resource.parent) + 1
PlateNo = self._deck_plate_slot_no(_plate, _deck)
step = self.api_client.UnLoad( step = self.api_client.UnLoad(
axis=axis, axis=axis,
@@ -1609,35 +1342,32 @@ class PRCXI9300Backend(LiquidHandlerBackend):
return return
# print(ops[0].resource.parent.children.index(ops[0].resource)) # print(ops[0].resource.parent.children.index(ops[0].resource))
plate_slots = [] plate_indexes = []
for op in ops: for op in ops:
plate = op.resource.parent plate = op.resource.parent
deck = plate.parent deck = plate.parent.parent
plate_slots.append(self._deck_plate_slot_no(plate, deck)) plate_index = deck.children.index(plate.parent)
if len(set(plate_slots)) != 1: plate_indexes.append(plate_index)
if len(set(plate_indexes)) != 1:
raise ValueError( raise ValueError(
"All drop_tips must be from the same plate (slot). Found different slots: " + str(plate_slots) "All drop_tips must be from the same plate. Found different plates: " + str(plate_indexes)
) )
_rack = ops[0].resource.parent
ny = self._resource_num_items_y(_rack)
tip_columns = [] tip_columns = []
for op in ops: for op in ops:
tipspot = op.resource tipspot = op.resource
if self._resource_num_items_y(tipspot.parent) != ny:
raise ValueError("All drop_tips must use tip racks with the same num_items_y")
tipspot_index = tipspot.parent.children.index(tipspot) tipspot_index = tipspot.parent.children.index(tipspot)
tip_columns.append(tipspot_index // ny) tip_columns.append(tipspot_index // 8)
if len(set(tip_columns)) != 1: if len(set(tip_columns)) != 1:
raise ValueError( raise ValueError(
"All drop_tips must be from the same tip column. Found different columns: " + str(tip_columns) "All drop_tips must be from the same tip column. Found different columns: " + str(tip_columns)
) )
PlateNo = plate_slots[0] PlateNo = plate_indexes[0] + 1
hole_col = tip_columns[0] + 1 hole_col = tip_columns[0] + 1
hole_row = 1
if self.num_channels != 8: if self.channel_num == 1:
hole_row = tipspot_index % ny + 1 hole_row = tipspot_index % 8 + 1
step = self.api_client.UnLoad( step = self.api_client.UnLoad(
axis=axis, axis=axis,
@@ -1648,7 +1378,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
hole_col=hole_col, hole_col=hole_col,
blending_times=0, blending_times=0,
balance_height=0, balance_height=0,
plate_or_hole=f"H{hole_col}-{ny},T{PlateNo}", plate_or_hole=f"H{hole_col}-8,T{PlateNo}",
hole_numbers="1,2,3,4,5,6,7,8", hole_numbers="1,2,3,4,5,6,7,8",
) )
self.steps_todo_list.append(step) self.steps_todo_list.append(step)
@@ -1662,43 +1392,34 @@ class PRCXI9300Backend(LiquidHandlerBackend):
offsets: Optional[Coordinate] = None, offsets: Optional[Coordinate] = None,
mix_rate: Optional[float] = None, mix_rate: Optional[float] = None,
none_keys: List[str] = [], none_keys: List[str] = [],
use_channels: Optional[List[int]] = [0],
): ):
"""Mix liquid in the specified resources.""" """Mix liquid in the specified resources."""
if use_channels == [0]:
axis = "Left" plate_indexes = []
elif use_channels == [1]:
axis = "Right"
else:
raise ValueError("Invalid use channels: " + str(use_channels))
plate_slots = []
for op in targets: for op in targets:
deck = op.parent.parent.parent deck = op.parent.parent.parent
plate = op.parent plate = op.parent
plate_slots.append(self._deck_plate_slot_no(plate, deck)) plate_index = deck.children.index(plate.parent)
plate_indexes.append(plate_index)
if len(set(plate_slots)) != 1: if len(set(plate_indexes)) != 1:
raise ValueError("All mix targets must be from the same plate (slot). Found different slots: " + str(plate_slots)) raise ValueError("All pickups must be from the same plate. Found different plates: " + str(plate_indexes))
_plate0 = targets[0].parent
ny = self._resource_num_items_y(_plate0)
tip_columns = [] tip_columns = []
for op in targets: for op in targets:
if self._resource_num_items_y(op.parent) != ny:
raise ValueError("All mix targets must be on plates with the same num_items_y")
tipspot_index = op.parent.children.index(op) tipspot_index = op.parent.children.index(op)
tip_columns.append(tipspot_index // ny) tip_columns.append(tipspot_index // 8)
if len(set(tip_columns)) != 1: if len(set(tip_columns)) != 1:
raise ValueError( raise ValueError(
"All mix targets must be in the same column group. Found different columns: " + str(tip_columns) "All pickups must be from the same tip column. Found different columns: " + str(tip_columns)
) )
PlateNo = plate_slots[0] PlateNo = plate_indexes[0] + 1
hole_col = tip_columns[0] + 1 hole_col = tip_columns[0] + 1
hole_row = 1 hole_row = 1
if self.num_channels != 8: if self.num_channels == 1:
hole_row = tipspot_index % ny + 1 hole_row = tipspot_index % 8 + 1
assert mix_time > 0 assert mix_time > 0
step = self.api_client.Blending( step = self.api_client.Blending(
@@ -1709,7 +1430,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
hole_col=hole_col, hole_col=hole_col,
blending_times=mix_time, blending_times=mix_time,
balance_height=0, balance_height=0,
plate_or_hole=f"H{hole_col}-{ny},T{PlateNo}", plate_or_hole=f"H{hole_col}-8,T{PlateNo}",
hole_numbers="1,2,3,4,5,6,7,8", hole_numbers="1,2,3,4,5,6,7,8",
) )
self.steps_todo_list.append(step) self.steps_todo_list.append(step)
@@ -1726,39 +1447,36 @@ class PRCXI9300Backend(LiquidHandlerBackend):
axis = "Right" axis = "Right"
else: else:
raise ValueError("Invalid use channels: " + str(_use_channels)) raise ValueError("Invalid use channels: " + str(_use_channels))
plate_slots = [] plate_indexes = []
for op in ops: for op in ops:
plate = op.resource.parent plate = op.resource.parent
deck = plate.parent deck = plate.parent.parent
plate_slots.append(self._deck_plate_slot_no(plate, deck)) plate_index = deck.children.index(plate.parent)
plate_indexes.append(plate_index)
if len(set(plate_slots)) != 1: if len(set(plate_indexes)) != 1:
raise ValueError("All aspirate must be from the same plate (slot). Found different slots: " + str(plate_slots)) raise ValueError("All pickups must be from the same plate. Found different plates: " + str(plate_indexes))
_plate0 = ops[0].resource.parent
ny = self._resource_num_items_y(_plate0)
tip_columns = [] tip_columns = []
for op in ops: for op in ops:
tipspot = op.resource tipspot = op.resource
if self._resource_num_items_y(tipspot.parent) != ny:
raise ValueError("All aspirate wells must be on plates with the same num_items_y")
tipspot_index = tipspot.parent.children.index(tipspot) tipspot_index = tipspot.parent.children.index(tipspot)
tip_columns.append(tipspot_index // ny) tip_columns.append(tipspot_index // 8)
if len(set(tip_columns)) != 1: if len(set(tip_columns)) != 1:
raise ValueError( raise ValueError(
"All aspirate must be from the same tip column. Found different columns: " + str(tip_columns) "All pickups must be from the same tip column. Found different columns: " + str(tip_columns)
) )
volumes = [op.volume for op in ops] volumes = [op.volume for op in ops]
if len(set(volumes)) != 1: if len(set(volumes)) != 1:
raise ValueError("All aspirate volumes must be the same. Found different volumes: " + str(volumes)) raise ValueError("All aspirate volumes must be the same. Found different volumes: " + str(volumes))
PlateNo = plate_slots[0] PlateNo = plate_indexes[0] + 1
hole_col = tip_columns[0] + 1 hole_col = tip_columns[0] + 1
hole_row = 1 hole_row = 1
if self.num_channels != 8: if self.num_channels == 1:
hole_row = tipspot_index % ny + 1 hole_row = tipspot_index % 8 + 1
step = self.api_client.Imbibing( step = self.api_client.Imbibing(
axis=axis, axis=axis,
@@ -1769,7 +1487,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
hole_col=hole_col, hole_col=hole_col,
blending_times=0, blending_times=0,
balance_height=0, balance_height=0,
plate_or_hole=f"H{hole_col}-{ny},T{PlateNo}", plate_or_hole=f"H{hole_col}-8,T{PlateNo}",
hole_numbers="1,2,3,4,5,6,7,8", hole_numbers="1,2,3,4,5,6,7,8",
) )
self.steps_todo_list.append(step) self.steps_todo_list.append(step)
@@ -1786,24 +1504,21 @@ class PRCXI9300Backend(LiquidHandlerBackend):
axis = "Right" axis = "Right"
else: else:
raise ValueError("Invalid use channels: " + str(_use_channels)) raise ValueError("Invalid use channels: " + str(_use_channels))
plate_slots = [] plate_indexes = []
for op in ops: for op in ops:
plate = op.resource.parent plate = op.resource.parent
deck = plate.parent deck = plate.parent.parent
plate_slots.append(self._deck_plate_slot_no(plate, deck)) plate_index = deck.children.index(plate.parent)
plate_indexes.append(plate_index)
if len(set(plate_slots)) != 1: if len(set(plate_indexes)) != 1:
raise ValueError("All dispense must be from the same plate (slot). Found different slots: " + str(plate_slots)) raise ValueError("All dispense must be from the same plate. Found different plates: " + str(plate_indexes))
_plate0 = ops[0].resource.parent
ny = self._resource_num_items_y(_plate0)
tip_columns = [] tip_columns = []
for op in ops: for op in ops:
tipspot = op.resource tipspot = op.resource
if self._resource_num_items_y(tipspot.parent) != ny:
raise ValueError("All dispense wells must be on plates with the same num_items_y")
tipspot_index = tipspot.parent.children.index(tipspot) tipspot_index = tipspot.parent.children.index(tipspot)
tip_columns.append(tipspot_index // ny) tip_columns.append(tipspot_index // 8)
if len(set(tip_columns)) != 1: if len(set(tip_columns)) != 1:
raise ValueError( raise ValueError(
@@ -1814,12 +1529,12 @@ class PRCXI9300Backend(LiquidHandlerBackend):
if len(set(volumes)) != 1: if len(set(volumes)) != 1:
raise ValueError("All dispense volumes must be the same. Found different volumes: " + str(volumes)) raise ValueError("All dispense volumes must be the same. Found different volumes: " + str(volumes))
PlateNo = plate_slots[0] PlateNo = plate_indexes[0] + 1
hole_col = tip_columns[0] + 1 hole_col = tip_columns[0] + 1
hole_row = 1 hole_row = 1
if self.num_channels != 8: if self.num_channels == 1:
hole_row = tipspot_index % ny + 1 hole_row = tipspot_index % 8 + 1
step = self.api_client.Tapping( step = self.api_client.Tapping(
axis=axis, axis=axis,
@@ -1830,7 +1545,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
hole_col=hole_col, hole_col=hole_col,
blending_times=0, blending_times=0,
balance_height=0, balance_height=0,
plate_or_hole=f"H{hole_col}-{ny},T{PlateNo}", plate_or_hole=f"H{hole_col}-8,T{PlateNo}",
hole_numbers="1,2,3,4,5,6,7,8", hole_numbers="1,2,3,4,5,6,7,8",
) )
self.steps_todo_list.append(step) self.steps_todo_list.append(step)
@@ -2026,21 +1741,6 @@ class PRCXI9300Api:
"""GetWorkTabletMatrixById""" """GetWorkTabletMatrixById"""
return self.call("IMatrix", "GetWorkTabletMatrixById", [matrix_id]) return self.call("IMatrix", "GetWorkTabletMatrixById", [matrix_id])
def update_clamp_jaw_position(self, target_matrix_id: str, plate_positions: List[Dict[str, Any]]):
position_params = {
"MatrixId": target_matrix_id,
"WorkTablets": plate_positions
}
return self.call("IMatrix", "UpdateClampJawPosition", [position_params])
def update_pipetting_position(self, target_matrix_id: str, pipetting_positions: List[Dict[str, Any]]):
"""UpdatePipettingPosition - 更新移液位置"""
position_params = {
"MatrixId": target_matrix_id,
"WorkTablets": pipetting_positions
}
return self.call("IMatrix", "UpdatePipettingPosition", [position_params])
def add_WorkTablet_Matrix(self, matrix: MatrixInfo): def add_WorkTablet_Matrix(self, matrix: MatrixInfo):
return self.call("IMatrix", "AddWorkTabletMatrix2" if self.is_9320 else "AddWorkTabletMatrix", [matrix]) return self.call("IMatrix", "AddWorkTabletMatrix2" if self.is_9320 else "AddWorkTabletMatrix", [matrix])

View File

@@ -1,4 +1,4 @@
from typing import Any, Callable, Dict, List, Optional, Tuple from typing import Optional
from pylabrobot.resources import Tube, Coordinate from pylabrobot.resources import Tube, Coordinate
from pylabrobot.resources.well import Well, WellBottomType, CrossSectionType from pylabrobot.resources.well import Well, WellBottomType, CrossSectionType
from pylabrobot.resources.tip import Tip, TipCreator from pylabrobot.resources.tip import Tip, TipCreator
@@ -839,101 +839,3 @@ def PRCXI_30mm_Adapter(name: str) -> PRCXI9300PlateAdapter:
"SupplyType": 2 "SupplyType": 2
} }
) )
# ---------------------------------------------------------------------------
# 协议上传 / workflow 用:与设备端耗材字典字段对齐的模板描述(供 common 自动匹配)
# ---------------------------------------------------------------------------
_PRCXI_TEMPLATE_SPECS_CACHE: Optional[List[Dict[str, Any]]] = None
def _probe_prcxi_resource(factory: Callable[..., Any]) -> Any:
probe = "__unilab_template_probe__"
if factory.__name__ == "PRCXI_trash":
return factory()
return factory(probe)
def _first_child_capacity_for_match(resource: Any) -> float:
"""Well max_volume 或 Tip 的 maximal_volume用于与设备端 Volume 类似的打分。"""
ch = getattr(resource, "children", None) or []
if not ch:
return 0.0
c0 = ch[0]
mv = getattr(c0, "max_volume", None)
if mv is not None:
return float(mv)
tip = getattr(c0, "tip", None)
if tip is not None:
mv2 = getattr(tip, "maximal_volume", None)
if mv2 is not None:
return float(mv2)
return 0.0
# (factory, kind) — 不含各类 Adapter避免与真实板子误匹配
PRCXI_TEMPLATE_FACTORY_KINDS: List[Tuple[Callable[..., Any], str]] = [
(PRCXI_BioER_96_wellplate, "plate"),
(PRCXI_nest_1_troughplate, "plate"),
(PRCXI_BioRad_384_wellplate, "plate"),
(PRCXI_AGenBio_4_troughplate, "plate"),
(PRCXI_nest_12_troughplate, "plate"),
(PRCXI_CellTreat_96_wellplate, "plate"),
(PRCXI_10ul_eTips, "tip_rack"),
(PRCXI_300ul_Tips, "tip_rack"),
(PRCXI_PCR_Plate_200uL_nonskirted, "plate"),
(PRCXI_PCR_Plate_200uL_semiskirted, "plate"),
(PRCXI_PCR_Plate_200uL_skirted, "plate"),
(PRCXI_trash, "trash"),
(PRCXI_96_DeepWell, "plate"),
(PRCXI_EP_Adapter, "tube_rack"),
(PRCXI_1250uL_Tips, "tip_rack"),
(PRCXI_10uL_Tips, "tip_rack"),
(PRCXI_1000uL_Tips, "tip_rack"),
(PRCXI_200uL_Tips, "tip_rack"),
(PRCXI_48_DeepWell, "plate"),
]
def get_prcxi_labware_template_specs() -> List[Dict[str, Any]]:
"""返回与 ``prcxi._match_and_create_matrix`` 中耗材字段兼容的模板列表,用于按孔数+容量打分。"""
global _PRCXI_TEMPLATE_SPECS_CACHE
if _PRCXI_TEMPLATE_SPECS_CACHE is not None:
return _PRCXI_TEMPLATE_SPECS_CACHE
out: List[Dict[str, Any]] = []
for factory, kind in PRCXI_TEMPLATE_FACTORY_KINDS:
try:
r = _probe_prcxi_resource(factory)
except Exception:
continue
nx = int(getattr(r, "num_items_x", None) or 0)
ny = int(getattr(r, "num_items_y", None) or 0)
nchild = len(getattr(r, "children", []) or [])
hole_count = nx * ny if nx > 0 and ny > 0 else nchild
hole_row = ny if nx > 0 and ny > 0 else 0
hole_col = nx if nx > 0 and ny > 0 else 0
mi = getattr(r, "material_info", None) or {}
vol = _first_child_capacity_for_match(r)
menum = mi.get("materialEnum")
if menum is None and kind == "tip_rack":
menum = 1
elif menum is None and kind == "trash":
menum = 6
out.append(
{
"class_name": factory.__name__,
"kind": kind,
"materialEnum": menum,
"HoleRow": hole_row,
"HoleColum": hole_col,
"Volume": vol,
"hole_count": hole_count,
"material_uuid": mi.get("uuid"),
"material_code": mi.get("Code"),
}
)
_PRCXI_TEMPLATE_SPECS_CACHE = out
return out

View File

@@ -59,7 +59,6 @@ class UniLiquidHandlerRvizBackend(LiquidHandlerBackend):
self.total_height = total_height self.total_height = total_height
self.joint_config = kwargs.get("joint_config", None) self.joint_config = kwargs.get("joint_config", None)
self.lh_device_id = kwargs.get("lh_device_id", "lh_joint_publisher") self.lh_device_id = kwargs.get("lh_device_id", "lh_joint_publisher")
self.simulate_rviz = kwargs.get("simulate_rviz", False)
if not rclpy.ok(): if not rclpy.ok():
rclpy.init() rclpy.init()
self.joint_state_publisher = None self.joint_state_publisher = None
@@ -70,7 +69,7 @@ class UniLiquidHandlerRvizBackend(LiquidHandlerBackend):
self.joint_state_publisher = LiquidHandlerJointPublisher( self.joint_state_publisher = LiquidHandlerJointPublisher(
joint_config=self.joint_config, joint_config=self.joint_config,
lh_device_id=self.lh_device_id, lh_device_id=self.lh_device_id,
simulate_rviz=self.simulate_rviz) simulate_rviz=True)
# 启动ROS executor # 启动ROS executor
self.executor = rclpy.executors.MultiThreadedExecutor() self.executor = rclpy.executors.MultiThreadedExecutor()

View File

@@ -42,7 +42,6 @@ class LiquidHandlerJointPublisher(Node):
while self.resource_action is None: while self.resource_action is None:
self.resource_action = self.check_tf_update_actions() self.resource_action = self.check_tf_update_actions()
time.sleep(1) time.sleep(1)
self.get_logger().info(f'Waiting for TfUpdate server: {self.resource_action}')
self.resource_action_client = ActionClient(self, SendCmd, self.resource_action) self.resource_action_client = ActionClient(self, SendCmd, self.resource_action)
while not self.resource_action_client.wait_for_server(timeout_sec=1.0): while not self.resource_action_client.wait_for_server(timeout_sec=1.0):

File diff suppressed because it is too large Load Diff

View File

@@ -51,7 +51,6 @@ def main(
bridges: List[Any] = [], bridges: List[Any] = [],
visual: str = "disable", visual: str = "disable",
resources_mesh_config: dict = {}, resources_mesh_config: dict = {},
resources_mesh_resource_list: list = [],
rclpy_init_args: List[str] = ["--log-level", "debug"], rclpy_init_args: List[str] = ["--log-level", "debug"],
discovery_interval: float = 15.0, discovery_interval: float = 15.0,
) -> None: ) -> None:
@@ -78,12 +77,12 @@ def main(
if visual != "disable": if visual != "disable":
from unilabos.ros.nodes.presets.joint_republisher import JointRepublisher from unilabos.ros.nodes.presets.joint_republisher import JointRepublisher
# 优先使用从 main.py 传入的完整资源列表(包含所有子资源) # 将 ResourceTreeSet 转换为 list 用于 visual 组件
if resources_mesh_resource_list: resources_list = (
resources_list = resources_mesh_resource_list [node.res_content.model_dump(by_alias=True) for node in resources_config.all_nodes]
else: if resources_config
# fallback: 从 ResourceTreeSet 获取 else []
resources_list = [node.res_content.model_dump(by_alias=True) for node in resources_config.all_nodes] )
resource_mesh_manager = ResourceMeshManager( resource_mesh_manager = ResourceMeshManager(
resources_mesh_config, resources_mesh_config,
resources_list, resources_list,
@@ -91,7 +90,7 @@ def main(
device_id="resource_mesh_manager", device_id="resource_mesh_manager",
device_uuid=str(uuid.uuid4()), device_uuid=str(uuid.uuid4()),
) )
joint_republisher = JointRepublisher("joint_republisher","joint_republisher", host_node.resource_tracker) joint_republisher = JointRepublisher("joint_republisher", host_node.resource_tracker)
# lh_joint_pub = LiquidHandlerJointPublisher( # lh_joint_pub = LiquidHandlerJointPublisher(
# resources_config=resources_list, resource_tracker=host_node.resource_tracker # resources_config=resources_list, resource_tracker=host_node.resource_tracker
# ) # )
@@ -115,7 +114,6 @@ def slave(
bridges: List[Any] = [], bridges: List[Any] = [],
visual: str = "disable", visual: str = "disable",
resources_mesh_config: dict = {}, resources_mesh_config: dict = {},
resources_mesh_resource_list: list = [],
rclpy_init_args: List[str] = ["--log-level", "debug"], rclpy_init_args: List[str] = ["--log-level", "debug"],
) -> None: ) -> None:
"""从节点函数""" """从节点函数"""
@@ -210,12 +208,12 @@ def slave(
if visual != "disable": if visual != "disable":
from unilabos.ros.nodes.presets.joint_republisher import JointRepublisher from unilabos.ros.nodes.presets.joint_republisher import JointRepublisher
# 优先使用从 main.py 传入的完整资源列表(包含所有子资源) # 将 ResourceTreeSet 转换为 list 用于 visual 组件
if resources_mesh_resource_list: resources_list = (
resources_list = resources_mesh_resource_list [node.res_content.model_dump(by_alias=True) for node in resources_config.all_nodes]
else: if resources_config
# fallback: 从 ResourceTreeSet 获取 else []
resources_list = [node.res_content.model_dump(by_alias=True) for node in resources_config.all_nodes] )
resource_mesh_manager = ResourceMeshManager( resource_mesh_manager = ResourceMeshManager(
resources_mesh_config, resources_mesh_config,
resources_list, resources_list,

View File

@@ -22,7 +22,6 @@ from unilabos_msgs.srv import (
SerialCommand, SerialCommand,
) # type: ignore ) # type: ignore
from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response
from unilabos_msgs.action import SendCmd
from unique_identifier_msgs.msg import UUID from unique_identifier_msgs.msg import UUID
from unilabos.registry.decorators import device, action, NodeType from unilabos.registry.decorators import device, action, NodeType
@@ -314,15 +313,9 @@ class HostNode(BaseROS2DeviceNode):
callback_group=self.callback_group, callback_group=self.callback_group,
), ),
} # 用来存储多个ActionClient实例 } # 用来存储多个ActionClient实例
self._add_resource_mesh_client = ActionClient(
self,
SendCmd,
"/devices/resource_mesh_manager/add_resource_mesh",
callback_group=self.callback_group,
)
self._action_value_mappings: Dict[str, Dict] = { self._action_value_mappings: Dict[str, Dict] = {
device_id: self._action_value_mappings device_id: self._action_value_mappings
} # device_id -> action_value_mappings(本地+远程设备统一存储) } # device_id -> action_value_mappings(本地+远程设备统一存储)
self._slave_registry_configs: Dict[str, Dict] = {} # registry_name -> registry_config(含action_value_mappings) self._slave_registry_configs: Dict[str, Dict] = {} # registry_name -> registry_config(含action_value_mappings)
self._goals: Dict[str, Any] = {} # 用来存储多个目标的状态 self._goals: Dict[str, Any] = {} # 用来存储多个目标的状态
self._online_devices: Set[str] = {f"{self.namespace}/{device_id}"} # 用于跟踪在线设备 self._online_devices: Set[str] = {f"{self.namespace}/{device_id}"} # 用于跟踪在线设备
@@ -1138,27 +1131,6 @@ class HostNode(BaseROS2DeviceNode):
), ),
} }
def _notify_resource_mesh_add(self, resource_tree_set: ResourceTreeSet):
"""通知 ResourceMeshManager 添加资源的 mesh 可视化"""
if not self._add_resource_mesh_client.server_is_ready():
self.lab_logger().debug("[Host Node] ResourceMeshManager 未就绪,跳过 mesh 添加通知")
return
resource_configs = []
for node in resource_tree_set.all_nodes:
res_dict = node.res_content.model_dump(by_alias=True)
if res_dict.get("type") == "device":
continue
resource_configs.append(res_dict)
if not resource_configs:
return
goal_msg = SendCmd.Goal()
goal_msg.command = json.dumps({"resources": resource_configs})
self._add_resource_mesh_client.send_goal_async(goal_msg)
self.lab_logger().info(f"[Host Node] 已发送 {len(resource_configs)} 个资源 mesh 添加请求")
async def _resource_tree_action_add_callback(self, data: dict, response: SerialCommand_Response): # OK async def _resource_tree_action_add_callback(self, data: dict, response: SerialCommand_Response): # OK
resource_tree_set = ResourceTreeSet.load(data["data"]) resource_tree_set = ResourceTreeSet.load(data["data"])
mount_uuid = data["mount_uuid"] mount_uuid = data["mount_uuid"]
@@ -1199,12 +1171,6 @@ class HostNode(BaseROS2DeviceNode):
response.response = json.dumps(uuid_mapping) if success else "FAILED" response.response = json.dumps(uuid_mapping) if success else "FAILED"
self.lab_logger().info(f"[Host Node-Resource] Resource tree add completed, success: {success}") self.lab_logger().info(f"[Host Node-Resource] Resource tree add completed, success: {success}")
if success:
try:
self._notify_resource_mesh_add(resource_tree_set)
except Exception as e:
self.lab_logger().error(f"[Host Node] 通知 ResourceMeshManager 添加 mesh 失败: {e}")
async def _resource_tree_action_get_callback(self, data: dict, response: SerialCommand_Response): # OK async def _resource_tree_action_get_callback(self, data: dict, response: SerialCommand_Response): # OK
uuid_list: List[str] = data["data"] uuid_list: List[str] = data["data"]
with_children: bool = data["with_children"] with_children: bool = data["with_children"]
@@ -1256,12 +1222,6 @@ class HostNode(BaseROS2DeviceNode):
response.response = json.dumps(uuid_mapping) response.response = json.dumps(uuid_mapping)
self.lab_logger().info(f"[Host Node-Resource] Resource tree update completed, success: {success}") self.lab_logger().info(f"[Host Node-Resource] Resource tree update completed, success: {success}")
if success:
try:
self._notify_resource_mesh_add(new_tree_set)
except Exception as e:
self.lab_logger().error(f"[Host Node] 通知 ResourceMeshManager 更新 mesh 失败: {e}")
async def _resource_tree_update_callback(self, request: SerialCommand_Request, response: SerialCommand_Response): async def _resource_tree_update_callback(self, request: SerialCommand_Request, response: SerialCommand_Response):
""" """
子节点通知Host物料树更新 子节点通知Host物料树更新

View File

@@ -23,32 +23,17 @@ from unilabos_msgs.action import SendCmd
from rclpy.action.server import ServerGoalHandle from rclpy.action.server import ServerGoalHandle
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode,DeviceNodeResourceTracker from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode,DeviceNodeResourceTracker
from unilabos.resources.graphio import initialize_resources from unilabos.resources.graphio import initialize_resources
from unilabos.resources.resource_tracker import EXTRA_CLASS
from unilabos.registry.registry import lab_registry from unilabos.registry.registry import lab_registry
class ResourceMeshManager(BaseROS2DeviceNode): class ResourceMeshManager(BaseROS2DeviceNode):
def __init__( def __init__(self, resource_model: dict, resource_config: list,resource_tracker, device_id: str = "resource_mesh_manager", registry_name: str = "", rate=50, **kwargs):
self,
resource_model: Optional[dict] = None,
resource_config: Optional[list] = None,
resource_tracker=None,
device_id: str = "resource_mesh_manager",
registry_name: str = "",
rate=50,
**kwargs,
):
"""初始化资源网格管理器节点 """初始化资源网格管理器节点
Args: Args:
resource_model: 资源模型字典(可选,为 None 时自动从 registry 构建) resource_model (dict): 资源模型字典,包含资源的3D模型信息
resource_config: 资源配置列表(可选,为 None 时启动后通过 ActionServer 或 load_from_resource_tree 加载) resource_config (dict): 资源配置字典,包含资源的配置信息
resource_tracker: 资源追踪器 device_id (str): 节点名称
device_id: 节点名称
rate: TF 发布频率
""" """
if resource_tracker is None:
resource_tracker = DeviceNodeResourceTracker()
super().__init__( super().__init__(
driver_instance=self, driver_instance=self,
device_id=device_id, device_id=device_id,
@@ -61,10 +46,8 @@ class ResourceMeshManager(BaseROS2DeviceNode):
device_uuid=kwargs.get("uuid", str(uuid.uuid4())), device_uuid=kwargs.get("uuid", str(uuid.uuid4())),
) )
self.resource_model = resource_model if resource_model is not None else {} self.resource_model = resource_model
self.resource_config_dict = ( self.resource_config_dict = {item['uuid']: item for item in resource_config}
{item['uuid']: item for item in resource_config} if resource_config else {}
)
self.move_group_ready = False self.move_group_ready = False
self.resource_tf_dict = {} self.resource_tf_dict = {}
self.tf_broadcaster = TransformBroadcaster(self) self.tf_broadcaster = TransformBroadcaster(self)
@@ -94,6 +77,7 @@ class ResourceMeshManager(BaseROS2DeviceNode):
callback_group=callback_group, callback_group=callback_group,
) )
# Create a service for applying the planning scene
self._apply_planning_scene_service = self.create_client( self._apply_planning_scene_service = self.create_client(
srv_type=ApplyPlanningScene, srv_type=ApplyPlanningScene,
srv_name="/apply_planning_scene", srv_name="/apply_planning_scene",
@@ -119,36 +103,27 @@ class ResourceMeshManager(BaseROS2DeviceNode):
AttachedCollisionObject, "/attached_collision_object", 0 AttachedCollisionObject, "/attached_collision_object", 0
) )
# 创建一个Action Server用于修改resource_tf_dict
self._action_server = ActionServer( self._action_server = ActionServer(
self, self,
SendCmd, SendCmd,
f"tf_update", f"tf_update",
self.tf_update, self.tf_update,
callback_group=callback_group, callback_group=callback_group
) )
# 创建一个Action Server用于添加新的资源模型与resource_tf_dict
self._add_resource_mesh_action_server = ActionServer( self._add_resource_mesh_action_server = ActionServer(
self, self,
SendCmd, SendCmd,
f"add_resource_mesh", f"add_resource_mesh",
self.add_resource_mesh_callback, self.add_resource_mesh_callback,
callback_group=callback_group, callback_group=callback_group
) )
self._reload_resource_mesh_action_server = ActionServer( self.resource_tf_dict = self.resource_mesh_setup(self.resource_config_dict)
self, self.create_timer(1/self.rate, self.publish_resource_tf)
SendCmd, self.create_timer(1/self.rate, self.check_resource_pose_changes)
f"reload_resource_mesh",
self._reload_resource_mesh_callback,
callback_group=callback_group,
)
if self.resource_config_dict:
self.resource_tf_dict = self.resource_mesh_setup(self.resource_config_dict)
else:
self.get_logger().info("未提供 resource_config将通过 ActionServer 或 load_from_resource_tree 加载")
self.create_timer(1 / self.rate, self.publish_resource_tf)
self.create_timer(1 / self.rate, self.check_resource_pose_changes)
def check_move_group_ready(self): def check_move_group_ready(self):
"""检查move_group节点是否已初始化完成""" """检查move_group节点是否已初始化完成"""
@@ -165,107 +140,10 @@ class ResourceMeshManager(BaseROS2DeviceNode):
self.add_resource_collision_meshes(self.resource_tf_dict) self.add_resource_collision_meshes(self.resource_tf_dict)
def _build_resource_model_for_config(self, resource_config_dict: dict): def add_resource_mesh_callback(self, goal_handle : ServerGoalHandle):
"""从 registry 中为给定的资源配置自动构建 resource_modelmesh 信息)"""
registry = lab_registry
for _uuid, res_cfg in resource_config_dict.items():
resource_id = res_cfg.get('id', '')
resource_class = res_cfg.get('class', '')
if not resource_class:
continue
if resource_class not in registry.resource_type_registry:
continue
reg_entry = registry.resource_type_registry[resource_class]
if 'model' not in reg_entry:
continue
model_config = reg_entry['model']
if model_config.get('type') != 'resource':
continue
if resource_id in self.resource_model:
continue
self.resource_model[resource_id] = {
'mesh': f"{str(self.mesh_path)}/device_mesh/resources/{model_config['mesh']}",
'mesh_tf': model_config['mesh_tf'],
}
if model_config.get('children_mesh') is not None:
self.resource_model[f"{resource_id}_"] = {
'mesh': f"{str(self.mesh_path)}/device_mesh/resources/{model_config['children_mesh']}",
'mesh_tf': model_config['children_mesh_tf'],
}
def load_from_resource_tree(self):
"""从 resource_tracker 中读取资源树,自动构建 resource_config_dict / resource_model 并刷新 TF"""
new_config_dict: dict = {}
def _collect_plr_resource(res, parent_uuid: Optional[str] = None):
res_uuid = getattr(res, 'unilabos_uuid', None)
if not res_uuid:
res_uuid = str(uuid.uuid4())
extra = getattr(res, 'unilabos_extra', {}) or {}
resource_class = extra.get(EXTRA_CLASS, '')
location = getattr(res, 'location', None)
pos_x = float(location.x) if location else 0.0
pos_y = float(location.y) if location else 0.0
pos_z = float(location.z) if location else 0.0
rotation = extra.get('rotation', {'x': 0, 'y': 0, 'z': 0})
new_config_dict[res_uuid] = {
'id': res.name,
'uuid': res_uuid,
'class': resource_class,
'parent_uuid': parent_uuid,
'pose': {
'position': {'x': pos_x, 'y': pos_y, 'z': pos_z},
'rotation': rotation,
},
}
for child in getattr(res, 'children', []) or []:
_collect_plr_resource(child, res_uuid)
for resource in self.resource_tracker.resources:
root_parent_uuid = None
plr_parent = getattr(resource, 'parent', None)
if plr_parent is not None:
root_parent_uuid = getattr(plr_parent, 'unilabos_uuid', None)
_collect_plr_resource(resource, root_parent_uuid)
if not new_config_dict:
self.get_logger().warning("resource_tracker 中没有找到任何资源")
return
self.resource_config_dict = {**self.resource_config_dict, **new_config_dict}
self._build_resource_model_for_config(new_config_dict)
tf_dict = self.resource_mesh_setup(new_config_dict)
self.resource_tf_dict = {**self.resource_tf_dict, **tf_dict}
self.publish_resource_tf()
if self.move_group_ready:
self.add_resource_collision_meshes(tf_dict)
self.get_logger().info(f"从资源树加载了 {len(new_config_dict)} 个资源")
def _reload_resource_mesh_callback(self, goal_handle: ServerGoalHandle):
"""ActionServer 回调:重新从资源树加载所有 mesh"""
try:
self.load_from_resource_tree()
except Exception as e:
self.get_logger().error(f"重新加载资源失败: {e}")
goal_handle.abort()
return SendCmd.Result(success=False)
goal_handle.succeed()
return SendCmd.Result(success=True)
def add_resource_mesh_callback(self, goal_handle: ServerGoalHandle):
tf_update_msg = goal_handle.request tf_update_msg = goal_handle.request
try: try:
parsed = json.loads(tf_update_msg.command.replace("'", '"')) self.add_resource_mesh(tf_update_msg.command)
if 'resources' in parsed:
for res_config in parsed['resources']:
self.add_resource_mesh(json.dumps(res_config))
else:
self.add_resource_mesh(tf_update_msg.command)
except Exception as e: except Exception as e:
self.get_logger().error(f"添加资源失败: {e}") self.get_logger().error(f"添加资源失败: {e}")
goal_handle.abort() goal_handle.abort()
@@ -273,48 +151,45 @@ class ResourceMeshManager(BaseROS2DeviceNode):
goal_handle.succeed() goal_handle.succeed()
return SendCmd.Result(success=True) return SendCmd.Result(success=True)
def add_resource_mesh(self, resource_config_str: str): def add_resource_mesh(self,resource_config_str:str):
"""添加单个资源的 mesh 配置""" """刷新资源配置"""
registry = lab_registry registry = lab_registry
resource_config = json.loads(resource_config_str.replace("'", '"')) resource_config = json.loads(resource_config_str.replace("'",'"'))
if resource_config['id'] in self.resource_config_dict: if resource_config['id'] in self.resource_config_dict:
self.get_logger().info(f'资源 {resource_config["id"]} 已存在') self.get_logger().info(f'资源 {resource_config["id"]} 已存在')
return return
resource_class = resource_config.get('class', '') if resource_config['class'] in registry.resource_type_registry.keys():
if resource_class and resource_class in registry.resource_type_registry: model_config = registry.resource_type_registry[resource_config['class']]['model']
reg_entry = registry.resource_type_registry[resource_class] if model_config['type'] == 'resource':
if 'model' in reg_entry: self.resource_model[resource_config['id']] = {
model_config = reg_entry['model'] 'mesh': f"{str(self.mesh_path)}/device_mesh/resources/{model_config['mesh']}",
if model_config.get('type') == 'resource': 'mesh_tf': model_config['mesh_tf']}
self.resource_model[resource_config['id']] = { if 'children_mesh' in model_config.keys():
'mesh': f"{str(self.mesh_path)}/device_mesh/resources/{model_config['mesh']}", self.resource_model[f"{resource_config['id']}_"] = {
'mesh_tf': model_config['mesh_tf'], 'mesh': f"{str(self.mesh_path)}/device_mesh/resources/{model_config['children_mesh']}",
'mesh_tf': model_config['children_mesh_tf']
} }
if model_config.get('children_mesh') is not None:
self.resource_model[f"{resource_config['id']}_"] = {
'mesh': f"{str(self.mesh_path)}/device_mesh/resources/{model_config['children_mesh']}",
'mesh_tf': model_config['children_mesh_tf'],
}
resources = initialize_resources([resource_config]) resources = initialize_resources([resource_config])
resource_dict = {item['id']: item for item in resources} resource_dict = {item['id']: item for item in resources}
self.resource_config_dict = {**self.resource_config_dict, **resource_dict} self.resource_config_dict = {**self.resource_config_dict,**resource_dict}
tf_dict = self.resource_mesh_setup(resource_dict) tf_dict = self.resource_mesh_setup(resource_dict)
self.resource_tf_dict = {**self.resource_tf_dict, **tf_dict} self.resource_tf_dict = {**self.resource_tf_dict,**tf_dict}
self.publish_resource_tf() self.publish_resource_tf()
self.add_resource_collision_meshes(tf_dict) self.add_resource_collision_meshes(tf_dict)
def resource_mesh_setup(self, resource_config_dict: dict):
"""根据资源配置字典设置 TF 关系""" def resource_mesh_setup(self, resource_config_dict:dict):
"""move_group初始化完成后的设置"""
self.get_logger().info('开始设置资源网格管理器') self.get_logger().info('开始设置资源网格管理器')
#遍历resource_config中的资源配置判断panent是否在resource_model中
resource_tf_dict = {} resource_tf_dict = {}
for resource_uuid, resource_config in resource_config_dict.items(): for resource_uuid, resource_config in resource_config_dict.items():
parent = None parent = None
resource_id = resource_config['id'] resource_id = resource_config['id']
parent_uuid = resource_config.get('parent_uuid') if resource_config['parent_uuid'] is not None and resource_config['parent_uuid'] != "":
if parent_uuid is not None and parent_uuid != "": parent = resource_config_dict[resource_config['parent_uuid']]['id']
parent_entry = resource_config_dict.get(parent_uuid) or self.resource_config_dict.get(parent_uuid)
parent = parent_entry['id'] if parent_entry else None
parent_link = 'world' parent_link = 'world'
if parent in self.resource_model: if parent in self.resource_model:

View File

@@ -8,8 +8,8 @@
"parent": "", "parent": "",
"pose": { "pose": {
"size": { "size": {
"width": 550, "width": 562,
"height": 400, "height": 394,
"depth": 0 "depth": 0
} }
}, },
@@ -55,9 +55,9 @@
}, },
"config": { "config": {
"type": "PRCXI9300Deck", "type": "PRCXI9300Deck",
"size_x": 550, "size_x": 542,
"size_y": 400, "size_y": 374,
"size_z": 17, "size_z": 0,
"rotation": { "rotation": {
"x": 0, "x": 0,
"y": 0, "y": 0,

View File

@@ -51,7 +51,6 @@
-------------------------------------------------------------------------------- --------------------------------------------------------------------------------
- 遍历 workflow 数组,为每个动作创建步骤节点 - 遍历 workflow 数组,为每个动作创建步骤节点
- 参数重命名: asp_vol -> asp_vols, dis_vol -> dis_vols, asp_flow_rate -> asp_flow_rates, dis_flow_rate -> dis_flow_rates - 参数重命名: asp_vol -> asp_vols, dis_vol -> dis_vols, asp_flow_rate -> asp_flow_rates, dis_flow_rate -> dis_flow_rates
- 参数输入转换: liquid_height按 wells 扩展mix_stage/mix_times/mix_vol/mix_rate/mix_liquid_height 保持标量
- 参数扩展: 根据 targets 的 wells 数量,将单值扩展为数组 - 参数扩展: 根据 targets 的 wells 数量,将单值扩展为数组
例: asp_vol=100.0, targets 有 3 个 wells -> asp_vols=[100.0, 100.0, 100.0] 例: asp_vol=100.0, targets 有 3 个 wells -> asp_vols=[100.0, 100.0, 100.0]
- 连接处理: 如果 sources/targets 已通过 set_liquid_from_plate 连接,参数值改为 [] - 连接处理: 如果 sources/targets 已通过 set_liquid_from_plate 连接,参数值改为 []
@@ -120,14 +119,11 @@ DEVICE_NAME_DEFAULT = "PRCXI" # transfer_liquid, set_liquid_from_plate 等动
# 节点类型 # 节点类型
NODE_TYPE_DEFAULT = "ILab" # 所有节点的默认类型 NODE_TYPE_DEFAULT = "ILab" # 所有节点的默认类型
CLASS_NAMES_MAPPING = {
"plate": "PRCXI_BioER_96_wellplate",
"tip_rack": "PRCXI_300ul_Tips",
}
# create_resource 节点默认参数 # create_resource 节点默认参数
CREATE_RESOURCE_DEFAULTS = { CREATE_RESOURCE_DEFAULTS = {
"device_id": "/PRCXI", "device_id": "/PRCXI",
"parent_template": "/PRCXI/PRCXI_Deck", "parent_template": "/PRCXI/PRCXI_Deck",
"class_name": "PRCXI_BioER_96_wellplate",
} }
# 默认液体体积 (uL) # 默认液体体积 (uL)
@@ -142,263 +138,6 @@ PARAM_RENAME_MAPPING = {
} }
def _map_deck_slot(raw_slot: str, object_type: str = "") -> str:
"""协议槽位 -> 实际 deck4→138→1412+trash→16其余不变。"""
s = "" if raw_slot is None else str(raw_slot).strip()
if not s:
return ""
if s == "12" and (object_type or "").strip().lower() == "trash":
return "16"
return {"4": "13", "8": "14"}.get(s, s)
def _labware_def_index(labware_defs: Optional[List[Dict[str, Any]]]) -> Dict[str, Dict[str, Any]]:
m: Dict[str, Dict[str, Any]] = {}
for d in labware_defs or []:
for k in ("id", "name", "reagent_id", "reagent"):
key = d.get(k)
if key is not None and str(key):
m[str(key)] = d
return m
def _labware_hint_text(labware_id: str, item: Dict[str, Any]) -> str:
"""合并 id 与协议里的 labware 描述OpenTrons 全名常在 labware 字段)。"""
parts = [str(labware_id), str(item.get("labware", "") or "")]
return " ".join(parts).lower()
def _infer_reagent_kind(labware_id: str, item: Dict[str, Any]) -> str:
ot = (item.get("object") or "").strip().lower()
if ot == "trash":
return "trash"
if ot == "tiprack":
return "tip_rack"
lid = _labware_hint_text(labware_id, item)
if "trash" in lid:
return "trash"
# tiprack / tip + rack顺序在 tuberack 之前)
if "tiprack" in lid or ("tip" in lid and "rack" in lid):
return "tip_rack"
# 离心管架 / OpenTrons tuberack勿与 96 tiprack 混淆)
if "tuberack" in lid or "tube_rack" in lid:
return "tube_rack"
if "eppendorf" in lid and "rack" in lid:
return "tube_rack"
if "safelock" in lid and "rack" in lid:
return "tube_rack"
if "rack" in lid and "tip" not in lid:
return "tube_rack"
return "plate"
def _infer_tube_rack_num_positions(labware_id: str, item: Dict[str, Any]) -> int:
"""从 ``24_tuberack`` 等命名中解析孔位数;解析不到则默认 24与 PRCXI_EP_Adapter 4×6 一致)。"""
hint = _labware_hint_text(labware_id, item)
m = re.search(r"(\d+)_tuberack", hint)
if m:
return int(m.group(1))
m = re.search(r"tuberack[_\s]*(\d+)", hint)
if m:
return int(m.group(1))
m = re.search(r"(\d+)\s*[-_]?\s*pos(?:ition)?s?", hint)
if m:
return int(m.group(1))
return 96
def _tip_volume_hint(item: Dict[str, Any], labware_id: str) -> Optional[float]:
s = _labware_hint_text(labware_id, item)
for v in (1250, 1000, 300, 200, 10):
if f"{v}ul" in s or f"{v}μl" in s or f"{v}u" in s:
return float(v)
if f" {v} " in f" {s} ":
return float(v)
return None
def _volume_template_covers_requirement(template: Dict[str, Any], req: Optional[float], kind: str) -> bool:
"""有明确需求体积时,模板标称 Volume 必须 >= 需求;无 Volume 的模板不参与trash 除外)。"""
if kind == "trash":
return True
if req is None or req <= 0:
return True
mv = float(template.get("Volume") or 0)
if mv <= 0:
return False
return mv >= req
def _direct_labware_class_name(item: Dict[str, Any]) -> str:
"""仅用于 tip_rack 且 ``preserve_tip_rack_incoming_class=True````class_name``/``class`` 原样;否则 ``labware`` → ``lab_*``。"""
explicit = item.get("class_name") or item.get("class")
if explicit is not None and str(explicit).strip() != "":
return str(explicit).strip()
lw = str(item.get("labware", "") or "").strip()
if lw:
return f"lab_{lw.lower().replace('.', 'point').replace(' ', '_')}"
return ""
def _match_score_prcxi_template(
template: Dict[str, Any],
num_children: int,
child_max_volume: Optional[float],
) -> float:
"""孔数差主导;有需求体积且模板已满足 >= 时,余量比例 (模板-需求)/需求 越小越好(优先选刚好够的)。"""
hole_count = int(template.get("hole_count") or 0)
hole_diff = abs(num_children - hole_count)
material_volume = float(template.get("Volume") or 0)
req = child_max_volume
if req is not None and req > 0 and material_volume > 0:
vol_diff = (material_volume - req) / max(req, 1e-9)
elif material_volume > 0 and req is not None:
vol_diff = abs(float(req) - material_volume) / material_volume
else:
vol_diff = 0.0
return hole_diff * 1000 + vol_diff
def _apply_prcxi_labware_auto_match(
labware_info: Dict[str, Dict[str, Any]],
labware_defs: Optional[List[Dict[str, Any]]] = None,
*,
preserve_tip_rack_incoming_class: bool = True,
) -> None:
"""上传构建图前:按孔数+容量将 reagent 条目匹配到 ``prcxi_labware`` 注册类名,写入 ``prcxi_class_name``。
若给出需求体积,仅选用模板标称 Volume >= 该值的物料,并在满足条件的模板中选余量最小者。
``preserve_tip_rack_incoming_class=True``(默认)时:**仅 tip_rack** 不做模板匹配,类名由 ``class_name``/``class`` 或
``labware````lab_*``)直接给出;**plate / tube_rack / trash 等**仍按注册模板匹配。
``False`` 时 **全部**(含 tip_rack走模板匹配。"""
if not labware_info:
return
default_prcxi_tip_class = CLASS_NAMES_MAPPING.get("tip_rack", "PRCXI_300ul_Tips")
try:
from unilabos.devices.liquid_handling.prcxi.prcxi_labware import get_prcxi_labware_template_specs
except Exception:
return
templates = get_prcxi_labware_template_specs()
if not templates:
return
def_map = _labware_def_index(labware_defs)
for labware_id, item in labware_info.items():
if item.get("prcxi_class_name"):
continue
kind = _infer_reagent_kind(labware_id, item)
if preserve_tip_rack_incoming_class and kind == "tip_rack":
inc_s = _direct_labware_class_name(item)
if inc_s == default_prcxi_tip_class:
inc_s = ""
if inc_s:
item["prcxi_class_name"] = inc_s
continue
explicit = item.get("class_name") or item.get("class")
if explicit and str(explicit).startswith("PRCXI_"):
item["prcxi_class_name"] = str(explicit)
continue
extra = def_map.get(str(labware_id), {})
wells = item.get("well") or []
well_n = len(wells) if isinstance(wells, list) else 0
num_from_def = int(extra.get("num_wells") or extra.get("well_count") or item.get("num_wells") or 0)
if kind == "trash":
num_children = 0
elif kind == "tip_rack":
num_children = num_from_def if num_from_def > 0 else 96
elif kind == "tube_rack":
if num_from_def > 0:
num_children = num_from_def
elif well_n > 0:
num_children = well_n
else:
num_children = _infer_tube_rack_num_positions(labware_id, item)
else:
num_children = num_from_def if num_from_def > 0 else 96
child_max_volume = item.get("max_volume")
if child_max_volume is None:
child_max_volume = extra.get("max_volume")
try:
child_max_volume_f = float(child_max_volume) if child_max_volume is not None else None
except (TypeError, ValueError):
child_max_volume_f = None
if kind == "tip_rack" and child_max_volume_f is None:
child_max_volume_f = _tip_volume_hint(item, labware_id) or 300.0
candidates = [t for t in templates if t["kind"] == kind]
if not candidates:
continue
best = None
best_score = float("inf")
for t in candidates:
if kind != "trash" and int(t.get("hole_count") or 0) <= 0:
continue
if not _volume_template_covers_requirement(t, child_max_volume_f, kind):
continue
sc = _match_score_prcxi_template(t, num_children, child_max_volume_f)
if sc < best_score:
best_score = sc
best = t
if best:
item["prcxi_class_name"] = best["class_name"]
def _reconcile_slot_carrier_prcxi_class(
labware_info: Dict[str, Dict[str, Any]],
*,
preserve_tip_rack_incoming_class: bool = False,
) -> None:
"""同一 deck 槽位上多条 reagent 时,按载体类型优先级统一 ``prcxi_class_name``,避免先遍历到 96 板后槽位被错误绑定。
``preserve_tip_rack_incoming_class=True`` 时tip_rack 条目不参与同槽类名合并(不被覆盖、也不把 tip 类名扩散到同槽其它条目)。"""
by_slot: Dict[str, List[Tuple[str, Dict[str, Any]]]] = {}
for lid, item in labware_info.items():
ot = item.get("object", "") or ""
slot = _map_deck_slot(str(item.get("slot", "")), ot)
if not slot:
continue
by_slot.setdefault(str(slot), []).append((lid, item))
priority = {"trash": 0, "tube_rack": 1, "tip_rack": 2, "plate": 3}
for _slot, pairs in by_slot.items():
if len(pairs) < 2:
continue
def _rank(p: Tuple[str, Dict[str, Any]]) -> int:
return priority.get(_infer_reagent_kind(p[0], p[1]), 9)
pairs_sorted = sorted(pairs, key=_rank)
best_cls = None
for lid, it in pairs_sorted:
if preserve_tip_rack_incoming_class and _infer_reagent_kind(lid, it) == "tip_rack":
continue
c = it.get("prcxi_class_name")
if c:
best_cls = c
break
if not best_cls:
continue
for lid, it in pairs:
if preserve_tip_rack_incoming_class and _infer_reagent_kind(lid, it) == "tip_rack":
continue
it["prcxi_class_name"] = best_cls
# ---------------- Graph ---------------- # ---------------- Graph ----------------
@@ -624,77 +363,23 @@ def build_protocol_graph(
workstation_name: str, workstation_name: str,
action_resource_mapping: Optional[Dict[str, str]] = None, action_resource_mapping: Optional[Dict[str, str]] = None,
labware_defs: Optional[List[Dict[str, Any]]] = None, labware_defs: Optional[List[Dict[str, Any]]] = None,
preserve_tip_rack_incoming_class: bool = True,
) -> WorkflowGraph: ) -> WorkflowGraph:
"""统一的协议图构建函数,根据设备类型自动选择构建逻辑 """统一的协议图构建函数,根据设备类型自动选择构建逻辑
Args: Args:
labware_info: labware 信息字典,格式为 {name: {slot, well, labware, ...}, ...} labware_info: reagent 信息字典,格式为 {name: {slot, well}, ...},用于 set_liquid 和 well 查找
protocol_steps: 协议步骤列表 protocol_steps: 协议步骤列表
workstation_name: 工作站名称 workstation_name: 工作站名称
action_resource_mapping: action 到 resource_name 的映射字典,可选 action_resource_mapping: action 到 resource_name 的映射字典,可选
labware_defs: 可选,``[{"id": "...", "num_wells": 96, "max_volume": 2200}, ...]`` 等,辅助 PRCXI 模板匹配 labware_defs: labware 定义列表,格式为 [{"name": "...", "slot": "1", "type": "lab_xxx"}, ...]
preserve_tip_rack_incoming_class: 默认 True 时**仅 tip_rack** 不跑模板匹配(类名由传入的 class/labware 决定);
**其它载体**仍按 PRCXI 模板匹配。False 时 **全部**(含 tip_rack都走模板匹配。
""" """
G = WorkflowGraph() G = WorkflowGraph()
resource_last_writer = {} # reagent_name -> "node_id:port" resource_last_writer = {} # reagent_name -> "node_id:port"
slot_to_create_resource = {} # slot -> create_resource node_id slot_to_create_resource = {} # slot -> create_resource node_id
_apply_prcxi_labware_auto_match(
labware_info,
labware_defs,
preserve_tip_rack_incoming_class=preserve_tip_rack_incoming_class,
)
_reconcile_slot_carrier_prcxi_class(
labware_info,
preserve_tip_rack_incoming_class=preserve_tip_rack_incoming_class,
)
protocol_steps = refactor_data(protocol_steps, action_resource_mapping) protocol_steps = refactor_data(protocol_steps, action_resource_mapping)
# ==================== 第一步:按 slot 去重创建 create_resource 节点 ==================== # ==================== 第一步:按 slot 创建 create_resource 节点 ====================
# 按槽聚合:同一 slot 多条 reagent 时不能只取遍历顺序第一条,否则 tip 的 prcxi_class_name / object 会被其它条目盖住
by_slot: Dict[str, List[Tuple[str, Dict[str, Any]]]] = {}
for labware_id, item in labware_info.items():
object_type = item.get("object", "") or ""
slot = _map_deck_slot(str(item.get("slot", "")), object_type)
if not slot:
continue
by_slot.setdefault(slot, []).append((labware_id, item))
slots_info: Dict[str, Dict[str, Any]] = {}
for slot, pairs in by_slot.items():
def _ot_tip(it: Dict[str, Any]) -> bool:
return str(it.get("object", "") or "").strip().lower() == "tiprack"
tip_pairs = [(lid, it) for lid, it in pairs if _ot_tip(it)]
chosen_lid = ""
chosen_item: Dict[str, Any] = {}
prcxi_val: Optional[str] = None
scan = tip_pairs if tip_pairs else pairs
for lid, it in scan:
c = it.get("prcxi_class_name")
if c:
chosen_lid, chosen_item, prcxi_val = lid, it, str(c)
break
if not chosen_lid and scan:
chosen_lid, chosen_item = scan[0]
pv = chosen_item.get("prcxi_class_name")
prcxi_val = str(pv) if pv else None
labware = str(chosen_item.get("labware", "") or "")
res_id = f"{labware}_slot_{slot}" if labware.strip() else f"{chosen_lid}_slot_{slot}"
res_id = res_id.replace(" ", "_")
slots_info[slot] = {
"labware": labware,
"res_id": res_id,
"labware_id": chosen_lid,
"object": chosen_item.get("object", "") or "",
"prcxi_class_name": prcxi_val,
}
# 创建 Group 节点,包含所有 create_resource 节点 # 创建 Group 节点,包含所有 create_resource 节点
group_node_id = str(uuid.uuid4()) group_node_id = str(uuid.uuid4())
G.add_node( G.add_node(
@@ -710,54 +395,41 @@ def build_protocol_graph(
param=None, param=None,
) )
trash_create_node_id = None # 记录 trash 的 create_resource 节点 # 直接使用 JSON 中的 labware 定义,每个 slot 一条记录type 即 class_name
res_index = 0
for lw in (labware_defs or []):
slot = str(lw.get("slot", ""))
if not slot or slot in slot_to_create_resource:
continue # 跳过空 slot 或已处理的 slot
# 为每个唯一的 slot 创建 create_resource 节点 lw_name = lw.get("name", f"slot {slot}")
for slot, info in slots_info.items(): lw_type = lw.get("type", CREATE_RESOURCE_DEFAULTS["class_name"])
res_id = f"plate_slot_{slot}"
res_index += 1
node_id = str(uuid.uuid4()) node_id = str(uuid.uuid4())
res_id = info["res_id"]
object_type = info.get("object", "") or ""
ot_lo = str(object_type).strip().lower()
matched = info.get("prcxi_class_name")
if ot_lo == "trash":
res_type_name = "PRCXI_trash"
elif matched:
res_type_name = matched
elif ot_lo == "tiprack":
if preserve_tip_rack_incoming_class:
lid = str(info.get("labware_id") or "").strip() or "tip_rack"
res_type_name = f"lab_{lid.lower().replace('.', 'point').replace(' ', '_')}"
else:
res_type_name = CLASS_NAMES_MAPPING.get("tip_rack", "PRCXI_300ul_Tips")
else:
res_type_name = f"lab_{info['labware'].lower().replace('.', 'point')}"
G.add_node( G.add_node(
node_id, node_id,
template_name="create_resource", template_name="create_resource",
resource_name="host_node", resource_name="host_node",
name=f"{res_type_name}_slot{slot}", name=lw_name,
description=f"Create plate on slot {slot}", description=f"Create {lw_name}",
lab_node_type="Labware", lab_node_type="Labware",
footer="create_resource-host_node", footer="create_resource-host_node",
device_name=DEVICE_NAME_HOST, device_name=DEVICE_NAME_HOST,
type=NODE_TYPE_DEFAULT, type=NODE_TYPE_DEFAULT,
parent_uuid=group_node_id, # 指向 Group 节点 parent_uuid=group_node_id,
minimized=True, # 折叠显示 minimized=True,
param={ param={
"res_id": res_id, "res_id": res_id,
"device_id": CREATE_RESOURCE_DEFAULTS["device_id"], "device_id": CREATE_RESOURCE_DEFAULTS["device_id"],
"class_name": res_type_name, "class_name": lw_type,
"parent": CREATE_RESOURCE_DEFAULTS["parent_template"].format(slot=slot), "parent": CREATE_RESOURCE_DEFAULTS["parent_template"],
"bind_locations": {"x": 0.0, "y": 0.0, "z": 0.0}, "bind_locations": {"x": 0.0, "y": 0.0, "z": 0.0},
"slot_on_deck": slot, "slot_on_deck": slot,
}, },
) )
slot_to_create_resource[slot] = node_id slot_to_create_resource[slot] = node_id
if ot_lo == "tiprack":
resource_last_writer[info["labware_id"]] = f"{node_id}:labware"
if ot_lo == "trash":
trash_create_node_id = node_id
# create_resource 之间不需要 ready 连接
# ==================== 第二步:为每个 reagent 创建 set_liquid_from_plate 节点 ==================== # ==================== 第二步:为每个 reagent 创建 set_liquid_from_plate 节点 ====================
# 创建 Group 节点,包含所有 set_liquid_from_plate 节点 # 创建 Group 节点,包含所有 set_liquid_from_plate 节点
@@ -784,8 +456,7 @@ def build_protocol_graph(
if item.get("type") == "hardware": if item.get("type") == "hardware":
continue continue
object_type = item.get("object", "") or "" slot = str(item.get("slot", ""))
slot = _map_deck_slot(str(item.get("slot", "")), object_type)
wells = item.get("well", []) wells = item.get("well", [])
if not wells or not slot: if not wells or not slot:
continue continue
@@ -793,7 +464,6 @@ def build_protocol_graph(
# res_id 不能有空格 # res_id 不能有空格
res_id = str(labware_id).replace(" ", "_") res_id = str(labware_id).replace(" ", "_")
well_count = len(wells) well_count = len(wells)
liquid_volume = DEFAULT_LIQUID_VOLUME if object_type == "source" else 0
node_id = str(uuid.uuid4()) node_id = str(uuid.uuid4())
set_liquid_index += 1 set_liquid_index += 1
@@ -814,7 +484,7 @@ def build_protocol_graph(
"plate": [], # 通过连接传递 "plate": [], # 通过连接传递
"well_names": wells, # 孔位名数组,如 ["A1", "A3", "A5"] "well_names": wells, # 孔位名数组,如 ["A1", "A3", "A5"]
"liquid_names": [res_id] * well_count, "liquid_names": [res_id] * well_count,
"volumes": [liquid_volume] * well_count, "volumes": [DEFAULT_LIQUID_VOLUME] * well_count,
}, },
) )
@@ -828,12 +498,8 @@ def build_protocol_graph(
# set_liquid_from_plate 的输出 output_wells 用于连接 transfer_liquid # set_liquid_from_plate 的输出 output_wells 用于连接 transfer_liquid
resource_last_writer[labware_id] = f"{node_id}:output_wells" resource_last_writer[labware_id] = f"{node_id}:output_wells"
# 收集所有 create_resource 节点 ID用于让第一个 transfer_liquid 等待所有资源创建完成 # transfer_liquid 之间通过 ready 串联,从 None 开始
all_create_resource_node_ids = list(slot_to_create_resource.values()) last_control_node_id = None
# transfer_liquid 之间通过 ready 串联;第一个 transfer_liquid 需要等待所有 create_resource 完成
last_control_node_id = trash_create_node_id
is_first_action_node = True
# 端口名称映射JSON 字段名 -> 实际 handle key # 端口名称映射JSON 字段名 -> 实际 handle key
INPUT_PORT_MAPPING = { INPUT_PORT_MAPPING = {
@@ -845,7 +511,6 @@ def build_protocol_graph(
"reagent": "reagent", "reagent": "reagent",
"solvent": "solvent", "solvent": "solvent",
"compound": "compound", "compound": "compound",
"tip_racks": "tip_rack_identifier",
} }
OUTPUT_PORT_MAPPING = { OUTPUT_PORT_MAPPING = {
@@ -860,17 +525,8 @@ def build_protocol_graph(
"compound": "compound", "compound": "compound",
} }
# 需要根据 wells 数量扩展的参数列表 # 需要根据 wells 数量扩展的参数列表(复数形式)
# - 复数参数asp_vols 等)支持单值自动扩展 EXPAND_BY_WELLS_PARAMS = ["asp_vols", "dis_vols", "asp_flow_rates", "dis_flow_rates"]
# - liquid_height 按 wells 扩展为数组
# - mix_* 参数保持标量,避免被转换为 list
EXPAND_BY_WELLS_PARAMS = [
"asp_vols",
"dis_vols",
"asp_flow_rates",
"dis_flow_rates",
"liquid_height",
]
# 处理协议步骤 # 处理协议步骤
for step in protocol_steps: for step in protocol_steps:
@@ -884,57 +540,6 @@ def build_protocol_graph(
if old_name in params: if old_name in params:
params[new_name] = params.pop(old_name) params[new_name] = params.pop(old_name)
# touch_tip 输入归一化:
# - 支持 bool / 0/1 / "true"/"false" / 单元素 list
# - 最终统一为 bool 标量,避免被下游误当作序列处理
if "touch_tip" in params:
touch_tip_value = params.get("touch_tip")
if isinstance(touch_tip_value, list):
if len(touch_tip_value) == 1:
touch_tip_value = touch_tip_value[0]
elif len(touch_tip_value) == 0:
touch_tip_value = False
else:
warnings.append(f"touch_tip 期望标量,但收到长度为 {len(touch_tip_value)} 的列表,使用首个值")
touch_tip_value = touch_tip_value[0]
if isinstance(touch_tip_value, str):
norm = touch_tip_value.strip().lower()
if norm in {"true", "1", "yes", "y", "on"}:
touch_tip_value = True
elif norm in {"false", "0", "no", "n", "off", ""}:
touch_tip_value = False
else:
warnings.append(f"touch_tip 字符串值无法识别: {touch_tip_value},按 True 处理")
touch_tip_value = True
elif isinstance(touch_tip_value, (int, float)):
touch_tip_value = bool(touch_tip_value)
elif touch_tip_value is None:
touch_tip_value = False
else:
touch_tip_value = bool(touch_tip_value)
params["touch_tip"] = touch_tip_value
# delays 输入归一化:
# - 支持标量int/float/字符串数字)与 list
# - 最终统一为数字列表,供下游按 delays[0]/delays[1] 使用
if "delays" in params:
delays_value = params.get("delays")
if delays_value is None or delays_value == "":
params["delays"] = []
else:
raw_list = delays_value if isinstance(delays_value, list) else [delays_value]
normalized_delays = []
for delay_item in raw_list:
if isinstance(delay_item, str):
delay_item = delay_item.strip()
if delay_item == "":
continue
try:
normalized_delays.append(float(delay_item))
except (TypeError, ValueError):
warnings.append(f"delays 包含无法转换为数字的值: {delay_item},已忽略")
params["delays"] = normalized_delays
# 处理输入连接 # 处理输入连接
for param_key, target_port in INPUT_PORT_MAPPING.items(): for param_key, target_port in INPUT_PORT_MAPPING.items():
resource_name = params.get(param_key) resource_name = params.get(param_key)
@@ -1001,12 +606,7 @@ def build_protocol_graph(
G.add_node(node_id, **step_copy) G.add_node(node_id, **step_copy)
# 控制流 # 控制流
if is_first_action_node: if last_control_node_id is not None:
# 第一个 transfer_liquid 需要等待所有 create_resource 完成
for cr_node_id in all_create_resource_node_ids:
G.add_edge(cr_node_id, node_id, source_port="ready", target_port="ready")
is_first_action_node = False
elif last_control_node_id is not None:
G.add_edge(last_control_node_id, node_id, source_port="ready", target_port="ready") G.add_edge(last_control_node_id, node_id, source_port="ready", target_port="ready")
last_control_node_id = node_id last_control_node_id = node_id

View File

@@ -210,7 +210,6 @@ def convert_from_json(
data: Union[str, PathLike, Dict[str, Any]], data: Union[str, PathLike, Dict[str, Any]],
workstation_name: str = DEFAULT_WORKSTATION, workstation_name: str = DEFAULT_WORKSTATION,
validate: bool = True, validate: bool = True,
preserve_tip_rack_incoming_class: bool = True,
) -> WorkflowGraph: ) -> WorkflowGraph:
""" """
从 JSON 数据或文件转换为 WorkflowGraph 从 JSON 数据或文件转换为 WorkflowGraph
@@ -222,8 +221,6 @@ def convert_from_json(
data: JSON 文件路径、字典数据、或 JSON 字符串 data: JSON 文件路径、字典数据、或 JSON 字符串
workstation_name: 工作站名称,默认 "PRCXi" workstation_name: 工作站名称,默认 "PRCXi"
validate: 是否校验句柄配置,默认 True validate: 是否校验句柄配置,默认 True
preserve_tip_rack_incoming_class: True默认时仅 tip_rack 不跑模板、按传入类名/labware其它载体仍自动匹配。
False 时全部走模板。JSON 根 ``preserve_tip_rack_incoming_class`` 可覆盖此参数。
Returns: Returns:
WorkflowGraph: 构建好的工作流图 WorkflowGraph: 构建好的工作流图
@@ -266,10 +263,6 @@ def convert_from_json(
# reagent 已经是字典格式,用于 set_liquid 和 well 数量查找 # reagent 已经是字典格式,用于 set_liquid 和 well 数量查找
labware_info = reagent labware_info = reagent
preserve = preserve_tip_rack_incoming_class
if "preserve_tip_rack_incoming_class" in json_data:
preserve = bool(json_data["preserve_tip_rack_incoming_class"])
# 构建工作流图 # 构建工作流图
graph = build_protocol_graph( graph = build_protocol_graph(
labware_info=labware_info, labware_info=labware_info,
@@ -277,7 +270,6 @@ def convert_from_json(
workstation_name=workstation_name, workstation_name=workstation_name,
action_resource_mapping=ACTION_RESOURCE_MAPPING, action_resource_mapping=ACTION_RESOURCE_MAPPING,
labware_defs=labware_defs, labware_defs=labware_defs,
preserve_tip_rack_incoming_class=preserve,
) )
# 校验句柄配置 # 校验句柄配置
@@ -295,7 +287,6 @@ def convert_from_json(
def convert_json_to_node_link( def convert_json_to_node_link(
data: Union[str, PathLike, Dict[str, Any]], data: Union[str, PathLike, Dict[str, Any]],
workstation_name: str = DEFAULT_WORKSTATION, workstation_name: str = DEFAULT_WORKSTATION,
preserve_tip_rack_incoming_class: bool = True,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
将 JSON 数据转换为 node-link 格式的字典 将 JSON 数据转换为 node-link 格式的字典
@@ -307,18 +298,13 @@ def convert_json_to_node_link(
Returns: Returns:
Dict: node-link 格式的工作流数据 Dict: node-link 格式的工作流数据
""" """
graph = convert_from_json( graph = convert_from_json(data, workstation_name)
data,
workstation_name,
preserve_tip_rack_incoming_class=preserve_tip_rack_incoming_class,
)
return graph.to_node_link_dict() return graph.to_node_link_dict()
def convert_json_to_workflow_list( def convert_json_to_workflow_list(
data: Union[str, PathLike, Dict[str, Any]], data: Union[str, PathLike, Dict[str, Any]],
workstation_name: str = DEFAULT_WORKSTATION, workstation_name: str = DEFAULT_WORKSTATION,
preserve_tip_rack_incoming_class: bool = True,
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
""" """
将 JSON 数据转换为工作流列表格式 将 JSON 数据转换为工作流列表格式
@@ -330,9 +316,5 @@ def convert_json_to_workflow_list(
Returns: Returns:
List: 工作流节点列表 List: 工作流节点列表
""" """
graph = convert_from_json( graph = convert_from_json(data, workstation_name)
data,
workstation_name,
preserve_tip_rack_incoming_class=preserve_tip_rack_incoming_class,
)
return graph.to_dict() return graph.to_dict()

View File

@@ -234,7 +234,6 @@ def convert_from_json(
data: Union[str, PathLike, Dict[str, Any]], data: Union[str, PathLike, Dict[str, Any]],
workstation_name: str = "PRCXi", workstation_name: str = "PRCXi",
validate: bool = True, validate: bool = True,
preserve_tip_rack_incoming_class: bool = True,
) -> WorkflowGraph: ) -> WorkflowGraph:
""" """
从 JSON 数据或文件转换为 WorkflowGraph 从 JSON 数据或文件转换为 WorkflowGraph
@@ -247,7 +246,6 @@ def convert_from_json(
data: JSON 文件路径、字典数据、或 JSON 字符串 data: JSON 文件路径、字典数据、或 JSON 字符串
workstation_name: 工作站名称,默认 "PRCXi" workstation_name: 工作站名称,默认 "PRCXi"
validate: 是否校验句柄配置,默认 True validate: 是否校验句柄配置,默认 True
preserve_tip_rack_incoming_class: True 时仅 tip 不跑模板False 时全部匹配JSON 根字段同名可覆盖
Returns: Returns:
WorkflowGraph: 构建好的工作流图 WorkflowGraph: 构建好的工作流图
@@ -297,17 +295,12 @@ def convert_from_json(
"3. {'steps': [...], 'labware': [...]}" "3. {'steps': [...], 'labware': [...]}"
) )
preserve = preserve_tip_rack_incoming_class
if "preserve_tip_rack_incoming_class" in json_data:
preserve = bool(json_data["preserve_tip_rack_incoming_class"])
# 构建工作流图 # 构建工作流图
graph = build_protocol_graph( graph = build_protocol_graph(
labware_info=labware_info, labware_info=labware_info,
protocol_steps=protocol_steps, protocol_steps=protocol_steps,
workstation_name=workstation_name, workstation_name=workstation_name,
action_resource_mapping=ACTION_RESOURCE_MAPPING, action_resource_mapping=ACTION_RESOURCE_MAPPING,
preserve_tip_rack_incoming_class=preserve,
) )
# 校验句柄配置 # 校验句柄配置