"""Spider for 法律快车 (lawtime.cn): crawls per-city lawyer list pages,
parses detail pages concurrently, and inserts new records into the
`lawyer` table, deduplicated by phone + domain."""

import json
import os
import random
import re
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Optional, Set
from urllib.parse import urljoin

# Make the sibling "request" package and the project root importable
# before the project-local imports below.
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
request_dir = os.path.join(project_root, "request")
if request_dir not in sys.path:
    sys.path.insert(0, request_dir)
if project_root not in sys.path:
    sys.path.append(project_root)

import requests
import urllib3
from bs4 import BeautifulSoup
from request.proxy_config import get_proxies, report_proxy_status

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

from Db import Db
from config import LAWTIME_CONFIG
from utils.rate_limiter import request_slot

LIST_BASE = "https://m.lawtime.cn/{pinyin}/lawyer/?page={page}"
DETAIL_BASE = "https://m.lawtime.cn"
DOMAIN = "法律快车"  # source label stored with each DB record (the site's Chinese brand name)


class LawtimeSpider:
    def __init__(self, db_connection):
        self.db = db_connection
        self.session = self._build_session()
        self.max_workers = int(os.getenv("SPIDER_WORKERS", "8"))
        self._tls = threading.local()  # one session per worker thread

    def _build_session(self) -> requests.Session:
        report_proxy_status()
        session = requests.Session()
        session.trust_env = False  # ignore system proxy env vars; use our own proxy config
        proxies = get_proxies()
        if proxies:
            session.proxies.update(proxies)
        else:
            session.proxies.clear()
        headers = LAWTIME_CONFIG.get("HEADERS", {})
        if headers:
            session.headers.update(headers)
        session.headers.setdefault("Connection", "close")
        return session

    def _refresh_session(self) -> None:
        try:
            self.session.close()
        except Exception:
            pass
        self.session = self._build_session()

    def _get_thread_session(self) -> requests.Session:
        """Return (creating on first use) the current thread's session."""
        s = getattr(self._tls, "session", None)
        if s is not None:
            return s
        s = self._build_session()
        s.headers.update(dict(self.session.headers))
        self._tls.session = s
        return s

    def _refresh_thread_session(self) -> None:
        s = getattr(self._tls, "session", None)
        if s is not None:
            try:
                s.close()
            except Exception:
                pass
        self._tls.session = None

    def _existing_phones(self, phones: List[str]) -> Set[str]:
        """Return which of the given phones already exist for this domain,
        querying in chunks to keep the IN (...) list bounded."""
        if not phones:
            return set()
        existing: Set[str] = set()
        cur = self.db.db.cursor()
        try:
            chunk_size = 500
            for i in range(0, len(phones), chunk_size):
                chunk = phones[i:i + chunk_size]
                placeholders = ",".join(["%s"] * len(chunk))
                sql = f"SELECT phone FROM lawyer WHERE domain=%s AND phone IN ({placeholders})"
                cur.execute(sql, [DOMAIN, *chunk])
                for row in cur.fetchall():
                    existing.add(row[0])
        finally:
            cur.close()
        return existing

    def _load_areas(self):
        """Load level-2 (city) areas, trying each candidate table in turn."""
        condition = "level = 2 and domain='法律快车'"
        tables = ("area_new", "area", "area2")
        last_error = None
        for table in tables:
            try:
                rows = self.db.select_data(table, "pinyin, province, city", condition) or []
            except Exception as exc:
                last_error = exc
                continue
            if rows:
                missing_pinyin = sum(1 for r in rows if not (r.get("pinyin") or "").strip())
                print(f"[法律快车] area table: {table}, cities: {len(rows)}, missing pinyin: {missing_pinyin}")
                return rows
        if last_error:
            print(f"[法律快车] failed to load area data: {last_error}")
        print("[法律快车] no city data (tried area_new/area/area2)")
        return []

    def _get(self, url: str, max_retries: int = 3) -> Optional[str]:
        return self._get_with_session(self.session, url, max_retries=max_retries, is_thread=False)

    def _get_with_session(self, session: requests.Session, url: str,
                          max_retries: int = 3, is_thread: bool = False) -> Optional[str]:
        """GET a URL under the global rate limiter. On 403, rebuild the
        session (fresh proxy/headers) and retry with exponential backoff."""
        for attempt in range(max_retries):
            try:
                with request_slot():
                    resp = session.get(url, timeout=15, verify=False)
                status_code = resp.status_code
                text = resp.text
                resp.close()
                if status_code == 403:
                    if attempt < max_retries - 1:
                        wait_time = 2 ** attempt + random.uniform(0.3, 1.0)
                        print(f"Request failed {url}: 403, retrying in {wait_time:.1f}s "
                              f"({attempt + 1}/{max_retries})")
                        if is_thread:
                            self._refresh_thread_session()
                            session = self._get_thread_session()
                        else:
                            self._refresh_session()
                            session = self.session
                        time.sleep(wait_time)
                        continue
                    print(f"Request failed {url}: 403 Forbidden")
                    return None
                if status_code >= 400:
                    raise requests.exceptions.HTTPError(f"{status_code} Error: {url}")
                return text
            except requests.exceptions.RequestException as exc:
                print(f"Request failed {url}: {exc}")
                return None
        return None

    def _parse_list(self, html: str, province: str, city: str) -> int:
        """Parse one list page, fetch its detail pages concurrently, and
        insert new lawyers. Returns the number of detail links found
        (0 means the listing is exhausted)."""
        soup = BeautifulSoup(html, "html.parser")
        links = [a.get("href", "") for a in soup.select("a.hide_link")]
        # The list page obfuscates detail hrefs; restore the real path segment.
        links = [link.replace("lll", "int") for link in links if link]
        if not links:
            return 0
        detail_urls = [urljoin(DETAIL_BASE, link) for link in links]
        results: List[Dict[str, str]] = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as ex:
            futs = [ex.submit(self._parse_detail, u, province, city) for u in detail_urls]
            for fut in as_completed(futs):
                try:
                    data = fut.result()
                except Exception as exc:
                    print(f"  detail parse error: {exc}")
                    continue
                if data and data.get("phone"):
                    results.append(data)
        if not results:
            return len(detail_urls)
        # Batch-check existing phones once, then insert only the new ones.
        phones = [d["phone"] for d in results if d.get("phone")]
        existing = self._existing_phones(phones)
        for data in results:
            phone = data.get("phone")
            if not phone:
                continue
            if phone in existing:
                print(f"  -- exists: {data['name']} ({phone})")
                continue
            try:
                self.db.insert_data("lawyer", data)
                print(f"  -> inserted: {data['name']} ({phone})")
            except Exception as exc:
                print(f"  insert failed {data.get('url')}: {exc}")
        return len(detail_urls)

    def _parse_detail(self, url: str, province: str, city: str) -> Optional[Dict[str, str]]:
        """Fetch and parse one lawyer detail page (runs in a worker thread)."""
        sess = self._get_thread_session()
        html = self._get_with_session(sess, url, max_retries=3, is_thread=True)
        if not html:
            return None
        soup = BeautifulSoup(html, "html.parser")
        text = soup.get_text(" ")

        # Name: "<name>律师" ("lawyer <name>") in the <title>, else in div.intl.
        name = ""
        title_tag = soup.find("title")
        if title_tag:
            match = re.search(r"(\S+)律师", title_tag.get_text())
            if match:
                name = match.group(1)
        if not name:
            intl_div = soup.find("div", class_="intl")
            if intl_div:
                match = re.search(r"(\S+)律师", intl_div.get_text())
                if match:
                    name = match.group(1)

        # Phone: prefer the labelled contact field, then fall back to a
        # page-wide scan for a mainland-China mobile number.
        phone = ""
        phone_pattern = r"1[3-9]\d{9}"
        for item in soup.select("div.item.flex"):
            label = item.find("div", class_="label")
            desc = item.find("div", class_="desc")
            if not label or not desc:
                continue
            label_text = label.get_text()
            desc_text = desc.get_text().replace("-", "")
            if "联系电话" in label_text or "电话" in label_text:  # "contact phone" / "phone"
                matches = re.findall(phone_pattern, desc_text)
                if matches:
                    phone = matches[0]
                    break
        if not phone:
            matches = re.findall(phone_pattern, text.replace("-", ""))
            if matches:
                phone = matches[0]
        if not phone:
            print(f"  no mobile number: {url}")
            return None

        # Law firm: the "执业律所"/"律所" (law firm) field, with the
        # "已认证" (verified) badge text stripped.
        law_firm = ""
        for item in soup.select("div.item.flex"):
            label = item.find("div", class_="label")
            desc = item.find("div", class_="desc")
            if not label or not desc:
                continue
            if "执业律所" in label.get_text() or "律所" in label.get_text():
                law_firm = desc.get_text(strip=True).replace("已认证", "")
                break

        params = {
            "list_url": url,
            "province": province,
            "city": city,
        }
        return {
            "name": name or "",
            "law_firm": law_firm,
            "province": province,
            "city": city,
            "phone": phone,
            "url": url,
            "domain": DOMAIN,
            "create_time": int(time.time()),
            "params": json.dumps(params, ensure_ascii=False),
        }

    def run(self):
        print("Starting 法律快车 crawl...")
        areas = self._load_areas()
        if not areas:
            print("No area data")
            return
        for area in areas:
            pinyin = area.get("pinyin")
            province = area.get("province", "")
            city = area.get("city", "")
            if not pinyin:
                continue
            page = 1
            while True:
                list_url = LIST_BASE.format(pinyin=pinyin, page=page)
                print(f"Crawling {province}-{city} page {page}: {list_url}")
                html = self._get(list_url)
                if not html:
                    break
                link_count = self._parse_list(html, province, city)
                if link_count == 0:
                    break
                page += 1
        print("法律快车 crawl finished")


if __name__ == "__main__":
    with Db() as db:
        spider = LawtimeSpider(db)
        spider.run()
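
# A minimal usage sketch. The filename is an assumption (not given in the
# source); SPIDER_WORKERS is the env var read in __init__ to size the
# detail-page thread pool:
#
#   $ SPIDER_WORKERS=4 python lawtime_spider.py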