From c2b77975c1cc7ff88cb3210a5faf5076801bdb5e Mon Sep 17 00:00:00 2001
From: hello-dd-code
Date: Mon, 9 Mar 2026 21:26:50 +0800
Subject: [PATCH] feat: add douyin data export functionality to lawyer export
 script

- Introduced a new command-line argument `--douyin-only` to export data
  specifically for Douyin, including additional fields such as sec_uid,
  douyin_uid, and user information.
- Updated the README to include instructions for exporting Douyin data.
- Enhanced the export logic to accommodate new fields when exporting
  Douyin-specific data.
---
 README.md                            |   4 +
 common_sites/export_lawyers_excel.py |  77 +++++++++-
 common_sites/hualv.py                | 206 +++++++++++++++++++++++++--
 3 files changed, 273 insertions(+), 14 deletions(-)

diff --git a/README.md b/README.md
index f04cfff..ea020f0 100644
--- a/README.md
+++ b/README.md
@@ -151,6 +151,10 @@ DLS_DIRECT=1 DLS_NO_DB=1 ./common_sites/start.sh
 
 # 如果不需要解析 params 扩展信息
 ./.venv/bin/python ./common_sites/export_lawyers_excel.py --no-parse-params
+
+# 导出抖音采集数据(domain=抖音),并附带 sec_uid/抖音号/简介/API来源等字段
+./.venv/bin/python ./common_sites/export_lawyers_excel.py \
+    --douyin-only --start-ts 0 --output ./data/douyin_lawyers_export.xlsx
 ```
 
 ## 一次性站点(众法利)

diff --git a/common_sites/export_lawyers_excel.py b/common_sites/export_lawyers_excel.py
index 739734c..898996a 100644
--- a/common_sites/export_lawyers_excel.py
+++ b/common_sites/export_lawyers_excel.py
@@ -74,6 +74,11 @@ def parse_args() -> argparse.Namespace:
         action="store_true",
         help="关闭 params JSON 扩展信息解析(默认开启)",
     )
+    parser.add_argument(
+        "--douyin-only",
+        action="store_true",
+        help="仅导出抖音采集数据(domain=抖音),并追加抖音专用字段",
+    )
     return parser.parse_args()
 
 
@@ -109,13 +114,18 @@ def build_query(args: argparse.Namespace) -> (str, List):
     where: List[str] = []
     params: List = []
 
+    if args.douyin_only:
+        target_domain = args.domain.strip() or "抖音"
+        where.append("domain = %s")
+        params.append(target_domain)
+
     if args.start_ts > 0:
         where.append("create_time >= %s")
         params.append(args.start_ts)
     if args.end_ts > 0:
         where.append("create_time <= %s")
         params.append(args.end_ts)
-    if args.domain.strip():
+    if args.domain.strip() and not args.douyin_only:
         where.append("domain = %s")
         params.append(args.domain.strip())
     if args.province.strip():
@@ -161,6 +171,13 @@ def parse_params(params_text: str) -> Dict[str, str]:
     else:
         specialties_text = ""
 
+    user_info = data.get("user_info") or {}
+    if not isinstance(user_info, dict):
+        user_info = {}
+
+    sec_uid = str(data.get("sec_uid") or "")
+    douyin_url = f"https://www.douyin.com/user/{sec_uid}" if sec_uid else ""
+
     return {
         "email": str(profile.get("email") or ""),
         "address": str(profile.get("address") or ""),
@@ -170,10 +187,26 @@ def parse_params(params_text: str) -> Dict[str, str]:
         "source_site": str(source.get("site") or ""),
         "detail_url": str(source.get("detail_url") or ""),
         "list_url": str(source.get("list_url") or ""),
+        "api_source": str(data.get("api_source") or ""),
+        "api_url": str(data.get("api_url") or ""),
+        "city_index": str(data.get("city_index") or ""),
+        "captured_at": str(data.get("captured_at") or ""),
+        "sec_uid": sec_uid,
+        "douyin_uid": str(user_info.get("uid") or ""),
+        "douyin_unique_id": str(user_info.get("unique_id") or ""),
+        "douyin_signature": str(user_info.get("signature") or ""),
+        "douyin_nickname": str(user_info.get("nickname") or ""),
+        "douyin_url": douyin_url,
     }
 
 
-def export_to_excel(rows: List[Dict], output_path: str, include_extra: bool, parse_params_flag: bool) -> int:
+def export_to_excel(
+    rows: List[Dict],
+    output_path: str,
+    include_extra: bool,
+    parse_params_flag: bool,
+    douyin_only: bool,
+) -> int:
     wb = Workbook()
     ws = wb.active
     ws.title = "lawyers"
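For context on the new columns: `parse_params` above expects each Douyin row's `params` JSON to look roughly like the sample below. A minimal sketch with invented values (the import line assumes the script resolves as a module on your path; the rest of `parse_params` tolerates the missing `profile`/`source` blocks, as its `.get(...)` defaults suggest):

```python
import json

from export_lawyers_excel import parse_params  # path assumption: run from common_sites/

# Invented sample of a Douyin record's params column.
sample_params = json.dumps({
    "sec_uid": "MS4wLjABAAAA-example",          # invented value
    "api_source": "city_search",                # invented value
    "api_url": "https://example.com/api",       # invented value
    "city_index": "3",
    "captured_at": 1767950000,
    "user_info": {
        "uid": "1234567890",
        "unique_id": "lawyer_demo",
        "nickname": "示例律师",
        "signature": "示例简介",
    },
})

info = parse_params(sample_params)
print(info["douyin_url"])        # https://www.douyin.com/user/MS4wLjABAAAA-example
print(info["douyin_unique_id"])  # lawyer_demo
print(info["captured_at"])       # 1767950000
```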
@@ -204,6 +237,22 @@ def export_to_excel(rows: List[Dict], output_path: str, include_extra: bool, par
             "list_url",
         ]
     )
+    if parse_params_flag and douyin_only:
+        headers.extend(
+            [
+                "sec_uid",
+                "抖音uid",
+                "抖音号",
+                "抖音昵称",
+                "抖音简介",
+                "抖音主页URL",
+                "api_source",
+                "api_url",
+                "city_index",
+                "captured_at",
+                "captured_at_text",
+            ]
+        )
 
     ws.append(headers)
     for cell in ws[1]:
@@ -250,6 +299,29 @@ def export_to_excel(rows: List[Dict], output_path: str, include_extra: bool, par
                 ]
             )
 
+        if parse_params_flag and douyin_only:
+            captured_at_text = ""
+            try:
+                captured_at_text = ts_to_text(int(info.get("captured_at", "") or 0))
+            except Exception:
+                captured_at_text = ""
+
+            line.extend(
+                [
+                    info.get("sec_uid", ""),
+                    info.get("douyin_uid", ""),
+                    info.get("douyin_unique_id", ""),
+                    info.get("douyin_nickname", ""),
+                    info.get("douyin_signature", ""),
+                    info.get("douyin_url", ""),
+                    info.get("api_source", ""),
+                    info.get("api_url", ""),
+                    info.get("city_index", ""),
+                    info.get("captured_at", ""),
+                    captured_at_text,
+                ]
+            )
+
         ws.append(line)
         exported += 1
 
@@ -277,6 +349,7 @@ def main() -> None:
         output_path=output_path,
         include_extra=args.include_extra,
         parse_params_flag=not args.no_parse_params,
+        douyin_only=args.douyin_only,
     )
     print(f"[export] 导出完成,共 {count} 条")
 

diff --git a/common_sites/hualv.py b/common_sites/hualv.py
index f6eb9ad..56c7b78 100644
--- a/common_sites/hualv.py
+++ b/common_sites/hualv.py
@@ -3,6 +3,7 @@ import ast
 import hashlib
 import json
 import os
+import pymysql
 import random
 import re
 import sys
@@ -34,8 +35,8 @@
 SITE_BASE = "https://m.66law.cn"
 CITY_DATA_URL = "https://cache.66law.cn/dist/main-v2.0.js"
 LIST_API_URL = "https://m.66law.cn/findlawyer/rpc/loadlawyerlist/"
 
-PHONE_RE = re.compile(r"1[3-9]\d{9}")
 EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
+PHONE_CANDIDATE_RE = re.compile(r"(?<!\d)(?:\+?86[-\s]?)?1[3-9](?:[-\s]?\d){9}(?!\d)")
 
 
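Why the phone matching changed: the old `PHONE_RE` flow stripped every non-digit from the whole text before searching, so an email local part containing eleven digits was reported as a mobile number. A self-contained repro of that failure mode and of the new scrub-then-scan order (sample address invented):

```python
import re

EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
OLD_PHONE_RE = re.compile(r"1[3-9]\d{9}")

text = "咨询请发 lawyer13912345678@example.com"  # invented sample text

# Old behaviour: every non-digit is stripped first, so the digits embedded
# in the email's local part are misread as a mobile number.
compact = re.sub(r"\D", "", text)
print(OLD_PHONE_RE.search(compact).group(0))  # 13912345678 -- false positive

# New behaviour: blank out email addresses before scanning for candidates,
# so nothing phone-like survives in this text.
sanitized = EMAIL_RE.sub(" ", text)
print(re.sub(r"\D", "", sanitized))  # '' -- no candidate left
```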
@@ -44,9 +45,20 @@
 
 
 def normalize_phone(text: str) -> str:
-    compact = re.sub(r"\D", "", text or "")
-    match = PHONE_RE.search(compact)
-    return match.group(0) if match else ""
+    if not text:
+        return ""
+
+    # 避免把邮箱前缀中的数字误识别为手机号
+    sanitized = EMAIL_RE.sub(" ", str(text))
+    for match in PHONE_CANDIDATE_RE.finditer(sanitized):
+        candidate = match.group(0)
+        compact = re.sub(r"\D", "", candidate)
+        if compact.startswith("86") and len(compact) == 13:
+            compact = compact[2:]
+        if len(compact) == 11 and compact.startswith("1") and compact[1] in "3456789":
+            return compact
+
+    return ""
 
 
 def strip_html_tags(text: str) -> str:
@@ -474,9 +486,110 @@ class HualvCrawler:
 
         return existing
 
-    def _write_records_to_db(self, records: List[Dict]) -> Tuple[int, int]:
+    def _extract_email_from_params_text(self, params_text: str) -> str:
+        if not params_text:
+            return ""
+        try:
+            data = json.loads(params_text)
+        except Exception:
+            return ""
+        if not isinstance(data, dict):
+            return ""
+        profile = data.get("profile") or {}
+        if not isinstance(profile, dict):
+            return ""
+        return str(profile.get("email") or "").strip()
+
+    def _is_phone_from_email_prefix(self, phone: str, email: str) -> bool:
+        phone_text = str(phone or "").strip()
+        email_text = str(email or "").strip()
+        if not phone_text or not email_text or "@" not in email_text:
+            return False
+        prefix = email_text.split("@", 1)[0]
+        prefix_phone = normalize_phone(prefix)
+        return bool(prefix_phone) and prefix_phone == phone_text
+
+    def _existing_rows_by_urls(self, urls: List[str]) -> Dict[str, List[Dict[str, str]]]:
+        if not self.db or not urls:
+            return {}
+
+        deduped = sorted({u for u in urls if u})
+        if not deduped:
+            return {}
+
+        result: Dict[str, List[Dict[str, str]]] = {}
+        cur = self.db.db.cursor(pymysql.cursors.DictCursor)
+        try:
+            chunk_size = 200
+            for i in range(0, len(deduped), chunk_size):
+                chunk = deduped[i:i + chunk_size]
+                placeholders = ",".join(["%s"] * len(chunk))
+                sql = (
+                    "SELECT id, phone, url, params FROM lawyer "
+                    f"WHERE domain=%s AND url IN ({placeholders})"
+                )
+                cur.execute(sql, [LEGACY_DOMAIN, *chunk])
+                for row in cur.fetchall() or []:
+                    key = str(row.get("url") or "")
+                    if not key:
+                        continue
+                    result.setdefault(key, []).append(row)
+        finally:
+            cur.close()
+
+        return result
+
+    def _cleanup_dirty_duplicates_for_urls(self, urls: List[str]) -> int:
         if not self.db:
-            return 0, 0
+            return 0
+
+        by_url = self._existing_rows_by_urls(urls)
+        if not by_url:
+            return 0
+
+        delete_ids: List[int] = []
+        for _, rows in by_url.items():
+            if len(rows) <= 1:
+                continue
+
+            dirty_ids: List[int] = []
+            has_clean = False
+            for row in rows:
+                row_id = int(row.get("id") or 0)
+                row_phone = str(row.get("phone") or "").strip()
+                row_email = self._extract_email_from_params_text(str(row.get("params") or ""))
+                if row_id <= 0:
+                    continue
+                if self._is_phone_from_email_prefix(row_phone, row_email):
+                    dirty_ids.append(row_id)
+                else:
+                    has_clean = True
+
+            if has_clean and dirty_ids:
+                delete_ids.extend(dirty_ids)
+
+        if not delete_ids:
+            return 0
+
+        removed = 0
+        cur = self.db.db.cursor()
+        try:
+            chunk_size = 300
+            for i in range(0, len(delete_ids), chunk_size):
+                chunk = delete_ids[i:i + chunk_size]
+                placeholders = ",".join(["%s"] * len(chunk))
+                sql = f"DELETE FROM lawyer WHERE id IN ({placeholders})"
+                cur.execute(sql, chunk)
+                removed += cur.rowcount
+            self.db.db.commit()
+        finally:
+            cur.close()
+
+        return removed
+
+    def _write_records_to_db(self, records: List[Dict]) -> Tuple[int, int, int, int]:
+        if not self.db:
+            return 0, 0, 0, 0
 
         rows: List[Dict[str, str]] = []
         for record in records:
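The cleanup pass above only deletes a "dirty" duplicate (a row whose phone was derived from its own email prefix) when a clean row for the same URL survives. A DB-free sketch of that selection rule, with invented rows and a simplified stand-in for `_is_phone_from_email_prefix`:

```python
# DB-free restatement of the cleanup rule (invented sample rows).
rows_for_one_url = [
    {"id": 1, "phone": "13912345678", "email": "13912345678@qq.com"},  # dirty
    {"id": 2, "phone": "13700001111", "email": "office@firm.cn"},      # clean
]

def is_dirty(row):
    # Simplified stand-in for _is_phone_from_email_prefix: the stored phone
    # is exactly the digits of the email's local part.
    prefix = row["email"].split("@", 1)[0]
    return prefix == row["phone"]

dirty_ids = [r["id"] for r in rows_for_one_url if is_dirty(r)]
has_clean = any(not is_dirty(r) for r in rows_for_one_url)

# Delete dirty rows only when a clean duplicate remains for the URL.
to_delete = dirty_ids if (has_clean and dirty_ids) else []
print(to_delete)  # [1]
```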
@@ -484,14 +597,73 @@ class HualvCrawler:
             if row:
                 rows.append(row)
         if not rows:
-            return 0, 0
+            return 0, 0, 0, 0
 
         existing = self._existing_phones_in_db([row["phone"] for row in rows])
+        existing_by_url = self._existing_rows_by_urls([str(row.get("url") or "") for row in rows])
         inserted = 0
         skipped = 0
+        repaired = 0
+
+        cur = self.db.db.cursor()
+        update_sql = (
+            "UPDATE lawyer SET name=%s, phone=%s, law_firm=%s, province=%s, city=%s, "
+            "url=%s, domain=%s, create_time=%s, params=%s WHERE id=%s"
+        )
 
         for row in rows:
-            phone = row.get("phone", "")
+            phone = str(row.get("phone") or "").strip()
+            url = str(row.get("url") or "").strip()
+            if not phone:
+                skipped += 1
+                continue
+
+            same_url_rows = existing_by_url.get(url, []) if url else []
+            if same_url_rows:
+                if any(str(item.get("phone") or "").strip() == phone for item in same_url_rows):
+                    skipped += 1
+                    continue
+
+                row_email = self._extract_email_from_params_text(str(row.get("params") or ""))
+                new_is_dirty = self._is_phone_from_email_prefix(phone, row_email)
+
+                repair_target = None
+                for item in same_url_rows:
+                    old_phone = str(item.get("phone") or "").strip()
+                    old_email = self._extract_email_from_params_text(str(item.get("params") or ""))
+                    if self._is_phone_from_email_prefix(old_phone, old_email):
+                        repair_target = item
+                        break
+
+                if repair_target and not new_is_dirty:
+                    try:
+                        cur.execute(
+                            update_sql,
+                            (
+                                row.get("name") or "",
+                                phone,
+                                row.get("law_firm") or "",
+                                row.get("province") or "",
+                                row.get("city") or "",
+                                row.get("url") or "",
+                                row.get("domain") or LEGACY_DOMAIN,
+                                int(row.get("create_time") or time.time()),
+                                row.get("params") or "{}",
+                                int(repair_target.get("id") or 0),
+                            ),
+                        )
+                        self.db.db.commit()
+                        repaired += 1
+                        existing.add(phone)
+                        old_phone = str(repair_target.get("phone") or "").strip()
+                        if old_phone:
+                            existing.discard(old_phone)
+                        repair_target["phone"] = phone
+                        repair_target["params"] = row.get("params") or "{}"
+                        continue
+                    except Exception as exc:
+                        print(f"[db] 修复失败 phone={phone} url={url}: {exc}")
+
             if not phone or phone in existing:
                 skipped += 1
                 continue
@@ -501,9 +673,13 @@ class HualvCrawler:
                 inserted += 1
             except Exception as exc:
                 skipped += 1
-                print(f"[db] 插入失败 phone={phone} url={row.get('url', '')}: {exc}")
+                print(f"[db] 插入失败 phone={phone} url={url}: {exc}")
 
-        return inserted, skipped
+        cur.close()
+
+        cleaned = self._cleanup_dirty_duplicates_for_urls([str(row.get("url") or "") for row in rows])
+
+        return inserted, skipped, repaired, cleaned
 
     def crawl(
         self,
@@ -547,6 +723,8 @@ class HualvCrawler:
         total_new_json = 0
         total_new_db = 0
         total_skip_db = 0
+        total_repair_db = 0
+        total_clean_db = 0
 
         with open(output_path, "a", encoding="utf-8") as out:
             for idx, target in enumerate(cities, start=1):
@@ -566,17 +744,21 @@ class HualvCrawler:
                     city_new_json += 1
                     total_new_json += 1
 
-                city_new_db, city_skip_db = self._write_records_to_db(city_records)
+                city_new_db, city_skip_db, city_repair_db, city_clean_db = self._write_records_to_db(city_records)
                 total_new_db += city_new_db
                 total_skip_db += city_skip_db
+                total_repair_db += city_repair_db
+                total_clean_db += city_clean_db
 
                 print(
                     f"[city] 采集{len(city_records)}条, JSON新增{city_new_json}条, "
-                    f"DB新增{city_new_db}条, DB跳过{city_skip_db}条"
+                    f"DB新增{city_new_db}条, DB修复{city_repair_db}条, "
+                    f"DB清理{city_clean_db}条, DB跳过{city_skip_db}条"
                )
 
         print(
             f"[done] JSON新增{total_new_json}条, DB新增{total_new_db}条, "
+            f"DB修复{total_repair_db}条, DB清理{total_clean_db}条, "
             f"DB跳过{total_skip_db}条, 输出: {output_path}"
         )
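A quick post-merge sanity check: open a finished `--douyin-only` export and confirm the Douyin columns land at the tail of the header row. A minimal sketch using openpyxl (the path is taken from the README example; adjust it to your own `--output` value):

```python
from openpyxl import load_workbook

# Open the workbook produced by export_lawyers_excel.py --douyin-only.
wb = load_workbook("./data/douyin_lawyers_export.xlsx", read_only=True)
ws = wb["lawyers"]  # sheet name set in export_to_excel

headers = [cell.value for cell in next(ws.iter_rows(max_row=1))]
# The Douyin block is appended after the base/extra columns, so the tail
# should read: sec_uid, 抖音uid, 抖音号, 抖音昵称, 抖音简介, 抖音主页URL,
# api_source, api_url, city_index, captured_at, captured_at_text.
print(headers[-11:])
```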