1use std::sync::Arc;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::time::Instant;
12
13use chrono::{Timelike, Utc};
14use parking_lot::RwLock;
15
16#[derive(Debug)]
21pub struct HourBreakdown {
22 counters: [AtomicU64; 24],
23}
24
25impl HourBreakdown {
26 pub const fn new() -> Self {
27 Self {
28 counters: [const { AtomicU64::new(0) }; 24],
29 }
30 }
31
32 pub fn bump_now(&self) {
34 let hour = Utc::now().hour() as usize;
35 if hour < 24 {
36 self.counters[hour].fetch_add(1, Ordering::Relaxed);
37 }
38 }
39
40 pub fn get(&self, hour: usize) -> u64 {
42 self.counters
43 .get(hour)
44 .map(|a| a.load(Ordering::Relaxed))
45 .unwrap_or(0)
46 }
47
48 pub fn snapshot(&self) -> [u64; 24] {
50 let mut out = [0u64; 24];
51 for (i, c) in self.counters.iter().enumerate() {
52 out[i] = c.load(Ordering::Relaxed);
53 }
54 out
55 }
56}
57
58impl Default for HourBreakdown {
59 fn default() -> Self {
60 Self::new()
61 }
62}
63
/// Gateway-wide counters and gauges.
///
/// Every numeric field is an `AtomicU64`, so it can be updated through a
/// shared reference. The text report and the Prometheus renderer in this file
/// read the fields with relaxed loads.
pub struct GatewayMetrics {
    /// Instant captured by `new()`; basis for the uptime string and average RPS.
    pub start_time: Instant,

    /// Accepted client connections.
    pub total_connections: AtomicU64,
    /// Client disconnections.
    pub total_disconnections: AtomicU64,
    /// Connections rejected because a limit was hit.
    pub rejected_connections: AtomicU64,

    /// Handled client requests.
    pub total_requests: AtomicU64,
    /// Request errors (exported as "Handler-returned-None or decryption errors").
    pub total_request_errors: AtomicU64,
    /// Cumulative response payload bytes.
    pub total_response_bytes: AtomicU64,

    /// Backend reconnect attempts.
    pub backend_reconnects: AtomicU64,
    /// Backend reconnect failures.
    pub backend_reconnect_failures: AtomicU64,
    /// Not read anywhere in this file.
    /// NOTE(review): presumably a millisecond value describing the most recent
    /// backend reconnect — confirm against the code that writes it.
    pub last_reconnect_ms: AtomicU64,
    /// Backend connection state: 1 = online, 0 = offline. `new()` starts it at 1.
    pub backend_online: AtomicU64,

    /// Pushes received from the backend.
    pub backend_pushes_received: AtomicU64,
    /// Pushes forwarded to clients.
    pub client_pushes_sent: AtomicU64,
    /// Backend pushes with cmd 6212 (quote).
    pub backend_pushes_cmd_quote: AtomicU64,
    /// Backend pushes with cmd 4716 (trade, legacy).
    pub backend_pushes_cmd_trade_legacy: AtomicU64,
    /// Backend pushes with cmd 14716 (trade, new).
    pub backend_pushes_cmd_trade_new: AtomicU64,
    /// Backend pushes with cmd 5300 (message center).
    pub backend_pushes_cmd_msg_center: AtomicU64,
    /// Backend pushes reported under the `other` cmd label.
    pub backend_pushes_cmd_other: AtomicU64,

    /// Cmd 6212 quote pushes, broken down by UTC hour.
    pub backend_pushes_cmd_quote_by_hour: HourBreakdown,
    /// Cmd 4716 trade-legacy pushes, broken down by UTC hour.
    pub backend_pushes_cmd_trade_legacy_by_hour: HourBreakdown,
    /// Cmd 14716 trade-new pushes, broken down by UTC hour.
    pub backend_pushes_cmd_trade_new_by_hour: HourBreakdown,
    /// Cmd 5300 msg-center pushes, broken down by UTC hour.
    pub backend_pushes_cmd_msg_center_by_hour: HourBreakdown,

