diff --git a/.gitignore b/.gitignore
index 3ef765e..cdd6454 100644
--- a/.gitignore
+++ b/.gitignore
@@ -29,3 +29,8 @@ Thumbs.db
 
 # Local runtime files
 *.log
+logs/
+data/
+
+# accidental local files
+=*
diff --git a/README.md b/README.md
index 8a61ed0..e8f0a44 100644
--- a/README.md
+++ b/README.md
@@ -14,7 +14,49 @@
 
 ```bash
 cd /www/wwwroot/lawyers
-python3 -m pip install -r requirements.txt
-cd common_sites
-./start.sh
+python3 -m venv .venv
+.venv/bin/pip install -r requirements.txt
+./common_sites/start.sh
+```
+
+## 启动参数
+
+`start.sh` 默认并行启动 5 个站点采集(大律师使用 `dls_fresh.py`)。
+
+- 日志目录:`/www/wwwroot/lawyers/logs`
+- 大律师 JSON 输出:`/www/wwwroot/lawyers/data/dls_records.jsonl`
+
+常用环境变量:
+
+```bash
+# 顺序执行(默认 parallel)
+RUN_MODE=sequential ./common_sites/start.sh
+
+# 大律师限制采集范围
+DLS_CITY_FILTER=beijing DLS_MAX_CITIES=1 DLS_MAX_PAGES=1 ./common_sites/start.sh
+
+# 大律师直连(不走代理)/ 仅导出JSON不写库
+DLS_DIRECT=1 DLS_NO_DB=1 ./common_sites/start.sh
+```
+
+## 导出 Excel
+
+新增导出脚本:`common_sites/export_lawyers_excel.py`
+
+```bash
+# 无参数:默认导出最近7天数据(含手机号/姓名/律所/省份/市区/站点名称)
+# 并默认解析 params 扩展信息(邮箱/地址/执业证号/执业年限/擅长领域等)
+./.venv/bin/python ./common_sites/export_lawyers_excel.py
+
+# 按 create_time 时间戳范围导出
+./.venv/bin/python ./common_sites/export_lawyers_excel.py \
+  --start-ts 1772380000 --end-ts 1772429999 \
+  --output ./data/lawyers_20260302.xlsx
+
+# 只导出某站点,并带技术字段(url/域名/时间等)
+./.venv/bin/python ./common_sites/export_lawyers_excel.py \
+  --domain 大律师 --include-extra
+
+# 如果不需要解析 params 扩展信息
+./.venv/bin/python ./common_sites/export_lawyers_excel.py --no-parse-params
 ```
diff --git a/common_sites/dls.py b/common_sites/dls.py
index 9e628a3..2ab61a9 100644
--- a/common_sites/dls.py
+++ b/common_sites/dls.py
@@ -1,9 +1,14 @@
 import json
 import os
+import random
+import re
 import sys
 import time
-import random
-from typing import Dict, Optional
+from typing import Dict, List, Optional, Set, Tuple
+from urllib.parse import urljoin
+
+import urllib3
+from bs4 import BeautifulSoup
 
 current_dir = os.path.dirname(os.path.abspath(__file__))
 project_root = os.path.dirname(current_dir)
@@ -13,8 +18,7 @@ if request_dir not in sys.path:
 if project_root not in sys.path:
     sys.path.append(project_root)
 
-import urllib3
-from bs4 import BeautifulSoup
+from Db import Db
 from request.requests_client import (
     RequestClientError,
     RequestConnectTimeout,
@@ -22,168 +26,136 @@ from request.requests_client import (
     RequestTimeout,
     RequestsClient,
 )
-
-# 禁用 SSL 警告
-urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
-
-from Db import Db
 from utils.rate_limiter import wait_for_request
 
+urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
+
 DOMAIN = "大律师"
-LIST_TEMPLATE = "https://m.maxlaw.cn/law/{pinyin}?page={page}"
-_PROXY_TESTED = False
+SITE_BASE = "https://m.maxlaw.cn"
+LIST_TEMPLATE = SITE_BASE + "/law/{pinyin}?page={page}"
+PHONE_PATTERN = re.compile(r"1[3-9]\d{9}")
+MAX_PAGES_PER_CITY = int(os.getenv("MAXLAW_MAX_PAGES", "0"))
+PROXY_TESTED = False
 
 
 class DlsSpider:
     def __init__(self, db_connection):
         self.db = db_connection
-        self.client = self._build_session()
+        self.client = self._build_client()
         self.areas = self._load_areas()
 
-    def _build_session(self) -> RequestsClient:
-        """构建带重试机制的 session"""
+    def _build_client(self) -> RequestsClient:
         client = RequestsClient(
             headers={
-                "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1",
+                "User-Agent": (
+                    "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS 
X) " + "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 " + "Mobile/15E148 Safari/604.1" + ), "Host": "m.maxlaw.cn", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", "Connection": "close", }, - retry_total=3, # 总共重试3次 - retry_backoff_factor=1, # 重试间隔:1s, 2s, 4s - retry_status_forcelist=(429, 500, 502, 503, 504), # 对这些状态码进行重试 + retry_total=3, + retry_backoff_factor=1, + retry_status_forcelist=(429, 500, 502, 503, 504), retry_allowed_methods=("GET", "POST"), ) self._proxy_test(client, client.proxies or None) return client - def _refresh_session(self) -> None: + def _refresh_client(self) -> None: self.client.refresh() self._proxy_test(self.client, self.client.proxies or None) def _proxy_test(self, client: RequestsClient, proxies: Optional[Dict[str, str]]) -> None: - global _PROXY_TESTED - if _PROXY_TESTED or not os.getenv("PROXY_TEST"): + global PROXY_TESTED + if PROXY_TESTED or not os.getenv("PROXY_TEST"): return - _PROXY_TESTED = True + PROXY_TESTED = True if not proxies: print("[proxy] test skipped: no proxy configured") return test_url = os.getenv("PROXY_TEST_URL", "https://dev.kdlapi.com/testproxy") timeout = float(os.getenv("PROXY_TEST_TIMEOUT", "10")) try: - resp = client.get_text( - test_url, - timeout=timeout, - headers={"Connection": "close"}, - ) + resp = client.get_text(test_url, timeout=timeout, headers={"Connection": "close"}) print(f"[proxy] test {resp.status_code}: {resp.text.strip()[:200]}") except Exception as exc: print(f"[proxy] test failed: {exc}") - def _load_areas(self): - try: - return self.db.select_data( - "area_new", - "province, city, pinyin", - "domain='maxlaw'" - ) or [] - except Exception as exc: - print(f"加载地区失败: {exc}") - return [] + def _load_areas(self) -> List[Dict[str, str]]: + tables = ("area_new", "area2", "area") + last_error = None + for table in tables: + try: + rows = self.db.select_data(table, "province, city, pinyin", "domain='maxlaw'") or [] + except Exception as exc: + last_error = exc + continue + if rows: + missing_pinyin = sum(1 for row in rows if not (row.get("pinyin") or "").strip()) + print(f"[大律师] 地区来源表: {table}, 城市数: {len(rows)}, 缺少pinyin: {missing_pinyin}") + return rows + if last_error: + print(f"[大律师] 加载地区失败: {last_error}") + print("[大律师] 无地区数据(已尝试 area_new/area2/area)") + return [] - def _get(self, url: str, max_retries: int = 3, headers: Optional[Dict[str, str]] = None) -> Optional[str]: - """发送 GET 请求,带重试机制""" + def _get( + self, + url: str, + *, + headers: Optional[Dict[str, str]] = None, + max_retries: int = 3, + timeout: Tuple[int, int] = (10, 30), + ) -> Optional[str]: wait_for_request() - for attempt in range(max_retries): try: - # 使用更长的超时时间,分别设置连接和读取超时 - resp = self.client.get_text( - url, - timeout=(10, 30), # (connect_timeout, read_timeout) - verify=False, - headers=headers, - ) - status_code = resp.status_code - content = resp.text - if status_code == 403: + resp = self.client.get_text(url, timeout=timeout, verify=False, headers=headers) + if resp.status_code == 403: if attempt < max_retries - 1: - wait_time = 2 ** attempt + random.uniform(0.3, 1.0) - print(f"403被拦截,{wait_time}秒后重试 ({attempt + 1}/{max_retries}): {url}") - self._refresh_session() + wait_time = (2 ** attempt) + random.uniform(0.3, 1.0) + print(f"请求403,{wait_time:.2f}s后重试 ({attempt + 1}/{max_retries}): {url}") + self._refresh_client() time.sleep(wait_time) continue print(f"请求失败 {url}: 403 Forbidden") return None - if status_code >= 400: - raise RequestClientError(f"{status_code} 
Error: {url}") - return content + if resp.status_code >= 400: + raise RequestClientError(f"{resp.status_code} Error: {url}") + return resp.text except RequestConnectTimeout as exc: if attempt < max_retries - 1: - wait_time = 2 ** attempt # 指数退避:2s, 4s, 8s - print(f"连接超时,{wait_time}秒后重试 ({attempt + 1}/{max_retries}): {url}") + wait_time = 2 ** attempt + print(f"连接超时,{wait_time}s后重试 ({attempt + 1}/{max_retries}): {url}") time.sleep(wait_time) - else: - print(f"连接超时,已达到最大重试次数 {url}: {exc}") - return None + continue + print(f"连接超时,已达到最大重试次数 {url}: {exc}") + return None except RequestTimeout as exc: if attempt < max_retries - 1: wait_time = 2 ** attempt - print(f"请求超时,{wait_time}秒后重试 ({attempt + 1}/{max_retries}): {url}") + print(f"请求超时,{wait_time}s后重试 ({attempt + 1}/{max_retries}): {url}") time.sleep(wait_time) - else: - print(f"请求超时,已达到最大重试次数 {url}: {exc}") - return None + continue + print(f"请求超时,已达到最大重试次数 {url}: {exc}") + return None except RequestConnectionError as exc: if attempt < max_retries - 1: wait_time = 2 ** attempt - print(f"连接错误,{wait_time}秒后重试 ({attempt + 1}/{max_retries}): {url}") + print(f"连接错误,{wait_time}s后重试 ({attempt + 1}/{max_retries}): {url}") time.sleep(wait_time) - else: - print(f"连接错误,已达到最大重试次数 {url}: {exc}") - return None + continue + print(f"连接错误,已达到最大重试次数 {url}: {exc}") + return None except RequestClientError as exc: print(f"请求失败 {url}: {exc}") return None - return None - def _parse_list(self, html: str, province: str, city: str, list_url: str) -> int: - soup = BeautifulSoup(html, "html.parser") - cards = soup.find_all("div", class_="lstx") - if not cards: - return 0 - - inserted = 0 - for card in cards: - link = card.find("a") - if not link or not link.get("href"): - continue - detail = self._parse_detail(link['href'], province, city, list_url) - if not detail: - continue - phone = detail.get("phone") - if not phone: - continue - condition = f"phone='{phone}' and domain='{DOMAIN}'" - if self.db.is_data_exist("lawyer", condition): - print(f" -- 已存在: {detail['name']} ({phone})") - time.sleep(0.3) - continue - try: - self.db.insert_data("lawyer", detail) - inserted += 1 - print(f" -> 新增: {detail['name']} ({phone})") - except Exception as exc: - print(f" 插入失败: {exc}") - time.sleep(1) - time.sleep(0.3) - # 列表页结束后再缓一缓,降低风控 - time.sleep(0.6) - return inserted - def _detail_headers(self, referer: str) -> Dict[str, str]: return { "Referer": referer, @@ -194,72 +166,215 @@ class DlsSpider: "Upgrade-Insecure-Requests": "1", } - def _parse_detail(self, path: str, province: str, city: str, list_url: str) -> Optional[Dict[str, str]]: - url = f"https://m.maxlaw.cn{path}" - print(f" 详情: {url}") - html = self._get(url, headers=self._detail_headers(list_url)) + def _extract_detail_urls(self, html: str) -> List[str]: + soup = BeautifulSoup(html, "html.parser") + urls: List[str] = [] + seen: Set[str] = set() + + # 主选择器:当前站点列表卡片 + for a_tag in soup.select("div.lstx a[href]"): + href = (a_tag.get("href") or "").strip() + if not href: + continue + url = urljoin(SITE_BASE, href) + if url in seen: + continue + seen.add(url) + urls.append(url) + + # 回退选择器:页面结构轻微变化时尽量保活 + if not urls: + for a_tag in soup.select("a[href]"): + href = (a_tag.get("href") or "").strip() + if "/lawyer/" not in href: + continue + url = urljoin(SITE_BASE, href) + if url in seen: + continue + seen.add(url) + urls.append(url) + return urls + + def _extract_name(self, soup: BeautifulSoup) -> str: + for selector in ("h2.lawyerName", "h1.lawyerName", "h1", "h2"): + tag = soup.select_one(selector) + if tag: + name = 
tag.get_text(strip=True) + if name: + return name + title = soup.title.get_text(strip=True) if soup.title else "" + match = re.search(r"(\S+律师)", title) + return match.group(1) if match else "" + + def _extract_law_firm(self, soup: BeautifulSoup) -> str: + for selector in ("p.law-firm", "div.law-firm", "p[class*=firm]"): + tag = soup.select_one(selector) + if tag: + text = tag.get_text(strip=True) + if text: + return text + page_text = soup.get_text(" ", strip=True) + match = re.search(r"(执业机构|律所)\s*[::]?\s*([^\s,。,;;]{2,40})", page_text) + if match: + return match.group(2).strip() + return "" + + def _normalize_phone(self, text: str) -> str: + compact = re.sub(r"\D", "", text or "") + match = PHONE_PATTERN.search(compact) + return match.group(0) if match else "" + + def _extract_phone(self, soup: BeautifulSoup) -> str: + contact = soup.select_one("ul.contact-content") + if contact: + phone = self._normalize_phone(contact.get_text(" ", strip=True)) + if phone: + return phone + for selector in ("a[href^='tel:']", "span.phone", "p.phone"): + tag = soup.select_one(selector) + if tag: + phone = self._normalize_phone(tag.get_text(" ", strip=True)) + if phone: + return phone + return self._normalize_phone(soup.get_text(" ", strip=True)) + + def _parse_detail(self, detail_url: str, province: str, city: str, list_url: str) -> Optional[Dict[str, str]]: + print(f" 详情: {detail_url}") + html = self._get(detail_url, headers=self._detail_headers(list_url)) if not html: return None soup = BeautifulSoup(html, "html.parser") - name_tag = soup.find("h2", class_="lawyerName") - law_firm_tag = soup.find("p", class_="law-firm") - contact_list = soup.find("ul", class_="contact-content") - - name = name_tag.get_text(strip=True) if name_tag else "" - law_firm = law_firm_tag.get_text(strip=True) if law_firm_tag else "" - phone = "" - - if contact_list: - items = contact_list.find_all("li") - if len(items) > 2: - phone_tag = items[2].find("p") - if phone_tag: - phone = phone_tag.get_text(strip=True) - phone = phone.split("咨询请说明来自大律师网")[0].strip() - - phone = phone.replace('-', '').strip() + name = self._extract_name(soup) + phone = self._extract_phone(soup) if not name or not phone: print(" 信息不完整,跳过") return None - safe_city = city if city else province + safe_city = city or province return { "name": name, - "law_firm": law_firm, + "law_firm": self._extract_law_firm(soup), "province": province, "city": safe_city, "phone": phone, - "url": url, + "url": detail_url, "domain": DOMAIN, "create_time": int(time.time()), - "params": json.dumps({"province": province, "city": safe_city}, ensure_ascii=False) + "params": json.dumps({"province": province, "city": safe_city}, ensure_ascii=False), } + def _existing_phones(self, phones: List[str]) -> Set[str]: + if not phones: + return set() + existing: Set[str] = set() + cur = self.db.db.cursor() + try: + chunk_size = 500 + for idx in range(0, len(phones), chunk_size): + chunk = phones[idx:idx + chunk_size] + placeholders = ",".join(["%s"] * len(chunk)) + sql = f"SELECT phone FROM lawyer WHERE domain=%s AND phone IN ({placeholders})" + cur.execute(sql, [DOMAIN, *chunk]) + for row in cur.fetchall(): + existing.add(row[0]) + finally: + cur.close() + return existing + + def _save_lawyers(self, lawyers: List[Dict[str, str]]) -> Tuple[int, int]: + if not lawyers: + return 0, 0 + phones = [row["phone"] for row in lawyers if row.get("phone")] + existing = self._existing_phones(phones) + inserted = 0 + skipped = 0 + + for row in lawyers: + phone = row.get("phone", "") + if not phone: + 
skipped += 1 + continue + if phone in existing: + skipped += 1 + print(f" -- 已存在: {row.get('name', '')} ({phone})") + continue + try: + self.db.insert_data("lawyer", row) + existing.add(phone) + inserted += 1 + print(f" -> 新增: {row.get('name', '')} ({phone})") + except Exception as exc: + skipped += 1 + print(f" 插入失败 {row.get('url', '')}: {exc}") + return inserted, skipped + + def _crawl_city(self, area: Dict[str, str]) -> Tuple[int, int]: + pinyin = (area.get("pinyin") or "").strip() + province = area.get("province", "") + city = area.get("city", "") + if not pinyin: + return 0, 0 + + total_inserted = 0 + total_parsed = 0 + page = 1 + prev_fingerprint = "" + + while True: + if MAX_PAGES_PER_CITY > 0 and page > MAX_PAGES_PER_CITY: + print(f"达到分页上限({MAX_PAGES_PER_CITY}),停止 {province}-{city}") + break + + list_url = LIST_TEMPLATE.format(pinyin=pinyin, page=page) + print(f"采集 {province}-{city} 第 {page} 页: {list_url}") + html = self._get(list_url) + if not html: + break + + detail_urls = self._extract_detail_urls(html) + if not detail_urls: + print(" 列表为空,结束当前城市") + break + + fingerprint = "|".join(detail_urls[:8]) + if fingerprint and fingerprint == prev_fingerprint: + print(" 列表页重复,提前停止当前城市") + break + prev_fingerprint = fingerprint + + lawyers: List[Dict[str, str]] = [] + for detail_url in detail_urls: + row = self._parse_detail(detail_url, province, city, list_url) + if row: + lawyers.append(row) + time.sleep(0.25) + + inserted, skipped = self._save_lawyers(lawyers) + total_inserted += inserted + total_parsed += len(lawyers) + print( + f" 第 {page} 页完成: 列表{len(detail_urls)}条, " + f"解析{len(lawyers)}条, 新增{inserted}条, 跳过{skipped}条" + ) + + page += 1 + time.sleep(0.5) + return total_inserted, total_parsed + def run(self): print("启动大律师采集...") if not self.areas: print("无地区数据") return + all_inserted = 0 + all_parsed = 0 for area in self.areas: - pinyin = area.get("pinyin") - province = area.get("province", "") - city = area.get("city", "") - if not pinyin: - continue - page = 1 - while True: - list_url = LIST_TEMPLATE.format(pinyin=pinyin, page=page) - print(f"采集 {province}-{city} 第 {page} 页: {list_url}") - html = self._get(list_url) - if not html: - break - inserted = self._parse_list(html, province, city, list_url) - if inserted == 0: - break - page += 1 - print("大律师采集完成") + inserted, parsed = self._crawl_city(area) + all_inserted += inserted + all_parsed += parsed + print(f"大律师采集完成: 解析{all_parsed}条, 新增{all_inserted}条") if __name__ == "__main__": diff --git a/common_sites/dls_fresh.py b/common_sites/dls_fresh.py new file mode 100644 index 0000000..d4a4347 --- /dev/null +++ b/common_sites/dls_fresh.py @@ -0,0 +1,621 @@ +import argparse +import hashlib +import json +import os +import random +import re +import sys +import time +from dataclasses import dataclass, field +from typing import Dict, Iterable, List, Optional, Set, Tuple +from urllib.parse import urljoin + +from bs4 import BeautifulSoup +import urllib3 + +current_dir = os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.dirname(current_dir) +request_dir = os.path.join(project_root, "request") +if request_dir not in sys.path: + sys.path.insert(0, request_dir) +if project_root not in sys.path: + sys.path.append(project_root) + +from request.requests_client import RequestClientError, RequestsClient +from utils.rate_limiter import wait_for_request +from Db import Db + +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + +SITE_NAME = "maxlaw" +LEGACY_DOMAIN = "大律师" +SITE_BASE = "https://m.maxlaw.cn" +CITY_API = 
"https://js.maxlaw.cn/js/ajax/common/getprovice.js" +CITY_DETAIL_API = "https://js.maxlaw.cn/js/ajax/common/getcity_{province_id}.js" +LIST_URL_TEMPLATE = SITE_BASE + "/law/{city_py}?page={page}" + +PHONE_RE = re.compile(r"1[3-9]\d{9}") +ANSWER_RE = re.compile(r"已解答\s*(\d+)\s*次") + + +@dataclass +class CityTarget: + province_id: int + province_name: str + province_py: str + city_id: int + city_name: str + city_py: str + + +@dataclass +class ListCard: + detail_url: str + name: str = "" + law_firm: str = "" + specialties: List[str] = field(default_factory=list) + answered_count: Optional[int] = None + + +def clean_prefixed_name(value: str) -> str: + text = (value or "").strip() + # 接口返回常见格式如 "B 北京" + text = re.sub(r"^[A-Za-z]\s*", "", text) + return text.strip() + + +def normalize_phone(text: str) -> str: + compact = re.sub(r"\D", "", text or "") + match = PHONE_RE.search(compact) + return match.group(0) if match else "" + + +def parse_json_with_bom(text: str) -> Dict: + cleaned = (text or "").strip().lstrip("\ufeff") + return json.loads(cleaned) + + +class DlsFreshCrawler: + def __init__( + self, + max_pages: int = 3, + sleep_seconds: float = 0.2, + use_proxy: bool = True, + db_connection=None, + ): + self.max_pages = max_pages + self.sleep_seconds = max(0.0, sleep_seconds) + self.db = db_connection + self.client = RequestsClient( + headers={ + "User-Agent": ( + "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) " + "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 " + "Mobile/15E148 Safari/604.1" + ), + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", + "Connection": "close", + }, + use_proxy=use_proxy, + retry_total=2, + retry_backoff_factor=1, + retry_status_forcelist=(429, 500, 502, 503, 504), + retry_allowed_methods=("GET",), + ) + + def _get_text(self, url: str, timeout: int = 20, max_retries: int = 3) -> str: + last_error: Optional[Exception] = None + for attempt in range(max_retries): + wait_for_request() + try: + resp = self.client.get_text(url, timeout=timeout, verify=False) + code = resp.status_code + if code == 403: + if attempt < max_retries - 1: + self.client.refresh() + time.sleep((2 ** attempt) + random.uniform(0.2, 0.8)) + continue + raise RequestClientError(f"{code} Error: {url}") + if code >= 500 and attempt < max_retries - 1: + time.sleep((2 ** attempt) + random.uniform(0.2, 0.8)) + continue + if code >= 400: + raise RequestClientError(f"{code} Error: {url}") + return resp.text + except Exception as exc: + last_error = exc + if attempt < max_retries - 1: + time.sleep((2 ** attempt) + random.uniform(0.2, 0.8)) + continue + raise + if last_error is not None: + raise last_error + raise RequestClientError(f"Unknown request error: {url}") + + def discover_cities(self) -> List[CityTarget]: + province_text = self._get_text(CITY_API) + province_data = parse_json_with_bom(province_text) + province_rows = province_data.get("ds", []) or [] + + cities: List[CityTarget] = [] + seen_py: Set[str] = set() + + for province in province_rows: + province_id = int(province.get("id")) + province_name = clean_prefixed_name(province.get("name", "")) + province_py = (province.get("py_code") or "").strip() + if not province_py: + continue + + city_api = CITY_DETAIL_API.format(province_id=province_id) + try: + city_text = self._get_text(city_api) + city_data = parse_json_with_bom(city_text) + except Exception as exc: + print(f"[city] 获取失败 pid={province_id}: {exc}") + continue + + for city in 
city_data.get("ds", []) or []: + city_py = (city.get("py_code") or "").strip() + if not city_py or city_py in seen_py: + continue + seen_py.add(city_py) + cities.append( + CityTarget( + province_id=province_id, + province_name=province_name, + province_py=province_py, + city_id=int(city.get("id")), + city_name=clean_prefixed_name(city.get("name", "")), + city_py=city_py, + ) + ) + + return cities + + def parse_list_cards(self, html: str) -> List[ListCard]: + soup = BeautifulSoup(html, "html.parser") + cards: List[ListCard] = [] + seen: Set[str] = set() + + for item in soup.select("div.lawyer_list ul.lawyer_ul > li"): + link = item.select_one("div.lstx a[href]") + if not link: + continue + detail_url = urljoin(SITE_BASE, link.get("href", "").strip()) + if not detail_url or detail_url in seen: + continue + seen.add(detail_url) + + name = "" + law_firm = "" + specialties: List[str] = [] + answered_count = None + + name_tag = item.select_one("p.name") + if name_tag: + name = name_tag.get_text(strip=True) + + firm_tag = item.select_one("div.li_r h2") + if firm_tag: + law_firm = firm_tag.get_text(strip=True) + + for span in item.select("div.zc span"): + text = span.get_text(strip=True) + if text: + specialties.append(text) + + distance_text = item.select_one("div.distance i") + if distance_text: + match = ANSWER_RE.search(distance_text.get_text(" ", strip=True)) + if match: + answered_count = int(match.group(1)) + + cards.append( + ListCard( + detail_url=detail_url, + name=name, + law_firm=law_firm, + specialties=specialties, + answered_count=answered_count, + ) + ) + return cards + + def has_next_page(self, html: str) -> bool: + soup = BeautifulSoup(html, "html.parser") + return soup.select_one("a.mnext") is not None + + def parse_detail(self, detail_url: str) -> Dict: + html = self._get_text(detail_url) + soup = BeautifulSoup(html, "html.parser") + + name = "" + law_firm = "" + license_no = "" + practice_years = None + phone = "" + email = "" + address = "" + specialties: List[str] = [] + + name_tag = soup.select_one("h2.lawyerName") + if name_tag: + name = name_tag.get_text(strip=True) + + firm_tag = soup.select_one("p.law-firm") + if firm_tag: + law_firm = firm_tag.get_text(strip=True) + + license_tag = soup.select_one("p.card-zyz") + if license_tag: + license_no = ( + license_tag.get_text(" ", strip=True) + .replace("执业证号:", "") + .replace("执业证号:", "") + .strip() + ) + + years_tag = soup.select_one("div#practice i") + if years_tag: + year_text = years_tag.get_text(strip=True) + if year_text.isdigit(): + practice_years = int(year_text) + + tel_tag = soup.select_one("a[href^='tel:']") + if tel_tag: + phone = normalize_phone(tel_tag.get("href", "")) + + for li in soup.select("ul.contact-content > li"): + key = li.select_one("i") + val = li.select_one("p") + if not key or not val: + continue + k = key.get_text(strip=True).replace(":", ":") + v = val.get_text(" ", strip=True) + if "电话" in k and not phone: + phone = normalize_phone(v) + elif "邮箱" in k and not email: + email = v.strip() + elif "地址" in k and not address: + address = v.strip() + + for node in soup.select("div.exp-main li.on"): + text = node.get_text(strip=True) + if text: + specialties.append(text) + + return { + "name": name, + "law_firm": law_firm, + "license_no": license_no, + "practice_years": practice_years, + "phone": phone, + "email": email, + "address": address, + "specialties": specialties, + "detail_url": detail_url, + } + + def _to_legacy_lawyer_row(self, record: Dict) -> Optional[Dict[str, str]]: + profile = 
record.get("profile", {}) or {} + source = record.get("source", {}) or {} + + phone = normalize_phone(profile.get("phone", "")) + if not phone: + return None + + province = (source.get("province") or "").strip() + city = (source.get("city") or province).strip() + return { + "name": (profile.get("name") or "").strip(), + "law_firm": (profile.get("law_firm") or "").strip(), + "province": province, + "city": city, + "phone": phone, + "url": (source.get("detail_url") or "").strip(), + "domain": LEGACY_DOMAIN, + "create_time": int(record.get("collected_at") or time.time()), + "params": json.dumps(record, ensure_ascii=False), + } + + def _existing_phones_in_db(self, phones: List[str]) -> Set[str]: + if not self.db or not phones: + return set() + deduped = sorted({p for p in phones if p}) + if not deduped: + return set() + + existing: Set[str] = set() + cur = self.db.db.cursor() + try: + chunk_size = 500 + for i in range(0, len(deduped), chunk_size): + chunk = deduped[i:i + chunk_size] + placeholders = ",".join(["%s"] * len(chunk)) + sql = f"SELECT phone FROM lawyer WHERE domain=%s AND phone IN ({placeholders})" + cur.execute(sql, [LEGACY_DOMAIN, *chunk]) + for row in cur.fetchall(): + existing.add(row[0]) + finally: + cur.close() + return existing + + def _write_records_to_db(self, records: List[Dict]) -> Tuple[int, int]: + if not self.db: + return 0, 0 + + rows: List[Dict[str, str]] = [] + for record in records: + row = self._to_legacy_lawyer_row(record) + if row: + rows.append(row) + if not rows: + return 0, 0 + + existing = self._existing_phones_in_db([row["phone"] for row in rows]) + inserted = 0 + skipped = 0 + + for row in rows: + phone = row.get("phone", "") + if not phone or phone in existing: + skipped += 1 + continue + try: + self.db.insert_data("lawyer", row) + existing.add(phone) + inserted += 1 + except Exception as exc: + skipped += 1 + print(f"[db] 插入失败 phone={phone} url={row.get('url', '')}: {exc}") + return inserted, skipped + + def crawl_city(self, target: CityTarget) -> Iterable[Dict]: + # 同一城市内去重,避免站点分页回流导致重复抓取 + seen_detail_urls: Set[str] = set() + last_page_signature: Tuple[str, ...] 
= tuple() + repeated_signature_pages = 0 + no_new_pages = 0 + + for page in range(1, self.max_pages + 1): + list_url = LIST_URL_TEMPLATE.format(city_py=target.city_py, page=page) + try: + html = self._get_text(list_url) + except Exception as exc: + print(f"[list] 失败 {list_url}: {exc}") + break + + cards = self.parse_list_cards(html) + if not cards: + break + + page_signature = tuple(sorted(card.detail_url for card in cards if card.detail_url)) + if page_signature and page_signature == last_page_signature: + repeated_signature_pages += 1 + else: + repeated_signature_pages = 0 + last_page_signature = page_signature + + if repeated_signature_pages >= 2: + print( + f"[list] 城市 {target.city_py} 第{page}页列表签名重复,提前结束," + f"list_url={list_url}" + ) + break + + fresh_cards: List[ListCard] = [] + for card in cards: + if not card.detail_url: + continue + if card.detail_url in seen_detail_urls: + continue + seen_detail_urls.add(card.detail_url) + fresh_cards.append(card) + + if not fresh_cards: + no_new_pages += 1 + if no_new_pages >= 3: + print( + f"[list] 城市 {target.city_py} 连续{no_new_pages}页无新增律师,提前结束," + f"list_url={list_url}" + ) + break + else: + no_new_pages = 0 + + print( + f"[page] city={target.city_py} page={page} cards={len(cards)} " + f"fresh={len(fresh_cards)} next={self.has_next_page(html)}" + ) + + for card in fresh_cards: + try: + detail = self.parse_detail(card.detail_url) + except Exception as exc: + print(f"[detail] 失败 {card.detail_url}: {exc}") + continue + + now = int(time.time()) + record_id = hashlib.md5(card.detail_url.encode("utf-8")).hexdigest() + yield { + "record_id": record_id, + "collected_at": now, + "source": { + "site": SITE_NAME, + "list_url": list_url, + "detail_url": card.detail_url, + "province": target.province_name, + "province_py": target.province_py, + "city": target.city_name, + "city_py": target.city_py, + "page": page, + }, + "list_snapshot": { + "name": card.name, + "law_firm": card.law_firm, + "specialties": card.specialties, + "answered_count": card.answered_count, + }, + "profile": { + "name": detail.get("name") or card.name, + "law_firm": detail.get("law_firm") or card.law_firm, + "phone": detail.get("phone", ""), + "license_no": detail.get("license_no", ""), + "practice_years": detail.get("practice_years"), + "email": detail.get("email", ""), + "address": detail.get("address", ""), + "specialties": detail.get("specialties") or card.specialties, + }, + } + if self.sleep_seconds: + time.sleep(self.sleep_seconds) + + if not self.has_next_page(html): + break + + def crawl( + self, + output_path: str, + max_cities: int = 0, + city_filter: Optional[str] = None, + ) -> None: + cities = self.discover_cities() + print(f"[discover] 共发现城市 {len(cities)} 个") + if city_filter: + key = city_filter.strip().lower() + cities = [c for c in cities if key in c.city_py.lower() or key in c.city_name.lower()] + print(f"[discover] 过滤后城市 {len(cities)} 个, filter={city_filter}") + if max_cities > 0: + cities = cities[:max_cities] + print(f"[discover] 截断城市数 {len(cities)}") + + os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True) + + seen_ids: Set[str] = set() + if os.path.exists(output_path): + with open(output_path, "r", encoding="utf-8") as old_file: + for line in old_file: + line = line.strip() + if not line: + continue + try: + item = json.loads(line) + except Exception: + continue + rid = item.get("record_id") + if rid: + seen_ids.add(rid) + print(f"[resume] 已有记录 {len(seen_ids)} 条") + + total_new_json = 0 + total_new_db = 0 + total_skip_db = 0 + with 
open(output_path, "a", encoding="utf-8") as out: + for idx, target in enumerate(cities, start=1): + print( + f"[city {idx}/{len(cities)}] {target.province_name}-{target.city_name} " + f"({target.city_py})" + ) + city_records = list(self.crawl_city(target)) + + city_new_json = 0 + for record in city_records: + rid = record["record_id"] + if rid in seen_ids: + continue + out.write(json.dumps(record, ensure_ascii=False) + "\n") + seen_ids.add(rid) + city_new_json += 1 + total_new_json += 1 + + city_new_db, city_skip_db = self._write_records_to_db(city_records) + total_new_db += city_new_db + total_skip_db += city_skip_db + + print( + f"[city] 采集{len(city_records)}条, JSON新增{city_new_json}条, " + f"DB新增{city_new_db}条, DB跳过{city_skip_db}条" + ) + print( + f"[done] JSON新增{total_new_json}条, DB新增{total_new_db}条, " + f"DB跳过{total_skip_db}条, 输出: {output_path}" + ) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="大律师全新采集脚本(新数据结构)") + parser.add_argument( + "--output", + default="/www/wwwroot/lawyers/data/dls_records_all.jsonl", + help="输出 jsonl 文件路径", + ) + parser.add_argument( + "--max-cities", + type=int, + default=0, + help="最多采集多少个城市,0 表示不限", + ) + parser.add_argument( + "--max-pages", + type=int, + default=9999, + help="每个城市最多采集多少页", + ) + parser.add_argument( + "--city-filter", + default="", + help="按城市拼音或城市名过滤,如 beijing", + ) + parser.add_argument( + "--sleep", + type=float, + default=0.2, + help="详情页请求间隔秒数", + ) + parser.add_argument( + "--direct", + action="store_true", + help="直连模式,不使用 proxy_settings.json 代理", + ) + parser.add_argument( + "--no-db", + action="store_true", + help="只输出 JSONL,不写入数据库", + ) + return parser.parse_args() + + +def main(): + args = parse_args() + if args.no_db: + crawler = DlsFreshCrawler( + max_pages=args.max_pages, + sleep_seconds=args.sleep, + use_proxy=not args.direct, + db_connection=None, + ) + crawler.crawl( + output_path=args.output, + max_cities=args.max_cities, + city_filter=args.city_filter or None, + ) + return + + with Db() as db: + crawler = DlsFreshCrawler( + max_pages=args.max_pages, + sleep_seconds=args.sleep, + use_proxy=not args.direct, + db_connection=db, + ) + crawler.crawl( + output_path=args.output, + max_cities=args.max_cities, + city_filter=args.city_filter or None, + ) + + +if __name__ == "__main__": + main() diff --git a/common_sites/export_lawyers_excel.py b/common_sites/export_lawyers_excel.py new file mode 100644 index 0000000..739734c --- /dev/null +++ b/common_sites/export_lawyers_excel.py @@ -0,0 +1,290 @@ +#!/usr/bin/env python3 +import argparse +import json +import os +import sys +import time +from datetime import datetime +from typing import Dict, List, Optional + +import pymysql +from openpyxl import Workbook +from openpyxl.styles import Font + +current_dir = os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.dirname(current_dir) +if project_root not in sys.path: + sys.path.insert(0, project_root) + +from Db import Db + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="导出律师数据到 Excel") + parser.add_argument( + "--output", + default="", + help="输出 xlsx 文件路径,默认输出到 data/export_lawyers_时间戳.xlsx", + ) + parser.add_argument( + "--start-ts", + type=int, + default=None, + help="create_time 起始时间戳(含),不传时默认取最近7天", + ) + parser.add_argument( + "--end-ts", + type=int, + default=None, + help="create_time 结束时间戳(含),默认不限制上限", + ) + parser.add_argument( + "--domain", + default="", + help="按 domain 过滤,例如:大律师 / 找法网 / 华律", + ) + parser.add_argument( + 
"--province", + default="", + help="按省份过滤,例如:北京、广东", + ) + parser.add_argument( + "--city", + default="", + help="按城市过滤,例如:北京、深圳", + ) + parser.add_argument( + "--keyword", + default="", + help="关键词过滤(匹配姓名/律所/手机号)", + ) + parser.add_argument( + "--limit", + type=int, + default=0, + help="最多导出多少条,0 表示不限", + ) + parser.add_argument( + "--include-extra", + action="store_true", + help="导出更多扩展字段(url/domain/create_time/site_time 等)", + ) + parser.add_argument( + "--no-parse-params", + action="store_true", + help="关闭 params JSON 扩展信息解析(默认开启)", + ) + return parser.parse_args() + + +def apply_default_time_filter(args: argparse.Namespace) -> None: + # 未显式传时间范围时,默认导出最近7天的数据 + if args.start_ts is None and args.end_ts is None: + args.start_ts = int(time.time()) - 7 * 24 * 3600 + args.end_ts = 0 + return + if args.start_ts is None: + args.start_ts = 0 + if args.end_ts is None: + args.end_ts = 0 + + +def build_output_path(user_output: str) -> str: + if user_output: + return os.path.abspath(user_output) + ts = int(time.time()) + return os.path.abspath(f"/www/wwwroot/lawyers/data/export_lawyers_{ts}.xlsx") + + +def ts_to_text(ts_value: Optional[int]) -> str: + if ts_value in (None, 0, ""): + return "" + try: + return datetime.fromtimestamp(int(ts_value)).strftime("%Y-%m-%d %H:%M:%S") + except Exception: + return "" + + +def build_query(args: argparse.Namespace) -> (str, List): + where: List[str] = [] + params: List = [] + + if args.start_ts > 0: + where.append("create_time >= %s") + params.append(args.start_ts) + if args.end_ts > 0: + where.append("create_time <= %s") + params.append(args.end_ts) + if args.domain.strip(): + where.append("domain = %s") + params.append(args.domain.strip()) + if args.province.strip(): + where.append("province = %s") + params.append(args.province.strip()) + if args.city.strip(): + where.append("city = %s") + params.append(args.city.strip()) + if args.keyword.strip(): + like = f"%{args.keyword.strip()}%" + where.append("(name LIKE %s OR law_firm LIKE %s OR phone LIKE %s)") + params.extend([like, like, like]) + + where_sql = f"WHERE {' AND '.join(where)}" if where else "" + limit_sql = f"LIMIT {int(args.limit)}" if args.limit and args.limit > 0 else "" + sql = ( + "SELECT id, name, phone, law_firm, province, city, url, domain, " + "create_time, site_time, params " + f"FROM lawyer {where_sql} ORDER BY id ASC {limit_sql}" + ) + return sql, params + + +def parse_params(params_text: str) -> Dict[str, str]: + if not params_text: + return {} + try: + data = json.loads(params_text) + except Exception: + return {} + if not isinstance(data, dict): + return {} + + profile = data.get("profile") or {} + source = data.get("source") or {} + if not isinstance(profile, dict): + profile = {} + if not isinstance(source, dict): + source = {} + specialties = profile.get("specialties") + if isinstance(specialties, list): + specialties_text = ",".join(str(x) for x in specialties if x) + else: + specialties_text = "" + + return { + "email": str(profile.get("email") or ""), + "address": str(profile.get("address") or ""), + "license_no": str(profile.get("license_no") or ""), + "practice_years": str(profile.get("practice_years") or ""), + "specialties": specialties_text, + "source_site": str(source.get("site") or ""), + "detail_url": str(source.get("detail_url") or ""), + "list_url": str(source.get("list_url") or ""), + } + + +def export_to_excel(rows: List[Dict], output_path: str, include_extra: bool, parse_params_flag: bool) -> int: + wb = Workbook() + ws = wb.active + ws.title = "lawyers" + + headers 
= ["手机号", "姓名", "律所", "省份", "市区", "站点名称", "domain"] + if include_extra: + headers.extend( + [ + "URL", + "站点", + "create_time", + "create_time_text", + "site_time", + "site_time_text", + "ID", + ] + ) + if parse_params_flag: + headers.extend( + [ + "邮箱", + "地址", + "执业证号", + "执业年限", + "擅长领域", + "source_site", + "detail_url", + "list_url", + ] + ) + + ws.append(headers) + for cell in ws[1]: + cell.font = Font(bold=True) + + exported = 0 + for row in rows: + info = parse_params(row.get("params", "") or "") if parse_params_flag else {} + site_name = info.get("source_site") or (row.get("domain", "") or "") + line = [ + row.get("phone", "") or "", + row.get("name", "") or "", + row.get("law_firm", "") or "", + row.get("province", "") or "", + row.get("city", "") or "", + site_name, + row.get("domain", "") or "", + ] + + if include_extra: + line.extend( + [ + row.get("url", "") or "", + row.get("domain", "") or "", + row.get("create_time", "") or "", + ts_to_text(row.get("create_time")), + row.get("site_time", "") or "", + ts_to_text(row.get("site_time")), + row.get("id", "") or "", + ] + ) + + if parse_params_flag: + line.extend( + [ + info.get("email", ""), + info.get("address", ""), + info.get("license_no", ""), + info.get("practice_years", ""), + info.get("specialties", ""), + info.get("source_site", ""), + info.get("detail_url", ""), + info.get("list_url", ""), + ] + ) + + ws.append(line) + exported += 1 + + os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True) + wb.save(output_path) + return exported + + +def main() -> None: + args = parse_args() + apply_default_time_filter(args) + output_path = build_output_path(args.output) + sql, sql_params = build_query(args) + + with Db() as db: + cursor = db.db.cursor(pymysql.cursors.DictCursor) + try: + cursor.execute(sql, sql_params) + rows = cursor.fetchall() + finally: + cursor.close() + + count = export_to_excel( + rows=rows, + output_path=output_path, + include_extra=args.include_extra, + parse_params_flag=not args.no_parse_params, + ) + + print(f"[export] 导出完成,共 {count} 条") + print(f"[export] 文件路径: {output_path}") + print( + f"[export] 时间筛选 create_time: start={args.start_ts or '-'} end={args.end_ts or '-'}" + ) + + +if __name__ == "__main__": + main() diff --git a/common_sites/findlaw.py b/common_sites/findlaw.py index 50947cf..b972893 100644 --- a/common_sites/findlaw.py +++ b/common_sites/findlaw.py @@ -1,9 +1,16 @@ +import argparse +import ast +import hashlib import json import os +import random +import re import sys import time -import random -from typing import Dict, List, Set, Optional +from dataclasses import dataclass +from typing import Dict, Iterable, List, Optional, Set, Tuple + +import urllib3 current_dir = os.path.dirname(os.path.abspath(__file__)) project_root = os.path.dirname(current_dir) @@ -13,197 +20,460 @@ if request_dir not in sys.path: if project_root not in sys.path: sys.path.append(project_root) -from request.requests_client import RequestClientError, RequestSSLError, RequestsClient from Db import Db +from request.requests_client import RequestClientError, RequestsClient +from utils.rate_limiter import wait_for_request -DOMAIN = "找法网" -LIST_TEMPLATE = "https://m.findlaw.cn/{pinyin}/q_lawyer/p{page}?ajax=1&order=0&sex=-1" +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + +SITE_NAME = "findlaw" +LEGACY_DOMAIN = "找法网" +SITE_BASE = "https://m.findlaw.cn" +CITY_DATA_URL = "https://static.findlawimg.com/minify/?b=js&f=m/public/v1/areaDataTwo.js" +LIST_URL_TEMPLATE = SITE_BASE + 
"/{city_py}/q_lawyer/p{page}?ajax=1&order=0&sex=-1" + +PHONE_RE = re.compile(r"1[3-9]\d{9}") -class FindlawSpider: - def __init__(self, db_connection): +@dataclass +class CityTarget: + province_id: str + province_name: str + province_py: str + city_id: str + city_name: str + city_py: str + + +def normalize_phone(text: str) -> str: + compact = re.sub(r"\D", "", text or "") + match = PHONE_RE.search(compact) + return match.group(0) if match else "" + + +class FindlawCrawler: + def __init__( + self, + max_pages: int = 9999, + sleep_seconds: float = 0.1, + use_proxy: bool = True, + db_connection=None, + ): + self.max_pages = max_pages + self.sleep_seconds = max(0.0, sleep_seconds) self.db = db_connection - self.client = self._build_session() - self.cities = self._load_cities() + self.client = RequestsClient( + headers={ + "User-Agent": ( + "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) " + "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 " + "Mobile/15E148 Safari/604.1" + ), + "Accept": "application/json, text/javascript, */*; q=0.01", + "X-Requested-With": "XMLHttpRequest", + "Connection": "close", + }, + use_proxy=use_proxy, + retry_total=2, + retry_backoff_factor=1, + retry_status_forcelist=(429, 500, 502, 503, 504), + retry_allowed_methods=("GET",), + ) - def _build_session(self) -> RequestsClient: - return RequestsClient(headers={ - "User-Agent": ( - "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) " - "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 " - "Mobile/15E148 Safari/604.1" - ), - "Accept": "application/json, text/javascript, */*; q=0.01", - "X-Requested-With": "XMLHttpRequest", - "Connection": "close", - }) - - def _refresh_session(self) -> None: - self.client.refresh() - - def _get(self, url: str, referer: str, verify: bool = True, max_retries: int = 3) -> Optional[str]: + def _get_text( + self, + url: str, + timeout: int = 20, + max_retries: int = 3, + referer: str = SITE_BASE, + ) -> str: headers = {"Referer": referer} - for attempt in range(max_retries): - try: - resp = self.client.get_text(url, timeout=15, verify=verify, headers=headers) - status_code = resp.status_code - text = resp.text - if status_code == 403: - if attempt < max_retries - 1: - wait_time = 2 ** attempt + random.uniform(0.3, 1.0) - print(f"403被拦截,{wait_time}秒后重试 ({attempt + 1}/{max_retries}): {url}") - self._refresh_session() - time.sleep(wait_time) - continue - print(f"请求失败 {url}: 403 Forbidden") - return None - if status_code >= 400: - raise RequestClientError(f"{status_code} Error: {url}") - return text - except RequestSSLError: - if verify: - return self._get(url, referer, verify=False, max_retries=max_retries) - print(f"SSL错误 {url}") - return None - except RequestClientError as exc: - print(f"请求失败 {url}: {exc}") - return None - return None + last_error: Optional[Exception] = None - def _existing_phones(self, phones: List[str]) -> Set[str]: - if not phones: + for attempt in range(max_retries): + wait_for_request() + try: + resp = self.client.get_text(url, timeout=timeout, verify=False, headers=headers) + code = resp.status_code + if code == 403: + if attempt < max_retries - 1: + self.client.refresh() + time.sleep((2 ** attempt) + random.uniform(0.2, 0.8)) + continue + raise RequestClientError(f"{code} Error: {url}") + if code >= 500 and attempt < max_retries - 1: + time.sleep((2 ** attempt) + random.uniform(0.2, 0.8)) + continue + if code >= 400: + raise RequestClientError(f"{code} Error: {url}") + return resp.text + except Exception as exc: + last_error = exc + if 
attempt < max_retries - 1: + time.sleep((2 ** attempt) + random.uniform(0.2, 0.8)) + continue + raise + + if last_error is not None: + raise last_error + raise RequestClientError(f"Unknown request error: {url}") + + def _parse_city_js_array(self, script_text: str, var_name: str) -> List[Dict]: + pattern = rf"var\s+{re.escape(var_name)}\s*=\s*(\[[\s\S]*?\]);" + match = re.search(pattern, script_text) + if not match: + return [] + raw = match.group(1) + try: + rows = ast.literal_eval(raw) + return rows if isinstance(rows, list) else [] + except Exception: + return [] + + def discover_cities(self) -> List[CityTarget]: + js_text = self._get_text(CITY_DATA_URL, referer=SITE_BASE + "/findlawyers/") + provinces = self._parse_city_js_array(js_text, "iosProvinces") + cities = self._parse_city_js_array(js_text, "iosCitys") + + province_map: Dict[str, Dict] = {} + for item in provinces: + pid = str(item.get("id") or "").strip() + if pid: + province_map[pid] = item + + results: List[CityTarget] = [] + seen_py: Set[str] = set() + for city in cities: + city_py = str(city.get("pinyin") or "").strip() + city_name = str(city.get("value") or "").strip() + city_id = str(city.get("id") or "").strip() + province_id = str(city.get("parentId") or "").strip() + if not city_py or not city_name or not city_id: + continue + if city_py in seen_py: + continue + seen_py.add(city_py) + + province_row = province_map.get(province_id, {}) + province_name = str(province_row.get("value") or city_name).strip() + province_py = str(province_row.get("pinyin") or city_py).strip() + + results.append( + CityTarget( + province_id=province_id, + province_name=province_name, + province_py=province_py, + city_id=city_id, + city_name=city_name, + city_py=city_py, + ) + ) + return results + + def _parse_list_payload(self, text: str) -> Dict: + cleaned = (text or "").strip().lstrip("\ufeff") + try: + return json.loads(cleaned) + except ValueError: + start = cleaned.find("{") + end = cleaned.rfind("}") + if start == -1 or end == -1: + return {} + return json.loads(cleaned[start:end + 1]) + + def fetch_list_page(self, city_py: str, page: int) -> Tuple[List[Dict], bool, str]: + list_url = LIST_URL_TEMPLATE.format(city_py=city_py, page=page) + referer = f"{SITE_BASE}/{city_py}/q_lawyer/" + text = self._get_text(list_url, referer=referer) + payload = self._parse_list_payload(text) + if payload.get("errcode") != 0: + return [], False, list_url + + data = payload.get("data", {}) or {} + items = data.get("lawyer_list", []) or [] + has_more = str(data.get("has_more", "0")) == "1" + return items, has_more, list_url + + def crawl_city(self, target: CityTarget) -> Iterable[Dict]: + for page in range(1, self.max_pages + 1): + try: + items, has_more, list_url = self.fetch_list_page(target.city_py, page) + except Exception as exc: + print(f"[list] 失败 {target.city_py} p{page}: {exc}") + break + + if not items: + break + + for item in items: + detail_url = item.get("siteask_m") or item.get("site_url") or "" + detail_url = str(detail_url).strip() + if not detail_url.startswith("http"): + detail_url = list_url + + phone = normalize_phone(item.get("mobile", "")) + profile = { + "uid": str(item.get("uid") or ""), + "name": str(item.get("username") or "").strip(), + "law_firm": str(item.get("lawyer_lawroom") or "").strip(), + "phone": phone, + "lawyer_year": item.get("lawyer_year"), + "service_area": str(item.get("service_area") or "").strip(), + "address": str(item.get("addr") or "").strip(), + "specialties": item.get("professionArr") or [], + "answer_count": 
item.get("ansnum"), + "comment_count": item.get("askcommentnum"), + } + + now = int(time.time()) + uid = profile.get("uid", "") + record_key = uid or detail_url + record_id = hashlib.md5(record_key.encode("utf-8")).hexdigest() + + area = item.get("areaInfo", {}) or {} + yield { + "record_id": record_id, + "collected_at": now, + "source": { + "site": SITE_NAME, + "list_url": list_url, + "detail_url": detail_url, + "province": str(area.get("province") or target.province_name), + "province_py": target.province_py, + "city": str(area.get("city") or target.city_name), + "city_py": target.city_py, + "page": page, + }, + "list_snapshot": { + "uid": uid, + "name": profile["name"], + "law_firm": profile["law_firm"], + "answer_count": profile["answer_count"], + "comment_count": profile["comment_count"], + }, + "profile": profile, + "raw": item, + } + if self.sleep_seconds: + time.sleep(self.sleep_seconds) + + if not has_more: + break + + def _to_legacy_lawyer_row(self, record: Dict) -> Optional[Dict[str, str]]: + source = record.get("source", {}) or {} + profile = record.get("profile", {}) or {} + phone = normalize_phone(profile.get("phone", "")) + if not phone: + return None + + province = (source.get("province") or "").strip() + city = (source.get("city") or province).strip() + return { + "name": (profile.get("name") or "").strip(), + "law_firm": (profile.get("law_firm") or "").strip(), + "province": province, + "city": city, + "phone": phone, + "url": (source.get("detail_url") or source.get("list_url") or "").strip(), + "domain": LEGACY_DOMAIN, + "create_time": int(record.get("collected_at") or time.time()), + "params": json.dumps(record, ensure_ascii=False), + } + + def _existing_phones_in_db(self, phones: List[str]) -> Set[str]: + if not self.db or not phones: return set() + deduped = sorted({p for p in phones if p}) + if not deduped: + return set() + existing: Set[str] = set() cur = self.db.db.cursor() try: chunk_size = 500 - for i in range(0, len(phones), chunk_size): - chunk = phones[i:i + chunk_size] + for i in range(0, len(deduped), chunk_size): + chunk = deduped[i:i + chunk_size] placeholders = ",".join(["%s"] * len(chunk)) sql = f"SELECT phone FROM lawyer WHERE domain=%s AND phone IN ({placeholders})" - cur.execute(sql, [DOMAIN, *chunk]) + cur.execute(sql, [LEGACY_DOMAIN, *chunk]) for row in cur.fetchall(): existing.add(row[0]) finally: cur.close() return existing - def _load_cities(self): - condition = "domain='findlaw' AND level=2" - tables = ("area_new", "area2", "area") - last_error = None - for table in tables: + def _write_records_to_db(self, records: List[Dict]) -> Tuple[int, int]: + if not self.db: + return 0, 0 + + rows: List[Dict[str, str]] = [] + for record in records: + row = self._to_legacy_lawyer_row(record) + if row: + rows.append(row) + if not rows: + return 0, 0 + + existing = self._existing_phones_in_db([row["phone"] for row in rows]) + inserted = 0 + skipped = 0 + for row in rows: + phone = row.get("phone", "") + if not phone or phone in existing: + skipped += 1 + continue try: - rows = self.db.select_data(table, "city, province, pinyin", condition) or [] + self.db.insert_data("lawyer", row) + existing.add(phone) + inserted += 1 except Exception as exc: - last_error = exc - continue - if rows: - missing_pinyin = sum(1 for r in rows if not (r.get("pinyin") or "").strip()) - print(f"[找法网] 城市来源表: {table}, 城市数: {len(rows)}, 缺少pinyin: {missing_pinyin}") - return rows + skipped += 1 + print(f"[db] 插入失败 phone={phone} url={row.get('url', '')}: {exc}") + return inserted, 
skipped - if last_error: - print(f"[找法网] 加载地区数据失败: {last_error}") - print("[找法网] 无城市数据(已尝试 area_new/area2/area)") - for table in tables: - try: - cnt = self.db.select_data(table, "COUNT(*) AS cnt", condition) - c = (cnt[0].get("cnt") if cnt else 0) if isinstance(cnt, (list, tuple)) else 0 - print(f"[找法网] 校验: {table} 满足条件记录数: {c}") - except Exception: - pass - return [] + def crawl( + self, + output_path: str, + max_cities: int = 0, + city_filter: Optional[str] = None, + ) -> None: + cities = self.discover_cities() + print(f"[discover] 共发现城市 {len(cities)} 个") + if city_filter: + key = city_filter.strip().lower() + cities = [c for c in cities if key in c.city_py.lower() or key in c.city_name.lower()] + print(f"[discover] 过滤后城市 {len(cities)} 个, filter={city_filter}") + if max_cities > 0: + cities = cities[:max_cities] + print(f"[discover] 截断城市数 {len(cities)}") - def _fetch_page(self, url: str, referer: str) -> List[Dict]: - text = self._get(url, referer, verify=True) - if not text: - return [] + os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True) - try: - # 某些返回体前会携带 BOM 或包装脚本,此处做兼容 - text = text.strip().lstrip("\ufeff") - try: - data = json.loads(text) - except ValueError: - json_start = text.find('{') - json_end = text.rfind('}') - if json_start == -1 or json_end == -1: - print(f"解析JSON失败 {url}: 返回内容开头: {text[:80]!r}") - return [] - cleaned = text[json_start:json_end + 1] - data = json.loads(cleaned) - if isinstance(data, str): - try: - data = json.loads(data) - except ValueError: - print(f"解析JSON失败 {url}: 二次解析仍为字符串,开头: {str(data)[:80]!r}") - return [] - except ValueError as exc: - print(f"解析JSON失败 {url}: {exc}") - return [] - - items = data.get("data", {}).get("lawyer_list", []) - parsed = [] - for item in items: - phone = (item.get("mobile") or "").replace("-", "") - parsed.append({ - "name": item.get("username", ""), - "law_firm": item.get("lawyer_lawroom", ""), - "province": item.get("areaInfo", {}).get("province", ""), - "city": item.get("areaInfo", {}).get("city", ""), - "phone": phone, - "url": url, - "domain": DOMAIN, - "create_time": int(time.time()), - "params": json.dumps(item, ensure_ascii=False) - }) - return parsed - - def run(self): - print("启动找法网采集...") - if not self.cities: - print("无城市数据") - return - - for city in self.cities: - pinyin = city.get("pinyin") - province = city.get("province", "") - city_name = city.get("city", "") - if not pinyin: - continue - print(f"采集 {province}-{city_name}") - page = 1 - while True: - url = LIST_TEMPLATE.format(pinyin=pinyin, page=page) - referer = f"https://m.findlaw.cn/{pinyin}/q_lawyer/" - print(f" 第 {page} 页: {url}") - items = self._fetch_page(url, referer) - if not items: - break - - phones = [it.get("phone") for it in items if (it.get("phone") or "").strip()] - existing = self._existing_phones(phones) - - for entry in items: - phone = entry.get("phone") - if not phone: - continue - if phone in existing: - print(f" -- 已存在: {entry['name']} ({phone})") + seen_ids: Set[str] = set() + if os.path.exists(output_path): + with open(output_path, "r", encoding="utf-8") as old_file: + for line in old_file: + line = line.strip() + if not line: continue try: - self.db.insert_data("lawyer", entry) - print(f" -> 新增: {entry['name']} ({phone})") - except Exception as exc: - print(f" 插入失败: {exc}") + item = json.loads(line) + except Exception: + continue + rid = item.get("record_id") + if rid: + seen_ids.add(rid) + print(f"[resume] 已有记录 {len(seen_ids)} 条") - page += 1 + total_new_json = 0 + total_new_db = 0 + total_skip_db = 0 - 
print("找法网采集完成") + with open(output_path, "a", encoding="utf-8") as out: + for idx, target in enumerate(cities, start=1): + print( + f"[city {idx}/{len(cities)}] {target.province_name}-{target.city_name} " + f"({target.city_py})" + ) + city_records = list(self.crawl_city(target)) + + city_new_json = 0 + for record in city_records: + rid = record["record_id"] + if rid in seen_ids: + continue + out.write(json.dumps(record, ensure_ascii=False) + "\n") + seen_ids.add(rid) + city_new_json += 1 + total_new_json += 1 + + city_new_db, city_skip_db = self._write_records_to_db(city_records) + total_new_db += city_new_db + total_skip_db += city_skip_db + print( + f"[city] 采集{len(city_records)}条, JSON新增{city_new_json}条, " + f"DB新增{city_new_db}条, DB跳过{city_skip_db}条" + ) + + print( + f"[done] JSON新增{total_new_json}条, DB新增{total_new_db}条, " + f"DB跳过{total_skip_db}条, 输出: {output_path}" + ) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="找法网全新采集脚本(重写版)") + parser.add_argument( + "--output", + default="/www/wwwroot/lawyers/data/findlaw_records_all.jsonl", + help="输出 jsonl 文件路径", + ) + parser.add_argument( + "--max-cities", + type=int, + default=0, + help="最多采集多少个城市,0 表示不限", + ) + parser.add_argument( + "--max-pages", + type=int, + default=9999, + help="每个城市最多采集多少页", + ) + parser.add_argument( + "--city-filter", + default="", + help="按城市拼音或城市名过滤,如 beijing", + ) + parser.add_argument( + "--sleep", + type=float, + default=0.1, + help="每条记录采集间隔秒数", + ) + parser.add_argument( + "--direct", + action="store_true", + help="直连模式,不使用 proxy_settings.json 代理", + ) + parser.add_argument( + "--no-db", + action="store_true", + help="只输出 JSONL,不写入数据库", + ) + return parser.parse_args() + + +def main(): + args = parse_args() + if args.no_db: + crawler = FindlawCrawler( + max_pages=args.max_pages, + sleep_seconds=args.sleep, + use_proxy=not args.direct, + db_connection=None, + ) + crawler.crawl( + output_path=args.output, + max_cities=args.max_cities, + city_filter=args.city_filter or None, + ) + return + + with Db() as db: + crawler = FindlawCrawler( + max_pages=args.max_pages, + sleep_seconds=args.sleep, + use_proxy=not args.direct, + db_connection=db, + ) + crawler.crawl( + output_path=args.output, + max_cities=args.max_cities, + city_filter=args.city_filter or None, + ) if __name__ == "__main__": - with Db() as db: - spider = FindlawSpider(db) - spider.run() + main() diff --git a/common_sites/hualv.py b/common_sites/hualv.py index 006063a..f6eb9ad 100644 --- a/common_sites/hualv.py +++ b/common_sites/hualv.py @@ -1,10 +1,18 @@ +import argparse +import ast +import hashlib import json import os +import random import re import sys import time -import random -from typing import Dict, Optional +from dataclasses import dataclass +from typing import Dict, Iterable, List, Optional, Set, Tuple +from urllib.parse import urljoin + +import urllib3 +from bs4 import BeautifulSoup current_dir = os.path.dirname(os.path.abspath(__file__)) project_root = os.path.dirname(current_dir) @@ -14,312 +22,638 @@ if request_dir not in sys.path: if project_root not in sys.path: sys.path.append(project_root) -from bs4 import BeautifulSoup -from request.requests_client import RequestClientError, RequestsClient - from Db import Db -from config import HEADERS +from request.requests_client import RequestClientError, RequestsClient +from utils.rate_limiter import wait_for_request -LIST_URL = "https://m.66law.cn/findlawyer/rpc/loadlawyerlist/" -DOMAIN = "华律" 
+urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + +SITE_NAME = "hualv" +LEGACY_DOMAIN = "华律" +SITE_BASE = "https://m.66law.cn" +CITY_DATA_URL = "https://cache.66law.cn/dist/main-v2.0.js" +LIST_API_URL = "https://m.66law.cn/findlawyer/rpc/loadlawyerlist/" + +PHONE_RE = re.compile(r"1[3-9]\d{9}") +EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}") +YEAR_RE = re.compile(r"执业\s*(\d+)\s*年") -class HualvSpider: - def __init__(self, db_connection): +@dataclass +class CityTarget: + province_id: int + province_name: str + city_id: int + city_name: str + + +def normalize_phone(text: str) -> str: + compact = re.sub(r"\D", "", text or "") + match = PHONE_RE.search(compact) + return match.group(0) if match else "" + + +def strip_html_tags(text: str) -> str: + return re.sub(r"<[^>]+>", "", text or "").strip() + + +class HualvCrawler: + def __init__( + self, + max_pages: int = 9999, + sleep_seconds: float = 0.15, + use_proxy: bool = True, + db_connection=None, + ): + self.max_pages = max_pages + self.sleep_seconds = max(0.0, sleep_seconds) self.db = db_connection - self.client = self._build_session() - self.areas = self._load_areas() - - def _build_session(self) -> RequestsClient: - custom_headers = HEADERS.copy() - custom_headers['User-Agent'] = ( - 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) ' - 'AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 ' - 'Mobile/15E148 Safari/604.1' + self.client = RequestsClient( + headers={ + "User-Agent": ( + "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) " + "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 " + "Mobile/15E148 Safari/604.1" + ), + "Accept": "application/json, text/javascript, */*; q=0.01", + "X-Requested-With": "XMLHttpRequest", + "Connection": "close", + }, + use_proxy=use_proxy, + retry_total=2, + retry_backoff_factor=1, + retry_status_forcelist=(429, 500, 502, 503, 504), + retry_allowed_methods=("GET", "POST"), ) - custom_headers["Connection"] = "close" - return RequestsClient(headers=custom_headers) - def _refresh_session(self) -> None: - self.client.refresh() + def _request_text( + self, + method: str, + url: str, + *, + timeout: int = 20, + max_retries: int = 3, + referer: str = SITE_BASE, + data: Optional[Dict] = None, + ) -> str: + headers = {"Referer": referer} + last_error: Optional[Exception] = None - def _load_areas(self): - tables = ("area_new", "area2", "area") - last_error = None - for table in tables: + for attempt in range(max_retries): + wait_for_request() try: - provinces = self.db.select_data( - table, - "code, province, pinyin, id", - "domain='66law' AND level=1" - ) or [] - cities = self.db.select_data( - table, - "code, city, province, pid", - "domain='66law' AND level=2" - ) or [] + if method.upper() == "POST": + resp = self.client.post_text( + url, + timeout=timeout, + verify=False, + headers=headers, + data=data, + ) + else: + resp = self.client.get_text( + url, + timeout=timeout, + verify=False, + headers=headers, + ) + + code = resp.status_code + if code == 403: + if attempt < max_retries - 1: + self.client.refresh() + time.sleep((2 ** attempt) + random.uniform(0.2, 0.8)) + continue + raise RequestClientError(f"{code} Error: {url}") + if code >= 500 and attempt < max_retries - 1: + time.sleep((2 ** attempt) + random.uniform(0.2, 0.8)) + continue + if code >= 400: + raise RequestClientError(f"{code} Error: {url}") + return resp.text except Exception as exc: last_error = exc - continue + if attempt < max_retries - 1: + time.sleep((2 ** attempt) + 
random.uniform(0.2, 0.8)) + continue + raise - if not cities: - continue + if last_error is not None: + raise last_error + raise RequestClientError(f"Unknown request error: {url}") - province_map = {p.get('id'): {"code": p.get('code'), "name": p.get('province')} for p in provinces} - city_map = {} - for city in cities: - province_info = province_map.get(city.get('pid'), {}) or {} - province_code = province_info.get('code') - city_map[city.get('code')] = { - "name": city.get('city'), - "province": city.get('province'), - "province_code": province_code, - } - print(f"[华律] 城市来源表: {table}, 城市数: {len(cities)}") - return city_map - - if last_error: - print(f"[华律] 加载地区数据失败: {last_error}") - print("[华律] 无城市数据(已尝试 area_new/area2/area)") - return {} - - def _post(self, data: Dict[str, str], max_retries: int = 3) -> Optional[Dict]: - for attempt in range(max_retries): - try: - resp = self.client.post_text(LIST_URL, data=data, timeout=20, verify=False) - status_code = resp.status_code - text = resp.text - if status_code == 403: - if attempt < max_retries - 1: - wait_time = 2 ** attempt + random.uniform(0.3, 1.0) - print(f"403被拦截,{wait_time}秒后重试 ({attempt + 1}/{max_retries})") - self._refresh_session() - time.sleep(wait_time) - continue - print("请求失败: 403 Forbidden") - return None - if status_code >= 400: - raise RequestClientError(f"{status_code} Error") - try: - return json.loads(text) - except ValueError as exc: - print(f"解析JSON失败: {exc}") - return None - except RequestClientError as exc: - print(f"请求失败: {exc}") - return None - return None - - def _parse_detail(self, url: str, province: str, city: str) -> Optional[Dict[str, str]]: - contact_url = f"{url}lawyer_contact.aspx" - print(f" 详情: {contact_url}") - existing = self.db.select_data( - "lawyer", - "id, avatar_url", - f"domain='{DOMAIN}' AND url='{contact_url}'" + def _get_text(self, url: str, *, timeout: int = 20, max_retries: int = 3, referer: str = SITE_BASE) -> str: + return self._request_text( + "GET", + url, + timeout=timeout, + max_retries=max_retries, + referer=referer, ) - existing_id = None - if existing: - existing_id = existing[0].get("id") - avatar = (existing[0].get("avatar_url") or "").strip() - if avatar: - print(" -- 已存在且头像已补全,跳过") - return None - html = self._get_detail(contact_url) - if not html: - return None + def _post_text( + self, + url: str, + *, + data: Dict, + timeout: int = 20, + max_retries: int = 3, + referer: str = SITE_BASE, + ) -> str: + return self._request_text( + "POST", + url, + timeout=timeout, + max_retries=max_retries, + referer=referer, + data=data, + ) + def _extract_spc_location(self, script_text: str) -> List: + # main-v2.js 内置了 sPCLocation=new Array(...),后面紧跟 cateinfo 数组 + marker = "sPCLocation = new Array(" + start = script_text.find(marker) + if start == -1: + marker = "sPCLocation=new Array(" + start = script_text.find(marker) + if start == -1: + return [] + start += len(marker) + + next_marker = script_text.find("cateinfo = new Array(", start) + if next_marker == -1: + next_marker = script_text.find("cateinfo=new Array(", start) + + if next_marker != -1: + end = script_text.rfind(");", start, next_marker) + else: + end = script_text.find(");", start) + + if end == -1 or end <= start: + return [] + + raw = "[" + script_text[start:end] + "]" + try: + data = ast.literal_eval(raw) + except Exception: + return [] + return data if isinstance(data, list) else [] + + def discover_cities(self) -> List[CityTarget]: + script_text = self._get_text(CITY_DATA_URL, referer=SITE_BASE + "/findlawyer/") + rows = 
self._extract_spc_location(script_text) + + targets: List[CityTarget] = [] + seen: Set[Tuple[int, int]] = set() + + for province in rows: + if not isinstance(province, list) or len(province) < 3: + continue + try: + province_id = int(province[0]) + except Exception: + continue + province_name = str(province[1] or "").strip() + city_rows = province[2] if isinstance(province[2], list) else [] + + for city in city_rows: + if not isinstance(city, list) or len(city) < 2: + continue + try: + city_id = int(city[0]) + except Exception: + continue + city_name = str(city[1] or "").strip() + if city_id <= 0 or not city_name: + continue + + key = (province_id, city_id) + if key in seen: + continue + seen.add(key) + + targets.append( + CityTarget( + province_id=province_id, + province_name=province_name, + city_id=city_id, + city_name=city_name, + ) + ) + return targets + + def fetch_list_page(self, target: CityTarget, page: int) -> Tuple[List[Dict], int]: + payload = { + "pid": str(target.province_id), + "cid": str(target.city_id), + "page": str(page), + } + text = self._post_text( + LIST_API_URL, + data=payload, + referer=SITE_BASE + "/findlawyer/", + ) + data = json.loads((text or "").strip().lstrip("\ufeff") or "{}") + items = data.get("lawyerList") or data.get("queryLawyerList") or [] + if not isinstance(items, list): + items = [] + + page_count = 0 + try: + page_count = int((data.get("lawyerItems") or {}).get("pageCount") or 0) + except Exception: + page_count = 0 + return items, page_count + + def parse_detail(self, detail_url: str) -> Dict: + contact_url = detail_url.rstrip("/") + "/lawyer_contact.aspx" + html = self._get_text(contact_url, referer=detail_url) soup = BeautifulSoup(html, "html.parser") - info_list = soup.find("ul", class_="information-list") - if not info_list: - return None - - phone = "" - law_firm = "" - for li in info_list.find_all("li"): - text = li.get_text(strip=True) - if "手机号" in text: - cleaned = text.replace("手机号", "").replace("(咨询请说明来自 华律网)", "").strip() - match = re.search(r"1\d{10}", cleaned.replace('-', '').replace(' ', '')) - if match: - phone = match.group(0) - if "执业单位" in text: - law_firm = text.replace("执业单位", "").strip() + full_text = soup.get_text(" ", strip=True) name = "" - breadcrumb = soup.find("div", class_="weizhi") - if breadcrumb: - links = breadcrumb.find_all("a") - if len(links) > 2: - name = links[2].get_text(strip=True) + law_firm = "" + phone = "" + email = "" + address = "" + license_no = "" + practice_years: Optional[int] = None - phone = phone.replace('-', '').strip() - if not phone or not re.fullmatch(r"1\d{10}", phone): - print(" 无手机号,跳过") + name_tag = soup.select_one(".logo-box .title b") + if name_tag: + name = name_tag.get_text(strip=True).replace("律师", "").strip() + if not name and soup.title: + match = re.search(r"([^\s,,。_]+?)律师", soup.title.get_text(" ", strip=True)) + if match: + name = match.group(1).strip() + + phone_candidates = [ + soup.select_one(".logo-box .r-bar .tel").get_text(" ", strip=True) + if soup.select_one(".logo-box .r-bar .tel") + else "", + soup.select_one(".lawyer-show ul.info").get_text(" ", strip=True) + if soup.select_one(".lawyer-show ul.info") + else "", + full_text, + ] + for candidate in phone_candidates: + phone = normalize_phone(candidate) + if phone: + break + + for li in soup.select(".lawyer-show ul.info li"): + li_text = li.get_text(" ", strip=True) + if ("事务所" in li_text or "法律服务所" in li_text) and not law_firm: + law_firm = li_text + + if not law_firm: + match = 
re.search(r'"affiliation":\{"@type":"Organization","name":"([^"]+)"', html) + if match: + law_firm = match.group(1).strip() + + match = re.search(r'"identifier":"([^"]+)"', html) + if match: + license_no = match.group(1).strip() + + match = re.search(r'"streetAddress":"([^"]+)"', html) + if match: + address = match.group(1).strip() + + email_match = EMAIL_RE.search(html) + if email_match: + email = email_match.group(0).strip() + + year_match = YEAR_RE.search(full_text) + if year_match: + try: + practice_years = int(year_match.group(1)) + except Exception: + practice_years = None + + specialties = [node.get_text(strip=True) for node in soup.select(".tag-h38 span")] + specialties = [x for x in specialties if x] + + return { + "name": name, + "law_firm": law_firm, + "phone": phone, + "email": email, + "address": address, + "license_no": license_no, + "practice_years": practice_years, + "specialties": specialties, + "detail_url": detail_url, + "contact_url": contact_url, + } + + def crawl_city(self, target: CityTarget) -> Iterable[Dict]: + seen_details: Set[str] = set() + + for page in range(1, self.max_pages + 1): + try: + items, page_count = self.fetch_list_page(target, page) + except Exception as exc: + print(f"[list] 失败 pid={target.province_id} cid={target.city_id} p{page}: {exc}") + break + + if not items: + break + + for item in items: + detail_url = str(item.get("lawyerUrl") or "").strip() + if not detail_url: + continue + if detail_url.startswith("//"): + detail_url = "https:" + detail_url + if not detail_url.startswith("http"): + detail_url = urljoin(SITE_BASE, detail_url) + + if detail_url in seen_details: + continue + seen_details.add(detail_url) + + try: + detail = self.parse_detail(detail_url) + except Exception as exc: + print(f"[detail] 失败 {detail_url}: {exc}") + continue + + now = int(time.time()) + uid = str(item.get("lawyerId") or item.get("globalUserId") or detail_url) + record_id = hashlib.md5(uid.encode("utf-8")).hexdigest() + + list_name = str(item.get("name") or "").replace("律师", "").strip() + category_text = str(item.get("categoryNames") or "").strip() + category_arr = [x.strip() for x in re.split(r"[、,,]", category_text) if x.strip()] + + yield { + "record_id": record_id, + "collected_at": now, + "source": { + "site": SITE_NAME, + "province_id": target.province_id, + "province": target.province_name, + "city_id": target.city_id, + "city": target.city_name, + "page": page, + "detail_url": detail_url, + "contact_url": detail.get("contact_url", ""), + }, + "list_snapshot": { + "lawyer_id": item.get("lawyerId"), + "name": list_name, + "category_names": category_arr, + "help_count": strip_html_tags(str(item.get("helpCount") or "")), + "comment_score": strip_html_tags(str(item.get("commentScore") or "")), + "response_time": str(item.get("responseTime") or "").strip(), + "year": item.get("year"), + "is_adv": bool(item.get("isAdv")), + }, + "profile": { + "name": detail.get("name") or list_name, + "law_firm": detail.get("law_firm") or "", + "phone": detail.get("phone") or "", + "email": detail.get("email") or "", + "address": detail.get("address") or "", + "license_no": detail.get("license_no") or "", + "practice_years": detail.get("practice_years"), + "specialties": detail.get("specialties") or category_arr, + }, + "raw": item, + } + + if self.sleep_seconds: + time.sleep(self.sleep_seconds) + + if page_count > 0 and page >= page_count: + break + + def _to_legacy_lawyer_row(self, record: Dict) -> Optional[Dict[str, str]]: + source = record.get("source", {}) or {} + profile = 
record.get("profile", {}) or {} + + phone = normalize_phone(profile.get("phone", "")) + if not phone: return None - avatar_url, site_time = self._extract_avatar_and_time(soup) - data = { - "phone": phone, + province = (source.get("province") or "").strip() + city = (source.get("city") or province).strip() + return { + "name": (profile.get("name") or "").strip(), + "law_firm": (profile.get("law_firm") or "").strip(), "province": province, "city": city, - "law_firm": law_firm, - "url": contact_url, - "avatar_url": avatar_url, - "create_time": int(time.time()), - "site_time": site_time, - "domain": DOMAIN, - "name": name, - "params": json.dumps({"source": url}, ensure_ascii=False) + "phone": phone, + "url": (source.get("contact_url") or source.get("detail_url") or "").strip(), + "domain": LEGACY_DOMAIN, + "create_time": int(record.get("collected_at") or time.time()), + "params": json.dumps(record, ensure_ascii=False), } - if existing_id: - update_data = { - "avatar_url": avatar_url, - "site_time": site_time, - } - if name: - update_data["name"] = name - if law_firm: - update_data["law_firm"] = law_firm - if province: - update_data["province"] = province - if city: - update_data["city"] = city - if phone: - update_data["phone"] = phone - update_data["params"] = json.dumps({"source": url}, ensure_ascii=False) - try: - self.db.update_data("lawyer", update_data, f"id={existing_id}") - print(" -- 已存在,已补全头像/时间") - except Exception as exc: - print(f" 更新失败: {exc}") - return None - # 若手机号已存在,则更新头像/时间,不再插入新记录 - existing_phone = self.db.select_data( - "lawyer", - "id, avatar_url, url", - f"domain='{DOMAIN}' AND phone='{phone}'" - ) - if existing_phone: - existing_row = existing_phone[0] - avatar = (existing_row.get("avatar_url") or "").strip() - if avatar: - print(" -- 已存在手机号且头像已补全,跳过") - return None - update_data = { - "avatar_url": avatar_url, - "site_time": site_time, - } - if name: - update_data["name"] = name - if law_firm: - update_data["law_firm"] = law_firm - if province: - update_data["province"] = province - if city: - update_data["city"] = city - if phone: - update_data["phone"] = phone - if not existing_row.get("url"): - update_data["url"] = contact_url - update_data["params"] = json.dumps({"source": url}, ensure_ascii=False) - try: - self.db.update_data("lawyer", update_data, f"id={existing_row.get('id')}") - print(" -- 已存在手机号,已补全头像/时间") - except Exception as exc: - print(f" 更新失败: {exc}") - return None - return data - def _extract_avatar_and_time(self, soup: BeautifulSoup) -> (str, Optional[int]): - avatar_url = "" - site_time = None - img_tag = soup.select_one( - "div.fixed-bottom-bar div.contact-lawye a.lr-photo img" - ) - if img_tag: - src = (img_tag.get("src") or "").strip() - if src: - if src.startswith("//"): - avatar_url = f"https:{src}" - else: - avatar_url = src - match = re.search(r"/(20\d{2})(\d{2})/", avatar_url) - if match: - site_time = int(f"{match.group(1)}{match.group(2)}") - else: - match = re.search(r"(20\d{2})(\d{2})\d{2}", avatar_url) - if match: - site_time = int(f"{match.group(1)}{match.group(2)}") - return avatar_url, site_time + def _existing_phones_in_db(self, phones: List[str]) -> Set[str]: + if not self.db or not phones: + return set() - def _get_detail(self, url: str, max_retries: int = 3) -> Optional[str]: - for attempt in range(max_retries): - try: - resp = self.client.get_text(url, timeout=15, verify=False) - status_code = resp.status_code - text = resp.text - if status_code == 403: - if attempt < max_retries - 1: - wait_time = 2 ** attempt + 
random.uniform(0.3, 1.0) - print(f" 403被拦截,{wait_time}秒后重试 ({attempt + 1}/{max_retries})") - self._refresh_session() - time.sleep(wait_time) - continue - print(" 请求失败: 403 Forbidden") - return None - if status_code >= 400: - raise RequestClientError(f"{status_code} Error") - return text - except RequestClientError as exc: - print(f" 请求失败: {exc}") - return None - return None + deduped = sorted({p for p in phones if p}) + if not deduped: + return set() - def run(self): - print("启动华律网采集...") - if not self.areas: - print("无城市数据") - return + existing: Set[str] = set() + cur = self.db.db.cursor() + try: + chunk_size = 500 + for i in range(0, len(deduped), chunk_size): + chunk = deduped[i:i + chunk_size] + placeholders = ",".join(["%s"] * len(chunk)) + sql = f"SELECT phone FROM lawyer WHERE domain=%s AND phone IN ({placeholders})" + cur.execute(sql, [LEGACY_DOMAIN, *chunk]) + for row in cur.fetchall(): + existing.add(row[0]) + finally: + cur.close() - for city_code, city_info in self.areas.items(): - province_code = city_info.get("province_code") - if not province_code: + return existing + + def _write_records_to_db(self, records: List[Dict]) -> Tuple[int, int]: + if not self.db: + return 0, 0 + + rows: List[Dict[str, str]] = [] + for record in records: + row = self._to_legacy_lawyer_row(record) + if row: + rows.append(row) + if not rows: + return 0, 0 + + existing = self._existing_phones_in_db([row["phone"] for row in rows]) + inserted = 0 + skipped = 0 + + for row in rows: + phone = row.get("phone", "") + if not phone or phone in existing: + skipped += 1 continue - province_name = city_info.get("province", "") - city_name = city_info.get("name", "") - print(f"采集 {province_name}-{city_name}") + try: + self.db.insert_data("lawyer", row) + existing.add(phone) + inserted += 1 + except Exception as exc: + skipped += 1 + print(f"[db] 插入失败 phone={phone} url={row.get('url', '')}: {exc}") - page = 1 - while True: - payload = {"pid": province_code, "cid": city_code, "page": str(page)} - data = self._post(payload) - if not data or not data.get("lawyerList"): - break + return inserted, skipped - for item in data["lawyerList"]: - result = self._parse_detail(item.get("lawyerUrl", ""), province_name, city_name) - if not result: + def crawl( + self, + output_path: str, + max_cities: int = 0, + city_filter: Optional[str] = None, + ) -> None: + cities = self.discover_cities() + print(f"[discover] 共发现城市 {len(cities)} 个") + + if city_filter: + key = city_filter.strip().lower() + cities = [ + c for c in cities + if key in c.city_name.lower() or key in str(c.city_id).lower() + ] + print(f"[discover] 过滤后城市 {len(cities)} 个, filter={city_filter}") + + if max_cities > 0: + cities = cities[:max_cities] + print(f"[discover] 截断城市数 {len(cities)}") + + os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True) + + seen_ids: Set[str] = set() + if os.path.exists(output_path): + with open(output_path, "r", encoding="utf-8") as old_file: + for line in old_file: + line = line.strip() + if not line: continue try: - self.db.insert_data("lawyer", result) - print(f" -> 新增: {result['name']} ({result['phone']})") - except Exception as exc: - print(f" 插入失败: {exc}") - time.sleep(1) + item = json.loads(line) + except Exception: + continue + rid = item.get("record_id") + if rid: + seen_ids.add(rid) + print(f"[resume] 已有记录 {len(seen_ids)} 条") - page_count = data.get("lawyerItems", {}).get("pageCount", page) - if page >= page_count: - break - page += 1 - time.sleep(2) + total_new_json = 0 + total_new_db = 0 + total_skip_db = 0 - 
time.sleep(1) - print("华律网采集完成") + with open(output_path, "a", encoding="utf-8") as out: + for idx, target in enumerate(cities, start=1): + print( + f"[city {idx}/{len(cities)}] {target.province_name}-{target.city_name} " + f"(pid={target.province_id}, cid={target.city_id})" + ) + city_records = list(self.crawl_city(target)) + + city_new_json = 0 + for record in city_records: + rid = record["record_id"] + if rid in seen_ids: + continue + out.write(json.dumps(record, ensure_ascii=False) + "\n") + seen_ids.add(rid) + city_new_json += 1 + total_new_json += 1 + + city_new_db, city_skip_db = self._write_records_to_db(city_records) + total_new_db += city_new_db + total_skip_db += city_skip_db + + print( + f"[city] 采集{len(city_records)}条, JSON新增{city_new_json}条, " + f"DB新增{city_new_db}条, DB跳过{city_skip_db}条" + ) + + print( + f"[done] JSON新增{total_new_json}条, DB新增{total_new_db}条, " + f"DB跳过{total_skip_db}条, 输出: {output_path}" + ) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="华律网全新采集脚本(站点数据直采)") + parser.add_argument( + "--output", + default="/www/wwwroot/lawyers/data/hualv_records_all.jsonl", + help="输出 jsonl 文件路径", + ) + parser.add_argument( + "--max-cities", + type=int, + default=0, + help="最多采集多少个城市,0 表示不限", + ) + parser.add_argument( + "--max-pages", + type=int, + default=9999, + help="每个城市最多采集多少页", + ) + parser.add_argument( + "--city-filter", + default="", + help="按城市名称或城市编码过滤,如 beijing / 110100", + ) + parser.add_argument( + "--sleep", + type=float, + default=0.15, + help="详情页请求间隔秒数", + ) + parser.add_argument( + "--direct", + action="store_true", + help="直连模式,不使用 proxy_settings.json 代理", + ) + parser.add_argument( + "--no-db", + action="store_true", + help="只输出 JSONL,不写入数据库", + ) + return parser.parse_args() + + +def main(): + args = parse_args() + + if args.no_db: + crawler = HualvCrawler( + max_pages=args.max_pages, + sleep_seconds=args.sleep, + use_proxy=not args.direct, + db_connection=None, + ) + crawler.crawl( + output_path=args.output, + max_cities=args.max_cities, + city_filter=args.city_filter or None, + ) + return + + with Db() as db: + crawler = HualvCrawler( + max_pages=args.max_pages, + sleep_seconds=args.sleep, + use_proxy=not args.direct, + db_connection=db, + ) + crawler.crawl( + output_path=args.output, + max_cities=args.max_cities, + city_filter=args.city_filter or None, + ) if __name__ == "__main__": - with Db() as db: - spider = HualvSpider(db) - spider.run() + main() diff --git a/common_sites/lawtime.py b/common_sites/lawtime.py index 6f6d462..a3c0285 100644 --- a/common_sites/lawtime.py +++ b/common_sites/lawtime.py @@ -1,13 +1,16 @@ +import argparse +import hashlib import json import os +import random import re import sys import time -import random -from typing import Dict, Optional, List, Set -from urllib.parse import urljoin -from concurrent.futures import ThreadPoolExecutor, as_completed -import threading +from dataclasses import dataclass, field +from typing import Dict, Iterable, List, Optional, Set, Tuple + +import urllib3 +from bs4 import BeautifulSoup current_dir = os.path.dirname(os.path.abspath(__file__)) project_root = os.path.dirname(current_dir) @@ -17,262 +20,628 @@ if request_dir not in sys.path: if project_root not in sys.path: sys.path.append(project_root) -import urllib3 -from bs4 import BeautifulSoup +from Db import Db from request.requests_client import RequestClientError, RequestsClient +from utils.rate_limiter import wait_for_request 
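The `_request_text` / `_get_text` helpers in these rewrites all apply the same retry policy: a 403 refreshes the client (proxy rotation) before retrying, 5xx and transport errors back off and retry, and every wait is `2 ** attempt` seconds plus a small random jitter. The sketch below isolates that policy under stated assumptions: `fetch` and `refresh` are hypothetical callables standing in for the project's client methods, and `fetch` is assumed to return an object exposing `status_code`:

```python
import random
import time


def request_with_backoff(fetch, refresh, max_retries: int = 3):
    """Retry policy mirroring the rewritten crawlers (illustrative sketch, not the project API)."""
    for attempt in range(max_retries):
        backoff = (2 ** attempt) + random.uniform(0.2, 0.8)  # same jitter range as in the diff
        try:
            resp = fetch()
        except Exception:
            # transport-level failure: back off and retry, re-raise on the last attempt
            if attempt < max_retries - 1:
                time.sleep(backoff)
                continue
            raise
        if resp.status_code == 403 and attempt < max_retries - 1:
            refresh()              # 403 is treated as a burned proxy/session, so refresh first
            time.sleep(backoff)
            continue
        if resp.status_code >= 500 and attempt < max_retries - 1:
            time.sleep(backoff)    # transient server error: plain backoff, no refresh
            continue
        if resp.status_code >= 400:
            raise RuntimeError(f"{resp.status_code} error after {attempt + 1} attempt(s)")
        return resp
    raise RuntimeError("request_with_backoff: no attempts were made (max_retries <= 0)")
```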
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) -from Db import Db -from config import LAWTIME_CONFIG +SITE_NAME = "lawtime" +LEGACY_DOMAIN = "法律快车" +SITE_BASE = "https://www.lawtime.cn" +PROVINCE_API = "https://www.lawtime.cn/public/stationIndex/getAreaList?areacode=0" +CITY_API_TEMPLATE = "https://www.lawtime.cn/public/stationIndex/getAreaList?areacode={province_id}" +LIST_URL_TEMPLATE = SITE_BASE + "/{city_py}/lawyer" -LIST_BASE = "https://m.lawtime.cn/{pinyin}/lawyer/?page={page}" -DETAIL_BASE = "https://m.lawtime.cn" -DOMAIN = "法律快车" +PHONE_RE = re.compile(r"1[3-9]\d{9}") +YEAR_RE = re.compile(r"执业\s*(\d+)\s*年") -class LawtimeSpider: - def __init__(self, db_connection): +@dataclass +class CityTarget: + province_id: str + province_name: str + province_py: str + city_id: str + city_name: str + city_py: str + + +@dataclass +class ListCard: + detail_url: str + name: str + phone: str + address: str = "" + specialties: List[str] = field(default_factory=list) + metric_text: str = "" + + +def normalize_phone(text: str) -> str: + compact = re.sub(r"\D", "", text or "") + match = PHONE_RE.search(compact) + return match.group(0) if match else "" + + +class LawtimeCrawler: + def __init__( + self, + max_pages: int = 9999, + sleep_seconds: float = 0.1, + use_proxy: bool = True, + db_connection=None, + ): + self.max_pages = max_pages + self.sleep_seconds = max(0.0, sleep_seconds) self.db = db_connection - self.client = self._build_session() - self.max_workers = int(os.getenv("SPIDER_WORKERS", "8")) - self._tls = threading.local() + self.client = RequestsClient( + headers={ + "User-Agent": ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/122.0.0.0 Safari/537.36" + ), + "Accept": "text/html,application/json,*/*;q=0.8", + "Connection": "close", + }, + use_proxy=use_proxy, + retry_total=2, + retry_backoff_factor=1, + retry_status_forcelist=(429, 500, 502, 503, 504), + retry_allowed_methods=("GET",), + ) - def _build_session(self) -> RequestsClient: - headers = LAWTIME_CONFIG.get("HEADERS", {}) - custom_headers = dict(headers) if headers else {} - custom_headers.setdefault("Connection", "close") - return RequestsClient(headers=custom_headers) + def _get_text( + self, + url: str, + *, + timeout: int = 20, + max_retries: int = 3, + referer: str = SITE_BASE, + ) -> str: + headers = {"Referer": referer} + last_error: Optional[Exception] = None - def _refresh_session(self) -> None: - self.client.refresh() + for attempt in range(max_retries): + wait_for_request() + try: + resp = self.client.get_text( + url, + timeout=timeout, + verify=False, + headers=headers, + ) + code = resp.status_code + if code == 403: + if attempt < max_retries - 1: + self.client.refresh() + time.sleep((2 ** attempt) + random.uniform(0.2, 0.8)) + continue + raise RequestClientError(f"{code} Error: {url}") + if code >= 500 and attempt < max_retries - 1: + time.sleep((2 ** attempt) + random.uniform(0.2, 0.8)) + continue + if code >= 400: + raise RequestClientError(f"{code} Error: {url}") + return resp.text + except Exception as exc: + last_error = exc + if attempt < max_retries - 1: + time.sleep((2 ** attempt) + random.uniform(0.2, 0.8)) + continue + raise - def _get_thread_session(self) -> RequestsClient: - s = getattr(self._tls, "session", None) - if s is not None: - return s - s = self.client.clone() - self._tls.session = s - return s + if last_error is not None: + raise last_error + raise RequestClientError(f"Unknown request error: {url}") - def 
_refresh_thread_session(self) -> None: - s = getattr(self._tls, "session", None) - if s is not None: - s.close() - self._tls.session = None + def _get_json(self, url: str, *, referer: str) -> List[Dict]: + text = self._get_text(url, referer=referer) + cleaned = (text or "").strip().lstrip("\ufeff") + if not cleaned or cleaned.startswith("<"): + return [] + try: + data = json.loads(cleaned) + except ValueError: + return [] + return data if isinstance(data, list) else [] - def _existing_phones(self, phones: List[str]) -> Set[str]: - if not phones: + def discover_cities(self) -> List[CityTarget]: + provinces = self._get_json(PROVINCE_API, referer=SITE_BASE) + if not provinces: + print("[discover] 地区接口未返回有效数据") + return [] + + results: List[CityTarget] = [] + seen_py: Set[str] = set() + + for province in provinces: + province_id = str(province.get("id") or "").strip() + province_name = str(province.get("province") or province.get("city") or "").strip() + province_py = str(province.get("pinyin") or "").strip() + if not province_id or not province_name: + continue + + city_api = CITY_API_TEMPLATE.format(province_id=province_id) + try: + cities = self._get_json(city_api, referer=LIST_URL_TEMPLATE.format(city_py=province_py or "")) + except Exception as exc: + print(f"[city] 获取失败 province={province_id}: {exc}") + continue + + if not cities: + cities = [ + { + "id": province_id, + "province": province_name, + "city": province_name, + "pinyin": province_py, + } + ] + + for city in cities: + city_id = str(city.get("id") or "").strip() + city_name = str(city.get("city") or city.get("province") or "").strip() + city_py = str(city.get("pinyin") or "").strip() + if not city_id or not city_name or not city_py: + continue + if city_py in seen_py: + continue + seen_py.add(city_py) + + results.append( + CityTarget( + province_id=province_id, + province_name=province_name, + province_py=province_py, + city_id=city_id, + city_name=city_name, + city_py=city_py, + ) + ) + + return results + + def _build_list_url(self, city_py: str, page: int) -> str: + base = LIST_URL_TEMPLATE.format(city_py=city_py) + if page <= 1: + return base + return f"{base}?page={page}" + + def fetch_list_page(self, target: CityTarget, page: int) -> Tuple[List[ListCard], bool, str]: + list_url = self._build_list_url(target.city_py, page) + html = self._get_text(list_url, referer=SITE_BASE + "/") + + cards = self.parse_list_cards(html) + + soup = BeautifulSoup(html, "html.parser") + next_link = soup.select_one(f"div.page a[href*='page={page + 1}']") + has_next = next_link is not None + + return cards, has_next, list_url + + def parse_list_cards(self, html: str) -> List[ListCard]: + soup = BeautifulSoup(html, "html.parser") + cards: List[ListCard] = [] + seen: Set[str] = set() + + for item in soup.select("li.lawyer-item-card"): + link_tag = item.select_one("a.name[href]") or item.select_one("a.lawyer-img-box[href]") + if not link_tag: + continue + detail_url = (link_tag.get("href") or "").strip() + if not detail_url.startswith("http"): + continue + if detail_url in seen: + continue + seen.add(detail_url) + + name = link_tag.get_text(strip=True) + phone = "" + phone_tag = item.select_one("div.phone") + if phone_tag: + phone = normalize_phone(phone_tag.get_text(" ", strip=True)) + + address = "" + addr_tag = item.select_one("div.location .txt") + if addr_tag: + address = addr_tag.get_text(" ", strip=True) + + specialties: List[str] = [] + prof_tag = item.select_one("div.prof .txt") + if prof_tag: + specialties = [ + x.strip() for x in 
re.split(r"[、,,]", prof_tag.get_text(" ", strip=True)) if x.strip() + ] + + metric_text = "" + metric_tag = item.select_one("div.num-msg") + if metric_tag: + metric_text = metric_tag.get_text(" ", strip=True) + + cards.append( + ListCard( + detail_url=detail_url, + name=name, + phone=phone, + address=address, + specialties=specialties, + metric_text=metric_text, + ) + ) + + return cards + + def parse_detail(self, detail_url: str) -> Dict: + html = self._get_text(detail_url, referer=SITE_BASE) + if "网站防火墙" in html or "访问被拒绝" in html or "/ipfilter/verify" in html: + raise RequestClientError(f"firewall blocked: {detail_url}") + + soup = BeautifulSoup(html, "html.parser") + text = soup.get_text(" ", strip=True) + + name = "" + law_firm = "" + phone = "" + address = "" + practice_years: Optional[int] = None + specialties: List[str] = [] + + if soup.title: + title = soup.title.get_text(" ", strip=True) + match = re.search(r"([^\s_,,。]+?)律师", title) + if match: + name = match.group(1).strip() + + phone_candidates = [ + soup.select_one(".data-w .tel-b b").get_text(" ", strip=True) + if soup.select_one(".data-w .tel-b b") + else "", + soup.select_one(".law-info-b .item .two-r.b").get_text(" ", strip=True) + if soup.select_one(".law-info-b .item .two-r.b") + else "", + text, + ] + for candidate in phone_candidates: + phone = normalize_phone(candidate) + if phone: + break + + law_firm_tag = soup.select_one(".law-info-b .item .two-nowrap") + if law_firm_tag: + law_firm = law_firm_tag.get_text(" ", strip=True) + + for li in soup.select(".law-info-b .item"): + li_text = li.get_text(" ", strip=True) + if ("事务所" in li_text or "法律服务所" in li_text) and not law_firm: + law_firm = li_text + + addr_tag = soup.select_one(".law-info-b .item .two-r[title]") + if addr_tag: + addr_value = (addr_tag.get("title") or "").strip() + if len(addr_value) > 8: + address = addr_value + + if not address: + addr_tag = soup.select_one(".law-info-b .item .two-r") + if addr_tag: + addr_value = addr_tag.get_text(" ", strip=True) + if len(addr_value) > 8 and "律师" not in addr_value: + address = addr_value + + year_match = YEAR_RE.search(text) + if year_match: + try: + practice_years = int(year_match.group(1)) + except Exception: + practice_years = None + + specialties = [x.get_text(strip=True) for x in soup.select(".profession-b .item") if x.get_text(strip=True)] + + return { + "name": name, + "law_firm": law_firm, + "phone": phone, + "address": address, + "practice_years": practice_years, + "specialties": specialties, + "detail_url": detail_url, + } + + def crawl_city(self, target: CityTarget) -> Iterable[Dict]: + seen_details: Set[str] = set() + + for page in range(1, self.max_pages + 1): + try: + cards, has_next, list_url = self.fetch_list_page(target, page) + except Exception as exc: + print(f"[list] 失败 {target.city_py} p{page}: {exc}") + break + + if not cards: + break + + for card in cards: + if card.detail_url in seen_details: + continue + seen_details.add(card.detail_url) + + detail: Dict = {} + try: + detail = self.parse_detail(card.detail_url) + except Exception as exc: + print(f"[detail] 失败 {card.detail_url}: {exc}") + + phone = normalize_phone(detail.get("phone") or card.phone) + profile_name = (detail.get("name") or card.name).replace("律师", "").strip() + + now = int(time.time()) + record_id = hashlib.md5(card.detail_url.encode("utf-8")).hexdigest() + + yield { + "record_id": record_id, + "collected_at": now, + "source": { + "site": SITE_NAME, + "province_id": target.province_id, + "province": target.province_name, + 
"province_py": target.province_py, + "city_id": target.city_id, + "city": target.city_name, + "city_py": target.city_py, + "page": page, + "list_url": list_url, + "detail_url": card.detail_url, + }, + "list_snapshot": { + "name": card.name, + "phone": card.phone, + "address": card.address, + "specialties": card.specialties, + "metric_text": card.metric_text, + }, + "profile": { + "name": profile_name, + "law_firm": (detail.get("law_firm") or "").strip(), + "phone": phone, + "address": (detail.get("address") or card.address or "").strip(), + "practice_years": detail.get("practice_years"), + "specialties": detail.get("specialties") or card.specialties, + }, + } + + if self.sleep_seconds: + time.sleep(self.sleep_seconds) + + if not has_next: + break + + def _to_legacy_lawyer_row(self, record: Dict) -> Optional[Dict[str, str]]: + source = record.get("source", {}) or {} + profile = record.get("profile", {}) or {} + + phone = normalize_phone(profile.get("phone", "")) + if not phone: + return None + + province = (source.get("province") or "").strip() + city = (source.get("city") or province).strip() + return { + "name": (profile.get("name") or "").strip(), + "law_firm": (profile.get("law_firm") or "").strip(), + "province": province, + "city": city, + "phone": phone, + "url": (source.get("detail_url") or source.get("list_url") or "").strip(), + "domain": LEGACY_DOMAIN, + "create_time": int(record.get("collected_at") or time.time()), + "params": json.dumps(record, ensure_ascii=False), + } + + def _existing_phones_in_db(self, phones: List[str]) -> Set[str]: + if not self.db or not phones: return set() + + deduped = sorted({p for p in phones if p}) + if not deduped: + return set() + existing: Set[str] = set() cur = self.db.db.cursor() try: chunk_size = 500 - for i in range(0, len(phones), chunk_size): - chunk = phones[i:i + chunk_size] + for i in range(0, len(deduped), chunk_size): + chunk = deduped[i:i + chunk_size] placeholders = ",".join(["%s"] * len(chunk)) sql = f"SELECT phone FROM lawyer WHERE domain=%s AND phone IN ({placeholders})" - cur.execute(sql, [DOMAIN, *chunk]) + cur.execute(sql, [LEGACY_DOMAIN, *chunk]) for row in cur.fetchall(): existing.add(row[0]) finally: cur.close() + return existing - def _load_areas(self): - condition = "level = 2 and domain='法律快车'" - tables = ("area_new", "area", "area2") - last_error = None - for table in tables: - try: - rows = self.db.select_data(table, "pinyin, province, city", condition) or [] - except Exception as exc: - last_error = exc + def _write_records_to_db(self, records: List[Dict]) -> Tuple[int, int]: + if not self.db: + return 0, 0 + + rows: List[Dict[str, str]] = [] + for record in records: + row = self._to_legacy_lawyer_row(record) + if row: + rows.append(row) + if not rows: + return 0, 0 + + existing = self._existing_phones_in_db([row["phone"] for row in rows]) + inserted = 0 + skipped = 0 + + for row in rows: + phone = row.get("phone", "") + if not phone or phone in existing: + skipped += 1 continue - if rows: - missing_pinyin = sum(1 for r in rows if not (r.get("pinyin") or "").strip()) - print(f"[法律快车] 城市来源表: {table}, 城市数: {len(rows)}, 缺少pinyin: {missing_pinyin}") - return rows - - if last_error: - print(f"[法律快车] 加载地区数据失败: {last_error}") - print("[法律快车] 无城市数据(已尝试 area_new/area/area2)") - return [] - - def _get(self, url: str, max_retries: int = 3) -> Optional[str]: - return self._get_with_session(self.client, url, max_retries=max_retries, is_thread=False) - - def _get_with_session(self, session: RequestsClient, url: str, max_retries: int 
= 3, is_thread: bool = False) -> Optional[str]: - for attempt in range(max_retries): try: - resp = session.get_text(url, timeout=15, verify=False) - status_code = resp.status_code - text = resp.text - if status_code == 403: - if attempt < max_retries - 1: - wait_time = 2 ** attempt + random.uniform(0.3, 1.0) - print(f"请求失败 {url}: 403,{wait_time}秒后重试 ({attempt + 1}/{max_retries})") - if is_thread: - self._refresh_thread_session() - session = self._get_thread_session() - else: - self._refresh_session() - session = self.client - time.sleep(wait_time) + self.db.insert_data("lawyer", row) + existing.add(phone) + inserted += 1 + except Exception as exc: + skipped += 1 + print(f"[db] 插入失败 phone={phone} url={row.get('url', '')}: {exc}") + + return inserted, skipped + + def crawl( + self, + output_path: str, + max_cities: int = 0, + city_filter: Optional[str] = None, + ) -> None: + cities = self.discover_cities() + print(f"[discover] 共发现城市 {len(cities)} 个") + + if city_filter: + key = city_filter.strip().lower() + cities = [ + c for c in cities + if key in c.city_py.lower() or key in c.city_name.lower() + ] + print(f"[discover] 过滤后城市 {len(cities)} 个, filter={city_filter}") + + if max_cities > 0: + cities = cities[:max_cities] + print(f"[discover] 截断城市数 {len(cities)}") + + os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True) + + seen_ids: Set[str] = set() + if os.path.exists(output_path): + with open(output_path, "r", encoding="utf-8") as old_file: + for line in old_file: + line = line.strip() + if not line: continue - print(f"请求失败 {url}: 403 Forbidden") - return None - if status_code >= 400: - raise RequestClientError(f"{status_code} Error: {url}") - return text - except RequestClientError as exc: - print(f"请求失败 {url}: {exc}") - return None - return None + try: + item = json.loads(line) + except Exception: + continue + rid = item.get("record_id") + if rid: + seen_ids.add(rid) + print(f"[resume] 已有记录 {len(seen_ids)} 条") - def _parse_list(self, html: str, province: str, city: str) -> int: - soup = BeautifulSoup(html, "html.parser") - links = [a.get("href", "") for a in soup.select("a.hide_link")] - links = [link.replace("lll", "int") for link in links if link] - if not links: - return 0 + total_new_json = 0 + total_new_db = 0 + total_skip_db = 0 - detail_urls = [urljoin(DETAIL_BASE, link) for link in links] + with open(output_path, "a", encoding="utf-8") as out: + for idx, target in enumerate(cities, start=1): + print( + f"[city {idx}/{len(cities)}] {target.province_name}-{target.city_name} " + f"({target.city_py})" + ) + city_records = list(self.crawl_city(target)) - results: List[Dict[str, str]] = [] - with ThreadPoolExecutor(max_workers=self.max_workers) as ex: - futs = [ex.submit(self._parse_detail, u, province, city) for u in detail_urls] - for fut in as_completed(futs): - try: - data = fut.result() - except Exception as exc: - print(f" 详情解析异常: {exc}") - continue - if data and data.get("phone"): - results.append(data) + city_new_json = 0 + for record in city_records: + rid = record["record_id"] + if rid in seen_ids: + continue + out.write(json.dumps(record, ensure_ascii=False) + "\n") + seen_ids.add(rid) + city_new_json += 1 + total_new_json += 1 - if not results: - return len(detail_urls) + city_new_db, city_skip_db = self._write_records_to_db(city_records) + total_new_db += city_new_db + total_skip_db += city_skip_db - phones = [d["phone"] for d in results if d.get("phone")] - existing = self._existing_phones(phones) + print( + f"[city] 采集{len(city_records)}条, 
JSON新增{city_new_json}条, " + f"DB新增{city_new_db}条, DB跳过{city_skip_db}条" + ) - for data in results: - phone = data.get("phone") - if not phone: - continue - if phone in existing: - print(f" -- 已存在: {data['name']} ({phone})") - continue - try: - self.db.insert_data("lawyer", data) - print(f" -> 新增: {data['name']} ({phone})") - except Exception as exc: - print(f" 插入失败 {data.get('url')}: {exc}") + print( + f"[done] JSON新增{total_new_json}条, DB新增{total_new_db}条, " + f"DB跳过{total_skip_db}条, 输出: {output_path}" + ) - return len(detail_urls) - def _parse_detail(self, url: str, province: str, city: str) -> Optional[Dict[str, str]]: - html = None - sess = self._get_thread_session() - html = self._get_with_session(sess, url, max_retries=3, is_thread=True) - if not html: - return None +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="法律快车全新采集脚本(站点数据直采)") + parser.add_argument( + "--output", + default="/www/wwwroot/lawyers/data/lawtime_records_all.jsonl", + help="输出 jsonl 文件路径", + ) + parser.add_argument( + "--max-cities", + type=int, + default=0, + help="最多采集多少个城市,0 表示不限", + ) + parser.add_argument( + "--max-pages", + type=int, + default=9999, + help="每个城市最多采集多少页", + ) + parser.add_argument( + "--city-filter", + default="", + help="按城市拼音或城市名过滤,如 beijing", + ) + parser.add_argument( + "--sleep", + type=float, + default=0.1, + help="详情页请求间隔秒数", + ) + parser.add_argument( + "--direct", + action="store_true", + help="直连模式,不使用 proxy_settings.json 代理", + ) + parser.add_argument( + "--no-db", + action="store_true", + help="只输出 JSONL,不写入数据库", + ) + return parser.parse_args() - soup = BeautifulSoup(html, "html.parser") - text = soup.get_text(" ") - name = "" - title_tag = soup.find("title") - if title_tag: - match = re.search(r"(\S+)律师", title_tag.get_text()) - if match: - name = match.group(1) - if not name: - intl_div = soup.find("div", class_="intl") - if intl_div: - match = re.search(r"(\S+)律师", intl_div.get_text()) - if match: - name = match.group(1) +def main(): + args = parse_args() - phone = "" - phone_pattern = r"1[3-9]\d{9}" - for item in soup.select("div.item.flex"): - label = item.find("div", class_="label") - desc = item.find("div", class_="desc") - if not label or not desc: - continue - label_text = label.get_text() - desc_text = desc.get_text().replace("-", "") - if "联系电话" in label_text or "电话" in label_text: - matches = re.findall(phone_pattern, desc_text) - if matches: - phone = matches[0] - break - if not phone: - matches = re.findall(phone_pattern, text.replace("-", "")) - if matches: - phone = matches[0] - if not phone: - print(f" 无手机号: {url}") - return None + if args.no_db: + crawler = LawtimeCrawler( + max_pages=args.max_pages, + sleep_seconds=args.sleep, + use_proxy=not args.direct, + db_connection=None, + ) + crawler.crawl( + output_path=args.output, + max_cities=args.max_cities, + city_filter=args.city_filter or None, + ) + return - law_firm = "" - for item in soup.select("div.item.flex"): - label = item.find("div", class_="label") - desc = item.find("div", class_="desc") - if not label or not desc: - continue - if "执业律所" in label.get_text() or "律所" in label.get_text(): - law_firm = desc.get_text(strip=True).replace("已认证", "") - break - - params = { - "list_url": url, - "province": province, - "city": city, - } - - return { - "name": name or "", - "law_firm": law_firm, - "province": province, - "city": city, - "phone": phone, - "url": url, - "domain": DOMAIN, - "create_time": int(time.time()), - "params": json.dumps(params, ensure_ascii=False) - } - 
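The `_existing_phones_in_db` helpers added for each site avoid a per-record existence query: phones are deduplicated, then checked against the `lawyer` table in `IN (...)` batches of at most 500 placeholders. A standalone sketch under the same assumptions (a DB-API cursor with pymysql-style `%s` parameters, and the `lawyer(domain, phone)` columns used in the diff); the function name `existing_phones` is hypothetical:

```python
from typing import Iterable, List, Set


def existing_phones(cursor, domain: str, phones: Iterable[str], chunk_size: int = 500) -> Set[str]:
    """Return the subset of phones already stored for this domain (illustrative sketch)."""
    deduped: List[str] = sorted({p for p in phones if p})
    found: Set[str] = set()
    for i in range(0, len(deduped), chunk_size):
        chunk = deduped[i:i + chunk_size]
        placeholders = ",".join(["%s"] * len(chunk))  # one %s per phone, pymysql paramstyle
        sql = f"SELECT phone FROM lawyer WHERE domain=%s AND phone IN ({placeholders})"
        cursor.execute(sql, [domain, *chunk])
        for row in cursor.fetchall():
            found.add(row[0])  # pymysql returns tuples by default
    return found
```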
- def run(self): - print("启动法律快车采集...") - areas = self._load_areas() - if not areas: - print("无地区数据") - return - - for area in areas: - pinyin = area.get("pinyin") - province = area.get("province", "") - city = area.get("city", "") - if not pinyin: - continue - page = 1 - while True: - list_url = LIST_BASE.format(pinyin=pinyin, page=page) - print(f"采集 {province}-{city} 第 {page} 页: {list_url}") - html = self._get(list_url) - if not html: - break - link_count = self._parse_list(html, province, city) - if link_count == 0: - break - page += 1 - print("法律快车采集完成") + with Db() as db: + crawler = LawtimeCrawler( + max_pages=args.max_pages, + sleep_seconds=args.sleep, + use_proxy=not args.direct, + db_connection=db, + ) + crawler.crawl( + output_path=args.output, + max_cities=args.max_cities, + city_filter=args.city_filter or None, + ) if __name__ == "__main__": - with Db() as db: - spider = LawtimeSpider(db) - spider.run() + main() diff --git a/common_sites/six4365.py b/common_sites/six4365.py index 255e380..84da880 100644 --- a/common_sites/six4365.py +++ b/common_sites/six4365.py @@ -1,11 +1,17 @@ +import argparse +import hashlib import json import os +import random +import re import sys import time -import random -from typing import Dict, Optional, List, Set -from concurrent.futures import ThreadPoolExecutor, as_completed -import threading +from dataclasses import dataclass +from typing import Dict, Iterable, List, Optional, Set, Tuple +from urllib.parse import urljoin + +import urllib3 +from bs4 import BeautifulSoup current_dir = os.path.dirname(os.path.abspath(__file__)) project_root = os.path.dirname(current_dir) @@ -15,146 +21,237 @@ if request_dir not in sys.path: if project_root not in sys.path: sys.path.append(project_root) -import urllib3 -from bs4 import BeautifulSoup +from Db import Db from request.requests_client import RequestClientError, RequestsClient +from utils.rate_limiter import wait_for_request urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) -from Db import Db +SITE_NAME = "64365" +LEGACY_DOMAIN = "律图" +SITE_BASE = "https://m.64365.com" +AREA_DATA_URL = "https://image.64365.com/ui_v3/m/js/public/area-cate-data.js" +LIST_API_URL = "https://m.64365.com/findLawyer/rpc/FindLawyer/LawyerRecommend/" -DOMAIN = "律图" -LIST_URL = "https://m.64365.com/findLawyer/rpc/FindLawyer/LawyerRecommend/" +PHONE_RE = re.compile(r"1[3-9]\d{9}") +YEAR_RE = re.compile(r"(\d+)\s*年") -class Six4365Spider: - def __init__(self, db_connection): +@dataclass +class CityTarget: + area_id: str + province_id: str + province_name: str + province_py: str + city_name: str + city_py: str + + +@dataclass +class ListCard: + detail_url: str + name: str + specialties: List[str] + score_text: str + service_text: str + + +def normalize_phone(text: str) -> str: + compact = re.sub(r"\D", "", text or "") + match = PHONE_RE.search(compact) + return match.group(0) if match else "" + + +class Six4365Crawler: + def __init__( + self, + max_pages: int = 9999, + sleep_seconds: float = 0.1, + use_proxy: bool = True, + db_connection=None, + ): + self.max_pages = max_pages + self.sleep_seconds = max(0.0, sleep_seconds) self.db = db_connection - self.client = self._build_session() - self.max_workers = int(os.getenv("SPIDER_WORKERS", "8")) - self._tls = threading.local() - self.cities = self._load_cities() + self.client = RequestsClient( + headers={ + "User-Agent": ( + "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) " + "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 " + "Mobile/15E148 
Safari/604.1" + ), + "Accept": "text/html, */*; q=0.01", + "Connection": "close", + }, + use_proxy=use_proxy, + retry_total=2, + retry_backoff_factor=1, + retry_status_forcelist=(429, 500, 502, 503, 504), + retry_allowed_methods=("GET", "POST"), + ) - def _build_session(self) -> RequestsClient: - return RequestsClient(headers={ - "User-Agent": ( - "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) " - "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 " - "Mobile/15E148 Safari/604.1" - ), - "Connection": "close", - }) + def _request_text( + self, + method: str, + url: str, + *, + timeout: int = 20, + max_retries: int = 3, + referer: str = SITE_BASE, + data: Optional[Dict] = None, + ) -> str: + headers = {"Referer": referer} + last_error: Optional[Exception] = None - def _refresh_session(self) -> None: - self.client.refresh() - - def _get_thread_session(self) -> RequestsClient: - """每个线程使用独立请求客户端(共享相同 headers/代理配置)。""" - s = getattr(self._tls, "session", None) - if s is not None: - return s - s = self.client.clone() - self._tls.session = s - return s - - def _refresh_thread_session(self) -> None: - s = getattr(self._tls, "session", None) - if s is not None: - s.close() - self._tls.session = None - - def _existing_urls(self, urls: List[str]) -> Set[str]: - """批量查重,减少 N 次 is_data_exist""" - if not urls: - return set() - existing: Set[str] = set() - cur = self.db.db.cursor() - try: - # IN 参数过多会失败,分批 - chunk_size = 500 - for i in range(0, len(urls), chunk_size): - chunk = urls[i:i + chunk_size] - placeholders = ",".join(["%s"] * len(chunk)) - sql = f"SELECT url FROM lawyer WHERE url IN ({placeholders})" - cur.execute(sql, chunk) - for row in cur.fetchall(): - # pymysql 默认返回 tuple - existing.add(row[0]) - finally: - cur.close() - return existing - - def _load_cities(self): - tables = ("area_new", "area2", "area") - last_error = None - for table in tables: + for attempt in range(max_retries): + wait_for_request() try: - provinces = self.db.select_data( - table, - "id, code, province", - "domain='64365' AND level=1" - ) or [] - cities = self.db.select_data( - table, - "code, city, province, pid", - "domain='64365' AND level=2" - ) or [] + if method.upper() == "POST": + resp = self.client.post_text( + url, + timeout=timeout, + verify=False, + headers=headers, + data=data, + ) + else: + resp = self.client.get_text( + url, + timeout=timeout, + verify=False, + headers=headers, + ) + + code = resp.status_code + if code == 403: + if attempt < max_retries - 1: + self.client.refresh() + time.sleep((2 ** attempt) + random.uniform(0.2, 0.8)) + continue + raise RequestClientError(f"{code} Error: {url}") + if code >= 500 and attempt < max_retries - 1: + time.sleep((2 ** attempt) + random.uniform(0.2, 0.8)) + continue + if code >= 400: + raise RequestClientError(f"{code} Error: {url}") + return resp.text except Exception as exc: last_error = exc - continue + if attempt < max_retries - 1: + time.sleep((2 ** attempt) + random.uniform(0.2, 0.8)) + continue + raise - if not cities: - continue + if last_error is not None: + raise last_error + raise RequestClientError(f"Unknown request error: {url}") - province_map = {row.get('id'): row for row in provinces} - data = {} - for city in cities: - province_row = province_map.get(city.get('pid'), {}) or {} - data[str(city.get('code'))] = { - "name": city.get('city'), - "province": city.get('province'), - "province_name": province_row.get('province', city.get('province')), - } - print(f"[律图] 城市来源表: {table}, 城市数: {len(cities)}") - return data + def 
_get_text(self, url: str, *, timeout: int = 20, max_retries: int = 3, referer: str = SITE_BASE) -> str: + return self._request_text( + "GET", + url, + timeout=timeout, + max_retries=max_retries, + referer=referer, + ) - if last_error: - print(f"[律图] 加载地区数据失败: {last_error}") - print("[律图] 无城市数据(已尝试 area_new/area2/area)") - return {} + def _post_text( + self, + url: str, + *, + data: Dict, + timeout: int = 20, + max_retries: int = 3, + referer: str = SITE_BASE, + ) -> str: + return self._request_text( + "POST", + url, + timeout=timeout, + max_retries=max_retries, + referer=referer, + data=data, + ) - def _post(self, payload: Dict[str, str], max_retries: int = 3) -> Optional[str]: - for attempt in range(max_retries): - try: - resp = self.client.post_text(LIST_URL, data=payload, timeout=10, verify=False) - status_code = resp.status_code - text = resp.text - if status_code == 403: - if attempt < max_retries - 1: - wait_time = 2 ** attempt + random.uniform(0.3, 1.0) - print(f"403被拦截,{wait_time}秒后重试 ({attempt + 1}/{max_retries})") - self._refresh_session() - time.sleep(wait_time) + def _extract_area_data(self, text: str) -> List[Dict]: + match = re.search( + r"lvtuData\.areaData\s*=\s*(\[[\s\S]*?\])\s*;\s*lvtuData\.categroyData", + text, + re.S, + ) + if not match: + return [] + + raw = match.group(1) + try: + data = json.loads(raw) + except Exception: + return [] + return data if isinstance(data, list) else [] + + def discover_cities(self) -> List[CityTarget]: + text = self._get_text(AREA_DATA_URL, referer=SITE_BASE + "/findlawyer/") + provinces = self._extract_area_data(text) + + targets: List[CityTarget] = [] + seen_area: Set[str] = set() + + for province in provinces: + province_id = str(province.get("id") or "").strip() + province_name = str(province.get("name") or "").strip() + province_py = str(province.get("py") or "").strip() + child_rows = province.get("child") or [] + + # 常规省份 child 是地级市;直辖市 child 是区县,此时使用省级 id 抓取 + if child_rows and any((row.get("child") or []) for row in child_rows): + for city in child_rows: + area_id = str(city.get("id") or "").strip() + city_name = str(city.get("name") or "").strip() + city_py = str(city.get("py") or "").strip() + if not area_id or not city_name: continue - print("请求失败: 403 Forbidden") - return None - if status_code >= 400: - raise RequestClientError(f"{status_code} Error") - return text - except RequestClientError as exc: - print(f"请求失败: {exc}") - return None - return None + if area_id in seen_area: + continue + seen_area.add(area_id) + targets.append( + CityTarget( + area_id=area_id, + province_id=province_id, + province_name=province_name, + province_py=province_py, + city_name=city_name, + city_py=city_py, + ) + ) + else: + if not province_id or not province_name: + continue + if province_id in seen_area: + continue + seen_area.add(province_id) + targets.append( + CityTarget( + area_id=province_id, + province_id=province_id, + province_name=province_name, + province_py=province_py, + city_name=province_name, + city_py=province_py, + ) + ) - def _build_payload(self, city_code: str, page: int) -> Dict[str, str]: + return targets + + def _build_payload(self, area_id: str, page: int) -> Dict[str, str]: + ua = self.client.headers.get("User-Agent", "") return { "AdCode": "", - "RegionId": str(city_code), + "RegionId": str(area_id), "CategoryId": "", "MaxNumber": "", "OnlyData": "true", "IgnoreButton": "", - "LawyerRecommendRequest[AreaId]": str(city_code), + "LawyerRecommendRequest[AreaId]": str(area_id), "LawyerRecommendRequest[LawCategoryIds]": 
"", "LawyerRecommendRequest[LawFirmPersonCount]": "", "LawyerRecommendRequest[LawFirmScale]": "", @@ -171,162 +268,429 @@ class Six4365Spider: "LawyerRecommendRequest[RefferUrl]": "", "LawyerRecommendRequest[RequestUrl]": "https://m.64365.com/findlawyer/", "LawyerRecommendRequest[resource_type_name]": "", - "LawyerRecommendRequest[UserAgent]": self.client.headers["User-Agent"], + "LawyerRecommendRequest[UserAgent]": ua, "LawyerRecommendRequest[AddLawyerWithNoData]": "false", "ShowCaseButton": "true", } - def _parse_list(self, html: str, province: str, city: str) -> int: - soup = BeautifulSoup(html, "html.parser") - lawyers = soup.find_all("a", class_="lawyer") - if not lawyers: - return 0 + def fetch_list_html(self, target: CityTarget, page: int) -> str: + payload = self._build_payload(target.area_id, page) + return self._post_text( + LIST_API_URL, + data=payload, + referer=SITE_BASE + "/findlawyer/", + ) - detail_urls: List[str] = [] - for lawyer in lawyers: - href = lawyer.get("href") + def parse_list_cards(self, html: str) -> List[ListCard]: + soup = BeautifulSoup(html, "html.parser") + cards: List[ListCard] = [] + seen: Set[str] = set() + + for anchor in soup.select("a.lawyer[href]"): + href = (anchor.get("href") or "").strip() if not href: continue - detail_urls.append(f"{href.rstrip('/')}/info/") - - if not detail_urls: - return 0 - - results: List[Dict[str, str]] = [] - with ThreadPoolExecutor(max_workers=self.max_workers) as ex: - futs = [ex.submit(self._parse_detail, u, province, city) for u in detail_urls] - for fut in as_completed(futs): - try: - data = fut.result() - except Exception as exc: - print(f" 详情解析异常: {exc}") - continue - if data: - results.append(data) - - if not results: - return len(detail_urls) - - existing = self._existing_urls([r.get("url", "") for r in results if r.get("url")]) - for data in results: - if not data: + detail_url = urljoin(SITE_BASE, href) + if detail_url in seen: continue - url = data.get("url", "") - if not url: - continue - if url in existing: - print(f" -- 已存在URL: {url}") - continue - try: - self.db.insert_data("lawyer", data) - print(f" -> 新增: {data['name']} ({data['phone']})") - except Exception as exc: - print(f" 插入失败 {url}: {exc}") + seen.add(detail_url) - return len(detail_urls) + name = "" + name_tag = anchor.select_one("b.name") + if name_tag: + name = name_tag.get_text(strip=True) - def _parse_detail(self, url: str, province: str, city: str) -> Optional[Dict[str, str]]: - html = self._get_detail(url) - if not html: - return None + specialties: List[str] = [] + skill_tag = anchor.select_one("div.skill") + if skill_tag: + raw = skill_tag.get_text(" ", strip=True).replace("擅长:", "") + specialties = [x.strip() for x in re.split(r"[、,,]", raw) if x.strip()] + score_text = "" + score_tag = anchor.select_one("div.info span[title='评分'] em") + if score_tag: + score_text = score_tag.get_text(strip=True) + + service_text = "" + service_tag = anchor.select_one("div.info") + if service_tag: + service_text = service_tag.get_text(" ", strip=True) + + cards.append( + ListCard( + detail_url=detail_url, + name=name, + specialties=specialties, + score_text=score_text, + service_text=service_text, + ) + ) + + return cards + + def parse_detail(self, detail_url: str) -> Dict: + info_url = detail_url.rstrip("/") + "/info/" + html = self._get_text(info_url, referer=detail_url) soup = BeautifulSoup(html, "html.parser") - base_info = soup.find("ul", class_="intro-basic-bar") - if not base_info: - return None name = "" law_firm = "" phone = "" + practice_years: 
Optional[int] = None + office_area = "" + address = "" + specialties: List[str] = [] - for li in base_info.find_all("li"): - label = li.find("span", class_="label") - txt = li.find("div", class_="txt") - if not label or not txt: + for li in soup.select("ul.intro-basic-bar li"): + label_tag = li.select_one("span.label") + value_tag = li.select_one("div.txt") + if not label_tag or not value_tag: continue - label_text = label.get_text(strip=True) - if "姓名" in label_text: - name = txt.get_text(strip=True) - if "执业律所" in label_text: - law_firm = txt.get_text(strip=True) - more_section = soup.find("div", class_="more-intro-basic") - if more_section: - phone_ul = more_section.find("ul", class_="intro-basic-bar") - if phone_ul: - for li in phone_ul.find_all("li"): - label = li.find("span", class_="label") - txt = li.find("div", class_="txt") - if label and txt and "联系电话" in label.get_text(strip=True): - phone = txt.get_text(strip=True).replace(" ", "") - break + label = label_tag.get_text(" ", strip=True).replace(":", "") + value = value_tag.get_text(" ", strip=True) - phone = phone.replace('-', '').strip() - if not name or not phone: + if "姓名" in label and not name: + name = value + elif "执业律所" in label and not law_firm: + law_firm = value + elif "联系电话" in label and not phone: + phone = normalize_phone(value) + elif "执业年限" in label and practice_years is None: + year_match = YEAR_RE.search(value) + if year_match: + try: + practice_years = int(year_match.group(1)) + except Exception: + practice_years = None + elif "办公地区" in label and not office_area: + office_area = value + elif "办公地址" in label and not address: + address = value + + text = soup.get_text(" ", strip=True) + if not phone: + phone = normalize_phone(text) + + if not name and soup.title: + title = soup.title.get_text(" ", strip=True) + match = re.search(r"([^\s_,,。]+?)律师", title) + if match: + name = match.group(1).strip() + + skill_match = re.search(r"擅长:([^\n]+)", text) + if skill_match: + specialties = [x.strip() for x in re.split(r"[、,,]", skill_match.group(1)) if x.strip()] + + return { + "name": name, + "law_firm": law_firm, + "phone": phone, + "practice_years": practice_years, + "office_area": office_area, + "address": address, + "specialties": specialties, + "detail_url": detail_url, + "info_url": info_url, + } + + def crawl_city(self, target: CityTarget) -> Iterable[Dict]: + seen_detail_urls: Set[str] = set() + page_first_seen: Set[str] = set() + + for page in range(1, self.max_pages + 1): + try: + html = self.fetch_list_html(target, page) + except Exception as exc: + print(f"[list] 失败 area={target.area_id} p{page}: {exc}") + break + + cards = self.parse_list_cards(html) + if not cards: + break + + first_url = cards[0].detail_url + if first_url in page_first_seen: + break + page_first_seen.add(first_url) + + for card in cards: + if card.detail_url in seen_detail_urls: + continue + seen_detail_urls.add(card.detail_url) + + try: + detail = self.parse_detail(card.detail_url) + except Exception as exc: + print(f"[detail] 失败 {card.detail_url}: {exc}") + continue + + now = int(time.time()) + uid_match = re.search(r"/lawyer/(\d+)/", card.detail_url) + uid = uid_match.group(1) if uid_match else card.detail_url + record_id = hashlib.md5(uid.encode("utf-8")).hexdigest() + + yield { + "record_id": record_id, + "collected_at": now, + "source": { + "site": SITE_NAME, + "province_id": target.province_id, + "province": target.province_name, + "province_py": target.province_py, + "area_id": target.area_id, + "city": target.city_name, + 
"city_py": target.city_py, + "page": page, + "detail_url": card.detail_url, + "info_url": detail.get("info_url", ""), + }, + "list_snapshot": { + "name": card.name, + "specialties": card.specialties, + "score_text": card.score_text, + "service_text": card.service_text, + }, + "profile": { + "name": detail.get("name") or card.name, + "law_firm": detail.get("law_firm") or "", + "phone": detail.get("phone") or "", + "practice_years": detail.get("practice_years"), + "office_area": detail.get("office_area") or "", + "address": detail.get("address") or "", + "specialties": detail.get("specialties") or card.specialties, + }, + } + + if self.sleep_seconds: + time.sleep(self.sleep_seconds) + + def _to_legacy_lawyer_row(self, record: Dict) -> Optional[Dict[str, str]]: + source = record.get("source", {}) or {} + profile = record.get("profile", {}) or {} + + phone = normalize_phone(profile.get("phone", "")) + if not phone: return None - data = { - "phone": phone, + province = (source.get("province") or "").strip() + city = (source.get("city") or province).strip() + return { + "name": (profile.get("name") or "").strip(), + "law_firm": (profile.get("law_firm") or "").strip(), "province": province, "city": city, - "law_firm": law_firm, - "url": url, - "domain": DOMAIN, - "name": name, - "create_time": int(time.time()), - "params": json.dumps({"province": province, "city": city}, ensure_ascii=False) + "phone": phone, + "url": (source.get("info_url") or source.get("detail_url") or "").strip(), + "domain": LEGACY_DOMAIN, + "create_time": int(record.get("collected_at") or time.time()), + "params": json.dumps(record, ensure_ascii=False), } - return data - def _get_detail(self, url: str, max_retries: int = 3) -> Optional[str]: - session = self._get_thread_session() - for attempt in range(max_retries): + def _existing_phones_in_db(self, phones: List[str]) -> Set[str]: + if not self.db or not phones: + return set() + + deduped = sorted({p for p in phones if p}) + if not deduped: + return set() + + existing: Set[str] = set() + cur = self.db.db.cursor() + try: + chunk_size = 500 + for i in range(0, len(deduped), chunk_size): + chunk = deduped[i:i + chunk_size] + placeholders = ",".join(["%s"] * len(chunk)) + sql = f"SELECT phone FROM lawyer WHERE domain=%s AND phone IN ({placeholders})" + cur.execute(sql, [LEGACY_DOMAIN, *chunk]) + for row in cur.fetchall(): + existing.add(row[0]) + finally: + cur.close() + + return existing + + def _write_records_to_db(self, records: List[Dict]) -> Tuple[int, int]: + if not self.db: + return 0, 0 + + rows: List[Dict[str, str]] = [] + for record in records: + row = self._to_legacy_lawyer_row(record) + if row: + rows.append(row) + if not rows: + return 0, 0 + + existing = self._existing_phones_in_db([row["phone"] for row in rows]) + inserted = 0 + skipped = 0 + + for row in rows: + phone = row.get("phone", "") + if not phone or phone in existing: + skipped += 1 + continue try: - resp = session.get_text(url, timeout=10, verify=False) - status_code = resp.status_code - text = resp.text - if status_code == 403: - if attempt < max_retries - 1: - wait_time = 2 ** attempt + random.uniform(0.3, 1.0) - print(f" 403被拦截,{wait_time}秒后重试 ({attempt + 1}/{max_retries})") - self._refresh_thread_session() - session = self._get_thread_session() - time.sleep(wait_time) + self.db.insert_data("lawyer", row) + existing.add(phone) + inserted += 1 + except Exception as exc: + skipped += 1 + print(f"[db] 插入失败 phone={phone} url={row.get('url', '')}: {exc}") + + return inserted, skipped + + def crawl( + 
self, + output_path: str, + max_cities: int = 0, + city_filter: Optional[str] = None, + ) -> None: + cities = self.discover_cities() + print(f"[discover] 共发现地区 {len(cities)} 个") + + if city_filter: + key = city_filter.strip().lower() + cities = [ + c for c in cities + if key in c.city_name.lower() or key in c.city_py.lower() or key in c.area_id + ] + print(f"[discover] 过滤后地区 {len(cities)} 个, filter={city_filter}") + + if max_cities > 0: + cities = cities[:max_cities] + print(f"[discover] 截断地区数 {len(cities)}") + + os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True) + + seen_ids: Set[str] = set() + if os.path.exists(output_path): + with open(output_path, "r", encoding="utf-8") as old_file: + for line in old_file: + line = line.strip() + if not line: continue - print(" 请求失败: 403 Forbidden") - return None - if status_code >= 400: - raise RequestClientError(f"{status_code} Error") - return text - except RequestClientError as exc: - print(f" 请求失败: {exc}") - return None - return None + try: + item = json.loads(line) + except Exception: + continue + rid = item.get("record_id") + if rid: + seen_ids.add(rid) + print(f"[resume] 已有记录 {len(seen_ids)} 条") - def run(self): - print("启动律图采集...") - if not self.cities: - print("无城市数据") - return + total_new_json = 0 + total_new_db = 0 + total_skip_db = 0 - for city_code, info in self.cities.items(): - province = info.get("province_name", "") - city = info.get("name", "") - print(f"采集 {province}-{city}") - page = 1 - while True: - payload = self._build_payload(city_code, page) - html = self._post(payload) - if not html: - break - link_count = self._parse_list(html, province, city) - if link_count == 0: - break - page += 1 - print("律图采集完成") + with open(output_path, "a", encoding="utf-8") as out: + for idx, target in enumerate(cities, start=1): + print( + f"[city {idx}/{len(cities)}] {target.province_name}-{target.city_name} " + f"(area={target.area_id})" + ) + city_records = list(self.crawl_city(target)) + + city_new_json = 0 + for record in city_records: + rid = record["record_id"] + if rid in seen_ids: + continue + out.write(json.dumps(record, ensure_ascii=False) + "\n") + seen_ids.add(rid) + city_new_json += 1 + total_new_json += 1 + + city_new_db, city_skip_db = self._write_records_to_db(city_records) + total_new_db += city_new_db + total_skip_db += city_skip_db + + print( + f"[city] 采集{len(city_records)}条, JSON新增{city_new_json}条, " + f"DB新增{city_new_db}条, DB跳过{city_skip_db}条" + ) + + print( + f"[done] JSON新增{total_new_json}条, DB新增{total_new_db}条, " + f"DB跳过{total_skip_db}条, 输出: {output_path}" + ) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="律图全新采集脚本(站点数据直采)") + parser.add_argument( + "--output", + default="/www/wwwroot/lawyers/data/six4365_records_all.jsonl", + help="输出 jsonl 文件路径", + ) + parser.add_argument( + "--max-cities", + type=int, + default=0, + help="最多采集多少个地区,0 表示不限", + ) + parser.add_argument( + "--max-pages", + type=int, + default=9999, + help="每个地区最多采集多少页", + ) + parser.add_argument( + "--city-filter", + default="", + help="按城市名称/拼音/编码过滤", + ) + parser.add_argument( + "--sleep", + type=float, + default=0.1, + help="详情页请求间隔秒数", + ) + parser.add_argument( + "--direct", + action="store_true", + help="直连模式,不使用 proxy_settings.json 代理", + ) + parser.add_argument( + "--no-db", + action="store_true", + help="只输出 JSONL,不写入数据库", + ) + return parser.parse_args() + + +def main(): + args = parse_args() + + if args.no_db: + crawler = Six4365Crawler( + max_pages=args.max_pages, + 
sleep_seconds=args.sleep, + use_proxy=not args.direct, + db_connection=None, + ) + crawler.crawl( + output_path=args.output, + max_cities=args.max_cities, + city_filter=args.city_filter or None, + ) + return + + with Db() as db: + crawler = Six4365Crawler( + max_pages=args.max_pages, + sleep_seconds=args.sleep, + use_proxy=not args.direct, + db_connection=db, + ) + crawler.crawl( + output_path=args.output, + max_cities=args.max_cities, + city_filter=args.city_filter or None, + ) if __name__ == "__main__": - with Db() as db: - spider = Six4365Spider(db) - spider.run() + main() diff --git a/common_sites/start.sh b/common_sites/start.sh index e8f1ede..9f849b8 100755 --- a/common_sites/start.sh +++ b/common_sites/start.sh @@ -1,13 +1,80 @@ #!/usr/bin/env bash set -euo pipefail -# 切换到脚本所在目录,确保相对路径正确 -cd "$(dirname "$0")" +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +PROJECT_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)" +LOG_DIR="${PROJECT_ROOT}/logs" +DATA_DIR="${PROJECT_ROOT}/data" -echo "使用 request/proxy_settings.json 读取代理配置" +mkdir -p "${LOG_DIR}" "${DATA_DIR}" -nohup python3 dls.py > dls.log 2>&1 & # 大律师 -nohup python3 findlaw.py > findlaw.log 2>&1 & # 找法网 -nohup python3 lawtime.py > lawtime.log 2>&1 & # 法律快车 -nohup python3 six4365.py > six4365.log 2>&1 & # 律图 -nohup python3 hualv.py > hualv.log 2>&1 & # 华律 +if [[ -x "${PROJECT_ROOT}/.venv/bin/python" ]]; then + PYTHON_BIN="${PROJECT_ROOT}/.venv/bin/python" +else + PYTHON_BIN="python3" +fi + +RUN_MODE="${RUN_MODE:-parallel}" # parallel | sequential + +echo "[start] project=${PROJECT_ROOT}" +echo "[start] python=${PYTHON_BIN}" +echo "[start] mode=${RUN_MODE}" +echo "[start] proxy=request/proxy_settings.json" + +# 大律师(新结构采集 + 写库)可通过环境变量控制 +DLS_OUTPUT="${DLS_OUTPUT:-${DATA_DIR}/dls_records.jsonl}" +DLS_MAX_CITIES="${DLS_MAX_CITIES:-0}" +DLS_MAX_PAGES="${DLS_MAX_PAGES:-0}" +DLS_SLEEP="${DLS_SLEEP:-0.2}" +DLS_CITY_FILTER="${DLS_CITY_FILTER:-}" +DLS_EXTRA_ARGS=() + +if [[ "${DLS_MAX_CITIES}" != "0" ]]; then + DLS_EXTRA_ARGS+=(--max-cities "${DLS_MAX_CITIES}") +fi +if [[ "${DLS_MAX_PAGES}" != "0" ]]; then + DLS_EXTRA_ARGS+=(--max-pages "${DLS_MAX_PAGES}") +fi +if [[ -n "${DLS_CITY_FILTER}" ]]; then + DLS_EXTRA_ARGS+=(--city-filter "${DLS_CITY_FILTER}") +fi +DLS_EXTRA_ARGS+=(--sleep "${DLS_SLEEP}" --output "${DLS_OUTPUT}") + +if [[ "${DLS_DIRECT:-0}" == "1" ]]; then + DLS_EXTRA_ARGS+=(--direct) +fi +if [[ "${DLS_NO_DB:-0}" == "1" ]]; then + DLS_EXTRA_ARGS+=(--no-db) +fi + +run_bg() { + local name="$1" + shift + local logfile="${LOG_DIR}/${name}.log" + nohup env PYTHONUNBUFFERED=1 "$@" > "${logfile}" 2>&1 & + echo "[start] ${name} pid=$! 
log=${logfile}" +} + +run_fg() { + local name="$1" + shift + local logfile="${LOG_DIR}/${name}.log" + echo "[start] ${name} fg log=${logfile}" + env PYTHONUNBUFFERED=1 "$@" > "${logfile}" 2>&1 +} + +if [[ "${RUN_MODE}" == "sequential" ]]; then + run_fg "dls_fresh" "${PYTHON_BIN}" "${SCRIPT_DIR}/dls_fresh.py" "${DLS_EXTRA_ARGS[@]}" + run_fg "findlaw" "${PYTHON_BIN}" "${SCRIPT_DIR}/findlaw.py" + run_fg "lawtime" "${PYTHON_BIN}" "${SCRIPT_DIR}/lawtime.py" + run_fg "six4365" "${PYTHON_BIN}" "${SCRIPT_DIR}/six4365.py" + run_fg "hualv" "${PYTHON_BIN}" "${SCRIPT_DIR}/hualv.py" + echo "[done] sequential completed" +else + run_bg "dls_fresh" "${PYTHON_BIN}" "${SCRIPT_DIR}/dls_fresh.py" "${DLS_EXTRA_ARGS[@]}" + run_bg "findlaw" "${PYTHON_BIN}" "${SCRIPT_DIR}/findlaw.py" + run_bg "lawtime" "${PYTHON_BIN}" "${SCRIPT_DIR}/lawtime.py" + run_bg "six4365" "${PYTHON_BIN}" "${SCRIPT_DIR}/six4365.py" + run_bg "hualv" "${PYTHON_BIN}" "${SCRIPT_DIR}/hualv.py" + echo "[done] all crawlers started in background" +fi diff --git a/request/requests_client.py b/request/requests_client.py index c9252c0..c496897 100644 --- a/request/requests_client.py +++ b/request/requests_client.py @@ -51,6 +51,7 @@ class RequestsClient: self, headers: Optional[Mapping[str, str]] = None, *, + use_proxy: bool = True, retry_total: int = 0, retry_backoff_factor: float = 0.0, retry_status_forcelist: Optional[Iterable[int]] = None, @@ -58,6 +59,7 @@ class RequestsClient: default_timeout: Optional[TimeoutType] = None, ) -> None: self._base_headers: Dict[str, str] = dict(headers or {}) + self.use_proxy = bool(use_proxy) self.retry_total = int(retry_total) self.retry_backoff_factor = float(retry_backoff_factor) self.retry_status_forcelist = tuple(retry_status_forcelist or ()) @@ -67,8 +69,13 @@ class RequestsClient: def _build_session(self) -> requests.Session: session = requests.Session() - # 统一从 proxy_settings.json 注入代理,并屏蔽系统环境代理干扰 - apply_proxy(session) + if self.use_proxy: + # 统一从 proxy_settings.json 注入代理,并屏蔽系统环境代理干扰 + apply_proxy(session) + else: + # 强制直连:不读取环境代理,不走配置文件代理 + session.trust_env = False + session.proxies.clear() if self.retry_total > 0: # 适配器级重试:主要处理连接波动与指定状态码的瞬时失败 retries = Retry( @@ -109,6 +116,7 @@ class RequestsClient: # 线程场景建议 clone:复用同配置,但使用独立连接池 clone_client = RequestsClient( headers=dict(self.headers), + use_proxy=self.use_proxy, retry_total=self.retry_total, retry_backoff_factor=self.retry_backoff_factor, retry_status_forcelist=self.retry_status_forcelist, diff --git a/requirements.txt b/requirements.txt index 61d3241..6a3fec7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ requests>=2.28.0 beautifulsoup4>=4.11.0 urllib3>=1.26.0 lxml>=4.9.0 +openpyxl>=3.1.0