diff --git a/.conda/base/recipe.yaml b/.conda/base/recipe.yaml index fd20564b..cf908c9d 100644 --- a/.conda/base/recipe.yaml +++ b/.conda/base/recipe.yaml @@ -3,7 +3,7 @@ package: name: unilabos - version: 0.10.17 + version: 0.10.18 source: path: ../../unilabos @@ -46,13 +46,15 @@ requirements: - jinja2 - requests - uvicorn - - opcua # [not osx] + - if: not osx + then: + - opcua - pyserial - pandas - pymodbus - matplotlib - pylibftdi - - uni-lab::unilabos-env ==0.10.17 + - uni-lab::unilabos-env ==0.10.18 about: repository: https://github.com/deepmodeling/Uni-Lab-OS diff --git a/.conda/environment/recipe.yaml b/.conda/environment/recipe.yaml index 3f8df0f6..56ff44d4 100644 --- a/.conda/environment/recipe.yaml +++ b/.conda/environment/recipe.yaml @@ -2,7 +2,7 @@ package: name: unilabos-env - version: 0.10.17 + version: 0.10.18 build: noarch: generic diff --git a/.conda/full/recipe.yaml b/.conda/full/recipe.yaml index 037f5b4f..ff8b4824 100644 --- a/.conda/full/recipe.yaml +++ b/.conda/full/recipe.yaml @@ -3,7 +3,7 @@ package: name: unilabos-full - version: 0.10.17 + version: 0.10.18 build: noarch: generic @@ -11,7 +11,7 @@ build: requirements: run: # Base unilabos package (includes unilabos-env) - - uni-lab::unilabos ==0.10.17 + - uni-lab::unilabos ==0.10.18 # Documentation tools - sphinx - sphinx_rtd_theme diff --git a/docs/user_guide/best_practice.md b/docs/user_guide/best_practice.md index 767dc4d8..499ee9ee 100644 --- a/docs/user_guide/best_practice.md +++ b/docs/user_guide/best_practice.md @@ -452,8 +452,9 @@ unilab --ak your_ak --sk your_sk -g test/experiments/mock_devices/mock_all.json **操作步骤:** 1. 将两个 `container` 拖拽到 `workstation` 中 -2. 将 `virtual_transfer_pump` 拖拽到 `workstation` 中 -3. 在画布上连接它们(建立父子关系) +2. 将 `virtual_multiway_valve` 拖拽到 `workstation` 中 +3. 将 `virtual_transfer_pump` 拖拽到 `workstation` 中 +4. 在画布上连接它们(建立父子关系) ![设备连接](image/links.png) diff --git a/docs/user_guide/image/links.png b/docs/user_guide/image/links.png index 7e5e2bbe..c5fc2452 100644 Binary files a/docs/user_guide/image/links.png and b/docs/user_guide/image/links.png differ diff --git a/recipes/msgs/recipe.yaml b/recipes/msgs/recipe.yaml index f78df2e9..a3c2d2bd 100644 --- a/recipes/msgs/recipe.yaml +++ b/recipes/msgs/recipe.yaml @@ -1,6 +1,6 @@ package: name: ros-humble-unilabos-msgs - version: 0.10.17 + version: 0.10.18 source: path: ../../unilabos_msgs target_directory: src diff --git a/recipes/unilabos/recipe.yaml b/recipes/unilabos/recipe.yaml index feca503a..aeb76a0c 100644 --- a/recipes/unilabos/recipe.yaml +++ b/recipes/unilabos/recipe.yaml @@ -1,6 +1,6 @@ package: name: unilabos - version: "0.10.17" + version: "0.10.18" source: path: ../.. diff --git a/setup.py b/setup.py index b3a00f1d..dc7bbc73 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ package_name = 'unilabos' setup( name=package_name, - version='0.10.17', + version='0.10.18', packages=find_packages(), include_package_data=True, install_requires=['setuptools'], diff --git a/unilabos/__init__.py b/unilabos/__init__.py index 50ab2b04..63e3face 100644 --- a/unilabos/__init__.py +++ b/unilabos/__init__.py @@ -1 +1 @@ -__version__ = "0.10.17" +__version__ = "0.10.18" diff --git a/unilabos/app/main.py b/unilabos/app/main.py index a6539c33..c652757c 100644 --- a/unilabos/app/main.py +++ b/unilabos/app/main.py @@ -171,6 +171,12 @@ def parse_args(): action="store_true", help="Disable sending update feedback to server", ) + parser.add_argument( + "--test_mode", + action="store_true", + default=False, + help="Test mode: all actions simulate execution and return mock results without running real hardware", + ) # workflow upload subcommand workflow_parser = subparsers.add_parser( "workflow_upload", @@ -204,6 +210,12 @@ def parse_args(): default=False, help="Whether to publish the workflow (default: False)", ) + workflow_parser.add_argument( + "--description", + type=str, + default="", + help="Workflow description, used when publishing the workflow", + ) return parser @@ -231,52 +243,60 @@ def main(): # 加载配置文件,优先加载config,然后从env读取 config_path = args_dict.get("config") - if check_mode: - args_dict["working_dir"] = os.path.abspath(os.getcwd()) - # 当 skip_env_check 时,默认使用当前目录作为 working_dir - if skip_env_check and not args_dict.get("working_dir") and not config_path: + # === 解析 working_dir === + # 规则1: working_dir 传入 → 检测 unilabos_data 子目录,已是则不修改 + # 规则2: 仅 config_path 传入 → 用其父目录作为 working_dir + # 规则4: 两者都传入 → 各用各的,但 working_dir 仍做 unilabos_data 子目录检测 + raw_working_dir = args_dict.get("working_dir") + if raw_working_dir: + working_dir = os.path.abspath(raw_working_dir) + elif config_path and os.path.exists(config_path): + working_dir = os.path.dirname(os.path.abspath(config_path)) + else: working_dir = os.path.abspath(os.getcwd()) - print_status(f"跳过环境检查模式:使用当前目录作为工作目录 {working_dir}", "info") - # 检查当前目录是否有 local_config.py - local_config_in_cwd = os.path.join(working_dir, "local_config.py") - if os.path.exists(local_config_in_cwd): - config_path = local_config_in_cwd + + # unilabos_data 子目录自动检测 + if os.path.basename(working_dir) != "unilabos_data": + unilabos_data_sub = os.path.join(working_dir, "unilabos_data") + if os.path.isdir(unilabos_data_sub): + working_dir = unilabos_data_sub + elif not raw_working_dir and not (config_path and os.path.exists(config_path)): + # 未显式指定路径,默认使用 cwd/unilabos_data + working_dir = os.path.abspath(os.path.join(os.getcwd(), "unilabos_data")) + + # === 解析 config_path === + if config_path and not os.path.exists(config_path): + # config_path 传入但不存在,尝试在 working_dir 中查找 + candidate = os.path.join(working_dir, "local_config.py") + if os.path.exists(candidate): + config_path = candidate + print_status(f"在工作目录中发现配置文件: {config_path}", "info") + else: + print_status( + f"配置文件 {config_path} 不存在,工作目录 {working_dir} 中也未找到 local_config.py," + f"请通过 --config 传入 local_config.py 文件路径", + "error", + ) + os._exit(1) + elif not config_path: + # 规则3: 未传入 config_path,尝试 working_dir/local_config.py + candidate = os.path.join(working_dir, "local_config.py") + if os.path.exists(candidate): + config_path = candidate print_status(f"发现本地配置文件: {config_path}", "info") else: print_status(f"未指定config路径,可通过 --config 传入 local_config.py 文件路径", "info") - elif os.getcwd().endswith("unilabos_data"): - working_dir = os.path.abspath(os.getcwd()) - else: - working_dir = os.path.abspath(os.path.join(os.getcwd(), "unilabos_data")) - - if args_dict.get("working_dir"): - working_dir = args_dict.get("working_dir", "") - if config_path and not os.path.exists(config_path): - config_path = os.path.join(working_dir, "local_config.py") - if not os.path.exists(config_path): - print_status( - f"当前工作目录 {working_dir} 未找到local_config.py,请通过 --config 传入 local_config.py 文件路径", - "error", + print_status(f"您是否为第一次使用?并将当前路径 {working_dir} 作为工作目录? (Y/n)", "info") + if check_mode or input() != "n": + os.makedirs(working_dir, exist_ok=True) + config_path = os.path.join(working_dir, "local_config.py") + shutil.copy( + os.path.join(os.path.dirname(os.path.dirname(__file__)), "config", "example_config.py"), + config_path, ) + print_status(f"已创建 local_config.py 路径: {config_path}", "info") + else: os._exit(1) - elif config_path and os.path.exists(config_path): - working_dir = os.path.dirname(config_path) - elif os.path.exists(working_dir) and os.path.exists(os.path.join(working_dir, "local_config.py")): - config_path = os.path.join(working_dir, "local_config.py") - elif not skip_env_check and not config_path and ( - not os.path.exists(working_dir) or not os.path.exists(os.path.join(working_dir, "local_config.py")) - ): - print_status(f"未指定config路径,可通过 --config 传入 local_config.py 文件路径", "info") - print_status(f"您是否为第一次使用?并将当前路径 {working_dir} 作为工作目录? (Y/n)", "info") - if input() != "n": - os.makedirs(working_dir, exist_ok=True) - config_path = os.path.join(working_dir, "local_config.py") - shutil.copy( - os.path.join(os.path.dirname(os.path.dirname(__file__)), "config", "example_config.py"), config_path - ) - print_status(f"已创建 local_config.py 路径: {config_path}", "info") - else: - os._exit(1) # 加载配置文件 (check_mode 跳过) print_status(f"当前工作目录为 {working_dir}", "info") @@ -288,7 +308,9 @@ def main(): if hasattr(BasicConfig, "log_level"): logger.info(f"Log level set to '{BasicConfig.log_level}' from config file.") - configure_logger(loglevel=BasicConfig.log_level, working_dir=working_dir) + file_path = configure_logger(loglevel=BasicConfig.log_level, working_dir=working_dir) + if file_path is not None: + logger.info(f"[LOG_FILE] {file_path}") if args.addr != parser.get_default("addr"): if args.addr == "test": @@ -332,6 +354,9 @@ def main(): BasicConfig.slave_no_host = args_dict.get("slave_no_host", False) BasicConfig.upload_registry = args_dict.get("upload_registry", False) BasicConfig.no_update_feedback = args_dict.get("no_update_feedback", False) + BasicConfig.test_mode = args_dict.get("test_mode", False) + if BasicConfig.test_mode: + print_status("启用测试模式:所有动作将模拟执行,不调用真实硬件", "warning") BasicConfig.communication_protocol = "websocket" machine_name = os.popen("hostname").read().strip() machine_name = "".join([c if c.isalnum() or c == "_" else "_" for c in machine_name]) diff --git a/unilabos/app/model.py b/unilabos/app/model.py index 6f40e731..f80ce35a 100644 --- a/unilabos/app/model.py +++ b/unilabos/app/model.py @@ -54,6 +54,7 @@ class JobAddReq(BaseModel): action_type: str = Field( examples=["unilabos_msgs.action._str_single_input.StrSingleInput"], description="action type", default="" ) + sample_material: dict = Field(examples=[{"string": "string"}], description="sample uuid to material uuid") action_args: dict = Field(examples=[{"string": "string"}], description="action arguments", default_factory=dict) task_id: str = Field(examples=["task_id"], description="task uuid (auto-generated if empty)", default="") job_id: str = Field(examples=["job_id"], description="goal uuid (auto-generated if empty)", default="") diff --git a/unilabos/app/register.py b/unilabos/app/register.py index 633df98f..5918b43a 100644 --- a/unilabos/app/register.py +++ b/unilabos/app/register.py @@ -38,9 +38,9 @@ def register_devices_and_resources(lab_registry, gather_only=False) -> Optional[ response = http_client.resource_registry({"resources": list(devices_to_register.values())}) cost_time = time.time() - start_time if response.status_code in [200, 201]: - logger.info(f"[UniLab Register] 成功注册 {len(devices_to_register)} 个设备 {cost_time}ms") + logger.info(f"[UniLab Register] 成功注册 {len(devices_to_register)} 个设备 {cost_time}s") else: - logger.error(f"[UniLab Register] 设备注册失败: {response.status_code}, {response.text} {cost_time}ms") + logger.error(f"[UniLab Register] 设备注册失败: {response.status_code}, {response.text} {cost_time}s") except Exception as e: logger.error(f"[UniLab Register] 设备注册异常: {e}") @@ -51,9 +51,9 @@ def register_devices_and_resources(lab_registry, gather_only=False) -> Optional[ response = http_client.resource_registry({"resources": list(resources_to_register.values())}) cost_time = time.time() - start_time if response.status_code in [200, 201]: - logger.info(f"[UniLab Register] 成功注册 {len(resources_to_register)} 个资源 {cost_time}ms") + logger.info(f"[UniLab Register] 成功注册 {len(resources_to_register)} 个资源 {cost_time}s") else: - logger.error(f"[UniLab Register] 资源注册失败: {response.status_code}, {response.text} {cost_time}ms") + logger.error(f"[UniLab Register] 资源注册失败: {response.status_code}, {response.text} {cost_time}s") except Exception as e: logger.error(f"[UniLab Register] 资源注册异常: {e}") diff --git a/unilabos/app/web/client.py b/unilabos/app/web/client.py index 0ecf4608..b43b0f44 100644 --- a/unilabos/app/web/client.py +++ b/unilabos/app/web/client.py @@ -343,9 +343,10 @@ class HTTPClient: edges: List[Dict[str, Any]], tags: Optional[List[str]] = None, published: bool = False, + description: str = "", ) -> Dict[str, Any]: """ - 导入工作流到服务器 + 导入工作流到服务器,如果 published 为 True,则额外发起发布请求 Args: name: 工作流名称(顶层) @@ -355,6 +356,7 @@ class HTTPClient: edges: 工作流边列表 tags: 工作流标签列表,默认为空列表 published: 是否发布工作流,默认为False + description: 工作流描述,发布时使用 Returns: Dict: API响应数据,包含 code 和 data (uuid, name) @@ -367,7 +369,6 @@ class HTTPClient: "nodes": nodes, "edges": edges, "tags": tags if tags is not None else [], - "published": published, }, } # 保存请求到文件 @@ -388,11 +389,51 @@ class HTTPClient: res = response.json() if "code" in res and res["code"] != 0: logger.error(f"导入工作流失败: {response.text}") + return res + # 导入成功后,如果需要发布则额外发起发布请求 + if published: + imported_uuid = res.get("data", {}).get("uuid", workflow_uuid) + publish_res = self.workflow_publish(imported_uuid, description) + res["publish_result"] = publish_res return res else: logger.error(f"导入工作流失败: {response.status_code}, {response.text}") return {"code": response.status_code, "message": response.text} + def workflow_publish(self, workflow_uuid: str, description: str = "") -> Dict[str, Any]: + """ + 发布工作流 + + Args: + workflow_uuid: 工作流UUID + description: 工作流描述 + + Returns: + Dict: API响应数据 + """ + payload = { + "uuid": workflow_uuid, + "description": description, + "published": True, + } + logger.info(f"正在发布工作流: {workflow_uuid}") + response = requests.patch( + f"{self.remote_addr}/lab/workflow/owner", + json=payload, + headers={"Authorization": f"Lab {self.auth}"}, + timeout=60, + ) + if response.status_code == 200: + res = response.json() + if "code" in res and res["code"] != 0: + logger.error(f"发布工作流失败: {response.text}") + else: + logger.info(f"工作流发布成功: {workflow_uuid}") + return res + else: + logger.error(f"发布工作流失败: {response.status_code}, {response.text}") + return {"code": response.status_code, "message": response.text} + # 创建默认客户端实例 http_client = HTTPClient() diff --git a/unilabos/app/web/controller.py b/unilabos/app/web/controller.py index acd1f56b..6a01645c 100644 --- a/unilabos/app/web/controller.py +++ b/unilabos/app/web/controller.py @@ -327,6 +327,7 @@ def job_add(req: JobAddReq) -> JobData: queue_item, action_type=action_type, action_kwargs=action_args, + sample_material=req.sample_material, server_info=server_info, ) diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py index 8644353b..f2235e94 100644 --- a/unilabos/app/ws_client.py +++ b/unilabos/app/ws_client.py @@ -76,6 +76,7 @@ class JobInfo: start_time: float last_update_time: float = field(default_factory=time.time) ready_timeout: Optional[float] = None # READY状态的超时时间 + always_free: bool = False # 是否为永久闲置动作(不受排队限制) def update_timestamp(self): """更新最后更新时间""" @@ -127,6 +128,15 @@ class DeviceActionManager: # 总是将job添加到all_jobs中 self.all_jobs[job_info.job_id] = job_info + # always_free的动作不受排队限制,直接设为READY + if job_info.always_free: + job_info.status = JobStatus.READY + job_info.update_timestamp() + job_info.set_ready_timeout(10) + job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name) + logger.trace(f"[DeviceActionManager] Job {job_log} always_free, start immediately") + return True + # 检查是否有正在执行或准备执行的任务 if device_key in self.active_jobs: # 有正在执行或准备执行的任务,加入队列 @@ -176,11 +186,15 @@ class DeviceActionManager: logger.error(f"[DeviceActionManager] Job {job_log} is not in READY status, current: {job_info.status}") return False - # 检查设备上是否是这个job - if device_key not in self.active_jobs or self.active_jobs[device_key].job_id != job_id: - job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name) - logger.error(f"[DeviceActionManager] Job {job_log} is not the active job for {device_key}") - return False + # always_free的job不需要检查active_jobs + if not job_info.always_free: + # 检查设备上是否是这个job + if device_key not in self.active_jobs or self.active_jobs[device_key].job_id != job_id: + job_log = format_job_log( + job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name + ) + logger.error(f"[DeviceActionManager] Job {job_log} is not the active job for {device_key}") + return False # 开始执行任务,将状态从READY转换为STARTED job_info.status = JobStatus.STARTED @@ -203,6 +217,13 @@ class DeviceActionManager: job_info = self.all_jobs[job_id] device_key = job_info.device_action_key + # always_free的job直接清理,不影响队列 + if job_info.always_free: + job_info.status = JobStatus.ENDED + job_info.update_timestamp() + del self.all_jobs[job_id] + return None + # 移除活跃任务 if device_key in self.active_jobs and self.active_jobs[device_key].job_id == job_id: del self.active_jobs[device_key] @@ -234,9 +255,14 @@ class DeviceActionManager: return None def get_active_jobs(self) -> List[JobInfo]: - """获取所有正在执行的任务""" + """获取所有正在执行的任务(含active_jobs和always_free的STARTED job)""" with self.lock: - return list(self.active_jobs.values()) + jobs = list(self.active_jobs.values()) + # 补充 always_free 的 STARTED job(它们不在 active_jobs 中) + for job in self.all_jobs.values(): + if job.always_free and job.status == JobStatus.STARTED and job not in jobs: + jobs.append(job) + return jobs def get_queued_jobs(self) -> List[JobInfo]: """获取所有排队中的任务""" @@ -261,6 +287,14 @@ class DeviceActionManager: job_info = self.all_jobs[job_id] device_key = job_info.device_action_key + # always_free的job直接清理 + if job_info.always_free: + job_info.status = JobStatus.ENDED + del self.all_jobs[job_id] + job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name) + logger.trace(f"[DeviceActionManager] Always-free job {job_log} cancelled") + return True + # 如果是正在执行的任务 if device_key in self.active_jobs and self.active_jobs[device_key].job_id == job_id: # 清理active job状态 @@ -334,13 +368,18 @@ class DeviceActionManager: timeout_jobs = [] with self.lock: - # 统计READY状态的任务数量 - ready_jobs_count = sum(1 for job in self.active_jobs.values() if job.status == JobStatus.READY) + # 收集所有需要检查的 READY 任务(active_jobs + always_free READY jobs) + ready_candidates = list(self.active_jobs.values()) + for job in self.all_jobs.values(): + if job.always_free and job.status == JobStatus.READY and job not in ready_candidates: + ready_candidates.append(job) + + ready_jobs_count = sum(1 for job in ready_candidates if job.status == JobStatus.READY) if ready_jobs_count > 0: logger.trace(f"[DeviceActionManager] Checking {ready_jobs_count} READY jobs for timeout") # type: ignore # noqa: E501 # 找到所有超时的READY任务(只检测,不处理) - for job_info in self.active_jobs.values(): + for job_info in ready_candidates: if job_info.is_ready_timeout(): timeout_jobs.append(job_info) job_log = format_job_log( @@ -545,7 +584,7 @@ class MessageProcessor: try: message_str = json.dumps(msg, ensure_ascii=False) await self.websocket.send(message_str) - logger.trace(f"[MessageProcessor] Message sent: {msg.get('action', 'unknown')}") # type: ignore # noqa: E501 + # logger.trace(f"[MessageProcessor] Message sent: {msg.get('action', 'unknown')}") # type: ignore # noqa: E501 except Exception as e: logger.error(f"[MessageProcessor] Failed to send message: {str(e)}") logger.error(traceback.format_exc()) @@ -608,6 +647,24 @@ class MessageProcessor: if host_node: host_node.handle_pong_response(pong_data) + def _check_action_always_free(self, device_id: str, action_name: str) -> bool: + """检查该action是否标记为always_free,通过HostNode统一的_action_value_mappings查找""" + try: + host_node = HostNode.get_instance(0) + if not host_node: + return False + # noinspection PyProtectedMember + action_mappings = host_node._action_value_mappings.get(device_id) + if not action_mappings: + return False + # 尝试直接匹配或 auto- 前缀匹配 + for key in [action_name, f"auto-{action_name}"]: + if key in action_mappings: + return action_mappings[key].get("always_free", False) + return False + except Exception: + return False + async def _handle_query_action_state(self, data: Dict[str, Any]): """处理query_action_state消息""" device_id = data.get("device_id", "") @@ -622,6 +679,9 @@ class MessageProcessor: device_action_key = f"/devices/{device_id}/{action_name}" + # 检查action是否为always_free + action_always_free = self._check_action_always_free(device_id, action_name) + # 创建任务信息 job_info = JobInfo( job_id=job_id, @@ -631,6 +691,7 @@ class MessageProcessor: device_action_key=device_action_key, status=JobStatus.QUEUE, start_time=time.time(), + always_free=action_always_free, ) # 添加到设备管理器 @@ -657,6 +718,8 @@ class MessageProcessor: async def _handle_job_start(self, data: Dict[str, Any]): """处理job_start消息""" try: + if not data.get("sample_material"): + data["sample_material"] = {} req = JobAddReq(**data) job_log = format_job_log(req.job_id, req.task_id, req.device_id, req.action) @@ -688,6 +751,7 @@ class MessageProcessor: queue_item, action_type=req.action_type, action_kwargs=req.action_args, + sample_material=req.sample_material, server_info=req.server_info, ) @@ -1120,6 +1184,11 @@ class QueueProcessor: logger.debug(f"[QueueProcessor] Sending busy status for {len(queued_jobs)} queued jobs") for job_info in queued_jobs: + # 快照可能已过期:在遍历过程中 end_job() 可能已将此 job 移至 READY, + # 此时不应再发送 busy/need_more,否则会覆盖已发出的 free=True 通知 + if job_info.status != JobStatus.QUEUE: + continue + message = { "action": "report_action_state", "data": { @@ -1301,7 +1370,7 @@ class WebSocketClient(BaseCommunicationClient): }, } self.message_processor.send_message(message) - logger.trace(f"[WebSocketClient] Device status published: {device_id}.{property_name}") + # logger.trace(f"[WebSocketClient] Device status published: {device_id}.{property_name}") def publish_job_status( self, feedback_data: dict, item: QueueItem, status: str, return_info: Optional[dict] = None diff --git a/unilabos/compile/pump_protocol.py b/unilabos/compile/pump_protocol.py index 7215fc5b..d2dd4978 100644 --- a/unilabos/compile/pump_protocol.py +++ b/unilabos/compile/pump_protocol.py @@ -95,8 +95,29 @@ def get_vessel_liquid_volume(G: nx.DiGraph, vessel: str) -> float: return total_volume -def is_integrated_pump(node_name): - return "pump" in node_name and "valve" in node_name +def is_integrated_pump(node_class: str, node_name: str = "") -> bool: + """ + 判断是否为泵阀一体设备 + """ + class_lower = (node_class or "").lower() + name_lower = (node_name or "").lower() + + if "pump" not in class_lower and "pump" not in name_lower: + return False + + integrated_markers = [ + "valve", + "pump_valve", + "pumpvalve", + "integrated", + "transfer_pump", + ] + + for marker in integrated_markers: + if marker in class_lower or marker in name_lower: + return True + + return False def find_connected_pump(G, valve_node): @@ -186,7 +207,9 @@ def build_pump_valve_maps(G, pump_backbone): debug_print(f"🔧 过滤后的骨架: {filtered_backbone}") for node in filtered_backbone: - if is_integrated_pump(G.nodes[node]["class"]): + node_data = G.nodes.get(node, {}) + node_class = node_data.get("class", "") or "" + if is_integrated_pump(node_class, node): pumps_from_node[node] = node valve_from_node[node] = node debug_print(f" - 集成泵-阀: {node}") diff --git a/unilabos/config/config.py b/unilabos/config/config.py index c91a07d4..4b7d91a4 100644 --- a/unilabos/config/config.py +++ b/unilabos/config/config.py @@ -23,6 +23,7 @@ class BasicConfig: disable_browser = False # 禁止浏览器自动打开 port = 8002 # 本地HTTP服务 check_mode = False # CI 检查模式,用于验证 registry 导入和文件一致性 + test_mode = False # 测试模式,所有动作不实际执行,返回模拟结果 # 'TRACE', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL' log_level: Literal["TRACE", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "DEBUG" @@ -145,5 +146,5 @@ def load_config(config_path=None): traceback.print_exc() exit(1) else: - config_path = os.path.join(os.path.dirname(__file__), "local_config.py") + config_path = os.path.join(os.path.dirname(__file__), "example_config.py") load_config(config_path) diff --git a/unilabos/devices/liquid_handling/liquid_handler_abstract.py b/unilabos/devices/liquid_handling/liquid_handler_abstract.py index 24d85187..48ec082b 100644 --- a/unilabos/devices/liquid_handling/liquid_handler_abstract.py +++ b/unilabos/devices/liquid_handling/liquid_handler_abstract.py @@ -21,10 +21,21 @@ from pylabrobot.resources import ( ResourceHolder, Lid, Trash, - Tip, + Tip, TubeRack, ) +from typing_extensions import TypedDict + +from unilabos.devices.liquid_handling.rviz_backend import UniLiquidHandlerRvizBackend +from unilabos.registry.placeholder_type import ResourceSlot +from unilabos.resources.resource_tracker import ( + ResourceTreeSet, + ResourceDict, + EXTRA_SAMPLE_UUID, + EXTRA_UNILABOS_SAMPLE_UUID, +) +from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode + -from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode class SimpleReturn(TypedDict): samples: List[List[ResourceDict]] volumes: List[float] @@ -237,12 +248,11 @@ class LiquidHandlerMiddleware(LiquidHandler): res_samples = [] res_volumes = [] for resource, volume, channel in zip(resources, vols, use_channels): - res_samples.append( - {"name": resource.name, "sample_uuid": resource.unilabos_extra.get("sample_uuid", None)} - ) + sample_uuid_value = resource.unilabos_extra.get(EXTRA_SAMPLE_UUID, None) + res_samples.append({"name": resource.name, EXTRA_SAMPLE_UUID: sample_uuid_value}) res_volumes.append(volume) self.pending_liquids_dict[channel] = { - "sample_uuid": resource.unilabos_extra.get("sample_uuid", None), + EXTRA_SAMPLE_UUID: sample_uuid_value, "volume": volume, } return SimpleReturn(samples=res_samples, volumes=res_volumes) @@ -284,10 +294,10 @@ class LiquidHandlerMiddleware(LiquidHandler): res_samples = [] res_volumes = [] for resource, volume, channel in zip(resources, vols, use_channels): - res_uuid = self.pending_liquids_dict[channel]["sample_uuid"] + res_uuid = self.pending_liquids_dict[channel][EXTRA_SAMPLE_UUID] self.pending_liquids_dict[channel]["volume"] -= volume - resource.unilabos_extra["sample_uuid"] = res_uuid - res_samples.append({"name": resource.name, "sample_uuid": res_uuid}) + resource.unilabos_extra[EXTRA_SAMPLE_UUID] = res_uuid + res_samples.append({"name": resource.name, EXTRA_SAMPLE_UUID: res_uuid}) res_volumes.append(volume) return SimpleReturn(samples=res_samples, volumes=res_volumes) @@ -687,7 +697,52 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): well.set_liquids([(liquid_name, volume)]) # type: ignore res_volumes.append(volume) - return SimpleReturn(samples=res_samples, volumes=res_volumes) + return SetLiquidReturn( + wells=ResourceTreeSet.from_plr_resources(wells, known_newly_created=False).dump(), volumes=res_volumes # type: ignore + ) + + def set_liquid_from_plate( + self, plate: ResourceSlot, well_names: list[str], liquid_names: list[str], volumes: list[float] + ) -> SetLiquidFromPlateReturn: + """Set the liquid in wells of a plate by well names (e.g., A1, A2, B3). + + 如果 liquid_names 和 volumes 为空,但 plate 和 well_names 不为空,直接返回 plate 和 wells。 + """ + assert issubclass(plate.__class__, Plate) or issubclass(plate.__class__, TubeRack) , f"plate must be a Plate, now: {type(plate)}" + plate: Union[Plate, TubeRack] + # 根据 well_names 获取对应的 Well 对象 + if issubclass(plate.__class__, Plate): + wells = [plate.get_well(name) for name in well_names] + elif issubclass(plate.__class__, TubeRack): + wells = [plate.get_tube(name) for name in well_names] + res_volumes = [] + + # 如果 liquid_names 和 volumes 都为空,直接返回 + if not liquid_names and not volumes: + return SetLiquidFromPlateReturn( + plate=ResourceTreeSet.from_plr_resources([plate], known_newly_created=False).dump(), # type: ignore + wells=ResourceTreeSet.from_plr_resources(wells, known_newly_created=False).dump(), # type: ignore + volumes=res_volumes, + ) + + for well, liquid_name, volume in zip(wells, liquid_names, volumes): + well.set_liquids([(liquid_name, volume)]) # type: ignore + res_volumes.append(volume) + + task = ROS2DeviceNode.run_async_func(self._ros_node.update_resource, True, **{"resources": wells}) + submit_time = time.time() + while not task.done(): + if time.time() - submit_time > 10: + self._ros_node.lab_logger().info(f"set_liquid_from_plate {plate} 超时") + break + time.sleep(0.01) + + return SetLiquidFromPlateReturn( + plate=ResourceTreeSet.from_plr_resources([plate], known_newly_created=False).dump(), # type: ignore + wells=ResourceTreeSet.from_plr_resources(wells, known_newly_created=False).dump(), # type: ignore + volumes=res_volumes, + ) + # --------------------------------------------------------------- # REMOVE LIQUID -------------------------------------------------- # --------------------------------------------------------------- diff --git a/unilabos/devices/liquid_handling/prcxi/prcxi.py b/unilabos/devices/liquid_handling/prcxi/prcxi.py index f3375296..3f33c960 100644 --- a/unilabos/devices/liquid_handling/prcxi/prcxi.py +++ b/unilabos/devices/liquid_handling/prcxi/prcxi.py @@ -91,7 +91,7 @@ class PRCXI9300Deck(Deck): """ def __init__(self, name: str, size_x: float, size_y: float, size_z: float, **kwargs): - super().__init__(name, size_x, size_y, size_z) + super().__init__(size_x, size_y, size_z, name) self.slots = [None] * 16 # PRCXI 9300/9320 最大有 16 个槽位 self.slot_locations = [Coordinate(0, 0, 0)] * 16 @@ -248,14 +248,15 @@ class PRCXI9300TipRack(TipRack): if ordered_items is not None: items = ordered_items elif ordering is not None: - # 检查 ordering 中的值是否是字符串(从 JSON 反序列化时的情况) - # 如果是字符串,说明这是位置名称,需要让 TipRack 自己创建 Tip 对象 - # 我们只传递位置信息(键),不传递值,使用 ordering 参数 - if ordering and isinstance(next(iter(ordering.values()), None), str): - # ordering 的值是字符串,只使用键(位置信息)创建新的 OrderedDict + # 检查 ordering 中的值类型来决定如何处理: + # - 字符串值(从 JSON 反序列化): 只用键创建 ordering_param + # - None 值(从第二次往返序列化): 同样只用键创建 ordering_param + # - 对象值(已经是实际的 Resource 对象): 直接作为 ordered_items 使用 + first_val = next(iter(ordering.values()), None) if ordering else None + if not ordering or first_val is None or isinstance(first_val, str): + # ordering 的值是字符串或 None,只使用键(位置信息)创建新的 OrderedDict # 传递 ordering 参数而不是 ordered_items,让 TipRack 自己创建 Tip 对象 items = None - # 使用 ordering 参数,只包含位置信息(键) ordering_param = collections.OrderedDict((k, None) for k in ordering.keys()) else: # ordering 的值已经是对象,可以直接使用 @@ -397,14 +398,15 @@ class PRCXI9300TubeRack(TubeRack): items_to_pass = ordered_items ordering_param = None elif ordering is not None: - # 检查 ordering 中的值是否是字符串(从 JSON 反序列化时的情况) - # 如果是字符串,说明这是位置名称,需要让 TubeRack 自己创建 Tube 对象 - # 我们只传递位置信息(键),不传递值,使用 ordering 参数 - if ordering and isinstance(next(iter(ordering.values()), None), str): - # ordering 的值是字符串,只使用键(位置信息)创建新的 OrderedDict + # 检查 ordering 中的值类型来决定如何处理: + # - 字符串值(从 JSON 反序列化): 只用键创建 ordering_param + # - None 值(从第二次往返序列化): 同样只用键创建 ordering_param + # - 对象值(已经是实际的 Resource 对象): 直接作为 ordered_items 使用 + first_val = next(iter(ordering.values()), None) if ordering else None + if not ordering or first_val is None or isinstance(first_val, str): + # ordering 的值是字符串或 None,只使用键(位置信息)创建新的 OrderedDict # 传递 ordering 参数而不是 ordered_items,让 TubeRack 自己创建 Tube 对象 items_to_pass = None - # 使用 ordering 参数,只包含位置信息(键) ordering_param = collections.OrderedDict((k, None) for k in ordering.keys()) else: # ordering 的值已经是对象,可以直接使用 @@ -595,7 +597,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract): return super().set_liquid(wells, liquid_names, volumes) def set_liquid_from_plate( - self, plate: List[ResourceSlot], well_names: list[str], liquid_names: list[str], volumes: list[float] + self, plate: ResourceSlot, well_names: list[str], liquid_names: list[str], volumes: list[float] ) -> SetLiquidFromPlateReturn: return super().set_liquid_from_plate(plate, well_names, liquid_names, volumes) diff --git a/unilabos/devices/ros_dev/liquid_handler_joint_publisher.py b/unilabos/devices/ros_dev/liquid_handler_joint_publisher.py index 2ec7afe5..16ff5b6e 100644 --- a/unilabos/devices/ros_dev/liquid_handler_joint_publisher.py +++ b/unilabos/devices/ros_dev/liquid_handler_joint_publisher.py @@ -19,10 +19,11 @@ from rclpy.node import Node import re class LiquidHandlerJointPublisher(BaseROS2DeviceNode): - def __init__(self,resources_config:list, resource_tracker, rate=50, device_id:str = "lh_joint_publisher", **kwargs): + def __init__(self,resources_config:list, resource_tracker, rate=50, device_id:str = "lh_joint_publisher", registry_name: str = "lh_joint_publisher", **kwargs): super().__init__( driver_instance=self, device_id=device_id, + registry_name=registry_name, status_types={}, action_value_mappings={}, hardware_interface={}, diff --git a/unilabos/devices/virtual/virtual_transferpump.py b/unilabos/devices/virtual/virtual_transferpump.py index 7b8eea86..2d3c9d8b 100644 --- a/unilabos/devices/virtual/virtual_transferpump.py +++ b/unilabos/devices/virtual/virtual_transferpump.py @@ -15,35 +15,35 @@ class VirtualPumpMode(Enum): class VirtualTransferPump: """虚拟转移泵类 - 模拟泵的基本功能,无需实际硬件 🚰""" - + _ros_node: BaseROS2DeviceNode - + def __init__(self, device_id: str = None, config: dict = None, **kwargs): """ 初始化虚拟转移泵 - + Args: device_id: 设备ID config: 配置字典,包含max_volume, port等参数 **kwargs: 其他参数,确保兼容性 """ self.device_id = device_id or "virtual_transfer_pump" - + # 从config或kwargs中获取参数,确保类型正确 if config: - self.max_volume = float(config.get('max_volume', 25.0)) - self.port = config.get('port', 'VIRTUAL') + self.max_volume = float(config.get("max_volume", 25.0)) + self.port = config.get("port", "VIRTUAL") else: - self.max_volume = float(kwargs.get('max_volume', 25.0)) - self.port = kwargs.get('port', 'VIRTUAL') - - self._transfer_rate = float(kwargs.get('transfer_rate', 0)) - self.mode = kwargs.get('mode', VirtualPumpMode.Normal) - + self.max_volume = float(kwargs.get("max_volume", 25.0)) + self.port = kwargs.get("port", "VIRTUAL") + + self._transfer_rate = float(kwargs.get("transfer_rate", 0)) + self.mode = kwargs.get("mode", VirtualPumpMode.Normal) + # 状态变量 - 确保都是正确类型 self._status = "Idle" self._position = 0.0 # float - self._max_velocity = 5.0 # float + self._max_velocity = 5.0 # float self._current_volume = 0.0 # float # 🚀 新增:快速模式设置 - 大幅缩短执行时间 @@ -52,14 +52,16 @@ class VirtualTransferPump: self._fast_dispense_time = 1.0 # 快速喷射时间(秒) self.logger = logging.getLogger(f"VirtualTransferPump.{self.device_id}") - + print(f"🚰 === 虚拟转移泵 {self.device_id} 已创建 === ✨") - print(f"💨 快速模式: {'启用' if self._fast_mode else '禁用'} | 移动时间: {self._fast_move_time}s | 喷射时间: {self._fast_dispense_time}s") + print( + f"💨 快速模式: {'启用' if self._fast_mode else '禁用'} | 移动时间: {self._fast_move_time}s | 喷射时间: {self._fast_dispense_time}s" + ) print(f"📊 最大容量: {self.max_volume}mL | 端口: {self.port}") - + def post_init(self, ros_node: BaseROS2DeviceNode): self._ros_node = ros_node - + async def initialize(self) -> bool: """初始化虚拟泵 🚀""" self.logger.info(f"🔧 初始化虚拟转移泵 {self.device_id} ✨") @@ -68,33 +70,33 @@ class VirtualTransferPump: self._current_volume = 0.0 self.logger.info(f"✅ 转移泵 {self.device_id} 初始化完成 🚰") return True - + async def cleanup(self) -> bool: """清理虚拟泵 🧹""" self.logger.info(f"🧹 清理虚拟转移泵 {self.device_id} 🔚") self._status = "Idle" self.logger.info(f"✅ 转移泵 {self.device_id} 清理完成 💤") return True - + # 基本属性 @property def status(self) -> str: return self._status - + @property def position(self) -> float: """当前柱塞位置 (ml) 📍""" return self._position - + @property def current_volume(self) -> float: """当前注射器中的体积 (ml) 💧""" return self._current_volume - + @property def max_velocity(self) -> float: return self._max_velocity - + @property def transfer_rate(self) -> float: return self._transfer_rate @@ -103,17 +105,17 @@ class VirtualTransferPump: """设置最大速度 (ml/s) 🌊""" self._max_velocity = max(0.1, min(50.0, velocity)) # 限制在合理范围内 self.logger.info(f"🌊 设置最大速度为 {self._max_velocity} mL/s") - + def get_status(self) -> str: """获取泵状态 📋""" return self._status - + async def _simulate_operation(self, duration: float): """模拟操作延时 ⏱️""" self._status = "Busy" await self._ros_node.sleep(duration) self._status = "Idle" - + def _calculate_duration(self, volume: float, velocity: float = None) -> float: """ 计算操作持续时间 ⏰ @@ -121,10 +123,10 @@ class VirtualTransferPump: """ if velocity is None: velocity = self._max_velocity - + # 📊 计算理论时间(用于日志显示) theoretical_duration = abs(volume) / velocity - + # 🚀 如果启用快速模式,使用固定的快速时间 if self._fast_mode: # 根据操作类型选择快速时间 @@ -132,13 +134,13 @@ class VirtualTransferPump: actual_duration = self._fast_move_time else: # 很小的操作 actual_duration = 0.5 - + self.logger.debug(f"⚡ 快速模式: 理论时间 {theoretical_duration:.2f}s → 实际时间 {actual_duration:.2f}s") return actual_duration else: # 正常模式使用理论时间 return theoretical_duration - + def _calculate_display_duration(self, volume: float, velocity: float = None) -> float: """ 计算显示用的持续时间(用于日志) 📊 @@ -147,16 +149,16 @@ class VirtualTransferPump: if velocity is None: velocity = self._max_velocity return abs(volume) / velocity - + # 新的set_position方法 - 专门用于SetPumpPosition动作 async def set_position(self, position: float, max_velocity: float = None): """ 移动到绝对位置 - 专门用于SetPumpPosition动作 🎯 - + Args: position (float): 目标位置 (ml) max_velocity (float): 移动速度 (ml/s) - + Returns: dict: 符合SetPumpPosition.action定义的结果 """ @@ -164,19 +166,19 @@ class VirtualTransferPump: # 验证并转换参数 target_position = float(position) velocity = float(max_velocity) if max_velocity is not None else self._max_velocity - + # 限制位置在有效范围内 target_position = max(0.0, min(float(self.max_volume), target_position)) - + # 计算移动距离 volume_to_move = abs(target_position - self._position) - + # 📊 计算显示用的时间(用于日志) display_duration = self._calculate_display_duration(volume_to_move, velocity) - + # ⚡ 计算实际执行时间(快速模式) actual_duration = self._calculate_duration(volume_to_move, velocity) - + # 🎯 确定操作类型和emoji if target_position > self._position: operation_type = "吸液" @@ -187,28 +189,34 @@ class VirtualTransferPump: else: operation_type = "保持" operation_emoji = "📍" - + self.logger.info(f"🎯 SET_POSITION: {operation_type} {operation_emoji}") - self.logger.info(f" 📍 位置: {self._position:.2f}mL → {target_position:.2f}mL (移动 {volume_to_move:.2f}mL)") + self.logger.info( + f" 📍 位置: {self._position:.2f}mL → {target_position:.2f}mL (移动 {volume_to_move:.2f}mL)" + ) self.logger.info(f" 🌊 速度: {velocity:.2f} mL/s") self.logger.info(f" ⏰ 预计时间: {display_duration:.2f}s") - + if self._fast_mode: self.logger.info(f" ⚡ 快速模式: 实际用时 {actual_duration:.2f}s") - + # 🚀 模拟移动过程 if volume_to_move > 0.01: # 只有当移动距离足够大时才显示进度 start_position = self._position steps = 5 if actual_duration > 0.5 else 2 # 根据实际时间调整步数 step_duration = actual_duration / steps - + self.logger.info(f"🚀 开始{operation_type}... {operation_emoji}") - + for i in range(steps + 1): # 计算当前位置和进度 progress = (i / steps) * 100 if steps > 0 else 100 - current_pos = start_position + (target_position - start_position) * (i / steps) if steps > 0 else target_position - + current_pos = ( + start_position + (target_position - start_position) * (i / steps) + if steps > 0 + else target_position + ) + # 更新状态 if i < steps: self._status = f"{operation_type}中" @@ -216,10 +224,10 @@ class VirtualTransferPump: else: self._status = "Idle" status_emoji = "✅" - + self._position = current_pos self._current_volume = current_pos - + # 显示进度(每25%或最后一步) if i == 0: self.logger.debug(f" 🔄 {operation_type}开始: {progress:.0f}%") @@ -227,7 +235,7 @@ class VirtualTransferPump: self.logger.debug(f" 🔄 {operation_type}进度: {progress:.0f}%") elif i == steps: self.logger.info(f" ✅ {operation_type}完成: {progress:.0f}% | 当前位置: {current_pos:.2f}mL") - + # 等待一小步时间 if i < steps and step_duration > 0: await self._ros_node.sleep(step_duration) @@ -236,25 +244,27 @@ class VirtualTransferPump: self._position = target_position self._current_volume = target_position self.logger.info(f" 📍 微调完成: {target_position:.2f}mL") - + # 确保最终位置准确 self._position = target_position self._current_volume = target_position self._status = "Idle" - + # 📊 最终状态日志 if volume_to_move > 0.01: - self.logger.info(f"🎉 SET_POSITION 完成! 📍 最终位置: {self._position:.2f}mL | 💧 当前体积: {self._current_volume:.2f}mL") - + self.logger.info( + f"🎉 SET_POSITION 完成! 📍 最终位置: {self._position:.2f}mL | 💧 当前体积: {self._current_volume:.2f}mL" + ) + # 返回符合action定义的结果 return { "success": True, "message": f"✅ 成功移动到位置 {self._position:.2f}mL ({operation_type})", "final_position": self._position, "final_volume": self._current_volume, - "operation_type": operation_type + "operation_type": operation_type, } - + except Exception as e: error_msg = f"❌ 设置位置失败: {str(e)}" self.logger.error(error_msg) @@ -262,134 +272,136 @@ class VirtualTransferPump: "success": False, "message": error_msg, "final_position": self._position, - "final_volume": self._current_volume + "final_volume": self._current_volume, } - + # 其他泵操作方法 async def pull_plunger(self, volume: float, velocity: float = None): """ 拉取柱塞(吸液) 📥 - + Args: volume (float): 要拉取的体积 (ml) velocity (float): 拉取速度 (ml/s) """ new_position = min(self.max_volume, self._position + volume) actual_volume = new_position - self._position - + if actual_volume <= 0: self.logger.warning("⚠️ 无法吸液 - 已达到最大容量") return - + display_duration = self._calculate_display_duration(actual_volume, velocity) actual_duration = self._calculate_duration(actual_volume, velocity) - + self.logger.info(f"📥 开始吸液: {actual_volume:.2f}mL") self.logger.info(f" 📍 位置: {self._position:.2f}mL → {new_position:.2f}mL") self.logger.info(f" ⏰ 预计时间: {display_duration:.2f}s") - + if self._fast_mode: self.logger.info(f" ⚡ 快速模式: 实际用时 {actual_duration:.2f}s") - + await self._simulate_operation(actual_duration) - + self._position = new_position self._current_volume = new_position - + self.logger.info(f"✅ 吸液完成: {actual_volume:.2f}mL | 💧 当前体积: {self._current_volume:.2f}mL") async def push_plunger(self, volume: float, velocity: float = None): """ 推出柱塞(排液) 📤 - + Args: volume (float): 要推出的体积 (ml) velocity (float): 推出速度 (ml/s) """ new_position = max(0, self._position - volume) actual_volume = self._position - new_position - + if actual_volume <= 0: self.logger.warning("⚠️ 无法排液 - 已达到最小容量") return - + display_duration = self._calculate_display_duration(actual_volume, velocity) actual_duration = self._calculate_duration(actual_volume, velocity) - + self.logger.info(f"📤 开始排液: {actual_volume:.2f}mL") self.logger.info(f" 📍 位置: {self._position:.2f}mL → {new_position:.2f}mL") self.logger.info(f" ⏰ 预计时间: {display_duration:.2f}s") - + if self._fast_mode: self.logger.info(f" ⚡ 快速模式: 实际用时 {actual_duration:.2f}s") - + await self._simulate_operation(actual_duration) - + self._position = new_position self._current_volume = new_position - + self.logger.info(f"✅ 排液完成: {actual_volume:.2f}mL | 💧 当前体积: {self._current_volume:.2f}mL") # 便捷操作方法 async def aspirate(self, volume: float, velocity: float = None): """吸液操作 📥""" await self.pull_plunger(volume, velocity) - + async def dispense(self, volume: float, velocity: float = None): """排液操作 📤""" await self.push_plunger(volume, velocity) - + async def transfer(self, volume: float, aspirate_velocity: float = None, dispense_velocity: float = None): """转移操作(先吸后排) 🔄""" self.logger.info(f"🔄 开始转移操作: {volume:.2f}mL") - + # 吸液 await self.aspirate(volume, aspirate_velocity) - + # 短暂停顿 self.logger.debug("⏸️ 短暂停顿...") await self._ros_node.sleep(0.1) - + # 排液 await self.dispense(volume, dispense_velocity) - + async def empty_syringe(self, velocity: float = None): """清空注射器""" await self.set_position(0, velocity) - + async def fill_syringe(self, velocity: float = None): """充满注射器""" await self.set_position(self.max_volume, velocity) - + async def stop_operation(self): """停止当前操作""" self._status = "Idle" self.logger.info("Operation stopped") - + # 状态查询方法 def get_position(self) -> float: """获取当前位置""" return self._position - + def get_current_volume(self) -> float: """获取当前体积""" return self._current_volume - + def get_remaining_capacity(self) -> float: """获取剩余容量""" return self.max_volume - self._current_volume - + def is_empty(self) -> bool: """检查是否为空""" return self._current_volume <= 0.01 # 允许小量误差 - + def is_full(self) -> bool: """检查是否已满""" return self._current_volume >= (self.max_volume - 0.01) # 允许小量误差 - + def __str__(self): - return f"VirtualTransferPump({self.device_id}: {self._current_volume:.2f}/{self.max_volume} ml, {self._status})" - + return ( + f"VirtualTransferPump({self.device_id}: {self._current_volume:.2f}/{self.max_volume} ml, {self._status})" + ) + def __repr__(self): return self.__str__() @@ -398,20 +410,20 @@ class VirtualTransferPump: async def demo(): """虚拟泵使用示例""" pump = VirtualTransferPump("demo_pump", {"max_volume": 50.0}) - + await pump.initialize() - + print(f"Initial state: {pump}") - + # 测试set_position方法 result = await pump.set_position(10.0, max_velocity=2.0) print(f"Set position result: {result}") print(f"After setting position to 10ml: {pump}") - + # 吸液测试 await pump.aspirate(5.0, velocity=2.0) print(f"After aspirating 5ml: {pump}") - + # 清空测试 result = await pump.set_position(0.0) print(f"Empty result: {result}") diff --git a/unilabos/devices/virtual/workbench.py b/unilabos/devices/virtual/workbench.py index 7a8e1454..f5fae47e 100644 --- a/unilabos/devices/virtual/workbench.py +++ b/unilabos/devices/virtual/workbench.py @@ -11,9 +11,10 @@ Virtual Workbench Device - 模拟工作台设备 注意:调用来自线程池,使用 threading.Lock 进行同步 """ + import logging import time -from typing import Dict, Any, Optional +from typing import Dict, Any, Optional, List from dataclasses import dataclass from enum import Enum from threading import Lock, RLock @@ -21,38 +22,47 @@ from threading import Lock, RLock from typing_extensions import TypedDict from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode -from unilabos.utils.decorator import not_action +from unilabos.utils.decorator import not_action, always_free +from unilabos.resources.resource_tracker import SampleUUIDsType, LabSample, RETURN_UNILABOS_SAMPLES # ============ TypedDict 返回类型定义 ============ + class MoveToHeatingStationResult(TypedDict): """move_to_heating_station 返回类型""" + success: bool station_id: int material_id: str material_number: int message: str + unilabos_samples: List[LabSample] class StartHeatingResult(TypedDict): """start_heating 返回类型""" + success: bool station_id: int material_id: str material_number: int message: str + unilabos_samples: List[LabSample] class MoveToOutputResult(TypedDict): """move_to_output 返回类型""" + success: bool station_id: int material_id: str + unilabos_samples: List[LabSample] class PrepareMaterialsResult(TypedDict): """prepare_materials 返回类型 - 批量准备物料""" + success: bool count: int material_1: int # 物料编号1 @@ -61,12 +71,15 @@ class PrepareMaterialsResult(TypedDict): material_4: int # 物料编号4 material_5: int # 物料编号5 message: str + unilabos_samples: List[LabSample] # ============ 状态枚举 ============ + class HeatingStationState(Enum): """加热台状态枚举""" + IDLE = "idle" # 空闲 OCCUPIED = "occupied" # 已放置物料,等待加热 HEATING = "heating" # 加热中 @@ -75,6 +88,7 @@ class HeatingStationState(Enum): class ArmState(Enum): """机械臂状态枚举""" + IDLE = "idle" # 空闲 BUSY = "busy" # 工作中 @@ -82,6 +96,7 @@ class ArmState(Enum): @dataclass class HeatingStation: """加热台数据结构""" + station_id: int state: HeatingStationState = HeatingStationState.IDLE current_material: Optional[str] = None # 当前物料 (如 "A1", "A2") @@ -108,8 +123,8 @@ class VirtualWorkbench: _ros_node: BaseROS2DeviceNode # 配置常量 - ARM_OPERATION_TIME: float = 3.0 # 机械臂操作时间(秒) - HEATING_TIME: float = 10.0 # 加热时间(秒) + ARM_OPERATION_TIME: float = 2 # 机械臂操作时间(秒) + HEATING_TIME: float = 60.0 # 加热时间(秒) NUM_HEATING_STATIONS: int = 3 # 加热台数量 def __init__(self, device_id: Optional[str] = None, config: Optional[Dict[str, Any]] = None, **kwargs): @@ -126,9 +141,9 @@ class VirtualWorkbench: self.data: Dict[str, Any] = {} # 从config中获取可配置参数 - self.ARM_OPERATION_TIME = float(self.config.get("arm_operation_time", 3.0)) - self.HEATING_TIME = float(self.config.get("heating_time", 10.0)) - self.NUM_HEATING_STATIONS = int(self.config.get("num_heating_stations", 3)) + self.ARM_OPERATION_TIME = float(self.config.get("arm_operation_time", self.ARM_OPERATION_TIME)) + self.HEATING_TIME = float(self.config.get("heating_time", self.HEATING_TIME)) + self.NUM_HEATING_STATIONS = int(self.config.get("num_heating_stations", self.NUM_HEATING_STATIONS)) # 机械臂状态和锁 (使用threading.Lock) self._arm_lock = Lock() @@ -137,8 +152,7 @@ class VirtualWorkbench: # 加热台状态 (station_id -> HeatingStation) - 立即初始化,不依赖initialize() self._heating_stations: Dict[int, HeatingStation] = { - i: HeatingStation(station_id=i) - for i in range(1, self.NUM_HEATING_STATIONS + 1) + i: HeatingStation(station_id=i) for i in range(1, self.NUM_HEATING_STATIONS + 1) } self._stations_lock = RLock() # 可重入锁,保护加热台状态 @@ -178,14 +192,16 @@ class VirtualWorkbench: station.heating_progress = 0.0 # 初始化状态 - self.data.update({ - "status": "Ready", - "arm_state": ArmState.IDLE.value, - "arm_current_task": None, - "heating_stations": self._get_stations_status(), - "active_tasks_count": 0, - "message": "工作台就绪", - }) + self.data.update( + { + "status": "Ready", + "arm_state": ArmState.IDLE.value, + "arm_current_task": None, + "heating_stations": self._get_stations_status(), + "active_tasks_count": 0, + "message": "工作台就绪", + } + ) self.logger.info(f"工作台初始化完成: {self.NUM_HEATING_STATIONS}个加热台就绪") return True @@ -204,12 +220,14 @@ class VirtualWorkbench: with self._tasks_lock: self._active_tasks.clear() - self.data.update({ - "status": "Offline", - "arm_state": ArmState.IDLE.value, - "heating_stations": {}, - "message": "工作台已关闭", - }) + self.data.update( + { + "status": "Offline", + "arm_state": ArmState.IDLE.value, + "heating_stations": {}, + "message": "工作台已关闭", + } + ) return True def _get_stations_status(self) -> Dict[int, Dict[str, Any]]: @@ -227,12 +245,14 @@ class VirtualWorkbench: def _update_data_status(self, message: Optional[str] = None): """更新状态数据""" - self.data.update({ - "arm_state": self._arm_state.value, - "arm_current_task": self._arm_current_task, - "heating_stations": self._get_stations_status(), - "active_tasks_count": len(self._active_tasks), - }) + self.data.update( + { + "arm_state": self._arm_state.value, + "arm_current_task": self._arm_current_task, + "heating_stations": self._get_stations_status(), + "active_tasks_count": len(self._active_tasks), + } + ) if message: self.data["message"] = message @@ -280,6 +300,7 @@ class VirtualWorkbench: def prepare_materials( self, + sample_uuids: SampleUUIDsType, count: int = 5, ) -> PrepareMaterialsResult: """ @@ -297,10 +318,7 @@ class VirtualWorkbench: # 生成物料列表 A1 - A{count} materials = [i for i in range(1, count + 1)] - self.logger.info( - f"[准备物料] 生成 {count} 个物料: " - f"A1-A{count} -> material_1~material_{count}" - ) + self.logger.info(f"[准备物料] 生成 {count} 个物料: " f"A1-A{count} -> material_1~material_{count}") return { "success": True, @@ -311,10 +329,12 @@ class VirtualWorkbench: "material_4": materials[3] if len(materials) > 3 else 0, "material_5": materials[4] if len(materials) > 4 else 0, "message": f"已准备 {count} 个物料: A1-A{count}", + "unilabos_samples": [LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for sample_uuid, content in sample_uuids.items()] } def move_to_heating_station( self, + sample_uuids: SampleUUIDsType, material_number: int, ) -> MoveToHeatingStationResult: """ @@ -391,6 +411,9 @@ class VirtualWorkbench: "material_id": material_id, "material_number": material_number, "message": f"{material_id}已成功移动到加热台{station_id}", + "unilabos_samples": [ + LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for + sample_uuid, content in sample_uuids.items()] } except Exception as e: @@ -403,10 +426,15 @@ class VirtualWorkbench: "material_id": material_id, "material_number": material_number, "message": f"移动失败: {str(e)}", + "unilabos_samples": [ + LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for + sample_uuid, content in sample_uuids.items()] } + @always_free def start_heating( self, + sample_uuids: SampleUUIDsType, station_id: int, material_number: int, ) -> StartHeatingResult: @@ -429,6 +457,9 @@ class VirtualWorkbench: "material_id": "", "material_number": material_number, "message": f"无效的加热台ID: {station_id}", + "unilabos_samples": [ + LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for + sample_uuid, content in sample_uuids.items()] } with self._stations_lock: @@ -441,6 +472,9 @@ class VirtualWorkbench: "material_id": "", "material_number": material_number, "message": f"加热台{station_id}上没有物料", + "unilabos_samples": [ + LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for + sample_uuid, content in sample_uuids.items()] } if station.state == HeatingStationState.HEATING: @@ -450,6 +484,9 @@ class VirtualWorkbench: "material_id": station.current_material, "material_number": material_number, "message": f"加热台{station_id}已经在加热中", + "unilabos_samples": [ + LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for + sample_uuid, content in sample_uuids.items()] } material_id = station.current_material @@ -465,10 +502,21 @@ class VirtualWorkbench: self._update_data_status(f"加热台{station_id}开始加热{material_id}") - # 模拟加热过程 (10秒) + # 打印当前所有正在加热的台位 + with self._stations_lock: + heating_list = [ + f"加热台{sid}:{s.current_material}" + for sid, s in self._heating_stations.items() + if s.state == HeatingStationState.HEATING and s.current_material + ] + self.logger.info(f"[并行加热] 当前同时加热中: {', '.join(heating_list)}") + + # 模拟加热过程 start_time = time.time() + last_countdown_log = start_time while True: elapsed = time.time() - start_time + remaining = max(0.0, self.HEATING_TIME - elapsed) progress = min(100.0, (elapsed / self.HEATING_TIME) * 100) with self._stations_lock: @@ -476,6 +524,11 @@ class VirtualWorkbench: self._update_data_status(f"加热台{station_id}加热中: {progress:.1f}%") + # 每5秒打印一次倒计时 + if time.time() - last_countdown_log >= 5.0: + self.logger.info(f"[加热台{station_id}] {material_id} 剩余 {remaining:.1f}s") + last_countdown_log = time.time() + if elapsed >= self.HEATING_TIME: break @@ -499,10 +552,14 @@ class VirtualWorkbench: "material_id": material_id, "material_number": material_number, "message": f"加热台{station_id}加热完成", + "unilabos_samples": [ + LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for + sample_uuid, content in sample_uuids.items()] } def move_to_output( self, + sample_uuids: SampleUUIDsType, station_id: int, material_number: int, ) -> MoveToOutputResult: @@ -525,6 +582,9 @@ class VirtualWorkbench: "material_id": "", "output_position": f"C{output_number}", "message": f"无效的加热台ID: {station_id}", + "unilabos_samples": [ + LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for + sample_uuid, content in sample_uuids.items()] } with self._stations_lock: @@ -538,6 +598,9 @@ class VirtualWorkbench: "material_id": "", "output_position": f"C{output_number}", "message": f"加热台{station_id}上没有物料", + "unilabos_samples": [ + LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for + sample_uuid, content in sample_uuids.items()] } if station.state != HeatingStationState.COMPLETED: @@ -547,6 +610,9 @@ class VirtualWorkbench: "material_id": material_id, "output_position": f"C{output_number}", "message": f"加热台{station_id}尚未完成加热 (当前状态: {station.state.value})", + "unilabos_samples": [ + LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for + sample_uuid, content in sample_uuids.items()] } output_position = f"C{output_number}" @@ -595,6 +661,9 @@ class VirtualWorkbench: "material_id": material_id, "output_position": output_position, "message": f"{material_id}已成功移动到{output_position}", + "unilabos_samples": [ + LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for + sample_uuid, content in sample_uuids.items()] } except Exception as e: @@ -607,6 +676,9 @@ class VirtualWorkbench: "material_id": "", "output_position": output_position, "message": f"移动失败: {str(e)}", + "unilabos_samples": [ + LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for + sample_uuid, content in sample_uuids.items()] } # ============ 状态属性 ============ diff --git a/unilabos/registry/device_comms/communication_devices.yaml b/unilabos/registry/device_comms/communication_devices.yaml index ea3f1b61..782889d4 100644 --- a/unilabos/registry/device_comms/communication_devices.yaml +++ b/unilabos/registry/device_comms/communication_devices.yaml @@ -96,10 +96,13 @@ serial: type: string port: type: string + registry_name: + type: string resource_tracker: type: object required: - device_id + - registry_name - port type: object data: diff --git a/unilabos/registry/devices/camera.yaml b/unilabos/registry/devices/camera.yaml index fe1aef28..c8b9d944 100644 --- a/unilabos/registry/devices/camera.yaml +++ b/unilabos/registry/devices/camera.yaml @@ -67,6 +67,9 @@ camera: period: default: 0.1 type: number + registry_name: + default: '' + type: string resource_tracker: type: object required: [] diff --git a/unilabos/registry/devices/liquid_handler.yaml b/unilabos/registry/devices/liquid_handler.yaml index 319d0331..b04d6317 100644 --- a/unilabos/registry/devices/liquid_handler.yaml +++ b/unilabos/registry/devices/liquid_handler.yaml @@ -9468,7 +9468,7 @@ liquid_handler.prcxi: well_names: null handles: input: - - data_key: plate + - data_key: '@this.0@@@plate' data_source: handle data_type: resource handler_key: input_plate @@ -9503,81 +9503,78 @@ liquid_handler.prcxi: type: string type: array plate: - items: - properties: - category: + properties: + category: + type: string + children: + items: type: string - children: - items: - type: string - type: array - config: - type: string - data: - type: string - id: - type: string - name: - type: string - parent: - type: string - pose: - properties: - orientation: - properties: - w: - type: number - x: - type: number - y: - type: number - z: - type: number - required: - - x - - y - - z - - w - title: orientation - type: object - position: - properties: - x: - type: number - y: - type: number - z: - type: number - required: - - x - - y - - z - title: position - type: object - required: - - position - - orientation - title: pose - type: object - sample_id: - type: string - type: - type: string - required: - - id - - name - - sample_id - - children - - parent - - type - - category - - pose - - config - - data - title: plate - type: object + type: array + config: + type: string + data: + type: string + id: + type: string + name: + type: string + parent: + type: string + pose: + properties: + orientation: + properties: + w: + type: number + x: + type: number + y: + type: number + z: + type: number + required: + - x + - y + - z + - w + title: orientation + type: object + position: + properties: + x: + type: number + y: + type: number + z: + type: number + required: + - x + - y + - z + title: position + type: object + required: + - position + - orientation + title: pose + type: object + sample_id: + type: string + type: + type: string + required: + - id + - name + - sample_id + - children + - parent + - type + - category + - pose + - config + - data title: plate - type: array + type: object volumes: items: type: number @@ -9593,17 +9590,207 @@ liquid_handler.prcxi: - volumes type: object result: + $defs: + ResourceDict: + properties: + class: + description: Resource class name + title: Class + type: string + config: + additionalProperties: true + description: Resource configuration + title: Config + type: object + data: + additionalProperties: true + description: 'Resource data, eg: container liquid data' + title: Data + type: object + description: + default: '' + description: Resource description + title: Description + type: string + extra: + additionalProperties: true + description: 'Extra data, eg: slot index' + title: Extra + type: object + icon: + default: '' + description: Resource icon + title: Icon + type: string + id: + description: Resource ID + title: Id + type: string + model: + additionalProperties: true + description: Resource model + title: Model + type: object + name: + description: Resource name + title: Name + type: string + parent: + anyOf: + - $ref: '#/$defs/ResourceDict' + - type: 'null' + default: null + description: Parent resource object + parent_uuid: + anyOf: + - type: string + - type: 'null' + default: null + description: Parent resource uuid + title: Parent Uuid + pose: + $ref: '#/$defs/ResourceDictPosition' + description: Resource position + schema: + additionalProperties: true + description: Resource schema + title: Schema + type: object + type: + anyOf: + - const: device + type: string + - type: string + description: Resource type + title: Type + uuid: + description: Resource UUID + title: Uuid + type: string + required: + - id + - uuid + - name + - type + - class + - config + - data + - extra + title: ResourceDict + type: object + ResourceDictPosition: + properties: + cross_section_type: + default: rectangle + description: Cross section type + enum: + - rectangle + - circle + - rounded_rectangle + title: Cross Section Type + type: string + layout: + default: x-y + description: Resource layout + enum: + - 2d + - x-y + - z-y + - x-z + title: Layout + type: string + position: + $ref: '#/$defs/ResourceDictPositionObject' + description: Resource position + position3d: + $ref: '#/$defs/ResourceDictPositionObject' + description: Resource position in 3D space + rotation: + $ref: '#/$defs/ResourceDictPositionObject' + description: Resource rotation + scale: + $ref: '#/$defs/ResourceDictPositionScale' + description: Resource scale + size: + $ref: '#/$defs/ResourceDictPositionSize' + description: Resource size + title: ResourceDictPosition + type: object + ResourceDictPositionObject: + properties: + x: + default: 0.0 + description: X coordinate + title: X + type: number + y: + default: 0.0 + description: Y coordinate + title: Y + type: number + z: + default: 0.0 + description: Z coordinate + title: Z + type: number + title: ResourceDictPositionObject + type: object + ResourceDictPositionScale: + properties: + x: + default: 0.0 + description: x scale + title: X + type: number + y: + default: 0.0 + description: y scale + title: Y + type: number + z: + default: 0.0 + description: z scale + title: Z + type: number + title: ResourceDictPositionScale + type: object + ResourceDictPositionSize: + properties: + depth: + default: 0.0 + description: Depth + title: Depth + type: number + height: + default: 0.0 + description: Height + title: Height + type: number + width: + default: 0.0 + description: Width + title: Width + type: number + title: ResourceDictPositionSize + type: object properties: plate: - items: {} + items: + items: + $ref: '#/$defs/ResourceDict' + type: array title: Plate type: array volumes: - items: {} + items: + type: number title: Volumes type: array wells: - items: {} + items: + items: + $ref: '#/$defs/ResourceDict' + type: array title: Wells type: array required: diff --git a/unilabos/registry/devices/virtual_device.yaml b/unilabos/registry/devices/virtual_device.yaml index c38655ca..f0635755 100644 --- a/unilabos/registry/devices/virtual_device.yaml +++ b/unilabos/registry/devices/virtual_device.yaml @@ -5835,6 +5835,25 @@ virtual_workbench: - material_number type: object result: + $defs: + LabSample: + properties: + extra: + additionalProperties: true + title: Extra + type: object + oss_path: + title: Oss Path + type: string + sample_uuid: + title: Sample Uuid + type: string + required: + - sample_uuid + - oss_path + - extra + title: LabSample + type: object description: move_to_heating_station 返回类型 properties: material_id: @@ -5853,12 +5872,18 @@ virtual_workbench: success: title: Success type: boolean + unilabos_samples: + items: + $ref: '#/$defs/LabSample' + title: Unilabos Samples + type: array required: - success - station_id - material_id - material_number - message + - unilabos_samples title: MoveToHeatingStationResult type: object required: @@ -5903,6 +5928,25 @@ virtual_workbench: - material_number type: object result: + $defs: + LabSample: + properties: + extra: + additionalProperties: true + title: Extra + type: object + oss_path: + title: Oss Path + type: string + sample_uuid: + title: Sample Uuid + type: string + required: + - sample_uuid + - oss_path + - extra + title: LabSample + type: object description: move_to_output 返回类型 properties: material_id: @@ -5914,10 +5958,16 @@ virtual_workbench: success: title: Success type: boolean + unilabos_samples: + items: + $ref: '#/$defs/LabSample' + title: Unilabos Samples + type: array required: - success - station_id - material_id + - unilabos_samples title: MoveToOutputResult type: object required: @@ -5972,6 +6022,25 @@ virtual_workbench: required: [] type: object result: + $defs: + LabSample: + properties: + extra: + additionalProperties: true + title: Extra + type: object + oss_path: + title: Oss Path + type: string + sample_uuid: + title: Sample Uuid + type: string + required: + - sample_uuid + - oss_path + - extra + title: LabSample + type: object description: prepare_materials 返回类型 - 批量准备物料 properties: count: @@ -5998,6 +6067,11 @@ virtual_workbench: success: title: Success type: boolean + unilabos_samples: + items: + $ref: '#/$defs/LabSample' + title: Unilabos Samples + type: array required: - success - count @@ -6007,6 +6081,7 @@ virtual_workbench: - material_4 - material_5 - message + - unilabos_samples title: PrepareMaterialsResult type: object required: @@ -6015,6 +6090,7 @@ virtual_workbench: type: object type: UniLabJsonCommand auto-start_heating: + always_free: true feedback: {} goal: {} goal_default: @@ -6062,6 +6138,25 @@ virtual_workbench: - material_number type: object result: + $defs: + LabSample: + properties: + extra: + additionalProperties: true + title: Extra + type: object + oss_path: + title: Oss Path + type: string + sample_uuid: + title: Sample Uuid + type: string + required: + - sample_uuid + - oss_path + - extra + title: LabSample + type: object description: start_heating 返回类型 properties: material_id: @@ -6079,12 +6174,18 @@ virtual_workbench: success: title: Success type: boolean + unilabos_samples: + items: + $ref: '#/$defs/LabSample' + title: Unilabos Samples + type: array required: - success - station_id - material_id - material_number - message + - unilabos_samples title: StartHeatingResult type: object required: diff --git a/unilabos/registry/registry.py b/unilabos/registry/registry.py index ef111e61..844d4cf8 100644 --- a/unilabos/registry/registry.py +++ b/unilabos/registry/registry.py @@ -5,6 +5,7 @@ import sys import inspect import importlib import threading +import traceback from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path from typing import Any, Dict, List, Union, Tuple @@ -88,6 +89,14 @@ class Registry: ) test_latency_schema["description"] = "用于测试延迟的动作,返回延迟时间和时间差。" + test_resource_method_info = host_node_enhanced_info.get("action_methods", {}).get("test_resource", {}) + test_resource_schema = self._generate_unilab_json_command_schema( + test_resource_method_info.get("args", []), + "test_resource", + test_resource_method_info.get("return_annotation"), + ) + test_resource_schema["description"] = "用于测试物料、设备和样本。" + self.device_type_registry.update( { "host_node": { @@ -189,32 +198,7 @@ class Registry: "goal": {}, "feedback": {}, "result": {}, - "schema": { - "description": "", - "properties": { - "feedback": {}, - "goal": { - "properties": { - "resource": ros_message_to_json_schema(Resource, "resource"), - "resources": { - "items": { - "properties": ros_message_to_json_schema( - Resource, "resources" - ), - "type": "object", - }, - "type": "array", - }, - "device": {"type": "string"}, - "devices": {"items": {"type": "string"}, "type": "array"}, - }, - "type": "object", - }, - "result": {}, - }, - "title": "test_resource", - "type": "object", - }, + "schema": test_resource_schema, "placeholder_keys": { "device": "unilabos_devices", "devices": "unilabos_devices", @@ -838,6 +822,7 @@ class Registry: ("list", "unilabos.registry.placeholder_type:DeviceSlot"), ] }, + **({"always_free": True} if v.get("always_free") else {}), } for k, v in enhanced_info["action_methods"].items() if k not in device_config["class"]["action_value_mappings"] @@ -943,6 +928,7 @@ class Registry: if is_valid: results.append((file, data, device_ids)) except Exception as e: + traceback.print_exc() logger.warning(f"[UniLab Registry] 处理设备文件异常: {file}, 错误: {e}") # 线程安全地更新注册表 diff --git a/unilabos/resources/resource_tracker.py b/unilabos/resources/resource_tracker.py index 6d2d8d01..363112ad 100644 --- a/unilabos/resources/resource_tracker.py +++ b/unilabos/resources/resource_tracker.py @@ -5,6 +5,8 @@ from pydantic import BaseModel, field_serializer, field_validator, ValidationErr from pydantic import Field from typing import List, Tuple, Any, Dict, Literal, Optional, cast, TYPE_CHECKING, Union +from typing_extensions import TypedDict + from unilabos.resources.plr_additional_res_reg import register from unilabos.utils.log import logger @@ -14,6 +16,32 @@ if TYPE_CHECKING: EXTRA_CLASS = "unilabos_resource_class" +EXTRA_SAMPLE_UUID = "sample_uuid" +EXTRA_UNILABOS_SAMPLE_UUID = "unilabos_sample_uuid" + +# 函数参数名常量 - 用于自动注入 sample_uuids 列表 +PARAM_SAMPLE_UUIDS = "sample_uuids" + +# JSON Command 中的系统参数字段名 +JSON_UNILABOS_PARAM = "unilabos_param" + +# 返回值中的 samples 字段名 +RETURN_UNILABOS_SAMPLES = "unilabos_samples" + +# sample_uuids 参数类型 (用于 virtual bench 等设备添加 sample_uuids 参数) +SampleUUIDsType = Dict[str, Optional["PLRResource"]] + + +class LabSample(TypedDict): + sample_uuid: str + oss_path: str + extra: Dict[str, Any] + + +class ResourceDictPositionSizeType(TypedDict): + depth: float + width: float + height: float class ResourceDictPositionSize(BaseModel): @@ -22,18 +50,40 @@ class ResourceDictPositionSize(BaseModel): height: float = Field(description="Height", default=0.0) # y +class ResourceDictPositionScaleType(TypedDict): + x: float + y: float + z: float + + class ResourceDictPositionScale(BaseModel): x: float = Field(description="x scale", default=0.0) y: float = Field(description="y scale", default=0.0) z: float = Field(description="z scale", default=0.0) +class ResourceDictPositionObjectType(TypedDict): + x: float + y: float + z: float + + class ResourceDictPositionObject(BaseModel): x: float = Field(description="X coordinate", default=0.0) y: float = Field(description="Y coordinate", default=0.0) z: float = Field(description="Z coordinate", default=0.0) +class ResourceDictPositionType(TypedDict): + size: ResourceDictPositionSizeType + scale: ResourceDictPositionScaleType + layout: Literal["2d", "x-y", "z-y", "x-z"] + position: ResourceDictPositionObjectType + position3d: ResourceDictPositionObjectType + rotation: ResourceDictPositionObjectType + cross_section_type: Literal["rectangle", "circle", "rounded_rectangle"] + + class ResourceDictPosition(BaseModel): size: ResourceDictPositionSize = Field(description="Resource size", default_factory=ResourceDictPositionSize) scale: ResourceDictPositionScale = Field(description="Resource scale", default_factory=ResourceDictPositionScale) @@ -52,6 +102,24 @@ class ResourceDictPosition(BaseModel): ) +class ResourceDictType(TypedDict): + id: str + uuid: str + name: str + description: str + resource_schema: Dict[str, Any] + model: Dict[str, Any] + icon: str + parent_uuid: Optional[str] + parent: Optional["ResourceDictType"] + type: Union[Literal["device"], str] + klass: str + pose: ResourceDictPositionType + config: Dict[str, Any] + data: Dict[str, Any] + extra: Dict[str, Any] + + # 统一的资源字典模型,parent 自动序列化为 parent_uuid,children 不序列化 class ResourceDict(BaseModel): id: str = Field(description="Resource ID") @@ -530,6 +598,7 @@ class ResourceTreeSet(object): plr_resource = sub_cls.deserialize(plr_dict, allow_marshal=True) from pylabrobot.resources import Coordinate from pylabrobot.serializer import deserialize + location = cast(Coordinate, deserialize(plr_dict["location"])) plr_resource.location = location plr_resource.load_all_state(all_states) diff --git a/unilabos/ros/device_node_wrapper.py b/unilabos/ros/device_node_wrapper.py index db9caa41..889441a7 100644 --- a/unilabos/ros/device_node_wrapper.py +++ b/unilabos/ros/device_node_wrapper.py @@ -44,8 +44,7 @@ def ros2_device_node( # 从属性中自动发现可发布状态 if status_types is None: status_types = {} - if device_config is None: - raise ValueError("device_config cannot be None") + assert device_config is not None, "device_config cannot be None" if action_value_mappings is None: action_value_mappings = {} if hardware_interface is None: diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index 56585f68..eddb57a2 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -4,8 +4,20 @@ import json import threading import time import traceback -from typing import get_type_hints, TypeVar, Generic, Dict, Any, Type, TypedDict, Optional, List, TYPE_CHECKING, Union, \ - Tuple +from typing import ( + get_type_hints, + TypeVar, + Generic, + Dict, + Any, + Type, + TypedDict, + Optional, + List, + TYPE_CHECKING, + Union, + Tuple, +) from concurrent.futures import ThreadPoolExecutor import asyncio @@ -48,6 +60,9 @@ from unilabos.resources.resource_tracker import ( ResourceTreeSet, ResourceTreeInstance, ResourceDictInstance, + EXTRA_SAMPLE_UUID, + PARAM_SAMPLE_UUIDS, + JSON_UNILABOS_PARAM, ) from unilabos.ros.utils.driver_creator import WorkstationNodeCreator, PyLabRobotCreator, DeviceClassCreator from rclpy.task import Task, Future @@ -131,7 +146,7 @@ def init_wrapper( device_id: str, device_uuid: str, driver_class: type[T], - device_config: ResourceTreeInstance, + device_config: ResourceDictInstance, status_types: Dict[str, Any], action_value_mappings: Dict[str, Any], hardware_interface: Dict[str, Any], @@ -216,14 +231,15 @@ class PropertyPublisher: def publish_property(self): try: - self.node.lab_logger().trace(f"【.publish_property】开始发布属性: {self.name}") + # self.node.lab_logger().trace(f"【.publish_property】开始发布属性: {self.name}") value = self.get_property() if self.print_publish: - self.node.lab_logger().trace(f"【.publish_property】发布 {self.msg_type}: {value}") + pass + # self.node.lab_logger().trace(f"【.publish_property】发布 {self.msg_type}: {value}") if value is not None: msg = convert_to_ros_msg(self.msg_type, value) self.publisher_.publish(msg) - self.node.lab_logger().trace(f"【.publish_property】属性 {self.name} 发布成功") + # self.node.lab_logger().trace(f"【.publish_property】属性 {self.name} 发布成功") except Exception as e: self.node.lab_logger().error( f"【.publish_property】发布属性 {self.publisher_.topic} 出错: {str(e)}\n{traceback.format_exc()}" @@ -263,6 +279,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): self, driver_instance: T, device_id: str, + registry_name: str, device_uuid: str, status_types: Dict[str, Any], action_value_mappings: Dict[str, Any], @@ -284,6 +301,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): """ self.driver_instance = driver_instance self.device_id = device_id + self.registry_name = registry_name self.uuid = device_uuid self.publish_high_frequency = False self.callback_group = ReentrantCallbackGroup() @@ -361,6 +379,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): from pylabrobot.resources.deck import Deck from pylabrobot.resources import Coordinate from pylabrobot.resources import Plate + # 物料传输到对应的node节点 client = self._resource_clients["c2s_update_resource_tree"] request = SerialCommand.Request() @@ -388,33 +407,29 @@ class BaseROS2DeviceNode(Node, Generic[T]): rts: ResourceTreeSet = ResourceTreeSet.from_raw_dict_list(input_resources) parent_resource = None if bind_parent_id != self.node_name: - parent_resource = self.resource_tracker.figure_resource( - {"name": bind_parent_id} - ) + parent_resource = self.resource_tracker.figure_resource({"name": bind_parent_id}) for r in rts.root_nodes: # noinspection PyUnresolvedReferences r.res_content.parent_uuid = parent_resource.unilabos_uuid else: for r in rts.root_nodes: r.res_content.parent_uuid = self.uuid - - if len(LIQUID_INPUT_SLOT) and LIQUID_INPUT_SLOT[0] == -1 and len(rts.root_nodes) == 1 and isinstance(rts.root_nodes[0], RegularContainer): + rts_plr_instances = rts.to_plr_resources() + if len(rts.root_nodes) == 1 and isinstance(rts_plr_instances[0], RegularContainer): # noinspection PyTypeChecker - container_instance: RegularContainer = rts.root_nodes[0] + container_instance: RegularContainer = rts_plr_instances[0] found_resources = self.resource_tracker.figure_resource( - {"id": container_instance.name}, try_mode=True + {"name": container_instance.name}, try_mode=True ) if not len(found_resources): self.resource_tracker.add_resource(container_instance) logger.info(f"添加物料{container_instance.name}到资源跟踪器") else: - assert ( - len(found_resources) == 1 - ), f"找到多个同名物料: {container_instance.name}, 请检查物料系统" + assert len(found_resources) == 1, f"找到多个同名物料: {container_instance.name}, 请检查物料系统" found_resource = found_resources[0] if isinstance(found_resource, RegularContainer): logger.info(f"更新物料{container_instance.name}的数据{found_resource.state}") - found_resource.state.update(json.loads(container_instance.state)) + found_resource.state.update(container_instance.state) elif isinstance(found_resource, dict): raise ValueError("已不支持 字典 版本的RegularContainer") else: @@ -422,14 +437,16 @@ class BaseROS2DeviceNode(Node, Generic[T]): f"更新物料{container_instance.name}出现不支持的数据类型{type(found_resource)} {found_resource}" ) # noinspection PyUnresolvedReferences - request.command = json.dumps({ - "action": "add", - "data": { - "data": rts.dump(), - "mount_uuid": parent_resource.unilabos_uuid if parent_resource is not None else "", - "first_add": False, - }, - }) + request.command = json.dumps( + { + "action": "add", + "data": { + "data": rts.dump(), + "mount_uuid": parent_resource.unilabos_uuid if parent_resource is not None else self.uuid, + "first_add": False, + }, + } + ) tree_response: SerialCommand.Response = await client.call_async(request) uuid_maps = json.loads(tree_response.response) plr_instances = rts.to_plr_resources() @@ -443,7 +460,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): } res.response = json.dumps(final_response) # 如果driver自己就有assign的方法,那就使用driver自己的assign方法 - if hasattr(self.driver_instance, "create_resource"): + if hasattr(self.driver_instance, "create_resource") and self.node_name != "host_node": create_resource_func = getattr(self.driver_instance, "create_resource") try: ret = create_resource_func( @@ -471,7 +488,9 @@ class BaseROS2DeviceNode(Node, Generic[T]): if len(ADD_LIQUID_TYPE) == 1 and len(LIQUID_VOLUME) == 1 and len(LIQUID_INPUT_SLOT) > 1: ADD_LIQUID_TYPE = ADD_LIQUID_TYPE * len(LIQUID_INPUT_SLOT) LIQUID_VOLUME = LIQUID_VOLUME * len(LIQUID_INPUT_SLOT) - self.lab_logger().warning(f"增加液体资源时,数量为1,自动补全为 {len(LIQUID_INPUT_SLOT)} 个") + self.lab_logger().warning( + f"增加液体资源时,数量为1,自动补全为 {len(LIQUID_INPUT_SLOT)} 个" + ) for liquid_type, liquid_volume, liquid_input_slot in zip( ADD_LIQUID_TYPE, LIQUID_VOLUME, LIQUID_INPUT_SLOT ): @@ -490,9 +509,15 @@ class BaseROS2DeviceNode(Node, Generic[T]): input_wells = [] for r in LIQUID_INPUT_SLOT: input_wells.append(plr_instance.children[r]) - final_response["liquid_input_resource_tree"] = ResourceTreeSet.from_plr_resources(input_wells).dump() + final_response["liquid_input_resource_tree"] = ResourceTreeSet.from_plr_resources( + input_wells + ).dump() res.response = json.dumps(final_response) - if issubclass(parent_resource.__class__, Deck) and hasattr(parent_resource, "assign_child_at_slot") and "slot" in other_calling_param: + if ( + issubclass(parent_resource.__class__, Deck) + and hasattr(parent_resource, "assign_child_at_slot") + and "slot" in other_calling_param + ): other_calling_param["slot"] = int(other_calling_param["slot"]) parent_resource.assign_child_at_slot(plr_instance, **other_calling_param) else: @@ -507,14 +532,16 @@ class BaseROS2DeviceNode(Node, Generic[T]): rts_with_parent = ResourceTreeSet.from_plr_resources([parent_resource]) if rts_with_parent.root_nodes[0].res_content.uuid_parent is None: rts_with_parent.root_nodes[0].res_content.parent_uuid = self.uuid - request.command = json.dumps({ - "action": "add", - "data": { - "data": rts_with_parent.dump(), - "mount_uuid": rts_with_parent.root_nodes[0].res_content.uuid_parent, - "first_add": False, - }, - }) + request.command = json.dumps( + { + "action": "add", + "data": { + "data": rts_with_parent.dump(), + "mount_uuid": rts_with_parent.root_nodes[0].res_content.uuid_parent, + "first_add": False, + }, + } + ) tree_response: SerialCommand.Response = await client.call_async(request) uuid_maps = json.loads(tree_response.response) self.resource_tracker.loop_update_uuid(input_resources, uuid_maps) @@ -811,7 +838,9 @@ class BaseROS2DeviceNode(Node, Generic[T]): } def _handle_update( - plr_resources: List[Union[ResourcePLR, ResourceDictInstance]], tree_set: ResourceTreeSet, additional_add_params: Dict[str, Any] + plr_resources: List[Union[ResourcePLR, ResourceDictInstance]], + tree_set: ResourceTreeSet, + additional_add_params: Dict[str, Any], ) -> Tuple[Dict[str, Any], List[ResourcePLR]]: """ 处理资源更新操作的内部函数 @@ -836,7 +865,10 @@ class BaseROS2DeviceNode(Node, Generic[T]): original_parent_resource = original_instance.parent original_parent_resource_uuid = getattr(original_parent_resource, "unilabos_uuid", None) target_parent_resource_uuid = tree.root_node.res_content.uuid_parent - not_same_parent = original_parent_resource_uuid != target_parent_resource_uuid and original_parent_resource is not None + not_same_parent = ( + original_parent_resource_uuid != target_parent_resource_uuid + and original_parent_resource is not None + ) old_name = original_instance.name new_name = plr_resource.name parent_appended = False @@ -872,8 +904,16 @@ class BaseROS2DeviceNode(Node, Generic[T]): else: # 判断是否变更了resource_site,重新登记 target_site = original_instance.unilabos_extra.get("update_resource_site") - sites = original_instance.parent.sites if original_instance.parent is not None and hasattr(original_instance.parent, "sites") else None - site_names = list(original_instance.parent._ordering.keys()) if original_instance.parent is not None and hasattr(original_instance.parent, "sites") else [] + sites = ( + original_instance.parent.sites + if original_instance.parent is not None and hasattr(original_instance.parent, "sites") + else None + ) + site_names = ( + list(original_instance.parent._ordering.keys()) + if original_instance.parent is not None and hasattr(original_instance.parent, "sites") + else [] + ) if target_site is not None and sites is not None and site_names is not None: site_index = sites.index(original_instance) site_name = site_names[site_index] @@ -910,9 +950,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): action = i.get("action") # remove, add, update resources_uuid: List[str] = i.get("data") # 资源数据 additional_add_params = i.get("additional_add_params", {}) # 额外参数 - self.lab_logger().trace( - f"[资源同步] 处理 {action}, " f"resources count: {len(resources_uuid)}" - ) + self.lab_logger().trace(f"[资源同步] 处理 {action}, " f"resources count: {len(resources_uuid)}") tree_set = None if action in ["add", "update"]: tree_set = await self.get_resource( @@ -939,9 +977,13 @@ class BaseROS2DeviceNode(Node, Generic[T]): tree.root_node.res_content.parent_uuid = self.uuid r = SerialCommand.Request() r.command = json.dumps( - {"data": {"data": new_tree_set.dump()}, "action": "update"}) # 和Update Resource一致 + {"data": {"data": new_tree_set.dump()}, "action": "update"} + ) # 和Update Resource一致 response: SerialCommand_Response = await self._resource_clients[ - "c2s_update_resource_tree"].call_async(r) # type: ignore + "c2s_update_resource_tree" + ].call_async( + r + ) # type: ignore self.lab_logger().info(f"确认资源云端 Add 结果: {response.response}") results.append(result) elif action == "update": @@ -961,9 +1003,13 @@ class BaseROS2DeviceNode(Node, Generic[T]): tree.root_node.res_content.parent_uuid = self.uuid r = SerialCommand.Request() r.command = json.dumps( - {"data": {"data": new_tree_set.dump()}, "action": "update"}) # 和Update Resource一致 + {"data": {"data": new_tree_set.dump()}, "action": "update"} + ) # 和Update Resource一致 response: SerialCommand_Response = await self._resource_clients[ - "c2s_update_resource_tree"].call_async(r) # type: ignore + "c2s_update_resource_tree" + ].call_async( + r + ) # type: ignore self.lab_logger().info(f"确认资源云端 Update 结果: {response.response}") results.append(result) elif action == "remove": @@ -1110,6 +1156,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): "machine_name": BasicConfig.machine_name, "type": "slave", "edge_device_id": self.device_id, + "registry_name": self.registry_name, } }, ensure_ascii=False, @@ -1333,7 +1380,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): resource_id=resource_data["id"], with_children=True ) if "sample_id" in resource_data: - plr_resource.unilabos_extra["sample_uuid"] = resource_data["sample_id"] + plr_resource.unilabos_extra[EXTRA_SAMPLE_UUID] = resource_data["sample_id"] queried_resources[idx] = plr_resource else: uuid_indices.append((idx, unilabos_uuid, resource_data)) @@ -1346,7 +1393,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): for i, (idx, _, resource_data) in enumerate(uuid_indices): plr_resource = plr_resources[i] if "sample_id" in resource_data: - plr_resource.unilabos_extra["sample_uuid"] = resource_data["sample_id"] + plr_resource.unilabos_extra[EXTRA_SAMPLE_UUID] = resource_data["sample_id"] queried_resources[idx] = plr_resource self.lab_logger().debug(f"资源查询结果: 共 {len(queried_resources)} 个资源") @@ -1354,7 +1401,9 @@ class BaseROS2DeviceNode(Node, Generic[T]): # 通过资源跟踪器获取本地实例 final_resources = queried_resources if is_sequence else queried_resources[0] if not is_sequence: - plr = self.resource_tracker.figure_resource({"name": final_resources.name}, try_mode=False) + plr = self.resource_tracker.figure_resource( + {"name": final_resources.name}, try_mode=False + ) # 保留unilabos_extra if hasattr(final_resources, "unilabos_extra") and hasattr(plr, "unilabos_extra"): plr.unilabos_extra = getattr(final_resources, "unilabos_extra", {}).copy() @@ -1393,8 +1442,12 @@ class BaseROS2DeviceNode(Node, Generic[T]): execution_success = True except Exception as _: execution_error = traceback.format_exc() - error(f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{str(action_kwargs)[:1000]}") - trace(f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}") + error( + f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{str(action_kwargs)[:1000]}" + ) + trace( + f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}" + ) future = ROS2DeviceNode.run_async_func(ACTION, trace_error=False, **action_kwargs) future.add_done_callback(_handle_future_exception) @@ -1414,9 +1467,11 @@ class BaseROS2DeviceNode(Node, Generic[T]): except Exception as _: execution_error = traceback.format_exc() error( - f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{str(action_kwargs)[:1000]}") + f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{str(action_kwargs)[:1000]}" + ) trace( - f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}") + f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}" + ) future.add_done_callback(_handle_future_exception) @@ -1483,11 +1538,18 @@ class BaseROS2DeviceNode(Node, Generic[T]): if isinstance(rs, list): for r in rs: res = self.resource_tracker.parent_resource(r) # 获取 resource 对象 + if res is None: + res = rs + if id(res) not in seen: + seen.add(id(res)) + unique_resources.append(res) else: res = self.resource_tracker.parent_resource(rs) - if id(res) not in seen: - seen.add(id(res)) - unique_resources.append(res) + if res is None: + res = rs + if id(res) not in seen: + seen.add(id(res)) + unique_resources.append(res) # 使用新的资源树接口 if unique_resources: @@ -1539,20 +1601,37 @@ class BaseROS2DeviceNode(Node, Generic[T]): try: function_name = target["function_name"] function_args = target["function_args"] + # 获取 unilabos 系统参数 + unilabos_param: Dict[str, Any] = target[JSON_UNILABOS_PARAM] + assert isinstance(function_args, dict), "执行动作时JSON必须为dict类型\n原JSON: {string}" function = getattr(self.driver_instance, function_name) assert callable( function ), f"执行动作时JSON中的function_name对应的函数不可调用: {function_name}\n原JSON: {string}" - # 处理 ResourceSlot 类型参数 - args_list = default_manager._analyze_method_signature(function)["args"] + # 处理参数(包含 unilabos 系统参数如 sample_uuids) + args_list = default_manager._analyze_method_signature(function, skip_unilabos_params=False)["args"] for arg in args_list: arg_name = arg["name"] arg_type = arg["type"] # 跳过不在 function_args 中的参数 if arg_name not in function_args: + # 处理 sample_uuids 参数注入 + if arg_name == PARAM_SAMPLE_UUIDS: + raw_sample_uuids = unilabos_param.get(PARAM_SAMPLE_UUIDS, {}) + # 将 material uuid 转换为 resource 实例 + # key: sample_uuid, value: material_uuid -> resource 实例 + resolved_sample_uuids: Dict[str, Any] = {} + for sample_uuid, material_uuid in raw_sample_uuids.items(): + if material_uuid and self.resource_tracker: + resource = self.resource_tracker.uuid_to_resources.get(material_uuid) + resolved_sample_uuids[sample_uuid] = resource if resource else material_uuid + else: + resolved_sample_uuids[sample_uuid] = material_uuid + function_args[PARAM_SAMPLE_UUIDS] = resolved_sample_uuids + self.lab_logger().debug(f"[JsonCommand] 注入 {PARAM_SAMPLE_UUIDS}: {resolved_sample_uuids}") continue # 处理单个 ResourceSlot @@ -1581,6 +1660,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): f"转换ResourceSlot列表参数 {arg_name} 失败: {e}\n{traceback.format_exc()}" ) raise JsonCommandInitError(f"ResourceSlot列表参数转换失败: {arg_name}") + # todo: 默认反报送 return function(**function_args) except KeyError as ex: @@ -1601,14 +1681,16 @@ class BaseROS2DeviceNode(Node, Generic[T]): raise ValueError("至少需要提供一个 UUID") uuids_list = list(uuids) - future = self._resource_clients["c2s_update_resource_tree"].call_async(SerialCommand.Request( - command=json.dumps( - { - "data": {"data": uuids_list, "with_children": True}, - "action": "get", - } + future = self._resource_clients["c2s_update_resource_tree"].call_async( + SerialCommand.Request( + command=json.dumps( + { + "data": {"data": uuids_list, "with_children": True}, + "action": "get", + } + ) ) - )) + ) # 等待结果(使用while循环,每次sleep 0.05秒,最多等待30秒) timeout = 30.0 @@ -1666,6 +1748,9 @@ class BaseROS2DeviceNode(Node, Generic[T]): try: function_name = target["function_name"] function_args = target["function_args"] + # 获取 unilabos 系统参数 + unilabos_param: Dict[str, Any] = target.get(JSON_UNILABOS_PARAM, {}) + assert isinstance(function_args, dict), "执行动作时JSON必须为dict类型\n原JSON: {string}" function = getattr(self.driver_instance, function_name) assert callable( @@ -1675,14 +1760,30 @@ class BaseROS2DeviceNode(Node, Generic[T]): function ), f"执行动作时JSON中的function并非异步: {function_name}\n原JSON: {string}" - # 处理 ResourceSlot 类型参数 - args_list = default_manager._analyze_method_signature(function)["args"] + # 处理参数(包含 unilabos 系统参数如 sample_uuids) + args_list = default_manager._analyze_method_signature(function, skip_unilabos_params=False)["args"] for arg in args_list: arg_name = arg["name"] arg_type = arg["type"] # 跳过不在 function_args 中的参数 if arg_name not in function_args: + # 处理 sample_uuids 参数注入 + if arg_name == PARAM_SAMPLE_UUIDS: + raw_sample_uuids = unilabos_param.get(PARAM_SAMPLE_UUIDS, {}) + # 将 material uuid 转换为 resource 实例 + # key: sample_uuid, value: material_uuid -> resource 实例 + resolved_sample_uuids: Dict[str, Any] = {} + for sample_uuid, material_uuid in raw_sample_uuids.items(): + if material_uuid and self.resource_tracker: + resource = self.resource_tracker.uuid_to_resources.get(material_uuid) + resolved_sample_uuids[sample_uuid] = resource if resource else material_uuid + else: + resolved_sample_uuids[sample_uuid] = material_uuid + function_args[PARAM_SAMPLE_UUIDS] = resolved_sample_uuids + self.lab_logger().debug( + f"[JsonCommandAsync] 注入 {PARAM_SAMPLE_UUIDS}: {resolved_sample_uuids}" + ) continue # 处理单个 ResourceSlot @@ -1907,6 +2008,7 @@ class ROS2DeviceNode: if driver_is_ros: driver_params["device_id"] = device_id + driver_params["registry_name"] = device_config.res_content.klass driver_params["resource_tracker"] = self.resource_tracker self._driver_instance = self._driver_creator.create_instance(driver_params) if self._driver_instance is None: @@ -1924,6 +2026,7 @@ class ROS2DeviceNode: children=children, driver_instance=self._driver_instance, # type: ignore device_id=device_id, + registry_name=device_config.res_content.klass, device_uuid=device_uuid, status_types=status_types, action_value_mappings=action_value_mappings, @@ -1935,6 +2038,7 @@ class ROS2DeviceNode: self._ros_node = BaseROS2DeviceNode( driver_instance=self._driver_instance, device_id=device_id, + registry_name=device_config.res_content.klass, device_uuid=device_uuid, status_types=status_types, action_value_mappings=action_value_mappings, @@ -1943,6 +2047,7 @@ class ROS2DeviceNode: resource_tracker=self.resource_tracker, ) self._ros_node: BaseROS2DeviceNode + # 将注册表类型名传递给BaseROS2DeviceNode,用于slave上报 self._ros_node.lab_logger().info(f"初始化完成 {self._ros_node.uuid} {self.driver_is_ros}") self.driver_instance._ros_node = self._ros_node # type: ignore self.driver_instance._execute_driver_command = self._ros_node._execute_driver_command # type: ignore @@ -1960,7 +2065,9 @@ class ROS2DeviceNode: asyncio.set_event_loop(loop) loop.run_forever() - ROS2DeviceNode._asyncio_loop_thread = threading.Thread(target=run_event_loop, daemon=True, name="ROS2DeviceNode") + ROS2DeviceNode._asyncio_loop_thread = threading.Thread( + target=run_event_loop, daemon=True, name="ROS2DeviceNode" + ) ROS2DeviceNode._asyncio_loop_thread.start() logger.info(f"循环线程已启动") diff --git a/unilabos/ros/nodes/presets/camera.py b/unilabos/ros/nodes/presets/camera.py index 25ae921a..2267f676 100644 --- a/unilabos/ros/nodes/presets/camera.py +++ b/unilabos/ros/nodes/presets/camera.py @@ -6,12 +6,13 @@ from cv_bridge import CvBridge from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, DeviceNodeResourceTracker class VideoPublisher(BaseROS2DeviceNode): - def __init__(self, device_id='video_publisher', device_uuid='', camera_index=0, period: float = 0.1, resource_tracker: DeviceNodeResourceTracker = None): + def __init__(self, device_id='video_publisher', registry_name="", device_uuid='', camera_index=0, period: float = 0.1, resource_tracker: DeviceNodeResourceTracker = None): # 初始化BaseROS2DeviceNode,使用自身作为driver_instance BaseROS2DeviceNode.__init__( self, driver_instance=self, device_id=device_id, + registry_name=registry_name, device_uuid=device_uuid, status_types={}, action_value_mappings={}, diff --git a/unilabos/ros/nodes/presets/controller_node.py b/unilabos/ros/nodes/presets/controller_node.py index 84510737..78d07577 100644 --- a/unilabos/ros/nodes/presets/controller_node.py +++ b/unilabos/ros/nodes/presets/controller_node.py @@ -10,6 +10,7 @@ class ControllerNode(BaseROS2DeviceNode): def __init__( self, device_id: str, + registry_name: str, controller_func: Callable, update_rate: float, inputs: Dict[str, Dict[str, type | str]], @@ -51,6 +52,7 @@ class ControllerNode(BaseROS2DeviceNode): self, driver_instance=self, device_id=device_id, + registry_name=registry_name, status_types=status_types, action_value_mappings=action_value_mappings, hardware_interface=hardware_interface, diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index 4cfa9c1d..30c5d414 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -1,17 +1,17 @@ import collections -from dataclasses import dataclass, field import json import threading import time import traceback import uuid +from dataclasses import dataclass, field from typing import TYPE_CHECKING, Optional, Dict, Any, List, ClassVar, Set, Union -from typing_extensions import TypedDict from action_msgs.msg import GoalStatus from geometry_msgs.msg import Point from rclpy.action import ActionClient, get_action_server_names_and_types_by_node from rclpy.service import Service +from typing_extensions import TypedDict from unilabos_msgs.msg import Resource # type: ignore from unilabos_msgs.srv import ( ResourceAdd, @@ -23,10 +23,20 @@ from unilabos_msgs.srv import ( from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response from unique_identifier_msgs.msg import UUID +from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot from unilabos.registry.registry import lab_registry from unilabos.resources.container import RegularContainer from unilabos.resources.graphio import initialize_resource from unilabos.resources.registry import add_schema +from unilabos.resources.resource_tracker import ( + ResourceDict, + ResourceDictInstance, + ResourceTreeSet, + ResourceTreeInstance, + RETURN_UNILABOS_SAMPLES, + JSON_UNILABOS_PARAM, + PARAM_SAMPLE_UUIDS, SampleUUIDsType, LabSample, +) from unilabos.ros.initialize_device import initialize_device_from_dict from unilabos.ros.msgs.message_converter import ( get_msg_type, @@ -37,17 +47,11 @@ from unilabos.ros.msgs.message_converter import ( ) from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode, DeviceNodeResourceTracker from unilabos.ros.nodes.presets.controller_node import ControllerNode -from unilabos.resources.resource_tracker import ( - ResourceDict, - ResourceDictInstance, - ResourceTreeSet, - ResourceTreeInstance, -) from unilabos.utils import logger from unilabos.utils.exception import DeviceClassInvalid from unilabos.utils.log import warning from unilabos.utils.type_check import serialize_result_info -from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot +from unilabos.config.config import BasicConfig if TYPE_CHECKING: from unilabos.app.ws_client import QueueItem @@ -60,7 +64,8 @@ class DeviceActionStatus: class TestResourceReturn(TypedDict): resources: List[List[ResourceDict]] - devices: List[DeviceSlot] + devices: List[Dict[str, Any]] + unilabos_samples: List[LabSample] class TestLatencyReturn(TypedDict): @@ -245,6 +250,7 @@ class HostNode(BaseROS2DeviceNode): self, driver_instance=self, device_id=device_id, + registry_name="host_node", device_uuid=host_node_dict["uuid"], status_types={}, action_value_mappings=lab_registry.device_type_registry["host_node"]["class"]["action_value_mappings"], @@ -299,7 +305,8 @@ class HostNode(BaseROS2DeviceNode): } # 用来存储多个ActionClient实例 self._action_value_mappings: Dict[str, Dict] = ( {} - ) # 用来存储多个ActionClient的type, goal, feedback, result的变量名映射关系 + ) # device_id -> action_value_mappings(本地+远程设备统一存储) + self._slave_registry_configs: Dict[str, Dict] = {} # registry_name -> registry_config(含action_value_mappings) self._goals: Dict[str, Any] = {} # 用来存储多个目标的状态 self._online_devices: Set[str] = {f"{self.namespace}/{device_id}"} # 用于跟踪在线设备 self._last_discovery_time = 0.0 # 上次设备发现的时间 @@ -633,6 +640,8 @@ class HostNode(BaseROS2DeviceNode): self.device_machine_names[device_id] = "本地" self.devices_instances[device_id] = d # noinspection PyProtectedMember + self._action_value_mappings[device_id] = d._ros_node._action_value_mappings + # noinspection PyProtectedMember for action_name, action_value_mapping in d._ros_node._action_value_mappings.items(): if action_name.startswith("auto-") or str(action_value_mapping.get("type", "")).startswith( "UniLabJsonCommand" @@ -755,6 +764,7 @@ class HostNode(BaseROS2DeviceNode): item: "QueueItem", action_type: str, action_kwargs: Dict[str, Any], + sample_material: Dict[str, str], server_info: Optional[Dict[str, Any]] = None, ) -> None: """ @@ -768,18 +778,29 @@ class HostNode(BaseROS2DeviceNode): u = uuid.UUID(item.job_id) device_id = item.device_id action_name = item.action_name + + if BasicConfig.test_mode: + action_id = f"/devices/{device_id}/{action_name}" + self.lab_logger().info( + f"[TEST MODE] 模拟执行: {action_id} (job={item.job_id[:8]}), 参数: {str(action_kwargs)[:500]}" + ) + # 根据注册表 handles 构建模拟返回值 + mock_return = self._build_test_mode_return(device_id, action_name, action_kwargs) + self._handle_test_mode_result(item, action_id, mock_return) + return + if action_type.startswith("UniLabJsonCommand"): if action_name.startswith("auto-"): action_name = action_name[5:] action_id = f"/devices/{device_id}/_execute_driver_command" - action_kwargs = { - "string": json.dumps( - { - "function_name": action_name, - "function_args": action_kwargs, - } - ) + json_command: Dict[str, Any] = { + "function_name": action_name, + "function_args": action_kwargs, + JSON_UNILABOS_PARAM: { + PARAM_SAMPLE_UUIDS: sample_material, + }, } + action_kwargs = {"string": json.dumps(json_command)} if action_type.startswith("UniLabJsonCommandAsync"): action_id = f"/devices/{device_id}/_execute_driver_command_async" else: @@ -790,21 +811,6 @@ class HostNode(BaseROS2DeviceNode): raise ValueError(f"ActionClient {action_id} not found.") action_client: ActionClient = self._action_clients[action_id] - - # 遍历action_kwargs下的所有子dict,将"sample_uuid"的值赋给"sample_id" - def assign_sample_id(obj): - if isinstance(obj, dict): - if "sample_uuid" in obj: - obj["sample_id"] = obj["sample_uuid"] - obj.pop("sample_uuid") - for k, v in obj.items(): - if k != "unilabos_extra": - assign_sample_id(v) - elif isinstance(obj, list): - for item in obj: - assign_sample_id(item) - - assign_sample_id(action_kwargs) goal_msg = convert_to_ros_msg(action_client._action_type.Goal(), action_kwargs) # self.lab_logger().trace(f"[Host Node] Sending goal for {action_id}: {str(goal_msg)[:1000]}") @@ -820,6 +826,51 @@ class HostNode(BaseROS2DeviceNode): ) future.add_done_callback(lambda f: self.goal_response_callback(item, action_id, f)) + def _build_test_mode_return( + self, device_id: str, action_name: str, action_kwargs: Dict[str, Any] + ) -> Dict[str, Any]: + """ + 根据注册表 handles 的 output 定义构建测试模式的模拟返回值 + + 根据 data_key 中 @flatten 的层数决定嵌套数组层数,叶子值为空字典。 + 例如: "vessel" → {}, "plate.@flatten" → [{}], "a.@flatten.@flatten" → [[{}]] + """ + mock_return: Dict[str, Any] = {"test_mode": True, "action_name": action_name} + action_mappings = self._action_value_mappings.get(device_id, {}) + action_mapping = action_mappings.get(action_name, {}) + handles = action_mapping.get("handles", {}) + if isinstance(handles, dict): + for output_handle in handles.get("output", []): + data_key = output_handle.get("data_key", "") + handler_key = output_handle.get("handler_key", "") + # 根据 @flatten 层数构建嵌套数组,叶子为空字典 + flatten_count = data_key.count("@flatten") + value: Any = {} + for _ in range(flatten_count): + value = [value] + mock_return[handler_key] = value + return mock_return + + def _handle_test_mode_result( + self, item: "QueueItem", action_id: str, mock_return: Dict[str, Any] + ) -> None: + """ + 测试模式下直接构建结果并走正常的结果回调流程(跳过 ROS) + """ + job_id = item.job_id + status = "success" + return_info = serialize_result_info("", True, mock_return) + + self.lab_logger().info(f"[TEST MODE] Result for {action_id} ({job_id[:8]}): {status}") + + from unilabos.app.web.controller import store_job_result + store_job_result(job_id, status, return_info, mock_return) + + # 发布状态到桥接器 + for bridge in self.bridges: + if hasattr(bridge, "publish_job_status"): + bridge.publish_job_status(mock_return, item, status, return_info) + def goal_response_callback(self, item: "QueueItem", action_id: str, future) -> None: """目标响应回调""" goal_handle = future.result() @@ -867,14 +918,14 @@ class HostNode(BaseROS2DeviceNode): # 适配后端的一些额外处理 return_value = return_info.get("return_value") if isinstance(return_value, dict): - unilabos_samples = return_value.pop("unilabos_samples", None) + unilabos_samples = return_value.pop(RETURN_UNILABOS_SAMPLES, None) if isinstance(unilabos_samples, list) and unilabos_samples: self.lab_logger().info( f"[Host Node] Job {job_id[:8]} returned {len(unilabos_samples)} sample(s): " f"{[s.get('name', s.get('id', 'unknown')) if isinstance(s, dict) else str(s)[:20] for s in unilabos_samples[:5]]}" f"{'...' if len(unilabos_samples) > 5 else ''}" ) - return_info["unilabos_samples"] = unilabos_samples + return_info["samples"] = unilabos_samples suc = return_info.get("suc", False) if not suc: status = "failed" @@ -1179,6 +1230,10 @@ class HostNode(BaseROS2DeviceNode): def _node_info_update_callback(self, request, response): """ 更新节点信息回调 + + 处理两种消息: + 1. 首次上报(main_slave_run): 带 devices_config + registry_config,存储 action_value_mappings + 2. 设备重注册(SYNC_SLAVE_NODE_INFO): 带 edge_device_id + registry_name,用 registry_name 索引已存储的 mappings """ self.lab_logger().trace(f"[Host Node] Node info update request received: {request}") try: @@ -1190,12 +1245,48 @@ class HostNode(BaseROS2DeviceNode): info = info["SYNC_SLAVE_NODE_INFO"] machine_name = info["machine_name"] edge_device_id = info["edge_device_id"] + registry_name = info.get("registry_name", "") self.device_machine_names[edge_device_id] = machine_name + + # 用 registry_name 索引已存储的 registry_config,获取 action_value_mappings + if registry_name and registry_name in self._slave_registry_configs: + action_mappings = self._slave_registry_configs[registry_name].get( + "class", {} + ).get("action_value_mappings", {}) + if action_mappings: + self._action_value_mappings[edge_device_id] = action_mappings + self.lab_logger().info( + f"[Host Node] Loaded {len(action_mappings)} action mappings " + f"for remote device {edge_device_id} (registry: {registry_name})" + ) else: devices_config = info.pop("devices_config") registry_config = info.pop("registry_config") if registry_config: http_client.resource_registry({"resources": registry_config}) + + # 存储 slave 的 registry_config,用于后续 SYNC_SLAVE_NODE_INFO 索引 + for reg_name, reg_data in registry_config.items(): + if isinstance(reg_data, dict) and "class" in reg_data: + self._slave_registry_configs[reg_name] = reg_data + + # 解析 devices_config,建立 device_id -> action_value_mappings 映射 + if devices_config: + for device_tree in devices_config: + for device_dict in device_tree: + device_id = device_dict.get("id", "") + class_name = device_dict.get("class", "") + if device_id and class_name and class_name in self._slave_registry_configs: + action_mappings = self._slave_registry_configs[class_name].get( + "class", {} + ).get("action_value_mappings", {}) + if action_mappings: + self._action_value_mappings[device_id] = action_mappings + self.lab_logger().info( + f"[Host Node] Stored {len(action_mappings)} action mappings " + f"for remote device {device_id} (class: {class_name})" + ) + self.lab_logger().debug(f"[Host Node] Node info update: {info}") response.response = "OK" except Exception as e: @@ -1492,6 +1583,7 @@ class HostNode(BaseROS2DeviceNode): def test_resource( self, + sample_uuids: SampleUUIDsType, resource: ResourceSlot = None, resources: List[ResourceSlot] = None, device: DeviceSlot = None, @@ -1506,6 +1598,7 @@ class HostNode(BaseROS2DeviceNode): return { "resources": ResourceTreeSet.from_plr_resources([resource, *resources], known_newly_created=True).dump(), "devices": [device, *devices], + "unilabos_samples": [LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for sample_uuid, content in sample_uuids.items()] } def handle_pong_response(self, pong_data: dict): diff --git a/unilabos/ros/nodes/presets/joint_republisher.py b/unilabos/ros/nodes/presets/joint_republisher.py index 65218303..b8290377 100644 --- a/unilabos/ros/nodes/presets/joint_republisher.py +++ b/unilabos/ros/nodes/presets/joint_republisher.py @@ -7,10 +7,11 @@ from rclpy.callback_groups import ReentrantCallbackGroup from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode class JointRepublisher(BaseROS2DeviceNode): - def __init__(self,device_id,resource_tracker, **kwargs): + def __init__(self,device_id, registry_name, resource_tracker, **kwargs): super().__init__( driver_instance=self, device_id=device_id, + registry_name=registry_name, status_types={}, action_value_mappings={}, hardware_interface={}, diff --git a/unilabos/ros/nodes/presets/resource_mesh_manager.py b/unilabos/ros/nodes/presets/resource_mesh_manager.py index 1ff504c4..45e330dd 100644 --- a/unilabos/ros/nodes/presets/resource_mesh_manager.py +++ b/unilabos/ros/nodes/presets/resource_mesh_manager.py @@ -26,7 +26,7 @@ from unilabos.resources.graphio import initialize_resources from unilabos.registry.registry import lab_registry class ResourceMeshManager(BaseROS2DeviceNode): - def __init__(self, resource_model: dict, resource_config: list,resource_tracker, device_id: str = "resource_mesh_manager", rate=50, **kwargs): + def __init__(self, resource_model: dict, resource_config: list,resource_tracker, device_id: str = "resource_mesh_manager", registry_name: str = "", rate=50, **kwargs): """初始化资源网格管理器节点 Args: @@ -37,6 +37,7 @@ class ResourceMeshManager(BaseROS2DeviceNode): super().__init__( driver_instance=self, device_id=device_id, + registry_name=registry_name, status_types={}, action_value_mappings={}, hardware_interface={}, diff --git a/unilabos/ros/nodes/presets/serial_node.py b/unilabos/ros/nodes/presets/serial_node.py index 545682bd..11a04bda 100644 --- a/unilabos/ros/nodes/presets/serial_node.py +++ b/unilabos/ros/nodes/presets/serial_node.py @@ -7,7 +7,7 @@ from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, DeviceNodeRe class ROS2SerialNode(BaseROS2DeviceNode): - def __init__(self, device_id, port: str, baudrate: int = 9600, resource_tracker: DeviceNodeResourceTracker=None): + def __init__(self, device_id, registry_name, port: str, baudrate: int = 9600, resource_tracker: DeviceNodeResourceTracker=None): # 保存属性,以便在调用父类初始化前使用 self.port = port self.baudrate = baudrate @@ -28,6 +28,7 @@ class ROS2SerialNode(BaseROS2DeviceNode): BaseROS2DeviceNode.__init__( self, driver_instance=self, + registry_name=registry_name, device_id=device_id, status_types={}, action_value_mappings={}, diff --git a/unilabos/ros/nodes/presets/workstation.py b/unilabos/ros/nodes/presets/workstation.py index f30e33b2..902e2967 100644 --- a/unilabos/ros/nodes/presets/workstation.py +++ b/unilabos/ros/nodes/presets/workstation.py @@ -47,6 +47,7 @@ class ROS2WorkstationNode(BaseROS2DeviceNode): *, driver_instance: "WorkstationBase", device_id: str, + registry_name: str, device_uuid: str, status_types: Dict[str, Any], action_value_mappings: Dict[str, Any], @@ -62,6 +63,7 @@ class ROS2WorkstationNode(BaseROS2DeviceNode): super().__init__( driver_instance=driver_instance, device_id=device_id, + registry_name=registry_name, device_uuid=device_uuid, status_types=status_types, action_value_mappings={**action_value_mappings, **self.protocol_action_mappings}, @@ -340,6 +342,8 @@ class ROS2WorkstationNode(BaseROS2DeviceNode): plr = self.resource_tracker.figure_resource({"name": res_name}, try_mode=False) # 获取父资源 res = self.resource_tracker.parent_resource(plr) + if res is None: + res = plr if id(res) not in seen: seen.add(id(res)) unique_resources.append(res) diff --git a/unilabos/ros/utils/driver_creator.py b/unilabos/ros/utils/driver_creator.py index 35aca5ec..47e7533c 100644 --- a/unilabos/ros/utils/driver_creator.py +++ b/unilabos/ros/utils/driver_creator.py @@ -52,7 +52,8 @@ class DeviceClassCreator(Generic[T]): if self.device_instance is not None: for c in self.children: if c.res_content.type != "device": - self.resource_tracker.add_resource(c.get_plr_nested_dict()) + res = ResourceTreeSet([ResourceTreeInstance(c)]).to_plr_resources()[0] + self.resource_tracker.add_resource(res) def create_instance(self, data: Dict[str, Any]) -> T: """ @@ -119,7 +120,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]): # return resource, source_type def _process_resource_references( - self, data: Any, to_dict=False, states=None, prefix_path="", name_to_uuid=None + self, data: Any, processed_child_names: Optional[Dict[str, Any]], to_dict=False, states=None, prefix_path="", name_to_uuid=None ) -> Any: """ 递归处理资源引用,替换_resource_child_name对应的资源 @@ -164,6 +165,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]): states[prefix_path] = resource_instance.serialize_all_state() return serialized else: + processed_child_names[child_name] = resource_instance self.resource_tracker.add_resource(resource_instance) # 立即设置UUID,state已经在resource_ulab_to_plr中处理过了 if name_to_uuid: @@ -182,12 +184,12 @@ class PyLabRobotCreator(DeviceClassCreator[T]): result = {} for key, value in data.items(): new_prefix = f"{prefix_path}.{key}" if prefix_path else key - result[key] = self._process_resource_references(value, to_dict, states, new_prefix, name_to_uuid) + result[key] = self._process_resource_references(value, processed_child_names, to_dict, states, new_prefix, name_to_uuid) return result elif isinstance(data, list): return [ - self._process_resource_references(item, to_dict, states, f"{prefix_path}[{i}]", name_to_uuid) + self._process_resource_references(item, processed_child_names, to_dict, states, f"{prefix_path}[{i}]", name_to_uuid) for i, item in enumerate(data) ] @@ -234,7 +236,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]): # 首先处理资源引用 states = {} processed_data = self._process_resource_references( - data, to_dict=True, states=states, name_to_uuid=name_to_uuid + data, {}, to_dict=True, states=states, name_to_uuid=name_to_uuid ) try: @@ -270,7 +272,12 @@ class PyLabRobotCreator(DeviceClassCreator[T]): arg_value = spec_args[param_name].annotation data[param_name]["_resource_type"] = self.device_cls.__module__ + ":" + arg_value logger.debug(f"自动补充 _resource_type: {data[param_name]['_resource_type']}") - processed_data = self._process_resource_references(data, to_dict=False, name_to_uuid=name_to_uuid) + processed_child_names = {} + processed_data = self._process_resource_references(data, processed_child_names, to_dict=False, name_to_uuid=name_to_uuid) + for child_name, resource_instance in processed_data.items(): + for ind, name in enumerate([child.res_content.name for child in self.children]): + if name == child_name: + self.children.pop(ind) self.device_instance = super(PyLabRobotCreator, self).create_instance(processed_data) # 补全变量后直接调用,调用的自身的attach_resource except Exception as e: logger.error(f"PyLabRobot创建实例失败: {e}") @@ -342,9 +349,10 @@ class WorkstationNodeCreator(DeviceClassCreator[T]): try: # 创建实例,额外补充一个给protocol node的字段,后面考虑取消 data["children"] = self.children - for child in self.children: - if child.res_content.type != "device": - self.resource_tracker.add_resource(child.get_plr_nested_dict()) + # super(WorkstationNodeCreator, self).create_instance(data)的时候会attach + # for child in self.children: + # if child.res_content.type != "device": + # self.resource_tracker.add_resource(child.get_plr_nested_dict()) deck_dict = data.get("deck") if deck_dict: from pylabrobot.resources import Deck, Resource diff --git a/unilabos/test/experiments/comprehensive_protocol/comprehensive_station.json b/unilabos/test/experiments/comprehensive_protocol/comprehensive_station.json index 9af64af3..74ca47e2 100644 --- a/unilabos/test/experiments/comprehensive_protocol/comprehensive_station.json +++ b/unilabos/test/experiments/comprehensive_protocol/comprehensive_station.json @@ -339,13 +339,8 @@ "z": 0 }, "config": { - "max_volume": 500.0, "type": "RegularContainer", - "category": "container", - "max_temp": 200.0, - "min_temp": -20.0, - "has_stirrer": true, - "has_heater": true + "category": "container" }, "data": { "liquids": [], @@ -769,9 +764,7 @@ "size_y": 250, "size_z": 0, "type": "RegularContainer", - "category": "container", - "reagent": "sodium_chloride", - "physical_state": "solid" + "category": "container" }, "data": { "current_mass": 500.0, @@ -792,14 +785,11 @@ "z": 0 }, "config": { - "volume": 500.0, "size_x": 600, "size_y": 250, "size_z": 0, "type": "RegularContainer", - "category": "container", - "reagent": "sodium_carbonate", - "physical_state": "solid" + "category": "container" }, "data": { "current_mass": 500.0, @@ -820,14 +810,11 @@ "z": 0 }, "config": { - "volume": 500.0, "size_x": 650, "size_y": 250, "size_z": 0, "type": "RegularContainer", - "category": "container", - "reagent": "magnesium_chloride", - "physical_state": "solid" + "category": "container" }, "data": { "current_mass": 500.0, diff --git a/unilabos/utils/decorator.py b/unilabos/utils/decorator.py index 57e968a8..22a90736 100644 --- a/unilabos/utils/decorator.py +++ b/unilabos/utils/decorator.py @@ -184,6 +184,51 @@ def get_all_subscriptions(instance) -> list: return subscriptions +def always_free(func: F) -> F: + """ + 标记动作为永久闲置(不受busy队列限制)的装饰器 + + 被此装饰器标记的 action 方法,在执行时不会受到设备级别的排队限制, + 任何时候请求都可以立即执行。适用于查询类、状态读取类等轻量级操作。 + + Example: + class MyDriver: + @always_free + def query_status(self, param: str): + # 这个动作可以随时执行,不需要排队 + return self._status + + def transfer(self, volume: float): + # 这个动作会按正常排队逻辑执行 + pass + + Note: + - 可以与其他装饰器组合使用,@always_free 应放在最外层 + - 仅影响 WebSocket 调度层的 busy/free 判断,不影响 ROS2 层 + """ + + @wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + + wrapper._is_always_free = True # type: ignore[attr-defined] + + return wrapper # type: ignore[return-value] + + +def is_always_free(func) -> bool: + """ + 检查函数是否被标记为永久闲置 + + Args: + func: 被检查的函数 + + Returns: + 如果函数被 @always_free 装饰则返回 True,否则返回 False + """ + return getattr(func, "_is_always_free", False) + + def not_action(func: F) -> F: """ 标记方法为非动作的装饰器 diff --git a/unilabos/utils/import_manager.py b/unilabos/utils/import_manager.py index 2df76360..dabbe1a7 100644 --- a/unilabos/utils/import_manager.py +++ b/unilabos/utils/import_manager.py @@ -27,8 +27,9 @@ __all__ = [ from ast import Constant +from unilabos.resources.resource_tracker import PARAM_SAMPLE_UUIDS from unilabos.utils import logger -from unilabos.utils.decorator import is_not_action +from unilabos.utils.decorator import is_not_action, is_always_free class ImportManager: @@ -281,6 +282,9 @@ class ImportManager: continue # 其他非_开头的方法归类为action method_info = self._analyze_method_signature(method) + # 检查是否被 @always_free 装饰器标记 + if is_always_free(method): + method_info["always_free"] = True result["action_methods"][name] = method_info return result @@ -338,16 +342,24 @@ class ImportManager: if self._is_not_action_method(node): continue # 其他非_开头的方法归类为action + # 检查是否被 @always_free 装饰器标记 + if self._is_always_free_method(node): + method_info["always_free"] = True result["action_methods"][method_name] = method_info return result - def _analyze_method_signature(self, method) -> Dict[str, Any]: + def _analyze_method_signature(self, method, skip_unilabos_params: bool = True) -> Dict[str, Any]: """ 分析方法签名,提取具体的命名参数信息 注意:此方法会跳过*args和**kwargs,只提取具体的命名参数 这样可以确保通过**dict方式传参时的准确性 + Args: + method: 要分析的方法 + skip_unilabos_params: 是否跳过 unilabos 系统参数(如 sample_uuids), + registry 补全时为 True,JsonCommand 执行时为 False + 示例用法: method_info = self._analyze_method_signature(some_method) params = {"param1": "value1", "param2": "value2"} @@ -368,6 +380,10 @@ class ImportManager: if param.kind == param.VAR_KEYWORD: # **kwargs continue + # 跳过 sample_uuids 参数(由系统自动注入,registry 补全时跳过) + if skip_unilabos_params and param_name == PARAM_SAMPLE_UUIDS: + continue + is_required = param.default == inspect.Parameter.empty if is_required: num_required += 1 @@ -464,6 +480,13 @@ class ImportManager: return True return False + def _is_always_free_method(self, node: ast.FunctionDef) -> bool: + """检查是否是@always_free装饰的方法""" + for decorator in node.decorator_list: + if isinstance(decorator, ast.Name) and decorator.id == "always_free": + return True + return False + def _get_property_name_from_setter(self, node: ast.FunctionDef) -> str: """从setter装饰器中获取属性名""" for decorator in node.decorator_list: @@ -563,6 +586,9 @@ class ImportManager: for i, arg in enumerate(node.args.args): if arg.arg == "self": continue + # 跳过 sample_uuids 参数(由系统自动注入) + if arg.arg == PARAM_SAMPLE_UUIDS: + continue arg_info = { "name": arg.arg, "type": None, diff --git a/unilabos/utils/log.py b/unilabos/utils/log.py index cee3269b..be5d8c31 100644 --- a/unilabos/utils/log.py +++ b/unilabos/utils/log.py @@ -193,6 +193,7 @@ def configure_logger(loglevel=None, working_dir=None): root_logger.addHandler(console_handler) # 如果指定了工作目录,添加文件处理器 + log_filepath = None if working_dir is not None: logs_dir = os.path.join(working_dir, "logs") os.makedirs(logs_dir, exist_ok=True) @@ -213,6 +214,7 @@ def configure_logger(loglevel=None, working_dir=None): logging.getLogger("asyncio").setLevel(logging.INFO) logging.getLogger("urllib3").setLevel(logging.INFO) + return log_filepath diff --git a/unilabos/workflow/common.py b/unilabos/workflow/common.py index f4c0ac81..3a1fee22 100644 --- a/unilabos/workflow/common.py +++ b/unilabos/workflow/common.py @@ -60,7 +60,11 @@ ==================== 连接关系图 ==================== 控制流 (ready 端口串联): - create_resource_1 -> create_resource_2 -> ... -> set_liquid_1 -> set_liquid_2 -> ... -> transfer_liquid_1 -> transfer_liquid_2 -> ... + - create_resource 之间: 无 ready 连接 + - set_liquid_from_plate 之间: 无 ready 连接 + - create_resource 与 set_liquid_from_plate 之间: 无 ready 连接 + - transfer_liquid 之间: 通过 ready 端口串联 + transfer_liquid_1 -> transfer_liquid_2 -> transfer_liquid_3 -> ... 物料流: [create_resource] --labware--> [set_liquid_from_plate] --output_wells--> [transfer_liquid] --sources_out/targets_out--> [下一个 transfer_liquid] @@ -358,14 +362,16 @@ def build_protocol_graph( protocol_steps: List[Dict[str, Any]], workstation_name: str, action_resource_mapping: Optional[Dict[str, str]] = None, + labware_defs: Optional[List[Dict[str, Any]]] = None, ) -> WorkflowGraph: """统一的协议图构建函数,根据设备类型自动选择构建逻辑 Args: - labware_info: labware 信息字典,格式为 {name: {slot, well, labware, ...}, ...} + labware_info: reagent 信息字典,格式为 {name: {slot, well}, ...},用于 set_liquid 和 well 查找 protocol_steps: 协议步骤列表 workstation_name: 工作站名称 action_resource_mapping: action 到 resource_name 的映射字典,可选 + labware_defs: labware 定义列表,格式为 [{"name": "...", "slot": "1", "type": "lab_xxx"}, ...] """ G = WorkflowGraph() resource_last_writer = {} # reagent_name -> "node_id:port" @@ -373,18 +379,7 @@ def build_protocol_graph( protocol_steps = refactor_data(protocol_steps, action_resource_mapping) - # ==================== 第一步:按 slot 去重创建 create_resource 节点 ==================== - # 收集所有唯一的 slot - slots_info = {} # slot -> {labware, res_id} - for labware_id, item in labware_info.items(): - slot = str(item.get("slot", "")) - if slot and slot not in slots_info: - res_id = f"plate_slot_{slot}" - slots_info[slot] = { - "labware": item.get("labware", ""), - "res_id": res_id, - } - + # ==================== 第一步:按 slot 创建 create_resource 节点 ==================== # 创建 Group 节点,包含所有 create_resource 节点 group_node_id = str(uuid.uuid4()) G.add_node( @@ -400,30 +395,35 @@ def build_protocol_graph( param=None, ) - # 为每个唯一的 slot 创建 create_resource 节点 + # 直接使用 JSON 中的 labware 定义,每个 slot 一条记录,type 即 class_name res_index = 0 - last_create_resource_id = None - for slot, info in slots_info.items(): - node_id = str(uuid.uuid4()) - res_id = info["res_id"] + for lw in (labware_defs or []): + slot = str(lw.get("slot", "")) + if not slot or slot in slot_to_create_resource: + continue # 跳过空 slot 或已处理的 slot + + lw_name = lw.get("name", f"slot {slot}") + lw_type = lw.get("type", CREATE_RESOURCE_DEFAULTS["class_name"]) + res_id = f"plate_slot_{slot}" res_index += 1 + node_id = str(uuid.uuid4()) G.add_node( node_id, template_name="create_resource", resource_name="host_node", - name=f"Plate {res_index}", - description=f"Create plate on slot {slot}", + name=lw_name, + description=f"Create {lw_name}", lab_node_type="Labware", footer="create_resource-host_node", device_name=DEVICE_NAME_HOST, type=NODE_TYPE_DEFAULT, - parent_uuid=group_node_id, # 指向 Group 节点 - minimized=True, # 折叠显示 + parent_uuid=group_node_id, + minimized=True, param={ "res_id": res_id, "device_id": CREATE_RESOURCE_DEFAULTS["device_id"], - "class_name": CREATE_RESOURCE_DEFAULTS["class_name"], + "class_name": lw_type, "parent": CREATE_RESOURCE_DEFAULTS["parent_template"].format(slot=slot), "bind_locations": {"x": 0.0, "y": 0.0, "z": 0.0}, "slot_on_deck": slot, @@ -431,11 +431,6 @@ def build_protocol_graph( ) slot_to_create_resource[slot] = node_id - # create_resource 之间通过 ready 串联 - if last_create_resource_id is not None: - G.add_edge(last_create_resource_id, node_id, source_port="ready", target_port="ready") - last_create_resource_id = node_id - # ==================== 第二步:为每个 reagent 创建 set_liquid_from_plate 节点 ==================== # 创建 Group 节点,包含所有 set_liquid_from_plate 节点 set_liquid_group_id = str(uuid.uuid4()) @@ -453,7 +448,6 @@ def build_protocol_graph( ) set_liquid_index = 0 - last_set_liquid_id = last_create_resource_id # set_liquid_from_plate 连接在 create_resource 之后 for labware_id, item in labware_info.items(): # 跳过 Tip/Rack 类型 @@ -494,10 +488,7 @@ def build_protocol_graph( }, ) - # ready 连接:上一个节点 -> set_liquid_from_plate - if last_set_liquid_id is not None: - G.add_edge(last_set_liquid_id, node_id, source_port="ready", target_port="ready") - last_set_liquid_id = node_id + # set_liquid_from_plate 之间不需要 ready 连接 # 物料流:create_resource 的 labware -> set_liquid_from_plate 的 input_plate create_res_node_id = slot_to_create_resource.get(slot) @@ -507,7 +498,8 @@ def build_protocol_graph( # set_liquid_from_plate 的输出 output_wells 用于连接 transfer_liquid resource_last_writer[labware_id] = f"{node_id}:output_wells" - last_control_node_id = last_set_liquid_id + # transfer_liquid 之间通过 ready 串联,从 None 开始 + last_control_node_id = None # 端口名称映射:JSON 字段名 -> 实际 handle key INPUT_PORT_MAPPING = { diff --git a/unilabos/workflow/convert_from_json.py b/unilabos/workflow/convert_from_json.py index ff749d72..acd0d71a 100644 --- a/unilabos/workflow/convert_from_json.py +++ b/unilabos/workflow/convert_from_json.py @@ -1,16 +1,20 @@ """ JSON 工作流转换模块 -将 workflow/reagent 格式的 JSON 转换为统一工作流格式。 +将 workflow/reagent/labware 格式的 JSON 转换为统一工作流格式。 输入格式: { + "labware": [ + {"name": "...", "slot": "1", "type": "lab_xxx"}, + ... + ], "workflow": [ {"action": "...", "action_args": {...}}, ... ], "reagent": { - "reagent_name": {"slot": int, "well": [...], "labware": "..."}, + "reagent_name": {"slot": int, "well": [...]}, ... } } @@ -245,18 +249,18 @@ def convert_from_json( if "workflow" not in json_data or "reagent" not in json_data: raise ValueError( "不支持的 JSON 格式。请使用标准格式:\n" - '{"workflow": [{"action": "...", "action_args": {...}}, ...], ' - '"reagent": {"name": {"slot": int, "well": [...], "labware": "..."}, ...}}' + '{"labware": [...], "workflow": [...], "reagent": {...}}' ) # 提取数据 workflow = json_data["workflow"] reagent = json_data["reagent"] + labware_defs = json_data.get("labware", []) # 新的 labware 定义列表 # 规范化步骤数据 protocol_steps = normalize_workflow_steps(workflow) - # reagent 已经是字典格式,直接使用 + # reagent 已经是字典格式,用于 set_liquid 和 well 数量查找 labware_info = reagent # 构建工作流图 @@ -265,6 +269,7 @@ def convert_from_json( protocol_steps=protocol_steps, workstation_name=workstation_name, action_resource_mapping=ACTION_RESOURCE_MAPPING, + labware_defs=labware_defs, ) # 校验句柄配置 diff --git a/unilabos/workflow/wf_utils.py b/unilabos/workflow/wf_utils.py index 46451281..6332f1d5 100644 --- a/unilabos/workflow/wf_utils.py +++ b/unilabos/workflow/wf_utils.py @@ -41,6 +41,7 @@ def upload_workflow( workflow_name: Optional[str] = None, tags: Optional[List[str]] = None, published: bool = False, + description: str = "", ) -> Dict[str, Any]: """ 上传工作流到服务器 @@ -56,6 +57,7 @@ def upload_workflow( workflow_name: 工作流名称,如果不提供则从文件中读取或使用文件名 tags: 工作流标签列表,默认为空列表 published: 是否发布工作流,默认为False + description: 工作流描述,发布时使用 Returns: Dict: API响应数据 @@ -75,6 +77,14 @@ def upload_workflow( print_status(f"工作流文件JSON解析失败: {e}", "error") return {"code": -1, "message": f"JSON解析失败: {e}"} + # 从 JSON 文件中提取 description 和 tags(作为 fallback) + if not description and "description" in workflow_data: + description = workflow_data["description"] + print_status(f"从文件中读取 description", "info") + if not tags and "tags" in workflow_data: + tags = workflow_data["tags"] + print_status(f"从文件中读取 tags: {tags}", "info") + # 自动检测并转换格式 if not _is_node_link_format(workflow_data): try: @@ -96,6 +106,7 @@ def upload_workflow( print_status(f" - 节点数量: {len(nodes)}", "info") print_status(f" - 边数量: {len(edges)}", "info") print_status(f" - 标签: {tags or []}", "info") + print_status(f" - 描述: {description[:50]}{'...' if len(description) > 50 else ''}", "info") print_status(f" - 发布状态: {published}", "info") # 调用 http_client 上传 @@ -107,6 +118,7 @@ def upload_workflow( edges=edges, tags=tags, published=published, + description=description, ) if result.get("code") == 0: @@ -131,8 +143,9 @@ def handle_workflow_upload_command(args_dict: Dict[str, Any]) -> None: workflow_name = args_dict.get("workflow_name") tags = args_dict.get("tags", []) published = args_dict.get("published", False) + description = args_dict.get("description", "") if workflow_file: - upload_workflow(workflow_file, workflow_name, tags, published) + upload_workflow(workflow_file, workflow_name, tags, published, description) else: print_status("未指定工作流文件路径,请使用 -f/--workflow_file 参数", "error") diff --git a/unilabos_msgs/package.xml b/unilabos_msgs/package.xml index 68ad1328..6957f7bf 100644 --- a/unilabos_msgs/package.xml +++ b/unilabos_msgs/package.xml @@ -2,7 +2,7 @@ unilabos_msgs - 0.10.17 + 0.10.18 ROS2 Messages package for unilabos devices Junhan Chang Xuwznln