refactor(layout_optimizer): DE optimizer — discrete angles, strategy fixes, decoupled mutation, API exposure

- Extract _compute_mutant helper with circular angle diff (fixes 0/2π boundary bug)
- Fix currenttobest1bin (remove non-standard noise term), add rand1bin strategy
- Decoupled mutation: independent F ranges for position vs theta
- Configurable crossover mode: per-device (default) or per-dimension
- Discrete angle snapping in normal 3N DE (joint mode, replaces hybrid as default)
- Stop auto-injecting prefer_orientation_mode into DE
- Expose DE hyperparameters (mutation, theta_mutation, recombination, strategy, angle_mode) via API
This commit is contained in:
yexiaozhou
2026-04-10 00:11:07 +08:00
parent a7a6d77d7a
commit 99dc821a01
4 changed files with 4663 additions and 1068 deletions

View File

@@ -27,6 +27,86 @@ from .seeders import resolve_seeder_params, seed_layout
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def _circular_diff(
a: np.ndarray, b: np.ndarray, n_devices: int, dims_per_device: int = 3,
) -> np.ndarray:
"""计算 a - b对 theta 分量使用最短圆周距离。
对于 dims_per_device=3每个设备的第 3 个分量theta使用
(delta + π) % (2π) - π 计算最短角度差,避免 0/2π 边界跳变。
对于 dims_per_device=2纯位置等价于普通减法。
"""
result = a - b
if dims_per_device == 3:
two_pi = 2 * math.pi
for d in range(n_devices):
idx = 3 * d + 2
result[idx] = (result[idx] + math.pi) % two_pi - math.pi
return result
def _compute_mutant(
strategy: str,
pop: np.ndarray,
best_vector: np.ndarray,
target_idx: int,
f_val: float,
f_val_theta: float,
rng: np.random.Generator,
n_devices: int,
dims_per_device: int = 3,
) -> np.ndarray:
"""计算 DE 变异向量(统一所有策略)。
支持策略:
- "best1bin": mutant = best + F*(r1 - r2)
- "currenttobest1bin": mutant = target + F*(best - target) + F*(r1 - r2)
- "rand1bin": mutant = r0 + F*(r1 - r2)
使用 _circular_diff 处理角度差,避免 0/2π 边界问题。
当 f_val_theta != f_val 且 dims_per_device == 3 时,对 theta 分量
使用独立的变异因子 f_val_theta 进行缩放。
"""
pop_size = pop.shape[0]
candidates = list(range(pop_size))
candidates.remove(target_idx)
if strategy == "rand1bin":
chosen = rng.choice(candidates, size=3, replace=False)
r0, r1, r2 = int(chosen[0]), int(chosen[1]), int(chosen[2])
diff = _circular_diff(pop[r1], pop[r2], n_devices, dims_per_device)
mutant = pop[r0] + f_val * diff
elif strategy == "best1bin":
chosen = rng.choice(candidates, size=2, replace=False)
r1, r2 = int(chosen[0]), int(chosen[1])
diff = _circular_diff(pop[r1], pop[r2], n_devices, dims_per_device)
mutant = best_vector + f_val * diff
elif strategy == "currenttobest1bin":
chosen = rng.choice(candidates, size=2, replace=False)
r1, r2 = int(chosen[0]), int(chosen[1])
diff_best = _circular_diff(best_vector, pop[target_idx], n_devices, dims_per_device)
diff_rand = _circular_diff(pop[r1], pop[r2], n_devices, dims_per_device)
mutant = pop[target_idx] + f_val * diff_best + f_val * diff_rand
else:
raise ValueError(f"Unknown DE strategy: {strategy!r}")
# 解耦 theta 变异:当 f_val_theta != f_val 时重新缩放 theta 分量
if dims_per_device == 3 and f_val_theta != f_val:
for d_idx in range(n_devices):
theta_idx = 3 * d_idx + 2
# 确定该策略的 base theta变异前的参考点
if strategy == "best1bin":
base_theta = best_vector[theta_idx]
elif strategy == "currenttobest1bin":
base_theta = pop[target_idx, theta_idx]
else: # rand1bin
base_theta = pop[int(chosen[0]), theta_idx]
diff_theta = mutant[theta_idx] - base_theta
mutant[theta_idx] = base_theta + (f_val_theta / f_val) * diff_theta
return mutant
def _run_de( def _run_de(
cost_fn: Callable[[np.ndarray], float], cost_fn: Callable[[np.ndarray], float],
bounds: np.ndarray, bounds: np.ndarray,
@@ -40,14 +120,20 @@ def _run_de(
n_devices: int, n_devices: int,
strategy: str = "currenttobest1bin", strategy: str = "currenttobest1bin",
progress_callback: Callable[[int, np.ndarray, float], None] | None = None, progress_callback: Callable[[int, np.ndarray, float], None] | None = None,
theta_mutation: tuple[float, float] | None = None,
crossover_mode: str = "device",
allowed_angles: list[float] | None = None,
) -> tuple[np.ndarray, float, int]: ) -> tuple[np.ndarray, float, int]:
"""自定义差分进化循环。 """自定义差分进化循环。
特性: 特性:
- 支持 currenttobest1bin / best1bin 种策略 - 支持 currenttobest1bin / best1bin / rand1bin 三种策略
- Per-device crossover以设备 (x, y, θ) 三元组为原子单元进行交叉 - Per-device crossover(默认):以设备 (x, y, θ) 三元组为原子单元进行交叉
- Per-dimension crossover可选每个标量维度独立交叉
- θ wrapping交叉后对角度取模 [0, 2π) - θ wrapping交叉后对角度取模 [0, 2π)
- Early stopping最近 20 代改善 < 0.1% 时提前终止 - 离散角度吸附:可选将 θ 吸附到指定格点
- 解耦变异position 和 theta 可使用不同 F 范围
- Early stopping最近 200 代改善 < 0.1% 时提前终止
- scipy 风格收敛判断std(costs) <= atol + tol * |best_cost| - scipy 风格收敛判断std(costs) <= atol + tol * |best_cost|
Args: Args:
@@ -57,12 +143,15 @@ def _run_de(
maxiter: 最大迭代代数 maxiter: 最大迭代代数
tol: 相对收敛容差 tol: 相对收敛容差
atol: 绝对收敛容差 atol: 绝对收敛容差
mutation: 变异因子范围 (F_min, F_max) mutation: 变异因子范围 (F_min, F_max),用于位置分量
recombination: 交叉概率 CR recombination: 交叉概率 CR
seed: 随机种子 seed: 随机种子
n_devices: 设备数量(用于 per-device crossover n_devices: 设备数量(用于 per-device crossover
strategy: 变异策略,"currenttobest1bin""best1bin" strategy: 变异策略,"currenttobest1bin""best1bin""rand1bin"
progress_callback: 每 10 代调用一次 (gen, best_vector, best_cost) progress_callback: 每 10 代调用一次 (gen, best_vector, best_cost)
theta_mutation: theta 变异因子范围None 时使用 mutation
crossover_mode: "device"per-device 三元组原子交叉)或 "dimension"(逐维独立交叉)
allowed_angles: 离散角度格点列表,非 None 时将 θ 吸附到最近格点
Returns: Returns:
(best_vector, best_cost, n_generations) (best_vector, best_cost, n_generations)
@@ -71,7 +160,16 @@ def _run_de(
pop_size, ndim = init_pop.shape pop_size, ndim = init_pop.shape
lower = bounds[:, 0] lower = bounds[:, 0]
upper = bounds[:, 1] upper = bounds[:, 1]
f_min, f_max = mutation if theta_mutation is None:
theta_mutation = mutation
# 离散角度:吸附初始种群 θ 到格点
if allowed_angles is not None:
for ind_idx in range(pop_size):
for d in range(n_devices):
init_pop[ind_idx, 3 * d + 2] = _nearest_lattice_theta(
init_pop[ind_idx, 3 * d + 2], allowed_angles,
)
# 评估初始种群适应度 # 评估初始种群适应度
costs = np.array([cost_fn(ind) for ind in init_pop]) costs = np.array([cost_fn(ind) for ind in init_pop])
@@ -85,42 +183,46 @@ def _run_de(
for gen in range(1, maxiter + 1): for gen in range(1, maxiter + 1):
for i in range(pop_size): for i in range(pop_size):
# 选择变异因子 F每个个体独立采样 # 采样变异因子 F位置和 theta 各自独立
f_val = rng.uniform(f_min, f_max) f_val = rng.uniform(mutation[0], mutation[1])
f_val_theta = rng.uniform(theta_mutation[0], theta_mutation[1])
# 选择两个不同于 i 和 best_idx 的个体索引 # 变异向量(使用统一 helper
candidates = list(range(pop_size)) mutant = _compute_mutant(
candidates.remove(i) strategy, init_pop, best_vector, i,
chosen = rng.choice(candidates, size=2, replace=False) f_val, f_val_theta, rng, n_devices, dims_per_device=3,
r1, r2 = int(chosen[0]), int(chosen[1]) )
# 变异向量 # 交叉
if strategy == "best1bin":
# Turbo 模式mutant = best + F*(r1 - r2)
mutant = best_vector + f_val * (init_pop[r1] - init_pop[r2])
else:
# 默认 currenttobest1binmutant = target + F*(best - target) + F*(r1 - r2)
mutant = (
init_pop[i]
# add a scaled minimum to encourage exploration
+ f_val * 0.1 * (upper - lower) * rng.uniform(-1, 1, size=ndim)
+ f_val * (best_vector - init_pop[i])
+ f_val * (init_pop[r1] - init_pop[r2])
)
# Per-device crossover以 (x, y, θ) 三元组为原子单元
trial = init_pop[i].copy() trial = init_pop[i].copy()
j_rand = rng.integers(0, n_devices) # 保证至少一个设备来自 mutant if crossover_mode == "dimension":
for d in range(n_devices): # 逐维独立交叉
if rng.random() < recombination or d == j_rand: j_rand = rng.integers(0, ndim)
trial[3 * d: 3 * d + 3] = mutant[3 * d: 3 * d + 3] for j in range(ndim):
if rng.random() < recombination or j == j_rand:
trial[j] = mutant[j]
else:
# Per-device crossover以 (x, y, θ) 三元组为原子单元
j_rand = rng.integers(0, n_devices)
for d in range(n_devices):
if rng.random() < recombination or d == j_rand:
trial[3 * d: 3 * d + 3] = mutant[3 * d: 3 * d + 3]
# θ wrapping角度取模 [0, 2π) # θ wrapping角度取模 [0, 2π)
for d in range(n_devices): for d in range(n_devices):
trial[3 * d + 2] %= 2 * math.pi trial[3 * d + 2] %= 2 * math.pi
# 钳位到边界内 # 离散角度吸附
if allowed_angles is not None:
for d in range(n_devices):
trial[3 * d + 2] = _nearest_lattice_theta(
trial[3 * d + 2], allowed_angles,
)
# 钳位到边界内,然后重新 normalize θ(避免 clip 破坏 modulo
trial = np.clip(trial, lower, upper) trial = np.clip(trial, lower, upper)
for d in range(n_devices):
trial[3 * d + 2] %= 2 * math.pi
# 贪心选择trial 不比当前差则替换 # 贪心选择trial 不比当前差则替换
trial_cost = cost_fn(trial) trial_cost = cost_fn(trial)
@@ -172,6 +274,7 @@ def _generate_seeds(
n_variants: int = 3, n_variants: int = 3,
sigma_pos_frac: float = 0.05, sigma_pos_frac: float = 0.05,
sigma_theta: float = math.pi / 6, sigma_theta: float = math.pi / 6,
allowed_angles: list[float] | None = None,
) -> list[np.ndarray]: ) -> list[np.ndarray]:
"""从多个 seeder preset 生成多样性种子个体 + 变异版本。""" """从多个 seeder preset 生成多样性种子个体 + 变异版本。"""
seeds: list[np.ndarray] = [] seeds: list[np.ndarray] = []
@@ -188,6 +291,12 @@ def _generate_seeds(
continue continue
base_placements = seed_layout(devices, lab, params, workflow_edges) base_placements = seed_layout(devices, lab, params, workflow_edges)
base_vec = _placements_to_vector(base_placements, devices) base_vec = _placements_to_vector(base_placements, devices)
# 离散角度吸附
if allowed_angles is not None:
for d in range(len(devices)):
base_vec[3 * d + 2] = _nearest_lattice_theta(
base_vec[3 * d + 2], allowed_angles,
)
seeds.append(base_vec) seeds.append(base_vec)
# 变异版本:对 (x,y) 加高斯噪声 σ=5% lab 尺寸,θ 加 σ=π/6 # 变异版本:对 (x,y) 加高斯噪声 σ=5% lab 尺寸,θ 加 σ=π/6
@@ -198,6 +307,10 @@ def _generate_seeds(
variant[3 * d + 1] += rng.normal(0, sigma_pos_frac * lab.depth) variant[3 * d + 1] += rng.normal(0, sigma_pos_frac * lab.depth)
variant[3 * d + 2] += rng.normal(0, sigma_theta) variant[3 * d + 2] += rng.normal(0, sigma_theta)
variant[3 * d + 2] %= 2 * math.pi variant[3 * d + 2] %= 2 * math.pi
if allowed_angles is not None:
variant[3 * d + 2] = _nearest_lattice_theta(
variant[3 * d + 2], allowed_angles,
)
seeds.append(variant) seeds.append(variant)
return seeds return seeds
@@ -419,45 +532,44 @@ def _run_de_xy(
n_devices: int, n_devices: int,
strategy: str = "currenttobest1bin", strategy: str = "currenttobest1bin",
progress_callback: Callable[[int, np.ndarray, float], None] | None = None, progress_callback: Callable[[int, np.ndarray, float], None] | None = None,
crossover_mode: str = "device",
) -> tuple[np.ndarray, float, int]: ) -> tuple[np.ndarray, float, int]:
"""固定 theta 的 2N 维位置 DE。""" """固定 theta 的 2N 维位置 DE。"""
rng = np.random.default_rng(seed) rng = np.random.default_rng(seed)
pop_size, ndim = init_pop.shape pop_size, ndim = init_pop.shape
lower = bounds[:, 0] lower = bounds[:, 0]
upper = bounds[:, 1] upper = bounds[:, 1]
f_min, f_max = mutation
costs = np.array([cost_fn(ind) for ind in init_pop]) costs = np.array([cost_fn(ind) for ind in init_pop])
best_idx = int(np.argmin(costs)) best_idx = int(np.argmin(costs))
best_cost = costs[best_idx] best_cost = costs[best_idx]
best_vector = init_pop[best_idx].copy() best_vector = init_pop[best_idx].copy()
patience = 200 patience = 60
best_cost_history: list[float] = [best_cost] best_cost_history: list[float] = [best_cost]
for gen in range(1, maxiter + 1): for gen in range(1, maxiter + 1):
for i in range(pop_size): for i in range(pop_size):
f_val = rng.uniform(f_min, f_max) f_val = rng.uniform(mutation[0], mutation[1])
candidates = list(range(pop_size))
candidates.remove(i)
chosen = rng.choice(candidates, size=2, replace=False)
r1, r2 = int(chosen[0]), int(chosen[1])
if strategy == "best1bin": # 变异向量dims_per_device=2无 theta
mutant = best_vector + f_val * (init_pop[r1] - init_pop[r2]) mutant = _compute_mutant(
else: strategy, init_pop, best_vector, i,
mutant = ( f_val, f_val, rng, n_devices, dims_per_device=2,
init_pop[i] )
+ f_val * 0.1 * (upper - lower) * rng.uniform(-1, 1, size=ndim)
+ f_val * (best_vector - init_pop[i])
+ f_val * (init_pop[r1] - init_pop[r2])
)
# 交叉
trial = init_pop[i].copy() trial = init_pop[i].copy()
j_rand = rng.integers(0, n_devices) if crossover_mode == "dimension":
for d in range(n_devices): j_rand = rng.integers(0, ndim)
if rng.random() < recombination or d == j_rand: for j in range(ndim):
trial[2 * d: 2 * d + 2] = mutant[2 * d: 2 * d + 2] if rng.random() < recombination or j == j_rand:
trial[j] = mutant[j]
else:
j_rand = rng.integers(0, n_devices)
for d in range(n_devices):
if rng.random() < recombination or d == j_rand:
trial[2 * d: 2 * d + 2] = mutant[2 * d: 2 * d + 2]
trial = np.clip(trial, lower, upper) trial = np.clip(trial, lower, upper)
trial_cost = cost_fn(trial) trial_cost = cost_fn(trial)
@@ -560,6 +672,10 @@ def _optimize_positions_fixed_theta(
tol: float, tol: float,
seed: int | None, seed: int | None,
strategy: str, strategy: str,
mutation: tuple[float, float] = (0.5, 1.0),
recombination: float = 0.7,
atol: float = 1e-3,
crossover_mode: str = "device",
) -> tuple[list[Placement], float, int, int]: ) -> tuple[list[Placement], float, int, int]:
"""在固定离散 theta 下,只优化位置。""" """在固定离散 theta 下,只优化位置。"""
n = len(devices) n = len(devices)
@@ -598,13 +714,14 @@ def _optimize_positions_fixed_theta(
init_pop=init_pop, init_pop=init_pop,
maxiter=maxiter, maxiter=maxiter,
tol=tol, tol=tol,
atol=1e-3, atol=atol,
mutation=(0.5, 1.0), mutation=mutation,
recombination=0.7, recombination=recombination,
seed=seed, seed=seed,
n_devices=n, n_devices=n,
strategy=strategy, strategy=strategy,
progress_callback=progress_cb, progress_callback=progress_cb,
crossover_mode=crossover_mode,
) )
return ( return (
_position_vector_to_placements(best_vector, devices, seed_placements), _position_vector_to_placements(best_vector, devices, seed_placements),
@@ -628,6 +745,12 @@ def optimize(
strategy: str = "currenttobest1bin", strategy: str = "currenttobest1bin",
workflow_edges: list[list[str]] | None = None, workflow_edges: list[list[str]] | None = None,
angle_granularity: int | None = None, angle_granularity: int | None = None,
angle_mode: str = "joint",
mutation: tuple[float, float] = (0.5, 1.0),
theta_mutation: tuple[float, float] | None = None,
recombination: float = 0.7,
atol: float = 1e-3,
crossover_mode: str = "device",
) -> list[Placement]: ) -> list[Placement]:
"""运行差分进化优化,返回最优布局。 """运行差分进化优化,返回最优布局。
@@ -642,7 +765,15 @@ def optimize(
popsize: 种群大小倍数 popsize: 种群大小倍数
tol: 收敛容差 tol: 收敛容差
seed: 随机种子(用于可复现性) seed: 随机种子(用于可复现性)
strategy: DE 变异策略("currenttobest1bin""best1bin" strategy: DE 变异策略("currenttobest1bin""best1bin""rand1bin"
workflow_edges: 工作流边列表
angle_granularity: 离散角度粒度4/8/12/24None 为连续
angle_mode: 离散角度模式,"joint"3N DE + 格点吸附)或 "hybrid"(角度扫描 + 位置 DE
mutation: 位置变异因子范围 (F_min, F_max)
theta_mutation: theta 变异因子范围None 时使用 mutation
recombination: 交叉概率 CR
atol: 绝对收敛容差
crossover_mode: "device"per-device 三元组原子交叉)或 "dimension"(逐维独立交叉)
Returns: Returns:
最优布局 Placement 列表 最优布局 Placement 列表
@@ -656,6 +787,8 @@ def optimize(
reachability_checker = MockReachabilityChecker() reachability_checker = MockReachabilityChecker()
if constraints is None: if constraints is None:
constraints = [] constraints = []
if theta_mutation is None:
theta_mutation = mutation
n = len(devices) n = len(devices)
bounds_array = _build_bounds(devices, lab, include_theta=True) bounds_array = _build_bounds(devices, lab, include_theta=True)
@@ -675,7 +808,8 @@ def optimize(
devices, placements, lab, collision_checker, reachability_checker, constraints, devices, placements, lab, collision_checker, reachability_checker, constraints,
) )
if angle_granularity is not None: # === 离散角度 hybrid 模式(角度扫描 + 位置 DE===
if angle_granularity is not None and angle_mode == "hybrid":
angles = _angle_lattice(angle_granularity) angles = _angle_lattice(angle_granularity)
current_placements = _snap_placements_to_lattice(seed_placements, angles) current_placements = _snap_placements_to_lattice(seed_placements, angles)
best_placements = current_placements best_placements = current_placements
@@ -716,6 +850,10 @@ def optimize(
tol=tol, tol=tol,
seed=round_seed, seed=round_seed,
strategy=strategy, strategy=strategy,
mutation=mutation,
recombination=recombination,
atol=atol,
crossover_mode=crossover_mode,
) )
) )
total_generations += n_generations total_generations += n_generations
@@ -752,6 +890,15 @@ def optimize(
) )
return best_placements return best_placements
# === 标准 3N DE 路径(连续 theta 或 joint 离散 theta===
allowed_angles: list[float] | None = None
if angle_granularity is not None:
# joint 模式:在 3N DE 中吸附 theta 到离散格点
allowed_angles = _angle_lattice(angle_granularity)
seed_placements = _snap_placements_to_lattice(seed_placements, allowed_angles)
seed_vector = _placements_to_vector(seed_placements, devices)
seed_vector = np.clip(seed_vector, bounds_array[:, 0], bounds_array[:, 1])
# 构建初始种群:种子个体 + 多样性种子 + 随机个体 # 构建初始种群:种子个体 + 多样性种子 + 随机个体
rng = np.random.default_rng(seed) rng = np.random.default_rng(seed)
pop_count = popsize * 3 * n # scipy 默认 popsize * dim pop_count = popsize * 3 * n # scipy 默认 popsize * dim
@@ -761,15 +908,18 @@ def optimize(
init_pop[0] = seed_vector # 注入原始种子 init_pop[0] = seed_vector # 注入原始种子
# 多样性种子注入(多 preset + 变异版本) # 多样性种子注入(多 preset + 变异版本)
extra_seeds = _generate_seeds(devices, lab, rng, workflow_edges) extra_seeds = _generate_seeds(
devices, lab, rng, workflow_edges, allowed_angles=allowed_angles,
)
for i, s in enumerate(extra_seeds): for i, s in enumerate(extra_seeds):
idx = i + 1 # 原始种子占 [0] idx = i + 1 # 原始种子占 [0]
if idx < pop_count: if idx < pop_count:
init_pop[idx] = np.clip(s, bounds_array[:, 0], bounds_array[:, 1]) init_pop[idx] = np.clip(s, bounds_array[:, 0], bounds_array[:, 1])
logger.info( logger.info(
"Starting DE optimization: %d devices, %d-dim, popsize=%d, maxiter=%d, strategy=%s", "Starting DE optimization: %d devices, %d-dim, popsize=%d, maxiter=%d, strategy=%s, angle_mode=%s",
n, 3 * n, pop_count, maxiter, strategy, n, 3 * n, pop_count, maxiter, strategy,
"joint-discrete" if allowed_angles else "continuous",
) )
progress_cb = _make_progress_callback( progress_cb = _make_progress_callback(
@@ -787,13 +937,16 @@ def optimize(
init_pop=init_pop, init_pop=init_pop,
maxiter=maxiter, maxiter=maxiter,
tol=tol, tol=tol,
atol=1e-3, atol=atol,
mutation=(0.5, 1.0), mutation=mutation,
recombination=0.7, recombination=recombination,
seed=seed, seed=seed,
n_devices=n, n_devices=n,
strategy=strategy, strategy=strategy,
progress_callback=progress_cb, progress_callback=progress_cb,
theta_mutation=theta_mutation,
crossover_mode=crossover_mode,
allowed_angles=allowed_angles,
) )
# 评估次数估算:每代 pop_count 次(初始 + 每代 trial # 评估次数估算:每代 pop_count 次(初始 + 每代 trial

View File

@@ -34,7 +34,7 @@ from fastapi.responses import FileResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel from pydantic import BaseModel
from .constraints import DEFAULT_WEIGHT_ANGLE from .constraints import DEFAULT_WEIGHT_ANGLE # noqa: F401 — kept for external use
from .device_catalog import ( from .device_catalog import (
create_devices_from_list, create_devices_from_list,
load_devices_from_assets, load_devices_from_assets,
@@ -496,6 +496,13 @@ class OptimizeRequest(BaseModel):
snap_cardinal: bool = False snap_cardinal: bool = False
angle_granularity: int | None = None angle_granularity: int | None = None
arm_reach: dict[str, float] = {} arm_reach: dict[str, float] = {}
# DE 超参数
strategy: str = "currenttobest1bin"
angle_mode: str = "joint"
mutation: list[float] = [0.5, 1.0]
theta_mutation: list[float] | None = None
recombination: float = 0.7
crossover_mode: str = "device"
class PositionXYZ(BaseModel): class PositionXYZ(BaseModel):
@@ -576,25 +583,43 @@ async def run_optimize(request: OptimizeRequest):
request.workflow_edges or None, request.workflow_edges or None,
) )
# 3. Auto-inject orientation soft constraints for DE # 3. Auto-inject alignment soft constraint (opt-in via seeder_overrides)
if request.run_de and request.seeder != "row_fallback" and seed_placements: if request.run_de and seed_placements:
# Resolve orientation mode from seeder preset
orientation_mode = params.orientation_mode if params else "none"
if orientation_mode != "none":
# prefer_orientation_mode: position-aware outward/inward facing penalty
constraints.append(Constraint(
type="soft",
rule_name="prefer_orientation_mode",
params={"mode": orientation_mode},
weight=request.seeder_overrides.get("orientation_weight", DEFAULT_WEIGHT_ANGLE),
))
# prefer_aligned: penalize non-cardinal angles默认关闭用户可通过 align_cardinal intent 或 seeder_overrides 开启) # prefer_aligned: penalize non-cardinal angles默认关闭用户可通过 align_cardinal intent 或 seeder_overrides 开启)
constraints = _maybe_add_prefer_aligned_constraint( constraints = _maybe_add_prefer_aligned_constraint(
constraints, constraints,
request.seeder_overrides.get("align_weight", 0), request.seeder_overrides.get("align_weight", 0),
) )
# 4. Conditional Differential Evolution # 4. Validate DE hyperparameters
if request.strategy not in {"currenttobest1bin", "best1bin", "rand1bin"}:
raise HTTPException(
status_code=400,
detail=f"strategy must be one of: currenttobest1bin, best1bin, rand1bin (got {request.strategy!r})",
)
if request.angle_mode not in {"joint", "hybrid"}:
raise HTTPException(
status_code=400,
detail=f"angle_mode must be one of: joint, hybrid (got {request.angle_mode!r})",
)
if request.crossover_mode not in {"device", "dimension"}:
raise HTTPException(
status_code=400,
detail=f"crossover_mode must be one of: device, dimension (got {request.crossover_mode!r})",
)
if len(request.mutation) != 2 or request.mutation[0] > request.mutation[1]:
raise HTTPException(status_code=400, detail="mutation must be [F_min, F_max] with F_min <= F_max")
if request.mutation[0] < 0 or request.mutation[1] > 2.0:
raise HTTPException(status_code=400, detail="mutation values must be in [0, 2.0]")
if request.theta_mutation is not None:
if len(request.theta_mutation) != 2 or request.theta_mutation[0] > request.theta_mutation[1]:
raise HTTPException(status_code=400, detail="theta_mutation must be [F_min, F_max] with F_min <= F_max")
if request.theta_mutation[0] < 0 or request.theta_mutation[1] > 2.0:
raise HTTPException(status_code=400, detail="theta_mutation values must be in [0, 2.0]")
if not (0 <= request.recombination <= 1.0):
raise HTTPException(status_code=400, detail="recombination must be in [0, 1.0]")
# 5. Conditional Differential Evolution
de_ran = False de_ran = False
checker = MockCollisionChecker() checker = MockCollisionChecker()
reachability_checker = MockReachabilityChecker(request.arm_reach or None) reachability_checker = MockReachabilityChecker(request.arm_reach or None)
@@ -608,8 +633,14 @@ async def run_optimize(request: OptimizeRequest):
seed_placements=seed_placements, seed_placements=seed_placements,
maxiter=request.maxiter, maxiter=request.maxiter,
seed=request.seed, seed=request.seed,
strategy=request.strategy,
workflow_edges=request.workflow_edges or None, workflow_edges=request.workflow_edges or None,
angle_granularity=request.angle_granularity, angle_granularity=request.angle_granularity,
angle_mode=request.angle_mode,
mutation=tuple(request.mutation),
theta_mutation=tuple(request.theta_mutation) if request.theta_mutation else None,
recombination=request.recombination,
crossover_mode=request.crossover_mode,
) )
de_ran = True de_ran = True
else: else:

View File

@@ -8,7 +8,14 @@ from ..mock_checkers import MockCollisionChecker
from ..models import Constraint, Device, Lab, Placement from ..models import Constraint, Device, Lab, Placement
import numpy as np import numpy as np
import pytest import pytest
from ..optimizer import _angle_sweep_once, _run_de, optimize, snap_theta from ..optimizer import (
_angle_sweep_once,
_circular_diff,
_compute_mutant,
_run_de,
optimize,
snap_theta,
)
def _is_on_angle_lattice(theta: float, granularity: int) -> bool: def _is_on_angle_lattice(theta: float, granularity: int) -> bool:
@@ -676,3 +683,437 @@ def test_run_de_returns_correct_tuple():
assert best_vec.shape == (3,) assert best_vec.shape == (3,)
assert best_cost == pytest.approx(42.0) assert best_cost == pytest.approx(42.0)
assert isinstance(n_gen, int) and n_gen >= 1 assert isinstance(n_gen, int) and n_gen >= 1
# ──────────────────────────────────────────────────────────────
# DE 重构测试_circular_diff / _compute_mutant / 新策略 / 解耦变异 / 交叉模式 / joint 离散角度 / API
# ──────────────────────────────────────────────────────────────
class TestCircularDiff:
"""_circular_diff 圆周角度差测试。"""
def test_near_zero_boundary(self):
"""0.1 vs 2π-0.1 应≈0.2,不是 -6.08。"""
a = np.array([1.0, 2.0, 0.1])
b = np.array([1.0, 2.0, 2 * math.pi - 0.1])
result = _circular_diff(a, b, n_devices=1, dims_per_device=3)
assert result[0] == pytest.approx(0.0) # x unchanged
assert result[1] == pytest.approx(0.0) # y unchanged
assert abs(result[2]) == pytest.approx(0.2, abs=1e-6) # theta shortest
def test_same_angle(self):
"""相同角度差为 0。"""
a = np.array([0, 0, math.pi, 0, 0, math.pi])
b = np.array([0, 0, math.pi, 0, 0, math.pi])
result = _circular_diff(a, b, n_devices=2, dims_per_device=3)
for d in range(2):
assert result[3 * d + 2] == pytest.approx(0.0)
def test_opposite_angles(self):
"""π vs 0 应≈π。"""
a = np.array([0, 0, math.pi])
b = np.array([0, 0, 0.0])
result = _circular_diff(a, b, n_devices=1, dims_per_device=3)
assert abs(result[2]) == pytest.approx(math.pi, abs=1e-6)
def test_dims_per_device_2_is_plain_diff(self):
"""dims_per_device=2 时退化为普通减法。"""
a = np.array([3.0, 5.0, 1.0, 2.0])
b = np.array([1.0, 2.0, 0.5, 1.0])
result = _circular_diff(a, b, n_devices=2, dims_per_device=2)
np.testing.assert_array_almost_equal(result, a - b)
class TestComputeMutant:
"""_compute_mutant 统一变异向量测试。"""
def _make_pop(self, n_devices=2, pop_size=10, seed=42):
rng = np.random.default_rng(seed)
ndim = 3 * n_devices
return rng.uniform(0, 5, size=(pop_size, ndim)), rng
def test_currenttobest1bin_no_noise(self):
"""currenttobest1bin 结果 = target + F*(best-target) + F*(r1-r2),无随机噪声。"""
pop, rng = self._make_pop()
best = pop[0].copy()
f_val = 0.7
# 固定 rng 以确定 r1, r2
rng_copy = np.random.default_rng(123)
mutant = _compute_mutant(
"currenttobest1bin", pop, best, target_idx=3,
f_val=f_val, f_val_theta=f_val, rng=rng_copy,
n_devices=2, dims_per_device=3,
)
# 重建:获取 rng_copy 选的 r1, r2
rng_verify = np.random.default_rng(123)
candidates = list(range(10))
candidates.remove(3)
chosen = rng_verify.choice(candidates, size=2, replace=False)
r1, r2 = int(chosen[0]), int(chosen[1])
diff_best = _circular_diff(best, pop[3], 2, 3)
diff_rand = _circular_diff(pop[r1], pop[r2], 2, 3)
expected = pop[3] + f_val * diff_best + f_val * diff_rand
np.testing.assert_array_almost_equal(mutant, expected)
def test_rand1bin_three_distinct(self):
"""rand1bin 应选 3 个不同于 target 的个体。"""
pop, rng = self._make_pop(pop_size=5)
# 不应 raise
mutant = _compute_mutant(
"rand1bin", pop, pop[0], target_idx=0,
f_val=0.5, f_val_theta=0.5, rng=rng,
n_devices=2, dims_per_device=3,
)
assert mutant.shape == (6,)
def test_unknown_strategy_raises(self):
"""未知策略应 raise ValueError。"""
pop, rng = self._make_pop()
with pytest.raises(ValueError, match="Unknown DE strategy"):
_compute_mutant(
"nonexistent", pop, pop[0], target_idx=0,
f_val=0.5, f_val_theta=0.5, rng=rng,
n_devices=2, dims_per_device=3,
)
def test_best1bin_formula(self):
"""best1bin 结果 = best + F*(r1-r2)。"""
pop, _ = self._make_pop()
best = pop[0].copy()
f_val = 0.6
rng1 = np.random.default_rng(99)
mutant = _compute_mutant(
"best1bin", pop, best, target_idx=2,
f_val=f_val, f_val_theta=f_val, rng=rng1,
n_devices=2, dims_per_device=3,
)
rng2 = np.random.default_rng(99)
candidates = list(range(10))
candidates.remove(2)
chosen = rng2.choice(candidates, size=2, replace=False)
r1, r2 = int(chosen[0]), int(chosen[1])
diff = _circular_diff(pop[r1], pop[r2], 2, 3)
expected = best + f_val * diff
np.testing.assert_array_almost_equal(mutant, expected)
class TestDecoupledMutation:
"""解耦 theta 变异测试。"""
def test_decoupled_scales_theta_only(self):
"""theta_mutation < mutation 时theta 变异幅度应更小。"""
rng = np.random.default_rng(42)
pop = rng.uniform(0, 5, size=(10, 6))
best = pop[0].copy()
# 大 F 给位置+theta
rng1 = np.random.default_rng(77)
mutant_same = _compute_mutant(
"best1bin", pop, best, 1, f_val=0.8, f_val_theta=0.8,
rng=rng1, n_devices=2, dims_per_device=3,
)
# 大 F 给位置,小 F 给 theta
rng2 = np.random.default_rng(77)
mutant_decoupled = _compute_mutant(
"best1bin", pop, best, 1, f_val=0.8, f_val_theta=0.2,
rng=rng2, n_devices=2, dims_per_device=3,
)
# x, y 应相同
for d in range(2):
assert mutant_same[3 * d] == pytest.approx(mutant_decoupled[3 * d])
assert mutant_same[3 * d + 1] == pytest.approx(mutant_decoupled[3 * d + 1])
# theta 差值应更小(绝对值)
for d in range(2):
theta_diff_same = abs(mutant_same[3 * d + 2] - best[3 * d + 2])
theta_diff_decoupled = abs(mutant_decoupled[3 * d + 2] - best[3 * d + 2])
assert theta_diff_decoupled <= theta_diff_same + 1e-9
def test_decoupled_no_effect_when_same(self):
"""theta_mutation == mutation 时行为应完全一致。"""
rng = np.random.default_rng(42)
pop = rng.uniform(0, 5, size=(10, 6))
best = pop[0].copy()
rng1 = np.random.default_rng(77)
m1 = _compute_mutant(
"currenttobest1bin", pop, best, 2, 0.7, 0.7,
rng1, 2, 3,
)
rng2 = np.random.default_rng(77)
m2 = _compute_mutant(
"currenttobest1bin", pop, best, 2, 0.7, 0.7,
rng2, 2, 3,
)
np.testing.assert_array_almost_equal(m1, m2)
class TestCrossoverMode:
"""交叉模式测试。"""
def test_crossover_device_mode_atomicity(self):
"""device 模式:(x,y,θ) 三元组始终整体复制。"""
rng = np.random.default_rng(42)
n_devices = 3
parent = np.zeros(9)
mutant = np.ones(9) * 10.0
violations = 0
for _ in range(200):
trial = parent.copy()
j_rand = rng.integers(0, n_devices)
for d in range(n_devices):
if rng.random() < 0.7 or d == j_rand:
trial[3 * d: 3 * d + 3] = mutant[3 * d: 3 * d + 3]
for d in range(n_devices):
triple = trial[3 * d: 3 * d + 3]
if not (np.allclose(triple, 0.0) or np.allclose(triple, 10.0)):
violations += 1
assert violations == 0
def test_crossover_dimension_mode_can_split_triplet(self):
"""dimension 模式:单个维度可以独立交叉,三元组可被拆分。"""
rng = np.random.default_rng(42)
n_devices = 3
ndim = 9
parent = np.zeros(ndim)
mutant = np.ones(ndim) * 10.0
split_seen = False
for _ in range(500):
trial = parent.copy()
j_rand = rng.integers(0, ndim)
for j in range(ndim):
if rng.random() < 0.5 or j == j_rand:
trial[j] = mutant[j]
for d in range(n_devices):
triple = trial[3 * d: 3 * d + 3]
if not (np.allclose(triple, 0.0) or np.allclose(triple, 10.0)):
split_seen = True
break
if split_seen:
break
assert split_seen, "dimension 模式应允许三元组拆分"
class TestJointDiscreteAngle:
"""joint 模式离散角度 DE 测试。"""
def test_joint_mode_returns_lattice_thetas(self):
"""angle_granularity=4, angle_mode='joint' → 所有 theta 在格点上。"""
devices = [
Device(id="a", name="A", bbox=(0.8, 0.6)),
Device(id="b", name="B", bbox=(0.6, 0.5)),
Device(id="c", name="C", bbox=(0.5, 0.5)),
]
lab = Lab(width=5.0, depth=5.0)
seed_placements = [
Placement(device_id="a", x=1.0, y=1.0, theta=0.13),
Placement(device_id="b", x=2.3, y=1.5, theta=1.21),
Placement(device_id="c", x=3.6, y=3.2, theta=2.42),
]
placements = optimize(
devices, lab, seed_placements=seed_placements,
seed=42, maxiter=45, popsize=8,
angle_granularity=4, angle_mode="joint",
)
assert len(placements) == 3
for p in placements:
assert _is_on_angle_lattice(p.theta, 4), (
f"{p.device_id} theta={p.theta} not on 4-angle lattice"
)
def test_hybrid_mode_still_works(self):
"""angle_mode='hybrid' → 现有 hybrid 行为不变。"""
devices = [
Device(id="a", name="A", bbox=(0.8, 0.6)),
Device(id="b", name="B", bbox=(0.6, 0.5)),
]
lab = Lab(width=5.0, depth=5.0)
placements = optimize(
devices, lab, seed=42, maxiter=30, popsize=8,
angle_granularity=4, angle_mode="hybrid",
)
assert len(placements) == 2
for p in placements:
assert _is_on_angle_lattice(p.theta, 4)
def test_joint_default_when_granularity_set(self):
"""省略 angle_mode + 设定 angle_granularity → 默认 joint 模式。"""
devices = [
Device(id="a", name="A", bbox=(0.8, 0.6)),
Device(id="b", name="B", bbox=(0.6, 0.5)),
]
lab = Lab(width=5.0, depth=5.0)
placements = optimize(
devices, lab, seed=42, maxiter=30, popsize=8,
angle_granularity=4,
)
assert len(placements) == 2
for p in placements:
assert _is_on_angle_lattice(p.theta, 4)
class TestNewStrategies:
"""rand1bin 策略集成测试。"""
def test_rand1bin_converges(self):
"""rand1bin 策略应能收敛。"""
devices = [
Device(id="a", name="A", bbox=(0.6, 0.4)),
Device(id="b", name="B", bbox=(0.6, 0.4)),
]
lab = Lab(width=5.0, depth=5.0)
placements = optimize(
devices, lab, seed=42, maxiter=80, popsize=10,
strategy="rand1bin",
)
assert len(placements) == 2
checker = MockCollisionChecker()
checker_placements = [
{"id": p.device_id, "bbox": next(d.bbox for d in devices if d.id == p.device_id),
"pos": (p.x, p.y, p.theta)}
for p in placements
]
collisions = checker.check(checker_placements)
assert collisions == [], f"rand1bin 策略产生碰撞: {collisions}"
class TestAPINewParams:
"""POST /optimize 新参数 API 测试。"""
def test_api_accepts_strategy(self):
resp = _post_app("/optimize", {
"devices": [{"id": "test_device", "name": "Test"}],
"lab": {"width": 5, "depth": 4},
"strategy": "rand1bin",
"maxiter": 10,
"seed": 42,
})
assert resp.status_code == 200
def test_api_rejects_invalid_strategy(self):
resp = _post_app("/optimize", {
"devices": [{"id": "test_device", "name": "Test"}],
"lab": {"width": 5, "depth": 4},
"strategy": "invalid",
"run_de": False,
})
assert resp.status_code == 400
assert "strategy" in resp.json()["detail"]
def test_api_accepts_angle_mode(self):
resp = _post_app("/optimize", {
"devices": [{"id": "test_device", "name": "Test"}],
"lab": {"width": 5, "depth": 4},
"angle_mode": "hybrid",
"angle_granularity": 4,
"maxiter": 10,
"seed": 42,
})
assert resp.status_code == 200
def test_api_rejects_invalid_angle_mode(self):
resp = _post_app("/optimize", {
"devices": [{"id": "test_device", "name": "Test"}],
"lab": {"width": 5, "depth": 4},
"angle_mode": "bogus",
"run_de": False,
})
assert resp.status_code == 400
assert "angle_mode" in resp.json()["detail"]
def test_api_accepts_theta_mutation(self):
resp = _post_app("/optimize", {
"devices": [{"id": "test_device", "name": "Test"}],
"lab": {"width": 5, "depth": 4},
"theta_mutation": [0.2, 0.5],
"maxiter": 10,
"seed": 42,
})
assert resp.status_code == 200
def test_api_accepts_mutation_range(self):
resp = _post_app("/optimize", {
"devices": [{"id": "test_device", "name": "Test"}],
"lab": {"width": 5, "depth": 4},
"mutation": [0.4, 0.8],
"maxiter": 10,
"seed": 42,
})
assert resp.status_code == 200
def test_api_rejects_invalid_mutation(self):
resp = _post_app("/optimize", {
"devices": [{"id": "test_device", "name": "Test"}],
"lab": {"width": 5, "depth": 4},
"mutation": [0.8, 0.4], # min > max
"run_de": False,
})
assert resp.status_code == 400
def test_api_accepts_recombination(self):
resp = _post_app("/optimize", {
"devices": [{"id": "test_device", "name": "Test"}],
"lab": {"width": 5, "depth": 4},
"recombination": 0.95,
"maxiter": 10,
"seed": 42,
})
assert resp.status_code == 200
def test_api_rejects_invalid_recombination(self):
resp = _post_app("/optimize", {
"devices": [{"id": "test_device", "name": "Test"}],
"lab": {"width": 5, "depth": 4},
"recombination": 1.5,
"run_de": False,
})
assert resp.status_code == 400
def test_api_accepts_crossover_mode(self):
resp = _post_app("/optimize", {
"devices": [{"id": "test_device", "name": "Test"}],
"lab": {"width": 5, "depth": 4},
"crossover_mode": "dimension",
"maxiter": 10,
"seed": 42,
})
assert resp.status_code == 200
def test_api_rejects_invalid_crossover_mode(self):
resp = _post_app("/optimize", {
"devices": [{"id": "test_device", "name": "Test"}],
"lab": {"width": 5, "depth": 4},
"crossover_mode": "bogus",
"run_de": False,
})
assert resp.status_code == 400
def test_api_backward_compatible_new_fields(self):
"""旧 payload无新字段应继续正常工作。"""
resp = _post_app("/optimize", {
"devices": [{"id": "test_device", "name": "Test"}],
"lab": {"width": 5, "depth": 4},
"maxiter": 10,
"seed": 42,
})
assert resp.status_code == 200
data = resp.json()
assert data["de_ran"] is True
class TestOrientationConstraint:
"""prefer_orientation_mode 不再自动注入测试。"""
def test_no_auto_orientation_constraint(self):
"""不带 face_outward/face_inward 意图时不应有 prefer_orientation_mode 约束。"""
from ..models import Constraint as ConstraintModel
from ..server import _expand_constraints_for_duplicates
# 模拟 server 逻辑:构建 constraints 但不自动注入 orientation
constraints = [
ConstraintModel(type="soft", rule_name="prefer_aligned", weight=1.0),
]
# 验证没有 prefer_orientation_mode
assert not any(c.rule_name == "prefer_orientation_mode" for c in constraints)

File diff suppressed because it is too large Load Diff