Files
Uni-Lab-OS/unilabos/layout_optimizer/optimizer.py

904 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""差分进化布局优化器。
编码N 个设备 → 3N 维向量 [x0, y0, θ0, x1, y1, θ1, ...]
使用自定义差分进化循环per-device crossover + θ wrapping进行全局优化。
初始布局Pencil/回退)注入为种群种子个体加速收敛。
"""
from __future__ import annotations
import logging
import math
from typing import Any, Callable
import numpy as np
from .constraints import (
evaluate_constraints,
evaluate_constraints_breakdown,
evaluate_default_hard_constraints,
evaluate_default_hard_constraints_breakdown,
)
from .mock_checkers import MockCollisionChecker, MockReachabilityChecker
from .models import Constraint, Device, Lab, Placement
from .pencil_integration import generate_initial_layout
from .seeders import resolve_seeder_params, seed_layout
logger = logging.getLogger(__name__)
def _run_de(
cost_fn: Callable[[np.ndarray], float],
bounds: np.ndarray,
init_pop: np.ndarray,
maxiter: int,
tol: float,
atol: float,
mutation: tuple[float, float],
recombination: float,
seed: int | None,
n_devices: int,
strategy: str = "currenttobest1bin",
progress_callback: Callable[[int, np.ndarray, float], None] | None = None,
) -> tuple[np.ndarray, float, int]:
"""自定义差分进化循环。
特性:
- 支持 currenttobest1bin / best1bin 两种策略
- Per-device crossover以设备 (x, y, θ) 三元组为原子单元进行交叉
- θ wrapping交叉后对角度取模 [0, 2π)
- Early stopping最近 20 代改善 < 0.1% 时提前终止
- scipy 风格收敛判断std(costs) <= atol + tol * |best_cost|
Args:
cost_fn: 目标函数 f(x) → float
bounds: 边界数组 shape=(ndim, 2),每行 [low, high]
init_pop: 初始种群 shape=(pop_size, ndim)
maxiter: 最大迭代代数
tol: 相对收敛容差
atol: 绝对收敛容差
mutation: 变异因子范围 (F_min, F_max)
recombination: 交叉概率 CR
seed: 随机种子
n_devices: 设备数量(用于 per-device crossover
strategy: 变异策略,"currenttobest1bin""best1bin"
progress_callback: 每 10 代调用一次 (gen, best_vector, best_cost)
Returns:
(best_vector, best_cost, n_generations)
"""
rng = np.random.default_rng(seed)
pop_size, ndim = init_pop.shape
lower = bounds[:, 0]
upper = bounds[:, 1]
f_min, f_max = mutation
# 评估初始种群适应度
costs = np.array([cost_fn(ind) for ind in init_pop])
best_idx = int(np.argmin(costs))
best_cost = costs[best_idx]
best_vector = init_pop[best_idx].copy()
# Early stopping 跟踪
patience = 200
best_cost_history: list[float] = [best_cost]
for gen in range(1, maxiter + 1):
for i in range(pop_size):
# 选择变异因子 F每个个体独立采样
f_val = rng.uniform(f_min, f_max)
# 选择两个不同于 i 和 best_idx 的个体索引
candidates = list(range(pop_size))
candidates.remove(i)
chosen = rng.choice(candidates, size=2, replace=False)
r1, r2 = int(chosen[0]), int(chosen[1])
# 变异向量
if strategy == "best1bin":
# Turbo 模式mutant = best + F*(r1 - r2)
mutant = best_vector + f_val * (init_pop[r1] - init_pop[r2])
else:
# 默认 currenttobest1binmutant = target + F*(best - target) + F*(r1 - r2)
mutant = (
init_pop[i]
# add a scaled minimum to encourage exploration
+ f_val * 0.1 * (upper - lower) * rng.uniform(-1, 1, size=ndim)
+ f_val * (best_vector - init_pop[i])
+ f_val * (init_pop[r1] - init_pop[r2])
)
# Per-device crossover以 (x, y, θ) 三元组为原子单元
trial = init_pop[i].copy()
j_rand = rng.integers(0, n_devices) # 保证至少一个设备来自 mutant
for d in range(n_devices):
if rng.random() < recombination or d == j_rand:
trial[3 * d: 3 * d + 3] = mutant[3 * d: 3 * d + 3]
# θ wrapping角度取模 [0, 2π)
for d in range(n_devices):
trial[3 * d + 2] %= 2 * math.pi
# 钳位到边界内
trial = np.clip(trial, lower, upper)
# 贪心选择trial 不比当前差则替换
trial_cost = cost_fn(trial)
if trial_cost <= costs[i]:
init_pop[i] = trial
costs[i] = trial_cost
if trial_cost < best_cost:
best_cost = trial_cost
best_vector = trial.copy()
# 更新 best_idx种群可能整体更新
best_idx = int(np.argmin(costs))
# 进度回调:每 10 代报告最优个体状态
if progress_callback and gen % 10 == 0:
progress_callback(gen, best_vector, best_cost)
# Early stopping最近 patience 代改善 < 0.1%
best_cost_history.append(best_cost)
if len(best_cost_history) >= patience:
old_cost = best_cost_history[-patience]
if old_cost > 0:
improvement = (old_cost - best_cost) / old_cost
else:
improvement = 0.0
if improvement < 0.001:
logger.info(
"Early stop: cost 在 %d 代内稳定在 %.4f(改善 < 0.1%%",
patience, best_cost,
)
return best_vector, best_cost, gen
# scipy 风格收敛判断
if np.std(costs) <= atol + tol * abs(best_cost):
logger.info(
"收敛终止std(costs)=%.6f <= atol+tol*|best|=%.6f,第 %d",
np.std(costs), atol + tol * abs(best_cost), gen,
)
return best_vector, best_cost, gen
return best_vector, best_cost, maxiter
def _generate_seeds(
devices: list[Device],
lab: Lab,
rng: np.random.Generator,
workflow_edges: list[list[str]] | None = None,
n_variants: int = 3,
sigma_pos_frac: float = 0.05,
sigma_theta: float = math.pi / 6,
) -> list[np.ndarray]:
"""从多个 seeder preset 生成多样性种子个体 + 变异版本。"""
seeds: list[np.ndarray] = []
presets = ["compact_outward", "spread_inward"]
if workflow_edges:
presets.append("workflow_cluster")
for preset_name in presets:
try:
params = resolve_seeder_params(preset_name)
except ValueError:
continue
if params is None:
continue
base_placements = seed_layout(devices, lab, params, workflow_edges)
base_vec = _placements_to_vector(base_placements, devices)
seeds.append(base_vec)
# 变异版本:对 (x,y) 加高斯噪声 σ=5% lab 尺寸,θ 加 σ=π/6
for _ in range(n_variants):
variant = base_vec.copy()
for d in range(len(devices)):
variant[3 * d] += rng.normal(0, sigma_pos_frac * lab.width)
variant[3 * d + 1] += rng.normal(0, sigma_pos_frac * lab.depth)
variant[3 * d + 2] += rng.normal(0, sigma_theta)
variant[3 * d + 2] %= 2 * math.pi
seeds.append(variant)
return seeds
def _build_bounds(
devices: list[Device], lab: Lab, *, include_theta: bool,
) -> np.ndarray:
"""构建搜索边界。"""
bounds = []
for dev in devices:
half_min = min(dev.bbox[0], dev.bbox[1]) / 2
bounds.append((half_min, lab.width - half_min))
bounds.append((half_min, lab.depth - half_min))
if include_theta:
bounds.append((0, 2 * math.pi))
return np.array(bounds)
def _evaluate_layout_cost(
devices: list[Device],
placements: list[Placement],
lab: Lab,
collision_checker: Any,
reachability_checker: Any,
constraints: list[Constraint],
) -> float:
"""统一计算布局总 cost。"""
hard_cost = evaluate_default_hard_constraints(
devices, placements, lab, collision_checker,
)
if math.isinf(hard_cost):
return 1e18
if constraints:
user_cost = evaluate_constraints(
devices, placements, lab, constraints,
collision_checker, reachability_checker,
)
if math.isinf(user_cost):
return 1e18
return hard_cost + user_cost
return hard_cost
def _make_progress_callback(
devices: list[Device],
lab: Lab,
constraints: list[Constraint],
collision_checker: Any,
reachability_checker: Any,
placements_from_vector: Callable[[np.ndarray], list[Placement]],
) -> Callable[[int, np.ndarray, float], None]:
"""构造统一的 DEBUG 进度回调。"""
def _progress_cb(gen: int, best_vec: np.ndarray, best_cost_val: float) -> None:
if not logger.isEnabledFor(logging.DEBUG):
return
pls = placements_from_vector(best_vec)
hard_bd = evaluate_default_hard_constraints_breakdown(
devices, pls, lab, collision_checker,
)
lines = [f"=== DE Gen {gen} | best_cost={best_cost_val:.4f} ==="]
lines.append(f" {'Constraint':<45} {'Type':<6} {'Weight':>8} {'Cost':>10}")
lines.append(f" {'' * 71}")
lines.append(
f" {'[predefined] collision':<45} {'hard':<6} {hard_bd['collision_weight']:>8.0f} {hard_bd['collision']:>10.4f}"
)
lines.append(
f" {'[predefined] boundary':<45} {'hard':<6} {hard_bd['boundary_weight']:>8.0f} {hard_bd['boundary']:>10.4f}"
)
if constraints:
user_bd = evaluate_constraints_breakdown(
devices, pls, lab, constraints,
collision_checker, reachability_checker,
)
for item in user_bd:
lines.append(
f" {item['name']:<45} {item['type']:<6} {item['weight']:>8.1f} {item['cost']:>10.4f}"
)
lines.append(f" {'' * 71}")
lines.append(f" {'TOTAL':<45} {'':6} {'':>8} {best_cost_val:>10.4f}")
logger.debug("\n".join(lines))
return _progress_cb
def _log_final_summary(
devices: list[Device],
final_placements: list[Placement],
lab: Lab,
constraints: list[Constraint],
collision_checker: Any,
reachability_checker: Any,
best_cost: float,
n_generations: int,
n_evaluations: int,
) -> None:
"""输出最终布局分项明细。"""
hard_bd = evaluate_default_hard_constraints_breakdown(
devices, final_placements, lab, collision_checker,
)
all_hard_met = hard_bd["total"] == 0.0
all_violators: list[dict] = [
{"name": "[predefined] collision", "cost": hard_bd["collision"]},
{"name": "[predefined] boundary", "cost": hard_bd["boundary"]},
]
if constraints:
user_bd = evaluate_constraints_breakdown(
devices, final_placements, lab, constraints,
collision_checker, reachability_checker,
)
user_total = sum(item["cost"] for item in user_bd)
for c_item in user_bd:
all_violators.append({"name": c_item["name"], "cost": c_item["cost"]})
if c_item["type"] == "hard" and c_item["cost"] > 0:
all_hard_met = False
else:
user_total = 0.0
summary = [
"DE complete: success=%s, cost=%.4f, %d gens, %d evals"
% (all_hard_met, best_cost, n_generations, n_evaluations),
" Predefined: subtotal=%.4f" % hard_bd["total"],
]
if constraints:
summary.append(f" User: subtotal={user_total:.4f}")
top_violators = sorted(all_violators, key=lambda x: x["cost"], reverse=True)[:3]
top_violators = [v for v in top_violators if v["cost"] > 0]
if top_violators:
summary.append(" Top violators:")
for v in top_violators:
summary.append(f" {v['name']} = {v['cost']:.4f}")
logger.info("\n".join(summary))
def _angle_lattice(granularity: int) -> list[float]:
"""生成角度离散格点。"""
return [(2 * math.pi * idx) / granularity for idx in range(granularity)]
def _nearest_lattice_theta(theta: float, angles: list[float]) -> float:
"""返回距离最近的离散角度。"""
theta_mod = theta % (2 * math.pi)
return min(
angles,
key=lambda angle: min(
abs(theta_mod - angle),
2 * math.pi - abs(theta_mod - angle),
),
)
def _snap_placements_to_lattice(
placements: list[Placement], angles: list[float],
) -> list[Placement]:
"""将所有设备角度吸附到离散格点。"""
return [
Placement(
device_id=p.device_id,
x=p.x,
y=p.y,
theta=_nearest_lattice_theta(p.theta, angles),
uuid=p.uuid,
)
for p in placements
]
def _placements_to_position_vector(
placements: list[Placement], devices: list[Device],
) -> np.ndarray:
"""将 Placement 列表编码为 2N 维位置向量。"""
placement_map = {p.device_id: p for p in placements}
vec = np.zeros(2 * len(devices))
for i, dev in enumerate(devices):
p = placement_map.get(dev.id)
if p is not None:
vec[2 * i] = p.x
vec[2 * i + 1] = p.y
return vec
def _position_vector_to_placements(
x: np.ndarray,
devices: list[Device],
base_placements: list[Placement],
) -> list[Placement]:
"""将 2N 维位置向量解码为保留 theta 的 Placement 列表。"""
base_map = {p.device_id: p for p in base_placements}
placements = []
for i, dev in enumerate(devices):
base = base_map.get(dev.id)
theta = base.theta if base is not None else 0.0
uuid = base.uuid if base is not None else ""
placements.append(
Placement(
device_id=dev.id,
x=float(x[2 * i]),
y=float(x[2 * i + 1]),
theta=float(theta % (2 * math.pi)),
uuid=uuid,
)
)
return placements
def _run_de_xy(
cost_fn: Callable[[np.ndarray], float],
bounds: np.ndarray,
init_pop: np.ndarray,
maxiter: int,
tol: float,
atol: float,
mutation: tuple[float, float],
recombination: float,
seed: int | None,
n_devices: int,
strategy: str = "currenttobest1bin",
progress_callback: Callable[[int, np.ndarray, float], None] | None = None,
) -> tuple[np.ndarray, float, int]:
"""固定 theta 的 2N 维位置 DE。"""
rng = np.random.default_rng(seed)
pop_size, ndim = init_pop.shape
lower = bounds[:, 0]
upper = bounds[:, 1]
f_min, f_max = mutation
costs = np.array([cost_fn(ind) for ind in init_pop])
best_idx = int(np.argmin(costs))
best_cost = costs[best_idx]
best_vector = init_pop[best_idx].copy()
patience = 200
best_cost_history: list[float] = [best_cost]
for gen in range(1, maxiter + 1):
for i in range(pop_size):
f_val = rng.uniform(f_min, f_max)
candidates = list(range(pop_size))
candidates.remove(i)
chosen = rng.choice(candidates, size=2, replace=False)
r1, r2 = int(chosen[0]), int(chosen[1])
if strategy == "best1bin":
mutant = best_vector + f_val * (init_pop[r1] - init_pop[r2])
else:
mutant = (
init_pop[i]
+ f_val * 0.1 * (upper - lower) * rng.uniform(-1, 1, size=ndim)
+ f_val * (best_vector - init_pop[i])
+ f_val * (init_pop[r1] - init_pop[r2])
)
trial = init_pop[i].copy()
j_rand = rng.integers(0, n_devices)
for d in range(n_devices):
if rng.random() < recombination or d == j_rand:
trial[2 * d: 2 * d + 2] = mutant[2 * d: 2 * d + 2]
trial = np.clip(trial, lower, upper)
trial_cost = cost_fn(trial)
if trial_cost <= costs[i]:
init_pop[i] = trial
costs[i] = trial_cost
if trial_cost < best_cost:
best_cost = trial_cost
best_vector = trial.copy()
best_idx = int(np.argmin(costs))
if progress_callback and gen % 10 == 0:
progress_callback(gen, best_vector, best_cost)
best_cost_history.append(best_cost)
if len(best_cost_history) >= patience:
old_cost = best_cost_history[-patience]
improvement = (old_cost - best_cost) / old_cost if old_cost > 0 else 0.0
if improvement < 0.001:
logger.info(
"Early stop: cost 在 %d 代内稳定在 %.4f(改善 < 0.1%%",
patience, best_cost,
)
return best_vector, best_cost, gen
if np.std(costs) <= atol + tol * abs(best_cost):
logger.info(
"收敛终止std(costs)=%.6f <= atol+tol*|best|=%.6f,第 %d",
np.std(costs), atol + tol * abs(best_cost), gen,
)
return best_vector, best_cost, gen
return best_vector, best_cost, maxiter
def _angle_sweep_once(
devices: list[Device],
placements: list[Placement],
angles: list[float],
lab: Lab,
constraints: list[Constraint],
collision_checker: Any,
reachability_checker: Any,
) -> tuple[list[Placement], float, bool]:
"""固定位置做一轮逐设备离散角度贪心扫描。"""
current = list(placements)
current_cost = _evaluate_layout_cost(
devices, current, lab, collision_checker, reachability_checker, constraints,
)
changed = False
for idx, dev in enumerate(devices):
best_theta = current[idx].theta
best_cost = current_cost
for angle in angles:
if abs((best_theta - angle) % (2 * math.pi)) < 1e-9:
continue
candidate = list(current)
base = candidate[idx]
candidate[idx] = Placement(
device_id=base.device_id,
x=base.x,
y=base.y,
theta=angle,
uuid=base.uuid,
)
candidate_cost = _evaluate_layout_cost(
devices, candidate, lab, collision_checker, reachability_checker, constraints,
)
if candidate_cost < best_cost - 1e-9:
best_theta = angle
best_cost = candidate_cost
if abs((current[idx].theta - best_theta) % (2 * math.pi)) >= 1e-9:
base = current[idx]
current[idx] = Placement(
device_id=base.device_id,
x=base.x,
y=base.y,
theta=best_theta,
uuid=base.uuid,
)
current_cost = best_cost
changed = True
return current, current_cost, changed
def _optimize_positions_fixed_theta(
devices: list[Device],
lab: Lab,
constraints: list[Constraint],
collision_checker: Any,
reachability_checker: Any,
seed_placements: list[Placement],
maxiter: int,
popsize: int,
tol: float,
seed: int | None,
strategy: str,
) -> tuple[list[Placement], float, int, int]:
"""在固定离散 theta 下,只优化位置。"""
n = len(devices)
bounds_array = _build_bounds(devices, lab, include_theta=False)
seed_vector = np.clip(
_placements_to_position_vector(seed_placements, devices),
bounds_array[:, 0],
bounds_array[:, 1],
)
def cost_function(x: np.ndarray) -> float:
placements = _position_vector_to_placements(x, devices, seed_placements)
return _evaluate_layout_cost(
devices, placements, lab, collision_checker, reachability_checker, constraints,
)
rng = np.random.default_rng(seed)
pop_count = popsize * 2 * n
init_pop = rng.uniform(
bounds_array[:, 0], bounds_array[:, 1], size=(pop_count, 2 * n),
)
init_pop[0] = seed_vector
progress_cb = _make_progress_callback(
devices,
lab,
constraints,
collision_checker,
reachability_checker,
lambda vec: _position_vector_to_placements(vec, devices, seed_placements),
)
best_vector, best_cost, n_generations = _run_de_xy(
cost_fn=cost_function,
bounds=bounds_array,
init_pop=init_pop,
maxiter=maxiter,
tol=tol,
atol=1e-3,
mutation=(0.5, 1.0),
recombination=0.7,
seed=seed,
n_devices=n,
strategy=strategy,
progress_callback=progress_cb,
)
return (
_position_vector_to_placements(best_vector, devices, seed_placements),
best_cost,
n_generations,
pop_count + n_generations * pop_count,
)
def optimize(
devices: list[Device],
lab: Lab,
constraints: list[Constraint] | None = None,
collision_checker: Any | None = None,
reachability_checker: Any | None = None,
seed_placements: list[Placement] | None = None,
maxiter: int = 200,
popsize: int = 15,
tol: float = 1e-6,
seed: int | None = None,
strategy: str = "currenttobest1bin",
workflow_edges: list[list[str]] | None = None,
angle_granularity: int | None = None,
) -> list[Placement]:
"""运行差分进化优化,返回最优布局。
Args:
devices: 待排布的设备列表
lab: 实验室平面图
constraints: 用户自定义约束列表(可选)
collision_checker: 碰撞检测实例(默认使用 MockCollisionChecker
reachability_checker: 可达性检测实例(默认使用 MockReachabilityChecker
seed_placements: 种子布局(若为 None 则自动生成)
maxiter: 最大迭代次数
popsize: 种群大小倍数
tol: 收敛容差
seed: 随机种子(用于可复现性)
strategy: DE 变异策略("currenttobest1bin""best1bin"
Returns:
最优布局 Placement 列表
"""
if not devices:
return []
if collision_checker is None:
collision_checker = MockCollisionChecker()
if reachability_checker is None:
reachability_checker = MockReachabilityChecker()
if constraints is None:
constraints = []
n = len(devices)
bounds_array = _build_bounds(devices, lab, include_theta=True)
# 生成种子个体
if seed_placements is None:
seed_placements = generate_initial_layout(devices, lab)
seed_vector = _placements_to_vector(seed_placements, devices)
# 将种子钳位到边界内
seed_vector = np.clip(seed_vector, bounds_array[:, 0], bounds_array[:, 1])
def cost_function(x: np.ndarray) -> float:
placements = _vector_to_placements(x, devices)
return _evaluate_layout_cost(
devices, placements, lab, collision_checker, reachability_checker, constraints,
)
if angle_granularity is not None:
angles = _angle_lattice(angle_granularity)
current_placements = _snap_placements_to_lattice(seed_placements, angles)
best_placements = current_placements
best_cost = _evaluate_layout_cost(
devices, best_placements, lab, collision_checker, reachability_checker, constraints,
)
total_generations = 0
total_evaluations = 0
logger.info(
"Starting hybrid optimization: %d devices, granularity=%d, outer_rounds=%d, strategy=%s",
n, angle_granularity, 3, strategy,
)
maxiter_xy = max(40, math.ceil(maxiter / 3))
for round_idx in range(3):
round_start_best = best_cost
angle_placements, angle_cost, changed = _angle_sweep_once(
devices,
current_placements,
angles,
lab,
constraints,
collision_checker,
reachability_checker,
)
round_seed = None if seed is None else seed + round_idx
polished_placements, polished_cost, n_generations, n_evaluations = (
_optimize_positions_fixed_theta(
devices=devices,
lab=lab,
constraints=constraints,
collision_checker=collision_checker,
reachability_checker=reachability_checker,
seed_placements=angle_placements,
maxiter=maxiter_xy,
popsize=popsize,
tol=tol,
seed=round_seed,
strategy=strategy,
)
)
total_generations += n_generations
total_evaluations += n_evaluations
current_placements = polished_placements
if polished_cost < best_cost:
best_cost = polished_cost
best_placements = polished_placements
improved = polished_cost < round_start_best - 1e-9
logger.info(
"Hybrid round %d complete: changed=%s, angle_cost=%.4f, polished_cost=%.4f",
round_idx + 1, changed, angle_cost, polished_cost,
)
if not changed and not improved:
logger.info(
"Hybrid early stop: 第 %d 轮无角度变化且无 cost 改善",
round_idx + 1,
)
break
_log_final_summary(
devices,
best_placements,
lab,
constraints,
collision_checker,
reachability_checker,
best_cost,
total_generations,
total_evaluations,
)
return best_placements
# 构建初始种群:种子个体 + 多样性种子 + 随机个体
rng = np.random.default_rng(seed)
pop_count = popsize * 3 * n # scipy 默认 popsize * dim
init_pop = rng.uniform(
bounds_array[:, 0], bounds_array[:, 1], size=(pop_count, 3 * n)
)
init_pop[0] = seed_vector # 注入原始种子
# 多样性种子注入(多 preset + 变异版本)
extra_seeds = _generate_seeds(devices, lab, rng, workflow_edges)
for i, s in enumerate(extra_seeds):
idx = i + 1 # 原始种子占 [0]
if idx < pop_count:
init_pop[idx] = np.clip(s, bounds_array[:, 0], bounds_array[:, 1])
logger.info(
"Starting DE optimization: %d devices, %d-dim, popsize=%d, maxiter=%d, strategy=%s",
n, 3 * n, pop_count, maxiter, strategy,
)
progress_cb = _make_progress_callback(
devices,
lab,
constraints,
collision_checker,
reachability_checker,
lambda vec: _vector_to_placements(vec, devices),
)
best_vector, best_cost, n_generations = _run_de(
cost_fn=cost_function,
bounds=bounds_array,
init_pop=init_pop,
maxiter=maxiter,
tol=tol,
atol=1e-3,
mutation=(0.5, 1.0),
recombination=0.7,
seed=seed,
n_devices=n,
strategy=strategy,
progress_callback=progress_cb,
)
# 评估次数估算:每代 pop_count 次(初始 + 每代 trial
n_evaluations = pop_count + n_generations * pop_count
final_placements = _vector_to_placements(best_vector, devices)
_log_final_summary(
devices,
final_placements,
lab,
constraints,
collision_checker,
reachability_checker,
best_cost,
n_generations,
n_evaluations,
)
return final_placements
def snap_theta(placements: list[Placement], threshold_deg: float = 15.0) -> list[Placement]:
"""Snap each placement's theta to nearest 90° if within threshold.
Returns new Placement list (does not mutate input).
"""
threshold_rad = math.radians(threshold_deg)
cardinals = [0, math.pi / 2, math.pi, 3 * math.pi / 2, 2 * math.pi]
result = []
for p in placements:
theta_mod = p.theta % (2 * math.pi)
best_cardinal = min(cardinals, key=lambda c: abs(theta_mod - c))
if abs(theta_mod - best_cardinal) <= threshold_rad:
snapped = best_cardinal % (2 * math.pi)
else:
snapped = p.theta
result.append(Placement(
device_id=p.device_id, x=p.x, y=p.y, theta=snapped, uuid=p.uuid,
))
return result
def snap_theta_safe(
placements: list[Placement],
devices: list[Device],
lab: Lab,
collision_checker: Any,
threshold_deg: float = 15.0,
) -> list[Placement]:
"""Snap theta 到基数方向,但碰撞时回退到原始角度。
逐设备检查snap 后如果产生碰撞或越界,则该设备保留原始 theta。
"""
snapped = snap_theta(placements, threshold_deg)
result = list(snapped)
for idx, (orig, snap) in enumerate(zip(placements, snapped)):
if abs(orig.theta - snap.theta) < 1e-9:
continue # 未 snap跳过
# 检查 snap 版本是否导致新碰撞
test_placements = result.copy()
test_placements[idx] = snap
cost = evaluate_default_hard_constraints(
devices, test_placements, lab, collision_checker, graduated=False,
)
if math.isinf(cost):
result[idx] = orig # 回退到未 snap 的角度
logger.info(
"snap_theta_safe: 设备 %s snap θ=%.2f%.2f 导致碰撞,已回退",
snap.device_id, orig.theta, snap.theta,
)
return result
def _placements_to_vector(
placements: list[Placement], devices: list[Device]
) -> np.ndarray:
"""将 Placement 列表编码为 3N 维向量。
按 devices 列表的顺序排列。若某设备在 placements 中缺失,用 (0, 0, 0) 填充。
"""
placement_map = {p.device_id: p for p in placements}
vec = np.zeros(3 * len(devices))
for i, dev in enumerate(devices):
p = placement_map.get(dev.id)
if p is not None:
vec[3 * i] = p.x
vec[3 * i + 1] = p.y
vec[3 * i + 2] = p.theta
return vec
def _vector_to_placements(
x: np.ndarray, devices: list[Device]
) -> list[Placement]:
"""将 3N 维向量解码为 Placement 列表。"""
placements = []
for i, dev in enumerate(devices):
placements.append(
Placement(
device_id=dev.id,
x=float(x[3 * i]),
y=float(x[3 * i + 1]),
theta=float(x[3 * i + 2] % (2 * math.pi)),
)
)
return placements