sync recent dev changes to main

Bring over recent dev-only updates for notebook-aware job state, workflow conversion modules, and conda build environment handling as a single squashed change.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Xuwznln
2026-05-23 22:28:31 +08:00
parent 8ba4138a09
commit 35de4a5fee
10 changed files with 454 additions and 507 deletions

View File

View File

@@ -0,0 +1,241 @@
import ast
import json
from typing import Dict, List, Any, Tuple, Optional
from .common import WorkflowGraph, RegistryAdapter
Json = Dict[str, Any]
# ---------------- Converter ----------------
class DeviceMethodConverter:
"""
- 字段统一resource_name原 device_class、template_name原 action_key
- params 单层inputs 使用 'params.' 前缀
- SimpleGraph.add_workflow_node 负责变量连线与边
"""
def __init__(self, device_registry: Optional[Dict[str, Any]] = None):
self.graph = WorkflowGraph()
self.variable_sources: Dict[str, Dict[str, Any]] = {} # var -> {node_id, output_name}
self.instance_to_resource: Dict[str, Optional[str]] = {} # 实例名 -> resource_name
self.node_id_counter: int = 0
self.registry = RegistryAdapter(device_registry or {})
# ---- helpers ----
def _new_node_id(self) -> int:
nid = self.node_id_counter
self.node_id_counter += 1
return nid
def _assign_targets(self, targets) -> List[str]:
names: List[str] = []
import ast
if isinstance(targets, ast.Tuple):
for elt in targets.elts:
if isinstance(elt, ast.Name):
names.append(elt.id)
elif isinstance(targets, ast.Name):
names.append(targets.id)
return names
def _extract_device_instantiation(self, node) -> Optional[Tuple[str, str]]:
import ast
if not isinstance(node.value, ast.Call):
return None
callee = node.value.func
if isinstance(callee, ast.Name):
class_name = callee.id
elif isinstance(callee, ast.Attribute) and isinstance(callee.value, ast.Name):
class_name = callee.attr
else:
return None
if isinstance(node.targets[0], ast.Name):
instance = node.targets[0].id
return instance, class_name
return None
def _extract_call(self, call) -> Tuple[str, str, Dict[str, Any], str]:
import ast
owner_name, method_name, call_kind = "", "", "func"
if isinstance(call.func, ast.Attribute):
method_name = call.func.attr
if isinstance(call.func.value, ast.Name):
owner_name = call.func.value.id
call_kind = "instance" if owner_name in self.instance_to_resource else "class_or_module"
elif isinstance(call.func.value, ast.Attribute) and isinstance(call.func.value.value, ast.Name):
owner_name = call.func.value.attr
call_kind = "class_or_module"
elif isinstance(call.func, ast.Name):
method_name = call.func.id
call_kind = "func"
def pack(node):
if isinstance(node, ast.Name):
return {"type": "variable", "value": node.id}
if isinstance(node, ast.Constant):
return {"type": "constant", "value": node.value}
if isinstance(node, ast.Dict):
return {"type": "dict", "value": self._parse_dict(node)}
if isinstance(node, ast.List):
return {"type": "list", "value": self._parse_list(node)}
return {"type": "raw", "value": ast.unparse(node) if hasattr(ast, "unparse") else str(node)}
args: Dict[str, Any] = {}
pos: List[Any] = []
for a in call.args:
pos.append(pack(a))
for kw in call.keywords:
args[kw.arg] = pack(kw.value)
if pos:
args["_positional"] = pos
return owner_name, method_name, args, call_kind
def _parse_dict(self, node) -> Dict[str, Any]:
import ast
out: Dict[str, Any] = {}
for k, v in zip(node.keys, node.values):
if isinstance(k, ast.Constant):
key = str(k.value)
if isinstance(v, ast.Name):
out[key] = f"var:{v.id}"
elif isinstance(v, ast.Constant):
out[key] = v.value
elif isinstance(v, ast.Dict):
out[key] = self._parse_dict(v)
elif isinstance(v, ast.List):
out[key] = self._parse_list(v)
return out
def _parse_list(self, node) -> List[Any]:
import ast
out: List[Any] = []
for elt in node.elts:
if isinstance(elt, ast.Name):
out.append(f"var:{elt.id}")
elif isinstance(elt, ast.Constant):
out.append(elt.value)
elif isinstance(elt, ast.Dict):
out.append(self._parse_dict(elt))
elif isinstance(elt, ast.List):
out.append(self._parse_list(elt))
return out
def _normalize_var_tokens(self, x: Any) -> Any:
if isinstance(x, str) and x.startswith("var:"):
return {"__var__": x[4:]}
if isinstance(x, list):
return [self._normalize_var_tokens(i) for i in x]
if isinstance(x, dict):
return {k: self._normalize_var_tokens(v) for k, v in x.items()}
return x
def _make_params_payload(self, resource_name: Optional[str], template_name: str, call_args: Dict[str, Any]) -> Dict[str, Any]:
input_keys = self.registry.get_action_input_keys(resource_name, template_name) if resource_name else []
defaults = self.registry.get_action_goal_default(resource_name, template_name) if resource_name else {}
params: Dict[str, Any] = dict(defaults)
def unpack(p):
t, v = p.get("type"), p.get("value")
if t == "variable":
return {"__var__": v}
if t == "dict":
return self._normalize_var_tokens(v)
if t == "list":
return self._normalize_var_tokens(v)
return v
for k, p in call_args.items():
if k == "_positional":
continue
params[k] = unpack(p)
pos = call_args.get("_positional", [])
if pos:
if input_keys:
for i, p in enumerate(pos):
if i >= len(input_keys):
break
name = input_keys[i]
if name in params:
continue
params[name] = unpack(p)
else:
for i, p in enumerate(pos):
params[f"arg_{i}"] = unpack(p)
return params
# ---- handlers ----
def _on_assign(self, stmt):
import ast
inst = self._extract_device_instantiation(stmt)
if inst:
instance, code_class = inst
resource_name = self.registry.resolve_resource_by_classname(code_class)
self.instance_to_resource[instance] = resource_name
return
if isinstance(stmt.value, ast.Call):
owner, method, call_args, kind = self._extract_call(stmt.value)
if kind == "instance":
device_key = owner
resource_name = self.instance_to_resource.get(owner)
else:
device_key = owner
resource_name = self.registry.resolve_resource_by_classname(owner)
module = self.registry.get_device_module(resource_name)
params = self._make_params_payload(resource_name, method, call_args)
nid = self._new_node_id()
self.graph.add_workflow_node(
nid,
device_key=device_key,
resource_name=resource_name, # ✅
module=module,
template_name=method, # ✅
params=params,
variable_sources=self.variable_sources,
add_ready_if_no_vars=True,
prev_node_id=(nid - 1) if nid > 0 else None,
)
out_vars = self._assign_targets(stmt.targets[0])
for var in out_vars:
self.variable_sources[var] = {"node_id": nid, "output_name": "result"}
def _on_expr(self, stmt):
import ast
if not isinstance(stmt.value, ast.Call):
return
owner, method, call_args, kind = self._extract_call(stmt.value)
if kind == "instance":
device_key = owner
resource_name = self.instance_to_resource.get(owner)
else:
device_key = owner
resource_name = self.registry.resolve_resource_by_classname(owner)
module = self.registry.get_device_module(resource_name)
params = self._make_params_payload(resource_name, method, call_args)
nid = self._new_node_id()
self.graph.add_workflow_node(
nid,
device_key=device_key,
resource_name=resource_name, # ✅
module=module,
template_name=method, # ✅
params=params,
variable_sources=self.variable_sources,
add_ready_if_no_vars=True,
prev_node_id=(nid - 1) if nid > 0 else None,
)
def convert(self, python_code: str):
tree = ast.parse(python_code)
for stmt in tree.body:
if isinstance(stmt, ast.Assign):
self._on_assign(stmt)
elif isinstance(stmt, ast.Expr):
self._on_expr(stmt)
return self

