Files
lawyers/common_sites/export_lawyers_excel.py
2026-03-20 10:07:48 +08:00

377 lines
11 KiB
Python

#!/usr/bin/env python3
import argparse
import json
import os
import sys
import time
from datetime import datetime
from typing import Dict, List, Optional
import pymysql
from openpyxl import Workbook
from openpyxl.styles import Font
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
if project_root not in sys.path:
sys.path.insert(0, project_root)
from Db import Db
DEFAULT_EXPORT_START_TS = 1772932103
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="导出律师数据到 Excel")
parser.add_argument(
"--output",
default="",
help="输出 xlsx 文件路径,默认输出到 data/export_lawyers_时间戳.xlsx",
)
parser.add_argument(
"--start-ts",
type=int,
default=None,
help=(
"create_time 起始时间戳(含),"
f"不传时默认取 {DEFAULT_EXPORT_START_TS} 之后的数据"
),
)
parser.add_argument(
"--end-ts",
type=int,
default=None,
help="create_time 结束时间戳(含),默认不限制上限",
)
parser.add_argument(
"--domain",
default="",
help="按 domain 过滤,例如:大律师 / 找法网 / 华律",
)
parser.add_argument(
"--exclude-domain",
default="",
help="排除指定 domain,例如:高德地图",
)
parser.add_argument(
"--province",
default="",
help="按省份过滤,例如:北京、广东",
)
parser.add_argument(
"--city",
default="",
help="按城市过滤,例如:北京、深圳",
)
parser.add_argument(
"--keyword",
default="",
help="关键词过滤(匹配姓名/律所/手机号)",
)
parser.add_argument(
"--limit",
type=int,
default=0,
help="最多导出多少条,0 表示不限",
)
parser.add_argument(
"--include-extra",
action="store_true",
help="导出更多扩展字段(url/domain/create_time/site_time 等)",
)
parser.add_argument(
"--no-parse-params",
action="store_true",
help="关闭 params JSON 扩展信息解析(默认开启)",
)
parser.add_argument(
"--douyin-only",
action="store_true",
help="仅导出抖音采集数据(domain=抖音),并追加抖音专用字段",
)
return parser.parse_args()
def apply_default_time_filter(args: argparse.Namespace) -> None:
# 未显式传时间范围时,默认导出指定时间戳之后的数据
if args.start_ts is None and args.end_ts is None:
args.start_ts = DEFAULT_EXPORT_START_TS
args.end_ts = 0
return
if args.start_ts is None:
args.start_ts = 0
if args.end_ts is None:
args.end_ts = 0
def build_output_path(user_output: str) -> str:
if user_output:
return os.path.abspath(user_output)
ts = int(time.time())
return os.path.abspath(f"/www/wwwroot/lawyers/data/export_lawyers_{ts}.xlsx")
def ts_to_text(ts_value: Optional[int]) -> str:
if ts_value in (None, 0, ""):
return ""
try:
return datetime.fromtimestamp(int(ts_value)).strftime("%Y-%m-%d %H:%M:%S")
except Exception:
return ""
def build_query(args: argparse.Namespace) -> (str, List):
where: List[str] = []
params: List = []
if args.douyin_only:
target_domain = args.domain.strip() or "抖音"
where.append("domain = %s")
params.append(target_domain)
if args.start_ts > 0:
where.append("create_time >= %s")
params.append(args.start_ts)
if args.end_ts > 0:
where.append("create_time <= %s")
params.append(args.end_ts)
if args.domain.strip() and not args.douyin_only:
where.append("domain = %s")
params.append(args.domain.strip())
if args.exclude_domain.strip():
where.append("domain <> %s")
params.append(args.exclude_domain.strip())
if args.province.strip():
where.append("province = %s")
params.append(args.province.strip())
if args.city.strip():
where.append("city = %s")
params.append(args.city.strip())
if args.keyword.strip():
like = f"%{args.keyword.strip()}%"
where.append("(name LIKE %s OR law_firm LIKE %s OR phone LIKE %s)")
params.extend([like, like, like])
where_sql = f"WHERE {' AND '.join(where)}" if where else ""
limit_sql = f"LIMIT {int(args.limit)}" if args.limit and args.limit > 0 else ""
sql = (
"SELECT id, name, phone, law_firm, province, city, url, domain, "
"create_time, site_time, params "
f"FROM lawyer {where_sql} ORDER BY id ASC {limit_sql}"
)
return sql, params
def parse_params(params_text: str) -> Dict[str, str]:
if not params_text:
return {}
try:
data = json.loads(params_text)
except Exception:
return {}
if not isinstance(data, dict):
return {}
profile = data.get("profile") or {}
source = data.get("source") or {}
if not isinstance(profile, dict):
profile = {}
if not isinstance(source, dict):
source = {}
specialties = profile.get("specialties")
if isinstance(specialties, list):
specialties_text = ",".join(str(x) for x in specialties if x)
else:
specialties_text = ""
user_info = data.get("user_info") or {}
if not isinstance(user_info, dict):
user_info = {}
sec_uid = str(data.get("sec_uid") or "")
douyin_url = f"https://www.douyin.com/user/{sec_uid}" if sec_uid else ""
return {
"email": str(profile.get("email") or ""),
"address": str(profile.get("address") or ""),
"license_no": str(profile.get("license_no") or ""),
"practice_years": str(profile.get("practice_years") or ""),
"specialties": specialties_text,
"source_site": str(source.get("site") or ""),
"detail_url": str(source.get("detail_url") or ""),
"list_url": str(source.get("list_url") or ""),
"api_source": str(data.get("api_source") or ""),
"api_url": str(data.get("api_url") or ""),
"city_index": str(data.get("city_index") or ""),
"captured_at": str(data.get("captured_at") or ""),
"sec_uid": sec_uid,
"douyin_uid": str(user_info.get("uid") or ""),
"douyin_unique_id": str(user_info.get("unique_id") or ""),
"douyin_signature": str(user_info.get("signature") or ""),
"douyin_nickname": str(user_info.get("nickname") or ""),
"douyin_url": douyin_url,
}
def export_to_excel(
rows: List[Dict],
output_path: str,
include_extra: bool,
parse_params_flag: bool,
douyin_only: bool,
) -> int:
wb = Workbook()
ws = wb.active
ws.title = "lawyers"
headers = ["手机号", "姓名", "律所", "省份", "市区", "站点名称", "domain", "URL"]
if include_extra:
headers.extend(
[
"站点",
"create_time",
"create_time_text",
"site_time",
"site_time_text",
"ID",
]
)
if parse_params_flag:
headers.extend(
[
"邮箱",
"地址",
"执业证号",
"执业年限",
"擅长领域",
"source_site",
"detail_url",
"list_url",
]
)
if parse_params_flag and douyin_only:
headers.extend(
[
"sec_uid",
"抖音uid",
"抖音号",
"抖音昵称",
"抖音简介",
"抖音主页URL",
"api_source",
"api_url",
"city_index",
"captured_at",
"captured_at_text",
]
)
ws.append(headers)
for cell in ws[1]:
cell.font = Font(bold=True)
exported = 0
for row in rows:
info = parse_params(row.get("params", "") or "") if parse_params_flag else {}
site_name = info.get("source_site") or (row.get("domain", "") or "")
line = [
row.get("phone", "") or "",
row.get("name", "") or "",
row.get("law_firm", "") or "",
row.get("province", "") or "",
row.get("city", "") or "",
site_name,
row.get("domain", "") or "",
row.get("url", "") or "",
]
if include_extra:
line.extend(
[
row.get("domain", "") or "",
row.get("create_time", "") or "",
ts_to_text(row.get("create_time")),
row.get("site_time", "") or "",
ts_to_text(row.get("site_time")),
row.get("id", "") or "",
]
)
if parse_params_flag:
line.extend(
[
info.get("email", ""),
info.get("address", ""),
info.get("license_no", ""),
info.get("practice_years", ""),
info.get("specialties", ""),
info.get("source_site", ""),
info.get("detail_url", ""),
info.get("list_url", ""),
]
)
if parse_params_flag and douyin_only:
captured_at_text = ""
try:
captured_at_text = ts_to_text(int(info.get("captured_at", "") or 0))
except Exception:
captured_at_text = ""
line.extend(
[
info.get("sec_uid", ""),
info.get("douyin_uid", ""),
info.get("douyin_unique_id", ""),
info.get("douyin_nickname", ""),
info.get("douyin_signature", ""),
info.get("douyin_url", ""),
info.get("api_source", ""),
info.get("api_url", ""),
info.get("city_index", ""),
info.get("captured_at", ""),
captured_at_text,
]
)
ws.append(line)
exported += 1
os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
wb.save(output_path)
return exported
def main() -> None:
args = parse_args()
apply_default_time_filter(args)
output_path = build_output_path(args.output)
sql, sql_params = build_query(args)
with Db() as db:
cursor = db.db.cursor(pymysql.cursors.DictCursor)
try:
cursor.execute(sql, sql_params)
rows = cursor.fetchall()
finally:
cursor.close()
count = export_to_excel(
rows=rows,
output_path=output_path,
include_extra=args.include_extra,
parse_params_flag=not args.no_parse_params,
douyin_only=args.douyin_only,
)
print(f"[export] 导出完成,共 {count}")
print(f"[export] 文件路径: {output_path}")
print(
f"[export] 时间筛选 create_time: start={args.start_ts or '-'} end={args.end_ts or '-'}"
)
if __name__ == "__main__":
main()