chore(DE): add debug mode and detailed log regarding cost changes

This commit is contained in:
yexiaozhou
2026-04-02 12:05:49 +08:00
parent 9ef24b7768
commit 31e79e9aff
2 changed files with 176 additions and 4 deletions

View File

@@ -20,6 +20,8 @@ from .obb import (
)
if TYPE_CHECKING:
from typing import Any
from .interfaces import CollisionChecker, ReachabilityChecker
# 归一化默认权重 — 1cm距离违规 ≈ 5°角度违规 的惩罚量级
@@ -482,6 +484,98 @@ def _opening_surface_center(
return (world_x, world_y)
def evaluate_default_hard_constraints_breakdown(
devices: list[Device],
placements: list[Placement],
lab: Lab,
collision_checker: CollisionChecker,
*,
collision_weight: float = DEFAULT_WEIGHT_DISTANCE * HARD_MULTIPLIER,
boundary_weight: float = DEFAULT_WEIGHT_DISTANCE * HARD_MULTIPLIER,
) -> dict[str, float]:
"""与 evaluate_default_hard_constraints 逻辑相同,但返回分项明细。"""
device_map = {d.id: d for d in devices}
collision_cost = 0.0
boundary_cost = 0.0
candidate_pairs = sweep_and_prune_pairs(devices, placements)
for i, j in candidate_pairs:
di, dj = device_map[placements[i].device_id], device_map[placements[j].device_id]
ci = obb_corners(placements[i].x, placements[i].y,
di.bbox[0], di.bbox[1], placements[i].theta)
cj = obb_corners(placements[j].x, placements[j].y,
dj.bbox[0], dj.bbox[1], placements[j].theta)
depth = obb_penetration_depth(ci, cj)
if depth > 0:
collision_cost += collision_weight * depth
for p in placements:
dev = device_map[p.device_id]
hw, hd = p.rotated_bbox(dev)
overshoot = 0.0
overshoot += max(0.0, hw - p.x)
overshoot += max(0.0, (p.x + hw) - lab.width)
overshoot += max(0.0, hd - p.y)
overshoot += max(0.0, (p.y + hd) - lab.depth)
boundary_cost += boundary_weight * overshoot
return {
"collision": collision_cost,
"boundary": boundary_cost,
"total": collision_cost + boundary_cost,
"collision_weight": collision_weight,
"boundary_weight": boundary_weight,
}
def evaluate_constraints_breakdown(
devices: list[Device],
placements: list[Placement],
lab: Lab,
constraints: list[Constraint],
collision_checker: CollisionChecker,
reachability_checker: ReachabilityChecker | None = None,
) -> list[dict[str, Any]]:
"""与 evaluate_constraints 逻辑相同,但返回每条约束的分项明细。"""
device_map = {d.id: d for d in devices}
placement_map = {p.device_id: p for p in placements}
results = []
for c in constraints:
cost = _evaluate_single(
c, device_map, placement_map, lab, collision_checker, reachability_checker,
graduated=True,
)
ew = c.weight
if c.priority and c.priority in PRIORITY_MULTIPLIERS:
ew *= PRIORITY_MULTIPLIERS[c.priority]
results.append({
"name": _constraint_display_name(c),
"rule": c.rule_name,
"type": c.type,
"cost": cost,
"weight": ew,
})
return results
def _constraint_display_name(c: Constraint) -> str:
"""为约束生成可读的显示名称。"""
params = c.params
if c.rule_name in (
"distance_less_than", "distance_greater_than",
"minimize_distance", "maximize_distance",
):
return f"{c.rule_name}({params.get('device_a', '?')}, {params.get('device_b', '?')})"
if c.rule_name == "reachability":
return f"reachability({params.get('arm_id', '?')}, {params.get('target_device_id', '?')})"
if c.rule_name == "min_spacing":
return f"min_spacing(gap={params.get('min_gap', '?')})"
if c.rule_name == "prefer_orientation_mode":
return f"prefer_orientation_mode({params.get('mode', '?')})"
return c.rule_name
def _line_of_sight_penalty(
arm_id: str,
arm_p: Placement,

View File

@@ -13,7 +13,12 @@ from typing import Any, Callable
import numpy as np
from .constraints import evaluate_constraints, evaluate_default_hard_constraints
from .constraints import (
evaluate_constraints,
evaluate_constraints_breakdown,
evaluate_default_hard_constraints,
evaluate_default_hard_constraints_breakdown,
)
from .mock_checkers import MockCollisionChecker, MockReachabilityChecker
from .models import Constraint, Device, Lab, Placement
from .pencil_integration import generate_initial_layout
@@ -34,6 +39,7 @@ def _run_de(
seed: int | None,
n_devices: int,
strategy: str = "currenttobest1bin",
progress_callback: Callable[[int, np.ndarray, float], None] | None = None,
) -> tuple[np.ndarray, float, int]:
"""自定义差分进化循环。
@@ -56,6 +62,7 @@ def _run_de(
seed: 随机种子
n_devices: 设备数量(用于 per-device crossover
strategy: 变异策略,"currenttobest1bin""best1bin"
progress_callback: 每 10 代调用一次 (gen, best_vector, best_cost)
Returns:
(best_vector, best_cost, n_generations)
@@ -125,6 +132,10 @@ def _run_de(
# 更新 best_idx种群可能整体更新
best_idx = int(np.argmin(costs))
# 进度回调:每 10 代报告最优个体状态
if progress_callback and gen % 10 == 0:
progress_callback(gen, best_vector, best_cost)
# Early stopping最近 patience 代改善 < 0.1%
best_cost_history.append(best_cost)
if len(best_cost_history) >= patience:
@@ -296,6 +307,36 @@ def optimize(
n, 3 * n, pop_count, maxiter, strategy,
)
# DEBUG 模式进度回调:每 10 代输出完整约束分项表格
def _progress_cb(gen: int, best_vec: np.ndarray, best_cost_val: float) -> None:
if not logger.isEnabledFor(logging.DEBUG):
return
pls = _vector_to_placements(best_vec, devices)
hard_bd = evaluate_default_hard_constraints_breakdown(
devices, pls, lab, collision_checker,
)
lines = [f"=== DE Gen {gen} | best_cost={best_cost_val:.4f} ==="]
lines.append(f" {'Constraint':<45} {'Type':<6} {'Weight':>8} {'Cost':>10}")
lines.append(f" {'' * 71}")
lines.append(
f" {'[predefined] collision':<45} {'hard':<6} {hard_bd['collision_weight']:>8.0f} {hard_bd['collision']:>10.4f}"
)
lines.append(
f" {'[predefined] boundary':<45} {'hard':<6} {hard_bd['boundary_weight']:>8.0f} {hard_bd['boundary']:>10.4f}"
)
if constraints:
user_bd = evaluate_constraints_breakdown(
devices, pls, lab, constraints,
collision_checker, reachability_checker,
)
for item in user_bd:
lines.append(
f" {item['name']:<45} {item['type']:<6} {item['weight']:>8.1f} {item['cost']:>10.4f}"
)
lines.append(f" {'' * 71}")
lines.append(f" {'TOTAL':<45} {'':6} {'':>8} {best_cost_val:>10.4f}")
logger.debug("\n".join(lines))
best_vector, best_cost, n_generations = _run_de(
cost_fn=cost_function,
bounds=bounds_array,
@@ -308,15 +349,52 @@ def optimize(
seed=seed,
n_devices=n,
strategy=strategy,
progress_callback=_progress_cb,
)
# 评估次数估算:每代 pop_count 次(初始 + 每代 trial
n_evaluations = pop_count + n_generations * pop_count
logger.info(
"DE optimization complete: success=%s, cost=%.4f, iterations=%d, evaluations=%d",
best_cost < 1e17, best_cost, n_generations, n_evaluations,
# 最终布局分项明细INFO 级别)
final_placements = _vector_to_placements(best_vector, devices)
hard_bd = evaluate_default_hard_constraints_breakdown(
devices, final_placements, lab, collision_checker,
)
# success = 所有 hard 约束均满足predefined + 用户 hard
all_hard_met = hard_bd["total"] == 0.0
# 所有约束的 top violators 候选池predefined + user
all_violators: list[dict] = [
{"name": "[predefined] collision", "cost": hard_bd["collision"]},
{"name": "[predefined] boundary", "cost": hard_bd["boundary"]},
]
if constraints:
user_bd = evaluate_constraints_breakdown(
devices, final_placements, lab, constraints,
collision_checker, reachability_checker,
)
user_total = sum(item["cost"] for item in user_bd)
for c_item in user_bd:
all_violators.append({"name": c_item["name"], "cost": c_item["cost"]})
if c_item["type"] == "hard" and c_item["cost"] > 0:
all_hard_met = False
else:
user_bd = []
user_total = 0.0
summary = [
"DE complete: success=%s, cost=%.4f, %d gens, %d evals"
% (all_hard_met, best_cost, n_generations, n_evaluations),
" Predefined: subtotal=%.4f" % hard_bd["total"],
]
if constraints:
summary.append(f" User: subtotal={user_total:.4f}")
top_violators = sorted(all_violators, key=lambda x: x["cost"], reverse=True)[:3]
top_violators = [v for v in top_violators if v["cost"] > 0]
if top_violators:
summary.append(" Top violators:")
for v in top_violators:
summary.append(f" {v['name']} = {v['cost']:.4f}")
logger.info("\n".join(summary))
return _vector_to_placements(best_vector, devices)