"""Crawler for lawyer listings on m.64365.com.

Discovers the site's regions, pages through the lawyer recommendation list of
each region, fetches every lawyer's ``/info/`` page, appends the results to a
JSONL file and (optionally) inserts them into the legacy ``lawyer`` table.
"""

import argparse
import hashlib
import json
import os
import random
import re
import sys
import time
from contextlib import nullcontext
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional, Set, Tuple
from urllib.parse import urljoin

import urllib3
from bs4 import BeautifulSoup

# Make the project root and its "request" directory importable before the
# project-local imports below are resolved.
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
request_dir = os.path.join(project_root, "request")
if request_dir not in sys.path:
    sys.path.insert(0, request_dir)
if project_root not in sys.path:
    sys.path.append(project_root)

from Db import Db
from request.requests_client import RequestClientError, RequestsClient
from utils.rate_limiter import wait_for_request

# Requests are sent with verify=False, so silence the resulting warnings.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

SITE_NAME = "64365"
LEGACY_DOMAIN = "律图"
SITE_BASE = "https://m.64365.com"
AREA_DATA_URL = "https://image.64365.com/ui_v3/m/js/public/area-cate-data.js"
LIST_API_URL = "https://m.64365.com/findLawyer/rpc/FindLawyer/LawyerRecommend/"

PHONE_RE = re.compile(r"1[3-9]\d{9}")
YEAR_RE = re.compile(r"(\d+)\s*年")

# A mobile number standing on its own inside free text: not glued to other
# digits, optionally prefixed with 86/+86 and grouped 3-4-4 by spaces/hyphens.
_TEXT_PHONE_RE = re.compile(
    r"(?<!\d)(?:\+?86[ -]?)?(1[3-9]\d[ -]?\d{4}[ -]?\d{4})(?!\d)"
)
# Separators between specialties: ideographic comma, ASCII comma and the
# fullwidth comma (U+FF0C, written as an escape so it survives re-encoding).
_SPECIALTY_SPLIT_RE = re.compile(r"[、,\uff0c]")


@dataclass
class CityTarget:
    """One region whose lawyer list is crawled."""

    area_id: str  # id sent to the list API (a city id, or the province id)
    province_id: str
    province_name: str
    province_py: str  # value of the "py" key in the area data
    city_name: str
    city_py: str


@dataclass
class ListCard:
    """Fields scraped from one lawyer card on a list page."""

    detail_url: str
    name: str
    specialties: List[str]
    score_text: str
    service_text: str


def normalize_phone(text: str) -> str:
    """Return the first mainland mobile number found in *text*, or ``""``.

    All non-digit characters are removed before matching, so this is meant for
    short values that hold a single (possibly formatted) phone number. Do not
    feed it long free text: unrelated digits would be joined together.
    """
    compact = re.sub(r"\D", "", text or "")
    match = PHONE_RE.search(compact)
    return match.group(0) if match else ""


def _find_phone_in_text(text: str) -> str:
    """Return the first standalone mobile number inside free text, or ``""``.

    Unlike ``normalize_phone`` the digits must appear together in the text, so
    unrelated numbers on a page can never be concatenated into a fake phone.
    """
    match = _TEXT_PHONE_RE.search(text or "")
    return normalize_phone(match.group(1)) if match else ""


def _split_specialties(raw: str) -> List[str]:
    """Split a separator-delimited specialty string into stripped, non-empty items."""
    return [part.strip() for part in _SPECIALTY_SPLIT_RE.split(raw or "") if part.strip()]


