"""Crawler for lawyer listings on www.lawtime.cn.

The script discovers city targets through the site's area API, walks each
city's lawyer list pages, fetches every lawyer's detail page, appends the
merged records to a JSONL file and (optionally) inserts one row per new phone
number into the ``lawyer`` table.
"""
import argparse
import contextlib
import hashlib
import json
import os
import random
import re
import sys
import time
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Optional, Set, Tuple

import urllib3
from bs4 import BeautifulSoup

# Make the project root and its "request" directory importable so that the
# project-local imports below resolve when this file is run as a script.
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
request_dir = os.path.join(project_root, "request")
if request_dir not in sys.path:
    sys.path.insert(0, request_dir)
if project_root not in sys.path:
    sys.path.append(project_root)

from Db import Db
from request.requests_client import RequestClientError, RequestsClient
from utils.rate_limiter import wait_for_request

# Requests are issued with verify=False, so silence the resulting warnings.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

SITE_NAME = "lawtime"
LEGACY_DOMAIN = "法律快车"
SITE_BASE = "https://www.lawtime.cn"
PROVINCE_API = "https://www.lawtime.cn/public/stationIndex/getAreaList?areacode=0"
CITY_API_TEMPLATE = "https://www.lawtime.cn/public/stationIndex/getAreaList?areacode={province_id}"
LIST_URL_TEMPLATE = SITE_BASE + "/{city_py}/lawyer"

PHONE_RE = re.compile(r"1[3-9]\d{9}")
# Same shape as PHONE_RE but must not be embedded in a longer digit run; used
# when scanning free text, where stripping separators would merge unrelated
# numbers (dates, licence numbers, counters) into a fake phone number.
STANDALONE_PHONE_RE = re.compile(r"(?<!\d)1[3-9]\d{9}(?!\d)")
YEAR_RE = re.compile(r"执业\s*(\d+)\s*年")


@dataclass
class CityTarget:
    """One city to crawl, together with the province it was discovered under."""

    province_id: str
    province_name: str
    province_py: str  # value of the area API's "pinyin" field for the province
    city_id: str
    city_name: str
    city_py: str  # value of the "pinyin" field; used as the URL path segment


@dataclass
class ListCard:
    """Fields scraped from a single lawyer card on a list page."""

    detail_url: str
    name: str
    phone: str
    address: str = ""
    specialties: List[str] = field(default_factory=list)
    metric_text: str = ""


def normalize_phone(text: str) -> str:
    """Return the first mobile-shaped number found in *text*, or ``""``.

    All non-digit characters are removed before matching, so separators such
    as spaces or hyphens inside a number are tolerated. Because of that the
    function is only reliable on short, phone-specific snippets.
    """
    compact = re.sub(r"\D", "", text or "")
    match = PHONE_RE.search(compact)
    return match.group(0) if match else ""


