#!/usr/bin/env python3
"""Collect lawyer profiles from lvlin.baidu.com and store them in the ``lawyer`` table.

The script walks every (city, case type) combination, pages through the list
API, fetches the detail API for each lawyer whose detail URL is not yet stored
for this DOMAIN, and inserts one row per new lawyer through the project ``Db``
wrapper.
"""
import argparse
import json
import os
import sys
import time
from typing import Dict, List, Optional, Set, Tuple
from urllib.parse import urlencode

# Make the sibling "request" directory and the project root importable so the
# project-local modules below resolve when this file is run as a script.
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
request_dir = os.path.join(project_root, "request")
if request_dir not in sys.path:
    sys.path.insert(0, request_dir)
if project_root not in sys.path:
    sys.path.append(project_root)

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from Db import Db
from request.proxy_config import get_proxies, report_proxy_status

DOMAIN = "百度法行宝"
BASE_URL = "https://lvlin.baidu.com"
CITY_API = f"{BASE_URL}/pc/api/law/sync/city"
LIST_API = f"{BASE_URL}/pc/api/law/api/lawyerlist"
DETAIL_API = f"{BASE_URL}/pc/api/law/api/lawyerhome"
DEFAULT_PAGE_SIZE = 16
DEFAULT_MAX_PAGES = 30
DEFAULT_STOP_ZERO_NEW_PAGES = 3
DEFAULT_SLEEP_SECONDS = 0.1


def parse_args() -> argparse.Namespace:
    """Parse the command-line options that scope and throttle a crawl."""
    parser = argparse.ArgumentParser(description="采集百度法行宝律师信息并落库")
    parser.add_argument("--province", default="", help="仅采集指定省份,例如:山东")
    parser.add_argument("--city", default="", help="仅采集指定城市,例如:聊城 / 聊城市")
    parser.add_argument(
        "--areas",
        default="",
        help="指定案件类型,逗号分隔;不传时自动发现顶级类型并追加不限",
    )
    parser.add_argument(
        "--limit-cities",
        type=int,
        default=0,
        help="仅处理前 N 个城市,0 表示不限",
    )
    parser.add_argument(
        "--page-size",
        type=int,
        default=DEFAULT_PAGE_SIZE,
        help=f"每次列表请求条数,默认 {DEFAULT_PAGE_SIZE}",
    )
    parser.add_argument(
        "--max-pages-per-query",
        type=int,
        default=DEFAULT_MAX_PAGES,
        help=f"单城市单类型最大翻页数,默认 {DEFAULT_MAX_PAGES}",
    )
    parser.add_argument(
        "--stop-zero-new-pages",
        type=int,
        default=DEFAULT_STOP_ZERO_NEW_PAGES,
        help=f"连续多少页无新增就停止当前查询,默认 {DEFAULT_STOP_ZERO_NEW_PAGES}",
    )
    parser.add_argument(
        "--sleep-seconds",
        type=float,
        default=DEFAULT_SLEEP_SECONDS,
        help=f"请求间隔秒数,默认 {DEFAULT_SLEEP_SECONDS}",
    )
    return parser.parse_args()


