Files
lawyers/common_sites/hualv.py
T
hello-dd-code c2b77975c1 feat: add douyin data export functionality to lawyer export script
- Introduced a new command-line argument `--douyin-only` to export data specifically for Douyin, including additional fields such as sec_uid, douyin_uid, and user information.
- Updated the README to include instructions for exporting Douyin data.
- Enhanced the export logic to accommodate new fields when exporting Douyin-specific data.
2026-03-09 21:26:50 +08:00

842 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import argparse
import ast
import hashlib
import json
import os
import pymysql
import random
import re
import sys
import time
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional, Set, Tuple
from urllib.parse import urljoin
import urllib3
from bs4 import BeautifulSoup
# Locate this file's directory, the project root one level above it, and the
# project's "request" directory.
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
request_dir = os.path.join(project_root, "request")

# Make both directories importable: the request directory is placed at the
# front of sys.path, the project root is appended at the end. Entries that
# are already present are left where they are.
if request_dir not in sys.path:
    sys.path.insert(0, request_dir)
if project_root not in sys.path:
    sys.path.append(project_root)
from Db import Db
from request.requests_client import RequestClientError, RequestsClient
from utils.rate_limiter import wait_for_request
# Requests in this module are sent with verify=False, so urllib3's
# InsecureRequestWarning is silenced here once for the whole process.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Site identifier stored in every record under source.site.
SITE_NAME = "hualv"
# Value written to / filtered on in the `domain` column of the `lawyer` table.
LEGACY_DOMAIN = "华律"
# Base URL: default Referer for requests and base for relative detail URLs.
SITE_BASE = "https://m.66law.cn"
# JavaScript bundle that is scanned for the sPCLocation province/city array.
CITY_DATA_URL = "https://cache.66law.cn/dist/main-v2.0.js"
# Endpoint that receives the pid/cid/page form and returns one list page as JSON.
LIST_API_URL = "https://m.66law.cn/findlawyer/rpc/loadlawyerlist/"
# Loose e-mail matcher: local part, "@", domain, and a TLD of two or more letters.
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
# Mobile-number candidate: optional "+86"/"86" prefix, then "1", a digit 3-9
# and nine more digits, each optionally preceded by one space or hyphen.
# The lookarounds reject matches that touch other digits.
PHONE_CANDIDATE_RE = re.compile(r"(?<!\d)(?:\+?86[-\s]?)?1[3-9](?:[\s-]?\d){9}(?!\d)")
# Captures N from text shaped like "执业 N 年".
YEAR_RE = re.compile(r"执业\s*(\d+)\s*年")
@dataclass
class CityTarget:
    """A province/city pair whose lawyer list should be crawled.

    The two ids are sent to the list API as ``pid`` and ``cid``; the two
    names are copied into each collected record.
    """

    # Numeric province id.
    province_id: int
    # Province display name.
    province_name: str
    # Numeric city id.
    city_id: int
    # City display name.
    city_name: str
def normalize_phone(text: str) -> str:
    """Return the first 11-digit mobile number found in *text*, or ``""``.

    Separators and an optional leading ``86`` are removed from each candidate
    matched by ``PHONE_CANDIDATE_RE``; the first candidate that ends up as
    11 digits starting with ``1`` followed by a digit 3-9 is returned.
    """
    if not text:
        return ""
    # Blank out e-mail addresses first so that digits inside an e-mail's
    # local part are not mistaken for a phone number.
    without_emails = EMAIL_RE.sub(" ", str(text))
    for hit in PHONE_CANDIDATE_RE.finditer(without_emails):
        digits = re.sub(r"\D", "", hit.group(0))
        if len(digits) == 13 and digits.startswith("86"):
            # Drop the country prefix.
            digits = digits[2:]
        looks_mobile = (
            len(digits) == 11
            and digits.startswith("1")
            and digits[1] in "3456789"
        )
        if looks_mobile:
            return digits
    return ""
def strip_html_tags(text: str) -> str:
    """Remove every ``<...>`` tag from *text* and trim surrounding whitespace.

    A falsy *text* (``None`` or ``""``) yields ``""``.
    """
    source = text or ""
    without_tags = re.sub(r"<[^>]+>", "", source)
    return without_tags.strip()
