Skip to main content

futu_rest/adapter/
proto_request.rs

1//! Split from adapter.rs: proto_request.
2//!
3//! pub items: proto_request,proto_request_with_filter,proto_request_with_idempotency,proto_request_with_idempotency_and_caller,proto_request_with_ctx.
4
5use axum::Json;
6use axum::http::StatusCode;
7use serde_json::Value;
8
9use super::*;
10
11///
12/// 泛型参数:
13/// - `Req`: protobuf 请求类型 (prost::Message + serde::Deserialize)
14/// - `Rsp`: protobuf 响应类型 (prost::Message + serde::Serialize)
15///
16/// 流程: JSON → Req → encode → dispatch(proto_id) → decode → Rsp → JSON
17pub async fn proto_request<Req, Rsp>(
18    state: &RestState,
19    proto_id: u32,
20    json_body: Option<Value>,
21) -> Result<Json<Value>, (StatusCode, Json<Value>)>
22where
23    Req: Message + Default + serde::de::DeserializeOwned,
24    Rsp: Message + Default + serde::Serialize,
25{
26    proto_request_internal::<Req, Rsp>(state, proto_id, json_body, None, None).await
27}
28
29/// 把 `DispatchError` 翻译成 REST 400 response (HTTP + JSON body).
30///
31/// pitfall #45 loud error: 不返 silent empty ret_type=0, 返清晰 400 + ret_msg.
32pub(super) fn map_dispatch_error(
33    spec: &'static EndpointSpec,
34    err: DispatchError,
35) -> (StatusCode, Json<Value>) {
36    let proto_id = spec
37        .proto_id()
38        .map(|id| id.to_string())
39        .unwrap_or_else(|| "daemon-local".to_string());
40    let ret_msg = format!(
41        "{} (endpoint: {}, proto_id: {})",
42        err, spec.canonical_name, proto_id
43    );
44    let status = StatusCode::from_u16(spec.runtime.error.validation_http_status)
45        .unwrap_or(StatusCode::BAD_REQUEST);
46    let machine_error_field = spec.runtime.error.machine_error_field;
47    let mut body = serde_json::json!({
48        "ret_type": -1,
49        "ret_msg": ret_msg,
50    });
51    if let Some(obj) = body.as_object_mut() {
52        obj.insert(
53            machine_error_field.to_string(),
54            serde_json::json!({
55                "kind": "validation_error",
56                "message": err.to_string(),
57                "endpoint": spec.canonical_name,
58                "proto_id": proto_id,
59            }),
60        );
61    }
62    (status, Json(body))
63}
64
65pub(super) fn validation_error_body(message: impl Into<String>) -> Value {
66    let message = message.into();
67    serde_json::json!({
68        "ret_type": -1,
69        "ret_msg": message,
70        "error": message,
71    })
72}
73
74/// v1.4.104 阶段 7-1: 走 FilterRegistry 的 proto_request 变体.
75///
76/// 跟 [`proto_request`] / [`proto_request_with_idempotency`] 流程一致, 但在
77/// `decode protobuf 响应` 之前**插入 FilterRegistry::apply** 一步, 按
78/// `allowed_acc_ids` filter 受限 key 的响应 acc_list (proto 2001 TRD_GET_ACC_LIST
79/// 等). 与 gRPC server.rs / WS ws_listener.rs 同源 (单一 registry).
80///
81/// `allowed_acc_ids = None` 时 filter no-op (legacy / 无限制 key).
82///
83/// **codex 0522 F2 v1.4.106**: 推荐改用 [`proto_request_with_ctx`], 这个
84/// helper 改为内部包装, 丢失 `caller_key_id`. 保留作 backward-compat 给
85/// 已有的 acc_list filter call site 不破坏 (route 层迁移到 ctx 后此函数
86/// 全部 caller 应该归零, 保留一段过渡期再删).
87pub async fn proto_request_with_filter<Req, Rsp>(
88    state: &RestState,
89    proto_id: u32,
90    json_body: Option<Value>,
91    allowed_acc_ids: Option<&std::collections::HashSet<u64>>,
92) -> Result<Json<Value>, (StatusCode, Json<Value>)>
93where
94    Req: Message + Default + serde::de::DeserializeOwned,
95    Rsp: Message + Default + serde::Serialize,
96{
97    // codex 0522 F1 v1.4.106: 构 minimal CallerContext 把 allowed_acc_ids 同时
98    // 接进 IncomingRequest.caller_allowed_acc_ids + FilterRegistry. caller_key_id
99    // 仍 None (老 call site 没传). 推荐 caller 迁移到 proto_request_with_ctx.
100    let ctx = if let Some(allowed) = allowed_acc_ids {
101        crate::caller_context::CallerContext {
102            key_id: None,
103            allowed_acc_ids: Some(std::sync::Arc::new(allowed.clone())),
104        }
105    } else {
106        crate::caller_context::CallerContext::legacy()
107    };
108    proto_request_internal::<Req, Rsp>(state, proto_id, json_body, None, Some(&ctx)).await
109}
110
111/// v1.4.38 Phase 4: 支持 `Idempotency-Key` header 的 proto_request。
112/// 老 call site 继续用 `proto_request`(header=None), 新写 trade endpoint
113/// 用 `proto_request_with_idempotency` 从 axum HeaderMap 提取 header 后传入。
114pub async fn proto_request_with_idempotency<Req, Rsp>(
115    state: &RestState,
116    proto_id: u32,
117    json_body: Option<Value>,
118    idempotency_key: Option<String>,
119) -> Result<Json<Value>, (StatusCode, Json<Value>)>
120where
121    Req: Message + Default + serde::de::DeserializeOwned,
122    Rsp: Message + Default + serde::Serialize,
123{
124    proto_request_internal::<Req, Rsp>(state, proto_id, json_body, idempotency_key, None).await
125}
126
127/// v1.4.106 codex 0920 F1 (P1): 支持 caller key id 的 idempotency 变体.
128/// 让 cache namespace 跨 caller 隔离 — 不同 caller 用同 Idempotency-Key
129/// **不能** 跨 caller 命中老 response (避免跨账户数据泄漏 + 重复下单).
130///
131/// **call site**: REST trade endpoint (place / modify / cancel / reconfirm)
132/// 在 `rec: Option<Extension<Arc<KeyRecord>>>` 抽 caller_key_id 后传入.
133pub async fn proto_request_with_idempotency_and_caller<Req, Rsp>(
134    state: &RestState,
135    proto_id: u32,
136    json_body: Option<Value>,
137    idempotency_key: Option<String>,
138    caller_key_id: Option<String>,
139) -> Result<Json<Value>, (StatusCode, Json<Value>)>
140where
141    Req: Message + Default + serde::de::DeserializeOwned,
142    Rsp: Message + Default + serde::Serialize,
143{
144    let ctx = caller_key_id.map(|k| crate::caller_context::CallerContext {
145        key_id: Some(k),
146        allowed_acc_ids: None,
147    });
148    proto_request_internal::<Req, Rsp>(state, proto_id, json_body, idempotency_key, ctx.as_ref())
149        .await
150}
151
152/// codex 0522 F1 v1.4.106 (推荐 API): 带 `CallerContext` 的 proto_request.
153///
154/// 与 `proto_request_with_idempotency` / `proto_request_with_filter` 的关系:
155/// 后两者只接 `idempotency_key` 或 `allowed_acc_ids` 单一维度, 本函数接完整
156/// `CallerContext` (含 `key_id` + `allowed_acc_ids`), 把 caller scope **同时**
157/// 接到三个下游消费点:
158///
159/// 1. `IncomingRequest.caller_allowed_acc_ids` (dispatch handler 的 per-acc
160///    enforce, defense-in-depth)
161/// 2. `IncomingRequest.caller_key_id` (per-key 配额 / cleanup / 审计入口)
162/// 3. `FilterRegistry::apply` (响应 acc_list 过滤)
163///
164/// 任一接错 → `dispatch handler` 看到 `None` 就 silent bypass (codex F1 audit
165/// 实锤的 v1.4.105 之前 REST regression).
166///
167/// `ctx = None` 等价 `legacy mode` (无 caller key, 无 acc 限制) — 通常仅
168/// 测试 / 显式 unauthenticated route 用. 真 route handler 应该总是构 ctx
169/// 从 `Extension<Arc<KeyRecord>>` 抽出来 (`CallerContext::from_key_record`).
170pub async fn proto_request_with_ctx<Req, Rsp>(
171    state: &RestState,
172    proto_id: u32,
173    json_body: Option<Value>,
174    idempotency_key: Option<String>,
175    ctx: Option<&crate::caller_context::CallerContext>,
176) -> Result<Json<Value>, (StatusCode, Json<Value>)>
177where
178    Req: Message + Default + serde::de::DeserializeOwned,
179    Rsp: Message + Default + serde::Serialize,
180{
181    proto_request_internal::<Req, Rsp>(state, proto_id, json_body, idempotency_key, ctx).await
182}
183
184/// v1.4.104 阶段 7-1 + codex 0522 F1 v1.4.106: 内部统一实现, 接 `CallerContext`
185/// 替代单 `allowed_acc_ids` 入参. 调度时同时填 `IncomingRequest.caller_key_id`
186/// 与 `caller_allowed_acc_ids`, 响应过滤共用同一个 `allowed_acc_ids` 借引用 ——
187/// 单一来源, 防 routes 层手写多套 caller scope check 漂移 (codex F2).
188async fn proto_request_internal<Req, Rsp>(
189    state: &RestState,
190    proto_id: u32,
191    json_body: Option<Value>,
192    idempotency_key: Option<String>,
193    ctx: Option<&crate::caller_context::CallerContext>,
194) -> Result<Json<Value>, (StatusCode, Json<Value>)>
195where
196    Req: Message + Default + serde::de::DeserializeOwned,
197    Rsp: Message + Default + serde::Serialize,
198{
199    // 1. JSON → protobuf 请求
200    // v1.4.45: normalize camelCase keys to snake_case(兼容 FTAPI 官方文档的
201    // camelCase 字段名,如 accID/trdEnv/filterConditions)—— 见 normalize_json_keys_snake_case 注释
202    // v1.4.68: alias 常见 SDK 惯用字段名到 proto 字段(如 max_count → max_ack_kl_num)
203    // v1.4.73 BUG-005: auto-wrap flat body 到 `{c2s: ...}` 结构 —— 用户习惯用
204    // 扁平 body(对齐 Python SDK 风格),但 proto Request struct 要求嵌套 `c2s`。
205    let req_msg: Req = if let Some(mut body) = json_body {
206        normalize_json_keys_snake_case(&mut body);
207        apply_known_field_aliases(&mut body);
208        // v1.4.73 BUG-005: flat body → {c2s: body} 自动包装(见 fn 注释)
209        maybe_wrap_flat_body_as_c2s(&mut body);
210        // v1.4.90 P2-D: trade endpoint 用户常传 flat `acc_id` / `trd_env`
211        // / `trd_market` / `jp_acc_type`, proto 期望 `c2s.header.{...}`.
212        // 在 c2s 已建好但尚未 expand symbol shorthand 时补 header 嵌套.
213        maybe_expand_flat_trd_header(&mut body);
214        // v1.4.73 BUG-005: history-kline 用户习惯 "symbol": "HK.00700" 代替
215        // 嵌套 security:{market,code},adapter 解析 symbol 前缀 + code
216        // v1.4.90 P0-C: expand 路径加 MAX_SYMBOLS_PER_REQUEST 检查, 超返 400.
217        expand_symbol_shorthand(&mut body)
218            .map_err(|e| (StatusCode::BAD_REQUEST, Json(validation_error_body(e))))?;
219        // v1.4.110 Layer 2: spec-driven validation 兜底 — 所有 transformations
220        // 完成后, 按 proto_id lookup EndpointSpec, 调 validate_and_normalize.
221        // body 已是 `{c2s: {...}}` (或 c2s 字段在顶层, 取决于 endpoint), spec
222        // 验 c2s 内部. 未 declare spec 的 endpoint 跳 (向后兼容).
223        if let Some(spec) = futu_surface_spec::lookup_endpoint_by_proto_id(proto_id) {
224            // 多数 endpoint body 是 {c2s: {actual_content}}, validate c2s 内容.
225            // 但有 endpoint 没 wrap c2s (eg. empty Request struct), validate body 自身.
226            let validation_result = if let Some(c2s) = body.get_mut("c2s") {
227                futu_surface_spec::validate_and_normalize(spec, c2s)
228            } else {
229                futu_surface_spec::validate_and_normalize(spec, &mut body)
230            };
231            if let Err(err) = validation_result {
232                return Err(map_dispatch_error(spec, err));
233            }
234        }
235        serde_json::from_value(body).map_err(|e| {
236            (
237                StatusCode::BAD_REQUEST,
238                Json(validation_error_body(format!("invalid request body: {e}"))),
239            )
240        })?
241    } else {
242        Req::default()
243    };
244
245    // 2. encode 为 protobuf bytes
246    let body = Bytes::from(req_msg.encode_to_vec());
247
248    // 3. 构造 IncomingRequest 调用现有 handler
249    // codex 0522 F1 v1.4.106: caller_allowed_acc_ids + caller_key_id 同时从
250    // CallerContext 拿, 单一来源, 不再分别写 None.
251    let incoming = IncomingRequest::builder(
252        state.next_conn_id(),
253        proto_id,
254        state.next_serial(),
255        ProtoFmtType::Protobuf,
256        body,
257    )
258    .with_idempotency_key(idempotency_key)
259    .with_caller_scope(
260        ctx.and_then(|c| c.caller_allowed_acc_ids_arc()),
261        ctx.and_then(|c| c.caller_key_id()),
262    )
263    .build();
264
265    let resp_bytes = state
266        .router
267        .dispatch(incoming.conn_id, &incoming)
268        .await
269        .ok_or_else(|| {
270            (
271                StatusCode::INTERNAL_SERVER_ERROR,
272                Json(serde_json::json!({
273                    "error": "handler returned no response"
274                })),
275            )
276        })?;
277
278    // 3.5. v1.4.104 阶段 7-1: response filter (cross-surface 共享 FilterRegistry).
279    //      proto 2001 TRD_GET_ACC_LIST 默认注册, allowed_acc_ids 非空时 filter
280    //      响应 acc_list, 受限 key 不能跨账户 enumerate. legacy / 无限制 key /
281    //      非注册 proto_id → no-op 返原 bytes 不动.
282    // codex 0522 F2 v1.4.106: 同一 ctx.allowed_acc_ids 借引用 (与
283    // IncomingRequest.caller_allowed_acc_ids 是同一份 Arc), 单一来源.
284    let resp_bytes = state.filter_registry.apply(
285        proto_id,
286        resp_bytes,
287        ctx.and_then(|c| c.allowed_acc_ids_borrow()),
288    );
289
290    // 4. decode protobuf 响应
291    let rsp_msg = Rsp::decode(Bytes::from(resp_bytes)).map_err(|e| {
292        (
293            StatusCode::INTERNAL_SERVER_ERROR,
294            Json(serde_json::json!({
295                "error": format!("failed to decode response: {e}")
296            })),
297        )
298    })?;
299
300    // 5. 序列化为 JSON
301    let mut json_rsp = serde_json::to_value(&rsp_msg).map_err(|e| {
302        (
303            StatusCode::INTERNAL_SERVER_ERROR,
304            Json(serde_json::json!({
305                "error": format!("failed to serialize response: {e}")
306            })),
307        )
308    })?;
309
310    // 6. v1.4.34 BUG-2b 修:给所有错误响应的 ret_msg 加 `[err_code=X]` 前缀
311    //    历史:v1.4.27 server_err() helper 在 futu-trd 客户端 lib 里包了 CLI/gRPC/MCP
312    //    三个 surface,漏了 REST 这个 surface。eli 4 次独立复现;v1.4.30 / 31 /
313    //    32 / 33 都没修。根因:REST 走 proto_request 直接序列化响应,没经过 server_err
314    //    helper 层。v1.4.34 直接在 JSON 层包一下,不动 daemon response(daemon 要给
315    //    CLI 客户端留原始 err_code 字段,CLI 自己会包)。
316    maybe_wrap_err_code_prefix(&mut json_rsp);
317
318    Ok(Json(json_rsp))
319}
320
321/// v1.4.34 BUG-2b:REST 响应的 ret_msg 包 `[err_code=X]` 前缀。
322///
323/// 与 `futu_trd::server_err()` 同语义,但作用在已序列化的 JSON 上:
324///
325/// - `ret_type == 0`:成功,不动
326/// - `ret_type != 0` 且 `err_code` 非 null:`[err_code=<code>] <原 ret_msg>`
327/// - `ret_type != 0` 且 `err_code` 缺省:`[err_code=none] <原 ret_msg>`
328/// - `ret_type != 0` 且 ret_msg 空:只留 `[err_code=<X>]` 带方括号的标签
329///
330/// 幂等(已经带 `[err_code=` 前缀的 ret_msg 不重复包)。方括号位置精确,既
331/// 便于客户端 grep,也不误伤"错误描述里恰好有方括号"的正常 msg。
332///
333/// 只对**顶层**的 `ret_type / ret_msg / err_code` 生效;嵌套在 s2c 里的状态字段
334/// 不动。
335pub(super) fn maybe_wrap_err_code_prefix(v: &mut Value) {
336    let obj = match v.as_object_mut() {
337        Some(o) => o,
338        None => return,
339    };
340    // 只处理失败响应
341    let is_err = obj
342        .get("ret_type")
343        .and_then(|t| t.as_i64())
344        .map(|t| t != 0)
345        .unwrap_or(false);
346    if !is_err {
347        return;
348    }
349    // 读当前 msg(可能是 null)
350    let raw_msg = obj
351        .get("ret_msg")
352        .and_then(|m| m.as_str())
353        .unwrap_or("")
354        .to_string();
355    // 幂等:已带前缀就不动(多轮 middleware 不应该双包)
356    if raw_msg.starts_with("[err_code=") {
357        return;
358    }
359    // 读 err_code(可能是 null / 整数)
360    let err_code_label = match obj.get("err_code") {
361        Some(Value::Number(n)) => n
362            .as_i64()
363            .map(|i| i.to_string())
364            .unwrap_or("none".to_string()),
365        _ => "none".to_string(),
366    };
367    let new_msg = if raw_msg.is_empty() {
368        format!("[err_code={err_code_label}]")
369    } else {
370        format!("[err_code={err_code_label}] {raw_msg}")
371    };
372    obj.insert("ret_msg".to_string(), Value::String(new_msg));
373}