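"""Spider for 华律网 (m.66law.cn) lawyer listings.

Loads city areas from the local database, pages through the mobile
list RPC per city, scrapes each lawyer's contact page for phone,
law firm, avatar and name, and upserts the results into the
`lawyer` table.
"""
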
import json
import os
import re
import sys
import time
import random
from typing import Dict, Optional, Tuple

current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
request_dir = os.path.join(project_root, "request")
if request_dir not in sys.path:
    sys.path.insert(0, request_dir)
if project_root not in sys.path:
    sys.path.append(project_root)

import requests
from bs4 import BeautifulSoup
from request.proxy_config import get_proxies, report_proxy_status
from Db import Db
from config import HEADERS
from utils.rate_limiter import request_slot
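
# The HTTP calls below use verify=False, which makes urllib3 emit an
# InsecureRequestWarning per request; silence it to keep logs readable.
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
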
LIST_URL = "https://m.66law.cn/findlawyer/rpc/loadlawyerlist/"
DOMAIN = "华律"
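
# Expected RPC response shape, inferred from the fields read in run()
# (only what this spider uses; the full schema is unverified):
#   {"lawyerList": [{"lawyerUrl": "https://..."}, ...],
#    "lawyerItems": {"pageCount": <int>}}
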
class HualvSpider:
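    """Scrape lawyer contact details from 华律网, one city at a time."""
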
    def __init__(self, db_connection):
        self.db = db_connection
        self.session = self._build_session()
        self.areas = self._load_areas()
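
    # Session factory: fresh cookie jar, mobile UA, proxy pool applied.
    # trust_env=False stops system proxy env vars from overriding ours.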
    def _build_session(self) -> requests.Session:
        report_proxy_status()
        session = requests.Session()
        session.trust_env = False
        proxies = get_proxies()
        if proxies:
            session.proxies.update(proxies)
        else:
            session.proxies.clear()
        custom_headers = HEADERS.copy()
        custom_headers['User-Agent'] = (
            'Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) '
            'AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 '
            'Mobile/15E148 Safari/604.1'
        )
        custom_headers["Connection"] = "close"
        session.headers.update(custom_headers)
        return session
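
    # Drop the current (possibly blocked) session and start a clean one.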
    def _refresh_session(self) -> None:
        try:
            self.session.close()
        except Exception:
            pass
        self.session = self._build_session()
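
    # Try area tables in order until one yields level-2 (city) rows;
    # level-1 (province) rows are joined to cities via their pid.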
    def _load_areas(self):
        tables = ("area_new", "area2", "area")
        last_error = None
        for table in tables:
            try:
                provinces = self.db.select_data(
                    table,
                    "code, province, pinyin, id",
                    "domain='66law' AND level=1"
                ) or []
                cities = self.db.select_data(
                    table,
                    "code, city, province, pid",
                    "domain='66law' AND level=2"
                ) or []
            except Exception as exc:
                last_error = exc
                continue
            if not cities:
                continue
            province_map = {p.get('id'): {"code": p.get('code'), "name": p.get('province')} for p in provinces}
            city_map = {}
            for city in cities:
                province_info = province_map.get(city.get('pid'), {}) or {}
                province_code = province_info.get('code')
                city_map[city.get('code')] = {
                    "name": city.get('city'),
                    "province": city.get('province'),
                    "province_code": province_code,
                }
            print(f"[华律] City source table: {table}, cities: {len(cities)}")
            return city_map
        if last_error:
            print(f"[华律] Failed to load area data: {last_error}")
        print("[华律] No city data (tried area_new/area2/area)")
        return {}
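
    # POST the list RPC with retries: a 403 rotates the session (new
    # proxy) and backs off 2**attempt seconds plus jitter before retrying.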
    def _post(self, data: Dict[str, str], max_retries: int = 3) -> Optional[Dict]:
        for attempt in range(max_retries):
            try:
                with request_slot():
                    resp = self.session.post(LIST_URL, data=data, timeout=20, verify=False)
                status_code = resp.status_code
                text = resp.text
                resp.close()
                if status_code == 403:
                    if attempt < max_retries - 1:
                        wait_time = 2 ** attempt + random.uniform(0.3, 1.0)
                        print(f"Blocked with 403, retrying in {wait_time:.1f}s ({attempt + 1}/{max_retries})")
                        self._refresh_session()
                        time.sleep(wait_time)
                        continue
                    print("Request failed: 403 Forbidden")
                    return None
                if status_code >= 400:
                    raise requests.exceptions.HTTPError(f"{status_code} Error")
                try:
                    return json.loads(text)
                except ValueError as exc:
                    print(f"Failed to parse JSON: {exc}")
                    return None
            except requests.exceptions.RequestException as exc:
                print(f"Request failed: {exc}")
                return None
        return None
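
    # Scrape one lawyer's contact page. Dedup happens twice: first by
    # contact URL, then by phone; an existing row only gets its avatar,
    # site_time and any blank fields backfilled instead of a new insert.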
    def _parse_detail(self, url: str, province: str, city: str) -> Optional[Dict[str, str]]:
        contact_url = f"{url}lawyer_contact.aspx"
        print(f"  Detail: {contact_url}")
        existing = self.db.select_data(
            "lawyer",
            "id, avatar_url",
            f"domain='{DOMAIN}' AND url='{contact_url}'"
        )
        existing_id = None
        if existing:
            existing_id = existing[0].get("id")
            avatar = (existing[0].get("avatar_url") or "").strip()
            if avatar:
                print("  -- already stored with avatar filled, skipping")
                return None
        html = self._get_detail(contact_url)
        if not html:
            return None
        soup = BeautifulSoup(html, "html.parser")
        info_list = soup.find("ul", class_="information-list")
        if not info_list:
            return None
        phone = ""
        law_firm = ""
        for li in info_list.find_all("li"):
            text = li.get_text(strip=True)
            if "手机号" in text:
                cleaned = text.replace("手机号", "").replace("(咨询请说明来自 华律网)", "").strip()
                match = re.search(r"1\d{10}", cleaned.replace('-', '').replace(' ', ''))
                if match:
                    phone = match.group(0)
            if "执业单位" in text:
                law_firm = text.replace("执业单位", "").strip()
        name = ""
        breadcrumb = soup.find("div", class_="weizhi")
        if breadcrumb:
            links = breadcrumb.find_all("a")
            if len(links) > 2:
                name = links[2].get_text(strip=True)
        phone = phone.replace('-', '').strip()
        if not phone or not re.fullmatch(r"1\d{10}", phone):
            print("  No mobile number, skipping")
            return None
        avatar_url, site_time = self._extract_avatar_and_time(soup)
        data = {
            "phone": phone,
            "province": province,
            "city": city,
            "law_firm": law_firm,
            "url": contact_url,
            "avatar_url": avatar_url,
            "create_time": int(time.time()),
            "site_time": site_time,
            "domain": DOMAIN,
            "name": name,
            "params": json.dumps({"source": url}, ensure_ascii=False)
        }
        if existing_id:
            update_data = {
                "avatar_url": avatar_url,
                "site_time": site_time,
            }
            if name:
                update_data["name"] = name
            if law_firm:
                update_data["law_firm"] = law_firm
            if province:
                update_data["province"] = province
            if city:
                update_data["city"] = city
            if phone:
                update_data["phone"] = phone
            update_data["params"] = json.dumps({"source": url}, ensure_ascii=False)
            try:
                self.db.update_data("lawyer", update_data, f"id={existing_id}")
                print("  -- already stored, avatar/time backfilled")
            except Exception as exc:
                print(f"  Update failed: {exc}")
            return None
        # If the phone number already exists, backfill avatar/time instead of inserting a new row.
        existing_phone = self.db.select_data(
            "lawyer",
            "id, avatar_url, url",
            f"domain='{DOMAIN}' AND phone='{phone}'"
        )
        if existing_phone:
            existing_row = existing_phone[0]
            avatar = (existing_row.get("avatar_url") or "").strip()
            if avatar:
                print("  -- phone already stored with avatar filled, skipping")
                return None
            update_data = {
                "avatar_url": avatar_url,
                "site_time": site_time,
            }
            if name:
                update_data["name"] = name
            if law_firm:
                update_data["law_firm"] = law_firm
            if province:
                update_data["province"] = province
            if city:
                update_data["city"] = city
            if phone:
                update_data["phone"] = phone
            if not existing_row.get("url"):
                update_data["url"] = contact_url
            update_data["params"] = json.dumps({"source": url}, ensure_ascii=False)
            try:
                self.db.update_data("lawyer", update_data, f"id={existing_row.get('id')}")
                print("  -- phone already stored, avatar/time backfilled")
            except Exception as exc:
                print(f"  Update failed: {exc}")
            return None
        return data
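
    # The avatar CDN path embeds an upload date (a "/YYYYMM/" segment,
    # or a YYYYMMDD run as fallback), parsed into a YYYYMM integer.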
    def _extract_avatar_and_time(self, soup: BeautifulSoup) -> Tuple[str, Optional[int]]:
        avatar_url = ""
        site_time = None
        img_tag = soup.select_one(
            "div.fixed-bottom-bar div.contact-lawye a.lr-photo img"
        )
        if img_tag:
            src = (img_tag.get("src") or "").strip()
            if src:
                if src.startswith("//"):
                    avatar_url = f"https:{src}"
                else:
                    avatar_url = src
                match = re.search(r"/(20\d{2})(\d{2})/", avatar_url)
                if match:
                    site_time = int(f"{match.group(1)}{match.group(2)}")
                else:
                    match = re.search(r"(20\d{2})(\d{2})\d{2}", avatar_url)
                    if match:
                        site_time = int(f"{match.group(1)}{match.group(2)}")
        return avatar_url, site_time
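
    # GET a detail page using the same 403-rotate-and-backoff policy as _post().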
    def _get_detail(self, url: str, max_retries: int = 3) -> Optional[str]:
        for attempt in range(max_retries):
            try:
                with request_slot():
                    resp = self.session.get(url, timeout=15, verify=False)
                status_code = resp.status_code
                text = resp.text
                resp.close()
                if status_code == 403:
                    if attempt < max_retries - 1:
                        wait_time = 2 ** attempt + random.uniform(0.3, 1.0)
                        print(f"  Blocked with 403, retrying in {wait_time:.1f}s ({attempt + 1}/{max_retries})")
                        self._refresh_session()
                        time.sleep(wait_time)
                        continue
                    print("  Request failed: 403 Forbidden")
                    return None
                if status_code >= 400:
                    raise requests.exceptions.HTTPError(f"{status_code} Error")
                return text
            except requests.exceptions.RequestException as exc:
                print(f"  Request failed: {exc}")
                return None
        return None
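
    # Main loop: for each city, page through the list RPC and insert
    # every lawyer whose detail page yields a valid mobile number.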
    def run(self):
        print("Starting 华律网 crawl...")
        if not self.areas:
            print("No city data")
            return
        for city_code, city_info in self.areas.items():
            province_code = city_info.get("province_code")
            if not province_code:
                continue
            province_name = city_info.get("province", "")
            city_name = city_info.get("name", "")
            print(f"Crawling {province_name}-{city_name}")
            page = 1
            while True:
                payload = {"pid": province_code, "cid": city_code, "page": str(page)}
                data = self._post(payload)
                if not data or not data.get("lawyerList"):
                    break
                for item in data["lawyerList"]:
                    result = self._parse_detail(item.get("lawyerUrl", ""), province_name, city_name)
                    if not result:
                        continue
                    try:
                        self.db.insert_data("lawyer", result)
                        print(f"  -> Inserted: {result['name']} ({result['phone']})")
                    except Exception as exc:
                        print(f"  Insert failed: {exc}")
                    time.sleep(1)
                page_count = (data.get("lawyerItems") or {}).get("pageCount", page)
                if page >= page_count:
                    break
                page += 1
                time.sleep(2)
            time.sleep(1)
        print("华律网 crawl finished")


if __name__ == "__main__":
    with Db() as db:
        spider = HualvSpider(db)
        spider.run()