diff --git a/README.md b/README.md index 8180f51..f9d0fe2 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,79 @@ python3 -m venv .venv ./common_sites/start.sh ``` +## 地区同步服务(Python) + +新增服务脚本:`services/area_sync_service.py` + +用途: + +- 替代原 `nas.nepiedg.site:9002` 的核心接口 +- `GET /api/layer/get_area`:从数据库 `area_new` 读取地区列表并返回给 `js/douyin.js` +- `POST /api/layer/index`:接收脚本回传搜索数据,先保存原始 JSON 到本地,再按参数决定是否入库 + +`/api/layer/index` 当前入库规则(基于 `payload.data.user_list[].user_info`): + +- 主要从 `signature`(简介)里正则提取手机号 +- 若简介未命中,再从微信相关标记(`微信/wx/vx/v`)和 `unique_id/versatile_display` 提取手机号 +- `url` 固定写为 `https://www.douyin.com/user/{sec_uid}`(`sec_uid` 为空时回退接口 URL) + +启动: + +```bash +cd /www/wwwroot/lawyers +./.venv/bin/python ./services/area_sync_service.py +``` + +常用环境变量: + +```bash +AREA_SERVICE_HOST=0.0.0.0 +AREA_SERVICE_PORT=9002 +AREA_TARGET_TABLE=area_new +AREA_DOMAIN=maxlaw +DOUYIN_DOMAIN=抖音 +DOUYIN_RAW_DIR=/www/wwwroot/lawyers/data/douyin_raw +DOUYIN_SAVE_ONLY=1 +``` + +接口示例: + +```bash +# 健康检查 +curl 'http://127.0.0.1:9002/health' + +# 读取数据库中的地区(默认直接返回数组,兼容 js/douyin.js) +curl 'http://127.0.0.1:9002/api/layer/get_area?server=1' + +# 如果需要带统计信息 +curl 'http://127.0.0.1:9002/api/layer/get_area?table=area_new&domain=maxlaw&meta=1' + +# 接收 douyin.js 回传结果并入库(默认写 lawyer.domain=抖音) +curl -X POST 'http://127.0.0.1:9002/api/layer/index?server=1&save_only=0' \ + -H 'Content-Type: application/json' \ + -d '{"source":"xhr","url":"https://www.douyin.com/aweme/v1/web/discover/search/","ts":1772811111,"cityIndex":0,"data":{"desc":"联系方式 13812345678"}}' + +# 可选:指定写入域名(用于测试) +curl -X POST 'http://127.0.0.1:9002/api/layer/index?save_domain=codex_test_douyin' \ + -H 'Content-Type: application/json' \ + -d '{"source":"xhr","url":"https://www.douyin.com/aweme/v1/web/discover/search/","ts":1772811111,"cityIndex":0,"data":{"desc":"联系方式 13812345678"}}' + +# 仅保存原始回传(不入库) +curl -X POST 'http://127.0.0.1:9002/api/layer/index?save_only=1' \ + -H 'Content-Type: application/json' \ + -d 
'{"source":"xhr","url":"https://www.douyin.com/aweme/v1/web/discover/search/","ts":1772811111,"cityIndex":0,"data":{"desc":"联系方式 13812345678"}}' + +# 原始数据落盘目录(按天分文件) +# /www/wwwroot/lawyers/data/douyin_raw/douyin_index_YYYYMMDD.jsonl +``` + +如果 9002 端口已有旧进程占用,可先执行: + +```bash +lsof -iTCP:9002 -sTCP:LISTEN -t +kill <PID> +``` + ## 启动参数 `start.sh` 默认并行启动 5 个站点采集(大律师使用 `dls_fresh.py`)。 diff --git a/js/douyin.js b/js/douyin.js new file mode 100644 index 0000000..545cfb9 --- /dev/null +++ b/js/douyin.js @@ -0,0 +1,535 @@ +// ==UserScript== +// @name Douyin Batch City Search + AutoScroll + Capture +// @namespace http://tampermonkey.net/ +// @version 1.0 +// @description 从 Python 服务获取地区列表,按 city + "律师" 搜索并自动下滑,拦截 /aweme/v1/web/discover/search/ 返回并转发到入库接口。 +// @author You +// @match https://www.douyin.com/* +// @grant GM_xmlhttpRequest +// @connect * +// @run-at document-idle +// ==/UserScript== + +(function () { + 'use strict'; + + /********************* 配置区(按需修改) *********************/ + const API_BASE = 'http://127.0.0.1:9002'; // 改成你部署 Python 服务的地址,例如 http://nas.nepiedg.site:9002 + const AREA_API = `${API_BASE}/api/layer/get_area?server=1`; // 获取城市列表的接口 + const SEND_TARGETS = [ + `${API_BASE}/api/layer/index?server=1&save_only=0` + ]; + + // 搜索框与按钮选择器(根据页面更新) + const SEARCH_INPUT_SELECTORS = [ + 'input[data-e2e="search-input"]', + 'input[data-e2e="searchbar-input"]', + 'form[data-e2e="searchbar"] input', + 'input[placeholder*="搜索"]' + ]; + const SEARCH_BTN_SELECTORS = [ + '[data-e2e="search-button"]', + 'button[data-e2e="search-button"]', + 'span[data-e2e="search-button"]', + 'button[data-e2e="searchbar-button"]', + 'span.btn-title' + ]; + + // 每个城市搜索时的自动下滑配置 + const SCROLL_INTERVAL_MS = 2000; + const MAX_STABLE_COUNT = 6; + const MAX_SCROLLS_PER_CITY = 120; + const SCROLL_BY = 2200; + const WAIT_AFTER_SEARCH_MS = 1000; + const DELAY_BETWEEN_CITIES_MS = 1500; + + // 可选:如果希望只发送包含手机号的条目,可在此启用并调整正则 + const ONLY_SEND_IF_HAS_PHONE = false; + const PHONE_REGEX = 
/**
 * GET `url` through GM_xmlhttpRequest and resolve with the parsed JSON body.
 *
 * The promise always settles: network errors, aborts and timeouts reject
 * instead of leaving the caller waiting forever, and every rejection is a
 * real `Error` so callers can safely read `e.message`.
 *
 * @param {string} url - Absolute URL to request.
 * @param {number} [timeoutMs=15000] - Give up after this many milliseconds.
 * @returns {Promise<any>} Resolves with the decoded JSON value.
 */
