# lawyers/common_sites/dls_pc.py

import json
import os
import random
import re
import sys
import time
from typing import Dict, List, Optional, Set, Tuple
from urllib.parse import urljoin

# Allow running this file directly: put the parent package and its
# request/ directory on sys.path before the project-local imports below.
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
request_dir = os.path.join(project_root, "request")
if request_dir not in sys.path:
    sys.path.insert(0, request_dir)
if project_root not in sys.path:
    sys.path.append(project_root)

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import urllib3
from bs4 import BeautifulSoup

from request.proxy_config import get_proxies, report_proxy_status
from utils.rate_limiter import request_slot
from Db import Db

# Requests are made with verify=False, so silence the TLS warnings.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

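# List pages live at /law/{pinyin}?page=N. Province and city directories are
# fetched from small JSON endpoints on js.maxlaw.cn that respond with
# {"status": "1", "ds": [...]}.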
DOMAIN = "大律师"  # site name, stored as the `domain` value in the lawyer table
SITE_BASE = "https://www.maxlaw.cn"
LIST_URL_TEMPLATE = SITE_BASE + "/law/{pinyin}?page={page}"
PROVINCE_API = "https://js.maxlaw.cn/js/ajax/common/getprovice.js"
CITY_API_TEMPLATE = "https://js.maxlaw.cn/js/ajax/common/getcity_{province_id}.js"

PHONE_RE = re.compile(r"1[3-9]\d{9}")  # mainland mobile numbers
REPLY_RE = re.compile(r"已回复[:：]?\s*(\d+)")  # "已回复: 12", ASCII or full-width colon
AREA_PREFIX_RE = re.compile(r"^[A-Za-z]\s*")  # area names appear to carry an index letter, e.g. "B北京"


def normalize_phone(text: str) -> str:
    compact = re.sub(r"\D", "", text or "")
    match = PHONE_RE.search(compact)
    return match.group(0) if match else ""


def clean_area_name(text: str) -> str:
    value = AREA_PREFIX_RE.sub("", (text or "").strip())
    return value.strip()


def normalize_region_text(text: str) -> str:
    value = (text or "").strip()
    value = value.replace("\xa0", " ")
    # Unify dash variants (full-width, em and en dashes) to an ASCII hyphen.
    value = value.replace("－", "-").replace("—", "-").replace("–", "-")
    value = re.sub(r"\s*-\s*", "-", value)
    value = re.sub(r"\s+", "", value)
    return value


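# Spider for the 大律师 (maxlaw.cn) PC site: walks each area's paginated
# lawyer list by pinyin code and stores newly seen lawyers in the `lawyer`
# table, deduplicated by mobile number.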
class DlsPcSpider:
    def __init__(self, db_connection):
        self.db = db_connection
        self.session = self._build_session()
        # Hard cap on pages per area, overridable via MAXLAW_PC_MAX_PAGES.
        self.max_pages = int(os.getenv("MAXLAW_PC_MAX_PAGES", "100"))
        self.areas = self._load_areas()

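    # Build a fresh Session: proxies from proxy_config, adapter-level retries
    # for transient 429/5xx responses, and a desktop Chrome UA. trust_env is
    # off so stray *_PROXY environment variables cannot bypass the pool.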
    def _build_session(self) -> requests.Session:
        report_proxy_status()
        session = requests.Session()
        session.trust_env = False
        proxies = get_proxies()
        if proxies:
            session.proxies.update(proxies)
        else:
            session.proxies.clear()
        retries = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=(429, 500, 502, 503, 504),
            allowed_methods=frozenset(["GET"]),
            raise_on_status=False,
        )
        adapter = HTTPAdapter(max_retries=retries)
        session.mount("https://", adapter)
        session.mount("http://", adapter)
        session.headers.update({
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/136.0.0.0 Safari/537.36"
            ),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Connection": "close",
        })
        return session

    def _refresh_session(self) -> None:
        try:
            self.session.close()
        except Exception:
            pass
        self.session = self._build_session()

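    # GET with a manual retry loop on top of the adapter's Retry: a 403
    # rotates the session (fresh proxy and cookies) before backing off;
    # other 4xx/5xx raise into the generic retry path below.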
    def _get(self, url: str, max_retries: int = 3, headers: Optional[Dict[str, str]] = None) -> Optional[str]:
        for attempt in range(max_retries):
            try:
                with request_slot():
                    resp = self.session.get(url, timeout=(10, 25), verify=False, headers=headers)
                    status_code = resp.status_code
                    text = resp.text
                    resp.close()
                if status_code == 403:
                    if attempt < max_retries - 1:
                        wait_time = 2 ** attempt + random.uniform(0.3, 1.0)
                        print(f"Blocked with 403, retrying in {wait_time:.1f}s ({attempt + 1}/{max_retries}): {url}")
                        self._refresh_session()
                        time.sleep(wait_time)
                        continue
                    print(f"Request failed {url}: 403 Forbidden")
                    return None
                if status_code >= 400:
                    raise requests.exceptions.HTTPError(f"{status_code} Error: {url}")
                return text
            except requests.exceptions.RequestException as exc:
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt + random.uniform(0.3, 1.0)
                    print(f"Request failed, retrying in {wait_time:.1f}s ({attempt + 1}/{max_retries}): {url} -> {exc}")
                    time.sleep(wait_time)
                    continue
                print(f"Request failed {url}: {exc}")
                return None
        return None

    def _get_json(self, url: str) -> Optional[Dict]:
        text = self._get(url)
        if not text:
            return None
        try:
            # The JS endpoints may prepend a UTF-8 BOM; strip it before parsing.
            return json.loads(text.strip().lstrip("\ufeff"))
        except ValueError as exc:
            print(f"Failed to parse JSON from {url}: {exc}")
            return None

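    # Prefer the live province/city endpoints for the area list; fall back
    # to locally stored tables only when the site yields nothing.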
    def _load_areas(self) -> List[Dict[str, str]]:
        areas = self._load_areas_from_site()
        if areas:
            print(f"[大律师PC] area source: site, count: {len(areas)}")
            return areas
        areas = self._load_areas_from_db()
        if areas:
            print(f"[大律师PC] area source: db, count: {len(areas)}")
            return areas
        print("[大律师PC] no area data")
        return []

    def _load_areas_from_site(self) -> List[Dict[str, str]]:
        data = self._get_json(PROVINCE_API)
        if not data or str(data.get("status")) != "1":
            return []
        result: List[Dict[str, str]] = []
        seen_pinyin: Set[str] = set()
        for province in data.get("ds", []) or []:
            province_id = province.get("id")
            province_name = clean_area_name(province.get("name", ""))
            province_pinyin = (province.get("py_code") or "").strip()
            city_rows = []
            if province_id:
                city_data = self._get_json(CITY_API_TEMPLATE.format(province_id=province_id))
                if city_data and str(city_data.get("status")) == "1":
                    city_rows = city_data.get("ds", []) or []
            # Provinces without city rows (e.g. municipalities) are listed
            # under their own pinyin code.
            if not city_rows and province_pinyin and province_pinyin not in seen_pinyin:
                seen_pinyin.add(province_pinyin)
                result.append({
                    "province": province_name,
                    "city": province_name,
                    "pinyin": province_pinyin,
                })
                continue
            for city in city_rows:
                city_name = clean_area_name(city.get("name", ""))
                city_pinyin = (city.get("py_code") or "").strip()
                if not city_pinyin or city_pinyin in seen_pinyin:
                    continue
                seen_pinyin.add(city_pinyin)
                result.append({
                    "province": province_name,
                    "city": city_name or province_name,
                    "pinyin": city_pinyin,
                })
        return result

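    # DB fallback: deployments name the area table differently, so try each
    # candidate until one returns rows.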
    def _load_areas_from_db(self) -> List[Dict[str, str]]:
        tables = ("area_new", "area", "area2")
        last_error = None
        for table in tables:
            try:
                rows = self.db.select_data(
                    table,
                    "province, city, pinyin",
                    "domain='maxlaw' AND level=2",
                ) or []
            except Exception as exc:
                last_error = exc
                continue
            if rows:
                return rows
        if last_error:
            print(f"[大律师PC] failed to load areas from DB: {last_error}")
        return []

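    # Look up which of the candidate phones already exist for this domain,
    # querying in chunks of 500 to keep the IN lists bounded.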
    def _existing_phones(self, phones: List[str]) -> Set[str]:
        if not phones:
            return set()
        existing: Set[str] = set()
        cur = self.db.db.cursor()
        try:
            chunk_size = 500
            for i in range(0, len(phones), chunk_size):
                chunk = phones[i:i + chunk_size]
                placeholders = ",".join(["%s"] * len(chunk))
                sql = f"SELECT phone FROM lawyer WHERE domain=%s AND phone IN ({placeholders})"
                cur.execute(sql, [DOMAIN, *chunk])
                for row in cur.fetchall():
                    existing.add(row[0])
        finally:
            cur.close()
        return existing

    def _build_list_url(self, pinyin: str, page: int) -> str:
        return LIST_URL_TEMPLATE.format(pinyin=pinyin, page=page)

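    # A card's location line appears to read like "江苏-南京 某某律师事务所":
    # the region first, then the law firm name, separated by whitespace.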
    def _parse_location_line(
        self,
        text: str,
        fallback_province: str,
        fallback_city: str,
    ) -> Tuple[str, str, str]:
        raw = (text or "").replace("\xa0", " ")
        raw = re.sub(r"\s+", " ", raw).strip()
        if not raw:
            return fallback_province, fallback_city or fallback_province, ""
        parts = raw.split(" ", 1)
        area_text = parts[0].strip()
        law_firm = parts[1].strip() if len(parts) > 1 else ""
        province = fallback_province
        city = fallback_city or fallback_province
        if "-" in area_text:
            area_parts = [item.strip() for item in area_text.split("-", 1)]
            if area_parts[0]:
                province = area_parts[0]
            if len(area_parts) > 1 and area_parts[1]:
                city = area_parts[1]
        elif area_text:
            province = area_text
            city = area_text
        return province, city, law_firm

    def _extract_page_region(self, soup: BeautifulSoup) -> str:
        button = soup.select_one(".filter .filter-btn")
        if button:
            return normalize_region_text(button.get_text(" ", strip=True))
        title = soup.select_one(".findLawyer-title h1")
        if title:
            return normalize_region_text(title.get_text(strip=True).replace("律师", ""))
        return ""

    def _page_matches_area(self, soup: BeautifulSoup, province: str, city: str) -> Tuple[bool, str]:
        current_region = self._extract_page_region(soup)
        if not current_region:
            return True, current_region
        if "全国" in current_region:
            return False, current_region
        norm_province = normalize_region_text(province)
        norm_city = normalize_region_text(city or province)
        if norm_city and norm_city != norm_province:
            matched = norm_province in current_region and norm_city in current_region
        else:
            matched = norm_province in current_region
        if matched:
            return True, current_region
        title = soup.select_one(".findLawyer-title h1")
        title_text = ""
        if title:
            title_text = normalize_region_text(title.get_text(strip=True).replace("律师", ""))
        if norm_city and norm_city != norm_province:
            matched = norm_city in title_text
        else:
            matched = norm_province in title_text
        return matched, current_region or title_text

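    # Parse one list page. Returns (page_ok, inserted, parsed_count);
    # page_ok is False when the page belongs to another region, signalling
    # the caller to stop paginating this area.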
    def _parse_list(self, html: str, province: str, city: str, list_url: str, area_pinyin: str) -> Tuple[bool, int, int]:
        soup = BeautifulSoup(html, "html.parser")
        matched, current_region = self._page_matches_area(soup, province, city)
        if not matched:
            print(f" Page region mismatch, stopping pagination: target={province}-{city} current={current_region or 'unknown'}")
            return False, 0, 0
        cards = []
        seen_page_phone: Set[str] = set()
        for item in soup.select("ul.findLawyer-list > li.clearfix"):
            name_link = item.select_one(".findLawyer-list-detail-name a[href]")
            phone_tag = item.select_one(".findLawyer-list-detail-name span")
            if not name_link or not phone_tag:
                continue
            phone = normalize_phone(phone_tag.get_text(" ", strip=True))
            if not phone or phone in seen_page_phone:
                continue
            seen_page_phone.add(phone)
            name = name_link.get_text(strip=True)
            detail_url = urljoin(SITE_BASE, name_link.get("href", "").strip())
            location_tag = item.select_one(".findLawyer-list-detail-the")
            card_province, card_city, law_firm = self._parse_location_line(
                location_tag.get_text(" ", strip=True) if location_tag else "",
                province,
                city,
            )
            specialties = []
            for dd in item.select(".findLawyer-list-detail-fields dd"):
                text = dd.get_text(strip=True)
                if text:
                    specialties.append(text)
            reply_count = None
            reply_tag = item.select_one(".findLawyer-list-detail-other a")
            if reply_tag:
                match = REPLY_RE.search(reply_tag.get_text(" ", strip=True))
                if match:
                    reply_count = int(match.group(1))
            cards.append({
                "name": name,
                "law_firm": law_firm,
                "province": card_province or province,
                "city": card_city or city or province,
                "phone": phone,
                "url": detail_url,
                "domain": DOMAIN,
                "create_time": int(time.time()),
                "params": json.dumps({
                    "area_pinyin": area_pinyin,
                    "source": list_url,
                    "specialties": specialties,
                    "reply_count": reply_count,
                }, ensure_ascii=False),
            })
        if not cards:
            return True, 0, 0
        phones = [item["phone"] for item in cards if item.get("phone")]
        existing = self._existing_phones(phones)
        inserted = 0
        for item in cards:
            phone = item.get("phone")
            if not phone:
                continue
            if phone in existing:
                print(f" -- already exists: {item['name']} ({phone})")
                continue
            try:
                self.db.insert_data("lawyer", item)
                inserted += 1
                print(f" -> inserted: {item['name']} ({phone})")
            except Exception as exc:
                print(f" insert failed {item.get('url')}: {exc}")
        return True, inserted, len(cards)

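    # Main loop: for every area, walk list pages until a region mismatch,
    # an empty page, a failed request, or the page cap.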
    def run(self):
        print("Starting 大律师 PC crawl...")
        if not self.areas:
            print("No area data")
            return
        for area in self.areas:
            province = (area.get("province") or "").strip()
            city = (area.get("city") or province).strip()
            pinyin = (area.get("pinyin") or "").strip()
            if not province or not pinyin:
                continue
            area_label = province if not city or city == province else f"{province}-{city}"
            print(f"Crawling area: {area_label} ({pinyin})")
            for page in range(1, self.max_pages + 1):
                list_url = self._build_list_url(pinyin, page)
                print(f"Page {page}: {list_url}")
                html = self._get(list_url, headers={"Referer": SITE_BASE + "/law"})
                if not html:
                    break
                page_ok, inserted, parsed_count = self._parse_list(html, province, city, list_url, pinyin)
                if not page_ok:
                    break
                if parsed_count == 0:
                    print(" No lawyer cards on this page, stopping")
                    break
                if inserted == 0:
                    print(" No new rows on this page")
                time.sleep(0.5)
        print("大律师 PC crawl finished")


if __name__ == "__main__":
    with Db() as db:
        spider = DlsPcSpider(db)
        spider.run()

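# Hypothetical invocation for a quick smoke test, capping pagination via the
# env var read in __init__:
#   MAXLAW_PC_MAX_PAGES=3 python lawyers/common_sites/dls_pc.py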