#!/usr/bin/env python3
"""One-off scraper for m.zhongfali.com (groupId=80).

Collects basic lawyer fields (name / mobile phone / region) from the site's
per-group listing pages, writes them to a JSONL file, and optionally mirrors
them into the legacy `lawyer` MySQL table via the project's Db helper.

Network access goes through a local SOCKS5 proxy (curl --socks5-hostname);
when a Clash controller secret is configured, failed fetches rotate through
the controller's proxy nodes in GLOBAL mode.
"""
import argparse
from concurrent.futures import ThreadPoolExecutor, as_completed
import hashlib
import json
import os
import re
import subprocess
import sys
import time
import urllib.request
from typing import Dict, List, Optional, Set, Tuple

# Make the project root importable so `from Db import Db` resolves when this
# script is run directly from its subdirectory.
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
if project_root not in sys.path:
    sys.path.append(project_root)

from Db import Db

SITE_NAME = "zhongfali_group80"
LEGACY_DOMAIN = "众法利单页"
START_URL = "http://m.zhongfali.com/pg.jsp?groupId=80&pgt=0&pgs=1"
DEFAULT_OUTPUT = "/www/wwwroot/lawyers/data/one_off_sites/zhongfali_records_all.jsonl"
SOCKS_PROXY = "127.0.0.1:7891"
CLASH_CONTROLLER = os.environ.get("CLASH_CONTROLLER", "http://127.0.0.1:9090")
CLASH_SECRET = os.environ.get("CLASH_SECRET", "")

# Mainland-China mobile number: 1[3-9] followed by 9 digits.
PHONE_RE = re.compile(r"1[3-9]\d{9}")
# Non-greedy brace match is sufficient here because the site serializes the
# state object on a single statement; re.S lets it span newlines.
INITIAL_STATE_RE = re.compile(r"window\.__INITIAL_STATE__\s*=\s*(\{.*?\})", re.S)


class ProxyRotator:
    """Round-robin rotation over a Clash controller's proxy nodes.

    All methods are best-effort: if the controller is unreachable or no
    secret is configured, the rotator stays disabled (``nodes`` empty) and
    callers simply keep using whatever route is currently active.
    """

    def __init__(self, controller: str, secret: str):
        self.controller = controller.rstrip("/")
        self.secret = secret.strip()
        self.nodes: List[str] = []  # usable node names, filled by initialize()
        self.index = 0              # cursor into self.nodes for round-robin

    def _api(self, path: str, method: str = "GET", payload: Optional[Dict] = None) -> Dict:
        """Call the Clash RESTful API; return the decoded JSON body ({} if empty)."""
        headers = {}
        if self.secret:
            headers["Authorization"] = f"Bearer {self.secret}"
        body = None
        if payload is not None:
            headers["Content-Type"] = "application/json"
            body = json.dumps(payload).encode("utf-8")
        req = urllib.request.Request(
            f"{self.controller}{path}",
            data=body,
            headers=headers,
            method=method,
        )
        with urllib.request.urlopen(req, timeout=10) as resp:
            raw = resp.read().decode("utf-8", errors="ignore")
        return json.loads(raw) if raw else {}

    def initialize(self) -> None:
        """Switch Clash to global mode and collect concrete proxy nodes.

        Skips built-in policies, selector groups, and known rule groups so
        only real endpoints remain. Any failure disables rotation.
        """
        if not self.secret:
            return
        try:
            self._api("/configs", method="PATCH", payload={"mode": "global"})
            proxy_data = self._api("/proxies")
            proxies = proxy_data.get("proxies", {}) or {}
            skip = {
                "GLOBAL", "DIRECT", "REJECT", "REJECT-DROP", "PASS", "COMPATIBLE",
                "🔰 选择节点", "☁️ OneDrive", "🐟 漏网之鱼", "🎯 全球直连",
                "🛑 拦截广告", "🌍 爱奇艺&哔哩哔哩", "🎮 Steam 登录/下载",
                "🎮 Steam 商店/社区", "🌩️ Cloudflare", "🎬 动画疯",
                "🎓学术网站", "🇨🇳 国内网站",
            }
            self.nodes = [
                name
                for name, info in proxies.items()
                if name not in skip
                and isinstance(info, dict)
                and info.get("type") not in {"Selector", "URLTest", "Fallback", "LoadBalance"}
            ]
            if self.nodes:
                self.switch_to(self.nodes[0])
        except Exception as exc:
            print(f"[proxy] rotator init failed: {exc}")
            self.nodes = []

    def switch_to(self, node_name: str) -> None:
        """Point the GLOBAL selector at *node_name*."""
        self._api("/proxies/GLOBAL", method="PUT", payload={"name": node_name})

    def rotate(self) -> None:
        """Advance to the next node (no-op when rotation is disabled)."""
        if not self.nodes:
            return
        self.index = (self.index + 1) % len(self.nodes)
        node = self.nodes[self.index]
        self.switch_to(node)


