diff --git a/README.md b/README.md
index e8f0a44..8180f51 100644
--- a/README.md
+++ b/README.md
@@ -5,6 +5,7 @@ ## 目录
 
 - `common_sites/`:大律师、找法网、法律快车、律图、华律 5 个采集脚本
+- `one_off_sites/`:一次性/临时站点采集脚本(不纳入常用站点批量启动)
 - `request/proxy_config.py`:代理配置加载逻辑
 - `request/proxy_settings.json`:代理配置文件
 - `Db.py`:数据库连接与基础操作
 
@@ -60,3 +61,15 @@ DLS_DIRECT=1 DLS_NO_DB=1 ./common_sites/start.sh
 # 如果不需要解析 params 扩展信息
 ./.venv/bin/python ./common_sites/export_lawyers_excel.py --no-parse-params
 ```
+
+## 一次性站点(众法利)
+
+脚本:`one_off_sites/zhongfali_single.py`
+
+```bash
+# 仅采集写 JSON(默认输出到 data/one_off_sites/)
+./.venv/bin/python ./one_off_sites/zhongfali_single.py --direct --no-db
+
+# 采集并写入 lawyer 表(domain=众法利单页)
+./.venv/bin/python ./one_off_sites/zhongfali_single.py --direct
+```
diff --git a/one_off_sites/zhongfali_single.py b/one_off_sites/zhongfali_single.py
new file mode 100644
index 0000000..a8cf142
--- /dev/null
+++ b/one_off_sites/zhongfali_single.py
@@ -0,0 +1,489 @@
+#!/usr/bin/env python3
+"""One-off scraper for lawyer phone records on a zhongfali.com product page.
+
+The page embeds its data in a window.__INITIAL_STATE__ JSON blob; this
+script extracts per-lawyer property lists from it, appends de-duplicated
+JSONL records and optionally inserts legacy rows into the lawyer table.
+"""
+import argparse
+import hashlib
+import json
+import os
+import re
+import sys
+import time
+from typing import Dict, List, Optional, Set, Tuple
+
+import urllib3
+
+current_dir = os.path.dirname(os.path.abspath(__file__))
+project_root = os.path.dirname(current_dir)
+request_dir = os.path.join(project_root, "request")
+if request_dir not in sys.path:
+    sys.path.insert(0, request_dir)
+if project_root not in sys.path:
+    sys.path.append(project_root)
+
+from Db import Db
+from request.requests_client import RequestClientError, RequestsClient
+
+urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
+
+SITE_NAME = "zhongfali_single"
+LEGACY_DOMAIN = "众法利单页"
+DEFAULT_URL = "http://m.zhongfali.com/h-pd-552.html#mid=3&groupId=196&desc=false"
+DEFAULT_OUTPUT = "/www/wwwroot/lawyers/data/one_off_sites/zhongfali_records_all.jsonl"
+
+PHONE_RE = re.compile(r"1[3-9]\d{9}")
+# Locates only the assignment prefix; the JSON object itself is extracted by
+# brace matching in parse_initial_state, because a non-greedy regex such as
+# r"(\{.*?\})" stops at the first nested "}" and truncates the state object.
+INITIAL_STATE_RE = re.compile(r"window\.__INITIAL_STATE__\s*=\s*")
+
+
+def normalize_phone(text: str) -> str:
+    """Return the first mainland mobile number found in text, or ""."""
+    compact = re.sub(r"\D", "", text or "")
+    match = PHONE_RE.search(compact)
+    return match.group(0) if match else ""
+
+
+def split_specialties(text: str) -> List[str]:
+    """Split a specialty string on common delimiters, de-duplicated in order."""
+    source = (text or "").strip()
+    if not source:
+        return []
+    parts = [item.strip() for item in re.split(r"[、,,;;\s]+", source) if item.strip()]
+    seen: Set[str] = set()
+    result: List[str] = []
+    for item in parts:
+        if item in seen:
+            continue
+        seen.add(item)
+        result.append(item)
+    return result
+
+
+def strip_html(text: str) -> str:
+    """Drop HTML tags, decode &nbsp; entities, and collapse whitespace."""
+    cleaned = re.sub(r"<[^>]+>", " ", text or "")
+    # "&nbsp;" entity text survives tag stripping; normalize it before the
+    # whitespace collapse below (which already handles real space chars).
+    cleaned = cleaned.replace("&nbsp;", " ")
+    cleaned = re.sub(r"\s+", " ", cleaned)
+    return cleaned.strip()
+
+
+def extract_specialties_from_remark(remark: str) -> List[str]:
+    """Pull the "专业领域" clause out of a free-text remark, if present."""
+    plain = strip_html(remark)
+    if not plain:
+        return []
+
+    match = re.search(r"专业领域[::]\s*([^。;]+)", plain)
+    if match:
+        return split_specialties(match.group(1))
+    return []
+
+
+def value_at(values: List[str], index: int) -> str:
+    """Return values[index] stripped, or "" when the index is out of range."""
+    if index < 0 or index >= len(values):
+        return ""
+    return str(values[index] or "").strip()
+
+
+def _extract_json_object(text: str, start: int) -> str:
+    """Return the balanced JSON object literal beginning at text[start] ("{")."""
+    depth = 0
+    in_string = False
+    escaped = False
+    for pos in range(start, len(text)):
+        char = text[pos]
+        if in_string:
+            if escaped:
+                escaped = False
+            elif char == "\\":
+                escaped = True
+            elif char == '"':
+                in_string = False
+        elif char == '"':
+            in_string = True
+        elif char == "{":
+            depth += 1
+        elif char == "}":
+            depth -= 1
+            if depth == 0:
+                return text[start:pos + 1]
+    raise ValueError("window.__INITIAL_STATE__ JSON 不完整")
+
+
+def parse_initial_state(html: str) -> Dict:
+    """Locate and parse window.__INITIAL_STATE__; raise ValueError when absent."""
+    match = INITIAL_STATE_RE.search(html)
+    if not match or not html.startswith("{", match.end()):
+        raise ValueError("未找到 window.__INITIAL_STATE__")
+    return json.loads(_extract_json_object(html, match.end()))
+
+
+def extract_location_and_name(product_name: str) -> Tuple[str, str, str]:
+    """Best-effort (province, city, lawyer_name) parse of a product title."""
+    text = re.sub(r"\s+", " ", product_name or "").strip()
+    province = ""
+    city = ""
+    lawyer_name = ""
+
+    province_match = re.search(r"([\u4e00-\u9fa5]{2,}省)", text)
+    city_match = re.search(r"(?:[\u4e00-\u9fa5]+省)?([\u4e00-\u9fa5]+市)", text)
+    name_match = re.search(r"([\u4e00-\u9fa5]{2,4})\s*律师", text)
+
+    if province_match:
+        province = province_match.group(1)
+    if city_match:
+        city = city_match.group(1)
+    if name_match:
+        lawyer_name = name_match.group(1)
+
+    return province, city, lawyer_name
+
+
+def pick_product_module(state: Dict) -> Optional[Dict]:
+    """Find the page module whose extInfo carries productInfo, if any."""
+    module_map = state.get("currentPageModuleIdMap", {}) or {}
+    page_ids = state.get("currentPageModuleIds", []) or []
+
+    # Prefer modules in page order; map keys may be stored as str or int.
+    for module_id in page_ids:
+        module = module_map.get(str(module_id)) or module_map.get(module_id)
+        if not isinstance(module, dict):
+            continue
+        ext_info = module.get("extInfo", {}) or {}
+        if ext_info.get("productInfo"):
+            return module
+
+    for module in module_map.values():
+        if not isinstance(module, dict):
+            continue
+        ext_info = module.get("extInfo", {}) or {}
+        if ext_info.get("productInfo"):
+            return module
+
+    return None
+
+
+def parse_group_id_from_url(url: str) -> int:
+    """Extract the numeric groupId parameter from a URL (0 when missing)."""
+    match = re.search(r"(?:[?&#]|^)groupId=(\d+)", url)
+    if not match:
+        return 0
+    try:
+        return int(match.group(1))
+    except ValueError:
+        return 0
+
+
+def extract_records(url: str, state: Dict) -> List[Dict]:
+    """Build one record per unique phone found in the product prop lists,
+    plus one for the page's own contact (productInfo.material).
+    """
+    module = pick_product_module(state)
+    if not module:
+        return []
+
+    ext_info = module.get("extInfo", {}) or {}
+    product_info = ext_info.get("productInfo", {}) or {}
+    product_name = str(product_info.get("name") or "").strip()
+
+    province, city, current_name = extract_location_and_name(product_name)
+    group_id = product_info.get("groupId")
+    if not group_id:
+        group_id = parse_group_id_from_url(url)
+    module_id = module.get("id")
+
+    prop_map: Dict[str, List[str]] = {}
+    for prop in ext_info.get("propList", []) or []:
+        name = str(prop.get("name") or "").strip()
+        values = [str(item or "").strip() for item in (prop.get("valueList") or [])]
+        if name:
+            prop_map[name] = values
+
+    result: List[Dict] = []
+    seen_phones: Set[str] = set()
+    now = int(time.time())
+
+    # Prop values are parallel arrays: index idx describes one lawyer.
+    phone_values = prop_map.get("电话", [])
+    for idx, raw_phone in enumerate(phone_values):
+        phone = normalize_phone(raw_phone)
+        if not phone or phone in seen_phones:
+            continue
+        seen_phones.add(phone)
+
+        law_firm = value_at(prop_map.get("律师所", []), idx)
+        area = value_at(prop_map.get("所在地区", []), idx)
+        direction = value_at(prop_map.get("主攻方向", []), idx)
+        specialty_text = value_at(prop_map.get("专业特长", []), idx)
+        license_no = value_at(prop_map.get("执业证号", []), idx)
+        address = value_at(prop_map.get("地址", []), idx)
+        email = value_at(prop_map.get("电子邮箱", []), idx)
+        seat_phone = value_at(prop_map.get("座机", []), idx)
+        wechat = value_at(prop_map.get("微信", []), idx)
+        qq = value_at(prop_map.get("QQ", []), idx)
+        first_practice_date = value_at(prop_map.get("首次执业日期", []), idx)
+
+        specialties = split_specialties(direction)
+        if not specialties:
+            specialties = split_specialties(specialty_text)
+
+        record = {
+            "record_id": hashlib.md5(f"{url}|{phone}".encode("utf-8")).hexdigest(),
+            "collected_at": now,
+            "source": {
+                "site": SITE_NAME,
+                "list_url": url,
+                "detail_url": url,
+                "province": province,
+                "province_py": "",
+                "city": area or city,
+                "city_py": "",
+                "page": 1,
+                "group_id": group_id,
+                "module_id": module_id,
+            },
+            "list_snapshot": {
+                "name": "",
+                "law_firm": law_firm,
+                "specialties": specialties,
+                "answer_count": None,
+            },
+            "profile": {
+                "name": "",
+                "law_firm": law_firm,
+                "phone": phone,
+                "license_no": license_no,
+                "practice_years": None,
+                "email": email,
+                "address": address,
+                "specialties": specialties,
+            },
+            "raw": {
+                "source_index": idx,
+                "direction": direction,
+                "specialty_text": specialty_text,
+                "seat_phone": seat_phone,
+                "wechat": wechat,
+                "qq": qq,
+                "first_practice_date": first_practice_date,
+            },
+        }
+        result.append(record)
+
+    # The page's featured lawyer is stored in productInfo itself.
+    current_phone = normalize_phone(str(product_info.get("material") or ""))
+    if current_phone and current_phone not in seen_phones:
+        seen_phones.add(current_phone)
+        remark = str(product_info.get("remark") or "")
+        specialties = extract_specialties_from_remark(remark)
+        result.append(
+            {
+                "record_id": hashlib.md5(f"{url}|{current_phone}".encode("utf-8")).hexdigest(),
+                "collected_at": now,
+                "source": {
+                    "site": SITE_NAME,
+                    "list_url": url,
+                    "detail_url": url,
+                    "province": province,
+                    "province_py": "",
+                    "city": city,
+                    "city_py": "",
+                    "page": 1,
+                    "group_id": group_id,
+                    "module_id": module_id,
+                },
+                "list_snapshot": {
+                    "name": current_name,
+                    "law_firm": str(product_info.get("prop0") or "").strip(),
+                    "specialties": specialties,
+                    "answer_count": None,
+                },
+                "profile": {
+                    "name": current_name,
+                    "law_firm": str(product_info.get("prop0") or "").strip(),
+                    "phone": current_phone,
+                    "license_no": str(product_info.get("prop1") or "").strip(),
+                    "practice_years": None,
+                    "email": "",
+                    "address": str(product_info.get("prop3") or "").strip(),
+                    "specialties": specialties,
+                },
+                "raw": {
+                    "from_product_info": True,
+                    "product_name": product_name,
+                    "remark": remark,
+                },
+            }
+        )
+
+    return result
+
+
+def to_legacy_row(record: Dict) -> Optional[Dict[str, str]]:
+    """Map a JSONL record to a legacy lawyer-table row; None without a phone."""
+    source = record.get("source", {}) or {}
+    profile = record.get("profile", {}) or {}
+    phone = normalize_phone(profile.get("phone", ""))
+    if not phone:
+        return None
+
+    province = str(source.get("province") or "").strip()
+    city = str(source.get("city") or province).strip()
+    return {
+        "name": str(profile.get("name") or "").strip(),
+        "law_firm": str(profile.get("law_firm") or "").strip(),
+        "province": province,
+        "city": city,
+        "phone": phone,
+        "url": str(source.get("detail_url") or "").strip(),
+        "domain": LEGACY_DOMAIN,
+        "create_time": int(record.get("collected_at") or time.time()),
+        "params": json.dumps(record, ensure_ascii=False),
+    }
+
+
+def existing_phones_in_db(db: Db, phones: List[str]) -> Set[str]:
+    """Return the subset of phones already stored for LEGACY_DOMAIN."""
+    deduped = sorted({phone for phone in phones if phone})
+    if not deduped:
+        return set()
+
+    existing: Set[str] = set()
+    cur = db.db.cursor()
+    try:
+        # Chunked IN (...) query keeps the placeholder list bounded.
+        chunk_size = 500
+        for i in range(0, len(deduped), chunk_size):
+            chunk = deduped[i:i + chunk_size]
+            placeholders = ",".join(["%s"] * len(chunk))
+            sql = f"SELECT phone FROM lawyer WHERE domain=%s AND phone IN ({placeholders})"
+            cur.execute(sql, [LEGACY_DOMAIN, *chunk])
+            for row in cur.fetchall():
+                existing.add(row[0])
+    finally:
+        cur.close()
+    return existing
+
+
+def write_records_to_db(db: Db, records: List[Dict]) -> Tuple[int, int]:
+    """Insert new rows into lawyer; returns (inserted, skipped)."""
+    rows: List[Dict[str, str]] = []
+    for record in records:
+        row = to_legacy_row(record)
+        if row:
+            rows.append(row)
+    if not rows:
+        return 0, 0
+
+    existing = existing_phones_in_db(db, [row["phone"] for row in rows])
+    inserted = 0
+    skipped = 0
+
+    for row in rows:
+        phone = row.get("phone", "")
+        if not phone or phone in existing:
+            skipped += 1
+            continue
+        try:
+            db.insert_data("lawyer", row)
+            existing.add(phone)
+            inserted += 1
+        except Exception as exc:
+            # Best effort: log and keep inserting the remaining rows.
+            skipped += 1
+            print(f"[db] 插入失败 phone={phone}: {exc}")
+    return inserted, skipped
+
+
+def parse_args() -> argparse.Namespace:
+    """Parse command-line options."""
+    parser = argparse.ArgumentParser(description="众法利单页律师电话采集")
+    parser.add_argument("--url", default=DEFAULT_URL, help="详情页 URL")
+    parser.add_argument("--output", default=DEFAULT_OUTPUT, help="输出 jsonl 文件路径")
+    parser.add_argument("--direct", action="store_true", help="直连模式,不使用代理")
+    parser.add_argument("--no-db", action="store_true", help="仅输出 JSON,不写入数据库")
+    return parser.parse_args()
+
+
+def main() -> None:
+    """Fetch the page, extract records, append JSONL, optionally write DB."""
+    args = parse_args()
+    os.makedirs(os.path.dirname(args.output) or ".", exist_ok=True)
+
+    client = RequestsClient(
+        headers={
+            "User-Agent": (
+                "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) "
+                "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 "
+                "Mobile/15E148 Safari/604.1"
+            ),
+            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
+            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
+            "Connection": "close",
+        },
+        use_proxy=not args.direct,
+        retry_total=2,
+        retry_backoff_factor=1,
+        retry_status_forcelist=(429, 500, 502, 503, 504),
+        retry_allowed_methods=("GET",),
+    )
+
+    try:
+        resp = client.get_text(args.url, timeout=30, verify=False)
+        if resp.status_code >= 400:
+            raise RequestClientError(f"{resp.status_code} Error: {args.url}")
+        state = parse_initial_state(resp.text)
+        records = extract_records(args.url, state)
+    finally:
+        client.close()
+
+    if not records:
+        print("[done] 未采集到有效手机号")
+        return
+
+    # Re-load previously written record ids so reruns only append new rows.
+    seen_ids: Set[str] = set()
+    if os.path.exists(args.output):
+        with open(args.output, "r", encoding="utf-8") as old_file:
+            for line in old_file:
+                line = line.strip()
+                if not line:
+                    continue
+                try:
+                    item = json.loads(line)
+                except Exception:
+                    continue
+                record_id = item.get("record_id")
+                if record_id:
+                    seen_ids.add(record_id)
+
+    json_new = 0
+    with open(args.output, "a", encoding="utf-8") as out:
+        for record in records:
+            record_id = record["record_id"]
+            if record_id in seen_ids:
+                continue
+            out.write(json.dumps(record, ensure_ascii=False) + "\n")
+            seen_ids.add(record_id)
+            json_new += 1
+
+    db_new = 0
+    db_skip = 0
+    if not args.no_db:
+        with Db() as db:
+            db_new, db_skip = write_records_to_db(db, records)
+
+    print(
+        f"[done] 采集{len(records)}条, JSON新增{json_new}条, "
+        f"DB新增{db_new}条, DB跳过{db_skip}条, 输出: {args.output}"
+    )
+
+
+if __name__ == "__main__":
+    main()