futu_mcp/transport.rs
1//! Resilient stdio transport for MCP server (v1.4.90 P0-A).
2//!
3//! ## Why this exists
4//!
5//! `rmcp::transport::stdio()` (i.e. the default `(Stdin, Stdout)` adapter via
6//! `AsyncRwTransport` + `JsonRpcMessageCodec`) treats *any* JSON parse error
7//! as a fatal stream error. Concretely:
8//!
9//! 1. `JsonRpcMessageCodec::decode()` returns `Err(JsonRpcMessageCodecError::Serde(_))`
10//! when a line is malformed (e.g. `{"price": Infinity}` — JSON spec forbids
11//! `Infinity` / `NaN` literals, but LLM clients emit them occasionally).
12//! 2. `FramedRead` yields `Some(Err(_))`.
13//! 3. `AsyncRwTransport::receive()` does `next.await.and_then(|e| e.ok())` —
14//! converting `Err` to `None`.
15//! 4. The rmcp service loop interprets `None` as "input stream closed" and
16//! breaks with `QuitReason::Closed`, terminating the entire MCP server.
17//!
18//! Result: a *single* malformed JSON line silently kills the whole server,
19//! disconnecting every client (multi-version sweep proven across v1.4.47 →
20//! v1.4.86 — 11 versions all vulnerable).
21//!
22//! Per JSON-RPC 2.0 §5.1, the correct behavior is to return a `-32700 Parse
23//! error` response and keep the connection alive. This module implements that
24//! behavior as a drop-in replacement for `rmcp::transport::stdio()`.
25//!
26//! ## Design
27//!
28//! - `ResilientStdioTransport` implements `rmcp::transport::Transport<RoleServer>`.
29//! - A background **reader task** owns stdin, reads newline-delimited frames,
30//! and parses each into `RxJsonRpcMessage<RoleServer>`. Successful parses go
31//! into an inbound mpsc channel for `receive()`. Parse failures cause a
32//! synthetic `JsonRpcError(-32700)` to be enqueued onto the **outbound**
33//! channel directly (bypassing `receive()` so the service never sees an
34//! error event), and the loop continues.
35//! - A background **writer task** owns stdout and drains the outbound channel,
36//! serialising messages as one-line JSON each.
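//! - `send()` enqueues onto the outbound channel; `receive()` polls the
//!   inbound channel; `close()` drops the transport's outbound sender and
//!   closes the inbound receiver. The reader task keeps its own clone of the
//!   outbound sender, so the writer task exits only after the reader task has
//!   stopped as well (stdin EOF, read error, or a failed inbound forward).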
39//!
40//! ## What this is NOT
41//!
42//! This is a stdio-only fix. The HTTP transport (`StreamableHttpService`)
43//! has its own per-request HTTP body parsing — a malformed request there
44//! returns 4xx without killing the server, so it's not affected by this bug.
45//! Future work: upstream PR to rmcp so all transports share resilient parsing.
46
47use std::sync::Arc;
48
49use rmcp::RoleServer;
50use rmcp::model::{ErrorCode, ErrorData, JsonRpcError, NumberOrString};
51use rmcp::service::{RxJsonRpcMessage, TxJsonRpcMessage};
52use rmcp::transport::Transport;
53use serde_json::Value;
54use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
55use tokio::sync::{Mutex, mpsc};
56
/// Capacity of the bounded inbound channel, i.e. how many successfully parsed
/// frames may wait for `receive()` at once. 64 is plenty — the rmcp service
/// loop is expected to drain promptly. If the channel ever fills up, the
/// reader task waits in its `send().await` and therefore stops pulling from
/// stdin until there is room again, which is the desired backpressure.
const INBOUND_BUFFER: usize = 64;
61
/// Sending half of the outbound queue (messages headed for the writer task).
///
/// Outbound is unbounded because we never want a slow consumer to deadlock
/// the parse-error path (which writes synthetically). All writes are tiny
/// JSON lines.
type OutboundTx = mpsc::UnboundedSender<TxJsonRpcMessage<RoleServer>>;
/// Receiving half of the outbound queue; owned by the writer task.
type OutboundRx = mpsc::UnboundedReceiver<TxJsonRpcMessage<RoleServer>>;
67
/// Resilient stdio transport — see module docs.
///
/// Holds only the channel ends the service-facing side needs; the actual I/O
/// is performed by the reader and writer tasks spawned in `new`.
pub struct ResilientStdioTransport {
    /// Receiving end of the bounded inbound channel; `receive()` awaits on it.
    inbound_rx: mpsc::Receiver<RxJsonRpcMessage<RoleServer>>,
    /// Wrapped in `Arc<Mutex<>>` so `send()` can return a `'static` future
    /// per the `Transport` trait contract.
    /// Becomes `None` once `close()` has taken the sender, after which
    /// `send()` reports `TransportError::Closed`.
    outbound_tx: Arc<Mutex<Option<OutboundTx>>>,
}
75
76impl ResilientStdioTransport {
77 /// Spawn reader + writer tasks bound to the supplied I/O handles.
78 ///
79 /// Generic over `R` / `W` so tests can inject in-memory pipes; production
80 /// callers use [`resilient_stdio()`].
81 pub fn new<R, W>(read: R, write: W) -> Self
82 where
83 R: AsyncRead + Send + Unpin + 'static,
84 W: AsyncWrite + Send + Unpin + 'static,
85 {
86 let (inbound_tx, inbound_rx) =
87 mpsc::channel::<RxJsonRpcMessage<RoleServer>>(INBOUND_BUFFER);
88 let (outbound_tx, outbound_rx) = mpsc::unbounded_channel::<TxJsonRpcMessage<RoleServer>>();
89
90 // Reader task — owns stdin, parses lines, recovers from parse errors.
91 let outbound_tx_for_reader = outbound_tx.clone();
92 tokio::spawn(reader_task(read, inbound_tx, outbound_tx_for_reader));
93
94 // Writer task — owns stdout, drains outbound queue.
95 tokio::spawn(writer_task(write, outbound_rx));
96
97 Self {
98 inbound_rx,
99 outbound_tx: Arc::new(Mutex::new(Some(outbound_tx))),
100 }
101 }
102}
103
104/// Drop-in replacement for `rmcp::transport::stdio()`. Returns a transport
105/// that survives malformed JSON instead of `exit(0)`-ing.
106pub fn resilient_stdio() -> ResilientStdioTransport {
107 ResilientStdioTransport::new(tokio::io::stdin(), tokio::io::stdout())
108}
109
/// Error type reported by the resilient stdio transport.
#[derive(Debug)]
pub enum TransportError {
    /// The outbound side is gone: either the transport was closed or the
    /// outbound queue no longer has a receiver.
    Closed,
}

impl std::fmt::Display for TransportError {
    /// Writes a short, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::Closed => "transport closed",
        };
        f.write_str(text)
    }
}

// No underlying cause to expose, so the default `Error` methods suffice.
impl std::error::Error for TransportError {}
124
125impl Transport<RoleServer> for ResilientStdioTransport {
126 type Error = TransportError;
127
128 fn send(
129 &mut self,
130 item: TxJsonRpcMessage<RoleServer>,
131 ) -> impl Future<Output = Result<(), Self::Error>> + Send + 'static {
132 let lock = self.outbound_tx.clone();
133 async move {
134 let guard = lock.lock().await;
135 match guard.as_ref() {
136 Some(tx) => tx.send(item).map_err(|_| TransportError::Closed),
137 None => Err(TransportError::Closed),
138 }
139 }
140 }
141
142 async fn receive(&mut self) -> Option<RxJsonRpcMessage<RoleServer>> {
143 self.inbound_rx.recv().await
144 }
145
146 async fn close(&mut self) -> Result<(), Self::Error> {
147 let mut guard = self.outbound_tx.lock().await;
148 // Dropping the sender signals the writer task to exit. The reader
149 // task exits naturally on stdin EOF (or when its inbound_tx half is
150 // dropped, which happens when this struct drops).
151 guard.take();
152 self.inbound_rx.close();
153 Ok(())
154 }
155}
156
157// `ResilientStdioTransport: Transport<RoleServer>` automatically gives us
158// `IntoTransport<RoleServer, TransportError, TransportAdapterIdentity>` via
159// rmcp's blanket impl, so no explicit `IntoTransport` impl is needed here.
160
161// ---------------------------------------------------------------------------
162// Reader / writer tasks
163// ---------------------------------------------------------------------------
164
165async fn reader_task<R>(
166 read: R,
167 inbound_tx: mpsc::Sender<RxJsonRpcMessage<RoleServer>>,
168 outbound_tx: OutboundTx,
169) where
170 R: AsyncRead + Send + Unpin + 'static,
171{
172 let mut reader = BufReader::new(read);
173 // v1.4.93 P0-3 (BUG-003): read raw bytes instead of String to gracefully
174 // handle UTF-8 invalid sequences (e.g. `\xfe\xfe`, UTF-16 BOM, mixed
175 // binary). `read_line` into String returns Err(InvalidData) on bad UTF-8
176 // and the previous code matched `Err => break;` -> server terminated
177 // mid-session. Per JSON-RPC 2.0 §5.1 we should return -32700 Parse error
178 // and keep the connection alive (same as v1.4.90 P0-A but for the
179 // pre-string-conversion stage).
180 let mut line_bytes = Vec::<u8>::new();
181
182 loop {
183 line_bytes.clear();
184 match reader.read_until(b'\n', &mut line_bytes).await {
185 Ok(0) => {
186 // True EOF — stdin closed by client. Let the inbound channel
187 // close so the service loop sees `receive() -> None` and
188 // shuts down cleanly.
189 tracing::debug!("resilient stdio: stdin EOF, closing");
190 break;
191 }
192 Ok(_) => {
193 // v1.4.93 P0-3: try UTF-8 conversion; on failure emit -32700
194 // and continue reading instead of terminating the reader task.
195 let line_str = match std::str::from_utf8(&line_bytes) {
196 Ok(s) => s,
197 Err(utf8_err) => {
198 // Build a small ASCII preview of the offending bytes
199 // for the error message (escape non-ASCII as `\xNN`).
200 let preview_bytes: String = line_bytes
201 .iter()
202 .take(64)
203 .map(|b| {
204 if (0x20..=0x7e).contains(b) {
205 (*b as char).to_string()
206 } else {
207 format!("\\x{b:02x}")
208 }
209 })
210 .collect();
211 let err_msg = JsonRpcError::new(
212 recover_request_id(""), // no id recoverable from non-UTF8
213 ErrorData::new(
214 ErrorCode::PARSE_ERROR,
215 format!(
216 "Parse error: invalid UTF-8 at byte {}: {}",
217 utf8_err.valid_up_to(),
218 utf8_err
219 ),
220 None,
221 ),
222 );
223 tracing::warn!(
224 error = %utf8_err,
225 line_preview = %preview_bytes,
226 "resilient stdio: invalid UTF-8 input, returning -32700 (server stays alive)"
227 );
228 let _ = outbound_tx.send(rmcp::model::JsonRpcMessage::Error(err_msg));
229 continue;
230 }
231 };
232 let trimmed =
233 line_str.trim_matches(|c| c == '\n' || c == '\r' || c == ' ' || c == '\t');
234 if trimmed.is_empty() {
235 continue;
236 }
237 match serde_json::from_str::<RxJsonRpcMessage<RoleServer>>(trimmed) {
238 Ok(msg) => {
239 if inbound_tx.send(msg).await.is_err() {
240 // Receiver dropped — transport is closing.
241 break;
242 }
243 }
244 Err(parse_err) => {
245 // Per JSON-RPC 2.0 §5.1, return -32700 Parse error
246 // and keep the connection alive. Try to extract the
247 // request id from the malformed payload (best-effort
248 // — the spec says id should be `null` if it can't
249 // be determined, but rmcp's `JsonRpcError` requires
250 // a `RequestId` so we synthesise one as 0 / "" when
251 // missing).
252 let id = recover_request_id(trimmed);
253 let err_msg = JsonRpcError::new(
254 id,
255 ErrorData::new(
256 ErrorCode::PARSE_ERROR,
257 format!("Parse error: {parse_err}"),
258 None,
259 ),
260 );
261 tracing::warn!(
262 error = %parse_err,
263 line_preview = %preview(trimmed),
264 "resilient stdio: parse error, returning -32700 (server stays alive)"
265 );
266 // Best-effort send — if the writer task is gone we
267 // can't help, but the server is dying anyway.
268 let _ = outbound_tx.send(rmcp::model::JsonRpcMessage::Error(err_msg));
269 }
270 }
271 }
272 Err(io_err) => {
273 tracing::warn!(error = %io_err, "resilient stdio: read error, terminating");
274 break;
275 }
276 }
277 }
278
279 // Drop the inbound sender so receive() returns None and the service
280 // loop exits cleanly.
281 drop(inbound_tx);
282}
283
284async fn writer_task<W>(write: W, mut outbound_rx: OutboundRx)
285where
286 W: AsyncWrite + Send + Unpin + 'static,
287{
288 let mut write = write;
289 while let Some(msg) = outbound_rx.recv().await {
290 match serde_json::to_vec(&msg) {
291 Ok(mut bytes) => {
292 bytes.push(b'\n');
293 if let Err(io_err) = write.write_all(&bytes).await {
294 tracing::warn!(error = %io_err, "resilient stdio: write error, terminating");
295 break;
296 }
297 if let Err(io_err) = write.flush().await {
298 tracing::warn!(error = %io_err, "resilient stdio: flush error, terminating");
299 break;
300 }
301 }
302 Err(serde_err) => {
303 // This should never happen — TxJsonRpcMessage<RoleServer>
304 // is always serialisable. If it does, log and skip.
305 tracing::error!(
306 error = %serde_err,
307 "resilient stdio: failed to serialise outbound message (BUG)"
308 );
309 }
310 }
311 }
312}
313
314/// Best-effort recovery of the `id` field from a malformed JSON-RPC payload.
315/// Falls back to `Number(0)` when extraction fails (the JSON-RPC spec says
316/// "null" is the canonical placeholder, but rmcp's `RequestId` doesn't admit
317/// a null variant — `Number(0)` is the closest match and round-trips cleanly).
318fn recover_request_id(line: &str) -> NumberOrString {
319 if let Ok(value) = serde_json::from_str::<Value>(line)
320 && let Some(id) = value.get("id")
321 {
322 if let Some(n) = id.as_i64() {
323 return NumberOrString::Number(n);
324 }
325 if let Some(s) = id.as_str() {
326 return NumberOrString::String(s.into());
327 }
328 }
329 NumberOrString::Number(0)
330}
331
/// Truncate a line for log output (avoid dumping arbitrary client input
/// into the audit log unbounded).
///
/// Lines of at most 200 bytes are returned unchanged. Longer lines are cut to
/// at most 200 bytes and suffixed with `…(+N bytes)`, where `N` is the number
/// of bytes left out.
///
/// The cut never lands inside a multi-byte UTF-8 character: it backs off to
/// the nearest character boundary at or below 200 bytes. Slicing at a fixed
/// byte offset would panic on such input, and this function runs on
/// untrusted client lines inside the reader task.
fn preview(s: &str) -> String {
    const MAX: usize = 200;
    if s.len() <= MAX {
        return s.to_string();
    }
    // Offset 0 is always a character boundary, so the search cannot fail;
    // `unwrap_or(0)` just avoids a panic path.
    let cut = (0..=MAX)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    format!("{}…(+{} bytes)", &s[..cut], s.len() - cut)
}
342
343// ---------------------------------------------------------------------------
344// Tests
345// ---------------------------------------------------------------------------
346
// Out-of-line unit-test module; compiled only for test builds.
#[cfg(test)]
mod tests;