def normalize_phone(value: str) -> str:
    """Extract a mainland-China mobile number from *value*, or return ""."""
    compact = "".join(ch for ch in str(value or "") if ch.isdigit())
    match = PHONE_RE.search(compact)
    return match.group(0) if match else ""


def fetch_html(
    url: str,
    rotator: Optional[ProxyRotator] = None,
    max_retries: int = 6,
    timeout_seconds: int = 18,
) -> str:
    """Fetch *url* through the SOCKS proxy with curl, retrying with backoff.

    The HTTP status code is appended to stdout via curl's ``-w`` marker and
    split back off here. On each failure the proxy is rotated (when a
    rotator with nodes is supplied) before the next attempt.

    Raises:
        RuntimeError: after *max_retries* failed attempts, with the last error.
    """
    last_error = ""
    for attempt in range(max_retries):
        cmd = [
            "curl", "-sS",
            "--socks5-hostname", SOCKS_PROXY,
            "-L", "--compressed",
            "--max-time", str(timeout_seconds),
            "-w", "\n__CODE__:%{http_code}",
            url,
        ]
        proc = subprocess.run(cmd, capture_output=True)
        if proc.returncode == 0:
            raw = proc.stdout.decode("utf-8", errors="ignore")
            marker = "\n__CODE__:"
            split_at = raw.rfind(marker)
            if split_at != -1:
                text = raw[:split_at]
                code_text = raw[split_at + len(marker):].strip()
            else:
                text = raw
                code_text = ""
            # Without a parsed status code, fall back to "any body is OK".
            code_ok = code_text == "200" if code_text else bool(text)
            if text and code_ok:
                return text
            # Fix: the old message said "empty body" even when the body was
            # present but the status was non-200; report the status instead.
            last_error = f"empty body or bad status (http_code={code_text or 'n/a'})"
        else:
            last_error = proc.stderr.decode("utf-8", errors="ignore").strip() or f"exit={proc.returncode}"
        if rotator and rotator.nodes:
            try:
                rotator.rotate()
            except Exception as exc:
                last_error = f"{last_error}; rotate failed: {exc}"
        if attempt < max_retries - 1:
            time.sleep(0.6 * (attempt + 1))  # linear backoff
    raise RuntimeError(f"fetch failed: {url}, reason={last_error}")


def parse_initial_state(html: str) -> Dict:
    """Parse the ``window.__INITIAL_STATE__`` JSON blob out of *html*.

    Raises:
        ValueError: when the marker is missing.
        json.JSONDecodeError: when the captured blob is not valid JSON.
    """
    match = INITIAL_STATE_RE.search(html)
    if not match:
        raise ValueError("window.__INITIAL_STATE__ not found")
    return json.loads(match.group(1))


def extract_group_urls_from_group80(state: Dict) -> List[str]:
    """Return sorted relative URLs of all province/city sub-groups of group 80.

    Walks module "21" of the page state: each second-level group row carries
    its own url plus an optional ``thirdGroupList`` of city-level urls.
    """
    module = (state.get("currentPageModuleIdMap") or {}).get("21") or {}
    ext_info = module.get("extInfo", {}) or {}
    second_group_map = ext_info.get("secondGroupMap", {}) or {}
    rows = second_group_map.get("80") or []
    urls: Set[str] = set()
    for row in rows:
        url = str(row.get("url") or "").strip()
        if url:
            urls.add(url)
        for city in row.get("thirdGroupList") or []:
            city_url = str(city.get("url") or "").strip()
            if city_url:
                urls.add(city_url)
    return sorted(urls)


def extract_detail_urls_from_group_html(html: str) -> Set[str]:
    """Collect absolute detail-page URLs (h-pd-<id>.html) referenced in *html*."""
    detail_ids = set(re.findall(r"h-pd-(\d+)\.html", html))
    return {f"http://m.zhongfali.com/h-pd-{pid}.html" for pid in detail_ids}


