futu_cache/trd_cache.rs
1// 交易数据缓存
2
3mod types;
4
5#[cfg(test)]
6mod regression_tests;
7
8pub use types::*;
9
10use dashmap::DashMap;
11use futu_core::account_locator;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14
15/// 交易数据缓存
16pub struct TrdCache {
17 /// C++ `NNData_Trd_AccList::m_mapUserAccList` equivalent.
18 ///
19 /// This is the authoritative internal account index used by request
20 /// validation, broker routing, and funds/positions/order queries. It may
21 /// contain universal parent accounts that are intentionally not exposed by
22 /// public `Trd_GetAccList`.
23 pub accounts: DashMap<AccKey, CachedTrdAcc>,
24 /// C++ `NNData_Trd_AccList::m_mapIDRelation` equivalent:
25 /// `universal_or_self_acc_id -> public sub account ids`.
26 ///
27 /// `Trd_GetAccList` uses this relation via `get_accounts()` to expose the
28 /// same public projection as C++ `GetAllSubAccList`, while `lookup_account`
29 /// and direct `accounts.get()` still see the full internal map.
30 pub account_relations: DashMap<AccKey, Vec<AccKey>>,
31 /// Public account ids derived from `account_relations`.
32 pub public_account_ids: DashMap<AccKey, ()>,
33 public_projection_ready: AtomicBool,
34 /// 资金: `FundsCacheKey { acc_id, asset_category, currency }` → funds.
35 /// **v1.4.106 Finding A**: 之前 `DashMap<AccKey, CachedFunds>` 一 acc 一 snapshot,
36 /// Universal/Futures 多币种场景被覆盖 — 改 currency-aware key 对齐 C++
37 /// `m_mapAccFund: NN_AssetKey -> NN_TrdCurrency -> Ndt_Trd_AccFund`.
38 pub funds: DashMap<FundsCacheKey, CachedFunds>,
39 /// 持仓: `PositionsCacheKey { acc_id, asset_category }` → Vec<position>.
40 /// Category 0 preserves the legacy single-bucket path; JP margin and JP
41 /// derivative requests use scoped categories to avoid cross-bucket leakage.
42 pub positions: DashMap<PositionsCacheKey, Vec<CachedPosition>>,
43 /// 当日订单: acc_id → Vec<order>
44 pub orders: DashMap<AccKey, Vec<CachedOrder>>,
45 /// 交易 cipher: acc_id → cipher bytes (解锁后获得)
46 pub ciphers: DashMap<AccKey, Vec<u8>>,
47 /// v1.4.48 #1: 订单 broker 映射(order_id_ex → broker_id_used)
48 ///
49 /// 起源:v1.4.47 P0.1 修了 PlaceOrder 按 `sec_market` 选 broker,但 ModifyOrder /
50 /// CancelOrder 仍按 `account.security_firm` 选 broker,导致"在 broker 1007 (US)
51 /// 下的单,cancel 去 broker 1019 (CA) 拒" 的 cross-broker 故障。
52 ///
53 /// 修法:PlaceOrder 成功后把 `(order_id_ex, broker_id_used)` 缓存到这里。
54 /// ModifyOrder / CancelOrder 拿到 `c2s.order_id_ex` 后先查 broker_id;
55 /// 命中 → 路由到同 broker;未命中 → fallback account.firm 路由。
56 ///
57 /// 注:cipher 按 sub-account `acc_id` 存储(`ciphers` map)。对照 C++
58 /// `NNData_Trd_AccList::m_mapAccCipher`:不同 broker 的账户天然有不同
59 /// `nAccID`,存储已隔离(v1.4.49 清理了 v1.4.48 `cipher_brokers` workaround,
60 /// 该字段在 v1.4.48 #11 routing 对齐 C++ 后成 dead code)。
61 pub order_brokers: DashMap<String, u32>,
62
63 /// v1.4.73 A2 BUG-008 fix: per-account cipher state version counter。
64 ///
65 /// 外部 tester (v1.4.71) AI 报告 5 步 repro:
66 /// ```text
67 /// Step 1: unlock pwd → cache EXECUTED (idem_key=unlock-xxx)
68 /// Step 2: 同 body → cache HIT (正常幂等)
69 /// Step 3: EMPTY {} LOCK → v1.4.39 cipher 清
70 /// Step 4: 同 body → cache HIT 返 stale 成功! (真 bug)
71 /// Step 5: place-order → -401 "交易未解锁"
72 /// ```
73 ///
74 /// v1.4.72 Option C(空 body 不写 cache)只防 step 3 污染,未修 step 4 stale。
75 ///
76 /// Option A 真修:unlock `idem_key` 构造时纳入**当前 cipher_state_version**,
77 /// lock 清 cipher 时 `fetch_add(1, SeqCst)` → version 递增 → step 4 同 body
78 /// 得 idem_key **不同**(version=0 → version=1)→ cache miss → 真执行 unlock
79 /// 或 backend 校验失败返清晰错误。
80 ///
81 /// 为啥 SeqCst:unlock_trade handler 可能并发,确保 version 递增对所有
82 /// 后续 idem_key 构造 visible(`ciphers.remove()` + `fetch_add()` 顺序严格)。
83 ///
84 /// 注:version 不持久化 —— daemon restart 重新从 0 开始,等效于"新 cache",
85 /// 之前的 idem entries 也被 cache TTL 清光,零冲突。
86 pub cipher_state_versions: DashMap<AccKey, Arc<std::sync::atomic::AtomicU64>>,
87
88 /// v1.4.106 codex 0226 F1+F2: pending OrderConfirm context per
89 /// `(acc_id, ftapi_order_id)`.
90 ///
91 /// PlaceOrder ack 响应里若 `OrderNewRsp.action.type == ORDER_CONFIRM=5` 且
92 /// `action.order_confirm.is_some()`, daemon **必须** capture
93 /// `CltActionOrderConfirm` 字段, 用于后续 `Trd_ReconfirmOrder` 处理时构造
94 /// backend `OrderConfirmReq` (cmd 4728).
95 ///
96 /// **生命周期**:
97 /// - PlaceOrder ack 路径: capture 后 `insert(key, ctx)`
98 /// - ReconfirmOrder handler: lookup → 构造 backend req → 收到 `OrderConfirmRsp`
99 /// `result==0` 后 `remove(key)` (一次性消费, 防止重复 confirm)
100 /// - TTL: 5min (`ORDER_CONFIRM_CONTEXT_TTL_MS`), `now - inserted_at_ms` 检查;
101 /// stale entry handler 拒绝 + GC 清理
102 /// - daemon restart 全清 (内存 cache, backend 重新发 PlaceOrder 即可获新 context)
103 ///
104 /// 详见 `OrderConfirmContext` doc.
105 pub pending_order_confirms: DashMap<OrderConfirmKey, OrderConfirmContext>,
106}
107
108impl TrdCache {
109 pub fn get_cipher(&self, acc_id: u64) -> Option<Vec<u8>> {
110 self.ciphers.get(&acc_id).map(|v| v.clone())
111 }
112
113 pub fn set_cipher(&self, acc_id: u64, cipher: Vec<u8>) {
114 self.ciphers.insert(acc_id, cipher);
115 }
116
117 /// v1.4.73 A2 BUG-008 fix: 读当前账户的 cipher state version(用于 unlock idem_key)。
118 ///
119 /// 首次访问 acc_id 会初始化为 0。后续每次 lock 清 cipher 会 `fetch_add(1)`。
120 /// `idem_key` 构造时把这个 version 纳入 hash → cipher 清后 version 递增 →
121 /// 同 body 的 idem_key 不同 → cache miss → 真执行 unlock(或 backend 真校验)。
122 pub fn get_cipher_state_version(&self, acc_id: u64) -> u64 {
123 let entry = self
124 .cipher_state_versions
125 .entry(acc_id)
126 .or_insert_with(|| Arc::new(AtomicU64::new(0)));
127 entry.load(Ordering::SeqCst)
128 }
129
130 /// v1.4.73 A2 BUG-008 fix: lock 清 cipher 时调,递增 version → 让下次 unlock
131 /// 同 body 得 cache miss。
132 ///
133 /// 返回 new version(递增后值),便于调用方 log。
134 #[must_use]
135 pub fn bump_cipher_state_version(&self, acc_id: u64) -> u64 {
136 let entry = self
137 .cipher_state_versions
138 .entry(acc_id)
139 .or_insert_with(|| Arc::new(AtomicU64::new(0)));
140 entry.fetch_add(1, Ordering::SeqCst) + 1
141 }
142
143 /// v1.4.106 codex 0226 F1+F2: PlaceOrder 解析到 `OrderNewRsp.action.order_confirm`
144 /// 时调用, 保存上下文用于后续 `Trd_ReconfirmOrder` 构造 backend `OrderConfirmReq`.
145 ///
146 /// `now_ms` 由 caller 传入 (便于单测注入固定时钟); 真实路径用
147 /// `SystemTime::now()`.
148 pub fn store_pending_order_confirm(
149 &self,
150 acc_id: u64,
151 ftapi_order_id: u64,
152 mut ctx: OrderConfirmContext,
153 now_ms: u64,
154 ) {
155 ctx.inserted_at_ms = now_ms;
156 let key = OrderConfirmKey::new(acc_id, ftapi_order_id);
157 self.pending_order_confirms.insert(key, ctx);
158 }
159
160 /// v1.4.106 codex 0226 F1+F2: ReconfirmOrder handler 入口 lookup, 取出
161 /// `(acc_id, ftapi_order_id)` 对应 OrderConfirmContext.
162 ///
163 /// 返 `None`: cache miss (PlaceOrder 没存 / TTL 过期 / 已被消费). caller 必须
164 /// 早 reject loud, **不**允许 silent fallback (避免反模式 D / silent-success).
165 ///
166 /// `now_ms` 检查 TTL: `now - ctx.inserted_at_ms > ORDER_CONFIRM_CONTEXT_TTL_MS`
167 /// 视为 stale → return None + remove (proactive GC).
168 pub fn get_pending_order_confirm(
169 &self,
170 acc_id: u64,
171 ftapi_order_id: u64,
172 now_ms: u64,
173 ) -> Option<OrderConfirmContext> {
174 let key = OrderConfirmKey::new(acc_id, ftapi_order_id);
175 let ctx = self.pending_order_confirms.get(&key)?.value().clone();
176 if now_ms.saturating_sub(ctx.inserted_at_ms) > ORDER_CONFIRM_CONTEXT_TTL_MS {
177 // Stale → proactive GC
178 self.pending_order_confirms.remove(&key);
179 return None;
180 }
181 Some(ctx)
182 }
183
184 /// v1.4.106 codex 0226 F1+F2: ReconfirmOrder backend 成功 (`OrderConfirmRsp.result==0`)
185 /// 后调用, 从 cache 删除 (一次性消费, 防重复 confirm).
186 ///
187 /// 返 `true` 表示真有删除发生; `false` = 已被其他路径消费 / 过期 GC.
188 pub fn remove_pending_order_confirm(&self, acc_id: u64, ftapi_order_id: u64) -> bool {
189 let key = OrderConfirmKey::new(acc_id, ftapi_order_id);
190 self.pending_order_confirms.remove(&key).is_some()
191 }
192
193 /// v1.4.106 codex 0226 F1+F2: GC stale OrderConfirmContext entries.
194 ///
195 /// 用于定时清理 (push dispatcher 收到 ORDER 类 push 时顺便扫一次), 防止
196 /// stale ctx 累积. 返回清理掉的条目数.
197 pub fn purge_stale_order_confirms(&self, now_ms: u64) -> usize {
198 let mut purged = Vec::new();
199 for entry in self.pending_order_confirms.iter() {
200 if now_ms.saturating_sub(entry.value().inserted_at_ms) > ORDER_CONFIRM_CONTEXT_TTL_MS {
201 purged.push(*entry.key());
202 }
203 }
204 let n = purged.len();
205 for key in purged {
206 self.pending_order_confirms.remove(&key);
207 }
208 n
209 }
210
211 /// v1.4.106 codex 0554 F1 [P1]: 原子性清空所有 cipher + 同步 bump 各账户的
212 /// `cipher_state_version`。
213 ///
214 /// 起源:`/api/admin/reload` 之前的实现是
215 /// `bridge.caches.trd_cache.ciphers.clear()` 直接动 `DashMap`,但 **没** bump
216 /// `cipher_state_version`。这与 v1.4.73 A2 BUG-008 修复的语义不一致:
217 /// lock-trade 路径里 `ciphers.remove()` 之后必跟 `bump_cipher_state_version()`,
218 /// 防止旧 idempotency cache entry(unlock idem_key 含 cipher_state_version
219 /// hash)在 cipher 被清后仍命中返 stale "cached success",导致
220 /// step 4 / step 5 silent regression。
221 ///
222 /// admin/reload 漏 bump 的具体后果:
223 /// - reload 清光 ciphers
224 /// - 客户端再调 unlock-trade 同 body → idem_key 命中(cipher_state_version
225 /// 未变)→ 返 stale 成功 → cipher cache 仍空 → place-order `-401` 解锁失败
226 ///
227 /// 本 helper 把两步打包,**禁止外部直接 `cache.ciphers.clear()`**(那条
228 /// 路径 silent skip bump,复活 BUG-008)。所有清 cipher 的 control-plane
229 /// 路径(reload / admin / 未来若加更多)必须走本 helper。
230 ///
231 /// 返回 `(cleared_count, bumped_versions)`:
232 /// - `cleared_count`:清掉的 cipher 数(即 reload 前已解锁账户数)
233 /// - `bumped_versions`:每个被清 acc_id 的 (acc_id, new_version) 列表,
234 /// 便于 log + 客户端调试 idem_key 失效原因
235 ///
236 /// 与 lock-trade 路径的 bump 行为一致:仅对**实际清掉 cipher** 的 acc_id
237 /// 递增 version;从未解锁的账户 cipher_state_version 保持不变。
238 ///
239 /// 并发:`DashMap::iter()` 期间其他线程的 `ciphers.remove()` /
240 /// `ciphers.insert()` 可能 race,但本 helper 用 `remove(&key)` 逐个清,
241 /// 拿到 `Some(_)` 才 bump,保证 `version` 单调递增 + 与 `ciphers` 实际
242 /// 状态一致。`SeqCst` 保证 bump 对所有后续 `get_cipher_state_version()`
243 /// 立即可见。
244 #[must_use]
245 pub fn clear_all_ciphers_and_bump_versions(&self) -> (usize, Vec<(u64, u64)>) {
246 // 先收集 acc_ids(避免持 DashMap iter guard 时 mutate map → deadlock)
247 let acc_ids: Vec<u64> = self.ciphers.iter().map(|e| *e.key()).collect();
248 let mut cleared_count = 0usize;
249 let mut bumped_versions: Vec<(u64, u64)> = Vec::with_capacity(acc_ids.len());
250 for acc_id in acc_ids {
251 if self.ciphers.remove(&acc_id).is_some() {
252 cleared_count += 1;
253 let new_ver = self.bump_cipher_state_version(acc_id);
254 bumped_versions.push((acc_id, new_ver));
255 }
256 }
257 (cleared_count, bumped_versions)
258 }
259
260 pub fn new() -> Self {
261 Self {
262 accounts: DashMap::new(),
263 account_relations: DashMap::new(),
264 public_account_ids: DashMap::new(),
265 public_projection_ready: AtomicBool::new(false),
266 funds: DashMap::new(),
267 positions: DashMap::new(),
268 orders: DashMap::new(),
269 ciphers: DashMap::new(),
270 order_brokers: DashMap::new(),
271 cipher_state_versions: DashMap::new(),
272 // v1.4.106 codex 0226 F1+F2: pending OrderConfirm context cache
273 pending_order_confirms: DashMap::new(),
274 }
275 }
276
277 pub fn set_accounts(&self, accounts: Vec<CachedTrdAcc>) {
278 let relations = accounts
279 .iter()
280 .map(|acc| (acc.acc_id, vec![acc.acc_id]))
281 .collect();
282 self.set_accounts_with_relations(accounts, relations);
283 }
284
285 /// Atomically replace the internal account map and the public projection.
286 ///
287 /// `relations` mirrors C++ `m_mapIDRelation`: standalone accounts map to
288 /// themselves, while universal parents map to their public sub accounts.
289 /// This lets `GetAccList` expose only C++ `GetAllSubAccList` output without
290 /// losing hidden parent accounts needed by `GetAccItem`-style request paths.
291 pub fn set_accounts_with_relations(
292 &self,
293 accounts: Vec<CachedTrdAcc>,
294 relations: Vec<(AccKey, Vec<AccKey>)>,
295 ) {
296 self.accounts.clear();
297 self.account_relations.clear();
298 self.public_account_ids.clear();
299 for (idx, mut acc) in accounts.into_iter().enumerate() {
300 acc.order_index = idx;
301 self.accounts.insert(acc.acc_id, acc);
302 }
303 for (parent_id, sub_ids) in relations {
304 for sub_id in &sub_ids {
305 self.public_account_ids.insert(*sub_id, ());
306 }
307 self.account_relations.insert(parent_id, sub_ids);
308 }
309 self.public_projection_ready.store(true, Ordering::SeqCst);
310 }
311
312 #[must_use]
313 pub fn get_accounts(&self) -> Vec<CachedTrdAcc> {
314 if self.public_projection_ready.load(Ordering::SeqCst) {
315 self.public_account_ids
316 .iter()
317 .filter_map(|e| self.accounts.get(e.key()).map(|acc| acc.value().clone()))
318 .collect()
319 } else {
320 // Backward-compatible test path: many existing tests insert directly
321 // into `cache.accounts`. Until production calls set_accounts*, expose
322 // all entries, matching the old single-map behavior.
323 self.accounts.iter().map(|e| e.value().clone()).collect()
324 }
325 }
326
327 /// v1.4.106 codex 0932 F2 [P1]: 单 acc_id O(1) 查询 (DashMap key 直查).
328 ///
329 /// 用途: push_builder 构造 Trd_UpdateOrder / Trd_UpdateOrderFill header
330 /// 之前 resolve `trd_env` + `trd_market`. 对齐 C++
331 /// `INNData_Trd_AllAccList::GetAccEnv(nAccID)` / `GetAccMkt(nAccID)`.
332 ///
333 /// 返 `None` = cache miss (账户不在交易 cache 中). caller **必须 loud
334 /// return** 不 fallback (sentinel 0 让 client filter reject =
335 /// silent-success 反模式).
336 #[must_use]
337 pub fn lookup_account(&self, acc_id: u64) -> Option<CachedTrdAcc> {
338 self.accounts.get(&acc_id).map(|e| e.value().clone())
339 }
340
341 /// v1.4.103 (B10): card_num → acc_id resolution helper.
342 ///
343 /// 接受输入:
344 /// - **16 位完整 card_num** (`"1001100100800000"`): 完全匹配 `card_num` 字段.
345 /// - **4 位末尾 suffix** (`"7680"`): 匹配 `card_num` 末 4 位 (App 显示格式).
346 ///
347 /// 返 `Vec<u64>` (matching acc_ids):
348 /// - 0 个 → cache 中无 match (caller 决定 warn / abort);
349 /// - 1 个 → unique resolution;
350 /// - >= 2 个 → ambiguous (caller 必须 reject + log 候选, 不能 silent 接受).
351 ///
352 /// **空字符串 / 非纯数字 / 长度非 4 / 非 16** → 返 empty Vec (不 panic).
353 /// 这是为了让 caller 输入校验 + resolution 双责权: 调用方应该已经校验过格式.
354 #[must_use]
355 pub fn find_acc_ids_by_card_num(&self, input: &str) -> Vec<u64> {
356 // v1.4.103 codex F2.3 (P2): 同时匹配 `card_num` 和 `uni_card_num`
357 // (综合账户卡号). 用户故事 B10 描述 App 显示的`保证金综合账户(7680)`末
358 // 4 位 — 综合账户的卡号通常 in `uni_card_num`, 普通账户在 `card_num`.
359 // 单独只看 `card_num` 会让综合账户用户写 `--allowed-card-nums 7680`
360 // 时所有 resolve 都失败 → fail-closed sentinel reject (虽然安全, 但
361 // UX 失效, 用户必须 fall back 用 acc_id). 双匹配后 fail-closed
362 // sentinel 只在真没账户 match 时触发.
363 let accounts = self.get_accounts();
364 account_locator::match_card_num_in_records(&accounts, input, None).unwrap_or_default()
365 }
366
367 /// **v1.4.106 Finding A** (legacy compat): 不带 currency 维度的 update.
368 /// 用 `FundsCacheKey::legacy(acc_id)` 作 key. 适用于:
369 /// - 现有 caller 还没改 signature 的 (背景: backend push 不一定知 currency)
370 /// - SingleCurrency / sim / Crypto / Forex 账户 (本来就单币种)
371 ///
372 /// **新 caller 应优先用 [`Self::update_funds_per_currency`]** 显式标
373 /// currency 维度, 让 Universal/Futures 账户能存独立 snapshot per currency.
374 pub fn update_funds(&self, acc_id: u64, funds: CachedFunds) {
375 self.funds.insert(FundsCacheKey::legacy(acc_id), funds);
376 }
377
378 /// **v1.4.106 Finding A** (preferred for Universal/Futures): 带 currency
379 /// 维度的 update. backend push 时若知 funds 的实际 currency (从 `f.currency`
380 /// 字段或 push context 派生), 应该用这个 helper 让多币种 snapshot 不互相覆盖.
381 ///
382 /// 对齐 C++ `INNData_Trd_Acc::SetAccFund(stKey, enCurrency, ...)`.
383 pub fn update_funds_per_currency(
384 &self,
385 acc_id: u64,
386 currency: Option<i32>,
387 funds: CachedFunds,
388 ) {
389 let key = match currency {
390 Some(c) => FundsCacheKey::per_currency(acc_id, c),
391 None => FundsCacheKey::legacy(acc_id),
392 };
393 self.funds.insert(key, funds);
394 }
395
396 /// Currency + asset-category aware funds update.
397 ///
398 /// JP derivative accounts use `asset_category` as part of the C++ asset key.
399 /// Non-JP/legacy callers should pass `asset_category=0`, which preserves the
400 /// existing legacy/per-currency key shape.
401 pub fn update_funds_scoped(
402 &self,
403 acc_id: u64,
404 asset_category: i32,
405 currency: Option<i32>,
406 funds: CachedFunds,
407 ) {
408 let key = if asset_category != 0 {
409 FundsCacheKey::full(acc_id, asset_category, currency)
410 } else {
411 match currency {
412 Some(c) => FundsCacheKey::per_currency(acc_id, c),
413 None => FundsCacheKey::legacy(acc_id),
414 }
415 };
416 self.funds.insert(key, funds);
417 }
418
419 /// Update the requested funds bucket and also mirror the returned backend
420 /// currency bucket when it is known.
421 ///
422 /// C++ stores `Ndt_Trd_AccFund` under `accFund.enCurrency`
423 /// (`INNData_Trd_Acc.cpp::SetAccFund`). A Rust caller may request CMD3020
424 /// with `currency=None` because the daemon derived the backend default, but
425 /// REST/CLI later read the same account through an explicit effective
426 /// currency bucket. Mirroring prevents an older per-currency snapshot from
427 /// masking a fresher default refresh.
428 pub fn update_funds_scoped_with_returned_currency(
429 &self,
430 acc_id: u64,
431 asset_category: i32,
432 requested_currency: Option<i32>,
433 funds: CachedFunds,
434 ) {
435 let returned_currency = funds.currency;
436 self.update_funds_scoped(acc_id, asset_category, requested_currency, funds.clone());
437
438 if let Some(returned_currency) = returned_currency
439 && requested_currency != Some(returned_currency)
440 {
441 self.update_funds_scoped(acc_id, asset_category, Some(returned_currency), funds);
442 }
443 }
444
445 /// **v1.4.106 Finding A**: cache lookup with C++-equivalent fallback.
446 ///
447 /// 对齐 C++ `INNData_Trd_Acc::GetAccFund(stKey, enCurrency, pAccFund)`:
448 /// 先试 requested currency, 找不到则 fallback 到 latest/first available
449 /// currency, **返 false** (caller 应看 boolean 决定是否 trust).
450 ///
451 /// 输入 `currency`:
452 /// - `Some(c)`: Universal/Futures 路径, 优先 match per-currency snapshot
453 /// - `None`: SingleCurrency 路径, 直接 match `legacy(acc_id)` snapshot
454 ///
455 /// 输出 `(funds, currency_match)`:
456 /// - `(Some(funds), true)`: 精确命中 requested currency snapshot
457 /// - `(Some(funds), false)`: 命中 fallback (legacy 或不同 currency 的
458 /// snapshot — caller 应**不要 silent trust**, 至少 log warn 或 surface
459 /// currency mismatch)
460 /// - `(None, _)`: 完全 cache miss
461 #[must_use]
462 pub fn get_funds(&self, acc_id: u64, currency: Option<i32>) -> (Option<CachedFunds>, bool) {
463 self.get_funds_scoped(acc_id, 0, currency)
464 }
465
466 /// Funds lookup using the same `(acc_id, asset_category, currency)` dimensions
467 /// as [`Self::update_funds_scoped`].
468 ///
469 /// For `asset_category != 0` we require an exact scoped hit. Falling back to a
470 /// legacy or another asset-category snapshot would mix JP derivative asset
471 /// buckets and silently return the wrong funds.
472 #[must_use]
473 pub fn get_funds_scoped(
474 &self,
475 acc_id: u64,
476 asset_category: i32,
477 currency: Option<i32>,
478 ) -> (Option<CachedFunds>, bool) {
479 // Step 1: 精确 match
480 let exact_key = if asset_category != 0 {
481 FundsCacheKey::full(acc_id, asset_category, currency)
482 } else {
483 match currency {
484 Some(c) => FundsCacheKey::per_currency(acc_id, c),
485 None => FundsCacheKey::legacy(acc_id),
486 }
487 };
488 if let Some(f) = self.funds.get(&exact_key) {
489 return (Some(f.value().clone()), true);
490 }
491 if asset_category != 0 {
492 return (None, false);
493 }
494 // Step 2: fallback to legacy(acc_id) — backend 不带 currency context
495 // push 时落进 legacy key
496 if currency.is_some()
497 && let Some(f) = self.funds.get(&FundsCacheKey::legacy(acc_id))
498 {
499 return (Some(f.value().clone()), false);
500 }
501 // Step 3: fallback to ANY snapshot for this acc_id (latest available
502 // currency, 等价于 C++ "first available")
503 for entry in self.funds.iter() {
504 if entry.key().acc_id == acc_id {
505 return (Some(entry.value().clone()), false);
506 }
507 }
508 (None, false)
509 }
510
511 pub fn update_positions(&self, acc_id: u64, positions: Vec<CachedPosition>) {
512 self.update_positions_scoped(acc_id, 0, positions);
513 }
514
515 pub fn update_positions_scoped(
516 &self,
517 acc_id: u64,
518 asset_category: i32,
519 positions: Vec<CachedPosition>,
520 ) {
521 let key = if asset_category != 0 {
522 PositionsCacheKey::scoped(acc_id, asset_category)
523 } else {
524 PositionsCacheKey::legacy(acc_id)
525 };
526 self.positions.insert(key, positions);
527 }
528
529 #[must_use]
530 pub fn get_positions_scoped(
531 &self,
532 acc_id: u64,
533 asset_category: i32,
534 ) -> Option<Vec<CachedPosition>> {
535 let key = if asset_category != 0 {
536 PositionsCacheKey::scoped(acc_id, asset_category)
537 } else {
538 PositionsCacheKey::legacy(acc_id)
539 };
540 self.positions.get(&key).map(|p| p.value().clone())
541 }
542
543 #[must_use]
544 pub fn has_positions_scoped(&self, acc_id: u64, asset_category: i32) -> bool {
545 let key = if asset_category != 0 {
546 PositionsCacheKey::scoped(acc_id, asset_category)
547 } else {
548 PositionsCacheKey::legacy(acc_id)
549 };
550 self.positions.contains_key(&key)
551 }
552
553 pub fn update_orders(&self, acc_id: u64, orders: Vec<CachedOrder>) {
554 self.orders.insert(acc_id, orders);
555 }
556
557 /// 更新单个订单(推送场景)
558 pub fn upsert_order(&self, acc_id: u64, order: CachedOrder) {
559 let mut entry = self.orders.entry(acc_id).or_default();
560 if let Some(existing) = entry.iter_mut().find(|o| o.order_id == order.order_id) {
561 *existing = order;
562 } else {
563 entry.push(order);
564 }
565 }
566
567 /// v1.4.106 codex 0219 Finding 1: resolve cached order context for trade-write
568 /// (modify / cancel) handlers.
569 ///
570 /// 对齐 C++ `APIServer_Trd_ModifyOrder.cpp:251-256` + `:270-271`:
571 /// - 优先用 client 传的 `orderIDEx` (= backend `szOrderID`).
572 /// - 否则用 `(acc_id, order_id_hash)` 从 cache 找原 order, 取它的
573 /// `szOrderID` + `version` + `exchange*` 字段构造 backend req.
574 ///
575 /// **fail-closed 语义**: cache miss → 返 `Err`, caller 把错误透传到
576 /// FTAPI client 让用户先刷新 `/api/orders` 或传 orderIDEx. 不允许 silent
577 /// fall-through 到 `order_id.to_string()` (= 把 hash 当 backend id, 见
578 /// pitfall #45 silent-success).
579 ///
580 /// **入参**:
581 /// - `acc_id`: FTAPI `c2s.header.acc_id`.
582 /// - `order_id`: FTAPI `c2s.order_id` (hash). `0` 视为 caller 没传, 仅靠
583 /// `order_id_ex` 路径生效.
584 /// - `order_id_ex`: FTAPI `c2s.order_id_ex` (= backend szOrderID, 优先).
585 ///
586 /// **返回**:
587 /// - `Ok(snap)`: 命中 cache, 字段已 populated.
588 /// - `Err(ResolveOrderError::CacheMiss)`: cache 没存这个 (acc_id, order_id),
589 /// caller 应返清晰提示 "先刷新 /api/orders 或传 orderIDEx".
590 /// - `Err(ResolveOrderError::MissingBackendId)`: cache 命中但 backend_order_id
591 /// 字段空 (= cache entry 来自老版本, 没存 szOrderID), caller 应返同样提示.
592 /// - `Err(ResolveOrderError::InvalidInput)`: 同时缺 order_id 和 order_id_ex.
593 pub fn find_order_for_trade_write(
594 &self,
595 acc_id: u64,
596 order_id: u64,
597 order_id_ex: Option<&str>,
598 ) -> Result<CachedOrderSnapshot, ResolveOrderError> {
599 // Case 1: orderIDEx 传了 — 按 order_id_ex 在 cache 找完整 order
600 // (匹配 backend `szOrderID`).
601 //
602 // **v1.4.106 codex 0920 F3 (P1) fail-closed 修**: cache miss 时**不再**
603 // 返 default snapshot (= 把 ex 作 backend_id, 其他字段 0). 之前的 fallback
604 // 让 modify/cancel handler 用 default `order_version=0` / 空 `exchange*` /
605 // 空 `security_type` 发 backend, 后续 backend 拒错 / 路由错. 用户看 daemon
606 // 接受了请求实则 silent fail.
607 //
608 // **新语义**: cache miss + 用户传 ex → `Err(CacheMiss)` 让 handler 透传
609 // 给 FTAPI client, 用户应先调 `/api/orders` 刷新或确保 daemon 没 restart
610 // 过. 对齐 #45 silent-success anti-pattern (snapshot 不变量必须严格).
611 let trimmed_ex = order_id_ex.map(str::trim).filter(|s| !s.is_empty());
612 if let Some(ex) = trimmed_ex {
613 if let Some(orders) = self.orders.get(&acc_id) {
614 if let Some(order) = orders
615 .iter()
616 .find(|o| !o.backend_order_id.is_empty() && o.backend_order_id == ex)
617 {
618 return Ok(CachedOrderSnapshot::from_order(order));
619 }
620 // 未找到完整 order, 也接受老 entry (order_id_ex == ex 但 backend_order_id 空):
621 // 把 ex 作 backend_order_id 用 (用户显式传, 信任 caller).
622 if let Some(order) = orders.iter().find(|o| o.order_id_ex == ex) {
623 let mut snap = CachedOrderSnapshot::from_order(order);
624 if snap.backend_order_id.is_empty() {
625 snap.backend_order_id = ex.to_string();
626 }
627 return Ok(snap);
628 }
629 }
630 // v1.4.106 codex 0920 F3 (P1): cache miss + 用户传 ex → fail closed.
631 // 之前 silent fallback 到 default snapshot (= 0 / "" 字段 + ex 作
632 // backend_id), 让 handler 发不完整 backend req. 现在统一 reject,
633 // 让用户先刷新 cache.
634 return Err(ResolveOrderError::CacheMiss);
635 }
636
637 // Case 2: 仅 order_id (hash) — 必须 cache 命中才能查到 backend_order_id.
638 if order_id == 0 {
639 return Err(ResolveOrderError::InvalidInput);
640 }
641 let orders = self
642 .orders
643 .get(&acc_id)
644 .ok_or(ResolveOrderError::CacheMiss)?;
645 let order = orders
646 .iter()
647 .find(|o| o.order_id == order_id)
648 .ok_or(ResolveOrderError::CacheMiss)?;
649 if order.backend_order_id.is_empty() {
650 return Err(ResolveOrderError::MissingBackendId);
651 }
652 Ok(CachedOrderSnapshot::from_order(order))
653 }
654
655 /// v1.4.90 S BUG-e4da-009: stub TTL(30s)。
656 ///
657 /// stub 插入超过此 TTL 且 backend 仍不返该 `order_id` → 视为 backend 永久
658 /// 拒单(never accepted into authoritative list)→ evict。
659 pub const STUB_TTL_MS: u64 = 30_000;
660
661 /// v1.4.90 S BUG-e4da-009: 当前 unix epoch ms。
662 ///
663 /// 抽出 helper 是为了 unit test 能用 mock 时间(不直接调)。
664 fn now_ms() -> u64 {
665 use std::time::{SystemTime, UNIX_EPOCH};
666 SystemTime::now()
667 .duration_since(UNIX_EPOCH)
668 .map(|d| d.as_millis() as u64)
669 .unwrap_or(0)
670 }
671
672 /// v1.4.90 S BUG-e4da-009 cache saga 真修:merge backend 权威列表,**保留** stub.
673 ///
674 /// 历史坑(跨 v1.4.73 → v1.4.89 7 版未真修):
675 /// ```text
676 /// 17:36:44.204092 place_order.rs:427 v1.4.82 A2 stub upsert (order_id=X)
677 /// 17:36:44.204126 place_order.rs:451 PlaceOrder success
678 /// 17:36:44.204138 futu_audit:511 v1.4.38 idempotency: cached
679 /// 17:36:44.226531 place_order.rs:488 v1.4.73 A1 orders refreshed count=0 ← 22.4ms 清零
680 /// ```
681 ///
682 /// 根因:v1.4.73 A1 spawn refresh 直接 `orders.insert(acc_id, backend_list)`
683 /// **整覆盖**,22ms 内把 v1.4.82 A2 刚 upsert 的 stub 抹掉。client 0ms 查
684 /// `/api/orders` 命中 stub OK,但 22ms 后再查就 count=0 —— "**单子消失**" 假象。
685 ///
686 /// 修法(async-safe):refresh 不再 `insert` 整覆盖,而是 **merge**:
687 /// - backend 返的每个 order: upsert(同 `order_id` 命中 stub → 覆盖且
688 /// `is_stub=false`,不在 → push)
689 /// - cache 里 backend 没返的 stub orders(`is_stub=true`):
690 /// - `now_ms - stub_inserted_at_ms < STUB_TTL_MS` (30s) → **保留**
691 /// - 否则 → evict(backend 永久拒单兜底)
692 /// - cache 里 backend 没返的非 stub orders(`is_stub=false`):
693 /// 全清空(backend 是权威,老的非 stub 该被替换)
694 ///
695 /// 并发语义:用 DashMap entry api 取写锁,整 merge 在锁内完成 → 多个
696 /// `merge_preserving_stubs` 调用串行化(顺序与到达顺序一致)。
697 /// `upsert_order` 与 `merge_preserving_stubs` 之间也通过同一 entry lock
698 /// 排它,不会丢失 stub 插入与 merge 之间的并发更新。
699 pub fn merge_preserving_stubs(&self, acc_id: u64, backend_orders: Vec<CachedOrder>) {
700 self.merge_preserving_stubs_with_now(acc_id, backend_orders, Self::now_ms());
701 }
702
703 /// v1.4.90 S BUG-e4da-009: `merge_preserving_stubs` 的可注入时间版(test 用)。
704 ///
705 /// 业务代码只调 `merge_preserving_stubs`;本 fn 暴露便于 unit test 模拟
706 /// "stub 已超 TTL" / "stub 仍 fresh" 两种边界。
707 pub fn merge_preserving_stubs_with_now(
708 &self,
709 acc_id: u64,
710 backend_orders: Vec<CachedOrder>,
711 now_ms: u64,
712 ) {
713 let mut entry = self.orders.entry(acc_id).or_default();
714
715 // 收集既有 cache 里的 stub orders(按 order_id 索引,便于 merge 后判断)
716 let existing_stubs: Vec<CachedOrder> =
717 entry.iter().filter(|o| o.is_stub).cloned().collect();
718
719 // 重置 entry,按 backend list 重建(每个 backend order 必然 is_stub=false)
720 let mut new_orders: Vec<CachedOrder> = backend_orders
721 .into_iter()
722 .map(|mut o| {
723 // backend 是权威,强制 is_stub=false(防 caller 不慎传 stub)
724 o.is_stub = false;
725 o.stub_inserted_at_ms = 0;
726 // v1.4.105 BUG-v1.4.104-001: backend 列表 = broker confirmed 的权威订单,
727 // 强制 is_pending_broker_confirm=false (防 caller 不慎传 pending).
728 o.is_pending_broker_confirm = false;
729 o
730 })
731 .collect();
732
733 // 把 backend 没返的 stub 按 TTL 保留(已 merge 进 backend list 的不重复)
734 let backend_ids: std::collections::HashSet<u64> =
735 new_orders.iter().map(|o| o.order_id).collect();
736 for stub in existing_stubs {
737 if backend_ids.contains(&stub.order_id) {
738 // backend 已 ack → 不保留 stub,由 backend 版本胜出
739 continue;
740 }
741 let age = now_ms.saturating_sub(stub.stub_inserted_at_ms);
742 if age < Self::STUB_TTL_MS {
743 new_orders.push(stub);
744 }
745 // else: 老 stub 超 TTL,evict(不 push)
746 }
747
748 *entry = new_orders;
749 }
750
751 /// v1.4.105 BUG-v1.4.104-001 (P0): broker async confirm 到达后清 pending 标志.
752 ///
753 /// 当 push notice_type=4/5/8/100 (ORDER_UPDATE / ORDER_LIST_UPDATE /
754 /// TRADE_STATISTIC / ORDER_NTF) 到达对应 acc_id 时, 调本 fn 把所有
755 /// `is_pending_broker_confirm=true` 的 order 翻成 `false`.
756 ///
757 /// 设计选择: 不按 order_id 精确匹配清 — push notice 通常不带具体 order_id,
758 /// 只表 "本 acc 有 order 状态变化". 简化处理: acc 内任何 ORDER 类 push 到
759 /// 即视为 broker 已开始处理本 acc 的 stub orders.
760 /// 后续 query_orders refresh 会通过 `merge_preserving_stubs` 把 enriched
761 /// 版本写入, 替换 stub.
762 ///
763 /// 返被清的 order 数 (caller 用于 audit log).
764 pub fn clear_pending_confirm_for_acc(&self, acc_id: u64) -> usize {
765 let mut cleared = 0;
766 if let Some(mut entry) = self.orders.get_mut(&acc_id) {
767 for o in entry.iter_mut() {
768 if o.is_pending_broker_confirm {
769 o.is_pending_broker_confirm = false;
770 cleared += 1;
771 }
772 }
773 }
774 cleared
775 }
776
777 /// v1.4.106 codex 0226 F4 (P2): selective clear pending confirm by order_ids.
778 ///
779 /// `clear_pending_confirm_for_acc` 是 acc-level 全清, 但 ORDER push notify
780 /// 在 backend 实际带具体 `order_ids` 时(notice_type=4 ORDER_UPDATE 通常
781 /// 带), daemon 应**只**清对应订单的 pending flag, 而不是把同账户其他还没
782 /// confirm 的 stub 一并误清.
783 ///
784 /// 触发场景 (`bridge/dispatcher.rs:251-268`):
785 /// - notice_type=4/5/9 + 非空 `order_ids` (backend 真带 → 按订单清)
786 /// - notice_type=4/5/9 + 空 `order_ids` → fall back to `clear_pending_confirm_for_acc`
787 ///
788 /// match 逻辑: `o.order_id_ex` (alphanumeric backend szOrderID) 与
789 /// `order_ids` 任一相等. 不 match `o.order_id` (FTAPI u64 hash) 因为
790 /// backend push 带的 `order_ids` 是 backend 原生 string id.
791 ///
792 /// 返被清的 order 数 (caller 用于 audit log).
793 pub fn clear_pending_confirm_for_orders(&self, acc_id: u64, order_ids: &[String]) -> usize {
794 if order_ids.is_empty() {
795 return 0;
796 }
797 let mut cleared = 0;
798 if let Some(mut entry) = self.orders.get_mut(&acc_id) {
799 for o in entry.iter_mut() {
800 if o.is_pending_broker_confirm && order_ids.iter().any(|id| id == &o.order_id_ex) {
801 o.is_pending_broker_confirm = false;
802 cleared += 1;
803 }
804 }
805 }
806 cleared
807 }
808
809 /// v1.4.105 BUG-v1.4.104-001 (P0): cleanup task 删超时未 confirm 的 pending stub.
810 ///
811 /// 触发: PlaceOrder spawn 一个 30s 延迟 task, 到点检查 (acc_id, order_id_ex)
812 /// 对应的 stub 是否仍 `is_stub=true && is_pending_broker_confirm=true`.
813 /// 若是 → 删 stub + warn (push channel 断 / broker 拒单未 push 的兜底).
814 ///
815 /// **不**简单调 STUB_TTL_MS evict — 那个是 query_orders merge 时的逻辑,
816 /// 这里是主动 GC pending stub. 两者互补.
817 ///
818 /// 返 (purged: bool, reason: 描述), caller 写 audit log.
819 pub fn purge_pending_stub_if_still_pending(
820 &self,
821 acc_id: u64,
822 order_id: u64,
823 ) -> Option<String> {
824 if let Some(mut entry) = self.orders.get_mut(&acc_id) {
825 let before = entry.len();
826 let mut purged_code = None;
827 entry.retain(|o| {
828 let should_purge =
829 o.order_id == order_id && o.is_stub && o.is_pending_broker_confirm;
830 if should_purge {
831 purged_code = Some(o.code.clone());
832 }
833 !should_purge
834 });
835 let after = entry.len();
836 if before != after {
837 return Some(purged_code.unwrap_or_default());
838 }
839 }
840 None
841 }
842
843 /// v1.4.83 §9 F6: 扫全 cache 查 orphan orders.
844 ///
845 /// **Orphan 定义**: `order_status ∈ {0, 1, 2, 4}` (未达到 Submitted=5
846 /// 之前的 in-flight stub) **且** `create_timestamp.is_some()` **且**
847 /// `now_secs - create_timestamp > threshold_secs`.
848 ///
849 /// 含义对应 C++ proto OrderStatus enum (Trd_Common.proto:108):
850 /// - 0 = Unsubmitted (未提交) — 极端情况, daemon stub 修后不应该出现 (v1.4.103 P0 hotfix)
851 /// - 1 = WaitingSubmit (等待提交) — 条件单 stub 初值, 等触发
852 /// - 2 = Submitting (提交中) — 普通单 stub 初值 (v1.4.103 起)
853 /// - 4 = TimeOut (处理超时) — 后端回 timeout, 状态未知
854 ///
855 /// **为什么需要**: v1.4.82 A2 PlaceOrder 成功后直接 upsert stub order
856 /// 让 `/api/orders` 立刻可见 (BUG-60b0-002 fix). 后续 push notice_type=
857 /// 4/5/8 / re-fetch 把 status 推到 5 (Submitted) / 10/11 (Filled).
858 /// 若 push 通道断流 (§9 CMD3020 chain broken), stub 卡住 5min+ = orphan.
859 ///
860 /// **v1.4.103 P0 (BUG-WUZONG-001)**: stub status 从 0 (proto 定义为
861 /// Unsubmitted "未提交", 触发客户端 retry 多下单) 改成 1/2 (WaitingSubmit/
862 /// Submitting, 对齐 C++ NNProto_Trd_OrderOp.cpp:483-510). orphan 检测同步
863 /// 扩展到 {0, 1, 2, 4} 全 in-flight 状态 — 老 daemon 留下来 status=0 的
864 /// 卡死 stub 也能被检测到.
865 ///
866 /// 返 `Vec<OrphanOrder>`; caller 决定 log 级别 + metric bump.
867 #[must_use]
868 pub fn scan_orphan_orders(&self, now_secs: f64, threshold_secs: f64) -> Vec<OrphanOrder> {
869 let mut orphans = Vec::new();
870 for entry in self.orders.iter() {
871 let acc_id = *entry.key();
872 for order in entry.value().iter() {
873 // v1.4.103 codex F2.4 (P2) round 2: 仅扫**真 stub** orders
874 // (is_stub == true). 非 stub 的 backend 权威 orders 即使 status
875 // 是 0/1/2/4 (in-flight) 也不算 orphan — 它们是 backend 主动
876 // 持续推送的真实状态, daemon 不应认为是卡住. 之前 v1.4.103
877 // P0 hotfix 加 status filter {0,1,2,4} 但漏 is_stub guard,
878 // 导致 backend 长时 Submitting/TimeOut 真单 (5min+) 也被 log
879 // 当 orphan / push 通道断流候选 → metrics 噪音.
880 if !order.is_stub {
881 continue;
882 }
883 // v1.4.103 P0: in-flight 状态 (未到 Submitted=5 之前的过渡值)
884 // 都视为 stub 卡住候选 — 0 (Unsubmitted) / 1 (WaitingSubmit) /
885 // 2 (Submitting) / 4 (TimeOut).
886 if !matches!(order.order_status, 0 | 1 | 2 | 4) {
887 continue;
888 }
889 // **v1.4.106 codex 0920 F7 (P2)**: PlaceOrder stub `create_timestamp=None`
890 // (backend 还没 push 真 timestamp). 只用 `create_timestamp.is_some()`
891 // 时所有 stub 都被 scanner skip → orphan 检测对最关键场景 (stub
892 // 卡死) 失效.
893 //
894 // 修法: stub (`is_stub=true`) 优先用 `stub_inserted_at_ms`
895 // 推算 age (转 secs); 非 stub 用 `create_timestamp` (与原行为
896 // 一致, 但本 scanner 已加 `is_stub` guard, 不会 reach 此分支).
897 //
898 // `stub_inserted_at_ms` 在 PlaceOrder ack 后 upsert 时写, 所以
899 // 真 stub 必有非零值. = 0 视为没初始化 (老 cache entry 兼容)
900 // → skip.
901 let age_secs = if order.is_stub {
902 if order.stub_inserted_at_ms == 0 {
903 // 老 cache entry 未携带 stub_inserted_at_ms (e.g. 升级前
904 // 持久化数据反序列化), skip 而非误报.
905 continue;
906 }
907 let stub_inserted_secs = (order.stub_inserted_at_ms as f64) / 1000.0;
908 let now_unix_secs = now_secs;
909 now_unix_secs - stub_inserted_secs
910 } else {
911 // 非 stub: 走 create_timestamp 老路径 (本 scanner 已加
912 // is_stub guard, 此分支不应 reach 但保留兼容).
913 let Some(create_ts) = order.create_timestamp else {
914 continue;
915 };
916 now_secs - create_ts
917 };
918 if age_secs > threshold_secs {
919 orphans.push(OrphanOrder {
920 acc_id,
921 order_id: order.order_id,
922 order_id_ex: order.order_id_ex.clone(),
923 code: order.code.clone(),
924 age_secs,
925 });
926 }
927 }
928 }
929 orphans
930 }
931}
932
933/// v1.4.83 §9 F6: orphan order 结构化报告.
934#[derive(Debug, Clone)]
935pub struct OrphanOrder {
936 pub acc_id: u64,
937 pub order_id: u64,
938 pub order_id_ex: String,
939 pub code: String,
940 /// 距离 create_timestamp 的秒数
941 pub age_secs: f64,
942}
943
944impl Default for TrdCache {
945 fn default() -> Self {
946 Self::new()
947 }
948}