function gmGetJson(url, timeoutMs = 15000) {
  return new Promise((resolve, reject) => {
    GM_xmlhttpRequest({
      method: 'GET',
      url,
      timeout: timeoutMs,
      onload(res) {
        // A non-2xx answer is a failure even when its body happens to be JSON.
        if (res.status < 200 || res.status >= 300) {
          reject(new Error(`GET ${url} failed with HTTP ${res.status}`));
          return;
        }
        try {
          resolve(JSON.parse(res.responseText));
        } catch (e) {
          reject(e);
        }
      },
      onerror(res) {
        const detail = res && (res.error || res.statusText) ? `: ${res.error || res.statusText}` : '';
        reject(new Error(`GET ${url} network error${detail}`));
      },
      ontimeout() {
        reject(new Error(`GET ${url} timed out after ${timeoutMs} ms`));
      },
      onabort() {
        reject(new Error(`GET ${url} was aborted`));
      }
    });
  });
}
/**
 * Put `keyword` into the cached search input (`inputEl`) and dispatch a
 * synthetic focus / input / change / blur sequence around the value change.
 *
 * Does nothing when `inputEl` is not set. Errors raised while dispatching are
 * logged through `err` and not rethrown. The function always ends with an
 * 80 ms pause before resolving.
 *
 * NOTE(review): the event order looks tuned so that a framework-controlled
 * input registers the new value — confirm before reordering anything here.
 *
 * @param {string} keyword - Text to place in the search box.
 * @returns {Promise<void>}
 */
