"""Scraper for lawyer accounts on WeChat Channels, built on ``requests``.

The module reads its settings from ``DEFAULT_WEIXIN_CONFIG``, an optional
``WEIXIN_CONFIG`` dict in the project ``config`` module, and ``WEIXIN_*``
environment variables (later sources override earlier ones).  For every
city returned by the database it queries the ``videosnap`` search endpoint
and stores accounts whose signature contains a mobile phone number.
"""

import copy
import json
import os
import re
import sys
import time
from html import unescape
from http.cookies import SimpleCookie
from typing import Dict, Optional
from urllib.parse import urlencode

import requests
import urllib3

# Make both this directory and the project root importable so that the
# project-local ``config`` and ``utils`` modules below can be resolved.
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
for path in (current_dir, project_root):
    if path not in sys.path:
        sys.path.append(path)

import config as project_config
from utils.rate_limiter import wait_for_request, global_rate_limiter

API_ENDPOINT = "https://mp.weixin.qq.com/cgi-bin/videosnap"
DOMAIN = "mp.weixin.qq.com"

# Matches an 11-digit mobile number (1[3-9]xxxxxxxxx) that is NOT embedded in
# a longer run of digits.  An optional +86 / 0086 / 86 country-code prefix is
# consumed outside the capture group so that "8613812345678" still yields
# "13812345678" while e.g. a 13-digit account id is rejected.
_PHONE_PATTERN = re.compile(r"(?<!\d)(?:(?:\+|00)?86[\s-]?)?(1[3-9]\d{9})(?!\d)")

# Browser-like request headers sent with every API call.
DEFAULT_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/146.0.0.0 Safari/537.36"
    ),
    "Accept": "*/*",
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,zh-TW;q=0.7",
    "DNT": "1",
    "Priority": "u=1, i",
    "Sec-CH-UA": '"Chromium";v="146", "Not-A.Brand";v="24", "Google Chrome";v="146"',
    "Sec-CH-UA-Mobile": "?0",
    "Sec-CH-UA-Platform": '"Windows"',
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "same-origin",
    "X-Requested-With": "XMLHttpRequest",
}

# Built-in defaults; overridden by config.WEIXIN_CONFIG and WEIXIN_* env vars.
# NOTE(review): TOKEN / FINGERPRINT / COOKIE below look like real session
# credentials committed to source control.  They are kept unchanged here so
# behavior is preserved, but they should be moved to config.py or the
# environment and rotated.
DEFAULT_WEIXIN_CONFIG = {
    "TOKEN": "609153506",
    "FINGERPRINT": "46a7e6ac6ccf205986adc0aa99127860",
    "COOKIE": {
        "appmsglist_action_3258147150": "card",
        "_qimei_uuid42": "1a302160d051008226aec905b63f99ff3989f30009",
        "_qimei_i_3": "63b22b84c15204dfc595ac6452d722b1f0bdf0f6145b568ae68a7c0e70947438686637943989e2a1d792",
        "_qimei_h38": "215986ce26aec905b63f99ff0200000e81a302",
        "ua_id": "S7gglu0eZh9NkAzLAAAAADH8dynpnFZVN29lxm7BQo0=",
        "wxuin": "73074968761097",
        "mm_lang": "zh_CN",
        "eas_sid": "91X7I7K4K5k364U2z3k2I980F5",
        "_qimei_q36": "",
        "_qimei_fingerprint": "d895c46d5fda98cab67d9daec00068ed",
        "_clck": "501quy|1|g4t|0",
        "uuid": "210d1c199a63afd4c774eccd9a06a27f",
        "rand_info": "CAESIE4WqrFFVVjqrrNflbCUM7wPD5NXjuGbjfHolAEsMmEm",
        "slave_bizuin": "3258147150",
        "data_bizuin": "3258147150",
        "bizuin": "3258147150",
        "data_ticket": "tpcLjRB7B7AlUY3rFe/ILEjtCKs7dEEGsn8kXnHVzdTb9dgIpSPN1aP8FlE6FDhj",
        "slave_sid": "U3hfU1Z0UV91N0U5d0lkRDhyTzh3d3hmbnBHMjBnbmFNdzVJeGlJeTJ6OTVxRjJQVVE2VkNhejYzTkxETVVSZkF3eWRORmtRS01XWFBjdnFZZWFLNjR2ZGtwdUJ2MzByclg0NjF4SHlDeVJneEhsczdSYUJVNE45VEhNRWVTQXg1dlpGdWQ0bU5VM3pnRzJN",
        "slave_user": "gh_fe76760560d0",
        "xid": "ef503a6864cceaef225c615a45606e4a",
        "_clsk": "12arnf1|1774975723874|4|1|mp.weixin.qq.com/weheat-agent/payload/record",
        "_qimei_i_1": "2ddc6a80945f59d3c7c4ab325dd526b3feeea1a31458558bbdd97e582493206c6163629d39d8e1dcd49fddc7"
    },
    "COUNT": 21,
    "REFERER": "https://mp.weixin.qq.com/",
    "HEADERS": {},
    "REQUEST_PARAMS": {
        "action": "search",
        "scene": "1",
        "lang": "zh_CN",
        "f": "json",
        "ajax": "1",
    },
    "REQUESTS_PER_SECOND": 5,
    "PAGE_DELAY": 5,
    "CITY_DELAY": 2,
}


