diff --git a/.playwright/cli.config.json b/.playwright/cli.config.json
new file mode 100644
index 0000000..c669ee3
--- /dev/null
+++ b/.playwright/cli.config.json
@@ -0,0 +1,5 @@
+{
+  "launchOptions": {
+    "chromiumSandbox": false
+  }
+}
diff --git a/one_off_sites/zhongfali_group80_basic.py b/one_off_sites/zhongfali_group80_basic.py
new file mode 100644
index 0000000..694fb74
--- /dev/null
+++ b/one_off_sites/zhongfali_group80_basic.py
@@ -0,0 +1,411 @@
+#!/usr/bin/env python3
+import argparse
+from concurrent.futures import ThreadPoolExecutor, as_completed
+import hashlib
+import json
+import os
+import re
+import subprocess
+import sys
+import time
+import urllib.request
+from typing import Dict, List, Optional, Set, Tuple
+
+current_dir = os.path.dirname(os.path.abspath(__file__))
+project_root = os.path.dirname(current_dir)
+if project_root not in sys.path:  # make the project-root import (Db) resolvable when run as a script
+    sys.path.append(project_root)
+
+from Db import Db
+
+SITE_NAME = "zhongfali_group80"
+LEGACY_DOMAIN = "众法利单页"  # value of the `domain` column used to tag and reset rows in the lawyer table
+START_URL = "http://m.zhongfali.com/pg.jsp?groupId=80&pgt=0&pgs=1"  # entry listing page for group 80
+DEFAULT_OUTPUT = "/www/wwwroot/lawyers/data/one_off_sites/zhongfali_records_all.jsonl"
+
+SOCKS_PROXY = "127.0.0.1:7891"  # local SOCKS5 listener curl tunnels through -- presumably clash; confirm on deploy host
+CLASH_CONTROLLER = os.environ.get("CLASH_CONTROLLER", "http://127.0.0.1:9090")  # clash external-controller API base
+CLASH_SECRET = os.environ.get("CLASH_SECRET", "")  # empty secret disables node rotation (see ProxyRotator.initialize)
+PHONE_RE = re.compile(r"1[3-9]\d{9}")  # mainland-China mobile number
+INITIAL_STATE_RE = re.compile(r"window\.__INITIAL_STATE__\s*=\s*(\{.*?\})", re.S)  # NOTE(review): lazy match stops at the first '}' -- verify it still yields JSON that json.loads accepts on live pages
+
+
+class ProxyRotator:
+    def __init__(self, controller: str, secret: str):
+        self.controller = controller.rstrip("/")
+        self.secret = secret.strip()
+        self.nodes: List[str] = []  # concrete clash exit nodes; empty list means rotation is disabled
+        self.index = 0  # round-robin cursor into self.nodes
+
+    def _api(self, path: str, method: str = "GET", payload: Optional[Dict] = None) -> Dict:
+        headers = {}
+        if self.secret:
+            headers["Authorization"] = f"Bearer {self.secret}"  # clash bearer-token auth
+        body = None
+        if payload is not None:
+            headers["Content-Type"] = "application/json"
+            body = json.dumps(payload).encode("utf-8")
+        req = urllib.request.Request(
+            f"{self.controller}{path}",
+            data=body,
+            headers=headers,
+            method=method,
+        )
+        with urllib.request.urlopen(req, timeout=10) as resp:
+            raw = resp.read().decode("utf-8", errors="ignore")
+            return json.loads(raw) if raw else {}  # some clash endpoints answer with an empty body
+
+    def initialize(self) -> None:
+        if not self.secret:  # no controller secret configured: leave rotation off
+            return
+        try:
+            self._api("/configs", method="PATCH", payload={"mode": "global"})  # global mode so the GLOBAL selector controls all routing
+            proxy_data = self._api("/proxies")
+            proxies = proxy_data.get("proxies", {}) or {}
+            skip = {  # built-in policies and subscription groups that are not real exit nodes
+                "GLOBAL",
+                "DIRECT",
+                "REJECT",
+                "REJECT-DROP",
+                "PASS",
+                "COMPATIBLE",
+                "🔰 选择节点",
+                "☁️ OneDrive",
+                "🐟 漏网之鱼",
+                "🎯 全球直连",
+                "🛑 拦截广告",
+                "🌍 爱奇艺&哔哩哔哩",
+                "🎮 Steam 登录/下载",
+                "🎮 Steam 商店/社区",
+                "🌩️ Cloudflare",
+                "🎬 动画疯",
+                "🎓学术网站",
+                "🇨🇳 国内网站",
+            }
+            self.nodes = [  # keep only concrete nodes, filtering out selector/group proxy types
+                name
+                for name, info in proxies.items()
+                if name not in skip and isinstance(info, dict)
+                and info.get("type") not in {"Selector", "URLTest", "Fallback", "LoadBalance"}
+            ]
+            if self.nodes:
+                self.switch_to(self.nodes[0])
+        except Exception as exc:  # rotation is best-effort: any controller failure degrades to the current route
+            print(f"[proxy] rotator init failed: {exc}")
+            self.nodes = []
+
+    def switch_to(self, node_name: str) -> None:
+        self._api("/proxies/GLOBAL", method="PUT", payload={"name": node_name})  # point the GLOBAL selector at the node
+
+    def rotate(self) -> None:
+        if not self.nodes:
+            return
+        self.index = (self.index + 1) % len(self.nodes)  # NOTE(review): not thread-safe; called from worker threads -- harmless for round-robin but confirm acceptable
+        node = self.nodes[self.index]
+        self.switch_to(node)
+
+
+def normalize_phone(value: str) -> str:  # extract the first CN mobile number from arbitrary text, "" if none
+    compact = "".join(ch for ch in str(value or "") if ch.isdigit())
+    match = PHONE_RE.search(compact)
+    return match.group(0) if match else ""
+
+
+def fetch_html(
+    url: str,
+    rotator: Optional[ProxyRotator] = None,
+    max_retries: int = 6,
+    timeout_seconds: int = 18,
+) -> str:  # GET url via curl over the SOCKS proxy; rotates exit node and backs off on failure; raises RuntimeError when exhausted
+    last_error = ""
+    for attempt in range(max_retries):
+        cmd = [
+            "curl",
+            "-sS",
+            "--socks5-hostname",
+            SOCKS_PROXY,
+            "-L",
+            "--compressed",
+            "--max-time",
+            str(timeout_seconds),
+            "-w",
+            "\n__CODE__:%{http_code}",
+            url,
+        ]
+        proc = subprocess.run(cmd, capture_output=True)
+        if proc.returncode == 0:
+            raw = proc.stdout.decode("utf-8", errors="ignore")
+            marker = "\n__CODE__:"
+            split_at = raw.rfind(marker)  # -w appended the status code after the body; split them apart
+            if split_at != -1:
+                text = raw[:split_at]
+                code_text = raw[split_at + len(marker):].strip()
+            else:
+                text = raw
+                code_text = ""
+            code_ok = code_text == "200" if code_text else bool(text)  # fall back to body presence when the marker is missing
+            if text and code_ok:
+                return text
+            last_error = "empty body"
+        else:
+            last_error = proc.stderr.decode("utf-8", errors="ignore").strip() or f"exit={proc.returncode}"
+        if rotator and rotator.nodes:  # any failed attempt burns the current exit node; move to the next one
+            try:
+                rotator.rotate()
+            except Exception as exc:
+                last_error = f"{last_error}; rotate failed: {exc}"
+        if attempt < max_retries - 1:
+            time.sleep(0.6 * (attempt + 1))  # linear backoff between attempts
+    raise RuntimeError(f"fetch failed: {url}, reason={last_error}")
+
+
+def parse_initial_state(html: str) -> Dict:  # pull the SPA state object embedded in the page; raises ValueError when absent
+    match = INITIAL_STATE_RE.search(html)
+    if not match:
+        raise ValueError("window.__INITIAL_STATE__ not found")
+    return json.loads(match.group(1))
+
+
+def extract_group_urls_from_group80(state: Dict) -> List[str]:  # collect province- and city-level listing URLs under group 80
+    module = (state.get("currentPageModuleIdMap") or {}).get("21") or {}  # module "21" holds the group navigation -- per observed site state
+    ext_info = module.get("extInfo", {}) or {}
+    second_group_map = ext_info.get("secondGroupMap", {}) or {}
+    rows = second_group_map.get("80") or []
+
+    urls: Set[str] = set()
+    for row in rows:
+        url = str(row.get("url") or "").strip()
+        if url:
+            urls.add(url)
+        for city in row.get("thirdGroupList") or []:  # city pages nested under each province row
+            city_url = str(city.get("url") or "").strip()
+            if city_url:
+                urls.add(city_url)
+    return sorted(urls)
+
+
+def extract_detail_urls_from_group_html(html: str) -> Set[str]:  # h-pd-<id>.html links are the lawyer detail pages
+    detail_ids = set(re.findall(r"h-pd-(\d+)\.html", html))
+    return {f"http://m.zhongfali.com/h-pd-{pid}.html" for pid in detail_ids}
+
+
+def parse_location_and_name(product_name: str) -> Tuple[str, str, str]:  # heuristically split "某省某市 张三律师"-style titles into (province, city, name)
+    text = re.sub(r"\s+", " ", str(product_name or "")).strip()
+    province = ""
+    city = ""
+    name = ""
+
+    province_match = re.search(r"([\u4e00-\u9fa5]{2,}省)", text)
+    if province_match:
+        province = province_match.group(1)
+
+    city_match = re.search(r"(?:[\u4e00-\u9fa5]+省)?([\u4e00-\u9fa5]+(?:市|区|县|州|盟))", text)  # optional province prefix so "X省Y市" yields just "Y市"
+    if city_match:
+        city = city_match.group(1)
+
+    name_match = re.search(r"([\u4e00-\u9fa5]{2,4})\s*律师", text)  # 2-4 han characters directly before "律师"
+    if name_match:
+        name = name_match.group(1)
+
+    return province, city, name
+
+
+def parse_detail_record(detail_url: str, html: str, source_list_url: str) -> Optional[Dict]:  # build one normalized record from a detail page; None when no usable phone/module
+    state = parse_initial_state(html)
+
+    module = None
+    for mod in (state.get("currentPageModuleIdMap") or {}).values():  # the product module id varies per page, so scan for the one carrying productInfo
+        if isinstance(mod, dict) and (mod.get("extInfo") or {}).get("productInfo"):
+            module = mod
+            break
+    if not module:
+        return None
+
+    ext_info = module.get("extInfo", {}) or {}
+    product_info = ext_info.get("productInfo", {}) or {}
+
+    phone = normalize_phone(product_info.get("material", ""))  # phone number is carried in 'material' -- presumably the site's custom field; verify against live data
+    if not phone:
+        return None
+
+    product_name = str(product_info.get("name") or "").strip()
+    province, city, lawyer_name = parse_location_and_name(product_name)
+    law_firm = str(product_info.get("prop0") or "").strip()
+
+    if not lawyer_name:  # fall back to the raw title when the name heuristic fails
+        lawyer_name = product_name
+
+    now = int(time.time())
+    record_id = hashlib.md5(detail_url.encode("utf-8")).hexdigest()  # stable dedupe id derived from the detail URL
+    return {
+        "record_id": record_id,
+        "collected_at": now,
+        "source": {
+            "site": SITE_NAME,
+            "list_url": source_list_url,
+            "detail_url": detail_url,
+            "province": province,
+            "province_py": "",
+            "city": city,
+            "city_py": "",
+            "page": 1,
+        },
+        "list_snapshot": {
+            "name": lawyer_name,
+            "law_firm": law_firm,
+            "specialties": [],
+            "answer_count": None,
+        },
+        "profile": {
+            "name": lawyer_name,
+            "law_firm": law_firm,
+            "phone": phone,
+            "license_no": str(product_info.get("prop1") or "").strip(),
+            "practice_years": None,
+            "email": "",
+            "address": str(product_info.get("prop3") or "").strip(),
+            "specialties": [],
+        },
+        "raw": {
+            "product_name": product_name,
+            "group_ids": product_info.get("groupIdList") or [],
+        },
+    }
+
+
+def to_legacy_row(record: Dict) -> Optional[Dict[str, str]]:  # map a record onto the legacy lawyer-table columns; None when the phone is invalid
+    profile = record.get("profile", {}) or {}
+    source = record.get("source", {}) or {}
+    phone = normalize_phone(profile.get("phone", ""))
+    if not phone:
+        return None
+
+    province = str(source.get("province") or "").strip()
+    city = str(source.get("city") or province).strip()  # city falls back to province when missing
+    return {
+        "name": str(profile.get("name") or "").strip(),
+        "law_firm": str(profile.get("law_firm") or "").strip(),
+        "province": province,
+        "city": city,
+        "phone": phone,
+        "url": str(source.get("detail_url") or "").strip(),
+        "domain": LEGACY_DOMAIN,
+        "create_time": int(record.get("collected_at") or time.time()),
+        "params": json.dumps(record, ensure_ascii=False),  # full record preserved as JSON for later re-processing
+    }
+
+
+def delete_old_domain_data(db: Db, domain: str) -> int:  # full-refresh helper: drop all rows previously tagged with this domain
+    cur = db.db.cursor()
+    try:
+        cur.execute("DELETE FROM lawyer WHERE domain=%s", (domain,))
+        affected = cur.rowcount
+        db.db.commit()
+        return affected
+    finally:
+        cur.close()
+
+
+def write_records_to_db(db: Db, records: List[Dict]) -> int:  # insert legacy rows one by one; returns the number inserted
+    inserted = 0
+    for record in records:
+        row = to_legacy_row(record)
+        if not row:
+            continue
+        try:
+            db.insert_data("lawyer", row)
+            inserted += 1
+        except Exception as exc:  # best-effort: one bad row must not abort the batch
+            print(f"[db] insert failed phone={row.get('phone', '')}: {exc}")
+    return inserted
+
+
+def parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser(description="众法利 groupId=80 基础字段采集(姓名/手机号/地区)")
+    parser.add_argument("--start-url", default=START_URL, help="入口分组页 URL")
+    parser.add_argument("--output", default=DEFAULT_OUTPUT, help="JSONL 输出路径")
+    parser.add_argument("--no-db", action="store_true", help="只写 JSON,不写 DB")
+    parser.add_argument("--no-reset", action="store_true", help="不清理 domain 旧数据")
+    parser.add_argument("--workers", type=int, default=16, help="详情页并发数")
+    return parser.parse_args()
+
+
+def main() -> None:  # crawl group pages -> detail pages concurrently -> JSONL file and (optionally) the legacy DB
+    args = parse_args()
+    os.makedirs(os.path.dirname(args.output) or ".", exist_ok=True)
+
+    rotator = ProxyRotator(CLASH_CONTROLLER, CLASH_SECRET)
+    rotator.initialize()
+    if rotator.nodes:
+        print(f"[proxy] rotator enabled, nodes={len(rotator.nodes)}")
+    else:
+        print("[proxy] rotator disabled, using current proxy route")
+
+    start_retries = max(8, len(rotator.nodes) + 2) if rotator.nodes else 8  # the entry page is critical: allow roughly one attempt per node
+    group_html = fetch_html(args.start_url, rotator=rotator, max_retries=start_retries)
+    group_state = parse_initial_state(group_html)
+    group_urls = extract_group_urls_from_group80(group_state)
+    print(f"[group] found group urls: {len(group_urls)}")
+
+    detail_url_to_source: Dict[str, str] = {}
+    for idx, rel_url in enumerate(group_urls, start=1):
+        list_url = f"http://m.zhongfali.com/{rel_url.lstrip('/')}"
+        try:
+            html = fetch_html(list_url, rotator=rotator, max_retries=4, timeout_seconds=12)
+            detail_urls = extract_detail_urls_from_group_html(html)
+        except Exception as exc:  # skip a broken list page; the rest of the crawl continues
+            print(f"[group] failed {list_url}: {exc}")
+            continue
+
+        for detail_url in detail_urls:
+            detail_url_to_source.setdefault(detail_url, list_url)  # keep the first list page that referenced each detail
+        if idx % 10 == 0:
+            print(f"[group] {idx}/{len(group_urls)} detail_urls={len(detail_url_to_source)}")
+
+    records: List[Dict] = []
+    seen_phones: Set[str] = set()  # dedupe lawyers by phone across all detail pages
+    detail_urls = sorted(detail_url_to_source.keys())
+    print(f"[detail] total detail urls: {len(detail_urls)}")
+
+    def process_detail(detail_url: str) -> Optional[Dict]:  # worker: fetch + parse one detail page, None on any failure
+        try:
+            html = fetch_html(detail_url, rotator=rotator, max_retries=2, timeout_seconds=8)
+            record = parse_detail_record(detail_url, html, detail_url_to_source[detail_url])
+            return record
+        except Exception as exc:
+            print(f"[detail] failed {detail_url}: {exc}")
+            return None
+
+    done = 0
+    with ThreadPoolExecutor(max_workers=max(1, int(args.workers))) as executor:
+        futures = [executor.submit(process_detail, detail_url) for detail_url in detail_urls]
+        for future in as_completed(futures):
+            done += 1
+            record = future.result()
+            if record:
+                phone = normalize_phone((record.get("profile", {}) or {}).get("phone", ""))
+                if phone and phone not in seen_phones:
+                    seen_phones.add(phone)
+                    records.append(record)
+            if done % 50 == 0:
+                print(f"[detail] {done}/{len(detail_urls)} valid_records={len(records)}")
+
+    with open(args.output, "w", encoding="utf-8") as out:
+        for record in records:
+            out.write(json.dumps(record, ensure_ascii=False) + "\n")
+
+    deleted = 0
+    inserted = 0
+    if not args.no_db:
+        with Db() as db:
+            if not args.no_reset:
+                deleted = delete_old_domain_data(db, LEGACY_DOMAIN)  # wipe-then-insert refresh for this domain
+            inserted = write_records_to_db(db, records)
+
+    print(
+        f"[done] records={len(records)}, db_deleted={deleted}, db_inserted={inserted}, output={args.output}"
+    )
+
+
+if __name__ == "__main__":
+    main()
diff --git a/one_off_sites/zhongfali_single.py b/one_off_sites/zhongfali_single.py
index a8cf142..705977c 100644
--- a/one_off_sites/zhongfali_single.py
+++ b/one_off_sites/zhongfali_single.py
@@ -191,7 +191,7 @@ def extract_records(url: str, state: Dict) -> List[Dict]:
         "source": {
             "site": SITE_NAME,
             "list_url": url,
-            "detail_url": url,
+            "detail_url": "",
             "province": province,
             "province_py": "",
             "city": area or city,
@@ -199,6 +199,7 @@
             "page": 1,
             "group_id": group_id,
             "module_id": module_id,
+            "detail_url_status": "unresolved_from_pool",
         },
         "list_snapshot": {
             "name": "",
@@ -347,12 +348,68 @@ def write_records_to_db(db: Db, records: List[Dict]) -> Tuple[int, int]:
     return inserted, skipped
 
 
+def lookup_name_map_from_db(db: Db, phones: List[str]) -> Dict[str, str]:  # build phone -> newest non-empty name from existing lawyer rows
+    deduped = sorted({phone for phone in phones if phone})
+    if not deduped:
+        return {}
+
+    name_map: Dict[str, str] = {}
+    cur = db.db.cursor()
+    try:
+        chunk_size = 500  # keep IN() parameter lists bounded
+        for i in range(0, len(deduped), chunk_size):
+            chunk = deduped[i:i + chunk_size]
+            placeholders = ",".join(["%s"] * len(chunk))
+            sql = (
+                "SELECT phone, name, create_time FROM lawyer "
+                f"WHERE phone IN ({placeholders}) AND name<>'' "
+                "ORDER BY create_time DESC"
+            )
+            cur.execute(sql, chunk)
+            for phone, name, _ in cur.fetchall():
+                if phone not in name_map and name:  # first row wins = newest, thanks to ORDER BY create_time DESC
+                    name_map[phone] = str(name).strip()
+    finally:
+        cur.close()
+    return name_map
+
+
+def apply_name_backfill(records: List[Dict], name_map: Dict[str, str]) -> int:  # fill empty profile/list_snapshot names from the map; returns count updated
+    updated = 0
+    if not name_map:
+        return updated
+
+    for record in records:
+        profile = record.get("profile", {}) or {}
+        list_snapshot = record.get("list_snapshot", {}) or {}
+        phone = normalize_phone(profile.get("phone", ""))
+        if not phone:
+            continue
+
+        backfill_name = name_map.get(phone, "")
+        if not backfill_name:
+            continue
+
+        current_name = str(profile.get("name") or "").strip()
+        if current_name:  # never overwrite a name that was actually scraped
+            continue
+
+        profile["name"] = backfill_name
+        list_snapshot["name"] = backfill_name
+        record["profile"] = profile  # re-assign in case .get() returned a fresh dict
+        record["list_snapshot"] = list_snapshot
+        updated += 1
+
+    return updated
+
+
 def parse_args() -> argparse.Namespace:
     parser = argparse.ArgumentParser(description="众法利单页律师电话采集")
     parser.add_argument("--url", default=DEFAULT_URL, help="详情页 URL")
     parser.add_argument("--output", default=DEFAULT_OUTPUT, help="输出 jsonl 文件路径")
     parser.add_argument("--direct", action="store_true", help="直连模式,不使用代理")
     parser.add_argument("--no-db", action="store_true", help="仅输出 JSON,不写入数据库")
+    parser.add_argument("--skip-name-backfill", action="store_true", help="跳过按手机号回填姓名")
     return parser.parse_args()
 
 
@@ -418,12 +475,24 @@ def main() -> None:
 
     db_new = 0
     db_skip = 0
+    name_backfill_count = 0
+    if not args.skip_name_backfill:
+        try:
+            with Db() as db:
+                name_map = lookup_name_map_from_db(
+                    db,
+                    [normalize_phone((record.get("profile", {}) or {}).get("phone", "")) for record in records],
+                )
+                name_backfill_count = apply_name_backfill(records, name_map)
+        except Exception as exc:  # backfill is optional: a DB failure must not block the export
+            print(f"[name-backfill] 跳过,查询失败: {exc}")
+
     if not args.no_db:
         with Db() as db:
             db_new, db_skip = write_records_to_db(db, records)
 
     print(
-        f"[done] 采集{len(records)}条, JSON新增{json_new}条, "
+        f"[done] 采集{len(records)}条, 姓名回填{name_backfill_count}条, JSON新增{json_new}条, "
         f"DB新增{db_new}条, DB跳过{db_skip}条, 输出: {args.output}"
     )