#!/usr/bin/env python3
"""Export rows of the ``lawyer`` table to an Excel (.xlsx) workbook.

Rows can be filtered by ``create_time`` range, domain, province, city and a
free-text keyword.  When no time range is given on the command line, only
rows created during the last seven days are exported.
"""
import argparse
import json
import os
import sys
import time
from datetime import datetime
from typing import Dict, List, Optional, Tuple

import pymysql
from openpyxl import Workbook
from openpyxl.styles import Font

# Put the parent directory of this script's directory on sys.path before
# importing ``Db`` (presumably that is where the project-local module lives).
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
if project_root not in sys.path:
    sys.path.insert(0, project_root)

from Db import Db  # noqa: E402  (must come after the sys.path tweak above)

# Look-back window applied when neither --start-ts nor --end-ts is given.
DEFAULT_WINDOW_SECONDS = 7 * 24 * 3600

# Directory that receives the workbook when --output is not given.
DEFAULT_OUTPUT_DIR = "/www/wwwroot/lawyers/data"


def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the export script."""
    parser = argparse.ArgumentParser(description="导出律师数据到 Excel")
    parser.add_argument(
        "--output",
        default="",
        help="输出 xlsx 文件路径,默认输出到 data/export_lawyers_时间戳.xlsx",
    )
    parser.add_argument(
        "--start-ts",
        type=int,
        default=None,
        help="create_time 起始时间戳(含),不传时默认取最近7天",
    )
    parser.add_argument(
        "--end-ts",
        type=int,
        default=None,
        help="create_time 结束时间戳(含),默认不限制上限",
    )
    parser.add_argument(
        "--domain",
        default="",
        help="按 domain 过滤,例如:大律师 / 找法网 / 华律",
    )
    parser.add_argument(
        "--province",
        default="",
        help="按省份过滤,例如:北京、广东",
    )
    parser.add_argument(
        "--city",
        default="",
        help="按城市过滤,例如:北京、深圳",
    )
    parser.add_argument(
        "--keyword",
        default="",
        help="关键词过滤(匹配姓名/律所/手机号)",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=0,
        help="最多导出多少条,0 表示不限",
    )
    parser.add_argument(
        "--include-extra",
        action="store_true",
        help="导出更多扩展字段(url/domain/create_time/site_time 等)",
    )
    parser.add_argument(
        "--no-parse-params",
        action="store_true",
        help="关闭 params JSON 扩展信息解析(默认开启)",
    )
    return parser.parse_args()


def apply_default_time_filter(args: argparse.Namespace) -> None:
    """Normalise ``args.start_ts`` / ``args.end_ts`` in place.

    After this call both attributes are integers, with ``0`` meaning
    "no bound on this side".  If neither was supplied, ``start_ts`` is set to
    seven days before the current time and ``end_ts`` is left unbounded.
    """
    if args.start_ts is None and args.end_ts is None:
        # No explicit time range: default to the most recent seven days.
        args.start_ts = int(time.time()) - DEFAULT_WINDOW_SECONDS
        args.end_ts = 0
        return

    if args.start_ts is None:
        args.start_ts = 0
    if args.end_ts is None:
        args.end_ts = 0


def build_output_path(user_output: str) -> str:
    """Return the absolute path of the workbook to write.

    Args:
        user_output: Path given via ``--output``; an empty string selects a
            timestamped file name under ``DEFAULT_OUTPUT_DIR``.
    """
    if user_output:
        return os.path.abspath(user_output)
    ts = int(time.time())
    return os.path.abspath(f"{DEFAULT_OUTPUT_DIR}/export_lawyers_{ts}.xlsx")


def ts_to_text(ts_value: Optional[int]) -> str:
    """Format a Unix timestamp as ``YYYY-MM-DD HH:MM:SS`` in local time.

    Returns an empty string for ``None``, ``0``, ``""`` and for any value
    that cannot be converted to a representable timestamp.
    """
    if ts_value in (None, 0, ""):
        return ""
    try:
        return datetime.fromtimestamp(int(ts_value)).strftime("%Y-%m-%d %H:%M:%S")
    except (TypeError, ValueError, OverflowError, OSError):
        # int() rejects non-numeric input (TypeError/ValueError);
        # fromtimestamp() rejects out-of-range values (the other three).
        return ""


def build_query(args: argparse.Namespace) -> Tuple[str, List]:
    """Build the parameterised SELECT statement for the export.

    Args:
        args: Parsed options whose ``start_ts`` / ``end_ts`` have already
            been normalised to integers by ``apply_default_time_filter``.

    Returns:
        A ``(sql, params)`` pair suitable for ``cursor.execute(sql, params)``.
    """
    where: List[str] = []
    params: List = []

    if args.start_ts > 0:
        where.append("create_time >= %s")
        params.append(args.start_ts)
    if args.end_ts > 0:
        where.append("create_time <= %s")
        params.append(args.end_ts)

    # Exact-match filters; surrounding whitespace in the option is ignored.
    for column, raw_value in (
        ("domain", args.domain),
        ("province", args.province),
        ("city", args.city),
    ):
        value = raw_value.strip()
        if value:
            where.append(f"{column} = %s")
            params.append(value)

    keyword = args.keyword.strip()
    if keyword:
        # NOTE: "%" and "_" inside the keyword are not escaped, so they act
        # as SQL LIKE wildcards.
        like = f"%{keyword}%"
        where.append("(name LIKE %s OR law_firm LIKE %s OR phone LIKE %s)")
        params.extend([like, like, like])

    where_sql = f"WHERE {' AND '.join(where)}" if where else ""
    # The limit is coerced to int before interpolation, so it is safe to
    # embed directly; zero or negative means "no limit".
    limit_sql = f"LIMIT {int(args.limit)}" if args.limit and args.limit > 0 else ""
    sql = (
        "SELECT id, name, phone, law_firm, province, city, url, domain, "
        "create_time, site_time, params "
        f"FROM lawyer {where_sql} ORDER BY id ASC {limit_sql}"
    )
    return sql, params


def parse_params(params_text: str) -> Dict[str, str]:
    """Extract the exported fields from a row's ``params`` JSON document.

    The document is expected to be a JSON object with optional ``profile``
    and ``source`` sub-objects.  Anything that is missing or of the wrong
    type is treated as empty.

    Returns:
        An empty dict when ``params_text`` is empty, is not valid JSON or is
        not a JSON object; otherwise a dict with the keys ``email``,
        ``address``, ``license_no``, ``practice_years``, ``specialties``,
        ``source_site``, ``detail_url`` and ``list_url`` (all strings).
    """
    if not params_text:
        return {}
    try:
        data = json.loads(params_text)
    except (TypeError, ValueError, RecursionError):
        # Best effort: an unparsable document just yields no extra columns.
        return {}
    if not isinstance(data, dict):
        return {}

    profile = data.get("profile") or {}
    source = data.get("source") or {}
    if not isinstance(profile, dict):
        profile = {}
    if not isinstance(source, dict):
        source = {}

    specialties = profile.get("specialties")
    if isinstance(specialties, list):
        # Falsy entries (None, "", 0) are dropped before joining.
        specialties_text = ",".join(str(x) for x in specialties if x)
    else:
        specialties_text = ""

    return {
        "email": str(profile.get("email") or ""),
        "address": str(profile.get("address") or ""),
        "license_no": str(profile.get("license_no") or ""),
        "practice_years": str(profile.get("practice_years") or ""),
        "specialties": specialties_text,
        "source_site": str(source.get("site") or ""),
        "detail_url": str(source.get("detail_url") or ""),
        "list_url": str(source.get("list_url") or ""),
    }


def export_to_excel(
    rows: List[Dict],
    output_path: str,
    include_extra: bool,
    parse_params_flag: bool,
) -> int:
    """Write ``rows`` to a single-sheet workbook at ``output_path``.

    Args:
        rows: Row dicts keyed by the column names selected in ``build_query``.
        output_path: Destination file; its parent directory is created if
            it does not exist.
        include_extra: Also emit URL, domain, raw and formatted timestamps
            and the row id.
        parse_params_flag: Also emit the fields extracted by
            ``parse_params`` from each row's ``params`` column.

    Returns:
        The number of data rows written (the header row is not counted).
    """
    wb = Workbook()
    ws = wb.active
    ws.title = "lawyers"

    headers = ["手机号", "姓名", "律所", "省份", "市区", "站点名称", "domain"]
    if include_extra:
        headers.extend(
            [
                "URL",
                "站点",
                "create_time",
                "create_time_text",
                "site_time",
                "site_time_text",
                "ID",
            ]
        )
    if parse_params_flag:
        headers.extend(
            [
                "邮箱",
                "地址",
                "执业证号",
                "执业年限",
                "擅长领域",
                "source_site",
                "detail_url",
                "list_url",
            ]
        )

    ws.append(headers)
    for cell in ws[1]:
        cell.font = Font(bold=True)

    exported = 0
    for row in rows:
        info = parse_params(row.get("params", "") or "") if parse_params_flag else {}
        # Prefer the site name recorded in params; fall back to the domain.
        site_name = info.get("source_site") or (row.get("domain", "") or "")
        line = [
            row.get("phone", "") or "",
            row.get("name", "") or "",
            row.get("law_firm", "") or "",
            row.get("province", "") or "",
            row.get("city", "") or "",
            site_name,
            row.get("domain", "") or "",
        ]
        if include_extra:
            line.extend(
                [
                    row.get("url", "") or "",
                    row.get("domain", "") or "",
                    row.get("create_time", "") or "",
                    ts_to_text(row.get("create_time")),
                    row.get("site_time", "") or "",
                    ts_to_text(row.get("site_time")),
                    row.get("id", "") or "",
                ]
            )
        if parse_params_flag:
            line.extend(
                [
                    info.get("email", ""),
                    info.get("address", ""),
                    info.get("license_no", ""),
                    info.get("practice_years", ""),
                    info.get("specialties", ""),
                    info.get("source_site", ""),
                    info.get("detail_url", ""),
                    info.get("list_url", ""),
                ]
            )
        ws.append(line)
        exported += 1

    # dirname() is "" for a bare file name; fall back to the current directory.
    os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
    wb.save(output_path)
    return exported


def main() -> None:
    """Run the export: parse options, query the database, write the workbook."""
    args = parse_args()
    apply_default_time_filter(args)
    output_path = build_output_path(args.output)
    sql, sql_params = build_query(args)

    with Db() as db:
        # DictCursor yields each row as a dict keyed by column name.
        cursor = db.db.cursor(pymysql.cursors.DictCursor)
        try:
            cursor.execute(sql, sql_params)
            rows = cursor.fetchall()
        finally:
            cursor.close()

    count = export_to_excel(
        rows=rows,
        output_path=output_path,
        include_extra=args.include_extra,
        parse_params_flag=not args.no_parse_params,
    )
    print(f"[export] 导出完成,共 {count} 条")
    print(f"[export] 文件路径: {output_path}")
    print(
        f"[export] 时间筛选 create_time: start={args.start_ts or '-'} end={args.end_ts or '-'}"
    )


if __name__ == "__main__":
    main()