def _deep_merge_dict(base: Dict, incoming: Dict) -> Dict:
    """Return a deep copy of *base* with *incoming* merged on top.

    Nested dicts present on both sides are merged recursively; any other
    value from *incoming* replaces the one in *base*.  *base* is not mutated.
    """
    merged = copy.deepcopy(base)
    for key, value in incoming.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = _deep_merge_dict(merged[key], value)
        else:
            merged[key] = value
    return merged


def _parse_cookie_value(cookie_value) -> Dict[str, str]:
    """Normalize a cookie setting into a ``{name: value}`` dict of strings.

    Accepts a dict, a JSON object string, or a ``Cookie``-header style string
    (``a=1; b=2``).  Empty or unsupported input yields an empty dict.
    """
    if isinstance(cookie_value, dict):
        return {str(key): str(value) for key, value in cookie_value.items()}
    if not cookie_value:
        return {}
    if isinstance(cookie_value, str):
        text = cookie_value.strip()
        if not text:
            return {}
        # Try JSON first; fall back to header-style parsing when the text is
        # not JSON or does not decode to an object.
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, dict):
            return {str(key): str(value) for key, value in parsed.items()}
        cookie = SimpleCookie()
        cookie.load(text)
        return {key: morsel.value for key, morsel in cookie.items()}
    return {}


def _load_weixin_config() -> Dict:
    """Build the effective configuration.

    Precedence (lowest to highest): ``DEFAULT_WEIXIN_CONFIG``, the project's
    ``config.WEIXIN_CONFIG`` dict (deep-merged), then non-empty ``WEIXIN_*``
    environment variables.  ``COOKIE`` is normalized to a dict, and numeric
    settings are coerced, falling back to the built-in default when the
    value cannot be converted.
    """
    config = copy.deepcopy(DEFAULT_WEIXIN_CONFIG)
    module_config = getattr(project_config, "WEIXIN_CONFIG", None)
    if isinstance(module_config, dict):
        config = _deep_merge_dict(config, module_config)

    env_mapping = {
        "TOKEN": os.getenv("WEIXIN_TOKEN"),
        "FINGERPRINT": os.getenv("WEIXIN_FINGERPRINT"),
        "COOKIE": os.getenv("WEIXIN_COOKIE"),
        "REFERER": os.getenv("WEIXIN_REFERER"),
        "COUNT": os.getenv("WEIXIN_COUNT"),
        "REQUESTS_PER_SECOND": os.getenv("WEIXIN_REQUESTS_PER_SECOND"),
        "PAGE_DELAY": os.getenv("WEIXIN_PAGE_DELAY"),
        "CITY_DELAY": os.getenv("WEIXIN_CITY_DELAY"),
    }
    for key, value in env_mapping.items():
        # Unset or empty environment variables do not override anything.
        if value not in (None, ""):
            config[key] = value

    config["COOKIE"] = _parse_cookie_value(config.get("COOKIE"))

    for key in ("COUNT", "REQUESTS_PER_SECOND"):
        try:
            config[key] = int(config[key])
        except (TypeError, ValueError):
            config[key] = DEFAULT_WEIXIN_CONFIG[key]

    for key in ("PAGE_DELAY", "CITY_DELAY"):
        try:
            config[key] = float(config[key])
        except (TypeError, ValueError):
            config[key] = DEFAULT_WEIXIN_CONFIG[key]

    return config


def _strip_html(text: str) -> str:
    """Unescape HTML entities in *text*, drop tags, and trim whitespace."""
    if not text:
        return ""
    return re.sub(r"<[^>]+>", "", unescape(text)).strip()


