Compare commits

...

27 Commits

Author SHA1 Message Date
Xuwznln
e0394bf414 Merge remote-tracking branch 'origin/dev' into dev 2026-03-04 19:18:55 +08:00
Xuwznln
975a56415a import gzip 2026-03-04 19:18:36 +08:00
Xuwznln
cadbe87e3f add gzip 2026-03-04 19:18:19 +08:00
Xuwznln
b993c1f590 add gzip 2026-03-04 19:18:09 +08:00
Xuwznln
e0fae94c10 change pose extra to any 2026-03-04 19:06:58 +08:00
Xuwznln
b5cd181ac1 add isFlapY 2026-03-04 18:59:45 +08:00
Xuwznln
5c047beb83 support container as example
add z index

(cherry picked from commit 145fcaae65)
2026-03-03 18:04:13 +08:00
Xuwznln
b40c087143 fix container volume 2026-03-03 17:13:32 +08:00
Xuwznln
7f1cc3b2a5 update materials 2026-03-03 11:43:52 +08:00
Xuwznln
3f160c2049 更新prcxi deck & 新增 unilabos_resource_slot 2026-03-03 11:40:23 +08:00
Xuwznln
a54e7c0f23 new workflow & prcxi slot removal 2026-03-02 18:29:25 +08:00
Xuwznln
e5015cd5e0 fix size change 2026-03-02 15:52:44 +08:00
Xuwznln
514373c164 v0.10.18
(cherry picked from commit 06b6f0d804)
2026-03-02 02:30:10 +08:00
Xuwznln
fcea02585a no opcua installation on macos 2026-02-28 09:41:37 +08:00
Xuwznln
07cf690897 fix possible crash 2026-02-12 01:46:26 +08:00
Xuwznln
cfea27460a fix deck & host_node 2026-02-12 01:46:24 +08:00
Xuwznln
b7d3e980a9 set liquid with tube 2026-02-12 01:46:23 +08:00
Xuwznln
f9ed6cb3fb add test_resource_schema 2026-02-11 14:02:21 +08:00
Xuwznln
699a0b3ce7 fix test resource schema 2026-02-10 23:08:29 +08:00
Xuwznln
cf3a20ae79 registry update & workflow update 2026-02-10 22:46:07 +08:00
Xuwznln
cdf0652020 add test mode 2026-02-10 15:18:41 +08:00
Xuwznln
60073ff139 support description & tags upload 2026-02-10 14:38:55 +08:00
Xuwznln
a9053b822f fix config load 2026-02-10 13:06:05 +08:00
Xuwznln
d238c2ab8b fix log 2026-02-10 13:04:33 +08:00
Xuwznln
9a7d5c7c82 add registry name & add always free 2026-02-07 02:11:43 +08:00
Xuwznln
4f7d431c0b correct config organic synthesis 2026-02-06 12:04:19 +08:00
Xuwznln
341a1b537c Adapt to new scheduler, sampels, and edge upload format (#230)
* add sample_material

* adapt to new samples sys

* fix pump transfer. fix resource update when protocol & ros callback

* Adapt to new scheduler.
2026-02-06 00:49:53 +08:00
48 changed files with 1816 additions and 1255 deletions

View File

@@ -3,7 +3,7 @@
package:
name: unilabos
version: 0.10.17
version: 0.10.18
source:
path: ../../unilabos
@@ -46,13 +46,15 @@ requirements:
- jinja2
- requests
- uvicorn
- opcua # [not osx]
- if: not osx
then:
- opcua
- pyserial
- pandas
- pymodbus
- matplotlib
- pylibftdi
- uni-lab::unilabos-env ==0.10.17
- uni-lab::unilabos-env ==0.10.18
about:
repository: https://github.com/deepmodeling/Uni-Lab-OS

View File

@@ -2,7 +2,7 @@
package:
name: unilabos-env
version: 0.10.17
version: 0.10.18
build:
noarch: generic

View File

@@ -3,7 +3,7 @@
package:
name: unilabos-full
version: 0.10.17
version: 0.10.18
build:
noarch: generic
@@ -11,7 +11,7 @@ build:
requirements:
run:
# Base unilabos package (includes unilabos-env)
- uni-lab::unilabos ==0.10.17
- uni-lab::unilabos ==0.10.18
# Documentation tools
- sphinx
- sphinx_rtd_theme

View File

@@ -452,8 +452,9 @@ unilab --ak your_ak --sk your_sk -g test/experiments/mock_devices/mock_all.json
**操作步骤:**
1. 将两个 `container` 拖拽到 `workstation`
2.`virtual_transfer_pump` 拖拽到 `workstation`
3. 在画布上连接它们(建立父子关系)
2.`virtual_multiway_valve` 拖拽到 `workstation`
3. `virtual_transfer_pump` 拖拽到 `workstation`
4. 在画布上连接它们(建立父子关系)
![设备连接](image/links.png)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 275 KiB

After

Width:  |  Height:  |  Size: 415 KiB

View File

@@ -1,6 +1,6 @@
package:
name: ros-humble-unilabos-msgs
version: 0.10.17
version: 0.10.18
source:
path: ../../unilabos_msgs
target_directory: src

View File

@@ -1,6 +1,6 @@
package:
name: unilabos
version: "0.10.17"
version: "0.10.18"
source:
path: ../..

View File

@@ -4,7 +4,7 @@ package_name = 'unilabos'
setup(
name=package_name,
version='0.10.17',
version='0.10.18',
packages=find_packages(),
include_package_data=True,
install_requires=['setuptools'],

View File

@@ -1 +1 @@
__version__ = "0.10.17"
__version__ = "0.10.18"

View File

@@ -1,6 +1,7 @@
import argparse
import asyncio
import os
import platform
import shutil
import signal
import sys
@@ -171,6 +172,12 @@ def parse_args():
action="store_true",
help="Disable sending update feedback to server",
)
parser.add_argument(
"--test_mode",
action="store_true",
default=False,
help="Test mode: all actions simulate execution and return mock results without running real hardware",
)
# workflow upload subcommand
workflow_parser = subparsers.add_parser(
"workflow_upload",
@@ -204,6 +211,12 @@ def parse_args():
default=False,
help="Whether to publish the workflow (default: False)",
)
workflow_parser.add_argument(
"--description",
type=str,
default="",
help="Workflow description, used when publishing the workflow",
)
return parser
@@ -231,52 +244,60 @@ def main():
# 加载配置文件优先加载config然后从env读取
config_path = args_dict.get("config")
if check_mode:
args_dict["working_dir"] = os.path.abspath(os.getcwd())
# 当 skip_env_check 时,默认使用当前目录作为 working_dir
if skip_env_check and not args_dict.get("working_dir") and not config_path:
# === 解析 working_dir ===
# 规则1: working_dir 传入 → 检测 unilabos_data 子目录,已是则不修改
# 规则2: 仅 config_path 传入 → 用其父目录作为 working_dir
# 规则4: 两者都传入 → 各用各的,但 working_dir 仍做 unilabos_data 子目录检测
raw_working_dir = args_dict.get("working_dir")
if raw_working_dir:
working_dir = os.path.abspath(raw_working_dir)
elif config_path and os.path.exists(config_path):
working_dir = os.path.dirname(os.path.abspath(config_path))
else:
working_dir = os.path.abspath(os.getcwd())
print_status(f"跳过环境检查模式:使用当前目录作为工作目录 {working_dir}", "info")
# 检查当前目录是否有 local_config.py
local_config_in_cwd = os.path.join(working_dir, "local_config.py")
if os.path.exists(local_config_in_cwd):
config_path = local_config_in_cwd
# unilabos_data 子目录自动检测
if os.path.basename(working_dir) != "unilabos_data":
unilabos_data_sub = os.path.join(working_dir, "unilabos_data")
if os.path.isdir(unilabos_data_sub):
working_dir = unilabos_data_sub
elif not raw_working_dir and not (config_path and os.path.exists(config_path)):
# 未显式指定路径,默认使用 cwd/unilabos_data
working_dir = os.path.abspath(os.path.join(os.getcwd(), "unilabos_data"))
# === 解析 config_path ===
if config_path and not os.path.exists(config_path):
# config_path 传入但不存在,尝试在 working_dir 中查找
candidate = os.path.join(working_dir, "local_config.py")
if os.path.exists(candidate):
config_path = candidate
print_status(f"在工作目录中发现配置文件: {config_path}", "info")
else:
print_status(
f"配置文件 {config_path} 不存在,工作目录 {working_dir} 中也未找到 local_config.py"
f"请通过 --config 传入 local_config.py 文件路径",
"error",
)
os._exit(1)
elif not config_path:
# 规则3: 未传入 config_path尝试 working_dir/local_config.py
candidate = os.path.join(working_dir, "local_config.py")
if os.path.exists(candidate):
config_path = candidate
print_status(f"发现本地配置文件: {config_path}", "info")
else:
print_status(f"未指定config路径可通过 --config 传入 local_config.py 文件路径", "info")
elif os.getcwd().endswith("unilabos_data"):
working_dir = os.path.abspath(os.getcwd())
else:
working_dir = os.path.abspath(os.path.join(os.getcwd(), "unilabos_data"))
if args_dict.get("working_dir"):
working_dir = args_dict.get("working_dir", "")
if config_path and not os.path.exists(config_path):
config_path = os.path.join(working_dir, "local_config.py")
if not os.path.exists(config_path):
print_status(
f"当前工作目录 {working_dir} 未找到local_config.py请通过 --config 传入 local_config.py 文件路径",
"error",
print_status(f"您是否为第一次使用?并将当前路径 {working_dir} 作为工作目录? (Y/n)", "info")
if check_mode or input() != "n":
os.makedirs(working_dir, exist_ok=True)
config_path = os.path.join(working_dir, "local_config.py")
shutil.copy(
os.path.join(os.path.dirname(os.path.dirname(__file__)), "config", "example_config.py"),
config_path,
)
print_status(f"已创建 local_config.py 路径: {config_path}", "info")
else:
os._exit(1)
elif config_path and os.path.exists(config_path):
working_dir = os.path.dirname(config_path)
elif os.path.exists(working_dir) and os.path.exists(os.path.join(working_dir, "local_config.py")):
config_path = os.path.join(working_dir, "local_config.py")
elif not skip_env_check and not config_path and (
not os.path.exists(working_dir) or not os.path.exists(os.path.join(working_dir, "local_config.py"))
):
print_status(f"未指定config路径可通过 --config 传入 local_config.py 文件路径", "info")
print_status(f"您是否为第一次使用?并将当前路径 {working_dir} 作为工作目录? (Y/n)", "info")
if input() != "n":
os.makedirs(working_dir, exist_ok=True)
config_path = os.path.join(working_dir, "local_config.py")
shutil.copy(
os.path.join(os.path.dirname(os.path.dirname(__file__)), "config", "example_config.py"), config_path
)
print_status(f"已创建 local_config.py 路径: {config_path}", "info")
else:
os._exit(1)
# 加载配置文件 (check_mode 跳过)
print_status(f"当前工作目录为 {working_dir}", "info")
@@ -288,7 +309,9 @@ def main():
if hasattr(BasicConfig, "log_level"):
logger.info(f"Log level set to '{BasicConfig.log_level}' from config file.")
configure_logger(loglevel=BasicConfig.log_level, working_dir=working_dir)
file_path = configure_logger(loglevel=BasicConfig.log_level, working_dir=working_dir)
if file_path is not None:
logger.info(f"[LOG_FILE] {file_path}")
if args.addr != parser.get_default("addr"):
if args.addr == "test":
@@ -332,8 +355,11 @@ def main():
BasicConfig.slave_no_host = args_dict.get("slave_no_host", False)
BasicConfig.upload_registry = args_dict.get("upload_registry", False)
BasicConfig.no_update_feedback = args_dict.get("no_update_feedback", False)
BasicConfig.test_mode = args_dict.get("test_mode", False)
if BasicConfig.test_mode:
print_status("启用测试模式:所有动作将模拟执行,不调用真实硬件", "warning")
BasicConfig.communication_protocol = "websocket"
machine_name = os.popen("hostname").read().strip()
machine_name = platform.node()
machine_name = "".join([c if c.isalnum() or c == "_" else "_" for c in machine_name])
BasicConfig.machine_name = machine_name
BasicConfig.vis_2d_enable = args_dict["2d_vis"]

View File

@@ -54,7 +54,7 @@ class JobAddReq(BaseModel):
action_type: str = Field(
examples=["unilabos_msgs.action._str_single_input.StrSingleInput"], description="action type", default=""
)
sample_material: dict = Field(examples=[{"string": "string"}], description="sample uuid to material uuid", default_factory=dict)
sample_material: dict = Field(examples=[{"string": "string"}], description="sample uuid to material uuid")
action_args: dict = Field(examples=[{"string": "string"}], description="action arguments", default_factory=dict)
task_id: str = Field(examples=["task_id"], description="task uuid (auto-generated if empty)", default="")
job_id: str = Field(examples=["job_id"], description="goal uuid (auto-generated if empty)", default="")

View File

@@ -38,9 +38,9 @@ def register_devices_and_resources(lab_registry, gather_only=False) -> Optional[
response = http_client.resource_registry({"resources": list(devices_to_register.values())})
cost_time = time.time() - start_time
if response.status_code in [200, 201]:
logger.info(f"[UniLab Register] 成功注册 {len(devices_to_register)} 个设备 {cost_time}ms")
logger.info(f"[UniLab Register] 成功注册 {len(devices_to_register)} 个设备 {cost_time}s")
else:
logger.error(f"[UniLab Register] 设备注册失败: {response.status_code}, {response.text} {cost_time}ms")
logger.error(f"[UniLab Register] 设备注册失败: {response.status_code}, {response.text} {cost_time}s")
except Exception as e:
logger.error(f"[UniLab Register] 设备注册异常: {e}")
@@ -51,9 +51,9 @@ def register_devices_and_resources(lab_registry, gather_only=False) -> Optional[
response = http_client.resource_registry({"resources": list(resources_to_register.values())})
cost_time = time.time() - start_time
if response.status_code in [200, 201]:
logger.info(f"[UniLab Register] 成功注册 {len(resources_to_register)} 个资源 {cost_time}ms")
logger.info(f"[UniLab Register] 成功注册 {len(resources_to_register)} 个资源 {cost_time}s")
else:
logger.error(f"[UniLab Register] 资源注册失败: {response.status_code}, {response.text} {cost_time}ms")
logger.error(f"[UniLab Register] 资源注册失败: {response.status_code}, {response.text} {cost_time}s")
except Exception as e:
logger.error(f"[UniLab Register] 资源注册异常: {e}")

View File

@@ -3,7 +3,7 @@ HTTP客户端模块
提供与远程服务器通信的客户端功能只有host需要用
"""
import gzip
import json
import os
from typing import List, Dict, Any, Optional
@@ -290,10 +290,17 @@ class HTTPClient:
Returns:
Response: API响应对象
"""
compressed_body = gzip.compress(
json.dumps(registry_data, ensure_ascii=False, default=str).encode("utf-8")
)
response = requests.post(
f"{self.remote_addr}/lab/resource",
json=registry_data,
headers={"Authorization": f"Lab {self.auth}"},
data=compressed_body,
headers={
"Authorization": f"Lab {self.auth}",
"Content-Type": "application/json",
"Content-Encoding": "gzip",
},
timeout=30,
)
if response.status_code not in [200, 201]:
@@ -343,9 +350,10 @@ class HTTPClient:
edges: List[Dict[str, Any]],
tags: Optional[List[str]] = None,
published: bool = False,
description: str = "",
) -> Dict[str, Any]:
"""
导入工作流到服务器
导入工作流到服务器,如果 published 为 True则额外发起发布请求
Args:
name: 工作流名称(顶层)
@@ -355,6 +363,7 @@ class HTTPClient:
edges: 工作流边列表
tags: 工作流标签列表,默认为空列表
published: 是否发布工作流默认为False
description: 工作流描述,发布时使用
Returns:
Dict: API响应数据包含 code 和 data (uuid, name)
@@ -367,7 +376,6 @@ class HTTPClient:
"nodes": nodes,
"edges": edges,
"tags": tags if tags is not None else [],
"published": published,
},
}
# 保存请求到文件
@@ -388,11 +396,51 @@ class HTTPClient:
res = response.json()
if "code" in res and res["code"] != 0:
logger.error(f"导入工作流失败: {response.text}")
return res
# 导入成功后,如果需要发布则额外发起发布请求
if published:
imported_uuid = res.get("data", {}).get("uuid", workflow_uuid)
publish_res = self.workflow_publish(imported_uuid, description)
res["publish_result"] = publish_res
return res
else:
logger.error(f"导入工作流失败: {response.status_code}, {response.text}")
return {"code": response.status_code, "message": response.text}
def workflow_publish(self, workflow_uuid: str, description: str = "") -> Dict[str, Any]:
"""
发布工作流
Args:
workflow_uuid: 工作流UUID
description: 工作流描述
Returns:
Dict: API响应数据
"""
payload = {
"uuid": workflow_uuid,
"description": description,
"published": True,
}
logger.info(f"正在发布工作流: {workflow_uuid}")
response = requests.patch(
f"{self.remote_addr}/lab/workflow/owner",
json=payload,
headers={"Authorization": f"Lab {self.auth}"},
timeout=60,
)
if response.status_code == 200:
res = response.json()
if "code" in res and res["code"] != 0:
logger.error(f"发布工作流失败: {response.text}")
else:
logger.info(f"工作流发布成功: {workflow_uuid}")
return res
else:
logger.error(f"发布工作流失败: {response.status_code}, {response.text}")
return {"code": response.status_code, "message": response.text}
# 创建默认客户端实例
http_client = HTTPClient()

View File

@@ -76,6 +76,7 @@ class JobInfo:
start_time: float
last_update_time: float = field(default_factory=time.time)
ready_timeout: Optional[float] = None # READY状态的超时时间
always_free: bool = False # 是否为永久闲置动作(不受排队限制)
def update_timestamp(self):
"""更新最后更新时间"""
@@ -127,6 +128,15 @@ class DeviceActionManager:
# 总是将job添加到all_jobs中
self.all_jobs[job_info.job_id] = job_info
# always_free的动作不受排队限制直接设为READY
if job_info.always_free:
job_info.status = JobStatus.READY
job_info.update_timestamp()
job_info.set_ready_timeout(10)
job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
logger.trace(f"[DeviceActionManager] Job {job_log} always_free, start immediately")
return True
# 检查是否有正在执行或准备执行的任务
if device_key in self.active_jobs:
# 有正在执行或准备执行的任务,加入队列
@@ -176,11 +186,15 @@ class DeviceActionManager:
logger.error(f"[DeviceActionManager] Job {job_log} is not in READY status, current: {job_info.status}")
return False
# 检查设备上是否是这个job
if device_key not in self.active_jobs or self.active_jobs[device_key].job_id != job_id:
job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
logger.error(f"[DeviceActionManager] Job {job_log} is not the active job for {device_key}")
return False
# always_free的job不需要检查active_jobs
if not job_info.always_free:
# 检查设备上是否是这个job
if device_key not in self.active_jobs or self.active_jobs[device_key].job_id != job_id:
job_log = format_job_log(
job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name
)
logger.error(f"[DeviceActionManager] Job {job_log} is not the active job for {device_key}")
return False
# 开始执行任务将状态从READY转换为STARTED
job_info.status = JobStatus.STARTED
@@ -203,6 +217,13 @@ class DeviceActionManager:
job_info = self.all_jobs[job_id]
device_key = job_info.device_action_key
# always_free的job直接清理不影响队列
if job_info.always_free:
job_info.status = JobStatus.ENDED
job_info.update_timestamp()
del self.all_jobs[job_id]
return None
# 移除活跃任务
if device_key in self.active_jobs and self.active_jobs[device_key].job_id == job_id:
del self.active_jobs[device_key]
@@ -234,9 +255,14 @@ class DeviceActionManager:
return None
def get_active_jobs(self) -> List[JobInfo]:
"""获取所有正在执行的任务"""
"""获取所有正在执行的任务(含active_jobs和always_free的STARTED job)"""
with self.lock:
return list(self.active_jobs.values())
jobs = list(self.active_jobs.values())
# 补充 always_free 的 STARTED job(它们不在 active_jobs 中)
for job in self.all_jobs.values():
if job.always_free and job.status == JobStatus.STARTED and job not in jobs:
jobs.append(job)
return jobs
def get_queued_jobs(self) -> List[JobInfo]:
"""获取所有排队中的任务"""
@@ -261,6 +287,14 @@ class DeviceActionManager:
job_info = self.all_jobs[job_id]
device_key = job_info.device_action_key
# always_free的job直接清理
if job_info.always_free:
job_info.status = JobStatus.ENDED
del self.all_jobs[job_id]
job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
logger.trace(f"[DeviceActionManager] Always-free job {job_log} cancelled")
return True
# 如果是正在执行的任务
if device_key in self.active_jobs and self.active_jobs[device_key].job_id == job_id:
# 清理active job状态
@@ -334,13 +368,18 @@ class DeviceActionManager:
timeout_jobs = []
with self.lock:
# 统计READY状态的任务数量
ready_jobs_count = sum(1 for job in self.active_jobs.values() if job.status == JobStatus.READY)
# 收集所有需要检查的 READY 任务(active_jobs + always_free READY jobs)
ready_candidates = list(self.active_jobs.values())
for job in self.all_jobs.values():
if job.always_free and job.status == JobStatus.READY and job not in ready_candidates:
ready_candidates.append(job)
ready_jobs_count = sum(1 for job in ready_candidates if job.status == JobStatus.READY)
if ready_jobs_count > 0:
logger.trace(f"[DeviceActionManager] Checking {ready_jobs_count} READY jobs for timeout") # type: ignore # noqa: E501
# 找到所有超时的READY任务只检测不处理
for job_info in self.active_jobs.values():
for job_info in ready_candidates:
if job_info.is_ready_timeout():
timeout_jobs.append(job_info)
job_log = format_job_log(
@@ -608,6 +647,24 @@ class MessageProcessor:
if host_node:
host_node.handle_pong_response(pong_data)
def _check_action_always_free(self, device_id: str, action_name: str) -> bool:
"""检查该action是否标记为always_free通过HostNode统一的_action_value_mappings查找"""
try:
host_node = HostNode.get_instance(0)
if not host_node:
return False
# noinspection PyProtectedMember
action_mappings = host_node._action_value_mappings.get(device_id)
if not action_mappings:
return False
# 尝试直接匹配或 auto- 前缀匹配
for key in [action_name, f"auto-{action_name}"]:
if key in action_mappings:
return action_mappings[key].get("always_free", False)
return False
except Exception:
return False
async def _handle_query_action_state(self, data: Dict[str, Any]):
"""处理query_action_state消息"""
device_id = data.get("device_id", "")
@@ -622,6 +679,9 @@ class MessageProcessor:
device_action_key = f"/devices/{device_id}/{action_name}"
# 检查action是否为always_free
action_always_free = self._check_action_always_free(device_id, action_name)
# 创建任务信息
job_info = JobInfo(
job_id=job_id,
@@ -631,6 +691,7 @@ class MessageProcessor:
device_action_key=device_action_key,
status=JobStatus.QUEUE,
start_time=time.time(),
always_free=action_always_free,
)
# 添加到设备管理器
@@ -657,6 +718,8 @@ class MessageProcessor:
async def _handle_job_start(self, data: Dict[str, Any]):
"""处理job_start消息"""
try:
if not data.get("sample_material"):
data["sample_material"] = {}
req = JobAddReq(**data)
job_log = format_job_log(req.job_id, req.task_id, req.device_id, req.action)
@@ -1121,6 +1184,11 @@ class QueueProcessor:
logger.debug(f"[QueueProcessor] Sending busy status for {len(queued_jobs)} queued jobs")
for job_info in queued_jobs:
# 快照可能已过期:在遍历过程中 end_job() 可能已将此 job 移至 READY
# 此时不应再发送 busy/need_more否则会覆盖已发出的 free=True 通知
if job_info.status != JobStatus.QUEUE:
continue
message = {
"action": "report_action_state",
"data": {

View File

@@ -95,8 +95,29 @@ def get_vessel_liquid_volume(G: nx.DiGraph, vessel: str) -> float:
return total_volume
def is_integrated_pump(node_name):
return "pump" in node_name and "valve" in node_name
def is_integrated_pump(node_class: str, node_name: str = "") -> bool:
"""
判断是否为泵阀一体设备
"""
class_lower = (node_class or "").lower()
name_lower = (node_name or "").lower()
if "pump" not in class_lower and "pump" not in name_lower:
return False
integrated_markers = [
"valve",
"pump_valve",
"pumpvalve",
"integrated",
"transfer_pump",
]
for marker in integrated_markers:
if marker in class_lower or marker in name_lower:
return True
return False
def find_connected_pump(G, valve_node):
@@ -186,7 +207,9 @@ def build_pump_valve_maps(G, pump_backbone):
debug_print(f"🔧 过滤后的骨架: {filtered_backbone}")
for node in filtered_backbone:
if is_integrated_pump(G.nodes[node]["class"]):
node_data = G.nodes.get(node, {})
node_class = node_data.get("class", "") or ""
if is_integrated_pump(node_class, node):
pumps_from_node[node] = node
valve_from_node[node] = node
debug_print(f" - 集成泵-阀: {node}")

View File

@@ -23,6 +23,7 @@ class BasicConfig:
disable_browser = False # 禁止浏览器自动打开
port = 8002 # 本地HTTP服务
check_mode = False # CI 检查模式,用于验证 registry 导入和文件一致性
test_mode = False # 测试模式,所有动作不实际执行,返回模拟结果
# 'TRACE', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'
log_level: Literal["TRACE", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "DEBUG"
@@ -145,5 +146,5 @@ def load_config(config_path=None):
traceback.print_exc()
exit(1)
else:
config_path = os.path.join(os.path.dirname(__file__), "local_config.py")
config_path = os.path.join(os.path.dirname(__file__), "example_config.py")
load_config(config_path)

View File

@@ -21,7 +21,7 @@ from pylabrobot.resources import (
ResourceHolder,
Lid,
Trash,
Tip,
Tip, TubeRack,
)
from typing_extensions import TypedDict
@@ -690,18 +690,19 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
)
def set_liquid_from_plate(
self, plate: List[ResourceSlot], well_names: list[str], liquid_names: list[str], volumes: list[float]
self, plate: ResourceSlot, well_names: list[str], liquid_names: list[str], volumes: list[float]
) -> SetLiquidFromPlateReturn:
"""Set the liquid in wells of a plate by well names (e.g., A1, A2, B3).
如果 liquid_names 和 volumes 为空,但 plate 和 well_names 不为空,直接返回 plate 和 wells。
"""
if isinstance(plate, list): # 未来移除
plate = plate[0]
assert issubclass(plate.__class__, Plate), "plate must be a Plate"
plate: Plate = cast(Plate, plate)
assert issubclass(plate.__class__, Plate) or issubclass(plate.__class__, TubeRack) , f"plate must be a Plate, now: {type(plate)}"
plate: Union[Plate, TubeRack]
# 根据 well_names 获取对应的 Well 对象
wells = [plate.get_well(name) for name in well_names]
if issubclass(plate.__class__, Plate):
wells = [plate.get_well(name) for name in well_names]
elif issubclass(plate.__class__, TubeRack):
wells = [plate.get_tube(name) for name in well_names]
res_volumes = []
# 如果 liquid_names 和 volumes 都为空,直接返回

View File

@@ -55,6 +55,7 @@ from unilabos.devices.liquid_handling.liquid_handler_abstract import (
TransferLiquidReturn,
)
from unilabos.registry.placeholder_type import ResourceSlot
from unilabos.resources.resource_tracker import ResourceTreeSet
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
@@ -90,20 +91,103 @@ class PRCXI9300Deck(Deck):
该类定义了 PRCXI 9300 的工作台布局和槽位信息。
"""
def __init__(self, name: str, size_x: float, size_y: float, size_z: float, **kwargs):
super().__init__(name, size_x, size_y, size_z)
self.slots = [None] * 16 # PRCXI 9300/9320 最大有 16 个槽位
self.slot_locations = [Coordinate(0, 0, 0)] * 16
# T1-T16 默认位置 (4列×4行)
_DEFAULT_SITE_POSITIONS = [
(0, 0, 0), (138, 0, 0), (276, 0, 0), (414, 0, 0), # T1-T4
(0, 96, 0), (138, 96, 0), (276, 96, 0), (414, 96, 0), # T5-T8
(0, 192, 0), (138, 192, 0), (276, 192, 0), (414, 192, 0), # T9-T12
(0, 288, 0), (138, 288, 0), (276, 288, 0), (414, 288, 0), # T13-T16
]
_DEFAULT_SITE_SIZE = {"width": 128.0, "height": 86, "depth": 0}
_DEFAULT_CONTENT_TYPE = ["plate", "tip_rack", "plates", "tip_racks", "tube_rack", "adaptor"]
def __init__(self, name: str, size_x: float, size_y: float, size_z: float,
sites: Optional[List[Dict[str, Any]]] = None, **kwargs):
super().__init__(size_x, size_y, size_z, name)
if sites is not None:
self.sites: List[Dict[str, Any]] = [dict(s) for s in sites]
else:
self.sites = []
for i, (x, y, z) in enumerate(self._DEFAULT_SITE_POSITIONS):
self.sites.append({
"label": f"T{i + 1}",
"visible": True,
"position": {"x": x, "y": y, "z": z},
"size": dict(self._DEFAULT_SITE_SIZE),
"content_type": list(self._DEFAULT_CONTENT_TYPE),
})
# _ordering: label -> None, 用于外部通过 list(keys()).index(site) 将 Tn 转换为 spot index
self._ordering = collections.OrderedDict(
(site["label"], None) for site in self.sites
)
def _get_site_location(self, idx: int) -> Coordinate:
pos = self.sites[idx]["position"]
return Coordinate(pos["x"], pos["y"], pos["z"])
def _get_site_resource(self, idx: int) -> Optional[Resource]:
site_loc = self._get_site_location(idx)
for child in self.children:
if child.location == site_loc:
return child
return None
def assign_child_resource(
self,
resource: Resource,
location: Optional[Coordinate] = None,
reassign: bool = True,
spot: Optional[int] = None,
):
idx = spot
if spot is not None:
idx = spot
else:
for i, site in enumerate(self.sites):
site_loc = self._get_site_location(i)
if site.get("label") == resource.name:
idx = i
break
if location is not None and site_loc == location:
idx = i
break
if idx is None:
for i in range(len(self.sites)):
if self._get_site_resource(i) is None:
idx = i
break
if idx is None:
raise ValueError(f"No available site on deck '{self.name}' for resource '{resource.name}'")
if not reassign and self._get_site_resource(idx) is not None:
raise ValueError(f"Site {idx} ('{self.sites[idx]['label']}') is already occupied")
loc = self._get_site_location(idx)
super().assign_child_resource(resource, location=loc, reassign=reassign)
def assign_child_at_slot(self, resource: Resource, slot: int, reassign: bool = False) -> None:
if self.slots[slot - 1] is not None and not reassign:
raise ValueError(f"Spot {slot} is already occupied")
self.assign_child_resource(resource, spot=slot - 1, reassign=reassign)
self.slots[slot - 1] = resource
super().assign_child_resource(resource, location=self.slot_locations[slot - 1])
def serialize(self) -> dict:
data = super().serialize()
sites_out = []
for i, site in enumerate(self.sites):
occupied = self._get_site_resource(i)
sites_out.append({
"label": site["label"],
"visible": site.get("visible", True),
"occupied_by": occupied.name if occupied is not None else None,
"position": site["position"],
"size": site["size"],
"content_type": site["content_type"],
})
data["sites"] = sites_out
return data
class PRCXI9300Container(Plate):
class PRCXI9300Container(Container):
"""PRCXI 9300 的专用 Container 类,继承自 Plate用于槽位定位和未知模块。
该类定义了 PRCXI 9300 的工作台布局和槽位信息。
@@ -116,11 +200,10 @@ class PRCXI9300Container(Plate):
size_y: float,
size_z: float,
category: str,
ordering: collections.OrderedDict,
model: Optional[str] = None,
**kwargs,
):
super().__init__(name, size_x, size_y, size_z, category=category, ordering=ordering, model=model)
super().__init__(name, size_x, size_y, size_z, category=category, model=model)
self._unilabos_state = {}
def load_state(self, state: Dict[str, Any]) -> None:
@@ -248,14 +331,15 @@ class PRCXI9300TipRack(TipRack):
if ordered_items is not None:
items = ordered_items
elif ordering is not None:
# 检查 ordering 中的值是否是字符串(从 JSON 反序列化时的情况)
# 如果是字符串,说明这是位置名称,需要让 TipRack 自己创建 Tip 对象
# 我们只传递位置信息(键),不传递值,使用 ordering 参数
if ordering and isinstance(next(iter(ordering.values()), None), str):
# ordering 的值是字符串,只使用键(位置信息)创建新的 OrderedDict
# 检查 ordering 中的值类型来决定如何处理:
# - 字符串值(从 JSON 反序列化): 只用键创建 ordering_param
# - None 值(从第二次往返序列化): 同样只用键创建 ordering_param
# - 对象值(已经是实际的 Resource 对象): 直接作为 ordered_items 使用
first_val = next(iter(ordering.values()), None) if ordering else None
if not ordering or first_val is None or isinstance(first_val, str):
# ordering 的值是字符串或 None只使用键位置信息创建新的 OrderedDict
# 传递 ordering 参数而不是 ordered_items让 TipRack 自己创建 Tip 对象
items = None
# 使用 ordering 参数,只包含位置信息(键)
ordering_param = collections.OrderedDict((k, None) for k in ordering.keys())
else:
# ordering 的值已经是对象,可以直接使用
@@ -397,14 +481,15 @@ class PRCXI9300TubeRack(TubeRack):
items_to_pass = ordered_items
ordering_param = None
elif ordering is not None:
# 检查 ordering 中的值是否是字符串(从 JSON 反序列化时的情况)
# 如果是字符串,说明这是位置名称,需要让 TubeRack 自己创建 Tube 对象
# 我们只传递位置信息(键),不传递值,使用 ordering 参数
if ordering and isinstance(next(iter(ordering.values()), None), str):
# ordering 的值是字符串,只使用键(位置信息)创建新的 OrderedDict
# 检查 ordering 中的值类型来决定如何处理:
# - 字符串值(从 JSON 反序列化): 只用键创建 ordering_param
# - None 值(从第二次往返序列化): 同样只用键创建 ordering_param
# - 对象值(已经是实际的 Resource 对象): 直接作为 ordered_items 使用
first_val = next(iter(ordering.values()), None) if ordering else None
if not ordering or first_val is None or isinstance(first_val, str):
# ordering 的值是字符串或 None只使用键位置信息创建新的 OrderedDict
# 传递 ordering 参数而不是 ordered_items让 TubeRack 自己创建 Tube 对象
items_to_pass = None
# 使用 ordering 参数,只包含位置信息(键)
ordering_param = collections.OrderedDict((k, None) for k in ordering.keys())
else:
# ordering 的值已经是对象,可以直接使用
@@ -565,14 +650,14 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
tablets_info = []
count = 0
for child in deck.children:
if child.children:
if "Material" in child.children[0]._unilabos_state:
number = int(child.name.replace("T", ""))
tablets_info.append(
WorkTablets(
Number=number, Code=f"T{number}", Material=child.children[0]._unilabos_state["Material"]
)
# 如果放其他类型的物料,是不可以的
if hasattr(child, "_unilabos_state") and "Material" in child._unilabos_state:
number = int(child.name.replace("T", ""))
tablets_info.append(
WorkTablets(
Number=number, Code=f"T{number}", Material=child._unilabos_state["Material"]
)
)
if is_9320:
print("当前设备是9320")
# 始终初始化 step_mode 属性
@@ -595,7 +680,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
return super().set_liquid(wells, liquid_names, volumes)
def set_liquid_from_plate(
self, plate: List[ResourceSlot], well_names: list[str], liquid_names: list[str], volumes: list[float]
self, plate: ResourceSlot, well_names: list[str], liquid_names: list[str], volumes: list[float]
) -> SetLiquidFromPlateReturn:
return super().set_liquid_from_plate(plate, well_names, liquid_names, volumes)

View File

@@ -19,10 +19,11 @@ from rclpy.node import Node
import re
class LiquidHandlerJointPublisher(BaseROS2DeviceNode):
def __init__(self,resources_config:list, resource_tracker, rate=50, device_id:str = "lh_joint_publisher", **kwargs):
def __init__(self,resources_config:list, resource_tracker, rate=50, device_id:str = "lh_joint_publisher", registry_name: str = "lh_joint_publisher", **kwargs):
super().__init__(
driver_instance=self,
device_id=device_id,
registry_name=registry_name,
status_types={},
action_value_mappings={},
hardware_interface={},

View File

@@ -15,35 +15,35 @@ class VirtualPumpMode(Enum):
class VirtualTransferPump:
"""虚拟转移泵类 - 模拟泵的基本功能,无需实际硬件 🚰"""
_ros_node: BaseROS2DeviceNode
def __init__(self, device_id: str = None, config: dict = None, **kwargs):
"""
初始化虚拟转移泵
Args:
device_id: 设备ID
config: 配置字典包含max_volume, port等参数
**kwargs: 其他参数,确保兼容性
"""
self.device_id = device_id or "virtual_transfer_pump"
# 从config或kwargs中获取参数确保类型正确
if config:
self.max_volume = float(config.get('max_volume', 25.0))
self.port = config.get('port', 'VIRTUAL')
self.max_volume = float(config.get("max_volume", 25.0))
self.port = config.get("port", "VIRTUAL")
else:
self.max_volume = float(kwargs.get('max_volume', 25.0))
self.port = kwargs.get('port', 'VIRTUAL')
self._transfer_rate = float(kwargs.get('transfer_rate', 0))
self.mode = kwargs.get('mode', VirtualPumpMode.Normal)
self.max_volume = float(kwargs.get("max_volume", 25.0))
self.port = kwargs.get("port", "VIRTUAL")
self._transfer_rate = float(kwargs.get("transfer_rate", 0))
self.mode = kwargs.get("mode", VirtualPumpMode.Normal)
# 状态变量 - 确保都是正确类型
self._status = "Idle"
self._position = 0.0 # float
self._max_velocity = 5.0 # float
self._max_velocity = 5.0 # float
self._current_volume = 0.0 # float
# 🚀 新增:快速模式设置 - 大幅缩短执行时间
@@ -52,14 +52,16 @@ class VirtualTransferPump:
self._fast_dispense_time = 1.0 # 快速喷射时间(秒)
self.logger = logging.getLogger(f"VirtualTransferPump.{self.device_id}")
print(f"🚰 === 虚拟转移泵 {self.device_id} 已创建 === ✨")
print(f"💨 快速模式: {'启用' if self._fast_mode else '禁用'} | 移动时间: {self._fast_move_time}s | 喷射时间: {self._fast_dispense_time}s")
print(
f"💨 快速模式: {'启用' if self._fast_mode else '禁用'} | 移动时间: {self._fast_move_time}s | 喷射时间: {self._fast_dispense_time}s"
)
print(f"📊 最大容量: {self.max_volume}mL | 端口: {self.port}")
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
async def initialize(self) -> bool:
"""初始化虚拟泵 🚀"""
self.logger.info(f"🔧 初始化虚拟转移泵 {self.device_id}")
@@ -68,33 +70,33 @@ class VirtualTransferPump:
self._current_volume = 0.0
self.logger.info(f"✅ 转移泵 {self.device_id} 初始化完成 🚰")
return True
async def cleanup(self) -> bool:
"""清理虚拟泵 🧹"""
self.logger.info(f"🧹 清理虚拟转移泵 {self.device_id} 🔚")
self._status = "Idle"
self.logger.info(f"✅ 转移泵 {self.device_id} 清理完成 💤")
return True
# 基本属性
@property
def status(self) -> str:
return self._status
@property
def position(self) -> float:
"""当前柱塞位置 (ml) 📍"""
return self._position
@property
def current_volume(self) -> float:
"""当前注射器中的体积 (ml) 💧"""
return self._current_volume
@property
def max_velocity(self) -> float:
return self._max_velocity
@property
def transfer_rate(self) -> float:
return self._transfer_rate
@@ -103,17 +105,17 @@ class VirtualTransferPump:
"""设置最大速度 (ml/s) 🌊"""
self._max_velocity = max(0.1, min(50.0, velocity)) # 限制在合理范围内
self.logger.info(f"🌊 设置最大速度为 {self._max_velocity} mL/s")
def get_status(self) -> str:
"""获取泵状态 📋"""
return self._status
async def _simulate_operation(self, duration: float):
"""模拟操作延时 ⏱️"""
self._status = "Busy"
await self._ros_node.sleep(duration)
self._status = "Idle"
def _calculate_duration(self, volume: float, velocity: float = None) -> float:
"""
计算操作持续时间 ⏰
@@ -121,10 +123,10 @@ class VirtualTransferPump:
"""
if velocity is None:
velocity = self._max_velocity
# 📊 计算理论时间(用于日志显示)
theoretical_duration = abs(volume) / velocity
# 🚀 如果启用快速模式,使用固定的快速时间
if self._fast_mode:
# 根据操作类型选择快速时间
@@ -132,13 +134,13 @@ class VirtualTransferPump:
actual_duration = self._fast_move_time
else: # 很小的操作
actual_duration = 0.5
self.logger.debug(f"⚡ 快速模式: 理论时间 {theoretical_duration:.2f}s → 实际时间 {actual_duration:.2f}s")
return actual_duration
else:
# 正常模式使用理论时间
return theoretical_duration
def _calculate_display_duration(self, volume: float, velocity: float = None) -> float:
"""
计算显示用的持续时间(用于日志) 📊
@@ -147,16 +149,16 @@ class VirtualTransferPump:
if velocity is None:
velocity = self._max_velocity
return abs(volume) / velocity
# 新的set_position方法 - 专门用于SetPumpPosition动作
async def set_position(self, position: float, max_velocity: float = None):
"""
移动到绝对位置 - 专门用于SetPumpPosition动作 🎯
Args:
position (float): 目标位置 (ml)
max_velocity (float): 移动速度 (ml/s)
Returns:
dict: 符合SetPumpPosition.action定义的结果
"""
@@ -164,19 +166,19 @@ class VirtualTransferPump:
# 验证并转换参数
target_position = float(position)
velocity = float(max_velocity) if max_velocity is not None else self._max_velocity
# 限制位置在有效范围内
target_position = max(0.0, min(float(self.max_volume), target_position))
# 计算移动距离
volume_to_move = abs(target_position - self._position)
# 📊 计算显示用的时间(用于日志)
display_duration = self._calculate_display_duration(volume_to_move, velocity)
# ⚡ 计算实际执行时间(快速模式)
actual_duration = self._calculate_duration(volume_to_move, velocity)
# 🎯 确定操作类型和emoji
if target_position > self._position:
operation_type = "吸液"
@@ -187,28 +189,34 @@ class VirtualTransferPump:
else:
operation_type = "保持"
operation_emoji = "📍"
self.logger.info(f"🎯 SET_POSITION: {operation_type} {operation_emoji}")
self.logger.info(f" 📍 位置: {self._position:.2f}mL → {target_position:.2f}mL (移动 {volume_to_move:.2f}mL)")
self.logger.info(
f" 📍 位置: {self._position:.2f}mL → {target_position:.2f}mL (移动 {volume_to_move:.2f}mL)"
)
self.logger.info(f" 🌊 速度: {velocity:.2f} mL/s")
self.logger.info(f" ⏰ 预计时间: {display_duration:.2f}s")
if self._fast_mode:
self.logger.info(f" ⚡ 快速模式: 实际用时 {actual_duration:.2f}s")
# 🚀 模拟移动过程
if volume_to_move > 0.01: # 只有当移动距离足够大时才显示进度
start_position = self._position
steps = 5 if actual_duration > 0.5 else 2 # 根据实际时间调整步数
step_duration = actual_duration / steps
self.logger.info(f"🚀 开始{operation_type}... {operation_emoji}")
for i in range(steps + 1):
# 计算当前位置和进度
progress = (i / steps) * 100 if steps > 0 else 100
current_pos = start_position + (target_position - start_position) * (i / steps) if steps > 0 else target_position
current_pos = (
start_position + (target_position - start_position) * (i / steps)
if steps > 0
else target_position
)
# 更新状态
if i < steps:
self._status = f"{operation_type}"
@@ -216,10 +224,10 @@ class VirtualTransferPump:
else:
self._status = "Idle"
status_emoji = ""
self._position = current_pos
self._current_volume = current_pos
# 显示进度每25%或最后一步)
if i == 0:
self.logger.debug(f" 🔄 {operation_type}开始: {progress:.0f}%")
@@ -227,7 +235,7 @@ class VirtualTransferPump:
self.logger.debug(f" 🔄 {operation_type}进度: {progress:.0f}%")
elif i == steps:
self.logger.info(f"{operation_type}完成: {progress:.0f}% | 当前位置: {current_pos:.2f}mL")
# 等待一小步时间
if i < steps and step_duration > 0:
await self._ros_node.sleep(step_duration)
@@ -236,25 +244,27 @@ class VirtualTransferPump:
self._position = target_position
self._current_volume = target_position
self.logger.info(f" 📍 微调完成: {target_position:.2f}mL")
# 确保最终位置准确
self._position = target_position
self._current_volume = target_position
self._status = "Idle"
# 📊 最终状态日志
if volume_to_move > 0.01:
self.logger.info(f"🎉 SET_POSITION 完成! 📍 最终位置: {self._position:.2f}mL | 💧 当前体积: {self._current_volume:.2f}mL")
self.logger.info(
f"🎉 SET_POSITION 完成! 📍 最终位置: {self._position:.2f}mL | 💧 当前体积: {self._current_volume:.2f}mL"
)
# 返回符合action定义的结果
return {
"success": True,
"message": f"✅ 成功移动到位置 {self._position:.2f}mL ({operation_type})",
"final_position": self._position,
"final_volume": self._current_volume,
"operation_type": operation_type
"operation_type": operation_type,
}
except Exception as e:
error_msg = f"❌ 设置位置失败: {str(e)}"
self.logger.error(error_msg)
@@ -262,134 +272,136 @@ class VirtualTransferPump:
"success": False,
"message": error_msg,
"final_position": self._position,
"final_volume": self._current_volume
"final_volume": self._current_volume,
}
# 其他泵操作方法
async def pull_plunger(self, volume: float, velocity: float = None):
"""
拉取柱塞(吸液) 📥
Args:
volume (float): 要拉取的体积 (ml)
velocity (float): 拉取速度 (ml/s)
"""
new_position = min(self.max_volume, self._position + volume)
actual_volume = new_position - self._position
if actual_volume <= 0:
self.logger.warning("⚠️ 无法吸液 - 已达到最大容量")
return
display_duration = self._calculate_display_duration(actual_volume, velocity)
actual_duration = self._calculate_duration(actual_volume, velocity)
self.logger.info(f"📥 开始吸液: {actual_volume:.2f}mL")
self.logger.info(f" 📍 位置: {self._position:.2f}mL → {new_position:.2f}mL")
self.logger.info(f" ⏰ 预计时间: {display_duration:.2f}s")
if self._fast_mode:
self.logger.info(f" ⚡ 快速模式: 实际用时 {actual_duration:.2f}s")
await self._simulate_operation(actual_duration)
self._position = new_position
self._current_volume = new_position
self.logger.info(f"✅ 吸液完成: {actual_volume:.2f}mL | 💧 当前体积: {self._current_volume:.2f}mL")
async def push_plunger(self, volume: float, velocity: float = None):
"""
推出柱塞(排液) 📤
Args:
volume (float): 要推出的体积 (ml)
velocity (float): 推出速度 (ml/s)
"""
new_position = max(0, self._position - volume)
actual_volume = self._position - new_position
if actual_volume <= 0:
self.logger.warning("⚠️ 无法排液 - 已达到最小容量")
return
display_duration = self._calculate_display_duration(actual_volume, velocity)
actual_duration = self._calculate_duration(actual_volume, velocity)
self.logger.info(f"📤 开始排液: {actual_volume:.2f}mL")
self.logger.info(f" 📍 位置: {self._position:.2f}mL → {new_position:.2f}mL")
self.logger.info(f" ⏰ 预计时间: {display_duration:.2f}s")
if self._fast_mode:
self.logger.info(f" ⚡ 快速模式: 实际用时 {actual_duration:.2f}s")
await self._simulate_operation(actual_duration)
self._position = new_position
self._current_volume = new_position
self.logger.info(f"✅ 排液完成: {actual_volume:.2f}mL | 💧 当前体积: {self._current_volume:.2f}mL")
# 便捷操作方法
async def aspirate(self, volume: float, velocity: float = None):
"""吸液操作 📥"""
await self.pull_plunger(volume, velocity)
async def dispense(self, volume: float, velocity: float = None):
"""排液操作 📤"""
await self.push_plunger(volume, velocity)
async def transfer(self, volume: float, aspirate_velocity: float = None, dispense_velocity: float = None):
"""转移操作(先吸后排) 🔄"""
self.logger.info(f"🔄 开始转移操作: {volume:.2f}mL")
# 吸液
await self.aspirate(volume, aspirate_velocity)
# 短暂停顿
self.logger.debug("⏸️ 短暂停顿...")
await self._ros_node.sleep(0.1)
# 排液
await self.dispense(volume, dispense_velocity)
async def empty_syringe(self, velocity: float = None):
"""清空注射器"""
await self.set_position(0, velocity)
async def fill_syringe(self, velocity: float = None):
"""充满注射器"""
await self.set_position(self.max_volume, velocity)
async def stop_operation(self):
"""停止当前操作"""
self._status = "Idle"
self.logger.info("Operation stopped")
# 状态查询方法
def get_position(self) -> float:
"""获取当前位置"""
return self._position
def get_current_volume(self) -> float:
"""获取当前体积"""
return self._current_volume
def get_remaining_capacity(self) -> float:
"""获取剩余容量"""
return self.max_volume - self._current_volume
def is_empty(self) -> bool:
"""检查是否为空"""
return self._current_volume <= 0.01 # 允许小量误差
def is_full(self) -> bool:
"""检查是否已满"""
return self._current_volume >= (self.max_volume - 0.01) # 允许小量误差
def __str__(self):
return f"VirtualTransferPump({self.device_id}: {self._current_volume:.2f}/{self.max_volume} ml, {self._status})"
return (
f"VirtualTransferPump({self.device_id}: {self._current_volume:.2f}/{self.max_volume} ml, {self._status})"
)
def __repr__(self):
return self.__str__()
@@ -398,20 +410,20 @@ class VirtualTransferPump:
async def demo():
"""虚拟泵使用示例"""
pump = VirtualTransferPump("demo_pump", {"max_volume": 50.0})
await pump.initialize()
print(f"Initial state: {pump}")
# 测试set_position方法
result = await pump.set_position(10.0, max_velocity=2.0)
print(f"Set position result: {result}")
print(f"After setting position to 10ml: {pump}")
# 吸液测试
await pump.aspirate(5.0, velocity=2.0)
print(f"After aspirating 5ml: {pump}")
# 清空测试
result = await pump.set_position(0.0)
print(f"Empty result: {result}")

View File

@@ -11,9 +11,10 @@ Virtual Workbench Device - 模拟工作台设备
注意:调用来自线程池,使用 threading.Lock 进行同步
"""
import logging
import time
from typing import Dict, Any, Optional
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
from enum import Enum
from threading import Lock, RLock
@@ -21,38 +22,47 @@ from threading import Lock, RLock
from typing_extensions import TypedDict
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
from unilabos.utils.decorator import not_action
from unilabos.utils.decorator import not_action, always_free
from unilabos.resources.resource_tracker import SampleUUIDsType, LabSample, RETURN_UNILABOS_SAMPLES
# ============ TypedDict 返回类型定义 ============
class MoveToHeatingStationResult(TypedDict):
"""move_to_heating_station 返回类型"""
success: bool
station_id: int
material_id: str
material_number: int
message: str
unilabos_samples: List[LabSample]
class StartHeatingResult(TypedDict):
"""start_heating 返回类型"""
success: bool
station_id: int
material_id: str
material_number: int
message: str
unilabos_samples: List[LabSample]
class MoveToOutputResult(TypedDict):
"""move_to_output 返回类型"""
success: bool
station_id: int
material_id: str
unilabos_samples: List[LabSample]
class PrepareMaterialsResult(TypedDict):
"""prepare_materials 返回类型 - 批量准备物料"""
success: bool
count: int
material_1: int # 物料编号1
@@ -61,12 +71,15 @@ class PrepareMaterialsResult(TypedDict):
material_4: int # 物料编号4
material_5: int # 物料编号5
message: str
unilabos_samples: List[LabSample]
# ============ 状态枚举 ============
class HeatingStationState(Enum):
"""加热台状态枚举"""
IDLE = "idle" # 空闲
OCCUPIED = "occupied" # 已放置物料,等待加热
HEATING = "heating" # 加热中
@@ -75,6 +88,7 @@ class HeatingStationState(Enum):
class ArmState(Enum):
"""机械臂状态枚举"""
IDLE = "idle" # 空闲
BUSY = "busy" # 工作中
@@ -82,6 +96,7 @@ class ArmState(Enum):
@dataclass
class HeatingStation:
"""加热台数据结构"""
station_id: int
state: HeatingStationState = HeatingStationState.IDLE
current_material: Optional[str] = None # 当前物料 (如 "A1", "A2")
@@ -108,8 +123,8 @@ class VirtualWorkbench:
_ros_node: BaseROS2DeviceNode
# 配置常量
ARM_OPERATION_TIME: float = 3.0 # 机械臂操作时间(秒)
HEATING_TIME: float = 10.0 # 加热时间(秒)
ARM_OPERATION_TIME: float = 2 # 机械臂操作时间(秒)
HEATING_TIME: float = 60.0 # 加热时间(秒)
NUM_HEATING_STATIONS: int = 3 # 加热台数量
def __init__(self, device_id: Optional[str] = None, config: Optional[Dict[str, Any]] = None, **kwargs):
@@ -126,9 +141,9 @@ class VirtualWorkbench:
self.data: Dict[str, Any] = {}
# 从config中获取可配置参数
self.ARM_OPERATION_TIME = float(self.config.get("arm_operation_time", 3.0))
self.HEATING_TIME = float(self.config.get("heating_time", 10.0))
self.NUM_HEATING_STATIONS = int(self.config.get("num_heating_stations", 3))
self.ARM_OPERATION_TIME = float(self.config.get("arm_operation_time", self.ARM_OPERATION_TIME))
self.HEATING_TIME = float(self.config.get("heating_time", self.HEATING_TIME))
self.NUM_HEATING_STATIONS = int(self.config.get("num_heating_stations", self.NUM_HEATING_STATIONS))
# 机械臂状态和锁 (使用threading.Lock)
self._arm_lock = Lock()
@@ -137,8 +152,7 @@ class VirtualWorkbench:
# 加热台状态 (station_id -> HeatingStation) - 立即初始化不依赖initialize()
self._heating_stations: Dict[int, HeatingStation] = {
i: HeatingStation(station_id=i)
for i in range(1, self.NUM_HEATING_STATIONS + 1)
i: HeatingStation(station_id=i) for i in range(1, self.NUM_HEATING_STATIONS + 1)
}
self._stations_lock = RLock() # 可重入锁,保护加热台状态
@@ -178,14 +192,16 @@ class VirtualWorkbench:
station.heating_progress = 0.0
# 初始化状态
self.data.update({
"status": "Ready",
"arm_state": ArmState.IDLE.value,
"arm_current_task": None,
"heating_stations": self._get_stations_status(),
"active_tasks_count": 0,
"message": "工作台就绪",
})
self.data.update(
{
"status": "Ready",
"arm_state": ArmState.IDLE.value,
"arm_current_task": None,
"heating_stations": self._get_stations_status(),
"active_tasks_count": 0,
"message": "工作台就绪",
}
)
self.logger.info(f"工作台初始化完成: {self.NUM_HEATING_STATIONS}个加热台就绪")
return True
@@ -204,12 +220,14 @@ class VirtualWorkbench:
with self._tasks_lock:
self._active_tasks.clear()
self.data.update({
"status": "Offline",
"arm_state": ArmState.IDLE.value,
"heating_stations": {},
"message": "工作台已关闭",
})
self.data.update(
{
"status": "Offline",
"arm_state": ArmState.IDLE.value,
"heating_stations": {},
"message": "工作台已关闭",
}
)
return True
def _get_stations_status(self) -> Dict[int, Dict[str, Any]]:
@@ -227,12 +245,14 @@ class VirtualWorkbench:
def _update_data_status(self, message: Optional[str] = None):
"""更新状态数据"""
self.data.update({
"arm_state": self._arm_state.value,
"arm_current_task": self._arm_current_task,
"heating_stations": self._get_stations_status(),
"active_tasks_count": len(self._active_tasks),
})
self.data.update(
{
"arm_state": self._arm_state.value,
"arm_current_task": self._arm_current_task,
"heating_stations": self._get_stations_status(),
"active_tasks_count": len(self._active_tasks),
}
)
if message:
self.data["message"] = message
@@ -280,6 +300,7 @@ class VirtualWorkbench:
def prepare_materials(
self,
sample_uuids: SampleUUIDsType,
count: int = 5,
) -> PrepareMaterialsResult:
"""
@@ -297,10 +318,7 @@ class VirtualWorkbench:
# 生成物料列表 A1 - A{count}
materials = [i for i in range(1, count + 1)]
self.logger.info(
f"[准备物料] 生成 {count} 个物料: "
f"A1-A{count} -> material_1~material_{count}"
)
self.logger.info(f"[准备物料] 生成 {count} 个物料: " f"A1-A{count} -> material_1~material_{count}")
return {
"success": True,
@@ -311,10 +329,12 @@ class VirtualWorkbench:
"material_4": materials[3] if len(materials) > 3 else 0,
"material_5": materials[4] if len(materials) > 4 else 0,
"message": f"已准备 {count} 个物料: A1-A{count}",
"unilabos_samples": [LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for sample_uuid, content in sample_uuids.items()]
}
def move_to_heating_station(
self,
sample_uuids: SampleUUIDsType,
material_number: int,
) -> MoveToHeatingStationResult:
"""
@@ -391,6 +411,9 @@ class VirtualWorkbench:
"material_id": material_id,
"material_number": material_number,
"message": f"{material_id}已成功移动到加热台{station_id}",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
}
except Exception as e:
@@ -403,10 +426,15 @@ class VirtualWorkbench:
"material_id": material_id,
"material_number": material_number,
"message": f"移动失败: {str(e)}",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
}
@always_free
def start_heating(
self,
sample_uuids: SampleUUIDsType,
station_id: int,
material_number: int,
) -> StartHeatingResult:
@@ -429,6 +457,9 @@ class VirtualWorkbench:
"material_id": "",
"material_number": material_number,
"message": f"无效的加热台ID: {station_id}",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
}
with self._stations_lock:
@@ -441,6 +472,9 @@ class VirtualWorkbench:
"material_id": "",
"material_number": material_number,
"message": f"加热台{station_id}上没有物料",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
}
if station.state == HeatingStationState.HEATING:
@@ -450,6 +484,9 @@ class VirtualWorkbench:
"material_id": station.current_material,
"material_number": material_number,
"message": f"加热台{station_id}已经在加热中",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
}
material_id = station.current_material
@@ -465,10 +502,21 @@ class VirtualWorkbench:
self._update_data_status(f"加热台{station_id}开始加热{material_id}")
# 模拟加热过程 (10秒)
# 打印当前所有正在加热的台位
with self._stations_lock:
heating_list = [
f"加热台{sid}:{s.current_material}"
for sid, s in self._heating_stations.items()
if s.state == HeatingStationState.HEATING and s.current_material
]
self.logger.info(f"[并行加热] 当前同时加热中: {', '.join(heating_list)}")
# 模拟加热过程
start_time = time.time()
last_countdown_log = start_time
while True:
elapsed = time.time() - start_time
remaining = max(0.0, self.HEATING_TIME - elapsed)
progress = min(100.0, (elapsed / self.HEATING_TIME) * 100)
with self._stations_lock:
@@ -476,6 +524,11 @@ class VirtualWorkbench:
self._update_data_status(f"加热台{station_id}加热中: {progress:.1f}%")
# 每5秒打印一次倒计时
if time.time() - last_countdown_log >= 5.0:
self.logger.info(f"[加热台{station_id}] {material_id} 剩余 {remaining:.1f}s")
last_countdown_log = time.time()
if elapsed >= self.HEATING_TIME:
break
@@ -499,10 +552,14 @@ class VirtualWorkbench:
"material_id": material_id,
"material_number": material_number,
"message": f"加热台{station_id}加热完成",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
}
def move_to_output(
self,
sample_uuids: SampleUUIDsType,
station_id: int,
material_number: int,
) -> MoveToOutputResult:
@@ -525,6 +582,9 @@ class VirtualWorkbench:
"material_id": "",
"output_position": f"C{output_number}",
"message": f"无效的加热台ID: {station_id}",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
}
with self._stations_lock:
@@ -538,6 +598,9 @@ class VirtualWorkbench:
"material_id": "",
"output_position": f"C{output_number}",
"message": f"加热台{station_id}上没有物料",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
}
if station.state != HeatingStationState.COMPLETED:
@@ -547,6 +610,9 @@ class VirtualWorkbench:
"material_id": material_id,
"output_position": f"C{output_number}",
"message": f"加热台{station_id}尚未完成加热 (当前状态: {station.state.value})",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
}
output_position = f"C{output_number}"
@@ -595,6 +661,9 @@ class VirtualWorkbench:
"material_id": material_id,
"output_position": output_position,
"message": f"{material_id}已成功移动到{output_position}",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
}
except Exception as e:
@@ -607,6 +676,9 @@ class VirtualWorkbench:
"material_id": "",
"output_position": output_position,
"message": f"移动失败: {str(e)}",
"unilabos_samples": [
LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for
sample_uuid, content in sample_uuids.items()]
}
# ============ 状态属性 ============

View File

@@ -96,10 +96,13 @@ serial:
type: string
port:
type: string
registry_name:
type: string
resource_tracker:
type: object
required:
- device_id
- registry_name
- port
type: object
data:

View File

@@ -67,6 +67,9 @@ camera:
period:
default: 0.1
type: number
registry_name:
default: ''
type: string
resource_tracker:
type: object
required: []

View File

@@ -9468,7 +9468,7 @@ liquid_handler.prcxi:
well_names: null
handles:
input:
- data_key: plate
- data_key: '@this.0@@@plate'
data_source: handle
data_type: resource
handler_key: input_plate
@@ -9503,81 +9503,78 @@ liquid_handler.prcxi:
type: string
type: array
plate:
items:
properties:
category:
properties:
category:
type: string
children:
items:
type: string
children:
items:
type: string
type: array
config:
type: string
data:
type: string
id:
type: string
name:
type: string
parent:
type: string
pose:
properties:
orientation:
properties:
w:
type: number
x:
type: number
y:
type: number
z:
type: number
required:
- x
- y
- z
- w
title: orientation
type: object
position:
properties:
x:
type: number
y:
type: number
z:
type: number
required:
- x
- y
- z
title: position
type: object
required:
- position
- orientation
title: pose
type: object
sample_id:
type: string
type:
type: string
required:
- id
- name
- sample_id
- children
- parent
- type
- category
- pose
- config
- data
title: plate
type: object
type: array
config:
type: string
data:
type: string
id:
type: string
name:
type: string
parent:
type: string
pose:
properties:
orientation:
properties:
w:
type: number
x:
type: number
y:
type: number
z:
type: number
required:
- x
- y
- z
- w
title: orientation
type: object
position:
properties:
x:
type: number
y:
type: number
z:
type: number
required:
- x
- y
- z
title: position
type: object
required:
- position
- orientation
title: pose
type: object
sample_id:
type: string
type:
type: string
required:
- id
- name
- sample_id
- children
- parent
- type
- category
- pose
- config
- data
title: plate
type: array
type: object
volumes:
items:
type: number
@@ -9593,17 +9590,207 @@ liquid_handler.prcxi:
- volumes
type: object
result:
$defs:
ResourceDict:
properties:
class:
description: Resource class name
title: Class
type: string
config:
additionalProperties: true
description: Resource configuration
title: Config
type: object
data:
additionalProperties: true
description: 'Resource data, eg: container liquid data'
title: Data
type: object
description:
default: ''
description: Resource description
title: Description
type: string
extra:
additionalProperties: true
description: 'Extra data, eg: slot index'
title: Extra
type: object
icon:
default: ''
description: Resource icon
title: Icon
type: string
id:
description: Resource ID
title: Id
type: string
model:
additionalProperties: true
description: Resource model
title: Model
type: object
name:
description: Resource name
title: Name
type: string
parent:
anyOf:
- $ref: '#/$defs/ResourceDict'
- type: 'null'
default: null
description: Parent resource object
parent_uuid:
anyOf:
- type: string
- type: 'null'
default: null
description: Parent resource uuid
title: Parent Uuid
pose:
$ref: '#/$defs/ResourceDictPosition'
description: Resource position
schema:
additionalProperties: true
description: Resource schema
title: Schema
type: object
type:
anyOf:
- const: device
type: string
- type: string
description: Resource type
title: Type
uuid:
description: Resource UUID
title: Uuid
type: string
required:
- id
- uuid
- name
- type
- class
- config
- data
- extra
title: ResourceDict
type: object
ResourceDictPosition:
properties:
cross_section_type:
default: rectangle
description: Cross section type
enum:
- rectangle
- circle
- rounded_rectangle
title: Cross Section Type
type: string
layout:
default: x-y
description: Resource layout
enum:
- 2d
- x-y
- z-y
- x-z
title: Layout
type: string
position:
$ref: '#/$defs/ResourceDictPositionObject'
description: Resource position
position3d:
$ref: '#/$defs/ResourceDictPositionObject'
description: Resource position in 3D space
rotation:
$ref: '#/$defs/ResourceDictPositionObject'
description: Resource rotation
scale:
$ref: '#/$defs/ResourceDictPositionScale'
description: Resource scale
size:
$ref: '#/$defs/ResourceDictPositionSize'
description: Resource size
title: ResourceDictPosition
type: object
ResourceDictPositionObject:
properties:
x:
default: 0.0
description: X coordinate
title: X
type: number
y:
default: 0.0
description: Y coordinate
title: Y
type: number
z:
default: 0.0
description: Z coordinate
title: Z
type: number
title: ResourceDictPositionObject
type: object
ResourceDictPositionScale:
properties:
x:
default: 0.0
description: x scale
title: X
type: number
y:
default: 0.0
description: y scale
title: Y
type: number
z:
default: 0.0
description: z scale
title: Z
type: number
title: ResourceDictPositionScale
type: object
ResourceDictPositionSize:
properties:
depth:
default: 0.0
description: Depth
title: Depth
type: number
height:
default: 0.0
description: Height
title: Height
type: number
width:
default: 0.0
description: Width
title: Width
type: number
title: ResourceDictPositionSize
type: object
properties:
plate:
items: {}
items:
items:
$ref: '#/$defs/ResourceDict'
type: array
title: Plate
type: array
volumes:
items: {}
items:
type: number
title: Volumes
type: array
wells:
items: {}
items:
items:
$ref: '#/$defs/ResourceDict'
type: array
title: Wells
type: array
required:

View File

@@ -5835,6 +5835,25 @@ virtual_workbench:
- material_number
type: object
result:
$defs:
LabSample:
properties:
extra:
additionalProperties: true
title: Extra
type: object
oss_path:
title: Oss Path
type: string
sample_uuid:
title: Sample Uuid
type: string
required:
- sample_uuid
- oss_path
- extra
title: LabSample
type: object
description: move_to_heating_station 返回类型
properties:
material_id:
@@ -5853,12 +5872,18 @@ virtual_workbench:
success:
title: Success
type: boolean
unilabos_samples:
items:
$ref: '#/$defs/LabSample'
title: Unilabos Samples
type: array
required:
- success
- station_id
- material_id
- material_number
- message
- unilabos_samples
title: MoveToHeatingStationResult
type: object
required:
@@ -5903,6 +5928,25 @@ virtual_workbench:
- material_number
type: object
result:
$defs:
LabSample:
properties:
extra:
additionalProperties: true
title: Extra
type: object
oss_path:
title: Oss Path
type: string
sample_uuid:
title: Sample Uuid
type: string
required:
- sample_uuid
- oss_path
- extra
title: LabSample
type: object
description: move_to_output 返回类型
properties:
material_id:
@@ -5914,10 +5958,16 @@ virtual_workbench:
success:
title: Success
type: boolean
unilabos_samples:
items:
$ref: '#/$defs/LabSample'
title: Unilabos Samples
type: array
required:
- success
- station_id
- material_id
- unilabos_samples
title: MoveToOutputResult
type: object
required:
@@ -5972,6 +6022,25 @@ virtual_workbench:
required: []
type: object
result:
$defs:
LabSample:
properties:
extra:
additionalProperties: true
title: Extra
type: object
oss_path:
title: Oss Path
type: string
sample_uuid:
title: Sample Uuid
type: string
required:
- sample_uuid
- oss_path
- extra
title: LabSample
type: object
description: prepare_materials 返回类型 - 批量准备物料
properties:
count:
@@ -5998,6 +6067,11 @@ virtual_workbench:
success:
title: Success
type: boolean
unilabos_samples:
items:
$ref: '#/$defs/LabSample'
title: Unilabos Samples
type: array
required:
- success
- count
@@ -6007,6 +6081,7 @@ virtual_workbench:
- material_4
- material_5
- message
- unilabos_samples
title: PrepareMaterialsResult
type: object
required:
@@ -6015,6 +6090,7 @@ virtual_workbench:
type: object
type: UniLabJsonCommand
auto-start_heating:
always_free: true
feedback: {}
goal: {}
goal_default:
@@ -6062,6 +6138,25 @@ virtual_workbench:
- material_number
type: object
result:
$defs:
LabSample:
properties:
extra:
additionalProperties: true
title: Extra
type: object
oss_path:
title: Oss Path
type: string
sample_uuid:
title: Sample Uuid
type: string
required:
- sample_uuid
- oss_path
- extra
title: LabSample
type: object
description: start_heating 返回类型
properties:
material_id:
@@ -6079,12 +6174,18 @@ virtual_workbench:
success:
title: Success
type: boolean
unilabos_samples:
items:
$ref: '#/$defs/LabSample'
title: Unilabos Samples
type: array
required:
- success
- station_id
- material_id
- material_number
- message
- unilabos_samples
title: StartHeatingResult
type: object
required:

View File

@@ -5,6 +5,7 @@ import sys
import inspect
import importlib
import threading
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, Dict, List, Union, Tuple
@@ -88,6 +89,14 @@ class Registry:
)
test_latency_schema["description"] = "用于测试延迟的动作,返回延迟时间和时间差。"
test_resource_method_info = host_node_enhanced_info.get("action_methods", {}).get("test_resource", {})
test_resource_schema = self._generate_unilab_json_command_schema(
test_resource_method_info.get("args", []),
"test_resource",
test_resource_method_info.get("return_annotation"),
)
test_resource_schema["description"] = "用于测试物料、设备和样本。"
self.device_type_registry.update(
{
"host_node": {
@@ -166,7 +175,8 @@ class Registry:
"res_id": "unilabos_resources", # 将当前实验室的全部物料id作为下拉框可选择
"device_id": "unilabos_devices", # 将当前实验室的全部设备id作为下拉框可选择
"parent": "unilabos_nodes", # 将当前实验室的设备/物料作为下拉框可选择
"class_name": "unilabos_class",
"class_name": "unilabos_class", # 当前实验室物料的class name
"slot_on_deck": "unilabos_resource_slot:parent", # 勾选的parent的config中的sites的name展示name参数对应slotindex
},
},
"test_latency": {
@@ -189,32 +199,7 @@ class Registry:
"goal": {},
"feedback": {},
"result": {},
"schema": {
"description": "",
"properties": {
"feedback": {},
"goal": {
"properties": {
"resource": ros_message_to_json_schema(Resource, "resource"),
"resources": {
"items": {
"properties": ros_message_to_json_schema(
Resource, "resources"
),
"type": "object",
},
"type": "array",
},
"device": {"type": "string"},
"devices": {"items": {"type": "string"}, "type": "array"},
},
"type": "object",
},
"result": {},
},
"title": "test_resource",
"type": "object",
},
"schema": test_resource_schema,
"placeholder_keys": {
"device": "unilabos_devices",
"devices": "unilabos_devices",
@@ -838,6 +823,7 @@ class Registry:
("list", "unilabos.registry.placeholder_type:DeviceSlot"),
]
},
**({"always_free": True} if v.get("always_free") else {}),
}
for k, v in enhanced_info["action_methods"].items()
if k not in device_config["class"]["action_value_mappings"]
@@ -943,6 +929,7 @@ class Registry:
if is_valid:
results.append((file, data, device_ids))
except Exception as e:
traceback.print_exc()
logger.warning(f"[UniLab Registry] 处理设备文件异常: {file}, 错误: {e}")
# 线程安全地更新注册表

View File

@@ -1,10 +1,6 @@
import json
from typing import Dict, Any
from pylabrobot.resources import Container
from unilabos_msgs.msg import Resource
from unilabos.ros.msgs.message_converter import convert_from_ros_msg
class RegularContainer(Container):
@@ -16,12 +12,12 @@ class RegularContainer(Container):
kwargs["size_y"] = 0
if "size_z" not in kwargs:
kwargs["size_z"] = 0
self.kwargs = kwargs
self.state = {}
super().__init__(*args, category="container", **kwargs)
def load_state(self, state: Dict[str, Any]):
self.state = state
super().load_state(state)
def get_regular_container(name="container"):
@@ -29,7 +25,6 @@ def get_regular_container(name="container"):
r.category = "container"
return r
#
# class RegularContainer(object):
# # 第一个参数必须是id传入
# # noinspection PyShadowingBuiltins
@@ -89,4 +84,4 @@ def get_regular_container(name="container"):
# return to_dict
#
# def __str__(self):
# return f"{self.id}"
# return f"{self.id}"

View File

@@ -76,7 +76,7 @@ def canonicalize_nodes_data(
if sample_id:
logger.error(f"{node}的sample_id参数已弃用sample_id: {sample_id}")
for k in list(node.keys()):
if k not in ["id", "uuid", "name", "description", "schema", "model", "icon", "parent_uuid", "parent", "type", "class", "position", "config", "data", "children", "pose"]:
if k not in ["id", "uuid", "name", "description", "schema", "model", "icon", "parent_uuid", "parent", "type", "class", "position", "config", "data", "children", "pose", "extra"]:
v = node.pop(k)
node["config"][k] = v
if outer_host_node_id is not None:

View File

@@ -5,6 +5,8 @@ from pydantic import BaseModel, field_serializer, field_validator, ValidationErr
from pydantic import Field
from typing import List, Tuple, Any, Dict, Literal, Optional, cast, TYPE_CHECKING, Union
from typing_extensions import TypedDict
from unilabos.resources.plr_additional_res_reg import register
from unilabos.utils.log import logger
@@ -14,6 +16,7 @@ if TYPE_CHECKING:
EXTRA_CLASS = "unilabos_resource_class"
FRONTEND_POSE_EXTRA = "unilabos_frontend_pose_extra"
EXTRA_SAMPLE_UUID = "sample_uuid"
EXTRA_UNILABOS_SAMPLE_UUID = "unilabos_sample_uuid"
@@ -30,24 +33,58 @@ RETURN_UNILABOS_SAMPLES = "unilabos_samples"
SampleUUIDsType = Dict[str, Optional["PLRResource"]]
class LabSample(TypedDict):
sample_uuid: str
oss_path: str
extra: Dict[str, Any]
class ResourceDictPositionSizeType(TypedDict):
depth: float
width: float
height: float
class ResourceDictPositionSize(BaseModel):
depth: float = Field(description="Depth", default=0.0) # z
width: float = Field(description="Width", default=0.0) # x
height: float = Field(description="Height", default=0.0) # y
class ResourceDictPositionScaleType(TypedDict):
x: float
y: float
z: float
class ResourceDictPositionScale(BaseModel):
x: float = Field(description="x scale", default=0.0)
y: float = Field(description="y scale", default=0.0)
z: float = Field(description="z scale", default=0.0)
class ResourceDictPositionObjectType(TypedDict):
x: float
y: float
z: float
class ResourceDictPositionObject(BaseModel):
x: float = Field(description="X coordinate", default=0.0)
y: float = Field(description="Y coordinate", default=0.0)
z: float = Field(description="Z coordinate", default=0.0)
class ResourceDictPositionType(TypedDict):
size: ResourceDictPositionSizeType
scale: ResourceDictPositionScaleType
layout: Literal["2d", "x-y", "z-y", "x-z"]
position: ResourceDictPositionObjectType
position3d: ResourceDictPositionObjectType
rotation: ResourceDictPositionObjectType
cross_section_type: Literal["rectangle", "circle", "rounded_rectangle"]
class ResourceDictPosition(BaseModel):
size: ResourceDictPositionSize = Field(description="Resource size", default_factory=ResourceDictPositionSize)
scale: ResourceDictPositionScale = Field(description="Resource scale", default_factory=ResourceDictPositionScale)
@@ -64,6 +101,25 @@ class ResourceDictPosition(BaseModel):
cross_section_type: Literal["rectangle", "circle", "rounded_rectangle"] = Field(
description="Cross section type", default="rectangle"
)
extra: Optional[Dict[str, Any]] = Field(description="Extra data", default=None)
class ResourceDictType(TypedDict):
id: str
uuid: str
name: str
description: str
resource_schema: Dict[str, Any]
model: Dict[str, Any]
icon: str
parent_uuid: Optional[str]
parent: Optional["ResourceDictType"]
type: Union[Literal["device"], str]
klass: str
pose: ResourceDictPositionType
config: Dict[str, Any]
data: Dict[str, Any]
extra: Dict[str, Any]
# 统一的资源字典模型parent 自动序列化为 parent_uuidchildren 不序列化
@@ -357,6 +413,15 @@ class ResourceTreeSet(object):
"tip_spot": "tip_spot",
"tube": "tube",
"bottle_carrier": "bottle_carrier",
"material_hole": "material_hole",
"container": "container",
"material_plate": "material_plate",
"electrode_sheet": "electrode_sheet",
"warehouse": "warehouse",
"magazine_holder": "magazine_holder",
"resource_group": "resource_group",
"trash": "trash",
"plate_adapter": "plate_adapter",
}
if source in replace_info:
return replace_info[source]
@@ -400,6 +465,7 @@ class ResourceTreeSet(object):
"position3d": raw_pos,
"rotation": d["rotation"],
"cross_section_type": d.get("cross_section_type", "rectangle"),
"extra": extra.get(FRONTEND_POSE_EXTRA)
}
# 先构建当前节点的字典不包含children
@@ -485,6 +551,7 @@ class ResourceTreeSet(object):
name_to_uuid[node.res_content.name] = node.res_content.uuid
all_states[node.res_content.name] = node.res_content.data
name_to_extra[node.res_content.name] = node.res_content.extra
name_to_extra[node.res_content.name][FRONTEND_POSE_EXTRA] = node.res_content.pose.extra
name_to_extra[node.res_content.name][EXTRA_CLASS] = node.res_content.klass
for child in node.children:
collect_node_data(child, name_to_uuid, all_states, name_to_extra)
@@ -553,7 +620,7 @@ class ResourceTreeSet(object):
plr_resources.append(plr_resource)
except Exception as e:
logger.error(f"转换 PLR 资源失败: {e}")
logger.error(f"转换 PLR 资源失败: {e} {str(plr_dict)[:1000]}")
import traceback
logger.error(f"堆栈: {traceback.format_exc()}")
@@ -773,14 +840,27 @@ class ResourceTreeSet(object):
f"从远端同步了 {added_count} 个物料子树"
)
else:
# 情况2: 二级物料(不是 device
if remote_child_name not in local_children_map:
# 引入整个子树
remote_child.res_content.parent = local_device.res_content
local_device.children.append(remote_child)
logger.info(f"Device '{remote_root_id}': 从远端同步物料子树 '{remote_child_name}'")
else:
logger.info(f"物料 '{remote_root_id}/{remote_child_name}' 已存在,跳过")
# 二级物料已存在,比较三级子节点是否缺失
local_material = local_children_map[remote_child_name]
local_material_children_map = {child.res_content.name: child for child in
local_material.children}
added_count = 0
for remote_sub in remote_child.children:
remote_sub_name = remote_sub.res_content.name
if remote_sub_name not in local_material_children_map:
remote_sub.res_content.parent = local_material.res_content
local_material.children.append(remote_sub)
added_count += 1
else:
logger.info(
f"物料 '{remote_root_id}/{remote_child_name}/{remote_sub_name}' "
f"已存在,跳过"
)
if added_count > 0:
logger.info(
f"物料 '{remote_root_id}/{remote_child_name}': "
f"从远端同步了 {added_count} 个子物料"
)
else:
# 情况1: 一级节点是物料(不是 device
# 检查是否已存在

View File

@@ -44,8 +44,7 @@ def ros2_device_node(
# 从属性中自动发现可发布状态
if status_types is None:
status_types = {}
if device_config is None:
raise ValueError("device_config cannot be None")
assert device_config is not None, "device_config cannot be None"
if action_value_mappings is None:
action_value_mappings = {}
if hardware_interface is None:

View File

@@ -146,7 +146,7 @@ def init_wrapper(
device_id: str,
device_uuid: str,
driver_class: type[T],
device_config: ResourceTreeInstance,
device_config: ResourceDictInstance,
status_types: Dict[str, Any],
action_value_mappings: Dict[str, Any],
hardware_interface: Dict[str, Any],
@@ -279,6 +279,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
self,
driver_instance: T,
device_id: str,
registry_name: str,
device_uuid: str,
status_types: Dict[str, Any],
action_value_mappings: Dict[str, Any],
@@ -300,6 +301,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
"""
self.driver_instance = driver_instance
self.device_id = device_id
self.registry_name = registry_name
self.uuid = device_uuid
self.publish_high_frequency = False
self.callback_group = ReentrantCallbackGroup()
@@ -412,16 +414,13 @@ class BaseROS2DeviceNode(Node, Generic[T]):
else:
for r in rts.root_nodes:
r.res_content.parent_uuid = self.uuid
if (
len(LIQUID_INPUT_SLOT)
and LIQUID_INPUT_SLOT[0] == -1
and len(rts.root_nodes) == 1
and isinstance(rts.root_nodes[0], RegularContainer)
):
rts_plr_instances = rts.to_plr_resources()
if len(rts.root_nodes) == 1 and isinstance(rts_plr_instances[0], RegularContainer):
# noinspection PyTypeChecker
container_instance: RegularContainer = rts.root_nodes[0]
found_resources = self.resource_tracker.figure_resource({"id": container_instance.name}, try_mode=True)
container_instance: RegularContainer = rts_plr_instances[0]
found_resources = self.resource_tracker.figure_resource(
{"name": container_instance.name}, try_mode=True
)
if not len(found_resources):
self.resource_tracker.add_resource(container_instance)
logger.info(f"添加物料{container_instance.name}到资源跟踪器")
@@ -430,7 +429,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
found_resource = found_resources[0]
if isinstance(found_resource, RegularContainer):
logger.info(f"更新物料{container_instance.name}的数据{found_resource.state}")
found_resource.state.update(json.loads(container_instance.state))
found_resource.state.update(container_instance.state)
elif isinstance(found_resource, dict):
raise ValueError("已不支持 字典 版本的RegularContainer")
else:
@@ -443,7 +442,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
"action": "add",
"data": {
"data": rts.dump(),
"mount_uuid": parent_resource.unilabos_uuid if parent_resource is not None else "",
"mount_uuid": parent_resource.unilabos_uuid if parent_resource is not None else self.uuid,
"first_add": False,
},
}
@@ -461,7 +460,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
}
res.response = json.dumps(final_response)
# 如果driver自己就有assign的方法那就使用driver自己的assign方法
if hasattr(self.driver_instance, "create_resource"):
if hasattr(self.driver_instance, "create_resource") and self.node_name != "host_node":
create_resource_func = getattr(self.driver_instance, "create_resource")
try:
ret = create_resource_func(
@@ -916,8 +915,24 @@ class BaseROS2DeviceNode(Node, Generic[T]):
else []
)
if target_site is not None and sites is not None and site_names is not None:
site_index = sites.index(original_instance)
site_name = site_names[site_index]
site_index = None
try:
# sites 可能是 Resource 列表或 dict 列表 (如 PRCXI9300Deck)
# 只有itemized_carrier在使用准备弃用
site_index = sites.index(original_instance)
except ValueError:
# dict 类型的 sites: 通过name匹配
for idx, site in enumerate(sites):
if original_instance.name == site["occupied_by"]:
site_index = idx
break
elif (original_instance.location.x == site["position"]["x"] and original_instance.location.y == site["position"]["y"] and original_instance.location.z == site["position"]["z"]):
site_index = idx
break
if site_index is None:
site_name = None
else:
site_name = site_names[site_index]
if site_name != target_site:
parent = self.transfer_to_new_resource(original_instance, tree, additional_add_params)
if parent is not None:
@@ -925,6 +940,14 @@ class BaseROS2DeviceNode(Node, Generic[T]):
parent_appended = True
# 加载状态
# noinspection PyProtectedMember
original_instance._size_x = plr_resource._size_x
# noinspection PyProtectedMember
original_instance._size_y = plr_resource._size_y
# noinspection PyProtectedMember
original_instance._size_z = plr_resource._size_z
# noinspection PyProtectedMember
original_instance._local_size_z = plr_resource._local_size_z
original_instance.location = plr_resource.location
original_instance.rotation = plr_resource.rotation
original_instance.barcode = plr_resource.barcode
@@ -985,7 +1008,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
].call_async(
r
) # type: ignore
self.lab_logger().info(f"确认资源云端 Add 结果: {response.response}")
self.lab_logger().trace(f"确认资源云端 Add 结果: {response.response}")
results.append(result)
elif action == "update":
if tree_set is None:
@@ -1011,7 +1034,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
].call_async(
r
) # type: ignore
self.lab_logger().info(f"确认资源云端 Update 结果: {response.response}")
self.lab_logger().trace(f"确认资源云端 Update 结果: {response.response}")
results.append(result)
elif action == "remove":
result = _handle_remove(resources_uuid)
@@ -1157,6 +1180,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
"machine_name": BasicConfig.machine_name,
"type": "slave",
"edge_device_id": self.device_id,
"registry_name": self.registry_name,
}
},
ensure_ascii=False,
@@ -1538,11 +1562,18 @@ class BaseROS2DeviceNode(Node, Generic[T]):
if isinstance(rs, list):
for r in rs:
res = self.resource_tracker.parent_resource(r) # 获取 resource 对象
if res is None:
res = rs
if id(res) not in seen:
seen.add(id(res))
unique_resources.append(res)
else:
res = self.resource_tracker.parent_resource(rs)
if id(res) not in seen:
seen.add(id(res))
unique_resources.append(res)
if res is None:
res = rs
if id(res) not in seen:
seen.add(id(res))
unique_resources.append(res)
# 使用新的资源树接口
if unique_resources:
@@ -1624,9 +1655,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
else:
resolved_sample_uuids[sample_uuid] = material_uuid
function_args[PARAM_SAMPLE_UUIDS] = resolved_sample_uuids
self.lab_logger().debug(
f"[JsonCommand] 注入 {PARAM_SAMPLE_UUIDS}: {resolved_sample_uuids}"
)
self.lab_logger().debug(f"[JsonCommand] 注入 {PARAM_SAMPLE_UUIDS}: {resolved_sample_uuids}")
continue
# 处理单个 ResourceSlot
@@ -2003,6 +2032,7 @@ class ROS2DeviceNode:
if driver_is_ros:
driver_params["device_id"] = device_id
driver_params["registry_name"] = device_config.res_content.klass
driver_params["resource_tracker"] = self.resource_tracker
self._driver_instance = self._driver_creator.create_instance(driver_params)
if self._driver_instance is None:
@@ -2020,6 +2050,7 @@ class ROS2DeviceNode:
children=children,
driver_instance=self._driver_instance, # type: ignore
device_id=device_id,
registry_name=device_config.res_content.klass,
device_uuid=device_uuid,
status_types=status_types,
action_value_mappings=action_value_mappings,
@@ -2031,6 +2062,7 @@ class ROS2DeviceNode:
self._ros_node = BaseROS2DeviceNode(
driver_instance=self._driver_instance,
device_id=device_id,
registry_name=device_config.res_content.klass,
device_uuid=device_uuid,
status_types=status_types,
action_value_mappings=action_value_mappings,
@@ -2039,6 +2071,7 @@ class ROS2DeviceNode:
resource_tracker=self.resource_tracker,
)
self._ros_node: BaseROS2DeviceNode
# 将注册表类型名传递给BaseROS2DeviceNode,用于slave上报
self._ros_node.lab_logger().info(f"初始化完成 {self._ros_node.uuid} {self.driver_is_ros}")
self.driver_instance._ros_node = self._ros_node # type: ignore
self.driver_instance._execute_driver_command = self._ros_node._execute_driver_command # type: ignore

