1use std::sync::Arc;
4
5use axum::Extension;
6use axum::extract::{Json, Query, State};
7use axum::http::StatusCode;
8use bytes::Bytes;
9use futu_codec::header::ProtoFmtType;
10use futu_server::conn::IncomingRequest;
11use prost::Message;
12use serde::Deserialize;
13use serde_json::Value;
14
15use futu_core::proto_id;
16use futu_proto::get_delay_statistics;
17use futu_proto::get_global_state;
18use futu_proto::get_user_info;
19use futu_proto::test_cmd;
20use futu_backend::proto_internal::futu_token_state;
22use futu_qot::quote_rights::SYS_QUERY_GET_QUOTE_RIGHTS_PROFILE;
23
24use crate::adapter::{self, RestState};
25
26type ApiResult = Result<Json<Value>, (StatusCode, Json<Value>)>;
27
28pub async fn get_global_state(State(state): State<RestState>) -> ApiResult {
33 adapter::proto_request::<get_global_state::Request, get_global_state::Response>(
34 &state,
35 proto_id::GET_GLOBAL_STATE,
36 None,
37 )
38 .await
39}
40
41pub async fn get_user_info(State(state): State<RestState>) -> ApiResult {
43 adapter::proto_request::<get_user_info::Request, get_user_info::Response>(
44 &state,
45 proto_id::GET_USER_INFO,
46 None,
47 )
48 .await
49}
50
51#[derive(Debug, Deserialize)]
52#[serde(deny_unknown_fields)]
53pub struct QuoteRightsQuery {
54 refresh: Option<bool>,
55}
56
57pub async fn get_quote_rights(
59 State(state): State<RestState>,
60 Query(query): Query<QuoteRightsQuery>,
61) -> ApiResult {
62 if query.refresh.unwrap_or(false) {
63 let req = test_cmd::Request {
64 c2s: test_cmd::C2s {
65 cmd: "request_highest_quote_right".to_string(),
66 param_str: None,
67 param_bytes: None,
68 },
69 };
70 let resp: test_cmd::Response = dispatch_proto(
71 &state,
72 proto_id::TEST_CMD,
73 req,
74 "request_highest_quote_right",
75 )
76 .await?;
77 if resp.ret_type != 0 {
78 return Err(api_error(
79 StatusCode::BAD_GATEWAY,
80 format!(
81 "request_highest_quote_right ret_type={} msg={}",
82 resp.ret_type,
83 resp.ret_msg.unwrap_or_default()
84 ),
85 ));
86 }
87 }
88
89 let req = test_cmd::Request {
90 c2s: test_cmd::C2s {
91 cmd: SYS_QUERY_GET_QUOTE_RIGHTS_PROFILE.to_string(),
92 param_str: None,
93 param_bytes: None,
94 },
95 };
96 let resp: test_cmd::Response = dispatch_proto(
97 &state,
98 proto_id::TEST_CMD,
99 req,
100 SYS_QUERY_GET_QUOTE_RIGHTS_PROFILE,
101 )
102 .await?;
103 if resp.ret_type != 0 {
104 return Err(api_error(
105 StatusCode::BAD_GATEWAY,
106 format!(
107 "{} ret_type={} msg={}",
108 SYS_QUERY_GET_QUOTE_RIGHTS_PROFILE,
109 resp.ret_type,
110 resp.ret_msg.unwrap_or_default()
111 ),
112 ));
113 }
114 let json = resp.s2c.and_then(|s| s.result_str).ok_or_else(|| {
115 api_error(
116 StatusCode::BAD_GATEWAY,
117 format!("{SYS_QUERY_GET_QUOTE_RIGHTS_PROFILE}: missing result_str"),
118 )
119 })?;
120 serde_json::from_str::<Value>(&json).map(Json).map_err(|e| {
121 api_error(
122 StatusCode::INTERNAL_SERVER_ERROR,
123 format!("parse {SYS_QUERY_GET_QUOTE_RIGHTS_PROFILE}: {e}"),
124 )
125 })
126}
127
128async fn dispatch_proto<Req, Rsp>(
129 state: &RestState,
130 proto_id: u32,
131 req: Req,
132 label: &str,
133) -> Result<Rsp, (StatusCode, Json<Value>)>
134where
135 Req: Message,
136 Rsp: Message + Default,
137{
138 let incoming = IncomingRequest::builder(
139 state.next_conn_id(),
140 proto_id,
141 state.next_serial(),
142 ProtoFmtType::Protobuf,
143 Bytes::from(req.encode_to_vec()),
144 )
145 .build();
146 let resp_bytes = state
147 .router
148 .dispatch(incoming.conn_id, &incoming)
149 .await
150 .ok_or_else(|| {
151 api_error(
152 StatusCode::INTERNAL_SERVER_ERROR,
153 format!("{label}: handler returned no response"),
154 )
155 })?;
156 Rsp::decode(Bytes::from(resp_bytes)).map_err(|e| {
157 api_error(
158 StatusCode::INTERNAL_SERVER_ERROR,
159 format!("decode {label}: {e}"),
160 )
161 })
162}
163
164fn api_error(status: StatusCode, message: String) -> (StatusCode, Json<Value>) {
165 (
166 status,
167 Json(serde_json::json!({
168 "ret_type": -1,
169 "ret_msg": message,
170 })),
171 )
172}
173
174pub async fn get_delay_statistics(State(state): State<RestState>) -> ApiResult {
176 adapter::proto_request::<get_delay_statistics::Request, get_delay_statistics::Response>(
177 &state,
178 proto_id::GET_DELAY_STATISTICS,
179 None,
180 )
181 .await
182}
183
184pub async fn get_delay_statistics_post(
190 State(state): State<RestState>,
191 Json(body): Json<Value>,
192) -> ApiResult {
193 adapter::proto_request::<get_delay_statistics::Request, get_delay_statistics::Response>(
194 &state,
195 proto_id::GET_DELAY_STATISTICS,
196 Some(body),
197 )
198 .await
199}
200
201pub async fn ping(State(state): State<RestState>) -> ApiResult {
209 let ok = adapter::proto_request::<get_global_state::Request, get_global_state::Response>(
212 &state,
213 proto_id::GET_GLOBAL_STATE,
214 None,
215 )
216 .await
217 .is_ok();
218
219 Ok(Json(serde_json::json!({
220 "ok": ok,
221 "version": env!("CARGO_PKG_VERSION"),
222 "gateway": "futu-opend-rs",
223 })))
224}
225
226pub async fn push_subscriber_info(
246 State(state): State<RestState>,
247) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
248 if let Some(ref provider) = state.push_health_snapshot_provider {
250 let health = provider();
251 return Ok(Json(serde_json::json!({
252 "ret_type": 0,
253 "ret_msg": "success",
254 "push_health": health,
255 "recommendations": [
256 {
257 "purpose": "查订阅列表 + 全局 quota",
258 "endpoint": "POST /api/query-subscription -d '{}'",
259 "note": "v1.4.83 起默认 all-conn 视图"
260 },
261 {
262 "purpose": "接收 push 数据(quote / tick / orderbook 等)",
263 "endpoint": "WebSocket /ws (支持 Bearer Token 握手)"
264 }
265 ],
266 })));
267 }
268 Ok(Json(serde_json::json!({
270 "ret_type": 0,
271 "ret_msg": "push health snapshot provider 未注入(通常是 embedded \
272 test 场景; 生产 daemon 应有 provider)。",
273 "recommendations": [
274 {
275 "purpose": "查订阅列表 + 全局 quota",
276 "endpoint": "POST /api/query-subscription -d '{}'"
277 },
278 {
279 "purpose": "接收 push 数据",
280 "endpoint": "WebSocket /ws"
281 }
282 ],
283 })))
284}
285
286pub async fn unsub_acc_push(
302 State(state): State<RestState>,
303 rec: Option<Extension<Arc<futu_auth::KeyRecord>>>,
304 Json(mut body): Json<Value>,
305) -> ApiResult {
306 if rec.is_none() {
310 futu_auth::audit::reject(
311 "rest",
312 "/api/unsub-acc-push",
313 "<legacy>",
314 "unsub-acc-push not supported in legacy mode (no keys.json)",
315 );
316 return Err((
317 axum::http::StatusCode::FORBIDDEN,
318 Json(serde_json::json!({
319 "error": "/api/unsub-acc-push: legacy mode (no keys.json) does not support per-key sub state. \
320 Configure keys.json and pass Bearer token to enable.",
321 "ret_type": -1,
322 "hint": "v1.4.103 B9: legacy mode previously returned silent success without revoking. Now loud-reject to surface the limitation."
323 })),
324 ));
325 }
326 crate::adapter::normalize_json_keys_snake_case(&mut body);
328
329 let acc_ids = match crate::routes::trd::extract_acc_id_list(&body) {
331 Ok(acc_ids) => acc_ids,
332 Err(reason) => {
333 let key_id = rec
334 .as_deref()
335 .map(|r| r.as_ref().id.clone())
336 .unwrap_or_else(|| "<legacy>".to_string());
337 futu_auth::audit::reject("rest", "/api/unsub-acc-push", &key_id, &reason);
338 return Err((
339 axum::http::StatusCode::BAD_REQUEST,
340 Json(serde_json::json!({
341 "error": format!("/api/unsub-acc-push: {reason}")
342 })),
343 ));
344 }
345 };
346 if let Err(reason) = crate::routes::trd::validate_sub_acc_push_acc_ids(&acc_ids) {
347 let key_id = rec
348 .as_deref()
349 .map(|r| r.as_ref().id.clone())
350 .unwrap_or_else(|| "<legacy>".to_string());
351 futu_auth::audit::reject("rest", "/api/unsub-acc-push", &key_id, reason);
352 return Err((
353 axum::http::StatusCode::BAD_REQUEST,
354 Json(serde_json::json!({
355 "error": format!("/api/unsub-acc-push: {reason}")
356 })),
357 ));
358 }
359
360 crate::routes::trd::check_per_acc_rate_for_caller(
365 &state.counters,
366 rec.as_deref().map(|r| r.as_ref()),
367 &acc_ids,
368 "/api/unsub-acc-push",
369 )?;
370
371 let _ = body;
376 let daemon_resp: Json<serde_json::Value> = Json(serde_json::json!({
377 "ret_type": 0,
378 "ret_msg": serde_json::Value::Null,
379 "err_code": serde_json::Value::Null,
380 "s2c": {}
381 }));
382
383 if let Some(rec_ref) = rec.as_deref() {
389 let key_id = rec_ref.as_ref().id.clone();
390 crate::adapter::with_rest_acc_subscriptions_write(&state.rest_acc_subscriptions, |subs| {
391 let entry = subs.entry(key_id).or_default();
394 for &acc_id in &acc_ids {
395 entry.remove(&acc_id);
396 }
397 });
399 }
400
401 Ok(daemon_resp)
402}
403
404#[derive(Debug, Deserialize, Default)]
428#[serde(deny_unknown_fields)]
429pub struct TokenStateQuery {
430 pub app_id: Option<String>,
432}
433
434pub async fn get_token_state(
435 State(state): State<RestState>,
436 Query(q): Query<TokenStateQuery>,
437 body: Option<Json<Value>>,
438) -> ApiResult {
439 let body_val = match body {
441 Some(Json(mut v)) => {
442 if let Some(qs_app) = q.app_id.as_ref()
444 && let Some(map) = v.as_object_mut()
445 {
446 let c2s_has = map
447 .get("c2s")
448 .and_then(|c| c.as_object())
449 .is_some_and(|c| c.contains_key("app_id") || c.contains_key("appId"));
450 let top_has = map.contains_key("app_id") || map.contains_key("appId");
451 if !c2s_has && !top_has {
452 map.entry("c2s".to_string())
453 .or_insert_with(|| serde_json::json!({}))
454 .as_object_mut()
455 .map(|c| {
456 c.insert(
457 "app_id".to_string(),
458 serde_json::Value::String(qs_app.clone()),
459 )
460 });
461 }
462 }
463 Some(v)
464 }
465 None => {
466 q.app_id
468 .as_ref()
469 .map(|app_id| serde_json::json!({"c2s": {"app_id": app_id}}))
470 }
471 };
472 adapter::proto_request::<
473 futu_token_state::DaemonGetTokenStateReq,
474 futu_token_state::DaemonGetTokenStateRsp,
475 >(&state, proto_id::GET_TOKEN_STATE, body_val)
476 .await
477}
478
479#[cfg(test)]
480mod tests;