Skip to main content

futu_mcp/
state.rs

1//! 共享状态:网关连接 + 订阅状态 + 授权
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use anyhow::{Context, Result, anyhow};
7use futu_auth::{KeyRecord, KeyStore, RuntimeCounters};
8use futu_net::client::{ClientConfig, FutuClient, ReconnectingClient};
9use futu_net::reconnect::ReconnectPolicy;
10use futu_qot::types::{QotMarket, Security};
11use rmcp::{RoleServer, service::Peer};
12use tokio::sync::Mutex;
13
14#[cfg(test)]
15mod tests;
16
17const MCP_CONNECT_TOTAL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3);
18const MCP_CONNECT_RETRY_DELAY: std::time::Duration = std::time::Duration::from_millis(200);
19
20/// v1.4.38 Phase 5 helper: bytes → base64 (用于 push body 安全包进 JSON)
21fn base64_encode_bytes(bytes: &[u8]) -> String {
22    use base64::Engine as _;
23    base64::engine::general_purpose::STANDARD.encode(bytes)
24}
25
26/// v1.4.105 T-C2: 从 daemon push 的 raw body 里解 `(acc_id, trd_market)`,
27/// 用于按 caller key `allowed_markets` 白名单过滤 trade push event。
28///
29/// 只处理 trade push(TRD_UPDATE_ORDER / TRD_UPDATE_ORDER_FILL),proto Header
30/// 同时含 `acc_id` (u64) 与 `trd_market` (i32 enum)。返 `Some((acc_id, trd_market))`
31/// 仅当 decode 成功且 s2c.header 存在.
32///
33/// 行情 push (QOT_UPDATE_*) 不含 trd_market 概念 → 返 None → 调用方按
34/// `event_acc_id=None` 路径不做 acc/market gate, 直接 broadcast.
35fn extract_acc_id_and_market_from_push(proto_id: u32, body: &[u8]) -> Option<(u64, i32)> {
36    use prost::Message;
37    match proto_id {
38        TRD_UPDATE_ORDER_PROTO_ID => {
39            let rsp = futu_proto::trd_update_order::Response::decode(body).ok()?;
40            let h = rsp.s2c?.header;
41            Some((h.acc_id, h.trd_market))
42        }
43        TRD_UPDATE_ORDER_FILL_PROTO_ID => {
44            let rsp = futu_proto::trd_update_order_fill::Response::decode(body).ok()?;
45            let h = rsp.s2c?.header;
46            Some((h.acc_id, h.trd_market))
47        }
48        _ => None,
49    }
50}
51
52/// v1.4.106 codex 0932 F6/F7: trade push proto_ids (set membership 测试).
53const TRD_UPDATE_ORDER_PROTO_ID: u32 = 2208;
54const TRD_UPDATE_ORDER_FILL_PROTO_ID: u32 = 2218;
55
56/// v1.4.106 codex 0932 F6 [P2]: 仅按 proto_id 判断是否 trade push 类.
57///
58/// 修 F6 silent-misclassify bug: 旧实装靠 `extract_acc_id_and_market_from_push`
59/// 返 None 推断 "非 trade" — 但 trade push (proto_id 2208/2218) 若 body decode
60/// 失败也返 None → 误归 quote → restricted key (有 allowed_acc_ids 限额) 看到
61/// 该 push 时按 quote 路径直接 broadcast (绕过 trade-market filter 的 ACL).
62///
63/// **正确语义**: event_type 由 proto_id 决定 (set membership), 与 body 完整性
64/// 无关. body 解码状态另用 `decode_status` 字段表达 (F7).
65fn is_trade_push_proto_id(proto_id: u32) -> bool {
66    matches!(
67        proto_id,
68        TRD_UPDATE_ORDER_PROTO_ID | TRD_UPDATE_ORDER_FILL_PROTO_ID
69    )
70}
71
72/// v1.4.106 codex 0932 F6 [P2]: trade push 的解码结果 + 分发决策.
73#[derive(Debug, Clone)]
74enum TradePushDecode {
75    /// proto_id 不在 trade set — 非 trade push (quote / notify / 其他).
76    NotTrade,
77    /// trade push body decode 成功 — 含 (acc_id, trd_market).
78    Decoded { acc_id: u64, trd_market: i32 },
79    /// trade push body decode 失败 — caller 必须按 trade 语义处理 (event_type="trade")
80    /// 但无 acc_id / market gate 信息. restricted key 应 drop, unrestricted
81    /// 应透传带 `decode_status="failed"`.
82    DecodeFailed,
83}
84
85fn classify_trade_push(proto_id: u32, body: &[u8]) -> TradePushDecode {
86    if !is_trade_push_proto_id(proto_id) {
87        return TradePushDecode::NotTrade;
88    }
89    match extract_acc_id_and_market_from_push(proto_id, body) {
90        Some((acc_id, trd_market)) => TradePushDecode::Decoded { acc_id, trd_market },
91        None => TradePushDecode::DecodeFailed,
92    }
93}
94
95/// v1.4.105 T-C2: `Trd_Common.TrdMarket` enum int → 字符串 (与 `keys.json`
96/// 配 `allowed_markets` 中字符串一致). 实际映射由 `futu_trd::market`
97/// 统一维护,避免 MCP push 过滤、CLI 展示和 trade read projection 漂移.
98///
99/// 来源: `Trd_Common.proto::TrdMarket` enum.
100fn trd_market_int_to_str(i: i32) -> &'static str {
101    futu_trd::market::trd_market_label(i).unwrap_or("")
102}
103
104/// v1.4.39 Phase 5 filter 核心决策(pure function,便于单测):
105///
106/// 判断 push 是否应推给某个订阅者。2 层 gate:
107/// 1. **subscriber acc_ids**:`futu_sub_acc_push` 参数里用户指定的 acc_id 列表
108///    (空集 = subscribe-all,即不做 subscriber 级过滤)
109/// 2. **key allowed_acc_ids** (v1.4.39):注册时快照的 per-key 白名单。defense-in-depth
110///    层,防止 agent 订阅了 key 无权限的 acc_id(主 auth 在 guard.rs tool 调用
111///    时 enforce,但 push 是服务端主动发起绕过 tool 调用)
112///
113/// 两层都 pass 才推。行情类 push (`push_acc_id=None`) 跳过所有 acc_id gate。
114/// v1.4.58 MED-NEW-3(2nd code review): summary 过滤决策 pure function。
115///
116/// 用于 `push_subscribers_summary` 的 cross-tenant filter —— scope mode 下 caller
117/// 只能看到**自己 allowed_acc_ids 有交集的订阅**(防止跨租户泄漏其他 agent 的
118/// acc_ids)。
119///
120/// 规则:
121/// - `caller_allowed = None` 或空集(legacy / unrestricted)→ 全可见
122/// - 被查订阅的 `sub_acc_ids` 空集(subscribe-all)→ 全可见(没有 specific 账户可泄漏)
123/// - 否则 → 只要 `sub_acc_ids` 与 `caller_allowed` 有**任一交集** → 可见
124pub(crate) fn subscriber_visible_to_caller(
125    sub_acc_ids: &std::collections::HashSet<u64>,
126    caller_allowed: Option<&std::collections::HashSet<u64>>,
127) -> bool {
128    match caller_allowed {
129        Some(allowed) if !allowed.is_empty() => {
130            sub_acc_ids.is_empty() || sub_acc_ids.iter().any(|a| allowed.contains(a))
131        }
132        _ => true,
133    }
134}
135
136/// v1.4.105 T-C2: 生产路径已 migrate 到 `subscriber_should_receive_with_market`
137/// (含 market gate Layer 3). 本 helper 保留作向后兼容入口 (老 6 个 filter_*
138/// pure-fn 测仍引用), 不入生产路径.
139///
140/// **v1.4.105 F5 fix**: production migrated to `FilterRegistry::should_drop_event`,
141/// 本 fn 跟 `subscriber_should_receive_with_market` 同 `#[cfg(test)]` gate 防
142/// dead_code 警告.
143#[cfg(test)]
144pub(crate) fn subscriber_should_receive(
145    push_acc_id: Option<u64>,
146    sub_acc_ids: &std::collections::HashSet<u64>,
147    key_allowed_acc_ids: Option<&std::collections::HashSet<u64>>,
148) -> bool {
149    // v1.4.105 T-C2: 旧 API 委托新 API (无 market 信息 → 不做 market gate).
150    // 保留为向后兼容入口 (单测仍以 sub_acc_ids + allowed_acc_ids 双 gate 模式验证).
151    subscriber_should_receive_with_market(
152        push_acc_id,
153        None, // 无 market context → 不做 market gate
154        sub_acc_ids,
155        key_allowed_acc_ids,
156        None, // 无 allowed_markets snapshot → 不做 market gate
157    )
158}
159
160/// v1.4.105 T-C2: subscriber_should_receive 的扩展版 — 加 market gate (Layer 3).
161///
162/// 完整 3-layer trade push filter:
163/// 1. **subscriber acc_ids** (`futu_sub_acc_push` 参数): 空 = subscribe-all
164/// 2. **key allowed_acc_ids** (caller key 注册时快照): 硬白名单
165/// 3. **key allowed_markets** (caller key 注册时快照, v1.4.105 加): 硬白名单
166///
167/// 三 gate 都 pass 才推. `event_trd_market=None` (decode 失败 / 行情 push) 跳过
168/// market gate. `key_allowed_markets=None` / 空 = 不做 market gate (stdio /
169/// legacy / 未配 allowed_markets 的 key).
170///
171/// 行情类 push (`push_acc_id=None`) 仍按 v1.4.39 行为全推 — 行情无 acc / market
172/// 概念, 不参与 trade event filter.
173///
174/// **v1.4.105 F5 fix (codex review C4)**: production 路径已 migrate 到
175/// `FilterRegistry::should_drop_event` (跟 4 surface 一致). 本 fn 现仅作
176/// `#[cfg(test)]` unit test target — 直接验证 logic, 不再 production 调用.
177/// 保留 + cfg-gate 让 12 既有 test 继续 lock 行为契约 (logic 等价两套断言:
178/// 这里 pure-fn assert + cross_surface_invariants FilterRegistry 路径 assert).
179#[cfg(test)]
180pub(crate) fn subscriber_should_receive_with_market(
181    push_acc_id: Option<u64>,
182    event_trd_market: Option<i32>,
183    sub_acc_ids: &std::collections::HashSet<u64>,
184    key_allowed_acc_ids: Option<&std::collections::HashSet<u64>>,
185    key_allowed_markets: Option<&std::collections::HashSet<String>>,
186) -> bool {
187    let Some(aid) = push_acc_id else {
188        return true; // 非 trade push,全推(行情类无 key 级 gate 概念)
189    };
190    // Layer 1: 订阅者 acc_ids 过滤
191    let sub_gate = sub_acc_ids.is_empty() || sub_acc_ids.contains(&aid);
192    // Layer 2: caller key allowed_acc_ids 硬白名单
193    let key_acc_gate = match key_allowed_acc_ids {
194        Some(allowed) if !allowed.is_empty() => allowed.contains(&aid),
195        _ => true, // 无 key 级约束(allowed_acc_ids=None 或空)/ stdio 模式
196    };
197    // Layer 3 (v1.4.105 T-C2): caller key allowed_markets 硬白名单
198    //
199    // 行为契约:
200    // - `key_allowed_markets=None` 或空集 → 不做 market gate (向后兼容 + stdio)
201    // - `event_trd_market=None` → 不做 market gate (decode 失败保守 — 不 silent
202    //   drop, 让 caller 看到原始 push, downstream 单测覆盖 decode 失败路径)
203    // - `event_trd_market=Some(int)` → 转字符串与 allowed 集合比对
204    //   (注: int=0/未知 → trd_market_int_to_str 返 "" → 不在任何非空 allowed
205    //    集合内 → drop. 这是 fail-closed 语义: 未知 market 配 restricted key
206    //    应 drop 而非 silently leak)
207    let key_market_gate = match key_allowed_markets {
208        Some(allowed) if !allowed.is_empty() => match event_trd_market {
209            Some(int) => allowed.contains(trd_market_int_to_str(int)),
210            None => true, // decode 失败 → 不 drop (向后兼容 — 单独 metric 提示)
211        },
212        _ => true, // 无 market 限制
213    };
214    sub_gate && key_acc_gate && key_market_gate
215}
216
217/// v1.4.38 Phase 5: 订阅了 push 通知的 MCP 客户端 session 记录。
218///
219/// 每个 session 用 `futu_sub_acc_push` 注册时登记一条。daemon push 事件来到
220/// MCP server 时,按 `acc_ids` 过滤后用 `peer.notify_logging_message()` 推回。
221///
222/// v1.4.38 100%:`acc_ids` 过滤已生效(state.rs drain loop 实装)。
223/// caller key ownership / scope 快照在注册时解析并保存,后续 push 分发不再
224/// 重新读取 bearer 明文。
225#[derive(Clone)]
226pub struct PushSubscriber {
227    /// rmcp 对 MCP session 的抽象。clone 便宜(内部 Arc)。
228    pub peer: Peer<RoleServer>,
229    /// 该 session 关心的 acc_id 列表。空集合表示"不过滤"(接收所有 acc 的 push)。
230    pub acc_ids: std::collections::HashSet<u64>,
231    /// v1.4.39 per-key acc_id 白名单**注册时快照**(非 live-reload)。
232    ///
233    /// Some(set) + non-empty → push 的 acc_id 必须在 set 里才推。
234    /// Some(empty) / None → 不做 key 级过滤(兼容无 allowed_acc_ids 约束的 key
235    /// 或 stdio / legacy 模式)。
236    ///
237    /// 快照语义:注册后 SIGHUP 重载 keys.json 修改 allowed_acc_ids 不会立即
238    /// 反映到已注册订阅者。用户需重新 `futu_sub_acc_push` 才应用新 scope。
239    /// 这是 defense-in-depth 层(主 auth 在 tool 调用时 guard.rs),可接受。
240    pub allowed_acc_ids_snapshot: Option<std::collections::HashSet<u64>>,
241    /// v1.4.105 T-C2: per-key `allowed_markets` 注册时快照, Layer 3 trade push gate.
242    ///
243    /// `Some(set)` + non-empty → push event 的 `trd_market` 必须 ∈ set 才推 (按
244    /// `Trd_Common.TrdMarket` int → 字符串映射, 与 `keys.json::allowed_markets`
245    /// 配置字符串一致, e.g. "HK"/"US"/"FUTURES").
246    /// `None` / `Some(empty)` → 不做 market gate (兼容 stdio / legacy / 未配
247    /// allowed_markets 的 key).
248    ///
249    /// 与 `allowed_acc_ids_snapshot` 同样**注册时快照**, SIGHUP 重载不影响
250    /// 已注册订阅者. 用户需重新 `futu_sub_acc_push` 才应用新 scope.
251    /// 配套 main auth (guard.rs / require_acc_read_with_acc_id) 仍在 tool 调用
252    /// 时 enforce, 这是 defense-in-depth 层 (push 走 server-initiated channel
253    /// 绕过 tool 调用 → 必须独立 enforce).
254    pub allowed_markets_snapshot: Option<std::collections::HashSet<String>>,
255    /// 注册时间(用于 session 硬上限清理,4h 默认 TTL)
256    pub registered_at: std::time::Instant,
257    /// v1.4.103 (codex 50 F6 / 53 F4 — B8): owner key id (KeyRecord.id).
258    /// 注册时填的 caller key id (HTTP Bearer 或 startup key); 用于 unsub
259    /// ownership check — 任何 caller 拿到 session_id 后想 unsub 必须 key id
260    /// 匹配 owner_key_id (admin scope 例外).
261    ///
262    /// None = legacy / stdio 模式无 key (ownership 退化为 "anyone can unsub",
263    /// 与本来 v1.4.102 行为一致).
264    pub owner_key_id: Option<String>,
265}
266
267/// MCP server 运行时状态
268#[derive(Clone)]
269pub struct ServerState {
270    /// [`Inner`] 共享可变状态(gateway 地址 + 懒加载的 [`FutuClient`])
271    pub inner: Arc<Mutex<Inner>>,
272    /// 是否启用交易写工具(place/modify/cancel)。默认 false。旧开关,仅当
273    /// `key_store.is_configured() == false` 时生效。
274    pub enable_trading: bool,
275    /// 是否允许对 real 环境下单。默认 false。旧开关,同上。
276    pub allow_real_trading: bool,
277    /// keys.json 加载的 KeyStore。`is_configured()` 为 true 时走 scope 授权模式。
278    pub key_store: Arc<KeyStore>,
279    /// 调用方传入的 API Key 对应的记录;None 表示未提供 key。
280    pub authed_key: Option<Arc<KeyRecord>>,
281    /// 交易密码所属登录账号。用于 `futu_unlock_trade` 从账号级 keychain
282    /// `trade-password.<login-account>` 读取密码;None 时走 legacy/global/env 兼容路径。
283    pub trade_pwd_account: Option<String>,
284    /// 限额运行时(日累计计数器)
285    pub counters: Arc<RuntimeCounters>,
286    /// v1.4.38 Phase 5: MCP push 订阅者注册表(session_uuid → subscriber)。
287    /// `futu_sub_acc_push` 工具在 HTTP 模式下调用时注册当前 session,daemon
288    /// push 到 MCP 后按 acc_id filter 向注册的 peer 发
289    /// `notify_logging_message`(server-initiated notification)。
290    pub push_subscribers: Arc<Mutex<HashMap<String, PushSubscriber>>>,
291}
292
293/// ServerState 内部可变部分,加锁存放 gateway 地址 + 懒加载的 [`FutuClient`]。
294pub struct Inner {
295    /// 网关 TCP 地址(如 `127.0.0.1:11111`)
296    pub gateway: String,
297    /// 懒加载的底层连接;首次调用 [`ServerState::client`] 时建立,后续复用
298    pub client: Option<Arc<FutuClient>>,
299}
300
301impl ServerState {
302    /// 创建默认 state:`enable_trading=false` / `allow_real_trading=false` /
303    /// 空 [`KeyStore`] / 无 authed_key。使用 `with_*` 链式方法注入额外能力。
304    pub fn new(gateway: String) -> Self {
305        Self {
306            inner: Arc::new(Mutex::new(Inner {
307                gateway,
308                client: None,
309            })),
310            enable_trading: false,
311            allow_real_trading: false,
312            key_store: Arc::new(KeyStore::empty()),
313            authed_key: None,
314            trade_pwd_account: None,
315            counters: Arc::new(RuntimeCounters::new()),
316            push_subscribers: Arc::new(Mutex::new(HashMap::new())),
317        }
318    }
319
320    /// v1.4.38 Phase 5: 注册当前 session 接收指定 acc_id 的 push。返回 session
321    /// UUID(调用方存着,后续可 unregister)。
322    ///
323    /// v1.4.38: 已 wire 到 `futu_sub_acc_push` tool。tool 被调用时拿到
324    /// `RequestContext.peer`,`acc_ids` 从工具 args 解析,注册完成后
325    /// state.rs 的 push drain loop 会按 acc_ids filter 转 notify 给该 peer。
326    /// v1.4.103 (codex 50 F5 / 53 F2 / 58 F3 — B7) + (codex 50 F6 / 53 F4 — B8):
327    /// 注册当前 session 接收指定 acc_id 的 push。
328    ///
329    /// `owner_key_id_override` 由 caller 传入 (例如从 HTTP Bearer 解析得到 key id);
330    /// 若 None → fall back 到 bearer_token 解析 → fall back 到 startup `authed_key.id`.
331    /// `allowed_acc_ids_snapshot` 同样 fall back 链: bearer → startup.
332    pub async fn register_push_subscriber_with_owner(
333        &self,
334        peer: Peer<RoleServer>,
335        acc_ids: std::collections::HashSet<u64>,
336        bearer_token: Option<String>,
337        owner_key_id_override: Option<String>,
338    ) -> String {
339        use std::time::Instant;
340        let session_id = format!("sub-{}", rand::random::<u64>());
341
342        // 解析 caller-specific KeyRecord (HTTP Bearer 优先, fallback startup key).
343        // 用于 (a) allowed_acc_ids_snapshot (B7) (b) owner_key_id (B8).
344        //
345        // v1.4.106 codex 0608 F2 (P1): startup fallback 路径用
346        // `get_by_id_for_current_machine` 替代裸 `get_by_id`, 与 verify (Bearer 路径
347        // 自带 machine 校验) 行为对称, 让 SIGHUP 收紧 allowed_machines 后能立即拒绝.
348        let bearer_key_rec = bearer_token
349            .as_deref()
350            .filter(|pt| !pt.is_empty())
351            .and_then(|pt| self.key_store.verify(pt));
352        let startup_key_rec = self
353            .authed_key
354            .as_ref()
355            .and_then(|k| self.key_store.get_by_id_for_current_machine(&k.id));
356        let effective_key_rec = bearer_key_rec.as_ref().or(startup_key_rec.as_ref());
357
358        // v1.4.103 (B7): allowed_acc_ids_snapshot 优先 HTTP Bearer 解析,
359        // 否则 fall back startup key.allowed_acc_ids; 都无 → None (无限制).
360        let allowed_acc_ids_snapshot =
361            effective_key_rec.and_then(|rec| rec.allowed_acc_ids.clone());
362
363        // v1.4.105 T-C2: allowed_markets_snapshot 同链 — 与 acc_ids_snapshot
364        // 同样从 effective_key_rec (Bearer / startup) 取 allowed_markets.
365        // 用于 push event Layer 3 market gate (state.rs::subscriber_should_receive_with_market).
366        let allowed_markets_snapshot =
367            effective_key_rec.and_then(|rec| rec.allowed_markets.clone());
368
369        // v1.4.103 (B8): owner_key_id 优先 caller 显式传入, 否则 effective_key_rec.id.
370        let owner_key_id =
371            owner_key_id_override.or_else(|| effective_key_rec.map(|rec| rec.id.clone()));
372
373        let subscriber = PushSubscriber {
374            peer,
375            acc_ids,
376            allowed_acc_ids_snapshot,
377            allowed_markets_snapshot,
378            registered_at: Instant::now(),
379            owner_key_id,
380        };
381        self.push_subscribers
382            .lock()
383            .await
384            .insert(session_id.clone(), subscriber);
385        session_id
386    }
387
388    /// v1.4.103 (codex 50 F6 / 53 F4 — B8): unsub session ownership check.
389    ///
390    /// 行为:
391    /// - 无 caller_key_id (legacy / stdio): 退化为旧行为 (任何 caller 可 unsub).
392    /// - 有 caller_key_id + subscriber.owner_key_id 匹配: 删除, 返 Ok(true).
393    /// - 有 caller_key_id + subscriber.owner_key_id 不匹配: **拒绝**, 返
394    ///   Err(reason) — 防其他 caller 拿可见 session_id 强制 unsub.
395    /// - session_id 不存在: 返 Ok(false) (idempotent, 不报错).
396    /// - subscriber.owner_key_id = None (legacy 注册): 退化为旧行为 — 任何 caller
397    ///   可 unsub (向后兼容).
398    pub async fn unregister_push_subscriber_with_owner_check(
399        &self,
400        session_id: &str,
401        caller_key_id: Option<&str>,
402    ) -> Result<bool, String> {
403        let mut subs = self.push_subscribers.lock().await;
404        // 不存在 → idempotent Ok(false), 不报错
405        let Some(sub) = subs.get(session_id) else {
406            return Ok(false);
407        };
408        // ownership check
409        match (caller_key_id, sub.owner_key_id.as_deref()) {
410            (None, _) => {
411                // 无 caller key: legacy / stdio 模式, 退化旧行为
412                subs.remove(session_id);
413                Ok(true)
414            }
415            (Some(caller), None) => {
416                // session 注册时无 owner_key_id (legacy): 任何 caller 可 unsub
417                tracing::warn!(
418                    session_id,
419                    caller,
420                    "v1.4.103 B8: unsub legacy session (no owner_key_id) — \
421                     allowed for backward-compat"
422                );
423                subs.remove(session_id);
424                Ok(true)
425            }
426            (Some(caller), Some(owner)) if caller == owner => {
427                subs.remove(session_id);
428                Ok(true)
429            }
430            (Some(caller), Some(owner)) => {
431                // ownership 不匹配: reject. 当前 MCP subscription ownership contract
432                // 没有 admin override surface;若要扩展,必须先在 spec / auth
433                // pipeline / integration tests 中定义清楚。
434                Err(format!(
435                    "session_id {session_id:?} owned by key_id {owner:?}, \
436                     caller key_id {caller:?} not allowed to unsub"
437                ))
438            }
439        }
440    }
441
442    /// 当前活跃订阅数。生产诊断使用 `push_subscribers_summary`,这里只作为
443    /// state 单测的轻量断言入口保留。
444    #[cfg(test)]
445    pub async fn push_subscriber_count(&self) -> usize {
446        self.push_subscribers.lock().await.len()
447    }
448
449    /// v1.4.58 Phase A: 列出所有 push 订阅 summary(tool diagnostic 用)。
450    ///
451    /// 返 Vec<(session_id, acc_ids, age_secs)>。
452    ///
453    /// **MED-NEW-3 修(2nd review)**:加 `caller_allowed_acc_ids` 参数做
454    /// scope-mode 多租过滤。当 caller 的 key 有 `allowed_acc_ids` 白名单时,
455    /// **只返** subscription 的 `acc_ids` 与 caller 白名单有交集的条目。
456    /// 避免 agent A(acc_ids=[100, 200])通过本 tool 看到 agent B 订阅的
457    /// acc_id=[300, 400]。
458    ///
459    /// `caller_allowed_acc_ids=None` / empty → 不过滤(legacy mode / no-scope key)。
460    ///
461    /// **rmcp 版本兼容**:rmcp 1.4.0 `Peer<RoleServer>` 不实装 `PartialEq`,
462    /// 无法按 peer 身份直接过滤。若未来 rmcp 加 PartialEq,可切到更精确的
463    /// per-session-owner 过滤(当前只能靠 acc_id 权限交集近似)。
464    pub async fn push_subscribers_summary(
465        &self,
466        caller_allowed_acc_ids: Option<&std::collections::HashSet<u64>>,
467    ) -> Vec<(String, std::collections::HashSet<u64>, u64)> {
468        let subs = self.push_subscribers.lock().await;
469        let now = std::time::Instant::now();
470        subs.iter()
471            .filter(|(_, sub)| subscriber_visible_to_caller(&sub.acc_ids, caller_allowed_acc_ids))
472            .map(|(id, sub)| {
473                let age = now
474                    .checked_duration_since(sub.registered_at)
475                    .map(|d| d.as_secs())
476                    .unwrap_or(0);
477                // LOW-3RD-1(3rd code review):scope mode 下返回的 acc_ids 要与
478                // caller allowed 求交集 — 防止 caller=[100] 看到 sub=[100, 999]
479                // 时知道 999 这个 acc 存在。sub.acc_ids 空集(subscribe-all)不做
480                // 交集(概念上 caller 看到的是"有个 catch-all subscriber",不泄漏
481                // 具体账户信息)。
482                let visible_accs = match caller_allowed_acc_ids {
483                    Some(allowed) if !allowed.is_empty() && !sub.acc_ids.is_empty() => {
484                        sub.acc_ids.intersection(allowed).copied().collect()
485                    }
486                    _ => sub.acc_ids.clone(),
487                };
488                (id.clone(), visible_accs, age)
489            })
490            .collect()
491    }
492
493    /// 启用交易写工具(构造器式链式设置)
494    pub fn with_trading(mut self, enable_trading: bool, allow_real_trading: bool) -> Self {
495        self.enable_trading = enable_trading;
496        self.allow_real_trading = allow_real_trading;
497        self
498    }
499
500    /// 设置 KeyStore(新授权模式)
501    pub fn with_key_store(mut self, store: Arc<KeyStore>) -> Self {
502        self.key_store = store;
503        self
504    }
505
506    /// 设置已通过验证的 API Key 记录
507    pub fn with_authed_key(mut self, key: Option<Arc<KeyRecord>>) -> Self {
508        self.authed_key = key;
509        self
510    }
511
512    /// 设置交易密码所属登录账号(MCP 只连 gateway,本身无法可靠推断 daemon
513    /// 的 login account;由 CLI/env/config 显式注入)。
514    pub fn with_trade_pwd_account(mut self, account: Option<String>) -> Self {
515        self.trade_pwd_account = account;
516        self
517    }
518
519    /// 是否启用了 scope 授权模式
520    pub fn is_scope_mode(&self) -> bool {
521        self.key_store.is_configured()
522    }
523
524    /// 获取(或懒加载)网关客户端
525    pub async fn client(&self) -> Result<Arc<FutuClient>> {
526        let mut guard = self.inner.lock().await;
527        if let Some(c) = &guard.client {
528            return Ok(c.clone());
529        }
530
531        let config = ClientConfig {
532            addr: guard.gateway.clone(),
533            client_ver: env!("CARGO_PKG_VERSION").to_string(),
534            client_id: "futu-mcp".to_string(),
535            recv_notify: false,
536            rsa_key: None,
537        };
538        let policy =
539            ReconnectPolicy::new(MCP_CONNECT_RETRY_DELAY, MCP_CONNECT_RETRY_DELAY, Some(1));
540        let mut reconnector = ReconnectingClient::new(config).with_policy(policy);
541        let gateway = guard.gateway.clone();
542        let connect_result =
543            tokio::time::timeout(MCP_CONNECT_TOTAL_TIMEOUT, reconnector.connect()).await;
544        let (client, mut push_rx, _info) = match connect_result {
545            Ok(result) => {
546                result.with_context(|| format!("connect to futu gateway at {gateway}"))?
547            }
548            Err(_) => {
549                return Err(anyhow!(
550                    "connect to futu gateway at {gateway} timed out after {}s",
551                    MCP_CONNECT_TOTAL_TIMEOUT.as_secs()
552                ));
553            }
554        };
555
556        // v1.4.38 Phase 5 (100%): 按 acc_ids 过滤的 push broadcast
557        //
558        // 流程:
559        // 1. push_rx 收 daemon 转发的 push
560        // 2. 对 TRD_UPDATE_ORDER (2208) / TRD_UPDATE_ORDER_FILL (2218) 解包
561        //    提取 acc_id
562        // 3. 遍历订阅者,**只推给 acc_ids 匹配的**(或订阅者 acc_ids 空 = 不
563        //    过滤,所有 acc 都收)
564        // 4. 行情 push(QOT_UPDATE_*)无 acc_id 语义,广播给所有订阅者
565        //
566        // Per-session 独立 spawn notify,避免一个慢 session 阻塞其他
567        let subs_for_push = self.push_subscribers.clone();
568        // v1.4.105 F5 fix (codex review C4 USER_ACK B): MCP push filter 改用
569        // FilterRegistry::should_drop_event 单一注册中心 (跟 4 surface 一致),
570        // 替代之前 inline subscriber_should_receive_with_market. 防 sibling-route
571        // bypass — 加新 push event filter 维度只在 install_defaults 注册一次,
572        // MCP 自动覆盖.
573        let filter_registry =
574            std::sync::Arc::new(futu_auth_pipeline::FilterRegistry::with_defaults());
575        tokio::spawn(async move {
576            while let Some(push) = push_rx.recv().await {
577                let subs = subs_for_push.lock().await;
578                if subs.is_empty() {
579                    continue; // fast path: no listeners, drop
580                }
581                // v1.4.105 T-C2 + v1.4.106 codex 0932 F6/F7: classify push by proto_id
582                // (set membership), 不再靠 body decode 成功推断. trade body decode
583                // 失败现在归 TradePushDecode::DecodeFailed (event_type="trade",
584                // 无 acc/market gate 信息) — restricted key 应 drop, unrestricted
585                // 透传带 decode_status="failed".
586                let decode_result = classify_trade_push(push.proto_id, &push.body);
587                let (push_acc_id, push_trd_market, decode_status, event_type) = match &decode_result
588                {
589                    TradePushDecode::NotTrade => (None, None, "ok", "quote"),
590                    TradePushDecode::Decoded { acc_id, trd_market } => {
591                        (Some(*acc_id), Some(*trd_market), "ok", "trade")
592                    }
593                    TradePushDecode::DecodeFailed => (None, None, "failed", "trade"),
594                };
595                let push_trd_market_str = push_trd_market.map(trd_market_int_to_str);
596                // v1.4.106 codex 0932 F7 [P3]: payload 加 event_type / trd_market /
597                // decode_status — 让客户端不需要按 proto_id 自己 derive (4 surface 一致).
598                // body_base64 后向兼容保留.
599                let payload = serde_json::json!({
600                    "kind": "futu_push",
601                    "proto_id": push.proto_id,
602                    "acc_id": push_acc_id,
603                    "event_type": event_type,
604                    "trd_market": push_trd_market_str,
605                    "decode_status": decode_status,
606                    "body_base64": base64_encode_bytes(&push.body),
607                });
608                for sub in subs.values() {
609                    // v1.4.106 codex 0932 F6 [P2]: trade decode-failed + restricted
610                    // key (allowed_acc_ids 非 None) → DROP. 不能让 restricted key
611                    // 看到无 acc gate 信息的 trade body 透传 (绕过 ACL).
612                    // unrestricted key (allowed_acc_ids None / 空) 仍透传带 decode_status="failed".
613                    if matches!(decode_result, TradePushDecode::DecodeFailed) {
614                        let restricted = sub
615                            .allowed_acc_ids_snapshot
616                            .as_ref()
617                            .map(|s| !s.is_empty())
618                            .unwrap_or(false);
619                        if restricted {
620                            let key_id = sub.owner_key_id.as_deref().unwrap_or("<none>");
621                            // 复用 cross-surface metric — reason="trade_decode_failed"
622                            futu_auth::metrics::bump_ws_filtered("trade_decode_failed", key_id);
623                            tracing::warn!(
624                                proto_id = push.proto_id,
625                                key_id,
626                                "v1.4.106 codex 0932 F6: trade push body decode failed; \
627                                 dropped for restricted key (allowed_acc_ids set, \
628                                 cannot ACL-gate body without acc_id)"
629                            );
630                            continue;
631                        }
632                        // unrestricted: fall through, broadcast 带 decode_status="failed"
633                    }
634                    // v1.4.105 F5 fix: 改用 FilterRegistry::should_drop_event.
635                    // 行为对齐 4 surface — sub.acc_ids (MCP 显式订阅 list) 喂给
636                    // ctx.sub_state (REST sub-acc-push state 同语义); sub.allowed_*
637                    // _snapshot 喂给 ctx.allowed_* (caller key 限额).
638                    //
639                    // **行为微差** (与老 inline fn 一致, 不破老行为):
640                    // - sub.acc_ids 空 = MCP 老语义"无限制订阅" → 传 None
641                    //   (避免 REST sub_state 空集 tombstone 语义触发 drop-all)
642                    // - sub.acc_ids 非空 → 传 Some(&sub.acc_ids), 跟 REST 一致
643                    let sub_state_for_ctx = if sub.acc_ids.is_empty() {
644                        None
645                    } else {
646                        Some(&sub.acc_ids)
647                    };
648                    let ctx = futu_auth_pipeline::PushEventCtx {
649                        event_type,
650                        event_acc: push_acc_id,
651                        allowed_acc_ids: sub.allowed_acc_ids_snapshot.as_ref(),
652                        sub_state: sub_state_for_ctx,
653                        event_trd_market: push_trd_market_str,
654                        allowed_markets: sub.allowed_markets_snapshot.as_ref(),
655                    };
656                    if filter_registry.should_drop_event(&ctx) {
657                        // v1.4.105 T-C2 + F3 (codex review C4): bump filtered metric.
658                        // **统一 label 命名** 跟 4 surface 一致 — gRPC subscribe_push
659                        // / push_trd_acc / 都用 "trade_market" 标 Layer 3 (allowed_markets)
660                        // 拒. 老 "push.trade" 命名是 surface-specific (T-C2 sole), 改成
661                        // canonical "trade_market" 让跨 surface metrics jq aggregate 一致.
662                        let key_id = sub.owner_key_id.as_deref().unwrap_or("<none>");
663                        futu_auth::metrics::bump_ws_filtered("trade_market", key_id);
664                        continue;
665                    }
666                    let peer = sub.peer.clone();
667                    let data = payload.clone();
668                    tokio::spawn(async move {
669                        let params = rmcp::model::LoggingMessageNotificationParam {
670                            level: rmcp::model::LoggingLevel::Info,
671                            logger: Some("futu_push".to_string()),
672                            data,
673                        };
674                        let _ = peer.notify_logging_message(params).await;
675                    });
676                }
677            }
678        });
679
680        // v1.4.39 Phase 5 stale cleanup: 5 分钟跑一次,移除 registered_at > 4h
681        // 的订阅者。避免长跑 daemon 累积陈旧 subscriber(客户端断开 /  rmcp
682        // session gone 但没显式 unregister 的情况)。
683        let subs_for_purge = self.push_subscribers.clone();
684        tokio::spawn(async move {
685            use std::time::Duration;
686            const PURGE_INTERVAL: Duration = Duration::from_secs(5 * 60);
687            const MAX_AGE: Duration = Duration::from_secs(4 * 3600);
688            let mut ticker = tokio::time::interval(PURGE_INTERVAL);
689            ticker.tick().await; // skip the immediate first tick
690            loop {
691                ticker.tick().await;
692                let now = std::time::Instant::now();
693                let mut subs = subs_for_purge.lock().await;
694                let before = subs.len();
695                subs.retain(|_, sub| {
696                    now.checked_duration_since(sub.registered_at)
697                        .map(|age| age < MAX_AGE)
698                        .unwrap_or(true)
699                });
700                let purged = before - subs.len();
701                if purged > 0 {
702                    tracing::info!(
703                        purged,
704                        remaining = subs.len(),
705                        max_age_secs = MAX_AGE.as_secs(),
706                        "v1.4.39 Phase 5: purged stale push subscribers (> 4h registered)"
707                    );
708                }
709            }
710        });
711
712        let arc = Arc::new(client);
713        guard.client = Some(arc.clone());
714        Ok(arc)
715    }
716}
717
718// ========== symbol 解析 ==========
719
720/// 解析 "MARKET.CODE" 格式的 symbol
721pub fn parse_symbol(s: &str) -> Result<Security> {
722    let (market_str, code) = s.split_once('.').ok_or_else(|| {
723        anyhow!("invalid symbol {s:?}: expected MARKET.CODE (e.g. HK.00700, US.AAPL, SH.600519)")
724    })?;
725    if code.is_empty() {
726        return Err(anyhow!("invalid symbol {s:?}: code part is empty"));
727    }
728    let market = match market_str.to_ascii_uppercase().as_str() {
729        "HK" => QotMarket::HkSecurity,
730        "HK_FUTURE" => QotMarket::HkFuture,
731        "US" => QotMarket::UsSecurity,
732        "SH" => QotMarket::CnshSecurity,
733        "SZ" => QotMarket::CnszSecurity,
734        "SG" => QotMarket::SgSecurity,
735        "JP" => QotMarket::JpSecurity,
736        "AU" => QotMarket::AuSecurity,
737        "MY" => QotMarket::MySecurity,
738        "CA" => QotMarket::CaSecurity,
739        "FX" => QotMarket::FxSecurity,
740        "CRYPTO" | "CC" => QotMarket::Crypto,
741        other => {
742            return Err(anyhow!(
743                "invalid symbol {s:?}: unknown market {other:?} (HK|HK_FUTURE|US|SH|SZ|SG|JP|AU|MY|CA|FX|CRYPTO|CC)"
744            ));
745        }
746    };
747    Ok(Security::new(market, code))
748}
749
750/// 格式化 Security 为 "MARKET.CODE"
751pub fn format_symbol(sec: &Security) -> String {
752    let m = match sec.market {
753        QotMarket::HkSecurity => "HK",
754        QotMarket::HkFuture => "HK_FUTURE",
755        QotMarket::UsSecurity => "US",
756        QotMarket::CnshSecurity => "SH",
757        QotMarket::CnszSecurity => "SZ",
758        QotMarket::SgSecurity => "SG",
759        QotMarket::JpSecurity => "JP",
760        QotMarket::AuSecurity => "AU",
761        QotMarket::MySecurity => "MY",
762        QotMarket::CaSecurity => "CA",
763        QotMarket::FxSecurity => "FX",
764        QotMarket::Crypto => "CRYPTO",
765        QotMarket::Unknown => "UNKNOWN",
766        _ => "UNKNOWN",
767    };
768    format!("{m}.{}", sec.code)
769}
770
771/// v1.4.90 P2-C: audit log Option<T> 序列化助手。
772///
773/// **背景**:之前 audit log 把 `Option<f64>` 用 `?req.price`(tracing 的 Debug
774/// shorthand)记录,渲染成 JSON 字符串 `"Some(400.0)"` / `"None"`,下游 jq /
775/// DuckDB 数值聚合炸(aggregator 期望 `400.0` number 或 `null`)。
776///
777/// **修法**:用 NaN sentinel 把 `Option<f64>` flatten 成 `f64`,tracing-subscriber
778/// 的 JSON formatter 内部走 `serde_json::Value::from(f64::NAN)` →
779/// `Number::from_f64(NaN) = None` → `Value::Null`。
780/// 整数 / 字符串同理(i32 → f64 NaN sentinel;&str → "" 哨兵)。
781///
782/// 验证依据:
783/// - `tracing_subscriber::fmt::format::json` line 501 `record_f64` 直接调
784///   `serde_json::Value::from(value)`
785/// - `serde_json::Value::from(f64)` impl: `Number::from_f64(f).map_or(Value::Null, Value::Number)`
786pub mod audit_fmt {
787    /// `Option<f64>` → `f64`(None → NaN)。tracing JSON 渲染 NaN 为 `null`。
788    #[inline]
789    pub fn opt_f64(v: Option<f64>) -> f64 {
790        v.unwrap_or(f64::NAN)
791    }
792
793    /// `Option<i32>` → `f64`(None → NaN,Some(n) → n as f64)。
794    /// i32 ≤ 2^31 < 2^52 mantissa,无精度损失。
795    #[inline]
796    pub fn opt_i32(v: Option<i32>) -> f64 {
797        v.map(f64::from).unwrap_or(f64::NAN)
798    }
799
800    /// `Option<&str>` → `&str`(None → "")。"" 哨兵在 audit 上下文里足以区分
801    /// 不传 vs 传空(因为 Symbol / owner 等业务字段不会是空字符串)。
802    #[inline]
803    pub fn opt_str(v: Option<&str>) -> &str {
804        v.unwrap_or("")
805    }
806}