Skip to main content

futu_backend/
ftlogin_wire.rs

1//! FTLogin/F3CLogin channel wire layer.
2//!
3//! C++ Futu_OpenD receives already-unwrapped protobuf bodies from
4//! F3CLogin.framework. This Rust daemon does not link that framework, so this
5//! module implements the equivalent inbound wire layer from FTLogin source.
6//!
7//! Hardcoded / assumption ledger:
8//! - Header layout and protocol version are protocol constants from
9//!   `/Users/leaf/ai-lab/o-src/FTLogin/Src/ftlogin/channel/impl/protocol_header.h`
10//!   and `protocol_header.cpp` (`kProtoVersion = 39`).
11//!   We emit v39, but inbound non-v39 frames are decoded with the legacy byte-11
12//!   `cmd_type` interpretation used by the pre-v39 Rust decoder. This prevents a
13//!   broker-side mixed-version response from tearing down the whole channel.
14//! - Compressed bodies reserve the first 4 body bytes for BE uncompressed size,
15//!   then carry an LZ4 raw block. Ref:
16//!   `/Users/leaf/ai-lab/o-src/FTLogin/Src/ftlogin/channel/impl/channel_impl.cpp`
17//!   (`kBodyFrontReservedBytes = 4`, `LZ4_decompress_safe`).
18//! - Decompressed length limit is 12 MiB per the same C++ file
19//!   (`kDecompressedDataLengthLimit = 12 * 1024 * 1024`).
20
21use futu_core::error::FutuError;
22
23pub const MAGIC: [u8; 2] = [b'F', b'T'];
24pub const PROTO_VERSION: u8 = 39;
25pub const HEADER_LEN: usize = 32;
26pub const RESERVED_LEN: usize = 8;
27pub const BODY_FRONT_RESERVED_BYTES: usize = 4;
28pub const DECOMPRESSED_DATA_LEN_LIMIT: usize = 12 * 1024 * 1024;
29
30const FLAG_PUSH: u8 = 0b0000_0001;
31const FLAG_COMPRESS: u8 = 0b0000_0010;
32
33#[derive(Debug, Clone, PartialEq, Eq)]
34pub struct ProtocolHeader {
35    pub proto_version: u8,
36    pub client_type: u8,
37    pub client_version: u16,
38    pub language: u8,
39    pub user_id: u32,
40    pub is_push: bool,
41    pub is_compressed: bool,
42    pub serial_num: u32,
43    pub cmd: u16,
44    pub ex_head_body_len: u32,
45    pub reserved: [u8; RESERVED_LEN],
46    pub ex_head_len: u16,
47}
48
49impl ProtocolHeader {
50    pub fn decode(bytes: &[u8]) -> Result<Self, FutuError> {
51        if bytes.len() < HEADER_LEN {
52            return Err(FutuError::Codec(format!(
53                "FTLogin header too short: actual={} expected={HEADER_LEN}",
54                bytes.len()
55            )));
56        }
57        if bytes[0..2] != MAGIC {
58            return Err(FutuError::InvalidHeader);
59        }
60        let proto_version = bytes[2];
61        let flags = bytes[11];
62        let ex_head_body_len = read_be_u32(bytes, 18, "ex_head_body_len")?;
63        let ex_head_len = read_be_u16(bytes, 30, "ex_head_len")?;
64        if u32::from(ex_head_len) > ex_head_body_len {
65            return Err(FutuError::Codec(format!(
66                "FTLogin header ex_head_len > ex_head_body_len: ex_head_len={ex_head_len} ex_head_body_len={ex_head_body_len}"
67            )));
68        }
69        let cmd = read_be_u16(bytes, 16, "cmd")?;
70        let (is_push, is_compressed) = if proto_version == PROTO_VERSION {
71            (flags & FLAG_PUSH != 0, flags & FLAG_COMPRESS != 0)
72        } else {
73            // v1.4.107 tolerated older NN-style frames on broker channels because
74            // byte 11 was treated as cmd_type (1 = push, other = reply). Keep that
75            // compatibility while still decoding v39 flags for compressed pushes.
76            tracing::warn!(
77                actual = proto_version,
78                expected = PROTO_VERSION,
79                cmd,
80                "FTLogin protocol version mismatch; falling back to legacy cmd_type semantics"
81            );
82            (flags == FLAG_PUSH, false)
83        };
84
85        Ok(Self {
86            proto_version,
87            client_type: bytes[3],
88            client_version: read_be_u16(bytes, 4, "client_version")?,
89            language: bytes[6],
90            user_id: read_be_u32(bytes, 7, "user_id")?,
91            is_push,
92            is_compressed,
93            serial_num: read_be_u32(bytes, 12, "serial_num")?,
94            cmd,
95            ex_head_body_len,
96            reserved: read_exact::<RESERVED_LEN>(bytes, 22, "reserved")?,
97            ex_head_len,
98        })
99    }
100
101    pub fn encode(&self) -> [u8; HEADER_LEN] {
102        let mut out = [0u8; HEADER_LEN];
103        out[0..2].copy_from_slice(&MAGIC);
104        out[2] = self.proto_version;
105        out[3] = self.client_type;
106        out[4..6].copy_from_slice(&self.client_version.to_be_bytes());
107        out[6] = self.language;
108        out[7..11].copy_from_slice(&self.user_id.to_be_bytes());
109        out[11] = encode_flags(self.is_push, self.is_compressed);
110        out[12..16].copy_from_slice(&self.serial_num.to_be_bytes());
111        out[16..18].copy_from_slice(&self.cmd.to_be_bytes());
112        out[18..22].copy_from_slice(&self.ex_head_body_len.to_be_bytes());
113        out[22..30].copy_from_slice(&self.reserved);
114        out[30..32].copy_from_slice(&self.ex_head_len.to_be_bytes());
115        out
116    }
117
118    pub fn flags(&self) -> u8 {
119        encode_flags(self.is_push, self.is_compressed)
120    }
121
122    pub fn body_len(&self) -> Result<usize, FutuError> {
123        let body_len = self
124            .ex_head_body_len
125            .checked_sub(u32::from(self.ex_head_len))
126            .ok_or_else(|| {
127                FutuError::Codec(format!(
128                    "FTLogin header ex_head_len > ex_head_body_len: ex_head_len={} ex_head_body_len={}",
129                    self.ex_head_len, self.ex_head_body_len
130                ))
131            })?;
132        usize::try_from(body_len)
133            .map_err(|_| FutuError::Codec("FTLogin body length too large".into()))
134    }
135
136    pub fn frame_len(&self) -> Result<usize, FutuError> {
137        HEADER_LEN
138            .checked_add(self.ex_head_body_len as usize)
139            .ok_or_else(|| FutuError::Codec("FTLogin frame length overflow".into()))
140    }
141}
142
143pub fn decode_inbound_body(is_compressed: bool, raw_body: &[u8]) -> Result<Vec<u8>, FutuError> {
144    if !is_compressed {
145        return Ok(raw_body.to_vec());
146    }
147
148    let original_len = compressed_original_len(raw_body)?;
149    decompress_lz4_block(&raw_body[BODY_FRONT_RESERVED_BYTES..], original_len)
150}
151
152pub fn compressed_original_len(body: &[u8]) -> Result<usize, FutuError> {
153    if body.len() <= BODY_FRONT_RESERVED_BYTES {
154        return Err(FutuError::Codec(format!(
155            "FTLogin compressed body too short: actual={} min={}",
156            body.len(),
157            BODY_FRONT_RESERVED_BYTES + 1
158        )));
159    }
160
161    let len = read_be_u32(body, 0, "compressed_original_len")? as usize;
162    if len > DECOMPRESSED_DATA_LEN_LIMIT {
163        return Err(FutuError::Codec(format!(
164            "FTLogin compressed body declares oversized decompressed len: actual={len} max={DECOMPRESSED_DATA_LEN_LIMIT}"
165        )));
166    }
167    Ok(len)
168}
169
170/// LZ4 raw block decompression compatible with C++ `LZ4_decompress_safe`.
171pub fn decompress_lz4_block(compressed: &[u8], original_len: usize) -> Result<Vec<u8>, FutuError> {
172    let mut input_pos = 0usize;
173    let mut output = Vec::with_capacity(original_len);
174
175    while input_pos < compressed.len() {
176        let token = compressed[input_pos];
177        input_pos += 1;
178
179        let literal_len = read_extended_len(compressed, &mut input_pos, usize::from(token >> 4))?;
180        let literal_end = input_pos
181            .checked_add(literal_len)
182            .ok_or_else(|| FutuError::Codec("LZ4 literal length overflow".into()))?;
183        if literal_end > compressed.len() {
184            return Err(FutuError::Codec("LZ4 literal run exceeds input".into()));
185        }
186        if output.len().saturating_add(literal_len) > original_len {
187            return Err(FutuError::Codec(
188                "LZ4 literal run exceeds output limit".into(),
189            ));
190        }
191        output.extend_from_slice(&compressed[input_pos..literal_end]);
192        input_pos = literal_end;
193
194        if input_pos == compressed.len() {
195            break;
196        }
197
198        if input_pos + 2 > compressed.len() {
199            return Err(FutuError::Codec("LZ4 missing match offset".into()));
200        }
201        let offset = usize::from(u16::from_le_bytes([
202            compressed[input_pos],
203            compressed[input_pos + 1],
204        ]));
205        input_pos += 2;
206        if offset == 0 || offset > output.len() {
207            return Err(FutuError::Codec(format!(
208                "LZ4 invalid match offset: {offset}"
209            )));
210        }
211
212        let match_len = read_extended_len(compressed, &mut input_pos, usize::from(token & 0x0f))?
213            .checked_add(4)
214            .ok_or_else(|| FutuError::Codec("LZ4 match length overflow".into()))?;
215        if output.len().saturating_add(match_len) > original_len {
216            return Err(FutuError::Codec(
217                "LZ4 match run exceeds output limit".into(),
218            ));
219        }
220
221        let mut read_pos = output.len() - offset;
222        let mut copied = 0;
223        // LZ4 match run requires byte-by-byte append from a moving read_pos
224        // (allowing overlap with the just-appended bytes). Keep this as a
225        // while loop; iterator rewrites can accidentally read from the
226        // pre-append buffer only and break overlapping matches.
227        while copied < match_len {
228            let byte = output[read_pos];
229            output.push(byte);
230            read_pos += 1;
231            copied += 1;
232        }
233    }
234
235    if output.len() != original_len {
236        return Err(FutuError::Codec(format!(
237            "LZ4 decompressed length mismatch: expected={original_len} actual={}",
238            output.len()
239        )));
240    }
241    Ok(output)
242}
243
244fn read_extended_len(
245    input: &[u8],
246    input_pos: &mut usize,
247    base_len: usize,
248) -> Result<usize, FutuError> {
249    let mut len = base_len;
250    if base_len != 15 {
251        return Ok(len);
252    }
253
254    loop {
255        if *input_pos >= input.len() {
256            return Err(FutuError::Codec("LZ4 truncated extended length".into()));
257        }
258        let value = input[*input_pos];
259        *input_pos += 1;
260        len = len
261            .checked_add(usize::from(value))
262            .ok_or_else(|| FutuError::Codec("LZ4 extended length overflow".into()))?;
263        if value != 255 {
264            return Ok(len);
265        }
266    }
267}
268
269fn encode_flags(is_push: bool, is_compressed: bool) -> u8 {
270    u8::from(is_push) | (u8::from(is_compressed) << 1)
271}
272
273fn read_be_u16(bytes: &[u8], start: usize, field: &str) -> Result<u16, FutuError> {
274    Ok(u16::from_be_bytes(read_exact::<2>(bytes, start, field)?))
275}
276
277fn read_be_u32(bytes: &[u8], start: usize, field: &str) -> Result<u32, FutuError> {
278    Ok(u32::from_be_bytes(read_exact::<4>(bytes, start, field)?))
279}
280
281fn read_exact<const N: usize>(
282    bytes: &[u8],
283    start: usize,
284    field: &str,
285) -> Result<[u8; N], FutuError> {
286    let end = start
287        .checked_add(N)
288        .ok_or_else(|| FutuError::Codec(format!("FTLogin field offset overflow: {field}")))?;
289    let slice = bytes.get(start..end).ok_or_else(|| {
290        FutuError::Codec(format!(
291            "FTLogin field too short: {field} start={start} len={N} actual={}",
292            bytes.len()
293        ))
294    })?;
295    let mut out = [0u8; N];
296    out.copy_from_slice(slice);
297    Ok(out)
298}
299
300#[cfg(test)]
301mod tests;