Skip to main content

futu_mcp/
transport.rs

1//! Resilient stdio transport for MCP server (v1.4.90 P0-A).
2//!
3//! ## Why this exists
4//!
5//! `rmcp::transport::stdio()` (i.e. the default `(Stdin, Stdout)` adapter via
6//! `AsyncRwTransport` + `JsonRpcMessageCodec`) treats *any* JSON parse error
7//! as a fatal stream error. Concretely:
8//!
9//! 1. `JsonRpcMessageCodec::decode()` returns `Err(JsonRpcMessageCodecError::Serde(_))`
10//!    when a line is malformed (e.g. `{"price": Infinity}` — JSON spec forbids
11//!    `Infinity` / `NaN` literals, but LLM clients emit them occasionally).
12//! 2. `FramedRead` yields `Some(Err(_))`.
13//! 3. `AsyncRwTransport::receive()` does `next.await.and_then(|e| e.ok())` —
14//!    converting `Err` to `None`.
15//! 4. The rmcp service loop interprets `None` as "input stream closed" and
16//!    breaks with `QuitReason::Closed`, terminating the entire MCP server.
17//!
18//! Result: a *single* malformed JSON line silently kills the whole server,
19//! disconnecting every client (multi-version sweep proven across v1.4.47 →
20//! v1.4.86 — 11 versions all vulnerable).
21//!
22//! Per JSON-RPC 2.0 §5.1, the correct behavior is to return a `-32700 Parse
23//! error` response and keep the connection alive. This module implements that
24//! behavior as a drop-in replacement for `rmcp::transport::stdio()`.
25//!
26//! ## Design
27//!
28//! - `ResilientStdioTransport` implements `rmcp::transport::Transport<RoleServer>`.
29//! - A background **reader task** owns stdin, reads newline-delimited frames,
30//!   and parses each into `RxJsonRpcMessage<RoleServer>`. Successful parses go
31//!   into an inbound mpsc channel for `receive()`. Parse failures cause a
32//!   synthetic `JsonRpcError(-32700)` to be enqueued onto the **outbound**
33//!   channel directly (bypassing `receive()` so the service never sees an
34//!   error event), and the loop continues.
35//! - A background **writer task** owns stdout and drains the outbound channel,
36//!   serialising messages as one-line JSON each.
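//! - `send()` enqueues onto the outbound channel; `receive()` polls the
//!   inbound channel; `close()` releases the transport's outbound sender and
//!   closes the inbound channel. The writer task exits once no outbound
//!   sender remains and its queue is drained; the reader task stops when it
//!   notices the closed channel or hits stdin EOF.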
39//!
40//! ## What this is NOT
41//!
42//! This is a stdio-only fix. The HTTP transport (`StreamableHttpService`)
43//! has its own per-request HTTP body parsing — a malformed request there
44//! returns 4xx without killing the server, so it's not affected by this bug.
45//! Future work: upstream PR to rmcp so all transports share resilient parsing.
46
47use std::sync::Arc;
48
49use rmcp::RoleServer;
50use rmcp::model::{ErrorCode, ErrorData, JsonRpcError, NumberOrString};
51use rmcp::service::{RxJsonRpcMessage, TxJsonRpcMessage};
52use rmcp::transport::Transport;
53use serde_json::Value;
54use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
55use tokio::sync::{Mutex, mpsc};
56
57/// Bound for the inbound channel. 64 frames buffered is plenty — the rmcp
58/// service loop drains promptly. If the channel ever fills up, `receive()` /
59/// the reader task will simply backpressure on stdin, which is fine.
60const INBOUND_BUFFER: usize = 64;
61
62/// Outbound is unbounded because we never want a slow consumer to deadlock
63/// the parse-error path (which writes synthetically). All writes are tiny
64/// JSON lines.
65type OutboundTx = mpsc::UnboundedSender<TxJsonRpcMessage<RoleServer>>;
66type OutboundRx = mpsc::UnboundedReceiver<TxJsonRpcMessage<RoleServer>>;
67
68/// Resilient stdio transport — see module docs.
69pub struct ResilientStdioTransport {
70    inbound_rx: mpsc::Receiver<RxJsonRpcMessage<RoleServer>>,
71    /// Wrapped in `Arc<Mutex<>>` so `send()` can return a `'static` future
72    /// per the `Transport` trait contract.
73    outbound_tx: Arc<Mutex<Option<OutboundTx>>>,
74}
75
76impl ResilientStdioTransport {
77    /// Spawn reader + writer tasks bound to the supplied I/O handles.
78    ///
79    /// Generic over `R` / `W` so tests can inject in-memory pipes; production
80    /// callers use [`resilient_stdio()`].
81    pub fn new<R, W>(read: R, write: W) -> Self
82    where
83        R: AsyncRead + Send + Unpin + 'static,
84        W: AsyncWrite + Send + Unpin + 'static,
85    {
86        let (inbound_tx, inbound_rx) =
87            mpsc::channel::<RxJsonRpcMessage<RoleServer>>(INBOUND_BUFFER);
88        let (outbound_tx, outbound_rx) = mpsc::unbounded_channel::<TxJsonRpcMessage<RoleServer>>();
89
90        // Reader task — owns stdin, parses lines, recovers from parse errors.
91        let outbound_tx_for_reader = outbound_tx.clone();
92        tokio::spawn(reader_task(read, inbound_tx, outbound_tx_for_reader));
93
94        // Writer task — owns stdout, drains outbound queue.
95        tokio::spawn(writer_task(write, outbound_rx));
96
97        Self {
98            inbound_rx,
99            outbound_tx: Arc::new(Mutex::new(Some(outbound_tx))),
100        }
101    }
102}
103
104/// Drop-in replacement for `rmcp::transport::stdio()`. Returns a transport
105/// that survives malformed JSON instead of `exit(0)`-ing.
106pub fn resilient_stdio() -> ResilientStdioTransport {
107    ResilientStdioTransport::new(tokio::io::stdin(), tokio::io::stdout())
108}
109
110#[derive(Debug)]
111pub enum TransportError {
112    Closed,
113}
114
115impl std::fmt::Display for TransportError {
116    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117        match self {
118            Self::Closed => f.write_str("transport closed"),
119        }
120    }
121}
122
123impl std::error::Error for TransportError {}
124
125impl Transport<RoleServer> for ResilientStdioTransport {
126    type Error = TransportError;
127
128    fn send(
129        &mut self,
130        item: TxJsonRpcMessage<RoleServer>,
131    ) -> impl Future<Output = Result<(), Self::Error>> + Send + 'static {
132        let lock = self.outbound_tx.clone();
133        async move {
134            let guard = lock.lock().await;
135            match guard.as_ref() {
136                Some(tx) => tx.send(item).map_err(|_| TransportError::Closed),
137                None => Err(TransportError::Closed),
138            }
139        }
140    }
141
142    async fn receive(&mut self) -> Option<RxJsonRpcMessage<RoleServer>> {
143        self.inbound_rx.recv().await
144    }
145
146    async fn close(&mut self) -> Result<(), Self::Error> {
147        let mut guard = self.outbound_tx.lock().await;
148        // Dropping the sender signals the writer task to exit. The reader
149        // task exits naturally on stdin EOF (or when its inbound_tx half is
150        // dropped, which happens when this struct drops).
151        guard.take();
152        self.inbound_rx.close();
153        Ok(())
154    }
155}
156
157// `ResilientStdioTransport: Transport<RoleServer>` automatically gives us
158// `IntoTransport<RoleServer, TransportError, TransportAdapterIdentity>` via
159// rmcp's blanket impl, so no explicit `IntoTransport` impl is needed here.
160
161// ---------------------------------------------------------------------------
162// Reader / writer tasks
163// ---------------------------------------------------------------------------
164
165async fn reader_task<R>(
166    read: R,
167    inbound_tx: mpsc::Sender<RxJsonRpcMessage<RoleServer>>,
168    outbound_tx: OutboundTx,
169) where
170    R: AsyncRead + Send + Unpin + 'static,
171{
172    let mut reader = BufReader::new(read);
173    // v1.4.93 P0-3 (BUG-003): read raw bytes instead of String to gracefully
174    // handle UTF-8 invalid sequences (e.g. `\xfe\xfe`, UTF-16 BOM, mixed
175    // binary). `read_line` into String returns Err(InvalidData) on bad UTF-8
176    // and the previous code matched `Err => break;` -> server terminated
177    // mid-session. Per JSON-RPC 2.0 §5.1 we should return -32700 Parse error
178    // and keep the connection alive (same as v1.4.90 P0-A but for the
179    // pre-string-conversion stage).
180    let mut line_bytes = Vec::<u8>::new();
181
182    loop {
183        line_bytes.clear();
184        match reader.read_until(b'\n', &mut line_bytes).await {
185            Ok(0) => {
186                // True EOF — stdin closed by client. Let the inbound channel
187                // close so the service loop sees `receive() -> None` and
188                // shuts down cleanly.
189                tracing::debug!("resilient stdio: stdin EOF, closing");
190                break;
191            }
192            Ok(_) => {
193                // v1.4.93 P0-3: try UTF-8 conversion; on failure emit -32700
194                // and continue reading instead of terminating the reader task.
195                let line_str = match std::str::from_utf8(&line_bytes) {
196                    Ok(s) => s,
197                    Err(utf8_err) => {
198                        // Build a small ASCII preview of the offending bytes
199                        // for the error message (escape non-ASCII as `\xNN`).
200                        let preview_bytes: String = line_bytes
201                            .iter()
202                            .take(64)
203                            .map(|b| {
204                                if (0x20..=0x7e).contains(b) {
205                                    (*b as char).to_string()
206                                } else {
207                                    format!("\\x{b:02x}")
208                                }
209                            })
210                            .collect();
211                        let err_msg = JsonRpcError::new(
212                            recover_request_id(""), // no id recoverable from non-UTF8
213                            ErrorData::new(
214                                ErrorCode::PARSE_ERROR,
215                                format!(
216                                    "Parse error: invalid UTF-8 at byte {}: {}",
217                                    utf8_err.valid_up_to(),
218                                    utf8_err
219                                ),
220                                None,
221                            ),
222                        );
223                        tracing::warn!(
224                            error = %utf8_err,
225                            line_preview = %preview_bytes,
226                            "resilient stdio: invalid UTF-8 input, returning -32700 (server stays alive)"
227                        );
228                        let _ = outbound_tx.send(rmcp::model::JsonRpcMessage::Error(err_msg));
229                        continue;
230                    }
231                };
232                let trimmed =
233                    line_str.trim_matches(|c| c == '\n' || c == '\r' || c == ' ' || c == '\t');
234                if trimmed.is_empty() {
235                    continue;
236                }
237                match serde_json::from_str::<RxJsonRpcMessage<RoleServer>>(trimmed) {
238                    Ok(msg) => {
239                        if inbound_tx.send(msg).await.is_err() {
240                            // Receiver dropped — transport is closing.
241                            break;
242                        }
243                    }
244                    Err(parse_err) => {
245                        // Per JSON-RPC 2.0 §5.1, return -32700 Parse error
246                        // and keep the connection alive. Try to extract the
247                        // request id from the malformed payload (best-effort
248                        // — the spec says id should be `null` if it can't
249                        // be determined, but rmcp's `JsonRpcError` requires
250                        // a `RequestId` so we synthesise one as 0 / "" when
251                        // missing).
252                        let id = recover_request_id(trimmed);
253                        let err_msg = JsonRpcError::new(
254                            id,
255                            ErrorData::new(
256                                ErrorCode::PARSE_ERROR,
257                                format!("Parse error: {parse_err}"),
258                                None,
259                            ),
260                        );
261                        tracing::warn!(
262                            error = %parse_err,
263                            line_preview = %preview(trimmed),
264                            "resilient stdio: parse error, returning -32700 (server stays alive)"
265                        );
266                        // Best-effort send — if the writer task is gone we
267                        // can't help, but the server is dying anyway.
268                        let _ = outbound_tx.send(rmcp::model::JsonRpcMessage::Error(err_msg));
269                    }
270                }
271            }
272            Err(io_err) => {
273                tracing::warn!(error = %io_err, "resilient stdio: read error, terminating");
274                break;
275            }
276        }
277    }
278
279    // Drop the inbound sender so receive() returns None and the service
280    // loop exits cleanly.
281    drop(inbound_tx);
282}
283
284async fn writer_task<W>(write: W, mut outbound_rx: OutboundRx)
285where
286    W: AsyncWrite + Send + Unpin + 'static,
287{
288    let mut write = write;
289    while let Some(msg) = outbound_rx.recv().await {
290        match serde_json::to_vec(&msg) {
291            Ok(mut bytes) => {
292                bytes.push(b'\n');
293                if let Err(io_err) = write.write_all(&bytes).await {
294                    tracing::warn!(error = %io_err, "resilient stdio: write error, terminating");
295                    break;
296                }
297                if let Err(io_err) = write.flush().await {
298                    tracing::warn!(error = %io_err, "resilient stdio: flush error, terminating");
299                    break;
300                }
301            }
302            Err(serde_err) => {
303                // This should never happen — TxJsonRpcMessage<RoleServer>
304                // is always serialisable. If it does, log and skip.
305                tracing::error!(
306                    error = %serde_err,
307                    "resilient stdio: failed to serialise outbound message (BUG)"
308                );
309            }
310        }
311    }
312}
313
314/// Best-effort recovery of the `id` field from a malformed JSON-RPC payload.
315/// Falls back to `Number(0)` when extraction fails (the JSON-RPC spec says
316/// "null" is the canonical placeholder, but rmcp's `RequestId` doesn't admit
317/// a null variant — `Number(0)` is the closest match and round-trips cleanly).
318fn recover_request_id(line: &str) -> NumberOrString {
319    if let Ok(value) = serde_json::from_str::<Value>(line)
320        && let Some(id) = value.get("id")
321    {
322        if let Some(n) = id.as_i64() {
323            return NumberOrString::Number(n);
324        }
325        if let Some(s) = id.as_str() {
326            return NumberOrString::String(s.into());
327        }
328    }
329    NumberOrString::Number(0)
330}
331
332/// Truncate a line for log output (avoid dumping arbitrary client input
333/// into the audit log unbounded).
334fn preview(s: &str) -> String {
335    const MAX: usize = 200;
336    if s.len() <= MAX {
337        s.to_string()
338    } else {
339        format!("{}…(+{} bytes)", &s[..MAX], s.len() - MAX)
340    }
341}
342
343// ---------------------------------------------------------------------------
344// Tests
345// ---------------------------------------------------------------------------
346
347#[cfg(test)]
348mod tests;