diff --git a/unilabos/layout_optimizer/README.md b/unilabos/layout_optimizer/README.md index f9991e16..25e2a6f2 100644 --- a/unilabos/layout_optimizer/README.md +++ b/unilabos/layout_optimizer/README.md @@ -129,6 +129,7 @@ Returns all 10 intent types with parameter specs. LLM agent should call this bef "workflow_edges": [["device_a", "device_b"]], "seeder": "compact_outward", "run_de": true, + "angle_granularity": 4, "maxiter": 200, "seed": 42 } @@ -154,6 +155,7 @@ Returns all 10 intent types with parameter specs. LLM agent should call this bef ``` `position`/`rotation` format matches Cloud's `CommonPositionType`. `rotation.z` is θ in radians. +`angle_granularity` is optional and opt-in. Supported values are `4`, `8`, `12`, `24`. When set, the optimizer uses an angle-first hybrid mode: snap all device angles onto the global lattice, greedily sweep angles, then run DE on `x/y` only. `4` is the practical axis-aligned mode for tidy lab layouts. ### `GET /devices` — Device catalog @@ -438,6 +440,7 @@ curl -X POST http://localhost:8000/optimize \ ["agilent_plateloc", "inheco_odtc_96xl"] ], "run_de": true, + "angle_granularity": 4, "maxiter": 100, "seed": 42 }' | python3 -m json.tool diff --git a/unilabos/layout_optimizer/optimizer.py b/unilabos/layout_optimizer/optimizer.py index cae38952..6b3c771f 100644 --- a/unilabos/layout_optimizer/optimizer.py +++ b/unilabos/layout_optimizer/optimizer.py @@ -203,6 +203,417 @@ def _generate_seeds( return seeds +def _build_bounds( + devices: list[Device], lab: Lab, *, include_theta: bool, +) -> np.ndarray: + """构建搜索边界。""" + bounds = [] + for dev in devices: + half_min = min(dev.bbox[0], dev.bbox[1]) / 2 + bounds.append((half_min, lab.width - half_min)) + bounds.append((half_min, lab.depth - half_min)) + if include_theta: + bounds.append((0, 2 * math.pi)) + return np.array(bounds) + + +def _evaluate_layout_cost( + devices: list[Device], + placements: list[Placement], + lab: Lab, + collision_checker: Any, + reachability_checker: Any, + constraints: list[Constraint], +) -> float: + """统一计算布局总 cost。""" + hard_cost = evaluate_default_hard_constraints( + devices, placements, lab, collision_checker, + ) + if math.isinf(hard_cost): + return 1e18 + + if constraints: + user_cost = evaluate_constraints( + devices, placements, lab, constraints, + collision_checker, reachability_checker, + ) + if math.isinf(user_cost): + return 1e18 + return hard_cost + user_cost + + return hard_cost + + +def _make_progress_callback( + devices: list[Device], + lab: Lab, + constraints: list[Constraint], + collision_checker: Any, + reachability_checker: Any, + placements_from_vector: Callable[[np.ndarray], list[Placement]], +) -> Callable[[int, np.ndarray, float], None]: + """构造统一的 DEBUG 进度回调。""" + + def _progress_cb(gen: int, best_vec: np.ndarray, best_cost_val: float) -> None: + if not logger.isEnabledFor(logging.DEBUG): + return + pls = placements_from_vector(best_vec) + hard_bd = evaluate_default_hard_constraints_breakdown( + devices, pls, lab, collision_checker, + ) + lines = [f"=== DE Gen {gen} | best_cost={best_cost_val:.4f} ==="] + lines.append(f" {'Constraint':<45} {'Type':<6} {'Weight':>8} {'Cost':>10}") + lines.append(f" {'─' * 71}") + lines.append( + f" {'[predefined] collision':<45} {'hard':<6} {hard_bd['collision_weight']:>8.0f} {hard_bd['collision']:>10.4f}" + ) + lines.append( + f" {'[predefined] boundary':<45} {'hard':<6} {hard_bd['boundary_weight']:>8.0f} {hard_bd['boundary']:>10.4f}" + ) + if constraints: + user_bd = evaluate_constraints_breakdown( + devices, pls, lab, constraints, + collision_checker, reachability_checker, + ) + for item in user_bd: + lines.append( + f" {item['name']:<45} {item['type']:<6} {item['weight']:>8.1f} {item['cost']:>10.4f}" + ) + lines.append(f" {'─' * 71}") + lines.append(f" {'TOTAL':<45} {'':6} {'':>8} {best_cost_val:>10.4f}") + logger.debug("\n".join(lines)) + + return _progress_cb + + +def _log_final_summary( + devices: list[Device], + final_placements: list[Placement], + lab: Lab, + constraints: list[Constraint], + collision_checker: Any, + reachability_checker: Any, + best_cost: float, + n_generations: int, + n_evaluations: int, +) -> None: + """输出最终布局分项明细。""" + hard_bd = evaluate_default_hard_constraints_breakdown( + devices, final_placements, lab, collision_checker, + ) + all_hard_met = hard_bd["total"] == 0.0 + all_violators: list[dict] = [ + {"name": "[predefined] collision", "cost": hard_bd["collision"]}, + {"name": "[predefined] boundary", "cost": hard_bd["boundary"]}, + ] + if constraints: + user_bd = evaluate_constraints_breakdown( + devices, final_placements, lab, constraints, + collision_checker, reachability_checker, + ) + user_total = sum(item["cost"] for item in user_bd) + for c_item in user_bd: + all_violators.append({"name": c_item["name"], "cost": c_item["cost"]}) + if c_item["type"] == "hard" and c_item["cost"] > 0: + all_hard_met = False + else: + user_total = 0.0 + + summary = [ + "DE complete: success=%s, cost=%.4f, %d gens, %d evals" + % (all_hard_met, best_cost, n_generations, n_evaluations), + " Predefined: subtotal=%.4f" % hard_bd["total"], + ] + if constraints: + summary.append(f" User: subtotal={user_total:.4f}") + top_violators = sorted(all_violators, key=lambda x: x["cost"], reverse=True)[:3] + top_violators = [v for v in top_violators if v["cost"] > 0] + if top_violators: + summary.append(" Top violators:") + for v in top_violators: + summary.append(f" {v['name']} = {v['cost']:.4f}") + logger.info("\n".join(summary)) + + +def _angle_lattice(granularity: int) -> list[float]: + """生成角度离散格点。""" + return [(2 * math.pi * idx) / granularity for idx in range(granularity)] + + +def _nearest_lattice_theta(theta: float, angles: list[float]) -> float: + """返回距离最近的离散角度。""" + theta_mod = theta % (2 * math.pi) + return min( + angles, + key=lambda angle: min( + abs(theta_mod - angle), + 2 * math.pi - abs(theta_mod - angle), + ), + ) + + +def _snap_placements_to_lattice( + placements: list[Placement], angles: list[float], +) -> list[Placement]: + """将所有设备角度吸附到离散格点。""" + return [ + Placement( + device_id=p.device_id, + x=p.x, + y=p.y, + theta=_nearest_lattice_theta(p.theta, angles), + uuid=p.uuid, + ) + for p in placements + ] + + +def _placements_to_position_vector( + placements: list[Placement], devices: list[Device], +) -> np.ndarray: + """将 Placement 列表编码为 2N 维位置向量。""" + placement_map = {p.device_id: p for p in placements} + vec = np.zeros(2 * len(devices)) + for i, dev in enumerate(devices): + p = placement_map.get(dev.id) + if p is not None: + vec[2 * i] = p.x + vec[2 * i + 1] = p.y + return vec + + +def _position_vector_to_placements( + x: np.ndarray, + devices: list[Device], + base_placements: list[Placement], +) -> list[Placement]: + """将 2N 维位置向量解码为保留 theta 的 Placement 列表。""" + base_map = {p.device_id: p for p in base_placements} + placements = [] + for i, dev in enumerate(devices): + base = base_map.get(dev.id) + theta = base.theta if base is not None else 0.0 + uuid = base.uuid if base is not None else "" + placements.append( + Placement( + device_id=dev.id, + x=float(x[2 * i]), + y=float(x[2 * i + 1]), + theta=float(theta % (2 * math.pi)), + uuid=uuid, + ) + ) + return placements + + +def _run_de_xy( + cost_fn: Callable[[np.ndarray], float], + bounds: np.ndarray, + init_pop: np.ndarray, + maxiter: int, + tol: float, + atol: float, + mutation: tuple[float, float], + recombination: float, + seed: int | None, + n_devices: int, + strategy: str = "currenttobest1bin", + progress_callback: Callable[[int, np.ndarray, float], None] | None = None, +) -> tuple[np.ndarray, float, int]: + """固定 theta 的 2N 维位置 DE。""" + rng = np.random.default_rng(seed) + pop_size, ndim = init_pop.shape + lower = bounds[:, 0] + upper = bounds[:, 1] + f_min, f_max = mutation + + costs = np.array([cost_fn(ind) for ind in init_pop]) + best_idx = int(np.argmin(costs)) + best_cost = costs[best_idx] + best_vector = init_pop[best_idx].copy() + + patience = 200 + best_cost_history: list[float] = [best_cost] + + for gen in range(1, maxiter + 1): + for i in range(pop_size): + f_val = rng.uniform(f_min, f_max) + candidates = list(range(pop_size)) + candidates.remove(i) + chosen = rng.choice(candidates, size=2, replace=False) + r1, r2 = int(chosen[0]), int(chosen[1]) + + if strategy == "best1bin": + mutant = best_vector + f_val * (init_pop[r1] - init_pop[r2]) + else: + mutant = ( + init_pop[i] + + f_val * 0.1 * (upper - lower) * rng.uniform(-1, 1, size=ndim) + + f_val * (best_vector - init_pop[i]) + + f_val * (init_pop[r1] - init_pop[r2]) + ) + + trial = init_pop[i].copy() + j_rand = rng.integers(0, n_devices) + for d in range(n_devices): + if rng.random() < recombination or d == j_rand: + trial[2 * d: 2 * d + 2] = mutant[2 * d: 2 * d + 2] + + trial = np.clip(trial, lower, upper) + trial_cost = cost_fn(trial) + if trial_cost <= costs[i]: + init_pop[i] = trial + costs[i] = trial_cost + if trial_cost < best_cost: + best_cost = trial_cost + best_vector = trial.copy() + + best_idx = int(np.argmin(costs)) + + if progress_callback and gen % 10 == 0: + progress_callback(gen, best_vector, best_cost) + + best_cost_history.append(best_cost) + if len(best_cost_history) >= patience: + old_cost = best_cost_history[-patience] + improvement = (old_cost - best_cost) / old_cost if old_cost > 0 else 0.0 + if improvement < 0.001: + logger.info( + "Early stop: cost 在 %d 代内稳定在 %.4f(改善 < 0.1%%)", + patience, best_cost, + ) + return best_vector, best_cost, gen + + if np.std(costs) <= atol + tol * abs(best_cost): + logger.info( + "收敛终止:std(costs)=%.6f <= atol+tol*|best|=%.6f,第 %d 代", + np.std(costs), atol + tol * abs(best_cost), gen, + ) + return best_vector, best_cost, gen + + return best_vector, best_cost, maxiter + + +def _angle_sweep_once( + devices: list[Device], + placements: list[Placement], + angles: list[float], + lab: Lab, + constraints: list[Constraint], + collision_checker: Any, + reachability_checker: Any, +) -> tuple[list[Placement], float, bool]: + """固定位置做一轮逐设备离散角度贪心扫描。""" + current = list(placements) + current_cost = _evaluate_layout_cost( + devices, current, lab, collision_checker, reachability_checker, constraints, + ) + changed = False + + for idx, dev in enumerate(devices): + best_theta = current[idx].theta + best_cost = current_cost + + for angle in angles: + if abs((best_theta - angle) % (2 * math.pi)) < 1e-9: + continue + candidate = list(current) + base = candidate[idx] + candidate[idx] = Placement( + device_id=base.device_id, + x=base.x, + y=base.y, + theta=angle, + uuid=base.uuid, + ) + candidate_cost = _evaluate_layout_cost( + devices, candidate, lab, collision_checker, reachability_checker, constraints, + ) + if candidate_cost < best_cost - 1e-9: + best_theta = angle + best_cost = candidate_cost + + if abs((current[idx].theta - best_theta) % (2 * math.pi)) >= 1e-9: + base = current[idx] + current[idx] = Placement( + device_id=base.device_id, + x=base.x, + y=base.y, + theta=best_theta, + uuid=base.uuid, + ) + current_cost = best_cost + changed = True + + return current, current_cost, changed + + +def _optimize_positions_fixed_theta( + devices: list[Device], + lab: Lab, + constraints: list[Constraint], + collision_checker: Any, + reachability_checker: Any, + seed_placements: list[Placement], + maxiter: int, + popsize: int, + tol: float, + seed: int | None, + strategy: str, +) -> tuple[list[Placement], float, int, int]: + """在固定离散 theta 下,只优化位置。""" + n = len(devices) + bounds_array = _build_bounds(devices, lab, include_theta=False) + seed_vector = np.clip( + _placements_to_position_vector(seed_placements, devices), + bounds_array[:, 0], + bounds_array[:, 1], + ) + + def cost_function(x: np.ndarray) -> float: + placements = _position_vector_to_placements(x, devices, seed_placements) + return _evaluate_layout_cost( + devices, placements, lab, collision_checker, reachability_checker, constraints, + ) + + rng = np.random.default_rng(seed) + pop_count = popsize * 2 * n + init_pop = rng.uniform( + bounds_array[:, 0], bounds_array[:, 1], size=(pop_count, 2 * n), + ) + init_pop[0] = seed_vector + + progress_cb = _make_progress_callback( + devices, + lab, + constraints, + collision_checker, + reachability_checker, + lambda vec: _position_vector_to_placements(vec, devices, seed_placements), + ) + + best_vector, best_cost, n_generations = _run_de_xy( + cost_fn=cost_function, + bounds=bounds_array, + init_pop=init_pop, + maxiter=maxiter, + tol=tol, + atol=1e-3, + mutation=(0.5, 1.0), + recombination=0.7, + seed=seed, + n_devices=n, + strategy=strategy, + progress_callback=progress_cb, + ) + return ( + _position_vector_to_placements(best_vector, devices, seed_placements), + best_cost, + n_generations, + pop_count + n_generations * pop_count, + ) + + def optimize( devices: list[Device], lab: Lab, @@ -216,6 +627,7 @@ def optimize( seed: int | None = None, strategy: str = "currenttobest1bin", workflow_edges: list[list[str]] | None = None, + angle_granularity: int | None = None, ) -> list[Placement]: """运行差分进化优化,返回最优布局。 @@ -246,17 +658,7 @@ def optimize( constraints = [] n = len(devices) - - # 构建边界:每个设备 (x, y, θ) - # 使用较小半径作为搜索边界,让 graduated boundary penalty 处理实际越界 - # 对角线半径过于保守,会阻止长设备贴边对齐 - bounds = [] - for dev in devices: - half_min = min(dev.bbox[0], dev.bbox[1]) / 2 - bounds.append((half_min, lab.width - half_min)) # x - bounds.append((half_min, lab.depth - half_min)) # y - bounds.append((0, 2 * math.pi)) # θ - bounds_array = np.array(bounds) + bounds_array = _build_bounds(devices, lab, include_theta=True) # 生成种子个体 if seed_placements is None: @@ -269,25 +671,86 @@ def optimize( def cost_function(x: np.ndarray) -> float: placements = _vector_to_placements(x, devices) - - # 默认硬约束(碰撞 + 边界) - hard_cost = evaluate_default_hard_constraints( - devices, placements, lab, collision_checker + return _evaluate_layout_cost( + devices, placements, lab, collision_checker, reachability_checker, constraints, ) - if math.isinf(hard_cost): - return 1e18 # DE 不接受 inf,用大数替代 - # 用户自定义约束 - if constraints: - user_cost = evaluate_constraints( - devices, placements, lab, constraints, - collision_checker, reachability_checker, + if angle_granularity is not None: + angles = _angle_lattice(angle_granularity) + current_placements = _snap_placements_to_lattice(seed_placements, angles) + best_placements = current_placements + best_cost = _evaluate_layout_cost( + devices, best_placements, lab, collision_checker, reachability_checker, constraints, + ) + total_generations = 0 + total_evaluations = 0 + logger.info( + "Starting hybrid optimization: %d devices, granularity=%d, outer_rounds=%d, strategy=%s", + n, angle_granularity, 3, strategy, + ) + + maxiter_xy = max(40, math.ceil(maxiter / 3)) + for round_idx in range(3): + round_start_best = best_cost + angle_placements, angle_cost, changed = _angle_sweep_once( + devices, + current_placements, + angles, + lab, + constraints, + collision_checker, + reachability_checker, ) - if math.isinf(user_cost): - return 1e18 - return hard_cost + user_cost - return hard_cost + round_seed = None if seed is None else seed + round_idx + polished_placements, polished_cost, n_generations, n_evaluations = ( + _optimize_positions_fixed_theta( + devices=devices, + lab=lab, + constraints=constraints, + collision_checker=collision_checker, + reachability_checker=reachability_checker, + seed_placements=angle_placements, + maxiter=maxiter_xy, + popsize=popsize, + tol=tol, + seed=round_seed, + strategy=strategy, + ) + ) + total_generations += n_generations + total_evaluations += n_evaluations + current_placements = polished_placements + + if polished_cost < best_cost: + best_cost = polished_cost + best_placements = polished_placements + improved = polished_cost < round_start_best - 1e-9 + + logger.info( + "Hybrid round %d complete: changed=%s, angle_cost=%.4f, polished_cost=%.4f", + round_idx + 1, changed, angle_cost, polished_cost, + ) + + if not changed and not improved: + logger.info( + "Hybrid early stop: 第 %d 轮无角度变化且无 cost 改善", + round_idx + 1, + ) + break + + _log_final_summary( + devices, + best_placements, + lab, + constraints, + collision_checker, + reachability_checker, + best_cost, + total_generations, + total_evaluations, + ) + return best_placements # 构建初始种群:种子个体 + 多样性种子 + 随机个体 rng = np.random.default_rng(seed) @@ -309,35 +772,14 @@ def optimize( n, 3 * n, pop_count, maxiter, strategy, ) - # DEBUG 模式进度回调:每 10 代输出完整约束分项表格 - def _progress_cb(gen: int, best_vec: np.ndarray, best_cost_val: float) -> None: - if not logger.isEnabledFor(logging.DEBUG): - return - pls = _vector_to_placements(best_vec, devices) - hard_bd = evaluate_default_hard_constraints_breakdown( - devices, pls, lab, collision_checker, - ) - lines = [f"=== DE Gen {gen} | best_cost={best_cost_val:.4f} ==="] - lines.append(f" {'Constraint':<45} {'Type':<6} {'Weight':>8} {'Cost':>10}") - lines.append(f" {'─' * 71}") - lines.append( - f" {'[predefined] collision':<45} {'hard':<6} {hard_bd['collision_weight']:>8.0f} {hard_bd['collision']:>10.4f}" - ) - lines.append( - f" {'[predefined] boundary':<45} {'hard':<6} {hard_bd['boundary_weight']:>8.0f} {hard_bd['boundary']:>10.4f}" - ) - if constraints: - user_bd = evaluate_constraints_breakdown( - devices, pls, lab, constraints, - collision_checker, reachability_checker, - ) - for item in user_bd: - lines.append( - f" {item['name']:<45} {item['type']:<6} {item['weight']:>8.1f} {item['cost']:>10.4f}" - ) - lines.append(f" {'─' * 71}") - lines.append(f" {'TOTAL':<45} {'':6} {'':>8} {best_cost_val:>10.4f}") - logger.debug("\n".join(lines)) + progress_cb = _make_progress_callback( + devices, + lab, + constraints, + collision_checker, + reachability_checker, + lambda vec: _vector_to_placements(vec, devices), + ) best_vector, best_cost, n_generations = _run_de( cost_fn=cost_function, @@ -351,54 +793,26 @@ def optimize( seed=seed, n_devices=n, strategy=strategy, - progress_callback=_progress_cb, + progress_callback=progress_cb, ) # 评估次数估算:每代 pop_count 次(初始 + 每代 trial) n_evaluations = pop_count + n_generations * pop_count - # 最终布局分项明细(INFO 级别) final_placements = _vector_to_placements(best_vector, devices) - hard_bd = evaluate_default_hard_constraints_breakdown( - devices, final_placements, lab, collision_checker, + _log_final_summary( + devices, + final_placements, + lab, + constraints, + collision_checker, + reachability_checker, + best_cost, + n_generations, + n_evaluations, ) - # success = 所有 hard 约束均满足(predefined + 用户 hard) - all_hard_met = hard_bd["total"] == 0.0 - # 所有约束的 top violators 候选池(predefined + user) - all_violators: list[dict] = [ - {"name": "[predefined] collision", "cost": hard_bd["collision"]}, - {"name": "[predefined] boundary", "cost": hard_bd["boundary"]}, - ] - if constraints: - user_bd = evaluate_constraints_breakdown( - devices, final_placements, lab, constraints, - collision_checker, reachability_checker, - ) - user_total = sum(item["cost"] for item in user_bd) - for c_item in user_bd: - all_violators.append({"name": c_item["name"], "cost": c_item["cost"]}) - if c_item["type"] == "hard" and c_item["cost"] > 0: - all_hard_met = False - else: - user_bd = [] - user_total = 0.0 - summary = [ - "DE complete: success=%s, cost=%.4f, %d gens, %d evals" - % (all_hard_met, best_cost, n_generations, n_evaluations), - " Predefined: subtotal=%.4f" % hard_bd["total"], - ] - if constraints: - summary.append(f" User: subtotal={user_total:.4f}") - top_violators = sorted(all_violators, key=lambda x: x["cost"], reverse=True)[:3] - top_violators = [v for v in top_violators if v["cost"] > 0] - if top_violators: - summary.append(" Top violators:") - for v in top_violators: - summary.append(f" {v['name']} = {v['cost']:.4f}") - logger.info("\n".join(summary)) - - return _vector_to_placements(best_vector, devices) + return final_placements def snap_theta(placements: list[Placement], threshold_deg: float = 15.0) -> list[Placement]: diff --git a/unilabos/layout_optimizer/server.py b/unilabos/layout_optimizer/server.py index 2b334658..d0522d0d 100644 --- a/unilabos/layout_optimizer/server.py +++ b/unilabos/layout_optimizer/server.py @@ -409,6 +409,7 @@ class OptimizeRequest(BaseModel): maxiter: int = 200 seed: int | None = None snap_cardinal: bool = False + angle_granularity: int | None = None class PositionXYZ(BaseModel): @@ -443,15 +444,22 @@ async def run_optimize(request: OptimizeRequest): from .seeders import resolve_seeder_params, seed_layout logger.info( - "Optimize request: %d devices, lab %.1f×%.1f, %d constraints, seeder=%s, run_de=%s", + "Optimize request: %d devices, lab %.1f×%.1f, %d constraints, seeder=%s, run_de=%s, angle_granularity=%s", len(request.devices), request.lab.width, request.lab.depth, len(request.constraints), request.seeder, request.run_de, + request.angle_granularity, ) + if request.angle_granularity not in (None, 4, 8, 12, 24): + raise HTTPException( + status_code=400, + detail="angle_granularity must be one of: 4, 8, 12, 24", + ) + # Build mapping: internal uuid-based id → (catalog_id, uuid) # create_devices_from_list uses uuid as Device.id when available id_to_catalog: dict[str, str] = {} @@ -522,14 +530,20 @@ async def run_optimize(request: OptimizeRequest): maxiter=request.maxiter, seed=request.seed, workflow_edges=request.workflow_edges or None, + angle_granularity=request.angle_granularity, ) de_ran = True else: result_placements = seed_placements # 5. θ snap post-processing(opt-in,默认关闭) - if request.snap_cardinal: + if request.snap_cardinal and request.angle_granularity is None: result_placements = snap_theta_safe(result_placements, devices, lab, checker) + elif request.snap_cardinal and request.angle_granularity is not None: + logger.info( + "snap_cardinal ignored because angle_granularity=%s already constrains theta", + request.angle_granularity, + ) # 6. Evaluate final cost (binary mode for pass/fail reporting) final_cost = evaluate_default_hard_constraints( diff --git a/unilabos/layout_optimizer/tests/test_optimizer.py b/unilabos/layout_optimizer/tests/test_optimizer.py index a7892092..4dc7fbe0 100644 --- a/unilabos/layout_optimizer/tests/test_optimizer.py +++ b/unilabos/layout_optimizer/tests/test_optimizer.py @@ -1,12 +1,45 @@ """差分进化优化器端到端测试。""" +import asyncio import math +import httpx from ..mock_checkers import MockCollisionChecker -from ..models import Device, Lab, Placement +from ..models import Constraint, Device, Lab, Placement import numpy as np import pytest -from ..optimizer import _run_de, optimize, snap_theta +from ..optimizer import _angle_sweep_once, _run_de, optimize, snap_theta + + +def _is_on_angle_lattice(theta: float, granularity: int) -> bool: + """检查 theta 是否落在离散角度格点上。""" + step = 2 * math.pi / granularity + theta_mod = theta % (2 * math.pi) + quotient = theta_mod / step + return abs(quotient - round(quotient)) < 1e-6 + + +PCR_DEVICES = [ + {"id": "thermo_orbitor_rs2_hotel", "name": "Plate Hotel", "device_type": "static"}, + {"id": "arm_slider", "name": "Robot Arm", "device_type": "articulation"}, + {"id": "opentrons_liquid_handler", "name": "Liquid Handler", "device_type": "static"}, + {"id": "agilent_plateloc", "name": "Plate Sealer", "device_type": "static"}, + {"id": "inheco_odtc_96xl", "name": "Thermal Cycler", "device_type": "static"}, +] + +PCR_LAB = {"width": 6.0, "depth": 4.0} + + +def _post_app(path: str, payload: dict) -> httpx.Response: + """通过 ASGITransport 调用应用,避免 TestClient 在沙箱中卡住。""" + from ..server import app + + async def _run() -> httpx.Response: + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client: + return await client.post(path, json=payload) + + return asyncio.run(_run()) def test_optimize_three_devices_no_collision(): @@ -150,6 +183,18 @@ def test_optimize_endpoint_unknown_seeder_returns_400(): assert resp.status_code == 400 +def test_optimize_endpoint_invalid_angle_granularity_returns_400(): + """Unsupported angle_granularity should be rejected.""" + resp = _post_app("/optimize", { + "devices": [{"id": "test_device", "name": "Test"}], + "lab": {"width": 5, "depth": 4}, + "run_de": True, + "angle_granularity": 6, + }) + assert resp.status_code == 400 + assert "angle_granularity" in resp.json()["detail"] + + def test_optimize_endpoint_backward_compatible(): """Existing calls without seeder/run_de fields still work.""" from fastapi.testclient import TestClient @@ -166,6 +211,108 @@ def test_optimize_endpoint_backward_compatible(): assert "de_ran" in data +def test_optimize_none_angle_granularity_matches_default(): + """Explicit None should keep the continuous-theta path unchanged.""" + devices = [ + Device(id="a", name="A", bbox=(0.8, 0.6)), + Device(id="b", name="B", bbox=(0.6, 0.5)), + Device(id="c", name="C", bbox=(0.5, 0.5)), + ] + lab = Lab(width=5.0, depth=5.0) + seed_placements = [ + Placement(device_id="a", x=1.0, y=1.0, theta=0.13), + Placement(device_id="b", x=2.2, y=1.5, theta=1.21), + Placement(device_id="c", x=3.4, y=3.2, theta=2.42), + ] + + baseline = optimize( + devices, lab, seed_placements=seed_placements, + seed=42, maxiter=40, popsize=8, + ) + explicit_none = optimize( + devices, lab, seed_placements=seed_placements, + seed=42, maxiter=40, popsize=8, angle_granularity=None, + ) + + assert len(baseline) == len(explicit_none) + for p_base, p_none in zip(baseline, explicit_none): + assert p_base.device_id == p_none.device_id + assert p_base.x == pytest.approx(p_none.x) + assert p_base.y == pytest.approx(p_none.y) + assert p_base.theta == pytest.approx(p_none.theta) + + +def test_angle_sweep_picks_lowest_cost_lattice_angle(): + """逐设备离散扫描应选出当前最优格点角度。""" + devices = [Device(id="a", name="A", bbox=(0.8, 0.6))] + lab = Lab(width=4.0, depth=4.0) + placements = [Placement(device_id="a", x=2.0, y=2.0, theta=math.pi / 4)] + constraints = [Constraint(type="soft", rule_name="prefer_aligned", weight=10.0)] + angles = [0.0, math.pi / 2, math.pi, 3 * math.pi / 2] + + swept, cost, changed = _angle_sweep_once( + devices=devices, + placements=placements, + angles=angles, + lab=lab, + constraints=constraints, + collision_checker=MockCollisionChecker(), + reachability_checker=None, + ) + + assert changed is True + assert swept[0].theta == pytest.approx(0.0) + assert cost == pytest.approx(0.0, abs=1e-6) + + +def test_optimize_angle_granularity_returns_lattice_thetas(): + """Hybrid mode should keep all returned thetas on the chosen lattice.""" + devices = [ + Device(id="a", name="A", bbox=(0.8, 0.6)), + Device(id="b", name="B", bbox=(0.6, 0.5)), + Device(id="c", name="C", bbox=(0.5, 0.5)), + ] + lab = Lab(width=5.0, depth=5.0) + seed_placements = [ + Placement(device_id="a", x=1.0, y=1.0, theta=0.13), + Placement(device_id="b", x=2.3, y=1.5, theta=1.21), + Placement(device_id="c", x=3.6, y=3.2, theta=2.42), + ] + + placements = optimize( + devices, lab, seed_placements=seed_placements, + seed=42, maxiter=45, popsize=8, angle_granularity=4, + ) + + assert len(placements) == 3 + for placement in placements: + assert _is_on_angle_lattice(placement.theta, 4), ( + f"{placement.device_id} theta={placement.theta} not on 4-angle lattice" + ) + + +def test_optimize_endpoint_accepts_angle_granularity_with_pcr_fixture(): + """POST /optimize should accept angle_granularity on the 5-device PCR fixture.""" + resp = _post_app("/optimize", { + "devices": PCR_DEVICES, + "lab": PCR_LAB, + "seeder": "row_fallback", + "run_de": True, + "maxiter": 20, + "seed": 42, + "angle_granularity": 4, + }) + + assert resp.status_code == 200 + data = resp.json() + assert len(data["placements"]) == 5 + assert data["de_ran"] is True + for placement in data["placements"]: + assert _is_on_angle_lattice(placement["rotation"]["z"], 4), ( + f"{placement['device_id']} rotation.z={placement['rotation']['z']} not on lattice" + ) + + def test_full_pipeline_seed_only(): """Full pipeline: seeder → snap_theta → correct count and bounds.