diff --git a/unilabos/layout_optimizer/constraints.py b/unilabos/layout_optimizer/constraints.py index f726428a..6dd74000 100644 --- a/unilabos/layout_optimizer/constraints.py +++ b/unilabos/layout_optimizer/constraints.py @@ -16,10 +16,12 @@ from .obb import ( obb_corners, obb_min_distance, obb_penetration_depth, - segment_intersects_obb, + segment_obb_intersection_length, ) if TYPE_CHECKING: + from typing import Any + from .interfaces import CollisionChecker, ReachabilityChecker # 归一化默认权重 — 1cm距离违规 ≈ 5°角度违规 的惩罚量级 @@ -296,10 +298,7 @@ def _evaluate_single( arm_dev = device_map.get(arm_id) target_dev = device_map.get(target_device_id) - # Distance from target's opening surface center to nearest point on arm OBB. - # This naturally enforces orientation: a device facing away has its opening - # far from the arm, so it fails reachability without needing a separate - # facing penalty. + # opening surface center → nearest point on arm OBB if arm_dev and target_dev: opening_pt = _opening_surface_center(target_dev, target_p) arm_corners = obb_corners( @@ -308,27 +307,30 @@ def _evaluate_single( nearest = nearest_point_on_obb(opening_pt[0], opening_pt[1], arm_corners) dist = math.sqrt((opening_pt[0] - nearest[0])**2 + (opening_pt[1] - nearest[1])**2) else: + opening_pt = (target_p.x, target_p.y) + nearest = (arm_p.x, arm_p.y) dist = _device_distance_center(arm_p, target_p) or 0.0 + # 交叉惩罚始终计算(soft, 不依赖可达性结果) + crossing_cost = _crossing_penalty( + opening_pt, nearest, + arm_id, target_device_id, + device_map, placement_map, + ) + arm_pose = {"x": arm_p.x, "y": arm_p.y, "theta": arm_p.theta} target_point = {"x": target_p.x, "y": target_p.y, "z": 0.0} target_point["_obb_dist"] = dist if not reachability_checker.is_reachable(arm_id, arm_pose, target_point): if is_hard and not graduated: return math.inf - # Graduated: penalty proportional to overshoot + # Graduated: overshoot penalty + crossing cost max_reach = reachability_checker.arm_reach.get(arm_id, 2.0) overshoot = max(0.0, dist - max_reach) w = effective_weight * (HARD_MULTIPLIER if is_hard else 1.0) - return w * overshoot * 10.0 + return w * overshoot * 10.0 + crossing_cost - # Line-of-sight penalty: penalize if any other device OBB blocks - # the path from opening to arm - los_cost = _line_of_sight_penalty( - arm_id, arm_p, target_device_id, target_p, - device_map, placement_map, effective_weight, - ) - return los_cost + return crossing_cost if rule == "prefer_aligned": alignment_cost = sum( @@ -482,23 +484,111 @@ def _opening_surface_center( return (world_x, world_y) -def _line_of_sight_penalty( +def evaluate_default_hard_constraints_breakdown( + devices: list[Device], + placements: list[Placement], + lab: Lab, + collision_checker: CollisionChecker, + *, + collision_weight: float = DEFAULT_WEIGHT_DISTANCE * HARD_MULTIPLIER, + boundary_weight: float = DEFAULT_WEIGHT_DISTANCE * HARD_MULTIPLIER, +) -> dict[str, float]: + """与 evaluate_default_hard_constraints 逻辑相同,但返回分项明细。""" + device_map = {d.id: d for d in devices} + collision_cost = 0.0 + boundary_cost = 0.0 + + candidate_pairs = sweep_and_prune_pairs(devices, placements) + for i, j in candidate_pairs: + di, dj = device_map[placements[i].device_id], device_map[placements[j].device_id] + ci = obb_corners(placements[i].x, placements[i].y, + di.bbox[0], di.bbox[1], placements[i].theta) + cj = obb_corners(placements[j].x, placements[j].y, + dj.bbox[0], dj.bbox[1], placements[j].theta) + depth = obb_penetration_depth(ci, cj) + if depth > 0: + collision_cost += collision_weight * depth + + for p in placements: + dev = device_map[p.device_id] + hw, hd = p.rotated_bbox(dev) + overshoot = 0.0 + overshoot += max(0.0, hw - p.x) + overshoot += max(0.0, (p.x + hw) - lab.width) + overshoot += max(0.0, hd - p.y) + overshoot += max(0.0, (p.y + hd) - lab.depth) + boundary_cost += boundary_weight * overshoot + + return { + "collision": collision_cost, + "boundary": boundary_cost, + "total": collision_cost + boundary_cost, + "collision_weight": collision_weight, + "boundary_weight": boundary_weight, + } + + +def evaluate_constraints_breakdown( + devices: list[Device], + placements: list[Placement], + lab: Lab, + constraints: list[Constraint], + collision_checker: CollisionChecker, + reachability_checker: ReachabilityChecker | None = None, +) -> list[dict[str, Any]]: + """与 evaluate_constraints 逻辑相同,但返回每条约束的分项明细。""" + device_map = {d.id: d for d in devices} + placement_map = {p.device_id: p for p in placements} + + results = [] + for c in constraints: + cost = _evaluate_single( + c, device_map, placement_map, lab, collision_checker, reachability_checker, + graduated=True, + ) + ew = c.weight + if c.priority and c.priority in PRIORITY_MULTIPLIERS: + ew *= PRIORITY_MULTIPLIERS[c.priority] + results.append({ + "name": _constraint_display_name(c), + "rule": c.rule_name, + "type": c.type, + "cost": cost, + "weight": ew, + }) + return results + + +def _constraint_display_name(c: Constraint) -> str: + """为约束生成可读的显示名称。""" + params = c.params + if c.rule_name in ( + "distance_less_than", "distance_greater_than", + "minimize_distance", "maximize_distance", + ): + return f"{c.rule_name}({params.get('device_a', '?')}, {params.get('device_b', '?')})" + if c.rule_name == "reachability": + return f"reachability({params.get('arm_id', '?')}, {params.get('target_device_id', '?')})" + if c.rule_name == "min_spacing": + return f"min_spacing(gap={params.get('min_gap', '?')})" + if c.rule_name == "prefer_orientation_mode": + return f"prefer_orientation_mode({params.get('mode', '?')})" + return c.rule_name + + +def _crossing_penalty( + opening_pt: tuple[float, float], + arm_nearest_pt: tuple[float, float], arm_id: str, - arm_p: Placement, target_id: str, - target_p: Placement, device_map: dict[str, Device], placement_map: dict[str, Placement], - weight: float, ) -> float: - """Penalty for other devices blocking the line from target to arm center. + """交叉惩罚:其他设备 OBB 遮挡 opening→arm 路径的长度加权 penalty。 - For each other device whose OBB intersects the segment (target_center → arm_center), - adds a penalty proportional to the weight. This encourages layouts where - the arm has a clear path to each target. + Soft penalty,权重 = DEFAULT_WEIGHT_DISTANCE * 穿过各遮挡设备 OBB 的线段长度之和。 + 始终生效(不论可达性是否通过),为 DE 提供清晰的梯度信号。 """ - p1 = (target_p.x, target_p.y) - p2 = (arm_p.x, arm_p.y) cost = 0.0 for dev_id, p in placement_map.items(): if dev_id == arm_id or dev_id == target_id: @@ -507,6 +597,6 @@ def _line_of_sight_penalty( if dev is None: continue corners = obb_corners(p.x, p.y, dev.bbox[0], dev.bbox[1], p.theta) - if segment_intersects_obb(p1, p2, corners): - cost += weight * 2.0 # penalty per blocking device + crossing_len = segment_obb_intersection_length(opening_pt, arm_nearest_pt, corners) + cost += DEFAULT_WEIGHT_DISTANCE * crossing_len return cost diff --git a/unilabos/layout_optimizer/llm_skill/demo_agent.md b/unilabos/layout_optimizer/llm_skill/demo_agent.md index c7b03ea2..8fa369e1 100644 --- a/unilabos/layout_optimizer/llm_skill/demo_agent.md +++ b/unilabos/layout_optimizer/llm_skill/demo_agent.md @@ -86,7 +86,8 @@ Build the optimize request using: - `constraints`: from Step 3 interpret response - `workflow_edges`: from Step 3 interpret response - `seeder`: `"compact_outward"` (default) -- `seeder_overrides`: `{"align_weight": W}` — if user explicitly says "align cardinal", "lock to horizontal/vertical", or "angle locked", use `align_weight: 15.0` instead of default 2.0 +- `seeder_overrides`: generally not needed. Cardinal alignment is handled by the `align_cardinal` intent (generates `prefer_aligned` constraint). Do NOT use `align_weight` in seeder_overrides — it is deprecated. +- `snap_cardinal`: `false` (default). Set `true` only if user explicitly requests snapping to 0/90/180/270. - `run_de`: `true` - `maxiter`: `200` - `seed`: `42` diff --git a/unilabos/layout_optimizer/obb.py b/unilabos/layout_optimizer/obb.py index eaf0a6b2..e89619a1 100644 --- a/unilabos/layout_optimizer/obb.py +++ b/unilabos/layout_optimizer/obb.py @@ -183,6 +183,56 @@ def _point_in_convex( return True +def segment_obb_intersection_length( + p1: tuple[float, float], + p2: tuple[float, float], + corners: list[tuple[float, float]], +) -> float: + """线段 p1-p2 与 OBB(凸多边形)的交集长度。 + + Cyrus-Beck 线段裁剪算法。corners 假定为 CCW 顺序(obb_corners 生成)。 + 无交集返回 0.0。 + """ + dx = p2[0] - p1[0] + dy = p2[1] - p1[1] + seg_len_sq = dx * dx + dy * dy + if seg_len_sq < 1e-24: + return 0.0 + + t_enter = 0.0 + t_exit = 1.0 + n = len(corners) + + for i in range(n): + ax, ay = corners[i] + bx, by = corners[(i + 1) % n] + # CCW 多边形边的外法线: (ey, -ex), e = b - a + ex, ey = bx - ax, by - ay + nx, ny = ey, -ex + + denom = nx * dx + ny * dy + numer = nx * (p1[0] - ax) + ny * (p1[1] - ay) + + if abs(denom) < 1e-12: + if numer > 0: + return 0.0 # 在此边外侧且平行 + continue + + t = -numer / denom + if denom < 0: + t_enter = max(t_enter, t) # 进入 + else: + t_exit = min(t_exit, t) # 退出 + + if t_enter > t_exit: + return 0.0 + + if t_enter >= t_exit: + return 0.0 + + return (t_exit - t_enter) * math.sqrt(seg_len_sq) + + def obb_min_distance( corners_a: list[tuple[float, float]], corners_b: list[tuple[float, float]], diff --git a/unilabos/layout_optimizer/optimizer.py b/unilabos/layout_optimizer/optimizer.py index 84ea0aa7..9c1f33f2 100644 --- a/unilabos/layout_optimizer/optimizer.py +++ b/unilabos/layout_optimizer/optimizer.py @@ -13,7 +13,12 @@ from typing import Any, Callable import numpy as np -from .constraints import evaluate_constraints, evaluate_default_hard_constraints +from .constraints import ( + evaluate_constraints, + evaluate_constraints_breakdown, + evaluate_default_hard_constraints, + evaluate_default_hard_constraints_breakdown, +) from .mock_checkers import MockCollisionChecker, MockReachabilityChecker from .models import Constraint, Device, Lab, Placement from .pencil_integration import generate_initial_layout @@ -34,6 +39,7 @@ def _run_de( seed: int | None, n_devices: int, strategy: str = "currenttobest1bin", + progress_callback: Callable[[int, np.ndarray, float], None] | None = None, ) -> tuple[np.ndarray, float, int]: """自定义差分进化循环。 @@ -56,6 +62,7 @@ def _run_de( seed: 随机种子 n_devices: 设备数量(用于 per-device crossover) strategy: 变异策略,"currenttobest1bin" 或 "best1bin" + progress_callback: 每 10 代调用一次 (gen, best_vector, best_cost) Returns: (best_vector, best_cost, n_generations) @@ -125,6 +132,10 @@ def _run_de( # 更新 best_idx(种群可能整体更新) best_idx = int(np.argmin(costs)) + # 进度回调:每 10 代报告最优个体状态 + if progress_callback and gen % 10 == 0: + progress_callback(gen, best_vector, best_cost) + # Early stopping:最近 patience 代改善 < 0.1% best_cost_history.append(best_cost) if len(best_cost_history) >= patience: @@ -296,6 +307,36 @@ def optimize( n, 3 * n, pop_count, maxiter, strategy, ) + # DEBUG 模式进度回调:每 10 代输出完整约束分项表格 + def _progress_cb(gen: int, best_vec: np.ndarray, best_cost_val: float) -> None: + if not logger.isEnabledFor(logging.DEBUG): + return + pls = _vector_to_placements(best_vec, devices) + hard_bd = evaluate_default_hard_constraints_breakdown( + devices, pls, lab, collision_checker, + ) + lines = [f"=== DE Gen {gen} | best_cost={best_cost_val:.4f} ==="] + lines.append(f" {'Constraint':<45} {'Type':<6} {'Weight':>8} {'Cost':>10}") + lines.append(f" {'─' * 71}") + lines.append( + f" {'[predefined] collision':<45} {'hard':<6} {hard_bd['collision_weight']:>8.0f} {hard_bd['collision']:>10.4f}" + ) + lines.append( + f" {'[predefined] boundary':<45} {'hard':<6} {hard_bd['boundary_weight']:>8.0f} {hard_bd['boundary']:>10.4f}" + ) + if constraints: + user_bd = evaluate_constraints_breakdown( + devices, pls, lab, constraints, + collision_checker, reachability_checker, + ) + for item in user_bd: + lines.append( + f" {item['name']:<45} {item['type']:<6} {item['weight']:>8.1f} {item['cost']:>10.4f}" + ) + lines.append(f" {'─' * 71}") + lines.append(f" {'TOTAL':<45} {'':6} {'':>8} {best_cost_val:>10.4f}") + logger.debug("\n".join(lines)) + best_vector, best_cost, n_generations = _run_de( cost_fn=cost_function, bounds=bounds_array, @@ -308,15 +349,52 @@ def optimize( seed=seed, n_devices=n, strategy=strategy, + progress_callback=_progress_cb, ) # 评估次数估算:每代 pop_count 次(初始 + 每代 trial) n_evaluations = pop_count + n_generations * pop_count - logger.info( - "DE optimization complete: success=%s, cost=%.4f, iterations=%d, evaluations=%d", - best_cost < 1e17, best_cost, n_generations, n_evaluations, + # 最终布局分项明细(INFO 级别) + final_placements = _vector_to_placements(best_vector, devices) + hard_bd = evaluate_default_hard_constraints_breakdown( + devices, final_placements, lab, collision_checker, ) + # success = 所有 hard 约束均满足(predefined + 用户 hard) + all_hard_met = hard_bd["total"] == 0.0 + # 所有约束的 top violators 候选池(predefined + user) + all_violators: list[dict] = [ + {"name": "[predefined] collision", "cost": hard_bd["collision"]}, + {"name": "[predefined] boundary", "cost": hard_bd["boundary"]}, + ] + if constraints: + user_bd = evaluate_constraints_breakdown( + devices, final_placements, lab, constraints, + collision_checker, reachability_checker, + ) + user_total = sum(item["cost"] for item in user_bd) + for c_item in user_bd: + all_violators.append({"name": c_item["name"], "cost": c_item["cost"]}) + if c_item["type"] == "hard" and c_item["cost"] > 0: + all_hard_met = False + else: + user_bd = [] + user_total = 0.0 + + summary = [ + "DE complete: success=%s, cost=%.4f, %d gens, %d evals" + % (all_hard_met, best_cost, n_generations, n_evaluations), + " Predefined: subtotal=%.4f" % hard_bd["total"], + ] + if constraints: + summary.append(f" User: subtotal={user_total:.4f}") + top_violators = sorted(all_violators, key=lambda x: x["cost"], reverse=True)[:3] + top_violators = [v for v in top_violators if v["cost"] > 0] + if top_violators: + summary.append(" Top violators:") + for v in top_violators: + summary.append(f" {v['name']} = {v['cost']:.4f}") + logger.info("\n".join(summary)) return _vector_to_placements(best_vector, devices) diff --git a/unilabos/layout_optimizer/server.py b/unilabos/layout_optimizer/server.py index d3107d23..2b334658 100644 --- a/unilabos/layout_optimizer/server.py +++ b/unilabos/layout_optimizer/server.py @@ -4,7 +4,14 @@ 集成阶段合并到 Uni-Lab-OS 的 FastAPI 服务中。 运行方式: - uvicorn layout_optimizer.server:app --host 0.0.0.0 --port 8000 --reload + uvicorn unilabos.layout_optimizer.server:app --host 0.0.0.0 --port 8000 --reload + +调试模式(启用 DEBUG 日志,含优化器逐代 cost 明细): + LAYOUT_DEBUG=1 uvicorn unilabos.layout_optimizer.server:app --host 0.0.0.0 --port 8000 --reload + +日志文件: + 自动写入 layout_optimizer/logs/{YYYYMMDD_HHMMSS}.log(始终 DEBUG 级别)。 + 前端 1s 轮询的 GET /scene/placements 200 行不写入日志文件。 前端访问: http://localhost:8000/ @@ -13,8 +20,10 @@ from __future__ import annotations import logging +import logging.handlers import math import os +from datetime import datetime from pathlib import Path from fastapi import FastAPI @@ -36,9 +45,40 @@ from .intent_interpreter import InterpretResult, interpret_intents from .models import Constraint, Intent from .optimizer import optimize -logging.basicConfig(level=logging.INFO) +_console_level = logging.DEBUG if os.getenv("LAYOUT_DEBUG") else logging.INFO +# root logger must be DEBUG so the file handler receives all records; +# console output level is controlled separately via its handler. +logging.basicConfig(level=logging.DEBUG) +# basicConfig creates a default StreamHandler — set its level to the console level +for _h in logging.getLogger().handlers: + if isinstance(_h, logging.StreamHandler): + _h.setLevel(_console_level) logger = logging.getLogger(__name__) +# --- 文件日志:实时写入 logs/ 目录,按启动时间命名 --- +_LOG_DIR = Path(__file__).parent / "logs" +_LOG_DIR.mkdir(exist_ok=True) +_log_file = _LOG_DIR / f"{datetime.now():%Y%m%d_%H%M%S}.log" + + +class _PollingFilter(logging.Filter): + """过滤掉前端 1s 轮询产生的 GET /scene/placements 日志行。""" + + def filter(self, record: logging.LogRecord) -> bool: + msg = record.getMessage() + if "GET /scene/placements" in msg and "200" in msg: + return False + return True + + +_file_handler = logging.FileHandler(_log_file, encoding="utf-8") +_file_handler.setLevel(logging.DEBUG) +_file_handler.setFormatter( + logging.Formatter("%(asctime)s %(levelname)-5s [%(name)s] %(message)s") +) +_file_handler.addFilter(_PollingFilter()) +logging.getLogger().addHandler(_file_handler) + STATIC_DIR = Path(__file__).parent / "static" # 可配置路径 @@ -368,6 +408,7 @@ class OptimizeRequest(BaseModel): workflow_edges: list[list[str]] = [] maxiter: int = 200 seed: int | None = None + snap_cardinal: bool = False class PositionXYZ(BaseModel): @@ -459,8 +500,8 @@ async def run_optimize(request: OptimizeRequest): params={"mode": orientation_mode}, weight=request.seeder_overrides.get("orientation_weight", DEFAULT_WEIGHT_ANGLE), )) - # prefer_aligned: penalize non-cardinal angles - align_weight = request.seeder_overrides.get("align_weight", DEFAULT_WEIGHT_ANGLE) + # prefer_aligned: penalize non-cardinal angles(默认关闭,用户可通过 align_cardinal intent 或 seeder_overrides 开启) + align_weight = request.seeder_overrides.get("align_weight", 0) if align_weight > 0: constraints.append(Constraint( type="soft", @@ -486,8 +527,9 @@ async def run_optimize(request: OptimizeRequest): else: result_placements = seed_placements - # 5. θ snap post-processing(碰撞安全:snap 后验证,失败则回退) - result_placements = snap_theta_safe(result_placements, devices, lab, checker) + # 5. θ snap post-processing(opt-in,默认关闭) + if request.snap_cardinal: + result_placements = snap_theta_safe(result_placements, devices, lab, checker) # 6. Evaluate final cost (binary mode for pass/fail reporting) final_cost = evaluate_default_hard_constraints( diff --git a/unilabos/layout_optimizer/tests/test_bugfixes_v2.py b/unilabos/layout_optimizer/tests/test_bugfixes_v2.py index 8e63fa31..616a52b6 100644 --- a/unilabos/layout_optimizer/tests/test_bugfixes_v2.py +++ b/unilabos/layout_optimizer/tests/test_bugfixes_v2.py @@ -376,3 +376,58 @@ class TestScenarios: assert not _has_collision(devices, result) for i, p in enumerate(result): assert _facing_dot(p, devices[i], lab) > 0 + + +# ── V2 Stage 1: 默认关闭 cardinal snap/alignment ──────── + +class TestV2Stage1Bugfixes: + """align_weight 默认为 0,snap_cardinal 默认关闭。""" + + def test_default_align_weight_is_zero(self): + """Default request (no seeder_overrides) should NOT inject prefer_aligned.""" + from fastapi.testclient import TestClient + from ..server import app + + client = TestClient(app) + resp = client.post("/optimize", json={ + "devices": [{"id": "opentrons_liquid_handler", "uuid": "u1"}], + "lab": {"width": 3, "depth": 3}, + "seeder": "compact_outward", + "run_de": True, + "maxiter": 50, + "seed": 42, + }) + assert resp.status_code == 200 + + def test_snap_cardinal_off_by_default(self): + """Default request should NOT snap theta to cardinal.""" + from fastapi.testclient import TestClient + from ..server import app + + client = TestClient(app) + resp = client.post("/optimize", json={ + "devices": [{"id": "opentrons_liquid_handler", "uuid": "u1"}], + "lab": {"width": 3, "depth": 3}, + "seeder": "compact_outward", + "run_de": True, + "maxiter": 10, + "seed": 42, + }) + assert resp.status_code == 200 + + def test_snap_cardinal_opt_in(self): + """snap_cardinal=True should be accepted and snap angles.""" + from fastapi.testclient import TestClient + from ..server import app + + client = TestClient(app) + resp = client.post("/optimize", json={ + "devices": [{"id": "opentrons_liquid_handler", "uuid": "u1"}], + "lab": {"width": 3, "depth": 3}, + "seeder": "compact_outward", + "snap_cardinal": True, + "run_de": True, + "maxiter": 10, + "seed": 42, + }) + assert resp.status_code == 200 diff --git a/unilabos/layout_optimizer/tests/test_constraints.py b/unilabos/layout_optimizer/tests/test_constraints.py index ac8b3a68..68c73f9a 100644 --- a/unilabos/layout_optimizer/tests/test_constraints.py +++ b/unilabos/layout_optimizer/tests/test_constraints.py @@ -5,11 +5,15 @@ import math import pytest from ..constraints import ( + _crossing_penalty, + _opening_surface_center, + DEFAULT_WEIGHT_DISTANCE, evaluate_constraints, evaluate_default_hard_constraints, ) from ..mock_checkers import MockCollisionChecker, MockReachabilityChecker -from ..models import Constraint, Device, Lab, Placement +from ..models import Constraint, Device, Opening, Placement, Lab +from ..obb import nearest_point_on_obb, obb_corners def _make_devices(): @@ -421,3 +425,81 @@ class TestGraduatedHardConstraints: devices, placements, _make_lab(), constraints, checker, ) assert not math.isinf(cost) + + +class TestCrossingPenalty: + """_crossing_penalty: 交叉长度加权的 soft penalty。""" + + def _make_device(self, dev_id, bbox=(0.5, 0.5), direction=(0.0, -1.0)): + return Device( + id=dev_id, name=dev_id, device_type="static", + bbox=bbox, height=0.3, + openings=[Opening(direction=direction, label="front")], + ) + + def test_no_blockers_returns_zero(self): + """arm 与 target 之间无遮挡设备 → 交叉代价为 0。""" + arm = self._make_device("arm", bbox=(2.14, 0.35)) + target = self._make_device("target") + arm_p = Placement(device_id="arm", x=2.0, y=1.0, theta=0.0) + target_p = Placement(device_id="target", x=0.5, y=1.0, theta=3.14159) + device_map = {"arm": arm, "target": target} + placement_map = {"arm": arm_p, "target": target_p} + + opening_pt = _opening_surface_center(target, target_p) + arm_corners = obb_corners(arm_p.x, arm_p.y, arm.bbox[0], arm.bbox[1], arm_p.theta) + nearest = nearest_point_on_obb(opening_pt[0], opening_pt[1], arm_corners) + + cost = _crossing_penalty( + opening_pt, nearest, + "arm", "target", + device_map, placement_map, + ) + assert cost == 0.0 + + def test_one_blocker_proportional_to_length(self): + """一个遮挡设备 → cost = DEFAULT_WEIGHT_DISTANCE * 穿过长度。""" + arm = self._make_device("arm", bbox=(2.14, 0.35)) + target = self._make_device("target") + blocker = self._make_device("blocker", bbox=(0.5, 0.5)) + arm_p = Placement(device_id="arm", x=3.0, y=1.0, theta=0.0) + target_p = Placement(device_id="target", x=0.0, y=1.0, theta=0.0) + blocker_p = Placement(device_id="blocker", x=1.5, y=1.0, theta=0.0) + device_map = {"arm": arm, "target": target, "blocker": blocker} + placement_map = {"arm": arm_p, "target": target_p, "blocker": blocker_p} + + opening_pt = _opening_surface_center(target, target_p) + arm_corners = obb_corners(arm_p.x, arm_p.y, arm.bbox[0], arm.bbox[1], arm_p.theta) + nearest = nearest_point_on_obb(opening_pt[0], opening_pt[1], arm_corners) + + cost = _crossing_penalty( + opening_pt, nearest, + "arm", "target", + device_map, placement_map, + ) + # blocker 宽 0.5m,theta=0,路径水平 → 穿过长度 ≈ 0.5m + # cost = DEFAULT_WEIGHT_DISTANCE * 0.5 = 100 * 0.5 = 50 + assert cost > 0 + assert abs(cost - DEFAULT_WEIGHT_DISTANCE * 0.5) < DEFAULT_WEIGHT_DISTANCE * 0.1 + + def test_blocker_off_path_returns_zero(self): + """不在路径上的设备 → 交叉代价为 0。""" + arm = self._make_device("arm", bbox=(2.14, 0.35)) + target = self._make_device("target") + bystander = self._make_device("bystander", bbox=(0.5, 0.5)) + arm_p = Placement(device_id="arm", x=3.0, y=1.0, theta=0.0) + target_p = Placement(device_id="target", x=0.0, y=1.0, theta=0.0) + bystander_p = Placement(device_id="bystander", x=1.5, y=3.0, theta=0.0) + device_map = {"arm": arm, "target": target, "bystander": bystander} + placement_map = {"arm": arm_p, "target": target_p, "bystander": bystander_p} + + opening_pt = _opening_surface_center(target, target_p) + arm_corners = obb_corners(arm_p.x, arm_p.y, arm.bbox[0], arm.bbox[1], arm_p.theta) + nearest = nearest_point_on_obb(opening_pt[0], opening_pt[1], arm_corners) + + cost = _crossing_penalty( + opening_pt, nearest, + "arm", "target", + device_map, placement_map, + ) + assert cost == 0.0 diff --git a/unilabos/layout_optimizer/tests/test_e2e_pcr_pipeline.py b/unilabos/layout_optimizer/tests/test_e2e_pcr_pipeline.py index 53557722..bd5e20e9 100644 --- a/unilabos/layout_optimizer/tests/test_e2e_pcr_pipeline.py +++ b/unilabos/layout_optimizer/tests/test_e2e_pcr_pipeline.py @@ -199,6 +199,8 @@ class TestStage3VerifyPlacements: "run_de": True, "maxiter": 100, "seed": 42, + "snap_cardinal": True, + "seeder_overrides": {"align_weight": 60}, }) data = optimize_resp.json() assert data["success"] is True diff --git a/unilabos/layout_optimizer/tests/test_obb.py b/unilabos/layout_optimizer/tests/test_obb.py index 9fc4d455..c3bbe9e8 100644 --- a/unilabos/layout_optimizer/tests/test_obb.py +++ b/unilabos/layout_optimizer/tests/test_obb.py @@ -1,7 +1,7 @@ """Tests for OBB (Oriented Bounding Box) geometry utilities.""" import math import pytest -from ..obb import obb_corners, obb_overlap, obb_min_distance +from ..obb import obb_corners, obb_overlap, obb_min_distance, segment_obb_intersection_length class TestObbCorners: @@ -115,3 +115,42 @@ class TestObbMinDistance: a = obb_corners(0, 0, 2.0, 2.0, 0.0) b = obb_corners(2.0, 0, 2.0, 2.0, 0.0) assert obb_min_distance(a, b) == pytest.approx(0.0) + + +class TestSegmentOBBIntersectionLength: + """segment_obb_intersection_length: Cyrus-Beck clipping.""" + + def test_segment_fully_outside(self): + corners = obb_corners(0, 0, 2, 2, 0) + length = segment_obb_intersection_length((-5, 3), (5, 3), corners) + assert length == 0.0 + + def test_segment_fully_inside(self): + corners = obb_corners(0, 0, 4, 4, 0) + length = segment_obb_intersection_length((-0.5, 0), (0.5, 0), corners) + assert abs(length - 1.0) < 1e-6 + + def test_segment_crosses_through(self): + corners = obb_corners(0, 0, 2, 2, 0) + length = segment_obb_intersection_length((-5, 0), (5, 0), corners) + assert abs(length - 2.0) < 1e-6 + + def test_segment_partial_overlap(self): + corners = obb_corners(0, 0, 2, 2, 0) + length = segment_obb_intersection_length((0, 0), (5, 0), corners) + assert abs(length - 1.0) < 1e-6 + + def test_rotated_obb(self): + corners = obb_corners(0, 0, 2, 2, math.pi / 4) + length = segment_obb_intersection_length((-3, 0), (3, 0), corners) + expected = 2 * math.sqrt(2) + assert abs(length - expected) < 1e-4 + + def test_zero_length_segment(self): + corners = obb_corners(0, 0, 2, 2, 0) + assert segment_obb_intersection_length((0, 0), (0, 0), corners) == 0.0 + + def test_parallel_outside(self): + corners = obb_corners(0, 0, 2, 2, 0) + length = segment_obb_intersection_length((-5, 2), (5, 2), corners) + assert length == 0.0