1use std::collections::{HashMap, HashSet};
29use std::sync::atomic::{AtomicU64, Ordering};
30use std::time::{Duration, Instant};
31
32use futu_core::qot_stock_key::QotSecurityKey;
33use parking_lot::RwLock;
34
35pub struct SubscriptionManager {
37 notify_subs: RwLock<HashSet<u64>>,
39
40 trd_acc_subs: RwLock<HashMap<u64, HashSet<u64>>>,
42
43 qot_subs: RwLock<HashMap<(QotSecurityKey, i32), HashSet<u64>>>,
46
47 qot_push_regs: RwLock<HashMap<(QotSecurityKey, i32, i32), HashSet<u64>>>,
51
52 qot_sub_sessions: RwLock<HashMap<(QotSecurityKey, i32), HashMap<u64, i32>>>,
57
58 qot_orderbook_detail: RwLock<HashMap<QotSecurityKey, HashMap<u64, bool>>>,
61
62 qot_broker_detail: RwLock<HashMap<QotSecurityKey, HashMap<u64, bool>>>,
65
66 total_quota: RwLock<u32>,
70
71 qot_sub_times: RwLock<HashMap<(QotSecurityKey, i32), Instant>>,
75
76 qot_disconnected_conns: RwLock<HashSet<u64>>,
82
83 qot_disconnect_sync_generation: AtomicU64,
89}
90
/// Initial value of the manager's total subscription quota.
pub const TOTAL_QUOTA: u32 = 4_000;

