Files
lawyers/common_sites/dls.py
T
hello-dd-code 19cf9ce901 重构采集脚本并新增按时间导出Excel
- 统一五个站点采集逻辑与启动脚本
- 新增 dls_fresh 采集流程与日志优化
- 新增 export_lawyers_excel 按时间条件导出
- 默认导出近7天并支持扩展字段解析
- 整理 .gitignore,忽略 data/logs 本地产物
2026-03-02 11:46:05 +08:00

384 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import os
import random
import re
import sys
import time
from typing import Dict, List, Optional, Set, Tuple
from urllib.parse import urljoin
import urllib3
from bs4 import BeautifulSoup
# Resolve the directories this script imports from, relative to its own
# location, so it can be launched from any working directory.
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
request_dir = os.path.join(project_root, "request")

# The request directory goes to the FRONT of sys.path so it wins over any
# same-named module found elsewhere on the path.
if request_dir not in sys.path:
    sys.path.insert(0, request_dir)

# The project root is only appended, making package-style imports rooted at
# the project resolvable without shadowing anything already importable.
if project_root not in sys.path:
    sys.path.append(project_root)
from Db import Db
from request.requests_client import (
RequestClientError,
RequestConnectTimeout,
RequestConnectionError,
RequestTimeout,
RequestsClient,
)
from utils.rate_limiter import wait_for_request
# Page fetches in this module are issued with verify=False, so silence
# urllib3's InsecureRequestWarning instead of emitting it on every request.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Value written to the `domain` column of stored lawyers and used to scope
# the duplicate-phone lookup.
DOMAIN = "大律师"

# Root of the mobile site; relative links found in pages are joined to it.
SITE_BASE = "https://m.maxlaw.cn"

# List-page URL; `pinyin` selects the city and `page` is the 1-based page.
LIST_TEMPLATE = f"{SITE_BASE}/law/{{pinyin}}?page={{page}}"

# Mainland-China mobile number: leading 1, second digit 3-9, 11 digits total.
PHONE_PATTERN = re.compile(r"1[3-9]\d{9}")

# Per-city page cap read from the environment; 0 (the default) means no cap.
MAX_PAGES_PER_CITY = int(os.environ.get("MAXLAW_MAX_PAGES", "0"))

