certflow.handlers.id_generator 源代码

"""唯一键生成器 - 分层策略(配置驱动)

新数据(有生产令号):用配置的稳定字段
老数据(无生产令号):用配置的内容字段
"""

import hashlib
import json
from datetime import datetime
from typing import Any

import pandas as pd
from loguru import logger


[文档] class IDGenerator: """分层唯一键生成器(配置驱动) 提供合格证编号生成、基于数据内容的唯一键生成及批量生成功能。 唯一键采用分层策略,通过配置文件 id_generator 节点驱动字段选择。 分层策略: - 新数据模式:当配置的 new_data_fields 字段全部有值时使用 (默认: sales_order_no + production_order_no) - 老数据模式:当 new_data_fields 任一为空时降级使用 (默认: 9个内容字段) 配置节点: id_generator.new_data_fields: 新数据稳定字段列表 id_generator.old_data_fields: 老数据内容匹配字段列表 id_generator.empty_marker_prefix: 空值标记前缀(默认 "【空白") 向后兼容: generate_unique_key(data, keys) 指定 keys 参数时走旧逻辑 主要功能: - generate_certificate_no(): 生成合格证编号 - generate_unique_key(): 生成MD5唯一键(支持分层策略) - generate_unique_key_readable(): 生成可读唯一键(调试用) - generate_batch_unique_keys(): 批量生成唯一键 - get_all_unique_key_fields(): 获取所有唯一键字段(供去重配置使用) - normalize_empty_values(): 空值规范化 - normalize_dataframe_empty_values(): DataFrame空值规范化 """ _field_name_cn_cache: dict[str, str] | None = None _new_data_fields_cache: list[str] | None = None _old_data_fields_cache: list[str] | None = None _empty_marker_prefix_cache: str | None = None @classmethod def _get_new_data_fields(cls) -> list[str]: """获取新数据模式字段(从配置读取,带缓存)""" if cls._new_data_fields_cache is not None: return cls._new_data_fields_cache try: from certflow.config.settings import _cfg cls._new_data_fields_cache = _cfg( "id_generator.new_data_fields", ["sales_order_no", "production_order_no"] ) except Exception as e: logger.warning(f"读取 new_data_fields 配置失败: {e},使用默认值") cls._new_data_fields_cache = ["sales_order_no", "production_order_no"] return cls._new_data_fields_cache @classmethod def _get_old_data_fields(cls) -> list[str]: """获取老数据模式字段(从配置读取,带缓存)""" if cls._old_data_fields_cache is not None: return cls._old_data_fields_cache try: from certflow.config.settings import _cfg cls._old_data_fields_cache = _cfg( "id_generator.old_data_fields", [ "plan_date", "customer", "project_name", "product_name", "product_model", "product_spec", "quantity", ], ) except Exception as e: logger.warning(f"读取 old_data_fields 配置失败: {e},使用默认值") cls._old_data_fields_cache = [ "plan_date", "customer", "project_name", "product_name", "product_model", "product_spec", "quantity", ] return cls._old_data_fields_cache @classmethod def _get_empty_marker_prefix(cls) -> str: """获取空值标记前缀(从配置读取,带缓存) 用于识别被 normalize 处理过的空值标记,如"【空白合同号】"。 当分层策略中 new_data_fields 的值为该前缀开头时,视为空值。 Returns: str: 空值标记前缀,默认 "【空白" """ if cls._empty_marker_prefix_cache is not None: return cls._empty_marker_prefix_cache try: from certflow.config.settings import _cfg cls._empty_marker_prefix_cache = _cfg("id_generator.empty_marker_prefix", "【空白") except Exception as e: logger.warning(f"读取 empty_marker_prefix 配置失败: {e},使用默认值") cls._empty_marker_prefix_cache = "【空白" return cls._empty_marker_prefix_cache @classmethod def _get_field_name_cn(cls) -> dict[str, str]: """获取英文字段名到中文字段名的映射 从配置文件的 sales_plan.column_mapping.direct 反向构建映射关系。 用于生成空值标记时的中文显示。 Returns: Dict[str, str]: 英文字段名到中文字段名的映射字典 Examples: >>> mapping = IDGenerator._get_field_name_cn() >>> print(mapping.get("contract_no")) "合同号" """ if cls._field_name_cn_cache is not None: return cls._field_name_cn_cache try: from certflow.config.settings import _cfg # 从 direct 映射反向构建 direct_mapping = _cfg("sales_plan.column_mapping.direct", {}) # direct_mapping 格式: {"合同号": "contract_no", "计划日期": "plan_date", ...} # 反向映射: 英文字段名 -> 中文字段名 reverse_mapping = {} for cn_name, en_field in direct_mapping.items(): reverse_mapping[en_field] = cn_name cls._field_name_cn_cache = reverse_mapping logger.debug(f"加载字段中文名映射: {len(reverse_mapping)} 个字段") except Exception as e: logger.warning(f"加载字段中文名映射失败: {e},将使用英文字段名") cls._field_name_cn_cache = {} return cls._field_name_cn_cache @classmethod def _get_empty_marker(cls, field_name: str) -> str: """获取空值标记 生成统一格式的空值标记,用于在数据中标识缺失字段。 格式为 "【空白中文字段名】"。 Args: field_name: 英文字段名 Returns: str: 空值标记字符串,如"【空白合同号】" Examples: >>> marker = IDGenerator._get_empty_marker("contract_no") >>> print(marker) "【空白合同号】" """ cn_map = cls._get_field_name_cn() cn_name = cn_map.get(field_name, field_name) return f"【空白{cn_name}】"
[文档] @staticmethod def generate_certificate_no(prefix: str = "CERT", sequence: int | None = None) -> str: """生成合格证编号 生成格式为 CERT-YYYYMMDD-NNNN 的合格证编号, 序号部分自动补零至4位。 Args: prefix: 编号前缀,默认为"CERT" sequence: 序号,如果为None则默认为1 Returns: str: 生成的合格证编号字符串 Examples: >>> # 生成默认格式的编号 >>> cert_no = IDGenerator.generate_certificate_no() >>> print(cert_no) # 输出: CERT-20231201-0001 >>> >>> # 生成自定义前缀和序号 >>> cert_no = IDGenerator.generate_certificate_no(prefix="QT", sequence=5) >>> print(cert_no) # 输出: QT-20231201-0005 """ date_str = datetime.now().strftime("%Y%m%d") if sequence is None: sequence = 1 return f"{prefix}-{date_str}-{sequence:04d}"
[文档] @classmethod def generate_unique_key(cls, data: dict[str, Any], keys: list[str] | None = None) -> str: """生成唯一键(分层策略) Args: data: 数据字典,包含需要生成唯一键的字段 keys: 指定字段列表。有值时走旧逻辑(向后兼容), None 时走分层策略 Returns: str: 16位MD5哈希值 """ # 指定 keys:走旧逻辑 if keys: filtered_data = {k: data.get(k, "") for k in keys} exclude_keys = ["_original_order", "_sort_group"] filtered_data = {k: v for k, v in filtered_data.items() if k not in exclude_keys} processed_data = {} for k, v in filtered_data.items(): if v is None or v == "": processed_data[k] = cls._get_empty_marker(k) else: processed_data[k] = str(v).strip() key_str = json.dumps(processed_data, sort_keys=True, ensure_ascii=False) return hashlib.md5(key_str.encode("utf-8")).hexdigest()[:16] # 分层策略 new_fields = cls._get_new_data_fields() old_fields = cls._get_old_data_fields() new_values = [] for f in new_fields: v = str(data.get(f, "")).strip() if v.startswith(cls._get_empty_marker_prefix()): v = "" new_values.append(v) if all(new_values): key_str = "|".join(new_values) else: parts = [] for field in old_fields: v = str(data.get(field, "")).strip() parts.append(v if v else cls._get_empty_marker(field)) key_str = "|".join(parts) return hashlib.md5(key_str.encode("utf-8")).hexdigest()[:16]
[文档] @classmethod def generate_unique_key_readable( cls, data: dict[str, Any], keys: list[str] | None = None ) -> str: """生成可读唯一键(分层策略,调试用) Args: data: 数据字典 keys: 指定字段列表。有值时走旧逻辑,None 时走分层策略 Returns: str: 可读的唯一键字符串,字段值之间用"::"分隔 """ if keys: filtered_data = {k: data.get(k, "") for k in keys} exclude_keys = ["_original_order", "_sort_group"] filtered_data = {k: v for k, v in filtered_data.items() if k not in exclude_keys} key_parts = [] for k in keys: v = filtered_data.get(k, "") if v is None or v == "": key_parts.append(cls._get_empty_marker(k)) else: key_parts.append(str(v).strip()) return "::".join(key_parts) new_fields = cls._get_new_data_fields() old_fields = cls._get_old_data_fields() new_values = [] for f in new_fields: v = str(data.get(f, "")).strip() if v.startswith(cls._get_empty_marker_prefix()): v = "" new_values.append(v) if all(new_values): return "::".join(new_values) parts = [] for field in old_fields: v = str(data.get(field, "")).strip() parts.append(v if v else cls._get_empty_marker(field)) return "::".join(parts)
[文档] @staticmethod def generate_batch_unique_keys( data_list: list[dict[str, Any]], keys: list[str] | None = None ) -> list[dict[str, Any]]: """批量生成唯一键 为数据列表中的每条记录生成唯一键,并将键值写入"unique_key"字段。 Args: data_list: 数据字典列表 keys: 用于生成唯一键的字段列表,如果为None则使用所有字段 Returns: List[Dict[str, Any]]: 添加了unique_key字段的数据列表 Examples: >>> data_list = [ ... {"contract_no": "PO-001", "product_model": "阀门A"}, ... {"contract_no": "PO-002", "product_model": "阀门B"} ... ] >>> result = IDGenerator.generate_batch_unique_keys(data_list, ["contract_no"]) >>> for item in result: ... print(item["unique_key"]) """ result = [] for data in data_list: unique_key = IDGenerator.generate_unique_key(data, keys) data["unique_key"] = unique_key result.append(data) return result
[文档] @classmethod def normalize_empty_values( cls, data: dict[str, Any], keys: list[str] | None = None ) -> dict[str, Any]: """规范化字典中的空值 将 None 或空字符串转换为 【空白中文字段名】 格式。 用于统一空值表示,便于后续处理和识别。 Args: data: 原始数据字典 keys: 需要处理的字段列表,如果为None则处理所有字段 Returns: Dict[str, Any]: 空值规范化后的字典 Examples: >>> data = {"contract_no": "", "product_model": "阀门A", "quantity": None} >>> normalized = IDGenerator.normalize_empty_values(data, ["contract_no", "quantity"]) >>> print(normalized["contract_no"]) # 输出: "【空白合同号】" >>> print(normalized["quantity"]) # 输出: "【空白数量】" """ if keys is None: keys = list(data.keys()) normalized = {} for k in keys: v = data.get(k, "") if v is None or v == "": normalized[k] = cls._get_empty_marker(k) else: normalized[k] = str(v).strip() return normalized
[文档] @classmethod def normalize_dataframe_empty_values(cls, df: pd.DataFrame, fields: list[str]) -> pd.DataFrame: """规范化DataFrame中指定字段的空值 将DataFrame中指定字段的空值(NaN、None、空字符串)统一替换为格式化的空值标记。 适用于批量数据处理场景。 Args: df: 需要处理的DataFrame fields: 需要处理的字段列表 Returns: pd.DataFrame: 空值规范化后的DataFrame副本 Examples: >>> import pandas as pd >>> df = pd.DataFrame({ ... "contract_no": ["PO-001", "", None], ... "product_model": ["阀门A", "阀门B", "阀门C"] ... }) >>> cleaned = IDGenerator.normalize_dataframe_empty_values(df, ["contract_no"]) >>> print(cleaned["contract_no"][1]) # 输出: "【空白合同号】" """ df_clean = df.copy() for field in fields: if field not in df_clean.columns: continue # 获取该字段的空值标记 empty_marker = cls._get_empty_marker(field) # 替换空值 df_clean[field] = df_clean[field].fillna(empty_marker) df_clean[field] = df_clean[field].replace(["", "nan", "None"], empty_marker) # 确保转换为字符串 df_clean[field] = df_clean[field].astype(str).str.strip() return df_clean
[文档] @classmethod def get_all_unique_key_fields(cls) -> list[str]: """获取所有唯一键字段(去重合并,供 import_deduplication 使用) Returns: list[str]: 去重后的唯一键字段列表 """ new_fields = cls._get_new_data_fields() old_fields = cls._get_old_data_fields() seen = set() result = [] for f in new_fields + old_fields: if f not in seen: seen.add(f) result.append(f) return result