futu_server/push.rs
1// 推送分发:三种推送模式
2
3use std::sync::Arc;
4
5use bytes::Bytes;
6use dashmap::DashMap;
7use futu_auth::Scope;
8
9use crate::conn::ClientConn;
10use crate::metrics::GatewayMetrics;
11use crate::subscription::SubscriptionManager;
12
13/// **防御深度**:即使客户端在订阅阶段某种方式绕过了 scope gate,推送时
14/// 再按 client key 的 scope 过滤一次。
15///
16/// `conn.scopes` 语义:
17/// - **空集** → legacy 模式(TCP listener / WS 未配 keys.json),全放行
18/// - 非空集 → scope 模式,必须包含 `needed` 才推
19fn should_push_to(conn: &ClientConn, needed: Scope, event_label: &str) -> bool {
20 if conn.scopes.is_empty() {
21 return true; // legacy 全放行
22 }
23 if conn.scopes.contains(&needed) {
24 return true;
25 }
26 // 过滤掉 —— 记 metrics 便于运维发现"谁订阅了但 scope 不够"这种配置问题
27 let key_id = conn.key_id.as_deref().unwrap_or("<none>");
28 futu_auth::metrics::bump_ws_filtered(event_label, key_id);
29 false
30}
31
32/// 外部推送接收器 trait
33///
34/// 允许外部模块(如 REST WebSocket)接收推送事件,
35/// 不引入模块间循环依赖。
36///
37/// **v1.4.106 codex 1131 F4 [P1]**: `on_quote_push` 加 `rehab_type` 参数. KL 类
38/// push 走非 0 rehab (forward / backward / 无), 其它 sub_type 走 0. Sink 实装
39/// 应用 (sec_key, sub_type, rehab_type) 三元过滤 push 接收方 — 不再 broadcast
40/// 给所有 quote-scope conn (老行为是 silent leak: 仅订未 RegPush 的 conn 也收
41/// 到 quote push, 违反 C++ `QotSubscribe::GetPushConn` 三元 key 路由).
42pub trait ExternalPushSink: Send + Sync {
43 /// 行情推送 (rehab_type=0 for non-KL).
44 fn on_quote_push(
45 &self,
46 sec_key: &str,
47 sub_type: i32,
48 rehab_type: i32,
49 proto_id: u32,
50 body: &[u8],
51 );
52 /// 广播推送 (到价提醒、系统通知等)
53 fn on_broadcast_push(&self, proto_id: u32, body: &[u8]);
54 /// 交易推送 (订单更新、成交更新等).
55 ///
56 /// `trd_market` 是 PushDispatcher 一次性 decode `body` 提取的
57 /// `s2c.header.trd_market` 大写字符串 ("HK" / "US" / "CN" / ...), 为
58 /// 4 surface (gRPC / REST WS / MCP) 复用避免各自 decode. 老 sink 实现可
59 /// 忽略此参数 (只看 acc_id + proto_id + body), Layer 3 (allowed_markets)
60 /// filter 直接从这里取 — 见 [`extract_trd_market_from_trade_body`].
61 ///
62 /// `None` = decode 失败 / proto_id 不识别 / market enum unknown — 老
63 /// 路径下游应**不 trigger Layer 3 drop** (向后兼容 — pitfall #57
64 /// backend-semantic 未真机验证前 default OFF behavior).
65 fn on_trade_push(&self, acc_id: u64, proto_id: u32, body: &[u8], trd_market: Option<&str>);
66}
67
68/// v1.4.105 D3 (Phase 4) T-B: trade push body decode → trd_market 提取.
69///
70/// 4 surface (gRPC / REST WS / raw TCP WS / MCP) 共用同一 helper 而非各自
71/// decode 一次, 避免 mapping 漂移 (与 futu-auth-pipeline::body_aware /
72/// futu-rest::trd::trd_market_str 一致, 但本 crate 不能跨 dep 复用所以重复
73/// 一份 — 跨 crate mismatch 会被 cross_surface_smoke 抓出).
74///
75/// caller (PushDispatcher) 在分发到 sink 前**只 decode 一次**, 把字符串塞
76/// PushEventCtx.event_trd_market 让 TradePushFilter Layer 3 用
77/// allowed_markets 校验.
78///
79/// 不识别 / decode 失败 / market enum unknown → None (Layer 3 不 trigger).
80///
81/// v1.4.105 ship 时 UNVERIFIED — 真机 verify 跨 market 推送流 (HK + US 双
82/// 账户) 后 v1.4.106 升级 confidence (per pitfall #57 backend-semantic risk).
83#[must_use]
84pub fn extract_trd_market_from_trade_body(proto_id: u32, body: &[u8]) -> Option<&'static str> {
85 use prost::Message;
86 let market_int = match proto_id {
87 // TRD_UPDATE_ORDER (2208) → Trd_UpdateOrder.Response.s2c.header.trd_market
88 2208 => {
89 let resp = futu_proto::trd_update_order::Response::decode(body).ok()?;
90 resp.s2c?.header.trd_market
91 }
92 // TRD_UPDATE_ORDER_FILL (2218) → Trd_UpdateOrderFill.Response.s2c.header.trd_market
93 2218 => {
94 let resp = futu_proto::trd_update_order_fill::Response::decode(body).ok()?;
95 resp.s2c?.header.trd_market
96 }
97 // 未知 trade push proto_id → 不识别, 让 Layer 3 不 trigger
98 _ => return None,
99 };
100 // Trd_Common.TrdMarket enum int → 大写字符串. 与 futu-rest::trd::trd_market_str
101 // / futu-auth-pipeline::body_aware::trd_market_str 一致.
102 match market_int {
103 1 => Some("HK"),
104 2 => Some("US"),
105 3 => Some("CN"),
106 4 => Some("HKCC"),
107 5 => Some("FUTURES"),
108 6 => Some("SG"),
109 8 => Some("AU"),
110 15 => Some("JP"),
111 111 => Some("MY"),
112 112 => Some("CA"),
113 _ => None,
114 }
115}
116
117/// 推送分发器
118pub struct PushDispatcher {
119 connections: Arc<DashMap<u64, ClientConn>>,
120 subscriptions: Arc<SubscriptionManager>,
121 metrics: Option<Arc<GatewayMetrics>>,
122 /// 外部推送接收器列表 (REST WebSocket, gRPC 等)
123 external_sinks: Vec<Arc<dyn ExternalPushSink>>,
124}
125
126impl PushDispatcher {
127 /// 创建推送分发器。`connections` 和 `subscriptions` 由
128 /// [`super::listener::ApiServer`] 共享;外部 sink / metrics 可通过
129 /// [`Self::with_metrics`] / [`Self::with_external_sink`] 后续注入。
130 pub fn new(
131 connections: Arc<DashMap<u64, ClientConn>>,
132 subscriptions: Arc<SubscriptionManager>,
133 ) -> Self {
134 Self {
135 connections,
136 subscriptions,
137 metrics: None,
138 external_sinks: Vec::new(),
139 }
140 }
141
142 /// 设置监控指标引用
143 pub fn with_metrics(mut self, metrics: Arc<GatewayMetrics>) -> Self {
144 self.metrics = Some(metrics);
145 self
146 }
147
148 /// 添加外部推送接收器(可多次调用注册多个)
149 pub fn with_external_sink(mut self, sink: Arc<dyn ExternalPushSink>) -> Self {
150 self.external_sinks.push(sink);
151 self
152 }
153
154 fn record_push(&self) {
155 if let Some(ref m) = self.metrics {
156 m.client_pushes_sent
157 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
158 }
159 }
160
161 /// 向指定连接推送(自动处理 AES 加密)
162 pub async fn push_to_conn(&self, conn_id: u64, proto_id: u32, body: Vec<u8>) {
163 if let Some(conn) = self.connections.get(&conn_id) {
164 let frame = conn.make_frame(proto_id, 0, Bytes::from(body));
165 let _ = conn.tx.send(frame).await;
166 self.record_push();
167 }
168 }
169
170 /// 向指定连接发送 quote 首推快照(带 qot:read 防御性过滤)。
171 pub async fn push_qot_to_conn(&self, conn_id: u64, proto_id: u32, body: Vec<u8>) {
172 if let Some(conn) = self.connections.get(&conn_id) {
173 if !should_push_to(&conn, Scope::QotRead, "quote_first") {
174 return;
175 }
176 let frame = conn.make_frame(proto_id, 0, Bytes::from(body));
177 let _ = conn.tx.send(frame).await;
178 self.record_push();
179 }
180 }
181
182 /// 向所有订阅了通知的连接广播(每个连接独立 AES 加密)
183 pub async fn push_notify(&self, proto_id: u32, body: Vec<u8>) {
184 let body = Bytes::from(body);
185 for entry in self.connections.iter() {
186 let conn = entry.value();
187 if !conn.recv_notify {
188 continue;
189 }
190 // 防御深度:订阅阶段应该已经挡了 qot:read 外的 key,这里再过滤一次
191 if !should_push_to(conn, Scope::QotRead, "notify") {
192 continue;
193 }
194 let frame = conn.make_frame(proto_id, 0, body.clone());
195 let _ = conn.tx.send(frame).await;
196 self.record_push();
197 }
198 }
199
200 /// 向订阅了指定交易账户的所有连接推送
201 pub async fn push_trd_acc(&self, acc_id: u64, proto_id: u32, body: Vec<u8>) {
202 // v1.4.105 D3 (Phase 4) T-B4: 一次 decode 提取 trd_market 给 sinks
203 // 共用. 4 surface (gRPC / REST WS / 等) 复用同一字符串避免重复 decode.
204 let trd_market = extract_trd_market_from_trade_body(proto_id, &body);
205 // 同时推送给外部接收器 (REST WebSocket, gRPC 等)
206 for sink in &self.external_sinks {
207 sink.on_trade_push(acc_id, proto_id, &body, trd_market);
208 }
209 let body = Bytes::from(body);
210 let subscribers = self.subscriptions.get_acc_subscribers(acc_id);
211 for conn_id in subscribers {
212 if let Some(conn) = self.connections.get(&conn_id) {
213 // 防御深度:trade push 要求 acc:read
214 if !should_push_to(&conn, Scope::AccRead, "trade") {
215 continue;
216 }
217 // codex round 1 F4 (P2) v1.4.105: Layer 1 — caller key
218 // allowed_acc_ids push-time 硬过滤. 防 stale subscription /
219 // KeyRecord reload 后 acc 范围窄化 / 历史 bug 留下的 conn→acc
220 // 关系 让受限 key 仍收到非授权 acc 的 trade push.
221 //
222 // 设计同 futu-auth::Limits / KeyRecord:
223 // - allowed_acc_ids None = 无限制 (legacy / unrestricted key) → 放行
224 // - 非空 set + acc_id ∉ set → drop + metric
225 // - 空 set = 无限制 (向后兼容); deny-all 用 sentinel {0}
226 if let Some(allowed_accs) = conn.allowed_acc_ids.as_ref()
227 && !allowed_accs.is_empty()
228 && !allowed_accs.contains(&acc_id)
229 {
230 let key_id = conn.key_id.as_deref().unwrap_or("<none>");
231 futu_auth::metrics::bump_ws_filtered("trade_acc_id", key_id);
232 continue;
233 }
234 // v1.4.105 D3 (Phase 4) T-B2: Layer 3 — caller key allowed_markets
235 // 限制. trd_market None (decode 失败 / market 未知) → 不 trigger
236 // drop (向后兼容 — pitfall #57 backend-semantic 未真机 verify
237 // 前 default 不 drop, 防 false-negative 错过用户合法 push).
238 // allowed_markets None / 空 set = 无限制.
239 if let (Some(market), Some(allowed_mkts)) =
240 (trd_market, conn.allowed_markets.as_ref())
241 && !allowed_mkts.is_empty()
242 && !allowed_mkts.contains(market)
243 {
244 let key_id = conn.key_id.as_deref().unwrap_or("<none>");
245 futu_auth::metrics::bump_ws_filtered("trade_market", key_id);
246 continue;
247 }
248 let frame = conn.make_frame(proto_id, 0, body.clone());
249 let _ = conn.tx.send(frame).await;
250 self.record_push();
251 }
252 }
253 }
254
255 /// 向所有已连接的客户端广播(到价提醒等,不需要订阅通知)
256 /// C++ 检查 IsConnSubRecvNotify,对齐使用 InitConnect.recvNotify。
257 pub async fn push_broadcast(&self, proto_id: u32, body: Vec<u8>) {
258 // 同时推送给外部接收器 (REST WebSocket, gRPC 等)
259 for sink in &self.external_sinks {
260 sink.on_broadcast_push(proto_id, &body);
261 }
262 let body = Bytes::from(body);
263 for entry in self.connections.iter() {
264 let conn = entry.value();
265 if !conn.recv_notify {
266 continue;
267 }
268 if !should_push_to(conn, Scope::QotRead, "broadcast") {
269 continue;
270 }
271 let frame = conn.make_frame(proto_id, 0, body.clone());
272 let _ = conn.tx.send(frame).await;
273 self.record_push();
274 }
275 }
276
277 /// 向**注册了 push** 的连接推送 quote (F4 P1 BLOCKER fix).
278 ///
279 /// **v1.4.106 codex 1131 F4 [P1]**: 改用 push_regs 三元 key 而非老的
280 /// subscriber map. 老路径让仅订未 RegPush 的 conn 收到 quote push, 违反
281 /// C++ `QotSubscribe::GetPushConn` 三元 key 路由 — F3 split state + F4
282 /// push 端真过滤组合修复.
283 ///
284 /// **v1.4.110 broker-aware closeout**: `security_key` 是 push parser 传来的
285 /// cache-key display string (`"market_code"` or `"market_code@b{id}"`).
286 /// Dispatch 通过 cache-key bridge 查询 broker-aware push_regs, 不再直接调用
287 /// legacy String facade.
288 ///
289 /// rehab_type 仅 KL 类有意义 (sub_type 6/7/8/9/10/11/12/13/15/16/17), 其它
290 /// sub_type 应填 0. SubscriptionManager.get_qot_push_subscribers 内部对
291 /// 非 KL 自动 normalize rehab=0.
292 pub async fn push_qot(
293 &self,
294 security_key: &str,
295 sub_type: i32,
296 rehab_type: i32,
297 proto_id: u32,
298 body: Vec<u8>,
299 ) {
300 // 同时推送给外部接收器 (REST WebSocket, gRPC 等)
301 // sink 内部按 (sec_key, sub_type, rehab_type) 自己过滤 conn 集合.
302 for sink in &self.external_sinks {
303 sink.on_quote_push(security_key, sub_type, rehab_type, proto_id, &body);
304 }
305 let body = Bytes::from(body);
306 // **F4 P1**: push_regs 三元 key 查 conn allowlist (而非 subscriber set).
307 let subscribers = self.subscriptions.get_qot_push_subscribers_by_cache_key(
308 security_key,
309 sub_type,
310 rehab_type,
311 );
312 for conn_id in subscribers {
313 if let Some(conn) = self.connections.get(&conn_id) {
314 if !should_push_to(&conn, Scope::QotRead, "quote") {
315 continue;
316 }
317 let frame = conn.make_frame(proto_id, 0, body.clone());
318 let _ = conn.tx.send(frame).await;
319 self.record_push();
320 }
321 }
322 }
323}
324
325#[cfg(test)]
326mod tests;