def parse_location_and_name(product_name: str) -> Tuple[str, str, str]:
    """Split a product title into (province, city, lawyer name).

    Heuristics over the CJK title: "...省" for the province, a trailing
    市/区/县/州/盟 token for the city, and a 2-4 character name directly
    before "律师". Missing parts come back as "".
    """
    text = re.sub(r"\s+", " ", str(product_name or "")).strip()
    province = ""
    city = ""
    name = ""
    province_match = re.search(r"([\u4e00-\u9fa5]{2,}省)", text)
    if province_match:
        province = province_match.group(1)
    city_match = re.search(r"(?:[\u4e00-\u9fa5]+省)?([\u4e00-\u9fa5]+(?:市|区|县|州|盟))", text)
    if city_match:
        city = city_match.group(1)
    name_match = re.search(r"([\u4e00-\u9fa5]{2,4})\s*律师", text)
    if name_match:
        name = name_match.group(1)
    return province, city, name


def parse_detail_record(detail_url: str, html: str, source_list_url: str) -> Optional[Dict]:
    """Build a normalized record dict from a detail page, or None.

    Returns None when the page has no module with a ``productInfo`` or when
    no valid mobile number can be extracted (phone is mandatory).
    """
    state = parse_initial_state(html)
    module = None
    for mod in (state.get("currentPageModuleIdMap") or {}).values():
        if isinstance(mod, dict) and (mod.get("extInfo") or {}).get("productInfo"):
            module = mod
            break
    if not module:
        return None
    ext_info = module.get("extInfo", {}) or {}
    product_info = ext_info.get("productInfo", {}) or {}
    phone = normalize_phone(product_info.get("material", ""))
    if not phone:
        return None
    product_name = str(product_info.get("name") or "").strip()
    province, city, lawyer_name = parse_location_and_name(product_name)
    law_firm = str(product_info.get("prop0") or "").strip()
    if not lawyer_name:
        # Fall back to the whole title when no name could be carved out.
        lawyer_name = product_name
    now = int(time.time())
    record_id = hashlib.md5(detail_url.encode("utf-8")).hexdigest()
    return {
        "record_id": record_id,
        "collected_at": now,
        "source": {
            "site": SITE_NAME,
            "list_url": source_list_url,
            "detail_url": detail_url,
            "province": province,
            "province_py": "",
            "city": city,
            "city_py": "",
            "page": 1,
        },
        "list_snapshot": {
            "name": lawyer_name,
            "law_firm": law_firm,
            "specialties": [],
            "answer_count": None,
        },
        "profile": {
            "name": lawyer_name,
            "law_firm": law_firm,
            "phone": phone,
            "license_no": str(product_info.get("prop1") or "").strip(),
            "practice_years": None,
            "email": "",
            "address": str(product_info.get("prop3") or "").strip(),
            "specialties": [],
        },
        "raw": {
            "product_name": product_name,
            "group_ids": product_info.get("groupIdList") or [],
        },
    }


def to_legacy_row(record: Dict) -> Optional[Dict]:
    """Map a normalized record to a legacy `lawyer` table row, or None.

    Returns None when the record has no valid phone. Note the row mixes
    types (``create_time`` is int, the rest str), hence the loose Dict
    annotation.
    """
    profile = record.get("profile", {}) or {}
    source = record.get("source", {}) or {}
    phone = normalize_phone(profile.get("phone", ""))
    if not phone:
        return None
    province = str(source.get("province") or "").strip()
    city = str(source.get("city") or province).strip()
    return {
        "name": str(profile.get("name") or "").strip(),
        "law_firm": str(profile.get("law_firm") or "").strip(),
        "province": province,
        "city": city,
        "phone": phone,
        "url": str(source.get("detail_url") or "").strip(),
        "domain": LEGACY_DOMAIN,
        "create_time": int(record.get("collected_at") or time.time()),
        # Full record preserved as JSON for traceability.
        "params": json.dumps(record, ensure_ascii=False),
    }


def delete_old_domain_data(db: Db, domain: str) -> int:
    """Delete all `lawyer` rows for *domain*; return the affected row count."""
    cur = db.db.cursor()
    try:
        cur.execute("DELETE FROM lawyer WHERE domain=%s", (domain,))
        affected = cur.rowcount
        db.db.commit()
        return affected
    finally:
        cur.close()


