Skip to main content

futucli/cmd/
repl.rs

1//! `futucli repl` — 交互式 REPL
2//!
3//! 特性:
4//! - 共享一条 FutuClient 长连接,避免每条命令重连网关
5//! - 复用所有子命令(通过 `cli::dispatch`)
6//! - rustyline 行编辑:↑↓ 历史、Ctrl-R 反向搜索、Ctrl-D 退出
7//! - 历史持久化到 ~/.cache/futucli/history(或 $XDG_CACHE_HOME)
8//! - 订阅推送实时打印且不打断 prompt(`ExternalPrinter`)
9//! - REPL 专属命令:help / exit / reconnect / subs / unsub
10//!
11//! 实现要点:`rustyline::readline` 是阻塞式 API,放进 `tokio::task::spawn_blocking`
12//! 以与 async 命令共存。
13
14use std::collections::{BTreeMap, HashSet};
15use std::path::PathBuf;
16use std::sync::Arc;
17
18use anyhow::{Context, Result, anyhow, bail};
19use async_trait::async_trait;
20use clap::Parser;
21use futu_net::client::{FutuClient, PushMessage};
22use futu_qot::push::{QuoteHandler, QuotePushDispatcher};
23use futu_qot::types::{BasicQot, KLine, OrderBookData, Security, SubType};
24use rustyline::error::ReadlineError;
25use rustyline::{DefaultEditor, ExternalPrinter};
26use tokio::sync::{Mutex, mpsc};
27
28use crate::cli::{Cli, Command};
29use crate::common::{connect_gateway, format_symbol, parse_sub_type, parse_symbol};
30use crate::output::OutputFormat;
31
32/// REPL 运行时状态。订阅状态按 `Security → 订阅的 SubType 集合` 记录,
33/// 用 `BTreeMap` 保证 `subs` 命令输出稳定可读。
34struct ReplState {
35    gateway: String,
36    output: OutputFormat,
37    client: Arc<FutuClient>,
38    subs: Mutex<BTreeMap<String, HashSet<SubType>>>,
39}
40
41/// 推送打印器:tty 下走 rustyline ExternalPrinter(不撞 prompt),
42/// 非 tty(pipe / 重定向)下降级到 stderr,保证脚本 / 测试里 REPL 仍可用。
43enum PushPrinter {
44    External(Box<dyn ExternalPrinter + Send>),
45    Stderr,
46}
47
48impl PushPrinter {
49    fn print(&mut self, msg: String) {
50        match self {
51            PushPrinter::External(p) => {
52                let _ = p.print(msg);
53            }
54            PushPrinter::Stderr => {
55                eprint!("{msg}");
56            }
57        }
58    }
59}
60
61/// 包一层 tokio Mutex,跨 task 共享。
62type SharedPrinter = Arc<Mutex<PushPrinter>>;
63
64pub async fn run(gateway: &str, output: OutputFormat) -> Result<()> {
65    // 1. 建立长连接
66    let (client, push_rx) = connect_gateway(gateway, "futucli-repl").await?;
67    eprintln!("✓ connected to {gateway}");
68
69    let state = Arc::new(ReplState {
70        gateway: gateway.to_string(),
71        output,
72        client,
73        subs: Mutex::new(BTreeMap::new()),
74    });
75
76    // 2. rustyline editor + 历史文件
77    let history_path = history_file_path();
78    let mut rl = DefaultEditor::new().context("init rustyline editor")?;
79    if let Some(p) = &history_path {
80        let _ = rl.load_history(p); // 首次运行无文件属正常
81    }
82
83    // 3. ExternalPrinter:供 push task 打印,不打断 prompt;
84    //    非 tty 环境(pipe)直接降级到 stderr。
85    let printer_inner = match rl.create_external_printer() {
86        Ok(p) => PushPrinter::External(Box::new(p)),
87        Err(e) => {
88            eprintln!("note: external printer unavailable ({e}); push output will go to stderr");
89            PushPrinter::Stderr
90        }
91    };
92    let printer: SharedPrinter = Arc::new(Mutex::new(printer_inner));
93
94    // 4. 启动 push 消费 task
95    tokio::spawn(push_loop(push_rx, printer.clone()));
96
97    // 5. 主循环:readline → 解析 → dispatch
98    print_banner(gateway);
99    loop {
100        // rustyline 的 readline 是阻塞的,放到 blocking 线程里
101        let prompt = format!("futu ({}) > ", state.gateway);
102        let read_result = tokio::task::spawn_blocking(move || {
103            let res = rl.readline(&prompt);
104            (rl, res)
105        })
106        .await
107        .context("readline task panicked")?;
108        rl = read_result.0;
109
110        let line = match read_result.1 {
111            Ok(l) => l,
112            Err(ReadlineError::Interrupted) => {
113                eprintln!("(Ctrl-C) — type 'exit' or Ctrl-D to quit");
114                continue;
115            }
116            Err(ReadlineError::Eof) => {
117                eprintln!("bye");
118                break;
119            }
120            Err(e) => {
121                eprintln!("readline error: {e}");
122                break;
123            }
124        };
125        let trimmed = line.trim();
126        if trimmed.is_empty() {
127            continue;
128        }
129        let _ = rl.add_history_entry(trimmed);
130
131        // 6. 派发:REPL 专属 > 子命令
132        match handle_line(trimmed, &state, &printer).await {
133            Ok(ShouldContinue::Continue) => {}
134            Ok(ShouldContinue::Exit) => break,
135            Err(e) => eprintln!("error: {e:#}"),
136        }
137    }
138
139    // 退出前保存历史
140    if let Some(p) = &history_path {
141        let _ = std::fs::create_dir_all(p.parent().unwrap_or_else(|| std::path::Path::new(".")));
142        let _ = rl.save_history(p);
143    }
144    Ok(())
145}
146
147enum ShouldContinue {
148    Continue,
149    Exit,
150}
151
152/// 路由一行输入。REPL 专属命令优先;否则走 clap 解析再转 `cli::dispatch`。
153async fn handle_line(
154    line: &str,
155    state: &Arc<ReplState>,
156    printer: &SharedPrinter,
157) -> Result<ShouldContinue> {
158    let tokens = shlex::split(line).ok_or_else(|| anyhow!("failed to tokenize input"))?;
159    if tokens.is_empty() {
160        return Ok(ShouldContinue::Continue);
161    }
162
163    match tokens[0].as_str() {
164        "exit" | "quit" | ":q" => return Ok(ShouldContinue::Exit),
165        "help" | "?" => {
166            print_help();
167            return Ok(ShouldContinue::Continue);
168        }
169        "reconnect" => {
170            reconnect(state, printer).await?;
171            return Ok(ShouldContinue::Continue);
172        }
173        "subs" => {
174            list_subs(state).await;
175            return Ok(ShouldContinue::Continue);
176        }
177        "sub" if tokens.len() >= 2 && looks_like_symbol(&tokens[1]) => {
178            // 覆盖顶层 `sub` 子命令,改为 REPL 内部后台订阅
179            return sub_inline(&tokens[1..], state, printer)
180                .await
181                .map(|_| ShouldContinue::Continue);
182        }
183        "unsub" => {
184            return unsub_inline(&tokens[1..], state)
185                .await
186                .map(|_| ShouldContinue::Continue);
187        }
188        _ => {}
189    }
190
191    // clap 解析(注入一个伪 argv[0])
192    let argv = std::iter::once("futucli".to_string())
193        .chain(tokens)
194        .collect::<Vec<_>>();
195    let parsed = match Cli::try_parse_from(&argv) {
196        Ok(p) => p,
197        Err(e) => {
198            // clap 的帮助 / 错误消息自己会带换行;直接打出
199            eprint!("{e}");
200            return Ok(ShouldContinue::Continue);
201        }
202    };
203
204    // REPL 里禁止再次进入 repl
205    if matches!(parsed.command, Command::Repl) {
206        bail!("already in REPL");
207    }
208
209    // 使用 REPL 的 gateway/output,忽略解析到的(避免 REPL 里切换网关造成长连接失效)
210    crate::cli::dispatch(&state.gateway, state.output, parsed.command).await?;
211    Ok(ShouldContinue::Continue)
212}
213
214// ========== REPL 专属命令实现 ==========
215
216fn print_banner(gateway: &str) {
217    eprintln!("futucli REPL — gateway {gateway}");
218    eprintln!("type 'help' for commands, 'exit' or Ctrl-D to quit");
219}
220
221fn print_help() {
222    let lines = [
223        "REPL 内置命令:",
224        "  help | ?              显示本帮助",
225        "  exit | quit | :q      退出",
226        "  reconnect             断开并重新连接网关",
227        "  subs                  列出当前活跃订阅",
228        "  sub <SYMBOL> [-t csv] 后台订阅(推送在 prompt 上方实时显示)",
229        "  unsub <SYMBOL> [csv]  取消订阅(csv 省略则全部类型)",
230        "",
231        "子命令(和外层 futucli 一致):",
232        "  ping / quote / snapshot / kline / orderbook / ticker / rt / static /",
233        "  broker / plate-list / plate-stocks / account / funds / position /",
234        "  order / deal / unlock-trade",
235        "",
236        "示例:",
237        "  quote HK.00700 US.AAPL",
238        "  kline HK.00700 -t day -n 5",
239        "  sub HK.00700 -t basic,orderbook",
240    ];
241    for l in lines {
242        eprintln!("{l}");
243    }
244}
245
246async fn reconnect(state: &Arc<ReplState>, printer: &SharedPrinter) -> Result<()> {
247    let (new_client, new_rx) = connect_gateway(&state.gateway, "futucli-repl").await?;
248
249    // 原子替换 client。FutuClient 存在 state 里的是 Arc,替换指向对象需要 Mutex;
250    // 简化:reconnect 不热替换 state.client(会需要 Arc<Mutex<Arc<FutuClient>>>),
251    // 而是让用户重启 REPL。如果有需要我们再做热替换。
252    let _ = (new_client, new_rx); // 暂保留变量
253    let _ = printer;
254    eprintln!("reconnect not fully wired yet — please exit and re-enter REPL for a clean state");
255    Ok(())
256}
257
258async fn list_subs(state: &Arc<ReplState>) {
259    let guard = state.subs.lock().await;
260    if guard.is_empty() {
261        println!("(no active subscriptions)");
262        return;
263    }
264    println!("Active subscriptions:");
265    for (sym, types) in guard.iter() {
266        let mut names: Vec<&'static str> = types.iter().copied().map(sub_type_label).collect();
267        names.sort_unstable();
268        println!("  {sym:<14} {}", names.join(","));
269    }
270}
271
272fn looks_like_symbol(tok: &str) -> bool {
273    tok.contains('.') && !tok.starts_with('-')
274}
275
276async fn sub_inline(
277    args: &[String],
278    state: &Arc<ReplState>,
279    printer: &SharedPrinter,
280) -> Result<()> {
281    // 用法: sub <SYMBOL...> [-t csv]
282    let (symbols, types_csv) = split_sub_args(args)?;
283    let secs: Vec<Security> = symbols
284        .iter()
285        .map(|s| parse_symbol(s))
286        .collect::<Result<_>>()?;
287    let sub_types: Vec<SubType> = types_csv
288        .split(',')
289        .map(parse_sub_type)
290        .collect::<Result<_>>()?;
291
292    futu_qot::sub::subscribe(&state.client, &secs, &sub_types, true, true).await?;
293    {
294        let mut guard = state.subs.lock().await;
295        for s in &symbols {
296            let entry = guard.entry(s.clone()).or_default();
297            for t in &sub_types {
298                entry.insert(*t);
299            }
300        }
301    }
302    let _ = printer; // push 已在全局 task 里消费
303    eprintln!("✓ subscribed symbols={symbols:?} types={types_csv}");
304    Ok(())
305}
306
307async fn unsub_inline(args: &[String], state: &Arc<ReplState>) -> Result<()> {
308    if args.is_empty() {
309        bail!("usage: unsub <SYMBOL> [csv]");
310    }
311    let symbol = args[0].clone();
312    let types_csv = args.get(1).cloned();
313
314    let sec = parse_symbol(&symbol)?;
315    let sub_types: Vec<SubType> = match types_csv {
316        Some(csv) if !csv.is_empty() => {
317            csv.split(',').map(parse_sub_type).collect::<Result<_>>()?
318        }
319        _ => {
320            let guard = state.subs.lock().await;
321            guard
322                .get(&symbol)
323                .map(|s| s.iter().copied().collect())
324                .unwrap_or_default()
325        }
326    };
327    if sub_types.is_empty() {
328        bail!("no active subscription for {symbol}");
329    }
330
331    futu_qot::sub::unsubscribe(&state.client, std::slice::from_ref(&sec), &sub_types).await?;
332    {
333        let mut guard = state.subs.lock().await;
334        if let Some(set) = guard.get_mut(&symbol) {
335            for t in &sub_types {
336                set.remove(t);
337            }
338            if set.is_empty() {
339                guard.remove(&symbol);
340            }
341        }
342    }
343    eprintln!(
344        "✓ unsubscribed {symbol} types=[{}]",
345        sub_types
346            .iter()
347            .map(|t| sub_type_label(*t))
348            .collect::<Vec<_>>()
349            .join(",")
350    );
351    Ok(())
352}
353
354/// 拆 `sub HK.00700 US.AAPL -t basic,rt` → (["HK.00700","US.AAPL"], "basic,rt")
355fn split_sub_args(args: &[String]) -> Result<(Vec<String>, String)> {
356    let mut symbols = Vec::new();
357    let mut types = "basic".to_string();
358    let mut i = 0;
359    while i < args.len() {
360        let a = &args[i];
361        if a == "-t" || a == "--type" {
362            i += 1;
363            types = args
364                .get(i)
365                .ok_or_else(|| anyhow!("-t needs a value"))?
366                .clone();
367        } else if let Some(rest) = a.strip_prefix("--type=") {
368            types = rest.to_string();
369        } else if looks_like_symbol(a) {
370            symbols.push(a.clone());
371        } else {
372            bail!("unexpected token {a:?}");
373        }
374        i += 1;
375    }
376    if symbols.is_empty() {
377        bail!("usage: sub <SYMBOL...> [-t csv]");
378    }
379    Ok((symbols, types))
380}
381
382// ========== push 异步打印 ==========
383
384struct PrintHandler {
385    printer: SharedPrinter,
386}
387
388impl PrintHandler {
389    async fn emit(&self, s: String) {
390        let mut g = self.printer.lock().await;
391        g.print(format!("{s}\n"));
392    }
393}
394
395#[async_trait]
396impl QuoteHandler for PrintHandler {
397    async fn on_basic_qot_update(&self, qot_list: Vec<BasicQot>) {
398        for q in qot_list {
399            let sym = format_symbol(&q.security);
400            let change = q.cur_price - q.last_close_price;
401            let pct = if q.last_close_price != 0.0 {
402                change / q.last_close_price * 100.0
403            } else {
404                0.0
405            };
406            let sign = if change >= 0.0 { "+" } else { "" };
407            self.emit(format!(
408                "[{}] basic {sym:<12} px={:.3} {sign}{change:.3} ({sign}{pct:.2}%) vol={}",
409                q.update_time, q.cur_price, q.volume
410            ))
411            .await;
412        }
413    }
414
415    async fn on_kl_update(&self, security: Security, kl_list: Vec<KLine>) {
416        let sym = format_symbol(&security);
417        for k in kl_list {
418            self.emit(format!(
419                "[{}] kl    {sym:<12} O={:.3} H={:.3} L={:.3} C={:.3} V={}",
420                k.time, k.open_price, k.high_price, k.low_price, k.close_price, k.volume
421            ))
422            .await;
423        }
424    }
425
426    async fn on_order_book_update(&self, data: OrderBookData) {
427        let sym = format_symbol(&data.security);
428        let top_bid = data.bid_list.first();
429        let top_ask = data.ask_list.first();
430        let msg = match (top_bid, top_ask) {
431            (Some(b), Some(a)) => format!(
432                "              ob    {sym:<12} bid={:.3}x{} ask={:.3}x{} spr={:.3}",
433                b.price,
434                b.volume,
435                a.price,
436                a.volume,
437                a.price - b.price
438            ),
439            _ => format!("              ob    {sym:<12} (empty)"),
440        };
441        self.emit(msg).await;
442    }
443
444    async fn on_ticker_update(
445        &self,
446        security: Security,
447        ticker_list: Vec<futu_qot::ticker::Ticker>,
448    ) {
449        let sym = format_symbol(&security);
450        for t in ticker_list {
451            self.emit(format!(
452                "              tick  {sym:<12} px={:.3} vol={} dir={}",
453                t.price, t.volume, t.dir
454            ))
455            .await;
456        }
457    }
458
459    async fn on_rt_update(&self, security: Security, rt_list: Vec<futu_qot::rt::TimeShare>) {
460        let sym = format_symbol(&security);
461        for r in rt_list {
462            self.emit(format!(
463                "[{}] rt    {sym:<12} px={:.3} avg={:.3} vol={}",
464                r.time, r.price, r.avg_price, r.volume
465            ))
466            .await;
467        }
468    }
469}
470
471async fn push_loop(mut rx: mpsc::UnboundedReceiver<PushMessage>, printer: SharedPrinter) {
472    let handler = PrintHandler { printer };
473    while let Some(msg) = rx.recv().await {
474        if let Err(e) = QuotePushDispatcher::dispatch(&handler, msg.proto_id, &msg.body).await {
475            let mut g = handler.printer.lock().await;
476            g.print(format!("push dispatch error: {e}\n"));
477        }
478    }
479}
480
481// ========== util ==========
482
483fn sub_type_label(t: SubType) -> &'static str {
484    match t {
485        SubType::Basic => "basic",
486        SubType::OrderBook => "orderbook",
487        SubType::Ticker => "ticker",
488        SubType::RT => "rt",
489        SubType::KLDay => "kl_day",
490        SubType::KL1Min => "kl_1min",
491        SubType::KL3Min => "kl_3min",
492        SubType::KL5Min => "kl_5min",
493        SubType::KL15Min => "kl_15min",
494        SubType::KL30Min => "kl_30min",
495        SubType::KL60Min => "kl_60min",
496        SubType::KLWeek => "kl_week",
497        SubType::KLMonth => "kl_month",
498        SubType::KLQuarter => "kl_quarter",
499        SubType::KLYear => "kl_year",
500        SubType::Broker => "broker",
501        SubType::OrderDetail => "order_detail",
502        _ => "other",
503    }
504}
505
506fn history_file_path() -> Option<PathBuf> {
507    let base = dirs::cache_dir()?;
508    Some(base.join("futucli").join("history"))
509}