class WeixinSpider:
    """Requests-based collector for WeChat Channels accounts."""

    def __init__(self, db_connection):
        """Load configuration and prepare the HTTP session.

        Args:
            db_connection: Database handle; this class calls its
                ``select_data``, ``is_data_exist`` and ``insert_data``
                methods.
        """
        self.db = db_connection
        self.config = _load_weixin_config()
        self.token = str(self.config.get("TOKEN", "")).strip()
        self.fingerprint = str(self.config.get("FINGERPRINT", "")).strip()
        self.cookies = self.config.get("COOKIE", {})
        self.count = str(self.config.get("COUNT", DEFAULT_WEIXIN_CONFIG["COUNT"]))
        self.referer = str(self.config.get("REFERER", DEFAULT_WEIXIN_CONFIG["REFERER"])).strip()
        self.request_params = {
            str(key): str(value)
            for key, value in (self.config.get("REQUEST_PARAMS", {}) or {}).items()
            if value is not None
        }
        # Negative delays make no sense for time.sleep(); clamp to zero.
        self.page_delay = max(0.0, float(self.config.get("PAGE_DELAY", DEFAULT_WEIXIN_CONFIG["PAGE_DELAY"])))
        self.city_delay = max(0.0, float(self.config.get("CITY_DELAY", DEFAULT_WEIXIN_CONFIG["CITY_DELAY"])))

        max_rps = self.config.get("REQUESTS_PER_SECOND")
        if max_rps:
            # Adjusts the shared (module-level) limiter, not a private one.
            global_rate_limiter.max_requests = int(max_rps)

        # Header precedence: defaults < config.HEADERS < WEIXIN_CONFIG["HEADERS"]
        # < the configured referer.
        headers = DEFAULT_HEADERS.copy()
        project_headers = getattr(project_config, "HEADERS", None)
        if isinstance(project_headers, dict):
            headers.update(project_headers)
        config_headers = self.config.get("HEADERS", {})
        if isinstance(config_headers, dict):
            headers.update({str(key): str(value) for key, value in config_headers.items()})
        if self.referer:
            headers["Referer"] = self.referer

        self.session = requests.Session()
        # Ignore proxy settings (and other configuration) from the environment.
        self.session.trust_env = False
        self.session.headers.update(headers)
        if self.cookies:
            self.session.cookies.update(self.cookies)

        # Requests below are made with verify=False; silence the resulting
        # warning.  NOTE(review): disabling TLS verification is a security
        # trade-off that should be confirmed as intentional.
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    def _validate_runtime_config(self) -> bool:
        """Return True when TOKEN, FINGERPRINT and COOKIE are all set.

        Prints which settings are missing and returns False otherwise.
        """
        missing = []
        if not self.token:
            missing.append("TOKEN")
        if not self.fingerprint:
            missing.append("FINGERPRINT")
        if not self.cookies:
            missing.append("COOKIE")
        if not missing:
            return True

        print(
            "[微信] 配置不完整,缺少: "
            + ", ".join(missing)
            + "。请在 config.py 的 WEIXIN_CONFIG 中补齐,"
            + "或通过环境变量 WEIXIN_TOKEN / WEIXIN_FINGERPRINT / WEIXIN_COOKIE 提供。"
        )
        return False

    def _load_areas(self):
        """Return province/city rows from the first area table that has any.

        Tries ``area_new``, ``area`` and ``area2`` in order.  A table whose
        query raises is skipped; the last such error is reported only when
        no table produced rows.  Returns an empty list when nothing is found.
        """
        condition = "domain='maxlaw' AND level=2"
        tables = ("area_new", "area", "area2")
        last_error = None
        for table in tables:
            try:
                rows = self.db.select_data(table, "province, city", condition) or []
            except Exception as exc:
                # Best effort: a missing/broken table must not stop the
                # fallback to the next candidate.
                last_error = exc
                continue
            if rows:
                print(f"[微信] 城市来源表: {table}, 城市数: {len(rows)}")
                return rows

        if last_error:
            print(f"[微信] 加载地区数据失败: {last_error}")
        print("[微信] 无城市数据(已尝试 area_new/area/area2)")
        return []

    def _build_query_url(self, query: str, buffer: str) -> str:
        """Return the search URL for *query* at the pagination cursor *buffer*."""
        params = self.request_params.copy()
        params.update({
            "query": query,
            "count": self.count,
            "buffer": buffer,
            "fingerprint": self.fingerprint,
            "token": self.token,
        })
        return f"{API_ENDPOINT}?{urlencode(params)}"

    def _extract_phone(self, text: str) -> Optional[str]:
        """Return the first mobile number found in *text*, or None.

        Only an 11-digit number starting with 1[3-9] that is not part of a
        longer digit run is accepted; a leading +86 / 0086 / 86 country code
        is tolerated and stripped from the result.
        """
        if not text:
            return None
        match = _PHONE_PATTERN.search(text)
        return match.group(1) if match else None

    def _parse_name(self, acct: Dict) -> str:
        """Return the account's display name with HTML markup removed.

        Prefers ``highlight_nickname`` and falls back to ``nickname``.
        """
        highlight = _strip_html(acct.get("highlight_nickname", ""))
        if highlight:
            return highlight
        return _strip_html(acct.get("nickname", ""))

    def _store_account(self, acct: Dict, province: str, city: str) -> None:
        """Insert *acct* into the ``lawyer`` table if it exposes a new phone.

        Accounts without a phone number in their signature are ignored, as
        are phones already stored for this domain.  Insert failures are
        printed and swallowed so one bad row does not abort the page.
        """
        signature = acct.get("signature", "")
        phone = self._extract_phone(signature)
        if not phone:
            return

        name = self._parse_name(acct)
        # `phone` consists solely of regex-matched digits, so interpolating
        # it into the condition string cannot inject SQL.
        if self.db.is_data_exist("lawyer", f"phone='{phone}' and domain='{DOMAIN}'"):
            print(f" -- 已存在律师: {name} ({phone})")
            return

        params = json.dumps(acct, ensure_ascii=False)
        # The API may send "auth_info": null; `or {}` keeps .get() safe.
        auth_info = acct.get("auth_info") or {}
        lawyer_data = {
            "phone": phone,
            "province": province,
            "city": city,
            "law_firm": auth_info.get("auth_profession") if isinstance(auth_info, dict) else None,
            "url": f"https://channels.weixin.qq.com/finder/creator/feeds?finder_username={acct.get('username', '')}",
            "create_time": int(time.time()),
            "domain": DOMAIN,
            "name": name,
            "params": params,
        }
        try:
            inserted_id = self.db.insert_data("lawyer", lawyer_data)
            print(f" -> 新增律师: {lawyer_data['name']} ({phone}), ID: {inserted_id}")
        except Exception as exc:
            print(f" 插入失败 {lawyer_data['name']} ({phone}): {exc}")

    def _search_city(self, province: str, city: str) -> None:
        """Page through search results for one city and store the accounts.

        Paging stops on a network error, an undecodable response, an API
        error, an empty page, a cleared continue flag, or a pagination
        cursor that did not advance.
        """
        city_name = city.replace('市', '')
        query = f"{city_name}律所"
        print(f"--- [微信] 开始采集城市: {province} - {city_name} ---")

        buffer = ""
        has_more = True
        page_no = 0
        while has_more:
            page_no += 1
            url = self._build_query_url(query, buffer)
            print(f"正在采集 '{query}' 第 {page_no} 页: {url}")

            wait_for_request()
            try:
                response = self.session.get(
                    url,
                    timeout=15,
                    cookies=self.cookies,
                    proxies={},  # explicitly disable proxies
                    verify=False,
                )
                response.raise_for_status()
            except requests.exceptions.RequestException as exc:
                print(f"网络请求失败: {exc}")
                break

            # Decode separately: requests' JSONDecodeError is itself a
            # RequestException, so sharing one try block with the handler
            # above would report bad JSON as a network failure.
            try:
                data = response.json()
            except ValueError:
                print("解析返回的JSON失败。返回内容:", response.text[:200])
                break

            base_resp = data.get("base_resp", {})
            if base_resp.get("ret") != 0:
                print(f"API返回错误: {base_resp.get('err_msg')}")
                if "invalid ticket" in (base_resp.get('err_msg') or ""):
                    print("Token 或 Cookie 可能失效,请更新配置。")
                break

            accounts = data.get("acct_list", [])
            if not accounts:
                print("本页未找到更多律师信息。")
                break

            for acct in accounts:
                self._store_account(acct, province, city_name)

            has_more = bool(data.get("acct_continue_flag"))
            next_buffer = data.get("last_buff", "")
            # A missing or unchanged cursor would re-request the same page
            # forever; stop instead.
            if has_more and (not next_buffer or next_buffer == buffer):
                print("翻页游标未更新,停止翻页以避免重复请求。")
                break
            buffer = next_buffer
            time.sleep(self.page_delay)

        print(f"--- [微信] 城市: {city_name} 采集完成 ---\n")

    def run(self) -> None:
        """Validate the configuration, then scrape every configured city.

        A failure in one city is printed and does not stop the remaining
        cities.
        """
        print("启动微信视频号律师信息采集...")
        if not self._validate_runtime_config():
            return

        areas = self._load_areas()
        if not areas:
            print("[微信] 未能从 `area_new` 表获取到地区信息。")
            return

        for area in areas:
            # Rows are accessed as mappings with "province" / "city" keys.
            province = area.get("province", "")
            city = area.get("city", "")
            if not city:
                continue
            try:
                self._search_city(province, city)
            except Exception as exc:
                # Top-level boundary: keep going with the next city.
                print(f"采集 {province}-{city} 时发生错误: {exc}")
            time.sleep(self.city_delay)

        print("微信视频号律师信息采集完成。")


if __name__ == "__main__":
    from Db import Db

    with Db() as db:
        spider = WeixinSpider(db)
        spider.run()