class LawtimeCrawler:
    """Discover cities, crawl list/detail pages and persist the results."""

    def __init__(
        self,
        max_pages: int = 9999,
        sleep_seconds: float = 0.1,
        use_proxy: bool = True,
        db_connection=None,
    ):
        """Configure the crawler.

        Args:
            max_pages: Upper bound on list pages fetched per city.
            sleep_seconds: Pause after each yielded record; negatives clamp to 0.
            use_proxy: Forwarded to ``RequestsClient``.
            db_connection: Project ``Db`` object, or ``None`` to skip DB writes.
        """
        self.max_pages = max_pages
        self.sleep_seconds = max(0.0, sleep_seconds)
        self.db = db_connection
        self.client = RequestsClient(
            headers={
                "User-Agent": (
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/122.0.0.0 Safari/537.36"
                ),
                "Accept": "text/html,application/json,*/*;q=0.8",
                "Connection": "close",
            },
            use_proxy=use_proxy,
            retry_total=2,
            retry_backoff_factor=1,
            retry_status_forcelist=(429, 500, 502, 503, 504),
            retry_allowed_methods=("GET",),
        )

    @staticmethod
    def _backoff_sleep(attempt: int) -> None:
        """Sleep ``2 ** attempt`` seconds plus a small random jitter."""
        time.sleep((2 ** attempt) + random.uniform(0.2, 0.8))

    def _get_text(
        self,
        url: str,
        *,
        timeout: int = 20,
        max_retries: int = 3,
        referer: str = SITE_BASE,
    ) -> str:
        """GET *url* and return the response body as text.

        Transport errors, HTTP 403 (after ``client.refresh()``), 429 and 5xx
        are retried with exponential backoff up to *max_retries* attempts.
        Any other 4xx status fails immediately: repeating the request would
        only waste time and requests.

        Raises:
            RequestClientError: On a non-retryable status or when the retry
                budget is exhausted on an HTTP error.
            Exception: The last transport error when every attempt failed.
        """
        headers = {"Referer": referer}
        last_error: Optional[Exception] = None
        for attempt in range(max_retries):
            is_last_attempt = attempt >= max_retries - 1
            wait_for_request()
            # Only the network call lives in the try block, so that errors we
            # raise ourselves for HTTP statuses are never mistaken for
            # transient transport failures.
            try:
                resp = self.client.get_text(
                    url,
                    timeout=timeout,
                    verify=False,
                    headers=headers,
                )
            except Exception as exc:
                last_error = exc
                if is_last_attempt:
                    raise
                self._backoff_sleep(attempt)
                continue

            code = resp.status_code
            if code < 400:
                return resp.text

            last_error = RequestClientError(f"{code} Error: {url}")
            retryable = code in (403, 429) or code >= 500
            if is_last_attempt or not retryable:
                raise last_error
            if code == 403:
                # Best effort: a failed refresh must not abort the retry loop.
                try:
                    self.client.refresh()
                except Exception as exc:
                    print(f"[request] client.refresh() 失败: {exc}")
            self._backoff_sleep(attempt)

        # Only reachable when max_retries <= 0 (the loop body never ran).
        if last_error is not None:
            raise last_error
        raise RequestClientError(f"Unknown request error: {url}")

    def _get_json(self, url: str, *, referer: str) -> List[Dict]:
        """Fetch *url* and decode it as a JSON list.

        Returns an empty list when the body is empty, looks like HTML, is not
        valid JSON, or decodes to something other than a list.
        """
        text = self._get_text(url, referer=referer)
        cleaned = (text or "").strip().lstrip("\ufeff")
        if not cleaned or cleaned.startswith("<"):
            return []
        try:
            data = json.loads(cleaned)
        except ValueError:
            return []
        return data if isinstance(data, list) else []

    def discover_cities(self) -> List[CityTarget]:
        """Build the list of crawl targets from the site's area API.

        Each province is expanded into its cities; a province whose city list
        is empty is treated as a single city of its own. Entries without id,
        name or pinyin are dropped and cities are de-duplicated by pinyin.
        """
        provinces = self._get_json(PROVINCE_API, referer=SITE_BASE)
        if not provinces:
            print("[discover] 地区接口未返回有效数据")
            return []

        results: List[CityTarget] = []
        seen_py: Set[str] = set()
        for province in provinces:
            if not isinstance(province, dict):
                continue  # malformed API entry
            province_id = str(province.get("id") or "").strip()
            province_name = str(province.get("province") or province.get("city") or "").strip()
            province_py = str(province.get("pinyin") or "").strip()
            if not province_id or not province_name:
                continue

            city_api = CITY_API_TEMPLATE.format(province_id=province_id)
            try:
                cities = self._get_json(
                    city_api,
                    referer=LIST_URL_TEMPLATE.format(city_py=province_py or ""),
                )
            except Exception as exc:
                print(f"[city] 获取失败 province={province_id}: {exc}")
                continue

            if not cities:
                # No sub-cities returned: crawl the province itself as a city.
                cities = [
                    {
                        "id": province_id,
                        "province": province_name,
                        "city": province_name,
                        "pinyin": province_py,
                    }
                ]

            for city in cities:
                if not isinstance(city, dict):
                    continue  # malformed API entry
                city_id = str(city.get("id") or "").strip()
                city_name = str(city.get("city") or city.get("province") or "").strip()
                city_py = str(city.get("pinyin") or "").strip()
                if not city_id or not city_name or not city_py:
                    continue
                if city_py in seen_py:
                    continue
                seen_py.add(city_py)
                results.append(
                    CityTarget(
                        province_id=province_id,
                        province_name=province_name,
                        province_py=province_py,
                        city_id=city_id,
                        city_name=city_name,
                        city_py=city_py,
                    )
                )
        return results

    def _build_list_url(self, city_py: str, page: int) -> str:
        """Return the list URL for *city_py*; page 1 has no ``page`` query."""
        base = LIST_URL_TEMPLATE.format(city_py=city_py)
        if page <= 1:
            return base
        return f"{base}?page={page}"

    def fetch_list_page(self, target: CityTarget, page: int) -> Tuple[List[ListCard], bool, str]:
        """Fetch one list page.

        Returns:
            ``(cards, has_next, list_url)`` where *has_next* is true when the
            pager contains a link whose href mentions ``page={page + 1}``.
        """
        list_url = self._build_list_url(target.city_py, page)
        html = self._get_text(list_url, referer=SITE_BASE + "/")
        # Parse once and reuse the tree for both the cards and the pager.
        soup = BeautifulSoup(html, "html.parser")
        cards = self._parse_cards_from_soup(soup)
        next_link = soup.select_one(f"div.page a[href*='page={page + 1}']")
        has_next = next_link is not None
        return cards, has_next, list_url

    def parse_list_cards(self, html: str) -> List[ListCard]:
        """Parse list-page *html* into ``ListCard`` objects."""
        return self._parse_cards_from_soup(BeautifulSoup(html, "html.parser"))

    def _parse_cards_from_soup(self, soup: BeautifulSoup) -> List[ListCard]:
        """Extract lawyer cards from an already parsed list page.

        Cards without an absolute (``http``-prefixed) detail link are skipped
        and duplicates are removed by detail URL.
        """
        cards: List[ListCard] = []
        seen: Set[str] = set()
        for item in soup.select("li.lawyer-item-card"):
            link_tag = item.select_one("a.name[href]") or item.select_one("a.lawyer-img-box[href]")
            if not link_tag:
                continue
            detail_url = (link_tag.get("href") or "").strip()
            if not detail_url.startswith("http"):
                continue
            if detail_url in seen:
                continue
            seen.add(detail_url)

            name = link_tag.get_text(strip=True)

            phone = ""
            phone_tag = item.select_one("div.phone")
            if phone_tag:
                phone = normalize_phone(phone_tag.get_text(" ", strip=True))

            address = ""
            addr_tag = item.select_one("div.location .txt")
            if addr_tag:
                address = addr_tag.get_text(" ", strip=True)

            specialties: List[str] = []
            prof_tag = item.select_one("div.prof .txt")
            if prof_tag:
                # Split on the enumeration comma and on both the ASCII and the
                # full-width comma.
                specialties = [
                    x.strip()
                    for x in re.split(r"[、,，]", prof_tag.get_text(" ", strip=True))
                    if x.strip()
                ]

            metric_text = ""
            metric_tag = item.select_one("div.num-msg")
            if metric_tag:
                metric_text = metric_tag.get_text(" ", strip=True)

            cards.append(
                ListCard(
                    detail_url=detail_url,
                    name=name,
                    phone=phone,
                    address=address,
                    specialties=specialties,
                    metric_text=metric_text,
                )
            )
        return cards

    def parse_detail(self, detail_url: str) -> Dict:
        """Fetch and parse a lawyer detail page.

        Returns:
            Dict with keys ``name``, ``law_firm``, ``phone``, ``address``,
            ``practice_years`` (``int`` or ``None``), ``specialties`` and
            ``detail_url``. Fields that cannot be found are empty.

        Raises:
            RequestClientError: When the page is a firewall/verification page
                or the request itself fails.
        """
        html = self._get_text(detail_url, referer=SITE_BASE)
        if "网站防火墙" in html or "访问被拒绝" in html or "/ipfilter/verify" in html:
            raise RequestClientError(f"firewall blocked: {detail_url}")

        soup = BeautifulSoup(html, "html.parser")
        text = soup.get_text(" ", strip=True)

        name = ""
        if soup.title:
            title = soup.title.get_text(" ", strip=True)
            # Shortest run before "律师" that contains no whitespace,
            # underscore, comma (ASCII or full-width) or full stop.
            match = re.search(r"([^\s_,，。]+?)律师", title)
            if match:
                name = match.group(1).strip()

        # Prefer the dedicated phone elements, in order.
        phone = ""
        for selector in (".data-w .tel-b b", ".law-info-b .item .two-r.b"):
            tag = soup.select_one(selector)
            if tag:
                phone = normalize_phone(tag.get_text(" ", strip=True))
            if phone:
                break
        if not phone:
            # Last resort: scan the page text, but only accept a number that
            # stands on its own. Stripping every non-digit from a whole page
            # would glue unrelated numbers together into a bogus phone.
            standalone = STANDALONE_PHONE_RE.search(text)
            if standalone:
                phone = standalone.group(0)

        law_firm = ""
        law_firm_tag = soup.select_one(".law-info-b .item .two-nowrap")
        if law_firm_tag:
            law_firm = law_firm_tag.get_text(" ", strip=True)
        if not law_firm:
            # Fall back to the first info item that mentions a firm.
            for li in soup.select(".law-info-b .item"):
                li_text = li.get_text(" ", strip=True)
                if li_text and ("事务所" in li_text or "法律服务所" in li_text):
                    law_firm = li_text
                    break

        address = ""
        addr_tag = soup.select_one(".law-info-b .item .two-r[title]")
        if addr_tag:
            addr_value = (addr_tag.get("title") or "").strip()
            if len(addr_value) > 8:
                address = addr_value
        if not address:
            addr_tag = soup.select_one(".law-info-b .item .two-r")
            if addr_tag:
                addr_value = addr_tag.get_text(" ", strip=True)
                # Short values or ones mentioning "律师" are not treated as
                # an address.
                if len(addr_value) > 8 and "律师" not in addr_value:
                    address = addr_value

        # The capture group is \d+, so int() cannot fail here.
        year_match = YEAR_RE.search(text)
        practice_years: Optional[int] = int(year_match.group(1)) if year_match else None

        labels = (tag.get_text(strip=True) for tag in soup.select(".profession-b .item"))
        specialties: List[str] = [label for label in labels if label]

        return {
            "name": name,
            "law_firm": law_firm,
            "phone": phone,
            "address": address,
            "practice_years": practice_years,
            "specialties": specialties,
            "detail_url": detail_url,
        }

    def crawl_city(self, target: CityTarget) -> Iterable[Dict]:
        """Yield one merged record per lawyer found in *target*.

        Pages are walked until a list page fails, is empty, has no next link,
        or ``max_pages`` is reached. A failed detail fetch is logged and the
        record is built from the list card alone.
        """
        seen_details: Set[str] = set()
        for page in range(1, self.max_pages + 1):
            try:
                cards, has_next, list_url = self.fetch_list_page(target, page)
            except Exception as exc:
                print(f"[list] 失败 {target.city_py} p{page}: {exc}")
                break
            if not cards:
                break

            for card in cards:
                if card.detail_url in seen_details:
                    continue
                seen_details.add(card.detail_url)

                detail: Dict = {}
                try:
                    detail = self.parse_detail(card.detail_url)
                except Exception as exc:
                    print(f"[detail] 失败 {card.detail_url}: {exc}")

                phone = normalize_phone(detail.get("phone") or card.phone)
                profile_name = (detail.get("name") or card.name).replace("律师", "").strip()
                now = int(time.time())
                # MD5 is used only as a stable identifier for resume support.
                record_id = hashlib.md5(card.detail_url.encode("utf-8")).hexdigest()

                yield {
                    "record_id": record_id,
                    "collected_at": now,
                    "source": {
                        "site": SITE_NAME,
                        "province_id": target.province_id,
                        "province": target.province_name,
                        "province_py": target.province_py,
                        "city_id": target.city_id,
                        "city": target.city_name,
                        "city_py": target.city_py,
                        "page": page,
                        "list_url": list_url,
                        "detail_url": card.detail_url,
                    },
                    "list_snapshot": {
                        "name": card.name,
                        "phone": card.phone,
                        "address": card.address,
                        "specialties": card.specialties,
                        "metric_text": card.metric_text,
                    },
                    "profile": {
                        "name": profile_name,
                        "law_firm": (detail.get("law_firm") or "").strip(),
                        "phone": phone,
                        "address": (detail.get("address") or card.address or "").strip(),
                        "practice_years": detail.get("practice_years"),
                        "specialties": detail.get("specialties") or card.specialties,
                    },
                }

                if self.sleep_seconds:
                    time.sleep(self.sleep_seconds)

            if not has_next:
                break

    def _to_legacy_lawyer_row(self, record: Dict) -> Optional[Dict[str, str]]:
        """Convert a crawl record into a ``lawyer`` table row.

        Returns ``None`` when the record has no usable phone number. The full
        record is stored as JSON in the ``params`` column.
        """
        source = record.get("source", {}) or {}
        profile = record.get("profile", {}) or {}
        phone = normalize_phone(profile.get("phone", ""))
        if not phone:
            return None

        province = (source.get("province") or "").strip()
        city = (source.get("city") or province).strip()
        return {
            "name": (profile.get("name") or "").strip(),
            "law_firm": (profile.get("law_firm") or "").strip(),
            "province": province,
            "city": city,
            "phone": phone,
            "url": (source.get("detail_url") or source.get("list_url") or "").strip(),
            "domain": LEGACY_DOMAIN,
            "create_time": int(record.get("collected_at") or time.time()),
            "params": json.dumps(record, ensure_ascii=False),
        }

    def _existing_phones_in_db(self, phones: List[str]) -> Set[str]:
        """Return the subset of *phones* already stored for ``LEGACY_DOMAIN``.

        The lookup is chunked (500 phones per query) and fully parameterized.
        """
        if not self.db or not phones:
            return set()
        deduped = sorted({p for p in phones if p})
        if not deduped:
            return set()

        existing: Set[str] = set()
        cur = self.db.db.cursor()
        try:
            chunk_size = 500
            for i in range(0, len(deduped), chunk_size):
                chunk = deduped[i:i + chunk_size]
                # Only "%s" placeholders are interpolated into the SQL text;
                # the values themselves are passed as query parameters.
                placeholders = ",".join(["%s"] * len(chunk))
                sql = f"SELECT phone FROM lawyer WHERE domain=%s AND phone IN ({placeholders})"
                cur.execute(sql, [LEGACY_DOMAIN, *chunk])
                # NOTE(review): assumes rows are sequences (not dict rows).
                for row in cur.fetchall():
                    existing.add(row[0])
        finally:
            cur.close()
        return existing

    def _write_records_to_db(self, records: List[Dict]) -> Tuple[int, int]:
        """Insert *records* into the ``lawyer`` table, one row per new phone.

        Returns:
            ``(inserted, skipped)``. Rows whose phone already exists and rows
            whose insert fails both count as skipped; records without a phone
            are not counted at all. ``(0, 0)`` when no DB is configured.
        """
        if not self.db:
            return 0, 0

        rows: List[Dict[str, str]] = []
        for record in records:
            row = self._to_legacy_lawyer_row(record)
            if row:
                rows.append(row)
        if not rows:
            return 0, 0

        existing = self._existing_phones_in_db([row["phone"] for row in rows])
        inserted = 0
        skipped = 0
        for row in rows:
            phone = row.get("phone", "")
            if not phone or phone in existing:
                skipped += 1
                continue
            try:
                self.db.insert_data("lawyer", row)
                # Remember it so a duplicate later in this batch is skipped.
                existing.add(phone)
                inserted += 1
            except Exception as exc:
                skipped += 1
                print(f"[db] 插入失败 phone={phone} url={row.get('url', '')}: {exc}")
        return inserted, skipped

    @staticmethod
    def _load_seen_record_ids(output_path: str) -> Set[str]:
        """Collect ``record_id`` values from an existing JSONL file.

        Blank lines, lines that are not valid JSON and JSON values that are
        not objects are ignored.
        """
        seen_ids: Set[str] = set()
        with open(output_path, "r", encoding="utf-8") as old_file:
            for line in old_file:
                line = line.strip()
                if not line:
                    continue
                try:
                    item = json.loads(line)
                except ValueError:
                    continue
                if not isinstance(item, dict):
                    continue
                rid = item.get("record_id")
                if rid:
                    seen_ids.add(rid)
        return seen_ids

    def crawl(
        self,
        output_path: str,
        max_cities: int = 0,
        city_filter: Optional[str] = None,
    ) -> None:
        """Run the full crawl.

        Args:
            output_path: JSONL file to append to; records whose ``record_id``
                is already present in it are not written again.
            max_cities: Crawl at most this many cities; ``0`` means no limit.
            city_filter: Case-insensitive substring matched against each
                city's pinyin and name.
        """
        cities = self.discover_cities()
        print(f"[discover] 共发现城市 {len(cities)} 个")

        if city_filter:
            key = city_filter.strip().lower()
            cities = [
                c for c in cities
                if key in c.city_py.lower() or key in c.city_name.lower()
            ]
            print(f"[discover] 过滤后城市 {len(cities)} 个, filter={city_filter}")

        if max_cities > 0:
            cities = cities[:max_cities]
            print(f"[discover] 截断城市数 {len(cities)}")

        os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)

        seen_ids: Set[str] = set()
        if os.path.exists(output_path):
            seen_ids = self._load_seen_record_ids(output_path)
            print(f"[resume] 已有记录 {len(seen_ids)} 条")

        total_new_json = 0
        total_new_db = 0
        total_skip_db = 0

        with open(output_path, "a", encoding="utf-8") as out:
            for idx, target in enumerate(cities, start=1):
                print(
                    f"[city {idx}/{len(cities)}] {target.province_name}-{target.city_name} "
                    f"({target.city_py})"
                )
                # Materialized because the records are consumed twice below.
                city_records = list(self.crawl_city(target))

                city_new_json = 0
                for record in city_records:
                    rid = record["record_id"]
                    if rid in seen_ids:
                        continue
                    out.write(json.dumps(record, ensure_ascii=False) + "\n")
                    seen_ids.add(rid)
                    city_new_json += 1
                    total_new_json += 1
                # Persist this city's lines before touching the DB so the
                # JSONL file never lags behind rows already inserted.
                out.flush()

                city_new_db, city_skip_db = self._write_records_to_db(city_records)
                total_new_db += city_new_db
                total_skip_db += city_skip_db

                print(
                    f"[city] 采集{len(city_records)}条, JSON新增{city_new_json}条, "
                    f"DB新增{city_new_db}条, DB跳过{city_skip_db}条"
                )

        print(
            f"[done] JSON新增{total_new_json}条, DB新增{total_new_db}条, "
            f"DB跳过{total_skip_db}条, 输出: {output_path}"
        )


