futu_auth/
audit.rs

1//! 审计事件发射 helpers + JSONL 订阅层
2//!
3//! ## 设计
4//!
5//! 所有 interface(grpc / rest / ws / mcp)把 auth 决策和下单事件都走同一组
6//! helper,emit 的 `tracing::event!` 使用固定 `target = "futu_audit"`。这样:
7//!
8//! - 正常日志订阅(stderr / 文件)照常能看到它们(没过滤就全走)
9//! - 专用 audit JSONL 文件订阅层走 `filter_fn(meta.target() == TARGET)` 只收这些
10//!
11//! JSONL 每条一行,字段固定 —— 字段顺序由 tracing-subscriber 的 json formatter
12//! 决定(level / timestamp / target / message / 其它 fields),方便 `jq`/`duckdb`
13//! 后处理。
14//!
15//! ## 事件形状(field 命名)
16//!
17//! | 字段       | 含义                                        |
18//! |-----------|---------------------------------------------|
19//! | iface     | `"grpc"` / `"rest"` / `"ws"` / `"mcp"`    |
20//! | endpoint  | 具体接口:路径 / proto_id / tool 名         |
21//! | key_id    | API key id,或 `<missing>` / `<invalid>`   |
22//! | outcome   | `"allow"` / `"reject"`                     |
23//! | reason    | reject 时的文字原因(allow 时也可选)       |
24//! | args_hash | 下单工具额外附:args 的 SHA-256 前 8 hex   |
25//! | scope     | 校验的 scope(allow 时可选)                |
26
27use std::path::Path;
28
29use tracing::Level;
30
31/// 固定 target,供 tracing filter 使用
32pub const TARGET: &str = "futu_audit";
33
34/// auth 拒绝事件
35pub fn reject(iface: &str, endpoint: &str, key_id: &str, reason: &str) {
36    tracing::event!(
37        target: TARGET,
38        Level::WARN,
39        iface = iface,
40        endpoint = endpoint,
41        key_id = key_id,
42        outcome = "reject",
43        reason = reason,
44        "auth reject"
45    );
46    crate::metrics::bump_auth_event(iface, "reject", key_id);
47    // 限额类的 reject 额外分桶计数(reason 以 "limit: " 开头是 guard 产生的)
48    if let Some(rest) = reason.strip_prefix("limit: ") {
49        crate::metrics::bump_limit_reject(iface, key_id, rest);
50    } else if reason.starts_with("rate limit")
51        || reason.starts_with("daily value")
52        || reason.starts_with("order value")
53    {
54        crate::metrics::bump_limit_reject(iface, key_id, reason);
55    }
56}
57
58/// auth 通过事件(用 INFO 级别;debug 模式会看到量比较大,由 EnvFilter 过滤)
59pub fn allow(iface: &str, endpoint: &str, key_id: &str, scope: Option<&str>) {
60    tracing::event!(
61        target: TARGET,
62        Level::INFO,
63        iface = iface,
64        endpoint = endpoint,
65        key_id = key_id,
66        outcome = "allow",
67        scope = scope.unwrap_or(""),
68        "auth allow"
69    );
70    crate::metrics::bump_auth_event(iface, "allow", key_id);
71}
72
73/// 交易事件(下单 / 改单 / 撤单)—— 无论 allow / reject 都记录
74pub fn trade(
75    iface: &str,
76    tool: &str,
77    key_id: &str,
78    args_hash: &str,
79    outcome: &str,
80    reason: Option<&str>,
81) {
82    tracing::event!(
83        target: TARGET,
84        Level::WARN,
85        iface = iface,
86        endpoint = tool,
87        key_id = key_id,
88        outcome = outcome,
89        args_hash = args_hash,
90        reason = reason.unwrap_or(""),
91        "trade event"
92    );
93    crate::metrics::bump_auth_event(iface, outcome, key_id);
94}
95
96// -------- JSONL 层安装 --------
97//
98// 这里不直接返回一个 `impl Layer<S>` 是因为 tracing-subscriber 的类型签名挺拗,
99// 放在 main.rs 里现场拼装反而清爽。下面这对 helper 把 "打开文件 + non-blocking
100// 包装" 抽成一个函数,并在退出时保留 guard 防止 flush 丢失。
101
102/// 打开 audit 输出路径,返回一个非阻塞 writer 和 guard(guard 必须活到进程退出)
103///
104/// - 如果 path 以 `.jsonl` / `.log` 等后缀结尾,直接当成单文件 append 打开
105/// - 否则视为目录,使用每日滚动,文件名 `futu-audit.log`
106pub fn open_writer(
107    path: &Path,
108) -> std::io::Result<(
109    tracing_appender::non_blocking::NonBlocking,
110    tracing_appender::non_blocking::WorkerGuard,
111)> {
112    // 启发式:
113    //   - 已存在的目录 → 目录
114    //   - 以 '/' 结尾 → 目录
115    //   - 没有扩展名 → 目录
116    //   - 有扩展名 → 单文件
117    // 想明确语义的话用户可以直接 `mkdir -p /foo/bar` 再传路径。
118    let is_dir_hint = path.is_dir()
119        || path.as_os_str().to_string_lossy().ends_with('/')
120        || path.extension().is_none();
121
122    if is_dir_hint {
123        // 目录:每日滚动
124        std::fs::create_dir_all(path)?;
125        let appender = tracing_appender::rolling::daily(path, "futu-audit.log");
126        Ok(tracing_appender::non_blocking(appender))
127    } else {
128        // 单文件:append
129        if let Some(parent) = path.parent() {
130            if !parent.as_os_str().is_empty() {
131                std::fs::create_dir_all(parent)?;
132            }
133        }
134        let file = std::fs::OpenOptions::new()
135            .append(true)
136            .create(true)
137            .open(path)?;
138        Ok(tracing_appender::non_blocking(file))
139    }
140}
141
142#[cfg(test)]
143mod tests {
144    use super::*;
145    use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer};
146
147    #[test]
148    fn target_constant() {
149        assert_eq!(TARGET, "futu_audit");
150    }
151
152    #[test]
153    fn open_single_file_writer() {
154        let dir = tempfile::tempdir().unwrap();
155        let p = dir.path().join("a.jsonl");
156        let (_w, _g) = open_writer(&p).unwrap();
157        assert!(p.exists());
158    }
159
160    #[test]
161    fn open_dir_writer_creates_dir() {
162        let dir = tempfile::tempdir().unwrap();
163        let p = dir.path().join("subdir_no_ext");
164        let (_w, _g) = open_writer(&p).unwrap();
165        assert!(p.is_dir());
166    }
167
168    #[test]
169    fn reject_event_hits_audit_target() {
170        use parking_lot::Mutex;
171        use std::sync::Arc;
172        // 捕获 writer
173        #[derive(Clone)]
174        struct BufWriter(Arc<Mutex<Vec<u8>>>);
175        impl std::io::Write for BufWriter {
176            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
177                self.0.lock().extend_from_slice(buf);
178                Ok(buf.len())
179            }
180            fn flush(&mut self) -> std::io::Result<()> {
181                Ok(())
182            }
183        }
184        impl<'a> tracing_subscriber::fmt::MakeWriter<'a> for BufWriter {
185            type Writer = BufWriter;
186            fn make_writer(&'a self) -> Self::Writer {
187                self.clone()
188            }
189        }
190
191        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
192        let writer = BufWriter(buf.clone());
193        let layer = tracing_subscriber::fmt::layer()
194            .json()
195            .with_writer(writer)
196            .with_filter(tracing_subscriber::filter::filter_fn(|m| {
197                m.target() == TARGET
198            }));
199
200        let sub = tracing_subscriber::registry().with(layer);
201        let _guard = sub.set_default();
202        reject("grpc", "2202", "bot", "missing scope");
203        // 非 audit 的事件不应进入
204        tracing::warn!("regular warn should be ignored");
205
206        let got = String::from_utf8(buf.lock().clone()).unwrap();
207        assert!(got.contains("\"outcome\":\"reject\""), "got={got}");
208        assert!(got.contains("\"key_id\":\"bot\""));
209        assert!(got.contains("\"iface\":\"grpc\""));
210        assert!(!got.contains("regular warn"));
211    }
212}