Skip to main content

futucli/cmd/
sub.rs

1//! `futucli sub` — 订阅推送并持续打印,Ctrl-C 停止
2//!
3//! v1.4.98 eli BUG-005 fix (P2, 2026-04-27): `-o jsonl` 之前忽略, 仍打印
4//! `[time] basic HK.00700 price=...` 文本. 修后 `-o jsonl` 每条 push 一行
5//! 合法 JSON object (脚本 / agent 用).
6
7use anyhow::Result;
8use async_trait::async_trait;
9use futu_qot::push::{QuoteHandler, QuotePushDispatcher};
10use futu_qot::types::{BasicQot, KLine, OrderBookData, Security};
11
12use crate::common::{connect_gateway, format_symbol, parse_sub_types, parse_symbol};
13use crate::output::OutputFormat;
14
15/// PrintHandler — text format (default & --output table).
16struct PrintHandler;
17
18/// JsonlHandler — emit one JSON object per push event (--output jsonl/json).
19struct JsonlHandler;
20
21#[async_trait]
22impl QuoteHandler for PrintHandler {
23    async fn on_basic_qot_update(&self, qot_list: Vec<BasicQot>) {
24        for q in qot_list {
25            let sym = format_symbol(&q.security);
26            let change = q.cur_price - q.last_close_price;
27            let pct = if q.last_close_price != 0.0 {
28                change / q.last_close_price * 100.0
29            } else {
30                0.0
31            };
32            let sign = if change >= 0.0 { "+" } else { "" };
33            println!(
34                "[{}] basic {sym:<12} price={:.3} change={sign}{change:.3} ({sign}{pct:.2}%) vol={}",
35                q.update_time, q.cur_price, q.volume
36            );
37        }
38    }
39
40    async fn on_kl_update(&self, security: Security, kl_list: Vec<KLine>) {
41        let sym = format_symbol(&security);
42        for k in kl_list {
43            println!(
44                "[{}] kl    {sym:<12} O={:.3} H={:.3} L={:.3} C={:.3} V={}",
45                k.time, k.open_price, k.high_price, k.low_price, k.close_price, k.volume
46            );
47        }
48    }
49
50    async fn on_order_book_update(&self, data: OrderBookData) {
51        let sym = format_symbol(&data.security);
52        let top_bid = data.bid_list.first();
53        let top_ask = data.ask_list.first();
54        match (top_bid, top_ask) {
55            (Some(b), Some(a)) => println!(
56                "              ob    {sym:<12} bid={:.3}x{}  ask={:.3}x{}  spread={:.3}",
57                b.price,
58                b.volume,
59                a.price,
60                a.volume,
61                a.price - b.price
62            ),
63            _ => println!("              ob    {sym:<12} (empty)"),
64        }
65    }
66
67    async fn on_ticker_update(
68        &self,
69        security: Security,
70        ticker_list: Vec<futu_qot::ticker::Ticker>,
71    ) {
72        let sym = format_symbol(&security);
73        for t in ticker_list {
74            println!(
75                "              tick  {sym:<12} px={:.3} vol={} dir={}",
76                t.price, t.volume, t.dir
77            );
78        }
79    }
80
81    async fn on_rt_update(&self, security: Security, rt_list: Vec<futu_qot::rt::TimeShare>) {
82        let sym = format_symbol(&security);
83        for r in rt_list {
84            println!(
85                "[{}] rt    {sym:<12} px={:.3} avg={:.3} vol={}",
86                r.time, r.price, r.avg_price, r.volume
87            );
88        }
89    }
90}
91
92#[async_trait]
93impl QuoteHandler for JsonlHandler {
94    async fn on_basic_qot_update(&self, qot_list: Vec<BasicQot>) {
95        for q in qot_list {
96            let line = serde_json::json!({
97                "kind": "basic",
98                "symbol": format_symbol(&q.security),
99                "update_time": q.update_time,
100                "cur_price": q.cur_price,
101                "last_close": q.last_close_price,
102                "volume": q.volume,
103                "turnover": q.turnover,
104                "open_price": q.open_price,
105                "high_price": q.high_price,
106                "low_price": q.low_price,
107            });
108            println!("{}", line);
109        }
110    }
111    async fn on_kl_update(&self, security: Security, kl_list: Vec<KLine>) {
112        for k in kl_list {
113            let line = serde_json::json!({
114                "kind": "kline",
115                "symbol": format_symbol(&security),
116                "time": k.time,
117                "open": k.open_price,
118                "high": k.high_price,
119                "low": k.low_price,
120                "close": k.close_price,
121                "volume": k.volume,
122            });
123            println!("{}", line);
124        }
125    }
126    async fn on_order_book_update(&self, data: OrderBookData) {
127        let line = serde_json::json!({
128            "kind": "orderbook",
129            "symbol": format_symbol(&data.security),
130            "bid_top": data.bid_list.first().map(|b| serde_json::json!({"price": b.price, "volume": b.volume})),
131            "ask_top": data.ask_list.first().map(|a| serde_json::json!({"price": a.price, "volume": a.volume})),
132        });
133        println!("{}", line);
134    }
135    async fn on_ticker_update(
136        &self,
137        security: Security,
138        ticker_list: Vec<futu_qot::ticker::Ticker>,
139    ) {
140        for t in ticker_list {
141            let line = serde_json::json!({
142                "kind": "ticker",
143                "symbol": format_symbol(&security),
144                "price": t.price,
145                "volume": t.volume,
146                "dir": t.dir,
147            });
148            println!("{}", line);
149        }
150    }
151    async fn on_rt_update(&self, security: Security, rt_list: Vec<futu_qot::rt::TimeShare>) {
152        for r in rt_list {
153            let line = serde_json::json!({
154                "kind": "rt",
155                "symbol": format_symbol(&security),
156                "time": r.time,
157                "price": r.price,
158                "avg_price": r.avg_price,
159                "volume": r.volume,
160            });
161            println!("{}", line);
162        }
163    }
164}
165
166pub async fn run(
167    gateway: &str,
168    symbols: &[String],
169    types_csv: &str,
170    format: OutputFormat,
171) -> Result<()> {
172    let secs: Vec<_> = symbols
173        .iter()
174        .map(|s| parse_symbol(s))
175        .collect::<Result<_>>()?;
176    let sub_types = parse_sub_types(types_csv)?;
177
178    let (client, mut push_rx) = connect_gateway(gateway, "futucli-sub").await?;
179
180    // 订阅
181    futu_qot::sub::subscribe(&client, &secs, &sub_types, true, true).await?;
182    eprintln!(
183        "✓ subscribed: symbols={:?}, types={types_csv}  (Ctrl-C to stop)",
184        symbols
185    );
186
187    // v1.4.98 eli BUG-005: jsonl/json → JsonlHandler 输出每条 push 一行
188    // JSON object; 否则 PrintHandler 文本.
189    let use_jsonl = matches!(format, OutputFormat::Json | OutputFormat::Jsonl);
190
191    loop {
192        tokio::select! {
193            maybe = push_rx.recv() => {
194                let Some(msg) = maybe else {
195                    eprintln!("⚠️  push channel closed");
196                    break;
197                };
198                let dispatch_err = if use_jsonl {
199                    QuotePushDispatcher::dispatch(&JsonlHandler, msg.proto_id, &msg.body).await
200                } else {
201                    QuotePushDispatcher::dispatch(&PrintHandler, msg.proto_id, &msg.body).await
202                };
203                if let Err(e) = dispatch_err {
204                    eprintln!("push dispatch error: {e}");
205                }
206            }
207            _ = tokio::signal::ctrl_c() => {
208                eprintln!("\n⏹  stopping (ctrl-c)");
209                // 尝试退订,静默错误
210                let _ = futu_qot::sub::unsubscribe(&client, &secs, &sub_types).await;
211                break;
212            }
213        }
214    }
215    Ok(())
216}