warpgate/src/daemon.rs
grabbit 64d6171ec9 Unify logging to tracing: file appender + unified log viewer
Replace scattered println!/eprintln! with structured tracing macros throughout
supervisor, scheduler, and web modules. Add LogConfig (file + level) to Config
and a new logging module that initialises a stderr + optional non-blocking file
appender on `warpgate run`. Remove the in-memory LogBuffer/LogEntry from
AppState; the web /api/logs endpoint now reads the log file directly with
from_line/lines pagination. `warpgate log` replaces journalctl with `tail`,
and the Logs tab Alpine.js is updated to match the new API response shape.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-19 11:24:06 +08:00

483 lines
15 KiB
Rust

//! Shared state and command types for the daemon (supervisor + web server).
//!
//! The supervisor owns all mutable state. The web server gets read-only access
//! to status via `Arc<RwLock<DaemonStatus>>` and sends commands via an mpsc channel.
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
use std::sync::mpsc;
use std::sync::{Arc, RwLock};
use std::time::{Instant, SystemTime};
use crate::config::Config;
/// Default port for the built-in web UI / API server.
/// NOTE(review): presumably overridable via `Config` — confirm where the
/// listener reads its port.
pub const DEFAULT_WEB_PORT: u16 = 8090;
/// Shared application state accessible by both the supervisor and web server.
///
/// Every field is either a cheap-to-clone handle (`Arc`, channel sender) or a
/// small owned value, so the web layer can hold its own copy. Note the mix of
/// channel flavours: commands travel over a sync `std::sync::mpsc` channel,
/// while SSE notifications use an async `tokio::sync::broadcast` channel.
pub struct AppState {
    /// Current active configuration (read by UI, updated on reload).
    pub config: Arc<RwLock<Config>>,
    /// Live daemon status (updated by supervisor every poll cycle).
    pub status: Arc<RwLock<DaemonStatus>>,
    /// Command channel: web server → supervisor.
    pub cmd_tx: mpsc::Sender<SupervisorCmd>,
    /// Path to the config file on disk.
    pub config_path: PathBuf,
    /// SSE broadcast: supervisor sends `()` after each status update;
    /// web server subscribers render partials and push to connected clients.
    pub sse_tx: tokio::sync::broadcast::Sender<()>,
}
/// Overall daemon status, updated by the supervisor loop.
///
/// Held behind `Arc<RwLock<…>>` in [`AppState`]: the supervisor writes, the
/// web layer reads.
pub struct DaemonStatus {
    /// When the daemon started (monotonic clock; rendered by `uptime_string`).
    pub started_at: Instant,
    /// Per-share mount status.
    pub shares: Vec<ShareStatus>,
    /// Whether smbd is running.
    pub smbd_running: bool,
    /// Whether WebDAV is running.
    pub webdav_running: bool,
    /// Whether NFS exports are active.
    pub nfs_exported: bool,
    /// Per-rule warmup status (populated when warmup is triggered).
    pub warmup: Vec<WarmupRuleStatus>,
    /// Generation counter — incremented each time warmup is (re)started.
    /// Workers check this to detect when they've been superseded.
    pub warmup_generation: u64,
    /// Whether any dir-refresh threads are active.
    pub dir_refresh_running: bool,
    /// Timestamp of the last successful dir-refresh call, keyed by share name.
    /// Wall-clock (`SystemTime`) so elapsed time survives conversion to
    /// "Xm ago" strings via `dir_refresh_ago_for`.
    pub last_dir_refresh: HashMap<String, SystemTime>,
    /// Generation counter for dir-refresh threads (incremented on each re-spawn).
    pub dir_refresh_generation: u64,
    /// Shared atomic for generation — dir-refresh threads hold a clone and check
    /// this directly (without taking the RwLock) to detect supersession.
    pub dir_refresh_gen_arc: Arc<AtomicU64>,
}
impl DaemonStatus {
    /// Create initial status for a set of share names.
    ///
    /// Every share starts unmounted with zeroed counters and
    /// `ShareHealth::Pending`; all generation counters start at 0.
    pub fn new(share_names: &[String]) -> Self {
        Self {
            started_at: Instant::now(),
            shares: share_names
                .iter()
                .map(|name| ShareStatus {
                    name: name.clone(),
                    mounted: false,
                    rc_port: 0,
                    cache_bytes: 0,
                    dirty_count: 0,
                    errored_files: 0,
                    speed: 0.0,
                    transfers: 0,
                    errors: 0,
                    health: ShareHealth::Pending,
                })
                .collect(),
            smbd_running: false,
            webdav_running: false,
            nfs_exported: false,
            warmup: Vec::new(),
            warmup_generation: 0,
            dir_refresh_running: false,
            last_dir_refresh: HashMap::new(),
            dir_refresh_generation: 0,
            dir_refresh_gen_arc: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Aggregate warmup summary for a share.
    ///
    /// Returns `(state_label, done_count, total_count)` where state_label is
    /// one of "none", "pending", "running", "complete", "failed".
    ///
    /// Label precedence: any failed rule ⇒ "failed"; else any rule still
    /// listing/caching ⇒ "running"; else all rules complete ⇒ "complete";
    /// else "pending". "Done" counts both cached and skipped files.
    pub fn warmup_summary_for(&self, share_name: &str) -> (&str, usize, usize) {
        // Single pass over the matching rules — avoids collecting an
        // intermediate Vec and re-scanning it four more times.
        let mut matched = 0usize;
        let mut total = 0usize;
        let mut done = 0usize;
        let mut any_failed = false;
        let mut any_running = false;
        let mut all_complete = true;
        for rule in self.warmup.iter().filter(|r| r.share == share_name) {
            matched += 1;
            total += rule.total_files;
            done += rule.cached + rule.skipped;
            match rule.state {
                WarmupRuleState::Failed(_) => any_failed = true,
                WarmupRuleState::Listing | WarmupRuleState::Caching => any_running = true,
                _ => {}
            }
            if !matches!(rule.state, WarmupRuleState::Complete) {
                all_complete = false;
            }
        }
        if matched == 0 {
            return ("none", 0, 0);
        }
        let label = if any_failed {
            "failed"
        } else if any_running {
            "running"
        } else if all_complete {
            "complete"
        } else {
            "pending"
        };
        (label, done, total)
    }

    /// How long ago the last dir-refresh ran for this share.
    ///
    /// Returns `None` if no refresh has completed yet for the share.
    /// A clock that has gone backwards renders as "0s ago" rather than
    /// erroring (`elapsed()` failure maps to a zero duration).
    pub fn dir_refresh_ago_for(&self, share_name: &str) -> Option<String> {
        let ts = self.last_dir_refresh.get(share_name)?;
        let secs = ts.elapsed().unwrap_or_default().as_secs();
        Some(format_ago(secs))
    }

    /// Format uptime as a human-readable string, e.g. "2d 3h 14m".
    ///
    /// Always shows minutes; hours and days appear once uptime crosses
    /// the respective boundary.
    pub fn uptime_string(&self) -> String {
        let secs = self.started_at.elapsed().as_secs();
        let days = secs / 86400;
        let hours = (secs % 86400) / 3600;
        let mins = (secs % 3600) / 60;
        if days > 0 {
            format!("{days}d {hours}h {mins}m")
        } else if hours > 0 {
            format!("{hours}h {mins}m")
        } else {
            format!("{mins}m")
        }
    }
}
/// Per-share runtime status.
///
/// Snapshot of one rclone-backed share, refreshed by the supervisor and
/// rendered by the `*_display` / `health_*` helpers below.
pub struct ShareStatus {
    /// Share name (matches ShareConfig.name).
    pub name: String,
    /// Whether the FUSE mount is active.
    pub mounted: bool,
    /// RC API port for this share's rclone instance (0 = not yet assigned).
    pub rc_port: u16,
    /// Bytes used in VFS disk cache.
    pub cache_bytes: u64,
    /// Number of dirty files (uploads_in_progress + uploads_queued).
    pub dirty_count: u64,
    /// Number of errored cache files.
    pub errored_files: u64,
    /// Current transfer speed (bytes/sec from core/stats).
    pub speed: f64,
    /// Active transfer count.
    pub transfers: u64,
    /// Cumulative error count.
    pub errors: u64,
    /// Pre-mount probe result.
    pub health: ShareHealth,
}
impl ShareStatus {
    /// Human-readable VFS cache size, e.g. "45.2 GiB".
    pub fn cache_display(&self) -> String {
        format_bytes(self.cache_bytes)
    }

    /// Human-readable transfer speed, e.g. "2.1 MiB/s", or "-" when idle
    /// (below 1 byte/sec).
    pub fn speed_display(&self) -> String {
        if self.speed >= 1.0 {
            format!("{}/s", format_bytes(self.speed as u64))
        } else {
            "-".to_string()
        }
    }

    /// Health label for display: "PENDING", "PROBING", "OK", or "FAILED".
    pub fn health_label(&self) -> &str {
        match &self.health {
            ShareHealth::Pending => "PENDING",
            ShareHealth::Probing => "PROBING",
            ShareHealth::Healthy => "OK",
            ShareHealth::Failed(_) => "FAILED",
        }
    }

    /// The probe failure message when health is `Failed`, `None` otherwise.
    pub fn health_message(&self) -> Option<&str> {
        if let ShareHealth::Failed(msg) = &self.health {
            Some(msg)
        } else {
            None
        }
    }

    /// Whether the pre-mount probe has succeeded.
    pub fn is_healthy(&self) -> bool {
        matches!(self.health, ShareHealth::Healthy)
    }
}
/// Format an elapsed-seconds count as "5s ago", "3m ago", "2h ago", "4d ago".
///
/// Uses the same unit breakpoints as `DaemonStatus::uptime_string` so the two
/// displays stay consistent past the 24-hour mark.
fn format_ago(secs: u64) -> String {
    if secs < 60 {
        format!("{secs}s ago")
    } else if secs < 3600 {
        format!("{}m ago", secs / 60)
    } else if secs < 86400 {
        format!("{}h ago", secs / 3600)
    } else {
        // Previously anything ≥ 1 day rendered as an ever-growing hour count
        // (e.g. "49h ago"); switch to days like uptime_string does.
        format!("{}d ago", secs / 86400)
    }
}
/// Format bytes as human-readable (e.g. "45.2 GiB").
fn format_bytes(bytes: u64) -> String {
    // Binary-prefix thresholds, largest first; the first one that fits wins.
    let scales: [(f64, &str); 4] = [
        (1024.0 * 1024.0 * 1024.0 * 1024.0, "TiB"),
        (1024.0 * 1024.0 * 1024.0, "GiB"),
        (1024.0 * 1024.0, "MiB"),
        (1024.0, "KiB"),
    ];
    let b = bytes as f64;
    for &(scale, suffix) in &scales {
        if b >= scale {
            return format!("{:.1} {suffix}", b / scale);
        }
    }
    // Below 1 KiB: print the exact byte count.
    format!("{bytes} B")
}
/// Per-share health state from pre-mount probing.
///
/// Rendered for the UI by `ShareStatus::health_label` /
/// `ShareStatus::health_message`.
#[derive(Clone, Debug, PartialEq)]
pub enum ShareHealth {
    /// Not yet probed (initial state).
    Pending,
    /// Probe in progress.
    Probing,
    /// Remote path verified, ready to mount.
    Healthy,
    /// Probe failed — share will not be mounted. Carries the
    /// human-readable failure message.
    Failed(String),
}
/// Per-rule warmup progress, updated by the warmup worker thread.
///
/// Serialized directly into API responses (hence `serde::Serialize`).
#[derive(Clone, Debug, serde::Serialize)]
pub struct WarmupRuleStatus {
    /// Share this rule belongs to (matches `ShareStatus.name`).
    pub share: String,
    /// Path within the share that this rule warms.
    pub path: String,
    /// Optional age filter for the rule.
    /// NOTE(review): exact string format (e.g. "7d"?) not visible here —
    /// confirm against the rule producer.
    pub newer_than: Option<String>,
    /// Current position in the rule's state machine.
    pub state: WarmupRuleState,
    /// Total files discovered for this rule.
    pub total_files: usize,
    /// Files skipped (counted as "done" by `warmup_summary_for`).
    pub skipped: usize,
    /// Files pulled into the cache (also counted as "done").
    pub cached: usize,
    /// Files that errored during warmup.
    pub errors: usize,
}
/// State machine for a single warmup rule.
///
/// `Listing` and `Caching` both count as "running" in
/// `DaemonStatus::warmup_summary_for`; `Failed` takes precedence over
/// every other state when summarising.
#[derive(Clone, Debug, PartialEq, serde::Serialize)]
pub enum WarmupRuleState {
    /// Not yet started.
    Pending,
    /// Enumerating files covered by the rule.
    Listing,
    /// Pulling the listed files into the cache.
    Caching,
    /// Finished successfully.
    Complete,
    /// Aborted; carries a human-readable error message.
    Failed(String),
}
/// Commands sent from the web server (or CLI) to the supervisor.
///
/// Delivered over the sync mpsc channel in `AppState::cmd_tx`.
pub enum SupervisorCmd {
    /// Apply a new configuration (triggers tiered reload).
    Reload(Config),
    /// Graceful shutdown.
    Shutdown,
    /// Live bandwidth adjustment (Tier A — no restart needed).
    /// NOTE(review): `up`/`down` look like rclone-style bandwidth strings
    /// (e.g. "10M") — confirm the expected format at the consumer.
    BwLimit { up: String, down: String },
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a ShareStatus carrying only the fields a test cares about;
    /// everything else is zeroed / false.
    fn make_share(cache_bytes: u64, speed: f64, health: ShareHealth) -> ShareStatus {
        ShareStatus {
            name: "test".into(),
            mounted: false,
            rc_port: 0,
            cache_bytes,
            dirty_count: 0,
            errored_files: 0,
            speed,
            transfers: 0,
            errors: 0,
            health,
        }
    }

    /// Build a warmup rule for the "photos" share at path "/2024".
    fn photos_rule(
        state: WarmupRuleState,
        total_files: usize,
        skipped: usize,
        cached: usize,
    ) -> WarmupRuleStatus {
        WarmupRuleStatus {
            share: "photos".into(),
            path: "/2024".into(),
            newer_than: None,
            state,
            total_files,
            skipped,
            cached,
            errors: 0,
        }
    }

    #[test]
    fn test_format_bytes() {
        let cases: [(u64, &str); 7] = [
            (0, "0 B"),
            (500, "500 B"),
            (1024, "1.0 KiB"),
            (1536, "1.5 KiB"),
            (1048576, "1.0 MiB"),
            (1073741824, "1.0 GiB"),
            (1099511627776, "1.0 TiB"),
        ];
        for &(input, expected) in &cases {
            assert_eq!(format_bytes(input), expected);
        }
    }

    #[test]
    fn test_daemon_status_new() {
        let names: Vec<String> = ["photos", "projects"].iter().map(|s| s.to_string()).collect();
        let status = DaemonStatus::new(&names);
        assert_eq!(status.shares.len(), 2);
        assert_eq!(status.shares[0].name, "photos");
        assert_eq!(status.shares[1].name, "projects");
        assert!(!status.smbd_running);
        assert!(!status.webdav_running);
        assert!(!status.nfs_exported);
        assert!(status.warmup.is_empty());
        assert_eq!(status.warmup_generation, 0);
    }

    #[test]
    fn test_uptime_string() {
        // A freshly-created status always renders at least the minutes unit.
        assert!(DaemonStatus::new(&[]).uptime_string().contains('m'));
    }

    #[test]
    fn test_share_status_display() {
        let share = make_share(48_578_891_776, 2_200_000.0, ShareHealth::Healthy);
        assert!(share.cache_display().contains("GiB"));
        assert!(share.speed_display().contains("/s"));
    }

    #[test]
    fn test_share_status_no_speed() {
        assert_eq!(make_share(0, 0.0, ShareHealth::Pending).speed_display(), "-");
    }

    #[test]
    fn test_share_health_labels() {
        let mut share = make_share(0, 0.0, ShareHealth::Pending);
        assert_eq!(share.health_label(), "PENDING");
        assert!(share.health_message().is_none());
        assert!(!share.is_healthy());

        share.health = ShareHealth::Probing;
        assert_eq!(share.health_label(), "PROBING");

        share.health = ShareHealth::Healthy;
        assert_eq!(share.health_label(), "OK");
        assert!(share.is_healthy());

        share.health = ShareHealth::Failed("remote path not found".into());
        assert_eq!(share.health_label(), "FAILED");
        assert_eq!(share.health_message(), Some("remote path not found"));
        assert!(!share.is_healthy());
    }

    #[test]
    fn test_warmup_summary_no_rules() {
        let status = DaemonStatus::new(&["photos".to_string()]);
        assert_eq!(status.warmup_summary_for("photos"), ("none", 0, 0));
    }

    #[test]
    fn test_warmup_summary_pending() {
        let mut status = DaemonStatus::new(&["photos".to_string()]);
        status.warmup.push(photos_rule(WarmupRuleState::Pending, 0, 0, 0));
        assert_eq!(status.warmup_summary_for("photos").0, "pending");
    }

    #[test]
    fn test_warmup_summary_running() {
        let mut status = DaemonStatus::new(&["photos".to_string()]);
        status.warmup.push(photos_rule(WarmupRuleState::Caching, 100, 10, 40));
        // done = cached + skipped = 50 of 100.
        assert_eq!(status.warmup_summary_for("photos"), ("running", 50, 100));
    }

    #[test]
    fn test_warmup_summary_complete() {
        let mut status = DaemonStatus::new(&["photos".to_string()]);
        status.warmup.push(photos_rule(WarmupRuleState::Complete, 100, 30, 70));
        assert_eq!(status.warmup_summary_for("photos"), ("complete", 100, 100));
    }

    #[test]
    fn test_warmup_summary_wrong_share() {
        let mut status = DaemonStatus::new(&["photos".to_string()]);
        status.warmup.push(photos_rule(WarmupRuleState::Caching, 50, 0, 10));
        assert_eq!(status.warmup_summary_for("videos").0, "none");
    }
}