新增manual_confirm

This commit is contained in:
Xuwznln
2026-03-24 23:01:28 +08:00
parent b9d9666003
commit ff1e21fcd8
5 changed files with 83 additions and 30 deletions

View File

@@ -679,14 +679,17 @@ def _resolve_name(name: str, import_map: Dict[str, str]) -> str:
return name
_DECORATOR_ENUM_CLASSES = frozenset({"Side", "DataSource", "NodeType"})
def _resolve_attribute(node: ast.Attribute, import_map: Dict[str, str]) -> str:
"""
Resolve an attribute access like Side.NORTH or DataSource.HANDLE.
Returns a string like "NORTH" for enum values, or
"module.path:Class.attr" for imported references.
对于来自 ``unilabos.registry.decorators`` 的枚举类 (Side / DataSource / NodeType)
直接返回枚举成员名 (如 ``"NORTH"`` / ``"HANDLE"`` / ``"MANUAL_CONFIRM"``)
省去消费端二次 rsplit 解析。其它 import 仍返回完整模块路径。
"""
# Get the full dotted path
parts = []
current = node
while isinstance(current, ast.Attribute):
@@ -696,21 +699,20 @@ def _resolve_attribute(node: ast.Attribute, import_map: Dict[str, str]) -> str:
parts.append(current.id)
parts.reverse()
# parts = ["Side", "NORTH"] or ["DataSource", "HANDLE"]
# parts = ["Side", "NORTH"] or ["DataSource", "HANDLE"] or ["NodeType", "MANUAL_CONFIRM"]
if len(parts) >= 2:
base = parts[0]
attr = ".".join(parts[1:])
# If the base is an imported name, resolve it
if base in _DECORATOR_ENUM_CLASSES:
source = import_map.get(base, "")
if not source or _REGISTRY_DECORATOR_MODULE in source:
return parts[-1]
if base in import_map:
return f"{import_map[base]}.{attr}"
# For known enum-like patterns, return just the value
# e.g. Side.NORTH -> "NORTH"
if base in ("Side", "DataSource"):
return parts[-1]
return ".".join(parts)

View File

@@ -8,7 +8,7 @@ Usage:
device, action, resource,
InputHandle, OutputHandle,
ActionInputHandle, ActionOutputHandle,
HardwareInterface, Side, DataSource,
HardwareInterface, Side, DataSource, NodeType,
)
@device(
@@ -73,6 +73,13 @@ class DataSource(str, Enum):
EXECUTOR = "executor" # 从执行器输出数据 (用于 OutputHandle)
class NodeType(str, Enum):
"""动作的节点类型(用于区分 ILab 节点和人工确认节点等)"""
ILAB = "ILab"
MANUAL_CONFIRM = "manual_confirm"
# ---------------------------------------------------------------------------
# Device / Resource Handle (设备/资源级别端口, 序列化时包含 io_type)
# ---------------------------------------------------------------------------
@@ -335,6 +342,7 @@ def action(
description: str = "",
auto_prefix: bool = False,
parent: bool = False,
node_type: Optional["NodeType"] = None,
):
"""
动作方法装饰器
@@ -365,6 +373,8 @@ def action(
description: 动作描述
auto_prefix: 若为 True动作名使用 auto-{method_name} 形式(与无 @action 时一致)
parent: 若为 True当方法参数为空 (*args, **kwargs) 时,通过 MRO 从父类获取真实方法参数
node_type: 动作的节点类型 (NodeType.ILAB / NodeType.MANUAL_CONFIRM)。
不填写时不写入注册表。
"""
def decorator(func: F) -> F:
@@ -389,6 +399,8 @@ def action(
"auto_prefix": auto_prefix,
"parent": parent,
}
if node_type is not None:
meta["node_type"] = node_type.value if isinstance(node_type, NodeType) else str(node_type)
wrapper._action_registry_meta = meta # type: ignore[attr-defined]
# 设置 _is_always_free 保持与旧 @always_free 装饰器兼容
@@ -515,6 +527,38 @@ def clear_registry():
_registered_resources.clear()
# ---------------------------------------------------------------------------
# 枚举值归一化
# ---------------------------------------------------------------------------
def normalize_enum_value(raw: Any, enum_cls) -> Optional[str]:
"""将 AST 提取的枚举成员名 / YAML 值字符串 / 旧格式长路径统一归一化为枚举值。
适用于 Side、DataSource、NodeType 等继承自 ``str, Enum`` 的装饰器枚举。
处理以下格式:
- "MANUAL_CONFIRM" → NodeType["MANUAL_CONFIRM"].value = "manual_confirm"
- "manual_confirm" → NodeType("manual_confirm").value = "manual_confirm"
- "HANDLE" → DataSource["HANDLE"].value = "handle"
- "NORTH" → Side["NORTH"].value = "NORTH"
- 旧缓存长路径 "unilabos...NodeType.MANUAL_CONFIRM" → 先 rsplit 再查找
"""
if not raw:
return None
raw_str = str(raw)
if "." in raw_str:
raw_str = raw_str.rsplit(".", 1)[-1]
try:
return enum_cls[raw_str].value
except KeyError:
pass
try:
return enum_cls(raw_str).value
except ValueError:
return raw_str
# ---------------------------------------------------------------------------
# topic_config / not_action / always_free 装饰器
# ---------------------------------------------------------------------------

View File

@@ -33,6 +33,8 @@ from unilabos.registry.decorators import (
is_not_action,
is_always_free,
get_topic_config,
NodeType,
normalize_enum_value,
)
from unilabos.registry.utils import (
ROSMsgNotFound,
@@ -159,9 +161,10 @@ class Registry:
ast_entry = self.device_type_registry.get("host_node", {})
ast_actions = ast_entry.get("class", {}).get("action_value_mappings", {})
# 取出 AST 生成的 auto-method entries, 补充特定覆写
# 取出 AST 生成的 action entries, 补充特定覆写
test_latency_action = ast_actions.get("auto-test_latency", {})
test_resource_action = ast_actions.get("auto-test_resource", {})
manual_confirm_action = ast_actions.get("manual_confirm", {})
test_resource_action["handles"] = {
"input": [
{
@@ -237,6 +240,7 @@ class Registry:
},
"test_latency": test_latency_action,
"auto-test_resource": test_resource_action,
"manual_confirm": manual_confirm_action,
},
"init_params": {},
},
@@ -847,6 +851,9 @@ class Registry:
}
if (action_args or {}).get("always_free") or method_info.get("always_free"):
entry["always_free"] = True
nt = normalize_enum_value((action_args or {}).get("node_type"), NodeType)
if nt:
entry["node_type"] = nt
return action_name, entry
# 1) auto- actions
@@ -971,6 +978,9 @@ class Registry:
}
if action_args.get("always_free") or method_info.get("always_free"):
action_entry["always_free"] = True
nt = normalize_enum_value(action_args.get("node_type"), NodeType)
if nt:
action_entry["node_type"] = nt
action_value_mappings[action_name] = action_entry
action_value_mappings = dict(sorted(action_value_mappings.items()))
@@ -1153,7 +1163,7 @@ class Registry:
return Path(BasicConfig.working_dir) / "registry_cache.pkl"
return None
_CACHE_VERSION = 3
_CACHE_VERSION = 4
def _load_config_cache(self) -> dict:
import pickle
@@ -1878,6 +1888,9 @@ class Registry:
}
if v.get("always_free"):
entry["always_free"] = True
old_node_type = old_cfg.get("node_type")
if old_node_type in [NodeType.ILAB.value, NodeType.MANUAL_CONFIRM.value]:
entry["node_type"] = old_node_type
device_config["class"]["action_value_mappings"][action_key] = entry
device_config["init_param_schema"] = {}