class Six4365Crawler:
    """Crawls lawyer records from m.64365.com region by region."""

    def __init__(
        self,
        max_pages: int = 9999,
        sleep_seconds: float = 0.1,
        use_proxy: bool = True,
        db_connection=None,
    ):
        """Configure the crawler.

        Args:
            max_pages: Maximum number of list pages fetched per region.
            sleep_seconds: Pause after each yielded record; negative values
                are treated as 0.
            use_proxy: Forwarded to ``RequestsClient``.
            db_connection: Database wrapper used for legacy inserts, or
                ``None`` to skip all database work.
        """
        self.max_pages = max_pages
        self.sleep_seconds = max(0.0, sleep_seconds)
        self.db = db_connection
        self.client = RequestsClient(
            headers={
                "User-Agent": (
                    "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) "
                    "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 "
                    "Mobile/15E148 Safari/604.1"
                ),
                "Accept": "text/html, */*; q=0.01",
                "Connection": "close",
            },
            use_proxy=use_proxy,
            retry_total=2,
            retry_backoff_factor=1,
            retry_status_forcelist=(429, 500, 502, 503, 504),
            retry_allowed_methods=("GET", "POST"),
        )

    @staticmethod
    def _backoff(attempt: int) -> None:
        """Sleep with exponential backoff plus jitter before the next attempt."""
        time.sleep((2 ** attempt) + random.uniform(0.2, 0.8))

    def _request_text(
        self,
        method: str,
        url: str,
        *,
        timeout: int = 20,
        max_retries: int = 3,
        referer: str = SITE_BASE,
        data: Optional[Dict] = None,
    ) -> str:
        """Perform a GET or POST and return the response body as text.

        Transport errors and the status codes 403, 429 and 5xx are retried with
        exponential backoff (the client is refreshed after a 403). Any other
        status >= 400 is treated as permanent and raised immediately.

        Args:
            method: ``"POST"`` (case-insensitive) posts *data*; anything else
                performs a GET.
            url: Target URL.
            timeout: Per-request timeout passed to the client.
            max_retries: Total number of attempts; values below 1 count as 1.
            referer: Value of the ``Referer`` header.
            data: Form payload for POST requests.

        Returns:
            The response text of the first attempt answered with a status
            below 400.

        Raises:
            RequestClientError: On a non-retryable status, or when the last
                attempt still returns an error status.
            Exception: Whatever the client raised on the last attempt.
        """
        headers = {"Referer": referer}
        is_post = method.upper() == "POST"
        attempts = max(1, max_retries)

        for attempt in range(attempts):
            is_last = attempt == attempts - 1
            # Imported from utils.rate_limiter; presumably throttles globally.
            wait_for_request()
            try:
                if is_post:
                    resp = self.client.post_text(
                        url,
                        timeout=timeout,
                        verify=False,
                        headers=headers,
                        data=data,
                    )
                else:
                    resp = self.client.get_text(
                        url,
                        timeout=timeout,
                        verify=False,
                        headers=headers,
                    )
            except Exception:
                # Only failures of the request itself are retried here; status
                # handling lives outside this try so that a deliberate raise
                # for a permanent error is not swallowed and retried.
                if is_last:
                    raise
                self._backoff(attempt)
                continue

            code = resp.status_code
            if code < 400:
                return resp.text

            retryable = code in (403, 429) or code >= 500
            if is_last or not retryable:
                raise RequestClientError(f"{code} Error: {url}")

            if code == 403:
                # Best effort: a failing refresh must not abort the retry loop.
                try:
                    self.client.refresh()
                except Exception as exc:
                    print(f"[request] client.refresh() 失败: {exc}")
            self._backoff(attempt)

        # Not reachable (the loop returns or raises); kept as a safety net.
        raise RequestClientError(f"Unknown request error: {url}")

    def _get_text(self, url: str, *, timeout: int = 20, max_retries: int = 3, referer: str = SITE_BASE) -> str:
        """GET *url* through ``_request_text`` and return the body text."""
        return self._request_text(
            "GET",
            url,
            timeout=timeout,
            max_retries=max_retries,
            referer=referer,
        )

    def _post_text(
        self,
        url: str,
        *,
        data: Dict,
        timeout: int = 20,
        max_retries: int = 3,
        referer: str = SITE_BASE,
    ) -> str:
        """POST *data* to *url* through ``_request_text`` and return the body text."""
        return self._request_text(
            "POST",
            url,
            timeout=timeout,
            max_retries=max_retries,
            referer=referer,
            data=data,
        )

    def _extract_area_data(self, text: str) -> List[Dict]:
        """Extract the ``lvtuData.areaData`` array from the area script.

        Returns an empty list when the assignment is not found, the array is
        not valid JSON, or the decoded value is not a list.
        """
        match = re.search(
            r"lvtuData\.areaData\s*=\s*(\[[\s\S]*?\])\s*;\s*lvtuData\.categroyData",
            text,
            re.S,
        )
        if not match:
            return []
        try:
            data = json.loads(match.group(1))
        except ValueError:  # json.JSONDecodeError is a ValueError
            return []
        return data if isinstance(data, list) else []

    def discover_cities(self) -> List[CityTarget]:
        """Download the area data and return the list of regions to crawl.

        Each area id appears at most once; entries lacking an id or a name are
        skipped.
        """
        text = self._get_text(AREA_DATA_URL, referer=SITE_BASE + "/findlawyer/")
        provinces = self._extract_area_data(text)

        targets: List[CityTarget] = []
        seen_area: Set[str] = set()

        def field(row: Dict, key: str) -> str:
            return str(row.get(key) or "").strip()

        def add_target(province_row: Dict, area_row: Dict) -> None:
            area_id = field(area_row, "id")
            area_name = field(area_row, "name")
            if not area_id or not area_name or area_id in seen_area:
                return
            seen_area.add(area_id)
            targets.append(
                CityTarget(
                    area_id=area_id,
                    province_id=field(province_row, "id"),
                    province_name=field(province_row, "name"),
                    province_py=field(province_row, "py"),
                    city_name=area_name,
                    city_py=field(area_row, "py"),
                )
            )

        for province in provinces:
            if not isinstance(province, dict):
                continue
            child_rows = [
                row for row in (province.get("child") or []) if isinstance(row, dict)
            ]
            # For regular provinces the children are prefecture-level cities;
            # for municipalities the children are districts, in which case the
            # province-level id is used for crawling.
            if any(row.get("child") for row in child_rows):
                for city in child_rows:
                    add_target(province, city)
            else:
                add_target(province, province)
        return targets

    def _build_payload(self, area_id: str, page: int) -> Dict[str, str]:
        """Build the form payload of the list API for *area_id* and *page*."""
        ua = self.client.headers.get("User-Agent", "")
        return {
            "AdCode": "",
            "RegionId": str(area_id),
            "CategoryId": "",
            "MaxNumber": "",
            "OnlyData": "true",
            "IgnoreButton": "",
            "LawyerRecommendRequest[AreaId]": str(area_id),
            "LawyerRecommendRequest[LawCategoryIds]": "",
            "LawyerRecommendRequest[LawFirmPersonCount]": "",
            "LawyerRecommendRequest[LawFirmScale]": "",
            "LawyerRecommendRequest[OrderType]": "0",
            "LawyerRecommendRequest[PageIndex]": str(page),
            "LawyerRecommendRequest[PageSize]": "10",
            "LawyerRecommendRequest[TagId]": "",
            "LawyerRecommendRequest[Type]": "1",
            "LawyerRecommendRequest[AccountType]": "",
            "LawyerRecommendRequest[AddLawyer]": "true",
            "LawyerRecommendRequest[Content]": "",
            "LawyerRecommendRequest[Duty]": "",
            "LawyerRecommendRequest[ExcludeLawyerIds][]": "",
            "LawyerRecommendRequest[RefferUrl]": "",
            "LawyerRecommendRequest[RequestUrl]": "https://m.64365.com/findlawyer/",
            "LawyerRecommendRequest[resource_type_name]": "",
            "LawyerRecommendRequest[UserAgent]": ua,
            "LawyerRecommendRequest[AddLawyerWithNoData]": "false",
            "ShowCaseButton": "true",
        }

    def fetch_list_html(self, target: CityTarget, page: int) -> str:
        """POST the list request for one page of *target* and return the HTML."""
        payload = self._build_payload(target.area_id, page)
        return self._post_text(
            LIST_API_URL,
            data=payload,
            referer=SITE_BASE + "/findlawyer/",
        )

    def parse_list_cards(self, html: str) -> List[ListCard]:
        """Parse the lawyer cards of one list page.

        Every ``a.lawyer[href]`` anchor yields one card; anchors resolving to
        a detail URL already seen on this page are skipped.
        """
        soup = BeautifulSoup(html, "html.parser")
        cards: List[ListCard] = []
        seen: Set[str] = set()

        for anchor in soup.select("a.lawyer[href]"):
            href = (anchor.get("href") or "").strip()
            if not href:
                continue
            detail_url = urljoin(SITE_BASE, href)
            if detail_url in seen:
                continue
            seen.add(detail_url)

            name_tag = anchor.select_one("b.name")
            name = name_tag.get_text(strip=True) if name_tag else ""

            specialties: List[str] = []
            skill_tag = anchor.select_one("div.skill")
            if skill_tag:
                raw = skill_tag.get_text(" ", strip=True).replace("擅长:", "")
                specialties = _split_specialties(raw)

            score_tag = anchor.select_one("div.info span[title='评分'] em")
            score_text = score_tag.get_text(strip=True) if score_tag else ""

            service_tag = anchor.select_one("div.info")
            service_text = service_tag.get_text(" ", strip=True) if service_tag else ""

            cards.append(
                ListCard(
                    detail_url=detail_url,
                    name=name,
                    specialties=specialties,
                    score_text=score_text,
                    service_text=service_text,
                )
            )
        return cards

    def parse_detail(self, detail_url: str) -> Dict:
        """Fetch ``<detail_url>/info/`` and extract the lawyer profile.

        Returns:
            A dict with the keys ``name``, ``law_firm``, ``phone``,
            ``practice_years`` (``int`` or ``None``), ``office_area``,
            ``address``, ``specialties``, ``detail_url`` and ``info_url``.
            Fields that are not found stay empty.

        Raises:
            Exception: Whatever ``_get_text`` raises when the page cannot be
                fetched.
        """
        info_url = detail_url.rstrip("/") + "/info/"
        html = self._get_text(info_url, referer=detail_url)
        soup = BeautifulSoup(html, "html.parser")

        name = ""
        law_firm = ""
        phone = ""
        practice_years: Optional[int] = None
        office_area = ""
        address = ""
        specialties: List[str] = []

        # The first occurrence of each labelled row wins.
        for li in soup.select("ul.intro-basic-bar li"):
            label_tag = li.select_one("span.label")
            value_tag = li.select_one("div.txt")
            if not label_tag or not value_tag:
                continue
            label = label_tag.get_text(" ", strip=True).replace(":", "")
            value = value_tag.get_text(" ", strip=True)

            if "姓名" in label and not name:
                name = value
            elif "执业律所" in label and not law_firm:
                law_firm = value
            elif "联系电话" in label and not phone:
                phone = normalize_phone(value)
            elif "执业年限" in label and practice_years is None:
                year_match = YEAR_RE.search(value)
                if year_match:
                    try:
                        practice_years = int(year_match.group(1))
                    except ValueError:
                        practice_years = None
            elif "办公地区" in label and not office_area:
                office_area = value
            elif "办公地址" in label and not address:
                address = value

        text = soup.get_text(" ", strip=True)

        if not phone:
            # Search the raw page text for a standalone number. Stripping all
            # non-digits from the whole page first would merge unrelated
            # numbers into a phone that does not exist.
            phone = _find_phone_in_text(text)

        if not name and soup.title:
            title = soup.title.get_text(" ", strip=True)
            match = re.search(r"([^\s_,\uff0c。]+?)律师", title)
            if match:
                name = match.group(1).strip()

        # NOTE(review): the capture runs to the next newline of the flattened
        # page text, so it may include trailing unrelated text - confirm
        # against a real page.
        skill_match = re.search(r"擅长:([^\n]+)", text)
        if skill_match:
            specialties = _split_specialties(skill_match.group(1))

        return {
            "name": name,
            "law_firm": law_firm,
            "phone": phone,
            "practice_years": practice_years,
            "office_area": office_area,
            "address": address,
            "specialties": specialties,
            "detail_url": detail_url,
            "info_url": info_url,
        }

    def crawl_city(self, target: CityTarget) -> Iterable[Dict]:
        """Yield one record per lawyer found in *target*.

        Paging stops at ``max_pages``, on a list request failure, on an empty
        page, or when a page starts with a card that already started an
        earlier page (taken as a sign that the list repeats). Lawyers whose
        detail page fails are logged and skipped.
        """
        seen_detail_urls: Set[str] = set()
        page_first_seen: Set[str] = set()

        for page in range(1, self.max_pages + 1):
            try:
                html = self.fetch_list_html(target, page)
            except Exception as exc:
                print(f"[list] 失败 area={target.area_id} p{page}: {exc}")
                break

            cards = self.parse_list_cards(html)
            if not cards:
                break

            first_url = cards[0].detail_url
            if first_url in page_first_seen:
                break
            page_first_seen.add(first_url)

            for card in cards:
                if card.detail_url in seen_detail_urls:
                    continue
                seen_detail_urls.add(card.detail_url)

                try:
                    detail = self.parse_detail(card.detail_url)
                except Exception as exc:
                    print(f"[detail] 失败 {card.detail_url}: {exc}")
                    continue

                now = int(time.time())
                # The record id is the MD5 of the numeric lawyer id, falling
                # back to the full URL when the id cannot be extracted.
                uid_match = re.search(r"/lawyer/(\d+)/", card.detail_url)
                uid = uid_match.group(1) if uid_match else card.detail_url
                record_id = hashlib.md5(uid.encode("utf-8")).hexdigest()

                yield {
                    "record_id": record_id,
                    "collected_at": now,
                    "source": {
                        "site": SITE_NAME,
                        "province_id": target.province_id,
                        "province": target.province_name,
                        "province_py": target.province_py,
                        "area_id": target.area_id,
                        "city": target.city_name,
                        "city_py": target.city_py,
                        "page": page,
                        "detail_url": card.detail_url,
                        "info_url": detail.get("info_url", ""),
                    },
                    "list_snapshot": {
                        "name": card.name,
                        "specialties": card.specialties,
                        "score_text": card.score_text,
                        "service_text": card.service_text,
                    },
                    "profile": {
                        "name": detail.get("name") or card.name,
                        "law_firm": detail.get("law_firm") or "",
                        "phone": detail.get("phone") or "",
                        "practice_years": detail.get("practice_years"),
                        "office_area": detail.get("office_area") or "",
                        "address": detail.get("address") or "",
                        "specialties": detail.get("specialties") or card.specialties,
                    },
                }

                if self.sleep_seconds:
                    time.sleep(self.sleep_seconds)

    def _to_legacy_lawyer_row(self, record: Dict) -> Optional[Dict[str, str]]:
        """Convert a crawl record into a row for the ``lawyer`` table.

        Returns ``None`` when the record has no usable phone number.
        """
        source = record.get("source", {}) or {}
        profile = record.get("profile", {}) or {}

        phone = normalize_phone(profile.get("phone", ""))
        if not phone:
            return None

        province = (source.get("province") or "").strip()
        city = (source.get("city") or province).strip()
        return {
            "name": (profile.get("name") or "").strip(),
            "law_firm": (profile.get("law_firm") or "").strip(),
            "province": province,
            "city": city,
            "phone": phone,
            "url": (source.get("info_url") or source.get("detail_url") or "").strip(),
            "domain": LEGACY_DOMAIN,
            "create_time": int(record.get("collected_at") or time.time()),
            "params": json.dumps(record, ensure_ascii=False),
        }

    def _existing_phones_in_db(self, phones: List[str]) -> Set[str]:
        """Return the subset of *phones* already stored for ``LEGACY_DOMAIN``.

        The lookup is done in chunks of 500 with parameterized ``IN`` queries.
        Returns an empty set when no database is configured.
        """
        if not self.db or not phones:
            return set()
        deduped = sorted({p for p in phones if p})
        if not deduped:
            return set()

        existing: Set[str] = set()
        cur = self.db.db.cursor()
        try:
            chunk_size = 500
            for start in range(0, len(deduped), chunk_size):
                chunk = deduped[start:start + chunk_size]
                placeholders = ",".join(["%s"] * len(chunk))
                sql = f"SELECT phone FROM lawyer WHERE domain=%s AND phone IN ({placeholders})"
                cur.execute(sql, [LEGACY_DOMAIN, *chunk])
                # NOTE(review): assumes rows are indexable by position
                # (tuple-style cursor) - confirm against the Db wrapper.
                existing.update(row[0] for row in cur.fetchall())
        finally:
            cur.close()
        return existing

    def _write_records_to_db(self, records: List[Dict]) -> Tuple[int, int]:
        """Insert *records* into the ``lawyer`` table, skipping known phones.

        Returns:
            ``(inserted, skipped)``. Records without a phone are not counted
            at all; rows whose phone already exists or whose insert fails
            count as skipped. ``(0, 0)`` when no database is configured.
        """
        if not self.db:
            return 0, 0

        candidates = (self._to_legacy_lawyer_row(record) for record in records)
        rows = [row for row in candidates if row]
        if not rows:
            return 0, 0

        existing = self._existing_phones_in_db([row["phone"] for row in rows])
        inserted = 0
        skipped = 0
        for row in rows:
            phone = row.get("phone", "")
            if not phone or phone in existing:
                skipped += 1
                continue
            try:
                self.db.insert_data("lawyer", row)
            except Exception as exc:
                skipped += 1
                print(f"[db] 插入失败 phone={phone} url={row.get('url', '')}: {exc}")
            else:
                # Remember the phone so duplicates in this batch are skipped.
                existing.add(phone)
                inserted += 1
        return inserted, skipped

    @staticmethod
    def _load_seen_ids(output_path: str) -> Set[str]:
        """Return the record ids already present in an existing JSONL file.

        Blank lines, lines that are not valid JSON and lines that are not JSON
        objects are ignored. A missing file yields an empty set.
        """
        seen_ids: Set[str] = set()
        if not os.path.exists(output_path):
            return seen_ids
        with open(output_path, "r", encoding="utf-8") as old_file:
            for line in old_file:
                line = line.strip()
                if not line:
                    continue
                try:
                    item = json.loads(line)
                except ValueError:
                    continue
                # A valid JSON line may still be a list/number/string.
                if not isinstance(item, dict):
                    continue
                rid = item.get("record_id")
                if rid and isinstance(rid, str):
                    seen_ids.add(rid)
        return seen_ids

    def crawl(
        self,
        output_path: str,
        max_cities: int = 0,
        city_filter: Optional[str] = None,
    ) -> None:
        """Crawl all (or the selected) regions and persist the results.

        New records are appended to *output_path* as JSON lines; records whose
        id is already in the file are not written again. Every crawled record
        is additionally offered to the database writer.

        Args:
            output_path: JSONL file to append to; its directory is created.
            max_cities: Crawl at most this many regions; 0 means no limit.
            city_filter: Case-insensitive substring matched against the city
                name and pinyin, and matched as-is against the area id.
        """
        cities = self.discover_cities()
        print(f"[discover] 共发现地区 {len(cities)} 个")

        if city_filter:
            key = city_filter.strip().lower()
            cities = [
                c for c in cities
                if key in c.city_name.lower() or key in c.city_py.lower() or key in c.area_id
            ]
            print(f"[discover] 过滤后地区 {len(cities)} 个, filter={city_filter}")

        if max_cities > 0:
            cities = cities[:max_cities]
            print(f"[discover] 截断地区数 {len(cities)}")

        os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)

        seen_ids = self._load_seen_ids(output_path)
        print(f"[resume] 已有记录 {len(seen_ids)} 条")

        total_new_json = 0
        total_new_db = 0
        total_skip_db = 0

        with open(output_path, "a", encoding="utf-8") as out:
            for idx, target in enumerate(cities, start=1):
                print(
                    f"[city {idx}/{len(cities)}] {target.province_name}-{target.city_name} "
                    f"(area={target.area_id})"
                )

                city_records: List[Dict] = []
                city_new_json = 0
                # Write each record as soon as it is crawled so that an
                # interruption in the middle of a region keeps what was
                # already collected.
                for record in self.crawl_city(target):
                    city_records.append(record)
                    rid = record["record_id"]
                    if rid in seen_ids:
                        continue
                    out.write(json.dumps(record, ensure_ascii=False) + "\n")
                    seen_ids.add(rid)
                    city_new_json += 1
                out.flush()
                total_new_json += city_new_json

                city_new_db, city_skip_db = self._write_records_to_db(city_records)
                total_new_db += city_new_db
                total_skip_db += city_skip_db
                print(
                    f"[city] 采集{len(city_records)}条, JSON新增{city_new_json}条, "
                    f"DB新增{city_new_db}条, DB跳过{city_skip_db}条"
                )

        print(
            f"[done] JSON新增{total_new_json}条, DB新增{total_new_db}条, "
            f"DB跳过{total_skip_db}条, 输出: {output_path}"
        )


