futu_opend/
main.rs

1use anyhow::Result;
2use clap::Parser;
3
4use futu_gateway::bridge::{GatewayBridge, GatewayConfig};
5use futu_server::listener::{ApiServer, ServerConfig};
6use futu_server::ws_listener::WsServer;
7
8/// FutuOpenD Rust Gateway — 完全替代 C++ OpenD
9#[derive(Parser, Debug)]
10#[command(name = "futu-opend", version, about = "FutuOpenD Rust Gateway")]
11struct Args {
12    /// XML 配置文件路径 (兼容 C++ FutuOpenD.xml)
13    #[arg(long)]
14    cfg_file: Option<String>,
15
16    /// API 服务监听地址
17    #[arg(short = 'i', long)]
18    ip: Option<String>,
19
20    /// API 服务监听端口
21    #[arg(short = 'p', long)]
22    port: Option<u16>,
23
24    /// 登录账号
25    #[arg(long)]
26    login_account: Option<String>,
27
28    /// 登录密码明文
29    #[arg(long)]
30    login_pwd: Option<String>,
31
32    /// 登录密码 MD5 (32 位小写 hex)
33    #[arg(long)]
34    login_pwd_md5: Option<String>,
35
36    /// 后端连接区域 (gz/sh/hk)
37    #[arg(long)]
38    login_region: Option<String>,
39
40    /// 认证服务器 URL
41    #[arg(long)]
42    auth_server: Option<String>,
43
44    /// 日志级别
45    #[arg(long)]
46    log_level: Option<String>,
47
48    /// WebSocket 服务监听端口(可选,不指定则不启动 WebSocket)
49    #[arg(long)]
50    websocket_port: Option<u16>,
51
52    /// Telnet 管理端口(可选,不指定则不启动 Telnet)
53    #[arg(long)]
54    telnet_port: Option<u16>,
55
56    /// REST API 监听端口(可选,不指定则不启动 REST API)
57    #[arg(long)]
58    rest_port: Option<u16>,
59
60    /// gRPC 服务监听端口(可选,不指定则不启动 gRPC)
61    #[arg(long)]
62    grpc_port: Option<u16>,
63
64    /// RSA 私钥文件路径(PEM 格式,启用后 InitConnect 使用 RSA 加解密)
65    #[arg(long)]
66    rsa_private_key: Option<String>,
67
68    /// JSON 格式日志
69    #[arg(long)]
70    json_log: bool,
71
72    /// 界面语言 (chs=简体中文, cht=繁体中文, en=英文)
73    #[arg(long)]
74    lang: Option<String>,
75
76    /// REST API Bearer Token 鉴权:加载 keys.json(futucli gen-key 生成)
77    ///
78    /// 不指定时 REST API 无鉴权(保持旧行为,启动 warn)。
79    #[arg(long)]
80    rest_keys_file: Option<std::path::PathBuf>,
81
82    /// gRPC Bearer Token 鉴权:加载 keys.json(futucli gen-key 生成)
83    ///
84    /// 不指定时 gRPC 无鉴权。通常与 --rest-keys-file 指向同一文件。
85    #[arg(long)]
86    grpc_keys_file: Option<std::path::PathBuf>,
87
88    /// 核心 WebSocket Bearer Token 鉴权:加载 keys.json
89    ///
90    /// v1.0 起核心 WS(`--websocket-port`,Futu SDK 使用的 binary WS)支持
91    /// 握手 + per-message scope 鉴权。客户端用 `?token=<plaintext>` query 或
92    /// `Authorization: Bearer <plaintext>` header 传 key。不指定这个 flag 时
93    /// WS 无鉴权(legacy 保持兼容,启动 warn)。通常与 `--rest-keys-file` 指向
94    /// 同一文件。
95    #[arg(long)]
96    ws_keys_file: Option<std::path::PathBuf>,
97
98    /// 审计日志输出:JSONL 文件路径或目录
99    ///
100    /// - 带扩展名的路径(如 `/var/log/futu-audit.jsonl`)→ 单文件 append
101    /// - 不带扩展名 / 以 `/` 结尾(如 `/var/log/futu-audit/`)→ 每日滚动,
102    ///   文件名 `futu-audit.log` + 日期后缀
103    ///
104    /// 只记录 auth / 交易 事件(target = "futu_audit"),常规日志不受影响。
105    #[arg(long)]
106    audit_log: Option<std::path::PathBuf>,
107}
108
109// ===== XML 配置文件解析 (兼容 C++ FutuOpenD.xml) =====
110#[derive(Debug, Default, serde::Deserialize)]
111struct XmlConfig {
112    // 登录参数
113    login_account: Option<String>,
114    login_pwd: Option<String>,
115    login_pwd_md5: Option<String>,
116    login_region: Option<String>,
117    // 服务监听
118    ip: Option<String>,
119    #[serde(alias = "api_port")]
120    port: Option<u16>,
121    // WebSocket
122    websocket_port: Option<u16>,
123    // Telnet
124    telnet_port: Option<u16>,
125    // REST API
126    rest_port: Option<u16>,
127    // gRPC
128    grpc_port: Option<u16>,
129    // RSA
130    rsa_private_key: Option<String>,
131    // 系统
132    lang: Option<String>,
133    log_level: Option<String>,
134}
135
136/// 从 XML 配置文件读取配置 (兼容 C++ <futu_opend> 格式)
137fn load_xml_config(path: &str) -> Result<XmlConfig> {
138    let content = std::fs::read_to_string(path)?;
139    let config: XmlConfig = quick_xml::de::from_str(&content)?;
140    tracing::info!(path, "loaded XML config");
141    Ok(config)
142}
143
144/// 合并后的运行时配置
145struct RuntimeConfig {
146    ip: String,
147    port: u16,
148    login_account: Option<String>,
149    login_pwd: Option<String>,
150    #[expect(dead_code)]
151    login_pwd_md5: Option<String>,
152    login_region: String,
153    auth_server: String,
154    log_level: String,
155    websocket_port: Option<u16>,
156    telnet_port: Option<u16>,
157    rest_port: Option<u16>,
158    grpc_port: Option<u16>,
159    rsa_private_key: Option<String>,
160    json_log: bool,
161    lang: String,
162}
163
164/// 合并 CLI args + XML config (CLI 优先)
165fn merge_config(args: Args) -> Result<RuntimeConfig> {
166    let xml = if let Some(ref path) = args.cfg_file {
167        load_xml_config(path).unwrap_or_else(|e| {
168            eprintln!("warning: failed to load config file {path}: {e}");
169            XmlConfig::default()
170        })
171    } else {
172        // 尝试自动检测同目录下的 FutuOpenD.xml
173        let exe_dir = std::env::current_exe()
174            .ok()
175            .and_then(|p| p.parent().map(|d| d.to_path_buf()));
176        let auto_path = exe_dir.map(|d| d.join("FutuOpenD.xml"));
177        if let Some(ref path) = auto_path {
178            if path.exists() {
179                load_xml_config(&path.to_string_lossy()).unwrap_or_default()
180            } else {
181                XmlConfig::default()
182            }
183        } else {
184            XmlConfig::default()
185        }
186    };
187
188    Ok(RuntimeConfig {
189        ip: args.ip.or(xml.ip).unwrap_or_else(|| "0.0.0.0".to_string()),
190        port: args.port.or(xml.port).unwrap_or(11111),
191        login_account: args.login_account.or(xml.login_account),
192        login_pwd: args.login_pwd.or(xml.login_pwd),
193        login_pwd_md5: args.login_pwd_md5.or(xml.login_pwd_md5),
194        login_region: args
195            .login_region
196            .or(xml.login_region)
197            .unwrap_or_else(|| "gz".to_string()),
198        auth_server: args
199            .auth_server
200            .unwrap_or_else(|| "https://auth.futunn.com".to_string()),
201        log_level: args
202            .log_level
203            .or(xml.log_level)
204            .unwrap_or_else(|| "info".to_string()),
205        websocket_port: args.websocket_port.or(xml.websocket_port),
206        telnet_port: args.telnet_port.or(xml.telnet_port),
207        rest_port: args.rest_port.or(xml.rest_port),
208        grpc_port: args.grpc_port.or(xml.grpc_port),
209        rsa_private_key: {
210            let key_path = args.rsa_private_key.or(xml.rsa_private_key);
211            if let Some(ref path) = key_path {
212                match std::fs::read_to_string(path) {
213                    Ok(pem) => {
214                        eprintln!("loaded RSA private key from {path}");
215                        Some(pem)
216                    }
217                    Err(e) => {
218                        eprintln!("warning: failed to load RSA private key from {path}: {e}");
219                        None
220                    }
221                }
222            } else {
223                None
224            }
225        },
226        json_log: args.json_log,
227        lang: args.lang.or(xml.lang).unwrap_or_else(|| "chs".to_string()),
228    })
229}
230
231#[tokio::main]
232async fn main() -> Result<()> {
233    let args = Args::parse();
234    let rest_keys_file = args.rest_keys_file.clone();
235    let ws_keys_file = args.ws_keys_file.clone();
236    let grpc_keys_file = args.grpc_keys_file.clone();
237    let audit_log = args.audit_log.clone();
238    let config = merge_config(args)?;
239
240    // 1. 初始化日志(--log-level 参数生效,RUST_LOG 环境变量优先)
241    // audit 日志 guard 必须活到进程退出,否则 tracing-appender 后台线程可能丢事件。
242    let _audit_guard = if config.json_log {
243        if audit_log.is_some() {
244            eprintln!("warning: --audit-log is ignored when --json-log is set (main subscriber already JSON)");
245        }
246        futu_core::log::init_json_logging_with_level(&config.log_level);
247        None
248    } else {
249        match futu_core::log::init_logging_with_audit(&config.log_level, audit_log.as_deref()) {
250            Ok(guard) => {
251                if let (Some(path), Some(_)) = (audit_log.as_ref(), guard.as_ref()) {
252                    tracing::info!(
253                        path = %path.display(),
254                        "audit JSONL logger enabled (target=futu_audit → file)"
255                    );
256                }
257                guard
258            }
259            Err(e) => {
260                eprintln!("warning: failed to init audit log: {e}");
261                futu_core::log::init_logging_with_level(&config.log_level);
262                None
263            }
264        }
265    };
266
267    // 2. install 全局 metrics registry(让 audit::* 的 counter hook 和
268    //    REST `/metrics` 端点能对齐同一套计数器)
269    futu_auth::metrics::install(std::sync::Arc::new(futu_auth::MetricsRegistry::default()));
270
271    // 2.1 共享 RuntimeCounters:REST / gRPC 共用一个,这样 rate limit 和日累计
272    //     跨接口一致(同一把 key 通过 REST 下 3 单、gRPC 下 3 单,rate 窗口
273    //     看到 6 单,不是各看 3 单)
274    let shared_counters = std::sync::Arc::new(futu_auth::RuntimeCounters::new());
275
276    let listen_addr = format!("{}:{}", config.ip, config.port);
277    tracing::info!(addr = %listen_addr, "starting FutuOpenD Rust Gateway");
278
279    // 2. 创建并初始化业务桥接层
280    let mut bridge = GatewayBridge::new();
281    let mut push_receiver = None;
282
283    // 需要明文密码(auth 内部做 MD5)
284    let password = config.login_pwd.clone();
285
286    if let (Some(account), Some(password)) = (&config.login_account, &password) {
287        // device_id 必须 ≤16 字符(tgtgt 只取前 16 字节,URL 中也发同样的值)
288        let device_id = {
289            let hash = format!(
290                "{:x}",
291                md5::compute(format!("futu-opend-rs-{}", account).as_bytes())
292            );
293            hash[..16].to_string()
294        };
295        tracing::info!(account = %account, device_id = %device_id, "login credentials");
296        let app_lang = match config.lang.to_lowercase().as_str() {
297            "chs" => 0, // 简体中文
298            "cht" => 1, // 繁体中文
299            "en" => 2,  // 英文
300            _ => 0,     // 默认简体
301        };
302        let gw_config = GatewayConfig {
303            auth_server: config.auth_server.clone(),
304            account: account.clone(),
305            password: password.clone(),
306            region: config.login_region.clone(),
307            listen_addr: listen_addr.clone(),
308            device_id,
309            app_lang,
310        };
311
312        match bridge.initialize(&gw_config, None).await {
313            Ok(push_rx) => {
314                push_receiver = Some(push_rx);
315            }
316            Err(e) => {
317                tracing::error!(error = %e, "gateway initialization failed, starting in offline mode");
318            }
319        }
320    } else {
321        tracing::warn!("no login credentials provided, starting in offline mode");
322        tracing::warn!("use --login-account and --login-pwd to connect to backend");
323    }
324
325    // 3. 创建 API 服务端
326    let user_id = bridge
327        .login_cache
328        .get_login_state()
329        .map(|s| s.user_id as u64)
330        .unwrap_or(0);
331
332    let server_config = ServerConfig {
333        listen_addr: listen_addr.clone(),
334        server_ver: 1000,
335        login_user_id: user_id,
336        keepalive_interval: 10,
337        rsa_private_key: config.rsa_private_key.clone(),
338    };
339    if server_config.rsa_private_key.is_some() {
340        tracing::info!("RSA encryption enabled for InitConnect");
341    }
342    let mut server = ApiServer::new(server_config.clone());
343    server.set_metrics(std::sync::Arc::clone(&bridge.metrics));
344    server.set_subscriptions(std::sync::Arc::clone(&bridge.subscriptions));
345
346    // 4. 注册业务处理器
347    bridge.register_handlers(&server);
348
349    // 5. 创建推送广播器 (REST WebSocket + gRPC)
350    let ws_broadcaster = std::sync::Arc::new(futu_rest::ws::WsBroadcaster::new(1024));
351    let grpc_broadcaster = std::sync::Arc::new(futu_grpc::server::GrpcPushBroadcaster::new(1024));
352
353    // 6. 启动推送分发 (push_callback → channel → PushDispatcher → 客户端 + WebSocket + gRPC)
354    //    PushDispatcher 内部通过 ExternalPushSink 自动转发所有推送,
355    //    包括行情、广播、交易推送 (UpdateOrder/UpdateOrderFill)
356    if let Some(push_rx) = push_receiver {
357        let sinks: Vec<std::sync::Arc<dyn futu_server::push::ExternalPushSink>> = vec![
358            std::sync::Arc::clone(&ws_broadcaster) as _,
359            std::sync::Arc::clone(&grpc_broadcaster) as _,
360        ];
361        bridge.start_push_dispatcher(&server, push_rx, sinks);
362        tracing::info!("push dispatcher started (with WebSocket + gRPC broadcast)");
363    }
364
365    // 7. 启动 WebSocket 服务(可选)
366    let ws_handle = if let Some(ws_port) = config.websocket_port {
367        let ws_addr = format!("{}:{}", config.ip, ws_port);
368        // v1.0:WS 握手鉴权 —— 复用 REST 的 key store 设计(`--ws-keys-file` 独立
369        // 指定,不指定时 legacy 放行)
370        let ws_key_store = match &ws_keys_file {
371            Some(path) => match futu_auth::KeyStore::load(path) {
372                Ok(ks) => {
373                    tracing::info!(
374                        path = %path.display(),
375                        keys_loaded = ks.len(),
376                        "WS keys file loaded (Bearer/?token auth enabled)"
377                    );
378                    Some(std::sync::Arc::new(ks))
379                }
380                Err(e) => {
381                    tracing::error!(error = %e, "failed to load WS keys file; continuing WITHOUT auth");
382                    None
383                }
384            },
385            None => None,
386        };
387        let ws_counters = std::sync::Arc::clone(&shared_counters);
388        let ws_server = WsServer::with_auth(
389            ws_addr.clone(),
390            server_config.clone(),
391            std::sync::Arc::clone(server.connections()),
392            std::sync::Arc::clone(server.router()),
393            Some(std::sync::Arc::clone(&bridge.subscriptions)),
394            ws_key_store,
395            Some(ws_counters),
396        );
397        tracing::info!(addr = %ws_addr, "starting WebSocket server");
398        Some(tokio::spawn(async move {
399            if let Err(e) = ws_server.run().await {
400                tracing::error!(error = %e, "WebSocket server error");
401            }
402        }))
403    } else {
404        None
405    };
406
407    // 8. 启动 REST API 服务(可选,含 WebSocket 推送)
408    let rest_handle = if let Some(rest_port) = config.rest_port {
409        let rest_addr = format!("{}:{}", config.ip, rest_port);
410        let router = std::sync::Arc::clone(server.router());
411        let broadcaster = std::sync::Arc::clone(&ws_broadcaster);
412        let rest_key_store = match &rest_keys_file {
413            Some(path) => match futu_auth::KeyStore::load(path) {
414                Ok(ks) => {
415                    tracing::info!(
416                        path = %path.display(),
417                        keys_loaded = ks.len(),
418                        "REST keys file loaded (Bearer auth enabled)"
419                    );
420                    std::sync::Arc::new(ks)
421                }
422                Err(e) => {
423                    tracing::error!(error = %e, "failed to load REST keys file; continuing WITHOUT auth");
424                    std::sync::Arc::new(futu_auth::KeyStore::empty())
425                }
426            },
427            None => std::sync::Arc::new(futu_auth::KeyStore::empty()),
428        };
429        tracing::info!(addr = %rest_addr, "starting REST API server (WebSocket: /ws)");
430
431        // SIGHUP 热重载 REST keys.json(unix only)
432        #[cfg(unix)]
433        {
434            let ks = std::sync::Arc::clone(&rest_key_store);
435            if ks.is_configured() {
436                tokio::spawn(async move {
437                    use tokio::signal::unix::{signal, SignalKind};
438                    let mut sig = match signal(SignalKind::hangup()) {
439                        Ok(s) => s,
440                        Err(e) => {
441                            tracing::error!(error = %e, "SIGHUP install failed (REST)");
442                            return;
443                        }
444                    };
445                    tracing::info!("REST keys: SIGHUP handler installed");
446                    while sig.recv().await.is_some() {
447                        match ks.reload() {
448                            Ok(()) => tracing::warn!(
449                                keys_loaded = ks.len(),
450                                "REST keys reloaded on SIGHUP"
451                            ),
452                            Err(e) => {
453                                tracing::error!(error = %e, "REST keys reload failed")
454                            }
455                        }
456                    }
457                });
458            }
459        }
460
461        let rest_counters = std::sync::Arc::clone(&shared_counters);
462        Some(tokio::spawn(async move {
463            if let Err(e) = futu_rest::server::start_with_auth(
464                &rest_addr,
465                router,
466                broadcaster,
467                rest_key_store,
468                rest_counters,
469            )
470            .await
471            {
472                tracing::error!(error = %e, "REST API server error");
473            }
474        }))
475    } else {
476        None
477    };
478
479    // 9. 启动 gRPC 服务(可选,含流式推送)
480    let grpc_handle = if let Some(grpc_port) = config.grpc_port {
481        let grpc_addr = format!("{}:{}", config.ip, grpc_port);
482        let router = std::sync::Arc::clone(server.router());
483        let broadcaster = std::sync::Arc::clone(&grpc_broadcaster);
484        let grpc_key_store = match &grpc_keys_file {
485            Some(path) => match futu_auth::KeyStore::load(path) {
486                Ok(ks) => {
487                    tracing::info!(
488                        path = %path.display(),
489                        keys_loaded = ks.len(),
490                        "gRPC keys file loaded (Bearer auth enabled)"
491                    );
492                    std::sync::Arc::new(ks)
493                }
494                Err(e) => {
495                    tracing::error!(error = %e, "failed to load gRPC keys file; continuing WITHOUT auth");
496                    std::sync::Arc::new(futu_auth::KeyStore::empty())
497                }
498            },
499            None => std::sync::Arc::new(futu_auth::KeyStore::empty()),
500        };
501        tracing::info!(addr = %grpc_addr, "starting gRPC server (SubscribePush: streaming)");
502
503        // SIGHUP 热重载 gRPC keys.json(unix only)
504        #[cfg(unix)]
505        {
506            let ks = std::sync::Arc::clone(&grpc_key_store);
507            if ks.is_configured() {
508                tokio::spawn(async move {
509                    use tokio::signal::unix::{signal, SignalKind};
510                    let mut sig = match signal(SignalKind::hangup()) {
511                        Ok(s) => s,
512                        Err(e) => {
513                            tracing::error!(error = %e, "SIGHUP install failed (gRPC)");
514                            return;
515                        }
516                    };
517                    tracing::info!("gRPC keys: SIGHUP handler installed");
518                    while sig.recv().await.is_some() {
519                        match ks.reload() {
520                            Ok(()) => tracing::warn!(
521                                keys_loaded = ks.len(),
522                                "gRPC keys reloaded on SIGHUP"
523                            ),
524                            Err(e) => {
525                                tracing::error!(error = %e, "gRPC keys reload failed")
526                            }
527                        }
528                    }
529                });
530            }
531        }
532
533        let grpc_counters = std::sync::Arc::clone(&shared_counters);
534        Some(tokio::spawn(async move {
535            if let Err(e) = futu_grpc::server::start_with_auth(
536                &grpc_addr,
537                router,
538                broadcaster,
539                grpc_key_store,
540                grpc_counters,
541            )
542            .await
543            {
544                tracing::error!(error = %e, "gRPC server error");
545            }
546        }))
547    } else {
548        None
549    };
550
551    // 10. 启动 Telnet 管理服务(可选)
552    let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
553    let telnet_handle = if let Some(telnet_port) = config.telnet_port {
554        let telnet_addr = format!("{}:{}", config.ip, telnet_port);
555        let telnet_server = futu_server::telnet::TelnetServer::new(
556            telnet_addr.clone(),
557            std::sync::Arc::clone(server.connections()),
558            Some(std::sync::Arc::clone(&bridge.subscriptions)),
559            Some(std::sync::Arc::clone(server.metrics())),
560            shutdown_tx,
561        );
562        tracing::info!(addr = %telnet_addr, "starting Telnet server");
563        Some(tokio::spawn(async move {
564            if let Err(e) = telnet_server.run().await {
565                tracing::error!(error = %e, "Telnet server error");
566            }
567        }))
568    } else {
569        None
570    };
571
572    tracing::info!("gateway ready, accepting connections on {listen_addr}");
573    tracing::info!("press Ctrl+C to exit");
574
575    // 11. 启动 API 服务 + 信号处理
576    tokio::select! {
577        result = server.run() => {
578            if let Err(e) = result {
579                tracing::error!(error = %e, "API server error");
580            }
581        }
582        _ = tokio::signal::ctrl_c() => {
583            tracing::info!("received Ctrl+C, shutting down gracefully...");
584        }
585        _ = async {
586            while shutdown_rx.changed().await.is_ok() {
587                if *shutdown_rx.borrow() {
588                    break;
589                }
590            }
591        } => {
592            tracing::info!("shutdown requested via telnet");
593        }
594    }
595
596    // 清理后台任务
597    if let Some(handle) = ws_handle {
598        handle.abort();
599    }
600    if let Some(handle) = rest_handle {
601        handle.abort();
602    }
603    if let Some(handle) = grpc_handle {
604        handle.abort();
605    }
606    if let Some(handle) = telnet_handle {
607        handle.abort();
608    }
609
610    tracing::info!("gateway stopped");
611    Ok(())
612}