From a7a6d77d7a1489b401221f05e2d9995b0dc3c9a7 Mon Sep 17 00:00:00 2001 From: yexiaozhou Date: Fri, 3 Apr 2026 01:42:22 +0800 Subject: [PATCH] fix(layout_optimizer): apply code review follow-ups --- unilabos/layout_optimizer/README.md | 5 +- unilabos/layout_optimizer/constraints.py | 58 ++++++--- unilabos/layout_optimizer/device_catalog.py | 14 ++- .../layout_optimizer/intent_interpreter.py | 52 +++++--- .../llm_skill/layout_intent_translator.md | 28 +++++ unilabos/layout_optimizer/models.py | 3 +- unilabos/layout_optimizer/server.py | 115 +++++++++++++++--- .../tests/test_bugfixes_v2.py | 12 +- .../tests/test_device_catalog.py | 12 ++ .../tests/test_intent_interpreter.py | 13 ++ .../tests/test_interpret_api.py | 2 +- .../layout_optimizer/tests/test_optimizer.py | 90 ++++++++++++++ 12 files changed, 336 insertions(+), 68 deletions(-) diff --git a/unilabos/layout_optimizer/README.md b/unilabos/layout_optimizer/README.md index 25e2a6f2..6a0548a0 100644 --- a/unilabos/layout_optimizer/README.md +++ b/unilabos/layout_optimizer/README.md @@ -167,13 +167,14 @@ Returns `{"status": "ok"}`. --- -## 3. Intent Types (10 total) +## 3. Intent Types (11 total) | Intent | Params | Generates | Type | |--------|--------|-----------|------| | `reachable_by` | `arm` (str), `targets` (list[str]) | `reachability` per target | hard | | `close_together` | `devices` (list[str]), `priority` (low/medium/high) | `minimize_distance` per pair | soft | | `far_apart` | `devices` (list[str]), `priority` | `maximize_distance` per pair | soft | +| `keep_adjacent` | `devices` (list[str]), `priority` | `minimize_distance` per pair | soft | | `max_distance` | `device_a`, `device_b`, `distance` (float m) | `distance_less_than` | hard | | `min_distance` | `device_a`, `device_b`, `distance` (float m) | `distance_greater_than` | hard | | `min_spacing` | `min_gap` (float m, default 0.3) | `min_spacing` | hard | @@ -182,7 +183,7 @@ Returns `{"status": "ok"}`. | `face_inward` | (none) | `prefer_orientation_mode` inward | soft | | `align_cardinal` | (none) | `prefer_aligned` | soft | -Priority weights: `low=1.0`, `medium=3.0`, `high=8.0`. +Intent priorities are baked into the final emitted constraint `weight` during interpretation. The caller only sees the resulting weight, not a separate constraint-level priority field. --- diff --git a/unilabos/layout_optimizer/constraints.py b/unilabos/layout_optimizer/constraints.py index 6dd74000..0aefcdf7 100644 --- a/unilabos/layout_optimizer/constraints.py +++ b/unilabos/layout_optimizer/constraints.py @@ -6,6 +6,7 @@ from __future__ import annotations +import logging import math from typing import TYPE_CHECKING @@ -24,6 +25,8 @@ if TYPE_CHECKING: from .interfaces import CollisionChecker, ReachabilityChecker +logger = logging.getLogger(__name__) + # 归一化默认权重 — 1cm距离违规 ≈ 5°角度违规 的惩罚量级 DEFAULT_WEIGHT_DISTANCE: float = 100.0 # 1cm → penalty 1.0 DEFAULT_WEIGHT_ANGLE: float = 60.0 # 5° → penalty ~1.0 @@ -175,10 +178,7 @@ def _evaluate_single( params = constraint.params is_hard = constraint.type == "hard" - # 根据优先级等级计算有效权重 effective_weight = constraint.weight - if constraint.priority and constraint.priority in PRIORITY_MULTIPLIERS: - effective_weight *= PRIORITY_MULTIPLIERS[constraint.priority] if rule == "no_collision": checker_placements = _to_checker_format_from_maps(device_map, placement_map) @@ -208,8 +208,9 @@ def _evaluate_single( max_dist = params["distance"] da, db = device_map.get(a_id), device_map.get(b_id) pa, pb = placement_map.get(a_id), placement_map.get(b_id) - if pa is None or pb is None: - return 0.0 + missing_cost = _missing_reference_cost(constraint, placement_map, a_id, b_id) + if missing_cost is not None: + return missing_cost if da and db: dist = _device_distance_obb(da, pa, db, pb) else: @@ -226,8 +227,9 @@ def _evaluate_single( min_dist = params["distance"] da, db = device_map.get(a_id), device_map.get(b_id) pa, pb = placement_map.get(a_id), placement_map.get(b_id) - if pa is None or pb is None: - return 0.0 + missing_cost = _missing_reference_cost(constraint, placement_map, a_id, b_id) + if missing_cost is not None: + return missing_cost if da and db: dist = _device_distance_obb(da, pa, db, pb) else: @@ -243,8 +245,9 @@ def _evaluate_single( a_id, b_id = params["device_a"], params["device_b"] da, db = device_map.get(a_id), device_map.get(b_id) pa, pb = placement_map.get(a_id), placement_map.get(b_id) - if pa is None or pb is None: - return 0.0 + missing_cost = _missing_reference_cost(constraint, placement_map, a_id, b_id) + if missing_cost is not None: + return missing_cost if da and db: dist = _device_distance_obb(da, pa, db, pb) else: @@ -255,8 +258,9 @@ def _evaluate_single( a_id, b_id = params["device_a"], params["device_b"] da, db = device_map.get(a_id), device_map.get(b_id) pa, pb = placement_map.get(a_id), placement_map.get(b_id) - if pa is None or pb is None: - return 0.0 + missing_cost = _missing_reference_cost(constraint, placement_map, a_id, b_id) + if missing_cost is not None: + return missing_cost if da and db: dist = _device_distance_obb(da, pa, db, pb) else: @@ -293,8 +297,11 @@ def _evaluate_single( target_device_id = params["target_device_id"] arm_p = placement_map.get(arm_id) target_p = placement_map.get(target_device_id) - if arm_p is None or target_p is None: - return 0.0 + missing_cost = _missing_reference_cost( + constraint, placement_map, arm_id, target_device_id, + ) + if missing_cost is not None: + return missing_cost arm_dev = device_map.get(arm_id) target_dev = device_map.get(target_device_id) @@ -546,19 +553,36 @@ def evaluate_constraints_breakdown( c, device_map, placement_map, lab, collision_checker, reachability_checker, graduated=True, ) - ew = c.weight - if c.priority and c.priority in PRIORITY_MULTIPLIERS: - ew *= PRIORITY_MULTIPLIERS[c.priority] results.append({ "name": _constraint_display_name(c), "rule": c.rule_name, "type": c.type, "cost": cost, - "weight": ew, + "weight": c.weight, }) return results +def _missing_reference_cost( + constraint: Constraint, + placement_map: dict[str, Placement], + *device_ids: str, +) -> float | None: + """当约束引用不存在的设备时返回对应 cost。""" + missing = sorted({device_id for device_id in device_ids if device_id not in placement_map}) + if not missing: + return None + + logger.warning( + "Constraint %s references missing device IDs: %s", + constraint.rule_name, + ", ".join(missing), + ) + if constraint.type == "hard": + return math.inf + return 0.0 + + def _constraint_display_name(c: Constraint) -> str: """为约束生成可读的显示名称。""" params = c.params diff --git a/unilabos/layout_optimizer/device_catalog.py b/unilabos/layout_optimizer/device_catalog.py index eca4669b..e19408f6 100644 --- a/unilabos/layout_optimizer/device_catalog.py +++ b/unilabos/layout_optimizer/device_catalog.py @@ -9,6 +9,7 @@ footprints.json 由 extract_footprints.py 生成,包含碰撞包围盒、开 from __future__ import annotations +from collections import Counter import json import logging from pathlib import Path @@ -262,11 +263,17 @@ def create_devices_from_list( """ footprints = load_footprints() devices = [] + catalog_counts = Counter(spec["id"] for spec in device_specs) + catalog_seen: Counter[str] = Counter() + for spec in device_specs: catalog_id = spec["id"] - # Use uuid as internal ID when available to ensure uniqueness - # (multiple instances of the same catalog device get different IDs) - dev_id = spec.get("uuid") or catalog_id + catalog_seen[catalog_id] += 1 + instance_idx = catalog_seen[catalog_id] + if catalog_counts[catalog_id] > 1 and instance_idx > 1: + dev_id = f"{catalog_id}#{instance_idx}" + else: + dev_id = catalog_id size = spec.get("size") if size: bbox = (float(size[0]), float(size[1])) @@ -289,6 +296,7 @@ def create_devices_from_list( bbox=bbox, device_type=spec.get("device_type", "static"), openings=openings, + uuid=spec.get("uuid", ""), ) ) return devices diff --git a/unilabos/layout_optimizer/intent_interpreter.py b/unilabos/layout_optimizer/intent_interpreter.py index 0f6e2a50..6ee838c5 100644 --- a/unilabos/layout_optimizer/intent_interpreter.py +++ b/unilabos/layout_optimizer/intent_interpreter.py @@ -6,6 +6,7 @@ import itertools from collections.abc import Callable from dataclasses import dataclass, field +from .constraints import PRIORITY_MULTIPLIERS from .models import Constraint, Intent # 优先级权重映射 @@ -13,6 +14,16 @@ _PRIORITY_WEIGHTS: dict[str, float] = {"low": 1.0, "medium": 3.0, "high": 8.0} _DEFAULT_WEIGHT = _PRIORITY_WEIGHTS["medium"] +def _priority_key(priority: str) -> str: + """将 intent priority 映射到 constraint 权重等级。""" + return "normal" if priority == "medium" else priority + + +def _final_weight(base_weight: float, priority: str) -> float: + """在解释阶段直接烘焙优先级乘数。""" + return base_weight * PRIORITY_MULTIPLIERS.get(priority, 1.0) + + @dataclass class InterpretResult: """意图解释结果。""" @@ -41,7 +52,7 @@ def _handle_reachable_by(intent: Intent, result: InterpretResult) -> None: type="hard", rule_name="reachability", params={"arm_id": arm, "target_device_id": target}, - priority="critical", + weight=_final_weight(1.0, "critical"), ) result.constraints.append(c) generated.append({"type": c.type, "rule_name": c.rule_name, "params": c.params, "weight": c.weight}) @@ -64,9 +75,10 @@ def _handle_close_together(intent: Intent, result: InterpretResult) -> None: result.errors.append(f"close_together: 参数 'devices' 至少需要 2 个设备,当前 {len(devices)} 个") return - weight = _PRIORITY_WEIGHTS.get(priority, _DEFAULT_WEIGHT) - # 映射 intent priority 到 constraint priority 等级 - constraint_priority = "high" if priority == "high" else "normal" + weight = _final_weight( + _PRIORITY_WEIGHTS.get(priority, _DEFAULT_WEIGHT), + _priority_key(priority), + ) generated: list[dict] = [] for dev_a, dev_b in itertools.combinations(devices, 2): c = Constraint( @@ -74,7 +86,6 @@ def _handle_close_together(intent: Intent, result: InterpretResult) -> None: rule_name="minimize_distance", params={"device_a": dev_a, "device_b": dev_b}, weight=weight, - priority=constraint_priority, ) result.constraints.append(c) generated.append({"type": c.type, "rule_name": c.rule_name, "params": c.params, "weight": c.weight}) @@ -97,9 +108,10 @@ def _handle_far_apart(intent: Intent, result: InterpretResult) -> None: result.errors.append(f"far_apart: 参数 'devices' 至少需要 2 个设备,当前 {len(devices)} 个") return - weight = _PRIORITY_WEIGHTS.get(priority, _DEFAULT_WEIGHT) - # 映射 intent priority 到 constraint priority 等级 - constraint_priority = "high" if priority == "high" else "normal" + weight = _final_weight( + _PRIORITY_WEIGHTS.get(priority, _DEFAULT_WEIGHT), + _priority_key(priority), + ) generated: list[dict] = [] for dev_a, dev_b in itertools.combinations(devices, 2): c = Constraint( @@ -107,7 +119,6 @@ def _handle_far_apart(intent: Intent, result: InterpretResult) -> None: rule_name="maximize_distance", params={"device_a": dev_a, "device_b": dev_b}, weight=weight, - priority=constraint_priority, ) result.constraints.append(c) generated.append({"type": c.type, "rule_name": c.rule_name, "params": c.params, "weight": c.weight}) @@ -138,7 +149,7 @@ def _handle_max_distance(intent: Intent, result: InterpretResult) -> None: type="hard", rule_name="distance_less_than", params={"device_a": device_a, "device_b": device_b, "distance": distance}, - priority="normal", + weight=_final_weight(1.0, "normal"), ) result.constraints.append(c) @@ -168,7 +179,7 @@ def _handle_min_distance(intent: Intent, result: InterpretResult) -> None: type="hard", rule_name="distance_greater_than", params={"device_a": device_a, "device_b": device_b, "distance": distance}, - priority="normal", + weight=_final_weight(1.0, "normal"), ) result.constraints.append(c) @@ -189,7 +200,7 @@ def _handle_min_spacing(intent: Intent, result: InterpretResult) -> None: type="hard", rule_name="min_spacing", params={"min_gap": min_gap}, - priority="high", + weight=_final_weight(1.0, "high"), ) result.constraints.append(c) @@ -208,7 +219,7 @@ def _handle_face_outward(intent: Intent, result: InterpretResult) -> None: type="soft", rule_name="prefer_orientation_mode", params={"mode": "outward"}, - priority="low", + weight=_final_weight(1.0, "low"), ) result.constraints.append(c) @@ -227,7 +238,7 @@ def _handle_face_inward(intent: Intent, result: InterpretResult) -> None: type="soft", rule_name="prefer_orientation_mode", params={"mode": "inward"}, - priority="low", + weight=_final_weight(1.0, "low"), ) result.constraints.append(c) @@ -246,7 +257,7 @@ def _handle_align_cardinal(intent: Intent, result: InterpretResult) -> None: type="soft", rule_name="prefer_aligned", params={}, - priority="low", + weight=_final_weight(1.0, "low"), ) result.constraints.append(c) @@ -268,9 +279,10 @@ def _handle_keep_adjacent(intent: Intent, result: InterpretResult) -> None: result.errors.append(f"keep_adjacent: 参数 'devices' 至少需要 2 个设备,当前 {len(devices)} 个") return - weight = _PRIORITY_WEIGHTS.get(priority, _DEFAULT_WEIGHT) - # 映射 intent priority 到 constraint priority 等级 - constraint_priority = "high" if priority == "high" else "normal" + weight = _final_weight( + _PRIORITY_WEIGHTS.get(priority, _DEFAULT_WEIGHT), + _priority_key(priority), + ) generated: list[dict] = [] for dev_a, dev_b in itertools.combinations(devices, 2): c = Constraint( @@ -278,7 +290,6 @@ def _handle_keep_adjacent(intent: Intent, result: InterpretResult) -> None: rule_name="minimize_distance", params={"device_a": dev_a, "device_b": dev_b}, weight=weight, - priority=constraint_priority, ) result.constraints.append(c) generated.append({"type": c.type, "rule_name": c.rule_name, "params": c.params, "weight": c.weight}) @@ -309,7 +320,7 @@ def _handle_workflow_hint(intent: Intent, result: InterpretResult) -> None: type="soft", rule_name="minimize_distance", params={"device_a": dev_a, "device_b": dev_b}, - priority="normal", + weight=_final_weight(1.0, "normal"), ) result.constraints.append(c) generated.append({"type": c.type, "rule_name": c.rule_name, "params": c.params, "weight": c.weight}) @@ -336,6 +347,7 @@ _HANDLERS: dict[str, Callable[[Intent, InterpretResult], None]] = { "face_outward": _handle_face_outward, "face_inward": _handle_face_inward, "align_cardinal": _handle_align_cardinal, + "keep_adjacent": _handle_keep_adjacent, "workflow_hint": _handle_workflow_hint, } diff --git a/unilabos/layout_optimizer/llm_skill/layout_intent_translator.md b/unilabos/layout_optimizer/llm_skill/layout_intent_translator.md index bf5f844a..1251937c 100644 --- a/unilabos/layout_optimizer/llm_skill/layout_intent_translator.md +++ b/unilabos/layout_optimizer/llm_skill/layout_intent_translator.md @@ -53,6 +53,7 @@ You MUST output a JSON object with an `intents` array. Each intent has: } ``` **Priority:** `"low"` (nice-to-have), `"medium"` (default), `"high"` (critical for workflow speed) +Priority is only part of the intent input. The interpreter automatically bakes it into the emitted constraint `weight`; there is no separate constraint-level `priority` field in `/interpret` output or `/optimize` input. ### `far_apart` — Devices should be separated ```json @@ -66,6 +67,18 @@ You MUST output a JSON object with an `intents` array. Each intent has: ``` **When to use:** Thermal interference, contamination risk, safety separation. +### `keep_adjacent` — Devices should stay adjacent +```json +{ + "intent": "keep_adjacent", + "params": { + "devices": ["device_a", "device_b"], + "priority": "high" + } +} +``` +**When to use:** User explicitly asks for a pair or group to stay side-by-side / adjacent. This currently maps to the same optimizer behavior as `close_together`, but is semantically more precise. + ### `max_distance` — Hard limit on maximum distance ```json { @@ -146,6 +159,20 @@ Devices in scene: 4. **Type match**: "robot arm" / "the arm" → look for `device_type: articulation` 5. **Ambiguous**: If multiple devices could match, list candidates in the `description` field and pick the most likely one. If truly ambiguous, return an error intent asking the user to clarify. +### Duplicate Device Convention + +When the same catalog device appears multiple times in the scene: + +- first instance keeps the bare catalog ID, e.g. `plate_reader` +- second and later instances use `#N`, e.g. `plate_reader#2`, `plate_reader#3` +- a bare ID in an intent fans out to all instances +- a suffixed ID applies only to that specific instance + +Examples: + +- `{"devices": ["plate_reader", "storage_hotel"]}` applies to every `plate_reader` instance +- `{"devices": ["plate_reader#2", "storage_hotel"]}` applies only to the second instance + ### Example Resolution User says: "the robot should reach the PCR machine and the liquid handler" @@ -167,6 +194,7 @@ When a user describes a process (e.g., "prepare samples, then run PCR, then seal ### 3. Implicit Constraints - If devices frequently exchange items → `close_together` (high priority) +- If user explicitly says "keep these adjacent", "side by side", or "next to each other" → `keep_adjacent` - If a robot arm is mentioned "in between" → `reachable_by` for all involved devices - If user says "short transit" or "fast transfer" → `close_together` with `"priority": "high"` - If user says "keep X away from Y" → `far_apart` or `min_distance` diff --git a/unilabos/layout_optimizer/models.py b/unilabos/layout_optimizer/models.py index 394c4ef7..5d2760b1 100644 --- a/unilabos/layout_optimizer/models.py +++ b/unilabos/layout_optimizer/models.py @@ -33,6 +33,7 @@ class Device: model_path: str = "" model_type: str = "" thumbnail_url: str = "" + uuid: str = "" @dataclass @@ -84,8 +85,6 @@ class Constraint: params: dict = field(default_factory=dict) # 仅 soft 约束使用 weight: float = 1.0 - # 优先级等级,影响有效权重的乘数 - priority: str | None = None # "critical" | "high" | "normal" | "low" @dataclass diff --git a/unilabos/layout_optimizer/server.py b/unilabos/layout_optimizer/server.py index d0522d0d..8ceb0092 100644 --- a/unilabos/layout_optimizer/server.py +++ b/unilabos/layout_optimizer/server.py @@ -19,6 +19,8 @@ from __future__ import annotations +from collections import defaultdict +import itertools import logging import logging.handlers import math @@ -120,6 +122,7 @@ else: # ---------- 设备目录缓存 ---------- _device_cache: list[dict] | None = None +_DEVICE_PARAM_KEYS = {"device_a", "device_b", "arm_id", "target_device_id", "device"} # 消耗品/配件关键词(不独立放置于实验台) @@ -199,6 +202,80 @@ def _build_device_list() -> list[dict]: return _device_cache +def _catalog_id_from_internal(device_id: str) -> str: + """内部实例 ID → catalog ID。""" + return device_id.split("#", 1)[0] + + +def _expand_constraints_for_duplicates( + constraints: list[Constraint], devices: list, +) -> list[Constraint]: + """将引用 bare catalog ID 的约束扩展到所有重复实例。""" + catalog_instances: dict[str, list[str]] = defaultdict(list) + for dev in devices: + catalog_instances[_catalog_id_from_internal(dev.id)].append(dev.id) + + expanded_constraints: list[Constraint] = [] + for constraint in constraints: + fan_out_keys: list[str] = [] + fan_out_values: list[list[str]] = [] + + for key in _DEVICE_PARAM_KEYS: + if key not in constraint.params: + continue + ref_id = constraint.params[key] + if "#" in ref_id: + continue + instances = catalog_instances.get(ref_id, []) + if len(instances) > 1: + fan_out_keys.append(key) + fan_out_values.append(instances) + logger.info( + "Fan-out: %s %s=%s -> %d instances", + constraint.rule_name, key, ref_id, len(instances), + ) + + if not fan_out_keys: + expanded_constraints.append(constraint) + continue + + for combo in itertools.product(*fan_out_values): + new_params = dict(constraint.params) + for key, internal_id in zip(fan_out_keys, combo): + new_params[key] = internal_id + expanded_constraints.append( + Constraint( + type=constraint.type, + rule_name=constraint.rule_name, + params=new_params, + weight=constraint.weight, + ) + ) + + return expanded_constraints + + +def _maybe_add_prefer_aligned_constraint( + constraints: list[Constraint], align_weight: float, +) -> list[Constraint]: + """仅在用户未显式提供 prefer_aligned 时注入对齐约束。""" + if align_weight <= 0: + return constraints + + if any(c.rule_name == "prefer_aligned" for c in constraints): + logger.info("Skipping auto-injected prefer_aligned because one already exists") + return constraints + + constraints.append( + Constraint( + type="soft", + rule_name="prefer_aligned", + weight=align_weight, + ) + ) + return constraints + + # ---------- 路由 ---------- @@ -322,6 +399,14 @@ async def interpret_schema(): }, "generates": "soft maximize_distance for each pair", }, + "keep_adjacent": { + "description": "Devices should stay adjacent, similar to close_together", + "params": { + "devices": {"type": "list[string]", "required": True, "description": "Device IDs (min 2)"}, + "priority": {"type": "string", "required": False, "default": "medium", "enum": ["low", "medium", "high"]}, + }, + "generates": "soft minimize_distance for each pair", + }, "max_distance": { "description": "Two devices must be within a maximum distance", "params": { @@ -410,6 +495,7 @@ class OptimizeRequest(BaseModel): seed: int | None = None snap_cardinal: bool = False angle_granularity: int | None = None + arm_reach: dict[str, float] = {} class PositionXYZ(BaseModel): @@ -439,7 +525,7 @@ async def run_optimize(request: OptimizeRequest): from fastapi import HTTPException from .constraints import evaluate_default_hard_constraints, evaluate_constraints - from .mock_checkers import MockCollisionChecker + from .mock_checkers import MockCollisionChecker, MockReachabilityChecker from .optimizer import optimize, snap_theta, snap_theta_safe from .seeders import resolve_seeder_params, seed_layout @@ -460,19 +546,12 @@ async def run_optimize(request: OptimizeRequest): detail="angle_granularity must be one of: 4, 8, 12, 24", ) - # Build mapping: internal uuid-based id → (catalog_id, uuid) - # create_devices_from_list uses uuid as Device.id when available - id_to_catalog: dict[str, str] = {} - id_to_uuid: dict[str, str] = {} - for d in request.devices: - internal_id = d.uuid or d.id - id_to_catalog[internal_id] = d.id - id_to_uuid[internal_id] = d.uuid or d.id - # 转换输入 devices = create_devices_from_list( [d.model_dump() for d in request.devices] ) + id_to_catalog = {dev.id: _catalog_id_from_internal(dev.id) for dev in devices} + id_to_uuid = {dev.id: (dev.uuid or dev.id) for dev in devices} lab = parse_lab(request.lab.model_dump()) constraints = [ Constraint( @@ -483,6 +562,7 @@ async def run_optimize(request: OptimizeRequest): ) for c in request.constraints ] + constraints = _expand_constraints_for_duplicates(constraints, devices) # 1. Resolve seeder try: @@ -509,23 +589,22 @@ async def run_optimize(request: OptimizeRequest): weight=request.seeder_overrides.get("orientation_weight", DEFAULT_WEIGHT_ANGLE), )) # prefer_aligned: penalize non-cardinal angles(默认关闭,用户可通过 align_cardinal intent 或 seeder_overrides 开启) - align_weight = request.seeder_overrides.get("align_weight", 0) - if align_weight > 0: - constraints.append(Constraint( - type="soft", - rule_name="prefer_aligned", - weight=align_weight, - )) + constraints = _maybe_add_prefer_aligned_constraint( + constraints, + request.seeder_overrides.get("align_weight", 0), + ) # 4. Conditional Differential Evolution de_ran = False checker = MockCollisionChecker() + reachability_checker = MockReachabilityChecker(request.arm_reach or None) if request.run_de: result_placements = optimize( devices=devices, lab=lab, constraints=constraints, collision_checker=checker, + reachability_checker=reachability_checker, seed_placements=seed_placements, maxiter=request.maxiter, seed=request.seed, @@ -552,7 +631,7 @@ async def run_optimize(request: OptimizeRequest): # 也检查用户硬约束(binary 模式) if constraints and not math.isinf(final_cost): user_hard_cost = evaluate_constraints( - devices, result_placements, lab, constraints, checker, + devices, result_placements, lab, constraints, checker, reachability_checker, graduated=False, ) if math.isinf(user_hard_cost): diff --git a/unilabos/layout_optimizer/tests/test_bugfixes_v2.py b/unilabos/layout_optimizer/tests/test_bugfixes_v2.py index 616a52b6..b8609782 100644 --- a/unilabos/layout_optimizer/tests/test_bugfixes_v2.py +++ b/unilabos/layout_optimizer/tests/test_bugfixes_v2.py @@ -1,7 +1,7 @@ """Regression tests for V2 Stage 1 bugfixes. Covers: -- Duplicate device ID stacking (uuid-based internal IDs) +- Duplicate device ID stacking (catalog ID + #N internal IDs) - DE orientation preservation (prefer_orientation_mode constraint) - prefer_aligned auto-injection and adjustability - Preset switch reorientation @@ -101,16 +101,18 @@ class TestDuplicateDeviceIDs: graduated=False) assert math.isinf(cost_binary) - def test_create_devices_uses_uuid(self): - """create_devices_from_list should use uuid as Device.id.""" + def test_create_devices_uses_catalog_id_with_suffixes(self): + """create_devices_from_list should keep catalog IDs and suffix duplicates.""" from ..device_catalog import create_devices_from_list specs = [ {"id": "opentrons_liquid_handler", "uuid": "abc-123"}, {"id": "opentrons_liquid_handler", "uuid": "def-456"}, ] devices = create_devices_from_list(specs) - assert devices[0].id == "abc-123" - assert devices[1].id == "def-456" + assert devices[0].id == "opentrons_liquid_handler" + assert devices[1].id == "opentrons_liquid_handler#2" + assert devices[0].uuid == "abc-123" + assert devices[1].uuid == "def-456" # Both should have the same bbox from footprints assert devices[0].bbox == devices[1].bbox diff --git a/unilabos/layout_optimizer/tests/test_device_catalog.py b/unilabos/layout_optimizer/tests/test_device_catalog.py index ed720aeb..a9e63c2b 100644 --- a/unilabos/layout_optimizer/tests/test_device_catalog.py +++ b/unilabos/layout_optimizer/tests/test_device_catalog.py @@ -181,6 +181,18 @@ class TestCreateDevicesFromList: devs = create_devices_from_list(specs) assert devs[0].bbox != (0.6, 0.4) # 使用 footprints 中的真实尺寸 + def test_duplicate_catalog_ids_use_suffixes_and_store_uuid(self): + specs = [ + {"id": "opentrons_liquid_handler", "uuid": "u1"}, + {"id": "opentrons_liquid_handler", "uuid": "u2"}, + ] + devs = create_devices_from_list(specs) + assert [dev.id for dev in devs] == [ + "opentrons_liquid_handler", + "opentrons_liquid_handler#2", + ] + assert [dev.uuid for dev in devs] == ["u1", "u2"] + # ---------- server endpoint (需要 httpx) ---------- diff --git a/unilabos/layout_optimizer/tests/test_intent_interpreter.py b/unilabos/layout_optimizer/tests/test_intent_interpreter.py index 08e2e1c3..0971f19e 100644 --- a/unilabos/layout_optimizer/tests/test_intent_interpreter.py +++ b/unilabos/layout_optimizer/tests/test_intent_interpreter.py @@ -53,6 +53,8 @@ def test_close_together_priority_scales_weight(): low = interpret_intents([Intent(intent="close_together", params={"devices": ["a", "b"], "priority": "low"})]) high = interpret_intents([Intent(intent="close_together", params={"devices": ["a", "b"], "priority": "high"})]) assert high.constraints[0].weight > low.constraints[0].weight + assert high.constraints[0].weight == pytest.approx(16.0) + assert low.constraints[0].weight == pytest.approx(0.5) def test_close_together_single_device_error(): @@ -124,6 +126,17 @@ def test_face_inward(): def test_align_cardinal(): result = interpret_intents([Intent(intent="align_cardinal")]) assert result.constraints[0].rule_name == "prefer_aligned" + assert result.constraints[0].weight == pytest.approx(0.5) + + +def test_keep_adjacent_generates_minimize_distance(): + result = interpret_intents([Intent(intent="keep_adjacent", params={ + "devices": ["opentrons_liquid_handler", "agilent_plateloc"], + "priority": "high", + })]) + assert len(result.constraints) == 1 + assert result.constraints[0].rule_name == "minimize_distance" + assert result.constraints[0].weight == pytest.approx(16.0) # --- min_spacing --- diff --git a/unilabos/layout_optimizer/tests/test_interpret_api.py b/unilabos/layout_optimizer/tests/test_interpret_api.py index 0d3d6d0a..419a0147 100644 --- a/unilabos/layout_optimizer/tests/test_interpret_api.py +++ b/unilabos/layout_optimizer/tests/test_interpret_api.py @@ -103,7 +103,7 @@ def test_interpret_schema_returns_all_intents(): data = resp.json() intents = data["intents"] expected = { - "reachable_by", "close_together", "far_apart", + "reachable_by", "close_together", "far_apart", "keep_adjacent", "max_distance", "min_distance", "min_spacing", "workflow_hint", "face_outward", "face_inward", "align_cardinal", } diff --git a/unilabos/layout_optimizer/tests/test_optimizer.py b/unilabos/layout_optimizer/tests/test_optimizer.py index 4dc7fbe0..2f478636 100644 --- a/unilabos/layout_optimizer/tests/test_optimizer.py +++ b/unilabos/layout_optimizer/tests/test_optimizer.py @@ -313,6 +313,96 @@ def test_optimize_endpoint_accepts_angle_granularity_with_pcr_fixture(): ) +def test_optimize_endpoint_binary_reachability_uses_checker(): + """Final binary evaluation should fail when hard reachability is impossible.""" + resp = _post_app("/optimize", { + "devices": [ + {"id": "arm_slider", "name": "Arm", "device_type": "articulation", "size": [1.0, 0.2]}, + {"id": "opentrons_liquid_handler", "name": "Target", "size": [0.6, 0.5]}, + ], + "lab": {"width": 8.0, "depth": 4.0}, + "constraints": [ + { + "type": "hard", + "rule_name": "reachability", + "params": { + "arm_id": "arm_slider", + "target_device_id": "opentrons_liquid_handler", + }, + "weight": 1.0, + } + ], + "seeder": "row_fallback", + "run_de": False, + "arm_reach": {"arm_slider": 0.0}, + }) + + assert resp.status_code == 200 + data = resp.json() + assert data["success"] is False + assert data["cost"] is None or not math.isfinite(data["cost"]) + + +def test_expand_constraints_for_duplicates_fans_out_bare_catalog_ids(): + """Bare catalog IDs should expand to all duplicate instances.""" + from ..device_catalog import create_devices_from_list + from ..server import _expand_constraints_for_duplicates + + devices = create_devices_from_list([ + {"id": "opentrons_liquid_handler", "uuid": "u1"}, + {"id": "opentrons_liquid_handler", "uuid": "u2"}, + {"id": "arm_slider", "uuid": "arm-u1"}, + ]) + constraints = [ + Constraint( + type="hard", + rule_name="reachability", + params={"arm_id": "arm_slider", "target_device_id": "opentrons_liquid_handler"}, + ) + ] + + expanded = _expand_constraints_for_duplicates(constraints, devices) + + assert len(expanded) == 2 + assert {c.params["target_device_id"] for c in expanded} == { + "opentrons_liquid_handler", + "opentrons_liquid_handler#2", + } + assert all(c.params["arm_id"] == "arm_slider" for c in expanded) + + +def test_maybe_add_prefer_aligned_constraint_skips_existing(): + """Explicit prefer_aligned should win over server auto-injection.""" + from ..server import _maybe_add_prefer_aligned_constraint + + constraints = [Constraint(type="soft", rule_name="prefer_aligned", weight=0.5)] + result = _maybe_add_prefer_aligned_constraint(constraints, 3.0) + + assert result is constraints + assert [c.weight for c in result if c.rule_name == "prefer_aligned"] == [0.5] + + +def test_optimize_endpoint_duplicate_instances_return_catalog_id_and_unique_uuid(): + """Duplicate catalog devices should round-trip with shared catalog ID and distinct UUIDs.""" + resp = _post_app("/optimize", { + "devices": [ + {"id": "opentrons_liquid_handler", "uuid": "u1"}, + {"id": "opentrons_liquid_handler", "uuid": "u2"}, + ], + "lab": {"width": 3.0, "depth": 3.0}, + "seeder": "row_fallback", + "run_de": False, + }) + + assert resp.status_code == 200 + placements = resp.json()["placements"] + assert [p["device_id"] for p in placements] == [ + "opentrons_liquid_handler", + "opentrons_liquid_handler", + ] + assert {p["uuid"] for p in placements} == {"u1", "u2"} + + def test_full_pipeline_seed_only(): """Full pipeline: seeder → snap_theta → correct count and bounds.