Compare commits

..

6 Commits

Author SHA1 Message Date
q434343
41be9e4e19 merge lab_resource,并修改transfer liquid 2026-02-28 16:05:29 +08:00
q434343
f38f3dfc89 Merge branch 'dev' into prcix9320 2026-02-11 11:44:15 +08:00
Xuwznln
699a0b3ce7 fix test resource schema 2026-02-10 23:08:29 +08:00
Xuwznln
cf3a20ae79 registry update & workflow update 2026-02-10 22:46:07 +08:00
Xuwznln
cdf0652020 add test mode 2026-02-10 15:18:41 +08:00
Xuwznln
60073ff139 support description & tags upload 2026-02-10 14:38:55 +08:00
21 changed files with 11413 additions and 402 deletions

View File

@@ -171,6 +171,12 @@ def parse_args():
action="store_true",
help="Disable sending update feedback to server",
)
parser.add_argument(
"--test_mode",
action="store_true",
default=False,
help="Test mode: all actions simulate execution and return mock results without running real hardware",
)
# workflow upload subcommand
workflow_parser = subparsers.add_parser(
"workflow_upload",
@@ -204,6 +210,12 @@ def parse_args():
default=False,
help="Whether to publish the workflow (default: False)",
)
workflow_parser.add_argument(
"--description",
type=str,
default="",
help="Workflow description, used when publishing the workflow",
)
return parser
@@ -231,52 +243,60 @@ def main():
# 加载配置文件优先加载config然后从env读取
config_path = args_dict.get("config")
if check_mode:
args_dict["working_dir"] = os.path.abspath(os.getcwd())
# 当 skip_env_check 时,默认使用当前目录作为 working_dir
if skip_env_check and not args_dict.get("working_dir") and not config_path:
# === 解析 working_dir ===
# 规则1: working_dir 传入 → 检测 unilabos_data 子目录,已是则不修改
# 规则2: 仅 config_path 传入 → 用其父目录作为 working_dir
# 规则4: 两者都传入 → 各用各的,但 working_dir 仍做 unilabos_data 子目录检测
raw_working_dir = args_dict.get("working_dir")
if raw_working_dir:
working_dir = os.path.abspath(raw_working_dir)
elif config_path and os.path.exists(config_path):
working_dir = os.path.dirname(os.path.abspath(config_path))
else:
working_dir = os.path.abspath(os.getcwd())
print_status(f"跳过环境检查模式:使用当前目录作为工作目录 {working_dir}", "info")
# 检查当前目录是否有 local_config.py
local_config_in_cwd = os.path.join(working_dir, "local_config.py")
if os.path.exists(local_config_in_cwd):
config_path = local_config_in_cwd
# unilabos_data 子目录自动检测
if os.path.basename(working_dir) != "unilabos_data":
unilabos_data_sub = os.path.join(working_dir, "unilabos_data")
if os.path.isdir(unilabos_data_sub):
working_dir = unilabos_data_sub
elif not raw_working_dir and not (config_path and os.path.exists(config_path)):
# 未显式指定路径,默认使用 cwd/unilabos_data
working_dir = os.path.abspath(os.path.join(os.getcwd(), "unilabos_data"))
# === 解析 config_path ===
if config_path and not os.path.exists(config_path):
# config_path 传入但不存在,尝试在 working_dir 中查找
candidate = os.path.join(working_dir, "local_config.py")
if os.path.exists(candidate):
config_path = candidate
print_status(f"在工作目录中发现配置文件: {config_path}", "info")
else:
print_status(
f"配置文件 {config_path} 不存在,工作目录 {working_dir} 中也未找到 local_config.py"
f"请通过 --config 传入 local_config.py 文件路径",
"error",
)
os._exit(1)
elif not config_path:
# 规则3: 未传入 config_path尝试 working_dir/local_config.py
candidate = os.path.join(working_dir, "local_config.py")
if os.path.exists(candidate):
config_path = candidate
print_status(f"发现本地配置文件: {config_path}", "info")
else:
print_status(f"未指定config路径可通过 --config 传入 local_config.py 文件路径", "info")
elif os.getcwd().endswith("unilabos_data"):
working_dir = os.path.abspath(os.getcwd())
else:
working_dir = os.path.abspath(os.path.join(os.getcwd(), "unilabos_data"))
if args_dict.get("working_dir"):
working_dir = args_dict.get("working_dir", "")
if config_path and not os.path.exists(config_path):
config_path = os.path.join(working_dir, "local_config.py")
if not os.path.exists(config_path):
print_status(
f"当前工作目录 {working_dir} 未找到local_config.py请通过 --config 传入 local_config.py 文件路径",
"error",
print_status(f"您是否为第一次使用?并将当前路径 {working_dir} 作为工作目录? (Y/n)", "info")
if check_mode or input() != "n":
os.makedirs(working_dir, exist_ok=True)
config_path = os.path.join(working_dir, "local_config.py")
shutil.copy(
os.path.join(os.path.dirname(os.path.dirname(__file__)), "config", "example_config.py"),
config_path,
)
print_status(f"已创建 local_config.py 路径: {config_path}", "info")
else:
os._exit(1)
elif config_path and os.path.exists(config_path):
working_dir = os.path.dirname(config_path)
elif os.path.exists(working_dir) and os.path.exists(os.path.join(working_dir, "local_config.py")):
config_path = os.path.join(working_dir, "local_config.py")
elif not skip_env_check and not config_path and (
not os.path.exists(working_dir) or not os.path.exists(os.path.join(working_dir, "local_config.py"))
):
print_status(f"未指定config路径可通过 --config 传入 local_config.py 文件路径", "info")
print_status(f"您是否为第一次使用?并将当前路径 {working_dir} 作为工作目录? (Y/n)", "info")
if input() != "n":
os.makedirs(working_dir, exist_ok=True)
config_path = os.path.join(working_dir, "local_config.py")
shutil.copy(
os.path.join(os.path.dirname(os.path.dirname(__file__)), "config", "example_config.py"), config_path
)
print_status(f"已创建 local_config.py 路径: {config_path}", "info")
else:
os._exit(1)
# 加载配置文件 (check_mode 跳过)
print_status(f"当前工作目录为 {working_dir}", "info")
@@ -334,6 +354,9 @@ def main():
BasicConfig.slave_no_host = args_dict.get("slave_no_host", False)
BasicConfig.upload_registry = args_dict.get("upload_registry", False)
BasicConfig.no_update_feedback = args_dict.get("no_update_feedback", False)
BasicConfig.test_mode = args_dict.get("test_mode", False)
if BasicConfig.test_mode:
print_status("启用测试模式:所有动作将模拟执行,不调用真实硬件", "warning")
BasicConfig.communication_protocol = "websocket"
machine_name = os.popen("hostname").read().strip()
machine_name = "".join([c if c.isalnum() or c == "_" else "_" for c in machine_name])

View File

@@ -38,9 +38,9 @@ def register_devices_and_resources(lab_registry, gather_only=False) -> Optional[
response = http_client.resource_registry({"resources": list(devices_to_register.values())})
cost_time = time.time() - start_time
if response.status_code in [200, 201]:
logger.info(f"[UniLab Register] 成功注册 {len(devices_to_register)} 个设备 {cost_time}ms")
logger.info(f"[UniLab Register] 成功注册 {len(devices_to_register)} 个设备 {cost_time}s")
else:
logger.error(f"[UniLab Register] 设备注册失败: {response.status_code}, {response.text} {cost_time}ms")
logger.error(f"[UniLab Register] 设备注册失败: {response.status_code}, {response.text} {cost_time}s")
except Exception as e:
logger.error(f"[UniLab Register] 设备注册异常: {e}")
@@ -51,9 +51,9 @@ def register_devices_and_resources(lab_registry, gather_only=False) -> Optional[
response = http_client.resource_registry({"resources": list(resources_to_register.values())})
cost_time = time.time() - start_time
if response.status_code in [200, 201]:
logger.info(f"[UniLab Register] 成功注册 {len(resources_to_register)} 个资源 {cost_time}ms")
logger.info(f"[UniLab Register] 成功注册 {len(resources_to_register)} 个资源 {cost_time}s")
else:
logger.error(f"[UniLab Register] 资源注册失败: {response.status_code}, {response.text} {cost_time}ms")
logger.error(f"[UniLab Register] 资源注册失败: {response.status_code}, {response.text} {cost_time}s")
except Exception as e:
logger.error(f"[UniLab Register] 资源注册异常: {e}")

View File

@@ -343,9 +343,10 @@ class HTTPClient:
edges: List[Dict[str, Any]],
tags: Optional[List[str]] = None,
published: bool = False,
description: str = "",
) -> Dict[str, Any]:
"""
导入工作流到服务器
导入工作流到服务器,如果 published 为 True则额外发起发布请求
Args:
name: 工作流名称(顶层)
@@ -355,6 +356,7 @@ class HTTPClient:
edges: 工作流边列表
tags: 工作流标签列表,默认为空列表
published: 是否发布工作流默认为False
description: 工作流描述,发布时使用
Returns:
Dict: API响应数据包含 code 和 data (uuid, name)
@@ -367,7 +369,6 @@ class HTTPClient:
"nodes": nodes,
"edges": edges,
"tags": tags if tags is not None else [],
"published": published,
},
}
# 保存请求到文件
@@ -388,11 +389,51 @@ class HTTPClient:
res = response.json()
if "code" in res and res["code"] != 0:
logger.error(f"导入工作流失败: {response.text}")
return res
# 导入成功后,如果需要发布则额外发起发布请求
if published:
imported_uuid = res.get("data", {}).get("uuid", workflow_uuid)
publish_res = self.workflow_publish(imported_uuid, description)
res["publish_result"] = publish_res
return res
else:
logger.error(f"导入工作流失败: {response.status_code}, {response.text}")
return {"code": response.status_code, "message": response.text}
def workflow_publish(self, workflow_uuid: str, description: str = "") -> Dict[str, Any]:
"""
发布工作流
Args:
workflow_uuid: 工作流UUID
description: 工作流描述
Returns:
Dict: API响应数据
"""
payload = {
"uuid": workflow_uuid,
"description": description,
"published": True,
}
logger.info(f"正在发布工作流: {workflow_uuid}")
response = requests.patch(
f"{self.remote_addr}/lab/workflow/owner",
json=payload,
headers={"Authorization": f"Lab {self.auth}"},
timeout=60,
)
if response.status_code == 200:
res = response.json()
if "code" in res and res["code"] != 0:
logger.error(f"发布工作流失败: {response.text}")
else:
logger.info(f"工作流发布成功: {workflow_uuid}")
return res
else:
logger.error(f"发布工作流失败: {response.status_code}, {response.text}")
return {"code": response.status_code, "message": response.text}
# 创建默认客户端实例
http_client = HTTPClient()

View File

@@ -1177,6 +1177,11 @@ class QueueProcessor:
logger.debug(f"[QueueProcessor] Sending busy status for {len(queued_jobs)} queued jobs")
for job_info in queued_jobs:
# 快照可能已过期:在遍历过程中 end_job() 可能已将此 job 移至 READY
# 此时不应再发送 busy/need_more否则会覆盖已发出的 free=True 通知
if job_info.status != JobStatus.QUEUE:
continue
message = {
"action": "report_action_state",
"data": {

View File

@@ -22,6 +22,8 @@ class BasicConfig:
startup_json_path = None # 填写绝对路径
disable_browser = False # 禁止浏览器自动打开
port = 8002 # 本地HTTP服务
check_mode = False # CI 检查模式,用于验证 registry 导入和文件一致性
test_mode = False # 测试模式,所有动作不实际执行,返回模拟结果
# 'TRACE', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'
log_level: Literal["TRACE", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "DEBUG"

View File

@@ -21,7 +21,7 @@ from pylabrobot.resources import (
ResourceHolder,
Lid,
Trash,
Tip,
Tip, TubeRack,
)
from typing_extensions import TypedDict
@@ -208,7 +208,8 @@ class LiquidHandlerMiddleware(LiquidHandler):
spread: Literal["wide", "tight", "custom"] = "wide",
**backend_kwargs,
):
if spread == "":
spread = "wide"
if self._simulator:
return await self._simulate_handler.aspirate(
resources,
@@ -221,17 +222,33 @@ class LiquidHandlerMiddleware(LiquidHandler):
spread,
**backend_kwargs,
)
await super().aspirate(
resources,
vols,
use_channels,
flow_rates,
offsets,
liquid_height,
blow_out_air_volume,
spread,
**backend_kwargs,
)
try:
await super().aspirate(
resources,
vols,
use_channels,
flow_rates,
offsets,
liquid_height,
blow_out_air_volume,
spread,
**backend_kwargs,
)
except ValueError as e:
if "Resource is too small to space channels" in str(e) and spread != "custom":
await super().aspirate(
resources,
vols,
use_channels,
flow_rates,
offsets,
liquid_height,
blow_out_air_volume,
spread="custom",
**backend_kwargs,
)
else:
raise
res_samples = []
res_volumes = []
@@ -243,7 +260,8 @@ class LiquidHandlerMiddleware(LiquidHandler):
channels_to_use = use_channels
for resource, volume, channel in zip(resources, vols, channels_to_use):
res_samples.append({"name": resource.name, "sample_uuid": resource.unilabos_extra.get("sample_uuid", None)})
sample_uuid_value = getattr(resource, "unilabos_extra", {}).get(EXTRA_SAMPLE_UUID, None)
res_samples.append({"name": resource.name, "sample_uuid": sample_uuid_value})
res_volumes.append(volume)
self.pending_liquids_dict[channel] = {
EXTRA_SAMPLE_UUID: sample_uuid_value,
@@ -263,6 +281,8 @@ class LiquidHandlerMiddleware(LiquidHandler):
spread: Literal["wide", "tight", "custom"] = "wide",
**backend_kwargs,
) -> SimpleReturn:
if spread == "":
spread = "wide"
if self._simulator:
return await self._simulate_handler.dispense(
resources,
@@ -275,16 +295,33 @@ class LiquidHandlerMiddleware(LiquidHandler):
spread,
**backend_kwargs,
)
await super().dispense(
resources,
vols,
use_channels,
flow_rates,
offsets,
liquid_height,
blow_out_air_volume,
**backend_kwargs,
)
try:
await super().dispense(
resources,
vols,
use_channels,
flow_rates,
offsets,
liquid_height,
blow_out_air_volume,
spread,
**backend_kwargs,
)
except ValueError as e:
if "Resource is too small to space channels" in str(e) and spread != "custom":
await super().dispense(
resources,
vols,
use_channels,
flow_rates,
offsets,
liquid_height,
blow_out_air_volume,
"custom",
**backend_kwargs,
)
else:
raise
res_samples = []
res_volumes = []
for resource, volume, channel in zip(resources, vols, use_channels):
@@ -702,10 +739,13 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
如果 liquid_names 和 volumes 为空,但 plate 和 well_names 不为空,直接返回 plate 和 wells。
"""
assert issubclass(plate.__class__, Plate), "plate must be a Plate"
plate: Plate = cast(Plate, cast(Resource, plate))
assert issubclass(plate.__class__, Plate) or issubclass(plate.__class__, TubeRack) , f"plate must be a Plate, now: {type(plate)}"
plate: Union[Plate, TubeRack]
# 根据 well_names 获取对应的 Well 对象
wells = [plate.get_well(name) for name in well_names]
if issubclass(plate.__class__, Plate):
wells = [plate.get_well(name) for name in well_names]
elif issubclass(plate.__class__, TubeRack):
wells = [plate.get_tube(name) for name in well_names]
res_volumes = []
# 如果 liquid_names 和 volumes 都为空,直接返回
@@ -851,7 +891,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
for _ in range(len(sources)):
tip = []
for __ in range(len(use_channels)):
tip.extend(next(self.current_tip))
tip.extend(self._get_next_tip())
await self.pick_up_tips(tip)
await self.aspirate(
resources=[sources[_]],
@@ -891,7 +931,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
for i in range(0, len(sources), 8):
tip = []
for _ in range(len(use_channels)):
tip.extend(next(self.current_tip))
tip.extend(self._get_next_tip())
await self.pick_up_tips(tip)
current_targets = waste_liquid[i : i + 8]
current_reagent_sources = sources[i : i + 8]
@@ -985,7 +1025,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
for _ in range(len(targets)):
tip = []
for x in range(len(use_channels)):
tip.extend(next(self.current_tip))
tip.extend(self._get_next_tip())
await self.pick_up_tips(tip)
await self.aspirate(
@@ -1037,7 +1077,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
for i in range(0, len(targets), 8):
tip = []
for _ in range(len(use_channels)):
tip.extend(next(self.current_tip))
tip.extend(self._get_next_tip())
await self.pick_up_tips(tip)
current_targets = targets[i : i + 8]
current_reagent_sources = reagent_sources[i : i + 8]
@@ -1162,11 +1202,19 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
Number of mix cycles. If *None* (default) no mixing occurs regardless of
mix_stage.
"""
num_sources = len(sources)
num_targets = len(targets)
len_asp_vols = len(asp_vols)
len_dis_vols = len(dis_vols)
# 确保 use_channels 有默认值
if use_channels is None:
# 默认使用设备所有通道(例如 8 通道移液站默认就是 0-7
use_channels = list(range(self.channel_num)) if self.channel_num > 0 else [0]
use_channels = list(range(self.channel_num)) if self.channel_num == 8 else [0]
elif len(use_channels) == 8:
if self.channel_num != 8:
raise ValueError(f"if channel_num is 8, use_channels length must be 8, but got {len(use_channels)}")
if num_sources%8 != 0 or num_targets%8 != 0 or len_asp_vols%8 != 0 or len_dis_vols%8 != 0:
raise ValueError(f"if channel_num is 8, sources, targets, asp_vols, and dis_vols length must be divisible by 8, but got {num_sources}, {num_targets}, {len_asp_vols}, and {len_dis_vols}")
if is_96_well:
pass # This mode is not verified.
@@ -1200,87 +1248,228 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
# 识别传输模式mix_times 为 None 也应该能正常移液,只是不做 mix
num_sources = len(sources)
num_targets = len(targets)
len_asp_vols = len(asp_vols)
len_dis_vols = len(dis_vols)
if num_sources == 1 and num_targets > 1:
# 模式1: 一对多 (1 source -> N targets)
await self._transfer_one_to_many(
sources[0],
targets,
tip_racks,
use_channels,
asp_vols,
dis_vols,
asp_flow_rates,
dis_flow_rates,
offsets,
touch_tip,
liquid_height,
blow_out_air_volume,
spread,
mix_stage,
mix_times,
mix_vol,
mix_rate,
mix_liquid_height,
delays,
)
elif num_sources > 1 and num_targets == 1:
# 模式2: 多对一 (N sources -> 1 target)
await self._transfer_many_to_one(
sources,
targets[0],
tip_racks,
use_channels,
asp_vols,
dis_vols,
asp_flow_rates,
dis_flow_rates,
offsets,
touch_tip,
liquid_height,
blow_out_air_volume,
spread,
mix_stage,
mix_times,
mix_vol,
mix_rate,
mix_liquid_height,
delays,
)
elif num_sources == num_targets:
# 模式3: 一对一 (N sources -> N targets)
await self._transfer_one_to_one(
sources,
targets,
tip_racks,
use_channels,
asp_vols,
dis_vols,
asp_flow_rates,
dis_flow_rates,
offsets,
touch_tip,
liquid_height,
blow_out_air_volume,
spread,
mix_stage,
mix_times,
mix_vol,
mix_rate,
mix_liquid_height,
delays,
)
else:
raise ValueError(
f"Unsupported transfer mode: {num_sources} sources -> {num_targets} targets. "
"Supported modes: 1->N, N->1, or N->N."
)
# if num_targets != 1 and num_sources != 1:
# if len_asp_vols != num_sources and len_asp_vols != num_targets:
# raise ValueError(f"asp_vols length must be equal to sources or targets length, but got {len_asp_vols} and {num_sources} and {num_targets}")
# if len_dis_vols != num_sources and len_dis_vols != num_targets:
# raise ValueError(f"dis_vols length must be equal to sources or targets length, but got {len_dis_vols} and {num_sources} and {num_targets}")
if len(use_channels) != 8:
max_len = max(num_sources, num_targets)
for i in range(max_len):
# 辅助函数安全地从列表中获取元素如果列表为空则返回None
def safe_get(lst, idx, default=None):
return [lst[idx]] if lst else default
# 动态构建参数字典,只传递实际提供的参数
kwargs = {
'sources': [sources[i%num_sources]],
'targets': [targets[i%num_targets]],
'tip_racks': tip_racks,
'use_channels': use_channels,
'asp_vols': [asp_vols[i%len_asp_vols]],
'dis_vols': [dis_vols[i%len_dis_vols]],
}
# 条件性添加可选参数
if asp_flow_rates is not None:
kwargs['asp_flow_rates'] = [asp_flow_rates[i%len_asp_vols]]
if dis_flow_rates is not None:
kwargs['dis_flow_rates'] = [dis_flow_rates[i%len_dis_vols]]
if offsets is not None:
kwargs['offsets'] = safe_get(offsets, i)
if touch_tip is not None:
kwargs['touch_tip'] = touch_tip if touch_tip else False
if liquid_height is not None:
kwargs['liquid_height'] = safe_get(liquid_height, i)
if blow_out_air_volume is not None:
kwargs['blow_out_air_volume'] = safe_get(blow_out_air_volume, i)
if spread is not None:
kwargs['spread'] = spread
if mix_stage is not None:
kwargs['mix_stage'] = safe_get(mix_stage, i)
if mix_times is not None:
kwargs['mix_times'] = safe_get(mix_times, i)
if mix_vol is not None:
kwargs['mix_vol'] = safe_get(mix_vol, i)
if mix_rate is not None:
kwargs['mix_rate'] = safe_get(mix_rate, i)
if mix_liquid_height is not None:
kwargs['mix_liquid_height'] = safe_get(mix_liquid_height, i)
if delays is not None:
kwargs['delays'] = safe_get(delays, i)
await self._transfer_base_method(**kwargs)
# if num_sources == 1 and num_targets > 1:
# # 模式1: 一对多 (1 source -> N targets)
# await self._transfer_one_to_many(
# sources,
# targets,
# tip_racks,
# use_channels,
# asp_vols,
# dis_vols,
# asp_flow_rates,
# dis_flow_rates,
# offsets,
# touch_tip,
# liquid_height,
# blow_out_air_volume,
# spread,
# mix_stage,
# mix_times,
# mix_vol,
# mix_rate,
# mix_liquid_height,
# delays,
# )
# elif num_sources > 1 and num_targets == 1:
# # 模式2: 多对一 (N sources -> 1 target)
# await self._transfer_many_to_one(
# sources,
# targets[0],
# tip_racks,
# use_channels,
# asp_vols,
# dis_vols,
# asp_flow_rates,
# dis_flow_rates,
# offsets,
# touch_tip,
# liquid_height,
# blow_out_air_volume,
# spread,
# mix_stage,
# mix_times,
# mix_vol,
# mix_rate,
# mix_liquid_height,
# delays,
# )
# elif num_sources == num_targets:
# # 模式3: 一对一 (N sources -> N targets)
# await self._transfer_one_to_one(
# sources,
# targets,
# tip_racks,
# use_channels,
# asp_vols,
# dis_vols,
# asp_flow_rates,
# dis_flow_rates,
# offsets,
# touch_tip,
# liquid_height,
# blow_out_air_volume,
# spread,
# mix_stage,
# mix_times,
# mix_vol,
# mix_rate,
# mix_liquid_height,
# delays,
# )
# else:
# raise ValueError(
# f"Unsupported transfer mode: {num_sources} sources -> {num_targets} targets. "
# "Supported modes: 1->N, N->1, or N->N."
# )
return TransferLiquidReturn(
sources=ResourceTreeSet.from_plr_resources(list(sources), known_newly_created=False).dump(), # type: ignore
targets=ResourceTreeSet.from_plr_resources(list(targets), known_newly_created=False).dump(), # type: ignore
)
async def _transfer_base_method(
self,
sources: Sequence[Container],
targets: Sequence[Container],
tip_racks: Sequence[TipRack],
use_channels: List[int],
asp_vols: List[float],
dis_vols: List[float],
**kwargs
):
# 从kwargs中提取参数提供默认值
asp_flow_rates = kwargs.get('asp_flow_rates')
dis_flow_rates = kwargs.get('dis_flow_rates')
offsets = kwargs.get('offsets')
touch_tip = kwargs.get('touch_tip', False)
liquid_height = kwargs.get('liquid_height')
blow_out_air_volume = kwargs.get('blow_out_air_volume')
spread = kwargs.get('spread', 'wide')
mix_stage = kwargs.get('mix_stage')
mix_times = kwargs.get('mix_times')
mix_vol = kwargs.get('mix_vol')
mix_rate = kwargs.get('mix_rate')
mix_liquid_height = kwargs.get('mix_liquid_height')
delays = kwargs.get('delays')
tip = []
tip.extend(self._get_next_tip())
await self.pick_up_tips(tip)
if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0:
await self.mix(
targets=[targets[0]],
mix_time=mix_times,
mix_vol=mix_vol,
offsets=offsets if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
use_channels=use_channels,
)
await self.aspirate(
resources=[sources[0]],
vols=[asp_vols[0]],
use_channels=use_channels,
flow_rates=[asp_flow_rates[0]] if asp_flow_rates and len(asp_flow_rates) > 0 else None,
offsets=[offsets[0]] if offsets and len(offsets) > 0 else None,
liquid_height=[liquid_height[0]] if liquid_height and len(liquid_height) > 0 else None,
blow_out_air_volume=(
[blow_out_air_volume[0]] if blow_out_air_volume and len(blow_out_air_volume) > 0 else None
),
spread=spread,
)
if delays is not None:
await self.custom_delay(seconds=delays[0])
await self.dispense(
resources=[targets[0]],
vols=[dis_vols[0]],
use_channels=use_channels,
flow_rates=[dis_flow_rates[0]] if dis_flow_rates and len(dis_flow_rates) > 0 else None,
offsets=[offsets[0]] if offsets and len(offsets) > 0 else None,
blow_out_air_volume=(
[blow_out_air_volume[0]] if blow_out_air_volume and len(blow_out_air_volume) > 0 else None
),
liquid_height=[liquid_height[0]] if liquid_height and len(liquid_height) > 0 else None,
spread=spread,
)
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1])
if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0:
await self.mix(
targets=[targets[0]],
mix_time=mix_times,
mix_vol=mix_vol,
offsets=offsets if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
use_channels=use_channels,
)
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[0])
await self.touch_tip(targets[0])
await self.discard_tips(use_channels=use_channels)
async def _transfer_one_to_one(
self,
sources: Sequence[Container],
@@ -1322,7 +1511,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
for _ in range(len(targets)):
tip = []
for ___ in range(len(use_channels)):
tip.extend(next(self.current_tip))
tip.extend(self._get_next_tip())
await self.pick_up_tips(tip)
if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0:
@@ -1386,7 +1575,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
for i in range(0, len(targets), 8):
tip = []
for _ in range(len(use_channels)):
tip.extend(next(self.current_tip))
tip.extend(self._get_next_tip())
await self.pick_up_tips(tip)
current_targets = targets[i : i + 8]
current_reagent_sources = sources[i : i + 8]
@@ -1491,7 +1680,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
# 单通道模式:一次吸液,多次分液
tip = []
for _ in range(len(use_channels)):
tip.extend(next(self.current_tip))
tip.extend(self._get_next_tip())
await self.pick_up_tips(tip)
if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0:
@@ -1563,7 +1752,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
for i in range(0, len(targets), 8):
tip = []
for _ in range(len(use_channels)):
tip.extend(next(self.current_tip))
tip.extend(self._get_next_tip())
await self.pick_up_tips(tip)
current_targets = targets[i : i + 8]
@@ -1702,7 +1891,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0:
tip = []
for _ in range(len(use_channels)):
tip.extend(next(self.current_tip))
tip.extend(self._get_next_tip())
await self.pick_up_tips(tip)
await self.mix(
@@ -1721,7 +1910,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
for idx, source in enumerate(sources):
tip = []
for _ in range(len(use_channels)):
tip.extend(next(self.current_tip))
tip.extend(self._get_next_tip())
await self.pick_up_tips(tip)
await self.aspirate(
@@ -1804,7 +1993,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0:
tip = []
for _ in range(len(use_channels)):
tip.extend(next(self.current_tip))
tip.extend(self._get_next_tip())
await self.pick_up_tips(tip)
await self.mix(
@@ -1822,7 +2011,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
for i in range(0, len(sources), 8):
tip = []
for _ in range(len(use_channels)):
tip.extend(next(self.current_tip))
tip.extend(self._get_next_tip())
await self.pick_up_tips(tip)
current_sources = sources[i : i + 8]
@@ -2022,7 +2211,14 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
for rack in tip_racks:
for tip in rack:
yield tip
raise RuntimeError("Out of tips!")
# raise RuntimeError("Out of tips!")
def _get_next_tip(self):
"""从 current_tip 迭代器获取下一个 tip耗尽时抛出明确错误而非 StopIteration"""
try:
return next(self.current_tip)
except StopIteration as e:
raise RuntimeError("Tip rack exhausted: no more tips available for transfer") from e
def set_tiprack(self, tip_racks: Sequence[TipRack]):
"""Set the tip racks for the liquid handler."""

View File

@@ -928,57 +928,35 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
none_keys: List[str] = [],
) -> TransferLiquidReturn:
if self.step_mode:
self._unilabos_backend.create_protocol(f"step_mode_protocol_{time.time()}")
await self.create_protocol(f"step_mode_protocol_{time.time()}")
res =await super().transfer_liquid(
sources,
targets,
tip_racks,
use_channels=use_channels,
asp_vols=asp_vols,
dis_vols=dis_vols,
asp_flow_rates=asp_flow_rates,
dis_flow_rates=dis_flow_rates,
offsets=offsets,
touch_tip=touch_tip,
liquid_height=liquid_height,
blow_out_air_volume=blow_out_air_volume,
spread=spread,
is_96_well=is_96_well,
mix_stage=mix_stage,
mix_times=mix_times,
mix_vol=mix_vol,
mix_rate=mix_rate,
mix_liquid_height=mix_liquid_height,
delays=delays,
none_keys=none_keys,
)
self._unilabos_backend.run_protocol()
return res
else:
return await super().transfer_liquid(
sources,
targets,
tip_racks,
use_channels=use_channels,
asp_vols=asp_vols,
dis_vols=dis_vols,
asp_flow_rates=asp_flow_rates,
dis_flow_rates=dis_flow_rates,
offsets=offsets,
touch_tip=touch_tip,
liquid_height=liquid_height,
blow_out_air_volume=blow_out_air_volume,
spread=spread,
is_96_well=is_96_well,
mix_stage=mix_stage,
mix_times=mix_times,
mix_vol=mix_vol,
mix_rate=mix_rate,
mix_liquid_height=mix_liquid_height,
delays=delays,
none_keys=none_keys,
)
res =await super().transfer_liquid(
sources,
targets,
tip_racks,
use_channels=use_channels,
asp_vols=asp_vols,
dis_vols=dis_vols,
asp_flow_rates=asp_flow_rates,
dis_flow_rates=dis_flow_rates,
offsets=offsets,
touch_tip=touch_tip,
liquid_height=liquid_height,
blow_out_air_volume=blow_out_air_volume,
spread=spread,
is_96_well=is_96_well,
mix_stage=mix_stage,
mix_times=mix_times,
mix_vol=mix_vol,
mix_rate=mix_rate,
mix_liquid_height=mix_liquid_height,
delays=delays,
none_keys=none_keys,
)
if self.step_mode:
await self.run_protocol()
return res
async def custom_delay(self, seconds=0, msg=None):
return await super().custom_delay(seconds, msg)
@@ -1028,18 +1006,32 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
spread: Literal["wide", "tight", "custom"] = "wide",
**backend_kwargs,
):
return await super().aspirate(
resources,
vols,
use_channels,
flow_rates,
offsets,
liquid_height,
blow_out_air_volume,
spread,
**backend_kwargs,
)
try:
return await super().aspirate(
resources,
vols,
use_channels,
flow_rates,
offsets,
liquid_height,
blow_out_air_volume,
spread,
**backend_kwargs,
)
except ValueError as e:
if "Resource is too small to space channels" in str(e) and spread != "custom":
return await super().aspirate(
resources,
vols,
use_channels,
flow_rates,
offsets,
liquid_height,
blow_out_air_volume,
spread="custom",
**backend_kwargs,
)
raise
async def drop_tips(
self,
@@ -1063,17 +1055,33 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
spread: Literal["wide", "tight", "custom"] = "wide",
**backend_kwargs,
):
return await super().dispense(
resources,
vols,
use_channels,
flow_rates,
offsets,
liquid_height,
blow_out_air_volume,
spread,
**backend_kwargs,
)
try:
return await super().dispense(
resources,
vols,
use_channels,
flow_rates,
offsets,
liquid_height,
blow_out_air_volume,
spread,
**backend_kwargs,
)
except ValueError as e:
if "Resource is too small to space channels" in str(e) and spread != "custom":
# 目标资源过小无法分布多通道时,退化为 custom所有通道对准中心
return await super().dispense(
resources,
vols,
use_channels,
flow_rates,
offsets,
liquid_height,
blow_out_air_volume,
"custom",
**backend_kwargs,
)
raise
async def discard_tips(
self,

View File

@@ -96,10 +96,13 @@ serial:
type: string
port:
type: string
registry_name:
type: string
resource_tracker:
type: object
required:
- device_id
- registry_name
- port
type: object
data:

View File

@@ -67,6 +67,9 @@ camera:
period:
default: 0.1
type: number
registry_name:
default: ''
type: string
resource_tracker:
type: object
required: []

View File

@@ -4134,19 +4134,8 @@ liquid_handler:
data_source: handle
data_type: resource
handler_key: sources
label: sources
label: 待移动液体
- data_key: targets
data_source: handle
data_type: resource
handler_key: targets
label: targets
- data_key: tip_racks
data_source: handle
data_type: resource
handler_key: tip_racks
label: tip_racks
output:
- data_key: sources
data_source: handle
data_type: resource
handler_key: targets
@@ -4161,9 +4150,9 @@ liquid_handler:
data_source: executor
data_type: resource
handler_key: sources_out
label: sources
- data_key: targets
data_source: handle
label: 移液后源孔
- data_key: targets.@flatten
data_source: executor
data_type: resource
handler_key: targets_out
label: 移液后目标孔
@@ -4812,13 +4801,13 @@ liquid_handler.biomek:
targets: ''
handles:
input:
- data_key: sources
- data_key: liquid
data_source: handle
data_type: resource
handler_key: sources
label: sources
output:
- data_key: targets
- data_key: liquid
data_source: handle
data_type: resource
handler_key: targets
@@ -4971,21 +4960,21 @@ liquid_handler.biomek:
volume: 0.0
handles:
input:
- data_key: sources
- data_key: liquid
data_source: handle
data_type: resource
handler_key: sources
label: sources
- data_key: targets
data_source: handle
- data_key: liquid
data_source: executor
data_type: resource
handler_key: targets
label: targets
- data_key: tip_racks
data_source: handle
- data_key: liquid
data_source: executor
data_type: resource
handler_key: tip_racks
label: tip_racks
handler_key: tip_rack
label: tip_rack
output:
- data_key: sources
data_source: handle
@@ -5166,28 +5155,30 @@ liquid_handler.biomek:
data_source: handle
data_type: resource
handler_key: sources
label: sources
io_type: target
label: 待移动液体
- data_key: targets
data_source: handle
data_type: resource
handler_key: targets
label: targets
label: 转移目标
- data_key: tip_racks
data_source: handle
data_type: resource
handler_key: tip_racks
label: tip_racks
handler_key: tip_rack
label: 枪头盒
output:
- data_key: sources
data_source: handle
- data_key: sources.@flatten
data_source: executor
data_type: resource
handler_key: sources_out
label: sources
- data_key: targets
data_source: handle
io_type: source
label: 移液后源孔
- data_key: targets.@flatten
data_source: executor
data_type: resource
handler_key: targets_out
label: targets
label: 移液后目标孔
placeholder_keys:
sources: unilabos_resources
targets: unilabos_resources
@@ -7735,31 +7726,6 @@ liquid_handler.prcxi:
title: move_to参数
type: object
type: UniLabJsonCommandAsync
auto-plr_pos_to_prcxi:
feedback: {}
goal: {}
goal_default:
resource: null
handles: {}
placeholder_keys: {}
result: {}
schema:
description: ''
properties:
feedback: {}
goal:
properties:
resource:
type: object
required:
- resource
type: object
result: {}
required:
- goal
title: plr_pos_to_prcxi参数
type: object
type: UniLabJsonCommand
auto-post_init:
feedback: {}
goal: {}
@@ -8655,19 +8621,7 @@ liquid_handler.prcxi:
z: 0.0
sample_id: ''
type: ''
handles:
input:
- data_key: plate
data_source: handle
data_type: resource
handler_key: plate
label: plate
output:
- data_key: plate
data_source: handle
data_type: resource
handler_key: plate
label: plate
handles: {}
placeholder_keys:
plate: unilabos_resources
to: unilabos_resources
@@ -10276,26 +10230,26 @@ liquid_handler.prcxi:
- data_key: sources
data_source: handle
data_type: resource
handler_key: sources
label: sources
handler_key: sources_identifier
label: 待移动液体
- data_key: targets
data_source: handle
data_type: resource
handler_key: targets
label: targets
handler_key: targets_identifier
label: 转移目标
- data_key: tip_racks
data_source: handle
data_type: resource
handler_key: tip_racks
label: tip_racks
handler_key: tip_rack_identifier
label: 枪头盒
output:
- data_key: sources
data_source: handle
- data_key: sources.@flatten
data_source: executor
data_type: resource
handler_key: sources_out
label: sources
- data_key: targets
data_source: handle
label: 移液后源孔
- data_key: targets.@flatten
data_source: executor
data_type: resource
handler_key: targets_out
label: 移液后目标孔
@@ -10679,12 +10633,6 @@ liquid_handler.prcxi:
type: string
deck:
type: object
deck_y:
default: 400
type: string
deck_z:
default: 300
type: string
host:
type: string
is_9320:
@@ -10695,44 +10643,17 @@ liquid_handler.prcxi:
type: string
port:
type: integer
rail_interval:
default: 0
type: string
rail_nums:
default: 4
type: string
rail_width:
default: 27.5
type: string
setup:
default: true
type: string
simulator:
default: false
type: string
start_rail:
default: 2
type: string
step_mode:
default: false
type: string
timeout:
type: number
x_increase:
default: -0.003636
type: string
x_offset:
default: -0.8
type: string
xy_coupling:
default: -0.0045
type: string
y_increase:
default: -0.003636
type: string
y_offset:
default: -37.98
type: string
required:
- deck
- host

View File

@@ -5,6 +5,7 @@ import sys
import inspect
import importlib
import threading
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, Dict, List, Union, Tuple
@@ -88,6 +89,14 @@ class Registry:
)
test_latency_schema["description"] = "用于测试延迟的动作,返回延迟时间和时间差。"
test_resource_method_info = host_node_enhanced_info.get("action_methods", {}).get("test_resource", {})
test_resource_schema = self._generate_unilab_json_command_schema(
test_resource_method_info.get("args", []),
"test_resource",
test_resource_method_info.get("return_annotation"),
)
test_resource_schema["description"] = "用于测试物料、设备和样本。"
self.device_type_registry.update(
{
"host_node": {
@@ -189,32 +198,7 @@ class Registry:
"goal": {},
"feedback": {},
"result": {},
"schema": {
"description": "",
"properties": {
"feedback": {},
"goal": {
"properties": {
"resource": ros_message_to_json_schema(Resource, "resource"),
"resources": {
"items": {
"properties": ros_message_to_json_schema(
Resource, "resources"
),
"type": "object",
},
"type": "array",
},
"device": {"type": "string"},
"devices": {"items": {"type": "string"}, "type": "array"},
},
"type": "object",
},
"result": {},
},
"title": "test_resource",
"type": "object",
},
"schema": test_resource_schema,
"placeholder_keys": {
"device": "unilabos_devices",
"devices": "unilabos_devices",
@@ -944,6 +928,7 @@ class Registry:
if is_valid:
results.append((file, data, device_ids))
except Exception as e:
traceback.print_exc()
logger.warning(f"[UniLab Registry] 处理设备文件异常: {file}, 错误: {e}")
# 线程安全地更新注册表

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because one or more lines are too long

View File

@@ -38,24 +38,52 @@ class LabSample(TypedDict):
extra: Dict[str, Any]
class ResourceDictPositionSizeType(TypedDict):
depth: float
width: float
height: float
class ResourceDictPositionSize(BaseModel):
depth: float = Field(description="Depth", default=0.0) # z
width: float = Field(description="Width", default=0.0) # x
height: float = Field(description="Height", default=0.0) # y
class ResourceDictPositionScaleType(TypedDict):
x: float
y: float
z: float
class ResourceDictPositionScale(BaseModel):
x: float = Field(description="x scale", default=0.0)
y: float = Field(description="y scale", default=0.0)
z: float = Field(description="z scale", default=0.0)
class ResourceDictPositionObjectType(TypedDict):
x: float
y: float
z: float
class ResourceDictPositionObject(BaseModel):
x: float = Field(description="X coordinate", default=0.0)
y: float = Field(description="Y coordinate", default=0.0)
z: float = Field(description="Z coordinate", default=0.0)
class ResourceDictPositionType(TypedDict):
size: ResourceDictPositionSizeType
scale: ResourceDictPositionScaleType
layout: Literal["2d", "x-y", "z-y", "x-z"]
position: ResourceDictPositionObjectType
position3d: ResourceDictPositionObjectType
rotation: ResourceDictPositionObjectType
cross_section_type: Literal["rectangle", "circle", "rounded_rectangle"]
class ResourceDictPosition(BaseModel):
size: ResourceDictPositionSize = Field(description="Resource size", default_factory=ResourceDictPositionSize)
scale: ResourceDictPositionScale = Field(description="Resource scale", default_factory=ResourceDictPositionScale)
@@ -74,6 +102,24 @@ class ResourceDictPosition(BaseModel):
)
class ResourceDictType(TypedDict):
id: str
uuid: str
name: str
description: str
resource_schema: Dict[str, Any]
model: Dict[str, Any]
icon: str
parent_uuid: Optional[str]
parent: Optional["ResourceDictType"]
type: Union[Literal["device"], str]
klass: str
pose: ResourceDictPositionType
config: Dict[str, Any]
data: Dict[str, Any]
extra: Dict[str, Any]
# 统一的资源字典模型parent 自动序列化为 parent_uuidchildren 不序列化
class ResourceDict(BaseModel):
id: str = Field(description="Resource ID")
@@ -468,10 +514,17 @@ class ResourceTreeSet(object):
trees.append(tree_instance)
return cls(trees)
def to_plr_resources(self, skip_devices=True) -> List["PLRResource"]:
def to_plr_resources(
self, skip_devices: bool = True, requested_uuids: Optional[List[str]] = None
) -> List["PLRResource"]:
"""
将 ResourceTreeSet 转换为 PLR 资源列表
Args:
skip_devices: 是否跳过 device 类型节点
requested_uuids: 若指定,则按此 UUID 顺序返回对应资源(用于批量查询时一一对应),
否则返回各树的根节点列表
Returns:
List[PLRResource]: PLR 资源实例列表
"""
@@ -526,6 +579,71 @@ class ResourceTreeSet(object):
d["model"] = res.config.get("model", None)
return d
# deserialize 会单独处理的元数据 key不传给构造函数
_META_KEYS = {"type", "parent_name", "location", "children", "rotation", "barcode"}
# deserialize 自定义逻辑使用的 key如 TipSpot 用 prototype_tip 构建 make_tip需保留
_DESERIALIZE_PRESERVED_KEYS = {"prototype_tip"}
def remove_incompatible_params(plr_d: dict) -> None:
"""递归移除 PLR 类不接受的参数,避免 deserialize 报错。
- 移除构造函数不接受的参数(如 compute_height_from_volume、ordering、category
- 对 TubeRack将 ordering 转为 ordered_items
- 保留 deserialize 自定义逻辑需要的 key如 prototype_tip
"""
if "type" in plr_d:
sub_cls = find_subclass(plr_d["type"], PLRResource)
if sub_cls is not None:
spec = inspect.signature(sub_cls)
valid_params = set(spec.parameters.keys())
# TubeRack 特殊处理:先转换 ordering再参与后续过滤
if "ordering" not in valid_params and "ordering" in plr_d:
ordering = plr_d.pop("ordering", None)
if sub_cls.__name__ == "TubeRack":
plr_d["ordered_items"] = (
_ordering_to_ordered_items(plr_d, ordering)
if ordering
else {}
)
# 移除构造函数不接受的参数(保留 META 和 deserialize 自定义逻辑需要的 key
for key in list(plr_d.keys()):
if (
key not in _META_KEYS
and key not in _DESERIALIZE_PRESERVED_KEYS
and key not in valid_params
):
plr_d.pop(key, None)
for child in plr_d.get("children", []):
remove_incompatible_params(child)
def _ordering_to_ordered_items(plr_d: dict, ordering: dict) -> dict:
"""将 ordering 转为 ordered_items从 children 构建 Tube 对象"""
from pylabrobot.resources import Tube, Coordinate
from pylabrobot.serializer import deserialize as plr_deserialize
children = plr_d.get("children", [])
ordered_items = {}
for idx, (ident, child_name) in enumerate(ordering.items()):
child_data = children[idx] if idx < len(children) else None
if child_data is None:
continue
loc_data = child_data.get("location")
loc = (
plr_deserialize(loc_data)
if loc_data
else Coordinate(0, 0, 0)
)
tube = Tube(
name=child_data.get("name", child_name or ident),
size_x=child_data.get("size_x", 10),
size_y=child_data.get("size_y", 10),
size_z=child_data.get("size_z", 50),
max_volume=child_data.get("max_volume", 1000),
)
tube.location = loc
ordered_items[ident] = tube
plr_d["children"] = [] # 已并入 ordered_items避免重复反序列化
return ordered_items
plr_resources = []
tracker = DeviceNodeResourceTracker()
@@ -545,9 +663,7 @@ class ResourceTreeSet(object):
raise ValueError(
f"无法找到类型 {plr_dict['type']} 对应的 PLR 资源类。原始信息:{tree.root_node.res_content}"
)
spec = inspect.signature(sub_cls)
if "category" not in spec.parameters:
plr_dict.pop("category", None)
remove_incompatible_params(plr_dict)
plr_resource = sub_cls.deserialize(plr_dict, allow_marshal=True)
from pylabrobot.resources import Coordinate
from pylabrobot.serializer import deserialize
@@ -567,6 +683,18 @@ class ResourceTreeSet(object):
logger.error(f"堆栈: {traceback.format_exc()}")
raise
if requested_uuids:
# 按请求的 UUID 顺序返回对应资源(从整棵树中按 uuid 提取)
result = []
for uid in requested_uuids:
if uid in tracker.uuid_to_resources:
result.append(tracker.uuid_to_resources[uid])
else:
raise ValueError(
f"请求的 UUID {uid} 在资源树中未找到。"
f"可用 UUID 数量: {len(tracker.uuid_to_resources)}"
)
return result
return plr_resources
@classmethod

View File

@@ -460,7 +460,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
}
res.response = json.dumps(final_response)
# 如果driver自己就有assign的方法那就使用driver自己的assign方法
if hasattr(self.driver_instance, "create_resource"):
if hasattr(self.driver_instance, "create_resource") and self.node_name != "host_node":
create_resource_func = getattr(self.driver_instance, "create_resource")
try:
ret = create_resource_func(
@@ -1389,9 +1389,13 @@ class BaseROS2DeviceNode(Node, Generic[T]):
if uuid_indices:
uuids = [item[1] for item in uuid_indices]
resource_tree = await self.get_resource(uuids)
plr_resources = resource_tree.to_plr_resources()
plr_resources = resource_tree.to_plr_resources(requested_uuids=uuids)
for i, (idx, _, resource_data) in enumerate(uuid_indices):
plr_resource = plr_resources[i]
try:
plr_resource = plr_resources[i]
except Exception as e:
self.lab_logger().error(f"资源查询结果: 共 {len(queried_resources)} 个资源,但查询结果只有 {len(plr_resources)} 个资源,索引为 {i} 的资源不存在")
raise e
if "sample_id" in resource_data:
plr_resource.unilabos_extra[EXTRA_SAMPLE_UUID] = resource_data["sample_id"]
queried_resources[idx] = plr_resource

View File

@@ -35,7 +35,7 @@ from unilabos.resources.resource_tracker import (
ResourceTreeInstance,
RETURN_UNILABOS_SAMPLES,
JSON_UNILABOS_PARAM,
PARAM_SAMPLE_UUIDS,
PARAM_SAMPLE_UUIDS, SampleUUIDsType, LabSample,
)
from unilabos.ros.initialize_device import initialize_device_from_dict
from unilabos.ros.msgs.message_converter import (
@@ -51,6 +51,7 @@ from unilabos.utils import logger
from unilabos.utils.exception import DeviceClassInvalid
from unilabos.utils.log import warning
from unilabos.utils.type_check import serialize_result_info
from unilabos.config.config import BasicConfig
if TYPE_CHECKING:
from unilabos.app.ws_client import QueueItem
@@ -63,7 +64,8 @@ class DeviceActionStatus:
class TestResourceReturn(TypedDict):
resources: List[List[ResourceDict]]
devices: List[DeviceSlot]
devices: List[Dict[str, Any]]
unilabos_samples: List[LabSample]
class TestLatencyReturn(TypedDict):
@@ -776,6 +778,17 @@ class HostNode(BaseROS2DeviceNode):
u = uuid.UUID(item.job_id)
device_id = item.device_id
action_name = item.action_name
if BasicConfig.test_mode:
action_id = f"/devices/{device_id}/{action_name}"
self.lab_logger().info(
f"[TEST MODE] 模拟执行: {action_id} (job={item.job_id[:8]}), 参数: {str(action_kwargs)[:500]}"
)
# 根据注册表 handles 构建模拟返回值
mock_return = self._build_test_mode_return(device_id, action_name, action_kwargs)
self._handle_test_mode_result(item, action_id, mock_return)
return
if action_type.startswith("UniLabJsonCommand"):
if action_name.startswith("auto-"):
action_name = action_name[5:]
@@ -813,6 +826,51 @@ class HostNode(BaseROS2DeviceNode):
)
future.add_done_callback(lambda f: self.goal_response_callback(item, action_id, f))
def _build_test_mode_return(
self, device_id: str, action_name: str, action_kwargs: Dict[str, Any]
) -> Dict[str, Any]:
"""
根据注册表 handles 的 output 定义构建测试模式的模拟返回值
根据 data_key 中 @flatten 的层数决定嵌套数组层数,叶子值为空字典。
例如: "vessel"{}, "plate.@flatten" → [{}], "a.@flatten.@flatten" → [[{}]]
"""
mock_return: Dict[str, Any] = {"test_mode": True, "action_name": action_name}
action_mappings = self._action_value_mappings.get(device_id, {})
action_mapping = action_mappings.get(action_name, {})
handles = action_mapping.get("handles", {})
if isinstance(handles, dict):
for output_handle in handles.get("output", []):
data_key = output_handle.get("data_key", "")
handler_key = output_handle.get("handler_key", "")
# 根据 @flatten 层数构建嵌套数组,叶子为空字典
flatten_count = data_key.count("@flatten")
value: Any = {}
for _ in range(flatten_count):
value = [value]
mock_return[handler_key] = value
return mock_return
def _handle_test_mode_result(
self, item: "QueueItem", action_id: str, mock_return: Dict[str, Any]
) -> None:
"""
测试模式下直接构建结果并走正常的结果回调流程(跳过 ROS
"""
job_id = item.job_id
status = "success"
return_info = serialize_result_info("", True, mock_return)
self.lab_logger().info(f"[TEST MODE] Result for {action_id} ({job_id[:8]}): {status}")
from unilabos.app.web.controller import store_job_result
store_job_result(job_id, status, return_info, mock_return)
# 发布状态到桥接器
for bridge in self.bridges:
if hasattr(bridge, "publish_job_status"):
bridge.publish_job_status(mock_return, item, status, return_info)
def goal_response_callback(self, item: "QueueItem", action_id: str, future) -> None:
"""目标响应回调"""
goal_handle = future.result()
@@ -1525,6 +1583,7 @@ class HostNode(BaseROS2DeviceNode):
def test_resource(
self,
sample_uuids: SampleUUIDsType,
resource: ResourceSlot = None,
resources: List[ResourceSlot] = None,
device: DeviceSlot = None,
@@ -1539,6 +1598,7 @@ class HostNode(BaseROS2DeviceNode):
return {
"resources": ResourceTreeSet.from_plr_resources([resource, *resources], known_newly_created=True).dump(),
"devices": [device, *devices],
"unilabos_samples": [LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for sample_uuid, content in sample_uuids.items()]
}
def handle_pong_response(self, pong_data: dict):

View File

@@ -21,12 +21,12 @@
},
"host": "10.20.30.184",
"port": 9999,
"debug": true,
"setup": true,
"debug": false,
"setup": false,
"is_9320": true,
"timeout": 10,
"matrix_id": "5de524d0-3f95-406c-86dd-f83626ebc7cb",
"simulator": true,
"simulator": false,
"channel_num": 2
},
"data": {
@@ -789,6 +789,47 @@
]
},
"data": {}
},
{
"id": "trash",
"name": "trash",
"children": [],
"parent": "T16",
"type": "trash",
"class": "",
"position": {
"x": 0,
"y": 0,
"z": 0
},
"config": {
"type": "PRCXI9300Trash",
"size_x": 127.5,
"size_y": 86,
"size_z": 10,
"rotation": {
"x": 0,
"y": 0,
"z": 0,
"type": "Rotation"
},
"category": "trash",
"model": null,
"barcode": null,
"max_volume": "Infinity",
"material_z_thickness": 0,
"compute_volume_from_height": null,
"compute_height_from_volume": null
},
"data": {
"liquids": [],
"pending_liquids": [],
"liquid_history": [],
"Material": {
"uuid": "730067cf07ae43849ddf4034299030e9"
}
}
}
],
"edges": []

View File

@@ -119,11 +119,14 @@ DEVICE_NAME_DEFAULT = "PRCXI" # transfer_liquid, set_liquid_from_plate 等动
# 节点类型
NODE_TYPE_DEFAULT = "ILab" # 所有节点的默认类型
CLASS_NAMES_MAPPING = {
"plate": "PRCXI_BioER_96_wellplate",
"tip_rack": "PRCXI_300ul_Tips",
}
# create_resource 节点默认参数
CREATE_RESOURCE_DEFAULTS = {
"device_id": "/PRCXI",
"parent_template": "/PRCXI/PRCXI_Deck/T{slot}", # {slot} 会被替换为实际的 slot 值
"class_name": "PRCXI_BioER_96_wellplate",
}
# 默认液体体积 (uL)
@@ -362,6 +365,7 @@ def build_protocol_graph(
protocol_steps: List[Dict[str, Any]],
workstation_name: str,
action_resource_mapping: Optional[Dict[str, str]] = None,
labware_defs: Optional[List[Dict[str, Any]]] = None,
) -> WorkflowGraph:
"""统一的协议图构建函数,根据设备类型自动选择构建逻辑
@@ -387,6 +391,7 @@ def build_protocol_graph(
slots_info[slot] = {
"labware": item.get("labware", ""),
"res_id": res_id,
"labware_id": labware_id,
}
# 创建 Group 节点,包含所有 create_resource 节点
@@ -405,17 +410,17 @@ def build_protocol_graph(
)
# 为每个唯一的 slot 创建 create_resource 节点
res_index = 0
for slot, info in slots_info.items():
node_id = str(uuid.uuid4())
res_id = info["res_id"]
res_index += 1
res_type_name = info["labware"].lower().replace(".", "point")
res_type_name = f"lab_{res_type_name}"
G.add_node(
node_id,
template_name="create_resource",
resource_name="host_node",
name=f"Plate {res_index}",
name=f"{res_type_name}_slot{slot}",
description=f"Create plate on slot {slot}",
lab_node_type="Labware",
footer="create_resource-host_node",
@@ -426,14 +431,15 @@ def build_protocol_graph(
param={
"res_id": res_id,
"device_id": CREATE_RESOURCE_DEFAULTS["device_id"],
"class_name": CREATE_RESOURCE_DEFAULTS["class_name"],
"class_name": res_type_name,
"parent": CREATE_RESOURCE_DEFAULTS["parent_template"].format(slot=slot),
"bind_locations": {"x": 0.0, "y": 0.0, "z": 0.0},
"slot_on_deck": slot,
},
)
slot_to_create_resource[slot] = node_id
if "tip" in res_type_name and "rack" in res_type_name:
resource_last_writer[info["labware_id"]] = f"{node_id}:labware"
# create_resource 之间不需要 ready 连接
# ==================== 第二步:为每个 reagent 创建 set_liquid_from_plate 节点 ====================
@@ -516,6 +522,7 @@ def build_protocol_graph(
"reagent": "reagent",
"solvent": "solvent",
"compound": "compound",
"tip_racks": "tip_rack_identifier",
}
OUTPUT_PORT_MAPPING = {

View File

@@ -1,16 +1,20 @@
"""
JSON 工作流转换模块
将 workflow/reagent 格式的 JSON 转换为统一工作流格式。
将 workflow/reagent/labware 格式的 JSON 转换为统一工作流格式。
输入格式:
{
"labware": [
{"name": "...", "slot": "1", "type": "lab_xxx"},
...
],
"workflow": [
{"action": "...", "action_args": {...}},
...
],
"reagent": {
"reagent_name": {"slot": int, "well": [...], "labware": "..."},
"reagent_name": {"slot": int, "well": [...]},
...
}
}
@@ -245,18 +249,18 @@ def convert_from_json(
if "workflow" not in json_data or "reagent" not in json_data:
raise ValueError(
"不支持的 JSON 格式。请使用标准格式:\n"
'{"workflow": [{"action": "...", "action_args": {...}}, ...], '
'"reagent": {"name": {"slot": int, "well": [...], "labware": "..."}, ...}}'
'{"labware": [...], "workflow": [...], "reagent": {...}}'
)
# 提取数据
workflow = json_data["workflow"]
reagent = json_data["reagent"]
labware_defs = json_data.get("labware", []) # 新的 labware 定义列表
# 规范化步骤数据
protocol_steps = normalize_workflow_steps(workflow)
# reagent 已经是字典格式,直接使
# reagent 已经是字典格式,用于 set_liquid 和 well 数量查找
labware_info = reagent
# 构建工作流图
@@ -265,6 +269,7 @@ def convert_from_json(
protocol_steps=protocol_steps,
workstation_name=workstation_name,
action_resource_mapping=ACTION_RESOURCE_MAPPING,
labware_defs=labware_defs,
)
# 校验句柄配置

View File

@@ -41,6 +41,7 @@ def upload_workflow(
workflow_name: Optional[str] = None,
tags: Optional[List[str]] = None,
published: bool = False,
description: str = "",
) -> Dict[str, Any]:
"""
上传工作流到服务器
@@ -56,6 +57,7 @@ def upload_workflow(
workflow_name: 工作流名称,如果不提供则从文件中读取或使用文件名
tags: 工作流标签列表,默认为空列表
published: 是否发布工作流默认为False
description: 工作流描述,发布时使用
Returns:
Dict: API响应数据
@@ -75,6 +77,14 @@ def upload_workflow(
print_status(f"工作流文件JSON解析失败: {e}", "error")
return {"code": -1, "message": f"JSON解析失败: {e}"}
# 从 JSON 文件中提取 description 和 tags作为 fallback
if not description and "description" in workflow_data:
description = workflow_data["description"]
print_status(f"从文件中读取 description", "info")
if not tags and "tags" in workflow_data:
tags = workflow_data["tags"]
print_status(f"从文件中读取 tags: {tags}", "info")
# 自动检测并转换格式
if not _is_node_link_format(workflow_data):
try:
@@ -96,6 +106,7 @@ def upload_workflow(
print_status(f" - 节点数量: {len(nodes)}", "info")
print_status(f" - 边数量: {len(edges)}", "info")
print_status(f" - 标签: {tags or []}", "info")
print_status(f" - 描述: {description[:50]}{'...' if len(description) > 50 else ''}", "info")
print_status(f" - 发布状态: {published}", "info")
# 调用 http_client 上传
@@ -107,6 +118,7 @@ def upload_workflow(
edges=edges,
tags=tags,
published=published,
description=description,
)
if result.get("code") == 0:
@@ -131,8 +143,9 @@ def handle_workflow_upload_command(args_dict: Dict[str, Any]) -> None:
workflow_name = args_dict.get("workflow_name")
tags = args_dict.get("tags", [])
published = args_dict.get("published", False)
description = args_dict.get("description", "")
if workflow_file:
upload_workflow(workflow_file, workflow_name, tags, published)
upload_workflow(workflow_file, workflow_name, tags, published, description)
else:
print_status("未指定工作流文件路径,请使用 -f/--workflow_file 参数", "error")