"""HTTP service exposing area data and ingesting Douyin lawyer leads.

Endpoints:
    GET  /health                 -- liveness probe
    GET  /api/layer/get_area     -- area rows from the configured area table
    GET  /api/layer/progress     -- read crawl progress by progress_key
    POST /api/layer/progress     -- upsert or clear crawl progress
    POST /api/layer/index        -- receive a raw Douyin search payload; always
                                    archived to JSONL, optionally parsed into
                                    the `lawyer` table (save_only=0)

Configuration is environment-driven (table names, domains, host/port).
"""

import json
import os
import re
import sys
import threading
import time
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
from urllib.parse import parse_qs, urlparse

# Make the project root importable so the local Db module resolves.
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
if project_root not in sys.path:
    sys.path.append(project_root)

from Db import Db

AREA_TABLE = os.getenv("AREA_TARGET_TABLE", "area_new")
AREA_DOMAIN = os.getenv("AREA_DOMAIN", "maxlaw")
DOUYIN_DOMAIN = os.getenv("DOUYIN_DOMAIN", "抖音")
DOUYIN_RAW_DIR = os.getenv("DOUYIN_RAW_DIR", os.path.join(project_root, "data", "douyin_raw"))
DOUYIN_SAVE_ONLY_ENV = os.getenv("DOUYIN_SAVE_ONLY", "1")
LAWYER_KEYWORDS_ENV = os.getenv("DOUYIN_LAWYER_KEYWORDS", "律师,律所")
PROGRESS_TABLE = os.getenv("LAYER_PROGRESS_TABLE", "layer_progress")
PROGRESS_DEFAULT_KEY = os.getenv("LAYER_PROGRESS_DEFAULT_KEY", "douyin_batch_default")
SERVICE_HOST = os.getenv("AREA_SERVICE_HOST", "0.0.0.0")
SERVICE_PORT = int(os.getenv("AREA_SERVICE_PORT", "9002"))

# Mainland-China mobile number, optionally prefixed with +86 / 86.
PHONE_REGEX = re.compile(r"(?:\+?86[-\s]?)?(1[3-9]\d{9})")
# "WeChat: xyz"-style markers; the captured value often hides a phone number.
WX_CONTEXT_REGEX = re.compile(r"(?i)(?:微信|微.?信|wx|vx|weixin|v信|v号|v)\s*[::/\-\s]\s*([a-zA-Z0-9._-]{3,40})")
# A law-firm name ending in 律师事务所 ("law office").
LAW_FIRM_REGEX = re.compile(r"([\u4e00-\u9fa5A-Za-z·]{2,40}律师事务所)")

# Serializes JSONL appends across the ThreadingHTTPServer worker threads.
RAW_WRITE_LOCK = threading.Lock()

LAWYER_KEYWORDS: Tuple[str, ...] = tuple(
    keyword.strip() for keyword in LAWYER_KEYWORDS_ENV.split(",") if keyword.strip()
)


def _is_safe_table_name(table_name: str) -> bool:
    """Return True only for identifier-safe names (guards f-string SQL)."""
    return bool(re.fullmatch(r"[A-Za-z0-9_]+", table_name or ""))


def _parse_int(value: Any, default: int = 0) -> int:
    """Best-effort int conversion; returns *default* on any failure."""
    try:
        return int(str(value).strip())
    except Exception:
        return default


def _parse_bool(value: Any, default: bool = False) -> bool:
    """Interpret common truthy/falsy strings; returns *default* otherwise."""
    if value is None:
        return default
    text = str(value).strip().lower()
    if text in {"1", "true", "yes", "y", "on"}:
        return True
    if text in {"0", "false", "no", "n", "off"}:
        return False
    return default


def _first_param(params: Dict[str, List[str]], key: str, default: str = "") -> str:
    """Return the first value of a parse_qs-style parameter, or *default*."""
    values = params.get(key) or []
    if not values:
        return default
    return values[0]


def _append_jsonl(file_path: str, payload: Dict[str, Any]) -> None:
    """Append *payload* as one JSON line, creating parent dirs as needed.

    The global lock keeps concurrent handler threads from interleaving lines.
    """
    os.makedirs(os.path.dirname(file_path) or ".", exist_ok=True)
    line = json.dumps(payload, ensure_ascii=False)
    with RAW_WRITE_LOCK:
        with open(file_path, "a", encoding="utf-8") as out:
            out.write(line)
            out.write("\n")


