1use std::collections::{BTreeMap, BTreeSet};
18
19use prost::Message;
20
21use futu_core::error::{FutuError, Result};
22
23use crate::conn::BackendConn;
24use crate::proto_internal::ft_cmd_stock_quote_sub;
25use crate::proto_internal::ft_cmd_stock_quote_sub_data;
26use crate::proto_internal::ft_cmd_tick;
27
/// Backend command id of the quote-subscription request sent by
/// `submit_subscribe_with_market` (logged as "CMD6211").
pub const CMD_QOT_SUB: u16 = 6211;
/// Backend command id for quote push frames.
/// NOTE(review): not referenced in this part of the file; meaning inferred
/// from the name only — confirm against the push handler.
pub const CMD_QOT_PUSH: u16 = 6212;
/// Backend command id used by `pull_latest_ticker` to fetch ticker records.
pub const CMD_QOT_PULL_TICKER: u16 = 6128;
37
/// Value `pull_latest_ticker` sends as `TickReq::begin_tick_key`.
/// NOTE(review): presumably a sentinel meaning "start from the newest tick" —
/// confirm against the backend protocol.
const TICKER_FETCH_LATEST_KEY: u64 = u64::MAX;
/// Value `pull_latest_ticker` sends as `TickReq::date_time_s`.
/// NOTE(review): presumably a sentinel meaning "latest trading day" — confirm.
const TICKER_LATEST_DATE_TIME_S: u32 = u32::MAX;
42
/// Session selectors in the backend ("nn") numbering. They are produced by
/// `common_session_to_nn` and consumed by `ticker_periods_for_nn_session`.
mod nn_quote_session {
    /// Default session; selects only the `NORMAL` tick period.
    pub const RTH: i32 = 0;
    /// Selects the `NORMAL`, `BEFORE` and `AFTER` tick periods.
    pub const ETH: i32 = 1;
    /// Selects every tick period, including `OVERNIGHT`.
    pub const ALL: i32 = 2;
}
48
/// Tick period identifiers placed in `TickReq::tick_period_type_ex` by
/// `pull_latest_ticker`.
/// NOTE(review): the meaning of each period is inferred from its name
/// (regular / pre-market / after-hours / overnight) — confirm against the
/// backend proto. Note that the value 3 is not defined here.
mod tick_period_type {
    pub const NORMAL: u32 = 0;
    pub const BEFORE: u32 = 1;
    pub const AFTER: u32 = 2;
    pub const OVERNIGHT: u32 = 4;
}
56
/// Subscription type codes understood by `sub_type_to_bits` and
/// `sub_type_to_bit_infos_with_options`; any other value expands to no
/// subscribe bits.
/// NOTE(review): the numbering presumably mirrors the public FTAPI `SubType`
/// enum (value 3 is not defined here) — confirm.
pub mod sub_type {
    /// Basic quote: price, stock state, type-specific data, deal statistics.
    pub const BASIC: i32 = 1;
    /// Order book (a "depth" type, see `is_depth_sub_type`).
    pub const ORDER_BOOK: i32 = 2;
    /// Tick-by-tick trades.
    pub const TICKER: i32 = 4;
    /// Time-sharing data (also subscribes the stock-state bit).
    pub const RT: i32 = 5;
    // K-line subscriptions, one code per bar period.
    pub const KL_DAY: i32 = 6;
    pub const KL_5MIN: i32 = 7;
    pub const KL_15MIN: i32 = 8;
    pub const KL_30MIN: i32 = 9;
    pub const KL_60MIN: i32 = 10;
    pub const KL_1MIN: i32 = 11;
    pub const KL_WEEK: i32 = 12;
    pub const KL_MONTH: i32 = 13;
    /// Broker queue (a "depth" type, see `is_depth_sub_type`).
    pub const BROKER: i32 = 14;
    pub const KL_QUARTER: i32 = 15;
    pub const KL_YEAR: i32 = 16;
    pub const KL_3MIN: i32 = 17;
}
76
77fn common_session_to_nn(session: i32) -> i32 {
78 match session {
79 2 => nn_quote_session::ETH,
81 3 => nn_quote_session::ALL,
82 _ => nn_quote_session::RTH,
83 }
84}
85
86fn ticker_periods_for_nn_session(nn_session: i32) -> Vec<u32> {
87 match nn_session {
88 nn_quote_session::ALL => vec![
89 tick_period_type::NORMAL,
90 tick_period_type::BEFORE,
91 tick_period_type::AFTER,
92 tick_period_type::OVERNIGHT,
93 ],
94 nn_quote_session::ETH => vec![
95 tick_period_type::NORMAL,
96 tick_period_type::BEFORE,
97 tick_period_type::AFTER,
98 ],
99 _ => vec![tick_period_type::NORMAL],
100 }
101}
102
103pub async fn pull_latest_ticker(
112 backend: &BackendConn,
113 stock_id: u64,
114 nn_mkt_type: u8,
115 common_session: i32,
116 pull_count: u32,
117 broker_id: Option<i32>,
118) -> Result<ft_cmd_tick::TickRsp> {
119 if stock_id == 0 || pull_count == 0 {
120 return Err(FutuError::Codec(format!(
121 "PullLatestTicker: invalid stock_id={stock_id} pull_count={pull_count}"
122 )));
123 }
124
125 let nn_session = if nn_mkt_type == ftapi_market_to_quote_mkt(11) {
128 common_session_to_nn(common_session)
129 } else {
130 nn_quote_session::RTH
131 };
132
133 let req = ft_cmd_tick::TickReq {
134 security_id: Some(stock_id),
135 date_time_s: Some(TICKER_LATEST_DATE_TIME_S),
136 begin_tick_key: Some(TICKER_FETCH_LATEST_KEY),
137 tick_count: Some(pull_count),
138 tick_period_type: None,
139 tick_period_type_ex: ticker_periods_for_nn_session(nn_session),
140 req_auth: None,
141 end_tick_key: None,
142 date_time_s_v2: None,
143 broker_id,
146 };
147
148 let mut reserved = [0u8; 10];
149 reserved[0] = nn_mkt_type;
150 let frame = backend
153 .request_with_reserved(CMD_QOT_PULL_TICKER, req.encode_to_vec(), reserved)
154 .await?;
155 let rsp: ft_cmd_tick::TickRsp = Message::decode(frame.body.as_ref())?;
156 let result = rsp.result.unwrap_or(-1);
157 if result != 0 {
158 return Err(FutuError::ServerError {
159 ret_type: result,
160 msg: format!("CMD6128 PullLatestTicker result={result}"),
161 });
162 }
163 Ok(rsp)
164}
165
/// Backend subscribe-bit numbers carried in `BitInfo::bit` of a CMD6211
/// request. `sub_type_to_bits` and `sub_type_to_bit_infos_with_options`
/// translate `sub_type` codes into these bits.
pub mod sbit {
    // Basic quote data.
    pub const PRICE: u32 = 0;
    pub const STOCK_STATE: u32 = 1;
    pub const STOCK_TYPE_SPECIFIC: u32 = 2;

