566 lines
18 KiB
Python
566 lines
18 KiB
Python
#!/usr/bin/env python3
|
||
import argparse
|
||
import re
|
||
from collections import defaultdict
|
||
from dataclasses import dataclass
|
||
from typing import Dict, Iterable, List, Optional, Sequence, Tuple
|
||
|
||
import pymysql
|
||
from openpyxl import Workbook, load_workbook
|
||
from openpyxl.styles import Font
|
||
|
||
from config import DB_CONFIG
|
||
|
||
|
||
@dataclass(frozen=True)
class LawyerRecord:
    """Immutable snapshot of one lawyer row, holding cleaned column values."""

    id: int  # numeric row identifier; used as the last tie-breaker when sorting
    name: str  # lawyer name as stored (may be empty)
    phone: str  # phone number string
    law_firm: str  # firm name as stored (may be empty or a generic placeholder)
    province: str
    city: str
    domain: str  # source label; looked up in DOMAIN_PRIORITY when ranking
    create_time: int  # integer creation time — presumably an epoch value; confirm
|
||
|
||
|
||
@dataclass(frozen=True)
class PhoneBackfill:
    """Result of looking a spreadsheet row's phone numbers up in the database.

    Groups the records found for the row's phone(s) with the single best
    name / firm / source picked from them and the full candidate lists.
    """

    matched_phones: List[str]  # distinct phones of the records that were found
    records: List[LawyerRecord]  # found records, best-ranked first
    best_name: str  # chosen lawyer name, "" when none is available
    best_law_firm: str  # chosen firm name, "" when none is available
    best_domain: str  # source of the chosen firm (or of the top record)
    candidate_names: List[str]
    candidate_firms: List[str]
    candidate_domains: List[str]
|
||
|
||
|
||
# Ranking bonus per record source (the ``domain`` field of a record).
# A higher number makes a record rank earlier; sources missing from this
# table receive a default of 50 where the score is computed.
DOMAIN_PRIORITY: Dict[str, int] = {
    "华律": 90,
    "大律师": 85,
    "找法网": 82,
    "法律快车": 80,
    "律图": 72,
    "众法利单页": 68,
    "众法利": 66,
    "六四三六五": 64,
    "智飞律师在线": 40,
    "高德地图": 10,
}

# Firm-name values that are placeholders rather than real law firms; records
# carrying one of these are ranked lower and skipped when a real firm exists.
GENERIC_FIRMS = {"高德搜索"}
|
||
|
||
|
||
def parse_args() -> argparse.Namespace:
    """Parse the command line and return the ``--input`` / ``--output`` paths.

    Both options are optional and fall back to file names in the current
    working directory.
    """
    cli = argparse.ArgumentParser(description="按律所名从数据库补手机号并导出对比表")
    # (flag, default value, help text) for every supported option.
    option_table = (
        ("--input", "man.xlsx", "原始 xlsx 文件路径"),
        ("--output", "man_firm_phone_compare.xlsx", "输出 xlsx 文件路径"),
    )
    for flag, fallback, help_text in option_table:
        cli.add_argument(flag, default=fallback, help=help_text)
    return cli.parse_args()
|
||
|
||
|
||
def normalize_text(value: object) -> str:
    """Return *value* as a compact string suitable for equality matching.

    Falsy input (``None``, ``""``, ``0``) becomes ``""``. Full-width
    parentheses are folded to their ASCII forms and every whitespace
    character, including whitespace inside the text, is removed.
    """
    text = str(value or "").strip()
    # Fold full-width parentheses (U+FF08 / U+FF09) to ASCII so the same name
    # typed with either kind of bracket compares equal. Escapes are used so the
    # two look-alike characters cannot be confused or silently normalised.
    text = text.replace("\uff08", "(").replace("\uff09", ")")
    return re.sub(r"\s+", "", text)
|
||
|
||
|
||
def normalize_firm(value: object) -> str:
    """Return the matching key for a law-firm name.

    The name is first cleaned with ``normalize_text``; the marketing tags
    listed below are then removed wherever they occur in the name.
    """
    cleaned = normalize_text(value)
    for tag in ("本地大所", "特色律所"):
        cleaned = cleaned.replace(tag, "")
    return cleaned
