Skip to main content

futu_server/
ws_listener.rs

1// WebSocket 监听器:接受 WebSocket 连接,复用 TCP 的请求路由和连接池
2//
3// 每个 WebSocket 二进制消息 = 一个完整的 FutuAPI 帧(44 字节帧头 + body)。
4// 与 TCP 共享同一个 connections DashMap、RequestRouter、SubscriptionManager。
5//
6// ## v1.0 鉴权
7//
8// 握手阶段(accept_hdr_async)校验 HTTP `Authorization: Bearer <token>` 或
9// `?token=<plaintext>` query —— 通过 `KeyStore::verify` 得到 `KeyRecord`,
10// 把 scope 集合和 key_id 存到 `ClientConn`。每条消息进 `ws_process_requests`
11// 时按 `futu_auth::scope_for_proto_id(proto_id)` 查所需 scope,不匹配 → 不 dispatch、
12// 记 audit reject。`trade:real` 额外跑 `check_and_commit` 过一道 rate + hours
13// 全局闸门。未注入 KeyStore(TCP listener 或 legacy 模式)→ scopes 空集被
14// 解释为"全放行",保持向后兼容。
15//
16// ## v1.4.104 阶段 3
17//
18// 把 inline scope gate / rate gate / body-aware acc_id check 替换为
19// `futu_auth_pipeline::authenticate_request` 单一调用. 流程:
20//   1. AES decrypt (handshake protocol INIT_CONNECT 跳过)
21//   2. (非 INIT_CONNECT + 非 1xxx 系统协议) → 调 pipeline
22//      Credential::PreVerified(rec_from_get_by_id) 拿 SIGHUP-aware 最新 rec.
23//      Reject → drop. Allow → 取 allowed_acc_ids 给 response filter.
24//   3. dispatch (router / handle_init_connect / handle_keepalive)
25//   4. response filter (proto 2001 TRD_GET_ACC_LIST 等)
26//
27// 删除: 旧 `ws_body_aware_check` (功能进 pipeline body_aware::build_check_ctxs).
28// 删除: 旧 inline scope gate + rate gate + audit allow/reject. 全 pipeline 一处.
29
30use std::collections::{HashMap, HashSet};
31use std::sync::Arc;
32use std::time::Instant;
33
34use bytes::BytesMut;
35use chrono::Utc;
36use dashmap::DashMap;
37use futures::{SinkExt, StreamExt};
38use tokio::net::TcpListener;
39use tokio::sync::mpsc;
40use tokio_tungstenite::tungstenite::handshake::server::{ErrorResponse, Request, Response};
41use tokio_tungstenite::tungstenite::http::StatusCode;
42use tokio_tungstenite::tungstenite::protocol::Message;
43
44use futu_auth::{KeyRecord, KeyStore, RuntimeCounters, Scope};
45use futu_auth_pipeline::{
46    AuthDecision, AuthEnvelope, Credential, Endpoint, FilterRegistry, RejectKind, SurfaceId,
47    authenticate_request,
48};
49
50/// v1.4.106 D1 5c: WS surface adapter — `AuthDecision::Reject` 翻成 silent drop.
51///
52/// **历史**: WS 在 v1.4.103 / v1.4.104 阶段 3 都按 silent drop 处理 reject
53/// (与 v1.4.103 行为一致, 防 timing 探测 — 给客户端任何 wire response 都让
54/// 它知道帧被读了 vs 没读). v1.4.106 D1 把这层"翻译为 unit"也走 trait, 让 4
55/// surface SurfaceAdapter 一致.
56///
57/// **WireResponse = ()**: WS 不发任何东西回 client; 调用方拿 `Option<()>`
58/// 知道是 reject (Some) 还是 allow (None) 即可继续 dispatch / 丢弃.
59///
60/// **不变量**: `translate_reject` 不能写日志 (pipeline 已 audit::reject 一次,
61/// 不要重复). 只 drain reason / kind 让它进 `_`.
62pub struct WsAdapter;
63
64impl futu_auth_pipeline::SurfaceAdapter for WsAdapter {
65    type WireResponse = ();
66
67    fn surface_id() -> SurfaceId {
68        SurfaceId::Ws
69    }
70
71    fn translate_reject(kind: RejectKind, reason: String) -> Self::WireResponse {
72        // 与 v1.4.103/104 行为一致: silent drop. pipeline 已 audit reject,
73        // 不再写 log; reason/kind drained 防 client 探测 daemon 状态.
74        let _ = (kind, reason);
75    }
76}
77use futu_codec::frame::FutuFrame;
78use futu_codec::header::{FutuHeader, HEADER_SIZE, ProtoFmtType};
79use futu_core::proto_id;
80
81use crate::conn::{ClientConn, ConnState, DisconnectNotify, IncomingRequest};
82use crate::listener::{MAX_CONNECTIONS, ServerConfig};
83use crate::router::RequestRouter;
84
85/// WebSocket 服务端
86pub struct WsServer {
87    listen_addr: String,
88    config: ServerConfig,
89    connections: Arc<DashMap<u64, ClientConn>>,
90    router: Arc<RequestRouter>,
91    subscriptions: Option<Arc<crate::subscription::SubscriptionManager>>,
92    /// v1.0:握手时做 Bearer token 鉴权。None 或 `!is_configured()` → legacy 模式放行
93    key_store: Option<Arc<KeyStore>>,
94    /// v1.0:跨 REST / gRPC / WS 共享的限额 counters
95    counters: Option<Arc<RuntimeCounters>>,
96    /// v1.4.104 阶段 3: 跨 surface 共享的 response filter registry (proto 2001
97    /// TRD_GET_ACC_LIST 默认装入). None 时 fallback 到内置 with_defaults.
98    filter_registry: Option<Arc<FilterRegistry>>,
99}
100
101/// Shared dependencies for the WebSocket server.
102///
103/// These are the same runtime objects the raw TCP server owns. Keeping them in a
104/// bundle avoids every constructor carrying a growing positional list as auth /
105/// counters / filters evolve.
106pub struct WsServerDeps {
107    connections: Arc<DashMap<u64, ClientConn>>,
108    router: Arc<RequestRouter>,
109    subscriptions: Option<Arc<crate::subscription::SubscriptionManager>>,
110}
111
112impl WsServerDeps {
113    pub fn new(
114        connections: Arc<DashMap<u64, ClientConn>>,
115        router: Arc<RequestRouter>,
116        subscriptions: Option<Arc<crate::subscription::SubscriptionManager>>,
117    ) -> Self {
118        Self {
119            connections,
120            router,
121            subscriptions,
122        }
123    }
124}
125
126impl WsServer {
127    /// 创建 WsServer,共享 TCP 的连接池、路由器、订阅管理器(无鉴权,向后兼容)
128    pub fn new(
129        listen_addr: String,
130        config: ServerConfig,
131        connections: Arc<DashMap<u64, ClientConn>>,
132        router: Arc<RequestRouter>,
133        subscriptions: Option<Arc<crate::subscription::SubscriptionManager>>,
134    ) -> Self {
135        Self::with_auth(
136            listen_addr,
137            config,
138            WsServerDeps::new(connections, router, subscriptions),
139            None,
140            None,
141        )
142    }
143
144    /// v1.0 入口:同时接入 KeyStore + 共享 RuntimeCounters 做握手鉴权和 per-message
145    /// scope / 限额检查。`key_store = None` 或未配置时保持 legacy(全放行)。
146    pub fn with_auth(
147        listen_addr: String,
148        config: ServerConfig,
149        deps: WsServerDeps,
150        key_store: Option<Arc<KeyStore>>,
151        counters: Option<Arc<RuntimeCounters>>,
152    ) -> Self {
153        Self {
154            listen_addr,
155            config,
156            connections: deps.connections,
157            router: deps.router,
158            subscriptions: deps.subscriptions,
159            key_store,
160            counters,
161            filter_registry: None,
162        }
163    }
164
165    /// v1.4.104 阶段 3: 注入显式 FilterRegistry (跨 surface 共享同一份).
166    /// 不调用此 setter 则 run() 时 fallback 到 `FilterRegistry::with_defaults()`.
167    pub fn with_filter_registry(mut self, registry: Arc<FilterRegistry>) -> Self {
168        self.filter_registry = Some(registry);
169        self
170    }
171
172    /// 启动 WebSocket 服务端监听
173    pub async fn run(&self) -> anyhow::Result<()> {
174        let listener = TcpListener::bind(&self.listen_addr).await?;
175        tracing::info!(addr = %self.listen_addr, "WebSocket server listening");
176
177        let (req_tx, req_rx) = mpsc::unbounded_channel::<IncomingRequest>();
178        let (disconnect_tx, mut disconnect_rx) = mpsc::unbounded_channel::<DisconnectNotify>();
179
180        // 启动请求处理任务(与 TCP 共享同一逻辑)
181        let connections = Arc::clone(&self.connections);
182        let router = Arc::clone(&self.router);
183        let config = self.config.clone();
184        // v1.4.104 阶段 3: KeyStore + RuntimeCounters 总是材料化 (legacy mode 用
185        // empty / new()), 让 ws_process_requests 拿 non-Option Arc 直接调 pipeline.
186        // 行为与 v1.4.103 等价: KeyStore::empty().is_configured() = false, pipeline
187        // 走 legacy short-circuit (Allow{rec:None} 不 audit, body-aware 不 enforce).
188        let key_store_for_process = self
189            .key_store
190            .clone()
191            .unwrap_or_else(|| Arc::new(KeyStore::empty()));
192        let counters_for_process = self
193            .counters
194            .clone()
195            .unwrap_or_else(|| Arc::new(RuntimeCounters::new()));
196        let filter_registry_for_process = self
197            .filter_registry
198            .clone()
199            .unwrap_or_else(|| Arc::new(FilterRegistry::with_defaults()));
200        tokio::spawn(async move {
201            ws_process_requests(
202                req_rx,
203                connections,
204                router,
205                config,
206                counters_for_process,
207                key_store_for_process,
208                filter_registry_for_process,
209            )
210            .await;
211        });
212
213        // 启动连接清理任务
214        let cleanup_connections = Arc::clone(&self.connections);
215        let cleanup_subs = self.subscriptions.clone();
216        tokio::spawn(async move {
217            while let Some(notify) = disconnect_rx.recv().await {
218                let removed = cleanup_connections.remove(&notify.conn_id);
219                if removed.is_some() {
220                    if let Some(ref subs) = cleanup_subs {
221                        subs.on_disconnect(notify.conn_id);
222                    }
223                    tracing::info!(
224                        conn_id = notify.conn_id,
225                        remaining = cleanup_connections.len(),
226                        "ws connection removed from pool"
227                    );
228                }
229            }
230        });
231
232        // 接受连接循环
233        let connections = Arc::clone(&self.connections);
234        let key_store_accept = self.key_store.clone();
235        // v1.4.104 阶段 3: scope_mode 局部计算 (KeyStore configured = 启用 auth).
236        let scope_mode = self.key_store.as_ref().is_some_and(|ks| ks.is_configured());
237        if !scope_mode {
238            // v1.4.93 P0-5 (NEW-C-02): 加强 legacy mode loud WARN —
239            // 对齐 REST mutating-blocked policy 的 startup signal。任何
240            // 未授权客户端都可 handshake + 接收 push(HTTP 101 OK)。本版
241            // **不 reject**(保持向后兼容),未来 v2 默认 reject。
242            tracing::warn!("{}", legacy_mode_warn_tracing_message());
243            eprintln!("{}", legacy_mode_warn_stderr_message());
244        }
245        loop {
246            let (stream, peer_addr) = listener.accept().await?;
247
248            if connections.len() >= MAX_CONNECTIONS {
249                tracing::warn!(
250                    peer = %peer_addr,
251                    "max connections reached ({}), rejecting ws client",
252                    MAX_CONNECTIONS,
253                );
254                drop(stream);
255                continue;
256            }
257
258            let conn_id = ClientConn::generate_conn_id();
259            let aes_key = ClientConn::generate_aes_key();
260            stream.set_nodelay(true).ok();
261
262            tracing::info!(
263                conn_id = conn_id,
264                peer = %peer_addr,
265                total = connections.len() + 1,
266                "ws client connected"
267            );
268
269            let (tx, authed) = run_ws_connection(
270                stream,
271                conn_id,
272                aes_key,
273                req_tx.clone(),
274                disconnect_tx.clone(),
275                key_store_accept.clone(),
276            )
277            .await;
278
279            // 握手鉴权失败 → run_ws_connection 已经 drop 连接;这里什么都不做
280            let Some(authed) = authed else {
281                continue;
282            };
283
284            let (key_id, scopes, allowed_markets, allowed_acc_ids) = match authed {
285                AuthResult::Authenticated(rec) => (
286                    Some(rec.id.clone()),
287                    rec.scopes.clone(),
288                    // v1.4.105 D3 (Phase 4) T-B2: 拷贝 caller key 的 allowed_markets
289                    // 到 ClientConn 让 PushDispatcher::push_trd_acc Layer 3 用.
290                    rec.allowed_markets
291                        .as_ref()
292                        .map(|s| std::sync::Arc::new(s.clone())),
293                    // codex round 1 F4 (P2) v1.4.105: 拷贝 caller key 的
294                    // allowed_acc_ids 到 ClientConn 让 PushDispatcher::push_trd_acc
295                    // Layer 1 push-time 硬过滤. 防 stale subscription /
296                    // KeyRecord reload 后 acc 范围窄化 时 push leak.
297                    rec.allowed_acc_ids
298                        .as_ref()
299                        .map(|s| std::sync::Arc::new(s.clone())),
300                ),
301                AuthResult::Legacy => (None, HashSet::new(), None, None),
302            };
303
304            let conn = ClientConn {
305                conn_id,
306                state: ConnState::Connected,
307                aes_key,
308                aes_encrypt_enabled: false,
309                proto_fmt_type: ProtoFmtType::Protobuf,
310                last_keepalive: Instant::now(),
311                recv_notify: false,
312                keepalive_count: std::sync::atomic::AtomicU32::new(0),
313                tx,
314                key_id,
315                scopes,
316                allowed_markets,
317                allowed_acc_ids,
318            };
319
320            connections.insert(conn_id, conn);
321        }
322    }
323}
324
325/// 握手鉴权结果:scope 模式下的 KeyRecord 或 legacy 全放行
326enum AuthResult {
327    Authenticated(Arc<KeyRecord>),
328    Legacy,
329}
330
331// `ErrorResponse` 是 tungstenite/axum 的 ws upgrade 接口约定类型,含
332// `http::Response<Option<String>>` 必然偏大(≥136 bytes)。本 fn 在 ws
333// 握手 error path(slot poisoning,极罕见),非 hot path;改 `Box<ErrorResponse>`
334// 需要所有 caller deref 适配,收益不抵成本。按 pitfall #51"对齐 C++ = 减法"
335// 保最小侵入。
336#[allow(clippy::result_large_err)]
337fn store_ws_auth_result(
338    slot: &std::sync::Mutex<Option<AuthResult>>,
339    result: AuthResult,
340    conn_id: u64,
341) -> Result<(), ErrorResponse> {
342    match slot.lock() {
343        Ok(mut guard) => {
344            *guard = Some(result);
345            Ok(())
346        }
347        Err(e) => {
348            tracing::warn!(conn_id = conn_id, error = %e, "ws auth slot poisoned during handshake");
349            Err(make_err_response(
350                StatusCode::INTERNAL_SERVER_ERROR,
351                "ws auth state unavailable",
352            ))
353        }
354    }
355}
356
357fn take_ws_auth_result(
358    slot: &std::sync::Mutex<Option<AuthResult>>,
359    conn_id: u64,
360) -> Option<AuthResult> {
361    match slot.lock() {
362        Ok(mut guard) => match guard.take() {
363            Some(result) => Some(result),
364            None => {
365                tracing::warn!(
366                    conn_id = conn_id,
367                    "ws handshake succeeded without auth state; closing connection"
368                );
369                None
370            }
371        },
372        Err(e) => {
373            tracing::warn!(conn_id = conn_id, error = %e, "ws auth slot poisoned after handshake");
374            None
375        }
376    }
377}
378
379/// 运行单个 WebSocket 连接的收发循环
380///
381/// 接收端:WS Binary Message → 解析为 FutuFrame → 发到 req_tx
382/// 发送端:frame_rx 收到 FutuFrame → 编码为字节 → 发送 WS Binary Message
383///
384/// 返回 `(frame_tx, Option<AuthResult>)` —— `None` 表示握手 / 鉴权失败,调用方
385/// 不应把这个 conn_id 加入连接池。
386async fn run_ws_connection(
387    stream: tokio::net::TcpStream,
388    conn_id: u64,
389    _aes_key: [u8; 16],
390    req_tx: mpsc::UnboundedSender<IncomingRequest>,
391    disconnect_tx: mpsc::UnboundedSender<DisconnectNotify>,
392    key_store: Option<Arc<KeyStore>>,
393) -> (mpsc::Sender<FutuFrame>, Option<AuthResult>) {
394    let (frame_tx, mut frame_rx) = mpsc::channel::<FutuFrame>(256);
395
396    // 握手回调里验证 token + scope,结果存到 authed_slot;accept_hdr_async 内部
397    // 只读 callback 的 Ok/Err 决定要不要升级,所以 KeyRecord 要借 Mutex 传出来
398    let authed_slot: Arc<std::sync::Mutex<Option<AuthResult>>> =
399        Arc::new(std::sync::Mutex::new(None));
400    let slot_cb = Arc::clone(&authed_slot);
401    let store_cb = key_store.clone();
402
403    #[allow(clippy::result_large_err)] // ErrorResponse 是 tungstenite 的类型,我们无法改其大小
404    let callback = move |req: &Request, resp: Response| -> Result<Response, ErrorResponse> {
405        // v1.4.102 F-002 fix (P2, leaf v1.4.100 报告): WS handshake 加 Origin 校验.
406        // 历史: legacy WS 看到任意 evil Origin 仍 handshake 成功 (HTTP 101).
407        // 浏览器 WS exfiltration POC 还没证实, 但默认行为该收紧.
408        //
409        // **策略** (与 REST CORS BUG-008 对齐):
410        //   1. `FUTU_WS_ALLOWED_ORIGINS=https://app.com,http://localhost:3000`
411        //      显式 allowlist (推荐).
412        //   2. key_store 配置了 (auth enabled) + 未设 env → loopback only.
413        //   3. legacy unauth + 未设 env → loopback only (与 REST CORS 对齐).
414        //   4. **没 Origin 头**(非 browser 客户端 / native WS lib)→ 通过.
415        if let Some(origin_hv) = req.headers().get("origin") {
416            let origin_str = match origin_hv.to_str() {
417                Ok(s) => s,
418                Err(e) => {
419                    tracing::warn!(conn_id, error = %e, "ws Origin header is not valid UTF-8");
420                    futu_auth::audit::reject("ws", "/ws", "<origin>", "invalid Origin header");
421                    return Err(make_err_response(
422                        StatusCode::FORBIDDEN,
423                        "Invalid Origin header",
424                    ));
425                }
426            };
427            let allowed = ws_check_origin(origin_str, store_cb.as_deref());
428            if !allowed {
429                futu_auth::audit::reject("ws", "/ws", "<origin>", "Origin rejected by allowlist");
430                return Err(make_err_response(
431                    StatusCode::FORBIDDEN,
432                    "Origin not allowed (configure FUTU_WS_ALLOWED_ORIGINS env)",
433                ));
434            }
435        }
436
437        // legacy:未注入 KeyStore 或未配置 keys.json → 全放行
438        let Some(store) = store_cb.as_ref() else {
439            store_ws_auth_result(slot_cb.as_ref(), AuthResult::Legacy, conn_id)?;
440            return Ok(resp);
441        };
442        if !store.is_configured() {
443            store_ws_auth_result(slot_cb.as_ref(), AuthResult::Legacy, conn_id)?;
444            return Ok(resp);
445        }
446
447        // 提取 token:?token=... 或 Authorization: Bearer ...
448        let token = extract_ws_token(req);
449        let Some(token) = token else {
450            futu_auth::audit::reject("ws", "/ws", "<missing>", "missing token");
451            return Err(make_err_response(
452                StatusCode::UNAUTHORIZED,
453                "missing api key (use ?token=... or Authorization: Bearer ...)",
454            ));
455        };
456
457        let Some(rec) = store.verify(&token) else {
458            futu_auth::audit::reject("ws", "/ws", "<invalid>", "invalid api key");
459            return Err(make_err_response(
460                StatusCode::UNAUTHORIZED,
461                "invalid api key",
462            ));
463        };
464
465        if rec.is_expired(Utc::now()) {
466            futu_auth::audit::reject("ws", "/ws", &rec.id, "key expired");
467            return Err(make_err_response(StatusCode::UNAUTHORIZED, "key expired"));
468        }
469
470        // 最低门槛:qot:read。真正按 proto_id 的细粒度检查留给 process_requests
471        // v1.4.102 codex 24 F4 (P2): forbidden body 通用化, 不再泄 scope.
472        // 详细 audit 信息留 audit log; HTTP body 用通用 "forbidden". REST + gRPC
473        // + WS handshake 同步 (BUG-011 + 24 F4).
474        if !rec.scopes.contains(&Scope::QotRead) {
475            futu_auth::audit::reject("ws", "/ws", &rec.id, "missing qot:read");
476            return Err(make_err_response(StatusCode::FORBIDDEN, "forbidden"));
477        }
478
479        futu_auth::audit::allow("ws", "/ws", &rec.id, Some("qot:read"));
480        store_ws_auth_result(slot_cb.as_ref(), AuthResult::Authenticated(rec), conn_id)?;
481        Ok(resp)
482    };
483
484    let ws_stream = match tokio_tungstenite::accept_hdr_async(stream, callback).await {
485        Ok(ws) => ws,
486        Err(e) => {
487            tracing::warn!(conn_id = conn_id, error = %e, "ws handshake failed");
488            let _ = disconnect_tx.send(DisconnectNotify { conn_id });
489            return (frame_tx, None);
490        }
491    };
492
493    // 握手成功 → slot_cb 应已填;若内部状态不可用,fail-closed 关闭这条 WS。
494    let Some(authed) = take_ws_auth_result(authed_slot.as_ref(), conn_id) else {
495        let _ = disconnect_tx.send(DisconnectNotify { conn_id });
496        return (frame_tx, None);
497    };
498
499    let (mut ws_sink, mut ws_stream_rx) = ws_stream.split();
500
501    // 发送任务:FutuFrame → 编码为字节 → WS Binary
502    tokio::spawn(async move {
503        while let Some(frame) = frame_rx.recv().await {
504            let mut buf = BytesMut::new();
505            frame.header.encode(&mut buf);
506            buf.extend_from_slice(&frame.body);
507            // tokio-tungstenite 0.26+: Message::Binary 收 Bytes(之前是 Vec<u8>)
508            let msg = Message::Binary(buf.freeze());
509            if let Err(e) = ws_sink.send(msg).await {
510                tracing::warn!(conn_id = conn_id, error = %e, "ws send failed");
511                break;
512            }
513        }
514    });
515
516    // 接收任务:WS Binary → 解析 FutuFrame → req_tx
517    tokio::spawn(async move {
518        while let Some(result) = ws_stream_rx.next().await {
519            match result {
520                Ok(msg) => {
521                    let data = match msg {
522                        Message::Binary(data) => data,
523                        Message::Close(_) => {
524                            tracing::info!(conn_id = conn_id, "ws client sent close");
525                            break;
526                        }
527                        Message::Ping(_) | Message::Pong(_) => {
528                            // tokio-tungstenite 自动处理 ping/pong
529                            continue;
530                        }
531                        _ => {
532                            // 忽略 Text 等其他类型
533                            continue;
534                        }
535                    };
536
537                    // 解析 FutuAPI 帧(44 字节帧头 + body)
538                    if data.len() < HEADER_SIZE {
539                        tracing::warn!(
540                            conn_id = conn_id,
541                            len = data.len(),
542                            "ws message too short for futu header"
543                        );
544                        continue;
545                    }
546
547                    let header_buf = BytesMut::from(&data[..]);
548                    let header = match FutuHeader::peek(&header_buf) {
549                        Ok(Some(h)) => h,
550                        Ok(None) => {
551                            tracing::warn!(conn_id = conn_id, "ws header peek returned None");
552                            continue;
553                        }
554                        Err(e) => {
555                            tracing::warn!(conn_id = conn_id, error = %e, "ws invalid futu header");
556                            continue;
557                        }
558                    };
559
560                    let expected_len = HEADER_SIZE + header.body_len as usize;
561                    if data.len() < expected_len {
562                        tracing::warn!(
563                            conn_id = conn_id,
564                            expected = expected_len,
565                            actual = data.len(),
566                            "ws message shorter than expected frame size"
567                        );
568                        continue;
569                    }
570
571                    let body = bytes::Bytes::copy_from_slice(&data[HEADER_SIZE..expected_len]);
572
573                    let req = IncomingRequest::builder(
574                        conn_id,
575                        header.proto_id,
576                        header.serial_no,
577                        header.proto_fmt_type,
578                        body,
579                    )
580                    .build();
581
582                    if req_tx.send(req).is_err() {
583                        break;
584                    }
585                }
586                Err(e) => {
587                    tracing::warn!(conn_id = conn_id, error = %e, "ws recv error");
588                    break;
589                }
590            }
591        }
592        tracing::info!(conn_id = conn_id, "ws connection closed");
593        let _ = disconnect_tx.send(DisconnectNotify { conn_id });
594    });
595
596    (frame_tx, Some(authed))
597}
598
/// Text of the `tracing::warn!` emitted at startup when the server runs in
/// legacy (unauthenticated) mode.
///
/// Kept as a `const fn` so the wording can be checked by unit tests and is
/// not dropped by accident.
pub(crate) const fn legacy_mode_warn_tracing_message() -> &'static str {
    concat!(
        "WS server running WITHOUT API key auth (legacy mode); ",
        "all WS clients accept unauthenticated handshake (no-token / ",
        "wrong-bearer / bogus-query all return success). ",
        "Pass KeyStore via with_auth() to enable. ",
        "v2 will default-reject; migrate to --rest-keys-file / --ws-keys-file for production.",
    )
}
610
/// Text printed to stderr at startup when the server runs in legacy
/// (unauthenticated) mode.
///
/// Shorter than the tracing variant so it is easy to spot in one line of
/// service logs.
pub(crate) const fn legacy_mode_warn_stderr_message() -> &'static str {
    concat!(
        "⚠️  WS server (legacy mode, no --ws-keys-file): ",
        "unauthenticated handshakes accepted. v2 will default-reject. ",
        "Migrate to --ws-keys-file for production.",
    )
}
619
620/// 从握手 Request 里提取 token:优先 ?token=...,再 Authorization: Bearer ...
621///
622/// 浏览器 WS API 不允许设置自定义 header,所以优先支持 query;原生客户端
623/// (curl / Futu SDK 之类)两种都可以.
624///
625/// v1.4.104 阶段 7-3 fix:Authorization 走 `parse_bearer_scheme` 共享 helper
626/// (case-insensitive RFC 7235). 旧实装 `strip_prefix("Bearer ")` 是
627/// case-sensitive bug — `bearer xxx` / `BEARER xxx` 会被拒, 与 gRPC + REST +
628/// MCP 行为不一致 (它们已 case-insensitive). 本 commit 修齐.
629fn extract_ws_token(req: &Request) -> Option<String> {
630    if let Some(q) = req.uri().query() {
631        // 手写 query 解析,避免引 url / percent-encoding 新依赖
632        let params: HashMap<&str, &str> =
633            q.split('&').filter_map(|kv| kv.split_once('=')).collect();
634        if let Some(v) = params.get("token")
635            && !v.is_empty()
636        {
637            return Some((*v).to_string());
638        }
639    }
640    let header = req.headers().get("authorization")?;
641    let value = header.to_str().ok()?;
642    futu_auth_pipeline::parse_bearer_scheme(value).map(|t| t.to_string())
643}
644
645/// v1.4.102 F-002 fix (P2): WS handshake Origin 校验.
646///
647/// **匹配规则** (与 REST CORS BUG-008 对齐):
648///   1. `FUTU_WS_ALLOWED_ORIGINS` env 显式 allowlist → 严格匹配
649///   2. 未设 env + key_store configured → loopback (127.0.0.1 / localhost / [::1])
650///   3. 未设 env + legacy unauth → 允许任意 (向后兼容)
651///
652/// 浏览器以外的原生 WS 客户端 (futucli, Python SDK 等) **不发 Origin 头**,
653/// 调用方应在 callback 里**先 check Origin header presence**, 缺失时跳过本 fn.
654fn ws_check_origin(origin: &str, key_store: Option<&KeyStore>) -> bool {
655    if let Ok(raw) = std::env::var("FUTU_WS_ALLOWED_ORIGINS") {
656        let trimmed = raw.trim();
657        if !trimmed.is_empty() {
658            for allowed in trimmed
659                .split(',')
660                .map(|s| s.trim())
661                .filter(|s| !s.is_empty())
662            {
663                if allowed == origin {
664                    return true;
665                }
666            }
667            // env 显式设了但 origin 不在 list → reject
668            return false;
669        }
670    }
671    // env 未设 / 空: 按 auth 状态决定 default
672    let auth_enabled = key_store.is_some_and(|ks| ks.is_configured());
673    if !auth_enabled {
674        // v1.4.104 eli P2-001 (P2) fix: legacy unauth WS 也 default loopback,
675        // 不再允许任意 Origin. 之前 evil 站点可发 WS handshake 拿 101, 配合
676        // socket 抓数据 → cross-origin leak 风险. 用户要 cross-origin browser
677        // client 必须显式 FUTU_WS_ALLOWED_ORIGINS=https://app.example.com.
678        // (与 REST CORS v1.4.104 P1-004 对齐.)
679        return is_strict_loopback_origin(origin);
680    }
681    // auth enabled: loopback only.
682    // v1.4.102 codex 25 F5 / 30 F3 / 31 F6 (P2/P3) refine: 严格 host 匹配,
683    // 不接受 prefix-混入 evil hosts (`[::1].evil.com` / `localhost.evil.com`).
684    is_strict_loopback_origin(origin)
685}
686
/// Returns `true` only for a bare loopback origin: `http://` or `https://`,
/// followed by exactly `127.0.0.1`, `localhost` or `[::1]`, optionally
/// followed by `:<port>` where the port is 1..=65535 written in ASCII digits.
///
/// Anything else is rejected, including:
/// * other schemes or a missing scheme,
/// * a path, query, fragment or userinfo (`/`, `?`, `#`, `@`),
/// * hosts that merely start with a loopback name (`localhost.evil.com`,
///   `[::1].evil.com`),
/// * an empty port (`localhost:`), port `0`, out-of-range ports,
/// * a signed port such as `:+80`. Integer parsing alone would accept the
///   leading `+`, so the digits are checked explicitly.
///
/// The original notes state this is meant to mirror the REST server's loopback
/// origin check; if that helper parses ports more leniently, consider applying
/// the same digit-only rule there.
fn is_strict_loopback_origin(s: &str) -> bool {
    let Some(authority) = s
        .strip_prefix("http://")
        .or_else(|| s.strip_prefix("https://"))
    else {
        return false;
    };

    // An origin is scheme + host + optional port; none of these may appear.
    if authority.contains(['/', '?', '#', '@']) {
        return false;
    }

    // Split into host and optional port. `Some("")` (trailing colon) is kept
    // distinct from `None` (no colon) so that `localhost:` is rejected below.
    let (host, port): (&str, Option<&str>) = if let Some(rest) = authority.strip_prefix("[::1]") {
        match rest.strip_prefix(':') {
            Some(p) => ("[::1]", Some(p)),
            None if rest.is_empty() => ("[::1]", None),
            // Extra characters glued onto the bracketed literal.
            None => return false,
        }
    } else if let Some((h, p)) = authority.rsplit_once(':') {
        (h, Some(p))
    } else {
        (authority, None)
    };

    if !matches!(host, "127.0.0.1" | "localhost" | "[::1]") {
        return false;
    }

    match port {
        None => true,
        Some(p) => {
            // Digits only: `str::parse::<u16>` on its own accepts "+80".
            p.bytes().all(|b| b.is_ascii_digit()) && matches!(p.parse::<u16>(), Ok(n) if n >= 1)
        }
    }
}
731
732/// 构造 tungstenite 握手失败的 HTTP 响应
733fn make_err_response(code: StatusCode, msg: &str) -> ErrorResponse {
734    let body = Some(format!(r#"{{"error":"{msg}"}}"#));
735    let mut resp = tokio_tungstenite::tungstenite::http::Response::new(body);
736    *resp.status_mut() = code;
737    resp.headers_mut().insert(
738        "content-type",
739        tokio_tungstenite::tungstenite::http::HeaderValue::from_static("application/json"),
740    );
741    resp
742}
743
744/// 处理 WebSocket 连接的请求(逻辑与 TCP 的 process_requests 相同,额外做 scope / 限额)
745///
746/// v1.4.104 阶段 3 重构: inline scope gate + trade:real rate gate +
747/// `ws_body_aware_check` 三段折叠为单一 `authenticate_request` pipeline 调用.
748///
749/// **流程**:
750/// 1. AES decrypt (handshake INIT_CONNECT 跳过 — body 是明文 RSA 加密 InitReq)
751/// 2. 非 INIT_CONNECT + scope_for_proto_id != None → 调 pipeline:
752///    - Credential::PreVerified(`KeyStore::get_by_id(key_id)`) 拿 SIGHUP-aware
753///      最新 rec, 复用 handshake 已 verify 的身份 (skip re-verify).
754///    - Endpoint::Proto(proto_id), commit_rate=true (per-msg rate 闸门).
755///    - Reject → drop request (audit 已 emit). Allow → 拿 allowed_acc_ids.
756/// 3. dispatch (router / handle_init_connect / handle_keepalive)
757/// 4. response filter (proto 2001 等, 通过 FilterRegistry::apply).
758///
759/// **legacy mode 行为**: KeyStore::empty().is_configured() = false, pipeline
760/// 走 legacy short-circuit (Allow{rec:None}, 不 audit, body-aware 不 enforce).
761/// 1xxx 系统协议 (INIT_CONNECT / KEEP_ALIVE / GET_GLOBAL_STATE) 与 v1.4.103
762/// 一致跳过 pipeline 直接 dispatch (handshake / heartbeat / 公开协议无 scope).
763async fn ws_process_requests(
764    mut req_rx: mpsc::UnboundedReceiver<IncomingRequest>,
765    connections: Arc<DashMap<u64, ClientConn>>,
766    router: Arc<RequestRouter>,
767    config: ServerConfig,
768    counters: Arc<RuntimeCounters>,
769    key_store: Arc<KeyStore>,
770    filter_registry: Arc<FilterRegistry>,
771) {
772    use crate::listener::ApiServer;
773
774    while let Some(mut req) = req_rx.recv().await {
775        let conn_id = req.conn_id;
776        let proto_id_val = req.proto_id;
777        let serial_no = req.serial_no;
778
779        // 更新 last_keepalive(任何包都算活跃)
780        if let Some(mut conn) = connections.get_mut(&conn_id) {
781            conn.last_keepalive = Instant::now();
782        }
783
784        // ── Step 0: v1.4.106 codex 0532 F3 (P2): daemon-internal proto_id
785        // (高位 0x8000_0000 bit) 绝不应从 raw WS 公开 surface 进入 — 仅 REST
786        // handler 内部合成给 router. 在 AES decrypt 前显式 reject + log,
787        // 防探测 daemon 内部 routing.
788        if futu_auth::is_internal_proto_id(proto_id_val) {
789            tracing::warn!(
790                conn_id,
791                proto_id = proto_id_val,
792                "rejecting daemon-internal proto_id at raw WS public surface (codex 0532 F3)"
793            );
794            continue;
795        }
796
797        // ── Step 1: AES decrypt (always for non-INIT_CONNECT) ─────────────────
798        // pipeline + body-aware 都需 plaintext body. INIT_CONNECT body 是 RSA
799        // 加密的 ConnInitReq, 由 handle_init_connect 自行 RSA decrypt.
800        if proto_id_val != proto_id::INIT_CONNECT
801            && let Some(conn) = connections.get(&conn_id)
802            && conn.aes_encrypt_enabled
803        {
804            match conn.decrypt_body(&req.body) {
805                Ok(decrypted) => {
806                    req.body = bytes::Bytes::from(decrypted);
807                }
808                Err(e) => {
809                    tracing::warn!(
810                        conn_id = conn_id,
811                        proto_id = proto_id_val,
812                        error = %e,
813                        "ws AES decrypt request failed, dropping"
814                    );
815                    continue;
816                }
817            }
818        }
819
820        // ── Step 2: Pipeline auth ─────────────────────────────────────────────
821        // INIT_CONNECT (handshake) + 1xxx 系统协议 (scope_for_proto_id == None)
822        // 跳 pipeline. 与 v1.4.103 行为一致: 系统协议不审 / 不限频 / 直 dispatch.
823        let needed_scope = futu_auth_pipeline::capability::scope_for_proto_id(proto_id_val);
824        // codex 0522 F1 v1.4.106: 提前抓 conn.key_id 快照让 dispatch IncomingRequest
825        // 也能填 caller_key_id (per-call snapshot, 与 caller_allowed_acc_ids 同源).
826        // 即使 INIT_CONNECT / 1xxx 系统协议 (跳 pipeline) 也带上 — handler 可基于
827        // key_id 做 per-key 订阅配额 / cleanup / 审计.
828        let dispatch_caller_key_id: Option<String> =
829            connections.get(&conn_id).and_then(|c| c.key_id.clone());
830        let allowed_acc_ids_for_resp_filter: Option<HashSet<u64>> =
831            if proto_id_val == proto_id::INIT_CONNECT || needed_scope.is_none() {
832                None
833            } else {
834                // 从 conn 取 key_id 快照, 然后 KeyStore::get_by_id 拿 SIGHUP-aware
835                // 最新 rec (limits / scopes / allowed_acc_ids 都跟最新). 找不到 ↦
836                // Credential::None (legacy mode 放行 / scope mode reject Unauth).
837                let key_id_snap = dispatch_caller_key_id.clone();
838                let rec_opt = key_id_snap.as_ref().and_then(|id| key_store.get_by_id(id));
839                let credential = match rec_opt {
840                    Some(rec) => Credential::PreVerified(rec),
841                    None => Credential::None,
842                };
843
844                let env = AuthEnvelope {
845                    surface: SurfaceId::Ws,
846                    endpoint: Endpoint::Proto(proto_id_val),
847                    needed_scope,
848                    credential,
849                    proto_id: Some(proto_id_val),
850                    body: &req.body,
851                    explicit_acc_id: None,
852                    explicit_ctx: None,
853                    commit_rate: true, // WS per-msg 是 trade:real 唯一 rate gate
854                    audit_emit: true,
855                };
856
857                // v1.4.106 D1 5c: 走 SurfaceAdapter trait
858                // (`WsAdapter::translate_decision`), 与 4 surface 一致.
859                // Allow → Some(allowed_acc_ids), Reject → None + silent drop.
860                use futu_auth_pipeline::SurfaceAdapter;
861                match authenticate_request(&key_store, &counters, env) {
862                    AuthDecision::Allow {
863                        allowed_acc_ids, ..
864                    } => allowed_acc_ids,
865                    decision @ AuthDecision::Reject { .. } => {
866                        // WsAdapter::translate_decision 返 Some(()) 表 reject
867                        // (silent drop). pipeline 已 audit reject, 这里不打 log.
868                        let _ = WsAdapter::translate_decision(decision);
869                        continue;
870                    }
871                }
872            };
873
874        // ── Step 3: Dispatch ──────────────────────────────────────────────────
875        let response_body = match proto_id_val {
876            proto_id::INIT_CONNECT => match connections.get_mut(&conn_id) {
877                Some(mut conn) => conn
878                    .handle_init_connect(
879                        &req.body,
880                        config.server_ver,
881                        config.login_user_id,
882                        config.keepalive_interval,
883                        config.rsa_private_key.as_deref(),
884                    )
885                    .ok(),
886                _ => None,
887            },
888            proto_id::KEEP_ALIVE => match connections.get(&conn_id) {
889                Some(conn) => conn.handle_keepalive(&req.body).ok(),
890                _ => None,
891            },
892            _ => {
893                // v1.4.105 D2 T-A1 fix: caller_allowed_acc_ids 从 pipeline allow
894                // decision 真填进 IncomingRequest, 让 dispatch handler (e.g.
895                // SubAccPushHandler) 端 enforce per-acc whitelist defense-in-depth.
896                // codex 0522 F1 v1.4.106: 同步填 caller_key_id (per-call snapshot
897                // 来自 conn.key_id), 让 cross-surface handler 都能识别 caller.
898                let dispatch_req = IncomingRequest::builder(
899                    req.conn_id,
900                    req.proto_id,
901                    req.serial_no,
902                    req.proto_fmt_type,
903                    req.body.clone(),
904                )
905                .with_idempotency_key(req.idempotency_key.clone())
906                .with_caller_scope(
907                    allowed_acc_ids_for_resp_filter
908                        .as_ref()
909                        .map(|s| std::sync::Arc::new(s.clone())),
910                    dispatch_caller_key_id.clone(),
911                )
912                .build();
913                router.dispatch(conn_id, &dispatch_req).await
914            }
915        };
916
917        // ── Step 4: Response filter (TRD_GET_ACC_LIST 等) ────────────────────
918        if let Some(body) = response_body {
919            // FilterRegistry::apply(): proto_id 未注册 → 原 body 不动 (no-op).
920            // 注册了 (e.g. proto 2001) → filter by allowed_acc_ids.
921            let filtered =
922                filter_registry.apply(proto_id_val, body, allowed_acc_ids_for_resp_filter.as_ref());
923            ApiServer::send_response(&connections, conn_id, proto_id_val, serial_no, filtered)
924                .await;
925        }
926    }
927}
928
929#[cfg(test)]
930mod tests;