"""唯一键生成器 - 分层策略(配置驱动)
新数据(有生产令号):用配置的稳定字段
老数据(无生产令号):用配置的内容字段
"""
import hashlib
import json
from datetime import datetime
from typing import Any
import pandas as pd
from loguru import logger
[文档]
class IDGenerator:
"""分层唯一键生成器(配置驱动)
提供合格证编号生成、基于数据内容的唯一键生成及批量生成功能。
唯一键采用分层策略,通过配置文件 id_generator 节点驱动字段选择。
分层策略:
- 新数据模式:当配置的 new_data_fields 字段全部有值时使用
(默认: sales_order_no + production_order_no)
- 老数据模式:当 new_data_fields 任一为空时降级使用
(默认: 9个内容字段)
配置节点:
id_generator.new_data_fields: 新数据稳定字段列表
id_generator.old_data_fields: 老数据内容匹配字段列表
id_generator.empty_marker_prefix: 空值标记前缀(默认 "【空白")
向后兼容:
generate_unique_key(data, keys) 指定 keys 参数时走旧逻辑
主要功能:
- generate_certificate_no(): 生成合格证编号
- generate_unique_key(): 生成MD5唯一键(支持分层策略)
- generate_unique_key_readable(): 生成可读唯一键(调试用)
- generate_batch_unique_keys(): 批量生成唯一键
- get_all_unique_key_fields(): 获取所有唯一键字段(供去重配置使用)
- normalize_empty_values(): 空值规范化
- normalize_dataframe_empty_values(): DataFrame空值规范化
"""
_field_name_cn_cache: dict[str, str] | None = None
_new_data_fields_cache: list[str] | None = None
_old_data_fields_cache: list[str] | None = None
_empty_marker_prefix_cache: str | None = None
@classmethod
def _get_new_data_fields(cls) -> list[str]:
"""获取新数据模式字段(从配置读取,带缓存)"""
if cls._new_data_fields_cache is not None:
return cls._new_data_fields_cache
try:
from certflow.config.settings import _cfg
cls._new_data_fields_cache = _cfg(
"id_generator.new_data_fields", ["sales_order_no", "production_order_no"]
)
except Exception as e:
logger.warning(f"读取 new_data_fields 配置失败: {e},使用默认值")
cls._new_data_fields_cache = ["sales_order_no", "production_order_no"]
return cls._new_data_fields_cache
@classmethod
def _get_old_data_fields(cls) -> list[str]:
"""获取老数据模式字段(从配置读取,带缓存)"""
if cls._old_data_fields_cache is not None:
return cls._old_data_fields_cache
try:
from certflow.config.settings import _cfg
cls._old_data_fields_cache = _cfg(
"id_generator.old_data_fields",
[
"plan_date",
"customer",
"project_name",
"product_name",
"product_model",
"product_spec",
"quantity",
],
)
except Exception as e:
logger.warning(f"读取 old_data_fields 配置失败: {e},使用默认值")
cls._old_data_fields_cache = [
"plan_date",
"customer",
"project_name",
"product_name",
"product_model",
"product_spec",
"quantity",
]
return cls._old_data_fields_cache
@classmethod
def _get_empty_marker_prefix(cls) -> str:
"""获取空值标记前缀(从配置读取,带缓存)
用于识别被 normalize 处理过的空值标记,如"【空白合同号】"。
当分层策略中 new_data_fields 的值为该前缀开头时,视为空值。
Returns:
str: 空值标记前缀,默认 "【空白"
"""
if cls._empty_marker_prefix_cache is not None:
return cls._empty_marker_prefix_cache
try:
from certflow.config.settings import _cfg
cls._empty_marker_prefix_cache = _cfg("id_generator.empty_marker_prefix", "【空白")
except Exception as e:
logger.warning(f"读取 empty_marker_prefix 配置失败: {e},使用默认值")
cls._empty_marker_prefix_cache = "【空白"
return cls._empty_marker_prefix_cache
@classmethod
def _get_field_name_cn(cls) -> dict[str, str]:
"""获取英文字段名到中文字段名的映射
从配置文件的 sales_plan.column_mapping.direct 反向构建映射关系。
用于生成空值标记时的中文显示。
Returns:
Dict[str, str]: 英文字段名到中文字段名的映射字典
Examples:
>>> mapping = IDGenerator._get_field_name_cn()
>>> print(mapping.get("contract_no"))
"合同号"
"""
if cls._field_name_cn_cache is not None:
return cls._field_name_cn_cache
try:
from certflow.config.settings import _cfg
# 从 direct 映射反向构建
direct_mapping = _cfg("sales_plan.column_mapping.direct", {})
# direct_mapping 格式: {"合同号": "contract_no", "计划日期": "plan_date", ...}
# 反向映射: 英文字段名 -> 中文字段名
reverse_mapping = {}
for cn_name, en_field in direct_mapping.items():
reverse_mapping[en_field] = cn_name
cls._field_name_cn_cache = reverse_mapping
logger.debug(f"加载字段中文名映射: {len(reverse_mapping)} 个字段")
except Exception as e:
logger.warning(f"加载字段中文名映射失败: {e},将使用英文字段名")
cls._field_name_cn_cache = {}
return cls._field_name_cn_cache
@classmethod
def _get_empty_marker(cls, field_name: str) -> str:
"""获取空值标记
生成统一格式的空值标记,用于在数据中标识缺失字段。
格式为 "【空白中文字段名】"。
Args:
field_name: 英文字段名
Returns:
str: 空值标记字符串,如"【空白合同号】"
Examples:
>>> marker = IDGenerator._get_empty_marker("contract_no")
>>> print(marker)
"【空白合同号】"
"""
cn_map = cls._get_field_name_cn()
cn_name = cn_map.get(field_name, field_name)
return f"【空白{cn_name}】"
[文档]
@staticmethod
def generate_certificate_no(prefix: str = "CERT", sequence: int | None = None) -> str:
"""生成合格证编号
生成格式为 CERT-YYYYMMDD-NNNN 的合格证编号,
序号部分自动补零至4位。
Args:
prefix: 编号前缀,默认为"CERT"
sequence: 序号,如果为None则默认为1
Returns:
str: 生成的合格证编号字符串
Examples:
>>> # 生成默认格式的编号
>>> cert_no = IDGenerator.generate_certificate_no()
>>> print(cert_no) # 输出: CERT-20231201-0001
>>>
>>> # 生成自定义前缀和序号
>>> cert_no = IDGenerator.generate_certificate_no(prefix="QT", sequence=5)
>>> print(cert_no) # 输出: QT-20231201-0005
"""
date_str = datetime.now().strftime("%Y%m%d")
if sequence is None:
sequence = 1
return f"{prefix}-{date_str}-{sequence:04d}"
[文档]
@classmethod
def generate_unique_key(cls, data: dict[str, Any], keys: list[str] | None = None) -> str:
"""生成唯一键(分层策略)
Args:
data: 数据字典,包含需要生成唯一键的字段
keys: 指定字段列表。有值时走旧逻辑(向后兼容),
None 时走分层策略
Returns:
str: 16位MD5哈希值
"""
# 指定 keys:走旧逻辑
if keys:
filtered_data = {k: data.get(k, "") for k in keys}
exclude_keys = ["_original_order", "_sort_group"]
filtered_data = {k: v for k, v in filtered_data.items() if k not in exclude_keys}
processed_data = {}
for k, v in filtered_data.items():
if v is None or v == "":
processed_data[k] = cls._get_empty_marker(k)
else:
processed_data[k] = str(v).strip()
key_str = json.dumps(processed_data, sort_keys=True, ensure_ascii=False)
return hashlib.md5(key_str.encode("utf-8")).hexdigest()[:16]
# 分层策略
new_fields = cls._get_new_data_fields()
old_fields = cls._get_old_data_fields()
new_values = []
for f in new_fields:
v = str(data.get(f, "")).strip()
if v.startswith(cls._get_empty_marker_prefix()):
v = ""
new_values.append(v)
if all(new_values):
key_str = "|".join(new_values)
else:
parts = []
for field in old_fields:
v = str(data.get(field, "")).strip()
parts.append(v if v else cls._get_empty_marker(field))
key_str = "|".join(parts)
return hashlib.md5(key_str.encode("utf-8")).hexdigest()[:16]
[文档]
@classmethod
def generate_unique_key_readable(
cls, data: dict[str, Any], keys: list[str] | None = None
) -> str:
"""生成可读唯一键(分层策略,调试用)
Args:
data: 数据字典
keys: 指定字段列表。有值时走旧逻辑,None 时走分层策略
Returns:
str: 可读的唯一键字符串,字段值之间用"::"分隔
"""
if keys:
filtered_data = {k: data.get(k, "") for k in keys}
exclude_keys = ["_original_order", "_sort_group"]
filtered_data = {k: v for k, v in filtered_data.items() if k not in exclude_keys}
key_parts = []
for k in keys:
v = filtered_data.get(k, "")
if v is None or v == "":
key_parts.append(cls._get_empty_marker(k))
else:
key_parts.append(str(v).strip())
return "::".join(key_parts)
new_fields = cls._get_new_data_fields()
old_fields = cls._get_old_data_fields()
new_values = []
for f in new_fields:
v = str(data.get(f, "")).strip()
if v.startswith(cls._get_empty_marker_prefix()):
v = ""
new_values.append(v)
if all(new_values):
return "::".join(new_values)
parts = []
for field in old_fields:
v = str(data.get(field, "")).strip()
parts.append(v if v else cls._get_empty_marker(field))
return "::".join(parts)
[文档]
@staticmethod
def generate_batch_unique_keys(
data_list: list[dict[str, Any]], keys: list[str] | None = None
) -> list[dict[str, Any]]:
"""批量生成唯一键
为数据列表中的每条记录生成唯一键,并将键值写入"unique_key"字段。
Args:
data_list: 数据字典列表
keys: 用于生成唯一键的字段列表,如果为None则使用所有字段
Returns:
List[Dict[str, Any]]: 添加了unique_key字段的数据列表
Examples:
>>> data_list = [
... {"contract_no": "PO-001", "product_model": "阀门A"},
... {"contract_no": "PO-002", "product_model": "阀门B"}
... ]
>>> result = IDGenerator.generate_batch_unique_keys(data_list, ["contract_no"])
>>> for item in result:
... print(item["unique_key"])
"""
result = []
for data in data_list:
unique_key = IDGenerator.generate_unique_key(data, keys)
data["unique_key"] = unique_key
result.append(data)
return result
[文档]
@classmethod
def normalize_empty_values(
cls, data: dict[str, Any], keys: list[str] | None = None
) -> dict[str, Any]:
"""规范化字典中的空值
将 None 或空字符串转换为 【空白中文字段名】 格式。
用于统一空值表示,便于后续处理和识别。
Args:
data: 原始数据字典
keys: 需要处理的字段列表,如果为None则处理所有字段
Returns:
Dict[str, Any]: 空值规范化后的字典
Examples:
>>> data = {"contract_no": "", "product_model": "阀门A", "quantity": None}
>>> normalized = IDGenerator.normalize_empty_values(data, ["contract_no", "quantity"])
>>> print(normalized["contract_no"]) # 输出: "【空白合同号】"
>>> print(normalized["quantity"]) # 输出: "【空白数量】"
"""
if keys is None:
keys = list(data.keys())
normalized = {}
for k in keys:
v = data.get(k, "")
if v is None or v == "":
normalized[k] = cls._get_empty_marker(k)
else:
normalized[k] = str(v).strip()
return normalized
[文档]
@classmethod
def normalize_dataframe_empty_values(cls, df: pd.DataFrame, fields: list[str]) -> pd.DataFrame:
"""规范化DataFrame中指定字段的空值
将DataFrame中指定字段的空值(NaN、None、空字符串)统一替换为格式化的空值标记。
适用于批量数据处理场景。
Args:
df: 需要处理的DataFrame
fields: 需要处理的字段列表
Returns:
pd.DataFrame: 空值规范化后的DataFrame副本
Examples:
>>> import pandas as pd
>>> df = pd.DataFrame({
... "contract_no": ["PO-001", "", None],
... "product_model": ["阀门A", "阀门B", "阀门C"]
... })
>>> cleaned = IDGenerator.normalize_dataframe_empty_values(df, ["contract_no"])
>>> print(cleaned["contract_no"][1]) # 输出: "【空白合同号】"
"""
df_clean = df.copy()
for field in fields:
if field not in df_clean.columns:
continue
# 获取该字段的空值标记
empty_marker = cls._get_empty_marker(field)
# 替换空值
df_clean[field] = df_clean[field].fillna(empty_marker)
df_clean[field] = df_clean[field].replace(["", "nan", "None"], empty_marker)
# 确保转换为字符串
df_clean[field] = df_clean[field].astype(str).str.strip()
return df_clean
[文档]
@classmethod
def get_all_unique_key_fields(cls) -> list[str]:
"""获取所有唯一键字段(去重合并,供 import_deduplication 使用)
Returns:
list[str]: 去重后的唯一键字段列表
"""
new_fields = cls._get_new_data_fields()
old_fields = cls._get_old_data_fields()
seen = set()
result = []
for f in new_fields + old_fields:
if f not in seen:
seen.add(f)
result.append(f)
return result