Skip to main content

futu_backend/
suspend_data.rs

1//! 停牌数据 HTTP 下载 + 解析
2//!
3//! C++ 对应: NNBiz_Qot_Suspend / NNData_Qot_Suspend
4//!
5//! 从腾讯云 CDN 下载 zip 文件,解压后得到 .dat 二进制文件,格式:
6//! - 2 字节 version (big-endian u16)
7//! - 4 字节 group_count (big-endian u32)
8//! - group_count 个组,每个:
9//!   - 4 字节 group_len (big-endian u32)
10//!   - group_len 字节 protobuf (StockSuspendRecordGroup)
11//!
12//! 解析后缓存在 DashMap<stock_id, Vec<timestamp>> 中,供 GetSuspend handler 查询。
13
14use std::collections::HashMap;
15use std::sync::Arc;
16
/// Suspend-data cache: maps a `stock_id` to that stock's list of suspension timestamps.
///
/// Lists produced by `parse_suspend_dat` are sorted ascending and de-duplicated.
/// The map is wrapped in an `Arc` so a handle to it can be cloned and shared.
pub type SuspendCache = Arc<dashmap::DashMap<u64, Vec<u64>>>;
19
/// Download sources, one entry per market:
/// `(CDN URL of the zip archive, name of the entry to read inside that archive)`.
///
/// The second element is also used as the `market` label in log output.
///
/// NOTE(review): these URLs use plain `http://`, so the payload is fetched
/// without transport integrity — confirm whether the CDN can serve HTTPS.
const SUSPEND_SOURCES: [(&str, &str); 3] = [
    (
        "http://openquote-1251001049.cosgz.myqcloud.com/hk_stock_suspend_record.zip",
        "hk_stock_suspend_record",
    ),
    (
        "http://openquote-1251001049.cosgz.myqcloud.com/us_stock_suspend_record.zip",
        "us_stock_suspend_record",
    ),
    (
        "http://openquote-1251001049.cosgz.myqcloud.com/cn_stock_suspend_record.zip",
        "cn_stock_suspend_record",
    ),
];
35
36/// 下载并解析所有市场的停牌数据
37pub async fn load_suspend_data() -> SuspendCache {
38    let cache: SuspendCache = Arc::new(dashmap::DashMap::new());
39
40    for (url, name) in &SUSPEND_SOURCES {
41        match download_and_parse(url, name).await {
42            Ok(data) => {
43                let count = data.len();
44                for (stock_id, timestamps) in data {
45                    cache.insert(stock_id, timestamps);
46                }
47                tracing::info!(market = *name, stocks = count, "loaded suspend data");
48            }
49            Err(e) => {
50                // v1.4.27:从 WARN 降 DEBUG。CDN 偶尔返 404 / HTML 错误页对终端
51                // 用户无 actionable 价值,也不影响交易 / 行情主功能;有兴趣的
52                // 运维可以用 `--log-level debug` 看到完整错误。
53                tracing::debug!(market = *name, error = %e, "failed to load suspend data");
54            }
55        }
56    }
57
58    cache
59}
60
61/// 下载单个市场的 zip 文件并解析
62///
63/// v1.4.27 修(BUG-3,加拿大同事 v1.4.26 回归测试发现):
64/// - HTTP 非 200 → 直接降级,不尝试解 ZIP
65/// - 内容不是 ZIP(magic number 不是 `PK\x03\x04`)→ 降级 + 用 DEBUG 打
66///   出前 N 字节 + Content-Type(帮助判断是否拿到了 HTML 错误页)
67/// - 只有真正是 ZIP 才进 `ZipArchive` 解析,避免 "Could not find EOCD" 这种
68///   对终端用户无 actionable 价值的 WARN
69async fn download_and_parse(
70    url: &str,
71    name: &str,
72) -> Result<HashMap<u64, Vec<u64>>, Box<dyn std::error::Error + Send + Sync>> {
73    tracing::debug!(url, "downloading suspend data");
74    let resp = reqwest::get(url).await?;
75    let status = resp.status();
76    let content_type = resp
77        .headers()
78        .get(reqwest::header::CONTENT_TYPE)
79        .and_then(|v| v.to_str().ok())
80        .unwrap_or("<none>")
81        .to_string();
82
83    if !status.is_success() {
84        return Err(format!("HTTP {status} from {url} (content-type={content_type})").into());
85    }
86
87    let zip_bytes = resp.bytes().await?;
88    tracing::debug!(url, bytes = zip_bytes.len(), "downloaded suspend response");
89
90    // ZIP magic number: `PK\x03\x04` (or `PK\x05\x06` for empty archive, `PK\x07\x08` for spanned)
91    let is_zip = zip_bytes.len() >= 4
92        && zip_bytes[0] == b'P'
93        && zip_bytes[1] == b'K'
94        && (zip_bytes[2] == 0x03 || zip_bytes[2] == 0x05 || zip_bytes[2] == 0x07);
95
96    if !is_zip {
97        let preview: String = zip_bytes
98            .iter()
99            .take(120)
100            .map(|&b| {
101                if b.is_ascii_graphic() || b == b' ' {
102                    b as char
103                } else {
104                    '·'
105                }
106            })
107            .collect();
108        tracing::debug!(
109            url,
110            content_type,
111            bytes = zip_bytes.len(),
112            preview,
113            "suspend data response is not a ZIP (magic number mismatch); CDN may have returned HTML error page"
114        );
115        return Err(
116            format!("not a ZIP archive (magic mismatch, content-type={content_type})").into(),
117        );
118    }
119
120    // 真正是 ZIP 才进解压(至此若报 "Could not find EOCD" 就是 ZIP 实际损坏,
121    // 值得 WARN;否则 magic-check 已经把 HTML 错误页 / 空响应挡在外面)
122    let cursor = std::io::Cursor::new(zip_bytes);
123    let mut archive = zip::ZipArchive::new(cursor)?;
124    let mut file = archive.by_name(name)?;
125    let mut dat_bytes = Vec::new();
126    std::io::Read::read_to_end(&mut file, &mut dat_bytes)?;
127
128    parse_suspend_dat(&dat_bytes)
129}
130
131/// 解析 .dat 二进制格式
132///
133/// C++ 对应: NNBiz_Qot_Suspend::LoadFile
134fn parse_suspend_dat(
135    data: &[u8],
136) -> Result<HashMap<u64, Vec<u64>>, Box<dyn std::error::Error + Send + Sync>> {
137    use prost::Message;
138
139    if data.len() < 6 {
140        return Err("suspend dat too short".into());
141    }
142
143    let version = u16::from_be_bytes([data[0], data[1]]);
144    let group_count = u32::from_be_bytes([data[2], data[3], data[4], data[5]]);
145
146    // C++ 校验: nVersion <= 0 || nGroupCnt <= 0
147    if version == 0 || group_count == 0 {
148        return Err(
149            format!("invalid suspend dat: version={version}, group_count={group_count}").into(),
150        );
151    }
152
153    let mut result = HashMap::new();
154    let mut offset = 6_usize;
155    let mut read_groups = 0_u32;
156
157    while read_groups < group_count {
158        if offset + 4 > data.len() {
159            tracing::warn!(
160                offset,
161                remaining = data.len() - offset,
162                "suspend dat truncated at group_len"
163            );
164            break;
165        }
166
167        let group_len = u32::from_be_bytes([
168            data[offset],
169            data[offset + 1],
170            data[offset + 2],
171            data[offset + 3],
172        ]) as usize;
173        offset += 4;
174
175        // C++ 校验: nGroupLen <= 0
176        if group_len == 0 {
177            tracing::warn!(offset, "suspend dat: zero group_len");
178            break;
179        }
180
181        if offset + group_len > data.len() {
182            tracing::warn!(
183                offset,
184                group_len,
185                data_len = data.len(),
186                "suspend dat truncated at group data"
187            );
188            break;
189        }
190
191        let group: super::proto_internal::stock_suspend::StockSuspendRecordGroup =
192            match Message::decode(&data[offset..offset + group_len]) {
193                Ok(g) => g,
194                Err(e) => {
195                    tracing::warn!(error = %e, "suspend dat: protobuf decode failed");
196                    break;
197                }
198            };
199
200        for record in &group.stock_suspend_record_list {
201            if let Some(stock_id) = record.stock_id {
202                let mut timestamps: Vec<u64> = record
203                    .stock_sus_seq_list
204                    .iter()
205                    .filter_map(|seq| seq.time)
206                    .collect();
207                if !timestamps.is_empty() {
208                    // C++ 中 SetSuspendData 会 sort,GetSuspendData 用 lower_bound/upper_bound
209                    timestamps.sort_unstable();
210                    timestamps.dedup();
211                    result.insert(stock_id, timestamps);
212                }
213            }
214        }
215
216        offset += group_len;
217        read_groups += 1;
218    }
219
220    Ok(result)
221}
222
// Unit tests live in an out-of-line `tests` submodule that is compiled only
// for test builds.
#[cfg(test)]
mod tests;