1use crate::handlers;
4use crate::tool_args::{
5 NoArgs, QuerySubscriptionReq, SubAccPushReq, SubscribeReq, UnsubAccPushReq, UnsubscribeReq,
6};
7use crate::tool_auth::CallerSnapshot;
8use rmcp::{
9 RoleServer, handler::server::wrapper::Parameters, service::RequestContext, tool, tool_router,
10};
11
12use super::{FutuServer, system::json_tool_output};
13
14#[tool_router(router = subscription_tool_router, vis = "pub(crate)")]
15impl FutuServer {
16 #[tool(
17 description = "Query current subscription state (subscribed types, quota used/remaining). Python SDK: OpenQuoteContext.query_subscription."
18 )]
19 async fn futu_query_subscription(
20 &self,
21 Parameters(req): Parameters<QuerySubscriptionReq>,
22 req_ctx: RequestContext<RoleServer>,
23 ) -> std::result::Result<String, String> {
24 tracing::info!(tool = "futu_query_subscription");
25 let client = self
26 .read_client_or_err("futu_query_subscription", &req_ctx, None, None)
27 .await?;
28 Self::wrap_result(handlers::core::query_subscription(&client, req.is_req_all_conn).await)
29 }
30
31 #[tool(
32 description = "Get current daemon used quota counters: subscribed quote quota and historical K-line quota. Python SDK: OpenQuoteContext.get_used_quota."
33 )]
34 async fn futu_get_used_quota(
35 &self,
36 Parameters(_req): Parameters<NoArgs>,
37 req_ctx: RequestContext<RoleServer>,
38 ) -> std::result::Result<String, String> {
39 tracing::info!(tool = "futu_get_used_quota");
40 let client = self
41 .read_client_or_err("futu_get_used_quota", &req_ctx, None, None)
42 .await?;
43 Self::wrap_result(handlers::core::get_used_quota(&client).await)
44 }
45
46 #[tool(
58 description = "Subscribe market data for given symbols + sub_types. Push data arrives via SSE notifications (HTTP mode). Python SDK: OpenQuoteContext.subscribe."
59 )]
60 async fn futu_subscribe(
61 &self,
62 Parameters(req): Parameters<SubscribeReq>,
63 req_ctx: RequestContext<RoleServer>,
64 ) -> std::result::Result<String, String> {
65 tracing::info!(
66 tool = "futu_subscribe",
67 symbols = ?req.symbols,
68 sub_types = ?req.sub_types,
69 is_first_push = req.is_first_push,
70 is_reg_push = req.is_reg_push,
71 );
72 let client = self
73 .read_client_or_err("futu_subscribe", &req_ctx, None, None)
74 .await?;
75 Self::wrap_result(
76 handlers::core::subscribe(
77 &client,
78 &req.symbols,
79 &req.sub_types,
80 req.is_first_push,
81 req.is_reg_push,
82 )
83 .await,
84 )
85 }
86
87 #[tool(
88 description = "Unsubscribe market data (by symbol+type, or unsub_all to clear this connection). Python SDK: OpenQuoteContext.unsubscribe / unsubscribe_all."
89 )]
90 async fn futu_unsubscribe(
91 &self,
92 Parameters(req): Parameters<UnsubscribeReq>,
93 req_ctx: RequestContext<RoleServer>,
94 ) -> std::result::Result<String, String> {
95 tracing::info!(
96 tool = "futu_unsubscribe",
97 count = req.symbols.len(),
98 unsub_all = req.unsub_all
99 );
100 let client = self
101 .read_client_or_err("futu_unsubscribe", &req_ctx, None, None)
102 .await?;
103 Self::wrap_result(
104 handlers::core::unsubscribe(&client, &req.symbols, &req.sub_types, req.unsub_all).await,
105 )
106 }
107
108 #[tool(
109 description = "Subscribe account order / deal push for given trading accounts. HTTP-mode MCP clients receive pushes as LoggingMessage notifications with {kind, proto_id, body_base64}. Payload body is raw Futu protobuf; decode client-side. Python SDK: OpenTradeContext.sub_acc_push."
110 )]
111 async fn futu_sub_acc_push(
112 &self,
113 Parameters(req): Parameters<SubAccPushReq>,
114 req_ctx: RequestContext<RoleServer>,
115 ) -> std::result::Result<String, String> {
116 if req.acc_ids.is_empty() {
121 return Err(
122 "futu_sub_acc_push: acc_ids 必填非空. 之前空 list silent 全订阅 \
123 (v1.4.102 codex 47 F2 P1 fix). 调 futu_list_accounts 看可用 \
124 acc_id, 显式列出要订阅的账户."
125 .to_string(),
126 );
127 }
128
129 let mut snap_opt: Option<CallerSnapshot> = None;
140 for (idx, acc_id) in req.acc_ids.iter().enumerate() {
141 let snap = self.require_acc_read_with_acc_id(
142 "futu_sub_acc_push",
143 &req_ctx,
144 req.api_key.as_deref(),
145 Some(*acc_id),
146 )?;
147 if idx == 0 {
148 snap_opt = Some(snap);
149 }
150 }
151 let Some(snap) = snap_opt else {
152 return Err(serde_json::json!({
153 "error": "futu_sub_acc_push: caller snapshot resolution failed after acc_ids validation",
154 "status": "error",
155 "hint": "retry after refreshing API key state; if it persists, check auth pipeline logs",
156 })
157 .to_string());
158 };
159
160 tracing::info!(tool = "futu_sub_acc_push", count = req.acc_ids.len());
161
162 let client = self.client_or_err().await?;
163
164 let daemon_resp = handlers::trade::sub_acc_push(&client, &req.acc_ids).await;
170 match daemon_resp {
171 Ok(_) => {
172 let owner_key_id = snap.key_id.clone();
177 let bearer_token = snap.bearer_token.clone();
178 let acc_ids_set: std::collections::HashSet<u64> =
179 req.acc_ids.iter().copied().collect();
180 let session_id = self
181 .state
182 .register_push_subscriber_with_owner(
183 req_ctx.peer.clone(),
184 acc_ids_set,
185 bearer_token,
186 owner_key_id,
187 )
188 .await;
189 tracing::info!(
190 tool = "futu_sub_acc_push",
191 session_id = %session_id,
192 count = req.acc_ids.len(),
193 "v1.4.102 codex 47 F3: push subscriber registered after daemon success"
194 );
195 json_tool_output(
196 "futu_sub_acc_push",
197 &serde_json::json!({
198 "ok": true,
199 "subscribed_acc_ids": req.acc_ids,
200 "session_id": session_id,
201 "unsub_hint": format!(
202 "call `futu_unsub_acc_push` with session_id=\"{session_id}\" to stop receiving pushes; otherwise auto-purged after 4h"
203 ),
204 }),
205 )
206 }
207 Err(e) => Self::tool_err(format!("futu_sub_acc_push: {e}")),
208 }
209 }
210
211 #[tool(
212 description = "Unsubscribe from account push notifications. Pass session_id from previous futu_sub_acc_push response (session_id field or unsub_hint). Returns {removed_count}. If session_id is not found, removed_count=0 (likely auto-purged or never registered)."
213 )]
214 async fn futu_unsub_acc_push(
215 &self,
216 Parameters(req): Parameters<UnsubAccPushReq>,
217 req_ctx: RequestContext<RoleServer>,
218 ) -> std::result::Result<String, String> {
219 let snap = self.require_acc_read_with_acc_id(
223 "futu_unsub_acc_push",
224 &req_ctx,
225 req.api_key.as_deref(),
226 None,
227 )?;
228 let Some(ref session_id) = req.session_id else {
229 return Self::tool_err(
230 "session_id required. Get it from a previous futu_sub_acc_push response.",
231 );
232 };
233 let caller_key_id = snap.key_id;
242
243 let removed = self
244 .state
245 .unregister_push_subscriber_with_owner_check(
246 session_id.as_str(),
247 caller_key_id.as_deref(),
248 )
249 .await;
250 tracing::info!(
251 tool = "futu_unsub_acc_push",
252 removed = removed.is_ok(),
253 session_id = %session_id,
254 caller_key_id = ?caller_key_id,
255 "v1.4.103 B8: push subscriber unregister with ownership check"
256 );
257 match removed {
258 Ok(true) => json_tool_output("futu_unsub_acc_push", &serde_json::json!({
259 "ok": true,
260 "removed_count": 1,
261 "session_id": session_id,
262 })),
263 Ok(false) => json_tool_output("futu_unsub_acc_push", &serde_json::json!({
264 "ok": true,
265 "removed_count": 0,
266 "session_id": session_id,
267 "hint": "session_id not found (likely 4h auto-purged or never registered)",
268 })),
269 Err(reason) => Err(serde_json::json!({
270 "error": format!("ownership check failed: {reason}"),
271 "status": "error",
272 "hint": "only the original caller (matched by API key id) or an admin-scope key can unsub a session.",
273 })
274 .to_string()),
275 }
276 }
277
278 #[tool(
279 description = "Diagnostic — list active push subscriptions on this MCP server. Returns {total_count, subscriptions: [{session_id, acc_ids, age_secs}]}. Useful to verify whether futu_sub_acc_push registered, check auto-purge timing, or debug missing pushes."
280 )]
281 async fn futu_push_subscriber_info(
282 &self,
283 Parameters(_req): Parameters<NoArgs>,
284 req_ctx: RequestContext<RoleServer>,
285 ) -> std::result::Result<String, String> {
286 let snap =
295 self.require_acc_read_with_acc_id("futu_push_subscriber_info", &req_ctx, None, None)?;
296 let subs = self
297 .state
298 .push_subscribers_summary(snap.allowed_acc_ids.as_ref())
299 .await;
300 json_tool_output(
301 "futu_push_subscriber_info",
302 &serde_json::json!({
303 "ok": true,
304 "total_count": subs.len(),
305 "subscriptions": subs.iter().map(|(sid, acc_ids, age)| serde_json::json!({
306 "session_id": sid,
307 "acc_ids": acc_ids.iter().collect::<Vec<_>>(),
308 "age_secs": age,
309 })).collect::<Vec<_>>(),
310 }),
311 )
312 }
313}