class BaiduLvlinSpider:
    """Crawler that pages through lawyer lists per city / case type and stores new rows."""

    def __init__(self, db_connection: Db, args: argparse.Namespace):
        """Set up the HTTP session and preload URLs, cities and case types.

        Args:
            db_connection: Project ``Db`` wrapper; this class uses its ``db``
                attribute (for ``cursor()``) and its ``insert_data`` method.
            args: Namespace produced by :func:`parse_args`.

        Note:
            Construction already performs network requests (city list and,
            unless ``--areas`` is given, case-type discovery) and a DB query.
        """
        self.db = db_connection
        self.args = args
        # Falsy or non-positive CLI values fall back to a sane minimum.
        self.page_size = max(1, int(args.page_size or DEFAULT_PAGE_SIZE))
        self.max_pages_per_query = max(1, int(args.max_pages_per_query or DEFAULT_MAX_PAGES))
        self.stop_zero_new_pages = max(1, int(args.stop_zero_new_pages or DEFAULT_STOP_ZERO_NEW_PAGES))
        self.sleep_seconds = max(0.0, float(args.sleep_seconds or 0.0))
        self.proxy_enabled = False
        self.session = self._build_session()
        self.existing_urls = self._load_existing_urls()
        self.cities = self._load_cities()
        self.areas = self._load_areas()
        self.inserted_count = 0

    def _build_session(self) -> requests.Session:
        """Create the shared session with proxy settings, GET retries and browser-like headers."""
        report_proxy_status()
        session = requests.Session()
        # Ignore proxy settings from environment variables; only get_proxies() decides.
        session.trust_env = False
        proxies = get_proxies()
        if proxies:
            session.proxies.update(proxies)
            self.proxy_enabled = True
        else:
            session.proxies.clear()
            self.proxy_enabled = False

        retries = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=(429, 500, 502, 503, 504),
            allowed_methods=frozenset(["GET"]),
            raise_on_status=False,
        )
        adapter = HTTPAdapter(max_retries=retries)
        session.mount("https://", adapter)
        session.mount("http://", adapter)
        session.headers.update(
            {
                "User-Agent": (
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/123.0.0.0 Safari/537.36"
                ),
                "Accept": "application/json, text/plain, */*",
                "Referer": f"{BASE_URL}/pc/r?vn=law",
                "Connection": "close",
            }
        )
        return session

    def _disable_proxy(self) -> None:
        """Drop the session proxies and switch to direct connections (no-op if already direct)."""
        if not self.proxy_enabled:
            return
        self.session.proxies.clear()
        self.proxy_enabled = False
        print(f"[{DOMAIN}] 代理不可用,已切换直连")

    def _sleep(self) -> None:
        """Pause for the configured request interval, if any."""
        if self.sleep_seconds > 0:
            time.sleep(self.sleep_seconds)

    def _get_json(self, url: str, params: Optional[Dict[str, object]] = None, referer: str = "") -> Dict:
        """GET ``url`` and return the decoded JSON body.

        Args:
            url: Endpoint to request.
            params: Query-string parameters; ``None`` means no parameters.
            referer: When non-empty, overrides the session-level Referer header.

        Returns:
            The parsed JSON document.

        Raises:
            requests.exceptions.RequestException: On transport errors or a
                non-2xx status (via ``raise_for_status``).
            ValueError: If the body is not valid JSON.
        """
        headers = {}
        if referer:
            headers["Referer"] = referer
        try:
            resp = self.session.get(url, params=params or {}, timeout=20, headers=headers)
        except requests.exceptions.ProxyError:
            # The proxy itself failed: fall back to a direct connection and retry once.
            self._disable_proxy()
            resp = self.session.get(url, params=params or {}, timeout=20, headers=headers)
        try:
            resp.raise_for_status()
            return resp.json()
        finally:
            resp.close()

    @staticmethod
    def _payload_data(payload: object) -> Dict:
        """Return ``payload["data"]`` as a dict, or ``{}`` when it is missing, null or not a dict.

        ``dict.get("data", {})`` only applies its default when the key is
        absent, so a response carrying ``"data": null`` would otherwise raise
        AttributeError on the next ``.get``.
        """
        if not isinstance(payload, dict):
            return {}
        data = payload.get("data")
        return data if isinstance(data, dict) else {}

    def _load_existing_urls(self) -> Set[str]:
        """Load the non-empty URLs already stored for this DOMAIN, used for de-duplication."""
        urls: Set[str] = set()
        cursor = self.db.db.cursor()
        try:
            cursor.execute("SELECT url FROM lawyer WHERE domain=%s AND url IS NOT NULL", (DOMAIN,))
            for row in cursor.fetchall():
                url = (row[0] or "").strip()
                if url:
                    urls.add(url)
        finally:
            cursor.close()
        print(f"[{DOMAIN}] 已存在 URL 数: {len(urls)}")
        return urls

    def _normalize_city_name(self, city_name: str) -> str:
        """Strip whitespace and one trailing "市" so "聊城" and "聊城市" compare equal."""
        text = str(city_name or "").strip()
        if text.endswith("市"):
            return text[:-1]
        return text

    def _city_matches(self, expected_city: str, actual_city: str) -> bool:
        """Return True when both names are non-empty and equal after normalization."""
        left = self._normalize_city_name(expected_city)
        right = self._normalize_city_name(actual_city)
        if not left or not right:
            return False
        return left == right

    def _load_cities(self) -> List[Dict[str, str]]:
        """Fetch the city catalogue and apply the --province / --city / --limit-cities filters.

        Returns:
            Dicts with ``province``, ``city`` and ``city_code`` keys, sorted by
            (province, city). Entries missing any of the three are skipped.
        """
        payload = self._get_json(CITY_API, params={"vn": "law"}, referer=f"{BASE_URL}/pc/r?vn=law")
        all_city_list = self._payload_data(payload).get("AllCityList") or []
        cities: List[Dict[str, str]] = []
        province_filter = self.args.province.strip()
        city_filter = self._normalize_city_name(self.args.city)

        for block in all_city_list:
            for item in block.get("cityList", []) or []:
                city_name = str(item.get("name") or "").strip()
                province = str(item.get("province") or "").strip()
                city_code = str(item.get("code") or "").strip()
                if not city_name or not province or not city_code:
                    continue
                # The province filter is an exact match; the city filter ignores a trailing "市".
                if province_filter and province != province_filter:
                    continue
                if city_filter and self._normalize_city_name(city_name) != city_filter:
                    continue
                cities.append(
                    {
                        "province": province,
                        "city": city_name,
                        "city_code": city_code,
                    }
                )

        cities.sort(key=lambda item: (item["province"], item["city"]))
        if self.args.limit_cities and self.args.limit_cities > 0:
            cities = cities[: self.args.limit_cities]
        print(f"[{DOMAIN}] 本次待采城市数: {len(cities)}")
        return cities

    def _discover_top_level_areas(self) -> List[str]:
        """Read the "type" filter options from one list response.

        The first loaded city (or "北京" when none were loaded) is used as the
        sample query.

        Returns:
            "不限" followed by each distinct non-empty option value, in
            response order.
        """
        sample_city = self.cities[0]["city"] if self.cities else "北京"
        payload = self._get_json(
            LIST_API,
            params={
                "city_name": sample_city,
                "page_num": 1,
                "page_size": self.page_size,
                "ts": int(time.time()),
                "clientType": "pc",
                "list_type": 1,
            },
            referer=f"{BASE_URL}/pc/r?vn=law",
        )
        filters = self._payload_data(payload).get("filters") or []
        areas: List[str] = ["不限"]
        seen = {"不限"}
        for item in filters:
            if item.get("key") != "type":
                continue
            for option in item.get("options", []) or []:
                value = str(option.get("value") or "").strip()
                if not value or value in seen:
                    continue
                seen.add(value)
                areas.append(value)
        return areas

    def _load_areas(self) -> List[str]:
        """Return the case types to crawl: the --areas list if given, otherwise auto-discovered."""
        if self.args.areas.strip():
            areas = [part.strip() for part in self.args.areas.split(",") if part.strip()]
            # dict.fromkeys removes duplicates while keeping first-seen order.
            unique: List[str] = list(dict.fromkeys(areas))
            print(f"[{DOMAIN}] 使用指定案件类型: {unique}")
            return unique

        areas = self._discover_top_level_areas()
        print(f"[{DOMAIN}] 自动发现案件类型: {areas}")
        return areas

    def _build_pc_detail_url(self, qc_no: str, rs_id: str) -> str:
        """Build the PC detail-page URL; it doubles as the de-duplication key stored in ``url``."""
        return f"{BASE_URL}/pc/lawyer?vn=law&qc_no={qc_no}&rs_id={rs_id}"

    def _build_list_page_url(self, city_name: str, area_name: str) -> str:
        """Build the list-page URL (used as Referer); "不限" or empty adds no area parameter."""
        params = {"city": city_name, "vn": "law"}
        if area_name and area_name != "不限":
            params["expertiseArea"] = area_name
        return f"{BASE_URL}/pc/r?{urlencode(params)}"

    def _fetch_list(self, city_name: str, area_name: str, page_num: int) -> List[Dict]:
        """Fetch one page of the lawyer list for a city and case type.

        Returns:
            The ``lawyer_list`` entries, or ``[]`` when the response has none.
        """
        params: Dict[str, object] = {
            "city_name": city_name,
            "page_num": page_num,
            "page_size": self.page_size,
            "ts": int(time.time()),
            "clientType": "pc",
            "list_type": 1,
        }
        if area_name and area_name != "不限":
            params["expertiseArea"] = area_name
        payload = self._get_json(
            LIST_API,
            params=params,
            referer=self._build_list_page_url(city_name, area_name),
        )
        return self._payload_data(payload).get("lawyer_list") or []

    def _fetch_detail(self, qc_no: str, rs_id: str) -> Dict:
        """Fetch one lawyer's detail object; returns ``{}`` when absent or not a dict."""
        payload = self._get_json(
            DETAIL_API,
            params={"vn": "law", "qc_no": qc_no, "rs_id": rs_id},
            referer=self._build_pc_detail_url(qc_no, rs_id),
        )
        lawyer = self._payload_data(payload).get("lawyer")
        return lawyer if isinstance(lawyer, dict) else {}

    def _extract_phone(self, detail: Dict) -> Optional[str]:
        """Return the first non-empty ``phone_num`` found in the service lists.

        ``lawyer_service`` is searched before ``lawyer_service_new``. Entries
        that are not dicts are skipped instead of raising.
        """
        for key in ("lawyer_service", "lawyer_service_new"):
            for service in detail.get(key, []) or []:
                if not isinstance(service, dict):
                    continue
                phone = str(service.get("phone_num") or "").strip()
                if phone:
                    return phone
        return None

    def _safe_json(self, payload: Dict) -> str:
        """Serialize ``payload`` to JSON, keeping non-ASCII characters unescaped."""
        return json.dumps(payload, ensure_ascii=False)

    def _build_record(
        self,
        city_info: Dict[str, str],
        area_name: str,
        page_num: int,
        list_item: Dict,
        detail: Dict,
    ) -> Dict[str, object]:
        """Assemble the row to insert into ``lawyer`` from a list entry and its detail.

        Name, firm and avatar prefer the detail payload and fall back to the
        list entry. The ids and the city prefer the list entry. The raw list
        item, the raw detail and the crawl context are stored as JSON in
        ``params``.

        Args:
            city_info: City dict as produced by ``_load_cities``.
            area_name: Case type being crawled.
            page_num: 1-based list page the entry came from.
            list_item: Raw entry from the list API.
            detail: Raw detail object; may be empty if the detail request failed.

        Returns:
            A column-name to value mapping; empty strings are stored as None.
        """
        qc_no = str(list_item.get("qc_no") or detail.get("qc_no") or "").strip()
        rs_id = str(list_item.get("rs_id") or detail.get("rs_id") or "").strip()
        detail_url = self._build_pc_detail_url(qc_no, rs_id)
        name = str(detail.get("lawyer_name") or list_item.get("lawyer_name") or "").strip()
        law_firm = str(detail.get("practice_company") or list_item.get("practice_company") or "").strip()
        city_name = str(list_item.get("city") or city_info.get("city") or "").strip()
        avatar_url = str(
            detail.get("lawyer_avatar_big")
            or detail.get("lawyer_avatar")
            or list_item.get("lawyer_avatar_big")
            or list_item.get("lawyer_avatar")
            or ""
        ).strip()
        phone = self._extract_phone(detail)

        params = {
            "source": {
                "site": "baidu_lvlin",
                "city_name": city_info.get("city"),
                "city_code": city_info.get("city_code"),
                "province": city_info.get("province"),
                "expertise_area": area_name,
                "page_num": page_num,
                "list_url": self._build_list_page_url(city_info.get("city", ""), area_name),
                "detail_url": detail_url,
                "list_api": LIST_API,
                "detail_api": DETAIL_API,
            },
            "list_item": list_item,
            "detail": detail,
        }

        return {
            "name": name or None,
            "phone": phone or None,
            "law_firm": law_firm or None,
            "province": city_info.get("province") or None,
            "city": city_name or city_info.get("city") or None,
            "url": detail_url,
            "avatar_url": avatar_url or None,
            "domain": DOMAIN,
            "create_time": int(time.time()),
            "site_time": None,
            "params": self._safe_json(params),
        }

    def _insert_record(self, record: Dict[str, object]) -> bool:
        """Insert ``record`` unless its URL is empty or already known.

        Returns:
            True if a row was inserted, False if it was skipped.
        """
        url = str(record.get("url") or "").strip()
        if not url or url in self.existing_urls:
            return False
        self.db.insert_data("lawyer", record)
        self.existing_urls.add(url)
        self.inserted_count += 1
        return True

    def _process_list_item(
        self,
        city_info: Dict[str, str],
        area_name: str,
        page_num: int,
        item: Dict,
    ) -> bool:
        """Handle one list entry: filter, fetch detail, build the record and insert it.

        Returns:
            True if a new row was inserted for this entry.
        """
        city_name = city_info["city"]
        qc_no = str(item.get("qc_no") or "").strip()
        rs_id = str(item.get("rs_id") or "").strip()
        actual_city = str(item.get("city") or "").strip()
        if not qc_no or not rs_id:
            return False
        # Skip entries that report a different city than the one being queried.
        if actual_city and not self._city_matches(city_name, actual_city):
            return False

        detail_url = self._build_pc_detail_url(qc_no, rs_id)
        if detail_url in self.existing_urls:
            return False

        # A failed detail request is not fatal: the record is then built from
        # the list entry alone.
        detail: Dict = {}
        try:
            detail = self._fetch_detail(qc_no, rs_id)
        except Exception as exc:
            print(f"[{DOMAIN}] 详情请求失败 {qc_no}-{rs_id}: {exc}")

        record = self._build_record(city_info, area_name, page_num, item, detail)
        inserted = False
        try:
            inserted = self._insert_record(record)
            if inserted:
                print(
                    f"[{DOMAIN}] -> 新增 {record.get('name') or qc_no} "
                    f"| {city_name} | {area_name} | p{page_num}"
                )
        except Exception as exc:
            print(f"[{DOMAIN}] 插入失败 {record.get('url')}: {exc}")
        # Throttle after every entry that reached the detail request.
        self._sleep()
        return inserted

    def _iter_city_area(self, city_info: Dict[str, str], area_name: str) -> Tuple[int, int]:
        """Page through one (city, case type) query and insert every new lawyer.

        Paging stops when a list request fails, a page is empty, a page is
        shorter than ``page_size``, ``stop_zero_new_pages`` consecutive pages
        yield nothing new, or ``max_pages_per_query`` is reached.

        Returns:
            ``(inserted, pages)``: rows inserted and the last page number attempted.
        """
        inserted = 0
        pages = 0
        zero_new_pages = 0
        city_name = city_info["city"]

        for page_num in range(1, self.max_pages_per_query + 1):
            pages = page_num
            try:
                items = self._fetch_list(city_name, area_name, page_num)
            except Exception as exc:
                print(f"[{DOMAIN}] 列表请求失败 {city_name}-{area_name}-p{page_num}: {exc}")
                break

            if not items:
                print(f"[{DOMAIN}] {city_name}-{area_name} 第 {page_num} 页无数据,停止")
                break

            page_new = 0
            for item in items:
                if self._process_list_item(city_info, area_name, page_num, item):
                    page_new += 1
                    inserted += 1

            print(
                f"[{DOMAIN}] {city_name} | {area_name} | p{page_num} "
                f"| 列表 {len(items)} | 新增 {page_new}"
            )

            # A short page means this was the last page of results.
            if len(items) < self.page_size:
                break

            if page_new == 0:
                zero_new_pages += 1
                if zero_new_pages >= self.stop_zero_new_pages:
                    print(
                        f"[{DOMAIN}] {city_name}-{area_name} 连续 {zero_new_pages} 页无新增,停止"
                    )
                    break
            else:
                zero_new_pages = 0

            self._sleep()

        return inserted, pages

    def run(self) -> None:
        """Crawl every (city, case type) combination and print progress and totals."""
        print(f"[{DOMAIN}] 启动采集")
        if not self.cities:
            print(f"[{DOMAIN}] 无可采城市")
            return
        if not self.areas:
            print(f"[{DOMAIN}] 无可采案件类型")
            return

        total_queries = len(self.cities) * len(self.areas)
        query_index = 0
        for city_info in self.cities:
            city_inserted = 0
            for area_name in self.areas:
                query_index += 1
                print(
                    f"[{DOMAIN}] 进度 {query_index}/{total_queries} | "
                    f"{city_info['province']}-{city_info['city']} | {area_name}"
                )
                inserted, pages = self._iter_city_area(city_info, area_name)
                city_inserted += inserted
                print(
                    f"[{DOMAIN}] 完成 {city_info['city']} | {area_name} "
                    f"| 翻页 {pages} | 新增 {inserted}"
                )
            print(
                f"[{DOMAIN}] 城市完成 {city_info['province']}-{city_info['city']} "
                f"| 本城新增 {city_inserted} | 总新增 {self.inserted_count}"
            )

        print(f"[{DOMAIN}] 采集完成,总新增 {self.inserted_count}")


if __name__ == "__main__":
    cli_args = parse_args()
    with Db() as db:
        spider = BaiduLvlinSpider(db, cli_args)
        spider.run()