futu_rest/ws.rs
1//! WebSocket 推送模块
2//!
3//! 在 REST API 端口上提供 /ws 路由,客户端通过 WebSocket 接收实时推送。
4//!
5//! 推送事件通过 broadcast channel 从 OpenD 核心分发到所有 WebSocket 客户端。
6
7use std::collections::{HashMap, HashSet};
8use std::sync::{Arc, RwLock};
9
10use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
11use axum::extract::{Query, State};
12use axum::http::{HeaderMap, StatusCode};
13use axum::response::IntoResponse;
14use chrono::Utc;
15use futures::{SinkExt, StreamExt};
16use tokio::sync::broadcast;
17
18use futu_auth::{KeyRecord, KeyStore, Scope};
19use futu_server::push::ExternalPushSink;
20
21use crate::adapter::RestState;
22
/// A push event delivered to WebSocket clients.
///
/// Fields marked `#[serde(skip)]` exist only for server-side filtering and
/// never appear in the JSON sent to a client; `Option` fields are omitted
/// from the JSON when they are `None`.
#[derive(Clone, Debug, serde::Serialize)]
pub struct WsPushEvent {
    /// Push type: "quote", "trade", or "notify". Serialized under the key `type`.
    #[serde(rename = "type")]
    pub event_type: String,
    /// Which scope a client needs in order to receive this event (used by the
    /// filter only; not sent to the client).
    #[serde(skip)]
    pub required_scope: WsPushScope,
    /// Protocol ID.
    pub proto_id: u32,
    /// Security identifier (quote pushes).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub sec_key: Option<String>,
    /// Subscription type (quote pushes).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub sub_type: Option<i32>,
    /// **v1.4.106 codex 1131 F4 [P1]**: rehab type (non-zero for KL pushes,
    /// 0 for every other sub_type). Clients can use the
    /// (sec_key, sub_type, rehab_type) triple as a key to filter out KL rehab
    /// pushes they are not interested in.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub rehab_type: Option<i32>,
    /// Trading account ID (trade pushes).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub acc_id: Option<u64>,
    /// Base64 encoding of the protobuf body.
    pub body_b64: String,
    /// v1.4.105 D3 (Phase 4) T-B1: upper-case trd_market string of a trade
    /// push ("HK" / "US" / "CN" / "HKCC" / "FUTURES" / "SG" / "AU" / "JP" /
    /// "MY" / "CA"). PushDispatcher decodes the body once and passes the
    /// value through so the WS push filter Layer 3 (allowed_markets) can read
    /// it directly. `None` = not a trade event / decode failed / unknown
    /// market (for backward compatibility Layer 3 does not drop on `None`).
    ///
    /// Client-visible: trade events carry a `trd_market` field, qot/notify
    /// events do not (`skip_serializing_if = "Option::is_none"`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trd_market: Option<String>,
}
61
/// Minimum scope a WS push event requires (a client without that scope does
/// not receive the event).
///
/// - `Quote` → `qot:read`: market-data pushes
/// - `Notify` → `qot:read`: general notifications (e.g. subscription status,
///   gateway heartbeat)
/// - `Trade` → `acc:read`: trade reports touch private account data, so
///   account read permission is mandatory
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[non_exhaustive]
pub enum WsPushScope {
    /// Quote push (basic_qot / order_book / ticker etc. pushed after a symbol
    /// is subscribed). Requires [`Scope::QotRead`].
    #[default]
    Quote,
    /// Broadcast notification (system events / global messages).
    /// Requires [`Scope::QotRead`].
    Notify,
    /// Trade push (order status changes / fill reports).
    /// Requires [`Scope::AccRead`].
    Trade,
}
79
80impl WsPushScope {
81 /// 该事件类型需要的 Scope;client 必须持有这个 scope 才能收到
82 pub fn required_scope(&self) -> Scope {
83 match self {
84 WsPushScope::Quote => Scope::QotRead,
85 WsPushScope::Notify => Scope::QotRead,
86 WsPushScope::Trade => Scope::AccRead,
87 }
88 }
89}
90
/// WebSocket push broadcaster.
///
/// OpenD core push events → broadcast channel → every WebSocket client.
///
/// Implements the `ExternalPushSink` trait so it can be embedded directly in
/// PushDispatcher.
#[derive(Clone)]
pub struct WsBroadcaster {
    /// Sending half of the broadcast channel; each connection obtains its own
    /// receiver through `subscribe()`.
    tx: broadcast::Sender<WsPushEvent>,
}
100
101impl WsBroadcaster {
102 pub fn new(capacity: usize) -> Self {
103 let (tx, _) = broadcast::channel(capacity);
104 Self { tx }
105 }
106
107 /// 发送推送事件到所有 WebSocket 客户端
108 pub fn send(&self, event: WsPushEvent) {
109 // 忽略没有接收者的情况
110 let _ = self.tx.send(event);
111 }
112
113 /// 创建接收端
114 pub fn subscribe(&self) -> broadcast::Receiver<WsPushEvent> {
115 self.tx.subscribe()
116 }
117
118 fn encode_body(body: &[u8]) -> String {
119 use base64::Engine;
120 base64::engine::general_purpose::STANDARD.encode(body)
121 }
122
123 /// 发送行情推送.
124 ///
125 /// **v1.4.106 codex 1131 F4 [P1]**: 加 `rehab_type` 参数. KL push 的
126 /// `rehab_type` ≠ 0, 其它 sub_type → 0. 当前 REST WS 仍 broadcast 所有
127 /// quote events 给 qot:read 订阅者 (per-conn 三元 key 过滤是 raw TCP 专属
128 /// 行为 — REST WS 用 broadcast 模型). 但 rehab_type 透传给客户端可见, 让
129 /// agent 自己识别 KL push 的 rehab 类型.
130 pub fn push_quote(
131 &self,
132 sec_key: &str,
133 sub_type: i32,
134 rehab_type: i32,
135 proto_id: u32,
136 body: &[u8],
137 ) {
138 self.send(WsPushEvent {
139 event_type: "quote".to_string(),
140 required_scope: WsPushScope::Quote,
141 proto_id,
142 sec_key: Some(sec_key.to_string()),
143 sub_type: Some(sub_type),
144 rehab_type: Some(rehab_type),
145 acc_id: None,
146 body_b64: Self::encode_body(body),
147 trd_market: None,
148 });
149 }
150
151 /// 发送广播推送
152 pub fn push_broadcast(&self, proto_id: u32, body: &[u8]) {
153 self.send(WsPushEvent {
154 event_type: "notify".to_string(),
155 required_scope: WsPushScope::Notify,
156 proto_id,
157 sec_key: None,
158 sub_type: None,
159 rehab_type: None,
160 acc_id: None,
161 body_b64: Self::encode_body(body),
162 trd_market: None,
163 });
164 }
165
166 /// 发送交易推送
167 ///
168 /// v1.4.105 D3 (Phase 4) T-B1: `trd_market` 由 [`PushDispatcher`] 一次
169 /// decode body 后透传, 直接塞 [`WsPushEvent.trd_market`] 给后续 Layer 3
170 /// filter 与客户端可见.
171 pub fn push_trade(&self, acc_id: u64, proto_id: u32, body: &[u8], trd_market: Option<&str>) {
172 self.send(WsPushEvent {
173 event_type: "trade".to_string(),
174 required_scope: WsPushScope::Trade,
175 proto_id,
176 sec_key: None,
177 sub_type: None,
178 rehab_type: None,
179 acc_id: Some(acc_id),
180 body_b64: Self::encode_body(body),
181 trd_market: trd_market.map(|s| s.to_string()),
182 });
183 }
184}
185
186/// 实现 ExternalPushSink,使 WsBroadcaster 可嵌入 PushDispatcher
187impl ExternalPushSink for WsBroadcaster {
188 fn on_quote_push(
189 &self,
190 sec_key: &str,
191 sub_type: i32,
192 rehab_type: i32,
193 proto_id: u32,
194 body: &[u8],
195 ) {
196 self.push_quote(sec_key, sub_type, rehab_type, proto_id, body);
197 }
198
199 fn on_broadcast_push(&self, proto_id: u32, body: &[u8]) {
200 self.push_broadcast(proto_id, body);
201 }
202
203 fn on_trade_push(&self, acc_id: u64, proto_id: u32, body: &[u8], trd_market: Option<&str>) {
204 self.push_trade(acc_id, proto_id, body, trd_market);
205 }
206}
207
208/// WebSocket 握手鉴权:从 `?token=xxx` 查询参数或 `Authorization: Bearer` header 提取 token
209///
210/// 浏览器 WebSocket API 不允许设置自定义 header,所以优先支持 `?token=`;
211/// 原生客户端(curl / websocat / tokio-tungstenite)可以用任一方式。
212fn extract_ws_token(headers: &HeaderMap, query: &HashMap<String, String>) -> Option<String> {
213 if let Some(t) = query.get("token") {
214 return Some(t.clone());
215 }
216 headers
217 .get("authorization")
218 .and_then(|v| v.to_str().ok())
219 .and_then(|v| v.strip_prefix("Bearer ").map(|s| s.trim().to_string()))
220}
221
222/// 校验 WebSocket 握手的 token;返回 `Ok(Some(rec))` 表示 scope 模式 + 通过;
223/// `Ok(None)` 表示 legacy 模式(未配 KeyStore),所有事件无条件放行。
224///
225/// - `key_store.is_configured() == false` → 无条件放行(legacy 模式)
226/// - 配置了 KeyStore:必须有 token,且 key 有 `qot:read` scope(最低门槛,
227/// 实际收哪些事件由后续 push filter 按 scope 决定)
228fn authenticate_ws(
229 key_store: &KeyStore,
230 headers: &HeaderMap,
231 query: &HashMap<String, String>,
232) -> Result<Option<Arc<KeyRecord>>, (StatusCode, &'static str)> {
233 if !key_store.is_configured() {
234 return Ok(None);
235 }
236
237 let Some(token) = extract_ws_token(headers, query) else {
238 futu_auth::audit::reject(
239 "ws",
240 "/ws",
241 "<missing>",
242 "missing token (query or Authorization)",
243 );
244 return Err((StatusCode::UNAUTHORIZED, "missing api key"));
245 };
246
247 let Some(rec) = key_store.verify(&token) else {
248 futu_auth::audit::reject("ws", "/ws", "<invalid>", "invalid api key");
249 return Err((StatusCode::UNAUTHORIZED, "invalid api key"));
250 };
251
252 if rec.is_expired(Utc::now()) {
253 futu_auth::audit::reject("ws", "/ws", &rec.id, "key expired");
254 return Err((StatusCode::UNAUTHORIZED, "key expired"));
255 }
256
257 if !rec.scopes.contains(&Scope::QotRead) {
258 // v1.4.102 BUG-011 fix (P2): 不再泄露 scope 给请求方,
259 // 仅写本地 audit log. 与 REST / gRPC 同步.
260 futu_auth::audit::reject("ws", "/ws", &rec.id, "missing qot:read scope");
261 return Err((StatusCode::FORBIDDEN, "forbidden"));
262 }
263
264 futu_auth::audit::allow("ws", "/ws", &rec.id, Some("qot:read"));
265 Ok(Some(rec))
266}
267
268/// WebSocket 升级处理
269pub async fn ws_handler(
270 ws: WebSocketUpgrade,
271 headers: HeaderMap,
272 Query(query): Query<HashMap<String, String>>,
273 State(state): State<RestState>,
274) -> impl IntoResponse {
275 let rec = match authenticate_ws(&state.key_store, &headers, &query) {
276 Ok(rec) => rec,
277 Err((code, msg)) => return (code, msg).into_response(),
278 };
279 // legacy(rec=None)时给个"全 scope"快照让 filter 全放行;scope 模式用 rec.scopes
280 let scopes: HashSet<Scope> = match &rec {
281 Some(r) => r.scopes.clone(),
282 None => all_scopes(),
283 };
284 let key_id = rec.as_ref().map(|r| r.id.clone());
285 // v1.4.102 codex 47 F1 / 48 F1 (P1): WS 必须独立 enforce key.allowed_acc_ids
286 // (per-key acc 白名单), 不能依赖 sub-acc-push 是否调过. 之前未调 sub-acc-push
287 // 时 fall-back 全 push, key 被限到 acc A 仍能收 acc B 的 trade push.
288 let allowed_acc_ids = rec.as_ref().and_then(|r| r.allowed_acc_ids.clone());
289 // v1.4.105 D3 (Phase 4) T-B1: per-key allowed_markets 硬限额 (大写字符
290 // 串 set, e.g. {"HK","US"}). `None` / 空 set = 无限制. WS Layer 3
291 // (TradePushFilter) 用此 set 过滤 trade event 的 trd_market.
292 let allowed_markets = rec.as_ref().and_then(|r| r.allowed_markets.clone());
293 let broadcaster = Arc::clone(&state.ws_broadcaster);
294 // v1.4.102 codex 46 F2 (P1): pass per-key acc subscription state into
295 // WS connection so trade push delivery can filter by sub-acc-push registrations.
296 let rest_acc_subs = Arc::clone(&state.rest_acc_subscriptions);
297 // v1.4.105 D4 (Phase 1): pass shared FilterRegistry into WS connection
298 // so push event filter (TradePushFilter) goes through unified registry.
299 let filter_registry = Arc::clone(&state.filter_registry);
300 let ctx = WsConnectionContext {
301 broadcaster,
302 scopes,
303 key_id,
304 allowed_acc_ids,
305 allowed_markets,
306 rest_acc_subscriptions: rest_acc_subs,
307 filter_registry,
308 };
309 ws.on_upgrade(move |socket| handle_ws_connection(socket, ctx))
310 .into_response()
311}
312
313/// 全 scope 集合(legacy 模式用)
314fn all_scopes() -> HashSet<Scope> {
315 [
316 Scope::QotRead,
317 Scope::AccRead,
318 Scope::TradeSimulate,
319 Scope::TradeReal,
320 ]
321 .into_iter()
322 .collect()
323}
324
/// Per-connection state handed from `ws_handler` to `handle_ws_connection`.
///
/// `scopes` is the set of scopes held by the connection's key and is used to
/// filter push events by `WsPushScope::required_scope()`. For example, a key
/// holding only `qot:read` does not receive `trade` pushes.
struct WsConnectionContext {
    /// Shared broadcaster the connection subscribes to for push events.
    broadcaster: Arc<WsBroadcaster>,
    /// Scopes held by the connection's key (every scope in legacy mode).
    scopes: HashSet<Scope>,
    /// ID of the authenticated key; `None` in legacy mode (no KeyStore).
    key_id: Option<String>,
    /// v1.4.102 codex 47 F1 / 48 F1 (P1): per-key allowed_acc_ids hard limit.
    /// `None` = the key has no account restriction (default, fully open);
    /// `Some(set)` = only these accounts are visible.
    allowed_acc_ids: Option<HashSet<u64>>,
    /// v1.4.105 D3 (Phase 4) T-B1: the caller key's allowed_markets hard
    /// limit, used by the Layer 3 filter (TradePushFilter).
    /// `None` / empty set = unrestricted.
    allowed_markets: Option<HashSet<String>>,
    /// Shared map from key ID to the account IDs registered through
    /// sub-acc-push; read when filtering trade pushes.
    rest_acc_subscriptions: Arc<RwLock<HashMap<String, HashSet<u64>>>>,
    /// v1.4.105 D4 (Phase 1): shared FilterRegistry instance — push event
    /// filtering goes through the same registry as the other surfaces
    /// (REST body filter / WS push), keeping them consistent.
    filter_registry: Arc<futu_auth_pipeline::FilterRegistry>,
}
344
345async fn handle_ws_connection(socket: WebSocket, ctx: WsConnectionContext) {
346 let WsConnectionContext {
347 broadcaster,
348 scopes,
349 key_id,
350 allowed_acc_ids,
351 allowed_markets,
352 rest_acc_subscriptions,
353 filter_registry,
354 } = ctx;
355
356 let (mut ws_tx, mut ws_rx) = socket.split();
357 let mut push_rx = broadcaster.subscribe();
358
359 tracing::info!(
360 key_id = ?key_id,
361 scopes = ?scopes,
362 "WebSocket push client connected"
363 );
364
365 // v1.4.106 codex 1125 F6 [P2]: REST WS notify subscription state.
366 //
367 // 对齐 C++ raw TCP `IsConnSubRecvNotify` (APIServer_Qot_PriceReminder.cpp:730-735):
368 // broadcast notify 类 push (e.g. price reminder) 必须 client 显式 sub 才下发.
369 //
370 // **Breaking change vs v1.4.105**: v1.4.105 之前 REST `/ws` 默认收所有
371 // broadcast notify; v1.4.106 起需 client 发 `{"action":"subscribe-notify"}`
372 // text message 才能继续收. 老 client 如果依赖 price reminder push 必须升级.
373 //
374 // Default false 对齐 raw TCP 默认 unsub 状态.
375 let notify_subscribed = Arc::new(std::sync::atomic::AtomicBool::new(false));
376 let notify_subscribed_for_send = Arc::clone(¬ify_subscribed);
377 let notify_subscribed_for_recv = Arc::clone(¬ify_subscribed);
378
379 // 推送任务:从 broadcast channel 读取事件 → 按 scope 过滤 → 发送给客户端
380 let send_scopes = scopes.clone();
381 let send_key_id_str = key_id.clone().unwrap_or_else(|| "<none>".to_string());
382 let send_key_id_for_filter = key_id.clone();
383 let rest_subs_for_filter = Arc::clone(&rest_acc_subscriptions);
384 let send_task = tokio::spawn(async move {
385 while let Ok(event) = push_rx.recv().await {
386 // 按 client scope 过滤:key 没这个 scope 就不发
387 if !send_scopes.contains(&event.required_scope.required_scope()) {
388 // 记一次"被挡住的推送",供 Prometheus `/metrics` 观察
389 futu_auth::metrics::bump_ws_filtered(&event.event_type, &send_key_id_str);
390 continue;
391 }
392 // v1.4.106 codex 1125 F6 [P2]: notify subscribe gate.
393 // 对齐 C++ raw TCP `IsConnSubRecvNotify` (broadcast push 必须显式 sub).
394 if matches!(event.required_scope, WsPushScope::Notify)
395 && !notify_subscribed_for_send.load(std::sync::atomic::Ordering::Relaxed)
396 {
397 futu_auth::metrics::bump_ws_filtered("notify_unsub", &send_key_id_str);
398 continue;
399 }
400 // v1.4.102 codex 47 F1 / 48 F1 (P1): trade push 进 acc-id 过滤,
401 // 两层独立 enforce:
402 // 1. **key.allowed_acc_ids 硬限额** (Some(set)): event.acc_id 不在
403 // set → drop. 与 sub-acc-push 是否调过无关 (老 key 没 sub 也强限).
404 // 2. **REST sub-acc-push state map** (sub_state): 仅当 key 已调过
405 // sub-acc-push 才生效. entry 存在但不含 acc_id → drop. 未调过 →
406 // pass (向后兼容老 client).
407 //
408 // codex 48 F2 P1 fix: REST sub state empty entry (Some(set) 但 set
409 // 空) 也算 "已 unsub all" tombstone, 不允许 fall back 到全 push.
410 //
411 // v1.4.103 codex F5.11 (P2) round 5: 抽 logic 到
412 // `should_drop_trade_event_for_caller` pure fn 让单测可验证.
413 //
414 // v1.4.105 D4 (Phase 1): 改走 `FilterRegistry::should_drop_event`
415 // 让 4 surface (REST `/ws` 现接 + 后续 gRPC subscribe_push 等)
416 // 共用同一 registry instance. 防 sibling-route bypass —
417 // 任何人加新 push event filter 只在 registry 注册一次, 不需
418 // 改各 surface inline. `should_drop_trade_event_for_caller`
419 // pure fn 仍保留作 unit test 直接验证 logic, 但 production 走 registry.
420 if matches!(event.required_scope, WsPushScope::Trade)
421 && let Some(event_acc) = event.acc_id
422 {
423 let sub_state_owned: Option<HashSet<u64>> =
424 send_key_id_for_filter.as_ref().and_then(|kid| {
425 crate::adapter::with_rest_acc_subscriptions_read(
426 &rest_subs_for_filter,
427 |subs| subs.get(kid).cloned(),
428 )
429 });
430 let ctx = futu_auth_pipeline::PushEventCtx {
431 event_type: &event.event_type,
432 event_acc: Some(event_acc),
433 allowed_acc_ids: allowed_acc_ids.as_ref(),
434 sub_state: sub_state_owned.as_ref(),
435 // v1.4.105 D3 (Phase 4) T-B1: 真接 trd_market —
436 // PushDispatcher 端一次 decode 后透传到 WsPushEvent.trd_market
437 // (None = 老路径 / decode 失败 / market 未知, 不 trigger
438 // Layer 3 drop).
439 event_trd_market: event.trd_market.as_deref(),
440 allowed_markets: allowed_markets.as_ref(),
441 };
442 if filter_registry.should_drop_event(&ctx) {
443 // v1.4.105 F5.2 fix (codex review C4 4th): 4 surface 统一
444 // metric label "trade_market" 跟 gRPC + raw TCP WS + MCP 一致,
445 // 不再用 event.event_type (= "trade") 让跨 surface jq aggregate
446 // 一致.
447 futu_auth::metrics::bump_ws_filtered("trade_market", &send_key_id_str);
448 continue;
449 }
450 }
451 let json = match serde_json::to_string(&event) {
452 Ok(j) => j,
453 Err(_) => continue,
454 };
455 if ws_tx.send(Message::Text(json.into())).await.is_err() {
456 break; // 客户端断开
457 }
458 }
459 });
460
461 // 接收任务:处理客户端消息(ping/pong/close + v1.4.106 codex 1125 F6 subscribe-notify)
462 let recv_task = tokio::spawn(async move {
463 while let Some(msg) = ws_rx.next().await {
464 match msg {
465 Ok(Message::Close(_)) | Err(_) => break,
466 Ok(Message::Ping(data)) => {
467 // axum 自动回复 pong,不需要手动处理
468 let _ = data;
469 }
470 // v1.4.106 codex 1125 F6 [P2]: 处理 client 发的 JSON control message.
471 // 支持 `{"action":"subscribe-notify"}` / `{"action":"unsubscribe-notify"}`.
472 // 对齐 C++ raw TCP IsConnSubRecvNotify 的 sub/unsub 接口.
473 Ok(Message::Text(text)) => {
474 if let Ok(val) = serde_json::from_str::<serde_json::Value>(&text)
475 && let Some(action) = val.get("action").and_then(|v| v.as_str())
476 {
477 match action {
478 "subscribe-notify" => {
479 notify_subscribed_for_recv
480 .store(true, std::sync::atomic::Ordering::Relaxed);
481 tracing::info!("WS client subscribed notify push");
482 }
483 "unsubscribe-notify" => {
484 notify_subscribed_for_recv
485 .store(false, std::sync::atomic::Ordering::Relaxed);
486 tracing::info!("WS client unsubscribed notify push");
487 }
488 other => {
489 tracing::debug!(action = %other, "WS client unknown action");
490 }
491 }
492 }
493 }
494 _ => {} // 忽略其他消息
495 }
496 }
497 });
498
499 // 任一任务结束则关闭连接
500 tokio::select! {
501 _ = send_task => {}
502 _ = recv_task => {}
503 }
504
505 tracing::info!("WebSocket push client disconnected");
506}
507
/// v1.4.103 codex F5.11 (P2) round 5: trade-event filter decision extracted
/// as a pure function so unit tests can verify it without the WebSocket
/// infrastructure that `handle_ws_connection` needs.
///
/// **Behavior** (returns `true` to drop, `false` to deliver):
/// - Layer 1: `allowed_acc_ids` is `Some`, non-empty, and does not contain
///   `event_acc` → drop.
/// - Layer 2: `sub_state` is `Some` (the caller key has an entry, even an
///   empty tombstone) and does not contain `event_acc` → drop.
///   **An empty set is the unsub-all tombstone and drops every trade event.**
/// - Layer 2 with `sub_state == None` (the caller never called
///   sub-acc-push) → no drop (backward compatible).
///
/// v1.4.105 D4 (Phase 1): production has switched to
/// `FilterRegistry::should_drop_event`; this function is now only a unit test
/// target, hence `#[cfg(test)]` to avoid a dead_code warning.
#[cfg(test)]
pub(crate) fn should_drop_trade_event_for_caller(
    allowed_acc_ids: Option<&HashSet<u64>>,
    sub_state: Option<&HashSet<u64>>,
    event_acc: u64,
) -> bool {
    // Layer 1: hard whitelist on the caller key. An empty whitelist is
    // treated as "no restriction".
    let outside_whitelist = allowed_acc_ids
        .is_some_and(|allowed| !allowed.is_empty() && !allowed.contains(&event_acc));

    // Layer 2: per-key REST subscription state (opt-in via
    // /api/sub-acc-push). A present entry must contain the account; a
    // missing entry imposes nothing.
    let outside_subscription =
        sub_state.is_some_and(|subscribed| !subscribed.contains(&event_acc));

    outside_whitelist || outside_subscription
}
548
// Unit tests for this module; declared here and compiled only for test builds.
#[cfg(test)]
mod tests;