1#![allow(unused_imports)]
9
10use anyhow::Result;
11use std::sync::Arc;
12
13use futu_gateway_core::bridge::GatewayBridge;
14use futu_server::ws_listener::{WsServer, WsServerDeps};
15
16use crate::config::RuntimeConfig;
17use crate::startup::phase1::Phase1Out;
18use crate::startup::phase3::Phase3Out;
19
20#[cfg(test)]
21mod tests;
22
23fn merge_push_health_snapshots_for_rest(
24 push: serde_json::Value,
25 qot_login: serde_json::Value,
26) -> serde_json::Value {
27 let mut combined = match push {
28 serde_json::Value::Object(map) => map,
29 other => {
30 let mut map = serde_json::Map::new();
31 map.insert(
32 "error".to_string(),
33 serde_json::Value::String(format!(
34 "push_health snapshot serialized to non-object {}",
35 json_value_kind(&other)
36 )),
37 );
38 map
39 }
40 };
41 combined.insert("qot_login_health".to_string(), qot_login);
42 serde_json::Value::Object(combined)
43}
44
45fn json_value_kind(value: &serde_json::Value) -> &'static str {
46 match value {
47 serde_json::Value::Null => "null",
48 serde_json::Value::Bool(_) => "bool",
49 serde_json::Value::Number(_) => "number",
50 serde_json::Value::String(_) => "string",
51 serde_json::Value::Array(_) => "array",
52 serde_json::Value::Object(_) => "object",
53 }
54}
55
56pub(super) async fn run_phase4(
57 config: &RuntimeConfig,
58 phase1: Phase1Out,
59 bridge: Arc<GatewayBridge>,
60 phase3: Phase3Out,
61) -> Result<()> {
62 let Phase1Out {
63 _audit_guard,
64 shared_counters,
65 listen_addr,
66 rest_keys_file,
67 ws_keys_file,
68 grpc_keys_file,
69 allow_tcp_unauthenticated,
70 } = phase1;
71 let Phase3Out {
72 server,
73 server_config,
74 ws_broadcaster,
75 grpc_broadcaster,
76 } = phase3;
77
78 let mut ws_key_store_holder: Option<std::sync::Arc<futu_auth::KeyStore>> = None;
82 let mut rest_key_store_holder: Option<std::sync::Arc<futu_auth::KeyStore>> = None;
83 let mut grpc_key_store_holder: Option<std::sync::Arc<futu_auth::KeyStore>> = None;
84
85 let ws_handle = if let Some(ws_port) = config.websocket_port {
87 let ws_addr = format!("{}:{}", config.ip, ws_port);
88 let ws_key_store = match &ws_keys_file {
96 Some(path) => match futu_auth::KeyStore::load(path) {
97 Ok(ks) => {
98 tracing::info!(
99 path = %path.display(),
100 keys_loaded = ks.len(),
101 "WS keys file loaded (Bearer/?token auth enabled)"
102 );
103 Some(std::sync::Arc::new(ks))
104 }
105 Err(e) => {
106 tracing::error!(
108 error = %e,
109 path = %path.display(),
110 "failed to load WS keys file (--ws-keys-file 明确指定 → fail-closed). \
111 daemon abort. fix the keys file then restart. \
112 不再 fallback to legacy unauth (v1.4.102 BUG-007 fix)."
113 );
114 return Err(anyhow::anyhow!(
115 "failed to load WS keys file {}: {e}. \
116 --ws-keys-file 明确指定 → fail-closed (BUG-007).",
117 path.display()
118 ));
119 }
120 },
121 None => None,
122 };
123 let ws_counters = std::sync::Arc::clone(&shared_counters);
124 ws_key_store_holder = ws_key_store.as_ref().map(std::sync::Arc::clone);
126 let ws_server = WsServer::with_auth(
127 ws_addr.clone(),
128 server_config.clone(),
129 WsServerDeps::new(
130 std::sync::Arc::clone(server.connections()),
131 std::sync::Arc::clone(server.router()),
132 Some(std::sync::Arc::clone(&bridge.subscriptions)),
133 ),
134 ws_key_store,
135 Some(ws_counters),
136 );
137 tracing::info!(addr = %ws_addr, "starting WebSocket server");
138 Some(tokio::spawn(async move {
139 if let Err(e) = ws_server.run().await {
140 tracing::error!(error = %e, "WebSocket server error");
141 }
142 }))
143 } else {
144 None
145 };
146
147 let rest_handle = if let Some(rest_port) = config.rest_port {
149 let rest_addr = format!("{}:{}", config.ip, rest_port);
150 let router = std::sync::Arc::clone(server.router());
151 let broadcaster = std::sync::Arc::clone(&ws_broadcaster);
152 let rest_key_store = match &rest_keys_file {
154 Some(path) => match futu_auth::KeyStore::load(path) {
155 Ok(ks) => {
156 tracing::info!(
157 path = %path.display(),
158 keys_loaded = ks.len(),
159 "REST keys file loaded (Bearer auth enabled)"
160 );
161 std::sync::Arc::new(ks)
162 }
163 Err(e) => {
164 tracing::error!(
165 error = %e,
166 path = %path.display(),
167 "failed to load REST keys file (--rest-keys-file 明确指定 → fail-closed). \
168 daemon abort. fix the keys file then restart. \
169 不再 fallback to legacy unauth (v1.4.102 BUG-007 fix)."
170 );
171 return Err(anyhow::anyhow!(
172 "failed to load REST keys file {}: {e}. \
173 --rest-keys-file 明确指定 → fail-closed (BUG-007).",
174 path.display()
175 ));
176 }
177 },
178 None => std::sync::Arc::new(futu_auth::KeyStore::empty()),
179 };
180 tracing::info!(addr = %rest_addr, "starting REST API server (WebSocket: /ws)");
181
182 if rest_key_store.is_configured() {
186 rest_key_store_holder = Some(std::sync::Arc::clone(&rest_key_store));
187 }
188
189 let rest_counters = std::sync::Arc::clone(&shared_counters);
196 let bridge_for_status = std::sync::Arc::clone(&bridge);
200 let admin_status_provider: futu_rest::adapter::AdminStatusProvider =
201 std::sync::Arc::new(move || {
202 serde_json::to_value(bridge_for_status.snapshot_status())
203 .unwrap_or_else(|_| serde_json::json!({"error": "snapshot serialize failed"}))
204 });
205 let bridge_for_reload = std::sync::Arc::clone(&bridge);
213 let admin_reload_handler: futu_rest::adapter::AdminReloadHandler =
214 std::sync::Arc::new(move || {
215 let bridge = std::sync::Arc::clone(&bridge_for_reload);
216 Box::pin(async move {
217 serde_json::to_value(bridge.reload())
218 .unwrap_or_else(|_| serde_json::json!({"error": "reload serialize failed"}))
219 })
220 });
221 let bridge_for_push_health = std::sync::Arc::clone(&bridge);
228 let push_health_snapshot_provider: futu_rest::adapter::PushHealthSnapshotProvider =
229 std::sync::Arc::new(move || {
230 let push = serde_json::to_value(
231 bridge_for_push_health.push_runtime.push_health.snapshot(),
232 )
233 .unwrap_or_else(
234 |_| serde_json::json!({"error": "push_health snapshot serialize failed"}),
235 );
236 let qot_login = serde_json::to_value(
237 bridge_for_push_health
238 .push_runtime
239 .qot_login_health
240 .snapshot(),
241 )
242 .unwrap_or_else(
243 |_| serde_json::json!({"error": "qot_login_health snapshot serialize failed"}),
244 );
245 merge_push_health_snapshots_for_rest(push, qot_login)
246 });
247 let bridge_for_card_num = std::sync::Arc::clone(&bridge);
253 let card_num_resolver: futu_rest::adapter::CardNumResolver =
254 std::sync::Arc::new(move |cn: &str| {
255 bridge_for_card_num
256 .caches
257 .trd_cache
258 .find_acc_ids_by_card_num(cn)
259 });
260 Some(tokio::spawn(async move {
261 if let Err(e) = futu_rest::server::start_with_auth_full_admin(
262 &rest_addr,
263 router,
264 broadcaster,
265 rest_key_store,
266 rest_counters,
267 futu_rest::server::RestAdminHooks {
268 admin_status_provider: Some(admin_status_provider),
269 admin_reload_handler: Some(admin_reload_handler),
270 push_health_snapshot_provider: Some(push_health_snapshot_provider),
271 card_num_resolver: Some(card_num_resolver),
272 },
273 )
274 .await
275 {
276 tracing::error!(error = %e, "REST API server error");
277 }
278 }))
279 } else {
280 None
281 };
282
283 let grpc_handle = if let Some(grpc_port) = config.grpc_port {
285 let grpc_addr = format!("{}:{}", config.ip, grpc_port);
286 let router = std::sync::Arc::clone(server.router());
287 let broadcaster = std::sync::Arc::clone(&grpc_broadcaster);
288 let grpc_key_store = match &grpc_keys_file {
290 Some(path) => match futu_auth::KeyStore::load(path) {
291 Ok(ks) => {
292 tracing::info!(
293 path = %path.display(),
294 keys_loaded = ks.len(),
295 "gRPC keys file loaded (Bearer auth enabled)"
296 );
297 std::sync::Arc::new(ks)
298 }
299 Err(e) => {
300 tracing::error!(
301 error = %e,
302 path = %path.display(),
303 "failed to load gRPC keys file (--grpc-keys-file 明确指定 → fail-closed). \
304 daemon abort. fix the keys file then restart. \
305 不再 fallback to legacy unauth (v1.4.102 BUG-007 fix)."
306 );
307 return Err(anyhow::anyhow!(
308 "failed to load gRPC keys file {}: {e}. \
309 --grpc-keys-file 明确指定 → fail-closed (BUG-007).",
310 path.display()
311 ));
312 }
313 },
314 None => std::sync::Arc::new(futu_auth::KeyStore::empty()),
315 };
316 tracing::info!(addr = %grpc_addr, "starting gRPC server (SubscribePush: streaming)");
317
318 if grpc_key_store.is_configured() {
320 grpc_key_store_holder = Some(std::sync::Arc::clone(&grpc_key_store));
321 }
322
323 let grpc_counters = std::sync::Arc::clone(&shared_counters);
328 Some(tokio::spawn(async move {
329 if let Err(e) = futu_grpc::server::start_with_auth(
330 &grpc_addr,
331 router,
332 broadcaster,
333 grpc_key_store,
334 grpc_counters,
335 )
336 .await
337 {
338 tracing::error!(error = %e, "gRPC server error");
339 }
340 }))
341 } else {
342 None
343 };
344
345 let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
347 let telnet_handle = if let Some(telnet_port) = config.telnet_port {
348 let telnet_addr = format!("{}:{}", config.ip, telnet_port);
349 let bridge_for_relogin = std::sync::Arc::clone(&bridge);
353 let relogin_fn: futu_server::telnet::ReloginFn = std::sync::Arc::new(move || {
354 tracing::warn!(
355 "v1.4.97 P1-D-F: telnet relogin clearing login_cache; \
356 next P1-D tick will trigger AuthRefresher relogin"
357 );
358 bridge_for_relogin.caches.login_cache.clear();
359 });
360 let telnet_server = futu_server::telnet::TelnetServer::new(
361 telnet_addr.clone(),
362 std::sync::Arc::clone(server.connections()),
363 Some(std::sync::Arc::clone(&bridge.subscriptions)),
364 Some(std::sync::Arc::clone(server.metrics())),
365 shutdown_tx,
366 )
367 .with_relogin_fn(relogin_fn);
368 tracing::info!(addr = %telnet_addr, "starting Telnet server");
369 Some(tokio::spawn(async move {
370 if let Err(e) = telnet_server.run().await {
371 tracing::error!(error = %e, "Telnet server error");
372 }
373 }))
374 } else {
375 None
376 };
377
378 tracing::info!("gateway ready, accepting connections on {listen_addr}");
379 tracing::info!("press Ctrl+C to exit");
380
381 let card_num_reload_and_expand_fn: std::sync::Arc<dyn Fn(bool) + Send + Sync> = {
399 let bridge_for_expand = std::sync::Arc::clone(&bridge);
400 let ws_ks = ws_key_store_holder.clone();
401 let rest_ks = rest_key_store_holder.clone();
402 let grpc_ks = grpc_key_store_holder.clone();
403 std::sync::Arc::new(move |do_reload: bool| {
404 if do_reload {
406 for (ks_name, ks_opt) in [("ws", &ws_ks), ("rest", &rest_ks), ("grpc", &grpc_ks)] {
407 let Some(ks) = ks_opt.as_ref() else { continue };
408 match ks.reload() {
409 Ok(()) => tracing::warn!(
410 ks = ks_name,
411 keys_loaded = ks.len(),
412 "v1.4.103 F3.1: keys reloaded on SIGHUP (before card_num expand)"
413 ),
414 Err(e) => tracing::error!(
415 ks = ks_name,
416 error = %e,
417 "v1.4.103 F3.1: keys reload failed (skipping expand for this store)"
418 ),
419 }
420 }
421 }
422 let trd_cache = std::sync::Arc::clone(&bridge_for_expand.caches.trd_cache);
424 let resolver = {
425 let cache_clone = std::sync::Arc::clone(&trd_cache);
426 move |cn: &str| cache_clone.find_acc_ids_by_card_num(cn)
427 };
428 for (ks_name, ks_opt) in [("ws", &ws_ks), ("rest", &rest_ks), ("grpc", &grpc_ks)] {
429 let Some(ks) = ks_opt.as_ref() else { continue };
430 let (resolved, unresolved, ambiguous) = ks.expand_allowed_card_nums(
431 &resolver,
432 |key_id, cn| {
433 tracing::warn!(
434 key_id = %key_id,
435 card_num = %cn,
436 "v1.4.103 B10/F1 fail-closed: card_num not found in trd_cache; \
437 writing sentinel acc_id=0 to enforce restrictive denylist \
438 (limits.contains check 永远 false → reject 真账户)"
439 );
440 },
441 |key_id, cn, candidates| {
442 tracing::warn!(
443 key_id = %key_id,
444 card_num = %cn,
445 candidates = ?candidates,
446 "v1.4.103 B10/F1 fail-closed: ambiguous card_num suffix \
447 matched multiple accounts (skipped, write 完整 16 位 / specific 4 位)"
448 );
449 },
450 );
451 tracing::info!(
452 ks = ks_name,
453 resolved,
454 unresolved,
455 ambiguous,
456 "v1.4.103 B10: expanded allowed_card_nums into allowed_acc_ids"
457 );
458 }
459 })
460 };
461 let card_num_expand_fn: std::sync::Arc<dyn Fn() + Send + Sync> = {
463 let inner = std::sync::Arc::clone(&card_num_reload_and_expand_fn);
464 std::sync::Arc::new(move || (inner)(false))
465 };
466
467 (card_num_expand_fn)();
469
470 {
472 let card_num_expand_fn_loop = std::sync::Arc::clone(&card_num_expand_fn);
473 let bridge_for_check = std::sync::Arc::clone(&bridge);
474 tokio::spawn(async move {
475 let trd_cache = std::sync::Arc::clone(&bridge_for_check.caches.trd_cache);
476 let mut attempts = 0u32;
477 let max_attempts = 6u32; loop {
479 tokio::time::sleep(std::time::Duration::from_secs(10)).await;
480 attempts += 1;
481 let accounts = trd_cache.get_accounts();
482 if accounts.is_empty() {
483 if attempts >= max_attempts {
484 tracing::warn!(
485 "v1.4.103 B10: trd_cache 仍空 (after {max_attempts} × 10s); \
486 受限 key 仍走 fail-closed sentinel reject 直到 SIGHUP / cache 加载."
487 );
488 return;
489 }
490 continue;
491 }
492 (card_num_expand_fn_loop)();
493 return;
494 }
495 });
496 }
497
498 #[cfg(unix)]
507 {
508 let unified_sighup_fn = std::sync::Arc::clone(&card_num_reload_and_expand_fn);
509 tokio::spawn(async move {
510 use tokio::signal::unix::{SignalKind, signal};
511 let mut sig = match signal(SignalKind::hangup()) {
512 Ok(s) => s,
513 Err(e) => {
514 tracing::error!(error = %e, "SIGHUP install failed (unified reload+expand)");
515 return;
516 }
517 };
518 tracing::info!(
519 "v1.4.103 F3.1: unified SIGHUP handler installed (reload all keys + expand card_num)"
520 );
521 while sig.recv().await.is_some() {
522 tracing::info!(
523 "v1.4.103 F3.1: SIGHUP received — running reload_all_stores + \
524 expand_allowed_card_nums (single ordered op, no race)"
525 );
526 (unified_sighup_fn)(true); }
528 });
529 }
530
531 let any_keys_configured =
542 rest_keys_file.is_some() || grpc_keys_file.is_some() || ws_keys_file.is_some();
543 let tcp_disabled = any_keys_configured && !allow_tcp_unauthenticated;
544
545 if tcp_disabled {
546 tracing::warn!(
547 listen_addr = %listen_addr,
548 "v1.4.104 eli S-001 (P0) fix: TCP listener (port {}) NOT started — \
549 keys file configured but --allow-tcp-unauthenticated not set. \
550 native TCP FTAPI protocol has no Bearer field, cannot enforce \
551 caller-specific scope check; defaulting to fail-closed (skip TCP). \
552 Use REST/gRPC/WS endpoints for authenticated access. \
553 To restore TCP (legacy Python SDK clients) add --allow-tcp-unauthenticated, \
554 but be aware that port {} will accept ANY local connection without \
555 scope check (跨账户 leak risk).",
556 config.port,
557 config.port,
558 );
559 eprintln!(
560 "⚠️ TCP listener (port {}) DISABLED (v1.4.104 eli S-001 fix): \
561 keys file configured + no --allow-tcp-unauthenticated. \
562 Pass --allow-tcp-unauthenticated to restore (with security warning).",
563 config.port,
564 );
565 } else if any_keys_configured && allow_tcp_unauthenticated {
566 tracing::warn!(
567 listen_addr = %listen_addr,
568 "⚠️ v1.4.104: TCP listener running WITHOUT scope check despite keys configured \
569 (--allow-tcp-unauthenticated set). Port {} accepts ANY local connection — \
570 跨账户 leak risk. Use REST/gRPC/WS for authenticated clients; reserve \
571 TCP only for legacy Python SDK / C++ OpenD where Bearer not feasible.",
572 config.port,
573 );
574 eprintln!(
575 "⚠️ TCP port {} ACCEPTS UNAUTHENTICATED connections (--allow-tcp-unauthenticated). \
576 受限 keys 不在该 surface 强制. 推荐改用 REST/gRPC/WS.",
577 config.port,
578 );
579 }
580
581 tokio::select! {
583 result = async {
584 if tcp_disabled {
585 std::future::pending::<anyhow::Result<()>>().await
587 } else {
588 server.run().await
589 }
590 } => {
591 if let Err(e) = result {
592 tracing::error!(error = %e, "API server error");
593 }
594 }
595 _ = tokio::signal::ctrl_c() => {
596 tracing::info!("received Ctrl+C, shutting down gracefully...");
597 }
598 _ = async {
599 while shutdown_rx.changed().await.is_ok() {
600 if *shutdown_rx.borrow() {
601 break;
602 }
603 }
604 } => {
605 tracing::info!("shutdown requested via telnet");
606 }
607 }
608
609 if let Some(handle) = ws_handle {
611 handle.abort();
612 }
613 if let Some(handle) = rest_handle {
614 handle.abort();
615 }
616 if let Some(handle) = grpc_handle {
617 handle.abort();
618 }
619 if let Some(handle) = telnet_handle {
620 handle.abort();
621 }
622
623 tracing::info!("gateway stopped");
624 Ok(())
625}