diff --git a/tests/compile/__init__.py b/tests/compile/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/compile/test_batch_transfer_protocol.py b/tests/compile/test_batch_transfer_protocol.py new file mode 100644 index 00000000..df32f954 --- /dev/null +++ b/tests/compile/test_batch_transfer_protocol.py @@ -0,0 +1,296 @@ +""" +批量转运编译器测试 + +覆盖:单物料退化、刚好一批、多批次、空操作、AGV 配置发现、children dict 状态。 +""" + +import pytest +import networkx as nx + +from unilabos.compile.batch_transfer_protocol import generate_batch_transfer_protocol +from unilabos.compile.agv_transfer_protocol import generate_agv_transfer_protocol +from unilabos.compile._agv_utils import find_agv_config, get_agv_capacity, split_batches + + +# ============ 构建测试用设备图 ============ + +def _make_graph(capacity_x=2, capacity_y=1, capacity_z=1): + """构建包含 AGV 节点的测试设备图""" + G = nx.DiGraph() + + # AGV 节点 + G.add_node("AGV", **{ + "type": "device", + "class_": "agv_transport_station", + "config": { + "protocol_type": ["AGVTransferProtocol", "BatchTransferProtocol"], + "device_roles": { + "navigator": "zhixing_agv", + "arm": "zhixing_ur_arm" + }, + "route_table": { + "StationA->StationB": { + "nav_command": '{"target": "LM1"}', + "arm_pick": '{"task_name": "pick.urp"}', + "arm_place": '{"task_name": "place.urp"}' + }, + "AGV->StationA": { + "nav_command": '{"target": "LM1"}', + "arm_pick": '{"task_name": "pick.urp"}', + "arm_place": '{"task_name": "place.urp"}' + }, + "StationA->StationA": { + "nav_command": '{"target": "LM1"}', + "arm_pick": '{"task_name": "pick.urp"}', + "arm_place": '{"task_name": "place.urp"}' + }, + } + } + }) + + # AGV 子设备 + G.add_node("zhixing_agv", type="device", class_="zhixing_agv") + G.add_node("zhixing_ur_arm", type="device", class_="zhixing_ur_arm") + G.add_edge("AGV", "zhixing_agv") + G.add_edge("AGV", "zhixing_ur_arm") + + # AGV Warehouse 子资源 + G.add_node("agv_platform", **{ + "type": "warehouse", + "config": { + "name": "agv_platform", + "num_items_x": capacity_x, + "num_items_y": capacity_y, + "num_items_z": capacity_z, + } + }) + G.add_edge("AGV", "agv_platform") + + # 来源/目标工站 + G.add_node("StationA", type="device", class_="workstation") + G.add_node("StationB", type="device", class_="workstation") + + return G + + +def _make_repos(items_count=2): + """构建测试用的 from_repo 和 to_repo dict""" + children = {} + for i in range(items_count): + pos = f"A{i + 1:02d}" + children[pos] = { + "id": f"resource_{i + 1}", + "name": f"R{i + 1}", + "parent": "StationA", + "type": "resource", + } + + from_repo = { + "StationA": { + "id": "StationA", + "name": "StationA", + "children": children, + } + } + to_repo = { + "StationB": { + "id": "StationB", + "name": "StationB", + "children": {}, + } + } + return from_repo, to_repo + + +def _make_items(count=2): + """构建 transfer_resources / from_positions / to_positions""" + resources = [ + { + "id": f"resource_{i + 1}", + "name": f"R{i + 1}", + "sample_id": f"uuid-{i + 1}", + "parent": "StationA", + "type": "resource", + } + for i in range(count) + ] + from_positions = [f"A{i + 1:02d}" for i in range(count)] + to_positions = [f"A{i + 1:02d}" for i in range(count)] + return resources, from_positions, to_positions + + +# ============ _agv_utils 测试 ============ + +class TestAGVUtils: + def test_find_agv_config(self): + G = _make_graph() + cfg = find_agv_config(G) + assert cfg["agv_id"] == "AGV" + assert cfg["device_roles"]["navigator"] == "zhixing_agv" + assert cfg["device_roles"]["arm"] == "zhixing_ur_arm" + assert "StationA->StationB" in cfg["route_table"] + + def test_find_agv_config_by_id(self): + G = _make_graph() + cfg = find_agv_config(G, agv_id="AGV") + assert cfg["agv_id"] == "AGV" + + def test_find_agv_config_not_found(self): + G = nx.DiGraph() + G.add_node("SomeDevice", type="device", class_="pump") + with pytest.raises(ValueError, match="未找到 AGV"): + find_agv_config(G) + + def test_get_agv_capacity(self): + G = _make_graph(capacity_x=2, capacity_y=1, capacity_z=1) + assert get_agv_capacity(G, "AGV") == 2 + + def test_get_agv_capacity_multi_layer(self): + G = _make_graph(capacity_x=1, capacity_y=2, capacity_z=3) + assert get_agv_capacity(G, "AGV") == 6 + + def test_split_batches_exact(self): + assert split_batches([1, 2], 2) == [[1, 2]] + + def test_split_batches_overflow(self): + assert split_batches([1, 2, 3], 2) == [[1, 2], [3]] + + def test_split_batches_single(self): + assert split_batches([1], 4) == [[1]] + + def test_split_batches_zero_capacity(self): + with pytest.raises(ValueError): + split_batches([1], 0) + + +# ============ 批量转运编译器测试 ============ + +class TestBatchTransferProtocol: + def test_empty_items(self): + """空物料列表返回空 steps""" + G = _make_graph() + from_repo, to_repo = _make_repos(0) + steps = generate_batch_transfer_protocol(G, from_repo, to_repo, [], [], []) + assert steps == [] + + def test_single_item(self): + """单物料转运(BatchTransfer 退化为单物料)""" + G = _make_graph(capacity_x=2) + from_repo, to_repo = _make_repos(1) + resources, from_pos, to_pos = _make_items(1) + steps = generate_batch_transfer_protocol(G, from_repo, to_repo, resources, from_pos, to_pos) + + # 应该有: nav到来源 + 1个pick + nav到目标 + 1个place = 4 steps + assert len(steps) == 4 + assert steps[0]["action_name"] == "send_nav_task" + assert steps[1]["action_name"] == "move_pos_task" + assert steps[1]["_transfer_meta"]["phase"] == "pick" + assert steps[2]["action_name"] == "send_nav_task" + assert steps[3]["action_name"] == "move_pos_task" + assert steps[3]["_transfer_meta"]["phase"] == "place" + + def test_exact_capacity(self): + """物料数 = AGV 容量,刚好一批""" + G = _make_graph(capacity_x=2) + from_repo, to_repo = _make_repos(2) + resources, from_pos, to_pos = _make_items(2) + steps = generate_batch_transfer_protocol(G, from_repo, to_repo, resources, from_pos, to_pos) + + # nav + 2 pick + nav + 2 place = 6 steps + assert len(steps) == 6 + pick_steps = [s for s in steps if s.get("_transfer_meta", {}).get("phase") == "pick"] + place_steps = [s for s in steps if s.get("_transfer_meta", {}).get("phase") == "place"] + assert len(pick_steps) == 2 + assert len(place_steps) == 2 + + def test_multi_batch(self): + """物料数 > AGV 容量,自动分批""" + G = _make_graph(capacity_x=2) + from_repo, to_repo = _make_repos(3) + resources, from_pos, to_pos = _make_items(3) + steps = generate_batch_transfer_protocol(G, from_repo, to_repo, resources, from_pos, to_pos) + + # 批次1: nav + 2 pick + nav + 2 place + nav(返回) = 7 + # 批次2: nav + 1 pick + nav + 1 place = 4 + # 总计 11 steps + assert len(steps) == 11 + + nav_steps = [s for s in steps if s["action_name"] == "send_nav_task"] + # 批次1: 2 nav(去来源+去目标) + 1 nav(返回) + 批次2: 2 nav = 5 nav + assert len(nav_steps) == 5 + + def test_children_dict_updated(self): + """compile 阶段三方 children dict 状态正确""" + G = _make_graph(capacity_x=2) + from_repo, to_repo = _make_repos(2) + resources, from_pos, to_pos = _make_items(2) + + assert "A01" in from_repo["StationA"]["children"] + assert "A02" in from_repo["StationA"]["children"] + assert len(to_repo["StationB"]["children"]) == 0 + + generate_batch_transfer_protocol(G, from_repo, to_repo, resources, from_pos, to_pos) + + # compile 后 from_repo 的 children 应该被 pop 掉 + assert "A01" not in from_repo["StationA"]["children"] + assert "A02" not in from_repo["StationA"]["children"] + # to_repo 应该有新物料 + assert "A01" in to_repo["StationB"]["children"] + assert "A02" in to_repo["StationB"]["children"] + assert to_repo["StationB"]["children"]["A01"]["id"] == "resource_1" + + def test_device_ids_from_config(self): + """设备 ID 全部从配置读取,不硬编码""" + G = _make_graph() + from_repo, to_repo = _make_repos(1) + resources, from_pos, to_pos = _make_items(1) + steps = generate_batch_transfer_protocol(G, from_repo, to_repo, resources, from_pos, to_pos) + + device_ids = {s["device_id"] for s in steps} + assert "zhixing_agv" in device_ids + assert "zhixing_ur_arm" in device_ids + + def test_route_not_found(self): + """路由表中无对应路线时报错""" + G = _make_graph() + from_repo = {"Unknown": {"id": "Unknown", "children": {"A01": {"id": "R1", "parent": "Unknown"}}}} + to_repo = {"Other": {"id": "Other", "children": {}}} + resources = [{"id": "R1", "name": "R1"}] + with pytest.raises(KeyError, match="路由表"): + generate_batch_transfer_protocol(G, from_repo, to_repo, resources, ["A01"], ["B01"]) + + def test_length_mismatch(self): + """三个数组长度不一致时报错""" + G = _make_graph() + from_repo, to_repo = _make_repos(2) + resources = [{"id": "R1"}] + with pytest.raises(ValueError, match="长度不一致"): + generate_batch_transfer_protocol(G, from_repo, to_repo, resources, ["A01", "A02"], ["B01"]) + + +# ============ 改造后的 AGV 单物料编译器测试 ============ + +class TestAGVTransferProtocol: + def test_single_transfer_from_config(self): + """改造后的单物料编译器从 G 读取配置""" + G = _make_graph() + from_repo = {"StationA": {"id": "StationA", "children": {"A01": {"id": "R1", "parent": "StationA"}}}} + to_repo = {"StationB": {"id": "StationB", "children": {}}} + steps = generate_agv_transfer_protocol(G, from_repo, "A01", to_repo, "B01") + + assert len(steps) == 2 + assert steps[0]["device_id"] == "zhixing_agv" + assert steps[0]["action_name"] == "send_nav_task" + assert steps[1]["device_id"] == "zhixing_ur_arm" + assert steps[1]["action_name"] == "move_pos_task" + + def test_children_updated(self): + """单物料编译后 children dict 正确更新""" + G = _make_graph() + from_repo = {"StationA": {"id": "StationA", "children": {"A01": {"id": "R1", "parent": "StationA"}}}} + to_repo = {"StationB": {"id": "StationB", "children": {}}} + generate_agv_transfer_protocol(G, from_repo, "A01", to_repo, "B01") + + assert "A01" not in from_repo["StationA"]["children"] + assert "B01" in to_repo["StationB"]["children"] + assert to_repo["StationB"]["children"]["B01"]["parent"] == "StationB" diff --git a/tests/compile/test_full_chain_conversion_to_compile.py b/tests/compile/test_full_chain_conversion_to_compile.py new file mode 100644 index 00000000..577dee6b --- /dev/null +++ b/tests/compile/test_full_chain_conversion_to_compile.py @@ -0,0 +1,706 @@ +""" +全链路集成测试:ROS Goal 转换 → ResourceTreeSet → get_plr_nested_dict → 编译器 → 动作列表 + +模拟 workstation.py 中的完整路径: +1. host 返回 raw_data(模拟 resource_get 响应) +2. ResourceTreeSet.from_raw_dict_list(raw_data) 构建资源树 +3. tree.root_node.get_plr_nested_dict() 生成嵌套 dict +4. protocol_kwargs 传给编译器 +5. 编译器返回 action_list,验证结构和关键字段 +""" + +import copy +import json +import pytest +import networkx as nx + +from unilabos.resources.resource_tracker import ( + ResourceDictInstance, + ResourceTreeSet, +) +from unilabos.compile.utils.resource_helper import ( + ensure_resource_instance, + resource_to_dict, + get_resource_id, + get_resource_data, +) +from unilabos.compile.utils.vessel_parser import get_vessel + +# ============ 构建模拟设备图 ============ + +def _build_test_graph(): + """构建一个包含常用设备节点的测试图""" + G = nx.DiGraph() + + # 容器 + G.add_node("reactor_01", **{ + "id": "reactor_01", + "name": "reactor_01", + "type": "device", + "class": "virtual_stirrer", + "data": {}, + "config": {}, + }) + + # 搅拌设备 + G.add_node("stirrer_1", **{ + "id": "stirrer_1", + "name": "stirrer_1", + "type": "device", + "class": "virtual_stirrer", + "data": {}, + "config": {}, + }) + G.add_edge("stirrer_1", "reactor_01") + + # 加热设备 + G.add_node("heatchill_1", **{ + "id": "heatchill_1", + "name": "heatchill_1", + "type": "device", + "class": "virtual_heatchill", + "data": {}, + "config": {}, + }) + G.add_edge("heatchill_1", "reactor_01") + + # 试剂容器(液体) + G.add_node("flask_water", **{ + "id": "flask_water", + "name": "flask_water", + "type": "container", + "class": "", + "data": {"reagent_name": "water", "liquid": [{"liquid_type": "water", "volume": 500.0}]}, + "config": {"reagent": "water"}, + }) + + # 固体加样器 + G.add_node("solid_dispenser_1", **{ + "id": "solid_dispenser_1", + "name": "solid_dispenser_1", + "type": "device", + "class": "solid_dispenser", + "data": {}, + "config": {}, + }) + + # 泵 + G.add_node("pump_1", **{ + "id": "pump_1", + "name": "pump_1", + "type": "device", + "class": "virtual_pump", + "data": {}, + "config": {}, + }) + G.add_edge("flask_water", "pump_1") + G.add_edge("pump_1", "reactor_01") + + return G + + +# ============ 构建模拟 host 返回数据 ============ + +def _make_raw_resource( + id="reactor_01", + uuid="uuid-reactor-01", + name="reactor_01", + klass="virtual_stirrer", + type_="device", + parent=None, + parent_uuid=None, + data=None, + config=None, + extra=None, +): + """模拟 host 返回的单个资源 dict(与 resource_get 服务响应一致)""" + return { + "id": id, + "uuid": uuid, + "name": name, + "class": klass, + "type": type_, + "parent": parent, + "parent_uuid": parent_uuid or "", + "description": "", + "config": config or {}, + "data": data or {}, + "extra": extra or {}, + "position": {"x": 0.0, "y": 0.0, "z": 0.0}, + } + + +def _simulate_workstation_resource_enrichment(raw_data_list, field_type="unilabos_msgs/Resource"): + """ + 模拟 workstation.py 中 resource enrichment 的核心逻辑: + raw_data → ResourceTreeSet.from_raw_dict_list → get_plr_nested_dict → protocol_kwargs[k] + """ + tree_set = ResourceTreeSet.from_raw_dict_list(raw_data_list) + + if field_type == "unilabos_msgs/Resource": + # 单个 Resource:取第一棵树的根节点 + root_instance = tree_set.trees[0].root_node if tree_set.trees else None + return root_instance.get_plr_nested_dict() if root_instance else {} + else: + # sequence:返回列表 + return [tree.root_node.get_plr_nested_dict() for tree in tree_set.trees] + + +# ============ 全链路测试:Stir 协议 ============ + +class TestStirProtocolFullChain: + """Stir 协议全链路:host raw_data → enriched dict → compiler → action_list""" + + def test_stir_with_enriched_resource_dict(self): + """单个 Resource 经过 enrichment 后传给 stir compiler""" + from unilabos.compile.stir_protocol import generate_stir_protocol + + raw_data = [_make_raw_resource( + id="reactor_01", uuid="uuid-reactor-01", + klass="virtual_stirrer", type_="device", + )] + + # 模拟 workstation enrichment + enriched_vessel = _simulate_workstation_resource_enrichment(raw_data) + + assert enriched_vessel["id"] == "reactor_01" + assert enriched_vessel["uuid"] == "uuid-reactor-01" + assert enriched_vessel["class"] == "virtual_stirrer" + + # 传给编译器 + G = _build_test_graph() + actions = generate_stir_protocol( + G=G, + vessel=enriched_vessel, + time="60", + stir_speed=300.0, + ) + + assert isinstance(actions, list) + assert len(actions) >= 1 + action = actions[0] + assert action["device_id"] == "stirrer_1" + assert action["action_name"] == "stir" + assert "vessel" in action["action_kwargs"] + assert action["action_kwargs"]["vessel"]["id"] == "reactor_01" + + def test_stir_with_resource_dict_instance(self): + """直接用 ResourceDictInstance 传给 stir compiler(通过 get_plr_nested_dict 转换)""" + from unilabos.compile.stir_protocol import generate_stir_protocol + + raw_data = [_make_raw_resource(id="reactor_01")] + tree_set = ResourceTreeSet.from_raw_dict_list(raw_data) + inst = tree_set.trees[0].root_node + + # 通过 resource_to_dict 转换(resource_helper 兼容层) + vessel_dict = resource_to_dict(inst) + assert isinstance(vessel_dict, dict) + assert vessel_dict["id"] == "reactor_01" + + G = _build_test_graph() + actions = generate_stir_protocol(G=G, vessel=vessel_dict, time="30") + + assert len(actions) >= 1 + assert actions[0]["action_name"] == "stir" + + def test_stir_with_string_vessel(self): + """兼容旧模式:直接传 vessel 字符串""" + from unilabos.compile.stir_protocol import generate_stir_protocol + + G = _build_test_graph() + actions = generate_stir_protocol(G=G, vessel="reactor_01", time="30") + + assert len(actions) >= 1 + assert actions[0]["device_id"] == "stirrer_1" + assert actions[0]["action_kwargs"]["vessel"]["id"] == "reactor_01" + + +# ============ 全链路测试:HeatChill 协议 ============ + +class TestHeatChillProtocolFullChain: + """HeatChill 协议全链路""" + + def test_heatchill_with_enriched_resource(self): + from unilabos.compile.heatchill_protocol import generate_heat_chill_protocol + + raw_data = [_make_raw_resource(id="reactor_01", klass="virtual_stirrer")] + enriched_vessel = _simulate_workstation_resource_enrichment(raw_data) + + G = _build_test_graph() + actions = generate_heat_chill_protocol( + G=G, + vessel=enriched_vessel, + temp=80.0, + time="300", + ) + + assert isinstance(actions, list) + assert len(actions) >= 1 + action = actions[0] + assert action["device_id"] == "heatchill_1" + assert action["action_name"] == "heat_chill" + assert action["action_kwargs"]["temp"] == 80.0 + + def test_heatchill_start_with_enriched_resource(self): + from unilabos.compile.heatchill_protocol import generate_heat_chill_start_protocol + + raw_data = [_make_raw_resource(id="reactor_01")] + enriched_vessel = _simulate_workstation_resource_enrichment(raw_data) + + G = _build_test_graph() + actions = generate_heat_chill_start_protocol( + G=G, + vessel=enriched_vessel, + temp=60.0, + ) + + assert len(actions) >= 1 + assert actions[0]["action_name"] == "heat_chill_start" + assert actions[0]["action_kwargs"]["temp"] == 60.0 + + def test_heatchill_stop_with_enriched_resource(self): + from unilabos.compile.heatchill_protocol import generate_heat_chill_stop_protocol + + raw_data = [_make_raw_resource(id="reactor_01")] + enriched_vessel = _simulate_workstation_resource_enrichment(raw_data) + + G = _build_test_graph() + actions = generate_heat_chill_stop_protocol(G=G, vessel=enriched_vessel) + + assert len(actions) >= 1 + assert actions[0]["action_name"] == "heat_chill_stop" + + +# ============ 全链路测试:Add 协议 ============ + +class TestAddProtocolFullChain: + """Add 协议全链路:vessel enrichment + reagent 查找 + 泵传输""" + + def test_add_solid_with_enriched_resource(self): + from unilabos.compile.add_protocol import generate_add_protocol + + raw_data = [_make_raw_resource(id="reactor_01")] + enriched_vessel = _simulate_workstation_resource_enrichment(raw_data) + + G = _build_test_graph() + actions = generate_add_protocol( + G=G, + vessel=enriched_vessel, + reagent="NaCl", + mass="5 g", + ) + + assert isinstance(actions, list) + assert len(actions) >= 1 + # 应该包含至少一个 add_solid 或 log_message 动作 + action_names = [a.get("action_name", "") for a in actions] + assert any(name in ["add_solid", "log_message"] for name in action_names) + + def test_add_liquid_with_enriched_resource(self): + from unilabos.compile.add_protocol import generate_add_protocol + + raw_data = [_make_raw_resource(id="reactor_01")] + enriched_vessel = _simulate_workstation_resource_enrichment(raw_data) + + G = _build_test_graph() + actions = generate_add_protocol( + G=G, + vessel=enriched_vessel, + reagent="water", + volume="10 mL", + ) + + assert isinstance(actions, list) + assert len(actions) >= 1 + + +# ============ 全链路测试:ResourceDictInstance 兼容层 ============ + +class TestResourceDictInstanceCompatibility: + """验证编译器兼容层对 ResourceDictInstance 的处理""" + + def test_get_vessel_from_enriched_dict(self): + """get_vessel 对 enriched dict 的处理""" + raw_data = [_make_raw_resource( + id="reactor_01", + data={"temperature": 25.0, "liquid": [{"liquid_type": "water", "volume": 10.0}]}, + )] + enriched = _simulate_workstation_resource_enrichment(raw_data) + + vessel_id, vessel_data = get_vessel(enriched) + assert vessel_id == "reactor_01" + assert vessel_data["temperature"] == 25.0 + assert len(vessel_data["liquid"]) == 1 + + def test_get_vessel_from_resource_instance(self): + """get_vessel 直接对 ResourceDictInstance 的处理""" + raw_data = [_make_raw_resource( + id="reactor_01", + data={"temperature": 25.0}, + )] + tree_set = ResourceTreeSet.from_raw_dict_list(raw_data) + inst = tree_set.trees[0].root_node + + vessel_id, vessel_data = get_vessel(inst) + assert vessel_id == "reactor_01" + assert vessel_data["temperature"] == 25.0 + + def test_ensure_resource_instance_round_trip(self): + """ensure_resource_instance → resource_to_dict 无损往返""" + raw_data = [_make_raw_resource( + id="reactor_01", uuid="uuid-r01", klass="virtual_stirrer", + data={"temp": 25.0}, + )] + enriched = _simulate_workstation_resource_enrichment(raw_data) + + # dict → ResourceDictInstance + inst = ensure_resource_instance(enriched) + assert isinstance(inst, ResourceDictInstance) + assert inst.res_content.id == "reactor_01" + assert inst.res_content.uuid == "uuid-r01" + + # ResourceDictInstance → dict + d = resource_to_dict(inst) + assert isinstance(d, dict) + assert d["id"] == "reactor_01" + assert d["uuid"] == "uuid-r01" + assert d["class"] == "virtual_stirrer" + + +# ============ 全链路测试:带 children 的资源树 ============ + +class TestResourceTreeWithChildren: + """测试带 children 结构的资源树通过编译器的路径""" + + def _make_tree_with_children(self): + """构建 StationA -> [Flask1, Flask2] 的资源树""" + return [ + _make_raw_resource( + id="StationA", uuid="uuid-station-a", + name="StationA", klass="workstation", type_="device", + ), + _make_raw_resource( + id="Flask1", uuid="uuid-flask-1", + name="Flask1", klass="", type_="resource", + parent="StationA", parent_uuid="uuid-station-a", + data={"liquid": [{"liquid_type": "water", "volume": 10.0}]}, + ), + _make_raw_resource( + id="Flask2", uuid="uuid-flask-2", + name="Flask2", klass="", type_="resource", + parent="StationA", parent_uuid="uuid-station-a", + data={"liquid": [{"liquid_type": "ethanol", "volume": 5.0}]}, + ), + ] + + def test_enrichment_preserves_children_structure(self): + """验证 enrichment 后 children 为嵌套 dict""" + raw_data = self._make_tree_with_children() + enriched = _simulate_workstation_resource_enrichment(raw_data) + + assert enriched["id"] == "StationA" + assert "children" in enriched + assert isinstance(enriched["children"], dict) + assert "Flask1" in enriched["children"] + assert "Flask2" in enriched["children"] + + def test_children_preserve_uuid_and_data(self): + """验证 children 中的 uuid 和 data 被正确保留""" + raw_data = self._make_tree_with_children() + enriched = _simulate_workstation_resource_enrichment(raw_data) + + flask1 = enriched["children"]["Flask1"] + assert flask1["uuid"] == "uuid-flask-1" + assert flask1["data"]["liquid"][0]["liquid_type"] == "water" + assert flask1["data"]["liquid"][0]["volume"] == 10.0 + + flask2 = enriched["children"]["Flask2"] + assert flask2["uuid"] == "uuid-flask-2" + assert flask2["data"]["liquid"][0]["liquid_type"] == "ethanol" + + def test_children_dict_can_be_popped(self): + """模拟 batch_transfer_protocol 中 pop children 的操作""" + raw_data = self._make_tree_with_children() + enriched = _simulate_workstation_resource_enrichment(raw_data) + + # batch_transfer_protocol 中会 pop children + children = enriched["children"] + popped = children.pop("Flask1") + assert popped["id"] == "Flask1" + assert "Flask1" not in enriched["children"] + assert "Flask2" in enriched["children"] + + def test_children_dict_usable_as_from_repo(self): + """模拟 batch_transfer_protocol 中 from_repo 参数""" + raw_data = self._make_tree_with_children() + enriched = _simulate_workstation_resource_enrichment(raw_data) + + # 模拟编译器接收的 from_repo 格式 + from_repo = {"StationA": enriched} + from_repo_ = list(from_repo.values())[0] + + assert from_repo_["id"] == "StationA" + assert "Flask1" in from_repo_["children"] + assert from_repo_["children"]["Flask1"]["uuid"] == "uuid-flask-1" + + def test_sequence_resource_enrichment(self): + """sequence 情况:多个独立资源树""" + raw_data1 = [_make_raw_resource(id="R1", uuid="uuid-r1")] + raw_data2 = [_make_raw_resource(id="R2", uuid="uuid-r2")] + + tree_set1 = ResourceTreeSet.from_raw_dict_list(raw_data1) + tree_set2 = ResourceTreeSet.from_raw_dict_list(raw_data2) + + results = [ + tree.root_node.get_plr_nested_dict() + for ts in [tree_set1, tree_set2] + for tree in ts.trees + ] + + assert len(results) == 2 + assert results[0]["id"] == "R1" + assert results[1]["id"] == "R2" + + +# ============ 全链路测试:动作列表结构验证 ============ + +class TestActionListStructure: + """验证编译器返回的 action_list 结构符合 workstation 预期""" + + def _validate_action(self, action): + """验证单个 action dict 的结构""" + if action.get("action_name") == "wait": + # wait 伪动作不需要 device_id + assert "action_kwargs" in action + assert "time" in action["action_kwargs"] + return + + if action.get("action_name") == "log_message": + # log 伪动作 + assert "action_kwargs" in action + return + + # 正常设备动作 + assert "device_id" in action, f"action 缺少 device_id: {action}" + assert "action_name" in action, f"action 缺少 action_name: {action}" + assert "action_kwargs" in action, f"action 缺少 action_kwargs: {action}" + assert isinstance(action["action_kwargs"], dict) + + def test_stir_action_list_structure(self): + from unilabos.compile.stir_protocol import generate_stir_protocol + + raw_data = [_make_raw_resource(id="reactor_01")] + enriched = _simulate_workstation_resource_enrichment(raw_data) + + G = _build_test_graph() + actions = generate_stir_protocol(G=G, vessel=enriched, time="60") + + for action in actions: + if isinstance(action, list): + # 并行动作 + for sub_action in action: + self._validate_action(sub_action) + else: + self._validate_action(action) + + def test_heatchill_action_list_structure(self): + from unilabos.compile.heatchill_protocol import generate_heat_chill_protocol + + raw_data = [_make_raw_resource(id="reactor_01")] + enriched = _simulate_workstation_resource_enrichment(raw_data) + + G = _build_test_graph() + actions = generate_heat_chill_protocol(G=G, vessel=enriched, temp=80.0, time="60") + + for action in actions: + if isinstance(action, list): + for sub_action in action: + self._validate_action(sub_action) + else: + self._validate_action(action) + + def test_add_action_list_structure(self): + from unilabos.compile.add_protocol import generate_add_protocol + + raw_data = [_make_raw_resource(id="reactor_01")] + enriched = _simulate_workstation_resource_enrichment(raw_data) + + G = _build_test_graph() + actions = generate_add_protocol(G=G, vessel=enriched, reagent="NaCl", mass="5 g") + + for action in actions: + if isinstance(action, list): + for sub_action in action: + self._validate_action(sub_action) + else: + self._validate_action(action) + + +# ============ 全链路测试:message_converter 到 enrichment ============ + +class TestMessageConverterToEnrichment: + """模拟从 ROS 消息转换后的 dict 到 enrichment 的完整链路""" + + def test_ros_goal_conversion_simulation(self): + """ + 模拟 workstation.py 中的完整流程: + 1. ROS goal 中的 vessel 字段被 convert_from_ros_msg 转换为浅层 dict + 2. workstation 用 resource_id 请求 host 获取完整资源数据 + 3. ResourceTreeSet.from_raw_dict_list 构建资源树 + 4. get_plr_nested_dict 生成嵌套 dict 替换 protocol_kwargs[k] + """ + # 步骤1: 模拟 convert_from_ros_msg 的输出(浅层 dict,只有 id 等基本字段) + shallow_vessel = { + "id": "reactor_01", + "uuid": "uuid-reactor-01", + "name": "reactor_01", + "type": "device", + "category": "virtual_stirrer", + "children": [], + "parent": "", + "parent_uuid": "", + "config": {}, + "data": {}, + "extra": {}, + "position": {"x": 0.0, "y": 0.0, "z": 0.0}, + } + + protocol_kwargs = { + "vessel": shallow_vessel, + "time": "300", + "stir_speed": 300.0, + } + + # 步骤2: 提取 resource_id + resource_id = protocol_kwargs["vessel"]["id"] + assert resource_id == "reactor_01" + + # 步骤3: 模拟 host 返回完整数据(带 children) + host_response = [ + _make_raw_resource( + id="reactor_01", uuid="uuid-reactor-01", + klass="virtual_stirrer", type_="device", + data={"temperature": 25.0, "pressure": 1.0}, + config={"max_temp": 300.0}, + ), + ] + + # 步骤4: enrichment + enriched = _simulate_workstation_resource_enrichment(host_response) + protocol_kwargs["vessel"] = enriched + + # 验证 enrichment 后的 protocol_kwargs + assert protocol_kwargs["vessel"]["id"] == "reactor_01" + assert protocol_kwargs["vessel"]["uuid"] == "uuid-reactor-01" + assert protocol_kwargs["vessel"]["class"] == "virtual_stirrer" + assert protocol_kwargs["vessel"]["data"]["temperature"] == 25.0 + assert protocol_kwargs["vessel"]["config"]["max_temp"] == 300.0 + + # 步骤5: 传给编译器 + from unilabos.compile.stir_protocol import generate_stir_protocol + G = _build_test_graph() + actions = generate_stir_protocol(G=G, **protocol_kwargs) + + assert len(actions) >= 1 + assert actions[0]["device_id"] == "stirrer_1" + assert actions[0]["action_name"] == "stir" + + def test_ros_goal_with_children_enrichment(self): + """ROS goal → enrichment 带 children 的场景(batch transfer)""" + # 模拟 host 返回带 children 的数据 + host_response = [ + _make_raw_resource( + id="StationA", uuid="uuid-sa", klass="workstation", type_="device", + config={"num_items_x": 4, "num_items_y": 2}, + ), + _make_raw_resource( + id="Plate1", uuid="uuid-p1", type_="resource", + parent="StationA", parent_uuid="uuid-sa", + data={"sample": "sample_A"}, + ), + _make_raw_resource( + id="Plate2", uuid="uuid-p2", type_="resource", + parent="StationA", parent_uuid="uuid-sa", + data={"sample": "sample_B"}, + ), + ] + + enriched = _simulate_workstation_resource_enrichment(host_response) + + assert enriched["id"] == "StationA" + assert enriched["class"] == "workstation" + assert len(enriched["children"]) == 2 + assert enriched["children"]["Plate1"]["data"]["sample"] == "sample_A" + assert enriched["children"]["Plate2"]["uuid"] == "uuid-p2" + + # 模拟 batch_transfer 的 from_repo 格式 + from_repo = {"StationA": enriched} + from_repo_ = list(from_repo.values())[0] + assert "Plate1" in from_repo_["children"] + assert from_repo_["children"]["Plate1"]["uuid"] == "uuid-p1" + + +# ============ 全链路测试:多协议连续调用 ============ + +class TestMultiProtocolChain: + """模拟连续执行多个协议(如 add → stir → heatchill)""" + + def test_sequential_protocol_execution(self): + """模拟典型合成路径:add → stir → heatchill""" + from unilabos.compile.stir_protocol import generate_stir_protocol + from unilabos.compile.heatchill_protocol import generate_heat_chill_protocol + from unilabos.compile.add_protocol import generate_add_protocol + + raw_data = [_make_raw_resource( + id="reactor_01", uuid="uuid-reactor-01", + klass="virtual_stirrer", type_="device", + )] + enriched = _simulate_workstation_resource_enrichment(raw_data) + G = _build_test_graph() + + # 每次调用用 enriched 的副本,避免编译器修改原数据 + all_actions = [] + + # 步骤1: 添加试剂 + add_actions = generate_add_protocol( + G=G, vessel=copy.deepcopy(enriched), + reagent="NaCl", mass="5 g", + ) + all_actions.extend(add_actions) + + # 步骤2: 搅拌 + stir_actions = generate_stir_protocol( + G=G, vessel=copy.deepcopy(enriched), + time="60", stir_speed=300.0, + ) + all_actions.extend(stir_actions) + + # 步骤3: 加热 + heat_actions = generate_heat_chill_protocol( + G=G, vessel=copy.deepcopy(enriched), + temp=80.0, time="300", + ) + all_actions.extend(heat_actions) + + # 验证总动作列表 + assert len(all_actions) >= 3 + # 每个协议至少产生一个核心动作 + action_names = [a.get("action_name", "") for a in all_actions if isinstance(a, dict)] + assert "stir" in action_names + assert "heat_chill" in action_names + + def test_enriched_resource_not_mutated(self): + """验证编译器不应修改传入的 enriched dict(如果需要修改应 deepcopy)""" + from unilabos.compile.stir_protocol import generate_stir_protocol + + raw_data = [_make_raw_resource(id="reactor_01")] + enriched = _simulate_workstation_resource_enrichment(raw_data) + original_id = enriched["id"] + original_uuid = enriched["uuid"] + + G = _build_test_graph() + generate_stir_protocol(G=G, vessel=enriched, time="60") + + # 验证 enriched dict 核心字段未被修改 + assert enriched["id"] == original_id + assert enriched["uuid"] == original_uuid diff --git a/tests/compile/test_pump_separate_full_chain.py b/tests/compile/test_pump_separate_full_chain.py new file mode 100644 index 00000000..8988933a --- /dev/null +++ b/tests/compile/test_pump_separate_full_chain.py @@ -0,0 +1,538 @@ +""" +PumpTransfer 和 Separate 全链路测试 + +构建包含泵/阀门/分液漏斗的完整设备图, +输出完整的中间数据(最短路径、泵骨架、动作列表等)。 +""" + +import copy +import json +import pprint +import pytest +import networkx as nx + +from unilabos.resources.resource_tracker import ResourceTreeSet +from unilabos.compile.utils.resource_helper import get_resource_id, get_resource_data +from unilabos.compile.utils.vessel_parser import get_vessel + + +def _make_raw_resource(id, uuid=None, name=None, klass="", type_="device", + parent=None, parent_uuid=None, data=None, config=None, extra=None): + return { + "id": id, + "uuid": uuid or f"uuid-{id}", + "name": name or id, + "class": klass, + "type": type_, + "parent": parent, + "parent_uuid": parent_uuid or "", + "description": "", + "config": config or {}, + "data": data or {}, + "extra": extra or {}, + "position": {"x": 0.0, "y": 0.0, "z": 0.0}, + } + + +def _simulate_enrichment(raw_data_list): + tree_set = ResourceTreeSet.from_raw_dict_list(raw_data_list) + root = tree_set.trees[0].root_node if tree_set.trees else None + return root.get_plr_nested_dict() if root else {} + + +def _build_pump_transfer_graph(): + """ + 构建带泵/阀门的设备图,用于测试 PumpTransfer: + + flask_water (container) + ↓ + valve_1 (multiway_valve, pump_1 连接) + ↓ + reactor_01 (device) + + 同时有: stirrer_1, heatchill_1, separator_1 + """ + G = nx.DiGraph() + + # 源容器 + G.add_node("flask_water", **{ + "id": "flask_water", "name": "flask_water", + "type": "container", "class": "", + "data": {"reagent_name": "water", "liquid": [{"liquid_type": "water", "volume": 200.0}]}, + "config": {"reagent": "water"}, + }) + + # 多通阀 + G.add_node("valve_1", **{ + "id": "valve_1", "name": "valve_1", + "type": "device", "class": "multiway_valve", + "data": {}, "config": {}, + }) + + # 注射泵(连接到阀门) + G.add_node("pump_1", **{ + "id": "pump_1", "name": "pump_1", + "type": "device", "class": "virtual_pump", + "data": {}, "config": {"max_volume": 25.0}, + }) + + # 目标容器 + G.add_node("reactor_01", **{ + "id": "reactor_01", "name": "reactor_01", + "type": "device", "class": "virtual_stirrer", + "data": {"liquid": [{"liquid_type": "water", "volume": 50.0}]}, + "config": {}, + }) + + # 搅拌器 + G.add_node("stirrer_1", **{ + "id": "stirrer_1", "name": "stirrer_1", + "type": "device", "class": "virtual_stirrer", + "data": {}, "config": {}, + }) + + # 加热器 + G.add_node("heatchill_1", **{ + "id": "heatchill_1", "name": "heatchill_1", + "type": "device", "class": "virtual_heatchill", + "data": {}, "config": {}, + }) + + # 分离器 + G.add_node("separator_1", **{ + "id": "separator_1", "name": "separator_1", + "type": "device", "class": "separator_controller", + "data": {}, "config": {}, + }) + + # 废液容器 + G.add_node("waste_workup", **{ + "id": "waste_workup", "name": "waste_workup", + "type": "container", "class": "", + "data": {}, "config": {}, + }) + + # 产物收集瓶 + G.add_node("product_flask", **{ + "id": "product_flask", "name": "product_flask", + "type": "container", "class": "", + "data": {}, "config": {}, + }) + + # DCM溶剂瓶 + G.add_node("flask_dcm", **{ + "id": "flask_dcm", "name": "flask_dcm", + "type": "container", "class": "", + "data": {"reagent_name": "dcm", "liquid": [{"liquid_type": "dcm", "volume": 500.0}]}, + "config": {"reagent": "dcm"}, + }) + + # 边连接 —— flask_water → valve_1 → reactor_01 + G.add_edge("flask_water", "valve_1", port={"valve_1": "port_1"}) + G.add_edge("valve_1", "reactor_01", port={"valve_1": "port_2"}) + # 阀门 → 泵 + G.add_edge("valve_1", "pump_1") + G.add_edge("pump_1", "valve_1") + # 搅拌器 ↔ reactor + G.add_edge("stirrer_1", "reactor_01") + # 加热器 ↔ reactor + G.add_edge("heatchill_1", "reactor_01") + # 分离器 ↔ reactor + G.add_edge("separator_1", "reactor_01") + G.add_edge("reactor_01", "separator_1") + # DCM → valve → reactor (同一泵路) + G.add_edge("flask_dcm", "valve_1", port={"valve_1": "port_3"}) + # reactor → valve → product/waste + G.add_edge("valve_1", "product_flask", port={"valve_1": "port_4"}) + G.add_edge("valve_1", "waste_workup", port={"valve_1": "port_5"}) + + return G + + +def _format_action(action, indent=0): + """格式化单个 action 为可读字符串""" + prefix = " " * indent + if isinstance(action, list): + # 并行动作 + lines = [f"{prefix}[PARALLEL]"] + for sub in action: + lines.append(_format_action(sub, indent + 1)) + return "\n".join(lines) + + name = action.get("action_name", "?") + device = action.get("device_id", "") + kwargs = action.get("action_kwargs", {}) + comment = action.get("_comment", "") + meta = action.get("_transfer_meta", "") + + parts = [f"{prefix}→ {device}::{name}"] + if kwargs: + # 精简输出 + kw_str = ", ".join(f"{k}={v}" for k, v in kwargs.items() + if k not in ("progress_message",)) + if kw_str: + parts.append(f" kwargs: {{{kw_str}}}") + if comment: + parts.append(f" # {comment}") + if meta: + parts.append(f" meta: {meta}") + return "\n".join(f"{prefix}{p}" if i > 0 else p for i, p in enumerate(parts)) + + +def _dump_actions(actions, title=""): + """打印完整动作列表""" + print(f"\n{'='*70}") + print(f" {title}") + print(f" 总动作数: {len(actions)}") + print(f"{'='*70}") + for i, action in enumerate(actions): + print(f"\n [{i:02d}] {_format_action(action, indent=2)}") + print(f"\n{'='*70}\n") + + +# ==================== PumpTransfer 全链路 ==================== + +class TestPumpTransferFullChain: + """PumpTransfer: 包含图路径查找、泵骨架构建、动作序列生成""" + + def test_pump_transfer_basic(self): + """基础泵转移:flask_water → valve_1 → reactor_01""" + from unilabos.compile.pump_protocol import generate_pump_protocol + + G = _build_pump_transfer_graph() + + # 检查最短路径 + path = nx.shortest_path(G, "flask_water", "reactor_01") + print(f"\n最短路径: {path}") + assert "valve_1" in path + + # 调用编译器 + actions = generate_pump_protocol( + G=G, + from_vessel_id="flask_water", + to_vessel_id="reactor_01", + volume=10.0, + flowrate=2.5, + transfer_flowrate=0.5, + ) + + _dump_actions(actions, "PumpTransfer: flask_water → reactor_01, 10mL") + + # 验证 + assert isinstance(actions, list) + assert len(actions) > 0 + # 应该有 set_valve_position 和 set_position 动作 + flat = [a for a in actions if isinstance(a, dict)] + action_names = [a.get("action_name") for a in flat] + print(f"动作名称列表: {action_names}") + assert "set_valve_position" in action_names + assert "set_position" in action_names + + def test_pump_transfer_with_rinsing_enriched_vessel(self): + """pump_with_rinsing 接收 enriched vessel dict""" + from unilabos.compile.pump_protocol import generate_pump_protocol_with_rinsing + + G = _build_pump_transfer_graph() + + # 模拟 enrichment + from_raw = [_make_raw_resource( + id="flask_water", klass="", type_="container", + data={"reagent_name": "water", "liquid": [{"liquid_type": "water", "volume": 200.0}]}, + )] + to_raw = [_make_raw_resource( + id="reactor_01", klass="virtual_stirrer", type_="device", + )] + + from_enriched = _simulate_enrichment(from_raw) + to_enriched = _simulate_enrichment(to_raw) + + print(f"\nfrom_vessel enriched: {json.dumps(from_enriched, indent=2, ensure_ascii=False)[:300]}...") + print(f"to_vessel enriched: {json.dumps(to_enriched, indent=2, ensure_ascii=False)[:300]}...") + + # get_vessel 兼容 + fid, fdata = get_vessel(from_enriched) + tid, tdata = get_vessel(to_enriched) + print(f"from_vessel_id={fid}, to_vessel_id={tid}") + assert fid == "flask_water" + assert tid == "reactor_01" + + actions = generate_pump_protocol_with_rinsing( + G=G, + from_vessel=from_enriched, + to_vessel=to_enriched, + volume=15.0, + flowrate=2.5, + transfer_flowrate=0.5, + ) + + _dump_actions(actions, "PumpTransferWithRinsing: flask_water → reactor_01, 15mL (enriched)") + + assert isinstance(actions, list) + assert len(actions) > 0 + + def test_pump_transfer_multi_batch(self): + """体积 > max_volume 时自动分批""" + from unilabos.compile.pump_protocol import generate_pump_protocol + + G = _build_pump_transfer_graph() + + # pump_1 的 max_volume = 25mL,转 60mL 应该分 3 批 + actions = generate_pump_protocol( + G=G, + from_vessel_id="flask_water", + to_vessel_id="reactor_01", + volume=60.0, + flowrate=2.5, + transfer_flowrate=0.5, + ) + + _dump_actions(actions, "PumpTransfer 分批: 60mL (max_volume=25mL, 预期 3 批)") + + assert len(actions) > 0 + # 应该有多轮 set_position + flat = [a for a in actions if isinstance(a, dict)] + set_position_count = sum(1 for a in flat if a.get("action_name") == "set_position") + print(f"set_position 动作数: {set_position_count}") + # 3批 × 2次 (吸液 + 排液) = 6 次 set_position + assert set_position_count >= 6 + + def test_pump_transfer_no_path(self): + """无路径时返回空""" + from unilabos.compile.pump_protocol import generate_pump_protocol + + G = _build_pump_transfer_graph() + G.add_node("isolated_flask", type="container") + + actions = generate_pump_protocol( + G=G, + from_vessel_id="isolated_flask", + to_vessel_id="reactor_01", + volume=10.0, + ) + + print(f"\n无路径时的动作列表: {actions}") + assert actions == [] + + def test_pump_backbone_filtering(self): + """验证泵骨架过滤逻辑(电磁阀被跳过)""" + from unilabos.compile.pump_protocol import generate_pump_protocol + + G = _build_pump_transfer_graph() + # 添加电磁阀到路径中 + G.add_node("solenoid_valve_1", **{ + "type": "device", "class": "solenoid_valve", + "data": {}, "config": {}, + }) + # flask_water → solenoid_valve_1 → valve_1 → reactor_01 + G.remove_edge("flask_water", "valve_1") + G.add_edge("flask_water", "solenoid_valve_1") + G.add_edge("solenoid_valve_1", "valve_1") + + path = nx.shortest_path(G, "flask_water", "reactor_01") + print(f"\n含电磁阀的路径: {path}") + assert "solenoid_valve_1" in path + + actions = generate_pump_protocol( + G=G, + from_vessel_id="flask_water", + to_vessel_id="reactor_01", + volume=10.0, + ) + + _dump_actions(actions, "PumpTransfer 含电磁阀: flask_water → solenoid → valve_1 → reactor_01") + # 电磁阀应被跳过,泵骨架只有 valve_1 + assert len(actions) > 0 + + +# ==================== Separate 全链路 ==================== + +class TestSeparateProtocolFullChain: + """Separate: 包含 bug 确认和正常路径测试""" + + def test_separate_bug_line_128_fixed(self): + """验证 separate_protocol.py:128 的 bug 已修复(不再 crash)""" + from unilabos.compile.separate_protocol import generate_separate_protocol + + G = _build_pump_transfer_graph() + + raw_data = [_make_raw_resource( + id="reactor_01", klass="virtual_stirrer", + data={"liquid": [{"liquid_type": "water", "volume": 100.0}]}, + )] + enriched = _simulate_enrichment(raw_data) + + # 修复前:final_vessel_id, _ = vessel_id 会 crash(字符串解包) + # 修复后:final_vessel_id = vessel_id,正常返回 action 列表 + result = generate_separate_protocol( + G=G, + vessel=enriched, + purpose="extract", + product_phase="top", + product_vessel="product_flask", + waste_vessel="waste_workup", + solvent="dcm", + volume="100 mL", + ) + assert isinstance(result, list) + assert len(result) > 0 + + def test_separate_manual_workaround(self): + """ + 绕过 line 128 bug,手动测试分离编译器中可以工作的子函数 + """ + from unilabos.compile.separate_protocol import ( + find_separator_device, + find_separation_vessel_bottom, + ) + from unilabos.compile.utils.vessel_parser import ( + find_connected_stirrer, + find_solvent_vessel, + ) + from unilabos.compile.utils.unit_parser import parse_volume_input + from unilabos.compile.utils.resource_helper import get_resource_liquid_volume as get_vessel_liquid_volume + + G = _build_pump_transfer_graph() + + # 1. get_vessel 解析 enriched dict + raw_data = [_make_raw_resource( + id="reactor_01", klass="virtual_stirrer", + data={"liquid": [{"liquid_type": "water", "volume": 100.0}]}, + )] + enriched = _simulate_enrichment(raw_data) + vessel_id, vessel_data = get_vessel(enriched) + print(f"\nvessel_id: {vessel_id}") + print(f"vessel_data: {vessel_data}") + assert vessel_id == "reactor_01" + assert vessel_data["liquid"][0]["volume"] == 100.0 + + # 2. find_separator_device + sep = find_separator_device(G, vessel_id) + print(f"分离器设备: {sep}") + assert sep == "separator_1" + + # 3. find_connected_stirrer + stirrer = find_connected_stirrer(G, vessel_id) + print(f"搅拌器设备: {stirrer}") + assert stirrer == "stirrer_1" + + # 4. find_solvent_vessel + solvent_v = find_solvent_vessel(G, "dcm") + print(f"DCM溶剂容器: {solvent_v}") + assert solvent_v == "flask_dcm" + + # 5. parse_volume_input + vol = parse_volume_input("200 mL") + print(f"体积解析: '200 mL' → {vol}") + assert vol == 200.0 + + vol2 = parse_volume_input("1.5 L") + print(f"体积解析: '1.5 L' → {vol2}") + assert vol2 == 1500.0 + + # 6. get_vessel_liquid_volume + liq_vol = get_vessel_liquid_volume(enriched) + print(f"液体体积 (enriched dict): {liq_vol}") + assert liq_vol == 100.0 + + # 7. find_separation_vessel_bottom + bottom = find_separation_vessel_bottom(G, vessel_id) + print(f"分离容器底部: {bottom}") + # 当前图中没有命名匹配的底部容器 + + def test_pump_transfer_for_separate_subflow(self): + """测试 separate 中调用的 pump 子流程(溶剂添加 → 分液漏斗)""" + from unilabos.compile.pump_protocol import generate_pump_protocol_with_rinsing + + G = _build_pump_transfer_graph() + + # 模拟分离前的溶剂添加步骤 + actions = generate_pump_protocol_with_rinsing( + G=G, + from_vessel="flask_dcm", + to_vessel="reactor_01", + volume=100.0, + flowrate=2.5, + transfer_flowrate=0.5, + ) + + _dump_actions(actions, "Separate 子流程: flask_dcm → reactor_01, 100mL DCM") + + assert isinstance(actions, list) + assert len(actions) > 0 + + # 模拟分离后产物转移 + actions2 = generate_pump_protocol_with_rinsing( + G=G, + from_vessel="reactor_01", + to_vessel="product_flask", + volume=50.0, + flowrate=2.5, + transfer_flowrate=0.5, + ) + + _dump_actions(actions2, "Separate 子流程: reactor_01 → product_flask, 50mL 产物") + + assert len(actions2) > 0 + + # 废液转移 + actions3 = generate_pump_protocol_with_rinsing( + G=G, + from_vessel="reactor_01", + to_vessel="waste_workup", + volume=50.0, + flowrate=2.5, + transfer_flowrate=0.5, + ) + + _dump_actions(actions3, "Separate 子流程: reactor_01 → waste_workup, 50mL 废液") + + assert len(actions3) > 0 + + +# ==================== 图路径可视化 ==================== + +class TestGraphPathVisualization: + """输出图中关键路径信息""" + + def test_all_shortest_paths(self): + """输出所有容器之间的最短路径""" + G = _build_pump_transfer_graph() + + containers = [n for n in G.nodes() if G.nodes[n].get("type") == "container"] + devices = [n for n in G.nodes() if G.nodes[n].get("type") == "device"] + + print(f"\n{'='*70}") + print(f" 设备图概览") + print(f"{'='*70}") + print(f" 容器节点 ({len(containers)}): {containers}") + print(f" 设备节点 ({len(devices)}): {devices}") + print(f" 边数: {G.number_of_edges()}") + print(f" 边列表:") + for u, v, data in G.edges(data=True): + port_info = data.get("port", "") + print(f" {u} → {v} {port_info if port_info else ''}") + + print(f"\n 关键路径:") + pairs = [ + ("flask_water", "reactor_01"), + ("flask_dcm", "reactor_01"), + ("reactor_01", "product_flask"), + ("reactor_01", "waste_workup"), + ("flask_water", "product_flask"), + ] + for src, dst in pairs: + try: + path = nx.shortest_path(G, src, dst) + length = len(path) - 1 + # 标注路径上的节点类型 + annotated = [] + for n in path: + ntype = G.nodes[n].get("type", "?") + nclass = G.nodes[n].get("class", "") + annotated.append(f"{n}({ntype}{'/' + nclass if nclass else ''})") + print(f" {src} → {dst}: 距离={length}") + print(f" 路径: {' → '.join(annotated)}") + except nx.NetworkXNoPath: + print(f" {src} → {dst}: 无路径!") + + print(f"{'='*70}\n") diff --git a/tests/compile/test_resource_conversion_path.py b/tests/compile/test_resource_conversion_path.py new file mode 100644 index 00000000..8092e23e --- /dev/null +++ b/tests/compile/test_resource_conversion_path.py @@ -0,0 +1,324 @@ +""" +ROS Goal → Resource 转换 → 编译器路径的集成测试 + +覆盖: +1. Resource.msg 新字段(uuid, klass, extra)的往返转换 +2. dict → ROS Resource → dict 往返无损 +3. ResourceTreeSet → get_plr_nested_dict 保留 children 结构 +4. resource_helper 兼容 dict / ResourceDictInstance +5. vessel_parser.get_vessel 兼容 ResourceDictInstance +""" + +import json +import pytest + +# 不依赖 ROS 的测试 —— 直接测试 resource 处理路径 +from unilabos.resources.resource_tracker import ( + ResourceDict, + ResourceDictInstance, + ResourceTreeInstance, + ResourceTreeSet, +) +from unilabos.compile.utils.resource_helper import ( + ensure_resource_instance, + resource_to_dict, + get_resource_id, + get_resource_data, + get_resource_display_info, + get_resource_liquid_volume, +) +from unilabos.compile.utils.vessel_parser import get_vessel + + +# ============ 构建测试数据 ============ + + +def _make_resource_dict( + id="reactor_01", + uuid="uuid-reactor-01", + name="reactor_01", + klass="virtual_stirrer", + type_="device", + parent=None, + parent_uuid=None, + data=None, + config=None, + extra=None, +): + return { + "id": id, + "uuid": uuid, + "name": name, + "class": klass, + "type": type_, + "parent": parent, + "parent_uuid": parent_uuid or "", + "description": "", + "config": config or {}, + "data": data or {}, + "extra": extra or {}, + "position": {"x": 1.0, "y": 2.0, "z": 3.0}, + } + + +def _make_resource_instance(id="reactor_01", **kwargs): + d = _make_resource_dict(id=id, **kwargs) + return ResourceDictInstance.get_resource_instance_from_dict(d) + + +def _make_tree_with_children(): + """构建 StationA -> [R1, R2] 的资源树""" + raw_data = [ + _make_resource_dict( + id="StationA", + uuid="uuid-station-a", + name="StationA", + klass="workstation", + type_="device", + ), + _make_resource_dict( + id="R1", + uuid="uuid-r1", + name="R1", + klass="", + type_="resource", + parent="StationA", + parent_uuid="uuid-station-a", + data={"liquid": [{"liquid_type": "water", "volume": 10.0}]}, + ), + _make_resource_dict( + id="R2", + uuid="uuid-r2", + name="R2", + klass="", + type_="resource", + parent="StationA", + parent_uuid="uuid-station-a", + data={"liquid": [{"liquid_type": "ethanol", "volume": 5.0}]}, + ), + ] + tree_set = ResourceTreeSet.from_raw_dict_list(raw_data) + return tree_set + + +# ============ resource_helper 测试 ============ + + +class TestResourceHelper: + """测试 resource_helper 对 dict / ResourceDictInstance 的兼容性""" + + def test_ensure_resource_instance_from_dict(self): + d = _make_resource_dict() + inst = ensure_resource_instance(d) + assert isinstance(inst, ResourceDictInstance) + assert inst.res_content.id == "reactor_01" + assert inst.res_content.uuid == "uuid-reactor-01" + + def test_ensure_resource_instance_passthrough(self): + inst = _make_resource_instance() + result = ensure_resource_instance(inst) + assert result is inst # 同一个对象,不复制 + + def test_ensure_resource_instance_none(self): + assert ensure_resource_instance(None) is None + + def test_get_resource_id_from_dict(self): + d = _make_resource_dict(id="my_device") + assert get_resource_id(d) == "my_device" + + def test_get_resource_id_from_instance(self): + inst = _make_resource_instance(id="my_device") + assert get_resource_id(inst) == "my_device" + + def test_get_resource_id_from_string(self): + assert get_resource_id("my_device") == "my_device" + + def test_get_resource_id_from_wrapped_dict(self): + """兼容 {station_id: {...}} 格式""" + d = {"StationA": {"id": "StationA", "name": "StationA"}} + assert get_resource_id(d) == "StationA" + + def test_get_resource_data_from_dict(self): + d = _make_resource_dict(data={"temperature": 25.0}) + assert get_resource_data(d) == {"temperature": 25.0} + + def test_get_resource_data_from_instance(self): + inst = _make_resource_instance(data={"temperature": 25.0}) + data = get_resource_data(inst) + assert data["temperature"] == 25.0 + + def test_get_resource_display_info_from_dict(self): + d = _make_resource_dict(id="reactor_01", name="Reactor #1") + info = get_resource_display_info(d) + assert "reactor_01" in info + assert "Reactor #1" in info + + def test_get_resource_display_info_from_instance(self): + inst = _make_resource_instance(id="reactor_01", name="Reactor #1") + info = get_resource_display_info(inst) + assert "reactor_01" in info + + def test_get_resource_display_info_from_string(self): + assert get_resource_display_info("reactor_01") == "reactor_01" + + def test_get_resource_liquid_volume(self): + d = _make_resource_dict(data={"liquid": [{"liquid_type": "water", "volume": 15.5}]}) + assert get_resource_liquid_volume(d) == pytest.approx(15.5) + + def test_resource_to_dict_from_instance(self): + inst = _make_resource_instance(id="reactor_01", klass="virtual_stirrer") + d = resource_to_dict(inst) + assert isinstance(d, dict) + assert d["id"] == "reactor_01" + assert d["class"] == "virtual_stirrer" + + def test_resource_to_dict_passthrough(self): + d = _make_resource_dict() + result = resource_to_dict(d) + assert result is d # 同一个 dict + + +# ============ vessel_parser 兼容性测试 ============ + + +class TestVesselParser: + """测试 vessel_parser.get_vessel 对 ResourceDictInstance 的兼容""" + + def test_get_vessel_from_dict(self): + d = _make_resource_dict(id="reactor_01", data={"temperature": 25.0}) + vessel_id, vessel_data = get_vessel(d) + assert vessel_id == "reactor_01" + assert vessel_data["temperature"] == 25.0 + + def test_get_vessel_from_string(self): + vessel_id, vessel_data = get_vessel("reactor_01") + assert vessel_id == "reactor_01" + assert vessel_data == {} + + def test_get_vessel_from_resource_instance(self): + inst = _make_resource_instance(id="reactor_01", data={"temperature": 25.0}) + vessel_id, vessel_data = get_vessel(inst) + assert vessel_id == "reactor_01" + assert vessel_data["temperature"] == 25.0 + + def test_get_vessel_from_wrapped_dict(self): + """兼容 {station_id: {id: ..., data: {...}}} 格式""" + d = {"StationA": {"id": "StationA", "data": {"vol": 100}}} + vessel_id, vessel_data = get_vessel(d) + assert vessel_id == "StationA" + + +# ============ ResourceTreeSet → get_plr_nested_dict 测试 ============ + + +class TestResourceTreeRoundTrip: + """测试 ResourceTreeSet → get_plr_nested_dict 保留树结构和关键字段""" + + def test_tree_preserves_children(self): + tree_set = _make_tree_with_children() + assert len(tree_set.trees) == 1 + root = tree_set.trees[0].root_node + assert root.res_content.id == "StationA" + assert len(root.children) == 2 + + def test_plr_nested_dict_has_children(self): + tree_set = _make_tree_with_children() + root = tree_set.trees[0].root_node + nested = root.get_plr_nested_dict() + assert isinstance(nested, dict) + assert "children" in nested + assert isinstance(nested["children"], dict) + assert "R1" in nested["children"] + assert "R2" in nested["children"] + + def test_plr_nested_dict_preserves_uuid(self): + tree_set = _make_tree_with_children() + root = tree_set.trees[0].root_node + nested = root.get_plr_nested_dict() + assert nested["uuid"] == "uuid-station-a" + assert nested["children"]["R1"]["uuid"] == "uuid-r1" + + def test_plr_nested_dict_preserves_klass(self): + tree_set = _make_tree_with_children() + root = tree_set.trees[0].root_node + nested = root.get_plr_nested_dict() + assert nested["class"] == "workstation" + + def test_plr_nested_dict_preserves_data(self): + tree_set = _make_tree_with_children() + root = tree_set.trees[0].root_node + nested = root.get_plr_nested_dict() + r1_data = nested["children"]["R1"]["data"] + assert "liquid" in r1_data + assert r1_data["liquid"][0]["volume"] == 10.0 + + def test_plr_nested_dict_usable_by_get_vessel(self): + """get_plr_nested_dict 的结果可以直接传给 get_vessel""" + tree_set = _make_tree_with_children() + root = tree_set.trees[0].root_node + nested = root.get_plr_nested_dict() + vessel_id, vessel_data = get_vessel(nested) + assert vessel_id == "StationA" + + def test_dump_vs_plr_nested_dict(self): + """dump() 是扁平化的,get_plr_nested_dict 保留树结构""" + tree_set = _make_tree_with_children() + # dump 返回扁平列表 + dumped = tree_set.dump() + assert isinstance(dumped[0], list) + assert len(dumped[0]) == 3 # StationA + R1 + R2,全部扁平 + + # get_plr_nested_dict 保留嵌套 + root = tree_set.trees[0].root_node + nested = root.get_plr_nested_dict() + assert isinstance(nested["children"], dict) + assert len(nested["children"]) == 2 # 嵌套的 children + + +# ============ 模拟 workstation 路径测试 ============ + + +class TestWorkstationPath: + """模拟 workstation.py 中的关键路径: + raw_data → ResourceTreeSet.from_raw_dict_list → get_plr_nested_dict → compiler + """ + + def test_single_resource_path(self): + """单个 Resource: 取第一棵树的根节点""" + raw_data = [ + _make_resource_dict(id="reactor_01", uuid="uuid-r01", klass="virtual_stirrer"), + ] + tree_set = ResourceTreeSet.from_raw_dict_list(raw_data) + root = tree_set.trees[0].root_node + result = root.get_plr_nested_dict() + assert result["id"] == "reactor_01" + assert result["uuid"] == "uuid-r01" + assert result["class"] == "virtual_stirrer" + + def test_resource_with_children_path(self): + """Resource 带 children: AGV/batch transfer 场景""" + tree_set = _make_tree_with_children() + root = tree_set.trees[0].root_node + nested = root.get_plr_nested_dict() + + # 模拟编译器接收到的参数 + from_repo = {"StationA": nested} + assert "A01" not in from_repo["StationA"]["children"] # children 按 id 索引 + assert "R1" in from_repo["StationA"]["children"] + assert from_repo["StationA"]["children"]["R1"]["uuid"] == "uuid-r1" + + def test_multiple_resource_path(self): + """多个 Resource: 每棵树取根节点""" + raw_data1 = [_make_resource_dict(id="R1", uuid="uuid-r1")] + raw_data2 = [_make_resource_dict(id="R2", uuid="uuid-r2")] + # 模拟 host 返回多棵树 + tree_set1 = ResourceTreeSet.from_raw_dict_list(raw_data1) + tree_set2 = ResourceTreeSet.from_raw_dict_list(raw_data2) + results = [ + tree.root_node.get_plr_nested_dict() + for ts in [tree_set1, tree_set2] + for tree in ts.trees + ] + assert len(results) == 2 + assert results[0]["id"] == "R1" + assert results[1]["id"] == "R2" diff --git a/tests/devices/test_agv_transport_station.py b/tests/devices/test_agv_transport_station.py new file mode 100644 index 00000000..486a87b8 --- /dev/null +++ b/tests/devices/test_agv_transport_station.py @@ -0,0 +1,137 @@ +""" +AGVTransportStation driver 测试 + +覆盖:初始化、carrier property、slot 查询、路由查询、capacity 计算。 +""" + +import pytest +from unittest.mock import MagicMock, patch + +from unilabos.devices.transport.agv_workstation import AGVTransportStation +from unilabos.resources.warehouse import WareHouse, warehouse_factory + + +class TestAGVTransportStation: + def _make_driver(self, route_table=None, device_roles=None): + """创建一个 AGVTransportStation 实例""" + return AGVTransportStation( + deck=None, + route_table=route_table or { + "A->B": {"nav_command": '{"target":"LM1"}', "arm_pick": "pick.urp", "arm_place": "place.urp"} + }, + device_roles=device_roles or {"navigator": "agv_nav", "arm": "agv_arm"}, + ) + + def _make_warehouse(self, name="agv_platform", nx=2, ny=1, nz=1): + """创建一个测试用 Warehouse""" + return warehouse_factory(name=name, num_items_x=nx, num_items_y=ny, num_items_z=nz) + + def test_init_deck_none(self): + """AGVTransportStation 初始化时 deck=None""" + driver = self._make_driver() + assert driver.deck is None + + def test_init_route_table(self): + """路由表正确存储""" + driver = self._make_driver() + assert "A->B" in driver.route_table + + def test_init_device_roles(self): + """设备角色正确存储""" + driver = self._make_driver() + assert driver.device_roles["navigator"] == "agv_nav" + assert driver.device_roles["arm"] == "agv_arm" + + def test_carrier_without_ros_node(self): + """未 post_init 时 carrier 返回 None""" + driver = self._make_driver() + assert driver.carrier is None + + def test_carrier_with_warehouse(self): + """post_init 后 carrier 返回正确的 WareHouse""" + driver = self._make_driver() + wh = self._make_warehouse() + + # 模拟 ros_node 和 resource_tracker + mock_ros_node = MagicMock() + mock_ros_node.resource_tracker.resources = [wh] + mock_ros_node.device_id = "AGV" + driver.post_init(mock_ros_node) + + assert driver.carrier is wh + assert isinstance(driver.carrier, WareHouse) + + def test_capacity(self): + """容量计算正确""" + driver = self._make_driver() + wh = self._make_warehouse(nx=2, ny=1, nz=1) + mock_ros_node = MagicMock() + mock_ros_node.resource_tracker.resources = [wh] + mock_ros_node.device_id = "AGV" + driver.post_init(mock_ros_node) + + assert driver.capacity == 2 + + def test_capacity_multi_layer(self): + """多层 Warehouse 容量""" + driver = self._make_driver() + wh = self._make_warehouse(nx=1, ny=2, nz=3) + mock_ros_node = MagicMock() + mock_ros_node.resource_tracker.resources = [wh] + mock_ros_node.device_id = "AGV" + driver.post_init(mock_ros_node) + + assert driver.capacity == 6 + + def test_capacity_no_carrier(self): + """无 carrier 时容量为 0""" + driver = self._make_driver() + assert driver.capacity == 0 + + def test_free_slots(self): + """空载时所有 slot 为空闲""" + driver = self._make_driver() + wh = self._make_warehouse(nx=2, ny=1, nz=1) + mock_ros_node = MagicMock() + mock_ros_node.resource_tracker.resources = [wh] + mock_ros_node.device_id = "AGV" + driver.post_init(mock_ros_node) + + free = driver.free_slots + assert len(free) == 2 + + def test_occupied_slots_empty(self): + """空载时 occupied_slots 为空""" + driver = self._make_driver() + wh = self._make_warehouse(nx=2, ny=1, nz=1) + mock_ros_node = MagicMock() + mock_ros_node.resource_tracker.resources = [wh] + mock_ros_node.device_id = "AGV" + driver.post_init(mock_ros_node) + + assert len(driver.occupied_slots) == 0 + + def test_resolve_route(self): + """路由查询返回正确的指令""" + driver = self._make_driver() + route = driver.resolve_route("A", "B") + assert route["nav_command"] == '{"target":"LM1"}' + assert route["arm_pick"] == "pick.urp" + + def test_resolve_route_not_found(self): + """查询不存在的路线时抛出 KeyError""" + driver = self._make_driver() + with pytest.raises(KeyError, match="路由表"): + driver.resolve_route("X", "Y") + + def test_get_device_id(self): + """获取子设备 ID""" + driver = self._make_driver() + assert driver.get_device_id("navigator") == "agv_nav" + assert driver.get_device_id("arm") == "agv_arm" + + def test_get_device_id_not_found(self): + """获取不存在的角色时抛出 KeyError""" + driver = self._make_driver() + with pytest.raises(KeyError, match="未配置设备角色"): + driver.get_device_id("gripper")