View File

@@ -17,6 +17,7 @@ from typing import Any, Dict, List, Optional, Tuple, Union
from msgcenterpy.instances.typed_dict_instance import TypedDictMessageInstance
from unilabos.utils.cls_creator import import_class
from unilabos.registry.decorators import Side, DataSource, normalize_enum_value
_logger = logging.getLogger(__name__)
@@ -487,10 +488,7 @@ def normalize_ast_handles(handles_raw: Any) -> List[Dict[str, Any]]:
}
side = h.get("side")
if side:
if isinstance(side, str) and "." in side:
val = side.rsplit(".", 1)[-1]
side = val.lower() if val in ("LEFT", "RIGHT", "TOP", "BOTTOM") else val
entry["side"] = side
entry["side"] = normalize_enum_value(side, Side) or side
label = h.get("label")
if label:
entry["label"] = label
@@ -499,10 +497,7 @@ def normalize_ast_handles(handles_raw: Any) -> List[Dict[str, Any]]:
entry["data_key"] = data_key
data_source = h.get("data_source")
if data_source:
if isinstance(data_source, str) and "." in data_source:
val = data_source.rsplit(".", 1)[-1]
data_source = val.lower() if val in ("HANDLE", "EXECUTOR") else val
entry["data_source"] = data_source
entry["data_source"] = normalize_enum_value(data_source, DataSource) or data_source
description = h.get("description")
if description:
entry["description"] = description
@@ -537,17 +532,12 @@ def normalize_ast_action_handles(handles_raw: Any) -> Dict[str, Any]:
"data_type": h.get("data_type", ""),
"label": h.get("label", ""),
}
_FIELD_ENUM_MAP = {"side": Side, "data_source": DataSource}
for opt_key in ("side", "data_key", "data_source", "description", "io_type"):
val = h.get(opt_key)
if val is not None:
# Only resolve enum-style refs (e.g. DataSource.HANDLE -> handle) for data_source/side
# data_key values like "wells.@flatten", "@this.0@@@plate" must be preserved as-is
if (
isinstance(val, str)
and "." in val
and opt_key not in ("io_type", "data_key")
):
val = val.rsplit(".", 1)[-1].lower()
if opt_key in _FIELD_ENUM_MAP:
val = normalize_enum_value(val, _FIELD_ENUM_MAP[opt_key]) or val
entry[opt_key] = val
# io_type: only add when explicitly set; do not default output to "sink" (YAML convention omits it)

View File

@@ -24,7 +24,7 @@ from unilabos_msgs.srv import (
from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response
from unique_identifier_msgs.msg import UUID
from unilabos.registry.decorators import device
from unilabos.registry.decorators import device, action, NodeType
from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
from unilabos.registry.registry import lab_registry
from unilabos.resources.container import RegularContainer
@@ -1621,6 +1621,10 @@ class HostNode(BaseROS2DeviceNode):
}
return res
@action(always_free=True, node_type=NodeType.MANUAL_CONFIRM)
def manual_confirm(self, **kwargs) -> dict:
return kwargs
def test_resource(
self,
sample_uuids: SampleUUIDsType,