Skip to main content

futu_server/
push.rs

1// 推送分发:三种推送模式
2
3use std::sync::Arc;
4
5use bytes::Bytes;
6use dashmap::DashMap;
7use futu_auth::Scope;
8
9use crate::conn::ClientConn;
10use crate::metrics::GatewayMetrics;
11use crate::subscription::SubscriptionManager;
12
13/// **防御深度**:即使客户端在订阅阶段某种方式绕过了 scope gate,推送时
14/// 再按 client key 的 scope 过滤一次。
15///
16/// `conn.scopes` 语义:
17/// - **空集** → legacy 模式(TCP listener / WS 未配 keys.json),全放行
18/// - 非空集 → scope 模式,必须包含 `needed` 才推
19fn should_push_to(conn: &ClientConn, needed: Scope, event_label: &str) -> bool {
20    if conn.scopes.is_empty() {
21        return true; // legacy 全放行
22    }
23    if conn.scopes.contains(&needed) {
24        return true;
25    }
26    // 过滤掉 —— 记 metrics 便于运维发现"谁订阅了但 scope 不够"这种配置问题
27    let key_id = conn.key_id.as_deref().unwrap_or("<none>");
28    futu_auth::metrics::bump_ws_filtered(event_label, key_id);
29    false
30}
31
32/// 外部推送接收器 trait
33///
34/// 允许外部模块(如 REST WebSocket)接收推送事件,
35/// 不引入模块间循环依赖。
36///
37/// **v1.4.106 codex 1131 F4 [P1]**: `on_quote_push` 加 `rehab_type` 参数. KL 类
38/// push 走非 0 rehab (forward / backward / 无), 其它 sub_type 走 0. Sink 实装
39/// 应用 (sec_key, sub_type, rehab_type) 三元过滤 push 接收方 — 不再 broadcast
40/// 给所有 quote-scope conn (老行为是 silent leak: 仅订未 RegPush 的 conn 也收
41/// 到 quote push, 违反 C++ `QotSubscribe::GetPushConn` 三元 key 路由).
42pub trait ExternalPushSink: Send + Sync {
43    /// 行情推送 (rehab_type=0 for non-KL).
44    fn on_quote_push(
45        &self,
46        sec_key: &str,
47        sub_type: i32,
48        rehab_type: i32,
49        proto_id: u32,
50        body: &[u8],
51    );
52    /// 广播推送 (到价提醒、系统通知等)
53    fn on_broadcast_push(&self, proto_id: u32, body: &[u8]);
54    /// 交易推送 (订单更新、成交更新等).
55    ///
56    /// `trd_market` 是 PushDispatcher 一次性 decode `body` 提取的
57    /// `s2c.header.trd_market` 大写字符串 ("HK" / "US" / "CN" / ...), 为
58    /// 4 surface (gRPC / REST WS / MCP) 复用避免各自 decode. 老 sink 实现可
59    /// 忽略此参数 (只看 acc_id + proto_id + body), Layer 3 (allowed_markets)
60    /// filter 直接从这里取 — 见 [`extract_trd_market_from_trade_body`].
61    ///
62    /// `None` = decode 失败 / proto_id 不识别 / market enum unknown — 老
63    /// 路径下游应**不 trigger Layer 3 drop** (向后兼容 — pitfall #57
64    /// backend-semantic 未真机验证前 default OFF behavior).
65    fn on_trade_push(&self, acc_id: u64, proto_id: u32, body: &[u8], trd_market: Option<&str>);
66}
67
68/// v1.4.105 D3 (Phase 4) T-B: trade push body decode → trd_market 提取.
69///
70/// 4 surface (gRPC / REST WS / raw TCP WS / MCP) 共用同一 helper 而非各自
71/// decode 一次, 避免 mapping 漂移 (与 futu-auth-pipeline::body_aware /
72/// futu-rest::trd::trd_market_str 一致, 但本 crate 不能跨 dep 复用所以重复
73/// 一份 — 跨 crate mismatch 会被 cross_surface_smoke 抓出).
74///
75/// caller (PushDispatcher) 在分发到 sink 前**只 decode 一次**, 把字符串塞
76/// PushEventCtx.event_trd_market 让 TradePushFilter Layer 3 用
77/// allowed_markets 校验.
78///
79/// 不识别 / decode 失败 / market enum unknown → None (Layer 3 不 trigger).
80///
81/// v1.4.105 ship 时 UNVERIFIED — 真机 verify 跨 market 推送流 (HK + US 双
82/// 账户) 后 v1.4.106 升级 confidence (per pitfall #57 backend-semantic risk).
83#[must_use]
84pub fn extract_trd_market_from_trade_body(proto_id: u32, body: &[u8]) -> Option<&'static str> {
85    use prost::Message;
86    let market_int = match proto_id {
87        // TRD_UPDATE_ORDER (2208) → Trd_UpdateOrder.Response.s2c.header.trd_market
88        2208 => {
89            let resp = futu_proto::trd_update_order::Response::decode(body).ok()?;
90            resp.s2c?.header.trd_market
91        }
92        // TRD_UPDATE_ORDER_FILL (2218) → Trd_UpdateOrderFill.Response.s2c.header.trd_market
93        2218 => {
94            let resp = futu_proto::trd_update_order_fill::Response::decode(body).ok()?;
95            resp.s2c?.header.trd_market
96        }
97        // 未知 trade push proto_id → 不识别, 让 Layer 3 不 trigger
98        _ => return None,
99    };
100    // Trd_Common.TrdMarket enum int → 大写字符串. 与 futu-rest::trd::trd_market_str
101    // / futu-auth-pipeline::body_aware::trd_market_str 一致.
102    match market_int {
103        1 => Some("HK"),
104        2 => Some("US"),
105        3 => Some("CN"),
106        4 => Some("HKCC"),
107        5 => Some("FUTURES"),
108        6 => Some("SG"),
109        8 => Some("AU"),
110        15 => Some("JP"),
111        111 => Some("MY"),
112        112 => Some("CA"),
113        _ => None,
114    }
115}
116
117/// 推送分发器
118pub struct PushDispatcher {
119    connections: Arc<DashMap<u64, ClientConn>>,
120    subscriptions: Arc<SubscriptionManager>,
121    metrics: Option<Arc<GatewayMetrics>>,
122    /// 外部推送接收器列表 (REST WebSocket, gRPC 等)
123    external_sinks: Vec<Arc<dyn ExternalPushSink>>,
124}
125
126impl PushDispatcher {
127    /// 创建推送分发器。`connections` 和 `subscriptions` 由
128    /// [`super::listener::ApiServer`] 共享;外部 sink / metrics 可通过
129    /// [`Self::with_metrics`] / [`Self::with_external_sink`] 后续注入。
130    pub fn new(
131        connections: Arc<DashMap<u64, ClientConn>>,
132        subscriptions: Arc<SubscriptionManager>,
133    ) -> Self {
134        Self {
135            connections,
136            subscriptions,
137            metrics: None,
138            external_sinks: Vec::new(),
139        }
140    }
141
142    /// 设置监控指标引用
143    pub fn with_metrics(mut self, metrics: Arc<GatewayMetrics>) -> Self {
144        self.metrics = Some(metrics);
145        self
146    }
147
148    /// 添加外部推送接收器(可多次调用注册多个)
149    pub fn with_external_sink(mut self, sink: Arc<dyn ExternalPushSink>) -> Self {
150        self.external_sinks.push(sink);
151        self
152    }
153
154    fn record_push(&self) {
155        if let Some(ref m) = self.metrics {
156            m.client_pushes_sent
157                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
158        }
159    }
160
161    /// 向指定连接推送(自动处理 AES 加密)
162    pub async fn push_to_conn(&self, conn_id: u64, proto_id: u32, body: Vec<u8>) {
163        if let Some(conn) = self.connections.get(&conn_id) {
164            let frame = conn.make_frame(proto_id, 0, Bytes::from(body));
165            let _ = conn.tx.send(frame).await;
166            self.record_push();
167        }
168    }
169
170    /// 向指定连接发送 quote 首推快照(带 qot:read 防御性过滤)。
171    pub async fn push_qot_to_conn(&self, conn_id: u64, proto_id: u32, body: Vec<u8>) {
172        if let Some(conn) = self.connections.get(&conn_id) {
173            if !should_push_to(&conn, Scope::QotRead, "quote_first") {
174                return;
175            }
176            let frame = conn.make_frame(proto_id, 0, Bytes::from(body));
177            let _ = conn.tx.send(frame).await;
178            self.record_push();
179        }
180    }
181
182    /// 向所有订阅了通知的连接广播(每个连接独立 AES 加密)
183    pub async fn push_notify(&self, proto_id: u32, body: Vec<u8>) {
184        let body = Bytes::from(body);
185        for entry in self.connections.iter() {
186            let conn = entry.value();
187            if !conn.recv_notify {
188                continue;
189            }
190            // 防御深度:订阅阶段应该已经挡了 qot:read 外的 key,这里再过滤一次
191            if !should_push_to(conn, Scope::QotRead, "notify") {
192                continue;
193            }
194            let frame = conn.make_frame(proto_id, 0, body.clone());
195            let _ = conn.tx.send(frame).await;
196            self.record_push();
197        }
198    }
199
200    /// 向订阅了指定交易账户的所有连接推送
201    pub async fn push_trd_acc(&self, acc_id: u64, proto_id: u32, body: Vec<u8>) {
202        // v1.4.105 D3 (Phase 4) T-B4: 一次 decode 提取 trd_market 给 sinks
203        // 共用. 4 surface (gRPC / REST WS / 等) 复用同一字符串避免重复 decode.
204        let trd_market = extract_trd_market_from_trade_body(proto_id, &body);
205        // 同时推送给外部接收器 (REST WebSocket, gRPC 等)
206        for sink in &self.external_sinks {
207            sink.on_trade_push(acc_id, proto_id, &body, trd_market);
208        }
209        let body = Bytes::from(body);
210        let subscribers = self.subscriptions.get_acc_subscribers(acc_id);
211        for conn_id in subscribers {
212            if let Some(conn) = self.connections.get(&conn_id) {
213                // 防御深度:trade push 要求 acc:read
214                if !should_push_to(&conn, Scope::AccRead, "trade") {
215                    continue;
216                }
217                // codex round 1 F4 (P2) v1.4.105: Layer 1 — caller key
218                // allowed_acc_ids push-time 硬过滤. 防 stale subscription /
219                // KeyRecord reload 后 acc 范围窄化 / 历史 bug 留下的 conn→acc
220                // 关系 让受限 key 仍收到非授权 acc 的 trade push.
221                //
222                // 设计同 futu-auth::Limits / KeyRecord:
223                // - allowed_acc_ids None = 无限制 (legacy / unrestricted key) → 放行
224                // - 非空 set + acc_id ∉ set → drop + metric
225                // - 空 set = 无限制 (向后兼容); deny-all 用 sentinel {0}
226                if let Some(allowed_accs) = conn.allowed_acc_ids.as_ref()
227                    && !allowed_accs.is_empty()
228                    && !allowed_accs.contains(&acc_id)
229                {
230                    let key_id = conn.key_id.as_deref().unwrap_or("<none>");
231                    futu_auth::metrics::bump_ws_filtered("trade_acc_id", key_id);
232                    continue;
233                }
234                // v1.4.105 D3 (Phase 4) T-B2: Layer 3 — caller key allowed_markets
235                // 限制. trd_market None (decode 失败 / market 未知) → 不 trigger
236                // drop (向后兼容 — pitfall #57 backend-semantic 未真机 verify
237                // 前 default 不 drop, 防 false-negative 错过用户合法 push).
238                // allowed_markets None / 空 set = 无限制.
239                if let (Some(market), Some(allowed_mkts)) =
240                    (trd_market, conn.allowed_markets.as_ref())
241                    && !allowed_mkts.is_empty()
242                    && !allowed_mkts.contains(market)
243                {
244                    let key_id = conn.key_id.as_deref().unwrap_or("<none>");
245                    futu_auth::metrics::bump_ws_filtered("trade_market", key_id);
246                    continue;
247                }
248                let frame = conn.make_frame(proto_id, 0, body.clone());
249                let _ = conn.tx.send(frame).await;
250                self.record_push();
251            }
252        }
253    }
254
255    /// 向所有已连接的客户端广播(到价提醒等,不需要订阅通知)
256    /// C++ 检查 IsConnSubRecvNotify,对齐使用 InitConnect.recvNotify。
257    pub async fn push_broadcast(&self, proto_id: u32, body: Vec<u8>) {
258        // 同时推送给外部接收器 (REST WebSocket, gRPC 等)
259        for sink in &self.external_sinks {
260            sink.on_broadcast_push(proto_id, &body);
261        }
262        let body = Bytes::from(body);
263        for entry in self.connections.iter() {
264            let conn = entry.value();
265            if !conn.recv_notify {
266                continue;
267            }
268            if !should_push_to(conn, Scope::QotRead, "broadcast") {
269                continue;
270            }
271            let frame = conn.make_frame(proto_id, 0, body.clone());
272            let _ = conn.tx.send(frame).await;
273            self.record_push();
274        }
275    }
276
277    /// 向**注册了 push** 的连接推送 quote (F4 P1 BLOCKER fix).
278    ///
279    /// **v1.4.106 codex 1131 F4 [P1]**: 改用 push_regs 三元 key 而非老的
280    /// subscriber map. 老路径让仅订未 RegPush 的 conn 收到 quote push, 违反
281    /// C++ `QotSubscribe::GetPushConn` 三元 key 路由 — F3 split state + F4
282    /// push 端真过滤组合修复.
283    ///
284    /// **v1.4.110 broker-aware closeout**: `security_key` 是 push parser 传来的
285    /// cache-key display string (`"market_code"` or `"market_code@b{id}"`).
286    /// Dispatch 通过 cache-key bridge 查询 broker-aware push_regs, 不再直接调用
287    /// legacy String facade.
288    ///
289    /// rehab_type 仅 KL 类有意义 (sub_type 6/7/8/9/10/11/12/13/15/16/17), 其它
290    /// sub_type 应填 0. SubscriptionManager.get_qot_push_subscribers 内部对
291    /// 非 KL 自动 normalize rehab=0.
292    pub async fn push_qot(
293        &self,
294        security_key: &str,
295        sub_type: i32,
296        rehab_type: i32,
297        proto_id: u32,
298        body: Vec<u8>,
299    ) {
300        // 同时推送给外部接收器 (REST WebSocket, gRPC 等)
301        // sink 内部按 (sec_key, sub_type, rehab_type) 自己过滤 conn 集合.
302        for sink in &self.external_sinks {
303            sink.on_quote_push(security_key, sub_type, rehab_type, proto_id, &body);
304        }
305        let body = Bytes::from(body);
306        // **F4 P1**: push_regs 三元 key 查 conn allowlist (而非 subscriber set).
307        let subscribers = self.subscriptions.get_qot_push_subscribers_by_cache_key(
308            security_key,
309            sub_type,
310            rehab_type,
311        );
312        for conn_id in subscribers {
313            if let Some(conn) = self.connections.get(&conn_id) {
314                if !should_push_to(&conn, Scope::QotRead, "quote") {
315                    continue;
316                }
317                let frame = conn.make_frame(proto_id, 0, body.clone());
318                let _ = conn.tx.send(frame).await;
319                self.record_push();
320            }
321        }
322    }
323}
324
325#[cfg(test)]
326mod tests;