futu_qot/
push.rs

1use async_trait::async_trait;
2use futu_core::error::FutuError;
3use futu_core::proto_id;
4
5use crate::broker::{BrokerData, BrokerEntry};
6use crate::order_detail::OrderDetailData;
7use crate::rt::TimeShare;
8use crate::ticker::Ticker;
9use crate::types::{BasicQot, KLine, OrderBookData, OrderBookEntry, Security};
10
11/// 行情推送回调 trait
12///
13/// 实现此 trait 以接收实时行情推送。
14/// 使用 `QuotePushDispatcher::dispatch` 将原始推送消息分发到对应回调。
15#[async_trait]
16pub trait QuoteHandler: Send + Sync + 'static {
17    /// 基本行情更新推送
18    async fn on_basic_qot_update(&self, _qot_list: Vec<BasicQot>) {}
19
20    /// K 线更新推送
21    async fn on_kl_update(&self, _security: Security, _kl_list: Vec<KLine>) {}
22
23    /// 摆盘更新推送
24    async fn on_order_book_update(&self, _data: OrderBookData) {}
25
26    /// 逐笔更新推送
27    async fn on_ticker_update(&self, _security: Security, _ticker_list: Vec<Ticker>) {}
28
29    /// 分时更新推送
30    async fn on_rt_update(&self, _security: Security, _rt_list: Vec<TimeShare>) {}
31
32    /// 经纪队列更新推送
33    async fn on_broker_update(&self, _data: BrokerData) {}
34
35    /// 委托明细更新推送
36    async fn on_order_detail_update(&self, _data: OrderDetailData) {}
37}
38
39/// 行情推送分发器
40pub struct QuotePushDispatcher;
41
42impl QuotePushDispatcher {
43    /// 将原始推送消息分发到 QuoteHandler 的对应回调
44    pub async fn dispatch(
45        handler: &dyn QuoteHandler,
46        proto_id: u32,
47        body: &[u8],
48    ) -> Result<(), FutuError> {
49        match proto_id {
50            proto_id::QOT_UPDATE_BASIC_QOT => {
51                let resp: futu_proto::qot_update_basic_qot::Response =
52                    prost::Message::decode(body).map_err(FutuError::Proto)?;
53                if let Some(s2c) = resp.s2c {
54                    let qot_list: Vec<BasicQot> = s2c
55                        .basic_qot_list
56                        .iter()
57                        .map(BasicQot::from_proto)
58                        .collect();
59                    handler.on_basic_qot_update(qot_list).await;
60                }
61            }
62            proto_id::QOT_UPDATE_KL => {
63                let resp: futu_proto::qot_update_kl::Response =
64                    prost::Message::decode(body).map_err(FutuError::Proto)?;
65                if let Some(s2c) = resp.s2c {
66                    let security = Security::from_proto(&s2c.security);
67                    let kl_list: Vec<KLine> = s2c.kl_list.iter().map(KLine::from_proto).collect();
68                    handler.on_kl_update(security, kl_list).await;
69                }
70            }
71            proto_id::QOT_UPDATE_ORDER_BOOK => {
72                let resp: futu_proto::qot_update_order_book::Response =
73                    prost::Message::decode(body).map_err(FutuError::Proto)?;
74                if let Some(s2c) = resp.s2c {
75                    let data = OrderBookData {
76                        security: Security::from_proto(&s2c.security),
77                        ask_list: s2c
78                            .order_book_ask_list
79                            .iter()
80                            .map(OrderBookEntry::from_proto)
81                            .collect(),
82                        bid_list: s2c
83                            .order_book_bid_list
84                            .iter()
85                            .map(OrderBookEntry::from_proto)
86                            .collect(),
87                    };
88                    handler.on_order_book_update(data).await;
89                }
90            }
91            proto_id::QOT_UPDATE_TICKER => {
92                let resp: futu_proto::qot_update_ticker::Response =
93                    prost::Message::decode(body).map_err(FutuError::Proto)?;
94                if let Some(s2c) = resp.s2c {
95                    let security = Security::from_proto(&s2c.security);
96                    let ticker_list: Vec<Ticker> =
97                        s2c.ticker_list.iter().map(Ticker::from_proto).collect();
98                    handler.on_ticker_update(security, ticker_list).await;
99                }
100            }
101            proto_id::QOT_UPDATE_RT => {
102                let resp: futu_proto::qot_update_rt::Response =
103                    prost::Message::decode(body).map_err(FutuError::Proto)?;
104                if let Some(s2c) = resp.s2c {
105                    let security = Security::from_proto(&s2c.security);
106                    let rt_list: Vec<TimeShare> =
107                        s2c.rt_list.iter().map(TimeShare::from_proto).collect();
108                    handler.on_rt_update(security, rt_list).await;
109                }
110            }
111            proto_id::QOT_UPDATE_BROKER => {
112                let resp: futu_proto::qot_update_broker::Response =
113                    prost::Message::decode(body).map_err(FutuError::Proto)?;
114                if let Some(s2c) = resp.s2c {
115                    let data = BrokerData {
116                        security: Security::from_proto(&s2c.security),
117                        ask_list: s2c
118                            .broker_ask_list
119                            .iter()
120                            .map(BrokerEntry::from_proto)
121                            .collect(),
122                        bid_list: s2c
123                            .broker_bid_list
124                            .iter()
125                            .map(BrokerEntry::from_proto)
126                            .collect(),
127                    };
128                    handler.on_broker_update(data).await;
129                }
130            }
131            // NOTE: QOT_UPDATE_ORDER_DETAIL proto has been removed; push handler disabled.
132            // proto_id::QOT_UPDATE_ORDER_DETAIL => { ... }
133            _ => {
134                tracing::debug!(proto_id = proto_id, "unhandled quote push");
135            }
136        }
137        Ok(())
138    }
139}