Skip to main content

futu_grpc/
server.rs

1//! gRPC 服务实现
2//!
3//! FutuOpenD 服务通过通用的 proto_id + body 方式,
4//! 将所有请求转发到现有的 RequestRouter。
5//! 支持流式推送:行情、交易、广播事件通过 SubscribePush 接口推送给客户端。
6//!
7//! ## v1.4.106 codex 0517 ζ25-redo F2: stateful QOT stable identity
8//!
9//! gRPC `request()` 与 `subscribe_push()` 共享同一身份派生函数
10//! [`auth::derive_grpc_conn_id`]:从 `(Bearer token, optional grpc-session-id
11//! metadata)` 派生 deterministic stable conn_id(在
12//! [`auth::GRPC_STABLE_CONN_NAMESPACE`] 即 bit 62 namespace 内)。
13//!
14//! 这让同一 caller 的连续 RPC 命中同一 SubscriptionManager / cache 状态:
15//! - subscribe → unsubscribe / get_sub_info / query_subscription 全对齐
16//! - quote cache 命中(per-conn cache 不再每次 RPC miss)
17//! - QOT push fanout per-conn filter 能找到 caller
18//!
19//! 不同 Bearer / 不同 session_id → 自然隔离(caller 之间互不影响)。
20//! Legacy mode(KeyStore 未配置 / Bearer 缺失)→ 共享一个固定 conn_id(
21//! 全 legacy caller 共享同 sub state,对齐"无鉴权配置 = 单租户"语义)。
22//!
23//! 历史:v1.4.105 之前用自增 `conn_id_counter`,每次 RPC 拿新 ID,
24//! 等价 REST v1.4.90 P0-B 之前的 quota 永久泄漏 bug。F2 修复对齐
25//! `REST_SHARED_CONN` 设计哲学,不同点是 gRPC 按 caller 隔离(REST 共享)。
26
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU32, Ordering};
29
30use bytes::Bytes;
31use tokio::sync::{broadcast, mpsc};
32use tokio_stream::wrappers::ReceiverStream;
33use tonic::{Request, Response, Status};
34
35use futu_auth::{KeyStore, RuntimeCounters, Scope};
36use futu_codec::header::ProtoFmtType;
37use futu_server::conn::IncomingRequest;
38use futu_server::push::ExternalPushSink;
39use futu_server::router::RequestRouter;
40
41use crate::auth::{
42    derive_grpc_conn_id, extract_grpc_idempotency_key, extract_grpc_session_id, extract_grpc_token,
43    grpc_status_for,
44};
45use crate::proto::futu_open_d_server::{FutuOpenD, FutuOpenDServer};
46use crate::proto::{FutuRequest, FutuResponse, PushEvent, SubscribePushRequest};
47use futu_auth_pipeline::{
48    AuthDecision, AuthEnvelope, Credential, Endpoint, FilterRegistry, PushEventCtx, RejectKind,
49    SurfaceId, authenticate_request,
50};
51
52/// gRPC 推送广播器
53///
54/// 实现 `ExternalPushSink` trait,接收 PushDispatcher 的推送事件,
55/// 通过 broadcast channel 分发给所有 SubscribePush 流式连接。
56#[derive(Clone)]
57pub struct GrpcPushBroadcaster {
58    tx: broadcast::Sender<PushEvent>,
59}
60
61impl GrpcPushBroadcaster {
62    pub fn new(capacity: usize) -> Self {
63        let (tx, _) = broadcast::channel(capacity);
64        Self { tx }
65    }
66
67    /// 创建接收端
68    pub fn subscribe(&self) -> broadcast::Receiver<PushEvent> {
69        self.tx.subscribe()
70    }
71
72    fn send(&self, event: PushEvent) {
73        let _ = self.tx.send(event);
74    }
75}
76
77impl ExternalPushSink for GrpcPushBroadcaster {
78    /// **v1.4.106 codex 1131 F4 [P1]**: `rehab_type` 透传到 gRPC PushEvent.
79    /// gRPC 当前 broadcast 模型 (所有 qot:read subscriber 收到所有 quote
80    /// event), per-conn (sec_key, sub_type, rehab_type) 三元 filter 是 raw TCP
81    /// 专属. rehab_type 通过 PushEvent 让客户端可见.
82    fn on_quote_push(
83        &self,
84        sec_key: &str,
85        sub_type: i32,
86        rehab_type: i32,
87        proto_id: u32,
88        body: &[u8],
89    ) {
90        self.send(PushEvent {
91            proto_id,
92            sec_key: sec_key.to_string(),
93            sub_type,
94            rehab_type,
95            body: body.to_vec(),
96            event_type: "quote".to_string(),
97            acc_id: 0,
98            trd_market: String::new(), // 行情推送无 trd_market
99        });
100    }
101
102    fn on_broadcast_push(&self, proto_id: u32, body: &[u8]) {
103        self.send(PushEvent {
104            proto_id,
105            sec_key: String::new(),
106            sub_type: 0,
107            rehab_type: 0,
108            body: body.to_vec(),
109            event_type: "notify".to_string(),
110            acc_id: 0,
111            trd_market: String::new(),
112        });
113    }
114
115    /// v1.4.105 D3 (Phase 4) T-B3: trade push trd_market 由 PushDispatcher 一
116    /// 次 decode 后透传, 不再各 sink 独立 decode body. 空 / unknown → 空
117    /// 字符串 (PushEvent proto3 默认值, 老 client 解析兼容).
118    fn on_trade_push(&self, acc_id: u64, proto_id: u32, body: &[u8], trd_market: Option<&str>) {
119        self.send(PushEvent {
120            proto_id,
121            sec_key: String::new(),
122            sub_type: 0,
123            rehab_type: 0,
124            body: body.to_vec(),
125            event_type: "trade".to_string(),
126            acc_id,
127            trd_market: trd_market.unwrap_or("").to_string(),
128        });
129    }
130}
131
132/// gRPC 服务实现
133pub struct FutuGrpcService {
134    router: Arc<RequestRouter>,
135    push_broadcaster: Arc<GrpcPushBroadcaster>,
136    key_store: Arc<KeyStore>,
137    counters: Arc<RuntimeCounters>,
138    /// v1.4.104: response filter 注册中心 (proto 2001 = AccListFilter).
139    /// 加新 filter 在 `FilterRegistry::install_defaults` 注册一次, 4 surface
140    /// 自动生效.
141    filter_registry: Arc<FilterRegistry>,
142    /// v1.4.106 codex 0517 ζ25-redo F2: gRPC `conn_id_counter` 已删除 —
143    /// 自增 conn_id 让同一 caller 连续 RPC 拿不到同 sub state, 等价 REST
144    /// v1.4.90 P0-B 之前的 quota 永久泄漏 bug. 现在改用
145    /// [`auth::derive_grpc_conn_id`] 从 `(bearer, session_id)` 派生
146    /// deterministic stable conn_id, 同 caller 连续 RPC 命中同一 sub state.
147    serial_counter: AtomicU32,
148}
149
150impl FutuGrpcService {
151    pub fn new(router: Arc<RequestRouter>, push_broadcaster: Arc<GrpcPushBroadcaster>) -> Self {
152        Self::with_auth(
153            router,
154            push_broadcaster,
155            Arc::new(KeyStore::empty()),
156            Arc::new(RuntimeCounters::new()),
157        )
158    }
159
160    /// 完整构造:同时接 key_store + counters(v1.0 推荐入口)
161    ///
162    /// `counters` 应由 main 全进程共享:REST / gRPC / MCP 共用一个实例才能保证
163    /// rate limit / 日累计跨接口一致
164    pub fn with_auth(
165        router: Arc<RequestRouter>,
166        push_broadcaster: Arc<GrpcPushBroadcaster>,
167        key_store: Arc<KeyStore>,
168        counters: Arc<RuntimeCounters>,
169    ) -> Self {
170        Self {
171            router,
172            push_broadcaster,
173            key_store,
174            counters,
175            filter_registry: Arc::new(FilterRegistry::with_defaults()),
176            serial_counter: AtomicU32::new(1),
177        }
178    }
179
180    fn next_serial(&self) -> u32 {
181        self.serial_counter.fetch_add(1, Ordering::Relaxed)
182    }
183}
184
185#[tonic::async_trait]
186impl FutuOpenD for FutuGrpcService {
187    /// 通用请求-响应
188    async fn request(
189        &self,
190        request: Request<FutuRequest>,
191    ) -> Result<Response<FutuResponse>, Status> {
192        // v1.4.104: 走 futu_auth_pipeline::authenticate_request 单一函数,
193        // 不再 inline authenticate / check_scope / rate gate / body-aware /
194        // audit. surface adapter 极薄 — 仅 transport extract + RejectKind →
195        // Status 翻译.
196
197        let proto_id = request.get_ref().proto_id;
198        if proto_id == 0 {
199            return Err(Status::invalid_argument("proto_id is required"));
200        }
201        // v1.4.106 codex 0532 F3 (P2): daemon-internal proto_id (高位
202        // 0x8000_0000 bit) 绝不应从 gRPC 公开 surface 进入 — 仅 REST handler
203        // 内部合成给 router. 显式 reject + audit, 防探测 daemon 内部 routing.
204        if futu_auth::is_internal_proto_id(proto_id) {
205            tracing::warn!(
206                proto_id,
207                "rejecting daemon-internal proto_id at gRPC public surface (codex 0532 F3)"
208            );
209            return Err(Status::permission_denied(
210                "daemon-internal proto_id not allowed on public surface",
211            ));
212        }
213
214        // 1. extract token + session (owned String 避免 borrow vs move 冲突).
215        // v1.4.106 codex 0517 ζ25-redo F2: 同时拿 grpc-session-id metadata,
216        // 派生 stateful QOT stable conn_id (derive_grpc_conn_id).
217        let token = extract_grpc_token(&request);
218        let session_id = extract_grpc_session_id(&request);
219        let idempotency_key = extract_grpc_idempotency_key(&request);
220        let stable_conn_id = derive_grpc_conn_id(token.as_deref(), session_id.as_deref());
221        // v1.4.110 Surface Spec v2: gRPC generic proto path is spec-owned.
222        // Unknown proto_id must not bypass EndpointSpec into auth/router.
223        let Some(spec) = futu_surface_spec::lookup_endpoint_by_proto_id(proto_id) else {
224            tracing::warn!(
225                proto_id,
226                "rejecting gRPC request for proto_id not declared in EndpointSpec"
227            );
228            return Err(Status::invalid_argument(
229                "proto_id is not declared in endpoint spec",
230            ));
231        };
232        match spec.grpc_exposure() {
233            futu_surface_spec::SurfaceExposure::Exposed(
234                futu_surface_spec::GrpcSurface::GenericProtoRequest,
235            ) => {}
236            futu_surface_spec::SurfaceExposure::NotExposed { reason } => {
237                tracing::warn!(
238                    proto_id,
239                    endpoint = spec.canonical_name,
240                    reason,
241                    "rejecting gRPC request for endpoint not exposed to gRPC"
242                );
243                return Err(Status::permission_denied(reason));
244            }
245        }
246        let needed_scope = Some(spec.runtime.scope);
247        let endpoint_name = spec.canonical_name;
248        tracing::debug!(
249            proto_id,
250            endpoint = endpoint_name,
251            "gRPC request dispatch (Layer 2 spec lookup)"
252        );
253        let req_inner = request.into_inner();
254
255        // 2. 构 credential + envelope, 调 pipeline
256        let credential = match token.as_deref() {
257            Some(t) => Credential::Bearer(t),
258            None => Credential::None,
259        };
260        let env = AuthEnvelope {
261            surface: SurfaceId::Grpc,
262            endpoint: Endpoint::Proto(proto_id),
263            needed_scope,
264            credential,
265            proto_id: Some(proto_id),
266            body: &req_inner.body,
267            explicit_acc_id: None,
268            explicit_ctx: None,
269            commit_rate: true, // gRPC middleware 层 commit rate (trade:real)
270            audit_emit: true,
271        };
272        let (allowed_for_filter, caller_rec) =
273            match authenticate_request(&self.key_store, &self.counters, env) {
274                AuthDecision::Reject { kind, reason, .. } => {
275                    return Err(grpc_status_for(kind, reason));
276                }
277                AuthDecision::Allow {
278                    allowed_acc_ids,
279                    rec,
280                    ..
281                } => (allowed_acc_ids, rec),
282            };
283
284        // 3. dispatch + response filter
285        // v1.4.105 D2 T-A1 fix: caller_allowed_acc_ids 从 pipeline allow decision
286        // 真填进 IncomingRequest, 让 dispatch handler (e.g. SubAccPushHandler)
287        // 端 enforce per-acc whitelist defense-in-depth.
288        // codex 0522 F1 v1.4.106: 同步填 caller_key_id 让 cross-surface handler
289        // 能识别 gRPC caller.
290        let caller_allowed = allowed_for_filter
291            .as_ref()
292            .map(|s| std::sync::Arc::new(s.clone()));
293        let caller_key_id = caller_rec.as_ref().map(|r| r.id.clone());
294        let incoming = IncomingRequest::builder(
295            stable_conn_id,
296            proto_id,
297            self.next_serial(),
298            ProtoFmtType::Protobuf,
299            Bytes::from(req_inner.body),
300        )
301        .with_idempotency_key(idempotency_key)
302        .with_caller_scope(caller_allowed, caller_key_id)
303        .build();
304
305        match self.router.dispatch(incoming.conn_id, &incoming).await {
306            Some(resp_bytes) => {
307                // v1.4.104: 用 FilterRegistry (单一注册表) 替代 inline
308                // filter_acc_list_response. 加新 filter (e.g. cash-flow) 只
309                // 在 registry 注册一次, 4 surface 自动生效.
310                let filtered_body =
311                    self.filter_registry
312                        .apply(proto_id, resp_bytes, allowed_for_filter.as_ref());
313                Ok(Response::new(FutuResponse {
314                    ret_type: 0,
315                    ret_msg: String::new(),
316                    proto_id,
317                    body: filtered_body,
318                }))
319            }
320            None => Ok(Response::new(FutuResponse {
321                ret_type: -1,
322                ret_msg: "handler returned no response".to_string(),
323                proto_id,
324                body: Vec::new(),
325            })),
326        }
327    }
328
329    type SubscribePushStream = ReceiverStream<Result<PushEvent, Status>>;
330
331    /// 流式推送:客户端建立连接后持续接收行情、交易、广播推送
332    ///
333    /// v1.1:按订阅 key 的 scope 过滤推送 —— `qot:read`-only 的 key 不会收到
334    /// `trade` 类(账户交易回报),对齐 REST `/ws` v0.9.0 加的 push filter。
335    ///
336    /// ## v1.4.104 阶段 7-2: pipeline 委托
337    ///
338    /// **handshake**: pipeline 调一次 with `Endpoint::Event("subscribe")` +
339    /// `needed_scope=None` (跳 scope check, 走 OR 语义手工 check). Allow → 拿
340    /// rec snapshot. Reject (Bearer invalid / expired) → translate to gRPC Status.
341    ///
342    /// **per-event filter**: stream 内每 event 调一次 pipeline with
343    /// `Credential::PreVerified(rec)` + `Endpoint::Event(event_type)` +
344    /// `needed_scope=Some(scope_for_event(...))` + `explicit_acc_id` (trade event
345    /// 给 event.acc_id, 其他不传) + `audit_emit=false` (避免每 event 一条 audit
346    /// 把日志冲爆) + `commit_rate=false` (push 不计 rate). Reject → silent drop +
347    /// `metrics::bump_ws_filtered`. Allow → forward.
348    ///
349    /// 与 v1.4.103 行为 byte-identical: handshake qot:read OR acc:read, per-event
350    /// scope match + trade acc_id whitelist (受限 key + acc_id=0 仍 drop, 由 pipeline
351    /// `body_aware::CheckCtx { acc_id: Some(0) }` + `allowed_acc_ids` 非空 → reject 实现).
352    async fn subscribe_push(
353        &self,
354        request: Request<SubscribePushRequest>,
355    ) -> Result<Response<Self::SubscribePushStream>, Status> {
356        // v1.4.106 codex 1125 F6 [P2]: 显式 notify subscription opt-in.
357        // 对齐 C++ raw TCP `IsConnSubRecvNotify` 语义 (broadcast push 必须显式
358        // sub 才下发).
359        let notify_subscribe = request.get_ref().notify_subscribe;
360
361        // ── handshake: pipeline 调一次拿 caller-key + audit 一次 ──────────────────
362        //
363        // 用 `needed_scope=None` 跳 pipeline scope check (因为我们要 qot:read OR
364        // acc:read 双 OR 语义, 不是单 scope match). Bearer 提取 / verify / expiry
365        // 都走 pipeline (单一 source).
366        //
367        // v1.4.106 codex 0517 ζ25-redo F2: 与 `request()` 共享 stable conn_id.
368        // 这让 ops/log 能把 push stream 和同 caller 的 subscribe request 串起来:
369        // grep `conn_id=<stable>` 看到 subscribe RPC + push event filter 同 id.
370        let token = extract_grpc_token(&request);
371        let session_id = extract_grpc_session_id(&request);
372        let stable_conn_id = derive_grpc_conn_id(token.as_deref(), session_id.as_deref());
373        let credential = match token.as_deref() {
374            Some(t) => Credential::Bearer(t),
375            None => Credential::None,
376        };
377        let env = AuthEnvelope {
378            surface: SurfaceId::Grpc,
379            endpoint: Endpoint::Event("subscribe_push"),
380            needed_scope: None, // OR 语义手工 check, pipeline 不参与
381            credential,
382            proto_id: None,
383            body: &[],
384            explicit_acc_id: None,
385            explicit_ctx: None,
386            commit_rate: false,
387            audit_emit: true, // handshake 一次 audit
388        };
389        let rec_snapshot = match authenticate_request(&self.key_store, &self.counters, env) {
390            AuthDecision::Reject { kind, reason, .. } => {
391                return Err(grpc_status_for(kind, reason));
392            }
393            AuthDecision::Allow { rec, .. } => rec,
394        };
395
396        // OR 语义 scope check: legacy mode (rec=None) 全放行;
397        // scope mode 必须持 qot:read OR acc:read 任一.
398        let (scopes, key_id, allowed_acc_ids) = match rec_snapshot.as_ref() {
399            Some(rec) => {
400                let qot_ok = rec.scopes.contains(&Scope::QotRead);
401                let acc_ok = rec.scopes.contains(&Scope::AccRead);
402                if !qot_ok && !acc_ok {
403                    futu_auth::audit::reject(
404                        "grpc",
405                        "event=subscribe_push",
406                        &rec.id,
407                        "missing scope qot:read OR acc:read",
408                    );
409                    return Err(grpc_status_for(
410                        RejectKind::Forbidden,
411                        "missing scope qot:read OR acc:read".to_string(),
412                    ));
413                }
414                (
415                    rec.scopes.clone(),
416                    rec.id.clone(),
417                    rec.allowed_acc_ids.clone(),
418                )
419            }
420            None => (
421                // legacy: 全 scope + 无 acc 限制
422                [
423                    Scope::QotRead,
424                    Scope::AccRead,
425                    Scope::TradeSimulate,
426                    Scope::TradeReal,
427                ]
428                .into_iter()
429                .collect::<std::collections::HashSet<Scope>>(),
430                "<none>".to_string(),
431                None,
432            ),
433        };
434
435        let (tx, rx) = mpsc::channel(256);
436        let mut push_rx = self.push_broadcaster.subscribe();
437
438        tracing::info!(
439            key_id = %key_id,
440            // v1.4.106 codex 0517 ζ25-redo F2: stable conn_id 让 subscribe RPC
441            // 和 push stream 在 log 里可关联.
442            conn_id = stable_conn_id,
443            scopes = ?scopes,
444            allowed_acc_ids = ?allowed_acc_ids.as_ref().map(|s| s.len()),
445            "gRPC client subscribed to push events",
446        );
447
448        // ── per-event filter: pipeline 二次调 (audit_emit=false 防日志冲爆) ───────
449        //
450        // 每 event 调一次 pipeline:
451        //   - Credential::PreVerified(rec) 复用 handshake 已 verify 的 rec
452        //   - Endpoint::Event(event_type) 给 audit label (虽然 audit_emit=false)
453        //   - needed_scope = scope_for_event(event_type)
454        //   - explicit_acc_id = trade event 给 event.acc_id, 其他 None
455        //
456        // legacy mode (rec_snapshot=None) 不调 pipeline (没必要, 全放行).
457        let key_store_arc = self.key_store.clone();
458        let counters_arc = self.counters.clone();
459        // v1.4.105 D3 (Phase 4): clone filter_registry for per-event Layer 3
460        // (allowed_markets) check + Layer 1/2 reuse.
461        let filter_registry_arc = self.filter_registry.clone();
462        tokio::spawn(async move {
463            loop {
464                match push_rx.recv().await {
465                    Ok(event) => {
466                        // legacy fast-path: rec=None → 全放行, scope check / acc_id
467                        // whitelist 都不需要走 pipeline.
468                        let allow_event = if let Some(rec) = rec_snapshot.as_ref() {
469                            let needed = scope_for_event(&event.event_type);
470                            let explicit_acc_id = if event.event_type == "trade" {
471                                Some(event.acc_id)
472                            } else {
473                                None
474                            };
475                            let env = AuthEnvelope {
476                                surface: SurfaceId::Grpc,
477                                endpoint: Endpoint::Event(&event.event_type),
478                                needed_scope: Some(needed),
479                                credential: Credential::PreVerified(rec.clone()),
480                                proto_id: None,
481                                body: &[],
482                                explicit_acc_id,
483                                explicit_ctx: None,
484                                commit_rate: false, // push 不计 rate
485                                audit_emit: false,  // per-event 不 audit, 避免冲爆
486                            };
487                            matches!(
488                                authenticate_request(&key_store_arc, &counters_arc, env),
489                                AuthDecision::Allow { .. }
490                            )
491                        } else {
492                            true
493                        };
494
495                        if !allow_event {
496                            // metric label 区分 trade acc_id reject vs scope reject
497                            // (近似旧行为: trade 类 reject 标 "trade_acc_id" 仅当
498                            // acc_id whitelist 非空; 否则按 event_type)
499                            let label = if event.event_type == "trade"
500                                && rec_snapshot
501                                    .as_ref()
502                                    .and_then(|r| r.allowed_acc_ids.as_ref())
503                                    .is_some_and(|s| !s.is_empty())
504                            {
505                                "trade_acc_id"
506                            } else {
507                                event.event_type.as_str()
508                            };
509                            futu_auth::metrics::bump_ws_filtered(label, &key_id);
510                            continue;
511                        }
512
513                        // v1.4.105 D3 (Phase 4): FilterRegistry::should_drop_event
514                        // Layer 3 (allowed_markets). Layer 1 (allowed_acc_ids) 已在
515                        // 上面 pipeline body-aware 跑过, 此处串行 Layer 3 双重
516                        // 防御 + 未来 Layer 扩展自动覆盖.
517                        // ⚠️ UNVERIFIED — pending real-machine verify (HK+US 双
518                        // 账户跨 market 推送, v1.4.106 升级 confidence per
519                        // pitfall #57 backend-semantic risk).
520                        //
521                        // v1.4.105 T-B3: trd_market 改读 PushEvent.trd_market 字段
522                        // (PushDispatcher 端一次 decode), 不再各 surface 独立
523                        // decode body. 空字符串 = 非 trade event / decode 失败 /
524                        // market 未知 → 转 None (Layer 3 不 trigger drop, 向后
525                        // 兼容).
526                        let event_trd_market =
527                            if event.event_type == "trade" && !event.trd_market.is_empty() {
528                                Some(event.trd_market.as_str())
529                            } else {
530                                None
531                            };
532                        let allowed_markets_for_filter = rec_snapshot
533                            .as_ref()
534                            .and_then(|r| r.allowed_markets.as_ref());
535                        let push_ctx = PushEventCtx {
536                            event_type: &event.event_type,
537                            event_acc: if event.event_type == "trade" {
538                                Some(event.acc_id)
539                            } else {
540                                None
541                            },
542                            // Layer 1 已 pipeline 跑, 此处传 None 防双重 reject (LE 之间 OR 短路)
543                            allowed_acc_ids: None,
544                            // gRPC 没 sub_state (REST 特有)
545                            sub_state: None,
546                            // Layer 3 — 新加
547                            event_trd_market,
548                            allowed_markets: allowed_markets_for_filter,
549                        };
550                        if filter_registry_arc.should_drop_event(&push_ctx) {
551                            futu_auth::metrics::bump_ws_filtered("trade_market", &key_id);
552                            continue;
553                        }
554
555                        // v1.4.106 codex 1125 F6 [P2]: notify_subscribe gate.
556                        // 对齐 C++ `IsConnSubRecvNotify` (raw TCP). broadcast notify
557                        // 类 push (e.g. price reminder) 必须 client 显式 sub 才下发.
558                        if event.event_type == "notify" && !notify_subscribe {
559                            futu_auth::metrics::bump_ws_filtered("notify_unsub", &key_id);
560                            continue;
561                        }
562
563                        if tx.send(Ok(event)).await.is_err() {
564                            break; // 客户端断开
565                        }
566                    }
567                    Err(broadcast::error::RecvError::Lagged(n)) => {
568                        tracing::warn!(skipped = n, "gRPC push client lagged, skipped events");
569                        // 继续接收,不断开
570                    }
571                    Err(broadcast::error::RecvError::Closed) => {
572                        break; // 广播器关闭
573                    }
574                }
575            }
576            tracing::info!("gRPC push stream ended");
577        });
578
579        Ok(Response::new(ReceiverStream::new(rx)))
580    }
581}
582
583// v1.4.104: `grpc_handler_full_check` 已删除 — 功能搬到
584// `futu_auth_pipeline::pipeline::authenticate_request` 内的 body-aware step.
585// gRPC `request()` 调一次 pipeline 即覆盖所有 (caller-key + scope + body-aware
586// + audit + rate). 4 surface 共用同一函数, 不再有 hand-copy diverge.
587
588/// gRPC PushEvent 的 event_type → 客户端必须持有的 scope (与 REST `/ws` 对齐).
589/// 用于 SubscribePush stream 内 per-event filter.
590fn scope_for_event(event_type: &str) -> Scope {
591    match event_type {
592        "trade" => Scope::AccRead, // 账户交易回报
593        _ => Scope::QotRead,       // quote / notify / 未知都按行情门槛
594    }
595}
596
597// v1.4.105 D3 (Phase 4) T-B3: 旧 `extract_trd_market_from_trade_event` 已搬到
598// `futu_server::push::extract_trd_market_from_trade_body` + PushDispatcher 端
599// 一次 decode 后透传给 PushEvent.trd_market 字段, 4 surface (gRPC / REST WS /
600// MCP) 共用. gRPC subscribe_push 现读 event.trd_market 字段不再 decode body.
601//
602// 这里若想要静态 import 提示, 可保留:
603//
604//     use futu_server::push::extract_trd_market_from_trade_body;
605//
606// 但目前 server.rs 不再调用, 全凭 PushEvent 字段透传, 故不 import.
607
608#[cfg(test)]
609mod tests;
610
611/// 构建 gRPC 服务(供外部调用 tonic Server 使用)
612pub fn build_service(
613    router: Arc<RequestRouter>,
614    push_broadcaster: Arc<GrpcPushBroadcaster>,
615) -> FutuOpenDServer<FutuGrpcService> {
616    FutuOpenDServer::new(FutuGrpcService::new(router, push_broadcaster))
617}
618
619/// 构建 gRPC 服务(带 KeyStore 鉴权 + 共享限额 counters)
620pub fn build_service_with_auth(
621    router: Arc<RequestRouter>,
622    push_broadcaster: Arc<GrpcPushBroadcaster>,
623    key_store: Arc<KeyStore>,
624    counters: Arc<RuntimeCounters>,
625) -> FutuOpenDServer<FutuGrpcService> {
626    FutuOpenDServer::new(FutuGrpcService::with_auth(
627        router,
628        push_broadcaster,
629        key_store,
630        counters,
631    ))
632}
633
634/// 启动 gRPC 服务
635pub async fn start(
636    listen_addr: &str,
637    router: Arc<RequestRouter>,
638    push_broadcaster: Arc<GrpcPushBroadcaster>,
639) -> Result<(), Box<dyn std::error::Error>> {
640    start_with_auth(
641        listen_addr,
642        router,
643        push_broadcaster,
644        Arc::new(KeyStore::empty()),
645        Arc::new(RuntimeCounters::new()),
646    )
647    .await
648}
649
650/// 启动 gRPC 服务(带 KeyStore 鉴权 + 共享限额 counters)
651pub async fn start_with_auth(
652    listen_addr: &str,
653    router: Arc<RequestRouter>,
654    push_broadcaster: Arc<GrpcPushBroadcaster>,
655    key_store: Arc<KeyStore>,
656    counters: Arc<RuntimeCounters>,
657) -> Result<(), Box<dyn std::error::Error>> {
658    let addr = listen_addr
659        .parse()
660        .map_err(|e| format!("invalid addr: {e}"))?;
661    if !key_store.is_configured() {
662        tracing::warn!(
663            "gRPC server running WITHOUT API key auth (legacy mode); \
664             all RPCs are open. Pass --grpc-keys-file to enable scope-based auth."
665        );
666    }
667    let service = build_service_with_auth(router, push_broadcaster, key_store, counters);
668    tracing::info!(addr = %listen_addr, "gRPC 服务已启动");
669    tonic::transport::Server::builder()
670        .add_service(service)
671        .serve(addr)
672        .await?;
673    Ok(())
674}