1mod guard;
14mod handlers;
15mod state;
16mod tool_account;
17mod tool_args;
18mod tool_auth;
19mod tool_enums;
20mod tools;
21mod trade_pwd;
22mod transport;
26
27use std::path::PathBuf;
28use std::sync::Arc;
29
30use anyhow::{Context, Result};
31use clap::{ArgMatches, CommandFactory, FromArgMatches, Parser, parser::ValueSource};
32use futu_auth::KeyStore;
33use rmcp::ServiceExt;
34use crate::transport::resilient_stdio;
36use tracing_subscriber::{
37 EnvFilter, Layer, filter::filter_fn, fmt, layer::SubscriberExt, util::SubscriberInitExt,
38};
39
40use crate::state::ServerState;
41use crate::tools::FutuServer;
42
43fn setup_logging(
48 default_level: &str,
49 audit_path: Option<&std::path::Path>,
50) -> Result<Option<tracing_appender::non_blocking::WorkerGuard>> {
51 let filter =
52 EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(default_level));
53
54 let fmt_layer = fmt::layer()
55 .with_timer(futu_core::log::LocalRfc3339Timer)
56 .with_writer(std::io::stderr)
57 .with_ansi(false);
58
59 let registry = tracing_subscriber::registry().with(filter).with(fmt_layer);
60
61 if let Some(path) = audit_path {
62 let (writer, guard) = futu_auth::audit::open_writer(path)
63 .with_context(|| format!("open audit log {}", path.display()))?;
64 let audit_layer = fmt::layer()
65 .json()
66 .with_timer(futu_core::log::LocalRfc3339Timer)
67 .flatten_event(true)
68 .with_current_span(false)
69 .with_span_list(false)
70 .with_target(true)
71 .with_writer(writer)
72 .with_filter(filter_fn(|meta| meta.target() == futu_auth::audit::TARGET));
73 registry.with(audit_layer).init();
74 tracing::info!(
75 path = %path.display(),
76 "audit JSONL logger enabled (target=futu_audit)"
77 );
78 Ok(Some(guard))
79 } else {
80 registry.init();
81 Ok(None)
82 }
83}
84
// Command-line options for the `futu-mcp` binary.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive turns `///` doc comments on
// fields into `--help` text, so switching these to doc comments would change user-visible output.
//
// Values may additionally come from environment variables (where `env = ...` is declared) and
// from the TOML file named by `--config`; see `Cli::merge_config` for the merge order.
#[derive(Parser)]
#[command(
    name = "futu-mcp",
    version,
    about = "FutuOpenD-rs MCP server",
    long_about = "通过 Model Context Protocol 暴露 Futu 行情/账户工具。默认 stdio transport。"
)]
struct Cli {
    // Gateway address handed to `ServerState::new`; defaults to 127.0.0.1:11111.
    #[arg(short, long, env = "FUTU_GATEWAY", default_value = "127.0.0.1:11111")]
    gateway: String,

    // Selects "debug" instead of "info" as the default log level in `main`.
    #[arg(short, long)]
    verbose: bool,

    // Path of the keys file loaded into the `KeyStore`; `main` calls this "scope mode".
    #[arg(long)]
    keys_file: Option<PathBuf>,

    // Plaintext API key that `main` verifies against the key store (value hidden in help output).
    #[arg(long, env = "FUTU_MCP_API_KEY", hide_env_values = true)]
    api_key: Option<String>,

    // Forwarded to `ServerState::with_trade_pwd_account`.
    // NOTE(review): presumably names the account whose trade password is looked up — confirm.
    #[arg(long, env = "FUTU_TRADE_PWD_ACCOUNT")]
    trade_pwd_account: Option<String>,

    // Forwarded to `ServerState::with_trading`; `main` warns that it is ignored in scope mode.
    #[arg(long)]
    enable_trading: bool,

    // Forwarded to `ServerState::with_trading`; on the command line it requires `--enable-trading`.
    #[arg(long, requires = "enable_trading")]
    allow_real_trading: bool,

    // When set, `setup_logging` also writes audit events as JSON lines to this file.
    #[arg(long)]
    audit_log: Option<PathBuf>,