|
||
|
||
|
||
def normalize_name(value: object) -> str:
    """Return the matching key for a lawyer name.

    The name is cleaned with ``normalize_text`` and every occurrence of the
    title "律师" is dropped.
    """
    cleaned = normalize_text(value)
    # Splitting on the title and re-joining removes all of its occurrences.
    return "".join(cleaned.split("律师"))
|
||
|
||
|
||
def normalize_province(value: object) -> str:
    """Reduce a province-level name to its short form.

    Names listed in the alias table map to their short form, any other name
    ending in "省" loses that suffix, and everything else is returned
    stripped but otherwise unchanged. Falsy input yields ``""``.
    """
    raw = str(value or "").strip()
    short_forms = {
        "北京市": "北京",
        "天津市": "天津",
        "上海市": "上海",
        "重庆市": "重庆",
        "内蒙古自治区": "内蒙古",
        "广西壮族自治区": "广西",
        "宁夏回族自治区": "宁夏",
        "新疆维吾尔自治区": "新疆",
        "西藏自治区": "西藏",
        "香港特别行政区": "香港",
        "澳门特别行政区": "澳门",
        "新疆生产建设兵团": "新疆",
    }
    alias = short_forms.get(raw)
    if alias is not None:
        return alias
    # A bare "省" is left alone so the result is never emptied by the strip.
    if len(raw) > 1 and raw.endswith("省"):
        return raw[:-1]
    return raw
|
||
|
||
|
||
def normalize_city(value: object) -> str:
    """Strip one administrative suffix ("市", "地区" or "盟") from a city name.

    Suffixes are tried in that order and only the first one that applies is
    removed. A name consisting of nothing but the suffix is kept as is.
    Falsy input yields ``""``.
    """
    raw = str(value or "").strip()
    suffix = next(
        (
            candidate
            for candidate in ("市", "地区", "盟")
            if raw.endswith(candidate) and len(raw) > len(candidate)
        ),
        None,
    )
    if suffix is None:
        return raw
    return raw[: -len(suffix)]
|
||
|
||
|
||
def split_phones(value: object) -> List[str]:
    """Extract every 11-digit run starting with ``1`` from *value*.

    The value is converted to text first, so numeric cells work as well.
    Matches are returned in order of appearance, duplicates included;
    falsy input yields an empty list.
    """
    haystack = str(value) if value else ""
    return [found.group(0) for found in re.finditer(r"1\d{10}", haystack)]
|
||
|
||
|
||
def unique_phones(records: Sequence[LawyerRecord]) -> List[str]:
    """Return the distinct non-empty phones of *records*, newest record first.

    Records are ordered by ``(create_time, id)`` descending, and each phone
    keeps the position of its first occurrence in that order.
    """
    newest_first = sorted(
        records, key=lambda item: (item.create_time, item.id), reverse=True
    )
    # dict.fromkeys keeps first-seen order while dropping repeats.
    distinct = dict.fromkeys(item.phone for item in newest_first if item.phone)
    return list(distinct)
|
||
|
||
|
||
def unique_values(records: Sequence[LawyerRecord], attr: str) -> List[str]:
    """Return the distinct truthy values of attribute *attr*, newest record first.

    Records are ordered by ``(create_time, id)`` descending before the
    values are collected; a record lacking the attribute contributes nothing.
    """
    newest_first = sorted(
        records, key=lambda item: (item.create_time, item.id), reverse=True
    )
    picked = (getattr(item, attr, "") for item in newest_first)
    # dict.fromkeys keeps first-seen order while dropping repeats.
    return list(dict.fromkeys(entry for entry in picked if entry))