def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the crawler script."""
    parser = argparse.ArgumentParser(description="律图全新采集脚本(站点数据直采)")
    parser.add_argument(
        "--output",
        default="/www/wwwroot/lawyers/data/six4365_records_all.jsonl",
        help="输出 jsonl 文件路径",
    )
    parser.add_argument(
        "--max-cities",
        type=int,
        default=0,
        help="最多采集多少个地区,0 表示不限",
    )
    parser.add_argument(
        "--max-pages",
        type=int,
        default=9999,
        help="每个地区最多采集多少页",
    )
    parser.add_argument(
        "--city-filter",
        default="",
        help="按城市名称/拼音/编码过滤",
    )
    parser.add_argument(
        "--sleep",
        type=float,
        default=0.1,
        help="详情页请求间隔秒数",
    )
    parser.add_argument(
        "--direct",
        action="store_true",
        help="直连模式,不使用 proxy_settings.json 代理",
    )
    parser.add_argument(
        "--no-db",
        action="store_true",
        help="只输出 JSONL,不写入数据库",
    )
    return parser.parse_args()


def main():
    """Script entry point: build the crawler from the CLI options and run it."""
    args = parse_args()
    # With --no-db the crawler gets no connection; otherwise the Db context
    # manager provides one for the duration of the crawl.
    db_context = nullcontext(None) if args.no_db else Db()
    with db_context as db:
        crawler = Six4365Crawler(
            max_pages=args.max_pages,
            sleep_seconds=args.sleep,
            use_proxy=not args.direct,
            db_connection=db,
        )
        crawler.crawl(
            output_path=args.output,
            max_cities=args.max_cities,
            city_filter=args.city_filter or None,
        )


if __name__ == "__main__":
    main()