更新扣电组装驱动 coin_cell_assembly.py

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Andy6M
2026-05-22 14:52:00 +08:00
parent 86f1640efb
commit 865dd87556
3 changed files with 876 additions and 37 deletions

View File

@@ -6,7 +6,7 @@ import threading
import time
import types
from datetime import datetime
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional
from functools import wraps
from pylabrobot.resources import Deck, Resource as PLRResource
from unilabos_msgs.msg import Resource
@@ -1406,12 +1406,45 @@ class CoinCellAssemblyWorkstation(WorkstationBase):
self._formulations_map = {
f["orderCode"]: f for f in formulations
} if formulations else {}
# ✅ 新增:存储配方列表(按接收顺序),用于索引访问
# ✅ 新增:存储配方列表(按接收顺序),用于索引访问(兜底用)
self._formulations_list = formulations
# ✅ 新增按分液瓶条码vial_bottle_barcodes反向索引配方
# 配液站夹爪取放顺序与扣电站夹取顺序可能不同,所以不能再依赖位置序号,
# 必须用扣电站扫码得到的 data_electrolyte_code 去对齐配液站登记的瓶条码。
# vial_bottle_barcodes 字段可能形如 "LG100114"(单瓶)或 '["LG100114","LG100115"]'(多瓶)。
self._formulations_by_vial_barcode: Dict[str, Dict] = {}
for f in formulations:
raw_barcodes = f.get("vial_bottle_barcodes", "")
if not raw_barcodes:
continue
barcodes: List[str] = []
if isinstance(raw_barcodes, list):
barcodes = [str(b).strip() for b in raw_barcodes if b]
else:
s = str(raw_barcodes).strip()
if s.startswith("[") and s.endswith("]"):
try:
parsed = json.loads(s)
if isinstance(parsed, list):
barcodes = [str(b).strip() for b in parsed if b]
else:
barcodes = [str(parsed).strip()]
except Exception:
barcodes = [s]
else:
barcodes = [s]
for bc in barcodes:
if bc and bc not in self._formulations_by_vial_barcode:
self._formulations_by_vial_barcode[bc] = f
logger.info(
f"已建立分液瓶条码 → 配方索引: {len(self._formulations_by_vial_barcode)}"
f"(条码: {list(self._formulations_by_vial_barcode.keys())})"
)
else:
logger.warning("未接收到配方信息CSV将不包含配方字段")
self._formulations_map = {}
self._formulations_list = []
self._formulations_by_vial_barcode = {}
# ✅ 新增:存储每瓶电池数,用于计算当前使用的瓶号
# ⚠️ 确保转换为整数(前端可能传递字符串)
@@ -1680,50 +1713,78 @@ class CoinCellAssemblyWorkstation(WorkstationBase):
# 从 self._formulations_list 获取配方信息
if hasattr(self, '_formulations_list') and self._formulations_list:
# ✅ 新方案:根据电池编号和每瓶电池数计算当前瓶号
# 例如elec_use_num=2时电池1-2用瓶0电池3-4用瓶1
if hasattr(self, '_elec_use_num') and self._elec_use_num:
# ⚠️ 确保转换为整数(防御性编程)
elec_use_num_int = int(self._elec_use_num) if self._elec_use_num else 1
if elec_use_num_int > 0:
current_bottle_index = (data_battery_number - 1) // elec_use_num_int
# ============================================================
# ✅ 主方案:用扣电站扫码得到的电解液瓶条码 (data_electrolyte_code)
# 反查配液站登记的 vial_bottle_barcodes避免依赖夹爪取放顺序。
# 配液站和扣电站的瓶子顺序往往不一致(不同自动化设备的取放策略不同),
# 按位置序号匹配会错位;但每个瓶子的条码是唯一的,按条码匹配最可靠。
# ============================================================
formulation = None
match_method = ""
barcode_map = getattr(self, "_formulations_by_vial_barcode", {}) or {}
scan_code = (data_electrolyte_code or "").strip()
if scan_code and scan_code != "N/A" and barcode_map:
formulation = barcode_map.get(scan_code)
if formulation is not None:
match_method = f"按条码({scan_code})精确匹配"
else:
current_bottle_index = 0
logger.debug(
f"[CSV写入] 电池 {data_battery_number}: 计算瓶号索引={current_bottle_index} "
f"(每瓶{self._elec_use_num}颗电池)"
)
else:
# 降级方案:尝试从二维码解析(仅当参数未设置时)
current_bottle_index = int(data_electrolyte_code.split('-')[-1]) if '-' in str(data_electrolyte_code) else 0
logger.debug(
f"[CSV写入] 电池 {data_battery_number}: 从二维码解析瓶号索引={current_bottle_index}"
)
# 从配方列表中获取对应配方
if 0 <= current_bottle_index < len(self._formulations_list):
formulation = self._formulations_list[current_bottle_index]
logger.warning(
f"[CSV写入] 电池 {data_battery_number}: 扫码条码 {scan_code} "
f"在配方索引中找不到 (已登记条码: {list(barcode_map.keys())})"
)
# ============================================================
# 🔁 降级方案:扫码失败 / 条码缺失时按瓶号位置兜底
# 保留原有"每瓶电池数"或"二维码尾号"的位置推断逻辑,
# 确保在异常路径下仍能落盘(位置推断的结果可能不准,仅供回溯)。
# ============================================================
if formulation is None:
if hasattr(self, '_elec_use_num') and self._elec_use_num:
elec_use_num_int = int(self._elec_use_num) if self._elec_use_num else 1
if elec_use_num_int > 0:
current_bottle_index = (data_battery_number - 1) // elec_use_num_int
else:
current_bottle_index = 0
logger.debug(
f"[CSV写入] 电池 {data_battery_number}: 降级按瓶号索引={current_bottle_index} "
f"(每瓶{self._elec_use_num}颗电池)"
)
else:
current_bottle_index = (
int(data_electrolyte_code.split('-')[-1])
if '-' in str(data_electrolyte_code)
else 0
)
logger.debug(
f"[CSV写入] 电池 {data_battery_number}: 降级按二维码尾号瓶号索引={current_bottle_index}"
)
if 0 <= current_bottle_index < len(self._formulations_list):
formulation = self._formulations_list[current_bottle_index]
match_method = f"按位置兜底匹配[{current_bottle_index}]"
else:
logger.warning(
f"[CSV写入] 电池 {data_battery_number}: 瓶号索引 {current_bottle_index} "
f"超出配方列表范围 (共{len(self._formulations_list)}个配方)"
)
if formulation is not None:
formulation_order_name = formulation.get("orderName", "")
prep_bottle_barcode = formulation.get("prep_bottle_barcode", "")
vial_bottle_barcodes = formulation.get("vial_bottle_barcodes", "")
real_ratio = formulation.get("real_mass_ratio", {})
target_ratio = formulation.get("target_mass_ratio", {})
# 将配方比例转为JSON字符串
import json
target_ratio_str = json.dumps(target_ratio, ensure_ascii=False) if target_ratio else ""
real_ratio_str = json.dumps(real_ratio, ensure_ascii=False) if real_ratio else ""
logger.info(
f"[CSV写入] 电池 {data_battery_number}: 使用配方[{current_bottle_index}] "
f"orderName={formulation_order_name}, 配液瓶={prep_bottle_barcode}, 分液瓶={vial_bottle_barcodes}"
)
else:
logger.warning(
f"[CSV写入] 电池 {data_battery_number}: 瓶号索引 {current_bottle_index} "
f"超出配方列表范围 (共{len(self._formulations_list)}个配方)"
f"[CSV写入] 电池 {data_battery_number} ({match_method}): "
f"orderName={formulation_order_name}, 配液瓶={prep_bottle_barcode}, "
f"分液瓶={vial_bottle_barcodes}"
)
else:
logger.debug(f"[CSV写入] 电池 {data_battery_number}: 未找到配方信息数据")