"""
Global request rate limiter.

By default all crawler processes share a single bucket, so that when
`bash start.sh` launches several processes at once they do not each get
their own 5 requests/second and, added together, overwhelm the proxy.
"""

from contextlib import contextmanager
import json
import os
import tempfile
import time
import threading
from pathlib import Path
from uuid import uuid4

import fcntl

from request.proxy_config import is_proxy_enabled


class RateLimiter:
    """
    Cross-process sliding-window rate limiter backed by a file lock.

    - Multiple Python processes on the same machine share one state file.
    - Multiple threads inside one process go through the same limiter.

    Two limits are enforced together:

    - rate: at most ``max_requests`` request starts per ``window_seconds``;
    - concurrency: at most ``max_concurrent`` outstanding leases, where a
      lease is dropped on ``release`` or after ``lease_seconds``.
    """

    def __init__(
        self,
        max_requests_per_second: int = 5,
        window_seconds: float = 1.0,
        state_file: str | None = None,
    ):
        """
        Configure the limiter and make sure the state directory exists.

        Args:
            max_requests_per_second: Request starts allowed per window
                (clamped to at least 1).
            window_seconds: Length of the sliding window in seconds
                (clamped to at least 0.1).
            state_file: Path of the shared JSON state file. Falls back to
                the ``PROXY_RATE_LIMIT_FILE`` environment variable, then to
                a file in the system temp directory.

        The concurrency cap is read from ``PROXY_MAX_CONCURRENT_REQUESTS``
        (default: the rate limit) and the lease lifetime from
        ``PROXY_REQUEST_LEASE_SECONDS`` (default 120, minimum 5).
        """
        self.max_requests = max(1, int(max_requests_per_second))
        self.max_concurrent = max(
            1,
            int(os.getenv("PROXY_MAX_CONCURRENT_REQUESTS", str(self.max_requests))),
        )
        self.window_seconds = max(0.1, float(window_seconds))
        self.lease_seconds = max(
            5.0,
            float(os.getenv("PROXY_REQUEST_LEASE_SECONDS", "120")),
        )

        default_state = os.path.join(
            tempfile.gettempdir(),
            "lawyers_proxy_rate_limiter.json",
        )
        self.state_file = Path(
            state_file or os.getenv("PROXY_RATE_LIMIT_FILE", default_state)
        )
        # The lock file is a sibling of the state file ("<name>.lock").
        self.lock_file = self.state_file.with_suffix(self.state_file.suffix + ".lock")
        self._thread_lock = threading.RLock()

        # One mkdir covers both files: with_suffix() never changes the parent.
        self.state_file.parent.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _empty_state() -> dict:
        """Return a fresh state with no timestamps and no leases."""
        return {"timestamps": [], "leases": {}}

    @contextmanager
    def _exclusive_lock(self):
        """
        Hold the in-process lock and the exclusive file lock for the body.

        The file lock is released in a ``finally`` so an exception raised
        inside the body cannot skip the unlock.
        """
        with self._thread_lock:
            with open(self.lock_file, "a+", encoding="utf-8") as lock_fp:
                fcntl.flock(lock_fp.fileno(), fcntl.LOCK_EX)
                try:
                    yield
                finally:
                    fcntl.flock(lock_fp.fileno(), fcntl.LOCK_UN)

    def _load_state(self) -> dict:
        """
        Read the shared state file.

        Accepts both the current ``{"timestamps": [...], "leases": {...}}``
        layout and a bare JSON list of timestamps. A missing, empty or
        unreadable file yields an empty state.
        """
        if not self.state_file.exists():
            return self._empty_state()

        try:
            raw = self.state_file.read_text(encoding="utf-8").strip()
            if not raw:
                return self._empty_state()

            data = json.loads(raw)
            if isinstance(data, list):
                return {
                    "timestamps": [float(item) for item in data],
                    "leases": {},
                }
            if not isinstance(data, dict):
                return self._empty_state()

            timestamps = data.get("timestamps", []) or []
            leases = data.get("leases", {}) or {}
            return {
                "timestamps": [float(item) for item in timestamps],
                "leases": {str(key): float(value) for key, value in leases.items()},
            }
        except Exception:
            # Deliberate best-effort: any I/O, decoding or shape problem is
            # treated as "no state" so a damaged file cannot stop requests.
            return self._empty_state()

    def _save_state(self, state: dict) -> None:
        """Serialize ``state`` as JSON into the shared state file."""
        payload = json.dumps(state, ensure_ascii=False)
        self.state_file.write_text(payload, encoding="utf-8")

    def _normalize_state(self, state: dict, now: float) -> dict:
        """
        Return a copy of ``state`` containing only entries still in force.

        A timestamp is kept while ``0 <= now - ts < window_seconds`` and a
        lease while ``0 <= now - ts < lease_seconds``. The lower bound drops
        future-dated and non-finite stamps (for example after the wall clock
        is stepped backwards); without it such entries would never expire
        and could block every process indefinitely.
        """

        def is_live(stamp: float, ttl: float) -> bool:
            return 0.0 <= now - stamp < ttl

        timestamps = [
            stamp
            for stamp in map(float, state.get("timestamps", []) or [])
            if is_live(stamp, self.window_seconds)
        ]
        leases = {
            str(key): float(value)
            for key, value in (state.get("leases", {}) or {}).items()
            if is_live(float(value), self.lease_seconds)
        }
        return {"timestamps": timestamps, "leases": leases}

    def acquire(self) -> None:
        """
        Block until a request may start, consuming one rate-window entry.

        The concurrency lease is released immediately, so callers of this
        method are limited by rate only.
        """
        while True:
            token = self.try_acquire_slot()
            if token:
                self.release(token)
                return
            time.sleep(0.05)

    def try_acquire_slot(self) -> str | None:
        """
        Block until both limits allow a request, then reserve a slot.

        Returns:
            The lease token to hand back to ``release``.
        """
        while True:
            with self._exclusive_lock():
                now = time.time()
                state = self._normalize_state(self._load_state(), now)
                timestamps = state["timestamps"]
                leases = state["leases"]

                rate_full = len(timestamps) >= self.max_requests
                concurrency_full = len(leases) >= self.max_concurrent

                if not rate_full and not concurrency_full:
                    token = uuid4().hex
                    timestamps.append(now)
                    leases[token] = now
                    self._save_state(state)
                    return token

                wait_candidates = []
                if rate_full and timestamps:
                    # Time until the oldest entry leaves the window.
                    wait_candidates.append(
                        self.window_seconds - (now - min(timestamps))
                    )
                if concurrency_full:
                    # No way to know when a lease is released; poll.
                    wait_candidates.append(0.05)

                positive = [item for item in wait_candidates if item > 0]
                wait_time = max(0.05, min(positive, default=0.05))

            # Sleep with both locks released so other workers can proceed.
            time.sleep(wait_time)

    def release(self, token: str | None) -> None:
        """
        Give back the concurrency lease identified by ``token``.

        A falsy token is ignored. An unknown or already expired token is
        not an error; the pruned state is written back either way.
        """
        if not token:
            return

        with self._exclusive_lock():
            state = self._normalize_state(self._load_state(), time.time())
            state["leases"].pop(token, None)
            self._save_state(state)

    def can_make_request(self) -> bool:
        """
        Report whether a request could start right now.

        Nothing is reserved; the pruned state is written back as a side
        effect.
        """
        with self._exclusive_lock():
            state = self._normalize_state(self._load_state(), time.time())
            self._save_state(state)
            return (
                len(state["timestamps"]) < self.max_requests
                and len(state["leases"]) < self.max_concurrent
            )


global_rate_limiter = RateLimiter(
    max_requests_per_second=int(os.getenv("PROXY_MAX_REQUESTS_PER_SECOND", "5"))
)


def _should_limit_proxy_requests() -> bool:
    """
    Enable global proxy rate limiting only when this process uses the proxy.

    If the proxy check itself fails, limiting stays on.
    """
    try:
        return is_proxy_enabled()
    except Exception:
        return True


def wait_for_request():
    """Wait until a request may be sent."""
    if not _should_limit_proxy_requests():
        return
    global_rate_limiter.acquire()


def can_request_now() -> bool:
    """Check whether a request may be sent immediately."""
    if not _should_limit_proxy_requests():
        return True
    return global_rate_limiter.can_make_request()


@contextmanager
def request_slot():
    """
    Reserve a cross-process request slot and release it when the body ends.

    This limits both how many requests start per second and how many are
    in flight at the same time.
    """
    if not _should_limit_proxy_requests():
        yield
        return

    token = global_rate_limiter.try_acquire_slot()
    try:
        yield
    finally:
        global_rate_limiter.release(token)