From a8fe1859e3150d3be3a49ef6a2af4092a7f276ce Mon Sep 17 00:00:00 2001
From: grabbit
Date: Thu, 19 Feb 2026 15:37:05 +0800
Subject: [PATCH] Add web auth, notifications, scheduled warmup, NAS offline
 state

- config: add [web] password field for HTTP Basic Auth
- config: add [notifications] webhook URL + thresholds
- config: add warmup.warmup_schedule for nightly cache warmup
- daemon: add nas_offline, all_synced, notification tracking to DaemonStatus
- daemon: add SupervisorCmd::Reconnect(String) for share reconnect
- supervisor: compute nas_offline/all_synced each poll cycle
- supervisor: send webhook notifications (NAS offline, writeback depth)
- supervisor: handle Reconnect command (kill+reset share for re-probe)
- supervisor: scheduled warmup based on warmup_schedule cron hour
- web/mod: HTTP Basic Auth middleware (when web.password is set)
- web/api: expose nas_offline, all_synced in status endpoint
- web/api: POST /api/reconnect/{share} endpoint

Co-Authored-By: Claude Sonnet 4.6
---
 src/config.rs     |  68 ++++++++++++++++++++++++++
 src/daemon.rs     |  17 +++++++
 src/supervisor.rs | 118 ++++++++++++++++++++++++++++++++++++++++++++++
 src/web/api.rs    |  24 ++++++++++
 src/web/mod.rs    |  68 +++++++++++++++++++++++++-
 5 files changed, 294 insertions(+), 1 deletion(-)

diff --git a/src/config.rs b/src/config.rs
index 9d275b7..7121415 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -33,6 +33,10 @@ pub struct Config {
     pub dir_refresh: DirRefreshConfig,
     #[serde(default)]
     pub log: LogConfig,
+    #[serde(default)]
+    pub web: WebConfig,
+    #[serde(default)]
+    pub notifications: NotificationsConfig,
     pub shares: Vec<ShareConfig>,
 }
 
@@ -179,12 +183,56 @@ pub struct ProtocolsConfig {
     pub webdav_port: u16,
 }
 
