"""Unified HTTP proxy configuration loaded from ``proxy_settings.json``."""

import json
import os
from typing import Any, Dict, Optional
from urllib.parse import quote

# The settings file is looked up in the same directory as this module.
CONFIG_PATH = os.path.join(os.path.dirname(__file__), "proxy_settings.json")

# Fallback values used for any key the settings file does not provide.
# NOTE(review): these look like live credentials committed to source control;
# consider moving them into proxy_settings.json (kept out of VCS) and rotating
# them. They are left in place here because callers may rely on the fallback.
DEFAULT_CONFIG = {
    "enabled": True,
    "tunnel": "t133.kdltps.com:15818",
    "username": "t16766298346583",
    "password": "zyn0vb20",
    "scheme": "http",
}

# Set to True after the first report_proxy_status() call so the status line is
# printed at most once per process.
_PROXY_STATUS_REPORTED = False


def _normalize_bool(value: Any, default: bool = True) -> bool:
    """Coerce a loosely-typed config value to a bool.

    Args:
        value: Raw value from the config (bool, string, number, or None).
        default: Result to use when ``value`` is None.

    Returns:
        ``default`` for None, ``value`` itself for real bools; otherwise False
        when the stripped, lower-cased text is one of ``"0"``, ``"false"``,
        ``"no"``, ``"off"`` or empty, and True for anything else.
    """
    if value is None:
        return default
    if isinstance(value, bool):
        return value
    text = str(value).strip().lower()
    return text not in ("0", "false", "no", "off", "")


def _load_config() -> Dict[str, Any]:
    """Return the effective config: defaults overlaid with the settings file.

    A missing file silently yields a copy of ``DEFAULT_CONFIG``. Any other
    read/parse problem (unreadable file, invalid JSON, top-level value that is
    not a JSON object) prints a warning and also yields the defaults. Keys
    whose value in the file is ``null`` keep their default.
    """
    try:
        with open(CONFIG_PATH, "r", encoding="utf-8") as f:
            data = json.load(f) or {}
        if not isinstance(data, dict):
            # Raise inside the try so a list/string/number payload takes the
            # same fallback path as malformed JSON instead of crashing later
            # on ``data.items()``.
            raise TypeError(
                f"expected a JSON object, got {type(data).__name__}"
            )
    except FileNotFoundError:
        # No settings file is a normal situation, not an error.
        return dict(DEFAULT_CONFIG)
    except Exception as exc:
        # Deliberately broad: config loading is best-effort and must never
        # prevent the caller from starting up.
        print(f"[proxy] 配置读取失败: {exc}, 使用默认配置")
        return dict(DEFAULT_CONFIG)

    config: Dict[str, Any] = dict(DEFAULT_CONFIG)
    config.update(
        (key, value) for key, value in data.items() if value is not None
    )
    return config


def report_proxy_status() -> None:
    """Print a one-line summary of the proxy configuration, once per process.

    Subsequent calls return immediately. The password is never printed.
    """
    global _PROXY_STATUS_REPORTED
    if _PROXY_STATUS_REPORTED:
        return
    _PROXY_STATUS_REPORTED = True

    config = _load_config()
    if not _normalize_bool(config.get("enabled"), True):
        print("[proxy] disabled by config")
        return

    missing = [
        key for key in ("tunnel", "username", "password") if not config.get(key)
    ]
    if missing:
        print(f"[proxy] enabled but missing fields: {', '.join(missing)}")
        return

    print(f"[proxy] enabled=True tunnel={config.get('tunnel')}")


def get_proxies() -> Optional[Dict[str, str]]:
    """Return the unified proxy mapping, or None when no proxy should be used.

    The configuration is read from ``proxy_settings.json`` (falling back to
    ``DEFAULT_CONFIG``); environment variables are not consulted.

    Returns:
        ``{"http": url, "https": url}`` where ``url`` has the form
        ``scheme://username:password@tunnel/``, or None when the config has
        ``enabled`` set to a false value or when tunnel, username or password
        is empty.

    Note:
        Username and password are percent-encoded when building the URL so
        that reserved characters (``@``, ``:``, ``/``, ``#`` ...) cannot
        corrupt it; store them raw (unencoded) in the settings file.
    """
    config = _load_config()
    if not _normalize_bool(config.get("enabled"), True):
        return None

    tunnel = str(config.get("tunnel") or "").strip()
    username = str(config.get("username") or "").strip()
    password = str(config.get("password") or "").strip()
    # Fall back to "http" after stripping too, so a whitespace-only scheme
    # does not produce a URL starting with "://".
    scheme = str(config.get("scheme") or "").strip().lower() or "http"

    if not tunnel or not username or not password:
        print("[proxy] missing proxy credentials, proxy disabled")
        return None

    # safe="" also encodes "/" which quote() would otherwise leave untouched.
    userinfo = f"{quote(username, safe='')}:{quote(password, safe='')}"
    proxy = f"{scheme}://{userinfo}@{tunnel}/"
    return {"http": proxy, "https": proxy}


def apply_proxy(session) -> Optional[Dict[str, str]]:
    """Apply the unified proxy configuration to a ``requests.Session``.

    Sets ``session.trust_env`` to False, then either merges the configured
    proxies into ``session.proxies`` or clears it when no proxy is configured.
    Also triggers the one-time status report.

    Args:
        session: Object exposing ``trust_env`` and a dict-like ``proxies``.

    Returns:
        The proxy mapping that was applied, or None when proxies were cleared.
    """
    report_proxy_status()
    proxies = get_proxies()
    session.trust_env = False
    if proxies:
        session.proxies.update(proxies)
    else:
        session.proxies.clear()
    return proxies


__all__ = ["get_proxies", "apply_proxy", "report_proxy_status"]