Skip to main content

futu_backend/
nn_codec.rs

1// NNProtoCenter 内部协议帧编解码
2//
3// 对齐 C++ f3clogin `ProtocolHeader`(`protocol_header.h` + `protocol_header.cpp`):
4// 32 字节 struct,多字节字段 **`hton_uint16/uint32` 大端网络字节序**
5//
6// offset 0:  szMagicCode[2]       "FT"
7// offset 2:  nProtoVer            u8   (当前 39)
8// offset 3:  nClientType          u8
9// offset 4:  nClientVer           u16 BE
10// offset 6:  nLanguage            u8
11// offset 7:  nUserID              u32 BE
12// offset 11: flags                u8   (bit0=push, bit1=compress)
13// offset 12: nSerialNo            u32 BE
14// offset 16: nCmdID               u16 BE
15// offset 18: nExHeadBodyLen       u32 BE  (ex_head_len + body_len,**含扩展头**)
16// offset 22: szReserved[8]
17// offset 30: nExHeadLen           u16 BE  (紧接 reserved 后,**不是** reserved 的一部分)
18// Total: 32 bytes
19//
20// 收到的数据布局:
21//   [32 字节 header] [ex_head_len 字节扩展头 FtnnExtendHead] [真实 body body_len 字节]
22// decoder 会剥掉扩展头,只把真实 body 暴露给上层。
23
24use crate::ftlogin_wire;
25use bytes::{Buf, BufMut, BytesMut};
26use tokio_util::codec::{Decoder, Encoder};
27
28/// 内部帧头大小
29pub const NN_HEADER_SIZE: usize = ftlogin_wire::HEADER_LEN;
30
31/// 协议版本 (C++ 实际使用 39,非注释中的 30)
32pub const NN_PROTO_VER: u8 = ftlogin_wire::PROTO_VERSION;
33
34/// Magic bytes
35pub const MAGIC: [u8; 2] = ftlogin_wire::MAGIC;
36
37/// 内部协议帧头
38#[derive(Debug, Clone)]
39pub struct NNHeader {
40    pub proto_ver: u8,
41    pub client_type: u8,
42    pub client_ver: u16,
43    pub lang_id: u8,
44    pub user_id: u32,
45    /// C++ `ProtocolHeader::flags_.push_`.
46    pub is_push: bool,
47    /// C++ `ProtocolHeader::flags_.compress_`.
48    pub is_compressed: bool,
49    pub serial_no: u32,
50    pub cmd_id: u16,
51    /// **真实 body 长度**(不含 ex_head)—— 对齐 C++ `GetBodyLen()` 语义
52    pub body_len: u32,
53    /// reserved 8 字节,行情命令用 [0]=market_type [1]=ex_type
54    /// 注意:C++ reserved 只有 8 字节,后 2 字节是 ex_head_len
55    pub reserved: [u8; 8],
56    /// 扩展头长度(FtnnExtendHead protobuf 序列化后字节数)
57    pub ex_head_len: u16,
58}
59
60impl NNHeader {
61    pub fn new(cmd_id: u16, serial_no: u32) -> Self {
62        Self {
63            proto_ver: NN_PROTO_VER,
64            client_type: 0,
65            client_ver: 0,
66            lang_id: 0,
67            user_id: 0,
68            is_push: false,
69            is_compressed: false,
70            serial_no,
71            cmd_id,
72            body_len: 0,
73            reserved: [0u8; 8],
74            ex_head_len: 0,
75        }
76    }
77
78    /// 编码到字节流。`body_len` 字段按**真实 body 长度**传入(不含 ex_head),
79    /// 实际上线时我们 **不发送 ex_head**(ex_head_len=0),所以 wire 上的
80    /// `ex_head_body_len = body_len`。
81    pub fn encode(&self, dst: &mut BytesMut) {
82        let wire = ftlogin_wire::ProtocolHeader {
83            proto_version: self.proto_ver,
84            client_type: self.client_type,
85            client_version: self.client_ver,
86            language: self.lang_id,
87            user_id: self.user_id,
88            is_push: self.is_push,
89            is_compressed: self.is_compressed,
90            serial_num: self.serial_no,
91            cmd: self.cmd_id,
92            ex_head_body_len: self.ex_head_len as u32 + self.body_len,
93            reserved: self.reserved,
94            ex_head_len: self.ex_head_len,
95        };
96        dst.reserve(NN_HEADER_SIZE);
97        dst.put_slice(&wire.encode());
98    }
99
100    pub fn decode(src: &BytesMut) -> Result<Option<Self>, futu_core::error::FutuError> {
101        if src.len() < NN_HEADER_SIZE {
102            return Ok(None);
103        }
104
105        let wire = ftlogin_wire::ProtocolHeader::decode(&src[..NN_HEADER_SIZE])?;
106        let body_len = wire.body_len()? as u32;
107
108        Ok(Some(Self {
109            proto_ver: wire.proto_version,
110            client_type: wire.client_type,
111            client_ver: wire.client_version,
112            lang_id: wire.language,
113            user_id: wire.user_id,
114            is_push: wire.is_push,
115            is_compressed: wire.is_compressed,
116            serial_no: wire.serial_num,
117            cmd_id: wire.cmd,
118            body_len,
119            reserved: wire.reserved,
120            ex_head_len: wire.ex_head_len,
121        }))
122    }
123
124    #[inline]
125    pub fn is_compressed(&self) -> bool {
126        self.is_compressed
127    }
128
129    #[inline]
130    pub fn flags(&self) -> u8 {
131        u8::from(self.is_push) | (u8::from(self.is_compressed) << 1)
132    }
133}
134
135/// 内部协议帧
136#[derive(Debug, Clone)]
137pub struct NNFrame {
138    pub header: NNHeader,
139    pub body: bytes::Bytes,
140    /// 服务端下发的扩展头原始字节(FtnnExtendHead protobuf)
141    /// 关键用途:body 为空时,服务端通常把业务错误塞在 ex_head.err_info 里
142    pub ex_head: bytes::Bytes,
143}
144
145/// 对齐 C++ `FTConnExtHead.proto` 的 `ErrorInfo`(field 5 of FtnnExtendHead)。
146#[derive(Debug, Clone, Default)]
147pub struct ExHeadErrorInfo {
148    /// 对齐 `CmdResult` enum:0=成功,1=失败
149    pub cmd_result: i32,
150    pub source: String,
151    pub code: i32,
152    pub message: String,
153}
154
155impl NNFrame {
156    /// 从 ex_head 里提取 err_info(field 5 = ErrorInfo nested message)
157    /// 如果 ex_head 为空或没 err_info 字段,返回 None。
158    pub fn parse_ex_head_error(&self) -> Option<ExHeadErrorInfo> {
159        if self.ex_head.is_empty() {
160            return None;
161        }
162        parse_err_info_from_ex_head(&self.ex_head)
163    }
164}
165
166/// 从 FtnnExtendHead protobuf 字节里提取 field 5 (ErrorInfo)。
167/// 手写 protobuf 解析,避免依赖 prost 生成代码。
168fn parse_err_info_from_ex_head(data: &[u8]) -> Option<ExHeadErrorInfo> {
169    let mut pos = 0;
170    // 遍历 FtnnExtendHead 的字段,找 field 5
171    while pos < data.len() {
172        let (tag, new_pos) = decode_varint(data, pos)?;
173        pos = new_pos;
174        let field_num = (tag >> 3) as u32;
175        let wire_type = (tag & 0x07) as u8;
176
177        if wire_type == 2 {
178            let (len, new_pos) = decode_varint(data, pos)?;
179            pos = new_pos;
180            let len = len as usize;
181            if pos + len > data.len() {
182                return None;
183            }
184            if field_num == 5 {
185                // ErrorInfo 子消息
186                return Some(decode_error_info(&data[pos..pos + len]));
187            }
188            pos += len;
189        } else if wire_type == 0 {
190            let (_v, new_pos) = decode_varint(data, pos)?;
191            pos = new_pos;
192        } else if wire_type == 1 {
193            pos += 8;
194        } else if wire_type == 5 {
195            pos += 4;
196        } else {
197            return None;
198        }
199    }
200    None
201}
202
203fn decode_error_info(data: &[u8]) -> ExHeadErrorInfo {
204    let mut info = ExHeadErrorInfo::default();
205    let mut pos = 0;
206    while pos < data.len() {
207        let Some((tag, new_pos)) = decode_varint(data, pos) else {
208            break;
209        };
210        pos = new_pos;
211        let field_num = (tag >> 3) as u32;
212        let wire_type = (tag & 0x07) as u8;
213
214        match (field_num, wire_type) {
215            (1, 0) => {
216                // cmd_result (int enum)
217                if let Some((v, p)) = decode_varint(data, pos) {
218                    info.cmd_result = v as i32;
219                    pos = p;
220                } else {
221                    break;
222                }
223            }
224            (3, 0) => {
225                // code (int32)
226                if let Some((v, p)) = decode_varint(data, pos) {
227                    info.code = v as i32;
228                    pos = p;
229                } else {
230                    break;
231                }
232            }
233            (2, 2) | (4, 2) => {
234                // source (field 2) / message (field 4)
235                let Some((len, new_pos)) = decode_varint(data, pos) else {
236                    break;
237                };
238                pos = new_pos;
239                let end = pos + len as usize;
240                if end > data.len() {
241                    break;
242                }
243                let s = String::from_utf8_lossy(&data[pos..end]).to_string();
244                if field_num == 2 {
245                    info.source = s;
246                } else {
247                    info.message = s;
248                }
249                pos = end;
250            }
251            (_, 0) => {
252                let Some((_v, p)) = decode_varint(data, pos) else {
253                    break;
254                };
255                pos = p;
256            }
257            (_, 2) => {
258                let Some((len, p)) = decode_varint(data, pos) else {
259                    break;
260                };
261                pos = p + len as usize;
262            }
263            (_, 1) => pos += 8,
264            (_, 5) => pos += 4,
265            _ => break,
266        }
267    }
268    info
269}
270
271fn decode_varint(data: &[u8], start: usize) -> Option<(u64, usize)> {
272    let mut result: u64 = 0;
273    let mut shift = 0;
274    let mut pos = start;
275    loop {
276        if pos >= data.len() {
277            return None;
278        }
279        let byte = data[pos];
280        result |= ((byte & 0x7F) as u64) << shift;
281        pos += 1;
282        if byte & 0x80 == 0 {
283            return Some((result, pos));
284        }
285        shift += 7;
286        if shift >= 64 {
287            return None;
288        }
289    }
290}
291
292/// 内部协议编解码器
293pub struct NNCodec;
294
295impl Decoder for NNCodec {
296    type Item = NNFrame;
297    type Error = futu_core::error::FutuError;
298
299    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
300        let header = match NNHeader::decode(src)? {
301            Some(h) => h,
302            None => return Ok(None),
303        };
304
305        // 线上布局:[32 header][ex_head_len 扩展头][body_len 真实 body]
306        let ex_head_body_len = header.ex_head_len as usize + header.body_len as usize;
307        let total = NN_HEADER_SIZE + ex_head_body_len;
308        if src.len() < total {
309            src.reserve(total - src.len());
310            return Ok(None);
311        }
312
313        src.advance(NN_HEADER_SIZE);
314        // 保留扩展头字节 —— NNFrame::parse_ex_head_error 可以读 err_info
315        let ex_head = if header.ex_head_len > 0 {
316            src.split_to(header.ex_head_len as usize).freeze()
317        } else {
318            bytes::Bytes::new()
319        };
320        // Do not decompress here. C++ `channel_impl.cpp:1873-1900` decrypts
321        // encrypted command bodies first, then applies `IsDataCompressed()`.
322        // `NNCodec` owns only frame slicing; `BackendConn` owns the body pipeline.
323        let body = src.split_to(header.body_len as usize).freeze();
324
325        Ok(Some(NNFrame {
326            header,
327            body,
328            ex_head,
329        }))
330    }
331}
332
333impl Encoder<NNFrame> for NNCodec {
334    type Error = futu_core::error::FutuError;
335
336    fn encode(&mut self, item: NNFrame, dst: &mut BytesMut) -> Result<(), Self::Error> {
337        let mut header = item.header;
338        header.body_len = item.body.len() as u32;
339        header.encode(dst);
340        dst.extend_from_slice(&item.body);
341        Ok(())
342    }
343}
344
345/// 判断是否为登录命令(登录命令不加密)
346pub fn is_login_cmd(cmd_id: u16) -> bool {
347    matches!(cmd_id, 1001 | 6001 | 2001 | 4001)
348}
349
350/// 判断是否为不加密的命令(行情命令等)
351/// 对应 C++ gs_arrUnencryptedProtoCmd
352pub fn is_unencrypted_proto(cmd_id: u16) -> bool {
353    matches!(
354        cmd_id,
355        1306  | // Pull_ErrorCode
356        1316  | // Ping
357        1321  | // Pull_ConnIp (获取连接点列表)
358        20147 | // Broker UpdateConnIp (FTLogin kCmdUpdateConnIpBroker, no encrypt)
359        5115  | // 自选股列表更新
360        5120  | // Pull_USGroupSecList
361        5121  | // Pull_USGroupInfo
362        6682  | // ModifyUSGroupSecList (修改自选股)
363        6032  | // Qot_Request_Right
364        6128  | // Qot_Pull_Ticker
365        6160  | // Qot_Pull_TimeShare
366        6161  | // Qot_Pull_Kline
367        6211  | // Qot_Push_Sub
368        6212  | // Qot_Push_Push
369        6301  | // Qot_Push_EventNotice
370        6304  | // Qot_ReSub
371        6311  | // Qot_Pull_OptionStrikeDate
372        6337  | // Qot_Pull_FutureInfo
373        6365  | // Qot_Pull_TickerStatistics
374        6366  | // Qot_Pull_TickerStatisticsDetail
375        6503  | // Qot_Pull_SpreadInfo
376        6513  | // Qot_Pull_Warrant
377        6600  | // Qot_Pull_PlateOrSetID
378        6608  | // Qot_Pull_SecOwnerPlate
379        6621  | // Qot_Pull_BrokerMonitor
380        6693  | // Qot_Pull_CapitalDistribution
381        6694  | // Qot_Pull_CapitalFlow
382        6695  | // Qot_Pull_HistoryCapitalFlow
383        6701  | // Qot_Pull_FutureRelated
384        6733  | // Pull_TradeDate
385        6736  | // Qot_Pull_OptionChain
386        6745  | // Qot_SecListCheckSumDiffReport
387        6746  | // Qot_Update_SecList
388        6747  | // Qot_Pull_StaticInfo
389        6801  | 6802 | 6804 | // 股价提醒
390        6803  | // Push_PriceReminderChange
391        6808  | // Pull_PriceReminder
392        6809  | // Set_PriceReminder
393        6811  | // Qot_Pull_Rehab
394        6822  | // Qot_Push_EventNotice_Sub
395        6823  | // Qot_Pull_EventNotice
396        6824  | // Qot_Pull_SubData
397        6825  | // Qot_Pull_MarketState
398        6956  | // Qot_Pull_IpoList
399        6957  | // Qot_Pull_IpoDetail
400        8017  | // 牛牛圈新消息push
401        18008 | // Qot_Pull_BrokerInfo
402        18012 | // Qot_Pull_GetLv2RelatedExchange (v1.4.110 codex QOT Phase 4 Slice 7)
403        65507 | // FTLogin WebTCP kCmdWebRequest
404        65509 | // FTLogin WebTCP kCmdSiteConfigRequest
405        20106 | // Qot_Pull_GetSecuritiesInfo
406        20287 | // ConfirmLogPush
407        20334 // Qot_New_Pull_Warrant
408    )
409}
410
411/// 判断命令是否需要跳过加密(登录命令或未加密行情命令)
412pub fn should_skip_encryption(cmd_id: u16) -> bool {
413    is_login_cmd(cmd_id) || is_unencrypted_proto(cmd_id)
414}
415
416#[cfg(test)]
417mod tests;