feat: add gaode crawler and export domain exclusion
This commit is contained in:
@@ -49,6 +49,11 @@ def parse_args() -> argparse.Namespace:
|
||||
default="",
|
||||
help="按 domain 过滤,例如:大律师 / 找法网 / 华律",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--exclude-domain",
|
||||
default="",
|
||||
help="排除指定 domain,例如:高德地图",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--province",
|
||||
default="",
|
||||
@@ -134,6 +139,9 @@ def build_query(args: argparse.Namespace) -> (str, List):
|
||||
if args.domain.strip() and not args.douyin_only:
|
||||
where.append("domain = %s")
|
||||
params.append(args.domain.strip())
|
||||
if args.exclude_domain.strip():
|
||||
where.append("domain <> %s")
|
||||
params.append(args.exclude_domain.strip())
|
||||
if args.province.strip():
|
||||
where.append("province = %s")
|
||||
params.append(args.province.strip())
|
||||
|
||||
@@ -0,0 +1,395 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import json
|
||||
import math
|
||||
import logging
|
||||
import re
|
||||
from typing import Dict, List, Optional
|
||||
from urllib.parse import urlencode
|
||||
|
||||
import requests
|
||||
from requests.adapters import HTTPAdapter
|
||||
from requests.exceptions import RequestException, HTTPError, ConnectionError, Timeout
|
||||
from urllib3.util.retry import Retry
|
||||
|
||||
# 添加项目根目录到系统路径(保留你的原逻辑)
|
||||
current_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
project_root = os.path.dirname(current_dir)
|
||||
if project_root not in sys.path:
|
||||
sys.path.append(project_root)
|
||||
|
||||
from Db import Db # 你的 DB 封装
|
||||
|
||||
# logging 配置
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s %(levelname)s %(message)s",
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class GaodeSpider:
|
||||
"""高德地图 API 商户手机号采集 - 重构版"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
db_connection,
|
||||
api_key: Optional[str] = None,
|
||||
offset: int = 20,
|
||||
max_pages_per_city: int = 10,
|
||||
sleep_between_pages: float = 2.0,
|
||||
sleep_between_cities: float = 3.0,
|
||||
):
|
||||
self.db = db_connection
|
||||
self.api_key = (api_key or os.environ.get("AMAP_API_KEY", "")).strip()
|
||||
if not self.api_key:
|
||||
raise ValueError("AMAP_API_KEY environment variable is required")
|
||||
self.api_base = "https://restapi.amap.com/v3/place/text"
|
||||
self.offset = offset
|
||||
self.session = self._build_session()
|
||||
self.max_pages_per_city = max_pages_per_city
|
||||
self.sleep_between_pages = sleep_between_pages
|
||||
self.sleep_between_cities = sleep_between_cities
|
||||
|
||||
# 加载地区数据
|
||||
self.cities = self._load_area_data()
|
||||
|
||||
def _build_session(self) -> requests.Session:
|
||||
s = requests.Session()
|
||||
# Retry for idempotent errors (GET) and some server errors
|
||||
retries = Retry(
|
||||
total=3,
|
||||
backoff_factor=1,
|
||||
status_forcelist=(429, 500, 502, 503, 504),
|
||||
allowed_methods=frozenset(["GET", "POST"])
|
||||
)
|
||||
adapter = HTTPAdapter(max_retries=retries)
|
||||
s.mount("https://", adapter)
|
||||
s.mount("http://", adapter)
|
||||
s.headers.update({
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
|
||||
})
|
||||
return s
|
||||
|
||||
def _load_area_data(self) -> Dict[int, Dict]:
|
||||
"""从数据库加载地区数据(保持与你原来表结构兼容)。
|
||||
要求 area_new 表含:id, code, city, province, pid, pinyin, domain, level
|
||||
仅加载 domain='findlaw' 且 level=2 的城市
|
||||
返回字典:{ city_id: {code, name, province, pid, pinyin} }
|
||||
"""
|
||||
try:
|
||||
rows = self.db.select_data("area_new", "id, code, city, province, pid, pinyin", "domain='maxlaw' AND level=2")
|
||||
result = {}
|
||||
for r in rows:
|
||||
cid = r.get("id")
|
||||
result[cid] = {
|
||||
"code": r.get("code") or "",
|
||||
"name": r.get("city") or "",
|
||||
"province": r.get("province") or "",
|
||||
"pid": r.get("pid"),
|
||||
"pinyin": r.get("pinyin") or ""
|
||||
}
|
||||
logger.info("加载城市数量: %d", len(result))
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.exception("从数据库加载地区数据失败: %s", e)
|
||||
return {}
|
||||
|
||||
def _search_gaode_api(self, keywords: str, city: str, page: int = 1) -> Dict:
|
||||
"""调用高德 API 搜索,返回整个 JSON 响应(或空 dict)"""
|
||||
params = {
|
||||
"keywords": keywords,
|
||||
"city": city,
|
||||
"offset": self.offset,
|
||||
"page": page,
|
||||
"key": self.api_key,
|
||||
"extensions": "all"
|
||||
}
|
||||
print(params)
|
||||
try:
|
||||
resp = self.session.get(self.api_base, params=params, timeout=15)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
return data
|
||||
except (HTTPError, ConnectionError, Timeout) as e:
|
||||
logger.warning("高德 API 请求失败(%s %s page=%s): %s", keywords, city, page, e)
|
||||
return {}
|
||||
except ValueError as e:
|
||||
logger.error("高德 API 返回非 JSON 数据: %s", e)
|
||||
return {}
|
||||
|
||||
def _split_and_clean_phones(self, raw_tel: str) -> List[str]:
|
||||
"""把 raw tel 拆成候选号码并清洗"""
|
||||
if not raw_tel:
|
||||
return []
|
||||
|
||||
logger.debug("原始电话号码: %s", raw_tel)
|
||||
|
||||
# 常见分隔符 ; / , 、 | 空格
|
||||
parts = re.split(r"[;,/,、\|]+|\s+", raw_tel.strip())
|
||||
cleaned = []
|
||||
|
||||
for p in parts:
|
||||
if not p:
|
||||
continue
|
||||
|
||||
original_p = p
|
||||
# 移除括号内内容以及非数字和连字符
|
||||
p = re.sub(r"(.*?)|\(.*?\)|[^\d\-+]", "", p)
|
||||
# 有的号码含国际码 +86
|
||||
p = p.lstrip("+")
|
||||
# 移除前导 86(如果之后是11位)
|
||||
if p.startswith("86") and len(p) > 11:
|
||||
p = p[2:]
|
||||
# 最后移除短横线
|
||||
p = p.replace("-", "")
|
||||
|
||||
if p:
|
||||
cleaned.append(p)
|
||||
logger.debug("清洗后号码: %s -> %s", original_p, p)
|
||||
|
||||
logger.debug("清洗后共 %d 个号码: %s", len(cleaned), cleaned)
|
||||
return cleaned
|
||||
|
||||
def _is_valid_phone(self, phone: str) -> bool:
|
||||
"""验证手机号码,必须为11位且以1开头的手机号。"""
|
||||
if not phone:
|
||||
return False
|
||||
# 强制要求:11位且以1开头的手机号
|
||||
if re.fullmatch(r"1\d{10}", phone):
|
||||
return True
|
||||
return False
|
||||
|
||||
def _extract_phones_from_poi(self, poi: Dict) -> List[str]:
|
||||
"""
|
||||
从 POI 数据中提取所有候选电话号码。
|
||||
优先查找 poi['business']['tel'](你的示例),并兼容早期可能的字段如 tel/phone。
|
||||
返回去重后且通过校验的号码列表。
|
||||
"""
|
||||
candidates = []
|
||||
|
||||
# 1) 优先查找 business.tel 字段(高德API的主要电话字段)
|
||||
business = poi.get("business") or {}
|
||||
tel = business.get("tel")
|
||||
if tel:
|
||||
logger.debug("从 business.tel 提取: %s", tel)
|
||||
candidates.extend(self._split_and_clean_phones(str(tel)))
|
||||
|
||||
# 2) 兼容旧结构:顶层 tel/phone/contact 等
|
||||
for key in ("tel", "phone", "contact", "business_area"):
|
||||
v = poi.get(key)
|
||||
if v:
|
||||
logger.debug("从 %s 提取: %s", key, v)
|
||||
candidates.extend(self._split_and_clean_phones(str(v)))
|
||||
|
||||
# 3) 审慎兼容:一些扩展字段可能也包含电话
|
||||
for nested_key in ("biz_ext", "ext", "attributes"):
|
||||
nested = poi.get(nested_key) or {}
|
||||
if isinstance(nested, dict):
|
||||
for subkey in ("tel", "phone", "contact"):
|
||||
if nested.get(subkey):
|
||||
logger.debug("从 %s.%s 提取: %s", nested_key, subkey, nested.get(subkey))
|
||||
candidates.extend(self._split_and_clean_phones(str(nested.get(subkey))))
|
||||
|
||||
# 去重并只保留合法号码(按 _is_valid_phone)
|
||||
unique = []
|
||||
for c in candidates:
|
||||
if c not in unique and self._is_valid_phone(c):
|
||||
unique.append(c)
|
||||
logger.debug("有效电话号码: %s", c)
|
||||
|
||||
logger.debug("POI %s 提取到 %d 个有效电话号码", poi.get("name", ""), len(unique))
|
||||
return unique
|
||||
|
||||
|
||||
def _is_duplicate(self, phone: str) -> bool:
|
||||
"""检查某个 phone 是否已存在(domain='高德地图')"""
|
||||
try:
|
||||
condition = f"phone='{phone}' AND domain='高德地图'"
|
||||
exists = self.db.is_data_exist("lawyer", condition)
|
||||
if exists:
|
||||
logger.debug("手机号已存在: %s (domain=高德地图)", phone)
|
||||
return exists
|
||||
except Exception as e:
|
||||
logger.exception("去重检查失败: %s", e)
|
||||
# 若出错,返回 True 以避免重复插入或脏数据
|
||||
return True
|
||||
|
||||
def _parse_poi_to_record(self, poi: Dict, city_info: Dict, province_info: Dict, used_phone: str) -> Dict:
|
||||
"""把单条 poi 转为数据库记录(针对某个已选的号码 used_phone)"""
|
||||
# 安全处理 shopinfo 字段,可能是字符串或字典
|
||||
shopinfo = poi.get("shopinfo")
|
||||
if isinstance(shopinfo, dict):
|
||||
law_firm = shopinfo.get("shop_name", "高德搜索")
|
||||
else:
|
||||
law_firm = "高德搜索"
|
||||
|
||||
record = {
|
||||
"name": poi.get("name", "").strip(),
|
||||
"phone": used_phone,
|
||||
"law_firm": law_firm,
|
||||
"province": province_info.get("name", ""),
|
||||
"city": city_info.get("name", ""),
|
||||
"url": poi.get("website", "") or "",
|
||||
"domain": "高德地图",
|
||||
"create_time": int(time.time()),
|
||||
"params": json.dumps({
|
||||
"address": poi.get("address", ""),
|
||||
"location": poi.get("location", ""),
|
||||
"type": poi.get("type", ""),
|
||||
"business_area": poi.get("business_area", ""),
|
||||
"raw_tel": poi.get("tel", "") or "",
|
||||
"raw_poi": poi
|
||||
}, ensure_ascii=False)
|
||||
}
|
||||
return record
|
||||
|
||||
def _save_lawyer(self, record: Dict) -> bool:
|
||||
"""存储律师信息到数据库"""
|
||||
try:
|
||||
self.db.insert_data("lawyer", record)
|
||||
logger.info("新增商户: %s (%s)", record.get("name"), record.get("phone"))
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.exception("存储失败: %s %s", record.get("name"), record.get("phone"))
|
||||
return False
|
||||
|
||||
def _search_city(self, keywords: str, city_info: Dict, province_info: Dict) -> int:
|
||||
"""在指定城市搜索并存储;返回新增条数"""
|
||||
# city 参数可以是 city code 或 city name;使用你存表里的 code 优先
|
||||
# city_code = city_info.get("code") or city_info.get("name")
|
||||
city_code = city_info.get("name")
|
||||
total_added = 0
|
||||
|
||||
# 先请求第一页拿到 count
|
||||
page = 1
|
||||
first_resp = self._search_gaode_api(keywords, city_code, page)
|
||||
if not first_resp:
|
||||
logger.info(" 未获取到第一页数据: %s", keywords)
|
||||
return 0
|
||||
|
||||
status = first_resp.get("status")
|
||||
if str(status) != "1":
|
||||
logger.warning(" 高德返回错误: %s", first_resp.get("info"))
|
||||
return 0
|
||||
|
||||
try:
|
||||
count = int(first_resp.get("count", 0))
|
||||
except Exception:
|
||||
count = 0
|
||||
# 计算总页数
|
||||
total_pages = math.ceil(count / self.offset) if count else 1
|
||||
total_pages = min(total_pages, self.max_pages_per_city)
|
||||
|
||||
logger.info(" 城市 %s 搜索到 count=%s, pages=%s (限制 %s)", city_code, count, total_pages, self.max_pages_per_city)
|
||||
|
||||
# 处理第一页 POIs
|
||||
def process_page(page_num: int, page_data: Dict) -> int:
|
||||
"""处理单页数据,返回新增条数"""
|
||||
nonlocal total_added
|
||||
if not page_data:
|
||||
logger.info(" page %s 未返回数据", page_num)
|
||||
return 0
|
||||
if str(page_data.get("status")) != "1":
|
||||
logger.warning(" page %s 返回状态非1: %s", page_num, page_data.get("info"))
|
||||
return 0
|
||||
|
||||
pois = page_data.get("pois") or []
|
||||
page_added = 0
|
||||
for poi in pois:
|
||||
name = (poi.get("name") or "").strip()
|
||||
if not name:
|
||||
continue
|
||||
phones = self._extract_phones_from_poi(poi)
|
||||
if not phones:
|
||||
logger.debug(" 跳过无电话: %s", name)
|
||||
continue
|
||||
for ph in phones:
|
||||
# 如果已存在跳过该号码
|
||||
if self._is_duplicate(ph):
|
||||
logger.debug(" 跳过已存在号码: %s (%s)", name, ph)
|
||||
continue
|
||||
rec = self._parse_poi_to_record(poi, city_info, province_info, ph)
|
||||
ok = self._save_lawyer(rec)
|
||||
if ok:
|
||||
page_added += 1
|
||||
total_added += 1
|
||||
# 如果该 POI 有多个号码,我们仍尝试插入其它号码(有用时)
|
||||
# end for phones
|
||||
# end for pois
|
||||
return page_added
|
||||
|
||||
# 先处理第一页
|
||||
first_page_added = process_page(1, first_resp)
|
||||
logger.info(" 城市 %s 第 1 页新增 %d 条", city_code, first_page_added)
|
||||
|
||||
# 记录已处理的页面,避免重复处理
|
||||
processed_pages = {1}
|
||||
|
||||
# 依次请求剩余页面,直到读完或无数据
|
||||
for page_num in range(2, total_pages + 1):
|
||||
if page_num in processed_pages:
|
||||
continue
|
||||
|
||||
time.sleep(self.sleep_between_pages)
|
||||
page_data = self._search_gaode_api(keywords, city_code, page_num)
|
||||
|
||||
if not page_data:
|
||||
logger.info(" 第 %s 页无响应数据,停止翻页", page_num)
|
||||
break
|
||||
|
||||
if str(page_data.get("status")) != "1":
|
||||
logger.info(" 第 %s 页状态异常(%s),停止翻页", page_num, page_data.get("info"))
|
||||
break
|
||||
|
||||
pois = page_data.get("pois") or []
|
||||
if not pois:
|
||||
logger.info(" 第 %s 页返回空pois,提前结束", page_num)
|
||||
break
|
||||
|
||||
page_added = process_page(page_num, page_data)
|
||||
logger.info(" 城市 %s 第 %s 页新增 %d 条", city_code, page_num, page_added)
|
||||
processed_pages.add(page_num)
|
||||
|
||||
# 如果结果数量不足一页,说明已经接近尾部
|
||||
if len(pois) < self.offset:
|
||||
logger.info(" 第 %s 页结果不足一页,推测已到尾页,提前结束", page_num)
|
||||
break
|
||||
|
||||
return total_added
|
||||
|
||||
def run(self):
|
||||
logger.info("启动高德地图律师信息采集...")
|
||||
if not self.cities:
|
||||
logger.error("未加载城市列表,退出")
|
||||
return
|
||||
|
||||
total_stored = 0
|
||||
keywords_suffix = "律所"
|
||||
|
||||
for city_id, city_info in self.cities.items():
|
||||
try:
|
||||
province_info = {"name": city_info.get("province", "")}
|
||||
city_name = city_info.get("name", "")
|
||||
if not city_name:
|
||||
continue
|
||||
search_keywords = f"{keywords_suffix}"
|
||||
added = self._search_city(search_keywords, city_info, province_info)
|
||||
total_stored += added
|
||||
logger.info("城市 %s 完成,新增 %d 条,总计 %d", city_name, added, total_stored)
|
||||
time.sleep(self.sleep_between_cities)
|
||||
except Exception as e:
|
||||
logger.exception("处理城市 %s 时出错: %s", city_info.get("name", ""), e)
|
||||
|
||||
logger.info("采集完成,共新增 %d 条商户信息。", total_stored)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 运行示例
|
||||
with Db() as db:
|
||||
spider = GaodeSpider(db)
|
||||
spider.run()
|
||||
Reference in New Issue
Block a user