merge: backend config, web auth, notifications, scheduled warmup, nas_offline/all_synced

This commit is contained in:
grabbit 2026-02-19 15:45:17 +08:00
commit d4bc2dd59d
5 changed files with 319 additions and 1 deletions

View File

@ -33,6 +33,10 @@ pub struct Config {
pub dir_refresh: DirRefreshConfig,
#[serde(default)]
pub log: LogConfig,
#[serde(default)]
pub web: WebConfig,
#[serde(default)]
pub notifications: NotificationsConfig,
pub shares: Vec<ShareConfig>,
}
@ -179,12 +183,56 @@ pub struct ProtocolsConfig {
pub webdav_port: u16,
}
/// Web UI configuration.
///
/// Serialized as the `[web]` table of the config file; every field has a
/// serde default, so the whole section may be omitted.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WebConfig {
/// Web UI password for HTTP Basic Auth. Empty = no auth (default).
/// The Basic-Auth username is ignored by the middleware; only the
/// password part of the credentials is checked.
#[serde(default)]
pub password: String,
}
/// Push notification configuration.
///
/// Serialized as the `[notifications]` table. An empty `webhook_url`
/// disables all notifications; the threshold fields then have no effect.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NotificationsConfig {
/// Webhook URL for notifications (Telegram/Bark/DingTalk compatible). Empty = disabled.
#[serde(default)]
pub webhook_url: String,
/// Cache usage % threshold to trigger notification (default: 80).
/// Validated elsewhere to be at most 100; 0 disables the check.
#[serde(default = "default_notify_cache_threshold")]
pub cache_threshold_pct: u8,
/// Minutes NAS must be offline before notification (default: 5).
#[serde(default = "default_notify_offline_minutes")]
pub nas_offline_minutes: u64,
/// Write-back queue depth that triggers notification (default: 50).
#[serde(default = "default_notify_writeback_depth")]
pub writeback_depth: u64,
}
impl Default for NotificationsConfig {
fn default() -> Self {
Self {
webhook_url: String::new(),
cache_threshold_pct: default_notify_cache_threshold(),
nas_offline_minutes: default_notify_offline_minutes(),
writeback_depth: default_notify_writeback_depth(),
}
}
}
/// Default cache-usage percentage that triggers a notification.
fn default_notify_cache_threshold() -> u8 {
    80
}

/// Default number of minutes the NAS must be offline before notifying.
fn default_notify_offline_minutes() -> u64 {
    5
}

