chore: 暂存本地修改

This commit is contained in:
hello-dd-code
2026-04-28 17:33:51 +08:00
parent ba04fe42fc
commit f67cb30f0d
15 changed files with 1139 additions and 97 deletions
+220
View File
@@ -0,0 +1,220 @@
import json
import os
import sys
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
request_dir = os.path.join(project_root, "request")
if request_dir not in sys.path:
sys.path.insert(0, request_dir)
if project_root not in sys.path:
sys.path.append(project_root)
import requests
from request.proxy_config import get_proxies, report_proxy_status
@dataclass
class CheckResult:
site: str
url: str
method: str
ok: bool
status_code: Optional[int]
error: str
hint: str
elapsed_ms: int
def _now_ms() -> int:
return int(time.time() * 1000)
def _short_hint(text: str) -> str:
s = (text or "").strip().lower()
flags = []
for key, label in [
("403", "403"),
("429", "429"),
("captcha", "captcha"),
("验证码", "captcha_cn"),
("人机", "bot_check_cn"),
("access denied", "access_denied"),
("forbidden", "forbidden"),
("too many requests", "rate_limited"),
("cloudflare", "cloudflare"),
("challenge", "challenge"),
]:
if key in s:
flags.append(label)
return ",".join(flags)[:120]
def _build_session() -> requests.Session:
report_proxy_status()
s = requests.Session()
s.trust_env = False
proxies = get_proxies()
if proxies:
s.proxies.update(proxies)
else:
s.proxies.clear()
s.headers.update(
{
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/136.0.0.0 Safari/537.36"
),
"Accept": "*/*",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Connection": "close",
}
)
return s
def _check(
session: requests.Session,
*,
site: str,
method: str,
url: str,
timeout: Tuple[float, float] = (10.0, 15.0),
headers: Optional[Dict[str, str]] = None,
data: Optional[Dict[str, Any]] = None,
) -> CheckResult:
start = _now_ms()
try:
resp = session.request(
method=method,
url=url,
timeout=timeout,
verify=False,
headers=headers,
data=data,
)
text = resp.text or ""
status = resp.status_code
hint = _short_hint(text[:1200])
ok = 200 <= status < 400
return CheckResult(
site=site,
url=url,
method=method,
ok=ok,
status_code=status,
error="",
hint=hint,
elapsed_ms=_now_ms() - start,
)
except Exception as exc:
return CheckResult(
site=site,
url=url,
method=method,
ok=False,
status_code=None,
error=str(exc)[:200],
hint="",
elapsed_ms=_now_ms() - start,
)
finally:
try:
resp.close() # type: ignore[name-defined]
except Exception:
pass
def _tests() -> List[Dict[str, Any]]:
# 每个站点选一个“代表性列表/API”作为冒烟:能快速暴露 403/验证码/限频。
return [
{
"site": "大律师(m站)",
"method": "GET",
"url": "https://m.maxlaw.cn/",
},
{
"site": "大律师(PC站)",
"method": "GET",
"url": "https://www.maxlaw.cn/law/beijing?page=1",
"headers": {"Referer": "https://www.maxlaw.cn/"},
},
{
"site": "找法网(m站)",
"method": "GET",
"url": "https://m.findlaw.cn/beijing/q_lawyer/p1?ajax=1&order=0&sex=-1",
"headers": {
"Referer": "https://m.findlaw.cn/beijing/q_lawyer/",
"X-Requested-With": "XMLHttpRequest",
"Accept": "application/json, text/javascript, */*; q=0.01",
},
},
{
"site": "法律快车(m站)",
"method": "GET",
"url": "https://m.lawtime.cn/beijing/lawyer/?page=1",
},
{
"site": "律图(m站)",
"method": "POST",
"url": "https://m.64365.com/findLawyer/rpc/FindLawyer/LawyerRecommend/",
"data": {
"RegionId": "110100", # 北京市
"OnlyData": "true",
"LawyerRecommendRequest[AreaId]": "110100",
"LawyerRecommendRequest[PageIndex]": "1",
"LawyerRecommendRequest[PageSize]": "10",
"LawyerRecommendRequest[OrderType]": "0",
"LawyerRecommendRequest[Type]": "1",
},
},
{
"site": "华律(m站)",
"method": "POST",
"url": "https://m.66law.cn/findlawyer/rpc/loadlawyerlist/",
"data": {
"pid": "110000", # 北京
"cid": "110100", # 北京市
"page": "1",
},
},
]
def main() -> int:
mode = os.getenv("PROXY_ENABLED")
print(f"[smoke] PROXY_ENABLED={mode!r}")
s = _build_session()
results: List[CheckResult] = []
for item in _tests():
res = _check(
s,
site=item["site"],
method=item["method"],
url=item["url"],
headers=item.get("headers"),
data=item.get("data"),
)
results.append(res)
print(
f"[smoke] {res.site} {res.method} {res.status_code} ok={res.ok} "
f"{res.elapsed_ms}ms hint={res.hint or '-'} err={res.error or '-'}"
)
time.sleep(0.3)
summary = {
"proxy_enabled": mode,
"results": [res.__dict__ for res in results],
}
print("[smoke] summary_json=" + json.dumps(summary, ensure_ascii=False))
return 0
if __name__ == "__main__":
raise SystemExit(main())