From 16ee3de086a9d76f6b53090f925fc7d580370cb7 Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Mon, 8 Dec 2025 19:12:05 +0800 Subject: [PATCH] Add workflow upload func. --- {unilabos/test => test}/resources/__init__.py | 0 .../bioyond_materials_liquidhandling_1.json | 0 .../bioyond_materials_liquidhandling_2.json | 0 .../resources/bioyond_materials_reaction.json | 0 .../resources/test_bottle_carrier.py | 0 .../resources/test_converter_bioyond.py | 0 .../resources/test_itemized_carrier.py | 0 .../resources/test_resourcetreeset.py | 0 {unilabos/test => test}/ros/__init__.py | 0 {unilabos/test => test}/ros/msgs/__init__.py | 0 .../test => test}/ros/msgs/test_basic.py | 0 .../test => test}/ros/msgs/test_conversion.py | 0 .../test => test}/ros/msgs/test_mapping.py | 0 .../test => test}/ros/msgs/test_runner.py | 0 {unilabos/test => test}/workflow/__init__.py | 0 .../test => test}/workflow/example_bio.json | 0 .../workflow/example_bio_graph.png | Bin .../test => test}/workflow/example_prcxi.json | 0 .../workflow/example_prcxi_graph.png | Bin .../example_prcxi_graph_20251022_1359.png | Bin test/workflow/merge_workflow.py | 35 ++ unilabos/app/main.py | 100 +++-- unilabos/app/web/client.py | 64 +++- unilabos/config/config.py | 6 +- unilabos/registry/devices/liquid_handler.yaml | 29 +- unilabos/ros/nodes/resource_tracker.py | 4 +- unilabos/test/workflow/merge_workflow.py | 94 ----- unilabos/workflow/__init__.py | 0 unilabos/workflow/common.py | 183 ++++++--- unilabos/workflow/convert_from_json.py | 356 ++++++++++++++++++ unilabos/workflow/from_labwares_and_steps.py | 24 -- unilabos/workflow/wf_utils.py | 138 +++++++ 32 files changed, 811 insertions(+), 222 deletions(-) rename {unilabos/test => test}/resources/__init__.py (100%) rename {unilabos/test => test}/resources/bioyond_materials_liquidhandling_1.json (100%) rename {unilabos/test => test}/resources/bioyond_materials_liquidhandling_2.json (100%) rename {unilabos/test => test}/resources/bioyond_materials_reaction.json (100%) rename {unilabos/test => test}/resources/test_bottle_carrier.py (100%) rename {unilabos/test => test}/resources/test_converter_bioyond.py (100%) rename {unilabos/test => test}/resources/test_itemized_carrier.py (100%) rename {unilabos/test => test}/resources/test_resourcetreeset.py (100%) rename {unilabos/test => test}/ros/__init__.py (100%) rename {unilabos/test => test}/ros/msgs/__init__.py (100%) rename {unilabos/test => test}/ros/msgs/test_basic.py (100%) rename {unilabos/test => test}/ros/msgs/test_conversion.py (100%) rename {unilabos/test => test}/ros/msgs/test_mapping.py (100%) rename {unilabos/test => test}/ros/msgs/test_runner.py (100%) rename {unilabos/test => test}/workflow/__init__.py (100%) rename {unilabos/test => test}/workflow/example_bio.json (100%) rename {unilabos/test => test}/workflow/example_bio_graph.png (100%) rename {unilabos/test => test}/workflow/example_prcxi.json (100%) rename {unilabos/test => test}/workflow/example_prcxi_graph.png (100%) rename {unilabos/test => test}/workflow/example_prcxi_graph_20251022_1359.png (100%) create mode 100644 test/workflow/merge_workflow.py delete mode 100644 unilabos/test/workflow/merge_workflow.py create mode 100644 unilabos/workflow/__init__.py create mode 100644 unilabos/workflow/convert_from_json.py delete mode 100644 unilabos/workflow/from_labwares_and_steps.py create mode 100644 unilabos/workflow/wf_utils.py diff --git a/unilabos/test/resources/__init__.py b/test/resources/__init__.py similarity index 100% rename from unilabos/test/resources/__init__.py rename to test/resources/__init__.py diff --git a/unilabos/test/resources/bioyond_materials_liquidhandling_1.json b/test/resources/bioyond_materials_liquidhandling_1.json similarity index 100% rename from unilabos/test/resources/bioyond_materials_liquidhandling_1.json rename to test/resources/bioyond_materials_liquidhandling_1.json diff --git a/unilabos/test/resources/bioyond_materials_liquidhandling_2.json b/test/resources/bioyond_materials_liquidhandling_2.json similarity index 100% rename from unilabos/test/resources/bioyond_materials_liquidhandling_2.json rename to test/resources/bioyond_materials_liquidhandling_2.json diff --git a/unilabos/test/resources/bioyond_materials_reaction.json b/test/resources/bioyond_materials_reaction.json similarity index 100% rename from unilabos/test/resources/bioyond_materials_reaction.json rename to test/resources/bioyond_materials_reaction.json diff --git a/unilabos/test/resources/test_bottle_carrier.py b/test/resources/test_bottle_carrier.py similarity index 100% rename from unilabos/test/resources/test_bottle_carrier.py rename to test/resources/test_bottle_carrier.py diff --git a/unilabos/test/resources/test_converter_bioyond.py b/test/resources/test_converter_bioyond.py similarity index 100% rename from unilabos/test/resources/test_converter_bioyond.py rename to test/resources/test_converter_bioyond.py diff --git a/unilabos/test/resources/test_itemized_carrier.py b/test/resources/test_itemized_carrier.py similarity index 100% rename from unilabos/test/resources/test_itemized_carrier.py rename to test/resources/test_itemized_carrier.py diff --git a/unilabos/test/resources/test_resourcetreeset.py b/test/resources/test_resourcetreeset.py similarity index 100% rename from unilabos/test/resources/test_resourcetreeset.py rename to test/resources/test_resourcetreeset.py diff --git a/unilabos/test/ros/__init__.py b/test/ros/__init__.py similarity index 100% rename from unilabos/test/ros/__init__.py rename to test/ros/__init__.py diff --git a/unilabos/test/ros/msgs/__init__.py b/test/ros/msgs/__init__.py similarity index 100% rename from unilabos/test/ros/msgs/__init__.py rename to test/ros/msgs/__init__.py diff --git a/unilabos/test/ros/msgs/test_basic.py b/test/ros/msgs/test_basic.py similarity index 100% rename from unilabos/test/ros/msgs/test_basic.py rename to test/ros/msgs/test_basic.py diff --git a/unilabos/test/ros/msgs/test_conversion.py b/test/ros/msgs/test_conversion.py similarity index 100% rename from unilabos/test/ros/msgs/test_conversion.py rename to test/ros/msgs/test_conversion.py diff --git a/unilabos/test/ros/msgs/test_mapping.py b/test/ros/msgs/test_mapping.py similarity index 100% rename from unilabos/test/ros/msgs/test_mapping.py rename to test/ros/msgs/test_mapping.py diff --git a/unilabos/test/ros/msgs/test_runner.py b/test/ros/msgs/test_runner.py similarity index 100% rename from unilabos/test/ros/msgs/test_runner.py rename to test/ros/msgs/test_runner.py diff --git a/unilabos/test/workflow/__init__.py b/test/workflow/__init__.py similarity index 100% rename from unilabos/test/workflow/__init__.py rename to test/workflow/__init__.py diff --git a/unilabos/test/workflow/example_bio.json b/test/workflow/example_bio.json similarity index 100% rename from unilabos/test/workflow/example_bio.json rename to test/workflow/example_bio.json diff --git a/unilabos/test/workflow/example_bio_graph.png b/test/workflow/example_bio_graph.png similarity index 100% rename from unilabos/test/workflow/example_bio_graph.png rename to test/workflow/example_bio_graph.png diff --git a/unilabos/test/workflow/example_prcxi.json b/test/workflow/example_prcxi.json similarity index 100% rename from unilabos/test/workflow/example_prcxi.json rename to test/workflow/example_prcxi.json diff --git a/unilabos/test/workflow/example_prcxi_graph.png b/test/workflow/example_prcxi_graph.png similarity index 100% rename from unilabos/test/workflow/example_prcxi_graph.png rename to test/workflow/example_prcxi_graph.png diff --git a/unilabos/test/workflow/example_prcxi_graph_20251022_1359.png b/test/workflow/example_prcxi_graph_20251022_1359.png similarity index 100% rename from unilabos/test/workflow/example_prcxi_graph_20251022_1359.png rename to test/workflow/example_prcxi_graph_20251022_1359.png diff --git a/test/workflow/merge_workflow.py b/test/workflow/merge_workflow.py new file mode 100644 index 00000000..2801a747 --- /dev/null +++ b/test/workflow/merge_workflow.py @@ -0,0 +1,35 @@ +import sys +from datetime import datetime +from pathlib import Path + +ROOT_DIR = Path(__file__).resolve().parents[2] +if str(ROOT_DIR) not in sys.path: + sys.path.insert(0, str(ROOT_DIR)) + +import pytest + +from unilabos.workflow.convert_from_json import ( + convert_from_json, + normalize_steps as _normalize_steps, + normalize_labware as _normalize_labware, +) +from unilabos.workflow.common import draw_protocol_graph_with_ports + + +@pytest.mark.parametrize( + "protocol_name", + [ + "example_bio", + # "bioyond_materials_liquidhandling_1", + "example_prcxi", + ], +) +def test_build_protocol_graph(protocol_name): + data_path = Path(__file__).with_name(f"{protocol_name}.json") + + graph = convert_from_json(data_path, workstation_name="PRCXi") + + timestamp = datetime.now().strftime("%Y%m%d_%H%M") + output_path = data_path.with_name(f"{protocol_name}_graph_{timestamp}.png") + draw_protocol_graph_with_ports(graph, str(output_path)) + print(graph) diff --git a/unilabos/app/main.py b/unilabos/app/main.py index 08595de5..ffef4a52 100644 --- a/unilabos/app/main.py +++ b/unilabos/app/main.py @@ -20,6 +20,7 @@ if unilabos_dir not in sys.path: from unilabos.utils.banner_print import print_status, print_unilab_banner from unilabos.config.config import load_config, BasicConfig, HTTPConfig + def load_config_from_file(config_path): if config_path is None: config_path = os.environ.get("UNILABOS_BASICCONFIG_CONFIG_PATH", None) @@ -41,7 +42,7 @@ def convert_argv_dashes_to_underscores(args: argparse.ArgumentParser): for i, arg in enumerate(sys.argv): for option_string in option_strings: if arg.startswith(option_string): - new_arg = arg[:2] + arg[2:len(option_string)].replace("-", "_") + arg[len(option_string):] + new_arg = arg[:2] + arg[2 : len(option_string)].replace("-", "_") + arg[len(option_string) :] sys.argv[i] = new_arg break @@ -155,14 +156,39 @@ def parse_args(): default=False, help="Complete registry information", ) - - # label + # workflow upload subcommand workflow_parser = subparsers.add_parser( "workflow_upload", + aliases=["wf"], help="Upload workflow from xdl/json/python files", ) - workflow_parser.add_argument("-t", "--labeltype", default="singlepoint", type=str, - help="QM calculation type, support 'singlepoint', 'optimize' and 'dimer' currently") + workflow_parser.add_argument( + "-f", + "--workflow_file", + type=str, + required=True, + help="Path to the workflow file (JSON format)", + ) + workflow_parser.add_argument( + "-n", + "--workflow_name", + type=str, + default=None, + help="Workflow name, if not provided will use the name from file or filename", + ) + workflow_parser.add_argument( + "--tags", + type=str, + nargs="*", + default=[], + help="Tags for the workflow (space-separated)", + ) + workflow_parser.add_argument( + "--published", + action="store_true", + default=False, + help="Whether to publish the workflow (default: False)", + ) return parser @@ -173,9 +199,6 @@ def main(): convert_argv_dashes_to_underscores(args) args_dict = vars(args.parse_args()) - # 显示启动横幅 - print_unilab_banner(args_dict) - # 环境检查 - 检查并自动安装必需的包 (可选) if not args_dict.get("skip_env_check", False): from unilabos.utils.environment_check import check_environment @@ -254,18 +277,10 @@ def main(): print_status("传入了sk参数,优先采用传入参数!", "info") BasicConfig.working_dir = working_dir - # 显示启动横幅 - print_unilab_banner(args_dict) + workflow_upload = args_dict.get("command") in ("workflow_upload", "wf") - ##################################### - ######## 启动设备接入端(主入口) ######## - ##################################### - launch(args_dict) - - -def launch(args_dict: Dict[str, Any]): # 使用远程资源启动 - if args_dict["use_remote_resource"]: + if not workflow_upload and args_dict["use_remote_resource"]: print_status("使用远程资源启动", "info") from unilabos.app.web import http_client @@ -301,11 +316,36 @@ def launch(args_dict: Dict[str, Any]): from unilabos.resources.graphio import modify_to_backend_format from unilabos.ros.nodes.resource_tracker import ResourceTreeSet, ResourceDict + # 显示启动横幅 + print_unilab_banner(args_dict) + # 注册表 lab_registry = build_registry( - args_dict["registry_path"], args_dict.get("complete_registry", False), args_dict["upload_registry"] + args_dict["registry_path"], args_dict.get("complete_registry", False), BasicConfig.upload_registry ) + if BasicConfig.upload_registry: + # 设备注册到服务端 - 需要 ak 和 sk + if BasicConfig.ak and BasicConfig.sk: + print_status("开始注册设备到服务端...", "info") + try: + register_devices_and_resources(lab_registry) + print_status("设备注册完成", "info") + except Exception as e: + print_status(f"设备注册失败: {e}", "error") + else: + print_status("未提供 ak 和 sk,跳过设备注册", "info") + else: + print_status("本次启动注册表不报送云端,如果您需要联网调试,请在启动命令增加--upload_registry", "warning") + + # 处理 workflow_upload 子命令 + if workflow_upload: + from unilabos.workflow.wf_utils import handle_workflow_upload_command + + handle_workflow_upload_command(args_dict) + print_status("工作流上传完成,程序退出", "info") + os._exit(0) + if not BasicConfig.ak or not BasicConfig.sk: print_status("后续运行必须拥有一个实验室,请前往 https://uni-lab.bohrium.com 注册实验室!", "warning") os._exit(1) @@ -382,20 +422,6 @@ def launch(args_dict: Dict[str, Any]): args_dict["devices_config"] = resource_tree_set args_dict["graph"] = graph_res.physical_setup_graph - if BasicConfig.upload_registry: - # 设备注册到服务端 - 需要 ak 和 sk - if BasicConfig.ak and BasicConfig.sk: - print_status("开始注册设备到服务端...", "info") - try: - register_devices_and_resources(lab_registry) - print_status("设备注册完成", "info") - except Exception as e: - print_status(f"设备注册失败: {e}", "error") - else: - print_status("未提供 ak 和 sk,跳过设备注册", "info") - else: - print_status("本次启动注册表不报送云端,如果您需要联网调试,请在启动命令增加--upload_registry", "warning") - if args_dict["controllers"] is not None: args_dict["controllers_config"] = yaml.safe_load(open(args_dict["controllers"], encoding="utf-8")) else: @@ -410,6 +436,7 @@ def launch(args_dict: Dict[str, Any]): comm_client = get_communication_client() if "websocket" in args_dict["app_bridges"]: args_dict["bridges"].append(comm_client) + def _exit(signum, frame): comm_client.stop() sys.exit(0) @@ -451,16 +478,13 @@ def launch(args_dict: Dict[str, Any]): resource_visualization.start() except OSError as e: if "AMENT_PREFIX_PATH" in str(e): - print_status( - f"ROS 2环境未正确设置,跳过3D可视化启动。错误详情: {e}", - "warning" - ) + print_status(f"ROS 2环境未正确设置,跳过3D可视化启动。错误详情: {e}", "warning") print_status( "建议解决方案:\n" "1. 激活Conda环境: conda activate unilab\n" "2. 或使用 --backend simple 参数\n" "3. 或使用 --visual disable 参数禁用可视化", - "info" + "info", ) else: raise diff --git a/unilabos/app/web/client.py b/unilabos/app/web/client.py index 72c079a1..1f40a0b8 100644 --- a/unilabos/app/web/client.py +++ b/unilabos/app/web/client.py @@ -76,7 +76,8 @@ class HTTPClient: Dict[str, str]: 旧UUID到新UUID的映射关系 {old_uuid: new_uuid} """ with open(os.path.join(BasicConfig.working_dir, "req_resource_tree_add.json"), "w", encoding="utf-8") as f: - f.write(json.dumps({"nodes": [x for xs in resources.dump() for x in xs], "mount_uuid": mount_uuid}, indent=4)) + payload = {"nodes": [x for xs in resources.dump() for x in xs], "mount_uuid": mount_uuid} + f.write(json.dumps(payload, indent=4)) # 从序列化数据中提取所有节点的UUID(保存旧UUID) old_uuids = {n.res_content.uuid: n for n in resources.all_nodes} if not self.initialized or first_add: @@ -331,6 +332,67 @@ class HTTPClient: logger.error(f"响应内容: {response.text}") return None + def workflow_import( + self, + name: str, + workflow_uuid: str, + workflow_name: str, + nodes: List[Dict[str, Any]], + edges: List[Dict[str, Any]], + tags: Optional[List[str]] = None, + published: bool = False, + ) -> Dict[str, Any]: + """ + 导入工作流到服务器 + + Args: + name: 工作流名称(顶层) + workflow_uuid: 工作流UUID + workflow_name: 工作流名称(data内部) + nodes: 工作流节点列表 + edges: 工作流边列表 + tags: 工作流标签列表,默认为空列表 + published: 是否发布工作流,默认为False + + Returns: + Dict: API响应数据,包含 code 和 data (uuid, name) + """ + # target_lab_uuid 暂时使用默认值,后续由后端根据 ak/sk 获取 + payload = { + "target_lab_uuid": "28c38bb0-63f6-4352-b0d8-b5b8eb1766d5", + "name": name, + "data": { + "workflow_uuid": workflow_uuid, + "workflow_name": workflow_name, + "nodes": nodes, + "edges": edges, + "tags": tags if tags is not None else [], + "published": published, + }, + } + # 保存请求到文件 + with open(os.path.join(BasicConfig.working_dir, "req_workflow_upload.json"), "w", encoding="utf-8") as f: + f.write(json.dumps(payload, indent=4, ensure_ascii=False)) + + response = requests.post( + f"{self.remote_addr}/lab/workflow/owner/import", + json=payload, + headers={"Authorization": f"Lab {self.auth}"}, + timeout=60, + ) + # 保存响应到文件 + with open(os.path.join(BasicConfig.working_dir, "res_workflow_upload.json"), "w", encoding="utf-8") as f: + f.write(f"{response.status_code}" + "\n" + response.text) + + if response.status_code == 200: + res = response.json() + if "code" in res and res["code"] != 0: + logger.error(f"导入工作流失败: {response.text}") + return res + else: + logger.error(f"导入工作流失败: {response.status_code}, {response.text}") + return {"code": response.status_code, "message": response.text} + # 创建默认客户端实例 http_client = HTTPClient() diff --git a/unilabos/config/config.py b/unilabos/config/config.py index c13064ef..223d12c6 100644 --- a/unilabos/config/config.py +++ b/unilabos/config/config.py @@ -21,7 +21,8 @@ class BasicConfig: startup_json_path = None # 填写绝对路径 disable_browser = False # 禁止浏览器自动打开 port = 8002 # 本地HTTP服务 - log_level: Literal['TRACE', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] = "DEBUG" # 'TRACE', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL' + # 'TRACE', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL' + log_level: Literal["TRACE", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "DEBUG" @classmethod def auth_secret(cls): @@ -65,13 +66,14 @@ def _update_config_from_module(module): if not attr.startswith("_"): setattr(obj, attr, getattr(getattr(module, name), attr)) + def _update_config_from_env(): prefix = "UNILABOS_" for env_key, env_value in os.environ.items(): if not env_key.startswith(prefix): continue try: - key_path = env_key[len(prefix):] # Remove UNILAB_ prefix + key_path = env_key[len(prefix) :] # Remove UNILAB_ prefix class_field = key_path.upper().split("_", 1) if len(class_field) != 2: logger.warning(f"[ENV] 环境变量格式不正确:{env_key}") diff --git a/unilabos/registry/devices/liquid_handler.yaml b/unilabos/registry/devices/liquid_handler.yaml index d38c43a3..fdfb6b5c 100644 --- a/unilabos/registry/devices/liquid_handler.yaml +++ b/unilabos/registry/devices/liquid_handler.yaml @@ -9333,7 +9333,34 @@ liquid_handler.prcxi: touch_tip: false use_channels: - 0 - handles: {} + handles: + input: + - data_key: liquid + data_source: handle + data_type: resource + handler_key: sources + label: sources + - data_key: liquid + data_source: executor + data_type: resource + handler_key: targets + label: targets + - data_key: liquid + data_source: executor + data_type: resource + handler_key: tip_rack + label: tip_rack + output: + - data_key: liquid + data_source: handle + data_type: resource + handler_key: sources_out + label: sources + - data_key: liquid + data_source: executor + data_type: resource + handler_key: targets_out + label: targets placeholder_keys: sources: unilabos_resources targets: unilabos_resources diff --git a/unilabos/ros/nodes/resource_tracker.py b/unilabos/ros/nodes/resource_tracker.py index 0eed1172..849d64a8 100644 --- a/unilabos/ros/nodes/resource_tracker.py +++ b/unilabos/ros/nodes/resource_tracker.py @@ -66,8 +66,8 @@ class ResourceDict(BaseModel): klass: str = Field(alias="class", description="Resource class name") pose: ResourceDictPosition = Field(description="Resource position", default_factory=ResourceDictPosition) config: Dict[str, Any] = Field(description="Resource configuration") - data: Dict[str, Any] = Field(description="Resource data") - extra: Dict[str, Any] = Field(description="Extra data") + data: Dict[str, Any] = Field(description="Resource data, eg: container liquid data") + extra: Dict[str, Any] = Field(description="Extra data, eg: slot index") @field_serializer("parent_uuid") def _serialize_parent(self, parent_uuid: Optional["ResourceDict"]): diff --git a/unilabos/test/workflow/merge_workflow.py b/unilabos/test/workflow/merge_workflow.py deleted file mode 100644 index 3d3c6586..00000000 --- a/unilabos/test/workflow/merge_workflow.py +++ /dev/null @@ -1,94 +0,0 @@ -import json -import sys -from datetime import datetime -from pathlib import Path - -ROOT_DIR = Path(__file__).resolve().parents[2] -if str(ROOT_DIR) not in sys.path: - sys.path.insert(0, str(ROOT_DIR)) - -import pytest - -from unilabos.workflow.common import build_protocol_graph, draw_protocol_graph, draw_protocol_graph_with_ports - - -ROOT_DIR = Path(__file__).resolve().parents[2] -if str(ROOT_DIR) not in sys.path: - sys.path.insert(0, str(ROOT_DIR)) - - -def _normalize_steps(data): - normalized = [] - for step in data: - action = step.get("action") or step.get("operation") - if not action: - continue - raw_params = step.get("parameters") or step.get("action_args") or {} - params = dict(raw_params) - - if "source" in raw_params and "sources" not in raw_params: - params["sources"] = raw_params["source"] - if "target" in raw_params and "targets" not in raw_params: - params["targets"] = raw_params["target"] - - description = step.get("description") or step.get("purpose") - step_dict = {"action": action, "parameters": params} - if description: - step_dict["description"] = description - normalized.append(step_dict) - return normalized - - -def _normalize_labware(data): - labware = {} - for item in data: - reagent_name = item.get("reagent_name") - key = reagent_name or item.get("material_name") or item.get("name") - if not key: - continue - key = str(key) - idx = 1 - original_key = key - while key in labware: - idx += 1 - key = f"{original_key}_{idx}" - - labware[key] = { - "slot": item.get("positions") or item.get("slot"), - "labware": item.get("material_name") or item.get("labware"), - "well": item.get("well", []), - "type": item.get("type", "reagent"), - "role": item.get("role", ""), - "name": key, - } - return labware - - -@pytest.mark.parametrize("protocol_name", [ - "example_bio", - # "bioyond_materials_liquidhandling_1", - "example_prcxi", -]) -def test_build_protocol_graph(protocol_name): - data_path = Path(__file__).with_name(f"{protocol_name}.json") - with data_path.open("r", encoding="utf-8") as fp: - d = json.load(fp) - - if "workflow" in d and "reagent" in d: - protocol_steps = d["workflow"] - labware_info = d["reagent"] - elif "steps_info" in d and "labware_info" in d: - protocol_steps = _normalize_steps(d["steps_info"]) - labware_info = _normalize_labware(d["labware_info"]) - else: - raise ValueError("Unsupported protocol format") - - graph = build_protocol_graph( - labware_info=labware_info, - protocol_steps=protocol_steps, - workstation_name="PRCXi", - ) - timestamp = datetime.now().strftime("%Y%m%d_%H%M") - output_path = data_path.with_name(f"{protocol_name}_graph_{timestamp}.png") - draw_protocol_graph_with_ports(graph, str(output_path)) - print(graph) \ No newline at end of file diff --git a/unilabos/workflow/__init__.py b/unilabos/workflow/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/unilabos/workflow/common.py b/unilabos/workflow/common.py index b884a706..9bff0494 100644 --- a/unilabos/workflow/common.py +++ b/unilabos/workflow/common.py @@ -10,6 +10,7 @@ Json = Dict[str, Any] # ---------------- Graph ---------------- + class WorkflowGraph: """简单的有向图实现:使用 params 单层参数;inputs 内含连线;支持 node-link 导出""" @@ -21,20 +22,31 @@ class WorkflowGraph: self.nodes[node_id] = attrs def add_edge(self, source: str, target: str, **attrs): + # 将 source_port/target_port 映射为服务端期望的 source_handle_key/target_handle_key + source_handle_key = attrs.pop("source_port", "") or attrs.pop("source_handle_key", "") + target_handle_key = attrs.pop("target_port", "") or attrs.pop("target_handle_key", "") + edge = { "source": source, "target": target, "source_node_uuid": source, "target_node_uuid": target, + "source_handle_key": source_handle_key, "source_handle_io": attrs.pop("source_handle_io", "source"), + "target_handle_key": target_handle_key, "target_handle_io": attrs.pop("target_handle_io", "target"), - **attrs + **attrs, } self.edges.append(edge) - def _materialize_wiring_into_inputs(self, obj: Any, inputs: Dict[str, Any], - variable_sources: Dict[str, Dict[str, Any]], - target_node_id: str, base_path: List[str]): + def _materialize_wiring_into_inputs( + self, + obj: Any, + inputs: Dict[str, Any], + variable_sources: Dict[str, Dict[str, Any]], + target_node_id: str, + base_path: List[str], + ): has_var = False def walk(node: Any, path: List[str]): @@ -48,9 +60,12 @@ class WorkflowGraph: if src: key = ".".join(path) # e.g. "params.foo.bar.0" inputs[key] = {"node": src["node_id"], "output": src.get("output_name", "result")} - self.add_edge(str(src["node_id"]), target_node_id, - source_handle_io=src.get("output_name", "result"), - target_handle_io=key) + self.add_edge( + str(src["node_id"]), + target_node_id, + source_handle_io=src.get("output_name", "result"), + target_handle_io=key, + ) return placeholder return {k: walk(v, path + [k]) for k, v in node.items()} if isinstance(node, list): @@ -60,18 +75,20 @@ class WorkflowGraph: replaced = walk(obj, base_path[:]) return replaced, has_var - def add_workflow_node(self, - node_id: int, - *, - device_key: Optional[str] = None, # 实例名,如 "ser" - resource_name: Optional[str] = None, # registry key(原 device_class) - module: Optional[str] = None, - template_name: Optional[str] = None, # 动作/模板名(原 action_key) - params: Dict[str, Any], - variable_sources: Dict[str, Dict[str, Any]], - add_ready_if_no_vars: bool = True, - prev_node_id: Optional[int] = None, - **extra_attrs) -> None: + def add_workflow_node( + self, + node_id: int, + *, + device_key: Optional[str] = None, # 实例名,如 "ser" + resource_name: Optional[str] = None, # registry key(原 device_class) + module: Optional[str] = None, + template_name: Optional[str] = None, # 动作/模板名(原 action_key) + params: Dict[str, Any], + variable_sources: Dict[str, Dict[str, Any]], + add_ready_if_no_vars: bool = True, + prev_node_id: Optional[int] = None, + **extra_attrs, + ) -> None: """添加工作流节点:params 单层;自动变量连线与 ready 串联;支持附加属性""" node_id_str = str(node_id) inputs: Dict[str, Any] = {} @@ -87,9 +104,9 @@ class WorkflowGraph: node_obj = { "device_key": device_key, - "resource_name": resource_name, # ✅ 新名字 + "resource_name": resource_name, # ✅ 新名字 "module": module, - "template_name": template_name, # ✅ 新名字 + "template_name": template_name, # ✅ 新名字 "params": params, "inputs": inputs, } @@ -100,13 +117,13 @@ class WorkflowGraph: def to_dict(self) -> List[Dict[str, Any]]: result = [] for node_id, attrs in self.nodes.items(): - node = {"id": node_id} + node = {"uuid": node_id} params = dict(attrs.get("parameters", {}) or {}) flat = {k: v for k, v in attrs.items() if k != "parameters"} flat.update(params) node.update(flat) result.append(node) - return sorted(result, key=lambda n: int(n["id"]) if str(n["id"]).isdigit() else n["id"]) + return sorted(result, key=lambda n: int(n["uuid"]) if str(n["uuid"]).isdigit() else n["uuid"]) # node-link 导出(含 edges) def to_node_link_dict(self) -> Dict[str, Any]: @@ -115,12 +132,27 @@ class WorkflowGraph: node_attrs = attrs.copy() params = node_attrs.pop("parameters", {}) or {} node_attrs.update(params) - nodes_list.append({"id": node_id, **node_attrs}) - return {"directed": True, "multigraph": False, "graph": {}, "nodes": nodes_list, "edges": self.edges, "links": self.edges} + nodes_list.append({"uuid": node_id, **node_attrs}) + return { + "directed": True, + "multigraph": False, + "graph": {}, + "nodes": nodes_list, + "edges": self.edges, + "links": self.edges, + } -def refactor_data(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - """统一的数据重构函数,根据操作类型自动选择模板""" +def refactor_data( + data: List[Dict[str, Any]], + action_resource_mapping: Optional[Dict[str, str]] = None, +) -> List[Dict[str, Any]]: + """统一的数据重构函数,根据操作类型自动选择模板 + + Args: + data: 原始步骤数据列表 + action_resource_mapping: action 到 resource_name 的映射字典,可选 + """ refactored_data = [] # 定义操作映射,包含生物实验和有机化学的所有操作 @@ -157,43 +189,67 @@ def refactor_data(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: times = step.get("times", step.get("parameters", {}).get("times", 1)) sub_steps = step.get("steps", step.get("parameters", {}).get("steps", [])) for i in range(int(times)): - sub_data = refactor_data(sub_steps) + sub_data = refactor_data(sub_steps, action_resource_mapping) refactored_data.extend(sub_data) continue # 获取模板名称 - template = OPERATION_MAPPING.get(operation) - if not template: + template_name = OPERATION_MAPPING.get(operation) + if not template_name: # 自动推断模板类型 if operation.lower() in ["transfer", "incubation", "move_labware", "oscillation"]: - template = f"biomek-{operation}" + template_name = f"biomek-{operation}" else: - template = f"{operation}Protocol" + template_name = f"{operation}Protocol" + + # 获取 resource_name + resource_name = f"device.{operation.lower()}" + if action_resource_mapping: + resource_name = action_resource_mapping.get(operation, resource_name) + + # 获取步骤编号,生成 name 字段 + step_number = step.get("step_number") + name = f"Step {step_number}" if step_number is not None else None # 创建步骤数据 step_data = { - "template": template, + "template_name": template_name, + "resource_name": resource_name, "description": step.get("description", step.get("purpose", f"{operation} operation")), "lab_node_type": "Device", - "parameters": step.get("parameters", step.get("action_args", {})), + "param": step.get("parameters", step.get("action_args", {})), + "footer": f"{template_name}-{resource_name}", } + if name: + step_data["name"] = name refactored_data.append(step_data) return refactored_data def build_protocol_graph( - labware_info: List[Dict[str, Any]], protocol_steps: List[Dict[str, Any]], workstation_name: str + labware_info: List[Dict[str, Any]], + protocol_steps: List[Dict[str, Any]], + workstation_name: str, + action_resource_mapping: Optional[Dict[str, str]] = None, ) -> WorkflowGraph: - """统一的协议图构建函数,根据设备类型自动选择构建逻辑""" + """统一的协议图构建函数,根据设备类型自动选择构建逻辑 + + Args: + labware_info: labware 信息字典 + protocol_steps: 协议步骤列表 + workstation_name: 工作站名称 + action_resource_mapping: action 到 resource_name 的映射字典,可选 + """ G = WorkflowGraph() resource_last_writer = {} - protocol_steps = refactor_data(protocol_steps) + protocol_steps = refactor_data(protocol_steps, action_resource_mapping) # 有机化学&移液站协议图构建 WORKSTATION_ID = workstation_name # 为所有labware创建资源节点 + res_index = 0 for labware_id, item in labware_info.items(): # item_id = item.get("id") or item.get("name", f"item_{uuid.uuid4()}") node_id = str(uuid.uuid4()) @@ -217,13 +273,16 @@ def build_protocol_graph( liquid_type = [labware_id] liquid_volume = [1e5] + res_index += 1 G.add_node( node_id, - template_name=f"create_resource", + template_name="create_resource", resource_name="host_node", + name=f"Res {res_index}", description=description, lab_node_type=lab_node_type, - params={ + footer="create_resource-host_node", + param={ "res_id": labware_id, "device_id": WORKSTATION_ID, "class_name": "container", @@ -234,7 +293,6 @@ def build_protocol_graph( "liquid_volume": liquid_volume, "slot_on_deck": "", }, - role=item.get("role", ""), ) resource_last_writer[labware_id] = f"{node_id}:labware" @@ -251,7 +309,7 @@ def build_protocol_graph( last_control_node_id = node_id # 物料流 - params = step.get("parameters", {}) + params = step.get("param", {}) input_resources_possible_names = [ "vessel", "to_vessel", @@ -299,7 +357,7 @@ def draw_protocol_graph(protocol_graph: WorkflowGraph, output_path: str): G = nx.DiGraph() for node_id, attrs in protocol_graph.nodes.items(): - label = attrs.get("description", attrs.get("template", node_id[:8])) + label = attrs.get("description", attrs.get("template_name", node_id[:8])) G.add_node(node_id, label=label, **attrs) for edge in protocol_graph.edges: @@ -331,11 +389,13 @@ def draw_protocol_graph(protocol_graph: WorkflowGraph, output_path: str): print(f" - Visualization saved to '{output_path}'") -COMPASS = {"n","e","s","w","ne","nw","se","sw","c"} +COMPASS = {"n", "e", "s", "w", "ne", "nw", "se", "sw", "c"} + def _is_compass(port: str) -> bool: return isinstance(port, str) and port.lower() in COMPASS + def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: str = "LR"): """ 使用 Graphviz 端口语法绘制协议工作流图。 @@ -350,22 +410,22 @@ def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: st # 1) 先用 networkx 搭建有向图,保留端口属性 G = nx.DiGraph() for node_id, attrs in protocol_graph.nodes.items(): - label = attrs.get("description", attrs.get("template", node_id[:8])) + label = attrs.get("description", attrs.get("template_name", node_id[:8])) # 保留一个干净的“中心标签”,用于放在 record 的中间槽 - G.add_node(node_id, _core_label=str(label), **{k:v for k,v in attrs.items() if k not in ("label",)}) + G.add_node(node_id, _core_label=str(label), **{k: v for k, v in attrs.items() if k not in ("label",)}) edges_data = [] - in_ports_by_node = {} # 收集命名输入端口 + in_ports_by_node = {} # 收集命名输入端口 out_ports_by_node = {} # 收集命名输出端口 for edge in protocol_graph.edges: u = edge["source"] v = edge["target"] - sp = edge.get("source_port") - tp = edge.get("target_port") + sp = edge.get("source_handle_key") or edge.get("source_port") + tp = edge.get("target_handle_key") or edge.get("target_port") # 记录到图里(保留原始端口信息) - G.add_edge(u, v, source_port=sp, target_port=tp) + G.add_edge(u, v, source_handle_key=sp, target_handle_key=tp) edges_data.append((u, v, sp, tp)) # 如果不是 compass,就按“命名端口”先归类,等会儿给节点造 record @@ -377,7 +437,9 @@ def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: st # 2) 转为 AGraph,使用 Graphviz 渲染 A = to_agraph(G) A.graph_attr.update(rankdir=rankdir, splines="true", concentrate="false", fontsize="10") - A.node_attr.update(shape="box", style="rounded,filled", fillcolor="lightyellow", color="#999999", fontname="Helvetica") + A.node_attr.update( + shape="box", style="rounded,filled", fillcolor="lightyellow", color="#999999", fontname="Helvetica" + ) A.edge_attr.update(arrowsize="0.8", color="#666666") # 3) 为需要命名端口的节点设置 record 形状与 label @@ -386,18 +448,19 @@ def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: st node = A.get_node(n) core = G.nodes[n].get("_core_label", n) - in_ports = sorted(in_ports_by_node.get(n, [])) + in_ports = sorted(in_ports_by_node.get(n, [])) out_ports = sorted(out_ports_by_node.get(n, [])) # 如果该节点涉及命名端口,则用 record;否则保留原 box if in_ports or out_ports: + def port_fields(ports): if not ports: return " " # 必须留一个空槽占位 # 每个端口一个小格子,
name
return "|".join(f"<{re.sub(r'[^A-Za-z0-9_:.|-]', '_', p)}> {p}" for p in ports)
- left = port_fields(in_ports)
+ left = port_fields(in_ports)
right = port_fields(out_ports)
# 三栏:左(入) | 中(节点名) | 右(出)
@@ -410,7 +473,7 @@ def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: st
# 4) 给边设置 headport / tailport
# - 若端口为 compass:直接用 compass(e.g., headport="e")
# - 若端口为命名端口:使用在 record 中定义的