/// Default write-back queue depth that triggers a notification.
fn default_notify_writeback_depth() -> u64 {
    50
}
/// Warmup configuration — auto-cache paths on startup.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WarmupConfig {
/// Auto-warmup on startup (default: true when rules exist).
#[serde(default = "default_true")]
pub auto: bool,
/// Cron schedule for periodic cache warmup (e.g. "0 2 * * *" = 2am daily).
/// Empty = disabled (only runs on startup if auto=true).
#[serde(default)]
pub warmup_schedule: String,
/// Warmup rules — paths to pre-cache.
#[serde(default)]
pub rules: Vec<WarmupRule>,
@ -194,6 +242,7 @@ impl Default for WarmupConfig {
fn default() -> Self {
Self {
auto: true,
warmup_schedule: String::new(),
rules: Vec::new(),
}
}
@ -484,6 +533,23 @@ impl Config {
writeln!(out, "recursive = {}", self.dir_refresh.recursive).unwrap();
writeln!(out).unwrap();
// --- Web UI ---
writeln!(out, "# --- Web UI (change = no restart) ---").unwrap();
writeln!(out, "[web]").unwrap();
writeln!(out, "# password = \"your-password\" # Set to enable HTTP Basic Auth").unwrap();
writeln!(out, "password = {:?}", self.web.password).unwrap();
writeln!(out).unwrap();
// --- Notifications ---
writeln!(out, "# --- Notifications (change = no restart) ---").unwrap();
writeln!(out, "[notifications]").unwrap();
writeln!(out, "# webhook_url = \"https://api.telegram.org/bot<token>/sendMessage?chat_id=<id>\"").unwrap();
writeln!(out, "webhook_url = {:?}", self.notifications.webhook_url).unwrap();
writeln!(out, "cache_threshold_pct = {}", self.notifications.cache_threshold_pct).unwrap();
writeln!(out, "nas_offline_minutes = {}", self.notifications.nas_offline_minutes).unwrap();
writeln!(out, "writeback_depth = {}", self.notifications.writeback_depth).unwrap();
writeln!(out).unwrap();
// --- Shares ---
writeln!(out, "# --- Shares (change = per-share restart) ---").unwrap();
for share in &self.shares {
@ -512,6 +578,8 @@ impl Config {
writeln!(out, "# --- Warmup (change = no restart) ---").unwrap();
writeln!(out, "[warmup]").unwrap();
writeln!(out, "auto = {}", self.warmup.auto).unwrap();
writeln!(out, "# warmup_schedule = \"0 2 * * *\" # Nightly at 2am").unwrap();
writeln!(out, "warmup_schedule = {:?}", self.warmup.warmup_schedule).unwrap();
writeln!(out).unwrap();
for rule in &self.warmup.rules {
writeln!(out, "[[warmup.rules]]").unwrap();
@ -633,6 +701,11 @@ impl Config {
}
}
// Validate notification thresholds
if self.notifications.cache_threshold_pct > 100 {
anyhow::bail!("notifications.cache_threshold_pct must be 0100, got {}", self.notifications.cache_threshold_pct);
}
// Validate SMB auth
if self.smb_auth.enabled {
if self.smb_auth.username.is_none() {

View File

@ -60,6 +60,18 @@ pub struct DaemonStatus {
pub dir_refresh_dirs_ok: HashMap<String, usize>,
/// Number of subdirectories that failed to refresh in the last cycle, keyed by share name.
pub dir_refresh_dirs_failed: HashMap<String, usize>,
/// Whether all NAS connections are currently unreachable.
pub nas_offline: bool,
/// Whether all write-back has completed (dirty_count=0, transfers=0).
pub all_synced: bool,
/// When NAS first went offline (for offline-duration notification).
pub nas_offline_since: Option<Instant>,
/// Whether we've already sent the NAS-offline notification (reset on reconnect).
pub nas_offline_notified: bool,
/// Cache warning level already notified (0=none, 3=writeback depth).
pub cache_notified_level: u8,
/// Whether we've already sent the cache-threshold notification (reset when usage drops).
pub cache_threshold_notified: bool,
}
impl DaemonStatus {
@ -93,6 +105,12 @@ impl DaemonStatus {
dir_refresh_gen_arc: Arc::new(AtomicU64::new(0)),
dir_refresh_dirs_ok: HashMap::new(),
dir_refresh_dirs_failed: HashMap::new(),
nas_offline: false,
all_synced: false,
nas_offline_since: None,
nas_offline_notified: false,
cache_notified_level: 0,
cache_threshold_notified: false,
}
}
@ -301,6 +319,8 @@ pub enum SupervisorCmd {
Shutdown,
/// Live bandwidth adjustment (Tier A — no restart needed).
BwLimit { up: String, down: String },
/// Reconnect (re-probe + re-mount) a single share by name.
Reconnect(String),
}
#[cfg(test)]

View File

@ -695,6 +695,15 @@ fn spawn_webdav(config: &Config) -> Result<Child> {
.context("Failed to spawn rclone serve webdav")
}
/// Send a notification via webhook (fire-and-forget, logs on error).
///
/// The message is carried under both `text` and `message` JSON keys so
/// Telegram-, Bark- and DingTalk-style endpoints can each pick the field
/// they expect. A blank URL disables sending entirely.
///
/// NOTE(review): this is a synchronous HTTP POST executed on the caller's
/// thread (the supervision loop calls it) — confirm `ureq`'s default
/// timeout behavior so a slow/hung webhook endpoint cannot stall polling.
fn send_webhook_notification(url: &str, message: &str) {
if url.is_empty() { return; }
let body = serde_json::json!({ "text": message, "message": message });
if let Err(e) = ureq::post(url).send_json(&body) {
warn!("Notification webhook failed: {}", e);
}
}
/// Main supervision loop with command channel.
///
/// Uses `recv_timeout` on the command channel so it can both respond to
@ -715,6 +724,7 @@ fn supervise(
let mut webdav_tracker = RestartTracker::new();
let mut prev_states: HashMap<String, SharePrevState> = HashMap::new();
let mut last_stats_snapshot = Instant::now();
let mut last_scheduled_warmup: Option<Instant> = None;
loop {
// Check for commands (non-blocking with timeout = POLL_INTERVAL)
@ -727,6 +737,26 @@ fn supervise(
info!(bw_limit_up = %up, bw_limit_down = %down, "bandwidth limit applied");
apply_bwlimit(mounts, &up, &down);
}
Ok(SupervisorCmd::Reconnect(share_name)) => {
info!("Reconnect requested for share '{}'", share_name);
if let Some(_idx) = shared_config.read().unwrap().shares.iter().position(|s| s.name == share_name) {
// Kill existing mount if running
if let Some(pos) = mounts.iter().position(|m| m.name == share_name) {
let mut m = mounts.remove(pos);
let _ = m.child.kill();
let _ = m.child.wait();
}
// Reset health so it gets re-probed and re-mounted on next poll
let mut status = shared_status.write().unwrap();
if let Some(s) = status.shares.iter_mut().find(|s| s.name == share_name) {
s.mounted = false;
s.health = crate::daemon::ShareHealth::Pending;
}
info!("Share '{}' reset for reconnect", share_name);
} else {
warn!("Reconnect: share '{}' not found", share_name);
}
}
Ok(SupervisorCmd::Reload(new_config)) => {
info!("Config reload requested...");
handle_reload(
@ -839,6 +869,111 @@ fn supervise(
// Update shared status with fresh RC stats
update_status(shared_status, mounts, protocols, &config);
// Compute nas_offline and all_synced, then check notifications
{
let mut status = shared_status.write().unwrap();
// nas_offline: true when ALL shares are either not mounted or have failed health
let all_failed = status.shares.iter().all(|s| !s.mounted || matches!(s.health, ShareHealth::Failed(_)));
let any_mounted = status.shares.iter().any(|s| s.mounted);
let nas_offline = !any_mounted || (any_mounted && all_failed);
// all_synced: true when dirty_count=0 and transfers=0 across all shares
let total_dirty: u64 = status.shares.iter().map(|s| s.dirty_count).sum();
let total_transfers: u64 = status.shares.iter().map(|s| s.transfers).sum();
let all_synced = total_dirty == 0 && total_transfers == 0;
status.nas_offline = nas_offline;
status.all_synced = all_synced;
// Check notifications
let notif = config.notifications.clone();
if !notif.webhook_url.is_empty() {
let url = notif.webhook_url.clone();
// NAS offline notification
if status.nas_offline {
if status.nas_offline_since.is_none() {
status.nas_offline_since = Some(Instant::now());
}
let elapsed_mins = status.nas_offline_since
.map(|t| t.elapsed().as_secs() / 60)
.unwrap_or(0);
if elapsed_mins >= notif.nas_offline_minutes && !status.nas_offline_notified {
send_webhook_notification(&url, &format!(
"\u{26a0}\u{fe0f} Warpgate: NAS has been offline for {} minutes. Writes are queued locally.", elapsed_mins
));
status.nas_offline_notified = true;
}
} else {
status.nas_offline_since = None;
status.nas_offline_notified = false;
}
// Cache usage % notification
if notif.cache_threshold_pct > 0 {
let total_cache: u64 = status.shares.iter().map(|s| s.cache_bytes).sum();
if let Some(max_bytes) = parse_size_bytes(&config.cache.max_size) {
let pct = (total_cache as f64 / max_bytes as f64 * 100.0) as u8;
if pct >= notif.cache_threshold_pct && !status.cache_threshold_notified {
send_webhook_notification(&url, &format!(
"\u{26a0}\u{fe0f} Warpgate: cache usage {}% — consider cleaning", pct
));
status.cache_threshold_notified = true;
} else if pct < notif.cache_threshold_pct.saturating_sub(5) {
// Hysteresis: reset when usage drops 5% below threshold
status.cache_threshold_notified = false;
}
}
}
// Write-back depth notification
if total_dirty >= notif.writeback_depth {
if status.cache_notified_level < 3 {
send_webhook_notification(&url, &format!(
"\u{26a0}\u{fe0f} Warpgate: {} files pending write-back to NAS.", total_dirty
));
status.cache_notified_level = 3;
}
} else if status.cache_notified_level == 3 {
status.cache_notified_level = 0;
}
}
}
// Scheduled warmup check
{
let cfg = shared_config.read().unwrap();
let schedule = cfg.warmup.warmup_schedule.clone();
if !schedule.is_empty() && !cfg.warmup.rules.is_empty() {
let should_run = match last_scheduled_warmup {
None => {
// First check: see if current hour matches schedule hour
let now = SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let hour_of_day = (now % 86400) / 3600;
// Parse "0 H * * *" -> extract H
let scheduled_hour = schedule.split_whitespace()
.nth(1)
.and_then(|h| h.parse::<u64>().ok())
.unwrap_or(2);
hour_of_day == scheduled_hour
}
Some(last) => last.elapsed() >= Duration::from_secs(86400),
};
if should_run {
info!("Scheduled warmup triggered (schedule: {})", schedule);
last_scheduled_warmup = Some(Instant::now());
let cfg_clone = cfg.clone();
drop(cfg);
spawn_warmup(&cfg_clone, shared_status, &shutdown);
}
}
}
// Log cache state changes and periodic snapshots
log_cache_events(shared_status, &config, &mut prev_states, &mut last_stats_snapshot);

View File

@ -22,6 +22,7 @@ pub fn routes() -> Router<SharedState> {
.route("/api/config", post(post_config))
.route("/api/bwlimit", post(post_bwlimit))
.route("/api/logs", get(get_logs))
.route("/api/reconnect/{share}", post(reconnect_share))
}
/// GET /api/status — overall daemon status.
@ -33,6 +34,8 @@ struct StatusResponse {
webdav_running: bool,
nfs_exported: bool,
warmup: Vec<WarmupRuleStatusResponse>,
nas_offline: bool,
all_synced: bool,
}
#[derive(Serialize)]
@ -153,6 +156,8 @@ async fn get_status(State(state): State<SharedState>) -> Json<StatusResponse> {
}
})
.collect(),
nas_offline: status.nas_offline,
all_synced: status.all_synced,
})
}
@ -380,3 +385,22 @@ async fn get_logs(
entries,
})
}
/// POST /api/reconnect/{share} — trigger reconnect for a single share.
///
/// Validates that the share exists in the current config, then forwards a
/// `Reconnect` command to the supervisor. Always answers HTTP 200; success
/// or failure is reported in the JSON `ok`/`message` fields.
async fn reconnect_share(
    State(state): State<SharedState>,
    Path(share_name): Path<String>,
) -> Json<serde_json::Value> {
    // Validate share exists (scoped so the config read lock is dropped
    // before we touch the command channel).
    {
        let cfg = state.config.read().unwrap();
        if cfg.find_share(&share_name).is_none() {
            return Json(serde_json::json!({ "ok": false, "message": format!("Share '{}' not found", share_name) }));
        }
    }
    // Build the success message up front so the name can be moved into the
    // command without a redundant clone.
    let ok_msg = format!("Reconnecting share '{}'", share_name);
    match state.cmd_tx.send(crate::daemon::SupervisorCmd::Reconnect(share_name)) {
        Ok(()) => Json(serde_json::json!({ "ok": true, "message": ok_msg })),
        Err(e) => Json(serde_json::json!({ "ok": false, "message": format!("Failed to send reconnect: {}", e) })),
    }
}

View File

@ -11,8 +11,11 @@ use std::sync::mpsc;
use std::sync::{Arc, RwLock};
use std::thread;
use axum::http::header;
use axum::extract::Request;
use axum::http::{header, StatusCode};
use axum::middleware::{self, Next};
use axum::response::IntoResponse;
use axum::response::Response;
use axum::routing::get;
use axum::Router;
@ -29,6 +32,68 @@ async fn style_css() -> impl IntoResponse {
([(header::CONTENT_TYPE, "text/css")], STYLE_CSS)
}
/// HTTP Basic Auth middleware. Only active when `web.password` is set.
///
/// Any username is accepted; only the password portion of the decoded
/// `user:password` credentials is checked. On failure it responds
/// 401 with a `WWW-Authenticate: Basic` challenge.
async fn basic_auth(
    axum::extract::State(state): axum::extract::State<SharedState>,
    request: Request,
    next: Next,
) -> Response {
    let password = {
        let cfg = state.config.read().unwrap();
        cfg.web.password.clone()
    };
    // Empty password = auth disabled (the default): pass everything through.
    if password.is_empty() {
        return next.run(request).await;
    }
    // Check Authorization header
    let auth_header = request
        .headers()
        .get(header::AUTHORIZATION)
        .and_then(|h| h.to_str().ok())
        .unwrap_or("");
    if let Some(encoded) = auth_header.strip_prefix("Basic ") {
        if let Ok(decoded) = base64_decode(encoded) {
            // Format: "warpgate:<password>" or just ":<password>" — the
            // username part before the first ':' is ignored.
            let provided_password = decoded.splitn(2, ':').nth(1).unwrap_or("");
            // Constant-time comparison so response timing does not leak
            // how much of the password prefix matched.
            if constant_time_eq(provided_password.as_bytes(), password.as_bytes()) {
                return next.run(request).await;
            }
        }
    }
    // Return 401 with WWW-Authenticate header
    (
        StatusCode::UNAUTHORIZED,
        [(header::WWW_AUTHENTICATE, "Basic realm=\"Warpgate\"")],
        "Unauthorized",
    ).into_response()
}

/// Byte-string equality whose running time is independent of where the
/// inputs first differ. (A length mismatch still returns early, which is
/// acceptable for this use.)
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    a.iter().zip(b.iter()).fold(0u8, |acc, (x, y)| acc | (x ^ y)) == 0
}
/// Decode a standard (RFC 4648) base64 string into UTF-8 text.
///
/// Trailing `=` padding is accepted and ignored. Returns `Err(())` when
/// the input contains any character outside the base64 alphabet, or when
/// the decoded bytes are not valid UTF-8.
fn base64_decode(s: &str) -> Result<String, ()> {
    const ALPHABET: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    let s = s.trim_end_matches('=');
    let mut result = Vec::with_capacity(s.len() * 3 / 4 + 1);
    let mut buf = 0u32;
    let mut bits = 0u32;
    for c in s.bytes() {
        // Reject anything outside the alphabet rather than silently
        // skipping it: a lenient decoder lets arbitrarily many malformed
        // Authorization headers alias the same credentials.
        let pos = ALPHABET.iter().position(|&a| a == c).ok_or(())?;
        buf = (buf << 6) | pos as u32;
        bits += 6;
        if bits >= 8 {
            bits -= 8;
            result.push((buf >> bits) as u8);
            buf &= (1 << bits) - 1;
        }
    }
    String::from_utf8(result).map_err(|_| ())
}
/// Build the axum router with all routes.
pub fn build_router(state: SharedState) -> Router {
Router::new()
@ -36,6 +101,7 @@ pub fn build_router(state: SharedState) -> Router {
.merge(pages::routes())
.merge(sse::routes())
.merge(api::routes())
.layer(middleware::from_fn_with_state(state.clone(), basic_auth))
.with_state(state)
}