View File

@@ -6,12 +6,13 @@ from cv_bridge import CvBridge
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, DeviceNodeResourceTracker
class VideoPublisher(BaseROS2DeviceNode):
def __init__(self, device_id='video_publisher', device_uuid='', camera_index=0, period: float = 0.1, resource_tracker: DeviceNodeResourceTracker = None):
def __init__(self, device_id='video_publisher', registry_name="", device_uuid='', camera_index=0, period: float = 0.1, resource_tracker: DeviceNodeResourceTracker = None):
# 初始化BaseROS2DeviceNode使用自身作为driver_instance
BaseROS2DeviceNode.__init__(
self,
driver_instance=self,
device_id=device_id,
registry_name=registry_name,
device_uuid=device_uuid,
status_types={},
action_value_mappings={},

View File

@@ -10,6 +10,7 @@ class ControllerNode(BaseROS2DeviceNode):
def __init__(
self,
device_id: str,
registry_name: str,
controller_func: Callable,
update_rate: float,
inputs: Dict[str, Dict[str, type | str]],
@@ -51,6 +52,7 @@ class ControllerNode(BaseROS2DeviceNode):
self,
driver_instance=self,
device_id=device_id,
registry_name=registry_name,
status_types=status_types,
action_value_mappings=action_value_mappings,
hardware_interface=hardware_interface,

View File

@@ -35,7 +35,7 @@ from unilabos.resources.resource_tracker import (
ResourceTreeInstance,
RETURN_UNILABOS_SAMPLES,
JSON_UNILABOS_PARAM,
PARAM_SAMPLE_UUIDS,
PARAM_SAMPLE_UUIDS, SampleUUIDsType, LabSample,
)
from unilabos.ros.initialize_device import initialize_device_from_dict
from unilabos.ros.msgs.message_converter import (
@@ -51,6 +51,7 @@ from unilabos.utils import logger
from unilabos.utils.exception import DeviceClassInvalid
from unilabos.utils.log import warning
from unilabos.utils.type_check import serialize_result_info
from unilabos.config.config import BasicConfig
if TYPE_CHECKING:
from unilabos.app.ws_client import QueueItem
@@ -63,7 +64,8 @@ class DeviceActionStatus:
class TestResourceReturn(TypedDict):
resources: List[List[ResourceDict]]
devices: List[DeviceSlot]
devices: List[Dict[str, Any]]
unilabos_samples: List[LabSample]
class TestLatencyReturn(TypedDict):
@@ -248,6 +250,7 @@ class HostNode(BaseROS2DeviceNode):
self,
driver_instance=self,
device_id=device_id,
registry_name="host_node",
device_uuid=host_node_dict["uuid"],
status_types={},
action_value_mappings=lab_registry.device_type_registry["host_node"]["class"]["action_value_mappings"],
@@ -302,7 +305,8 @@ class HostNode(BaseROS2DeviceNode):
} # 用来存储多个ActionClient实例
self._action_value_mappings: Dict[str, Dict] = (
{}
) # 用来存储多个ActionClient的type, goal, feedback, result的变量名映射关系
) # device_id -> action_value_mappings(本地+远程设备统一存储)
self._slave_registry_configs: Dict[str, Dict] = {} # registry_name -> registry_config(含action_value_mappings)
self._goals: Dict[str, Any] = {} # 用来存储多个目标的状态
self._online_devices: Set[str] = {f"{self.namespace}/{device_id}"} # 用于跟踪在线设备
self._last_discovery_time = 0.0 # 上次设备发现的时间
@@ -636,6 +640,8 @@ class HostNode(BaseROS2DeviceNode):
self.device_machine_names[device_id] = "本地"
self.devices_instances[device_id] = d
# noinspection PyProtectedMember
self._action_value_mappings[device_id] = d._ros_node._action_value_mappings
# noinspection PyProtectedMember
for action_name, action_value_mapping in d._ros_node._action_value_mappings.items():
if action_name.startswith("auto-") or str(action_value_mapping.get("type", "")).startswith(
"UniLabJsonCommand"
@@ -772,6 +778,17 @@ class HostNode(BaseROS2DeviceNode):
u = uuid.UUID(item.job_id)
device_id = item.device_id
action_name = item.action_name
if BasicConfig.test_mode:
action_id = f"/devices/{device_id}/{action_name}"
self.lab_logger().info(
f"[TEST MODE] 模拟执行: {action_id} (job={item.job_id[:8]}), 参数: {str(action_kwargs)[:500]}"
)
# 根据注册表 handles 构建模拟返回值
mock_return = self._build_test_mode_return(device_id, action_name, action_kwargs)
self._handle_test_mode_result(item, action_id, mock_return)
return
if action_type.startswith("UniLabJsonCommand"):
if action_name.startswith("auto-"):
action_name = action_name[5:]
@@ -809,6 +826,51 @@ class HostNode(BaseROS2DeviceNode):
)
future.add_done_callback(lambda f: self.goal_response_callback(item, action_id, f))
def _build_test_mode_return(
self, device_id: str, action_name: str, action_kwargs: Dict[str, Any]
) -> Dict[str, Any]:
"""
根据注册表 handles 的 output 定义构建测试模式的模拟返回值
根据 data_key 中 @flatten 的层数决定嵌套数组层数,叶子值为空字典。
例如: "vessel"{}, "plate.@flatten" → [{}], "a.@flatten.@flatten" → [[{}]]
"""
mock_return: Dict[str, Any] = {"test_mode": True, "action_name": action_name}
action_mappings = self._action_value_mappings.get(device_id, {})
action_mapping = action_mappings.get(action_name, {})
handles = action_mapping.get("handles", {})
if isinstance(handles, dict):
for output_handle in handles.get("output", []):
data_key = output_handle.get("data_key", "")
handler_key = output_handle.get("handler_key", "")
# 根据 @flatten 层数构建嵌套数组,叶子为空字典
flatten_count = data_key.count("@flatten")
value: Any = {}
for _ in range(flatten_count):
value = [value]
mock_return[handler_key] = value
return mock_return
def _handle_test_mode_result(
self, item: "QueueItem", action_id: str, mock_return: Dict[str, Any]
) -> None:
"""
测试模式下直接构建结果并走正常的结果回调流程(跳过 ROS
"""
job_id = item.job_id
status = "success"
return_info = serialize_result_info("", True, mock_return)
self.lab_logger().info(f"[TEST MODE] Result for {action_id} ({job_id[:8]}): {status}")
from unilabos.app.web.controller import store_job_result
store_job_result(job_id, status, return_info, mock_return)
# 发布状态到桥接器
for bridge in self.bridges:
if hasattr(bridge, "publish_job_status"):
bridge.publish_job_status(mock_return, item, status, return_info)
def goal_response_callback(self, item: "QueueItem", action_id: str, future) -> None:
"""目标响应回调"""
goal_handle = future.result()
@@ -863,7 +925,7 @@ class HostNode(BaseROS2DeviceNode):
f"{[s.get('name', s.get('id', 'unknown')) if isinstance(s, dict) else str(s)[:20] for s in unilabos_samples[:5]]}"
f"{'...' if len(unilabos_samples) > 5 else ''}"
)
return_info[RETURN_UNILABOS_SAMPLES] = unilabos_samples
return_info["samples"] = unilabos_samples
suc = return_info.get("suc", False)
if not suc:
status = "failed"
@@ -1133,7 +1195,7 @@ class HostNode(BaseROS2DeviceNode):
self.lab_logger().info(f"[Host Node-Resource] UUID映射: {len(uuid_mapping)} 个节点")
# 还需要加入到资源图中,暂不实现,考虑资源图新的获取方式
response.response = json.dumps(uuid_mapping)
self.lab_logger().info(f"[Host Node-Resource] Resource tree add completed, success: {success}")
self.lab_logger().info(f"[Host Node-Resource] Resource tree update completed, success: {success}")
async def _resource_tree_update_callback(self, request: SerialCommand_Request, response: SerialCommand_Response):
"""
@@ -1168,6 +1230,10 @@ class HostNode(BaseROS2DeviceNode):
def _node_info_update_callback(self, request, response):
"""
更新节点信息回调
处理两种消息:
1. 首次上报(main_slave_run): 带 devices_config + registry_config,存储 action_value_mappings
2. 设备重注册(SYNC_SLAVE_NODE_INFO): 带 edge_device_id + registry_name,用 registry_name 索引已存储的 mappings
"""
self.lab_logger().trace(f"[Host Node] Node info update request received: {request}")
try:
@@ -1179,12 +1245,48 @@ class HostNode(BaseROS2DeviceNode):
info = info["SYNC_SLAVE_NODE_INFO"]
machine_name = info["machine_name"]
edge_device_id = info["edge_device_id"]
registry_name = info.get("registry_name", "")
self.device_machine_names[edge_device_id] = machine_name
# 用 registry_name 索引已存储的 registry_config,获取 action_value_mappings
if registry_name and registry_name in self._slave_registry_configs:
action_mappings = self._slave_registry_configs[registry_name].get(
"class", {}
).get("action_value_mappings", {})
if action_mappings:
self._action_value_mappings[edge_device_id] = action_mappings
self.lab_logger().info(
f"[Host Node] Loaded {len(action_mappings)} action mappings "
f"for remote device {edge_device_id} (registry: {registry_name})"
)
else:
devices_config = info.pop("devices_config")
registry_config = info.pop("registry_config")
if registry_config:
http_client.resource_registry({"resources": registry_config})
# 存储 slave 的 registry_config,用于后续 SYNC_SLAVE_NODE_INFO 索引
for reg_name, reg_data in registry_config.items():
if isinstance(reg_data, dict) and "class" in reg_data:
self._slave_registry_configs[reg_name] = reg_data
# 解析 devices_config,建立 device_id -> action_value_mappings 映射
if devices_config:
for device_tree in devices_config:
for device_dict in device_tree:
device_id = device_dict.get("id", "")
class_name = device_dict.get("class", "")
if device_id and class_name and class_name in self._slave_registry_configs:
action_mappings = self._slave_registry_configs[class_name].get(
"class", {}
).get("action_value_mappings", {})
if action_mappings:
self._action_value_mappings[device_id] = action_mappings
self.lab_logger().info(
f"[Host Node] Stored {len(action_mappings)} action mappings "
f"for remote device {device_id} (class: {class_name})"
)
self.lab_logger().debug(f"[Host Node] Node info update: {info}")
response.response = "OK"
except Exception as e:
@@ -1481,6 +1583,7 @@ class HostNode(BaseROS2DeviceNode):
def test_resource(
self,
sample_uuids: SampleUUIDsType,
resource: ResourceSlot = None,
resources: List[ResourceSlot] = None,
device: DeviceSlot = None,
@@ -1495,6 +1598,7 @@ class HostNode(BaseROS2DeviceNode):
return {
"resources": ResourceTreeSet.from_plr_resources([resource, *resources], known_newly_created=True).dump(),
"devices": [device, *devices],
"unilabos_samples": [LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for sample_uuid, content in sample_uuids.items()]
}
def handle_pong_response(self, pong_data: dict):

View File

@@ -7,10 +7,11 @@ from rclpy.callback_groups import ReentrantCallbackGroup
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class JointRepublisher(BaseROS2DeviceNode):
def __init__(self,device_id,resource_tracker, **kwargs):
def __init__(self,device_id, registry_name, resource_tracker, **kwargs):
super().__init__(
driver_instance=self,
device_id=device_id,
registry_name=registry_name,
status_types={},
action_value_mappings={},
hardware_interface={},

View File

@@ -26,7 +26,7 @@ from unilabos.resources.graphio import initialize_resources
from unilabos.registry.registry import lab_registry
class ResourceMeshManager(BaseROS2DeviceNode):
def __init__(self, resource_model: dict, resource_config: list,resource_tracker, device_id: str = "resource_mesh_manager", rate=50, **kwargs):
def __init__(self, resource_model: dict, resource_config: list,resource_tracker, device_id: str = "resource_mesh_manager", registry_name: str = "", rate=50, **kwargs):
"""初始化资源网格管理器节点
Args:
@@ -37,6 +37,7 @@ class ResourceMeshManager(BaseROS2DeviceNode):
super().__init__(
driver_instance=self,
device_id=device_id,
registry_name=registry_name,
status_types={},
action_value_mappings={},
hardware_interface={},

View File

@@ -7,7 +7,7 @@ from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, DeviceNodeRe
class ROS2SerialNode(BaseROS2DeviceNode):
def __init__(self, device_id, port: str, baudrate: int = 9600, resource_tracker: DeviceNodeResourceTracker=None):
def __init__(self, device_id, registry_name, port: str, baudrate: int = 9600, resource_tracker: DeviceNodeResourceTracker=None):
# 保存属性,以便在调用父类初始化前使用
self.port = port
self.baudrate = baudrate
@@ -28,6 +28,7 @@ class ROS2SerialNode(BaseROS2DeviceNode):
BaseROS2DeviceNode.__init__(
self,
driver_instance=self,
registry_name=registry_name,
device_id=device_id,
status_types={},
action_value_mappings={},

View File

@@ -47,6 +47,7 @@ class ROS2WorkstationNode(BaseROS2DeviceNode):
*,
driver_instance: "WorkstationBase",
device_id: str,
registry_name: str,
device_uuid: str,
status_types: Dict[str, Any],
action_value_mappings: Dict[str, Any],
@@ -62,6 +63,7 @@ class ROS2WorkstationNode(BaseROS2DeviceNode):
super().__init__(
driver_instance=driver_instance,
device_id=device_id,
registry_name=registry_name,
device_uuid=device_uuid,
status_types=status_types,
action_value_mappings={**action_value_mappings, **self.protocol_action_mappings},
@@ -340,6 +342,8 @@ class ROS2WorkstationNode(BaseROS2DeviceNode):
plr = self.resource_tracker.figure_resource({"name": res_name}, try_mode=False)
# 获取父资源
res = self.resource_tracker.parent_resource(plr)
if res is None:
res = plr
if id(res) not in seen:
seen.add(id(res))
unique_resources.append(res)

View File

@@ -52,7 +52,8 @@ class DeviceClassCreator(Generic[T]):
if self.device_instance is not None:
for c in self.children:
if c.res_content.type != "device":
self.resource_tracker.add_resource(c.get_plr_nested_dict())
res = ResourceTreeSet([ResourceTreeInstance(c)]).to_plr_resources()[0]
self.resource_tracker.add_resource(res)
def create_instance(self, data: Dict[str, Any]) -> T:
"""
@@ -119,7 +120,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
# return resource, source_type
def _process_resource_references(
self, data: Any, to_dict=False, states=None, prefix_path="", name_to_uuid=None
self, data: Any, processed_child_names: Optional[Dict[str, Any]], to_dict=False, states=None, prefix_path="", name_to_uuid=None
) -> Any:
"""
递归处理资源引用替换_resource_child_name对应的资源
@@ -164,6 +165,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
states[prefix_path] = resource_instance.serialize_all_state()
return serialized
else:
processed_child_names[child_name] = resource_instance
self.resource_tracker.add_resource(resource_instance)
# 立即设置UUIDstate已经在resource_ulab_to_plr中处理过了
if name_to_uuid:
@@ -182,12 +184,12 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
result = {}
for key, value in data.items():
new_prefix = f"{prefix_path}.{key}" if prefix_path else key
result[key] = self._process_resource_references(value, to_dict, states, new_prefix, name_to_uuid)
result[key] = self._process_resource_references(value, processed_child_names, to_dict, states, new_prefix, name_to_uuid)
return result
elif isinstance(data, list):
return [
self._process_resource_references(item, to_dict, states, f"{prefix_path}[{i}]", name_to_uuid)
self._process_resource_references(item, processed_child_names, to_dict, states, f"{prefix_path}[{i}]", name_to_uuid)
for i, item in enumerate(data)
]
@@ -234,7 +236,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
# 首先处理资源引用
states = {}
processed_data = self._process_resource_references(
data, to_dict=True, states=states, name_to_uuid=name_to_uuid
data, {}, to_dict=True, states=states, name_to_uuid=name_to_uuid
)
try:
@@ -270,7 +272,12 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
arg_value = spec_args[param_name].annotation
data[param_name]["_resource_type"] = self.device_cls.__module__ + ":" + arg_value
logger.debug(f"自动补充 _resource_type: {data[param_name]['_resource_type']}")
processed_data = self._process_resource_references(data, to_dict=False, name_to_uuid=name_to_uuid)
processed_child_names = {}
processed_data = self._process_resource_references(data, processed_child_names, to_dict=False, name_to_uuid=name_to_uuid)
for child_name, resource_instance in processed_data.items():
for ind, name in enumerate([child.res_content.name for child in self.children]):
if name == child_name:
self.children.pop(ind)
self.device_instance = super(PyLabRobotCreator, self).create_instance(processed_data) # 补全变量后直接调用调用的自身的attach_resource
except Exception as e:
logger.error(f"PyLabRobot创建实例失败: {e}")
@@ -342,9 +349,10 @@ class WorkstationNodeCreator(DeviceClassCreator[T]):
try:
# 创建实例额外补充一个给protocol node的字段后面考虑取消
data["children"] = self.children
for child in self.children:
if child.res_content.type != "device":
self.resource_tracker.add_resource(child.get_plr_nested_dict())
# super(WorkstationNodeCreator, self).create_instance(data)的时候会attach
# for child in self.children:
# if child.res_content.type != "device":
# self.resource_tracker.add_resource(child.get_plr_nested_dict())
deck_dict = data.get("deck")
if deck_dict:
from pylabrobot.resources import Deck, Resource

View File

@@ -339,13 +339,8 @@
"z": 0
},
"config": {
"max_volume": 500.0,
"type": "RegularContainer",
"category": "container",
"max_temp": 200.0,
"min_temp": -20.0,
"has_stirrer": true,
"has_heater": true
"category": "container"
},
"data": {
"liquids": [],
@@ -769,9 +764,7 @@
"size_y": 250,
"size_z": 0,
"type": "RegularContainer",
"category": "container",
"reagent": "sodium_chloride",
"physical_state": "solid"
"category": "container"
},
"data": {
"current_mass": 500.0,
@@ -792,14 +785,11 @@
"z": 0
},
"config": {
"volume": 500.0,
"size_x": 600,
"size_y": 250,
"size_z": 0,
"type": "RegularContainer",
"category": "container",
"reagent": "sodium_carbonate",
"physical_state": "solid"
"category": "container"
},
"data": {
"current_mass": 500.0,
@@ -820,14 +810,11 @@
"z": 0
},
"config": {
"volume": 500.0,
"size_x": 650,
"size_y": 250,
"size_z": 0,
"type": "RegularContainer",
"category": "container",
"reagent": "magnesium_chloride",
"physical_state": "solid"
"category": "container"
},
"data": {
"current_mass": 500.0,

File diff suppressed because it is too large Load Diff

View File

@@ -184,6 +184,51 @@ def get_all_subscriptions(instance) -> list:
return subscriptions
def always_free(func: F) -> F:
"""
标记动作为永久闲置(不受busy队列限制)的装饰器
被此装饰器标记的 action 方法,在执行时不会受到设备级别的排队限制,
任何时候请求都可以立即执行。适用于查询类、状态读取类等轻量级操作。
Example:
class MyDriver:
@always_free
def query_status(self, param: str):
# 这个动作可以随时执行,不需要排队
return self._status
def transfer(self, volume: float):
# 这个动作会按正常排队逻辑执行
pass
Note:
- 可以与其他装饰器组合使用,@always_free 应放在最外层
- 仅影响 WebSocket 调度层的 busy/free 判断,不影响 ROS2 层
"""
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
wrapper._is_always_free = True # type: ignore[attr-defined]
return wrapper # type: ignore[return-value]
def is_always_free(func) -> bool:
"""
检查函数是否被标记为永久闲置
Args:
func: 被检查的函数
Returns:
如果函数被 @always_free 装饰则返回 True否则返回 False
"""
return getattr(func, "_is_always_free", False)
def not_action(func: F) -> F:
"""
标记方法为非动作的装饰器

View File

@@ -29,7 +29,7 @@ from ast import Constant
from unilabos.resources.resource_tracker import PARAM_SAMPLE_UUIDS
from unilabos.utils import logger
from unilabos.utils.decorator import is_not_action
from unilabos.utils.decorator import is_not_action, is_always_free
class ImportManager:
@@ -282,6 +282,9 @@ class ImportManager:
continue
# 其他非_开头的方法归类为action
method_info = self._analyze_method_signature(method)
# 检查是否被 @always_free 装饰器标记
if is_always_free(method):
method_info["always_free"] = True
result["action_methods"][name] = method_info
return result
@@ -339,6 +342,9 @@ class ImportManager:
if self._is_not_action_method(node):
continue
# 其他非_开头的方法归类为action
# 检查是否被 @always_free 装饰器标记
if self._is_always_free_method(node):
method_info["always_free"] = True
result["action_methods"][method_name] = method_info
return result
@@ -474,6 +480,13 @@ class ImportManager:
return True
return False
def _is_always_free_method(self, node: ast.FunctionDef) -> bool:
"""检查是否是@always_free装饰的方法"""
for decorator in node.decorator_list:
if isinstance(decorator, ast.Name) and decorator.id == "always_free":
return True
return False
def _get_property_name_from_setter(self, node: ast.FunctionDef) -> str:
"""从setter装饰器中获取属性名"""
for decorator in node.decorator_list:

View File

@@ -193,6 +193,7 @@ def configure_logger(loglevel=None, working_dir=None):
root_logger.addHandler(console_handler)
# 如果指定了工作目录,添加文件处理器
log_filepath = None
if working_dir is not None:
logs_dir = os.path.join(working_dir, "logs")
os.makedirs(logs_dir, exist_ok=True)
@@ -213,6 +214,7 @@ def configure_logger(loglevel=None, working_dir=None):
logging.getLogger("asyncio").setLevel(logging.INFO)
logging.getLogger("urllib3").setLevel(logging.INFO)
return log_filepath

View File

@@ -26,7 +26,7 @@
res_id: plate_slot_{slot}
device_id: /PRCXI
class_name: PRCXI_BioER_96_wellplate
parent: /PRCXI/PRCXI_Deck/T{slot}
parent: /PRCXI/PRCXI_Deck
slot_on_deck: "{slot}"
- 输出端口: labware用于连接 set_liquid_from_plate
- 控制流: create_resource 之间通过 ready 端口串联
@@ -122,7 +122,7 @@ NODE_TYPE_DEFAULT = "ILab" # 所有节点的默认类型
# create_resource 节点默认参数
CREATE_RESOURCE_DEFAULTS = {
"device_id": "/PRCXI",
"parent_template": "/PRCXI/PRCXI_Deck/T{slot}", # {slot} 会被替换为实际的 slot 值
"parent_template": "/PRCXI/PRCXI_Deck",
"class_name": "PRCXI_BioER_96_wellplate",
}
@@ -362,14 +362,16 @@ def build_protocol_graph(
protocol_steps: List[Dict[str, Any]],
workstation_name: str,
action_resource_mapping: Optional[Dict[str, str]] = None,
labware_defs: Optional[List[Dict[str, Any]]] = None,
) -> WorkflowGraph:
"""统一的协议图构建函数,根据设备类型自动选择构建逻辑
Args:
labware_info: labware 信息字典,格式为 {name: {slot, well, labware, ...}, ...}
labware_info: reagent 信息字典,格式为 {name: {slot, well}, ...},用于 set_liquid 和 well 查找
protocol_steps: 协议步骤列表
workstation_name: 工作站名称
action_resource_mapping: action 到 resource_name 的映射字典,可选
labware_defs: labware 定义列表,格式为 [{"name": "...", "slot": "1", "type": "lab_xxx"}, ...]
"""
G = WorkflowGraph()
resource_last_writer = {} # reagent_name -> "node_id:port"
@@ -377,18 +379,7 @@ def build_protocol_graph(
protocol_steps = refactor_data(protocol_steps, action_resource_mapping)
# ==================== 第一步:按 slot 去重创建 create_resource 节点 ====================
# 收集所有唯一的 slot
slots_info = {} # slot -> {labware, res_id}
for labware_id, item in labware_info.items():
slot = str(item.get("slot", ""))
if slot and slot not in slots_info:
res_id = f"plate_slot_{slot}"
slots_info[slot] = {
"labware": item.get("labware", ""),
"res_id": res_id,
}
# ==================== 第一步:按 slot 创建 create_resource 节点 ====================
# 创建 Group 节点,包含所有 create_resource 节点
group_node_id = str(uuid.uuid4())
G.add_node(
@@ -404,38 +395,42 @@ def build_protocol_graph(
param=None,
)
# 为每个唯一的 slot 创建 create_resource 节点
# 直接使用 JSON 中的 labware 定义,每个 slot 一条记录type 即 class_name
res_index = 0
for slot, info in slots_info.items():
node_id = str(uuid.uuid4())
res_id = info["res_id"]
for lw in (labware_defs or []):
slot = str(lw.get("slot", ""))
if not slot or slot in slot_to_create_resource:
continue # 跳过空 slot 或已处理的 slot
lw_name = lw.get("name", f"slot {slot}")
lw_type = lw.get("type", CREATE_RESOURCE_DEFAULTS["class_name"])
res_id = f"plate_slot_{slot}"
res_index += 1
node_id = str(uuid.uuid4())
G.add_node(
node_id,
template_name="create_resource",
resource_name="host_node",
name=f"Plate {res_index}",
description=f"Create plate on slot {slot}",
name=lw_name,
description=f"Create {lw_name}",
lab_node_type="Labware",
footer="create_resource-host_node",
device_name=DEVICE_NAME_HOST,
type=NODE_TYPE_DEFAULT,
parent_uuid=group_node_id, # 指向 Group 节点
minimized=True, # 折叠显示
parent_uuid=group_node_id,
minimized=True,
param={
"res_id": res_id,
"device_id": CREATE_RESOURCE_DEFAULTS["device_id"],
"class_name": CREATE_RESOURCE_DEFAULTS["class_name"],
"parent": CREATE_RESOURCE_DEFAULTS["parent_template"].format(slot=slot),
"class_name": lw_type,
"parent": CREATE_RESOURCE_DEFAULTS["parent_template"],
"bind_locations": {"x": 0.0, "y": 0.0, "z": 0.0},
"slot_on_deck": slot,
},
)
slot_to_create_resource[slot] = node_id
# create_resource 之间不需要 ready 连接
# ==================== 第二步:为每个 reagent 创建 set_liquid_from_plate 节点 ====================
# 创建 Group 节点,包含所有 set_liquid_from_plate 节点
set_liquid_group_id = str(uuid.uuid4())

View File

@@ -1,16 +1,20 @@
"""
JSON 工作流转换模块
将 workflow/reagent 格式的 JSON 转换为统一工作流格式。
将 workflow/reagent/labware 格式的 JSON 转换为统一工作流格式。
输入格式:
{
"labware": [
{"name": "...", "slot": "1", "type": "lab_xxx"},
...
],
"workflow": [
{"action": "...", "action_args": {...}},
...
],
"reagent": {
"reagent_name": {"slot": int, "well": [...], "labware": "..."},
"reagent_name": {"slot": int, "well": [...]},
...
}
}
@@ -245,18 +249,18 @@ def convert_from_json(
if "workflow" not in json_data or "reagent" not in json_data:
raise ValueError(
"不支持的 JSON 格式。请使用标准格式:\n"
'{"workflow": [{"action": "...", "action_args": {...}}, ...], '
'"reagent": {"name": {"slot": int, "well": [...], "labware": "..."}, ...}}'
'{"labware": [...], "workflow": [...], "reagent": {...}}'
)
# 提取数据
workflow = json_data["workflow"]
reagent = json_data["reagent"]
labware_defs = json_data.get("labware", []) # 新的 labware 定义列表
# 规范化步骤数据
protocol_steps = normalize_workflow_steps(workflow)
# reagent 已经是字典格式,直接使
# reagent 已经是字典格式,用于 set_liquid 和 well 数量查找
labware_info = reagent
# 构建工作流图
@@ -265,6 +269,7 @@ def convert_from_json(
protocol_steps=protocol_steps,
workstation_name=workstation_name,
action_resource_mapping=ACTION_RESOURCE_MAPPING,
labware_defs=labware_defs,
)
# 校验句柄配置

View File

@@ -41,6 +41,7 @@ def upload_workflow(
workflow_name: Optional[str] = None,
tags: Optional[List[str]] = None,
published: bool = False,
description: str = "",
) -> Dict[str, Any]:
"""
上传工作流到服务器
@@ -56,6 +57,7 @@ def upload_workflow(
workflow_name: 工作流名称,如果不提供则从文件中读取或使用文件名
tags: 工作流标签列表,默认为空列表
published: 是否发布工作流默认为False
description: 工作流描述,发布时使用
Returns:
Dict: API响应数据
@@ -75,6 +77,14 @@ def upload_workflow(
print_status(f"工作流文件JSON解析失败: {e}", "error")
return {"code": -1, "message": f"JSON解析失败: {e}"}
# 从 JSON 文件中提取 description 和 tags作为 fallback
if not description and "description" in workflow_data:
description = workflow_data["description"]
print_status(f"从文件中读取 description", "info")
if not tags and "tags" in workflow_data:
tags = workflow_data["tags"]
print_status(f"从文件中读取 tags: {tags}", "info")
# 自动检测并转换格式
if not _is_node_link_format(workflow_data):
try:
@@ -96,6 +106,7 @@ def upload_workflow(
print_status(f" - 节点数量: {len(nodes)}", "info")
print_status(f" - 边数量: {len(edges)}", "info")
print_status(f" - 标签: {tags or []}", "info")
print_status(f" - 描述: {description[:50]}{'...' if len(description) > 50 else ''}", "info")
print_status(f" - 发布状态: {published}", "info")
# 调用 http_client 上传
@@ -107,6 +118,7 @@ def upload_workflow(
edges=edges,
tags=tags,
published=published,
description=description,
)
if result.get("code") == 0:
@@ -131,8 +143,9 @@ def handle_workflow_upload_command(args_dict: Dict[str, Any]) -> None:
workflow_name = args_dict.get("workflow_name")
tags = args_dict.get("tags", [])
published = args_dict.get("published", False)
description = args_dict.get("description", "")
if workflow_file:
upload_workflow(workflow_file, workflow_name, tags, published)
upload_workflow(workflow_file, workflow_name, tags, published, description)
else:
print_status("未指定工作流文件路径,请使用 -f/--workflow_file 参数", "error")

View File

@@ -2,7 +2,7 @@
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>unilabos_msgs</name>
<version>0.10.17</version>
<version>0.10.18</version>
<description>ROS2 Messages package for unilabos devices</description>
<maintainer email="changjh@pku.edu.cn">Junhan Chang</maintainer>
<maintainer email="18435084+Xuwznln@users.noreply.github.com">Xuwznln</maintainer>