def _save_raw_index_payload(payload: Dict[str, Any], query: Dict[str, List[str]], client_ip: str) -> str:
    """Archive a raw index payload to a per-day JSONL file; returns its path."""
    now_ts = int(time.time())
    day = time.strftime("%Y%m%d", time.localtime(now_ts))
    file_path = os.path.join(DOUYIN_RAW_DIR, f"douyin_index_{day}.jsonl")
    wrapped = {
        "received_at": now_ts,
        "client_ip": client_ip,
        "query": query,
        "payload": payload,
    }
    _append_jsonl(file_path, wrapped)
    return file_path


def _ensure_progress_table() -> None:
    """Create the progress table if it does not exist yet.

    Raises:
        ValueError: when PROGRESS_TABLE is not identifier-safe.
    """
    if not _is_safe_table_name(PROGRESS_TABLE):
        raise ValueError("非法进度表名")
    with Db() as db:
        cursor = db.db.cursor()
        sql = f"""
        CREATE TABLE IF NOT EXISTS `{PROGRESS_TABLE}` (
            `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
            `progress_key` varchar(128) NOT NULL,
            `next_city_index` int(11) DEFAULT 0,
            `area_signature` varchar(128) DEFAULT NULL,
            `area_total` int(11) DEFAULT 0,
            `current_city` varchar(128) DEFAULT NULL,
            `reason` varchar(64) DEFAULT NULL,
            `status` varchar(32) DEFAULT NULL,
            `device_id` varchar(128) DEFAULT NULL,
            `extra_json` longtext,
            `updated_at` bigint(20) DEFAULT NULL,
            `create_time` bigint(20) DEFAULT NULL,
            PRIMARY KEY (`id`),
            UNIQUE KEY `uk_progress_key` (`progress_key`),
            KEY `idx_updated_at` (`updated_at`)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        """
        cursor.execute(sql)
        db.db.commit()
        cursor.close()


def _get_progress(progress_key: str) -> Optional[Dict[str, Any]]:
    """Fetch one progress row as a dict, or None when missing/blank key."""
    key = str(progress_key or "").strip()
    if not key:
        return None
    _ensure_progress_table()
    with Db() as db:
        cursor = db.db.cursor()
        sql = (
            f"SELECT progress_key, next_city_index, area_signature, area_total, current_city, "
            f"reason, status, device_id, extra_json, updated_at, create_time "
            f"FROM `{PROGRESS_TABLE}` WHERE progress_key=%s LIMIT 1"
        )
        cursor.execute(sql, (key,))
        row = cursor.fetchone()
        cursor.close()
    if not row:
        return None
    # extra_json is stored as text; decode if possible, otherwise pass through.
    extra_json = row[8] or ""
    extra_obj: Any = {}
    if extra_json:
        try:
            extra_obj = json.loads(extra_json)
        except Exception:
            extra_obj = extra_json
    return {
        "progress_key": row[0] or "",
        "next_city_index": _parse_int(row[1], 0),
        "area_signature": row[2] or "",
        "area_total": _parse_int(row[3], 0),
        "current_city": row[4] or "",
        "reason": row[5] or "",
        "status": row[6] or "",
        "device_id": row[7] or "",
        "extra": extra_obj,
        "updated_at": _parse_int(row[9], 0),
        "create_time": _parse_int(row[10], 0),
    }


