From 31e79e9affd73981a3cb99634a4cd2a8f94fd6b2 Mon Sep 17 00:00:00 2001 From: yexiaozhou Date: Thu, 2 Apr 2026 12:05:49 +0800 Subject: [PATCH] chore(DE): add debug mode and detailed log regarding cost changes --- unilabos/layout_optimizer/constraints.py | 94 ++++++++++++++++++++++++ unilabos/layout_optimizer/optimizer.py | 86 +++++++++++++++++++++- 2 files changed, 176 insertions(+), 4 deletions(-) diff --git a/unilabos/layout_optimizer/constraints.py b/unilabos/layout_optimizer/constraints.py index f726428a..08929cac 100644 --- a/unilabos/layout_optimizer/constraints.py +++ b/unilabos/layout_optimizer/constraints.py @@ -20,6 +20,8 @@ from .obb import ( ) if TYPE_CHECKING: + from typing import Any + from .interfaces import CollisionChecker, ReachabilityChecker # 归一化默认权重 — 1cm距离违规 ≈ 5°角度违规 的惩罚量级 @@ -482,6 +484,98 @@ def _opening_surface_center( return (world_x, world_y) +def evaluate_default_hard_constraints_breakdown( + devices: list[Device], + placements: list[Placement], + lab: Lab, + collision_checker: CollisionChecker, + *, + collision_weight: float = DEFAULT_WEIGHT_DISTANCE * HARD_MULTIPLIER, + boundary_weight: float = DEFAULT_WEIGHT_DISTANCE * HARD_MULTIPLIER, +) -> dict[str, float]: + """与 evaluate_default_hard_constraints 逻辑相同,但返回分项明细。""" + device_map = {d.id: d for d in devices} + collision_cost = 0.0 + boundary_cost = 0.0 + + candidate_pairs = sweep_and_prune_pairs(devices, placements) + for i, j in candidate_pairs: + di, dj = device_map[placements[i].device_id], device_map[placements[j].device_id] + ci = obb_corners(placements[i].x, placements[i].y, + di.bbox[0], di.bbox[1], placements[i].theta) + cj = obb_corners(placements[j].x, placements[j].y, + dj.bbox[0], dj.bbox[1], placements[j].theta) + depth = obb_penetration_depth(ci, cj) + if depth > 0: + collision_cost += collision_weight * depth + + for p in placements: + dev = device_map[p.device_id] + hw, hd = p.rotated_bbox(dev) + overshoot = 0.0 + overshoot += max(0.0, hw - p.x) + overshoot += max(0.0, (p.x + hw) - lab.width) + overshoot += max(0.0, hd - p.y) + overshoot += max(0.0, (p.y + hd) - lab.depth) + boundary_cost += boundary_weight * overshoot + + return { + "collision": collision_cost, + "boundary": boundary_cost, + "total": collision_cost + boundary_cost, + "collision_weight": collision_weight, + "boundary_weight": boundary_weight, + } + + +def evaluate_constraints_breakdown( + devices: list[Device], + placements: list[Placement], + lab: Lab, + constraints: list[Constraint], + collision_checker: CollisionChecker, + reachability_checker: ReachabilityChecker | None = None, +) -> list[dict[str, Any]]: + """与 evaluate_constraints 逻辑相同,但返回每条约束的分项明细。""" + device_map = {d.id: d for d in devices} + placement_map = {p.device_id: p for p in placements} + + results = [] + for c in constraints: + cost = _evaluate_single( + c, device_map, placement_map, lab, collision_checker, reachability_checker, + graduated=True, + ) + ew = c.weight + if c.priority and c.priority in PRIORITY_MULTIPLIERS: + ew *= PRIORITY_MULTIPLIERS[c.priority] + results.append({ + "name": _constraint_display_name(c), + "rule": c.rule_name, + "type": c.type, + "cost": cost, + "weight": ew, + }) + return results + + +def _constraint_display_name(c: Constraint) -> str: + """为约束生成可读的显示名称。""" + params = c.params + if c.rule_name in ( + "distance_less_than", "distance_greater_than", + "minimize_distance", "maximize_distance", + ): + return f"{c.rule_name}({params.get('device_a', '?')}, {params.get('device_b', '?')})" + if c.rule_name == "reachability": + return f"reachability({params.get('arm_id', '?')}, {params.get('target_device_id', '?')})" + if c.rule_name == "min_spacing": + return f"min_spacing(gap={params.get('min_gap', '?')})" + if c.rule_name == "prefer_orientation_mode": + return f"prefer_orientation_mode({params.get('mode', '?')})" + return c.rule_name + + def _line_of_sight_penalty( arm_id: str, arm_p: Placement, diff --git a/unilabos/layout_optimizer/optimizer.py b/unilabos/layout_optimizer/optimizer.py index 84ea0aa7..9c1f33f2 100644 --- a/unilabos/layout_optimizer/optimizer.py +++ b/unilabos/layout_optimizer/optimizer.py @@ -13,7 +13,12 @@ from typing import Any, Callable import numpy as np -from .constraints import evaluate_constraints, evaluate_default_hard_constraints +from .constraints import ( + evaluate_constraints, + evaluate_constraints_breakdown, + evaluate_default_hard_constraints, + evaluate_default_hard_constraints_breakdown, +) from .mock_checkers import MockCollisionChecker, MockReachabilityChecker from .models import Constraint, Device, Lab, Placement from .pencil_integration import generate_initial_layout @@ -34,6 +39,7 @@ def _run_de( seed: int | None, n_devices: int, strategy: str = "currenttobest1bin", + progress_callback: Callable[[int, np.ndarray, float], None] | None = None, ) -> tuple[np.ndarray, float, int]: """自定义差分进化循环。 @@ -56,6 +62,7 @@ def _run_de( seed: 随机种子 n_devices: 设备数量(用于 per-device crossover) strategy: 变异策略,"currenttobest1bin" 或 "best1bin" + progress_callback: 每 10 代调用一次 (gen, best_vector, best_cost) Returns: (best_vector, best_cost, n_generations) @@ -125,6 +132,10 @@ def _run_de( # 更新 best_idx(种群可能整体更新) best_idx = int(np.argmin(costs)) + # 进度回调:每 10 代报告最优个体状态 + if progress_callback and gen % 10 == 0: + progress_callback(gen, best_vector, best_cost) + # Early stopping:最近 patience 代改善 < 0.1% best_cost_history.append(best_cost) if len(best_cost_history) >= patience: @@ -296,6 +307,36 @@ def optimize( n, 3 * n, pop_count, maxiter, strategy, ) + # DEBUG 模式进度回调:每 10 代输出完整约束分项表格 + def _progress_cb(gen: int, best_vec: np.ndarray, best_cost_val: float) -> None: + if not logger.isEnabledFor(logging.DEBUG): + return + pls = _vector_to_placements(best_vec, devices) + hard_bd = evaluate_default_hard_constraints_breakdown( + devices, pls, lab, collision_checker, + ) + lines = [f"=== DE Gen {gen} | best_cost={best_cost_val:.4f} ==="] + lines.append(f" {'Constraint':<45} {'Type':<6} {'Weight':>8} {'Cost':>10}") + lines.append(f" {'─' * 71}") + lines.append( + f" {'[predefined] collision':<45} {'hard':<6} {hard_bd['collision_weight']:>8.0f} {hard_bd['collision']:>10.4f}" + ) + lines.append( + f" {'[predefined] boundary':<45} {'hard':<6} {hard_bd['boundary_weight']:>8.0f} {hard_bd['boundary']:>10.4f}" + ) + if constraints: + user_bd = evaluate_constraints_breakdown( + devices, pls, lab, constraints, + collision_checker, reachability_checker, + ) + for item in user_bd: + lines.append( + f" {item['name']:<45} {item['type']:<6} {item['weight']:>8.1f} {item['cost']:>10.4f}" + ) + lines.append(f" {'─' * 71}") + lines.append(f" {'TOTAL':<45} {'':6} {'':>8} {best_cost_val:>10.4f}") + logger.debug("\n".join(lines)) + best_vector, best_cost, n_generations = _run_de( cost_fn=cost_function, bounds=bounds_array, @@ -308,15 +349,52 @@ def optimize( seed=seed, n_devices=n, strategy=strategy, + progress_callback=_progress_cb, ) # 评估次数估算:每代 pop_count 次(初始 + 每代 trial) n_evaluations = pop_count + n_generations * pop_count - logger.info( - "DE optimization complete: success=%s, cost=%.4f, iterations=%d, evaluations=%d", - best_cost < 1e17, best_cost, n_generations, n_evaluations, + # 最终布局分项明细(INFO 级别) + final_placements = _vector_to_placements(best_vector, devices) + hard_bd = evaluate_default_hard_constraints_breakdown( + devices, final_placements, lab, collision_checker, ) + # success = 所有 hard 约束均满足(predefined + 用户 hard) + all_hard_met = hard_bd["total"] == 0.0 + # 所有约束的 top violators 候选池(predefined + user) + all_violators: list[dict] = [ + {"name": "[predefined] collision", "cost": hard_bd["collision"]}, + {"name": "[predefined] boundary", "cost": hard_bd["boundary"]}, + ] + if constraints: + user_bd = evaluate_constraints_breakdown( + devices, final_placements, lab, constraints, + collision_checker, reachability_checker, + ) + user_total = sum(item["cost"] for item in user_bd) + for c_item in user_bd: + all_violators.append({"name": c_item["name"], "cost": c_item["cost"]}) + if c_item["type"] == "hard" and c_item["cost"] > 0: + all_hard_met = False + else: + user_bd = [] + user_total = 0.0 + + summary = [ + "DE complete: success=%s, cost=%.4f, %d gens, %d evals" + % (all_hard_met, best_cost, n_generations, n_evaluations), + " Predefined: subtotal=%.4f" % hard_bd["total"], + ] + if constraints: + summary.append(f" User: subtotal={user_total:.4f}") + top_violators = sorted(all_violators, key=lambda x: x["cost"], reverse=True)[:3] + top_violators = [v for v in top_violators if v["cost"] > 0] + if top_violators: + summary.append(" Top violators:") + for v in top_violators: + summary.append(f" {v['name']} = {v['cost']:.4f}") + logger.info("\n".join(summary)) return _vector_to_placements(best_vector, devices)