From 03847a4b8e2ffbb0437ae3535c31c0cd9ef5f837 Mon Sep 17 00:00:00 2001
From: hello-dd-code
Date: Mon, 2 Mar 2026 00:19:48 +0800
Subject: [PATCH] chore: initialize lawyers crawler project

---
 .gitignore                  |  31 ++++
 Db.py                       |  58 +++++++
 README.md                   |  20 +++
 common_sites/dls.py         | 268 +++++++++++++++++++++++++++++
 common_sites/findlaw.py     | 209 +++++++++++++++++++++++
 common_sites/hualv.py       | 325 +++++++++++++++++++++++++++++++++++
 common_sites/lawtime.py     | 278 ++++++++++++++++++++++++++++++
 common_sites/six4365.py     | 332 ++++++++++++++++++++++++++++++++++++
 common_sites/start.sh       |  13 ++
 config.py                   |  22 +++
 request/__init__.py         |  19 +++
 request/proxy_config.py     |  97 +++++++++++
 request/proxy_settings.json |   7 +
 request/requests_client.py  | 168 ++++++++++++++++++
 requirements.txt            |   5 +
 utils/__init__.py           |   0
 utils/rate_limiter.py       |  76 +++++++++
 17 files changed, 1928 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 Db.py
 create mode 100644 README.md
 create mode 100644 common_sites/dls.py
 create mode 100644 common_sites/findlaw.py
 create mode 100644 common_sites/hualv.py
 create mode 100644 common_sites/lawtime.py
 create mode 100644 common_sites/six4365.py
 create mode 100755 common_sites/start.sh
 create mode 100644 config.py
 create mode 100644 request/__init__.py
 create mode 100644 request/proxy_config.py
 create mode 100644 request/proxy_settings.json
 create mode 100644 request/requests_client.py
 create mode 100644 requirements.txt
 create mode 100644 utils/__init__.py
 create mode 100644 utils/rate_limiter.py

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..3ef765e
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,31 @@
+# Python
+__pycache__/
+*.py[cod]
+*$py.class
+
+# Build / packaging
+build/
+dist/
+*.egg-info/
+.eggs/
+
+# Virtual environments
+.venv/
+venv/
+env/
+
+# Test / type caches
+.pytest_cache/
+.mypy_cache/
+.ruff_cache/
+
+# IDE
+.vscode/
+.idea/
+
+# OS
+.DS_Store
+Thumbs.db
+
+# Local runtime files
+*.log
diff --git a/Db.py b/Db.py
new file mode 100644
index 0000000..efdd079
--- /dev/null
+++ b/Db.py
@@ -0,0 +1,58 @@
+import pymysql
+from config import DB_CONFIG
+
+class Db:
+    def __enter__(self):
+        # Connect using the settings from the config module
+        self.db = pymysql.connect(**DB_CONFIG)
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        # Close the database connection
+        self.db.close()
+
+    # Insert a row
+    def insert_data(self, table_name, data):
+        cursor = self.db.cursor()
+        sql = f"INSERT INTO {table_name} ({', '.join(data.keys())}) VALUES ({', '.join(['%s'] * len(data))})"
+        cursor.execute(sql, list(data.values()))
+        self.db.commit()
+        inserted_id = cursor.lastrowid  # ID of the inserted row
+        cursor.close()
+        return inserted_id
+
+    # Query rows
+    def select_data(self, table_name, columns="*", condition=None):
+        cursor = self.db.cursor(pymysql.cursors.DictCursor)
+        sql = f"SELECT {columns} FROM {table_name}"
+        if condition:
+            sql += f" WHERE {condition}"
+        cursor.execute(sql)
+        result = cursor.fetchall()
+        cursor.close()
+        return result
+
+    # Delete rows
+    def delete_data(self, table_name, condition):
+        cursor = self.db.cursor()
+        sql = f"DELETE FROM {table_name} WHERE {condition}"
+        cursor.execute(sql)
+        self.db.commit()
+        cursor.close()
+
+    # Update rows
+    def update_data(self, table_name, data, condition):
+        cursor = self.db.cursor()
+        set_clause = ", ".join([f"{key} = %s" for key in data.keys()])
+        sql = f"UPDATE {table_name} SET {set_clause} WHERE {condition}"
+        cursor.execute(sql, list(data.values()))
+        self.db.commit()
+        cursor.close()
+
+    # Check whether matching rows exist
+    def is_data_exist(self, table_name, condition):
+        cursor = self.db.cursor()
+        sql = f"SELECT COUNT(*) FROM {table_name} WHERE {condition}"
+        cursor.execute(sql)
+        result = cursor.fetchone()
+        cursor.close()
+        return result[0] > 0
\ No newline at end of file
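
Note on `Db.py`: table names and whole WHERE clauses are interpolated into the SQL text, and callers build conditions from scraped values (e.g. `f"phone='{phone}'"` in the spiders), so a stray quote in scraped data breaks the query and opens it to injection. A minimal sketch of a parameterized variant under the same pymysql setup — `is_data_exist_safe` is a hypothetical helper, not part of this patch:

```python
# Hypothetical, parameterized counterpart to Db.is_data_exist: values are
# passed to pymysql as parameters, never interpolated into the SQL string.
def is_data_exist_safe(self, table_name, field_values):
    cursor = self.db.cursor()
    where = " AND ".join(f"{field} = %s" for field in field_values)
    sql = f"SELECT COUNT(*) FROM {table_name} WHERE {where}"
    cursor.execute(sql, list(field_values.values()))
    exists = cursor.fetchone()[0] > 0
    cursor.close()
    return exists

# Usage: db.is_data_exist_safe("lawyer", {"phone": phone, "domain": DOMAIN})
```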
f"SELECT COUNT(*) FROM {table_name} WHERE {condition}" + cursor.execute(sql) + result = cursor.fetchone() + cursor.close() + return result[0] > 0 \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..8a61ed0 --- /dev/null +++ b/README.md @@ -0,0 +1,20 @@ +# lawyers + +`common_sites` 独立采集项目。 + +## 目录 + +- `common_sites/`:大律师、找法网、法律快车、律图、华律 5 个采集脚本 +- `request/proxy_config.py`:代理配置加载逻辑 +- `request/proxy_settings.json`:代理配置文件 +- `Db.py`:数据库连接与基础操作 +- `config.py`:数据库与请求头配置 + +## 运行 + +```bash +cd /www/wwwroot/lawyers +python3 -m pip install -r requirements.txt +cd common_sites +./start.sh +``` diff --git a/common_sites/dls.py b/common_sites/dls.py new file mode 100644 index 0000000..9e628a3 --- /dev/null +++ b/common_sites/dls.py @@ -0,0 +1,268 @@ +import json +import os +import sys +import time +import random +from typing import Dict, Optional + +current_dir = os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.dirname(current_dir) +request_dir = os.path.join(project_root, "request") +if request_dir not in sys.path: + sys.path.insert(0, request_dir) +if project_root not in sys.path: + sys.path.append(project_root) + +import urllib3 +from bs4 import BeautifulSoup +from request.requests_client import ( + RequestClientError, + RequestConnectTimeout, + RequestConnectionError, + RequestTimeout, + RequestsClient, +) + +# 禁用 SSL 警告 +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + +from Db import Db +from utils.rate_limiter import wait_for_request + +DOMAIN = "大律师" +LIST_TEMPLATE = "https://m.maxlaw.cn/law/{pinyin}?page={page}" +_PROXY_TESTED = False + + +class DlsSpider: + def __init__(self, db_connection): + self.db = db_connection + self.client = self._build_session() + self.areas = self._load_areas() + + def _build_session(self) -> RequestsClient: + """构建带重试机制的 session""" + client = RequestsClient( + headers={ + "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1", + "Host": "m.maxlaw.cn", + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", + "Connection": "close", + }, + retry_total=3, # 总共重试3次 + retry_backoff_factor=1, # 重试间隔:1s, 2s, 4s + retry_status_forcelist=(429, 500, 502, 503, 504), # 对这些状态码进行重试 + retry_allowed_methods=("GET", "POST"), + ) + self._proxy_test(client, client.proxies or None) + return client + + def _refresh_session(self) -> None: + self.client.refresh() + self._proxy_test(self.client, self.client.proxies or None) + + def _proxy_test(self, client: RequestsClient, proxies: Optional[Dict[str, str]]) -> None: + global _PROXY_TESTED + if _PROXY_TESTED or not os.getenv("PROXY_TEST"): + return + _PROXY_TESTED = True + if not proxies: + print("[proxy] test skipped: no proxy configured") + return + test_url = os.getenv("PROXY_TEST_URL", "https://dev.kdlapi.com/testproxy") + timeout = float(os.getenv("PROXY_TEST_TIMEOUT", "10")) + try: + resp = client.get_text( + test_url, + timeout=timeout, + headers={"Connection": "close"}, + ) + print(f"[proxy] test {resp.status_code}: {resp.text.strip()[:200]}") + except Exception as exc: + print(f"[proxy] test failed: {exc}") + + def _load_areas(self): + try: + return self.db.select_data( + "area_new", + "province, city, pinyin", + "domain='maxlaw'" + ) or [] + except Exception as exc: + print(f"加载地区失败: {exc}") + return [] + + def _get(self, url: str, max_retries: int = 3, headers: 
Optional[Dict[str, str]] = None) -> Optional[str]: + """发送 GET 请求,带重试机制""" + wait_for_request() + + for attempt in range(max_retries): + try: + # 使用更长的超时时间,分别设置连接和读取超时 + resp = self.client.get_text( + url, + timeout=(10, 30), # (connect_timeout, read_timeout) + verify=False, + headers=headers, + ) + status_code = resp.status_code + content = resp.text + if status_code == 403: + if attempt < max_retries - 1: + wait_time = 2 ** attempt + random.uniform(0.3, 1.0) + print(f"403被拦截,{wait_time}秒后重试 ({attempt + 1}/{max_retries}): {url}") + self._refresh_session() + time.sleep(wait_time) + continue + print(f"请求失败 {url}: 403 Forbidden") + return None + if status_code >= 400: + raise RequestClientError(f"{status_code} Error: {url}") + return content + except RequestConnectTimeout as exc: + if attempt < max_retries - 1: + wait_time = 2 ** attempt # 指数退避:2s, 4s, 8s + print(f"连接超时,{wait_time}秒后重试 ({attempt + 1}/{max_retries}): {url}") + time.sleep(wait_time) + else: + print(f"连接超时,已达到最大重试次数 {url}: {exc}") + return None + except RequestTimeout as exc: + if attempt < max_retries - 1: + wait_time = 2 ** attempt + print(f"请求超时,{wait_time}秒后重试 ({attempt + 1}/{max_retries}): {url}") + time.sleep(wait_time) + else: + print(f"请求超时,已达到最大重试次数 {url}: {exc}") + return None + except RequestConnectionError as exc: + if attempt < max_retries - 1: + wait_time = 2 ** attempt + print(f"连接错误,{wait_time}秒后重试 ({attempt + 1}/{max_retries}): {url}") + time.sleep(wait_time) + else: + print(f"连接错误,已达到最大重试次数 {url}: {exc}") + return None + except RequestClientError as exc: + print(f"请求失败 {url}: {exc}") + return None + + return None + + def _parse_list(self, html: str, province: str, city: str, list_url: str) -> int: + soup = BeautifulSoup(html, "html.parser") + cards = soup.find_all("div", class_="lstx") + if not cards: + return 0 + + inserted = 0 + for card in cards: + link = card.find("a") + if not link or not link.get("href"): + continue + detail = self._parse_detail(link['href'], province, city, list_url) + if not detail: + continue + phone = detail.get("phone") + if not phone: + continue + condition = f"phone='{phone}' and domain='{DOMAIN}'" + if self.db.is_data_exist("lawyer", condition): + print(f" -- 已存在: {detail['name']} ({phone})") + time.sleep(0.3) + continue + try: + self.db.insert_data("lawyer", detail) + inserted += 1 + print(f" -> 新增: {detail['name']} ({phone})") + except Exception as exc: + print(f" 插入失败: {exc}") + time.sleep(1) + time.sleep(0.3) + # 列表页结束后再缓一缓,降低风控 + time.sleep(0.6) + return inserted + + def _detail_headers(self, referer: str) -> Dict[str, str]: + return { + "Referer": referer, + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", + "Cache-Control": "no-cache", + "Pragma": "no-cache", + "Upgrade-Insecure-Requests": "1", + } + + def _parse_detail(self, path: str, province: str, city: str, list_url: str) -> Optional[Dict[str, str]]: + url = f"https://m.maxlaw.cn{path}" + print(f" 详情: {url}") + html = self._get(url, headers=self._detail_headers(list_url)) + if not html: + return None + + soup = BeautifulSoup(html, "html.parser") + name_tag = soup.find("h2", class_="lawyerName") + law_firm_tag = soup.find("p", class_="law-firm") + contact_list = soup.find("ul", class_="contact-content") + + name = name_tag.get_text(strip=True) if name_tag else "" + law_firm = law_firm_tag.get_text(strip=True) if law_firm_tag else "" + phone = "" + + if contact_list: + items = contact_list.find_all("li") + if len(items) > 2: + phone_tag = 
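
Note on `dls.py`: `_get` layers three mechanisms — the global limiter (`wait_for_request`), adapter-level retries inside `RequestsClient`, and its own 403 loop that rebuilds the session before backing off. A minimal sketch of that 403 pattern in isolation; `fetch` and `refresh` are stand-ins for `client.get_text` / `client.refresh`, not functions from this patch:

```python
import random
import time

# Exponential backoff with jitter on 403: refresh the session (and hence the
# proxy tunnel connection) before each retry; give up after max_retries.
def get_with_backoff(fetch, refresh, url, max_retries=3):
    for attempt in range(max_retries):
        resp = fetch(url)
        if resp.status_code != 403:
            return resp
        if attempt < max_retries - 1:
            delay = 2 ** attempt + random.uniform(0.3, 1.0)
            refresh()
            time.sleep(delay)
    return None
```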
items[2].find("p") + if phone_tag: + phone = phone_tag.get_text(strip=True) + phone = phone.split("咨询请说明来自大律师网")[0].strip() + + phone = phone.replace('-', '').strip() + if not name or not phone: + print(" 信息不完整,跳过") + return None + + safe_city = city if city else province + return { + "name": name, + "law_firm": law_firm, + "province": province, + "city": safe_city, + "phone": phone, + "url": url, + "domain": DOMAIN, + "create_time": int(time.time()), + "params": json.dumps({"province": province, "city": safe_city}, ensure_ascii=False) + } + + def run(self): + print("启动大律师采集...") + if not self.areas: + print("无地区数据") + return + + for area in self.areas: + pinyin = area.get("pinyin") + province = area.get("province", "") + city = area.get("city", "") + if not pinyin: + continue + page = 1 + while True: + list_url = LIST_TEMPLATE.format(pinyin=pinyin, page=page) + print(f"采集 {province}-{city} 第 {page} 页: {list_url}") + html = self._get(list_url) + if not html: + break + inserted = self._parse_list(html, province, city, list_url) + if inserted == 0: + break + page += 1 + print("大律师采集完成") + + +if __name__ == "__main__": + with Db() as db: + spider = DlsSpider(db) + spider.run() diff --git a/common_sites/findlaw.py b/common_sites/findlaw.py new file mode 100644 index 0000000..50947cf --- /dev/null +++ b/common_sites/findlaw.py @@ -0,0 +1,209 @@ +import json +import os +import sys +import time +import random +from typing import Dict, List, Set, Optional + +current_dir = os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.dirname(current_dir) +request_dir = os.path.join(project_root, "request") +if request_dir not in sys.path: + sys.path.insert(0, request_dir) +if project_root not in sys.path: + sys.path.append(project_root) + +from request.requests_client import RequestClientError, RequestSSLError, RequestsClient +from Db import Db + +DOMAIN = "找法网" +LIST_TEMPLATE = "https://m.findlaw.cn/{pinyin}/q_lawyer/p{page}?ajax=1&order=0&sex=-1" + + +class FindlawSpider: + def __init__(self, db_connection): + self.db = db_connection + self.client = self._build_session() + self.cities = self._load_cities() + + def _build_session(self) -> RequestsClient: + return RequestsClient(headers={ + "User-Agent": ( + "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) " + "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 " + "Mobile/15E148 Safari/604.1" + ), + "Accept": "application/json, text/javascript, */*; q=0.01", + "X-Requested-With": "XMLHttpRequest", + "Connection": "close", + }) + + def _refresh_session(self) -> None: + self.client.refresh() + + def _get(self, url: str, referer: str, verify: bool = True, max_retries: int = 3) -> Optional[str]: + headers = {"Referer": referer} + for attempt in range(max_retries): + try: + resp = self.client.get_text(url, timeout=15, verify=verify, headers=headers) + status_code = resp.status_code + text = resp.text + if status_code == 403: + if attempt < max_retries - 1: + wait_time = 2 ** attempt + random.uniform(0.3, 1.0) + print(f"403被拦截,{wait_time}秒后重试 ({attempt + 1}/{max_retries}): {url}") + self._refresh_session() + time.sleep(wait_time) + continue + print(f"请求失败 {url}: 403 Forbidden") + return None + if status_code >= 400: + raise RequestClientError(f"{status_code} Error: {url}") + return text + except RequestSSLError: + if verify: + return self._get(url, referer, verify=False, max_retries=max_retries) + print(f"SSL错误 {url}") + return None + except RequestClientError as exc: + print(f"请求失败 {url}: {exc}") + return None + return None + + def 
diff --git a/common_sites/hualv.py b/common_sites/hualv.py
new file mode 100644
index 0000000..006063a
--- /dev/null
+++ b/common_sites/hualv.py
@@ -0,0 +1,325 @@
+import json
+import os
+import re
+import sys
+import time
+import random
+from typing import Dict, Optional, Tuple
+
+current_dir = os.path.dirname(os.path.abspath(__file__))
+project_root = os.path.dirname(current_dir)
+request_dir = os.path.join(project_root, "request")
+if request_dir not in sys.path:
+    sys.path.insert(0, request_dir)
+if project_root not in sys.path:
+    sys.path.append(project_root)
+
+from bs4 import BeautifulSoup
+from request.requests_client import RequestClientError, RequestsClient
+
+from Db import Db
+from config import HEADERS
+
+LIST_URL = "https://m.66law.cn/findlawyer/rpc/loadlawyerlist/"
+DOMAIN = "华律"
+
+
+class HualvSpider:
+    def __init__(self, db_connection):
+        self.db = db_connection
+        self.client = self._build_session()
+        self.areas = self._load_areas()
+
+    def _build_session(self) -> RequestsClient:
+        custom_headers = HEADERS.copy()
+        custom_headers['User-Agent'] = (
+            'Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) '
+            'AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 '
+            'Mobile/15E148 Safari/604.1'
+        )
+        custom_headers["Connection"] = "close"
+        return RequestsClient(headers=custom_headers)
+
+    def _refresh_session(self) -> None:
+        self.client.refresh()
+
+    def _load_areas(self):
+        tables = ("area_new", "area2", "area")
+        last_error = None
+        for table in tables:
+            try:
+                provinces = self.db.select_data(
+                    table,
+                    "code, province, pinyin, id",
+                    "domain='66law' AND level=1"
+                ) or []
+                cities = self.db.select_data(
+                    table,
+                    "code, city, province, pid",
+                    "domain='66law' AND level=2"
+                ) or []
+            except Exception as exc:
+                last_error = exc
+                continue
+
+            if not cities:
+                continue
+
+            province_map = {p.get('id'): {"code": p.get('code'), "name": p.get('province')} for p in provinces}
+            city_map = {}
+            for city in cities:
+                province_info = province_map.get(city.get('pid'), {}) or {}
+                province_code = province_info.get('code')
+                city_map[city.get('code')] = {
+                    "name": city.get('city'),
+                    "province": city.get('province'),
+                    "province_code": province_code,
+                }
+            print(f"[华律] 城市来源表: {table}, 城市数: {len(cities)}")
+            return city_map
+
+        if last_error:
+            print(f"[华律] 加载地区数据失败: {last_error}")
+        print("[华律] 无城市数据(已尝试 area_new/area2/area)")
+        return {}
+
+    def _post(self, data: Dict[str, str], max_retries: int = 3) -> Optional[Dict]:
+        for attempt in range(max_retries):
+            try:
+                resp = self.client.post_text(LIST_URL, data=data, timeout=20, verify=False)
+                status_code = resp.status_code
+                text = resp.text
+                if status_code == 403:
+                    if attempt < max_retries - 1:
+                        wait_time = 2 ** attempt + random.uniform(0.3, 1.0)
+                        print(f"403被拦截,{wait_time}秒后重试 ({attempt + 1}/{max_retries})")
+                        self._refresh_session()
+                        time.sleep(wait_time)
+                        continue
+                    print("请求失败: 403 Forbidden")
+                    return None
+                if status_code >= 400:
+                    raise RequestClientError(f"{status_code} Error")
+                try:
+                    return json.loads(text)
+                except ValueError as exc:
+                    print(f"解析JSON失败: {exc}")
+                    return None
+            except RequestClientError as exc:
+                print(f"请求失败: {exc}")
+                return None
+        return None
+
+    def _parse_detail(self, url: str, province: str, city: str) -> Optional[Dict[str, str]]:
+        contact_url = f"{url}lawyer_contact.aspx"
+        print(f" 详情: {contact_url}")
+        existing = self.db.select_data(
+            "lawyer",
+            "id, avatar_url",
+            f"domain='{DOMAIN}' AND url='{contact_url}'"
+        )
+        existing_id = None
+        if existing:
+            existing_id = existing[0].get("id")
+            avatar = (existing[0].get("avatar_url") or "").strip()
+            if avatar:
+                print(" -- 已存在且头像已补全,跳过")
+                return None
+
+        html = self._get_detail(contact_url)
+        if not html:
+            return None
+
+        soup = BeautifulSoup(html, "html.parser")
+        info_list = soup.find("ul", class_="information-list")
+        if not info_list:
+            return None
+
+        phone = ""
+        law_firm = ""
+        for li in info_list.find_all("li"):
+            text = li.get_text(strip=True)
+            if "手机号" in text:
+                cleaned = text.replace("手机号", "").replace("(咨询请说明来自 华律网)", "").strip()
+                match = re.search(r"1\d{10}", cleaned.replace('-', '').replace(' ', ''))
+                if match:
+                    phone = match.group(0)
+            if "执业单位" in text:
+                law_firm = text.replace("执业单位", "").strip()
+
+        name = ""
+        breadcrumb = soup.find("div", class_="weizhi")
+        if breadcrumb:
+            links = breadcrumb.find_all("a")
+            if len(links) > 2:
+                name = links[2].get_text(strip=True)
+
+        phone = phone.replace('-', '').strip()
+        if not phone or not re.fullmatch(r"1\d{10}", phone):
+            print(" 无手机号,跳过")
+            return None
+
+        avatar_url, site_time = self._extract_avatar_and_time(soup)
+        data = {
+            "phone": phone,
+            "province": province,
+            "city": city,
+            "law_firm": law_firm,
+            "url": contact_url,
+            "avatar_url": avatar_url,
+            "create_time": int(time.time()),
+            "site_time": site_time,
+            "domain": DOMAIN,
+            "name": name,
+            "params": json.dumps({"source": url}, ensure_ascii=False)
+        }
+        if existing_id:
+            update_data = {
+                "avatar_url": avatar_url,
+                "site_time": site_time,
+            }
+            if name:
+                update_data["name"] = name
+            if law_firm:
+                update_data["law_firm"] = law_firm
+            if province:
+                update_data["province"] = province
+            if city:
+                update_data["city"] = city
+            if phone:
+                update_data["phone"] = phone
+            update_data["params"] = json.dumps({"source": url}, ensure_ascii=False)
+            try:
+                self.db.update_data("lawyer", update_data, f"id={existing_id}")
+                print(" -- 已存在,已补全头像/时间")
+            except Exception as exc:
+                print(f" 更新失败: {exc}")
+            return None
+        # If the phone number is already on file, update avatar/time instead of inserting a new row
+        existing_phone = self.db.select_data(
+            "lawyer",
+            "id, avatar_url, url",
+            f"domain='{DOMAIN}' AND phone='{phone}'"
+        )
+        if existing_phone:
+            existing_row = existing_phone[0]
+            avatar = (existing_row.get("avatar_url") or "").strip()
+            if avatar:
+                print(" -- 已存在手机号且头像已补全,跳过")
+                return None
+            update_data = {
+                "avatar_url": avatar_url,
+                "site_time": site_time,
+            }
+            if name:
+                update_data["name"] = name
+            if law_firm:
+                update_data["law_firm"] = law_firm
+            if province:
+                update_data["province"] = province
+            if city:
+                update_data["city"] = city
+            if phone:
+                update_data["phone"] = phone
+            if not existing_row.get("url"):
+                update_data["url"] = contact_url
+            update_data["params"] = json.dumps({"source": url}, ensure_ascii=False)
+            try:
+                self.db.update_data("lawyer", update_data, f"id={existing_row.get('id')}")
+                print(" -- 已存在手机号,已补全头像/时间")
+            except Exception as exc:
+                print(f" 更新失败: {exc}")
+            return None
+        return data
+
+    def _extract_avatar_and_time(self, soup: BeautifulSoup) -> Tuple[str, Optional[int]]:
+        avatar_url = ""
+        site_time = None
+        img_tag = soup.select_one(
+            "div.fixed-bottom-bar div.contact-lawye a.lr-photo img"
+        )
+        if img_tag:
+            src = (img_tag.get("src") or "").strip()
+            if src:
+                if src.startswith("//"):
+                    avatar_url = f"https:{src}"
+                else:
+                    avatar_url = src
+                match = re.search(r"/(20\d{2})(\d{2})/", avatar_url)
+                if match:
+                    site_time = int(f"{match.group(1)}{match.group(2)}")
+                else:
+                    match = re.search(r"(20\d{2})(\d{2})\d{2}", avatar_url)
+                    if match:
+                        site_time = int(f"{match.group(1)}{match.group(2)}")
+        return avatar_url, site_time
+
+    def _get_detail(self, url: str, max_retries: int = 3) -> Optional[str]:
+        for attempt in range(max_retries):
+            try:
+                resp = self.client.get_text(url, timeout=15, verify=False)
+                status_code = resp.status_code
+                text = resp.text
+                if status_code == 403:
+                    if attempt < max_retries - 1:
+                        wait_time = 2 ** attempt + random.uniform(0.3, 1.0)
+                        print(f" 403被拦截,{wait_time}秒后重试 ({attempt + 1}/{max_retries})")
+                        self._refresh_session()
+                        time.sleep(wait_time)
+                        continue
+                    print(" 请求失败: 403 Forbidden")
+                    return None
+                if status_code >= 400:
+                    raise RequestClientError(f"{status_code} Error")
+                return text
+            except RequestClientError as exc:
+                print(f" 请求失败: {exc}")
+                return None
+        return None
+
+    def run(self):
+        print("启动华律网采集...")
+        if not self.areas:
+            print("无城市数据")
+            return
+
+        for city_code, city_info in self.areas.items():
+            province_code = city_info.get("province_code")
+            if not province_code:
+                continue
+            province_name = city_info.get("province", "")
+            city_name = city_info.get("name", "")
+            print(f"采集 {province_name}-{city_name}")
+
+            page = 1
+            while True:
+                payload = {"pid": province_code, "cid": city_code, "page": str(page)}
+                data = self._post(payload)
+                if not data or not data.get("lawyerList"):
+                    break
+
+                for item in data["lawyerList"]:
+                    result = self._parse_detail(item.get("lawyerUrl", ""), province_name, city_name)
+                    if not result:
+                        continue
+                    try:
+                        self.db.insert_data("lawyer", result)
+                        print(f" -> 新增: {result['name']} ({result['phone']})")
+                    except Exception as exc:
+                        print(f" 插入失败: {exc}")
+                    time.sleep(1)
+
+                page_count = data.get("lawyerItems", {}).get("pageCount", page)
+                if page >= page_count:
+                    break
+                page += 1
+                time.sleep(2)
+
+            time.sleep(1)
+        print("华律网采集完成")
+
+
+if __name__ == "__main__":
+    with Db() as db:
+        spider = HualvSpider(db)
+        spider.run()
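
Note on `hualv.py`: `site_time` is derived from the avatar URL, which appears to embed an upload period — the two regexes in `_extract_avatar_and_time` are the only evidence for that layout. A worked example with a made-up URL:

```python
import re

# Hypothetical avatar URL; the /YYYYMM/ path segment is the assumed pattern.
avatar = "https://img.66law.cn/upload/202403/abc123.jpg"
match = re.search(r"/(20\d{2})(\d{2})/", avatar)
site_time = int(f"{match.group(1)}{match.group(2)}") if match else None
print(site_time)  # 202403
```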
{exc}") + + page += 1 + + print("找法网采集完成") + + +if __name__ == "__main__": + with Db() as db: + spider = FindlawSpider(db) + spider.run() diff --git a/common_sites/hualv.py b/common_sites/hualv.py new file mode 100644 index 0000000..006063a --- /dev/null +++ b/common_sites/hualv.py @@ -0,0 +1,325 @@ +import json +import os +import re +import sys +import time +import random +from typing import Dict, Optional + +current_dir = os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.dirname(current_dir) +request_dir = os.path.join(project_root, "request") +if request_dir not in sys.path: + sys.path.insert(0, request_dir) +if project_root not in sys.path: + sys.path.append(project_root) + +from bs4 import BeautifulSoup +from request.requests_client import RequestClientError, RequestsClient + +from Db import Db +from config import HEADERS + +LIST_URL = "https://m.66law.cn/findlawyer/rpc/loadlawyerlist/" +DOMAIN = "华律" + + +class HualvSpider: + def __init__(self, db_connection): + self.db = db_connection + self.client = self._build_session() + self.areas = self._load_areas() + + def _build_session(self) -> RequestsClient: + custom_headers = HEADERS.copy() + custom_headers['User-Agent'] = ( + 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) ' + 'AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 ' + 'Mobile/15E148 Safari/604.1' + ) + custom_headers["Connection"] = "close" + return RequestsClient(headers=custom_headers) + + def _refresh_session(self) -> None: + self.client.refresh() + + def _load_areas(self): + tables = ("area_new", "area2", "area") + last_error = None + for table in tables: + try: + provinces = self.db.select_data( + table, + "code, province, pinyin, id", + "domain='66law' AND level=1" + ) or [] + cities = self.db.select_data( + table, + "code, city, province, pid", + "domain='66law' AND level=2" + ) or [] + except Exception as exc: + last_error = exc + continue + + if not cities: + continue + + province_map = {p.get('id'): {"code": p.get('code'), "name": p.get('province')} for p in provinces} + city_map = {} + for city in cities: + province_info = province_map.get(city.get('pid'), {}) or {} + province_code = province_info.get('code') + city_map[city.get('code')] = { + "name": city.get('city'), + "province": city.get('province'), + "province_code": province_code, + } + print(f"[华律] 城市来源表: {table}, 城市数: {len(cities)}") + return city_map + + if last_error: + print(f"[华律] 加载地区数据失败: {last_error}") + print("[华律] 无城市数据(已尝试 area_new/area2/area)") + return {} + + def _post(self, data: Dict[str, str], max_retries: int = 3) -> Optional[Dict]: + for attempt in range(max_retries): + try: + resp = self.client.post_text(LIST_URL, data=data, timeout=20, verify=False) + status_code = resp.status_code + text = resp.text + if status_code == 403: + if attempt < max_retries - 1: + wait_time = 2 ** attempt + random.uniform(0.3, 1.0) + print(f"403被拦截,{wait_time}秒后重试 ({attempt + 1}/{max_retries})") + self._refresh_session() + time.sleep(wait_time) + continue + print("请求失败: 403 Forbidden") + return None + if status_code >= 400: + raise RequestClientError(f"{status_code} Error") + try: + return json.loads(text) + except ValueError as exc: + print(f"解析JSON失败: {exc}") + return None + except RequestClientError as exc: + print(f"请求失败: {exc}") + return None + return None + + def _parse_detail(self, url: str, province: str, city: str) -> Optional[Dict[str, str]]: + contact_url = f"{url}lawyer_contact.aspx" + print(f" 详情: {contact_url}") + existing = self.db.select_data( + "lawyer", + 
"id, avatar_url", + f"domain='{DOMAIN}' AND url='{contact_url}'" + ) + existing_id = None + if existing: + existing_id = existing[0].get("id") + avatar = (existing[0].get("avatar_url") or "").strip() + if avatar: + print(" -- 已存在且头像已补全,跳过") + return None + + html = self._get_detail(contact_url) + if not html: + return None + + soup = BeautifulSoup(html, "html.parser") + info_list = soup.find("ul", class_="information-list") + if not info_list: + return None + + phone = "" + law_firm = "" + for li in info_list.find_all("li"): + text = li.get_text(strip=True) + if "手机号" in text: + cleaned = text.replace("手机号", "").replace("(咨询请说明来自 华律网)", "").strip() + match = re.search(r"1\d{10}", cleaned.replace('-', '').replace(' ', '')) + if match: + phone = match.group(0) + if "执业单位" in text: + law_firm = text.replace("执业单位", "").strip() + + name = "" + breadcrumb = soup.find("div", class_="weizhi") + if breadcrumb: + links = breadcrumb.find_all("a") + if len(links) > 2: + name = links[2].get_text(strip=True) + + phone = phone.replace('-', '').strip() + if not phone or not re.fullmatch(r"1\d{10}", phone): + print(" 无手机号,跳过") + return None + + avatar_url, site_time = self._extract_avatar_and_time(soup) + data = { + "phone": phone, + "province": province, + "city": city, + "law_firm": law_firm, + "url": contact_url, + "avatar_url": avatar_url, + "create_time": int(time.time()), + "site_time": site_time, + "domain": DOMAIN, + "name": name, + "params": json.dumps({"source": url}, ensure_ascii=False) + } + if existing_id: + update_data = { + "avatar_url": avatar_url, + "site_time": site_time, + } + if name: + update_data["name"] = name + if law_firm: + update_data["law_firm"] = law_firm + if province: + update_data["province"] = province + if city: + update_data["city"] = city + if phone: + update_data["phone"] = phone + update_data["params"] = json.dumps({"source": url}, ensure_ascii=False) + try: + self.db.update_data("lawyer", update_data, f"id={existing_id}") + print(" -- 已存在,已补全头像/时间") + except Exception as exc: + print(f" 更新失败: {exc}") + return None + # 若手机号已存在,则更新头像/时间,不再插入新记录 + existing_phone = self.db.select_data( + "lawyer", + "id, avatar_url, url", + f"domain='{DOMAIN}' AND phone='{phone}'" + ) + if existing_phone: + existing_row = existing_phone[0] + avatar = (existing_row.get("avatar_url") or "").strip() + if avatar: + print(" -- 已存在手机号且头像已补全,跳过") + return None + update_data = { + "avatar_url": avatar_url, + "site_time": site_time, + } + if name: + update_data["name"] = name + if law_firm: + update_data["law_firm"] = law_firm + if province: + update_data["province"] = province + if city: + update_data["city"] = city + if phone: + update_data["phone"] = phone + if not existing_row.get("url"): + update_data["url"] = contact_url + update_data["params"] = json.dumps({"source": url}, ensure_ascii=False) + try: + self.db.update_data("lawyer", update_data, f"id={existing_row.get('id')}") + print(" -- 已存在手机号,已补全头像/时间") + except Exception as exc: + print(f" 更新失败: {exc}") + return None + return data + + def _extract_avatar_and_time(self, soup: BeautifulSoup) -> (str, Optional[int]): + avatar_url = "" + site_time = None + img_tag = soup.select_one( + "div.fixed-bottom-bar div.contact-lawye a.lr-photo img" + ) + if img_tag: + src = (img_tag.get("src") or "").strip() + if src: + if src.startswith("//"): + avatar_url = f"https:{src}" + else: + avatar_url = src + match = re.search(r"/(20\d{2})(\d{2})/", avatar_url) + if match: + site_time = int(f"{match.group(1)}{match.group(2)}") + else: + match = 
re.search(r"(20\d{2})(\d{2})\d{2}", avatar_url) + if match: + site_time = int(f"{match.group(1)}{match.group(2)}") + return avatar_url, site_time + + def _get_detail(self, url: str, max_retries: int = 3) -> Optional[str]: + for attempt in range(max_retries): + try: + resp = self.client.get_text(url, timeout=15, verify=False) + status_code = resp.status_code + text = resp.text + if status_code == 403: + if attempt < max_retries - 1: + wait_time = 2 ** attempt + random.uniform(0.3, 1.0) + print(f" 403被拦截,{wait_time}秒后重试 ({attempt + 1}/{max_retries})") + self._refresh_session() + time.sleep(wait_time) + continue + print(" 请求失败: 403 Forbidden") + return None + if status_code >= 400: + raise RequestClientError(f"{status_code} Error") + return text + except RequestClientError as exc: + print(f" 请求失败: {exc}") + return None + return None + + def run(self): + print("启动华律网采集...") + if not self.areas: + print("无城市数据") + return + + for city_code, city_info in self.areas.items(): + province_code = city_info.get("province_code") + if not province_code: + continue + province_name = city_info.get("province", "") + city_name = city_info.get("name", "") + print(f"采集 {province_name}-{city_name}") + + page = 1 + while True: + payload = {"pid": province_code, "cid": city_code, "page": str(page)} + data = self._post(payload) + if not data or not data.get("lawyerList"): + break + + for item in data["lawyerList"]: + result = self._parse_detail(item.get("lawyerUrl", ""), province_name, city_name) + if not result: + continue + try: + self.db.insert_data("lawyer", result) + print(f" -> 新增: {result['name']} ({result['phone']})") + except Exception as exc: + print(f" 插入失败: {exc}") + time.sleep(1) + + page_count = data.get("lawyerItems", {}).get("pageCount", page) + if page >= page_count: + break + page += 1 + time.sleep(2) + + time.sleep(1) + print("华律网采集完成") + + +if __name__ == "__main__": + with Db() as db: + spider = HualvSpider(db) + spider.run() diff --git a/common_sites/lawtime.py b/common_sites/lawtime.py new file mode 100644 index 0000000..6f6d462 --- /dev/null +++ b/common_sites/lawtime.py @@ -0,0 +1,278 @@ +import json +import os +import re +import sys +import time +import random +from typing import Dict, Optional, List, Set +from urllib.parse import urljoin +from concurrent.futures import ThreadPoolExecutor, as_completed +import threading + +current_dir = os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.dirname(current_dir) +request_dir = os.path.join(project_root, "request") +if request_dir not in sys.path: + sys.path.insert(0, request_dir) +if project_root not in sys.path: + sys.path.append(project_root) + +import urllib3 +from bs4 import BeautifulSoup +from request.requests_client import RequestClientError, RequestsClient + +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + +from Db import Db +from config import LAWTIME_CONFIG + +LIST_BASE = "https://m.lawtime.cn/{pinyin}/lawyer/?page={page}" +DETAIL_BASE = "https://m.lawtime.cn" +DOMAIN = "法律快车" + + +class LawtimeSpider: + def __init__(self, db_connection): + self.db = db_connection + self.client = self._build_session() + self.max_workers = int(os.getenv("SPIDER_WORKERS", "8")) + self._tls = threading.local() + + def _build_session(self) -> RequestsClient: + headers = LAWTIME_CONFIG.get("HEADERS", {}) + custom_headers = dict(headers) if headers else {} + custom_headers.setdefault("Connection", "close") + return RequestsClient(headers=custom_headers) + + def _refresh_session(self) -> None: + self.client.refresh() + + 
def _get_thread_session(self) -> RequestsClient: + s = getattr(self._tls, "session", None) + if s is not None: + return s + s = self.client.clone() + self._tls.session = s + return s + + def _refresh_thread_session(self) -> None: + s = getattr(self._tls, "session", None) + if s is not None: + s.close() + self._tls.session = None + + def _existing_phones(self, phones: List[str]) -> Set[str]: + if not phones: + return set() + existing: Set[str] = set() + cur = self.db.db.cursor() + try: + chunk_size = 500 + for i in range(0, len(phones), chunk_size): + chunk = phones[i:i + chunk_size] + placeholders = ",".join(["%s"] * len(chunk)) + sql = f"SELECT phone FROM lawyer WHERE domain=%s AND phone IN ({placeholders})" + cur.execute(sql, [DOMAIN, *chunk]) + for row in cur.fetchall(): + existing.add(row[0]) + finally: + cur.close() + return existing + + def _load_areas(self): + condition = "level = 2 and domain='法律快车'" + tables = ("area_new", "area", "area2") + last_error = None + for table in tables: + try: + rows = self.db.select_data(table, "pinyin, province, city", condition) or [] + except Exception as exc: + last_error = exc + continue + if rows: + missing_pinyin = sum(1 for r in rows if not (r.get("pinyin") or "").strip()) + print(f"[法律快车] 城市来源表: {table}, 城市数: {len(rows)}, 缺少pinyin: {missing_pinyin}") + return rows + + if last_error: + print(f"[法律快车] 加载地区数据失败: {last_error}") + print("[法律快车] 无城市数据(已尝试 area_new/area/area2)") + return [] + + def _get(self, url: str, max_retries: int = 3) -> Optional[str]: + return self._get_with_session(self.client, url, max_retries=max_retries, is_thread=False) + + def _get_with_session(self, session: RequestsClient, url: str, max_retries: int = 3, is_thread: bool = False) -> Optional[str]: + for attempt in range(max_retries): + try: + resp = session.get_text(url, timeout=15, verify=False) + status_code = resp.status_code + text = resp.text + if status_code == 403: + if attempt < max_retries - 1: + wait_time = 2 ** attempt + random.uniform(0.3, 1.0) + print(f"请求失败 {url}: 403,{wait_time}秒后重试 ({attempt + 1}/{max_retries})") + if is_thread: + self._refresh_thread_session() + session = self._get_thread_session() + else: + self._refresh_session() + session = self.client + time.sleep(wait_time) + continue + print(f"请求失败 {url}: 403 Forbidden") + return None + if status_code >= 400: + raise RequestClientError(f"{status_code} Error: {url}") + return text + except RequestClientError as exc: + print(f"请求失败 {url}: {exc}") + return None + return None + + def _parse_list(self, html: str, province: str, city: str) -> int: + soup = BeautifulSoup(html, "html.parser") + links = [a.get("href", "") for a in soup.select("a.hide_link")] + links = [link.replace("lll", "int") for link in links if link] + if not links: + return 0 + + detail_urls = [urljoin(DETAIL_BASE, link) for link in links] + + results: List[Dict[str, str]] = [] + with ThreadPoolExecutor(max_workers=self.max_workers) as ex: + futs = [ex.submit(self._parse_detail, u, province, city) for u in detail_urls] + for fut in as_completed(futs): + try: + data = fut.result() + except Exception as exc: + print(f" 详情解析异常: {exc}") + continue + if data and data.get("phone"): + results.append(data) + + if not results: + return len(detail_urls) + + phones = [d["phone"] for d in results if d.get("phone")] + existing = self._existing_phones(phones) + + for data in results: + phone = data.get("phone") + if not phone: + continue + if phone in existing: + print(f" -- 已存在: {data['name']} ({phone})") + continue + try: + 
self.db.insert_data("lawyer", data) + print(f" -> 新增: {data['name']} ({phone})") + except Exception as exc: + print(f" 插入失败 {data.get('url')}: {exc}") + + return len(detail_urls) + + def _parse_detail(self, url: str, province: str, city: str) -> Optional[Dict[str, str]]: + html = None + sess = self._get_thread_session() + html = self._get_with_session(sess, url, max_retries=3, is_thread=True) + if not html: + return None + + soup = BeautifulSoup(html, "html.parser") + text = soup.get_text(" ") + + name = "" + title_tag = soup.find("title") + if title_tag: + match = re.search(r"(\S+)律师", title_tag.get_text()) + if match: + name = match.group(1) + if not name: + intl_div = soup.find("div", class_="intl") + if intl_div: + match = re.search(r"(\S+)律师", intl_div.get_text()) + if match: + name = match.group(1) + + phone = "" + phone_pattern = r"1[3-9]\d{9}" + for item in soup.select("div.item.flex"): + label = item.find("div", class_="label") + desc = item.find("div", class_="desc") + if not label or not desc: + continue + label_text = label.get_text() + desc_text = desc.get_text().replace("-", "") + if "联系电话" in label_text or "电话" in label_text: + matches = re.findall(phone_pattern, desc_text) + if matches: + phone = matches[0] + break + if not phone: + matches = re.findall(phone_pattern, text.replace("-", "")) + if matches: + phone = matches[0] + if not phone: + print(f" 无手机号: {url}") + return None + + law_firm = "" + for item in soup.select("div.item.flex"): + label = item.find("div", class_="label") + desc = item.find("div", class_="desc") + if not label or not desc: + continue + if "执业律所" in label.get_text() or "律所" in label.get_text(): + law_firm = desc.get_text(strip=True).replace("已认证", "") + break + + params = { + "list_url": url, + "province": province, + "city": city, + } + + return { + "name": name or "", + "law_firm": law_firm, + "province": province, + "city": city, + "phone": phone, + "url": url, + "domain": DOMAIN, + "create_time": int(time.time()), + "params": json.dumps(params, ensure_ascii=False) + } + + def run(self): + print("启动法律快车采集...") + areas = self._load_areas() + if not areas: + print("无地区数据") + return + + for area in areas: + pinyin = area.get("pinyin") + province = area.get("province", "") + city = area.get("city", "") + if not pinyin: + continue + page = 1 + while True: + list_url = LIST_BASE.format(pinyin=pinyin, page=page) + print(f"采集 {province}-{city} 第 {page} 页: {list_url}") + html = self._get(list_url) + if not html: + break + link_count = self._parse_list(html, province, city) + if link_count == 0: + break + page += 1 + print("法律快车采集完成") + + +if __name__ == "__main__": + with Db() as db: + spider = LawtimeSpider(db) + spider.run() diff --git a/common_sites/six4365.py b/common_sites/six4365.py new file mode 100644 index 0000000..255e380 --- /dev/null +++ b/common_sites/six4365.py @@ -0,0 +1,332 @@ +import json +import os +import sys +import time +import random +from typing import Dict, Optional, List, Set +from concurrent.futures import ThreadPoolExecutor, as_completed +import threading + +current_dir = os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.dirname(current_dir) +request_dir = os.path.join(project_root, "request") +if request_dir not in sys.path: + sys.path.insert(0, request_dir) +if project_root not in sys.path: + sys.path.append(project_root) + +import urllib3 +from bs4 import BeautifulSoup +from request.requests_client import RequestClientError, RequestsClient + 
+urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + +from Db import Db + +DOMAIN = "律图" +LIST_URL = "https://m.64365.com/findLawyer/rpc/FindLawyer/LawyerRecommend/" + + +class Six4365Spider: + def __init__(self, db_connection): + self.db = db_connection + self.client = self._build_session() + self.max_workers = int(os.getenv("SPIDER_WORKERS", "8")) + self._tls = threading.local() + self.cities = self._load_cities() + + def _build_session(self) -> RequestsClient: + return RequestsClient(headers={ + "User-Agent": ( + "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) " + "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 " + "Mobile/15E148 Safari/604.1" + ), + "Connection": "close", + }) + + def _refresh_session(self) -> None: + self.client.refresh() + + def _get_thread_session(self) -> RequestsClient: + """每个线程使用独立请求客户端(共享相同 headers/代理配置)。""" + s = getattr(self._tls, "session", None) + if s is not None: + return s + s = self.client.clone() + self._tls.session = s + return s + + def _refresh_thread_session(self) -> None: + s = getattr(self._tls, "session", None) + if s is not None: + s.close() + self._tls.session = None + + def _existing_urls(self, urls: List[str]) -> Set[str]: + """批量查重,减少 N 次 is_data_exist""" + if not urls: + return set() + existing: Set[str] = set() + cur = self.db.db.cursor() + try: + # IN 参数过多会失败,分批 + chunk_size = 500 + for i in range(0, len(urls), chunk_size): + chunk = urls[i:i + chunk_size] + placeholders = ",".join(["%s"] * len(chunk)) + sql = f"SELECT url FROM lawyer WHERE url IN ({placeholders})" + cur.execute(sql, chunk) + for row in cur.fetchall(): + # pymysql 默认返回 tuple + existing.add(row[0]) + finally: + cur.close() + return existing + + def _load_cities(self): + tables = ("area_new", "area2", "area") + last_error = None + for table in tables: + try: + provinces = self.db.select_data( + table, + "id, code, province", + "domain='64365' AND level=1" + ) or [] + cities = self.db.select_data( + table, + "code, city, province, pid", + "domain='64365' AND level=2" + ) or [] + except Exception as exc: + last_error = exc + continue + + if not cities: + continue + + province_map = {row.get('id'): row for row in provinces} + data = {} + for city in cities: + province_row = province_map.get(city.get('pid'), {}) or {} + data[str(city.get('code'))] = { + "name": city.get('city'), + "province": city.get('province'), + "province_name": province_row.get('province', city.get('province')), + } + print(f"[律图] 城市来源表: {table}, 城市数: {len(cities)}") + return data + + if last_error: + print(f"[律图] 加载地区数据失败: {last_error}") + print("[律图] 无城市数据(已尝试 area_new/area2/area)") + return {} + + def _post(self, payload: Dict[str, str], max_retries: int = 3) -> Optional[str]: + for attempt in range(max_retries): + try: + resp = self.client.post_text(LIST_URL, data=payload, timeout=10, verify=False) + status_code = resp.status_code + text = resp.text + if status_code == 403: + if attempt < max_retries - 1: + wait_time = 2 ** attempt + random.uniform(0.3, 1.0) + print(f"403被拦截,{wait_time}秒后重试 ({attempt + 1}/{max_retries})") + self._refresh_session() + time.sleep(wait_time) + continue + print("请求失败: 403 Forbidden") + return None + if status_code >= 400: + raise RequestClientError(f"{status_code} Error") + return text + except RequestClientError as exc: + print(f"请求失败: {exc}") + return None + return None + + def _build_payload(self, city_code: str, page: int) -> Dict[str, str]: + return { + "AdCode": "", + "RegionId": str(city_code), + "CategoryId": "", + "MaxNumber": 
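
Note on `proxy_config.py`: `get_proxies()` yields the standard requests proxy mapping (one tunnel URL for both schemes), or `None` when the settings file sets `"enabled": false`. A small usage sketch:

```python
from request.proxy_config import get_proxies

# With the bundled proxy_settings.json this returns, e.g.:
# {"http": "http://<user>:<pass>@t133.kdltps.com:15818/",
#  "https": "http://<user>:<pass>@t133.kdltps.com:15818/"}
proxies = get_proxies()
if proxies is None:
    print("proxy disabled; requests go direct")
```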
"", + "OnlyData": "true", + "IgnoreButton": "", + "LawyerRecommendRequest[AreaId]": str(city_code), + "LawyerRecommendRequest[LawCategoryIds]": "", + "LawyerRecommendRequest[LawFirmPersonCount]": "", + "LawyerRecommendRequest[LawFirmScale]": "", + "LawyerRecommendRequest[OrderType]": "0", + "LawyerRecommendRequest[PageIndex]": str(page), + "LawyerRecommendRequest[PageSize]": "10", + "LawyerRecommendRequest[TagId]": "", + "LawyerRecommendRequest[Type]": "1", + "LawyerRecommendRequest[AccountType]": "", + "LawyerRecommendRequest[AddLawyer]": "true", + "LawyerRecommendRequest[Content]": "", + "LawyerRecommendRequest[Duty]": "", + "LawyerRecommendRequest[ExcludeLawyerIds][]": "", + "LawyerRecommendRequest[RefferUrl]": "", + "LawyerRecommendRequest[RequestUrl]": "https://m.64365.com/findlawyer/", + "LawyerRecommendRequest[resource_type_name]": "", + "LawyerRecommendRequest[UserAgent]": self.client.headers["User-Agent"], + "LawyerRecommendRequest[AddLawyerWithNoData]": "false", + "ShowCaseButton": "true", + } + + def _parse_list(self, html: str, province: str, city: str) -> int: + soup = BeautifulSoup(html, "html.parser") + lawyers = soup.find_all("a", class_="lawyer") + if not lawyers: + return 0 + + detail_urls: List[str] = [] + for lawyer in lawyers: + href = lawyer.get("href") + if not href: + continue + detail_urls.append(f"{href.rstrip('/')}/info/") + + if not detail_urls: + return 0 + + results: List[Dict[str, str]] = [] + with ThreadPoolExecutor(max_workers=self.max_workers) as ex: + futs = [ex.submit(self._parse_detail, u, province, city) for u in detail_urls] + for fut in as_completed(futs): + try: + data = fut.result() + except Exception as exc: + print(f" 详情解析异常: {exc}") + continue + if data: + results.append(data) + + if not results: + return len(detail_urls) + + existing = self._existing_urls([r.get("url", "") for r in results if r.get("url")]) + for data in results: + if not data: + continue + url = data.get("url", "") + if not url: + continue + if url in existing: + print(f" -- 已存在URL: {url}") + continue + try: + self.db.insert_data("lawyer", data) + print(f" -> 新增: {data['name']} ({data['phone']})") + except Exception as exc: + print(f" 插入失败 {url}: {exc}") + + return len(detail_urls) + + def _parse_detail(self, url: str, province: str, city: str) -> Optional[Dict[str, str]]: + html = self._get_detail(url) + if not html: + return None + + soup = BeautifulSoup(html, "html.parser") + base_info = soup.find("ul", class_="intro-basic-bar") + if not base_info: + return None + + name = "" + law_firm = "" + phone = "" + + for li in base_info.find_all("li"): + label = li.find("span", class_="label") + txt = li.find("div", class_="txt") + if not label or not txt: + continue + label_text = label.get_text(strip=True) + if "姓名" in label_text: + name = txt.get_text(strip=True) + if "执业律所" in label_text: + law_firm = txt.get_text(strip=True) + + more_section = soup.find("div", class_="more-intro-basic") + if more_section: + phone_ul = more_section.find("ul", class_="intro-basic-bar") + if phone_ul: + for li in phone_ul.find_all("li"): + label = li.find("span", class_="label") + txt = li.find("div", class_="txt") + if label and txt and "联系电话" in label.get_text(strip=True): + phone = txt.get_text(strip=True).replace(" ", "") + break + + phone = phone.replace('-', '').strip() + if not name or not phone: + return None + + data = { + "phone": phone, + "province": province, + "city": city, + "law_firm": law_firm, + "url": url, + "domain": DOMAIN, + "name": name, + "create_time": int(time.time()), + 
"params": json.dumps({"province": province, "city": city}, ensure_ascii=False) + } + return data + + def _get_detail(self, url: str, max_retries: int = 3) -> Optional[str]: + session = self._get_thread_session() + for attempt in range(max_retries): + try: + resp = session.get_text(url, timeout=10, verify=False) + status_code = resp.status_code + text = resp.text + if status_code == 403: + if attempt < max_retries - 1: + wait_time = 2 ** attempt + random.uniform(0.3, 1.0) + print(f" 403被拦截,{wait_time}秒后重试 ({attempt + 1}/{max_retries})") + self._refresh_thread_session() + session = self._get_thread_session() + time.sleep(wait_time) + continue + print(" 请求失败: 403 Forbidden") + return None + if status_code >= 400: + raise RequestClientError(f"{status_code} Error") + return text + except RequestClientError as exc: + print(f" 请求失败: {exc}") + return None + return None + + def run(self): + print("启动律图采集...") + if not self.cities: + print("无城市数据") + return + + for city_code, info in self.cities.items(): + province = info.get("province_name", "") + city = info.get("name", "") + print(f"采集 {province}-{city}") + page = 1 + while True: + payload = self._build_payload(city_code, page) + html = self._post(payload) + if not html: + break + link_count = self._parse_list(html, province, city) + if link_count == 0: + break + page += 1 + print("律图采集完成") + + +if __name__ == "__main__": + with Db() as db: + spider = Six4365Spider(db) + spider.run() diff --git a/common_sites/start.sh b/common_sites/start.sh new file mode 100755 index 0000000..e8f1ede --- /dev/null +++ b/common_sites/start.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash +set -euo pipefail + +# 切换到脚本所在目录,确保相对路径正确 +cd "$(dirname "$0")" + +echo "使用 request/proxy_settings.json 读取代理配置" + +nohup python3 dls.py > dls.log 2>&1 & # 大律师 +nohup python3 findlaw.py > findlaw.log 2>&1 & # 找法网 +nohup python3 lawtime.py > lawtime.log 2>&1 & # 法律快车 +nohup python3 six4365.py > six4365.log 2>&1 & # 律图 +nohup python3 hualv.py > hualv.log 2>&1 & # 华律 diff --git a/config.py b/config.py new file mode 100644 index 0000000..4afd5ea --- /dev/null +++ b/config.py @@ -0,0 +1,22 @@ +# common_sites 独立项目配置 + +DB_CONFIG = { + "host": "8.134.219.222", + "user": "lawyer", + "password": "CTxr8yGwsSX3NdfJ", + "database": "lawyer", + "charset": "utf8mb4", +} + +HEADERS = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36", + "Accept": "*/*", + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,zh-TW;q=0.7", + "X-Requested-With": "XMLHttpRequest", +} + +LAWTIME_CONFIG = { + "HEADERS": { + "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1" + } +} diff --git a/request/__init__.py b/request/__init__.py new file mode 100644 index 0000000..87b6c07 --- /dev/null +++ b/request/__init__.py @@ -0,0 +1,19 @@ +from request.requests_client import ( + RequestClientError, + RequestConnectTimeout, + RequestConnectionError, + RequestSSLError, + RequestTimeout, + RequestsClient, + ResponseData, +) + +__all__ = [ + "RequestsClient", + "ResponseData", + "RequestClientError", + "RequestConnectTimeout", + "RequestTimeout", + "RequestConnectionError", + "RequestSSLError", +] diff --git a/request/proxy_config.py b/request/proxy_config.py new file mode 100644 index 0000000..6cbc1ae --- /dev/null +++ b/request/proxy_config.py @@ -0,0 +1,97 @@ +import json +import os +from typing import Dict, Optional + +CONFIG_PATH = 
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..61d3241
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,5 @@
+pymysql>=1.0.2
+requests>=2.28.0
+beautifulsoup4>=4.11.0
+urllib3>=1.26.0
+lxml>=4.9.0
diff --git a/utils/__init__.py b/utils/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/utils/rate_limiter.py b/utils/rate_limiter.py
new file mode 100644
index 0000000..f3fd9ea
--- /dev/null
+++ b/utils/rate_limiter.py
@@ -0,0 +1,76 @@
+"""
+Global request rate limiter.
+Keeps the proxy under 5 requests per second.
+"""
+import time
+import threading
+from collections import deque
+
+
+class RateLimiter:
+    """
+    Sliding-window rate limiter (tracks the timestamps of requests made in the last second)
+    """
+    def __init__(self, max_requests_per_second: int = 5):
+        """
+        Initialize the rate limiter
+
+        Args:
+            max_requests_per_second: maximum number of requests per second
+        """
+        self.max_requests = max_requests_per_second
+        self.requests = deque()
+        self.lock = threading.RLock()
+
+    def acquire(self):
+        """
+        Acquire permission to send a request, waiting if necessary
+        """
+        with self.lock:
+            now = time.time()
+
+            # Drop request records older than one second
+            while self.requests and now - self.requests[0] >= 1.0:
+                self.requests.popleft()
+
+            # At the cap: wait until the oldest record ages out of the window
+            if len(self.requests) >= self.max_requests:
+                # Time until the oldest request leaves the window
+                wait_time = 1.0 - (now - self.requests[0])
+                if wait_time > 0:
+                    time.sleep(wait_time)
+                return self.acquire()  # re-check after sleeping
+
+            # Record this request
+            self.requests.append(now)
+
+    def can_make_request(self) -> bool:
+        """
+        Non-blocking check of whether a request can go out right now
+        """
+        with self.lock:
+            now = time.time()
+
+            # Drop request records older than one second
+            while self.requests and now - self.requests[0] >= 1.0:
+                self.requests.popleft()
+
+            return len(self.requests) < self.max_requests
+
+
+# Global rate limiter instance
+global_rate_limiter = RateLimiter(max_requests_per_second=5)
+
+
+def wait_for_request():
+    """
+    Block until a request may be sent
+    """
+    global_rate_limiter.acquire()
+
+
+def can_request_now() -> bool:
+    """
+    Return True if a request may be sent immediately
+    """
+    return global_rate_limiter.can_make_request()
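
Note on `utils/rate_limiter.py`: the limiter is a single process-wide instance, so the 5 req/s cap holds across all threads that call `wait_for_request()`. A small sketch — the worker body is a placeholder for an actual request:

```python
import threading

from utils.rate_limiter import wait_for_request

# Ten workers all funnel through the shared limiter, so aggregate
# throughput stays at <= 5 requests per second regardless of thread count.
def worker(n):
    for _ in range(n):
        wait_for_request()
        # ... issue one request here ...

threads = [threading.Thread(target=worker, args=(3,)) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```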