1use std::sync::Arc;
4
5use bytes::Bytes;
6use dashmap::DashMap;
7
8use crate::conn::ClientConn;
9use crate::metrics::GatewayMetrics;
10use crate::subscription::SubscriptionManager;
11
12pub trait ExternalPushSink: Send + Sync {
17 fn on_quote_push(&self, sec_key: &str, sub_type: i32, proto_id: u32, body: &[u8]);
19 fn on_broadcast_push(&self, proto_id: u32, body: &[u8]);
21 fn on_trade_push(&self, acc_id: u64, proto_id: u32, body: &[u8]);
23}
24
25pub struct PushDispatcher {
27 connections: Arc<DashMap<u64, ClientConn>>,
28 subscriptions: Arc<SubscriptionManager>,
29 metrics: Option<Arc<GatewayMetrics>>,
30 external_sinks: Vec<Arc<dyn ExternalPushSink>>,
32}
33
34impl PushDispatcher {
35 pub fn new(
36 connections: Arc<DashMap<u64, ClientConn>>,
37 subscriptions: Arc<SubscriptionManager>,
38 ) -> Self {
39 Self {
40 connections,
41 subscriptions,
42 metrics: None,
43 external_sinks: Vec::new(),
44 }
45 }
46
47 pub fn with_metrics(mut self, metrics: Arc<GatewayMetrics>) -> Self {
49 self.metrics = Some(metrics);
50 self
51 }
52
53 pub fn with_external_sink(mut self, sink: Arc<dyn ExternalPushSink>) -> Self {
55 self.external_sinks.push(sink);
56 self
57 }
58
59 fn record_push(&self) {
60 if let Some(ref m) = self.metrics {
61 m.client_pushes_sent
62 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
63 }
64 }
65
66 pub async fn push_to_conn(&self, conn_id: u64, proto_id: u32, body: Vec<u8>) {
68 if let Some(conn) = self.connections.get(&conn_id) {
69 let frame = conn.make_frame(proto_id, 0, Bytes::from(body));
70 let _ = conn.tx.send(frame).await;
71 self.record_push();
72 }
73 }
74
75 pub async fn push_notify(&self, proto_id: u32, body: Vec<u8>) {
77 let body = Bytes::from(body);
78 for entry in self.connections.iter() {
79 let conn = entry.value();
80 if self.subscriptions.is_subscribed_notify(conn.conn_id) {
81 let frame = conn.make_frame(proto_id, 0, body.clone());
82 let _ = conn.tx.send(frame).await;
83 self.record_push();
84 }
85 }
86 }
87
88 pub async fn push_trd_acc(&self, acc_id: u64, proto_id: u32, body: Vec<u8>) {
90 for sink in &self.external_sinks {
92 sink.on_trade_push(acc_id, proto_id, &body);
93 }
94 let body = Bytes::from(body);
95 let subscribers = self.subscriptions.get_acc_subscribers(acc_id);
96 for conn_id in subscribers {
97 if let Some(conn) = self.connections.get(&conn_id) {
98 let frame = conn.make_frame(proto_id, 0, body.clone());
99 let _ = conn.tx.send(frame).await;
100 self.record_push();
101 }
102 }
103 }
104
105 pub async fn push_broadcast(&self, proto_id: u32, body: Vec<u8>) {
108 for sink in &self.external_sinks {
110 sink.on_broadcast_push(proto_id, &body);
111 }
112 let body = Bytes::from(body);
113 for entry in self.connections.iter() {
114 let conn = entry.value();
115 if self.subscriptions.is_subscribed_notify(conn.conn_id) {
116 let frame = conn.make_frame(proto_id, 0, body.clone());
117 let _ = conn.tx.send(frame).await;
118 self.record_push();
119 }
120 }
121 }
122
123 pub async fn push_qot(&self, security_key: &str, sub_type: i32, proto_id: u32, body: Vec<u8>) {
125 for sink in &self.external_sinks {
127 sink.on_quote_push(security_key, sub_type, proto_id, &body);
128 }
129 let body = Bytes::from(body);
130 let subscribers = self
131 .subscriptions
132 .get_qot_subscribers(security_key, sub_type);
133 for conn_id in subscribers {
134 if let Some(conn) = self.connections.get(&conn_id) {
135 let frame = conn.make_frame(proto_id, 0, body.clone());
136 let _ = conn.tx.send(frame).await;
137 self.record_push();
138 }
139 }
140 }
141}