Skip to main content

futu_opend/startup/
phase3.rs

1//! v1.4.110 Layer 3 A: startup Phase 3 — ApiServer 构造 / handler 注册 /
2//! 推送广播器 / push dispatcher 启动. 抽自原 `mod.rs::run_daemon` 477..531 行段.
3//!
4//! Phase 3 主要副作用 (按顺序):
5//! 1. 从 `bridge.caches.login_cache` 取 user_id 构造 `ServerConfig`
6//! 2. `ApiServer::new` + `set_metrics` + `set_subscriptions`
7//! 3. `install_prometheus_extension` (server.metrics() 暴露到 `/metrics`)
8//! 4. 调 3 个域 register fn (qot / trd / sys) 注册业务 handler
9//! 5. 创建 `WsBroadcaster` + `GrpcPushBroadcaster` (各容量 1024)
10//! 6. 如 push_receiver Some → `bridge.start_push_dispatcher` (ws + grpc sinks)
11
12#![allow(unused_imports)]
13
14use std::sync::Arc;
15
16use futu_gateway_core::bridge::{GatewayBridge, PushEvent};
17use futu_server::listener::{ApiServer, ServerConfig};
18
19use crate::config::RuntimeConfig;
20
21/// Phase 3 output — Phase 4 spawn 各 surface server 时需要这些资源.
22pub(super) struct Phase3Out {
23    pub(super) server: ApiServer,
24    pub(super) server_config: ServerConfig,
25    pub(super) ws_broadcaster: Arc<futu_rest::ws::WsBroadcaster>,
26    pub(super) grpc_broadcaster: Arc<futu_grpc::server::GrpcPushBroadcaster>,
27}
28
29pub(super) fn run_phase3(
30    config: &RuntimeConfig,
31    bridge: &Arc<GatewayBridge>,
32    listen_addr: &str,
33    push_receiver: Option<tokio::sync::mpsc::Receiver<PushEvent>>,
34) -> Phase3Out {
35    // 3. 创建 API 服务端
36    let user_id = bridge
37        .caches
38        .login_cache
39        .get_login_state()
40        .map(|s| s.user_id as u64)
41        .unwrap_or(0);
42
43    let server_config = ServerConfig {
44        listen_addr: listen_addr.to_string(),
45        server_ver: 1000,
46        login_user_id: user_id,
47        keepalive_interval: 10,
48        rsa_private_key: config.rsa_private_key.clone(),
49    };
50    if server_config.rsa_private_key.is_some() {
51        tracing::info!("RSA encryption enabled for InitConnect");
52    }
53    let mut server = ApiServer::new(server_config.clone());
54    server.set_metrics(std::sync::Arc::clone(&bridge.push_runtime.metrics));
55    server.set_subscriptions(std::sync::Arc::clone(&bridge.subscriptions));
56
57    // v1.4.90 P1-B: 把 GatewayMetrics 注册为 futu_auth Registry 的 extension
58    // renderer, 让 `/metrics` HTTP 端点暴露 per-cmd / per-hour push counter.
59    // 之前 v1.4.83/84 只在 telnet `show_metrics` 暴露, /metrics 仅 3 个 auth
60    // counter (cmd_6212_quote / cmd_14716_trade_new 等被双 tester 报告 missing).
61    futu_server::metrics::install_prometheus_extension(std::sync::Arc::clone(server.metrics()));
62
63    // 4. 注册业务处理器
64    //
65    // v1.4.110 P0-5 T15 P4 wire: 显式调 3 个域 register fn (替原
66    // `bridge.register_handlers(&server)` 反向调用). 3 crate split 后各域
67    // register fn 在各自 crate:
68    {
69        let router = server.router();
70        futu_gateway_qot::register_handlers(router, bridge);
71        futu_gateway_trd::register_handlers(router, bridge);
72        futu_gateway_core::handlers_sys::register_handlers(router, bridge);
73        tracing::info!("all business handlers registered");
74    }
75
76    // 5. 创建推送广播器 (REST WebSocket + gRPC)
77    let ws_broadcaster = std::sync::Arc::new(futu_rest::ws::WsBroadcaster::new(1024));
78    let grpc_broadcaster = std::sync::Arc::new(futu_grpc::server::GrpcPushBroadcaster::new(1024));
79
80    // 6. 启动推送分发 (push_callback → channel → PushDispatcher → 客户端 + WebSocket + gRPC)
81    //    PushDispatcher 内部通过 ExternalPushSink 自动转发所有推送,
82    //    包括行情、广播、交易推送 (UpdateOrder/UpdateOrderFill)
83    if let Some(push_rx) = push_receiver {
84        let sinks: Vec<std::sync::Arc<dyn futu_server::push::ExternalPushSink>> = vec![
85            std::sync::Arc::clone(&ws_broadcaster) as _,
86            std::sync::Arc::clone(&grpc_broadcaster) as _,
87        ];
88        bridge.start_push_dispatcher(&server, push_rx, sinks);
89        tracing::info!("push dispatcher started (with WebSocket + gRPC broadcast)");
90    }
91
92    Phase3Out {
93        server,
94        server_config,
95        ws_broadcaster,
96        grpc_broadcaster,
97    }
98}