futu_rest/adapter/mod.rs
1//! 通用适配器:JSON ↔ Protobuf 转换 + 请求分发
2//!
3//! 核心思路:
4//! 1. HTTP 请求带 JSON body → 反序列化为 prost Message → encode 为 bytes
5//! 2. 构造 IncomingRequest { proto_id, body } → 调用 RequestRouter::dispatch
6//! 3. 响应 bytes → decode 为 prost Message → 序列化为 JSON 返回
7
8use std::collections::{HashMap, HashSet};
9use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
10use std::sync::{Arc, RwLock};
11
12use bytes::Bytes;
13use prost::Message;
14
15use futu_auth::KeyStore;
16use futu_codec::header::ProtoFmtType;
17use futu_server::conn::IncomingRequest;
18use futu_server::router::RequestRouter;
19use futu_surface_spec::{DispatchError, EndpointSpec};
20
21use crate::ws::WsBroadcaster;
22
23pub(crate) type RestAccSubscriptionMap = HashMap<String, HashSet<u64>>;
24pub(crate) type RestAccSubscriptions = Arc<RwLock<RestAccSubscriptionMap>>;
25
26pub(crate) fn with_rest_acc_subscriptions_read<T>(
27 subscriptions: &RestAccSubscriptions,
28 f: impl FnOnce(&RestAccSubscriptionMap) -> T,
29) -> T {
30 match subscriptions.read() {
31 Ok(guard) => f(&guard),
32 Err(poisoned) => {
33 tracing::warn!("rest_acc_subscriptions read lock poisoned; recovering inner state");
34 let guard = poisoned.into_inner();
35 f(&guard)
36 }
37 }
38}
39
40pub(crate) fn with_rest_acc_subscriptions_write<T>(
41 subscriptions: &RestAccSubscriptions,
42 f: impl FnOnce(&mut RestAccSubscriptionMap) -> T,
43) -> T {
44 match subscriptions.write() {
45 Ok(mut guard) => f(&mut guard),
46 Err(poisoned) => {
47 tracing::warn!("rest_acc_subscriptions write lock poisoned; recovering inner state");
48 let mut guard = poisoned.into_inner();
49 f(&mut guard)
50 }
51 }
52}
53
54/// v1.4.90 P0-C DoS guard: 单次请求 `security_list` / `code_list` / `symbols`
55/// / `stocks` / `symbol_list` 数组最大长度。
56///
57/// **背景**: v1.4.82 B1 引入 string-array shorthand expand 后, adapter 层
58/// 对数组长度无约束. 攻击者(或 buggy SDK) `POST /api/subscribe` 一次传
59/// 150_000 symbols → quota 爆 + RSS 40× 放大 + handler O(N) 处理阻塞.
60///
61/// 100 是 Futu OpenD 单订阅 batch 的合理上限(参照 C++ OpenD 真机经验, 大
62/// portfolio 用户也极少超过 100). 超出 → 立即 400 + loud msg 引导 client
63/// 分批.
64///
65/// 触发位置: `expand_symbols_array_to_security_list` 入口 + 已含 object
66/// array 的 `security_list` 原样路径(防止用户绕过 string shorthand 直接
67/// 传 150k objects). v1.4.82 B1 漏的 input validation 这里补上.
68pub const MAX_SYMBOLS_PER_REQUEST: usize = 100;
69
70/// v1.4.32+ `/api/admin/status` 的后端快照 provider。
71///
72/// 由 `futu-opend` 启动时注入:closure 捕获 `Arc<GatewayBridge>`,每次调用
73/// 返回 `serde_json::Value` 形式的 `StatusSnapshot`。这样 futu-rest 不需要
74/// 反向依赖 futu-gateway。未配置时 `admin_status` 返 503。
75pub type AdminStatusProvider = Arc<dyn Fn() -> serde_json::Value + Send + Sync>;
76
77/// v1.4.32+ `/api/admin/reload` 的 handler closure。
78///
79/// 同 `AdminStatusProvider`:由 opend 注入,捕获 Bridge 的 Arc 调其
80/// `reload()` 方法,返 JSON。
81/// v1.4.34: reload 升级到 async(内部调 remember_login 网络 I/O),所以 handler
82/// 也变成 async。返 `Pin<Box<dyn Future>>` 是标准做法。如果做成 sync + 内部
83/// block_on 会在 axum async handler 里导致 runtime 死锁。
84pub type AdminReloadHandler = Arc<
85 dyn Fn() -> std::pin::Pin<
86 Box<dyn std::future::Future<Output = serde_json::Value> + Send + 'static>,
87 > + Send
88 + Sync,
89>;
90
91/// v1.4.83 §9 Phase 2 F5: Push channel health snapshot provider.
92///
93/// opend 启动时注入 closure, 捕获 `GatewayBridge.push_health` Arc, 在
94/// `/api/push-subscriber-info` 调用时返 JSON snapshot. 和 `AdminStatusProvider`
95/// 同一 pattern —— 避免 futu-rest 直接依赖 futu-gateway 造成循环.
96pub type PushHealthSnapshotProvider = Arc<dyn Fn() -> serde_json::Value + Send + Sync>;
97
98/// v1.4.105 D12 (Phase 2): card_num → acc_ids 解析器.
99///
100/// opend 启动时注入 closure, 捕获 `bridge.caches.trd_cache` Arc, 调
101/// `find_acc_ids_by_card_num(input) -> Vec<u64>`. REST trade handler 在 dispatch
102/// 前用此解析 user 传的 `card_num` 字段(4 位末尾 / 16 位完整)→ acc_id, 写
103/// 进 `c2s.header.acc_id`. 和 `AdminStatusProvider` 同 pattern —— 避免 futu-rest
104/// 反向依赖 futu-cache / futu-gateway.
105///
106/// 行为契约 (与 `TrdCache::find_acc_ids_by_card_num` 等价):
107/// - input 必须 4 位 / 16 位纯数字 (caller 应已 validate, 此处不 validate)
108/// - 0 match → empty Vec (caller 决定 reject)
109/// - 多 match → 多 acc_id Vec (caller 决定 reject ambiguous)
110/// - 1 match → 单 acc_id Vec
111pub type CardNumResolver = Arc<dyn Fn(&str) -> Vec<u64> + Send + Sync>;
112
113// Split from the former 1223-line adapter.rs into focused submodules.
114mod proto_request;
115mod response;
116mod state;
117mod symbol_normalize;
118#[cfg(test)]
119use axum::Json;
120#[cfg(test)]
121use axum::http::StatusCode;
122#[cfg(test)]
123use proto_request::{map_dispatch_error, maybe_wrap_err_code_prefix, validation_error_body};
124#[cfg(test)]
125use symbol_normalize::to_snake_case;
126#[cfg(test)]
127mod tests;
128
129pub use proto_request::{
130 proto_request, proto_request_with_ctx, proto_request_with_filter,
131 proto_request_with_idempotency, proto_request_with_idempotency_and_caller,
132};
133pub use response::ApiResponse;
134pub(crate) use response::{
135 apply_known_field_aliases, expand_symbol_shorthand, maybe_expand_flat_trd_header,
136 maybe_wrap_flat_body_as_c2s,
137};
138pub use state::RestState;
139pub use symbol_normalize::normalize_json_keys_snake_case;