|
||
|
||
|
||
def phone_record_sort_key(
    record: LawyerRecord,
    target_name: object,
    target_province: object,
    target_city: object,
) -> Tuple[int, int, int]:
    """Build the ranking key of *record* relative to a spreadsheet row.

    The first element is a relevance score; ``create_time`` and ``id`` follow
    as tie-breakers. A larger key means a better record, so callers sort
    with ``reverse=True``.

    Score contributions:
      * name equals the target name: +400; a different non-empty name: -40
      * a real firm name: +220; a generic placeholder firm: +40
      * any non-empty name: +100
      * city equals the target city: +45; a different non-empty city: -10
      * province equals the target: +25; a different non-empty province: -5
      * the source priority from DOMAIN_PRIORITY (50 for unlisted sources)
    """
    wanted_name = normalize_name(target_name)
    wanted_province = normalize_province(target_province)
    wanted_city = normalize_city(target_city)

    # Start from the source priority; the remaining terms are independent.
    points = DOMAIN_PRIORITY.get(record.domain, 50)

    if record.name:
        points += 100
    if wanted_name:
        if normalize_name(record.name) == wanted_name:
            points += 400
        elif record.name:
            points -= 40

    if record.law_firm:
        points += 40 if record.law_firm in GENERIC_FIRMS else 220

    if wanted_city:
        if normalize_city(record.city) == wanted_city:
            points += 45
        elif record.city:
            points -= 10

    if wanted_province:
        if normalize_province(record.province) == wanted_province:
            points += 25
        elif record.province:
            points -= 5

    return points, record.create_time, record.id
|
||
|
||
|
||
def compare_result(original_phones: Sequence[str], candidate_phones: Sequence[str]) -> str:
    """Describe how the candidate phones relate to the row's original phones.

    The checks run in this order:
      * no candidates            -> "未匹配"
      * no original phones       -> "原手机号为空"
      * both sets are identical  -> "完全一致"
      * the sets share a phone   -> "候选包含原手机号"
      * nothing in common        -> "不包含原手机号"
    """
    if not candidate_phones:
        return "未匹配"
    if not original_phones:
        return "原手机号为空"

    known, found = set(original_phones), set(candidate_phones)
    if known == found:
        return "完全一致"
    return "不包含原手机号" if known.isdisjoint(found) else "候选包含原手机号"
|
||
|
||
|
||
def infer_firm_from_address(address: object, ordered_firms: Sequence[str]) -> str:
    """Return the first firm name from *ordered_firms* found inside *address*.

    The address is cleaned with ``normalize_text`` before the substring test.
    Firm names shorter than four characters are ignored, since such short
    strings are skipped by the original length check. Returns ``""`` when
    the address is empty or nothing matches.
    """
    haystack = normalize_text(address)
    if not haystack:
        return ""
    return next(
        (firm for firm in ordered_firms if len(firm) >= 4 and firm in haystack),
        "",
    )
|
||
|
||
|
||
def load_db_indexes() -> Tuple[Dict[str, List[LawyerRecord]], List[str], Dict[str, List[LawyerRecord]]]:
    """Read every lawyer row that has a phone and index it two ways.

    Returns:
        A ``(firm_index, ordered_firms, phone_index)`` tuple where
        ``firm_index`` maps a normalized firm name to its records,
        ``ordered_firms`` lists those firm keys longest first, and
        ``phone_index`` maps a stripped phone string to its records.

    The connection is opened from ``DB_CONFIG`` and always closed, even when
    the query or the row conversion fails.
    """

    def as_text(cell: object) -> str:
        # NULL columns become "", everything else is stringified and trimmed.
        return str(cell or "").strip()

    by_firm: Dict[str, List[LawyerRecord]] = defaultdict(list)
    by_phone: Dict[str, List[LawyerRecord]] = defaultdict(list)

    connection = pymysql.connect(**DB_CONFIG)
    try:
        with connection.cursor() as cursor:
            cursor.execute(
                """
                SELECT id, name, phone, law_firm, province, city, domain, create_time
                FROM lawyer
                WHERE phone IS NOT NULL
                AND phone <> ''
                """
            )
            # Rows are read by position, matching the SELECT column order.
            for row in cursor.fetchall():
                lawyer = LawyerRecord(
                    id=int(row[0]),
                    name=as_text(row[1]),
                    phone=as_text(row[2]),
                    law_firm=as_text(row[3]),
                    province=as_text(row[4]),
                    city=as_text(row[5]),
                    domain=as_text(row[6]),
                    create_time=int(row[7] or 0),
                )
                by_phone[lawyer.phone].append(lawyer)
                firm_key = normalize_firm(lawyer.law_firm)
                if firm_key:
                    by_firm[firm_key].append(lawyer)
    finally:
        connection.close()

    # Longest names first, so a substring search meets the most specific firm
    # before any shorter name contained in it.
    longest_first = sorted(by_firm, key=len, reverse=True)
    return by_firm, longest_first, by_phone
