From 5666dc61a0743225f0405bb08a11fc0d3d1d64a1 Mon Sep 17 00:00:00 2001 From: hello-dd-code Date: Sat, 28 Feb 2026 16:37:37 +0800 Subject: [PATCH] =?UTF-8?q?=E8=90=BD=E5=9C=B0=E7=BB=93=E6=9E=84=E5=8C=96?= =?UTF-8?q?=E6=97=A5=E5=BF=97=E4=B8=8E=E5=9F=BA=E7=A1=80=E7=9B=91=E6=8E=A7?= =?UTF-8?q?=E5=91=8A=E8=AD=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/api/main.go | 5 + docs/ops/observability_baseline.md | 43 +++++++++ .../observability_validation_2026-02-28.md | 20 ++++ internal/observability/collector.go | 83 +++++++++++++++++ internal/observability/collector_test.go | 35 +++++++ internal/observability/http.go | 92 +++++++++++++++++++ scripts/ops/check_basic_metrics.sh | 67 ++++++++++++++ 7 files changed, 345 insertions(+) create mode 100644 docs/ops/observability_baseline.md create mode 100644 docs/ops/reports/observability_validation_2026-02-28.md create mode 100644 internal/observability/collector.go create mode 100644 internal/observability/collector_test.go create mode 100644 internal/observability/http.go create mode 100755 scripts/ops/check_basic_metrics.sh diff --git a/cmd/api/main.go b/cmd/api/main.go index 8ae5b4c..b8ff6ca 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -22,6 +22,7 @@ import ( membershipmodel "wx_service/internal/membership/model" membershipservice "wx_service/internal/membership/service" "wx_service/internal/model" + "wx_service/internal/observability" rmhandler "wx_service/internal/remove_watermark/handler" rmmodel "wx_service/internal/remove_watermark/model" rmservice "wx_service/internal/remove_watermark/service" @@ -69,6 +70,10 @@ func main() { gin.SetMode(config.AppConfig.Server.Mode) router := gin.Default() + metricsCollector := observability.NewCollector() + router.Use(observability.RequestLogMiddleware(metricsCollector)) + router.GET("/metrics/basic", observability.BasicMetricsHandler(metricsCollector)) + // 5) 依赖注入:先创建 service,再创建 handler(handler 只关心 HTTP 输入/输出) miniProgramService := authservice.NewMiniProgramService(database.DB) authService := authservice.NewAuthService(database.DB, miniProgramService) diff --git a/docs/ops/observability_baseline.md b/docs/ops/observability_baseline.md new file mode 100644 index 0000000..96241f8 --- /dev/null +++ b/docs/ops/observability_baseline.md @@ -0,0 +1,43 @@ +# 日志收集与基础监控落地 + +对应 issue:`#11 [P1][T8] 日志收集与基础监控落地` + +## 1. 结构化日志 + +- 中间件:`internal/observability/http.go::RequestLogMiddleware` +- 每个请求输出一条 JSON 日志,包含: + - `request_id` + - `method/path/status` + - `latency_ms` + - `client_ip` + - `uid`(若已鉴权) + - `errors`(若有) + +## 2. 核心指标 + +- 采集器:`internal/observability/collector.go` +- 暴露接口:`GET /metrics/basic` +- 指标字段: + - `total_requests` + - `client_errors` + - `server_errors` + - `client_error_rate_pct` + - `server_error_rate_pct` + - `avg_latency_ms` + - `max_latency_ms` + +## 3. 基础告警阈值 + +- 脚本:`scripts/ops/check_basic_metrics.sh` +- 默认阈值: + - `SERVER_ERROR_RATE_THRESHOLD=5`(%) + - `AVG_LATENCY_THRESHOLD_MS=800`(ms) +- 触发后行为: + - 返回非 0 + - 若配置 `OPS_ALERT_WEBHOOK` 则发送告警 + +推荐 cron(每 5 分钟): + +```bash +*/5 * * * * METRICS_URL=http://127.0.0.1:8080/metrics/basic SERVER_ERROR_RATE_THRESHOLD=5 AVG_LATENCY_THRESHOLD_MS=800 OPS_ALERT_WEBHOOK="https://example.com/webhook" /path/to/wx_service/scripts/ops/check_basic_metrics.sh >> /var/log/wx_service-metrics-check.log 2>&1 +``` diff --git a/docs/ops/reports/observability_validation_2026-02-28.md b/docs/ops/reports/observability_validation_2026-02-28.md new file mode 100644 index 0000000..5209a93 --- /dev/null +++ b/docs/ops/reports/observability_validation_2026-02-28.md @@ -0,0 +1,20 @@ +# 可观测性落地验证记录(2026-02-28) + +对应 issue:`#11 [P1][T8] 日志收集与基础监控落地` + +## 验证项 + +1. 单元测试 +- `go test ./internal/observability -v` + +2. 全量测试 +- `go test ./...` + +3. 告警脚本语法检查 +- `bash -n scripts/ops/check_basic_metrics.sh` + +## 结果 + +- 结构化日志中间件与指标采集逻辑可编译并通过测试。 +- `/metrics/basic` 指标接口已接入启动流程。 +- 基础告警阈值脚本语法检查通过。 diff --git a/internal/observability/collector.go b/internal/observability/collector.go new file mode 100644 index 0000000..6a7a800 --- /dev/null +++ b/internal/observability/collector.go @@ -0,0 +1,83 @@ +package observability + +import ( + "sync/atomic" + "time" +) + +type Snapshot struct { + GeneratedAtUTC string `json:"generated_at_utc"` + UptimeSeconds int64 `json:"uptime_seconds"` + TotalRequests uint64 `json:"total_requests"` + ClientErrors uint64 `json:"client_errors"` + ServerErrors uint64 `json:"server_errors"` + ClientErrorRatePct float64 `json:"client_error_rate_pct"` + ServerErrorRatePct float64 `json:"server_error_rate_pct"` + AvgLatencyMs float64 `json:"avg_latency_ms"` + MaxLatencyMs float64 `json:"max_latency_ms"` +} + +type Collector struct { + startedAt time.Time + totalRequests uint64 + clientErrors uint64 + serverErrors uint64 + totalLatencyNs uint64 + maxLatencyNs uint64 +} + +func NewCollector() *Collector { + return &Collector{startedAt: time.Now()} +} + +func (c *Collector) Observe(status int, latency time.Duration) { + atomic.AddUint64(&c.totalRequests, 1) + atomic.AddUint64(&c.totalLatencyNs, uint64(latency.Nanoseconds())) + + if status >= 500 { + atomic.AddUint64(&c.serverErrors, 1) + } else if status >= 400 { + atomic.AddUint64(&c.clientErrors, 1) + } + + latNs := uint64(latency.Nanoseconds()) + for { + old := atomic.LoadUint64(&c.maxLatencyNs) + if latNs <= old { + break + } + if atomic.CompareAndSwapUint64(&c.maxLatencyNs, old, latNs) { + break + } + } +} + +func (c *Collector) Snapshot() Snapshot { + now := time.Now() + total := atomic.LoadUint64(&c.totalRequests) + clientErr := atomic.LoadUint64(&c.clientErrors) + serverErr := atomic.LoadUint64(&c.serverErrors) + totalLatencyNs := atomic.LoadUint64(&c.totalLatencyNs) + maxLatencyNs := atomic.LoadUint64(&c.maxLatencyNs) + + var clientRate float64 + var serverRate float64 + var avgLatencyMs float64 + if total > 0 { + clientRate = float64(clientErr) * 100 / float64(total) + serverRate = float64(serverErr) * 100 / float64(total) + avgLatencyMs = float64(totalLatencyNs) / float64(total) / 1e6 + } + + return Snapshot{ + GeneratedAtUTC: now.UTC().Format(time.RFC3339), + UptimeSeconds: int64(now.Sub(c.startedAt).Seconds()), + TotalRequests: total, + ClientErrors: clientErr, + ServerErrors: serverErr, + ClientErrorRatePct: clientRate, + ServerErrorRatePct: serverRate, + AvgLatencyMs: avgLatencyMs, + MaxLatencyMs: float64(maxLatencyNs) / 1e6, + } +} diff --git a/internal/observability/collector_test.go b/internal/observability/collector_test.go new file mode 100644 index 0000000..a75ed10 --- /dev/null +++ b/internal/observability/collector_test.go @@ -0,0 +1,35 @@ +package observability + +import ( + "testing" + "time" +) + +func TestCollectorSnapshot(t *testing.T) { + t.Parallel() + + c := NewCollector() + c.Observe(200, 100*time.Millisecond) + c.Observe(404, 300*time.Millisecond) + c.Observe(500, 500*time.Millisecond) + + s := c.Snapshot() + if s.TotalRequests != 3 { + t.Fatalf("total_requests=%d, want=3", s.TotalRequests) + } + if s.ClientErrors != 1 { + t.Fatalf("client_errors=%d, want=1", s.ClientErrors) + } + if s.ServerErrors != 1 { + t.Fatalf("server_errors=%d, want=1", s.ServerErrors) + } + if s.ServerErrorRatePct <= 0 { + t.Fatalf("server_error_rate_pct=%f, want>0", s.ServerErrorRatePct) + } + if s.AvgLatencyMs <= 0 { + t.Fatalf("avg_latency_ms=%f, want>0", s.AvgLatencyMs) + } + if s.MaxLatencyMs < 500 { + t.Fatalf("max_latency_ms=%f, want>=500", s.MaxLatencyMs) + } +} diff --git a/internal/observability/http.go b/internal/observability/http.go new file mode 100644 index 0000000..8965f8f --- /dev/null +++ b/internal/observability/http.go @@ -0,0 +1,92 @@ +package observability + +import ( + "encoding/json" + "fmt" + "log" + "net/http" + "strings" + "sync/atomic" + "time" + + "github.com/gin-gonic/gin" + + "wx_service/internal/middleware" +) + +var requestSeq uint64 + +func RequestLogMiddleware(metrics *Collector) gin.HandlerFunc { + return func(c *gin.Context) { + startedAt := time.Now() + + requestID := strings.TrimSpace(c.GetHeader("X-Request-ID")) + if requestID == "" { + seq := atomic.AddUint64(&requestSeq, 1) + requestID = fmt.Sprintf("req-%d-%d", time.Now().UnixNano(), seq) + } + c.Writer.Header().Set("X-Request-ID", requestID) + + c.Next() + + status := c.Writer.Status() + latency := time.Since(startedAt) + if metrics != nil { + metrics.Observe(status, latency) + } + + path := c.FullPath() + if path == "" { + path = c.Request.URL.Path + } + + level := "info" + if status >= 500 { + level = "error" + } else if status >= 400 { + level = "warn" + } + + entry := map[string]any{ + "ts": time.Now().UTC().Format(time.RFC3339), + "level": level, + "request_id": requestID, + "method": c.Request.Method, + "path": path, + "status": status, + "latency_ms": float64(latency.Microseconds()) / 1000.0, + "client_ip": c.ClientIP(), + } + + if user, ok := middleware.CurrentUser(c); ok && user != nil { + entry["uid"] = user.ID + } + if len(c.Errors) > 0 { + entry["errors"] = c.Errors.String() + } + + payload, err := json.Marshal(entry) + if err != nil { + log.Printf("observability marshal log failed: %v", err) + return + } + log.Println(string(payload)) + } +} + +func BasicMetricsHandler(metrics *Collector) gin.HandlerFunc { + return func(c *gin.Context) { + if metrics == nil { + c.JSON(http.StatusServiceUnavailable, gin.H{ + "code": http.StatusServiceUnavailable, + "message": "metrics not initialized", + }) + return + } + c.JSON(http.StatusOK, gin.H{ + "code": http.StatusOK, + "message": "success", + "data": metrics.Snapshot(), + }) + } +} diff --git a/scripts/ops/check_basic_metrics.sh b/scripts/ops/check_basic_metrics.sh new file mode 100755 index 0000000..6a98a79 --- /dev/null +++ b/scripts/ops/check_basic_metrics.sh @@ -0,0 +1,67 @@ +#!/usr/bin/env bash +set -euo pipefail + +# 用法: +# METRICS_URL=http://127.0.0.1:8080/metrics/basic \ +# SERVER_ERROR_RATE_THRESHOLD=5 AVG_LATENCY_THRESHOLD_MS=800 \ +# OPS_ALERT_WEBHOOK=https://example.com/webhook ./scripts/ops/check_basic_metrics.sh + +METRICS_URL="${METRICS_URL:-http://127.0.0.1:8080/metrics/basic}" +SERVER_ERROR_RATE_THRESHOLD="${SERVER_ERROR_RATE_THRESHOLD:-5}" +AVG_LATENCY_THRESHOLD_MS="${AVG_LATENCY_THRESHOLD_MS:-800}" +OPS_ALERT_WEBHOOK="${OPS_ALERT_WEBHOOK:-}" +ALERT_TITLE="${ALERT_TITLE:-[wx_service] 基础监控告警}" + +send_alert() { + local message="$1" + if [[ -z "${OPS_ALERT_WEBHOOK}" ]]; then + echo "ALERT: ${message}" >&2 + return + fi + curl -fsS -X POST "${OPS_ALERT_WEBHOOK}" \ + -H "Content-Type: application/json" \ + -d "{\"title\":\"${ALERT_TITLE}\",\"message\":\"${message}\"}" >/dev/null || true +} + +response="$(curl -fsS "${METRICS_URL}" || true)" +if [[ -z "${response}" ]]; then + send_alert "无法访问 metrics 接口:${METRICS_URL}" + exit 1 +fi + +set +e +check_output="$(echo "${response}" | python3 - "${SERVER_ERROR_RATE_THRESHOLD}" "${AVG_LATENCY_THRESHOLD_MS}" <<'PY' +import json +import sys + +payload = json.load(sys.stdin) +data = payload.get("data", {}) +server_error = float(data.get("server_error_rate_pct", 0.0)) +avg_latency = float(data.get("avg_latency_ms", 0.0)) + +server_th = float(sys.argv[1]) +latency_th = float(sys.argv[2]) + +problems = [] +if server_error > server_th: + problems.append(f"server_error_rate_pct={server_error:.2f}% > {server_th:.2f}%") +if avg_latency > latency_th: + problems.append(f"avg_latency_ms={avg_latency:.2f} > {latency_th:.2f}") + +if problems: + print("; ".join(problems)) + sys.exit(1) + +print(f"ok server_error_rate_pct={server_error:.2f}% avg_latency_ms={avg_latency:.2f}") +PY +)" +check_exit=$? +set -e + +if [[ ${check_exit} -ne 0 ]]; then + send_alert "基础监控阈值触发:${check_output}" + exit 1 +fi + +echo "${check_output}" +