warpgate/src/cli/warmup.rs
grabbit d2b9f46b1a feat: add Apply Config progress modal and fix stale PENDING health after reload
- Add 4-step progress modal to config apply flow (validate, write, reload, services ready)
- Poll SSE-updated data-share-health attributes to detect when services finish restarting
- Fix stale health bug: recalculate health for affected shares based on actual mount
  success instead of preserving old health from before reload
- Add modal overlay/card/step CSS matching the dark theme
- Include connection refactor (multi-protocol support) and probe helpers from prior work

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-20 01:11:50 +08:00

445 lines
14 KiB
Rust

//! `warpgate warmup` — pre-cache a remote directory to local SSD.
//!
//! Lists files via `rclone lsf`, then reads each through the FUSE mount
//! to trigger VFS caching.
use std::io;
use std::process::Command;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use anyhow::{Context, Result};
use tracing::{debug, info, warn};
use crate::config::Config;
use crate::daemon::{DaemonStatus, WarmupRuleState};
use crate::rclone::config as rclone_config;
use crate::rclone::path as rclone_path;
pub fn run(config: &Config, share_name: &str, path: &str, newer_than: Option<&str>) -> Result<()> {
let share = config
.find_share(share_name)
.with_context(|| format!("Share '{}' not found in config", share_name))?;
let conn = config
.connection_for_share(share)
.with_context(|| format!("Connection '{}' not found", share.connection))?;
let warmup_path = share.mount_point.join(path);
let remote_src = rclone_path::rclone_remote_subpath(conn, share, path);
println!("Warming up: {remote_src}");
println!(" via mount: {}", warmup_path.display());
if !warmup_path.exists() {
anyhow::bail!(
"Path not found on mount: {}. Is the mount running?",
warmup_path.display()
);
}
// List files on remote (supports --max-age for newer_than filter)
let mut cmd = Command::new("rclone");
cmd.arg("lsf")
.arg("--config")
.arg(rclone_config::RCLONE_CONF_PATH)
.arg("--recursive")
.arg("--files-only")
.arg(&remote_src);
if let Some(age) = newer_than {
cmd.arg("--max-age").arg(age);
}
let output = cmd.output().context("Failed to run rclone lsf")?;
if !output.status.success() {
anyhow::bail!(
"rclone lsf failed: {}",
String::from_utf8_lossy(&output.stderr).trim()
);
}
let file_list = String::from_utf8_lossy(&output.stdout);
let files: Vec<&str> = file_list.lines().filter(|l| !l.is_empty()).collect();
let total = files.len();
if total == 0 {
println!("No files matched.");
return Ok(());
}
println!("Found {total} files to cache.");
let mut cached = 0usize;
let mut skipped = 0usize;
let mut errors = 0usize;
let cache_prefix = rclone_path::vfs_cache_prefix(conn, share);
for file in &files {
if is_cached(config, &cache_prefix, path, file) {
skipped += 1;
continue;
}
let full_path = warmup_path.join(file);
match std::fs::File::open(&full_path) {
Ok(mut f) => {
if let Err(e) = io::copy(&mut f, &mut io::sink()) {
eprintln!(" Warning: read failed: {file}: {e}");
errors += 1;
} else {
cached += 1;
eprint!("\r Cached {cached}/{total} (skipped {skipped})");
}
}
Err(e) => {
eprintln!(" Warning: open failed: {file}: {e}");
errors += 1;
}
}
}
eprintln!();
println!(
"Warmup complete: skipped {skipped} (already cached), cached {cached}, errors {errors}."
);
Ok(())
}
/// Like `run()` but reports progress into `shared_status.warmup[rule_index]`.
///
/// Checks `shutdown` and `generation` before each file to allow early exit
/// when the daemon is stopping or a new warmup generation supersedes this one.
pub fn run_tracked(
config: &Config,
share_name: &str,
path: &str,
newer_than: Option<&str>,
shared_status: &Arc<RwLock<DaemonStatus>>,
rule_index: usize,
generation: u64,
shutdown: &AtomicBool,
) -> Result<()> {
let share = config
.find_share(share_name)
.with_context(|| format!("Share '{}' not found in config", share_name))?;
let conn = config
.connection_for_share(share)
.with_context(|| format!("Connection '{}' not found", share.connection))?;
let warmup_path = share.mount_point.join(path);
let remote_src = rclone_path::rclone_remote_subpath(conn, share, path);
// Mark as Listing
{
let mut status = shared_status.write().unwrap();
if status.warmup_generation != generation {
return Ok(());
}
if let Some(rs) = status.warmup.get_mut(rule_index) {
rs.state = WarmupRuleState::Listing;
}
}
info!(share = %share_name, path = %path, "warmup: listing files");
if !warmup_path.exists() {
let msg = format!("Path not found on mount: {}", warmup_path.display());
{
let mut status = shared_status.write().unwrap();
if let Some(rs) = status.warmup.get_mut(rule_index) {
rs.state = WarmupRuleState::Failed(msg.clone());
}
}
warn!(share = %share_name, path = %path, error = %msg, "warmup rule failed");
anyhow::bail!("{msg}");
}
// List files on remote
let mut cmd = Command::new("rclone");
cmd.arg("lsf")
.arg("--config")
.arg(rclone_config::RCLONE_CONF_PATH)
.arg("--recursive")
.arg("--files-only")
.arg(&remote_src);
if let Some(age) = newer_than {
cmd.arg("--max-age").arg(age);
}
let output = match cmd.output() {
Ok(o) => o,
Err(e) => {
let msg = format!("Failed to run rclone lsf: {e}");
{
let mut status = shared_status.write().unwrap();
if let Some(rs) = status.warmup.get_mut(rule_index) {
rs.state = WarmupRuleState::Failed(msg.clone());
}
}
warn!(share = %share_name, path = %path, error = %msg, "warmup rule failed");
anyhow::bail!("{msg}");
}
};
if !output.status.success() {
let msg = format!(
"rclone lsf failed: {}",
String::from_utf8_lossy(&output.stderr).trim()
);
{
let mut status = shared_status.write().unwrap();
if let Some(rs) = status.warmup.get_mut(rule_index) {
rs.state = WarmupRuleState::Failed(msg.clone());
}
}
warn!(share = %share_name, path = %path, error = %msg, "warmup rule failed");
anyhow::bail!("{msg}");
}
let file_list = String::from_utf8_lossy(&output.stdout);
let files: Vec<&str> = file_list.lines().filter(|l| !l.is_empty()).collect();
let total = files.len();
// Update total and transition to Caching
{
let mut status = shared_status.write().unwrap();
if status.warmup_generation != generation {
return Ok(());
}
if let Some(rs) = status.warmup.get_mut(rule_index) {
rs.total_files = total;
rs.state = if total == 0 {
WarmupRuleState::Complete
} else {
WarmupRuleState::Caching
};
}
}
if total == 0 {
info!(share = %share_name, path = %path, "warmup: no files matched");
return Ok(());
}
info!(share = %share_name, path = %path, total, "warmup: caching started");
let cache_prefix = rclone_path::vfs_cache_prefix(conn, share);
for file in &files {
// Check shutdown / generation before each file
if shutdown.load(Ordering::SeqCst) {
return Ok(());
}
{
let status = shared_status.read().unwrap();
if status.warmup_generation != generation {
return Ok(());
}
}
if is_cached(config, &cache_prefix, path, file) {
let skipped = {
let mut status = shared_status.write().unwrap();
if let Some(rs) = status.warmup.get_mut(rule_index) {
rs.skipped += 1;
rs.skipped
} else {
0
}
};
debug!(share = %share_name, file = %file, "warmup: skipped (already cached)");
if skipped % 100 == 0 {
info!(share = %share_name, skipped, total, "warmup: 100-file milestone (skipped)");
}
continue;
}
let full_path = warmup_path.join(file);
match std::fs::File::open(&full_path) {
Ok(mut f) => {
if let Err(e) = io::copy(&mut f, &mut io::sink()) {
{
let mut status = shared_status.write().unwrap();
if let Some(rs) = status.warmup.get_mut(rule_index) {
rs.errors += 1;
}
}
warn!(share = %share_name, file = %file, error = %e, "warmup: read error");
} else {
let cached = {
let mut status = shared_status.write().unwrap();
if let Some(rs) = status.warmup.get_mut(rule_index) {
rs.cached += 1;
rs.cached
} else {
0
}
};
debug!(share = %share_name, file = %file, "warmup: cached");
if cached % 100 == 0 {
info!(share = %share_name, cached, total, "warmup: 100-file milestone");
}
}
}
Err(e) => {
{
let mut status = shared_status.write().unwrap();
if let Some(rs) = status.warmup.get_mut(rule_index) {
rs.errors += 1;
}
}
warn!(share = %share_name, file = %file, error = %e, "warmup: open error");
}
}
}
// Mark complete
let (cached, skipped, errors) = {
let mut status = shared_status.write().unwrap();
if status.warmup_generation == generation {
if let Some(rs) = status.warmup.get_mut(rule_index) {
let stats = (rs.cached, rs.skipped, rs.errors);
rs.state = WarmupRuleState::Complete;
stats
} else {
(0, 0, 0)
}
} else {
(0, 0, 0)
}
};
info!(share = %share_name, path = %path, total, cached, skipped, errors,
"warmup rule complete");
Ok(())
}
/// Check if a file is already in the rclone VFS cache.
///
/// `cache_prefix` is the protocol-aware relative path from `rclone_path::vfs_cache_prefix`,
/// e.g. `nas/volume1/photos` (SFTP) or `office/photos/subfolder` (SMB).
fn is_cached(config: &Config, cache_prefix: &std::path::Path, warmup_path: &str, relative_path: &str) -> bool {
let cache_path = config
.cache
.dir
.join("vfs")
.join(cache_prefix)
.join(warmup_path)
.join(relative_path);
cache_path.exists()
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::PathBuf;
fn test_config() -> Config {
toml::from_str(
r#"
[[connections]]
name = "nas"
host = "10.0.0.1"
protocol = "sftp"
user = "admin"
[cache]
dir = "/tmp/warpgate-test-cache"
[read]
[bandwidth]
[writeback]
[directory_cache]
[protocols]
[[shares]]
name = "photos"
connection = "nas"
remote_path = "/photos"
mount_point = "/mnt/photos"
"#,
)
.unwrap()
}
fn smb_config() -> Config {
toml::from_str(
r#"
[[connections]]
name = "office"
host = "192.168.1.100"
protocol = "smb"
user = "admin"
pass = "secret"
share = "data"
port = 445
[cache]
dir = "/tmp/warpgate-test-cache"
[read]
[bandwidth]
[writeback]
[directory_cache]
[protocols]
[[shares]]
name = "docs"
connection = "office"
remote_path = "/subfolder"
mount_point = "/mnt/docs"
"#,
)
.unwrap()
}
#[test]
fn test_is_cached_nonexistent_file() {
let config = test_config();
let prefix = PathBuf::from("nas/photos");
assert!(!is_cached(&config, &prefix, "2024", "IMG_001.jpg"));
}
#[test]
fn test_is_cached_deep_path() {
let config = test_config();
let prefix = PathBuf::from("nas/photos");
assert!(!is_cached(&config, &prefix, "Images/2024/January", "photo.cr3"));
}
#[test]
fn test_is_cached_sftp_path_construction() {
let config = test_config();
let share = config.find_share("photos").unwrap();
let conn = config.connection_for_share(share).unwrap();
let prefix = rclone_path::vfs_cache_prefix(conn, share);
let expected = PathBuf::from("/tmp/warpgate-test-cache/vfs/nas/photos/2024/IMG_001.jpg");
let cache_path = config.cache.dir.join("vfs").join(&prefix).join("2024").join("IMG_001.jpg");
assert_eq!(cache_path, expected);
}
#[test]
fn test_is_cached_smb_path_construction() {
let config = smb_config();
let share = config.find_share("docs").unwrap();
let conn = config.connection_for_share(share).unwrap();
let prefix = rclone_path::vfs_cache_prefix(conn, share);
// SMB: includes share name "data" before "subfolder"
let expected = PathBuf::from("/tmp/warpgate-test-cache/vfs/office/data/subfolder/2024/file.jpg");
let cache_path = config.cache.dir.join("vfs").join(&prefix).join("2024").join("file.jpg");
assert_eq!(cache_path, expected);
}
#[test]
fn test_is_cached_remote_path_trimming() {
let config = test_config();
let share = config.find_share("photos").unwrap();
let conn = config.connection_for_share(share).unwrap();
let prefix = rclone_path::vfs_cache_prefix(conn, share);
let cache_path = config.cache.dir.join("vfs").join(&prefix).join("2024").join("file.jpg");
assert!(cache_path.to_string_lossy().contains("nas/photos"));
assert!(!cache_path.to_string_lossy().contains("nas//photos"));
}
}