futu_backend/
suspend_data.rs1use std::collections::HashMap;
15use std::sync::Arc;
16
/// Shared, concurrently accessible suspend-data cache.
///
/// Maps a stock id to that stock's suspend timestamps. As filled by the loader in
/// this file, every stored list is non-empty, sorted ascending and de-duplicated.
/// The unit of the timestamps is not visible in this file.
pub type SuspendCache = Arc<dashmap::DashMap<u64, Vec<u64>>>;
19
/// Download locations of the suspend-record archives, one per market.
///
/// Each element is `(archive URL, name of the entry to extract from that archive)`.
/// The second component is also used as the `market` field in log output.
const SUSPEND_SOURCES: [(&str, &str); 3] = {
    const HK: (&str, &str) = (
        "http://openquote-1251001049.cosgz.myqcloud.com/hk_stock_suspend_record.zip",
        "hk_stock_suspend_record",
    );
    const US: (&str, &str) = (
        "http://openquote-1251001049.cosgz.myqcloud.com/us_stock_suspend_record.zip",
        "us_stock_suspend_record",
    );
    const CN: (&str, &str) = (
        "http://openquote-1251001049.cosgz.myqcloud.com/cn_stock_suspend_record.zip",
        "cn_stock_suspend_record",
    );

    // Order matters to callers that iterate: HK first, then US, then CN.
    [HK, US, CN]
};
35
36pub async fn load_suspend_data() -> SuspendCache {
38 let cache: SuspendCache = Arc::new(dashmap::DashMap::new());
39
40 for (url, name) in &SUSPEND_SOURCES {
41 match download_and_parse(url, name).await {
42 Ok(data) => {
43 let count = data.len();
44 for (stock_id, timestamps) in data {
45 cache.insert(stock_id, timestamps);
46 }
47 tracing::info!(market = *name, stocks = count, "loaded suspend data");
48 }
49 Err(e) => {
50 tracing::debug!(market = *name, error = %e, "failed to load suspend data");
54 }
55 }
56 }
57
58 cache
59}
60
61async fn download_and_parse(
70 url: &str,
71 name: &str,
72) -> Result<HashMap<u64, Vec<u64>>, Box<dyn std::error::Error + Send + Sync>> {
73 tracing::debug!(url, "downloading suspend data");
74 let resp = reqwest::get(url).await?;
75 let status = resp.status();
76 let content_type = resp
77 .headers()
78 .get(reqwest::header::CONTENT_TYPE)
79 .and_then(|v| v.to_str().ok())
80 .unwrap_or("<none>")
81 .to_string();
82
83 if !status.is_success() {
84 return Err(format!("HTTP {status} from {url} (content-type={content_type})").into());
85 }
86
87 let zip_bytes = resp.bytes().await?;
88 tracing::debug!(url, bytes = zip_bytes.len(), "downloaded suspend response");
89
90 let is_zip = zip_bytes.len() >= 4
92 && zip_bytes[0] == b'P'
93 && zip_bytes[1] == b'K'
94 && (zip_bytes[2] == 0x03 || zip_bytes[2] == 0x05 || zip_bytes[2] == 0x07);
95
96 if !is_zip {
97 let preview: String = zip_bytes
98 .iter()
99 .take(120)
100 .map(|&b| {
101 if b.is_ascii_graphic() || b == b' ' {
102 b as char
103 } else {
104 '·'
105 }
106 })
107 .collect();
108 tracing::debug!(
109 url,
110 content_type,
111 bytes = zip_bytes.len(),
112 preview,
113 "suspend data response is not a ZIP (magic number mismatch); CDN may have returned HTML error page"
114 );
115 return Err(
116 format!("not a ZIP archive (magic mismatch, content-type={content_type})").into(),
117 );
118 }
119
120 let cursor = std::io::Cursor::new(zip_bytes);
123 let mut archive = zip::ZipArchive::new(cursor)?;
124 let mut file = archive.by_name(name)?;
125 let mut dat_bytes = Vec::new();
126 std::io::Read::read_to_end(&mut file, &mut dat_bytes)?;
127
128 parse_suspend_dat(&dat_bytes)
129}
130
131fn parse_suspend_dat(
135 data: &[u8],
136) -> Result<HashMap<u64, Vec<u64>>, Box<dyn std::error::Error + Send + Sync>> {
137 use prost::Message;
138
139 if data.len() < 6 {
140 return Err("suspend dat too short".into());
141 }
142
143 let version = u16::from_be_bytes([data[0], data[1]]);
144 let group_count = u32::from_be_bytes([data[2], data[3], data[4], data[5]]);
145
146 if version == 0 || group_count == 0 {
148 return Err(
149 format!("invalid suspend dat: version={version}, group_count={group_count}").into(),
150 );
151 }
152
153 let mut result = HashMap::new();
154 let mut offset = 6_usize;
155 let mut read_groups = 0_u32;
156
157 while read_groups < group_count {
158 if offset + 4 > data.len() {
159 tracing::warn!(
160 offset,
161 remaining = data.len() - offset,
162 "suspend dat truncated at group_len"
163 );
164 break;
165 }
166
167 let group_len = u32::from_be_bytes([
168 data[offset],
169 data[offset + 1],
170 data[offset + 2],
171 data[offset + 3],
172 ]) as usize;
173 offset += 4;
174
175 if group_len == 0 {
177 tracing::warn!(offset, "suspend dat: zero group_len");
178 break;
179 }
180
181 if offset + group_len > data.len() {
182 tracing::warn!(
183 offset,
184 group_len,
185 data_len = data.len(),
186 "suspend dat truncated at group data"
187 );
188 break;
189 }
190
191 let group: super::proto_internal::stock_suspend::StockSuspendRecordGroup =
192 match Message::decode(&data[offset..offset + group_len]) {
193 Ok(g) => g,
194 Err(e) => {
195 tracing::warn!(error = %e, "suspend dat: protobuf decode failed");
196 break;
197 }
198 };
199
200 for record in &group.stock_suspend_record_list {
201 if let Some(stock_id) = record.stock_id {
202 let mut timestamps: Vec<u64> = record
203 .stock_sus_seq_list
204 .iter()
205 .filter_map(|seq| seq.time)
206 .collect();
207 if !timestamps.is_empty() {
208 timestamps.sort_unstable();
210 timestamps.dedup();
211 result.insert(stock_id, timestamps);
212 }
213 }
214 }
215
216 offset += group_len;
217 read_groups += 1;
218 }
219
220 Ok(result)
221}
222
223#[cfg(test)]
224mod tests;