class HualvCrawler:
def __init__(
    self,
    max_pages: int = 9999,
    sleep_seconds: float = 0.15,
    use_proxy: bool = True,
    db_connection=None,
):
    """Store crawl settings and build the HTTP client.

    Args:
        max_pages: Upper bound on list pages requested per city.
        sleep_seconds: Pause after each yielded record; negative values
            are clamped to 0.
        use_proxy: Forwarded to ``RequestsClient``.
        db_connection: Object kept as ``self.db``; ``None`` disables all
            database work in this class.
    """
    # Mobile-Safari style headers sent with every request made by the client.
    default_headers = {
        "User-Agent": (
            "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) "
            "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 "
            "Mobile/15E148 Safari/604.1"
        ),
        "Accept": "application/json, text/javascript, */*; q=0.01",
        "X-Requested-With": "XMLHttpRequest",
        "Connection": "close",
    }

    self.max_pages = max_pages
    self.sleep_seconds = max(0.0, sleep_seconds)
    self.db = db_connection
    self.client = RequestsClient(
        headers=default_headers,
        use_proxy=use_proxy,
        retry_total=2,
        retry_backoff_factor=1,
        retry_status_forcelist=(429, 500, 502, 503, 504),
        retry_allowed_methods=("GET", "POST"),
    )
def _request_text(
    self,
    method: str,
    url: str,
    *,
    timeout: int = 20,
    max_retries: int = 3,
    referer: str = SITE_BASE,
    data: Optional[Dict] = None,
) -> str:
    """Send one request with retries and return the response body text.

    Every attempt first calls ``wait_for_request()``. ``POST`` (any letter
    case) goes through ``self.client.post_text``; everything else through
    ``self.client.get_text``.

    Args:
        method: HTTP method name.
        url: Target URL.
        timeout: Per-request timeout passed to the client.
        max_retries: Number of attempts.
        referer: Value of the ``Referer`` header.
        data: Form payload, only used for POST.

    Returns:
        ``resp.text`` of the first response with a status below 400.

    Raises:
        RequestClientError: For a status >= 400 on the final attempt, or
            when ``max_retries`` allows no attempt at all.
        Exception: Whatever the final attempt raised.
    """
    request_headers = {"Referer": referer}
    final_attempt = max_retries - 1
    last_error: Optional[Exception] = None

    def pause(attempt_no: int) -> None:
        # Exponential backoff (1s, 2s, 4s, ...) plus a little random jitter.
        time.sleep((2 ** attempt_no) + random.uniform(0.2, 0.8))

    for attempt in range(max_retries):
        wait_for_request()
        can_retry = attempt < final_attempt
        try:
            if method.upper() == "POST":
                resp = self.client.post_text(
                    url,
                    timeout=timeout,
                    verify=False,
                    headers=request_headers,
                    data=data,
                )
            else:
                resp = self.client.get_text(
                    url,
                    timeout=timeout,
                    verify=False,
                    headers=request_headers,
                )
            status = resp.status_code
            if status == 403 and can_retry:
                # Refresh the client before trying again after a 403.
                self.client.refresh()
                pause(attempt)
                continue
            if status >= 500 and can_retry:
                pause(attempt)
                continue
            if status >= 400:
                # NOTE(review): this raise sits inside the try, so the handler
                # below also retries 4xx responses until the final attempt.
                raise RequestClientError(f"{status} Error: {url}")
            return resp.text
        except Exception as exc:
            last_error = exc
            if not can_retry:
                raise
            pause(attempt)

    # Only reached when the loop body never ran (max_retries <= 0) or, defensively,
    # when every attempt ended in a retry.
    if last_error is not None:
        raise last_error
    raise RequestClientError(f"Unknown request error: {url}")
def _get_text(self, url: str, *, timeout: int = 20, max_retries: int = 3, referer: str = SITE_BASE) -> str:
    """GET *url* through ``_request_text`` and return the body text."""
    options = {
        "timeout": timeout,
        "max_retries": max_retries,
        "referer": referer,
    }
    return self._request_text("GET", url, **options)
def _post_text(
    self,
    url: str,
    *,
    data: Dict,
    timeout: int = 20,
    max_retries: int = 3,
    referer: str = SITE_BASE,
) -> str:
    """POST the form *data* to *url* through ``_request_text`` and return the body text."""
    options = {
        "timeout": timeout,
        "max_retries": max_retries,
        "referer": referer,
        "data": data,
    }
    return self._request_text("POST", url, **options)
