Files
Uni-Lab-OS/unilabos/layout_optimizer/optimizer.py
yexiaozhou 99dc821a01 refactor(layout_optimizer): DE optimizer — discrete angles, strategy fixes, decoupled mutation, API exposure
- Extract _compute_mutant helper with circular angle diff (fixes 0/2π boundary bug)
- Fix currenttobest1bin (remove non-standard noise term), add rand1bin strategy
- Decoupled mutation: independent F ranges for position vs theta
- Configurable crossover mode: per-device (default) or per-dimension
- Discrete angle snapping in normal 3N DE (joint mode, replaces hybrid as default)
- Stop auto-injecting prefer_orientation_mode into DE
- Expose DE hyperparameters (mutation, theta_mutation, recombination, strategy, angle_mode) via API
2026-04-10 14:41:13 +08:00

1057 lines
37 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""差分进化布局优化器。
编码N 个设备 → 3N 维向量 [x0, y0, θ0, x1, y1, θ1, ...]
使用自定义差分进化循环per-device crossover + θ wrapping进行全局优化。
初始布局Pencil/回退)注入为种群种子个体加速收敛。
"""
from __future__ import annotations
import logging
import math
from typing import Any, Callable
import numpy as np
from .constraints import (
evaluate_constraints,
evaluate_constraints_breakdown,
evaluate_default_hard_constraints,
evaluate_default_hard_constraints_breakdown,
)
from .mock_checkers import MockCollisionChecker, MockReachabilityChecker
from .models import Constraint, Device, Lab, Placement
from .pencil_integration import generate_initial_layout
from .seeders import resolve_seeder_params, seed_layout
logger = logging.getLogger(__name__)
def _circular_diff(
a: np.ndarray, b: np.ndarray, n_devices: int, dims_per_device: int = 3,
) -> np.ndarray:
"""计算 a - b对 theta 分量使用最短圆周距离。
对于 dims_per_device=3每个设备的第 3 个分量theta使用
(delta + π) % (2π) - π 计算最短角度差,避免 0/2π 边界跳变。
对于 dims_per_device=2纯位置等价于普通减法。
"""
result = a - b
if dims_per_device == 3:
two_pi = 2 * math.pi
for d in range(n_devices):
idx = 3 * d + 2
result[idx] = (result[idx] + math.pi) % two_pi - math.pi
return result
def _compute_mutant(
strategy: str,
pop: np.ndarray,
best_vector: np.ndarray,
target_idx: int,
f_val: float,
f_val_theta: float,
rng: np.random.Generator,
n_devices: int,
dims_per_device: int = 3,
) -> np.ndarray:
"""计算 DE 变异向量(统一所有策略)。
支持策略:
- "best1bin": mutant = best + F*(r1 - r2)
- "currenttobest1bin": mutant = target + F*(best - target) + F*(r1 - r2)
- "rand1bin": mutant = r0 + F*(r1 - r2)
使用 _circular_diff 处理角度差,避免 0/2π 边界问题。
当 f_val_theta != f_val 且 dims_per_device == 3 时,对 theta 分量
使用独立的变异因子 f_val_theta 进行缩放。
"""
pop_size = pop.shape[0]
candidates = list(range(pop_size))
candidates.remove(target_idx)
if strategy == "rand1bin":
chosen = rng.choice(candidates, size=3, replace=False)
r0, r1, r2 = int(chosen[0]), int(chosen[1]), int(chosen[2])
diff = _circular_diff(pop[r1], pop[r2], n_devices, dims_per_device)
mutant = pop[r0] + f_val * diff
elif strategy == "best1bin":
chosen = rng.choice(candidates, size=2, replace=False)
r1, r2 = int(chosen[0]), int(chosen[1])
diff = _circular_diff(pop[r1], pop[r2], n_devices, dims_per_device)
mutant = best_vector + f_val * diff
elif strategy == "currenttobest1bin":
chosen = rng.choice(candidates, size=2, replace=False)
r1, r2 = int(chosen[0]), int(chosen[1])
diff_best = _circular_diff(best_vector, pop[target_idx], n_devices, dims_per_device)
diff_rand = _circular_diff(pop[r1], pop[r2], n_devices, dims_per_device)
mutant = pop[target_idx] + f_val * diff_best + f_val * diff_rand
else:
raise ValueError(f"Unknown DE strategy: {strategy!r}")
# 解耦 theta 变异:当 f_val_theta != f_val 时重新缩放 theta 分量
if dims_per_device == 3 and f_val_theta != f_val:
for d_idx in range(n_devices):
theta_idx = 3 * d_idx + 2
# 确定该策略的 base theta变异前的参考点
if strategy == "best1bin":
base_theta = best_vector[theta_idx]
elif strategy == "currenttobest1bin":
base_theta = pop[target_idx, theta_idx]
else: # rand1bin
base_theta = pop[int(chosen[0]), theta_idx]
diff_theta = mutant[theta_idx] - base_theta
mutant[theta_idx] = base_theta + (f_val_theta / f_val) * diff_theta
return mutant
def _run_de(
cost_fn: Callable[[np.ndarray], float],
bounds: np.ndarray,
init_pop: np.ndarray,
maxiter: int,
tol: float,
atol: float,
mutation: tuple[float, float],
recombination: float,
seed: int | None,
n_devices: int,
strategy: str = "currenttobest1bin",
progress_callback: Callable[[int, np.ndarray, float], None] | None = None,
theta_mutation: tuple[float, float] | None = None,
crossover_mode: str = "device",
allowed_angles: list[float] | None = None,
) -> tuple[np.ndarray, float, int]:
"""自定义差分进化循环。
特性:
- 支持 currenttobest1bin / best1bin / rand1bin 三种策略
- Per-device crossover默认以设备 (x, y, θ) 三元组为原子单元进行交叉
- Per-dimension crossover可选每个标量维度独立交叉
- θ wrapping交叉后对角度取模 [0, 2π)
- 离散角度吸附:可选将 θ 吸附到指定格点
- 解耦变异position 和 theta 可使用不同 F 范围
- Early stopping最近 200 代改善 < 0.1% 时提前终止
- scipy 风格收敛判断std(costs) <= atol + tol * |best_cost|
Args:
cost_fn: 目标函数 f(x) → float
bounds: 边界数组 shape=(ndim, 2),每行 [low, high]
init_pop: 初始种群 shape=(pop_size, ndim)
maxiter: 最大迭代代数
tol: 相对收敛容差
atol: 绝对收敛容差
mutation: 变异因子范围 (F_min, F_max),用于位置分量
recombination: 交叉概率 CR
seed: 随机种子
n_devices: 设备数量(用于 per-device crossover
strategy: 变异策略,"currenttobest1bin""best1bin""rand1bin"
progress_callback: 每 10 代调用一次 (gen, best_vector, best_cost)
theta_mutation: theta 变异因子范围None 时使用 mutation
crossover_mode: "device"per-device 三元组原子交叉)或 "dimension"(逐维独立交叉)
allowed_angles: 离散角度格点列表,非 None 时将 θ 吸附到最近格点
Returns:
(best_vector, best_cost, n_generations)
"""
rng = np.random.default_rng(seed)
pop_size, ndim = init_pop.shape
lower = bounds[:, 0]
upper = bounds[:, 1]
if theta_mutation is None:
theta_mutation = mutation
# 离散角度:吸附初始种群 θ 到格点
if allowed_angles is not None:
for ind_idx in range(pop_size):
for d in range(n_devices):
init_pop[ind_idx, 3 * d + 2] = _nearest_lattice_theta(
init_pop[ind_idx, 3 * d + 2], allowed_angles,
)
# 评估初始种群适应度
costs = np.array([cost_fn(ind) for ind in init_pop])
best_idx = int(np.argmin(costs))
best_cost = costs[best_idx]
best_vector = init_pop[best_idx].copy()
# Early stopping 跟踪
patience = 200
best_cost_history: list[float] = [best_cost]
for gen in range(1, maxiter + 1):
for i in range(pop_size):
# 采样变异因子 F位置和 theta 各自独立)
f_val = rng.uniform(mutation[0], mutation[1])
f_val_theta = rng.uniform(theta_mutation[0], theta_mutation[1])
# 变异向量(使用统一 helper
mutant = _compute_mutant(
strategy, init_pop, best_vector, i,
f_val, f_val_theta, rng, n_devices, dims_per_device=3,
)
# 交叉
trial = init_pop[i].copy()
if crossover_mode == "dimension":
# 逐维独立交叉
j_rand = rng.integers(0, ndim)
for j in range(ndim):
if rng.random() < recombination or j == j_rand:
trial[j] = mutant[j]
else:
# Per-device crossover以 (x, y, θ) 三元组为原子单元
j_rand = rng.integers(0, n_devices)
for d in range(n_devices):
if rng.random() < recombination or d == j_rand:
trial[3 * d: 3 * d + 3] = mutant[3 * d: 3 * d + 3]
# θ wrapping角度取模 [0, 2π)
for d in range(n_devices):
trial[3 * d + 2] %= 2 * math.pi
# 离散角度吸附
if allowed_angles is not None:
for d in range(n_devices):
trial[3 * d + 2] = _nearest_lattice_theta(
trial[3 * d + 2], allowed_angles,
)
# 钳位到边界内,然后重新 normalize θ(避免 clip 破坏 modulo
trial = np.clip(trial, lower, upper)
for d in range(n_devices):
trial[3 * d + 2] %= 2 * math.pi
# 贪心选择trial 不比当前差则替换
trial_cost = cost_fn(trial)
if trial_cost <= costs[i]:
init_pop[i] = trial
costs[i] = trial_cost
if trial_cost < best_cost:
best_cost = trial_cost
best_vector = trial.copy()
# 更新 best_idx种群可能整体更新
best_idx = int(np.argmin(costs))
# 进度回调:每 10 代报告最优个体状态
if progress_callback and gen % 10 == 0:
progress_callback(gen, best_vector, best_cost)
# Early stopping最近 patience 代改善 < 0.1%
best_cost_history.append(best_cost)
if len(best_cost_history) >= patience:
old_cost = best_cost_history[-patience]
if old_cost > 0:
improvement = (old_cost - best_cost) / old_cost
else:
improvement = 0.0
if improvement < 0.001:
logger.info(
"Early stop: cost 在 %d 代内稳定在 %.4f(改善 < 0.1%%",
patience, best_cost,
)
return best_vector, best_cost, gen
# scipy 风格收敛判断
if np.std(costs) <= atol + tol * abs(best_cost):
logger.info(
"收敛终止std(costs)=%.6f <= atol+tol*|best|=%.6f,第 %d",
np.std(costs), atol + tol * abs(best_cost), gen,
)
return best_vector, best_cost, gen
return best_vector, best_cost, maxiter
def _generate_seeds(
devices: list[Device],
lab: Lab,
rng: np.random.Generator,
workflow_edges: list[list[str]] | None = None,
n_variants: int = 3,
sigma_pos_frac: float = 0.05,
sigma_theta: float = math.pi / 6,
allowed_angles: list[float] | None = None,
) -> list[np.ndarray]:
"""从多个 seeder preset 生成多样性种子个体 + 变异版本。"""
seeds: list[np.ndarray] = []
presets = ["compact_outward", "spread_inward"]
if workflow_edges:
presets.append("workflow_cluster")
for preset_name in presets:
try:
params = resolve_seeder_params(preset_name)
except ValueError:
continue
if params is None:
continue
base_placements = seed_layout(devices, lab, params, workflow_edges)
base_vec = _placements_to_vector(base_placements, devices)
# 离散角度吸附
if allowed_angles is not None:
for d in range(len(devices)):
base_vec[3 * d + 2] = _nearest_lattice_theta(
base_vec[3 * d + 2], allowed_angles,
)
seeds.append(base_vec)
# 变异版本:对 (x,y) 加高斯噪声 σ=5% lab 尺寸,θ 加 σ=π/6
for _ in range(n_variants):
variant = base_vec.copy()
for d in range(len(devices)):
variant[3 * d] += rng.normal(0, sigma_pos_frac * lab.width)
variant[3 * d + 1] += rng.normal(0, sigma_pos_frac * lab.depth)
variant[3 * d + 2] += rng.normal(0, sigma_theta)
variant[3 * d + 2] %= 2 * math.pi
if allowed_angles is not None:
variant[3 * d + 2] = _nearest_lattice_theta(
variant[3 * d + 2], allowed_angles,
)
seeds.append(variant)
return seeds
def _build_bounds(
devices: list[Device], lab: Lab, *, include_theta: bool,
) -> np.ndarray:
"""构建搜索边界。"""
bounds = []
for dev in devices:
half_min = min(dev.bbox[0], dev.bbox[1]) / 2
bounds.append((half_min, lab.width - half_min))
bounds.append((half_min, lab.depth - half_min))
if include_theta:
bounds.append((0, 2 * math.pi))
return np.array(bounds)
def _evaluate_layout_cost(
devices: list[Device],
placements: list[Placement],
lab: Lab,
collision_checker: Any,
reachability_checker: Any,
constraints: list[Constraint],
) -> float:
"""统一计算布局总 cost。"""
hard_cost = evaluate_default_hard_constraints(
devices, placements, lab, collision_checker,
)
if math.isinf(hard_cost):
return 1e18
if constraints:
user_cost = evaluate_constraints(
devices, placements, lab, constraints,
collision_checker, reachability_checker,
)
if math.isinf(user_cost):
return 1e18
return hard_cost + user_cost
return hard_cost
def _make_progress_callback(
devices: list[Device],
lab: Lab,
constraints: list[Constraint],
collision_checker: Any,
reachability_checker: Any,
placements_from_vector: Callable[[np.ndarray], list[Placement]],
) -> Callable[[int, np.ndarray, float], None]:
"""构造统一的 DEBUG 进度回调。"""
def _progress_cb(gen: int, best_vec: np.ndarray, best_cost_val: float) -> None:
if not logger.isEnabledFor(logging.DEBUG):
return
pls = placements_from_vector(best_vec)
hard_bd = evaluate_default_hard_constraints_breakdown(
devices, pls, lab, collision_checker,
)
lines = [f"=== DE Gen {gen} | best_cost={best_cost_val:.4f} ==="]
lines.append(f" {'Constraint':<45} {'Type':<6} {'Weight':>8} {'Cost':>10}")
lines.append(f" {'' * 71}")
lines.append(
f" {'[predefined] collision':<45} {'hard':<6} {hard_bd['collision_weight']:>8.0f} {hard_bd['collision']:>10.4f}"
)
lines.append(
f" {'[predefined] boundary':<45} {'hard':<6} {hard_bd['boundary_weight']:>8.0f} {hard_bd['boundary']:>10.4f}"
)
if constraints:
user_bd = evaluate_constraints_breakdown(
devices, pls, lab, constraints,
collision_checker, reachability_checker,
)
for item in user_bd:
lines.append(
f" {item['name']:<45} {item['type']:<6} {item['weight']:>8.1f} {item['cost']:>10.4f}"
)
lines.append(f" {'' * 71}")
lines.append(f" {'TOTAL':<45} {'':6} {'':>8} {best_cost_val:>10.4f}")
logger.debug("\n".join(lines))
return _progress_cb
def _log_final_summary(
devices: list[Device],
final_placements: list[Placement],
lab: Lab,
constraints: list[Constraint],
collision_checker: Any,
reachability_checker: Any,
best_cost: float,
n_generations: int,
n_evaluations: int,
) -> None:
"""输出最终布局分项明细。"""
hard_bd = evaluate_default_hard_constraints_breakdown(
devices, final_placements, lab, collision_checker,
)
all_hard_met = hard_bd["total"] == 0.0
all_violators: list[dict] = [
{"name": "[predefined] collision", "cost": hard_bd["collision"]},
{"name": "[predefined] boundary", "cost": hard_bd["boundary"]},
]
if constraints:
user_bd = evaluate_constraints_breakdown(
devices, final_placements, lab, constraints,
collision_checker, reachability_checker,
)
user_total = sum(item["cost"] for item in user_bd)
for c_item in user_bd:
all_violators.append({"name": c_item["name"], "cost": c_item["cost"]})
if c_item["type"] == "hard" and c_item["cost"] > 0:
all_hard_met = False
else:
user_total = 0.0
summary = [
"DE complete: success=%s, cost=%.4f, %d gens, %d evals"
% (all_hard_met, best_cost, n_generations, n_evaluations),
" Predefined: subtotal=%.4f" % hard_bd["total"],
]
if constraints:
summary.append(f" User: subtotal={user_total:.4f}")
top_violators = sorted(all_violators, key=lambda x: x["cost"], reverse=True)[:3]
top_violators = [v for v in top_violators if v["cost"] > 0]
if top_violators:
summary.append(" Top violators:")
for v in top_violators:
summary.append(f" {v['name']} = {v['cost']:.4f}")
logger.info("\n".join(summary))
def _angle_lattice(granularity: int) -> list[float]:
"""生成角度离散格点。"""
return [(2 * math.pi * idx) / granularity for idx in range(granularity)]
def _nearest_lattice_theta(theta: float, angles: list[float]) -> float:
"""返回距离最近的离散角度。"""
theta_mod = theta % (2 * math.pi)
return min(
angles,
key=lambda angle: min(
abs(theta_mod - angle),
2 * math.pi - abs(theta_mod - angle),
),
)
def _snap_placements_to_lattice(
placements: list[Placement], angles: list[float],
) -> list[Placement]:
"""将所有设备角度吸附到离散格点。"""
return [
Placement(
device_id=p.device_id,
x=p.x,
y=p.y,
theta=_nearest_lattice_theta(p.theta, angles),
uuid=p.uuid,
)
for p in placements
]
def _placements_to_position_vector(
placements: list[Placement], devices: list[Device],
) -> np.ndarray:
"""将 Placement 列表编码为 2N 维位置向量。"""
placement_map = {p.device_id: p for p in placements}
vec = np.zeros(2 * len(devices))
for i, dev in enumerate(devices):
p = placement_map.get(dev.id)
if p is not None:
vec[2 * i] = p.x
vec[2 * i + 1] = p.y
return vec
def _position_vector_to_placements(
x: np.ndarray,
devices: list[Device],
base_placements: list[Placement],
) -> list[Placement]:
"""将 2N 维位置向量解码为保留 theta 的 Placement 列表。"""
base_map = {p.device_id: p for p in base_placements}
placements = []
for i, dev in enumerate(devices):
base = base_map.get(dev.id)
theta = base.theta if base is not None else 0.0
uuid = base.uuid if base is not None else ""
placements.append(
Placement(
device_id=dev.id,
x=float(x[2 * i]),
y=float(x[2 * i + 1]),
theta=float(theta % (2 * math.pi)),
uuid=uuid,
)
)
return placements
def _run_de_xy(
cost_fn: Callable[[np.ndarray], float],
bounds: np.ndarray,
init_pop: np.ndarray,
maxiter: int,
tol: float,
atol: float,
mutation: tuple[float, float],
recombination: float,
seed: int | None,
n_devices: int,
strategy: str = "currenttobest1bin",
progress_callback: Callable[[int, np.ndarray, float], None] | None = None,
crossover_mode: str = "device",
) -> tuple[np.ndarray, float, int]:
"""固定 theta 的 2N 维位置 DE。"""
rng = np.random.default_rng(seed)
pop_size, ndim = init_pop.shape
lower = bounds[:, 0]
upper = bounds[:, 1]
costs = np.array([cost_fn(ind) for ind in init_pop])
best_idx = int(np.argmin(costs))
best_cost = costs[best_idx]
best_vector = init_pop[best_idx].copy()
patience = 60
best_cost_history: list[float] = [best_cost]
for gen in range(1, maxiter + 1):
for i in range(pop_size):
f_val = rng.uniform(mutation[0], mutation[1])
# 变异向量dims_per_device=2无 theta
mutant = _compute_mutant(
strategy, init_pop, best_vector, i,
f_val, f_val, rng, n_devices, dims_per_device=2,
)
# 交叉
trial = init_pop[i].copy()
if crossover_mode == "dimension":
j_rand = rng.integers(0, ndim)
for j in range(ndim):
if rng.random() < recombination or j == j_rand:
trial[j] = mutant[j]
else:
j_rand = rng.integers(0, n_devices)
for d in range(n_devices):
if rng.random() < recombination or d == j_rand:
trial[2 * d: 2 * d + 2] = mutant[2 * d: 2 * d + 2]
trial = np.clip(trial, lower, upper)
trial_cost = cost_fn(trial)
if trial_cost <= costs[i]:
init_pop[i] = trial
costs[i] = trial_cost
if trial_cost < best_cost:
best_cost = trial_cost
best_vector = trial.copy()
best_idx = int(np.argmin(costs))
if progress_callback and gen % 10 == 0:
progress_callback(gen, best_vector, best_cost)
best_cost_history.append(best_cost)
if len(best_cost_history) >= patience:
old_cost = best_cost_history[-patience]
improvement = (old_cost - best_cost) / old_cost if old_cost > 0 else 0.0
if improvement < 0.001:
logger.info(
"Early stop: cost 在 %d 代内稳定在 %.4f(改善 < 0.1%%",
patience, best_cost,
)
return best_vector, best_cost, gen
if np.std(costs) <= atol + tol * abs(best_cost):
logger.info(
"收敛终止std(costs)=%.6f <= atol+tol*|best|=%.6f,第 %d",
np.std(costs), atol + tol * abs(best_cost), gen,
)
return best_vector, best_cost, gen
return best_vector, best_cost, maxiter
def _angle_sweep_once(
devices: list[Device],
placements: list[Placement],
angles: list[float],
lab: Lab,
constraints: list[Constraint],
collision_checker: Any,
reachability_checker: Any,
) -> tuple[list[Placement], float, bool]:
"""固定位置做一轮逐设备离散角度贪心扫描。"""
current = list(placements)
current_cost = _evaluate_layout_cost(
devices, current, lab, collision_checker, reachability_checker, constraints,
)
changed = False
for idx, dev in enumerate(devices):
best_theta = current[idx].theta
best_cost = current_cost
for angle in angles:
if abs((best_theta - angle) % (2 * math.pi)) < 1e-9:
continue
candidate = list(current)
base = candidate[idx]
candidate[idx] = Placement(
device_id=base.device_id,
x=base.x,
y=base.y,
theta=angle,
uuid=base.uuid,
)
candidate_cost = _evaluate_layout_cost(
devices, candidate, lab, collision_checker, reachability_checker, constraints,
)
if candidate_cost < best_cost - 1e-9:
best_theta = angle
best_cost = candidate_cost
if abs((current[idx].theta - best_theta) % (2 * math.pi)) >= 1e-9:
base = current[idx]
current[idx] = Placement(
device_id=base.device_id,
x=base.x,
y=base.y,
theta=best_theta,
uuid=base.uuid,
)
current_cost = best_cost
changed = True
return current, current_cost, changed
def _optimize_positions_fixed_theta(
devices: list[Device],
lab: Lab,
constraints: list[Constraint],
collision_checker: Any,
reachability_checker: Any,
seed_placements: list[Placement],
maxiter: int,
popsize: int,
tol: float,
seed: int | None,
strategy: str,
mutation: tuple[float, float] = (0.5, 1.0),
recombination: float = 0.7,
atol: float = 1e-3,
crossover_mode: str = "device",
) -> tuple[list[Placement], float, int, int]:
"""在固定离散 theta 下,只优化位置。"""
n = len(devices)
bounds_array = _build_bounds(devices, lab, include_theta=False)
seed_vector = np.clip(
_placements_to_position_vector(seed_placements, devices),
bounds_array[:, 0],
bounds_array[:, 1],
)
def cost_function(x: np.ndarray) -> float:
placements = _position_vector_to_placements(x, devices, seed_placements)
return _evaluate_layout_cost(
devices, placements, lab, collision_checker, reachability_checker, constraints,
)
rng = np.random.default_rng(seed)
pop_count = popsize * 2 * n
init_pop = rng.uniform(
bounds_array[:, 0], bounds_array[:, 1], size=(pop_count, 2 * n),
)
init_pop[0] = seed_vector
progress_cb = _make_progress_callback(
devices,
lab,
constraints,
collision_checker,
reachability_checker,
lambda vec: _position_vector_to_placements(vec, devices, seed_placements),
)
best_vector, best_cost, n_generations = _run_de_xy(
cost_fn=cost_function,
bounds=bounds_array,
init_pop=init_pop,
maxiter=maxiter,
tol=tol,
atol=atol,
mutation=mutation,
recombination=recombination,
seed=seed,
n_devices=n,
strategy=strategy,
progress_callback=progress_cb,
crossover_mode=crossover_mode,
)
return (
_position_vector_to_placements(best_vector, devices, seed_placements),
best_cost,
n_generations,
pop_count + n_generations * pop_count,
)
def optimize(
devices: list[Device],
lab: Lab,
constraints: list[Constraint] | None = None,
collision_checker: Any | None = None,
reachability_checker: Any | None = None,
seed_placements: list[Placement] | None = None,
maxiter: int = 200,
popsize: int = 15,
tol: float = 1e-6,
seed: int | None = None,
strategy: str = "currenttobest1bin",
workflow_edges: list[list[str]] | None = None,
angle_granularity: int | None = None,
angle_mode: str = "joint",
mutation: tuple[float, float] = (0.5, 1.0),
theta_mutation: tuple[float, float] | None = None,
recombination: float = 0.7,
atol: float = 1e-3,
crossover_mode: str = "device",
) -> list[Placement]:
"""运行差分进化优化,返回最优布局。
Args:
devices: 待排布的设备列表
lab: 实验室平面图
constraints: 用户自定义约束列表(可选)
collision_checker: 碰撞检测实例(默认使用 MockCollisionChecker
reachability_checker: 可达性检测实例(默认使用 MockReachabilityChecker
seed_placements: 种子布局(若为 None 则自动生成)
maxiter: 最大迭代次数
popsize: 种群大小倍数
tol: 收敛容差
seed: 随机种子(用于可复现性)
strategy: DE 变异策略("currenttobest1bin""best1bin""rand1bin"
workflow_edges: 工作流边列表
angle_granularity: 离散角度粒度4/8/12/24None 为连续
angle_mode: 离散角度模式,"joint"3N DE + 格点吸附)或 "hybrid"(角度扫描 + 位置 DE
mutation: 位置变异因子范围 (F_min, F_max)
theta_mutation: theta 变异因子范围None 时使用 mutation
recombination: 交叉概率 CR
atol: 绝对收敛容差
crossover_mode: "device"per-device 三元组原子交叉)或 "dimension"(逐维独立交叉)
Returns:
最优布局 Placement 列表
"""
if not devices:
return []
if collision_checker is None:
collision_checker = MockCollisionChecker()
if reachability_checker is None:
reachability_checker = MockReachabilityChecker()
if constraints is None:
constraints = []
if theta_mutation is None:
theta_mutation = mutation
n = len(devices)
bounds_array = _build_bounds(devices, lab, include_theta=True)
# 生成种子个体
if seed_placements is None:
seed_placements = generate_initial_layout(devices, lab)
seed_vector = _placements_to_vector(seed_placements, devices)
# 将种子钳位到边界内
seed_vector = np.clip(seed_vector, bounds_array[:, 0], bounds_array[:, 1])
def cost_function(x: np.ndarray) -> float:
placements = _vector_to_placements(x, devices)
return _evaluate_layout_cost(
devices, placements, lab, collision_checker, reachability_checker, constraints,
)
# === 离散角度 hybrid 模式(角度扫描 + 位置 DE===
if angle_granularity is not None and angle_mode == "hybrid":
angles = _angle_lattice(angle_granularity)
current_placements = _snap_placements_to_lattice(seed_placements, angles)
best_placements = current_placements
best_cost = _evaluate_layout_cost(
devices, best_placements, lab, collision_checker, reachability_checker, constraints,
)
total_generations = 0
total_evaluations = 0
logger.info(
"Starting hybrid optimization: %d devices, granularity=%d, outer_rounds=%d, strategy=%s",
n, angle_granularity, 3, strategy,
)
maxiter_xy = max(40, math.ceil(maxiter / 3))
for round_idx in range(3):
round_start_best = best_cost
angle_placements, angle_cost, changed = _angle_sweep_once(
devices,
current_placements,
angles,
lab,
constraints,
collision_checker,
reachability_checker,
)
round_seed = None if seed is None else seed + round_idx
polished_placements, polished_cost, n_generations, n_evaluations = (
_optimize_positions_fixed_theta(
devices=devices,
lab=lab,
constraints=constraints,
collision_checker=collision_checker,
reachability_checker=reachability_checker,
seed_placements=angle_placements,
maxiter=maxiter_xy,
popsize=popsize,
tol=tol,
seed=round_seed,
strategy=strategy,
mutation=mutation,
recombination=recombination,
atol=atol,
crossover_mode=crossover_mode,
)
)
total_generations += n_generations
total_evaluations += n_evaluations
current_placements = polished_placements
if polished_cost < best_cost:
best_cost = polished_cost
best_placements = polished_placements
improved = polished_cost < round_start_best - 1e-9
logger.info(
"Hybrid round %d complete: changed=%s, angle_cost=%.4f, polished_cost=%.4f",
round_idx + 1, changed, angle_cost, polished_cost,
)
if not changed and not improved:
logger.info(
"Hybrid early stop: 第 %d 轮无角度变化且无 cost 改善",
round_idx + 1,
)
break
_log_final_summary(
devices,
best_placements,
lab,
constraints,
collision_checker,
reachability_checker,
best_cost,
total_generations,
total_evaluations,
)
return best_placements
# === 标准 3N DE 路径(连续 theta 或 joint 离散 theta===
allowed_angles: list[float] | None = None
if angle_granularity is not None:
# joint 模式:在 3N DE 中吸附 theta 到离散格点
allowed_angles = _angle_lattice(angle_granularity)
seed_placements = _snap_placements_to_lattice(seed_placements, allowed_angles)
seed_vector = _placements_to_vector(seed_placements, devices)
seed_vector = np.clip(seed_vector, bounds_array[:, 0], bounds_array[:, 1])
# 构建初始种群:种子个体 + 多样性种子 + 随机个体
rng = np.random.default_rng(seed)
pop_count = popsize * 3 * n # scipy 默认 popsize * dim
init_pop = rng.uniform(
bounds_array[:, 0], bounds_array[:, 1], size=(pop_count, 3 * n)
)
init_pop[0] = seed_vector # 注入原始种子
# 多样性种子注入(多 preset + 变异版本)
extra_seeds = _generate_seeds(
devices, lab, rng, workflow_edges, allowed_angles=allowed_angles,
)
for i, s in enumerate(extra_seeds):
idx = i + 1 # 原始种子占 [0]
if idx < pop_count:
init_pop[idx] = np.clip(s, bounds_array[:, 0], bounds_array[:, 1])
logger.info(
"Starting DE optimization: %d devices, %d-dim, popsize=%d, maxiter=%d, strategy=%s, angle_mode=%s",
n, 3 * n, pop_count, maxiter, strategy,
"joint-discrete" if allowed_angles else "continuous",
)
progress_cb = _make_progress_callback(
devices,
lab,
constraints,
collision_checker,
reachability_checker,
lambda vec: _vector_to_placements(vec, devices),
)
best_vector, best_cost, n_generations = _run_de(
cost_fn=cost_function,
bounds=bounds_array,
init_pop=init_pop,
maxiter=maxiter,
tol=tol,
atol=atol,
mutation=mutation,
recombination=recombination,
seed=seed,
n_devices=n,
strategy=strategy,
progress_callback=progress_cb,
theta_mutation=theta_mutation,
crossover_mode=crossover_mode,
allowed_angles=allowed_angles,
)
# 评估次数估算:每代 pop_count 次(初始 + 每代 trial
n_evaluations = pop_count + n_generations * pop_count
final_placements = _vector_to_placements(best_vector, devices)
_log_final_summary(
devices,
final_placements,
lab,
constraints,
collision_checker,
reachability_checker,
best_cost,
n_generations,
n_evaluations,
)
return final_placements
def snap_theta(placements: list[Placement], threshold_deg: float = 15.0) -> list[Placement]:
"""Snap each placement's theta to nearest 90° if within threshold.
Returns new Placement list (does not mutate input).
"""
threshold_rad = math.radians(threshold_deg)
cardinals = [0, math.pi / 2, math.pi, 3 * math.pi / 2, 2 * math.pi]
result = []
for p in placements:
theta_mod = p.theta % (2 * math.pi)
best_cardinal = min(cardinals, key=lambda c: abs(theta_mod - c))
if abs(theta_mod - best_cardinal) <= threshold_rad:
snapped = best_cardinal % (2 * math.pi)
else:
snapped = p.theta
result.append(Placement(
device_id=p.device_id, x=p.x, y=p.y, theta=snapped, uuid=p.uuid,
))
return result
def snap_theta_safe(
placements: list[Placement],
devices: list[Device],
lab: Lab,
collision_checker: Any,
threshold_deg: float = 15.0,
) -> list[Placement]:
"""Snap theta 到基数方向,但碰撞时回退到原始角度。
逐设备检查snap 后如果产生碰撞或越界,则该设备保留原始 theta。
"""
snapped = snap_theta(placements, threshold_deg)
result = list(snapped)
for idx, (orig, snap) in enumerate(zip(placements, snapped)):
if abs(orig.theta - snap.theta) < 1e-9:
continue # 未 snap跳过
# 检查 snap 版本是否导致新碰撞
test_placements = result.copy()
test_placements[idx] = snap
cost = evaluate_default_hard_constraints(
devices, test_placements, lab, collision_checker, graduated=False,
)
if math.isinf(cost):
result[idx] = orig # 回退到未 snap 的角度
logger.info(
"snap_theta_safe: 设备 %s snap θ=%.2f%.2f 导致碰撞,已回退",
snap.device_id, orig.theta, snap.theta,
)
return result
def _placements_to_vector(
placements: list[Placement], devices: list[Device]
) -> np.ndarray:
"""将 Placement 列表编码为 3N 维向量。
按 devices 列表的顺序排列。若某设备在 placements 中缺失,用 (0, 0, 0) 填充。
"""
placement_map = {p.device_id: p for p in placements}
vec = np.zeros(3 * len(devices))
for i, dev in enumerate(devices):
p = placement_map.get(dev.id)
if p is not None:
vec[3 * i] = p.x
vec[3 * i + 1] = p.y
vec[3 * i + 2] = p.theta
return vec
def _vector_to_placements(
x: np.ndarray, devices: list[Device]
) -> list[Placement]:
"""将 3N 维向量解码为 Placement 列表。"""
placements = []
for i, dev in enumerate(devices):
placements.append(
Placement(
device_id=dev.id,
x=float(x[3 * i]),
y=float(x[3 * i + 1]),
theta=float(x[3 * i + 2] % (2 * math.pi)),
)
)
return placements