Skip to main content

futu_server/
telnet.rs

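// Telnet management interface — for operations and debugging
//
// Supported commands:
//   help          — show help
//   ping          — connectivity test
//   version       — version information
//   show_subinfo  — show subscription status
//   show_conn     — show active connections
//   show_metrics  — show gateway metrics
//   set_loglevel  — set the log level
//   relogin       — clear login_cache; the next P1-D tick triggers a relogin
//   exit          — shut down the gateway
//
// C++ alignment: APIServer_Telnet.cpp (at most 5 concurrent connections)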
12
13use std::sync::Arc;
14
15use dashmap::DashMap;
16use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
17use tokio::net::TcpListener;
18use tokio::sync::watch;
19
20use crate::conn::ClientConn;
21use crate::metrics::GatewayMetrics;
22use crate::subscription::SubscriptionManager;
23
/// Callback run by the telnet `relogin` command (v1.4.97 P1-D-F).
///
/// The command name follows the C++ `GTWCmd_ReLogin`
/// (FTGateway/FTGTW_Define_Key.h:5 + GTWCmdAndPushReply.cpp:1780-1799); no new
/// name such as `force_reconnect_backend` is introduced (per CLAUDE.md
/// pitfall #51, "aligning with C++ = subtraction").
///
/// Per the original design notes (the wiring is not visible in this file):
/// `main.rs` installs a closure that clears `bridge.caches.login_cache`
/// (`LoginCache::clear`), so the next 30s P1-D health tick observes
/// `qot_logined=false` and triggers an AuthRefresher relogin through the
/// existing v1.4.92 P1-D ladder path.
///
/// A shared `Arc<dyn Fn>` closure is used instead of a dedicated trait: the
/// callback needs no mutable state, and the design notes state it is
/// idempotent when several telnet clients invoke it concurrently (clearing an
/// already-cleared cache is harmless).
pub type ReloginFn = Arc<dyn Fn() + Send + Sync + 'static>;
37
/// Telnet management service.
///
/// Bundles the listen address with the shared gateway state that the console
/// commands implemented in `run` read or act on.
pub struct TelnetServer {
    /// Address handed to `TcpListener::bind` when the server starts.
    listen_addr: String,
    /// Connection table; the `show_conn` command prints its size and entries.
    connections: Arc<DashMap<u64, ClientConn>>,
    /// Subscription manager queried by `show_subinfo`; `None` makes that
    /// command report that it is unavailable.
    subscriptions: Option<Arc<SubscriptionManager>>,
    /// Metrics source for `show_metrics`; `None` makes that command report
    /// that metrics are unavailable.
    metrics: Option<Arc<GatewayMetrics>>,
    /// Watch channel on which the `exit` command sends `true`.
    shutdown_tx: watch::Sender<bool>,
    /// v1.4.97 P1-D-F: optional relogin callback (None = command unavailable
    /// with helpful error msg).
    relogin_fn: Option<ReloginFn>,
}
49
/// Upper bound on simultaneously served telnet sessions; connections accepted
/// while this many sessions are active are dropped immediately.
const MAX_TELNET_CONNECTIONS: usize = 5;
/// Crate version taken from Cargo at compile time; printed by the `version`
/// command.
const VERSION: &str = env!("CARGO_PKG_VERSION");
52
53impl TelnetServer {
54    pub fn new(
55        listen_addr: String,
56        connections: Arc<DashMap<u64, ClientConn>>,
57        subscriptions: Option<Arc<SubscriptionManager>>,
58        metrics: Option<Arc<GatewayMetrics>>,
59        shutdown_tx: watch::Sender<bool>,
60    ) -> Self {
61        Self {
62            listen_addr,
63            connections,
64            subscriptions,
65            metrics,
66            shutdown_tx,
67            relogin_fn: None,
68        }
69    }
70
71    /// v1.4.97 P1-D-F: builder-style setter for relogin callback.
72    pub fn with_relogin_fn(mut self, relogin_fn: ReloginFn) -> Self {
73        self.relogin_fn = Some(relogin_fn);
74        self
75    }
76
77    pub async fn run(&self) -> anyhow::Result<()> {
78        let listener = TcpListener::bind(&self.listen_addr).await?;
79        tracing::info!(addr = %self.listen_addr, "Telnet server listening");
80
81        let active = Arc::new(std::sync::atomic::AtomicUsize::new(0));
82
83        loop {
84            let (stream, peer_addr) = listener.accept().await?;
85            let count = active.load(std::sync::atomic::Ordering::Relaxed);
86            if count >= MAX_TELNET_CONNECTIONS {
87                tracing::warn!(peer = %peer_addr, "telnet max connections reached");
88                drop(stream);
89                continue;
90            }
91
92            active.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
93            let active_clone = Arc::clone(&active);
94            let connections = Arc::clone(&self.connections);
95            let subscriptions = self.subscriptions.clone();
96            let metrics = self.metrics.clone();
97            let shutdown_tx = self.shutdown_tx.clone();
98            // v1.4.97 P1-D-F: clone relogin callback into spawned task.
99            let relogin_fn = self.relogin_fn.clone();
100
101            tokio::spawn(async move {
102                tracing::info!(peer = %peer_addr, "telnet client connected");
103                let (reader, mut writer) = stream.into_split();
104                let mut reader = BufReader::new(reader);
105
106                let _ = writer
107                    .write_all(
108                        b"FutuOpenD Rust Gateway Telnet Console\r\nType 'help' for commands.\r\n> ",
109                    )
110                    .await;
111
112                let mut line = String::new();
113                loop {
114                    line.clear();
115                    match reader.read_line(&mut line).await {
116                        Ok(0) => break, // EOF
117                        Ok(_) => {}
118                        Err(_) => break,
119                    }
120
121                    let cmd = line.trim().to_lowercase();
122                    let response = match cmd.as_str() {
123                        "help" => "Available commands:\r\n\
124                             help           — show this help\r\n\
125                             ping           — connectivity test\r\n\
126                             version        — show version\r\n\
127                             show_subinfo   — show subscription info\r\n\
128                             show_conn      — show active connections\r\n\
129                             show_metrics   — show gateway metrics\r\n\
130                             set_loglevel   — change log level (debug/info/warn/error)\r\n\
131                             relogin        — clear login_cache; next P1-D tick triggers relogin (v1.4.97)\r\n\
132                             exit           — shutdown gateway\r\n"
133                            .to_string(),
134                        "ping" => "pong\r\n".to_string(),
135                        "version" => format!("futu-opend-rs v{VERSION}\r\n"),
136                        "show_subinfo" => {
137                            if let Some(ref subs) = subscriptions {
138                                let total_quota = subs.get_total_used_quota();
139                                format!("Total subscription quota used: {total_quota}/4000\r\n")
140                            } else {
141                                "Subscription manager not available\r\n".to_string()
142                            }
143                        }
144                        "show_conn" => {
145                            let count = connections.len();
146                            let mut info = format!("Active connections: {count}\r\n");
147                            for entry in connections.iter() {
148                                let c = entry.value();
149                                info += &format!("  conn_id={} state={:?}\r\n", c.conn_id, c.state);
150                            }
151                            info
152                        }
153                        "show_metrics" => {
154                            if let Some(ref m) = metrics {
155                                m.report()
156                            } else {
157                                "Metrics not available\r\n".to_string()
158                            }
159                        }
160                        s if s.starts_with("set_loglevel") => {
161                            let parts: Vec<&str> = s.split_whitespace().collect();
162                            if parts.len() < 2 {
163                                "Usage: set_loglevel <debug|info|warn|error>\r\n".to_string()
164                            } else {
165                                // tracing 动态级别调整需要 reload handle,
166                                // 简化实现:只记录意图
167                                let level = parts[1];
168                                tracing::info!(level, "log level change requested via telnet");
169                                format!(
170                                    "Log level change to '{level}' noted (restart for full effect)\r\n"
171                                )
172                            }
173                        }
174                        "relogin" => {
175                            // v1.4.97 P1-D-F: align with C++ GTWCmd_ReLogin
176                            // (FTGateway/FTGTW_Define_Key.h:5).
177                            //
178                            // Clear login_cache → next 30s health-loop tick
179                            // sees qot_logined=false → triggers AuthRefresher
180                            // relogin via existing v1.4.92 P1-D ladder path.
181                            // Tester: wait up to ~30s, see daemon log
182                            // "P1-D loop alive" + "relogin succeeded".
183                            if let Some(ref f) = relogin_fn {
184                                tracing::info!(peer = %peer_addr,
185                                    "v1.4.97 P1-D-F: relogin requested via telnet");
186                                f();
187                                "Relogin triggered. Watch daemon log for \
188                                 \"P1-D ladder fired\" + \"relogin succeeded\" \
189                                 in next 30s tick.\r\n"
190                                    .to_string()
191                            } else {
192                                "Relogin callback not wired (daemon was started \
193                                 without bridge handle). Relogin requires daemon \
194                                 restart in this configuration.\r\n"
195                                    .to_string()
196                            }
197                        }
198                        "exit" => {
199                            let _ = writer.write_all(b"Shutting down...\r\n").await;
200                            let _ = shutdown_tx.send(true);
201                            break;
202                        }
203                        "" => String::new(),
204                        _ => format!(
205                            "Unknown command: '{cmd}'. Type 'help' for available commands.\r\n"
206                        ),
207                    };
208
209                    if !response.is_empty() {
210                        let _ = writer.write_all(response.as_bytes()).await;
211                    }
212                    let _ = writer.write_all(b"> ").await;
213                }
214
215                active_clone.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
216                tracing::info!(peer = %peer_addr, "telnet client disconnected");
217            });
218        }
219    }
220}