Skip to main content

futu_opend/startup/
phase4.rs

1//! v1.4.110 Layer 3 A: startup Phase 4 — surface server (WS/REST/gRPC/Telnet)
2//! spawn / card_num expand / SIGHUP unified handler / TCP fail-closed gate /
3//! 主 `tokio::select!` 循环 + 清理. 抽自原 `mod.rs::run_daemon` 533..1075 行段.
4//!
5//! Phase 4 取走 `Phase1Out` 所有权 (保证 `_audit_guard` 直到 fn 返回才 drop),
6//! 同时取 `bridge: Arc<GatewayBridge>` 与 `Phase3Out` 启动各 surface server.
7
8#![allow(unused_imports)]
9
10use anyhow::Result;
11use std::sync::Arc;
12
13use futu_gateway_core::bridge::GatewayBridge;
14use futu_server::ws_listener::{WsServer, WsServerDeps};
15
16use crate::config::RuntimeConfig;
17use crate::startup::phase1::Phase1Out;
18use crate::startup::phase3::Phase3Out;
19
20#[cfg(test)]
21mod tests;
22
23fn merge_push_health_snapshots_for_rest(
24    push: serde_json::Value,
25    qot_login: serde_json::Value,
26) -> serde_json::Value {
27    let mut combined = match push {
28        serde_json::Value::Object(map) => map,
29        other => {
30            let mut map = serde_json::Map::new();
31            map.insert(
32                "error".to_string(),
33                serde_json::Value::String(format!(
34                    "push_health snapshot serialized to non-object {}",
35                    json_value_kind(&other)
36                )),
37            );
38            map
39        }
40    };
41    combined.insert("qot_login_health".to_string(), qot_login);
42    serde_json::Value::Object(combined)
43}
44
45fn json_value_kind(value: &serde_json::Value) -> &'static str {
46    match value {
47        serde_json::Value::Null => "null",
48        serde_json::Value::Bool(_) => "bool",
49        serde_json::Value::Number(_) => "number",
50        serde_json::Value::String(_) => "string",
51        serde_json::Value::Array(_) => "array",
52        serde_json::Value::Object(_) => "object",
53    }
54}
55
56pub(super) async fn run_phase4(
57    config: &RuntimeConfig,
58    phase1: Phase1Out,
59    bridge: Arc<GatewayBridge>,
60    phase3: Phase3Out,
61) -> Result<()> {
62    let Phase1Out {
63        _audit_guard,
64        shared_counters,
65        listen_addr,
66        rest_keys_file,
67        ws_keys_file,
68        grpc_keys_file,
69        allow_tcp_unauthenticated,
70    } = phase1;
71    let Phase3Out {
72        server,
73        server_config,
74        ws_broadcaster,
75        grpc_broadcaster,
76    } = phase3;
77
78    // v1.4.103 (B10): 把 3 个 server 的 key_store Option 提升到外层作用域,
79    // 让后面的 expand_allowed_card_nums spawn task 能跨 server 块捕获.
80    // 各 server 块仍各自加载 (允许独立 keys.json), 这里只共享 Arc clone.
81    let mut ws_key_store_holder: Option<std::sync::Arc<futu_auth::KeyStore>> = None;
82    let mut rest_key_store_holder: Option<std::sync::Arc<futu_auth::KeyStore>> = None;
83    let mut grpc_key_store_holder: Option<std::sync::Arc<futu_auth::KeyStore>> = None;
84
85    // 7. 启动 WebSocket 服务(可选)
86    let ws_handle = if let Some(ws_port) = config.websocket_port {
87        let ws_addr = format!("{}:{}", config.ip, ws_port);
88        // v1.0:WS 握手鉴权 —— 复用 REST 的 key store 设计(`--ws-keys-file` 独立
89        // 指定,不指定时 legacy 放行)
90        // v1.4.102 BUG-007 fix (P1, leaf v1.4.100 报告): keys-file load 失败
91        // 必须 fail-closed (abort daemon), 不再 silent fallback to legacy mode.
92        // 历史: 用户传 `--ws-keys-file` 表示明确意图启用 auth, 文件 typo / parse
93        // 错时 daemon 只 log error 然后继续无 auth → 用户以为有门锁, 实际门锁
94        // 没装上 (security misconfig 比纯 legacy 更危险).
95        let ws_key_store = match &ws_keys_file {
96            Some(path) => match futu_auth::KeyStore::load(path) {
97                Ok(ks) => {
98                    tracing::info!(
99                        path = %path.display(),
100                        keys_loaded = ks.len(),
101                        "WS keys file loaded (Bearer/?token auth enabled)"
102                    );
103                    Some(std::sync::Arc::new(ks))
104                }
105                Err(e) => {
106                    // v1.4.102 BUG-007: fail-closed
107                    tracing::error!(
108                        error = %e,
109                        path = %path.display(),
110                        "failed to load WS keys file (--ws-keys-file 明确指定 → fail-closed). \
111                         daemon abort. fix the keys file then restart. \
112                         不再 fallback to legacy unauth (v1.4.102 BUG-007 fix)."
113                    );
114                    return Err(anyhow::anyhow!(
115                        "failed to load WS keys file {}: {e}. \
116                         --ws-keys-file 明确指定 → fail-closed (BUG-007).",
117                        path.display()
118                    ));
119                }
120            },
121            None => None,
122        };
123        let ws_counters = std::sync::Arc::clone(&shared_counters);
124        // v1.4.103 (B10): clone Arc 给外层 holder 持有, 同时让 ws_server 仍接管原 Arc.
125        ws_key_store_holder = ws_key_store.as_ref().map(std::sync::Arc::clone);
126        let ws_server = WsServer::with_auth(
127            ws_addr.clone(),
128            server_config.clone(),
129            WsServerDeps::new(
130                std::sync::Arc::clone(server.connections()),
131                std::sync::Arc::clone(server.router()),
132                Some(std::sync::Arc::clone(&bridge.subscriptions)),
133            ),
134            ws_key_store,
135            Some(ws_counters),
136        );
137        tracing::info!(addr = %ws_addr, "starting WebSocket server");
138        Some(tokio::spawn(async move {
139            if let Err(e) = ws_server.run().await {
140                tracing::error!(error = %e, "WebSocket server error");
141            }
142        }))
143    } else {
144        None
145    };
146
147    // 8. 启动 REST API 服务(可选,含 WebSocket 推送)
148    let rest_handle = if let Some(rest_port) = config.rest_port {
149        let rest_addr = format!("{}:{}", config.ip, rest_port);
150        let router = std::sync::Arc::clone(server.router());
151        let broadcaster = std::sync::Arc::clone(&ws_broadcaster);
152        // v1.4.102 BUG-007 fix: 同 WS 路径 (fail-closed when keys-file 显式指定).
153        let rest_key_store = match &rest_keys_file {
154            Some(path) => match futu_auth::KeyStore::load(path) {
155                Ok(ks) => {
156                    tracing::info!(
157                        path = %path.display(),
158                        keys_loaded = ks.len(),
159                        "REST keys file loaded (Bearer auth enabled)"
160                    );
161                    std::sync::Arc::new(ks)
162                }
163                Err(e) => {
164                    tracing::error!(
165                        error = %e,
166                        path = %path.display(),
167                        "failed to load REST keys file (--rest-keys-file 明确指定 → fail-closed). \
168                         daemon abort. fix the keys file then restart. \
169                         不再 fallback to legacy unauth (v1.4.102 BUG-007 fix)."
170                    );
171                    return Err(anyhow::anyhow!(
172                        "failed to load REST keys file {}: {e}. \
173                         --rest-keys-file 明确指定 → fail-closed (BUG-007).",
174                        path.display()
175                    ));
176                }
177            },
178            None => std::sync::Arc::new(futu_auth::KeyStore::empty()),
179        };
180        tracing::info!(addr = %rest_addr, "starting REST API server (WebSocket: /ws)");
181
182        // v1.4.103 (B10): clone Arc 给外层 holder 持有 (跨 server 块共享给
183        // expand_allowed_card_nums spawn task). 仅在 keys 实际配置时填充
184        // (empty store 不需要 expand).
185        if rest_key_store.is_configured() {
186            rest_key_store_holder = Some(std::sync::Arc::clone(&rest_key_store));
187        }
188
189        // v1.4.103 codex F3.1 (P1) round 3: REST 单独的 SIGHUP reload listener
190        // 已**移除** — 防与 card_num expand 间的 race (多 listener 并发顺序乱
191        // 可能让 reload 覆盖 expand 写入的 sentinel, 受限 key 在 reload 窗口
192        // 内 silent unrestricted). 现在 reload + expand 由文件末尾的 unified
193        // SIGHUP handler 顺序操作 (调 card_num_reload_and_expand_fn(true)).
194
195        let rest_counters = std::sync::Arc::clone(&shared_counters);
196        // v1.4.32+ admin snapshot provider:closure 捕获 `Arc<GatewayBridge>`
197        // 每次被 admin_status handler 调用时返回实时 StatusSnapshot JSON。
198        // bridge 已经在上面 Arc 化,这里只做一次 clone。
199        let bridge_for_status = std::sync::Arc::clone(&bridge);
200        let admin_status_provider: futu_rest::adapter::AdminStatusProvider =
201            std::sync::Arc::new(move || {
202                serde_json::to_value(bridge_for_status.snapshot_status())
203                    .unwrap_or_else(|_| serde_json::json!({"error": "snapshot serialize failed"}))
204            });
205        // v1.4.32+ admin reload handler:closure 调 Bridge::reload() 清 cipher cache
206        // v1.4.34: reload 升级为 async(内部刷 credentials 走网络 I/O),
207        // handler 返 Future,axum admin_reload async handler await 之
208        // v1.4.106 codex 0554 F3 [P2]: reload 拆两阶段后变 sync (sync clear +
209        // tokio::spawn refresh). closure 仍返 Future 类型不变 (向后兼容
210        // AdminReloadHandler API), 但内部不 await — sync 阶段已完成 + spawn
211        // 已派发, ReloadReport 立即可用.
212        let bridge_for_reload = std::sync::Arc::clone(&bridge);
213        let admin_reload_handler: futu_rest::adapter::AdminReloadHandler =
214            std::sync::Arc::new(move || {
215                let bridge = std::sync::Arc::clone(&bridge_for_reload);
216                Box::pin(async move {
217                    serde_json::to_value(bridge.reload())
218                        .unwrap_or_else(|_| serde_json::json!({"error": "reload serialize failed"}))
219                })
220            });
221        // v1.4.83 §9 Phase 2 F5: push health snapshot provider —— closure
222        // 捕获 bridge.push_runtime.push_health Arc, 每次
223        // `/api/push-subscriber-info` 调用时返当前真实 snapshot.
224        // v1.4.91 P1-D wiring: closure 同时捕获 bridge.push_runtime.qot_login_health,
225        // 在返 push_health 同时多带一个 qot_login_health 字段供 ops 看
226        // qot_logined self-heal counter (修 P1-D non-deterministic gap).
227        let bridge_for_push_health = std::sync::Arc::clone(&bridge);
228        let push_health_snapshot_provider: futu_rest::adapter::PushHealthSnapshotProvider =
229            std::sync::Arc::new(move || {
230                let push = serde_json::to_value(
231                    bridge_for_push_health.push_runtime.push_health.snapshot(),
232                )
233                .unwrap_or_else(
234                    |_| serde_json::json!({"error": "push_health snapshot serialize failed"}),
235                );
236                let qot_login = serde_json::to_value(
237                    bridge_for_push_health
238                        .push_runtime
239                        .qot_login_health
240                        .snapshot(),
241                )
242                .unwrap_or_else(
243                    |_| serde_json::json!({"error": "qot_login_health snapshot serialize failed"}),
244                );
245                merge_push_health_snapshots_for_rest(push, qot_login)
246            });
247        // v1.4.105 D12 (Phase 2): 注入 card_num resolver 让 REST trade
248        // handler (place_order / modify_order / cancel_all_order) 能解析
249        // user 传 `card_num` 字段 → acc_id (覆盖 c2s.header.acc_id).
250        // closure 捕获 bridge.caches.trd_cache, 调
251        // `find_acc_ids_by_card_num(input) -> Vec<u64>`.
252        let bridge_for_card_num = std::sync::Arc::clone(&bridge);
253        let card_num_resolver: futu_rest::adapter::CardNumResolver =
254            std::sync::Arc::new(move |cn: &str| {
255                bridge_for_card_num
256                    .caches
257                    .trd_cache
258                    .find_acc_ids_by_card_num(cn)
259            });
260        Some(tokio::spawn(async move {
261            if let Err(e) = futu_rest::server::start_with_auth_full_admin(
262                &rest_addr,
263                router,
264                broadcaster,
265                rest_key_store,
266                rest_counters,
267                futu_rest::server::RestAdminHooks {
268                    admin_status_provider: Some(admin_status_provider),
269                    admin_reload_handler: Some(admin_reload_handler),
270                    push_health_snapshot_provider: Some(push_health_snapshot_provider),
271                    card_num_resolver: Some(card_num_resolver),
272                },
273            )
274            .await
275            {
276                tracing::error!(error = %e, "REST API server error");
277            }
278        }))
279    } else {
280        None
281    };
282
283    // 9. 启动 gRPC 服务(可选,含流式推送)
284    let grpc_handle = if let Some(grpc_port) = config.grpc_port {
285        let grpc_addr = format!("{}:{}", config.ip, grpc_port);
286        let router = std::sync::Arc::clone(server.router());
287        let broadcaster = std::sync::Arc::clone(&grpc_broadcaster);
288        // v1.4.102 BUG-007 fix: 同 WS / REST 路径 (fail-closed when keys-file 显式指定).
289        let grpc_key_store = match &grpc_keys_file {
290            Some(path) => match futu_auth::KeyStore::load(path) {
291                Ok(ks) => {
292                    tracing::info!(
293                        path = %path.display(),
294                        keys_loaded = ks.len(),
295                        "gRPC keys file loaded (Bearer auth enabled)"
296                    );
297                    std::sync::Arc::new(ks)
298                }
299                Err(e) => {
300                    tracing::error!(
301                        error = %e,
302                        path = %path.display(),
303                        "failed to load gRPC keys file (--grpc-keys-file 明确指定 → fail-closed). \
304                         daemon abort. fix the keys file then restart. \
305                         不再 fallback to legacy unauth (v1.4.102 BUG-007 fix)."
306                    );
307                    return Err(anyhow::anyhow!(
308                        "failed to load gRPC keys file {}: {e}. \
309                         --grpc-keys-file 明确指定 → fail-closed (BUG-007).",
310                        path.display()
311                    ));
312                }
313            },
314            None => std::sync::Arc::new(futu_auth::KeyStore::empty()),
315        };
316        tracing::info!(addr = %grpc_addr, "starting gRPC server (SubscribePush: streaming)");
317
318        // v1.4.103 (B10): clone Arc 给外层 holder 持有.
319        if grpc_key_store.is_configured() {
320            grpc_key_store_holder = Some(std::sync::Arc::clone(&grpc_key_store));
321        }
322
323        // v1.4.103 codex F3.1 (P1) round 3: gRPC 单独的 SIGHUP reload listener
324        // 已**移除** — 同 REST 块, 由文件末尾的 unified SIGHUP handler 顺序
325        // reload + expand 防 race.
326
327        let grpc_counters = std::sync::Arc::clone(&shared_counters);
328        Some(tokio::spawn(async move {
329            if let Err(e) = futu_grpc::server::start_with_auth(
330                &grpc_addr,
331                router,
332                broadcaster,
333                grpc_key_store,
334                grpc_counters,
335            )
336            .await
337            {
338                tracing::error!(error = %e, "gRPC server error");
339            }
340        }))
341    } else {
342        None
343    };
344
345    // 10. 启动 Telnet 管理服务(可选)
346    let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
347    let telnet_handle = if let Some(telnet_port) = config.telnet_port {
348        let telnet_addr = format!("{}:{}", config.ip, telnet_port);
349        // v1.4.97 P1-D-F: relogin callback closes over bridge to clear
350        // login_cache; next 30s P1-D health tick triggers relogin.
351        // Aligns with C++ GTWCmd_ReLogin (FTGateway/FTGTW_Define_Key.h:5).
352        let bridge_for_relogin = std::sync::Arc::clone(&bridge);
353        let relogin_fn: futu_server::telnet::ReloginFn = std::sync::Arc::new(move || {
354            tracing::warn!(
355                "v1.4.97 P1-D-F: telnet relogin clearing login_cache; \
356                     next P1-D tick will trigger AuthRefresher relogin"
357            );
358            bridge_for_relogin.caches.login_cache.clear();
359        });
360        let telnet_server = futu_server::telnet::TelnetServer::new(
361            telnet_addr.clone(),
362            std::sync::Arc::clone(server.connections()),
363            Some(std::sync::Arc::clone(&bridge.subscriptions)),
364            Some(std::sync::Arc::clone(server.metrics())),
365            shutdown_tx,
366        )
367        .with_relogin_fn(relogin_fn);
368        tracing::info!(addr = %telnet_addr, "starting Telnet server");
369        Some(tokio::spawn(async move {
370            if let Err(e) = telnet_server.run().await {
371                tracing::error!(error = %e, "Telnet server error");
372            }
373        }))
374    } else {
375        None
376    };
377
378    tracing::info!("gateway ready, accepting connections on {listen_addr}");
379    tracing::info!("press Ctrl+C to exit");
380
381    // v1.4.103 (B10) + codex F2 (P1): 立即跑首次 expand + background retry
382    // + SIGHUP reload 钩子 (fail-closed sentinel 即时生效, 无 startup window).
383    //
384    // 行为:
385    // 1. **立即跑首次 expand** — 即便 cache 空, fail-closed sentinel (codex F1)
386    //    也写进 allowed_acc_ids, 短路 startup window 的 silent unrestricted.
387    // 2. **background retry**: 每 10s 检查 cache, 加载后 re-expand 真 acc_id
388    //    覆盖 sentinel. 60s 上限.
389    // 3. **SIGHUP reload 钩子**: 重载 keys.json 后再 expand 一次 (codex F2 reload window).
390    // v1.4.103 codex F3.1 (P1) round 3: 合并 reload + expand 为单一 ordered op,
391    // 防 race. 当 SIGHUP 触发时, 此 fn 同步顺序: (1) reload 所有 store (从
392    // keys.json 读 raw allowed_card_nums + ArcSwap.store) (2) expand (resolve
393    // card_num → acc_ids + ArcSwap.store with sentinel/resolved). 因为是单线
394    // 调用, 不存在多 SIGHUP listener 间的乱序 race.
395    //
396    // `do_reload` 控制是否先 reload (启动时第一次 / 60s retry 不需 reload,
397    // 直接 expand; SIGHUP 路径需要先 reload).
398    let card_num_reload_and_expand_fn: std::sync::Arc<dyn Fn(bool) + Send + Sync> = {
399        let bridge_for_expand = std::sync::Arc::clone(&bridge);
400        let ws_ks = ws_key_store_holder.clone();
401        let rest_ks = rest_key_store_holder.clone();
402        let grpc_ks = grpc_key_store_holder.clone();
403        std::sync::Arc::new(move |do_reload: bool| {
404            // (1) Reload phase — 仅 SIGHUP 路径调
405            if do_reload {
406                for (ks_name, ks_opt) in [("ws", &ws_ks), ("rest", &rest_ks), ("grpc", &grpc_ks)] {
407                    let Some(ks) = ks_opt.as_ref() else { continue };
408                    match ks.reload() {
409                        Ok(()) => tracing::warn!(
410                            ks = ks_name,
411                            keys_loaded = ks.len(),
412                            "v1.4.103 F3.1: keys reloaded on SIGHUP (before card_num expand)"
413                        ),
414                        Err(e) => tracing::error!(
415                            ks = ks_name,
416                            error = %e,
417                            "v1.4.103 F3.1: keys reload failed (skipping expand for this store)"
418                        ),
419                    }
420                }
421            }
422            // (2) Expand phase
423            let trd_cache = std::sync::Arc::clone(&bridge_for_expand.caches.trd_cache);
424            let resolver = {
425                let cache_clone = std::sync::Arc::clone(&trd_cache);
426                move |cn: &str| cache_clone.find_acc_ids_by_card_num(cn)
427            };
428            for (ks_name, ks_opt) in [("ws", &ws_ks), ("rest", &rest_ks), ("grpc", &grpc_ks)] {
429                let Some(ks) = ks_opt.as_ref() else { continue };
430                let (resolved, unresolved, ambiguous) = ks.expand_allowed_card_nums(
431                    &resolver,
432                    |key_id, cn| {
433                        tracing::warn!(
434                            key_id = %key_id,
435                            card_num = %cn,
436                            "v1.4.103 B10/F1 fail-closed: card_num not found in trd_cache; \
437                             writing sentinel acc_id=0 to enforce restrictive denylist \
438                             (limits.contains check 永远 false → reject 真账户)"
439                        );
440                    },
441                    |key_id, cn, candidates| {
442                        tracing::warn!(
443                            key_id = %key_id,
444                            card_num = %cn,
445                            candidates = ?candidates,
446                            "v1.4.103 B10/F1 fail-closed: ambiguous card_num suffix \
447                             matched multiple accounts (skipped, write 完整 16 位 / specific 4 位)"
448                        );
449                    },
450                );
451                tracing::info!(
452                    ks = ks_name,
453                    resolved,
454                    unresolved,
455                    ambiguous,
456                    "v1.4.103 B10: expanded allowed_card_nums into allowed_acc_ids"
457                );
458            }
459        })
460    };
461    // 老 alias (不 reload, 仅 expand) 用于启动 + retry 路径
462    let card_num_expand_fn: std::sync::Arc<dyn Fn() + Send + Sync> = {
463        let inner = std::sync::Arc::clone(&card_num_reload_and_expand_fn);
464        std::sync::Arc::new(move || (inner)(false))
465    };
466
467    // codex F2 (P1) 立即跑首次: fail-closed sentinel 即时生效, 不留 startup window
468    (card_num_expand_fn)();
469
470    // background retry loop: cache 加载后 re-expand 覆盖 sentinel
471    {
472        let card_num_expand_fn_loop = std::sync::Arc::clone(&card_num_expand_fn);
473        let bridge_for_check = std::sync::Arc::clone(&bridge);
474        tokio::spawn(async move {
475            let trd_cache = std::sync::Arc::clone(&bridge_for_check.caches.trd_cache);
476            let mut attempts = 0u32;
477            let max_attempts = 6u32; // 6 × 10s = 60s
478            loop {
479                tokio::time::sleep(std::time::Duration::from_secs(10)).await;
480                attempts += 1;
481                let accounts = trd_cache.get_accounts();
482                if accounts.is_empty() {
483                    if attempts >= max_attempts {
484                        tracing::warn!(
485                            "v1.4.103 B10: trd_cache 仍空 (after {max_attempts} × 10s); \
486                             受限 key 仍走 fail-closed sentinel reject 直到 SIGHUP / cache 加载."
487                        );
488                        return;
489                    }
490                    continue;
491                }
492                (card_num_expand_fn_loop)();
493                return;
494            }
495        });
496    }
497
498    // codex F2 (P1) + F3.1 (P1) round 3: 单一 SIGHUP 钩子 — reload + expand
499    // 顺序操作避免 race. 之前每 server (REST/gRPC) 各自 SIGHUP 监听 reload,
500    // 加上 card_num expand 单独监听 — 多 SIGHUP listener 并发执行无序, 可能
501    // expand 先跑 (写 sentinel) 然后 reload 后跑 (overwrite sentinel 用 raw
502    // allowed_card_nums) → 受限 key 在 reload window 内 silent unrestricted.
503    //
504    // 现在: **唯一 SIGHUP listener**, 顺序 reload → expand. 删除 REST/gRPC 各
505    // 自的 SIGHUP reload listener (上面 server 块内已注释).
506    #[cfg(unix)]
507    {
508        let unified_sighup_fn = std::sync::Arc::clone(&card_num_reload_and_expand_fn);
509        tokio::spawn(async move {
510            use tokio::signal::unix::{SignalKind, signal};
511            let mut sig = match signal(SignalKind::hangup()) {
512                Ok(s) => s,
513                Err(e) => {
514                    tracing::error!(error = %e, "SIGHUP install failed (unified reload+expand)");
515                    return;
516                }
517            };
518            tracing::info!(
519                "v1.4.103 F3.1: unified SIGHUP handler installed (reload all keys + expand card_num)"
520            );
521            while sig.recv().await.is_some() {
522                tracing::info!(
523                    "v1.4.103 F3.1: SIGHUP received — running reload_all_stores + \
524                     expand_allowed_card_nums (single ordered op, no race)"
525                );
526                (unified_sighup_fn)(true); // do_reload=true
527            }
528        });
529    }
530
531    // 11. v1.4.104 eli S-001 (P0): native TCP keystore guard.
532    //
533    // 配任一 keys file (rest / grpc / ws) → 用户**意图**启用 scope mode.
534    // 但 TCP FTAPI 协议 InitConnect 没有 Bearer 字段, 无法做 caller-specific
535    // scope check (S-001). 默认 fail-closed: 不启 TCP listener, 用户需要
536    // 通过 REST/gRPC/WS 与 daemon 交互 (这些 surface 都已加 pipeline auth).
537    //
538    // 显式 `--allow-tcp-unauthenticated` opt-in → 启 TCP, 但 daemon 启动
539    // loud warn 该端口完全无 auth, agent skill 等 local process 可任意调用.
540    // 用之前 captured 的 local 变量, args 已被 merge_config 消费
541    let any_keys_configured =
542        rest_keys_file.is_some() || grpc_keys_file.is_some() || ws_keys_file.is_some();
543    let tcp_disabled = any_keys_configured && !allow_tcp_unauthenticated;
544
545    if tcp_disabled {
546        tracing::warn!(
547            listen_addr = %listen_addr,
548            "v1.4.104 eli S-001 (P0) fix: TCP listener (port {}) NOT started — \
549             keys file configured but --allow-tcp-unauthenticated not set. \
550             native TCP FTAPI protocol has no Bearer field, cannot enforce \
551             caller-specific scope check; defaulting to fail-closed (skip TCP). \
552             Use REST/gRPC/WS endpoints for authenticated access. \
553             To restore TCP (legacy Python SDK clients) add --allow-tcp-unauthenticated, \
554             but be aware that port {} will accept ANY local connection without \
555             scope check (跨账户 leak risk).",
556            config.port,
557            config.port,
558        );
559        eprintln!(
560            "⚠️  TCP listener (port {}) DISABLED (v1.4.104 eli S-001 fix): \
561             keys file configured + no --allow-tcp-unauthenticated. \
562             Pass --allow-tcp-unauthenticated to restore (with security warning).",
563            config.port,
564        );
565    } else if any_keys_configured && allow_tcp_unauthenticated {
566        tracing::warn!(
567            listen_addr = %listen_addr,
568            "⚠️  v1.4.104: TCP listener running WITHOUT scope check despite keys configured \
569             (--allow-tcp-unauthenticated set). Port {} accepts ANY local connection — \
570             跨账户 leak risk. Use REST/gRPC/WS for authenticated clients; reserve \
571             TCP only for legacy Python SDK / C++ OpenD where Bearer not feasible.",
572            config.port,
573        );
574        eprintln!(
575            "⚠️  TCP port {} ACCEPTS UNAUTHENTICATED connections (--allow-tcp-unauthenticated). \
576             受限 keys 不在该 surface 强制. 推荐改用 REST/gRPC/WS.",
577            config.port,
578        );
579    }
580
581    // 11. 启动 API 服务 + 信号处理
582    tokio::select! {
583        result = async {
584            if tcp_disabled {
585                // TCP 不跑, 但仍要等 ctrl_c / telnet shutdown
586                std::future::pending::<anyhow::Result<()>>().await
587            } else {
588                server.run().await
589            }
590        } => {
591            if let Err(e) = result {
592                tracing::error!(error = %e, "API server error");
593            }
594        }
595        _ = tokio::signal::ctrl_c() => {
596            tracing::info!("received Ctrl+C, shutting down gracefully...");
597        }
598        _ = async {
599            while shutdown_rx.changed().await.is_ok() {
600                if *shutdown_rx.borrow() {
601                    break;
602                }
603            }
604        } => {
605            tracing::info!("shutdown requested via telnet");
606        }
607    }
608
609    // 清理后台任务
610    if let Some(handle) = ws_handle {
611        handle.abort();
612    }
613    if let Some(handle) = rest_handle {
614        handle.abort();
615    }
616    if let Some(handle) = grpc_handle {
617        handle.abort();
618    }
619    if let Some(handle) = telnet_handle {
620        handle.abort();
621    }
622
623    tracing::info!("gateway stopped");
624    Ok(())
625}