add external devices param

fix registry upload missing type
This commit is contained in:
Xuwznln
2026-03-23 15:01:16 +08:00
parent d2f204c5b0
commit 3d8123849a
7 changed files with 1074 additions and 173 deletions

View File

@@ -264,6 +264,12 @@ def parse_args():
default=False,
help="Test mode: all actions simulate execution and return mock results without running real hardware",
)
parser.add_argument(
"--external_devices_only",
action="store_true",
default=False,
help="Only load external device packages (--devices), skip built-in unilabos/devices/ scanning and YAML device registry",
)
parser.add_argument(
"--extra_resource",
action="store_true",
@@ -342,11 +348,18 @@ def main():
check_mode = args_dict.get("check_mode", False)
if not skip_env_check:
from unilabos.utils.environment_check import check_environment
from unilabos.utils.environment_check import check_environment, check_device_package_requirements
if not check_environment(auto_install=True):
print_status("环境检查失败,程序退出", "error")
os._exit(1)
# 第一次设备包依赖检查build_registry 之前,确保 import map 可用
devices_dirs_for_req = args_dict.get("devices", None)
if devices_dirs_for_req:
if not check_device_package_requirements(devices_dirs_for_req):
print_status("设备包依赖检查失败,程序退出", "error")
os._exit(1)
else:
print_status("跳过环境依赖检查", "warning")
@@ -477,19 +490,7 @@ def main():
BasicConfig.vis_2d_enable = args_dict["2d_vis"]
BasicConfig.check_mode = check_mode
from unilabos.resources.graphio import (
read_node_link_json,
read_graphml,
dict_from_graph,
)
from unilabos.app.communication import get_communication_client
from unilabos.registry.registry import build_registry
from unilabos.app.backend import start_backend
from unilabos.app.web import http_client
from unilabos.app.web import start_server
from unilabos.app.register import register_devices_and_resources
from unilabos.resources.graphio import modify_to_backend_format
from unilabos.resources.resource_tracker import ResourceTreeSet, ResourceDict
# 显示启动横幅
print_unilab_banner(args_dict)
@@ -498,12 +499,14 @@ def main():
# check_mode 和 upload_registry 都会执行实际 import 验证
devices_dirs = args_dict.get("devices", None)
complete_registry = args_dict.get("complete_registry", False) or check_mode
external_only = args_dict.get("external_devices_only", False)
lab_registry = build_registry(
registry_paths=args_dict["registry_path"],
devices_dirs=devices_dirs,
upload_registry=BasicConfig.upload_registry,
check_mode=check_mode,
complete_registry=complete_registry,
external_only=external_only,
)
# Check mode: 注册表验证完成后直接退出
@@ -513,6 +516,20 @@ def main():
print_status(f"Check mode: 注册表验证完成 ({device_count} 设备, {resource_count} 资源),退出", "info")
os._exit(0)
# 以下导入依赖 ROS2 环境check_mode 已退出不需要
from unilabos.resources.graphio import (
read_node_link_json,
read_graphml,
dict_from_graph,
modify_to_backend_format,
)
from unilabos.app.communication import get_communication_client
from unilabos.app.backend import start_backend
from unilabos.app.web import http_client
from unilabos.app.web import start_server
from unilabos.app.register import register_devices_and_resources
from unilabos.resources.resource_tracker import ResourceTreeSet, ResourceDict
# Step 1: 上传全部注册表到服务端,同步保存到 unilabos_data
if BasicConfig.upload_registry:
if BasicConfig.ak and BasicConfig.sk:
@@ -610,6 +627,10 @@ def main():
resource_tree_set.merge_remote_resources(remote_tree_set)
print_status("远端物料同步完成", "info")
# 第二次设备包依赖检查云端物料同步后community 包可能引入新的 requirements
# TODO: 当 community device package 功能上线后,在这里调用
# install_requirements_txt(community_pkg_path / "requirements.txt", label="community.xxx")
# 使用 ResourceTreeSet 代替 list
args_dict["resources_config"] = resource_tree_set
args_dict["devices_config"] = resource_tree_set

View File

@@ -139,6 +139,7 @@ def scan_directory(
executor: ThreadPoolExecutor = None,
exclude_files: Optional[set] = None,
cache: Optional[Dict[str, Any]] = None,
include_files: Optional[List[Union[str, Path]]] = None,
) -> Dict[str, Any]:
"""
Recursively scan .py files under *root_dir* for @device and @resource
@@ -164,6 +165,7 @@ def scan_directory(
exclude_files: 要排除的文件名集合 (如 {"lab_resources.py"})
cache: Mutable cache dict (``load_scan_cache()`` result). Hits are read
from here; misses are written back so the caller can persist later.
include_files: 指定扫描的文件列表,提供时跳过目录递归收集,直接扫描这些文件。
"""
if executor is None:
raise ValueError("executor is required and must not be None")
@@ -175,7 +177,10 @@ def scan_directory(
python_path = Path(python_path).resolve()
# --- Collect files (depth/count limited) ---
py_files = _collect_py_files(root_dir, max_depth=max_depth, max_files=max_files, exclude_files=exclude_files)
if include_files is not None:
py_files = [Path(f).resolve() for f in include_files if Path(f).resolve().exists()]
else:
py_files = _collect_py_files(root_dir, max_depth=max_depth, max_files=max_files, exclude_files=exclude_files)
cache_files: Dict[str, Any] = cache.get("files", {}) if cache else {}

View File

@@ -112,7 +112,7 @@ class Registry:
# 统一入口
# ------------------------------------------------------------------
def setup(self, devices_dirs=None, upload_registry=False, complete_registry=False):
def setup(self, devices_dirs=None, upload_registry=False, complete_registry=False, external_only=False):
"""统一构建注册表入口。"""
if self._setup_called:
logger.critical("[UniLab Registry] setup方法已被调用过不允许多次调用")
@@ -123,24 +123,27 @@ class Registry:
)
# 1. AST 静态扫描 (快速, 无需 import)
self._run_ast_scan(devices_dirs, upload_registry=upload_registry)
self._run_ast_scan(devices_dirs, upload_registry=upload_registry, external_only=external_only)
# 2. Host node 内置设备
self._setup_host_node()
# 3. YAML 注册表加载 (兼容旧格式)
self.registry_paths = [Path(path).absolute() for path in self.registry_paths]
for i, path in enumerate(self.registry_paths):
sys_path = path.parent
logger.trace(f"[UniLab Registry] Path {i+1}/{len(self.registry_paths)}: {sys_path}")
sys.path.append(str(sys_path))
self.load_device_types(path, complete_registry=complete_registry)
if BasicConfig.enable_resource_load:
self.load_resource_types(path, upload_registry, complete_registry=complete_registry)
else:
logger.warning(
"[UniLab Registry] 资源加载已禁用 (enable_resource_load=False),跳过资源注册表加载"
)
# 3. YAML 注册表加载 (兼容旧格式) — external_only 模式下跳过
if external_only:
logger.info("[UniLab Registry] external_only 模式: 跳过 YAML 注册表加载")
else:
self.registry_paths = [Path(path).absolute() for path in self.registry_paths]
for i, path in enumerate(self.registry_paths):
sys_path = path.parent
logger.trace(f"[UniLab Registry] Path {i+1}/{len(self.registry_paths)}: {sys_path}")
sys.path.append(str(sys_path))
self.load_device_types(path, complete_registry=complete_registry)
if BasicConfig.enable_resource_load:
self.load_resource_types(path, upload_registry, complete_registry=complete_registry)
else:
logger.warning(
"[UniLab Registry] 资源加载已禁用 (enable_resource_load=False),跳过资源注册表加载"
)
self._startup_executor.shutdown(wait=True)
self._startup_executor = None
self._setup_called = True
@@ -253,7 +256,7 @@ class Registry:
# AST 静态扫描
# ------------------------------------------------------------------
def _run_ast_scan(self, devices_dirs=None, upload_registry=False):
def _run_ast_scan(self, devices_dirs=None, upload_registry=False, external_only=False):
"""
执行 AST 静态扫描,从 Python 代码中提取 @device / @resource 装饰器元数据。
无需 import 任何驱动模块,速度极快。
@@ -298,16 +301,30 @@ class Registry:
extra_dirs.append(d_path)
# 主扫描
exclude_files = {"lab_resources.py"} if not BasicConfig.extra_resource else None
scan_result = scan_directory(
scan_root, python_path=python_path, executor=self._startup_executor,
exclude_files=exclude_files, cache=ast_cache,
)
if exclude_files:
logger.info(
f"[UniLab Registry] 排除扫描文件: {exclude_files} "
f"(可通过 --extra_resource 启用加载)"
if external_only:
core_files = [
pkg_root / "ros" / "nodes" / "presets" / "host_node.py",
pkg_root / "resources" / "container.py",
]
scan_result = scan_directory(
scan_root, python_path=python_path, executor=self._startup_executor,
cache=ast_cache, include_files=core_files,
)
logger.info(
f"[UniLab Registry] external_only 模式: 仅扫描核心文件 "
f"({', '.join(f.name for f in core_files)})"
)
else:
exclude_files = {"lab_resources.py"} if not BasicConfig.extra_resource else None
scan_result = scan_directory(
scan_root, python_path=python_path, executor=self._startup_executor,
exclude_files=exclude_files, cache=ast_cache,
)
if exclude_files:
logger.info(
f"[UniLab Registry] 排除扫描文件: {exclude_files} "
f"(可通过 --extra_resource 启用加载)"
)
# 合并缓存统计
total_stats = scan_result.pop("_cache_stats", {"hits": 0, "misses": 0, "total": 0})
@@ -1534,9 +1551,9 @@ class Registry:
del resource_info["config_info"]
if "file_path" in resource_info:
del resource_info["file_path"]
complete_data[resource_id] = copy.deepcopy(dict(sorted(resource_info.items())))
resource_info["registry_type"] = "resource"
resource_info["file_path"] = str(file.absolute()).replace("\\", "/")
complete_data[resource_id] = copy.deepcopy(dict(sorted(resource_info.items())))
for rid in skip_ids:
data.pop(rid, None)
@@ -2175,7 +2192,7 @@ class Registry:
lab_registry = Registry()
def build_registry(registry_paths=None, devices_dirs=None, upload_registry=False, check_mode=False, complete_registry=False):
def build_registry(registry_paths=None, devices_dirs=None, upload_registry=False, check_mode=False, complete_registry=False, external_only=False):
"""
构建或获取Registry单例实例
"""
@@ -2189,7 +2206,7 @@ def build_registry(registry_paths=None, devices_dirs=None, upload_registry=False
if path not in current_paths:
lab_registry.registry_paths.append(path)
lab_registry.setup(devices_dirs=devices_dirs, upload_registry=upload_registry, complete_registry=complete_registry)
lab_registry.setup(devices_dirs=devices_dirs, upload_registry=upload_registry, complete_registry=complete_registry, external_only=external_only)
# 将 AST 扫描的字符串类型替换为实际 ROS2 消息类(仅查找 ROS2 类型,不 import 设备模块)
lab_registry.resolve_all_types()

View File

@@ -6,20 +6,180 @@
import argparse
import importlib
import locale
import shutil
import subprocess
import sys
from pathlib import Path
from typing import List, Optional
from unilabos.utils.banner_print import print_status
# ---------------------------------------------------------------------------
# 底层安装工具
# ---------------------------------------------------------------------------
def _is_chinese_locale() -> bool:
try:
lang = locale.getdefaultlocale()[0]
return bool(lang and ("zh" in lang.lower() or "chinese" in lang.lower()))
except Exception:
return False
_USE_UV: Optional[bool] = None
def _has_uv() -> bool:
global _USE_UV
if _USE_UV is None:
_USE_UV = shutil.which("uv") is not None
return _USE_UV
def _install_packages(
packages: List[str],
upgrade: bool = False,
label: str = "",
) -> bool:
"""
安装/升级一组包。优先 uv pip install回退 sys pip。
逐个安装,任意一个失败不影响后续包。
Returns:
True if all succeeded, False otherwise.
"""
if not packages:
return True
is_chinese = _is_chinese_locale()
use_uv = _has_uv()
failed: List[str] = []
for pkg in packages:
action_word = "升级" if upgrade else "安装"
if label:
print_status(f"[{label}] 正在{action_word} {pkg}...", "info")
else:
print_status(f"正在{action_word} {pkg}...", "info")
if use_uv:
cmd = ["uv", "pip", "install"]
if upgrade:
cmd.append("--upgrade")
cmd.append(pkg)
if is_chinese:
cmd.extend(["--index-url", "https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple"])
else:
cmd = [sys.executable, "-m", "pip", "install"]
if upgrade:
cmd.append("--upgrade")
cmd.append(pkg)
if is_chinese:
cmd.extend(["-i", "https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple"])
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
if result.returncode == 0:
installer = "uv" if use_uv else "pip"
print_status(f"{pkg} {action_word}成功 (via {installer})", "success")
else:
stderr_short = result.stderr.strip().split("\n")[-1] if result.stderr else "unknown error"
print_status(f"× {pkg} {action_word}失败: {stderr_short}", "error")
failed.append(pkg)
except subprocess.TimeoutExpired:
print_status(f"× {pkg} {action_word}超时 (300s)", "error")
failed.append(pkg)
except Exception as e:
print_status(f"× {pkg} {action_word}异常: {e}", "error")
failed.append(pkg)
if failed:
print_status(f"{len(failed)} 个包操作失败: {', '.join(failed)}", "error")
return False
return True
# ---------------------------------------------------------------------------
# requirements.txt 安装(可多次调用)
# ---------------------------------------------------------------------------
def install_requirements_txt(req_path: str | Path, label: str = "") -> bool:
"""
读取一个 requirements.txt 文件,检查缺失的包并安装。
Args:
req_path: requirements.txt 文件路径
label: 日志前缀标签(如 "device_package_sim"
Returns:
True if all ok, False if any install failed.
"""
req_path = Path(req_path)
if not req_path.exists():
return True
tag = label or req_path.parent.name
print_status(f"[{tag}] 检查依赖: {req_path}", "info")
reqs: List[str] = []
with open(req_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#") and not line.startswith("-"):
reqs.append(line)
if not reqs:
return True
missing: List[str] = []
for req in reqs:
pkg_import = req.split(">=")[0].split("==")[0].split("<")[0].split("[")[0].split(">")[0].strip()
pkg_import = pkg_import.replace("-", "_")
try:
importlib.import_module(pkg_import)
except ImportError:
missing.append(req)
if not missing:
print_status(f"[{tag}] ✓ 依赖检查通过 ({len(reqs)} 个包)", "success")
return True
print_status(f"[{tag}] 缺失 {len(missing)} 个依赖: {', '.join(missing)}", "warning")
return _install_packages(missing, label=tag)
def check_device_package_requirements(devices_dirs: list[str]) -> bool:
"""
检查 --devices 指定的所有外部设备包目录中的 requirements.txt。
对每个目录查找 requirements.txt先在目录内找再在父目录找
"""
if not devices_dirs:
return True
all_ok = True
for d in devices_dirs:
d_path = Path(d).resolve()
req_file = d_path / "requirements.txt"
if not req_file.exists():
req_file = d_path.parent / "requirements.txt"
if not req_file.exists():
continue
if not install_requirements_txt(req_file, label=d_path.name):
all_ok = False
return all_ok
# ---------------------------------------------------------------------------
# UniLabOS 核心环境检查
# ---------------------------------------------------------------------------
class EnvironmentChecker:
"""环境检查器"""
def __init__(self):
# 定义必需的包及其安装名称的映射
self.required_packages = {
# 包导入名 : pip安装名
# "pymodbus.framer.FramerType": "pymodbus==3.9.2",
"websockets": "websockets",
"msgcenterpy": "msgcenterpy",
"orjson": "orjson",
@@ -28,33 +188,17 @@ class EnvironmentChecker:
"crcmod": "crcmod-plus",
}
# 特殊安装包(需要特殊处理的包)
self.special_packages = {"pylabrobot": "git+https://github.com/Xuwznln/pylabrobot.git"}
# 包版本要求(包名: 最低版本)
self.version_requirements = {
"msgcenterpy": "0.1.8", # msgcenterpy 最低版本要求
"msgcenterpy": "0.1.8",
}
self.missing_packages = []
self.failed_installs = []
self.packages_need_upgrade = []
# 检测系统语言
self.is_chinese = self._is_chinese_locale()
def _is_chinese_locale(self) -> bool:
"""检测系统是否为中文环境"""
try:
lang = locale.getdefaultlocale()[0]
if lang and ("zh" in lang.lower() or "chinese" in lang.lower()):
return True
except Exception:
pass
return False
self.missing_packages: List[tuple] = []
self.failed_installs: List[tuple] = []
self.packages_need_upgrade: List[tuple] = []
def check_package_installed(self, package_name: str) -> bool:
"""检查包是否已安装"""
try:
importlib.import_module(package_name)
return True
@@ -62,7 +206,6 @@ class EnvironmentChecker:
return False
def get_package_version(self, package_name: str) -> str | None:
"""获取已安装包的版本"""
try:
module = importlib.import_module(package_name)
return getattr(module, "__version__", None)
@@ -70,88 +213,32 @@ class EnvironmentChecker:
return None
def compare_version(self, current: str, required: str) -> bool:
"""
比较版本号
Returns:
True: current >= required
False: current < required
"""
try:
current_parts = [int(x) for x in current.split(".")]
required_parts = [int(x) for x in required.split(".")]
# 补齐长度
max_len = max(len(current_parts), len(required_parts))
current_parts.extend([0] * (max_len - len(current_parts)))
required_parts.extend([0] * (max_len - len(required_parts)))
return current_parts >= required_parts
except Exception:
return True # 如果无法比较,假设版本满足要求
def install_package(self, package_name: str, pip_name: str, upgrade: bool = False) -> bool:
"""安装包"""
try:
action = "升级" if upgrade else "安装"
print_status(f"正在{action} {package_name} ({pip_name})...", "info")
# 构建安装命令
cmd = [sys.executable, "-m", "pip", "install"]
# 如果是升级操作,添加 --upgrade 参数
if upgrade:
cmd.append("--upgrade")
cmd.append(pip_name)
# 如果是中文环境,使用清华镜像源
if self.is_chinese:
cmd.extend(["-i", "https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple"])
# 执行安装
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300) # 5分钟超时
if result.returncode == 0:
print_status(f"{package_name} {action}成功", "success")
return True
else:
print_status(f"× {package_name} {action}失败: {result.stderr}", "error")
return False
except subprocess.TimeoutExpired:
print_status(f"× {package_name} {action}超时", "error")
return False
except Exception as e:
print_status(f"× {package_name} {action}异常: {str(e)}", "error")
return False
def upgrade_package(self, package_name: str, pip_name: str) -> bool:
"""升级包"""
return self.install_package(package_name, pip_name, upgrade=True)
return True
def check_all_packages(self) -> bool:
"""检查所有必需的包"""
print_status("开始检查环境依赖...", "info")
# 检查常规包
for import_name, pip_name in self.required_packages.items():
if not self.check_package_installed(import_name):
self.missing_packages.append((import_name, pip_name))
else:
# 检查版本要求
if import_name in self.version_requirements:
current_version = self.get_package_version(import_name)
required_version = self.version_requirements[import_name]
elif import_name in self.version_requirements:
current_version = self.get_package_version(import_name)
required_version = self.version_requirements[import_name]
if current_version and not self.compare_version(current_version, required_version):
print_status(
f"{import_name} 版本过低 (当前: {current_version}, 需要: >={required_version})",
"warning",
)
self.packages_need_upgrade.append((import_name, pip_name))
if current_version:
if not self.compare_version(current_version, required_version):
print_status(
f"{import_name} 版本过低 (当前: {current_version}, 需要: >={required_version})",
"warning",
)
self.packages_need_upgrade.append((import_name, pip_name))
# 检查特殊包
for package_name, install_url in self.special_packages.items():
if not self.check_package_installed(package_name):
self.missing_packages.append((package_name, install_url))
@@ -170,7 +257,6 @@ class EnvironmentChecker:
return False
def install_missing_packages(self, auto_install: bool = True) -> bool:
"""安装缺失的包"""
if not self.missing_packages and not self.packages_need_upgrade:
return True
@@ -178,62 +264,36 @@ class EnvironmentChecker:
if self.missing_packages:
print_status("缺失以下包:", "warning")
for import_name, pip_name in self.missing_packages:
print_status(f" - {import_name} (pip install {pip_name})", "warning")
print_status(f" - {import_name} ({pip_name})", "warning")
if self.packages_need_upgrade:
print_status("需要升级以下包:", "warning")
for import_name, pip_name in self.packages_need_upgrade:
print_status(f" - {import_name} (pip install --upgrade {pip_name})", "warning")
print_status(f" - {import_name} ({pip_name})", "warning")
return False
# 安装缺失的包
if self.missing_packages:
print_status(f"开始自动安装 {len(self.missing_packages)} 个缺失的包...", "info")
pkgs = [pip_name for _, pip_name in self.missing_packages]
if not _install_packages(pkgs, label="unilabos"):
self.failed_installs.extend(self.missing_packages)
success_count = 0
for import_name, pip_name in self.missing_packages:
if self.install_package(import_name, pip_name):
success_count += 1
else:
self.failed_installs.append((import_name, pip_name))
print_status(f"✓ 成功安装 {success_count}/{len(self.missing_packages)} 个包", "success")
# 升级需要更新的包
if self.packages_need_upgrade:
print_status(f"开始自动升级 {len(self.packages_need_upgrade)} 个包...", "info")
pkgs = [pip_name for _, pip_name in self.packages_need_upgrade]
if not _install_packages(pkgs, upgrade=True, label="unilabos"):
self.failed_installs.extend(self.packages_need_upgrade)
upgrade_success_count = 0
for import_name, pip_name in self.packages_need_upgrade:
if self.upgrade_package(import_name, pip_name):
upgrade_success_count += 1
else:
self.failed_installs.append((import_name, pip_name))
print_status(f"✓ 成功升级 {upgrade_success_count}/{len(self.packages_need_upgrade)} 个包", "success")
if self.failed_installs:
print_status(f"{len(self.failed_installs)} 个包操作失败:", "error")
for import_name, pip_name in self.failed_installs:
print_status(f" - {import_name} ({pip_name})", "error")
return False
return True
return not self.failed_installs
def verify_installation(self) -> bool:
"""验证安装结果"""
if not self.missing_packages and not self.packages_need_upgrade:
return True
print_status("验证安装结果...", "info")
failed_verification = []
# 验证新安装的包
for import_name, pip_name in self.missing_packages:
if not self.check_package_installed(import_name):
failed_verification.append((import_name, pip_name))
# 验证升级的包
for import_name, pip_name in self.packages_need_upgrade:
if not self.check_package_installed(import_name):
failed_verification.append((import_name, pip_name))
@@ -270,17 +330,14 @@ def check_environment(auto_install: bool = True, show_details: bool = True) -> b
"""
checker = EnvironmentChecker()
# 检查包
if checker.check_all_packages():
return True
# 安装缺失的包
if not checker.install_missing_packages(auto_install):
if show_details:
print_status("请手动安装缺失的包后重新启动程序", "error")
return False
# 验证安装
if not checker.verify_installation():
if show_details:
print_status("安装验证失败,请检查网络连接或手动安装", "error")
@@ -290,14 +347,12 @@ def check_environment(auto_install: bool = True, show_details: bool = True) -> b
if __name__ == "__main__":
# 命令行参数解析
parser = argparse.ArgumentParser(description="UniLabOS 环境依赖检查工具")
parser.add_argument("--no-auto-install", action="store_true", help="仅检查环境,不自动安装缺失的包")
parser.add_argument("--silent", action="store_true", help="静默模式,不显示详细信息")
args = parser.parse_args()
# 执行环境检查
auto_install = not args.no_auto_install
show_details = not args.silent