|
||
|
||
|
||
def build_phone_backfill(
    original_phone: object,
    name: object,
    province: object,
    city: object,
    phone_index: Dict[str, List[LawyerRecord]],
) -> PhoneBackfill:
    """Look the row's phone numbers up and summarise what the database knows.

    Every record stored under one of the phones found in *original_phone* is
    collected once, ranked with ``phone_record_sort_key`` against the row's
    name / province / city, and condensed into a ``PhoneBackfill``.

    Selection rules:
      * best name: the first ranked record whose name equals the row's name,
        otherwise the first ranked record that has any name.
      * best firm: the first ranked record with a real (non-generic) firm
        whose name equals the preferred name; if none qualifies, the first
        ranked record with a real firm.
      * best source: the source of the record that supplied the firm, falling
        back to the top-ranked record when that is empty.
    """

    def has_real_firm(item: LawyerRecord) -> bool:
        return bool(item.law_firm) and item.law_firm not in GENERIC_FIRMS

    # Collect each hit once; dict insertion order keeps first-seen order.
    hits_by_id: Dict[int, LawyerRecord] = {}
    for number in split_phones(original_phone):
        for hit in phone_index.get(number, []):
            hits_by_id.setdefault(hit.id, hit)

    ranked = sorted(
        hits_by_id.values(),
        key=lambda item: phone_record_sort_key(item, name, province, city),
        reverse=True,
    )
    with_real_firm = [item for item in ranked if has_real_firm(item)]

    # Prefer real firms in the candidate list; show generic ones only when
    # nothing better exists.
    candidate_firms = unique_values(with_real_firm, "law_firm")
    if not candidate_firms:
        candidate_firms = unique_values(
            [item for item in ranked if item.law_firm], "law_firm"
        )

    wanted_name = normalize_name(name)
    named = [item for item in ranked if item.name]
    best_name = next(
        (
            item.name
            for item in named
            if wanted_name and normalize_name(item.name) == wanted_name
        ),
        None,
    )
    if best_name is None:
        best_name = named[0].name if named else ""

    preferred_name = wanted_name or normalize_name(best_name)
    firm_source = next(
        (
            item
            for item in with_real_firm
            if not preferred_name or normalize_name(item.name) == preferred_name
        ),
        None,
    )
    if firm_source is None and with_real_firm:
        firm_source = with_real_firm[0]

    best_law_firm = firm_source.law_firm if firm_source is not None else ""
    best_domain = firm_source.domain if firm_source is not None else ""
    if not best_domain and ranked:
        best_domain = ranked[0].domain

    return PhoneBackfill(
        matched_phones=unique_values(ranked, "phone"),
        records=ranked,
        best_name=best_name,
        best_law_firm=best_law_firm,
        best_domain=best_domain,
        candidate_names=unique_values(ranked, "name"),
        candidate_firms=candidate_firms,
        candidate_domains=unique_values(ranked, "domain"),
    )