def _upsert_progress(progress_key: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Insert or update a progress row; returns the stored row.

    Raises:
        ValueError: when *progress_key* is empty.
    """
    key = str(progress_key or "").strip()
    if not key:
        raise ValueError("progress_key 不能为空")
    _ensure_progress_table()
    now_ts = int(time.time())
    next_city_index = _parse_int(payload.get("next_city_index"), 0)
    area_signature = str(payload.get("area_signature") or "").strip()
    area_total = _parse_int(payload.get("area_total"), 0)
    current_city = str(payload.get("current_city") or "").strip()
    reason = str(payload.get("reason") or "").strip()
    status = str(payload.get("status") or "").strip()
    device_id = str(payload.get("device_id") or "").strip()
    # Accept either a structured "extra" object or a raw "extra_json" string.
    extra = payload.get("extra")
    if extra is None:
        extra = payload.get("extra_json")
    if isinstance(extra, str):
        extra_json = extra
    else:
        try:
            extra_json = json.dumps(extra or {}, ensure_ascii=False)
        except Exception:
            extra_json = "{}"
    with Db() as db:
        cursor = db.db.cursor()
        sql = (
            f"INSERT INTO `{PROGRESS_TABLE}` "
            "(progress_key, next_city_index, area_signature, area_total, current_city, reason, status, "
            "device_id, extra_json, updated_at, create_time) "
            "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) "
            "ON DUPLICATE KEY UPDATE "
            "next_city_index=VALUES(next_city_index), "
            "area_signature=VALUES(area_signature), "
            "area_total=VALUES(area_total), "
            "current_city=VALUES(current_city), "
            "reason=VALUES(reason), "
            "status=VALUES(status), "
            "device_id=VALUES(device_id), "
            "extra_json=VALUES(extra_json), "
            "updated_at=VALUES(updated_at)"
        )
        cursor.execute(
            sql,
            (
                key,
                next_city_index,
                area_signature,
                area_total,
                current_city,
                reason,
                status,
                device_id,
                extra_json,
                now_ts,
                now_ts,
            ),
        )
        db.db.commit()
        cursor.close()
    return _get_progress(key) or {}


def _clear_progress(progress_key: str) -> int:
    """Delete a progress row; returns the number of rows removed."""
    key = str(progress_key or "").strip()
    if not key:
        return 0
    _ensure_progress_table()
    with Db() as db:
        cursor = db.db.cursor()
        sql = f"DELETE FROM `{PROGRESS_TABLE}` WHERE progress_key=%s"
        cursor.execute(sql, (key,))
        affected = cursor.rowcount
        db.db.commit()
        cursor.close()
    return affected


def _query_area_data(table_name: str, domain: str) -> List[Dict[str, Any]]:
    """Load all area rows for *domain* from *table_name*, ordered by id.

    Raises:
        ValueError: when *table_name* is not identifier-safe.
    """
    if not _is_safe_table_name(table_name):
        raise ValueError("非法表名")
    with Db() as db:
        cursor = db.db.cursor()
        sql = (
            f"SELECT province, city, name, pid, pinyin, code, domain, level, create_time "
            f"FROM `{table_name}` WHERE domain=%s ORDER BY id ASC"
        )
        cursor.execute(sql, (domain,))
        rows = cursor.fetchall()
        cursor.close()
    result: List[Dict[str, Any]] = []
    for row in rows:
        result.append(
            {
                "province": row[0] or "",
                "city": row[1] or "",
                "name": row[2] or "",
                "pid": row[3] if row[3] is not None else 0,
                "pinyin": row[4] or "",
                "code": row[5] or "",
                "domain": row[6] or "",
                "level": row[7] if row[7] is not None else 0,
                "create_time": row[8] if row[8] is not None else 0,
            }
        )
    return result


def _iter_dict_nodes(value: Any) -> Iterable[Dict[str, Any]]:
    """Yield every dict reachable inside a nested dict/list structure."""
    stack: List[Any] = [value]
    while stack:
        current = stack.pop()
        if isinstance(current, dict):
            yield current
            stack.extend(current.values())
        elif isinstance(current, list):
            stack.extend(current)


def _extract_phones_from_text(text: str) -> List[str]:
    """Return unique phone numbers found in *text*, in first-seen order."""
    phones: List[str] = []
    seen: Set[str] = set()
    for match in PHONE_REGEX.finditer(text or ""):
        phone = match.group(1)
        if not phone or phone in seen:
            continue
        seen.add(phone)
        phones.append(phone)
    return phones


def _extract_phones_from_user_info(user_info: Dict[str, Any]) -> List[str]:
    """Collect phone numbers from a Douyin user_info record (sorted)."""
    signature = str(user_info.get("signature") or "")
    unique_id = str(user_info.get("unique_id") or "")
    versatile = str(user_info.get("versatile_display") or "")
    # 1) Prefer phone numbers matched directly in the bio/signature.
    phones = set(_extract_phones_from_text(signature))
    if phones:
        return sorted(phones)
    # 2) Extract from WeChat-style markers, then fall back to the Douyin
    #    handle fields.
    for text in (signature, unique_id, versatile):
        for match in WX_CONTEXT_REGEX.finditer(text):
            wx_value = match.group(1) or ""
            for phone in _extract_phones_from_text(wx_value):
                phones.add(phone)
    for text in (unique_id, versatile):
        for phone in _extract_phones_from_text(text):
            phones.add(phone)
    return sorted(phones)


def _extract_law_firm_from_user_info(user_info: Dict[str, Any]) -> str:
    """Find a law-firm name in signature, verify reason, or cert label."""
    candidates: List[str] = []
    signature = str(user_info.get("signature") or "")
    if signature:
        candidates.append(signature)
    verify_reason = str(user_info.get("enterprise_verify_reason") or "")
    if verify_reason:
        candidates.append(verify_reason)
    cert_text = ""
    account_cert_info = user_info.get("account_cert_info")
    if isinstance(account_cert_info, str) and account_cert_info.strip():
        try:
            cert_obj = json.loads(account_cert_info)
            if isinstance(cert_obj, dict):
                cert_text = str(cert_obj.get("label_text") or "").strip()
        except Exception:
            cert_text = account_cert_info.strip()
    if cert_text:
        candidates.append(cert_text)
    for text in candidates:
        match = LAW_FIRM_REGEX.search(text)
        if match:
            return match.group(1)
    return ""


def _extract_account_cert_text(user_info: Dict[str, Any]) -> str:
    """Return the certification label text from account_cert_info, if any."""
    account_cert_info = user_info.get("account_cert_info")
    if isinstance(account_cert_info, str) and account_cert_info.strip():
        try:
            cert_obj = json.loads(account_cert_info)
            if isinstance(cert_obj, dict):
                return str(cert_obj.get("label_text") or "").strip()
        except Exception:
            return account_cert_info.strip()
    return ""


def _is_lawyer_related_user(user_info: Dict[str, Any], name: str, law_firm: str) -> bool:
    """True when any lawyer keyword appears in the user's visible texts."""
    texts = [
        name,
        str(user_info.get("nickname") or ""),
        str(user_info.get("signature") or ""),
        str(user_info.get("custom_verify") or ""),
        str(user_info.get("enterprise_verify_reason") or ""),
        str(user_info.get("versatile_display") or ""),
        str(user_info.get("unique_id") or ""),
        _extract_account_cert_text(user_info),
        law_firm,
    ]
    merged = "\n".join(text for text in texts if text).strip()
    if not merged:
        return False
    return any(keyword in merged for keyword in LAWYER_KEYWORDS)


def _pick_first_str(node: Dict[str, Any], keys: Tuple[str, ...]) -> str:
    """Return the first non-empty string value among *keys* in *node*."""
    for key in keys:
        value = node.get(key)
        if isinstance(value, str):
            text = value.strip()
            if text:
                return text
    return ""


def _extract_name(node: Dict[str, Any]) -> str:
    """Best-effort display name from a payload node or its nested author."""
    direct = _pick_first_str(node, ("name", "nickname", "nick_name", "author_name", "title", "account_name"))
    if direct:
        return direct
    for nested_key in ("author", "user", "user_info", "profile", "account"):
        nested = node.get(nested_key)
        if isinstance(nested, dict):
            nested_name = _pick_first_str(nested, ("name", "nickname", "nick_name", "author_name", "title"))
            if nested_name:
                return nested_name
    return ""


def _extract_law_firm(node: Dict[str, Any]) -> str:
    """Best-effort law-firm/organization name from a payload node."""
    direct = _pick_first_str(
        node,
        (
            "law_firm",
            "firm",
            "lawFirm",
            "office",
            "org_name",
            "organization",
            "company",
            "enterprise",
        ),
    )
    if direct:
        return direct
    enterprise = node.get("enterprise")
    if isinstance(enterprise, dict):
        company_name = _pick_first_str(enterprise, ("name", "company_name", "enterprise_name"))
        if company_name:
            return company_name
    return ""


def _extract_detail_url(node: Dict[str, Any], fallback_api_url: str) -> str:
    """Pick a detail URL, deriving video/user URLs from ids when needed."""
    url = _pick_first_str(node, ("share_url", "url", "web_url", "detail_url", "jump_url"))
    if url:
        return url
    aweme_id = node.get("aweme_id") or node.get("item_id")
    if aweme_id:
        aid = str(aweme_id).strip()
        if aid:
            return f"https://www.douyin.com/video/{aid}"
    sec_uid = node.get("sec_uid")
    if sec_uid:
        sec_uid_text = str(sec_uid).strip()
        if sec_uid_text:
            return f"https://www.douyin.com/user/{sec_uid_text}"
    return fallback_api_url


def _city_from_index(city_index: int, table_name: str, domain: str) -> Tuple[str, str]:
    """Map an area index to (province, city); ("", "") on any failure."""
    if city_index < 0:
        return "", ""
    try:
        areas = _query_area_data(table_name, domain)
    except Exception:
        # Best-effort lookup: ingestion continues without location data.
        return "", ""
    if city_index >= len(areas):
        return "", ""
    area = areas[city_index]
    province = str(area.get("province") or "").strip()
    city = str(area.get("city") or province).strip()
    return province, city


def _existing_phones(domain: str, phones: List[str]) -> Set[str]:
    """Return the subset of *phones* already present in lawyer for *domain*."""
    if not phones:
        return set()
    deduped = sorted({p for p in phones if p})
    if not deduped:
        return set()
    existing: Set[str] = set()
    with Db() as db:
        cursor = db.db.cursor()
        # Chunked IN-queries keep the placeholder count bounded.
        chunk_size = 500
        for i in range(0, len(deduped), chunk_size):
            chunk = deduped[i:i + chunk_size]
            placeholders = ",".join(["%s"] * len(chunk))
            sql = f"SELECT phone FROM lawyer WHERE domain=%s AND phone IN ({placeholders})"
            cursor.execute(sql, [domain, *chunk])
            for row in cursor.fetchall():
                existing.add(str(row[0]))
        cursor.close()
    return existing


def _insert_lawyer_rows(rows: List[Dict[str, Any]], domain: str) -> Tuple[int, int]:
    """Insert candidate rows into lawyer; returns (inserted, skipped).

    Rows are deduplicated by phone (keeping the richest record) and rows
    whose phone already exists for *domain* are skipped.
    """
    if not rows:
        return 0, 0

    def row_score(item: Dict[str, Any]) -> int:
        # Prefer records with more identifying fields; penalize phones that
        # appeared alongside many others in the same node (likely noise).
        score = 0
        if str(item.get("name") or "").strip():
            score += 5
        if str(item.get("law_firm") or "").strip():
            score += 3
        if str(item.get("url") or "").strip():
            score += 1
        if str(item.get("province") or "").strip() or str(item.get("city") or "").strip():
            score += 1
        phone_count_in_node = _parse_int(item.get("phone_count_in_node"), 1)
        if phone_count_in_node > 1:
            score -= (phone_count_in_node - 1)
        return score

    deduped_by_phone: Dict[str, Dict[str, Any]] = {}
    skipped = 0
    for row in rows:
        phone = str(row.get("phone") or "").strip()
        if not phone:
            skipped += 1
            continue
        old_row = deduped_by_phone.get(phone)
        if old_row is not None:
            if row_score(row) > row_score(old_row):
                deduped_by_phone[phone] = row
            skipped += 1
            continue
        deduped_by_phone[phone] = row
    existing = _existing_phones(domain, list(deduped_by_phone.keys()))
    inserted = 0
    with Db() as db:
        cursor = db.db.cursor()
        sql = (
            "INSERT INTO lawyer "
            "(name, phone, law_firm, province, city, url, domain, create_time, site_time, params) "
            "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
        )
        for phone, row in deduped_by_phone.items():
            if phone in existing:
                skipped += 1
                continue
            cursor.execute(
                sql,
                (
                    row.get("name") or "",
                    phone,
                    row.get("law_firm") or "",
                    row.get("province") or "",
                    row.get("city") or "",
                    row.get("url") or "",
                    domain,
                    _parse_int(row.get("create_time"), int(time.time())),
                    _parse_int(row.get("site_time"), int(time.time())),
                    row.get("params") or "{}",
                ),
            )
            inserted += 1
            existing.add(phone)
        db.db.commit()
        cursor.close()
    return inserted, skipped


def _extract_lawyer_rows_from_payload(
    payload: Dict[str, Any],
    area_table: str,
    area_domain: str,
    save_domain: str,
) -> List[Dict[str, Any]]:
    """Turn a raw Douyin search payload into insertable lawyer rows.

    Only users matching the lawyer keywords, with at least one phone number
    and a sec_uid, produce rows (one row per extracted phone).
    """
    now_ts = int(time.time())
    api_url = str(payload.get("url") or "").strip()
    city_index = _parse_int(payload.get("cityIndex"), -1)
    city_province, city_name = _city_from_index(city_index, area_table, area_domain)
    rows: List[Dict[str, Any]] = []
    data = payload.get("data") if isinstance(payload, dict) else None
    user_list = data.get("user_list") if isinstance(data, dict) else None
    if not isinstance(user_list, list):
        return rows
    for user_item in user_list:
        if not isinstance(user_item, dict):
            continue
        user_info = user_item.get("user_info")
        if not isinstance(user_info, dict):
            continue
        name = str(user_info.get("nickname") or "").strip()
        law_firm = _extract_law_firm_from_user_info(user_info)
        # Hard constraint: the lawyer keywords (律师/律所 etc.) must appear,
        # so that non-legal accounts are never stored.
        if not _is_lawyer_related_user(user_info, name, law_firm):
            continue
        phones = _extract_phones_from_user_info(user_info)
        if not phones:
            continue
        sec_uid = str(user_info.get("sec_uid") or "").strip()
        if not sec_uid:
            continue
        url = f"https://www.douyin.com/user/{sec_uid}"
        province = city_province
        city = city_name or city_province
        source_record = {
            "source": "douyin",
            "api_source": payload.get("source") or "",
            "api_url": api_url,
            "city_index": city_index,
            "captured_at": now_ts,
            "sec_uid": sec_uid,
            "user_info": {
                "uid": user_info.get("uid"),
                "nickname": user_info.get("nickname"),
                "signature": user_info.get("signature"),
                "unique_id": user_info.get("unique_id"),
                "versatile_display": user_info.get("versatile_display"),
            },
        }
        for phone in phones:
            rows.append(
                {
                    "name": name,
                    "phone": phone,
                    "law_firm": law_firm,
                    "province": province,
                    "city": city,
                    "url": url,
                    "domain": save_domain,
                    "create_time": now_ts,
                    "site_time": _parse_int(payload.get("ts"), now_ts),
                    "phone_count_in_node": len(phones),
                    "params": json.dumps(source_record, ensure_ascii=False),
                }
            )
    return rows


class AreaSyncHandler(BaseHTTPRequestHandler):
    """Request handler for the area/progress/index endpoints."""

    server_version = "AreaSyncService/2.0"

    def _write_json(self, status: int, payload: Any) -> None:
        """Send *payload* as a JSON response with permissive CORS headers."""
        body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type")
        self.end_headers()
        self.wfile.write(body)

    def _read_json_body(self) -> Any:
        """Read and decode the request body as JSON; {} on any failure."""
        length = _parse_int(self.headers.get("Content-Length"), 0)
        if length <= 0:
            return {}
        raw = self.rfile.read(length)
        if not raw:
            return {}
        try:
            return json.loads(raw.decode("utf-8"))
        except Exception:
            return {}

    def do_OPTIONS(self) -> None:
        """Answer CORS preflight requests."""
        self.send_response(204)
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type")
        self.end_headers()

    def do_GET(self) -> None:
        """Route GET requests: /health, get_area, progress."""
        parsed = urlparse(self.path)
        params = parse_qs(parsed.query)
        if parsed.path == "/health":
            self._write_json(200, {"ok": True, "service": "layer-service"})
            return
        if parsed.path == "/api/layer/get_area":
            table_name = _first_param(params, "table", AREA_TABLE).strip() or AREA_TABLE
            domain = _first_param(params, "domain", AREA_DOMAIN).strip() or AREA_DOMAIN
            with_meta = _parse_bool(_first_param(params, "meta", "0"), False)
            try:
                rows = _query_area_data(table_name, domain)
            except Exception as exc:
                self._write_json(500, {"ok": False, "error": str(exc)})
                return
            if with_meta:
                self._write_json(
                    200,
                    {
                        "ok": True,
                        "count": len(rows),
                        "table": table_name,
                        "domain": domain,
                        "data": rows,
                    },
                )
            else:
                # Legacy shape: bare list when meta is not requested.
                self._write_json(200, rows)
            return
        if parsed.path == "/api/layer/progress":
            progress_key = _first_param(params, "progress_key", PROGRESS_DEFAULT_KEY).strip() or PROGRESS_DEFAULT_KEY
            try:
                row = _get_progress(progress_key)
            except Exception as exc:
                self._write_json(500, {"ok": False, "error": str(exc)})
                return
            self._write_json(
                200,
                {
                    "ok": True,
                    "progress_key": progress_key,
                    "data": row,
                },
            )
            return
        self._write_json(404, {"ok": False, "error": "not found"})

    def do_POST(self) -> None:
        """Route POST requests: progress upsert/clear, raw index ingest."""
        parsed = urlparse(self.path)
        params = parse_qs(parsed.query)
        if parsed.path == "/api/layer/progress":
            body = self._read_json_body()
            if not isinstance(body, dict):
                body = {}
            # Body values win over query-string values.
            progress_key = str(body.get("progress_key") or _first_param(params, "progress_key", PROGRESS_DEFAULT_KEY)).strip() or PROGRESS_DEFAULT_KEY
            action = str(body.get("action") or _first_param(params, "action", "upsert")).strip().lower() or "upsert"
            try:
                if action == "clear":
                    deleted = _clear_progress(progress_key)
                    self._write_json(
                        200,
                        {
                            "ok": True,
                            "action": "clear",
                            "progress_key": progress_key,
                            "deleted": deleted,
                        },
                    )
                    return
                saved = _upsert_progress(progress_key, body)
                self._write_json(
                    200,
                    {
                        "ok": True,
                        "action": "upsert",
                        "progress_key": progress_key,
                        "data": saved,
                    },
                )
                return
            except Exception as exc:
                self._write_json(500, {"ok": False, "error": str(exc)})
                return
        if parsed.path == "/api/layer/index":
            body = self._read_json_body()
            if not isinstance(body, dict) or not body:
                self._write_json(400, {"ok": False, "error": "invalid json body"})
                return
            area_table = _first_param(params, "table", AREA_TABLE).strip() or AREA_TABLE
            area_domain = _first_param(params, "area_domain", AREA_DOMAIN).strip() or AREA_DOMAIN
            save_domain = _first_param(params, "save_domain", DOUYIN_DOMAIN).strip() or DOUYIN_DOMAIN
            save_only_default = _parse_bool(DOUYIN_SAVE_ONLY_ENV, True)
            save_only = _parse_bool(_first_param(params, "save_only", DOUYIN_SAVE_ONLY_ENV), save_only_default)
            # Always archive the raw payload first, even in save-only mode.
            try:
                saved_file = _save_raw_index_payload(body, params, self.client_address[0] if self.client_address else "")
            except Exception as exc:
                self._write_json(500, {"ok": False, "error": f"save raw payload failed: {exc}"})
                return
            if save_only:
                self._write_json(
                    200,
                    {
                        "ok": True,
                        "message": "saved_only",
                        "save_only": True,
                        "saved_file": saved_file,
                    },
                )
                return
            try:
                extracted = _extract_lawyer_rows_from_payload(body, area_table, area_domain, save_domain)
                inserted, skipped = _insert_lawyer_rows(extracted, save_domain)
            except Exception as exc:
                self._write_json(500, {"ok": False, "error": str(exc)})
                return
            self._write_json(
                200,
                {
                    "ok": True,
                    "message": "received",
                    "save_domain": save_domain,
                    "save_only": False,
                    "saved_file": saved_file,
                    "extracted": len(extracted),
                    "inserted": inserted,
                    "skipped": skipped,
                },
            )
            return
        self._write_json(404, {"ok": False, "error": "not found"})


def run() -> None:
    """Start the threaded HTTP server (blocks forever)."""
    try:
        _ensure_progress_table()
    except Exception as exc:
        # Non-fatal: progress endpoints retry table creation on demand.
        print(f"[layer-service] init progress table failed: {exc}")
    server = ThreadingHTTPServer((SERVICE_HOST, SERVICE_PORT), AreaSyncHandler)
    print(f"[layer-service] running on http://{SERVICE_HOST}:{SERVICE_PORT}")
    print(f"[layer-service] get_area -> table/domain: {AREA_TABLE}/{AREA_DOMAIN}")
    print(f"[layer-service] index -> save domain: {DOUYIN_DOMAIN}")
    print(f"[layer-service] progress table/default key: {PROGRESS_TABLE}/{PROGRESS_DEFAULT_KEY}")
    server.serve_forever()


if __name__ == "__main__":
    run()