lawyers/common_sites/six4365.py
hello-dd-code 38e7c284e8 feat: enhance project configuration and improve data export functionality
- Updated `.gitignore` to streamline ignored files and added logging for common sites.
- Expanded `config.py` with new configurations for Weixin and Redis, and improved database connection settings.
- Refined `README.md` to clarify project structure and usage instructions.
- Enhanced `requirements.txt` with additional dependencies for MongoDB and Redis support.
- Refactored multiple spider scripts to utilize a session-based approach for HTTP requests, improving error handling and proxy management.
- Updated `export_lawyers_excel.py` to include a default timestamp for data exports (a rough sketch of this naming follows below).
2026-03-18 10:02:25 +08:00
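
A minimal sketch of the default-timestamp export naming mentioned in the last bullet (the actual `export_lawyers_excel.py` is not shown on this page, so the helper name and file pattern here are assumptions, not the real code):

from datetime import datetime

def default_export_filename(prefix: str = "lawyers") -> str:
    # Hypothetical helper: yields e.g. lawyers_20260318-100225.xlsx
    return f"{prefix}_{datetime.now():%Y%m%d-%H%M%S}.xlsx"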

353 lines · 13 KiB · Python

import json
import os
import sys
import time
import random
from typing import Dict, Optional, List, Set
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
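# Make the project root and its "request" package importable when this file runs as a script.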
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
request_dir = os.path.join(project_root, "request")
if request_dir not in sys.path:
sys.path.insert(0, request_dir)
if project_root not in sys.path:
sys.path.append(project_root)
import requests
import urllib3
from bs4 import BeautifulSoup
from request.proxy_config import get_proxies, report_proxy_status
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
from Db import Db
DOMAIN = "律图"
LIST_URL = "https://m.64365.com/findLawyer/rpc/FindLawyer/LawyerRecommend/"
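# Mobile listing RPC: each POST returns an HTML fragment of up to 10 recommended lawyers (PageSize in _build_payload).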
class Six4365Spider:
def __init__(self, db_connection):
self.db = db_connection
self.session = self._build_session()
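        # Detail pages are fetched in a thread pool; SPIDER_WORKERS overrides the pool size (default 8).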
self.max_workers = int(os.getenv("SPIDER_WORKERS", "8"))
self._tls = threading.local()
self.cities = self._load_cities()
def _build_session(self) -> requests.Session:
report_proxy_status()
session = requests.Session()
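        # Ignore HTTP(S)_PROXY environment variables; proxies come only from get_proxies().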
session.trust_env = False
proxies = get_proxies()
if proxies:
session.proxies.update(proxies)
else:
session.proxies.clear()
session.headers.update({
"User-Agent": (
"Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 "
"Mobile/15E148 Safari/604.1"
),
"Connection": "close",
})
return session
def _refresh_session(self) -> None:
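        """Close and rebuild the shared session (e.g. after a 403) so the next request picks up fresh proxy settings."""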
try:
self.session.close()
except Exception:
pass
self.session = self._build_session()
def _get_thread_session(self) -> requests.Session:
"""requests.Session 不是严格线程安全:每个线程用独立 session(但共享同样代理/headers"""
s = getattr(self._tls, "session", None)
if s is not None:
return s
s = self._build_session()
s.headers.update(dict(self.session.headers))
self._tls.session = s
return s
def _refresh_thread_session(self) -> None:
s = getattr(self._tls, "session", None)
if s is not None:
try:
s.close()
except Exception:
pass
self._tls.session = None
def _existing_urls(self, urls: List[str]) -> Set[str]:
"""批量查重,减少 N 次 is_data_exist"""
if not urls:
return set()
existing: Set[str] = set()
cur = self.db.db.cursor()
try:
            # Too many IN placeholders can fail, so query in chunks
chunk_size = 500
for i in range(0, len(urls), chunk_size):
chunk = urls[i:i + chunk_size]
placeholders = ",".join(["%s"] * len(chunk))
sql = f"SELECT url FROM lawyer WHERE url IN ({placeholders})"
cur.execute(sql, chunk)
for row in cur.fetchall():
                    # pymysql returns tuples by default
existing.add(row[0])
finally:
cur.close()
return existing
def _load_cities(self):
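        """Map level-2 city codes to city/province names for domain 64365, trying area_new, area2, then area."""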
tables = ("area_new", "area2", "area")
last_error = None
for table in tables:
try:
provinces = self.db.select_data(
table,
"id, code, province",
"domain='64365' AND level=1"
) or []
cities = self.db.select_data(
table,
"code, city, province, pid",
"domain='64365' AND level=2"
) or []
except Exception as exc:
last_error = exc
continue
if not cities:
continue
province_map = {row.get('id'): row for row in provinces}
data = {}
for city in cities:
province_row = province_map.get(city.get('pid'), {}) or {}
data[str(city.get('code'))] = {
"name": city.get('city'),
"province": city.get('province'),
"province_name": province_row.get('province', city.get('province')),
}
print(f"[律图] 城市来源表: {table}, 城市数: {len(cities)}")
return data
if last_error:
print(f"[律图] 加载地区数据失败: {last_error}")
print("[律图] 无城市数据(已尝试 area_new/area2/area")
return {}
def _post(self, payload: Dict[str, str], max_retries: int = 3) -> Optional[str]:
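        """POST the listing payload; retries with exponential backoff on 403, returns response text or None."""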
for attempt in range(max_retries):
try:
resp = self.session.post(LIST_URL, data=payload, timeout=10, verify=False)
status_code = resp.status_code
text = resp.text
resp.close()
if status_code == 403:
if attempt < max_retries - 1:
wait_time = 2 ** attempt + random.uniform(0.3, 1.0)
print(f"403被拦截,{wait_time}秒后重试 ({attempt + 1}/{max_retries})")
self._refresh_session()
time.sleep(wait_time)
continue
print("请求失败: 403 Forbidden")
return None
if status_code >= 400:
raise requests.exceptions.HTTPError(f"{status_code} Error")
return text
            except requests.exceptions.RequestException as exc:
                if attempt < max_retries - 1:
                    print(f"Request failed, retrying: {exc}")
                    time.sleep(1 + attempt)
                    continue
                print(f"Request failed: {exc}")
                return None
return None
def _build_payload(self, city_code: str, page: int) -> Dict[str, str]:
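        """Form-encoded payload for the LawyerRecommend RPC; only the region/area code and PageIndex vary between requests."""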
return {
"AdCode": "",
"RegionId": str(city_code),
"CategoryId": "",
"MaxNumber": "",
"OnlyData": "true",
"IgnoreButton": "",
"LawyerRecommendRequest[AreaId]": str(city_code),
"LawyerRecommendRequest[LawCategoryIds]": "",
"LawyerRecommendRequest[LawFirmPersonCount]": "",
"LawyerRecommendRequest[LawFirmScale]": "",
"LawyerRecommendRequest[OrderType]": "0",
"LawyerRecommendRequest[PageIndex]": str(page),
"LawyerRecommendRequest[PageSize]": "10",
"LawyerRecommendRequest[TagId]": "",
"LawyerRecommendRequest[Type]": "1",
"LawyerRecommendRequest[AccountType]": "",
"LawyerRecommendRequest[AddLawyer]": "true",
"LawyerRecommendRequest[Content]": "",
"LawyerRecommendRequest[Duty]": "",
"LawyerRecommendRequest[ExcludeLawyerIds][]": "",
"LawyerRecommendRequest[RefferUrl]": "",
"LawyerRecommendRequest[RequestUrl]": "https://m.64365.com/findlawyer/",
"LawyerRecommendRequest[resource_type_name]": "",
"LawyerRecommendRequest[UserAgent]": self.session.headers["User-Agent"],
"LawyerRecommendRequest[AddLawyerWithNoData]": "false",
"ShowCaseButton": "true",
}
def _parse_list(self, html: str, province: str, city: str) -> int:
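        """Parse one listing page, fetch detail pages concurrently, and insert unseen lawyers; returns the number of detail links found (0 stops pagination)."""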
soup = BeautifulSoup(html, "html.parser")
lawyers = soup.find_all("a", class_="lawyer")
if not lawyers:
return 0
detail_urls: List[str] = []
for lawyer in lawyers:
href = lawyer.get("href")
if not href:
continue
detail_urls.append(f"{href.rstrip('/')}/info/")
if not detail_urls:
return 0
results: List[Dict[str, str]] = []
with ThreadPoolExecutor(max_workers=self.max_workers) as ex:
futs = [ex.submit(self._parse_detail, u, province, city) for u in detail_urls]
for fut in as_completed(futs):
try:
data = fut.result()
except Exception as exc:
print(f" 详情解析异常: {exc}")
continue
if data:
results.append(data)
if not results:
return len(detail_urls)
existing = self._existing_urls([r.get("url", "") for r in results if r.get("url")])
for data in results:
if not data:
continue
url = data.get("url", "")
if not url:
continue
            if url in existing:
                print(f" -- URL already exists: {url}")
                continue
            try:
                self.db.insert_data("lawyer", data)
                print(f" -> inserted: {data['name']} ({data['phone']})")
            except Exception as exc:
                print(f" insert failed {url}: {exc}")
return len(detail_urls)
def _parse_detail(self, url: str, province: str, city: str) -> Optional[Dict[str, str]]:
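        """Extract name, law firm and phone from a detail page; the label strings below match the site's Chinese field names and must stay as-is."""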
html = self._get_detail(url)
if not html:
return None
soup = BeautifulSoup(html, "html.parser")
base_info = soup.find("ul", class_="intro-basic-bar")
if not base_info:
return None
name = ""
law_firm = ""
phone = ""
for li in base_info.find_all("li"):
label = li.find("span", class_="label")
txt = li.find("div", class_="txt")
if not label or not txt:
continue
label_text = label.get_text(strip=True)
if "姓名" in label_text:
name = txt.get_text(strip=True)
if "执业律所" in label_text:
law_firm = txt.get_text(strip=True)
more_section = soup.find("div", class_="more-intro-basic")
if more_section:
phone_ul = more_section.find("ul", class_="intro-basic-bar")
if phone_ul:
for li in phone_ul.find_all("li"):
label = li.find("span", class_="label")
txt = li.find("div", class_="txt")
if label and txt and "联系电话" in label.get_text(strip=True):
phone = txt.get_text(strip=True).replace(" ", "")
break
phone = phone.replace('-', '').strip()
if not name or not phone:
return None
data = {
"phone": phone,
"province": province,
"city": city,
"law_firm": law_firm,
"url": url,
"domain": DOMAIN,
"name": name,
"create_time": int(time.time()),
"params": json.dumps({"province": province, "city": city}, ensure_ascii=False)
}
return data
def _get_detail(self, url: str, max_retries: int = 3) -> Optional[str]:
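        """GET a detail page with the per-thread session; 403 handling mirrors _post."""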
session = self._get_thread_session()
for attempt in range(max_retries):
try:
resp = session.get(url, timeout=10, verify=False)
status_code = resp.status_code
text = resp.text
resp.close()
if status_code == 403:
if attempt < max_retries - 1:
wait_time = 2 ** attempt + random.uniform(0.3, 1.0)
print(f" 403被拦截,{wait_time}秒后重试 ({attempt + 1}/{max_retries})")
self._refresh_thread_session()
session = self._get_thread_session()
time.sleep(wait_time)
continue
print(" 请求失败: 403 Forbidden")
return None
if status_code >= 400:
raise requests.exceptions.HTTPError(f"{status_code} Error")
return text
            except requests.exceptions.RequestException as exc:
                if attempt < max_retries - 1:
                    print(f" Request failed, retrying: {exc}")
                    time.sleep(1 + attempt)
                    continue
                print(f" Request failed: {exc}")
                return None
return None
def run(self):
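        """Crawl every loaded city, paging until a listing page yields no lawyer links."""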
print("启动律图采集...")
if not self.cities:
print("无城市数据")
return
for city_code, info in self.cities.items():
province = info.get("province_name", "")
city = info.get("name", "")
print(f"采集 {province}-{city}")
page = 1
while True:
payload = self._build_payload(city_code, page)
html = self._post(payload)
if not html:
break
link_count = self._parse_list(html, province, city)
if link_count == 0:
break
page += 1
print("律图采集完成")
if __name__ == "__main__":
with Db() as db:
spider = Six4365Spider(db)
spider.run()