View File

@@ -0,0 +1,131 @@
from typing import List, Any, Dict
import xml.etree.ElementTree as ET
def convert_to_type(val: str) -> Any:
"""将字符串值转换为适当的数据类型"""
if val == "True":
return True
if val == "False":
return False
if val == "?":
return None
if val.endswith(" g"):
return float(val.split(" ")[0])
if val.endswith("mg"):
return float(val.split("mg")[0])
elif val.endswith("mmol"):
return float(val.split("mmol")[0]) / 1000
elif val.endswith("mol"):
return float(val.split("mol")[0])
elif val.endswith("ml"):
return float(val.split("ml")[0])
elif val.endswith("RPM"):
return float(val.split("RPM")[0])
elif val.endswith(" °C"):
return float(val.split(" ")[0])
elif val.endswith(" %"):
return float(val.split(" ")[0])
return val
def flatten_xdl_procedure(procedure_elem: ET.Element) -> List[ET.Element]:
"""展平嵌套的XDL程序结构"""
flattened_operations = []
TEMP_UNSUPPORTED_PROTOCOL = ["Purge", "Wait", "Stir", "ResetHandling"]
def extract_operations(element: ET.Element):
if element.tag not in ["Prep", "Reaction", "Workup", "Purification", "Procedure"]:
if element.tag not in TEMP_UNSUPPORTED_PROTOCOL:
flattened_operations.append(element)
for child in element:
extract_operations(child)
for child in procedure_elem:
extract_operations(child)
return flattened_operations
def parse_xdl_content(xdl_content: str) -> tuple:
"""解析XDL内容"""
try:
xdl_content_cleaned = "".join(c for c in xdl_content if c.isprintable())
root = ET.fromstring(xdl_content_cleaned)
synthesis_elem = root.find("Synthesis")
if synthesis_elem is None:
return None, None, None
# 解析硬件组件
hardware_elem = synthesis_elem.find("Hardware")
hardware = []
if hardware_elem is not None:
hardware = [{"id": c.get("id"), "type": c.get("type")} for c in hardware_elem.findall("Component")]
# 解析试剂
reagents_elem = synthesis_elem.find("Reagents")
reagents = []
if reagents_elem is not None:
reagents = [{"name": r.get("name"), "role": r.get("role", "")} for r in reagents_elem.findall("Reagent")]
# 解析程序
procedure_elem = synthesis_elem.find("Procedure")
if procedure_elem is None:
return None, None, None
flattened_operations = flatten_xdl_procedure(procedure_elem)
return hardware, reagents, flattened_operations
except ET.ParseError as e:
raise ValueError(f"Invalid XDL format: {e}")
def convert_xdl_to_dict(xdl_content: str) -> Dict[str, Any]:
"""
将XDL XML格式转换为标准的字典格式
Args:
xdl_content: XDL XML内容
Returns:
转换结果,包含步骤和器材信息
"""
try:
hardware, reagents, flattened_operations = parse_xdl_content(xdl_content)
if hardware is None:
return {"error": "Failed to parse XDL content", "success": False}
# 将XDL元素转换为字典格式
steps_data = []
for elem in flattened_operations:
# 转换参数类型
parameters = {}
for key, val in elem.attrib.items():
converted_val = convert_to_type(val)
if converted_val is not None:
parameters[key] = converted_val
step_dict = {
"operation": elem.tag,
"parameters": parameters,
"description": elem.get("purpose", f"Operation: {elem.tag}"),
}
steps_data.append(step_dict)
# 合并硬件和试剂为统一的labware_info格式
labware_data = []
labware_data.extend({"id": hw["id"], "type": "hardware", **hw} for hw in hardware)
labware_data.extend({"name": reagent["name"], "type": "reagent", **reagent} for reagent in reagents)
return {
"success": True,
"steps": steps_data,
"labware": labware_data,
"message": f"Successfully converted XDL to dict format. Found {len(steps_data)} steps and {len(labware_data)} labware items.",
}
except Exception as e:
error_msg = f"XDL conversion failed: {str(e)}"
return {"error": error_msg, "success": False}