futu_cache/qot_cache.rs
1// 行情数据缓存
2//
3// 对应 C++ NNDataCenter 中的 INNData_Qot_SecQot / INNData_Qot_KLRT 等
4// 使用 DashMap 实现并发安全的内存缓存
5//
6// ## v1.4.110 Phase 2 Slice 5: broker-aware overloads
7//
8// 加 broker-aware overload (`*_broker` 后缀) 让 crypto multi-broker push 写
9// 入独立 cache key (e.g. `"91_BTCUSDT@b1007"` vs `"91_BTCUSDT@b1008"`).
10//
11// 老 API 保留 — broker_id=None 时 `QotSecurityKey::cache_key()` 退化到原
12// `"market_code"` 形式, 与升级前行为完全等价. Phase 3 才会替换 reader caller
13// 改走 `*_broker` 版本 (handler `GetBasicQot` 等).
14
15use dashmap::DashMap;
16use futu_core::qot_stock_key::QotSecurityKey;
17use std::sync::Arc;
18use tokio::sync::Notify;
19
/// Builds the cold-cache wait key for BasicQot (v1.4.110 codex Phase 3 Slice 6c).
///
/// `cache_key` is either the legacy `"market_code"` form or the broker-aware
/// `"market_code@b1007"` form. The result is `"<cache_key>:basic"`, which keeps
/// BasicQot waiters in a different bucket from OrderBook waiters so that the
/// two kinds never wake each other by mistake.
#[inline]
pub fn basic_qot_wait_key(cache_key: &str) -> String {
    [cache_key, ":basic"].concat()
}
29
/// Builds the cold-cache wait key for OrderBook (v1.4.110 codex Phase 3 Slice 6c).
///
/// Returns `"<cache_key>:orderbook"` for the given cache key.
#[inline]
pub fn order_book_wait_key(cache_key: &str) -> String {
    [cache_key, ":orderbook"].concat()
}
35
/// Cache key for a security: `"market_code"` (e.g. `"1_00700"`) or the
/// broker-aware form `"market_code@b1007"`.
///
/// **v1.4.110 Phase 2 Slice 5**: the key encoding is still a plain `String`
/// (Phase 5 does not introduce a new hash domain). The broker-aware suffix is
/// produced by `QotSecurityKey::cache_key()`:
/// - no broker: `"91_BTCUSDT"` (identical to the pre-upgrade form)
/// - broker-aware: `"91_BTCUSDT@b1007"` (enabled from Phase 3 onwards)
pub type SecurityKey = String;
43
44/// 生成缓存 key
45pub fn make_key(market: i32, code: &str) -> SecurityKey {
46 format!("{market}_{code}")
47}
48
49/// 基本报价缓存
50#[derive(Debug, Clone)]
51pub struct CachedBasicQot {
52 pub cur_price: f64,
53 pub open_price: f64,
54 pub high_price: f64,
55 pub low_price: f64,
56 pub last_close_price: f64,
57 pub volume: i64,
58 pub turnover: f64,
59 pub turnover_rate: f64,
60 pub amplitude: f64,
61 pub is_suspended: bool,
62 pub update_time: String,
63 pub update_timestamp: f64,
64 /// v1.4.72 BUG-006 L3 (eli v1.4.69 P1): US 夜盘 OHLCV 数据。
65 /// backend 推送(CMD 6212 Qot_UpdateBasicQot)的 BasicQot.overnight (field 25)
66 /// 在夜盘时段会填充,regular hours 为 None。push_parser 提取并缓存,让
67 /// 下游 subscribe push + snapshot query 都能看到实时夜盘数据。
68 pub overnight: Option<CachedPreAfterMarketData>,
69 /// v1.4.106 codex 1140 F4 (P2): US 盘前 OHLCV 数据.
70 /// SBIT_US_PREMARKET_AFTERHOURS_DETAIL 推送时由 push_parser 解析填充, US
71 /// 盘前时段会有, regular hours / non-US → None. 下游 read 透传给 ftapi
72 /// `BasicQot.pre_market` (proto Qot_Common.proto:671). audit Finding 4.
73 pub pre_market: Option<CachedPreAfterMarketData>,
74 /// v1.4.106 codex 1140 F4 (P2): US 盘后 OHLCV 数据.
75 /// 同上, 但取 SBIT_US_PREMARKET_AFTERHOURS_DETAIL 的 after_hours 字段.
76 /// 下游 read 透传给 ftapi `BasicQot.after_market`. audit Finding 4.
77 pub after_market: Option<CachedPreAfterMarketData>,
78}
79
/// Extended-session OHLCV data for US stocks (v1.4.72 BUG-006 L3), aligned with
/// proto `Qot_Common::PreAfterMarketData`.
///
/// The same struct is reused for the `pre_market`, `after_market` and
/// `overnight` fields of `CachedBasicQot`.
#[derive(Clone, Debug, Default)]
pub struct CachedPreAfterMarketData {
    pub price: Option<f64>,
    pub high_price: Option<f64>,
    pub low_price: Option<f64>,
    pub volume: Option<i64>,
    pub turnover: Option<f64>,
    pub change_val: Option<f64>,
    pub change_rate: Option<f64>,
    pub amplitude: Option<f64>,
}
94
/// One cached candlestick (K-line) bar.
#[derive(Clone, Debug)]
pub struct CachedKLine {
    pub time: String,
    pub open_price: f64,
    pub high_price: f64,
    pub low_price: f64,
    pub close_price: f64,
    pub volume: i64,
    pub turnover: f64,
}
106
107/// 摆盘缓存 (对齐 C++ Qot_UpdateOrderBook::S2C)
108#[derive(Debug, Clone, Default)]
109pub struct CachedOrderBook {
110 pub ask_list: Vec<CachedOrderBookLevel>,
111 pub bid_list: Vec<CachedOrderBookLevel>,
112 pub svr_recv_time_bid: Option<String>,
113 pub svr_recv_time_bid_timestamp: Option<f64>,
114 pub svr_recv_time_ask: Option<String>,
115 pub svr_recv_time_ask_timestamp: Option<f64>,
116}
117
118/// 摆盘单层
119#[derive(Debug, Clone)]
120pub struct CachedOrderBookLevel {
121 pub price: f64,
122 pub volume: i64,
123 pub order_count: i32,
124 /// v1.4.106 codex 1140 F7 (P2 audit Finding 7): SF 行情订单明细列表.
125 /// backend OrderBookItem.orders (重复 OrderInfo: order_id + order_size).
126 /// 仅 HK SF 行情 + prob=BIT_PROB_ORDER_BOOK_ALL_WITH_ID 时 backend 才返;
127 /// 普通行情 → 空 vec. 下游 ftapi `Qot_Common.OrderBook.detailList` 透传.
128 pub detail_list: Vec<CachedOrderBookDetail>,
129 /// v1.4.110 codex audit Round4 R4-4: 高精度委托数量 (crypto 适用).
130 ///
131 /// crypto 盘口的 `volume` 是放大整数 (`size × 10^order_size_precision`),
132 /// i64 无法表示小数量; `hp_volume = volume / 10^precision` 是真实小数量.
133 /// 普通行情 `volume` 已是精确整数 → `None` (对齐 C++ `has_hpvolume()==false`
134 /// 时 fallback `volume`). 下游 emit 到 ftapi `Qot_Common.OrderBook.hpVolume`.
135 ///
136 /// 对齐 C++ `QotRealTimeData.cpp` `pOrderBookItem->set_hpvolume(...)` +
137 /// merge `gear.dVolume += has_hpvolume() ? hpvolume() : volume()` —— merge
138 /// 累加的是 de-scale 后的真实量, 故按 level 存 (不同交易所 precision 可能不同).
139 pub hp_volume: Option<f64>,
140}
141
/// One order inside an order-book level, HK SF only (v1.4.106 codex 1140 F7, P2).
///
/// Aligned with ftapi `Qot_Common.OrderBookDetail` (proto fields orderID and
/// volume).
#[derive(Clone, Debug)]
pub struct CachedOrderBookDetail {
    pub order_id: i64,
    pub volume: i64,
}
149
/// One cached tick-by-tick trade (aligned with C++ `Qot_UpdateTicker::S2C`).
///
/// v1.4.106 codex 1140 F5 (audit Finding 5) added the `type_sign` field,
/// aligned with ftapi `Qot_Common.Ticker.typeSign` (proto field 9). Before that
/// the cache lacked the field, so neither push events nor read responses could
/// pass `type_sign` through.
#[derive(Clone, Debug)]
pub struct CachedTicker {
    /// `HH:MM:SS` time string, derived from `exchange_data_time_ms` in the
    /// market's time zone.
    pub time: String,
    /// `tick_key`, used for de-duplication.
    pub sequence: i64,
    /// Trade direction: 1 = Bid, 2 = Ask, 3 = Neutral. The original note
    /// glosses Bid as the sell side and Ask as the buy side — TODO confirm.
    pub dir: i32,
    pub price: f64,
    pub volume: i64,
    /// price × volume.
    pub turnover: f64,
    /// `server_recv_from_exchange_time_ms`, expressed in seconds.
    pub recv_time: Option<f64>,
    /// Tick type (`TickItemType`: BUY = 1, SELL = 2, NEUTRAL = 3).
    pub ticker_type: Option<i32>,
    /// Tick type sign (v1.4.106 codex 1140 F5, audit Finding 5).
    ///
    /// Comes from `TickItem.trade_type` (the ASCII code of a single English
    /// letter) in the backend push; exposed externally to the UI as FTAPI
    /// `Ticker.typeSign`.
    pub type_sign: Option<i32>,
    pub push_data_type: Option<i32>,
    pub timestamp: Option<f64>,
}
172
/// One point of the intraday time-sharing (RT) series.
#[derive(Clone, Debug)]
pub struct CachedTimeShare {
    pub time: String,
    pub minute: i32,
    pub price: f64,
    pub last_close_price: f64,
    pub avg_price: f64,
    pub volume: i64,
    pub turnover: f64,
    pub timestamp: f64,
}
185
186/// 经纪队列缓存 (对齐 C++ Qot_UpdateBroker::S2C)
187#[derive(Debug, Clone, Default)]
188pub struct CachedBroker {
189 pub bid_list: Vec<CachedBrokerItem>,
190 pub ask_list: Vec<CachedBrokerItem>,
191}
192
/// One entry of the broker queue.
#[derive(Clone, Debug)]
pub struct CachedBrokerItem {
    pub id: i64,
    pub name: String,
    pub pos: i32,
    /// Order ID for HK SF quotes (v1.4.106 codex 1140 F7, P2 audit Finding 7).
    ///
    /// Aligned with ftapi `Qot_Common.Broker.orderID` (proto field 4,
    /// optional). Only populated when the backend
    /// `HKBrokerQueue.order_id_list` carries values (HK SF); `None` for
    /// ordinary quotes.
    pub order_id: Option<i64>,
    /// Order share count for HK SF quotes (v1.4.106 codex 1140 F7, P2).
    ///
    /// Aligned with ftapi `Qot_Common.Broker.volume` (proto field 5, optional).
    pub volume: Option<i64>,
}
207
/// Broker dictionary entry: display names of one broker (v1.4.106 codex 1140
/// F7, P2 audit Finding 7).
///
/// Filled from the parsed response of CMD 18008
/// (NN_ProtoCmd_Qot_Pull_BrokerInfo). Replaces the old anti-pattern of letting
/// the `format!("Broker#{bid}")` placeholder leak into the public API.
#[derive(Clone, Debug)]
pub struct CachedBrokerInfo {
    /// Simplified-Chinese short name (sc) — used as the primary display name
    /// (same meaning as C++ `GetBrokerName`).
    pub name_zh_cn: String,
    /// English short name (en).
    pub name_en: String,
    /// Traditional-Chinese short name (tc).
    pub name_tc: String,
}
220
221/// 行情缓存管理器
222pub struct QotCache {
223 /// 基本报价缓存
224 pub basic_qot: DashMap<SecurityKey, CachedBasicQot>,
225 /// US stock overnight-enabled state, keyed by backend stock_id.
226 ///
227 /// C++ stores this as `stockID -> bool` in `INNData_Qot_USStockOvernight`:
228 /// - `NNData_Qot_USStockOvernight.cpp:21-35` (missing key => false)
229 /// - `NNBiz_Qot_USStockState.cpp:180-190` writes `overnight_type == 1`
230 /// - `APIServer_Qot_MarketState.cpp:238-244` reads it for 11 -> 37 projection
231 pub us_stock_overnight: DashMap<u64, bool>,
232 /// K 线缓存: key = "market_code:kl_type"
233 pub klines: DashMap<String, Vec<CachedKLine>>,
234 /// 摆盘缓存
235 pub order_books: DashMap<SecurityKey, CachedOrderBook>,
236 /// 逐笔缓存: 保留最近 N 条
237 pub tickers: DashMap<SecurityKey, Vec<CachedTicker>>,
238 /// 分时缓存
239 /// v1.4.106 codex 1140 F6 (P2 audit Finding 6): RT cache key 加 session
240 /// 维度. 之前 `DashMap<SecurityKey, ...>` 把 RTH/ETH/PRE/AFTER 全部混到
241 /// 同一桶, 客户端订阅 RTH 也能读到 PRE 数据. 现在 key 是
242 /// "sec_key:s{session}" (RequestSection 0=NORMAL/1=FULL/2=PREMARKET/
243 /// 3=AFTERHOURS), 隔离不同 session.
244 pub rt_data: DashMap<String, Vec<CachedTimeShare>>,
245 /// 经纪队列缓存
246 pub brokers: DashMap<SecurityKey, CachedBroker>,
247 /// v1.4.106 codex 1140 F7 (P2 audit Finding 7): 券商 ID → 信息映射.
248 /// 由 CMD 18008 拉取后填充, 用于 push parser 从 broker_id 查真名 (替代
249 /// `Broker#{bid}` 占位符).
250 pub broker_dict: DashMap<i64, CachedBrokerInfo>,
251 /// v1.4.110 codex Phase 3 Slice 6c: cold-cache wait waiters.
252 ///
253 /// key = `"<cache_key>:<wait_kind>"` (e.g. `"91_BTCUSDT@b1007:basic"` /
254 /// `"1_00700:orderbook"`). value = shared `Arc<Notify>` 让 handler 阻塞等
255 /// push parser 写 cache 后唤醒.
256 ///
257 /// 对齐 C++ `APIServer_Qot_StockBasic.cpp:226-320` `WaitForReady` —
258 /// 已订阅但 cache 未就绪时 handler 主动 Pull_SubData + 等 push 写 cache.
259 ///
260 /// 设计 trade-off:
261 /// - 用 `DashMap<String, Arc<Notify>>` 而非 `RwLock<HashMap>`: 高并发读
262 /// 写不锁全表
263 /// - key 编码 wait_kind 防 basic / orderbook 共用同一 Notify 互相错唤醒
264 /// - update path 调 `notify_waiters` (broadcast 给所有 awaiter) 然后从
265 /// map 中 remove (Arc 被 awaiter 持有, 自然释放)
266 pub cold_cache_waiters: DashMap<String, Arc<Notify>>,
267}
268
269impl QotCache {
270 pub fn new() -> Self {
271 Self {
272 basic_qot: DashMap::new(),
273 us_stock_overnight: DashMap::new(),
274 klines: DashMap::new(),
275 order_books: DashMap::new(),
276 tickers: DashMap::new(),
277 rt_data: DashMap::new(),
278 brokers: DashMap::new(),
279 // v1.4.106 codex 1140 F7: broker dict 由 CMD 18008 拉取后填充.
280 broker_dict: DashMap::new(),
281 // v1.4.110 codex Phase 3 Slice 6c: cold-cache wait waiters.
282 cold_cache_waiters: DashMap::new(),
283 }
284 }
285
286 /// v1.4.110 codex Phase 3 Slice 6c: 注册 cold-cache wait waiter.
287 ///
288 /// 返已存在或新建的 `Arc<Notify>`. handler 调:
289 /// 1. `register_cold_cache_waiter("91_BTCUSDT@b1007:basic")` 获 Notify
290 /// 2. 主动发 Pull_SubData CMD6824
291 /// 3. `tokio::time::timeout(Duration::from_secs(3), notify.notified())` 等
292 /// 4. 再 `get_basic_qot_broker(&key)` 读 cache (可能仍 None — 真 timeout)
293 ///
294 /// `wait_kind` 推荐: `"basic"` / `"orderbook"`. 不混 sub_type 数字防误唤.
295 pub fn register_cold_cache_waiter(&self, wait_key: &str) -> Arc<Notify> {
296 self.cold_cache_waiters
297 .entry(wait_key.to_string())
298 .or_insert_with(|| Arc::new(Notify::new()))
299 .clone()
300 }
301
302 /// v1.4.110 codex Phase 3 Slice 6c: 唤醒指定 cold-cache wait waiter.
303 ///
304 /// push parser update path 调 (`update_basic_qot` / `update_order_book` /
305 /// `_broker` 变种). 没 waiter → no-op. 有 waiter → `notify_waiters()`
306 /// broadcast 给所有 awaiter, 然后 remove (Arc 仍被 awaiter 持有, 自然释放).
307 pub fn notify_cold_cache_waiters(&self, wait_key: &str) {
308 if let Some((_, n)) = self.cold_cache_waiters.remove(wait_key) {
309 n.notify_waiters();
310 }
311 }
312
313 /// v1.4.110 codex audit Round3 #22: cold-cache wait timeout 后清 idle waiter.
314 ///
315 /// `wait_for_basic_cache` / `wait_for_order_book_cache` 3s timeout 仍 cache
316 /// miss 时调. 若 push 始终没来, `notify_cold_cache_waiters` 不会触发, entry
317 /// 会一直留在 `cold_cache_waiters` map (虽 bounded by distinct wait_key 数,
318 /// 仍是慢速 leak).
319 ///
320 /// **只删 caller 自己注册的那个 entry, 且无其他并发 awaiter 时才删**:
321 /// `remove_if` closure 在 entry lock 下原子检查两条:
322 /// 1. `Arc::ptr_eq(stored, caller_notify)` — stored 必须就是 caller 当初
323 /// `register_cold_cache_waiter` 拿到的同一 Arc. 防 race: caller timeout
324 /// 后到本调用之间, 若 push 触发 `notify_cold_cache_waiters` 删了旧 entry,
325 /// 另一个 `wait_for_*` 又 register 建了**新** entry (不同 Arc), `ptr_eq`
326 /// false → 不误删别人的新 entry.
327 /// 2. `Arc::strong_count(stored) <= 2` — DashMap stored Arc 1 + caller
328 /// 持有的 `caller_notify` 1. `> 2` 表示有其他 `wait_for_*` 仍 await 同
329 /// entry → 保留让它们能被 notify 唤醒.
330 ///
331 /// caller 约定: 必须把 `register_cold_cache_waiter` 返回的 `Arc<Notify>`
332 /// 原样传进来 (caller 全程持有未 drop).
333 pub fn cleanup_cold_cache_waiter_if_idle(&self, wait_key: &str, caller_notify: &Arc<Notify>) {
334 self.cold_cache_waiters.remove_if(wait_key, |_, stored| {
335 Arc::ptr_eq(stored, caller_notify) && Arc::strong_count(stored) <= 2
336 });
337 }
338
339 /// Update C++-style US overnight stock state (`stockID -> bool`).
340 ///
341 /// Ref: `NNData_Qot_USStockOvernight.cpp:21-35` and
342 /// `NNBiz_Qot_USStockState.cpp:180-190`.
343 pub fn set_us_stock_overnight_state(&self, stock_id: u64, is_overnight: bool) {
344 if stock_id == 0 {
345 return;
346 }
347 self.us_stock_overnight.insert(stock_id, is_overnight);
348 }
349
350 /// Query whether a US stock is currently in overnight trading.
351 ///
352 /// C++ cache miss returns false (`NNData_Qot_USStockOvernight.cpp:29-34`).
353 pub fn is_us_stock_overnight(&self, stock_id: u64) -> bool {
354 self.us_stock_overnight
355 .get(&stock_id)
356 .map(|v| *v)
357 .unwrap_or(false)
358 }
359
360 /// v1.4.106 codex 1140 F7 (P2): 查 broker_id → broker name (中文简称).
361 /// cache miss → None, 调用方决定 fallback 策略 (push parser 用
362 /// `Broker#{id}` 作 emergency fallback, 但同时 warn-log 提示 dict 未加载).
363 pub fn get_broker_name(&self, broker_id: i64) -> Option<String> {
364 self.broker_dict
365 .get(&broker_id)
366 .map(|info| info.name_zh_cn.clone())
367 }
368
369 /// v1.4.106 codex 1140 F7 (P2): 批量写入 broker dict (CMD 18008 解析后调).
370 pub fn install_broker_dict(&self, entries: Vec<(i64, CachedBrokerInfo)>) {
371 for (id, info) in entries {
372 self.broker_dict.insert(id, info);
373 }
374 }
375
376 /// 更新基本报价
377 pub fn update_basic_qot(&self, key: &str, qot: CachedBasicQot) {
378 self.basic_qot.insert(key.to_string(), qot);
379 // v1.4.110 codex Phase 3 Slice 6c: cold-cache wait notify.
380 self.notify_cold_cache_waiters(&basic_qot_wait_key(key));
381 }
382
383 /// 获取基本报价
384 pub fn get_basic_qot(&self, key: &str) -> Option<CachedBasicQot> {
385 self.basic_qot.get(key).map(|v| v.clone())
386 }
387
388 /// **v1.4.110 Phase 2 Slice 5**: 更新基本报价 (broker-aware).
389 ///
390 /// 用 `QotSecurityKey::cache_key()` 派生 String key. broker_id=None → 与
391 /// `update_basic_qot(public_sec_key, ...)` 等价; broker_id=Some(N) → 写
392 /// 独立 cache key `"market_code@b{N}"` (crypto multi-broker isolation).
393 pub fn update_basic_qot_broker(&self, key: &QotSecurityKey, qot: CachedBasicQot) {
394 let cache_key = key.cache_key();
395 self.basic_qot.insert(cache_key.clone(), qot);
396 // v1.4.110 codex Phase 3 Slice 6c: cold-cache wait notify.
397 self.notify_cold_cache_waiters(&basic_qot_wait_key(&cache_key));
398 }
399
400 /// **v1.4.110 Phase 2 Slice 5**: 获取基本报价 (broker-aware).
401 pub fn get_basic_qot_broker(&self, key: &QotSecurityKey) -> Option<CachedBasicQot> {
402 self.basic_qot.get(&key.cache_key()).map(|v| v.clone())
403 }
404
405 /// 构造 RT 分时 cache key (v1.4.106 codex 1140 F6).
406 ///
407 /// 之前 key 仅 sec_key, RTH/ETH/PRE/AFTER/OVERNIGHT 混桶. 现在
408 /// "sec_key:s{session}" 隔离. session 来自 push 解析的
409 /// `TimeSharingPlans.section_type[0]` (RequestSection enum):
410 /// 0=NORMAL/1=FULL/2=PREMARKET/3=AFTERHOURS/5=OVERNIGHT.
411 /// GetRT handler 对 C++ 的 Session_ETH/Session_ALL 读取语义做动态拼接,
412 /// 不依赖额外 aggregate cache 桶。
413 pub fn make_rt_key(sec_key: &str, session: i32) -> String {
414 format!("{sec_key}:s{session}")
415 }
416
417 /// **v1.4.110 Phase 2 Slice 5**: 构造 RT 分时 cache key (broker-aware).
418 ///
419 /// 用 `QotSecurityKey::cache_key()` 作 prefix. broker_id=None → 与
420 /// `make_rt_key(public_sec_key, session)` 等价; broker_id=Some(N) → 写
421 /// 独立 cache key `"market_code@b{N}:s{session}"`.
422 pub fn make_rt_key_broker(key: &QotSecurityKey, session: i32) -> String {
423 format!("{}:s{}", key.cache_key(), session)
424 }
425
426 /// **v1.4.110 Phase 2 Slice 5**: 更新 RT 分时 (broker-aware).
427 pub fn update_rt_data_broker(
428 &self,
429 key: &QotSecurityKey,
430 session: i32,
431 rt_data: Vec<CachedTimeShare>,
432 ) {
433 let cache_key = Self::make_rt_key_broker(key, session);
434 self.rt_data.insert(cache_key, rt_data);
435 }
436
437 /// **v1.4.110 Phase 2 Slice 5**: 获取 RT 分时 (broker-aware).
438 pub fn get_rt_data_broker(
439 &self,
440 key: &QotSecurityKey,
441 session: i32,
442 ) -> Option<Vec<CachedTimeShare>> {
443 let cache_key = Self::make_rt_key_broker(key, session);
444 self.rt_data.get(&cache_key).map(|v| v.clone())
445 }
446
447 /// 构造 K 线 cache key (v1.4.106 codex 1140 F3 4-tuple).
448 ///
449 /// 之前 key 仅 `(sec_key, kl_type)` 2-tuple, 同股票同 KLType 但前复权 vs
450 /// 后复权 / RTH vs ETH 数据互相覆盖. 对齐 C++ APIServer_Qot_KL.cpp:
451 /// `GetNewestKLByCount(stock_id, enRehabType, enKLType, num, session, ...)`
452 /// 用 4 维 key.
453 ///
454 /// - `rehab`: proto Qot_Common.RehabType (0=None, 1=Forward, 2=Backward),
455 /// 对齐 backend `FTCmdKline.ExrightType`. 同一股票同一 kl_type 不同 rehab
456 /// 走独立 cache, 不互相覆盖.
457 /// - `kl_type`: proto Qot_Common.KLType (1=1Min, 2=Day, ..., 11=Quarter).
458 /// - `session`: proto FTCmdKline.RequestSection (0=NORMAL, 1=FULL,
459 /// 2=PREMARKET, 3=AFTERHOURS). RTH/ETH 隔离, push 来自 backend 的
460 /// `point.section_type[0]` 决定写入桶, read 由 client subscription
461 /// session 决定 (尚无, 默认 NORMAL).
462 pub fn make_kline_key(sec_key: &str, rehab: i32, kl_type: i32, session: i32) -> String {
463 format!("{sec_key}:r{rehab}:k{kl_type}:s{session}")
464 }
465
466 /// 更新 K 线 (v1.4.106 codex 1140 F3: 4-tuple key, rehab + session 隔离).
467 pub fn update_klines(
468 &self,
469 sec_key: &str,
470 rehab: i32,
471 kl_type: i32,
472 session: i32,
473 klines: Vec<CachedKLine>,
474 ) {
475 let cache_key = Self::make_kline_key(sec_key, rehab, kl_type, session);
476 self.klines.insert(cache_key, klines);
477 }
478
479 /// 获取 K 线 (v1.4.106 codex 1140 F3: 4-tuple key, rehab + session 隔离).
480 pub fn get_klines(
481 &self,
482 sec_key: &str,
483 rehab: i32,
484 kl_type: i32,
485 session: i32,
486 ) -> Option<Vec<CachedKLine>> {
487 let cache_key = Self::make_kline_key(sec_key, rehab, kl_type, session);
488 self.klines.get(&cache_key).map(|v| v.clone())
489 }
490
491 /// **v1.4.110 Phase 2 Slice 5**: 更新 K 线 (broker-aware).
492 ///
493 /// 用 `QotSecurityKey::cache_key()` 作 prefix, broker_id=None 时退化到原行为.
494 /// composite 维度仍是 4-tuple `(rehab, kl_type, session)`, broker_id 是第 5
495 /// 维通过 `QotSecurityKey` 注入到 prefix.
496 pub fn update_klines_broker(
497 &self,
498 key: &QotSecurityKey,
499 rehab: i32,
500 kl_type: i32,
501 session: i32,
502 klines: Vec<CachedKLine>,
503 ) {
504 let cache_key = Self::make_kline_key(&key.cache_key(), rehab, kl_type, session);
505 self.klines.insert(cache_key, klines);
506 }
507
508 /// **v1.4.110 Phase 2 Slice 5**: 获取 K 线 (broker-aware).
509 pub fn get_klines_broker(
510 &self,
511 key: &QotSecurityKey,
512 rehab: i32,
513 kl_type: i32,
514 session: i32,
515 ) -> Option<Vec<CachedKLine>> {
516 let cache_key = Self::make_kline_key(&key.cache_key(), rehab, kl_type, session);
517 self.klines.get(&cache_key).map(|v| v.clone())
518 }
519
520 /// 更新摆盘
521 pub fn update_order_book(&self, key: &str, ob: CachedOrderBook) {
522 self.order_books.insert(key.to_string(), ob);
523 // v1.4.110 codex Phase 3 Slice 6c: cold-cache wait notify.
524 self.notify_cold_cache_waiters(&order_book_wait_key(key));
525 }
526
527 /// **v1.4.110 Phase 2 Slice 5**: 更新摆盘 (broker-aware).
528 pub fn update_order_book_broker(&self, key: &QotSecurityKey, ob: CachedOrderBook) {
529 let cache_key = key.cache_key();
530 self.order_books.insert(cache_key.clone(), ob);
531 // v1.4.110 codex Phase 3 Slice 6c: cold-cache wait notify.
532 self.notify_cold_cache_waiters(&order_book_wait_key(&cache_key));
533 }
534
535 /// **v1.4.110 Phase 2 Slice 5**: 获取摆盘 (broker-aware).
536 pub fn get_order_book_broker(&self, key: &QotSecurityKey) -> Option<CachedOrderBook> {
537 self.order_books.get(&key.cache_key()).map(|v| v.clone())
538 }
539
540 /// 追加逐笔(保留最近 1000 条)
541 pub fn append_tickers(&self, key: &str, new_tickers: Vec<CachedTicker>) {
542 let mut entry = self.tickers.entry(key.to_string()).or_default();
543 entry.extend(new_tickers);
544 if entry.len() > 1000 {
545 let drain_count = entry.len() - 1000;
546 entry.drain(..drain_count);
547 }
548 }
549
550 /// **v1.4.110 Phase 2 Slice 5**: 追加逐笔 (broker-aware).
551 pub fn append_tickers_broker(&self, key: &QotSecurityKey, new_tickers: Vec<CachedTicker>) {
552 let mut entry = self.tickers.entry(key.cache_key()).or_default();
553 entry.extend(new_tickers);
554 if entry.len() > 1000 {
555 let drain_count = entry.len() - 1000;
556 entry.drain(..drain_count);
557 }
558 }
559
560 /// **v1.4.110 Phase 2 Slice 5**: 获取逐笔 (broker-aware).
561 pub fn get_tickers_broker(&self, key: &QotSecurityKey) -> Option<Vec<CachedTicker>> {
562 self.tickers.get(&key.cache_key()).map(|v| v.clone())
563 }
564
565 /// 更新经纪队列
566 pub fn update_broker(&self, key: &str, broker: CachedBroker) {
567 self.brokers.insert(key.to_string(), broker);
568 }
569
570 /// 获取经纪队列
571 pub fn get_broker(&self, key: &str) -> Option<CachedBroker> {
572 self.brokers.get(key).map(|v| v.clone())
573 }
574
575 /// 清除指定股票的所有缓存
576 pub fn clear_security(&self, key: &str) {
577 self.basic_qot.remove(key);
578 self.order_books.remove(key);
579 self.tickers.remove(key);
580 self.brokers.remove(key);
581 // v1.4.106 codex 1140 F3: K 线 key 是 "sec_key:r{rehab}:k{kl_type}:s{session}"
582 // 4-tuple, 仍是 sec_key prefix 起头, retain prefix match 仍正确清所有维度.
583 let prefix = format!("{key}:");
584 self.klines.retain(|k, _| !k.starts_with(&prefix));
585 // v1.4.106 codex 1140 F6: rt_data key 也加 session 维度后变为
586 // "sec_key:s{session}", 同样 prefix-match 清 RTH/ETH/ALL 全部桶.
587 self.rt_data.retain(|k, _| !k.starts_with(&prefix));
588 }
589
590 /// **v1.4.110 Phase 2 Slice 5**: 清除指定股票的所有缓存 (broker-aware).
591 ///
592 /// 用 `QotSecurityKey::cache_key()` 派生 cache key 字符串. broker_id=None
593 /// → 与 `clear_security(public_sec_key)` 等价; broker_id=Some(N) → 只清
594 /// 该 broker 下的 cache (其他 broker 下同 stock_id 的 cache 保留).
595 pub fn clear_security_broker(&self, key: &QotSecurityKey) {
596 let cache_key = key.cache_key();
597 self.basic_qot.remove(&cache_key);
598 self.order_books.remove(&cache_key);
599 self.tickers.remove(&cache_key);
600 self.brokers.remove(&cache_key);
601 let prefix = format!("{cache_key}:");
602 self.klines.retain(|k, _| !k.starts_with(&prefix));
603 self.rt_data.retain(|k, _| !k.starts_with(&prefix));
604 }
605}
606
607impl Default for QotCache {
608 fn default() -> Self {
609 Self::new()
610 }
611}
612
613// =============================================================================
614// v1.4.110 codex QOT Phase 4 Slice 7: MergeMultipleOrderBookCaches pure fn.
615//
616// 对齐 C++ `QotRealTimeData::MergeMultipleOrderBookCaches` + `MergeOrderBookLists`
617// (QotRealTimeData.cpp:1440-1591):
618//
619// 1. **Per-side price-key merge** — 每档按 `price * 1e9` 取整作 key, 同 key
620// 累加 volume / order_count / detail_list (C++ MergedGear struct).
621// 2. **Side-aware sort** — bid 降序 (高价先), ask 升序 (低价先).
622// 3. **Top N truncate** — 仅当 `max_depth > 0` 时截断 (crypto LV2 用 40).
623// 4. **Timestamp**: bid/ask 各自取多 source 的最大 timestamp.
624// 5. **Skip price ≤ 0**: C++ "过滤空数据占位条目" (line 1466).
625// =============================================================================
626
627/// Per-side merge buffer (对齐 C++ `MergedGear`).
628#[derive(Debug, Clone)]
629struct MergedGear {
630 price: f64,
631 volume: i64,
632 order_count: i32,
633 detail_list: Vec<CachedOrderBookDetail>,
634 /// v1.4.110 codex audit Round4 R4-4: 高精度量累加器. 对齐 C++
635 /// `gear.dVolume += has_hpvolume() ? hpvolume() : volume()` —— 累加 de-scale
636 /// 后的真实量 (有 hp_volume 用之, 否则 fallback `volume`).
637 hp_volume: f64,
638}
639
640/// Per-side merge: take multiple lists, aggregate by price key, sort, optionally
641/// truncate to `max_depth`. 不会修改输入.
642///
643/// `is_bid=true` → sort 降序; `false` → 升序.
644fn merge_order_book_side(
645 sources: &[&[CachedOrderBookLevel]],
646 is_bid: bool,
647 max_depth: usize,
648) -> Vec<CachedOrderBookLevel> {
649 if sources.is_empty() {
650 return vec![];
651 }
652 // 价格精度放大到整数 (9 位小数, 对齐 C++ `PriceToKey`).
653 fn price_to_key(p: f64) -> i64 {
654 (p * 1e9 + 0.5) as i64
655 }
656
657 let mut merged: std::collections::HashMap<i64, MergedGear> = std::collections::HashMap::new();
658 for list in sources {
659 for level in list.iter() {
660 // C++: 过滤空数据占位条目 (price <= 0).
661 if level.price <= 0.0 {
662 continue;
663 }
664 let key = price_to_key(level.price);
665 let entry = merged.entry(key).or_insert(MergedGear {
666 price: level.price,
667 volume: 0,
668 order_count: 0,
669 detail_list: vec![],
670 hp_volume: 0.0,
671 });
672 entry.volume = entry.volume.saturating_add(level.volume);
673 entry.order_count = entry.order_count.saturating_add(level.order_count);
674 entry.detail_list.extend(level.detail_list.iter().cloned());
675 // v1.4.110 codex audit Round4 R4-4: 对齐 C++ merge gear.dVolume 累加 ——
676 // 有 hp_volume 用真实小数量, 否则 fallback `volume`.
677 entry.hp_volume += level.hp_volume.unwrap_or(level.volume as f64);
678 }
679 }
680
681 let mut keys: Vec<i64> = merged.keys().copied().collect();
682 if is_bid {
683 // 买盘: 价格从高到低 (降序).
684 keys.sort_by(|a, b| b.cmp(a));
685 } else {
686 // 卖盘: 价格从低到高 (升序).
687 keys.sort();
688 }
689
690 let mut out: Vec<CachedOrderBookLevel> = keys
691 .iter()
692 .map(|k| {
693 let g = &merged[k];
694 CachedOrderBookLevel {
695 price: g.price,
696 volume: g.volume,
697 order_count: g.order_count,
698 detail_list: g.detail_list.clone(),
699 // v1.4.110 codex audit Round4 R4-4: merge 输出 crypto 多交易所
700 // 合单盘口, 始终带累加后的 hp_volume (对齐 C++ set_hpvolume).
701 hp_volume: Some(g.hp_volume),
702 }
703 })
704 .collect();
705
706 // 深度截断: max_depth > 0 时生效.
707 if max_depth > 0 && out.len() > max_depth {
708 out.truncate(max_depth);
709 }
710 out
711}
712
713/// Merge N exchange-level orderbook caches into one broker-level cache.
714///
715/// 对齐 C++ `QotRealTimeData::MergeMultipleOrderBookCaches(vSources, pbMerged,
716/// nMaxDepth=40)` (QotRealTimeData.cpp:1534-1591):
717/// - 第一个 source 作为基础 (拷 svr_recv_time 等基本字段, 但 bid/ask list 重算)
718/// - 按 price key 聚合 bid/ask, 同 price 累加 volume + order_count + detail
719/// - bid 高价优先, ask 低价优先, 截断到 `max_depth`
720/// - bid/ask timestamp 取多 source 最大值
721///
722/// `max_depth = 0` → 不截断 (C++ 行为, line 1579-1583 `if nMaxDepth > 0`).
723/// `max_depth = 40` → crypto LV2 broker-level cache 用 (line 1014).
724pub fn merge_multiple_order_book_caches(
725 sources: &[CachedOrderBook],
726 max_depth: usize,
727) -> CachedOrderBook {
728 if sources.is_empty() {
729 return CachedOrderBook::default();
730 }
731
732 // v1.4.110 codex audit Round2 P2 #17: 收 slice-of-slice 而非 deep clone.
733 // merge_order_book_side 只读 levels (price/volume/order_count/detail_list),
734 // 不消费 Vec — 之前 `.bid_list.clone()` 每 source deep clone N×M levels
735 // (5 exchange × 40 level = 200 clone per LV2 push), crypto LV2 push 1Hz+
736 // 频率下浪费. 改 `.as_slice()` 只收引用, 零 Level clone.
737 let bid_sources: Vec<&[CachedOrderBookLevel]> =
738 sources.iter().map(|s| s.bid_list.as_slice()).collect();
739 let ask_sources: Vec<&[CachedOrderBookLevel]> =
740 sources.iter().map(|s| s.ask_list.as_slice()).collect();
741
742 let bid_merged = merge_order_book_side(&bid_sources, true, max_depth);
743 let ask_merged = merge_order_book_side(&ask_sources, false, max_depth);
744
745 // Timestamp 取所有 source 最大 (对齐 C++ line 1572-1575).
746 let max_bid_ts = sources
747 .iter()
748 .filter_map(|s| s.svr_recv_time_bid_timestamp)
749 .fold(None::<f64>, |acc, t| {
750 Some(acc.map(|a| a.max(t)).unwrap_or(t))
751 });
752 let max_ask_ts = sources
753 .iter()
754 .filter_map(|s| s.svr_recv_time_ask_timestamp)
755 .fold(None::<f64>, |acc, t| {
756 Some(acc.map(|a| a.max(t)).unwrap_or(t))
757 });
758
759 CachedOrderBook {
760 bid_list: bid_merged,
761 ask_list: ask_merged,
762 svr_recv_time_bid: None,
763 svr_recv_time_bid_timestamp: max_bid_ts,
764 svr_recv_time_ask: None,
765 svr_recv_time_ask_timestamp: max_ask_ts,
766 }
767}
768
769#[cfg(test)]
770mod merge_tests;
771
772#[cfg(test)]
773mod tests;