1use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, Result, anyhow, bail};
7use futu_net::client::{ClientConfig, FutuClient, PushMessage, ReconnectingClient};
8use futu_net::reconnect::ReconnectPolicy;
9use futu_qot::types::{QotMarket, Security, SubType};
10use tokio::sync::mpsc;
11
12const CLI_CONNECT_TOTAL_TIMEOUT: Duration = Duration::from_secs(3);
17const CLI_CONNECT_RETRY_DELAY: Duration = Duration::from_millis(200);
18
19pub fn parse_symbol(s: &str) -> Result<Security> {
29 let (market_str, code) = s.split_once('.').ok_or_else(|| {
30 anyhow!("invalid symbol {s:?}: expected MARKET.CODE (e.g. HK.00700, US.AAPL, SH.600519)")
31 })?;
32 if code.is_empty() {
33 bail!("invalid symbol {s:?}: code part is empty");
34 }
35 let market = match market_str.to_ascii_uppercase().as_str() {
36 "HK" => QotMarket::HkSecurity,
37 "HK_FUTURE" => QotMarket::HkFuture,
38 "US" => QotMarket::UsSecurity,
39 "SH" => QotMarket::CnshSecurity,
40 "SZ" => QotMarket::CnszSecurity,
41 "SG" => QotMarket::SgSecurity,
42 "JP" => QotMarket::JpSecurity,
43 "AU" => QotMarket::AuSecurity,
44 "MY" => QotMarket::MySecurity,
45 "CA" => QotMarket::CaSecurity,
46 "FX" => QotMarket::FxSecurity,
47 "CRYPTO" | "CC" => QotMarket::Crypto,
48 other => bail!(
49 "invalid symbol {s:?}: unknown market {other:?} (supported: HK, HK_FUTURE, US, SH, SZ, SG, JP, AU, MY, CA, FX, CRYPTO, CC)"
50 ),
51 };
52 Ok(Security::new(market, code))
53}
54
55pub fn format_symbol(sec: &Security) -> String {
57 let m = match sec.market {
58 QotMarket::HkSecurity => "HK",
59 QotMarket::HkFuture => "HK_FUTURE",
60 QotMarket::UsSecurity => "US",
61 QotMarket::CnshSecurity => "SH",
62 QotMarket::CnszSecurity => "SZ",
63 QotMarket::SgSecurity => "SG",
64 QotMarket::JpSecurity => "JP",
65 QotMarket::AuSecurity => "AU",
66 QotMarket::MySecurity => "MY",
67 QotMarket::CaSecurity => "CA",
68 QotMarket::FxSecurity => "FX",
69 QotMarket::Crypto => "CRYPTO",
70 QotMarket::Unknown => "UNKNOWN",
71 _ => "UNKNOWN",
72 };
73 format!("{m}.{}", sec.code)
74}
75
76pub fn parse_sub_type(s: &str) -> Result<SubType> {
78 let t = match s.trim().to_ascii_lowercase().as_str() {
79 "basic" => SubType::Basic,
80 "orderbook" | "order_book" => SubType::OrderBook,
81 "ticker" => SubType::Ticker,
82 "rt" => SubType::RT,
83 "kl_day" | "day" => SubType::KLDay,
84 "kl_1min" | "1min" => SubType::KL1Min,
85 "kl_3min" | "3min" => SubType::KL3Min,
86 "kl_5min" | "5min" => SubType::KL5Min,
87 "kl_15min" | "15min" => SubType::KL15Min,
88 "kl_30min" | "30min" => SubType::KL30Min,
89 "kl_60min" | "60min" => SubType::KL60Min,
90 "kl_week" | "week" => SubType::KLWeek,
91 "kl_month" | "month" => SubType::KLMonth,
92 "kl_quarter" | "quarter" => SubType::KLQuarter,
93 "kl_year" | "year" => SubType::KLYear,
94 "broker" => SubType::Broker,
95 "order_detail" | "orderdetail" => SubType::OrderDetail,
96 other => bail!(
97 "unknown sub type {other:?} (supported: basic, orderbook, ticker, rt, kl_day, kl_1min, ...)"
98 ),
99 };
100 Ok(t)
101}
102
103pub fn parse_sub_types(csv: &str) -> Result<Vec<SubType>> {
105 csv.split(',')
106 .map(parse_sub_type)
107 .collect::<Result<Vec<_>>>()
108}
109
110pub fn parse_symbol_csv(s: &str) -> Result<Vec<String>> {
123 let trimmed = s.trim();
124 if trimmed.is_empty() {
125 bail!("CSV symbol 列表为空: 必须传入非空 \"MARKET.CODE\" 列表");
126 }
127 let mut out: Vec<String> = Vec::new();
128 for (i, token) in trimmed.split(',').enumerate() {
129 let t = token.trim();
130 if t.is_empty() {
131 bail!("CSV symbol[{i}] 为空 token (输入 \"{s}\"): 整体 reject, 不 silent skip 空项");
132 }
133 out.push(t.to_string());
134 }
135 Ok(out)
136}
137
138pub async fn connect_gateway(
142 addr: &str,
143 client_id: &str,
144) -> Result<(Arc<FutuClient>, mpsc::UnboundedReceiver<PushMessage>)> {
145 let config = ClientConfig {
146 addr: addr.to_string(),
147 client_ver: env!("CARGO_PKG_VERSION").to_string(),
148 client_id: client_id.to_string(),
149 recv_notify: false,
150 rsa_key: None,
151 };
152 let policy = ReconnectPolicy::new(CLI_CONNECT_RETRY_DELAY, CLI_CONNECT_RETRY_DELAY, Some(1));
153 let mut reconnector = ReconnectingClient::new(config).with_policy(policy);
154 let connect_result =
155 tokio::time::timeout(CLI_CONNECT_TOTAL_TIMEOUT, reconnector.connect()).await;
156 let (client, push_rx, _info) = match connect_result {
157 Ok(result) => result.with_context(|| format!("connect to futu gateway at {addr}"))?,
158 Err(_) => bail!(
159 "connect to futu gateway at {addr} timed out after {}s",
160 CLI_CONNECT_TOTAL_TIMEOUT.as_secs()
161 ),
162 };
163 Ok((Arc::new(client), push_rx))
164}
165
166#[cfg(test)]
167mod tests;