1use std::sync::Arc;
14
15use dashmap::DashMap;
16use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
17use tokio::net::TcpListener;
18use tokio::sync::watch;
19
20use crate::conn::ClientConn;
21use crate::metrics::GatewayMetrics;
22use crate::subscription::SubscriptionManager;
23
/// Callback invoked when an operator issues the `relogin` console command.
///
/// It is stored behind an [`Arc`] so that every telnet session task can hold
/// its own handle, and it is `Send + Sync` because those tasks may run on any
/// runtime worker thread.
pub type ReloginFn = Arc<dyn Fn() + Send + Sync>;
37
38pub struct TelnetServer {
40 listen_addr: String,
41 connections: Arc<DashMap<u64, ClientConn>>,
42 subscriptions: Option<Arc<SubscriptionManager>>,
43 metrics: Option<Arc<GatewayMetrics>>,
44 shutdown_tx: watch::Sender<bool>,
45 relogin_fn: Option<ReloginFn>,
48}
49
/// Upper bound on simultaneously open telnet sessions; a client that connects
/// while this many sessions are active is dropped immediately.
const MAX_TELNET_CONNECTIONS: usize = 5;
51const VERSION: &str = env!("CARGO_PKG_VERSION");
52
53impl TelnetServer {
54 pub fn new(
55 listen_addr: String,
56 connections: Arc<DashMap<u64, ClientConn>>,
57 subscriptions: Option<Arc<SubscriptionManager>>,
58 metrics: Option<Arc<GatewayMetrics>>,
59 shutdown_tx: watch::Sender<bool>,
60 ) -> Self {
61 Self {
62 listen_addr,
63 connections,
64 subscriptions,
65 metrics,
66 shutdown_tx,
67 relogin_fn: None,
68 }
69 }
70
71 pub fn with_relogin_fn(mut self, relogin_fn: ReloginFn) -> Self {
73 self.relogin_fn = Some(relogin_fn);
74 self
75 }
76
77 pub async fn run(&self) -> anyhow::Result<()> {
78 let listener = TcpListener::bind(&self.listen_addr).await?;
79 tracing::info!(addr = %self.listen_addr, "Telnet server listening");
80
81 let active = Arc::new(std::sync::atomic::AtomicUsize::new(0));
82
83 loop {
84 let (stream, peer_addr) = listener.accept().await?;
85 let count = active.load(std::sync::atomic::Ordering::Relaxed);
86 if count >= MAX_TELNET_CONNECTIONS {
87 tracing::warn!(peer = %peer_addr, "telnet max connections reached");
88 drop(stream);
89 continue;
90 }
91
92 active.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
93 let active_clone = Arc::clone(&active);
94 let connections = Arc::clone(&self.connections);
95 let subscriptions = self.subscriptions.clone();
96 let metrics = self.metrics.clone();
97 let shutdown_tx = self.shutdown_tx.clone();
98 let relogin_fn = self.relogin_fn.clone();
100
101 tokio::spawn(async move {
102 tracing::info!(peer = %peer_addr, "telnet client connected");
103 let (reader, mut writer) = stream.into_split();
104 let mut reader = BufReader::new(reader);
105
106 let _ = writer
107 .write_all(
108 b"FutuOpenD Rust Gateway Telnet Console\r\nType 'help' for commands.\r\n> ",
109 )
110 .await;
111
112 let mut line = String::new();
113 loop {
114 line.clear();
115 match reader.read_line(&mut line).await {
116 Ok(0) => break, Ok(_) => {}
118 Err(_) => break,
119 }
120
121 let cmd = line.trim().to_lowercase();
122 let response = match cmd.as_str() {
123 "help" => "Available commands:\r\n\
124 help — show this help\r\n\
125 ping — connectivity test\r\n\
126 version — show version\r\n\
127 show_subinfo — show subscription info\r\n\
128 show_conn — show active connections\r\n\
129 show_metrics — show gateway metrics\r\n\
130 set_loglevel — change log level (debug/info/warn/error)\r\n\
131 relogin — clear login_cache; next P1-D tick triggers relogin (v1.4.97)\r\n\
132 exit — shutdown gateway\r\n"
133 .to_string(),
134 "ping" => "pong\r\n".to_string(),
135 "version" => format!("futu-opend-rs v{VERSION}\r\n"),
136 "show_subinfo" => {
137 if let Some(ref subs) = subscriptions {
138 let total_quota = subs.get_total_used_quota();
139 format!("Total subscription quota used: {total_quota}/4000\r\n")
140 } else {
141 "Subscription manager not available\r\n".to_string()
142 }
143 }
144 "show_conn" => {
145 let count = connections.len();
146 let mut info = format!("Active connections: {count}\r\n");
147 for entry in connections.iter() {
148 let c = entry.value();
149 info += &format!(" conn_id={} state={:?}\r\n", c.conn_id, c.state);
150 }
151 info
152 }
153 "show_metrics" => {
154 if let Some(ref m) = metrics {
155 m.report()
156 } else {
157 "Metrics not available\r\n".to_string()
158 }
159 }
160 s if s.starts_with("set_loglevel") => {
161 let parts: Vec<&str> = s.split_whitespace().collect();
162 if parts.len() < 2 {
163 "Usage: set_loglevel <debug|info|warn|error>\r\n".to_string()
164 } else {
165 let level = parts[1];
168 tracing::info!(level, "log level change requested via telnet");
169 format!(
170 "Log level change to '{level}' noted (restart for full effect)\r\n"
171 )
172 }
173 }
174 "relogin" => {
175 if let Some(ref f) = relogin_fn {
184 tracing::info!(peer = %peer_addr,
185 "v1.4.97 P1-D-F: relogin requested via telnet");
186 f();
187 "Relogin triggered. Watch daemon log for \
188 \"P1-D ladder fired\" + \"relogin succeeded\" \
189 in next 30s tick.\r\n"
190 .to_string()
191 } else {
192 "Relogin callback not wired (daemon was started \
193 without bridge handle). Relogin requires daemon \
194 restart in this configuration.\r\n"
195 .to_string()
196 }
197 }
198 "exit" => {
199 let _ = writer.write_all(b"Shutting down...\r\n").await;
200 let _ = shutdown_tx.send(true);
201 break;
202 }
203 "" => String::new(),
204 _ => format!(
205 "Unknown command: '{cmd}'. Type 'help' for available commands.\r\n"
206 ),
207 };
208
209 if !response.is_empty() {
210 let _ = writer.write_all(response.as_bytes()).await;
211 }
212 let _ = writer.write_all(b"> ").await;
213 }
214
215 active_clone.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
216 tracing::info!(peer = %peer_addr, "telnet client disconnected");
217 });
218 }
219 }
220}