+/// Web UI configuration.
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct WebConfig {
+    /// Web UI password for HTTP Basic Auth. Empty = no auth (default).
+    #[serde(default)]
+    pub password: String,
+}
+
+/// Push notification configuration.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct NotificationsConfig {
+    /// Webhook URL for notifications (Telegram/Bark/DingTalk compatible). Empty = disabled.
+    #[serde(default)]
+    pub webhook_url: String,
+    /// Cache usage % threshold to trigger notification (default: 80).
+    #[serde(default = "default_notify_cache_threshold")]
+    pub cache_threshold_pct: u8,
+    /// Minutes NAS must be offline before notification (default: 5).
+    #[serde(default = "default_notify_offline_minutes")]
+    pub nas_offline_minutes: u64,
+    /// Write-back queue depth that triggers notification (default: 50).
+    #[serde(default = "default_notify_writeback_depth")]
+    pub writeback_depth: u64,
+}
+
+impl Default for NotificationsConfig {
+    fn default() -> Self {
+        Self {
+            webhook_url: String::new(),
+            cache_threshold_pct: default_notify_cache_threshold(),
+            nas_offline_minutes: default_notify_offline_minutes(),
+            writeback_depth: default_notify_writeback_depth(),
+        }
+    }
+}
+
+fn default_notify_cache_threshold() -> u8 { 80 }
+fn default_notify_offline_minutes() -> u64 { 5 }
+fn default_notify_writeback_depth() -> u64 { 50 }
+
 /// Warmup configuration — auto-cache paths on startup.
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct WarmupConfig {
     /// Auto-warmup on startup (default: true when rules exist).
     #[serde(default = "default_true")]
     pub auto: bool,
+    /// Cron schedule for periodic cache warmup (e.g. "0 2 * * *" = 2am daily).
+    /// Empty = disabled (only runs on startup if auto=true).
+    #[serde(default)]
+    pub warmup_schedule: String,
     /// Warmup rules — paths to pre-cache.
     #[serde(default)]
     pub rules: Vec<WarmupRule>,
 
@@ -194,6 +242,7 @@ impl Default for WarmupConfig {
     fn default() -> Self {
         Self {
             auto: true,
+            warmup_schedule: String::new(),
             rules: Vec::new(),
         }
     }
@@ -484,6 +533,23 @@ impl Config {
         writeln!(out, "recursive = {}", self.dir_refresh.recursive).unwrap();
         writeln!(out).unwrap();
 
+        // --- Web UI ---
+        writeln!(out, "# --- Web UI (change = no restart) ---").unwrap();
+        writeln!(out, "[web]").unwrap();
+        writeln!(out, "# password = \"your-password\" # Set to enable HTTP Basic Auth").unwrap();
+        writeln!(out, "password = {:?}", self.web.password).unwrap();
+        writeln!(out).unwrap();
+
+        // --- Notifications ---
+        writeln!(out, "# --- Notifications (change = no restart) ---").unwrap();
+        writeln!(out, "[notifications]").unwrap();
+        writeln!(out, "# webhook_url = \"https://api.telegram.org/bot<TOKEN>/sendMessage?chat_id=<CHAT_ID>\"").unwrap();
+        writeln!(out, "webhook_url = {:?}", self.notifications.webhook_url).unwrap();
+        writeln!(out, "cache_threshold_pct = {}", self.notifications.cache_threshold_pct).unwrap();
+        writeln!(out, "nas_offline_minutes = {}", self.notifications.nas_offline_minutes).unwrap();
+        writeln!(out, "writeback_depth = {}", self.notifications.writeback_depth).unwrap();
+        writeln!(out).unwrap();
+
         // --- Shares ---
         writeln!(out, "# --- Shares (change = per-share restart) ---").unwrap();
         for share in &self.shares {
@@ -512,6 +578,8 @@ impl Config {
         writeln!(out, "# --- Warmup (change = no restart) ---").unwrap();
         writeln!(out, "[warmup]").unwrap();
         writeln!(out, "auto = {}", self.warmup.auto).unwrap();
+        writeln!(out, "# warmup_schedule = \"0 2 * * *\" # Nightly at 2am").unwrap();
+        writeln!(out, "warmup_schedule = {:?}", self.warmup.warmup_schedule).unwrap();
         writeln!(out).unwrap();
         for rule in &self.warmup.rules {
             writeln!(out, "[[warmup.rules]]").unwrap();
diff --git a/src/daemon.rs b/src/daemon.rs
index 3e3449a..88cbf49 100644
--- a/src/daemon.rs
+++ b/src/daemon.rs
@@ -60,6 +60,16 @@ pub struct DaemonStatus {
     pub dir_refresh_dirs_ok: HashMap<String, u64>,
     /// Number of subdirectories that failed to refresh in the last cycle, keyed by share name.
     pub dir_refresh_dirs_failed: HashMap<String, u64>,
+    /// Whether all NAS connections are currently unreachable.
+    pub nas_offline: bool,
+    /// Whether all write-back has completed (dirty_count=0, transfers=0).
+    pub all_synced: bool,
+    /// When NAS first went offline (for offline-duration notification).
+    pub nas_offline_since: Option<Instant>,
+    /// Whether we've already sent the NAS-offline notification (reset on reconnect).
+    pub nas_offline_notified: bool,
+    /// Cache warning level already notified (0=none, 3=writeback depth).
+    pub cache_notified_level: u8,
 }
 
 impl DaemonStatus {
@@ -93,6 +103,11 @@ impl DaemonStatus {
             dir_refresh_gen_arc: Arc::new(AtomicU64::new(0)),
             dir_refresh_dirs_ok: HashMap::new(),
             dir_refresh_dirs_failed: HashMap::new(),
+            nas_offline: false,
+            all_synced: false,
+            nas_offline_since: None,
+            nas_offline_notified: false,
+            cache_notified_level: 0,
         }
     }
 
@@ -301,6 +316,8 @@ pub enum SupervisorCmd {
     Shutdown,
     /// Live bandwidth adjustment (Tier A — no restart needed).
     BwLimit { up: String, down: String },
+    /// Reconnect (re-probe + re-mount) a single share by name.
+    Reconnect(String),
 }
 
 #[cfg(test)]
diff --git a/src/supervisor.rs b/src/supervisor.rs
index 52b2fcf..201ce49 100644
--- a/src/supervisor.rs
+++ b/src/supervisor.rs
@@ -695,6 +695,15 @@ fn spawn_webdav(config: &Config) -> Result<Child> {
         .context("Failed to spawn rclone serve webdav")
 }
 
+/// Send a notification via webhook (fire-and-forget, logs on error).
+fn send_webhook_notification(url: &str, message: &str) {
+    if url.is_empty() { return; }
+    let body = serde_json::json!({ "text": message, "message": message });
+    if let Err(e) = ureq::post(url).send_json(&body) {
+        warn!("Notification webhook failed: {}", e);
+    }
+}
+
 /// Main supervision loop with command channel.
 ///
 /// Uses `recv_timeout` on the command channel so it can both respond to
@@ -715,6 +724,7 @@ fn supervise(
     let mut webdav_tracker = RestartTracker::new();
     let mut prev_states: HashMap<String, CacheState> = HashMap::new();
     let mut last_stats_snapshot = Instant::now();
+    let mut last_scheduled_warmup: Option<Instant> = None;
 
     loop {
         // Check for commands (non-blocking with timeout = POLL_INTERVAL)
@@ -727,6 +737,26 @@
             Ok(SupervisorCmd::BwLimit { up, down }) => {
                 info!(bw_limit_up = %up, bw_limit_down = %down, "bandwidth limit applied");
                 apply_bwlimit(mounts, &up, &down);
             }
+            Ok(SupervisorCmd::Reconnect(share_name)) => {
+                info!("Reconnect requested for share '{}'", share_name);
+                if let Some(_idx) = shared_config.read().unwrap().shares.iter().position(|s| s.name == share_name) {
+                    // Kill existing mount if running
+                    if let Some(pos) = mounts.iter().position(|m| m.name == share_name) {
+                        let mut m = mounts.remove(pos);
+                        let _ = m.child.kill();
+                        let _ = m.child.wait();
+                    }
+                    // Reset health so it gets re-probed and re-mounted on next poll
+                    let mut status = shared_status.write().unwrap();
+                    if let Some(s) = status.shares.iter_mut().find(|s| s.name == share_name) {
+                        s.mounted = false;
+                        s.health = crate::daemon::ShareHealth::Pending;
+                    }
+                    info!("Share '{}' reset for reconnect", share_name);
+                } else {
+                    warn!("Reconnect: share '{}' not found", share_name);
+                }
+            }
             Ok(SupervisorCmd::Reload(new_config)) => {
                 info!("Config reload requested...");
                 handle_reload(
@@ -839,6 +869,94 @@ fn supervise(
         // Update shared status with fresh RC stats
         update_status(shared_status, mounts, protocols, &config);
 
+        // Compute nas_offline and all_synced, then check notifications
+        {
+            let mut status = shared_status.write().unwrap();
+
+            // nas_offline: true when ALL shares are either not mounted or have failed health
+            let all_failed = status.shares.iter().all(|s| !s.mounted || matches!(s.health, ShareHealth::Failed(_)));
+            let any_mounted = status.shares.iter().any(|s| s.mounted);
+            let nas_offline = !any_mounted || (any_mounted && all_failed);
+
+            // all_synced: true when dirty_count=0 and transfers=0 across all shares
+            let total_dirty: u64 = status.shares.iter().map(|s| s.dirty_count).sum();
+            let total_transfers: u64 = status.shares.iter().map(|s| s.transfers).sum();
+            let all_synced = total_dirty == 0 && total_transfers == 0;
+
+            status.nas_offline = nas_offline;
+            status.all_synced = all_synced;
+
+            // Check notifications
+            let notif = config.notifications.clone();
+            if !notif.webhook_url.is_empty() {
+                let url = notif.webhook_url.clone();
+
+                // NAS offline notification
+                if status.nas_offline {
+                    if status.nas_offline_since.is_none() {
+                        status.nas_offline_since = Some(Instant::now());
+                    }
+                    let elapsed_mins = status.nas_offline_since
+                        .map(|t| t.elapsed().as_secs() / 60)
+                        .unwrap_or(0);
+                    if elapsed_mins >= notif.nas_offline_minutes && !status.nas_offline_notified {
+                        send_webhook_notification(&url, &format!(
+                            "\u{26a0}\u{fe0f} Warpgate: NAS has been offline for {} minutes. Writes are queued locally.", elapsed_mins
+                        ));
+                        status.nas_offline_notified = true;
+                    }
+                } else {
+                    status.nas_offline_since = None;
+                    status.nas_offline_notified = false;
+                }
+
+                // Write-back depth notification
+                if total_dirty >= notif.writeback_depth {
+                    if status.cache_notified_level < 3 {
+                        send_webhook_notification(&url, &format!(
+                            "\u{26a0}\u{fe0f} Warpgate: {} files pending write-back to NAS.", total_dirty
+                        ));
+                        status.cache_notified_level = 3;
+                    }
+                } else if status.cache_notified_level == 3 {
+                    status.cache_notified_level = 0;
+                }
+            }
+        }
+
+        // Scheduled warmup check
+        {
+            let cfg = shared_config.read().unwrap();
+            let schedule = cfg.warmup.warmup_schedule.clone();
+            if !schedule.is_empty() && !cfg.warmup.rules.is_empty() {
+                let should_run = match last_scheduled_warmup {
+                    None => {
+                        // First check: see if current hour matches schedule hour
+                        let now = SystemTime::now()
+                            .duration_since(std::time::UNIX_EPOCH)
+                            .unwrap_or_default()
+                            .as_secs();
+                        let hour_of_day = (now % 86400) / 3600;
+                        // Parse "0 H * * *" -> extract H
+                        let scheduled_hour = schedule.split_whitespace()
+                            .nth(1)
+                            .and_then(|h| h.parse::<u64>().ok())
+                            .unwrap_or(2);
+                        hour_of_day == scheduled_hour
+                    }
+                    Some(last) => last.elapsed() >= Duration::from_secs(86400),
+                };
+
+                if should_run {
+                    info!("Scheduled warmup triggered (schedule: {})", schedule);
+                    last_scheduled_warmup = Some(Instant::now());
+                    let cfg_clone = cfg.clone();
+                    drop(cfg);
+                    spawn_warmup(&cfg_clone, shared_status, &shutdown);
+                }
+            }
+        }
+
         // Log cache state changes and periodic snapshots
         log_cache_events(shared_status, &config, &mut prev_states, &mut last_stats_snapshot);
diff --git a/src/web/api.rs b/src/web/api.rs
index 2dedb69..c5993b7 100644
--- a/src/web/api.rs
+++ b/src/web/api.rs
@@ -22,6 +22,7 @@ pub fn routes() -> Router<SharedState> {
         .route("/api/config", post(post_config))
         .route("/api/bwlimit", post(post_bwlimit))
         .route("/api/logs", get(get_logs))
+        .route("/api/reconnect/{share}", post(reconnect_share))
 }
 
 /// GET /api/status — overall daemon status.
@@ -33,6 +34,8 @@ struct StatusResponse {
     webdav_running: bool,
     nfs_exported: bool,
     warmup: Vec<WarmupStatus>,
+    nas_offline: bool,
+    all_synced: bool,
 }
 
 #[derive(Serialize)]
@@ -153,6 +156,8 @@ async fn get_status(State(state): State<SharedState>) -> Json<StatusResponse> {
             }
         })
         .collect(),
+        nas_offline: status.nas_offline,
+        all_synced: status.all_synced,
     })
 }
 
@@ -380,3 +385,22 @@ async fn get_logs(
         entries,
     })
 }
+
+/// POST /api/reconnect/{share} — trigger reconnect for a single share.
+async fn reconnect_share(
+    State(state): State<SharedState>,
+    Path(share_name): Path<String>,
+) -> Json<serde_json::Value> {
+    // Validate share exists
+    {
+        let cfg = state.config.read().unwrap();
+        if cfg.find_share(&share_name).is_none() {
+            return Json(serde_json::json!({ "ok": false, "message": format!("Share '{}' not found", share_name) }));
+        }
+    }
+
+    match state.cmd_tx.send(crate::daemon::SupervisorCmd::Reconnect(share_name.clone())) {
+        Ok(()) => Json(serde_json::json!({ "ok": true, "message": format!("Reconnecting share '{}'", share_name) })),
+        Err(e) => Json(serde_json::json!({ "ok": false, "message": format!("Failed to send reconnect: {}", e) })),
+    }
+}
diff --git a/src/web/mod.rs b/src/web/mod.rs
index 46aa587..e37583a 100644
--- a/src/web/mod.rs
+++ b/src/web/mod.rs
@@ -11,8 +11,11 @@ use std::sync::mpsc;
 use std::sync::{Arc, RwLock};
 use std::thread;
 
-use axum::http::header;
+use axum::extract::Request;
+use axum::http::{header, StatusCode};
+use axum::middleware::{self, Next};
 use axum::response::IntoResponse;
+use axum::response::Response;
 use axum::routing::get;
 use axum::Router;
 
@@ -29,6 +32,68 @@ async fn style_css() -> impl IntoResponse {
     ([(header::CONTENT_TYPE, "text/css")], STYLE_CSS)
 }
 
+/// HTTP Basic Auth middleware. Only active when `web.password` is set.
+async fn basic_auth(
+    axum::extract::State(state): axum::extract::State<SharedState>,
+    request: Request,
+    next: Next,
+) -> Response {
+    let password = {
+        let cfg = state.config.read().unwrap();
+        cfg.web.password.clone()
+    };
+
+    if password.is_empty() {
+        return next.run(request).await;
+    }
+
+    // Check Authorization header
+    let auth_header = request
+        .headers()
+        .get(header::AUTHORIZATION)
+        .and_then(|h| h.to_str().ok())
+        .unwrap_or("");
+
+    if auth_header.starts_with("Basic ") {
+        let encoded = &auth_header[6..];
+        if let Ok(decoded) = base64_decode(encoded) {
+            // Format: "warpgate:<password>" or just ":<password>"
+            let parts: Vec<&str> = decoded.splitn(2, ':').collect();
+            let provided_password = parts.get(1).copied().unwrap_or("");
+            if provided_password == password {
+                return next.run(request).await;
+            }
+        }
+    }
+
+    // Return 401 with WWW-Authenticate header
+    (
+        StatusCode::UNAUTHORIZED,
+        [(header::WWW_AUTHENTICATE, "Basic realm=\"Warpgate\"")],
+        "Unauthorized",
+    ).into_response()
+}
+
+fn base64_decode(s: &str) -> Result<String, ()> {
+    let alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
+    let mut result = Vec::new();
+    let s = s.trim_end_matches('=');
+    let mut buf = 0u32;
+    let mut bits = 0u32;
+    for c in s.chars() {
+        if let Some(pos) = alphabet.find(c) {
+            buf = (buf << 6) | pos as u32;
+            bits += 6;
+            if bits >= 8 {
+                bits -= 8;
+                result.push((buf >> bits) as u8);
+                buf &= (1 << bits) - 1;
+            }
+        }
+    }
+    String::from_utf8(result).map_err(|_| ())
+}
+
 /// Build the axum router with all routes.
 pub fn build_router(state: SharedState) -> Router {
     Router::new()
@@ -36,6 +101,7 @@ pub fn build_router(state: SharedState) -> Router {
         .merge(pages::routes())
         .merge(sse::routes())
         .merge(api::routes())
+        .layer(middleware::from_fn_with_state(state.clone(), basic_auth))
         .with_state(state)
 }
 