mirror of
https://github.com/deepmodeling/Uni-Lab-OS
synced 2026-05-24 18:05:56 +00:00
Merge branch 'dev' into prcix9320
This commit is contained in:
@@ -4,6 +4,8 @@ import json
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from unilabos.utils.tools import fast_dumps_str as _fast_dumps_str, fast_loads as _fast_loads
|
||||
from typing import (
|
||||
get_type_hints,
|
||||
TypeVar,
|
||||
@@ -78,6 +80,67 @@ if TYPE_CHECKING:
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
class RclpyAsyncMutex:
|
||||
"""rclpy executor 兼容的异步互斥锁
|
||||
|
||||
通过 executor.create_task 唤醒等待者,避免 timer 的 InvalidHandle 问题。
|
||||
"""
|
||||
|
||||
def __init__(self, name: str = ""):
|
||||
self._lock = threading.Lock()
|
||||
self._acquired = False
|
||||
self._queue: List[Future] = []
|
||||
self._name = name
|
||||
self._holder: Optional[str] = None
|
||||
|
||||
async def acquire(self, node: "BaseROS2DeviceNode", tag: str = ""):
|
||||
"""获取锁。如果已被占用,则异步等待直到锁释放。"""
|
||||
# t0 = time.time()
|
||||
with self._lock:
|
||||
# qlen = len(self._queue)
|
||||
if not self._acquired:
|
||||
self._acquired = True
|
||||
self._holder = tag
|
||||
# node.lab_logger().debug(
|
||||
# f"[Mutex:{self._name}] 获取锁 tag={tag} (无等待, queue=0)"
|
||||
# )
|
||||
return
|
||||
waiter = Future()
|
||||
self._queue.append(waiter)
|
||||
# node.lab_logger().info(
|
||||
# f"[Mutex:{self._name}] 等待锁 tag={tag} "
|
||||
# f"(holder={self._holder}, queue={qlen + 1})"
|
||||
# )
|
||||
await waiter
|
||||
# wait_ms = (time.time() - t0) * 1000
|
||||
self._holder = tag
|
||||
# node.lab_logger().info(
|
||||
# f"[Mutex:{self._name}] 获取锁 tag={tag} (等了 {wait_ms:.0f}ms)"
|
||||
# )
|
||||
|
||||
def release(self, node: "BaseROS2DeviceNode"):
|
||||
"""释放锁,通过 executor task 唤醒下一个等待者。"""
|
||||
with self._lock:
|
||||
# old_holder = self._holder
|
||||
if self._queue:
|
||||
next_waiter = self._queue.pop(0)
|
||||
# node.lab_logger().debug(
|
||||
# f"[Mutex:{self._name}] 释放锁 holder={old_holder} → 唤醒下一个 (剩余 queue={len(self._queue)})"
|
||||
# )
|
||||
|
||||
async def _wake():
|
||||
if not next_waiter.done():
|
||||
next_waiter.set_result(None)
|
||||
|
||||
rclpy.get_global_executor().create_task(_wake())
|
||||
else:
|
||||
self._acquired = False
|
||||
self._holder = None
|
||||
# node.lab_logger().debug(
|
||||
# f"[Mutex:{self._name}] 释放锁 holder={old_holder} → 空闲"
|
||||
# )
|
||||
|
||||
|
||||
# 在线设备注册表
|
||||
registered_devices: Dict[str, "DeviceInfoType"] = {}
|
||||
|
||||
@@ -355,6 +418,8 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
max_workers=max(len(action_value_mappings), 1), thread_name_prefix=f"ROSDevice{self.device_id}"
|
||||
)
|
||||
|
||||
self._append_resource_lock = RclpyAsyncMutex(name=f"AR:{device_id}")
|
||||
|
||||
# 创建资源管理客户端
|
||||
self._resource_clients: Dict[str, Client] = {
|
||||
"resource_add": self.create_client(ResourceAdd, "/resources/add", callback_group=self.callback_group),
|
||||
@@ -378,15 +443,40 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
return res
|
||||
|
||||
async def append_resource(req: SerialCommand_Request, res: SerialCommand_Response):
|
||||
_cmd = _fast_loads(req.command)
|
||||
_res_name = _cmd.get("resource", [{}])
|
||||
_res_name = (_res_name[0].get("id", "?") if isinstance(_res_name, list) and _res_name
|
||||
else _res_name.get("id", "?") if isinstance(_res_name, dict) else "?")
|
||||
_ar_tag = f"{_res_name}"
|
||||
# _t_enter = time.time()
|
||||
# self.lab_logger().info(f"[AR:{_ar_tag}] 进入 append_resource")
|
||||
await self._append_resource_lock.acquire(self, tag=_ar_tag)
|
||||
# _t_locked = time.time()
|
||||
try:
|
||||
return await _append_resource_inner(req, res, _ar_tag)
|
||||
# _t_done = time.time()
|
||||
# self.lab_logger().info(
|
||||
# f"[AR:{_ar_tag}] 完成 "
|
||||
# f"等锁={(_t_locked - _t_enter) * 1000:.0f}ms "
|
||||
# f"执行={(_t_done - _t_locked) * 1000:.0f}ms "
|
||||
# f"总计={(_t_done - _t_enter) * 1000:.0f}ms"
|
||||
# )
|
||||
except Exception as _ex:
|
||||
self.lab_logger().error(f"[AR:{_ar_tag}] 异常: {_ex}")
|
||||
raise
|
||||
finally:
|
||||
self._append_resource_lock.release(self)
|
||||
|
||||
async def _append_resource_inner(req: SerialCommand_Request, res: SerialCommand_Response, _ar_tag: str = ""):
|
||||
from pylabrobot.resources.deck import Deck
|
||||
from pylabrobot.resources import Coordinate
|
||||
from pylabrobot.resources import Plate
|
||||
|
||||
# 物料传输到对应的node节点
|
||||
# _t0 = time.time()
|
||||
client = self._resource_clients["c2s_update_resource_tree"]
|
||||
request = SerialCommand.Request()
|
||||
request2 = SerialCommand.Request()
|
||||
command_json = json.loads(req.command)
|
||||
command_json = _fast_loads(req.command)
|
||||
namespace = command_json["namespace"]
|
||||
bind_parent_id = command_json["bind_parent_id"]
|
||||
edge_device_id = command_json["edge_device_id"]
|
||||
@@ -439,7 +529,11 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
f"更新物料{container_instance.name}出现不支持的数据类型{type(found_resource)} {found_resource}"
|
||||
)
|
||||
# noinspection PyUnresolvedReferences
|
||||
request.command = json.dumps(
|
||||
# _t1 = time.time()
|
||||
# self.lab_logger().debug(
|
||||
# f"[AR:{_ar_tag}] 准备完成 PLR转换+序列化 {((_t1 - _t0) * 1000):.0f}ms, 发送首次上传..."
|
||||
# )
|
||||
request.command = _fast_dumps_str(
|
||||
{
|
||||
"action": "add",
|
||||
"data": {
|
||||
@@ -450,7 +544,11 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
}
|
||||
)
|
||||
tree_response: SerialCommand.Response = await client.call_async(request)
|
||||
uuid_maps = json.loads(tree_response.response)
|
||||
# _t2 = time.time()
|
||||
# self.lab_logger().debug(
|
||||
# f"[AR:{_ar_tag}] 首次上传完成 {((_t2 - _t1) * 1000):.0f}ms"
|
||||
# )
|
||||
uuid_maps = _fast_loads(tree_response.response)
|
||||
plr_instances = rts.to_plr_resources()
|
||||
for plr_instance in plr_instances:
|
||||
self.resource_tracker.loop_update_uuid(plr_instance, uuid_maps)
|
||||
@@ -508,7 +606,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
for input_well, liquid_type, liquid_volume, liquid_input_slot in zip(
|
||||
input_wells, ADD_LIQUID_TYPE, LIQUID_VOLUME, LIQUID_INPUT_SLOT
|
||||
):
|
||||
input_well.set_liquids([(liquid_type, liquid_volume, "uL")])
|
||||
input_well.set_liquids([(liquid_type, liquid_volume, "ul")])
|
||||
final_response["liquid_input_resource_tree"] = ResourceTreeSet.from_plr_resources(
|
||||
input_wells
|
||||
).dump()
|
||||
@@ -527,12 +625,11 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
Coordinate(location["x"], location["y"], location["z"]),
|
||||
**other_calling_param,
|
||||
)
|
||||
# 调整了液体以及Deck之后要重新Assign
|
||||
# noinspection PyUnresolvedReferences
|
||||
rts_with_parent = ResourceTreeSet.from_plr_resources([plr_instance])
|
||||
if rts_with_parent.root_nodes[0].res_content.uuid_parent is None:
|
||||
rts_with_parent.root_nodes[0].res_content.parent_uuid = self.uuid
|
||||
request.command = json.dumps(
|
||||
request.command = _fast_dumps_str(
|
||||
{
|
||||
"action": "add",
|
||||
"data": {
|
||||
@@ -542,6 +639,10 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
},
|
||||
}
|
||||
)
|
||||
# _t4 = time.time()
|
||||
# self.lab_logger().debug(
|
||||
# f"[AR:{_ar_tag}] 二次上传序列化 {_n_parent}节点 {((_t4 - _t3) * 1000):.0f}ms, 发送中..."
|
||||
# )
|
||||
tree_response: SerialCommand.Response = await client.call_async(request)
|
||||
_raw_resp = tree_response.response if tree_response else ""
|
||||
if _raw_resp:
|
||||
@@ -550,8 +651,10 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
uuid_maps = {}
|
||||
self._lab_logger.warning("Resource tree add 返回空响应,跳过 UUID 映射")
|
||||
self.resource_tracker.loop_update_uuid(input_resources, uuid_maps)
|
||||
self._lab_logger.info(f"Resource tree added. UUID mapping: {len(uuid_maps)} nodes")
|
||||
# 这里created_resources不包含parent_resource
|
||||
# self._lab_logger.info(
|
||||
# f"[AR:{_ar_tag}] 二次上传完成 HTTP={(_t5 - _t4) * 1000:.0f}ms "
|
||||
# f"UUID映射={len(uuid_maps)}节点 总执行={(_t5 - _t0) * 1000:.0f}ms"
|
||||
# )
|
||||
# 发送给ResourceMeshManager
|
||||
action_client = ActionClient(
|
||||
self,
|
||||
@@ -688,7 +791,11 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
)
|
||||
# 发送请求并等待响应
|
||||
response: SerialCommand_Response = await self._resource_clients["resource_get"].call_async(r)
|
||||
if not response.response:
|
||||
raise ValueError(f"查询资源 {resource_id} 失败:服务端返回空响应")
|
||||
raw_data = json.loads(response.response)
|
||||
if not raw_data:
|
||||
raise ValueError(f"查询资源 {resource_id} 失败:返回数据为空")
|
||||
|
||||
# 转换为 PLR 资源
|
||||
tree_set = ResourceTreeSet.from_raw_dict_list(raw_data)
|
||||
@@ -1137,7 +1244,8 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
if uid is None:
|
||||
raise ValueError(f"目标物料{target_resource}没有unilabos_uuid属性,无法转运")
|
||||
target_uids.append(uid)
|
||||
srv_address = f"/srv{target_device_id}/s2c_resource_tree"
|
||||
_ns = target_device_id if target_device_id.startswith("/devices/") else f"/devices/{target_device_id.lstrip('/')}"
|
||||
srv_address = f"/srv{_ns}/s2c_resource_tree"
|
||||
sclient = self.create_client(SerialCommand, srv_address)
|
||||
# 等待服务可用(设置超时)
|
||||
if not sclient.wait_for_service(timeout_sec=5.0):
|
||||
@@ -1187,7 +1295,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
return False
|
||||
time.sleep(0.05)
|
||||
self.lab_logger().info(f"资源本地增加到{target_device_id}结果: {response.response}")
|
||||
return None
|
||||
return "转运完成"
|
||||
|
||||
def register_device(self):
|
||||
"""向注册表中注册设备信息"""
|
||||
@@ -1572,37 +1680,75 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
feedback_msg_types = action_type.Feedback.get_fields_and_field_types()
|
||||
result_msg_types = action_type.Result.get_fields_and_field_types()
|
||||
|
||||
while future is not None and not future.done():
|
||||
if goal_handle.is_cancel_requested:
|
||||
self.lab_logger().info(f"取消动作: {action_name}")
|
||||
future.cancel() # 尝试取消线程池中的任务
|
||||
goal_handle.canceled()
|
||||
return action_type.Result()
|
||||
# 低频 feedback timer(10s),不阻塞完成检测
|
||||
_feedback_timer = None
|
||||
|
||||
self._time_spent = time.time() - time_start
|
||||
self._time_remaining = time_overall - self._time_spent
|
||||
def _publish_feedback():
|
||||
if future is not None and not future.done():
|
||||
self._time_spent = time.time() - time_start
|
||||
self._time_remaining = time_overall - self._time_spent
|
||||
feedback_values = {}
|
||||
for msg_name, attr_name in action_value_mapping["feedback"].items():
|
||||
if hasattr(self.driver_instance, f"get_{attr_name}"):
|
||||
method = getattr(self.driver_instance, f"get_{attr_name}")
|
||||
if not asyncio.iscoroutinefunction(method):
|
||||
feedback_values[msg_name] = method()
|
||||
elif hasattr(self.driver_instance, attr_name):
|
||||
feedback_values[msg_name] = getattr(self.driver_instance, attr_name)
|
||||
if self._print_publish:
|
||||
self.lab_logger().info(f"反馈: {feedback_values}")
|
||||
feedback_msg = convert_to_ros_msg_with_mapping(
|
||||
ros_msg_type=action_type.Feedback(),
|
||||
obj=feedback_values,
|
||||
value_mapping=action_value_mapping["feedback"],
|
||||
)
|
||||
goal_handle.publish_feedback(feedback_msg)
|
||||
|
||||
# 发布反馈
|
||||
feedback_values = {}
|
||||
for msg_name, attr_name in action_value_mapping["feedback"].items():
|
||||
if hasattr(self.driver_instance, f"get_{attr_name}"):
|
||||
method = getattr(self.driver_instance, f"get_{attr_name}")
|
||||
if not asyncio.iscoroutinefunction(method):
|
||||
feedback_values[msg_name] = method()
|
||||
elif hasattr(self.driver_instance, attr_name):
|
||||
feedback_values[msg_name] = getattr(self.driver_instance, attr_name)
|
||||
|
||||
if self._print_publish:
|
||||
self.lab_logger().info(f"反馈: {feedback_values}")
|
||||
|
||||
feedback_msg = convert_to_ros_msg_with_mapping(
|
||||
ros_msg_type=action_type.Feedback(),
|
||||
obj=feedback_values,
|
||||
value_mapping=action_value_mapping["feedback"],
|
||||
if action_value_mapping.get("feedback"):
|
||||
_fb_interval = action_value_mapping.get("feedback_interval", 0.5)
|
||||
_feedback_timer = self.create_timer(
|
||||
_fb_interval, _publish_feedback, callback_group=self.callback_group
|
||||
)
|
||||
|
||||
goal_handle.publish_feedback(feedback_msg)
|
||||
time.sleep(0.5)
|
||||
# 等待 action 完成
|
||||
if future is not None:
|
||||
if isinstance(future, Task):
|
||||
# rclpy Task:直接 await,完成瞬间唤醒
|
||||
try:
|
||||
_raw_result = await future
|
||||
except Exception as e:
|
||||
_raw_result = e
|
||||
else:
|
||||
# concurrent.futures.Future(同步 action):用 rclpy 兼容的轮询
|
||||
_poll_future = Future()
|
||||
|
||||
def _on_sync_done(fut):
|
||||
if not _poll_future.done():
|
||||
_poll_future.set_result(None)
|
||||
|
||||
future.add_done_callback(_on_sync_done)
|
||||
await _poll_future
|
||||
try:
|
||||
_raw_result = future.result()
|
||||
except Exception as e:
|
||||
_raw_result = e
|
||||
|
||||
# 确保 execution_error/success 被正确设置(不依赖 done callback 时序)
|
||||
if isinstance(_raw_result, BaseException):
|
||||
if not execution_error:
|
||||
execution_error = traceback.format_exception(
|
||||
type(_raw_result), _raw_result, _raw_result.__traceback__
|
||||
)
|
||||
execution_error = "".join(execution_error)
|
||||
execution_success = False
|
||||
action_return_value = _raw_result
|
||||
elif not execution_error:
|
||||
execution_success = True
|
||||
action_return_value = _raw_result
|
||||
|
||||
# 清理 feedback timer
|
||||
if _feedback_timer is not None:
|
||||
_feedback_timer.cancel()
|
||||
|
||||
if future is not None and future.cancelled():
|
||||
self.lab_logger().info(f"动作 {action_name} 已取消")
|
||||
@@ -1611,8 +1757,12 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
# self.lab_logger().info(f"动作执行完成: {action_name}")
|
||||
del future
|
||||
|
||||
# 执行失败时跳过物料状态更新
|
||||
if execution_error:
|
||||
execution_success = False
|
||||
|
||||
# 向Host更新物料当前状态
|
||||
if action_name not in ["create_resource_detailed", "create_resource"]:
|
||||
if not execution_error and action_name not in ["create_resource_detailed", "create_resource"]:
|
||||
for k, v in goal.get_fields_and_field_types().items():
|
||||
if v not in ["unilabos_msgs/Resource", "sequence<unilabos_msgs/Resource>"]:
|
||||
continue
|
||||
@@ -1668,7 +1818,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
|
||||
for attr_name in result_msg_types.keys():
|
||||
if attr_name in ["success", "reached_goal"]:
|
||||
setattr(result_msg, attr_name, True)
|
||||
setattr(result_msg, attr_name, execution_success)
|
||||
elif attr_name == "return_info":
|
||||
setattr(
|
||||
result_msg,
|
||||
@@ -1804,7 +1954,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
raise ValueError("至少需要提供一个 UUID")
|
||||
|
||||
uuids_list = list(uuids)
|
||||
future = self._resource_clients["c2s_update_resource_tree"].call_async(
|
||||
future: Future = self._resource_clients["c2s_update_resource_tree"].call_async(
|
||||
SerialCommand.Request(
|
||||
command=json.dumps(
|
||||
{
|
||||
@@ -1830,6 +1980,8 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
raise Exception(f"资源查询返回空结果: {uuids_list}")
|
||||
|
||||
raw_data = json.loads(response.response)
|
||||
if not raw_data:
|
||||
raise Exception(f"资源原始查询返回空结果: {raw_data}")
|
||||
|
||||
# 转换为 PLR 资源
|
||||
tree_set = ResourceTreeSet.from_raw_dict_list(raw_data)
|
||||
@@ -1851,10 +2003,15 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
|
||||
mapped_plr_resources = []
|
||||
for uuid in uuids_list:
|
||||
found = None
|
||||
for plr_resource in figured_resources:
|
||||
r = self.resource_tracker.loop_find_with_uuid(plr_resource, uuid)
|
||||
mapped_plr_resources.append(r)
|
||||
break
|
||||
if r is not None:
|
||||
found = r
|
||||
break
|
||||
if found is None:
|
||||
raise Exception(f"未能在已解析的资源树中找到 uuid={uuid} 对应的资源")
|
||||
mapped_plr_resources.append(found)
|
||||
|
||||
return mapped_plr_resources
|
||||
|
||||
@@ -1947,16 +2104,27 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
f"执行动作时JSON缺少function_name或function_args: {ex}\n原JSON: {string}\n{traceback.format_exc()}"
|
||||
)
|
||||
|
||||
async def _convert_resource_async(self, resource_data: Dict[str, Any]):
|
||||
"""异步转换资源数据为实例"""
|
||||
# 使用封装的get_resource_with_dir方法获取PLR资源
|
||||
plr_resource = await self.get_resource_with_dir(resource_ids=resource_data["id"], with_children=True)
|
||||
async def _convert_resource_async(self, resource_data: "ResourceDictType"):
|
||||
"""异步转换 ResourceDictType 为 PLR 实例,优先用 uuid 查询"""
|
||||
unilabos_uuid = resource_data.get("uuid")
|
||||
|
||||
if unilabos_uuid:
|
||||
resource_tree = await self.get_resource([unilabos_uuid], with_children=True)
|
||||
plr_resources = resource_tree.to_plr_resources()
|
||||
if plr_resources:
|
||||
plr_resource = plr_resources[0]
|
||||
else:
|
||||
raise ValueError(f"通过 uuid={unilabos_uuid} 查询资源为空")
|
||||
else:
|
||||
res_id = resource_data.get("id") or resource_data.get("name", "")
|
||||
if not res_id:
|
||||
raise ValueError(f"资源数据缺少 uuid 和 id: {list(resource_data.keys())}")
|
||||
plr_resource = await self.get_resource_with_dir(resource_id=res_id, with_children=True)
|
||||
|
||||
# 通过资源跟踪器获取本地实例
|
||||
res = self.resource_tracker.figure_resource(plr_resource, try_mode=True)
|
||||
if len(res) == 0:
|
||||
# todo: 后续通过decoration来区分,减少warning
|
||||
self.lab_logger().warning(f"资源转换未能索引到实例: {resource_data},返回新建实例")
|
||||
self.lab_logger().warning(f"资源转换未能索引到实例: {resource_data.get('id', '?')},返回新建实例")
|
||||
return plr_resource
|
||||
elif len(res) == 1:
|
||||
return res[0]
|
||||
|
||||
Reference in New Issue
Block a user