Skip to main content

futu_cache/
qot_cache.rs

1// 行情数据缓存
2//
3// 对应 C++ NNDataCenter 中的 INNData_Qot_SecQot / INNData_Qot_KLRT 等
4// 使用 DashMap 实现并发安全的内存缓存
5//
6// ## v1.4.110 Phase 2 Slice 5: broker-aware overloads
7//
8// 加 broker-aware overload (`*_broker` 后缀) 让 crypto multi-broker push 写
9// 入独立 cache key (e.g. `"91_BTCUSDT@b1007"` vs `"91_BTCUSDT@b1008"`).
10//
11// 老 API 保留 — broker_id=None 时 `QotSecurityKey::cache_key()` 退化到原
12// `"market_code"` 形式, 与升级前行为完全等价. Phase 3 才会替换 reader caller
13// 改走 `*_broker` 版本 (handler `GetBasicQot` 等).
14
15use dashmap::DashMap;
16use futu_core::qot_stock_key::QotSecurityKey;
17use std::sync::Arc;
18use tokio::sync::Notify;
19
/// Builds the cold-cache wait key for BasicQot (v1.4.110 codex Phase 3 Slice 6c).
///
/// `cache_key` is either the legacy `"market_code"` form or the broker-aware
/// `"market_code@b1007"` form; the result is `"<cache_key>:basic"`.
/// The suffix keeps BasicQot waiters in a different bucket from OrderBook
/// waiters so the two kinds do not wake each other by mistake.
#[inline]
pub fn basic_qot_wait_key(cache_key: &str) -> String {
    [cache_key, ":basic"].concat()
}
29
/// Builds the cold-cache wait key for OrderBook (v1.4.110 codex Phase 3 Slice 6c).
///
/// Returns `"<cache_key>:orderbook"` for the given cache key.
#[inline]
pub fn order_book_wait_key(cache_key: &str) -> String {
    [cache_key, ":orderbook"].concat()
}
35
/// Cache key of a security's quote data: `"market_code"` (e.g. `"1_00700"`),
/// or the broker-aware form `"market_code@b1007"`.
///
/// **v1.4.110 Phase 2 Slice 5**: the key encoding is still a `String` (Phase 5
/// does not introduce a new hash domain). The broker-aware suffix is encoded by
/// `QotSecurityKey::cache_key()`:
/// - no broker: `"91_BTCUSDT"` (equivalent to the pre-upgrade form)
/// - broker-aware: `"91_BTCUSDT@b1007"` (enabled after Phase 3)
pub type SecurityKey = String;
43
44/// 生成缓存 key
45pub fn make_key(market: i32, code: &str) -> SecurityKey {
46    format!("{market}_{code}")
47}
48
49/// 基本报价缓存
50#[derive(Debug, Clone)]
51pub struct CachedBasicQot {
52    pub cur_price: f64,
53    pub open_price: f64,
54    pub high_price: f64,
55    pub low_price: f64,
56    pub last_close_price: f64,
57    pub volume: i64,
58    pub turnover: f64,
59    pub turnover_rate: f64,
60    pub amplitude: f64,
61    pub is_suspended: bool,
62    pub update_time: String,
63    pub update_timestamp: f64,
64    /// v1.4.72 BUG-006 L3 (eli v1.4.69 P1): US 夜盘 OHLCV 数据。
65    /// backend 推送(CMD 6212 Qot_UpdateBasicQot)的 BasicQot.overnight (field 25)
66    /// 在夜盘时段会填充,regular hours 为 None。push_parser 提取并缓存,让
67    /// 下游 subscribe push + snapshot query 都能看到实时夜盘数据。
68    pub overnight: Option<CachedPreAfterMarketData>,
69    /// v1.4.106 codex 1140 F4 (P2): US 盘前 OHLCV 数据.
70    /// SBIT_US_PREMARKET_AFTERHOURS_DETAIL 推送时由 push_parser 解析填充, US
71    /// 盘前时段会有, regular hours / non-US → None. 下游 read 透传给 ftapi
72    /// `BasicQot.pre_market` (proto Qot_Common.proto:671). audit Finding 4.
73    pub pre_market: Option<CachedPreAfterMarketData>,
74    /// v1.4.106 codex 1140 F4 (P2): US 盘后 OHLCV 数据.
75    /// 同上, 但取 SBIT_US_PREMARKET_AFTERHOURS_DETAIL 的 after_hours 字段.
76    /// 下游 read 透传给 ftapi `BasicQot.after_market`. audit Finding 4.
77    pub after_market: Option<CachedPreAfterMarketData>,
78}
79
/// v1.4.72 BUG-006 L3: US extended-session OHLCV data (aligned with proto
/// `Qot_Common::PreAfterMarketData`).
///
/// The same struct is reused for the `pre_market`, `after_market` and
/// `overnight` fields.
#[derive(Clone, Debug, Default)]
pub struct CachedPreAfterMarketData {
    pub price: Option<f64>,
    pub high_price: Option<f64>,
    pub low_price: Option<f64>,
    pub volume: Option<i64>,
    pub turnover: Option<f64>,
    pub change_val: Option<f64>,
    pub change_rate: Option<f64>,
    pub amplitude: Option<f64>,
}
94
/// One cached K-line (candlestick) bar.
#[derive(Clone, Debug)]
pub struct CachedKLine {
    pub time: String,
    pub open_price: f64,
    pub high_price: f64,
    pub low_price: f64,
    pub close_price: f64,
    pub volume: i64,
    pub turnover: f64,
}
106
107/// 摆盘缓存 (对齐 C++ Qot_UpdateOrderBook::S2C)
108#[derive(Debug, Clone, Default)]
109pub struct CachedOrderBook {
110    pub ask_list: Vec<CachedOrderBookLevel>,
111    pub bid_list: Vec<CachedOrderBookLevel>,
112    pub svr_recv_time_bid: Option<String>,
113    pub svr_recv_time_bid_timestamp: Option<f64>,
114    pub svr_recv_time_ask: Option<String>,
115    pub svr_recv_time_ask_timestamp: Option<f64>,
116}
117
118/// 摆盘单层
119#[derive(Debug, Clone)]
120pub struct CachedOrderBookLevel {
121    pub price: f64,
122    pub volume: i64,
123    pub order_count: i32,
124    /// v1.4.106 codex 1140 F7 (P2 audit Finding 7): SF 行情订单明细列表.
125    /// backend OrderBookItem.orders (重复 OrderInfo: order_id + order_size).
126    /// 仅 HK SF 行情 + prob=BIT_PROB_ORDER_BOOK_ALL_WITH_ID 时 backend 才返;
127    /// 普通行情 → 空 vec. 下游 ftapi `Qot_Common.OrderBook.detailList` 透传.
128    pub detail_list: Vec<CachedOrderBookDetail>,
129    /// v1.4.110 codex audit Round4 R4-4: 高精度委托数量 (crypto 适用).
130    ///
131    /// crypto 盘口的 `volume` 是放大整数 (`size × 10^order_size_precision`),
132    /// i64 无法表示小数量; `hp_volume = volume / 10^precision` 是真实小数量.
133    /// 普通行情 `volume` 已是精确整数 → `None` (对齐 C++ `has_hpvolume()==false`
134    /// 时 fallback `volume`). 下游 emit 到 ftapi `Qot_Common.OrderBook.hpVolume`.
135    ///
136    /// 对齐 C++ `QotRealTimeData.cpp` `pOrderBookItem->set_hpvolume(...)` +
137    /// merge `gear.dVolume += has_hpvolume() ? hpvolume() : volume()` —— merge
138    /// 累加的是 de-scale 后的真实量, 故按 level 存 (不同交易所 precision 可能不同).
139    pub hp_volume: Option<f64>,
140}
141
/// v1.4.106 codex 1140 F7 (P2): one order inside an order-book level (HK SF).
///
/// Aligned with ftapi `Qot_Common.OrderBookDetail` (proto fields orderID +
/// volume).
#[derive(Clone, Debug)]
pub struct CachedOrderBookDetail {
    pub order_id: i64,
    pub volume: i64,
}
149
/// Cached tick-by-tick trade (aligned with C++ `Qot_UpdateTicker::S2C`).
///
/// v1.4.106 codex 1140 F5: added the `type_sign` field (audit Finding 5),
/// aligned with ftapi `Qot_Common.Ticker.typeSign` (proto field 9). The cache
/// previously lacked it, so neither push events nor read responses could pass
/// `type_sign` through.
#[derive(Clone, Debug)]
pub struct CachedTicker {
    /// `HH:MM:SS` time string (derived from `exchange_data_time_ms`, in the
    /// market's time zone).
    pub time: String,
    /// `tick_key`, used for de-duplication.
    pub sequence: i64,
    /// Direction, as labelled in the original comment: 1 = Bid (sell side),
    /// 2 = Ask (buy side), 3 = Neutral.
    pub dir: i32,
    pub price: f64,
    pub volume: i64,
    /// price × volume.
    pub turnover: f64,
    /// `server_recv_from_exchange_time_ms`, in seconds.
    pub recv_time: Option<f64>,
    /// Tick type (`TickItemType`: BUY=1 / SELL=2 / NEUTRAL=3).
    pub ticker_type: Option<i32>,
    /// v1.4.106 codex 1140 F5: tick type sign (audit Finding 5).
    ///
    /// Comes from `TickItem.trade_type` (the ASCII code of a single English
    /// letter); pushed by the backend and exposed to the UI through FTAPI
    /// `Ticker.typeSign`.
    pub type_sign: Option<i32>,
    pub push_data_type: Option<i32>,
    pub timestamp: Option<f64>,
}
172
/// One point of the intraday time-sharing (RT) series.
#[derive(Clone, Debug)]
pub struct CachedTimeShare {
    pub time: String,
    pub minute: i32,
    pub price: f64,
    pub last_close_price: f64,
    pub avg_price: f64,
    pub volume: i64,
    pub turnover: f64,
    pub timestamp: f64,
}
185
186/// 经纪队列缓存 (对齐 C++ Qot_UpdateBroker::S2C)
187#[derive(Debug, Clone, Default)]
188pub struct CachedBroker {
189    pub bid_list: Vec<CachedBrokerItem>,
190    pub ask_list: Vec<CachedBrokerItem>,
191}
192
/// A single entry of the broker queue.
#[derive(Clone, Debug)]
pub struct CachedBrokerItem {
    pub id: i64,
    pub name: String,
    pub pos: i32,
    /// v1.4.106 codex 1140 F7 (P2 audit Finding 7): HK SF order ID.
    ///
    /// Aligned with ftapi `Qot_Common.Broker.orderID` (proto field 4, optional).
    /// Only set when the backend `HKBrokerQueue.order_id_list` carries a value
    /// (HK SF); ordinary quotes leave it `None`.
    pub order_id: Option<i64>,
    /// v1.4.106 codex 1140 F7 (P2): HK SF order share count. Aligned with ftapi
    /// `Qot_Common.Broker.volume` (proto field 5, optional).
    pub volume: Option<i64>,
}
207
/// v1.4.106 codex 1140 F7 (P2 audit Finding 7): broker configuration entry
/// (broker_id → names).
///
/// Filled after CMD 18008 (NN_ProtoCmd_Qot_Pull_BrokerInfo) is pulled and
/// parsed. Replaces the old anti-pattern of letting the
/// `format!("Broker#{bid}")` placeholder leak into the public API.
#[derive(Clone, Debug)]
pub struct CachedBrokerInfo {
    /// Simplified-Chinese short name (sc); used as the primary display name
    /// (same semantics as C++ `GetBrokerName`).
    pub name_zh_cn: String,
    /// English short name (en).
    pub name_en: String,
    /// Traditional-Chinese short name (tc).
    pub name_tc: String,
}
220
221/// 行情缓存管理器
222pub struct QotCache {
223    /// 基本报价缓存
224    pub basic_qot: DashMap<SecurityKey, CachedBasicQot>,
225    /// US stock overnight-enabled state, keyed by backend stock_id.
226    ///
227    /// C++ stores this as `stockID -> bool` in `INNData_Qot_USStockOvernight`:
228    /// - `NNData_Qot_USStockOvernight.cpp:21-35` (missing key => false)
229    /// - `NNBiz_Qot_USStockState.cpp:180-190` writes `overnight_type == 1`
230    /// - `APIServer_Qot_MarketState.cpp:238-244` reads it for 11 -> 37 projection
231    pub us_stock_overnight: DashMap<u64, bool>,
232    /// K 线缓存: key = "market_code:kl_type"
233    pub klines: DashMap<String, Vec<CachedKLine>>,
234    /// 摆盘缓存
235    pub order_books: DashMap<SecurityKey, CachedOrderBook>,
236    /// 逐笔缓存: 保留最近 N 条
237    pub tickers: DashMap<SecurityKey, Vec<CachedTicker>>,
238    /// 分时缓存
239    /// v1.4.106 codex 1140 F6 (P2 audit Finding 6): RT cache key 加 session
240    /// 维度. 之前 `DashMap<SecurityKey, ...>` 把 RTH/ETH/PRE/AFTER 全部混到
241    /// 同一桶, 客户端订阅 RTH 也能读到 PRE 数据. 现在 key 是
242    /// "sec_key:s{session}" (RequestSection 0=NORMAL/1=FULL/2=PREMARKET/
243    /// 3=AFTERHOURS), 隔离不同 session.
244    pub rt_data: DashMap<String, Vec<CachedTimeShare>>,
245    /// 经纪队列缓存
246    pub brokers: DashMap<SecurityKey, CachedBroker>,
247    /// v1.4.106 codex 1140 F7 (P2 audit Finding 7): 券商 ID → 信息映射.
248    /// 由 CMD 18008 拉取后填充, 用于 push parser 从 broker_id 查真名 (替代
249    /// `Broker#{bid}` 占位符).
250    pub broker_dict: DashMap<i64, CachedBrokerInfo>,
251    /// v1.4.110 codex Phase 3 Slice 6c: cold-cache wait waiters.
252    ///
253    /// key = `"<cache_key>:<wait_kind>"` (e.g. `"91_BTCUSDT@b1007:basic"` /
254    /// `"1_00700:orderbook"`). value = shared `Arc<Notify>` 让 handler 阻塞等
255    /// push parser 写 cache 后唤醒.
256    ///
257    /// 对齐 C++ `APIServer_Qot_StockBasic.cpp:226-320` `WaitForReady` —
258    /// 已订阅但 cache 未就绪时 handler 主动 Pull_SubData + 等 push 写 cache.
259    ///
260    /// 设计 trade-off:
261    /// - 用 `DashMap<String, Arc<Notify>>` 而非 `RwLock<HashMap>`: 高并发读
262    ///   写不锁全表
263    /// - key 编码 wait_kind 防 basic / orderbook 共用同一 Notify 互相错唤醒
264    /// - update path 调 `notify_waiters` (broadcast 给所有 awaiter) 然后从
265    ///   map 中 remove (Arc 被 awaiter 持有, 自然释放)
266    pub cold_cache_waiters: DashMap<String, Arc<Notify>>,
267}
268
269impl QotCache {
270    pub fn new() -> Self {
271        Self {
272            basic_qot: DashMap::new(),
273            us_stock_overnight: DashMap::new(),
274            klines: DashMap::new(),
275            order_books: DashMap::new(),
276            tickers: DashMap::new(),
277            rt_data: DashMap::new(),
278            brokers: DashMap::new(),
279            // v1.4.106 codex 1140 F7: broker dict 由 CMD 18008 拉取后填充.
280            broker_dict: DashMap::new(),
281            // v1.4.110 codex Phase 3 Slice 6c: cold-cache wait waiters.
282            cold_cache_waiters: DashMap::new(),
283        }
284    }
285
286    /// v1.4.110 codex Phase 3 Slice 6c: 注册 cold-cache wait waiter.
287    ///
288    /// 返已存在或新建的 `Arc<Notify>`. handler 调:
289    /// 1. `register_cold_cache_waiter("91_BTCUSDT@b1007:basic")` 获 Notify
290    /// 2. 主动发 Pull_SubData CMD6824
291    /// 3. `tokio::time::timeout(Duration::from_secs(3), notify.notified())` 等
292    /// 4. 再 `get_basic_qot_broker(&key)` 读 cache (可能仍 None — 真 timeout)
293    ///
294    /// `wait_kind` 推荐: `"basic"` / `"orderbook"`. 不混 sub_type 数字防误唤.
295    pub fn register_cold_cache_waiter(&self, wait_key: &str) -> Arc<Notify> {
296        self.cold_cache_waiters
297            .entry(wait_key.to_string())
298            .or_insert_with(|| Arc::new(Notify::new()))
299            .clone()
300    }
301
302    /// v1.4.110 codex Phase 3 Slice 6c: 唤醒指定 cold-cache wait waiter.
303    ///
304    /// push parser update path 调 (`update_basic_qot` / `update_order_book` /
305    /// `_broker` 变种). 没 waiter → no-op. 有 waiter → `notify_waiters()`
306    /// broadcast 给所有 awaiter, 然后 remove (Arc 仍被 awaiter 持有, 自然释放).
307    pub fn notify_cold_cache_waiters(&self, wait_key: &str) {
308        if let Some((_, n)) = self.cold_cache_waiters.remove(wait_key) {
309            n.notify_waiters();
310        }
311    }
312
313    /// v1.4.110 codex audit Round3 #22: cold-cache wait timeout 后清 idle waiter.
314    ///
315    /// `wait_for_basic_cache` / `wait_for_order_book_cache` 3s timeout 仍 cache
316    /// miss 时调. 若 push 始终没来, `notify_cold_cache_waiters` 不会触发, entry
317    /// 会一直留在 `cold_cache_waiters` map (虽 bounded by distinct wait_key 数,
318    /// 仍是慢速 leak).
319    ///
320    /// **只删 caller 自己注册的那个 entry, 且无其他并发 awaiter 时才删**:
321    /// `remove_if` closure 在 entry lock 下原子检查两条:
322    /// 1. `Arc::ptr_eq(stored, caller_notify)` — stored 必须就是 caller 当初
323    ///    `register_cold_cache_waiter` 拿到的同一 Arc. 防 race: caller timeout
324    ///    后到本调用之间, 若 push 触发 `notify_cold_cache_waiters` 删了旧 entry,
325    ///    另一个 `wait_for_*` 又 register 建了**新** entry (不同 Arc), `ptr_eq`
326    ///    false → 不误删别人的新 entry.
327    /// 2. `Arc::strong_count(stored) <= 2` — DashMap stored Arc 1 + caller
328    ///    持有的 `caller_notify` 1. `> 2` 表示有其他 `wait_for_*` 仍 await 同
329    ///    entry → 保留让它们能被 notify 唤醒.
330    ///
331    /// caller 约定: 必须把 `register_cold_cache_waiter` 返回的 `Arc<Notify>`
332    /// 原样传进来 (caller 全程持有未 drop).
333    pub fn cleanup_cold_cache_waiter_if_idle(&self, wait_key: &str, caller_notify: &Arc<Notify>) {
334        self.cold_cache_waiters.remove_if(wait_key, |_, stored| {
335            Arc::ptr_eq(stored, caller_notify) && Arc::strong_count(stored) <= 2
336        });
337    }
338
339    /// Update C++-style US overnight stock state (`stockID -> bool`).
340    ///
341    /// Ref: `NNData_Qot_USStockOvernight.cpp:21-35` and
342    /// `NNBiz_Qot_USStockState.cpp:180-190`.
343    pub fn set_us_stock_overnight_state(&self, stock_id: u64, is_overnight: bool) {
344        if stock_id == 0 {
345            return;
346        }
347        self.us_stock_overnight.insert(stock_id, is_overnight);
348    }
349
350    /// Query whether a US stock is currently in overnight trading.
351    ///
352    /// C++ cache miss returns false (`NNData_Qot_USStockOvernight.cpp:29-34`).
353    pub fn is_us_stock_overnight(&self, stock_id: u64) -> bool {
354        self.us_stock_overnight
355            .get(&stock_id)
356            .map(|v| *v)
357            .unwrap_or(false)
358    }
359
360    /// v1.4.106 codex 1140 F7 (P2): 查 broker_id → broker name (中文简称).
361    /// cache miss → None, 调用方决定 fallback 策略 (push parser 用
362    /// `Broker#{id}` 作 emergency fallback, 但同时 warn-log 提示 dict 未加载).
363    pub fn get_broker_name(&self, broker_id: i64) -> Option<String> {
364        self.broker_dict
365            .get(&broker_id)
366            .map(|info| info.name_zh_cn.clone())
367    }
368
369    /// v1.4.106 codex 1140 F7 (P2): 批量写入 broker dict (CMD 18008 解析后调).
370    pub fn install_broker_dict(&self, entries: Vec<(i64, CachedBrokerInfo)>) {
371        for (id, info) in entries {
372            self.broker_dict.insert(id, info);
373        }
374    }
375
376    /// 更新基本报价
377    pub fn update_basic_qot(&self, key: &str, qot: CachedBasicQot) {
378        self.basic_qot.insert(key.to_string(), qot);
379        // v1.4.110 codex Phase 3 Slice 6c: cold-cache wait notify.
380        self.notify_cold_cache_waiters(&basic_qot_wait_key(key));
381    }
382
383    /// 获取基本报价
384    pub fn get_basic_qot(&self, key: &str) -> Option<CachedBasicQot> {
385        self.basic_qot.get(key).map(|v| v.clone())
386    }
387
388    /// **v1.4.110 Phase 2 Slice 5**: 更新基本报价 (broker-aware).
389    ///
390    /// 用 `QotSecurityKey::cache_key()` 派生 String key. broker_id=None → 与
391    /// `update_basic_qot(public_sec_key, ...)` 等价; broker_id=Some(N) → 写
392    /// 独立 cache key `"market_code@b{N}"` (crypto multi-broker isolation).
393    pub fn update_basic_qot_broker(&self, key: &QotSecurityKey, qot: CachedBasicQot) {
394        let cache_key = key.cache_key();
395        self.basic_qot.insert(cache_key.clone(), qot);
396        // v1.4.110 codex Phase 3 Slice 6c: cold-cache wait notify.
397        self.notify_cold_cache_waiters(&basic_qot_wait_key(&cache_key));
398    }
399
400    /// **v1.4.110 Phase 2 Slice 5**: 获取基本报价 (broker-aware).
401    pub fn get_basic_qot_broker(&self, key: &QotSecurityKey) -> Option<CachedBasicQot> {
402        self.basic_qot.get(&key.cache_key()).map(|v| v.clone())
403    }
404
405    /// 构造 RT 分时 cache key (v1.4.106 codex 1140 F6).
406    ///
407    /// 之前 key 仅 sec_key, RTH/ETH/PRE/AFTER/OVERNIGHT 混桶. 现在
408    /// "sec_key:s{session}" 隔离. session 来自 push 解析的
409    /// `TimeSharingPlans.section_type[0]` (RequestSection enum):
410    /// 0=NORMAL/1=FULL/2=PREMARKET/3=AFTERHOURS/5=OVERNIGHT.
411    /// GetRT handler 对 C++ 的 Session_ETH/Session_ALL 读取语义做动态拼接,
412    /// 不依赖额外 aggregate cache 桶。
413    pub fn make_rt_key(sec_key: &str, session: i32) -> String {
414        format!("{sec_key}:s{session}")
415    }
416
417    /// **v1.4.110 Phase 2 Slice 5**: 构造 RT 分时 cache key (broker-aware).
418    ///
419    /// 用 `QotSecurityKey::cache_key()` 作 prefix. broker_id=None → 与
420    /// `make_rt_key(public_sec_key, session)` 等价; broker_id=Some(N) → 写
421    /// 独立 cache key `"market_code@b{N}:s{session}"`.
422    pub fn make_rt_key_broker(key: &QotSecurityKey, session: i32) -> String {
423        format!("{}:s{}", key.cache_key(), session)
424    }
425
426    /// **v1.4.110 Phase 2 Slice 5**: 更新 RT 分时 (broker-aware).
427    pub fn update_rt_data_broker(
428        &self,
429        key: &QotSecurityKey,
430        session: i32,
431        rt_data: Vec<CachedTimeShare>,
432    ) {
433        let cache_key = Self::make_rt_key_broker(key, session);
434        self.rt_data.insert(cache_key, rt_data);
435    }
436
437    /// **v1.4.110 Phase 2 Slice 5**: 获取 RT 分时 (broker-aware).
438    pub fn get_rt_data_broker(
439        &self,
440        key: &QotSecurityKey,
441        session: i32,
442    ) -> Option<Vec<CachedTimeShare>> {
443        let cache_key = Self::make_rt_key_broker(key, session);
444        self.rt_data.get(&cache_key).map(|v| v.clone())
445    }
446
447    /// 构造 K 线 cache key (v1.4.106 codex 1140 F3 4-tuple).
448    ///
449    /// 之前 key 仅 `(sec_key, kl_type)` 2-tuple, 同股票同 KLType 但前复权 vs
450    /// 后复权 / RTH vs ETH 数据互相覆盖. 对齐 C++ APIServer_Qot_KL.cpp:
451    /// `GetNewestKLByCount(stock_id, enRehabType, enKLType, num, session, ...)`
452    /// 用 4 维 key.
453    ///
454    /// - `rehab`: proto Qot_Common.RehabType (0=None, 1=Forward, 2=Backward),
455    ///   对齐 backend `FTCmdKline.ExrightType`. 同一股票同一 kl_type 不同 rehab
456    ///   走独立 cache, 不互相覆盖.
457    /// - `kl_type`: proto Qot_Common.KLType (1=1Min, 2=Day, ..., 11=Quarter).
458    /// - `session`: proto FTCmdKline.RequestSection (0=NORMAL, 1=FULL,
459    ///   2=PREMARKET, 3=AFTERHOURS). RTH/ETH 隔离, push 来自 backend 的
460    ///   `point.section_type[0]` 决定写入桶, read 由 client subscription
461    ///   session 决定 (尚无, 默认 NORMAL).
462    pub fn make_kline_key(sec_key: &str, rehab: i32, kl_type: i32, session: i32) -> String {
463        format!("{sec_key}:r{rehab}:k{kl_type}:s{session}")
464    }
465
466    /// 更新 K 线 (v1.4.106 codex 1140 F3: 4-tuple key, rehab + session 隔离).
467    pub fn update_klines(
468        &self,
469        sec_key: &str,
470        rehab: i32,
471        kl_type: i32,
472        session: i32,
473        klines: Vec<CachedKLine>,
474    ) {
475        let cache_key = Self::make_kline_key(sec_key, rehab, kl_type, session);
476        self.klines.insert(cache_key, klines);
477    }
478
479    /// 获取 K 线 (v1.4.106 codex 1140 F3: 4-tuple key, rehab + session 隔离).
480    pub fn get_klines(
481        &self,
482        sec_key: &str,
483        rehab: i32,
484        kl_type: i32,
485        session: i32,
486    ) -> Option<Vec<CachedKLine>> {
487        let cache_key = Self::make_kline_key(sec_key, rehab, kl_type, session);
488        self.klines.get(&cache_key).map(|v| v.clone())
489    }
490
491    /// **v1.4.110 Phase 2 Slice 5**: 更新 K 线 (broker-aware).
492    ///
493    /// 用 `QotSecurityKey::cache_key()` 作 prefix, broker_id=None 时退化到原行为.
494    /// composite 维度仍是 4-tuple `(rehab, kl_type, session)`, broker_id 是第 5
495    /// 维通过 `QotSecurityKey` 注入到 prefix.
496    pub fn update_klines_broker(
497        &self,
498        key: &QotSecurityKey,
499        rehab: i32,
500        kl_type: i32,
501        session: i32,
502        klines: Vec<CachedKLine>,
503    ) {
504        let cache_key = Self::make_kline_key(&key.cache_key(), rehab, kl_type, session);
505        self.klines.insert(cache_key, klines);
506    }
507
508    /// **v1.4.110 Phase 2 Slice 5**: 获取 K 线 (broker-aware).
509    pub fn get_klines_broker(
510        &self,
511        key: &QotSecurityKey,
512        rehab: i32,
513        kl_type: i32,
514        session: i32,
515    ) -> Option<Vec<CachedKLine>> {
516        let cache_key = Self::make_kline_key(&key.cache_key(), rehab, kl_type, session);
517        self.klines.get(&cache_key).map(|v| v.clone())
518    }
519
520    /// 更新摆盘
521    pub fn update_order_book(&self, key: &str, ob: CachedOrderBook) {
522        self.order_books.insert(key.to_string(), ob);
523        // v1.4.110 codex Phase 3 Slice 6c: cold-cache wait notify.
524        self.notify_cold_cache_waiters(&order_book_wait_key(key));
525    }
526
527    /// **v1.4.110 Phase 2 Slice 5**: 更新摆盘 (broker-aware).
528    pub fn update_order_book_broker(&self, key: &QotSecurityKey, ob: CachedOrderBook) {
529        let cache_key = key.cache_key();
530        self.order_books.insert(cache_key.clone(), ob);
531        // v1.4.110 codex Phase 3 Slice 6c: cold-cache wait notify.
532        self.notify_cold_cache_waiters(&order_book_wait_key(&cache_key));
533    }
534
535    /// **v1.4.110 Phase 2 Slice 5**: 获取摆盘 (broker-aware).
536    pub fn get_order_book_broker(&self, key: &QotSecurityKey) -> Option<CachedOrderBook> {
537        self.order_books.get(&key.cache_key()).map(|v| v.clone())
538    }
539
540    /// 追加逐笔(保留最近 1000 条)
541    pub fn append_tickers(&self, key: &str, new_tickers: Vec<CachedTicker>) {
542        let mut entry = self.tickers.entry(key.to_string()).or_default();
543        entry.extend(new_tickers);
544        if entry.len() > 1000 {
545            let drain_count = entry.len() - 1000;
546            entry.drain(..drain_count);
547        }
548    }
549
550    /// **v1.4.110 Phase 2 Slice 5**: 追加逐笔 (broker-aware).
551    pub fn append_tickers_broker(&self, key: &QotSecurityKey, new_tickers: Vec<CachedTicker>) {
552        let mut entry = self.tickers.entry(key.cache_key()).or_default();
553        entry.extend(new_tickers);
554        if entry.len() > 1000 {
555            let drain_count = entry.len() - 1000;
556            entry.drain(..drain_count);
557        }
558    }
559
560    /// **v1.4.110 Phase 2 Slice 5**: 获取逐笔 (broker-aware).
561    pub fn get_tickers_broker(&self, key: &QotSecurityKey) -> Option<Vec<CachedTicker>> {
562        self.tickers.get(&key.cache_key()).map(|v| v.clone())
563    }
564
565    /// 更新经纪队列
566    pub fn update_broker(&self, key: &str, broker: CachedBroker) {
567        self.brokers.insert(key.to_string(), broker);
568    }
569
570    /// 获取经纪队列
571    pub fn get_broker(&self, key: &str) -> Option<CachedBroker> {
572        self.brokers.get(key).map(|v| v.clone())
573    }
574
575    /// 清除指定股票的所有缓存
576    pub fn clear_security(&self, key: &str) {
577        self.basic_qot.remove(key);
578        self.order_books.remove(key);
579        self.tickers.remove(key);
580        self.brokers.remove(key);
581        // v1.4.106 codex 1140 F3: K 线 key 是 "sec_key:r{rehab}:k{kl_type}:s{session}"
582        // 4-tuple, 仍是 sec_key prefix 起头, retain prefix match 仍正确清所有维度.
583        let prefix = format!("{key}:");
584        self.klines.retain(|k, _| !k.starts_with(&prefix));
585        // v1.4.106 codex 1140 F6: rt_data key 也加 session 维度后变为
586        // "sec_key:s{session}", 同样 prefix-match 清 RTH/ETH/ALL 全部桶.
587        self.rt_data.retain(|k, _| !k.starts_with(&prefix));
588    }
589
590    /// **v1.4.110 Phase 2 Slice 5**: 清除指定股票的所有缓存 (broker-aware).
591    ///
592    /// 用 `QotSecurityKey::cache_key()` 派生 cache key 字符串. broker_id=None
593    /// → 与 `clear_security(public_sec_key)` 等价; broker_id=Some(N) → 只清
594    /// 该 broker 下的 cache (其他 broker 下同 stock_id 的 cache 保留).
595    pub fn clear_security_broker(&self, key: &QotSecurityKey) {
596        let cache_key = key.cache_key();
597        self.basic_qot.remove(&cache_key);
598        self.order_books.remove(&cache_key);
599        self.tickers.remove(&cache_key);
600        self.brokers.remove(&cache_key);
601        let prefix = format!("{cache_key}:");
602        self.klines.retain(|k, _| !k.starts_with(&prefix));
603        self.rt_data.retain(|k, _| !k.starts_with(&prefix));
604    }
605}
606
607impl Default for QotCache {
608    fn default() -> Self {
609        Self::new()
610    }
611}
612
613// =============================================================================
614// v1.4.110 codex QOT Phase 4 Slice 7: MergeMultipleOrderBookCaches pure fn.
615//
616// 对齐 C++ `QotRealTimeData::MergeMultipleOrderBookCaches` + `MergeOrderBookLists`
617// (QotRealTimeData.cpp:1440-1591):
618//
619// 1. **Per-side price-key merge** — 每档按 `price * 1e9` 取整作 key, 同 key
620//    累加 volume / order_count / detail_list (C++ MergedGear struct).
621// 2. **Side-aware sort** — bid 降序 (高价先), ask 升序 (低价先).
622// 3. **Top N truncate** — 仅当 `max_depth > 0` 时截断 (crypto LV2 用 40).
623// 4. **Timestamp**: bid/ask 各自取多 source 的最大 timestamp.
624// 5. **Skip price ≤ 0**: C++ "过滤空数据占位条目" (line 1466).
625// =============================================================================
626
627/// Per-side merge buffer (对齐 C++ `MergedGear`).
628#[derive(Debug, Clone)]
629struct MergedGear {
630    price: f64,
631    volume: i64,
632    order_count: i32,
633    detail_list: Vec<CachedOrderBookDetail>,
634    /// v1.4.110 codex audit Round4 R4-4: 高精度量累加器. 对齐 C++
635    /// `gear.dVolume += has_hpvolume() ? hpvolume() : volume()` —— 累加 de-scale
636    /// 后的真实量 (有 hp_volume 用之, 否则 fallback `volume`).
637    hp_volume: f64,
638}
639
640/// Per-side merge: take multiple lists, aggregate by price key, sort, optionally
641/// truncate to `max_depth`. 不会修改输入.
642///
643/// `is_bid=true` → sort 降序; `false` → 升序.
644fn merge_order_book_side(
645    sources: &[&[CachedOrderBookLevel]],
646    is_bid: bool,
647    max_depth: usize,
648) -> Vec<CachedOrderBookLevel> {
649    if sources.is_empty() {
650        return vec![];
651    }
652    // 价格精度放大到整数 (9 位小数, 对齐 C++ `PriceToKey`).
653    fn price_to_key(p: f64) -> i64 {
654        (p * 1e9 + 0.5) as i64
655    }
656
657    let mut merged: std::collections::HashMap<i64, MergedGear> = std::collections::HashMap::new();
658    for list in sources {
659        for level in list.iter() {
660            // C++: 过滤空数据占位条目 (price <= 0).
661            if level.price <= 0.0 {
662                continue;
663            }
664            let key = price_to_key(level.price);
665            let entry = merged.entry(key).or_insert(MergedGear {
666                price: level.price,
667                volume: 0,
668                order_count: 0,
669                detail_list: vec![],
670                hp_volume: 0.0,
671            });
672            entry.volume = entry.volume.saturating_add(level.volume);
673            entry.order_count = entry.order_count.saturating_add(level.order_count);
674            entry.detail_list.extend(level.detail_list.iter().cloned());
675            // v1.4.110 codex audit Round4 R4-4: 对齐 C++ merge gear.dVolume 累加 ——
676            // 有 hp_volume 用真实小数量, 否则 fallback `volume`.
677            entry.hp_volume += level.hp_volume.unwrap_or(level.volume as f64);
678        }
679    }
680
681    let mut keys: Vec<i64> = merged.keys().copied().collect();
682    if is_bid {
683        // 买盘: 价格从高到低 (降序).
684        keys.sort_by(|a, b| b.cmp(a));
685    } else {
686        // 卖盘: 价格从低到高 (升序).
687        keys.sort();
688    }
689
690    let mut out: Vec<CachedOrderBookLevel> = keys
691        .iter()
692        .map(|k| {
693            let g = &merged[k];
694            CachedOrderBookLevel {
695                price: g.price,
696                volume: g.volume,
697                order_count: g.order_count,
698                detail_list: g.detail_list.clone(),
699                // v1.4.110 codex audit Round4 R4-4: merge 输出 crypto 多交易所
700                // 合单盘口, 始终带累加后的 hp_volume (对齐 C++ set_hpvolume).
701                hp_volume: Some(g.hp_volume),
702            }
703        })
704        .collect();
705
706    // 深度截断: max_depth > 0 时生效.
707    if max_depth > 0 && out.len() > max_depth {
708        out.truncate(max_depth);
709    }
710    out
711}
712
713/// Merge N exchange-level orderbook caches into one broker-level cache.
714///
715/// 对齐 C++ `QotRealTimeData::MergeMultipleOrderBookCaches(vSources, pbMerged,
716/// nMaxDepth=40)` (QotRealTimeData.cpp:1534-1591):
717/// - 第一个 source 作为基础 (拷 svr_recv_time 等基本字段, 但 bid/ask list 重算)
718/// - 按 price key 聚合 bid/ask, 同 price 累加 volume + order_count + detail
719/// - bid 高价优先, ask 低价优先, 截断到 `max_depth`
720/// - bid/ask timestamp 取多 source 最大值
721///
722/// `max_depth = 0` → 不截断 (C++ 行为, line 1579-1583 `if nMaxDepth > 0`).
723/// `max_depth = 40` → crypto LV2 broker-level cache 用 (line 1014).
724pub fn merge_multiple_order_book_caches(
725    sources: &[CachedOrderBook],
726    max_depth: usize,
727) -> CachedOrderBook {
728    if sources.is_empty() {
729        return CachedOrderBook::default();
730    }
731
732    // v1.4.110 codex audit Round2 P2 #17: 收 slice-of-slice 而非 deep clone.
733    // merge_order_book_side 只读 levels (price/volume/order_count/detail_list),
734    // 不消费 Vec — 之前 `.bid_list.clone()` 每 source deep clone N×M levels
735    // (5 exchange × 40 level = 200 clone per LV2 push), crypto LV2 push 1Hz+
736    // 频率下浪费. 改 `.as_slice()` 只收引用, 零 Level clone.
737    let bid_sources: Vec<&[CachedOrderBookLevel]> =
738        sources.iter().map(|s| s.bid_list.as_slice()).collect();
739    let ask_sources: Vec<&[CachedOrderBookLevel]> =
740        sources.iter().map(|s| s.ask_list.as_slice()).collect();
741
742    let bid_merged = merge_order_book_side(&bid_sources, true, max_depth);
743    let ask_merged = merge_order_book_side(&ask_sources, false, max_depth);
744
745    // Timestamp 取所有 source 最大 (对齐 C++ line 1572-1575).
746    let max_bid_ts = sources
747        .iter()
748        .filter_map(|s| s.svr_recv_time_bid_timestamp)
749        .fold(None::<f64>, |acc, t| {
750            Some(acc.map(|a| a.max(t)).unwrap_or(t))
751        });
752    let max_ask_ts = sources
753        .iter()
754        .filter_map(|s| s.svr_recv_time_ask_timestamp)
755        .fold(None::<f64>, |acc, t| {
756            Some(acc.map(|a| a.max(t)).unwrap_or(t))
757        });
758
759    CachedOrderBook {
760        bid_list: bid_merged,
761        ask_list: ask_merged,
762        svr_recv_time_bid: None,
763        svr_recv_time_bid_timestamp: max_bid_ts,
764        svr_recv_time_ask: None,
765        svr_recv_time_ask_timestamp: max_ask_ts,
766    }
767}
768
769#[cfg(test)]
770mod merge_tests;
771
772#[cfg(test)]
773mod tests;