From 7d5f5b1054d72e67dc037ceb2a9dfb7757fcc0a8 Mon Sep 17 00:00:00 2001 From: hello-dd-code Date: Fri, 20 Mar 2026 10:07:48 +0800 Subject: [PATCH] feat: add gaode crawler and export domain exclusion --- common_sites/export_lawyers_excel.py | 8 + gaode.py | 395 +++++++++++++++++++++++++++ 2 files changed, 403 insertions(+) create mode 100644 gaode.py diff --git a/common_sites/export_lawyers_excel.py b/common_sites/export_lawyers_excel.py index 46e9d11..9e2dd7b 100644 --- a/common_sites/export_lawyers_excel.py +++ b/common_sites/export_lawyers_excel.py @@ -49,6 +49,11 @@ def parse_args() -> argparse.Namespace: default="", help="按 domain 过滤,例如:大律师 / 找法网 / 华律", ) + parser.add_argument( + "--exclude-domain", + default="", + help="排除指定 domain,例如:高德地图", + ) parser.add_argument( "--province", default="", @@ -134,6 +139,9 @@ def build_query(args: argparse.Namespace) -> (str, List): if args.domain.strip() and not args.douyin_only: where.append("domain = %s") params.append(args.domain.strip()) + if args.exclude_domain.strip(): + where.append("domain <> %s") + params.append(args.exclude_domain.strip()) if args.province.strip(): where.append("province = %s") params.append(args.province.strip()) diff --git a/gaode.py b/gaode.py new file mode 100644 index 0000000..8ed32f0 --- /dev/null +++ b/gaode.py @@ -0,0 +1,395 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import os +import sys +import time +import json +import math +import logging +import re +from typing import Dict, List, Optional +from urllib.parse import urlencode + +import requests +from requests.adapters import HTTPAdapter +from requests.exceptions import RequestException, HTTPError, ConnectionError, Timeout +from urllib3.util.retry import Retry + +# 添加项目根目录到系统路径(保留你的原逻辑) +current_dir = os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.dirname(current_dir) +if project_root not in sys.path: + sys.path.append(project_root) + +from Db import Db # 你的 DB 封装 + +# logging 配置 +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(message)s", +) +logger = logging.getLogger(__name__) + + +class GaodeSpider: + """高德地图 API 商户手机号采集 - 重构版""" + + def __init__( + self, + db_connection, + api_key: Optional[str] = None, + offset: int = 20, + max_pages_per_city: int = 10, + sleep_between_pages: float = 2.0, + sleep_between_cities: float = 3.0, + ): + self.db = db_connection + self.api_key = (api_key or os.environ.get("AMAP_API_KEY", "")).strip() + if not self.api_key: + raise ValueError("AMAP_API_KEY environment variable is required") + self.api_base = "https://restapi.amap.com/v3/place/text" + self.offset = offset + self.session = self._build_session() + self.max_pages_per_city = max_pages_per_city + self.sleep_between_pages = sleep_between_pages + self.sleep_between_cities = sleep_between_cities + + # 加载地区数据 + self.cities = self._load_area_data() + + def _build_session(self) -> requests.Session: + s = requests.Session() + # Retry for idempotent errors (GET) and some server errors + retries = Retry( + total=3, + backoff_factor=1, + status_forcelist=(429, 500, 502, 503, 504), + allowed_methods=frozenset(["GET", "POST"]) + ) + adapter = HTTPAdapter(max_retries=retries) + s.mount("https://", adapter) + s.mount("http://", adapter) + s.headers.update({ + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" + }) + return s + + def _load_area_data(self) -> Dict[int, Dict]: + """从数据库加载地区数据(保持与你原来表结构兼容)。 + 要求 area_new 表含:id, code, city, province, pid, pinyin, domain, level + 仅加载 domain='findlaw' 且 level=2 的城市 + 返回字典:{ city_id: {code, name, province, pid, pinyin} } + """ + try: + rows = self.db.select_data("area_new", "id, code, city, province, pid, pinyin", "domain='maxlaw' AND level=2") + result = {} + for r in rows: + cid = r.get("id") + result[cid] = { + "code": r.get("code") or "", + "name": r.get("city") or "", + "province": r.get("province") or "", + "pid": r.get("pid"), + "pinyin": r.get("pinyin") or "" + } + logger.info("加载城市数量: %d", len(result)) + return result + except Exception as e: + logger.exception("从数据库加载地区数据失败: %s", e) + return {} + + def _search_gaode_api(self, keywords: str, city: str, page: int = 1) -> Dict: + """调用高德 API 搜索,返回整个 JSON 响应(或空 dict)""" + params = { + "keywords": keywords, + "city": city, + "offset": self.offset, + "page": page, + "key": self.api_key, + "extensions": "all" + } + print(params) + try: + resp = self.session.get(self.api_base, params=params, timeout=15) + resp.raise_for_status() + data = resp.json() + return data + except (HTTPError, ConnectionError, Timeout) as e: + logger.warning("高德 API 请求失败(%s %s page=%s): %s", keywords, city, page, e) + return {} + except ValueError as e: + logger.error("高德 API 返回非 JSON 数据: %s", e) + return {} + + def _split_and_clean_phones(self, raw_tel: str) -> List[str]: + """把 raw tel 拆成候选号码并清洗""" + if not raw_tel: + return [] + + logger.debug("原始电话号码: %s", raw_tel) + + # 常见分隔符 ; / , 、 | 空格 + parts = re.split(r"[;,/,、\|]+|\s+", raw_tel.strip()) + cleaned = [] + + for p in parts: + if not p: + continue + + original_p = p + # 移除括号内内容以及非数字和连字符 + p = re.sub(r"(.*?)|\(.*?\)|[^\d\-+]", "", p) + # 有的号码含国际码 +86 + p = p.lstrip("+") + # 移除前导 86(如果之后是11位) + if p.startswith("86") and len(p) > 11: + p = p[2:] + # 最后移除短横线 + p = p.replace("-", "") + + if p: + cleaned.append(p) + logger.debug("清洗后号码: %s -> %s", original_p, p) + + logger.debug("清洗后共 %d 个号码: %s", len(cleaned), cleaned) + return cleaned + + def _is_valid_phone(self, phone: str) -> bool: + """验证手机号码,必须为11位且以1开头的手机号。""" + if not phone: + return False + # 强制要求:11位且以1开头的手机号 + if re.fullmatch(r"1\d{10}", phone): + return True + return False + + def _extract_phones_from_poi(self, poi: Dict) -> List[str]: + """ + 从 POI 数据中提取所有候选电话号码。 + 优先查找 poi['business']['tel'](你的示例),并兼容早期可能的字段如 tel/phone。 + 返回去重后且通过校验的号码列表。 + """ + candidates = [] + + # 1) 优先查找 business.tel 字段(高德API的主要电话字段) + business = poi.get("business") or {} + tel = business.get("tel") + if tel: + logger.debug("从 business.tel 提取: %s", tel) + candidates.extend(self._split_and_clean_phones(str(tel))) + + # 2) 兼容旧结构:顶层 tel/phone/contact 等 + for key in ("tel", "phone", "contact", "business_area"): + v = poi.get(key) + if v: + logger.debug("从 %s 提取: %s", key, v) + candidates.extend(self._split_and_clean_phones(str(v))) + + # 3) 审慎兼容:一些扩展字段可能也包含电话 + for nested_key in ("biz_ext", "ext", "attributes"): + nested = poi.get(nested_key) or {} + if isinstance(nested, dict): + for subkey in ("tel", "phone", "contact"): + if nested.get(subkey): + logger.debug("从 %s.%s 提取: %s", nested_key, subkey, nested.get(subkey)) + candidates.extend(self._split_and_clean_phones(str(nested.get(subkey)))) + + # 去重并只保留合法号码(按 _is_valid_phone) + unique = [] + for c in candidates: + if c not in unique and self._is_valid_phone(c): + unique.append(c) + logger.debug("有效电话号码: %s", c) + + logger.debug("POI %s 提取到 %d 个有效电话号码", poi.get("name", ""), len(unique)) + return unique + + + def _is_duplicate(self, phone: str) -> bool: + """检查某个 phone 是否已存在(domain='高德地图')""" + try: + condition = f"phone='{phone}' AND domain='高德地图'" + exists = self.db.is_data_exist("lawyer", condition) + if exists: + logger.debug("手机号已存在: %s (domain=高德地图)", phone) + return exists + except Exception as e: + logger.exception("去重检查失败: %s", e) + # 若出错,返回 True 以避免重复插入或脏数据 + return True + + def _parse_poi_to_record(self, poi: Dict, city_info: Dict, province_info: Dict, used_phone: str) -> Dict: + """把单条 poi 转为数据库记录(针对某个已选的号码 used_phone)""" + # 安全处理 shopinfo 字段,可能是字符串或字典 + shopinfo = poi.get("shopinfo") + if isinstance(shopinfo, dict): + law_firm = shopinfo.get("shop_name", "高德搜索") + else: + law_firm = "高德搜索" + + record = { + "name": poi.get("name", "").strip(), + "phone": used_phone, + "law_firm": law_firm, + "province": province_info.get("name", ""), + "city": city_info.get("name", ""), + "url": poi.get("website", "") or "", + "domain": "高德地图", + "create_time": int(time.time()), + "params": json.dumps({ + "address": poi.get("address", ""), + "location": poi.get("location", ""), + "type": poi.get("type", ""), + "business_area": poi.get("business_area", ""), + "raw_tel": poi.get("tel", "") or "", + "raw_poi": poi + }, ensure_ascii=False) + } + return record + + def _save_lawyer(self, record: Dict) -> bool: + """存储律师信息到数据库""" + try: + self.db.insert_data("lawyer", record) + logger.info("新增商户: %s (%s)", record.get("name"), record.get("phone")) + return True + except Exception as e: + logger.exception("存储失败: %s %s", record.get("name"), record.get("phone")) + return False + + def _search_city(self, keywords: str, city_info: Dict, province_info: Dict) -> int: + """在指定城市搜索并存储;返回新增条数""" + # city 参数可以是 city code 或 city name;使用你存表里的 code 优先 + # city_code = city_info.get("code") or city_info.get("name") + city_code = city_info.get("name") + total_added = 0 + + # 先请求第一页拿到 count + page = 1 + first_resp = self._search_gaode_api(keywords, city_code, page) + if not first_resp: + logger.info(" 未获取到第一页数据: %s", keywords) + return 0 + + status = first_resp.get("status") + if str(status) != "1": + logger.warning(" 高德返回错误: %s", first_resp.get("info")) + return 0 + + try: + count = int(first_resp.get("count", 0)) + except Exception: + count = 0 + # 计算总页数 + total_pages = math.ceil(count / self.offset) if count else 1 + total_pages = min(total_pages, self.max_pages_per_city) + + logger.info(" 城市 %s 搜索到 count=%s, pages=%s (限制 %s)", city_code, count, total_pages, self.max_pages_per_city) + + # 处理第一页 POIs + def process_page(page_num: int, page_data: Dict) -> int: + """处理单页数据,返回新增条数""" + nonlocal total_added + if not page_data: + logger.info(" page %s 未返回数据", page_num) + return 0 + if str(page_data.get("status")) != "1": + logger.warning(" page %s 返回状态非1: %s", page_num, page_data.get("info")) + return 0 + + pois = page_data.get("pois") or [] + page_added = 0 + for poi in pois: + name = (poi.get("name") or "").strip() + if not name: + continue + phones = self._extract_phones_from_poi(poi) + if not phones: + logger.debug(" 跳过无电话: %s", name) + continue + for ph in phones: + # 如果已存在跳过该号码 + if self._is_duplicate(ph): + logger.debug(" 跳过已存在号码: %s (%s)", name, ph) + continue + rec = self._parse_poi_to_record(poi, city_info, province_info, ph) + ok = self._save_lawyer(rec) + if ok: + page_added += 1 + total_added += 1 + # 如果该 POI 有多个号码,我们仍尝试插入其它号码(有用时) + # end for phones + # end for pois + return page_added + + # 先处理第一页 + first_page_added = process_page(1, first_resp) + logger.info(" 城市 %s 第 1 页新增 %d 条", city_code, first_page_added) + + # 记录已处理的页面,避免重复处理 + processed_pages = {1} + + # 依次请求剩余页面,直到读完或无数据 + for page_num in range(2, total_pages + 1): + if page_num in processed_pages: + continue + + time.sleep(self.sleep_between_pages) + page_data = self._search_gaode_api(keywords, city_code, page_num) + + if not page_data: + logger.info(" 第 %s 页无响应数据,停止翻页", page_num) + break + + if str(page_data.get("status")) != "1": + logger.info(" 第 %s 页状态异常(%s),停止翻页", page_num, page_data.get("info")) + break + + pois = page_data.get("pois") or [] + if not pois: + logger.info(" 第 %s 页返回空pois,提前结束", page_num) + break + + page_added = process_page(page_num, page_data) + logger.info(" 城市 %s 第 %s 页新增 %d 条", city_code, page_num, page_added) + processed_pages.add(page_num) + + # 如果结果数量不足一页,说明已经接近尾部 + if len(pois) < self.offset: + logger.info(" 第 %s 页结果不足一页,推测已到尾页,提前结束", page_num) + break + + return total_added + + def run(self): + logger.info("启动高德地图律师信息采集...") + if not self.cities: + logger.error("未加载城市列表,退出") + return + + total_stored = 0 + keywords_suffix = "律所" + + for city_id, city_info in self.cities.items(): + try: + province_info = {"name": city_info.get("province", "")} + city_name = city_info.get("name", "") + if not city_name: + continue + search_keywords = f"{keywords_suffix}" + added = self._search_city(search_keywords, city_info, province_info) + total_stored += added + logger.info("城市 %s 完成,新增 %d 条,总计 %d", city_name, added, total_stored) + time.sleep(self.sleep_between_cities) + except Exception as e: + logger.exception("处理城市 %s 时出错: %s", city_info.get("name", ""), e) + + logger.info("采集完成,共新增 %d 条商户信息。", total_stored) + + +if __name__ == "__main__": + # 运行示例 + with Db() as db: + spider = GaodeSpider(db) + spider.run()