diff --git a/unilabos/devices/workstation/bioyond_studio/sirna_station/sirna_station.py b/unilabos/devices/workstation/bioyond_studio/sirna_station/sirna_station.py index c14ee126..d9c8767d 100644 --- a/unilabos/devices/workstation/bioyond_studio/sirna_station/sirna_station.py +++ b/unilabos/devices/workstation/bioyond_studio/sirna_station/sirna_station.py @@ -557,12 +557,27 @@ class BioyondSirnaStation(BioyondWorkstation): or self._kwarg_text(kwargs, "device_id") or "bioyond_sirna_station" ) + logger.info( + "小核酸实验1提交开始: sample_throughput=%s target_device=%s " + "auto_register_materials=%s overrides=%s", + sample_throughput, + target_device, + bool(auto_register_materials), + bool(parameter_overrides), + ) del timeout_seconds, assignee_user_ids, kwargs rpc = self._require_hardware_interface("create_order") # 自动解析实验1工作流(无需用户指定workflow_name) workflow = self._resolve_experiment_1_workflow(rpc) + logger.info( + "小核酸实验1工作流已解析: root=%s(%s) sub=%s(%s)", + workflow.get("workflow_name", ""), + workflow.get("root_workflow_id", ""), + workflow.get("sub_workflow_name", ""), + workflow.get("sub_workflow_id", ""), + ) step_data = rpc.workflow_step_query(workflow["sub_workflow_id"]) @@ -575,7 +590,16 @@ class BioyondSirnaStation(BioyondWorkstation): include_all_task_displayable=True, ) if not param_values: + logger.error("小核酸实验1参数构建失败: LIMS 子工作流未返回可用 paramValues") raise RuntimeError("未从 LIMS 子工作流参数中提取到 create_order paramValues") + param_entry_count = sum(len(entries) for entries in param_values.values()) + logger.info( + "小核酸实验1参数已构建: steps=%s entries=%s template_items=%s overrides=%s", + len(param_values), + param_entry_count, + len(parameter_template), + len(parsed_overrides), + ) resolved_order_code, resolved_order_name = self._build_bioyond_order_identity("", order_name) order_payload = [ @@ -611,6 +635,31 @@ class BioyondSirnaStation(BioyondWorkstation): material_records=material_records, suggested_locations=suggested_locations, ) + material_type_counts = { + str(key): len(self._as_list(value)) + for key, value in confirmation_data.get("materials_by_type", {}).items() + } + create_success = self._create_result_success(parsed_result, order_ids, material_records) + logger.info( + "小核酸实验1创建结果: success=%s order_ids=%s materials=%s " + "suggested_locations=%s material_types=%s", + create_success, + order_ids, + len(material_records), + len(suggested_locations), + material_type_counts, + ) + if not create_success: + logger.error( + "小核酸实验1创建未返回成功结果: order_code=%s order_ids=%s materials=%s", + resolved_order_code, + order_ids, + len(material_records), + ) + elif not material_records: + logger.warning("小核酸实验1创建成功但未返回物料分配记录: order_code=%s", resolved_order_code) + elif not suggested_locations: + logger.warning("小核酸实验1创建成功但未解析到推荐库位: order_code=%s", resolved_order_code) registration_result = { "registered": [], @@ -627,7 +676,7 @@ class BioyondSirnaStation(BioyondWorkstation): # registration_result = {"error": str(e)} result = { - "success": self._create_result_success(parsed_result, order_ids, material_records), + "success": create_success, "order_code": resolved_order_code, "order_name": resolved_order_name, "order_id": order_ids[0] if order_ids else "", @@ -652,6 +701,15 @@ class BioyondSirnaStation(BioyondWorkstation): "registration_result": registration_result, } result.update(result["manual_load_probe"]) + logger.info( + "小核酸实验1提交完成: order_code=%s success=%s manual_load_rows=%s", + resolved_order_code, + result["success"], + { + key: len(self._as_list(value.get("material_name") if isinstance(value, dict) else [])) + for key, value in result["manual_load_tables"].items() + }, + ) return result def _parse_parameter_overrides_text(self, text: str) -> Dict[str, Any]: @@ -821,6 +879,11 @@ class BioyondSirnaStation(BioyondWorkstation): gates[gate_key] = {"label": label, "required": required, "ticked": bool(ticked)} if required and not ticked: missing_labels.append(label) + if missing_labels: + logger.warning( + "小核酸实验启动存在未确认装载项,但当前动作按兼容策略继续: missing=%s", + missing_labels, + ) # if missing_labels: # raise RuntimeError( # f"以下分类装载尚未确认,无法启动调度: {', '.join(missing_labels)}" @@ -829,9 +892,19 @@ class BioyondSirnaStation(BioyondWorkstation): start_info = self._resolve_start_experiment_info( submit_experiment_result, order_id, order_ids ) + logger.info( + "小核酸实验启动检查: order_id=%s order_ids=%s gates=%s missing=%s", + start_info.get("order_id", ""), + start_info.get("order_ids", []), + gates, + missing_labels, + ) rpc = self._require_hardware_interface("scheduler_start") logger.info("正在启动小核酸调度器") result = rpc.scheduler_start() + logger.info("小核酸调度器启动返回: result=%s success=%s", result, result == 1) + if result != 1: + logger.error("小核酸调度器启动失败或返回非成功码: result=%s", result) return self._with_ready_signal({ "success": result == 1, "return_info": result, @@ -954,6 +1027,15 @@ class BioyondSirnaStation(BioyondWorkstation): manifest = self._build_unload_materials_by_type( order_id_filter=order_filter or None, ) + manifest_counts = { + mode_key: len(self._as_list(records)) + for mode_key, records in manifest.items() + } + logger.info( + "小核酸实验卸载检查开始: order_id=%s manifest_counts=%s", + order_filter or "", + manifest_counts, + ) unload_tables = self._build_manual_load_tables(manifest) category_specs = [ @@ -968,12 +1050,15 @@ class BioyondSirnaStation(BioyondWorkstation): gates[gate_key] = {"label": label, "required": required, "ticked": bool(ticked)} if required and not ticked: missing_labels.append(label) + logger.info("小核酸实验卸载门禁状态: gates=%s missing=%s", gates, missing_labels) if missing_labels: + logger.error("小核酸实验卸载被阻断: missing=%s", missing_labels) raise RuntimeError( f"以下分类卸载尚未确认,无法清空资源树: {', '.join(missing_labels)}" ) cleared = self._clear_unloaded_materials(manifest) + logger.info("小核酸实验卸载清理完成: cleared=%s", cleared) # Refresh the table snapshot so consumers see the post-mutation state. post_manifest = self._build_unload_materials_by_type( @@ -1098,6 +1183,18 @@ class BioyondSirnaStation(BioyondWorkstation): order_id_value = order_ids[0] else: order_id_value = "" + logger.info( + "Bioyond LIMS 订单列表查询完成: raw_items=%s orders=%s selected_order_id=%s", + len(items), + len(orders), + order_id_value, + ) + if not orders: + logger.warning( + "Bioyond LIMS 订单列表未查询到结果: filter=%r status=%r", + filter_text, + status, + ) return { "success": bool(orders), @@ -1107,6 +1204,348 @@ class BioyondSirnaStation(BioyondWorkstation): "query": query_payload, } + @action( + always_free=True, + node_type=NodeType.MANUAL_CONFIRM, + placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"}, + goal_default={ + "order_id": "", + "timeout_seconds": 3600, + "assignee_user_ids": [], + }, + feedback_interval=300, + description="只读查询 Bioyond LIMS 订单报告", + handles=[ + ActionOutputHandle( + key="order_id", + data_type="bioyond_order_id", + label="实验ID", + data_key="order_id", + data_source=DataSource.EXECUTOR, + ), + ActionOutputHandle( + key="report", + data_type="bioyond_order_report", + label="订单报告", + data_key="report", + data_source=DataSource.EXECUTOR, + ), + ], + ) + def get_order_report( + self, + order_id: str = "", + timeout_seconds: int = 3600, + assignee_user_ids: Optional[List[str]] = None, + **kwargs: Any, + ) -> Dict[str, Any]: + """只读查询 Bioyond LIMS 订单报告。 + + ``api_host`` / ``api_key`` 隐藏不作为前端可见参数,可由执行器 kwargs 注入覆盖。 + + Args: + order_id: Bioyond LIMS 订单 ID。 + timeout_seconds: 超时时间(秒,框架参数)。 + assignee_user_ids: 分配用户 ID 列表(框架参数)。 + + Returns: + ``{"success": bool, "order_id": str, "report": {...}, "raw": ...}``。 + """ + with self._debug_call_session("get_order_report"): + del timeout_seconds, assignee_user_ids + normalized_order_id = str(order_id or "").strip() + if not normalized_order_id: + raise ValueError("order_id 不能为空") + + api_host = self._kwarg_text(kwargs, "api_host") + api_key = self._kwarg_text(kwargs, "api_key") + self._update_runtime_api_config(api_host=api_host, api_key=api_key) + + rpc = self._require_hardware_interface("order_report") + logger.info("正在查询 Bioyond LIMS 订单报告 order_id=%s", normalized_order_id) + raw_result = rpc.order_report(normalized_order_id) + result = self._normalize_order_report_result(normalized_order_id, raw_result) + report_data = result.get("report") + logger.info( + "Bioyond LIMS 订单报告查询完成: order_id=%s success=%s report_keys=%s", + result.get("order_id", normalized_order_id), + result.get("success"), + list(report_data.keys()) if isinstance(report_data, dict) else type(report_data).__name__, + ) + if not result.get("success"): + logger.warning( + "Bioyond LIMS 订单报告未返回可用数据: order_id=%s message=%s", + normalized_order_id, + result.get("message", ""), + ) + return result + + @action( + always_free=True, + node_type=NodeType.MANUAL_CONFIRM, + placeholder_keys={"assignee_user_ids": "unilabos_manual_confirm"}, + goal_default={ + "order_id": "", + "filter_text": "", + "include_order_list": True, + "include_order_report": True, + "include_gantt": True, + "include_gantt_with_simulation": True, + "include_material_info": True, + "include_raw": True, + "timeout_seconds": 3600, + "assignee_user_ids": [], + }, + feedback_interval=300, + description="聚合 Bioyond LIMS 前端样式订单报告", + handles=[ + ActionOutputHandle( + key="order_id", + data_type="bioyond_order_id", + label="实验ID", + data_key="order_id", + data_source=DataSource.EXECUTOR, + ), + ActionOutputHandle( + key="report", + data_type="bioyond_frontend_like_order_report", + label="聚合订单报告", + data_key="report", + data_source=DataSource.EXECUTOR, + ), + ], + ) + def get_aggregated_order_report( + self, + order_id: str = "", + filter_text: str = "", + include_order_list: bool = True, + include_order_report: bool = True, + include_gantt: bool = True, + include_gantt_with_simulation: bool = True, + include_material_info: bool = True, + include_raw: bool = True, + timeout_seconds: int = 3600, + assignee_user_ids: Optional[List[str]] = None, + **kwargs: Any, + ) -> Dict[str, Any]: + """聚合 Bioyond LIMS 前端样式订单报告。 + + Args: + order_id: Bioyond LIMS 订单 ID。为空时用 ``filter_text`` 从订单列表解析。 + filter_text: 订单名称 / 编号 / ID 过滤文本。 + include_order_list: 是否查询订单列表摘要。 + include_order_report: 是否查询订单报告详情。 + include_gantt: 是否允许普通甘特图作为时间线回退。 + include_gantt_with_simulation: 是否优先查询合并模拟甘特图。 + include_material_info: 是否对订单物料 ID 查询物料详情。 + include_raw: 是否返回各分段原始响应和错误。 + timeout_seconds: 超时时间(秒,框架参数)。 + assignee_user_ids: 分配用户 ID 列表(框架参数)。 + + Returns: + 前端报告调试结构,包含 header、parameters、samples、materials、timeline、raw/errors。 + """ + with self._debug_call_session("get_aggregated_order_report"): + del timeout_seconds, assignee_user_ids + api_host = self._kwarg_text(kwargs, "api_host") + api_key = self._kwarg_text(kwargs, "api_key") + self._update_runtime_api_config(api_host=api_host, api_key=api_key) + + requested_order_id = str(order_id or "").strip() + query_filter = str(filter_text or requested_order_id or "").strip() + if not requested_order_id and not query_filter: + raise ValueError("order_id 和 filter_text 不能同时为空") + + rpc = self._require_hardware_interface_for_reset() + included_sections = { + "order_list": bool(include_order_list), + "order_report": bool(include_order_report), + "gantt": bool(include_gantt), + "gantt_with_simulation": bool(include_gantt_with_simulation), + "material_info": bool(include_material_info), + "raw": bool(include_raw), + } + logger.info( + "聚合订单报告查询开始: order_id=%s filter=%s sections=%s", + requested_order_id, + query_filter, + included_sections, + ) + raw_sections: Dict[str, Any] = {} + errors: List[str] = [] + warnings: List[str] = [] + + order_list_payload: Any = None + order_items: List[Dict[str, Any]] = [] + if include_order_list: + query_payload = self._order_list_query_payload(query_filter) + order_list_payload = self._call_order_query_for_report( + rpc, + query_payload, + errors, + fallback_on_empty=include_raw, + ) + if include_raw: + raw_sections["order_list"] = order_list_payload + self._append_lims_section_error(order_list_payload, errors, "order_list") + order_items = self._order_items(order_list_payload) + + order_record = self._select_order_record(order_items, requested_order_id, query_filter) + if requested_order_id and order_items and order_record is None: + warnings.append("order_list 未返回与 order_id 完全匹配的订单,跳过订单列表摘要合并") + resolved_order_id = requested_order_id + if order_record is not None and order_record.get("id"): + resolved_order_id = str(order_record["id"]) + if not resolved_order_id: + raise RuntimeError("无法解析 Bioyond 订单 ID,请提供 order_id 或可命中的 filter_text") + logger.info( + "聚合订单报告已解析订单: resolved_order_id=%s order_list_items=%s matched=%s", + resolved_order_id, + len(order_items), + bool(order_record), + ) + + report_payload: Any = None + normalized_report: Dict[str, Any] = { + "success": False, + "order_id": resolved_order_id, + "report": {}, + "raw": {}, + } + if include_order_report: + report_payload = self._call_single_arg_lims_section( + rpc, + "order_report", + "/api/lims/order/order-report", + resolved_order_id, + errors, + "order_report", + fallback_on_empty=True, + ) + normalized_report = self._normalize_order_report_result(resolved_order_id, report_payload) + if not normalized_report.get("success"): + message = str(normalized_report.get("message") or "order_report 未返回可用 data") + logger.error( + "Bioyond LIMS order_report 调用失败 order_id=%s: %s", + resolved_order_id, + message, + ) + errors.append(f"order_report: {message}") + if include_raw: + raw_sections["order_report"] = report_payload + + report_data = normalized_report.get("report") + if not isinstance(report_data, dict): + report_data = {} + + timeline: List[Dict[str, Any]] = [] + timeline_source = "" + if include_gantt_with_simulation: + gantt_payload = self._call_single_arg_lims_section( + rpc, + "gantt_with_simulation_by_order_id", + "/api/lims/order/gantt-with-simulation-by-order-id", + resolved_order_id, + errors, + "gantt_with_simulation", + fallback_on_empty=include_raw, + ) + if include_raw: + raw_sections["gantt_with_simulation"] = gantt_payload + self._append_lims_section_error(gantt_payload, errors, "gantt_with_simulation") + timeline = self._gantt_items(gantt_payload) + if timeline: + timeline_source = "gantt_with_simulation" + if not timeline and include_gantt: + gantt_payload = self._call_single_arg_lims_section( + rpc, + "gantts_by_order_id", + "/api/lims/order/gantts-by-order-id", + resolved_order_id, + errors, + "gantt", + fallback_on_empty=include_raw, + ) + if include_raw: + raw_sections["gantt"] = gantt_payload + self._append_lims_section_error(gantt_payload, errors, "gantt") + timeline = self._gantt_items(gantt_payload) + if timeline: + timeline_source = "gantt" + if (include_gantt or include_gantt_with_simulation) and not timeline: + warnings.append("未获取到甘特图时间线") + + material_ids = self._order_material_ids(order_record, report_data) + logger.info("聚合订单报告物料详情查询准备: material_ids=%s", len(material_ids)) + material_info_by_id: Dict[str, Any] = {} + if include_material_info: + for material_id in material_ids: + material_payload = self._call_single_arg_lims_section( + rpc, + "material_info", + "/api/lims/storage/material-info", + material_id, + errors, + f"material_info:{material_id}", + fallback_on_empty=include_raw, + ) + material_data = self._service_data_or_value(material_payload) + if isinstance(material_data, dict): + material_info_by_id[material_id] = material_data + if include_raw: + raw_sections.setdefault("material_info", {})[material_id] = material_payload + self._append_lims_section_error(material_payload, errors, f"material_info:{material_id}") + + header = self._build_aggregated_report_header(order_record, report_data) + if not header.get("order_id"): + header["order_id"] = resolved_order_id + parameters = self._build_aggregated_report_parameters(report_data, warnings) + samples = self._build_aggregated_report_samples(order_record, report_data, material_info_by_id) + reagents, consumables = self._split_used_materials(report_data) + + success = bool(order_record or report_data or timeline or samples) + logger.info( + "聚合订单报告查询完成: order_id=%s success=%s samples=%s reagents=%s " + "consumables=%s timeline=%s source=%s errors=%s warnings=%s", + resolved_order_id, + success, + len(samples), + len(reagents), + len(consumables), + len(timeline), + timeline_source, + len(errors), + len(warnings), + ) + result: Dict[str, Any] = { + "success": success, + "order_id": resolved_order_id, + "order_code": header.get("code", ""), + "order_name": header.get("name", ""), + "included_sections": included_sections, + "header": header, + "parameters": parameters, + "samples": samples, + "reagents": reagents, + "consumables": consumables, + "timeline": timeline, + "timeline_source": timeline_source, + "errors": errors, + "warnings": warnings, + } + if include_raw: + result["raw"] = raw_sections + result["report"] = { + "header": header, + "parameters": parameters, + "samples": samples, + "reagents": reagents, + "consumables": consumables, + "timeline": timeline, + } + return result + def _require_hardware_interface(self, method_name: str) -> Any: rpc = getattr(self, "hardware_interface", None) if rpc is None: @@ -1194,8 +1633,10 @@ class BioyondSirnaStation(BioyondWorkstation): or "" ) workflow_query = {"type": 0, "filter": workflow_name or sub_workflow_name, "includeDetail": True} + logger.info("正在解析小核酸工作流: filter=%r", workflow_query["filter"]) workflow_data = rpc.query_workflow(json.dumps(workflow_query, ensure_ascii=False)) workflow_items = self._workflow_items(workflow_data) + logger.info("小核酸工作流查询返回: items=%s", len(workflow_items)) roots_with_children = [item for item in workflow_items if self._sub_workflow_records(item)] root_candidates = roots_with_children or workflow_items if not workflow_name and sub_workflow_name: @@ -1208,10 +1649,17 @@ class BioyondSirnaStation(BioyondWorkstation): root_candidates = roots_matching_sub root = self._select_workflow_record(root_candidates, workflow_name) if not root: + logger.error( + "未从 LIMS 查询到可用的小核酸工作流: workflow_name=%r sub_workflow_name=%r items=%s", + workflow_name, + sub_workflow_name, + len(workflow_items), + ) raise RuntimeError("未从 LIMS 查询到可用的小核酸工作流") sub = self._select_workflow_record(self._sub_workflow_records(root), sub_workflow_name) if not sub: label = self._record_name(root) or workflow_name or self._record_id(root) + logger.error("小核酸工作流缺少可用子工作流: root=%s sub_filter=%r", label, sub_workflow_name) raise RuntimeError(f"工作流 {label} 缺少可用子工作流") sub_id = self._record_id(sub) self._require_uuid(sub_id, "workFlowId") @@ -1233,6 +1681,7 @@ class BioyondSirnaStation(BioyondWorkstation): ) -> Tuple[Dict[str, List[Dict[str, Any]]], List[Dict[str, Any]]]: parameter_map = self._extract_workflow_parameter_map(step_data) if not isinstance(parameter_map, dict): + logger.error("workflow_step_query 未返回可解析的步骤参数: type=%s", type(parameter_map).__name__) raise RuntimeError("workflow_step_query 未返回可解析的步骤参数") param_values: Dict[str, List[Dict[str, Any]]] = {} parameter_template: List[Dict[str, Any]] = [] @@ -1309,6 +1758,7 @@ class BioyondSirnaStation(BioyondWorkstation): override_items.extend((str(key), value) for key, value in override.items()) continue if "=" not in str(override): + logger.error("参数覆盖格式错误: override=%r", override) raise ValueError(f"参数覆盖必须使用 key=value 格式: {override!r}") key, value = str(override).split("=", 1) override_items.append((key, value)) @@ -1321,6 +1771,7 @@ class BioyondSirnaStation(BioyondWorkstation): entry["value"] = value matched = True if not matched: + logger.error("参数覆盖未命中 paramValues: key=%s", key) raise ValueError(f"paramValues 中找不到可覆盖参数: {key}") def _build_bioyond_order_identity( @@ -1374,21 +1825,33 @@ class BioyondSirnaStation(BioyondWorkstation): "synchronizer": "BioyondResourceSynchronizer", "refresh_material_cache": {"skipped": True, "reason": "disabled"}, } + logger.info( + "共享 Bioyond 外部物料同步开始: refresh_material_cache=%s", + bool(refresh_material_cache), + ) if refresh_material_cache: if hasattr(rpc, "refresh_material_cache"): result["refresh_material_cache"] = rpc.refresh_material_cache() + logger.info( + "共享 Bioyond 外部物料缓存刷新完成: result=%s", + result["refresh_material_cache"], + ) else: result["refresh_material_cache"] = { "skipped": True, "reason": "rpc_method_unavailable", } + logger.warning("共享 Bioyond 外部物料缓存刷新跳过: RPC 缺少 refresh_material_cache") try: synchronizer = BioyondResourceSynchronizer(self) result["success"] = bool(synchronizer.sync_from_external()) if result["success"]: self._publish_resource_tree_update() + logger.info("共享 Bioyond 外部物料同步完成并已发布资源树更新") + else: + logger.warning("共享 Bioyond 外部物料同步未返回成功,继续返回结果供上游处理") except Exception as exc: logger.error(f"共享 Bioyond 外部物料同步失败: {exc}") result["error"] = str(exc) @@ -1409,6 +1872,13 @@ class BioyondSirnaStation(BioyondWorkstation): "experiment_1_reset_order_filter", "sirna_exp1_reset_order_filter", ) or "" + logger.info( + "小核酸复位操作开始: operations=%s reset_order_id=%s reset_location_id=%s cleanup_order_code=%s", + operations, + reset_order_id, + reset_location_id, + cleanup_order_code, + ) skipped_operations: List[Dict[str, str]] = [] if "reset_order_status" in operations: reset_order_id = self._resolve_reset_order_id(rpc, reset_order_id, cleanup_order_code) @@ -1417,6 +1887,7 @@ class BioyondSirnaStation(BioyondWorkstation): "operation": "reset_order_status", "reason": "未查询到可复位订单,跳过订单状态复位", }) + logger.warning("小核酸复位跳过订单状态复位: 未查询到可复位订单") if "reset_location" in operations: reset_location_id = self._resolve_reset_location_id(rpc, reset_location_id) @@ -1430,19 +1901,46 @@ class BioyondSirnaStation(BioyondWorkstation): if "scheduler_reset" in operations: self._require_rpc_method(rpc, "scheduler_reset") result["scheduler_reset"] = rpc.scheduler_reset() + logger.info("小核酸复位 scheduler_reset 返回: %s", result["scheduler_reset"]) + if result["scheduler_reset"] in ({}, None, False): + logger.warning("小核酸复位 scheduler_reset 未返回明确成功结果") if "reset_order_status" in operations and reset_order_id: self._require_rpc_method(rpc, "reset_order_status") result["reset_order_status"] = rpc.reset_order_status(reset_order_id) + logger.info( + "小核酸复位 reset_order_status 返回: order_id=%s result=%s", + reset_order_id, + result["reset_order_status"], + ) + if result["reset_order_status"] in ({}, None, False): + logger.warning("小核酸复位 reset_order_status 未返回明确成功结果: order_id=%s", reset_order_id) if "reset_location" in operations: self._require_rpc_method(rpc, "reset_location") result["reset_location"] = rpc.reset_location(reset_location_id) + logger.info( + "小核酸复位 reset_location 返回: location_id=%s result=%s", + reset_location_id, + result["reset_location"], + ) + if result["reset_location"] in ({}, None, False): + logger.warning("小核酸复位 reset_location 未返回明确成功结果: location_id=%s", reset_location_id) if "reset_order_status" in operations and reset_order_id: snapshot = self._query_order_snapshot(rpc, cleanup_order_code or reset_order_id) targets = self._extract_takeout_targets(snapshot, reset_order_id) result["cleanup_targets"] = targets + logger.info( + "小核酸复位清理检查: requires_take_out=%s preintakes=%s materials=%s", + targets["requires_take_out"], + len(targets["preintake_ids"]), + len(targets["material_ids"]), + ) if targets["requires_take_out"]: result["take_out"] = self._take_out_remaining_materials(rpc, targets) + logger.info("小核酸复位 take_out 返回: %s", result["take_out"]) + if result["take_out"] in ({}, None, False): + logger.warning("小核酸复位 take_out 未返回明确成功结果") + logger.info("小核酸复位操作完成: selected=%s skipped=%s", operations, len(skipped_operations)) return result def _resolve_reset_order_id(self, rpc: Any, reset_order_id: str = "", cleanup_order_code: str = "") -> str: @@ -1826,6 +2324,423 @@ class BioyondSirnaStation(BioyondWorkstation): return [item for item in parsed["items"] if isinstance(item, dict)] return [] + def _normalize_order_report_result(self, order_id: str, raw_result: Any) -> Dict[str, Any]: + parsed = self._parse_lims_result(raw_result) + report = parsed + success = bool(parsed) + message = "" + + if isinstance(parsed, dict) and "data" in parsed: + success = parsed.get("code") in {1, "1", None} and bool(parsed.get("data")) + message = str(parsed.get("message") or "") + report = self._parse_lims_result(parsed.get("data")) + + resolved_order_id = str(order_id or "").strip() + if isinstance(report, dict): + for key in ("id", "orderId", "order_id"): + value = report.get(key) + if not self._is_blank(value): + resolved_order_id = str(value) + break + + result: Dict[str, Any] = { + "success": bool(success), + "order_id": resolved_order_id, + "report": report if report is not None else {}, + "raw": parsed, + } + if message: + result["message"] = message + return result + + def _order_list_query_payload(self, filter_text: str, max_results: int = 20) -> Dict[str, Any]: + return { + "timeType": "", + "beginTime": None, + "endTime": None, + "status": "", + "filter": str(filter_text or ""), + "skipCount": 0, + "pageCount": max_results, + "sorting": "creationTime desc", + } + + def _call_order_query_for_report( + self, + rpc: Any, + query_payload: Dict[str, Any], + errors: List[str], + fallback_on_empty: bool = False, + ) -> Any: + if hasattr(rpc, "order_query"): + try: + result = rpc.order_query(json.dumps(query_payload, ensure_ascii=False)) + if fallback_on_empty and result in ({}, None, []): + return self._fallback_or_original_lims_section( + rpc, + "/api/lims/order/order-list", + query_payload, + errors, + "order_list", + result, + ) + return result + except Exception as exc: + errors.append(f"order_list: {exc}") + logger.warning("聚合订单报告 order_list 查询失败,继续使用空结果: %s", exc) + return {} + logger.warning("聚合订单报告 order_list 缺少专用 RPC 方法,尝试通用 POST fallback") + return self._post_lims_section( + rpc, + "/api/lims/order/order-list", + query_payload, + errors, + "order_list", + ) + + def _call_single_arg_lims_section( + self, + rpc: Any, + method_name: str, + endpoint: str, + data: str, + errors: List[str], + section_name: str, + fallback_on_empty: bool = False, + ) -> Any: + if hasattr(rpc, method_name): + try: + result = getattr(rpc, method_name)(data) + if fallback_on_empty and result in ({}, None, []): + logger.warning( + "聚合订单报告 %s 返回为空,尝试通用 POST fallback: endpoint=%s", + section_name, + endpoint, + ) + return self._fallback_or_original_lims_section( + rpc, + endpoint, + data, + errors, + section_name, + result, + ) + return result + except Exception as exc: + errors.append(f"{section_name}: {exc}") + logger.warning( + "聚合订单报告 %s 查询失败,继续使用空结果: %s", + section_name, + exc, + ) + return {} + logger.warning( + "聚合订单报告 %s 缺少 RPC 方法 %s,尝试通用 POST fallback", + section_name, + method_name, + ) + return self._post_lims_section(rpc, endpoint, data, errors, section_name) + + def _fallback_or_original_lims_section( + self, + rpc: Any, + endpoint: str, + data: Any, + errors: List[str], + section_name: str, + original: Any, + ) -> Any: + if not self._can_raw_post_lims_section(rpc): + logger.warning( + "聚合订单报告 %s 返回为空且 RPC 不支持通用 POST fallback,保留原始空结果", + section_name, + ) + return original + fallback = self._post_lims_section(rpc, endpoint, data, errors, section_name) + return fallback or original + + def _can_raw_post_lims_section(self, rpc: Any) -> bool: + return all(hasattr(rpc, attr) for attr in ("post", "host", "api_key", "get_current_time_iso8601")) + + def _post_lims_section( + self, + rpc: Any, + endpoint: str, + data: Any, + errors: List[str], + section_name: str, + ) -> Any: + if not self._can_raw_post_lims_section(rpc): + errors.append(f"{section_name}: Bioyond RPC 客户端缺少 {endpoint} 调用能力") + logger.error( + "聚合订单报告 %s 无法 fallback: RPC 缺少 %s 调用能力", + section_name, + endpoint, + ) + return {} + try: + return rpc.post( + url=f"{str(rpc.host).rstrip('/')}{endpoint}", + params={ + "apiKey": rpc.api_key, + "requestTime": rpc.get_current_time_iso8601(), + "data": data, + }, + ) or {} + except Exception as exc: + errors.append(f"{section_name}: {exc}") + logger.warning( + "聚合订单报告 %s fallback POST 失败,继续使用空结果: endpoint=%s error=%s", + section_name, + endpoint, + exc, + ) + return {} + + def _service_data_or_value(self, payload: Any) -> Any: + parsed = self._parse_lims_result(payload) + if isinstance(parsed, dict) and "data" in parsed and ( + "code" in parsed or "message" in parsed or "timestamp" in parsed + ): + return self._parse_lims_result(parsed.get("data")) + return parsed + + def _append_lims_section_error( + self, + payload: Any, + errors: List[str], + section_name: str, + ) -> None: + parsed = self._parse_lims_result(payload) + if not isinstance(parsed, dict) or "code" not in parsed: + return + if parsed.get("code") in {1, "1", None}: + return + message = str(parsed.get("message") or f"LIMS 返回 code={parsed.get('code')}") + entry = f"{section_name}: {message}" + if entry not in errors: + errors.append(entry) + + def _select_order_record( + self, + order_items: List[Dict[str, Any]], + order_id: str, + filter_text: str, + ) -> Optional[Dict[str, Any]]: + if not order_items: + return None + normalized_order_id = str(order_id or "").strip() + normalized_filter = str(filter_text or "").strip() + if normalized_order_id: + for item in order_items: + if str(item.get("id") or "") == normalized_order_id: + return item + return None + if normalized_filter: + for item in order_items: + candidates = { + str(item.get("id") or ""), + str(item.get("name") or ""), + str(item.get("code") or ""), + str(item.get("orderCode") or ""), + str(item.get("orderName") or ""), + } + if normalized_filter in candidates: + return item + return order_items[0] + + def _build_aggregated_report_header( + self, + order_record: Optional[Dict[str, Any]], + report_data: Dict[str, Any], + ) -> Dict[str, Any]: + source: Dict[str, Any] = {} + if order_record: + source.update(order_record) + if report_data: + for key, value in report_data.items(): + if key not in source or self._is_blank(source.get(key)): + source[key] = value + + sample_count = self._sample_count(source) + return { + "order_id": str(source.get("id") or source.get("orderId") or ""), + "name": str(source.get("name") or source.get("orderName") or ""), + "code": str(source.get("code") or source.get("orderCode") or ""), + "status": source.get("status"), + "statusName": str(source.get("statusName") or ""), + "requester": str(source.get("requester") or ""), + "workflowName": str(source.get("workflowName") or ""), + "sample_count": sample_count, + "sampleInfo": source.get("sampleInfo"), + "requestTime": source.get("requestTime"), + "startPreparationTime": source.get("startPreparationTime"), + "completeTime": source.get("completeTime"), + "useTime": source.get("useTime"), + "orderProgress": source.get("orderProgress"), + } + + def _sample_count(self, source: Dict[str, Any]) -> int: + sample_info = source.get("sampleInfo") + if isinstance(sample_info, (int, float)): + return int(sample_info) + if isinstance(sample_info, str) and sample_info.strip().isdigit(): + return int(sample_info.strip()) + sample_ids = set() + for preintake in self._as_list(source.get("preIntakes")): + if not isinstance(preintake, dict): + continue + for sample in self._as_list(preintake.get("sampleMaterials")): + if isinstance(sample, dict): + sample_id = sample.get("materialId") or sample.get("materialCode") or sample.get("sampleCode") + if sample_id: + sample_ids.add(str(sample_id)) + material_ids_text = str(preintake.get("materialIds") or "") + for material_id in material_ids_text.replace(";", "|").replace(",", "|").split("|"): + if material_id.strip(): + sample_ids.add(material_id.strip()) + return len(sample_ids) + + def _build_aggregated_report_parameters( + self, + report_data: Dict[str, Any], + warnings: List[str], + ) -> Dict[str, Any]: + raw_parameters = report_data.get("workflowParameters") if isinstance(report_data, dict) else None + if self._is_blank(raw_parameters): + warnings.append("order_report 未提供 workflowParameters,实验参数为空") + return { + "source": "order_report.workflowParameters", + "items": {}, + "reason": "order_report workflowParameters unavailable", + } + parsed = self._json_loads_if_string(raw_parameters) + return { + "source": "order_report.workflowParameters", + "items": parsed if isinstance(parsed, (dict, list)) else {}, + "raw": raw_parameters if not isinstance(parsed, (dict, list)) else None, + } + + def _order_material_ids( + self, + order_record: Optional[Dict[str, Any]], + report_data: Dict[str, Any], + ) -> List[str]: + material_ids: List[str] = [] + for source in (order_record or {}, report_data or {}): + for preintake in self._as_list(source.get("preIntakes")): + if not isinstance(preintake, dict): + continue + if preintake.get("materialId"): + material_ids.append(str(preintake["materialId"])) + material_ids_text = str(preintake.get("materialIds") or "") + for material_id in material_ids_text.replace(";", "|").replace(",", "|").split("|"): + if material_id.strip(): + material_ids.append(material_id.strip()) + for sample in self._as_list(preintake.get("sampleMaterials")): + if isinstance(sample, dict) and sample.get("materialId"): + material_ids.append(str(sample["materialId"])) + return list(dict.fromkeys(material_ids)) + + def _build_aggregated_report_samples( + self, + order_record: Optional[Dict[str, Any]], + report_data: Dict[str, Any], + material_info_by_id: Dict[str, Any], + ) -> List[Dict[str, Any]]: + sample_by_id: Dict[str, Dict[str, Any]] = {} + for source in (order_record or {}, report_data or {}): + for preintake in self._as_list(source.get("preIntakes")): + if not isinstance(preintake, dict): + continue + status_name = preintake.get("statusName") + throughput_code = preintake.get("code") + material_ids_text = str(preintake.get("materialIds") or "") + for sample in self._as_list(preintake.get("sampleMaterials")): + if not isinstance(sample, dict): + continue + material_id = str(sample.get("materialId") or "") + if not material_id: + continue + sample_by_id.setdefault(material_id, {}) + sample_by_id[material_id].update({ + "material_id": material_id, + "name": sample.get("materialName"), + "type": sample.get("materialTypeName"), + "code": sample.get("materialCode"), + "barcode": sample.get("materialBarCode") or sample.get("materialCode"), + "location": sample.get("materialLocation"), + "target_location": sample.get("materialTargetLocation"), + "sample_code": sample.get("sampleCode"), + "throughput_code": throughput_code, + "statusName": status_name, + }) + for material_id in material_ids_text.replace(";", "|").replace(",", "|").split("|"): + material_id = material_id.strip() + if material_id: + sample_by_id.setdefault(material_id, {"material_id": material_id, "throughput_code": throughput_code, "statusName": status_name}) + + for material_id, material_info in material_info_by_id.items(): + sample = sample_by_id.setdefault(material_id, {"material_id": material_id}) + if not isinstance(material_info, dict): + continue + sample.setdefault("name", material_info.get("name")) + sample.setdefault("type", material_info.get("typeName")) + sample.setdefault("code", material_info.get("code")) + sample.setdefault("barcode", material_info.get("barCode") or material_info.get("code")) + locations = self._as_list(material_info.get("locations")) + location = next((item for item in locations if isinstance(item, dict)), {}) + if isinstance(location, dict): + location_text = self._material_location_text(location) + if location_text: + sample["location"] = sample.get("location") or location_text + sample["target_location"] = sample.get("target_location") or location_text + sample["location_detail"] = location + sample["material_info"] = material_info + + return list(sample_by_id.values()) + + def _material_location_text(self, location: Dict[str, Any]) -> str: + wh_name = str(location.get("whName") or "") + code = str(location.get("code") or "") + if wh_name and code: + return f"{wh_name}:{code}" + return wh_name or code + + def _split_used_materials(self, report_data: Dict[str, Any]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]: + reagents: List[Dict[str, Any]] = [] + consumables: List[Dict[str, Any]] = [] + for material in self._as_list(report_data.get("usedMaterials") if isinstance(report_data, dict) else None): + if not isinstance(material, dict): + continue + item = { + "name": material.get("holdMName") or material.get("materialName"), + "code": material.get("holdMCode") or material.get("materialCode"), + "type_name": material.get("holdMTypeName") or material.get("materialTypeName"), + "quantity": material.get("quantity"), + "unit": material.get("unit"), + "location": material.get("fromLocationCode") or material.get("toLocationCode"), + "batchCode": material.get("batchCode"), + "raw": material, + } + type_value = material.get("type") + name = str(item.get("name") or "") + if type_value == 2 or "试剂" in name: + reagents.append(item) + else: + consumables.append(item) + return reagents, consumables + + def _gantt_items(self, gantt_payload: Any) -> List[Dict[str, Any]]: + parsed = self._service_data_or_value(gantt_payload) + if isinstance(parsed, dict): + items = parsed.get("items") + if isinstance(items, list): + return [item for item in items if isinstance(item, dict)] + if isinstance(parsed, list): + return [item for item in parsed if isinstance(item, dict)] + return [] + def _extract_create_order_materials(self, result: Any) -> List[Dict[str, Any]]: parsed = self._parse_lims_result(result) if isinstance(parsed, dict) and "data" in parsed: