412 lines
14 KiB
Python
412 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
import argparse
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import urllib.request
|
|
from typing import Dict, List, Optional, Set, Tuple
|
|
|
|
# Put the parent of this script's directory on sys.path so that the
# project-local `Db` module imported just below can be resolved when the
# script is launched directly.
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
if project_root not in sys.path:
    sys.path.append(project_root)
|
|
|
|
from Db import Db
|
|
|
|
# Identifier written into every record under source.site.
SITE_NAME = "zhongfali_group80"
# Value stored in the `domain` column of the `lawyer` table; also the key used
# to delete rows from earlier runs.
LEGACY_DOMAIN = "众法利单页"
# Default entry page (overridable with --start-url).
START_URL = "http://m.zhongfali.com/pg.jsp?groupId=80&pgt=0&pgs=1"
# Default JSONL output path (overridable with --output).
DEFAULT_OUTPUT = "/www/wwwroot/lawyers/data/one_off_sites/zhongfali_records_all.jsonl"

# host:port handed to curl's --socks5-hostname option for every request.
SOCKS_PROXY = "127.0.0.1:7891"
# Base URL and bearer secret of the proxy controller API used by ProxyRotator;
# both can be overridden through the environment.
CLASH_CONTROLLER = os.environ.get("CLASH_CONTROLLER", "http://127.0.0.1:9090")
CLASH_SECRET = os.environ.get("CLASH_SECRET", "")
# 11 consecutive digits: a leading 1, a second digit in 3-9, then nine more
# digits (presumably a mainland-China mobile number -- confirm if reused).
PHONE_RE = re.compile(r"1[3-9]\d{9}")
# Captures the object literal assigned to window.__INITIAL_STATE__. The match
# is non-greedy, so it ends at the first "}" directly followed by "</script>".
INITIAL_STATE_RE = re.compile(r"window\.__INITIAL_STATE__\s*=\s*(\{.*?\})</script>", re.S)
|
|
|
|
|
|
class ProxyRotator:
    """Round-robin switcher for the ``GLOBAL`` proxy selector of a controller API.

    The controller is reached over HTTP at ``controller``; when ``secret`` is
    non-empty it is sent as a bearer token. ``nodes`` stays empty (rotation
    disabled) until :meth:`initialize` succeeds.
    """

    # Built-in policies and named groups that are never selectable as a node.
    _SKIPPED_NAMES = frozenset(
        {
            "GLOBAL",
            "DIRECT",
            "REJECT",
            "REJECT-DROP",
            "PASS",
            "COMPATIBLE",
            "🔰 选择节点",
            "☁️ OneDrive",
            "🐟 漏网之鱼",
            "🎯 全球直连",
            "🛑 拦截广告",
            "🌍 爱奇艺&哔哩哔哩",
            "🎮 Steam 登录/下载",
            "🎮 Steam 商店/社区",
            "🌩️ Cloudflare",
            "🎬 动画疯",
            "🎓学术网站",
            "🇨🇳 国内网站",
        }
    )
    # Entries with one of these "type" values are filtered out of the node list.
    _GROUP_TYPES = frozenset({"Selector", "URLTest", "Fallback", "LoadBalance"})

    def __init__(self, controller: str, secret: str):
        """Store the controller base URL (trailing slashes removed) and the trimmed secret."""
        self.controller = controller.rstrip("/")
        self.secret = secret.strip()
        self.nodes: List[str] = []
        self.index = 0

    def _api(self, path: str, method: str = "GET", payload: Optional[Dict] = None) -> Dict:
        """Send one request to the controller and return the decoded JSON body.

        An empty response body yields ``{}``. Network and JSON errors propagate.
        """
        request_headers = {}
        if self.secret:
            request_headers["Authorization"] = f"Bearer {self.secret}"

        data = None
        if payload is not None:
            request_headers["Content-Type"] = "application/json"
            data = json.dumps(payload).encode("utf-8")

        request = urllib.request.Request(
            f"{self.controller}{path}",
            data=data,
            headers=request_headers,
            method=method,
        )
        with urllib.request.urlopen(request, timeout=10) as response:
            text = response.read().decode("utf-8", errors="ignore")
        if not text:
            return {}
        return json.loads(text)

    def initialize(self) -> None:
        """Switch the controller to global mode and load the selectable nodes.

        Does nothing when no secret is configured. Any failure is printed and
        leaves ``nodes`` empty, which disables rotation.
        """
        if not self.secret:
            return
        try:
            self._api("/configs", method="PATCH", payload={"mode": "global"})
            catalog = self._api("/proxies").get("proxies", {}) or {}

            candidates = []
            for name, info in catalog.items():
                if name in self._SKIPPED_NAMES or not isinstance(info, dict):
                    continue
                if info.get("type") in self._GROUP_TYPES:
                    continue
                candidates.append(name)

            self.nodes = candidates
            if candidates:
                self.switch_to(candidates[0])
        except Exception as exc:
            print(f"[proxy] rotator init failed: {exc}")
            self.nodes = []

    def switch_to(self, node_name: str) -> None:
        """Point the ``GLOBAL`` selector at ``node_name``."""
        self._api("/proxies/GLOBAL", method="PUT", payload={"name": node_name})

    def rotate(self) -> None:
        """Advance to the next node (wrapping around); no-op without nodes."""
        if not self.nodes:
            return
        self.index = (self.index + 1) % len(self.nodes)
        self.switch_to(self.nodes[self.index])
|
|
|
|
|
|
def normalize_phone(value: str) -> str:
    """Return the first PHONE_RE match found in the digits of ``value``.

    All non-digit characters are dropped before matching, so separators inside
    a number do not prevent a match. Returns "" when nothing matches or when
    ``value`` is empty/None.
    """
    digits_only = "".join(filter(str.isdigit, str(value or "")))
    found = PHONE_RE.search(digits_only)
    if found is None:
        return ""
    return found.group(0)
|
|
|
|
|
|
def fetch_html(
    url: str,
    rotator: Optional[ProxyRotator] = None,
    max_retries: int = 6,
    timeout_seconds: int = 18,
) -> str:
    """Download ``url`` with curl through SOCKS_PROXY and return the body text.

    Args:
        url: Page to fetch.
        rotator: Optional proxy rotator; when it has nodes, it is rotated after
            every failed attempt.
        max_retries: Maximum number of curl invocations.
        timeout_seconds: Value passed to curl's ``--max-time``.

    Returns:
        The response body decoded as UTF-8 (undecodable bytes ignored).

    Raises:
        RuntimeError: When every attempt failed; the message carries the reason
            of the last failure (curl error, HTTP status, or empty body).
    """
    # curl appends this marker plus the final HTTP status after the body so
    # both can be recovered from stdout in one call.
    marker = "\n__CODE__:"
    # The command line does not change between attempts, so build it once.
    cmd = [
        "curl",
        "-sS",
        "--socks5-hostname",
        SOCKS_PROXY,
        "-L",
        "--compressed",
        "--max-time",
        str(timeout_seconds),
        "-w",
        marker + "%{http_code}",
        url,
    ]

    last_error = "no attempt made"
    for attempt in range(max_retries):
        proc = subprocess.run(cmd, capture_output=True)
        if proc.returncode == 0:
            raw = proc.stdout.decode("utf-8", errors="ignore")
            split_at = raw.rfind(marker)
            if split_at != -1:
                text = raw[:split_at]
                code_text = raw[split_at + len(marker):].strip()
            else:
                text = raw
                code_text = ""

            if code_text and code_text != "200":
                # Report the real cause instead of a misleading "empty body".
                last_error = f"http status {code_text}"
            elif not text:
                last_error = "empty body"
            else:
                return text
        else:
            last_error = proc.stderr.decode("utf-8", errors="ignore").strip() or f"exit={proc.returncode}"

        if rotator and rotator.nodes:
            try:
                rotator.rotate()
            except Exception as exc:
                # Rotation is best-effort; keep retrying on the current route.
                last_error = f"{last_error}; rotate failed: {exc}"

        if attempt < max_retries - 1:
            # Linear back-off; no sleep after the final attempt.
            time.sleep(0.6 * (attempt + 1))

    raise RuntimeError(f"fetch failed: {url}, reason={last_error}")
|
|
|
|
|
|
def parse_initial_state(html: str) -> Dict:
    """Decode the JSON object assigned to ``window.__INITIAL_STATE__`` in ``html``.

    Raises:
        ValueError: When the assignment is not present in the page (also raised
            by ``json.loads`` as its subclass if the payload is not valid JSON).
    """
    found = INITIAL_STATE_RE.search(html)
    if found is None:
        raise ValueError("window.__INITIAL_STATE__ not found")
    payload = found.group(1)
    return json.loads(payload)
|
|
|
|
|
|
def extract_group_urls_from_group80(state: Dict) -> List[str]:
    """Collect the sub-group URLs listed under group "80" of page module "21".

    Reads ``currentPageModuleIdMap["21"].extInfo.secondGroupMap["80"]`` and
    gathers the ``url`` of every row plus the ``url`` of every entry in that
    row's ``thirdGroupList``. Blank URLs are dropped; the result is
    de-duplicated and sorted.
    """

    def _clean_url(entry) -> str:
        return str(entry.get("url") or "").strip()

    page_modules = state.get("currentPageModuleIdMap") or {}
    module = page_modules.get("21") or {}
    ext_info = module.get("extInfo", {}) or {}
    rows = (ext_info.get("secondGroupMap", {}) or {}).get("80") or []

    collected: Set[str] = set()
    for row in rows:
        collected.add(_clean_url(row))
        collected.update(_clean_url(city) for city in row.get("thirdGroupList") or [])
    # Blank entries were added unconditionally above; remove them in one step.
    collected.discard("")
    return sorted(collected)
|
|
|
|
|
|
def extract_detail_urls_from_group_html(html: str) -> Set[str]:
    """Return the absolute detail-page URL for every ``h-pd-<id>.html`` link in ``html``."""
    links: Set[str] = set()
    for hit in re.finditer(r"h-pd-(\d+)\.html", html):
        links.add(f"http://m.zhongfali.com/h-pd-{hit.group(1)}.html")
    return links
|
|
|
|
|
|
def parse_location_and_name(product_name: str) -> Tuple[str, str, str]:
    """Split a product title into ``(province, city, lawyer_name)``.

    Whitespace runs are collapsed first. Each part is "" when it cannot be
    found:

    * province: a run of CJK characters ending in "省";
    * city: a run of CJK characters ending in 市/区/县/州/盟, not counting a
      directly preceding "...省" prefix;
    * name: 2-4 CJK characters directly before "律师".

    The name is looked up in the title with the detected province and city
    blanked out. Otherwise, for a title written without separators (for
    example "广东省深圳市张三律师"), the leftmost match would swallow trailing
    characters of the location ("圳市张三"). If nothing is found that way,
    the unmodified title is searched as before.
    """
    text = re.sub(r"\s+", " ", str(product_name or "")).strip()

    province_match = re.search(r"([\u4e00-\u9fa5]{2,}省)", text)
    province = province_match.group(1) if province_match else ""

    city_match = re.search(r"(?:[\u4e00-\u9fa5]+省)?([\u4e00-\u9fa5]+(?:市|区|县|州|盟))", text)
    city = city_match.group(1) if city_match else ""

    name_pattern = r"([\u4e00-\u9fa5]{2,4})\s*律师"
    without_location = text
    for place in (city, province):
        # Guard against "": str.replace("", " ") would insert spaces everywhere.
        if place:
            without_location = without_location.replace(place, " ")
    name_match = re.search(name_pattern, without_location) or re.search(name_pattern, text)
    name = name_match.group(1) if name_match else ""

    return province, city, name
|
|
|
|
|
|
def parse_detail_record(detail_url: str, html: str, source_list_url: str) -> Optional[Dict]:
    """Build one record dict from a detail page, or None when it is unusable.

    The page's initial state is scanned for the first module whose ``extInfo``
    carries a non-empty ``productInfo``. The phone is taken from its
    ``material`` field; pages without such a module or without a recognisable
    phone yield None. ``prop0``, ``prop1`` and ``prop3`` are mapped to law
    firm, license number and address respectively. The record id is the MD5
    hex digest of ``detail_url``.

    Raises:
        ValueError: Propagated from ``parse_initial_state`` when the page has
            no parsable initial state.
    """
    state = parse_initial_state(html)

    page_modules = (state.get("currentPageModuleIdMap") or {}).values()
    module = next(
        (
            candidate
            for candidate in page_modules
            if isinstance(candidate, dict) and (candidate.get("extInfo") or {}).get("productInfo")
        ),
        None,
    )
    if not module:
        return None

    product_info = (module.get("extInfo", {}) or {}).get("productInfo", {}) or {}

    def _field(key: str) -> str:
        return str(product_info.get(key) or "").strip()

    phone = normalize_phone(product_info.get("material", ""))
    if not phone:
        return None

    product_name = _field("name")
    province, city, parsed_name = parse_location_and_name(product_name)
    # Fall back to the full title when no "<name>律师" pattern was recognised.
    lawyer_name = parsed_name or product_name
    law_firm = _field("prop0")

    source = {
        "site": SITE_NAME,
        "list_url": source_list_url,
        "detail_url": detail_url,
        "province": province,
        "province_py": "",
        "city": city,
        "city_py": "",
        "page": 1,
    }
    list_snapshot = {
        "name": lawyer_name,
        "law_firm": law_firm,
        "specialties": [],
        "answer_count": None,
    }
    profile = {
        "name": lawyer_name,
        "law_firm": law_firm,
        "phone": phone,
        "license_no": _field("prop1"),
        "practice_years": None,
        "email": "",
        "address": _field("prop3"),
        "specialties": [],
    }
    raw = {
        "product_name": product_name,
        "group_ids": product_info.get("groupIdList") or [],
    }

    collected_at = int(time.time())
    return {
        "record_id": hashlib.md5(detail_url.encode("utf-8")).hexdigest(),
        "collected_at": collected_at,
        "source": source,
        "list_snapshot": list_snapshot,
        "profile": profile,
        "raw": raw,
    }
|
|
|
|
|
|
def to_legacy_row(record: Dict) -> Optional[Dict[str, str]]:
    """Flatten a record into the column dict inserted into the ``lawyer`` table.

    Returns None when the record's profile has no recognisable phone. ``city``
    falls back to the province when empty, ``create_time`` falls back to the
    current time when ``collected_at`` is missing, and ``params`` holds the
    whole record serialised as JSON.
    """

    def _clean(value) -> str:
        return str(value or "").strip()

    profile = record.get("profile", {}) or {}
    source = record.get("source", {}) or {}

    phone = normalize_phone(profile.get("phone", ""))
    if not phone:
        return None

    province = _clean(source.get("province"))
    city = _clean(source.get("city") or province)
    created = int(record.get("collected_at") or time.time())

    return {
        "name": _clean(profile.get("name")),
        "law_firm": _clean(profile.get("law_firm")),
        "province": province,
        "city": city,
        "phone": phone,
        "url": _clean(source.get("detail_url")),
        "domain": LEGACY_DOMAIN,
        "create_time": created,
        "params": json.dumps(record, ensure_ascii=False),
    }
|
|
|
|
|
|
def delete_old_domain_data(db: Db, domain: str) -> int:
    """Delete every ``lawyer`` row whose ``domain`` equals ``domain`` and commit.

    Returns the cursor's ``rowcount`` for the DELETE. The cursor is closed even
    when the statement or the commit raises.
    """
    cursor = db.db.cursor()
    try:
        cursor.execute("DELETE FROM lawyer WHERE domain=%s", (domain,))
        removed = cursor.rowcount
        db.db.commit()
    finally:
        cursor.close()
    return removed