|
||
|
||
|
||
def match_row(
    name: object,
    original_phone: object,
    law_firm: object,
    province: object,
    city: object,
    address: object,
    phone_backfill: PhoneBackfill,
    firm_index: Dict[str, List[LawyerRecord]],
    ordered_firms: Sequence[str],
) -> Tuple[str, str, List[LawyerRecord]]:
    """Find the database records that belong to one spreadsheet row.

    The firm used for the lookup comes from, in order of preference, the
    row's own firm column, the firm recovered from its phone number, or a
    firm name found inside the address. The firm's records are then narrowed
    by name and, while more than one distinct phone remains, by
    province+city, city, and province. A filter that would leave nothing is
    skipped.

    Returns:
        ``(matched_firm, method, records)``. ``method`` lists the fields that
        narrowed the result joined by "+", prefixed with the firm's origin
        when it did not come from the row itself. When no firm is available
        or the firm is unknown to the index, ``records`` is empty and
        ``method`` holds the reason.

    *original_phone* is accepted but not used by this function.
    """
    firm_key = normalize_firm(law_firm)
    origin_prefix = ""
    if not firm_key:
        firm_key = normalize_firm(phone_backfill.best_law_firm)
        if firm_key:
            origin_prefix = "手机号回填律所|"
    if not firm_key:
        firm_key = infer_firm_from_address(address, ordered_firms)
        if firm_key:
            origin_prefix = "地址推断律所|"
    if not firm_key:
        return "", "无可用律所名", []

    pool = list(firm_index.get(firm_key, []))
    if not pool:
        return firm_key, "数据库无此律所", []

    steps = ["律所"]

    def narrow(keep, labels) -> None:
        # Apply a filter only when it leaves at least one record, and note
        # which fields were used (each label is recorded once).
        nonlocal pool
        survivors = [item for item in pool if keep(item)]
        if not survivors:
            return
        pool = survivors
        for label in labels:
            if label not in steps:
                steps.append(label)

    def is_ambiguous() -> bool:
        return len(unique_phones(pool)) != 1

    wanted_name = normalize_name(name) or normalize_name(phone_backfill.best_name)
    if wanted_name:
        narrow(lambda item: normalize_name(item.name) == wanted_name, ("姓名",))

    if is_ambiguous():
        wanted_province = normalize_province(province)
        wanted_city = normalize_city(city)

        if wanted_province and wanted_city:
            narrow(
                lambda item: normalize_province(item.province) == wanted_province
                and normalize_city(item.city) == wanted_city,
                ("省份", "城市"),
            )
        if wanted_city and is_ambiguous():
            narrow(lambda item: normalize_city(item.city) == wanted_city, ("城市",))
        if wanted_province and is_ambiguous():
            narrow(
                lambda item: normalize_province(item.province) == wanted_province,
                ("省份",),
            )

    return firm_key, origin_prefix + "+".join(steps), pool
|
||
|
||
|
||
def autosize_columns(ws) -> None:
    """Set each column's width from the widest value it contains.

    Width is measured in display columns rather than characters: East Asian
    wide and full-width characters count as two, because they render about
    twice as wide as Latin text. Plain ``len()`` would leave columns holding
    Chinese text roughly half as wide as needed. The measured width is capped
    at 60 and padded by 2.

    Args:
        ws: a worksheet-like object exposing ``columns`` (an iterable of cell
            tuples) and ``column_dimensions`` (a mapping keyed by column
            letter whose values accept a ``width`` attribute).
    """
    from unicodedata import east_asian_width

    def display_width(value: object) -> int:
        # Only None is treated as empty, so a numeric 0 still counts as "0".
        text = "" if value is None else str(value)
        return sum(2 if east_asian_width(ch) in ("W", "F") else 1 for ch in text)

    for column_cells in ws.columns:
        if not column_cells:
            # Nothing to measure and no cell to take the column letter from.
            continue
        widest = max(display_width(cell.value) for cell in column_cells)
        column_letter = column_cells[0].column_letter
        ws.column_dimensions[column_letter].width = min(widest, 60) + 2
|
||
|
||
|
||
def iter_input_rows(ws) -> Iterable[Tuple[int, List[object]]]:
    """Yield ``(row_number, values)`` for every row of worksheet *ws*.

    Rows run from 1 to ``ws.max_row`` inclusive, and ``values`` always holds
    the contents of the first seven columns, in column order.
    """
    last_row = ws.max_row
    for row_number in range(1, last_row + 1):
        values: List[object] = []
        for column_number in range(1, 8):
            values.append(ws.cell(row_number, column_number).value)
        yield row_number, values