def write_records_to_db(db: Db, records: List[Dict]) -> int:
    """Insert records into the `lawyer` table; return the inserted count.

    Rows that cannot be mapped (no phone) are skipped; per-row insert
    failures are logged and do not abort the batch.
    """
    inserted = 0
    for record in records:
        row = to_legacy_row(record)
        if not row:
            continue
        try:
            db.insert_data("lawyer", row)
            inserted += 1
        except Exception as exc:
            print(f"[db] insert failed phone={row.get('phone', '')}: {exc}")
    return inserted


def parse_args() -> argparse.Namespace:
    """Define and parse the command-line interface."""
    parser = argparse.ArgumentParser(description="众法利 groupId=80 基础字段采集(姓名/手机号/地区)")
    parser.add_argument("--start-url", default=START_URL, help="入口分组页 URL")
    parser.add_argument("--output", default=DEFAULT_OUTPUT, help="JSONL 输出路径")
    parser.add_argument("--no-db", action="store_true", help="只写 JSON,不写 DB")
    parser.add_argument("--no-reset", action="store_true", help="不清理 domain 旧数据")
    parser.add_argument("--workers", type=int, default=16, help="详情页并发数")
    return parser.parse_args()


def main() -> None:
    """Entry point: crawl group pages, fan out to detail pages, persist results."""
    args = parse_args()
    os.makedirs(os.path.dirname(args.output) or ".", exist_ok=True)

    rotator = ProxyRotator(CLASH_CONTROLLER, CLASH_SECRET)
    rotator.initialize()
    if rotator.nodes:
        print(f"[proxy] rotator enabled, nodes={len(rotator.nodes)}")
    else:
        print("[proxy] rotator disabled, using current proxy route")

    # Give the entry page enough retries to cycle through every node once.
    start_retries = max(8, len(rotator.nodes) + 2) if rotator.nodes else 8
    group_html = fetch_html(args.start_url, rotator=rotator, max_retries=start_retries)
    group_state = parse_initial_state(group_html)
    group_urls = extract_group_urls_from_group80(group_state)
    print(f"[group] found group urls: {len(group_urls)}")

    # Map detail url -> first list page it was seen on (for provenance).
    detail_url_to_source: Dict[str, str] = {}
    for idx, rel_url in enumerate(group_urls, start=1):
        list_url = f"http://m.zhongfali.com/{rel_url.lstrip('/')}"
        try:
            html = fetch_html(list_url, rotator=rotator, max_retries=4, timeout_seconds=12)
            detail_urls = extract_detail_urls_from_group_html(html)
        except Exception as exc:
            print(f"[group] failed {list_url}: {exc}")
            continue
        for detail_url in detail_urls:
            detail_url_to_source.setdefault(detail_url, list_url)
        if idx % 10 == 0:
            print(f"[group] {idx}/{len(group_urls)} detail_urls={len(detail_url_to_source)}")

    records: List[Dict] = []
    seen_phones: Set[str] = set()  # dedupe across detail pages by phone
    detail_urls = sorted(detail_url_to_source.keys())
    print(f"[detail] total detail urls: {len(detail_urls)}")

    def process_detail(detail_url: str) -> Optional[Dict]:
        # Worker: fetch + parse one detail page; never raises.
        try:
            html = fetch_html(detail_url, rotator=rotator, max_retries=2, timeout_seconds=8)
            record = parse_detail_record(detail_url, html, detail_url_to_source[detail_url])
            return record
        except Exception as exc:
            print(f"[detail] failed {detail_url}: {exc}")
            return None

    done = 0
    with ThreadPoolExecutor(max_workers=max(1, int(args.workers))) as executor:
        futures = [executor.submit(process_detail, detail_url) for detail_url in detail_urls]
        for future in as_completed(futures):
            done += 1
            record = future.result()
            if record:
                phone = normalize_phone((record.get("profile", {}) or {}).get("phone", ""))
                if phone and phone not in seen_phones:
                    seen_phones.add(phone)
                    records.append(record)
            if done % 50 == 0:
                print(f"[detail] {done}/{len(detail_urls)} valid_records={len(records)}")

    with open(args.output, "w", encoding="utf-8") as out:
        for record in records:
            out.write(json.dumps(record, ensure_ascii=False) + "\n")

    deleted = 0
    inserted = 0
    if not args.no_db:
        with Db() as db:
            if not args.no_reset:
                deleted = delete_old_domain_data(db, LEGACY_DOMAIN)
            inserted = write_records_to_db(db, records)
    print(
        f"[done] records={len(records)}, db_deleted={deleted}, db_inserted={inserted}, output={args.output}"
    )


if __name__ == "__main__":
    main()