diff --git a/.github/workflows/multi-platform-build.yml b/.github/workflows/multi-platform-build.yml index 9d323abd..aa3666f5 100644 --- a/.github/workflows/multi-platform-build.yml +++ b/.github/workflows/multi-platform-build.yml @@ -105,6 +105,7 @@ jobs: with: miniforge-version: latest use-mamba: true + python-version: '3.11.14' channels: conda-forge,robostack-staging channel-priority: strict activate-environment: build-env @@ -114,20 +115,22 @@ jobs: - name: Install rattler-build and anaconda-client if: steps.should_build.outputs.should_build == 'true' run: | - mamba install --override-channels -c conda-forge rattler-build anaconda-client -y + mamba install -n build-env --override-channels -c conda-forge rattler-build anaconda-client -y - name: Show environment info if: steps.should_build.outputs.should_build == 'true' run: | conda info - conda list | grep -E "(rattler-build|anaconda-client)" + conda list -n build-env | grep -E "(rattler-build|anaconda-client)" + conda run -n build-env rattler-build --version + conda run -n build-env anaconda --version echo "Platform: ${{ matrix.platform }}" echo "OS: ${{ matrix.os }}" - name: Build conda package if: steps.should_build.outputs.should_build == 'true' run: | - rattler-build build -r ./recipes/msgs/recipe.yaml --target-platform ${{ matrix.platform }} -c robostack -c robostack-staging -c conda-forge + conda run -n build-env rattler-build build -r ./recipes/msgs/recipe.yaml --target-platform ${{ matrix.platform }} -c robostack -c robostack-staging -c conda-forge - name: List built packages if: steps.should_build.outputs.should_build == 'true' @@ -167,5 +170,5 @@ jobs: run: | for package in $(find ./output -name "*.conda"); do echo "Uploading $package to unilab organization..." - anaconda -t ${{ secrets.ANACONDA_API_TOKEN }} upload --user uni-lab --force "$package" + conda run -n build-env anaconda -t ${{ secrets.ANACONDA_API_TOKEN }} upload --user uni-lab --force "$package" done diff --git a/.github/workflows/unilabos-conda-build.yml b/.github/workflows/unilabos-conda-build.yml index b516dbf8..cd652c99 100644 --- a/.github/workflows/unilabos-conda-build.yml +++ b/.github/workflows/unilabos-conda-build.yml @@ -98,6 +98,7 @@ jobs: with: miniforge-version: latest use-mamba: true + python-version: '3.11.14' channels: conda-forge,robostack-staging,uni-lab channel-priority: strict activate-environment: build-env @@ -107,13 +108,15 @@ jobs: - name: Install rattler-build and anaconda-client if: steps.should_build.outputs.should_build == 'true' run: | - mamba install --override-channels -c conda-forge rattler-build anaconda-client -y + mamba install -n build-env --override-channels -c conda-forge rattler-build anaconda-client -y - name: Show environment info if: steps.should_build.outputs.should_build == 'true' run: | conda info - conda list | grep -E "(rattler-build|anaconda-client)" + conda list -n build-env | grep -E "(rattler-build|anaconda-client)" + conda run -n build-env rattler-build --version + conda run -n build-env anaconda --version echo "Platform: ${{ matrix.platform }}" echo "OS: ${{ matrix.os }}" echo "Build full package: ${{ github.event_name == 'workflow_dispatch' && github.event.inputs.build_full == 'true' }}" @@ -128,7 +131,7 @@ jobs: if: steps.should_build.outputs.should_build == 'true' run: | echo "Building unilabos-env (conda environment dependencies)..." - rattler-build build -r .conda/environment/recipe.yaml --target-platform ${{ matrix.platform }} -c uni-lab -c robostack-staging -c conda-forge + conda run -n build-env rattler-build build -r .conda/environment/recipe.yaml --target-platform ${{ matrix.platform }} -c uni-lab -c robostack-staging -c conda-forge - name: Upload unilabos-env to Anaconda.org (if enabled) if: | @@ -140,7 +143,7 @@ jobs: run: | echo "Uploading unilabos-env to uni-lab organization..." for package in $(find ./output -name "unilabos-env*.conda"); do - anaconda -t ${{ secrets.ANACONDA_API_TOKEN }} upload --user uni-lab --force "$package" + conda run -n build-env anaconda -t ${{ secrets.ANACONDA_API_TOKEN }} upload --user uni-lab --force "$package" done - name: Build unilabos (with pip package) @@ -148,7 +151,7 @@ jobs: run: | echo "Building unilabos package..." # 如果已上传到 Anaconda,从 uni-lab channel 获取 unilabos-env;否则从本地 output 获取 - rattler-build build -r .conda/base/recipe.yaml --target-platform ${{ matrix.platform }} -c uni-lab -c robostack-staging -c conda-forge --channel ./output + conda run -n build-env rattler-build build -r .conda/base/recipe.yaml --target-platform ${{ matrix.platform }} -c uni-lab -c robostack-staging -c conda-forge --channel ./output - name: Upload unilabos to Anaconda.org (if enabled) if: | @@ -160,7 +163,7 @@ jobs: run: | echo "Uploading unilabos to uni-lab organization..." for package in $(find ./output -name "unilabos-0*.conda" -o -name "unilabos-[0-9]*.conda"); do - anaconda -t ${{ secrets.ANACONDA_API_TOKEN }} upload --user uni-lab --force "$package" + conda run -n build-env anaconda -t ${{ secrets.ANACONDA_API_TOKEN }} upload --user uni-lab --force "$package" done - name: Build unilabos-full - Only when explicitly requested @@ -170,7 +173,7 @@ jobs: github.event.inputs.build_full == 'true' run: | echo "Building unilabos-full package on ${{ matrix.platform }}..." - rattler-build build -r .conda/full/recipe.yaml --target-platform ${{ matrix.platform }} -c uni-lab -c robostack-staging -c conda-forge --channel ./output + conda run -n build-env rattler-build build -r .conda/full/recipe.yaml --target-platform ${{ matrix.platform }} -c uni-lab -c robostack-staging -c conda-forge --channel ./output - name: Upload unilabos-full to Anaconda.org (if enabled) if: | @@ -181,7 +184,7 @@ jobs: run: | echo "Uploading unilabos-full to uni-lab organization..." for package in $(find ./output -name "unilabos-full*.conda"); do - anaconda -t ${{ secrets.ANACONDA_API_TOKEN }} upload --user uni-lab --force "$package" + conda run -n build-env anaconda -t ${{ secrets.ANACONDA_API_TOKEN }} upload --user uni-lab --force "$package" done - name: List built packages diff --git a/scripts/workflow.py b/scripts/workflow.py index be7bbd1e..8bd89640 100644 --- a/scripts/workflow.py +++ b/scripts/workflow.py @@ -2,7 +2,6 @@ import json import logging import traceback import uuid -import xml.etree.ElementTree as ET from typing import Any, Dict, List import networkx as nx @@ -25,7 +24,15 @@ class SimpleGraph: def add_edge(self, source, target, **attrs): """添加边""" - edge = {"source": source, "target": target, **attrs} + # edge = {"source": source, "target": target, **attrs} + edge = { + "source": source, "target": target, + "source_node_uuid": source, + "target_node_uuid": target, + "source_handle_io": "source", + "target_handle_io": "target", + **attrs + } self.edges.append(edge) def to_dict(self): @@ -42,6 +49,7 @@ class SimpleGraph: "multigraph": False, "graph": {}, "nodes": nodes_list, + "edges": self.edges, "links": self.edges, } @@ -58,495 +66,8 @@ def extract_json_from_markdown(text: str) -> str: return text -def convert_to_type(val: str) -> Any: - """将字符串值转换为适当的数据类型""" - if val == "True": - return True - if val == "False": - return False - if val == "?": - return None - if val.endswith(" g"): - return float(val.split(" ")[0]) - if val.endswith("mg"): - return float(val.split("mg")[0]) - elif val.endswith("mmol"): - return float(val.split("mmol")[0]) / 1000 - elif val.endswith("mol"): - return float(val.split("mol")[0]) - elif val.endswith("ml"): - return float(val.split("ml")[0]) - elif val.endswith("RPM"): - return float(val.split("RPM")[0]) - elif val.endswith(" °C"): - return float(val.split(" ")[0]) - elif val.endswith(" %"): - return float(val.split(" ")[0]) - return val -def refactor_data(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - """统一的数据重构函数,根据操作类型自动选择模板""" - refactored_data = [] - - # 定义操作映射,包含生物实验和有机化学的所有操作 - OPERATION_MAPPING = { - # 生物实验操作 - "transfer_liquid": "SynBioFactory-liquid_handler.prcxi-transfer_liquid", - "transfer": "SynBioFactory-liquid_handler.biomek-transfer", - "incubation": "SynBioFactory-liquid_handler.biomek-incubation", - "move_labware": "SynBioFactory-liquid_handler.biomek-move_labware", - "oscillation": "SynBioFactory-liquid_handler.biomek-oscillation", - # 有机化学操作 - "HeatChillToTemp": "SynBioFactory-workstation-HeatChillProtocol", - "StopHeatChill": "SynBioFactory-workstation-HeatChillStopProtocol", - "StartHeatChill": "SynBioFactory-workstation-HeatChillStartProtocol", - "HeatChill": "SynBioFactory-workstation-HeatChillProtocol", - "Dissolve": "SynBioFactory-workstation-DissolveProtocol", - "Transfer": "SynBioFactory-workstation-TransferProtocol", - "Evaporate": "SynBioFactory-workstation-EvaporateProtocol", - "Recrystallize": "SynBioFactory-workstation-RecrystallizeProtocol", - "Filter": "SynBioFactory-workstation-FilterProtocol", - "Dry": "SynBioFactory-workstation-DryProtocol", - "Add": "SynBioFactory-workstation-AddProtocol", - } - - UNSUPPORTED_OPERATIONS = ["Purge", "Wait", "Stir", "ResetHandling"] - - for step in data: - operation = step.get("action") - if not operation or operation in UNSUPPORTED_OPERATIONS: - continue - - # 处理重复操作 - if operation == "Repeat": - times = step.get("times", step.get("parameters", {}).get("times", 1)) - sub_steps = step.get("steps", step.get("parameters", {}).get("steps", [])) - for i in range(int(times)): - sub_data = refactor_data(sub_steps) - refactored_data.extend(sub_data) - continue - - # 获取模板名称 - template = OPERATION_MAPPING.get(operation) - if not template: - # 自动推断模板类型 - if operation.lower() in ["transfer", "incubation", "move_labware", "oscillation"]: - template = f"SynBioFactory-liquid_handler.biomek-{operation}" - else: - template = f"SynBioFactory-workstation-{operation}Protocol" - - # 创建步骤数据 - step_data = { - "template": template, - "description": step.get("description", step.get("purpose", f"{operation} operation")), - "lab_node_type": "Device", - "parameters": step.get("parameters", step.get("action_args", {})), - } - refactored_data.append(step_data) - - return refactored_data - - -def build_protocol_graph( - labware_info: List[Dict[str, Any]], protocol_steps: List[Dict[str, Any]], workstation_name: str -) -> SimpleGraph: - """统一的协议图构建函数,根据设备类型自动选择构建逻辑""" - G = SimpleGraph() - resource_last_writer = {} - LAB_NAME = "SynBioFactory" - - protocol_steps = refactor_data(protocol_steps) - - # 检查协议步骤中的模板来判断协议类型 - has_biomek_template = any( - ("biomek" in step.get("template", "")) or ("prcxi" in step.get("template", "")) - for step in protocol_steps - ) - - if has_biomek_template: - # 生物实验协议图构建 - for labware_id, labware in labware_info.items(): - node_id = str(uuid.uuid4()) - - labware_attrs = labware.copy() - labware_id = labware_attrs.pop("id", labware_attrs.get("name", f"labware_{uuid.uuid4()}")) - labware_attrs["description"] = labware_id - labware_attrs["lab_node_type"] = ( - "Reagent" if "Plate" in str(labware_id) else "Labware" if "Rack" in str(labware_id) else "Sample" - ) - labware_attrs["device_id"] = workstation_name - - G.add_node(node_id, template=f"{LAB_NAME}-host_node-create_resource", **labware_attrs) - resource_last_writer[labware_id] = f"{node_id}:labware" - - # 处理协议步骤 - prev_node = None - for i, step in enumerate(protocol_steps): - node_id = str(uuid.uuid4()) - G.add_node(node_id, **step) - - # 添加控制流边 - if prev_node is not None: - G.add_edge(prev_node, node_id, source_port="ready", target_port="ready") - prev_node = node_id - - # 处理物料流 - params = step.get("parameters", {}) - if "sources" in params and params["sources"] in resource_last_writer: - source_node, source_port = resource_last_writer[params["sources"]].split(":") - G.add_edge(source_node, node_id, source_port=source_port, target_port="labware") - - if "targets" in params: - resource_last_writer[params["targets"]] = f"{node_id}:labware" - - # 添加协议结束节点 - end_id = str(uuid.uuid4()) - G.add_node(end_id, template=f"{LAB_NAME}-liquid_handler.biomek-run_protocol") - if prev_node is not None: - G.add_edge(prev_node, end_id, source_port="ready", target_port="ready") - - else: - # 有机化学协议图构建 - WORKSTATION_ID = workstation_name - - # 为所有labware创建资源节点 - for item_id, item in labware_info.items(): - # item_id = item.get("id") or item.get("name", f"item_{uuid.uuid4()}") - node_id = str(uuid.uuid4()) - - # 判断节点类型 - if item.get("type") == "hardware" or "reactor" in str(item_id).lower(): - if "reactor" not in str(item_id).lower(): - continue - lab_node_type = "Sample" - description = f"Prepare Reactor: {item_id}" - liquid_type = [] - liquid_volume = [] - else: - lab_node_type = "Reagent" - description = f"Add Reagent to Flask: {item_id}" - liquid_type = [item_id] - liquid_volume = [1e5] - - G.add_node( - node_id, - template=f"{LAB_NAME}-host_node-create_resource", - description=description, - lab_node_type=lab_node_type, - res_id=item_id, - device_id=WORKSTATION_ID, - class_name="container", - parent=WORKSTATION_ID, - bind_locations={"x": 0.0, "y": 0.0, "z": 0.0}, - liquid_input_slot=[-1], - liquid_type=liquid_type, - liquid_volume=liquid_volume, - slot_on_deck="", - role=item.get("role", ""), - ) - resource_last_writer[item_id] = f"{node_id}:labware" - - last_control_node_id = None - - # 处理协议步骤 - for step in protocol_steps: - node_id = str(uuid.uuid4()) - G.add_node(node_id, **step) - - # 控制流 - if last_control_node_id is not None: - G.add_edge(last_control_node_id, node_id, source_port="ready", target_port="ready") - last_control_node_id = node_id - - # 物料流 - params = step.get("parameters", {}) - input_resources = { - "Vessel": params.get("vessel"), - "ToVessel": params.get("to_vessel"), - "FromVessel": params.get("from_vessel"), - "reagent": params.get("reagent"), - "solvent": params.get("solvent"), - "compound": params.get("compound"), - "sources": params.get("sources"), - "targets": params.get("targets"), - } - - for target_port, resource_name in input_resources.items(): - if resource_name and resource_name in resource_last_writer: - source_node, source_port = resource_last_writer[resource_name].split(":") - G.add_edge(source_node, node_id, source_port=source_port, target_port=target_port) - - output_resources = { - "VesselOut": params.get("vessel"), - "FromVesselOut": params.get("from_vessel"), - "ToVesselOut": params.get("to_vessel"), - "FiltrateOut": params.get("filtrate_vessel"), - "reagent": params.get("reagent"), - "solvent": params.get("solvent"), - "compound": params.get("compound"), - "sources_out": params.get("sources"), - "targets_out": params.get("targets"), - } - - for source_port, resource_name in output_resources.items(): - if resource_name: - resource_last_writer[resource_name] = f"{node_id}:{source_port}" - - return G - - -def draw_protocol_graph(protocol_graph: SimpleGraph, output_path: str): - """ - (辅助功能) 使用 networkx 和 matplotlib 绘制协议工作流图,用于可视化。 - """ - if not protocol_graph: - print("Cannot draw graph: Graph object is empty.") - return - - G = nx.DiGraph() - - for node_id, attrs in protocol_graph.nodes.items(): - label = attrs.get("description", attrs.get("template", node_id[:8])) - G.add_node(node_id, label=label, **attrs) - - for edge in protocol_graph.edges: - G.add_edge(edge["source"], edge["target"]) - - plt.figure(figsize=(20, 15)) - try: - pos = nx.nx_agraph.graphviz_layout(G, prog="dot") - except Exception: - pos = nx.shell_layout(G) # Fallback layout - - node_labels = {node: data["label"] for node, data in G.nodes(data=True)} - nx.draw( - G, - pos, - with_labels=False, - node_size=2500, - node_color="skyblue", - node_shape="o", - edge_color="gray", - width=1.5, - arrowsize=15, - ) - nx.draw_networkx_labels(G, pos, labels=node_labels, font_size=8, font_weight="bold") - - plt.title("Chemical Protocol Workflow Graph", size=15) - plt.savefig(output_path, dpi=300, bbox_inches="tight") - plt.close() - print(f" - Visualization saved to '{output_path}'") - - -from networkx.drawing.nx_agraph import to_agraph -import re - -COMPASS = {"n","e","s","w","ne","nw","se","sw","c"} - -def _is_compass(port: str) -> bool: - return isinstance(port, str) and port.lower() in COMPASS - -def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: str = "LR"): - """ - 使用 Graphviz 端口语法绘制协议工作流图。 - - 若边上的 source_port/target_port 是 compass(n/e/s/w/...),直接用 compass。 - - 否则自动为节点创建 record 形状并定义命名端口 。 - 最终由 PyGraphviz 渲染并输出到 output_path(后缀决定格式,如 .png/.svg/.pdf)。 - """ - if not protocol_graph: - print("Cannot draw graph: Graph object is empty.") - return - - # 1) 先用 networkx 搭建有向图,保留端口属性 - G = nx.DiGraph() - for node_id, attrs in protocol_graph.nodes.items(): - label = attrs.get("description", attrs.get("template", node_id[:8])) - # 保留一个干净的“中心标签”,用于放在 record 的中间槽 - G.add_node(node_id, _core_label=str(label), **{k:v for k,v in attrs.items() if k not in ("label",)}) - - edges_data = [] - in_ports_by_node = {} # 收集命名输入端口 - out_ports_by_node = {} # 收集命名输出端口 - - for edge in protocol_graph.edges: - u = edge["source"] - v = edge["target"] - sp = edge.get("source_port") - tp = edge.get("target_port") - - # 记录到图里(保留原始端口信息) - G.add_edge(u, v, source_port=sp, target_port=tp) - edges_data.append((u, v, sp, tp)) - - # 如果不是 compass,就按“命名端口”先归类,等会儿给节点造 record - if sp and not _is_compass(sp): - out_ports_by_node.setdefault(u, set()).add(str(sp)) - if tp and not _is_compass(tp): - in_ports_by_node.setdefault(v, set()).add(str(tp)) - - # 2) 转为 AGraph,使用 Graphviz 渲染 - A = to_agraph(G) - A.graph_attr.update(rankdir=rankdir, splines="true", concentrate="false", fontsize="10") - A.node_attr.update(shape="box", style="rounded,filled", fillcolor="lightyellow", color="#999999", fontname="Helvetica") - A.edge_attr.update(arrowsize="0.8", color="#666666") - - # 3) 为需要命名端口的节点设置 record 形状与 label - # 左列 = 输入端口;中间 = 核心标签;右列 = 输出端口 - for n in A.nodes(): - node = A.get_node(n) - core = G.nodes[n].get("_core_label", n) - - in_ports = sorted(in_ports_by_node.get(n, [])) - out_ports = sorted(out_ports_by_node.get(n, [])) - - # 如果该节点涉及命名端口,则用 record;否则保留原 box - if in_ports or out_ports: - def port_fields(ports): - if not ports: - return " " # 必须留一个空槽占位 - # 每个端口一个小格子,