    /// Quote subscribe operations.
    pub qot_subscribe_ops: AtomicU64,
    /// Quote unsubscribe operations.
    pub qot_unsubscribe_ops: AtomicU64,

    /// Cold-cache wait entries (exported as "cache miss + IsSub").
    pub cold_cache_wait_total: AtomicU64,
    /// Cold-cache waits where a push filled the cache within the timeout.
    pub cold_cache_wait_hit: AtomicU64,
    /// Cold-cache waits that timed out with the cache still missing.
    pub cold_cache_wait_timeout: AtomicU64,
    /// Re-subscribe operations after reconnect; the exported help text marks
    /// this as legacy and equal to `resubscribe_applied_keys`.
    pub resubscribe_ops: AtomicU64,

    /// Re-subscribe trigger count (one per reconnect/staleness loop).
    pub resubscribe_attempts: AtomicU64,
    /// Keys successfully re-applied (cache resolve OK and backend ack OK).
    pub resubscribe_applied_keys: AtomicU64,

    /// Dropped quote pushes; bumped once per `record_qot_push_dropped` call.
    pub qot_push_dropped_total: AtomicU64,
    /// Dropped quote pushes per `sub_type`, indexed by the sub-type value.
    /// `record_qot_push_dropped` folds values outside `0..18` into bucket 0.
    pub qot_push_dropped_by_sub_type: [AtomicU64; 18],

    /// Disconnects caused by KeepAlive timeouts.
    pub keepalive_timeouts: AtomicU64,

    /// Ring of the most recent request latencies, guarded by a read/write lock.
    latency_ring: RwLock<LatencyRing>,
}
183
/// Fixed-capacity ring holding the most recent request latencies, in
/// nanoseconds, together with a running sum of the samples in the window.
struct LatencyRing {
    /// Sample storage; always `LATENCY_RING_SIZE` long, zero until written.
    buf: Vec<u64>,
    /// Slot the next sample will be written to.
    pos: usize,
    /// Number of samples ever pushed (not capped at the ring size).
    count: u64,
    /// Sum of the samples currently in the window.
    ///
    /// Kept as `u128`: a full window of maximal `u64` samples still fits, so
    /// the sum can neither panic on overflow (debug builds) nor wrap and
    /// corrupt the average (release builds).
    total_ns: u128,
}

/// Number of samples retained by [`LatencyRing`].
const LATENCY_RING_SIZE: usize = 1000;

impl LatencyRing {
    /// Creates an empty ring with zeroed storage.
    fn new() -> Self {
        Self {
            buf: vec![0u64; LATENCY_RING_SIZE],
            pos: 0,
            count: 0,
            total_ns: 0,
        }
    }

    /// Records one latency sample of `ns` nanoseconds, evicting the oldest
    /// sample once the ring is full.
    fn push(&mut self, ns: u64) {
        if self.count >= LATENCY_RING_SIZE as u64 {
            // The slot about to be overwritten leaves the window, so its
            // value leaves the running sum as well.
            let evicted = u128::from(self.buf[self.pos]);
            self.total_ns = self.total_ns.saturating_sub(evicted);
        }
        self.buf[self.pos] = ns;
        self.total_ns += u128::from(ns);
        self.pos = (self.pos + 1) % LATENCY_RING_SIZE;
        // Saturate rather than overflow; once saturated the ring still counts
        // as full, which is all `push` and `stats` need to know.
        self.count = self.count.saturating_add(1);
    }

