Skip to main content

futu_rest/
ws.rs

1//! WebSocket 推送模块
2//!
3//! 在 REST API 端口上提供 /ws 路由,客户端通过 WebSocket 接收实时推送。
4//!
5//! 推送事件通过 broadcast channel 从 OpenD 核心分发到所有 WebSocket 客户端。
6
7use std::collections::{HashMap, HashSet};
8use std::sync::{Arc, RwLock};
9
10use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
11use axum::extract::{Query, State};
12use axum::http::{HeaderMap, StatusCode};
13use axum::response::IntoResponse;
14use chrono::Utc;
15use futures::{SinkExt, StreamExt};
16use tokio::sync::broadcast;
17
18use futu_auth::{KeyRecord, KeyStore, Scope};
19use futu_server::push::ExternalPushSink;
20
21use crate::adapter::RestState;
22
23/// WebSocket 推送事件
24#[derive(Clone, Debug, serde::Serialize)]
25pub struct WsPushEvent {
26    /// 推送类型: "quote", "trade", "notify"
27    #[serde(rename = "type")]
28    pub event_type: String,
29    /// 该事件需要哪个 scope 才能被某个 client 接收(filter 用,不发到客户端)
30    #[serde(skip)]
31    pub required_scope: WsPushScope,
32    /// 协议 ID
33    pub proto_id: u32,
34    /// 证券标识 (行情推送)
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub sec_key: Option<String>,
37    /// 订阅类型 (行情推送)
38    #[serde(skip_serializing_if = "Option::is_none")]
39    pub sub_type: Option<i32>,
40    /// **v1.4.106 codex 1131 F4 [P1]**: rehab 类型 (KL push 非 0, 其它 sub_type
41    /// 为 0). 客户端用于 (sec_key, sub_type, rehab_type) 三元 key 自行 filter
42    /// 不感兴趣的 KL rehab 推送.
43    #[serde(skip_serializing_if = "Option::is_none")]
44    pub rehab_type: Option<i32>,
45    /// 交易账户 ID (交易推送)
46    #[serde(skip_serializing_if = "Option::is_none")]
47    pub acc_id: Option<u64>,
48    /// protobuf body 的 base64 编码
49    pub body_b64: String,
50    /// v1.4.105 D3 (Phase 4) T-B1: 交易推送的 trd_market 大写字符串 ("HK" /
51    /// "US" / "CN" / "HKCC" / "FUTURES" / "SG" / "AU" / "JP" / "MY" / "CA").
52    /// PushDispatcher 一次 decode body 后透传过来, 让 WS push filter Layer 3
53    /// (allowed_markets) 直接读. `None` = 非 trade event / decode 失败 /
54    /// market 未知 (Layer 3 向后兼容不 trigger drop).
55    ///
56    /// 客户端可见: trade event 出现 `trd_market` 字段, qot/notify 不出现
57    /// (`skip_serializing_if = "Option::is_none"`).
58    #[serde(skip_serializing_if = "Option::is_none")]
59    pub trd_market: Option<String>,
60}
61
/// Minimum scope class a WS push event requires; a client whose key lacks the
/// mapped scope never receives the event.
///
/// - `Quote` → `qot:read`: market data
/// - `Notify` → `qot:read`: general notifications (e.g. subscription status,
///   gateway heartbeat)
/// - `Trade` → `acc:read`: trade reports touch account privacy, so account
///   read permission is mandatory
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum WsPushScope {
    /// Quote push (basic_qot / order_book / ticker etc. pushed after a symbol
    /// is subscribed). Requires [`Scope::QotRead`].
    #[default]
    Quote,
    /// Broadcast notification (system events / global messages).
    /// Requires [`Scope::QotRead`].
    Notify,
    /// Trade push (order status changes / fill reports).
    /// Requires [`Scope::AccRead`].
    Trade,
}
79
80impl WsPushScope {
81    /// 该事件类型需要的 Scope;client 必须持有这个 scope 才能收到
82    pub fn required_scope(&self) -> Scope {
83        match self {
84            WsPushScope::Quote => Scope::QotRead,
85            WsPushScope::Notify => Scope::QotRead,
86            WsPushScope::Trade => Scope::AccRead,
87        }
88    }
89}
90
91/// WebSocket 推送广播器
92///
93/// OpenD 核心推送事件 → broadcast channel → 所有 WebSocket 客户端
94///
95/// 实现 `ExternalPushSink` trait,可直接嵌入 PushDispatcher。
96#[derive(Clone)]
97pub struct WsBroadcaster {
98    tx: broadcast::Sender<WsPushEvent>,
99}
100
101impl WsBroadcaster {
102    pub fn new(capacity: usize) -> Self {
103        let (tx, _) = broadcast::channel(capacity);
104        Self { tx }
105    }
106
107    /// 发送推送事件到所有 WebSocket 客户端
108    pub fn send(&self, event: WsPushEvent) {
109        // 忽略没有接收者的情况
110        let _ = self.tx.send(event);
111    }
112
113    /// 创建接收端
114    pub fn subscribe(&self) -> broadcast::Receiver<WsPushEvent> {
115        self.tx.subscribe()
116    }
117
118    fn encode_body(body: &[u8]) -> String {
119        use base64::Engine;
120        base64::engine::general_purpose::STANDARD.encode(body)
121    }
122
123    /// 发送行情推送.
124    ///
125    /// **v1.4.106 codex 1131 F4 [P1]**: 加 `rehab_type` 参数. KL push 的
126    /// `rehab_type` ≠ 0, 其它 sub_type → 0. 当前 REST WS 仍 broadcast 所有
127    /// quote events 给 qot:read 订阅者 (per-conn 三元 key 过滤是 raw TCP 专属
128    /// 行为 — REST WS 用 broadcast 模型). 但 rehab_type 透传给客户端可见, 让
129    /// agent 自己识别 KL push 的 rehab 类型.
130    pub fn push_quote(
131        &self,
132        sec_key: &str,
133        sub_type: i32,
134        rehab_type: i32,
135        proto_id: u32,
136        body: &[u8],
137    ) {
138        self.send(WsPushEvent {
139            event_type: "quote".to_string(),
140            required_scope: WsPushScope::Quote,
141            proto_id,
142            sec_key: Some(sec_key.to_string()),
143            sub_type: Some(sub_type),
144            rehab_type: Some(rehab_type),
145            acc_id: None,
146            body_b64: Self::encode_body(body),
147            trd_market: None,
148        });
149    }
150
151    /// 发送广播推送
152    pub fn push_broadcast(&self, proto_id: u32, body: &[u8]) {
153        self.send(WsPushEvent {
154            event_type: "notify".to_string(),
155            required_scope: WsPushScope::Notify,
156            proto_id,
157            sec_key: None,
158            sub_type: None,
159            rehab_type: None,
160            acc_id: None,
161            body_b64: Self::encode_body(body),
162            trd_market: None,
163        });
164    }
165
166    /// 发送交易推送
167    ///
168    /// v1.4.105 D3 (Phase 4) T-B1: `trd_market` 由 [`PushDispatcher`] 一次
169    /// decode body 后透传, 直接塞 [`WsPushEvent.trd_market`] 给后续 Layer 3
170    /// filter 与客户端可见.
171    pub fn push_trade(&self, acc_id: u64, proto_id: u32, body: &[u8], trd_market: Option<&str>) {
172        self.send(WsPushEvent {
173            event_type: "trade".to_string(),
174            required_scope: WsPushScope::Trade,
175            proto_id,
176            sec_key: None,
177            sub_type: None,
178            rehab_type: None,
179            acc_id: Some(acc_id),
180            body_b64: Self::encode_body(body),
181            trd_market: trd_market.map(|s| s.to_string()),
182        });
183    }
184}
185
186/// 实现 ExternalPushSink,使 WsBroadcaster 可嵌入 PushDispatcher
187impl ExternalPushSink for WsBroadcaster {
188    fn on_quote_push(
189        &self,
190        sec_key: &str,
191        sub_type: i32,
192        rehab_type: i32,
193        proto_id: u32,
194        body: &[u8],
195    ) {
196        self.push_quote(sec_key, sub_type, rehab_type, proto_id, body);
197    }
198
199    fn on_broadcast_push(&self, proto_id: u32, body: &[u8]) {
200        self.push_broadcast(proto_id, body);
201    }
202
203    fn on_trade_push(&self, acc_id: u64, proto_id: u32, body: &[u8], trd_market: Option<&str>) {
204        self.push_trade(acc_id, proto_id, body, trd_market);
205    }
206}
207
208/// WebSocket 握手鉴权:从 `?token=xxx` 查询参数或 `Authorization: Bearer` header 提取 token
209///
210/// 浏览器 WebSocket API 不允许设置自定义 header,所以优先支持 `?token=`;
211/// 原生客户端(curl / websocat / tokio-tungstenite)可以用任一方式。
212fn extract_ws_token(headers: &HeaderMap, query: &HashMap<String, String>) -> Option<String> {
213    if let Some(t) = query.get("token") {
214        return Some(t.clone());
215    }
216    headers
217        .get("authorization")
218        .and_then(|v| v.to_str().ok())
219        .and_then(|v| v.strip_prefix("Bearer ").map(|s| s.trim().to_string()))
220}
221
222/// 校验 WebSocket 握手的 token;返回 `Ok(Some(rec))` 表示 scope 模式 + 通过;
223/// `Ok(None)` 表示 legacy 模式(未配 KeyStore),所有事件无条件放行。
224///
225/// - `key_store.is_configured() == false` → 无条件放行(legacy 模式)
226/// - 配置了 KeyStore:必须有 token,且 key 有 `qot:read` scope(最低门槛,
227///   实际收哪些事件由后续 push filter 按 scope 决定)
228fn authenticate_ws(
229    key_store: &KeyStore,
230    headers: &HeaderMap,
231    query: &HashMap<String, String>,
232) -> Result<Option<Arc<KeyRecord>>, (StatusCode, &'static str)> {
233    if !key_store.is_configured() {
234        return Ok(None);
235    }
236
237    let Some(token) = extract_ws_token(headers, query) else {
238        futu_auth::audit::reject(
239            "ws",
240            "/ws",
241            "<missing>",
242            "missing token (query or Authorization)",
243        );
244        return Err((StatusCode::UNAUTHORIZED, "missing api key"));
245    };
246
247    let Some(rec) = key_store.verify(&token) else {
248        futu_auth::audit::reject("ws", "/ws", "<invalid>", "invalid api key");
249        return Err((StatusCode::UNAUTHORIZED, "invalid api key"));
250    };
251
252    if rec.is_expired(Utc::now()) {
253        futu_auth::audit::reject("ws", "/ws", &rec.id, "key expired");
254        return Err((StatusCode::UNAUTHORIZED, "key expired"));
255    }
256
257    if !rec.scopes.contains(&Scope::QotRead) {
258        // v1.4.102 BUG-011 fix (P2): 不再泄露 scope 给请求方,
259        // 仅写本地 audit log. 与 REST / gRPC 同步.
260        futu_auth::audit::reject("ws", "/ws", &rec.id, "missing qot:read scope");
261        return Err((StatusCode::FORBIDDEN, "forbidden"));
262    }
263
264    futu_auth::audit::allow("ws", "/ws", &rec.id, Some("qot:read"));
265    Ok(Some(rec))
266}
267
268/// WebSocket 升级处理
269pub async fn ws_handler(
270    ws: WebSocketUpgrade,
271    headers: HeaderMap,
272    Query(query): Query<HashMap<String, String>>,
273    State(state): State<RestState>,
274) -> impl IntoResponse {
275    let rec = match authenticate_ws(&state.key_store, &headers, &query) {
276        Ok(rec) => rec,
277        Err((code, msg)) => return (code, msg).into_response(),
278    };
279    // legacy(rec=None)时给个"全 scope"快照让 filter 全放行;scope 模式用 rec.scopes
280    let scopes: HashSet<Scope> = match &rec {
281        Some(r) => r.scopes.clone(),
282        None => all_scopes(),
283    };
284    let key_id = rec.as_ref().map(|r| r.id.clone());
285    // v1.4.102 codex 47 F1 / 48 F1 (P1): WS 必须独立 enforce key.allowed_acc_ids
286    // (per-key acc 白名单), 不能依赖 sub-acc-push 是否调过. 之前未调 sub-acc-push
287    // 时 fall-back 全 push, key 被限到 acc A 仍能收 acc B 的 trade push.
288    let allowed_acc_ids = rec.as_ref().and_then(|r| r.allowed_acc_ids.clone());
289    // v1.4.105 D3 (Phase 4) T-B1: per-key allowed_markets 硬限额 (大写字符
290    // 串 set, e.g. {"HK","US"}). `None` / 空 set = 无限制. WS Layer 3
291    // (TradePushFilter) 用此 set 过滤 trade event 的 trd_market.
292    let allowed_markets = rec.as_ref().and_then(|r| r.allowed_markets.clone());
293    let broadcaster = Arc::clone(&state.ws_broadcaster);
294    // v1.4.102 codex 46 F2 (P1): pass per-key acc subscription state into
295    // WS connection so trade push delivery can filter by sub-acc-push registrations.
296    let rest_acc_subs = Arc::clone(&state.rest_acc_subscriptions);
297    // v1.4.105 D4 (Phase 1): pass shared FilterRegistry into WS connection
298    // so push event filter (TradePushFilter) goes through unified registry.
299    let filter_registry = Arc::clone(&state.filter_registry);
300    let ctx = WsConnectionContext {
301        broadcaster,
302        scopes,
303        key_id,
304        allowed_acc_ids,
305        allowed_markets,
306        rest_acc_subscriptions: rest_acc_subs,
307        filter_registry,
308    };
309    ws.on_upgrade(move |socket| handle_ws_connection(socket, ctx))
310        .into_response()
311}
312
313/// 全 scope 集合(legacy 模式用)
314fn all_scopes() -> HashSet<Scope> {
315    [
316        Scope::QotRead,
317        Scope::AccRead,
318        Scope::TradeSimulate,
319        Scope::TradeReal,
320    ]
321    .into_iter()
322    .collect()
323}
324
325/// 处理单个 WebSocket 连接
326///
327/// `scopes` 是该连接 key 持有的 scope 集合,用于按 `WsPushScope::required_scope()`
328/// 过滤推送事件。例如只有 `qot:read` 的 key 不会收到 `trade` 类推送。
329// v1.4.102 codex 47 F1 / 48 F1 (P1): per-key allowed_acc_ids 硬限额.
330// `None` = 该 key 无 acc 限制 (默认全开); `Some(set)` = 仅这些 acc 可见.
331struct WsConnectionContext {
332    broadcaster: Arc<WsBroadcaster>,
333    scopes: HashSet<Scope>,
334    key_id: Option<String>,
335    allowed_acc_ids: Option<HashSet<u64>>,
336    // v1.4.105 D3 (Phase 4) T-B1: caller key 的 allowed_markets 硬限额, 用于
337    // Layer 3 (TradePushFilter) 过滤. None / 空 set = 无限制.
338    allowed_markets: Option<HashSet<String>>,
339    rest_acc_subscriptions: Arc<RwLock<HashMap<String, HashSet<u64>>>>,
340    // v1.4.105 D4 (Phase 1): 共享 FilterRegistry 实例 — push event 过滤走
341    // 同一 registry 与 4 surface (REST body filter / WS push) 一致.
342    filter_registry: Arc<futu_auth_pipeline::FilterRegistry>,
343}
344
345async fn handle_ws_connection(socket: WebSocket, ctx: WsConnectionContext) {
346    let WsConnectionContext {
347        broadcaster,
348        scopes,
349        key_id,
350        allowed_acc_ids,
351        allowed_markets,
352        rest_acc_subscriptions,
353        filter_registry,
354    } = ctx;
355
356    let (mut ws_tx, mut ws_rx) = socket.split();
357    let mut push_rx = broadcaster.subscribe();
358
359    tracing::info!(
360        key_id = ?key_id,
361        scopes = ?scopes,
362        "WebSocket push client connected"
363    );
364
365    // v1.4.106 codex 1125 F6 [P2]: REST WS notify subscription state.
366    //
367    // 对齐 C++ raw TCP `IsConnSubRecvNotify` (APIServer_Qot_PriceReminder.cpp:730-735):
368    // broadcast notify 类 push (e.g. price reminder) 必须 client 显式 sub 才下发.
369    //
370    // **Breaking change vs v1.4.105**: v1.4.105 之前 REST `/ws` 默认收所有
371    // broadcast notify; v1.4.106 起需 client 发 `{"action":"subscribe-notify"}`
372    // text message 才能继续收. 老 client 如果依赖 price reminder push 必须升级.
373    //
374    // Default false 对齐 raw TCP 默认 unsub 状态.
375    let notify_subscribed = Arc::new(std::sync::atomic::AtomicBool::new(false));
376    let notify_subscribed_for_send = Arc::clone(&notify_subscribed);
377    let notify_subscribed_for_recv = Arc::clone(&notify_subscribed);
378
379    // 推送任务:从 broadcast channel 读取事件 → 按 scope 过滤 → 发送给客户端
380    let send_scopes = scopes.clone();
381    let send_key_id_str = key_id.clone().unwrap_or_else(|| "<none>".to_string());
382    let send_key_id_for_filter = key_id.clone();
383    let rest_subs_for_filter = Arc::clone(&rest_acc_subscriptions);
384    let send_task = tokio::spawn(async move {
385        while let Ok(event) = push_rx.recv().await {
386            // 按 client scope 过滤:key 没这个 scope 就不发
387            if !send_scopes.contains(&event.required_scope.required_scope()) {
388                // 记一次"被挡住的推送",供 Prometheus `/metrics` 观察
389                futu_auth::metrics::bump_ws_filtered(&event.event_type, &send_key_id_str);
390                continue;
391            }
392            // v1.4.106 codex 1125 F6 [P2]: notify subscribe gate.
393            // 对齐 C++ raw TCP `IsConnSubRecvNotify` (broadcast push 必须显式 sub).
394            if matches!(event.required_scope, WsPushScope::Notify)
395                && !notify_subscribed_for_send.load(std::sync::atomic::Ordering::Relaxed)
396            {
397                futu_auth::metrics::bump_ws_filtered("notify_unsub", &send_key_id_str);
398                continue;
399            }
400            // v1.4.102 codex 47 F1 / 48 F1 (P1): trade push 进 acc-id 过滤,
401            // 两层独立 enforce:
402            // 1. **key.allowed_acc_ids 硬限额** (Some(set)): event.acc_id 不在
403            //    set → drop. 与 sub-acc-push 是否调过无关 (老 key 没 sub 也强限).
404            // 2. **REST sub-acc-push state map** (sub_state): 仅当 key 已调过
405            //    sub-acc-push 才生效. entry 存在但不含 acc_id → drop. 未调过 →
406            //    pass (向后兼容老 client).
407            //
408            // codex 48 F2 P1 fix: REST sub state empty entry (Some(set) 但 set
409            // 空) 也算 "已 unsub all" tombstone, 不允许 fall back 到全 push.
410            //
411            // v1.4.103 codex F5.11 (P2) round 5: 抽 logic 到
412            // `should_drop_trade_event_for_caller` pure fn 让单测可验证.
413            //
414            // v1.4.105 D4 (Phase 1): 改走 `FilterRegistry::should_drop_event`
415            // 让 4 surface (REST `/ws` 现接 + 后续 gRPC subscribe_push 等)
416            // 共用同一 registry instance. 防 sibling-route bypass —
417            // 任何人加新 push event filter 只在 registry 注册一次, 不需
418            // 改各 surface inline. `should_drop_trade_event_for_caller`
419            // pure fn 仍保留作 unit test 直接验证 logic, 但 production 走 registry.
420            if matches!(event.required_scope, WsPushScope::Trade)
421                && let Some(event_acc) = event.acc_id
422            {
423                let sub_state_owned: Option<HashSet<u64>> =
424                    send_key_id_for_filter.as_ref().and_then(|kid| {
425                        crate::adapter::with_rest_acc_subscriptions_read(
426                            &rest_subs_for_filter,
427                            |subs| subs.get(kid).cloned(),
428                        )
429                    });
430                let ctx = futu_auth_pipeline::PushEventCtx {
431                    event_type: &event.event_type,
432                    event_acc: Some(event_acc),
433                    allowed_acc_ids: allowed_acc_ids.as_ref(),
434                    sub_state: sub_state_owned.as_ref(),
435                    // v1.4.105 D3 (Phase 4) T-B1: 真接 trd_market —
436                    // PushDispatcher 端一次 decode 后透传到 WsPushEvent.trd_market
437                    // (None = 老路径 / decode 失败 / market 未知, 不 trigger
438                    // Layer 3 drop).
439                    event_trd_market: event.trd_market.as_deref(),
440                    allowed_markets: allowed_markets.as_ref(),
441                };
442                if filter_registry.should_drop_event(&ctx) {
443                    // v1.4.105 F5.2 fix (codex review C4 4th): 4 surface 统一
444                    // metric label "trade_market" 跟 gRPC + raw TCP WS + MCP 一致,
445                    // 不再用 event.event_type (= "trade") 让跨 surface jq aggregate
446                    // 一致.
447                    futu_auth::metrics::bump_ws_filtered("trade_market", &send_key_id_str);
448                    continue;
449                }
450            }
451            let json = match serde_json::to_string(&event) {
452                Ok(j) => j,
453                Err(_) => continue,
454            };
455            if ws_tx.send(Message::Text(json.into())).await.is_err() {
456                break; // 客户端断开
457            }
458        }
459    });
460
461    // 接收任务:处理客户端消息(ping/pong/close + v1.4.106 codex 1125 F6 subscribe-notify)
462    let recv_task = tokio::spawn(async move {
463        while let Some(msg) = ws_rx.next().await {
464            match msg {
465                Ok(Message::Close(_)) | Err(_) => break,
466                Ok(Message::Ping(data)) => {
467                    // axum 自动回复 pong,不需要手动处理
468                    let _ = data;
469                }
470                // v1.4.106 codex 1125 F6 [P2]: 处理 client 发的 JSON control message.
471                // 支持 `{"action":"subscribe-notify"}` / `{"action":"unsubscribe-notify"}`.
472                // 对齐 C++ raw TCP IsConnSubRecvNotify 的 sub/unsub 接口.
473                Ok(Message::Text(text)) => {
474                    if let Ok(val) = serde_json::from_str::<serde_json::Value>(&text)
475                        && let Some(action) = val.get("action").and_then(|v| v.as_str())
476                    {
477                        match action {
478                            "subscribe-notify" => {
479                                notify_subscribed_for_recv
480                                    .store(true, std::sync::atomic::Ordering::Relaxed);
481                                tracing::info!("WS client subscribed notify push");
482                            }
483                            "unsubscribe-notify" => {
484                                notify_subscribed_for_recv
485                                    .store(false, std::sync::atomic::Ordering::Relaxed);
486                                tracing::info!("WS client unsubscribed notify push");
487                            }
488                            other => {
489                                tracing::debug!(action = %other, "WS client unknown action");
490                            }
491                        }
492                    }
493                }
494                _ => {} // 忽略其他消息
495            }
496        }
497    });
498
499    // 任一任务结束则关闭连接
500    tokio::select! {
501        _ = send_task => {}
502        _ = recv_task => {}
503    }
504
505    tracing::info!("WebSocket push client disconnected");
506}
507
/// v1.4.103 codex F5.11 (P2) round 5: pure-function form of the trade event
/// filter decision, extracted so unit tests can verify it without the
/// WebSocket infrastructure that `handle_ws_connection` needs.
///
/// **Behaviour** (returns `true` to drop, `false` to deliver):
/// - Layer 1: `allowed_acc_ids` is `Some`, non-empty, and does not contain
///   `event_acc` → drop. `None` or an empty set imposes no restriction.
/// - Layer 2: `sub_state` is `Some` (the caller key has a sub-acc-push entry)
///   and the set does not contain `event_acc` → drop. **An empty set is the
///   "unsubscribed from all" tombstone and drops every trade event.**
/// - Layer 2 with `sub_state == None` (the caller never called sub-acc-push)
///   → no drop (backward compatible).
///
/// v1.4.105 D4 (Phase 1): production has switched to
/// `FilterRegistry::should_drop_event`; this function is kept under
/// `#[cfg(test)]` purely as a unit-test target, which also keeps dead-code
/// warnings away.
#[cfg(test)]
pub(crate) fn should_drop_trade_event_for_caller(
    allowed_acc_ids: Option<&HashSet<u64>>,
    sub_state: Option<&HashSet<u64>>,
    event_acc: u64,
) -> bool {
    // Layer 1: hard per-key account whitelist.
    let outside_whitelist = allowed_acc_ids
        .is_some_and(|whitelist| !whitelist.is_empty() && !whitelist.contains(&event_acc));

    // Layer 2: opt-in REST subscription state (/api/sub-acc-push). A present
    // entry must contain the account; an empty entry therefore drops all.
    let not_subscribed = sub_state.is_some_and(|subscribed| !subscribed.contains(&event_acc));

    outside_whitelist || not_subscribed
}
548
549#[cfg(test)]
550mod tests;