/// Seconds that must have elapsed since a key's recorded subscribe time before the
/// minimum-unsubscribe checks treat the key as eligible.
pub const QOT_MIN_UNSUB_ELAPSED_SECS: u64 = 59;
102
/// Outcome of adding a connection to a quote subscription key.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SubResult {
    /// The key had no subscribers before this connection was added.
    NewGlobal,
    /// The connection was added to a key that already had other subscribers.
    AlreadyGlobal,
    /// The connection was already subscribed to the key; nothing changed.
    NoChange,
}
117
/// Outcome of removing a connection from a quote subscription key.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum UnsubResult {
    /// The connection was removed and the key has no subscribers left.
    LastSubscriber,
    /// The connection was removed but other connections remain subscribed.
    StillSubscribed,
    /// The key was unknown or the connection was not subscribed to it.
    NotSubscribed,
}
130
131impl SubscriptionManager {
132 pub fn new() -> Self {
133 Self {
134 notify_subs: RwLock::new(HashSet::new()),
135 trd_acc_subs: RwLock::new(HashMap::new()),
136 qot_subs: RwLock::new(HashMap::new()),
137 qot_push_regs: RwLock::new(HashMap::new()),
138 qot_sub_sessions: RwLock::new(HashMap::new()),
139 qot_orderbook_detail: RwLock::new(HashMap::new()),
140 qot_broker_detail: RwLock::new(HashMap::new()),
141 total_quota: RwLock::new(TOTAL_QUOTA),
142 qot_sub_times: RwLock::new(HashMap::new()),
143 qot_disconnected_conns: RwLock::new(HashSet::new()),
144 qot_disconnect_sync_generation: AtomicU64::new(0),
145 }
146 }
147
148 pub fn subscribe_notify(&self, conn_id: u64) {
151 self.notify_subs.write().insert(conn_id);
152 }
153
154 pub fn unsubscribe_notify(&self, conn_id: u64) {
155 self.notify_subs.write().remove(&conn_id);
156 }
157
158 pub fn is_subscribed_notify(&self, conn_id: u64) -> bool {
159 self.notify_subs.read().contains(&conn_id)
160 }
161
162 pub fn subscribe_trd_acc(&self, conn_id: u64, acc_id: u64) {
165 self.trd_acc_subs
166 .write()
167 .entry(acc_id)
168 .or_default()
169 .insert(conn_id);
170 }
171
172 pub fn unsubscribe_trd_acc(&self, conn_id: u64, acc_id: u64) {
173 if let Some(subs) = self.trd_acc_subs.write().get_mut(&acc_id) {
174 subs.remove(&conn_id);
175 }
176 }
177
178 pub fn get_acc_subscribers(&self, acc_id: u64) -> Vec<u64> {
179 self.trd_acc_subs
180 .read()
181 .get(&acc_id)
182 .map(|s| s.iter().copied().collect())
183 .unwrap_or_default()
184 }
185
/// Builds a string key of the form `"<market>_<code>:<sub_type>"`.
pub fn make_qot_key(market: i32, code: &str, sub_type: i32) -> String {
    format!("{}_{}:{}", market, code, sub_type)
}
192
193 #[inline]
194 fn broker_key(sec_key: &QotSecurityKey) -> QotSecurityKey {
195 sec_key.clone()
196 }
197
198 pub fn subscribe_qot_broker(
204 &self,
205 conn_id: u64,
206 sec_key: &QotSecurityKey,
207 sub_type: i32,
208 ) -> SubResult {
209 self.subscribe_qot_inner(conn_id, Self::broker_key(sec_key), sub_type)
210 }
211
212 fn subscribe_qot_inner(&self, conn_id: u64, key: QotSecurityKey, sub_type: i32) -> SubResult {
213 self.qot_disconnected_conns.write().remove(&conn_id);
214 let mut qot = self.qot_subs.write();
215 let map_key = (key.clone(), sub_type);
216 let entry = qot.entry(map_key.clone()).or_default();
217 let was_empty_global = entry.is_empty();
218 let inserted = entry.insert(conn_id);
219 if was_empty_global && inserted {
220 self.qot_sub_times.write().insert(map_key, Instant::now());
221 }
222 if !inserted {
223 SubResult::NoChange
224 } else if was_empty_global {
225 SubResult::NewGlobal
226 } else {
227 SubResult::AlreadyGlobal
228 }
229 }
230
231 pub fn unsubscribe_qot_broker(
234 &self,
235 conn_id: u64,
236 sec_key: &QotSecurityKey,
237 sub_type: i32,
238 ) -> UnsubResult {
239 self.unsubscribe_qot_inner(conn_id, Self::broker_key(sec_key), sub_type)
240 }
241
242 fn unsubscribe_qot_inner(
243 &self,
244 conn_id: u64,
245 key: QotSecurityKey,
246 sub_type: i32,
247 ) -> UnsubResult {
248 let map_key = (key.clone(), sub_type);
249 let became_empty;
250 let was_member;
251 {
252 let mut qot = self.qot_subs.write();
253 let entry = match qot.get_mut(&map_key) {
254 Some(e) => e,
255 None => return UnsubResult::NotSubscribed,
256 };
257 was_member = entry.remove(&conn_id);
258 became_empty = entry.is_empty();
259 if became_empty {
260 qot.remove(&map_key);
261 }
262 }
263 if !was_member {
264 return UnsubResult::NotSubscribed;
265 }
266 if became_empty {
267 self.qot_sub_times.write().remove(&map_key);
268 }
269 {
271 let mut sess = self.qot_sub_sessions.write();
272 if let Some(e) = sess.get_mut(&map_key) {
273 e.remove(&conn_id);
274 if e.is_empty() {
275 sess.remove(&map_key);
276 }
277 }
278 }
279 if sub_type == sub_type_orderbook() {
280 let mut ob = self.qot_orderbook_detail.write();
281 if let Some(e) = ob.get_mut(&key) {
282 e.remove(&conn_id);
283 if e.is_empty() {
284 ob.remove(&key);
285 }
286 }
287 }
288 if sub_type == sub_type_broker() {
289 let mut br = self.qot_broker_detail.write();
290 if let Some(e) = br.get_mut(&key) {
291 e.remove(&conn_id);
292 if e.is_empty() {
293 br.remove(&key);
294 }
295 }
296 }
297 if became_empty {
298 UnsubResult::LastSubscriber
299 } else {
300 UnsubResult::StillSubscribed
301 }
302 }
303
304 pub fn is_qot_subscribed_broker(
306 &self,
307 conn_id: u64,
308 sec_key: &QotSecurityKey,
309 sub_type: i32,
310 ) -> bool {
311 self.qot_subs
312 .read()
313 .get(&(Self::broker_key(sec_key), sub_type))
314 .is_some_and(|subs| subs.contains(&conn_id))
315 }
316
317 pub fn is_globally_subscribed_broker(&self, sec_key: &QotSecurityKey, sub_type: i32) -> bool {
320 self.qot_subs
321 .read()
322 .get(&(Self::broker_key(sec_key), sub_type))
323 .is_some_and(|subs| !subs.is_empty())
324 }
325
326 pub fn qot_min_unsub_elapsed_broker(&self, sec_key: &QotSecurityKey, sub_type: i32) -> bool {
328 self.qot_sub_times
329 .read()
330 .get(&(Self::broker_key(sec_key), sub_type))
331 .map(|instant| instant.elapsed() >= Duration::from_secs(QOT_MIN_UNSUB_ELAPSED_SECS))
332 .unwrap_or(true)
333 }
334
335 pub fn qot_min_unsub_remaining_secs_broker(
337 &self,
338 sec_key: &QotSecurityKey,
339 sub_type: i32,
340 ) -> u64 {
341 self.qot_sub_times
342 .read()
343 .get(&(Self::broker_key(sec_key), sub_type))
344 .map(|instant| QOT_MIN_UNSUB_ELAPSED_SECS.saturating_sub(instant.elapsed().as_secs()))
345 .unwrap_or(0)
346 }
347
348 pub fn qot_disconnect_sync_generation(&self) -> u64 {
350 self.qot_disconnect_sync_generation.load(Ordering::SeqCst)
351 }
352
353 fn bump_qot_disconnect_sync_generation(&self) {
354 self.qot_disconnect_sync_generation
355 .fetch_add(1, Ordering::SeqCst);
356 }
357
358 fn conn_has_qot_subs(&self, conn_id: u64) -> bool {
359 self.qot_subs
360 .read()
361 .values()
362 .any(|subs| subs.contains(&conn_id))
363 }
364
365 #[doc(hidden)]
366 pub fn backdate_qot_sub_time_broker_for_test(
367 &self,
368 sec_key: &QotSecurityKey,
369 sub_type: i32,
370 elapsed: Duration,
371 ) {
372 let map_key = (Self::broker_key(sec_key), sub_type);
373 let instant = Instant::now()
374 .checked_sub(elapsed)
375 .unwrap_or_else(Instant::now);
376 self.qot_sub_times.write().insert(map_key, instant);
377 }
378
379 pub fn unsubscribe_all_qot_collect_global_empty(&self, conn_id: u64) -> Vec<(String, i32)> {
384 let mut became_empty: Vec<(String, i32)> = Vec::new();
385 let keys_to_check: Vec<(QotSecurityKey, i32)> = self
386 .qot_subs
387 .read()
388 .iter()
389 .filter(|(_, set)| set.contains(&conn_id))
390 .map(|(k, _)| k.clone())
391 .collect();
392 {
393 let mut qot = self.qot_subs.write();
394 for k in keys_to_check {
395 if let Some(set) = qot.get_mut(&k) {
396 set.remove(&conn_id);
397 if set.is_empty() {
398 became_empty.push((k.0.cache_key(), k.1));
399 let removed_key = k.clone();
400 qot.remove(&k);
401 self.qot_sub_times.write().remove(&removed_key);
402 }
403 }
404 }
405 }
406 {
408 let mut sess = self.qot_sub_sessions.write();
409 sess.retain(|_, m| {
410 m.remove(&conn_id);
411 !m.is_empty()
412 });
413 }
414 {
415 let mut ob = self.qot_orderbook_detail.write();
416 ob.retain(|_, m| {
417 m.remove(&conn_id);
418 !m.is_empty()
419 });
420 }
421 {
422 let mut br = self.qot_broker_detail.write();
423 br.retain(|_, m| {
424 m.remove(&conn_id);
425 !m.is_empty()
426 });
427 }
428 {
429 let mut pr = self.qot_push_regs.write();
430 pr.retain(|_, set| {
431 set.remove(&conn_id);
432 !set.is_empty()
433 });
434 }
435 became_empty
436 }
437
438 pub fn cleanup_due_disconnected_qot(&self) -> Vec<(String, i32)> {
446 let disconnected: Vec<u64> = self.qot_disconnected_conns.read().iter().copied().collect();
447 if disconnected.is_empty() {
448 return Vec::new();
449 }
450
451 let mut to_remove: Vec<(u64, QotSecurityKey, i32)> = Vec::new();
452 {
453 let qot = self.qot_subs.read();
454 let sub_times = self.qot_sub_times.read();
455 for conn_id in &disconnected {
456 for ((key, sub_type), subs) in qot.iter() {
457 if !subs.contains(conn_id) {
458 continue;
459 }
460 let elapsed_ok = sub_times
461 .get(&(key.clone(), *sub_type))
462 .map(|instant| {
463 instant.elapsed() >= Duration::from_secs(QOT_MIN_UNSUB_ELAPSED_SECS)
464 })
465 .unwrap_or(true);
466 if elapsed_ok {
467 to_remove.push((*conn_id, key.clone(), *sub_type));
468 }
469 }
470 }
471 }
472
473 let mut became_empty = Vec::new();
474 for (conn_id, key, sub_type) in to_remove {
475 let display_key = key.cache_key();
476 if matches!(
477 self.unsubscribe_qot_inner(conn_id, key, sub_type),
478 UnsubResult::LastSubscriber
479 ) {
480 became_empty.push((display_key, sub_type));
481 }
482 }
483
484 {
485 let mut disconnected = self.qot_disconnected_conns.write();
486 disconnected.retain(|conn_id| self.conn_has_qot_subs(*conn_id));
487 }
488
489 if !became_empty.is_empty() {
490 self.bump_qot_disconnect_sync_generation();
491 }
492 became_empty
493 }
494
495 pub fn unsubscribe_all_qot_dry_run(&self, conn_id: u64) -> Vec<(String, i32)> {
507 let mut became_empty: Vec<(String, i32)> = Vec::new();
508 let qot = self.qot_subs.read();
509 for ((k, sub_type), set) in qot.iter() {
510 if !set.contains(&conn_id) {
511 continue;
512 }
513 if set.len() == 1 {
515 became_empty.push((k.cache_key(), *sub_type));
516 }
517 }
518 became_empty
519 }
520
521 pub fn unsubscribe_all_qot_commit(&self, conn_id: u64) -> Vec<(String, i32)> {
525 self.unsubscribe_all_qot_collect_global_empty(conn_id)
526 }
527
528 pub fn get_qot_subscribers_broker(&self, sec_key: &QotSecurityKey, sub_type: i32) -> Vec<u64> {
532 self.qot_subs
533 .read()
534 .get(&(Self::broker_key(sec_key), sub_type))
535 .map(|s| s.iter().copied().collect())
536 .unwrap_or_default()
537 }
538
539 pub fn crypto_stock_globally_unsubscribed(&self, stock_id: u64) -> bool {
552 let qot = self.qot_subs.read();
553 !qot.iter()
554 .any(|((key, _sub_type), subs)| !subs.is_empty() && key.stock_key.stock_id == stock_id)
555 }
556
557 pub fn crypto_stock_broker_globally_unsubscribed(&self, stock_id: u64, broker_id: u32) -> bool {
568 let target_broker = std::num::NonZeroU32::new(broker_id);
569 let qot = self.qot_subs.read();
570 !qot.iter().any(|((key, _sub_type), subs)| {
571 !subs.is_empty()
572 && key.stock_key.stock_id == stock_id
573 && key.stock_key.broker_id == target_broker
574 })
575 }
576
577 pub fn register_push_broker(
583 &self,
584 conn_id: u64,
585 sec_key: &QotSecurityKey,
586 sub_type: i32,
587 rehab_type: i32,
588 ) {
589 self.register_push_inner(conn_id, Self::broker_key(sec_key), sub_type, rehab_type)
590 }
591
592 fn register_push_inner(
593 &self,
594 conn_id: u64,
595 key: QotSecurityKey,
596 sub_type: i32,
597 rehab_type: i32,
598 ) {
599 let effective_rehab = if is_kl_sub_type(sub_type) {
600 rehab_type
601 } else {
602 0
603 };
604 self.qot_push_regs
605 .write()
606 .entry((key, sub_type, effective_rehab))
607 .or_default()
608 .insert(conn_id);
609 }
610
611 pub fn unregister_push_broker(
613 &self,
614 conn_id: u64,
615 sec_key: &QotSecurityKey,
616 sub_type: i32,
617 rehab_type: i32,
618 ) {
619 self.unregister_push_inner(conn_id, Self::broker_key(sec_key), sub_type, rehab_type)
620 }
621
622 fn unregister_push_inner(
623 &self,
624 conn_id: u64,
625 key: QotSecurityKey,
626 sub_type: i32,
627 rehab_type: i32,
628 ) {
629 let effective_rehab = if is_kl_sub_type(sub_type) {
630 rehab_type
631 } else {
632 0
633 };
634 let map_key = (key, sub_type, effective_rehab);
635 let mut pr = self.qot_push_regs.write();
636 if let Some(set) = pr.get_mut(&map_key) {
637 set.remove(&conn_id);
638 if set.is_empty() {
639 pr.remove(&map_key);
640 }
641 }
642 }
643
644 pub fn get_qot_push_subscribers_broker(
647 &self,
648 sec_key: &QotSecurityKey,
649 sub_type: i32,
650 rehab_type: i32,
651 ) -> Vec<u64> {
652 self.get_qot_push_subscribers_inner(Self::broker_key(sec_key), sub_type, rehab_type)
653 }
654
655 pub fn get_qot_push_subscribers_by_cache_key(
662 &self,
663 cache_key: &str,
664 sub_type: i32,
665 rehab_type: i32,
666 ) -> Vec<u64> {
667 if QotSecurityKey::parse_cache_key(cache_key).is_none() {
668 return Vec::new();
669 }
670
671 let effective_rehab = if is_kl_sub_type(sub_type) {
672 rehab_type
673 } else {
674 0
675 };
676 let regs = self.qot_push_regs.read();
677 let mut out = Vec::new();
678 for ((key, stored_sub_type, stored_rehab), subscribers) in regs.iter() {
679 if *stored_sub_type != sub_type
680 || *stored_rehab != effective_rehab
681 || key.cache_key() != cache_key
682 {
683 continue;
684 }
685 out.extend(subscribers.iter().copied());
686 }
687
688 out.sort_unstable();
689 out.dedup();
690 out
691 }
692
693 fn get_qot_push_subscribers_inner(
694 &self,
695 key: QotSecurityKey,
696 sub_type: i32,
697 rehab_type: i32,
698 ) -> Vec<u64> {
699 let effective_rehab = if is_kl_sub_type(sub_type) {
700 rehab_type
701 } else {
702 0
703 };
704 self.qot_push_regs
705 .read()
706 .get(&(key, sub_type, effective_rehab))
707 .map(|s| s.iter().copied().collect())
708 .unwrap_or_default()
709 }
710
711 pub fn is_push_registered_any_rehab_broker(
713 &self,
714 conn_id: u64,
715 sec_key: &QotSecurityKey,
716 sub_type: i32,
717 ) -> bool {
718 let broker_key = Self::broker_key(sec_key);
719 let pr = self.qot_push_regs.read();
720 pr.iter().any(|((k, st, _rehab), set)| {
721 k == &broker_key && *st == sub_type && set.contains(&conn_id)
722 })
723 }
724
725 pub fn set_conn_session_broker(
728 &self,
729 conn_id: u64,
730 sec_key: &QotSecurityKey,
731 sub_type: i32,
732 session: i32,
733 ) {
734 self.qot_sub_sessions
735 .write()
736 .entry((Self::broker_key(sec_key), sub_type))
737 .or_default()
738 .insert(conn_id, session);
739 }
740
741 pub fn get_global_session_broker(&self, sec_key: &QotSecurityKey, sub_type: i32) -> i32 {
742 self.qot_sub_sessions
743 .read()
744 .get(&(Self::broker_key(sec_key), sub_type))
745 .map(|m| m.values().copied().max().unwrap_or(1))
746 .unwrap_or(1)
747 }
748
749 pub fn get_conn_session_broker(
751 &self,
752 conn_id: u64,
753 sec_key: &QotSecurityKey,
754 sub_type: i32,
755 ) -> i32 {
756 self.qot_sub_sessions
757 .read()
758 .get(&(Self::broker_key(sec_key), sub_type))
759 .and_then(|m| m.get(&conn_id).copied())
760 .unwrap_or(1)
761 }
762
763 pub fn set_conn_orderbook_detail_broker(
764 &self,
765 conn_id: u64,
766 sec_key: &QotSecurityKey,
767 detail: bool,
768 ) {
769 self.qot_orderbook_detail
770 .write()
771 .entry(Self::broker_key(sec_key))
772 .or_default()
773 .insert(conn_id, detail);
774 }
775
776 pub fn is_global_orderbook_detail_broker(&self, sec_key: &QotSecurityKey) -> bool {
777 self.qot_orderbook_detail
778 .read()
779 .get(&Self::broker_key(sec_key))
780 .map(|m| m.values().any(|&d| d))
781 .unwrap_or(false)
782 }
783
784 pub fn set_conn_broker_detail_broker(
785 &self,
786 conn_id: u64,
787 sec_key: &QotSecurityKey,
788 detail: bool,
789 ) {
790 self.qot_broker_detail
791 .write()
792 .entry(Self::broker_key(sec_key))
793 .or_default()
794 .insert(conn_id, detail);
795 }
796
797 pub fn is_global_broker_detail_broker(&self, sec_key: &QotSecurityKey) -> bool {
798 self.qot_broker_detail
799 .read()
800 .get(&Self::broker_key(sec_key))
801 .map(|m| m.values().any(|&d| d))
802 .unwrap_or(false)
803 }
804
805 pub fn get_conn_used_quota(&self, conn_id: u64) -> u32 {
809 self.qot_subs
810 .read()
811 .iter()
812 .filter(|(_, set)| set.contains(&conn_id))
813 .count() as u32
814 }
815
816 pub fn get_total_used_quota(&self) -> u32 {
819 self.qot_subs.read().len() as u32
820 }
821
822 pub fn get_total_quota(&self) -> u32 {
824 *self.total_quota.read()
825 }
826
827 pub fn set_total_quota_from_backend(&self, value: u32) {
829 let mut q = self.total_quota.write();
830 if *q != value {
831 tracing::info!(
832 old = *q,
833 new = value,
834 "v1.4.106 codex 1131 F5: total_quota updated from backend"
835 );
836 *q = value;
837 }
838 }
839
840 pub fn get_remain_quota(&self) -> u32 {
841 let total = self.get_total_quota();
842 let used = self.get_total_used_quota();
843 total.saturating_sub(used)
844 }
845
846 pub fn get_conn_qot_subs(&self, conn_id: u64) -> HashMap<i32, Vec<String>> {
849 let qot = self.qot_subs.read();
850 let mut result: HashMap<i32, Vec<String>> = HashMap::new();
851 for ((key, sub_type), conn_ids) in qot.iter() {
852 if conn_ids.contains(&conn_id) {
853 result.entry(*sub_type).or_default().push(key.cache_key());
854 }
855 }
856 result
857 }
858
859 pub fn get_all_qot_conn_ids(&self) -> HashSet<u64> {
860 let qot = self.qot_subs.read();
861 let mut ids = HashSet::new();
862 for conn_ids in qot.values() {
863 ids.extend(conn_ids);
864 }
865 ids
866 }
867
868 pub fn get_all_trd_conn_ids(&self) -> HashSet<u64> {
870 let trd = self.trd_acc_subs.read();
871 let mut ids = HashSet::new();
872 for conn_ids in trd.values() {
873 ids.extend(conn_ids);
874 }
875 ids
876 }
877
878 pub fn compute_global_desired_set(&self) -> Vec<(String, i32)> {
882 let qot = self.qot_subs.read();
883 let mut out = Vec::with_capacity(qot.len());
884 for (k, sub_type) in qot.keys() {
885 out.push((k.cache_key(), *sub_type));
886 }
887 out
888 }
889 #[inline]
906 pub fn qot_global_desired_keys(&self) -> Vec<(String, i32)> {
907 self.compute_global_desired_set()
908 }
909
910 pub fn on_disconnect(&self, conn_id: u64) -> Vec<(String, i32)> {
913 self.notify_subs.write().remove(&conn_id);
914
915 {
916 let mut trd = self.trd_acc_subs.write();
917 for subs in trd.values_mut() {
918 subs.remove(&conn_id);
919 }
920 }
921
922 {
923 let mut pr = self.qot_push_regs.write();
924 pr.retain(|_, set| {
925 set.remove(&conn_id);
926 !set.is_empty()
927 });
928 }
929
930 if self.conn_has_qot_subs(conn_id) {
931 self.qot_disconnected_conns.write().insert(conn_id);
932 }
933 self.cleanup_due_disconnected_qot()
934 }
935}
936
937impl Default for SubscriptionManager {
938 fn default() -> Self {
939 Self::new()
940 }
941}
942
/// Sub type code that `unsubscribe_qot_inner` treats as the order-book subscription.
#[inline]
fn sub_type_orderbook() -> i32 {
    const ORDERBOOK: i32 = 2;
    ORDERBOOK
}
947
/// Sub type code that `unsubscribe_qot_inner` treats as the broker subscription.
#[inline]
fn sub_type_broker() -> i32 {
    const BROKER: i32 = 14;
    BROKER
}
952
/// Returns `true` for the sub type codes whose push registrations are keyed by
/// rehab type: 6 through 13 and 15 through 17.
#[inline]
fn is_kl_sub_type(sub_type: i32) -> bool {
    matches!(sub_type, 6..=13 | 15..=17)
}
958
959#[cfg(test)]
960mod tests;