futu_mcp/state.rs
1//! 共享状态:网关连接 + 订阅状态 + 授权
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use anyhow::{Context, Result, anyhow};
7use futu_auth::{KeyRecord, KeyStore, RuntimeCounters};
8use futu_net::client::{ClientConfig, FutuClient, ReconnectingClient};
9use futu_net::reconnect::ReconnectPolicy;
10use futu_qot::types::{QotMarket, Security};
11use rmcp::{RoleServer, service::Peer};
12use tokio::sync::Mutex;
13
14#[cfg(test)]
15mod tests;
16
17const MCP_CONNECT_TOTAL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3);
18const MCP_CONNECT_RETRY_DELAY: std::time::Duration = std::time::Duration::from_millis(200);
19
20/// v1.4.38 Phase 5 helper: bytes → base64 (用于 push body 安全包进 JSON)
21fn base64_encode_bytes(bytes: &[u8]) -> String {
22 use base64::Engine as _;
23 base64::engine::general_purpose::STANDARD.encode(bytes)
24}
25
26/// v1.4.105 T-C2: 从 daemon push 的 raw body 里解 `(acc_id, trd_market)`,
27/// 用于按 caller key `allowed_markets` 白名单过滤 trade push event。
28///
29/// 只处理 trade push(TRD_UPDATE_ORDER / TRD_UPDATE_ORDER_FILL),proto Header
30/// 同时含 `acc_id` (u64) 与 `trd_market` (i32 enum)。返 `Some((acc_id, trd_market))`
31/// 仅当 decode 成功且 s2c.header 存在.
32///
33/// 行情 push (QOT_UPDATE_*) 不含 trd_market 概念 → 返 None → 调用方按
34/// `event_acc_id=None` 路径不做 acc/market gate, 直接 broadcast.
35fn extract_acc_id_and_market_from_push(proto_id: u32, body: &[u8]) -> Option<(u64, i32)> {
36 use prost::Message;
37 match proto_id {
38 TRD_UPDATE_ORDER_PROTO_ID => {
39 let rsp = futu_proto::trd_update_order::Response::decode(body).ok()?;
40 let h = rsp.s2c?.header;
41 Some((h.acc_id, h.trd_market))
42 }
43 TRD_UPDATE_ORDER_FILL_PROTO_ID => {
44 let rsp = futu_proto::trd_update_order_fill::Response::decode(body).ok()?;
45 let h = rsp.s2c?.header;
46 Some((h.acc_id, h.trd_market))
47 }
48 _ => None,
49 }
50}
51
52/// v1.4.106 codex 0932 F6/F7: trade push proto_ids (set membership 测试).
53const TRD_UPDATE_ORDER_PROTO_ID: u32 = 2208;
54const TRD_UPDATE_ORDER_FILL_PROTO_ID: u32 = 2218;
55
56/// v1.4.106 codex 0932 F6 [P2]: 仅按 proto_id 判断是否 trade push 类.
57///
58/// 修 F6 silent-misclassify bug: 旧实装靠 `extract_acc_id_and_market_from_push`
59/// 返 None 推断 "非 trade" — 但 trade push (proto_id 2208/2218) 若 body decode
60/// 失败也返 None → 误归 quote → restricted key (有 allowed_acc_ids 限额) 看到
61/// 该 push 时按 quote 路径直接 broadcast (绕过 trade-market filter 的 ACL).
62///
63/// **正确语义**: event_type 由 proto_id 决定 (set membership), 与 body 完整性
64/// 无关. body 解码状态另用 `decode_status` 字段表达 (F7).
65fn is_trade_push_proto_id(proto_id: u32) -> bool {
66 matches!(
67 proto_id,
68 TRD_UPDATE_ORDER_PROTO_ID | TRD_UPDATE_ORDER_FILL_PROTO_ID
69 )
70}
71
72/// v1.4.106 codex 0932 F6 [P2]: trade push 的解码结果 + 分发决策.
73#[derive(Debug, Clone)]
74enum TradePushDecode {
75 /// proto_id 不在 trade set — 非 trade push (quote / notify / 其他).
76 NotTrade,
77 /// trade push body decode 成功 — 含 (acc_id, trd_market).
78 Decoded { acc_id: u64, trd_market: i32 },
79 /// trade push body decode 失败 — caller 必须按 trade 语义处理 (event_type="trade")
80 /// 但无 acc_id / market gate 信息. restricted key 应 drop, unrestricted
81 /// 应透传带 `decode_status="failed"`.
82 DecodeFailed,
83}
84
85fn classify_trade_push(proto_id: u32, body: &[u8]) -> TradePushDecode {
86 if !is_trade_push_proto_id(proto_id) {
87 return TradePushDecode::NotTrade;
88 }
89 match extract_acc_id_and_market_from_push(proto_id, body) {
90 Some((acc_id, trd_market)) => TradePushDecode::Decoded { acc_id, trd_market },
91 None => TradePushDecode::DecodeFailed,
92 }
93}
94
95/// v1.4.105 T-C2: `Trd_Common.TrdMarket` enum int → 字符串 (与 `keys.json`
96/// 配 `allowed_markets` 中字符串一致). 实际映射由 `futu_trd::market`
97/// 统一维护,避免 MCP push 过滤、CLI 展示和 trade read projection 漂移.
98///
99/// 来源: `Trd_Common.proto::TrdMarket` enum.
100fn trd_market_int_to_str(i: i32) -> &'static str {
101 futu_trd::market::trd_market_label(i).unwrap_or("")
102}
103
104/// v1.4.39 Phase 5 filter 核心决策(pure function,便于单测):
105///
106/// 判断 push 是否应推给某个订阅者。2 层 gate:
107/// 1. **subscriber acc_ids**:`futu_sub_acc_push` 参数里用户指定的 acc_id 列表
108/// (空集 = subscribe-all,即不做 subscriber 级过滤)
109/// 2. **key allowed_acc_ids** (v1.4.39):注册时快照的 per-key 白名单。defense-in-depth
110/// 层,防止 agent 订阅了 key 无权限的 acc_id(主 auth 在 guard.rs tool 调用
111/// 时 enforce,但 push 是服务端主动发起绕过 tool 调用)
112///
113/// 两层都 pass 才推。行情类 push (`push_acc_id=None`) 跳过所有 acc_id gate。
114/// v1.4.58 MED-NEW-3(2nd code review): summary 过滤决策 pure function。
115///
116/// 用于 `push_subscribers_summary` 的 cross-tenant filter —— scope mode 下 caller
117/// 只能看到**自己 allowed_acc_ids 有交集的订阅**(防止跨租户泄漏其他 agent 的
118/// acc_ids)。
119///
120/// 规则:
121/// - `caller_allowed = None` 或空集(legacy / unrestricted)→ 全可见
122/// - 被查订阅的 `sub_acc_ids` 空集(subscribe-all)→ 全可见(没有 specific 账户可泄漏)
123/// - 否则 → 只要 `sub_acc_ids` 与 `caller_allowed` 有**任一交集** → 可见
124pub(crate) fn subscriber_visible_to_caller(
125 sub_acc_ids: &std::collections::HashSet<u64>,
126 caller_allowed: Option<&std::collections::HashSet<u64>>,
127) -> bool {
128 match caller_allowed {
129 Some(allowed) if !allowed.is_empty() => {
130 sub_acc_ids.is_empty() || sub_acc_ids.iter().any(|a| allowed.contains(a))
131 }
132 _ => true,
133 }
134}
135
136/// v1.4.105 T-C2: 生产路径已 migrate 到 `subscriber_should_receive_with_market`
137/// (含 market gate Layer 3). 本 helper 保留作向后兼容入口 (老 6 个 filter_*
138/// pure-fn 测仍引用), 不入生产路径.
139///
140/// **v1.4.105 F5 fix**: production migrated to `FilterRegistry::should_drop_event`,
141/// 本 fn 跟 `subscriber_should_receive_with_market` 同 `#[cfg(test)]` gate 防
142/// dead_code 警告.
143#[cfg(test)]
144pub(crate) fn subscriber_should_receive(
145 push_acc_id: Option<u64>,
146 sub_acc_ids: &std::collections::HashSet<u64>,
147 key_allowed_acc_ids: Option<&std::collections::HashSet<u64>>,
148) -> bool {
149 // v1.4.105 T-C2: 旧 API 委托新 API (无 market 信息 → 不做 market gate).
150 // 保留为向后兼容入口 (单测仍以 sub_acc_ids + allowed_acc_ids 双 gate 模式验证).
151 subscriber_should_receive_with_market(
152 push_acc_id,
153 None, // 无 market context → 不做 market gate
154 sub_acc_ids,
155 key_allowed_acc_ids,
156 None, // 无 allowed_markets snapshot → 不做 market gate
157 )
158}
159
160/// v1.4.105 T-C2: subscriber_should_receive 的扩展版 — 加 market gate (Layer 3).
161///
162/// 完整 3-layer trade push filter:
163/// 1. **subscriber acc_ids** (`futu_sub_acc_push` 参数): 空 = subscribe-all
164/// 2. **key allowed_acc_ids** (caller key 注册时快照): 硬白名单
165/// 3. **key allowed_markets** (caller key 注册时快照, v1.4.105 加): 硬白名单
166///
167/// 三 gate 都 pass 才推. `event_trd_market=None` (decode 失败 / 行情 push) 跳过
168/// market gate. `key_allowed_markets=None` / 空 = 不做 market gate (stdio /
169/// legacy / 未配 allowed_markets 的 key).
170///
171/// 行情类 push (`push_acc_id=None`) 仍按 v1.4.39 行为全推 — 行情无 acc / market
172/// 概念, 不参与 trade event filter.
173///
174/// **v1.4.105 F5 fix (codex review C4)**: production 路径已 migrate 到
175/// `FilterRegistry::should_drop_event` (跟 4 surface 一致). 本 fn 现仅作
176/// `#[cfg(test)]` unit test target — 直接验证 logic, 不再 production 调用.
177/// 保留 + cfg-gate 让 12 既有 test 继续 lock 行为契约 (logic 等价两套断言:
178/// 这里 pure-fn assert + cross_surface_invariants FilterRegistry 路径 assert).
179#[cfg(test)]
180pub(crate) fn subscriber_should_receive_with_market(
181 push_acc_id: Option<u64>,
182 event_trd_market: Option<i32>,
183 sub_acc_ids: &std::collections::HashSet<u64>,
184 key_allowed_acc_ids: Option<&std::collections::HashSet<u64>>,
185 key_allowed_markets: Option<&std::collections::HashSet<String>>,
186) -> bool {
187 let Some(aid) = push_acc_id else {
188 return true; // 非 trade push,全推(行情类无 key 级 gate 概念)
189 };
190 // Layer 1: 订阅者 acc_ids 过滤
191 let sub_gate = sub_acc_ids.is_empty() || sub_acc_ids.contains(&aid);
192 // Layer 2: caller key allowed_acc_ids 硬白名单
193 let key_acc_gate = match key_allowed_acc_ids {
194 Some(allowed) if !allowed.is_empty() => allowed.contains(&aid),
195 _ => true, // 无 key 级约束(allowed_acc_ids=None 或空)/ stdio 模式
196 };
197 // Layer 3 (v1.4.105 T-C2): caller key allowed_markets 硬白名单
198 //
199 // 行为契约:
200 // - `key_allowed_markets=None` 或空集 → 不做 market gate (向后兼容 + stdio)
201 // - `event_trd_market=None` → 不做 market gate (decode 失败保守 — 不 silent
202 // drop, 让 caller 看到原始 push, downstream 单测覆盖 decode 失败路径)
203 // - `event_trd_market=Some(int)` → 转字符串与 allowed 集合比对
204 // (注: int=0/未知 → trd_market_int_to_str 返 "" → 不在任何非空 allowed
205 // 集合内 → drop. 这是 fail-closed 语义: 未知 market 配 restricted key
206 // 应 drop 而非 silently leak)
207 let key_market_gate = match key_allowed_markets {
208 Some(allowed) if !allowed.is_empty() => match event_trd_market {
209 Some(int) => allowed.contains(trd_market_int_to_str(int)),
210 None => true, // decode 失败 → 不 drop (向后兼容 — 单独 metric 提示)
211 },
212 _ => true, // 无 market 限制
213 };
214 sub_gate && key_acc_gate && key_market_gate
215}
216
217/// v1.4.38 Phase 5: 订阅了 push 通知的 MCP 客户端 session 记录。
218///
219/// 每个 session 用 `futu_sub_acc_push` 注册时登记一条。daemon push 事件来到
220/// MCP server 时,按 `acc_ids` 过滤后用 `peer.notify_logging_message()` 推回。
221///
222/// v1.4.38 100%:`acc_ids` 过滤已生效(state.rs drain loop 实装)。
223/// caller key ownership / scope 快照在注册时解析并保存,后续 push 分发不再
224/// 重新读取 bearer 明文。
225#[derive(Clone)]
226pub struct PushSubscriber {
227 /// rmcp 对 MCP session 的抽象。clone 便宜(内部 Arc)。
228 pub peer: Peer<RoleServer>,
229 /// 该 session 关心的 acc_id 列表。空集合表示"不过滤"(接收所有 acc 的 push)。
230 pub acc_ids: std::collections::HashSet<u64>,
231 /// v1.4.39 per-key acc_id 白名单**注册时快照**(非 live-reload)。
232 ///
233 /// Some(set) + non-empty → push 的 acc_id 必须在 set 里才推。
234 /// Some(empty) / None → 不做 key 级过滤(兼容无 allowed_acc_ids 约束的 key
235 /// 或 stdio / legacy 模式)。
236 ///
237 /// 快照语义:注册后 SIGHUP 重载 keys.json 修改 allowed_acc_ids 不会立即
238 /// 反映到已注册订阅者。用户需重新 `futu_sub_acc_push` 才应用新 scope。
239 /// 这是 defense-in-depth 层(主 auth 在 tool 调用时 guard.rs),可接受。
240 pub allowed_acc_ids_snapshot: Option<std::collections::HashSet<u64>>,
241 /// v1.4.105 T-C2: per-key `allowed_markets` 注册时快照, Layer 3 trade push gate.
242 ///
243 /// `Some(set)` + non-empty → push event 的 `trd_market` 必须 ∈ set 才推 (按
244 /// `Trd_Common.TrdMarket` int → 字符串映射, 与 `keys.json::allowed_markets`
245 /// 配置字符串一致, e.g. "HK"/"US"/"FUTURES").
246 /// `None` / `Some(empty)` → 不做 market gate (兼容 stdio / legacy / 未配
247 /// allowed_markets 的 key).
248 ///
249 /// 与 `allowed_acc_ids_snapshot` 同样**注册时快照**, SIGHUP 重载不影响
250 /// 已注册订阅者. 用户需重新 `futu_sub_acc_push` 才应用新 scope.
251 /// 配套 main auth (guard.rs / require_acc_read_with_acc_id) 仍在 tool 调用
252 /// 时 enforce, 这是 defense-in-depth 层 (push 走 server-initiated channel
253 /// 绕过 tool 调用 → 必须独立 enforce).
254 pub allowed_markets_snapshot: Option<std::collections::HashSet<String>>,
255 /// 注册时间(用于 session 硬上限清理,4h 默认 TTL)
256 pub registered_at: std::time::Instant,
257 /// v1.4.103 (codex 50 F6 / 53 F4 — B8): owner key id (KeyRecord.id).
258 /// 注册时填的 caller key id (HTTP Bearer 或 startup key); 用于 unsub
259 /// ownership check — 任何 caller 拿到 session_id 后想 unsub 必须 key id
260 /// 匹配 owner_key_id (admin scope 例外).
261 ///
262 /// None = legacy / stdio 模式无 key (ownership 退化为 "anyone can unsub",
263 /// 与本来 v1.4.102 行为一致).
264 pub owner_key_id: Option<String>,
265}
266
267/// MCP server 运行时状态
268#[derive(Clone)]
269pub struct ServerState {
270 /// [`Inner`] 共享可变状态(gateway 地址 + 懒加载的 [`FutuClient`])
271 pub inner: Arc<Mutex<Inner>>,
272 /// 是否启用交易写工具(place/modify/cancel)。默认 false。旧开关,仅当
273 /// `key_store.is_configured() == false` 时生效。
274 pub enable_trading: bool,
275 /// 是否允许对 real 环境下单。默认 false。旧开关,同上。
276 pub allow_real_trading: bool,
277 /// keys.json 加载的 KeyStore。`is_configured()` 为 true 时走 scope 授权模式。
278 pub key_store: Arc<KeyStore>,
279 /// 调用方传入的 API Key 对应的记录;None 表示未提供 key。
280 pub authed_key: Option<Arc<KeyRecord>>,
281 /// 交易密码所属登录账号。用于 `futu_unlock_trade` 从账号级 keychain
282 /// `trade-password.<login-account>` 读取密码;None 时走 legacy/global/env 兼容路径。
283 pub trade_pwd_account: Option<String>,
284 /// 限额运行时(日累计计数器)
285 pub counters: Arc<RuntimeCounters>,
286 /// v1.4.38 Phase 5: MCP push 订阅者注册表(session_uuid → subscriber)。
287 /// `futu_sub_acc_push` 工具在 HTTP 模式下调用时注册当前 session,daemon
288 /// push 到 MCP 后按 acc_id filter 向注册的 peer 发
289 /// `notify_logging_message`(server-initiated notification)。
290 pub push_subscribers: Arc<Mutex<HashMap<String, PushSubscriber>>>,
291}
292
293/// ServerState 内部可变部分,加锁存放 gateway 地址 + 懒加载的 [`FutuClient`]。
294pub struct Inner {
295 /// 网关 TCP 地址(如 `127.0.0.1:11111`)
296 pub gateway: String,
297 /// 懒加载的底层连接;首次调用 [`ServerState::client`] 时建立,后续复用
298 pub client: Option<Arc<FutuClient>>,
299}
300
301impl ServerState {
302 /// 创建默认 state:`enable_trading=false` / `allow_real_trading=false` /
303 /// 空 [`KeyStore`] / 无 authed_key。使用 `with_*` 链式方法注入额外能力。
304 pub fn new(gateway: String) -> Self {
305 Self {
306 inner: Arc::new(Mutex::new(Inner {
307 gateway,
308 client: None,
309 })),
310 enable_trading: false,
311 allow_real_trading: false,
312 key_store: Arc::new(KeyStore::empty()),
313 authed_key: None,
314 trade_pwd_account: None,
315 counters: Arc::new(RuntimeCounters::new()),
316 push_subscribers: Arc::new(Mutex::new(HashMap::new())),
317 }
318 }
319
320 /// v1.4.38 Phase 5: 注册当前 session 接收指定 acc_id 的 push。返回 session
321 /// UUID(调用方存着,后续可 unregister)。
322 ///
323 /// v1.4.38: 已 wire 到 `futu_sub_acc_push` tool。tool 被调用时拿到
324 /// `RequestContext.peer`,`acc_ids` 从工具 args 解析,注册完成后
325 /// state.rs 的 push drain loop 会按 acc_ids filter 转 notify 给该 peer。
326 /// v1.4.103 (codex 50 F5 / 53 F2 / 58 F3 — B7) + (codex 50 F6 / 53 F4 — B8):
327 /// 注册当前 session 接收指定 acc_id 的 push。
328 ///
329 /// `owner_key_id_override` 由 caller 传入 (例如从 HTTP Bearer 解析得到 key id);
330 /// 若 None → fall back 到 bearer_token 解析 → fall back 到 startup `authed_key.id`.
331 /// `allowed_acc_ids_snapshot` 同样 fall back 链: bearer → startup.
332 pub async fn register_push_subscriber_with_owner(
333 &self,
334 peer: Peer<RoleServer>,
335 acc_ids: std::collections::HashSet<u64>,
336 bearer_token: Option<String>,
337 owner_key_id_override: Option<String>,
338 ) -> String {
339 use std::time::Instant;
340 let session_id = format!("sub-{}", rand::random::<u64>());
341
342 // 解析 caller-specific KeyRecord (HTTP Bearer 优先, fallback startup key).
343 // 用于 (a) allowed_acc_ids_snapshot (B7) (b) owner_key_id (B8).
344 //
345 // v1.4.106 codex 0608 F2 (P1): startup fallback 路径用
346 // `get_by_id_for_current_machine` 替代裸 `get_by_id`, 与 verify (Bearer 路径
347 // 自带 machine 校验) 行为对称, 让 SIGHUP 收紧 allowed_machines 后能立即拒绝.
348 let bearer_key_rec = bearer_token
349 .as_deref()
350 .filter(|pt| !pt.is_empty())
351 .and_then(|pt| self.key_store.verify(pt));
352 let startup_key_rec = self
353 .authed_key
354 .as_ref()
355 .and_then(|k| self.key_store.get_by_id_for_current_machine(&k.id));
356 let effective_key_rec = bearer_key_rec.as_ref().or(startup_key_rec.as_ref());
357
358 // v1.4.103 (B7): allowed_acc_ids_snapshot 优先 HTTP Bearer 解析,
359 // 否则 fall back startup key.allowed_acc_ids; 都无 → None (无限制).
360 let allowed_acc_ids_snapshot =
361 effective_key_rec.and_then(|rec| rec.allowed_acc_ids.clone());
362
363 // v1.4.105 T-C2: allowed_markets_snapshot 同链 — 与 acc_ids_snapshot
364 // 同样从 effective_key_rec (Bearer / startup) 取 allowed_markets.
365 // 用于 push event Layer 3 market gate (state.rs::subscriber_should_receive_with_market).
366 let allowed_markets_snapshot =
367 effective_key_rec.and_then(|rec| rec.allowed_markets.clone());
368
369 // v1.4.103 (B8): owner_key_id 优先 caller 显式传入, 否则 effective_key_rec.id.
370 let owner_key_id =
371 owner_key_id_override.or_else(|| effective_key_rec.map(|rec| rec.id.clone()));
372
373 let subscriber = PushSubscriber {
374 peer,
375 acc_ids,
376 allowed_acc_ids_snapshot,
377 allowed_markets_snapshot,
378 registered_at: Instant::now(),
379 owner_key_id,
380 };
381 self.push_subscribers
382 .lock()
383 .await
384 .insert(session_id.clone(), subscriber);
385 session_id
386 }
387
388 /// v1.4.103 (codex 50 F6 / 53 F4 — B8): unsub session ownership check.
389 ///
390 /// 行为:
391 /// - 无 caller_key_id (legacy / stdio): 退化为旧行为 (任何 caller 可 unsub).
392 /// - 有 caller_key_id + subscriber.owner_key_id 匹配: 删除, 返 Ok(true).
393 /// - 有 caller_key_id + subscriber.owner_key_id 不匹配: **拒绝**, 返
394 /// Err(reason) — 防其他 caller 拿可见 session_id 强制 unsub.
395 /// - session_id 不存在: 返 Ok(false) (idempotent, 不报错).
396 /// - subscriber.owner_key_id = None (legacy 注册): 退化为旧行为 — 任何 caller
397 /// 可 unsub (向后兼容).
398 pub async fn unregister_push_subscriber_with_owner_check(
399 &self,
400 session_id: &str,
401 caller_key_id: Option<&str>,
402 ) -> Result<bool, String> {
403 let mut subs = self.push_subscribers.lock().await;
404 // 不存在 → idempotent Ok(false), 不报错
405 let Some(sub) = subs.get(session_id) else {
406 return Ok(false);
407 };
408 // ownership check
409 match (caller_key_id, sub.owner_key_id.as_deref()) {
410 (None, _) => {
411 // 无 caller key: legacy / stdio 模式, 退化旧行为
412 subs.remove(session_id);
413 Ok(true)
414 }
415 (Some(caller), None) => {
416 // session 注册时无 owner_key_id (legacy): 任何 caller 可 unsub
417 tracing::warn!(
418 session_id,
419 caller,
420 "v1.4.103 B8: unsub legacy session (no owner_key_id) — \
421 allowed for backward-compat"
422 );
423 subs.remove(session_id);
424 Ok(true)
425 }
426 (Some(caller), Some(owner)) if caller == owner => {
427 subs.remove(session_id);
428 Ok(true)
429 }
430 (Some(caller), Some(owner)) => {
431 // ownership 不匹配: reject. 当前 MCP subscription ownership contract
432 // 没有 admin override surface;若要扩展,必须先在 spec / auth
433 // pipeline / integration tests 中定义清楚。
434 Err(format!(
435 "session_id {session_id:?} owned by key_id {owner:?}, \
436 caller key_id {caller:?} not allowed to unsub"
437 ))
438 }
439 }
440 }
441
442 /// 当前活跃订阅数。生产诊断使用 `push_subscribers_summary`,这里只作为
443 /// state 单测的轻量断言入口保留。
444 #[cfg(test)]
445 pub async fn push_subscriber_count(&self) -> usize {
446 self.push_subscribers.lock().await.len()
447 }
448
449 /// v1.4.58 Phase A: 列出所有 push 订阅 summary(tool diagnostic 用)。
450 ///
451 /// 返 Vec<(session_id, acc_ids, age_secs)>。
452 ///
453 /// **MED-NEW-3 修(2nd review)**:加 `caller_allowed_acc_ids` 参数做
454 /// scope-mode 多租过滤。当 caller 的 key 有 `allowed_acc_ids` 白名单时,
455 /// **只返** subscription 的 `acc_ids` 与 caller 白名单有交集的条目。
456 /// 避免 agent A(acc_ids=[100, 200])通过本 tool 看到 agent B 订阅的
457 /// acc_id=[300, 400]。
458 ///
459 /// `caller_allowed_acc_ids=None` / empty → 不过滤(legacy mode / no-scope key)。
460 ///
461 /// **rmcp 版本兼容**:rmcp 1.4.0 `Peer<RoleServer>` 不实装 `PartialEq`,
462 /// 无法按 peer 身份直接过滤。若未来 rmcp 加 PartialEq,可切到更精确的
463 /// per-session-owner 过滤(当前只能靠 acc_id 权限交集近似)。
464 pub async fn push_subscribers_summary(
465 &self,
466 caller_allowed_acc_ids: Option<&std::collections::HashSet<u64>>,
467 ) -> Vec<(String, std::collections::HashSet<u64>, u64)> {
468 let subs = self.push_subscribers.lock().await;
469 let now = std::time::Instant::now();
470 subs.iter()
471 .filter(|(_, sub)| subscriber_visible_to_caller(&sub.acc_ids, caller_allowed_acc_ids))
472 .map(|(id, sub)| {
473 let age = now
474 .checked_duration_since(sub.registered_at)
475 .map(|d| d.as_secs())
476 .unwrap_or(0);
477 // LOW-3RD-1(3rd code review):scope mode 下返回的 acc_ids 要与
478 // caller allowed 求交集 — 防止 caller=[100] 看到 sub=[100, 999]
479 // 时知道 999 这个 acc 存在。sub.acc_ids 空集(subscribe-all)不做
480 // 交集(概念上 caller 看到的是"有个 catch-all subscriber",不泄漏
481 // 具体账户信息)。
482 let visible_accs = match caller_allowed_acc_ids {
483 Some(allowed) if !allowed.is_empty() && !sub.acc_ids.is_empty() => {
484 sub.acc_ids.intersection(allowed).copied().collect()
485 }
486 _ => sub.acc_ids.clone(),
487 };
488 (id.clone(), visible_accs, age)
489 })
490 .collect()
491 }
492
493 /// 启用交易写工具(构造器式链式设置)
494 pub fn with_trading(mut self, enable_trading: bool, allow_real_trading: bool) -> Self {
495 self.enable_trading = enable_trading;
496 self.allow_real_trading = allow_real_trading;
497 self
498 }
499
500 /// 设置 KeyStore(新授权模式)
501 pub fn with_key_store(mut self, store: Arc<KeyStore>) -> Self {
502 self.key_store = store;
503 self
504 }
505
506 /// 设置已通过验证的 API Key 记录
507 pub fn with_authed_key(mut self, key: Option<Arc<KeyRecord>>) -> Self {
508 self.authed_key = key;
509 self
510 }
511
512 /// 设置交易密码所属登录账号(MCP 只连 gateway,本身无法可靠推断 daemon
513 /// 的 login account;由 CLI/env/config 显式注入)。
514 pub fn with_trade_pwd_account(mut self, account: Option<String>) -> Self {
515 self.trade_pwd_account = account;
516 self
517 }
518
519 /// 是否启用了 scope 授权模式
520 pub fn is_scope_mode(&self) -> bool {
521 self.key_store.is_configured()
522 }
523
524 /// 获取(或懒加载)网关客户端
525 pub async fn client(&self) -> Result<Arc<FutuClient>> {
526 let mut guard = self.inner.lock().await;
527 if let Some(c) = &guard.client {
528 return Ok(c.clone());
529 }
530
531 let config = ClientConfig {
532 addr: guard.gateway.clone(),
533 client_ver: env!("CARGO_PKG_VERSION").to_string(),
534 client_id: "futu-mcp".to_string(),
535 recv_notify: false,
536 rsa_key: None,
537 };
538 let policy =
539 ReconnectPolicy::new(MCP_CONNECT_RETRY_DELAY, MCP_CONNECT_RETRY_DELAY, Some(1));
540 let mut reconnector = ReconnectingClient::new(config).with_policy(policy);
541 let gateway = guard.gateway.clone();
542 let connect_result =
543 tokio::time::timeout(MCP_CONNECT_TOTAL_TIMEOUT, reconnector.connect()).await;
544 let (client, mut push_rx, _info) = match connect_result {
545 Ok(result) => {
546 result.with_context(|| format!("connect to futu gateway at {gateway}"))?
547 }
548 Err(_) => {
549 return Err(anyhow!(
550 "connect to futu gateway at {gateway} timed out after {}s",
551 MCP_CONNECT_TOTAL_TIMEOUT.as_secs()
552 ));
553 }
554 };
555
556 // v1.4.38 Phase 5 (100%): 按 acc_ids 过滤的 push broadcast
557 //
558 // 流程:
559 // 1. push_rx 收 daemon 转发的 push
560 // 2. 对 TRD_UPDATE_ORDER (2208) / TRD_UPDATE_ORDER_FILL (2218) 解包
561 // 提取 acc_id
562 // 3. 遍历订阅者,**只推给 acc_ids 匹配的**(或订阅者 acc_ids 空 = 不
563 // 过滤,所有 acc 都收)
564 // 4. 行情 push(QOT_UPDATE_*)无 acc_id 语义,广播给所有订阅者
565 //
566 // Per-session 独立 spawn notify,避免一个慢 session 阻塞其他
567 let subs_for_push = self.push_subscribers.clone();
568 // v1.4.105 F5 fix (codex review C4 USER_ACK B): MCP push filter 改用
569 // FilterRegistry::should_drop_event 单一注册中心 (跟 4 surface 一致),
570 // 替代之前 inline subscriber_should_receive_with_market. 防 sibling-route
571 // bypass — 加新 push event filter 维度只在 install_defaults 注册一次,
572 // MCP 自动覆盖.
573 let filter_registry =
574 std::sync::Arc::new(futu_auth_pipeline::FilterRegistry::with_defaults());
575 tokio::spawn(async move {
576 while let Some(push) = push_rx.recv().await {
577 let subs = subs_for_push.lock().await;
578 if subs.is_empty() {
579 continue; // fast path: no listeners, drop
580 }
581 // v1.4.105 T-C2 + v1.4.106 codex 0932 F6/F7: classify push by proto_id
582 // (set membership), 不再靠 body decode 成功推断. trade body decode
583 // 失败现在归 TradePushDecode::DecodeFailed (event_type="trade",
584 // 无 acc/market gate 信息) — restricted key 应 drop, unrestricted
585 // 透传带 decode_status="failed".
586 let decode_result = classify_trade_push(push.proto_id, &push.body);
587 let (push_acc_id, push_trd_market, decode_status, event_type) = match &decode_result
588 {
589 TradePushDecode::NotTrade => (None, None, "ok", "quote"),
590 TradePushDecode::Decoded { acc_id, trd_market } => {
591 (Some(*acc_id), Some(*trd_market), "ok", "trade")
592 }
593 TradePushDecode::DecodeFailed => (None, None, "failed", "trade"),
594 };
595 let push_trd_market_str = push_trd_market.map(trd_market_int_to_str);
596 // v1.4.106 codex 0932 F7 [P3]: payload 加 event_type / trd_market /
597 // decode_status — 让客户端不需要按 proto_id 自己 derive (4 surface 一致).
598 // body_base64 后向兼容保留.
599 let payload = serde_json::json!({
600 "kind": "futu_push",
601 "proto_id": push.proto_id,
602 "acc_id": push_acc_id,
603 "event_type": event_type,
604 "trd_market": push_trd_market_str,
605 "decode_status": decode_status,
606 "body_base64": base64_encode_bytes(&push.body),
607 });
608 for sub in subs.values() {
609 // v1.4.106 codex 0932 F6 [P2]: trade decode-failed + restricted
610 // key (allowed_acc_ids 非 None) → DROP. 不能让 restricted key
611 // 看到无 acc gate 信息的 trade body 透传 (绕过 ACL).
612 // unrestricted key (allowed_acc_ids None / 空) 仍透传带 decode_status="failed".
613 if matches!(decode_result, TradePushDecode::DecodeFailed) {
614 let restricted = sub
615 .allowed_acc_ids_snapshot
616 .as_ref()
617 .map(|s| !s.is_empty())
618 .unwrap_or(false);
619 if restricted {
620 let key_id = sub.owner_key_id.as_deref().unwrap_or("<none>");
621 // 复用 cross-surface metric — reason="trade_decode_failed"
622 futu_auth::metrics::bump_ws_filtered("trade_decode_failed", key_id);
623 tracing::warn!(
624 proto_id = push.proto_id,
625 key_id,
626 "v1.4.106 codex 0932 F6: trade push body decode failed; \
627 dropped for restricted key (allowed_acc_ids set, \
628 cannot ACL-gate body without acc_id)"
629 );
630 continue;
631 }
632 // unrestricted: fall through, broadcast 带 decode_status="failed"
633 }
634 // v1.4.105 F5 fix: 改用 FilterRegistry::should_drop_event.
635 // 行为对齐 4 surface — sub.acc_ids (MCP 显式订阅 list) 喂给
636 // ctx.sub_state (REST sub-acc-push state 同语义); sub.allowed_*
637 // _snapshot 喂给 ctx.allowed_* (caller key 限额).
638 //
639 // **行为微差** (与老 inline fn 一致, 不破老行为):
640 // - sub.acc_ids 空 = MCP 老语义"无限制订阅" → 传 None
641 // (避免 REST sub_state 空集 tombstone 语义触发 drop-all)
642 // - sub.acc_ids 非空 → 传 Some(&sub.acc_ids), 跟 REST 一致
643 let sub_state_for_ctx = if sub.acc_ids.is_empty() {
644 None
645 } else {
646 Some(&sub.acc_ids)
647 };
648 let ctx = futu_auth_pipeline::PushEventCtx {
649 event_type,
650 event_acc: push_acc_id,
651 allowed_acc_ids: sub.allowed_acc_ids_snapshot.as_ref(),
652 sub_state: sub_state_for_ctx,
653 event_trd_market: push_trd_market_str,
654 allowed_markets: sub.allowed_markets_snapshot.as_ref(),
655 };
656 if filter_registry.should_drop_event(&ctx) {
657 // v1.4.105 T-C2 + F3 (codex review C4): bump filtered metric.
658 // **统一 label 命名** 跟 4 surface 一致 — gRPC subscribe_push
659 // / push_trd_acc / 都用 "trade_market" 标 Layer 3 (allowed_markets)
660 // 拒. 老 "push.trade" 命名是 surface-specific (T-C2 sole), 改成
661 // canonical "trade_market" 让跨 surface metrics jq aggregate 一致.
662 let key_id = sub.owner_key_id.as_deref().unwrap_or("<none>");
663 futu_auth::metrics::bump_ws_filtered("trade_market", key_id);
664 continue;
665 }
666 let peer = sub.peer.clone();
667 let data = payload.clone();
668 tokio::spawn(async move {
669 let params = rmcp::model::LoggingMessageNotificationParam {
670 level: rmcp::model::LoggingLevel::Info,
671 logger: Some("futu_push".to_string()),
672 data,
673 };
674 let _ = peer.notify_logging_message(params).await;
675 });
676 }
677 }
678 });
679
680 // v1.4.39 Phase 5 stale cleanup: 5 分钟跑一次,移除 registered_at > 4h
681 // 的订阅者。避免长跑 daemon 累积陈旧 subscriber(客户端断开 / rmcp
682 // session gone 但没显式 unregister 的情况)。
683 let subs_for_purge = self.push_subscribers.clone();
684 tokio::spawn(async move {
685 use std::time::Duration;
686 const PURGE_INTERVAL: Duration = Duration::from_secs(5 * 60);
687 const MAX_AGE: Duration = Duration::from_secs(4 * 3600);
688 let mut ticker = tokio::time::interval(PURGE_INTERVAL);
689 ticker.tick().await; // skip the immediate first tick
690 loop {
691 ticker.tick().await;
692 let now = std::time::Instant::now();
693 let mut subs = subs_for_purge.lock().await;
694 let before = subs.len();
695 subs.retain(|_, sub| {
696 now.checked_duration_since(sub.registered_at)
697 .map(|age| age < MAX_AGE)
698 .unwrap_or(true)
699 });
700 let purged = before - subs.len();
701 if purged > 0 {
702 tracing::info!(
703 purged,
704 remaining = subs.len(),
705 max_age_secs = MAX_AGE.as_secs(),
706 "v1.4.39 Phase 5: purged stale push subscribers (> 4h registered)"
707 );
708 }
709 }
710 });
711
712 let arc = Arc::new(client);
713 guard.client = Some(arc.clone());
714 Ok(arc)
715 }
716}
717
718// ========== symbol 解析 ==========
719
720/// 解析 "MARKET.CODE" 格式的 symbol
721pub fn parse_symbol(s: &str) -> Result<Security> {
722 let (market_str, code) = s.split_once('.').ok_or_else(|| {
723 anyhow!("invalid symbol {s:?}: expected MARKET.CODE (e.g. HK.00700, US.AAPL, SH.600519)")
724 })?;
725 if code.is_empty() {
726 return Err(anyhow!("invalid symbol {s:?}: code part is empty"));
727 }
728 let market = match market_str.to_ascii_uppercase().as_str() {
729 "HK" => QotMarket::HkSecurity,
730 "HK_FUTURE" => QotMarket::HkFuture,
731 "US" => QotMarket::UsSecurity,
732 "SH" => QotMarket::CnshSecurity,
733 "SZ" => QotMarket::CnszSecurity,
734 "SG" => QotMarket::SgSecurity,
735 "JP" => QotMarket::JpSecurity,
736 "AU" => QotMarket::AuSecurity,
737 "MY" => QotMarket::MySecurity,
738 "CA" => QotMarket::CaSecurity,
739 "FX" => QotMarket::FxSecurity,
740 "CRYPTO" | "CC" => QotMarket::Crypto,
741 other => {
742 return Err(anyhow!(
743 "invalid symbol {s:?}: unknown market {other:?} (HK|HK_FUTURE|US|SH|SZ|SG|JP|AU|MY|CA|FX|CRYPTO|CC)"
744 ));
745 }
746 };
747 Ok(Security::new(market, code))
748}
749
750/// 格式化 Security 为 "MARKET.CODE"
751pub fn format_symbol(sec: &Security) -> String {
752 let m = match sec.market {
753 QotMarket::HkSecurity => "HK",
754 QotMarket::HkFuture => "HK_FUTURE",
755 QotMarket::UsSecurity => "US",
756 QotMarket::CnshSecurity => "SH",
757 QotMarket::CnszSecurity => "SZ",
758 QotMarket::SgSecurity => "SG",
759 QotMarket::JpSecurity => "JP",
760 QotMarket::AuSecurity => "AU",
761 QotMarket::MySecurity => "MY",
762 QotMarket::CaSecurity => "CA",
763 QotMarket::FxSecurity => "FX",
764 QotMarket::Crypto => "CRYPTO",
765 QotMarket::Unknown => "UNKNOWN",
766 _ => "UNKNOWN",
767 };
768 format!("{m}.{}", sec.code)
769}
770
771/// v1.4.90 P2-C: audit log Option<T> 序列化助手。
772///
773/// **背景**:之前 audit log 把 `Option<f64>` 用 `?req.price`(tracing 的 Debug
774/// shorthand)记录,渲染成 JSON 字符串 `"Some(400.0)"` / `"None"`,下游 jq /
775/// DuckDB 数值聚合炸(aggregator 期望 `400.0` number 或 `null`)。
776///
777/// **修法**:用 NaN sentinel 把 `Option<f64>` flatten 成 `f64`,tracing-subscriber
778/// 的 JSON formatter 内部走 `serde_json::Value::from(f64::NAN)` →
779/// `Number::from_f64(NaN) = None` → `Value::Null`。
780/// 整数 / 字符串同理(i32 → f64 NaN sentinel;&str → "" 哨兵)。
781///
782/// 验证依据:
783/// - `tracing_subscriber::fmt::format::json` line 501 `record_f64` 直接调
784/// `serde_json::Value::from(value)`
785/// - `serde_json::Value::from(f64)` impl: `Number::from_f64(f).map_or(Value::Null, Value::Number)`
786pub mod audit_fmt {
787 /// `Option<f64>` → `f64`(None → NaN)。tracing JSON 渲染 NaN 为 `null`。
788 #[inline]
789 pub fn opt_f64(v: Option<f64>) -> f64 {
790 v.unwrap_or(f64::NAN)
791 }
792
793 /// `Option<i32>` → `f64`(None → NaN,Some(n) → n as f64)。
794 /// i32 ≤ 2^31 < 2^52 mantissa,无精度损失。
795 #[inline]
796 pub fn opt_i32(v: Option<i32>) -> f64 {
797 v.map(f64::from).unwrap_or(f64::NAN)
798 }
799
800 /// `Option<&str>` → `&str`(None → "")。"" 哨兵在 audit 上下文里足以区分
801 /// 不传 vs 传空(因为 Symbol / owner 等业务字段不会是空字符串)。
802 #[inline]
803 pub fn opt_str(v: Option<&str>) -> &str {
804 v.unwrap_or("")
805 }
806}