1use anyhow::Result;
8use async_trait::async_trait;
9use futu_qot::push::{QuoteHandler, QuotePushDispatcher};
10use futu_qot::types::{BasicQot, KLine, OrderBookData, Security};
11
12use crate::common::{connect_gateway, format_symbol, parse_sub_types, parse_symbol};
13use crate::output::OutputFormat;
14
/// Quote push handler that renders every update as a human-readable text line on stdout.
///
/// Used by `run` for every output format other than JSON / JSONL.
struct PrintHandler;
17
/// Quote push handler that emits every update as a single-line JSON object on stdout.
///
/// Used by `run` when the requested output format is JSON or JSONL.
struct JsonlHandler;
20
21#[async_trait]
22impl QuoteHandler for PrintHandler {
23 async fn on_basic_qot_update(&self, qot_list: Vec<BasicQot>) {
24 for q in qot_list {
25 let sym = format_symbol(&q.security);
26 let change = q.cur_price - q.last_close_price;
27 let pct = if q.last_close_price != 0.0 {
28 change / q.last_close_price * 100.0
29 } else {
30 0.0
31 };
32 let sign = if change >= 0.0 { "+" } else { "" };
33 println!(
34 "[{}] basic {sym:<12} price={:.3} change={sign}{change:.3} ({sign}{pct:.2}%) vol={}",
35 q.update_time, q.cur_price, q.volume
36 );
37 }
38 }
39
40 async fn on_kl_update(&self, security: Security, kl_list: Vec<KLine>) {
41 let sym = format_symbol(&security);
42 for k in kl_list {
43 println!(
44 "[{}] kl {sym:<12} O={:.3} H={:.3} L={:.3} C={:.3} V={}",
45 k.time, k.open_price, k.high_price, k.low_price, k.close_price, k.volume
46 );
47 }
48 }
49
50 async fn on_order_book_update(&self, data: OrderBookData) {
51 let sym = format_symbol(&data.security);
52 let top_bid = data.bid_list.first();
53 let top_ask = data.ask_list.first();
54 match (top_bid, top_ask) {
55 (Some(b), Some(a)) => println!(
56 " ob {sym:<12} bid={:.3}x{} ask={:.3}x{} spread={:.3}",
57 b.price,
58 b.volume,
59 a.price,
60 a.volume,
61 a.price - b.price
62 ),
63 _ => println!(" ob {sym:<12} (empty)"),
64 }
65 }
66
67 async fn on_ticker_update(
68 &self,
69 security: Security,
70 ticker_list: Vec<futu_qot::ticker::Ticker>,
71 ) {
72 let sym = format_symbol(&security);
73 for t in ticker_list {
74 println!(
75 " tick {sym:<12} px={:.3} vol={} dir={}",
76 t.price, t.volume, t.dir
77 );
78 }
79 }
80
81 async fn on_rt_update(&self, security: Security, rt_list: Vec<futu_qot::rt::TimeShare>) {
82 let sym = format_symbol(&security);
83 for r in rt_list {
84 println!(
85 "[{}] rt {sym:<12} px={:.3} avg={:.3} vol={}",
86 r.time, r.price, r.avg_price, r.volume
87 );
88 }
89 }
90}
91
92#[async_trait]
93impl QuoteHandler for JsonlHandler {
94 async fn on_basic_qot_update(&self, qot_list: Vec<BasicQot>) {
95 for q in qot_list {
96 let line = serde_json::json!({
97 "kind": "basic",
98 "symbol": format_symbol(&q.security),
99 "update_time": q.update_time,
100 "cur_price": q.cur_price,
101 "last_close": q.last_close_price,
102 "volume": q.volume,
103 "turnover": q.turnover,
104 "open_price": q.open_price,
105 "high_price": q.high_price,
106 "low_price": q.low_price,
107 });
108 println!("{}", line);
109 }
110 }
111 async fn on_kl_update(&self, security: Security, kl_list: Vec<KLine>) {
112 for k in kl_list {
113 let line = serde_json::json!({
114 "kind": "kline",
115 "symbol": format_symbol(&security),
116 "time": k.time,
117 "open": k.open_price,
118 "high": k.high_price,
119 "low": k.low_price,
120 "close": k.close_price,
121 "volume": k.volume,
122 });
123 println!("{}", line);
124 }
125 }
126 async fn on_order_book_update(&self, data: OrderBookData) {
127 let line = serde_json::json!({
128 "kind": "orderbook",
129 "symbol": format_symbol(&data.security),
130 "bid_top": data.bid_list.first().map(|b| serde_json::json!({"price": b.price, "volume": b.volume})),
131 "ask_top": data.ask_list.first().map(|a| serde_json::json!({"price": a.price, "volume": a.volume})),
132 });
133 println!("{}", line);
134 }
135 async fn on_ticker_update(
136 &self,
137 security: Security,
138 ticker_list: Vec<futu_qot::ticker::Ticker>,
139 ) {
140 for t in ticker_list {
141 let line = serde_json::json!({
142 "kind": "ticker",
143 "symbol": format_symbol(&security),
144 "price": t.price,
145 "volume": t.volume,
146 "dir": t.dir,
147 });
148 println!("{}", line);
149 }
150 }
151 async fn on_rt_update(&self, security: Security, rt_list: Vec<futu_qot::rt::TimeShare>) {
152 for r in rt_list {
153 let line = serde_json::json!({
154 "kind": "rt",
155 "symbol": format_symbol(&security),
156 "time": r.time,
157 "price": r.price,
158 "avg_price": r.avg_price,
159 "volume": r.volume,
160 });
161 println!("{}", line);
162 }
163 }
164}
165
166pub async fn run(
167 gateway: &str,
168 symbols: &[String],
169 types_csv: &str,
170 format: OutputFormat,
171) -> Result<()> {
172 let secs: Vec<_> = symbols
173 .iter()
174 .map(|s| parse_symbol(s))
175 .collect::<Result<_>>()?;
176 let sub_types = parse_sub_types(types_csv)?;
177
178 let (client, mut push_rx) = connect_gateway(gateway, "futucli-sub").await?;
179
180 futu_qot::sub::subscribe(&client, &secs, &sub_types, true, true).await?;
182 eprintln!(
183 "✓ subscribed: symbols={:?}, types={types_csv} (Ctrl-C to stop)",
184 symbols
185 );
186
187 let use_jsonl = matches!(format, OutputFormat::Json | OutputFormat::Jsonl);
190
191 loop {
192 tokio::select! {
193 maybe = push_rx.recv() => {
194 let Some(msg) = maybe else {
195 eprintln!("⚠️ push channel closed");
196 break;
197 };
198 let dispatch_err = if use_jsonl {
199 QuotePushDispatcher::dispatch(&JsonlHandler, msg.proto_id, &msg.body).await
200 } else {
201 QuotePushDispatcher::dispatch(&PrintHandler, msg.proto_id, &msg.body).await
202 };
203 if let Err(e) = dispatch_err {
204 eprintln!("push dispatch error: {e}");
205 }
206 }
207 _ = tokio::signal::ctrl_c() => {
208 eprintln!("\n⏹ stopping (ctrl-c)");
209 let _ = futu_qot::sub::unsubscribe(&client, &secs, &sub_types).await;
211 break;
212 }
213 }
214 }
215 Ok(())
216}