From 5f36b6c04b4cb5101d0a8c1d95ce1ea008f6f02e Mon Sep 17 00:00:00 2001 From: Junhan Chang Date: Wed, 25 Mar 2026 13:11:34 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E7=BC=96=E8=AF=91?= =?UTF-8?q?=E5=99=A83=E4=B8=AA=E7=A1=AE=E8=AE=A4bug=20+=20=E5=8E=BB?= =?UTF-8?q?=E9=87=8D=E7=AE=80=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - separate_protocol: 修复vessel_id字符串解包crash和tuple truthy or逻辑错误 - heatchill_protocol: 修复vessel字段传入enriched dict而非vessel_id的问题 - hydrogenate_protocol: 修复5处vessel格式错误(裸字符串→{"id": vessel_id}) - 三个文件同时完成debug_print统一和工具函数去重 Co-Authored-By: Claude Opus 4.6 --- unilabos/compile/heatchill_protocol.py | 255 +++-------- unilabos/compile/hydrogenate_protocol.py | 302 ++++--------- unilabos/compile/separate_protocol.py | 544 ++++------------------- 3 files changed, 237 insertions(+), 864 deletions(-) diff --git a/unilabos/compile/heatchill_protocol.py b/unilabos/compile/heatchill_protocol.py index 9e1f6597..46219857 100644 --- a/unilabos/compile/heatchill_protocol.py +++ b/unilabos/compile/heatchill_protocol.py @@ -1,118 +1,24 @@ from typing import List, Dict, Any, Union import networkx as nx -import logging -import re -from .utils.vessel_parser import get_vessel -from .utils.unit_parser import parse_time_input +from .utils.vessel_parser import get_vessel, find_connected_heatchill +from .utils.unit_parser import parse_time_input, parse_temperature_input +from .utils.logger_util import debug_print -logger = logging.getLogger(__name__) - -def debug_print(message): - """调试输出""" - logger.info(f"[HEATCHILL] {message}") - - -def parse_temp_input(temp_input: Union[str, float], default_temp: float = 25.0) -> float: - """ - 解析温度输入(统一函数) - - Args: - temp_input: 温度输入 - default_temp: 默认温度 - - Returns: - float: 温度(°C) - """ - if not temp_input: - return default_temp - - # 🔢 数值输入 - if isinstance(temp_input, (int, float)): - result = float(temp_input) - debug_print(f"🌡️ 数值温度: {temp_input} → {result}°C") - return result - - # 📝 字符串输入 - temp_str = str(temp_input).lower().strip() - debug_print(f"🔍 解析温度: '{temp_str}'") - - # 🎯 特殊温度 - special_temps = { - "room temperature": 25.0, "reflux": 78.0, "ice bath": 0.0, - "boiling": 100.0, "hot": 60.0, "warm": 40.0, "cold": 10.0 - } - - if temp_str in special_temps: - result = special_temps[temp_str] - debug_print(f"🎯 特殊温度: '{temp_str}' → {result}°C") - return result - - # 📐 正则解析(如 "256 °C") - temp_pattern = r'(\d+(?:\.\d+)?)\s*°?[cf]?' - match = re.search(temp_pattern, temp_str) - - if match: - result = float(match.group(1)) - debug_print(f"✅ 温度解析: '{temp_str}' → {result}°C") - return result - - debug_print(f"⚠️ 无法解析温度: '{temp_str}',使用默认值: {default_temp}°C") - return default_temp - -def find_connected_heatchill(G: nx.DiGraph, vessel: str) -> str: - """查找与指定容器相连的加热/冷却设备""" - debug_print(f"🔍 查找加热设备,目标容器: {vessel}") - - # 🔧 查找所有加热设备 - heatchill_nodes = [] - for node in G.nodes(): - node_data = G.nodes[node] - node_class = node_data.get('class', '') or '' - - if 'heatchill' in node_class.lower() or 'virtual_heatchill' in node_class: - heatchill_nodes.append(node) - debug_print(f"🎉 找到加热设备: {node}") - - # 🔗 检查连接 - if vessel and heatchill_nodes: - for heatchill in heatchill_nodes: - if G.has_edge(heatchill, vessel) or G.has_edge(vessel, heatchill): - debug_print(f"✅ 加热设备 '{heatchill}' 与容器 '{vessel}' 相连") - return heatchill - - # 🎯 使用第一个可用设备 - if heatchill_nodes: - selected = heatchill_nodes[0] - debug_print(f"🔧 使用第一个加热设备: {selected}") - return selected - - # 🆘 默认设备 - debug_print("⚠️ 未找到加热设备,使用默认设备") - return "heatchill_1" def validate_and_fix_params(temp: float, time: float, stir_speed: float) -> tuple: """验证和修正参数""" - # 🌡️ 温度范围验证 if temp < -50.0 or temp > 300.0: debug_print(f"⚠️ 温度 {temp}°C 超出范围,修正为 25°C") temp = 25.0 - else: - debug_print(f"✅ 温度 {temp}°C 在正常范围内") - - # ⏰ 时间验证 + if time < 0: debug_print(f"⚠️ 时间 {time}s 无效,修正为 300s") time = 300.0 - else: - debug_print(f"✅ 时间 {time}s ({time/60:.1f}分钟) 有效") - - # 🌪️ 搅拌速度验证 + if stir_speed < 0 or stir_speed > 1500.0: debug_print(f"⚠️ 搅拌速度 {stir_speed} RPM 超出范围,修正为 300 RPM") stir_speed = 300.0 - else: - debug_print(f"✅ 搅拌速度 {stir_speed} RPM 在正常范围内") - + return temp, time, stir_speed def generate_heat_chill_protocol( @@ -131,7 +37,7 @@ def generate_heat_chill_protocol( ) -> List[Dict[str, Any]]: """ 生成加热/冷却操作的协议序列 - 支持vessel字典 - + Args: G: 设备图 vessel: 容器字典(从XDL传入) @@ -145,82 +51,58 @@ def generate_heat_chill_protocol( stir_speed: 搅拌速度 (RPM) purpose: 操作目的说明 **kwargs: 其他参数(兼容性) - + Returns: List[Dict[str, Any]]: 加热/冷却操作的动作序列 """ - + # 🔧 核心修改:从字典中提取容器ID vessel_id, vessel_data = get_vessel(vessel) - - debug_print("🌡️" * 20) - debug_print("🚀 开始生成加热冷却协议(支持vessel字典)✨") - debug_print(f"📝 输入参数:") - debug_print(f" 🥽 vessel: {vessel} (ID: {vessel_id})") - debug_print(f" 🌡️ temp: {temp}°C") - debug_print(f" ⏰ time: {time}") - debug_print(f" 🎯 temp_spec: {temp_spec}") - debug_print(f" ⏱️ time_spec: {time_spec}") - debug_print(f" 🌪️ stir: {stir} ({stir_speed} RPM)") - debug_print(f" 🎭 purpose: '{purpose}'") - debug_print("🌡️" * 20) - - # 📋 参数验证 - debug_print("📍 步骤1: 参数验证... 🔧") - if not vessel_id: # 🔧 使用 vessel_id - debug_print("❌ vessel 参数不能为空! 😱") + + debug_print(f"开始生成加热冷却协议: vessel={vessel_id}, temp={temp}°C, " + f"time={time}, stir={stir} ({stir_speed} RPM), purpose='{purpose}'") + + # 参数验证 + if not vessel_id: raise ValueError("vessel 参数不能为空") - - if vessel_id not in G.nodes(): # 🔧 使用 vessel_id - debug_print(f"❌ 容器 '{vessel_id}' 不存在于系统中! 😞") + + if vessel_id not in G.nodes(): raise ValueError(f"容器 '{vessel_id}' 不存在于系统中") - - debug_print("✅ 基础参数验证通过 🎯") - - # 🔄 参数解析 - debug_print("📍 步骤2: 参数解析... ⚡") - - #温度解析:优先使用 temp_spec - final_temp = parse_temp_input(temp_spec, temp) if temp_spec else temp - + + # 参数解析 + # 温度解析:优先使用 temp_spec + final_temp = parse_temperature_input(temp_spec, temp) if temp_spec else temp + # 时间解析:优先使用 time_spec final_time = parse_time_input(time_spec) if time_spec else parse_time_input(time) - + # 参数修正 final_temp, final_time, stir_speed = validate_and_fix_params(final_temp, final_time, stir_speed) - - debug_print(f"🎯 最终参数: temp={final_temp}°C, time={final_time}s, stir_speed={stir_speed} RPM") - - # 🔍 查找设备 - debug_print("📍 步骤3: 查找加热设备... 🔍") + + debug_print(f"最终参数: temp={final_temp}°C, time={final_time}s, stir_speed={stir_speed} RPM") + + # 查找设备 try: - heatchill_id = find_connected_heatchill(G, vessel_id) # 🔧 使用 vessel_id - debug_print(f"🎉 使用加热设备: {heatchill_id} ✨") + heatchill_id = find_connected_heatchill(G, vessel_id) + debug_print(f"使用加热设备: {heatchill_id}") except Exception as e: - debug_print(f"❌ 设备查找失败: {str(e)} 😭") raise ValueError(f"无法找到加热设备: {str(e)}") - - # 🚀 生成动作 - debug_print("📍 步骤4: 生成加热动作... 🔥") - - # 🕐 模拟运行时间优化 - debug_print(" ⏱️ 检查模拟运行时间限制...") + + # 生成动作 + # 模拟运行时间优化 original_time = final_time simulation_time_limit = 100.0 # 模拟运行时间限制:100秒 - + if final_time > simulation_time_limit: final_time = simulation_time_limit - debug_print(f" 🎮 模拟运行优化: {original_time}s → {final_time}s (限制为{simulation_time_limit}s) ⚡") - debug_print(f" 📊 时间缩短: {original_time/60:.1f}分钟 → {final_time/60:.1f}分钟 🚀") - else: - debug_print(f" ✅ 时间在限制内: {final_time}s ({final_time/60:.1f}分钟) 保持不变 🎯") - + debug_print(f"模拟运行优化: {original_time}s → {final_time}s (限制为{simulation_time_limit}s)") + action_sequence = [] heatchill_action = { "device_id": heatchill_id, "action_name": "heat_chill", "action_kwargs": { - "vessel": {"id": vessel}, + "vessel": {"id": vessel_id}, "temp": float(final_temp), "time": float(final_time), "stir": bool(stir), @@ -229,21 +111,10 @@ def generate_heat_chill_protocol( } } action_sequence.append(heatchill_action) - debug_print("✅ 加热动作已添加 🔥✨") - - # 显示时间调整信息 - if original_time != final_time: - debug_print(f" 🎭 模拟优化说明: 原计划 {original_time/60:.1f}分钟,实际模拟 {final_time/60:.1f}分钟 ⚡") - - # 🎊 总结 - debug_print("🎊" * 20) - debug_print(f"🎉 加热冷却协议生成完成! ✨") - debug_print(f"📊 总动作数: {len(action_sequence)} 个") - debug_print(f"🥽 加热容器: {vessel_id}") - debug_print(f"🌡️ 目标温度: {final_temp}°C") - debug_print(f"⏰ 加热时间: {final_time}s ({final_time/60:.1f}分钟)") - debug_print("🎊" * 20) - + + debug_print(f"加热冷却协议生成完成: {len(action_sequence)} 个动作, " + f"vessel={vessel_id}, temp={final_temp}°C, time={final_time}s") + return action_sequence def generate_heat_chill_to_temp_protocol( @@ -255,7 +126,7 @@ def generate_heat_chill_to_temp_protocol( ) -> List[Dict[str, Any]]: """生成加热到指定温度的协议(简化版)""" vessel_id, _ = get_vessel(vessel) - debug_print(f"🌡️ 生成加热到温度协议: {vessel_id} → {temp}°C") + debug_print(f"生成加热到温度协议: {vessel_id} → {temp}°C") return generate_heat_chill_protocol(G, vessel, temp, time, **kwargs) def generate_heat_chill_start_protocol( @@ -266,21 +137,19 @@ def generate_heat_chill_start_protocol( **kwargs ) -> List[Dict[str, Any]]: """生成开始加热操作的协议序列""" - + # 🔧 核心修改:从字典中提取容器ID vessel_id, _ = get_vessel(vessel) - - debug_print("🔥 开始生成启动加热协议 ✨") - debug_print(f"🥽 vessel: {vessel} (ID: {vessel_id}), 🌡️ temp: {temp}°C") - + + debug_print(f"生成启动加热协议: vessel={vessel_id}, temp={temp}°C") + # 基础验证 - if not vessel_id or vessel_id not in G.nodes(): # 🔧 使用 vessel_id - debug_print("❌ 容器验证失败!") + if not vessel_id or vessel_id not in G.nodes(): raise ValueError("vessel 参数无效") - + # 查找设备 - heatchill_id = find_connected_heatchill(G, vessel_id) # 🔧 使用 vessel_id - + heatchill_id = find_connected_heatchill(G, vessel_id) + # 生成动作 action_sequence = [{ "device_id": heatchill_id, @@ -291,8 +160,8 @@ def generate_heat_chill_start_protocol( "vessel": {"id": vessel_id}, } }] - - debug_print(f"✅ 启动加热协议生成完成 🎯") + + debug_print(f"启动加热协议生成完成") return action_sequence def generate_heat_chill_stop_protocol( @@ -301,21 +170,19 @@ def generate_heat_chill_stop_protocol( **kwargs ) -> List[Dict[str, Any]]: """生成停止加热操作的协议序列""" - + # 🔧 核心修改:从字典中提取容器ID vessel_id, _ = get_vessel(vessel) - - debug_print("🛑 开始生成停止加热协议 ✨") - debug_print(f"🥽 vessel: {vessel} (ID: {vessel_id})") - + + debug_print(f"生成停止加热协议: vessel={vessel_id}") + # 基础验证 - if not vessel_id or vessel_id not in G.nodes(): # 🔧 使用 vessel_id - debug_print("❌ 容器验证失败!") + if not vessel_id or vessel_id not in G.nodes(): raise ValueError("vessel 参数无效") - + # 查找设备 - heatchill_id = find_connected_heatchill(G, vessel_id) # 🔧 使用 vessel_id - + heatchill_id = find_connected_heatchill(G, vessel_id) + # 生成动作 action_sequence = [{ "device_id": heatchill_id, @@ -323,6 +190,6 @@ def generate_heat_chill_stop_protocol( "action_kwargs": { } }] - - debug_print(f"✅ 停止加热协议生成完成 🎯") + + debug_print(f"停止加热协议生成完成") return action_sequence diff --git a/unilabos/compile/hydrogenate_protocol.py b/unilabos/compile/hydrogenate_protocol.py index 9b65788d..b3371e1f 100644 --- a/unilabos/compile/hydrogenate_protocol.py +++ b/unilabos/compile/hydrogenate_protocol.py @@ -1,105 +1,50 @@ import networkx as nx from typing import List, Dict, Any, Optional from .utils.vessel_parser import get_vessel - - -def parse_temperature(temp_str: str) -> float: - """ - 解析温度字符串,支持多种格式 - - Args: - temp_str: 温度字符串(如 "45 °C", "45°C", "45") - - Returns: - float: 温度值(摄氏度) - """ - try: - # 移除常见的温度单位和符号 - temp_clean = temp_str.replace("°C", "").replace("°", "").replace("C", "").strip() - return float(temp_clean) - except ValueError: - print(f"HYDROGENATE: 无法解析温度 '{temp_str}',使用默认温度 25°C") - return 25.0 - - -def parse_time(time_str: str) -> float: - """ - 解析时间字符串,支持多种格式 - - Args: - time_str: 时间字符串(如 "2 h", "120 min", "7200 s") - - Returns: - float: 时间值(秒) - """ - try: - time_clean = time_str.lower().strip() - - # 处理小时 - if "h" in time_clean: - hours = float(time_clean.replace("h", "").strip()) - return hours * 3600.0 - - # 处理分钟 - if "min" in time_clean: - minutes = float(time_clean.replace("min", "").strip()) - return minutes * 60.0 - - # 处理秒 - if "s" in time_clean: - seconds = float(time_clean.replace("s", "").strip()) - return seconds - - # 默认按小时处理 - return float(time_clean) * 3600.0 - - except ValueError: - print(f"HYDROGENATE: 无法解析时间 '{time_str}',使用默认时间 2小时") - return 7200.0 # 2小时 +from .utils.logger_util import debug_print +from .utils.unit_parser import parse_temperature_input, parse_time_input def find_associated_solenoid_valve(G: nx.DiGraph, device_id: str) -> Optional[str]: """查找与指定设备相关联的电磁阀""" solenoid_valves = [ - node for node in G.nodes() + node for node in G.nodes() if ('solenoid' in (G.nodes[node].get('class') or '').lower() or 'solenoid_valve' in node) ] - + # 通过网络连接查找直接相连的电磁阀 for solenoid in solenoid_valves: if G.has_edge(device_id, solenoid) or G.has_edge(solenoid, device_id): return solenoid - + # 通过命名规则查找关联的电磁阀 device_type = "" if 'gas' in device_id.lower(): device_type = "gas" elif 'h2' in device_id.lower() or 'hydrogen' in device_id.lower(): device_type = "gas" - + if device_type: for solenoid in solenoid_valves: if device_type in solenoid.lower(): return solenoid - + return None def find_connected_device(G: nx.DiGraph, vessel: str, device_type: str) -> str: """ 查找与容器相连的指定类型设备 - + Args: G: 网络图 vessel: 容器名称 device_type: 设备类型 ('heater', 'stirrer', 'gas_source') - + Returns: str: 设备ID,如果没有则返回None """ - print(f"HYDROGENATE: 正在查找与容器 '{vessel}' 相连的 {device_type}...") - # 根据设备类型定义搜索关键词 if device_type == 'heater': keywords = ['heater', 'heat', 'heatchill'] @@ -112,40 +57,38 @@ def find_connected_device(G: nx.DiGraph, vessel: str, device_type: str) -> str: device_class = 'virtual_gas_source' else: return None - + # 查找设备节点 device_nodes = [] for node in G.nodes(): node_data = G.nodes[node] node_name = node.lower() node_class = node_data.get('class', '').lower() - - # 通过名称匹配 + if any(keyword in node_name for keyword in keywords): device_nodes.append(node) - # 通过类型匹配 elif device_class in node_class: device_nodes.append(node) - - print(f"HYDROGENATE: 找到的{device_type}节点: {device_nodes}") - + + debug_print(f"找到的{device_type}节点: {device_nodes}") + # 检查是否有设备与目标容器相连 for device in device_nodes: if G.has_edge(device, vessel) or G.has_edge(vessel, device): - print(f"HYDROGENATE: 找到与容器 '{vessel}' 相连的{device_type}: {device}") + debug_print(f"找到与容器 '{vessel}' 相连的{device_type}: {device}") return device - + # 如果没有直接连接,查找距离最近的设备 for device in device_nodes: try: path = nx.shortest_path(G, source=device, target=vessel) if len(path) <= 3: # 最多2个中间节点 - print(f"HYDROGENATE: 找到距离较近的{device_type}: {device}") + debug_print(f"找到距离较近的{device_type}: {device}") return device except nx.NetworkXNoPath: continue - - print(f"HYDROGENATE: 未找到与容器 '{vessel}' 相连的{device_type}") + + debug_print(f"未找到与容器 '{vessel}' 相连的{device_type}") return None @@ -158,36 +101,31 @@ def generate_hydrogenate_protocol( ) -> List[Dict[str, Any]]: """ 生成氢化反应协议序列 - 支持vessel字典 - + Args: G: 有向图,节点为容器和设备 vessel: 反应容器字典(从XDL传入) temp: 反应温度(如 "45 °C") time: 反应时间(如 "2 h") **kwargs: 其他可选参数,但不使用 - + Returns: List[Dict[str, Any]]: 动作序列 """ - + # 🔧 核心修改:从字典中提取容器ID vessel_id, vessel_data = get_vessel(vessel) - + action_sequence = [] - + # 解析参数 - temperature = parse_temperature(temp) - reaction_time = parse_time(time) - - print("🧪" * 20) - print(f"HYDROGENATE: 开始生成氢化反应协议(支持vessel字典)✨") - print(f"📝 输入参数:") - print(f" 🥽 vessel: {vessel} (ID: {vessel_id})") - print(f" 🌡️ 反应温度: {temperature}°C") - print(f" ⏰ 反应时间: {reaction_time/3600:.1f} 小时") - print("🧪" * 20) - - # 🔧 新增:记录氢化前的容器状态(可选,氢化反应通常不改变体积) + temperature = parse_temperature_input(temp) + reaction_time = parse_time_input(time) + + debug_print(f"开始生成氢化反应协议: vessel={vessel_id}, " + f"temp={temperature}°C, time={reaction_time/3600:.1f}h") + + # 记录氢化前的容器状态 original_liquid_volume = 0.0 if "data" in vessel and "liquid_volume" in vessel["data"]: current_volume = vessel["data"]["liquid_volume"] @@ -195,47 +133,36 @@ def generate_hydrogenate_protocol( original_liquid_volume = current_volume[0] elif isinstance(current_volume, (int, float)): original_liquid_volume = current_volume - print(f"📊 氢化前液体体积: {original_liquid_volume:.2f}mL") - + # 1. 验证目标容器存在 - print("📍 步骤1: 验证目标容器...") - if vessel_id not in G.nodes(): # 🔧 使用 vessel_id - print(f"⚠️ HYDROGENATE: 警告 - 容器 '{vessel_id}' 不存在于系统中,跳过氢化反应") + if vessel_id not in G.nodes(): + debug_print(f"⚠️ 容器 '{vessel_id}' 不存在于系统中,跳过氢化反应") return action_sequence - print(f"✅ 容器 '{vessel_id}' 验证通过") - + # 2. 查找相连的设备 - print("📍 步骤2: 查找相连设备...") - heater_id = find_connected_device(G, vessel_id, 'heater') # 🔧 使用 vessel_id - stirrer_id = find_connected_device(G, vessel_id, 'stirrer') # 🔧 使用 vessel_id - gas_source_id = find_connected_device(G, vessel_id, 'gas_source') # 🔧 使用 vessel_id - - print(f"🔧 设备配置:") - print(f" 🔥 加热器: {heater_id or '未找到'}") - print(f" 🌪️ 搅拌器: {stirrer_id or '未找到'}") - print(f" 💨 气源: {gas_source_id or '未找到'}") - + heater_id = find_connected_device(G, vessel_id, 'heater') + stirrer_id = find_connected_device(G, vessel_id, 'stirrer') + gas_source_id = find_connected_device(G, vessel_id, 'gas_source') + + debug_print(f"设备配置: heater={heater_id or '未找到'}, " + f"stirrer={stirrer_id or '未找到'}, gas={gas_source_id or '未找到'}") + # 3. 启动搅拌器 - print("📍 步骤3: 启动搅拌器...") if stirrer_id: - print(f"🌪️ 启动搅拌器 {stirrer_id}") action_sequence.append({ "device_id": stirrer_id, "action_name": "start_stir", "action_kwargs": { - "vessel": vessel_id, # 🔧 使用 vessel_id + "vessel": {"id": vessel_id}, "stir_speed": 300.0, "purpose": "氢化反应: 开始搅拌" } }) - print("✅ 搅拌器启动动作已添加") else: - print(f"⚠️ HYDROGENATE: 警告 - 未找到搅拌器,继续执行") - + debug_print(f"⚠️ 未找到搅拌器,继续执行") + # 4. 启动气源(氢气) - print("📍 步骤4: 启动氢气源...") if gas_source_id: - print(f"💨 启动气源 {gas_source_id} (氢气)") action_sequence.append({ "device_id": gas_source_id, "action_name": "set_status", @@ -243,11 +170,10 @@ def generate_hydrogenate_protocol( "string": "ON" } }) - + # 查找相关的电磁阀 gas_solenoid = find_associated_solenoid_valve(G, gas_source_id) if gas_solenoid: - print(f"🚪 开启气源电磁阀 {gas_solenoid}") action_sequence.append({ "device_id": gas_solenoid, "action_name": "set_valve_position", @@ -255,12 +181,10 @@ def generate_hydrogenate_protocol( "command": "OPEN" } }) - print("✅ 氢气源启动动作已添加") else: - print(f"⚠️ HYDROGENATE: 警告 - 未找到气源,继续执行") - + debug_print(f"⚠️ 未找到气源,继续执行") + # 5. 等待气体稳定 - print("📍 步骤5: 等待气体环境稳定...") action_sequence.append({ "action_name": "wait", "action_kwargs": { @@ -268,22 +192,19 @@ def generate_hydrogenate_protocol( "description": "等待氢气环境稳定" } }) - print("✅ 气体稳定等待动作已添加") - + # 6. 启动加热器 - print("📍 步骤6: 启动加热反应...") if heater_id: - print(f"🔥 启动加热器 {heater_id} 到 {temperature}°C") action_sequence.append({ "device_id": heater_id, "action_name": "heat_chill_start", "action_kwargs": { - "vessel": vessel_id, # 🔧 使用 vessel_id + "vessel": {"id": vessel_id}, "temp": temperature, "purpose": f"氢化反应: 加热到 {temperature}°C" } }) - + # 等待温度稳定 action_sequence.append({ "action_name": "wait", @@ -292,52 +213,38 @@ def generate_hydrogenate_protocol( "description": f"等待温度稳定到 {temperature}°C" } }) - - # 🕐 模拟运行时间优化 - print(" ⏰ 检查模拟运行时间限制...") + + # 模拟运行时间优化 original_reaction_time = reaction_time - simulation_time_limit = 60.0 # 模拟运行时间限制:60秒 - + simulation_time_limit = 60.0 + if reaction_time > simulation_time_limit: reaction_time = simulation_time_limit - print(f" 🎮 模拟运行优化: {original_reaction_time}s → {reaction_time}s (限制为{simulation_time_limit}s)") - print(f" 📊 时间缩短: {original_reaction_time/3600:.2f}小时 → {reaction_time/60:.1f}分钟") - else: - print(f" ✅ 时间在限制内: {reaction_time}s ({reaction_time/60:.1f}分钟) 保持不变") - + debug_print(f"模拟运行优化: {original_reaction_time}s → {reaction_time}s") + # 保持反应温度 action_sequence.append({ "device_id": heater_id, "action_name": "heat_chill", "action_kwargs": { - "vessel": vessel_id, # 🔧 使用 vessel_id + "vessel": {"id": vessel_id}, "temp": temperature, "time": reaction_time, "purpose": f"氢化反应: 保持 {temperature}°C,反应 {reaction_time/60:.1f}分钟" + (f" (模拟时间)" if original_reaction_time != reaction_time else "") } }) - - # 显示时间调整信息 - if original_reaction_time != reaction_time: - print(f" 🎭 模拟优化说明: 原计划 {original_reaction_time/3600:.2f}小时,实际模拟 {reaction_time/60:.1f}分钟") - - print("✅ 加热反应动作已添加") - + else: - print(f"⚠️ HYDROGENATE: 警告 - 未找到加热器,使用室温反应") - - # 🕐 室温反应也需要时间优化 - print(" ⏰ 检查室温反应模拟时间限制...") + debug_print(f"⚠️ 未找到加热器,使用室温反应") + + # 室温反应也需要时间优化 original_reaction_time = reaction_time - simulation_time_limit = 60.0 # 模拟运行时间限制:60秒 - + simulation_time_limit = 60.0 + if reaction_time > simulation_time_limit: reaction_time = simulation_time_limit - print(f" 🎮 室温反应时间优化: {original_reaction_time}s → {reaction_time}s") - print(f" 📊 时间缩短: {original_reaction_time/3600:.2f}小时 → {reaction_time/60:.1f}分钟") - else: - print(f" ✅ 室温反应时间在限制内: {reaction_time}s 保持不变") - + debug_print(f"室温反应时间优化: {original_reaction_time}s → {reaction_time}s") + # 室温反应,只等待时间 action_sequence.append({ "action_name": "wait", @@ -346,28 +253,19 @@ def generate_hydrogenate_protocol( "description": f"室温氢化反应 {reaction_time/60:.1f}分钟" + (f" (模拟时间)" if original_reaction_time != reaction_time else "") } }) - - # 显示时间调整信息 - if original_reaction_time != reaction_time: - print(f" 🎭 室温反应优化说明: 原计划 {original_reaction_time/3600:.2f}小时,实际模拟 {reaction_time/60:.1f}分钟") - - print("✅ 室温反应等待动作已添加") - + # 7. 停止加热 - print("📍 步骤7: 停止加热...") if heater_id: action_sequence.append({ "device_id": heater_id, "action_name": "heat_chill_stop", "action_kwargs": { - "vessel": vessel_id, # 🔧 使用 vessel_id + "vessel": {"id": vessel_id}, "purpose": "氢化反应完成,停止加热" } }) - print("✅ 停止加热动作已添加") - + # 8. 等待冷却 - print("📍 步骤8: 等待冷却...") action_sequence.append({ "action_name": "wait", "action_kwargs": { @@ -375,15 +273,12 @@ def generate_hydrogenate_protocol( "description": "等待反应混合物冷却" } }) - print("✅ 冷却等待动作已添加") - + # 9. 停止气源 - print("📍 步骤9: 停止氢气源...") if gas_source_id: # 先关闭电磁阀 gas_solenoid = find_associated_solenoid_valve(G, gas_source_id) if gas_solenoid: - print(f"🚪 关闭气源电磁阀 {gas_solenoid}") action_sequence.append({ "device_id": gas_solenoid, "action_name": "set_valve_position", @@ -391,7 +286,7 @@ def generate_hydrogenate_protocol( "command": "CLOSED" } }) - + # 再关闭气源 action_sequence.append({ "device_id": gas_source_id, @@ -400,59 +295,24 @@ def generate_hydrogenate_protocol( "string": "OFF" } }) - print("✅ 氢气源停止动作已添加") - + # 10. 停止搅拌 - print("📍 步骤10: 停止搅拌...") if stirrer_id: action_sequence.append({ "device_id": stirrer_id, "action_name": "stop_stir", "action_kwargs": { - "vessel": vessel_id, # 🔧 使用 vessel_id + "vessel": {"id": vessel_id}, "purpose": "氢化反应完成,停止搅拌" } }) - print("✅ 停止搅拌动作已添加") - - # 🔧 新增:氢化完成后的状态(氢化反应通常不改变体积) - final_liquid_volume = original_liquid_volume # 氢化反应体积基本不变 - + + # 氢化完成后的状态(氢化反应通常不改变体积) + final_liquid_volume = original_liquid_volume + # 总结 - print("🎊" * 20) - print(f"🎉 氢化反应协议生成完成! ✨") - print(f"📊 总动作数: {len(action_sequence)} 个") - print(f"🥽 反应容器: {vessel_id}") - print(f"🌡️ 反应温度: {temperature}°C") - print(f"⏰ 反应时间: {reaction_time/60:.1f}分钟") - print(f"⏱️ 预计总时间: {(reaction_time + 450)/3600:.1f} 小时") - print(f"📊 体积状态:") - print(f" - 反应前体积: {original_liquid_volume:.2f}mL") - print(f" - 反应后体积: {final_liquid_volume:.2f}mL (氢化反应体积基本不变)") - print("🎊" * 20) - + debug_print(f"氢化反应协议生成完成: {len(action_sequence)} 个动作, " + f"vessel={vessel_id}, temp={temperature}°C, time={reaction_time/60:.1f}min, " + f"volume={original_liquid_volume:.2f}→{final_liquid_volume:.2f}mL") + return action_sequence - - -# 测试函数 -def test_hydrogenate_protocol(): - """测试氢化反应协议""" - print("🧪 === HYDROGENATE PROTOCOL 测试 === ✨") - - # 测试温度解析 - test_temps = ["45 °C", "45°C", "45", "25 C", "invalid"] - for temp in test_temps: - parsed = parse_temperature(temp) - print(f"温度 '{temp}' -> {parsed}°C") - - # 测试时间解析 - test_times = ["2 h", "120 min", "7200 s", "2", "invalid"] - for time in test_times: - parsed = parse_time(time) - print(f"时间 '{time}' -> {parsed/3600:.1f} 小时") - - print("✅ 测试完成 🎉") - - -if __name__ == "__main__": - test_hydrogenate_protocol() \ No newline at end of file diff --git a/unilabos/compile/separate_protocol.py b/unilabos/compile/separate_protocol.py index 307d9787..0e812dc8 100644 --- a/unilabos/compile/separate_protocol.py +++ b/unilabos/compile/separate_protocol.py @@ -1,41 +1,11 @@ -from functools import partial - import networkx as nx -import re -import logging -import sys from typing import List, Dict, Any, Union -from .utils.vessel_parser import get_vessel -from .utils.logger_util import action_log +from .utils.vessel_parser import get_vessel, find_solvent_vessel, find_connected_stirrer +from .utils.resource_helper import get_resource_liquid_volume, update_vessel_volume +from .utils.logger_util import debug_print, action_log +from .utils.unit_parser import parse_volume_input from .pump_protocol import generate_pump_protocol_with_rinsing -logger = logging.getLogger(__name__) - -# 确保输出编码为UTF-8 -if hasattr(sys.stdout, 'reconfigure'): - try: - sys.stdout.reconfigure(encoding='utf-8') - sys.stderr.reconfigure(encoding='utf-8') - except: - pass - -def debug_print(message): - """调试输出函数 - 支持中文""" - try: - # 确保消息是字符串格式 - safe_message = str(message) - logger.info(f"[SEPARATE] {safe_message}") - except UnicodeEncodeError: - # 如果编码失败,尝试替换不支持的字符 - safe_message = str(message).encode('utf-8', errors='replace').decode('utf-8') - logger.info(f"[SEPARATE] {safe_message}") - except Exception as e: - # 最后的安全措施 - fallback_message = f"日志输出错误: {repr(message)}" - logger.info(f"[SEPARATE] {fallback_message}") - -create_action_log = partial(action_log, prefix="[SEPARATE]") - def generate_separate_protocol( G: nx.DiGraph, @@ -93,45 +63,33 @@ def generate_separate_protocol( # 🔧 核心修改:从字典中提取容器ID vessel_id, vessel_data = get_vessel(vessel) - debug_print("🌀" * 20) - debug_print("🚀 开始生成分离协议(支持vessel字典和体积运算)✨") - debug_print(f"📝 输入参数:") - debug_print(f" 🥽 vessel: {vessel} (ID: {vessel_id})") - debug_print(f" 🎯 分离目的: '{purpose}'") - debug_print(f" 📊 产物相: '{product_phase}'") - debug_print(f" 💧 溶剂: '{solvent}'") - debug_print(f" 📏 体积: {volume} (类型: {type(volume)})") - debug_print(f" 🔄 重复次数: {repeats}") - debug_print(f" 🎯 产物容器: '{product_vessel}'") - debug_print(f" 🗑️ 废液容器: '{waste_vessel}'") - debug_print(f" 📦 其他参数: {kwargs}") - debug_print("🌀" * 20) + debug_print(f"开始生成分离协议: vessel={vessel_id}, purpose={purpose}, " + f"product_phase={product_phase}, solvent={solvent}, " + f"volume={volume}, repeats={repeats}") action_sequence = [] - # 🔧 新增:记录分离前的容器状态 - debug_print("🔍 记录分离前容器状态...") - original_liquid_volume = get_vessel_liquid_volume(vessel) - debug_print(f"📊 分离前液体体积: {original_liquid_volume:.2f}mL") + # 记录分离前的容器状态 + original_liquid_volume = get_resource_liquid_volume(vessel) + debug_print(f"分离前液体体积: {original_liquid_volume:.2f}mL") # === 参数验证和标准化 === - debug_print("🔍 步骤1: 参数验证和标准化...") - action_sequence.append(create_action_log(f"开始分离操作 - 容器: {vessel_id}", "🎬")) - action_sequence.append(create_action_log(f"分离目的: {purpose}", "🧪")) - action_sequence.append(create_action_log(f"产物相: {product_phase}", "📊")) + action_sequence.append(action_log(f"开始分离操作 - 容器: {vessel_id}", "🎬", prefix="[SEPARATE]")) + action_sequence.append(action_log(f"分离目的: {purpose}", "🧪", prefix="[SEPARATE]")) + action_sequence.append(action_log(f"产物相: {product_phase}", "📊", prefix="[SEPARATE]")) # 统一容器参数 - 支持字典和字符串 - def extract_vessel_id(vessel_param): - if isinstance(vessel_param, dict): - return vessel_param.get("id", "") - elif isinstance(vessel_param, str): - return vessel_param - else: - return "" + final_vessel_id = vessel_id - final_vessel_id, _ = vessel_id - final_to_vessel_id, _ = get_vessel(to_vessel) or get_vessel(product_vessel) - final_waste_vessel_id, _ = get_vessel(waste_phase_to_vessel) or get_vessel(waste_vessel) + to_vessel_result = get_vessel(to_vessel) if to_vessel else None + if to_vessel_result is None or to_vessel_result[0] == "": + to_vessel_result = get_vessel(product_vessel) if product_vessel else None + final_to_vessel_id = to_vessel_result[0] if to_vessel_result else "" + + waste_vessel_result = get_vessel(waste_phase_to_vessel) if waste_phase_to_vessel else None + if waste_vessel_result is None or waste_vessel_result[0] == "": + waste_vessel_result = get_vessel(waste_vessel) if waste_vessel else None + final_waste_vessel_id = waste_vessel_result[0] if waste_vessel_result else "" # 统一体积参数 final_volume = parse_volume_input(volume or solvent_volume) @@ -141,16 +99,12 @@ def generate_separate_protocol( repeats = 1 debug_print(f"⚠️ 重复次数参数 <= 0,自动设置为 1") - debug_print(f"🔧 标准化后的参数:") - debug_print(f" 🥼 分离容器: '{final_vessel_id}'") - debug_print(f" 🎯 产物容器: '{final_to_vessel_id}'") - debug_print(f" 🗑️ 废液容器: '{final_waste_vessel_id}'") - debug_print(f" 📏 溶剂体积: {final_volume}mL") - debug_print(f" 🔄 重复次数: {repeats}") + debug_print(f"标准化参数: vessel={final_vessel_id}, to={final_to_vessel_id}, " + f"waste={final_waste_vessel_id}, volume={final_volume}mL, repeats={repeats}") - action_sequence.append(create_action_log(f"分离容器: {final_vessel_id}", "🧪")) - action_sequence.append(create_action_log(f"溶剂体积: {final_volume}mL", "📏")) - action_sequence.append(create_action_log(f"重复次数: {repeats}", "🔄")) + action_sequence.append(action_log(f"分离容器: {final_vessel_id}", "🧪", prefix="[SEPARATE]")) + action_sequence.append(action_log(f"溶剂体积: {final_volume}mL", "📏", prefix="[SEPARATE]")) + action_sequence.append(action_log(f"重复次数: {repeats}", "🔄", prefix="[SEPARATE]")) # 验证必需参数 if not purpose: @@ -160,72 +114,68 @@ def generate_separate_protocol( if purpose not in ["wash", "extract", "separate"]: debug_print(f"⚠️ 未知的分离目的 '{purpose}',使用默认值 'separate'") purpose = "separate" - action_sequence.append(create_action_log(f"未知目的,使用: {purpose}", "⚠️")) + action_sequence.append(action_log(f"未知目的,使用: {purpose}", "⚠️", prefix="[SEPARATE]")) if product_phase not in ["top", "bottom"]: debug_print(f"⚠️ 未知的产物相 '{product_phase}',使用默认值 'top'") product_phase = "top" - action_sequence.append(create_action_log(f"未知相别,使用: {product_phase}", "⚠️")) + action_sequence.append(action_log(f"未知相别,使用: {product_phase}", "⚠️", prefix="[SEPARATE]")) - debug_print("✅ 参数验证通过") - action_sequence.append(create_action_log("参数验证通过", "✅")) + action_sequence.append(action_log("参数验证通过", "✅", prefix="[SEPARATE]")) # === 查找设备 === - debug_print("🔍 步骤2: 查找设备...") - action_sequence.append(create_action_log("正在查找相关设备...", "🔍")) + action_sequence.append(action_log("正在查找相关设备...", "🔍", prefix="[SEPARATE]")) # 查找分离器设备 - separator_device = find_separator_device(G, final_vessel_id) # 🔧 使用 final_vessel_id + separator_device = find_separator_device(G, final_vessel_id) if separator_device: - action_sequence.append(create_action_log(f"找到分离器设备: {separator_device}", "🧪")) + action_sequence.append(action_log(f"找到分离器设备: {separator_device}", "🧪", prefix="[SEPARATE]")) else: debug_print("⚠️ 未找到分离器设备,可能无法执行分离") - action_sequence.append(create_action_log("未找到分离器设备", "⚠️")) + action_sequence.append(action_log("未找到分离器设备", "⚠️", prefix="[SEPARATE]")) # 查找搅拌器 - stirrer_device = find_connected_stirrer(G, final_vessel_id) # 🔧 使用 final_vessel_id + stirrer_device = find_connected_stirrer(G, final_vessel_id) if stirrer_device: - action_sequence.append(create_action_log(f"找到搅拌器: {stirrer_device}", "🌪️")) + action_sequence.append(action_log(f"找到搅拌器: {stirrer_device}", "🌪️", prefix="[SEPARATE]")) else: - action_sequence.append(create_action_log("未找到搅拌器", "⚠️")) + action_sequence.append(action_log("未找到搅拌器", "⚠️", prefix="[SEPARATE]")) # 查找溶剂容器(如果需要) solvent_vessel = "" if solvent and solvent.strip(): - solvent_vessel = find_solvent_vessel(G, solvent) + try: + solvent_vessel = find_solvent_vessel(G, solvent) + except ValueError: + solvent_vessel = "" if solvent_vessel: - action_sequence.append(create_action_log(f"找到溶剂容器: {solvent_vessel}", "💧")) + action_sequence.append(action_log(f"找到溶剂容器: {solvent_vessel}", "💧", prefix="[SEPARATE]")) else: - action_sequence.append(create_action_log(f"未找到溶剂容器: {solvent}", "⚠️")) + action_sequence.append(action_log(f"未找到溶剂容器: {solvent}", "⚠️", prefix="[SEPARATE]")) - debug_print(f"📊 设备配置:") - debug_print(f" 🧪 分离器设备: '{separator_device}'") - debug_print(f" 🌪️ 搅拌器设备: '{stirrer_device}'") - debug_print(f" 💧 溶剂容器: '{solvent_vessel}'") + debug_print(f"设备配置: separator={separator_device}, stirrer={stirrer_device}, solvent_vessel={solvent_vessel}") # === 执行分离流程 === - debug_print("🔍 步骤3: 执行分离流程...") - action_sequence.append(create_action_log("开始分离工作流程", "🎯")) + action_sequence.append(action_log("开始分离工作流程", "🎯", prefix="[SEPARATE]")) - # 🔧 新增:体积变化跟踪变量 + # 体积变化跟踪变量 current_volume = original_liquid_volume try: for repeat_idx in range(repeats): cycle_num = repeat_idx + 1 - debug_print(f"🔄 第{cycle_num}轮: 开始分离循环 {cycle_num}/{repeats}") - action_sequence.append(create_action_log(f"分离循环 {cycle_num}/{repeats} 开始", "🔄")) + debug_print(f"分离循环 {cycle_num}/{repeats} 开始") + action_sequence.append(action_log(f"分离循环 {cycle_num}/{repeats} 开始", "🔄", prefix="[SEPARATE]")) # 步骤3.1: 添加溶剂(如果需要) if solvent_vessel and final_volume > 0: - debug_print(f"🔄 第{cycle_num}轮 步骤1: 添加溶剂 {solvent} ({final_volume}mL)") - action_sequence.append(create_action_log(f"向分离容器添加 {final_volume}mL {solvent}", "💧")) + action_sequence.append(action_log(f"向分离容器添加 {final_volume}mL {solvent}", "💧", prefix="[SEPARATE]")) try: # 使用pump protocol添加溶剂 pump_actions = generate_pump_protocol_with_rinsing( G=G, from_vessel=solvent_vessel, - to_vessel=final_vessel_id, # 🔧 使用 final_vessel_id + to_vessel=final_vessel_id, volume=final_volume, amount="", time=0.0, @@ -242,30 +192,27 @@ def generate_separate_protocol( **kwargs ) action_sequence.extend(pump_actions) - debug_print(f"✅ 溶剂添加完成,添加了 {len(pump_actions)} 个动作") - action_sequence.append(create_action_log(f"溶剂转移完成 ({len(pump_actions)} 个操作)", "✅")) + action_sequence.append(action_log(f"溶剂转移完成 ({len(pump_actions)} 个操作)", "✅", prefix="[SEPARATE]")) - # 🔧 新增:更新体积 - 添加溶剂后 + # 更新体积 - 添加溶剂后 current_volume += final_volume update_vessel_volume(vessel, G, current_volume, f"添加{final_volume}mL {solvent}后") except Exception as e: debug_print(f"❌ 溶剂添加失败: {str(e)}") - action_sequence.append(create_action_log(f"溶剂添加失败: {str(e)}", "❌")) + action_sequence.append(action_log(f"溶剂添加失败: {str(e)}", "❌", prefix="[SEPARATE]")) else: - debug_print(f"🔄 第{cycle_num}轮 步骤1: 无需添加溶剂") - action_sequence.append(create_action_log("无需添加溶剂", "⏭️")) + action_sequence.append(action_log("无需添加溶剂", "⏭️", prefix="[SEPARATE]")) # 步骤3.2: 启动搅拌(如果有搅拌器) if stirrer_device and stir_time > 0: - debug_print(f"🔄 第{cycle_num}轮 步骤2: 开始搅拌 ({stir_speed}rpm,持续 {stir_time}s)") - action_sequence.append(create_action_log(f"开始搅拌: {stir_speed}rpm,持续 {stir_time}s", "🌪️")) + action_sequence.append(action_log(f"开始搅拌: {stir_speed}rpm,持续 {stir_time}s", "🌪️", prefix="[SEPARATE]")) action_sequence.append({ "device_id": stirrer_device, "action_name": "start_stir", "action_kwargs": { - "vessel": {"id": final_vessel_id}, # 🔧 使用 final_vessel_id + "vessel": {"id": final_vessel_id}, "stir_speed": stir_speed, "purpose": f"分离混合 - {purpose}" } @@ -273,43 +220,37 @@ def generate_separate_protocol( # 搅拌等待 stir_minutes = stir_time / 60 - action_sequence.append(create_action_log(f"搅拌中,持续 {stir_minutes:.1f} 分钟", "⏱️")) + action_sequence.append(action_log(f"搅拌中,持续 {stir_minutes:.1f} 分钟", "⏱️", prefix="[SEPARATE]")) action_sequence.append({ "action_name": "wait", "action_kwargs": {"time": stir_time} }) # 停止搅拌 - action_sequence.append(create_action_log("停止搅拌器", "🛑")) + action_sequence.append(action_log("停止搅拌器", "🛑", prefix="[SEPARATE]")) action_sequence.append({ "device_id": stirrer_device, "action_name": "stop_stir", - "action_kwargs": {"vessel": final_vessel_id} # 🔧 使用 final_vessel_id + "action_kwargs": {"vessel": final_vessel_id} }) else: - debug_print(f"🔄 第{cycle_num}轮 步骤2: 无需搅拌") - action_sequence.append(create_action_log("无需搅拌", "⏭️")) + action_sequence.append(action_log("无需搅拌", "⏭️", prefix="[SEPARATE]")) # 步骤3.3: 静置分层 if settling_time > 0: - debug_print(f"🔄 第{cycle_num}轮 步骤3: 静置分层 ({settling_time}s)") settling_minutes = settling_time / 60 - action_sequence.append(create_action_log(f"静置分层 ({settling_minutes:.1f} 分钟)", "⚖️")) + action_sequence.append(action_log(f"静置分层 ({settling_minutes:.1f} 分钟)", "⚖️", prefix="[SEPARATE]")) action_sequence.append({ "action_name": "wait", "action_kwargs": {"time": settling_time} }) else: - debug_print(f"🔄 第{cycle_num}轮 步骤3: 未指定静置时间") - action_sequence.append(create_action_log("未指定静置时间", "⏭️")) + action_sequence.append(action_log("未指定静置时间", "⏭️", prefix="[SEPARATE]")) # 步骤3.4: 执行分离操作 if separator_device: - debug_print(f"🔄 第{cycle_num}轮 步骤4: 执行分离操作") - action_sequence.append(create_action_log(f"执行分离: 收集{product_phase}相", "🧪")) - - # 🔧 替换为具体的分离操作逻辑(基于old版本) + action_sequence.append(action_log(f"执行分离: 收集{product_phase}相", "🧪", prefix="[SEPARATE]")) # 首先进行分液判断(电导突跃) action_sequence.append({ @@ -324,11 +265,10 @@ def generate_separate_protocol( phase_volume = current_volume / 2 # 智能查找分离容器底部 - separation_vessel_bottom = find_separation_vessel_bottom(G, final_vessel_id) # ✅ + separation_vessel_bottom = find_separation_vessel_bottom(G, final_vessel_id) if product_phase == "bottom": - debug_print(f"🔄 收集底相产物到 {final_to_vessel_id}") - action_sequence.append(create_action_log("收集底相产物", "📦")) + action_sequence.append(action_log("收集底相产物", "📦", prefix="[SEPARATE]")) # 产物转移到目标瓶 if final_to_vessel_id: @@ -364,8 +304,7 @@ def generate_separate_protocol( action_sequence.extend(pump_actions) elif product_phase == "top": - debug_print(f"🔄 收集上相产物到 {final_to_vessel_id}") - action_sequence.append(create_action_log("收集上相产物", "📦")) + action_sequence.append(action_log("收集上相产物", "📦", prefix="[SEPARATE]")) # 弃去下面那一相进废液 if final_waste_vessel_id: @@ -400,10 +339,9 @@ def generate_separate_protocol( ) action_sequence.extend(pump_actions) - debug_print(f"✅ 分离操作已完成") - action_sequence.append(create_action_log("分离操作完成", "✅")) + action_sequence.append(action_log("分离操作完成", "✅", prefix="[SEPARATE]")) - # 🔧 新增:分离后体积估算 + # 分离后体积估算 separated_volume = phase_volume * 0.95 # 假设5%损失,只保留产物相体积 update_vessel_volume(vessel, G, separated_volume, f"分离操作后(第{cycle_num}轮)") current_volume = separated_volume @@ -411,23 +349,21 @@ def generate_separate_protocol( # 收集结果 if final_to_vessel_id: action_sequence.append( - create_action_log(f"产物 ({product_phase}相) 收集到: {final_to_vessel_id}", "📦")) + action_log(f"产物 ({product_phase}相) 收集到: {final_to_vessel_id}", "📦", prefix="[SEPARATE]")) if final_waste_vessel_id: - action_sequence.append(create_action_log(f"废相收集到: {final_waste_vessel_id}", "🗑️")) + action_sequence.append(action_log(f"废相收集到: {final_waste_vessel_id}", "🗑️", prefix="[SEPARATE]")) else: - debug_print(f"🔄 第{cycle_num}轮 步骤4: 无分离器设备,跳过分离") - action_sequence.append(create_action_log("无分离器设备可用", "❌")) + action_sequence.append(action_log("无分离器设备可用", "❌", prefix="[SEPARATE]")) # 添加等待时间模拟分离 action_sequence.append({ "action_name": "wait", "action_kwargs": {"time": 10.0} }) - # 🔧 新增:如果不是最后一次,从中转瓶转移回分液漏斗(基于old版本逻辑) + # 如果不是最后一次,从中转瓶转移回分液漏斗 if repeat_idx < repeats - 1 and final_to_vessel_id and final_to_vessel_id != final_vessel_id: - debug_print(f"🔄 第{cycle_num}轮: 产物转移回分离容器准备下一轮") - action_sequence.append(create_action_log("产物转回分离容器,准备下一轮", "🔄")) + action_sequence.append(action_log("产物转回分离容器,准备下一轮", "🔄", prefix="[SEPARATE]")) pump_actions = generate_pump_protocol_with_rinsing( G=G, @@ -444,368 +380,85 @@ def generate_separate_protocol( # 循环间等待(除了最后一次) if repeat_idx < repeats - 1: - debug_print(f"🔄 第{cycle_num}轮: 等待下一次循环...") - action_sequence.append(create_action_log("等待下一次循环...", "⏳")) + action_sequence.append(action_log("等待下一次循环...", "⏳", prefix="[SEPARATE]")) action_sequence.append({ "action_name": "wait", "action_kwargs": {"time": 5} }) else: - action_sequence.append(create_action_log(f"分离循环 {cycle_num}/{repeats} 完成", "🌟")) + action_sequence.append(action_log(f"分离循环 {cycle_num}/{repeats} 完成", "🌟", prefix="[SEPARATE]")) except Exception as e: debug_print(f"❌ 分离工作流程执行失败: {str(e)}") - action_sequence.append(create_action_log(f"分离工作流程失败: {str(e)}", "❌")) + action_sequence.append(action_log(f"分离工作流程失败: {str(e)}", "❌", prefix="[SEPARATE]")) - # 🔧 新增:分离完成后的最终状态报告 - final_liquid_volume = get_vessel_liquid_volume(vessel) + # 分离完成后的最终状态报告 + final_liquid_volume = get_resource_liquid_volume(vessel) # === 最终结果 === total_time = (stir_time + settling_time + 15) * repeats # 估算总时间 - debug_print("🌀" * 20) - debug_print(f"🎉 分离协议生成完成") - debug_print(f"📊 协议统计:") - debug_print(f" 📋 总动作数: {len(action_sequence)}") - debug_print(f" ⏱️ 预计总时间: {total_time:.0f}s ({total_time / 60:.1f} 分钟)") - debug_print(f" 🥼 分离容器: {final_vessel_id}") - debug_print(f" 🎯 分离目的: {purpose}") - debug_print(f" 📊 产物相: {product_phase}") - debug_print(f" 🔄 重复次数: {repeats}") - debug_print(f"💧 体积变化统计:") - debug_print(f" - 分离前体积: {original_liquid_volume:.2f}mL") - debug_print(f" - 分离后体积: {final_liquid_volume:.2f}mL") - if solvent: - debug_print(f" 💧 溶剂: {solvent} ({final_volume}mL × {repeats}轮 = {final_volume * repeats:.2f}mL)") - if final_to_vessel_id: - debug_print(f" 🎯 产物容器: {final_to_vessel_id}") - if final_waste_vessel_id: - debug_print(f" 🗑️ 废液容器: {final_waste_vessel_id}") - debug_print("🌀" * 20) + debug_print(f"分离协议生成完成: {len(action_sequence)} 个动作, " + f"预计 {total_time:.0f}s, 体积 {original_liquid_volume:.2f}→{final_liquid_volume:.2f}mL") # 添加完成日志 summary_msg = f"分离协议完成: {final_vessel_id} ({purpose},{repeats} 次循环)" if solvent: summary_msg += f",使用 {final_volume * repeats:.2f}mL {solvent}" - action_sequence.append(create_action_log(summary_msg, "🎉")) + action_sequence.append(action_log(summary_msg, "🎉", prefix="[SEPARATE]")) return action_sequence -def parse_volume_input(volume_input: Union[str, float]) -> float: - """ - 解析体积输入,支持带单位的字符串 - - Args: - volume_input: 体积输入(如 "200 mL", "?", 50.0) - - Returns: - float: 体积(毫升) - """ - if isinstance(volume_input, (int, float)): - debug_print(f"📏 体积输入为数值: {volume_input}") - return float(volume_input) - - if not volume_input or not str(volume_input).strip(): - debug_print(f"⚠️ 体积输入为空,返回 0.0mL") - return 0.0 - - volume_str = str(volume_input).lower().strip() - debug_print(f"🔍 解析体积输入: '{volume_str}'") - - # 处理未知体积 - if volume_str in ['?', 'unknown', 'tbd', 'to be determined', '未知', '待定']: - default_volume = 100.0 # 默认100mL - debug_print(f"❓ 检测到未知体积,使用默认值: {default_volume}mL") - return default_volume - - # 移除空格并提取数字和单位 - volume_clean = re.sub(r'\s+', '', volume_str) - - # 匹配数字和单位的正则表达式 - match = re.match(r'([0-9]*\.?[0-9]+)\s*(ml|l|μl|ul|microliter|milliliter|liter|毫升|升|微升)?', volume_clean) - - if not match: - debug_print(f"⚠️ 无法解析体积: '{volume_str}',使用默认值 100mL") - return 100.0 - - value = float(match.group(1)) - unit = match.group(2) or 'ml' # 默认单位为毫升 - - # 转换为毫升 - if unit in ['l', 'liter', '升']: - volume = value * 1000.0 # L -> mL - debug_print(f"🔄 体积转换: {value}L -> {volume}mL") - elif unit in ['μl', 'ul', 'microliter', '微升']: - volume = value / 1000.0 # μL -> mL - debug_print(f"🔄 体积转换: {value}μL -> {volume}mL") - else: # ml, milliliter, 毫升 或默认 - volume = value # 已经是mL - debug_print(f"✅ 体积已为毫升单位: {volume}mL") - - return volume - -def find_solvent_vessel(G: nx.DiGraph, solvent: str) -> str: - """查找溶剂容器,支持多种匹配模式""" - if not solvent or not solvent.strip(): - debug_print("⏭️ 未指定溶剂,跳过溶剂容器查找") - return "" - - debug_print(f"🔍 正在查找溶剂 '{solvent}' 的容器...") - - # 🔧 方法1:直接搜索 data.reagent_name 和 config.reagent - debug_print(f"📋 方法1: 搜索试剂字段...") - for node in G.nodes(): - node_data = G.nodes[node].get('data', {}) - node_type = G.nodes[node].get('type', '') - config_data = G.nodes[node].get('config', {}) - - # 只搜索容器类型的节点 - if node_type == 'container': - reagent_name = node_data.get('reagent_name', '').lower() - config_reagent = config_data.get('reagent', '').lower() - - # 精确匹配 - if reagent_name == solvent.lower() or config_reagent == solvent.lower(): - debug_print(f"✅ 通过试剂字段精确匹配找到容器: {node}") - return node - - # 模糊匹配 - if (solvent.lower() in reagent_name and reagent_name) or \ - (solvent.lower() in config_reagent and config_reagent): - debug_print(f"✅ 通过试剂字段模糊匹配找到容器: {node}") - return node - - # 🔧 方法2:常见的容器命名规则 - debug_print(f"📋 方法2: 使用命名规则...") - solvent_clean = solvent.lower().replace(' ', '_').replace('-', '_') - possible_names = [ - f"flask_{solvent_clean}", - f"bottle_{solvent_clean}", - f"vessel_{solvent_clean}", - f"{solvent_clean}_flask", - f"{solvent_clean}_bottle", - f"solvent_{solvent_clean}", - f"reagent_{solvent_clean}", - f"reagent_bottle_{solvent_clean}", - f"reagent_bottle_1", # 通用试剂瓶 - f"reagent_bottle_2", - f"reagent_bottle_3" - ] - - debug_print(f"🎯 尝试的容器名称: {possible_names[:5]}... (共 {len(possible_names)} 个)") - - for name in possible_names: - if name in G.nodes(): - node_type = G.nodes[name].get('type', '') - if node_type == 'container': - debug_print(f"✅ 通过命名规则找到容器: {name}") - return name - - # 🔧 方法3:使用第一个试剂瓶作为备选 - debug_print(f"📋 方法3: 查找备用试剂瓶...") - for node_id in G.nodes(): - node_data = G.nodes[node_id] - if (node_data.get('type') == 'container' and - ('reagent' in node_id.lower() or 'bottle' in node_id.lower())): - debug_print(f"⚠️ 未找到专用容器,使用备用容器: {node_id}") - return node_id - - debug_print(f"❌ 无法找到溶剂 '{solvent}' 的容器") - return "" def find_separator_device(G: nx.DiGraph, vessel: str) -> str: """查找分离器设备,支持多种查找方式""" - debug_print(f"🔍 正在查找容器 '{vessel}' 的分离器设备...") - # 方法1:查找连接到容器的分离器设备 - debug_print(f"📋 方法1: 检查连接的分离器...") separator_nodes = [] for node in G.nodes(): node_class = G.nodes[node].get('class', '').lower() if 'separator' in node_class: separator_nodes.append(node) - debug_print(f"📋 发现分离器设备: {node}") - # 检查是否连接到目标容器 if G.has_edge(node, vessel) or G.has_edge(vessel, node): - debug_print(f"✅ 找到连接的分离器: {node}") return node - - debug_print(f"📊 找到的分离器总数: {len(separator_nodes)}") - + # 方法2:根据命名规则查找 - debug_print(f"📋 方法2: 使用命名规则...") possible_names = [ f"{vessel}_controller", f"{vessel}_separator", vessel, # 容器本身可能就是分离器 "separator_1", "virtual_separator", - "liquid_handler_1", # 液体处理器也可能用于分离 + "liquid_handler_1", "controller_1" ] - - debug_print(f"🎯 尝试的分离器名称: {possible_names}") - + for name in possible_names: if name in G.nodes(): node_class = G.nodes[name].get('class', '').lower() if 'separator' in node_class or 'controller' in node_class: - debug_print(f"✅ 通过命名规则找到分离器: {name}") return name - - # 方法3:查找第一个分离器设备 - debug_print(f"📋 方法3: 使用第一个可用分离器...") + + # 方法3:使用第一个可用分离器 if separator_nodes: debug_print(f"⚠️ 使用第一个分离器设备: {separator_nodes[0]}") return separator_nodes[0] - + debug_print(f"❌ 未找到分离器设备") return "" -def find_connected_stirrer(G: nx.DiGraph, vessel: str) -> str: - """查找连接到指定容器的搅拌器""" - debug_print(f"🔍 正在查找与容器 {vessel} 连接的搅拌器...") - - stirrer_nodes = [] - for node in G.nodes(): - node_data = G.nodes[node] - node_class = node_data.get('class', '') or '' - - if 'stirrer' in node_class.lower(): - stirrer_nodes.append(node) - debug_print(f"📋 发现搅拌器: {node}") - - debug_print(f"📊 找到的搅拌器总数: {len(stirrer_nodes)}") - - # 检查哪个搅拌器与目标容器相连 - for stirrer in stirrer_nodes: - if G.has_edge(stirrer, vessel) or G.has_edge(vessel, stirrer): - debug_print(f"✅ 找到连接的搅拌器: {stirrer}") - return stirrer - - # 如果没有连接的搅拌器,返回第一个可用的 - if stirrer_nodes: - debug_print(f"⚠️ 未找到直接连接的搅拌器,使用第一个可用的: {stirrer_nodes[0]}") - return stirrer_nodes[0] - - debug_print("❌ 未找到搅拌器") - return "" - -def get_vessel_liquid_volume(vessel: dict) -> float: - """ - 获取容器中的液体体积 - 支持vessel字典 - - Args: - vessel: 容器字典 - - Returns: - float: 液体体积(mL) - """ - if not vessel or "data" not in vessel: - debug_print(f"⚠️ 容器数据为空,返回 0.0mL") - return 0.0 - - vessel_data = vessel["data"] - vessel_id = vessel.get("id", "unknown") - - debug_print(f"🔍 读取容器 '{vessel_id}' 体积数据: {vessel_data}") - - # 检查liquid_volume字段 - if "liquid_volume" in vessel_data: - liquid_volume = vessel_data["liquid_volume"] - - # 处理列表格式 - if isinstance(liquid_volume, list): - if len(liquid_volume) > 0: - volume = liquid_volume[0] - if isinstance(volume, (int, float)): - debug_print(f"✅ 容器 '{vessel_id}' 体积: {volume}mL (列表格式)") - return float(volume) - - # 处理直接数值格式 - elif isinstance(liquid_volume, (int, float)): - debug_print(f"✅ 容器 '{vessel_id}' 体积: {liquid_volume}mL (数值格式)") - return float(liquid_volume) - - # 检查其他可能的体积字段 - volume_keys = ['current_volume', 'total_volume', 'volume'] - for key in volume_keys: - if key in vessel_data: - try: - volume = float(vessel_data[key]) - if volume > 0: - debug_print(f"✅ 容器 '{vessel_id}' 体积: {volume}mL (字段: {key})") - return volume - except (ValueError, TypeError): - continue - - debug_print(f"⚠️ 无法获取容器 '{vessel_id}' 的体积,返回默认值 50.0mL") - return 50.0 - -def update_vessel_volume(vessel: dict, G: nx.DiGraph, new_volume: float, description: str = "") -> None: - """ - 更新容器体积(同时更新vessel字典和图节点) - - Args: - vessel: 容器字典 - G: 网络图 - new_volume: 新体积 - description: 更新描述 - """ - vessel_id = vessel.get("id", "unknown") - - if description: - debug_print(f"🔧 更新容器体积 - {description}") - - # 更新vessel字典中的体积 - if "data" in vessel: - if "liquid_volume" in vessel["data"]: - current_volume = vessel["data"]["liquid_volume"] - if isinstance(current_volume, list): - if len(current_volume) > 0: - vessel["data"]["liquid_volume"][0] = new_volume - else: - vessel["data"]["liquid_volume"] = [new_volume] - else: - vessel["data"]["liquid_volume"] = new_volume - else: - vessel["data"]["liquid_volume"] = new_volume - else: - vessel["data"] = {"liquid_volume": new_volume} - - # 同时更新图中的容器数据 - if vessel_id in G.nodes(): - if 'data' not in G.nodes[vessel_id]: - G.nodes[vessel_id]['data'] = {} - - vessel_node_data = G.nodes[vessel_id]['data'] - current_node_volume = vessel_node_data.get('liquid_volume', 0.0) - - if isinstance(current_node_volume, list): - if len(current_node_volume) > 0: - G.nodes[vessel_id]['data']['liquid_volume'][0] = new_volume - else: - G.nodes[vessel_id]['data']['liquid_volume'] = [new_volume] - else: - G.nodes[vessel_id]['data']['liquid_volume'] = new_volume - - debug_print(f"📊 容器 '{vessel_id}' 体积已更新为: {new_volume:.2f}mL") - def find_separation_vessel_bottom(G: nx.DiGraph, vessel_id: str) -> str: """ 智能查找分离容器的底部容器(假设为flask或vessel类型) - + Args: G: 网络图 vessel_id: 分离容器ID - + Returns: str: 底部容器ID """ - debug_print(f"🔍 查找分离容器 {vessel_id} 的底部容器...") - # 方法1:根据命名规则推测 possible_bottoms = [ f"{vessel_id}_bottom", @@ -814,32 +467,25 @@ def find_separation_vessel_bottom(G: nx.DiGraph, vessel_id: str) -> str: f"{vessel_id}_flask", f"{vessel_id}_vessel" ] - - debug_print(f"📋 尝试的底部容器名称: {possible_bottoms}") - + for bottom_id in possible_bottoms: if bottom_id in G.nodes(): node_type = G.nodes[bottom_id].get('type', '') if node_type == 'container': - debug_print(f"✅ 通过命名规则找到底部容器: {bottom_id}") return bottom_id - - # 方法2:查找与分离器相连的容器(假设底部容器会与分离器相连) - debug_print(f"📋 方法2: 查找连接的容器...") + + # 方法2:查找与分离器相连的容器 for node in G.nodes(): node_data = G.nodes[node] node_class = node_data.get('class', '') or '' - + if 'separator' in node_class.lower(): - # 检查分离器的输入端 if G.has_edge(node, vessel_id): for neighbor in G.neighbors(node): if neighbor != vessel_id: neighbor_type = G.nodes[neighbor].get('type', '') if neighbor_type == 'container': - debug_print(f"✅ 通过连接找到底部容器: {neighbor}") return neighbor - + debug_print(f"❌ 无法找到分离容器 {vessel_id} 的底部容器") return "" -