Skip to main content

futu_mcp/tools/
subscription.rs

1//! MCP subscription tools (QOT subscribe/unsubscribe and trade push subscriber state).
2
3use crate::handlers;
4use crate::tool_args::{
5    NoArgs, QuerySubscriptionReq, SubAccPushReq, SubscribeReq, UnsubAccPushReq, UnsubscribeReq,
6};
7use crate::tool_auth::CallerSnapshot;
8use rmcp::{
9    RoleServer, handler::server::wrapper::Parameters, service::RequestContext, tool, tool_router,
10};
11
12use super::{FutuServer, system::json_tool_output};
13
14#[tool_router(router = subscription_tool_router, vis = "pub(crate)")]
15impl FutuServer {
16    #[tool(
17        description = "Query current subscription state (subscribed types, quota used/remaining). Python SDK: OpenQuoteContext.query_subscription."
18    )]
19    async fn futu_query_subscription(
20        &self,
21        Parameters(req): Parameters<QuerySubscriptionReq>,
22        req_ctx: RequestContext<RoleServer>,
23    ) -> std::result::Result<String, String> {
24        tracing::info!(tool = "futu_query_subscription");
25        let client = self
26            .read_client_or_err("futu_query_subscription", &req_ctx, None, None)
27            .await?;
28        Self::wrap_result(handlers::core::query_subscription(&client, req.is_req_all_conn).await)
29    }
30
31    #[tool(
32        description = "Get current daemon used quota counters: subscribed quote quota and historical K-line quota. Python SDK: OpenQuoteContext.get_used_quota."
33    )]
34    async fn futu_get_used_quota(
35        &self,
36        Parameters(_req): Parameters<NoArgs>,
37        req_ctx: RequestContext<RoleServer>,
38    ) -> std::result::Result<String, String> {
39        tracing::info!(tool = "futu_get_used_quota");
40        let client = self
41            .read_client_or_err("futu_get_used_quota", &req_ctx, None, None)
42            .await?;
43        Self::wrap_result(handlers::core::get_used_quota(&client).await)
44    }
45
46    /// v1.4.74 A1 BUG-011 fix: 加 `futu_subscribe` 补齐 MCP 对称性。
47    ///
48    /// MCP 之前有 `futu_unsubscribe` / `futu_sub_acc_push` / `futu_unsub_acc_push`
49    /// / `futu_query_subscription` / `futu_push_subscriber_info` 但缺
50    /// `futu_subscribe` 本身 → API 对称性被打破("能取消但不能订阅")。
51    ///
52    /// 架构:本 tool 触发 gateway 层订阅(CMD 3001 QOT_SUB),不返 push stream
53    /// 本身。Push 数据通过 SSE notification 走(v1.4.58 MCP SSE basics),
54    /// 客户端用 `futu_push_subscriber_info` 查询订阅状态。
55    ///
56    /// 对齐 Python SDK `OpenQuoteContext.subscribe`。
57    #[tool(
58        description = "Subscribe market data for given symbols + sub_types. Push data arrives via SSE notifications (HTTP mode). Python SDK: OpenQuoteContext.subscribe."
59    )]
60    async fn futu_subscribe(
61        &self,
62        Parameters(req): Parameters<SubscribeReq>,
63        req_ctx: RequestContext<RoleServer>,
64    ) -> std::result::Result<String, String> {
65        tracing::info!(
66            tool = "futu_subscribe",
67            symbols = ?req.symbols,
68            sub_types = ?req.sub_types,
69            is_first_push = req.is_first_push,
70            is_reg_push = req.is_reg_push,
71        );
72        let client = self
73            .read_client_or_err("futu_subscribe", &req_ctx, None, None)
74            .await?;
75        Self::wrap_result(
76            handlers::core::subscribe(
77                &client,
78                &req.symbols,
79                &req.sub_types,
80                req.is_first_push,
81                req.is_reg_push,
82            )
83            .await,
84        )
85    }
86
87    #[tool(
88        description = "Unsubscribe market data (by symbol+type, or unsub_all to clear this connection). Python SDK: OpenQuoteContext.unsubscribe / unsubscribe_all."
89    )]
90    async fn futu_unsubscribe(
91        &self,
92        Parameters(req): Parameters<UnsubscribeReq>,
93        req_ctx: RequestContext<RoleServer>,
94    ) -> std::result::Result<String, String> {
95        tracing::info!(
96            tool = "futu_unsubscribe",
97            count = req.symbols.len(),
98            unsub_all = req.unsub_all
99        );
100        let client = self
101            .read_client_or_err("futu_unsubscribe", &req_ctx, None, None)
102            .await?;
103        Self::wrap_result(
104            handlers::core::unsubscribe(&client, &req.symbols, &req.sub_types, req.unsub_all).await,
105        )
106    }
107
108    #[tool(
109        description = "Subscribe account order / deal push for given trading accounts. HTTP-mode MCP clients receive pushes as LoggingMessage notifications with {kind, proto_id, body_base64}. Payload body is raw Futu protobuf; decode client-side. Python SDK: OpenTradeContext.sub_acc_push."
110    )]
111    async fn futu_sub_acc_push(
112        &self,
113        Parameters(req): Parameters<SubAccPushReq>,
114        req_ctx: RequestContext<RoleServer>,
115    ) -> std::result::Result<String, String> {
116        // v1.4.102 codex 47 F2 (P1): 空 acc_ids 必拒. 之前空 list 进
117        // register_push_subscriber, subscriber_should_receive 把空 set 视为
118        // "subscribe-all" 全开 push, agent 调 {"acc_ids": []} 等于全订阅
119        // (silent privilege escalation 反模式 D / pitfall #45).
120        if req.acc_ids.is_empty() {
121            return Err(
122                "futu_sub_acc_push: acc_ids 必填非空. 之前空 list silent 全订阅 \
123                 (v1.4.102 codex 47 F2 P1 fix). 调 futu_list_accounts 看可用 \
124                 acc_id, 显式列出要订阅的账户."
125                    .to_string(),
126            );
127        }
128
129        // v1.4.103 (codex 50 F4 / 52 F4 / 54 F3 / 58 F2 — B6): 在调 daemon
130        // 之前用 caller's KeyRecord 比对每个 req.acc_ids ⊆ allowed_acc_ids.
131        // 之前 register-after-daemon-success 只防 daemon 失败的 race window,
132        // 不防 narrow-scope key 让 daemon 全局订阅 acc B (subscriber 本身被
133        // delivery filter 过滤掉, 但 daemon 已对 acc B 发起订阅副作用).
134        //
135        // v1.4.104 codex F3 (P2): 用 pipeline 返的 snapshot 做 ownership +
136        // visibility — pipeline 授权 + push 注册同一 KeyRecord 实例, 避免
137        // SIGHUP 之间 drift / fail-open. 取第 1 个 acc_id 的 snapshot 作 owner;
138        // 后续 acc_ids 通过 pipeline 复 check (不重 capture, 同一 caller).
139        let mut snap_opt: Option<CallerSnapshot> = None;
140        for (idx, acc_id) in req.acc_ids.iter().enumerate() {
141            let snap = self.require_acc_read_with_acc_id(
142                "futu_sub_acc_push",
143                &req_ctx,
144                req.api_key.as_deref(),
145                Some(*acc_id),
146            )?;
147            if idx == 0 {
148                snap_opt = Some(snap);
149            }
150        }
151        let Some(snap) = snap_opt else {
152            return Err(serde_json::json!({
153                "error": "futu_sub_acc_push: caller snapshot resolution failed after acc_ids validation",
154                "status": "error",
155                "hint": "retry after refreshing API key state; if it persists, check auth pipeline logs",
156            })
157            .to_string());
158        };
159
160        tracing::info!(tool = "futu_sub_acc_push", count = req.acc_ids.len());
161
162        let client = self.client_or_err().await?;
163
164        // v1.4.102 codex 47 F3 / 48 F3 (P2): register-after-daemon-success.
165        // 之前先 register peer 后调 daemon, daemon 失败时 local subscriber 留下
166        // 来 — 后续如果其他 caller 让 daemon sub 了同 acc 流, 这个本来失败的
167        // session 仍能收 push (race window leak). 现在: 先调 daemon, 成功才
168        // register local subscriber; daemon 失败 → 不留 local state.
169        let daemon_resp = handlers::trade::sub_acc_push(&client, &req.acc_ids).await;
170        match daemon_resp {
171            Ok(_) => {
172                // v1.4.104 codex F3 (P2): 用 pipeline snapshot 不重新 resolve.
173                // 与 v1.4.103 codex F5.7 (P2) 防 SIGHUP race 行为对齐, 但现
174                // owner_key_id + bearer_token 都从 snapshot 直取, 与 pipeline
175                // 授权决策同一身份. 不再 await 后重 verify.
176                let owner_key_id = snap.key_id.clone();
177                let bearer_token = snap.bearer_token.clone();
178                let acc_ids_set: std::collections::HashSet<u64> =
179                    req.acc_ids.iter().copied().collect();
180                let session_id = self
181                    .state
182                    .register_push_subscriber_with_owner(
183                        req_ctx.peer.clone(),
184                        acc_ids_set,
185                        bearer_token,
186                        owner_key_id,
187                    )
188                    .await;
189                tracing::info!(
190                    tool = "futu_sub_acc_push",
191                    session_id = %session_id,
192                    count = req.acc_ids.len(),
193                    "v1.4.102 codex 47 F3: push subscriber registered after daemon success"
194                );
195                json_tool_output(
196                    "futu_sub_acc_push",
197                    &serde_json::json!({
198                        "ok": true,
199                        "subscribed_acc_ids": req.acc_ids,
200                        "session_id": session_id,
201                        "unsub_hint": format!(
202                            "call `futu_unsub_acc_push` with session_id=\"{session_id}\" to stop receiving pushes; otherwise auto-purged after 4h"
203                        ),
204                    }),
205                )
206            }
207            Err(e) => Self::tool_err(format!("futu_sub_acc_push: {e}")),
208        }
209    }
210
211    #[tool(
212        description = "Unsubscribe from account push notifications. Pass session_id from previous futu_sub_acc_push response (session_id field or unsub_hint). Returns {removed_count}. If session_id is not found, removed_count=0 (likely auto-purged or never registered)."
213    )]
214    async fn futu_unsub_acc_push(
215        &self,
216        Parameters(req): Parameters<UnsubAccPushReq>,
217        req_ctx: RequestContext<RoleServer>,
218    ) -> std::result::Result<String, String> {
219        // v1.4.103 codex F4 (P1) fail-closed: scope check 用 caller-specific
220        // (HTTP Bearer / api_key 优先) 而不是 process-wide startup key.
221        // 之前 require_tool_scope 只看 startup, 受限 Bearer fall through.
222        let snap = self.require_acc_read_with_acc_id(
223            "futu_unsub_acc_push",
224            &req_ctx,
225            req.api_key.as_deref(),
226            None,
227        )?;
228        let Some(ref session_id) = req.session_id else {
229            return Self::tool_err(
230                "session_id required. Get it from a previous futu_sub_acc_push response.",
231            );
232        };
233        // v1.4.103 (codex 50 F6 / 53 F4 — B8): unsub session ownership check.
234        // 之前任何 caller 拿到可见 session_id 即可 remove, 跨 caller 容易踩
235        // (尤其 push_subscriber_info 列其他 session 后被恶意 caller unsub).
236        // 现在: 解析 caller key (Bearer / startup), 比对 subscriber.owner_key_id.
237        // v1.4.103 codex F4 (P1) fail-closed: invalid Bearer → 不 fall back
238        // startup key. 因为前面 require_acc_read_with_acc_id 已经把 invalid
239        // api_key / Bearer reject 掉了, 走到这里直接复用同一 auth snapshot 的
240        // key_id 做 ownership check,避免重新解析时与 sub/register 身份漂移。
241        let caller_key_id = snap.key_id;
242
243        let removed = self
244            .state
245            .unregister_push_subscriber_with_owner_check(
246                session_id.as_str(),
247                caller_key_id.as_deref(),
248            )
249            .await;
250        tracing::info!(
251            tool = "futu_unsub_acc_push",
252            removed = removed.is_ok(),
253            session_id = %session_id,
254            caller_key_id = ?caller_key_id,
255            "v1.4.103 B8: push subscriber unregister with ownership check"
256        );
257        match removed {
258            Ok(true) => json_tool_output("futu_unsub_acc_push", &serde_json::json!({
259                "ok": true,
260                "removed_count": 1,
261                "session_id": session_id,
262            })),
263            Ok(false) => json_tool_output("futu_unsub_acc_push", &serde_json::json!({
264                "ok": true,
265                "removed_count": 0,
266                "session_id": session_id,
267                "hint": "session_id not found (likely 4h auto-purged or never registered)",
268            })),
269            Err(reason) => Err(serde_json::json!({
270                "error": format!("ownership check failed: {reason}"),
271                "status": "error",
272                "hint": "only the original caller (matched by API key id) or an admin-scope key can unsub a session.",
273            })
274            .to_string()),
275        }
276    }
277
278    #[tool(
279        description = "Diagnostic — list active push subscriptions on this MCP server. Returns {total_count, subscriptions: [{session_id, acc_ids, age_secs}]}. Useful to verify whether futu_sub_acc_push registered, check auto-purge timing, or debug missing pushes."
280    )]
281    async fn futu_push_subscriber_info(
282        &self,
283        Parameters(_req): Parameters<NoArgs>,
284        req_ctx: RequestContext<RoleServer>,
285    ) -> std::result::Result<String, String> {
286        // v1.4.103 codex F5 (P2): 加 RequestContext 让 caller-specific Bearer
287        // 解析能生效. 之前 require_tool_scope 只看 startup key, narrow Bearer
288        // caller 看到的 caller_allowed 是 startup key 的, 跨租户泄漏其他
289        // agent 的订阅 acc_ids.
290        //
291        // v1.4.104 codex F3 (P2) fix: 用 pipeline 返的 snapshot 做 visibility
292        // filter, 不再 re-resolve from Bearer/startup (避免 SIGHUP race —
293        // pipeline 授权与 visibility 用同一 KeyRecord 实例).
294        let snap =
295            self.require_acc_read_with_acc_id("futu_push_subscriber_info", &req_ctx, None, None)?;
296        let subs = self
297            .state
298            .push_subscribers_summary(snap.allowed_acc_ids.as_ref())
299            .await;
300        json_tool_output(
301            "futu_push_subscriber_info",
302            &serde_json::json!({
303                "ok": true,
304                "total_count": subs.len(),
305                "subscriptions": subs.iter().map(|(sid, acc_ids, age)| serde_json::json!({
306                    "session_id": sid,
307                    "acc_ids": acc_ids.iter().collect::<Vec<_>>(),
308                    "age_secs": age,
309                })).collect::<Vec<_>>(),
310            }),
311        )
312    }
313}