Files
2026-04-28 17:33:51 +08:00

402 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import time
import json
import math
import logging
import re
from typing import Dict, List, Optional
from urllib.parse import urlencode
import requests
from requests.adapters import HTTPAdapter
from requests.exceptions import RequestException, HTTPError, ConnectionError, Timeout
from urllib3.util.retry import Retry
# Make the project root (the parent of this script's directory) importable so
# that the project-local ``Db`` and ``config`` modules imported below resolve.
# The root is appended, so entries already on sys.path keep their precedence.
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.split(current_dir)[0]
if project_root not in sys.path:
    sys.path.append(project_root)
from Db import Db # 你的 DB 封装
import config as project_config
# Root logging configuration: INFO and above, one timestamped line per record.
# The module-level ``logger`` below is the one the spider class writes to.
logging.basicConfig(format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)
class GaodeSpider:
    """Collect lawyer/merchant mobile numbers through the Amap (Gaode) place-text API.

    For each city loaded from the ``area_new`` table the spider pages through
    ``/v3/place/text`` results, extracts 11-digit mobile numbers from every POI,
    and inserts one ``lawyer`` row per number not already stored under
    ``domain='高德地图'``.
    """

    # Separators between numbers in a raw tel string: ASCII and full-width
    # semicolon/comma, slash, ideographic comma, pipe, or any whitespace run.
    _PHONE_SPLIT_RE = re.compile(r"[;\uff1b,\uff0c/\u3001|]+|\s+")
    # Noise inside a single token: text wrapped in full-width or ASCII
    # parentheses (e.g. a label after the number), then every character other
    # than digits, '-' and '+'. Full-width parentheses are written as escapes so
    # they cannot be lost in copy/paste; a bare ``.*?`` alternative here would
    # match every character and erase the whole token.
    _PHONE_NOISE_RE = re.compile(r"\uff08.*?\uff09|\(.*?\)|[^\d\-+]")
    # Accepted format: exactly 11 digits starting with 1.
    _MOBILE_RE = re.compile(r"1\d{10}")
    # Values allowed into the SQL condition built by ``_is_duplicate``.
    _DIGITS_RE = re.compile(r"[0-9]+")

    def __init__(
        self,
        db_connection,
        api_key: Optional[str] = None,
        offset: int = 20,
        max_pages_per_city: int = 10,
        sleep_between_pages: float = 2.0,
        sleep_between_cities: float = 3.0,
    ):
        """Configure the spider and load the target city list.

        Args:
            db_connection: Project ``Db`` wrapper; this class calls its
                ``select_data``, ``is_data_exist`` and ``insert_data`` methods.
            api_key: Amap web-service key. Falls back to the ``AMAP_API_KEY``
                environment variable, then ``config.GAODE_CONFIG["API_KEY"]``.
            offset: Page size sent to the API as ``offset``.
            max_pages_per_city: Upper bound on pages fetched per city.
            sleep_between_pages: Seconds slept before each page after the first.
            sleep_between_cities: Seconds slept after each successfully processed city.

        Raises:
            ValueError: If no API key is found in any of the three sources.
        """
        self.db = db_connection
        config_api_key = ""
        gaode_config = getattr(project_config, "GAODE_CONFIG", None)
        if isinstance(gaode_config, dict):
            # ``or ""`` keeps a None value from turning into the literal key "None".
            config_api_key = str(gaode_config.get("API_KEY") or "").strip()
        self.api_key = (api_key or os.environ.get("AMAP_API_KEY", "") or config_api_key).strip()
        if not self.api_key:
            raise ValueError("高德 API Key 未配置,请在 config.py 的 GAODE_CONFIG.API_KEY 或环境变量 AMAP_API_KEY 中填写")
        self.api_base = "https://restapi.amap.com/v3/place/text"
        self.offset = offset
        self.session = self._build_session()
        self.max_pages_per_city = max_pages_per_city
        self.sleep_between_pages = sleep_between_pages
        self.sleep_between_cities = sleep_between_cities
        # City list: {city_id: {code, name, province, pid, pinyin}}
        self.cities = self._load_area_data()

    def _build_session(self) -> requests.Session:
        """Create an HTTP session with retries and a browser-like User-Agent."""
        s = requests.Session()
        # Retry connection failures and 429/5xx responses (backoff_factor=1).
        # POST is listed as well, although this class only issues GET requests.
        retries = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=(429, 500, 502, 503, 504),
            allowed_methods=frozenset(["GET", "POST"]),
        )
        adapter = HTTPAdapter(max_retries=retries)
        s.mount("https://", adapter)
        s.mount("http://", adapter)
        s.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        })
        return s

    def _load_area_data(self) -> Dict[int, Dict]:
        """Load the target cities from the ``area_new`` table.

        Selects ``id, code, city, province, pid, pinyin`` for rows matching
        ``domain='maxlaw' AND level=2``; rows are read with ``.get``.

        Returns:
            ``{city_id: {"code", "name", "province", "pid", "pinyin"}}``, or an
            empty dict (after logging the traceback) if loading fails.
        """
        try:
            rows = self.db.select_data("area_new", "id, code, city, province, pid, pinyin", "domain='maxlaw' AND level=2")
            result = {
                r.get("id"): {
                    "code": r.get("code") or "",
                    "name": r.get("city") or "",
                    "province": r.get("province") or "",
                    "pid": r.get("pid"),
                    "pinyin": r.get("pinyin") or "",
                }
                for r in rows
            }
            logger.info("加载城市数量: %d", len(result))
            return result
        except Exception as e:
            # Deliberate best effort: run() reports an empty city list and exits.
            logger.exception("从数据库加载地区数据失败: %s", e)
            return {}

    def _search_gaode_api(self, keywords: str, city: str, page: int = 1) -> Dict:
        """Fetch one page of ``/v3/place/text`` results.

        Returns:
            The decoded JSON object, or ``{}`` when the request fails (network
            error, HTTP error status, retries exhausted) or the body is not a
            JSON object. Callers must still check the API-level ``status``.
        """
        params = {
            "keywords": keywords,
            "city": city,
            "offset": self.offset,
            "page": page,
            "key": self.api_key,
            "extensions": "all",
        }
        # Never print/log ``params`` itself: it carries the API key.
        logger.debug("高德 API 请求: keywords=%s city=%s page=%s", keywords, city, page)
        try:
            resp = self.session.get(self.api_base, params=params, timeout=15)
            resp.raise_for_status()
        except RequestException as e:
            # Covers HTTPError/ConnectionError/Timeout plus RetryError (429/5xx
            # retries exhausted) and TooManyRedirects. The exception text embeds
            # the request URL, so the key is redacted before logging.
            logger.warning("高德 API 请求失败(%s %s page=%s): %s", keywords, city, page,
                           str(e).replace(self.api_key, "***"))
            return {}
        try:
            data = resp.json()
        except ValueError as e:
            logger.error("高德 API 返回非 JSON 数据: %s", e)
            return {}
        if not isinstance(data, dict):
            logger.error("高德 API 返回的 JSON 不是对象: %s", type(data).__name__)
            return {}
        return data

    def _split_and_clean_phones(self, raw_tel: str) -> List[str]:
        """Split a raw tel string into candidate numbers and normalise each one.

        Every token loses parenthesised text and all characters except digits,
        '-' and '+'; then a leading '+' and all hyphens are dropped, and a
        leading ``86`` country code is removed when 11 digits follow it. Empty
        results are discarded; format validation happens elsewhere.
        """
        if not raw_tel:
            return []
        logger.debug("原始电话号码: %s", raw_tel)
        cleaned = []
        for token in self._PHONE_SPLIT_RE.split(raw_tel.strip()):
            if not token:
                continue
            p = self._PHONE_NOISE_RE.sub("", token)
            p = p.lstrip("+").replace("-", "")
            if p.startswith("86") and len(p) == 13:
                p = p[2:]
            if p:
                cleaned.append(p)
                logger.debug("清洗后号码: %s -> %s", token, p)
        logger.debug("清洗后共 %d 个号码: %s", len(cleaned), cleaned)
        return cleaned

    def _is_valid_phone(self, phone: str) -> bool:
        """Return True only for an 11-digit number starting with 1."""
        if not phone:
            return False
        return self._MOBILE_RE.fullmatch(phone) is not None

    def _extract_phones_from_poi(self, poi: Dict) -> List[str]:
        """Collect valid, de-duplicated phone numbers from one POI.

        Looks at ``poi['business']['tel']`` first, then the top-level keys
        ``tel``/``phone``/``contact``/``business_area``, then ``tel``/``phone``/
        ``contact`` inside ``biz_ext``/``ext``/``attributes`` when those are dicts.
        Numbers keep first-seen order.
        """
        candidates: List[str] = []
        # 1) business.tel, only when ``business`` is actually a dict.
        business = poi.get("business")
        if isinstance(business, dict) and business.get("tel"):
            tel = business.get("tel")
            logger.debug("从 business.tel 提取: %s", tel)
            candidates.extend(self._split_and_clean_phones(str(tel)))
        # 2) Top-level fields kept for compatibility with other response shapes.
        for key in ("tel", "phone", "contact", "business_area"):
            v = poi.get(key)
            if v:
                logger.debug("%s 提取: %s", key, v)
                candidates.extend(self._split_and_clean_phones(str(v)))
        # 3) Extension dicts that may also carry a phone.
        for nested_key in ("biz_ext", "ext", "attributes"):
            nested = poi.get(nested_key)
            if not isinstance(nested, dict):
                continue
            for subkey in ("tel", "phone", "contact"):
                value = nested.get(subkey)
                if value:
                    logger.debug("%s.%s 提取: %s", nested_key, subkey, value)
                    candidates.extend(self._split_and_clean_phones(str(value)))
        unique: List[str] = []
        # dict.fromkeys drops duplicates while keeping first-seen order.
        for c in dict.fromkeys(candidates):
            if self._is_valid_phone(c):
                unique.append(c)
                logger.debug("有效电话号码: %s", c)
        logger.debug("POI %s 提取到 %d 个有效电话号码", poi.get("name", ""), len(unique))
        return unique

    def _is_duplicate(self, phone: str) -> bool:
        """Return True if ``phone`` already exists in ``lawyer`` with ``domain='高德地图'``.

        Fails safe by returning True (i.e. "skip the insert") when ``phone`` is
        not purely ASCII digits — it is interpolated into the SQL condition —
        and when the lookup raises.
        """
        if not self._DIGITS_RE.fullmatch(phone or ""):
            logger.warning("拒绝非数字号码的去重查询: %r", phone)
            return True
        try:
            condition = f"phone='{phone}' AND domain='高德地图'"
            exists = self.db.is_data_exist("lawyer", condition)
            if exists:
                logger.debug("手机号已存在: %s (domain=高德地图)", phone)
            return exists
        except Exception as e:
            logger.exception("去重检查失败: %s", e)
            # On error, report "exists" so nothing possibly duplicate is inserted.
            return True

    def _parse_poi_to_record(self, poi: Dict, city_info: Dict, province_info: Dict, used_phone: str) -> Dict:
        """Build the ``lawyer`` row for one POI and one chosen phone number.

        The full POI plus a few of its fields are stored as JSON in ``params``.
        """
        shopinfo = poi.get("shopinfo")
        # shopinfo may be a dict or a plain value; only a dict carries shop_name.
        law_firm = shopinfo.get("shop_name", "高德搜索") if isinstance(shopinfo, dict) else "高德搜索"
        return {
            "name": (poi.get("name") or "").strip(),
            "phone": used_phone,
            "law_firm": law_firm,
            "province": province_info.get("name", ""),
            "city": city_info.get("name", ""),
            "url": poi.get("website", "") or "",
            "domain": "高德地图",
            "create_time": int(time.time()),
            "params": json.dumps({
                "address": poi.get("address", ""),
                "location": poi.get("location", ""),
                "type": poi.get("type", ""),
                "business_area": poi.get("business_area", ""),
                "raw_tel": poi.get("tel", "") or "",
                "raw_poi": poi,
            }, ensure_ascii=False),
        }

    def _save_lawyer(self, record: Dict) -> bool:
        """Insert one ``lawyer`` row; return False (after logging) on failure."""
        try:
            self.db.insert_data("lawyer", record)
        except Exception:
            logger.exception("存储失败: %s %s", record.get("name"), record.get("phone"))
            return False
        logger.info("新增商户: %s (%s)", record.get("name"), record.get("phone"))
        return True

    def _process_page(self, page_num: int, page_data: Dict, city_info: Dict, province_info: Dict) -> int:
        """Store every new (POI, phone) pair from one result page; return rows added."""
        if not page_data:
            logger.info(" page %s 未返回数据", page_num)
            return 0
        if str(page_data.get("status")) != "1":
            logger.warning(" page %s 返回状态非1: %s", page_num, page_data.get("info"))
            return 0
        page_added = 0
        for poi in page_data.get("pois") or []:
            name = (poi.get("name") or "").strip()
            if not name:
                continue
            phones = self._extract_phones_from_poi(poi)
            if not phones:
                logger.debug(" 跳过无电话: %s", name)
                continue
            # A POI with several numbers yields one row per number not yet stored.
            for ph in phones:
                if self._is_duplicate(ph):
                    logger.debug(" 跳过已存在号码: %s (%s)", name, ph)
                    continue
                rec = self._parse_poi_to_record(poi, city_info, province_info, ph)
                if self._save_lawyer(rec):
                    page_added += 1
        return page_added

    def _search_city(self, keywords: str, city_info: Dict, province_info: Dict) -> int:
        """Search ``keywords`` within one city, store new numbers, return rows added.

        The first page's ``count`` fixes the page total (capped by
        ``max_pages_per_city``). Later pages are fetched after a delay; paging
        stops early on an empty/failed response, an empty ``pois`` list, or a
        page shorter than ``offset``.
        """
        # The city *name* is sent as the API ``city`` filter; the stored area
        # ``code`` is not used here.
        city_code = city_info.get("name")
        first_resp = self._search_gaode_api(keywords, city_code, 1)
        if not first_resp:
            logger.info(" 未获取到第一页数据: %s", keywords)
            return 0
        if str(first_resp.get("status")) != "1":
            logger.warning(" 高德返回错误: %s", first_resp.get("info"))
            return 0
        try:
            count = int(first_resp.get("count", 0))
        except (TypeError, ValueError):
            count = 0
        total_pages = math.ceil(count / self.offset) if count else 1
        total_pages = min(total_pages, self.max_pages_per_city)
        logger.info(" 城市 %s 搜索到 count=%s, pages=%s (限制 %s)", city_code, count, total_pages, self.max_pages_per_city)

        total_added = self._process_page(1, first_resp, city_info, province_info)
        logger.info(" 城市 %s 第 1 页新增 %d", city_code, total_added)

        for page_num in range(2, total_pages + 1):
            time.sleep(self.sleep_between_pages)
            page_data = self._search_gaode_api(keywords, city_code, page_num)
            if not page_data:
                logger.info("%s 页无响应数据,停止翻页", page_num)
                break
            if str(page_data.get("status")) != "1":
                logger.info("%s 页状态异常(%s),停止翻页", page_num, page_data.get("info"))
                break
            pois = page_data.get("pois") or []
            if not pois:
                logger.info("%s 页返回空pois,提前结束", page_num)
                break
            page_added = self._process_page(page_num, page_data, city_info, province_info)
            total_added += page_added
            logger.info(" 城市 %s 第 %s 页新增 %d", city_code, page_num, page_added)
            # A short page means there is nothing further to fetch.
            if len(pois) < self.offset:
                logger.info("%s 页结果不足一页,推测已到尾页,提前结束", page_num)
                break
        return total_added

    def run(self):
        """Search every loaded city for "律师" and log per-city and total counts.

        An exception while processing one city is logged and the loop moves on;
        the inter-city sleep only follows a city that completed.
        """
        logger.info("启动高德地图律师信息采集...")
        if not self.cities:
            logger.error("未加载城市列表,退出")
            return
        total_stored = 0
        search_keywords = "律师"
        for city_info in self.cities.values():
            city_name = city_info.get("name", "")
            if not city_name:
                continue
            province_info = {"name": city_info.get("province", "")}
            try:
                added = self._search_city(search_keywords, city_info, province_info)
            except Exception as e:
                # Top-level boundary: one failing city must not abort the crawl.
                logger.exception("处理城市 %s 时出错: %s", city_name, e)
                continue
            total_stored += added
            logger.info("城市 %s 完成,新增 %d 条,总计 %d", city_name, added, total_stored)
            time.sleep(self.sleep_between_cities)
        logger.info("采集完成,共新增 %d 条商户信息。", total_stored)
if __name__ == "__main__":
    # Script entry point: open a project Db (used as a context manager around
    # the crawl) and run the spider over every loaded city.
    with Db() as connection:
        GaodeSpider(connection).run()