diff --git a/src/cli/warmup.rs b/src/cli/warmup.rs index 07d07b5..b862beb 100644 --- a/src/cli/warmup.rs +++ b/src/cli/warmup.rs @@ -9,6 +9,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use anyhow::{Context, Result}; +use tracing::{debug, info, warn}; use crate::config::Config; use crate::daemon::{DaemonStatus, WarmupRuleState}; @@ -130,13 +131,17 @@ pub fn run_tracked( rs.state = WarmupRuleState::Listing; } } + info!(share = %share_name, path = %path, "warmup: listing files"); if !warmup_path.exists() { let msg = format!("Path not found on mount: {}", warmup_path.display()); - let mut status = shared_status.write().unwrap(); - if let Some(rs) = status.warmup.get_mut(rule_index) { - rs.state = WarmupRuleState::Failed(msg.clone()); + { + let mut status = shared_status.write().unwrap(); + if let Some(rs) = status.warmup.get_mut(rule_index) { + rs.state = WarmupRuleState::Failed(msg.clone()); + } } + warn!(share = %share_name, path = %path, error = %msg, "warmup rule failed"); anyhow::bail!("{msg}"); } @@ -157,10 +162,13 @@ pub fn run_tracked( Ok(o) => o, Err(e) => { let msg = format!("Failed to run rclone lsf: {e}"); - let mut status = shared_status.write().unwrap(); - if let Some(rs) = status.warmup.get_mut(rule_index) { - rs.state = WarmupRuleState::Failed(msg.clone()); + { + let mut status = shared_status.write().unwrap(); + if let Some(rs) = status.warmup.get_mut(rule_index) { + rs.state = WarmupRuleState::Failed(msg.clone()); + } } + warn!(share = %share_name, path = %path, error = %msg, "warmup rule failed"); anyhow::bail!("{msg}"); } }; @@ -170,10 +178,13 @@ pub fn run_tracked( "rclone lsf failed: {}", String::from_utf8_lossy(&output.stderr).trim() ); - let mut status = shared_status.write().unwrap(); - if let Some(rs) = status.warmup.get_mut(rule_index) { - rs.state = WarmupRuleState::Failed(msg.clone()); + { + let mut status = shared_status.write().unwrap(); + if let Some(rs) = status.warmup.get_mut(rule_index) { + 
rs.state = WarmupRuleState::Failed(msg.clone()); + } } + warn!(share = %share_name, path = %path, error = %msg, "warmup rule failed"); anyhow::bail!("{msg}"); } @@ -198,8 +209,10 @@ pub fn run_tracked( } if total == 0 { + info!(share = %share_name, path = %path, "warmup: no files matched"); return Ok(()); } + info!(share = %share_name, path = %path, total, "warmup: caching started"); for file in &files { // Check shutdown / generation before each file @@ -214,9 +227,18 @@ pub fn run_tracked( } if is_cached(config, &share.connection, &share.remote_path, path, file) { - let mut status = shared_status.write().unwrap(); - if let Some(rs) = status.warmup.get_mut(rule_index) { - rs.skipped += 1; + let skipped = { + let mut status = shared_status.write().unwrap(); + if let Some(rs) = status.warmup.get_mut(rule_index) { + rs.skipped += 1; + rs.skipped + } else { + 0 + } + }; + debug!(share = %share_name, file = %file, "warmup: skipped (already cached)"); + if skipped % 100 == 0 { + info!(share = %share_name, skipped, total, "warmup: 100-file milestone (skipped)"); } continue; } @@ -224,36 +246,59 @@ pub fn run_tracked( let full_path = warmup_path.join(file); match std::fs::File::open(&full_path) { Ok(mut f) => { - if let Err(_e) = io::copy(&mut f, &mut io::sink()) { + if let Err(e) = io::copy(&mut f, &mut io::sink()) { + { + let mut status = shared_status.write().unwrap(); + if let Some(rs) = status.warmup.get_mut(rule_index) { + rs.errors += 1; + } + } + warn!(share = %share_name, file = %file, error = %e, "warmup: read error"); + } else { + let cached = { + let mut status = shared_status.write().unwrap(); + if let Some(rs) = status.warmup.get_mut(rule_index) { + rs.cached += 1; + rs.cached + } else { + 0 + } + }; + debug!(share = %share_name, file = %file, "warmup: cached"); + if cached % 100 == 0 { + info!(share = %share_name, cached, total, "warmup: 100-file milestone"); + } + } + } + Err(e) => { + { let mut status = shared_status.write().unwrap(); if let Some(rs) = 
status.warmup.get_mut(rule_index) { rs.errors += 1; } - } else { - let mut status = shared_status.write().unwrap(); - if let Some(rs) = status.warmup.get_mut(rule_index) { - rs.cached += 1; - } - } - } - Err(_e) => { - let mut status = shared_status.write().unwrap(); - if let Some(rs) = status.warmup.get_mut(rule_index) { - rs.errors += 1; } + warn!(share = %share_name, file = %file, error = %e, "warmup: open error"); } } } // Mark complete - { + let (cached, skipped, errors) = { let mut status = shared_status.write().unwrap(); if status.warmup_generation == generation { if let Some(rs) = status.warmup.get_mut(rule_index) { + let stats = (rs.cached, rs.skipped, rs.errors); rs.state = WarmupRuleState::Complete; + stats + } else { + (0, 0, 0) } + } else { + (0, 0, 0) } - } + }; + info!(share = %share_name, path = %path, total, cached, skipped, errors, + "warmup rule complete"); Ok(()) } diff --git a/src/supervisor.rs b/src/supervisor.rs index 8d0aa6f..b35867d 100644 --- a/src/supervisor.rs +++ b/src/supervisor.rs @@ -4,6 +4,7 @@ //! process tree with coordinated startup and shutdown. Spawns a built-in web //! server for status monitoring and config hot-reload. +use std::collections::HashMap; use std::os::unix::process::CommandExt; use std::path::PathBuf; use std::process::{Child, Command}; @@ -38,6 +39,24 @@ const RESTART_STABLE_PERIOD: Duration = Duration::from_secs(300); const WRITEBACK_DRAIN_TIMEOUT: Duration = Duration::from_secs(300); /// Poll interval when waiting for write-back drain. const WRITEBACK_POLL_INTERVAL: Duration = Duration::from_secs(2); +/// Transfer speed below this value is considered idle (bytes/sec). +const SPEED_ACTIVE_THRESHOLD: f64 = 10_240.0; // 10 KiB/s +/// Interval for periodic cache stats snapshots. +const STATS_SNAPSHOT_INTERVAL: Duration = Duration::from_secs(60); +/// Cache usage WARN threshold (fraction of max_size). +const CACHE_WARN_THRESHOLD: f64 = 0.80; +/// Cache usage CRIT threshold. 
+const CACHE_CRITICAL_THRESHOLD: f64 = 0.95; + +/// Per-share state from the previous poll cycle, used for change detection. +struct SharePrevState { + dirty_count: u64, + errored_files: u64, + cache_bytes: u64, + is_active: bool, + /// 0 = normal, 1 = ≥80%, 2 = ≥95% + cache_warn_level: u8, +} /// Tracks restart attempts for a supervised child process. struct RestartTracker { @@ -677,6 +696,8 @@ fn supervise( ) -> Result<()> { let mut smbd_tracker = RestartTracker::new(); let mut webdav_tracker = RestartTracker::new(); + let mut prev_states: HashMap<String, SharePrevState> = HashMap::new(); + let mut last_stats_snapshot = Instant::now(); loop { // Check for commands (non-blocking with timeout = POLL_INTERVAL) @@ -686,7 +707,7 @@ return Ok(()); } Ok(SupervisorCmd::BwLimit { up, down }) => { - info!("Applying bandwidth limit: up={up}, down={down}"); + info!(bw_limit_up = %up, bw_limit_down = %down, "bandwidth limit applied"); apply_bwlimit(mounts, &up, &down); } Ok(SupervisorCmd::Reload(new_config)) => { @@ -801,6 +822,9 @@ // Update shared status with fresh RC stats update_status(shared_status, mounts, protocols, &config); + // Log cache state changes and periodic snapshots + log_cache_events(shared_status, &config, &mut prev_states, &mut last_stats_snapshot); + // Notify SSE subscribers that status was refreshed let _ = sse_tx.send(()); } @@ -1349,6 +1373,138 @@ fn shutdown_services(config: &Config, mounts: &mut Vec, protocols: & info!(" rclone: stopped"); } +/// Detect cache state changes and emit structured log events; also emit +/// periodic stats snapshots. Called after every `update_status()` cycle. 
+fn log_cache_events( + shared_status: &Arc<RwLock<DaemonStatus>>, + config: &Config, + prev_states: &mut HashMap<String, SharePrevState>, + last_snapshot: &mut Instant, +) { + let status = shared_status.read().unwrap(); + let max_bytes = parse_size_bytes(&config.cache.max_size); + let emit_snapshot = last_snapshot.elapsed() >= STATS_SNAPSHOT_INTERVAL; + + for ss in &status.shares { + if !ss.mounted { + continue; + } + + let now_active = ss.speed > SPEED_ACTIVE_THRESHOLD; + let now_warn_level = if let Some(max) = max_bytes { + let frac = ss.cache_bytes as f64 / max as f64; + if frac >= CACHE_CRITICAL_THRESHOLD { + 2 + } else if frac >= CACHE_WARN_THRESHOLD { + 1 + } else { + 0 + } + } else { + 0 + }; + + let prev = prev_states.entry(ss.name.clone()).or_insert(SharePrevState { + dirty_count: ss.dirty_count, + errored_files: ss.errored_files, + cache_bytes: ss.cache_bytes, + is_active: now_active, + cache_warn_level: now_warn_level, + }); + + // dirty_count change + if ss.dirty_count != prev.dirty_count { + info!(share = %ss.name, dirty_count = ss.dirty_count, + prev = prev.dirty_count, "cache dirty_count changed"); + } + + // errored_files change + if ss.errored_files != prev.errored_files { + if ss.errored_files > prev.errored_files { + warn!(share = %ss.name, errored_files = ss.errored_files, + prev = prev.errored_files, "cache errored_files increased"); + } else { + info!(share = %ss.name, errored_files = ss.errored_files, + prev = prev.errored_files, "cache errored_files cleared"); + } + } + + // cache_bytes change >10% + if prev.cache_bytes == 0 && ss.cache_bytes > 0 { + info!(share = %ss.name, cache_bytes = ss.cache_bytes, "cache population started"); + } else if prev.cache_bytes > 0 { + let delta = (ss.cache_bytes as f64 - prev.cache_bytes as f64).abs() + / prev.cache_bytes as f64; + if delta > 0.10 { + info!(share = %ss.name, cache_bytes = ss.cache_bytes, + prev = prev.cache_bytes, "cache size changed >10%"); + } + } + + // transfer idle/active transition + if now_active != prev.is_active { + if 
now_active { + info!(share = %ss.name, speed_bps = ss.speed as u64, "transfer became active"); + } else { + info!(share = %ss.name, "transfer became idle"); + } + } + + // cache warn level change or periodic snapshot + if now_warn_level != prev.cache_warn_level || emit_snapshot { + if now_warn_level == 2 { + if let Some(max) = max_bytes { + warn!(share = %ss.name, cache_bytes = ss.cache_bytes, cache_max = max, + "cache critically full (>=95%)"); + } + } else if now_warn_level == 1 { + if let Some(max) = max_bytes { + warn!(share = %ss.name, cache_bytes = ss.cache_bytes, cache_max = max, + "cache nearly full (>=80%)"); + } + } + } + + // periodic stats snapshot + if emit_snapshot { + info!(share = %ss.name, + cache_bytes = ss.cache_bytes, dirty_count = ss.dirty_count, + errored_files = ss.errored_files, speed_bps = ss.speed as u64, + transfers = ss.transfers, errors = ss.errors, + "stats snapshot"); + } + + prev.dirty_count = ss.dirty_count; + prev.errored_files = ss.errored_files; + prev.cache_bytes = ss.cache_bytes; + prev.is_active = now_active; + prev.cache_warn_level = now_warn_level; + } + + if emit_snapshot { + *last_snapshot = Instant::now(); + } +} + +/// Parse a human-readable size string (e.g. "200G", "1.5T", "512M") into bytes. 
+fn parse_size_bytes(s: &str) -> Option<u64> { + let s = s.trim(); + let (num_part, suffix) = s + .find(|c: char| c.is_alphabetic()) + .map(|i| s.split_at(i)) + .unwrap_or((s, "")); + let n: f64 = num_part.trim().parse().ok()?; + let mult: f64 = match suffix.to_uppercase().trim_end_matches('B') { + "" => 1.0, + "K" => 1024.0, + "M" => 1024.0_f64.powi(2), + "G" => 1024.0_f64.powi(3), + "T" => 1024.0_f64.powi(4), + _ => return None, + }; + Some((n * mult) as u64) +} + #[cfg(test)] mod tests { use super::*; @@ -1423,4 +1579,15 @@ assert_eq!(WRITEBACK_DRAIN_TIMEOUT, Duration::from_secs(300)); assert_eq!(WRITEBACK_POLL_INTERVAL, Duration::from_secs(2)); } + + #[test] + fn test_parse_size_bytes() { + assert_eq!(parse_size_bytes("200G"), Some(200 * 1024 * 1024 * 1024)); + assert_eq!(parse_size_bytes("1T"), Some(1024 * 1024 * 1024 * 1024)); + assert_eq!(parse_size_bytes("512M"), Some(512 * 1024 * 1024)); + assert_eq!(parse_size_bytes("1024K"), Some(1024 * 1024)); + assert_eq!(parse_size_bytes("1024"), Some(1024)); + assert_eq!(parse_size_bytes("200GB"), Some(200 * 1024 * 1024 * 1024)); + assert_eq!(parse_size_bytes("bogus"), None); + } } diff --git a/src/web/api.rs b/src/web/api.rs index 8ca7fe2..2dedb69 100644 --- a/src/web/api.rs +++ b/src/web/api.rs @@ -361,9 +361,17 @@ .map(|l| l.to_string()) .collect(); let total_lines = all_lines.len(); + // from_line == 0 means initial load: show the most recent `lines` entries + // (tail behaviour). Subsequent polls pass the exact line offset returned by + // the previous response, so they only pick up newly-appended lines. + let skip = if params.from_line == 0 { + total_lines.saturating_sub(params.lines) + } else { + params.from_line + }; let entries: Vec<String> = all_lines .into_iter() - .skip(params.from_line) + .skip(skip) .take(params.lines) .collect();