From 078ab4505ed66781406e5ba012396fd5261414d4 Mon Sep 17 00:00:00 2001 From: grabbit Date: Thu, 19 Feb 2026 13:37:21 +0800 Subject: [PATCH] Fix portal freeze: release write lock before HTTP IO in update_status update_status() previously acquired the shared_status write lock on the first line and then called rc::vfs_stats() and rc::core_stats() (blocking ureq HTTP) for every share while holding it. With dir-refresh flooding the rclone RC port, these calls could take seconds, starving all web handler reads and making the portal completely unresponsive. Refactor to a two-phase approach: Phase 1 collects all RC stats with no lock held; Phase 2 applies the results under a short-lived write lock (pure memory writes, microseconds). Lock hold time drops from seconds to microseconds regardless of rclone response latency. Also included in this batch: - vfs_refresh now reads the response body and surfaces partial failures - dir-refresh iterates top-level FUSE subdirs instead of refreshing "/" (rclone does not accept "/" as a valid vfs/refresh target) - Track per-share dirs_ok / dirs_failed counts in DaemonStatus and expose them through the web API Co-Authored-By: Claude Sonnet 4.6 --- src/daemon.rs | 6 ++ src/rclone/rc.rs | 23 ++++++- src/supervisor.rs | 148 +++++++++++++++++++++++++++++++++++----------- src/web/api.rs | 6 ++ 4 files changed, 145 insertions(+), 38 deletions(-) diff --git a/src/daemon.rs b/src/daemon.rs index ba48445..3e3449a 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -56,6 +56,10 @@ pub struct DaemonStatus { /// Shared atomic for generation — dir-refresh threads hold a clone and check /// this directly (without taking the RwLock) to detect supersession. pub dir_refresh_gen_arc: Arc<AtomicU64>, + /// Number of subdirectories successfully refreshed in the last cycle, keyed by share name. + pub dir_refresh_dirs_ok: HashMap<String, usize>, + /// Number of subdirectories that failed to refresh in the last cycle, keyed by share name. 
+ pub dir_refresh_dirs_failed: HashMap<String, usize>, } impl DaemonStatus { @@ -87,6 +91,8 @@ impl DaemonStatus { last_dir_refresh: HashMap::new(), dir_refresh_generation: 0, dir_refresh_gen_arc: Arc::new(AtomicU64::new(0)), + dir_refresh_dirs_ok: HashMap::new(), + dir_refresh_dirs_failed: HashMap::new(), } } diff --git a/src/rclone/rc.rs b/src/rclone/rc.rs index c762a31..2d7eb5a 100644 --- a/src/rclone/rc.rs +++ b/src/rclone/rc.rs @@ -89,10 +89,29 @@ pub fn vfs_forget(port: u16, dir: &str) -> Result<()> { /// /// Triggers rclone to re-fetch directory metadata from the remote. /// Does NOT download file contents — only refreshes directory entries. +/// +/// `path` should be `""` for the VFS root, or a relative sub-path (no leading `/`). +/// Returns an error if any path in the response result map failed. pub fn vfs_refresh(port: u16, path: &str, recursive: bool) -> Result<()> { let addr = rc_addr(port); - ureq::post(format!("{addr}/vfs/refresh")) - .send_json(serde_json::json!({ "dir": path, "recursive": recursive }))?; + let resp: serde_json::Value = ureq::post(format!("{addr}/vfs/refresh")) + .send_json(serde_json::json!({ "dir": path, "recursive": recursive }))? 
+ .body_mut() + .read_json() + .context("Failed to parse vfs/refresh response")?; + + // Response body: { "result": { "<path>": "OK" | "<error message>" } } + if let Some(result) = resp.get("result").and_then(|r| r.as_object()) { + let failed: Vec<String> = result + .iter() + .filter(|(_, v)| v.as_str().unwrap_or("") != "OK") + .map(|(k, v)| format!("{k}: {}", v.as_str().unwrap_or("unknown"))) + .collect(); + if !failed.is_empty() { + anyhow::bail!("vfs/refresh partial failure — {}", failed.join(", ")); + } + } + Ok(()) } diff --git a/src/supervisor.rs b/src/supervisor.rs index e05b9ee..8d0aa6f 100644 --- a/src/supervisor.rs +++ b/src/supervisor.rs @@ -335,6 +335,7 @@ fn spawn_dir_refresh( }; let share_name = share.name.clone(); + let mount_point = share.mount_point.clone(); let recursive = config.dir_refresh.recursive; let rc_port = config.rc_port(i); let status = Arc::clone(shared_status); @@ -352,11 +353,51 @@ fn spawn_dir_refresh( interval, } .spawn(generation, gen_arc2, sd, move || { - rc::vfs_refresh(rc_port, "/", recursive) - .with_context(|| format!("dir-refresh for '{share_name}'"))?; - info!(" dir-refresh OK: {share_name}"); + // Enumerate top-level subdirectories by reading the FUSE mount point. + // The VFS root itself is not a valid vfs/refresh target in rclone. + let dirs: Vec<String> = std::fs::read_dir(&mount_point) + .with_context(|| format!("dir-refresh: failed to read mount point for '{share_name}'"))? 
+ .filter_map(|entry| { + let entry = entry.ok()?; + if entry.file_type().ok()?.is_dir() { + entry.file_name().into_string().ok() + } else { + None + } + }) + .collect(); + + if dirs.is_empty() { + tracing::warn!(share = %share_name, "dir-refresh: no subdirs found, skipping"); + return Ok(()); + } + + let mut ok = 0usize; + let mut failed = 0usize; + for dir in &dirs { + match rc::vfs_refresh(rc_port, dir, recursive) { + Ok(()) => { + tracing::info!(share = %share_name, dir = %dir, "dir-refresh OK"); + ok += 1; + } + Err(e) => { + tracing::warn!(share = %share_name, dir = %dir, error = %e, "dir-refresh failed"); + failed += 1; + } + } + } + + tracing::info!( + share = %share_name, + dirs_ok = ok, + dirs_failed = failed, + "dir-refresh cycle complete" + ); + let mut s = status.write().unwrap(); s.last_dir_refresh.insert(share_name.clone(), SystemTime::now()); + s.dir_refresh_dirs_ok.insert(share_name.clone(), ok); + s.dir_refresh_dirs_failed.insert(share_name.clone(), failed); Ok(()) }); } @@ -769,47 +810,82 @@ fn supervise( /// /// Matches mounts to status entries by name (not index) so the mapping /// stays correct after dynamic PerShare add/remove/modify reloads. +/// +/// Uses a two-phase approach to avoid holding the write lock during HTTP IO: +/// Phase 1 collects all data without any lock; Phase 2 applies it under a +/// short-lived write lock (pure memory writes, no IO). 
fn update_status( shared_status: &Arc<RwLock<DaemonStatus>>, mounts: &[MountChild], protocols: &ProtocolChildren, config: &Config, ) { - let mut status = shared_status.write().unwrap(); - - // Update per-share stats from RC API — match by name, not index - for mc in mounts.iter() { - let mount_point = config - .shares - .iter() - .find(|s| s.name == mc.name) - .map(|s| s.mount_point.clone()) - .unwrap_or_default(); - - if let Some(ss) = status.shares.iter_mut().find(|s| s.name == mc.name) { - ss.mounted = is_mounted(&mount_point).unwrap_or(false); - ss.rc_port = mc.rc_port; - - // Fetch VFS stats (cache info, dirty files) - if let Ok(vfs) = rc::vfs_stats(mc.rc_port) { - if let Some(dc) = &vfs.disk_cache { - ss.cache_bytes = dc.bytes_used; - ss.dirty_count = dc.uploads_in_progress + dc.uploads_queued; - ss.errored_files = dc.errored_files; - } - } - - // Fetch core stats (speed, transfers) - if let Ok(core) = rc::core_stats(mc.rc_port) { - let active = core.transferring.len() as u64; - ss.speed = if active > 0 { core.speed } else { 0.0 }; - ss.transfers = active; - ss.errors = core.errors; - } - } + struct ShareSnapshot { + name: String, + rc_port: u16, + mounted: bool, + cache_bytes: u64, + dirty_count: u64, + errored_files: u64, + speed: f64, + transfers: u64, + errors: u64, } - // Update protocol status + // Phase 1: collect all data WITHOUT holding any lock. 
+ let snapshots: Vec<ShareSnapshot> = mounts + .iter() + .map(|mc| { + let mount_point = config + .shares + .iter() + .find(|s| s.name == mc.name) + .map(|s| s.mount_point.clone()) + .unwrap_or_default(); + + let mounted = is_mounted(&mount_point).unwrap_or(false); + + let (cache_bytes, dirty_count, errored_files) = rc::vfs_stats(mc.rc_port) + .ok() + .and_then(|v| v.disk_cache) + .map(|dc| (dc.bytes_used, dc.uploads_in_progress + dc.uploads_queued, dc.errored_files)) + .unwrap_or((0, 0, 0)); + + let (speed, transfers, errors) = rc::core_stats(mc.rc_port) + .map(|core| { + let active = core.transferring.len() as u64; + (if active > 0 { core.speed } else { 0.0 }, active, core.errors) + }) + .unwrap_or((0.0, 0, 0)); + + ShareSnapshot { + name: mc.name.clone(), + rc_port: mc.rc_port, + mounted, + cache_bytes, + dirty_count, + errored_files, + speed, + transfers, + errors, + } + }) + .collect(); + + // Phase 2: apply collected data under write lock — no IO here. + let mut status = shared_status.write().unwrap(); + for snap in snapshots { + if let Some(ss) = status.shares.iter_mut().find(|s| s.name == snap.name) { + ss.mounted = snap.mounted; + ss.rc_port = snap.rc_port; + ss.cache_bytes = snap.cache_bytes; + ss.dirty_count = snap.dirty_count; + ss.errored_files = snap.errored_files; + ss.speed = snap.speed; + ss.transfers = snap.transfers; + ss.errors = snap.errors; + } + } status.smbd_running = protocols.smbd.is_some(); status.webdav_running = protocols.webdav.is_some(); status.nfs_exported = config.protocols.enable_nfs; diff --git a/src/web/api.rs b/src/web/api.rs index ca47026..8ca7fe2 100644 --- a/src/web/api.rs +++ b/src/web/api.rs @@ -67,6 +67,8 @@ struct ShareStatusResponse { warmup_total: usize, dir_refresh_active: bool, last_dir_refresh_ago: Option<u64>, + last_dir_refresh_dirs_ok: usize, + last_dir_refresh_dirs_failed: usize, } /// Build a `ShareStatusResponse` for one share, including dir-refresh fields. 
@@ -81,6 +83,8 @@ fn share_to_response( .map(|sc| config.effective_dir_refresh_interval(sc).is_some()) .unwrap_or(false); let last_dir_refresh_ago = status.dir_refresh_ago_for(&s.name); + let last_dir_refresh_dirs_ok = status.dir_refresh_dirs_ok.get(&s.name).copied().unwrap_or(0); + let last_dir_refresh_dirs_failed = status.dir_refresh_dirs_failed.get(&s.name).copied().unwrap_or(0); ShareStatusResponse { name: s.name.clone(), mounted: s.mounted, @@ -100,6 +104,8 @@ fn share_to_response( warmup_total, dir_refresh_active, last_dir_refresh_ago, + last_dir_refresh_dirs_ok, + last_dir_refresh_dirs_failed, } }