Skip to main content

futu_rest/routes/qot/
mod.rs

1//! 行情 REST API 路由
2//!
3//! 所有行情相关接口通过 proto_request 适配到现有 handler。
4
5use axum::extract::Json;
6use axum::http::StatusCode;
7use bytes::Bytes;
8use prost::Message;
9use serde_json::Value;
10
11use futu_codec::header::ProtoFmtType;
12use futu_proto::qot_get_basic_qot;
13use futu_proto::qot_get_broker;
14use futu_proto::qot_get_capital_distribution;
15use futu_proto::qot_get_capital_flow;
16use futu_proto::qot_get_code_change;
17use futu_proto::qot_get_future_info;
18use futu_proto::qot_get_holding_change_list;
19use futu_proto::qot_get_ipo_list;
20use futu_proto::qot_get_kl;
21use futu_proto::qot_get_market_state;
22use futu_proto::qot_get_option_chain;
23use futu_proto::qot_get_option_expiration_date;
24use futu_proto::qot_get_order_book;
25use futu_proto::qot_get_owner_plate;
26use futu_proto::qot_get_plate_security;
27use futu_proto::qot_get_plate_set;
28use futu_proto::qot_get_price_reminder;
29use futu_proto::qot_get_reference;
30use futu_proto::qot_get_rt;
31use futu_proto::qot_get_security_snapshot;
32use futu_proto::qot_get_static_info;
33use futu_proto::qot_get_sub_info;
34use futu_proto::qot_get_suspend;
35use futu_proto::qot_get_ticker;
36use futu_proto::qot_get_user_security;
37use futu_proto::qot_get_warrant;
38use futu_proto::qot_modify_user_security;
39use futu_proto::qot_request_history_kl;
40use futu_proto::qot_request_history_kl_quota;
41use futu_proto::qot_request_rehab;
42use futu_proto::qot_request_trade_date;
43use futu_proto::qot_set_price_reminder;
44use futu_proto::qot_stock_filter;
45use futu_proto::qot_sub;
46use futu_proto::used_quota;
47use futu_server::conn::IncomingRequest;
48
49use crate::adapter::{
50    self, RestState, apply_known_field_aliases, expand_symbol_shorthand,
51    maybe_wrap_flat_body_as_c2s, normalize_json_keys_snake_case,
52};
53
54type ApiResult = Result<Json<Value>, (StatusCode, Json<Value>)>;
55
56/// v1.4.90 P0-B: REST 全 endpoint 共享 conn_id(替代之前每次请求 `next_conn_id()`
57/// 自增 → 永久泄漏 sub-quota 直到耗尽 4000 上限)。
58///
59/// **背景**:v1.4.81 一直到 v1.4.89 之前,REST `proto_request` 每次都调
60/// `state.next_conn_id()` 拿新 virtual conn_id(10_000_000 自增)。`SubscriptionManager`
61/// 按 conn_id 记账:subscribe 把 (security, sub_type) 挂在 conn_id 下、quota
62/// +=1;unsubscribe 从同一 conn_id 删除、quota -=1。但 REST `subscribe` 用 conn_id=X
63/// → backend 订阅 ✓,下一次 REST `unsubscribe` 用 conn_id=Y → 找不到任何挂载 →
64/// 静默 no-op,quota 永不释放。日积月累 4000 quota 耗尽,整个 daemon 不能再
65/// 订阅任何东西。
66///
67/// **修法**:所有 REST sub-related endpoint(`subscribe` / `unsubscribe` /
68/// `get_sub_info` / `query_subscription`)锁定到 `REST_SHARED_CONN`. lifecycle
69/// 由 daemon 进程管,不随 REST 请求生灭。
70///
71/// **REST 视角合理性**:REST 是 stateless API,调用方不持续持有 daemon TCP
72/// 连接,"连接 ID" 在 REST 层面没物理意义。把所有 REST 流量当作"REST gateway"
73/// 这一个虚拟客户端的多次调用,对应一个固定 conn_id 是最干净的语义。v1.4.106
74/// 起 quote / kline / orderbook / ticker / rt handler 也会检查
75/// `SubscriptionManager::is_qot_subscribed(conn_id, ...)`,所以这些订阅门禁
76/// 读路径也必须锁定到同一 `REST_SHARED_CONN`,否则 REST subscribe 后下一次
77/// REST read 看不到同一个 conn 的订阅。
78///
79/// **取值选择**:`0xFFFF_FFFE_u64`(4_294_967_294)。`next_conn_id` 起点
80/// 10_000_000,要 ~4.28B 次调用才碰到此值,daemon 早已重启。`0xFFFF_FFFF_u64`
81/// 留给未来可能的 sentinel(如"all REST"广播)。
82///
83/// 对齐 MCP 路径行为:MCP 用单 `FutuClient` 复用底层 TCP,所有 MCP 请求自然
84/// 共享同一个 daemon 分配的 conn_id,不存在该 bug。REST 现在显式实现同语义。
85pub const REST_SHARED_CONN: u64 = 0xFFFF_FFFE;
86
87/// v1.4.90 P0-B: 用 REST_SHARED_CONN 而非 `state.next_conn_id()` 派发请求。
88///
89/// 复刻 `adapter::proto_request_with_idempotency` 的 JSON normalize → encode →
90/// dispatch → decode → JSON 流程,唯一差别:dispatch 时 `conn_id =
91/// REST_SHARED_CONN`. 用于 sub-related endpoint 防 quota 泄漏(见
92/// `REST_SHARED_CONN` 注释)。
93///
94/// 不复用 adapter::proto_request 是因为该函数硬编码 `state.next_conn_id()`,
95/// 改 adapter 会越权(v1.4.90 多 agent 并行约定 agent C 改 adapter,不交叉)。
96///
97/// **codex 0522 F3 v1.4.106 (option B)**: 接 `Option<&CallerContext>` 让
98/// gateway handler 知道 REST caller key 身份, 即使 conn_id 是
99/// `REST_SHARED_CONN` 全局共享 (per-key 订阅配额 / cleanup / 审计). QOT
100/// 行情 path 不直接受 `allowed_acc_ids` 约束, 但 `caller_key_id` 让未来
101/// per-key 订阅 owner 模型 (e.g. QotSubscriptionState owner) 可识别 caller
102/// 而不是看作 "REST 全局".
103async fn proto_request_shared_conn<Req, Rsp>(
104    state: &RestState,
105    proto_id: u32,
106    json_body: Option<Value>,
107    ctx: Option<&crate::caller_context::CallerContext>,
108) -> ApiResult
109where
110    Req: Message + Default + serde::de::DeserializeOwned,
111    Rsp: Message + Default + serde::Serialize,
112{
113    // 1. JSON → protobuf 请求(与 adapter::proto_request_with_idempotency 同序)
114    let req_msg: Req = if let Some(mut body) = json_body {
115        normalize_json_keys_snake_case(&mut body);
116        apply_known_field_aliases(&mut body);
117        maybe_wrap_flat_body_as_c2s(&mut body);
118        // 注:sub-related endpoint 不需要 trd header expand(trade-only path)
119        expand_symbol_shorthand(&mut body)
120            .map_err(|e| (StatusCode::BAD_REQUEST, Json(validation_error_body(e))))?;
121        if let Some(spec) = futu_surface_spec::lookup_endpoint_by_proto_id(proto_id) {
122            let validation_result = if let Some(c2s) = body.get_mut("c2s") {
123                futu_surface_spec::validate_and_normalize(spec, c2s)
124            } else {
125                futu_surface_spec::validate_and_normalize(spec, &mut body)
126            };
127            if let Err(err) = validation_result {
128                return Err(map_surface_spec_error(spec, err));
129            }
130        }
131        serde_json::from_value(body).map_err(|e| {
132            (
133                StatusCode::BAD_REQUEST,
134                Json(validation_error_body(format!("invalid request body: {e}"))),
135            )
136        })?
137    } else {
138        Req::default()
139    };
140
141    // 2. encode
142    let body = Bytes::from(req_msg.encode_to_vec());
143
144    // 3. dispatch with REST_SHARED_CONN(区别点)
145    // codex 0522 F3 v1.4.106: 同时填 caller_key_id (per-call snapshot) 让
146    // gateway handler 识别 REST caller 身份. allowed_acc_ids 仍 None
147    // (QOT 行情不直接 acc-bound), 真填走 ctx.caller_allowed_acc_ids_arc.
148    let incoming = IncomingRequest::builder(
149        REST_SHARED_CONN,
150        proto_id,
151        state.next_serial(),
152        ProtoFmtType::Protobuf,
153        body,
154    )
155    .with_caller_scope(
156        ctx.and_then(|c| c.caller_allowed_acc_ids_arc()),
157        ctx.and_then(|c| c.caller_key_id()),
158    )
159    .build();
160    let resp_bytes = state
161        .router
162        .dispatch(REST_SHARED_CONN, &incoming)
163        .await
164        .ok_or_else(|| {
165            (
166                StatusCode::INTERNAL_SERVER_ERROR,
167                Json(serde_json::json!({
168                    "error": "handler returned no response"
169                })),
170            )
171        })?;
172
173    // 4. decode
174    let rsp_msg = Rsp::decode(Bytes::from(resp_bytes)).map_err(|e| {
175        (
176            StatusCode::INTERNAL_SERVER_ERROR,
177            Json(serde_json::json!({
178                "error": format!("failed to decode response: {e}")
179            })),
180        )
181    })?;
182
183    // 5. serialize JSON
184    let mut json_rsp = serde_json::to_value(&rsp_msg).map_err(|e| {
185        (
186            StatusCode::INTERNAL_SERVER_ERROR,
187            Json(serde_json::json!({
188                "error": format!("failed to serialize response: {e}")
189            })),
190        )
191    })?;
192
193    // 6. err_code 前缀(与 adapter::maybe_wrap_err_code_prefix 同语义;该 helper
194    //    在 adapter.rs 是 private,inline 一个等价小版本以满足"不碰 adapter" 约束)
195    wrap_err_code_prefix_inline(&mut json_rsp);
196
197    Ok(Json(json_rsp))
198}
199
200fn map_surface_spec_error(
201    spec: &'static futu_surface_spec::EndpointSpec,
202    err: futu_surface_spec::DispatchError,
203) -> (StatusCode, Json<Value>) {
204    let proto_id = spec
205        .proto_id()
206        .map(|id| id.to_string())
207        .unwrap_or_else(|| "daemon-local".to_string());
208    let ret_msg = format!(
209        "{} (endpoint: {}, proto_id: {})",
210        err, spec.canonical_name, proto_id
211    );
212    let mut body = validation_error_body(ret_msg);
213    let machine_error_field = spec.runtime.error.machine_error_field;
214    if let Some(obj) = body.as_object_mut() {
215        obj.insert(
216            machine_error_field.to_string(),
217            serde_json::json!({
218                "kind": "validation_error",
219                "message": err.to_string(),
220                "endpoint": spec.canonical_name,
221                "proto_id": proto_id,
222            }),
223        );
224    }
225    (StatusCode::BAD_REQUEST, Json(body))
226}
227
228fn validation_error_body(message: impl Into<String>) -> Value {
229    let message = message.into();
230    serde_json::json!({
231        "ret_type": -1,
232        "ret_msg": message,
233        "error": message,
234    })
235}
236
237/// v1.4.90 P0-B: inline `adapter::maybe_wrap_err_code_prefix` 的等价实现。
238///
239/// adapter.rs 里的版本是 private(v1.4.34 BUG-2b 沉淀)。本 helper 为
240/// proto_request_shared_conn 服务,避免修改 adapter(不交叉 agent C scope)。
241///
242/// 语义与 adapter 版本对齐:
243/// - ret_type == 0 → 不动
244/// - ret_type != 0 + err_code present → `[err_code=<X>] <ret_msg>`
245/// - 幂等:已带 `[err_code=` 前缀的 ret_msg 不重复包
246fn wrap_err_code_prefix_inline(v: &mut Value) {
247    let obj = match v.as_object_mut() {
248        Some(o) => o,
249        None => return,
250    };
251    let is_err = obj
252        .get("ret_type")
253        .and_then(|t| t.as_i64())
254        .map(|t| t != 0)
255        .unwrap_or(false);
256    if !is_err {
257        return;
258    }
259    let raw_msg = obj
260        .get("ret_msg")
261        .and_then(|m| m.as_str())
262        .unwrap_or("")
263        .to_string();
264    if raw_msg.starts_with("[err_code=") {
265        return;
266    }
267    let err_code_label = match obj.get("err_code") {
268        Some(Value::Number(n)) => n
269            .as_i64()
270            .map(|i| i.to_string())
271            .unwrap_or_else(|| "none".to_string()),
272        _ => "none".to_string(),
273    };
274    let new_msg = if raw_msg.is_empty() {
275        format!("[err_code={err_code_label}]")
276    } else {
277        format!("[err_code={err_code_label}] {raw_msg}")
278    };
279    obj.insert("ret_msg".to_string(), Value::String(new_msg));
280}
281
282/// POST /api/subscribe — 订阅/退订行情
283///
284/// **v1.4.90 P0-B fix**: 用 `REST_SHARED_CONN` 替代 `state.next_conn_id()`,
285/// 杜绝 quota 永久泄漏(详见 `REST_SHARED_CONN` 注释).
286///
287/// **v1.4.104 eli S-005 (P1) fix**: REST 层显式检查 `is_sub_or_un_sub` 字段
288/// 在 raw JSON body 是否 present. proto bool 字段 missing 时 prost 默认为
289/// false → handler 走 unsub 路径; 但 unsub 路径对 invalid ticker silent
290/// success (handler 注释 line 187-189: unsub by-design 不做 backend 解析).
291/// agent 调用方默认只传 `symbols`, 不写 boolean → silent 没订阅.
292///
293/// 修法: REST adapter 入口先检查 `is_sub_or_un_sub` 在 body 里 (snake-case
294/// 归一化后) 是否 present. 不 present → 400 提示用户必须显式传字段.
295///
296/// **v1.4.104 codex round 2 F5 (P2) fix**: REST 全部 caller 共用一个虚拟连接
297/// `REST_SHARED_CONN=0xFFFFFFFE` (v1.4.90 P0-B 防 quota 泄漏的设计). 因此
298/// `is_unsub_all=true` 调用会**清掉所有 REST callers** 的 QOT 订阅 (因为大家
299/// 共享同一 conn_id 的 subscription bucket). 对于多 REST client 部署:
300///
301///   - 当前合约: `is_unsub_all=true` 在 REST 显式 reject, 仅返 400 解释
302///     "REST is_unsub_all 是 process-wide 操作, 跨 caller 影响, 默认禁用".
303///   - 可用替代: 显式列 `security_list` + `sub_type_list` 做单 symbol 退订;
304///     或改用 MCP / gRPC / WS 这类 per-conn surface 执行 unsub_all.
305///   - REST 目前没有 process-wide opt-in query, 也没有 admin clear endpoint;
306///     后续若要支持 REST per-key unsub_all, 必须先补独立状态模型与公开契约。
307///
308/// 当前实装: 选项 A 保守 (拒+提示), 防止 caller A 意外清掉 caller B 的 subs.
309/// MCP / gRPC / WS 各 caller 有自己 conn_id, 不受影响.
310// Split: 1408 行 → 5 子文件 (contiguous fn 段)
311mod misc;
312mod quotes;
313mod reference;
314mod snapshot;
315mod subscribe;
316
317#[cfg(test)]
318mod tests;
319
320#[cfg(test)]
321use misc::inject_default_is_req_all_conn;
322#[cfg(test)]
323use quotes::{annotate_quote_cache_miss, orderbook_loud_unsub_hint};
324#[cfg(test)]
325use snapshot::{
326    augment_snapshot_with_exchange_code, augment_static_info_with_exchange_code,
327    check_static_info_input,
328};
329#[cfg(test)]
330use subscribe::body_has_sub_or_unsub_flag;
331
332pub use misc::{
333    get_risk_free_rate, get_spread_table, get_ticker_statistic, get_ticker_statistic_detail,
334    list_plates, query_subscription, unsubscribe,
335};
336pub use quotes::{get_basic_qot, get_broker, get_kl, get_order_book, get_rt, get_ticker};
337pub use reference::{
338    get_capital_distribution, get_capital_flow, get_code_change, get_future_info,
339    get_holding_change, get_ipo_list, get_market_state, get_option_chain,
340    get_option_expiration_date, get_owner_plate, get_plate_security, get_plate_set,
341    get_price_reminder, get_reference, get_suspend, get_used_quota, get_user_security, get_warrant,
342    modify_user_security, request_history_kl, request_history_kl_quota, request_rehab,
343    request_trading_days, set_price_reminder, stock_filter,
344};
345pub use snapshot::{get_snapshot, get_static_info};
346pub use subscribe::{get_sub_info, subscribe};