From e10437cd90afb10c6004ce276516bf7ff16823e1 Mon Sep 17 00:00:00 2001 From: hello-dd-code Date: Sat, 7 Mar 2026 01:06:40 +0800 Subject: [PATCH] feat: add shared progress API and resume/skip support for douyin batch --- README.md | 20 ++- js/douyin.js | 325 ++++++++++++++++++++++++++++++++-- services/area_sync_service.py | 268 +++++++++++++++++++++++++++- 3 files changed, 594 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index f9d0fe2..f04cfff 100644 --- a/README.md +++ b/README.md @@ -29,12 +29,14 @@ python3 -m venv .venv - 替代原 `nas.nepiedg.site:9002` 的核心接口 - `GET /api/layer/get_area`:从数据库 `area_new` 读取地区列表并返回给 `js/douyin.js` - `POST /api/layer/index`:接收脚本回传搜索数据,先保存原始 JSON 到本地,再按参数决定是否入库 +- `GET/POST /api/layer/progress`:多设备共享采集断点(自动建表 `layer_progress`) `/api/layer/index` 当前入库规则(基于 `payload.data.user_list[].user_info`): - 主要从 `signature`(简介)里正则提取手机号 - 若简介未命中,再从微信相关标记(`微信/wx/vx/v`)和 `unique_id/versatile_display` 提取手机号 -- `url` 固定写为 `https://www.douyin.com/user/{sec_uid}`(`sec_uid` 为空时回退接口 URL) +- 必须命中关键词(默认:`律师,律所`)才允许入库,可通过 `DOUYIN_LAWYER_KEYWORDS` 调整 +- `url` 固定写为 `https://www.douyin.com/user/{sec_uid}`(`sec_uid` 为空则跳过不入库) 启动: @@ -53,6 +55,9 @@ AREA_DOMAIN=maxlaw DOUYIN_DOMAIN=抖音 DOUYIN_RAW_DIR=/www/wwwroot/lawyers/data/douyin_raw DOUYIN_SAVE_ONLY=1 +DOUYIN_LAWYER_KEYWORDS=律师,律所 +LAYER_PROGRESS_TABLE=layer_progress +LAYER_PROGRESS_DEFAULT_KEY=douyin_batch_default ``` 接口示例: @@ -84,6 +89,19 @@ curl -X POST 'http://127.0.0.1:9002/api/layer/index?save_only=1' \ # 原始数据落盘目录(按天分文件) # /www/wwwroot/lawyers/data/douyin_raw/douyin_index_YYYYMMDD.jsonl + +# 读取共享断点(多设备) +curl 'http://127.0.0.1:9002/api/layer/progress?server=1&progress_key=douyin_batch_default' + +# 更新共享断点 +curl -X POST 'http://127.0.0.1:9002/api/layer/progress?server=1' \ + -H 'Content-Type: application/json' \ + -d '{"progress_key":"douyin_batch_default","device_id":"device-a","next_city_index":120,"area_signature":"xxxx","area_total":551,"current_city":"北京","reason":"city_done","status":"running"}' + +# 清空共享断点 +curl -X POST 'http://127.0.0.1:9002/api/layer/progress?server=1' \ + -H 'Content-Type: application/json' \ + -d '{"action":"clear","progress_key":"douyin_batch_default"}' ``` 如果 9002 端口已有旧进程占用,可先执行: diff --git a/js/douyin.js b/js/douyin.js index 545cfb9..92cecfa 100644 --- a/js/douyin.js +++ b/js/douyin.js @@ -1,7 +1,7 @@ // ==UserScript== // @name Douyin Batch City Search + AutoScroll + Capture // @namespace http://tampermonkey.net/ -// @version 1.0 +// @version 1.1 // @description 从 Python 服务获取地区列表,按 city + "律师" 搜索并自动下滑,拦截 /aweme/v1/web/discover/search/ 返回并转发到入库接口。 // @author You // @match https://www.douyin.com/* @@ -42,6 +42,13 @@ const SCROLL_BY = 2200; const WAIT_AFTER_SEARCH_MS = 1000; const DELAY_BETWEEN_CITIES_MS = 1500; + + // 断点续跑配置 + const PROGRESS_STORAGE_KEY = 'dm_batch_progress_v1'; + const DEVICE_ID_STORAGE_KEY = 'dm_batch_device_id_v1'; + const PROGRESS_SYNC_ENABLED = true; + const PROGRESS_KEY = 'douyin_batch_default'; + const PROGRESS_API = `${API_BASE}/api/layer/progress?server=1`; // 可选:如果希望只发送包含手机号的条目,可在此启用并调整正则 const ONLY_SEND_IF_HAS_PHONE = false; @@ -50,14 +57,20 @@ /********************* 运行时状态 *********************/ let areaList = []; let stopFlag = false; // 由 UI 控制,true 表示停止整个任务 + let skipCurrentCityFlag = false; // 由 UI 控制,true 表示跳过当前城市 let currentCityIndex = -1; + let currentAreaSignature = ''; + let isLoopRunning = false; let inputEl = null; let btnEl = null; + const DEVICE_ID = getOrCreateDeviceId(); // 节流/去重发送 let lastSentHash = null; let lastSentAt = 0; const SEND_MIN_INTERVAL_MS = 800; + let progressSyncInFlight = false; + let progressSyncPendingPayload = null; /********************* 工具函数 *********************/ function log(...args) { console.log('[DouyinBatch] ', ...args); } @@ -71,6 +84,106 @@ } return h.toString(16); } + + function sleep(ms) { + return new Promise(r => setTimeout(r, ms)); + } + + function getOrCreateDeviceId() { + try { + const old = localStorage.getItem(DEVICE_ID_STORAGE_KEY); + if (old) return old; + const generated = (window.crypto && typeof window.crypto.randomUUID === 'function') + ? window.crypto.randomUUID() + : `dm-${Date.now()}-${Math.random().toString(16).slice(2, 10)}`; + localStorage.setItem(DEVICE_ID_STORAGE_KEY, generated); + return generated; + } catch (_) { + return `dm-${Date.now()}-${Math.random().toString(16).slice(2, 10)}`; + } + } + + function getAreaRowName(row) { + if (!row || typeof row !== 'object') return ''; + return String(row.city || row.province || row.name || '').trim(); + } + + function buildAreaSignature(list) { + try { + if (!Array.isArray(list) || list.length === 0) return 'empty'; + const names = list.map(getAreaRowName).filter(Boolean); + return hashString(`${list.length}|${names.join('|')}`); + } catch (e) { + return 'unknown'; + } + } + + function readProgress() { + try { + const raw = localStorage.getItem(PROGRESS_STORAGE_KEY); + if (!raw) return null; + const parsed = JSON.parse(raw); + if (!parsed || typeof parsed !== 'object') return null; + return parsed; + } catch (_) { + return null; + } + } + + function buildProgressPayload(nextCityIndex, reason = '') { + const safeIndex = Number.isFinite(nextCityIndex) ? Math.max(0, Math.floor(nextCityIndex)) : 0; + const currentArea = areaList[safeIndex] || areaList[Math.max(0, currentCityIndex)] || {}; + return { + progress_key: PROGRESS_KEY, + device_id: DEVICE_ID, + next_city_index: safeIndex, + area_signature: currentAreaSignature || '', + area_total: Array.isArray(areaList) ? areaList.length : 0, + current_city: getAreaRowName(currentArea), + reason, + status: stopFlag ? 'paused' : 'running', + extra: { + path: location.pathname || '', + href: location.href || '', + }, + }; + } + + function persistProgress(nextCityIndex, reason = '') { + try { + const payload = buildProgressPayload(nextCityIndex, reason); + localStorage.setItem(PROGRESS_STORAGE_KEY, JSON.stringify({ + nextCityIndex: payload.next_city_index, + areaSignature: payload.area_signature, + reason: payload.reason, + updatedAt: Date.now(), + progressKey: payload.progress_key, + deviceId: payload.device_id, + })); + + enqueueRemoteProgressSync(payload); + } catch (e) { + err('保存进度失败', e); + } + } + + function restoreProgress(areaSignature, listLength) { + const progress = readProgress(); + if (!progress) return 0; + if (!progress.areaSignature || progress.areaSignature !== areaSignature) return 0; + const idx = Number.isFinite(progress.nextCityIndex) ? Math.floor(progress.nextCityIndex) : 0; + if (idx < 0 || idx >= listLength) return 0; + return idx; + } + + function clearProgress() { + try { localStorage.removeItem(PROGRESS_STORAGE_KEY); } catch (_) {} + enqueueRemoteProgressSync({ + action: 'clear', + progress_key: PROGRESS_KEY, + device_id: DEVICE_ID, + }); + } function gmGetJson(url) { return new Promise((resolve, reject) => { @@ -89,6 +202,76 @@ }); }); } + + function gmPostJson(url, data) { + return new Promise((resolve, reject) => { + GM_xmlhttpRequest({ + method: 'POST', + url, + headers: { 'Content-Type': 'application/json' }, + data: JSON.stringify(data || {}), + onload(res) { + try { + const json = JSON.parse(res.responseText || '{}'); + resolve(json); + } catch (e) { + reject(e); + } + }, + onerror(err) { reject(err); } + }); + }); + } + + function enqueueRemoteProgressSync(payload) { + if (!PROGRESS_SYNC_ENABLED) return; + if (!payload || typeof payload !== 'object') return; + progressSyncPendingPayload = payload; + if (progressSyncInFlight) return; + flushRemoteProgressSync(); + } + + async function flushRemoteProgressSync() { + if (!PROGRESS_SYNC_ENABLED) return; + if (progressSyncInFlight) return; + + progressSyncInFlight = true; + try { + while (progressSyncPendingPayload) { + const payload = progressSyncPendingPayload; + progressSyncPendingPayload = null; + try { + await gmPostJson(PROGRESS_API, payload); + } catch (e) { + err('同步远端进度失败', e); + break; + } + } + } finally { + progressSyncInFlight = false; + } + } + + async function restoreRemoteProgress(areaSignature, listLength) { + if (!PROGRESS_SYNC_ENABLED) return 0; + try { + const url = `${PROGRESS_API}&progress_key=${encodeURIComponent(PROGRESS_KEY)}`; + const response = await gmGetJson(url); + const data = response && response.data ? response.data : null; + if (!data || typeof data !== 'object') return 0; + + const remoteSignature = String(data.area_signature || ''); + if (!remoteSignature || remoteSignature !== areaSignature) return 0; + + const idxRaw = data.next_city_index; + const idx = Number.isFinite(idxRaw) ? Math.floor(idxRaw) : Math.floor(Number(idxRaw || 0)); + if (!Number.isFinite(idx) || idx < 0 || idx >= listLength) return 0; + return idx; + } catch (e) { + err('读取远端进度失败', e); + return 0; + } + } function setNativeValue(el, value) { if (!el) return; @@ -263,6 +446,11 @@ let scrolls = 0; while (!stopFlag) { + if (skipCurrentCityFlag) { + statusNode.textContent = '收到跳过指令,结束当前地区滚动。'; + break; + } + scrolls++; if (scrolls > maxScrolls) { statusNode.textContent = `达到单次搜索最大滚动 ${maxScrolls},停止本次自动下滑。`; @@ -276,7 +464,12 @@ window.scrollTo(0, (document.body.scrollHeight || document.documentElement.scrollHeight)); } - await new Promise(r => setTimeout(r, SCROLL_INTERVAL_MS)); + await sleep(SCROLL_INTERVAL_MS); + + if (skipCurrentCityFlag) { + statusNode.textContent = '收到跳过指令,结束当前地区滚动。'; + break; + } const curHeight = document.body.scrollHeight || document.documentElement.scrollHeight || 0; if (curHeight === lastHeight) { @@ -391,7 +584,9 @@ const css = ` #dm-batch-btn { position: fixed; right: 12px; bottom: 12px; z-index:999999; background: rgba(0,0,0,0.65); color:#fff; padding:8px 10px; border-radius:8px; font-size:13px; cursor:pointer; user-select:none;} - #dm-batch-status { position: fixed; right:12px; bottom:56px; z-index:999999; background: rgba(0,0,0,0.45); color:#fff; + #dm-batch-skip { position: fixed; right:12px; bottom:50px; z-index:999999; background: rgba(30,30,30,0.72); color:#fff; + padding:7px 10px; border-radius:8px; font-size:12px; cursor:pointer; user-select:none;} + #dm-batch-status { position: fixed; right:12px; bottom:88px; z-index:999999; background: rgba(0,0,0,0.45); color:#fff; padding:6px 8px; border-radius:6px; font-size:12px; max-width:320px; word-break:break-word;} `; const s = document.createElement('style'); s.textContent = css; document.head && document.head.appendChild(s); @@ -401,6 +596,11 @@ btn.textContent = 'BatchSearch:停止'; btn.dataset.running = '1'; document.body.appendChild(btn); + + const skipBtn = document.createElement('div'); + skipBtn.id = 'dm-batch-skip'; + skipBtn.textContent = 'BatchSearch:跳过当前'; + document.body.appendChild(skipBtn); const status = document.createElement('div'); status.id = 'dm-batch-status'; @@ -411,21 +611,55 @@ const running = btn.dataset.running === '1'; btn.dataset.running = running ? '0' : '1'; btn.textContent = running ? 'BatchSearch:已停止' : 'BatchSearch:停止'; - status.textContent = running ? '已手动停止' : '已启动'; + status.textContent = running ? '已手动停止(已保存断点)' : '已启动'; stopFlag = running; // if was running and clicked -> set stopFlag true; if restarting, set false + if (running) { + skipCurrentCityFlag = false; + persistProgress(Math.max(currentCityIndex, 0), 'manual_pause'); + } if (!stopFlag) { // restart loop if needed runBatchSearchLoop(status).catch(e => err(e)); } }); + + skipBtn.addEventListener('click', () => { + if (currentCityIndex < 0) { + status.textContent = '当前还未开始处理城市,稍后再跳过。'; + return; + } + skipCurrentCityFlag = true; + const areaName = getAreaRowName(areaList[currentCityIndex] || {}); + status.textContent = `收到跳过指令:${areaName || `索引${currentCityIndex}`}`; + }); + + skipBtn.addEventListener('contextmenu', (event) => { + event.preventDefault(); + clearProgress(); + currentCityIndex = 0; + status.textContent = '已清除断点。下次将从第 1 个地区开始。'; + }); - return { btn, status }; + return { btn, skipBtn, status }; } /********************* 主流程:获取城市并循环搜索 *********************/ async function runBatchSearchLoop(statusNode) { + if (isLoopRunning) { + statusNode.textContent = '批量任务已在运行中,请勿重复启动。'; + return; + } + + isLoopRunning = true; try { stopFlag = (document.getElementById('dm-batch-btn') && document.getElementById('dm-batch-btn').dataset.running === '0'); + skipCurrentCityFlag = false; + + if (stopFlag) { + statusNode.textContent = '当前是暂停状态,点击“BatchSearch:停止”可继续。'; + return; + } + // 获取 area list(仅在内存为空时获取) if (!areaList || !Array.isArray(areaList) || areaList.length === 0) { statusNode.textContent = '正在获取城市列表...'; @@ -450,6 +684,20 @@ return; } } + + currentAreaSignature = buildAreaSignature(areaList); + const restoredIndexLocal = restoreProgress(currentAreaSignature, areaList.length); + const restoredIndexRemote = await restoreRemoteProgress(currentAreaSignature, areaList.length); + const restoredIndex = Math.max(restoredIndexLocal, restoredIndexRemote); + const startIndex = (currentCityIndex >= 0 && currentCityIndex < areaList.length) + ? currentCityIndex + : restoredIndex; + currentCityIndex = startIndex; + + if (startIndex > 0) { + statusNode.textContent = `检测到断点(本地:${restoredIndexLocal + 1} 远端:${restoredIndexRemote + 1}),将从第 ${startIndex + 1}/${areaList.length} 个地区继续。`; + await sleep(500); + } // 等待搜索输入与按钮可用 try { @@ -459,13 +707,28 @@ statusNode.textContent = '未找到搜索输入或按钮,脚本仍会监听接口,但无法自动搜索。'; return; } + + let completedAll = true; // 主循环:对每个 city 执行搜索 -> 下滑 -> 发送结果 -> 下一 city - for (let i = 0; i < areaList.length; i++) { - if (stopFlag) { statusNode.textContent = '已停止'; break; } + for (let i = startIndex; i < areaList.length; i++) { + if (stopFlag) { + completedAll = false; + persistProgress(i, 'manual_stop'); + statusNode.textContent = '已停止(断点已保存)。'; + break; + } + currentCityIndex = i; + skipCurrentCityFlag = false; + persistProgress(i, 'start_city'); + const city = (areaList[i].city || areaList[i].province || '').trim(); - if (!city) continue; + if (!city) { + persistProgress(i + 1, 'empty_city'); + continue; + } + const keyword = `${city}律师`; statusNode.textContent = `正在搜索:${keyword} (${i+1}/${areaList.length})`; log(`开始城市[${i+1}/${areaList.length}] 搜索:`, keyword); @@ -476,6 +739,8 @@ } catch (e) { err('刷新搜索控件失败', e); statusNode.textContent = '刷新搜索控件失败,终止批量搜索。'; + completedAll = false; + persistProgress(i, 'search_control_error'); break; } @@ -488,6 +753,8 @@ await ensureSearchControls(statusNode); if (!simulateSearchTrigger()) { statusNode.textContent = '搜索触发失败,终止批量搜索。'; + completedAll = false; + persistProgress(i, 'search_trigger_error'); break; } } @@ -497,25 +764,55 @@ // 自动下滑直到稳定或达到上限 await autoScrollUntilStable(statusNode, MAX_SCROLLS_PER_CITY); + + if (skipCurrentCityFlag) { + skipCurrentCityFlag = false; + persistProgress(i + 1, 'skip_city'); + statusNode.textContent = `已跳过 ${keyword},继续下一个地区...`; + await sleep(Math.min(DELAY_BETWEEN_CITIES_MS, 800)); + continue; + } - if (stopFlag) { statusNode.textContent = '已停止'; break; } + if (stopFlag) { + completedAll = false; + persistProgress(i, 'manual_stop_after_scroll'); + statusNode.textContent = '已停止(断点已保存)。'; + break; + } + + persistProgress(i + 1, 'city_done'); // 等待短暂间隔再进行下一个城市 statusNode.textContent = `完成 ${keyword} 的加载,等待 ${DELAY_BETWEEN_CITIES_MS} ms 后继续...`; - await new Promise(r => setTimeout(r, DELAY_BETWEEN_CITIES_MS)); + await sleep(DELAY_BETWEEN_CITIES_MS); + } + + if (completedAll && !stopFlag) { + clearProgress(); + currentCityIndex = -1; + statusNode.textContent = '批量搜索完成,已清除断点进度。'; + log('批量搜索循环结束: completed'); + } else { + log('批量搜索循环结束: paused/broken'); } - - statusNode.textContent = '批量搜索完成或已停止。'; - log('批量搜索循环结束'); } catch (e) { err('runBatchSearchLoop error', e); + persistProgress(Math.max(currentCityIndex, 0), 'loop_exception'); + } finally { + isLoopRunning = false; } } /********************* 启动脚本 *********************/ (function init() { + window.addEventListener('beforeunload', () => { + if (currentCityIndex >= 0) { + persistProgress(Math.max(currentCityIndex, 0), 'page_unload'); + } + }); + const ui = createUI(); - ui.status.textContent = '就绪 - 点击右下按钮可停止/重启批量搜索'; + ui.status.textContent = '就绪 - 可暂停/跳过,自动保存断点(右键跳过按钮可清除断点)'; console.log(location.pathname) // 如果当前为目标页面(/jingxuan/search/),则自动启动;否则仍可在任何页面打开并手动启动。 const isAutoPage = location.pathname && location.pathname.indexOf('/search/') !== -1; diff --git a/services/area_sync_service.py b/services/area_sync_service.py index 592a576..312b8f8 100644 --- a/services/area_sync_service.py +++ b/services/area_sync_service.py @@ -5,7 +5,7 @@ import sys import threading import time from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer -from typing import Any, Dict, Iterable, List, Set, Tuple +from typing import Any, Dict, Iterable, List, Optional, Set, Tuple from urllib.parse import parse_qs, urlparse current_dir = os.path.dirname(os.path.abspath(__file__)) @@ -21,6 +21,9 @@ AREA_DOMAIN = os.getenv("AREA_DOMAIN", "maxlaw") DOUYIN_DOMAIN = os.getenv("DOUYIN_DOMAIN", "抖音") DOUYIN_RAW_DIR = os.getenv("DOUYIN_RAW_DIR", os.path.join(project_root, "data", "douyin_raw")) DOUYIN_SAVE_ONLY_ENV = os.getenv("DOUYIN_SAVE_ONLY", "1") +LAWYER_KEYWORDS_ENV = os.getenv("DOUYIN_LAWYER_KEYWORDS", "律师,律所") +PROGRESS_TABLE = os.getenv("LAYER_PROGRESS_TABLE", "layer_progress") +PROGRESS_DEFAULT_KEY = os.getenv("LAYER_PROGRESS_DEFAULT_KEY", "douyin_batch_default") SERVICE_HOST = os.getenv("AREA_SERVICE_HOST", "0.0.0.0") SERVICE_PORT = int(os.getenv("AREA_SERVICE_PORT", "9002")) @@ -29,6 +32,10 @@ WX_CONTEXT_REGEX = re.compile(r"(?i)(?:微信|微.?信|wx|vx|weixin|v信|v号|v) LAW_FIRM_REGEX = re.compile(r"([\u4e00-\u9fa5A-Za-z·]{2,40}律师事务所)") RAW_WRITE_LOCK = threading.Lock() +LAWYER_KEYWORDS: Tuple[str, ...] = tuple( + keyword.strip() for keyword in LAWYER_KEYWORDS_ENV.split(",") if keyword.strip() +) + def _is_safe_table_name(table_name: str) -> bool: return bool(re.fullmatch(r"[A-Za-z0-9_]+", table_name or "")) @@ -83,6 +90,161 @@ def _save_raw_index_payload(payload: Dict[str, Any], query: Dict[str, List[str]] return file_path +def _ensure_progress_table() -> None: + if not _is_safe_table_name(PROGRESS_TABLE): + raise ValueError("非法进度表名") + + with Db() as db: + cursor = db.db.cursor() + sql = f""" + CREATE TABLE IF NOT EXISTS `{PROGRESS_TABLE}` ( + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, + `progress_key` varchar(128) NOT NULL, + `next_city_index` int(11) DEFAULT 0, + `area_signature` varchar(128) DEFAULT NULL, + `area_total` int(11) DEFAULT 0, + `current_city` varchar(128) DEFAULT NULL, + `reason` varchar(64) DEFAULT NULL, + `status` varchar(32) DEFAULT NULL, + `device_id` varchar(128) DEFAULT NULL, + `extra_json` longtext, + `updated_at` bigint(20) DEFAULT NULL, + `create_time` bigint(20) DEFAULT NULL, + PRIMARY KEY (`id`), + UNIQUE KEY `uk_progress_key` (`progress_key`), + KEY `idx_updated_at` (`updated_at`) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + """ + cursor.execute(sql) + db.db.commit() + cursor.close() + + +def _get_progress(progress_key: str) -> Optional[Dict[str, Any]]: + key = str(progress_key or "").strip() + if not key: + return None + + _ensure_progress_table() + with Db() as db: + cursor = db.db.cursor() + sql = ( + f"SELECT progress_key, next_city_index, area_signature, area_total, current_city, " + f"reason, status, device_id, extra_json, updated_at, create_time " + f"FROM `{PROGRESS_TABLE}` WHERE progress_key=%s LIMIT 1" + ) + cursor.execute(sql, (key,)) + row = cursor.fetchone() + cursor.close() + + if not row: + return None + + extra_json = row[8] or "" + extra_obj: Any = {} + if extra_json: + try: + extra_obj = json.loads(extra_json) + except Exception: + extra_obj = extra_json + + return { + "progress_key": row[0] or "", + "next_city_index": _parse_int(row[1], 0), + "area_signature": row[2] or "", + "area_total": _parse_int(row[3], 0), + "current_city": row[4] or "", + "reason": row[5] or "", + "status": row[6] or "", + "device_id": row[7] or "", + "extra": extra_obj, + "updated_at": _parse_int(row[9], 0), + "create_time": _parse_int(row[10], 0), + } + + +def _upsert_progress(progress_key: str, payload: Dict[str, Any]) -> Dict[str, Any]: + key = str(progress_key or "").strip() + if not key: + raise ValueError("progress_key 不能为空") + + _ensure_progress_table() + now_ts = int(time.time()) + next_city_index = _parse_int(payload.get("next_city_index"), 0) + area_signature = str(payload.get("area_signature") or "").strip() + area_total = _parse_int(payload.get("area_total"), 0) + current_city = str(payload.get("current_city") or "").strip() + reason = str(payload.get("reason") or "").strip() + status = str(payload.get("status") or "").strip() + device_id = str(payload.get("device_id") or "").strip() + extra = payload.get("extra") + if extra is None: + extra = payload.get("extra_json") + + if isinstance(extra, str): + extra_json = extra + else: + try: + extra_json = json.dumps(extra or {}, ensure_ascii=False) + except Exception: + extra_json = "{}" + + with Db() as db: + cursor = db.db.cursor() + sql = ( + f"INSERT INTO `{PROGRESS_TABLE}` " + "(progress_key, next_city_index, area_signature, area_total, current_city, reason, status, " + "device_id, extra_json, updated_at, create_time) " + "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) " + "ON DUPLICATE KEY UPDATE " + "next_city_index=VALUES(next_city_index), " + "area_signature=VALUES(area_signature), " + "area_total=VALUES(area_total), " + "current_city=VALUES(current_city), " + "reason=VALUES(reason), " + "status=VALUES(status), " + "device_id=VALUES(device_id), " + "extra_json=VALUES(extra_json), " + "updated_at=VALUES(updated_at)" + ) + cursor.execute( + sql, + ( + key, + next_city_index, + area_signature, + area_total, + current_city, + reason, + status, + device_id, + extra_json, + now_ts, + now_ts, + ), + ) + db.db.commit() + cursor.close() + + return _get_progress(key) or {} + + +def _clear_progress(progress_key: str) -> int: + key = str(progress_key or "").strip() + if not key: + return 0 + + _ensure_progress_table() + with Db() as db: + cursor = db.db.cursor() + sql = f"DELETE FROM `{PROGRESS_TABLE}` WHERE progress_key=%s" + cursor.execute(sql, (key,)) + affected = cursor.rowcount + db.db.commit() + cursor.close() + return affected + + def _query_area_data(table_name: str, domain: str) -> List[Dict[str, Any]]: if not _is_safe_table_name(table_name): raise ValueError("非法表名") @@ -193,6 +355,36 @@ def _extract_law_firm_from_user_info(user_info: Dict[str, Any]) -> str: return "" +def _extract_account_cert_text(user_info: Dict[str, Any]) -> str: + account_cert_info = user_info.get("account_cert_info") + if isinstance(account_cert_info, str) and account_cert_info.strip(): + try: + cert_obj = json.loads(account_cert_info) + if isinstance(cert_obj, dict): + return str(cert_obj.get("label_text") or "").strip() + except Exception: + return account_cert_info.strip() + return "" + + +def _is_lawyer_related_user(user_info: Dict[str, Any], name: str, law_firm: str) -> bool: + texts = [ + name, + str(user_info.get("nickname") or ""), + str(user_info.get("signature") or ""), + str(user_info.get("custom_verify") or ""), + str(user_info.get("enterprise_verify_reason") or ""), + str(user_info.get("versatile_display") or ""), + str(user_info.get("unique_id") or ""), + _extract_account_cert_text(user_info), + law_firm, + ] + merged = "\n".join(text for text in texts if text).strip() + if not merged: + return False + return any(keyword in merged for keyword in LAWYER_KEYWORDS) + + def _pick_first_str(node: Dict[str, Any], keys: Tuple[str, ...]) -> str: for key in keys: value = node.get(key) @@ -399,14 +591,21 @@ def _extract_lawyer_rows_from_payload( if not isinstance(user_info, dict): continue + name = str(user_info.get("nickname") or "").strip() + law_firm = _extract_law_firm_from_user_info(user_info) + + # 强约束:必须出现“律师/律所”等关键词,避免非法律相关账号入库 + if not _is_lawyer_related_user(user_info, name, law_firm): + continue + phones = _extract_phones_from_user_info(user_info) if not phones: continue - name = str(user_info.get("nickname") or "").strip() - law_firm = _extract_law_firm_from_user_info(user_info) sec_uid = str(user_info.get("sec_uid") or "").strip() - url = f"https://www.douyin.com/user/{sec_uid}" if sec_uid else api_url + if not sec_uid: + continue + url = f"https://www.douyin.com/user/{sec_uid}" province = city_province city = city_name or city_province @@ -516,12 +715,67 @@ class AreaSyncHandler(BaseHTTPRequestHandler): self._write_json(200, rows) return + if parsed.path == "/api/layer/progress": + progress_key = _first_param(params, "progress_key", PROGRESS_DEFAULT_KEY).strip() or PROGRESS_DEFAULT_KEY + try: + row = _get_progress(progress_key) + except Exception as exc: + self._write_json(500, {"ok": False, "error": str(exc)}) + return + + self._write_json( + 200, + { + "ok": True, + "progress_key": progress_key, + "data": row, + }, + ) + return + self._write_json(404, {"ok": False, "error": "not found"}) def do_POST(self) -> None: parsed = urlparse(self.path) params = parse_qs(parsed.query) + if parsed.path == "/api/layer/progress": + body = self._read_json_body() + if not isinstance(body, dict): + body = {} + + progress_key = str(body.get("progress_key") or _first_param(params, "progress_key", PROGRESS_DEFAULT_KEY)).strip() or PROGRESS_DEFAULT_KEY + action = str(body.get("action") or _first_param(params, "action", "upsert")).strip().lower() or "upsert" + + try: + if action == "clear": + deleted = _clear_progress(progress_key) + self._write_json( + 200, + { + "ok": True, + "action": "clear", + "progress_key": progress_key, + "deleted": deleted, + }, + ) + return + + saved = _upsert_progress(progress_key, body) + self._write_json( + 200, + { + "ok": True, + "action": "upsert", + "progress_key": progress_key, + "data": saved, + }, + ) + return + except Exception as exc: + self._write_json(500, {"ok": False, "error": str(exc)}) + return + if parsed.path == "/api/layer/index": body = self._read_json_body() if not isinstance(body, dict) or not body: @@ -578,10 +832,16 @@ class AreaSyncHandler(BaseHTTPRequestHandler): def run() -> None: + try: + _ensure_progress_table() + except Exception as exc: + print(f"[layer-service] init progress table failed: {exc}") + server = ThreadingHTTPServer((SERVICE_HOST, SERVICE_PORT), AreaSyncHandler) print(f"[layer-service] running on http://{SERVICE_HOST}:{SERVICE_PORT}") print(f"[layer-service] get_area -> table/domain: {AREA_TABLE}/{AREA_DOMAIN}") print(f"[layer-service] index -> save domain: {DOUYIN_DOMAIN}") + print(f"[layer-service] progress table/default key: {PROGRESS_TABLE}/{PROGRESS_DEFAULT_KEY}") server.serve_forever()