feat: add shared progress API and resume/skip support for douyin batch

This commit is contained in:
hello-dd-code
2026-03-07 01:06:40 +08:00
parent 86cf933913
commit e10437cd90
3 changed files with 594 additions and 19 deletions
+264 -4
View File
@@ -5,7 +5,7 @@ import sys
import threading
import time
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Any, Dict, Iterable, List, Set, Tuple
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
from urllib.parse import parse_qs, urlparse
current_dir = os.path.dirname(os.path.abspath(__file__))
@@ -21,6 +21,9 @@ AREA_DOMAIN = os.getenv("AREA_DOMAIN", "maxlaw")
DOUYIN_DOMAIN = os.getenv("DOUYIN_DOMAIN", "抖音")
DOUYIN_RAW_DIR = os.getenv("DOUYIN_RAW_DIR", os.path.join(project_root, "data", "douyin_raw"))
DOUYIN_SAVE_ONLY_ENV = os.getenv("DOUYIN_SAVE_ONLY", "1")
LAWYER_KEYWORDS_ENV = os.getenv("DOUYIN_LAWYER_KEYWORDS", "律师,律所")
PROGRESS_TABLE = os.getenv("LAYER_PROGRESS_TABLE", "layer_progress")
PROGRESS_DEFAULT_KEY = os.getenv("LAYER_PROGRESS_DEFAULT_KEY", "douyin_batch_default")
SERVICE_HOST = os.getenv("AREA_SERVICE_HOST", "0.0.0.0")
SERVICE_PORT = int(os.getenv("AREA_SERVICE_PORT", "9002"))
@@ -29,6 +32,10 @@ WX_CONTEXT_REGEX = re.compile(r"(?i)(?:微信|微.?信|wx|vx|weixin|v信|v号|v)
LAW_FIRM_REGEX = re.compile(r"([\u4e00-\u9fa5A-Za-z·]{2,40}律师事务所)")
RAW_WRITE_LOCK = threading.Lock()
LAWYER_KEYWORDS: Tuple[str, ...] = tuple(
keyword.strip() for keyword in LAWYER_KEYWORDS_ENV.split(",") if keyword.strip()
)
def _is_safe_table_name(table_name: str) -> bool:
return bool(re.fullmatch(r"[A-Za-z0-9_]+", table_name or ""))
@@ -83,6 +90,161 @@ def _save_raw_index_payload(payload: Dict[str, Any], query: Dict[str, List[str]]
return file_path
def _ensure_progress_table() -> None:
    """Create the progress table if it does not already exist.

    Raises:
        ValueError: when PROGRESS_TABLE is not a safe SQL identifier.
    """
    if not _is_safe_table_name(PROGRESS_TABLE):
        raise ValueError("非法进度表名")
    with Db() as db:
        cursor = db.db.cursor()
        try:
            # PROGRESS_TABLE is validated above, so direct interpolation of
            # the identifier is safe here (identifiers cannot be bound as
            # query parameters).
            sql = f"""
            CREATE TABLE IF NOT EXISTS `{PROGRESS_TABLE}` (
                `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
                `progress_key` varchar(128) NOT NULL,
                `next_city_index` int(11) DEFAULT 0,
                `area_signature` varchar(128) DEFAULT NULL,
                `area_total` int(11) DEFAULT 0,
                `current_city` varchar(128) DEFAULT NULL,
                `reason` varchar(64) DEFAULT NULL,
                `status` varchar(32) DEFAULT NULL,
                `device_id` varchar(128) DEFAULT NULL,
                `extra_json` longtext,
                `updated_at` bigint(20) DEFAULT NULL,
                `create_time` bigint(20) DEFAULT NULL,
                PRIMARY KEY (`id`),
                UNIQUE KEY `uk_progress_key` (`progress_key`),
                KEY `idx_updated_at` (`updated_at`)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
            """
            cursor.execute(sql)
            db.db.commit()
        finally:
            # Fix: close the cursor even when execute/commit raises, so a
            # failed DDL does not leak the cursor.
            cursor.close()
def _get_progress(progress_key: str) -> Optional[Dict[str, Any]]:
    """Load one progress row by its key.

    Args:
        progress_key: logical progress identifier; leading/trailing
            whitespace is stripped.

    Returns:
        A dict with the stored progress fields, or None when the key is
        blank or no row exists. The "extra" entry holds the JSON-decoded
        extra_json column; when decoding fails the raw string is kept so
        callers can still inspect it.
    """
    key = str(progress_key or "").strip()
    if not key:
        return None
    _ensure_progress_table()
    with Db() as db:
        cursor = db.db.cursor()
        try:
            # Identifier was validated by _ensure_progress_table(); the key
            # itself goes through a bind parameter.
            sql = (
                f"SELECT progress_key, next_city_index, area_signature, area_total, current_city, "
                f"reason, status, device_id, extra_json, updated_at, create_time "
                f"FROM `{PROGRESS_TABLE}` WHERE progress_key=%s LIMIT 1"
            )
            cursor.execute(sql, (key,))
            row = cursor.fetchone()
        finally:
            # Fix: close the cursor even when execute/fetch raises.
            cursor.close()
    if not row:
        return None
    extra_json = row[8] or ""
    extra_obj: Any = {}
    if extra_json:
        try:
            extra_obj = json.loads(extra_json)
        except Exception:
            extra_obj = extra_json
    return {
        "progress_key": row[0] or "",
        "next_city_index": _parse_int(row[1], 0),
        "area_signature": row[2] or "",
        "area_total": _parse_int(row[3], 0),
        "current_city": row[4] or "",
        "reason": row[5] or "",
        "status": row[6] or "",
        "device_id": row[7] or "",
        "extra": extra_obj,
        "updated_at": _parse_int(row[9], 0),
        "create_time": _parse_int(row[10], 0),
    }
def _upsert_progress(progress_key: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Insert or update a progress row and return the freshly stored state.

    Args:
        progress_key: logical progress identifier; must not be blank.
        payload: field values; "extra" takes precedence over the legacy
            "extra_json" key. Strings are stored verbatim, other values are
            serialized to JSON.

    Returns:
        The row as re-read by _get_progress, or {} if the re-read misses.

    Raises:
        ValueError: when the key is blank.
    """
    key = str(progress_key or "").strip()
    if not key:
        raise ValueError("progress_key 不能为空")
    _ensure_progress_table()
    now_ts = int(time.time())
    next_city_index = _parse_int(payload.get("next_city_index"), 0)
    area_signature = str(payload.get("area_signature") or "").strip()
    area_total = _parse_int(payload.get("area_total"), 0)
    current_city = str(payload.get("current_city") or "").strip()
    reason = str(payload.get("reason") or "").strip()
    status = str(payload.get("status") or "").strip()
    device_id = str(payload.get("device_id") or "").strip()
    extra = payload.get("extra")
    if extra is None:
        extra = payload.get("extra_json")
    if isinstance(extra, str):
        extra_json = extra
    else:
        try:
            extra_json = json.dumps(extra or {}, ensure_ascii=False)
        except Exception:
            # Non-serializable payloads degrade to an empty object rather
            # than failing the whole upsert.
            extra_json = "{}"
    with Db() as db:
        cursor = db.db.cursor()
        try:
            # create_time is only honored on first insert; the duplicate-key
            # branch deliberately leaves it untouched.
            sql = (
                f"INSERT INTO `{PROGRESS_TABLE}` "
                "(progress_key, next_city_index, area_signature, area_total, current_city, reason, status, "
                "device_id, extra_json, updated_at, create_time) "
                "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) "
                "ON DUPLICATE KEY UPDATE "
                "next_city_index=VALUES(next_city_index), "
                "area_signature=VALUES(area_signature), "
                "area_total=VALUES(area_total), "
                "current_city=VALUES(current_city), "
                "reason=VALUES(reason), "
                "status=VALUES(status), "
                "device_id=VALUES(device_id), "
                "extra_json=VALUES(extra_json), "
                "updated_at=VALUES(updated_at)"
            )
            cursor.execute(
                sql,
                (
                    key,
                    next_city_index,
                    area_signature,
                    area_total,
                    current_city,
                    reason,
                    status,
                    device_id,
                    extra_json,
                    now_ts,
                    now_ts,
                ),
            )
            db.db.commit()
        finally:
            # Fix: close the cursor even when execute/commit raises.
            cursor.close()
    return _get_progress(key) or {}
def _clear_progress(progress_key: str) -> int:
    """Delete the progress row for *progress_key*.

    Returns:
        Number of deleted rows (0 when the key is blank or absent).
    """
    key = str(progress_key or "").strip()
    if not key:
        return 0
    _ensure_progress_table()
    with Db() as db:
        cursor = db.db.cursor()
        try:
            sql = f"DELETE FROM `{PROGRESS_TABLE}` WHERE progress_key=%s"
            cursor.execute(sql, (key,))
            affected = cursor.rowcount
            db.db.commit()
        finally:
            # Fix: close the cursor even when execute/commit raises.
            cursor.close()
    return affected
def _query_area_data(table_name: str, domain: str) -> List[Dict[str, Any]]:
if not _is_safe_table_name(table_name):
raise ValueError("非法表名")
@@ -193,6 +355,36 @@ def _extract_law_firm_from_user_info(user_info: Dict[str, Any]) -> str:
return ""
def _extract_account_cert_text(user_info: Dict[str, Any]) -> str:
account_cert_info = user_info.get("account_cert_info")
if isinstance(account_cert_info, str) and account_cert_info.strip():
try:
cert_obj = json.loads(account_cert_info)
if isinstance(cert_obj, dict):
return str(cert_obj.get("label_text") or "").strip()
except Exception:
return account_cert_info.strip()
return ""
def _is_lawyer_related_user(user_info: Dict[str, Any], name: str, law_firm: str) -> bool:
    """Return True when any configured lawyer keyword appears in the
    user's visible text fields (nickname, signature, verification labels,
    unique id, certification text, or the extracted law-firm name)."""
    field_names = (
        "nickname",
        "signature",
        "custom_verify",
        "enterprise_verify_reason",
        "versatile_display",
        "unique_id",
    )
    fragments = [name]
    fragments.extend(str(user_info.get(field) or "") for field in field_names)
    fragments.append(_extract_account_cert_text(user_info))
    fragments.append(law_firm)
    haystack = "\n".join(fragment for fragment in fragments if fragment).strip()
    if not haystack:
        return False
    return any(keyword in haystack for keyword in LAWYER_KEYWORDS)
def _pick_first_str(node: Dict[str, Any], keys: Tuple[str, ...]) -> str:
for key in keys:
value = node.get(key)
@@ -399,14 +591,21 @@ def _extract_lawyer_rows_from_payload(
if not isinstance(user_info, dict):
continue
name = str(user_info.get("nickname") or "").strip()
law_firm = _extract_law_firm_from_user_info(user_info)
# 强约束:必须出现“律师/律所”等关键词,避免非法律相关账号入库
if not _is_lawyer_related_user(user_info, name, law_firm):
continue
phones = _extract_phones_from_user_info(user_info)
if not phones:
continue
name = str(user_info.get("nickname") or "").strip()
law_firm = _extract_law_firm_from_user_info(user_info)
sec_uid = str(user_info.get("sec_uid") or "").strip()
url = f"https://www.douyin.com/user/{sec_uid}" if sec_uid else api_url
if not sec_uid:
continue
url = f"https://www.douyin.com/user/{sec_uid}"
province = city_province
city = city_name or city_province
@@ -516,12 +715,67 @@ class AreaSyncHandler(BaseHTTPRequestHandler):
self._write_json(200, rows)
return
if parsed.path == "/api/layer/progress":
progress_key = _first_param(params, "progress_key", PROGRESS_DEFAULT_KEY).strip() or PROGRESS_DEFAULT_KEY
try:
row = _get_progress(progress_key)
except Exception as exc:
self._write_json(500, {"ok": False, "error": str(exc)})
return
self._write_json(
200,
{
"ok": True,
"progress_key": progress_key,
"data": row,
},
)
return
self._write_json(404, {"ok": False, "error": "not found"})
def do_POST(self) -> None:
parsed = urlparse(self.path)
params = parse_qs(parsed.query)
if parsed.path == "/api/layer/progress":
body = self._read_json_body()
if not isinstance(body, dict):
body = {}
progress_key = str(body.get("progress_key") or _first_param(params, "progress_key", PROGRESS_DEFAULT_KEY)).strip() or PROGRESS_DEFAULT_KEY
action = str(body.get("action") or _first_param(params, "action", "upsert")).strip().lower() or "upsert"
try:
if action == "clear":
deleted = _clear_progress(progress_key)
self._write_json(
200,
{
"ok": True,
"action": "clear",
"progress_key": progress_key,
"deleted": deleted,
},
)
return
saved = _upsert_progress(progress_key, body)
self._write_json(
200,
{
"ok": True,
"action": "upsert",
"progress_key": progress_key,
"data": saved,
},
)
return
except Exception as exc:
self._write_json(500, {"ok": False, "error": str(exc)})
return
if parsed.path == "/api/layer/index":
body = self._read_json_body()
if not isinstance(body, dict) or not body:
@@ -578,10 +832,16 @@ class AreaSyncHandler(BaseHTTPRequestHandler):
def run() -> None:
    """Start the layer HTTP service and block forever serving requests."""
    try:
        # Best-effort table creation at startup; a failure is logged but does
        # not prevent the server from starting, because the progress helpers
        # also call _ensure_progress_table() lazily on each request.
        _ensure_progress_table()
    except Exception as exc:
        print(f"[layer-service] init progress table failed: {exc}")
    # ThreadingHTTPServer handles each request on its own thread.
    server = ThreadingHTTPServer((SERVICE_HOST, SERVICE_PORT), AreaSyncHandler)
    print(f"[layer-service] running on http://{SERVICE_HOST}:{SERVICE_PORT}")
    print(f"[layer-service] get_area -> table/domain: {AREA_TABLE}/{AREA_DOMAIN}")
    print(f"[layer-service] index -> save domain: {DOUYIN_DOMAIN}")
    print(f"[layer-service] progress table/default key: {PROGRESS_TABLE}/{PROGRESS_DEFAULT_KEY}")
    server.serve_forever()