    // When set, the server is served over HTTP(S) at this address instead of stdio.
    // A value starting with ':' is expanded to `0.0.0.0:<port>` by `serve_http`.
    #[arg(long)]
    http_listen: Option<String>,

    // PEM certificate for HTTPS; clap requires `--tls-key` alongside it at parse time.
    #[arg(long, requires = "tls_key")]
    tls_cert: Option<PathBuf>,

    // PEM private key for HTTPS; clap requires `--tls-cert` alongside it at parse time.
    #[arg(long, requires = "tls_cert")]
    tls_key: Option<PathBuf>,

    // Optional TOML configuration file merged in by `Cli::merge_config`.
    #[arg(long)]
    config: Option<PathBuf>,
}
180
/// Shape of the optional TOML configuration file.
///
/// Every field is optional (`#[serde(default)]`), and unknown keys are rejected
/// (`deny_unknown_fields`), so a misspelled key is a parse error rather than being ignored.
/// Field names mirror the corresponding `Cli` fields.
#[derive(Debug, Default, serde::Deserialize)]
#[serde(default, deny_unknown_fields)]
struct FileConfig {
    /// File-level counterpart of `Cli::gateway`.
    gateway: Option<String>,
    /// File-level counterpart of `Cli::verbose`.
    verbose: Option<bool>,
    /// File-level counterpart of `Cli::keys_file`.
    keys_file: Option<PathBuf>,
    /// File-level counterpart of `Cli::api_key`.
    api_key: Option<String>,
    /// File-level counterpart of `Cli::trade_pwd_account`.
    trade_pwd_account: Option<String>,
    /// File-level counterpart of `Cli::enable_trading`.
    enable_trading: Option<bool>,
    /// File-level counterpart of `Cli::allow_real_trading`.
    allow_real_trading: Option<bool>,
    /// File-level counterpart of `Cli::audit_log`.
    audit_log: Option<PathBuf>,
    /// File-level counterpart of `Cli::http_listen`.
    http_listen: Option<String>,
    /// File-level counterpart of `Cli::tls_cert`.
    tls_cert: Option<PathBuf>,
    /// File-level counterpart of `Cli::tls_key`.
    tls_key: Option<PathBuf>,
}
210
211fn is_cli_explicit(matches: &ArgMatches, arg_id: &str) -> bool {
227 matches!(
228 matches.value_source(arg_id),
229 Some(ValueSource::CommandLine) | Some(ValueSource::EnvVariable)
230 )
231}
232
233impl Cli {
234 fn merge_config(mut self, matches: &ArgMatches) -> Result<Self> {
240 let Some(config_path) = &self.config else {
241 return Ok(self);
242 };
243 let content = std::fs::read_to_string(config_path)
244 .with_context(|| format!("read config file {}", config_path.display()))?;
245 let fc: FileConfig = toml::from_str(&content)
246 .with_context(|| format!("parse config file {}", config_path.display()))?;
247
248 if let Some(g) = fc.gateway
251 && !is_cli_explicit(matches, "gateway")
252 {
253 self.gateway = g;
254 }
255 if self.keys_file.is_none() {
258 self.keys_file = fc.keys_file;
259 }
260 if self.api_key.is_none()
261 && let Some(k) = fc.api_key
262 {
263 self.api_key = Some(k);
264 }
265 if self.trade_pwd_account.is_none() {
266 self.trade_pwd_account = fc.trade_pwd_account;
267 }
268 if fc.verbose.is_some() && !is_cli_explicit(matches, "verbose") {
273 self.verbose = fc.verbose.unwrap_or(false);
274 }
275 if fc.enable_trading.is_some() && !is_cli_explicit(matches, "enable_trading") {
276 self.enable_trading = fc.enable_trading.unwrap_or(false);
277 }
278 if fc.allow_real_trading.is_some() && !is_cli_explicit(matches, "allow_real_trading") {
279 self.allow_real_trading = fc.allow_real_trading.unwrap_or(false);
280 }
281 if self.audit_log.is_none() {
282 self.audit_log = fc.audit_log;
283 }
284 if self.http_listen.is_none() {
285 self.http_listen = fc.http_listen;
286 }
287 if self.tls_cert.is_none() {
288 self.tls_cert = fc.tls_cert;
289 }
290 if self.tls_key.is_none() {
291 self.tls_key = fc.tls_key;
292 }
293 eprintln!("[config] loaded {}", config_path.display());
295 Ok(self)
296 }
297}
298
299#[tokio::main]
300async fn main() -> Result<()> {
301 let matches = Cli::command().get_matches();
305 let cli = Cli::from_arg_matches(&matches)
306 .map_err(|e| anyhow::anyhow!("clap derive build failed: {e}"))?
307 .merge_config(&matches)?;
308
309 let default_level = if cli.verbose { "debug" } else { "info" };
311
312 let _audit_guard = setup_logging(default_level, cli.audit_log.as_deref())?;
314
315 let key_store = match &cli.keys_file {
317 Some(path) => {
318 let store = KeyStore::load(path)
319 .with_context(|| format!("load keys file {}", path.display()))?;
320 tracing::info!(
321 path = %path.display(),
322 keys_loaded = store.len(),
323 "scope mode: keys file loaded"
324 );
325 if cli.enable_trading || cli.allow_real_trading {
326 tracing::warn!(
327 "--enable-trading / --allow-real-trading are IGNORED in scope mode; \
328 trading permissions are controlled by API key scopes"
329 );
330 }
331 Arc::new(store)
332 }
333 None => {
334 tracing::info!("legacy mode: no keys file; using --enable-trading switches");
335 Arc::new(KeyStore::empty())
336 }
337 };
338
339 let authed_key = if key_store.is_configured() {
341 match cli.api_key.as_deref() {
342 Some(plaintext) if !plaintext.is_empty() => match key_store.verify(plaintext) {
343 Some(rec) => {
344 tracing::info!(
345 key_id = %rec.id,
346 scopes = ?rec.scopes.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
347 "API key verified"
348 );
349 Some(rec)
350 }
351 None => {
352 tracing::error!("FUTU_MCP_API_KEY does not match any key in keys.json");
353 None
354 }
355 },
356 _ => {
357 tracing::warn!(
358 "scope mode active but FUTU_MCP_API_KEY not set; \
359 all tool calls will be rejected"
360 );
361 None
362 }
363 }
364 } else {
365 None
366 };
367
368 tracing::info!(
369 gateway = %cli.gateway,
370 scope_mode = key_store.is_configured(),
371 enable_trading = cli.enable_trading,
372 allow_real_trading = cli.allow_real_trading,
373 trade_pwd_account = cli.trade_pwd_account.as_deref().unwrap_or("<legacy/env>"),
374 "futu-mcp starting"
375 );
376 if !key_store.is_configured() && cli.enable_trading {
377 tracing::warn!(
378 allow_real_trading = cli.allow_real_trading,
379 "trading write tools ENABLED (legacy mode)"
380 );
381 }
382
383 let state = ServerState::new(cli.gateway)
384 .with_trading(cli.enable_trading, cli.allow_real_trading)
385 .with_key_store(key_store.clone())
386 .with_authed_key(authed_key)
387 .with_trade_pwd_account(cli.trade_pwd_account);
388 let server = FutuServer::new(state.clone());
389
390 if key_store.is_configured() && key_store.has_any_card_num_restrictions() {
407 spawn_card_num_expand_retry(state.clone(), key_store.clone());
408 } else if key_store.is_configured() {
409 tracing::debug!("v1.4.105 eli #4: keystore 无 allowed_card_nums 限制, 跳过 daemon expand");
410 }
411
412 #[cfg(unix)]
415 spawn_sighup_reload(key_store, state.clone());
416
417 futu_auth::metrics::install(std::sync::Arc::new(futu_auth::MetricsRegistry::default()));
421
422 if let Some(listen) = cli.http_listen {
423 let tls = match (cli.tls_cert, cli.tls_key) {
424 (Some(cert), Some(key)) => Some((cert, key)),
425 _ => None,
426 };
427 serve_http(server, &listen, tls).await?;
428 } else {
429 serve_stdio(server).await?;
430 }
431
432 Ok(())
433}
434
435async fn serve_stdio(server: tools::FutuServer) -> Result<()> {
442 let service = server
443 .serve(resilient_stdio())
444 .await
445 .map_err(|e| anyhow::anyhow!("MCP service init failed: {e}"))?;
446
447 service
448 .waiting()
449 .await
450 .map_err(|e| anyhow::anyhow!("MCP service error: {e}"))?;
451 Ok(())
452}
453
454fn render_mcp_metrics_body() -> String {
462 let registry = futu_auth::metrics::global();
463 render_mcp_metrics_body_for(registry.as_deref())
464}
465
466fn render_mcp_metrics_body_for(registry: Option<&futu_auth::MetricsRegistry>) -> String {
467 registry.map(|r| r.render_prometheus()).unwrap_or_else(|| {
468 concat!(
469 "# HELP futu_metrics_registry_installed Whether futu_auth metrics registry is installed (1=yes, 0=no)\n",
470 "# TYPE futu_metrics_registry_installed gauge\n",
471 "futu_metrics_registry_installed{state=\"metrics registry not installed\"} 0\n"
472 )
473 .to_string()
474 })
475}
476
477async fn serve_http(
478 server: tools::FutuServer,
479 listen: &str,
480 tls: Option<(PathBuf, PathBuf)>,
481) -> Result<()> {
482 use rmcp::transport::streamable_http_server::{
483 StreamableHttpService, session::local::LocalSessionManager,
484 };
485
486 let bind_addr = if listen.starts_with(':') {
488 format!("0.0.0.0{listen}")
489 } else {
490 listen.to_string()
491 };
492
493 let session_manager = std::sync::Arc::new(LocalSessionManager::default());
497 let mcp_svc = StreamableHttpService::new(
498 {
499 let server = server.clone();
500 move || Ok::<_, std::io::Error>(server.clone())
501 },
502 session_manager,
503 Default::default(),
504 );
505
506 use axum::routing::get;
511 let mcp_with_auth_hint = axum::Router::new()
512 .nest_service("/mcp", mcp_svc)
513 .layer(axum::middleware::from_fn(inject_www_authenticate));
514
515 let app = axum::Router::new()
516 .route(
517 "/metrics",
518 get(|| async {
519 let body = render_mcp_metrics_body();
520 (
521 axum::http::StatusCode::OK,
522 [(
523 axum::http::header::CONTENT_TYPE,
524 "text/plain; version=0.0.4",
525 )],
526 body,
527 )
528 }),
529 )
530 .route(
531 "/.well-known/oauth-protected-resource",
532 get(oauth_protected_resource_metadata),
533 )
534 .merge(mcp_with_auth_hint);
535
536 let bind_addr_sock: std::net::SocketAddr = bind_addr
537 .parse()
538 .map_err(|e| anyhow::anyhow!("invalid bind address {bind_addr}: {e}"))?;
539
540 if let Some((cert_path, key_path)) = tls {
541 let tls_config =
543 axum_server::tls_rustls::RustlsConfig::from_pem_file(&cert_path, &key_path)
544 .await
545 .with_context(|| {
546 format!(
547 "load TLS cert={} key={}",
548 cert_path.display(),
549 key_path.display()
550 )
551 })?;
552 let handle = axum_server::Handle::new();
553 let shutdown_handle = handle.clone();
554 tokio::spawn(async move {
555 shutdown_signal().await;
556 tracing::info!("graceful shutdown: draining HTTPS connections...");
557 shutdown_handle.graceful_shutdown(Some(std::time::Duration::from_secs(10)));
558 });
559 tracing::info!(
560 addr = %bind_addr,
561 cert = %cert_path.display(),
562 "futu-mcp HTTPS transport started \
563 (MCP: /mcp, metrics: /metrics, OAuth metadata: /.well-known/oauth-protected-resource)"
564 );
565 axum_server::bind_rustls(bind_addr_sock, tls_config)
566 .handle(handle)
567 .serve(app.into_make_service())
568 .await
569 .map_err(|e| anyhow::anyhow!("axum-server TLS serve error: {e}"))?;
570 } else {
571 let listener = tokio::net::TcpListener::bind(&bind_addr)
573 .await
574 .map_err(|e| anyhow::anyhow!("bind {bind_addr}: {e}"))?;
575 tracing::info!(
576 addr = %bind_addr,
577 "futu-mcp HTTP transport started \
578 (MCP: /mcp, metrics: /metrics, OAuth metadata: /.well-known/oauth-protected-resource)"
579 );
580 axum::serve(listener, app)
581 .with_graceful_shutdown(async {
582 shutdown_signal().await;
583 tracing::info!("graceful shutdown: draining HTTP connections...");
584 })
585 .await
586 .map_err(|e| anyhow::anyhow!("axum serve error: {e}"))?;
587 }
588 tracing::info!("server stopped");
589 Ok(())
590}
591
592async fn shutdown_signal() {
595 #[cfg(unix)]
596 {
597 use tokio::signal::unix::{SignalKind, signal};
598 let sigterm = match signal(SignalKind::terminate()) {
599 Ok(signal) => Some(signal),
600 Err(e) => {
601 tracing::error!(error = %e, "failed to install SIGTERM handler");
602 None
603 }
604 };
605 let sigint = match signal(SignalKind::interrupt()) {
606 Ok(signal) => Some(signal),
607 Err(e) => {
608 tracing::error!(error = %e, "failed to install SIGINT handler");
609 None
610 }
611 };
612
613 match (sigterm, sigint) {
614 (Some(mut sigterm), Some(mut sigint)) => {
615 tokio::select! {
616 _ = sigterm.recv() => tracing::info!("received SIGTERM"),
617 _ = sigint.recv() => tracing::info!("received SIGINT"),
618 }
619 }
620 (Some(mut sigterm), None) => {
621 sigterm.recv().await;
622 tracing::info!("received SIGTERM");
623 }
624 (None, Some(mut sigint)) => {
625 sigint.recv().await;
626 tracing::info!("received SIGINT");
627 }
628 (None, None) => wait_for_ctrl_c_or_pending().await,
629 }
630 }
631 #[cfg(not(unix))]
632 {
633 wait_for_ctrl_c_or_pending().await;
634 }
635}
636
637async fn wait_for_ctrl_c_or_pending() {
638 match tokio::signal::ctrl_c().await {
639 Ok(()) => tracing::info!("received Ctrl-C"),
640 Err(e) => {
641 tracing::error!(
642 error = %e,
643 "failed to install ctrl-c handler; graceful shutdown signal unavailable"
644 );
645 std::future::pending::<()>().await;
646 }
647 }
648}
649
650async fn oauth_protected_resource_metadata() -> axum::response::Json<serde_json::Value> {
662 axum::response::Json(serde_json::json!({
663 "resource": "/mcp",
664 "bearer_methods_supported": ["header"],
665 "scopes_supported": [
666 "qot:read",
667 "acc:read",
668 "trade:simulate",
669 "trade:real",
670 "trade:unlock"
671 ],
672 "resource_name": "FutuOpenD-rs MCP",
673 "resource_documentation": "https://futuapi.com/reference/mcp/",
674 }))
675}
676
677async fn inject_www_authenticate(
683 req: axum::extract::Request,
684 next: axum::middleware::Next,
685) -> axum::response::Response {
686 let mut resp = next.run(req).await;
687 let status = resp.status();
688 if (status == axum::http::StatusCode::UNAUTHORIZED
689 || status == axum::http::StatusCode::FORBIDDEN)
690 && !resp
691 .headers()
692 .contains_key(axum::http::header::WWW_AUTHENTICATE)
693 {
694 let value = axum::http::HeaderValue::from_static(
697 "Bearer resource_metadata=\"/.well-known/oauth-protected-resource\"",
698 );
699 resp.headers_mut()
700 .insert(axum::http::header::WWW_AUTHENTICATE, value);
701 }
702 resp
703}
704
705#[cfg(unix)]
706fn spawn_sighup_reload(store: Arc<KeyStore>, state: ServerState) {
707 if !store.is_configured() {
708 return;
709 }
710 use tokio::signal::unix::{SignalKind, signal};
711 tokio::spawn(async move {
712 let mut sig = match signal(SignalKind::hangup()) {
713 Ok(s) => s,
714 Err(e) => {
715 tracing::error!(error = %e, "failed to install SIGHUP handler");
716 return;
717 }
718 };
719 tracing::info!("SIGHUP handler installed; send `kill -HUP <pid>` to reload keys");
720 while sig.recv().await.is_some() {
721 match store.reload() {
724 Ok(()) => tracing::warn!(keys_loaded = store.len(), "keys file reloaded on SIGHUP"),
725 Err(e) => {
726 tracing::error!(error = %e, "SIGHUP reload failed; keeping old keys");
727 continue;
728 }
729 }
730 if store.has_any_card_num_restrictions() {
736 let store_clone = store.clone();
737 let state_clone = state.clone();
738 tokio::spawn(async move {
739 if let Err(e) = expand_card_nums_via_daemon(&state_clone, &store_clone).await {
740 tracing::warn!(
741 error = %e,
742 "v1.4.105 eli #4: SIGHUP re-expand failed; sentinel 仍生效保护"
743 );
744 }
745 });
746 }
747 }
748 });
749}
750
751fn spawn_card_num_expand_retry(state: ServerState, key_store: Arc<KeyStore>) {
760 tokio::spawn(async move {
761 const MAX_ATTEMPTS: u32 = 6;
762 const RETRY_INTERVAL_SECS: u64 = 10;
763 for attempt in 1..=MAX_ATTEMPTS {
764 match expand_card_nums_via_daemon(&state, &key_store).await {
765 Ok(()) => {
766 tracing::info!(
767 attempt,
768 "v1.4.105 eli #4: standalone MCP allowed_card_nums expanded \
769 (与 daemon expand 路径 byte-identical)"
770 );
771 return;
772 }
773 Err(e) => {
774 if attempt < MAX_ATTEMPTS {
775 tracing::warn!(
776 attempt,
777 max = MAX_ATTEMPTS,
778 error = %e,
779 "v1.4.105 eli #4: card_num expand 失败, {RETRY_INTERVAL_SECS}s 后重试"
780 );
781 tokio::time::sleep(std::time::Duration::from_secs(RETRY_INTERVAL_SECS))
782 .await;
783 } else {
784 tracing::error!(
785 attempt,
786 error = %e,
787 "v1.4.105 eli #4: card_num expand 在 {MAX_ATTEMPTS} × \
788 {RETRY_INTERVAL_SECS}s 后仍失败; 受限 key 走 fail-closed \
789 sentinel reject 直到下次 SIGHUP / 手动 reload"
790 );
791 }
792 }
793 }
794 }
795 });
796}
797
798async fn expand_card_nums_via_daemon(state: &ServerState, key_store: &Arc<KeyStore>) -> Result<()> {
805 let client = state
807 .client()
808 .await
809 .with_context(|| "connect to daemon for card_num expand")?;
810
811 let accs = futu_trd::account::get_acc_list_for_account_discovery(&client)
814 .await
815 .with_context(|| "GetAccList for card_num expand")?;
816
817 if accs.is_empty() {
818 return Err(anyhow::anyhow!(
819 "GetAccList returned empty list (daemon 已起但无账户?)"
820 ));
821 }
822
823 let resolver = build_card_num_resolver(accs);
828
829 let (resolved, unresolved, ambiguous) = key_store.expand_allowed_card_nums(
832 &resolver,
833 |key_id, cn| {
834 tracing::warn!(
835 key_id = %key_id,
836 card_num = %cn,
837 "v1.4.105 eli #4 fail-closed: card_num not found in daemon GetAccList; \
838 sentinel acc_id=0 让限额引擎 reject 真账户 (写完整 16 位 / specific 4 位)"
839 );
840 },
841 |key_id, cn, candidates| {
842 tracing::warn!(
843 key_id = %key_id,
844 card_num = %cn,
845 candidates = ?candidates,
846 "v1.4.105 eli #4 fail-closed: ambiguous card_num suffix matched 多账户 \
847 (skipped; 写完整 16 位 / specific 4 位)"
848 );
849 },
850 );
851
852 tracing::info!(
853 resolved,
854 unresolved,
855 ambiguous,
856 "v1.4.105 eli #4: standalone MCP allowed_card_nums expanded into allowed_acc_ids"
857 );
858 Ok(())
859}
860
861fn build_card_num_resolver(accs: Vec<futu_trd::TrdAcc>) -> impl Fn(&str) -> Vec<u64> {
867 move |input: &str| -> Vec<u64> {
868 futu_core::account_locator::match_card_num_in_records(&accs, input, None)
869 .unwrap_or_default()
870 }
871}
872
873#[cfg(test)]
874mod tests;