def _extract_spc_location(self, script_text: str) -> List:
# main-v2.js 内置了 sPCLocation=new Array(...),后面紧跟 cateinfo 数组
marker = "sPCLocation = new Array("
start = script_text.find(marker)
if start == -1:
marker = "sPCLocation=new Array("
start = script_text.find(marker)
if start == -1:
return []
start += len(marker)
next_marker = script_text.find("cateinfo = new Array(", start)
if next_marker == -1:
next_marker = script_text.find("cateinfo=new Array(", start)
if next_marker != -1:
end = script_text.rfind(");", start, next_marker)
else:
end = script_text.find(");", start)
if end == -1 or end <= start:
return []
raw = "[" + script_text[start:end] + "]"
try:
data = ast.literal_eval(raw)
except Exception:
return []
return data if isinstance(data, list) else []
def discover_cities(self) -> List[CityTarget]:
    """Download the city script and return its unique province/city pairs.

    Rows of the extracted array are read as ``[province_id, province_name,
    [[city_id, city_name, ...], ...]]``. Rows that do not have this shape,
    ids that cannot be converted to ``int``, non-positive city ids and
    empty city names are skipped.
    """
    script_text = self._get_text(CITY_DATA_URL, referer=SITE_BASE + "/findlawyer/")
    found: List[CityTarget] = []
    visited: Set[Tuple[int, int]] = set()

    for province_row in self._extract_spc_location(script_text):
        if not (isinstance(province_row, list) and len(province_row) >= 3):
            continue
        try:
            province_id = int(province_row[0])
        except Exception:
            continue
        province_name = str(province_row[1] or "").strip()

        nested = province_row[2]
        if not isinstance(nested, list):
            nested = []

        for city_row in nested:
            if not (isinstance(city_row, list) and len(city_row) >= 2):
                continue
            try:
                city_id = int(city_row[0])
            except Exception:
                continue
            city_name = str(city_row[1] or "").strip()
            if city_id <= 0 or not city_name:
                continue

            # Keep only the first occurrence of each (province, city) pair.
            pair = (province_id, city_id)
            if pair in visited:
                continue
            visited.add(pair)
            found.append(
                CityTarget(
                    province_id=province_id,
                    province_name=province_name,
                    city_id=city_id,
                    city_name=city_name,
                )
            )
    return found
def fetch_list_page(self, target: CityTarget, page: int) -> Tuple[List[Dict], int]:
    """Fetch one page of the lawyer list for *target*.

    Returns:
        ``(items, page_count)``. ``items`` comes from the ``lawyerList`` or
        ``queryLawyerList`` key (``[]`` when neither holds a list);
        ``page_count`` comes from ``lawyerItems.pageCount`` and is 0 when it
        is missing or cannot be converted.
    """
    form = {
        "pid": str(target.province_id),
        "cid": str(target.city_id),
        "page": str(page),
    }
    body = self._post_text(
        LIST_API_URL,
        data=form,
        referer=SITE_BASE + "/findlawyer/",
    )
    # Strip whitespace and a leading BOM; an empty body is parsed as "{}".
    cleaned = (body or "").strip().lstrip("\ufeff")
    data = json.loads(cleaned or "{}")

    items = data.get("lawyerList") or data.get("queryLawyerList") or []
    if not isinstance(items, list):
        items = []

    try:
        page_count = int((data.get("lawyerItems") or {}).get("pageCount") or 0)
    except Exception:
        page_count = 0
    return items, page_count
