Skip to main content

futu_cache/
crypto_exchange_cache.rs

1//! v1.4.110 codex QOT Phase 4 Slice 7: Crypto LV2 多交易所缓存.
2//!
3//! 对齐 C++ `APIServer/Business/Quote/QotRealTimeData.cpp` 三个 map +
4//! `INNData_Qot_CryptoExchange::SetLv2RelatedExchange` 缓存写入:
5//!
6//! - **by_broker** (`m_mapCryptoStockBrokerExchange`): `(stock_id, broker_id) →
7//!   Vec<CryptoExchangeInfo>` — 18012 response 写入, 表示该 broker 看该 stock
8//!   有哪些 LV2 关联交易所. 通过 `GetLv2RelatedExchangeList(stKey)` 查询.
9//! - **lv2_exchange_cache** (`m_mapLv2ExchangeCache`): `(stock_id, lv2_prob) →
10//!   CachedOrderBook` — 单 exchange 的原始 LV2 摆盘缓存. 收到 push 时按
11//!   `ExchangeCacheKey` 索引写入.
12//! - **lv2_prob_to_brokers** (`m_mapLv2ProbToBrokers`): `(stock_id, lv2_prob)
13//!   → HashSet<broker_id>` — 反向索引. 收到 exchange-level push 时, O(1) 找出
14//!   所有受影响 broker, 然后 per-broker rebuild 调 `merge_multiple_order_book_caches`.
15//!
16//! ## 流程图
17//!
18//! ```text
19//! GetOrderBookHandler: crypto LV2 第 1 次订阅
20//!   → fetch_and_cache_exchanges (CMD18012, PT 过滤)
21//!     → cache.set_lv2_related_exchange((stock_id, broker_id), Vec<info>)
22//!       → 内部更新 by_broker + 重建 lv2_prob_to_brokers
23//!     → resubscribe with prob2_v2 (level=60) for each lv2_prob
24//!
25//! Backend push exchange-level LV2 orderbook:
26//!   → push_parser 识别 SBIT_US_LV2_ORDER (17) + sec_info.is_crypto + lv2_type
27//!     → cache.set_lv2_exchange_cache((stock_id, lv2_prob), s2c)
28//!     → cache.get_brokers_affected_by_lv2_prob((stock_id, lv2_prob)) → HashSet
29//!       For each broker in HashSet:
30//!         → cache.exchanges_for_broker((stock_id, broker_id)) → Vec
31//!         → 收集所有 exchange caches → merge_multiple_order_book_caches(40)
32//!         → qot_cache.update_order_book_broker(StockKey{stock_id, broker_id})
33//!         → cold-cache wait notify (Slice 6c)
34//! ```
35//!
36//! ## C++ 参照
37//!
38//! - `QotRealTimeData.cpp:918-1022` `ParseCryptoToExchangeCache` /
39//!   `RebuildCryptoBrokerCache` / `MergeCryptoExchangesToBrokerCache`
40//! - `QotRealTimeData.cpp:1024-1063` `UpdateLv2ProbToBrokersIndex`
41//! - `NNBiz_Qot_CryptoExchange.cpp:131-141` `SetLv2RelatedExchange` 写完后
42//!   触发 `IOMEvent::NotifyEvent(NN_OMEvent_Qot_CryptoExchange_IndexUpdate)` +
43//!   `ReSubCryptoOrderBook`.
44
45use std::collections::{HashMap, HashSet};
46use std::sync::Arc;
47
48use dashmap::DashMap;
49
50use crate::qot_cache::CachedOrderBook;
51
52/// `(stock_id, lv2_prob)` — exchange-level cache key.
53///
54/// 对齐 C++ `ExchangeCacheKey(nStockID, nLv2Prob)` (QotRealTimeData.cpp).
55pub type ExchangeCacheKey = (u64, i32);
56
57/// `(stock_id, broker_id)` — broker-level exchange list key.
58pub type BrokerExchangeKey = (u64, u32);
59
60/// Daemon-side snapshot of one entry in CMD18012 `ExchangeInfo`.
61///
62/// 对齐 C++ `Ndt_Qot_CryptoExchangeInfo` (NNBiz_Qot_CryptoExchange.cpp:55-58).
63///
64/// 在 cache crate 单独定义 (而非引用 futu-backend) 是为了避免 futu-cache →
65/// futu-backend 反向依赖 (C++ 也分 NNData / NNBiz / APIServer 三层, 类似关注点分离).
66#[derive(Debug, Clone, PartialEq)]
67pub struct CryptoExchangeInfo {
68    /// 订阅位 prob — `USLV2OrderSubProb.us_lv2_order_type`.
69    pub lv2_prob: i32,
70    /// 交易所内部 name (e.g. "BINANCE", "OKX", "PT").
71    pub exchange_name: String,
72    /// 上市交易所标识 (FTAPI listed_exchange 字段).
73    pub listed_exchange: String,
74    /// 是否默认选中 (UI 显示用, daemon 一般不消费).
75    pub is_pick: bool,
76}
77
78/// Crypto LV2 多交易所缓存. 包装 3 个 DashMap + 反向索引重建逻辑.
79///
80/// **线程安全**: 内部全用 DashMap, 任意线程可同时 read/write.
81/// **lifecycle**: 整个 daemon lifetime, 由 `GatewayBridge::crypto_exchange_cache`
82/// 持有 Arc.
83#[derive(Debug, Default)]
84pub struct CryptoExchangeCache {
85    /// `(stock_id, broker_id) → exchange list (from CMD18012 response)`.
86    pub by_broker: DashMap<BrokerExchangeKey, Vec<CryptoExchangeInfo>>,
87
88    /// `(stock_id, lv2_prob) → orderbook (exchange-level, raw from push)`.
89    pub lv2_exchange_cache: DashMap<ExchangeCacheKey, CachedOrderBook>,
90
91    /// `(stock_id, lv2_prob) → set<broker_id>` — 反向索引,
92    /// O(1) 查询受 exchange-level push 影响的 broker 集.
93    pub lv2_prob_to_brokers: DashMap<ExchangeCacheKey, HashSet<u32>>,
94
95    /// v1.4.110 R6-5: 串行化所有"写 `by_broker` + 重建反向索引"的写路径
96    /// (`set_lv2_related_exchange` / `clear_stock` / `clear_stock_broker`).
97    ///
98    /// 反向索引重建要遍历 `by_broker` 全表算 `new_index`; 两个并发 writer 各自
99    /// 遍历会拿到不一致的 DashMap 快照, 一个 writer 的 `retain` 可能误删另一个
100    /// writer 刚 `insert` 的 prob entry → 该 prob 反向索引永久丢失 → 之后 crypto
101    /// LV2 push 对该 broker silent miss (坑 #45). R4-2 只修了"新 prob entry 不
102    /// 经历空态", 没修这条跨线程 stale-retain (坑 #44: 同一反向索引并发 invariant
103    /// 第 2 次复发 → 根治). 这把锁让 "insert by_broker + 重建索引" 成为真临界区.
104    /// writer 都是低频 first-sub / unsub 路径, 锁零实际性能影响; reader
105    /// (`get_brokers_affected_by_lv2_prob` 等) 不取锁, 仍走 DashMap lock-free 读.
106    rebuild_lock: parking_lot::Mutex<()>,
107}
108
109impl CryptoExchangeCache {
110    pub fn new() -> Arc<Self> {
111        Arc::new(Self::default())
112    }
113
114    /// 设置 `(stock_id, broker_id)` 对应的 exchange 列表 (CMD18012 response).
115    ///
116    /// 副作用:
117    /// 1. 更新 `by_broker[(stock_id, broker_id)] = exchanges`
118    /// 2. 重建 `lv2_prob_to_brokers[(stock_id, *)]` 该 stock 的反向索引
119    ///    (先清旧 entries 该 stock 的所有 prob → 再按本次 + 其他 broker 重建)
120    ///
121    /// 对齐 C++ `INNData_Qot_CryptoExchange::SetLv2RelatedExchange(stKey, vExchanges)` +
122    /// `UpdateLv2ProbToBrokersIndex(nStockID)`.
123    pub fn set_lv2_related_exchange(
124        &self,
125        stock_id: u64,
126        broker_id: u32,
127        exchanges: Vec<CryptoExchangeInfo>,
128    ) {
129        // v1.4.110 R6-5: 整个 "写 by_broker + 重建反向索引" 在 rebuild_lock 内
130        // 串行化 (见 `rebuild_lock` 字段注释).
131        let _rebuild_guard = self.rebuild_lock.lock();
132        self.by_broker.insert((stock_id, broker_id), exchanges);
133        self.rebuild_reverse_index_for_stock(stock_id);
134        tracing::debug!(
135            stock_id,
136            broker_id,
137            "v1.4.110 codex Phase 4 Slice 7: set_lv2_related_exchange + rebuild reverse index"
138        );
139    }
140
141    /// 重建某 stock 的 `lv2_prob_to_brokers` 反向索引 (遍历 `by_broker` 当前
142    /// 全部 `(stock_id, *)` entry 重算 prob → brokers 映射).
143    ///
144    /// **caller 必须持 `rebuild_lock`** —— 本 fn 遍历 `by_broker` 算 `new_index`,
145    /// 多个并发 caller 不串行化会拿到不一致快照, 一个 caller 的 (c) `retain`
146    /// 可能误删另一个 caller 刚 (b) `insert` 的 prob (见 R6-5 / 坑 #44).
147    ///
148    /// v1.4.110 codex audit Round4 R4-2: 不用 "retain 清旧 → for 重建" 两步法 ——
149    /// 两步之间有 empty window: 并发 crypto LV2 push 在此间隙调
150    /// `get_brokers_affected_by_lv2_prob` 会看到空索引 → affected_brokers 空 →
151    /// push 被静默丢 (pitfall #45). 改 overwrite-then-remove:
152    ///   (a) 本地算出该 stock 的新 prob → brokers 映射;
153    ///   (b) 对每个新 prob `insert` 原子覆盖 —— 已存在的 prob entry 从不经历
154    ///       空态, reader 永远看到 old-set / new-set 二选一;
155    ///   (c) 再 `retain` 删掉本 stock 不在新映射里的旧 prob (这些 prob 已无
156    ///       broker 订阅, push 本就该丢).
157    fn rebuild_reverse_index_for_stock(&self, stock_id: u64) {
158        let mut new_index: HashMap<i32, HashSet<u32>> = HashMap::new();
159        for entry in self.by_broker.iter() {
160            let (sid, bid) = entry.key();
161            if *sid != stock_id {
162                continue;
163            }
164            for info in entry.value() {
165                new_index.entry(info.lv2_prob).or_default().insert(*bid);
166            }
167        }
168        // (b) 原子覆盖每个新 prob.
169        for (prob, brokers) in &new_index {
170            let cache_key: ExchangeCacheKey = (stock_id, *prob);
171            self.lv2_prob_to_brokers.insert(cache_key, brokers.clone());
172        }
173        // (c) 删掉本 stock 不在新映射里的旧 prob.
174        self.lv2_prob_to_brokers
175            .retain(|key, _| key.0 != stock_id || new_index.contains_key(&key.1));
176    }
177
178    /// 取某 broker 看某 stock 的 exchange 列表.
179    pub fn exchanges_for_broker(
180        &self,
181        stock_id: u64,
182        broker_id: u32,
183    ) -> Option<Vec<CryptoExchangeInfo>> {
184        self.by_broker
185            .get(&(stock_id, broker_id))
186            .map(|r| r.clone())
187    }
188
189    /// 写 exchange-level cache (单个 exchange 的原始 LV2 摆盘).
190    /// 对齐 C++ `m_mapLv2ExchangeCache[cacheKey] = pbOrderBook` (QotRealTimeData.cpp:938).
191    pub fn set_lv2_exchange_cache(&self, stock_id: u64, lv2_prob: i32, orderbook: CachedOrderBook) {
192        self.lv2_exchange_cache
193            .insert((stock_id, lv2_prob), orderbook);
194    }
195
196    /// 取 exchange-level cache.
197    pub fn get_lv2_exchange_cache(&self, stock_id: u64, lv2_prob: i32) -> Option<CachedOrderBook> {
198        self.lv2_exchange_cache
199            .get(&(stock_id, lv2_prob))
200            .map(|v| v.clone())
201    }
202
203    /// 反向索引查询: `(stock_id, lv2_prob)` 影响哪些 broker.
204    /// 对齐 C++ `setBrokerIDs = m_mapLv2ProbToBrokers[cacheKey]` (QotRealTimeData.cpp:941-944).
205    pub fn get_brokers_affected_by_lv2_prob(&self, stock_id: u64, lv2_prob: i32) -> HashSet<u32> {
206        self.lv2_prob_to_brokers
207            .get(&(stock_id, lv2_prob))
208            .map(|s| s.clone())
209            .unwrap_or_default()
210    }
211
212    /// 给定 `(stock_id, broker_id)`, 收集该 broker 的所有 exchange 对应的
213    /// exchange-level cache 列表 (用作 merge 输入).
214    ///
215    /// 对齐 C++ `MergeCryptoExchangesToBrokerCache` 内部逻辑 (line 987-1022).
216    pub fn collect_exchange_caches_for_broker(
217        &self,
218        stock_id: u64,
219        broker_id: u32,
220    ) -> Vec<CachedOrderBook> {
221        let infos = match self.exchanges_for_broker(stock_id, broker_id) {
222            Some(v) => v,
223            None => return vec![],
224        };
225        infos
226            .iter()
227            .filter_map(|info| self.get_lv2_exchange_cache(stock_id, info.lv2_prob))
228            .collect()
229    }
230
231    /// 清除某 stock 的所有 cache (退订 / reset 时调).
232    /// 对齐 C++ `ClearStockData(nStockID)`.
233    pub fn clear_stock(&self, stock_id: u64) {
234        // v1.4.110 R6-5: 与 set_lv2_related_exchange / clear_stock_broker 共用
235        // rebuild_lock, 避免整 stock 清与反向索引重建交错.
236        let _rebuild_guard = self.rebuild_lock.lock();
237        self.by_broker.retain(|key, _| key.0 != stock_id);
238        self.lv2_exchange_cache.retain(|key, _| key.0 != stock_id);
239        self.lv2_prob_to_brokers.retain(|key, _| key.0 != stock_id);
240    }
241
242    /// v1.4.110 R6-8: 清除某 `(stock_id, broker_id)` 的 `by_broker` entry +
243    /// 重建该 stock 反向索引. 用于 **部分 broker 退订** —— stock 还有别的 broker
244    /// 在订 (`clear_stock` 整 stock 清不适用), 但本 broker 已全局无订阅, 其
245    /// `by_broker[(stock, broker)]` 会 stale 滞留 → `by_broker` 慢漏累积死 entry.
246    ///
247    /// 移除本 broker 后该 stock 已无任何 broker → 连 `lv2_exchange_cache` 一并清
248    /// (无 broker 引用的 exchange-level cache 也是 stale).
249    pub fn clear_stock_broker(&self, stock_id: u64, broker_id: u32) {
250        let _rebuild_guard = self.rebuild_lock.lock();
251        self.by_broker.remove(&(stock_id, broker_id));
252        self.rebuild_reverse_index_for_stock(stock_id);
253        // 该 stock 已无任何 broker → exchange-level cache 也清.
254        let stock_still_has_broker = self.by_broker.iter().any(|e| e.key().0 == stock_id);
255        if !stock_still_has_broker {
256            self.lv2_exchange_cache.retain(|key, _| key.0 != stock_id);
257        }
258    }
259
260    /// v1.4.110 R6-8: 列出 `by_broker` 里某 stock 当前缓存的所有 broker_id.
261    /// 退订路径用它枚举该 stock 的 broker, 逐个判是否已全局无订阅 → clear.
262    pub fn brokers_for_stock(&self, stock_id: u64) -> Vec<u32> {
263        self.by_broker
264            .iter()
265            .filter(|e| e.key().0 == stock_id)
266            .map(|e| e.key().1)
267            .collect()
268    }
269
270    /// v1.4.110 R6-2 + R6-4: 重建某 `(stock_id, broker_id)` 的 broker-level
271    /// 摆盘缓存 —— 收集该 broker 所有 exchange 的 LV2 cache → merge 40 档 → 写
272    /// `qot_cache.order_books[broker_cache_key]` → 唤醒 cold-cache waiter.
273    ///
274    /// 对齐 C++ `QotRealTimeData::RebuildCryptoBrokerCache` (line 960-985).
275    /// `parse_crypto_lv2_order_book_to_push` (push 路径, 每个 affected broker 调
276    /// 一次) 与 `maybe_fetch_crypto_exchanges` (CMD18012 完成后补 merge, R6-4)
277    /// 共用本 fn.
278    ///
279    /// 返 `Some(merged)` 供 caller 构造 PushEvent; 该 broker 当前无任何 exchange
280    /// cache (collect 空) → `None` (无数据可 merge, 不动 `qot_cache`).
281    ///
282    /// `broker_cache_key` 由 caller 传 (= `QotSecurityKey::cache_key()`,
283    /// `"{public}@b{broker_id}"`) —— public sec_key 构造需要 market/code 上下文,
284    /// 不在本 cache 持有.
285    pub fn rebuild_broker_cache(
286        &self,
287        qot_cache: &crate::qot_cache::QotCache,
288        stock_id: u64,
289        broker_id: u32,
290        broker_cache_key: &str,
291    ) -> Option<CachedOrderBook> {
292        let caches = self.collect_exchange_caches_for_broker(stock_id, broker_id);
293        if caches.is_empty() {
294            return None;
295        }
296        let merged = crate::qot_cache::merge_multiple_order_book_caches(&caches, 40);
297        qot_cache
298            .order_books
299            .insert(broker_cache_key.to_string(), merged.clone());
300        qot_cache
301            .notify_cold_cache_waiters(&crate::qot_cache::order_book_wait_key(broker_cache_key));
302        Some(merged)
303    }
304}
305
306#[cfg(test)]
307mod tests;