|
||
|
||
|
||
def build_output(input_path: str, output_path: str) -> Dict[str, int]:
    """Match every input row against the database and write the comparison sheet.

    The active sheet of *input_path* is read row by row (columns: name,
    phone, firm, province, city, address, remark). For each row the phone
    lookup and the firm-based match are run, and one line is appended to a
    new workbook saved at *output_path*. The phone-lookup columns are filled
    only for rows whose own firm column is empty.

    Returns:
        A plain dict of counters describing the run; a counter is present
        only if it was incremented at least once.
    """
    source_ws = load_workbook(input_path).active
    firm_index, ordered_firms, phone_index = load_db_indexes()

    out_wb = Workbook()
    out_ws = out_wb.active
    out_ws.title = "firm_phone_compare"
    out_ws.append(
        [
            "原始行号",
            "原姓名",
            "原手机号",
            "原律所",
            "原省份",
            "原城市",
            "原地址",
            "原备注",
            "手机号命中记录数",
            "手机号命中手机号",
            "手机号补全姓名",
            "手机号补全律所",
            "手机号补全来源",
            "手机号候选姓名",
            "手机号候选律所",
            "用于匹配的律所",
            "匹配方式",
            "数据库候选手机号",
            "候选数量",
            "原手机号对比",
            "数据库候选姓名",
            "数据库候选省市",
            "数据库来源",
        ]
    )
    for header_cell in out_ws[1]:
        header_cell.font = Font(bold=True)

    # Counter fed by each comparison verdict; "未匹配" has no counter of its own.
    verdict_counters = {
        "完全一致": "same_rows",
        "候选包含原手机号": "contains_rows",
        "不包含原手机号": "diff_rows",
        "原手机号为空": "blank_phone_rows",
    }
    separator = " / "
    stats = defaultdict(int)

    for row_idx, cells in iter_input_rows(source_ws):
        name, original_phone, law_firm, province, city, address, remark = cells
        # Phone-derived details are reported only when the row has no firm.
        needs_phone_completion = not normalize_firm(law_firm)

        phone_backfill = build_phone_backfill(
            original_phone=original_phone,
            name=name,
            province=province,
            city=city,
            phone_index=phone_index,
        )
        matched_firm, method, matched_records = match_row(
            name=name,
            original_phone=original_phone,
            law_firm=law_firm,
            province=province,
            city=city,
            address=address,
            phone_backfill=phone_backfill,
            firm_index=firm_index,
            ordered_firms=ordered_firms,
        )
        candidate_phones = unique_phones(matched_records)
        compare = compare_result(split_phones(original_phone), candidate_phones)

        # Distinct "province-city" labels in record order; a label that is
        # empty after trimming dashes is dropped.
        labels = dict.fromkeys(
            f"{record.province}-{record.city}".strip("-") for record in matched_records
        )
        city_province_pairs = [label for label in labels if label]

        source_cells = [
            value or ""
            for value in (name, original_phone, law_firm, province, city, address, remark)
        ]
        if needs_phone_completion:
            backfill_cells = [
                len(phone_backfill.records),
                separator.join(phone_backfill.matched_phones),
                phone_backfill.best_name,
                phone_backfill.best_law_firm,
                phone_backfill.best_domain,
                separator.join(phone_backfill.candidate_names),
                separator.join(phone_backfill.candidate_firms),
            ]
        else:
            backfill_cells = [""] * 7
        match_cells = [
            matched_firm or "",
            method or "",
            separator.join(candidate_phones),
            len(candidate_phones),
            compare,
            separator.join(unique_values(matched_records, "name")),
            separator.join(city_province_pairs),
            separator.join(unique_values(matched_records, "domain")),
        ]
        out_ws.append([row_idx, *source_cells, *backfill_cells, *match_cells])

        if needs_phone_completion:
            if phone_backfill.records:
                stats["phone_backfill_hit_rows"] += 1
            if phone_backfill.best_name:
                stats["phone_backfill_name_rows"] += 1
            if phone_backfill.best_law_firm:
                stats["phone_backfill_firm_rows"] += 1
            if method.startswith("手机号回填律所|"):
                stats["phone_backfill_used_for_match_rows"] += 1

        if candidate_phones:
            stats["matched_rows"] += 1
            stats["unique_rows" if len(candidate_phones) == 1 else "multi_rows"] += 1
        else:
            stats["unmatched_rows"] += 1

        verdict_counter = verdict_counters.get(compare)
        if verdict_counter:
            stats[verdict_counter] += 1

    out_ws.freeze_panes = "A2"
    autosize_columns(out_ws)
    out_wb.save(output_path)
    return dict(stats)
|
||
|
||
|
||
def main() -> None:
    """Run the export from the command line and print a summary.

    Prints the output path followed by one ``key=value`` line per counter,
    in alphabetical key order.
    """
    options = parse_args()
    summary = build_output(options.input, options.output)
    print(f"已生成: {options.output}")
    for label, count in sorted(summary.items()):
        print(f"{label}={count}")
|
||
|
||
|
||
# Run the export only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|