async function simulateSearchInput(keyword) {
  if (!inputEl) return;
  try {
    inputEl.focus();
    inputEl.dispatchEvent(new Event('focus', { bubbles: false }));

    // Clear any previous value first and announce the change as a deletion.
    if (inputEl.value) {
      setNativeValue(inputEl, '');
      if (typeof InputEvent === 'function') {
        inputEl.dispatchEvent(new InputEvent('input', { bubbles: true, inputType: 'deleteContentBackward', data: '' }));
      } else {
        // Fallback for environments without the InputEvent constructor.
        inputEl.dispatchEvent(new Event('input', { bubbles: true }));
      }
    }

    // Write the new keyword through setNativeValue, then announce it.
    setNativeValue(inputEl, keyword);
    if (typeof InputEvent === 'function') {
      inputEl.dispatchEvent(new InputEvent('beforeinput', { bubbles: true, inputType: 'insertText', data: keyword }));
      inputEl.dispatchEvent(new InputEvent('input', { bubbles: true, inputType: 'insertText', data: keyword }));
    } else {
      inputEl.dispatchEvent(new Event('input', { bubbles: true }));
    }
    inputEl.dispatchEvent(new Event('change', { bubbles: true }));
    inputEl.dispatchEvent(new Event('blur', { bubbles: false }));
  } catch (e) {
    err('simulateSearchInput error', e);
  }
  // Short pause before the caller triggers the search.
  await new Promise(r => setTimeout(r, 80));
}
/**
 * Forward one captured payload to every URL in SEND_TARGETS as a JSON POST.
 *
 * - When ONLY_SEND_IF_HAS_PHONE is enabled, payloads whose serialized form
 *   contains no match for PHONE_REGEX are dropped.
 * - A payload identical to the previous one (ignoring its capture timestamp)
 *   that arrives within SEND_MIN_INTERVAL_MS is dropped as a duplicate.
 *
 * Never throws: failures are logged through `err`.
 *
 * @param {object|string} data - Payload object, or an already serialized body.
 */
function sendToTargets(data) {
  try {
    const body = typeof data === 'string' ? data : JSON.stringify(data);

    if (ONLY_SEND_IF_HAS_PHONE) {
      // PHONE_REGEX carries the `g` flag, so test() resumes from lastIndex
      // left by the previous call and can miss a match at the start of the
      // next body. Reset it around the check to make every test independent.
      PHONE_REGEX.lastIndex = 0;
      const hasPhone = PHONE_REGEX.test(body);
      PHONE_REGEX.lastIndex = 0;
      if (!hasPhone) {
        // No phone number in this payload: skip sending.
        return;
      }
    }

    // Fingerprint the payload without `ts`: the interceptors stamp every
    // capture with Date.now(), so hashing the full body would make two
    // captures of the same response look different and defeat the dedupe.
    const fingerprintSource = (data && typeof data === 'object')
      ? JSON.stringify(Object.assign({}, data, { ts: undefined }))
      : body;
    const hash = hashString(fingerprintSource);
    const now = Date.now();
    if (hash === lastSentHash && now - lastSentAt < SEND_MIN_INTERVAL_MS) {
      return;
    }
    lastSentHash = hash;
    lastSentAt = now;

    for (const target of SEND_TARGETS) {
      GM_xmlhttpRequest({
        method: 'POST',
        url: target,
        headers: { 'Content-Type': 'application/json' },
        data: body,
        onload(res) { log(`sent -> ${target}, status: ${res.status}`); },
        onerror(e) { err(`send error to ${target}`, e); }
      });
    }
  } catch (e) {
    err('sendToTargets error', e);
  }
}
/**
 * Patch XMLHttpRequest so that responses from URLs containing TARGET_PATH are
 * parsed as JSON and handed to sendToTargets.
 *
 * `open` remembers the request URL on the instance (`__dm_url`); `send`
 * attaches a readystatechange listener for matching requests. The original
 * `open`/`send` are always invoked, and any failure inside the hooks is
 * swallowed or logged so page requests are never broken by the capture.
 */
