futu_grpc/server.rs
1//! gRPC 服务实现
2//!
3//! FutuOpenD 服务通过通用的 proto_id + body 方式,
4//! 将所有请求转发到现有的 RequestRouter。
5//! 支持流式推送:行情、交易、广播事件通过 SubscribePush 接口推送给客户端。
6//!
7//! ## v1.4.106 codex 0517 ζ25-redo F2: stateful QOT stable identity
8//!
9//! gRPC `request()` 与 `subscribe_push()` 共享同一身份派生函数
10//! [`auth::derive_grpc_conn_id`]:从 `(Bearer token, optional grpc-session-id
11//! metadata)` 派生 deterministic stable conn_id(在
12//! [`auth::GRPC_STABLE_CONN_NAMESPACE`] 即 bit 62 namespace 内)。
13//!
14//! 这让同一 caller 的连续 RPC 命中同一 SubscriptionManager / cache 状态:
15//! - subscribe → unsubscribe / get_sub_info / query_subscription 全对齐
16//! - quote cache 命中(per-conn cache 不再每次 RPC miss)
17//! - QOT push fanout per-conn filter 能找到 caller
18//!
19//! 不同 Bearer / 不同 session_id → 自然隔离(caller 之间互不影响)。
20//! Legacy mode(KeyStore 未配置 / Bearer 缺失)→ 共享一个固定 conn_id(
21//! 全 legacy caller 共享同 sub state,对齐"无鉴权配置 = 单租户"语义)。
22//!
23//! 历史:v1.4.105 之前用自增 `conn_id_counter`,每次 RPC 拿新 ID,
24//! 等价 REST v1.4.90 P0-B 之前的 quota 永久泄漏 bug。F2 修复对齐
25//! `REST_SHARED_CONN` 设计哲学,不同点是 gRPC 按 caller 隔离(REST 共享)。
26
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU32, Ordering};
29
30use bytes::Bytes;
31use tokio::sync::{broadcast, mpsc};
32use tokio_stream::wrappers::ReceiverStream;
33use tonic::{Request, Response, Status};
34
35use futu_auth::{KeyStore, RuntimeCounters, Scope};
36use futu_codec::header::ProtoFmtType;
37use futu_server::conn::IncomingRequest;
38use futu_server::push::ExternalPushSink;
39use futu_server::router::RequestRouter;
40
41use crate::auth::{
42 derive_grpc_conn_id, extract_grpc_idempotency_key, extract_grpc_session_id, extract_grpc_token,
43 grpc_status_for,
44};
45use crate::proto::futu_open_d_server::{FutuOpenD, FutuOpenDServer};
46use crate::proto::{FutuRequest, FutuResponse, PushEvent, SubscribePushRequest};
47use futu_auth_pipeline::{
48 AuthDecision, AuthEnvelope, Credential, Endpoint, FilterRegistry, PushEventCtx, RejectKind,
49 SurfaceId, authenticate_request,
50};
51
52/// gRPC 推送广播器
53///
54/// 实现 `ExternalPushSink` trait,接收 PushDispatcher 的推送事件,
55/// 通过 broadcast channel 分发给所有 SubscribePush 流式连接。
56#[derive(Clone)]
57pub struct GrpcPushBroadcaster {
58 tx: broadcast::Sender<PushEvent>,
59}
60
61impl GrpcPushBroadcaster {
62 pub fn new(capacity: usize) -> Self {
63 let (tx, _) = broadcast::channel(capacity);
64 Self { tx }
65 }
66
67 /// 创建接收端
68 pub fn subscribe(&self) -> broadcast::Receiver<PushEvent> {
69 self.tx.subscribe()
70 }
71
72 fn send(&self, event: PushEvent) {
73 let _ = self.tx.send(event);
74 }
75}
76
77impl ExternalPushSink for GrpcPushBroadcaster {
78 /// **v1.4.106 codex 1131 F4 [P1]**: `rehab_type` 透传到 gRPC PushEvent.
79 /// gRPC 当前 broadcast 模型 (所有 qot:read subscriber 收到所有 quote
80 /// event), per-conn (sec_key, sub_type, rehab_type) 三元 filter 是 raw TCP
81 /// 专属. rehab_type 通过 PushEvent 让客户端可见.
82 fn on_quote_push(
83 &self,
84 sec_key: &str,
85 sub_type: i32,
86 rehab_type: i32,
87 proto_id: u32,
88 body: &[u8],
89 ) {
90 self.send(PushEvent {
91 proto_id,
92 sec_key: sec_key.to_string(),
93 sub_type,
94 rehab_type,
95 body: body.to_vec(),
96 event_type: "quote".to_string(),
97 acc_id: 0,
98 trd_market: String::new(), // 行情推送无 trd_market
99 });
100 }
101
102 fn on_broadcast_push(&self, proto_id: u32, body: &[u8]) {
103 self.send(PushEvent {
104 proto_id,
105 sec_key: String::new(),
106 sub_type: 0,
107 rehab_type: 0,
108 body: body.to_vec(),
109 event_type: "notify".to_string(),
110 acc_id: 0,
111 trd_market: String::new(),
112 });
113 }
114
115 /// v1.4.105 D3 (Phase 4) T-B3: trade push trd_market 由 PushDispatcher 一
116 /// 次 decode 后透传, 不再各 sink 独立 decode body. 空 / unknown → 空
117 /// 字符串 (PushEvent proto3 默认值, 老 client 解析兼容).
118 fn on_trade_push(&self, acc_id: u64, proto_id: u32, body: &[u8], trd_market: Option<&str>) {
119 self.send(PushEvent {
120 proto_id,
121 sec_key: String::new(),
122 sub_type: 0,
123 rehab_type: 0,
124 body: body.to_vec(),
125 event_type: "trade".to_string(),
126 acc_id,
127 trd_market: trd_market.unwrap_or("").to_string(),
128 });
129 }
130}
131
132/// gRPC 服务实现
133pub struct FutuGrpcService {
134 router: Arc<RequestRouter>,
135 push_broadcaster: Arc<GrpcPushBroadcaster>,
136 key_store: Arc<KeyStore>,
137 counters: Arc<RuntimeCounters>,
138 /// v1.4.104: response filter 注册中心 (proto 2001 = AccListFilter).
139 /// 加新 filter 在 `FilterRegistry::install_defaults` 注册一次, 4 surface
140 /// 自动生效.
141 filter_registry: Arc<FilterRegistry>,
142 /// v1.4.106 codex 0517 ζ25-redo F2: gRPC `conn_id_counter` 已删除 —
143 /// 自增 conn_id 让同一 caller 连续 RPC 拿不到同 sub state, 等价 REST
144 /// v1.4.90 P0-B 之前的 quota 永久泄漏 bug. 现在改用
145 /// [`auth::derive_grpc_conn_id`] 从 `(bearer, session_id)` 派生
146 /// deterministic stable conn_id, 同 caller 连续 RPC 命中同一 sub state.
147 serial_counter: AtomicU32,
148}
149
150impl FutuGrpcService {
151 pub fn new(router: Arc<RequestRouter>, push_broadcaster: Arc<GrpcPushBroadcaster>) -> Self {
152 Self::with_auth(
153 router,
154 push_broadcaster,
155 Arc::new(KeyStore::empty()),
156 Arc::new(RuntimeCounters::new()),
157 )
158 }
159
160 /// 完整构造:同时接 key_store + counters(v1.0 推荐入口)
161 ///
162 /// `counters` 应由 main 全进程共享:REST / gRPC / MCP 共用一个实例才能保证
163 /// rate limit / 日累计跨接口一致
164 pub fn with_auth(
165 router: Arc<RequestRouter>,
166 push_broadcaster: Arc<GrpcPushBroadcaster>,
167 key_store: Arc<KeyStore>,
168 counters: Arc<RuntimeCounters>,
169 ) -> Self {
170 Self {
171 router,
172 push_broadcaster,
173 key_store,
174 counters,
175 filter_registry: Arc::new(FilterRegistry::with_defaults()),
176 serial_counter: AtomicU32::new(1),
177 }
178 }
179
180 fn next_serial(&self) -> u32 {
181 self.serial_counter.fetch_add(1, Ordering::Relaxed)
182 }
183}
184
185#[tonic::async_trait]
186impl FutuOpenD for FutuGrpcService {
187 /// 通用请求-响应
188 async fn request(
189 &self,
190 request: Request<FutuRequest>,
191 ) -> Result<Response<FutuResponse>, Status> {
192 // v1.4.104: 走 futu_auth_pipeline::authenticate_request 单一函数,
193 // 不再 inline authenticate / check_scope / rate gate / body-aware /
194 // audit. surface adapter 极薄 — 仅 transport extract + RejectKind →
195 // Status 翻译.
196
197 let proto_id = request.get_ref().proto_id;
198 if proto_id == 0 {
199 return Err(Status::invalid_argument("proto_id is required"));
200 }
201 // v1.4.106 codex 0532 F3 (P2): daemon-internal proto_id (高位
202 // 0x8000_0000 bit) 绝不应从 gRPC 公开 surface 进入 — 仅 REST handler
203 // 内部合成给 router. 显式 reject + audit, 防探测 daemon 内部 routing.
204 if futu_auth::is_internal_proto_id(proto_id) {
205 tracing::warn!(
206 proto_id,
207 "rejecting daemon-internal proto_id at gRPC public surface (codex 0532 F3)"
208 );
209 return Err(Status::permission_denied(
210 "daemon-internal proto_id not allowed on public surface",
211 ));
212 }
213
214 // 1. extract token + session (owned String 避免 borrow vs move 冲突).
215 // v1.4.106 codex 0517 ζ25-redo F2: 同时拿 grpc-session-id metadata,
216 // 派生 stateful QOT stable conn_id (derive_grpc_conn_id).
217 let token = extract_grpc_token(&request);
218 let session_id = extract_grpc_session_id(&request);
219 let idempotency_key = extract_grpc_idempotency_key(&request);
220 let stable_conn_id = derive_grpc_conn_id(token.as_deref(), session_id.as_deref());
221 // v1.4.110 Surface Spec v2: gRPC generic proto path is spec-owned.
222 // Unknown proto_id must not bypass EndpointSpec into auth/router.
223 let Some(spec) = futu_surface_spec::lookup_endpoint_by_proto_id(proto_id) else {
224 tracing::warn!(
225 proto_id,
226 "rejecting gRPC request for proto_id not declared in EndpointSpec"
227 );
228 return Err(Status::invalid_argument(
229 "proto_id is not declared in endpoint spec",
230 ));
231 };
232 match spec.grpc_exposure() {
233 futu_surface_spec::SurfaceExposure::Exposed(
234 futu_surface_spec::GrpcSurface::GenericProtoRequest,
235 ) => {}
236 futu_surface_spec::SurfaceExposure::NotExposed { reason } => {
237 tracing::warn!(
238 proto_id,
239 endpoint = spec.canonical_name,
240 reason,
241 "rejecting gRPC request for endpoint not exposed to gRPC"
242 );
243 return Err(Status::permission_denied(reason));
244 }
245 }
246 let needed_scope = Some(spec.runtime.scope);
247 let endpoint_name = spec.canonical_name;
248 tracing::debug!(
249 proto_id,
250 endpoint = endpoint_name,
251 "gRPC request dispatch (Layer 2 spec lookup)"
252 );
253 let req_inner = request.into_inner();
254
255 // 2. 构 credential + envelope, 调 pipeline
256 let credential = match token.as_deref() {
257 Some(t) => Credential::Bearer(t),
258 None => Credential::None,
259 };
260 let env = AuthEnvelope {
261 surface: SurfaceId::Grpc,
262 endpoint: Endpoint::Proto(proto_id),
263 needed_scope,
264 credential,
265 proto_id: Some(proto_id),
266 body: &req_inner.body,
267 explicit_acc_id: None,
268 explicit_ctx: None,
269 commit_rate: true, // gRPC middleware 层 commit rate (trade:real)
270 audit_emit: true,
271 };
272 let (allowed_for_filter, caller_rec) =
273 match authenticate_request(&self.key_store, &self.counters, env) {
274 AuthDecision::Reject { kind, reason, .. } => {
275 return Err(grpc_status_for(kind, reason));
276 }
277 AuthDecision::Allow {
278 allowed_acc_ids,
279 rec,
280 ..
281 } => (allowed_acc_ids, rec),
282 };
283
284 // 3. dispatch + response filter
285 // v1.4.105 D2 T-A1 fix: caller_allowed_acc_ids 从 pipeline allow decision
286 // 真填进 IncomingRequest, 让 dispatch handler (e.g. SubAccPushHandler)
287 // 端 enforce per-acc whitelist defense-in-depth.
288 // codex 0522 F1 v1.4.106: 同步填 caller_key_id 让 cross-surface handler
289 // 能识别 gRPC caller.
290 let caller_allowed = allowed_for_filter
291 .as_ref()
292 .map(|s| std::sync::Arc::new(s.clone()));
293 let caller_key_id = caller_rec.as_ref().map(|r| r.id.clone());
294 let incoming = IncomingRequest::builder(
295 stable_conn_id,
296 proto_id,
297 self.next_serial(),
298 ProtoFmtType::Protobuf,
299 Bytes::from(req_inner.body),
300 )
301 .with_idempotency_key(idempotency_key)
302 .with_caller_scope(caller_allowed, caller_key_id)
303 .build();
304
305 match self.router.dispatch(incoming.conn_id, &incoming).await {
306 Some(resp_bytes) => {
307 // v1.4.104: 用 FilterRegistry (单一注册表) 替代 inline
308 // filter_acc_list_response. 加新 filter (e.g. cash-flow) 只
309 // 在 registry 注册一次, 4 surface 自动生效.
310 let filtered_body =
311 self.filter_registry
312 .apply(proto_id, resp_bytes, allowed_for_filter.as_ref());
313 Ok(Response::new(FutuResponse {
314 ret_type: 0,
315 ret_msg: String::new(),
316 proto_id,
317 body: filtered_body,
318 }))
319 }
320 None => Ok(Response::new(FutuResponse {
321 ret_type: -1,
322 ret_msg: "handler returned no response".to_string(),
323 proto_id,
324 body: Vec::new(),
325 })),
326 }
327 }
328
329 type SubscribePushStream = ReceiverStream<Result<PushEvent, Status>>;
330
331 /// 流式推送:客户端建立连接后持续接收行情、交易、广播推送
332 ///
333 /// v1.1:按订阅 key 的 scope 过滤推送 —— `qot:read`-only 的 key 不会收到
334 /// `trade` 类(账户交易回报),对齐 REST `/ws` v0.9.0 加的 push filter。
335 ///
336 /// ## v1.4.104 阶段 7-2: pipeline 委托
337 ///
338 /// **handshake**: pipeline 调一次 with `Endpoint::Event("subscribe")` +
339 /// `needed_scope=None` (跳 scope check, 走 OR 语义手工 check). Allow → 拿
340 /// rec snapshot. Reject (Bearer invalid / expired) → translate to gRPC Status.
341 ///
342 /// **per-event filter**: stream 内每 event 调一次 pipeline with
343 /// `Credential::PreVerified(rec)` + `Endpoint::Event(event_type)` +
344 /// `needed_scope=Some(scope_for_event(...))` + `explicit_acc_id` (trade event
345 /// 给 event.acc_id, 其他不传) + `audit_emit=false` (避免每 event 一条 audit
346 /// 把日志冲爆) + `commit_rate=false` (push 不计 rate). Reject → silent drop +
347 /// `metrics::bump_ws_filtered`. Allow → forward.
348 ///
349 /// 与 v1.4.103 行为 byte-identical: handshake qot:read OR acc:read, per-event
350 /// scope match + trade acc_id whitelist (受限 key + acc_id=0 仍 drop, 由 pipeline
351 /// `body_aware::CheckCtx { acc_id: Some(0) }` + `allowed_acc_ids` 非空 → reject 实现).
352 async fn subscribe_push(
353 &self,
354 request: Request<SubscribePushRequest>,
355 ) -> Result<Response<Self::SubscribePushStream>, Status> {
356 // v1.4.106 codex 1125 F6 [P2]: 显式 notify subscription opt-in.
357 // 对齐 C++ raw TCP `IsConnSubRecvNotify` 语义 (broadcast push 必须显式
358 // sub 才下发).
359 let notify_subscribe = request.get_ref().notify_subscribe;
360
361 // ── handshake: pipeline 调一次拿 caller-key + audit 一次 ──────────────────
362 //
363 // 用 `needed_scope=None` 跳 pipeline scope check (因为我们要 qot:read OR
364 // acc:read 双 OR 语义, 不是单 scope match). Bearer 提取 / verify / expiry
365 // 都走 pipeline (单一 source).
366 //
367 // v1.4.106 codex 0517 ζ25-redo F2: 与 `request()` 共享 stable conn_id.
368 // 这让 ops/log 能把 push stream 和同 caller 的 subscribe request 串起来:
369 // grep `conn_id=<stable>` 看到 subscribe RPC + push event filter 同 id.
370 let token = extract_grpc_token(&request);
371 let session_id = extract_grpc_session_id(&request);
372 let stable_conn_id = derive_grpc_conn_id(token.as_deref(), session_id.as_deref());
373 let credential = match token.as_deref() {
374 Some(t) => Credential::Bearer(t),
375 None => Credential::None,
376 };
377 let env = AuthEnvelope {
378 surface: SurfaceId::Grpc,
379 endpoint: Endpoint::Event("subscribe_push"),
380 needed_scope: None, // OR 语义手工 check, pipeline 不参与
381 credential,
382 proto_id: None,
383 body: &[],
384 explicit_acc_id: None,
385 explicit_ctx: None,
386 commit_rate: false,
387 audit_emit: true, // handshake 一次 audit
388 };
389 let rec_snapshot = match authenticate_request(&self.key_store, &self.counters, env) {
390 AuthDecision::Reject { kind, reason, .. } => {
391 return Err(grpc_status_for(kind, reason));
392 }
393 AuthDecision::Allow { rec, .. } => rec,
394 };
395
396 // OR 语义 scope check: legacy mode (rec=None) 全放行;
397 // scope mode 必须持 qot:read OR acc:read 任一.
398 let (scopes, key_id, allowed_acc_ids) = match rec_snapshot.as_ref() {
399 Some(rec) => {
400 let qot_ok = rec.scopes.contains(&Scope::QotRead);
401 let acc_ok = rec.scopes.contains(&Scope::AccRead);
402 if !qot_ok && !acc_ok {
403 futu_auth::audit::reject(
404 "grpc",
405 "event=subscribe_push",
406 &rec.id,
407 "missing scope qot:read OR acc:read",
408 );
409 return Err(grpc_status_for(
410 RejectKind::Forbidden,
411 "missing scope qot:read OR acc:read".to_string(),
412 ));
413 }
414 (
415 rec.scopes.clone(),
416 rec.id.clone(),
417 rec.allowed_acc_ids.clone(),
418 )
419 }
420 None => (
421 // legacy: 全 scope + 无 acc 限制
422 [
423 Scope::QotRead,
424 Scope::AccRead,
425 Scope::TradeSimulate,
426 Scope::TradeReal,
427 ]
428 .into_iter()
429 .collect::<std::collections::HashSet<Scope>>(),
430 "<none>".to_string(),
431 None,
432 ),
433 };
434
435 let (tx, rx) = mpsc::channel(256);
436 let mut push_rx = self.push_broadcaster.subscribe();
437
438 tracing::info!(
439 key_id = %key_id,
440 // v1.4.106 codex 0517 ζ25-redo F2: stable conn_id 让 subscribe RPC
441 // 和 push stream 在 log 里可关联.
442 conn_id = stable_conn_id,
443 scopes = ?scopes,
444 allowed_acc_ids = ?allowed_acc_ids.as_ref().map(|s| s.len()),
445 "gRPC client subscribed to push events",
446 );
447
448 // ── per-event filter: pipeline 二次调 (audit_emit=false 防日志冲爆) ───────
449 //
450 // 每 event 调一次 pipeline:
451 // - Credential::PreVerified(rec) 复用 handshake 已 verify 的 rec
452 // - Endpoint::Event(event_type) 给 audit label (虽然 audit_emit=false)
453 // - needed_scope = scope_for_event(event_type)
454 // - explicit_acc_id = trade event 给 event.acc_id, 其他 None
455 //
456 // legacy mode (rec_snapshot=None) 不调 pipeline (没必要, 全放行).
457 let key_store_arc = self.key_store.clone();
458 let counters_arc = self.counters.clone();
459 // v1.4.105 D3 (Phase 4): clone filter_registry for per-event Layer 3
460 // (allowed_markets) check + Layer 1/2 reuse.
461 let filter_registry_arc = self.filter_registry.clone();
462 tokio::spawn(async move {
463 loop {
464 match push_rx.recv().await {
465 Ok(event) => {
466 // legacy fast-path: rec=None → 全放行, scope check / acc_id
467 // whitelist 都不需要走 pipeline.
468 let allow_event = if let Some(rec) = rec_snapshot.as_ref() {
469 let needed = scope_for_event(&event.event_type);
470 let explicit_acc_id = if event.event_type == "trade" {
471 Some(event.acc_id)
472 } else {
473 None
474 };
475 let env = AuthEnvelope {
476 surface: SurfaceId::Grpc,
477 endpoint: Endpoint::Event(&event.event_type),
478 needed_scope: Some(needed),
479 credential: Credential::PreVerified(rec.clone()),
480 proto_id: None,
481 body: &[],
482 explicit_acc_id,
483 explicit_ctx: None,
484 commit_rate: false, // push 不计 rate
485 audit_emit: false, // per-event 不 audit, 避免冲爆
486 };
487 matches!(
488 authenticate_request(&key_store_arc, &counters_arc, env),
489 AuthDecision::Allow { .. }
490 )
491 } else {
492 true
493 };
494
495 if !allow_event {
496 // metric label 区分 trade acc_id reject vs scope reject
497 // (近似旧行为: trade 类 reject 标 "trade_acc_id" 仅当
498 // acc_id whitelist 非空; 否则按 event_type)
499 let label = if event.event_type == "trade"
500 && rec_snapshot
501 .as_ref()
502 .and_then(|r| r.allowed_acc_ids.as_ref())
503 .is_some_and(|s| !s.is_empty())
504 {
505 "trade_acc_id"
506 } else {
507 event.event_type.as_str()
508 };
509 futu_auth::metrics::bump_ws_filtered(label, &key_id);
510 continue;
511 }
512
513 // v1.4.105 D3 (Phase 4): FilterRegistry::should_drop_event
514 // Layer 3 (allowed_markets). Layer 1 (allowed_acc_ids) 已在
515 // 上面 pipeline body-aware 跑过, 此处串行 Layer 3 双重
516 // 防御 + 未来 Layer 扩展自动覆盖.
517 // ⚠️ UNVERIFIED — pending real-machine verify (HK+US 双
518 // 账户跨 market 推送, v1.4.106 升级 confidence per
519 // pitfall #57 backend-semantic risk).
520 //
521 // v1.4.105 T-B3: trd_market 改读 PushEvent.trd_market 字段
522 // (PushDispatcher 端一次 decode), 不再各 surface 独立
523 // decode body. 空字符串 = 非 trade event / decode 失败 /
524 // market 未知 → 转 None (Layer 3 不 trigger drop, 向后
525 // 兼容).
526 let event_trd_market =
527 if event.event_type == "trade" && !event.trd_market.is_empty() {
528 Some(event.trd_market.as_str())
529 } else {
530 None
531 };
532 let allowed_markets_for_filter = rec_snapshot
533 .as_ref()
534 .and_then(|r| r.allowed_markets.as_ref());
535 let push_ctx = PushEventCtx {
536 event_type: &event.event_type,
537 event_acc: if event.event_type == "trade" {
538 Some(event.acc_id)
539 } else {
540 None
541 },
542 // Layer 1 已 pipeline 跑, 此处传 None 防双重 reject (LE 之间 OR 短路)
543 allowed_acc_ids: None,
544 // gRPC 没 sub_state (REST 特有)
545 sub_state: None,
546 // Layer 3 — 新加
547 event_trd_market,
548 allowed_markets: allowed_markets_for_filter,
549 };
550 if filter_registry_arc.should_drop_event(&push_ctx) {
551 futu_auth::metrics::bump_ws_filtered("trade_market", &key_id);
552 continue;
553 }
554
555 // v1.4.106 codex 1125 F6 [P2]: notify_subscribe gate.
556 // 对齐 C++ `IsConnSubRecvNotify` (raw TCP). broadcast notify
557 // 类 push (e.g. price reminder) 必须 client 显式 sub 才下发.
558 if event.event_type == "notify" && !notify_subscribe {
559 futu_auth::metrics::bump_ws_filtered("notify_unsub", &key_id);
560 continue;
561 }
562
563 if tx.send(Ok(event)).await.is_err() {
564 break; // 客户端断开
565 }
566 }
567 Err(broadcast::error::RecvError::Lagged(n)) => {
568 tracing::warn!(skipped = n, "gRPC push client lagged, skipped events");
569 // 继续接收,不断开
570 }
571 Err(broadcast::error::RecvError::Closed) => {
572 break; // 广播器关闭
573 }
574 }
575 }
576 tracing::info!("gRPC push stream ended");
577 });
578
579 Ok(Response::new(ReceiverStream::new(rx)))
580 }
581}
582
583// v1.4.104: `grpc_handler_full_check` 已删除 — 功能搬到
584// `futu_auth_pipeline::pipeline::authenticate_request` 内的 body-aware step.
585// gRPC `request()` 调一次 pipeline 即覆盖所有 (caller-key + scope + body-aware
586// + audit + rate). 4 surface 共用同一函数, 不再有 hand-copy diverge.
587
588/// gRPC PushEvent 的 event_type → 客户端必须持有的 scope (与 REST `/ws` 对齐).
589/// 用于 SubscribePush stream 内 per-event filter.
590fn scope_for_event(event_type: &str) -> Scope {
591 match event_type {
592 "trade" => Scope::AccRead, // 账户交易回报
593 _ => Scope::QotRead, // quote / notify / 未知都按行情门槛
594 }
595}
596
597// v1.4.105 D3 (Phase 4) T-B3: 旧 `extract_trd_market_from_trade_event` 已搬到
598// `futu_server::push::extract_trd_market_from_trade_body` + PushDispatcher 端
599// 一次 decode 后透传给 PushEvent.trd_market 字段, 4 surface (gRPC / REST WS /
600// MCP) 共用. gRPC subscribe_push 现读 event.trd_market 字段不再 decode body.
601//
602// 这里若想要静态 import 提示, 可保留:
603//
604// use futu_server::push::extract_trd_market_from_trade_body;
605//
606// 但目前 server.rs 不再调用, 全凭 PushEvent 字段透传, 故不 import.
607
608#[cfg(test)]
609mod tests;
610
611/// 构建 gRPC 服务(供外部调用 tonic Server 使用)
612pub fn build_service(
613 router: Arc<RequestRouter>,
614 push_broadcaster: Arc<GrpcPushBroadcaster>,
615) -> FutuOpenDServer<FutuGrpcService> {
616 FutuOpenDServer::new(FutuGrpcService::new(router, push_broadcaster))
617}
618
619/// 构建 gRPC 服务(带 KeyStore 鉴权 + 共享限额 counters)
620pub fn build_service_with_auth(
621 router: Arc<RequestRouter>,
622 push_broadcaster: Arc<GrpcPushBroadcaster>,
623 key_store: Arc<KeyStore>,
624 counters: Arc<RuntimeCounters>,
625) -> FutuOpenDServer<FutuGrpcService> {
626 FutuOpenDServer::new(FutuGrpcService::with_auth(
627 router,
628 push_broadcaster,
629 key_store,
630 counters,
631 ))
632}
633
634/// 启动 gRPC 服务
635pub async fn start(
636 listen_addr: &str,
637 router: Arc<RequestRouter>,
638 push_broadcaster: Arc<GrpcPushBroadcaster>,
639) -> Result<(), Box<dyn std::error::Error>> {
640 start_with_auth(
641 listen_addr,
642 router,
643 push_broadcaster,
644 Arc::new(KeyStore::empty()),
645 Arc::new(RuntimeCounters::new()),
646 )
647 .await
648}
649
650/// 启动 gRPC 服务(带 KeyStore 鉴权 + 共享限额 counters)
651pub async fn start_with_auth(
652 listen_addr: &str,
653 router: Arc<RequestRouter>,
654 push_broadcaster: Arc<GrpcPushBroadcaster>,
655 key_store: Arc<KeyStore>,
656 counters: Arc<RuntimeCounters>,
657) -> Result<(), Box<dyn std::error::Error>> {
658 let addr = listen_addr
659 .parse()
660 .map_err(|e| format!("invalid addr: {e}"))?;
661 if !key_store.is_configured() {
662 tracing::warn!(
663 "gRPC server running WITHOUT API key auth (legacy mode); \
664 all RPCs are open. Pass --grpc-keys-file to enable scope-based auth."
665 );
666 }
667 let service = build_service_with_auth(router, push_broadcaster, key_store, counters);
668 tracing::info!(addr = %listen_addr, "gRPC 服务已启动");
669 tonic::transport::Server::builder()
670 .add_service(service)
671 .serve(addr)
672 .await?;
673 Ok(())
674}