|
|
|
|
|
|
def write_records_to_db(db: Db, records: List[Dict]) -> int:
    """Insert each convertible record into the ``lawyer`` table.

    Records that ``to_legacy_row`` rejects are skipped. A failing insert is
    printed and does not stop the loop. Returns the number of successful
    inserts.
    """
    success_count = 0
    # Lazy conversion keeps the original convert-then-insert interleaving.
    legacy_rows = (to_legacy_row(record) for record in records)
    for row in legacy_rows:
        if not row:
            continue
        try:
            db.insert_data("lawyer", row)
        except Exception as exc:
            print(f"[db] insert failed phone={row.get('phone', '')}: {exc}")
        else:
            success_count += 1
    return success_count
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the scraper from ``sys.argv``."""
    parser = argparse.ArgumentParser(description="众法利 groupId=80 基础字段采集(姓名/手机号/地区)")
    # (flag, keyword arguments) pairs, registered in declaration order.
    option_table = (
        ("--start-url", {"default": START_URL, "help": "入口分组页 URL"}),
        ("--output", {"default": DEFAULT_OUTPUT, "help": "JSONL 输出路径"}),
        ("--no-db", {"action": "store_true", "help": "只写 JSON,不写 DB"}),
        ("--no-reset", {"action": "store_true", "help": "不清理 domain 旧数据"}),
        ("--workers", {"type": int, "default": 16, "help": "详情页并发数"}),
    )
    for flag, settings in option_table:
        parser.add_argument(flag, **settings)
    return parser.parse_args()
|
|
|
|
|
|
def main() -> None:
    """Run the whole crawl: group pages -> detail pages -> JSONL -> database.

    Steps:
      1. Fetch the start page and read the sub-group URLs from its state.
      2. Fetch every sub-group page and collect the detail-page URLs, keeping
         the first list page on which each detail URL was seen.
      3. Fetch and parse the detail pages in a thread pool, keeping one record
         per phone number.
      4. Write the records to the JSONL output file.
      5. Unless ``--no-db`` is given, replace the rows of LEGACY_DOMAIN in the
         database with the new records (``--no-reset`` keeps the old rows).

    The database step is skipped when no record was collected, so a failed
    crawl cannot wipe the rows of an earlier successful run.

    Raises:
        RuntimeError / ValueError: When the start page cannot be fetched or
            parsed; failures on individual group/detail pages are only logged.
    """
    args = parse_args()
    os.makedirs(os.path.dirname(args.output) or ".", exist_ok=True)

    rotator = ProxyRotator(CLASH_CONTROLLER, CLASH_SECRET)
    rotator.initialize()
    if rotator.nodes:
        print(f"[proxy] rotator enabled, nodes={len(rotator.nodes)}")
    else:
        print("[proxy] rotator disabled, using current proxy route")

    # Give the entry page enough attempts to try every node at least once.
    start_retries = max(8, len(rotator.nodes) + 2)
    group_html = fetch_html(args.start_url, rotator=rotator, max_retries=start_retries)
    group_state = parse_initial_state(group_html)
    group_urls = extract_group_urls_from_group80(group_state)
    print(f"[group] found group urls: {len(group_urls)}")

    detail_url_to_source: Dict[str, str] = {}
    for idx, rel_url in enumerate(group_urls, start=1):
        list_url = f"http://m.zhongfali.com/{rel_url.lstrip('/')}"
        try:
            html = fetch_html(list_url, rotator=rotator, max_retries=4, timeout_seconds=12)
            detail_urls = extract_detail_urls_from_group_html(html)
        except Exception as exc:
            print(f"[group] failed {list_url}: {exc}")
            continue

        for detail_url in detail_urls:
            # Keep the first list page that referenced this detail page.
            detail_url_to_source.setdefault(detail_url, list_url)
        if idx % 10 == 0:
            print(f"[group] {idx}/{len(group_urls)} detail_urls={len(detail_url_to_source)}")

    records: List[Dict] = []
    seen_phones: Set[str] = set()
    detail_urls = sorted(detail_url_to_source)
    print(f"[detail] total detail urls: {len(detail_urls)}")

    def process_detail(detail_url: str) -> Optional[Dict]:
        # Never raises: a failing page is logged and reported as None.
        try:
            html = fetch_html(detail_url, rotator=rotator, max_retries=2, timeout_seconds=8)
            return parse_detail_record(detail_url, html, detail_url_to_source[detail_url])
        except Exception as exc:
            print(f"[detail] failed {detail_url}: {exc}")
            return None

    done = 0
    with ThreadPoolExecutor(max_workers=max(1, int(args.workers))) as executor:
        futures = [executor.submit(process_detail, detail_url) for detail_url in detail_urls]
        for future in as_completed(futures):
            done += 1
            record = future.result()
            if record:
                phone = normalize_phone((record.get("profile", {}) or {}).get("phone", ""))
                # De-duplicate by phone; the first completed record wins.
                if phone and phone not in seen_phones:
                    seen_phones.add(phone)
                    records.append(record)
            if done % 50 == 0:
                print(f"[detail] {done}/{len(detail_urls)} valid_records={len(records)}")

    with open(args.output, "w", encoding="utf-8") as out:
        for record in records:
            out.write(json.dumps(record, ensure_ascii=False) + "\n")

    deleted = 0
    inserted = 0
    if not args.no_db:
        if records:
            with Db() as db:
                if not args.no_reset:
                    deleted = delete_old_domain_data(db, LEGACY_DOMAIN)
                inserted = write_records_to_db(db, records)
        else:
            # An empty crawl must not delete the data of a previous run.
            print("[db] no records collected, skipping DB reset/insert")

    print(
        f"[done] records={len(records)}, db_deleted={deleted}, db_inserted={inserted}, output={args.output}"
    )
|
|
|
|
|
|
# Run the crawl only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|