(function interceptXHR() {
  const XHR = window.XMLHttpRequest;
  if (!XHR) return;
  const origOpen = XHR.prototype.open;
  const origSend = XHR.prototype.send;

  XHR.prototype.open = function (method, url, ...rest) {
    // open() also accepts URL objects; stringify so they are not ignored.
    try { this.__dm_url = (url === undefined || url === null) ? '' : String(url); } catch (e) {}
    return origOpen.apply(this, [method, url, ...rest]);
  };

  XHR.prototype.send = function (body) {
    try {
      const targetUrl = this.__dm_url || '';
      if (targetUrl && targetUrl.includes(TARGET_PATH)) {
        this.addEventListener('readystatechange', function () {
          if (this.readyState !== 4) return;
          try {
            let json;
            if (this.responseType === 'json') {
              // responseText throws for this responseType; the parsed value
              // is already available on `response`.
              json = this.response;
            } else if (this.responseType === '' || this.responseType === 'text') {
              const text = this.responseText;
              if (!text) return;
              json = JSON.parse(text);
            } else {
              // Binary response types cannot carry the JSON we are after.
              return;
            }
            // Same guard as the fetch hook: only forward JSON objects.
            if (json && typeof json === 'object') {
              sendToTargets({ source: 'xhr', url: targetUrl, data: json, ts: Date.now(), cityIndex: currentCityIndex });
            }
          } catch (e) {
            // Non-JSON body or unreadable response: ignore.
          }
        });
      }
    } catch (e) { err('XHR wrapper error', e); }
    return origSend.apply(this, [body]);
  };
})();
/**
 * Make sure the cached search input (`inputEl`) and search button (`btnEl`)
 * point at elements that are still attached to the document, re-querying
 * whichever one is missing.
 *
 * A missing button is tolerated (`btnEl` becomes null and a message is
 * logged); a missing input rejects, either from waitForSelector or from the
 * final check.
 *
 * @param {HTMLElement} [statusNode] - Optional element that receives progress text.
 * @returns {Promise<{inputEl: Element, btnEl: (Element|null)}>}
 */
async function ensureSearchControls(statusNode) {
  const report = (message) => {
    if (statusNode) statusNode.textContent = message;
  };

  const attached = (el) => {
    if (!el) return false;
    try {
      return el.isConnected !== undefined ? el.isConnected : document.contains(el);
    } catch (_) {
      return false;
    }
  };

  // Drop cached references that have been detached from the page.
  if (!attached(inputEl)) inputEl = null;
  if (!attached(btnEl)) btnEl = null;

  if (!inputEl) {
    report('等待搜索输入框可用...');
    inputEl = await waitForSelector(SEARCH_INPUT_SELECTORS, 10000);
  }

  if (!btnEl) {
    try {
      report('等待搜索按钮可用...');
      const candidate = await waitForSelector(SEARCH_BTN_SELECTORS, 8000);
      btnEl = candidate;
      // Prefer the enclosing <button> when the selector hit an inner node.
      const wrapper = (candidate && candidate.tagName !== 'BUTTON') ? candidate.closest('button') : null;
      if (wrapper) btnEl = wrapper;
    } catch (e) {
      btnEl = null;
      err('未找到搜索按钮,将使用 Enter 键进行触发。');
    }
  }

  if (!inputEl) {
    throw new Error('未定位到搜索输入框');
  }

  return { inputEl, btnEl };
}
/**
 * Main flow: load the area list (once), then for every area search
 * "<city>律师", auto-scroll the results, and move on to the next area.
 *
 * Only one loop runs at a time. A second call made while a loop is active
 * returns immediately, so a quick stop/restart click resumes the running
 * loop instead of starting a competing one that types into the same box.
 *
 * The status node ends on the specific reason the loop finished (stopped,
 * control refresh failed, trigger failed, unexpected error) rather than
 * always being overwritten by the generic completion text.
 *
 * @param {HTMLElement} statusNode - Element that receives progress text.
 * @returns {Promise<void>}
 */