    /// Summarises the samples currently in the window.
    ///
    /// Returns `LatencyStats::default()` (all zeroes) when nothing has been
    /// pushed. `count` reports every sample ever pushed; the other fields
    /// describe only the window and are integer microseconds (nanoseconds
    /// divided by 1000, truncated).
    fn stats(&self) -> LatencyStats {
        let n = self.count.min(LATENCY_RING_SIZE as u64) as usize;
        if n == 0 {
            return LatencyStats::default();
        }

        // Before the first wrap only slots `0..n` hold real samples; after it
        // `n` equals the buffer length, so the same slice covers both cases.
        let mut samples = self.buf[..n].to_vec();
        samples.sort_unstable();

        // The mean of `u64` samples always fits in `u64`; the fallback only
        // keeps the conversion total.
        let mean_ns = u64::try_from(self.total_ns / n as u128).unwrap_or(u64::MAX);

        LatencyStats {
            count: self.count,
            avg_us: mean_ns / 1000,
            p50_us: samples[n / 2] / 1000,
            p95_us: samples[(n as f64 * 0.95) as usize] / 1000,
            // Clamp so a rounded-up product can never index past the end.
            p99_us: samples[(n as f64 * 0.99).min((n - 1) as f64) as usize] / 1000,
            max_us: samples[n - 1] / 1000,
        }
    }
}

