落地结构化日志与基础监控告警
This commit is contained in:
@@ -22,6 +22,7 @@ import (
|
||||
membershipmodel "wx_service/internal/membership/model"
|
||||
membershipservice "wx_service/internal/membership/service"
|
||||
"wx_service/internal/model"
|
||||
"wx_service/internal/observability"
|
||||
rmhandler "wx_service/internal/remove_watermark/handler"
|
||||
rmmodel "wx_service/internal/remove_watermark/model"
|
||||
rmservice "wx_service/internal/remove_watermark/service"
|
||||
@@ -69,6 +70,10 @@ func main() {
|
||||
gin.SetMode(config.AppConfig.Server.Mode)
|
||||
router := gin.Default()
|
||||
|
||||
metricsCollector := observability.NewCollector()
|
||||
router.Use(observability.RequestLogMiddleware(metricsCollector))
|
||||
router.GET("/metrics/basic", observability.BasicMetricsHandler(metricsCollector))
|
||||
|
||||
// 5) 依赖注入:先创建 service,再创建 handler(handler 只关心 HTTP 输入/输出)
|
||||
miniProgramService := authservice.NewMiniProgramService(database.DB)
|
||||
authService := authservice.NewAuthService(database.DB, miniProgramService)
|
||||
|
||||
@@ -0,0 +1,43 @@
|
||||
# 日志收集与基础监控落地
|
||||
|
||||
对应 issue:`#11 [P1][T8] 日志收集与基础监控落地`
|
||||
|
||||
## 1. 结构化日志
|
||||
|
||||
- 中间件:`internal/observability/http.go::RequestLogMiddleware`
|
||||
- 每个请求输出一条 JSON 日志,包含:
|
||||
- `request_id`
|
||||
- `method/path/status`
|
||||
- `latency_ms`
|
||||
- `client_ip`
|
||||
- `uid`(若已鉴权)
|
||||
- `errors`(若有)
|
||||
|
||||
## 2. 核心指标
|
||||
|
||||
- 采集器:`internal/observability/collector.go`
|
||||
- 暴露接口:`GET /metrics/basic`
|
||||
- 指标字段:
|
||||
- `total_requests`
|
||||
- `client_errors`
|
||||
- `server_errors`
|
||||
- `client_error_rate_pct`
|
||||
- `server_error_rate_pct`
|
||||
- `avg_latency_ms`
|
||||
- `max_latency_ms`
|
||||
|
||||
## 3. 基础告警阈值
|
||||
|
||||
- 脚本:`scripts/ops/check_basic_metrics.sh`
|
||||
- 默认阈值:
|
||||
- `SERVER_ERROR_RATE_THRESHOLD=5`(%)
|
||||
- `AVG_LATENCY_THRESHOLD_MS=800`(ms)
|
||||
- 触发后行为:
|
||||
- 返回非 0
|
||||
- 若配置 `OPS_ALERT_WEBHOOK` 则发送告警
|
||||
|
||||
推荐 cron(每 5 分钟):
|
||||
|
||||
```bash
|
||||
*/5 * * * * METRICS_URL=http://127.0.0.1:8080/metrics/basic SERVER_ERROR_RATE_THRESHOLD=5 AVG_LATENCY_THRESHOLD_MS=800 OPS_ALERT_WEBHOOK="https://example.com/webhook" /path/to/wx_service/scripts/ops/check_basic_metrics.sh >> /var/log/wx_service-metrics-check.log 2>&1
|
||||
```
|
||||
@@ -0,0 +1,20 @@
|
||||
# 可观测性落地验证记录(2026-02-28)
|
||||
|
||||
对应 issue:`#11 [P1][T8] 日志收集与基础监控落地`
|
||||
|
||||
## 验证项
|
||||
|
||||
1. 单元测试
|
||||
- `go test ./internal/observability -v`
|
||||
|
||||
2. 全量测试
|
||||
- `go test ./...`
|
||||
|
||||
3. 告警脚本语法检查
|
||||
- `bash -n scripts/ops/check_basic_metrics.sh`
|
||||
|
||||
## 结果
|
||||
|
||||
- 结构化日志中间件与指标采集逻辑可编译并通过测试。
|
||||
- `/metrics/basic` 指标接口已接入启动流程。
|
||||
- 基础告警阈值脚本语法检查通过。
|
||||
@@ -0,0 +1,83 @@
|
||||
package observability
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Snapshot is the JSON-serializable view of a Collector at one point in time.
// Field order is the order in which the keys appear in the encoded JSON.
type Snapshot struct {
	// GeneratedAtUTC is the RFC 3339 timestamp, in UTC, at which the snapshot was taken.
	GeneratedAtUTC string `json:"generated_at_utc"`
	// UptimeSeconds is the whole number of seconds since the Collector was created.
	UptimeSeconds int64 `json:"uptime_seconds"`

	// Request counters accumulated since the Collector was created.
	TotalRequests uint64 `json:"total_requests"`
	ClientErrors  uint64 `json:"client_errors"`
	ServerErrors  uint64 `json:"server_errors"`

	// Error counters expressed as a percentage of TotalRequests.
	ClientErrorRatePct float64 `json:"client_error_rate_pct"`
	ServerErrorRatePct float64 `json:"server_error_rate_pct"`

	// Latency aggregates in milliseconds.
	AvgLatencyMs float64 `json:"avg_latency_ms"`
	MaxLatencyMs float64 `json:"max_latency_ms"`
}
|
||||
|
||||
// Collector accumulates request counters and latency aggregates.
//
// All counters are updated and read through the sync/atomic functions, so a
// single Collector can be shared between concurrently running handlers.
type Collector struct {
	// The 64-bit counters are declared first on purpose: sync/atomic only
	// guarantees 64-bit alignment for the first word of an allocated struct,
	// and on 32-bit platforms a misaligned 64-bit atomic operation panics.
	// Keeping five consecutive uint64 fields at the start keeps every one of
	// them 8-byte aligned regardless of the size of the fields that follow.
	totalRequests  uint64
	clientErrors   uint64
	serverErrors   uint64
	totalLatencyNs uint64
	maxLatencyNs   uint64

	// startedAt is the creation time used to compute the reported uptime.
	startedAt time.Time
}
|
||||
|
||||
func NewCollector() *Collector {
|
||||
return &Collector{startedAt: time.Now()}
|
||||
}
|
||||
|
||||
func (c *Collector) Observe(status int, latency time.Duration) {
|
||||
atomic.AddUint64(&c.totalRequests, 1)
|
||||
atomic.AddUint64(&c.totalLatencyNs, uint64(latency.Nanoseconds()))
|
||||
|
||||
if status >= 500 {
|
||||
atomic.AddUint64(&c.serverErrors, 1)
|
||||
} else if status >= 400 {
|
||||
atomic.AddUint64(&c.clientErrors, 1)
|
||||
}
|
||||
|
||||
latNs := uint64(latency.Nanoseconds())
|
||||
for {
|
||||
old := atomic.LoadUint64(&c.maxLatencyNs)
|
||||
if latNs <= old {
|
||||
break
|
||||
}
|
||||
if atomic.CompareAndSwapUint64(&c.maxLatencyNs, old, latNs) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Collector) Snapshot() Snapshot {
|
||||
now := time.Now()
|
||||
total := atomic.LoadUint64(&c.totalRequests)
|
||||
clientErr := atomic.LoadUint64(&c.clientErrors)
|
||||
serverErr := atomic.LoadUint64(&c.serverErrors)
|
||||
totalLatencyNs := atomic.LoadUint64(&c.totalLatencyNs)
|
||||
maxLatencyNs := atomic.LoadUint64(&c.maxLatencyNs)
|
||||
|
||||
var clientRate float64
|
||||
var serverRate float64
|
||||
var avgLatencyMs float64
|
||||
if total > 0 {
|
||||
clientRate = float64(clientErr) * 100 / float64(total)
|
||||
serverRate = float64(serverErr) * 100 / float64(total)
|
||||
avgLatencyMs = float64(totalLatencyNs) / float64(total) / 1e6
|
||||
}
|
||||
|
||||
return Snapshot{
|
||||
GeneratedAtUTC: now.UTC().Format(time.RFC3339),
|
||||
UptimeSeconds: int64(now.Sub(c.startedAt).Seconds()),
|
||||
TotalRequests: total,
|
||||
ClientErrors: clientErr,
|
||||
ServerErrors: serverErr,
|
||||
ClientErrorRatePct: clientRate,
|
||||
ServerErrorRatePct: serverRate,
|
||||
AvgLatencyMs: avgLatencyMs,
|
||||
MaxLatencyMs: float64(maxLatencyNs) / 1e6,
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
package observability
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestCollectorSnapshot(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
c := NewCollector()
|
||||
c.Observe(200, 100*time.Millisecond)
|
||||
c.Observe(404, 300*time.Millisecond)
|
||||
c.Observe(500, 500*time.Millisecond)
|
||||
|
||||
s := c.Snapshot()
|
||||
if s.TotalRequests != 3 {
|
||||
t.Fatalf("total_requests=%d, want=3", s.TotalRequests)
|
||||
}
|
||||
if s.ClientErrors != 1 {
|
||||
t.Fatalf("client_errors=%d, want=1", s.ClientErrors)
|
||||
}
|
||||
if s.ServerErrors != 1 {
|
||||
t.Fatalf("server_errors=%d, want=1", s.ServerErrors)
|
||||
}
|
||||
if s.ServerErrorRatePct <= 0 {
|
||||
t.Fatalf("server_error_rate_pct=%f, want>0", s.ServerErrorRatePct)
|
||||
}
|
||||
if s.AvgLatencyMs <= 0 {
|
||||
t.Fatalf("avg_latency_ms=%f, want>0", s.AvgLatencyMs)
|
||||
}
|
||||
if s.MaxLatencyMs < 500 {
|
||||
t.Fatalf("max_latency_ms=%f, want>=500", s.MaxLatencyMs)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,92 @@
|
||||
package observability
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
|
||||
"wx_service/internal/middleware"
|
||||
)
|
||||
|
||||
// requestSeq is a package-level counter. RequestLogMiddleware increments it
// atomically and appends the result to the request IDs it generates for
// requests that arrive without an X-Request-ID header.
var requestSeq uint64
|
||||
|
||||
func RequestLogMiddleware(metrics *Collector) gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
startedAt := time.Now()
|
||||
|
||||
requestID := strings.TrimSpace(c.GetHeader("X-Request-ID"))
|
||||
if requestID == "" {
|
||||
seq := atomic.AddUint64(&requestSeq, 1)
|
||||
requestID = fmt.Sprintf("req-%d-%d", time.Now().UnixNano(), seq)
|
||||
}
|
||||
c.Writer.Header().Set("X-Request-ID", requestID)
|
||||
|
||||
c.Next()
|
||||
|
||||
status := c.Writer.Status()
|
||||
latency := time.Since(startedAt)
|
||||
if metrics != nil {
|
||||
metrics.Observe(status, latency)
|
||||
}
|
||||
|
||||
path := c.FullPath()
|
||||
if path == "" {
|
||||
path = c.Request.URL.Path
|
||||
}
|
||||
|
||||
level := "info"
|
||||
if status >= 500 {
|
||||
level = "error"
|
||||
} else if status >= 400 {
|
||||
level = "warn"
|
||||
}
|
||||
|
||||
entry := map[string]any{
|
||||
"ts": time.Now().UTC().Format(time.RFC3339),
|
||||
"level": level,
|
||||
"request_id": requestID,
|
||||
"method": c.Request.Method,
|
||||
"path": path,
|
||||
"status": status,
|
||||
"latency_ms": float64(latency.Microseconds()) / 1000.0,
|
||||
"client_ip": c.ClientIP(),
|
||||
}
|
||||
|
||||
if user, ok := middleware.CurrentUser(c); ok && user != nil {
|
||||
entry["uid"] = user.ID
|
||||
}
|
||||
if len(c.Errors) > 0 {
|
||||
entry["errors"] = c.Errors.String()
|
||||
}
|
||||
|
||||
payload, err := json.Marshal(entry)
|
||||
if err != nil {
|
||||
log.Printf("observability marshal log failed: %v", err)
|
||||
return
|
||||
}
|
||||
log.Println(string(payload))
|
||||
}
|
||||
}
|
||||
|
||||
func BasicMetricsHandler(metrics *Collector) gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
if metrics == nil {
|
||||
c.JSON(http.StatusServiceUnavailable, gin.H{
|
||||
"code": http.StatusServiceUnavailable,
|
||||
"message": "metrics not initialized",
|
||||
})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"code": http.StatusOK,
|
||||
"message": "success",
|
||||
"data": metrics.Snapshot(),
|
||||
})
|
||||
}
|
||||
}
|
||||
Executable
+67
@@ -0,0 +1,67 @@
|
||||
#!/usr/bin/env bash
set -euo pipefail

# Usage:
#   METRICS_URL=http://127.0.0.1:8080/metrics/basic \
#   SERVER_ERROR_RATE_THRESHOLD=5 AVG_LATENCY_THRESHOLD_MS=800 \
#   OPS_ALERT_WEBHOOK=https://example.com/webhook ./scripts/ops/check_basic_metrics.sh

# Settings: a non-empty value supplied by the caller's environment wins;
# otherwise the default written here is assigned.
: "${METRICS_URL:=http://127.0.0.1:8080/metrics/basic}"
: "${SERVER_ERROR_RATE_THRESHOLD:=5}"
: "${AVG_LATENCY_THRESHOLD_MS:=800}"
: "${OPS_ALERT_WEBHOOK:=}"
: "${ALERT_TITLE:=[wx_service] 基础监控告警}"
|
||||
|
||||
# send_alert MESSAGE
#
# Delivers MESSAGE as an alert. Without OPS_ALERT_WEBHOOK the message is only
# written to stderr. Otherwise a JSON document {"title", "message"} is POSTed
# to the webhook. Delivery is best-effort: a failing webhook never changes the
# script's exit status.
send_alert() {
  local message="$1"
  if [[ -z "${OPS_ALERT_WEBHOOK}" ]]; then
    echo "ALERT: ${message}" >&2
    return
  fi

  # Let a JSON encoder build the body: interpolating the strings directly
  # produces invalid JSON as soon as the title or message contains a quote,
  # a backslash or a newline.
  local payload
  payload="$(python3 -c 'import json, sys; print(json.dumps({"title": sys.argv[1], "message": sys.argv[2]}))' \
    "${ALERT_TITLE}" "${message}")" || {
    echo "ALERT (payload encoding failed): ${message}" >&2
    return
  }

  # --max-time keeps an unresponsive webhook from hanging a scheduled run.
  curl -fsS --max-time 10 -X POST "${OPS_ALERT_WEBHOOK}" \
    -H "Content-Type: application/json" \
    -d "${payload}" >/dev/null || true
}
|
||||
|
||||
# Fetch the metrics snapshot. A failed request yields an empty response, which
# is reported as an alert of its own.
response="$(curl -fsS "${METRICS_URL}" || true)"
if [[ -z "${response}" ]]; then
  send_alert "无法访问 metrics 接口:${METRICS_URL}"
  exit 1
fi

# Evaluate the thresholds in Python. The program text reaches the interpreter
# on stdin through the here-document, so the metrics JSON cannot be piped in on
# stdin as well (the here-document redirection replaces the pipe and the JSON
# would never be read). It is handed over in an environment variable instead.
# errexit is suspended so that a non-zero result can be captured and reported.
set +e
check_output="$(METRICS_RESPONSE="${response}" python3 - "${SERVER_ERROR_RATE_THRESHOLD}" "${AVG_LATENCY_THRESHOLD_MS}" <<'PY'
import json
import os
import sys

try:
    payload = json.loads(os.environ["METRICS_RESPONSE"])
    data = payload.get("data") or {}
    server_error = float(data.get("server_error_rate_pct", 0.0))
    avg_latency = float(data.get("avg_latency_ms", 0.0))
    server_th = float(sys.argv[1])
    latency_th = float(sys.argv[2])
except (AttributeError, IndexError, KeyError, TypeError, ValueError) as exc:
    # Report unusable input as a readable problem instead of an empty alert.
    print(f"invalid metrics response or thresholds: {exc}")
    sys.exit(1)

problems = []
if server_error > server_th:
    problems.append(f"server_error_rate_pct={server_error:.2f}% > {server_th:.2f}%")
if avg_latency > latency_th:
    problems.append(f"avg_latency_ms={avg_latency:.2f} > {latency_th:.2f}")

if problems:
    print("; ".join(problems))
    sys.exit(1)

print(f"ok server_error_rate_pct={server_error:.2f}% avg_latency_ms={avg_latency:.2f}")
PY
)"
check_exit=$?
set -e

if [[ ${check_exit} -ne 0 ]]; then
  send_alert "基础监控阈值触发:${check_output}"
  exit 1
fi

echo "${check_output}"
|
||||
|
||||
Reference in New Issue
Block a user