async function runBatchSearchLoop(statusNode) {
  if (runBatchSearchLoop.__running) {
    log('批量搜索循环已在运行,忽略重复启动。');
    return;
  }
  runBatchSearchLoop.__running = true;

  const describe = (e) => (e && e.message ? e.message : String(e));
  const pause = (ms) => new Promise(r => setTimeout(r, ms));

  try {
    const toggle = document.getElementById('dm-batch-btn');
    stopFlag = !!(toggle && toggle.dataset.running === '0');

    // Fetch the area list only when nothing is cached in memory.
    if (!Array.isArray(areaList) || areaList.length === 0) {
      statusNode.textContent = '正在获取城市列表...';
      try {
        const data = await gmGetJson(AREA_API);
        const normalizedAreaList = Array.isArray(data)
          ? data
          : (data && Array.isArray(data.data) ? data.data : []);

        if (normalizedAreaList.length > 0) {
          areaList = normalizedAreaList;
          log('获取城市列表数量:', areaList.length);
          statusNode.textContent = `获取到 ${areaList.length} 个城市,准备开始循环。`;
        } else {
          err('area API returned not array', data);
          statusNode.textContent = '获取城市列表失败(返回格式异常)';
          return;
        }
      } catch (e) {
        err('获取城市列表失败', e);
        statusNode.textContent = '获取城市列表失败: ' + describe(e);
        return;
      }
    }

    // Wait until the search input (and, if possible, the button) is available.
    try {
      await ensureSearchControls(statusNode);
    } catch (e) {
      err('未找到搜索输入或按钮', e);
      statusNode.textContent = '未找到搜索输入或按钮,脚本仍会监听接口,但无法自动搜索。';
      return;
    }

    // Text shown once the loop ends; replaced when it ends early.
    let endStatus = '批量搜索完成或已停止。';

    // For each city: search -> scroll -> (responses are captured by the hooks) -> next.
    for (let i = 0; i < areaList.length; i++) {
      if (stopFlag) { endStatus = '已停止'; break; }
      currentCityIndex = i;

      // Tolerate null entries and non-string values coming from the API.
      const area = areaList[i] || {};
      const city = String(area.city || area.province || '').trim();
      if (!city) continue;

      const keyword = `${city}律师`;
      statusNode.textContent = `正在搜索:${keyword} (${i+1}/${areaList.length})`;
      log(`开始城市[${i+1}/${areaList.length}] 搜索:`, keyword);

      // The page may have re-rendered the controls since the last city.
      try {
        await ensureSearchControls(statusNode);
      } catch (e) {
        err('刷新搜索控件失败', e);
        endStatus = '刷新搜索控件失败,终止批量搜索。';
        break;
      }

      await simulateSearchInput(keyword);

      if (!simulateSearchTrigger()) {
        statusNode.textContent = '搜索触发失败,尝试刷新控件...';
        btnEl = null;
        await ensureSearchControls(statusNode);
        if (!simulateSearchTrigger()) {
          endStatus = '搜索触发失败,终止批量搜索。';
          break;
        }
      }

      // Give the results a moment to start loading.
      await pause(WAIT_AFTER_SEARCH_MS);

      // Scroll until the page height is stable or the scroll cap is reached.
      await autoScrollUntilStable(statusNode, MAX_SCROLLS_PER_CITY);

      if (stopFlag) { endStatus = '已停止'; break; }

      // Short gap before the next city.
      statusNode.textContent = `完成 ${keyword} 的加载,等待 ${DELAY_BETWEEN_CITIES_MS} ms 后继续...`;
      await pause(DELAY_BETWEEN_CITIES_MS);
    }

    statusNode.textContent = endStatus;
    log('批量搜索循环结束');
  } catch (e) {
    err('runBatchSearchLoop error', e);
    if (statusNode) statusNode.textContent = '批量搜索异常终止: ' + describe(e);
  } finally {
    runBatchSearchLoop.__running = false;
  }
}
os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.dirname(current_dir) +if project_root not in sys.path: + sys.path.append(project_root) + +from Db import Db + + +AREA_TABLE = os.getenv("AREA_TARGET_TABLE", "area_new") +AREA_DOMAIN = os.getenv("AREA_DOMAIN", "maxlaw") +DOUYIN_DOMAIN = os.getenv("DOUYIN_DOMAIN", "抖音") +DOUYIN_RAW_DIR = os.getenv("DOUYIN_RAW_DIR", os.path.join(project_root, "data", "douyin_raw")) +DOUYIN_SAVE_ONLY_ENV = os.getenv("DOUYIN_SAVE_ONLY", "1") +SERVICE_HOST = os.getenv("AREA_SERVICE_HOST", "0.0.0.0") +SERVICE_PORT = int(os.getenv("AREA_SERVICE_PORT", "9002")) + +PHONE_REGEX = re.compile(r"(?:\+?86[-\s]?)?(1[3-9]\d{9})") +WX_CONTEXT_REGEX = re.compile(r"(?i)(?:微信|微.?信|wx|vx|weixin|v信|v号|v)\s*[::/\-\s]\s*([a-zA-Z0-9._-]{3,40})") +LAW_FIRM_REGEX = re.compile(r"([\u4e00-\u9fa5A-Za-z·]{2,40}律师事务所)") +RAW_WRITE_LOCK = threading.Lock() + + +def _is_safe_table_name(table_name: str) -> bool: + return bool(re.fullmatch(r"[A-Za-z0-9_]+", table_name or "")) + + +def _parse_int(value: Any, default: int = 0) -> int: + try: + return int(str(value).strip()) + except Exception: + return default + + +def _parse_bool(value: Any, default: bool = False) -> bool: + if value is None: + return default + text = str(value).strip().lower() + if text in {"1", "true", "yes", "y", "on"}: + return True + if text in {"0", "false", "no", "n", "off"}: + return False + return default + + +def _first_param(params: Dict[str, List[str]], key: str, default: str = "") -> str: + values = params.get(key) or [] + if not values: + return default + return values[0] + + +def _append_jsonl(file_path: str, payload: Dict[str, Any]) -> None: + os.makedirs(os.path.dirname(file_path) or ".", exist_ok=True) + line = json.dumps(payload, ensure_ascii=False) + with RAW_WRITE_LOCK: + with open(file_path, "a", encoding="utf-8") as out: + out.write(line) + out.write("\n") + + +def _save_raw_index_payload(payload: Dict[str, Any], query: Dict[str, List[str]], client_ip: str) -> str: 
def _extract_phones_from_text(text: str) -> List[str]:
    """Return the distinct mobile numbers found in *text*, in first-seen order.

    Matching is done with the module-level ``PHONE_REGEX``. Because that
    pattern has no digit boundaries, a hit is additionally required to stand
    alone as a number: an 11-digit window cut out of a longer digit run (a
    numeric account id, for example) is rejected instead of being stored as
    a phone number.

    Args:
        text: Free text to scan. ``None`` or an empty string yields ``[]``.

    Returns:
        The 11-digit numbers (without any ``+86``/``86`` prefix), de-duplicated.
    """
    source = text or ""
    phones: List[str] = []
    seen: Set[str] = set()
    for match in PHONE_REGEX.finditer(source):
        phone = match.group(1)
        if not phone or phone in seen:
            continue

        start, end = match.span(1)
        # When the optional country prefix was consumed, the digits before the
        # number are that prefix and must not count as a longer digit run.
        has_country_prefix = match.start() < start
        if not has_country_prefix and start > 0 and source[start - 1].isdecimal():
            continue
        # A trailing digit means the 11 digits are only the head of something longer.
        if end < len(source) and source[end].isdecimal():
            continue

        seen.add(phone)
        phones.append(phone)
    return phones
