feat: add baidu lvlin crawler

Author: hello-dd-code
Date:   2026-03-20 10:40:07 +08:00
parent 7d5f5b1054
commit ff5e04d986
@@ -0,0 +1,473 @@
#!/usr/bin/env python3
import argparse
import json
import os
import sys
import time
from typing import Dict, List, Optional, Set, Tuple
from urllib.parse import urlencode
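# Make the project root and its "request" package importable when the script is
# run directly; Db and proxy_config are repo-local modules, not pip packages.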
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
request_dir = os.path.join(project_root, "request")
if request_dir not in sys.path:
sys.path.insert(0, request_dir)
if project_root not in sys.path:
sys.path.append(project_root)
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from Db import Db
from request.proxy_config import get_proxies, report_proxy_status
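# DOMAIN is the human-readable site name (Baidu Faxingbao); it is stored in the
# lawyer table's domain column and used as the dedup scope, so it stays as-is.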
DOMAIN = "百度法行宝"
BASE_URL = "https://lvlin.baidu.com"
CITY_API = f"{BASE_URL}/pc/api/law/sync/city"
LIST_API = f"{BASE_URL}/pc/api/law/api/lawyerlist"
DETAIL_API = f"{BASE_URL}/pc/api/law/api/lawyerhome"
DEFAULT_PAGE_SIZE = 16
DEFAULT_MAX_PAGES = 30
DEFAULT_STOP_ZERO_NEW_PAGES = 3
DEFAULT_SLEEP_SECONDS = 0.1
def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Crawl Baidu Faxingbao lawyer profiles into the database")
    parser.add_argument("--province", default="", help="Only crawl the given province, e.g. 山东")
    parser.add_argument("--city", default="", help="Only crawl the given city, e.g. 聊城 / 聊城市")
    parser.add_argument(
        "--areas",
        default="",
        help="Case types, comma-separated; when omitted, top-level types are auto-discovered and 不限 (unrestricted) is added",
    )
    parser.add_argument(
        "--limit-cities",
        type=int,
        default=0,
        help="Only process the first N cities; 0 means no limit",
    )
    parser.add_argument(
        "--page-size",
        type=int,
        default=DEFAULT_PAGE_SIZE,
        help=f"Items per list request, default {DEFAULT_PAGE_SIZE}",
    )
    parser.add_argument(
        "--max-pages-per-query",
        type=int,
        default=DEFAULT_MAX_PAGES,
        help=f"Maximum pages per city/case-type query, default {DEFAULT_MAX_PAGES}",
    )
    parser.add_argument(
        "--stop-zero-new-pages",
        type=int,
        default=DEFAULT_STOP_ZERO_NEW_PAGES,
        help=f"Stop the current query after this many consecutive pages with no new records, default {DEFAULT_STOP_ZERO_NEW_PAGES}",
    )
    parser.add_argument(
        "--sleep-seconds",
        type=float,
        default=DEFAULT_SLEEP_SECONDS,
        help=f"Delay between requests in seconds, default {DEFAULT_SLEEP_SECONDS}",
    )
    return parser.parse_args()
class BaiduLvlinSpider:
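    """Crawls lawyer profiles from lvlin.baidu.com, city by city and case type by case type."""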
def __init__(self, db_connection: Db, args: argparse.Namespace):
self.db = db_connection
self.args = args
self.page_size = max(1, int(args.page_size or DEFAULT_PAGE_SIZE))
self.max_pages_per_query = max(1, int(args.max_pages_per_query or DEFAULT_MAX_PAGES))
self.stop_zero_new_pages = max(1, int(args.stop_zero_new_pages or DEFAULT_STOP_ZERO_NEW_PAGES))
self.sleep_seconds = max(0.0, float(args.sleep_seconds or 0.0))
self.proxy_enabled = False
self.session = self._build_session()
self.existing_urls = self._load_existing_urls()
self.cities = self._load_cities()
self.areas = self._load_areas()
self.inserted_count = 0
def _build_session(self) -> requests.Session:
report_proxy_status()
session = requests.Session()
session.trust_env = False
proxies = get_proxies()
if proxies:
session.proxies.update(proxies)
self.proxy_enabled = True
else:
session.proxies.clear()
self.proxy_enabled = False
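        # Retry idempotent GETs on transient upstream errors (429/5xx) with
        # exponential backoff instead of failing the whole city/type query.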
retries = Retry(
total=3,
backoff_factor=1,
status_forcelist=(429, 500, 502, 503, 504),
allowed_methods=frozenset(["GET"]),
raise_on_status=False,
)
adapter = HTTPAdapter(max_retries=retries)
session.mount("https://", adapter)
session.mount("http://", adapter)
session.headers.update(
{
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/123.0.0.0 Safari/537.36"
),
"Accept": "application/json, text/plain, */*",
"Referer": f"{BASE_URL}/pc/r?vn=law",
"Connection": "close",
}
)
return session
def _disable_proxy(self) -> None:
if not self.proxy_enabled:
return
self.session.proxies.clear()
self.proxy_enabled = False
print(f"[{DOMAIN}] 代理不可用,已切换直连")
def _sleep(self) -> None:
if self.sleep_seconds > 0:
time.sleep(self.sleep_seconds)
def _get_json(self, url: str, params: Optional[Dict[str, object]] = None, referer: str = "") -> Dict:
headers = {}
if referer:
headers["Referer"] = referer
try:
resp = self.session.get(url, params=params or {}, timeout=20, headers=headers)
except requests.exceptions.ProxyError:
self._disable_proxy()
resp = self.session.get(url, params=params or {}, timeout=20, headers=headers)
try:
resp.raise_for_status()
return resp.json()
finally:
resp.close()
def _load_existing_urls(self) -> Set[str]:
urls: Set[str] = set()
cursor = self.db.db.cursor()
try:
cursor.execute("SELECT url FROM lawyer WHERE domain=%s AND url IS NOT NULL", (DOMAIN,))
for row in cursor.fetchall():
url = (row[0] or "").strip()
if url:
urls.add(url)
finally:
cursor.close()
print(f"[{DOMAIN}] 已存在 URL 数: {len(urls)}")
return urls
def _normalize_city_name(self, city_name: str) -> str:
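        # Strip a trailing "市" (city suffix) so e.g. 聊城 and 聊城市 compare equal.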
text = str(city_name or "").strip()
        if text.endswith("市"):
            return text[:-1]
return text
def _city_matches(self, expected_city: str, actual_city: str) -> bool:
left = self._normalize_city_name(expected_city)
right = self._normalize_city_name(actual_city)
if not left or not right:
return False
return left == right
def _load_cities(self) -> List[Dict[str, str]]:
payload = self._get_json(CITY_API, params={"vn": "law"}, referer=f"{BASE_URL}/pc/r?vn=law")
all_city_list = payload.get("data", {}).get("AllCityList", []) or []
cities: List[Dict[str, str]] = []
province_filter = self.args.province.strip()
city_filter = self._normalize_city_name(self.args.city)
for block in all_city_list:
for item in block.get("cityList", []) or []:
city_name = str(item.get("name") or "").strip()
province = str(item.get("province") or "").strip()
city_code = str(item.get("code") or "").strip()
if not city_name or not province or not city_code:
continue
if province_filter and province != province_filter:
continue
if city_filter and self._normalize_city_name(city_name) != city_filter:
continue
cities.append(
{
"province": province,
"city": city_name,
"city_code": city_code,
}
)
cities.sort(key=lambda item: (item["province"], item["city"]))
if self.args.limit_cities and self.args.limit_cities > 0:
cities = cities[: self.args.limit_cities]
print(f"[{DOMAIN}] 本次待采城市数: {len(cities)}")
return cities
def _discover_top_level_areas(self) -> List[str]:
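        # Probe one city's list response and harvest the top-level "type" filter
        # options; 不限 (unrestricted) always goes first so the unfiltered list
        # is crawled even if the type filters miss someone.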
sample_city = self.cities[0]["city"] if self.cities else "北京"
payload = self._get_json(
LIST_API,
params={
"city_name": sample_city,
"page_num": 1,
"page_size": self.page_size,
"ts": int(time.time()),
"clientType": "pc",
"list_type": 1,
},
referer=f"{BASE_URL}/pc/r?vn=law",
)
filters = payload.get("data", {}).get("filters", []) or []
areas: List[str] = ["不限"]
seen = {"不限"}
for item in filters:
if item.get("key") != "type":
continue
for option in item.get("options", []) or []:
value = str(option.get("value") or "").strip()
if not value or value in seen:
continue
seen.add(value)
areas.append(value)
return areas
def _load_areas(self) -> List[str]:
if self.args.areas.strip():
areas = [part.strip() for part in self.args.areas.split(",") if part.strip()]
unique: List[str] = []
seen = set()
for area in areas:
if area not in seen:
seen.add(area)
unique.append(area)
print(f"[{DOMAIN}] 使用指定案件类型: {unique}")
return unique
areas = self._discover_top_level_areas()
print(f"[{DOMAIN}] 自动发现案件类型: {areas}")
return areas
def _build_pc_detail_url(self, qc_no: str, rs_id: str) -> str:
return f"{BASE_URL}/pc/lawyer?vn=law&qc_no={qc_no}&rs_id={rs_id}"
def _build_list_page_url(self, city_name: str, area_name: str) -> str:
params = {"city": city_name, "vn": "law"}
if area_name and area_name != "不限":
params["expertiseArea"] = area_name
return f"{BASE_URL}/pc/r?{urlencode(params)}"
def _fetch_list(self, city_name: str, area_name: str, page_num: int) -> List[Dict]:
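        # ts looks like a cache-busting timestamp; the Referer mimics the PC
        # list page a browser would send when paginating.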
params: Dict[str, object] = {
"city_name": city_name,
"page_num": page_num,
"page_size": self.page_size,
"ts": int(time.time()),
"clientType": "pc",
"list_type": 1,
}
if area_name and area_name != "不限":
params["expertiseArea"] = area_name
payload = self._get_json(
LIST_API,
params=params,
referer=self._build_list_page_url(city_name, area_name),
)
return payload.get("data", {}).get("lawyer_list", []) or []
def _fetch_detail(self, qc_no: str, rs_id: str) -> Dict:
payload = self._get_json(
DETAIL_API,
params={"vn": "law", "qc_no": qc_no, "rs_id": rs_id},
referer=self._build_pc_detail_url(qc_no, rs_id),
)
return payload.get("data", {}).get("lawyer", {}) or {}
def _extract_phone(self, detail: Dict) -> Optional[str]:
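        # The detail payload exposes phones in two service arrays (old and new
        # format); the first non-empty phone_num wins.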
for service in detail.get("lawyer_service", []) or []:
phone = str(service.get("phone_num") or "").strip()
if phone:
return phone
for service in detail.get("lawyer_service_new", []) or []:
phone = str(service.get("phone_num") or "").strip()
if phone:
return phone
return None
def _safe_json(self, payload: Dict) -> str:
return json.dumps(payload, ensure_ascii=False)
def _build_record(
self,
city_info: Dict[str, str],
area_name: str,
page_num: int,
list_item: Dict,
detail: Dict,
) -> Dict[str, object]:
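        # The raw list item and detail payloads are preserved in the JSON
        # params column so records can be re-parsed later without re-crawling.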
qc_no = str(list_item.get("qc_no") or detail.get("qc_no") or "").strip()
rs_id = str(list_item.get("rs_id") or detail.get("rs_id") or "").strip()
detail_url = self._build_pc_detail_url(qc_no, rs_id)
name = str(detail.get("lawyer_name") or list_item.get("lawyer_name") or "").strip()
law_firm = str(detail.get("practice_company") or list_item.get("practice_company") or "").strip()
city_name = str(list_item.get("city") or city_info.get("city") or "").strip()
avatar_url = str(detail.get("lawyer_avatar_big") or detail.get("lawyer_avatar") or list_item.get("lawyer_avatar_big") or list_item.get("lawyer_avatar") or "").strip()
phone = self._extract_phone(detail)
params = {
"source": {
"site": "baidu_lvlin",
"city_name": city_info.get("city"),
"city_code": city_info.get("city_code"),
"province": city_info.get("province"),
"expertise_area": area_name,
"page_num": page_num,
"list_url": self._build_list_page_url(city_info.get("city", ""), area_name),
"detail_url": detail_url,
"list_api": LIST_API,
"detail_api": DETAIL_API,
},
"list_item": list_item,
"detail": detail,
}
return {
"name": name or None,
"phone": phone or None,
"law_firm": law_firm or None,
"province": city_info.get("province") or None,
"city": city_name or city_info.get("city") or None,
"url": detail_url,
"avatar_url": avatar_url or None,
"domain": DOMAIN,
"create_time": int(time.time()),
"site_time": None,
"params": self._safe_json(params),
}
def _insert_record(self, record: Dict[str, object]) -> bool:
url = str(record.get("url") or "").strip()
if not url or url in self.existing_urls:
return False
self.db.insert_data("lawyer", record)
self.existing_urls.add(url)
self.inserted_count += 1
return True
def _iter_city_area(self, city_info: Dict[str, str], area_name: str) -> Tuple[int, int]:
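        # A city/type query stops on: an empty page, a short (last) page,
        # stop_zero_new_pages consecutive pages with no inserts, or the
        # max_pages_per_query cap.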
inserted = 0
pages = 0
zero_new_pages = 0
city_name = city_info["city"]
for page_num in range(1, self.max_pages_per_query + 1):
pages = page_num
try:
items = self._fetch_list(city_name, area_name, page_num)
except Exception as exc:
print(f"[{DOMAIN}] 列表请求失败 {city_name}-{area_name}-p{page_num}: {exc}")
break
if not items:
print(f"[{DOMAIN}] {city_name}-{area_name}{page_num} 页无数据,停止")
break
page_new = 0
for item in items:
qc_no = str(item.get("qc_no") or "").strip()
rs_id = str(item.get("rs_id") or "").strip()
actual_city = str(item.get("city") or "").strip()
if not qc_no or not rs_id:
continue
if actual_city and not self._city_matches(city_name, actual_city):
continue
detail_url = self._build_pc_detail_url(qc_no, rs_id)
if detail_url in self.existing_urls:
continue
detail: Dict = {}
try:
detail = self._fetch_detail(qc_no, rs_id)
except Exception as exc:
print(f"[{DOMAIN}] 详情请求失败 {qc_no}-{rs_id}: {exc}")
record = self._build_record(city_info, area_name, page_num, item, detail)
try:
if self._insert_record(record):
page_new += 1
inserted += 1
                        print(
                            f"[{DOMAIN}] -> inserted {record.get('name') or qc_no} "
                            f"| {city_name} | {area_name} | p{page_num}"
                        )
except Exception as exc:
print(f"[{DOMAIN}] 插入失败 {record.get('url')}: {exc}")
self._sleep()
            print(
                f"[{DOMAIN}] {city_name} | {area_name} | p{page_num} "
                f"| listed {len(items)} | new {page_new}"
            )
if len(items) < self.page_size:
break
if page_new == 0:
zero_new_pages += 1
if zero_new_pages >= self.stop_zero_new_pages:
                    print(
                        f"[{DOMAIN}] {city_name}-{area_name}: no new records for "
                        f"{zero_new_pages} consecutive pages, stopping"
                    )
break
else:
zero_new_pages = 0
self._sleep()
return inserted, pages
def run(self) -> None:
print(f"[{DOMAIN}] 启动采集")
if not self.cities:
print(f"[{DOMAIN}] 无可采城市")
return
if not self.areas:
print(f"[{DOMAIN}] 无可采案件类型")
return
total_queries = len(self.cities) * len(self.areas)
query_index = 0
for city_info in self.cities:
city_inserted = 0
for area_name in self.areas:
query_index += 1
                print(
                    f"[{DOMAIN}] Progress {query_index}/{total_queries} | "
                    f"{city_info['province']}-{city_info['city']} | {area_name}"
                )
inserted, pages = self._iter_city_area(city_info, area_name)
city_inserted += inserted
                print(
                    f"[{DOMAIN}] Finished {city_info['city']} | {area_name} "
                    f"| pages {pages} | new {inserted}"
                )
            print(
                f"[{DOMAIN}] City done {city_info['province']}-{city_info['city']} "
                f"| new in city {city_inserted} | total new {self.inserted_count}"
            )
print(f"[{DOMAIN}] 采集完成,总新增 {self.inserted_count}")
if __name__ == "__main__":
cli_args = parse_args()
with Db() as db:
spider = BaiduLvlinSpider(db, cli_args)
spider.run()
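# Example invocations (script path illustrative; assumes Db credentials and
# request/proxy_config are configured):
#   python3 baidu_lvlin.py --province 山东 --city 聊城
#   python3 baidu_lvlin.py --areas 不限 --limit-cities 5 --sleep-seconds 0.5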