From 9ef24b77680943f5cccab19dc2094a47ae167d7d Mon Sep 17 00:00:00 2001 From: yexiaozhou Date: Wed, 1 Apr 2026 00:32:34 +0800 Subject: [PATCH] =?UTF-8?q?feat(layout=5Foptimizer):=20DE=20optimizer=20V2?= =?UTF-8?q?=20=E2=80=94=20custom=20loop,=20graduated=20hard=20constraints,?= =?UTF-8?q?=20broad=20phase?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace scipy differential_evolution with custom DE loop for per-device crossover, circular θ wrapping, and configurable mutation strategy (currenttobest1bin default, best1bin as turbo mode). Key improvements: - Graduate ALL hard constraints during DE (proportional penalty instead of flat inf), giving DE smooth gradient for reachability, min_spacing, etc. Binary inf preserved for final pass/fail reporting. - 2-axis sweep-and-prune AABB broad phase for collision pair pruning - Multi-seed injection from multiple seeder presets + Gaussian variants - snap_theta_safe: collision-check after angle snapping, revert on violation - Weight normalization (100 distance / 60 angle / 5× hard multiplier) - Constraint priority field (critical/high/normal/low → weight multiplier) with LLM intent interpreter setting priority per constraint type - Final success field now checks user hard constraints in binary mode - arm_slider added to mock checker reach table (1.07m) Tests: 202 passed, 24 new tests added (optimizer 7, constraints 6, broad_phase 11) Co-Authored-By: Claude Opus 4.6 --- unilabos/layout_optimizer/broad_phase.py | 66 +++++ unilabos/layout_optimizer/constraints.py | 115 +++++--- .../layout_optimizer/intent_interpreter.py | 47 ++++ unilabos/layout_optimizer/mock_checkers.py | 1 + unilabos/layout_optimizer/models.py | 2 + unilabos/layout_optimizer/optimizer.py | 263 +++++++++++++++--- unilabos/layout_optimizer/server.py | 22 +- .../tests/test_broad_phase.py | 241 ++++++++++++++++ .../tests/test_bugfixes_v2.py | 9 +- .../tests/test_constraints.py | 157 ++++++++++- .../tests/test_e2e_pcr_pipeline.py | 8 +- .../layout_optimizer/tests/test_optimizer.py | 224 ++++++++++++++- 12 files changed, 1072 insertions(+), 83 deletions(-) create mode 100644 unilabos/layout_optimizer/broad_phase.py create mode 100644 unilabos/layout_optimizer/tests/test_broad_phase.py diff --git a/unilabos/layout_optimizer/broad_phase.py b/unilabos/layout_optimizer/broad_phase.py new file mode 100644 index 00000000..ca433a38 --- /dev/null +++ b/unilabos/layout_optimizer/broad_phase.py @@ -0,0 +1,66 @@ +"""2 轴 sweep-and-prune 宽相碰撞检测。 + +对每个设备计算旋转后的 AABB,先沿 x 轴排序并剪枝, +再用 y 轴交叠过滤。返回候选碰撞对(索引对列表), +供后续 OBB SAT 精确检测使用。 +""" + +from __future__ import annotations + +from .models import Device, Placement + + +def sweep_and_prune_pairs( + devices: list[Device], + placements: list[Placement], +) -> list[tuple[int, int]]: + """2 轴 sweep-and-prune,返回 AABB 交叠的索引对。 + + Args: + devices: 设备列表,与 placements 一一对应。 + placements: 布局位姿列表。 + + Returns: + 候选碰撞对列表,每个元素为 (i, j), + i < j,索引对应 placements 原始顺序。 + """ + n = len(devices) + if n < 2: + return [] + + # --- 计算每个设备旋转后的 AABB --- + aabbs: list[tuple[float, float, float, float]] = [] + for dev, pl in zip(devices, placements): + hw, hd = pl.rotated_bbox(dev) + aabbs.append((pl.x - hw, pl.x + hw, pl.y - hd, pl.y + hd)) + + # --- 按 xmin 排序,保留原始索引映射 --- + sorted_indices = sorted(range(n), key=lambda k: aabbs[k][0]) + + # --- 扫描 x 轴,y 轴过滤 --- + candidates: list[tuple[int, int]] = [] + for si in range(len(sorted_indices)): + i = sorted_indices[si] + x_min_i, x_max_i, y_min_i, y_max_i = aabbs[i] + for sj in range(si + 1, len(sorted_indices)): + j = sorted_indices[sj] + x_min_j, _x_max_j, y_min_j, y_max_j = aabbs[j] + # 由于按 xmin 排序,x_min_j >= x_min_i + if x_min_j > x_max_i: + break # 后续设备 xmin 更大,不可能与 i 在 x 轴交叠 + # x 轴交叠确认,检查 y 轴 + if y_min_i <= y_max_j and y_min_j <= y_max_i: + # 保证输出 (min_idx, max_idx) 方便去重和测试 + pair = (min(i, j), max(i, j)) + candidates.append(pair) + + return candidates + + +def broad_phase_device_pairs( + devices: list[Device], + placements: list[Placement], +) -> list[tuple[str, str]]: + """返回候选碰撞对的 device_id 字符串元组列表。""" + index_pairs = sweep_and_prune_pairs(devices, placements) + return [(placements[i].device_id, placements[j].device_id) for i, j in index_pairs] diff --git a/unilabos/layout_optimizer/constraints.py b/unilabos/layout_optimizer/constraints.py index d23d0f66..f726428a 100644 --- a/unilabos/layout_optimizer/constraints.py +++ b/unilabos/layout_optimizer/constraints.py @@ -9,6 +9,7 @@ from __future__ import annotations import math from typing import TYPE_CHECKING +from .broad_phase import sweep_and_prune_pairs from .models import Constraint, Device, Lab, Placement from .obb import ( nearest_point_on_obb, @@ -21,6 +22,21 @@ from .obb import ( if TYPE_CHECKING: from .interfaces import CollisionChecker, ReachabilityChecker +# 归一化默认权重 — 1cm距离违规 ≈ 5°角度违规 的惩罚量级 +DEFAULT_WEIGHT_DISTANCE: float = 100.0 # 1cm → penalty 1.0 +DEFAULT_WEIGHT_ANGLE: float = 60.0 # 5° → penalty ~1.0 + +# 硬约束graduated模式下的惩罚倍数 +HARD_MULTIPLIER: float = 5.0 + +# 优先级等级对应的权重乘数 +PRIORITY_MULTIPLIERS: dict[str, float] = { + "critical": 5.0, + "high": 2.0, + "normal": 1.0, + "low": 0.5, +} + def evaluate_constraints( devices: list[Device], @@ -29,6 +45,8 @@ def evaluate_constraints( constraints: list[Constraint], collision_checker: CollisionChecker, reachability_checker: ReachabilityChecker | None = None, + *, + graduated: bool = True, ) -> float: """统一评估所有约束,返回总 cost。 @@ -39,9 +57,10 @@ def evaluate_constraints( constraints: 约束规则列表 collision_checker: 碰撞检测实例 reachability_checker: 可达性检测实例(可选) + graduated: True=比例惩罚(DE优化用),False=二值inf(最终pass/fail用) Returns: - 总 cost。硬约束违反返回 inf,否则为软约束 penalty 之和。 + 总 cost。硬约束违反在非graduated模式返回 inf,否则为加权 penalty 之和。 """ device_map = {d.id: d for d in devices} placement_map = {p.device_id: p for p in placements} @@ -50,7 +69,8 @@ def evaluate_constraints( for c in constraints: cost = _evaluate_single( - c, device_map, placement_map, lab, collision_checker, reachability_checker + c, device_map, placement_map, lab, collision_checker, reachability_checker, + graduated=graduated, ) if math.isinf(cost): return math.inf @@ -66,8 +86,8 @@ def evaluate_default_hard_constraints( collision_checker: CollisionChecker, *, graduated: bool = True, - collision_weight: float = 1000.0, - boundary_weight: float = 1000.0, + collision_weight: float = DEFAULT_WEIGHT_DISTANCE * HARD_MULTIPLIER, # 500 + boundary_weight: float = DEFAULT_WEIGHT_DISTANCE * HARD_MULTIPLIER, # 500 ) -> float: """评估默认硬约束(碰撞 + 边界),无需显式声明约束列表。 @@ -86,18 +106,17 @@ def evaluate_default_hard_constraints( device_map = {d.id: d for d in devices} cost = 0.0 - # Graduated collision penalty: sum of penetration depths - n = len(placements) - for i in range(n): - for j in range(i + 1, n): - di, dj = device_map[placements[i].device_id], device_map[placements[j].device_id] - ci = obb_corners(placements[i].x, placements[i].y, - di.bbox[0], di.bbox[1], placements[i].theta) - cj = obb_corners(placements[j].x, placements[j].y, - dj.bbox[0], dj.bbox[1], placements[j].theta) - depth = obb_penetration_depth(ci, cj) - if depth > 0: - cost += collision_weight * depth + # Graduated collision penalty: 2 轴 sweep-and-prune 宽相 + OBB SAT 精确检测 + candidate_pairs = sweep_and_prune_pairs(devices, placements) + for i, j in candidate_pairs: + di, dj = device_map[placements[i].device_id], device_map[placements[j].device_id] + ci = obb_corners(placements[i].x, placements[i].y, + di.bbox[0], di.bbox[1], placements[i].theta) + cj = obb_corners(placements[j].x, placements[j].y, + dj.bbox[0], dj.bbox[1], placements[j].theta) + depth = obb_penetration_depth(ci, cj) + if depth > 0: + cost += collision_weight * depth # Graduated boundary penalty: sum of overshoot distances (rotation-aware) for p in placements: @@ -142,17 +161,31 @@ def _evaluate_single( lab: Lab, collision_checker: CollisionChecker, reachability_checker: ReachabilityChecker | None, + *, + graduated: bool = True, ) -> float: - """评估单条约束规则。""" + """评估单条约束规则。 + + graduated=True 时硬约束返回比例惩罚(DE用), + graduated=False 时硬约束返回 inf(最终 pass/fail)。 + """ rule = constraint.rule_name params = constraint.params is_hard = constraint.type == "hard" + # 根据优先级等级计算有效权重 + effective_weight = constraint.weight + if constraint.priority and constraint.priority in PRIORITY_MULTIPLIERS: + effective_weight *= PRIORITY_MULTIPLIERS[constraint.priority] + if rule == "no_collision": checker_placements = _to_checker_format_from_maps(device_map, placement_map) collisions = collision_checker.check(checker_placements) if collisions: - return math.inf if is_hard else constraint.weight * len(collisions) + if is_hard and not graduated: + return math.inf + w = effective_weight * (HARD_MULTIPLIER if is_hard else 1.0) + return w * len(collisions) return 0.0 if rule == "within_bounds": @@ -162,7 +195,10 @@ def _evaluate_single( checker_placements, lab.width, lab.depth ) if oob: - return math.inf if is_hard else constraint.weight * len(oob) + if is_hard and not graduated: + return math.inf + w = effective_weight * (HARD_MULTIPLIER if is_hard else 1.0) + return w * len(oob) return 0.0 if rule == "distance_less_than": @@ -177,7 +213,10 @@ def _evaluate_single( else: dist = _device_distance_center(pa, pb) or 0.0 if dist > max_dist: - return math.inf if is_hard else constraint.weight * (dist - max_dist) + if is_hard and not graduated: + return math.inf + w = effective_weight * (HARD_MULTIPLIER if is_hard else 1.0) + return w * (dist - max_dist) return 0.0 if rule == "distance_greater_than": @@ -192,7 +231,10 @@ def _evaluate_single( else: dist = _device_distance_center(pa, pb) or 0.0 if dist < min_dist: - return math.inf if is_hard else constraint.weight * (min_dist - dist) + if is_hard and not graduated: + return math.inf + w = effective_weight * (HARD_MULTIPLIER if is_hard else 1.0) + return w * (min_dist - dist) return 0.0 if rule == "minimize_distance": @@ -205,7 +247,7 @@ def _evaluate_single( dist = _device_distance_obb(da, pa, db, pb) else: dist = _device_distance_center(pa, pb) or 0.0 - return constraint.weight * dist + return effective_weight * dist if rule == "maximize_distance": a_id, b_id = params["device_a"], params["device_b"] @@ -218,11 +260,12 @@ def _evaluate_single( else: dist = _device_distance_center(pa, pb) or 0.0 max_possible = math.sqrt(lab.width**2 + lab.depth**2) - return constraint.weight * (max_possible - dist) + return effective_weight * (max_possible - dist) if rule == "min_spacing": min_gap = params.get("min_gap", 0.0) all_placements = list(placement_map.values()) + total_penalty = 0.0 for i in range(len(all_placements)): for j in range(i + 1, len(all_placements)): pi, pj = all_placements[i], all_placements[j] @@ -233,9 +276,12 @@ def _evaluate_single( else: dist = _device_distance_center(pi, pj) or 0.0 if dist < min_gap: - if is_hard: - return math.inf - return constraint.weight * (min_gap - dist) + total_penalty += (min_gap - dist) + if total_penalty > 0: + if is_hard and not graduated: + return math.inf + w = effective_weight * (HARD_MULTIPLIER if is_hard else 1.0) + return w * total_penalty return 0.0 if rule == "reachability": @@ -268,18 +314,19 @@ def _evaluate_single( target_point = {"x": target_p.x, "y": target_p.y, "z": 0.0} target_point["_obb_dist"] = dist if not reachability_checker.is_reachable(arm_id, arm_pose, target_point): - if is_hard: + if is_hard and not graduated: return math.inf # Graduated: penalty proportional to overshoot max_reach = reachability_checker.arm_reach.get(arm_id, 2.0) overshoot = max(0.0, dist - max_reach) - return constraint.weight * overshoot * 10.0 + w = effective_weight * (HARD_MULTIPLIER if is_hard else 1.0) + return w * overshoot * 10.0 # Line-of-sight penalty: penalize if any other device OBB blocks # the path from opening to arm los_cost = _line_of_sight_penalty( arm_id, arm_p, target_device_id, target_p, - device_map, placement_map, constraint.weight, + device_map, placement_map, effective_weight, ) return los_cost @@ -288,8 +335,10 @@ def _evaluate_single( (1 - math.cos(4 * p.theta)) / 2 for p in placement_map.values() ) if is_hard: - return math.inf if alignment_cost > 1e-6 else 0.0 - return constraint.weight * alignment_cost + if not graduated: + return math.inf if alignment_cost > 1e-6 else 0.0 + return HARD_MULTIPLIER * effective_weight * alignment_cost + return effective_weight * alignment_cost if rule == "prefer_seeder_orientation": target_thetas = params.get("target_thetas", {}) @@ -301,7 +350,7 @@ def _evaluate_single( # Circular distance: (1 - cos(diff)) / 2 gives 0..1 range diff = p.theta - target cost += (1 - math.cos(diff)) / 2 - return constraint.weight * cost + return effective_weight * cost if rule == "prefer_orientation_mode": mode = params.get("mode", "outward") @@ -319,7 +368,7 @@ def _evaluate_single( continue diff = p.theta - target cost += (1 - math.cos(diff)) / 2 - return constraint.weight * cost + return effective_weight * cost # 未知约束类型,忽略 return 0.0 diff --git a/unilabos/layout_optimizer/intent_interpreter.py b/unilabos/layout_optimizer/intent_interpreter.py index f009d8ae..0f6e2a50 100644 --- a/unilabos/layout_optimizer/intent_interpreter.py +++ b/unilabos/layout_optimizer/intent_interpreter.py @@ -41,6 +41,7 @@ def _handle_reachable_by(intent: Intent, result: InterpretResult) -> None: type="hard", rule_name="reachability", params={"arm_id": arm, "target_device_id": target}, + priority="critical", ) result.constraints.append(c) generated.append({"type": c.type, "rule_name": c.rule_name, "params": c.params, "weight": c.weight}) @@ -64,6 +65,8 @@ def _handle_close_together(intent: Intent, result: InterpretResult) -> None: return weight = _PRIORITY_WEIGHTS.get(priority, _DEFAULT_WEIGHT) + # 映射 intent priority 到 constraint priority 等级 + constraint_priority = "high" if priority == "high" else "normal" generated: list[dict] = [] for dev_a, dev_b in itertools.combinations(devices, 2): c = Constraint( @@ -71,6 +74,7 @@ def _handle_close_together(intent: Intent, result: InterpretResult) -> None: rule_name="minimize_distance", params={"device_a": dev_a, "device_b": dev_b}, weight=weight, + priority=constraint_priority, ) result.constraints.append(c) generated.append({"type": c.type, "rule_name": c.rule_name, "params": c.params, "weight": c.weight}) @@ -94,6 +98,8 @@ def _handle_far_apart(intent: Intent, result: InterpretResult) -> None: return weight = _PRIORITY_WEIGHTS.get(priority, _DEFAULT_WEIGHT) + # 映射 intent priority 到 constraint priority 等级 + constraint_priority = "high" if priority == "high" else "normal" generated: list[dict] = [] for dev_a, dev_b in itertools.combinations(devices, 2): c = Constraint( @@ -101,6 +107,7 @@ def _handle_far_apart(intent: Intent, result: InterpretResult) -> None: rule_name="maximize_distance", params={"device_a": dev_a, "device_b": dev_b}, weight=weight, + priority=constraint_priority, ) result.constraints.append(c) generated.append({"type": c.type, "rule_name": c.rule_name, "params": c.params, "weight": c.weight}) @@ -131,6 +138,7 @@ def _handle_max_distance(intent: Intent, result: InterpretResult) -> None: type="hard", rule_name="distance_less_than", params={"device_a": device_a, "device_b": device_b, "distance": distance}, + priority="normal", ) result.constraints.append(c) @@ -160,6 +168,7 @@ def _handle_min_distance(intent: Intent, result: InterpretResult) -> None: type="hard", rule_name="distance_greater_than", params={"device_a": device_a, "device_b": device_b, "distance": distance}, + priority="normal", ) result.constraints.append(c) @@ -180,6 +189,7 @@ def _handle_min_spacing(intent: Intent, result: InterpretResult) -> None: type="hard", rule_name="min_spacing", params={"min_gap": min_gap}, + priority="high", ) result.constraints.append(c) @@ -198,6 +208,7 @@ def _handle_face_outward(intent: Intent, result: InterpretResult) -> None: type="soft", rule_name="prefer_orientation_mode", params={"mode": "outward"}, + priority="low", ) result.constraints.append(c) @@ -216,6 +227,7 @@ def _handle_face_inward(intent: Intent, result: InterpretResult) -> None: type="soft", rule_name="prefer_orientation_mode", params={"mode": "inward"}, + priority="low", ) result.constraints.append(c) @@ -234,6 +246,7 @@ def _handle_align_cardinal(intent: Intent, result: InterpretResult) -> None: type="soft", rule_name="prefer_aligned", params={}, + priority="low", ) result.constraints.append(c) @@ -246,6 +259,39 @@ def _handle_align_cardinal(intent: Intent, result: InterpretResult) -> None: }) +def _handle_keep_adjacent(intent: Intent, result: InterpretResult) -> None: + """keep_adjacent:两个设备保持相邻(同 close_together 逻辑,支持 priority 映射)。""" + devices: list[str] = intent.params.get("devices", []) + priority: str = intent.params.get("priority", "medium") + + if len(devices) < 2: + result.errors.append(f"keep_adjacent: 参数 'devices' 至少需要 2 个设备,当前 {len(devices)} 个") + return + + weight = _PRIORITY_WEIGHTS.get(priority, _DEFAULT_WEIGHT) + # 映射 intent priority 到 constraint priority 等级 + constraint_priority = "high" if priority == "high" else "normal" + generated: list[dict] = [] + for dev_a, dev_b in itertools.combinations(devices, 2): + c = Constraint( + type="soft", + rule_name="minimize_distance", + params={"device_a": dev_a, "device_b": dev_b}, + weight=weight, + priority=constraint_priority, + ) + result.constraints.append(c) + generated.append({"type": c.type, "rule_name": c.rule_name, "params": c.params, "weight": c.weight}) + + result.translations.append({ + "source_intent": intent.intent, + "source_description": intent.description, + "source_params": intent.params, + "generated_constraints": generated, + "explanation": f"设备组 {devices} 应保持相邻(优先级: {priority})", + }) + + def _handle_workflow_hint(intent: Intent, result: InterpretResult) -> None: """workflow_hint:工作流顺序暗示,相邻步骤设备靠近。""" workflow: str = intent.params.get("workflow", "") @@ -263,6 +309,7 @@ def _handle_workflow_hint(intent: Intent, result: InterpretResult) -> None: type="soft", rule_name="minimize_distance", params={"device_a": dev_a, "device_b": dev_b}, + priority="normal", ) result.constraints.append(c) generated.append({"type": c.type, "rule_name": c.rule_name, "params": c.params, "weight": c.weight}) diff --git a/unilabos/layout_optimizer/mock_checkers.py b/unilabos/layout_optimizer/mock_checkers.py index 2d5ecdb9..d677a40c 100644 --- a/unilabos/layout_optimizer/mock_checkers.py +++ b/unilabos/layout_optimizer/mock_checkers.py @@ -78,6 +78,7 @@ class MockReachabilityChecker: "elite_cs66": 0.914, "elite_cs612": 1.304, "elite_cs620": 1.800, + "arm_slider": 1.07, # 线性导轨臂:body 2.14m × 0.35m,reach ≈ half length } # 未知型号回退臂展:realistic default for lab-scale arms diff --git a/unilabos/layout_optimizer/models.py b/unilabos/layout_optimizer/models.py index 7f3e8f99..394c4ef7 100644 --- a/unilabos/layout_optimizer/models.py +++ b/unilabos/layout_optimizer/models.py @@ -84,6 +84,8 @@ class Constraint: params: dict = field(default_factory=dict) # 仅 soft 约束使用 weight: float = 1.0 + # 优先级等级,影响有效权重的乘数 + priority: str | None = None # "critical" | "high" | "normal" | "low" @dataclass diff --git a/unilabos/layout_optimizer/optimizer.py b/unilabos/layout_optimizer/optimizer.py index c86ee007..84ea0aa7 100644 --- a/unilabos/layout_optimizer/optimizer.py +++ b/unilabos/layout_optimizer/optimizer.py @@ -1,7 +1,7 @@ """差分进化布局优化器。 编码:N 个设备 → 3N 维向量 [x0, y0, θ0, x1, y1, θ1, ...] -使用 scipy.optimize.differential_evolution 进行全局优化。 +使用自定义差分进化循环(per-device crossover + θ wrapping)进行全局优化。 初始布局(Pencil/回退)注入为种群种子个体加速收敛。 """ @@ -9,19 +9,187 @@ from __future__ import annotations import logging import math -from typing import Any +from typing import Any, Callable import numpy as np -from scipy.optimize import differential_evolution from .constraints import evaluate_constraints, evaluate_default_hard_constraints from .mock_checkers import MockCollisionChecker, MockReachabilityChecker from .models import Constraint, Device, Lab, Placement from .pencil_integration import generate_initial_layout +from .seeders import resolve_seeder_params, seed_layout logger = logging.getLogger(__name__) +def _run_de( + cost_fn: Callable[[np.ndarray], float], + bounds: np.ndarray, + init_pop: np.ndarray, + maxiter: int, + tol: float, + atol: float, + mutation: tuple[float, float], + recombination: float, + seed: int | None, + n_devices: int, + strategy: str = "currenttobest1bin", +) -> tuple[np.ndarray, float, int]: + """自定义差分进化循环。 + + 特性: + - 支持 currenttobest1bin / best1bin 两种策略 + - Per-device crossover:以设备 (x, y, θ) 三元组为原子单元进行交叉 + - θ wrapping:交叉后对角度取模 [0, 2π) + - Early stopping:最近 20 代改善 < 0.1% 时提前终止 + - scipy 风格收敛判断:std(costs) <= atol + tol * |best_cost| + + Args: + cost_fn: 目标函数 f(x) → float + bounds: 边界数组 shape=(ndim, 2),每行 [low, high] + init_pop: 初始种群 shape=(pop_size, ndim) + maxiter: 最大迭代代数 + tol: 相对收敛容差 + atol: 绝对收敛容差 + mutation: 变异因子范围 (F_min, F_max) + recombination: 交叉概率 CR + seed: 随机种子 + n_devices: 设备数量(用于 per-device crossover) + strategy: 变异策略,"currenttobest1bin" 或 "best1bin" + + Returns: + (best_vector, best_cost, n_generations) + """ + rng = np.random.default_rng(seed) + pop_size, ndim = init_pop.shape + lower = bounds[:, 0] + upper = bounds[:, 1] + f_min, f_max = mutation + + # 评估初始种群适应度 + costs = np.array([cost_fn(ind) for ind in init_pop]) + best_idx = int(np.argmin(costs)) + best_cost = costs[best_idx] + best_vector = init_pop[best_idx].copy() + + # Early stopping 跟踪 + patience = 20 + best_cost_history: list[float] = [best_cost] + + for gen in range(1, maxiter + 1): + for i in range(pop_size): + # 选择变异因子 F(每个个体独立采样) + f_val = rng.uniform(f_min, f_max) + + # 选择两个不同于 i 和 best_idx 的个体索引 + candidates = list(range(pop_size)) + candidates.remove(i) + chosen = rng.choice(candidates, size=2, replace=False) + r1, r2 = int(chosen[0]), int(chosen[1]) + + # 变异向量 + if strategy == "best1bin": + # Turbo 模式:mutant = best + F*(r1 - r2) + mutant = best_vector + f_val * (init_pop[r1] - init_pop[r2]) + else: + # 默认 currenttobest1bin:mutant = target + F*(best - target) + F*(r1 - r2) + mutant = ( + init_pop[i] + + f_val * (best_vector - init_pop[i]) + + f_val * (init_pop[r1] - init_pop[r2]) + ) + + # Per-device crossover:以 (x, y, θ) 三元组为原子单元 + trial = init_pop[i].copy() + j_rand = rng.integers(0, n_devices) # 保证至少一个设备来自 mutant + for d in range(n_devices): + if rng.random() < recombination or d == j_rand: + trial[3 * d: 3 * d + 3] = mutant[3 * d: 3 * d + 3] + + # θ wrapping:角度取模 [0, 2π) + for d in range(n_devices): + trial[3 * d + 2] %= 2 * math.pi + + # 钳位到边界内 + trial = np.clip(trial, lower, upper) + + # 贪心选择:trial 不比当前差则替换 + trial_cost = cost_fn(trial) + if trial_cost <= costs[i]: + init_pop[i] = trial + costs[i] = trial_cost + if trial_cost < best_cost: + best_cost = trial_cost + best_vector = trial.copy() + + # 更新 best_idx(种群可能整体更新) + best_idx = int(np.argmin(costs)) + + # Early stopping:最近 patience 代改善 < 0.1% + best_cost_history.append(best_cost) + if len(best_cost_history) >= patience: + old_cost = best_cost_history[-patience] + if old_cost > 0: + improvement = (old_cost - best_cost) / old_cost + else: + improvement = 0.0 + if improvement < 0.001: + logger.info( + "Early stop: cost 在 %d 代内稳定在 %.4f(改善 < 0.1%%)", + patience, best_cost, + ) + return best_vector, best_cost, gen + + # scipy 风格收敛判断 + if np.std(costs) <= atol + tol * abs(best_cost): + logger.info( + "收敛终止:std(costs)=%.6f <= atol+tol*|best|=%.6f,第 %d 代", + np.std(costs), atol + tol * abs(best_cost), gen, + ) + return best_vector, best_cost, gen + + return best_vector, best_cost, maxiter + + +def _generate_seeds( + devices: list[Device], + lab: Lab, + rng: np.random.Generator, + workflow_edges: list[list[str]] | None = None, + n_variants: int = 3, + sigma_pos_frac: float = 0.05, + sigma_theta: float = math.pi / 6, +) -> list[np.ndarray]: + """从多个 seeder preset 生成多样性种子个体 + 变异版本。""" + seeds: list[np.ndarray] = [] + presets = ["compact_outward", "spread_inward"] + if workflow_edges: + presets.append("workflow_cluster") + + for preset_name in presets: + try: + params = resolve_seeder_params(preset_name) + except ValueError: + continue + if params is None: + continue + base_placements = seed_layout(devices, lab, params, workflow_edges) + base_vec = _placements_to_vector(base_placements, devices) + seeds.append(base_vec) + + # 变异版本:对 (x,y) 加高斯噪声 σ=5% lab 尺寸,θ 加 σ=π/6 + for _ in range(n_variants): + variant = base_vec.copy() + for d in range(len(devices)): + variant[3 * d] += rng.normal(0, sigma_pos_frac * lab.width) + variant[3 * d + 1] += rng.normal(0, sigma_pos_frac * lab.depth) + variant[3 * d + 2] += rng.normal(0, sigma_theta) + variant[3 * d + 2] %= 2 * math.pi + seeds.append(variant) + + return seeds + + def optimize( devices: list[Device], lab: Lab, @@ -33,6 +201,8 @@ def optimize( popsize: int = 15, tol: float = 1e-6, seed: int | None = None, + strategy: str = "currenttobest1bin", + workflow_edges: list[list[str]] | None = None, ) -> list[Placement]: """运行差分进化优化,返回最优布局。 @@ -47,6 +217,7 @@ def optimize( popsize: 种群大小倍数 tol: 收敛容差 seed: 随机种子(用于可复现性) + strategy: DE 变异策略("currenttobest1bin" 或 "best1bin") Returns: 最优布局 Placement 列表 @@ -105,57 +276,49 @@ def optimize( return hard_cost - # 构建初始种群:种子个体 + 随机个体 + # 构建初始种群:种子个体 + 多样性种子 + 随机个体 rng = np.random.default_rng(seed) pop_count = popsize * 3 * n # scipy 默认 popsize * dim init_pop = rng.uniform( bounds_array[:, 0], bounds_array[:, 1], size=(pop_count, 3 * n) ) - init_pop[0] = seed_vector # 注入种子 + init_pop[0] = seed_vector # 注入原始种子 + + # 多样性种子注入(多 preset + 变异版本) + extra_seeds = _generate_seeds(devices, lab, rng, workflow_edges) + for i, s in enumerate(extra_seeds): + idx = i + 1 # 原始种子占 [0] + if idx < pop_count: + init_pop[idx] = np.clip(s, bounds_array[:, 0], bounds_array[:, 1]) logger.info( - "Starting DE optimization: %d devices, %d-dim, popsize=%d, maxiter=%d", - n, 3 * n, pop_count, maxiter, + "Starting DE optimization: %d devices, %d-dim, popsize=%d, maxiter=%d, strategy=%s", + n, 3 * n, pop_count, maxiter, strategy, ) - # Early stopping: stop when cost hasn't improved by >0.1% for 20 generations - _best_costs: list[float] = [] - _patience = 20 - - def _early_stop_callback(xk, convergence=0): - cost = cost_function(xk) - _best_costs.append(cost) - if len(_best_costs) >= _patience: - recent = _best_costs[-_patience:] - if recent[0] > 0: - improvement = (recent[0] - recent[-1]) / recent[0] - else: - improvement = 0.0 - if improvement < 0.001: # < 0.1% improvement over last 20 gens - logger.info("Early stop: cost stable at %.4f for %d generations", cost, _patience) - return True - return False - - result = differential_evolution( - cost_function, - bounds=list(bounds), - init=init_pop, + best_vector, best_cost, n_generations = _run_de( + cost_fn=cost_function, + bounds=bounds_array, + init_pop=init_pop, maxiter=maxiter, tol=tol, atol=1e-3, mutation=(0.5, 1.0), recombination=0.7, seed=seed, - disp=False, - callback=_early_stop_callback, + n_devices=n, + strategy=strategy, ) + # 评估次数估算:每代 pop_count 次(初始 + 每代 trial) + n_evaluations = pop_count + n_generations * pop_count + logger.info( "DE optimization complete: success=%s, cost=%.4f, iterations=%d, evaluations=%d", - result.success, result.fun, result.nit, result.nfev, + best_cost < 1e17, best_cost, n_generations, n_evaluations, ) - return _vector_to_placements(result.x, devices) + return _vector_to_placements(best_vector, devices) def snap_theta(placements: list[Placement], threshold_deg: float = 15.0) -> list[Placement]: @@ -179,6 +342,38 @@ def snap_theta(placements: list[Placement], threshold_deg: float = 15.0) -> list return result +def snap_theta_safe( + placements: list[Placement], + devices: list[Device], + lab: Lab, + collision_checker: Any, + threshold_deg: float = 15.0, +) -> list[Placement]: + """Snap theta 到基数方向,但碰撞时回退到原始角度。 + + 逐设备检查:snap 后如果产生碰撞或越界,则该设备保留原始 theta。 + """ + snapped = snap_theta(placements, threshold_deg) + + result = list(snapped) + for idx, (orig, snap) in enumerate(zip(placements, snapped)): + if abs(orig.theta - snap.theta) < 1e-9: + continue # 未 snap,跳过 + # 检查 snap 版本是否导致新碰撞 + test_placements = result.copy() + test_placements[idx] = snap + cost = evaluate_default_hard_constraints( + devices, test_placements, lab, collision_checker, graduated=False, + ) + if math.isinf(cost): + result[idx] = orig # 回退到未 snap 的角度 + logger.info( + "snap_theta_safe: 设备 %s snap θ=%.2f→%.2f 导致碰撞,已回退", + snap.device_id, orig.theta, snap.theta, + ) + return result + + def _placements_to_vector( placements: list[Placement], devices: list[Device] ) -> np.ndarray: @@ -208,7 +403,7 @@ def _vector_to_placements( device_id=dev.id, x=float(x[3 * i]), y=float(x[3 * i + 1]), - theta=float(x[3 * i + 2]), + theta=float(x[3 * i + 2] % (2 * math.pi)), ) ) return placements diff --git a/unilabos/layout_optimizer/server.py b/unilabos/layout_optimizer/server.py index caf9cb61..f24f68a6 100644 --- a/unilabos/layout_optimizer/server.py +++ b/unilabos/layout_optimizer/server.py @@ -23,6 +23,7 @@ from fastapi.responses import FileResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel +from .constraints import DEFAULT_WEIGHT_ANGLE from .device_catalog import ( create_devices_from_list, load_devices_from_assets, @@ -395,9 +396,9 @@ async def run_optimize(request: OptimizeRequest): """接收设备列表+约束,返回最优布局方案。""" from fastapi import HTTPException - from .constraints import evaluate_default_hard_constraints + from .constraints import evaluate_default_hard_constraints, evaluate_constraints from .mock_checkers import MockCollisionChecker - from .optimizer import optimize, snap_theta + from .optimizer import optimize, snap_theta, snap_theta_safe from .seeders import resolve_seeder_params, seed_layout logger.info( @@ -456,10 +457,10 @@ async def run_optimize(request: OptimizeRequest): type="soft", rule_name="prefer_orientation_mode", params={"mode": orientation_mode}, - weight=request.seeder_overrides.get("orientation_weight", 5.0), + weight=request.seeder_overrides.get("orientation_weight", DEFAULT_WEIGHT_ANGLE), )) # prefer_aligned: penalize non-cardinal angles - align_weight = request.seeder_overrides.get("align_weight", 2.0) + align_weight = request.seeder_overrides.get("align_weight", DEFAULT_WEIGHT_ANGLE) if align_weight > 0: constraints.append(Constraint( type="soft", @@ -479,18 +480,27 @@ async def run_optimize(request: OptimizeRequest): seed_placements=seed_placements, maxiter=request.maxiter, seed=request.seed, + workflow_edges=request.workflow_edges or None, ) de_ran = True else: result_placements = seed_placements - # 5. θ snap post-processing - result_placements = snap_theta(result_placements) + # 5. θ snap post-processing(碰撞安全:snap 后验证,失败则回退) + result_placements = snap_theta_safe(result_placements, devices, lab, checker) # 6. Evaluate final cost (binary mode for pass/fail reporting) final_cost = evaluate_default_hard_constraints( devices, result_placements, lab, checker, graduated=False, ) + # 也检查用户硬约束(binary 模式) + if constraints and not math.isinf(final_cost): + user_hard_cost = evaluate_constraints( + devices, result_placements, lab, constraints, checker, + graduated=False, + ) + if math.isinf(user_hard_cost): + final_cost = math.inf return OptimizeResponse( placements=[ diff --git a/unilabos/layout_optimizer/tests/test_broad_phase.py b/unilabos/layout_optimizer/tests/test_broad_phase.py new file mode 100644 index 00000000..444a4e40 --- /dev/null +++ b/unilabos/layout_optimizer/tests/test_broad_phase.py @@ -0,0 +1,241 @@ +"""Tests for broad_phase.py — 2 轴 sweep-and-prune 宽相碰撞检测。""" + +from __future__ import annotations + +import math +import random + +import pytest + +from ..broad_phase import broad_phase_device_pairs, sweep_and_prune_pairs +from ..models import Device, Placement + + +# --------------------------------------------------------------------------- +# 测试用辅助函数 +# --------------------------------------------------------------------------- + +def _make_device(device_id: str, w: float = 0.6, d: float = 0.4) -> Device: + """创建简单测试设备。""" + return Device(id=device_id, name=device_id, bbox=(w, d)) + + +def _make_placement( + device_id: str, x: float, y: float, theta: float = 0.0 +) -> Placement: + """创建简单测试放置。""" + return Placement(device_id=device_id, x=x, y=y, theta=theta) + + +# --------------------------------------------------------------------------- +# 测试类 +# --------------------------------------------------------------------------- + + +class TestNoOverlap: + """两台设备距离足够远,宽相不返回候选对。""" + + def test_no_overlap_returns_empty(self): + """水平方向间距远大于 AABB 尺寸 → 0 候选对。""" + devices = [_make_device("A", 1.0, 1.0), _make_device("B", 1.0, 1.0)] + placements = [ + _make_placement("A", 0.0, 0.0), + _make_placement("B", 10.0, 0.0), + ] + pairs = sweep_and_prune_pairs(devices, placements) + assert pairs == [] + + +class TestOverlapping: + """两台设备 AABB 明显重叠。""" + + def test_overlapping_devices_returned(self): + """两台 1×1 设备中心距 0.5m → 1 候选对。""" + devices = [_make_device("A", 1.0, 1.0), _make_device("B", 1.0, 1.0)] + placements = [ + _make_placement("A", 0.0, 0.0), + _make_placement("B", 0.5, 0.0), + ] + pairs = sweep_and_prune_pairs(devices, placements) + assert len(pairs) == 1 + assert pairs[0] == (0, 1) + + +class TestXOverlapYNoOverlap: + """x 轴投影交叠但 y 轴不交叠,应被 y 轴检查剪枝。""" + + def test_x_overlap_y_no_overlap(self): + """水平接近但垂直方向偏移足够大 → 0 候选对。""" + devices = [_make_device("A", 2.0, 1.0), _make_device("B", 2.0, 1.0)] + placements = [ + _make_placement("A", 0.0, 0.0), + _make_placement("B", 0.5, 5.0), # x 轴交叠但 y 轴相距很远 + ] + pairs = sweep_and_prune_pairs(devices, placements) + assert pairs == [] + + +class TestTouchingDevices: + """AABB 恰好接触(边缘距离 = 0)应作为候选对返回。""" + + def test_touching_devices_included(self): + """两个 1×1 设备中心距恰好为 1.0(半宽 0.5 + 0.5 = 1.0), + AABB 边界接触 → 应包含在候选对中(<= 判定)。""" + devices = [_make_device("A", 1.0, 1.0), _make_device("B", 1.0, 1.0)] + placements = [ + _make_placement("A", 0.0, 0.0), + _make_placement("B", 1.0, 0.0), # xmax_A = 0.5, xmin_B = 0.5 → 接触 + ] + pairs = sweep_and_prune_pairs(devices, placements) + # 接触算作潜在碰撞,安全起见需保留 + assert len(pairs) == 1 + assert pairs[0] == (0, 1) + + +class TestMultipleDevices: + """4 台设备验证精确的候选对列表。""" + + def test_multiple_devices_correct_pairs(self): + """排列 4 台设备,只有特定配对 AABB 交叠。 + + 布局(1×1 设备): + A(0,0) B(0.8,0) — A-B 交叠(中心距 0.8 < 1.0) + C(0,5) — 远离 A、B + D(0.9,5) — C-D 交叠(中心距 0.9 < 1.0) + + 期望候选对: (A,B) 和 (C,D)。 + """ + devices = [ + _make_device("A", 1.0, 1.0), + _make_device("B", 1.0, 1.0), + _make_device("C", 1.0, 1.0), + _make_device("D", 1.0, 1.0), + ] + placements = [ + _make_placement("A", 0.0, 0.0), + _make_placement("B", 0.8, 0.0), + _make_placement("C", 0.0, 5.0), + _make_placement("D", 0.9, 5.0), + ] + pairs = sweep_and_prune_pairs(devices, placements) + pair_set = set(pairs) + assert (0, 1) in pair_set # A-B + assert (2, 3) in pair_set # C-D + assert len(pair_set) == 2 + + +class TestRotatedDeviceAabb: + """旋转设备导致 AABB 变大,命中候选对。""" + + def test_rotated_device_aabb(self): + """一台窄长设备 (2.0×0.2): + - 未旋转时 AABB 半宽 = 1.0,两设备中心距 2.5 → 不交叠 + - 旋转 90° 后 AABB 半宽 = 0.1,半深 = 1.0 → 仍不交叠 + - 旋转 45° 后 AABB 半宽 ≈ (2*cos45 + 0.2*sin45)/2 ≈ 0.778 + 但另一台放在 x=1.6,半宽 = 1.0 + 所以 xmax_A = 0 + 0.778 = 0.778 < 1.6 - 1.0 = 0.6 → 不够 + + 更好的方案:用中心距 1.5,未旋转时不交叠,旋转后交叠。 + 未旋转: half_w_A = 0.3 (bbox 0.6x2.0), half_w_B = 0.3 + A: xmax = 0 + 0.3 = 0.3, B: xmin = 1.5 - 0.3 = 1.2 → 不交叠 + 旋转 90°: A 的 bbox (0.6, 2.0) → half_w = (0.6*0 + 2.0*1)/2 = 1.0 + A: xmax = 0 + 1.0 = 1.0, B: xmin = 1.5 - 0.3 = 1.2 → 仍不交叠 + + 用 bbox (0.4, 2.0),间距 1.2: + 未旋转: half_w = 0.2, xmax_A = 0.2, xmin_B = 1.2 - 0.2 = 1.0 → 不交叠 + 旋转 45°: half_w = (0.4*cos45 + 2.0*sin45)/2 = (0.283+1.414)/2 = 0.849 + xmax_A = 0.849, xmin_B = 1.2 - 0.2 = 1.0 → 不交叠 + + 间距 0.8: + 未旋转: xmax_A = 0.2, xmin_B = 0.8 - 0.2 = 0.6 → 不交叠 ✓ + 旋转 45°: xmax_A = 0.849, xmin_B = 0.6 → 交叠 ✓ (0.849 > 0.6) + y 轴: half_d_A_rot = (0.4*sin45 + 2.0*cos45)/2 = 0.849, half_d_B = 1.0 + ymax_A = 0.849, ymin_B = -1.0 → 交叠 ✓ + """ + dev_narrow = _make_device("narrow", 0.4, 2.0) + dev_normal = _make_device("normal", 0.4, 2.0) + # 未旋转:不交叠 + placements_no_rot = [ + _make_placement("narrow", 0.0, 0.0, theta=0.0), + _make_placement("normal", 0.8, 0.0, theta=0.0), + ] + assert sweep_and_prune_pairs([dev_narrow, dev_normal], placements_no_rot) == [] + + # narrow 旋转 45° → AABB 变大 → 交叠 + placements_rot = [ + _make_placement("narrow", 0.0, 0.0, theta=math.pi / 4), + _make_placement("normal", 0.8, 0.0, theta=0.0), + ] + pairs = sweep_and_prune_pairs([dev_narrow, dev_normal], placements_rot) + assert len(pairs) == 1 + + +class TestOriginalIndices: + """验证返回的索引对应 placements 原始顺序而非排序后顺序。""" + + def test_sorted_output_preserves_original_indices(self): + """故意让 placements 按 x 坐标逆序排列, + 验证返回的索引仍是原始顺序。""" + devices = [ + _make_device("A", 1.0, 1.0), + _make_device("B", 1.0, 1.0), + _make_device("C", 1.0, 1.0), + ] + # 逆序排列:C 在最左,A 在最右 + placements = [ + _make_placement("A", 5.0, 0.0), # idx 0, 最右 + _make_placement("B", 4.5, 0.0), # idx 1, 中间(与 A 交叠) + _make_placement("C", 0.0, 0.0), # idx 2, 最左(独立) + ] + pairs = sweep_and_prune_pairs(devices, placements) + # A(idx=0) 和 B(idx=1) AABB 交叠,索引应为 (0, 1) + assert len(pairs) == 1 + assert pairs[0] == (0, 1) + + # 同时验证 broad_phase_device_pairs 返回正确 device_id + id_pairs = broad_phase_device_pairs(devices, placements) + assert id_pairs == [("A", "B")] + + +class TestPairCountReduction: + """大规模随机测试:宽相候选对数应远小于 N*(N-1)/2。""" + + def test_pair_count_reduction(self): + """N=15 台设备随机放置在 10×10 实验室 → 候选对数显著少于全量。""" + random.seed(42) + n = 15 + devices = [_make_device(f"D{i}", 0.5, 0.5) for i in range(n)] + placements = [ + _make_placement(f"D{i}", random.uniform(0, 10), random.uniform(0, 10)) + for i in range(n) + ] + pairs = sweep_and_prune_pairs(devices, placements) + full_pairs = n * (n - 1) // 2 # = 105 + # 在 10×10 区域放 15 台 0.5×0.5 设备,交叠率应很低 + assert len(pairs) < full_pairs + # 额外断言:候选对数不超过全量的一半(保守判定) + assert len(pairs) < full_pairs * 0.5 + + +class TestEdgeCases: + """边界情况。""" + + def test_empty_input(self): + """空列表 → 空结果。""" + assert sweep_and_prune_pairs([], []) == [] + + def test_single_device(self): + """单台设备 → 无候选对。""" + devices = [_make_device("A")] + placements = [_make_placement("A", 0.0, 0.0)] + assert sweep_and_prune_pairs(devices, placements) == [] + + def test_identical_positions(self): + """两台设备完全重叠 → 1 候选对。""" + devices = [_make_device("A", 1.0, 1.0), _make_device("B", 1.0, 1.0)] + placements = [ + _make_placement("A", 0.0, 0.0), + _make_placement("B", 0.0, 0.0), + ] + pairs = sweep_and_prune_pairs(devices, placements) + assert len(pairs) == 1 diff --git a/unilabos/layout_optimizer/tests/test_bugfixes_v2.py b/unilabos/layout_optimizer/tests/test_bugfixes_v2.py index 021ebd6f..8e63fa31 100644 --- a/unilabos/layout_optimizer/tests/test_bugfixes_v2.py +++ b/unilabos/layout_optimizer/tests/test_bugfixes_v2.py @@ -90,9 +90,16 @@ class TestDuplicateDeviceIDs: lab = Lab(width=5, depth=5) constraints = [Constraint(type="hard", rule_name="min_spacing", params={"min_gap": 0.05})] + # graduated=True (default): 返回有限惩罚 cost = evaluate_constraints(devices, stacked, lab, constraints, MockCollisionChecker()) - assert math.isinf(cost) + assert cost > 0 + assert not math.isinf(cost) + # graduated=False: binary inf + cost_binary = evaluate_constraints(devices, stacked, lab, constraints, + MockCollisionChecker(), + graduated=False) + assert math.isinf(cost_binary) def test_create_devices_uses_uuid(self): """create_devices_from_list should use uuid as Device.id.""" diff --git a/unilabos/layout_optimizer/tests/test_constraints.py b/unilabos/layout_optimizer/tests/test_constraints.py index 66ba04fd..ac8b3a68 100644 --- a/unilabos/layout_optimizer/tests/test_constraints.py +++ b/unilabos/layout_optimizer/tests/test_constraints.py @@ -123,7 +123,7 @@ class TestUserConstraints: assert cost == 0.0 def test_distance_less_than_violated_hard(self): - """硬距离约束违反返回 inf。""" + """硬距离约束违反:graduated模式返回有限惩罚,binary模式返回inf。""" devices = _make_devices() placements = [ Placement("a", 1.0, 1.0, 0.0), @@ -134,10 +134,18 @@ class TestUserConstraints: params={"device_a": "a", "device_b": "b", "distance": 1.0}) ] checker = MockCollisionChecker() + # graduated=True (default): 有限惩罚 cost = evaluate_constraints( devices, placements, _make_lab(), constraints, checker ) - assert math.isinf(cost) + assert cost > 0 + assert not math.isinf(cost) + # graduated=False: binary inf + cost_binary = evaluate_constraints( + devices, placements, _make_lab(), constraints, checker, + graduated=False, + ) + assert math.isinf(cost_binary) def test_minimize_distance_cost(self): """minimize_distance 约束应返回正比于距离的 cost。""" @@ -184,7 +192,7 @@ class TestUserConstraints: assert not math.isinf(cost) # reachable → no hard failure def test_reachability_constraint_violated(self): - """可达性约束:目标超出臂展应返回 inf。""" + """可达性约束:目标超出臂展 — graduated返回有限惩罚,binary返回inf。""" devices = [ Device(id="arm", name="Arm", bbox=(0.2, 0.2), device_type="articulation"), Device(id="target", name="Target", bbox=(0.5, 0.5)), @@ -199,10 +207,18 @@ class TestUserConstraints: ] checker = MockCollisionChecker() reachability = MockReachabilityChecker(arm_reach={"arm": 1.0}) + # graduated=True (default): 有限惩罚 cost = evaluate_constraints( devices, placements, _make_lab(), constraints, checker, reachability ) - assert math.isinf(cost) + assert cost > 0 + assert not math.isinf(cost) + # graduated=False: binary inf + cost_binary = evaluate_constraints( + devices, placements, _make_lab(), constraints, checker, reachability, + graduated=False, + ) + assert math.isinf(cost_binary) def test_distance_less_than_uses_edge_to_edge(): @@ -272,3 +288,136 @@ def test_prefer_aligned_sums_over_devices(): cost = evaluate_constraints(devices, placements, lab, [constraint], checker) # 2 devices × 1.0 × weight 2.0 = 4.0 assert cost == pytest.approx(4.0) + + +class TestGraduatedHardConstraints: + """graduated 模式下硬约束返回比例惩罚而非 inf。""" + + def test_hard_reachability_graduated_finite(self): + """graduated=True: 硬可达性返回有限惩罚。""" + devices = [ + Device(id="arm", name="Arm", bbox=(0.2, 0.2), device_type="articulation"), + Device(id="t", name="Target", bbox=(0.5, 0.5)), + ] + placements = [ + Placement("arm", 1.0, 1.0, 0.0), + Placement("t", 4.0, 3.0, 0.0), + ] + constraints = [ + Constraint(type="hard", rule_name="reachability", + params={"arm_id": "arm", "target_device_id": "t"}, weight=1.0) + ] + checker = MockCollisionChecker() + reach = MockReachabilityChecker(arm_reach={"arm": 1.0}) + cost = evaluate_constraints( + devices, placements, _make_lab(), constraints, checker, reach, + graduated=True, + ) + assert cost > 0 + assert not math.isinf(cost) + + def test_hard_reachability_binary_inf(self): + """graduated=False: 硬可达性返回 inf。""" + devices = [ + Device(id="arm", name="Arm", bbox=(0.2, 0.2), device_type="articulation"), + Device(id="t", name="Target", bbox=(0.5, 0.5)), + ] + placements = [ + Placement("arm", 1.0, 1.0, 0.0), + Placement("t", 4.0, 3.0, 0.0), + ] + constraints = [ + Constraint(type="hard", rule_name="reachability", + params={"arm_id": "arm", "target_device_id": "t"}, weight=1.0) + ] + checker = MockCollisionChecker() + reach = MockReachabilityChecker(arm_reach={"arm": 1.0}) + cost = evaluate_constraints( + devices, placements, _make_lab(), constraints, checker, reach, + graduated=False, + ) + assert math.isinf(cost) + + def test_hard_min_spacing_graduated_sums_all_pairs(self): + """graduated模式:min_spacing 对所有违规对求和(不只第一对)。""" + devices = [ + Device(id="a", name="A", bbox=(0.5, 0.5)), + Device(id="b", name="B", bbox=(0.5, 0.5)), + Device(id="c", name="C", bbox=(0.5, 0.5)), + ] + # 三个设备间距都小于 min_gap=1.0 + placements = [ + Placement("a", 1.0, 2.0, 0.0), + Placement("b", 1.3, 2.0, 0.0), # OBB 边缘距 a 约 0.3 + Placement("c", 1.6, 2.0, 0.0), # OBB 边缘距 b 约 0.3, 距 a 约 0.6 + ] + constraints = [ + Constraint(type="hard", rule_name="min_spacing", + params={"min_gap": 1.0}, weight=1.0) + ] + checker = MockCollisionChecker() + cost = evaluate_constraints( + devices, placements, _make_lab(), constraints, checker, + graduated=True, + ) + # 应大于 0 且有限(累加多对违规) + assert cost > 0 + assert not math.isinf(cost) + + def test_hard_min_spacing_binary_inf(self): + """graduated=False: min_spacing 违规返回 inf。""" + devices = _make_devices() + placements = [ + Placement("a", 1.0, 2.0, 0.0), + Placement("b", 1.3, 2.0, 0.0), + ] + constraints = [ + Constraint(type="hard", rule_name="min_spacing", + params={"min_gap": 1.0}, weight=1.0) + ] + checker = MockCollisionChecker() + cost = evaluate_constraints( + devices, placements, _make_lab(), constraints, checker, + graduated=False, + ) + assert math.isinf(cost) + + def test_hard_distance_less_than_graduated(self): + """graduated模式:distance_less_than 硬约束返回比例惩罚。""" + devices = _make_devices() + placements = [ + Placement("a", 1.0, 2.0, 0.0), + Placement("b", 4.0, 2.0, 0.0), + ] + constraints = [ + Constraint(type="hard", rule_name="distance_less_than", + params={"device_a": "a", "device_b": "b", "distance": 0.5}, + weight=2.0) + ] + checker = MockCollisionChecker() + cost = evaluate_constraints( + devices, placements, _make_lab(), constraints, checker, + graduated=True, + ) + # HARD_MULTIPLIER(5) × weight(2) × overshoot > 0 + assert cost > 0 + assert not math.isinf(cost) + + def test_graduated_default_is_true(self): + """不传 graduated 参数时默认使用 graduated 模式。""" + devices = _make_devices() + placements = [ + Placement("a", 1.0, 2.0, 0.0), + Placement("b", 4.0, 2.0, 0.0), + ] + constraints = [ + Constraint(type="hard", rule_name="distance_less_than", + params={"device_a": "a", "device_b": "b", "distance": 0.5}, + weight=1.0) + ] + checker = MockCollisionChecker() + # 不指定 graduated — 默认应为 True → 有限惩罚 + cost = evaluate_constraints( + devices, placements, _make_lab(), constraints, checker, + ) + assert not math.isinf(cost) diff --git a/unilabos/layout_optimizer/tests/test_e2e_pcr_pipeline.py b/unilabos/layout_optimizer/tests/test_e2e_pcr_pipeline.py index 5910d186..53557722 100644 --- a/unilabos/layout_optimizer/tests/test_e2e_pcr_pipeline.py +++ b/unilabos/layout_optimizer/tests/test_e2e_pcr_pipeline.py @@ -185,9 +185,9 @@ class TestStage3VerifyPlacements: def test_no_hard_constraint_violation(self): """Full pipeline with all intents including reachability converges cleanly. - MockReachabilityChecker uses large fallback reach for unknown arms, - so arm_slider reachability constraints are satisfied in mock mode. - When real ROS checkers replace mock, this test validates the same pipeline. + MockReachabilityChecker now includes arm_slider in the default reach table + (1.07m). Binary final evaluation checks all hard constraints including + user-defined reachability. """ interpret_data = client.post("/interpret", json={"intents": LLM_INTENTS}).json() @@ -197,7 +197,7 @@ class TestStage3VerifyPlacements: "constraints": interpret_data["constraints"], "workflow_edges": interpret_data["workflow_edges"], "run_de": True, - "maxiter": 50, + "maxiter": 100, "seed": 42, }) data = optimize_resp.json() diff --git a/unilabos/layout_optimizer/tests/test_optimizer.py b/unilabos/layout_optimizer/tests/test_optimizer.py index 3d430bfa..a7892092 100644 --- a/unilabos/layout_optimizer/tests/test_optimizer.py +++ b/unilabos/layout_optimizer/tests/test_optimizer.py @@ -4,8 +4,9 @@ import math from ..mock_checkers import MockCollisionChecker from ..models import Device, Lab, Placement +import numpy as np import pytest -from ..optimizer import optimize, snap_theta +from ..optimizer import _run_de, optimize, snap_theta def test_optimize_three_devices_no_collision(): @@ -217,3 +218,224 @@ def test_full_pipeline_with_de(): for p in result: assert 0 <= p.x <= lab.width assert 0 <= p.y <= lab.depth + + +# ────────────────────────────────────────────────────────────── +# DE V2 新增测试 +# ────────────────────────────────────────────────────────────── + + +def test_theta_wrapping_convergence(): + """θ 在 0/2π 边界附近应正确收敛,不发散。 + + 构造一个简单的 cost function,最优解 θ≈0(即 2π 附近也可以)。 + 验证自定义 DE 在 θ wrapping 下能收敛。 + """ + n_devices = 2 + + def cost_fn(x: np.ndarray) -> float: + # 最优:两设备分开放置,θ 接近 0 + cost = 0.0 + for d in range(n_devices): + theta = x[3 * d + 2] + # θ penalty: 偏离 0 的圆周距离 + cost += (1 - math.cos(theta)) * 10.0 + # 两设备距离 penalty + dx = x[0] - x[3] + dy = x[1] - x[4] + dist = math.sqrt(dx * dx + dy * dy) + if dist < 1.0: + cost += (1.0 - dist) * 100.0 + return cost + + bounds = np.array([ + [0.5, 4.5], [0.5, 4.5], [0.0, 2 * math.pi], # 设备 0 + [0.5, 4.5], [0.5, 4.5], [0.0, 2 * math.pi], # 设备 1 + ]) + rng = np.random.default_rng(42) + pop_size = 30 + init_pop = rng.uniform(bounds[:, 0], bounds[:, 1], size=(pop_size, 6)) + # 故意注入 θ 接近 2π 的个体,测试 wrapping + init_pop[0, 2] = 2 * math.pi - 0.01 + init_pop[0, 5] = 2 * math.pi - 0.05 + + best_vec, best_cost, n_gen = _run_de( + cost_fn=cost_fn, + bounds=bounds, + init_pop=init_pop, + maxiter=100, + tol=1e-6, + atol=1e-3, + mutation=(0.5, 1.0), + recombination=0.7, + seed=42, + n_devices=n_devices, + ) + + # θ 应在 [0, 2π) 范围内 + for d in range(n_devices): + theta = best_vec[3 * d + 2] + assert 0 <= theta < 2 * math.pi, f"θ 超出 [0, 2π): {theta}" + + # cost 应显著下降(不发散) + assert best_cost < 50.0, f"θ wrapping 未正确收敛,cost={best_cost}" + + +def test_per_device_crossover_atomicity(): + """验证 per-device crossover 的原子性:同一设备的 (x, y, θ) 来自同一来源。 + + 构造 2 设备场景,跟踪 crossover 后每个设备三元组是否完整来自 mutant 或 parent。 + """ + rng = np.random.default_rng(123) + n_devices = 3 + ndim = 3 * n_devices + + # 构造明显不同的 parent 和 mutant + parent = np.zeros(ndim) + mutant = np.ones(ndim) * 10.0 + + # 模拟多次 per-device crossover,检查原子性 + violations = 0 + n_trials = 200 + for _ in range(n_trials): + trial = parent.copy() + j_rand = rng.integers(0, n_devices) + for d in range(n_devices): + if rng.random() < 0.7 or d == j_rand: + trial[3 * d: 3 * d + 3] = mutant[3 * d: 3 * d + 3] + + # 检查每个设备的三元组要么全是 0(parent),要么全是 10(mutant) + for d in range(n_devices): + triple = trial[3 * d: 3 * d + 3] + all_parent = np.allclose(triple, 0.0) + all_mutant = np.allclose(triple, 10.0) + if not (all_parent or all_mutant): + violations += 1 + + assert violations == 0, f"Per-device crossover 原子性违反 {violations}/{n_trials * n_devices} 次" + + +def test_strategy_currenttobest1bin_converges(): + """currenttobest1bin 策略在简单 2 设备问题上应能收敛。""" + devices = [ + Device(id="a", name="A", bbox=(0.6, 0.4)), + Device(id="b", name="B", bbox=(0.6, 0.4)), + ] + lab = Lab(width=5.0, depth=5.0) + + placements = optimize( + devices, lab, seed=42, maxiter=80, popsize=10, + strategy="currenttobest1bin", + ) + + assert len(placements) == 2 + checker = MockCollisionChecker() + checker_placements = [ + {"id": p.device_id, "bbox": next(d.bbox for d in devices if d.id == p.device_id), + "pos": (p.x, p.y, p.theta)} + for p in placements + ] + collisions = checker.check(checker_placements) + assert collisions == [], f"currenttobest1bin 策略产生碰撞: {collisions}" + + +def test_strategy_best1bin_converges(): + """best1bin 策略在简单 2 设备问题上应能收敛。""" + devices = [ + Device(id="a", name="A", bbox=(0.6, 0.4)), + Device(id="b", name="B", bbox=(0.6, 0.4)), + ] + lab = Lab(width=5.0, depth=5.0) + + placements = optimize( + devices, lab, seed=42, maxiter=80, popsize=10, + strategy="best1bin", + ) + + assert len(placements) == 2 + checker = MockCollisionChecker() + checker_placements = [ + {"id": p.device_id, "bbox": next(d.bbox for d in devices if d.id == p.device_id), + "pos": (p.x, p.y, p.theta)} + for p in placements + ] + collisions = checker.check(checker_placements) + assert collisions == [], f"best1bin 策略产生碰撞: {collisions}" + + +def test_convergence_quality_two_devices(): + """2 设备无碰撞放置,cost 应低于阈值,验证优化质量。""" + devices = [ + Device(id="d1", name="D1", bbox=(0.8, 0.6)), + Device(id="d2", name="D2", bbox=(0.6, 0.5)), + ] + lab = Lab(width=5.0, depth=5.0) + + placements = optimize(devices, lab, seed=42, maxiter=100, popsize=10) + + # 验证结果质量:cost 应很低(无碰撞、在边界内) + from ..constraints import evaluate_default_hard_constraints + checker = MockCollisionChecker() + cost = evaluate_default_hard_constraints(devices, placements, lab, checker) + assert cost < 1.0, f"优化质量不佳,cost={cost}" + + +def test_run_de_early_stopping(): + """验证 _run_de 的 early stopping 机制:简单 cost function 应提前终止。""" + + def trivial_cost(x: np.ndarray) -> float: + # 简单二次函数,最优解在原点附近 + return float(np.sum(x ** 2)) + + ndim = 6 + n_devices = 2 + bounds = np.array([[-5.0, 5.0]] * ndim) + rng = np.random.default_rng(42) + pop_size = 20 + init_pop = rng.uniform(-5, 5, size=(pop_size, ndim)) + + _, _, n_gen = _run_de( + cost_fn=trivial_cost, + bounds=bounds, + init_pop=init_pop, + maxiter=500, + tol=1e-6, + atol=1e-3, + mutation=(0.5, 1.0), + recombination=0.7, + seed=42, + n_devices=n_devices, + ) + + # 简单问题应在远少于 maxiter=500 代内收敛 + assert n_gen < 500, f"Early stopping 未生效,运行了 {n_gen} 代" + + +def test_run_de_returns_correct_tuple(): + """验证 _run_de 返回值格式正确。""" + + def const_cost(x: np.ndarray) -> float: + return 42.0 + + bounds = np.array([[0.0, 1.0]] * 3) + init_pop = np.random.default_rng(0).uniform(0, 1, size=(5, 3)) + + result = _run_de( + cost_fn=const_cost, + bounds=bounds, + init_pop=init_pop, + maxiter=10, + tol=1e-6, + atol=1e-3, + mutation=(0.5, 1.0), + recombination=0.7, + seed=0, + n_devices=1, + ) + + assert isinstance(result, tuple) and len(result) == 3 + best_vec, best_cost, n_gen = result + assert isinstance(best_vec, np.ndarray) + assert best_vec.shape == (3,) + assert best_cost == pytest.approx(42.0) + assert isinstance(n_gen, int) and n_gen >= 1