set(_extract_phones_from_text(signature)) + if phones: + return sorted(phones) + + # 2) 从微信相关标记中提取,再从抖音号字段兜底 + for text in (signature, unique_id, versatile): + for match in WX_CONTEXT_REGEX.finditer(text): + wx_value = match.group(1) or "" + for phone in _extract_phones_from_text(wx_value): + phones.add(phone) + + for text in (unique_id, versatile): + for phone in _extract_phones_from_text(text): + phones.add(phone) + + return sorted(phones) + + +def _extract_law_firm_from_user_info(user_info: Dict[str, Any]) -> str: + candidates: List[str] = [] + + signature = str(user_info.get("signature") or "") + if signature: + candidates.append(signature) + + verify_reason = str(user_info.get("enterprise_verify_reason") or "") + if verify_reason: + candidates.append(verify_reason) + + cert_text = "" + account_cert_info = user_info.get("account_cert_info") + if isinstance(account_cert_info, str) and account_cert_info.strip(): + try: + cert_obj = json.loads(account_cert_info) + if isinstance(cert_obj, dict): + cert_text = str(cert_obj.get("label_text") or "").strip() + except Exception: + cert_text = account_cert_info.strip() + if cert_text: + candidates.append(cert_text) + + for text in candidates: + match = LAW_FIRM_REGEX.search(text) + if match: + return match.group(1) + + return "" + + +def _pick_first_str(node: Dict[str, Any], keys: Tuple[str, ...]) -> str: + for key in keys: + value = node.get(key) + if isinstance(value, str): + text = value.strip() + if text: + return text + return "" + + +def _extract_name(node: Dict[str, Any]) -> str: + direct = _pick_first_str(node, ("name", "nickname", "nick_name", "author_name", "title", "account_name")) + if direct: + return direct + + for nested_key in ("author", "user", "user_info", "profile", "account"): + nested = node.get(nested_key) + if isinstance(nested, dict): + nested_name = _pick_first_str(nested, ("name", "nickname", "nick_name", "author_name", "title")) + if nested_name: + return nested_name + + return "" + + +def 
_extract_law_firm(node: Dict[str, Any]) -> str: + direct = _pick_first_str( + node, + ( + "law_firm", + "firm", + "lawFirm", + "office", + "org_name", + "organization", + "company", + "enterprise", + ), + ) + if direct: + return direct + + enterprise = node.get("enterprise") + if isinstance(enterprise, dict): + company_name = _pick_first_str(enterprise, ("name", "company_name", "enterprise_name")) + if company_name: + return company_name + + return "" + + +def _extract_detail_url(node: Dict[str, Any], fallback_api_url: str) -> str: + url = _pick_first_str(node, ("share_url", "url", "web_url", "detail_url", "jump_url")) + if url: + return url + + aweme_id = node.get("aweme_id") or node.get("item_id") + if aweme_id: + aid = str(aweme_id).strip() + if aid: + return f"https://www.douyin.com/video/{aid}" + + sec_uid = node.get("sec_uid") + if sec_uid: + sec_uid_text = str(sec_uid).strip() + if sec_uid_text: + return f"https://www.douyin.com/user/{sec_uid_text}" + + return fallback_api_url + + +def _city_from_index(city_index: int, table_name: str, domain: str) -> Tuple[str, str]: + if city_index < 0: + return "", "" + try: + areas = _query_area_data(table_name, domain) + except Exception: + return "", "" + if city_index >= len(areas): + return "", "" + area = areas[city_index] + province = str(area.get("province") or "").strip() + city = str(area.get("city") or province).strip() + return province, city + + +def _existing_phones(domain: str, phones: List[str]) -> Set[str]: + if not phones: + return set() + + deduped = sorted({p for p in phones if p}) + if not deduped: + return set() + + existing: Set[str] = set() + with Db() as db: + cursor = db.db.cursor() + chunk_size = 500 + for i in range(0, len(deduped), chunk_size): + chunk = deduped[i:i + chunk_size] + placeholders = ",".join(["%s"] * len(chunk)) + sql = f"SELECT phone FROM lawyer WHERE domain=%s AND phone IN ({placeholders})" + cursor.execute(sql, [domain, *chunk]) + for row in cursor.fetchall(): + 
existing.add(str(row[0])) + cursor.close() + return existing + + +def _insert_lawyer_rows(rows: List[Dict[str, Any]], domain: str) -> Tuple[int, int]: + if not rows: + return 0, 0 + + def row_score(item: Dict[str, Any]) -> int: + score = 0 + if str(item.get("name") or "").strip(): + score += 5 + if str(item.get("law_firm") or "").strip(): + score += 3 + if str(item.get("url") or "").strip(): + score += 1 + if str(item.get("province") or "").strip() or str(item.get("city") or "").strip(): + score += 1 + phone_count_in_node = _parse_int(item.get("phone_count_in_node"), 1) + if phone_count_in_node > 1: + score -= (phone_count_in_node - 1) + return score + + deduped_by_phone: Dict[str, Dict[str, Any]] = {} + skipped = 0 + for row in rows: + phone = str(row.get("phone") or "").strip() + if not phone: + skipped += 1 + continue + old_row = deduped_by_phone.get(phone) + if old_row is not None: + if row_score(row) > row_score(old_row): + deduped_by_phone[phone] = row + skipped += 1 + continue + deduped_by_phone[phone] = row + + existing = _existing_phones(domain, list(deduped_by_phone.keys())) + + inserted = 0 + with Db() as db: + cursor = db.db.cursor() + sql = ( + "INSERT INTO lawyer " + "(name, phone, law_firm, province, city, url, domain, create_time, site_time, params) " + "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" + ) + for phone, row in deduped_by_phone.items(): + if phone in existing: + skipped += 1 + continue + + cursor.execute( + sql, + ( + row.get("name") or "", + phone, + row.get("law_firm") or "", + row.get("province") or "", + row.get("city") or "", + row.get("url") or "", + domain, + _parse_int(row.get("create_time"), int(time.time())), + _parse_int(row.get("site_time"), int(time.time())), + row.get("params") or "{}", + ), + ) + inserted += 1 + existing.add(phone) + + db.db.commit() + cursor.close() + + return inserted, skipped + + +def _extract_lawyer_rows_from_payload( + payload: Dict[str, Any], + area_table: str, + area_domain: str, + save_domain: 
str, +) -> List[Dict[str, Any]]: + now_ts = int(time.time()) + api_url = str(payload.get("url") or "").strip() + city_index = _parse_int(payload.get("cityIndex"), -1) + city_province, city_name = _city_from_index(city_index, area_table, area_domain) + + rows: List[Dict[str, Any]] = [] + data = payload.get("data") if isinstance(payload, dict) else None + user_list = data.get("user_list") if isinstance(data, dict) else None + if not isinstance(user_list, list): + return rows + + for user_item in user_list: + if not isinstance(user_item, dict): + continue + user_info = user_item.get("user_info") + if not isinstance(user_info, dict): + continue + + phones = _extract_phones_from_user_info(user_info) + if not phones: + continue + + name = str(user_info.get("nickname") or "").strip() + law_firm = _extract_law_firm_from_user_info(user_info) + sec_uid = str(user_info.get("sec_uid") or "").strip() + url = f"https://www.douyin.com/user/{sec_uid}" if sec_uid else api_url + + province = city_province + city = city_name or city_province + + source_record = { + "source": "douyin", + "api_source": payload.get("source") or "", + "api_url": api_url, + "city_index": city_index, + "captured_at": now_ts, + "sec_uid": sec_uid, + "user_info": { + "uid": user_info.get("uid"), + "nickname": user_info.get("nickname"), + "signature": user_info.get("signature"), + "unique_id": user_info.get("unique_id"), + "versatile_display": user_info.get("versatile_display"), + }, + } + + for phone in phones: + rows.append( + { + "name": name, + "phone": phone, + "law_firm": law_firm, + "province": province, + "city": city, + "url": url, + "domain": save_domain, + "create_time": now_ts, + "site_time": _parse_int(payload.get("ts"), now_ts), + "phone_count_in_node": len(phones), + "params": json.dumps(source_record, ensure_ascii=False), + } + ) + + return rows + + +class AreaSyncHandler(BaseHTTPRequestHandler): + server_version = "AreaSyncService/2.0" + + def _write_json(self, status: int, payload: Any) -> 
def do_GET(self) -> None:
    """Serve ``/health`` and ``/api/layer/get_area``; anything else is a 404.

    ``get_area`` accepts optional ``table``, ``domain`` and ``meta`` query
    parameters. Without ``meta`` the bare list of areas is returned; with it
    the list is wrapped together with its count, table and domain. A failure
    while querying is reported as HTTP 500 with the error text.
    """
    request_url = urlparse(self.path)
    query = parse_qs(request_url.query)
    route = request_url.path

    if route == "/health":
        status, reply = 200, {"ok": True, "service": "layer-service"}
    elif route == "/api/layer/get_area":
        # Blank parameters fall back to the configured defaults.
        table_name = _first_param(query, "table", AREA_TABLE).strip() or AREA_TABLE
        domain = _first_param(query, "domain", AREA_DOMAIN).strip() or AREA_DOMAIN
        wants_meta = _parse_bool(_first_param(query, "meta", "0"), False)

        try:
            areas = _query_area_data(table_name, domain)
        except Exception as exc:
            status, reply = 500, {"ok": False, "error": str(exc)}
        else:
            status = 200
            if wants_meta:
                reply = {
                    "ok": True,
                    "count": len(areas),
                    "table": table_name,
                    "domain": domain,
                    "data": areas,
                }
            else:
                reply = areas
    else:
        status, reply = 404, {"ok": False, "error": "not found"}

    self._write_json(status, reply)
def run() -> None:
    """Start the HTTP service and block until it is interrupted.

    The listening socket is always released on the way out, and Ctrl-C ends
    the process with a short message instead of a traceback.
    """
    server = ThreadingHTTPServer((SERVICE_HOST, SERVICE_PORT), AreaSyncHandler)
    print(f"[layer-service] running on http://{SERVICE_HOST}:{SERVICE_PORT}")
    print(f"[layer-service] get_area -> table/domain: {AREA_TABLE}/{AREA_DOMAIN}")
    print(f"[layer-service] index -> save domain: {DOUYIN_DOMAIN}")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("[layer-service] interrupted, shutting down")
    finally:
        # Close the listening socket so the port can be reused right away.
        server.server_close()