futu_backend/
heartbeat.rs1use std::sync::Arc;
13use std::time::Duration;
14
15use crate::conn::BackendConn;
16use crate::proto_internal::ft_conn_heart_beat::HeartBeatReq;
17
18pub const CMD_HEARTBEAT_PLATFORM: u16 = 6003;
20pub const CMD_HEARTBEAT_BROKER: u16 = 1003;
22
23pub fn start_heartbeat(conn: Arc<BackendConn>, interval: Duration) -> tokio::task::JoinHandle<()> {
25 start_heartbeat_with_cmd(conn, interval, CMD_HEARTBEAT_PLATFORM, "platform")
26}
27
28pub fn start_broker_heartbeat(
30 conn: Arc<BackendConn>,
31 interval: Duration,
32) -> tokio::task::JoinHandle<()> {
33 start_heartbeat_with_cmd(conn, interval, CMD_HEARTBEAT_BROKER, "broker")
34}
35
36fn start_heartbeat_with_cmd(
38 conn: Arc<BackendConn>,
39 interval: Duration,
40 cmd_id: u16,
41 channel_name: &'static str,
42) -> tokio::task::JoinHandle<()> {
43 tokio::spawn(async move {
44 let mut ticker = tokio::time::interval(interval);
45 ticker.tick().await; let mut fail_count = 0u32;
47 const MAX_FAILURES: u32 = 3;
48
49 loop {
50 ticker.tick().await;
51
52 let req = HeartBeatReq {
53 pre_time_delay: Some(0),
54 };
55 let body = prost::Message::encode_to_vec(&req);
56
57 match conn.request(cmd_id, body).await {
58 Ok(resp) => {
59 tracing::trace!(
60 channel = channel_name,
61 cmd_id,
62 body_len = resp.body.len(),
63 "backend heartbeat ok"
64 );
65 fail_count = 0; }
67 Err(e) => {
68 fail_count += 1;
69 tracing::warn!(
70 channel = channel_name,
71 cmd_id,
72 error = %e,
73 fail_count,
74 max = MAX_FAILURES,
75 "backend heartbeat failed"
76 );
77 if fail_count >= MAX_FAILURES {
78 tracing::error!(
79 channel = channel_name,
80 "heartbeat failed {MAX_FAILURES} times, connection likely lost"
81 );
82 break;
83 }
84 }
85 }
86 }
87 })
88}