1use std::collections::{BTreeMap, HashSet};
15use std::path::PathBuf;
16use std::sync::Arc;
17
18use anyhow::{Context, Result, anyhow, bail};
19use async_trait::async_trait;
20use clap::Parser;
21use futu_net::client::{FutuClient, PushMessage};
22use futu_qot::push::{QuoteHandler, QuotePushDispatcher};
23use futu_qot::types::{BasicQot, KLine, OrderBookData, Security, SubType};
24use rustyline::error::ReadlineError;
25use rustyline::{DefaultEditor, ExternalPrinter};
26use tokio::sync::{Mutex, mpsc};
27
28use crate::cli::{Cli, Command};
29use crate::common::{connect_gateway, format_symbol, parse_sub_type, parse_symbol};
30use crate::output::OutputFormat;
31
32struct ReplState {
35 gateway: String,
36 output: OutputFormat,
37 client: Arc<FutuClient>,
38 subs: Mutex<BTreeMap<String, HashSet<SubType>>>,
39}
40
41enum PushPrinter {
44 External(Box<dyn ExternalPrinter + Send>),
45 Stderr,
46}
47
48impl PushPrinter {
49 fn print(&mut self, msg: String) {
50 match self {
51 PushPrinter::External(p) => {
52 let _ = p.print(msg);
53 }
54 PushPrinter::Stderr => {
55 eprint!("{msg}");
56 }
57 }
58 }
59}
60
61type SharedPrinter = Arc<Mutex<PushPrinter>>;
63
64pub async fn run(gateway: &str, output: OutputFormat) -> Result<()> {
65 let (client, push_rx) = connect_gateway(gateway, "futucli-repl").await?;
67 eprintln!("✓ connected to {gateway}");
68
69 let state = Arc::new(ReplState {
70 gateway: gateway.to_string(),
71 output,
72 client,
73 subs: Mutex::new(BTreeMap::new()),
74 });
75
76 let history_path = history_file_path();
78 let mut rl = DefaultEditor::new().context("init rustyline editor")?;
79 if let Some(p) = &history_path {
80 let _ = rl.load_history(p); }
82
83 let printer_inner = match rl.create_external_printer() {
86 Ok(p) => PushPrinter::External(Box::new(p)),
87 Err(e) => {
88 eprintln!("note: external printer unavailable ({e}); push output will go to stderr");
89 PushPrinter::Stderr
90 }
91 };
92 let printer: SharedPrinter = Arc::new(Mutex::new(printer_inner));
93
94 tokio::spawn(push_loop(push_rx, printer.clone()));
96
97 print_banner(gateway);
99 loop {
100 let prompt = format!("futu ({}) > ", state.gateway);
102 let read_result = tokio::task::spawn_blocking(move || {
103 let res = rl.readline(&prompt);
104 (rl, res)
105 })
106 .await
107 .context("readline task panicked")?;
108 rl = read_result.0;
109
110 let line = match read_result.1 {
111 Ok(l) => l,
112 Err(ReadlineError::Interrupted) => {
113 eprintln!("(Ctrl-C) — type 'exit' or Ctrl-D to quit");
114 continue;
115 }
116 Err(ReadlineError::Eof) => {
117 eprintln!("bye");
118 break;
119 }
120 Err(e) => {
121 eprintln!("readline error: {e}");
122 break;
123 }
124 };
125 let trimmed = line.trim();
126 if trimmed.is_empty() {
127 continue;
128 }
129 let _ = rl.add_history_entry(trimmed);
130
131 match handle_line(trimmed, &state, &printer).await {
133 Ok(ShouldContinue::Continue) => {}
134 Ok(ShouldContinue::Exit) => break,
135 Err(e) => eprintln!("error: {e:#}"),
136 }
137 }
138
139 if let Some(p) = &history_path {
141 let _ = std::fs::create_dir_all(p.parent().unwrap_or_else(|| std::path::Path::new(".")));
142 let _ = rl.save_history(p);
143 }
144 Ok(())
145}
146
147enum ShouldContinue {
148 Continue,
149 Exit,
150}
151
152async fn handle_line(
154 line: &str,
155 state: &Arc<ReplState>,
156 printer: &SharedPrinter,
157) -> Result<ShouldContinue> {
158 let tokens = shlex::split(line).ok_or_else(|| anyhow!("failed to tokenize input"))?;
159 if tokens.is_empty() {
160 return Ok(ShouldContinue::Continue);
161 }
162
163 match tokens[0].as_str() {
164 "exit" | "quit" | ":q" => return Ok(ShouldContinue::Exit),
165 "help" | "?" => {
166 print_help();
167 return Ok(ShouldContinue::Continue);
168 }
169 "reconnect" => {
170 reconnect(state, printer).await?;
171 return Ok(ShouldContinue::Continue);
172 }
173 "subs" => {
174 list_subs(state).await;
175 return Ok(ShouldContinue::Continue);
176 }
177 "sub" if tokens.len() >= 2 && looks_like_symbol(&tokens[1]) => {
178 return sub_inline(&tokens[1..], state, printer)
180 .await
181 .map(|_| ShouldContinue::Continue);
182 }
183 "unsub" => {
184 return unsub_inline(&tokens[1..], state)
185 .await
186 .map(|_| ShouldContinue::Continue);
187 }
188 _ => {}
189 }
190
191 let argv = std::iter::once("futucli".to_string())
193 .chain(tokens)
194 .collect::<Vec<_>>();
195 let parsed = match Cli::try_parse_from(&argv) {
196 Ok(p) => p,
197 Err(e) => {
198 eprint!("{e}");
200 return Ok(ShouldContinue::Continue);
201 }
202 };
203
204 if matches!(parsed.command, Command::Repl) {
206 bail!("already in REPL");
207 }
208
209 crate::cli::dispatch(&state.gateway, state.output, parsed.command).await?;
211 Ok(ShouldContinue::Continue)
212}
213
214fn print_banner(gateway: &str) {
217 eprintln!("futucli REPL — gateway {gateway}");
218 eprintln!("type 'help' for commands, 'exit' or Ctrl-D to quit");
219}
220
221fn print_help() {
222 let lines = [
223 "REPL 内置命令:",
224 " help | ? 显示本帮助",
225 " exit | quit | :q 退出",
226 " reconnect 断开并重新连接网关",
227 " subs 列出当前活跃订阅",
228 " sub <SYMBOL> [-t csv] 后台订阅(推送在 prompt 上方实时显示)",
229 " unsub <SYMBOL> [csv] 取消订阅(csv 省略则全部类型)",
230 "",
231 "子命令(和外层 futucli 一致):",
232 " ping / quote / snapshot / kline / orderbook / ticker / rt / static /",
233 " broker / plate-list / plate-stocks / account / funds / position /",
234 " order / deal / unlock-trade",
235 "",
236 "示例:",
237 " quote HK.00700 US.AAPL",
238 " kline HK.00700 -t day -n 5",
239 " sub HK.00700 -t basic,orderbook",
240 ];
241 for l in lines {
242 eprintln!("{l}");
243 }
244}
245
246async fn reconnect(state: &Arc<ReplState>, printer: &SharedPrinter) -> Result<()> {
247 let (new_client, new_rx) = connect_gateway(&state.gateway, "futucli-repl").await?;
248
249 let _ = (new_client, new_rx); let _ = printer;
254 eprintln!("reconnect not fully wired yet — please exit and re-enter REPL for a clean state");
255 Ok(())
256}
257
258async fn list_subs(state: &Arc<ReplState>) {
259 let guard = state.subs.lock().await;
260 if guard.is_empty() {
261 println!("(no active subscriptions)");
262 return;
263 }
264 println!("Active subscriptions:");
265 for (sym, types) in guard.iter() {
266 let mut names: Vec<&'static str> = types.iter().copied().map(sub_type_label).collect();
267 names.sort_unstable();
268 println!(" {sym:<14} {}", names.join(","));
269 }
270}
271
272fn looks_like_symbol(tok: &str) -> bool {
273 tok.contains('.') && !tok.starts_with('-')
274}
275
276async fn sub_inline(
277 args: &[String],
278 state: &Arc<ReplState>,
279 printer: &SharedPrinter,
280) -> Result<()> {
281 let (symbols, types_csv) = split_sub_args(args)?;
283 let secs: Vec<Security> = symbols
284 .iter()
285 .map(|s| parse_symbol(s))
286 .collect::<Result<_>>()?;
287 let sub_types: Vec<SubType> = types_csv
288 .split(',')
289 .map(parse_sub_type)
290 .collect::<Result<_>>()?;
291
292 futu_qot::sub::subscribe(&state.client, &secs, &sub_types, true, true).await?;
293 {
294 let mut guard = state.subs.lock().await;
295 for s in &symbols {
296 let entry = guard.entry(s.clone()).or_default();
297 for t in &sub_types {
298 entry.insert(*t);
299 }
300 }
301 }
302 let _ = printer; eprintln!("✓ subscribed symbols={symbols:?} types={types_csv}");
304 Ok(())
305}
306
307async fn unsub_inline(args: &[String], state: &Arc<ReplState>) -> Result<()> {
308 if args.is_empty() {
309 bail!("usage: unsub <SYMBOL> [csv]");
310 }
311 let symbol = args[0].clone();
312 let types_csv = args.get(1).cloned();
313
314 let sec = parse_symbol(&symbol)?;
315 let sub_types: Vec<SubType> = match types_csv {
316 Some(csv) if !csv.is_empty() => {
317 csv.split(',').map(parse_sub_type).collect::<Result<_>>()?
318 }
319 _ => {
320 let guard = state.subs.lock().await;
321 guard
322 .get(&symbol)
323 .map(|s| s.iter().copied().collect())
324 .unwrap_or_default()
325 }
326 };
327 if sub_types.is_empty() {
328 bail!("no active subscription for {symbol}");
329 }
330
331 futu_qot::sub::unsubscribe(&state.client, std::slice::from_ref(&sec), &sub_types).await?;
332 {
333 let mut guard = state.subs.lock().await;
334 if let Some(set) = guard.get_mut(&symbol) {
335 for t in &sub_types {
336 set.remove(t);
337 }
338 if set.is_empty() {
339 guard.remove(&symbol);
340 }
341 }
342 }
343 eprintln!(
344 "✓ unsubscribed {symbol} types=[{}]",
345 sub_types
346 .iter()
347 .map(|t| sub_type_label(*t))
348 .collect::<Vec<_>>()
349 .join(",")
350 );
351 Ok(())
352}
353
354fn split_sub_args(args: &[String]) -> Result<(Vec<String>, String)> {
356 let mut symbols = Vec::new();
357 let mut types = "basic".to_string();
358 let mut i = 0;
359 while i < args.len() {
360 let a = &args[i];
361 if a == "-t" || a == "--type" {
362 i += 1;
363 types = args
364 .get(i)
365 .ok_or_else(|| anyhow!("-t needs a value"))?
366 .clone();
367 } else if let Some(rest) = a.strip_prefix("--type=") {
368 types = rest.to_string();
369 } else if looks_like_symbol(a) {
370 symbols.push(a.clone());
371 } else {
372 bail!("unexpected token {a:?}");
373 }
374 i += 1;
375 }
376 if symbols.is_empty() {
377 bail!("usage: sub <SYMBOL...> [-t csv]");
378 }
379 Ok((symbols, types))
380}
381
382struct PrintHandler {
385 printer: SharedPrinter,
386}
387
388impl PrintHandler {
389 async fn emit(&self, s: String) {
390 let mut g = self.printer.lock().await;
391 g.print(format!("{s}\n"));
392 }
393}
394
395#[async_trait]
396impl QuoteHandler for PrintHandler {
397 async fn on_basic_qot_update(&self, qot_list: Vec<BasicQot>) {
398 for q in qot_list {
399 let sym = format_symbol(&q.security);
400 let change = q.cur_price - q.last_close_price;
401 let pct = if q.last_close_price != 0.0 {
402 change / q.last_close_price * 100.0
403 } else {
404 0.0
405 };
406 let sign = if change >= 0.0 { "+" } else { "" };
407 self.emit(format!(
408 "[{}] basic {sym:<12} px={:.3} {sign}{change:.3} ({sign}{pct:.2}%) vol={}",
409 q.update_time, q.cur_price, q.volume
410 ))
411 .await;
412 }
413 }
414
415 async fn on_kl_update(&self, security: Security, kl_list: Vec<KLine>) {
416 let sym = format_symbol(&security);
417 for k in kl_list {
418 self.emit(format!(
419 "[{}] kl {sym:<12} O={:.3} H={:.3} L={:.3} C={:.3} V={}",
420 k.time, k.open_price, k.high_price, k.low_price, k.close_price, k.volume
421 ))
422 .await;
423 }
424 }
425
426 async fn on_order_book_update(&self, data: OrderBookData) {
427 let sym = format_symbol(&data.security);
428 let top_bid = data.bid_list.first();
429 let top_ask = data.ask_list.first();
430 let msg = match (top_bid, top_ask) {
431 (Some(b), Some(a)) => format!(
432 " ob {sym:<12} bid={:.3}x{} ask={:.3}x{} spr={:.3}",
433 b.price,
434 b.volume,
435 a.price,
436 a.volume,
437 a.price - b.price
438 ),
439 _ => format!(" ob {sym:<12} (empty)"),
440 };
441 self.emit(msg).await;
442 }
443
444 async fn on_ticker_update(
445 &self,
446 security: Security,
447 ticker_list: Vec<futu_qot::ticker::Ticker>,
448 ) {
449 let sym = format_symbol(&security);
450 for t in ticker_list {
451 self.emit(format!(
452 " tick {sym:<12} px={:.3} vol={} dir={}",
453 t.price, t.volume, t.dir
454 ))
455 .await;
456 }
457 }
458
459 async fn on_rt_update(&self, security: Security, rt_list: Vec<futu_qot::rt::TimeShare>) {
460 let sym = format_symbol(&security);
461 for r in rt_list {
462 self.emit(format!(
463 "[{}] rt {sym:<12} px={:.3} avg={:.3} vol={}",
464 r.time, r.price, r.avg_price, r.volume
465 ))
466 .await;
467 }
468 }
469}
470
471async fn push_loop(mut rx: mpsc::UnboundedReceiver<PushMessage>, printer: SharedPrinter) {
472 let handler = PrintHandler { printer };
473 while let Some(msg) = rx.recv().await {
474 if let Err(e) = QuotePushDispatcher::dispatch(&handler, msg.proto_id, &msg.body).await {
475 let mut g = handler.printer.lock().await;
476 g.print(format!("push dispatch error: {e}\n"));
477 }
478 }
479}
480
481fn sub_type_label(t: SubType) -> &'static str {
484 match t {
485 SubType::Basic => "basic",
486 SubType::OrderBook => "orderbook",
487 SubType::Ticker => "ticker",
488 SubType::RT => "rt",
489 SubType::KLDay => "kl_day",
490 SubType::KL1Min => "kl_1min",
491 SubType::KL3Min => "kl_3min",
492 SubType::KL5Min => "kl_5min",
493 SubType::KL15Min => "kl_15min",
494 SubType::KL30Min => "kl_30min",
495 SubType::KL60Min => "kl_60min",
496 SubType::KLWeek => "kl_week",
497 SubType::KLMonth => "kl_month",
498 SubType::KLQuarter => "kl_quarter",
499 SubType::KLYear => "kl_year",
500 SubType::Broker => "broker",
501 SubType::OrderDetail => "order_detail",
502 _ => "other",
503 }
504}
505
506fn history_file_path() -> Option<PathBuf> {
507 let base = dirs::cache_dir()?;
508 Some(base.join("futucli").join("history"))
509}