1use anyhow::Result;
2use clap::Parser;
3
4use futu_gateway::bridge::{GatewayBridge, GatewayConfig};
5use futu_server::listener::{ApiServer, ServerConfig};
6use futu_server::ws_listener::WsServer;
7
8#[derive(Parser, Debug)]
10#[command(name = "futu-opend", version, about = "FutuOpenD Rust Gateway")]
11struct Args {
12 #[arg(long)]
14 cfg_file: Option<String>,
15
16 #[arg(short = 'i', long)]
18 ip: Option<String>,
19
20 #[arg(short = 'p', long)]
22 port: Option<u16>,
23
24 #[arg(long)]
26 login_account: Option<String>,
27
28 #[arg(long)]
30 login_pwd: Option<String>,
31
32 #[arg(long)]
34 login_pwd_md5: Option<String>,
35
36 #[arg(long)]
38 login_region: Option<String>,
39
40 #[arg(long)]
42 auth_server: Option<String>,
43
44 #[arg(long)]
46 log_level: Option<String>,
47
48 #[arg(long)]
50 websocket_port: Option<u16>,
51
52 #[arg(long)]
54 telnet_port: Option<u16>,
55
56 #[arg(long)]
58 rest_port: Option<u16>,
59
60 #[arg(long)]
62 grpc_port: Option<u16>,
63
64 #[arg(long)]
66 rsa_private_key: Option<String>,
67
68 #[arg(long)]
70 json_log: bool,
71
72 #[arg(long)]
74 lang: Option<String>,
75
76 #[arg(long)]
80 rest_keys_file: Option<std::path::PathBuf>,
81
82 #[arg(long)]
86 grpc_keys_file: Option<std::path::PathBuf>,
87
88 #[arg(long)]
96 ws_keys_file: Option<std::path::PathBuf>,
97
98 #[arg(long)]
106 audit_log: Option<std::path::PathBuf>,
107}
108
/// Settings deserialized from the XML config file by `load_xml_config`.
///
/// Every field is optional; `merge_config` overlays these values with the
/// command-line arguments and then applies built-in defaults.
#[derive(Debug, Default, serde::Deserialize)]
struct XmlConfig {
    /// Login account.
    login_account: Option<String>,
    /// Login password.
    login_pwd: Option<String>,
    /// MD5 digest of the login password.
    login_pwd_md5: Option<String>,
    /// Login region.
    login_region: Option<String>,
    /// Listen address.
    ip: Option<String>,
    /// Main API port; the element may also be spelled `api_port`.
    #[serde(alias = "api_port")]
    port: Option<u16>,
    /// WebSocket server port.
    websocket_port: Option<u16>,
    /// Telnet server port.
    telnet_port: Option<u16>,
    /// REST server port.
    rest_port: Option<u16>,
    /// gRPC server port.
    grpc_port: Option<u16>,
    /// Path to the RSA private key file (the file is read in `merge_config`).
    rsa_private_key: Option<String>,
    /// Language code.
    lang: Option<String>,
    /// Log level.
    log_level: Option<String>,
}
135
136fn load_xml_config(path: &str) -> Result<XmlConfig> {
138 let content = std::fs::read_to_string(path)?;
139 let config: XmlConfig = quick_xml::de::from_str(&content)?;
140 tracing::info!(path, "loaded XML config");
141 Ok(config)
142}
143
/// Effective configuration produced by `merge_config`: command-line values
/// first, then XML config values, then built-in defaults.
struct RuntimeConfig {
    /// Listen address shared by all servers (default `0.0.0.0`).
    ip: String,
    /// Main API port (default `11111`).
    port: u16,
    /// Login account; `None` means no account was supplied.
    login_account: Option<String>,
    /// Login password; `None` means no password was supplied.
    login_pwd: Option<String>,
    /// MD5 digest of the login password. Carried through but not read anywhere
    /// in this file, hence the `dead_code` expectation.
    #[expect(dead_code)]
    login_pwd_md5: Option<String>,
    /// Login region (default `gz`).
    login_region: String,
    /// Auth server URL (default `https://auth.futunn.com`); command line only.
    auth_server: String,
    /// Log level (default `info`).
    log_level: String,
    /// WebSocket server port; `None` leaves the server disabled.
    websocket_port: Option<u16>,
    /// Telnet server port; `None` leaves the server disabled.
    telnet_port: Option<u16>,
    /// REST server port; `None` leaves the server disabled.
    rest_port: Option<u16>,
    /// gRPC server port; `None` leaves the server disabled.
    grpc_port: Option<u16>,
    /// Contents of the RSA private key file (not its path); `None` when no key
    /// was configured or the file could not be read.
    rsa_private_key: Option<String>,
    /// Whether JSON logging was requested.
    json_log: bool,
    /// Language code (default `chs`).
    lang: String,
}
163
164fn merge_config(args: Args) -> Result<RuntimeConfig> {
166 let xml = if let Some(ref path) = args.cfg_file {
167 load_xml_config(path).unwrap_or_else(|e| {
168 eprintln!("warning: failed to load config file {path}: {e}");
169 XmlConfig::default()
170 })
171 } else {
172 let exe_dir = std::env::current_exe()
174 .ok()
175 .and_then(|p| p.parent().map(|d| d.to_path_buf()));
176 let auto_path = exe_dir.map(|d| d.join("FutuOpenD.xml"));
177 if let Some(ref path) = auto_path {
178 if path.exists() {
179 load_xml_config(&path.to_string_lossy()).unwrap_or_default()
180 } else {
181 XmlConfig::default()
182 }
183 } else {
184 XmlConfig::default()
185 }
186 };
187
188 Ok(RuntimeConfig {
189 ip: args.ip.or(xml.ip).unwrap_or_else(|| "0.0.0.0".to_string()),
190 port: args.port.or(xml.port).unwrap_or(11111),
191 login_account: args.login_account.or(xml.login_account),
192 login_pwd: args.login_pwd.or(xml.login_pwd),
193 login_pwd_md5: args.login_pwd_md5.or(xml.login_pwd_md5),
194 login_region: args
195 .login_region
196 .or(xml.login_region)
197 .unwrap_or_else(|| "gz".to_string()),
198 auth_server: args
199 .auth_server
200 .unwrap_or_else(|| "https://auth.futunn.com".to_string()),
201 log_level: args
202 .log_level
203 .or(xml.log_level)
204 .unwrap_or_else(|| "info".to_string()),
205 websocket_port: args.websocket_port.or(xml.websocket_port),
206 telnet_port: args.telnet_port.or(xml.telnet_port),
207 rest_port: args.rest_port.or(xml.rest_port),
208 grpc_port: args.grpc_port.or(xml.grpc_port),
209 rsa_private_key: {
210 let key_path = args.rsa_private_key.or(xml.rsa_private_key);
211 if let Some(ref path) = key_path {
212 match std::fs::read_to_string(path) {
213 Ok(pem) => {
214 eprintln!("loaded RSA private key from {path}");
215 Some(pem)
216 }
217 Err(e) => {
218 eprintln!("warning: failed to load RSA private key from {path}: {e}");
219 None
220 }
221 }
222 } else {
223 None
224 }
225 },
226 json_log: args.json_log,
227 lang: args.lang.or(xml.lang).unwrap_or_else(|| "chs".to_string()),
228 })
229}
230
231#[tokio::main]
232async fn main() -> Result<()> {
233 let args = Args::parse();
234 let rest_keys_file = args.rest_keys_file.clone();
235 let ws_keys_file = args.ws_keys_file.clone();
236 let grpc_keys_file = args.grpc_keys_file.clone();
237 let audit_log = args.audit_log.clone();
238 let config = merge_config(args)?;
239
240 let _audit_guard = if config.json_log {
243 if audit_log.is_some() {
244 eprintln!("warning: --audit-log is ignored when --json-log is set (main subscriber already JSON)");
245 }
246 futu_core::log::init_json_logging_with_level(&config.log_level);
247 None
248 } else {
249 match futu_core::log::init_logging_with_audit(&config.log_level, audit_log.as_deref()) {
250 Ok(guard) => {
251 if let (Some(path), Some(_)) = (audit_log.as_ref(), guard.as_ref()) {
252 tracing::info!(
253 path = %path.display(),
254 "audit JSONL logger enabled (target=futu_audit → file)"
255 );
256 }
257 guard
258 }
259 Err(e) => {
260 eprintln!("warning: failed to init audit log: {e}");
261 futu_core::log::init_logging_with_level(&config.log_level);
262 None
263 }
264 }
265 };
266
267 futu_auth::metrics::install(std::sync::Arc::new(futu_auth::MetricsRegistry::default()));
270
271 let shared_counters = std::sync::Arc::new(futu_auth::RuntimeCounters::new());
275
276 let listen_addr = format!("{}:{}", config.ip, config.port);
277 tracing::info!(addr = %listen_addr, "starting FutuOpenD Rust Gateway");
278
279 let mut bridge = GatewayBridge::new();
281 let mut push_receiver = None;
282
283 let password = config.login_pwd.clone();
285
286 if let (Some(account), Some(password)) = (&config.login_account, &password) {
287 let device_id = {
289 let hash = format!(
290 "{:x}",
291 md5::compute(format!("futu-opend-rs-{}", account).as_bytes())
292 );
293 hash[..16].to_string()
294 };
295 tracing::info!(account = %account, device_id = %device_id, "login credentials");
296 let app_lang = match config.lang.to_lowercase().as_str() {
297 "chs" => 0, "cht" => 1, "en" => 2, _ => 0, };
302 let gw_config = GatewayConfig {
303 auth_server: config.auth_server.clone(),
304 account: account.clone(),
305 password: password.clone(),
306 region: config.login_region.clone(),
307 listen_addr: listen_addr.clone(),
308 device_id,
309 app_lang,
310 };
311
312 match bridge.initialize(&gw_config, None).await {
313 Ok(push_rx) => {
314 push_receiver = Some(push_rx);
315 }
316 Err(e) => {
317 tracing::error!(error = %e, "gateway initialization failed, starting in offline mode");
318 }
319 }
320 } else {
321 tracing::warn!("no login credentials provided, starting in offline mode");
322 tracing::warn!("use --login-account and --login-pwd to connect to backend");
323 }
324
325 let user_id = bridge
327 .login_cache
328 .get_login_state()
329 .map(|s| s.user_id as u64)
330 .unwrap_or(0);
331
332 let server_config = ServerConfig {
333 listen_addr: listen_addr.clone(),
334 server_ver: 1000,
335 login_user_id: user_id,
336 keepalive_interval: 10,
337 rsa_private_key: config.rsa_private_key.clone(),
338 };
339 if server_config.rsa_private_key.is_some() {
340 tracing::info!("RSA encryption enabled for InitConnect");
341 }
342 let mut server = ApiServer::new(server_config.clone());
343 server.set_metrics(std::sync::Arc::clone(&bridge.metrics));
344 server.set_subscriptions(std::sync::Arc::clone(&bridge.subscriptions));
345
346 bridge.register_handlers(&server);
348
349 let ws_broadcaster = std::sync::Arc::new(futu_rest::ws::WsBroadcaster::new(1024));
351 let grpc_broadcaster = std::sync::Arc::new(futu_grpc::server::GrpcPushBroadcaster::new(1024));
352
353 if let Some(push_rx) = push_receiver {
357 let sinks: Vec<std::sync::Arc<dyn futu_server::push::ExternalPushSink>> = vec![
358 std::sync::Arc::clone(&ws_broadcaster) as _,
359 std::sync::Arc::clone(&grpc_broadcaster) as _,
360 ];
361 bridge.start_push_dispatcher(&server, push_rx, sinks);
362 tracing::info!("push dispatcher started (with WebSocket + gRPC broadcast)");
363 }
364
365 let ws_handle = if let Some(ws_port) = config.websocket_port {
367 let ws_addr = format!("{}:{}", config.ip, ws_port);
368 let ws_key_store = match &ws_keys_file {
371 Some(path) => match futu_auth::KeyStore::load(path) {
372 Ok(ks) => {
373 tracing::info!(
374 path = %path.display(),
375 keys_loaded = ks.len(),
376 "WS keys file loaded (Bearer/?token auth enabled)"
377 );
378 Some(std::sync::Arc::new(ks))
379 }
380 Err(e) => {
381 tracing::error!(error = %e, "failed to load WS keys file; continuing WITHOUT auth");
382 None
383 }
384 },
385 None => None,
386 };
387 let ws_counters = std::sync::Arc::clone(&shared_counters);
388 let ws_server = WsServer::with_auth(
389 ws_addr.clone(),
390 server_config.clone(),
391 std::sync::Arc::clone(server.connections()),
392 std::sync::Arc::clone(server.router()),
393 Some(std::sync::Arc::clone(&bridge.subscriptions)),
394 ws_key_store,
395 Some(ws_counters),
396 );
397 tracing::info!(addr = %ws_addr, "starting WebSocket server");
398 Some(tokio::spawn(async move {
399 if let Err(e) = ws_server.run().await {
400 tracing::error!(error = %e, "WebSocket server error");
401 }
402 }))
403 } else {
404 None
405 };
406
407 let rest_handle = if let Some(rest_port) = config.rest_port {
409 let rest_addr = format!("{}:{}", config.ip, rest_port);
410 let router = std::sync::Arc::clone(server.router());
411 let broadcaster = std::sync::Arc::clone(&ws_broadcaster);
412 let rest_key_store = match &rest_keys_file {
413 Some(path) => match futu_auth::KeyStore::load(path) {
414 Ok(ks) => {
415 tracing::info!(
416 path = %path.display(),
417 keys_loaded = ks.len(),
418 "REST keys file loaded (Bearer auth enabled)"
419 );
420 std::sync::Arc::new(ks)
421 }
422 Err(e) => {
423 tracing::error!(error = %e, "failed to load REST keys file; continuing WITHOUT auth");
424 std::sync::Arc::new(futu_auth::KeyStore::empty())
425 }
426 },
427 None => std::sync::Arc::new(futu_auth::KeyStore::empty()),
428 };
429 tracing::info!(addr = %rest_addr, "starting REST API server (WebSocket: /ws)");
430
431 #[cfg(unix)]
433 {
434 let ks = std::sync::Arc::clone(&rest_key_store);
435 if ks.is_configured() {
436 tokio::spawn(async move {
437 use tokio::signal::unix::{signal, SignalKind};
438 let mut sig = match signal(SignalKind::hangup()) {
439 Ok(s) => s,
440 Err(e) => {
441 tracing::error!(error = %e, "SIGHUP install failed (REST)");
442 return;
443 }
444 };
445 tracing::info!("REST keys: SIGHUP handler installed");
446 while sig.recv().await.is_some() {
447 match ks.reload() {
448 Ok(()) => tracing::warn!(
449 keys_loaded = ks.len(),
450 "REST keys reloaded on SIGHUP"
451 ),
452 Err(e) => {
453 tracing::error!(error = %e, "REST keys reload failed")
454 }
455 }
456 }
457 });
458 }
459 }
460
461 let rest_counters = std::sync::Arc::clone(&shared_counters);
462 Some(tokio::spawn(async move {
463 if let Err(e) = futu_rest::server::start_with_auth(
464 &rest_addr,
465 router,
466 broadcaster,
467 rest_key_store,
468 rest_counters,
469 )
470 .await
471 {
472 tracing::error!(error = %e, "REST API server error");
473 }
474 }))
475 } else {
476 None
477 };
478
479 let grpc_handle = if let Some(grpc_port) = config.grpc_port {
481 let grpc_addr = format!("{}:{}", config.ip, grpc_port);
482 let router = std::sync::Arc::clone(server.router());
483 let broadcaster = std::sync::Arc::clone(&grpc_broadcaster);
484 let grpc_key_store = match &grpc_keys_file {
485 Some(path) => match futu_auth::KeyStore::load(path) {
486 Ok(ks) => {
487 tracing::info!(
488 path = %path.display(),
489 keys_loaded = ks.len(),
490 "gRPC keys file loaded (Bearer auth enabled)"
491 );
492 std::sync::Arc::new(ks)
493 }
494 Err(e) => {
495 tracing::error!(error = %e, "failed to load gRPC keys file; continuing WITHOUT auth");
496 std::sync::Arc::new(futu_auth::KeyStore::empty())
497 }
498 },
499 None => std::sync::Arc::new(futu_auth::KeyStore::empty()),
500 };
501 tracing::info!(addr = %grpc_addr, "starting gRPC server (SubscribePush: streaming)");
502
503 #[cfg(unix)]
505 {
506 let ks = std::sync::Arc::clone(&grpc_key_store);
507 if ks.is_configured() {
508 tokio::spawn(async move {
509 use tokio::signal::unix::{signal, SignalKind};
510 let mut sig = match signal(SignalKind::hangup()) {
511 Ok(s) => s,
512 Err(e) => {
513 tracing::error!(error = %e, "SIGHUP install failed (gRPC)");
514 return;
515 }
516 };
517 tracing::info!("gRPC keys: SIGHUP handler installed");
518 while sig.recv().await.is_some() {
519 match ks.reload() {
520 Ok(()) => tracing::warn!(
521 keys_loaded = ks.len(),
522 "gRPC keys reloaded on SIGHUP"
523 ),
524 Err(e) => {
525 tracing::error!(error = %e, "gRPC keys reload failed")
526 }
527 }
528 }
529 });
530 }
531 }
532
533 let grpc_counters = std::sync::Arc::clone(&shared_counters);
534 Some(tokio::spawn(async move {
535 if let Err(e) = futu_grpc::server::start_with_auth(
536 &grpc_addr,
537 router,
538 broadcaster,
539 grpc_key_store,
540 grpc_counters,
541 )
542 .await
543 {
544 tracing::error!(error = %e, "gRPC server error");
545 }
546 }))
547 } else {
548 None
549 };
550
551 let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
553 let telnet_handle = if let Some(telnet_port) = config.telnet_port {
554 let telnet_addr = format!("{}:{}", config.ip, telnet_port);
555 let telnet_server = futu_server::telnet::TelnetServer::new(
556 telnet_addr.clone(),
557 std::sync::Arc::clone(server.connections()),
558 Some(std::sync::Arc::clone(&bridge.subscriptions)),
559 Some(std::sync::Arc::clone(server.metrics())),
560 shutdown_tx,
561 );
562 tracing::info!(addr = %telnet_addr, "starting Telnet server");
563 Some(tokio::spawn(async move {
564 if let Err(e) = telnet_server.run().await {
565 tracing::error!(error = %e, "Telnet server error");
566 }
567 }))
568 } else {
569 None
570 };
571
572 tracing::info!("gateway ready, accepting connections on {listen_addr}");
573 tracing::info!("press Ctrl+C to exit");
574
575 tokio::select! {
577 result = server.run() => {
578 if let Err(e) = result {
579 tracing::error!(error = %e, "API server error");
580 }
581 }
582 _ = tokio::signal::ctrl_c() => {
583 tracing::info!("received Ctrl+C, shutting down gracefully...");
584 }
585 _ = async {
586 while shutdown_rx.changed().await.is_ok() {
587 if *shutdown_rx.borrow() {
588 break;
589 }
590 }
591 } => {
592 tracing::info!("shutdown requested via telnet");
593 }
594 }
595
596 if let Some(handle) = ws_handle {
598 handle.abort();
599 }
600 if let Some(handle) = rest_handle {
601 handle.abort();
602 }
603 if let Some(handle) = grpc_handle {
604 handle.abort();
605 }
606 if let Some(handle) = telnet_handle {
607 handle.abort();
608 }
609
610 tracing::info!("gateway stopped");
611 Ok(())
612}