Skip to main content

futu_server/
subscription.rs

1// 订阅管理:行情订阅 + 交易账户推送订阅 + 通知订阅
2//
3// v1.4.106 codex 1131 F3 [P1] (BLOCKER fix): C++ 把"backend 订阅状态"
4// (`m_setSub`) 与"push 注册状态" (`m_mapRegPush`) 分开维护. Rust 之前
5// 把两者塞同一 `qot_subs` map → `Qot_RegQotPush(register=true)` 制造
6// 假订阅, `Qot_RegQotPush(register=false)` 误删真订阅, GetSubInfo 把
7// push 注册当订阅项报告. 本版彻底拆开:
8//   - `qot_subs: HashMap<(SecurityKey, SubType), HashSet<ConnId>>`
9//     = desired sub state, 决定 backend CMD 6211 desired set + GetSubInfo
10//   - `qot_push_regs: HashMap<(SecurityKey, SubType, RehabType), HashSet<ConnId>>`
11//     = push 注册 state, 决定 PushDispatcher 路由对象, 不影响订阅
12// C++ 对照:
13//   - QotSubscribe.cpp:107-108 m_setSub[(stockID, subType)].insert(connID)
14//   - QotSubscribe.cpp:489-490 m_mapRegPush[(stockID, subType, rehabType)].insert(connID)
15//   - QotSubscribe.cpp:639-649 GetPushConn(stockID, subType, rehabType)
16//
17// v1.4.110 codex Phase 3 closeout (broker-aware key model): C++ QOT 模块
18// (subscription / cache / push / quota) 以 `StockKey` (=stockID + 可选
19// brokerID) 为 first-class identity. Rust 旧 String facade 已从生产
20// `SubscriptionManager` 删除;内部状态直接以 `QotSecurityKey` keyed.
21//
22// C++ 对照 (QotSubscribe.h):
23//   - Map_t<SubType, Set_t<StockKey>> m_mapSub  (per-conn)
24//   - Set_t<(StockKey, SubType)>      m_setSub  (global)
25//   - Map_t<(StockKey, SubType, RehabType), Set<ConnID>> m_mapRegPush
26//   - Map_t<(ConnID, StockKey), bool> m_mapConnOrderBookDetail / BrokerDetail
27
28use std::collections::{HashMap, HashSet};
29use std::sync::atomic::{AtomicU64, Ordering};
30use std::time::{Duration, Instant};
31
32use futu_core::qot_stock_key::QotSecurityKey;
33use parking_lot::RwLock;
34
35/// 订阅管理器
36pub struct SubscriptionManager {
37    /// 通知订阅:哪些连接订阅了系统通知
38    notify_subs: RwLock<HashSet<u64>>,
39
40    /// 交易账户推送订阅:acc_id → Set<conn_id>
41    trd_acc_subs: RwLock<HashMap<u64, HashSet<u64>>>,
42
43    /// **行情订阅** (desired sub state, 对齐 C++ `m_setSub`):
44    ///   key = (QotSecurityKey, sub_type), val = 订阅 conn 集合.
45    qot_subs: RwLock<HashMap<(QotSecurityKey, i32), HashSet<u64>>>,
46
47    /// **行情 push 注册** (对齐 C++ `m_mapRegPush`):
48    ///   key = (QotSecurityKey, sub_type, rehab_type), val = 注册接收 push 的 conn 集合.
49    /// rehab_type 仅 KL 类有效 (None=0 / Forward=1 / Backword=2 / N/A=0 for non-KL).
50    qot_push_regs: RwLock<HashMap<(QotSecurityKey, i32, i32), HashSet<u64>>>,
51
52    /// **每 (security, sub_type) 的 desired session** (对齐 C++
53    /// `m_mapConnTickerSession` / `m_mapConnKLRTSession` global view):
54    /// max(per-conn session) 决定 backend desired session.
55    /// session: 0=Unknown / 1=RTH / 2=ETH / 3=ALL / 5=OVERNIGHT (rejected).
56    qot_sub_sessions: RwLock<HashMap<(QotSecurityKey, i32), HashMap<u64, i32>>>,
57
58    /// **每 (security) 的 OrderBook detail flag** (对齐 C++
59    /// `m_mapConnOrderBookDetail`): 一旦有 conn 要 detail, 全局走 detail.
60    qot_orderbook_detail: RwLock<HashMap<QotSecurityKey, HashMap<u64, bool>>>,
61
62    /// **每 (security) 的 Broker detail flag** (对齐 C++
63    /// `m_mapConnBrokerDetail`).
64    qot_broker_detail: RwLock<HashMap<QotSecurityKey, HashMap<u64, bool>>>,
65
66    /// **总 quota 上限** (对齐 C++ `INNData_APIInterLimit::GetSubQuota()`):
67    /// 启动 hardcode 4000 fallback, backend SubscribeSetRsp.max_sub_count 下发
68    /// 后 setter 更新. 不再用静态 const.
69    total_quota: RwLock<u32>,
70
71    /// 全局订阅时间 (key = (QotSecurityKey, sub_type)).
72    /// 对齐 C++ `m_mapSubTime`: 只有全局第一次订阅该 SubKey 或 backend
73    /// 属性升级重新拉取时才刷新;退订前至少等待 `QOT_MIN_UNSUB_ELAPSED_SECS`.
74    qot_sub_times: RwLock<HashMap<(QotSecurityKey, i32), Instant>>,
75
76    /// 已断开的 conn_id,但其 QOT 订阅还没达到 C++ 最短退订窗口。
77    ///
78    /// 对齐 C++ `QotSubscribe::ClearConnSubInfo`: 断线时 push 注册立即清,
79    /// 但 `m_setSub` 只有在 `IsSubTimeEnoughToUnSub` 后才移除;没到窗口的
80    /// 连接由后续定时清理再次尝试。
81    qot_disconnected_conns: RwLock<HashSet<u64>>,
82
83    /// 断线延迟清理导致 global desired set 变化的 generation。
84    ///
85    /// server 层没有 backend 句柄,不能在 `on_disconnect` 里直接发 CMD6211。
86    /// 这里仅记录“需要 gateway 同步”的单调计数,gateway 后台任务看到变化后
87    /// 发当前 desired set。
88    qot_disconnect_sync_generation: AtomicU64,
89}
90
/// Fallback for the total subscription quota cap, used at startup until the
/// real value has been fetched from the backend.
///
/// Mirrors the default of 4000 in C++ `QotSubscribe.cpp:1132`
/// `GetUserSubQuota()`. The real quota is delivered by the backend in
/// `SubscribeSetRsp.max_sub_count` of the CMD 6211 response; on receipt the
/// daemon calls `set_total_quota_from_backend(value)` to update `total_quota`.
pub const TOTAL_QUOTA: u32 = 4_000;
98
/// Minimum number of seconds a subscription must have existed before it may be
/// dropped. C++ `QotSubscribe.cpp:9` uses 60 seconds and
/// `IsSubTimeEnoughToUnSub` checks `>= SubAtLeastTime - 1`, hence `60 - 1`.
pub const QOT_MIN_UNSUB_ELAPSED_SECS: u64 = 60 - 1;
102
/// **Commit result returned by `subscribe_qot_*`**, used for precise quota
/// accounting (mirrors C++, where quota is counted once per globally unique
/// `m_setSub` key).
///
/// C++ reference: QotSubscribe.cpp:84-111.
/// `bNoSub = m_setSub.count(pairSubKey) == 0`; `UseQuota()` is only called when
/// the key is not yet present in the global set.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SubResult {
    /// First global subscription of this `(security, sub_type)`: quota should
    /// increase by one.
    NewGlobal,
    /// A global subscription already existed and this conn newly joined it:
    /// quota is unchanged.
    AlreadyGlobal,
    /// `(conn_id, security, sub_type)` was already in the set (duplicate
    /// subscribe): quota is unchanged.
    NoChange,
}
117
/// **Commit result returned by `unsubscribe_qot_*`**.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum UnsubResult {
    /// The last conn unsubscribed, so the key was removed from the global set.
    /// **The caller must send a fresh backend CMD 6211 with the new desired
    /// set** (dropping this `(stock_id, sub_type)`).
    LastSubscriber,
    /// Other conns still subscribe to this `(security, sub_type)`; the backend
    /// does not need an unsubscribe.
    StillSubscribed,
    /// `(conn_id, security, sub_type)` was not subscribed before: silent no-op
    /// (the caller decides whether to reject loudly).
    NotSubscribed,
}
130
131impl SubscriptionManager {
132    pub fn new() -> Self {
133        Self {
134            notify_subs: RwLock::new(HashSet::new()),
135            trd_acc_subs: RwLock::new(HashMap::new()),
136            qot_subs: RwLock::new(HashMap::new()),
137            qot_push_regs: RwLock::new(HashMap::new()),
138            qot_sub_sessions: RwLock::new(HashMap::new()),
139            qot_orderbook_detail: RwLock::new(HashMap::new()),
140            qot_broker_detail: RwLock::new(HashMap::new()),
141            total_quota: RwLock::new(TOTAL_QUOTA),
142            qot_sub_times: RwLock::new(HashMap::new()),
143            qot_disconnected_conns: RwLock::new(HashSet::new()),
144            qot_disconnect_sync_generation: AtomicU64::new(0),
145        }
146    }
147
148    // ===== 通知订阅 =====
149
150    pub fn subscribe_notify(&self, conn_id: u64) {
151        self.notify_subs.write().insert(conn_id);
152    }
153
154    pub fn unsubscribe_notify(&self, conn_id: u64) {
155        self.notify_subs.write().remove(&conn_id);
156    }
157
158    pub fn is_subscribed_notify(&self, conn_id: u64) -> bool {
159        self.notify_subs.read().contains(&conn_id)
160    }
161
162    // ===== 交易账户推送 =====
163
164    pub fn subscribe_trd_acc(&self, conn_id: u64, acc_id: u64) {
165        self.trd_acc_subs
166            .write()
167            .entry(acc_id)
168            .or_default()
169            .insert(conn_id);
170    }
171
172    pub fn unsubscribe_trd_acc(&self, conn_id: u64, acc_id: u64) {
173        if let Some(subs) = self.trd_acc_subs.write().get_mut(&acc_id) {
174            subs.remove(&conn_id);
175        }
176    }
177
178    pub fn get_acc_subscribers(&self, acc_id: u64) -> Vec<u64> {
179        self.trd_acc_subs
180            .read()
181            .get(&acc_id)
182            .map(|s| s.iter().copied().collect())
183            .unwrap_or_default()
184    }
185
186    // ===== 行情订阅 (subscribers, F3 split-state) =====
187
188    /// 生成行情订阅 key.
189    pub fn make_qot_key(market: i32, code: &str, sub_type: i32) -> String {
190        format!("{market}_{code}:{sub_type}")
191    }
192
193    #[inline]
194    fn broker_key(sec_key: &QotSecurityKey) -> QotSecurityKey {
195        sec_key.clone()
196    }
197
198    /// **v1.4.106 codex 1131 F1+F5 [P1+P2]**: 订阅行情. 返 [`SubResult`]
199    /// 表示是否新加全局订阅 (caller 据此累 quota).
200    /// 重复订阅 (同 conn_id 同 key) 不影响 set, 不影响 quota.
201    /// **NOTE**: caller 必须先 backend ack-then-commit (F1) — 本方法仅写
202    /// local state. 失败 caller 应调 `unsubscribe_qot_broker` 回滚.
203    pub fn subscribe_qot_broker(
204        &self,
205        conn_id: u64,
206        sec_key: &QotSecurityKey,
207        sub_type: i32,
208    ) -> SubResult {
209        self.subscribe_qot_inner(conn_id, Self::broker_key(sec_key), sub_type)
210    }
211
212    fn subscribe_qot_inner(&self, conn_id: u64, key: QotSecurityKey, sub_type: i32) -> SubResult {
213        self.qot_disconnected_conns.write().remove(&conn_id);
214        let mut qot = self.qot_subs.write();
215        let map_key = (key.clone(), sub_type);
216        let entry = qot.entry(map_key.clone()).or_default();
217        let was_empty_global = entry.is_empty();
218        let inserted = entry.insert(conn_id);
219        if was_empty_global && inserted {
220            self.qot_sub_times.write().insert(map_key, Instant::now());
221        }
222        if !inserted {
223            SubResult::NoChange
224        } else if was_empty_global {
225            SubResult::NewGlobal
226        } else {
227            SubResult::AlreadyGlobal
228        }
229    }
230
231    /// 退订并返结构化结果. caller 据 `LastSubscriber` 决定是否发 backend
232    /// fresh CMD 6211 with new desired set.
233    pub fn unsubscribe_qot_broker(
234        &self,
235        conn_id: u64,
236        sec_key: &QotSecurityKey,
237        sub_type: i32,
238    ) -> UnsubResult {
239        self.unsubscribe_qot_inner(conn_id, Self::broker_key(sec_key), sub_type)
240    }
241
242    fn unsubscribe_qot_inner(
243        &self,
244        conn_id: u64,
245        key: QotSecurityKey,
246        sub_type: i32,
247    ) -> UnsubResult {
248        let map_key = (key.clone(), sub_type);
249        let became_empty;
250        let was_member;
251        {
252            let mut qot = self.qot_subs.write();
253            let entry = match qot.get_mut(&map_key) {
254                Some(e) => e,
255                None => return UnsubResult::NotSubscribed,
256            };
257            was_member = entry.remove(&conn_id);
258            became_empty = entry.is_empty();
259            if became_empty {
260                qot.remove(&map_key);
261            }
262        }
263        if !was_member {
264            return UnsubResult::NotSubscribed;
265        }
266        if became_empty {
267            self.qot_sub_times.write().remove(&map_key);
268        }
269        // 清 session/detail per-conn entry (对齐 C++ QotSubscribe::UnSub line 251-292).
270        {
271            let mut sess = self.qot_sub_sessions.write();
272            if let Some(e) = sess.get_mut(&map_key) {
273                e.remove(&conn_id);
274                if e.is_empty() {
275                    sess.remove(&map_key);
276                }
277            }
278        }
279        if sub_type == sub_type_orderbook() {
280            let mut ob = self.qot_orderbook_detail.write();
281            if let Some(e) = ob.get_mut(&key) {
282                e.remove(&conn_id);
283                if e.is_empty() {
284                    ob.remove(&key);
285                }
286            }
287        }
288        if sub_type == sub_type_broker() {
289            let mut br = self.qot_broker_detail.write();
290            if let Some(e) = br.get_mut(&key) {
291                e.remove(&conn_id);
292                if e.is_empty() {
293                    br.remove(&key);
294                }
295            }
296        }
297        if became_empty {
298            UnsubResult::LastSubscriber
299        } else {
300            UnsubResult::StillSubscribed
301        }
302    }
303
304    /// 是否 (conn_id, key, sub_type) 已订阅.
305    pub fn is_qot_subscribed_broker(
306        &self,
307        conn_id: u64,
308        sec_key: &QotSecurityKey,
309        sub_type: i32,
310    ) -> bool {
311        self.qot_subs
312            .read()
313            .get(&(Self::broker_key(sec_key), sub_type))
314            .is_some_and(|subs| subs.contains(&conn_id))
315    }
316
317    /// v1.4.106 codex 1131 F3 [P1]: 全局 (ignore conn) 是否有订阅 — RegQotPush
318    /// 的 precondition. 对齐 C++ `QotSubscribe::IsSub(stockID, subType)`.
319    pub fn is_globally_subscribed_broker(&self, sec_key: &QotSecurityKey, sub_type: i32) -> bool {
320        self.qot_subs
321            .read()
322            .get(&(Self::broker_key(sec_key), sub_type))
323            .is_some_and(|subs| !subs.is_empty())
324    }
325
326    /// min-unsub window for broker-aware subscription keys.
327    pub fn qot_min_unsub_elapsed_broker(&self, sec_key: &QotSecurityKey, sub_type: i32) -> bool {
328        self.qot_sub_times
329            .read()
330            .get(&(Self::broker_key(sec_key), sub_type))
331            .map(|instant| instant.elapsed() >= Duration::from_secs(QOT_MIN_UNSUB_ELAPSED_SECS))
332            .unwrap_or(true)
333    }
334
335    /// remaining min-unsub window for broker-aware keys.
336    pub fn qot_min_unsub_remaining_secs_broker(
337        &self,
338        sec_key: &QotSecurityKey,
339        sub_type: i32,
340    ) -> u64 {
341        self.qot_sub_times
342            .read()
343            .get(&(Self::broker_key(sec_key), sub_type))
344            .map(|instant| QOT_MIN_UNSUB_ELAPSED_SECS.saturating_sub(instant.elapsed().as_secs()))
345            .unwrap_or(0)
346    }
347
348    /// 断线延迟清理后需要 gateway 同步 CMD6211 的 generation。
349    pub fn qot_disconnect_sync_generation(&self) -> u64 {
350        self.qot_disconnect_sync_generation.load(Ordering::SeqCst)
351    }
352
353    fn bump_qot_disconnect_sync_generation(&self) {
354        self.qot_disconnect_sync_generation
355            .fetch_add(1, Ordering::SeqCst);
356    }
357
358    fn conn_has_qot_subs(&self, conn_id: u64) -> bool {
359        self.qot_subs
360            .read()
361            .values()
362            .any(|subs| subs.contains(&conn_id))
363    }
364
365    #[doc(hidden)]
366    pub fn backdate_qot_sub_time_broker_for_test(
367        &self,
368        sec_key: &QotSecurityKey,
369        sub_type: i32,
370        elapsed: Duration,
371    ) {
372        let map_key = (Self::broker_key(sec_key), sub_type);
373        let instant = Instant::now()
374            .checked_sub(elapsed)
375            .unwrap_or_else(Instant::now);
376        self.qot_sub_times.write().insert(map_key, instant);
377    }
378
379    /// v1.4.106 codex 1131 F2: clear all qot subs for a single conn_id.
380    /// 返 (sec_key, sub_type) 列表 of "本 conn 退订后变成全局空的" — caller
381    /// 据此构 backend new desired set. 返的 sec_key 是 cache_key display string
382    /// (`"market_code"` or `"market_code@b{id}"`).
383    pub fn unsubscribe_all_qot_collect_global_empty(&self, conn_id: u64) -> Vec<(String, i32)> {
384        let mut became_empty: Vec<(String, i32)> = Vec::new();
385        let keys_to_check: Vec<(QotSecurityKey, i32)> = self
386            .qot_subs
387            .read()
388            .iter()
389            .filter(|(_, set)| set.contains(&conn_id))
390            .map(|(k, _)| k.clone())
391            .collect();
392        {
393            let mut qot = self.qot_subs.write();
394            for k in keys_to_check {
395                if let Some(set) = qot.get_mut(&k) {
396                    set.remove(&conn_id);
397                    if set.is_empty() {
398                        became_empty.push((k.0.cache_key(), k.1));
399                        let removed_key = k.clone();
400                        qot.remove(&k);
401                        self.qot_sub_times.write().remove(&removed_key);
402                    }
403                }
404            }
405        }
406        // session/detail/push_regs cleanup for this conn.
407        {
408            let mut sess = self.qot_sub_sessions.write();
409            sess.retain(|_, m| {
410                m.remove(&conn_id);
411                !m.is_empty()
412            });
413        }
414        {
415            let mut ob = self.qot_orderbook_detail.write();
416            ob.retain(|_, m| {
417                m.remove(&conn_id);
418                !m.is_empty()
419            });
420        }
421        {
422            let mut br = self.qot_broker_detail.write();
423            br.retain(|_, m| {
424                m.remove(&conn_id);
425                !m.is_empty()
426            });
427        }
428        {
429            let mut pr = self.qot_push_regs.write();
430            pr.retain(|_, set| {
431                set.remove(&conn_id);
432                !set.is_empty()
433            });
434        }
435        became_empty
436    }
437
438    /// 清理已断开且已满足 C++ 最短退订窗口的 QOT conn。
439    ///
440    /// 返回本次清理后 global desired set 变空的 `(sec_key, sub_type)` 列表
441    /// (sec_key 是 cache_key display string: `"market_code"` or
442    /// `"market_code@b{id}"`).
443    /// 若列表非空,会 bump `qot_disconnect_sync_generation`,由 gateway 后台
444    /// 任务负责把新的全局 desired set 发到 backend。
445    pub fn cleanup_due_disconnected_qot(&self) -> Vec<(String, i32)> {
446        let disconnected: Vec<u64> = self.qot_disconnected_conns.read().iter().copied().collect();
447        if disconnected.is_empty() {
448            return Vec::new();
449        }
450
451        let mut to_remove: Vec<(u64, QotSecurityKey, i32)> = Vec::new();
452        {
453            let qot = self.qot_subs.read();
454            let sub_times = self.qot_sub_times.read();
455            for conn_id in &disconnected {
456                for ((key, sub_type), subs) in qot.iter() {
457                    if !subs.contains(conn_id) {
458                        continue;
459                    }
460                    let elapsed_ok = sub_times
461                        .get(&(key.clone(), *sub_type))
462                        .map(|instant| {
463                            instant.elapsed() >= Duration::from_secs(QOT_MIN_UNSUB_ELAPSED_SECS)
464                        })
465                        .unwrap_or(true);
466                    if elapsed_ok {
467                        to_remove.push((*conn_id, key.clone(), *sub_type));
468                    }
469                }
470            }
471        }
472
473        let mut became_empty = Vec::new();
474        for (conn_id, key, sub_type) in to_remove {
475            let display_key = key.cache_key();
476            if matches!(
477                self.unsubscribe_qot_inner(conn_id, key, sub_type),
478                UnsubResult::LastSubscriber
479            ) {
480                became_empty.push((display_key, sub_type));
481            }
482        }
483
484        {
485            let mut disconnected = self.qot_disconnected_conns.write();
486            disconnected.retain(|conn_id| self.conn_has_qot_subs(*conn_id));
487        }
488
489        if !became_empty.is_empty() {
490            self.bump_qot_disconnect_sync_generation();
491        }
492        became_empty
493    }
494
495    /// **v1.4.106 codex 0631 F1 [P1]**: ack-then-commit `unsub_all` 的"干跑"半段.
496    ///
497    /// 计算: **若**本 conn 退订全部, 哪些 `(sec_key, sub_type)` 在 global
498    /// desired set 中**变空** (= backend 该真退). **不修 state, 不动 detail
499    /// flag, 不动 push_regs**. 用在 ack-then-commit pipeline:
500    ///
501    /// `dry_run -> submit_global_desired_set (backend ack) -> commit (清 state)`
502    ///
503    /// backend reject → caller 不调 `commit`, state 保留 → 客户端可重试幂等.
504    /// 老 `unsubscribe_all_qot_collect_global_empty` 是先清后算 — 失败时
505    /// state 已 mutate, 不能 rollback (split-brain 风险). 本 helper 替代.
506    pub fn unsubscribe_all_qot_dry_run(&self, conn_id: u64) -> Vec<(String, i32)> {
507        let mut became_empty: Vec<(String, i32)> = Vec::new();
508        let qot = self.qot_subs.read();
509        for ((k, sub_type), set) in qot.iter() {
510            if !set.contains(&conn_id) {
511                continue;
512            }
513            // 退订本 conn 后该 set 是否变空? set.len() == 1 + contains(&conn_id) → 退后空.
514            if set.len() == 1 {
515                became_empty.push((k.cache_key(), *sub_type));
516            }
517        }
518        became_empty
519    }
520
521    /// **v1.4.106 codex 0631 F1 [P1]**: ack-then-commit `unsub_all` 的"提交"半段.
522    /// 等价于老 `unsubscribe_all_qot_collect_global_empty` (语义不变, 仅在
523    /// backend ack OK 后才调). 同时清 session / detail / push_regs.
524    pub fn unsubscribe_all_qot_commit(&self, conn_id: u64) -> Vec<(String, i32)> {
525        self.unsubscribe_all_qot_collect_global_empty(conn_id)
526    }
527
528    /// 获取订阅了指定行情的连接列表 (subscribers, **不**用作 push 路由).
529    /// 用于 `apply_unsubscribe_delta` 判断 broker-aware key 上是否还有其他
530    /// conn 订阅 (last-subscriber gate for desired-set remove).
531    pub fn get_qot_subscribers_broker(&self, sec_key: &QotSecurityKey, sub_type: i32) -> Vec<u64> {
532        self.qot_subs
533            .read()
534            .get(&(Self::broker_key(sec_key), sub_type))
535            .map(|s| s.iter().copied().collect())
536            .unwrap_or_default()
537    }
538
539    /// **v1.4.110 codex audit Round3 P2 #21**: 给定 `stock_id`, 判断该 stock 是否
540    /// **全局**已无任何 conn 订阅 (跨所有 broker_id + 所有 sub_type).
541    ///
542    /// 用途: `Qot_Sub` 退订路径在 commit 之后判断 crypto symbol 是否真正全空,
543    /// 决定是否调 `CryptoExchangeCache::clear_stock(stock_id)` 清 stale entry —
544    /// 因为 `crypto_exchange_cache` 按 `stock_id` keyed (broker 无关), 只有该
545    /// stock 全 broker 全 sub_type 都退掉才能安全清.
546    ///
547    /// 扫 `qot_subs` 找任意 key 满足 `QotStockKey.stock_id == stock_id`;
548    /// broker 1007 退订但 1008 仍订阅时返 `false` (不能清).
549    ///
550    /// 返 `true` ⟺ 该 stock_id 在 `qot_subs` 中无任何带 subscriber 的 entry.
551    pub fn crypto_stock_globally_unsubscribed(&self, stock_id: u64) -> bool {
552        let qot = self.qot_subs.read();
553        !qot.iter()
554            .any(|((key, _sub_type), subs)| !subs.is_empty() && key.stock_key.stock_id == stock_id)
555    }
556
557    /// v1.4.110 R6-8: `(stock_id, broker_id)`-级版 `crypto_stock_globally_unsubscribed`.
558    ///
559    /// 用途: 部分 broker 退订时, 判断某具体 `(stock_id, broker_id)` 是否已无任何
560    /// conn 订阅 → 决定是否调 `CryptoExchangeCache::clear_stock_broker` 清该
561    /// broker 的 stale `by_broker` entry (整 stock 仍有别的 broker 在订时
562    /// `clear_stock` 不适用).
563    ///
564    /// 按 `stock_key.stock_id` + `stock_key.broker_id` 双字段匹配.
565    ///
566    /// 返 `true` ⟺ 该 `(stock_id, broker_id)` 在 `qot_subs` 无任何带 subscriber 的 entry.
567    pub fn crypto_stock_broker_globally_unsubscribed(&self, stock_id: u64, broker_id: u32) -> bool {
568        let target_broker = std::num::NonZeroU32::new(broker_id);
569        let qot = self.qot_subs.read();
570        !qot.iter().any(|((key, _sub_type), subs)| {
571            !subs.is_empty()
572                && key.stock_key.stock_id == stock_id
573                && key.stock_key.broker_id == target_broker
574        })
575    }
576
577    // ===== 行情 push 注册 (qot_push_regs, F3 独立) =====
578
579    /// **v1.4.106 codex 1131 F3 [P1]**: 注册接收 push (对齐 C++ `RegPush`).
580    /// 仅写本 map, **不动 `qot_subs`**. caller (RegQotPushHandler) 必须在
581    /// 调本方法前确认已订阅 (`is_globally_subscribed_broker`).
582    pub fn register_push_broker(
583        &self,
584        conn_id: u64,
585        sec_key: &QotSecurityKey,
586        sub_type: i32,
587        rehab_type: i32,
588    ) {
589        self.register_push_inner(conn_id, Self::broker_key(sec_key), sub_type, rehab_type)
590    }
591
592    fn register_push_inner(
593        &self,
594        conn_id: u64,
595        key: QotSecurityKey,
596        sub_type: i32,
597        rehab_type: i32,
598    ) {
599        let effective_rehab = if is_kl_sub_type(sub_type) {
600            rehab_type
601        } else {
602            0
603        };
604        self.qot_push_regs
605            .write()
606            .entry((key, sub_type, effective_rehab))
607            .or_default()
608            .insert(conn_id);
609    }
610
611    /// **v1.4.106 codex 1131 F3 [P1]**: 取消 push 注册 — 不删 `qot_subs`.
612    pub fn unregister_push_broker(
613        &self,
614        conn_id: u64,
615        sec_key: &QotSecurityKey,
616        sub_type: i32,
617        rehab_type: i32,
618    ) {
619        self.unregister_push_inner(conn_id, Self::broker_key(sec_key), sub_type, rehab_type)
620    }
621
622    fn unregister_push_inner(
623        &self,
624        conn_id: u64,
625        key: QotSecurityKey,
626        sub_type: i32,
627        rehab_type: i32,
628    ) {
629        let effective_rehab = if is_kl_sub_type(sub_type) {
630            rehab_type
631        } else {
632            0
633        };
634        let map_key = (key, sub_type, effective_rehab);
635        let mut pr = self.qot_push_regs.write();
636        if let Some(set) = pr.get_mut(&map_key) {
637            set.remove(&conn_id);
638            if set.is_empty() {
639                pr.remove(&map_key);
640            }
641        }
642    }
643
644    /// **v1.4.106 codex 1131 F4 [P1]**: push delivery filter. 对齐 C++
645    /// `QotSubscribe::GetPushConn`.
646    pub fn get_qot_push_subscribers_broker(
647        &self,
648        sec_key: &QotSecurityKey,
649        sub_type: i32,
650        rehab_type: i32,
651    ) -> Vec<u64> {
652        self.get_qot_push_subscribers_inner(Self::broker_key(sec_key), sub_type, rehab_type)
653    }
654
655    /// Lookup by internal cache-key display
656    /// string carried by push events (`"market_code"` or `"market_code@b{id}"`).
657    ///
658    /// `PushEvent::QuotePush` does not carry `stock_id`, so dispatch cannot
659    /// reconstruct a full `QotSecurityKey`. Instead, match the already stored
660    /// subscription keys by their cache-key display string.
661    pub fn get_qot_push_subscribers_by_cache_key(
662        &self,
663        cache_key: &str,
664        sub_type: i32,
665        rehab_type: i32,
666    ) -> Vec<u64> {
667        if QotSecurityKey::parse_cache_key(cache_key).is_none() {
668            return Vec::new();
669        }
670
671        let effective_rehab = if is_kl_sub_type(sub_type) {
672            rehab_type
673        } else {
674            0
675        };
676        let regs = self.qot_push_regs.read();
677        let mut out = Vec::new();
678        for ((key, stored_sub_type, stored_rehab), subscribers) in regs.iter() {
679            if *stored_sub_type != sub_type
680                || *stored_rehab != effective_rehab
681                || key.cache_key() != cache_key
682            {
683                continue;
684            }
685            out.extend(subscribers.iter().copied());
686        }
687
688        out.sort_unstable();
689        out.dedup();
690        out
691    }
692
693    fn get_qot_push_subscribers_inner(
694        &self,
695        key: QotSecurityKey,
696        sub_type: i32,
697        rehab_type: i32,
698    ) -> Vec<u64> {
699        let effective_rehab = if is_kl_sub_type(sub_type) {
700            rehab_type
701        } else {
702            0
703        };
704        self.qot_push_regs
705            .read()
706            .get(&(key, sub_type, effective_rehab))
707            .map(|s| s.iter().copied().collect())
708            .unwrap_or_default()
709    }
710
711    /// **v1.4.106 codex 1131 F3 [P1]**: 是否注册过 push (任意 rehab).
712    pub fn is_push_registered_any_rehab_broker(
713        &self,
714        conn_id: u64,
715        sec_key: &QotSecurityKey,
716        sub_type: i32,
717    ) -> bool {
718        let broker_key = Self::broker_key(sec_key);
719        let pr = self.qot_push_regs.read();
720        pr.iter().any(|((k, st, _rehab), set)| {
721            k == &broker_key && *st == sub_type && set.contains(&conn_id)
722        })
723    }
724
725    // ===== Session / Detail (per-(security, sub_type) global aggregator) =====
726
727    pub fn set_conn_session_broker(
728        &self,
729        conn_id: u64,
730        sec_key: &QotSecurityKey,
731        sub_type: i32,
732        session: i32,
733    ) {
734        self.qot_sub_sessions
735            .write()
736            .entry((Self::broker_key(sec_key), sub_type))
737            .or_default()
738            .insert(conn_id, session);
739    }
740
741    pub fn get_global_session_broker(&self, sec_key: &QotSecurityKey, sub_type: i32) -> i32 {
742        self.qot_sub_sessions
743            .read()
744            .get(&(Self::broker_key(sec_key), sub_type))
745            .map(|m| m.values().copied().max().unwrap_or(1))
746            .unwrap_or(1)
747    }
748
749    /// 获取单连接订阅 session(没有显式记录时按 C++ 默认 RTH)。
750    pub fn get_conn_session_broker(
751        &self,
752        conn_id: u64,
753        sec_key: &QotSecurityKey,
754        sub_type: i32,
755    ) -> i32 {
756        self.qot_sub_sessions
757            .read()
758            .get(&(Self::broker_key(sec_key), sub_type))
759            .and_then(|m| m.get(&conn_id).copied())
760            .unwrap_or(1)
761    }
762
763    pub fn set_conn_orderbook_detail_broker(
764        &self,
765        conn_id: u64,
766        sec_key: &QotSecurityKey,
767        detail: bool,
768    ) {
769        self.qot_orderbook_detail
770            .write()
771            .entry(Self::broker_key(sec_key))
772            .or_default()
773            .insert(conn_id, detail);
774    }
775
776    pub fn is_global_orderbook_detail_broker(&self, sec_key: &QotSecurityKey) -> bool {
777        self.qot_orderbook_detail
778            .read()
779            .get(&Self::broker_key(sec_key))
780            .map(|m| m.values().any(|&d| d))
781            .unwrap_or(false)
782    }
783
784    pub fn set_conn_broker_detail_broker(
785        &self,
786        conn_id: u64,
787        sec_key: &QotSecurityKey,
788        detail: bool,
789    ) {
790        self.qot_broker_detail
791            .write()
792            .entry(Self::broker_key(sec_key))
793            .or_default()
794            .insert(conn_id, detail);
795    }
796
797    pub fn is_global_broker_detail_broker(&self, sec_key: &QotSecurityKey) -> bool {
798        self.qot_broker_detail
799            .read()
800            .get(&Self::broker_key(sec_key))
801            .map(|m| m.values().any(|&d| d))
802            .unwrap_or(false)
803    }
804
805    // ===== Quota =====
806
807    /// per-conn used quota — count 该 conn 的 (security_key, sub_type) 对数.
808    pub fn get_conn_used_quota(&self, conn_id: u64) -> u32 {
809        self.qot_subs
810            .read()
811            .iter()
812            .filter(|(_, set)| set.contains(&conn_id))
813            .count() as u32
814    }
815
816    /// **全局 used quota** — 对齐 C++ `m_nAllUsedQuota` (累加全局唯一
817    /// SubKey count). 不重复计 conn — 多 conn 订同 (stock, sub_type) 全局算 1.
818    pub fn get_total_used_quota(&self) -> u32 {
819        self.qot_subs.read().len() as u32
820    }
821
822    /// 获取 total quota (动态, backend 下发后更新).
823    pub fn get_total_quota(&self) -> u32 {
824        *self.total_quota.read()
825    }
826
827    /// **v1.4.106 codex 1131 F5 [P2]**: backend 下发的 quota 真值 setter.
828    pub fn set_total_quota_from_backend(&self, value: u32) {
829        let mut q = self.total_quota.write();
830        if *q != value {
831            tracing::info!(
832                old = *q,
833                new = value,
834                "v1.4.106 codex 1131 F5: total_quota updated from backend"
835            );
836            *q = value;
837        }
838    }
839
840    pub fn get_remain_quota(&self) -> u32 {
841        let total = self.get_total_quota();
842        let used = self.get_total_used_quota();
843        total.saturating_sub(used)
844    }
845
846    // ===== Conn-level views =====
847
848    pub fn get_conn_qot_subs(&self, conn_id: u64) -> HashMap<i32, Vec<String>> {
849        let qot = self.qot_subs.read();
850        let mut result: HashMap<i32, Vec<String>> = HashMap::new();
851        for ((key, sub_type), conn_ids) in qot.iter() {
852            if conn_ids.contains(&conn_id) {
853                result.entry(*sub_type).or_default().push(key.cache_key());
854            }
855        }
856        result
857    }
858
859    pub fn get_all_qot_conn_ids(&self) -> HashSet<u64> {
860        let qot = self.qot_subs.read();
861        let mut ids = HashSet::new();
862        for conn_ids in qot.values() {
863            ids.extend(conn_ids);
864        }
865        ids
866    }
867
868    /// v1.4.106 codex 0932 F5 [P2]: 获取所有连接 ID(有交易账户订阅的).
869    pub fn get_all_trd_conn_ids(&self) -> HashSet<u64> {
870        let trd = self.trd_acc_subs.read();
871        let mut ids = HashSet::new();
872        for conn_ids in trd.values() {
873            ids.extend(conn_ids);
874        }
875        ids
876    }
877
878    /// v1.4.106 codex 1131 F2 [P1]: 计算全局 desired set.
879    /// 返 (sec_key_display, sub_type), sec_key_display 是 cache_key 形态
880    /// (`"market_code"` or `"market_code@b{id}"`).
881    pub fn compute_global_desired_set(&self) -> Vec<(String, i32)> {
882        let qot = self.qot_subs.read();
883        let mut out = Vec::with_capacity(qot.len());
884        for (k, sub_type) in qot.keys() {
885            out.push((k.cache_key(), *sub_type));
886        }
887        out
888    }
889    /// **v1.4.106 codex 0631 F2 [P2]**: shared global desired-set keys helper.
890    ///
891    /// 返当前全集 `Vec<(sec_key, sub_type)>` (与 `compute_global_desired_set`
892    /// 等价, 命名对齐 codex 0631 audit 习惯). 共享给 SubHandler / unsub /
893    /// unsub_all / resubscribe_quotes 三路径, 不再每条 sub 单独发 delta —
894    /// 对齐 CMD 6211 set-state 协议 (per-market full set replaces).
895    ///
896    /// caller 应做的事:
897    /// 1. 调本 fn 拿当前全集
898    /// 2. 应用 delta (sub: 加; unsub: 减; unsub_all: 移除本 conn 独占的)
899    /// 3. resolve sec_key → stock_id (cache); cache miss → loud warn (本 fn 不
900    ///    替 caller 决定 — 由 caller 选择 fail-loud 还是 partial-submit)
901    /// 4. 调 `submit_global_desired_set(NEW set)` 让 backend ack
902    ///
903    /// 命名: keys = `(sec_key, sub_type)` 二元组, 不是 `(stock_id, market)` —
904    /// stock_id resolve 是 caller 责任 (因为依赖 StaticDataCache).
905    #[inline]
906    pub fn qot_global_desired_keys(&self) -> Vec<(String, i32)> {
907        self.compute_global_desired_set()
908    }
909
910    // ===== 连接断开清理 =====
911
912    pub fn on_disconnect(&self, conn_id: u64) -> Vec<(String, i32)> {
913        self.notify_subs.write().remove(&conn_id);
914
915        {
916            let mut trd = self.trd_acc_subs.write();
917            for subs in trd.values_mut() {
918                subs.remove(&conn_id);
919            }
920        }
921
922        {
923            let mut pr = self.qot_push_regs.write();
924            pr.retain(|_, set| {
925                set.remove(&conn_id);
926                !set.is_empty()
927            });
928        }
929
930        if self.conn_has_qot_subs(conn_id) {
931            self.qot_disconnected_conns.write().insert(conn_id);
932        }
933        self.cleanup_due_disconnected_qot()
934    }
935}
936
937impl Default for SubscriptionManager {
938    fn default() -> Self {
939        Self::new()
940    }
941}
942
/// Numeric sub type used for OrderBook subscriptions (2).
#[inline]
fn sub_type_orderbook() -> i32 {
    const ORDER_BOOK: i32 = 2;
    ORDER_BOOK
}
947
/// Numeric sub type used for Broker subscriptions (14).
#[inline]
fn sub_type_broker() -> i32 {
    const BROKER: i32 = 14;
    BROKER
}
952
/// Whether `sub_type` is a KL sub type (6 through 13, or 15 through 17).
/// Mirrors C++ `IsKLSubType`.
#[inline]
fn is_kl_sub_type(sub_type: i32) -> bool {
    (6..=13).contains(&sub_type) || (15..=17).contains(&sub_type)
}
958
959#[cfg(test)]
960mod tests;