Skip to main content

futu_backend/
quote_sub.rs

1// 行情订阅管理
2//
3// FTAPI SubType → 后端 SubscribeBit 映射
4// CMD 6211: 发送订阅请求到后端 (set-state semantics, server 覆盖式)
5// CMD 6212: 接收行情推送
6//
7// **v1.4.106 codex 1131 F1+F2** 重构:
8// - submit_global_desired_set 公共 fn 发"全集"CMD6211,并返回 QotSubError。
9//   backend reject / decode 错 / timeout 都 loud 返回,caller 据此决定
10//   ack-then-commit (F1);旧 silent legacy wrapper 已在 v1.4.109 删除。
11// - backend 覆盖式
12//   ("每次请求,客户端都需要提供当前需要订阅的所有股票的所有订阅位")
13//   unsub 通过发**新的更小集**实现 — 与 sub 同 entry 复用 (F2)
14// - SubscribeRequestParams 携带 session / orderbook_detail / broker_detail /
15//   rehab_type / first_push 等 backend req fields (F6)
16
17use std::collections::{BTreeMap, BTreeSet};
18
19use prost::Message;
20
21use futu_core::error::{FutuError, Result};
22
23use crate::conn::BackendConn;
24use crate::proto_internal::ft_cmd_stock_quote_sub;
25use crate::proto_internal::ft_cmd_stock_quote_sub_data;
26use crate::proto_internal::ft_cmd_tick;
27
28/// 后端订阅命令 ID
29pub const CMD_QOT_SUB: u16 = 6211;
30/// 后端推送命令 ID
31pub const CMD_QOT_PUSH: u16 = 6212;
32/// 逐笔主动拉取命令 ID.
33///
34/// Ref: C++ `NNBiz_Qot_PullQot.cpp:327-365`
35/// `SendPullTickerListReq(... NN_ProtoCmd_Qot_Pull_Ticker ...)`.
36pub const CMD_QOT_PULL_TICKER: u16 = 6128;
37
38/// C++ `NN_QuoteTickerKey_FetchLatest ((u64_t)-1)`.
39const TICKER_FETCH_LATEST_KEY: u64 = u64::MAX;
40/// C++ `NN_QuotePullTicker_LatestTime ((u32_t)-1)`.
41const TICKER_LATEST_DATE_TIME_S: u32 = u32::MAX;
42
43mod nn_quote_session {
44    pub const RTH: i32 = 0;
45    pub const ETH: i32 = 1;
46    pub const ALL: i32 = 2;
47}
48
49mod tick_period_type {
50    // Ref: C++ `FTCmdTick.proto` TickPeriodType.
51    pub const NORMAL: u32 = 0;
52    pub const BEFORE: u32 = 1;
53    pub const AFTER: u32 = 2;
54    pub const OVERNIGHT: u32 = 4;
55}
56
57/// FTAPI SubType 枚举值
58pub mod sub_type {
59    pub const BASIC: i32 = 1;
60    pub const ORDER_BOOK: i32 = 2;
61    pub const TICKER: i32 = 4;
62    pub const RT: i32 = 5;
63    pub const KL_DAY: i32 = 6;
64    pub const KL_5MIN: i32 = 7;
65    pub const KL_15MIN: i32 = 8;
66    pub const KL_30MIN: i32 = 9;
67    pub const KL_60MIN: i32 = 10;
68    pub const KL_1MIN: i32 = 11;
69    pub const KL_WEEK: i32 = 12;
70    pub const KL_MONTH: i32 = 13;
71    pub const BROKER: i32 = 14;
72    pub const KL_QUARTER: i32 = 15;
73    pub const KL_YEAR: i32 = 16;
74    pub const KL_3MIN: i32 = 17;
75}
76
77fn common_session_to_nn(session: i32) -> i32 {
78    match session {
79        // FTAPI Common.Session: 1=RTH, 2=ETH, 3=ALL.
80        2 => nn_quote_session::ETH,
81        3 => nn_quote_session::ALL,
82        _ => nn_quote_session::RTH,
83    }
84}
85
86fn ticker_periods_for_nn_session(nn_session: i32) -> Vec<u32> {
87    match nn_session {
88        nn_quote_session::ALL => vec![
89            tick_period_type::NORMAL,
90            tick_period_type::BEFORE,
91            tick_period_type::AFTER,
92            tick_period_type::OVERNIGHT,
93        ],
94        nn_quote_session::ETH => vec![
95            tick_period_type::NORMAL,
96            tick_period_type::BEFORE,
97            tick_period_type::AFTER,
98        ],
99        _ => vec![tick_period_type::NORMAL],
100    }
101}
102
103/// 拉取最新逐笔,用于对齐 C++ Qot_Sub 成功后“订阅逐笔要提前拉一根”。
104///
105/// Ref:
106/// - `APIServer_Qot_Sub.cpp:265-280`: subscribe Ticker 后调用
107///   `PullNewestTickerList_Lot(..., 1, false, SessionToNN(enSession))`.
108/// - `NNBiz_Qot_PullQot.cpp:126-129`: 非 US 强制 RTH,再从 latest key 拉取。
109/// - `NNBiz_Qot_PullQot.cpp:327-365`: CMD6128 body + reserved[0]=head market,
110///   reserved[1]=NN_QuoteExType_SECURITY(0).
111pub async fn pull_latest_ticker(
112    backend: &BackendConn,
113    stock_id: u64,
114    nn_mkt_type: u8,
115    common_session: i32,
116    pull_count: u32,
117    broker_id: Option<i32>,
118) -> Result<ft_cmd_tick::TickRsp> {
119    if stock_id == 0 || pull_count == 0 {
120        return Err(FutuError::Codec(format!(
121            "PullLatestTicker: invalid stock_id={stock_id} pull_count={pull_count}"
122        )));
123    }
124
125    // C++ `PullNewestTickerList`: only US keeps ETH/ALL; other markets collapse
126    // to RTH because backend does not distinguish pre/after sessions there.
127    let nn_session = if nn_mkt_type == ftapi_market_to_quote_mkt(11) {
128        common_session_to_nn(common_session)
129    } else {
130        nn_quote_session::RTH
131    };
132
133    let req = ft_cmd_tick::TickReq {
134        security_id: Some(stock_id),
135        date_time_s: Some(TICKER_LATEST_DATE_TIME_S),
136        begin_tick_key: Some(TICKER_FETCH_LATEST_KEY),
137        tick_count: Some(pull_count),
138        tick_period_type: None,
139        tick_period_type_ex: ticker_periods_for_nn_session(nn_session),
140        req_auth: None,
141        end_tick_key: None,
142        date_time_s_v2: None,
143        // v1.4.110 codex Phase 3 Slice 6a: caller-provided broker_id (crypto-only).
144        // 对齐 C++ NNBiz_Qot_PullQot.cpp:344-349 `pbReq.set_broker_id(...)`.
145        broker_id,
146    };
147
148    let mut reserved = [0u8; 10];
149    reserved[0] = nn_mkt_type;
150    // reserved[1] = 0 == NN_QuoteExType_SECURITY.
151
152    let frame = backend
153        .request_with_reserved(CMD_QOT_PULL_TICKER, req.encode_to_vec(), reserved)
154        .await?;
155    let rsp: ft_cmd_tick::TickRsp = Message::decode(frame.body.as_ref())?;
156    let result = rsp.result.unwrap_or(-1);
157    if result != 0 {
158        return Err(FutuError::ServerError {
159            ret_type: result,
160            msg: format!("CMD6128 PullLatestTicker result={result}"),
161        });
162    }
163    Ok(rsp)
164}
165
166/// 后端 SubscribeBit 值
167pub mod sbit {
168    pub const PRICE: u32 = 0;
169    pub const STOCK_STATE: u32 = 1;
170    pub const STOCK_TYPE_SPECIFIC: u32 = 2;
171    pub const ORDER_BOOK: u32 = 3;
172    pub const ORDER_BOOK_DETAIL: u32 = 4; // v1.4.106 codex 1131 F6: detail 走独立 bit
173    pub const DEAL_STATISTICS: u32 = 5;
174    pub const HK_BROKER_QUEUE: u32 = 9;
175    pub const HK_BROKER_DETAIL: u32 = 10; // v1.4.106 codex 1131 F6: HK broker detail
176    pub const US_PREMARKET_AFTERHOURS: u32 = 13;
177    pub const US_LV2_ORDER: u32 = 17;
178    pub const TIME_SHARING: u32 = 20;
179    pub const KLINE_1MIN: u32 = 21;
180    pub const KLINE_3MIN: u32 = 22;
181    pub const KLINE_5MIN: u32 = 23;
182    pub const KLINE_15MIN: u32 = 24;
183    pub const KLINE_30MIN: u32 = 25;
184    pub const KLINE_60MIN: u32 = 26;
185    pub const KLINE_DAY: u32 = 27;
186    pub const KLINE_WEEK: u32 = 28;
187    pub const KLINE_MONTH: u32 = 29;
188    pub const KLINE_QUARTER: u32 = 30;
189    pub const KLINE_YEAR: u32 = 31;
190    pub const TICK: u32 = 35;
191    pub const MEGER_LV2_ORDER: u32 = 39;
192}
193
194/// **v1.4.106 codex 1131 F1+F2**: 单个 (security, sub_types_with_opts) 集合.
195/// 让 caller 一次表达 (stock_id, ftapi_market, [(sub_type, SubBitOptions)]).
196///
197/// **v1.4.110 Phase 2 Slice 4**: 从 3-tuple 升级为 struct, 加 `broker_id` 字段
198/// 让 CMD6211 `SecuritySubscribe.broker_id` (FTCmdStockQuoteSubData.proto:269,
199/// 对齐 C++ `MktQotSub.cpp:454-463 SecuritySubscribe::set_broker_id`) 真正写
200/// 出去. broker_id = `None` 走 no-broker 路径 (普通股); `Some(NonZeroU32)`
201/// = crypto multi-broker 隔离.
202///
203/// 当前 (Phase 2) 所有 caller 都传 `broker_id = None` (Phase 3 才在 SubHandler
204/// 入口接 `securityFirm` resolve 写 broker, 见 codex 调研 12:13 增量).
205#[derive(Debug, Clone)]
206pub struct SecurityWithOpts {
207    pub stock_id: u64,
208    pub ftapi_market: i32,
209    pub sub_types_with_opts: Vec<(i32, SubBitOptions)>,
210    /// **v1.4.110 Phase 2 Slice 4**: broker-aware 订阅 (C++ `StockKey(stockID, brokerID)`,
211    /// 见 `qot_stock_key.rs` Phase 1 doc).
212    /// - `None`: C++ `m_hasBroker = false`, CMD6211 不写 broker_id field
213    /// - `Some(N)`: C++ `m_hasBroker = true`, CMD6211 写 broker_id = N
214    pub broker_id: Option<std::num::NonZeroU32>,
215}
216
217impl SecurityWithOpts {
218    /// 不带 broker 的快捷构造 (普通股 / 非 crypto).
219    pub fn new(
220        stock_id: u64,
221        ftapi_market: i32,
222        sub_types_with_opts: Vec<(i32, SubBitOptions)>,
223    ) -> Self {
224        Self {
225            stock_id,
226            ftapi_market,
227            sub_types_with_opts,
228            // v1.4.110 codex audit P3 #10: 默认 no-broker. 普通股 / non-crypto
229            // 路径走这里; crypto multi-broker 用 `with_broker(...)` 构造.
230            broker_id: None,
231        }
232    }
233
234    /// 带 broker 的快捷构造 (crypto multi-broker). `broker_id = 0` 自动降级 None.
235    pub fn with_broker(
236        stock_id: u64,
237        ftapi_market: i32,
238        sub_types_with_opts: Vec<(i32, SubBitOptions)>,
239        broker_id: u32,
240    ) -> Self {
241        Self {
242            stock_id,
243            ftapi_market,
244            sub_types_with_opts,
245            broker_id: std::num::NonZeroU32::new(broker_id),
246        }
247    }
248}
249
250/// Explicit backend market group for an empty CMD6211 set-state request.
251///
252/// Empty requests are used when the last desired item in a market bucket is
253/// removed. They still must carry the original backend route market in protocol
254/// header `reserved[0]`; routeMarketID=0 is rejected by backend.
255#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
256pub struct EmptyDesiredMarket {
257    pub mkt_type: u8,
258    pub is_depth: bool,
259}
260
261pub fn empty_desired_market_for_sub(
262    ftapi_market: i32,
263    sub_type: i32,
264) -> Option<EmptyDesiredMarket> {
265    let mkt_type = ftapi_market_to_quote_mkt(ftapi_market);
266    if mkt_type == 0 {
267        return None;
268    }
269    Some(EmptyDesiredMarket {
270        mkt_type,
271        is_depth: is_depth_sub_type(sub_type),
272    })
273}
274
275/// **v1.4.106 codex 1131 F1**: `submit_global_desired_set` 错误类型.
276/// 让 caller (SubHandler) 区分 backend reject vs decode err vs timeout →
277/// ack-then-commit pattern (F1 P1).
278#[derive(Debug)]
279pub enum QotSubError {
280    /// backend 返了 SubscribeSetRsp.result != 0 (reject), 携带 result 数 + warning.
281    BackendRejected { result: i32, warning: i32 },
282    /// 响应 decode 失败 — 该批次状态未知, 不写 local state.
283    DecodeFailed(String),
284    /// 网络 / TCP 错误 (timeout / 连接断). 透传 inner.
285    Transport(FutuError),
286    /// **v1.4.106 codex 0631 F3 [P2]**: caller 传了一个 backend 不识别的
287    /// `ftapi_market` (`ftapi_market_to_quote_mkt → 0`). 防御性 fail loud:
288    /// 不发任何 CMD6211, 整批 reject. caller 应早 validate 后再调.
289    /// 携带 offending list 让 caller 报清晰错给用户.
290    UnsupportedMarket { offending: Vec<i32> },
291    /// **v1.4.106 codex 0631 F3 [P2]**: 多 market 分批发送时, 部分市场
292    /// backend 失败 (BackendRejected / DecodeFailed) 但其它 OK — 该批次
293    /// 是部分应用 (split state). caller 不能当全成功, 需明示用户
294    /// "succeeded markets 已生效, failed 需重发".
295    PartialMarketFailure { succeeded: Vec<u8>, failed: Vec<u8> },
296}
297
298impl std::fmt::Display for QotSubError {
299    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
300        match self {
301            QotSubError::BackendRejected { result, warning } => {
302                write!(
303                    f,
304                    "backend rejected CMD6211: result={result} warning={warning}"
305                )
306            }
307            QotSubError::DecodeFailed(s) => write!(f, "CMD6211 response decode failed: {s}"),
308            QotSubError::Transport(e) => write!(f, "CMD6211 transport error: {e}"),
309            QotSubError::UnsupportedMarket { offending } => write!(
310                f,
311                "CMD6211 unsupported ftapi_market(s): {offending:?} \
312                 (ftapi_market_to_quote_mkt returned 0). Caller must validate \
313                 ftapi_market before submit_global_desired_set."
314            ),
315            QotSubError::PartialMarketFailure { succeeded, failed } => write!(
316                f,
317                "CMD6211 partial-market failure: succeeded={succeeded:?} \
318                 failed={failed:?}. State is split: succeeded markets are \
319                 applied, failed markets need re-submit."
320            ),
321        }
322    }
323}
324
325impl std::error::Error for QotSubError {
326    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
327        match self {
328            QotSubError::Transport(e) => Some(e),
329            _ => None,
330        }
331    }
332}
333
334impl From<FutuError> for QotSubError {
335    fn from(e: FutuError) -> Self {
336        QotSubError::Transport(e)
337    }
338}
339
340/// FTAPI SubType → 后端 SubscribeBit 列表映射
341/// 返回 (bit, probability) 列表
342pub fn sub_type_to_bits(sub_type: i32) -> Vec<(u32, i64)> {
343    match sub_type {
344        sub_type::BASIC => vec![
345            (sbit::PRICE, 0),
346            (sbit::STOCK_STATE, 0),
347            (sbit::STOCK_TYPE_SPECIFIC, 0),
348            (sbit::DEAL_STATISTICS, 0),
349        ],
350        sub_type::ORDER_BOOK => vec![(sbit::ORDER_BOOK, 0)],
351        sub_type::TICKER => vec![(sbit::TICK, 1)], // prob=1 normal period
352        sub_type::RT => vec![(sbit::STOCK_STATE, 0), (sbit::TIME_SHARING, 0)],
353        sub_type::BROKER => vec![(sbit::HK_BROKER_QUEUE, 0)],
354        sub_type::KL_1MIN => vec![(sbit::KLINE_1MIN, 1)], // prob=1 no rehab
355        sub_type::KL_3MIN => vec![(sbit::KLINE_3MIN, 1)],
356        sub_type::KL_5MIN => vec![(sbit::KLINE_5MIN, 1)],
357        sub_type::KL_15MIN => vec![(sbit::KLINE_15MIN, 1)],
358        sub_type::KL_30MIN => vec![(sbit::KLINE_30MIN, 1)],
359        sub_type::KL_60MIN => vec![(sbit::KLINE_60MIN, 1)],
360        sub_type::KL_DAY => vec![(sbit::KLINE_DAY, 1)],
361        sub_type::KL_WEEK => vec![(sbit::KLINE_WEEK, 1)],
362        sub_type::KL_MONTH => vec![(sbit::KLINE_MONTH, 1)],
363        sub_type::KL_QUARTER => vec![(sbit::KLINE_QUARTER, 1)],
364        sub_type::KL_YEAR => vec![(sbit::KLINE_YEAR, 1)],
365        _ => vec![],
366    }
367}
368
369/// **v1.4.106 codex 1131 F6 [P2]**: 单个 (security, sub_type) 的扩展参数.
370///
371/// 决定 backend req body 里的 prob / detail flag / session 选择. 由 SubHandler
372/// 从 c2s + SubscriptionManager 的 detail/session aggregator 聚合产出.
373#[derive(Debug, Clone, Copy, Default)]
374pub struct SubBitOptions {
375    /// session: 1=RTH / 2=ETH / 3=ALL / 5=OVERNIGHT (rejected by C++)
376    /// 仅 Ticker / KLRT 类有 session 语义, 其他 sub_type 应填 RTH 默认.
377    pub session: i32,
378    /// OrderBook detail (有 conn 要 detail → 全局走 detail bit).
379    pub orderbook_detail: bool,
380    /// HKSF orderbook uses C++ `OrderBook_All` / `OrderBook_All_WithDetail`
381    /// backend prob so the server pushes every available side level, not only
382    /// the default 40-level snapshot.
383    pub orderbook_full_depth: bool,
384    /// Broker detail (HK broker queue 时有 conn 要 detail → 走 detail bit).
385    pub broker_detail: bool,
386    /// US Basic/MOST quote needs SBIT_US_PREMARKET_AFTERHOURS_DETAIL so
387    /// BasicQot can expose preMarket / afterMarket / overnight.
388    pub us_pre_after_detail: bool,
389    /// 扩展 session 含 ETH / Overnight / pre-after / etc — 影响 prob / time_seg.
390    /// 对齐 C++ APIServer_Qot_Sub.cpp:218-257.
391    pub extended_time: bool,
392    /// C++ `SBIT_MEGER_LV2_ORDER` 的 `prob2` USLV2OrderSubProb type bitmask.
393    /// Ref: `MktQotSubInstance.cpp:114-180`.
394    pub merged_lv2_order_types: u32,
395    /// C++ `SBIT_MEGER_LV2_ORDER` 的 `prob2_v2` USLV2OrderSubProb type bitmask.
396    /// Overnight / US futures 会产生非 UTF-8 protobuf bytes, 必须走 bytes field.
397    pub merged_lv2_order_types_v2: u32,
398}
399
400#[derive(Debug, Clone, PartialEq, Eq)]
401pub struct SubscribeBitInfo {
402    pub bit: u32,
403    pub prob: i64,
404    pub prob2: Option<String>,
405    pub prob2_v2: Option<Vec<u8>>,
406}
407
408// Ref: FutuOpenD/Src/NNProtoCenter/Quote/SubBitUtil.cpp:119-126.
409const ORDER_BOOK_40_PROB: i64 = 0;
410const ORDER_BOOK_ALL_PROB: i64 = 1;
411const ORDER_BOOK_ALL_WITH_DETAIL_PROB: i64 = 2;
412const ORDER_BOOK_SIMPLE_LV2_PROB: i64 = 8 | 16;
413
414fn encode_us_lv2_order_sub_prob(lv2_type: u32, level: u32) -> Vec<u8> {
415    ft_cmd_stock_quote_sub_data::Uslv2OrderSubProb {
416        sub_items: vec![ft_cmd_stock_quote_sub_data::Uslv2OrderSubItem {
417            us_lv2_order_type: Some(lv2_type),
418            us_lv2_order_level: Some(level),
419        }],
420    }
421    .encode_to_vec()
422}
423
424fn for_each_type_mask_bit(mut mask: u32, mut f: impl FnMut(u32)) {
425    while mask != 0 {
426        let bit = mask & (!mask + 1);
427        f(bit);
428        mask &= !bit;
429    }
430}
431
432fn for_each_us_lv2_prob2_type(mask: u32, mut f: impl FnMut(u32)) {
433    // Ref: FutuOpenD/Src/NNSymbol/MktQotSubInstance.cpp:150-168.
434    // US stock LV2 orderbook uses prob2 order: NASDAQ TotalView first, ARCA
435    // second. Keep any future unknown bit after the known C++ order.
436    for known in [
437        futu_cache::qot_right::US_LV2_ORDER_NASDAQ_TV,
438        futu_cache::qot_right::US_LV2_ORDER_ARCA,
439    ] {
440        if mask & known != 0 {
441            f(known);
442        }
443    }
444    let remaining = mask
445        & !(futu_cache::qot_right::US_LV2_ORDER_NASDAQ_TV
446            | futu_cache::qot_right::US_LV2_ORDER_ARCA);
447    for_each_type_mask_bit(remaining, f);
448}
449
450/// 按 SubBitOptions 计算实际 (bit, prob) 列表 — 取代 sub_type_to_bits 简版.
451///
452/// **v1.4.106 codex 1131 F6**: prob 不是死值. detail 切替 bit.
453/// orderbook_detail=true → ORDER_BOOK_DETAIL bit (4) 替代 ORDER_BOOK bit (3).
454pub fn sub_type_to_bits_with_options(sub_type: i32, opts: SubBitOptions) -> Vec<(u32, i64)> {
455    sub_type_to_bit_infos_with_options(sub_type, opts)
456        .into_iter()
457        .map(|info| (info.bit, info.prob))
458        .collect()
459}
460
461pub fn sub_type_to_bit_infos_with_options(
462    sub_type: i32,
463    opts: SubBitOptions,
464) -> Vec<SubscribeBitInfo> {
465    // Ticker session prob uses backend FTCmdStockQuoteSubData::BitProbTick
466    // bitset, after C++ converts NN_Prob_Type_* through
467    // `SubBitUtil::SubProb_NNToSvr`:
468    // - `QotSubscribe.cpp:1254-1268`: RTH=Normal, ETH=Normal|Before|After,
469    //   ALL=Normal|Before|After|OverNight.
470    // - `SubBitUtil.cpp:119-129`: Normal=1 / Before=2 / After=4 /
471    //   OverNight=8.
472    let ticker_session_prob = match opts.session {
473        2 => 1 | 2 | 4,
474        3 => 1 | 2 | 4 | 8,
475        _ => 1,
476    };
477    // KLine probabilities are pre-existing values for this CMD6211 path. Do
478    // not reuse ticker bitset here: C++ `QotSubscribe.cpp:1280-1294` sends KLRT
479    // through INNBiz_Qot_KLRT, not the ticker SubBit path.
480    let kline_session_prob = match opts.session {
481        2 => 2,
482        3 => 3,
483        _ => 1,
484    };
485    match sub_type {
486        sub_type::BASIC => {
487            let mut infos = vec![
488                SubscribeBitInfo::new(sbit::PRICE, 0),
489                SubscribeBitInfo::new(sbit::STOCK_STATE, 0),
490                SubscribeBitInfo::new(sbit::STOCK_TYPE_SPECIFIC, 0),
491                SubscribeBitInfo::new(sbit::DEAL_STATISTICS, 0),
492            ];
493            if opts.us_pre_after_detail {
494                // C++ SubBitUtil keeps this bit for US / US_OPTIONS / US_FUT
495                // Basic/MOST subscriptions and erases it for non-US markets.
496                infos.push(SubscribeBitInfo::new(sbit::US_PREMARKET_AFTERHOURS, 0));
497            }
498            infos
499        }
500        sub_type::ORDER_BOOK => {
501            let bit = if opts.orderbook_detail {
502                sbit::ORDER_BOOK_DETAIL
503            } else {
504                sbit::ORDER_BOOK
505            };
506            let mut infos = Vec::new();
507            for_each_type_mask_bit(opts.merged_lv2_order_types_v2, |lv2_type| {
508                infos.push(SubscribeBitInfo {
509                    bit: sbit::MEGER_LV2_ORDER,
510                    prob: 0,
511                    prob2: None,
512                    prob2_v2: Some(encode_us_lv2_order_sub_prob(lv2_type, 60)),
513                });
514            });
515            for_each_us_lv2_prob2_type(opts.merged_lv2_order_types, |lv2_type| {
516                let bytes = encode_us_lv2_order_sub_prob(lv2_type, 60);
517                let (prob2, prob2_v2) = encode_legacy_or_bytes_prob2(bytes);
518                infos.push(SubscribeBitInfo {
519                    bit: sbit::MEGER_LV2_ORDER,
520                    prob: 0,
521                    prob2,
522                    prob2_v2,
523                });
524            });
525            let prob = if opts.orderbook_full_depth {
526                if opts.orderbook_detail {
527                    ORDER_BOOK_ALL_WITH_DETAIL_PROB
528                } else {
529                    ORDER_BOOK_ALL_PROB
530                }
531            } else if opts.merged_lv2_order_types != 0 || opts.merged_lv2_order_types_v2 != 0 {
532                ORDER_BOOK_SIMPLE_LV2_PROB
533            } else {
534                ORDER_BOOK_40_PROB
535            };
536            infos.push(SubscribeBitInfo::new(bit, prob));
537            infos
538        }
539        sub_type::TICKER => vec![SubscribeBitInfo::new(sbit::TICK, ticker_session_prob)],
540        sub_type::RT => vec![
541            SubscribeBitInfo::new(sbit::STOCK_STATE, 0),
542            SubscribeBitInfo::new(sbit::TIME_SHARING, 0),
543        ],
544        sub_type::BROKER => {
545            let bit = if opts.broker_detail {
546                sbit::HK_BROKER_DETAIL
547            } else {
548                sbit::HK_BROKER_QUEUE
549            };
550            vec![SubscribeBitInfo::new(bit, 0)]
551        }
552        sub_type::KL_1MIN => vec![SubscribeBitInfo::new(sbit::KLINE_1MIN, kline_session_prob)],
553        sub_type::KL_3MIN => vec![SubscribeBitInfo::new(sbit::KLINE_3MIN, kline_session_prob)],
554        sub_type::KL_5MIN => vec![SubscribeBitInfo::new(sbit::KLINE_5MIN, kline_session_prob)],
555        sub_type::KL_15MIN => vec![SubscribeBitInfo::new(sbit::KLINE_15MIN, kline_session_prob)],
556        sub_type::KL_30MIN => vec![SubscribeBitInfo::new(sbit::KLINE_30MIN, kline_session_prob)],
557        sub_type::KL_60MIN => vec![SubscribeBitInfo::new(sbit::KLINE_60MIN, kline_session_prob)],
558        sub_type::KL_DAY => vec![SubscribeBitInfo::new(sbit::KLINE_DAY, kline_session_prob)],
559        sub_type::KL_WEEK => vec![SubscribeBitInfo::new(sbit::KLINE_WEEK, kline_session_prob)],
560        sub_type::KL_MONTH => vec![SubscribeBitInfo::new(sbit::KLINE_MONTH, kline_session_prob)],
561        sub_type::KL_QUARTER => vec![SubscribeBitInfo::new(
562            sbit::KLINE_QUARTER,
563            kline_session_prob,
564        )],
565        sub_type::KL_YEAR => vec![SubscribeBitInfo::new(sbit::KLINE_YEAR, kline_session_prob)],
566        _ => vec![],
567    }
568}
569
570impl SubscribeBitInfo {
571    fn new(bit: u32, prob: i64) -> Self {
572        Self {
573            bit,
574            prob,
575            prob2: None,
576            prob2_v2: None,
577        }
578    }
579}
580
581fn encode_legacy_or_bytes_prob2(bytes: Vec<u8>) -> (Option<String>, Option<Vec<u8>>) {
582    match String::from_utf8(bytes) {
583        Ok(prob2) => (Some(prob2), None),
584        Err(err) => (None, Some(err.into_bytes())),
585    }
586}
587
588/// **v1.4.110 Phase 2 Slice 4**: per-security 输入 — `(stock_id, broker_id, sub_types_with_opts)`
589/// 3-tuple. broker_id = `None` 走 no-broker 路径 (普通股, Phase 2 默认),
590/// `Some(NonZeroU32)` 走 broker-aware 路径 (crypto multi-broker).
591///
592/// 抽出 type alias 防 clippy `type_complexity` warn.
593pub type SecuritySubscribeInput = (u64, Option<std::num::NonZeroU32>, Vec<(i32, SubBitOptions)>);
594
595/// **v1.4.106 codex 1131 F6**: 构建带 SubBitOptions 的订阅请求 — 真传 session
596/// / detail / extended_time 给 backend.
597///
598/// **v1.4.110 Phase 2 Slice 4**: 升 3-tuple `(stock_id, broker_id, sub_types)`
599/// 让 SecuritySubscribe.broker_id 真写出去 (对齐 C++ `MktQotSub.cpp:454-463`):
600/// ```cpp
601/// SecuritySubscribe::set_security_id(stSecKey.nStockID);
602/// if (stSecKey.HasBroker()) {
603///     SecuritySubscribe::set_broker_id(stSecKey.GetBrokerID());
604/// }
605/// ```
606/// - `broker_id = None` ⟺ C++ `m_hasBroker = false`, 不调 `set_broker_id`
607/// - `broker_id = Some(N)` ⟺ C++ `m_hasBroker = true`, 写 `broker_id = N`
608pub fn build_subscribe_req_with_options(
609    securities: &[SecuritySubscribeInput],
610) -> ft_cmd_stock_quote_sub::SubscribeSetReq {
611    let mut security_list = Vec::new();
612
613    for (stock_id, broker_id, sub_types_with_opts) in securities {
614        let mut bit_info_list = Vec::new();
615        for (st, opts) in sub_types_with_opts {
616            for info in sub_type_to_bit_infos_with_options(*st, *opts) {
617                bit_info_list.push(ft_cmd_stock_quote_sub_data::BitInfo {
618                    bit: Some(info.bit),
619                    prob: Some(info.prob),
620                    prob2: info.prob2,
621                    prob2_v2: info.prob2_v2,
622                });
623            }
624        }
625        security_list.push(ft_cmd_stock_quote_sub_data::SecuritySubscribe {
626            security_id: Some(*stock_id),
627            bit_info_list,
628            // v1.4.110 Phase 2 Slice 4: broker-aware CMD6211 wire.
629            // Some(NZ) → 写 i32 (crypto multi-broker); None → 不写 (普通股).
630            broker_id: broker_id.map(|nz| nz.get() as i32),
631        });
632    }
633
634    ft_cmd_stock_quote_sub::SubscribeSetReq {
635        security_list,
636        reserved: None,
637        timer_sub: None,
638    }
639}
640
641/// FTAPI market → 后端 NN_QuoteMktType 映射
642///
643/// 0 = unknown (caller should reject loud per F7).
644pub fn ftapi_market_to_quote_mkt(market: i32) -> u8 {
645    match market {
646        1 => 1,  // HK → NN_QuoteMktType_HK
647        11 => 2, // US → NN_QuoteMktType_US
648        21 => 3, // SH → NN_QuoteMktType_SH
649        22 => 4, // SZ → NN_QuoteMktType_SZ
650        // Internal subscribe-market markers derived from static security info.
651        // Public QotMarket maps HK futures back to HK_Security (C++
652        // APIServer_Inner_API.cpp::Market_NNToAPI), but CMD6211 reserved[0]
653        // still needs the backend NN_QuoteMktType. These values are not
654        // client-facing QotMarket values; SubHandler derives them from
655        // CachedSecurityInfo.sec_type/mkt_id before entering this function.
656        5 => 5,   // HK futures legacy → NN_QuoteMktType_FUT_HK
657        6 => 6,   // HK futures current → NN_QuoteMktType_FUT_HK_NEW
658        9 => 9,   // HK_OPTIONS → NN_QuoteMktType_HK_OPTIONS
659        13 => 13, // SG futures → NN_QuoteMktType_SG_FUTURE
660        15 => 7,  // US_OPTIONS → NN_QuoteMktType_US_OPTIONS
661        14 => 8,  // US_FUTURE → NN_QuoteMktType_US_FUT
662        16 => 16, // JP futures → NN_QuoteMktType_JP_FUTURE
663        23 => 10, // SH_KCB → NN_QuoteMktType_SH_KCB
664        // v1.4.110 codex Phase 3 Slice 6b: QotMarket_CC_Security=91 (Crypto)
665        // → NN_QuoteMktType_CRYPTO=17. C++ `NNBase_Define_Enum.h:517`.
666        91 => 17, // Crypto → NN_QuoteMktType_CRYPTO
667        // F7 [P2]: 31 (SG) / 41 (JP) / 51 (AU) / 61 (MY) / 71 (CA) / 81 (FX)
668        // 当前 backend reserved[0] mapping 不识别这些值 → 返 0,
669        // SubHandler 会按 F7 loud reject ftapi_market_to_quote_mkt==0 的 case.
670        _ => 0,
671    }
672}
673
674/// "深度"类 SubType.
675///
676/// 仅用于旧的 empty desired-set bucket 标记和 trace;非空 CMD6211 不能按
677/// normal/depth 拆分。CMD6211 是 backend 覆盖式 set-state,请求必须携带同一
678/// market 的完整订阅 bit 集合;C++ `MktQotSub.cpp::Timer_SendSubReq` 也是每个
679/// market 发一条 `SubscribeSetReq`,在同一个 `SecuritySubscribe` 内追加全部 bit。
680fn is_depth_sub_type(sub_type: i32) -> bool {
681    matches!(sub_type, sub_type::ORDER_BOOK | sub_type::BROKER)
682}
683
684/// **v1.4.106 codex 1131 F1+F2**: 给 caller (SubHandler) 的 set-state 接口.
685///
686/// **语义**: 发送整组 desired (stock_id, ftapi_market, sub_type_with_opts) →
687/// backend 返 SubscribeSetRsp. backend "覆盖式" — 所有不在新集合中的旧订阅
688/// 自动取消 (per FTCmdStockQuoteSub.proto 设计 doc:
689/// "server会覆盖此客户端之前的订阅,并主动推送一次新增加的股票订阅位数据").
690///
691/// **F1 ack-then-commit**: caller 必须仅在 Ok 后才写 SubscriptionManager state.
692/// Err 时 → 不写 state, 返用户 ret_type=-1 + 错误原因.
693///
694/// **F2 unsub via fresh set**: 退订通过传"new desired set 不含 removed key"
695/// 实现, 不调单独 unsub backend cmd. SubHandler 计算
696/// `current global - removed` 后调本 fn.
697///
698/// **max_sub_count**: 响应中 backend 下发的 quota — caller 应调
699/// `SubscriptionManager::set_total_quota_from_backend(max_sub_count as u32)`
700/// 同步真值 (F5).
701///
702/// **return 值**: backend 下发的 max_sub_count (caller 据此 update SubscriptionManager
703/// 总配额 — F5 P2 dynamic quota).
704pub async fn submit_global_desired_set(
705    backend: &BackendConn,
706    securities: &[SecurityWithOpts],
707) -> std::result::Result<i32, QotSubError> {
708    if securities.is_empty() {
709        // Empty desired-set submit is ambiguous without the original backend
710        // route market. Call `submit_empty_desired_set_for_markets` with the
711        // market bucket(s) being cleared instead.
712        return Err(QotSubError::UnsupportedMarket { offending: vec![0] });
713    }
714
715    // **v1.4.106 codex 0631 F3 [P2]**: 入口 validate 全部 ftapi_market 已知.
716    // 任一未知 → Err(UnsupportedMarket{offending}), 不发任何 CMD6211 请求.
717    //
718    // 老代码 (v1.4.106 codex 1131 F7) 是 silent skip + warn —— 全部未知时返
719    // Ok(()) 但 0 backend 请求 = silent success (CLAUDE.md 反模式 D
720    // silent-success). 修法: validate first, fail loud, caller 早 reject.
721    let offending: Vec<i32> = securities
722        .iter()
723        .filter_map(|sec| {
724            if ftapi_market_to_quote_mkt(sec.ftapi_market) == 0 {
725                Some(sec.ftapi_market)
726            } else {
727                None
728            }
729        })
730        .collect();
731    if !offending.is_empty() {
732        let mut dedup: Vec<i32> = offending;
733        dedup.sort_unstable();
734        dedup.dedup();
735        tracing::warn!(
736            offending = ?dedup,
737            "v1.4.106 codex 0631 F3: submit_global_desired_set rejected — unsupported ftapi_market(s)"
738        );
739        return Err(QotSubError::UnsupportedMarket { offending: dedup });
740    }
741
742    // 按后端市场类型分组(C++ 每个市场独立发送订阅请求)。
743    //
744    // 不再按 normal/depth 拆分: CMD6211 是覆盖式 set-state;同一 market 拆两条
745    // 请求会让后发请求覆盖先发请求,表现为 QUOTE 或 ORDER_BOOK 只能有一边持续
746    // push。C++ 对齐点: `MktQotSub.cpp::Timer_SendSubReq` 为一个 market 构造
747    // 单个 `SubscribeSetReq`,同一个 security 内聚合全部 `SubBit`。
748    //
749    // v1.4.110 Phase 2 Slice 4: MktGroup 内每项升 `(stock_id, broker_id, sub_types_with_opts)`
750    // 3-tuple, 让 build_subscribe_req_with_options 能写出
751    // SecuritySubscribe.broker_id.
752    type MktGroup = Vec<SecuritySubscribeInput>;
753    let mut by_market: BTreeMap<u8, MktGroup> = BTreeMap::new();
754    for sec in securities {
755        let mkt = ftapi_market_to_quote_mkt(sec.ftapi_market);
756        // F3 已 validate, 不可能是 0 — 此处 debug_assert.
757        debug_assert!(mkt != 0, "F3 validate 应已拒未知 market");
758        by_market.entry(mkt).or_default().push((
759            sec.stock_id,
760            sec.broker_id,
761            sec.sub_types_with_opts.clone(),
762        ));
763    }
764
765    // **v1.4.106 codex 0631 F3 [P2]**: 多 market 分批发送, 部分失败 → 不能当
766    // 全成功. 收集 succeeded / failed market 列表, 返 PartialMarketFailure.
767    //
768    // 老代码 .await? 短路: 第一个失败市场退出, max_sub_count=0 — caller 看不到
769    // 哪些市场已成功 (split state). 改为收集所有结果, 部分失败 loud Err.
770    let mut max_sub_count = 0i32;
771    let mut succeeded_markets: Vec<u8> = Vec::new();
772    let mut failed_markets: Vec<u8> = Vec::new();
773    let mut first_transport_err: Option<FutuError> = None;
774    for (mkt_type, secs) in &by_market {
775        let contains_depth_type = secs
776            .iter()
777            .any(|(_, _, sub_types)| sub_types.iter().any(|(st, _)| is_depth_sub_type(*st)));
778        match submit_subscribe_with_market(backend, secs, *mkt_type, contains_depth_type, false)
779            .await
780        {
781            Ok(count) => {
782                if count > max_sub_count {
783                    max_sub_count = count;
784                }
785                if !succeeded_markets.contains(mkt_type) {
786                    succeeded_markets.push(*mkt_type);
787                }
788            }
789            Err(QotSubError::Transport(e)) => {
790                // Transport error: 网络断 / TCP 错 — 整批中断, 透传不分批.
791                // 这种情况下 partial 没意义 (后续市场也会同样 Transport 错).
792                first_transport_err = Some(e);
793                break;
794            }
795            Err(_) => {
796                if !failed_markets.contains(mkt_type) {
797                    failed_markets.push(*mkt_type);
798                }
799            }
800        }
801    }
802    if let Some(e) = first_transport_err {
803        return Err(QotSubError::Transport(e));
804    }
805    if !failed_markets.is_empty() {
806        succeeded_markets.sort_unstable();
807        failed_markets.sort_unstable();
808        tracing::warn!(
809            succeeded = ?succeeded_markets,
810            failed = ?failed_markets,
811            "v1.4.106 codex 0631 F3: submit_global_desired_set partial failure"
812        );
813        return Err(QotSubError::PartialMarketFailure {
814            succeeded: succeeded_markets,
815            failed: failed_markets,
816        });
817    }
818
819    Ok(max_sub_count)
820}
821
822pub async fn submit_empty_desired_set_for_markets(
823    backend: &BackendConn,
824    markets: &[EmptyDesiredMarket],
825) -> std::result::Result<i32, QotSubError> {
826    if markets.is_empty() {
827        return Err(QotSubError::UnsupportedMarket { offending: vec![0] });
828    }
829
830    let groups: Vec<EmptyDesiredMarket> = BTreeSet::from_iter(markets.iter().copied())
831        .into_iter()
832        .collect();
833    if groups.iter().any(|g| g.mkt_type == 0) {
834        return Err(QotSubError::UnsupportedMarket { offending: vec![0] });
835    }
836
837    let mut max_sub_count = 0i32;
838    let mut succeeded_markets: Vec<u8> = Vec::new();
839    let mut failed_markets: Vec<u8> = Vec::new();
840    let mut first_transport_err: Option<FutuError> = None;
841
842    for group in groups {
843        match submit_subscribe_with_market(backend, &[], group.mkt_type, group.is_depth, true).await
844        {
845            Ok(count) => {
846                max_sub_count = max_sub_count.max(count);
847                if !succeeded_markets.contains(&group.mkt_type) {
848                    succeeded_markets.push(group.mkt_type);
849                }
850            }
851            Err(QotSubError::Transport(e)) => {
852                first_transport_err = Some(e);
853                break;
854            }
855            Err(_) => {
856                if !failed_markets.contains(&group.mkt_type) {
857                    failed_markets.push(group.mkt_type);
858                }
859            }
860        }
861    }
862
863    if let Some(e) = first_transport_err {
864        return Err(QotSubError::Transport(e));
865    }
866    if !failed_markets.is_empty() {
867        succeeded_markets.sort_unstable();
868        succeeded_markets.dedup();
869        failed_markets.sort_unstable();
870        failed_markets.dedup();
871        return Err(QotSubError::PartialMarketFailure {
872            succeeded: succeeded_markets,
873            failed: failed_markets,
874        });
875    }
876
877    Ok(max_sub_count)
878}
879
880/// 发送单个 (mkt_type, is_depth) 的 CMD6211 请求, 返 max_sub_count.
881///
882/// v1.4.110 Phase 2 Slice 4: secs 升 3-tuple `(stock_id, broker_id, sub_types)`
883/// 让 build_subscribe_req_with_options 能写出 SecuritySubscribe.broker_id.
884async fn submit_subscribe_with_market(
885    backend: &BackendConn,
886    secs: &[SecuritySubscribeInput],
887    mkt_type: u8,
888    is_depth: bool,
889    is_unsub_all: bool,
890) -> std::result::Result<i32, QotSubError> {
891    let req = if is_unsub_all {
892        // 全退场景 — security_list 空, reserved=1 让 body 非零 (Windows backend 兼容).
893        ft_cmd_stock_quote_sub::SubscribeSetReq {
894            security_list: vec![],
895            reserved: Some(1),
896            timer_sub: None,
897        }
898    } else {
899        build_subscribe_req_with_options(secs)
900    };
901    let body = req.encode_to_vec();
902
903    let mut reserved = [0u8; 10];
904    reserved[0] = mkt_type;
905    // v1.4.110 Phase 2 Slice 4: request_bits 是 trace 用, 不必含 broker_id;
906    // SecuritySubscribe.broker_id 在 build_subscribe_req_with_options 内部写.
907    let request_bits: Vec<(u64, Vec<(u32, i64)>)> = secs
908        .iter()
909        .map(|(stock_id, _broker_id, sub_types)| {
910            let bits = sub_types
911                .iter()
912                .flat_map(|(sub_type, opts)| sub_type_to_bits_with_options(*sub_type, *opts))
913                .collect();
914            (*stock_id, bits)
915        })
916        .collect();
917
918    tracing::info!(
919        mkt_type,
920        is_depth,
921        is_unsub_all,
922        count = secs.len(),
923        body_len = body.len(),
924        request_bits = ?request_bits,
925        "v1.4.106 codex 1131 F1: sending CMD6211 subscribe (set-state)"
926    );
927
928    let resp = backend
929        .request_with_reserved(CMD_QOT_SUB, body, reserved)
930        .await
931        .map_err(QotSubError::Transport)?;
932
933    let parsed: ft_cmd_stock_quote_sub::SubscribeSetRsp = Message::decode(resp.body.as_ref())
934        .map_err(|e| QotSubError::DecodeFailed(format!("{e}")))?;
935
936    let result = parsed.result.unwrap_or(-1);
937    let warning = parsed.warning_code.unwrap_or(0);
938    let max_sub_count = parsed.max_sub_count.unwrap_or(0);
939
940    if result != 0 {
941        // **F1 P1**: backend reject → 让 caller 知道 (Err), 不 silent-warn.
942        tracing::warn!(
943            mkt_type,
944            is_depth,
945            result,
946            warning,
947            request_bits = ?request_bits,
948            "v1.4.106 codex 1131 F1: CMD6211 backend rejected"
949        );
950        return Err(QotSubError::BackendRejected { result, warning });
951    }
952
953    tracing::info!(
954        mkt_type,
955        is_depth,
956        max_sub_count,
957        "v1.4.106 codex 1131 F1: CMD6211 ok"
958    );
959    Ok(max_sub_count)
960}
961
962#[cfg(test)]
963mod tests;