new registry sys

exp. support with add device
This commit is contained in:
Xuwznln
2026-03-21 19:24:14 +08:00
parent 2c554182d3
commit 0f6264503a
31 changed files with 5453 additions and 1180 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,614 @@
"""
装饰器注册表系统
通过 @device, @action, @resource 装饰器替代 YAML 配置文件来定义设备/动作/资源注册表信息。
Usage:
from unilabos.registry.decorators import (
device, action, resource,
InputHandle, OutputHandle,
ActionInputHandle, ActionOutputHandle,
HardwareInterface, Side, DataSource,
)
@device(
id="solenoid_valve.mock",
category=["pump_and_valve"],
description="模拟电磁阀设备",
handles=[
InputHandle(key="in", data_type="fluid", label="in", side=Side.NORTH),
OutputHandle(key="out", data_type="fluid", label="out", side=Side.SOUTH),
],
hardware_interface=HardwareInterface(
name="hardware_interface",
read="send_command",
write="send_command",
),
)
class SolenoidValveMock:
@action(action_type=EmptyIn)
def close(self):
...
@action(
handles=[
ActionInputHandle(key="in", data_type="fluid", label="in"),
ActionOutputHandle(key="out", data_type="fluid", label="out"),
],
)
def set_valve_position(self, position):
...
# 无 @action 装饰器 => auto- 前缀动作
def is_open(self):
...
"""
from enum import Enum
from functools import wraps
from typing import Any, Callable, Dict, List, Optional, TypeVar
from pydantic import BaseModel, ConfigDict, Field
F = TypeVar("F", bound=Callable[..., Any])
# ---------------------------------------------------------------------------
# 枚举
# ---------------------------------------------------------------------------
class Side(str, Enum):
"""UI 上 Handle 的显示位置"""
NORTH = "NORTH"
SOUTH = "SOUTH"
EAST = "EAST"
WEST = "WEST"
class DataSource(str, Enum):
"""Handle 的数据来源"""
HANDLE = "handle" # 从上游 handle 获取数据 (用于 InputHandle)
EXECUTOR = "executor" # 从执行器输出数据 (用于 OutputHandle)
# ---------------------------------------------------------------------------
# Device / Resource Handle (设备/资源级别端口, 序列化时包含 io_type)
# ---------------------------------------------------------------------------
class _DeviceHandleBase(BaseModel):
"""设备/资源端口基类 (内部使用)"""
model_config = ConfigDict(populate_by_name=True)
key: str = Field(serialization_alias="handler_key")
data_type: str
label: str
side: Optional[Side] = None
data_key: Optional[str] = None
data_source: Optional[str] = None
description: Optional[str] = None
# 子类覆盖
io_type: str = ""
def to_registry_dict(self) -> Dict[str, Any]:
return self.model_dump(by_alias=True, exclude_none=True)
class InputHandle(_DeviceHandleBase):
"""
输入端口 (io_type="target"), 用于 @device / @resource handles
Example:
InputHandle(key="in", data_type="fluid", label="in", side=Side.NORTH)
"""
io_type: str = "target"
class OutputHandle(_DeviceHandleBase):
"""
输出端口 (io_type="source"), 用于 @device / @resource handles
Example:
OutputHandle(key="out", data_type="fluid", label="out", side=Side.SOUTH)
"""
io_type: str = "source"
# ---------------------------------------------------------------------------
# Action Handle (动作级别端口, 序列化时不含 io_type, 按类型自动分组)
# ---------------------------------------------------------------------------
class _ActionHandleBase(BaseModel):
"""动作端口基类 (内部使用)"""
model_config = ConfigDict(populate_by_name=True)
key: str = Field(serialization_alias="handler_key")
data_type: str
label: str
side: Optional[Side] = None
data_key: Optional[str] = None
data_source: Optional[str] = None
description: Optional[str] = None
io_type: Optional[str] = None # source/sink (dataflow) or target/source (device-style)
def to_registry_dict(self) -> Dict[str, Any]:
return self.model_dump(by_alias=True, exclude_none=True)
class ActionInputHandle(_ActionHandleBase):
"""
动作输入端口, 用于 @action handles, 序列化后归入 "input"
Example:
ActionInputHandle(
key="material_input", data_type="workbench_material",
label="物料编号", data_key="material_number", data_source="handle",
)
"""
pass
class ActionOutputHandle(_ActionHandleBase):
"""
动作输出端口, 用于 @action handles, 序列化后归入 "output"
Example:
ActionOutputHandle(
key="station_output", data_type="workbench_station",
label="加热台ID", data_key="station_id", data_source="executor",
)
"""
pass
# ---------------------------------------------------------------------------
# HardwareInterface
# ---------------------------------------------------------------------------
class HardwareInterface(BaseModel):
"""
硬件通信接口定义
描述设备与底层硬件通信的方式 (串口、Modbus 等)。
Example:
HardwareInterface(name="hardware_interface", read="send_command", write="send_command")
"""
name: str
read: Optional[str] = None
write: Optional[str] = None
extra_info: Optional[List[str]] = None
# ---------------------------------------------------------------------------
# 全局注册表 -- 记录所有被装饰器标记的类/函数
# ---------------------------------------------------------------------------
_registered_devices: Dict[str, type] = {} # device_id -> class
_registered_resources: Dict[str, Any] = {} # resource_id -> class or function
def _device_handles_to_list(
handles: Optional[List[_DeviceHandleBase]],
) -> List[Dict[str, Any]]:
"""将设备/资源 Handle 列表序列化为字典列表 (含 io_type)"""
if handles is None:
return []
return [h.to_registry_dict() for h in handles]
def _action_handles_to_dict(
handles: Optional[List[_ActionHandleBase]],
) -> Dict[str, Any]:
"""
将动作 Handle 列表序列化为 {"input": [...], "output": [...]} 格式。
ActionInputHandle => "input", ActionOutputHandle => "output"
"""
if handles is None:
return {}
input_list = [h.to_registry_dict() for h in handles if isinstance(h, ActionInputHandle)]
output_list = [h.to_registry_dict() for h in handles if isinstance(h, ActionOutputHandle)]
result: Dict[str, Any] = {}
if input_list:
result["input"] = input_list
if output_list:
result["output"] = output_list
return result
# ---------------------------------------------------------------------------
# @device 类装饰器
# ---------------------------------------------------------------------------
# noinspection PyShadowingBuiltins
def device(
id: Optional[str] = None,
ids: Optional[List[str]] = None,
id_meta: Optional[Dict[str, Dict[str, Any]]] = None,
category: Optional[List[str]] = None,
description: str = "",
display_name: str = "",
icon: str = "",
version: str = "1.0.0",
handles: Optional[List[_DeviceHandleBase]] = None,
model: Optional[Dict[str, Any]] = None,
device_type: str = "python",
hardware_interface: Optional[HardwareInterface] = None,
):
"""
设备类装饰器
将类标记为一个 UniLab-OS 设备,并附加注册表元数据。
支持两种模式:
1. 单设备: id="xxx", category=[...]
2. 多设备: ids=["id1","id2"], id_meta={"id1":{handles:[...]}, "id2":{...}}
Args:
id: 单设备时的注册表唯一标识
ids: 多设备时的 id 列表,与 id_meta 配合使用
id_meta: 每个 device_id 的覆盖元数据 (handles/description/icon/model)
category: 设备分类标签列表 (必填)
description: 设备描述
display_name: 人类可读的设备显示名称,缺失时默认使用 id
icon: 图标路径
version: 版本号
handles: 设备端口列表 (单设备或 id_meta 未覆盖时使用)
model: 可选的 3D 模型配置
device_type: 设备实现类型 ("python" / "ros2")
hardware_interface: 硬件通信接口 (HardwareInterface)
"""
# Resolve device ids
if ids is not None:
device_ids = list(ids)
if not device_ids:
raise ValueError("@device ids 不能为空")
id_meta = id_meta or {}
elif id is not None:
device_ids = [id]
id_meta = {}
else:
raise ValueError("@device 必须提供 id 或 ids")
if category is None:
raise ValueError("@device category 必填")
base_meta = {
"category": category,
"description": description,
"display_name": display_name,
"icon": icon,
"version": version,
"handles": _device_handles_to_list(handles),
"model": model,
"device_type": device_type,
"hardware_interface": (hardware_interface.model_dump(exclude_none=True) if hardware_interface else None),
}
def decorator(cls):
cls._device_registry_meta = base_meta
cls._device_registry_id_meta = id_meta
cls._device_registry_ids = device_ids
for did in device_ids:
if did in _registered_devices:
raise ValueError(f"@device id 重复: '{did}' 已被 {_registered_devices[did]} 注册")
_registered_devices[did] = cls
return cls
return decorator
# ---------------------------------------------------------------------------
# @action 方法装饰器
# ---------------------------------------------------------------------------
# 区分 "用户没传 action_type" 和 "用户传了 None"
_ACTION_TYPE_UNSET = object()
# noinspection PyShadowingNames
def action(
action_type: Any = _ACTION_TYPE_UNSET,
goal: Optional[Dict[str, str]] = None,
feedback: Optional[Dict[str, str]] = None,
result: Optional[Dict[str, str]] = None,
handles: Optional[List[_ActionHandleBase]] = None,
goal_default: Optional[Dict[str, Any]] = None,
placeholder_keys: Optional[Dict[str, str]] = None,
always_free: bool = False,
is_protocol: bool = False,
description: str = "",
auto_prefix: bool = False,
parent: bool = False,
):
"""
动作方法装饰器
标记方法为注册表动作。有三种用法:
1. @action(action_type=EmptyIn, ...) -- 非 auto, 使用指定 ROS Action 类型
2. @action() -- 非 auto, UniLabJsonCommand (从方法签名生成 schema)
3. 不加 @action -- auto- 前缀, UniLabJsonCommand
Protocol 用法:
@action(action_type=Add, is_protocol=True)
def AddProtocol(self): ...
标记该动作为高级协议 (protocol),运行时通过 ROS Action 路由到
protocol generator 执行。action_type 指向 unilabos_msgs 的 Action 类型。
Args:
action_type: ROS Action 消息类型 (如 EmptyIn, SendCmd, HeatChill).
不传/默认 = UniLabJsonCommand (非 auto).
goal: Goal 字段映射 (ROS字段名 -> 设备参数名).
protocol 模式下可留空,系统自动生成 identity 映射.
feedback: Feedback 字段映射
result: Result 字段映射
handles: 动作端口列表 (ActionInputHandle / ActionOutputHandle)
goal_default: Goal 字段默认值映射 (字段名 -> 默认值), 与自动生成的 goal_default 合并
placeholder_keys: 参数占位符配置
always_free: 是否为永久闲置动作 (不受排队限制)
is_protocol: 是否为工作站协议 (protocol)。True 时运行时走 protocol generator 路径。
description: 动作描述
auto_prefix: 若为 True动作名使用 auto-{method_name} 形式(与无 @action 时一致)
parent: 若为 True当方法参数为空 (*args, **kwargs) 时,通过 MRO 从父类获取真实方法参数
"""
def decorator(func: F) -> F:
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
# action_type 为哨兵值 => 用户没传, 视为 None (UniLabJsonCommand)
resolved_type = None if action_type is _ACTION_TYPE_UNSET else action_type
meta = {
"action_type": resolved_type,
"goal": goal or {},
"feedback": feedback or {},
"result": result or {},
"handles": _action_handles_to_dict(handles),
"goal_default": goal_default or {},
"placeholder_keys": placeholder_keys or {},
"always_free": always_free,
"is_protocol": is_protocol,
"description": description,
"auto_prefix": auto_prefix,
"parent": parent,
}
wrapper._action_registry_meta = meta # type: ignore[attr-defined]
# 设置 _is_always_free 保持与旧 @always_free 装饰器兼容
if always_free:
wrapper._is_always_free = True # type: ignore[attr-defined]
return wrapper # type: ignore[return-value]
return decorator
def get_action_meta(func) -> Optional[Dict[str, Any]]:
"""获取方法上的 @action 装饰器元数据"""
return getattr(func, "_action_registry_meta", None)
def has_action_decorator(func) -> bool:
"""检查函数是否带有 @action 装饰器"""
return hasattr(func, "_action_registry_meta")
# ---------------------------------------------------------------------------
# @resource 类/函数装饰器
# ---------------------------------------------------------------------------
def resource(
id: str,
category: List[str],
description: str = "",
icon: str = "",
version: str = "1.0.0",
handles: Optional[List[_DeviceHandleBase]] = None,
model: Optional[Dict[str, Any]] = None,
class_type: str = "pylabrobot",
):
"""
资源类/函数装饰器
将类或工厂函数标记为一个 UniLab-OS 资源,附加注册表元数据。
Args:
id: 注册表唯一标识 (必填, 不可重复)
category: 资源分类标签列表 (必填)
description: 资源描述
icon: 图标路径
version: 版本号
handles: 端口列表 (InputHandle / OutputHandle)
model: 可选的 3D 模型配置
class_type: 资源实现类型 ("python" / "pylabrobot" / "unilabos")
"""
def decorator(obj):
meta = {
"resource_id": id,
"category": category,
"description": description,
"icon": icon,
"version": version,
"handles": _device_handles_to_list(handles),
"model": model,
"class_type": class_type,
}
obj._resource_registry_meta = meta
if id in _registered_resources:
raise ValueError(f"@resource id 重复: '{id}' 已被 {_registered_resources[id]} 注册")
_registered_resources[id] = obj
return obj
return decorator
def get_device_meta(cls, device_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
"""
获取类上的 @device 装饰器元数据。
当 device_id 存在且类使用 ids+id_meta 时,返回合并后的 meta
(base_meta 与 id_meta[device_id] 深度合并)。
"""
base = getattr(cls, "_device_registry_meta", None)
if base is None:
return None
id_meta = getattr(cls, "_device_registry_id_meta", None) or {}
if device_id is None or device_id not in id_meta:
result = dict(base)
ids = getattr(cls, "_device_registry_ids", None)
result["device_id"] = device_id if device_id is not None else (ids[0] if ids else None)
return result
overrides = id_meta[device_id]
result = dict(base)
result["device_id"] = device_id
for key in ["handles", "description", "icon", "model"]:
if key in overrides:
val = overrides[key]
if key == "handles" and isinstance(val, list):
# handles 必须是 Handle 对象列表
result[key] = [h.to_registry_dict() for h in val]
else:
result[key] = val
return result
def get_resource_meta(obj) -> Optional[Dict[str, Any]]:
"""获取对象上的 @resource 装饰器元数据"""
return getattr(obj, "_resource_registry_meta", None)
def get_all_registered_devices() -> Dict[str, type]:
"""获取所有已注册的设备类"""
return _registered_devices.copy()
def get_all_registered_resources() -> Dict[str, Any]:
"""获取所有已注册的资源"""
return _registered_resources.copy()
def clear_registry():
"""清空全局注册表 (用于测试)"""
_registered_devices.clear()
_registered_resources.clear()
# ---------------------------------------------------------------------------
# topic_config / not_action / always_free 装饰器
# ---------------------------------------------------------------------------
def topic_config(
period: Optional[float] = None,
print_publish: Optional[bool] = None,
qos: Optional[int] = None,
name: Optional[str] = None,
) -> Callable[[F], F]:
"""
Topic发布配置装饰器
用于装饰 get_{attr_name} 方法或 @property控制对应属性的ROS topic发布行为。
Args:
period: 发布周期。None 表示使用默认值 5.0
print_publish: 是否打印发布日志。None 表示使用节点默认配置
qos: QoS深度配置。None 表示使用默认值 10
name: 自定义发布名称。None 表示使用方法名(去掉 get_ 前缀)
Note:
与 @property 连用时,@topic_config 必须放在 @property 下面,
这样装饰器执行顺序为:先 topic_config 添加配置,再 property 包装。
"""
def decorator(func: F) -> F:
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
wrapper._topic_period = period # type: ignore[attr-defined]
wrapper._topic_print_publish = print_publish # type: ignore[attr-defined]
wrapper._topic_qos = qos # type: ignore[attr-defined]
wrapper._topic_name = name # type: ignore[attr-defined]
wrapper._has_topic_config = True # type: ignore[attr-defined]
return wrapper # type: ignore[return-value]
return decorator
def get_topic_config(func) -> dict:
"""获取函数上的 topic 配置 (period, print_publish, qos, name)"""
if hasattr(func, "_has_topic_config") and getattr(func, "_has_topic_config", False):
return {
"period": getattr(func, "_topic_period", None),
"print_publish": getattr(func, "_topic_print_publish", None),
"qos": getattr(func, "_topic_qos", None),
"name": getattr(func, "_topic_name", None),
}
return {}
def always_free(func: F) -> F:
"""
标记动作为永久闲置(不受busy队列限制)的装饰器
被此装饰器标记的 action 方法,在执行时不会受到设备级别的排队限制,
任何时候请求都可以立即执行。适用于查询类、状态读取类等轻量级操作。
"""
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
wrapper._is_always_free = True # type: ignore[attr-defined]
return wrapper # type: ignore[return-value]
def is_always_free(func) -> bool:
"""检查函数是否被标记为永久闲置"""
return getattr(func, "_is_always_free", False)
def not_action(func: F) -> F:
"""
标记方法为非动作的装饰器
用于装饰 driver 类中的方法,使其在注册表扫描时不被识别为动作。
适用于辅助方法、内部工具方法等不应暴露为设备动作的公共方法。
"""
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
wrapper._is_not_action = True # type: ignore[attr-defined]
return wrapper # type: ignore[return-value]
def is_not_action(func) -> bool:
"""检查函数是否被标记为非动作"""
return getattr(func, "_is_not_action", False)

File diff suppressed because it is too large Load Diff

699
unilabos/registry/utils.py Normal file
View File

@@ -0,0 +1,699 @@
"""
注册表工具函数
从 registry.py 中提取的纯工具函数,包括:
- docstring 解析
- 类型字符串 → JSON Schema 转换
- AST 类型节点解析
- TypedDict / Slot / Handle 等辅助检测
"""
import inspect
import logging
import re
import typing
from typing import Any, Dict, List, Optional, Tuple, Union
from msgcenterpy.instances.typed_dict_instance import TypedDictMessageInstance
from unilabos.utils.cls_creator import import_class
_logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# 异常
# ---------------------------------------------------------------------------
class ROSMsgNotFound(Exception):
pass
# ---------------------------------------------------------------------------
# Docstring 解析 (Google-style)
# ---------------------------------------------------------------------------
_SECTION_RE = re.compile(r"^(\w[\w\s]*):\s*$")
def parse_docstring(docstring: Optional[str]) -> Dict[str, Any]:
"""
解析 Google-style docstring提取描述和参数说明。
Returns:
{"description": "短描述", "params": {"param1": "参数1描述", ...}}
"""
result: Dict[str, Any] = {"description": "", "params": {}}
if not docstring:
return result
lines = docstring.strip().splitlines()
if not lines:
return result
result["description"] = lines[0].strip()
in_args = False
current_param: Optional[str] = None
current_desc_parts: list = []
for line in lines[1:]:
stripped = line.strip()
section_match = _SECTION_RE.match(stripped)
if section_match:
if current_param is not None:
result["params"][current_param] = "\n".join(current_desc_parts).strip()
current_param = None
current_desc_parts = []
section_name = section_match.group(1).lower()
in_args = section_name in ("args", "arguments", "parameters", "params")
continue
if not in_args:
continue
if ":" in stripped and not stripped.startswith(" "):
if current_param is not None:
result["params"][current_param] = "\n".join(current_desc_parts).strip()
param_part, _, desc_part = stripped.partition(":")
param_name = param_part.strip().split("(")[0].strip()
current_param = param_name
current_desc_parts = [desc_part.strip()]
elif current_param is not None:
aline = line
if aline.startswith(" "):
aline = aline[4:]
elif aline.startswith("\t"):
aline = aline[1:]
current_desc_parts.append(aline.strip())
if current_param is not None:
result["params"][current_param] = "\n".join(current_desc_parts).strip()
return result
# ---------------------------------------------------------------------------
# 类型常量
# ---------------------------------------------------------------------------
SIMPLE_TYPE_MAP = {
"str": "string",
"string": "string",
"int": "integer",
"integer": "integer",
"float": "number",
"number": "number",
"bool": "boolean",
"boolean": "boolean",
"list": "array",
"array": "array",
"dict": "object",
"object": "object",
}
ARRAY_TYPES = {"list", "List", "tuple", "Tuple", "set", "Set", "Sequence", "Iterable"}
OBJECT_TYPES = {"dict", "Dict", "Mapping"}
WRAPPER_TYPES = {"Optional"}
SLOT_TYPES = {"ResourceSlot", "DeviceSlot"}
# ---------------------------------------------------------------------------
# 简单类型映射
# ---------------------------------------------------------------------------
def get_json_schema_type(type_str: str) -> str:
"""简单类型名 -> JSON Schema type"""
return SIMPLE_TYPE_MAP.get(type_str.lower(), "string")
# ---------------------------------------------------------------------------
# AST 类型解析
# ---------------------------------------------------------------------------
def parse_type_node(type_str: str):
"""将类型注解字符串解析为 AST 节点,失败返回 None。"""
import ast as _ast
try:
return _ast.parse(type_str.strip(), mode="eval").body
except Exception:
return None
def _collect_bitor(node, out: list):
"""递归收集 X | Y | Z 的所有分支。"""
import ast as _ast
if isinstance(node, _ast.BinOp) and isinstance(node.op, _ast.BitOr):
_collect_bitor(node.left, out)
_collect_bitor(node.right, out)
else:
out.append(node)
def type_node_to_schema(
node,
import_map: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
"""将 AST 类型注解节点递归转换为 JSON Schema dict。
当提供 import_map 时,对于未知类名会尝试通过 import_map 解析模块路径,
然后 import 真实类型对象来生成 schema (支持 TypedDict 等)。
映射规则:
- Optional[X] → X 的 schema (剥掉 Optional)
- Union[X, Y] → {"anyOf": [X_schema, Y_schema]}
- List[X] / Tuple[X] / Set[X] → {"type": "array", "items": X_schema}
- Dict[K, V] → {"type": "object", "additionalProperties": V_schema}
- Literal["a", "b"] → {"type": "string", "enum": ["a", "b"]}
- TypedDict (via import_map) → {"type": "object", "properties": {...}}
- 基本类型 str/int/... → {"type": "string"/"integer"/...}
"""
import ast as _ast
# --- Name 节点: str / int / dict / ResourceSlot / 自定义类 ---
if isinstance(node, _ast.Name):
name = node.id
if name in SLOT_TYPES:
return {"$slot": name}
json_type = SIMPLE_TYPE_MAP.get(name.lower())
if json_type:
return {"type": json_type}
# 尝试通过 import_map 解析并 import 真实类型
if import_map and name in import_map:
type_obj = resolve_type_object(import_map[name])
if type_obj is not None:
return type_to_schema(type_obj)
# 未知类名 → 无法转 schema 的自定义类型默认当 object
return {"type": "object"}
if isinstance(node, _ast.Constant):
if isinstance(node.value, str):
return {"type": SIMPLE_TYPE_MAP.get(node.value.lower(), "string")}
return {"type": "string"}
# --- Subscript 节点: List[X], Dict[K,V], Optional[X], Literal[...] 等 ---
if isinstance(node, _ast.Subscript):
base_name = node.value.id if isinstance(node.value, _ast.Name) else ""
# Optional[X] → 剥掉
if base_name in WRAPPER_TYPES:
return type_node_to_schema(node.slice, import_map)
# Union[X, None] → 剥掉 None; Union[X, Y] → anyOf
if base_name == "Union":
elts = node.slice.elts if isinstance(node.slice, _ast.Tuple) else [node.slice]
non_none = [
e
for e in elts
if not (isinstance(e, _ast.Constant) and e.value is None)
and not (isinstance(e, _ast.Name) and e.id == "None")
]
if len(non_none) == 1:
return type_node_to_schema(non_none[0], import_map)
if len(non_none) > 1:
return {"anyOf": [type_node_to_schema(e, import_map) for e in non_none]}
return {"type": "string"}
# Literal["a", "b", 1] → enum
if base_name == "Literal":
elts = node.slice.elts if isinstance(node.slice, _ast.Tuple) else [node.slice]
values = []
for e in elts:
if isinstance(e, _ast.Constant):
values.append(e.value)
elif isinstance(e, _ast.Name):
values.append(e.id)
if values:
return {"type": "string", "enum": values}
return {"type": "string"}
# List / Tuple / Set → array
if base_name in ARRAY_TYPES:
if isinstance(node.slice, _ast.Tuple) and node.slice.elts:
inner_node = node.slice.elts[0]
else:
inner_node = node.slice
return {"type": "array", "items": type_node_to_schema(inner_node, import_map)}
# Dict → object
if base_name in OBJECT_TYPES:
schema: Dict[str, Any] = {"type": "object"}
if isinstance(node.slice, _ast.Tuple) and len(node.slice.elts) >= 2:
val_node = node.slice.elts[1]
# Dict[str, Any] → 不加 additionalProperties (Any 等同于无约束)
is_any = (isinstance(val_node, _ast.Name) and val_node.id == "Any") or (
isinstance(val_node, _ast.Constant) and val_node.value is None
)
if not is_any:
val_schema = type_node_to_schema(val_node, import_map)
schema["additionalProperties"] = val_schema
return schema
# --- BinOp: X | Y (Python 3.10+) → 当 Union 处理 ---
if isinstance(node, _ast.BinOp) and isinstance(node.op, _ast.BitOr):
parts: list = []
_collect_bitor(node, parts)
non_none = [
p
for p in parts
if not (isinstance(p, _ast.Constant) and p.value is None)
and not (isinstance(p, _ast.Name) and p.id == "None")
]
if len(non_none) == 1:
return type_node_to_schema(non_none[0], import_map)
if len(non_none) > 1:
return {"anyOf": [type_node_to_schema(p, import_map) for p in non_none]}
return {"type": "string"}
return {"type": "string"}
# ---------------------------------------------------------------------------
# 真实类型对象解析 (import-based)
# ---------------------------------------------------------------------------
def resolve_type_object(type_ref: str) -> Optional[Any]:
"""通过 'module.path:ClassName' 格式的引用 import 并返回真实类型对象。
对于 typing 内置名 (str, int, List 等) 直接返回 None (由 AST 路径处理)。
import 失败时静默返回 None。
"""
if ":" not in type_ref:
return None
try:
return import_class(type_ref)
except Exception:
return None
def is_typed_dict_class(obj: Any) -> bool:
"""检查对象是否是 TypedDict 类。"""
if obj is None:
return False
try:
from typing_extensions import is_typeddict
return is_typeddict(obj)
except ImportError:
if isinstance(obj, type):
return hasattr(obj, "__required_keys__") and hasattr(obj, "__optional_keys__")
return False
def type_to_schema(tp: Any) -> Dict[str, Any]:
"""将真实 typing 对象递归转换为 JSON Schema dict。
支持:
- 基本类型: str, int, float, bool → {"type": "string"/"integer"/...}
- typing 泛型: List[X], Dict[K,V], Optional[X], Union[X,Y], Literal[...]
- TypedDict → {"type": "object", "properties": {...}, "required": [...]}
- 自定义类 (ResourceSlot 等) → {"$slot": "..."} 或 {"type": "string"}
"""
origin = getattr(tp, "__origin__", None)
args = getattr(tp, "__args__", None)
# --- None / NoneType ---
if tp is type(None):
return {"type": "null"}
# --- 基本类型 ---
if tp is str:
return {"type": "string"}
if tp is int:
return {"type": "integer"}
if tp is float:
return {"type": "number"}
if tp is bool:
return {"type": "boolean"}
# --- TypedDict ---
if is_typed_dict_class(tp):
try:
return TypedDictMessageInstance.get_json_schema_from_typed_dict(tp)
except Exception:
return {"type": "object"}
# --- Literal ---
if origin is typing.Literal:
values = list(args) if args else []
return {"type": "string", "enum": values}
# --- Optional / Union ---
if origin is typing.Union:
non_none = [a for a in (args or ()) if a is not type(None)]
if len(non_none) == 1:
return type_to_schema(non_none[0])
if len(non_none) > 1:
return {"anyOf": [type_to_schema(a) for a in non_none]}
return {"type": "string"}
# --- List / Sequence / Set / Tuple / Iterable ---
if origin in (list, tuple, set, frozenset) or (
origin is not None
and getattr(origin, "__name__", "") in ("Sequence", "Iterable", "Iterator", "MutableSequence")
):
if args:
return {"type": "array", "items": type_to_schema(args[0])}
return {"type": "array"}
# --- Dict / Mapping ---
if origin in (dict,) or (origin is not None and getattr(origin, "__name__", "") in ("Mapping", "MutableMapping")):
schema: Dict[str, Any] = {"type": "object"}
if args and len(args) >= 2:
schema["additionalProperties"] = type_to_schema(args[1])
return schema
# --- Slot 类型 ---
if isinstance(tp, type):
name = tp.__name__
if name in SLOT_TYPES:
return {"$slot": name}
# --- 其他未知类型 fallback ---
if isinstance(tp, type):
return {"type": "object"}
return {"type": "string"}
# ---------------------------------------------------------------------------
# Slot / Placeholder 检测
# ---------------------------------------------------------------------------
def detect_slot_type(ptype) -> Tuple[Optional[str], bool]:
"""检测参数类型是否为 ResourceSlot / DeviceSlot。
兼容多种格式:
- runtime: "unilabos.registry.placeholder_type:ResourceSlot"
- runtime tuple: ("list", "unilabos.registry.placeholder_type:ResourceSlot")
- AST 裸名: "ResourceSlot", "List[ResourceSlot]", "Optional[ResourceSlot]"
Returns: (slot_name | None, is_list)
"""
ptype_str = str(ptype)
# 快速路径: 字符串里根本没有 Slot
if "ResourceSlot" not in ptype_str and "DeviceSlot" not in ptype_str:
return (None, False)
# runtime 格式: 完整模块路径
if isinstance(ptype, str):
if ptype.endswith(":ResourceSlot") or ptype == "ResourceSlot":
return ("ResourceSlot", False)
if ptype.endswith(":DeviceSlot") or ptype == "DeviceSlot":
return ("DeviceSlot", False)
# AST 复杂格式: List[ResourceSlot], Optional[ResourceSlot] 等
if "[" in ptype:
node = parse_type_node(ptype)
if node is not None:
schema = type_node_to_schema(node)
# 直接是 slot
if "$slot" in schema:
return (schema["$slot"], False)
# array 包裹 slot: {"type": "array", "items": {"$slot": "..."}}
items = schema.get("items", {})
if isinstance(items, dict) and "$slot" in items:
return (items["$slot"], True)
return (None, False)
# runtime tuple 格式
if isinstance(ptype, tuple) and len(ptype) == 2:
inner_str = str(ptype[1])
if "ResourceSlot" in inner_str:
return ("ResourceSlot", True)
if "DeviceSlot" in inner_str:
return ("DeviceSlot", True)
return (None, False)
def detect_placeholder_keys(params: list) -> Dict[str, str]:
"""Detect parameters that reference ResourceSlot or DeviceSlot."""
result: Dict[str, str] = {}
for p in params:
ptype = p.get("type", "")
if "ResourceSlot" in str(ptype):
result[p["name"]] = "unilabos_resources"
elif "DeviceSlot" in str(ptype):
result[p["name"]] = "unilabos_devices"
return result
# ---------------------------------------------------------------------------
# Handle 规范化
# ---------------------------------------------------------------------------
def normalize_ast_handles(handles_raw: Any) -> List[Dict[str, Any]]:
"""Convert AST-parsed handle structures to the standard registry format."""
if not handles_raw:
return []
# handle_type → io_type 映射 (AST 内部类名 → YAML 标准字段值)
_HANDLE_TYPE_TO_IO_TYPE = {
"input": "target",
"output": "source",
"action_input": "action_target",
"action_output": "action_source",
}
result: List[Dict[str, Any]] = []
for h in handles_raw:
if isinstance(h, dict):
call = h.get("_call", "")
if "InputHandle" in call:
handle_type = "input"
elif "OutputHandle" in call:
handle_type = "output"
elif "ActionInputHandle" in call:
handle_type = "action_input"
elif "ActionOutputHandle" in call:
handle_type = "action_output"
else:
handle_type = h.get("handle_type", "unknown")
io_type = _HANDLE_TYPE_TO_IO_TYPE.get(handle_type, handle_type)
entry: Dict[str, Any] = {
"handler_key": h.get("key", ""),
"data_type": h.get("data_type", ""),
"io_type": io_type,
}
side = h.get("side")
if side:
if isinstance(side, str) and "." in side:
val = side.rsplit(".", 1)[-1]
side = val.lower() if val in ("LEFT", "RIGHT", "TOP", "BOTTOM") else val
entry["side"] = side
label = h.get("label")
if label:
entry["label"] = label
data_key = h.get("data_key")
if data_key:
entry["data_key"] = data_key
data_source = h.get("data_source")
if data_source:
if isinstance(data_source, str) and "." in data_source:
val = data_source.rsplit(".", 1)[-1]
data_source = val.lower() if val in ("HANDLE", "EXECUTOR") else val
entry["data_source"] = data_source
description = h.get("description")
if description:
entry["description"] = description
result.append(entry)
return result
def normalize_ast_action_handles(handles_raw: Any) -> Dict[str, Any]:
"""Convert AST-parsed action handle list to {"input": [...], "output": [...]}.
Mirrors the runtime behavior of decorators._action_handles_to_dict:
- ActionInputHandle => grouped under "input"
- ActionOutputHandle => grouped under "output"
Field mapping: key -> handler_key (matches Pydantic serialization_alias).
"""
if not handles_raw or not isinstance(handles_raw, list):
return {}
input_list: List[Dict[str, Any]] = []
output_list: List[Dict[str, Any]] = []
for h in handles_raw:
if not isinstance(h, dict):
continue
call = h.get("_call", "")
is_input = "ActionInputHandle" in call or "InputHandle" in call
is_output = "ActionOutputHandle" in call or "OutputHandle" in call
entry: Dict[str, Any] = {
"handler_key": h.get("key", ""),
"data_type": h.get("data_type", ""),
"label": h.get("label", ""),
}
for opt_key in ("side", "data_key", "data_source", "description", "io_type"):
val = h.get(opt_key)
if val is not None:
# Only resolve enum-style refs (e.g. DataSource.HANDLE -> handle) for data_source/side
# data_key values like "wells.@flatten", "@this.0@@@plate" must be preserved as-is
if (
isinstance(val, str)
and "." in val
and opt_key not in ("io_type", "data_key")
):
val = val.rsplit(".", 1)[-1].lower()
entry[opt_key] = val
# io_type: only add when explicitly set; do not default output to "sink" (YAML convention omits it)
if "io_type" not in entry and is_input:
entry["io_type"] = "source"
if is_input:
input_list.append(entry)
elif is_output:
output_list.append(entry)
result: Dict[str, Any] = {}
if input_list:
result["input"] = input_list
# Always include output (empty list when no outputs) to match YAML
result["output"] = output_list
return result
# ---------------------------------------------------------------------------
# Schema 辅助
# ---------------------------------------------------------------------------
def wrap_action_schema(
goal_schema: Dict[str, Any],
action_name: str,
description: str = "",
result_schema: Optional[Dict[str, Any]] = None,
feedback_schema: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""
将 goal 参数 schema 包装为标准的 action schema 格式:
{ "properties": { "goal": ..., "feedback": ..., "result": ... }, ... }
"""
# 去掉 auto- 前缀用于 title/description与 YAML 路径保持一致
display_name = action_name.removeprefix("auto-")
return {
"title": f"{display_name}参数",
"description": description or f"{display_name}的参数schema",
"type": "object",
"properties": {
"goal": goal_schema,
"feedback": feedback_schema or {},
"result": result_schema or {},
},
"required": ["goal"],
}
def preserve_field_descriptions(new_schema: Dict[str, Any], prev_schema: Dict[str, Any]):
"""保留之前 schema 中的 field descriptions"""
if not prev_schema or not new_schema:
return
prev_props = prev_schema.get("properties", {})
new_props = new_schema.get("properties", {})
for field_name, prev_field in prev_props.items():
if field_name in new_props and "title" in prev_field:
new_props[field_name].setdefault("title", prev_field["title"])
# ---------------------------------------------------------------------------
# 深度对比
# ---------------------------------------------------------------------------
def _short(val, limit=120):
"""截断过长的值用于日志显示。"""
s = repr(val)
return s if len(s) <= limit else s[:limit] + "..."
def deep_diff(old, new, path="", max_depth=10) -> list:
"""递归对比两个对象,返回所有差异的描述列表。"""
diffs = []
if max_depth <= 0:
if old != new:
diffs.append(f"{path}: (达到最大深度) OLD≠NEW")
return diffs
if type(old) != type(new):
diffs.append(f"{path}: 类型不同 OLD={type(old).__name__}({_short(old)}) NEW={type(new).__name__}({_short(new)})")
return diffs
if isinstance(old, dict):
old_keys = set(old.keys())
new_keys = set(new.keys())
for k in sorted(new_keys - old_keys):
diffs.append(f"{path}.{k}: 新增字段 (AST有, YAML无) = {_short(new[k])}")
for k in sorted(old_keys - new_keys):
diffs.append(f"{path}.{k}: 缺失字段 (YAML有, AST无) = {_short(old[k])}")
for k in sorted(old_keys & new_keys):
diffs.extend(deep_diff(old[k], new[k], f"{path}.{k}", max_depth - 1))
elif isinstance(old, (list, tuple)):
if len(old) != len(new):
diffs.append(f"{path}: 列表长度不同 OLD={len(old)} NEW={len(new)}")
for i in range(min(len(old), len(new))):
diffs.extend(deep_diff(old[i], new[i], f"{path}[{i}]", max_depth - 1))
if len(new) > len(old):
for i in range(len(old), len(new)):
diffs.append(f"{path}[{i}]: 新增元素 = {_short(new[i])}")
elif len(old) > len(new):
for i in range(len(new), len(old)):
diffs.append(f"{path}[{i}]: 缺失元素 = {_short(old[i])}")
else:
if old != new:
diffs.append(f"{path}: OLD={_short(old)} NEW={_short(new)}")
return diffs
# ---------------------------------------------------------------------------
# MRO 方法参数解析
# ---------------------------------------------------------------------------
def resolve_method_params_via_import(module_str: str, method_name: str) -> Dict[str, str]:
"""当 AST 方法参数为空 (如 *args, **kwargs) 时, import class 并通过 MRO 获取真实方法参数.
返回 identity mapping {param_name: param_name}.
"""
if not module_str or ":" not in module_str:
return {}
try:
cls = import_class(module_str)
except Exception as e:
_logger.debug(f"[AST] resolve_method_params_via_import: import_class('{module_str}') failed: {e}")
return {}
try:
for base_cls in cls.__mro__:
if method_name not in base_cls.__dict__:
continue
method = base_cls.__dict__[method_name]
actual = getattr(method, "__wrapped__", method)
if isinstance(actual, (staticmethod, classmethod)):
actual = actual.__func__
if not callable(actual):
continue
sig = inspect.signature(actual, follow_wrapped=True)
params = [
p.name for p in sig.parameters.values()
if p.name not in ("self", "cls")
and p.kind not in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
]
if params:
return {p: p for p in params}
except Exception as e:
_logger.debug(f"[AST] resolve_method_params_via_import: MRO walk for '{method_name}' failed: {e}")
return {}