diff --git a/unilabos/compile/adjustph_protocol.py b/unilabos/compile/adjustph_protocol.py index d48c17e0..91643f72 100644 --- a/unilabos/compile/adjustph_protocol.py +++ b/unilabos/compile/adjustph_protocol.py @@ -19,8 +19,6 @@ def find_acid_base_vessel(G: nx.DiGraph, reagent: str) -> str: Returns: str: 试剂容器ID """ - debug_print(f"🔍 正在查找试剂 '{reagent}' 的容器...") - # 常见酸碱试剂的别名映射 reagent_aliases = { "hydrochloric acid": ["HCl", "hydrochloric_acid", "hcl", "muriatic_acid"], @@ -34,17 +32,13 @@ def find_acid_base_vessel(G: nx.DiGraph, reagent: str) -> str: # 构建搜索名称列表 search_names = [reagent.lower()] - debug_print(f"📋 基础搜索名称: {reagent.lower()}") - + # 添加别名 for base_name, aliases in reagent_aliases.items(): if reagent.lower() in base_name.lower() or base_name.lower() in reagent.lower(): search_names.extend([alias.lower() for alias in aliases]) - debug_print(f"🔗 添加别名: {aliases}") break - debug_print(f"📝 完整搜索列表: {search_names}") - # 构建可能的容器名称 possible_names = [] for name in search_names: @@ -59,17 +53,15 @@ def find_acid_base_vessel(G: nx.DiGraph, reagent: str) -> str: name_clean ]) - debug_print(f"🎯 可能的容器名称 (前5个): {possible_names[:5]}... (共{len(possible_names)}个)") - + debug_print(f"搜索容器: {len(possible_names)} 个候选名称") + # 第一步:通过容器名称匹配 - debug_print(f"📋 方法1: 精确名称匹配...") for vessel_name in possible_names: if vessel_name in G.nodes(): - debug_print(f"✅ 通过名称匹配找到容器: {vessel_name} 🎯") + debug_print(f"通过名称匹配找到容器: {vessel_name}") return vessel_name - + # 第二步:通过模糊匹配 - debug_print(f"📋 方法2: 模糊名称匹配...") for node_id in G.nodes(): if G.nodes[node_id].get('type') == 'container': node_name = G.nodes[node_id].get('name', '').lower() @@ -77,11 +69,10 @@ def find_acid_base_vessel(G: nx.DiGraph, reagent: str) -> str: # 检查是否包含任何搜索名称 for search_name in search_names: if search_name in node_id.lower() or search_name in node_name: - debug_print(f"✅ 通过模糊匹配找到容器: {node_id} 🔍") + debug_print(f"通过模糊匹配找到容器: {node_id}") return node_id - + # 第三步:通过液体类型匹配 - debug_print(f"📋 方法3: 液体类型匹配...") for node_id in G.nodes(): if G.nodes[node_id].get('type') == 'container': vessel_data = G.nodes[node_id].get('data', {}) @@ -94,7 +85,7 @@ def find_acid_base_vessel(G: nx.DiGraph, reagent: str) -> str: for search_name in search_names: if search_name in liquid_type or search_name in reagent_name: - debug_print(f"✅ 通过液体类型匹配找到容器: {node_id} 💧") + debug_print(f"通过液体类型匹配找到容器: {node_id}") return node_id # 列出可用容器帮助调试 @@ -167,7 +158,7 @@ def generate_adjust_ph_protocol( if not vessel_id: raise ValueError("vessel 参数无效,必须包含id字段或直接提供容器ID") - debug_print(f"开始生成pH调节协议: vessel={vessel_id}, ph={ph_value}, reagent='{reagent}'") + debug_print(f"pH调节协议: vessel={vessel_id}, ph={ph_value}, reagent='{reagent}'") action_sequence = [] @@ -290,28 +281,23 @@ def generate_adjust_ph_protocol( # 体积运算 - 试剂添加成功后更新容器液体体积 if "data" in vessel and "liquid_volume" in vessel["data"]: current_volume = vessel["data"]["liquid_volume"] - debug_print(f"📊 添加前容器体积: {current_volume}") - + # 处理不同的体积数据格式 if isinstance(current_volume, list): if len(current_volume) > 0: # 增加体积(添加试剂) vessel["data"]["liquid_volume"][0] += volume - debug_print(f"📊 添加后容器体积: {vessel['data']['liquid_volume'][0]:.2f}mL (+{volume:.2f}mL)") else: # 如果列表为空,创建新的体积记录 vessel["data"]["liquid_volume"] = [volume] - debug_print(f"📊 初始化容器体积: {volume:.2f}mL") elif isinstance(current_volume, (int, float)): # 直接数值类型 vessel["data"]["liquid_volume"] += volume - debug_print(f"📊 添加后容器体积: {vessel['data']['liquid_volume']:.2f}mL (+{volume:.2f}mL)") else: - debug_print(f"⚠️ 未知的体积数据格式: {type(current_volume)}") + debug_print(f"未知的体积数据格式: {type(current_volume)}") # 创建新的体积记录 vessel["data"]["liquid_volume"] = volume else: - debug_print(f"📊 容器无液体体积数据,创建新记录: {volume:.2f}mL") # 确保vessel有data字段 if "data" not in vessel: vessel["data"] = {} @@ -329,19 +315,16 @@ def generate_adjust_ph_protocol( G.nodes[vessel_id]['data']['liquid_volume'] = [volume] else: G.nodes[vessel_id]['data']['liquid_volume'] = current_node_volume + volume - - debug_print(f"✅ 图节点体积数据已更新") - + action_sequence.append(create_action_log(f"容器体积已更新 (+{volume:.2f}mL)", "📊")) except Exception as e: - debug_print(f"❌ 生成泵协议时出错: {str(e)}") + debug_print(f"生成泵协议时出错: {str(e)}") action_sequence.append(create_action_log(f"泵协议生成失败: {str(e)}", "❌")) raise ValueError(f"生成泵协议时出错: {str(e)}") # 7. 混合搅拌 if stir and stirrer_id: - debug_print(f"🔍 步骤7: 混合搅拌...") action_sequence.append(create_action_log(f"开始混合搅拌 {stir_time:.0f}s", "🌀")) action_sequence.append({ @@ -354,14 +337,10 @@ def generate_adjust_ph_protocol( "purpose": f"pH调节: 混合试剂,目标pH={ph_value}" } }) - - debug_print(f"✅ 混合搅拌设置完成") else: - debug_print(f"⏭️ 跳过混合搅拌") action_sequence.append(create_action_log("跳过混合搅拌", "⏭️")) - + # 8. 等待平衡 - debug_print(f"🔍 步骤8: 反应平衡...") action_sequence.append(create_action_log(f"等待pH平衡 {settling_time:.0f}s", "⚖️")) action_sequence.append({ @@ -374,17 +353,7 @@ def generate_adjust_ph_protocol( # 9. 完成总结 total_time = addition_time + stir_time + settling_time - - debug_print("=" * 60) - debug_print(f"🎉 pH调节协议生成完成") - debug_print(f"📊 协议统计:") - debug_print(f" 📋 总动作数: {len(action_sequence)}") - debug_print(f" ⏱️ 预计总时间: {total_time:.0f}s ({total_time/60:.1f}分钟)") - debug_print(f" 🧪 试剂: {reagent}") - debug_print(f" 📏 体积: {volume:.2f}mL") - debug_print(f" 📊 目标pH: {ph_value}") - debug_print(f" 🥼 目标容器: {vessel_id}") - debug_print("=" * 60) + debug_print(f"pH调节协议完成: {len(action_sequence)} 个动作, {total_time:.0f}s, {volume:.2f}mL {reagent} → {vessel_id} pH {ph_value}") # 添加完成日志 summary_msg = f"pH调节协议完成: {vessel_id} → pH {ph_value} (使用 {volume:.2f}mL {reagent})" @@ -416,28 +385,18 @@ def generate_adjust_ph_protocol_stepwise( """ # 🔧 核心修改:从字典中提取容器ID vessel_id = vessel["id"] - - debug_print("=" * 60) - debug_print(f"🔄 开始分步pH调节") - debug_print(f"📋 分步参数:") - debug_print(f" 🥼 vessel: {vessel} (ID: {vessel_id})") - debug_print(f" 📊 ph_value: {ph_value}") - debug_print(f" 🧪 reagent: {reagent}") - debug_print(f" 📏 max_volume: {max_volume}mL") - debug_print(f" 🔢 steps: {steps}") - debug_print("=" * 60) + + debug_print(f"分步pH调节: vessel={vessel_id}, ph={ph_value}, reagent={reagent}, max_volume={max_volume}mL, steps={steps}") action_sequence = [] # 每步添加的体积 step_volume = max_volume / steps - debug_print(f"📊 每步体积: {step_volume:.2f}mL") action_sequence.append(create_action_log(f"开始分步pH调节 ({steps}步)", "🔄")) action_sequence.append(create_action_log(f"每步添加: {step_volume:.2f}mL", "📏")) for i in range(steps): - debug_print(f"🔄 执行第 {i+1}/{steps} 步,添加 {step_volume:.2f}mL") action_sequence.append(create_action_log(f"第 {i+1}/{steps} 步开始", "🚀")) # 生成单步协议 @@ -454,12 +413,10 @@ def generate_adjust_ph_protocol_stepwise( ) action_sequence.extend(step_actions) - debug_print(f"✅ 第 {i+1}/{steps} 步完成,添加了 {len(step_actions)} 个动作") action_sequence.append(create_action_log(f"第 {i+1}/{steps} 步完成", "✅")) # 步骤间等待 if i < steps - 1: - debug_print(f"⏳ 步骤间等待30s") action_sequence.append(create_action_log("步骤间等待...", "⏳")) action_sequence.append({ "action_name": "wait", @@ -469,7 +426,7 @@ def generate_adjust_ph_protocol_stepwise( } }) - debug_print(f"🎉 分步pH调节完成,共 {len(action_sequence)} 个动作") + debug_print(f"分步pH调节完成: {len(action_sequence)} 个动作") action_sequence.append(create_action_log("分步pH调节全部完成", "🎉")) return action_sequence @@ -483,7 +440,7 @@ def generate_acidify_protocol( ) -> List[Dict[str, Any]]: """酸化协议""" vessel_id = vessel["id"] - debug_print(f"🍋 生成酸化协议: {vessel_id} → pH {target_ph} (使用 {acid})") + debug_print(f"酸化协议: {vessel_id} → pH {target_ph} ({acid})") return generate_adjust_ph_protocol( G, vessel, target_ph, acid ) @@ -496,7 +453,7 @@ def generate_basify_protocol( ) -> List[Dict[str, Any]]: """碱化协议""" vessel_id = vessel["id"] - debug_print(f"🧂 生成碱化协议: {vessel_id} → pH {target_ph} (使用 {base})") + debug_print(f"碱化协议: {vessel_id} → pH {target_ph} ({base})") return generate_adjust_ph_protocol( G, vessel, target_ph, base ) @@ -508,7 +465,7 @@ def generate_neutralize_protocol( ) -> List[Dict[str, Any]]: """中和协议(pH=7)""" vessel_id = vessel["id"] - debug_print(f"⚖️ 生成中和协议: {vessel_id} → pH 7.0 (使用 {reagent})") + debug_print(f"中和协议: {vessel_id} → pH 7.0 ({reagent})") return generate_adjust_ph_protocol( G, vessel, 7.0, reagent ) @@ -516,10 +473,7 @@ def generate_neutralize_protocol( # 测试函数 def test_adjust_ph_protocol(): """测试pH调节协议""" - debug_print("=== ADJUST PH PROTOCOL 增强版测试 ===") - # 测试体积计算 - debug_print("🧮 测试体积计算...") test_cases = [ (2.0, "hydrochloric acid", 100.0), (4.0, "hydrochloric acid", 100.0), @@ -527,12 +481,12 @@ def test_adjust_ph_protocol(): (10.0, "sodium hydroxide", 100.0), (7.0, "unknown reagent", 100.0) ] - + for ph, reagent, volume in test_cases: result = calculate_reagent_volume(ph, reagent, volume) - debug_print(f"📊 {reagent} → pH {ph}: {result:.2f}mL") - - debug_print("✅ 测试完成") + debug_print(f"{reagent} → pH {ph}: {result:.2f}mL") + + debug_print("测试完成") if __name__ == "__main__": test_adjust_ph_protocol() \ No newline at end of file diff --git a/unilabos/compile/dissolve_protocol.py b/unilabos/compile/dissolve_protocol.py index 50f9989c..c10c3d71 100644 --- a/unilabos/compile/dissolve_protocol.py +++ b/unilabos/compile/dissolve_protocol.py @@ -56,24 +56,17 @@ def generate_dissolve_protocol( # 从字典中提取容器ID vessel_id, vessel_data = get_vessel(vessel) - debug_print(f"开始生成溶解协议: vessel={vessel_id}, solvent='{solvent}', " - f"volume={volume}, mass={mass}, temp={temp}, time={time}, " - f"reagent='{reagent}', mol='{mol}', event='{event}'") - + debug_print(f"溶解协议: vessel={vessel_id}, solvent='{solvent}', volume={volume}, " + f"mass={mass}, temp={temp}, time={time}") + action_sequence = [] - - # === 参数验证 === - action_sequence.append(create_action_log(f"开始溶解操作 - 容器: {vessel_id}", "🎬")) - + if not vessel_id: - debug_print("❌ vessel 参数不能为空") raise ValueError("vessel 参数不能为空") if vessel_id not in G.nodes(): raise ValueError(f"容器 '{vessel_id}' 不存在于系统中") - action_sequence.append(create_action_log("参数验证通过", "✅")) - # 记录溶解前的容器状态 original_liquid_volume = 0.0 if "data" in vessel and "liquid_volume" in vessel["data"]: @@ -84,19 +77,14 @@ def generate_dissolve_protocol( original_liquid_volume = current_volume # === 参数解析 === - action_sequence.append(create_action_log("正在解析溶解参数...", "🔍")) - - # 解析各种参数为数值 final_volume = parse_volume_input(volume) final_mass = parse_mass_input(mass) final_temp = parse_temperature_input(temp) final_time = parse_time_input(time) - debug_print(f"解析结果: volume={final_volume}mL, mass={final_mass}g, " - f"temp={final_temp}°C, time={final_time}s") + debug_print(f"参数解析: vol={final_volume}mL, mass={final_mass}g, temp={final_temp}°C, time={final_time}s") # === 判断溶解类型 === - action_sequence.append(create_action_log("正在判断溶解类型...", "🔍")) # 判断是固体溶解还是液体溶解 is_solid_dissolve = (final_mass > 0 or (mol and mol.strip() != "") or (reagent and reagent.strip() != "")) @@ -108,41 +96,31 @@ def generate_dissolve_protocol( final_volume = 50.0 if not solvent: solvent = "water" # 默认溶剂 - debug_print("⚠️ 未明确指定溶解参数,默认为50mL水溶解") + debug_print("未明确指定溶解参数,默认为50mL水溶解") dissolve_type = "固体溶解" if is_solid_dissolve else "液体溶解" - dissolve_emoji = "🧂" if is_solid_dissolve else "💧" debug_print(f"溶解类型: {dissolve_type}") - action_sequence.append(create_action_log(f"确定溶解类型: {dissolve_type} {dissolve_emoji}", "📋")) + action_sequence.append(create_action_log(f"溶解类型: {dissolve_type}", "📋")) # === 查找设备 === - action_sequence.append(create_action_log("正在查找相关设备...", "🔍")) - - # 查找加热搅拌器 heatchill_id = find_connected_heatchill(G, vessel_id) stirrer_id = find_connected_stirrer(G, vessel_id) # 优先使用加热搅拌器,否则使用独立搅拌器 stir_device_id = heatchill_id or stirrer_id - debug_print(f"设备映射: heatchill='{heatchill_id}', stirrer='{stirrer_id}', 使用='{stir_device_id}'") - - if heatchill_id: - action_sequence.append(create_action_log(f"找到加热搅拌器: {heatchill_id}", "🔥")) - elif stirrer_id: - action_sequence.append(create_action_log(f"找到搅拌器: {stirrer_id}", "🌪️")) - else: + debug_print(f"设备: heatchill='{heatchill_id}', stirrer='{stirrer_id}'") + + if not stir_device_id: action_sequence.append(create_action_log("未找到搅拌设备,将跳过搅拌", "⚠️")) # === 执行溶解流程 === try: - # 步骤5.1: 启动加热搅拌(如果需要) + # 启动加热搅拌(如果需要) if stir_device_id and (final_temp > 25.0 or final_time > 0 or stir_speed > 0): - action_sequence.append(create_action_log(f"准备加热搅拌 (目标温度: {final_temp}°C)", "🔥")) - + if heatchill_id and (final_temp > 25.0 or final_time > 0): # 使用加热搅拌器 - action_sequence.append(create_action_log(f"启动加热搅拌器 {heatchill_id}", "🔥")) heatchill_action = { "device_id": heatchill_id, @@ -158,7 +136,6 @@ def generate_dissolve_protocol( # 等待温度稳定 if final_temp > 25.0: wait_time = min(60, abs(final_temp - 25.0) * 1.5) - action_sequence.append(create_action_log(f"等待温度稳定 ({wait_time:.0f}秒)", "⏳")) action_sequence.append({ "action_name": "wait", "action_kwargs": {"time": wait_time} @@ -166,7 +143,6 @@ def generate_dissolve_protocol( elif stirrer_id: # 使用独立搅拌器 - action_sequence.append(create_action_log(f"启动搅拌器 {stirrer_id} (速度: {stir_speed}rpm)", "🌪️")) stir_action = { "device_id": stirrer_id, @@ -178,9 +154,8 @@ def generate_dissolve_protocol( } } action_sequence.append(stir_action) - + # 等待搅拌稳定 - action_sequence.append(create_action_log("等待搅拌稳定...", "⏳")) action_sequence.append({ "action_name": "wait", "action_kwargs": {"time": 5} @@ -188,11 +163,8 @@ def generate_dissolve_protocol( if is_solid_dissolve: # === 固体溶解路径 === - action_sequence.append(create_action_log("开始固体溶解流程", "🧂")) - solid_dispenser = find_solid_dispenser(G) if solid_dispenser: - action_sequence.append(create_action_log(f"找到固体加样器: {solid_dispenser}", "🥄")) # 固体加样 add_kwargs = { @@ -204,38 +176,27 @@ def generate_dissolve_protocol( if final_mass > 0: add_kwargs["mass"] = str(final_mass) - action_sequence.append(create_action_log(f"准备添加固体: {final_mass}g", "⚖️")) if mol and mol.strip(): add_kwargs["mol"] = mol - action_sequence.append(create_action_log(f"按摩尔数添加: {mol}", "🧬")) - - action_sequence.append(create_action_log("开始固体加样操作", "🥄")) + action_sequence.append({ "device_id": solid_dispenser, "action_name": "add_solid", "action_kwargs": add_kwargs }) - - action_sequence.append(create_action_log("固体加样完成", "✅")) # 固体溶解体积运算 - 固体本身不会显著增加体积 - action_sequence.append(create_action_log(f"固体已添加: {final_mass}g", "📊")) else: - debug_print("⚠️ 未找到固体加样器,跳过固体添加") + debug_print("未找到固体加样器,跳过固体添加") action_sequence.append(create_action_log("未找到固体加样器,无法添加固体", "❌")) elif is_liquid_dissolve: # === 液体溶解路径 === - action_sequence.append(create_action_log("开始液体溶解流程", "💧")) - - # 查找溶剂容器 - action_sequence.append(create_action_log("正在查找溶剂容器...", "🔍")) try: solvent_vessel = find_solvent_vessel(G, solvent) - action_sequence.append(create_action_log(f"找到溶剂容器: {solvent_vessel}", "🧪")) except ValueError as e: - debug_print(f"⚠️ {str(e)},跳过溶剂添加") + debug_print(f"溶剂容器查找失败: {str(e)},跳过溶剂添加") action_sequence.append(create_action_log(f"溶剂容器查找失败: {str(e)}", "❌")) solvent_vessel = None @@ -243,10 +204,7 @@ def generate_dissolve_protocol( # 计算流速 - 溶解时通常用较慢的速度,避免飞溅 flowrate = 1.0 # 较慢的注入速度 transfer_flowrate = 0.5 # 较慢的转移速度 - - action_sequence.append(create_action_log(f"设置流速: {flowrate}mL/min (缓慢注入)", "⚡")) - action_sequence.append(create_action_log(f"开始转移 {final_volume}mL {solvent}", "🚰")) - + # 调用pump protocol pump_actions = generate_pump_protocol_with_rinsing( G=G, @@ -268,10 +226,9 @@ def generate_dissolve_protocol( **kwargs ) action_sequence.extend(pump_actions) - action_sequence.append(create_action_log(f"溶剂转移完成 ({len(pump_actions)} 个操作)", "✅")) # 液体溶解体积运算 - 添加溶剂后更新容器体积 - + # 确保vessel有data字段 if "data" not in vessel: vessel["data"] = {} @@ -305,24 +262,19 @@ def generate_dissolve_protocol( G.nodes[vessel_id]['data']['liquid_volume'] = [final_volume] else: G.nodes[vessel_id]['data']['liquid_volume'] = current_node_volume + final_volume - - action_sequence.append(create_action_log(f"容器体积已更新 (+{final_volume:.2f}mL)", "📊")) - + # 溶剂添加后等待 - action_sequence.append(create_action_log("溶剂添加后短暂等待...", "⏳")) action_sequence.append({ "action_name": "wait", "action_kwargs": {"time": 5} }) - # 步骤5.4: 等待溶解完成 + # 等待溶解完成 if final_time > 0: wait_minutes = final_time / 60 - action_sequence.append(create_action_log(f"开始溶解等待 ({wait_minutes:.1f}分钟)", "⏰")) - + if heatchill_id: # 使用定时加热搅拌 - action_sequence.append(create_action_log(f"使用加热搅拌器进行定时溶解", "🔥")) dissolve_action = { "device_id": heatchill_id, @@ -340,7 +292,6 @@ def generate_dissolve_protocol( elif stirrer_id: # 使用定时搅拌 - action_sequence.append(create_action_log(f"使用搅拌器进行定时溶解", "🌪️")) stir_action = { "device_id": stirrer_id, @@ -357,7 +308,6 @@ def generate_dissolve_protocol( else: # 简单等待 - action_sequence.append(create_action_log(f"简单等待溶解完成", "⏳")) action_sequence.append({ "action_name": "wait", "action_kwargs": {"time": final_time} @@ -365,8 +315,7 @@ def generate_dissolve_protocol( # 步骤5.5: 停止加热搅拌(如果需要) if heatchill_id and final_time == 0 and final_temp > 25.0: - action_sequence.append(create_action_log("停止加热搅拌器", "🛑")) - + stop_action = { "device_id": heatchill_id, "action_name": "heat_chill_stop", @@ -377,7 +326,7 @@ def generate_dissolve_protocol( action_sequence.append(stop_action) except Exception as e: - debug_print(f"❌ 溶解流程执行失败: {str(e)}") + debug_print(f"溶解流程执行失败: {str(e)}") action_sequence.append(create_action_log(f"溶解流程失败: {str(e)}", "❌")) # 添加错误日志 action_sequence.append({ @@ -398,8 +347,8 @@ def generate_dissolve_protocol( final_liquid_volume = current_volume # === 最终结果 === - debug_print(f"溶解协议生成完成: {vessel_id}, 类型={dissolve_type}, " - f"动作数={len(action_sequence)}, 前后体积={original_liquid_volume:.2f}→{final_liquid_volume:.2f}mL") + debug_print(f"溶解协议完成: {vessel_id}, 类型={dissolve_type}, " + f"动作数={len(action_sequence)}, 体积={original_liquid_volume:.2f}→{final_liquid_volume:.2f}mL") # 添加完成日志 summary_msg = f"溶解协议完成: {vessel_id}" @@ -408,7 +357,7 @@ def generate_dissolve_protocol( if is_solid_dissolve: summary_msg += f" (溶解 {final_mass}g {reagent})" - action_sequence.append(create_action_log(summary_msg, "🎉")) + action_sequence.append(create_action_log(summary_msg, "✅")) return action_sequence @@ -420,7 +369,7 @@ def dissolve_solid_by_mass(G: nx.DiGraph, vessel: dict, reagent: str, mass: Unio temp: Union[str, float] = 25.0, time: Union[str, float] = "10 min") -> List[Dict[str, Any]]: """按质量溶解固体""" vessel_id = vessel["id"] - debug_print(f"🧂 快速固体溶解: {reagent} ({mass}) → {vessel_id}") + debug_print(f"快速固体溶解: {reagent} ({mass}) → {vessel_id}") return generate_dissolve_protocol( G, vessel, mass=mass, @@ -433,7 +382,7 @@ def dissolve_solid_by_moles(G: nx.DiGraph, vessel: dict, reagent: str, mol: str, temp: Union[str, float] = 25.0, time: Union[str, float] = "10 min") -> List[Dict[str, Any]]: """按摩尔数溶解固体""" vessel_id = vessel["id"] - debug_print(f"🧬 按摩尔数溶解固体: {reagent} ({mol}) → {vessel_id}") + debug_print(f"按摩尔数溶解固体: {reagent} ({mol}) → {vessel_id}") return generate_dissolve_protocol( G, vessel, mol=mol, @@ -446,7 +395,7 @@ def dissolve_with_solvent(G: nx.DiGraph, vessel: dict, solvent: str, volume: Uni temp: Union[str, float] = 25.0, time: Union[str, float] = "5 min") -> List[Dict[str, Any]]: """用溶剂溶解""" vessel_id = vessel["id"] - debug_print(f"💧 溶剂溶解: {solvent} ({volume}) → {vessel_id}") + debug_print(f"溶剂溶解: {solvent} ({volume}) → {vessel_id}") return generate_dissolve_protocol( G, vessel, solvent=solvent, @@ -458,7 +407,7 @@ def dissolve_with_solvent(G: nx.DiGraph, vessel: dict, solvent: str, volume: Uni def dissolve_at_room_temp(G: nx.DiGraph, vessel: dict, solvent: str, volume: Union[str, float]) -> List[Dict[str, Any]]: """室温溶解""" vessel_id = vessel["id"] - debug_print(f"🌡️ 室温溶解: {solvent} ({volume}) → {vessel_id}") + debug_print(f"室温溶解: {solvent} ({volume}) → {vessel_id}") return generate_dissolve_protocol( G, vessel, solvent=solvent, @@ -471,7 +420,7 @@ def dissolve_with_heating(G: nx.DiGraph, vessel: dict, solvent: str, volume: Uni temp: Union[str, float] = "60 °C", time: Union[str, float] = "15 min") -> List[Dict[str, Any]]: """加热溶解""" vessel_id = vessel["id"] - debug_print(f"🔥 加热溶解: {solvent} ({volume}) → {vessel_id} @ {temp}") + debug_print(f"加热溶解: {solvent} ({volume}) → {vessel_id} @ {temp}") return generate_dissolve_protocol( G, vessel, solvent=solvent, @@ -483,37 +432,31 @@ def dissolve_with_heating(G: nx.DiGraph, vessel: dict, solvent: str, volume: Uni # 测试函数 def test_dissolve_protocol(): """测试溶解协议的各种参数解析""" - debug_print("=== DISSOLVE PROTOCOL 增强版测试 ===") - # 测试体积解析 - debug_print("💧 测试体积解析...") volumes = ["10 mL", "?", 10.0, "1 L", "500 μL"] for vol in volumes: result = parse_volume_input(vol) - debug_print(f"📏 体积解析: {vol} → {result}mL") - + debug_print(f"体积解析: {vol} → {result}mL") + # 测试质量解析 - debug_print("⚖️ 测试质量解析...") masses = ["2.9 g", "?", 2.5, "500 mg"] for mass in masses: result = parse_mass_input(mass) - debug_print(f"⚖️ 质量解析: {mass} → {result}g") - + debug_print(f"质量解析: {mass} → {result}g") + # 测试温度解析 - debug_print("🌡️ 测试温度解析...") temps = ["60 °C", "room temperature", "?", 25.0, "reflux"] for temp in temps: result = parse_temperature_input(temp) - debug_print(f"🌡️ 温度解析: {temp} → {result}°C") - + debug_print(f"温度解析: {temp} → {result}°C") + # 测试时间解析 - debug_print("⏱️ 测试时间解析...") times = ["30 min", "1 h", "?", 60.0] for time in times: result = parse_time_input(time) - debug_print(f"⏱️ 时间解析: {time} → {result}s") - - debug_print("✅ 测试完成") + debug_print(f"时间解析: {time} → {result}s") + + debug_print("测试完成") if __name__ == "__main__": test_dissolve_protocol() \ No newline at end of file diff --git a/unilabos/compile/pump_protocol.py b/unilabos/compile/pump_protocol.py index f7e06730..8468915e 100644 --- a/unilabos/compile/pump_protocol.py +++ b/unilabos/compile/pump_protocol.py @@ -41,108 +41,77 @@ def is_integrated_pump(node_class: str, node_name: str = "") -> bool: def find_connected_pump(G, valve_node): """ - 查找与阀门相连的泵节点 - 修复版本 - 🔧 修复:区分电磁阀和多通阀,电磁阀不参与泵查找 + 查找与阀门相连的泵节点 + 区分电磁阀和多通阀,电磁阀不参与泵查找 """ - debug_print(f"🔍 查找与阀门 {valve_node} 相连的泵...") - - # 🔧 关键修复:检查节点类型,电磁阀不应该查找泵 + # 检查节点类型,电磁阀不应该查找泵 node_data = G.nodes.get(valve_node, {}) node_class = node_data.get("class", "") or "" - debug_print(f" - 阀门类型: {node_class}") - # 如果是电磁阀,不应该查找泵(电磁阀只是开关) if ("solenoid" in node_class.lower() or "solenoid_valve" in valve_node.lower()): - debug_print(f" ⚠️ {valve_node} 是电磁阀,不应该查找泵节点") raise ValueError(f"电磁阀 {valve_node} 不应该参与泵查找逻辑") # 只有多通阀等复杂阀门才需要查找连接的泵 if ("multiway" in node_class.lower() or "valve" in node_class.lower()): - debug_print(f" - {valve_node} 是多通阀,查找连接的泵...") # 方法1:直接相邻的泵 for neighbor in G.neighbors(valve_node): neighbor_class = G.nodes[neighbor].get("class", "") or "" - # 排除非 电磁阀 和 泵 的邻居 - debug_print(f" - 检查邻居 {neighbor}, class: {neighbor_class}") if "pump" in neighbor_class.lower(): - debug_print(f" ✅ 找到直接相连的泵: {neighbor}") return neighbor # 方法2:通过路径查找泵(最多2跳) - debug_print(f" - 未找到直接相连的泵,尝试路径查找...") + pump_nodes = [ + node_id for node_id in G.nodes() + if "pump" in (G.nodes[node_id].get("class", "") or "").lower() + ] - # 获取所有泵节点 - pump_nodes = [] - for node_id in G.nodes(): - node_class = G.nodes[node_id].get("class", "") or "" - if "pump" in node_class.lower(): - pump_nodes.append(node_id) - - debug_print(f" - 系统中的泵节点: {pump_nodes}") - - # 查找到泵的最短路径 for pump_node in pump_nodes: try: if nx.has_path(G, valve_node, pump_node): path = nx.shortest_path(G, valve_node, pump_node) - path_length = len(path) - 1 - debug_print(f" - 到泵 {pump_node} 的路径: {path}, 距离: {path_length}") - - if path_length <= 2: # 最多允许2跳 - debug_print(f" ✅ 通过路径找到泵: {pump_node}") + if len(path) - 1 <= 2: # 最多允许2跳 return pump_node except nx.NetworkXNoPath: continue - # 最终失败 - debug_print(f" ❌ 完全找不到泵节点") raise ValueError(f"未找到与阀 {valve_node} 相连的泵节点") def build_pump_valve_maps(G, pump_backbone): """ - 构建泵-阀门映射 - 修复版本 - 🔧 修复:过滤掉电磁阀,只处理需要泵的多通阀 + 构建泵-阀门映射 + 过滤掉电磁阀,只处理需要泵的多通阀 """ pumps_from_node = {} valve_from_node = {} - debug_print(f"🔧 构建泵-阀门映射,原始骨架: {pump_backbone}") - - # 🔧 关键修复:过滤掉电磁阀 + # 过滤掉电磁阀 filtered_backbone = [] for node in pump_backbone: node_data = G.nodes.get(node, {}) node_class = node_data.get("class", "") or "" - # 跳过电磁阀 if ("solenoid" in node_class.lower() or "solenoid_valve" in node.lower()): - debug_print(f" - 跳过电磁阀: {node}") continue filtered_backbone.append(node) - debug_print(f"🔧 过滤后的骨架: {filtered_backbone}") - for node in filtered_backbone: node_data = G.nodes.get(node, {}) node_class = node_data.get("class", "") or "" if is_integrated_pump(node_class, node): pumps_from_node[node] = node valve_from_node[node] = node - debug_print(f" - 集成泵-阀: {node}") else: try: pump_node = find_connected_pump(G, node) pumps_from_node[node] = pump_node valve_from_node[node] = node - debug_print(f" - 阀门 {node} -> 泵 {pump_node}") - except ValueError as e: - debug_print(f" - 跳过节点 {node}: {str(e)}") + except ValueError: continue - debug_print(f"🔧 最终映射: pumps={pumps_from_node}, valves={valve_from_node}") + debug_print(f"泵-阀映射: pumps={pumps_from_node}, valves={valve_from_node}") return pumps_from_node, valve_from_node @@ -155,8 +124,8 @@ def generate_pump_protocol( transfer_flowrate: float = 0.5, ) -> List[Dict[str, Any]]: """ - 生成泵操作的动作序列 - 修复版本 - 🔧 修复:正确处理包含电磁阀的路径 + 生成泵操作的动作序列 + 正确处理包含电磁阀的路径 """ pump_action_sequence = [] nodes = G.nodes(data=True) @@ -175,7 +144,6 @@ def generate_pump_protocol( logger.warning(f"transfer_flowrate <= 0,使用默认值 {transfer_flowrate}mL/s") # 验证容器存在 - debug_print(f"🔍 验证源容器 '{from_vessel_id}' 和目标容器 '{to_vessel_id}' 是否存在...") if from_vessel_id not in G.nodes(): logger.error(f"源容器 '{from_vessel_id}' 不存在") return pump_action_sequence @@ -191,28 +159,24 @@ def generate_pump_protocol( logger.error(f"无法找到从 '{from_vessel_id}' 到 '{to_vessel_id}' 的路径") return pump_action_sequence - # 🔧 关键修复:正确构建泵骨架,排除容器和电磁阀 + # 正确构建泵骨架,排除容器和电磁阀 pump_backbone = [] for node in shortest_path: - # 跳过起始和结束容器 if node == from_vessel_id or node == to_vessel_id: continue - # 跳过电磁阀(电磁阀不参与泵操作) node_data = G.nodes.get(node, {}) node_class = node_data.get("class", "") or "" if ("solenoid" in node_class.lower() or "solenoid_valve" in node.lower()): - debug_print(f"PUMP_TRANSFER: 跳过电磁阀 {node}") continue - # 只包含多通阀和泵 if ("multiway" in node_class.lower() or "valve" in node_class.lower() or "pump" in node_class.lower()): pump_backbone.append(node) - debug_print(f"PUMP_TRANSFER: 过滤后的泵骨架: {pump_backbone}") + debug_print(f"PUMP_TRANSFER: 泵骨架: {pump_backbone}") if not pump_backbone: - debug_print("PUMP_TRANSFER: 没有泵骨架节点,可能是直接容器连接或只有电磁阀") + debug_print("PUMP_TRANSFER: 没有泵骨架节点") return pump_action_sequence if transfer_flowrate == 0: @@ -228,7 +192,7 @@ def generate_pump_protocol( debug_print("PUMP_TRANSFER: 没有可用的泵映射") return pump_action_sequence - # 🔧 修复:安全地获取最小转移体积 + # 安全地获取最小转移体积 try: min_transfer_volumes = [] for node in pump_backbone: @@ -258,19 +222,19 @@ def generate_pump_protocol( volume_left = volume debug_print(f"PUMP_TRANSFER: 需要 {repeats} 次转移,单次最大体积 {min_transfer_volume} mL") - # 🆕 只在开头打印总体概览 + # 只在开头打印总体概览 if repeats > 1: - debug_print(f"🔄 分批转移概览: 总体积 {volume:.2f}mL,需要 {repeats} 次转移") - logger.info(f"🔄 分批转移概览: 总体积 {volume:.2f}mL,需要 {repeats} 次转移") + debug_print(f"分批转移: 总体积 {volume:.2f}mL, {repeats} 次, 单次最大 {min_transfer_volume} mL") + logger.info(f"分批转移: 总体积 {volume:.2f}mL, {repeats} 次转移") - # 🔧 创建一个自定义的wait动作,用于在执行时打印日志 + # 创建一个自定义的wait动作,用于在执行时打印日志 def create_progress_log_action(message: str) -> Dict[str, Any]: """创建一个特殊的等待动作,在执行时打印进度日志""" return { "action_name": "wait", "action_kwargs": { - "time": 0.1, # 很短的等待时间 - "progress_message": message # 自定义字段,用于进度日志 + "time": 0.1, + "progress_message": message } } @@ -278,12 +242,12 @@ def generate_pump_protocol( for i in range(repeats): current_volume = min(volume_left, min_transfer_volume) - # 🆕 在每次循环开始时添加进度日志 if repeats > 1: - start_message = f"🚀 准备开始第 {i + 1}/{repeats} 次转移: {current_volume:.2f}mL ({from_vessel_id} → {to_vessel_id}) 🚰" - pump_action_sequence.append(create_progress_log_action(start_message)) + pump_action_sequence.append(create_progress_log_action( + f"第 {i + 1}/{repeats} 次转移: {current_volume:.2f}mL ({from_vessel_id} -> {to_vessel_id})" + )) - # 🔧 修复:安全地获取边数据 + # 安全地获取边数据 def get_safe_edge_data(node_a, node_b, key): try: edge_data = G.get_edge_data(node_a, node_b) @@ -386,13 +350,13 @@ def generate_pump_protocol( ]) pump_action_sequence.append({"action_name": "wait", "action_kwargs": {"time": 3}}) - # 🆕 在每次循环结束时添加完成日志 + # 在每次循环结束时添加完成日志 if repeats > 1: remaining_volume = volume_left - current_volume if remaining_volume > 0: - end_message = f"✅ 第 {i + 1}/{repeats} 次转移完成! 剩余 {remaining_volume:.2f}mL 待转移 ⏳" + end_message = f"第 {i + 1}/{repeats} 次完成, 剩余 {remaining_volume:.2f}mL" else: - end_message = f"🎉 第 {i + 1}/{repeats} 次转移完成! 全部 {volume:.2f}mL 转移完毕 ✨" + end_message = f"第 {i + 1}/{repeats} 次完成, 全部 {volume:.2f}mL 转移完毕" pump_action_sequence.append(create_progress_log_action(end_message)) @@ -441,7 +405,6 @@ def generate_pump_protocol_with_rinsing( # 1. 处理体积参数 final_volume = volume - debug_print(f"初始体积: {final_volume}") # 如果volume为0,从容器读取实际体积 if volume == 0.0: @@ -450,13 +413,9 @@ def generate_pump_protocol_with_rinsing( if actual_volume > 0: final_volume = actual_volume - debug_print(f"从容器读取体积: {final_volume}mL") else: final_volume = 10.0 logger.warning(f"无法从容器读取体积,使用默认值: {final_volume}mL") - else: - debug_print(f"使用指定体积: {final_volume}mL") - # 处理 amount 参数 if amount and amount.strip(): parsed_volume = _parse_amount_to_volume(amount) @@ -477,7 +436,6 @@ def generate_pump_protocol_with_rinsing( debug_print(f"最终体积: {final_volume}mL") # 2. 处理流速参数 - final_flowrate = flowrate if flowrate > 0 else 2.5 final_transfer_flowrate = transfer_flowrate if transfer_flowrate > 0 else 0.5 @@ -506,7 +464,7 @@ def generate_pump_protocol_with_rinsing( elif rate_spec == "quickly": final_flowrate = max(final_flowrate, 5.0) final_transfer_flowrate = max(final_transfer_flowrate, 2.0) - debug_print(f"速度规格 '{rate_spec}' 应用后: flowrate={final_flowrate}, transfer={final_transfer_flowrate}") + debug_print(f"速度规格 '{rate_spec}': flowrate={final_flowrate}, transfer={final_transfer_flowrate}") # 5. 处理冲洗参数 final_rinsing_solvent = rinsing_solvent @@ -528,7 +486,7 @@ def generate_pump_protocol_with_rinsing( f"transfer_flowrate={final_transfer_flowrate}mL/s, " f"rinsing={final_rinsing_solvent}/{final_rinsing_volume}mL/{final_rinsing_repeats}次") - # ========== 执行基础转移 ========== + # 执行基础转移 try: pump_action_sequence = generate_pump_protocol( @@ -539,14 +497,14 @@ def generate_pump_protocol_with_rinsing( debug_print(f"基础转移生成了 {len(pump_action_sequence)} 个动作") if not pump_action_sequence: - debug_print("基础转移协议生成为空,可能是路径问题") + debug_print("基础转移协议为空") if from_vessel_id in G.nodes() and to_vessel_id in G.nodes(): try: path = nx.shortest_path(G, source=from_vessel_id, target=to_vessel_id) debug_print(f"路径存在: {path}") - except Exception as path_error: - debug_print(f"无法找到路径: {str(path_error)}") + except Exception: + pass return [ { @@ -559,9 +517,7 @@ def generate_pump_protocol_with_rinsing( ] except Exception as e: - debug_print(f"基础转移失败: {str(e)}") - import traceback - debug_print(f"详细错误: {traceback.format_exc()}") + debug_print(f"基础转移失败: {str(e)}\n{traceback.format_exc()}") return [ { "device_id": "system", @@ -572,9 +528,8 @@ def generate_pump_protocol_with_rinsing( } ] - # ========== 执行冲洗操作 ========== + # 执行冲洗操作 if final_rinsing_solvent and final_rinsing_solvent.strip() and final_rinsing_repeats > 0: - debug_print(f"开始冲洗操作,溶剂: '{final_rinsing_solvent}'") try: if final_rinsing_solvent.strip() != "air": @@ -591,17 +546,16 @@ def generate_pump_protocol_with_rinsing( ) pump_action_sequence.extend(air_rinsing_actions) except Exception as e: - debug_print(f"冲洗操作失败: {str(e)},跳过冲洗") + debug_print(f"冲洗操作失败: {str(e)}") else: - debug_print(f"跳过冲洗操作 (solvent='{final_rinsing_solvent}', repeats={final_rinsing_repeats})") + debug_print(f"跳过冲洗 (solvent='{final_rinsing_solvent}', repeats={final_rinsing_repeats})") - # ========== 最终结果 ========== + # 最终结果 debug_print(f"PUMP_TRANSFER 完成: {from_vessel_id} -> {to_vessel_id}, " f"volume={final_volume}mL, 动作数={len(pump_action_sequence)}") # 最终验证 if len(pump_action_sequence) == 0: - debug_print("协议生成结果为空!") return [ { "device_id": "system", diff --git a/unilabos/compile/stir_protocol.py b/unilabos/compile/stir_protocol.py index 7daf3f48..86f04b79 100644 --- a/unilabos/compile/stir_protocol.py +++ b/unilabos/compile/stir_protocol.py @@ -11,30 +11,21 @@ logger = logging.getLogger(__name__) def validate_and_fix_params(stir_time: float, stir_speed: float, settling_time: float) -> tuple: """验证和修正参数""" - # ⏰ 搅拌时间验证 if stir_time < 0: - debug_print(f"⚠️ 搅拌时间 {stir_time}s 无效,修正为 100s 🕐") + debug_print(f"搅拌时间 {stir_time}s 无效,修正为 100s") stir_time = 100.0 elif stir_time > 100: # 限制为100s - debug_print(f"⚠️ 搅拌时间 {stir_time}s 过长,仿真运行时,修正为 100s 🕐") + debug_print(f"搅拌时间 {stir_time}s 过长,仿真运行时修正为 100s") stir_time = 100.0 - else: - debug_print(f"✅ 搅拌时间 {stir_time}s ({stir_time/60:.1f}分钟) 有效 ⏰") - - # 🌪️ 搅拌速度验证 + if stir_speed < 10.0 or stir_speed > 1500.0: - debug_print(f"⚠️ 搅拌速度 {stir_speed} RPM 超出范围,修正为 300 RPM 🌪️") + debug_print(f"搅拌速度 {stir_speed} RPM 超出范围,修正为 300 RPM") stir_speed = 300.0 - else: - debug_print(f"✅ 搅拌速度 {stir_speed} RPM 在正常范围内 🌪️") - - # ⏱️ 沉降时间验证 + if settling_time < 0 or settling_time > 600: # 限制为10分钟 - debug_print(f"⚠️ 沉降时间 {settling_time}s 超出范围,修正为 60s ⏱️") + debug_print(f"沉降时间 {settling_time}s 超出范围,修正为 60s") settling_time = 60.0 - else: - debug_print(f"✅ 沉降时间 {settling_time}s 在正常范围内 ⏱️") - + return stir_time, stir_speed, settling_time def extract_vessel_id(vessel) -> str: @@ -58,16 +49,13 @@ def generate_stir_protocol( ) -> List[Dict[str, Any]]: """生成搅拌操作的协议序列 - 修复vessel参数传递""" - # 🔧 核心修改:正确处理vessel参数 vessel_id = extract_vessel_id(vessel) vessel_display = get_vessel_display_info(vessel) - - # 🔧 关键修复:确保vessel_resource是完整的Resource对象 + + # 确保vessel_resource是完整的Resource对象 if isinstance(vessel, dict): - vessel_resource = vessel # 已经是完整的Resource字典 - debug_print(f"✅ 使用传入的vessel Resource对象") + vessel_resource = vessel else: - # 如果只是字符串,构建一个基本的Resource对象 vessel_resource = { "id": vessel, "name": "", @@ -83,10 +71,6 @@ def generate_stir_protocol( "sample_id": "", "type": "" } - debug_print(f"🔧 构建了基本的vessel Resource对象: {vessel}") - - debug_print(f"开始生成搅拌协议: vessel={vessel_id}, time={time}, " - f"stir_time={stir_time}, stir_speed={stir_speed}RPM, settling={settling_time}") # 参数验证 if not vessel_id: @@ -140,8 +124,7 @@ def generate_stir_protocol( "device_id": stirrer_id, "action_name": "stir", "action_kwargs": { - # 🔧 关键修复:传递vessel_id字符串,而不是完整的Resource对象 - "vessel": {"id": vessel_id}, # 传递字符串ID,不是Resource对象 + "vessel": {"id": vessel_id}, "time": str(time), "event": event, "time_spec": time_spec, @@ -171,16 +154,13 @@ def generate_start_stir_protocol( ) -> List[Dict[str, Any]]: """生成开始搅拌操作的协议序列 - 修复vessel参数传递""" - # 🔧 核心修改:正确处理vessel参数 vessel_id = extract_vessel_id(vessel) vessel_display = get_vessel_display_info(vessel) - - # 🔧 关键修复:确保vessel_resource是完整的Resource对象 + + # 确保vessel_resource是完整的Resource对象 if isinstance(vessel, dict): - vessel_resource = vessel # 已经是完整的Resource字典 - debug_print(f"✅ 使用传入的vessel Resource对象") + vessel_resource = vessel else: - # 如果只是字符串,构建一个基本的Resource对象 vessel_resource = { "id": vessel, "name": "", @@ -196,10 +176,7 @@ def generate_start_stir_protocol( "sample_id": "", "type": "" } - debug_print(f"构建了基本的vessel Resource对象: {vessel}") - debug_print(f"启动搅拌协议: vessel={vessel_id}, speed={stir_speed}RPM, purpose='{purpose}'") - # 基础验证 if not vessel_id or vessel_id not in G.nodes(): raise ValueError("vessel 参数无效") @@ -207,23 +184,21 @@ def generate_start_stir_protocol( # 参数修正 if stir_speed < 10.0 or stir_speed > 1500.0: stir_speed = 300.0 - + # 查找设备 stirrer_id = find_connected_stirrer(G, vessel_id) - - # 🔧 关键修复:传递vessel_id字符串 + action_sequence = [{ "device_id": stirrer_id, "action_name": "start_stir", "action_kwargs": { - # 🔧 关键修复:传递vessel_id字符串,而不是完整的Resource对象 - "vessel": {"id": vessel_id}, # 传递字符串ID,不是Resource对象 + "vessel": {"id": vessel_id}, "stir_speed": stir_speed, "purpose": purpose or f"启动搅拌 {stir_speed} RPM" } }] - - debug_print(f"启动搅拌协议生成完成: {stirrer_id}") + + debug_print(f"启动搅拌协议: {vessel_display}, {stir_speed}RPM, device={stirrer_id}") return action_sequence def generate_stop_stir_protocol( @@ -233,16 +208,13 @@ def generate_stop_stir_protocol( ) -> List[Dict[str, Any]]: """生成停止搅拌操作的协议序列 - 修复vessel参数传递""" - # 🔧 核心修改:正确处理vessel参数 vessel_id = extract_vessel_id(vessel) vessel_display = get_vessel_display_info(vessel) - - # 🔧 关键修复:确保vessel_resource是完整的Resource对象 + + # 确保vessel_resource是完整的Resource对象 if isinstance(vessel, dict): - vessel_resource = vessel # 已经是完整的Resource字典 - debug_print(f"✅ 使用传入的vessel Resource对象") + vessel_resource = vessel else: - # 如果只是字符串,构建一个基本的Resource对象 vessel_resource = { "id": vessel, "name": "", @@ -258,28 +230,23 @@ def generate_stop_stir_protocol( "sample_id": "", "type": "" } - debug_print(f"构建了基本的vessel Resource对象: {vessel}") - - debug_print(f"停止搅拌协议: vessel={vessel_id}") # 基础验证 if not vessel_id or vessel_id not in G.nodes(): raise ValueError("vessel 参数无效") - + # 查找设备 stirrer_id = find_connected_stirrer(G, vessel_id) - - # 🔧 关键修复:传递vessel_id字符串 + action_sequence = [{ "device_id": stirrer_id, "action_name": "stop_stir", "action_kwargs": { - # 🔧 关键修复:传递vessel_id字符串,而不是完整的Resource对象 - "vessel": {"id": vessel_id}, # 传递字符串ID,不是Resource对象 + "vessel": {"id": vessel_id}, } }] - - debug_print(f"停止搅拌协议生成完成: {stirrer_id}") + + debug_print(f"停止搅拌协议: {vessel_display}, device={stirrer_id}") return action_sequence # 便捷函数 @@ -287,84 +254,79 @@ def stir_briefly(G: nx.DiGraph, vessel: Union[str, dict], speed: float = 300.0) -> List[Dict[str, Any]]: """短时间搅拌(30秒)""" vessel_display = get_vessel_display_info(vessel) - debug_print(f"⚡ 短时间搅拌: {vessel_display} @ {speed}RPM (30s)") + debug_print(f"短时间搅拌: {vessel_display} @ {speed}RPM (30s)") return generate_stir_protocol(G, vessel, time="30", stir_speed=speed) def stir_slowly(G: nx.DiGraph, vessel: Union[str, dict], time: Union[str, float] = "10 min") -> List[Dict[str, Any]]: """慢速搅拌""" vessel_display = get_vessel_display_info(vessel) - debug_print(f"🐌 慢速搅拌: {vessel_display} @ 150RPM") + debug_print(f"慢速搅拌: {vessel_display} @ 150RPM") return generate_stir_protocol(G, vessel, time=time, stir_speed=150.0) def stir_vigorously(G: nx.DiGraph, vessel: Union[str, dict], time: Union[str, float] = "5 min") -> List[Dict[str, Any]]: """剧烈搅拌""" vessel_display = get_vessel_display_info(vessel) - debug_print(f"💨 剧烈搅拌: {vessel_display} @ 800RPM") + debug_print(f"剧烈搅拌: {vessel_display} @ 800RPM") return generate_stir_protocol(G, vessel, time=time, stir_speed=800.0) def stir_for_reaction(G: nx.DiGraph, vessel: Union[str, dict], time: Union[str, float] = "1 h") -> List[Dict[str, Any]]: """反应搅拌(标准速度,长时间)""" vessel_display = get_vessel_display_info(vessel) - debug_print(f"🧪 反应搅拌: {vessel_display} @ 400RPM") + debug_print(f"反应搅拌: {vessel_display} @ 400RPM") return generate_stir_protocol(G, vessel, time=time, stir_speed=400.0) def stir_for_dissolution(G: nx.DiGraph, vessel: Union[str, dict], time: Union[str, float] = "15 min") -> List[Dict[str, Any]]: """溶解搅拌(中等速度)""" vessel_display = get_vessel_display_info(vessel) - debug_print(f"💧 溶解搅拌: {vessel_display} @ 500RPM") + debug_print(f"溶解搅拌: {vessel_display} @ 500RPM") return generate_stir_protocol(G, vessel, time=time, stir_speed=500.0) def stir_gently(G: nx.DiGraph, vessel: Union[str, dict], time: Union[str, float] = "30 min") -> List[Dict[str, Any]]: """温和搅拌""" vessel_display = get_vessel_display_info(vessel) - debug_print(f"🍃 温和搅拌: {vessel_display} @ 200RPM") + debug_print(f"温和搅拌: {vessel_display} @ 200RPM") return generate_stir_protocol(G, vessel, time=time, stir_speed=200.0) def stir_overnight(G: nx.DiGraph, vessel: Union[str, dict]) -> List[Dict[str, Any]]: """过夜搅拌(模拟时缩短为2小时)""" vessel_display = get_vessel_display_info(vessel) - debug_print(f"🌙 过夜搅拌(模拟2小时): {vessel_display} @ 300RPM") + debug_print(f"过夜搅拌(模拟2小时): {vessel_display} @ 300RPM") return generate_stir_protocol(G, vessel, time="2 h", stir_speed=300.0) def start_continuous_stirring(G: nx.DiGraph, vessel: Union[str, dict], speed: float = 300.0, purpose: str = "continuous stirring") -> List[Dict[str, Any]]: """开始连续搅拌""" vessel_display = get_vessel_display_info(vessel) - debug_print(f"🔄 开始连续搅拌: {vessel_display} @ {speed}RPM") + debug_print(f"开始连续搅拌: {vessel_display} @ {speed}RPM") return generate_start_stir_protocol(G, vessel, stir_speed=speed, purpose=purpose) def stop_all_stirring(G: nx.DiGraph, vessel: Union[str, dict]) -> List[Dict[str, Any]]: """停止所有搅拌""" vessel_display = get_vessel_display_info(vessel) - debug_print(f"🛑 停止搅拌: {vessel_display}") + debug_print(f"停止搅拌: {vessel_display}") return generate_stop_stir_protocol(G, vessel) # 测试函数 def test_stir_protocol(): """测试搅拌协议""" - debug_print("🧪 === STIR PROTOCOL 测试 === ✨") - - # 测试vessel参数处理 - debug_print("🔧 测试vessel参数处理...") - # 测试字典格式 vessel_dict = {"id": "flask_1", "name": "反应瓶1"} vessel_id = extract_vessel_id(vessel_dict) vessel_display = get_vessel_display_info(vessel_dict) - debug_print(f" 字典格式: {vessel_dict} → ID: {vessel_id}, 显示: {vessel_display}") - + debug_print(f"字典格式: {vessel_dict} -> ID: {vessel_id}, 显示: {vessel_display}") + # 测试字符串格式 vessel_str = "flask_2" vessel_id = extract_vessel_id(vessel_str) vessel_display = get_vessel_display_info(vessel_str) - debug_print(f" 字符串格式: {vessel_str} → ID: {vessel_id}, 显示: {vessel_display}") - - debug_print("✅ 测试完成 🎉") + debug_print(f"字符串格式: {vessel_str} -> ID: {vessel_id}, 显示: {vessel_display}") + + debug_print("测试完成") if __name__ == "__main__": test_stir_protocol()