#!/usr/bin/env python3 import argparse import re from collections import defaultdict from dataclasses import dataclass from typing import Dict, Iterable, List, Optional, Sequence, Tuple import pymysql from openpyxl import Workbook, load_workbook from openpyxl.styles import Font from config import DB_CONFIG @dataclass(frozen=True) class LawyerRecord: id: int name: str phone: str law_firm: str province: str city: str domain: str create_time: int @dataclass(frozen=True) class PhoneBackfill: matched_phones: List[str] records: List[LawyerRecord] best_name: str best_law_firm: str best_domain: str candidate_names: List[str] candidate_firms: List[str] candidate_domains: List[str] DOMAIN_PRIORITY = { "华律": 90, "大律师": 85, "找法网": 82, "法律快车": 80, "律图": 72, "众法利单页": 68, "众法利": 66, "六四三六五": 64, "智飞律师在线": 40, "高德地图": 10, } GENERIC_FIRMS = {"高德搜索"} def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="按律所名从数据库补手机号并导出对比表") parser.add_argument("--input", default="man.xlsx", help="原始 xlsx 文件路径") parser.add_argument( "--output", default="man_firm_phone_compare.xlsx", help="输出 xlsx 文件路径", ) return parser.parse_args() def normalize_text(value: object) -> str: text = str(value or "").strip() text = text.replace("(", "(").replace(")", ")") text = re.sub(r"\s+", "", text) return text def normalize_firm(value: object) -> str: text = normalize_text(value) text = text.replace("本地大所", "").replace("特色律所", "") return text def normalize_name(value: object) -> str: text = normalize_text(value) return text.replace("律师", "") def normalize_province(value: object) -> str: text = str(value or "").strip() mapping = { "北京市": "北京", "天津市": "天津", "上海市": "上海", "重庆市": "重庆", "内蒙古自治区": "内蒙古", "广西壮族自治区": "广西", "宁夏回族自治区": "宁夏", "新疆维吾尔自治区": "新疆", "西藏自治区": "西藏", "香港特别行政区": "香港", "澳门特别行政区": "澳门", "新疆生产建设兵团": "新疆", } if text in mapping: return mapping[text] if text.endswith("省") and len(text) > 1: return text[:-1] return text def normalize_city(value: object) -> str: text = str(value or "").strip() for suffix in ("市", "地区", "盟"): if text.endswith(suffix) and len(text) > len(suffix): return text[: -len(suffix)] return text def split_phones(value: object) -> List[str]: return re.findall(r"1\d{10}", str(value or "")) def unique_phones(records: Sequence[LawyerRecord]) -> List[str]: output: List[str] = [] seen = set() for record in sorted(records, key=lambda item: (item.create_time, item.id), reverse=True): if record.phone and record.phone not in seen: seen.add(record.phone) output.append(record.phone) return output def unique_values(records: Sequence[LawyerRecord], attr: str) -> List[str]: output: List[str] = [] seen = set() for record in sorted(records, key=lambda item: (item.create_time, item.id), reverse=True): value = getattr(record, attr, "") if value and value not in seen: seen.add(value) output.append(value) return output def phone_record_sort_key( record: LawyerRecord, target_name: object, target_province: object, target_city: object, ) -> Tuple[int, int, int]: score = 0 normalized_target_name = normalize_name(target_name) normalized_target_province = normalize_province(target_province) normalized_target_city = normalize_city(target_city) if normalized_target_name: if normalize_name(record.name) == normalized_target_name: score += 400 elif record.name: score -= 40 if record.law_firm and record.law_firm not in GENERIC_FIRMS: score += 220 elif record.law_firm: score += 40 if record.name: score += 100 if normalized_target_city: if normalize_city(record.city) == normalized_target_city: score += 45 elif record.city: score -= 10 if normalized_target_province: if normalize_province(record.province) == normalized_target_province: score += 25 elif record.province: score -= 5 score += DOMAIN_PRIORITY.get(record.domain, 50) return score, record.create_time, record.id def compare_result(original_phones: Sequence[str], candidate_phones: Sequence[str]) -> str: if not candidate_phones: return "未匹配" if not original_phones: return "原手机号为空" original_set = set(original_phones) candidate_set = set(candidate_phones) if original_set == candidate_set: return "完全一致" if original_set & candidate_set: return "候选包含原手机号" return "不包含原手机号" def infer_firm_from_address(address: object, ordered_firms: Sequence[str]) -> str: normalized_address = normalize_text(address) if not normalized_address: return "" for firm in ordered_firms: if len(firm) < 4: continue if firm in normalized_address: return firm return "" def load_db_indexes() -> Tuple[Dict[str, List[LawyerRecord]], List[str], Dict[str, List[LawyerRecord]]]: conn = pymysql.connect(**DB_CONFIG) firm_index: Dict[str, List[LawyerRecord]] = defaultdict(list) phone_index: Dict[str, List[LawyerRecord]] = defaultdict(list) try: with conn.cursor() as cur: cur.execute( """ SELECT id, name, phone, law_firm, province, city, domain, create_time FROM lawyer WHERE phone IS NOT NULL AND phone <> '' """ ) for row in cur.fetchall(): record = LawyerRecord( id=int(row[0]), name=str(row[1] or "").strip(), phone=str(row[2] or "").strip(), law_firm=str(row[3] or "").strip(), province=str(row[4] or "").strip(), city=str(row[5] or "").strip(), domain=str(row[6] or "").strip(), create_time=int(row[7] or 0), ) phone_index[record.phone].append(record) normalized_firm = normalize_firm(record.law_firm) if normalized_firm: firm_index[normalized_firm].append(record) finally: conn.close() ordered_firms = sorted(firm_index.keys(), key=len, reverse=True) return firm_index, ordered_firms, phone_index def build_phone_backfill( original_phone: object, name: object, province: object, city: object, phone_index: Dict[str, List[LawyerRecord]], ) -> PhoneBackfill: def pick_best_name(records: Sequence[LawyerRecord], target_name: object) -> str: normalized_target_name = normalize_name(target_name) if normalized_target_name: for item in records: if item.name and normalize_name(item.name) == normalized_target_name: return item.name for item in records: if item.name: return item.name return "" records: List[LawyerRecord] = [] seen_ids = set() for phone in split_phones(original_phone): for record in phone_index.get(phone, []): if record.id in seen_ids: continue seen_ids.add(record.id) records.append(record) sorted_records = sorted( records, key=lambda item: phone_record_sort_key(item, name, province, city), reverse=True, ) candidate_names = unique_values(sorted_records, "name") candidate_firms = unique_values( [item for item in sorted_records if item.law_firm and item.law_firm not in GENERIC_FIRMS], "law_firm", ) if not candidate_firms: candidate_firms = unique_values( [item for item in sorted_records if item.law_firm], "law_firm", ) candidate_domains = unique_values(sorted_records, "domain") matched_phones = unique_values(sorted_records, "phone") best_name = pick_best_name(sorted_records, name) best_law_firm = "" best_domain = "" preferred_name = normalize_name(name) or normalize_name(best_name) for record in sorted_records: if not record.law_firm or record.law_firm in GENERIC_FIRMS: continue if preferred_name and normalize_name(record.name) != preferred_name: continue best_law_firm = record.law_firm best_domain = record.domain break if not best_law_firm: for record in sorted_records: if record.law_firm and record.law_firm not in GENERIC_FIRMS: best_law_firm = record.law_firm best_domain = record.domain break if not best_domain and sorted_records: best_domain = sorted_records[0].domain return PhoneBackfill( matched_phones=matched_phones, records=sorted_records, best_name=best_name, best_law_firm=best_law_firm, best_domain=best_domain, candidate_names=candidate_names, candidate_firms=candidate_firms, candidate_domains=candidate_domains, ) def match_row( name: object, original_phone: object, law_firm: object, province: object, city: object, address: object, phone_backfill: PhoneBackfill, firm_index: Dict[str, List[LawyerRecord]], ordered_firms: Sequence[str], ) -> Tuple[str, str, List[LawyerRecord]]: def add_method(part: str, method_parts: List[str]) -> None: if part and part not in method_parts: method_parts.append(part) matched_firm = normalize_firm(law_firm) used_phone_backfill_firm = False inferred_from_address = False if not matched_firm: matched_firm = normalize_firm(phone_backfill.best_law_firm) used_phone_backfill_firm = bool(matched_firm) if not matched_firm: matched_firm = infer_firm_from_address(address, ordered_firms) inferred_from_address = bool(matched_firm) if not matched_firm: return "", "无可用律所名", [] candidates = firm_index.get(matched_firm, []) if not candidates: return matched_firm, "数据库无此律所", [] method_parts = ["律所"] chosen = list(candidates) normalized_name = normalize_name(name) if not normalized_name: normalized_name = normalize_name(phone_backfill.best_name) if normalized_name: name_filtered = [item for item in chosen if normalize_name(item.name) == normalized_name] if name_filtered: chosen = name_filtered add_method("姓名", method_parts) if len(unique_phones(chosen)) != 1: normalized_province = normalize_province(province) normalized_city = normalize_city(city) if normalized_province and normalized_city: province_city_filtered = [ item for item in chosen if normalize_province(item.province) == normalized_province and normalize_city(item.city) == normalized_city ] if province_city_filtered: chosen = province_city_filtered add_method("省份", method_parts) add_method("城市", method_parts) if len(unique_phones(chosen)) != 1 and normalized_city: city_filtered = [ item for item in chosen if normalize_city(item.city) == normalized_city ] if city_filtered: chosen = city_filtered add_method("城市", method_parts) if len(unique_phones(chosen)) != 1 and normalized_province: province_filtered = [ item for item in chosen if normalize_province(item.province) == normalized_province ] if province_filtered: chosen = province_filtered add_method("省份", method_parts) method = "+".join(method_parts) if used_phone_backfill_firm: method = "手机号回填律所|" + method elif inferred_from_address: method = "地址推断律所|" + method return matched_firm, method, chosen def autosize_columns(ws) -> None: for column_cells in ws.columns: values = [str(cell.value or "") for cell in column_cells] max_length = min(max((len(value) for value in values), default=0), 60) column_letter = column_cells[0].column_letter ws.column_dimensions[column_letter].width = max_length + 2 def iter_input_rows(ws) -> Iterable[Tuple[int, List[object]]]: for row_idx in range(1, ws.max_row + 1): yield row_idx, [ws.cell(row_idx, col_idx).value for col_idx in range(1, 8)] def build_output(input_path: str, output_path: str) -> Dict[str, int]: workbook = load_workbook(input_path) source_ws = workbook.active firm_index, ordered_firms, phone_index = load_db_indexes() out_wb = Workbook() out_ws = out_wb.active out_ws.title = "firm_phone_compare" headers = [ "原始行号", "原姓名", "原手机号", "原律所", "原省份", "原城市", "原地址", "原备注", "手机号命中记录数", "手机号命中手机号", "手机号补全姓名", "手机号补全律所", "手机号补全来源", "手机号候选姓名", "手机号候选律所", "用于匹配的律所", "匹配方式", "数据库候选手机号", "候选数量", "原手机号对比", "数据库候选姓名", "数据库候选省市", "数据库来源", ] out_ws.append(headers) for cell in out_ws[1]: cell.font = Font(bold=True) stats = defaultdict(int) for row_idx, row in iter_input_rows(source_ws): name, original_phone, law_firm, province, city, address, remark = row needs_phone_completion = not normalize_firm(law_firm) phone_backfill = build_phone_backfill( original_phone=original_phone, name=name, province=province, city=city, phone_index=phone_index, ) matched_firm, method, matched_records = match_row( name=name, original_phone=original_phone, law_firm=law_firm, province=province, city=city, address=address, phone_backfill=phone_backfill, firm_index=firm_index, ordered_firms=ordered_firms, ) candidate_phones = unique_phones(matched_records) compare = compare_result(split_phones(original_phone), candidate_phones) candidate_names = unique_values(matched_records, "name") candidate_domains = unique_values(matched_records, "domain") city_province_pairs = [] seen_pairs = set() for record in matched_records: pair = f"{record.province}-{record.city}".strip("-") if pair and pair not in seen_pairs: seen_pairs.add(pair) city_province_pairs.append(pair) out_ws.append( [ row_idx, name or "", original_phone or "", law_firm or "", province or "", city or "", address or "", remark or "", len(phone_backfill.records) if needs_phone_completion else "", " / ".join(phone_backfill.matched_phones) if needs_phone_completion else "", phone_backfill.best_name if needs_phone_completion else "", phone_backfill.best_law_firm if needs_phone_completion else "", phone_backfill.best_domain if needs_phone_completion else "", " / ".join(phone_backfill.candidate_names) if needs_phone_completion else "", " / ".join(phone_backfill.candidate_firms) if needs_phone_completion else "", matched_firm or "", method or "", " / ".join(candidate_phones) or "", len(candidate_phones), compare, " / ".join(candidate_names) or "", " / ".join(city_province_pairs) or "", " / ".join(candidate_domains) or "", ] ) if needs_phone_completion and phone_backfill.records: stats["phone_backfill_hit_rows"] += 1 if needs_phone_completion and phone_backfill.best_name: stats["phone_backfill_name_rows"] += 1 if needs_phone_completion and phone_backfill.best_law_firm: stats["phone_backfill_firm_rows"] += 1 if needs_phone_completion and method.startswith("手机号回填律所|"): stats["phone_backfill_used_for_match_rows"] += 1 if candidate_phones: stats["matched_rows"] += 1 if len(candidate_phones) == 1: stats["unique_rows"] += 1 else: stats["multi_rows"] += 1 else: stats["unmatched_rows"] += 1 if compare == "完全一致": stats["same_rows"] += 1 elif compare == "候选包含原手机号": stats["contains_rows"] += 1 elif compare == "不包含原手机号": stats["diff_rows"] += 1 elif compare == "原手机号为空": stats["blank_phone_rows"] += 1 out_ws.freeze_panes = "A2" autosize_columns(out_ws) out_wb.save(output_path) return dict(stats) def main() -> None: args = parse_args() stats = build_output(args.input, args.output) print(f"已生成: {args.output}") for key in sorted(stats): print(f"{key}={stats[key]}") if __name__ == "__main__": main()