Skip to main content

futu_rest/routes/trd/
sub_acc_push.rs

1//! REST trade account push subscription route and helpers.
2
3use std::sync::Arc;
4
5use axum::extract::{Extension, Json, State};
6use axum::http::StatusCode;
7use serde_json::Value;
8
9use futu_auth::KeyRecord;
10
11use crate::adapter::RestState;
12
13use super::ApiResult;
14
15/// v1.4.90 P1-C 修:POST /api/sub-acc-push — 订阅交易账户 push(订单 / 成交变动)
16///
17/// **历史背景**(外部 tester v1.4.69-89 报告 6 版 silent stub):
18/// - 老版 handler 是单 `proto_request` 透传 wrapper,无 input validation、
19///   无 audit log、无 session_id token —— 用户 POST 后 daemon log 看不到任何
20///   "本 endpoint 被调用过" 的痕迹(仅 gateway 侧 SubAccPushHandler 默默更
21///   新 `SubscriptionManager.subscribe_trd_acc(conn_id, acc_id)`),
22///   "ret=0 + s2c={}" 看似成功但下游全无可观测信号。
23///
24/// **本次 (v1.4.90 P1-C) 对齐 MCP `futu_sub_acc_push`**:
25/// 1. **Loud input validation**(acc_id_list 必须非空 + 每个 id 非零;C++
26///    `IAPIServer_Trd_SubAccPush::OnClientReq_SubAccPush` 等价 check)
27/// 2. **Audit log** `tracing::info!` 留 endpoint + count + 头几个 acc_id 痕迹,
28///    给 push_health 看不到的部分提供可观测性
29/// 3. **Backend CMD forward 经 `proto_request`**(即 dispatch 到 gateway
30///    `SubAccPushHandler` 完成 `SubscriptionManager` 注册 + return ret=0)
31/// 4. **Response augment**:在标准 proto Response (`ret_type/s2c{}`) 上补
32///    `subscribed_acc_ids` + `session_id` + `unsub_hint`,对齐 MCP
33///    `{ok, subscribed_acc_ids, session_id, unsub_hint}` 输出形状。
34///    REST stateless,session_id 用 sorted acc_ids hash 派生(确定性 token,
35///    可拿来 POST `/api/unsub-acc-push`)
36///
37/// **架构说明**:本 daemon 内 gateway local `SubAccPushHandler` 即对齐
38/// C++ APIServer 的 `Trd_SubAccPush` 处理点 —— 没有"再下一层 backend"
39/// 的 CMD 需要 forward。MCP 通过 TCP `client.request(TRD_SUB_ACC_PUSH)`
40/// 也是落到同一个 gateway handler。所以 "CMD forward" 的语义是 REST →
41/// `proto_request` → `RequestRouter::dispatch` → `SubAccPushHandler`。
42pub async fn sub_acc_push(
43    State(state): State<RestState>,
44    rec: Option<Extension<Arc<KeyRecord>>>,
45    Json(mut body): Json<Value>,
46) -> ApiResult {
47    // v1.4.103 (codex 56 F1 / 58 F5 — B9): legacy 模式 (无 keys.json / 无 Bearer)
48    // 直接 reject. 之前 legacy 模式返 ret_type=0 + session_id 给客户端 "看似成功",
49    // 但**不写** rest_acc_subscriptions 状态 (legacy WS 连接也无 key_id, sub
50    // state filter 跳过) — 用户实际看到 revoke 成功仍按 legacy 全量 trade push
51    // 行为继续收, silent inconsistency. 改为 loud reject 让用户知道 sub state
52    // 需要 keys.json + Bearer 才工作.
53    if rec.is_none() {
54        futu_auth::audit::reject(
55            "rest",
56            "/api/sub-acc-push",
57            "<legacy>",
58            "sub-acc-push not supported in legacy mode (no keys.json)",
59        );
60        return Err((
61            StatusCode::FORBIDDEN,
62            Json(serde_json::json!({
63                "error": "/api/sub-acc-push: legacy mode (no keys.json) does not support per-key sub state. \
64                          Configure keys.json and pass Bearer token to enable.",
65                "ret_type": -1,
66                "hint": "v1.4.103 BUG-009: legacy mode previously returned silent success without persisting sub state. Now loud-reject to surface the limitation."
67            })),
68        ));
69    }
70    // v1.4.102 codex 43 F1 (P2): normalize 在 extract_acc_id_list 之前.
71    // 之前 extractor 只看 snake_case `acc_id_list`, 但官方 proto 是 camelCase
72    // `accIDList`. adapter 后续会 normalize, 但本 extractor 先跑会误 reject 空
73    // → 400 silent. 现在 normalize 进去, extractor 看到 acc_id_list.
74    crate::adapter::normalize_json_keys_snake_case(&mut body);
75    // 1. Loud input validation(解析 acc_id_list;畸形 / 空 / 全零 → 400)
76    let acc_ids = match extract_acc_id_list(&body) {
77        Ok(acc_ids) => acc_ids,
78        Err(reason) => {
79            let key_id = rec
80                .as_deref()
81                .map(|r| r.as_ref().id.clone())
82                .unwrap_or_else(|| "<legacy>".to_string());
83            futu_auth::audit::reject("rest", "/api/sub-acc-push", &key_id, &reason);
84            return Err((
85                StatusCode::BAD_REQUEST,
86                Json(serde_json::json!({
87                    "error": format!("/api/sub-acc-push: {reason}")
88                })),
89            ));
90        }
91    };
92    if let Err(reason) = validate_sub_acc_push_acc_ids(&acc_ids) {
93        let key_id = rec
94            .as_deref()
95            .map(|r| r.as_ref().id.clone())
96            .unwrap_or_else(|| "<legacy>".to_string());
97        futu_auth::audit::reject("rest", "/api/sub-acc-push", &key_id, reason);
98        return Err((
99            StatusCode::BAD_REQUEST,
100            Json(serde_json::json!({
101                "error": format!("/api/sub-acc-push: {reason}")
102            })),
103        ));
104    }
105    // v1.4.102 codex 46 F4 (P1): sub-acc-push 必须对 acc_id_list 每个 id 跑
106    // allowed_acc_ids 限额. 之前只 validate 非空, 但 key 限到 acc A 的用户传
107    // [A, B, C] 仍能 subscribe 别人账户的 trade push stream — 越权.
108    //
109    // codex 0522 F2 v1.4.106: 走共享 helper `check_per_acc_rate_for_caller`,
110    // 与 `/api/unsub-acc-push` 同源 (单一逻辑, 防漂移).
111    check_per_acc_rate_for_caller(
112        &state.counters,
113        rec.as_deref().map(|r| r.as_ref()),
114        &acc_ids,
115        "/api/sub-acc-push",
116    )?;
117
118    // 2. Audit log:留 endpoint + count + head acc_ids 痕迹(push_health 不
119    // 跟踪 subscribe,本 log 是 silent-stub 的可观测性补丁)
120    let head: Vec<u64> = acc_ids.iter().take(3).copied().collect();
121    tracing::info!(
122        target: futu_auth::audit::TARGET,
123        iface = "rest",
124        endpoint = "/api/sub-acc-push",
125        count = acc_ids.len(),
126        head_acc_ids = ?head,
127        "v1.4.90 P1-C: sub-acc-push CMD forward → SubAccPushHandler"
128    );
129
130    // 3. v1.4.102 codex 51 F2 (P2): REST 不再 dispatch 到 gateway
131    // SubAccPushHandler — 那条路径 register ephemeral REST conn_id 到
132    // SubscriptionManager (per-conn_id), 而 unsub 用新 conn_id 删不到, 留
133    // stale entries 让 push_trd_acc 一直扫. REST sub state map (per-key) 已
134    // 是 REST 层 truly canonical 真相源, 直接 return success + 让 augment
135    // 段 (后) 写 state map.
136    let mut resp = Json(serde_json::json!({
137        "ret_type": 0,
138        "ret_msg": serde_json::Value::Null,
139        "err_code": serde_json::Value::Null,
140        "s2c": {}
141    }));
142    // body 不再下发给 gateway, 但仍要消费防 unused warning (REST 已经从 body
143    // 提了 acc_ids).
144    let _ = body;
145
146    // 4. 仅在 backend 返成功 (ret_type=0) 时 augment session_id / unsub_hint
147    //    + v1.4.102 codex 46 F2/F3 / 47 F3 / 48 F3 (P1): 写入 REST sub state
148    //    map 让 WS push delivery 过滤生效. **mutate-after-daemon-success** —
149    //    proto_request 已 await 完, ret_type=0 才 register, 失败时不留 local
150    //    state (避免 daemon 失败但 REST WS state 残留导致行为不一致).
151    if let Some(obj) = resp.0.as_object_mut() {
152        let ret_type_ok = obj.get("ret_type").and_then(|v| v.as_i64()).unwrap_or(-1) == 0;
153        if ret_type_ok {
154            // codex 47 F3 / 48 F3: 仅 daemon-success 后 register local state.
155            if let Some(rec_ref) = rec.as_deref() {
156                let key_id = rec_ref.as_ref().id.clone();
157                crate::adapter::with_rest_acc_subscriptions_write(
158                    &state.rest_acc_subscriptions,
159                    |subs| {
160                        let entry = subs.entry(key_id).or_default();
161                        for &acc_id in &acc_ids {
162                            entry.insert(acc_id);
163                        }
164                    },
165                );
166            }
167            let session_id = derive_sub_acc_push_session_id(&acc_ids);
168            obj.insert("subscribed_acc_ids".to_string(), serde_json::json!(acc_ids));
169            obj.insert("session_id".to_string(), Value::String(session_id.clone()));
170            obj.insert(
171                "unsub_hint".to_string(),
172                Value::String(format!(
173                    "POST /api/unsub-acc-push with same acc_id_list to revoke (REST is stateless, session_id=\"{session_id}\" is a deterministic token derived from sorted acc_ids)"
174                )),
175            );
176        }
177    }
178    Ok(resp)
179}
180
181/// v1.4.90 P1-C helper:从 JSON body 抽 `c2s.acc_id_list`(兼容 flat body)。
182///
183/// 容忍点:
184/// - `body.c2s.acc_id_list` 嵌套(标准 proto 结构)
185/// - `body.acc_id_list` 扁平(adapter 之后会 wrap 成 c2s,本 helper 提前看一眼)
186/// - 数组元素必须是 JSON 正整数;畸形元素 fail-closed,不静默过滤
187pub(crate) fn extract_acc_id_list(body: &Value) -> Result<Vec<u64>, String> {
188    let raw = body
189        .pointer("/c2s/acc_id_list")
190        .or_else(|| body.pointer("/acc_id_list"));
191    let Some(raw) = raw else {
192        return Ok(Vec::new());
193    };
194    let Some(arr) = raw.as_array() else {
195        return Err("c2s.acc_id_list must be an array of positive integer acc_id values".into());
196    };
197
198    let mut acc_ids = Vec::with_capacity(arr.len());
199    for (idx, value) in arr.iter().enumerate() {
200        let Some(acc_id) = value.as_u64() else {
201            return Err(format!(
202                "c2s.acc_id_list[{idx}] must be a positive integer acc_id"
203            ));
204        };
205        acc_ids.push(acc_id);
206    }
207    Ok(acc_ids)
208}
209
210/// v1.4.90 P1-C helper:sub-acc-push 入参校验。
211///
212/// C++ `IAPIServer_Trd_SubAccPush::OnClientReq_SubAccPush` 没有显式校验
213/// 空 list / 0 acc_id(默默 no-op),但 silent no-op 是用户视角的 bug
214/// (tester 报告 "看似 ret=0 但什么都没订上")。Rust REST 侧 fail-fast。
215pub(crate) fn validate_sub_acc_push_acc_ids(acc_ids: &[u64]) -> Result<(), &'static str> {
216    if acc_ids.is_empty() {
217        return Err("c2s.acc_id_list is required and must not be empty");
218    }
219    if acc_ids.contains(&0) {
220        return Err(
221            "c2s.acc_id_list contains zero — call /api/list-accounts to discover real acc_id values",
222        );
223    }
224    Ok(())
225}
226
227/// codex 0522 F2 v1.4.106: 共享的 per-acc rate / scope check helper.
228///
229/// `/api/sub-acc-push` 与 `/api/unsub-acc-push` 之前在两个文件 (trd.rs +
230/// sys.rs) 各写一份相同的 for-loop 跑 `check_full_skip_rate` per acc_id.
231/// codex F2 audit 指出: REST caller scope 没统一入口, 多套手写 = 漂移风险.
232/// 本 helper 是单一来源 — 两处 callers 都调它, 任一改逻辑都同步生效.
233///
234/// 接 `CallerContext` 而不是 `&KeyRecord`: 让 caller side 总是先 build ctx
235/// (per-call snapshot), 与 `IncomingRequest.caller_*` 同源数据不漂移.
236///
237/// `endpoint` 用作 audit reject label (e.g. "/api/sub-acc-push").
238///
239/// **行为**:
240/// - `ctx.key_id = None` (legacy mode): no-op 返 Ok (route handler 已在 sub
241///   入口 reject legacy, 但本 helper 防御性兼容)
242/// - `acc_ids` 任一不通过 → audit reject + 返 Err 含 status code (从 outcome
243///   的 http_status_code 派生, 默认 403)
244/// - 全通过 → Ok
245pub(crate) fn check_per_acc_rate_for_caller(
246    counters: &futu_auth::RuntimeCounters,
247    rec: Option<&futu_auth::KeyRecord>,
248    acc_ids: &[u64],
249    endpoint: &'static str,
250) -> Result<(), (StatusCode, Json<Value>)> {
251    let Some(key_rec) = rec else {
252        // legacy mode — caller handler 已 reject 过, 但防御性兼容. no-op.
253        return Ok(());
254    };
255    for &acc_id in acc_ids {
256        let ctx = futu_auth::CheckCtx {
257            market: String::new(),
258            symbol: String::new(),
259            order_value: None,
260            trd_side: None,
261            acc_id: Some(acc_id),
262            mutation_no_exposure: false,
263            currency: None,
264        };
265        let now = chrono::Utc::now();
266        let outcome = counters.check_full_skip_rate(&key_rec.id, &key_rec.limits(), &ctx, now);
267        if let Some(reason) = outcome.reason() {
268            futu_auth::audit::reject(
269                "rest",
270                endpoint,
271                &key_rec.id,
272                &format!("limit: {reason} (acc_id={acc_id})"),
273            );
274            let status =
275                StatusCode::from_u16(outcome.http_status_code()).unwrap_or(StatusCode::FORBIDDEN);
276            return Err((
277                status,
278                Json(serde_json::json!({
279                    "error": format!("{endpoint}: limit check failed for acc_id={acc_id}: {reason}")
280                })),
281            ));
282        }
283    }
284    Ok(())
285}
286
287/// v1.4.90 P1-C helper:从 sorted acc_ids 派生确定性 session_id。
288///
289/// REST stateless 没有 in-process push 注册表(MCP 有 `Arc<Mutex<HashMap<>>>`),
290/// session_id 由 sorted acc_ids hash 派生,**输入相同 → 输出相同**。
291/// 用户存下 session_id 之后通过 `/api/unsub-acc-push` 用相同 acc_ids
292/// 派生出相同 session_id 即可撤销。
293///
294/// 注:这不是为了"防 hallucination"或"鉴权",仅作为输出 token 让用户
295/// 体感对齐 MCP `futu_sub_acc_push` 的 `{session_id, unsub_hint}` 形状。
296fn derive_sub_acc_push_session_id(acc_ids: &[u64]) -> String {
297    use std::collections::hash_map::DefaultHasher;
298    use std::hash::{Hash, Hasher};
299    let mut sorted = acc_ids.to_vec();
300    sorted.sort_unstable();
301    sorted.dedup();
302    let mut hasher = DefaultHasher::new();
303    sorted.hash(&mut hasher);
304    let hash = hasher.finish();
305    format!("rest-sub-{hash:016x}")
306}
307
308/// codex 0522 F2 v1.4.106: caller scope invariant tests for sub/unsub helpers.
309///
310/// 验证 `check_per_acc_rate_for_caller` 给定同一 `KeyRecord` 的两个不同
311/// endpoint label (sub / unsub) 看到同一份 allowed set。即使 helper
312/// 被加新的 endpoint 调用,`KeyRecord.allowed_acc_ids` 仍是 single source
313/// of truth,helper 不能引入 per-endpoint 漂移。
314#[cfg(test)]
315mod tests_v1_4_106_caller_ctx_invariant;
316
317#[cfg(test)]
318mod acc_id_list_tests;
319
320#[cfg(test)]
321mod tests_v1_4_90_p1_c_sub_acc_push;