def parse_detail(self, detail_url: str) -> Dict:
    """Download a lawyer's contact page and extract the profile fields.

    The page requested is ``<detail_url>/lawyer_contact.aspx``.

    Returns:
        A dict with the keys ``name``, ``law_firm``, ``phone``, ``email``,
        ``address``, ``license_no``, ``practice_years`` (``int`` or
        ``None``), ``specialties`` (list of str), ``detail_url`` and
        ``contact_url``. Text fields that could not be found are ``""``.
    """
    contact_url = detail_url.rstrip("/") + "/lawyer_contact.aspx"
    html = self._get_text(contact_url, referer=detail_url)
    soup = BeautifulSoup(html, "html.parser")
    full_text = soup.get_text(" ", strip=True)

    def text_of(selector: str) -> str:
        # Space-joined text of the first node matching the selector, or "".
        node = soup.select_one(selector)
        return node.get_text(" ", strip=True) if node else ""

    def first_group(pattern: str) -> str:
        # First capture group of the pattern in the raw HTML, or "".
        found = re.search(pattern, html)
        return found.group(1).strip() if found else ""

    # Name: the header element first, the <title> text as a fallback.
    name = ""
    name_tag = soup.select_one(".logo-box .title b")
    if name_tag:
        name = name_tag.get_text(strip=True).replace("律师", "").strip()
    if not name and soup.title:
        title_match = re.search(r"([^\s,,。_]+?)律师", soup.title.get_text(" ", strip=True))
        if title_match:
            name = title_match.group(1).strip()

    # Phone: try the dedicated phone element, then the info list, then the
    # whole page text, and keep the first number that normalizes.
    phone = ""
    for source_text in (
        text_of(".logo-box .r-bar .tel"),
        text_of(".lawyer-show ul.info"),
        full_text,
    ):
        phone = normalize_phone(source_text)
        if phone:
            break

    # Law firm: the first info-list entry naming a firm, else the
    # "affiliation" entry embedded in the HTML.
    law_firm = ""
    for li in soup.select(".lawyer-show ul.info li"):
        li_text = li.get_text(" ", strip=True)
        if not law_firm and ("事务所" in li_text or "法律服务所" in li_text):
            law_firm = li_text
    if not law_firm:
        law_firm = first_group(r'"affiliation":\{"@type":"Organization","name":"([^"]+)"')

    license_no = first_group(r'"identifier":"([^"]+)"')
    address = first_group(r'"streetAddress":"([^"]+)"')

    email_match = EMAIL_RE.search(html)
    email = email_match.group(0).strip() if email_match else ""

    practice_years: Optional[int] = None
    year_match = YEAR_RE.search(full_text)
    if year_match:
        try:
            practice_years = int(year_match.group(1))
        except Exception:
            practice_years = None

    tag_texts = (node.get_text(strip=True) for node in soup.select(".tag-h38 span"))
    specialties = [label for label in tag_texts if label]

    return {
        "name": name,
        "law_firm": law_firm,
        "phone": phone,
        "email": email,
        "address": address,
        "license_no": license_no,
        "practice_years": practice_years,
        "specialties": specialties,
        "detail_url": detail_url,
        "contact_url": contact_url,
    }
def crawl_city(self, target: CityTarget) -> Iterable[Dict]:
    """Yield one record per lawyer listed for *target*.

    List pages are requested from page 1 up to ``self.max_pages``. Paging
    stops when a list request fails, a page is empty, or the page count
    reported by the API is reached. A detail page that fails is logged and
    skipped. Each detail URL is processed at most once per call.
    """
    visited_urls: Set[str] = set()

    for page in range(1, self.max_pages + 1):
        try:
            entries, page_count = self.fetch_list_page(target, page)
        except Exception as exc:
            print(f"[list] 失败 pid={target.province_id} cid={target.city_id} p{page}: {exc}")
            break
        if not entries:
            break

        for entry in entries:
            detail_url = str(entry.get("lawyerUrl") or "").strip()
            if not detail_url:
                continue
            # Turn protocol-relative and site-relative URLs into absolute ones.
            if detail_url.startswith("//"):
                detail_url = "https:" + detail_url
            if not detail_url.startswith("http"):
                detail_url = urljoin(SITE_BASE, detail_url)
            if detail_url in visited_urls:
                continue
            visited_urls.add(detail_url)

            try:
                detail = self.parse_detail(detail_url)
            except Exception as exc:
                print(f"[detail] 失败 {detail_url}: {exc}")
                continue

            collected_at = int(time.time())
            # The record id is the MD5 of the lawyer id, falling back to the
            # global user id and finally to the detail URL.
            uid = str(entry.get("lawyerId") or entry.get("globalUserId") or detail_url)
            record_id = hashlib.md5(uid.encode("utf-8")).hexdigest()
            list_name = str(entry.get("name") or "").replace("律师", "").strip()
            category_text = str(entry.get("categoryNames") or "").strip()
            categories = [
                part.strip()
                for part in re.split(r"[、,]", category_text)
                if part.strip()
            ]

            source_info = {
                "site": SITE_NAME,
                "province_id": target.province_id,
                "province": target.province_name,
                "city_id": target.city_id,
                "city": target.city_name,
                "page": page,
                "detail_url": detail_url,
                "contact_url": detail.get("contact_url", ""),
            }
            list_snapshot = {
                "lawyer_id": entry.get("lawyerId"),
                "name": list_name,
                "category_names": categories,
                "help_count": strip_html_tags(str(entry.get("helpCount") or "")),
                "comment_score": strip_html_tags(str(entry.get("commentScore") or "")),
                "response_time": str(entry.get("responseTime") or "").strip(),
                "year": entry.get("year"),
                "is_adv": bool(entry.get("isAdv")),
            }
            # Detail-page values win; list-page values fill the gaps for
            # the name and the specialties.
            profile = {
                "name": detail.get("name") or list_name,
                "law_firm": detail.get("law_firm") or "",
                "phone": detail.get("phone") or "",
                "email": detail.get("email") or "",
                "address": detail.get("address") or "",
                "license_no": detail.get("license_no") or "",
                "practice_years": detail.get("practice_years"),
                "specialties": detail.get("specialties") or categories,
            }
            yield {
                "record_id": record_id,
                "collected_at": collected_at,
                "source": source_info,
                "list_snapshot": list_snapshot,
                "profile": profile,
                "raw": entry,
            }

            if self.sleep_seconds:
                time.sleep(self.sleep_seconds)

        if page_count > 0 and page >= page_count:
            break
