refactor: Move config from module to instance initialization

This commit is contained in:
ZiWei
2026-01-14 18:43:59 +08:00
parent 2b04457037
commit a90613f3de
7 changed files with 560 additions and 210 deletions

View File

@@ -9,12 +9,8 @@ from datetime import datetime
from unilabos.devices.workstation.bioyond_studio.station import BioyondWorkstation
from unilabos.devices.workstation.bioyond_studio.bioyond_rpc import MachineState
from unilabos.ros.msgs.message_converter import convert_to_ros_msg, Float64, String
from unilabos.devices.workstation.bioyond_studio.config import (
WORKFLOW_STEP_IDS,
WORKFLOW_TO_SECTION_MAP,
ACTION_NAMES
)
from unilabos.devices.workstation.bioyond_studio.config import API_CONFIG
class BioyondReactor:
@@ -63,9 +59,72 @@ class BioyondReactionStation(BioyondWorkstation):
protocol_type: 协议类型(由ROS系统传递,此处忽略)
**kwargs: 其他可能的参数
"""
if config is None:
config = {}
# 将 kwargs 合并到 config 中 (处理扁平化配置如 api_key)
config.update(kwargs)
if deck is None and config:
deck = config.get('deck')
# 🔧 修复: 确保 Deck 上的 warehouses 具有正确的 UUID (必须在 super().__init__ 之前执行,因为父类会触发同步)
# 从配置中读取 warehouse_mapping并应用到实际的 deck 资源上
if config and "warehouse_mapping" in config and deck:
warehouse_mapping = config["warehouse_mapping"]
print(f"正在根据配置更新 Deck warehouse UUIDs... (共有 {len(warehouse_mapping)} 个配置)")
user_deck = deck
# 初始化 warehouses 字典
if not hasattr(user_deck, "warehouses") or user_deck.warehouses is None:
user_deck.warehouses = {}
# 1. 尝试从 children 中查找匹配的资源
for child in user_deck.children:
# 简单判断: 如果名字在 mapping 中,就认为是 warehouse
if child.name in warehouse_mapping:
user_deck.warehouses[child.name] = child
print(f" - 从子资源中找到 warehouse: {child.name}")
# 2. 如果还是没找到,且 Deck 类有 setup 方法,尝试调用 setup (针对 Deck 对象正确但未初始化的情况)
if not user_deck.warehouses and hasattr(user_deck, "setup"):
print(" - 尝试调用 deck.setup() 初始化仓库...")
try:
user_deck.setup()
# setup 后重新检查
if hasattr(user_deck, "warehouses") and user_deck.warehouses:
print(f" - setup() 成功,找到 {len(user_deck.warehouses)} 个仓库")
except Exception as e:
print(f" - 调用 setup() 失败: {e}")
# 3. 如果仍然为空,可能需要手动创建 (仅针对特定已知的 Deck 类型进行补救,这里暂时只打印警告)
if not user_deck.warehouses:
print(" - ⚠️ 仍然无法找到任何 warehouse 资源!")
for wh_name, wh_config in warehouse_mapping.items():
target_uuid = wh_config.get("uuid")
# 尝试在 deck.warehouses 中查找
wh_resource = None
if hasattr(user_deck, "warehouses") and wh_name in user_deck.warehouses:
wh_resource = user_deck.warehouses[wh_name]
# 如果没找到,尝试在所有子资源中查找
if not wh_resource:
wh_resource = user_deck.get_resource(wh_name)
if wh_resource:
if target_uuid:
current_uuid = getattr(wh_resource, "uuid", None)
print(f"✅ 更新仓库 '{wh_name}' UUID: {current_uuid} -> {target_uuid}")
wh_resource.uuid = target_uuid
else:
print(f"⚠️ 仓库 '{wh_name}' 在配置中没有 UUID")
else:
print(f"❌ 在 Deck 中未找到配置的仓库: '{wh_name}'")
super().__init__(bioyond_config=config, deck=deck)
print(f"BioyondReactionStation初始化 - config包含workflow_mappings: {'workflow_mappings' in (config or {})}")
if config and 'workflow_mappings' in config:
print(f"workflow_mappings内容: {config['workflow_mappings']}")
@@ -96,16 +155,19 @@ class BioyondReactionStation(BioyondWorkstation):
# 动态获取工作流步骤ID
self.workflow_step_ids = self._fetch_workflow_step_ids()
# 从配置中获取 action_names
self.action_names = self.bioyond_config.get("action_names", {})
def _fetch_workflow_step_ids(self) -> Dict[str, Dict[str, str]]:
"""动态获取工作流步骤ID"""
print("正在从LIMS获取最新工作流步骤ID...")
api_host = API_CONFIG.get("api_host")
api_key = API_CONFIG.get("api_key")
api_host = self.bioyond_config.get("api_host")
api_key = self.bioyond_config.get("api_key")
if not api_host or not api_key:
print("API配置缺失使用默认配置")
return WORKFLOW_STEP_IDS
print("API配置缺失无法动态获取工作流步骤ID")
return {}
def call_api(endpoint, data=None):
url = f"{api_host}{endpoint}"
@@ -124,8 +186,8 @@ class BioyondReactionStation(BioyondWorkstation):
# 1. 获取工作流列表
resp = call_api("/api/lims/workflow/work-flow-list", {"type": 2, "includeDetail": True})
if not resp:
print("无法获取工作流列表,使用默认配置")
return WORKFLOW_STEP_IDS
print("无法获取工作流列表")
return {}
workflows = resp.get("data", [])
if isinstance(workflows, dict):
@@ -135,13 +197,16 @@ class BioyondReactionStation(BioyondWorkstation):
workflows = workflows["items"]
if not workflows:
print("工作流列表为空,使用默认配置")
return WORKFLOW_STEP_IDS
print("工作流列表为空")
return {}
new_ids = {}
#从配置中获取workflow_to_section_map
workflow_to_section_map = self.bioyond_config.get("workflow_to_section_map", {})
# 2. 遍历映射表
for internal_name, section_name in WORKFLOW_TO_SECTION_MAP.items():
for internal_name, section_name in workflow_to_section_map.items():
# 查找对应的工作流对象
wf_obj = next((w for w in workflows if w.get("name") == section_name), None)
if not wf_obj:
@@ -1511,7 +1576,7 @@ class BioyondReactionStation(BioyondWorkstation):
dict: 服务端响应,失败时返回 {code:0,message,...}
"""
request_data = {
"apiKey": API_CONFIG["api_key"],
"apiKey": self.bioyond_config["api_key"],
"requestTime": self.hardware_interface.get_current_time_iso8601(),
"data": data
}
@@ -1551,7 +1616,7 @@ class BioyondReactionStation(BioyondWorkstation):
dict: 服务端响应,失败时返回 {code:0,message,...}
"""
request_data = {
"apiKey": API_CONFIG["api_key"],
"apiKey": self.bioyond_config["api_key"],
"requestTime": self.hardware_interface.get_current_time_iso8601(),
"data": data
}
@@ -1746,7 +1811,7 @@ class BioyondReactionStation(BioyondWorkstation):
print(f"🕒 工作流名称已添加时间戳: {original_name} -> {data['name']}")
request_data = {
"apiKey": API_CONFIG["api_key"],
"apiKey": self.bioyond_config["api_key"],
"requestTime": self.hardware_interface.get_current_time_iso8601(),
"data": data
}
@@ -1863,7 +1928,7 @@ class BioyondReactionStation(BioyondWorkstation):
tcm_bs_list = []
if self.pending_time_constraints:
print(f"\n🔗 处理时间约束 ({len(self.pending_time_constraints)} 个)...")
from unilabos.devices.workstation.bioyond_studio.config import WORKFLOW_STEP_IDS
# 建立索引到名称的映射
workflow_names_by_index = [w["name"] for w in workflows_result]