402 lines
16 KiB
Python
402 lines
16 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
import os
|
||
import sys
|
||
import time
|
||
import json
|
||
import math
|
||
import logging
|
||
import re
|
||
from typing import Dict, List, Optional
|
||
from urllib.parse import urlencode
|
||
|
||
import requests
|
||
from requests.adapters import HTTPAdapter
|
||
from requests.exceptions import RequestException, HTTPError, ConnectionError, Timeout
|
||
from urllib3.util.retry import Retry
|
||
|
||
# Make the project root (the parent of this script's directory) importable so
# that the project-local `Db` and `config` modules imported below resolve.
# Original logic kept: the root is appended (searched after every entry that
# is already on sys.path) and is only added once.
current_dir = os.path.split(os.path.abspath(__file__))[0]
project_root = os.path.split(current_dir)[0]

if project_root not in sys.path:
    sys.path.append(project_root)
|
||
|
||
from Db import Db # 你的 DB 封装
|
||
import config as project_config
|
||
|
||
# Logging setup: root logger at INFO and above, one line per record formatted
# as "<timestamp> <LEVEL> <message>".
logging.basicConfig(format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO)

# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class GaodeSpider:
    """Collect lawyer/merchant mobile numbers through the Amap (Gaode) place-search API.

    For every city loaded from the ``area_new`` table, the spider queries the
    Amap text-search endpoint page by page and extracts mobile numbers from
    each POI. Numbers already stored are skipped; the rest are inserted into
    the ``lawyer`` table with ``domain='高德地图'``.
    """

    # Separators seen between numbers in raw tel strings: ASCII and full-width
    # semicolons/commas, slashes, the ideographic comma, pipes, and whitespace.
    _TEL_SPLIT_RE = re.compile(r"[;；,，/、\|]+|\s+")
    # Parenthesised notes (full-width or ASCII brackets, e.g. "（转8001）"),
    # plus any character other than digits, '-' and '+'.
    _TEL_NOISE_RE = re.compile(r"（.*?）|\(.*?\)|[^\d\-+]")
    # A mainland-China mobile number: 11 digits starting with 1.
    _MOBILE_RE = re.compile(r"1\d{10}")

    def __init__(
        self,
        db_connection,
        api_key: Optional[str] = None,
        offset: int = 20,
        max_pages_per_city: int = 10,
        sleep_between_pages: float = 2.0,
        sleep_between_cities: float = 3.0,
    ):
        """Create the spider, build the HTTP session and load the city list.

        Args:
            db_connection: project ``Db`` wrapper. It must provide ``select_data``,
                ``is_data_exist`` and ``insert_data``, the methods called below.
            api_key: Amap web-service key. If not given, ``AMAP_API_KEY`` from the
                environment is used, then ``config.GAODE_CONFIG["API_KEY"]``.
            offset: results per page requested from Amap.
            max_pages_per_city: upper bound on pages fetched per city.
            sleep_between_pages: seconds to sleep between page requests.
            sleep_between_cities: seconds to sleep between cities.

        Raises:
            ValueError: if no API key is configured or ``offset`` is not positive.
        """
        self.db = db_connection

        config_api_key = ""
        gaode_config = getattr(project_config, "GAODE_CONFIG", None)
        if isinstance(gaode_config, dict):
            config_api_key = str(gaode_config.get("API_KEY", "")).strip()

        self.api_key = (api_key or os.environ.get("AMAP_API_KEY", "") or config_api_key).strip()
        if not self.api_key:
            raise ValueError("高德 API Key 未配置,请在 config.py 的 GAODE_CONFIG.API_KEY 或环境变量 AMAP_API_KEY 中填写")
        # offset is later used as a divisor when computing the page count.
        if offset <= 0:
            raise ValueError(f"offset must be a positive integer, got {offset!r}")

        self.api_base = "https://restapi.amap.com/v3/place/text"
        self.offset = offset
        self.session = self._build_session()
        self.max_pages_per_city = max_pages_per_city
        self.sleep_between_pages = sleep_between_pages
        self.sleep_between_cities = sleep_between_cities

        # City list: {city_id: {code, name, province, pid, pinyin}}.
        self.cities = self._load_area_data()

    def _build_session(self) -> requests.Session:
        """Return a ``requests.Session`` with automatic retries and a browser User-Agent.

        Retries (3 total, backoff factor 1) apply to GET only, since that is the
        only method this class sends. Retrying non-idempotent methods such as POST
        could duplicate side effects. Retried statuses: 429 and 5xx gateway errors.
        """
        session = requests.Session()
        retries = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=(429, 500, 502, 503, 504),
            allowed_methods=frozenset(["GET"]),
        )
        adapter = HTTPAdapter(max_retries=retries)
        session.mount("https://", adapter)
        session.mount("http://", adapter)
        session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        })
        return session

    def _redact(self, value) -> str:
        """Return ``str(value)`` with the API key masked, for safe logging.

        Error messages from ``requests`` include the full request URL, and the
        URL carries the key as a query parameter.
        """
        text = str(value)
        return text.replace(self.api_key, "***") if self.api_key else text

    def _load_area_data(self) -> Dict[int, Dict]:
        """Load the target cities from the ``area_new`` table.

        Reads columns ``id, code, city, province, pid, pinyin``. Only rows with
        ``domain='maxlaw' AND level=2`` are loaded.
        NOTE(review): an earlier docstring said ``domain='findlaw'``; confirm
        which domain is intended.

        Returns:
            ``{city_id: {"code", "name", "province", "pid", "pinyin"}}``. On any
            database error, the error is logged and an empty dict is returned.
            ``run`` treats an empty dict as "nothing to do".
        """
        try:
            rows = self.db.select_data("area_new", "id, code, city, province, pid, pinyin", "domain='maxlaw' AND level=2")
            result = {}
            for row in rows:
                result[row.get("id")] = {
                    "code": row.get("code") or "",
                    "name": row.get("city") or "",
                    "province": row.get("province") or "",
                    "pid": row.get("pid"),
                    "pinyin": row.get("pinyin") or "",
                }
            logger.info("加载城市数量: %d", len(result))
            return result
        except Exception as e:
            logger.exception("从数据库加载地区数据失败: %s", e)
            return {}

    def _search_gaode_api(self, keywords: str, city: str, page: int = 1, city_limit: bool = True) -> Dict:
        """Call the Amap text-search API and return the decoded JSON object.

        Args:
            keywords: search keywords.
            city: city name or code passed as Amap's ``city`` parameter.
            page: 1-based page number.
            city_limit: sent as Amap's ``citylimit``. When true, only POIs inside
                ``city`` are returned. Records are stamped with the queried city,
                so results from other cities would be mislabelled.

        Returns:
            The response JSON as a dict. If the request fails (including exhausted
            retries), the body is not JSON, or the JSON is not an object, the
            problem is logged and an empty dict is returned.
        """
        params = {
            "keywords": keywords,
            "city": city,
            "citylimit": "true" if city_limit else "false",
            "offset": self.offset,
            "page": page,
            "key": self.api_key,
            "extensions": "all",
        }
        # Never log `params` itself: it contains the API key.
        logger.debug("高德 API 请求: keywords=%s city=%s page=%s", keywords, city, page)

        try:
            resp = self.session.get(self.api_base, params=params, timeout=15)
            resp.raise_for_status()
        except RequestException as e:
            # RequestException also covers RetryError, which is raised once the
            # adapter's retries are exhausted.
            logger.warning("高德 API 请求失败(%s %s page=%s): %s", keywords, city, page, self._redact(e))
            return {}

        try:
            data = resp.json()
        except ValueError as e:
            logger.error("高德 API 返回非 JSON 数据: %s", e)
            return {}

        if not isinstance(data, dict):
            logger.error("高德 API 返回非预期的 JSON 结构: %s", type(data).__name__)
            return {}
        return data

    def _split_and_clean_phones(self, raw_tel: str) -> List[str]:
        """Split a raw tel string into cleaned candidate numbers.

        The string is split on ASCII/full-width separators and whitespace. For
        each piece:
        - parenthesised notes and non-digit characters are removed (keeping '-'
          and '+' until the end);
        - leading '+' characters are removed;
        - dashes are removed;
        - a leading "86" or "0086" country code is stripped when exactly 11
          digits remain.

        Candidates are not validated here; see ``_is_valid_phone``. Returns an
        empty list for empty input.
        """
        if not raw_tel:
            return []

        logger.debug("原始电话号码: %s", raw_tel)

        cleaned = []
        for part in self._TEL_SPLIT_RE.split(str(raw_tel).strip()):
            if not part:
                continue
            digits = self._TEL_NOISE_RE.sub("", part).lstrip("+").replace("-", "")
            # Dashes are already removed, so the length test counts digits only.
            for prefix in ("0086", "86"):
                if digits.startswith(prefix) and len(digits) == len(prefix) + 11:
                    digits = digits[len(prefix):]
                    break
            if digits:
                cleaned.append(digits)
                logger.debug("清洗后号码: %s -> %s", part, digits)

        logger.debug("清洗后共 %d 个号码: %s", len(cleaned), cleaned)
        return cleaned

    def _is_valid_phone(self, phone: str) -> bool:
        """Return True only for an 11-digit number that starts with 1."""
        if not phone:
            return False
        return self._MOBILE_RE.fullmatch(phone) is not None

    def _extract_phones_from_poi(self, poi: Dict) -> List[str]:
        """Extract every valid, de-duplicated mobile number from one POI.

        Sources are read in this order:
        1. ``poi["business"]["tel"]``;
        2. top-level ``tel``, ``phone``, ``contact``, ``business_area``;
        3. ``tel``, ``phone``, ``contact`` inside ``biz_ext``, ``ext`` and
           ``attributes`` when those are dicts.

        Returns the numbers that pass ``_is_valid_phone``, in first-seen order.
        """
        candidates: List[str] = []

        def collect(source: str, value) -> None:
            # Empty values (None, "", [] as Amap returns for blank fields) are skipped.
            if value:
                logger.debug("从 %s 提取: %s", source, value)
                candidates.extend(self._split_and_clean_phones(str(value)))

        business = poi.get("business")
        if isinstance(business, dict):
            collect("business.tel", business.get("tel"))

        for key in ("tel", "phone", "contact", "business_area"):
            collect(key, poi.get(key))

        for nested_key in ("biz_ext", "ext", "attributes"):
            nested = poi.get(nested_key)
            if isinstance(nested, dict):
                for subkey in ("tel", "phone", "contact"):
                    collect(f"{nested_key}.{subkey}", nested.get(subkey))

        # dict.fromkeys de-duplicates while keeping first-seen order.
        unique = [c for c in dict.fromkeys(candidates) if self._is_valid_phone(c)]
        for phone in unique:
            logger.debug("有效电话号码: %s", phone)

        logger.debug("POI %s 提取到 %d 个有效电话号码", poi.get("name", ""), len(unique))
        return unique

    def _is_duplicate(self, phone: str) -> bool:
        """Return True if ``phone`` already exists in ``lawyer`` with domain '高德地图'.

        The DB wrapper takes a raw SQL condition string, so anything other than
        plain digits is refused and reported as a duplicate rather than spliced
        into SQL. On a database error, True is also returned so that nothing is
        inserted blindly.
        """
        phone = str(phone)
        if not re.fullmatch(r"\d+", phone):
            logger.warning("拒绝非数字号码的去重查询: %r", phone)
            return True
        try:
            condition = f"phone='{phone}' AND domain='高德地图'"
            exists = bool(self.db.is_data_exist("lawyer", condition))
            if exists:
                logger.debug("手机号已存在: %s (domain=高德地图)", phone)
            return exists
        except Exception as e:
            logger.exception("去重检查失败: %s", e)
            return True

    def _parse_poi_to_record(self, poi: Dict, city_info: Dict, province_info: Dict, used_phone: str) -> Dict:
        """Build the ``lawyer`` row for one POI and one selected phone number.

        ``law_firm`` comes from ``shopinfo["shop_name"]`` only when ``shopinfo`` is
        a dict; otherwise it is "高德搜索". The raw POI and a few of its fields are
        kept as a JSON string in ``params``.
        """
        shopinfo = poi.get("shopinfo")
        law_firm = shopinfo.get("shop_name", "高德搜索") if isinstance(shopinfo, dict) else "高德搜索"

        return {
            "name": str(poi.get("name") or "").strip(),
            "phone": used_phone,
            "law_firm": law_firm,
            "province": province_info.get("name", ""),
            "city": city_info.get("name", ""),
            "url": poi.get("website", "") or "",
            "domain": "高德地图",
            "create_time": int(time.time()),
            "params": json.dumps({
                "address": poi.get("address", ""),
                "location": poi.get("location", ""),
                "type": poi.get("type", ""),
                "business_area": poi.get("business_area", ""),
                "raw_tel": poi.get("tel", "") or "",
                "raw_poi": poi,
            }, ensure_ascii=False),
        }

    def _save_lawyer(self, record: Dict) -> bool:
        """Insert one record into ``lawyer``. Return True on success; log and return False on error."""
        try:
            self.db.insert_data("lawyer", record)
        except Exception:
            logger.exception("存储失败: %s %s", record.get("name"), record.get("phone"))
            return False
        logger.info("新增商户: %s (%s)", record.get("name"), record.get("phone"))
        return True

    def _store_pois(self, pois: List[Dict], city_info: Dict, province_info: Dict) -> int:
        """Store every new valid phone number found in ``pois``; return rows inserted."""
        added = 0
        for poi in pois:
            name = str(poi.get("name") or "").strip()
            if not name:
                continue
            phones = self._extract_phones_from_poi(poi)
            if not phones:
                logger.debug(" 跳过无电话: %s", name)
                continue
            # A POI may list several mobiles; each new one becomes its own row.
            for phone in phones:
                if self._is_duplicate(phone):
                    logger.debug(" 跳过已存在号码: %s (%s)", name, phone)
                    continue
                record = self._parse_poi_to_record(poi, city_info, province_info, phone)
                if self._save_lawyer(record):
                    added += 1
        return added

    def _search_city(self, keywords: str, city_info: Dict, province_info: Dict) -> int:
        """Search one city page by page and store new numbers; return rows inserted.

        The page count comes from the first page's ``count`` and is capped at
        ``max_pages_per_city``. Paging stops early when a page is missing, has a
        non-"1" status, returns no POIs, or returns fewer than ``offset`` POIs.
        """
        # The city is queried by name; the table's `code` column is not used.
        city_code = city_info.get("name")

        first_resp = self._search_gaode_api(keywords, city_code, 1)
        if not first_resp:
            logger.info(" 未获取到第一页数据: %s %s", city_code, keywords)
            return 0
        if str(first_resp.get("status")) != "1":
            logger.warning(" 高德返回错误: %s", first_resp.get("info"))
            return 0

        try:
            count = int(first_resp.get("count", 0))
        except (TypeError, ValueError):
            count = 0
        total_pages = math.ceil(count / self.offset) if count > 0 else 1
        total_pages = min(total_pages, self.max_pages_per_city)

        logger.info(" 城市 %s 搜索到 count=%s, pages=%s (限制 %s)", city_code, count, total_pages, self.max_pages_per_city)

        total_added = self._store_pois(first_resp.get("pois") or [], city_info, province_info)
        logger.info(" 城市 %s 第 1 页新增 %d 条", city_code, total_added)

        for page_num in range(2, total_pages + 1):
            time.sleep(self.sleep_between_pages)
            page_data = self._search_gaode_api(keywords, city_code, page_num)

            if not page_data:
                logger.info(" 第 %s 页无响应数据,停止翻页", page_num)
                break
            if str(page_data.get("status")) != "1":
                logger.info(" 第 %s 页状态异常(%s),停止翻页", page_num, page_data.get("info"))
                break
            pois = page_data.get("pois") or []
            if not pois:
                logger.info(" 第 %s 页返回空pois,提前结束", page_num)
                break

            page_added = self._store_pois(pois, city_info, province_info)
            total_added += page_added
            logger.info(" 城市 %s 第 %s 页新增 %d 条", city_code, page_num, page_added)

            # A short page means this was the last page.
            if len(pois) < self.offset:
                logger.info(" 第 %s 页结果不足一页,推测已到尾页,提前结束", page_num)
                break

        return total_added

    def run(self):
        """Search every loaded city for "律师" and store new phone numbers.

        An error in one city is logged and the loop moves on to the next city.
        The inter-city sleep happens whether or not the city succeeded.
        """
        logger.info("启动高德地图律师信息采集...")
        if not self.cities:
            logger.error("未加载城市列表,退出")
            return

        total_stored = 0
        keywords = "律师"

        for city_info in self.cities.values():
            city_name = city_info.get("name", "")
            if not city_name:
                continue
            province_info = {"name": city_info.get("province", "")}
            try:
                added = self._search_city(keywords, city_info, province_info)
            except Exception as e:
                logger.exception("处理城市 %s 时出错: %s", city_name, e)
            else:
                total_stored += added
                logger.info("城市 %s 完成,新增 %d 条,总计 %d", city_name, added, total_stored)
            # Throttle between cities even after a failure, since API calls may have been made.
            time.sleep(self.sleep_between_cities)

        logger.info("采集完成,共新增 %d 条商户信息。", total_stored)
||
|
||
|
||
if __name__ == "__main__":
    # Example entry point: open the project database as a context manager and
    # run one full crawl with the default spider settings.
    with Db() as database:
        GaodeSpider(database).run()
|