futu_rest/routes/qot/
subscribe.rs1use axum::extract::{Extension, Json, State};
6use axum::http::StatusCode;
7use futu_auth::KeyRecord;
8use serde_json::Value;
9use std::sync::Arc;
10
11use crate::caller_context::CallerContext;
12use futu_core::proto_id;
13
14use super::*;
15
16pub async fn subscribe(
17 State(state): State<RestState>,
18 rec: Option<Extension<Arc<KeyRecord>>>,
19 Json(body): Json<Value>,
20) -> ApiResult {
21 if !body_has_sub_or_unsub_flag(&body) {
24 return Err((
25 StatusCode::BAD_REQUEST,
26 Json(serde_json::json!({
27 "error": "/api/subscribe: 必须显式传 `is_sub_or_un_sub: true` (订阅) \
28 或 `is_sub_or_un_sub: false` (退订). 缺省时 daemon 会按 false \
29 (退订) 处理, 对 invalid ticker silent success (eli S-005 P1). \
30 alias `is_sub` 也接受. 示例: \
31 {\"c2s\": {\"security_list\": [...], \"sub_type_list\": [1], \
32 \"is_sub_or_un_sub\": true}}",
33 "v1.4.104_eli_s005_fix": true,
34 })),
35 ));
36 }
37 if body_requests_unsub_all(&body) {
40 return Err((
41 StatusCode::BAD_REQUEST,
42 Json(serde_json::json!({
43 "error": "/api/subscribe with is_unsub_all=true is **REST process-wide** — \
44 all REST callers share REST_SHARED_CONN (v1.4.90 P0-B), so \
45 清掉一个 = 清掉**所有 REST clients** 的 qot 订阅 (跨 caller \
46 影响). v1.4.104 codex round 2 F5 P2 fix: 默认 reject, 防止 \
47 caller A 意外清掉 caller B 的 subs. 替代方案:\n \
48 (a) 单 symbol unsubscribe: 列具体 sec_list + sub_type_list + \
49 is_sub_or_un_sub=false (per-key safe);\n \
50 (b) MCP / gRPC / WS surface 调 unsub_all (各 caller 有自己 conn_id). \
51 REST 当前没有 process-wide opt-in 或 admin clear endpoint.",
52 "v1.4.104_codex_round2_f5_fix": true,
53 "alternatives": [
54 "use explicit security_list + sub_type_list + is_sub_or_un_sub=false",
55 "use MCP / gRPC / WS for per-caller unsub_all",
56 ],
57 })),
58 ));
59 }
60 let ctx = CallerContext::from_key_record(rec.as_deref().map(|r| r.as_ref()));
63 proto_request_shared_conn::<qot_sub::Request, qot_sub::Response>(
64 &state,
65 proto_id::QOT_SUB,
66 Some(body),
67 Some(&ctx),
68 )
69 .await
70}
71
72pub(super) fn body_requests_unsub_all(body: &Value) -> bool {
75 const UNSUB_ALL_KEYS: &[&str] = &["is_unsub_all", "isUnsubAll", "IsUnsubAll"];
76 let check_obj = |obj: &serde_json::Map<String, Value>| -> bool {
77 UNSUB_ALL_KEYS
78 .iter()
79 .any(|k| obj.get(*k) == Some(&Value::Bool(true)))
80 };
81 match body {
82 Value::Object(top) => {
83 if check_obj(top) {
84 return true;
85 }
86 if let Some(Value::Object(c2s)) = top.get("c2s")
87 && check_obj(c2s)
88 {
89 return true;
90 }
91 false
92 }
93 _ => false,
94 }
95}
96
97pub(super) fn body_has_sub_or_unsub_flag(body: &Value) -> bool {
108 const SUB_OR_UNSUB_KEYS: &[&str] = &[
110 "is_sub_or_un_sub",
111 "isSubOrUnSub",
112 "IsSubOrUnSub",
113 "is_sub",
114 "isSub",
115 "IsSub",
116 ];
117 const UNSUB_ALL_KEYS: &[&str] = &["is_unsub_all", "isUnsubAll", "IsUnsubAll"];
118 let check_obj = |obj: &serde_json::Map<String, Value>| -> bool {
119 for k in SUB_OR_UNSUB_KEYS {
120 if obj.contains_key(*k) {
121 return true;
122 }
123 }
124 for k in UNSUB_ALL_KEYS {
126 if obj.get(*k) == Some(&Value::Bool(true)) {
127 return true;
128 }
129 }
130 false
131 };
132 match body {
133 Value::Object(top) => {
134 if check_obj(top) {
135 return true;
136 }
137 if let Some(Value::Object(c2s)) = top.get("c2s")
138 && check_obj(c2s)
139 {
140 return true;
141 }
142 false
143 }
144 _ => false,
145 }
146}
147
148pub async fn get_sub_info(
160 State(state): State<RestState>,
161 rec: Option<Extension<Arc<KeyRecord>>>,
162) -> ApiResult {
163 let body = serde_json::json!({
165 "c2s": {"is_req_all_conn": true}
166 });
167 let ctx = CallerContext::from_key_record(rec.as_deref().map(|r| r.as_ref()));
171 proto_request_shared_conn::<qot_get_sub_info::Request, qot_get_sub_info::Response>(
172 &state,
173 proto_id::QOT_GET_SUB_INFO,
174 Some(body),
175 Some(&ctx),
176 )
177 .await
178}