import json import os import random import re import sys import time from typing import Dict, List, Optional, Set, Tuple from urllib.parse import urljoin current_dir = os.path.dirname(os.path.abspath(__file__)) project_root = os.path.dirname(current_dir) request_dir = os.path.join(project_root, "request") if request_dir not in sys.path: sys.path.insert(0, request_dir) if project_root not in sys.path: sys.path.append(project_root) import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry import urllib3 from bs4 import BeautifulSoup from request.proxy_config import get_proxies, report_proxy_status from utils.rate_limiter import request_slot from Db import Db urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) DOMAIN = "大律师" SITE_BASE = "https://www.maxlaw.cn" LIST_URL_TEMPLATE = SITE_BASE + "/law/{pinyin}?page={page}" PROVINCE_API = "https://js.maxlaw.cn/js/ajax/common/getprovice.js" CITY_API_TEMPLATE = "https://js.maxlaw.cn/js/ajax/common/getcity_{province_id}.js" PHONE_RE = re.compile(r"1[3-9]\d{9}") REPLY_RE = re.compile(r"已回复[::]?\s*(\d+)") AREA_PREFIX_RE = re.compile(r"^[A-Za-z]\s*") def normalize_phone(text: str) -> str: compact = re.sub(r"\D", "", text or "") match = PHONE_RE.search(compact) return match.group(0) if match else "" def clean_area_name(text: str) -> str: value = AREA_PREFIX_RE.sub("", (text or "").strip()) return value.strip() def normalize_region_text(text: str) -> str: value = (text or "").strip() value = value.replace("\xa0", " ") value = value.replace("-", "-").replace("—", "-").replace("–", "-") value = re.sub(r"\s*-\s*", "-", value) value = re.sub(r"\s+", "", value) return value class DlsPcSpider: def __init__(self, db_connection): self.db = db_connection self.session = self._build_session() self.max_pages = int(os.getenv("MAXLAW_PC_MAX_PAGES", "100")) self.areas = self._load_areas() def _build_session(self) -> requests.Session: report_proxy_status() session = requests.Session() session.trust_env = False proxies = get_proxies() if proxies: session.proxies.update(proxies) else: session.proxies.clear() retries = Retry( total=3, backoff_factor=1, status_forcelist=(429, 500, 502, 503, 504), allowed_methods=frozenset(["GET"]), raise_on_status=False, ) adapter = HTTPAdapter(max_retries=retries) session.mount("https://", adapter) session.mount("http://", adapter) session.headers.update({ "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/136.0.0.0 Safari/537.36" ), "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", "Connection": "close", }) return session def _refresh_session(self) -> None: try: self.session.close() except Exception: pass self.session = self._build_session() def _get(self, url: str, max_retries: int = 3, headers: Optional[Dict[str, str]] = None) -> Optional[str]: for attempt in range(max_retries): try: with request_slot(): resp = self.session.get(url, timeout=(10, 25), verify=False, headers=headers) status_code = resp.status_code text = resp.text resp.close() if status_code == 403: if attempt < max_retries - 1: wait_time = 2 ** attempt + random.uniform(0.3, 1.0) print(f"403被拦截,{wait_time:.1f}秒后重试 ({attempt + 1}/{max_retries}): {url}") self._refresh_session() time.sleep(wait_time) continue print(f"请求失败 {url}: 403 Forbidden") return None if status_code >= 400: raise requests.exceptions.HTTPError(f"{status_code} Error: {url}") return text except requests.exceptions.RequestException as exc: if attempt < max_retries - 1: wait_time = 2 ** attempt + random.uniform(0.3, 1.0) print(f"请求失败,{wait_time:.1f}秒后重试 ({attempt + 1}/{max_retries}): {url} -> {exc}") time.sleep(wait_time) continue print(f"请求失败 {url}: {exc}") return None return None def _get_json(self, url: str) -> Optional[Dict]: text = self._get(url) if not text: return None try: return json.loads(text.strip().lstrip("\ufeff")) except ValueError as exc: print(f"解析JSON失败 {url}: {exc}") return None def _load_areas(self) -> List[Dict[str, str]]: areas = self._load_areas_from_site() if areas: print(f"[大律师PC] 地区来源: site, 地区数: {len(areas)}") return areas areas = self._load_areas_from_db() if areas: print(f"[大律师PC] 地区来源: db, 地区数: {len(areas)}") return areas print("[大律师PC] 无地区数据") return [] def _load_areas_from_site(self) -> List[Dict[str, str]]: data = self._get_json(PROVINCE_API) if not data or str(data.get("status")) != "1": return [] result: List[Dict[str, str]] = [] seen_pinyin: Set[str] = set() for province in data.get("ds", []) or []: province_id = province.get("id") province_name = clean_area_name(province.get("name", "")) province_pinyin = (province.get("py_code") or "").strip() city_rows = [] if province_id: city_data = self._get_json(CITY_API_TEMPLATE.format(province_id=province_id)) if city_data and str(city_data.get("status")) == "1": city_rows = city_data.get("ds", []) or [] if not city_rows and province_pinyin and province_pinyin not in seen_pinyin: seen_pinyin.add(province_pinyin) result.append({ "province": province_name, "city": province_name, "pinyin": province_pinyin, }) continue for city in city_rows: city_name = clean_area_name(city.get("name", "")) city_pinyin = (city.get("py_code") or "").strip() if not city_pinyin or city_pinyin in seen_pinyin: continue seen_pinyin.add(city_pinyin) result.append({ "province": province_name, "city": city_name or province_name, "pinyin": city_pinyin, }) return result def _load_areas_from_db(self) -> List[Dict[str, str]]: tables = ("area_new", "area", "area2") last_error = None for table in tables: try: rows = self.db.select_data( table, "province, city, pinyin", "domain='maxlaw' AND level=2", ) or [] except Exception as exc: last_error = exc continue if rows: return rows if last_error: print(f"[大律师PC] 加载数据库地区失败: {last_error}") return [] def _existing_phones(self, phones: List[str]) -> Set[str]: if not phones: return set() existing: Set[str] = set() cur = self.db.db.cursor() try: chunk_size = 500 for i in range(0, len(phones), chunk_size): chunk = phones[i:i + chunk_size] placeholders = ",".join(["%s"] * len(chunk)) sql = f"SELECT phone FROM lawyer WHERE domain=%s AND phone IN ({placeholders})" cur.execute(sql, [DOMAIN, *chunk]) for row in cur.fetchall(): existing.add(row[0]) finally: cur.close() return existing def _build_list_url(self, pinyin: str, page: int) -> str: return LIST_URL_TEMPLATE.format(pinyin=pinyin, page=page) def _parse_location_line( self, text: str, fallback_province: str, fallback_city: str, ) -> Tuple[str, str, str]: raw = (text or "").replace("\xa0", " ") raw = re.sub(r"\s+", " ", raw).strip() if not raw: return fallback_province, fallback_city or fallback_province, "" parts = raw.split(" ", 1) area_text = parts[0].strip() law_firm = parts[1].strip() if len(parts) > 1 else "" province = fallback_province city = fallback_city or fallback_province if "-" in area_text: area_parts = [item.strip() for item in area_text.split("-", 1)] if area_parts[0]: province = area_parts[0] if len(area_parts) > 1 and area_parts[1]: city = area_parts[1] elif area_text: province = area_text city = area_text return province, city, law_firm def _extract_page_region(self, soup: BeautifulSoup) -> str: button = soup.select_one(".filter .filter-btn") if button: return normalize_region_text(button.get_text(" ", strip=True)) title = soup.select_one(".findLawyer-title h1") if title: return normalize_region_text(title.get_text(strip=True).replace("律师", "")) return "" def _page_matches_area(self, soup: BeautifulSoup, province: str, city: str) -> Tuple[bool, str]: current_region = self._extract_page_region(soup) if not current_region: return True, current_region if "全国" in current_region: return False, current_region norm_province = normalize_region_text(province) norm_city = normalize_region_text(city or province) if norm_city and norm_city != norm_province: matched = norm_province in current_region and norm_city in current_region else: matched = norm_province in current_region if matched: return True, current_region title = soup.select_one(".findLawyer-title h1") title_text = "" if title: title_text = normalize_region_text(title.get_text(strip=True).replace("律师", "")) if norm_city and norm_city != norm_province: matched = norm_city in title_text else: matched = norm_province in title_text return matched, current_region or title_text def _parse_list(self, html: str, province: str, city: str, list_url: str, area_pinyin: str) -> Tuple[bool, int, int]: soup = BeautifulSoup(html, "html.parser") matched, current_region = self._page_matches_area(soup, province, city) if not matched: print(f" 页面地区不匹配,停止分页: 目标={province}-{city} 当前={current_region or '未知'}") return False, 0, 0 cards = [] seen_page_phone: Set[str] = set() for item in soup.select("ul.findLawyer-list > li.clearfix"): name_link = item.select_one(".findLawyer-list-detail-name a[href]") phone_tag = item.select_one(".findLawyer-list-detail-name span") if not name_link or not phone_tag: continue phone = normalize_phone(phone_tag.get_text(" ", strip=True)) if not phone or phone in seen_page_phone: continue seen_page_phone.add(phone) name = name_link.get_text(strip=True) detail_url = urljoin(SITE_BASE, name_link.get("href", "").strip()) location_tag = item.select_one(".findLawyer-list-detail-the") card_province, card_city, law_firm = self._parse_location_line( location_tag.get_text(" ", strip=True) if location_tag else "", province, city, ) specialties = [] for dd in item.select(".findLawyer-list-detail-fields dd"): text = dd.get_text(strip=True) if text: specialties.append(text) reply_count = None reply_tag = item.select_one(".findLawyer-list-detail-other a") if reply_tag: match = REPLY_RE.search(reply_tag.get_text(" ", strip=True)) if match: reply_count = int(match.group(1)) cards.append({ "name": name, "law_firm": law_firm, "province": card_province or province, "city": card_city or city or province, "phone": phone, "url": detail_url, "domain": DOMAIN, "create_time": int(time.time()), "params": json.dumps({ "area_pinyin": area_pinyin, "source": list_url, "specialties": specialties, "reply_count": reply_count, }, ensure_ascii=False), }) if not cards: return True, 0, 0 phones = [item["phone"] for item in cards if item.get("phone")] existing = self._existing_phones(phones) inserted = 0 for item in cards: phone = item.get("phone") if not phone: continue if phone in existing: print(f" -- 已存在: {item['name']} ({phone})") continue try: self.db.insert_data("lawyer", item) inserted += 1 print(f" -> 新增: {item['name']} ({phone})") except Exception as exc: print(f" 插入失败 {item.get('url')}: {exc}") return True, inserted, len(cards) def run(self): print("启动大律师 PC 站采集...") if not self.areas: print("无地区数据") return for area in self.areas: province = (area.get("province") or "").strip() city = (area.get("city") or province).strip() pinyin = (area.get("pinyin") or "").strip() if not province or not pinyin: continue area_label = province if not city or city == province else f"{province}-{city}" print(f"采集地区: {area_label} ({pinyin})") for page in range(1, self.max_pages + 1): list_url = self._build_list_url(pinyin, page) print(f" 第 {page} 页: {list_url}") html = self._get(list_url, headers={"Referer": SITE_BASE + "/law"}) if not html: break page_ok, inserted, parsed_count = self._parse_list(html, province, city, list_url, pinyin) if not page_ok: break if parsed_count == 0: print(" 当前页无律师卡片,停止") break if inserted == 0: print(" 当前页无新增数据") time.sleep(0.5) print("大律师 PC 站采集完成") if __name__ == "__main__": with Db() as db: spider = DlsPcSpider(db) spider.run()