Skip to main content

futu_cache/
trd_cache.rs

1// 交易数据缓存
2
3mod types;
4
5#[cfg(test)]
6mod regression_tests;
7
8pub use types::*;
9
10use dashmap::DashMap;
11use futu_core::account_locator;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14
15/// 交易数据缓存
16pub struct TrdCache {
17    /// C++ `NNData_Trd_AccList::m_mapUserAccList` equivalent.
18    ///
19    /// This is the authoritative internal account index used by request
20    /// validation, broker routing, and funds/positions/order queries. It may
21    /// contain universal parent accounts that are intentionally not exposed by
22    /// public `Trd_GetAccList`.
23    pub accounts: DashMap<AccKey, CachedTrdAcc>,
24    /// C++ `NNData_Trd_AccList::m_mapIDRelation` equivalent:
25    /// `universal_or_self_acc_id -> public sub account ids`.
26    ///
27    /// `Trd_GetAccList` uses this relation via `get_accounts()` to expose the
28    /// same public projection as C++ `GetAllSubAccList`, while `lookup_account`
29    /// and direct `accounts.get()` still see the full internal map.
30    pub account_relations: DashMap<AccKey, Vec<AccKey>>,
31    /// Public account ids derived from `account_relations`.
32    pub public_account_ids: DashMap<AccKey, ()>,
33    public_projection_ready: AtomicBool,
34    /// 资金: `FundsCacheKey { acc_id, asset_category, currency }` → funds.
35    /// **v1.4.106 Finding A**: 之前 `DashMap<AccKey, CachedFunds>` 一 acc 一 snapshot,
36    /// Universal/Futures 多币种场景被覆盖 — 改 currency-aware key 对齐 C++
37    /// `m_mapAccFund: NN_AssetKey -> NN_TrdCurrency -> Ndt_Trd_AccFund`.
38    pub funds: DashMap<FundsCacheKey, CachedFunds>,
39    /// 持仓: `PositionsCacheKey { acc_id, asset_category }` → Vec<position>.
40    /// Category 0 preserves the legacy single-bucket path; JP margin and JP
41    /// derivative requests use scoped categories to avoid cross-bucket leakage.
42    pub positions: DashMap<PositionsCacheKey, Vec<CachedPosition>>,
43    /// 当日订单: acc_id → Vec<order>
44    pub orders: DashMap<AccKey, Vec<CachedOrder>>,
45    /// 交易 cipher: acc_id → cipher bytes (解锁后获得)
46    pub ciphers: DashMap<AccKey, Vec<u8>>,
47    /// v1.4.48 #1: 订单 broker 映射(order_id_ex → broker_id_used)
48    ///
49    /// 起源:v1.4.47 P0.1 修了 PlaceOrder 按 `sec_market` 选 broker,但 ModifyOrder /
50    /// CancelOrder 仍按 `account.security_firm` 选 broker,导致"在 broker 1007 (US)
51    /// 下的单,cancel 去 broker 1019 (CA) 拒" 的 cross-broker 故障。
52    ///
53    /// 修法:PlaceOrder 成功后把 `(order_id_ex, broker_id_used)` 缓存到这里。
54    /// ModifyOrder / CancelOrder 拿到 `c2s.order_id_ex` 后先查 broker_id;
55    /// 命中 → 路由到同 broker;未命中 → fallback account.firm 路由。
56    ///
57    /// 注:cipher 按 sub-account `acc_id` 存储(`ciphers` map)。对照 C++
58    /// `NNData_Trd_AccList::m_mapAccCipher`:不同 broker 的账户天然有不同
59    /// `nAccID`,存储已隔离(v1.4.49 清理了 v1.4.48 `cipher_brokers` workaround,
60    /// 该字段在 v1.4.48 #11 routing 对齐 C++ 后成 dead code)。
61    pub order_brokers: DashMap<String, u32>,
62
63    /// v1.4.73 A2 BUG-008 fix: per-account cipher state version counter。
64    ///
65    /// 外部 tester (v1.4.71) AI 报告 5 步 repro:
66    /// ```text
67    /// Step 1: unlock pwd       → cache EXECUTED (idem_key=unlock-xxx)
68    /// Step 2: 同 body          → cache HIT (正常幂等)
69    /// Step 3: EMPTY {} LOCK    → v1.4.39 cipher 清
70    /// Step 4: 同 body          → cache HIT 返 stale 成功! (真 bug)
71    /// Step 5: place-order      → -401 "交易未解锁"
72    /// ```
73    ///
74    /// v1.4.72 Option C(空 body 不写 cache)只防 step 3 污染,未修 step 4 stale。
75    ///
76    /// Option A 真修:unlock `idem_key` 构造时纳入**当前 cipher_state_version**,
77    /// lock 清 cipher 时 `fetch_add(1, SeqCst)` → version 递增 → step 4 同 body
78    /// 得 idem_key **不同**(version=0 → version=1)→ cache miss → 真执行 unlock
79    /// 或 backend 校验失败返清晰错误。
80    ///
81    /// 为啥 SeqCst:unlock_trade handler 可能并发,确保 version 递增对所有
82    /// 后续 idem_key 构造 visible(`ciphers.remove()` + `fetch_add()` 顺序严格)。
83    ///
84    /// 注:version 不持久化 —— daemon restart 重新从 0 开始,等效于"新 cache",
85    /// 之前的 idem entries 也被 cache TTL 清光,零冲突。
86    pub cipher_state_versions: DashMap<AccKey, Arc<std::sync::atomic::AtomicU64>>,
87
88    /// v1.4.106 codex 0226 F1+F2: pending OrderConfirm context per
89    /// `(acc_id, ftapi_order_id)`.
90    ///
91    /// PlaceOrder ack 响应里若 `OrderNewRsp.action.type == ORDER_CONFIRM=5` 且
92    /// `action.order_confirm.is_some()`, daemon **必须** capture
93    /// `CltActionOrderConfirm` 字段, 用于后续 `Trd_ReconfirmOrder` 处理时构造
94    /// backend `OrderConfirmReq` (cmd 4728).
95    ///
96    /// **生命周期**:
97    /// - PlaceOrder ack 路径: capture 后 `insert(key, ctx)`
98    /// - ReconfirmOrder handler: lookup → 构造 backend req → 收到 `OrderConfirmRsp`
99    ///   `result==0` 后 `remove(key)` (一次性消费, 防止重复 confirm)
100    /// - TTL: 5min (`ORDER_CONFIRM_CONTEXT_TTL_MS`), `now - inserted_at_ms` 检查;
101    ///   stale entry handler 拒绝 + GC 清理
102    /// - daemon restart 全清 (内存 cache, backend 重新发 PlaceOrder 即可获新 context)
103    ///
104    /// 详见 `OrderConfirmContext` doc.
105    pub pending_order_confirms: DashMap<OrderConfirmKey, OrderConfirmContext>,
106}
107
108impl TrdCache {
109    pub fn get_cipher(&self, acc_id: u64) -> Option<Vec<u8>> {
110        self.ciphers.get(&acc_id).map(|v| v.clone())
111    }
112
113    pub fn set_cipher(&self, acc_id: u64, cipher: Vec<u8>) {
114        self.ciphers.insert(acc_id, cipher);
115    }
116
117    /// v1.4.73 A2 BUG-008 fix: 读当前账户的 cipher state version(用于 unlock idem_key)。
118    ///
119    /// 首次访问 acc_id 会初始化为 0。后续每次 lock 清 cipher 会 `fetch_add(1)`。
120    /// `idem_key` 构造时把这个 version 纳入 hash → cipher 清后 version 递增 →
121    /// 同 body 的 idem_key 不同 → cache miss → 真执行 unlock(或 backend 真校验)。
122    pub fn get_cipher_state_version(&self, acc_id: u64) -> u64 {
123        let entry = self
124            .cipher_state_versions
125            .entry(acc_id)
126            .or_insert_with(|| Arc::new(AtomicU64::new(0)));
127        entry.load(Ordering::SeqCst)
128    }
129
130    /// v1.4.73 A2 BUG-008 fix: lock 清 cipher 时调,递增 version → 让下次 unlock
131    /// 同 body 得 cache miss。
132    ///
133    /// 返回 new version(递增后值),便于调用方 log。
134    #[must_use]
135    pub fn bump_cipher_state_version(&self, acc_id: u64) -> u64 {
136        let entry = self
137            .cipher_state_versions
138            .entry(acc_id)
139            .or_insert_with(|| Arc::new(AtomicU64::new(0)));
140        entry.fetch_add(1, Ordering::SeqCst) + 1
141    }
142
143    /// v1.4.106 codex 0226 F1+F2: PlaceOrder 解析到 `OrderNewRsp.action.order_confirm`
144    /// 时调用, 保存上下文用于后续 `Trd_ReconfirmOrder` 构造 backend `OrderConfirmReq`.
145    ///
146    /// `now_ms` 由 caller 传入 (便于单测注入固定时钟); 真实路径用
147    /// `SystemTime::now()`.
148    pub fn store_pending_order_confirm(
149        &self,
150        acc_id: u64,
151        ftapi_order_id: u64,
152        mut ctx: OrderConfirmContext,
153        now_ms: u64,
154    ) {
155        ctx.inserted_at_ms = now_ms;
156        let key = OrderConfirmKey::new(acc_id, ftapi_order_id);
157        self.pending_order_confirms.insert(key, ctx);
158    }
159
160    /// v1.4.106 codex 0226 F1+F2: ReconfirmOrder handler 入口 lookup, 取出
161    /// `(acc_id, ftapi_order_id)` 对应 OrderConfirmContext.
162    ///
163    /// 返 `None`: cache miss (PlaceOrder 没存 / TTL 过期 / 已被消费). caller 必须
164    /// 早 reject loud, **不**允许 silent fallback (避免反模式 D / silent-success).
165    ///
166    /// `now_ms` 检查 TTL: `now - ctx.inserted_at_ms > ORDER_CONFIRM_CONTEXT_TTL_MS`
167    /// 视为 stale → return None + remove (proactive GC).
168    pub fn get_pending_order_confirm(
169        &self,
170        acc_id: u64,
171        ftapi_order_id: u64,
172        now_ms: u64,
173    ) -> Option<OrderConfirmContext> {
174        let key = OrderConfirmKey::new(acc_id, ftapi_order_id);
175        let ctx = self.pending_order_confirms.get(&key)?.value().clone();
176        if now_ms.saturating_sub(ctx.inserted_at_ms) > ORDER_CONFIRM_CONTEXT_TTL_MS {
177            // Stale → proactive GC
178            self.pending_order_confirms.remove(&key);
179            return None;
180        }
181        Some(ctx)
182    }
183
184    /// v1.4.106 codex 0226 F1+F2: ReconfirmOrder backend 成功 (`OrderConfirmRsp.result==0`)
185    /// 后调用, 从 cache 删除 (一次性消费, 防重复 confirm).
186    ///
187    /// 返 `true` 表示真有删除发生; `false` = 已被其他路径消费 / 过期 GC.
188    pub fn remove_pending_order_confirm(&self, acc_id: u64, ftapi_order_id: u64) -> bool {
189        let key = OrderConfirmKey::new(acc_id, ftapi_order_id);
190        self.pending_order_confirms.remove(&key).is_some()
191    }
192
193    /// v1.4.106 codex 0226 F1+F2: GC stale OrderConfirmContext entries.
194    ///
195    /// 用于定时清理 (push dispatcher 收到 ORDER 类 push 时顺便扫一次), 防止
196    /// stale ctx 累积. 返回清理掉的条目数.
197    pub fn purge_stale_order_confirms(&self, now_ms: u64) -> usize {
198        let mut purged = Vec::new();
199        for entry in self.pending_order_confirms.iter() {
200            if now_ms.saturating_sub(entry.value().inserted_at_ms) > ORDER_CONFIRM_CONTEXT_TTL_MS {
201                purged.push(*entry.key());
202            }
203        }
204        let n = purged.len();
205        for key in purged {
206            self.pending_order_confirms.remove(&key);
207        }
208        n
209    }
210
211    /// v1.4.106 codex 0554 F1 [P1]: 原子性清空所有 cipher + 同步 bump 各账户的
212    /// `cipher_state_version`。
213    ///
214    /// 起源:`/api/admin/reload` 之前的实现是
215    /// `bridge.caches.trd_cache.ciphers.clear()` 直接动 `DashMap`,但 **没** bump
216    /// `cipher_state_version`。这与 v1.4.73 A2 BUG-008 修复的语义不一致:
217    /// lock-trade 路径里 `ciphers.remove()` 之后必跟 `bump_cipher_state_version()`,
218    /// 防止旧 idempotency cache entry(unlock idem_key 含 cipher_state_version
219    /// hash)在 cipher 被清后仍命中返 stale "cached success",导致
220    /// step 4 / step 5 silent regression。
221    ///
222    /// admin/reload 漏 bump 的具体后果:
223    /// - reload 清光 ciphers
224    /// - 客户端再调 unlock-trade 同 body → idem_key 命中(cipher_state_version
225    ///   未变)→ 返 stale 成功 → cipher cache 仍空 → place-order `-401` 解锁失败
226    ///
227    /// 本 helper 把两步打包,**禁止外部直接 `cache.ciphers.clear()`**(那条
228    /// 路径 silent skip bump,复活 BUG-008)。所有清 cipher 的 control-plane
229    /// 路径(reload / admin / 未来若加更多)必须走本 helper。
230    ///
231    /// 返回 `(cleared_count, bumped_versions)`:
232    /// - `cleared_count`:清掉的 cipher 数(即 reload 前已解锁账户数)
233    /// - `bumped_versions`:每个被清 acc_id 的 (acc_id, new_version) 列表,
234    ///   便于 log + 客户端调试 idem_key 失效原因
235    ///
236    /// 与 lock-trade 路径的 bump 行为一致:仅对**实际清掉 cipher** 的 acc_id
237    /// 递增 version;从未解锁的账户 cipher_state_version 保持不变。
238    ///
239    /// 并发:`DashMap::iter()` 期间其他线程的 `ciphers.remove()` /
240    /// `ciphers.insert()` 可能 race,但本 helper 用 `remove(&key)` 逐个清,
241    /// 拿到 `Some(_)` 才 bump,保证 `version` 单调递增 + 与 `ciphers` 实际
242    /// 状态一致。`SeqCst` 保证 bump 对所有后续 `get_cipher_state_version()`
243    /// 立即可见。
244    #[must_use]
245    pub fn clear_all_ciphers_and_bump_versions(&self) -> (usize, Vec<(u64, u64)>) {
246        // 先收集 acc_ids(避免持 DashMap iter guard 时 mutate map → deadlock)
247        let acc_ids: Vec<u64> = self.ciphers.iter().map(|e| *e.key()).collect();
248        let mut cleared_count = 0usize;
249        let mut bumped_versions: Vec<(u64, u64)> = Vec::with_capacity(acc_ids.len());
250        for acc_id in acc_ids {
251            if self.ciphers.remove(&acc_id).is_some() {
252                cleared_count += 1;
253                let new_ver = self.bump_cipher_state_version(acc_id);
254                bumped_versions.push((acc_id, new_ver));
255            }
256        }
257        (cleared_count, bumped_versions)
258    }
259
260    pub fn new() -> Self {
261        Self {
262            accounts: DashMap::new(),
263            account_relations: DashMap::new(),
264            public_account_ids: DashMap::new(),
265            public_projection_ready: AtomicBool::new(false),
266            funds: DashMap::new(),
267            positions: DashMap::new(),
268            orders: DashMap::new(),
269            ciphers: DashMap::new(),
270            order_brokers: DashMap::new(),
271            cipher_state_versions: DashMap::new(),
272            // v1.4.106 codex 0226 F1+F2: pending OrderConfirm context cache
273            pending_order_confirms: DashMap::new(),
274        }
275    }
276
277    pub fn set_accounts(&self, accounts: Vec<CachedTrdAcc>) {
278        let relations = accounts
279            .iter()
280            .map(|acc| (acc.acc_id, vec![acc.acc_id]))
281            .collect();
282        self.set_accounts_with_relations(accounts, relations);
283    }
284
285    /// Atomically replace the internal account map and the public projection.
286    ///
287    /// `relations` mirrors C++ `m_mapIDRelation`: standalone accounts map to
288    /// themselves, while universal parents map to their public sub accounts.
289    /// This lets `GetAccList` expose only C++ `GetAllSubAccList` output without
290    /// losing hidden parent accounts needed by `GetAccItem`-style request paths.
291    pub fn set_accounts_with_relations(
292        &self,
293        accounts: Vec<CachedTrdAcc>,
294        relations: Vec<(AccKey, Vec<AccKey>)>,
295    ) {
296        self.accounts.clear();
297        self.account_relations.clear();
298        self.public_account_ids.clear();
299        for (idx, mut acc) in accounts.into_iter().enumerate() {
300            acc.order_index = idx;
301            self.accounts.insert(acc.acc_id, acc);
302        }
303        for (parent_id, sub_ids) in relations {
304            for sub_id in &sub_ids {
305                self.public_account_ids.insert(*sub_id, ());
306            }
307            self.account_relations.insert(parent_id, sub_ids);
308        }
309        self.public_projection_ready.store(true, Ordering::SeqCst);
310    }
311
312    #[must_use]
313    pub fn get_accounts(&self) -> Vec<CachedTrdAcc> {
314        if self.public_projection_ready.load(Ordering::SeqCst) {
315            self.public_account_ids
316                .iter()
317                .filter_map(|e| self.accounts.get(e.key()).map(|acc| acc.value().clone()))
318                .collect()
319        } else {
320            // Backward-compatible test path: many existing tests insert directly
321            // into `cache.accounts`. Until production calls set_accounts*, expose
322            // all entries, matching the old single-map behavior.
323            self.accounts.iter().map(|e| e.value().clone()).collect()
324        }
325    }
326
327    /// v1.4.106 codex 0932 F2 [P1]: 单 acc_id O(1) 查询 (DashMap key 直查).
328    ///
329    /// 用途: push_builder 构造 Trd_UpdateOrder / Trd_UpdateOrderFill header
330    /// 之前 resolve `trd_env` + `trd_market`. 对齐 C++
331    /// `INNData_Trd_AllAccList::GetAccEnv(nAccID)` / `GetAccMkt(nAccID)`.
332    ///
333    /// 返 `None` = cache miss (账户不在交易 cache 中). caller **必须 loud
334    /// return** 不 fallback (sentinel 0 让 client filter reject =
335    /// silent-success 反模式).
336    #[must_use]
337    pub fn lookup_account(&self, acc_id: u64) -> Option<CachedTrdAcc> {
338        self.accounts.get(&acc_id).map(|e| e.value().clone())
339    }
340
341    /// v1.4.103 (B10): card_num → acc_id resolution helper.
342    ///
343    /// 接受输入:
344    /// - **16 位完整 card_num** (`"1001100100800000"`): 完全匹配 `card_num` 字段.
345    /// - **4 位末尾 suffix** (`"7680"`): 匹配 `card_num` 末 4 位 (App 显示格式).
346    ///
347    /// 返 `Vec<u64>` (matching acc_ids):
348    /// - 0 个 → cache 中无 match (caller 决定 warn / abort);
349    /// - 1 个 → unique resolution;
350    /// - >= 2 个 → ambiguous (caller 必须 reject + log 候选, 不能 silent 接受).
351    ///
352    /// **空字符串 / 非纯数字 / 长度非 4 / 非 16** → 返 empty Vec (不 panic).
353    /// 这是为了让 caller 输入校验 + resolution 双责权: 调用方应该已经校验过格式.
354    #[must_use]
355    pub fn find_acc_ids_by_card_num(&self, input: &str) -> Vec<u64> {
356        // v1.4.103 codex F2.3 (P2): 同时匹配 `card_num` 和 `uni_card_num`
357        // (综合账户卡号). 用户故事 B10 描述 App 显示的`保证金综合账户(7680)`末
358        // 4 位 — 综合账户的卡号通常 in `uni_card_num`, 普通账户在 `card_num`.
359        // 单独只看 `card_num` 会让综合账户用户写 `--allowed-card-nums 7680`
360        // 时所有 resolve 都失败 → fail-closed sentinel reject (虽然安全, 但
361        // UX 失效, 用户必须 fall back 用 acc_id). 双匹配后 fail-closed
362        // sentinel 只在真没账户 match 时触发.
363        let accounts = self.get_accounts();
364        account_locator::match_card_num_in_records(&accounts, input, None).unwrap_or_default()
365    }
366
367    /// **v1.4.106 Finding A** (legacy compat): 不带 currency 维度的 update.
368    /// 用 `FundsCacheKey::legacy(acc_id)` 作 key. 适用于:
369    /// - 现有 caller 还没改 signature 的 (背景: backend push 不一定知 currency)
370    /// - SingleCurrency / sim / Crypto / Forex 账户 (本来就单币种)
371    ///
372    /// **新 caller 应优先用 [`Self::update_funds_per_currency`]** 显式标
373    /// currency 维度, 让 Universal/Futures 账户能存独立 snapshot per currency.
374    pub fn update_funds(&self, acc_id: u64, funds: CachedFunds) {
375        self.funds.insert(FundsCacheKey::legacy(acc_id), funds);
376    }
377
378    /// **v1.4.106 Finding A** (preferred for Universal/Futures): 带 currency
379    /// 维度的 update. backend push 时若知 funds 的实际 currency (从 `f.currency`
380    /// 字段或 push context 派生), 应该用这个 helper 让多币种 snapshot 不互相覆盖.
381    ///
382    /// 对齐 C++ `INNData_Trd_Acc::SetAccFund(stKey, enCurrency, ...)`.
383    pub fn update_funds_per_currency(
384        &self,
385        acc_id: u64,
386        currency: Option<i32>,
387        funds: CachedFunds,
388    ) {
389        let key = match currency {
390            Some(c) => FundsCacheKey::per_currency(acc_id, c),
391            None => FundsCacheKey::legacy(acc_id),
392        };
393        self.funds.insert(key, funds);
394    }
395
396    /// Currency + asset-category aware funds update.
397    ///
398    /// JP derivative accounts use `asset_category` as part of the C++ asset key.
399    /// Non-JP/legacy callers should pass `asset_category=0`, which preserves the
400    /// existing legacy/per-currency key shape.
401    pub fn update_funds_scoped(
402        &self,
403        acc_id: u64,
404        asset_category: i32,
405        currency: Option<i32>,
406        funds: CachedFunds,
407    ) {
408        let key = if asset_category != 0 {
409            FundsCacheKey::full(acc_id, asset_category, currency)
410        } else {
411            match currency {
412                Some(c) => FundsCacheKey::per_currency(acc_id, c),
413                None => FundsCacheKey::legacy(acc_id),
414            }
415        };
416        self.funds.insert(key, funds);
417    }
418
419    /// Update the requested funds bucket and also mirror the returned backend
420    /// currency bucket when it is known.
421    ///
422    /// C++ stores `Ndt_Trd_AccFund` under `accFund.enCurrency`
423    /// (`INNData_Trd_Acc.cpp::SetAccFund`). A Rust caller may request CMD3020
424    /// with `currency=None` because the daemon derived the backend default, but
425    /// REST/CLI later read the same account through an explicit effective
426    /// currency bucket. Mirroring prevents an older per-currency snapshot from
427    /// masking a fresher default refresh.
428    pub fn update_funds_scoped_with_returned_currency(
429        &self,
430        acc_id: u64,
431        asset_category: i32,
432        requested_currency: Option<i32>,
433        funds: CachedFunds,
434    ) {
435        let returned_currency = funds.currency;
436        self.update_funds_scoped(acc_id, asset_category, requested_currency, funds.clone());
437
438        if let Some(returned_currency) = returned_currency
439            && requested_currency != Some(returned_currency)
440        {
441            self.update_funds_scoped(acc_id, asset_category, Some(returned_currency), funds);
442        }
443    }
444
445    /// **v1.4.106 Finding A**: cache lookup with C++-equivalent fallback.
446    ///
447    /// 对齐 C++ `INNData_Trd_Acc::GetAccFund(stKey, enCurrency, pAccFund)`:
448    /// 先试 requested currency, 找不到则 fallback 到 latest/first available
449    /// currency, **返 false** (caller 应看 boolean 决定是否 trust).
450    ///
451    /// 输入 `currency`:
452    /// - `Some(c)`: Universal/Futures 路径, 优先 match per-currency snapshot
453    /// - `None`: SingleCurrency 路径, 直接 match `legacy(acc_id)` snapshot
454    ///
455    /// 输出 `(funds, currency_match)`:
456    /// - `(Some(funds), true)`: 精确命中 requested currency snapshot
457    /// - `(Some(funds), false)`: 命中 fallback (legacy 或不同 currency 的
458    ///   snapshot — caller 应**不要 silent trust**, 至少 log warn 或 surface
459    ///   currency mismatch)
460    /// - `(None, _)`: 完全 cache miss
461    #[must_use]
462    pub fn get_funds(&self, acc_id: u64, currency: Option<i32>) -> (Option<CachedFunds>, bool) {
463        self.get_funds_scoped(acc_id, 0, currency)
464    }
465
466    /// Funds lookup using the same `(acc_id, asset_category, currency)` dimensions
467    /// as [`Self::update_funds_scoped`].
468    ///
469    /// For `asset_category != 0` we require an exact scoped hit. Falling back to a
470    /// legacy or another asset-category snapshot would mix JP derivative asset
471    /// buckets and silently return the wrong funds.
472    #[must_use]
473    pub fn get_funds_scoped(
474        &self,
475        acc_id: u64,
476        asset_category: i32,
477        currency: Option<i32>,
478    ) -> (Option<CachedFunds>, bool) {
479        // Step 1: 精确 match
480        let exact_key = if asset_category != 0 {
481            FundsCacheKey::full(acc_id, asset_category, currency)
482        } else {
483            match currency {
484                Some(c) => FundsCacheKey::per_currency(acc_id, c),
485                None => FundsCacheKey::legacy(acc_id),
486            }
487        };
488        if let Some(f) = self.funds.get(&exact_key) {
489            return (Some(f.value().clone()), true);
490        }
491        if asset_category != 0 {
492            return (None, false);
493        }
494        // Step 2: fallback to legacy(acc_id) — backend 不带 currency context
495        // push 时落进 legacy key
496        if currency.is_some()
497            && let Some(f) = self.funds.get(&FundsCacheKey::legacy(acc_id))
498        {
499            return (Some(f.value().clone()), false);
500        }
501        // Step 3: fallback to ANY snapshot for this acc_id (latest available
502        // currency, 等价于 C++ "first available")
503        for entry in self.funds.iter() {
504            if entry.key().acc_id == acc_id {
505                return (Some(entry.value().clone()), false);
506            }
507        }
508        (None, false)
509    }
510
511    pub fn update_positions(&self, acc_id: u64, positions: Vec<CachedPosition>) {
512        self.update_positions_scoped(acc_id, 0, positions);
513    }
514
515    pub fn update_positions_scoped(
516        &self,
517        acc_id: u64,
518        asset_category: i32,
519        positions: Vec<CachedPosition>,
520    ) {
521        let key = if asset_category != 0 {
522            PositionsCacheKey::scoped(acc_id, asset_category)
523        } else {
524            PositionsCacheKey::legacy(acc_id)
525        };
526        self.positions.insert(key, positions);
527    }
528
529    #[must_use]
530    pub fn get_positions_scoped(
531        &self,
532        acc_id: u64,
533        asset_category: i32,
534    ) -> Option<Vec<CachedPosition>> {
535        let key = if asset_category != 0 {
536            PositionsCacheKey::scoped(acc_id, asset_category)
537        } else {
538            PositionsCacheKey::legacy(acc_id)
539        };
540        self.positions.get(&key).map(|p| p.value().clone())
541    }
542
543    #[must_use]
544    pub fn has_positions_scoped(&self, acc_id: u64, asset_category: i32) -> bool {
545        let key = if asset_category != 0 {
546            PositionsCacheKey::scoped(acc_id, asset_category)
547        } else {
548            PositionsCacheKey::legacy(acc_id)
549        };
550        self.positions.contains_key(&key)
551    }
552
553    pub fn update_orders(&self, acc_id: u64, orders: Vec<CachedOrder>) {
554        self.orders.insert(acc_id, orders);
555    }
556
557    /// 更新单个订单(推送场景)
558    pub fn upsert_order(&self, acc_id: u64, order: CachedOrder) {
559        let mut entry = self.orders.entry(acc_id).or_default();
560        if let Some(existing) = entry.iter_mut().find(|o| o.order_id == order.order_id) {
561            *existing = order;
562        } else {
563            entry.push(order);
564        }
565    }
566
567    /// v1.4.106 codex 0219 Finding 1: resolve cached order context for trade-write
568    /// (modify / cancel) handlers.
569    ///
570    /// 对齐 C++ `APIServer_Trd_ModifyOrder.cpp:251-256` + `:270-271`:
571    /// - 优先用 client 传的 `orderIDEx` (= backend `szOrderID`).
572    /// - 否则用 `(acc_id, order_id_hash)` 从 cache 找原 order, 取它的
573    ///   `szOrderID` + `version` + `exchange*` 字段构造 backend req.
574    ///
575    /// **fail-closed 语义**: cache miss → 返 `Err`, caller 把错误透传到
576    /// FTAPI client 让用户先刷新 `/api/orders` 或传 orderIDEx. 不允许 silent
577    /// fall-through 到 `order_id.to_string()` (= 把 hash 当 backend id, 见
578    /// pitfall #45 silent-success).
579    ///
580    /// **入参**:
581    /// - `acc_id`: FTAPI `c2s.header.acc_id`.
582    /// - `order_id`: FTAPI `c2s.order_id` (hash). `0` 视为 caller 没传, 仅靠
583    ///   `order_id_ex` 路径生效.
584    /// - `order_id_ex`: FTAPI `c2s.order_id_ex` (= backend szOrderID, 优先).
585    ///
586    /// **返回**:
587    /// - `Ok(snap)`: 命中 cache, 字段已 populated.
588    /// - `Err(ResolveOrderError::CacheMiss)`: cache 没存这个 (acc_id, order_id),
589    ///   caller 应返清晰提示 "先刷新 /api/orders 或传 orderIDEx".
590    /// - `Err(ResolveOrderError::MissingBackendId)`: cache 命中但 backend_order_id
591    ///   字段空 (= cache entry 来自老版本, 没存 szOrderID), caller 应返同样提示.
592    /// - `Err(ResolveOrderError::InvalidInput)`: 同时缺 order_id 和 order_id_ex.
593    pub fn find_order_for_trade_write(
594        &self,
595        acc_id: u64,
596        order_id: u64,
597        order_id_ex: Option<&str>,
598    ) -> Result<CachedOrderSnapshot, ResolveOrderError> {
599        // Case 1: orderIDEx 传了 — 按 order_id_ex 在 cache 找完整 order
600        // (匹配 backend `szOrderID`).
601        //
602        // **v1.4.106 codex 0920 F3 (P1) fail-closed 修**: cache miss 时**不再**
603        // 返 default snapshot (= 把 ex 作 backend_id, 其他字段 0). 之前的 fallback
604        // 让 modify/cancel handler 用 default `order_version=0` / 空 `exchange*` /
605        // 空 `security_type` 发 backend, 后续 backend 拒错 / 路由错. 用户看 daemon
606        // 接受了请求实则 silent fail.
607        //
608        // **新语义**: cache miss + 用户传 ex → `Err(CacheMiss)` 让 handler 透传
609        // 给 FTAPI client, 用户应先调 `/api/orders` 刷新或确保 daemon 没 restart
610        // 过. 对齐 #45 silent-success anti-pattern (snapshot 不变量必须严格).
611        let trimmed_ex = order_id_ex.map(str::trim).filter(|s| !s.is_empty());
612        if let Some(ex) = trimmed_ex {
613            if let Some(orders) = self.orders.get(&acc_id) {
614                if let Some(order) = orders
615                    .iter()
616                    .find(|o| !o.backend_order_id.is_empty() && o.backend_order_id == ex)
617                {
618                    return Ok(CachedOrderSnapshot::from_order(order));
619                }
620                // 未找到完整 order, 也接受老 entry (order_id_ex == ex 但 backend_order_id 空):
621                // 把 ex 作 backend_order_id 用 (用户显式传, 信任 caller).
622                if let Some(order) = orders.iter().find(|o| o.order_id_ex == ex) {
623                    let mut snap = CachedOrderSnapshot::from_order(order);
624                    if snap.backend_order_id.is_empty() {
625                        snap.backend_order_id = ex.to_string();
626                    }
627                    return Ok(snap);
628                }
629            }
630            // v1.4.106 codex 0920 F3 (P1): cache miss + 用户传 ex → fail closed.
631            // 之前 silent fallback 到 default snapshot (= 0 / "" 字段 + ex 作
632            // backend_id), 让 handler 发不完整 backend req. 现在统一 reject,
633            // 让用户先刷新 cache.
634            return Err(ResolveOrderError::CacheMiss);
635        }
636
637        // Case 2: 仅 order_id (hash) — 必须 cache 命中才能查到 backend_order_id.
638        if order_id == 0 {
639            return Err(ResolveOrderError::InvalidInput);
640        }
641        let orders = self
642            .orders
643            .get(&acc_id)
644            .ok_or(ResolveOrderError::CacheMiss)?;
645        let order = orders
646            .iter()
647            .find(|o| o.order_id == order_id)
648            .ok_or(ResolveOrderError::CacheMiss)?;
649        if order.backend_order_id.is_empty() {
650            return Err(ResolveOrderError::MissingBackendId);
651        }
652        Ok(CachedOrderSnapshot::from_order(order))
653    }
654
655    /// v1.4.90 S BUG-e4da-009: stub TTL(30s)。
656    ///
657    /// stub 插入超过此 TTL 且 backend 仍不返该 `order_id` → 视为 backend 永久
658    /// 拒单(never accepted into authoritative list)→ evict。
659    pub const STUB_TTL_MS: u64 = 30_000;
660
661    /// v1.4.90 S BUG-e4da-009: 当前 unix epoch ms。
662    ///
663    /// 抽出 helper 是为了 unit test 能用 mock 时间(不直接调)。
664    fn now_ms() -> u64 {
665        use std::time::{SystemTime, UNIX_EPOCH};
666        SystemTime::now()
667            .duration_since(UNIX_EPOCH)
668            .map(|d| d.as_millis() as u64)
669            .unwrap_or(0)
670    }
671
672    /// v1.4.90 S BUG-e4da-009 cache saga 真修:merge backend 权威列表,**保留** stub.
673    ///
674    /// 历史坑(跨 v1.4.73 → v1.4.89 7 版未真修):
675    /// ```text
676    /// 17:36:44.204092 place_order.rs:427  v1.4.82 A2 stub upsert (order_id=X)
677    /// 17:36:44.204126 place_order.rs:451  PlaceOrder success
678    /// 17:36:44.204138 futu_audit:511      v1.4.38 idempotency: cached
679    /// 17:36:44.226531 place_order.rs:488  v1.4.73 A1 orders refreshed count=0  ← 22.4ms 清零
680    /// ```
681    ///
682    /// 根因:v1.4.73 A1 spawn refresh 直接 `orders.insert(acc_id, backend_list)`
683    /// **整覆盖**,22ms 内把 v1.4.82 A2 刚 upsert 的 stub 抹掉。client 0ms 查
684    /// `/api/orders` 命中 stub OK,但 22ms 后再查就 count=0 —— "**单子消失**" 假象。
685    ///
686    /// 修法(async-safe):refresh 不再 `insert` 整覆盖,而是 **merge**:
687    /// - backend 返的每个 order: upsert(同 `order_id` 命中 stub → 覆盖且
688    ///   `is_stub=false`,不在 → push)
689    /// - cache 里 backend 没返的 stub orders(`is_stub=true`):
690    ///   - `now_ms - stub_inserted_at_ms < STUB_TTL_MS` (30s) → **保留**
691    ///   - 否则 → evict(backend 永久拒单兜底)
692    /// - cache 里 backend 没返的非 stub orders(`is_stub=false`):
693    ///   全清空(backend 是权威,老的非 stub 该被替换)
694    ///
695    /// 并发语义:用 DashMap entry api 取写锁,整 merge 在锁内完成 → 多个
696    /// `merge_preserving_stubs` 调用串行化(顺序与到达顺序一致)。
697    /// `upsert_order` 与 `merge_preserving_stubs` 之间也通过同一 entry lock
698    /// 排它,不会丢失 stub 插入与 merge 之间的并发更新。
699    pub fn merge_preserving_stubs(&self, acc_id: u64, backend_orders: Vec<CachedOrder>) {
700        self.merge_preserving_stubs_with_now(acc_id, backend_orders, Self::now_ms());
701    }
702
703    /// v1.4.90 S BUG-e4da-009: `merge_preserving_stubs` 的可注入时间版(test 用)。
704    ///
705    /// 业务代码只调 `merge_preserving_stubs`;本 fn 暴露便于 unit test 模拟
706    /// "stub 已超 TTL" / "stub 仍 fresh" 两种边界。
707    pub fn merge_preserving_stubs_with_now(
708        &self,
709        acc_id: u64,
710        backend_orders: Vec<CachedOrder>,
711        now_ms: u64,
712    ) {
713        let mut entry = self.orders.entry(acc_id).or_default();
714
715        // 收集既有 cache 里的 stub orders(按 order_id 索引,便于 merge 后判断)
716        let existing_stubs: Vec<CachedOrder> =
717            entry.iter().filter(|o| o.is_stub).cloned().collect();
718
719        // 重置 entry,按 backend list 重建(每个 backend order 必然 is_stub=false)
720        let mut new_orders: Vec<CachedOrder> = backend_orders
721            .into_iter()
722            .map(|mut o| {
723                // backend 是权威,强制 is_stub=false(防 caller 不慎传 stub)
724                o.is_stub = false;
725                o.stub_inserted_at_ms = 0;
726                // v1.4.105 BUG-v1.4.104-001: backend 列表 = broker confirmed 的权威订单,
727                // 强制 is_pending_broker_confirm=false (防 caller 不慎传 pending).
728                o.is_pending_broker_confirm = false;
729                o
730            })
731            .collect();
732
733        // 把 backend 没返的 stub 按 TTL 保留(已 merge 进 backend list 的不重复)
734        let backend_ids: std::collections::HashSet<u64> =
735            new_orders.iter().map(|o| o.order_id).collect();
736        for stub in existing_stubs {
737            if backend_ids.contains(&stub.order_id) {
738                // backend 已 ack → 不保留 stub,由 backend 版本胜出
739                continue;
740            }
741            let age = now_ms.saturating_sub(stub.stub_inserted_at_ms);
742            if age < Self::STUB_TTL_MS {
743                new_orders.push(stub);
744            }
745            // else: 老 stub 超 TTL,evict(不 push)
746        }
747
748        *entry = new_orders;
749    }
750
751    /// v1.4.105 BUG-v1.4.104-001 (P0): broker async confirm 到达后清 pending 标志.
752    ///
753    /// 当 push notice_type=4/5/8/100 (ORDER_UPDATE / ORDER_LIST_UPDATE /
754    /// TRADE_STATISTIC / ORDER_NTF) 到达对应 acc_id 时, 调本 fn 把所有
755    /// `is_pending_broker_confirm=true` 的 order 翻成 `false`.
756    ///
757    /// 设计选择: 不按 order_id 精确匹配清 — push notice 通常不带具体 order_id,
758    /// 只表 "本 acc 有 order 状态变化". 简化处理: acc 内任何 ORDER 类 push 到
759    /// 即视为 broker 已开始处理本 acc 的 stub orders.
760    /// 后续 query_orders refresh 会通过 `merge_preserving_stubs` 把 enriched
761    /// 版本写入, 替换 stub.
762    ///
763    /// 返被清的 order 数 (caller 用于 audit log).
764    pub fn clear_pending_confirm_for_acc(&self, acc_id: u64) -> usize {
765        let mut cleared = 0;
766        if let Some(mut entry) = self.orders.get_mut(&acc_id) {
767            for o in entry.iter_mut() {
768                if o.is_pending_broker_confirm {
769                    o.is_pending_broker_confirm = false;
770                    cleared += 1;
771                }
772            }
773        }
774        cleared
775    }
776
777    /// v1.4.106 codex 0226 F4 (P2): selective clear pending confirm by order_ids.
778    ///
779    /// `clear_pending_confirm_for_acc` 是 acc-level 全清, 但 ORDER push notify
780    /// 在 backend 实际带具体 `order_ids` 时(notice_type=4 ORDER_UPDATE 通常
781    /// 带), daemon 应**只**清对应订单的 pending flag, 而不是把同账户其他还没
782    /// confirm 的 stub 一并误清.
783    ///
784    /// 触发场景 (`bridge/dispatcher.rs:251-268`):
785    /// - notice_type=4/5/9 + 非空 `order_ids` (backend 真带 → 按订单清)
786    /// - notice_type=4/5/9 + 空 `order_ids` → fall back to `clear_pending_confirm_for_acc`
787    ///
788    /// match 逻辑: `o.order_id_ex` (alphanumeric backend szOrderID) 与
789    /// `order_ids` 任一相等. 不 match `o.order_id` (FTAPI u64 hash) 因为
790    /// backend push 带的 `order_ids` 是 backend 原生 string id.
791    ///
792    /// 返被清的 order 数 (caller 用于 audit log).
793    pub fn clear_pending_confirm_for_orders(&self, acc_id: u64, order_ids: &[String]) -> usize {
794        if order_ids.is_empty() {
795            return 0;
796        }
797        let mut cleared = 0;
798        if let Some(mut entry) = self.orders.get_mut(&acc_id) {
799            for o in entry.iter_mut() {
800                if o.is_pending_broker_confirm && order_ids.iter().any(|id| id == &o.order_id_ex) {
801                    o.is_pending_broker_confirm = false;
802                    cleared += 1;
803                }
804            }
805        }
806        cleared
807    }
808
809    /// v1.4.105 BUG-v1.4.104-001 (P0): cleanup task 删超时未 confirm 的 pending stub.
810    ///
811    /// 触发: PlaceOrder spawn 一个 30s 延迟 task, 到点检查 (acc_id, order_id_ex)
812    /// 对应的 stub 是否仍 `is_stub=true && is_pending_broker_confirm=true`.
813    /// 若是 → 删 stub + warn (push channel 断 / broker 拒单未 push 的兜底).
814    ///
815    /// **不**简单调 STUB_TTL_MS evict — 那个是 query_orders merge 时的逻辑,
816    /// 这里是主动 GC pending stub. 两者互补.
817    ///
818    /// 返 (purged: bool, reason: 描述), caller 写 audit log.
819    pub fn purge_pending_stub_if_still_pending(
820        &self,
821        acc_id: u64,
822        order_id: u64,
823    ) -> Option<String> {
824        if let Some(mut entry) = self.orders.get_mut(&acc_id) {
825            let before = entry.len();
826            let mut purged_code = None;
827            entry.retain(|o| {
828                let should_purge =
829                    o.order_id == order_id && o.is_stub && o.is_pending_broker_confirm;
830                if should_purge {
831                    purged_code = Some(o.code.clone());
832                }
833                !should_purge
834            });
835            let after = entry.len();
836            if before != after {
837                return Some(purged_code.unwrap_or_default());
838            }
839        }
840        None
841    }
842
843    /// v1.4.83 §9 F6: 扫全 cache 查 orphan orders.
844    ///
845    /// **Orphan 定义**: `order_status ∈ {0, 1, 2, 4}` (未达到 Submitted=5
846    /// 之前的 in-flight stub) **且** `create_timestamp.is_some()` **且**
847    /// `now_secs - create_timestamp > threshold_secs`.
848    ///
849    /// 含义对应 C++ proto OrderStatus enum (Trd_Common.proto:108):
850    /// - 0 = Unsubmitted (未提交) — 极端情况, daemon stub 修后不应该出现 (v1.4.103 P0 hotfix)
851    /// - 1 = WaitingSubmit (等待提交) — 条件单 stub 初值, 等触发
852    /// - 2 = Submitting (提交中) — 普通单 stub 初值 (v1.4.103 起)
853    /// - 4 = TimeOut (处理超时) — 后端回 timeout, 状态未知
854    ///
855    /// **为什么需要**: v1.4.82 A2 PlaceOrder 成功后直接 upsert stub order
856    /// 让 `/api/orders` 立刻可见 (BUG-60b0-002 fix). 后续 push notice_type=
857    /// 4/5/8 / re-fetch 把 status 推到 5 (Submitted) / 10/11 (Filled).
858    /// 若 push 通道断流 (§9 CMD3020 chain broken), stub 卡住 5min+ = orphan.
859    ///
860    /// **v1.4.103 P0 (BUG-WUZONG-001)**: stub status 从 0 (proto 定义为
861    /// Unsubmitted "未提交", 触发客户端 retry 多下单) 改成 1/2 (WaitingSubmit/
862    /// Submitting, 对齐 C++ NNProto_Trd_OrderOp.cpp:483-510). orphan 检测同步
863    /// 扩展到 {0, 1, 2, 4} 全 in-flight 状态 — 老 daemon 留下来 status=0 的
864    /// 卡死 stub 也能被检测到.
865    ///
866    /// 返 `Vec<OrphanOrder>`; caller 决定 log 级别 + metric bump.
867    #[must_use]
868    pub fn scan_orphan_orders(&self, now_secs: f64, threshold_secs: f64) -> Vec<OrphanOrder> {
869        let mut orphans = Vec::new();
870        for entry in self.orders.iter() {
871            let acc_id = *entry.key();
872            for order in entry.value().iter() {
873                // v1.4.103 codex F2.4 (P2) round 2: 仅扫**真 stub** orders
874                // (is_stub == true). 非 stub 的 backend 权威 orders 即使 status
875                // 是 0/1/2/4 (in-flight) 也不算 orphan — 它们是 backend 主动
876                // 持续推送的真实状态, daemon 不应认为是卡住. 之前 v1.4.103
877                // P0 hotfix 加 status filter {0,1,2,4} 但漏 is_stub guard,
878                // 导致 backend 长时 Submitting/TimeOut 真单 (5min+) 也被 log
879                // 当 orphan / push 通道断流候选 → metrics 噪音.
880                if !order.is_stub {
881                    continue;
882                }
883                // v1.4.103 P0: in-flight 状态 (未到 Submitted=5 之前的过渡值)
884                // 都视为 stub 卡住候选 — 0 (Unsubmitted) / 1 (WaitingSubmit) /
885                // 2 (Submitting) / 4 (TimeOut).
886                if !matches!(order.order_status, 0 | 1 | 2 | 4) {
887                    continue;
888                }
889                // **v1.4.106 codex 0920 F7 (P2)**: PlaceOrder stub `create_timestamp=None`
890                // (backend 还没 push 真 timestamp). 只用 `create_timestamp.is_some()`
891                // 时所有 stub 都被 scanner skip → orphan 检测对最关键场景 (stub
892                // 卡死) 失效.
893                //
894                // 修法: stub (`is_stub=true`) 优先用 `stub_inserted_at_ms`
895                // 推算 age (转 secs); 非 stub 用 `create_timestamp` (与原行为
896                // 一致, 但本 scanner 已加 `is_stub` guard, 不会 reach 此分支).
897                //
898                // `stub_inserted_at_ms` 在 PlaceOrder ack 后 upsert 时写, 所以
899                // 真 stub 必有非零值. = 0 视为没初始化 (老 cache entry 兼容)
900                // → skip.
901                let age_secs = if order.is_stub {
902                    if order.stub_inserted_at_ms == 0 {
903                        // 老 cache entry 未携带 stub_inserted_at_ms (e.g. 升级前
904                        // 持久化数据反序列化), skip 而非误报.
905                        continue;
906                    }
907                    let stub_inserted_secs = (order.stub_inserted_at_ms as f64) / 1000.0;
908                    let now_unix_secs = now_secs;
909                    now_unix_secs - stub_inserted_secs
910                } else {
911                    // 非 stub: 走 create_timestamp 老路径 (本 scanner 已加
912                    // is_stub guard, 此分支不应 reach 但保留兼容).
913                    let Some(create_ts) = order.create_timestamp else {
914                        continue;
915                    };
916                    now_secs - create_ts
917                };
918                if age_secs > threshold_secs {
919                    orphans.push(OrphanOrder {
920                        acc_id,
921                        order_id: order.order_id,
922                        order_id_ex: order.order_id_ex.clone(),
923                        code: order.code.clone(),
924                        age_secs,
925                    });
926                }
927            }
928        }
929        orphans
930    }
931}
932
/// v1.4.83 §9 F6: structured report for one orphan order found by
/// `TrdCache::scan_orphan_orders`.
#[derive(Debug, Clone, PartialEq)]
pub struct OrphanOrder {
    /// Account the stuck order belongs to (key of the order cache entry).
    pub acc_id: u64,
    /// Numeric order id of the cached order.
    pub order_id: u64,
    /// String order id of the cached order (copied from `order_id_ex`).
    pub order_id_ex: String,
    /// Security code of the cached order.
    pub code: String,
    /// Age of the stub in seconds at scan time, measured from the moment the
    /// stub was inserted into the cache (`stub_inserted_at_ms`), not from
    /// `create_timestamp`.
    pub age_secs: f64,
}
943
944impl Default for TrdCache {
945    fn default() -> Self {
946        Self::new()
947    }
948}