warpgate/src/supervisor.rs
grabbit 46e592c3a4 Flatten project structure: move warpgate/ contents to repo root
Single-crate project doesn't need a subdirectory. Moves Cargo.toml,
src/, templates/ to root for standard Rust project layout. Updates
.gitignore and test harness binary paths accordingly.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 11:25:15 +08:00

582 lines
19 KiB
Rust

//! `warpgate run` — single-process supervisor for all services.
//!
//! Manages rclone mount + protocol services in one process tree with
//! coordinated startup and shutdown. Designed to run as a systemd unit
//! or standalone (Docker-friendly).
use std::os::unix::process::CommandExt;
use std::process::{Child, Command};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use anyhow::{Context, Result};
use crate::config::Config;
use crate::rclone::mount::{build_mount_args, is_mounted};
use crate::services::{nfs, samba, webdav};
/// Mount ready timeout.
///
/// How long `start_and_wait_mount` polls for the FUSE mount to appear
/// before killing rclone and giving up.
const MOUNT_TIMEOUT: Duration = Duration::from_secs(30);
/// Supervision loop poll interval.
const POLL_INTERVAL: Duration = Duration::from_secs(2);
/// Grace period for SIGTERM before escalating to SIGKILL.
const SIGTERM_GRACE: Duration = Duration::from_secs(3);
/// Max restart attempts before giving up on a protocol service.
///
/// Applies per service (smbd and WebDAV each get their own counter).
const MAX_RESTARTS: u32 = 3;
/// Reset restart counter after this period of stable running.
const RESTART_STABLE_PERIOD: Duration = Duration::from_secs(300);
/// Max time to wait for write-back queue to drain on shutdown.
const WRITEBACK_DRAIN_TIMEOUT: Duration = Duration::from_secs(300);
/// Poll interval when waiting for write-back drain.
const WRITEBACK_POLL_INTERVAL: Duration = Duration::from_secs(2);
/// Tracks restart attempts for a supervised child process.
struct RestartTracker {
    // Restarts recorded since the counter was last reset.
    count: u32,
    // Time of the most recent restart; `None` until the first one.
    last_restart: Option<Instant>,
}
impl RestartTracker {
fn new() -> Self {
Self {
count: 0,
last_restart: None,
}
}
/// Returns true if another restart is allowed. Resets counter if the
/// service has been stable for `RESTART_STABLE_PERIOD`.
fn can_restart(&mut self) -> bool {
if let Some(last) = self.last_restart
&& last.elapsed() >= RESTART_STABLE_PERIOD
{
self.count = 0;
}
self.count < MAX_RESTARTS
}
fn record_restart(&mut self) {
self.count += 1;
self.last_restart = Some(Instant::now());
}
}
/// Child processes for protocol servers managed by the supervisor.
///
/// Implements `Drop` to kill any spawned children — prevents orphaned
/// processes if startup fails partway through `start_protocols()`.
struct ProtocolChildren {
    // smbd child; `None` when SMB is disabled or the service was given up on.
    smbd: Option<Child>,
    // `rclone serve webdav` child; `None` when WebDAV is disabled or abandoned.
    webdav: Option<Child>,
}
impl Drop for ProtocolChildren {
    /// Gracefully stop any children still held — safety net so a partial
    /// startup or early return never leaks running processes.
    fn drop(&mut self) {
        if let Some(child) = self.smbd.as_mut() {
            graceful_kill(child);
        }
        if let Some(child) = self.webdav.as_mut() {
            graceful_kill(child);
        }
    }
}
/// Entry point — called from main.rs for `warpgate run`.
///
/// Lifecycle: preflight → mount → protocol services → optional warmup →
/// supervision loop → teardown. Teardown always runs, whether supervision
/// returned cleanly (signal) or with an error (mount death).
pub fn run(config: &Config) -> Result<()> {
    // Shared flag set by the signal handler and polled by every phase below.
    let shutdown = Arc::new(AtomicBool::new(false));
    // Install signal handler (SIGTERM + SIGINT)
    let shutdown_flag = Arc::clone(&shutdown);
    ctrlc::set_handler(move || {
        eprintln!("Signal received, shutting down...");
        shutdown_flag.store(true, Ordering::SeqCst);
    })
    .context("Failed to set signal handler")?;
    // Phase 1: Preflight — generate configs, create dirs
    println!("Preflight checks...");
    preflight(config)?;
    // Phase 2: Start rclone mount and wait for it to become ready
    println!("Starting rclone mount...");
    let mut mount_child = start_and_wait_mount(config, &shutdown)?;
    println!("Mount ready at {}", config.mount.point.display());
    // Phase 3: Start protocol services
    // Bail out early if a signal arrived while waiting for the mount; no
    // protocol children exist yet, so killing rclone directly is enough.
    if shutdown.load(Ordering::SeqCst) {
        println!("Shutdown signal received during mount.");
        let _ = mount_child.kill();
        let _ = mount_child.wait();
        return Ok(());
    }
    println!("Starting protocol services...");
    let mut protocols = start_protocols(config)?;
    // Phase 3.5: Auto-warmup (non-blocking, best-effort)
    // Warmup failures are logged and ignored — it is an optimization only.
    if !config.warmup.rules.is_empty() && config.warmup.auto {
        println!("Running auto-warmup...");
        for rule in &config.warmup.rules {
            // Abort remaining rules promptly on Ctrl+C.
            if shutdown.load(Ordering::SeqCst) {
                break;
            }
            if let Err(e) =
                crate::cli::warmup::run(config, &rule.path, rule.newer_than.as_deref())
            {
                eprintln!("Warmup warning: {e}");
            }
        }
    }
    // Phase 4: Supervision loop
    println!("Supervision active. Press Ctrl+C to stop.");
    let result = supervise(config, &mut mount_child, &mut protocols, Arc::clone(&shutdown));
    // Phase 5: Teardown (always runs)
    // `result` is held so the supervision error (if any) is still surfaced
    // to the caller after cleanup completes.
    println!("Shutting down...");
    shutdown_services(config, &mut mount_child, &mut protocols);
    result
}
/// Write configs and create directories. Reuses existing modules.
fn preflight(config: &Config) -> Result<()> {
// Ensure mount point exists
std::fs::create_dir_all(&config.mount.point).with_context(|| {
format!(
"Failed to create mount point: {}",
config.mount.point.display()
)
})?;
// Ensure cache directory exists
std::fs::create_dir_all(&config.cache.dir).with_context(|| {
format!(
"Failed to create cache dir: {}",
config.cache.dir.display()
)
})?;
// Generate rclone config
crate::rclone::config::write_config(config)?;
// Generate protocol configs
if config.protocols.enable_smb {
samba::write_config(config)?;
}
if config.protocols.enable_nfs {
nfs::write_config(config)?;
}
Ok(())
}
/// Spawn rclone mount process and poll until the FUSE mount appears.
///
/// Polls every 500ms up to `MOUNT_TIMEOUT`. Three failure exits, each of
/// which reaps the child before bailing:
/// - shutdown flag set (Ctrl+C while waiting),
/// - timeout expired,
/// - rclone exited early (bad remote/auth config).
fn start_and_wait_mount(config: &Config, shutdown: &AtomicBool) -> Result<Child> {
    let args = build_mount_args(config);
    let mut child = Command::new("rclone")
        .args(&args)
        .process_group(0) // isolate from terminal SIGINT
        .spawn()
        .context("Failed to spawn rclone mount")?;
    // Poll for mount readiness
    let deadline = Instant::now() + MOUNT_TIMEOUT;
    loop {
        // Check for shutdown signal (e.g. Ctrl+C during mount wait)
        if shutdown.load(Ordering::SeqCst) {
            let _ = child.kill();
            let _ = child.wait();
            anyhow::bail!("Interrupted while waiting for mount");
        }
        if Instant::now() > deadline {
            let _ = child.kill();
            let _ = child.wait();
            anyhow::bail!(
                "Timed out waiting for mount at {} ({}s)",
                config.mount.point.display(),
                MOUNT_TIMEOUT.as_secs()
            );
        }
        // Detect early rclone exit (e.g. bad config, auth failure)
        match child.try_wait() {
            Ok(Some(status)) => {
                anyhow::bail!("rclone mount exited immediately ({status}). Check remote/auth config.");
            }
            Ok(None) => {} // still running, good
            Err(e) => {
                anyhow::bail!("Failed to check rclone mount status: {e}");
            }
        }
        // A failed mount check is non-fatal: log and keep polling until
        // the timeout decides.
        match is_mounted(config) {
            Ok(true) => break, // mount is visible — ready
            Ok(false) => {}
            Err(e) => eprintln!("Warning: mount check failed: {e}"),
        }
        thread::sleep(Duration::from_millis(500));
    }
    Ok(child)
}
/// Spawn smbd as a foreground child process.
fn spawn_smbd() -> Result<Child> {
Command::new("smbd")
.args(["--foreground", "--debug-stdout", "--no-process-group",
"--configfile", samba::SMB_CONF_PATH])
.process_group(0)
.spawn()
.context("Failed to spawn smbd")
}
/// Start protocol services after the mount is ready.
///
/// - SMB: spawn `smbd -F` as a child process
/// - NFS: `exportfs -ra`
/// - WebDAV: spawn `rclone serve webdav` as a child process
///
/// Children are stored in the `ProtocolChildren` struct *as soon as they
/// are spawned* (not in bare locals), so if a later step fails — e.g.
/// `exportfs -ra` — the struct's `Drop` impl kills what was already
/// started instead of orphaning it on the early return.
fn start_protocols(config: &Config) -> Result<ProtocolChildren> {
    let mut children = ProtocolChildren {
        smbd: None,
        webdav: None,
    };
    if config.protocols.enable_smb {
        children.smbd = Some(spawn_smbd()?);
        println!("  SMB: started");
    }
    if config.protocols.enable_nfs {
        let status = Command::new("exportfs")
            .arg("-ra")
            .status()
            .context("Failed to run exportfs -ra")?;
        if !status.success() {
            // Early return here drops `children`, killing smbd if started.
            anyhow::bail!("exportfs -ra failed: {status}");
        }
        println!("  NFS: exported");
    }
    if config.protocols.enable_webdav {
        children.webdav = Some(spawn_webdav(config)?);
        println!("  WebDAV: started");
    }
    Ok(children)
}
/// Spawn a `rclone serve webdav` child process.
fn spawn_webdav(config: &Config) -> Result<Child> {
let args = webdav::build_serve_args(config);
Command::new("rclone")
.args(&args)
.process_group(0)
.spawn()
.context("Failed to spawn rclone serve webdav")
}
/// Main supervision loop. Polls child processes every 2s.
///
/// - If rclone mount dies → full shutdown (data safety: dirty files may be in flight).
/// - If smbd/WebDAV dies → restart up to 3 times (counter resets after 5 min stable).
/// - Checks shutdown flag set by signal handler.
fn supervise(
config: &Config,
mount: &mut Child,
protocols: &mut ProtocolChildren,
shutdown: Arc<AtomicBool>,
) -> Result<()> {
let mut smbd_tracker = RestartTracker::new();
let mut webdav_tracker = RestartTracker::new();
loop {
// Check for shutdown signal
if shutdown.load(Ordering::SeqCst) {
println!("Shutdown signal received.");
return Ok(());
}
// Check rclone mount process
match mount.try_wait() {
Ok(Some(status)) => {
anyhow::bail!(
"rclone mount exited unexpectedly ({}). Initiating full shutdown for data safety.",
status
);
}
Ok(None) => {} // still running
Err(e) => {
anyhow::bail!("Failed to check rclone mount status: {e}");
}
}
// Check smbd process (if enabled)
if let Some(child) = &mut protocols.smbd {
match child.try_wait() {
Ok(Some(status)) => {
eprintln!("smbd exited ({status}).");
if smbd_tracker.can_restart() {
smbd_tracker.record_restart();
let delay = smbd_tracker.count * 2;
eprintln!(
"Restarting smbd in {delay}s ({}/{MAX_RESTARTS})...",
smbd_tracker.count,
);
thread::sleep(Duration::from_secs(delay.into()));
match spawn_smbd() {
Ok(new_child) => *child = new_child,
Err(e) => {
eprintln!("Failed to restart smbd: {e}");
protocols.smbd = None;
}
}
} else {
eprintln!(
"smbd exceeded max restarts ({MAX_RESTARTS}), giving up."
);
protocols.smbd = None;
}
}
Ok(None) => {} // still running
Err(e) => eprintln!("Warning: failed to check smbd status: {e}"),
}
}
// Check WebDAV process (if enabled)
if let Some(child) = &mut protocols.webdav {
match child.try_wait() {
Ok(Some(status)) => {
eprintln!("WebDAV exited ({status}).");
if webdav_tracker.can_restart() {
webdav_tracker.record_restart();
let delay = webdav_tracker.count * 2;
eprintln!(
"Restarting WebDAV in {delay}s ({}/{MAX_RESTARTS})...",
webdav_tracker.count,
);
thread::sleep(Duration::from_secs(delay.into()));
match spawn_webdav(config) {
Ok(new_child) => *child = new_child,
Err(e) => {
eprintln!("Failed to restart WebDAV: {e}");
protocols.webdav = None;
}
}
} else {
eprintln!(
"WebDAV exceeded max restarts ({MAX_RESTARTS}), giving up."
);
protocols.webdav = None;
}
}
Ok(None) => {} // still running
Err(e) => eprintln!("Warning: failed to check WebDAV status: {e}"),
}
}
thread::sleep(POLL_INTERVAL);
}
}
/// Send SIGTERM, wait up to `SIGTERM_GRACE`, then SIGKILL if still alive.
///
/// smbd forks worker processes per client connection — SIGTERM lets
/// the parent signal its children to exit cleanly. SIGKILL would
/// orphan those workers.
fn graceful_kill(child: &mut Child) {
    // SAFETY: sending a signal to a known child PID is safe.
    unsafe { libc::kill(child.id() as i32, libc::SIGTERM) };
    let deadline = Instant::now() + SIGTERM_GRACE;
    // Poll for exit during the grace window (100ms granularity).
    while Instant::now() <= deadline {
        match child.try_wait() {
            Ok(Some(_)) => return, // exited cleanly
            Ok(None) => thread::sleep(Duration::from_millis(100)),
            // Can't poll it — stop waiting and escalate below.
            Err(_) => break,
        }
    }
    // Still alive after grace period — escalate
    let _ = child.kill(); // SIGKILL
    let _ = child.wait();
}
/// Wait for rclone VFS write-back queue to drain.
///
/// Polls `vfs/stats` every 2s. Exits when uploads_in_progress + uploads_queued
/// reaches 0, or after 5 minutes (safety cap to avoid hanging forever).
/// Any RC error is treated as "rclone already gone" and returns at once.
fn wait_writeback_drain() {
    use crate::rclone::rc;
    let deadline = Instant::now() + WRITEBACK_DRAIN_TIMEOUT;
    // `first` tracks whether the initial "waiting" line was printed;
    // later updates overwrite a single stderr line via `\r`.
    let mut first = true;
    loop {
        match rc::vfs_stats() {
            Ok(vfs) => {
                if let Some(dc) = &vfs.disk_cache {
                    let pending = dc.uploads_in_progress + dc.uploads_queued;
                    if pending == 0 {
                        // Only announce the drain if we actually waited.
                        if !first {
                            println!("  Write-back queue drained.");
                        }
                        return;
                    }
                    if first {
                        println!(
                            "  Waiting for write-back queue ({pending} files pending)..."
                        );
                        first = false;
                    } else {
                        eprint!("\r  Write-back: {pending} files remaining...  ");
                    }
                } else {
                    return; // no cache info → nothing to wait for
                }
            }
            Err(_) => return, // RC API unavailable → rclone already gone
        }
        if Instant::now() > deadline {
            eprintln!(); // terminate the `\r` progress line
            eprintln!(
                "  Warning: write-back drain timed out after {}s, proceeding with shutdown.",
                WRITEBACK_DRAIN_TIMEOUT.as_secs()
            );
            return;
        }
        thread::sleep(WRITEBACK_POLL_INTERVAL);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    // Fresh tracker starts with a zeroed counter and no timestamp.
    #[test]
    fn test_restart_tracker_new() {
        let tracker = RestartTracker::new();
        assert_eq!(tracker.count, 0);
        assert!(tracker.last_restart.is_none());
    }
    // Recording a restart bumps the counter and stamps the time.
    #[test]
    fn test_restart_tracker_record_restart() {
        let mut tracker = RestartTracker::new();
        tracker.record_restart();
        assert_eq!(tracker.count, 1);
        assert!(tracker.last_restart.is_some());
    }
    // Restarts stay allowed while the counter is below MAX_RESTARTS.
    #[test]
    fn test_restart_tracker_can_restart_under_max() {
        let mut tracker = RestartTracker::new();
        assert!(tracker.can_restart());
        tracker.record_restart();
        assert!(tracker.can_restart()); // count = 1
        tracker.record_restart();
        assert!(tracker.can_restart()); // count = 2
    }
    // At exactly MAX_RESTARTS the tracker refuses further restarts.
    #[test]
    fn test_restart_tracker_cannot_restart_at_max() {
        let mut tracker = RestartTracker::new();
        for _ in 0..MAX_RESTARTS {
            tracker.record_restart();
        }
        assert!(!tracker.can_restart()); // count = 3 = MAX_RESTARTS
    }
    // Documents the linear backoff formula used by the supervision loop
    // (delay = count * 2 seconds).
    #[test]
    fn test_restart_tracker_backoff_delay() {
        let mut tracker = RestartTracker::new();
        tracker.record_restart();
        assert_eq!(tracker.count * 2, 2); // 2s delay
        tracker.record_restart();
        assert_eq!(tracker.count * 2, 4); // 4s delay
        tracker.record_restart();
        assert_eq!(tracker.count * 2, 6); // 6s delay
    }
    // Repeated restarts accumulate until the cap is hit.
    #[test]
    fn test_restart_tracker_multiple_record() {
        let mut tracker = RestartTracker::new();
        tracker.record_restart();
        tracker.record_restart();
        tracker.record_restart();
        assert_eq!(tracker.count, 3);
        assert!(!tracker.can_restart());
    }
    // Guard against accidental edits to the tuning constants.
    #[test]
    fn test_constants() {
        assert_eq!(MOUNT_TIMEOUT, Duration::from_secs(30));
        assert_eq!(POLL_INTERVAL, Duration::from_secs(2));
        assert_eq!(SIGTERM_GRACE, Duration::from_secs(3));
        assert_eq!(MAX_RESTARTS, 3);
        assert_eq!(RESTART_STABLE_PERIOD, Duration::from_secs(300));
        assert_eq!(WRITEBACK_DRAIN_TIMEOUT, Duration::from_secs(300));
        assert_eq!(WRITEBACK_POLL_INTERVAL, Duration::from_secs(2));
    }
}
/// Reverse-order teardown of all services.
///
/// Order: stop smbd → unexport NFS → kill WebDAV → drain write-back →
/// unmount FUSE → kill rclone. Protocol servers stop first so no new
/// dirty data arrives while the write-back queue drains.
fn shutdown_services(config: &Config, mount: &mut Child, protocols: &mut ProtocolChildren) {
    // Stop SMB
    if let Some(child) = &mut protocols.smbd {
        graceful_kill(child);
        println!("  SMB: stopped");
    }
    // Unexport NFS (best-effort: ignore failures, we're tearing down anyway)
    if config.protocols.enable_nfs {
        let _ = Command::new("exportfs").arg("-ua").status();
        println!("  NFS: unexported");
    }
    // Kill WebDAV
    if let Some(child) = &mut protocols.webdav {
        graceful_kill(child);
        println!("  WebDAV: stopped");
    }
    // Wait for write-back queue to drain before unmounting
    wait_writeback_drain();
    // Lazy unmount FUSE (skip if rclone already unmounted on signal).
    // Report the outcome honestly instead of unconditionally claiming
    // success: try fusermount3 (fuse3) first, then legacy fusermount.
    if is_mounted(config).unwrap_or(false) {
        let mount_point = config.mount.point.display().to_string();
        let try_unmount = |bin: &str| {
            Command::new(bin)
                .args(["-uz", &mount_point])
                .status()
                .map(|s| s.success())
                .unwrap_or(false)
        };
        if try_unmount("fusermount3") || try_unmount("fusermount") {
            println!("  FUSE: unmounted");
        } else {
            eprintln!("  Warning: FUSE unmount failed; rclone may clean up on exit.");
        }
    }
    // Gracefully stop rclone
    graceful_kill(mount);
    println!("  rclone: stopped");
}