def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the crawler script."""
    parser = argparse.ArgumentParser(description="法律快车全新采集脚本(站点数据直采)")
    parser.add_argument(
        "--output",
        default="/www/wwwroot/lawyers/data/lawtime_records_all.jsonl",
        help="输出 jsonl 文件路径",
    )
    parser.add_argument(
        "--max-cities",
        type=int,
        default=0,
        help="最多采集多少个城市,0 表示不限",
    )
    parser.add_argument(
        "--max-pages",
        type=int,
        default=9999,
        help="每个城市最多采集多少页",
    )
    parser.add_argument(
        "--city-filter",
        default="",
        help="按城市拼音或城市名过滤,如 beijing",
    )
    parser.add_argument(
        "--sleep",
        type=float,
        default=0.1,
        help="详情页请求间隔秒数",
    )
    parser.add_argument(
        "--direct",
        action="store_true",
        help="直连模式,不使用 proxy_settings.json 代理",
    )
    parser.add_argument(
        "--no-db",
        action="store_true",
        help="只输出 JSONL,不写入数据库",
    )
    return parser.parse_args()


def main():
    """Script entry point: build the crawler from CLI options and run it."""
    args = parse_args()

    # With --no-db no Db object is created at all; nullcontext yields None so
    # both modes share the same code path below.
    db_context = contextlib.nullcontext(None) if args.no_db else Db()
    with db_context as db:
        crawler = LawtimeCrawler(
            max_pages=args.max_pages,
            sleep_seconds=args.sleep,
            use_proxy=not args.direct,
            db_connection=db,
        )
        crawler.crawl(
            output_path=args.output,
            max_cities=args.max_cities,
            city_filter=args.city_filter or None,
        )


if __name__ == "__main__":
    main()