Skip to main content

futu_server/
metrics.rs

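// Gateway runtime monitoring metrics.
//
// Collected with atomic counters: zero-overhead sampling, no external dependencies.
// Viewable through the telnet `show_metrics` command.
// v1.4.90 P1-B: also exposed on the `/metrics` HTTP endpoint through
// [`GatewayMetrics::render_prometheus`] (registered as an extension renderer of
// [`futu_auth::metrics::Registry`]). Before that, v1.4.83/84 claimed these counters
// were in Prometheus, but they only appeared in the telnet output.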
8
9use std::sync::Arc;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::time::Instant;
12
13use chrono::{Timelike, Utc};
14use parking_lot::RwLock;
15
/// Per-UTC-hour counters for a single cmd_id (v1.4.84 §14).
///
/// Lets the tester's long-window CI job run time-of-day anomaly detection
/// (e.g. the CMD14716 UTC 15-18 window). This is a standalone structure: every
/// per-cmd counter owns its own independent set of 24 buckets.
#[derive(Debug)]
pub struct HourBreakdown {
    /// One bucket per UTC hour, indexed by hour (0..=23).
    counters: [AtomicU64; 24],
}
24
25impl HourBreakdown {
26    pub const fn new() -> Self {
27        Self {
28            counters: [const { AtomicU64::new(0) }; 24],
29        }
30    }
31
32    /// Bump the counter for current UTC hour (0..23).
33    pub fn bump_now(&self) {
34        let hour = Utc::now().hour() as usize;
35        if hour < 24 {
36            self.counters[hour].fetch_add(1, Ordering::Relaxed);
37        }
38    }
39
40    /// Read counter for specific hour (0..23). Out-of-range returns 0.
41    pub fn get(&self, hour: usize) -> u64 {
42        self.counters
43            .get(hour)
44            .map(|a| a.load(Ordering::Relaxed))
45            .unwrap_or(0)
46    }
47
48    /// Return snapshot of all 24 hours as array.
49    pub fn snapshot(&self) -> [u64; 24] {
50        let mut out = [0u64; 24];
51        for (i, c) in self.counters.iter().enumerate() {
52            out[i] = c.load(Ordering::Relaxed);
53        }
54        out
55    }
56}
57
58impl Default for HourBreakdown {
59    fn default() -> Self {
60        Self::new()
61    }
62}
63
64/// 网关运行时监控指标
65pub struct GatewayMetrics {
66    /// 网关启动时间
67    pub start_time: Instant,
68
69    // ===== 连接指标 =====
70    /// 累计接受的客户端连接数
71    pub total_connections: AtomicU64,
72    /// 累计客户端断开数
73    pub total_disconnections: AtomicU64,
74    /// 被拒绝的连接数(超过上限)
75    pub rejected_connections: AtomicU64,
76
77    // ===== 请求指标 =====
78    /// 累计处理的请求数
79    pub total_requests: AtomicU64,
80    /// 累计请求错误数(handler 返回 None 或解密失败)
81    pub total_request_errors: AtomicU64,
82    /// 累计响应字节数
83    pub total_response_bytes: AtomicU64,
84
85    // ===== 后端指标 =====
86    /// 后端重连次数
87    pub backend_reconnects: AtomicU64,
88    /// 后端重连失败次数
89    pub backend_reconnect_failures: AtomicU64,
90    /// 最近一次重连时间 (Unix 毫秒, 0=未重连过)
91    pub last_reconnect_ms: AtomicU64,
92    /// 后端是否在线 (1=online, 0=offline)
93    pub backend_online: AtomicU64,
94
95    // ===== 推送指标 =====
96    /// 后端收到的推送数 (CMD 6212 / 4716 / 5300 等)
97    pub backend_pushes_received: AtomicU64,
98    /// 向客户端发送的推送数
99    pub client_pushes_sent: AtomicU64,
100    // v1.4.83 §14 Phase 4: per-cmd_id push 细分计数 (monitoring)
101    /// CMD 6212 quote push 计数
102    pub backend_pushes_cmd_quote: AtomicU64,
103    /// CMD 4716 trade notify (legacy channel) 计数
104    pub backend_pushes_cmd_trade_legacy: AtomicU64,
105    /// CMD 14716 trade notify (v1.4.41 new channel) 计数 — tester §14 追踪
106    pub backend_pushes_cmd_trade_new: AtomicU64,
107    /// CMD 5300 msg-center push 计数
108    pub backend_pushes_cmd_msg_center: AtomicU64,
109    /// 其他未路由 CMD push 计数
110    pub backend_pushes_cmd_other: AtomicU64,
111
112    // v1.4.84 §14: per-cmd_id × UTC-hour 分桶 (监控深化 — tester CI 长窗口
113    // job 用于 "CMD14716 UTC 15-18 异常时段" 检测). cmd_other 不分时段.
114    /// CMD 6212 quote push per-UTC-hour 计数
115    pub backend_pushes_cmd_quote_by_hour: HourBreakdown,
116    /// CMD 4716 trade notify (legacy) per-UTC-hour 计数
117    pub backend_pushes_cmd_trade_legacy_by_hour: HourBreakdown,
118    /// CMD 14716 trade notify (new) per-UTC-hour 计数 — tester §14 主角
119    pub backend_pushes_cmd_trade_new_by_hour: HourBreakdown,
120    /// CMD 5300 msg-center push per-UTC-hour 计数
121    pub backend_pushes_cmd_msg_center_by_hour: HourBreakdown,
122
123    // ===== 订阅指标 =====
124    /// 行情订阅操作次数
125    pub qot_subscribe_ops: AtomicU64,
126    /// 行情退订操作次数
127    pub qot_unsubscribe_ops: AtomicU64,
128
129    // ===== v1.4.110 codex audit Round2 P3 #19: cold-cache wait 监控 =====
130    //
131    // GetBasicQot / GetOrderBook cache miss + 已订阅 → cold-cache wait 路径
132    // (Pull_SubData 主动拉 + 最多 3s 等 push). ops 用 hit/total 比看 backend
133    // push 延迟健康度, timeout/total 比看 cold-cache wait 是否常超时.
134    /// cold-cache wait 进入次数 (cache miss + IsSub, 触发 wait)
135    pub cold_cache_wait_total: AtomicU64,
136    /// cold-cache wait 命中次数 (3s 内 push 写 cache → re-read 拿到值)
137    pub cold_cache_wait_hit: AtomicU64,
138    /// cold-cache wait 超时次数 (3s timeout 仍 cache miss)
139    pub cold_cache_wait_timeout: AtomicU64,
140    /// 重连后重新订阅次数 (legacy, == applied_keys 累加; v1.4.106 codex 0631
141    /// F5 起仍 bump 向后兼容旧 dashboard, 等价新 `resubscribe_applied_keys`).
142    pub resubscribe_ops: AtomicU64,
143
144    // ===== v1.4.106 codex 0631 F5 [P3]: dual resubscribe counter =====
145    //
146    // 老 `resubscribe_ops_total` 把"触发数"和"真生效 keys 数"混一桶, 看不出
147    // partial / cache miss 的 silent loss. 拆 dual:
148    //   - resubscribe_attempts_total: 触发次数 (本 reconnect / staleness 触
149    //     发了 N 次 resubscribe, 不论结果).
150    //   - resubscribe_applied_keys_total: 真生效 keys 数 (cache resolve OK +
151    //     backend ack OK 的 (sec_key, sub_type) 对). 部分失败时 < attempts.
152    //
153    // ratio applied/attempts < 1.0 显著 → ghost subs / cache miss / backend
154    // partial reject 信号. legacy resubscribe_ops_total 仍 bump (向后兼容).
155    /// **v1.4.106 codex 0631 F5 [P3]**: resubscribe 触发次数 (每次 reconnect /
156    /// staleness loop 触发 += 1). 与 applied_keys 对比 ratio 看 partial 程度.
157    pub resubscribe_attempts: AtomicU64,
158    /// **v1.4.106 codex 0631 F5 [P3]**: resubscribe 真生效 keys 数 (累积).
159    /// applied < attempts × global_keys → 部分 partial.
160    pub resubscribe_applied_keys: AtomicU64,
161
162    // ===== v1.4.106 codex 1140 F8: 行情 push 投递失败计数 =====
163    //
164    // 之前 `bridge::push_parser` 用 `let _ = push_tx.try_send(event)` 静默吞错,
165    // cache 已更新但 subscriber 收不到 push (audit Finding 8). 加 metric +
166    // warn log 让 channel full / closed 立即可观测.
167    /// CMD 6212 行情 push (BasicQot/OrderBook/Ticker/RT/KL/Broker/...) 因
168    /// `push_tx` 队列满或关闭被 drop 的总次数 (累积).
169    pub qot_push_dropped_total: AtomicU64,
170    /// 按 SubType 拆分的 drop 计数 (proto Qot_Common.SubType: 0..=17, 共 18 桶).
171    /// 桶 0 = "未知 / 不属于任何已知 SubType" (兜底, e.g. 拼错的 sub_type).
172    /// 桶 1..=17 = 对应 SubType. v1.4.106 codex 1140 F8 加.
173    pub qot_push_dropped_by_sub_type: [AtomicU64; 18],
174
175    // ===== KeepAlive 指标 =====
176    /// KeepAlive 超时断开数
177    pub keepalive_timeouts: AtomicU64,
178
179    // ===== 延迟采样 =====
180    /// 最近 N 个请求延迟的环形缓冲 (纳秒)
181    latency_ring: RwLock<LatencyRing>,
182}
183
/// Latency ring buffer keeping the most recent [`LATENCY_RING_SIZE`] samples.
struct LatencyRing {
    /// Sample storage in nanoseconds; slots not yet written hold 0.
    buf: Vec<u64>,
    /// Slot the next sample will be written to.
    pos: usize,
    /// Number of samples pushed since creation (not capped at the window size).
    count: u64,
    /// Sum of the samples currently inside the window, in nanoseconds.
    ///
    /// Held as `u128` so that even a full window of `u64::MAX` samples cannot
    /// overflow the running sum.
    total_ns: u128,
}

/// Number of samples retained by [`LatencyRing`].
const LATENCY_RING_SIZE: usize = 1000;

impl LatencyRing {
    /// Creates an empty ring with all slots zeroed.
    fn new() -> Self {
        Self {
            buf: vec![0u64; LATENCY_RING_SIZE],
            pos: 0,
            count: 0,
            total_ns: 0,
        }
    }

    /// Records one latency sample (nanoseconds), evicting the oldest sample once
    /// the window is full.
    fn push(&mut self, ns: u64) {
        if self.count >= LATENCY_RING_SIZE as u64 {
            // The window is full, so the slot at `pos` holds the oldest sample:
            // remove it from the running sum before it is overwritten.
            self.total_ns = self
                .total_ns
                .saturating_sub(u128::from(self.buf[self.pos]));
        }
        self.buf[self.pos] = ns;
        // Widened to u128: a plain u64 `+=` panics in debug builds (and wraps in
        // release builds) when callers record very large samples.
        self.total_ns += u128::from(ns);
        self.pos = (self.pos + 1) % LATENCY_RING_SIZE;
        self.count = self.count.saturating_add(1);
    }

    /// Summarises the samples currently in the window.
    ///
    /// Returns the all-zero [`LatencyStats`] when no sample has been recorded.
    /// All latency values are converted from nanoseconds to whole microseconds
    /// (truncating); `count` is the lifetime number of samples.
    fn stats(&self) -> LatencyStats {
        let n = self.count.min(LATENCY_RING_SIZE as u64) as usize;
        if n == 0 {
            return LatencyStats::default();
        }

        // During warm-up only the first `n` slots are populated; once the ring has
        // wrapped `n == buf.len()`, so this slice is the whole buffer.
        let mut samples = self.buf[..n].to_vec();
        samples.sort_unstable();

        // Index of the sample at `fraction` of the sorted window, clamped to the
        // last valid index.
        let rank = |fraction: f64| ((n as f64 * fraction) as usize).min(n - 1);
        let avg_ns = self.total_ns / n as u128;

        LatencyStats {
            count: self.count,
            // The mean never exceeds the largest u64 sample, so the conversion
            // cannot actually fail; the fallback only avoids a panic path.
            avg_us: u64::try_from(avg_ns / 1000).unwrap_or(u64::MAX),
            p50_us: samples[n / 2] / 1000,
            p95_us: samples[rank(0.95)] / 1000,
            p99_us: samples[rank(0.99)] / 1000,
            max_us: samples[n - 1] / 1000,
        }
    }
}

/// Latency summary (microseconds).
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct LatencyStats {
    /// Total number of samples recorded.
    pub count: u64,
    /// Average latency (microseconds).
    pub avg_us: u64,
    /// P50 latency (microseconds).
    pub p50_us: u64,
    /// P95 latency (microseconds).
    pub p95_us: u64,
    /// P99 latency (microseconds).
    pub p99_us: u64,
    /// Maximum latency (microseconds).
    pub max_us: u64,
}
255
256/// v1.4.84 §14: 把 24 小时 counters snapshot 格式化为空格分隔的单行.
257///
258/// 输出格式: `"h00=N h01=N ... h23=N"` — 便于人眼 scan 时段异常,
259/// 同时保持 parseable (awk / grep / Prometheus textfile).
260fn format_hour_row(hb: &HourBreakdown) -> String {
261    let snap = hb.snapshot();
262    let mut out = String::with_capacity(24 * 10);
263    for (i, v) in snap.iter().enumerate() {
264        if i > 0 {
265            out.push(' ');
266        }
267        out.push_str(&format!("h{:02}={}", i, v));
268    }
269    out
270}
271
272impl GatewayMetrics {
273    pub fn new() -> Self {
274        Self {
275            start_time: Instant::now(),
276            total_connections: AtomicU64::new(0),
277            total_disconnections: AtomicU64::new(0),
278            rejected_connections: AtomicU64::new(0),
279            total_requests: AtomicU64::new(0),
280            total_request_errors: AtomicU64::new(0),
281            total_response_bytes: AtomicU64::new(0),
282            backend_reconnects: AtomicU64::new(0),
283            backend_reconnect_failures: AtomicU64::new(0),
284            last_reconnect_ms: AtomicU64::new(0),
285            backend_online: AtomicU64::new(1),
286            backend_pushes_received: AtomicU64::new(0),
287            client_pushes_sent: AtomicU64::new(0),
288            backend_pushes_cmd_quote: AtomicU64::new(0),
289            backend_pushes_cmd_trade_legacy: AtomicU64::new(0),
290            backend_pushes_cmd_trade_new: AtomicU64::new(0),
291            backend_pushes_cmd_msg_center: AtomicU64::new(0),
292            backend_pushes_cmd_other: AtomicU64::new(0),
293            backend_pushes_cmd_quote_by_hour: HourBreakdown::new(),
294            backend_pushes_cmd_trade_legacy_by_hour: HourBreakdown::new(),
295            backend_pushes_cmd_trade_new_by_hour: HourBreakdown::new(),
296            backend_pushes_cmd_msg_center_by_hour: HourBreakdown::new(),
297            qot_subscribe_ops: AtomicU64::new(0),
298            qot_unsubscribe_ops: AtomicU64::new(0),
299            cold_cache_wait_total: AtomicU64::new(0),
300            cold_cache_wait_hit: AtomicU64::new(0),
301            cold_cache_wait_timeout: AtomicU64::new(0),
302            resubscribe_ops: AtomicU64::new(0),
303            resubscribe_attempts: AtomicU64::new(0),
304            resubscribe_applied_keys: AtomicU64::new(0),
305            // v1.4.106 codex 1140 F8: qot push drop counter init.
306            qot_push_dropped_total: AtomicU64::new(0),
307            qot_push_dropped_by_sub_type: [const { AtomicU64::new(0) }; 18],
308            keepalive_timeouts: AtomicU64::new(0),
309            latency_ring: RwLock::new(LatencyRing::new()),
310        }
311    }
312
313    /// 记录一次请求延迟 (纳秒)
314    pub fn record_latency_ns(&self, ns: u64) {
315        self.latency_ring.write().push(ns);
316    }
317
318    /// v1.4.106 codex 1140 F8: 记录一次 qot push 被 drop (channel full / closed).
319    ///
320    /// `sub_type` 范围 0..=17 (proto Qot_Common.SubType). 越界值归桶 0
321    /// (未知). 同时 bump 总计数 + per-sub-type 桶, 保证 dashboard 可分维度.
322    pub fn record_qot_push_dropped(&self, sub_type: i32) {
323        self.qot_push_dropped_total.fetch_add(1, Ordering::Relaxed);
324        let bucket = if (0..18).contains(&sub_type) {
325            sub_type as usize
326        } else {
327            0
328        };
329        self.qot_push_dropped_by_sub_type[bucket].fetch_add(1, Ordering::Relaxed);
330    }
331
332    /// v1.4.106 codex 1140 F8: 读取每个 sub_type 桶的 drop 计数 (snapshot).
333    /// 用于 metrics endpoint render.
334    pub fn qot_push_dropped_per_sub_type(&self) -> [u64; 18] {
335        let mut out = [0u64; 18];
336        for (i, slot) in self.qot_push_dropped_by_sub_type.iter().enumerate() {
337            out[i] = slot.load(Ordering::Relaxed);
338        }
339        out
340    }
341
342    /// 获取延迟统计
343    pub fn latency_stats(&self) -> LatencyStats {
344        self.latency_ring.read().stats()
345    }
346
347    /// 格式化运行时间
348    pub fn uptime_str(&self) -> String {
349        let elapsed = self.start_time.elapsed();
350        let secs = elapsed.as_secs();
351        let days = secs / 86400;
352        let hours = (secs % 86400) / 3600;
353        let mins = (secs % 3600) / 60;
354        let s = secs % 60;
355        if days > 0 {
356            format!("{days}d {hours}h {mins}m {s}s")
357        } else if hours > 0 {
358            format!("{hours}h {mins}m {s}s")
359        } else {
360            format!("{mins}m {s}s")
361        }
362    }
363
364    /// 生成 telnet 可展示的指标报告
365    pub fn report(&self) -> String {
366        let lat = self.latency_stats();
367        let backend_status = if self.backend_online.load(Ordering::Relaxed) == 1 {
368            "ONLINE"
369        } else {
370            "OFFLINE"
371        };
372
373        let total_req = self.total_requests.load(Ordering::Relaxed);
374        let uptime_secs = self.start_time.elapsed().as_secs_f64();
375        let avg_rps = if uptime_secs > 0.0 {
376            total_req as f64 / uptime_secs
377        } else {
378            0.0
379        };
380
381        format!(
382            "=== Gateway Metrics ===\r\n\
383             Uptime: {uptime}\r\n\
384             \r\n\
385             [Connections]\r\n\
386             total_accepted: {total_conn}\r\n\
387             total_disconnected: {total_disconn}\r\n\
388             rejected (limit): {rejected}\r\n\
389             keepalive_timeouts: {ka_timeout}\r\n\
390             \r\n\
391             [Requests]\r\n\
392             total_requests: {total_req}\r\n\
393             total_errors: {total_err}\r\n\
394             avg_rps: {avg_rps:.1}\r\n\
395             response_bytes: {resp_bytes}\r\n\
396             \r\n\
397             [Latency (recent {lat_count} samples)]\r\n\
398             avg: {lat_avg}us  p50: {lat_p50}us  p95: {lat_p95}us  p99: {lat_p99}us  max: {lat_max}us\r\n\
399             \r\n\
400             [Backend]\r\n\
401             status: {backend_status}\r\n\
402             reconnects: {reconnects}\r\n\
403             reconnect_failures: {reconnect_fail}\r\n\
404             pushes_received: {push_recv}\r\n\
405             pushes_sent_to_clients: {push_sent}\r\n\
406             \r\n\
407             [Pushes by CMD (v1.4.83 §14)]\r\n\
408             cmd_6212_quote: {push_cmd_quote}\r\n\
409             cmd_4716_trade_legacy: {push_cmd_trade_legacy}\r\n\
410             cmd_14716_trade_new: {push_cmd_trade_new}\r\n\
411             cmd_5300_msg_center: {push_cmd_msg_center}\r\n\
412             cmd_other: {push_cmd_other}\r\n\
413             \r\n\
414             [Pushes by CMD × UTC hour (v1.4.84 §14)]\r\n\
415             cmd_14716_trade_new_hour_0..23: {hour_trade_new}\r\n\
416             cmd_6212_quote_hour_0..23: {hour_quote}\r\n\
417             cmd_4716_trade_legacy_hour_0..23: {hour_trade_legacy}\r\n\
418             cmd_5300_msg_center_hour_0..23: {hour_msg_center}\r\n\
419             \r\n\
420             [Subscriptions]\r\n\
421             subscribe_ops: {sub_ops}\r\n\
422             unsubscribe_ops: {unsub_ops}\r\n\
423             resubscribe_ops: {resub_ops}\r\n\
424             \r\n\
425             [Cold-cache wait (v1.4.110 §P3 #19)]\r\n\
426             total: {cc_total}  hit: {cc_hit}  timeout: {cc_timeout}\r\n",
427            uptime = self.uptime_str(),
428            total_conn = self.total_connections.load(Ordering::Relaxed),
429            total_disconn = self.total_disconnections.load(Ordering::Relaxed),
430            rejected = self.rejected_connections.load(Ordering::Relaxed),
431            ka_timeout = self.keepalive_timeouts.load(Ordering::Relaxed),
432            total_req = total_req,
433            total_err = self.total_request_errors.load(Ordering::Relaxed),
434            resp_bytes = self.total_response_bytes.load(Ordering::Relaxed),
435            lat_count = lat.count.min(LATENCY_RING_SIZE as u64),
436            lat_avg = lat.avg_us,
437            lat_p50 = lat.p50_us,
438            lat_p95 = lat.p95_us,
439            lat_p99 = lat.p99_us,
440            lat_max = lat.max_us,
441            reconnects = self.backend_reconnects.load(Ordering::Relaxed),
442            reconnect_fail = self.backend_reconnect_failures.load(Ordering::Relaxed),
443            push_recv = self.backend_pushes_received.load(Ordering::Relaxed),
444            push_sent = self.client_pushes_sent.load(Ordering::Relaxed),
445            push_cmd_quote = self.backend_pushes_cmd_quote.load(Ordering::Relaxed),
446            push_cmd_trade_legacy = self.backend_pushes_cmd_trade_legacy.load(Ordering::Relaxed),
447            push_cmd_trade_new = self.backend_pushes_cmd_trade_new.load(Ordering::Relaxed),
448            push_cmd_msg_center = self.backend_pushes_cmd_msg_center.load(Ordering::Relaxed),
449            push_cmd_other = self.backend_pushes_cmd_other.load(Ordering::Relaxed),
450            hour_trade_new = format_hour_row(&self.backend_pushes_cmd_trade_new_by_hour),
451            hour_quote = format_hour_row(&self.backend_pushes_cmd_quote_by_hour),
452            hour_trade_legacy = format_hour_row(&self.backend_pushes_cmd_trade_legacy_by_hour),
453            hour_msg_center = format_hour_row(&self.backend_pushes_cmd_msg_center_by_hour),
454            sub_ops = self.qot_subscribe_ops.load(Ordering::Relaxed),
455            unsub_ops = self.qot_unsubscribe_ops.load(Ordering::Relaxed),
456            resub_ops = self.resubscribe_ops.load(Ordering::Relaxed),
457            cc_total = self.cold_cache_wait_total.load(Ordering::Relaxed),
458            cc_hit = self.cold_cache_wait_hit.load(Ordering::Relaxed),
459            cc_timeout = self.cold_cache_wait_timeout.load(Ordering::Relaxed),
460        )
461    }
462}
463
464impl Default for GatewayMetrics {
465    fn default() -> Self {
466        Self::new()
467    }
468}
469
470/// v1.4.90 P1-B: 把 24 小时 hour breakdown 渲染成 Prometheus 行 (with `hour` label).
471///
472/// 输出形如 `<metric_name>{hour="00"} 7\n<metric_name>{hour="01"} 0\n...`
473/// 每个 metric 24 行 (UTC hour 0..23).
474fn render_hour_breakdown_prom(metric_name: &str, hb: &HourBreakdown) -> String {
475    let snap = hb.snapshot();
476    let mut out = String::with_capacity(24 * 60);
477    for (h, v) in snap.iter().enumerate() {
478        out.push_str(&format!("{}{{hour=\"{:02}\"}} {}\n", metric_name, h, v));
479    }
480    out
481}
482
483impl GatewayMetrics {
484    /// v1.4.90 P1-B: 输出 Prometheus text exposition 格式, 涵盖:
485    ///
486    /// - 连接 / 请求 / 响应 / 后端连接基础 counter
487    /// - per-cmd_id push counter (cmd_6212_quote / cmd_4716 / cmd_14716 /
488    ///   cmd_5300 / cmd_other) — v1.4.83/84 telnet 已暴露, 此处补 Prometheus
489    /// - per-cmd_id × UTC-hour 24-bucket breakdown (v1.4.84 §14)
490    /// - 订阅 / KeepAlive 计数
491    /// - 延迟 p50/p95/p99 gauge (取最近 1000 样本)
492    ///
493    /// 注册到 [`futu_auth::metrics::Registry`] extension renderer 后, 在
494    /// `/metrics` HTTP 输出末尾自动追加. 见
495    /// [`install_prometheus_extension`].
496    #[must_use]
497    pub fn render_prometheus(&self) -> String {
498        let mut s = String::with_capacity(8192);
499
500        // ===== 连接 =====
501        s.push_str("# HELP futu_gateway_connections_total Total accepted client connections\n");
502        s.push_str("# TYPE futu_gateway_connections_total counter\n");
503        s.push_str(&format!(
504            "futu_gateway_connections_total {}\n",
505            self.total_connections.load(Ordering::Relaxed)
506        ));
507        s.push_str(
508            "# HELP futu_gateway_disconnections_total Total client disconnections\n# TYPE futu_gateway_disconnections_total counter\n",
509        );
510        s.push_str(&format!(
511            "futu_gateway_disconnections_total {}\n",
512            self.total_disconnections.load(Ordering::Relaxed)
513        ));
514        s.push_str(
515            "# HELP futu_gateway_rejected_connections_total Connections rejected (limit hit)\n# TYPE futu_gateway_rejected_connections_total counter\n",
516        );
517        s.push_str(&format!(
518            "futu_gateway_rejected_connections_total {}\n",
519            self.rejected_connections.load(Ordering::Relaxed)
520        ));
521        s.push_str(
522            "# HELP futu_gateway_keepalive_timeouts_total KeepAlive timeout disconnects\n# TYPE futu_gateway_keepalive_timeouts_total counter\n",
523        );
524        s.push_str(&format!(
525            "futu_gateway_keepalive_timeouts_total {}\n",
526            self.keepalive_timeouts.load(Ordering::Relaxed)
527        ));
528
529        // ===== 请求 =====
530        s.push_str(
531            "# HELP futu_gateway_requests_total Total handled client requests\n# TYPE futu_gateway_requests_total counter\n",
532        );
533        s.push_str(&format!(
534            "futu_gateway_requests_total {}\n",
535            self.total_requests.load(Ordering::Relaxed)
536        ));
537        s.push_str(
538            "# HELP futu_gateway_request_errors_total Handler-returned-None or decryption errors\n# TYPE futu_gateway_request_errors_total counter\n",
539        );
540        s.push_str(&format!(
541            "futu_gateway_request_errors_total {}\n",
542            self.total_request_errors.load(Ordering::Relaxed)
543        ));
544        s.push_str(
545            "# HELP futu_gateway_response_bytes_total Cumulative response payload bytes\n# TYPE futu_gateway_response_bytes_total counter\n",
546        );
547        s.push_str(&format!(
548            "futu_gateway_response_bytes_total {}\n",
549            self.total_response_bytes.load(Ordering::Relaxed)
550        ));
551
552        // ===== 后端 =====
553        s.push_str(
554            "# HELP futu_gateway_backend_online Backend connection state (1=online,0=offline)\n# TYPE futu_gateway_backend_online gauge\n",
555        );
556        s.push_str(&format!(
557            "futu_gateway_backend_online {}\n",
558            self.backend_online.load(Ordering::Relaxed)
559        ));
560        s.push_str(
561            "# HELP futu_gateway_backend_reconnects_total Backend reconnect attempts\n# TYPE futu_gateway_backend_reconnects_total counter\n",
562        );
563        s.push_str(&format!(
564            "futu_gateway_backend_reconnects_total {}\n",
565            self.backend_reconnects.load(Ordering::Relaxed)
566        ));
567        s.push_str(
568            "# HELP futu_gateway_backend_reconnect_failures_total Backend reconnect failures\n# TYPE futu_gateway_backend_reconnect_failures_total counter\n",
569        );
570        s.push_str(&format!(
571            "futu_gateway_backend_reconnect_failures_total {}\n",
572            self.backend_reconnect_failures.load(Ordering::Relaxed)
573        ));
574
575        // ===== 推送总数 =====
576        s.push_str(
577            "# HELP futu_gateway_backend_pushes_received_total Pushes received from backend\n# TYPE futu_gateway_backend_pushes_received_total counter\n",
578        );
579        s.push_str(&format!(
580            "futu_gateway_backend_pushes_received_total {}\n",
581            self.backend_pushes_received.load(Ordering::Relaxed)
582        ));
583        s.push_str(
584            "# HELP futu_gateway_client_pushes_sent_total Pushes forwarded to clients\n# TYPE futu_gateway_client_pushes_sent_total counter\n",
585        );
586        s.push_str(&format!(
587            "futu_gateway_client_pushes_sent_total {}\n",
588            self.client_pushes_sent.load(Ordering::Relaxed)
589        ));
590
591        // ===== per-cmd push counters (v1.4.83 §14) =====
592        // 这是 P1-B 修复的核心: telnet 已有但 /metrics 之前没暴露
593        s.push_str(
594            "# HELP futu_gateway_backend_pushes_cmd_total Backend pushes by cmd_id (v1.4.83 §14)\n# TYPE futu_gateway_backend_pushes_cmd_total counter\n",
595        );
596        s.push_str(&format!(
597            "futu_gateway_backend_pushes_cmd_total{{cmd=\"6212_quote\"}} {}\n",
598            self.backend_pushes_cmd_quote.load(Ordering::Relaxed)
599        ));
600        s.push_str(&format!(
601            "futu_gateway_backend_pushes_cmd_total{{cmd=\"4716_trade_legacy\"}} {}\n",
602            self.backend_pushes_cmd_trade_legacy.load(Ordering::Relaxed)
603        ));
604        s.push_str(&format!(
605            "futu_gateway_backend_pushes_cmd_total{{cmd=\"14716_trade_new\"}} {}\n",
606            self.backend_pushes_cmd_trade_new.load(Ordering::Relaxed)
607        ));
608        s.push_str(&format!(
609            "futu_gateway_backend_pushes_cmd_total{{cmd=\"5300_msg_center\"}} {}\n",
610            self.backend_pushes_cmd_msg_center.load(Ordering::Relaxed)
611        ));
612        s.push_str(&format!(
613            "futu_gateway_backend_pushes_cmd_total{{cmd=\"other\"}} {}\n",
614            self.backend_pushes_cmd_other.load(Ordering::Relaxed)
615        ));
616
617        // ===== per-cmd × UTC hour breakdown (v1.4.84 §14) =====
618        s.push_str(
619            "# HELP futu_gateway_backend_pushes_cmd_quote_by_hour Cmd 6212 quote pushes per UTC hour\n# TYPE futu_gateway_backend_pushes_cmd_quote_by_hour counter\n",
620        );
621        s.push_str(&render_hour_breakdown_prom(
622            "futu_gateway_backend_pushes_cmd_quote_by_hour",
623            &self.backend_pushes_cmd_quote_by_hour,
624        ));
625        s.push_str(
626            "# HELP futu_gateway_backend_pushes_cmd_trade_legacy_by_hour Cmd 4716 trade-legacy pushes per UTC hour\n# TYPE futu_gateway_backend_pushes_cmd_trade_legacy_by_hour counter\n",
627        );
628        s.push_str(&render_hour_breakdown_prom(
629            "futu_gateway_backend_pushes_cmd_trade_legacy_by_hour",
630            &self.backend_pushes_cmd_trade_legacy_by_hour,
631        ));
632        s.push_str(
633            "# HELP futu_gateway_backend_pushes_cmd_trade_new_by_hour Cmd 14716 trade-new pushes per UTC hour (v1.4.84 §14 tester subject)\n# TYPE futu_gateway_backend_pushes_cmd_trade_new_by_hour counter\n",
634        );
635        s.push_str(&render_hour_breakdown_prom(
636            "futu_gateway_backend_pushes_cmd_trade_new_by_hour",
637            &self.backend_pushes_cmd_trade_new_by_hour,
638        ));
639        s.push_str(
640            "# HELP futu_gateway_backend_pushes_cmd_msg_center_by_hour Cmd 5300 msg-center pushes per UTC hour\n# TYPE futu_gateway_backend_pushes_cmd_msg_center_by_hour counter\n",
641        );
642        s.push_str(&render_hour_breakdown_prom(
643            "futu_gateway_backend_pushes_cmd_msg_center_by_hour",
644            &self.backend_pushes_cmd_msg_center_by_hour,
645        ));
646
647        // ===== 订阅 =====
648        s.push_str(
649            "# HELP futu_gateway_qot_subscribe_ops_total Quote subscribe operations\n# TYPE futu_gateway_qot_subscribe_ops_total counter\n",
650        );
651        s.push_str(&format!(
652            "futu_gateway_qot_subscribe_ops_total {}\n",
653            self.qot_subscribe_ops.load(Ordering::Relaxed)
654        ));
655        s.push_str(
656            "# HELP futu_gateway_qot_unsubscribe_ops_total Quote unsubscribe operations\n# TYPE futu_gateway_qot_unsubscribe_ops_total counter\n",
657        );
658        s.push_str(&format!(
659            "futu_gateway_qot_unsubscribe_ops_total {}\n",
660            self.qot_unsubscribe_ops.load(Ordering::Relaxed)
661        ));
662        s.push_str(
663            "# HELP futu_gateway_resubscribe_ops_total Re-subscribe ops after reconnect (legacy, == resubscribe_applied_keys)\n# TYPE futu_gateway_resubscribe_ops_total counter\n",
664        );
665        s.push_str(&format!(
666            "futu_gateway_resubscribe_ops_total {}\n",
667            self.resubscribe_ops.load(Ordering::Relaxed)
668        ));
669        // v1.4.106 codex 0631 F5: dual counter — attempts 触发 vs applied_keys 真生效.
670        s.push_str(
671            "# HELP futu_gateway_resubscribe_attempts_total Re-subscribe trigger count (each reconnect/staleness loop +=1)\n# TYPE futu_gateway_resubscribe_attempts_total counter\n",
672        );
673        s.push_str(&format!(
674            "futu_gateway_resubscribe_attempts_total {}\n",
675            self.resubscribe_attempts.load(Ordering::Relaxed)
676        ));
677        s.push_str(
678            "# HELP futu_gateway_resubscribe_applied_keys_total Re-subscribe applied keys total (cache resolve OK + backend ack OK)\n# TYPE futu_gateway_resubscribe_applied_keys_total counter\n",
679        );
680        s.push_str(&format!(
681            "futu_gateway_resubscribe_applied_keys_total {}\n",
682            self.resubscribe_applied_keys.load(Ordering::Relaxed)
683        ));
684
685        // v1.4.110 codex audit Round2 P3 #19: cold-cache wait 监控.
686        // hit/total 比看 backend push 延迟健康度, timeout/total 比看常超时.
687        s.push_str(
688            "# HELP futu_gateway_cold_cache_wait_total Cold-cache wait entries (cache miss + IsSub)\n# TYPE futu_gateway_cold_cache_wait_total counter\n",
689        );
690        s.push_str(&format!(
691            "futu_gateway_cold_cache_wait_total {}\n",
692            self.cold_cache_wait_total.load(Ordering::Relaxed)
693        ));
694        s.push_str(
695            "# HELP futu_gateway_cold_cache_wait_hit_total Cold-cache wait hits (push filled cache within timeout)\n# TYPE futu_gateway_cold_cache_wait_hit_total counter\n",
696        );
697        s.push_str(&format!(
698            "futu_gateway_cold_cache_wait_hit_total {}\n",
699            self.cold_cache_wait_hit.load(Ordering::Relaxed)
700        ));
701        s.push_str(
702            "# HELP futu_gateway_cold_cache_wait_timeout_total Cold-cache wait timeouts (3s elapsed, cache still miss)\n# TYPE futu_gateway_cold_cache_wait_timeout_total counter\n",
703        );
704        s.push_str(&format!(
705            "futu_gateway_cold_cache_wait_timeout_total {}\n",
706            self.cold_cache_wait_timeout.load(Ordering::Relaxed)
707        ));
708
709        // ===== 延迟 (gauge, 取 ring 当前 stats) =====
710        let lat = self.latency_stats();
711        s.push_str(
712            "# HELP futu_gateway_request_latency_us Request latency percentiles (microseconds, recent ring)\n# TYPE futu_gateway_request_latency_us gauge\n",
713        );
714        s.push_str(&format!(
715            "futu_gateway_request_latency_us{{quantile=\"p50\"}} {}\n",
716            lat.p50_us
717        ));
718        s.push_str(&format!(
719            "futu_gateway_request_latency_us{{quantile=\"p95\"}} {}\n",
720            lat.p95_us
721        ));
722        s.push_str(&format!(
723            "futu_gateway_request_latency_us{{quantile=\"p99\"}} {}\n",
724            lat.p99_us
725        ));
726        s.push_str(&format!(
727            "futu_gateway_request_latency_us{{quantile=\"max\"}} {}\n",
728            lat.max_us
729        ));
730
731        s
732    }
733}
734
735/// v1.4.90 P1-B: 把 [`GatewayMetrics`] 注册为 [`futu_auth::metrics::Registry`]
736/// 的 extension renderer, 让 `/metrics` HTTP 端点自动包含 per-cmd / per-hour
737/// counter.
738///
739/// 调用方 (futu-opend `main.rs`) 在创建 `GatewayMetrics` Arc 之后调一次:
740///
741/// ```ignore
742/// futu_auth::metrics::install(Arc::new(MetricsRegistry::default()));
743/// // ... bridge / server 初始化, 都共享同一份 Arc<GatewayMetrics> ...
744/// futu_server::metrics::install_prometheus_extension(server.metrics().clone());
745/// ```
746///
747/// 多次调用会注册多个 renderer (无害但重复输出); 实际上只该调用一次.
748pub fn install_prometheus_extension(metrics: Arc<GatewayMetrics>) {
749    futu_auth::metrics::register_global_renderer(move || metrics.render_prometheus());
750}
751
752#[cfg(test)]
753mod tests;