futu_opend/startup/
phase3.rs1#![allow(unused_imports)]
13
14use std::sync::Arc;
15
16use futu_gateway_core::bridge::{GatewayBridge, PushEvent};
17use futu_server::listener::{ApiServer, ServerConfig};
18
19use crate::config::RuntimeConfig;
20
21pub(super) struct Phase3Out {
23 pub(super) server: ApiServer,
24 pub(super) server_config: ServerConfig,
25 pub(super) ws_broadcaster: Arc<futu_rest::ws::WsBroadcaster>,
26 pub(super) grpc_broadcaster: Arc<futu_grpc::server::GrpcPushBroadcaster>,
27}
28
29pub(super) fn run_phase3(
30 config: &RuntimeConfig,
31 bridge: &Arc<GatewayBridge>,
32 listen_addr: &str,
33 push_receiver: Option<tokio::sync::mpsc::Receiver<PushEvent>>,
34) -> Phase3Out {
35 let user_id = bridge
37 .caches
38 .login_cache
39 .get_login_state()
40 .map(|s| s.user_id as u64)
41 .unwrap_or(0);
42
43 let server_config = ServerConfig {
44 listen_addr: listen_addr.to_string(),
45 server_ver: 1000,
46 login_user_id: user_id,
47 keepalive_interval: 10,
48 rsa_private_key: config.rsa_private_key.clone(),
49 };
50 if server_config.rsa_private_key.is_some() {
51 tracing::info!("RSA encryption enabled for InitConnect");
52 }
53 let mut server = ApiServer::new(server_config.clone());
54 server.set_metrics(std::sync::Arc::clone(&bridge.push_runtime.metrics));
55 server.set_subscriptions(std::sync::Arc::clone(&bridge.subscriptions));
56
57 futu_server::metrics::install_prometheus_extension(std::sync::Arc::clone(server.metrics()));
62
63 {
69 let router = server.router();
70 futu_gateway_qot::register_handlers(router, bridge);
71 futu_gateway_trd::register_handlers(router, bridge);
72 futu_gateway_core::handlers_sys::register_handlers(router, bridge);
73 tracing::info!("all business handlers registered");
74 }
75
76 let ws_broadcaster = std::sync::Arc::new(futu_rest::ws::WsBroadcaster::new(1024));
78 let grpc_broadcaster = std::sync::Arc::new(futu_grpc::server::GrpcPushBroadcaster::new(1024));
79
80 if let Some(push_rx) = push_receiver {
84 let sinks: Vec<std::sync::Arc<dyn futu_server::push::ExternalPushSink>> = vec![
85 std::sync::Arc::clone(&ws_broadcaster) as _,
86 std::sync::Arc::clone(&grpc_broadcaster) as _,
87 ];
88 bridge.start_push_dispatcher(&server, push_rx, sinks);
89 tracing::info!("push dispatcher started (with WebSocket + gRPC broadcast)");
90 }
91
92 Phase3Out {
93 server,
94 server_config,
95 ws_broadcaster,
96 grpc_broadcaster,
97 }
98}