mirror of
https://github.com/deepmodeling/Uni-Lab-OS
synced 2026-05-23 05:26:09 +00:00
Compare commits
89 Commits
dependabot
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bcb1790897 | ||
|
|
916a6dfc60 | ||
|
|
f71ea2a258 | ||
|
|
f6b2bfaf8e | ||
|
|
71107e9552 | ||
|
|
1ad4766221 | ||
|
|
67a74172dc | ||
|
|
ccbf5378dd | ||
|
|
c001f6a151 | ||
|
|
145fcaae65 | ||
|
|
a79c0a88bf | ||
|
|
06b6f0d804 | ||
|
|
b551e69f64 | ||
|
|
5179a7e48e | ||
|
|
3a2d9e9603 | ||
|
|
a277bd2bed | ||
|
|
176de521b4 | ||
|
|
38c5c267af | ||
|
|
2a5ddd611d | ||
|
|
8580b84167 | ||
|
|
3f80349d7d | ||
|
|
024156848e | ||
|
|
8066c200b9 | ||
|
|
266366cc25 | ||
|
|
121c3985cc | ||
|
|
6ca5c72fc6 | ||
|
|
bc8c49ddda | ||
|
|
28f93737ac | ||
|
|
5dc81ec9be | ||
|
|
13a6795657 | ||
|
|
53219d8b04 | ||
|
|
b1cdef9185 | ||
|
|
9854ed8c9c | ||
|
|
52544a2c69 | ||
|
|
5ce433e235 | ||
|
|
c7c14d2332 | ||
|
|
6fdd482649 | ||
|
|
d390236318 | ||
|
|
ed8ee29732 | ||
|
|
ffc583e9d5 | ||
|
|
f1ad0c9c96 | ||
|
|
8fa3407649 | ||
|
|
d3282822fc | ||
|
|
554bcade24 | ||
|
|
a662c75de1 | ||
|
|
931614fe64 | ||
|
|
d39662f65f | ||
|
|
acf5fdebf8 | ||
|
|
7f7b1c13c0 | ||
|
|
75f09034ff | ||
|
|
549a50220b | ||
|
|
4189a2cfbe | ||
|
|
48895a9bb1 | ||
|
|
891f126ed6 | ||
|
|
4d3475a849 | ||
|
|
b475db66df | ||
|
|
a625a86e3e | ||
|
|
37e0f1037c | ||
|
|
a242253145 | ||
|
|
448e0074b7 | ||
|
|
304827fc8d | ||
|
|
872b3d781f | ||
|
|
813400f2b4 | ||
|
|
b6dfe2b944 | ||
|
|
8807865649 | ||
|
|
5fc7eb7586 | ||
|
|
9bd72b48e1 | ||
|
|
42b78ab4c1 | ||
|
|
9645609a05 | ||
|
|
a2a827d7ac | ||
|
|
bb3ca645a4 | ||
|
|
37ee43d19a | ||
|
|
bc30f23e34 | ||
|
|
166d84afe1 | ||
|
|
1b43c53015 | ||
|
|
d4415f5a35 | ||
|
|
0260cbbedb | ||
|
|
7c440d10ab | ||
|
|
c85c49817d | ||
|
|
c70eafa5f0 | ||
|
|
b64466d443 | ||
|
|
ef3f24ed48 | ||
|
|
2a8e8d014b | ||
|
|
e0da1c7217 | ||
|
|
51d3e61723 | ||
|
|
6b5765bbf3 | ||
|
|
eb1f3fbe1c | ||
|
|
fb93b1cd94 | ||
|
|
9aeffebde1 |
@@ -3,7 +3,7 @@
|
||||
|
||||
package:
|
||||
name: unilabos
|
||||
version: 0.11.1
|
||||
version: 0.11.2
|
||||
|
||||
source:
|
||||
path: ../../unilabos
|
||||
@@ -54,7 +54,7 @@ requirements:
|
||||
- pymodbus
|
||||
- matplotlib
|
||||
- pylibftdi
|
||||
- uni-lab::unilabos-env ==0.11.1
|
||||
- uni-lab::unilabos-env ==0.11.2
|
||||
|
||||
about:
|
||||
repository: https://github.com/deepmodeling/Uni-Lab-OS
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
package:
|
||||
name: unilabos-env
|
||||
version: 0.11.1
|
||||
version: 0.11.2
|
||||
|
||||
build:
|
||||
noarch: generic
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
package:
|
||||
name: unilabos-full
|
||||
version: 0.11.1
|
||||
version: 0.11.2
|
||||
|
||||
build:
|
||||
noarch: generic
|
||||
@@ -11,7 +11,7 @@ build:
|
||||
requirements:
|
||||
run:
|
||||
# Base unilabos package (includes unilabos-env)
|
||||
- uni-lab::unilabos ==0.11.1
|
||||
- uni-lab::unilabos ==0.11.2
|
||||
# Documentation tools
|
||||
- sphinx
|
||||
- sphinx_rtd_theme
|
||||
|
||||
2
.github/workflows/ci-check.yml
vendored
2
.github/workflows/ci-check.yml
vendored
@@ -25,7 +25,7 @@ jobs:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Setup Miniforge
|
||||
uses: conda-incubator/setup-miniconda@v4
|
||||
uses: conda-incubator/setup-miniconda@v3
|
||||
with:
|
||||
miniforge-version: latest
|
||||
use-mamba: true
|
||||
|
||||
2
.github/workflows/conda-pack-build.yml
vendored
2
.github/workflows/conda-pack-build.yml
vendored
@@ -86,7 +86,7 @@ jobs:
|
||||
|
||||
- name: Setup Miniforge (with mamba)
|
||||
if: steps.should_build.outputs.should_build == 'true'
|
||||
uses: conda-incubator/setup-miniconda@v4
|
||||
uses: conda-incubator/setup-miniconda@v3
|
||||
with:
|
||||
miniforge-version: latest
|
||||
use-mamba: true
|
||||
|
||||
2
.github/workflows/deploy-docs.yml
vendored
2
.github/workflows/deploy-docs.yml
vendored
@@ -51,7 +51,7 @@ jobs:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Setup Miniforge (with mamba)
|
||||
uses: conda-incubator/setup-miniconda@v4
|
||||
uses: conda-incubator/setup-miniconda@v3
|
||||
with:
|
||||
miniforge-version: latest
|
||||
use-mamba: true
|
||||
|
||||
2
.github/workflows/multi-platform-build.yml
vendored
2
.github/workflows/multi-platform-build.yml
vendored
@@ -101,7 +101,7 @@ jobs:
|
||||
|
||||
- name: Setup Miniforge
|
||||
if: steps.should_build.outputs.should_build == 'true'
|
||||
uses: conda-incubator/setup-miniconda@v4
|
||||
uses: conda-incubator/setup-miniconda@v3
|
||||
with:
|
||||
miniforge-version: latest
|
||||
use-mamba: true
|
||||
|
||||
2
.github/workflows/unilabos-conda-build.yml
vendored
2
.github/workflows/unilabos-conda-build.yml
vendored
@@ -94,7 +94,7 @@ jobs:
|
||||
|
||||
- name: Setup Miniforge
|
||||
if: steps.should_build.outputs.should_build == 'true'
|
||||
uses: conda-incubator/setup-miniconda@v4
|
||||
uses: conda-incubator/setup-miniconda@v3
|
||||
with:
|
||||
miniforge-version: latest
|
||||
use-mamba: true
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package:
|
||||
name: ros-humble-unilabos-msgs
|
||||
version: 0.11.1
|
||||
version: 0.11.2
|
||||
source:
|
||||
path: ../../unilabos_msgs
|
||||
target_directory: src
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package:
|
||||
name: unilabos
|
||||
version: "0.11.1"
|
||||
version: "0.11.2"
|
||||
|
||||
source:
|
||||
path: ../..
|
||||
|
||||
@@ -2,6 +2,7 @@ import json
|
||||
import logging
|
||||
import traceback
|
||||
import uuid
|
||||
import xml.etree.ElementTree as ET
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import networkx as nx
|
||||
@@ -24,15 +25,7 @@ class SimpleGraph:
|
||||
|
||||
def add_edge(self, source, target, **attrs):
|
||||
"""添加边"""
|
||||
# edge = {"source": source, "target": target, **attrs}
|
||||
edge = {
|
||||
"source": source, "target": target,
|
||||
"source_node_uuid": source,
|
||||
"target_node_uuid": target,
|
||||
"source_handle_io": "source",
|
||||
"target_handle_io": "target",
|
||||
**attrs
|
||||
}
|
||||
edge = {"source": source, "target": target, **attrs}
|
||||
self.edges.append(edge)
|
||||
|
||||
def to_dict(self):
|
||||
@@ -49,7 +42,6 @@ class SimpleGraph:
|
||||
"multigraph": False,
|
||||
"graph": {},
|
||||
"nodes": nodes_list,
|
||||
"edges": self.edges,
|
||||
"links": self.edges,
|
||||
}
|
||||
|
||||
@@ -66,8 +58,495 @@ def extract_json_from_markdown(text: str) -> str:
|
||||
return text
|
||||
|
||||
|
||||
def convert_to_type(val: str) -> Any:
|
||||
"""将字符串值转换为适当的数据类型"""
|
||||
if val == "True":
|
||||
return True
|
||||
if val == "False":
|
||||
return False
|
||||
if val == "?":
|
||||
return None
|
||||
if val.endswith(" g"):
|
||||
return float(val.split(" ")[0])
|
||||
if val.endswith("mg"):
|
||||
return float(val.split("mg")[0])
|
||||
elif val.endswith("mmol"):
|
||||
return float(val.split("mmol")[0]) / 1000
|
||||
elif val.endswith("mol"):
|
||||
return float(val.split("mol")[0])
|
||||
elif val.endswith("ml"):
|
||||
return float(val.split("ml")[0])
|
||||
elif val.endswith("RPM"):
|
||||
return float(val.split("RPM")[0])
|
||||
elif val.endswith(" °C"):
|
||||
return float(val.split(" ")[0])
|
||||
elif val.endswith(" %"):
|
||||
return float(val.split(" ")[0])
|
||||
return val
|
||||
|
||||
|
||||
def refactor_data(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
||||
"""统一的数据重构函数,根据操作类型自动选择模板"""
|
||||
refactored_data = []
|
||||
|
||||
# 定义操作映射,包含生物实验和有机化学的所有操作
|
||||
OPERATION_MAPPING = {
|
||||
# 生物实验操作
|
||||
"transfer_liquid": "SynBioFactory-liquid_handler.prcxi-transfer_liquid",
|
||||
"transfer": "SynBioFactory-liquid_handler.biomek-transfer",
|
||||
"incubation": "SynBioFactory-liquid_handler.biomek-incubation",
|
||||
"move_labware": "SynBioFactory-liquid_handler.biomek-move_labware",
|
||||
"oscillation": "SynBioFactory-liquid_handler.biomek-oscillation",
|
||||
# 有机化学操作
|
||||
"HeatChillToTemp": "SynBioFactory-workstation-HeatChillProtocol",
|
||||
"StopHeatChill": "SynBioFactory-workstation-HeatChillStopProtocol",
|
||||
"StartHeatChill": "SynBioFactory-workstation-HeatChillStartProtocol",
|
||||
"HeatChill": "SynBioFactory-workstation-HeatChillProtocol",
|
||||
"Dissolve": "SynBioFactory-workstation-DissolveProtocol",
|
||||
"Transfer": "SynBioFactory-workstation-TransferProtocol",
|
||||
"Evaporate": "SynBioFactory-workstation-EvaporateProtocol",
|
||||
"Recrystallize": "SynBioFactory-workstation-RecrystallizeProtocol",
|
||||
"Filter": "SynBioFactory-workstation-FilterProtocol",
|
||||
"Dry": "SynBioFactory-workstation-DryProtocol",
|
||||
"Add": "SynBioFactory-workstation-AddProtocol",
|
||||
}
|
||||
|
||||
UNSUPPORTED_OPERATIONS = ["Purge", "Wait", "Stir", "ResetHandling"]
|
||||
|
||||
for step in data:
|
||||
operation = step.get("action")
|
||||
if not operation or operation in UNSUPPORTED_OPERATIONS:
|
||||
continue
|
||||
|
||||
# 处理重复操作
|
||||
if operation == "Repeat":
|
||||
times = step.get("times", step.get("parameters", {}).get("times", 1))
|
||||
sub_steps = step.get("steps", step.get("parameters", {}).get("steps", []))
|
||||
for i in range(int(times)):
|
||||
sub_data = refactor_data(sub_steps)
|
||||
refactored_data.extend(sub_data)
|
||||
continue
|
||||
|
||||
# 获取模板名称
|
||||
template = OPERATION_MAPPING.get(operation)
|
||||
if not template:
|
||||
# 自动推断模板类型
|
||||
if operation.lower() in ["transfer", "incubation", "move_labware", "oscillation"]:
|
||||
template = f"SynBioFactory-liquid_handler.biomek-{operation}"
|
||||
else:
|
||||
template = f"SynBioFactory-workstation-{operation}Protocol"
|
||||
|
||||
# 创建步骤数据
|
||||
step_data = {
|
||||
"template": template,
|
||||
"description": step.get("description", step.get("purpose", f"{operation} operation")),
|
||||
"lab_node_type": "Device",
|
||||
"parameters": step.get("parameters", step.get("action_args", {})),
|
||||
}
|
||||
refactored_data.append(step_data)
|
||||
|
||||
return refactored_data
|
||||
|
||||
|
||||
def build_protocol_graph(
|
||||
labware_info: List[Dict[str, Any]], protocol_steps: List[Dict[str, Any]], workstation_name: str
|
||||
) -> SimpleGraph:
|
||||
"""统一的协议图构建函数,根据设备类型自动选择构建逻辑"""
|
||||
G = SimpleGraph()
|
||||
resource_last_writer = {}
|
||||
LAB_NAME = "SynBioFactory"
|
||||
|
||||
protocol_steps = refactor_data(protocol_steps)
|
||||
|
||||
# 检查协议步骤中的模板来判断协议类型
|
||||
has_biomek_template = any(
|
||||
("biomek" in step.get("template", "")) or ("prcxi" in step.get("template", ""))
|
||||
for step in protocol_steps
|
||||
)
|
||||
|
||||
if has_biomek_template:
|
||||
# 生物实验协议图构建
|
||||
for labware_id, labware in labware_info.items():
|
||||
node_id = str(uuid.uuid4())
|
||||
|
||||
labware_attrs = labware.copy()
|
||||
labware_id = labware_attrs.pop("id", labware_attrs.get("name", f"labware_{uuid.uuid4()}"))
|
||||
labware_attrs["description"] = labware_id
|
||||
labware_attrs["lab_node_type"] = (
|
||||
"Reagent" if "Plate" in str(labware_id) else "Labware" if "Rack" in str(labware_id) else "Sample"
|
||||
)
|
||||
labware_attrs["device_id"] = workstation_name
|
||||
|
||||
G.add_node(node_id, template=f"{LAB_NAME}-host_node-create_resource", **labware_attrs)
|
||||
resource_last_writer[labware_id] = f"{node_id}:labware"
|
||||
|
||||
# 处理协议步骤
|
||||
prev_node = None
|
||||
for i, step in enumerate(protocol_steps):
|
||||
node_id = str(uuid.uuid4())
|
||||
G.add_node(node_id, **step)
|
||||
|
||||
# 添加控制流边
|
||||
if prev_node is not None:
|
||||
G.add_edge(prev_node, node_id, source_port="ready", target_port="ready")
|
||||
prev_node = node_id
|
||||
|
||||
# 处理物料流
|
||||
params = step.get("parameters", {})
|
||||
if "sources" in params and params["sources"] in resource_last_writer:
|
||||
source_node, source_port = resource_last_writer[params["sources"]].split(":")
|
||||
G.add_edge(source_node, node_id, source_port=source_port, target_port="labware")
|
||||
|
||||
if "targets" in params:
|
||||
resource_last_writer[params["targets"]] = f"{node_id}:labware"
|
||||
|
||||
# 添加协议结束节点
|
||||
end_id = str(uuid.uuid4())
|
||||
G.add_node(end_id, template=f"{LAB_NAME}-liquid_handler.biomek-run_protocol")
|
||||
if prev_node is not None:
|
||||
G.add_edge(prev_node, end_id, source_port="ready", target_port="ready")
|
||||
|
||||
else:
|
||||
# 有机化学协议图构建
|
||||
WORKSTATION_ID = workstation_name
|
||||
|
||||
# 为所有labware创建资源节点
|
||||
for item_id, item in labware_info.items():
|
||||
# item_id = item.get("id") or item.get("name", f"item_{uuid.uuid4()}")
|
||||
node_id = str(uuid.uuid4())
|
||||
|
||||
# 判断节点类型
|
||||
if item.get("type") == "hardware" or "reactor" in str(item_id).lower():
|
||||
if "reactor" not in str(item_id).lower():
|
||||
continue
|
||||
lab_node_type = "Sample"
|
||||
description = f"Prepare Reactor: {item_id}"
|
||||
liquid_type = []
|
||||
liquid_volume = []
|
||||
else:
|
||||
lab_node_type = "Reagent"
|
||||
description = f"Add Reagent to Flask: {item_id}"
|
||||
liquid_type = [item_id]
|
||||
liquid_volume = [1e5]
|
||||
|
||||
G.add_node(
|
||||
node_id,
|
||||
template=f"{LAB_NAME}-host_node-create_resource",
|
||||
description=description,
|
||||
lab_node_type=lab_node_type,
|
||||
res_id=item_id,
|
||||
device_id=WORKSTATION_ID,
|
||||
class_name="container",
|
||||
parent=WORKSTATION_ID,
|
||||
bind_locations={"x": 0.0, "y": 0.0, "z": 0.0},
|
||||
liquid_input_slot=[-1],
|
||||
liquid_type=liquid_type,
|
||||
liquid_volume=liquid_volume,
|
||||
slot_on_deck="",
|
||||
role=item.get("role", ""),
|
||||
)
|
||||
resource_last_writer[item_id] = f"{node_id}:labware"
|
||||
|
||||
last_control_node_id = None
|
||||
|
||||
# 处理协议步骤
|
||||
for step in protocol_steps:
|
||||
node_id = str(uuid.uuid4())
|
||||
G.add_node(node_id, **step)
|
||||
|
||||
# 控制流
|
||||
if last_control_node_id is not None:
|
||||
G.add_edge(last_control_node_id, node_id, source_port="ready", target_port="ready")
|
||||
last_control_node_id = node_id
|
||||
|
||||
# 物料流
|
||||
params = step.get("parameters", {})
|
||||
input_resources = {
|
||||
"Vessel": params.get("vessel"),
|
||||
"ToVessel": params.get("to_vessel"),
|
||||
"FromVessel": params.get("from_vessel"),
|
||||
"reagent": params.get("reagent"),
|
||||
"solvent": params.get("solvent"),
|
||||
"compound": params.get("compound"),
|
||||
"sources": params.get("sources"),
|
||||
"targets": params.get("targets"),
|
||||
}
|
||||
|
||||
for target_port, resource_name in input_resources.items():
|
||||
if resource_name and resource_name in resource_last_writer:
|
||||
source_node, source_port = resource_last_writer[resource_name].split(":")
|
||||
G.add_edge(source_node, node_id, source_port=source_port, target_port=target_port)
|
||||
|
||||
output_resources = {
|
||||
"VesselOut": params.get("vessel"),
|
||||
"FromVesselOut": params.get("from_vessel"),
|
||||
"ToVesselOut": params.get("to_vessel"),
|
||||
"FiltrateOut": params.get("filtrate_vessel"),
|
||||
"reagent": params.get("reagent"),
|
||||
"solvent": params.get("solvent"),
|
||||
"compound": params.get("compound"),
|
||||
"sources_out": params.get("sources"),
|
||||
"targets_out": params.get("targets"),
|
||||
}
|
||||
|
||||
for source_port, resource_name in output_resources.items():
|
||||
if resource_name:
|
||||
resource_last_writer[resource_name] = f"{node_id}:{source_port}"
|
||||
|
||||
return G
|
||||
|
||||
|
||||
def draw_protocol_graph(protocol_graph: SimpleGraph, output_path: str):
|
||||
"""
|
||||
(辅助功能) 使用 networkx 和 matplotlib 绘制协议工作流图,用于可视化。
|
||||
"""
|
||||
if not protocol_graph:
|
||||
print("Cannot draw graph: Graph object is empty.")
|
||||
return
|
||||
|
||||
G = nx.DiGraph()
|
||||
|
||||
for node_id, attrs in protocol_graph.nodes.items():
|
||||
label = attrs.get("description", attrs.get("template", node_id[:8]))
|
||||
G.add_node(node_id, label=label, **attrs)
|
||||
|
||||
for edge in protocol_graph.edges:
|
||||
G.add_edge(edge["source"], edge["target"])
|
||||
|
||||
plt.figure(figsize=(20, 15))
|
||||
try:
|
||||
pos = nx.nx_agraph.graphviz_layout(G, prog="dot")
|
||||
except Exception:
|
||||
pos = nx.shell_layout(G) # Fallback layout
|
||||
|
||||
node_labels = {node: data["label"] for node, data in G.nodes(data=True)}
|
||||
nx.draw(
|
||||
G,
|
||||
pos,
|
||||
with_labels=False,
|
||||
node_size=2500,
|
||||
node_color="skyblue",
|
||||
node_shape="o",
|
||||
edge_color="gray",
|
||||
width=1.5,
|
||||
arrowsize=15,
|
||||
)
|
||||
nx.draw_networkx_labels(G, pos, labels=node_labels, font_size=8, font_weight="bold")
|
||||
|
||||
plt.title("Chemical Protocol Workflow Graph", size=15)
|
||||
plt.savefig(output_path, dpi=300, bbox_inches="tight")
|
||||
plt.close()
|
||||
print(f" - Visualization saved to '{output_path}'")
|
||||
|
||||
|
||||
from networkx.drawing.nx_agraph import to_agraph
|
||||
import re
|
||||
|
||||
COMPASS = {"n","e","s","w","ne","nw","se","sw","c"}
|
||||
|
||||
def _is_compass(port: str) -> bool:
|
||||
return isinstance(port, str) and port.lower() in COMPASS
|
||||
|
||||
def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: str = "LR"):
|
||||
"""
|
||||
使用 Graphviz 端口语法绘制协议工作流图。
|
||||
- 若边上的 source_port/target_port 是 compass(n/e/s/w/...),直接用 compass。
|
||||
- 否则自动为节点创建 record 形状并定义命名端口 <portname>。
|
||||
最终由 PyGraphviz 渲染并输出到 output_path(后缀决定格式,如 .png/.svg/.pdf)。
|
||||
"""
|
||||
if not protocol_graph:
|
||||
print("Cannot draw graph: Graph object is empty.")
|
||||
return
|
||||
|
||||
# 1) 先用 networkx 搭建有向图,保留端口属性
|
||||
G = nx.DiGraph()
|
||||
for node_id, attrs in protocol_graph.nodes.items():
|
||||
label = attrs.get("description", attrs.get("template", node_id[:8]))
|
||||
# 保留一个干净的“中心标签”,用于放在 record 的中间槽
|
||||
G.add_node(node_id, _core_label=str(label), **{k:v for k,v in attrs.items() if k not in ("label",)})
|
||||
|
||||
edges_data = []
|
||||
in_ports_by_node = {} # 收集命名输入端口
|
||||
out_ports_by_node = {} # 收集命名输出端口
|
||||
|
||||
for edge in protocol_graph.edges:
|
||||
u = edge["source"]
|
||||
v = edge["target"]
|
||||
sp = edge.get("source_port")
|
||||
tp = edge.get("target_port")
|
||||
|
||||
# 记录到图里(保留原始端口信息)
|
||||
G.add_edge(u, v, source_port=sp, target_port=tp)
|
||||
edges_data.append((u, v, sp, tp))
|
||||
|
||||
# 如果不是 compass,就按“命名端口”先归类,等会儿给节点造 record
|
||||
if sp and not _is_compass(sp):
|
||||
out_ports_by_node.setdefault(u, set()).add(str(sp))
|
||||
if tp and not _is_compass(tp):
|
||||
in_ports_by_node.setdefault(v, set()).add(str(tp))
|
||||
|
||||
# 2) 转为 AGraph,使用 Graphviz 渲染
|
||||
A = to_agraph(G)
|
||||
A.graph_attr.update(rankdir=rankdir, splines="true", concentrate="false", fontsize="10")
|
||||
A.node_attr.update(shape="box", style="rounded,filled", fillcolor="lightyellow", color="#999999", fontname="Helvetica")
|
||||
A.edge_attr.update(arrowsize="0.8", color="#666666")
|
||||
|
||||
# 3) 为需要命名端口的节点设置 record 形状与 label
|
||||
# 左列 = 输入端口;中间 = 核心标签;右列 = 输出端口
|
||||
for n in A.nodes():
|
||||
node = A.get_node(n)
|
||||
core = G.nodes[n].get("_core_label", n)
|
||||
|
||||
in_ports = sorted(in_ports_by_node.get(n, []))
|
||||
out_ports = sorted(out_ports_by_node.get(n, []))
|
||||
|
||||
# 如果该节点涉及命名端口,则用 record;否则保留原 box
|
||||
if in_ports or out_ports:
|
||||
def port_fields(ports):
|
||||
if not ports:
|
||||
return " " # 必须留一个空槽占位
|
||||
# 每个端口一个小格子,<p> name
|
||||
return "|".join(f"<{re.sub(r'[^A-Za-z0-9_:.|-]', '_', p)}> {p}" for p in ports)
|
||||
|
||||
left = port_fields(in_ports)
|
||||
right = port_fields(out_ports)
|
||||
|
||||
# 三栏:左(入) | 中(节点名) | 右(出)
|
||||
record_label = f"{{ {left} | {core} | {right} }}"
|
||||
node.attr.update(shape="record", label=record_label)
|
||||
else:
|
||||
# 没有命名端口:普通盒子,显示核心标签
|
||||
node.attr.update(label=str(core))
|
||||
|
||||
# 4) 给边设置 headport / tailport
|
||||
# - 若端口为 compass:直接用 compass(e.g., headport="e")
|
||||
# - 若端口为命名端口:使用在 record 中定义的 <port> 名(同名即可)
|
||||
for (u, v, sp, tp) in edges_data:
|
||||
e = A.get_edge(u, v)
|
||||
|
||||
# Graphviz 属性:tail 是源,head 是目标
|
||||
if sp:
|
||||
if _is_compass(sp):
|
||||
e.attr["tailport"] = sp.lower()
|
||||
else:
|
||||
# 与 record label 中 <port> 名一致;特殊字符已在 label 中做了清洗
|
||||
e.attr["tailport"] = re.sub(r'[^A-Za-z0-9_:.|-]', '_', str(sp))
|
||||
|
||||
if tp:
|
||||
if _is_compass(tp):
|
||||
e.attr["headport"] = tp.lower()
|
||||
else:
|
||||
e.attr["headport"] = re.sub(r'[^A-Za-z0-9_:.|-]', '_', str(tp))
|
||||
|
||||
# 可选:若想让边更贴边缘,可设置 constraint/spline 等
|
||||
# e.attr["arrowhead"] = "vee"
|
||||
|
||||
# 5) 输出
|
||||
A.draw(output_path, prog="dot")
|
||||
print(f" - Port-aware workflow rendered to '{output_path}'")
|
||||
|
||||
|
||||
def flatten_xdl_procedure(procedure_elem: ET.Element) -> List[ET.Element]:
|
||||
"""展平嵌套的XDL程序结构"""
|
||||
flattened_operations = []
|
||||
TEMP_UNSUPPORTED_PROTOCOL = ["Purge", "Wait", "Stir", "ResetHandling"]
|
||||
|
||||
def extract_operations(element: ET.Element):
|
||||
if element.tag not in ["Prep", "Reaction", "Workup", "Purification", "Procedure"]:
|
||||
if element.tag not in TEMP_UNSUPPORTED_PROTOCOL:
|
||||
flattened_operations.append(element)
|
||||
|
||||
for child in element:
|
||||
extract_operations(child)
|
||||
|
||||
for child in procedure_elem:
|
||||
extract_operations(child)
|
||||
|
||||
return flattened_operations
|
||||
|
||||
|
||||
def parse_xdl_content(xdl_content: str) -> tuple:
|
||||
"""解析XDL内容"""
|
||||
try:
|
||||
xdl_content_cleaned = "".join(c for c in xdl_content if c.isprintable())
|
||||
root = ET.fromstring(xdl_content_cleaned)
|
||||
|
||||
synthesis_elem = root.find("Synthesis")
|
||||
if synthesis_elem is None:
|
||||
return None, None, None
|
||||
|
||||
# 解析硬件组件
|
||||
hardware_elem = synthesis_elem.find("Hardware")
|
||||
hardware = []
|
||||
if hardware_elem is not None:
|
||||
hardware = [{"id": c.get("id"), "type": c.get("type")} for c in hardware_elem.findall("Component")]
|
||||
|
||||
# 解析试剂
|
||||
reagents_elem = synthesis_elem.find("Reagents")
|
||||
reagents = []
|
||||
if reagents_elem is not None:
|
||||
reagents = [{"name": r.get("name"), "role": r.get("role", "")} for r in reagents_elem.findall("Reagent")]
|
||||
|
||||
# 解析程序
|
||||
procedure_elem = synthesis_elem.find("Procedure")
|
||||
if procedure_elem is None:
|
||||
return None, None, None
|
||||
|
||||
flattened_operations = flatten_xdl_procedure(procedure_elem)
|
||||
return hardware, reagents, flattened_operations
|
||||
|
||||
except ET.ParseError as e:
|
||||
raise ValueError(f"Invalid XDL format: {e}")
|
||||
|
||||
|
||||
def convert_xdl_to_dict(xdl_content: str) -> Dict[str, Any]:
|
||||
"""
|
||||
将XDL XML格式转换为标准的字典格式
|
||||
|
||||
Args:
|
||||
xdl_content: XDL XML内容
|
||||
|
||||
Returns:
|
||||
转换结果,包含步骤和器材信息
|
||||
"""
|
||||
try:
|
||||
hardware, reagents, flattened_operations = parse_xdl_content(xdl_content)
|
||||
if hardware is None:
|
||||
return {"error": "Failed to parse XDL content", "success": False}
|
||||
|
||||
# 将XDL元素转换为字典格式
|
||||
steps_data = []
|
||||
for elem in flattened_operations:
|
||||
# 转换参数类型
|
||||
parameters = {}
|
||||
for key, val in elem.attrib.items():
|
||||
converted_val = convert_to_type(val)
|
||||
if converted_val is not None:
|
||||
parameters[key] = converted_val
|
||||
|
||||
step_dict = {
|
||||
"operation": elem.tag,
|
||||
"parameters": parameters,
|
||||
"description": elem.get("purpose", f"Operation: {elem.tag}"),
|
||||
}
|
||||
steps_data.append(step_dict)
|
||||
|
||||
# 合并硬件和试剂为统一的labware_info格式
|
||||
labware_data = []
|
||||
labware_data.extend({"id": hw["id"], "type": "hardware", **hw} for hw in hardware)
|
||||
labware_data.extend({"name": reagent["name"], "type": "reagent", **reagent} for reagent in reagents)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"steps": steps_data,
|
||||
"labware": labware_data,
|
||||
"message": f"Successfully converted XDL to dict format. Found {len(steps_data)} steps and {len(labware_data)} labware items.",
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"XDL conversion failed: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
return {"error": error_msg, "success": False}
|
||||
|
||||
|
||||
def create_workflow(
|
||||
|
||||
2
setup.py
2
setup.py
@@ -4,7 +4,7 @@ package_name = 'unilabos'
|
||||
|
||||
setup(
|
||||
name=package_name,
|
||||
version='0.11.1',
|
||||
version='0.11.2',
|
||||
packages=find_packages(),
|
||||
include_package_data=True,
|
||||
install_requires=['setuptools'],
|
||||
|
||||
@@ -1 +1 @@
|
||||
__version__ = "0.11.1"
|
||||
__version__ = "0.11.2"
|
||||
|
||||
@@ -10,29 +10,170 @@ import shutil
|
||||
import sys
|
||||
|
||||
|
||||
_PATCH_MARKER = "# UniLabOS DLL Patch"
|
||||
_PATCH_END_MARKER = "# End UniLabOS DLL Patch"
|
||||
|
||||
# 75 = EX_TEMPFAIL: 临时失败、重试即可,避免与业务退出码冲突
|
||||
_RESTART_EXIT_CODE = 75
|
||||
|
||||
|
||||
def _build_dll_patch(lib_bin: str, preload_pyd: str = "") -> str:
|
||||
"""生成一段加在目标文件顶部的 DLL 加载补丁源码。
|
||||
|
||||
- 始终把 ``lib_bin`` 加入 DLL 搜索路径,并把 handle 挂在模块属性上,
|
||||
防止 GC 清掉搜索路径(``os.add_dll_directory`` 的句柄被回收时
|
||||
目录会被移除)。
|
||||
- 可选地用 ``ctypes.CDLL`` 预加载一个 .pyd,把它的依赖 DLL 提前装入
|
||||
进程内存,作为 ``rclpy._rclpy_pybind11`` 这类首次加载点的兜底。
|
||||
"""
|
||||
# 用 repr() 序列化路径:Python 解析 repr 的结果会还原成原始字符串,
|
||||
# 不需要也不能再叠加 raw-string 前缀(叠了反而会让 \\ 变成两个反斜杠)。
|
||||
lines = [
|
||||
_PATCH_MARKER,
|
||||
"import os as _ulab_os",
|
||||
f"_ulab_p = {lib_bin!r}",
|
||||
'if hasattr(_ulab_os, "add_dll_directory") and _ulab_os.path.isdir(_ulab_p):',
|
||||
" try: _UNILAB_DLL_HANDLE = _ulab_os.add_dll_directory(_ulab_p)",
|
||||
" except Exception: _UNILAB_DLL_HANDLE = None",
|
||||
]
|
||||
if preload_pyd:
|
||||
lines.extend(
|
||||
[
|
||||
"import ctypes as _ulab_ctypes",
|
||||
f"try: _ulab_ctypes.CDLL({preload_pyd!r})",
|
||||
"except Exception: pass",
|
||||
]
|
||||
)
|
||||
lines.append(_PATCH_END_MARKER)
|
||||
return "\n".join(lines) + "\n"
|
||||
|
||||
|
||||
def _apply_dll_patch(file_path: str, lib_bin: str, preload_pyd: str = "") -> bool:
|
||||
"""把 DLL 补丁前置到 ``file_path``。文件不存在或已打过补丁则返回 False。"""
|
||||
if not os.path.isfile(file_path):
|
||||
return False
|
||||
with open(file_path, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
if _PATCH_MARKER in content:
|
||||
return False
|
||||
shutil.copy2(file_path, file_path + ".bak")
|
||||
with open(file_path, "w", encoding="utf-8") as f:
|
||||
f.write(_build_dll_patch(lib_bin, preload_pyd) + content)
|
||||
return True
|
||||
|
||||
|
||||
def _print_restart_banner(patched_files):
|
||||
"""打印重启提示并以 EX_TEMPFAIL 退出。
|
||||
|
||||
- 不使用 ANSI 颜色码:Windows 旧版 cmd / PowerShell 5 默认不开 VT 处理,
|
||||
会把 ``\\033[1;33m`` 当做字面字符显示,反而让用户看不到正文。
|
||||
- 同时写入 stderr 与 stdout:某些上层 launcher / supervisor 只重定向
|
||||
其中一路,写两遍能保证用户至少看到一份。
|
||||
- 写入前防御性把流切到 UTF-8 with replace:``main.py`` 里已经做过一次,
|
||||
但本模块也可能被绕过 ``main.py`` 的代码路径直接 import;reconfigure
|
||||
失败也只是退回 errors=replace,不影响整体流程。
|
||||
"""
|
||||
if sys.platform == "win32":
|
||||
for _stream in (sys.stdout, sys.stderr):
|
||||
try:
|
||||
_stream.reconfigure(encoding="utf-8", errors="replace") # type: ignore[attr-defined]
|
||||
except (AttributeError, OSError):
|
||||
pass
|
||||
|
||||
bar = "#" * 78
|
||||
files_lines = [f"[UniLabOS] - {p}" for p in patched_files]
|
||||
body = "\n".join(
|
||||
[
|
||||
"",
|
||||
bar,
|
||||
bar,
|
||||
"##",
|
||||
"## [UniLabOS] Windows + conda 下检测到 DLL 加载失败,已自动打补丁。",
|
||||
"## [UniLabOS] DLL load failure detected on Windows + conda;",
|
||||
"## [UniLabOS] the following files have been auto-patched:",
|
||||
"##",
|
||||
*[f"## {line}" for line in files_lines],
|
||||
"##",
|
||||
"## [UniLabOS] 当前进程的 rclpy 状态已损坏,补丁需要在新进程才生效。",
|
||||
"## [UniLabOS] The current process is unusable; the patch only takes",
|
||||
"## [UniLabOS] effect on a fresh process.",
|
||||
"##",
|
||||
"## >>> 请重新运行刚才的命令 / Please re-run the same command. <<<",
|
||||
"##",
|
||||
bar,
|
||||
bar,
|
||||
"",
|
||||
]
|
||||
)
|
||||
|
||||
for stream in (sys.stderr, sys.stdout):
|
||||
try:
|
||||
stream.write(body)
|
||||
stream.flush()
|
||||
except Exception:
|
||||
try:
|
||||
print(body, file=stream)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
sys.exit(_RESTART_EXIT_CODE)
|
||||
|
||||
|
||||
def patch_rclpy_dll_windows():
|
||||
"""在 Windows + conda 环境下为 rclpy 打 DLL 加载补丁"""
|
||||
"""在 Windows + conda 环境下修复 rclpy / rosidl typesupport 的 DLL 加载。
|
||||
|
||||
背景:conda 安装的 ros 系列包,其原生扩展依赖 ``$CONDA_PREFIX/Library/bin``
|
||||
下的 DLL;只有 conda 环境被正确激活、且 PATH 中含 ``Library/bin`` 时,
|
||||
``os.add_dll_directory`` 才能找到它们。当从快捷方式 / IDE / 子进程 /
|
||||
没激活的 shell 启动 ``unilab`` 时,会出现 ``DLL load failed``。
|
||||
|
||||
本函数会:
|
||||
1) 修补 ``rclpy/impl/implementation_singleton.py`` —— rclpy 自身的 C 扩展入口;
|
||||
2) 修补 ``rpyutils/add_dll_directories.py`` —— 所有 ``*_s__rosidl_typesupport_c.pyd``
|
||||
(``geometry_msgs`` / ``std_msgs`` / ``sensor_msgs`` 等)的统一加载入口。
|
||||
|
||||
打完补丁后**必须重启进程**才能生效(当前进程的 rclpy 已经发生过
|
||||
``ImportError``,子模块仍处于损坏状态)。因此函数会主动退出,并在
|
||||
stdout/stderr 同时打印明显的重启提示,避免用户被后续报错淹没。
|
||||
"""
|
||||
if sys.platform != "win32" or not os.environ.get("CONDA_PREFIX"):
|
||||
return
|
||||
|
||||
try:
|
||||
import rclpy
|
||||
import rclpy # noqa: F401
|
||||
|
||||
return
|
||||
except ImportError as e:
|
||||
if not str(e).startswith("DLL load failed"):
|
||||
return
|
||||
|
||||
cp = os.environ["CONDA_PREFIX"]
|
||||
impl = os.path.join(cp, "Lib", "site-packages", "rclpy", "impl", "implementation_singleton.py")
|
||||
pyd = glob.glob(os.path.join(cp, "Lib", "site-packages", "rclpy", "_rclpy_pybind11*.pyd"))
|
||||
if not os.path.exists(impl) or not pyd:
|
||||
lib_bin = os.path.join(cp, "Library", "bin")
|
||||
site_packages = os.path.join(cp, "Lib", "site-packages")
|
||||
if not os.path.isdir(lib_bin):
|
||||
return
|
||||
with open(impl, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
lib_bin = os.path.join(cp, "Library", "bin").replace("\\", "/")
|
||||
patch = f'# UniLabOS DLL Patch\nimport os,ctypes\nos.add_dll_directory("{lib_bin}") if hasattr(os,"add_dll_directory") else None\ntry: ctypes.CDLL("{pyd[0].replace(chr(92),"/")}")\nexcept: pass\n# End Patch\n'
|
||||
shutil.copy2(impl, impl + ".bak")
|
||||
with open(impl, "w", encoding="utf-8") as f:
|
||||
f.write(patch + content)
|
||||
|
||||
patched = []
|
||||
|
||||
# 1) rclpy 自身的入口
|
||||
rclpy_impl = os.path.join(site_packages, "rclpy", "impl", "implementation_singleton.py")
|
||||
rclpy_pyd_matches = glob.glob(os.path.join(site_packages, "rclpy", "_rclpy_pybind11*.pyd"))
|
||||
rclpy_pyd = rclpy_pyd_matches[0] if rclpy_pyd_matches else ""
|
||||
if rclpy_pyd and _apply_dll_patch(rclpy_impl, lib_bin, preload_pyd=rclpy_pyd):
|
||||
patched.append(rclpy_impl)
|
||||
|
||||
# 2) rpyutils —— 所有 rosidl typesupport pyd 的加载点;放在 rclpy 之后
|
||||
# 例:geometry_msgs/geometry_msgs_s__rosidl_typesupport_c.pyd
|
||||
rpyutils_dll = os.path.join(site_packages, "rpyutils", "add_dll_directories.py")
|
||||
if _apply_dll_patch(rpyutils_dll, lib_bin):
|
||||
patched.append(rpyutils_dll)
|
||||
|
||||
if not patched:
|
||||
# 已经打过补丁但 rclpy 仍然加载失败:原因不是缺 DLL 搜索路径,
|
||||
# 不要再次打补丁污染文件,让上层看到真实的 ImportError。
|
||||
return
|
||||
|
||||
_print_restart_banner(patched)
|
||||
|
||||
|
||||
patch_rclpy_dll_windows()
|
||||
|
||||
@@ -42,7 +42,7 @@ def canonicalize_nodes_data(
|
||||
Returns:
|
||||
ResourceTreeSet: 标准化后的资源树集合
|
||||
"""
|
||||
print_status(f"{len(nodes)} Resources loaded", "info")
|
||||
print_status(f"{len(nodes)} Resources loaded:", "info")
|
||||
|
||||
# 第一步:基本预处理(处理graphml的label字段)
|
||||
outer_host_node_id = None
|
||||
|
||||
@@ -47,7 +47,10 @@ def _has_uv() -> bool:
|
||||
|
||||
def _install_command(installer: str, package: str, upgrade: bool, is_chinese: bool) -> List[str]:
|
||||
if installer == "uv":
|
||||
cmd = ["uv", "pip", "install"]
|
||||
# uv >= 0.5 默认要求虚拟环境,对 conda env 会报 "No virtual environment found"。
|
||||
# 显式 --python sys.executable 让 uv 把当前解释器(conda/venv/system 都行)
|
||||
# 视为目标环境,绕开 venv 检测。
|
||||
cmd = ["uv", "pip", "install", "--python", sys.executable]
|
||||
if upgrade:
|
||||
cmd.append("--upgrade")
|
||||
cmd.append(package)
|
||||
@@ -89,7 +92,11 @@ def _print_manual_git_install_hint(requirement: str) -> None:
|
||||
return
|
||||
|
||||
repo_dir = _repo_dir_name(git_url)
|
||||
install_cmd = "uv pip install -e ." if _has_uv() else f"{sys.executable} -m pip install -e ."
|
||||
install_cmd = (
|
||||
f'uv pip install --python "{sys.executable}" -e .'
|
||||
if _has_uv()
|
||||
else f"{sys.executable} -m pip install -e ."
|
||||
)
|
||||
if _is_chinese_locale() and not _has_uv():
|
||||
install_cmd += " -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple"
|
||||
|
||||
|
||||
@@ -1,241 +0,0 @@
|
||||
import ast
|
||||
import json
|
||||
from typing import Dict, List, Any, Tuple, Optional
|
||||
|
||||
from .common import WorkflowGraph, RegistryAdapter
|
||||
|
||||
Json = Dict[str, Any]
|
||||
|
||||
# ---------------- Converter ----------------
|
||||
|
||||
class DeviceMethodConverter:
|
||||
"""
|
||||
- 字段统一:resource_name(原 device_class)、template_name(原 action_key)
|
||||
- params 单层;inputs 使用 'params.' 前缀
|
||||
- SimpleGraph.add_workflow_node 负责变量连线与边
|
||||
"""
|
||||
def __init__(self, device_registry: Optional[Dict[str, Any]] = None):
|
||||
self.graph = WorkflowGraph()
|
||||
self.variable_sources: Dict[str, Dict[str, Any]] = {} # var -> {node_id, output_name}
|
||||
self.instance_to_resource: Dict[str, Optional[str]] = {} # 实例名 -> resource_name
|
||||
self.node_id_counter: int = 0
|
||||
self.registry = RegistryAdapter(device_registry or {})
|
||||
|
||||
# ---- helpers ----
|
||||
def _new_node_id(self) -> int:
|
||||
nid = self.node_id_counter
|
||||
self.node_id_counter += 1
|
||||
return nid
|
||||
|
||||
def _assign_targets(self, targets) -> List[str]:
|
||||
names: List[str] = []
|
||||
import ast
|
||||
if isinstance(targets, ast.Tuple):
|
||||
for elt in targets.elts:
|
||||
if isinstance(elt, ast.Name):
|
||||
names.append(elt.id)
|
||||
elif isinstance(targets, ast.Name):
|
||||
names.append(targets.id)
|
||||
return names
|
||||
|
||||
def _extract_device_instantiation(self, node) -> Optional[Tuple[str, str]]:
|
||||
import ast
|
||||
if not isinstance(node.value, ast.Call):
|
||||
return None
|
||||
callee = node.value.func
|
||||
if isinstance(callee, ast.Name):
|
||||
class_name = callee.id
|
||||
elif isinstance(callee, ast.Attribute) and isinstance(callee.value, ast.Name):
|
||||
class_name = callee.attr
|
||||
else:
|
||||
return None
|
||||
if isinstance(node.targets[0], ast.Name):
|
||||
instance = node.targets[0].id
|
||||
return instance, class_name
|
||||
return None
|
||||
|
||||
def _extract_call(self, call) -> Tuple[str, str, Dict[str, Any], str]:
|
||||
import ast
|
||||
owner_name, method_name, call_kind = "", "", "func"
|
||||
if isinstance(call.func, ast.Attribute):
|
||||
method_name = call.func.attr
|
||||
if isinstance(call.func.value, ast.Name):
|
||||
owner_name = call.func.value.id
|
||||
call_kind = "instance" if owner_name in self.instance_to_resource else "class_or_module"
|
||||
elif isinstance(call.func.value, ast.Attribute) and isinstance(call.func.value.value, ast.Name):
|
||||
owner_name = call.func.value.attr
|
||||
call_kind = "class_or_module"
|
||||
elif isinstance(call.func, ast.Name):
|
||||
method_name = call.func.id
|
||||
call_kind = "func"
|
||||
|
||||
def pack(node):
|
||||
if isinstance(node, ast.Name):
|
||||
return {"type": "variable", "value": node.id}
|
||||
if isinstance(node, ast.Constant):
|
||||
return {"type": "constant", "value": node.value}
|
||||
if isinstance(node, ast.Dict):
|
||||
return {"type": "dict", "value": self._parse_dict(node)}
|
||||
if isinstance(node, ast.List):
|
||||
return {"type": "list", "value": self._parse_list(node)}
|
||||
return {"type": "raw", "value": ast.unparse(node) if hasattr(ast, "unparse") else str(node)}
|
||||
|
||||
args: Dict[str, Any] = {}
|
||||
pos: List[Any] = []
|
||||
for a in call.args:
|
||||
pos.append(pack(a))
|
||||
for kw in call.keywords:
|
||||
args[kw.arg] = pack(kw.value)
|
||||
if pos:
|
||||
args["_positional"] = pos
|
||||
return owner_name, method_name, args, call_kind
|
||||
|
||||
def _parse_dict(self, node) -> Dict[str, Any]:
|
||||
import ast
|
||||
out: Dict[str, Any] = {}
|
||||
for k, v in zip(node.keys, node.values):
|
||||
if isinstance(k, ast.Constant):
|
||||
key = str(k.value)
|
||||
if isinstance(v, ast.Name):
|
||||
out[key] = f"var:{v.id}"
|
||||
elif isinstance(v, ast.Constant):
|
||||
out[key] = v.value
|
||||
elif isinstance(v, ast.Dict):
|
||||
out[key] = self._parse_dict(v)
|
||||
elif isinstance(v, ast.List):
|
||||
out[key] = self._parse_list(v)
|
||||
return out
|
||||
|
||||
def _parse_list(self, node) -> List[Any]:
|
||||
import ast
|
||||
out: List[Any] = []
|
||||
for elt in node.elts:
|
||||
if isinstance(elt, ast.Name):
|
||||
out.append(f"var:{elt.id}")
|
||||
elif isinstance(elt, ast.Constant):
|
||||
out.append(elt.value)
|
||||
elif isinstance(elt, ast.Dict):
|
||||
out.append(self._parse_dict(elt))
|
||||
elif isinstance(elt, ast.List):
|
||||
out.append(self._parse_list(elt))
|
||||
return out
|
||||
|
||||
def _normalize_var_tokens(self, x: Any) -> Any:
|
||||
if isinstance(x, str) and x.startswith("var:"):
|
||||
return {"__var__": x[4:]}
|
||||
if isinstance(x, list):
|
||||
return [self._normalize_var_tokens(i) for i in x]
|
||||
if isinstance(x, dict):
|
||||
return {k: self._normalize_var_tokens(v) for k, v in x.items()}
|
||||
return x
|
||||
|
||||
def _make_params_payload(self, resource_name: Optional[str], template_name: str, call_args: Dict[str, Any]) -> Dict[str, Any]:
|
||||
input_keys = self.registry.get_action_input_keys(resource_name, template_name) if resource_name else []
|
||||
defaults = self.registry.get_action_goal_default(resource_name, template_name) if resource_name else {}
|
||||
params: Dict[str, Any] = dict(defaults)
|
||||
|
||||
def unpack(p):
|
||||
t, v = p.get("type"), p.get("value")
|
||||
if t == "variable":
|
||||
return {"__var__": v}
|
||||
if t == "dict":
|
||||
return self._normalize_var_tokens(v)
|
||||
if t == "list":
|
||||
return self._normalize_var_tokens(v)
|
||||
return v
|
||||
|
||||
for k, p in call_args.items():
|
||||
if k == "_positional":
|
||||
continue
|
||||
params[k] = unpack(p)
|
||||
|
||||
pos = call_args.get("_positional", [])
|
||||
if pos:
|
||||
if input_keys:
|
||||
for i, p in enumerate(pos):
|
||||
if i >= len(input_keys):
|
||||
break
|
||||
name = input_keys[i]
|
||||
if name in params:
|
||||
continue
|
||||
params[name] = unpack(p)
|
||||
else:
|
||||
for i, p in enumerate(pos):
|
||||
params[f"arg_{i}"] = unpack(p)
|
||||
return params
|
||||
|
||||
# ---- handlers ----
|
||||
def _on_assign(self, stmt):
|
||||
import ast
|
||||
inst = self._extract_device_instantiation(stmt)
|
||||
if inst:
|
||||
instance, code_class = inst
|
||||
resource_name = self.registry.resolve_resource_by_classname(code_class)
|
||||
self.instance_to_resource[instance] = resource_name
|
||||
return
|
||||
|
||||
if isinstance(stmt.value, ast.Call):
|
||||
owner, method, call_args, kind = self._extract_call(stmt.value)
|
||||
if kind == "instance":
|
||||
device_key = owner
|
||||
resource_name = self.instance_to_resource.get(owner)
|
||||
else:
|
||||
device_key = owner
|
||||
resource_name = self.registry.resolve_resource_by_classname(owner)
|
||||
|
||||
module = self.registry.get_device_module(resource_name)
|
||||
params = self._make_params_payload(resource_name, method, call_args)
|
||||
|
||||
nid = self._new_node_id()
|
||||
self.graph.add_workflow_node(
|
||||
nid,
|
||||
device_key=device_key,
|
||||
resource_name=resource_name, # ✅
|
||||
module=module,
|
||||
template_name=method, # ✅
|
||||
params=params,
|
||||
variable_sources=self.variable_sources,
|
||||
add_ready_if_no_vars=True,
|
||||
prev_node_id=(nid - 1) if nid > 0 else None,
|
||||
)
|
||||
|
||||
out_vars = self._assign_targets(stmt.targets[0])
|
||||
for var in out_vars:
|
||||
self.variable_sources[var] = {"node_id": nid, "output_name": "result"}
|
||||
|
||||
def _on_expr(self, stmt):
|
||||
import ast
|
||||
if not isinstance(stmt.value, ast.Call):
|
||||
return
|
||||
owner, method, call_args, kind = self._extract_call(stmt.value)
|
||||
if kind == "instance":
|
||||
device_key = owner
|
||||
resource_name = self.instance_to_resource.get(owner)
|
||||
else:
|
||||
device_key = owner
|
||||
resource_name = self.registry.resolve_resource_by_classname(owner)
|
||||
|
||||
module = self.registry.get_device_module(resource_name)
|
||||
params = self._make_params_payload(resource_name, method, call_args)
|
||||
|
||||
nid = self._new_node_id()
|
||||
self.graph.add_workflow_node(
|
||||
nid,
|
||||
device_key=device_key,
|
||||
resource_name=resource_name, # ✅
|
||||
module=module,
|
||||
template_name=method, # ✅
|
||||
params=params,
|
||||
variable_sources=self.variable_sources,
|
||||
add_ready_if_no_vars=True,
|
||||
prev_node_id=(nid - 1) if nid > 0 else None,
|
||||
)
|
||||
|
||||
def convert(self, python_code: str):
|
||||
tree = ast.parse(python_code)
|
||||
for stmt in tree.body:
|
||||
if isinstance(stmt, ast.Assign):
|
||||
self._on_assign(stmt)
|
||||
elif isinstance(stmt, ast.Expr):
|
||||
self._on_expr(stmt)
|
||||
return self
|
||||
@@ -1,131 +0,0 @@
|
||||
from typing import List, Any, Dict
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
|
||||
def convert_to_type(val: str) -> Any:
|
||||
"""将字符串值转换为适当的数据类型"""
|
||||
if val == "True":
|
||||
return True
|
||||
if val == "False":
|
||||
return False
|
||||
if val == "?":
|
||||
return None
|
||||
if val.endswith(" g"):
|
||||
return float(val.split(" ")[0])
|
||||
if val.endswith("mg"):
|
||||
return float(val.split("mg")[0])
|
||||
elif val.endswith("mmol"):
|
||||
return float(val.split("mmol")[0]) / 1000
|
||||
elif val.endswith("mol"):
|
||||
return float(val.split("mol")[0])
|
||||
elif val.endswith("ml"):
|
||||
return float(val.split("ml")[0])
|
||||
elif val.endswith("RPM"):
|
||||
return float(val.split("RPM")[0])
|
||||
elif val.endswith(" °C"):
|
||||
return float(val.split(" ")[0])
|
||||
elif val.endswith(" %"):
|
||||
return float(val.split(" ")[0])
|
||||
return val
|
||||
|
||||
|
||||
def flatten_xdl_procedure(procedure_elem: ET.Element) -> List[ET.Element]:
|
||||
"""展平嵌套的XDL程序结构"""
|
||||
flattened_operations = []
|
||||
TEMP_UNSUPPORTED_PROTOCOL = ["Purge", "Wait", "Stir", "ResetHandling"]
|
||||
|
||||
def extract_operations(element: ET.Element):
|
||||
if element.tag not in ["Prep", "Reaction", "Workup", "Purification", "Procedure"]:
|
||||
if element.tag not in TEMP_UNSUPPORTED_PROTOCOL:
|
||||
flattened_operations.append(element)
|
||||
|
||||
for child in element:
|
||||
extract_operations(child)
|
||||
|
||||
for child in procedure_elem:
|
||||
extract_operations(child)
|
||||
|
||||
return flattened_operations
|
||||
|
||||
|
||||
def parse_xdl_content(xdl_content: str) -> tuple:
|
||||
"""解析XDL内容"""
|
||||
try:
|
||||
xdl_content_cleaned = "".join(c for c in xdl_content if c.isprintable())
|
||||
root = ET.fromstring(xdl_content_cleaned)
|
||||
|
||||
synthesis_elem = root.find("Synthesis")
|
||||
if synthesis_elem is None:
|
||||
return None, None, None
|
||||
|
||||
# 解析硬件组件
|
||||
hardware_elem = synthesis_elem.find("Hardware")
|
||||
hardware = []
|
||||
if hardware_elem is not None:
|
||||
hardware = [{"id": c.get("id"), "type": c.get("type")} for c in hardware_elem.findall("Component")]
|
||||
|
||||
# 解析试剂
|
||||
reagents_elem = synthesis_elem.find("Reagents")
|
||||
reagents = []
|
||||
if reagents_elem is not None:
|
||||
reagents = [{"name": r.get("name"), "role": r.get("role", "")} for r in reagents_elem.findall("Reagent")]
|
||||
|
||||
# 解析程序
|
||||
procedure_elem = synthesis_elem.find("Procedure")
|
||||
if procedure_elem is None:
|
||||
return None, None, None
|
||||
|
||||
flattened_operations = flatten_xdl_procedure(procedure_elem)
|
||||
return hardware, reagents, flattened_operations
|
||||
|
||||
except ET.ParseError as e:
|
||||
raise ValueError(f"Invalid XDL format: {e}")
|
||||
|
||||
|
||||
def convert_xdl_to_dict(xdl_content: str) -> Dict[str, Any]:
|
||||
"""
|
||||
将XDL XML格式转换为标准的字典格式
|
||||
|
||||
Args:
|
||||
xdl_content: XDL XML内容
|
||||
|
||||
Returns:
|
||||
转换结果,包含步骤和器材信息
|
||||
"""
|
||||
try:
|
||||
hardware, reagents, flattened_operations = parse_xdl_content(xdl_content)
|
||||
if hardware is None:
|
||||
return {"error": "Failed to parse XDL content", "success": False}
|
||||
|
||||
# 将XDL元素转换为字典格式
|
||||
steps_data = []
|
||||
for elem in flattened_operations:
|
||||
# 转换参数类型
|
||||
parameters = {}
|
||||
for key, val in elem.attrib.items():
|
||||
converted_val = convert_to_type(val)
|
||||
if converted_val is not None:
|
||||
parameters[key] = converted_val
|
||||
|
||||
step_dict = {
|
||||
"operation": elem.tag,
|
||||
"parameters": parameters,
|
||||
"description": elem.get("purpose", f"Operation: {elem.tag}"),
|
||||
}
|
||||
steps_data.append(step_dict)
|
||||
|
||||
# 合并硬件和试剂为统一的labware_info格式
|
||||
labware_data = []
|
||||
labware_data.extend({"id": hw["id"], "type": "hardware", **hw} for hw in hardware)
|
||||
labware_data.extend({"name": reagent["name"], "type": "reagent", **reagent} for reagent in reagents)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"steps": steps_data,
|
||||
"labware": labware_data,
|
||||
"message": f"Successfully converted XDL to dict format. Found {len(steps_data)} steps and {len(labware_data)} labware items.",
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"XDL conversion failed: {str(e)}"
|
||||
return {"error": error_msg, "success": False}
|
||||
@@ -2,7 +2,7 @@
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>unilabos_msgs</name>
|
||||
<version>0.11.1</version>
|
||||
<version>0.11.2</version>
|
||||
<description>ROS2 Messages package for unilabos devices</description>
|
||||
<maintainer email="changjh@pku.edu.cn">Junhan Chang</maintainer>
|
||||
<maintainer email="18435084+Xuwznln@users.noreply.github.com">Xuwznln</maintainer>
|
||||
|
||||
Reference in New Issue
Block a user