futu_rest/routes/trd/sub_acc_push.rs
1//! REST trade account push subscription route and helpers.
2
3use std::sync::Arc;
4
5use axum::extract::{Extension, Json, State};
6use axum::http::StatusCode;
7use serde_json::Value;
8
9use futu_auth::KeyRecord;
10
11use crate::adapter::RestState;
12
13use super::ApiResult;
14
15/// v1.4.90 P1-C 修:POST /api/sub-acc-push — 订阅交易账户 push(订单 / 成交变动)
16///
17/// **历史背景**(外部 tester v1.4.69-89 报告 6 版 silent stub):
18/// - 老版 handler 是单 `proto_request` 透传 wrapper,无 input validation、
19/// 无 audit log、无 session_id token —— 用户 POST 后 daemon log 看不到任何
20/// "本 endpoint 被调用过" 的痕迹(仅 gateway 侧 SubAccPushHandler 默默更
21/// 新 `SubscriptionManager.subscribe_trd_acc(conn_id, acc_id)`),
22/// "ret=0 + s2c={}" 看似成功但下游全无可观测信号。
23///
24/// **本次 (v1.4.90 P1-C) 对齐 MCP `futu_sub_acc_push`**:
25/// 1. **Loud input validation**(acc_id_list 必须非空 + 每个 id 非零;C++
26/// `IAPIServer_Trd_SubAccPush::OnClientReq_SubAccPush` 等价 check)
27/// 2. **Audit log** `tracing::info!` 留 endpoint + count + 头几个 acc_id 痕迹,
28/// 给 push_health 看不到的部分提供可观测性
29/// 3. **Backend CMD forward 经 `proto_request`**(即 dispatch 到 gateway
30/// `SubAccPushHandler` 完成 `SubscriptionManager` 注册 + return ret=0)
31/// 4. **Response augment**:在标准 proto Response (`ret_type/s2c{}`) 上补
32/// `subscribed_acc_ids` + `session_id` + `unsub_hint`,对齐 MCP
33/// `{ok, subscribed_acc_ids, session_id, unsub_hint}` 输出形状。
34/// REST stateless,session_id 用 sorted acc_ids hash 派生(确定性 token,
35/// 可拿来 POST `/api/unsub-acc-push`)
36///
37/// **架构说明**:本 daemon 内 gateway local `SubAccPushHandler` 即对齐
38/// C++ APIServer 的 `Trd_SubAccPush` 处理点 —— 没有"再下一层 backend"
39/// 的 CMD 需要 forward。MCP 通过 TCP `client.request(TRD_SUB_ACC_PUSH)`
40/// 也是落到同一个 gateway handler。所以 "CMD forward" 的语义是 REST →
41/// `proto_request` → `RequestRouter::dispatch` → `SubAccPushHandler`。
42pub async fn sub_acc_push(
43 State(state): State<RestState>,
44 rec: Option<Extension<Arc<KeyRecord>>>,
45 Json(mut body): Json<Value>,
46) -> ApiResult {
47 // v1.4.103 (codex 56 F1 / 58 F5 — B9): legacy 模式 (无 keys.json / 无 Bearer)
48 // 直接 reject. 之前 legacy 模式返 ret_type=0 + session_id 给客户端 "看似成功",
49 // 但**不写** rest_acc_subscriptions 状态 (legacy WS 连接也无 key_id, sub
50 // state filter 跳过) — 用户实际看到 revoke 成功仍按 legacy 全量 trade push
51 // 行为继续收, silent inconsistency. 改为 loud reject 让用户知道 sub state
52 // 需要 keys.json + Bearer 才工作.
53 if rec.is_none() {
54 futu_auth::audit::reject(
55 "rest",
56 "/api/sub-acc-push",
57 "<legacy>",
58 "sub-acc-push not supported in legacy mode (no keys.json)",
59 );
60 return Err((
61 StatusCode::FORBIDDEN,
62 Json(serde_json::json!({
63 "error": "/api/sub-acc-push: legacy mode (no keys.json) does not support per-key sub state. \
64 Configure keys.json and pass Bearer token to enable.",
65 "ret_type": -1,
66 "hint": "v1.4.103 BUG-009: legacy mode previously returned silent success without persisting sub state. Now loud-reject to surface the limitation."
67 })),
68 ));
69 }
70 // v1.4.102 codex 43 F1 (P2): normalize 在 extract_acc_id_list 之前.
71 // 之前 extractor 只看 snake_case `acc_id_list`, 但官方 proto 是 camelCase
72 // `accIDList`. adapter 后续会 normalize, 但本 extractor 先跑会误 reject 空
73 // → 400 silent. 现在 normalize 进去, extractor 看到 acc_id_list.
74 crate::adapter::normalize_json_keys_snake_case(&mut body);
75 // 1. Loud input validation(解析 acc_id_list;畸形 / 空 / 全零 → 400)
76 let acc_ids = match extract_acc_id_list(&body) {
77 Ok(acc_ids) => acc_ids,
78 Err(reason) => {
79 let key_id = rec
80 .as_deref()
81 .map(|r| r.as_ref().id.clone())
82 .unwrap_or_else(|| "<legacy>".to_string());
83 futu_auth::audit::reject("rest", "/api/sub-acc-push", &key_id, &reason);
84 return Err((
85 StatusCode::BAD_REQUEST,
86 Json(serde_json::json!({
87 "error": format!("/api/sub-acc-push: {reason}")
88 })),
89 ));
90 }
91 };
92 if let Err(reason) = validate_sub_acc_push_acc_ids(&acc_ids) {
93 let key_id = rec
94 .as_deref()
95 .map(|r| r.as_ref().id.clone())
96 .unwrap_or_else(|| "<legacy>".to_string());
97 futu_auth::audit::reject("rest", "/api/sub-acc-push", &key_id, reason);
98 return Err((
99 StatusCode::BAD_REQUEST,
100 Json(serde_json::json!({
101 "error": format!("/api/sub-acc-push: {reason}")
102 })),
103 ));
104 }
105 // v1.4.102 codex 46 F4 (P1): sub-acc-push 必须对 acc_id_list 每个 id 跑
106 // allowed_acc_ids 限额. 之前只 validate 非空, 但 key 限到 acc A 的用户传
107 // [A, B, C] 仍能 subscribe 别人账户的 trade push stream — 越权.
108 //
109 // codex 0522 F2 v1.4.106: 走共享 helper `check_per_acc_rate_for_caller`,
110 // 与 `/api/unsub-acc-push` 同源 (单一逻辑, 防漂移).
111 check_per_acc_rate_for_caller(
112 &state.counters,
113 rec.as_deref().map(|r| r.as_ref()),
114 &acc_ids,
115 "/api/sub-acc-push",
116 )?;
117
118 // 2. Audit log:留 endpoint + count + head acc_ids 痕迹(push_health 不
119 // 跟踪 subscribe,本 log 是 silent-stub 的可观测性补丁)
120 let head: Vec<u64> = acc_ids.iter().take(3).copied().collect();
121 tracing::info!(
122 target: futu_auth::audit::TARGET,
123 iface = "rest",
124 endpoint = "/api/sub-acc-push",
125 count = acc_ids.len(),
126 head_acc_ids = ?head,
127 "v1.4.90 P1-C: sub-acc-push CMD forward → SubAccPushHandler"
128 );
129
130 // 3. v1.4.102 codex 51 F2 (P2): REST 不再 dispatch 到 gateway
131 // SubAccPushHandler — 那条路径 register ephemeral REST conn_id 到
132 // SubscriptionManager (per-conn_id), 而 unsub 用新 conn_id 删不到, 留
133 // stale entries 让 push_trd_acc 一直扫. REST sub state map (per-key) 已
134 // 是 REST 层 truly canonical 真相源, 直接 return success + 让 augment
135 // 段 (后) 写 state map.
136 let mut resp = Json(serde_json::json!({
137 "ret_type": 0,
138 "ret_msg": serde_json::Value::Null,
139 "err_code": serde_json::Value::Null,
140 "s2c": {}
141 }));
142 // body 不再下发给 gateway, 但仍要消费防 unused warning (REST 已经从 body
143 // 提了 acc_ids).
144 let _ = body;
145
146 // 4. 仅在 backend 返成功 (ret_type=0) 时 augment session_id / unsub_hint
147 // + v1.4.102 codex 46 F2/F3 / 47 F3 / 48 F3 (P1): 写入 REST sub state
148 // map 让 WS push delivery 过滤生效. **mutate-after-daemon-success** —
149 // proto_request 已 await 完, ret_type=0 才 register, 失败时不留 local
150 // state (避免 daemon 失败但 REST WS state 残留导致行为不一致).
151 if let Some(obj) = resp.0.as_object_mut() {
152 let ret_type_ok = obj.get("ret_type").and_then(|v| v.as_i64()).unwrap_or(-1) == 0;
153 if ret_type_ok {
154 // codex 47 F3 / 48 F3: 仅 daemon-success 后 register local state.
155 if let Some(rec_ref) = rec.as_deref() {
156 let key_id = rec_ref.as_ref().id.clone();
157 crate::adapter::with_rest_acc_subscriptions_write(
158 &state.rest_acc_subscriptions,
159 |subs| {
160 let entry = subs.entry(key_id).or_default();
161 for &acc_id in &acc_ids {
162 entry.insert(acc_id);
163 }
164 },
165 );
166 }
167 let session_id = derive_sub_acc_push_session_id(&acc_ids);
168 obj.insert("subscribed_acc_ids".to_string(), serde_json::json!(acc_ids));
169 obj.insert("session_id".to_string(), Value::String(session_id.clone()));
170 obj.insert(
171 "unsub_hint".to_string(),
172 Value::String(format!(
173 "POST /api/unsub-acc-push with same acc_id_list to revoke (REST is stateless, session_id=\"{session_id}\" is a deterministic token derived from sorted acc_ids)"
174 )),
175 );
176 }
177 }
178 Ok(resp)
179}
180
181/// v1.4.90 P1-C helper:从 JSON body 抽 `c2s.acc_id_list`(兼容 flat body)。
182///
183/// 容忍点:
184/// - `body.c2s.acc_id_list` 嵌套(标准 proto 结构)
185/// - `body.acc_id_list` 扁平(adapter 之后会 wrap 成 c2s,本 helper 提前看一眼)
186/// - 数组元素必须是 JSON 正整数;畸形元素 fail-closed,不静默过滤
187pub(crate) fn extract_acc_id_list(body: &Value) -> Result<Vec<u64>, String> {
188 let raw = body
189 .pointer("/c2s/acc_id_list")
190 .or_else(|| body.pointer("/acc_id_list"));
191 let Some(raw) = raw else {
192 return Ok(Vec::new());
193 };
194 let Some(arr) = raw.as_array() else {
195 return Err("c2s.acc_id_list must be an array of positive integer acc_id values".into());
196 };
197
198 let mut acc_ids = Vec::with_capacity(arr.len());
199 for (idx, value) in arr.iter().enumerate() {
200 let Some(acc_id) = value.as_u64() else {
201 return Err(format!(
202 "c2s.acc_id_list[{idx}] must be a positive integer acc_id"
203 ));
204 };
205 acc_ids.push(acc_id);
206 }
207 Ok(acc_ids)
208}
209
210/// v1.4.90 P1-C helper:sub-acc-push 入参校验。
211///
212/// C++ `IAPIServer_Trd_SubAccPush::OnClientReq_SubAccPush` 没有显式校验
213/// 空 list / 0 acc_id(默默 no-op),但 silent no-op 是用户视角的 bug
214/// (tester 报告 "看似 ret=0 但什么都没订上")。Rust REST 侧 fail-fast。
215pub(crate) fn validate_sub_acc_push_acc_ids(acc_ids: &[u64]) -> Result<(), &'static str> {
216 if acc_ids.is_empty() {
217 return Err("c2s.acc_id_list is required and must not be empty");
218 }
219 if acc_ids.contains(&0) {
220 return Err(
221 "c2s.acc_id_list contains zero — call /api/list-accounts to discover real acc_id values",
222 );
223 }
224 Ok(())
225}
226
227/// codex 0522 F2 v1.4.106: 共享的 per-acc rate / scope check helper.
228///
229/// `/api/sub-acc-push` 与 `/api/unsub-acc-push` 之前在两个文件 (trd.rs +
230/// sys.rs) 各写一份相同的 for-loop 跑 `check_full_skip_rate` per acc_id.
231/// codex F2 audit 指出: REST caller scope 没统一入口, 多套手写 = 漂移风险.
232/// 本 helper 是单一来源 — 两处 callers 都调它, 任一改逻辑都同步生效.
233///
234/// 接 `CallerContext` 而不是 `&KeyRecord`: 让 caller side 总是先 build ctx
235/// (per-call snapshot), 与 `IncomingRequest.caller_*` 同源数据不漂移.
236///
237/// `endpoint` 用作 audit reject label (e.g. "/api/sub-acc-push").
238///
239/// **行为**:
240/// - `ctx.key_id = None` (legacy mode): no-op 返 Ok (route handler 已在 sub
241/// 入口 reject legacy, 但本 helper 防御性兼容)
242/// - `acc_ids` 任一不通过 → audit reject + 返 Err 含 status code (从 outcome
243/// 的 http_status_code 派生, 默认 403)
244/// - 全通过 → Ok
245pub(crate) fn check_per_acc_rate_for_caller(
246 counters: &futu_auth::RuntimeCounters,
247 rec: Option<&futu_auth::KeyRecord>,
248 acc_ids: &[u64],
249 endpoint: &'static str,
250) -> Result<(), (StatusCode, Json<Value>)> {
251 let Some(key_rec) = rec else {
252 // legacy mode — caller handler 已 reject 过, 但防御性兼容. no-op.
253 return Ok(());
254 };
255 for &acc_id in acc_ids {
256 let ctx = futu_auth::CheckCtx {
257 market: String::new(),
258 symbol: String::new(),
259 order_value: None,
260 trd_side: None,
261 acc_id: Some(acc_id),
262 mutation_no_exposure: false,
263 currency: None,
264 };
265 let now = chrono::Utc::now();
266 let outcome = counters.check_full_skip_rate(&key_rec.id, &key_rec.limits(), &ctx, now);
267 if let Some(reason) = outcome.reason() {
268 futu_auth::audit::reject(
269 "rest",
270 endpoint,
271 &key_rec.id,
272 &format!("limit: {reason} (acc_id={acc_id})"),
273 );
274 let status =
275 StatusCode::from_u16(outcome.http_status_code()).unwrap_or(StatusCode::FORBIDDEN);
276 return Err((
277 status,
278 Json(serde_json::json!({
279 "error": format!("{endpoint}: limit check failed for acc_id={acc_id}: {reason}")
280 })),
281 ));
282 }
283 }
284 Ok(())
285}
286
287/// v1.4.90 P1-C helper:从 sorted acc_ids 派生确定性 session_id。
288///
289/// REST stateless 没有 in-process push 注册表(MCP 有 `Arc<Mutex<HashMap<>>>`),
290/// session_id 由 sorted acc_ids hash 派生,**输入相同 → 输出相同**。
291/// 用户存下 session_id 之后通过 `/api/unsub-acc-push` 用相同 acc_ids
292/// 派生出相同 session_id 即可撤销。
293///
294/// 注:这不是为了"防 hallucination"或"鉴权",仅作为输出 token 让用户
295/// 体感对齐 MCP `futu_sub_acc_push` 的 `{session_id, unsub_hint}` 形状。
296fn derive_sub_acc_push_session_id(acc_ids: &[u64]) -> String {
297 use std::collections::hash_map::DefaultHasher;
298 use std::hash::{Hash, Hasher};
299 let mut sorted = acc_ids.to_vec();
300 sorted.sort_unstable();
301 sorted.dedup();
302 let mut hasher = DefaultHasher::new();
303 sorted.hash(&mut hasher);
304 let hash = hasher.finish();
305 format!("rest-sub-{hash:016x}")
306}
307
308/// codex 0522 F2 v1.4.106: caller scope invariant tests for sub/unsub helpers.
309///
310/// 验证 `check_per_acc_rate_for_caller` 给定同一 `KeyRecord` 的两个不同
311/// endpoint label (sub / unsub) 看到同一份 allowed set。即使 helper
312/// 被加新的 endpoint 调用,`KeyRecord.allowed_acc_ids` 仍是 single source
313/// of truth,helper 不能引入 per-endpoint 漂移。
314#[cfg(test)]
315mod tests_v1_4_106_caller_ctx_invariant;
316
317#[cfg(test)]
318mod acc_id_list_tests;
319
320#[cfg(test)]
321mod tests_v1_4_90_p1_c_sub_acc_push;