    // Order book and deal statistics.
    pub const ORDER_BOOK: u32 = 3;
    pub const ORDER_BOOK_DETAIL: u32 = 4;
    pub const DEAL_STATISTICS: u32 = 5;

    // Broker queue.
    pub const HK_BROKER_QUEUE: u32 = 9;
    pub const HK_BROKER_DETAIL: u32 = 10;

    pub const US_PREMARKET_AFTERHOURS: u32 = 13;
    pub const US_LV2_ORDER: u32 = 17;
    pub const TIME_SHARING: u32 = 20;

    // K-line bits, one per bar period.
    pub const KLINE_1MIN: u32 = 21;
    pub const KLINE_3MIN: u32 = 22;
    pub const KLINE_5MIN: u32 = 23;
    pub const KLINE_15MIN: u32 = 24;
    pub const KLINE_30MIN: u32 = 25;
    pub const KLINE_60MIN: u32 = 26;
    pub const KLINE_DAY: u32 = 27;
    pub const KLINE_WEEK: u32 = 28;
    pub const KLINE_MONTH: u32 = 29;
    pub const KLINE_QUARTER: u32 = 30;
    pub const KLINE_YEAR: u32 = 31;

    pub const TICK: u32 = 35;
    /// Bit used for the per-type LV2 order entries of an order-book
    /// subscription (the identifier spelling is kept for compatibility).
    pub const MEGER_LV2_ORDER: u32 = 39;
}
193
/// One security of the desired subscription set handed to
/// `submit_global_desired_set`.
#[derive(Debug, Clone)]
pub struct SecurityWithOpts {
    /// Backend security id, sent as `security_id` in the subscribe request.
    pub stock_id: u64,
    /// FTAPI market code; mapped through `ftapi_market_to_quote_mkt` to pick
    /// the per-market request this security is grouped into.
    pub ftapi_market: i32,
    /// Subscription types (see `sub_type`) paired with their options.
    pub sub_types_with_opts: Vec<(i32, SubBitOptions)>,
    /// Optional broker id forwarded with the security; `None` omits the
    /// field from the request.
    pub broker_id: Option<std::num::NonZeroU32>,
}
216
217impl SecurityWithOpts {
218 pub fn new(
220 stock_id: u64,
221 ftapi_market: i32,
222 sub_types_with_opts: Vec<(i32, SubBitOptions)>,
223 ) -> Self {
224 Self {
225 stock_id,
226 ftapi_market,
227 sub_types_with_opts,
228 broker_id: None,
231 }
232 }
233
234 pub fn with_broker(
236 stock_id: u64,
237 ftapi_market: i32,
238 sub_types_with_opts: Vec<(i32, SubBitOptions)>,
239 broker_id: u32,
240 ) -> Self {
241 Self {
242 stock_id,
243 ftapi_market,
244 sub_types_with_opts,
245 broker_id: std::num::NonZeroU32::new(broker_id),
246 }
247 }
248}
249
/// Identifies one "clear all subscriptions" request: a backend market type
/// plus whether it targets depth-type subscriptions.
///
/// The derived `Ord` is what `submit_empty_desired_set_for_markets` relies on
/// to deduplicate and order its input through a `BTreeSet`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct EmptyDesiredMarket {
    /// Backend quote market type (a value returned by
    /// `ftapi_market_to_quote_mkt`); `0` is rejected as unsupported.
    pub mkt_type: u8,
    /// Whether the request concerns depth sub types (see `is_depth_sub_type`).
    pub is_depth: bool,
}
260
261pub fn empty_desired_market_for_sub(
262 ftapi_market: i32,
263 sub_type: i32,
264) -> Option<EmptyDesiredMarket> {
265 let mkt_type = ftapi_market_to_quote_mkt(ftapi_market);
266 if mkt_type == 0 {
267 return None;
268 }
269 Some(EmptyDesiredMarket {
270 mkt_type,
271 is_depth: is_depth_sub_type(sub_type),
272 })
273}
274
/// Failure modes of the CMD6211 subscription helpers in this file.
#[derive(Debug)]
pub enum QotSubError {
    /// The backend answered with a non-zero `result` (a missing `result` is
    /// reported as `-1`); `warning` is the response's `warning_code`, or `0`
    /// when absent.
    BackendRejected { result: i32, warning: i32 },
    /// The response body could not be decoded; carries the decoder's message.
    DecodeFailed(String),
    /// The request failed in the backend connection layer.
    Transport(FutuError),
    /// One or more `ftapi_market` values have no backend quote market.
    /// Also returned with `offending == [0]` for an empty input set.
    UnsupportedMarket { offending: Vec<i32> },
    /// At least one per-market request failed with a non-transport error;
    /// lists the backend market types that succeeded and that failed.
    PartialMarketFailure { succeeded: Vec<u8>, failed: Vec<u8> },
}
297
298impl std::fmt::Display for QotSubError {
299 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
300 match self {
301 QotSubError::BackendRejected { result, warning } => {
302 write!(
303 f,
304 "backend rejected CMD6211: result={result} warning={warning}"
305 )
306 }
307 QotSubError::DecodeFailed(s) => write!(f, "CMD6211 response decode failed: {s}"),
308 QotSubError::Transport(e) => write!(f, "CMD6211 transport error: {e}"),
309 QotSubError::UnsupportedMarket { offending } => write!(
310 f,
311 "CMD6211 unsupported ftapi_market(s): {offending:?} \
312 (ftapi_market_to_quote_mkt returned 0). Caller must validate \
313 ftapi_market before submit_global_desired_set."
314 ),
315 QotSubError::PartialMarketFailure { succeeded, failed } => write!(
316 f,
317 "CMD6211 partial-market failure: succeeded={succeeded:?} \
318 failed={failed:?}. State is split: succeeded markets are \
319 applied, failed markets need re-submit."
320 ),
321 }
322 }
323}
324
325impl std::error::Error for QotSubError {
326 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
327 match self {
328 QotSubError::Transport(e) => Some(e),
329 _ => None,
330 }
331 }
332}
333
334impl From<FutuError> for QotSubError {
335 fn from(e: FutuError) -> Self {
336 QotSubError::Transport(e)
337 }
338}
339
340pub fn sub_type_to_bits(sub_type: i32) -> Vec<(u32, i64)> {
343 match sub_type {
344 sub_type::BASIC => vec![
345 (sbit::PRICE, 0),
346 (sbit::STOCK_STATE, 0),
347 (sbit::STOCK_TYPE_SPECIFIC, 0),
348 (sbit::DEAL_STATISTICS, 0),
349 ],
350 sub_type::ORDER_BOOK => vec![(sbit::ORDER_BOOK, 0)],
351 sub_type::TICKER => vec![(sbit::TICK, 1)], sub_type::RT => vec![(sbit::STOCK_STATE, 0), (sbit::TIME_SHARING, 0)],
353 sub_type::BROKER => vec![(sbit::HK_BROKER_QUEUE, 0)],
354 sub_type::KL_1MIN => vec![(sbit::KLINE_1MIN, 1)], sub_type::KL_3MIN => vec![(sbit::KLINE_3MIN, 1)],
356 sub_type::KL_5MIN => vec![(sbit::KLINE_5MIN, 1)],
357 sub_type::KL_15MIN => vec![(sbit::KLINE_15MIN, 1)],
358 sub_type::KL_30MIN => vec![(sbit::KLINE_30MIN, 1)],
359 sub_type::KL_60MIN => vec![(sbit::KLINE_60MIN, 1)],
360 sub_type::KL_DAY => vec![(sbit::KLINE_DAY, 1)],
361 sub_type::KL_WEEK => vec![(sbit::KLINE_WEEK, 1)],
362 sub_type::KL_MONTH => vec![(sbit::KLINE_MONTH, 1)],
363 sub_type::KL_QUARTER => vec![(sbit::KLINE_QUARTER, 1)],
364 sub_type::KL_YEAR => vec![(sbit::KLINE_YEAR, 1)],
365 _ => vec![],
366 }
367}
368
/// Per-sub-type options that refine which subscribe bits and `prob` values
/// `sub_type_to_bit_infos_with_options` emits. `Default` yields all-zero /
/// all-false options.
#[derive(Debug, Clone, Copy, Default)]
pub struct SubBitOptions {
    /// Caller-facing session code: `2` and `3` select wider ticker / K-line
    /// `prob` values, any other value selects the default `prob` of `1`.
    pub session: i32,
    /// Order book: subscribe `ORDER_BOOK_DETAIL` instead of `ORDER_BOOK`.
    pub orderbook_detail: bool,
    /// Order book: request the "all" `prob` (with-detail variant when
    /// `orderbook_detail` is also set).
    pub orderbook_full_depth: bool,
    /// Broker: subscribe `HK_BROKER_DETAIL` instead of `HK_BROKER_QUEUE`.
    pub broker_detail: bool,
    /// Basic: additionally subscribe `US_PREMARKET_AFTERHOURS`.
    pub us_pre_after_detail: bool,
    /// NOTE(review): not read by any code visible in this part of the file —
    /// confirm where it is consumed.
    pub extended_time: bool,
    /// Order book: bit mask of LV2 order types; each set bit adds a
    /// `MEGER_LV2_ORDER` entry whose payload goes into `prob2` when it is
    /// valid UTF-8 and into `prob2_v2` otherwise.
    pub merged_lv2_order_types: u32,
    /// Order book: bit mask of LV2 order types; each set bit adds a
    /// `MEGER_LV2_ORDER` entry whose payload always goes into `prob2_v2`.
    pub merged_lv2_order_types_v2: u32,
}
399
/// One subscribe-bit entry, copied field by field into
/// `ft_cmd_stock_quote_sub_data::BitInfo` by
/// `build_subscribe_req_with_options`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeBitInfo {
    /// Subscribe bit number (see `sbit`).
    pub bit: u32,
    /// Numeric parameter attached to the bit.
    pub prob: i64,
    /// Optional string-typed extra parameter.
    pub prob2: Option<String>,
    /// Optional bytes-typed extra parameter.
    pub prob2_v2: Option<Vec<u8>>,
}
407
/// Order-book `prob` used when neither full depth nor LV2 types are requested.
/// NOTE(review): the "40" presumably refers to a 40-level book — confirm.
const ORDER_BOOK_40_PROB: i64 = 0;
/// Order-book `prob` for full depth without detail.
const ORDER_BOOK_ALL_PROB: i64 = 1;
/// Order-book `prob` for full depth with detail.
const ORDER_BOOK_ALL_WITH_DETAIL_PROB: i64 = 2;
/// Order-book `prob` used when LV2 order types are requested without full
/// depth.
/// NOTE(review): the meaning of the two combined flags (8 and 16) is not
/// visible here — confirm against the backend protocol.
const ORDER_BOOK_SIMPLE_LV2_PROB: i64 = 8 | 16;
413
414fn encode_us_lv2_order_sub_prob(lv2_type: u32, level: u32) -> Vec<u8> {
415 ft_cmd_stock_quote_sub_data::Uslv2OrderSubProb {
416 sub_items: vec![ft_cmd_stock_quote_sub_data::Uslv2OrderSubItem {
417 us_lv2_order_type: Some(lv2_type),
418 us_lv2_order_level: Some(level),
419 }],
420 }
421 .encode_to_vec()
422}
423
/// Calls `f` once for every set bit of `mask`, from the lowest bit to the
/// highest, passing the isolated bit value (a power of two).
///
/// A zero mask never invokes `f`.
fn for_each_type_mask_bit(mut mask: u32, mut f: impl FnMut(u32)) {
    while mask != 0 {
        // `mask` is non-zero here, so the shift amount is at most 31.
        let lowest = 1u32 << mask.trailing_zeros();
        f(lowest);
        // Clear the bit that was just reported.
        mask ^= lowest;
    }
}
431
432fn for_each_us_lv2_prob2_type(mask: u32, mut f: impl FnMut(u32)) {
433 for known in [
437 futu_cache::qot_right::US_LV2_ORDER_NASDAQ_TV,
438 futu_cache::qot_right::US_LV2_ORDER_ARCA,
439 ] {
440 if mask & known != 0 {
441 f(known);
442 }
443 }
444 let remaining = mask
445 & !(futu_cache::qot_right::US_LV2_ORDER_NASDAQ_TV
446 | futu_cache::qot_right::US_LV2_ORDER_ARCA);
447 for_each_type_mask_bit(remaining, f);
448}
449
450pub fn sub_type_to_bits_with_options(sub_type: i32, opts: SubBitOptions) -> Vec<(u32, i64)> {
455 sub_type_to_bit_infos_with_options(sub_type, opts)
456 .into_iter()
457 .map(|info| (info.bit, info.prob))
458 .collect()
459}
460
461pub fn sub_type_to_bit_infos_with_options(
462 sub_type: i32,
463 opts: SubBitOptions,
464) -> Vec<SubscribeBitInfo> {
465 let ticker_session_prob = match opts.session {
473 2 => 1 | 2 | 4,
474 3 => 1 | 2 | 4 | 8,
475 _ => 1,
476 };
477 let kline_session_prob = match opts.session {
481 2 => 2,
482 3 => 3,
483 _ => 1,
484 };
485 match sub_type {
486 sub_type::BASIC => {
487 let mut infos = vec![
488 SubscribeBitInfo::new(sbit::PRICE, 0),
489 SubscribeBitInfo::new(sbit::STOCK_STATE, 0),
490 SubscribeBitInfo::new(sbit::STOCK_TYPE_SPECIFIC, 0),
491 SubscribeBitInfo::new(sbit::DEAL_STATISTICS, 0),
492 ];
493 if opts.us_pre_after_detail {
494 infos.push(SubscribeBitInfo::new(sbit::US_PREMARKET_AFTERHOURS, 0));
497 }
498 infos
499 }
500 sub_type::ORDER_BOOK => {
501 let bit = if opts.orderbook_detail {
502 sbit::ORDER_BOOK_DETAIL
503 } else {
504 sbit::ORDER_BOOK
505 };
506 let mut infos = Vec::new();
507 for_each_type_mask_bit(opts.merged_lv2_order_types_v2, |lv2_type| {
508 infos.push(SubscribeBitInfo {
509 bit: sbit::MEGER_LV2_ORDER,
510 prob: 0,
511 prob2: None,
512 prob2_v2: Some(encode_us_lv2_order_sub_prob(lv2_type, 60)),
513 });
514 });
515 for_each_us_lv2_prob2_type(opts.merged_lv2_order_types, |lv2_type| {
516 let bytes = encode_us_lv2_order_sub_prob(lv2_type, 60);
517 let (prob2, prob2_v2) = encode_legacy_or_bytes_prob2(bytes);
518 infos.push(SubscribeBitInfo {
519 bit: sbit::MEGER_LV2_ORDER,
520 prob: 0,
521 prob2,
522 prob2_v2,
523 });
524 });
525 let prob = if opts.orderbook_full_depth {
526 if opts.orderbook_detail {
527 ORDER_BOOK_ALL_WITH_DETAIL_PROB
528 } else {
529 ORDER_BOOK_ALL_PROB
530 }
531 } else if opts.merged_lv2_order_types != 0 || opts.merged_lv2_order_types_v2 != 0 {
532 ORDER_BOOK_SIMPLE_LV2_PROB
533 } else {
534 ORDER_BOOK_40_PROB
535 };
536 infos.push(SubscribeBitInfo::new(bit, prob));
537 infos
538 }
539 sub_type::TICKER => vec![SubscribeBitInfo::new(sbit::TICK, ticker_session_prob)],
540 sub_type::RT => vec![
541 SubscribeBitInfo::new(sbit::STOCK_STATE, 0),
542 SubscribeBitInfo::new(sbit::TIME_SHARING, 0),
543 ],
544 sub_type::BROKER => {
545 let bit = if opts.broker_detail {
546 sbit::HK_BROKER_DETAIL
547 } else {
548 sbit::HK_BROKER_QUEUE
549 };
550 vec![SubscribeBitInfo::new(bit, 0)]
551 }
552 sub_type::KL_1MIN => vec![SubscribeBitInfo::new(sbit::KLINE_1MIN, kline_session_prob)],
553 sub_type::KL_3MIN => vec![SubscribeBitInfo::new(sbit::KLINE_3MIN, kline_session_prob)],
554 sub_type::KL_5MIN => vec![SubscribeBitInfo::new(sbit::KLINE_5MIN, kline_session_prob)],
555 sub_type::KL_15MIN => vec![SubscribeBitInfo::new(sbit::KLINE_15MIN, kline_session_prob)],
556 sub_type::KL_30MIN => vec![SubscribeBitInfo::new(sbit::KLINE_30MIN, kline_session_prob)],
557 sub_type::KL_60MIN => vec![SubscribeBitInfo::new(sbit::KLINE_60MIN, kline_session_prob)],
558 sub_type::KL_DAY => vec![SubscribeBitInfo::new(sbit::KLINE_DAY, kline_session_prob)],
559 sub_type::KL_WEEK => vec![SubscribeBitInfo::new(sbit::KLINE_WEEK, kline_session_prob)],
560 sub_type::KL_MONTH => vec![SubscribeBitInfo::new(sbit::KLINE_MONTH, kline_session_prob)],
561 sub_type::KL_QUARTER => vec![SubscribeBitInfo::new(
562 sbit::KLINE_QUARTER,
563 kline_session_prob,
564 )],
565 sub_type::KL_YEAR => vec![SubscribeBitInfo::new(sbit::KLINE_YEAR, kline_session_prob)],
566 _ => vec![],
567 }
568}
569
570impl SubscribeBitInfo {
571 fn new(bit: u32, prob: i64) -> Self {
572 Self {
573 bit,
574 prob,
575 prob2: None,
576 prob2_v2: None,
577 }
578 }
579}
580
/// Chooses which payload slot carries `bytes`.
///
/// Returns `(Some(string), None)` when `bytes` is valid UTF-8 and
/// `(None, Some(bytes))` otherwise; the bytes are never altered.
fn encode_legacy_or_bytes_prob2(bytes: Vec<u8>) -> (Option<String>, Option<Vec<u8>>) {
    String::from_utf8(bytes).map_or_else(
        |not_utf8| (None, Some(not_utf8.into_bytes())),
        |text| (Some(text), None),
    )
}
587
/// Input row for `build_subscribe_req_with_options`:
/// `(stock_id, optional broker id, [(sub_type, options)])`.
pub type SecuritySubscribeInput = (u64, Option<std::num::NonZeroU32>, Vec<(i32, SubBitOptions)>);
594
595pub fn build_subscribe_req_with_options(
609 securities: &[SecuritySubscribeInput],
610) -> ft_cmd_stock_quote_sub::SubscribeSetReq {
611 let mut security_list = Vec::new();
612
613 for (stock_id, broker_id, sub_types_with_opts) in securities {
614 let mut bit_info_list = Vec::new();
615 for (st, opts) in sub_types_with_opts {
616 for info in sub_type_to_bit_infos_with_options(*st, *opts) {
617 bit_info_list.push(ft_cmd_stock_quote_sub_data::BitInfo {
618 bit: Some(info.bit),
619 prob: Some(info.prob),
620 prob2: info.prob2,
621 prob2_v2: info.prob2_v2,
622 });
623 }
624 }
625 security_list.push(ft_cmd_stock_quote_sub_data::SecuritySubscribe {
626 security_id: Some(*stock_id),
627 bit_info_list,
628 broker_id: broker_id.map(|nz| nz.get() as i32),
631 });
632 }
633
634 ft_cmd_stock_quote_sub::SubscribeSetReq {
635 security_list,
636 reserved: None,
637 timer_sub: None,
638 }
639}
640
/// Maps an FTAPI market code to the backend quote market type.
///
/// Returns `0` for any market code without a mapping; callers treat `0` as
/// "unsupported market".
pub fn ftapi_market_to_quote_mkt(market: i32) -> u8 {
    // (FTAPI market code, backend quote market type)
    const MARKET_TABLE: [(i32, u8); 13] = [
        (1, 1),
        (11, 2),
        (21, 3),
        (22, 4),
        (5, 5),
        (6, 6),
        (9, 9),
        (13, 13),
        (15, 7),
        (14, 8),
        (16, 16),
        (23, 10),
        (91, 17),
    ];

    MARKET_TABLE
        .iter()
        .find(|&&(ftapi, _)| ftapi == market)
        .map_or(0, |&(_, quote_mkt)| quote_mkt)
}
673
674fn is_depth_sub_type(sub_type: i32) -> bool {
681 matches!(sub_type, sub_type::ORDER_BOOK | sub_type::BROKER)
682}
683
684pub async fn submit_global_desired_set(
705 backend: &BackendConn,
706 securities: &[SecurityWithOpts],
707) -> std::result::Result<i32, QotSubError> {
708 if securities.is_empty() {
709 return Err(QotSubError::UnsupportedMarket { offending: vec![0] });
713 }
714
715 let offending: Vec<i32> = securities
722 .iter()
723 .filter_map(|sec| {
724 if ftapi_market_to_quote_mkt(sec.ftapi_market) == 0 {
725 Some(sec.ftapi_market)
726 } else {
727 None
728 }
729 })
730 .collect();
731 if !offending.is_empty() {
732 let mut dedup: Vec<i32> = offending;
733 dedup.sort_unstable();
734 dedup.dedup();
735 tracing::warn!(
736 offending = ?dedup,
737 "v1.4.106 codex 0631 F3: submit_global_desired_set rejected — unsupported ftapi_market(s)"
738 );
739 return Err(QotSubError::UnsupportedMarket { offending: dedup });
740 }
741
742 type MktGroup = Vec<SecuritySubscribeInput>;
753 let mut by_market: BTreeMap<u8, MktGroup> = BTreeMap::new();
754 for sec in securities {
755 let mkt = ftapi_market_to_quote_mkt(sec.ftapi_market);
756 debug_assert!(mkt != 0, "F3 validate 应已拒未知 market");
758 by_market.entry(mkt).or_default().push((
759 sec.stock_id,
760 sec.broker_id,
761 sec.sub_types_with_opts.clone(),
762 ));
763 }
764
765 let mut max_sub_count = 0i32;
771 let mut succeeded_markets: Vec<u8> = Vec::new();
772 let mut failed_markets: Vec<u8> = Vec::new();
773 let mut first_transport_err: Option<FutuError> = None;
774 for (mkt_type, secs) in &by_market {
775 let contains_depth_type = secs
776 .iter()
777 .any(|(_, _, sub_types)| sub_types.iter().any(|(st, _)| is_depth_sub_type(*st)));
778 match submit_subscribe_with_market(backend, secs, *mkt_type, contains_depth_type, false)
779 .await
780 {
781 Ok(count) => {
782 if count > max_sub_count {
783 max_sub_count = count;
784 }
785 if !succeeded_markets.contains(mkt_type) {
786 succeeded_markets.push(*mkt_type);
787 }
788 }
789 Err(QotSubError::Transport(e)) => {
790 first_transport_err = Some(e);
793 break;
794 }
795 Err(_) => {
796 if !failed_markets.contains(mkt_type) {
797 failed_markets.push(*mkt_type);
798 }
799 }
800 }
801 }
802 if let Some(e) = first_transport_err {
803 return Err(QotSubError::Transport(e));
804 }
805 if !failed_markets.is_empty() {
806 succeeded_markets.sort_unstable();
807 failed_markets.sort_unstable();
808 tracing::warn!(
809 succeeded = ?succeeded_markets,
810 failed = ?failed_markets,
811 "v1.4.106 codex 0631 F3: submit_global_desired_set partial failure"
812 );
813 return Err(QotSubError::PartialMarketFailure {
814 succeeded: succeeded_markets,
815 failed: failed_markets,
816 });
817 }
818
819 Ok(max_sub_count)
820}
821
822pub async fn submit_empty_desired_set_for_markets(
823 backend: &BackendConn,
824 markets: &[EmptyDesiredMarket],
825) -> std::result::Result<i32, QotSubError> {
826 if markets.is_empty() {
827 return Err(QotSubError::UnsupportedMarket { offending: vec![0] });
828 }
829
830 let groups: Vec<EmptyDesiredMarket> = BTreeSet::from_iter(markets.iter().copied())
831 .into_iter()
832 .collect();
833 if groups.iter().any(|g| g.mkt_type == 0) {
834 return Err(QotSubError::UnsupportedMarket { offending: vec![0] });
835 }
836
837 let mut max_sub_count = 0i32;
838 let mut succeeded_markets: Vec<u8> = Vec::new();
839 let mut failed_markets: Vec<u8> = Vec::new();
840 let mut first_transport_err: Option<FutuError> = None;
841
842 for group in groups {
843 match submit_subscribe_with_market(backend, &[], group.mkt_type, group.is_depth, true).await
844 {
845 Ok(count) => {
846 max_sub_count = max_sub_count.max(count);
847 if !succeeded_markets.contains(&group.mkt_type) {
848 succeeded_markets.push(group.mkt_type);
849 }
850 }
851 Err(QotSubError::Transport(e)) => {
852 first_transport_err = Some(e);
853 break;
854 }
855 Err(_) => {
856 if !failed_markets.contains(&group.mkt_type) {
857 failed_markets.push(group.mkt_type);
858 }
859 }
860 }
861 }
862
863 if let Some(e) = first_transport_err {
864 return Err(QotSubError::Transport(e));
865 }
866 if !failed_markets.is_empty() {
867 succeeded_markets.sort_unstable();
868 succeeded_markets.dedup();
869 failed_markets.sort_unstable();
870 failed_markets.dedup();
871 return Err(QotSubError::PartialMarketFailure {
872 succeeded: succeeded_markets,
873 failed: failed_markets,
874 });
875 }
876
877 Ok(max_sub_count)
878}
879
880async fn submit_subscribe_with_market(
885 backend: &BackendConn,
886 secs: &[SecuritySubscribeInput],
887 mkt_type: u8,
888 is_depth: bool,
889 is_unsub_all: bool,
890) -> std::result::Result<i32, QotSubError> {
891 let req = if is_unsub_all {
892 ft_cmd_stock_quote_sub::SubscribeSetReq {
894 security_list: vec![],
895 reserved: Some(1),
896 timer_sub: None,
897 }
898 } else {
899 build_subscribe_req_with_options(secs)
900 };
901 let body = req.encode_to_vec();
902
903 let mut reserved = [0u8; 10];
904 reserved[0] = mkt_type;
905 let request_bits: Vec<(u64, Vec<(u32, i64)>)> = secs
908 .iter()
909 .map(|(stock_id, _broker_id, sub_types)| {
910 let bits = sub_types
911 .iter()
912 .flat_map(|(sub_type, opts)| sub_type_to_bits_with_options(*sub_type, *opts))
913 .collect();
914 (*stock_id, bits)
915 })
916 .collect();
917
918 tracing::info!(
919 mkt_type,
920 is_depth,
921 is_unsub_all,
922 count = secs.len(),
923 body_len = body.len(),
924 request_bits = ?request_bits,
925 "v1.4.106 codex 1131 F1: sending CMD6211 subscribe (set-state)"
926 );
927
928 let resp = backend
929 .request_with_reserved(CMD_QOT_SUB, body, reserved)
930 .await
931 .map_err(QotSubError::Transport)?;
932
933 let parsed: ft_cmd_stock_quote_sub::SubscribeSetRsp = Message::decode(resp.body.as_ref())
934 .map_err(|e| QotSubError::DecodeFailed(format!("{e}")))?;
935
936 let result = parsed.result.unwrap_or(-1);
937 let warning = parsed.warning_code.unwrap_or(0);
938 let max_sub_count = parsed.max_sub_count.unwrap_or(0);
939
940 if result != 0 {
941 tracing::warn!(
943 mkt_type,
944 is_depth,
945 result,
946 warning,
947 request_bits = ?request_bits,
948 "v1.4.106 codex 1131 F1: CMD6211 backend rejected"
949 );
950 return Err(QotSubError::BackendRejected { result, warning });
951 }
952
953 tracing::info!(
954 mkt_type,
955 is_depth,
956 max_sub_count,
957 "v1.4.106 codex 1131 F1: CMD6211 ok"
958 );
959 Ok(max_sub_count)
960}
961
962#[cfg(test)]
963mod tests;