def _to_legacy_lawyer_row(self, record: Dict) -> Optional[Dict[str, str]]:
    """Convert a crawled record into a row for the ``lawyer`` table.

    Returns:
        The row dict, or ``None`` when the record's profile holds no phone
        number that ``normalize_phone`` accepts.
    """
    source = record.get("source", {}) or {}
    profile = record.get("profile", {}) or {}

    phone = normalize_phone(profile.get("phone", ""))
    if not phone:
        return None

    def clean(value) -> str:
        # Treat missing values as "" and trim surrounding whitespace.
        return (value or "").strip()

    province = clean(source.get("province"))
    return {
        "name": clean(profile.get("name")),
        "law_firm": clean(profile.get("law_firm")),
        "province": province,
        # Fall back to the province name when the record has no city.
        "city": clean(source.get("city") or province),
        "phone": phone,
        "url": clean(source.get("contact_url") or source.get("detail_url")),
        "domain": LEGACY_DOMAIN,
        "create_time": int(record.get("collected_at") or time.time()),
        # The whole record is stored as JSON text.
        "params": json.dumps(record, ensure_ascii=False),
    }
def _existing_phones_in_db(self, phones: List[str]) -> Set[str]:
if not self.db or not phones:
return set()
deduped = sorted({p for p in phones if p})
if not deduped:
return set()
existing: Set[str] = set()
cur = self.db.db.cursor()
try:
chunk_size = 500
for i in range(0, len(deduped), chunk_size):
chunk = deduped[i:i + chunk_size]
placeholders = ",".join(["%s"] * len(chunk))
sql = f"SELECT phone FROM lawyer WHERE domain=%s AND phone IN ({placeholders})"
cur.execute(sql, [LEGACY_DOMAIN, *chunk])
for row in cur.fetchall():
existing.add(row[0])
finally:
cur.close()
return existing
def _extract_email_from_params_text(self, params_text: str) -> str:
if not params_text:
return ""
try:
data = json.loads(params_text)
except Exception:
return ""
if not isinstance(data, dict):
return ""
profile = data.get("profile") or {}
if not isinstance(profile, dict):
return ""
return str(profile.get("email") or "").strip()
def _is_phone_from_email_prefix(self, phone: str, email: str) -> bool:
phone_text = str(phone or "").strip()
email_text = str(email or "").strip()
if not phone_text or not email_text or "@" not in email_text:
return False
prefix = email_text.split("@", 1)[0]
prefix_phone = normalize_phone(prefix)
return bool(prefix_phone) and prefix_phone == phone_text
def _existing_rows_by_urls(self, urls: List[str]) -> Dict[str, List[Dict[str, str]]]:
if not self.db or not urls:
return {}
deduped = sorted({u for u in urls if u})
if not deduped:
return {}
result: Dict[str, List[Dict[str, str]]] = {}
cur = self.db.db.cursor(pymysql.cursors.DictCursor)
try:
chunk_size = 200
for i in range(0, len(deduped), chunk_size):
chunk = deduped[i:i + chunk_size]
placeholders = ",".join(["%s"] * len(chunk))
sql = (
"SELECT id, phone, url, params FROM lawyer "
f"WHERE domain=%s AND url IN ({placeholders})"
)
cur.execute(sql, [LEGACY_DOMAIN, *chunk])
for row in cur.fetchall() or []:
key = str(row.get("url") or "")
if not key:
continue
result.setdefault(key, []).append(row)
finally:
cur.close()
return result
def _cleanup_dirty_duplicates_for_urls(self, urls: List[str]) -> int:
if not self.db:
return 0
by_url = self._existing_rows_by_urls(urls)
if not by_url:
return 0
delete_ids: List[int] = []
for _, rows in by_url.items():
if len(rows) <= 1:
continue
dirty_ids: List[int] = []
has_clean = False
for row in rows:
row_id = int(row.get("id") or 0)
row_phone = str(row.get("phone") or "").strip()
row_email = self._extract_email_from_params_text(str(row.get("params") or ""))
if row_id <= 0:
continue
if self._is_phone_from_email_prefix(row_phone, row_email):
dirty_ids.append(row_id)
else:
has_clean = True
if has_clean and dirty_ids:
delete_ids.extend(dirty_ids)
if not delete_ids:
return 0
removed = 0
cur = self.db.db.cursor()
try:
chunk_size = 300
for i in range(0, len(delete_ids), chunk_size):
chunk = delete_ids[i:i + chunk_size]
placeholders = ",".join(["%s"] * len(chunk))
sql = f"DELETE FROM lawyer WHERE id IN ({placeholders})"
cur.execute(sql, chunk)
removed += cur.rowcount
self.db.db.commit()
finally:
cur.close()
return removed
def _write_records_to_db(self, records: List[Dict]) -> Tuple[int, int, int, int]:
if not self.db:
return 0, 0, 0, 0
rows: List[Dict[str, str]] = []
for record in records:
row = self._to_legacy_lawyer_row(record)
if row:
rows.append(row)
if not rows:
return 0, 0, 0, 0
existing = self._existing_phones_in_db([row["phone"] for row in rows])
existing_by_url = self._existing_rows_by_urls([str(row.get("url") or "") for row in rows])
inserted = 0
skipped = 0
repaired = 0
cur = self.db.db.cursor()
update_sql = (
"UPDATE lawyer SET name=%s, phone=%s, law_firm=%s, province=%s, city=%s, "
"url=%s, domain=%s, create_time=%s, params=%s WHERE id=%s"
)
for row in rows:
phone = str(row.get("phone") or "").strip()
url = str(row.get("url") or "").strip()
if not phone:
skipped += 1
continue
same_url_rows = existing_by_url.get(url, []) if url else []
if same_url_rows:
if any(str(item.get("phone") or "").strip() == phone for item in same_url_rows):
skipped += 1
continue
row_email = self._extract_email_from_params_text(str(row.get("params") or ""))
new_is_dirty = self._is_phone_from_email_prefix(phone, row_email)
repair_target = None
for item in same_url_rows:
old_phone = str(item.get("phone") or "").strip()
old_email = self._extract_email_from_params_text(str(item.get("params") or ""))
if self._is_phone_from_email_prefix(old_phone, old_email):
repair_target = item
break
if repair_target and not new_is_dirty:
try:
cur.execute(
update_sql,
(
row.get("name") or "",
phone,
row.get("law_firm") or "",
row.get("province") or "",
row.get("city") or "",
row.get("url") or "",
row.get("domain") or LEGACY_DOMAIN,
int(row.get("create_time") or time.time()),
row.get("params") or "{}",
int(repair_target.get("id") or 0),
),
)
self.db.db.commit()
repaired += 1
existing.add(phone)
old_phone = str(repair_target.get("phone") or "").strip()
if old_phone:
existing.discard(old_phone)
repair_target["phone"] = phone
repair_target["params"] = row.get("params") or "{}"
continue
except Exception as exc:
print(f"[db] 修复失败 phone={phone} url={url}: {exc}")
if not phone or phone in existing:
skipped += 1
continue
try:
self.db.insert_data("lawyer", row)
existing.add(phone)
inserted += 1
except Exception as exc:
skipped += 1
print(f"[db] 插入失败 phone={phone} url={url}: {exc}")
cur.close()
cleaned = self._cleanup_dirty_duplicates_for_urls([str(row.get("url") or "") for row in rows])
return inserted, skipped, repaired, cleaned
def crawl(
self,
output_path: str,
max_cities: int = 0,
city_filter: Optional[str] = None,
) -> None:
cities = self.discover_cities()
print(f"[discover] 共发现城市 {len(cities)}")
if city_filter:
key = city_filter.strip().lower()
cities = [
c for c in cities
if key in c.city_name.lower() or key in str(c.city_id).lower()
]
print(f"[discover] 过滤后城市 {len(cities)} 个, filter={city_filter}")
if max_cities > 0:
cities = cities[:max_cities]
print(f"[discover] 截断城市数 {len(cities)}")
os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
seen_ids: Set[str] = set()
if os.path.exists(output_path):
with open(output_path, "r", encoding="utf-8") as old_file:
for line in old_file:
line = line.strip()
if not line:
continue
try:
item = json.loads(line)
except Exception:
continue
rid = item.get("record_id")
if rid:
seen_ids.add(rid)
print(f"[resume] 已有记录 {len(seen_ids)}")
total_new_json = 0
total_new_db = 0
total_skip_db = 0
total_repair_db = 0
total_clean_db = 0
with open(output_path, "a", encoding="utf-8") as out:
for idx, target in enumerate(cities, start=1):
print(
f"[city {idx}/{len(cities)}] {target.province_name}-{target.city_name} "
f"(pid={target.province_id}, cid={target.city_id})"
)
city_records = list(self.crawl_city(target))
city_new_json = 0
for record in city_records:
rid = record["record_id"]
if rid in seen_ids:
continue
out.write(json.dumps(record, ensure_ascii=False) + "\n")
seen_ids.add(rid)
city_new_json += 1
total_new_json += 1
city_new_db, city_skip_db, city_repair_db, city_clean_db = self._write_records_to_db(city_records)
total_new_db += city_new_db
total_skip_db += city_skip_db
total_repair_db += city_repair_db
total_clean_db += city_clean_db
print(
f"[city] 采集{len(city_records)}条, JSON新增{city_new_json}条, "
f"DB新增{city_new_db}条, DB修复{city_repair_db}条, "
f"DB清理{city_clean_db}条, DB跳过{city_skip_db}"
)
print(
f"[done] JSON新增{total_new_json}条, DB新增{total_new_db}条, "
f"DB修复{total_repair_db}条, DB清理{total_clean_db}条, "
f"DB跳过{total_skip_db}条, 输出: {output_path}"
)
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the crawler script.

    Returns:
        A namespace with ``output``, ``max_cities``, ``max_pages``,
        ``city_filter``, ``sleep``, ``direct`` and ``no_db``.
    """
    parser = argparse.ArgumentParser(description="华律网全新采集脚本(站点数据直采)")

    # (flag, keyword arguments for add_argument), in the order they are registered.
    option_specs = (
        (
            "--output",
            {
                "default": "/www/wwwroot/lawyers/data/hualv_records_all.jsonl",
                "help": "输出 jsonl 文件路径",
            },
        ),
        (
            "--max-cities",
            {"type": int, "default": 0, "help": "最多采集多少个城市,0 表示不限"},
        ),
        (
            "--max-pages",
            {"type": int, "default": 9999, "help": "每个城市最多采集多少页"},
        ),
        (
            "--city-filter",
            {"default": "", "help": "按城市名称或城市编码过滤,如 beijing / 110100"},
        ),
        (
            "--sleep",
            {"type": float, "default": 0.15, "help": "详情页请求间隔秒数"},
        ),
        (
            "--direct",
            {"action": "store_true", "help": "直连模式,不使用 proxy_settings.json 代理"},
        ),
        (
            "--no-db",
            {"action": "store_true", "help": "只输出 JSONL,不写入数据库"},
        ),
    )
    for flag, spec in option_specs:
        parser.add_argument(flag, **spec)
    return parser.parse_args()
def main():
    """Script entry point: parse the options and run one crawl.

    With ``--no-db`` the crawler runs without a database connection;
    otherwise it runs inside a ``Db()`` context and uses that connection.
    """
    args = parse_args()

    def run(db_connection) -> None:
        # Build the crawler from the parsed options and run it once.
        crawler = HualvCrawler(
            max_pages=args.max_pages,
            sleep_seconds=args.sleep,
            use_proxy=not args.direct,
            db_connection=db_connection,
        )
        crawler.crawl(
            output_path=args.output,
            max_cities=args.max_cities,
            city_filter=args.city_filter or None,
        )

    if args.no_db:
        run(None)
        return
    with Db() as db:
        run(db)


if __name__ == "__main__":
    main()