/// Latency summary produced from the recent-sample ring.
#[derive(Default)]
pub struct LatencyStats {
    /// Samples ever recorded (not limited to the ring window).
    pub count: u64,
    /// Mean latency over the window, in microseconds.
    pub avg_us: u64,
    /// Median latency over the window, in microseconds.
    pub p50_us: u64,
    /// 95th-percentile latency over the window, in microseconds.
    pub p95_us: u64,
    /// 99th-percentile latency over the window, in microseconds.
    pub p99_us: u64,
    /// Largest latency in the window, in microseconds.
    pub max_us: u64,
}
255
256fn format_hour_row(hb: &HourBreakdown) -> String {
261 let snap = hb.snapshot();
262 let mut out = String::with_capacity(24 * 10);
263 for (i, v) in snap.iter().enumerate() {
264 if i > 0 {
265 out.push(' ');
266 }
267 out.push_str(&format!("h{:02}={}", i, v));
268 }
269 out
270}
271
272impl GatewayMetrics {
273 pub fn new() -> Self {
274 Self {
275 start_time: Instant::now(),
276 total_connections: AtomicU64::new(0),
277 total_disconnections: AtomicU64::new(0),
278 rejected_connections: AtomicU64::new(0),
279 total_requests: AtomicU64::new(0),
280 total_request_errors: AtomicU64::new(0),
281 total_response_bytes: AtomicU64::new(0),
282 backend_reconnects: AtomicU64::new(0),
283 backend_reconnect_failures: AtomicU64::new(0),
284 last_reconnect_ms: AtomicU64::new(0),
285 backend_online: AtomicU64::new(1),
286 backend_pushes_received: AtomicU64::new(0),
287 client_pushes_sent: AtomicU64::new(0),
288 backend_pushes_cmd_quote: AtomicU64::new(0),
289 backend_pushes_cmd_trade_legacy: AtomicU64::new(0),
290 backend_pushes_cmd_trade_new: AtomicU64::new(0),
291 backend_pushes_cmd_msg_center: AtomicU64::new(0),
292 backend_pushes_cmd_other: AtomicU64::new(0),
293 backend_pushes_cmd_quote_by_hour: HourBreakdown::new(),
294 backend_pushes_cmd_trade_legacy_by_hour: HourBreakdown::new(),
295 backend_pushes_cmd_trade_new_by_hour: HourBreakdown::new(),
296 backend_pushes_cmd_msg_center_by_hour: HourBreakdown::new(),
297 qot_subscribe_ops: AtomicU64::new(0),
298 qot_unsubscribe_ops: AtomicU64::new(0),
299 cold_cache_wait_total: AtomicU64::new(0),
300 cold_cache_wait_hit: AtomicU64::new(0),
301 cold_cache_wait_timeout: AtomicU64::new(0),
302 resubscribe_ops: AtomicU64::new(0),
303 resubscribe_attempts: AtomicU64::new(0),
304 resubscribe_applied_keys: AtomicU64::new(0),
305 qot_push_dropped_total: AtomicU64::new(0),
307 qot_push_dropped_by_sub_type: [const { AtomicU64::new(0) }; 18],
308 keepalive_timeouts: AtomicU64::new(0),
309 latency_ring: RwLock::new(LatencyRing::new()),
310 }
311 }
312
313 pub fn record_latency_ns(&self, ns: u64) {
315 self.latency_ring.write().push(ns);
316 }
317
318 pub fn record_qot_push_dropped(&self, sub_type: i32) {
323 self.qot_push_dropped_total.fetch_add(1, Ordering::Relaxed);
324 let bucket = if (0..18).contains(&sub_type) {
325 sub_type as usize
326 } else {
327 0
328 };
329 self.qot_push_dropped_by_sub_type[bucket].fetch_add(1, Ordering::Relaxed);
330 }
331
332 pub fn qot_push_dropped_per_sub_type(&self) -> [u64; 18] {
335 let mut out = [0u64; 18];
336 for (i, slot) in self.qot_push_dropped_by_sub_type.iter().enumerate() {
337 out[i] = slot.load(Ordering::Relaxed);
338 }
339 out
340 }
341
342 pub fn latency_stats(&self) -> LatencyStats {
344 self.latency_ring.read().stats()
345 }
346
347 pub fn uptime_str(&self) -> String {
349 let elapsed = self.start_time.elapsed();
350 let secs = elapsed.as_secs();
351 let days = secs / 86400;
352 let hours = (secs % 86400) / 3600;
353 let mins = (secs % 3600) / 60;
354 let s = secs % 60;
355 if days > 0 {
356 format!("{days}d {hours}h {mins}m {s}s")
357 } else if hours > 0 {
358 format!("{hours}h {mins}m {s}s")
359 } else {
360 format!("{mins}m {s}s")
361 }
362 }
363
364 pub fn report(&self) -> String {
366 let lat = self.latency_stats();
367 let backend_status = if self.backend_online.load(Ordering::Relaxed) == 1 {
368 "ONLINE"
369 } else {
370 "OFFLINE"
371 };
372
373 let total_req = self.total_requests.load(Ordering::Relaxed);
374 let uptime_secs = self.start_time.elapsed().as_secs_f64();
375 let avg_rps = if uptime_secs > 0.0 {
376 total_req as f64 / uptime_secs
377 } else {
378 0.0
379 };
380
381 format!(
382 "=== Gateway Metrics ===\r\n\
383 Uptime: {uptime}\r\n\
384 \r\n\
385 [Connections]\r\n\
386 total_accepted: {total_conn}\r\n\
387 total_disconnected: {total_disconn}\r\n\
388 rejected (limit): {rejected}\r\n\
389 keepalive_timeouts: {ka_timeout}\r\n\
390 \r\n\
391 [Requests]\r\n\
392 total_requests: {total_req}\r\n\
393 total_errors: {total_err}\r\n\
394 avg_rps: {avg_rps:.1}\r\n\
395 response_bytes: {resp_bytes}\r\n\
396 \r\n\
397 [Latency (recent {lat_count} samples)]\r\n\
398 avg: {lat_avg}us p50: {lat_p50}us p95: {lat_p95}us p99: {lat_p99}us max: {lat_max}us\r\n\
399 \r\n\
400 [Backend]\r\n\
401 status: {backend_status}\r\n\
402 reconnects: {reconnects}\r\n\
403 reconnect_failures: {reconnect_fail}\r\n\
404 pushes_received: {push_recv}\r\n\
405 pushes_sent_to_clients: {push_sent}\r\n\
406 \r\n\
407 [Pushes by CMD (v1.4.83 §14)]\r\n\
408 cmd_6212_quote: {push_cmd_quote}\r\n\
409 cmd_4716_trade_legacy: {push_cmd_trade_legacy}\r\n\
410 cmd_14716_trade_new: {push_cmd_trade_new}\r\n\
411 cmd_5300_msg_center: {push_cmd_msg_center}\r\n\
412 cmd_other: {push_cmd_other}\r\n\
413 \r\n\
414 [Pushes by CMD × UTC hour (v1.4.84 §14)]\r\n\
415 cmd_14716_trade_new_hour_0..23: {hour_trade_new}\r\n\
416 cmd_6212_quote_hour_0..23: {hour_quote}\r\n\
417 cmd_4716_trade_legacy_hour_0..23: {hour_trade_legacy}\r\n\
418 cmd_5300_msg_center_hour_0..23: {hour_msg_center}\r\n\
419 \r\n\
420 [Subscriptions]\r\n\
421 subscribe_ops: {sub_ops}\r\n\
422 unsubscribe_ops: {unsub_ops}\r\n\
423 resubscribe_ops: {resub_ops}\r\n\
424 \r\n\
425 [Cold-cache wait (v1.4.110 §P3 #19)]\r\n\
426 total: {cc_total} hit: {cc_hit} timeout: {cc_timeout}\r\n",
427 uptime = self.uptime_str(),
428 total_conn = self.total_connections.load(Ordering::Relaxed),
429 total_disconn = self.total_disconnections.load(Ordering::Relaxed),
430 rejected = self.rejected_connections.load(Ordering::Relaxed),
431 ka_timeout = self.keepalive_timeouts.load(Ordering::Relaxed),
432 total_req = total_req,
433 total_err = self.total_request_errors.load(Ordering::Relaxed),
434 resp_bytes = self.total_response_bytes.load(Ordering::Relaxed),
435 lat_count = lat.count.min(LATENCY_RING_SIZE as u64),
436 lat_avg = lat.avg_us,
437 lat_p50 = lat.p50_us,
438 lat_p95 = lat.p95_us,
439 lat_p99 = lat.p99_us,
440 lat_max = lat.max_us,
441 reconnects = self.backend_reconnects.load(Ordering::Relaxed),
442 reconnect_fail = self.backend_reconnect_failures.load(Ordering::Relaxed),
443 push_recv = self.backend_pushes_received.load(Ordering::Relaxed),
444 push_sent = self.client_pushes_sent.load(Ordering::Relaxed),
445 push_cmd_quote = self.backend_pushes_cmd_quote.load(Ordering::Relaxed),
446 push_cmd_trade_legacy = self.backend_pushes_cmd_trade_legacy.load(Ordering::Relaxed),
447 push_cmd_trade_new = self.backend_pushes_cmd_trade_new.load(Ordering::Relaxed),
448 push_cmd_msg_center = self.backend_pushes_cmd_msg_center.load(Ordering::Relaxed),
449 push_cmd_other = self.backend_pushes_cmd_other.load(Ordering::Relaxed),
450 hour_trade_new = format_hour_row(&self.backend_pushes_cmd_trade_new_by_hour),
451 hour_quote = format_hour_row(&self.backend_pushes_cmd_quote_by_hour),
452 hour_trade_legacy = format_hour_row(&self.backend_pushes_cmd_trade_legacy_by_hour),
453 hour_msg_center = format_hour_row(&self.backend_pushes_cmd_msg_center_by_hour),
454 sub_ops = self.qot_subscribe_ops.load(Ordering::Relaxed),
455 unsub_ops = self.qot_unsubscribe_ops.load(Ordering::Relaxed),
456 resub_ops = self.resubscribe_ops.load(Ordering::Relaxed),
457 cc_total = self.cold_cache_wait_total.load(Ordering::Relaxed),
458 cc_hit = self.cold_cache_wait_hit.load(Ordering::Relaxed),
459 cc_timeout = self.cold_cache_wait_timeout.load(Ordering::Relaxed),
460 )
461 }
462}
463
464impl Default for GatewayMetrics {
465 fn default() -> Self {
466 Self::new()
467 }
468}
469
470fn render_hour_breakdown_prom(metric_name: &str, hb: &HourBreakdown) -> String {
475 let snap = hb.snapshot();
476 let mut out = String::with_capacity(24 * 60);
477 for (h, v) in snap.iter().enumerate() {
478 out.push_str(&format!("{}{{hour=\"{:02}\"}} {}\n", metric_name, h, v));
479 }
480 out
481}
482
483impl GatewayMetrics {
484 #[must_use]
497 pub fn render_prometheus(&self) -> String {
498 let mut s = String::with_capacity(8192);
499
500 s.push_str("# HELP futu_gateway_connections_total Total accepted client connections\n");
502 s.push_str("# TYPE futu_gateway_connections_total counter\n");
503 s.push_str(&format!(
504 "futu_gateway_connections_total {}\n",
505 self.total_connections.load(Ordering::Relaxed)
506 ));
507 s.push_str(
508 "# HELP futu_gateway_disconnections_total Total client disconnections\n# TYPE futu_gateway_disconnections_total counter\n",
509 );
510 s.push_str(&format!(
511 "futu_gateway_disconnections_total {}\n",
512 self.total_disconnections.load(Ordering::Relaxed)
513 ));
514 s.push_str(
515 "# HELP futu_gateway_rejected_connections_total Connections rejected (limit hit)\n# TYPE futu_gateway_rejected_connections_total counter\n",
516 );
517 s.push_str(&format!(
518 "futu_gateway_rejected_connections_total {}\n",
519 self.rejected_connections.load(Ordering::Relaxed)
520 ));
521 s.push_str(
522 "# HELP futu_gateway_keepalive_timeouts_total KeepAlive timeout disconnects\n# TYPE futu_gateway_keepalive_timeouts_total counter\n",
523 );
524 s.push_str(&format!(
525 "futu_gateway_keepalive_timeouts_total {}\n",
526 self.keepalive_timeouts.load(Ordering::Relaxed)
527 ));
528
529 s.push_str(
531 "# HELP futu_gateway_requests_total Total handled client requests\n# TYPE futu_gateway_requests_total counter\n",
532 );
533 s.push_str(&format!(
534 "futu_gateway_requests_total {}\n",
535 self.total_requests.load(Ordering::Relaxed)
536 ));
537 s.push_str(
538 "# HELP futu_gateway_request_errors_total Handler-returned-None or decryption errors\n# TYPE futu_gateway_request_errors_total counter\n",
539 );
540 s.push_str(&format!(
541 "futu_gateway_request_errors_total {}\n",
542 self.total_request_errors.load(Ordering::Relaxed)
543 ));
544 s.push_str(
545 "# HELP futu_gateway_response_bytes_total Cumulative response payload bytes\n# TYPE futu_gateway_response_bytes_total counter\n",
546 );
547 s.push_str(&format!(
548 "futu_gateway_response_bytes_total {}\n",
549 self.total_response_bytes.load(Ordering::Relaxed)
550 ));
551
552 s.push_str(
554 "# HELP futu_gateway_backend_online Backend connection state (1=online,0=offline)\n# TYPE futu_gateway_backend_online gauge\n",
555 );
556 s.push_str(&format!(
557 "futu_gateway_backend_online {}\n",
558 self.backend_online.load(Ordering::Relaxed)
559 ));
560 s.push_str(
561 "# HELP futu_gateway_backend_reconnects_total Backend reconnect attempts\n# TYPE futu_gateway_backend_reconnects_total counter\n",
562 );
563 s.push_str(&format!(
564 "futu_gateway_backend_reconnects_total {}\n",
565 self.backend_reconnects.load(Ordering::Relaxed)
566 ));
567 s.push_str(
568 "# HELP futu_gateway_backend_reconnect_failures_total Backend reconnect failures\n# TYPE futu_gateway_backend_reconnect_failures_total counter\n",
569 );
570 s.push_str(&format!(
571 "futu_gateway_backend_reconnect_failures_total {}\n",
572 self.backend_reconnect_failures.load(Ordering::Relaxed)
573 ));
574
575 s.push_str(
577 "# HELP futu_gateway_backend_pushes_received_total Pushes received from backend\n# TYPE futu_gateway_backend_pushes_received_total counter\n",
578 );
579 s.push_str(&format!(
580 "futu_gateway_backend_pushes_received_total {}\n",
581 self.backend_pushes_received.load(Ordering::Relaxed)
582 ));
583 s.push_str(
584 "# HELP futu_gateway_client_pushes_sent_total Pushes forwarded to clients\n# TYPE futu_gateway_client_pushes_sent_total counter\n",
585 );
586 s.push_str(&format!(
587 "futu_gateway_client_pushes_sent_total {}\n",
588 self.client_pushes_sent.load(Ordering::Relaxed)
589 ));
590
591 s.push_str(
594 "# HELP futu_gateway_backend_pushes_cmd_total Backend pushes by cmd_id (v1.4.83 §14)\n# TYPE futu_gateway_backend_pushes_cmd_total counter\n",
595 );
596 s.push_str(&format!(
597 "futu_gateway_backend_pushes_cmd_total{{cmd=\"6212_quote\"}} {}\n",
598 self.backend_pushes_cmd_quote.load(Ordering::Relaxed)
599 ));
600 s.push_str(&format!(
601 "futu_gateway_backend_pushes_cmd_total{{cmd=\"4716_trade_legacy\"}} {}\n",
602 self.backend_pushes_cmd_trade_legacy.load(Ordering::Relaxed)
603 ));
604 s.push_str(&format!(
605 "futu_gateway_backend_pushes_cmd_total{{cmd=\"14716_trade_new\"}} {}\n",
606 self.backend_pushes_cmd_trade_new.load(Ordering::Relaxed)
607 ));
608 s.push_str(&format!(
609 "futu_gateway_backend_pushes_cmd_total{{cmd=\"5300_msg_center\"}} {}\n",
610 self.backend_pushes_cmd_msg_center.load(Ordering::Relaxed)
611 ));
612 s.push_str(&format!(
613 "futu_gateway_backend_pushes_cmd_total{{cmd=\"other\"}} {}\n",
614 self.backend_pushes_cmd_other.load(Ordering::Relaxed)
615 ));
616
617 s.push_str(
619 "# HELP futu_gateway_backend_pushes_cmd_quote_by_hour Cmd 6212 quote pushes per UTC hour\n# TYPE futu_gateway_backend_pushes_cmd_quote_by_hour counter\n",
620 );
621 s.push_str(&render_hour_breakdown_prom(
622 "futu_gateway_backend_pushes_cmd_quote_by_hour",
623 &self.backend_pushes_cmd_quote_by_hour,
624 ));
625 s.push_str(
626 "# HELP futu_gateway_backend_pushes_cmd_trade_legacy_by_hour Cmd 4716 trade-legacy pushes per UTC hour\n# TYPE futu_gateway_backend_pushes_cmd_trade_legacy_by_hour counter\n",
627 );
628 s.push_str(&render_hour_breakdown_prom(
629 "futu_gateway_backend_pushes_cmd_trade_legacy_by_hour",
630 &self.backend_pushes_cmd_trade_legacy_by_hour,
631 ));
632 s.push_str(
633 "# HELP futu_gateway_backend_pushes_cmd_trade_new_by_hour Cmd 14716 trade-new pushes per UTC hour (v1.4.84 §14 tester subject)\n# TYPE futu_gateway_backend_pushes_cmd_trade_new_by_hour counter\n",
634 );
635 s.push_str(&render_hour_breakdown_prom(
636 "futu_gateway_backend_pushes_cmd_trade_new_by_hour",
637 &self.backend_pushes_cmd_trade_new_by_hour,
638 ));
639 s.push_str(
640 "# HELP futu_gateway_backend_pushes_cmd_msg_center_by_hour Cmd 5300 msg-center pushes per UTC hour\n# TYPE futu_gateway_backend_pushes_cmd_msg_center_by_hour counter\n",
641 );
642 s.push_str(&render_hour_breakdown_prom(
643 "futu_gateway_backend_pushes_cmd_msg_center_by_hour",
644 &self.backend_pushes_cmd_msg_center_by_hour,
645 ));
646
647 s.push_str(
649 "# HELP futu_gateway_qot_subscribe_ops_total Quote subscribe operations\n# TYPE futu_gateway_qot_subscribe_ops_total counter\n",
650 );
651 s.push_str(&format!(
652 "futu_gateway_qot_subscribe_ops_total {}\n",
653 self.qot_subscribe_ops.load(Ordering::Relaxed)
654 ));
655 s.push_str(
656 "# HELP futu_gateway_qot_unsubscribe_ops_total Quote unsubscribe operations\n# TYPE futu_gateway_qot_unsubscribe_ops_total counter\n",
657 );
658 s.push_str(&format!(
659 "futu_gateway_qot_unsubscribe_ops_total {}\n",
660 self.qot_unsubscribe_ops.load(Ordering::Relaxed)
661 ));
662 s.push_str(
663 "# HELP futu_gateway_resubscribe_ops_total Re-subscribe ops after reconnect (legacy, == resubscribe_applied_keys)\n# TYPE futu_gateway_resubscribe_ops_total counter\n",
664 );
665 s.push_str(&format!(
666 "futu_gateway_resubscribe_ops_total {}\n",
667 self.resubscribe_ops.load(Ordering::Relaxed)
668 ));
669 s.push_str(
671 "# HELP futu_gateway_resubscribe_attempts_total Re-subscribe trigger count (each reconnect/staleness loop +=1)\n# TYPE futu_gateway_resubscribe_attempts_total counter\n",
672 );
673 s.push_str(&format!(
674 "futu_gateway_resubscribe_attempts_total {}\n",
675 self.resubscribe_attempts.load(Ordering::Relaxed)
676 ));
677 s.push_str(
678 "# HELP futu_gateway_resubscribe_applied_keys_total Re-subscribe applied keys total (cache resolve OK + backend ack OK)\n# TYPE futu_gateway_resubscribe_applied_keys_total counter\n",
679 );
680 s.push_str(&format!(
681 "futu_gateway_resubscribe_applied_keys_total {}\n",
682 self.resubscribe_applied_keys.load(Ordering::Relaxed)
683 ));
684
685 s.push_str(
688 "# HELP futu_gateway_cold_cache_wait_total Cold-cache wait entries (cache miss + IsSub)\n# TYPE futu_gateway_cold_cache_wait_total counter\n",
689 );
690 s.push_str(&format!(
691 "futu_gateway_cold_cache_wait_total {}\n",
692 self.cold_cache_wait_total.load(Ordering::Relaxed)
693 ));
694 s.push_str(
695 "# HELP futu_gateway_cold_cache_wait_hit_total Cold-cache wait hits (push filled cache within timeout)\n# TYPE futu_gateway_cold_cache_wait_hit_total counter\n",
696 );
697 s.push_str(&format!(
698 "futu_gateway_cold_cache_wait_hit_total {}\n",
699 self.cold_cache_wait_hit.load(Ordering::Relaxed)
700 ));
701 s.push_str(
702 "# HELP futu_gateway_cold_cache_wait_timeout_total Cold-cache wait timeouts (3s elapsed, cache still miss)\n# TYPE futu_gateway_cold_cache_wait_timeout_total counter\n",
703 );
704 s.push_str(&format!(
705 "futu_gateway_cold_cache_wait_timeout_total {}\n",
706 self.cold_cache_wait_timeout.load(Ordering::Relaxed)
707 ));
708
709 let lat = self.latency_stats();
711 s.push_str(
712 "# HELP futu_gateway_request_latency_us Request latency percentiles (microseconds, recent ring)\n# TYPE futu_gateway_request_latency_us gauge\n",
713 );
714 s.push_str(&format!(
715 "futu_gateway_request_latency_us{{quantile=\"p50\"}} {}\n",
716 lat.p50_us
717 ));
718 s.push_str(&format!(
719 "futu_gateway_request_latency_us{{quantile=\"p95\"}} {}\n",
720 lat.p95_us
721 ));
722 s.push_str(&format!(
723 "futu_gateway_request_latency_us{{quantile=\"p99\"}} {}\n",
724 lat.p99_us
725 ));
726 s.push_str(&format!(
727 "futu_gateway_request_latency_us{{quantile=\"max\"}} {}\n",
728 lat.max_us
729 ));
730
731 s
732 }
733}
734
735pub fn install_prometheus_extension(metrics: Arc<GatewayMetrics>) {
749 futu_auth::metrics::register_global_renderer(move || metrics.render_prometheus());
750}
751
752#[cfg(test)]
753mod tests;