no opcua installation on macos

fix possible crash

fix deck & host_node

set liquid with tube

add test_resource_schema

fix test resource schema

registry update & workflow update

add test mode

support description & tags upload

fix config load

fix log

add registry name & add always free

correct config organic synthesis

Adapt to new scheduler, sampels, and edge upload format (#230)

* add sample_material

* adapt to new samples sys

* fix pump transfer. fix resource update when protocol & ros callback

* Adapt to new scheduler.

Feat/samples (#229)

* add sample_material

* adapt to new samples sys

adapt to new samples sys

adapt to new edge format

workflow upload & prcxi transfer liquid

lh liquid

speed up registry load

workflow upload & set liquid fix & add set liquid with plate

fix upload workflow json
This commit is contained in:
Xuwznln
2026-02-02 17:18:45 +08:00
parent 5179a7e48e
commit b551e69f64
41 changed files with 2671 additions and 747 deletions

View File

@@ -1,17 +1,17 @@
import collections
from dataclasses import dataclass, field
import json
import threading
import time
import traceback
import uuid
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Optional, Dict, Any, List, ClassVar, Set, Union
from typing_extensions import TypedDict
from action_msgs.msg import GoalStatus
from geometry_msgs.msg import Point
from rclpy.action import ActionClient, get_action_server_names_and_types_by_node
from rclpy.service import Service
from typing_extensions import TypedDict
from unilabos_msgs.msg import Resource # type: ignore
from unilabos_msgs.srv import (
ResourceAdd,
@@ -23,10 +23,20 @@ from unilabos_msgs.srv import (
from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response
from unique_identifier_msgs.msg import UUID
from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
from unilabos.registry.registry import lab_registry
from unilabos.resources.container import RegularContainer
from unilabos.resources.graphio import initialize_resource
from unilabos.resources.registry import add_schema
from unilabos.resources.resource_tracker import (
ResourceDict,
ResourceDictInstance,
ResourceTreeSet,
ResourceTreeInstance,
RETURN_UNILABOS_SAMPLES,
JSON_UNILABOS_PARAM,
PARAM_SAMPLE_UUIDS, SampleUUIDsType, LabSample,
)
from unilabos.ros.initialize_device import initialize_device_from_dict
from unilabos.ros.msgs.message_converter import (
get_msg_type,
@@ -37,17 +47,11 @@ from unilabos.ros.msgs.message_converter import (
)
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode, DeviceNodeResourceTracker
from unilabos.ros.nodes.presets.controller_node import ControllerNode
from unilabos.resources.resource_tracker import (
ResourceDict,
ResourceDictInstance,
ResourceTreeSet,
ResourceTreeInstance,
)
from unilabos.utils import logger
from unilabos.utils.exception import DeviceClassInvalid
from unilabos.utils.log import warning
from unilabos.utils.type_check import serialize_result_info
from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
from unilabos.config.config import BasicConfig
if TYPE_CHECKING:
from unilabos.app.ws_client import QueueItem
@@ -60,7 +64,8 @@ class DeviceActionStatus:
class TestResourceReturn(TypedDict):
resources: List[List[ResourceDict]]
devices: List[DeviceSlot]
devices: List[Dict[str, Any]]
unilabos_samples: List[LabSample]
class TestLatencyReturn(TypedDict):
@@ -245,6 +250,7 @@ class HostNode(BaseROS2DeviceNode):
self,
driver_instance=self,
device_id=device_id,
registry_name="host_node",
device_uuid=host_node_dict["uuid"],
status_types={},
action_value_mappings=lab_registry.device_type_registry["host_node"]["class"]["action_value_mappings"],
@@ -299,7 +305,8 @@ class HostNode(BaseROS2DeviceNode):
} # 用来存储多个ActionClient实例
self._action_value_mappings: Dict[str, Dict] = (
{}
) # 用来存储多个ActionClient的type, goal, feedback, result的变量名映射关系
) # device_id -> action_value_mappings(本地+远程设备统一存储)
self._slave_registry_configs: Dict[str, Dict] = {} # registry_name -> registry_config(含action_value_mappings)
self._goals: Dict[str, Any] = {} # 用来存储多个目标的状态
self._online_devices: Set[str] = {f"{self.namespace}/{device_id}"} # 用于跟踪在线设备
self._last_discovery_time = 0.0 # 上次设备发现的时间
@@ -633,6 +640,8 @@ class HostNode(BaseROS2DeviceNode):
self.device_machine_names[device_id] = "本地"
self.devices_instances[device_id] = d
# noinspection PyProtectedMember
self._action_value_mappings[device_id] = d._ros_node._action_value_mappings
# noinspection PyProtectedMember
for action_name, action_value_mapping in d._ros_node._action_value_mappings.items():
if action_name.startswith("auto-") or str(action_value_mapping.get("type", "")).startswith(
"UniLabJsonCommand"
@@ -755,6 +764,7 @@ class HostNode(BaseROS2DeviceNode):
item: "QueueItem",
action_type: str,
action_kwargs: Dict[str, Any],
sample_material: Dict[str, str],
server_info: Optional[Dict[str, Any]] = None,
) -> None:
"""
@@ -768,18 +778,29 @@ class HostNode(BaseROS2DeviceNode):
u = uuid.UUID(item.job_id)
device_id = item.device_id
action_name = item.action_name
if BasicConfig.test_mode:
action_id = f"/devices/{device_id}/{action_name}"
self.lab_logger().info(
f"[TEST MODE] 模拟执行: {action_id} (job={item.job_id[:8]}), 参数: {str(action_kwargs)[:500]}"
)
# 根据注册表 handles 构建模拟返回值
mock_return = self._build_test_mode_return(device_id, action_name, action_kwargs)
self._handle_test_mode_result(item, action_id, mock_return)
return
if action_type.startswith("UniLabJsonCommand"):
if action_name.startswith("auto-"):
action_name = action_name[5:]
action_id = f"/devices/{device_id}/_execute_driver_command"
action_kwargs = {
"string": json.dumps(
{
"function_name": action_name,
"function_args": action_kwargs,
}
)
json_command: Dict[str, Any] = {
"function_name": action_name,
"function_args": action_kwargs,
JSON_UNILABOS_PARAM: {
PARAM_SAMPLE_UUIDS: sample_material,
},
}
action_kwargs = {"string": json.dumps(json_command)}
if action_type.startswith("UniLabJsonCommandAsync"):
action_id = f"/devices/{device_id}/_execute_driver_command_async"
else:
@@ -790,24 +811,9 @@ class HostNode(BaseROS2DeviceNode):
raise ValueError(f"ActionClient {action_id} not found.")
action_client: ActionClient = self._action_clients[action_id]
# 遍历action_kwargs下的所有子dict将"sample_uuid"的值赋给"sample_id"
def assign_sample_id(obj):
if isinstance(obj, dict):
if "sample_uuid" in obj:
obj["sample_id"] = obj["sample_uuid"]
obj.pop("sample_uuid")
for k, v in obj.items():
if k != "unilabos_extra":
assign_sample_id(v)
elif isinstance(obj, list):
for item in obj:
assign_sample_id(item)
assign_sample_id(action_kwargs)
goal_msg = convert_to_ros_msg(action_client._action_type.Goal(), action_kwargs)
self.lab_logger().info(f"[Host Node] Sending goal for {action_id}: {str(goal_msg)[:1000]}")
# self.lab_logger().trace(f"[Host Node] Sending goal for {action_id}: {str(goal_msg)[:1000]}")
self.lab_logger().trace(f"[Host Node] Sending goal for {action_id}: {action_kwargs}")
self.lab_logger().trace(f"[Host Node] Sending goal for {action_id}: {goal_msg}")
action_client.wait_for_server()
@@ -820,6 +826,51 @@ class HostNode(BaseROS2DeviceNode):
)
future.add_done_callback(lambda f: self.goal_response_callback(item, action_id, f))
def _build_test_mode_return(
self, device_id: str, action_name: str, action_kwargs: Dict[str, Any]
) -> Dict[str, Any]:
"""
根据注册表 handles 的 output 定义构建测试模式的模拟返回值
根据 data_key 中 @flatten 的层数决定嵌套数组层数,叶子值为空字典。
例如: "vessel"{}, "plate.@flatten" → [{}], "a.@flatten.@flatten" → [[{}]]
"""
mock_return: Dict[str, Any] = {"test_mode": True, "action_name": action_name}
action_mappings = self._action_value_mappings.get(device_id, {})
action_mapping = action_mappings.get(action_name, {})
handles = action_mapping.get("handles", {})
if isinstance(handles, dict):
for output_handle in handles.get("output", []):
data_key = output_handle.get("data_key", "")
handler_key = output_handle.get("handler_key", "")
# 根据 @flatten 层数构建嵌套数组,叶子为空字典
flatten_count = data_key.count("@flatten")
value: Any = {}
for _ in range(flatten_count):
value = [value]
mock_return[handler_key] = value
return mock_return
def _handle_test_mode_result(
self, item: "QueueItem", action_id: str, mock_return: Dict[str, Any]
) -> None:
"""
测试模式下直接构建结果并走正常的结果回调流程(跳过 ROS
"""
job_id = item.job_id
status = "success"
return_info = serialize_result_info("", True, mock_return)
self.lab_logger().info(f"[TEST MODE] Result for {action_id} ({job_id[:8]}): {status}")
from unilabos.app.web.controller import store_job_result
store_job_result(job_id, status, return_info, mock_return)
# 发布状态到桥接器
for bridge in self.bridges:
if hasattr(bridge, "publish_job_status"):
bridge.publish_job_status(mock_return, item, status, return_info)
def goal_response_callback(self, item: "QueueItem", action_id: str, future) -> None:
"""目标响应回调"""
goal_handle = future.result()
@@ -867,14 +918,14 @@ class HostNode(BaseROS2DeviceNode):
# 适配后端的一些额外处理
return_value = return_info.get("return_value")
if isinstance(return_value, dict):
unilabos_samples = return_value.pop("unilabos_samples", None)
unilabos_samples = return_value.pop(RETURN_UNILABOS_SAMPLES, None)
if isinstance(unilabos_samples, list) and unilabos_samples:
self.lab_logger().info(
f"[Host Node] Job {job_id[:8]} returned {len(unilabos_samples)} sample(s): "
f"{[s.get('name', s.get('id', 'unknown')) if isinstance(s, dict) else str(s)[:20] for s in unilabos_samples[:5]]}"
f"{'...' if len(unilabos_samples) > 5 else ''}"
)
return_info["unilabos_samples"] = unilabos_samples
return_info["samples"] = unilabos_samples
suc = return_info.get("suc", False)
if not suc:
status = "failed"
@@ -1179,8 +1230,12 @@ class HostNode(BaseROS2DeviceNode):
def _node_info_update_callback(self, request, response):
"""
更新节点信息回调
处理两种消息:
1. 首次上报(main_slave_run): 带 devices_config + registry_config,存储 action_value_mappings
2. 设备重注册(SYNC_SLAVE_NODE_INFO): 带 edge_device_id + registry_name,用 registry_name 索引已存储的 mappings
"""
# self.lab_logger().info(f"[Host Node] Node info update request received: {request}")
self.lab_logger().trace(f"[Host Node] Node info update request received: {request}")
try:
from unilabos.app.communication import get_communication_client
from unilabos.app.web.client import HTTPClient, http_client
@@ -1190,12 +1245,48 @@ class HostNode(BaseROS2DeviceNode):
info = info["SYNC_SLAVE_NODE_INFO"]
machine_name = info["machine_name"]
edge_device_id = info["edge_device_id"]
registry_name = info.get("registry_name", "")
self.device_machine_names[edge_device_id] = machine_name
# 用 registry_name 索引已存储的 registry_config,获取 action_value_mappings
if registry_name and registry_name in self._slave_registry_configs:
action_mappings = self._slave_registry_configs[registry_name].get(
"class", {}
).get("action_value_mappings", {})
if action_mappings:
self._action_value_mappings[edge_device_id] = action_mappings
self.lab_logger().info(
f"[Host Node] Loaded {len(action_mappings)} action mappings "
f"for remote device {edge_device_id} (registry: {registry_name})"
)
else:
devices_config = info.pop("devices_config")
registry_config = info.pop("registry_config")
if registry_config:
http_client.resource_registry({"resources": registry_config})
# 存储 slave 的 registry_config,用于后续 SYNC_SLAVE_NODE_INFO 索引
for reg_name, reg_data in registry_config.items():
if isinstance(reg_data, dict) and "class" in reg_data:
self._slave_registry_configs[reg_name] = reg_data
# 解析 devices_config,建立 device_id -> action_value_mappings 映射
if devices_config:
for device_tree in devices_config:
for device_dict in device_tree:
device_id = device_dict.get("id", "")
class_name = device_dict.get("class", "")
if device_id and class_name and class_name in self._slave_registry_configs:
action_mappings = self._slave_registry_configs[class_name].get(
"class", {}
).get("action_value_mappings", {})
if action_mappings:
self._action_value_mappings[device_id] = action_mappings
self.lab_logger().info(
f"[Host Node] Stored {len(action_mappings)} action mappings "
f"for remote device {device_id} (class: {class_name})"
)
self.lab_logger().debug(f"[Host Node] Node info update: {info}")
response.response = "OK"
except Exception as e:
@@ -1492,6 +1583,7 @@ class HostNode(BaseROS2DeviceNode):
def test_resource(
self,
sample_uuids: SampleUUIDsType,
resource: ResourceSlot = None,
resources: List[ResourceSlot] = None,
device: DeviceSlot = None,
@@ -1506,6 +1598,7 @@ class HostNode(BaseROS2DeviceNode):
return {
"resources": ResourceTreeSet.from_plr_resources([resource, *resources], known_newly_created=True).dump(),
"devices": [device, *devices],
"unilabos_samples": [LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for sample_uuid, content in sample_uuids.items()]
}
def handle_pong_response(self, pong_data: dict):