name - return "|".join(f"<{re.sub(r'[^A-Za-z0-9_:.|-]', '_', p)}> {p}" for p in ports) - - left = port_fields(in_ports) - right = port_fields(out_ports) - - # 三栏:左(入) | 中(节点名) | 右(出) - record_label = f"{{ {left} | {core} | {right} }}" - node.attr.update(shape="record", label=record_label) - else: - # 没有命名端口:普通盒子,显示核心标签 - node.attr.update(label=str(core)) - - # 4) 给边设置 headport / tailport - # - 若端口为 compass:直接用 compass(e.g., headport="e") - # - 若端口为命名端口:使用在 record 中定义的 名(同名即可) - for (u, v, sp, tp) in edges_data: - e = A.get_edge(u, v) - - # Graphviz 属性:tail 是源,head 是目标 - if sp: - if _is_compass(sp): - e.attr["tailport"] = sp.lower() - else: - # 与 record label 中 名一致;特殊字符已在 label 中做了清洗 - e.attr["tailport"] = re.sub(r'[^A-Za-z0-9_:.|-]', '_', str(sp)) - - if tp: - if _is_compass(tp): - e.attr["headport"] = tp.lower() - else: - e.attr["headport"] = re.sub(r'[^A-Za-z0-9_:.|-]', '_', str(tp)) - - # 可选:若想让边更贴边缘,可设置 constraint/spline 等 - # e.attr["arrowhead"] = "vee" - - # 5) 输出 - A.draw(output_path, prog="dot") - print(f" - Port-aware workflow rendered to '{output_path}'") - - -def flatten_xdl_procedure(procedure_elem: ET.Element) -> List[ET.Element]: - """展平嵌套的XDL程序结构""" - flattened_operations = [] - TEMP_UNSUPPORTED_PROTOCOL = ["Purge", "Wait", "Stir", "ResetHandling"] - - def extract_operations(element: ET.Element): - if element.tag not in ["Prep", "Reaction", "Workup", "Purification", "Procedure"]: - if element.tag not in TEMP_UNSUPPORTED_PROTOCOL: - flattened_operations.append(element) - - for child in element: - extract_operations(child) - - for child in procedure_elem: - extract_operations(child) - - return flattened_operations - - -def parse_xdl_content(xdl_content: str) -> tuple: - """解析XDL内容""" - try: - xdl_content_cleaned = "".join(c for c in xdl_content if c.isprintable()) - root = ET.fromstring(xdl_content_cleaned) - - synthesis_elem = root.find("Synthesis") - if synthesis_elem is None: - return None, None, None - - # 解析硬件组件 - hardware_elem = synthesis_elem.find("Hardware") - hardware = [] - if hardware_elem is not None: - hardware = [{"id": c.get("id"), "type": c.get("type")} for c in hardware_elem.findall("Component")] - - # 解析试剂 - reagents_elem = synthesis_elem.find("Reagents") - reagents = [] - if reagents_elem is not None: - reagents = [{"name": r.get("name"), "role": r.get("role", "")} for r in reagents_elem.findall("Reagent")] - - # 解析程序 - procedure_elem = synthesis_elem.find("Procedure") - if procedure_elem is None: - return None, None, None - - flattened_operations = flatten_xdl_procedure(procedure_elem) - return hardware, reagents, flattened_operations - - except ET.ParseError as e: - raise ValueError(f"Invalid XDL format: {e}") - - -def convert_xdl_to_dict(xdl_content: str) -> Dict[str, Any]: - """ - 将XDL XML格式转换为标准的字典格式 - - Args: - xdl_content: XDL XML内容 - - Returns: - 转换结果,包含步骤和器材信息 - """ - try: - hardware, reagents, flattened_operations = parse_xdl_content(xdl_content) - if hardware is None: - return {"error": "Failed to parse XDL content", "success": False} - - # 将XDL元素转换为字典格式 - steps_data = [] - for elem in flattened_operations: - # 转换参数类型 - parameters = {} - for key, val in elem.attrib.items(): - converted_val = convert_to_type(val) - if converted_val is not None: - parameters[key] = converted_val - - step_dict = { - "operation": elem.tag, - "parameters": parameters, - "description": elem.get("purpose", f"Operation: {elem.tag}"), - } - steps_data.append(step_dict) - - # 合并硬件和试剂为统一的labware_info格式 - labware_data = [] - labware_data.extend({"id": hw["id"], "type": "hardware", **hw} for hw in hardware) - labware_data.extend({"name": reagent["name"], "type": "reagent", **reagent} for reagent in reagents) - - return { - "success": True, - "steps": steps_data, - "labware": labware_data, - "message": f"Successfully converted XDL to dict format. Found {len(steps_data)} steps and {len(labware_data)} labware items.", - } - - except Exception as e: - error_msg = f"XDL conversion failed: {str(e)}" - logger.error(error_msg) - return {"error": error_msg, "success": False} def create_workflow( diff --git a/unilabos/app/model.py b/unilabos/app/model.py index f80ce35a..3a031aaa 100644 --- a/unilabos/app/model.py +++ b/unilabos/app/model.py @@ -59,6 +59,7 @@ class JobAddReq(BaseModel): task_id: str = Field(examples=["task_id"], description="task uuid (auto-generated if empty)", default="") job_id: str = Field(examples=["job_id"], description="goal uuid (auto-generated if empty)", default="") node_id: str = Field(examples=["node_id"], description="node uuid", default="") + notebook_id: str = Field(examples=["notebook_id"], description="notebook uuid", default="") server_info: dict = Field( examples=[{"send_timestamp": 1717000000.0}], description="server info (auto-generated if empty)", diff --git a/unilabos/app/web/controller.py b/unilabos/app/web/controller.py index 6a01645c..147b4d20 100644 --- a/unilabos/app/web/controller.py +++ b/unilabos/app/web/controller.py @@ -320,6 +320,7 @@ def job_add(req: JobAddReq) -> JobData: action_name=action_name, task_id=task_id, job_id=job_id, + notebook_id=req.notebook_id, device_action_key=device_action_key, ) diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py index 4823a232..fbe19b43 100644 --- a/unilabos/app/ws_client.py +++ b/unilabos/app/ws_client.py @@ -59,6 +59,7 @@ class QueueItem: action_name: str task_id: str job_id: str + notebook_id: str device_action_key: str next_run_time: float = 0 # 下次执行时间戳 retry_count: int = 0 # 重试次数 @@ -71,6 +72,7 @@ class JobInfo: job_id: str task_id: str device_id: str + notebook_id: str action_name: str device_action_key: str status: JobStatus @@ -539,7 +541,10 @@ class MessageProcessor: self.reconnect_count += 1 backoff = WSConfig.reconnect_interval logger.info( - f"[MessageProcessor] 即将在 {backoff} 秒后重连 (已尝试 {self.reconnect_count}/{WSConfig.max_reconnect_attempts})" + "[MessageProcessor] 即将在 %s 秒后重连 (已尝试 %s/%s)", + backoff, + self.reconnect_count, + WSConfig.max_reconnect_attempts, ) await asyncio.sleep(backoff) else: @@ -703,6 +708,7 @@ class MessageProcessor: action_name = data.get("action_name", "") task_id = data.get("task_id", "") job_id = data.get("job_id", "") + notebook_id = data.get("notebook_id", "") if not all([device_id, action_name, task_id, job_id]): logger.error("[MessageProcessor] Missing required fields in query_action_state") @@ -718,6 +724,7 @@ class MessageProcessor: job_id=job_id, task_id=task_id, device_id=device_id, + notebook_id=notebook_id, action_name=action_name, device_action_key=device_action_key, status=JobStatus.QUEUE, @@ -732,13 +739,27 @@ class MessageProcessor: if can_start_immediately: # 可以立即开始 await self._send_action_state_response( - device_id, action_name, task_id, job_id, "query_action_status", True, 0 + device_id, + action_name, + task_id, + job_id, + "query_action_status", + True, + 0, + notebook_id=notebook_id, ) logger.trace(f"[MessageProcessor] Job {job_log} can start immediately") else: # 需要排队 await self._send_action_state_response( - device_id, action_name, task_id, job_id, "query_action_status", False, 10 + device_id, + action_name, + task_id, + job_id, + "query_action_status", + False, + 10, + notebook_id=notebook_id, ) logger.trace(f"[MessageProcessor] Job {job_log} queued") @@ -768,6 +789,7 @@ class MessageProcessor: job_id=req.job_id, task_id=req.task_id, device_id=req.device_id, + notebook_id=req.notebook_id, action_name=action_name, device_action_key=device_action_key, status=JobStatus.QUEUE, @@ -775,11 +797,16 @@ class MessageProcessor: always_free=True, ) self.device_manager.add_queue_request(job_info) + existing_job = job_info logger.info(f"[MessageProcessor] Job {job_log} always_free, auto-registered from direct job_start") else: logger.error(f"[MessageProcessor] Job {job_log} not registered (missing query_action_state)") return + if existing_job and req.notebook_id and not existing_job.notebook_id: + existing_job.notebook_id = req.notebook_id + notebook_id = req.notebook_id or (existing_job.notebook_id if existing_job else "") + success = self.device_manager.start_job(req.job_id) if not success: logger.error(f"[MessageProcessor] Failed to start job {job_log}") @@ -795,6 +822,7 @@ class MessageProcessor: action_name=req.action, task_id=req.task_id, job_id=req.job_id, + notebook_id=notebook_id, device_action_key=device_action_key, ) @@ -834,6 +862,7 @@ class MessageProcessor: "job_id": req.job_id, "task_id": req.task_id, "device_id": req.device_id, + "notebook_id": queue_item.notebook_id, "action_name": req.action, "status": "failed", "feedback_data": {}, @@ -855,6 +884,7 @@ class MessageProcessor: "query_action_status", True, 0, + notebook_id=next_job.notebook_id, ) next_job_log = format_job_log( next_job.job_id, next_job.task_id, next_job.device_id, next_job.action_name @@ -1101,7 +1131,15 @@ class MessageProcessor: logger.info(f"[MessageProcessor] Restart cleanup scheduled") async def _send_action_state_response( - self, device_id: str, action_name: str, task_id: str, job_id: str, typ: str, free: bool, need_more: int + self, + device_id: str, + action_name: str, + task_id: str, + job_id: str, + typ: str, + free: bool, + need_more: int, + notebook_id: str = "", ): """发送动作状态响应""" message = { @@ -1112,6 +1150,7 @@ class MessageProcessor: "action_name": action_name, "task_id": task_id, "job_id": job_id, + "notebook_id": notebook_id, "free": free, "need_more": need_more + 1, }, @@ -1194,6 +1233,7 @@ class QueueProcessor: action_name=timeout_job.action_name, task_id=timeout_job.task_id, job_id=timeout_job.job_id, + notebook_id=timeout_job.notebook_id, device_action_key=timeout_job.device_action_key, ) # 发布超时失败状态,这会触发正常的job完成流程 @@ -1252,6 +1292,7 @@ class QueueProcessor: "action_name": job_info.action_name, "task_id": job_info.task_id, "job_id": job_info.job_id, + "notebook_id": job_info.notebook_id, "free": False, "need_more": 10 + 1, }, @@ -1291,6 +1332,7 @@ class QueueProcessor: "action_name": job_info.action_name, "task_id": job_info.task_id, "job_id": job_info.job_id, + "notebook_id": job_info.notebook_id, "free": False, "need_more": 10 + 1, }, @@ -1336,12 +1378,15 @@ class QueueProcessor: "action_name": next_job.action_name, "task_id": next_job.task_id, "job_id": next_job.job_id, + "notebook_id": next_job.notebook_id, "free": True, "need_more": 0, }, } self.message_processor.send_message(message) - # next_job_log = format_job_log(next_job.job_id, next_job.task_id, next_job.device_id, next_job.action_name) + # next_job_log = format_job_log( + # next_job.job_id, next_job.task_id, next_job.device_id, next_job.action_name + # ) # logger.debug(f"[QueueProcessor] Notified next job {next_job_log} can start") # 立即触发下一轮状态检查 @@ -1510,6 +1555,7 @@ class WebSocketClient(BaseCommunicationClient): "job_id": item.job_id, "task_id": item.task_id, "device_id": item.device_id, + "notebook_id": item.notebook_id, "action_name": item.action_name, "status": status, "feedback_data": feedback_data, diff --git a/unilabos/resources/graphio.py b/unilabos/resources/graphio.py index 4a6911ca..b3ad7368 100644 --- a/unilabos/resources/graphio.py +++ b/unilabos/resources/graphio.py @@ -42,7 +42,7 @@ def canonicalize_nodes_data( Returns: ResourceTreeSet: 标准化后的资源树集合 """ - print_status(f"{len(nodes)} Resources loaded:", "info") + print_status(f"{len(nodes)} Resources loaded", "info") # 第一步:基本预处理(处理graphml的label字段) outer_host_node_id = None diff --git a/unilabos/workflow/__init__.py b/unilabos/workflow/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/unilabos/workflow/from_python_script.py b/unilabos/workflow/from_python_script.py new file mode 100644 index 00000000..5a8ce38e --- /dev/null +++ b/unilabos/workflow/from_python_script.py @@ -0,0 +1,241 @@ +import ast +import json +from typing import Dict, List, Any, Tuple, Optional + +from .common import WorkflowGraph, RegistryAdapter + +Json = Dict[str, Any] + +# ---------------- Converter ---------------- + +class DeviceMethodConverter: + """ + - 字段统一:resource_name(原 device_class)、template_name(原 action_key) + - params 单层;inputs 使用 'params.' 前缀 + - SimpleGraph.add_workflow_node 负责变量连线与边 + """ + def __init__(self, device_registry: Optional[Dict[str, Any]] = None): + self.graph = WorkflowGraph() + self.variable_sources: Dict[str, Dict[str, Any]] = {} # var -> {node_id, output_name} + self.instance_to_resource: Dict[str, Optional[str]] = {} # 实例名 -> resource_name + self.node_id_counter: int = 0 + self.registry = RegistryAdapter(device_registry or {}) + + # ---- helpers ---- + def _new_node_id(self) -> int: + nid = self.node_id_counter + self.node_id_counter += 1 + return nid + + def _assign_targets(self, targets) -> List[str]: + names: List[str] = [] + import ast + if isinstance(targets, ast.Tuple): + for elt in targets.elts: + if isinstance(elt, ast.Name): + names.append(elt.id) + elif isinstance(targets, ast.Name): + names.append(targets.id) + return names + + def _extract_device_instantiation(self, node) -> Optional[Tuple[str, str]]: + import ast + if not isinstance(node.value, ast.Call): + return None + callee = node.value.func + if isinstance(callee, ast.Name): + class_name = callee.id + elif isinstance(callee, ast.Attribute) and isinstance(callee.value, ast.Name): + class_name = callee.attr + else: + return None + if isinstance(node.targets[0], ast.Name): + instance = node.targets[0].id + return instance, class_name + return None + + def _extract_call(self, call) -> Tuple[str, str, Dict[str, Any], str]: + import ast + owner_name, method_name, call_kind = "", "", "func" + if isinstance(call.func, ast.Attribute): + method_name = call.func.attr + if isinstance(call.func.value, ast.Name): + owner_name = call.func.value.id + call_kind = "instance" if owner_name in self.instance_to_resource else "class_or_module" + elif isinstance(call.func.value, ast.Attribute) and isinstance(call.func.value.value, ast.Name): + owner_name = call.func.value.attr + call_kind = "class_or_module" + elif isinstance(call.func, ast.Name): + method_name = call.func.id + call_kind = "func" + + def pack(node): + if isinstance(node, ast.Name): + return {"type": "variable", "value": node.id} + if isinstance(node, ast.Constant): + return {"type": "constant", "value": node.value} + if isinstance(node, ast.Dict): + return {"type": "dict", "value": self._parse_dict(node)} + if isinstance(node, ast.List): + return {"type": "list", "value": self._parse_list(node)} + return {"type": "raw", "value": ast.unparse(node) if hasattr(ast, "unparse") else str(node)} + + args: Dict[str, Any] = {} + pos: List[Any] = [] + for a in call.args: + pos.append(pack(a)) + for kw in call.keywords: + args[kw.arg] = pack(kw.value) + if pos: + args["_positional"] = pos + return owner_name, method_name, args, call_kind + + def _parse_dict(self, node) -> Dict[str, Any]: + import ast + out: Dict[str, Any] = {} + for k, v in zip(node.keys, node.values): + if isinstance(k, ast.Constant): + key = str(k.value) + if isinstance(v, ast.Name): + out[key] = f"var:{v.id}" + elif isinstance(v, ast.Constant): + out[key] = v.value + elif isinstance(v, ast.Dict): + out[key] = self._parse_dict(v) + elif isinstance(v, ast.List): + out[key] = self._parse_list(v) + return out + + def _parse_list(self, node) -> List[Any]: + import ast + out: List[Any] = [] + for elt in node.elts: + if isinstance(elt, ast.Name): + out.append(f"var:{elt.id}") + elif isinstance(elt, ast.Constant): + out.append(elt.value) + elif isinstance(elt, ast.Dict): + out.append(self._parse_dict(elt)) + elif isinstance(elt, ast.List): + out.append(self._parse_list(elt)) + return out + + def _normalize_var_tokens(self, x: Any) -> Any: + if isinstance(x, str) and x.startswith("var:"): + return {"__var__": x[4:]} + if isinstance(x, list): + return [self._normalize_var_tokens(i) for i in x] + if isinstance(x, dict): + return {k: self._normalize_var_tokens(v) for k, v in x.items()} + return x + + def _make_params_payload(self, resource_name: Optional[str], template_name: str, call_args: Dict[str, Any]) -> Dict[str, Any]: + input_keys = self.registry.get_action_input_keys(resource_name, template_name) if resource_name else [] + defaults = self.registry.get_action_goal_default(resource_name, template_name) if resource_name else {} + params: Dict[str, Any] = dict(defaults) + + def unpack(p): + t, v = p.get("type"), p.get("value") + if t == "variable": + return {"__var__": v} + if t == "dict": + return self._normalize_var_tokens(v) + if t == "list": + return self._normalize_var_tokens(v) + return v + + for k, p in call_args.items(): + if k == "_positional": + continue + params[k] = unpack(p) + + pos = call_args.get("_positional", []) + if pos: + if input_keys: + for i, p in enumerate(pos): + if i >= len(input_keys): + break + name = input_keys[i] + if name in params: + continue + params[name] = unpack(p) + else: + for i, p in enumerate(pos): + params[f"arg_{i}"] = unpack(p) + return params + + # ---- handlers ---- + def _on_assign(self, stmt): + import ast + inst = self._extract_device_instantiation(stmt) + if inst: + instance, code_class = inst + resource_name = self.registry.resolve_resource_by_classname(code_class) + self.instance_to_resource[instance] = resource_name + return + + if isinstance(stmt.value, ast.Call): + owner, method, call_args, kind = self._extract_call(stmt.value) + if kind == "instance": + device_key = owner + resource_name = self.instance_to_resource.get(owner) + else: + device_key = owner + resource_name = self.registry.resolve_resource_by_classname(owner) + + module = self.registry.get_device_module(resource_name) + params = self._make_params_payload(resource_name, method, call_args) + + nid = self._new_node_id() + self.graph.add_workflow_node( + nid, + device_key=device_key, + resource_name=resource_name, # ✅ + module=module, + template_name=method, # ✅ + params=params, + variable_sources=self.variable_sources, + add_ready_if_no_vars=True, + prev_node_id=(nid - 1) if nid > 0 else None, + ) + + out_vars = self._assign_targets(stmt.targets[0]) + for var in out_vars: + self.variable_sources[var] = {"node_id": nid, "output_name": "result"} + + def _on_expr(self, stmt): + import ast + if not isinstance(stmt.value, ast.Call): + return + owner, method, call_args, kind = self._extract_call(stmt.value) + if kind == "instance": + device_key = owner + resource_name = self.instance_to_resource.get(owner) + else: + device_key = owner + resource_name = self.registry.resolve_resource_by_classname(owner) + + module = self.registry.get_device_module(resource_name) + params = self._make_params_payload(resource_name, method, call_args) + + nid = self._new_node_id() + self.graph.add_workflow_node( + nid, + device_key=device_key, + resource_name=resource_name, # ✅ + module=module, + template_name=method, # ✅ + params=params, + variable_sources=self.variable_sources, + add_ready_if_no_vars=True, + prev_node_id=(nid - 1) if nid > 0 else None, + ) + + def convert(self, python_code: str): + tree = ast.parse(python_code) + for stmt in tree.body: + if isinstance(stmt, ast.Assign): + self._on_assign(stmt) + elif isinstance(stmt, ast.Expr): + self._on_expr(stmt) + return self diff --git a/unilabos/workflow/from_xdl.py b/unilabos/workflow/from_xdl.py new file mode 100644 index 00000000..1041f9ad --- /dev/null +++ b/unilabos/workflow/from_xdl.py @@ -0,0 +1,131 @@ +from typing import List, Any, Dict +import xml.etree.ElementTree as ET + + +def convert_to_type(val: str) -> Any: + """将字符串值转换为适当的数据类型""" + if val == "True": + return True + if val == "False": + return False + if val == "?": + return None + if val.endswith(" g"): + return float(val.split(" ")[0]) + if val.endswith("mg"): + return float(val.split("mg")[0]) + elif val.endswith("mmol"): + return float(val.split("mmol")[0]) / 1000 + elif val.endswith("mol"): + return float(val.split("mol")[0]) + elif val.endswith("ml"): + return float(val.split("ml")[0]) + elif val.endswith("RPM"): + return float(val.split("RPM")[0]) + elif val.endswith(" °C"): + return float(val.split(" ")[0]) + elif val.endswith(" %"): + return float(val.split(" ")[0]) + return val + + +def flatten_xdl_procedure(procedure_elem: ET.Element) -> List[ET.Element]: + """展平嵌套的XDL程序结构""" + flattened_operations = [] + TEMP_UNSUPPORTED_PROTOCOL = ["Purge", "Wait", "Stir", "ResetHandling"] + + def extract_operations(element: ET.Element): + if element.tag not in ["Prep", "Reaction", "Workup", "Purification", "Procedure"]: + if element.tag not in TEMP_UNSUPPORTED_PROTOCOL: + flattened_operations.append(element) + + for child in element: + extract_operations(child) + + for child in procedure_elem: + extract_operations(child) + + return flattened_operations + + +def parse_xdl_content(xdl_content: str) -> tuple: + """解析XDL内容""" + try: + xdl_content_cleaned = "".join(c for c in xdl_content if c.isprintable()) + root = ET.fromstring(xdl_content_cleaned) + + synthesis_elem = root.find("Synthesis") + if synthesis_elem is None: + return None, None, None + + # 解析硬件组件 + hardware_elem = synthesis_elem.find("Hardware") + hardware = [] + if hardware_elem is not None: + hardware = [{"id": c.get("id"), "type": c.get("type")} for c in hardware_elem.findall("Component")] + + # 解析试剂 + reagents_elem = synthesis_elem.find("Reagents") + reagents = [] + if reagents_elem is not None: + reagents = [{"name": r.get("name"), "role": r.get("role", "")} for r in reagents_elem.findall("Reagent")] + + # 解析程序 + procedure_elem = synthesis_elem.find("Procedure") + if procedure_elem is None: + return None, None, None + + flattened_operations = flatten_xdl_procedure(procedure_elem) + return hardware, reagents, flattened_operations + + except ET.ParseError as e: + raise ValueError(f"Invalid XDL format: {e}") + + +def convert_xdl_to_dict(xdl_content: str) -> Dict[str, Any]: + """ + 将XDL XML格式转换为标准的字典格式 + + Args: + xdl_content: XDL XML内容 + + Returns: + 转换结果,包含步骤和器材信息 + """ + try: + hardware, reagents, flattened_operations = parse_xdl_content(xdl_content) + if hardware is None: + return {"error": "Failed to parse XDL content", "success": False} + + # 将XDL元素转换为字典格式 + steps_data = [] + for elem in flattened_operations: + # 转换参数类型 + parameters = {} + for key, val in elem.attrib.items(): + converted_val = convert_to_type(val) + if converted_val is not None: + parameters[key] = converted_val + + step_dict = { + "operation": elem.tag, + "parameters": parameters, + "description": elem.get("purpose", f"Operation: {elem.tag}"), + } + steps_data.append(step_dict) + + # 合并硬件和试剂为统一的labware_info格式 + labware_data = [] + labware_data.extend({"id": hw["id"], "type": "hardware", **hw} for hw in hardware) + labware_data.extend({"name": reagent["name"], "type": "reagent", **reagent} for reagent in reagents) + + return { + "success": True, + "steps": steps_data, + "labware": labware_data, + "message": f"Successfully converted XDL to dict format. Found {len(steps_data)} steps and {len(labware_data)} labware items.", + } + + except Exception as e: + error_msg = f"XDL conversion failed: {str(e)}" + return {"error": error_msg, "success": False}