warpgate/src/supervisor.rs
grabbit 00df439bc5 Add cache operation logging and fix log viewer to show latest entries
- supervisor: emit structured events for dirty_count/errored_files
  changes, cache size changes >10%, transfer active/idle transitions,
  cache ≥80%/≥95% warnings, and 60s periodic stats snapshots
- supervisor: add parse_size_bytes() helper; structured BwLimit log
- warmup: add per-file debug/info/warn logging with 100-file milestones
  and rule-complete summary
- web/api: fix /api/logs initial load to return most recent entries
  (tail behaviour) instead of oldest entries

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-19 14:39:41 +08:00

1594 lines
55 KiB
Rust

//! `warpgate run` — single-process supervisor for all services.
//!
//! Manages rclone mount processes (one per share) + protocol services in one
//! process tree with coordinated startup and shutdown. Spawns a built-in web
//! server for status monitoring and config hot-reload.
use std::collections::HashMap;
use std::os::unix::process::CommandExt;
use std::path::PathBuf;
use std::process::{Child, Command};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::mpsc::{self, RecvTimeoutError};
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::{Duration, Instant, SystemTime};
use anyhow::{Context, Result};
use tracing::{error, info, warn};
use crate::config::Config;
use crate::config_diff::{self, ChangeTier};
use crate::daemon::{DaemonStatus, ShareHealth, SupervisorCmd, WarmupRuleState, WarmupRuleStatus};
use crate::scheduler::ScheduledTask;
use crate::rclone::mount::{build_mount_args, is_mounted};
use crate::rclone::rc;
use crate::services::{nfs, samba, webdav};
/// Mount ready timeout — how long to wait for a FUSE mount to appear.
const MOUNT_TIMEOUT: Duration = Duration::from_secs(30);
/// Supervision loop poll interval (also the command-channel recv timeout).
const POLL_INTERVAL: Duration = Duration::from_secs(2);
/// Grace period for SIGTERM before escalating to SIGKILL.
const SIGTERM_GRACE: Duration = Duration::from_secs(3);
/// Max restart attempts before giving up on a protocol service.
const MAX_RESTARTS: u32 = 3;
/// Reset restart counter after this period of stable running.
const RESTART_STABLE_PERIOD: Duration = Duration::from_secs(300);
/// Max time to wait for write-back queue to drain on shutdown.
const WRITEBACK_DRAIN_TIMEOUT: Duration = Duration::from_secs(300);
/// Poll interval when waiting for write-back drain.
const WRITEBACK_POLL_INTERVAL: Duration = Duration::from_secs(2);
/// Transfer speed below this value is considered idle (bytes/sec).
const SPEED_ACTIVE_THRESHOLD: f64 = 10_240.0; // 10 KiB/s
/// Interval for periodic cache stats snapshots.
const STATS_SNAPSHOT_INTERVAL: Duration = Duration::from_secs(60);
/// Cache usage WARN threshold (fraction of max_size).
const CACHE_WARN_THRESHOLD: f64 = 0.80;
/// Cache usage CRIT threshold (fraction of max_size).
const CACHE_CRITICAL_THRESHOLD: f64 = 0.95;
/// Per-share state from the previous poll cycle, used for change detection.
struct SharePrevState {
    /// Write-back queue depth last cycle (uploads in progress + queued).
    dirty_count: u64,
    /// Number of files whose upload had errored last cycle.
    errored_files: u64,
    /// On-disk VFS cache usage in bytes last cycle.
    cache_bytes: u64,
    /// Whether transfers were considered active last cycle
    /// (presumably speed vs `SPEED_ACTIVE_THRESHOLD` — consumer is
    /// `log_cache_events`, defined later in this file).
    is_active: bool,
    /// 0 = normal, 1 = ≥80%, 2 = ≥95%
    cache_warn_level: u8,
}
/// Tracks restart attempts for a supervised child process.
struct RestartTracker {
count: u32,
last_restart: Option<Instant>,
}
impl RestartTracker {
fn new() -> Self {
Self {
count: 0,
last_restart: None,
}
}
/// Returns true if another restart is allowed. Resets counter if the
/// service has been stable for `RESTART_STABLE_PERIOD`.
fn can_restart(&mut self) -> bool {
if let Some(last) = self.last_restart
&& last.elapsed() >= RESTART_STABLE_PERIOD
{
self.count = 0;
}
self.count < MAX_RESTARTS
}
fn record_restart(&mut self) {
self.count += 1;
self.last_restart = Some(Instant::now());
}
}
/// A named rclone mount child process for a single share.
struct MountChild {
    /// Share name; used to match against `ShareConfig::name` and status entries.
    name: String,
    /// The spawned `rclone mount` process.
    child: Child,
    /// Port of this mount's rclone remote-control (RC) API.
    rc_port: u16,
}
/// Child processes for protocol servers managed by the supervisor.
///
/// Implements `Drop` to kill any spawned children — prevents orphaned
/// processes if startup fails partway through `start_protocols()`.
struct ProtocolChildren {
    /// Samba `smbd` daemon, if SMB is enabled.
    smbd: Option<Child>,
    /// `rclone serve webdav` process, if WebDAV is enabled.
    webdav: Option<Child>,
}
impl Drop for ProtocolChildren {
    fn drop(&mut self) {
        // Flatten the two Options so only children that actually exist are
        // visited; graceful_kill sends SIGTERM, escalating to SIGKILL after
        // SIGTERM_GRACE (see constant above).
        for child in [&mut self.smbd, &mut self.webdav].into_iter().flatten() {
            graceful_kill(child);
        }
    }
}
/// Entry point — called from main.rs for `warpgate run`.
///
/// Phases: preflight → probe remotes → write protocol configs (healthy
/// shares only) → start mounts → start protocol services → background
/// warmup + dir-refresh → supervision loop → teardown. Teardown always
/// runs, regardless of how the supervision loop exits.
pub fn run(config: &Config, config_path: PathBuf) -> Result<()> {
    let shutdown = Arc::new(AtomicBool::new(false));
    // Install signal handler (SIGTERM + SIGINT)
    let shutdown_flag = Arc::clone(&shutdown);
    ctrlc::set_handler(move || {
        info!("Signal received, shutting down...");
        shutdown_flag.store(true, Ordering::SeqCst);
    })
    .context("Failed to set signal handler")?;
    // Set up shared state for web server integration
    let shared_config = Arc::new(RwLock::new(config.clone()));
    let share_names: Vec<String> = config.shares.iter().map(|s| s.name.clone()).collect();
    let shared_status = Arc::new(RwLock::new(DaemonStatus::new(&share_names)));
    let (cmd_tx, cmd_rx) = mpsc::channel::<SupervisorCmd>();
    // Create SSE broadcast channel (supervisor → web clients)
    let (sse_tx, _) = tokio::sync::broadcast::channel::<()>(16);
    // Spawn the web UI server in a background thread
    let _web_handle = crate::web::spawn_web_server(
        Arc::clone(&shared_config),
        Arc::clone(&shared_status),
        cmd_tx.clone(),
        config_path,
        sse_tx.clone(),
    );
    // Also wire shutdown signal to the command channel
    let shutdown_tx = cmd_tx;
    let shutdown_for_cmd = Arc::clone(&shutdown);
    thread::spawn(move || {
        // Poll the AtomicBool and forward to cmd channel when set
        loop {
            if shutdown_for_cmd.load(Ordering::SeqCst) {
                let _ = shutdown_tx.send(SupervisorCmd::Shutdown);
                return;
            }
            thread::sleep(Duration::from_millis(200));
        }
    });
    // Phase 1: Preflight — create dirs, write rclone.conf
    info!("Preflight checks...");
    preflight(config)?;
    // Phase 1.5: Probe remote paths in parallel
    info!("Probing remote paths...");
    let healthy_names = probe_all_shares(config, &shared_status, &shutdown)?;
    if healthy_names.is_empty() {
        anyhow::bail!("All shares failed probe — no healthy mounts to start");
    }
    // Build a config containing only healthy shares for protocol configs
    let mut healthy_config = config.clone();
    healthy_config
        .shares
        .retain(|s| healthy_names.contains(&s.name));
    // Phase 1.75: Generate protocol configs with only healthy shares
    write_protocol_configs(&healthy_config)?;
    // Phase 2: Start rclone mounts only for healthy shares
    info!("Starting rclone mounts...");
    let mut mount_children = start_and_wait_mounts(&healthy_config, &shutdown)?;
    for share in &healthy_config.shares {
        info!(" Mount ready at {}", share.mount_point.display());
    }
    // Update status: mounts are ready (match by name, not index)
    {
        let mut status = shared_status.write().unwrap();
        for mc in &mount_children {
            if let Some(ss) = status.shares.iter_mut().find(|s| s.name == mc.name) {
                ss.mounted = true;
                ss.rc_port = mc.rc_port;
            }
        }
    }
    // Phase 3: Start protocol services
    if shutdown.load(Ordering::SeqCst) {
        // Shutdown arrived while mounting: kill and reap the mounts we
        // already spawned, then exit cleanly without starting protocols.
        info!("Shutdown signal received during mount.");
        for mc in &mut mount_children {
            let _ = mc.child.kill();
            let _ = mc.child.wait();
        }
        return Ok(());
    }
    info!("Starting protocol services...");
    let mut protocols = start_protocols(&healthy_config)?;
    // Update status: protocols running
    {
        let mut status = shared_status.write().unwrap();
        status.smbd_running = protocols.smbd.is_some();
        status.webdav_running = protocols.webdav.is_some();
        status.nfs_exported = healthy_config.protocols.enable_nfs;
    }
    // Phase 3.5: Auto-warmup in background thread (non-blocking)
    spawn_warmup(config, &shared_status, &shutdown);
    // Phase 3.6: Dir-refresh background threads (non-blocking)
    spawn_dir_refresh(config, &shared_status, &shutdown);
    // Phase 4: Supervision loop with command channel
    info!("Supervision active. Web UI at http://localhost:8090. Press Ctrl+C to stop.");
    let result = supervise(
        &shared_config,
        &shared_status,
        &cmd_rx,
        &mut mount_children,
        &mut protocols,
        Arc::clone(&shutdown),
        &sse_tx,
    );
    // Phase 5: Teardown (always runs)
    info!("Shutting down...");
    // Re-read the config: a reload during supervision may have changed it.
    let config = shared_config.read().unwrap().clone();
    shutdown_services(&config, &mut mount_children, &mut protocols);
    result
}
/// Spawn a background warmup thread for all configured warmup rules.
///
/// Increments the warmup generation counter so any previous warmup thread
/// will detect the change and exit. Each rule is processed sequentially
/// with progress reported into `shared_status.warmup`.
fn spawn_warmup(
    config: &Config,
    shared_status: &Arc<RwLock<DaemonStatus>>,
    shutdown: &Arc<AtomicBool>,
) {
    if config.warmup.rules.is_empty() || !config.warmup.auto {
        // Clear stale warmup status when rules are removed or auto is disabled
        let mut status = shared_status.write().unwrap();
        status.warmup.clear();
        return;
    }
    // Pre-populate warmup status entries and bump generation
    let generation = {
        let mut status = shared_status.write().unwrap();
        status.warmup_generation += 1;
        status.warmup = config
            .warmup
            .rules
            .iter()
            .map(|rule| WarmupRuleStatus {
                share: rule.share.clone(),
                path: rule.path.clone(),
                newer_than: rule.newer_than.clone(),
                state: WarmupRuleState::Pending,
                total_files: 0,
                skipped: 0,
                cached: 0,
                errors: 0,
            })
            .collect();
        status.warmup_generation
    };
    let warmup_config = config.clone();
    let warmup_status = Arc::clone(shared_status);
    let warmup_shutdown = Arc::clone(shutdown);
    thread::spawn(move || {
        info!("Auto-warmup started (background, generation {generation})...");
        for (i, rule) in warmup_config.warmup.rules.iter().enumerate() {
            if warmup_shutdown.load(Ordering::SeqCst) {
                info!("Auto-warmup interrupted by shutdown.");
                break;
            }
            // Check if our generation is still current — a config reload may
            // have spawned a newer warmup thread that supersedes this one.
            {
                let status = warmup_status.read().unwrap();
                if status.warmup_generation != generation {
                    info!("Auto-warmup superseded by newer generation.");
                    return;
                }
            }
            // Rule failures are logged but do not abort the remaining rules.
            if let Err(e) = crate::cli::warmup::run_tracked(
                &warmup_config,
                &rule.share,
                &rule.path,
                rule.newer_than.as_deref(),
                &warmup_status,
                i,
                generation,
                &warmup_shutdown,
            ) {
                warn!("Warmup warning: {e}");
            }
        }
        info!("Auto-warmup complete.");
    });
}
/// Spawn per-share background threads that periodically call `vfs/refresh` to
/// keep the rclone directory listing cache warm.
///
/// Bumps the dir-refresh generation counter so any previous threads detect
/// that they've been superseded and exit cleanly. Each share whose effective
/// interval is non-zero gets its own `ScheduledTask` thread.
fn spawn_dir_refresh(
    config: &Config,
    shared_status: &Arc<RwLock<DaemonStatus>>,
    shutdown: &Arc<AtomicBool>,
) {
    // Quick check: skip entirely if no share will actually refresh.
    let any_active = config
        .shares
        .iter()
        .any(|s| config.effective_dir_refresh_interval(s).is_some());
    if !any_active {
        return;
    }
    // Bump generation and clone the shared Arc<AtomicU64> for threads.
    let gen_arc: Arc<AtomicU64> = {
        let mut s = shared_status.write().unwrap();
        s.dir_refresh_generation += 1;
        s.dir_refresh_running = true;
        let g = s.dir_refresh_generation;
        s.dir_refresh_gen_arc.store(g, Ordering::SeqCst);
        Arc::clone(&s.dir_refresh_gen_arc)
    };
    let generation = gen_arc.load(Ordering::SeqCst);
    for (i, share) in config.shares.iter().enumerate() {
        let interval = match config.effective_dir_refresh_interval(share) {
            Some(d) => d,
            None => continue,
        };
        // Clone everything the scheduled closure captures by move.
        let share_name = share.name.clone();
        let mount_point = share.mount_point.clone();
        let recursive = config.dir_refresh.recursive;
        let rc_port = config.rc_port(i);
        let status = Arc::clone(shared_status);
        let gen_arc2 = Arc::clone(&gen_arc);
        let sd = Arc::clone(shutdown);
        info!(
            " dir-refresh: scheduling '{}' every {}s",
            share_name,
            interval.as_secs()
        );
        ScheduledTask {
            name: "dir-refresh",
            interval,
        }
        .spawn(generation, gen_arc2, sd, move || {
            // Enumerate top-level subdirectories by reading the FUSE mount point.
            // The VFS root itself is not a valid vfs/refresh target in rclone.
            let dirs: Vec<String> = std::fs::read_dir(&mount_point)
                .with_context(|| format!("dir-refresh: failed to read mount point for '{share_name}'"))?
                .filter_map(|entry| {
                    let entry = entry.ok()?;
                    if entry.file_type().ok()?.is_dir() {
                        entry.file_name().into_string().ok()
                    } else {
                        None
                    }
                })
                .collect();
            if dirs.is_empty() {
                tracing::warn!(share = %share_name, "dir-refresh: no subdirs found, skipping");
                return Ok(());
            }
            // Refresh each subdir individually, tallying successes/failures.
            let mut ok = 0usize;
            let mut failed = 0usize;
            for dir in &dirs {
                match rc::vfs_refresh(rc_port, dir, recursive) {
                    Ok(()) => {
                        tracing::info!(share = %share_name, dir = %dir, "dir-refresh OK");
                        ok += 1;
                    }
                    Err(e) => {
                        tracing::warn!(share = %share_name, dir = %dir, error = %e, "dir-refresh failed");
                        failed += 1;
                    }
                }
            }
            tracing::info!(
                share = %share_name,
                dirs_ok = ok,
                dirs_failed = failed,
                "dir-refresh cycle complete"
            );
            // Record cycle results for the status API / web UI.
            let mut s = status.write().unwrap();
            s.last_dir_refresh.insert(share_name.clone(), SystemTime::now());
            s.dir_refresh_dirs_ok.insert(share_name.clone(), ok);
            s.dir_refresh_dirs_failed.insert(share_name.clone(), failed);
            Ok(())
        });
    }
}
/// Write rclone config and create directories (protocol configs generated after probe).
fn preflight(config: &Config) -> Result<()> {
// Ensure mount points exist for each share
for share in &config.shares {
std::fs::create_dir_all(&share.mount_point).with_context(|| {
format!(
"Failed to create mount point: {}",
share.mount_point.display()
)
})?;
}
// Ensure cache directory exists
std::fs::create_dir_all(&config.cache.dir).with_context(|| {
format!(
"Failed to create cache dir: {}",
config.cache.dir.display()
)
})?;
// Generate rclone config
crate::rclone::config::write_config(config)?;
Ok(())
}
/// Generate protocol configs (SMB/NFS) for the given config.
///
/// Called after probing so only healthy shares are included.
fn write_protocol_configs(config: &Config) -> Result<()> {
    // SMB: write smb.conf; provision the Samba user only when auth is on.
    match (config.protocols.enable_smb, config.smb_auth.enabled) {
        (true, true) => {
            samba::write_config(config)?;
            samba::setup_user(config)?;
        }
        (true, false) => samba::write_config(config)?,
        (false, _) => {}
    }
    // NFS: exports file only; the kernel server picks it up via exportfs.
    if config.protocols.enable_nfs {
        nfs::write_config(config)?;
    }
    Ok(())
}
/// Probe all shares in parallel and return the set of healthy share names.
///
/// Updates `shared_status` with probe results as they complete.
fn probe_all_shares(
    config: &Config,
    shared_status: &Arc<RwLock<DaemonStatus>>,
    shutdown: &AtomicBool,
) -> Result<Vec<String>> {
    use std::collections::HashSet;
    let shares: Vec<_> = config.shares.clone();
    let config_clone = config.clone();
    // Mark all shares as Probing
    {
        let mut status = shared_status.write().unwrap();
        for ss in &mut status.shares {
            ss.health = ShareHealth::Probing;
        }
    }
    // Spawn one thread per share; each thread gets its own config clone
    // because probe_remote_path needs the full Config.
    let handles: Vec<_> = shares
        .into_iter()
        .map(|share| {
            let cfg = config_clone.clone();
            let name = share.name.clone();
            thread::spawn(move || {
                let result = crate::rclone::probe::probe_remote_path(&cfg, &share);
                (name, result)
            })
        })
        .collect();
    // Collect results. Joins happen in spawn order, not completion order;
    // the shutdown check between joins bails early without waiting for the
    // remaining probes (their threads are simply detached).
    let mut healthy = HashSet::new();
    for handle in handles {
        if shutdown.load(Ordering::SeqCst) {
            anyhow::bail!("Interrupted during probe");
        }
        match handle.join() {
            Ok((name, Ok(()))) => {
                info!(" Probe OK: {name}");
                let mut status = shared_status.write().unwrap();
                if let Some(ss) = status.shares.iter_mut().find(|s| s.name == name) {
                    ss.health = ShareHealth::Healthy;
                }
                healthy.insert(name);
            }
            Ok((name, Err(e))) => {
                // Probe failure marks the share Failed but does not abort the
                // run — the caller simply excludes it from mounts/exports.
                let msg = format!("{e}");
                error!(" Probe FAILED: {name} — {msg}");
                let mut status = shared_status.write().unwrap();
                if let Some(ss) = status.shares.iter_mut().find(|s| s.name == name) {
                    ss.health = ShareHealth::Failed(msg);
                }
            }
            Err(_) => {
                error!(" Probe thread panicked");
            }
        }
    }
    Ok(healthy.into_iter().collect())
}
/// Spawn rclone mount processes for all shares and poll until each FUSE mount appears.
///
/// On any failure (shutdown, timeout, early rclone exit) all already-spawned
/// children are killed and reaped before bailing.
fn start_and_wait_mounts(config: &Config, shutdown: &AtomicBool) -> Result<Vec<MountChild>> {
    let mut children = Vec::new();
    // Spawn all mounts first, then wait for readiness collectively.
    for (i, share) in config.shares.iter().enumerate() {
        let rc_port = config.rc_port(i);
        let args = build_mount_args(config, share, rc_port);
        let child = Command::new("rclone")
            .args(&args)
            .process_group(0)
            .spawn()
            .with_context(|| format!("Failed to spawn rclone mount for share '{}'", share.name))?;
        children.push(MountChild {
            name: share.name.clone(),
            child,
            rc_port,
        });
    }
    // Poll for all mounts to become ready
    let deadline = Instant::now() + MOUNT_TIMEOUT;
    let mut ready = vec![false; config.shares.len()];
    loop {
        if shutdown.load(Ordering::SeqCst) {
            for mc in &mut children {
                let _ = mc.child.kill();
                let _ = mc.child.wait();
            }
            anyhow::bail!("Interrupted while waiting for mounts");
        }
        if Instant::now() > deadline {
            for mc in &mut children {
                let _ = mc.child.kill();
                let _ = mc.child.wait();
            }
            // Name the shares that never became ready for the error message.
            let pending: Vec<&str> = config.shares.iter()
                .zip(ready.iter())
                .filter(|(_, r)| !**r)
                .map(|(s, _)| s.name.as_str())
                .collect();
            anyhow::bail!(
                "Timed out waiting for mounts ({}s). Still pending: {}",
                MOUNT_TIMEOUT.as_secs(),
                pending.join(", ")
            );
        }
        // Check for early exits — a dead rclone will never produce a mount.
        for (i, mc) in children.iter_mut().enumerate() {
            if ready[i] {
                continue;
            }
            match mc.child.try_wait() {
                Ok(Some(status)) => {
                    anyhow::bail!(
                        "rclone mount for '{}' exited immediately ({status}). Check remote/auth config.",
                        mc.name
                    );
                }
                Ok(None) => {}
                Err(e) => {
                    anyhow::bail!("Failed to check rclone mount status for '{}': {e}", mc.name);
                }
            }
        }
        // Check mount readiness
        let mut all_ready = true;
        for (i, share) in config.shares.iter().enumerate() {
            if ready[i] {
                continue;
            }
            match is_mounted(&share.mount_point) {
                Ok(true) => ready[i] = true,
                Ok(false) => all_ready = false,
                Err(e) => {
                    warn!("Warning: mount check failed for '{}': {e}", share.name);
                    all_ready = false;
                }
            }
        }
        if all_ready {
            break;
        }
        thread::sleep(Duration::from_millis(500));
    }
    Ok(children)
}
/// Spawn smbd as a foreground child process.
fn spawn_smbd() -> Result<Child> {
Command::new("smbd")
.args(["--foreground", "--debug-stdout", "--no-process-group",
"--configfile", samba::SMB_CONF_PATH])
.process_group(0)
.spawn()
.context("Failed to spawn smbd")
}
/// Start protocol services after the mount is ready.
///
/// Children are stored in the `ProtocolChildren` struct *as they are
/// spawned*, so if a later step fails (e.g. `exportfs` or the WebDAV
/// spawn) the struct's `Drop` impl kills the already-started services
/// instead of leaking them. The original built the struct only at the end,
/// which orphaned smbd on a mid-startup failure despite `ProtocolChildren`
/// existing precisely to prevent that.
fn start_protocols(config: &Config) -> Result<ProtocolChildren> {
    let mut children = ProtocolChildren {
        smbd: None,
        webdav: None,
    };
    // SMB daemon (optional).
    if config.protocols.enable_smb {
        children.smbd = Some(spawn_smbd()?);
        info!(" SMB: started");
    }
    // NFS: kernel-space server — just (re)apply the exports file.
    if config.protocols.enable_nfs {
        let status = Command::new("exportfs")
            .arg("-ra")
            .status()
            .context("Failed to run exportfs -ra")?;
        if !status.success() {
            // Bailing here drops `children`, killing smbd if it was started.
            anyhow::bail!("exportfs -ra failed: {status}");
        }
        info!(" NFS: exported");
    }
    // WebDAV server (optional).
    if config.protocols.enable_webdav {
        children.webdav = Some(spawn_webdav(config)?);
        info!(" WebDAV: started");
    }
    Ok(children)
}
/// Spawn a `rclone serve webdav` child process.
fn spawn_webdav(config: &Config) -> Result<Child> {
let args = webdav::build_serve_args(config);
Command::new("rclone")
.args(&args)
.process_group(0)
.spawn()
.context("Failed to spawn rclone serve webdav")
}
/// Main supervision loop with command channel.
///
/// Uses `recv_timeout` on the command channel so it can both respond to
/// commands from the web UI and poll child processes every POLL_INTERVAL.
///
/// - If any rclone mount dies → full shutdown (data safety).
/// - If smbd/WebDAV dies → restart up to 3 times.
fn supervise(
    shared_config: &Arc<RwLock<Config>>,
    shared_status: &Arc<RwLock<DaemonStatus>>,
    cmd_rx: &mpsc::Receiver<SupervisorCmd>,
    mounts: &mut Vec<MountChild>,
    protocols: &mut ProtocolChildren,
    shutdown: Arc<AtomicBool>,
    sse_tx: &tokio::sync::broadcast::Sender<()>,
) -> Result<()> {
    let mut smbd_tracker = RestartTracker::new();
    let mut webdav_tracker = RestartTracker::new();
    // Previous-cycle per-share readings, for change detection in
    // log_cache_events (defined later in this file).
    let mut prev_states: HashMap<String, SharePrevState> = HashMap::new();
    let mut last_stats_snapshot = Instant::now();
    loop {
        // Check for commands (non-blocking with timeout = POLL_INTERVAL)
        match cmd_rx.recv_timeout(POLL_INTERVAL) {
            Ok(SupervisorCmd::Shutdown) => {
                info!("Shutdown command received.");
                return Ok(());
            }
            Ok(SupervisorCmd::BwLimit { up, down }) => {
                info!(bw_limit_up = %up, bw_limit_down = %down, "bandwidth limit applied");
                apply_bwlimit(mounts, &up, &down);
            }
            Ok(SupervisorCmd::Reload(new_config)) => {
                info!("Config reload requested...");
                handle_reload(
                    shared_config,
                    shared_status,
                    mounts,
                    protocols,
                    &mut smbd_tracker,
                    &mut webdav_tracker,
                    new_config,
                    &shutdown,
                )?;
                info!("Config reload complete.");
            }
            Err(RecvTimeoutError::Timeout) => {} // normal poll cycle
            Err(RecvTimeoutError::Disconnected) => {
                info!("Command channel disconnected, shutting down.");
                return Ok(());
            }
        }
        // Check for shutdown signal
        if shutdown.load(Ordering::SeqCst) {
            info!("Shutdown signal received.");
            return Ok(());
        }
        // Check all rclone mount processes — any mount death aborts the whole
        // supervisor (data safety: protocols must not serve a dead mount).
        for mc in mounts.iter_mut() {
            match mc.child.try_wait() {
                Ok(Some(status)) => {
                    anyhow::bail!(
                        "rclone mount for '{}' exited unexpectedly ({}). Initiating full shutdown for data safety.",
                        mc.name,
                        status
                    );
                }
                Ok(None) => {}
                Err(e) => {
                    anyhow::bail!("Failed to check rclone mount status for '{}': {e}", mc.name);
                }
            }
        }
        // Check smbd process (if enabled)
        if let Some(child) = &mut protocols.smbd {
            match child.try_wait() {
                Ok(Some(status)) => {
                    warn!("smbd exited ({status}).");
                    if smbd_tracker.can_restart() {
                        smbd_tracker.record_restart();
                        // Linear backoff: 2s, 4s, 6s.
                        // NOTE(review): this sleep blocks the whole loop,
                        // including command handling — confirm acceptable.
                        let delay = smbd_tracker.count * 2;
                        warn!(
                            "Restarting smbd in {delay}s ({}/{MAX_RESTARTS})...",
                            smbd_tracker.count,
                        );
                        thread::sleep(Duration::from_secs(delay.into()));
                        match spawn_smbd() {
                            Ok(new_child) => *child = new_child,
                            Err(e) => {
                                error!("Failed to restart smbd: {e}");
                                protocols.smbd = None;
                            }
                        }
                    } else {
                        error!(
                            "smbd exceeded max restarts ({MAX_RESTARTS}), giving up."
                        );
                        protocols.smbd = None;
                    }
                }
                Ok(None) => {}
                Err(e) => warn!("Warning: failed to check smbd status: {e}"),
            }
        }
        // Check WebDAV process (if enabled)
        let config = shared_config.read().unwrap().clone();
        if let Some(child) = &mut protocols.webdav {
            match child.try_wait() {
                Ok(Some(status)) => {
                    warn!("WebDAV exited ({status}).");
                    if webdav_tracker.can_restart() {
                        webdav_tracker.record_restart();
                        let delay = webdav_tracker.count * 2;
                        warn!(
                            "Restarting WebDAV in {delay}s ({}/{MAX_RESTARTS})...",
                            webdav_tracker.count,
                        );
                        thread::sleep(Duration::from_secs(delay.into()));
                        match spawn_webdav(&config) {
                            Ok(new_child) => *child = new_child,
                            Err(e) => {
                                error!("Failed to restart WebDAV: {e}");
                                protocols.webdav = None;
                            }
                        }
                    } else {
                        error!(
                            "WebDAV exceeded max restarts ({MAX_RESTARTS}), giving up."
                        );
                        protocols.webdav = None;
                    }
                }
                Ok(None) => {}
                Err(e) => warn!("Warning: failed to check WebDAV status: {e}"),
            }
        }
        // Update shared status with fresh RC stats
        update_status(shared_status, mounts, protocols, &config);
        // Log cache state changes and periodic snapshots
        log_cache_events(shared_status, &config, &mut prev_states, &mut last_stats_snapshot);
        // Notify SSE subscribers that status was refreshed
        let _ = sse_tx.send(());
    }
}
/// Poll RC API for each share and update the shared DaemonStatus.
///
/// Matches mounts to status entries by name (not index) so the mapping
/// stays correct after dynamic PerShare add/remove/modify reloads.
///
/// Uses a two-phase approach to avoid holding the write lock during HTTP IO:
/// Phase 1 collects all data without any lock; Phase 2 applies it under a
/// short-lived write lock (pure memory writes, no IO).
fn update_status(
    shared_status: &Arc<RwLock<DaemonStatus>>,
    mounts: &[MountChild],
    protocols: &ProtocolChildren,
    config: &Config,
) {
    // Plain data carrier between the two phases.
    struct ShareSnapshot {
        name: String,
        rc_port: u16,
        mounted: bool,
        cache_bytes: u64,
        dirty_count: u64,
        errored_files: u64,
        speed: f64,
        transfers: u64,
        errors: u64,
    }
    // Phase 1: collect all data WITHOUT holding any lock.
    let snapshots: Vec<ShareSnapshot> = mounts
        .iter()
        .map(|mc| {
            let mount_point = config
                .shares
                .iter()
                .find(|s| s.name == mc.name)
                .map(|s| s.mount_point.clone())
                .unwrap_or_default();
            let mounted = is_mounted(&mount_point).unwrap_or(false);
            // VFS stats: any RC/parse failure degrades to zeros rather than
            // propagating an error. dirty_count = in-progress + queued uploads.
            let (cache_bytes, dirty_count, errored_files) = rc::vfs_stats(mc.rc_port)
                .ok()
                .and_then(|v| v.disk_cache)
                .map(|dc| (dc.bytes_used, dc.uploads_in_progress + dc.uploads_queued, dc.errored_files))
                .unwrap_or((0, 0, 0));
            // Core stats: report speed only while transfers are active, so an
            // idle share shows 0.0 instead of a stale rolling average.
            let (speed, transfers, errors) = rc::core_stats(mc.rc_port)
                .map(|core| {
                    let active = core.transferring.len() as u64;
                    (if active > 0 { core.speed } else { 0.0 }, active, core.errors)
                })
                .unwrap_or((0.0, 0, 0));
            ShareSnapshot {
                name: mc.name.clone(),
                rc_port: mc.rc_port,
                mounted,
                cache_bytes,
                dirty_count,
                errored_files,
                speed,
                transfers,
                errors,
            }
        })
        .collect();
    // Phase 2: apply collected data under write lock — no IO here.
    let mut status = shared_status.write().unwrap();
    for snap in snapshots {
        if let Some(ss) = status.shares.iter_mut().find(|s| s.name == snap.name) {
            ss.mounted = snap.mounted;
            ss.rc_port = snap.rc_port;
            ss.cache_bytes = snap.cache_bytes;
            ss.dirty_count = snap.dirty_count;
            ss.errored_files = snap.errored_files;
            ss.speed = snap.speed;
            ss.transfers = snap.transfers;
            ss.errors = snap.errors;
        }
    }
    status.smbd_running = protocols.smbd.is_some();
    status.webdav_running = protocols.webdav.is_some();
    status.nfs_exported = config.protocols.enable_nfs;
}
/// Apply bandwidth limits to all rclone mounts via RC API (Tier A — no restart).
///
/// Each mount is updated independently; a failure on one share is logged
/// and does not prevent the others from being updated.
fn apply_bwlimit(mounts: &[MountChild], up: &str, down: &str) {
    for mc in mounts {
        let result = rc::bwlimit(mc.rc_port, Some(up), Some(down));
        if let Err(e) = result {
            warn!(" bwlimit failed for '{}': {e}", mc.name);
        } else {
            info!(" bwlimit applied to '{}'", mc.name);
        }
    }
}
/// Handle a config reload using the tiered change strategy.
///
/// Tiers (from config_diff): Live (RC API only) → Protocol (restart protocol
/// services) → PerShare (drain/remount affected shares) → Global (full
/// drain + restart of everything). After the tiered work, shared config and
/// status are rewritten, and warmup/dir-refresh are re-triggered if their
/// settings changed.
fn handle_reload(
    shared_config: &Arc<RwLock<Config>>,
    shared_status: &Arc<RwLock<DaemonStatus>>,
    mounts: &mut Vec<MountChild>,
    protocols: &mut ProtocolChildren,
    smbd_tracker: &mut RestartTracker,
    webdav_tracker: &mut RestartTracker,
    new_config: Config,
    shutdown: &Arc<AtomicBool>,
) -> Result<()> {
    let old_config = shared_config.read().unwrap().clone();
    let diff = config_diff::diff(&old_config, &new_config);
    if diff.is_empty() {
        info!(" No changes detected.");
        return Ok(());
    }
    info!(" Changes: {}", diff.summary());
    match diff.highest_tier() {
        ChangeTier::None => {}
        ChangeTier::Live => {
            // Tier A: bandwidth only — RC API call, no restart
            info!(" Tier A: applying bandwidth limits via RC API...");
            apply_bwlimit(mounts, &new_config.bandwidth.limit_up, &new_config.bandwidth.limit_down);
        }
        ChangeTier::Protocol => {
            // Tier B: protocol-only changes — regen configs, restart protocol services
            // Also apply bandwidth if changed
            if diff.bandwidth_changed {
                apply_bwlimit(mounts, &new_config.bandwidth.limit_up, &new_config.bandwidth.limit_down);
            }
            info!(" Tier B: restarting protocol services...");
            restart_protocols(protocols, smbd_tracker, webdav_tracker, &new_config)?;
        }
        ChangeTier::PerShare => {
            // Tier C: per-share changes — drain affected, unmount, remount
            if diff.bandwidth_changed {
                apply_bwlimit(mounts, &new_config.bandwidth.limit_up, &new_config.bandwidth.limit_down);
            }
            // Regenerate rclone.conf if connections changed
            if !diff.connections_added.is_empty()
                || !diff.connections_removed.is_empty()
                || !diff.connections_modified.is_empty()
            {
                info!(" Regenerating rclone.conf (connections changed)...");
                crate::rclone::config::write_config(&new_config)?;
            }
            // Handle removed shares: drain → unmount → kill
            for name in &diff.shares_removed {
                info!(" Removing share '{name}'...");
                if let Some(idx) = mounts.iter().position(|mc| mc.name == *name) {
                    let mc = &mounts[idx];
                    wait_writeback_drain(mc.rc_port);
                    unmount_share(&old_config, &mc.name);
                    let mut mc = mounts.remove(idx);
                    graceful_kill(&mut mc.child);
                }
            }
            // Handle modified shares: treat as remove + add
            for name in &diff.shares_modified {
                info!(" Restarting modified share '{name}'...");
                // Remove old
                if let Some(idx) = mounts.iter().position(|mc| mc.name == *name) {
                    let mc = &mounts[idx];
                    wait_writeback_drain(mc.rc_port);
                    unmount_share(&old_config, &mc.name);
                    let mut mc = mounts.remove(idx);
                    graceful_kill(&mut mc.child);
                }
                // Add new — spawn_mount failure is tolerated here (the share
                // simply ends up not mounted; status reflects it below).
                if let Some((i, share)) = new_config.shares.iter().enumerate().find(|(_, s)| s.name == *name) {
                    let rc_port = new_config.rc_port(i);
                    if let Ok(mc) = spawn_mount(&new_config, share, rc_port) {
                        mounts.push(mc);
                    }
                }
            }
            // Handle added shares: spawn new mount
            for name in &diff.shares_added {
                info!(" Adding share '{name}'...");
                if let Some((i, share)) = new_config.shares.iter().enumerate().find(|(_, s)| s.name == *name) {
                    let rc_port = new_config.rc_port(i);
                    std::fs::create_dir_all(&share.mount_point).ok();
                    if let Ok(mc) = spawn_mount(&new_config, share, rc_port) {
                        mounts.push(mc);
                    }
                }
            }
            // Update protocol configs to reflect share changes
            if diff.protocols_changed {
                // Protocol settings changed too — full restart needed
                restart_protocols(protocols, smbd_tracker, webdav_tracker, &new_config)?;
            } else if !diff.shares_removed.is_empty() || !diff.shares_added.is_empty() || !diff.shares_modified.is_empty() {
                // Only shares changed — live reload is sufficient
                reload_protocol_configs(protocols, &new_config)?;
            }
        }
        ChangeTier::Global => {
            // Tier D: global restart — drain all → stop everything → restart
            info!(" Tier D: full restart (global settings changed)...");
            // Drain all write-back queues
            for mc in mounts.iter() {
                wait_writeback_drain(mc.rc_port);
            }
            // Stop all protocol services
            stop_protocols(protocols, &old_config);
            // Unmount and kill all rclone instances
            for mc in mounts.iter_mut() {
                unmount_share(&old_config, &mc.name);
                graceful_kill(&mut mc.child);
            }
            mounts.clear();
            // Re-preflight with new config
            preflight(&new_config)?;
            // Re-probe all shares.
            // NOTE(review): a fresh AtomicBool means this probe/mount phase
            // ignores the outer shutdown signal — confirm intentional.
            let shutdown_flag = AtomicBool::new(false);
            let healthy_names =
                probe_all_shares(&new_config, shared_status, &shutdown_flag)?;
            // Build healthy-only config for mounts and protocols
            let mut healthy_config = new_config.clone();
            healthy_config
                .shares
                .retain(|s| healthy_names.contains(&s.name));
            write_protocol_configs(&healthy_config)?;
            // Re-start mounts (healthy only)
            let mut new_mounts = start_and_wait_mounts(&healthy_config, &shutdown_flag)?;
            mounts.append(&mut new_mounts);
            // Re-start protocols
            let new_protocols = start_protocols(&healthy_config)?;
            // Replace old protocol children (Drop will handle any leftover)
            *protocols = new_protocols;
            *smbd_tracker = RestartTracker::new();
            *webdav_tracker = RestartTracker::new();
        }
    }
    // Update shared config
    {
        let mut cfg = shared_config.write().unwrap();
        *cfg = new_config.clone();
    }
    // Update shared status with new share list
    {
        let mut status = shared_status.write().unwrap();
        let new_shares: Vec<crate::daemon::ShareStatus> = new_config
            .shares
            .iter()
            .enumerate()
            .map(|(i, s)| {
                // Preserve existing stats if share still exists
                let existing = status.shares.iter().find(|ss| ss.name == s.name);
                crate::daemon::ShareStatus {
                    name: s.name.clone(),
                    mounted: existing.map(|e| e.mounted).unwrap_or(false),
                    rc_port: new_config.rc_port(i),
                    cache_bytes: existing.map(|e| e.cache_bytes).unwrap_or(0),
                    dirty_count: existing.map(|e| e.dirty_count).unwrap_or(0),
                    errored_files: existing.map(|e| e.errored_files).unwrap_or(0),
                    speed: existing.map(|e| e.speed).unwrap_or(0.0),
                    transfers: existing.map(|e| e.transfers).unwrap_or(0),
                    errors: existing.map(|e| e.errors).unwrap_or(0),
                    health: existing
                        .map(|e| e.health.clone())
                        .unwrap_or_else(|| {
                            // New share: if mount succeeded, it's healthy
                            if mounts.iter().any(|mc| mc.name == s.name) {
                                ShareHealth::Healthy
                            } else {
                                ShareHealth::Pending
                            }
                        }),
                }
            })
            .collect();
        status.shares = new_shares;
        status.smbd_running = protocols.smbd.is_some();
        status.webdav_running = protocols.webdav.is_some();
        status.nfs_exported = new_config.protocols.enable_nfs;
    }
    // Re-trigger warmup if settings changed
    if diff.warmup_changed {
        info!(" Warmup settings changed, re-triggering...");
        spawn_warmup(&new_config, shared_status, shutdown);
    }
    // Re-trigger dir-refresh if settings changed
    if diff.dir_refresh_changed {
        info!(" Dir-refresh settings changed, re-triggering...");
        spawn_dir_refresh(&new_config, shared_status, shutdown);
    }
    Ok(())
}
/// Spawn a single rclone mount for a share and wait until it is ready.
///
/// Used by the per-share reload paths. Two fixes over the original:
/// detects an immediately-exiting rclone via `try_wait()` instead of
/// silently waiting out the full `MOUNT_TIMEOUT` (mirrors
/// `start_and_wait_mounts`), and kills + reaps the child on every failure
/// path so no unmanaged/zombie process is left behind.
fn spawn_mount(config: &Config, share: &crate::config::ShareConfig, rc_port: u16) -> Result<MountChild> {
    let args = build_mount_args(config, share, rc_port);
    let mut child = Command::new("rclone")
        .args(&args)
        .process_group(0)
        .spawn()
        .with_context(|| format!("Failed to spawn rclone mount for share '{}'", share.name))?;
    // Wait for mount to appear
    let deadline = Instant::now() + MOUNT_TIMEOUT;
    loop {
        // Early-exit check: a dead rclone will never produce a mount.
        match child.try_wait() {
            Ok(Some(status)) => {
                anyhow::bail!(
                    "rclone mount for '{}' exited immediately ({status}). Check remote/auth config.",
                    share.name
                );
            }
            Ok(None) => {}
            Err(e) => {
                let _ = child.kill();
                let _ = child.wait();
                anyhow::bail!("Failed to check rclone mount status for '{}': {e}", share.name);
            }
        }
        if Instant::now() > deadline {
            // Reap the never-ready child before bailing.
            let _ = child.kill();
            let _ = child.wait();
            anyhow::bail!("Timed out waiting for mount '{}'", share.name);
        }
        match is_mounted(&share.mount_point) {
            Ok(true) => break,
            _ => thread::sleep(Duration::from_millis(500)),
        }
    }
    info!(" Mount ready: {} at {}", share.name, share.mount_point.display());
    Ok(MountChild {
        name: share.name.clone(),
        child,
        rc_port,
    })
}
/// Unmount a single share's FUSE mount.
///
/// Looks the share up by name; does nothing if the share is unknown or the
/// mount point is not currently mounted. Performs a lazy unmount via
/// `fusermount3 -uz`, falling back to legacy `fusermount` when that fails.
/// All failures are swallowed — teardown is best-effort.
fn unmount_share(config: &Config, share_name: &str) {
    let Some(share) = config.find_share(share_name) else {
        return;
    };
    if !is_mounted(&share.mount_point).unwrap_or(false) {
        return;
    }
    let target = share.mount_point.display().to_string();
    let ok = Command::new("fusermount3")
        .args(["-uz", &target])
        .status()
        .map(|st| st.success())
        .unwrap_or(false);
    if !ok {
        // Older systems ship the FUSE 2 binary under the unsuffixed name.
        let _ = Command::new("fusermount").args(["-uz", &target]).status();
    }
}
/// Stop protocol services only (without touching mounts).
///
/// Kills smbd and the WebDAV server (if running) and unexports all NFS
/// exports when NFS is enabled. Child handles are taken out of `protocols`
/// so the struct never retains a dead process.
fn stop_protocols(protocols: &mut ProtocolChildren, config: &Config) {
    if let Some(mut smbd) = protocols.smbd.take() {
        graceful_kill(&mut smbd);
        info!(" SMB: stopped");
    }
    if config.protocols.enable_nfs {
        // `exportfs -ua` unexports everything; best-effort.
        let _ = Command::new("exportfs").arg("-ua").status();
        info!(" NFS: unexported");
    }
    if let Some(mut dav) = protocols.webdav.take() {
        graceful_kill(&mut dav);
        info!(" WebDAV: stopped");
    }
}
/// Reload protocol configs without full restart (share add/remove/modify).
///
/// Regenerates smb.conf / the NFS exports file, then tells the running
/// services to pick up the new contents:
/// - smbd: SIGHUP triggers an smb.conf reload (added shares become visible,
///   removed shares disappear for new connections).
/// - NFS: `exportfs -ra` re-reads the exports file.
/// - WebDAV: nothing to do — it serves straight from the FUSE mount.
fn reload_protocol_configs(protocols: &ProtocolChildren, config: &Config) -> Result<()> {
    if config.protocols.enable_smb {
        samba::write_config(config)?;
        if let Some(smbd) = protocols.smbd.as_ref() {
            // SAFETY: sending SIGHUP to a known child PID is safe.
            unsafe { libc::kill(smbd.id() as i32, libc::SIGHUP) };
            info!(" SMB: config reloaded (SIGHUP)");
        }
    }
    if config.protocols.enable_nfs {
        nfs::write_config(config)?;
        let _ = Command::new("exportfs").arg("-ra").status();
        info!(" NFS: re-exported");
    }
    Ok(())
}
/// Restart protocol services (Tier B). Regen configs and restart smbd/NFS/WebDAV.
///
/// Tears down the running services, rewrites their configuration files, and
/// launches fresh children. Both restart trackers are reset because the new
/// children start with a clean crash-loop history.
fn restart_protocols(
    protocols: &mut ProtocolChildren,
    smbd_tracker: &mut RestartTracker,
    webdav_tracker: &mut RestartTracker,
    config: &Config,
) -> Result<()> {
    // Tear down whatever is currently running.
    stop_protocols(protocols, config);
    // Write fresh service configs before bringing anything back up.
    if config.protocols.enable_smb {
        samba::write_config(config)?;
        if config.smb_auth.enabled {
            samba::setup_user(config)?;
        }
    }
    if config.protocols.enable_nfs {
        nfs::write_config(config)?;
    }
    // Relaunch and reset crash-loop accounting for the new children.
    *protocols = start_protocols(config)?;
    *smbd_tracker = RestartTracker::new();
    *webdav_tracker = RestartTracker::new();
    Ok(())
}
/// Send SIGTERM to the entire process group, wait up to `SIGTERM_GRACE`,
/// then SIGKILL if still alive.
///
/// All children are spawned with `.process_group(0)` so the child PID equals
/// the process group ID. Using `-pid` ensures forked workers (e.g. smbd
/// per-client forks) are also terminated — otherwise orphaned workers hold
/// the listening socket and prevent the new process from binding.
fn graceful_kill(child: &mut Child) {
    let pgid = child.id() as i32;
    // SAFETY: signalling a process group we created ourselves is safe.
    unsafe { libc::kill(-pgid, libc::SIGTERM) };
    let give_up_at = Instant::now() + SIGTERM_GRACE;
    loop {
        match child.try_wait() {
            // Reaped within the grace period — nothing more to do.
            Ok(Some(_)) => return,
            // Still running; keep polling until the deadline.
            Ok(None) => {}
            // try_wait failed; fall through to the hard kill.
            Err(_) => break,
        }
        if Instant::now() > give_up_at {
            break;
        }
        thread::sleep(Duration::from_millis(100));
    }
    // Grace period expired (or wait errored): SIGKILL the whole group
    // and reap the direct child.
    // SAFETY: same process-group argument as above.
    unsafe { libc::kill(-pgid, libc::SIGKILL) };
    let _ = child.wait();
}
/// Wait for rclone VFS write-back queue to drain on a specific RC port.
///
/// Polls `vfs/stats` until no uploads are in progress or queued, logging
/// progress along the way. Gives up (with a warning) after
/// `WRITEBACK_DRAIN_TIMEOUT`. Any RC error or absent disk-cache section ends
/// the wait silently — draining is best-effort.
fn wait_writeback_drain(port: u16) {
    let give_up_at = Instant::now() + WRITEBACK_DRAIN_TIMEOUT;
    // Tracks whether we already announced that we are waiting, so the
    // "drained" message only appears when something was actually pending.
    let mut announced = false;
    loop {
        // RC unreachable (mount already gone, port closed): stop waiting.
        let Ok(vfs) = rc::vfs_stats(port) else { return };
        // No disk cache section means nothing can be pending.
        let Some(dc) = vfs.disk_cache.as_ref() else { return };
        let pending = dc.uploads_in_progress + dc.uploads_queued;
        if pending == 0 {
            if announced {
                info!(" Write-back queue drained.");
            }
            return;
        }
        if announced {
            info!(" Write-back: {pending} files remaining...");
        } else {
            info!(
                " Waiting for write-back queue ({pending} files pending)..."
            );
            announced = true;
        }
        if Instant::now() > give_up_at {
            warn!(
                " Warning: write-back drain timed out after {}s, proceeding.",
                WRITEBACK_DRAIN_TIMEOUT.as_secs()
            );
            return;
        }
        thread::sleep(WRITEBACK_POLL_INTERVAL);
    }
}
/// Reverse-order teardown of all services.
fn shutdown_services(config: &Config, mounts: &mut Vec<MountChild>, protocols: &mut ProtocolChildren) {
// Stop protocol services
stop_protocols(protocols, config);
// Wait for write-back queues to drain on each mount
for mc in mounts.iter() {
wait_writeback_drain(mc.rc_port);
}
// Lazy unmount each share's FUSE mount
for share in &config.shares {
if is_mounted(&share.mount_point).unwrap_or(false) {
let mp = share.mount_point.display().to_string();
let unmounted = Command::new("fusermount3")
.args(["-uz", &mp])
.status()
.map(|s| s.success())
.unwrap_or(false);
if !unmounted {
let _ = Command::new("fusermount")
.args(["-uz", &mp])
.status();
}
}
}
info!(" FUSE: unmounted");
// Gracefully stop all rclone mount processes
for mc in mounts.iter_mut() {
graceful_kill(&mut mc.child);
}
info!(" rclone: stopped");
}
/// Detect cache state changes and emit structured log events; also emit
/// periodic stats snapshots. Called after every `update_status()` cycle.
///
/// Events emitted per mounted share:
/// - `dirty_count` changes (info)
/// - `errored_files` increases (warn) / decreases (info)
/// - cache size swings of more than 10%, plus first population (info)
/// - transfer active <-> idle transitions, gated on `SPEED_ACTIVE_THRESHOLD`
/// - cache fill crossing `CACHE_WARN_THRESHOLD` / `CACHE_CRITICAL_THRESHOLD`
///   (warn, also re-emitted on each snapshot while still over threshold)
/// - a full "stats snapshot" (info) every `STATS_SNAPSHOT_INTERVAL`
///
/// `prev_states` carries the last observed values per share between calls so
/// transitions can be detected; a share seen for the first time establishes a
/// baseline and emits no change events on that cycle. Entries for shares that
/// no longer exist (removed via config reload) are pruned.
fn log_cache_events(
    shared_status: &Arc<RwLock<DaemonStatus>>,
    config: &Config,
    prev_states: &mut HashMap<String, SharePrevState>,
    last_snapshot: &mut Instant,
) {
    let status = shared_status.read().unwrap();
    // Treat a zero (or unparseable) configured max as "no limit" — avoids a
    // divide-by-zero fraction (inf) that would falsely flag the cache as
    // critically full.
    let max_bytes = parse_size_bytes(&config.cache.max_size).filter(|&m| m > 0);
    let emit_snapshot = last_snapshot.elapsed() >= STATS_SNAPSHOT_INTERVAL;
    // Drop state for shares that were removed by a config reload so the map
    // cannot grow without bound over the daemon's lifetime.
    prev_states.retain(|name, _| status.shares.iter().any(|ss| &ss.name == name));
    for ss in &status.shares {
        if !ss.mounted {
            continue;
        }
        let now_active = ss.speed > SPEED_ACTIVE_THRESHOLD;
        // 0 = ok, 1 = >=80% full, 2 = >=95% full (only meaningful with a max).
        let now_warn_level = if let Some(max) = max_bytes {
            let frac = ss.cache_bytes as f64 / max as f64;
            if frac >= CACHE_CRITICAL_THRESHOLD {
                2
            } else if frac >= CACHE_WARN_THRESHOLD {
                1
            } else {
                0
            }
        } else {
            0
        };
        // First sighting seeds the baseline with current values, so the
        // comparisons below are no-ops on that cycle.
        let prev = prev_states.entry(ss.name.clone()).or_insert(SharePrevState {
            dirty_count: ss.dirty_count,
            errored_files: ss.errored_files,
            cache_bytes: ss.cache_bytes,
            is_active: now_active,
            cache_warn_level: now_warn_level,
        });
        // dirty_count change
        if ss.dirty_count != prev.dirty_count {
            info!(share = %ss.name, dirty_count = ss.dirty_count,
                  prev = prev.dirty_count, "cache dirty_count changed");
        }
        // errored_files change: increases are warnings, decreases are
        // informational ("cleared").
        if ss.errored_files != prev.errored_files {
            if ss.errored_files > prev.errored_files {
                warn!(share = %ss.name, errored_files = ss.errored_files,
                      prev = prev.errored_files, "cache errored_files increased");
            } else {
                info!(share = %ss.name, errored_files = ss.errored_files,
                      prev = prev.errored_files, "cache errored_files cleared");
            }
        }
        // cache_bytes: first population, then only swings >10% (relative to
        // the previous observation) to keep the log quiet.
        if prev.cache_bytes == 0 && ss.cache_bytes > 0 {
            info!(share = %ss.name, cache_bytes = ss.cache_bytes, "cache population started");
        } else if prev.cache_bytes > 0 {
            let delta = (ss.cache_bytes as f64 - prev.cache_bytes as f64).abs()
                / prev.cache_bytes as f64;
            if delta > 0.10 {
                info!(share = %ss.name, cache_bytes = ss.cache_bytes,
                      prev = prev.cache_bytes, "cache size changed >10%");
            }
        }
        // transfer idle/active transition
        if now_active != prev.is_active {
            if now_active {
                info!(share = %ss.name, speed_bps = ss.speed as u64, "transfer became active");
            } else {
                info!(share = %ss.name, "transfer became idle");
            }
        }
        // cache warn level change or periodic re-emission while over threshold
        if now_warn_level != prev.cache_warn_level || emit_snapshot {
            if now_warn_level == 2 {
                if let Some(max) = max_bytes {
                    warn!(share = %ss.name, cache_bytes = ss.cache_bytes, cache_max = max,
                          "cache critically full (>=95%)");
                }
            } else if now_warn_level == 1 {
                if let Some(max) = max_bytes {
                    warn!(share = %ss.name, cache_bytes = ss.cache_bytes, cache_max = max,
                          "cache nearly full (>=80%)");
                }
            }
        }
        // periodic stats snapshot
        if emit_snapshot {
            info!(share = %ss.name,
                  cache_bytes = ss.cache_bytes, dirty_count = ss.dirty_count,
                  errored_files = ss.errored_files, speed_bps = ss.speed as u64,
                  transfers = ss.transfers, errors = ss.errors,
                  "stats snapshot");
        }
        // Roll the baseline forward for the next cycle.
        prev.dirty_count = ss.dirty_count;
        prev.errored_files = ss.errored_files;
        prev.cache_bytes = ss.cache_bytes;
        prev.is_active = now_active;
        prev.cache_warn_level = now_warn_level;
    }
    if emit_snapshot {
        *last_snapshot = Instant::now();
    }
}
/// Parse a human-readable size string (e.g. "200G", "1.5T", "512M") into bytes.
///
/// Accepts SI-style suffixes with or without a trailing "B" ("G", "GB") and
/// IEC forms ("Gi", "GiB"), case-insensitively; all multipliers are powers
/// of 1024, matching rclone's size notation. A bare number is taken as bytes.
///
/// Returns `None` for unrecognized suffixes and for negative or non-finite
/// numeric parts (previously "-5G" silently saturated to `Some(0)`).
fn parse_size_bytes(s: &str) -> Option<u64> {
    let s = s.trim();
    // Split at the first alphabetic character: numeric part | unit suffix.
    let (num_part, suffix) = s
        .find(|c: char| c.is_alphabetic())
        .map(|i| s.split_at(i))
        .unwrap_or((s, ""));
    let n: f64 = num_part.trim().parse().ok()?;
    if !n.is_finite() || n < 0.0 {
        return None;
    }
    let mult: f64 = match suffix.trim().to_uppercase().as_str() {
        "" | "B" => 1.0,
        "K" | "KB" | "KI" | "KIB" => 1024.0,
        "M" | "MB" | "MI" | "MIB" => 1024.0_f64.powi(2),
        "G" | "GB" | "GI" | "GIB" => 1024.0_f64.powi(3),
        "T" | "TB" | "TI" | "TIB" => 1024.0_f64.powi(4),
        "P" | "PB" | "PI" | "PIB" => 1024.0_f64.powi(5),
        _ => return None,
    };
    Some((n * mult) as u64)
}
#[cfg(test)]
mod tests {
    use super::*;
    /// A fresh tracker starts with no restarts recorded.
    #[test]
    fn test_restart_tracker_new() {
        let tracker = RestartTracker::new();
        assert_eq!(tracker.count, 0);
        assert!(tracker.last_restart.is_none());
    }
    /// Recording a restart bumps the count and stamps the time.
    #[test]
    fn test_restart_tracker_record_restart() {
        let mut tracker = RestartTracker::new();
        tracker.record_restart();
        assert_eq!(tracker.count, 1);
        assert!(tracker.last_restart.is_some());
    }
    /// Restarts are allowed while count is below MAX_RESTARTS.
    #[test]
    fn test_restart_tracker_can_restart_under_max() {
        let mut tracker = RestartTracker::new();
        assert!(tracker.can_restart());
        tracker.record_restart();
        assert!(tracker.can_restart()); // count = 1
        tracker.record_restart();
        assert!(tracker.can_restart()); // count = 2
    }
    /// Restarts are refused once the count reaches MAX_RESTARTS.
    #[test]
    fn test_restart_tracker_cannot_restart_at_max() {
        let mut tracker = RestartTracker::new();
        for _ in 0..MAX_RESTARTS {
            tracker.record_restart();
        }
        assert!(!tracker.can_restart()); // count = 3 = MAX_RESTARTS
    }
    /// The linear backoff formula (count * 2 seconds) grows per restart.
    #[test]
    fn test_restart_tracker_backoff_delay() {
        let mut tracker = RestartTracker::new();
        tracker.record_restart();
        assert_eq!(tracker.count * 2, 2); // 2s delay
        tracker.record_restart();
        assert_eq!(tracker.count * 2, 4); // 4s delay
        tracker.record_restart();
        assert_eq!(tracker.count * 2, 6); // 6s delay
    }
    /// Repeated restarts accumulate and eventually exhaust the budget.
    #[test]
    fn test_restart_tracker_multiple_record() {
        let mut tracker = RestartTracker::new();
        tracker.record_restart();
        tracker.record_restart();
        tracker.record_restart();
        assert_eq!(tracker.count, 3);
        assert!(!tracker.can_restart());
    }
    /// Pin the supervisor timing constants so accidental edits are caught.
    #[test]
    fn test_constants() {
        assert_eq!(MOUNT_TIMEOUT, Duration::from_secs(30));
        assert_eq!(POLL_INTERVAL, Duration::from_secs(2));
        assert_eq!(SIGTERM_GRACE, Duration::from_secs(3));
        assert_eq!(MAX_RESTARTS, 3);
        assert_eq!(RESTART_STABLE_PERIOD, Duration::from_secs(300));
        assert_eq!(WRITEBACK_DRAIN_TIMEOUT, Duration::from_secs(300));
        assert_eq!(WRITEBACK_POLL_INTERVAL, Duration::from_secs(2));
    }
    /// Happy-path suffix parsing.
    #[test]
    fn test_parse_size_bytes() {
        assert_eq!(parse_size_bytes("200G"), Some(200 * 1024 * 1024 * 1024));
        assert_eq!(parse_size_bytes("1T"), Some(1024 * 1024 * 1024 * 1024));
        assert_eq!(parse_size_bytes("512M"), Some(512 * 1024 * 1024));
        assert_eq!(parse_size_bytes("1024K"), Some(1024 * 1024));
        assert_eq!(parse_size_bytes("1024"), Some(1024));
        assert_eq!(parse_size_bytes("200GB"), Some(200 * 1024 * 1024 * 1024));
        assert_eq!(parse_size_bytes("bogus"), None);
    }
    /// Fractional, lowercase, padded, and empty inputs.
    #[test]
    fn test_parse_size_bytes_edge_cases() {
        assert_eq!(parse_size_bytes("1.5G"), Some(1_610_612_736));
        assert_eq!(parse_size_bytes("200g"), Some(200 * 1024 * 1024 * 1024));
        assert_eq!(parse_size_bytes("  512M  "), Some(512 * 1024 * 1024));
        assert_eq!(parse_size_bytes(""), None);
    }
}