# Process-wide flag so the optional proxy self-test runs at most once.
PROXY_TESTED = False
class DlsSpider:
    """Crawler for lawyer profiles on the maxlaw mobile site.

    For every configured area it walks the paginated list pages, fetches each
    linked detail page, extracts name / law firm / mobile number, and inserts
    the lawyers whose phone is not yet stored for this domain.
    """

    # Mainland mobile number embedded in free text. The digit look-around
    # keeps an 11-digit run from being cut out of a longer number (licence
    # numbers, IDs, landlines); whitespace or hyphen separators and an
    # optional +86 / 0086 / 86 country prefix are tolerated. Group 1 is the
    # national number, possibly still containing separators.
    _MOBILE_IN_TEXT = re.compile(
        r"(?<!\d)(?:(?:\+|00)?86[\s\-]?)?(1[3-9](?:[\s\-]?\d){9})(?!\d)"
    )

    def __init__(self, db_connection):
        """Store the DB handle, build the HTTP client and load the area list."""
        self.db = db_connection
        self.client = self._build_client()
        self.areas = self._load_areas()

    def _build_client(self) -> RequestsClient:
        """Create the HTTP client with mobile-browser headers and retry policy."""
        client = RequestsClient(
            headers={
                "User-Agent": (
                    "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) "
                    "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 "
                    "Mobile/15E148 Safari/604.1"
                ),
                # No explicit "Host": a forced value would be wrong for any
                # other host this client talks to (the proxy self-test URL,
                # redirects). NOTE(review): assumes the underlying HTTP layer
                # derives Host from the URL, as HTTP clients normally do.
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
                "Connection": "close",
            },
            retry_total=3,
            retry_backoff_factor=1,
            retry_status_forcelist=(429, 500, 502, 503, 504),
            retry_allowed_methods=("GET", "POST"),
        )
        self._proxy_test(client, client.proxies or None)
        return client

    def _refresh_client(self) -> None:
        """Refresh the client (called after a 403) and re-offer the proxy test."""
        self.client.refresh()
        self._proxy_test(self.client, self.client.proxies or None)

    def _proxy_test(self, client: RequestsClient, proxies: Optional[Dict[str, str]]) -> None:
        """Run a one-off proxy connectivity check when PROXY_TEST is set.

        The check runs at most once per process (guarded by the module-level
        PROXY_TESTED flag) and only prints its outcome; it never raises.
        """
        global PROXY_TESTED
        if PROXY_TESTED or not os.getenv("PROXY_TEST"):
            return
        PROXY_TESTED = True
        if not proxies:
            print("[proxy] test skipped: no proxy configured")
            return
        test_url = os.getenv("PROXY_TEST_URL", "https://dev.kdlapi.com/testproxy")
        timeout = float(os.getenv("PROXY_TEST_TIMEOUT", "10"))
        try:
            resp = client.get_text(test_url, timeout=timeout, headers={"Connection": "close"})
            print(f"[proxy] test {resp.status_code}: {resp.text.strip()[:200]}")
        except Exception as exc:
            # Deliberately best-effort: a failed diagnostic must not stop the crawl.
            print(f"[proxy] test failed: {exc}")

    def _load_areas(self) -> List[Dict[str, str]]:
        """Return area rows (province, city, pinyin) for the 'maxlaw' domain.

        The candidate tables are tried in order and the first one that yields
        rows wins. A failing query is remembered and the next table is tried;
        the last error is printed only if no table produced rows.
        """
        tables = ("area_new", "area2", "area")
        last_error = None
        for table in tables:
            try:
                rows = self.db.select_data(table, "province, city, pinyin", "domain='maxlaw'") or []
            except Exception as exc:
                last_error = exc
                continue
            if rows:
                missing_pinyin = sum(1 for row in rows if not (row.get("pinyin") or "").strip())
                print(f"[大律师] 地区来源表: {table}, 城市数: {len(rows)}, 缺少pinyin: {missing_pinyin}")
                return rows
        if last_error:
            print(f"[大律师] 加载地区失败: {last_error}")
        print("[大律师] 无地区数据(已尝试 area_new/area2/area)")
        return []

    @staticmethod
    def _transient_label(exc: Exception) -> str:
        """Return the log label for a retryable network error.

        The checks run most-specific first, mirroring the order in which the
        exception types are handled.
        """
        if isinstance(exc, RequestConnectTimeout):
            return "连接超时"
        if isinstance(exc, RequestTimeout):
            return "请求超时"
        return "连接错误"

    def _get(
        self,
        url: str,
        *,
        headers: Optional[Dict[str, str]] = None,
        max_retries: int = 3,
        timeout: Tuple[int, int] = (10, 30),
    ) -> Optional[str]:
        """Fetch *url* and return the response text, or None on failure.

        Args:
            url: Absolute URL to fetch.
            headers: Extra per-request headers.
            max_retries: Maximum number of attempts.
            timeout: Timeout passed through to the client.

        A 403 refreshes the client and retries with jittered exponential
        backoff; connect timeouts, read timeouts and connection errors retry
        with plain exponential backoff. Any other status >= 400 or client
        error gives up immediately. Failures are printed, never raised.
        """
        wait_for_request()
        for attempt in range(max_retries):
            has_retry_left = attempt < max_retries - 1
            try:
                resp = self.client.get_text(url, timeout=timeout, verify=False, headers=headers)
                if resp.status_code == 403:
                    if has_retry_left:
                        wait_time = (2 ** attempt) + random.uniform(0.3, 1.0)
                        print(f"请求403,{wait_time:.2f}s后重试 ({attempt + 1}/{max_retries}): {url}")
                        self._refresh_client()
                        time.sleep(wait_time)
                        continue
                    print(f"请求失败 {url}: 403 Forbidden")
                    return None
                if resp.status_code >= 400:
                    raise RequestClientError(f"{resp.status_code} Error: {url}")
                return resp.text
            except (RequestConnectTimeout, RequestTimeout, RequestConnectionError) as exc:
                label = self._transient_label(exc)
                if has_retry_left:
                    wait_time = 2 ** attempt
                    print(f"{label},{wait_time}s后重试 ({attempt + 1}/{max_retries}): {url}")
                    time.sleep(wait_time)
                    continue
                print(f"{label},已达到最大重试次数 {url}: {exc}")
                return None
            except RequestClientError as exc:
                print(f"请求失败 {url}: {exc}")
                return None
        return None

    def _detail_headers(self, referer: str) -> Dict[str, str]:
        """Return per-request headers for a detail page reached from *referer*."""
        return {
            "Referer": referer,
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Cache-Control": "no-cache",
            "Pragma": "no-cache",
            "Upgrade-Insecure-Requests": "1",
        }

    @staticmethod
    def _unique_links(tags) -> List[str]:
        """Return absolute URLs for the given anchor tags, first-seen order, no repeats.

        Anchors whose href is empty after stripping are ignored.
        """
        hrefs = ((tag.get("href") or "").strip() for tag in tags)
        return list(dict.fromkeys(urljoin(SITE_BASE, href) for href in hrefs if href))

    def _extract_detail_urls(self, html: str) -> List[str]:
        """Return the de-duplicated detail-page URLs found on a list page."""
        soup = BeautifulSoup(html, "html.parser")
        # Primary selector: the list cards of the current site layout.
        urls = self._unique_links(soup.select("div.lstx a[href]"))
        if not urls:
            # Fallback: survive small layout changes by taking any link whose
            # href contains the "/lawyer/" path segment.
            candidates = [
                tag
                for tag in soup.select("a[href]")
                if "/lawyer/" in (tag.get("href") or "").strip()
            ]
            urls = self._unique_links(candidates)
        return urls

    def _extract_name(self, soup: BeautifulSoup) -> str:
        """Return the lawyer's name, or "" when none can be found.

        Heading selectors are tried from most to least specific; as a last
        resort a "...律师" token is taken from the page title.
        """
        for selector in ("h2.lawyerName", "h1.lawyerName", "h1", "h2"):
            tag = soup.select_one(selector)
            if tag:
                name = tag.get_text(strip=True)
                if name:
                    return name
        title = soup.title.get_text(strip=True) if soup.title else ""
        match = re.search(r"(\S+律师)", title)
        return match.group(1) if match else ""

    def _extract_law_firm(self, soup: BeautifulSoup) -> str:
        """Return the law-firm name, or "" when none can be found."""
        for selector in ("p.law-firm", "div.law-firm", "p[class*=firm]"):
            tag = soup.select_one(selector)
            if tag:
                text = tag.get_text(strip=True)
                if text:
                    return text
        page_text = soup.get_text(" ", strip=True)
        # The label may be followed by an ASCII or a full-width colon. The
        # full-width form must be consumed here, otherwise it ends up as the
        # first character of the captured name. The value stops at whitespace
        # or at ASCII / full-width sentence punctuation.
        match = re.search(r"(执业机构|律所)\s*[::]?\s*([^\s,。,;,;]{2,40})", page_text)
        if match:
            return match.group(2).strip()
        return ""

    def _normalize_phone(self, text: str) -> str:
        """Return the first mainland mobile number in *text* as 11 digits, or "".

        Separators inside the number (spaces, hyphens) and a country prefix
        are dropped. Digits are NOT merged across the whole text before
        matching, because that fabricates numbers out of unrelated digit runs
        such as licence numbers.
        """
        if not text:
            return ""
        match = self._MOBILE_IN_TEXT.search(text)
        if not match:
            return ""
        return re.sub(r"\D", "", match.group(1))

    def _extract_phone(self, soup: BeautifulSoup) -> str:
        """Return the lawyer's mobile number, or "" when none can be found.

        Lookup order: the contact list, then dedicated phone elements (text
        first, then the href so `tel:` links whose label carries no digits
        still count), and finally the text of the whole page.
        """
        contact = soup.select_one("ul.contact-content")
        if contact:
            phone = self._normalize_phone(contact.get_text(" ", strip=True))
            if phone:
                return phone
        for selector in ("a[href^='tel:']", "span.phone", "p.phone"):
            tag = soup.select_one(selector)
            if not tag:
                continue
            phone = self._normalize_phone(tag.get_text(" ", strip=True))
            if not phone:
                phone = self._normalize_phone(str(tag.get("href") or ""))
            if phone:
                return phone
        return self._normalize_phone(soup.get_text(" ", strip=True))

    def _parse_detail(self, detail_url: str, province: str, city: str, list_url: str) -> Optional[Dict[str, str]]:
        """Fetch one detail page and build the row to store.

        Returns None when the page cannot be fetched or lacks a name or a
        phone number. *list_url* is sent as the Referer. When *city* is empty
        the province is stored in its place.
        """
        print(f" 详情: {detail_url}")
        html = self._get(detail_url, headers=self._detail_headers(list_url))
        if not html:
            return None
        soup = BeautifulSoup(html, "html.parser")
        name = self._extract_name(soup)
        phone = self._extract_phone(soup)
        if not name or not phone:
            print(" 信息不完整,跳过")
            return None
        safe_city = city or province
        return {
            "name": name,
            "law_firm": self._extract_law_firm(soup),
            "province": province,
            "city": safe_city,
            "phone": phone,
            "url": detail_url,
            "domain": DOMAIN,
            "create_time": int(time.time()),
            "params": json.dumps({"province": province, "city": safe_city}, ensure_ascii=False),
        }

    def _existing_phones(self, phones: List[str]) -> Set[str]:
        """Return the subset of *phones* already stored for this domain.

        The lookup is issued in chunks of 500 so the IN (...) placeholder
        list stays bounded.
        """
        if not phones:
            return set()
        unique_phones = list(dict.fromkeys(phones))
        existing: Set[str] = set()
        cur = self.db.db.cursor()
        try:
            chunk_size = 500
            for idx in range(0, len(unique_phones), chunk_size):
                chunk = unique_phones[idx:idx + chunk_size]
                placeholders = ",".join(["%s"] * len(chunk))
                sql = f"SELECT phone FROM lawyer WHERE domain=%s AND phone IN ({placeholders})"
                cur.execute(sql, [DOMAIN, *chunk])
                for row in cur.fetchall():
                    # NOTE(review): rows are expected to be sequences; mapping
                    # rows are accepted too in case the connection is
                    # configured with a dict-style cursor.
                    existing.add(row["phone"] if isinstance(row, dict) else row[0])
        finally:
            cur.close()
        return existing

    def _save_lawyers(self, lawyers: List[Dict[str, str]]) -> Tuple[int, int]:
        """Insert lawyers whose phone is new; return (inserted, skipped).

        Rows without a phone, rows whose phone is already stored (or was
        inserted earlier in this same batch) and rows whose insert raises are
        all counted as skipped.
        """
        if not lawyers:
            return 0, 0
        phones = [row["phone"] for row in lawyers if row.get("phone")]
        existing = self._existing_phones(phones)
        inserted = 0
        skipped = 0
        for row in lawyers:
            phone = row.get("phone", "")
            if not phone:
                skipped += 1
                continue
            if phone in existing:
                skipped += 1
                print(f" -- 已存在: {row.get('name', '')} ({phone})")
                continue
            try:
                self.db.insert_data("lawyer", row)
                # Remember the phone so a repeat inside this batch is skipped.
                existing.add(phone)
                inserted += 1
                print(f" -> 新增: {row.get('name', '')} ({phone})")
            except Exception as exc:
                skipped += 1
                print(f" 插入失败 {row.get('url', '')}: {exc}")
        return inserted, skipped

    def _crawl_city(self, area: Dict[str, str]) -> Tuple[int, int]:
        """Crawl every list page of one area; return (inserted, parsed).

        Paging stops when the optional page cap is exceeded, a list page
        cannot be fetched, a page has no detail links, or a page repeats the
        previous page's leading links. Areas without a pinyin slug are
        skipped.
        """
        pinyin = (area.get("pinyin") or "").strip()
        province = area.get("province", "")
        city = area.get("city", "")
        if not pinyin:
            return 0, 0
        total_inserted = 0
        total_parsed = 0
        page = 1
        prev_fingerprint = ""
        while True:
            if MAX_PAGES_PER_CITY > 0 and page > MAX_PAGES_PER_CITY:
                print(f"达到分页上限({MAX_PAGES_PER_CITY}),停止 {province}-{city}")
                break
            list_url = LIST_TEMPLATE.format(pinyin=pinyin, page=page)
            print(f"采集 {province}-{city} 第 {page} 页: {list_url}")
            html = self._get(list_url)
            if not html:
                break
            detail_urls = self._extract_detail_urls(html)
            if not detail_urls:
                print(" 列表为空,结束当前城市")
                break
            # Two consecutive pages starting with the same links are treated
            # as the end of the listing, so the loop cannot page forever.
            fingerprint = "|".join(detail_urls[:8])
            if fingerprint and fingerprint == prev_fingerprint:
                print(" 列表页重复,提前停止当前城市")
                break
            prev_fingerprint = fingerprint
            lawyers: List[Dict[str, str]] = []
            for detail_url in detail_urls:
                row = self._parse_detail(detail_url, province, city, list_url)
                if row:
                    lawyers.append(row)
                time.sleep(0.25)
            inserted, skipped = self._save_lawyers(lawyers)
            total_inserted += inserted
            total_parsed += len(lawyers)
            print(
                f"第 {page} 页完成: 列表{len(detail_urls)}条, "
                f"解析{len(lawyers)}条, 新增{inserted}条, 跳过{skipped}条"
            )
            page += 1
            time.sleep(0.5)
        return total_inserted, total_parsed

    def run(self):
        """Crawl every loaded area in turn and print the overall totals."""
        print("启动大律师采集...")
        if not self.areas:
            print("无地区数据")
            return
        all_inserted = 0
        all_parsed = 0
        for area in self.areas:
            inserted, parsed = self._crawl_city(area)
            all_inserted += inserted
            all_parsed += parsed
        print(f"大律师采集完成: 解析{all_parsed}条, 新增{all_inserted}条")
if __name__ == "__main__":
    # Script entry point: the database handle is opened as a context manager
    # for the whole crawl, so it is released once the run finishes or fails.
    with Db() as connection:
        DlsSpider(connection).run()