1use axum::extract::Json;
6use axum::http::StatusCode;
7use bytes::Bytes;
8use prost::Message;
9use serde_json::Value;
10
11use futu_codec::header::ProtoFmtType;
12use futu_proto::qot_get_basic_qot;
13use futu_proto::qot_get_broker;
14use futu_proto::qot_get_capital_distribution;
15use futu_proto::qot_get_capital_flow;
16use futu_proto::qot_get_code_change;
17use futu_proto::qot_get_future_info;
18use futu_proto::qot_get_holding_change_list;
19use futu_proto::qot_get_ipo_list;
20use futu_proto::qot_get_kl;
21use futu_proto::qot_get_market_state;
22use futu_proto::qot_get_option_chain;
23use futu_proto::qot_get_option_expiration_date;
24use futu_proto::qot_get_order_book;
25use futu_proto::qot_get_owner_plate;
26use futu_proto::qot_get_plate_security;
27use futu_proto::qot_get_plate_set;
28use futu_proto::qot_get_price_reminder;
29use futu_proto::qot_get_reference;
30use futu_proto::qot_get_rt;
31use futu_proto::qot_get_security_snapshot;
32use futu_proto::qot_get_static_info;
33use futu_proto::qot_get_sub_info;
34use futu_proto::qot_get_suspend;
35use futu_proto::qot_get_ticker;
36use futu_proto::qot_get_user_security;
37use futu_proto::qot_get_warrant;
38use futu_proto::qot_modify_user_security;
39use futu_proto::qot_request_history_kl;
40use futu_proto::qot_request_history_kl_quota;
41use futu_proto::qot_request_rehab;
42use futu_proto::qot_request_trade_date;
43use futu_proto::qot_set_price_reminder;
44use futu_proto::qot_stock_filter;
45use futu_proto::qot_sub;
46use futu_proto::used_quota;
47use futu_server::conn::IncomingRequest;
48
49use crate::adapter::{
50 self, RestState, apply_known_field_aliases, expand_symbol_shorthand,
51 maybe_wrap_flat_body_as_c2s, normalize_json_keys_snake_case,
52};
53
54type ApiResult = Result<Json<Value>, (StatusCode, Json<Value>)>;
55
56pub const REST_SHARED_CONN: u64 = 0xFFFF_FFFE;
86
87async fn proto_request_shared_conn<Req, Rsp>(
104 state: &RestState,
105 proto_id: u32,
106 json_body: Option<Value>,
107 ctx: Option<&crate::caller_context::CallerContext>,
108) -> ApiResult
109where
110 Req: Message + Default + serde::de::DeserializeOwned,
111 Rsp: Message + Default + serde::Serialize,
112{
113 let req_msg: Req = if let Some(mut body) = json_body {
115 normalize_json_keys_snake_case(&mut body);
116 apply_known_field_aliases(&mut body);
117 maybe_wrap_flat_body_as_c2s(&mut body);
118 expand_symbol_shorthand(&mut body)
120 .map_err(|e| (StatusCode::BAD_REQUEST, Json(validation_error_body(e))))?;
121 if let Some(spec) = futu_surface_spec::lookup_endpoint_by_proto_id(proto_id) {
122 let validation_result = if let Some(c2s) = body.get_mut("c2s") {
123 futu_surface_spec::validate_and_normalize(spec, c2s)
124 } else {
125 futu_surface_spec::validate_and_normalize(spec, &mut body)
126 };
127 if let Err(err) = validation_result {
128 return Err(map_surface_spec_error(spec, err));
129 }
130 }
131 serde_json::from_value(body).map_err(|e| {
132 (
133 StatusCode::BAD_REQUEST,
134 Json(validation_error_body(format!("invalid request body: {e}"))),
135 )
136 })?
137 } else {
138 Req::default()
139 };
140
141 let body = Bytes::from(req_msg.encode_to_vec());
143
144 let incoming = IncomingRequest::builder(
149 REST_SHARED_CONN,
150 proto_id,
151 state.next_serial(),
152 ProtoFmtType::Protobuf,
153 body,
154 )
155 .with_caller_scope(
156 ctx.and_then(|c| c.caller_allowed_acc_ids_arc()),
157 ctx.and_then(|c| c.caller_key_id()),
158 )
159 .build();
160 let resp_bytes = state
161 .router
162 .dispatch(REST_SHARED_CONN, &incoming)
163 .await
164 .ok_or_else(|| {
165 (
166 StatusCode::INTERNAL_SERVER_ERROR,
167 Json(serde_json::json!({
168 "error": "handler returned no response"
169 })),
170 )
171 })?;
172
173 let rsp_msg = Rsp::decode(Bytes::from(resp_bytes)).map_err(|e| {
175 (
176 StatusCode::INTERNAL_SERVER_ERROR,
177 Json(serde_json::json!({
178 "error": format!("failed to decode response: {e}")
179 })),
180 )
181 })?;
182
183 let mut json_rsp = serde_json::to_value(&rsp_msg).map_err(|e| {
185 (
186 StatusCode::INTERNAL_SERVER_ERROR,
187 Json(serde_json::json!({
188 "error": format!("failed to serialize response: {e}")
189 })),
190 )
191 })?;
192
193 wrap_err_code_prefix_inline(&mut json_rsp);
196
197 Ok(Json(json_rsp))
198}
199
200fn map_surface_spec_error(
201 spec: &'static futu_surface_spec::EndpointSpec,
202 err: futu_surface_spec::DispatchError,
203) -> (StatusCode, Json<Value>) {
204 let proto_id = spec
205 .proto_id()
206 .map(|id| id.to_string())
207 .unwrap_or_else(|| "daemon-local".to_string());
208 let ret_msg = format!(
209 "{} (endpoint: {}, proto_id: {})",
210 err, spec.canonical_name, proto_id
211 );
212 let mut body = validation_error_body(ret_msg);
213 let machine_error_field = spec.runtime.error.machine_error_field;
214 if let Some(obj) = body.as_object_mut() {
215 obj.insert(
216 machine_error_field.to_string(),
217 serde_json::json!({
218 "kind": "validation_error",
219 "message": err.to_string(),
220 "endpoint": spec.canonical_name,
221 "proto_id": proto_id,
222 }),
223 );
224 }
225 (StatusCode::BAD_REQUEST, Json(body))
226}
227
228fn validation_error_body(message: impl Into<String>) -> Value {
229 let message = message.into();
230 serde_json::json!({
231 "ret_type": -1,
232 "ret_msg": message,
233 "error": message,
234 })
235}
236
237fn wrap_err_code_prefix_inline(v: &mut Value) {
247 let obj = match v.as_object_mut() {
248 Some(o) => o,
249 None => return,
250 };
251 let is_err = obj
252 .get("ret_type")
253 .and_then(|t| t.as_i64())
254 .map(|t| t != 0)
255 .unwrap_or(false);
256 if !is_err {
257 return;
258 }
259 let raw_msg = obj
260 .get("ret_msg")
261 .and_then(|m| m.as_str())
262 .unwrap_or("")
263 .to_string();
264 if raw_msg.starts_with("[err_code=") {
265 return;
266 }
267 let err_code_label = match obj.get("err_code") {
268 Some(Value::Number(n)) => n
269 .as_i64()
270 .map(|i| i.to_string())
271 .unwrap_or_else(|| "none".to_string()),
272 _ => "none".to_string(),
273 };
274 let new_msg = if raw_msg.is_empty() {
275 format!("[err_code={err_code_label}]")
276 } else {
277 format!("[err_code={err_code_label}] {raw_msg}")
278 };
279 obj.insert("ret_msg".to_string(), Value::String(new_msg));
280}
281
282mod misc;
312mod quotes;
313mod reference;
314mod snapshot;
315mod subscribe;
316
317#[cfg(test)]
318mod tests;
319
320#[cfg(test)]
321use misc::inject_default_is_req_all_conn;
322#[cfg(test)]
323use quotes::{annotate_quote_cache_miss, orderbook_loud_unsub_hint};
324#[cfg(test)]
325use snapshot::{
326 augment_snapshot_with_exchange_code, augment_static_info_with_exchange_code,
327 check_static_info_input,
328};
329#[cfg(test)]
330use subscribe::body_has_sub_or_unsub_flag;
331
332pub use misc::{
333 get_risk_free_rate, get_spread_table, get_ticker_statistic, get_ticker_statistic_detail,
334 list_plates, query_subscription, unsubscribe,
335};
336pub use quotes::{get_basic_qot, get_broker, get_kl, get_order_book, get_rt, get_ticker};
337pub use reference::{
338 get_capital_distribution, get_capital_flow, get_code_change, get_future_info,
339 get_holding_change, get_ipo_list, get_market_state, get_option_chain,
340 get_option_expiration_date, get_owner_plate, get_plate_security, get_plate_set,
341 get_price_reminder, get_reference, get_suspend, get_used_quota, get_user_security, get_warrant,
342 modify_user_security, request_history_kl, request_history_kl_quota, request_rehab,
343 request_trading_days, set_price_reminder, stock_filter,
344};
345pub use snapshot::{get_snapshot, get_static_info};
346pub use subscribe::{get_sub_info, subscribe};