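"""Fresh crawler for the 大律师 (maxlaw) mobile site, m.maxlaw.cn.

Discovers provinces and cities from the site's public JS endpoints, walks each
city's paginated lawyer listing, fetches every lawyer detail page, and appends
one JSON record per lawyer to a JSONL file. Unless --no-db is given, records
with a usable mobile number are also mirrored into the legacy `lawyer` table.

Example invocations (the script filename below is illustrative):

    python dls_fresh_crawler.py --city-filter beijing --max-pages 3 --no-db
    python dls_fresh_crawler.py --direct --output /tmp/dls_records.jsonl
"""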
import argparse
import hashlib
import json
import os
import random
import re
import sys
import time
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Optional, Set, Tuple
from urllib.parse import urljoin
from bs4 import BeautifulSoup
import urllib3
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
request_dir = os.path.join(project_root, "request")
if request_dir not in sys.path:
    sys.path.insert(0, request_dir)
if project_root not in sys.path:
    sys.path.append(project_root)
from request.requests_client import RequestClientError, RequestsClient
from utils.rate_limiter import request_slot
from Db import Db
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
SITE_NAME = "maxlaw"
LEGACY_DOMAIN = "大律师"
SITE_BASE = "https://m.maxlaw.cn"
CITY_API = "https://js.maxlaw.cn/js/ajax/common/getprovice.js"
CITY_DETAIL_API = "https://js.maxlaw.cn/js/ajax/common/getcity_{province_id}.js"
LIST_URL_TEMPLATE = SITE_BASE + "/law/{city_py}?page={page}"
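# PHONE_RE matches a mainland-China mobile number; ANSWER_RE pulls the count out of
# the "已解答 N 次" ("answered N times") badge shown on the mobile list cards.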
PHONE_RE = re.compile(r"1[3-9]\d{9}")
ANSWER_RE = re.compile(r"已解答\s*(\d+)\s*次")
@dataclass
class CityTarget:
    province_id: int
    province_name: str
    province_py: str
    city_id: int
    city_name: str
    city_py: str
@dataclass
class ListCard:
    detail_url: str
    name: str = ""
    law_firm: str = ""
    specialties: List[str] = field(default_factory=list)
    answered_count: Optional[int] = None
def clean_prefixed_name(value: str) -> str:
    text = (value or "").strip()
    # The API often returns names prefixed with a pinyin initial, e.g. "B 北京".
    text = re.sub(r"^[A-Za-z]\s*", "", text)
    return text.strip()
def normalize_phone(text: str) -> str:
    compact = re.sub(r"\D", "", text or "")
    match = PHONE_RE.search(compact)
    return match.group(0) if match else ""
def parse_json_with_bom(text: str) -> Dict:
    cleaned = (text or "").strip().lstrip("\ufeff")
    return json.loads(cleaned)
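# Both the province and city endpoints are assumed to return JSON of roughly the
# shape {"ds": [{"id": 1, "name": "B 北京", "py_code": "beijing"}, ...]} — a sketch
# inferred from the field access in discover_cities(), not a documented API.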
class DlsFreshCrawler:
    def __init__(
        self,
        max_pages: int = 3,
        sleep_seconds: float = 0.2,
        use_proxy: bool = True,
        db_connection=None,
    ):
        self.max_pages = max_pages
        self.sleep_seconds = max(0.0, sleep_seconds)
        self.db = db_connection
        self.client = RequestsClient(
            headers={
                "User-Agent": (
                    "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) "
                    "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 "
                    "Mobile/15E148 Safari/604.1"
                ),
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
                "Connection": "close",
            },
            use_proxy=use_proxy,
            retry_total=2,
            retry_backoff_factor=1,
            retry_status_forcelist=(429, 500, 502, 503, 504),
            retry_allowed_methods=("GET",),
        )
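    # _get_text layers a second retry loop on top of RequestsClient's own retries:
    # exponential backoff with jitter, a client refresh on HTTP 403, and
    # request_slot() to respect the shared rate limit.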
    def _get_text(self, url: str, timeout: int = 20, max_retries: int = 3) -> str:
        last_error: Optional[Exception] = None
        for attempt in range(max_retries):
            try:
                with request_slot():
                    resp = self.client.get_text(url, timeout=timeout, verify=False)
                code = resp.status_code
                if code == 403:
                    if attempt < max_retries - 1:
                        self.client.refresh()
                        time.sleep((2 ** attempt) + random.uniform(0.2, 0.8))
                        continue
                    raise RequestClientError(f"{code} Error: {url}")
                if code >= 500 and attempt < max_retries - 1:
                    time.sleep((2 ** attempt) + random.uniform(0.2, 0.8))
                    continue
                if code >= 400:
                    raise RequestClientError(f"{code} Error: {url}")
                return resp.text
            except Exception as exc:
                last_error = exc
                if attempt < max_retries - 1:
                    time.sleep((2 ** attempt) + random.uniform(0.2, 0.8))
                    continue
                raise
        if last_error is not None:
            raise last_error
        raise RequestClientError(f"Unknown request error: {url}")
    def discover_cities(self) -> List[CityTarget]:
        province_text = self._get_text(CITY_API)
        province_data = parse_json_with_bom(province_text)
        province_rows = province_data.get("ds", []) or []
        cities: List[CityTarget] = []
        seen_py: Set[str] = set()
        for province in province_rows:
            province_id = int(province.get("id"))
            province_name = clean_prefixed_name(province.get("name", ""))
            province_py = (province.get("py_code") or "").strip()
            if not province_py:
                continue
            city_api = CITY_DETAIL_API.format(province_id=province_id)
            try:
                city_text = self._get_text(city_api)
                city_data = parse_json_with_bom(city_text)
            except Exception as exc:
                print(f"[city] fetch failed pid={province_id}: {exc}")
                continue
            for city in city_data.get("ds", []) or []:
                city_py = (city.get("py_code") or "").strip()
                if not city_py or city_py in seen_py:
                    continue
                seen_py.add(city_py)
                cities.append(
                    CityTarget(
                        province_id=province_id,
                        province_name=province_name,
                        province_py=province_py,
                        city_id=int(city.get("id")),
                        city_name=clean_prefixed_name(city.get("name", "")),
                        city_py=city_py,
                    )
                )
        return cities
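    # parse_list_cards expects the mobile listing markup: one
    # div.lawyer_list ul.lawyer_ul > li per lawyer, with the detail link under
    # div.lstx, the firm under div.li_r h2, and specialty tags in div.zc span.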
    def parse_list_cards(self, html: str) -> List[ListCard]:
        soup = BeautifulSoup(html, "html.parser")
        cards: List[ListCard] = []
        seen: Set[str] = set()
        for item in soup.select("div.lawyer_list ul.lawyer_ul > li"):
            link = item.select_one("div.lstx a[href]")
            if not link:
                continue
            detail_url = urljoin(SITE_BASE, link.get("href", "").strip())
            if not detail_url or detail_url in seen:
                continue
            seen.add(detail_url)
            name = ""
            law_firm = ""
            specialties: List[str] = []
            answered_count = None
            name_tag = item.select_one("p.name")
            if name_tag:
                name = name_tag.get_text(strip=True)
            firm_tag = item.select_one("div.li_r h2")
            if firm_tag:
                law_firm = firm_tag.get_text(strip=True)
            for span in item.select("div.zc span"):
                text = span.get_text(strip=True)
                if text:
                    specialties.append(text)
            distance_text = item.select_one("div.distance i")
            if distance_text:
                match = ANSWER_RE.search(distance_text.get_text(" ", strip=True))
                if match:
                    answered_count = int(match.group(1))
            cards.append(
                ListCard(
                    detail_url=detail_url,
                    name=name,
                    law_firm=law_firm,
                    specialties=specialties,
                    answered_count=answered_count,
                )
            )
        return cards
    def has_next_page(self, html: str) -> bool:
        soup = BeautifulSoup(html, "html.parser")
        return soup.select_one("a.mnext") is not None
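    # parse_detail scrapes a single lawyer profile page: name (h2.lawyerName),
    # firm (p.law-firm), licence number (p.card-zyz), practice years
    # (div#practice i), plus phone/email/address from the contact list and any
    # tel: link. Keys in ul.contact-content are Chinese labels
    # (电话 = phone, 邮箱 = email, 地址 = address).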
    def parse_detail(self, detail_url: str) -> Dict:
        html = self._get_text(detail_url)
        soup = BeautifulSoup(html, "html.parser")
        name = ""
        law_firm = ""
        license_no = ""
        practice_years = None
        phone = ""
        email = ""
        address = ""
        specialties: List[str] = []
        name_tag = soup.select_one("h2.lawyerName")
        if name_tag:
            name = name_tag.get_text(strip=True)
        firm_tag = soup.select_one("p.law-firm")
        if firm_tag:
            law_firm = firm_tag.get_text(strip=True)
        license_tag = soup.select_one("p.card-zyz")
        if license_tag:
            license_no = (
                license_tag.get_text(" ", strip=True)
                .replace("执业证号：", "")
                .replace("执业证号:", "")
                .strip()
            )
        years_tag = soup.select_one("div#practice i")
        if years_tag:
            year_text = years_tag.get_text(strip=True)
            if year_text.isdigit():
                practice_years = int(year_text)
        tel_tag = soup.select_one("a[href^='tel:']")
        if tel_tag:
            phone = normalize_phone(tel_tag.get("href", ""))
        for li in soup.select("ul.contact-content > li"):
            key = li.select_one("i")
            val = li.select_one("p")
            if not key or not val:
                continue
            k = key.get_text(strip=True).replace("：", ":")
            v = val.get_text(" ", strip=True)
            if "电话" in k and not phone:
                phone = normalize_phone(v)
            elif "邮箱" in k and not email:
                email = v.strip()
            elif "地址" in k and not address:
                address = v.strip()
        for node in soup.select("div.exp-main li.on"):
            text = node.get_text(strip=True)
            if text:
                specialties.append(text)
        return {
            "name": name,
            "law_firm": law_firm,
            "license_no": license_no,
            "practice_years": practice_years,
            "phone": phone,
            "email": email,
            "address": address,
            "specialties": specialties,
            "detail_url": detail_url,
        }
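    # _to_legacy_lawyer_row flattens a crawl record into a row for the legacy
    # `lawyer` table; records without a parsable mobile number are dropped, and
    # the full record is preserved as JSON in the `params` column.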
    def _to_legacy_lawyer_row(self, record: Dict) -> Optional[Dict[str, str]]:
        profile = record.get("profile", {}) or {}
        source = record.get("source", {}) or {}
        phone = normalize_phone(profile.get("phone", ""))
        if not phone:
            return None
        province = (source.get("province") or "").strip()
        city = (source.get("city") or province).strip()
        return {
            "name": (profile.get("name") or "").strip(),
            "law_firm": (profile.get("law_firm") or "").strip(),
            "province": province,
            "city": city,
            "phone": phone,
            "url": (source.get("detail_url") or "").strip(),
            "domain": LEGACY_DOMAIN,
            "create_time": int(record.get("collected_at") or time.time()),
            "params": json.dumps(record, ensure_ascii=False),
        }
    def _existing_phones_in_db(self, phones: List[str]) -> Set[str]:
        if not self.db or not phones:
            return set()
        deduped = sorted({p for p in phones if p})
        if not deduped:
            return set()
        existing: Set[str] = set()
        cur = self.db.db.cursor()
        try:
            chunk_size = 500
            for i in range(0, len(deduped), chunk_size):
                chunk = deduped[i:i + chunk_size]
                placeholders = ",".join(["%s"] * len(chunk))
                sql = f"SELECT phone FROM lawyer WHERE domain=%s AND phone IN ({placeholders})"
                cur.execute(sql, [LEGACY_DOMAIN, *chunk])
                for row in cur.fetchall():
                    existing.add(row[0])
        finally:
            cur.close()
        return existing
    def _write_records_to_db(self, records: List[Dict]) -> Tuple[int, int]:
        if not self.db:
            return 0, 0
        rows: List[Dict[str, str]] = []
        for record in records:
            row = self._to_legacy_lawyer_row(record)
            if row:
                rows.append(row)
        if not rows:
            return 0, 0
        existing = self._existing_phones_in_db([row["phone"] for row in rows])
        inserted = 0
        skipped = 0
        for row in rows:
            phone = row.get("phone", "")
            if not phone or phone in existing:
                skipped += 1
                continue
            try:
                self.db.insert_data("lawyer", row)
                existing.add(phone)
                inserted += 1
            except Exception as exc:
                skipped += 1
                print(f"[db] insert failed phone={phone} url={row.get('url', '')}: {exc}")
        return inserted, skipped
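    # crawl_city walks a city's listing pages and yields one record per new
    # lawyer. Two early-stop heuristics guard against the site's pagination
    # looping back on itself: a repeated page "signature" (sorted detail URLs)
    # and several consecutive pages that add no unseen lawyers.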
    def crawl_city(self, target: CityTarget) -> Iterable[Dict]:
        # Deduplicate within a city so pages that loop back are not re-crawled.
        seen_detail_urls: Set[str] = set()
        last_page_signature: Tuple[str, ...] = tuple()
        repeated_signature_pages = 0
        no_new_pages = 0
        for page in range(1, self.max_pages + 1):
            list_url = LIST_URL_TEMPLATE.format(city_py=target.city_py, page=page)
            try:
                html = self._get_text(list_url)
            except Exception as exc:
                print(f"[list] failed {list_url}: {exc}")
                break
            cards = self.parse_list_cards(html)
            if not cards:
                break
            page_signature = tuple(sorted(card.detail_url for card in cards if card.detail_url))
            if page_signature and page_signature == last_page_signature:
                repeated_signature_pages += 1
            else:
                repeated_signature_pages = 0
            last_page_signature = page_signature
            if repeated_signature_pages >= 2:
                print(
                    f"[list] city {target.city_py} page {page} repeated the previous listing signature, stopping early, "
                    f"list_url={list_url}"
                )
                break
            fresh_cards: List[ListCard] = []
            for card in cards:
                if not card.detail_url:
                    continue
                if card.detail_url in seen_detail_urls:
                    continue
                seen_detail_urls.add(card.detail_url)
                fresh_cards.append(card)
            if not fresh_cards:
                no_new_pages += 1
                if no_new_pages >= 3:
                    print(
                        f"[list] city {target.city_py} had {no_new_pages} consecutive pages with no new lawyers, stopping early, "
                        f"list_url={list_url}"
                    )
                    break
            else:
                no_new_pages = 0
            print(
                f"[page] city={target.city_py} page={page} cards={len(cards)} "
                f"fresh={len(fresh_cards)} next={self.has_next_page(html)}"
            )
            for card in fresh_cards:
                try:
                    detail = self.parse_detail(card.detail_url)
                except Exception as exc:
                    print(f"[detail] failed {card.detail_url}: {exc}")
                    continue
                now = int(time.time())
                record_id = hashlib.md5(card.detail_url.encode("utf-8")).hexdigest()
                yield {
                    "record_id": record_id,
                    "collected_at": now,
                    "source": {
                        "site": SITE_NAME,
                        "list_url": list_url,
                        "detail_url": card.detail_url,
                        "province": target.province_name,
                        "province_py": target.province_py,
                        "city": target.city_name,
                        "city_py": target.city_py,
                        "page": page,
                    },
                    "list_snapshot": {
                        "name": card.name,
                        "law_firm": card.law_firm,
                        "specialties": card.specialties,
                        "answered_count": card.answered_count,
                    },
                    "profile": {
                        "name": detail.get("name") or card.name,
                        "law_firm": detail.get("law_firm") or card.law_firm,
                        "phone": detail.get("phone", ""),
                        "license_no": detail.get("license_no", ""),
                        "practice_years": detail.get("practice_years"),
                        "email": detail.get("email", ""),
                        "address": detail.get("address", ""),
                        "specialties": detail.get("specialties") or card.specialties,
                    },
                }
                if self.sleep_seconds:
                    time.sleep(self.sleep_seconds)
            if not self.has_next_page(html):
                break
    def crawl(
        self,
        output_path: str,
        max_cities: int = 0,
        city_filter: Optional[str] = None,
    ) -> None:
        cities = self.discover_cities()
        print(f"[discover] found {len(cities)} cities in total")
        if city_filter:
            key = city_filter.strip().lower()
            cities = [c for c in cities if key in c.city_py.lower() or key in c.city_name.lower()]
            print(f"[discover] {len(cities)} cities after filtering, filter={city_filter}")
        if max_cities > 0:
            cities = cities[:max_cities]
            print(f"[discover] truncated to {len(cities)} cities")
        os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
        seen_ids: Set[str] = set()
        if os.path.exists(output_path):
            with open(output_path, "r", encoding="utf-8") as old_file:
                for line in old_file:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        item = json.loads(line)
                    except Exception:
                        continue
                    rid = item.get("record_id")
                    if rid:
                        seen_ids.add(rid)
            print(f"[resume] {len(seen_ids)} existing records")
        total_new_json = 0
        total_new_db = 0
        total_skip_db = 0
        with open(output_path, "a", encoding="utf-8") as out:
            for idx, target in enumerate(cities, start=1):
                print(
                    f"[city {idx}/{len(cities)}] {target.province_name}-{target.city_name} "
                    f"({target.city_py})"
                )
                city_records = list(self.crawl_city(target))
                city_new_json = 0
                for record in city_records:
                    rid = record["record_id"]
                    if rid in seen_ids:
                        continue
                    out.write(json.dumps(record, ensure_ascii=False) + "\n")
                    seen_ids.add(rid)
                    city_new_json += 1
                    total_new_json += 1
                city_new_db, city_skip_db = self._write_records_to_db(city_records)
                total_new_db += city_new_db
                total_skip_db += city_skip_db
                print(
                    f"[city] collected {len(city_records)} records, {city_new_json} new in JSON, "
                    f"{city_new_db} new in DB, {city_skip_db} skipped by DB"
                )
        print(
            f"[done] {total_new_json} new JSON records, {total_new_db} new DB rows, "
            f"{total_skip_db} DB rows skipped, output: {output_path}"
        )
def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="大律师 fresh crawl script (new record schema)")
    parser.add_argument(
        "--output",
        default="/www/wwwroot/lawyers/data/dls_records_all.jsonl",
        help="output jsonl file path",
    )
    parser.add_argument(
        "--max-cities",
        type=int,
        default=0,
        help="maximum number of cities to crawl, 0 means no limit",
    )
    parser.add_argument(
        "--max-pages",
        type=int,
        default=9999,
        help="maximum number of pages to crawl per city",
    )
    parser.add_argument(
        "--city-filter",
        default="",
        help="filter by city pinyin or city name, e.g. beijing",
    )
    parser.add_argument(
        "--sleep",
        type=float,
        default=0.2,
        help="delay in seconds between detail-page requests",
    )
    parser.add_argument(
        "--direct",
        action="store_true",
        help="direct mode, do not use the proxies from proxy_settings.json",
    )
    parser.add_argument(
        "--no-db",
        action="store_true",
        help="write JSONL only, do not write to the database",
    )
    return parser.parse_args()
def main():
    args = parse_args()
    if args.no_db:
        crawler = DlsFreshCrawler(
            max_pages=args.max_pages,
            sleep_seconds=args.sleep,
            use_proxy=not args.direct,
            db_connection=None,
        )
        crawler.crawl(
            output_path=args.output,
            max_cities=args.max_cities,
            city_filter=args.city_filter or None,
        )
        return
    with Db() as db:
        crawler = DlsFreshCrawler(
            max_pages=args.max_pages,
            sleep_seconds=args.sleep,
            use_proxy=not args.direct,
            db_connection=db,
        )
        crawler.crawl(
            output_path=args.output,
            max_cities=args.max_cities,
            city_filter=args.city_filter or None,
        )
if __name__ == "__main__":
    main()