From ff5e04d98621dde1592afef0801e088afba133d8 Mon Sep 17 00:00:00 2001
From: hello-dd-code
Date: Fri, 20 Mar 2026 10:40:07 +0800
Subject: [PATCH] feat: add baidu lvlin crawler

---
 common_sites/baidu_lvlin.py | 473 ++++++++++++++++++++++++++++++++++++
 1 file changed, 473 insertions(+)
 create mode 100644 common_sites/baidu_lvlin.py

diff --git a/common_sites/baidu_lvlin.py b/common_sites/baidu_lvlin.py
new file mode 100644
index 0000000..7fb5859
--- /dev/null
+++ b/common_sites/baidu_lvlin.py
@@ -0,0 +1,473 @@
+#!/usr/bin/env python3
+import argparse
+import json
+import os
+import sys
+import time
+from typing import Dict, List, Optional, Set, Tuple
+from urllib.parse import urlencode
+
+# Make the sibling "request" package and the project root importable when the
+# script is run directly from common_sites/.
+current_dir = os.path.dirname(os.path.abspath(__file__))
+project_root = os.path.dirname(current_dir)
+request_dir = os.path.join(project_root, "request")
+if request_dir not in sys.path:
+    sys.path.insert(0, request_dir)
+if project_root not in sys.path:
+    sys.path.append(project_root)
+
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
+
+from Db import Db
+from request.proxy_config import get_proxies, report_proxy_status
+
+
+# Site label stored in the lawyer.domain column (Baidu 法行宝 / Baidu Lvlin).
+DOMAIN = "百度法行宝"
+BASE_URL = "https://lvlin.baidu.com"
+CITY_API = f"{BASE_URL}/pc/api/law/sync/city"
+LIST_API = f"{BASE_URL}/pc/api/law/api/lawyerlist"
+DETAIL_API = f"{BASE_URL}/pc/api/law/api/lawyerhome"
+DEFAULT_PAGE_SIZE = 16
+DEFAULT_MAX_PAGES = 30
+DEFAULT_STOP_ZERO_NEW_PAGES = 3
+DEFAULT_SLEEP_SECONDS = 0.1
+
+
+def parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser(
+        description="Crawl lawyer profiles from Baidu 法行宝 (Lvlin) into the database"
+    )
+    parser.add_argument("--province", default="", help="Only crawl the given province, e.g. 山东")
+    parser.add_argument("--city", default="", help="Only crawl the given city, e.g. 聊城 / 聊城市")
+    parser.add_argument(
+        "--areas",
+        default="",
+        help=(
+            "Comma-separated case types; when omitted, the top-level types are "
+            "discovered automatically, with 不限 (unrestricted) included"
+        ),
+    )
+    parser.add_argument(
+        "--limit-cities",
+        type=int,
+        default=0,
+        help="Only process the first N cities; 0 means no limit",
+    )
+    parser.add_argument(
+        "--page-size",
+        type=int,
+        default=DEFAULT_PAGE_SIZE,
+        help=f"Items per list request, default {DEFAULT_PAGE_SIZE}",
+    )
+    parser.add_argument(
+        "--max-pages-per-query",
+        type=int,
+        default=DEFAULT_MAX_PAGES,
+        help=f"Maximum pages per city/case-type query, default {DEFAULT_MAX_PAGES}",
+    )
+    parser.add_argument(
+        "--stop-zero-new-pages",
+        type=int,
+        default=DEFAULT_STOP_ZERO_NEW_PAGES,
+        help=(
+            "Stop the current query after this many consecutive pages without "
+            f"new records, default {DEFAULT_STOP_ZERO_NEW_PAGES}"
+        ),
+    )
+    parser.add_argument(
+        "--sleep-seconds",
+        type=float,
+        default=DEFAULT_SLEEP_SECONDS,
+        help=f"Delay between requests in seconds, default {DEFAULT_SLEEP_SECONDS}",
+    )
+    return parser.parse_args()
+
+
+class BaiduLvlinSpider:
+    def __init__(self, db_connection: Db, args: argparse.Namespace):
+        self.db = db_connection
+        self.args = args
+        self.page_size = max(1, int(args.page_size or DEFAULT_PAGE_SIZE))
+        self.max_pages_per_query = max(1, int(args.max_pages_per_query or DEFAULT_MAX_PAGES))
+        self.stop_zero_new_pages = max(1, int(args.stop_zero_new_pages or DEFAULT_STOP_ZERO_NEW_PAGES))
+        self.sleep_seconds = max(0.0, float(args.sleep_seconds or 0.0))
+        self.proxy_enabled = False
+        self.session = self._build_session()
+        self.existing_urls = self._load_existing_urls()
+        self.cities = self._load_cities()
+        self.areas = self._load_areas()
+        self.inserted_count = 0
+
+    def _build_session(self) -> requests.Session:
+        report_proxy_status()
+        session = requests.Session()
+        session.trust_env = False
+        proxies = get_proxies()
+        if proxies:
+            session.proxies.update(proxies)
+            self.proxy_enabled = True
+        else:
+            session.proxies.clear()
+            self.proxy_enabled = False
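+
+        # Retry idempotent GETs on transient upstream errors with exponential
+        # backoff. Note: the allowed_methods parameter requires urllib3 >= 1.26;
+        # older releases used the deprecated method_whitelist name.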
+        retries = Retry(
+            total=3,
+            backoff_factor=1,
+            status_forcelist=(429, 500, 502, 503, 504),
+            allowed_methods=frozenset(["GET"]),
+            raise_on_status=False,
+        )
+        adapter = HTTPAdapter(max_retries=retries)
+        session.mount("https://", adapter)
+        session.mount("http://", adapter)
+        session.headers.update(
+            {
+                "User-Agent": (
+                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
+                    "AppleWebKit/537.36 (KHTML, like Gecko) "
+                    "Chrome/123.0.0.0 Safari/537.36"
+                ),
+                "Accept": "application/json, text/plain, */*",
+                "Referer": f"{BASE_URL}/pc/r?vn=law",
+                "Connection": "close",
+            }
+        )
+        return session
+
+    def _disable_proxy(self) -> None:
+        if not self.proxy_enabled:
+            return
+        self.session.proxies.clear()
+        self.proxy_enabled = False
+        print(f"[{DOMAIN}] proxy unavailable, switched to direct connection")
+
+    def _sleep(self) -> None:
+        if self.sleep_seconds > 0:
+            time.sleep(self.sleep_seconds)
+
+    def _get_json(self, url: str, params: Optional[Dict[str, object]] = None, referer: str = "") -> Dict:
+        headers = {}
+        if referer:
+            headers["Referer"] = referer
+        try:
+            resp = self.session.get(url, params=params or {}, timeout=20, headers=headers)
+        except requests.exceptions.ProxyError:
+            # Proxy died mid-run: drop it and retry the same request directly.
+            self._disable_proxy()
+            resp = self.session.get(url, params=params or {}, timeout=20, headers=headers)
+        try:
+            resp.raise_for_status()
+            return resp.json()
+        finally:
+            resp.close()
+
+    def _load_existing_urls(self) -> Set[str]:
+        urls: Set[str] = set()
+        cursor = self.db.db.cursor()
+        try:
+            cursor.execute("SELECT url FROM lawyer WHERE domain=%s AND url IS NOT NULL", (DOMAIN,))
+            for row in cursor.fetchall():
+                url = (row[0] or "").strip()
+                if url:
+                    urls.add(url)
+        finally:
+            cursor.close()
+        print(f"[{DOMAIN}] existing URLs: {len(urls)}")
+        return urls
+
+    def _normalize_city_name(self, city_name: str) -> str:
+        # Strip a trailing "市" ("city") so that 聊城 and 聊城市 compare equal.
+        text = str(city_name or "").strip()
+        if text.endswith("市"):
+            return text[:-1]
+        return text
+
+    def _city_matches(self, expected_city: str, actual_city: str) -> bool:
+        left = self._normalize_city_name(expected_city)
+        right = self._normalize_city_name(actual_city)
+        if not left or not right:
+            return False
+        return left == right
+
+    def _load_cities(self) -> List[Dict[str, str]]:
+        payload = self._get_json(CITY_API, params={"vn": "law"}, referer=f"{BASE_URL}/pc/r?vn=law")
+        all_city_list = payload.get("data", {}).get("AllCityList", []) or []
+        cities: List[Dict[str, str]] = []
+        province_filter = self.args.province.strip()
+        city_filter = self._normalize_city_name(self.args.city)
+
+        for block in all_city_list:
+            for item in block.get("cityList", []) or []:
+                city_name = str(item.get("name") or "").strip()
+                province = str(item.get("province") or "").strip()
+                city_code = str(item.get("code") or "").strip()
+                if not city_name or not province or not city_code:
+                    continue
+                if province_filter and province != province_filter:
+                    continue
+                if city_filter and self._normalize_city_name(city_name) != city_filter:
+                    continue
+                cities.append(
+                    {
+                        "province": province,
+                        "city": city_name,
+                        "city_code": city_code,
+                    }
+                )
+
+        cities.sort(key=lambda item: (item["province"], item["city"]))
+        if self.args.limit_cities and self.args.limit_cities > 0:
+            cities = cities[: self.args.limit_cities]
+        print(f"[{DOMAIN}] cities to crawl: {len(cities)}")
+        return cities
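+
+    # Case-type discovery: a single list request against a sample city returns
+    # the site's filter metadata; the options under the "type" filter become
+    # the top-level case types. "不限" ("unrestricted") stays first so every
+    # city also gets one unfiltered pass.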
+    def _discover_top_level_areas(self) -> List[str]:
+        sample_city = self.cities[0]["city"] if self.cities else "北京"
+        payload = self._get_json(
+            LIST_API,
+            params={
+                "city_name": sample_city,
+                "page_num": 1,
+                "page_size": self.page_size,
+                "ts": int(time.time()),
+                "clientType": "pc",
+                "list_type": 1,
+            },
+            referer=f"{BASE_URL}/pc/r?vn=law",
+        )
+        filters = payload.get("data", {}).get("filters", []) or []
+        areas: List[str] = ["不限"]
+        seen = {"不限"}
+        for item in filters:
+            if item.get("key") != "type":
+                continue
+            for option in item.get("options", []) or []:
+                value = str(option.get("value") or "").strip()
+                if not value or value in seen:
+                    continue
+                seen.add(value)
+                areas.append(value)
+        return areas
+
+    def _load_areas(self) -> List[str]:
+        if self.args.areas.strip():
+            areas = [part.strip() for part in self.args.areas.split(",") if part.strip()]
+            unique: List[str] = []
+            seen = set()
+            for area in areas:
+                if area not in seen:
+                    seen.add(area)
+                    unique.append(area)
+            print(f"[{DOMAIN}] using specified case types: {unique}")
+            return unique
+
+        areas = self._discover_top_level_areas()
+        print(f"[{DOMAIN}] auto-discovered case types: {areas}")
+        return areas
+
+    def _build_pc_detail_url(self, qc_no: str, rs_id: str) -> str:
+        return f"{BASE_URL}/pc/lawyer?vn=law&qc_no={qc_no}&rs_id={rs_id}"
+
+    def _build_list_page_url(self, city_name: str, area_name: str) -> str:
+        params = {"city": city_name, "vn": "law"}
+        if area_name and area_name != "不限":
+            params["expertiseArea"] = area_name
+        return f"{BASE_URL}/pc/r?{urlencode(params)}"
+
+    def _fetch_list(self, city_name: str, area_name: str, page_num: int) -> List[Dict]:
+        params: Dict[str, object] = {
+            "city_name": city_name,
+            "page_num": page_num,
+            "page_size": self.page_size,
+            "ts": int(time.time()),
+            "clientType": "pc",
+            "list_type": 1,
+        }
+        if area_name and area_name != "不限":
+            params["expertiseArea"] = area_name
+        payload = self._get_json(
+            LIST_API,
+            params=params,
+            referer=self._build_list_page_url(city_name, area_name),
+        )
+        return payload.get("data", {}).get("lawyer_list", []) or []
+
+    def _fetch_detail(self, qc_no: str, rs_id: str) -> Dict:
+        payload = self._get_json(
+            DETAIL_API,
+            params={"vn": "law", "qc_no": qc_no, "rs_id": rs_id},
+            referer=self._build_pc_detail_url(qc_no, rs_id),
+        )
+        return payload.get("data", {}).get("lawyer", {}) or {}
+
+    def _extract_phone(self, detail: Dict) -> Optional[str]:
+        for service in detail.get("lawyer_service", []) or []:
+            phone = str(service.get("phone_num") or "").strip()
+            if phone:
+                return phone
+        for service in detail.get("lawyer_service_new", []) or []:
+            phone = str(service.get("phone_num") or "").strip()
+            if phone:
+                return phone
+        return None
+
+    def _safe_json(self, payload: Dict) -> str:
+        return json.dumps(payload, ensure_ascii=False)
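+
+    # Record layout: scalar columns (name, phone, law_firm, ...) are promoted
+    # from the API payloads, preferring the detail response over the list
+    # item; the raw payloads plus the source query context are kept verbatim
+    # in the JSON params column for later re-parsing.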
+    def _build_record(
+        self,
+        city_info: Dict[str, str],
+        area_name: str,
+        page_num: int,
+        list_item: Dict,
+        detail: Dict,
+    ) -> Dict[str, object]:
+        qc_no = str(list_item.get("qc_no") or detail.get("qc_no") or "").strip()
+        rs_id = str(list_item.get("rs_id") or detail.get("rs_id") or "").strip()
+        detail_url = self._build_pc_detail_url(qc_no, rs_id)
+        name = str(detail.get("lawyer_name") or list_item.get("lawyer_name") or "").strip()
+        law_firm = str(detail.get("practice_company") or list_item.get("practice_company") or "").strip()
+        city_name = str(list_item.get("city") or city_info.get("city") or "").strip()
+        avatar_url = str(
+            detail.get("lawyer_avatar_big")
+            or detail.get("lawyer_avatar")
+            or list_item.get("lawyer_avatar_big")
+            or list_item.get("lawyer_avatar")
+            or ""
+        ).strip()
+        phone = self._extract_phone(detail)
+
+        params = {
+            "source": {
+                "site": "baidu_lvlin",
+                "city_name": city_info.get("city"),
+                "city_code": city_info.get("city_code"),
+                "province": city_info.get("province"),
+                "expertise_area": area_name,
+                "page_num": page_num,
+                "list_url": self._build_list_page_url(city_info.get("city", ""), area_name),
+                "detail_url": detail_url,
+                "list_api": LIST_API,
+                "detail_api": DETAIL_API,
+            },
+            "list_item": list_item,
+            "detail": detail,
+        }
+
+        return {
+            "name": name or None,
+            "phone": phone or None,
+            "law_firm": law_firm or None,
+            "province": city_info.get("province") or None,
+            "city": city_name or city_info.get("city") or None,
+            "url": detail_url,
+            "avatar_url": avatar_url or None,
+            "domain": DOMAIN,
+            "create_time": int(time.time()),
+            "site_time": None,
+            "params": self._safe_json(params),
+        }
+
+    def _insert_record(self, record: Dict[str, object]) -> bool:
+        url = str(record.get("url") or "").strip()
+        if not url or url in self.existing_urls:
+            return False
+        self.db.insert_data("lawyer", record)
+        self.existing_urls.add(url)
+        self.inserted_count += 1
+        return True
+
+    def _iter_city_area(self, city_info: Dict[str, str], area_name: str) -> Tuple[int, int]:
+        inserted = 0
+        pages = 0
+        zero_new_pages = 0
+        city_name = city_info["city"]
+
+        for page_num in range(1, self.max_pages_per_query + 1):
+            pages = page_num
+            try:
+                items = self._fetch_list(city_name, area_name, page_num)
+            except Exception as exc:
+                print(f"[{DOMAIN}] list request failed {city_name}-{area_name}-p{page_num}: {exc}")
+                break
+
+            if not items:
+                print(f"[{DOMAIN}] {city_name}-{area_name} page {page_num} empty, stopping")
+                break
+
+            page_new = 0
+            for item in items:
+                qc_no = str(item.get("qc_no") or "").strip()
+                rs_id = str(item.get("rs_id") or "").strip()
+                actual_city = str(item.get("city") or "").strip()
+                if not qc_no or not rs_id:
+                    continue
+                if actual_city and not self._city_matches(city_name, actual_city):
+                    continue
+
+                detail_url = self._build_pc_detail_url(qc_no, rs_id)
+                if detail_url in self.existing_urls:
+                    continue
+
+                # A failed detail fetch still yields a record from the list item.
+                detail: Dict = {}
+                try:
+                    detail = self._fetch_detail(qc_no, rs_id)
+                except Exception as exc:
+                    print(f"[{DOMAIN}] detail request failed {qc_no}-{rs_id}: {exc}")
+
+                record = self._build_record(city_info, area_name, page_num, item, detail)
+                try:
+                    if self._insert_record(record):
+                        page_new += 1
+                        inserted += 1
+                        print(
+                            f"[{DOMAIN}] -> inserted {record.get('name') or qc_no} "
+                            f"| {city_name} | {area_name} | p{page_num}"
+                        )
+                except Exception as exc:
+                    print(f"[{DOMAIN}] insert failed {record.get('url')}: {exc}")
+                self._sleep()
+
+            print(
+                f"[{DOMAIN}] {city_name} | {area_name} | p{page_num} "
+                f"| listed {len(items)} | new {page_new}"
+            )
+
+            if len(items) < self.page_size:
+                break
+            if page_new == 0:
+                zero_new_pages += 1
+                if zero_new_pages >= self.stop_zero_new_pages:
+                    print(
+                        f"[{DOMAIN}] {city_name}-{area_name}: {zero_new_pages} "
+                        "consecutive pages without new records, stopping"
+                    )
+                    break
+            else:
+                zero_new_pages = 0
+
+            self._sleep()
+
+        return inserted, pages
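+
+    # Traversal: cities in (province, city) order, each crossed with every
+    # case type; dedup runs against existing_urls, so overlapping case types
+    # only cost extra list requests, never duplicate rows.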
+    def run(self) -> None:
+        print(f"[{DOMAIN}] starting crawl")
+        if not self.cities:
+            print(f"[{DOMAIN}] no cities to crawl")
+            return
+        if not self.areas:
+            print(f"[{DOMAIN}] no case types to crawl")
+            return
+
+        total_queries = len(self.cities) * len(self.areas)
+        query_index = 0
+        for city_info in self.cities:
+            city_inserted = 0
+            for area_name in self.areas:
+                query_index += 1
+                print(
+                    f"[{DOMAIN}] progress {query_index}/{total_queries} | "
+                    f"{city_info['province']}-{city_info['city']} | {area_name}"
+                )
+                inserted, pages = self._iter_city_area(city_info, area_name)
+                city_inserted += inserted
+                print(
+                    f"[{DOMAIN}] done {city_info['city']} | {area_name} "
+                    f"| pages {pages} | new {inserted}"
+                )
+            print(
+                f"[{DOMAIN}] city done {city_info['province']}-{city_info['city']} "
+                f"| new in city {city_inserted} | total new {self.inserted_count}"
+            )
+        print(f"[{DOMAIN}] crawl finished, total inserted {self.inserted_count}")
+
+
+if __name__ == "__main__":
+    cli_args = parse_args()
+    with Db() as db:
+        spider = BaiduLvlinSpider(db, cli_args)
+        spider.run()
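--
Usage note: a minimal smoke-test run, assuming Db and request/proxy_config are
importable from the project root as wired at the top of the script:

    python common_sites/baidu_lvlin.py --province 山东 --city 聊城 --max-pages-per-query 2

This limits the crawl to a single city and at most two list pages per case
type, exercising city lookup, case-type discovery, list/detail fetching, and
the insert path end to end.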