warpgate/src/web/sse.rs
grabbit d5b83a0075 fix: SSE 实时更新同步状态指示器
sync-status 区域未包含在 SSE OOB swap 中,导致页面加载后
状态永远不会更新。新增 SyncStatusPartial 模板并加入 SSE
payload,使 dirty count 归零时 UI 能实时切换。

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-19 17:30:08 +08:00

222 lines
6.7 KiB
Rust

//! Server-Sent Events endpoint for real-time status push.
//!
//! The supervisor sends `()` on a `broadcast::Sender` after every
//! `update_status()` cycle. Each SSE client subscribes, renders the
//! partial templates, and pushes them as a single `status` event with
//! htmx OOB swap attributes so multiple DOM regions update at once.
use std::convert::Infallible;
use std::time::Duration;
use askama::Template;
use axum::extract::State;
use axum::response::sse::{Event, KeepAlive, Sse};
use axum::routing::get;
use axum::Router;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt;
use crate::web::SharedState;
/// Build the SSE sub-router, exposing `GET /events`.
pub fn routes() -> Router<SharedState> {
    let router = Router::new();
    router.route("/events", get(sse_handler))
}
/// SSE endpoint handler: streams rendered status partials to one client.
///
/// Subscribes to the supervisor's broadcast channel; every received tick
/// renders the full OOB payload and emits it as a single `status` event.
/// A lagged receiver just drops missed ticks — the next tick carries the
/// complete state anyway. The 15-second keep-alive comment stops idle
/// proxies from closing the connection.
async fn sse_handler(
    State(state): State<SharedState>,
) -> Sse<impl tokio_stream::Stream<Item = Result<Event, Infallible>>> {
    let receiver = state.sse_tx.subscribe();
    let events = BroadcastStream::new(receiver).filter_map(move |msg| {
        // Err means this receiver lagged behind the broadcast buffer; skip.
        let () = msg.ok()?;
        // Scope the read guards so they are released before the event is
        // yielded back to the stream machinery.
        let html = {
            let status = state.status.read().unwrap();
            let config = state.config.read().unwrap();
            render_sse_payload(&status, &config)
        };
        Some(Ok(Event::default().event("status").data(html)))
    });
    Sse::new(events).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
}
/// Render all SSE partials into a single HTML payload.
///
/// Uses htmx OOB (Out-of-Band) swap so a single SSE event can update
/// multiple independent DOM regions:
/// - `#dashboard-stats` — stat cards
/// - `#share-rows` — share card list
/// - `#protocol-badges` — SMB/NFS/WebDAV badges
/// Render all SSE partials into a single HTML payload.
///
/// Uses htmx OOB (Out-of-Band) swap so a single SSE event can update
/// multiple independent DOM regions:
/// - `#dashboard-stats` — stat cards
/// - `#share-rows` — share card list
/// - `#protocol-badges` — SMB/NFS/WebDAV badges
/// - sync status indicator (all-synced flag)
///
/// Individual template render failures are skipped: a partial payload is
/// preferable to dropping the whole SSE event.
fn render_sse_payload(
    status: &crate::daemon::DaemonStatus,
    config: &crate::config::Config,
) -> String {
    let shares: Vec<SseShareView> = status
        .shares
        .iter()
        .map(|s| {
            // The daemon may still report a share that was removed from the
            // config; fall back to empty display fields in that case.
            let share_config = config.find_share(&s.name);
            let (warmup_state, warmup_done, warmup_total) =
                status.warmup_summary_for(&s.name);
            SseShareView {
                name: s.name.clone(),
                connection: share_config
                    .map(|sc| sc.connection.clone())
                    .unwrap_or_default(),
                mount_point: share_config
                    .map(|sc| sc.mount_point.display().to_string())
                    .unwrap_or_default(),
                remote_path: share_config
                    .map(|sc| sc.remote_path.clone())
                    .unwrap_or_default(),
                mounted: s.mounted,
                cache_display: s.cache_display(),
                dirty_count: s.dirty_count,
                errored_files: s.errored_files,
                speed_display: s.speed_display(),
                transfers: s.transfers,
                errors: s.errors,
                read_only: share_config.map(|sc| sc.read_only).unwrap_or(false),
                health: s.health_label().to_string(),
                health_message: s.health_message().unwrap_or("").to_string(),
                rc_port: s.rc_port,
                warmup_state: warmup_state.to_string(),
                warmup_done,
                warmup_total,
            }
        })
        .collect();
    // Aggregate figures for the dashboard stat cards.
    let healthy_count = shares.iter().filter(|s| s.health == "OK").count();
    let failed_count = shares.iter().filter(|s| s.health == "FAILED").count();
    let total_cache: u64 = status.shares.iter().map(|s| s.cache_bytes).sum();
    let total_speed: f64 = status.shares.iter().map(|s| s.speed).sum();
    let active_transfers: u64 = status.shares.iter().map(|s| s.transfers).sum();
    let stats = DashboardStatsPartial {
        total_shares: shares.len(),
        healthy_count,
        failed_count,
        total_cache_display: format_bytes_static(total_cache),
        // Show "-" below 1 B/s so an idle dashboard is not cluttered with
        // "0 B/s"; the truncating cast is fine since total_speed >= 1.0 here.
        aggregate_speed_display: if total_speed < 1.0 {
            "-".to_string()
        } else {
            format!("{}/s", format_bytes_static(total_speed as u64))
        },
        active_transfers,
        uptime: status.uptime_string(),
    };
    // Move the views into the partial: `shares` is not used after the
    // aggregates above, so cloning the whole Vec would be wasted work on
    // every SSE tick for every connected client.
    let share_rows = ShareRowsPartial { shares };
    let badges = ProtocolBadgesPartial {
        smbd_running: status.smbd_running,
        nfs_exported: status.nfs_exported,
        webdav_running: status.webdav_running,
    };
    let sync_status = SyncStatusPartial {
        all_synced: status.all_synced,
    };
    let mut html = String::new();
    // Primary target: dashboard stats
    if let Ok(s) = stats.render() {
        html.push_str(&s);
    }
    // OOB: share rows
    if let Ok(s) = share_rows.render() {
        html.push_str(&s);
    }
    // OOB: protocol badges
    if let Ok(s) = badges.render() {
        html.push_str(&s);
    }
    // OOB: sync status indicator
    if let Ok(s) = sync_status.render() {
        html.push_str(&s);
    }
    html
}
/// Format a byte count for display using binary units (KiB/MiB/GiB/TiB)
/// with one decimal place; values below 1 KiB render as a plain "{n} B".
fn format_bytes_static(bytes: u64) -> String {
    // Thresholds ordered largest-first; the first one reached wins.
    const UNITS: [(f64, &str); 4] = [
        (1024.0 * 1024.0 * 1024.0 * 1024.0, "TiB"),
        (1024.0 * 1024.0 * 1024.0, "GiB"),
        (1024.0 * 1024.0, "MiB"),
        (1024.0, "KiB"),
    ];
    let value = bytes as f64;
    for &(scale, suffix) in UNITS.iter() {
        if value >= scale {
            return format!("{:.1} {suffix}", value / scale);
        }
    }
    format!("{bytes} B")
}
// --- SSE partial templates ---
/// Per-share view model consumed by the askama partials; all display
/// strings are pre-formatted in `render_sse_payload` so the templates
/// stay logic-free.
#[derive(Clone)]
#[allow(dead_code)] // fields used by askama templates
pub struct SseShareView {
    /// Share name as reported by the daemon status.
    pub name: String,
    /// From the matching config entry; empty when the share has no config.
    pub connection: String,
    /// Local mount point (display form); empty when unconfigured.
    pub mount_point: String,
    /// Remote path from config; empty when unconfigured.
    pub remote_path: String,
    pub mounted: bool,
    /// Pre-formatted cache-size string from `cache_display()`.
    pub cache_display: String,
    /// Files pending upload; zero means the share is fully synced.
    pub dirty_count: u64,
    pub errored_files: u64,
    /// Pre-formatted transfer-speed string from `speed_display()`.
    pub speed_display: String,
    pub transfers: u64,
    pub errors: u64,
    pub read_only: bool,
    /// Health label ("OK"/"FAILED"/... per `health_label()`).
    pub health: String,
    /// Optional detail for the health state; empty when none.
    pub health_message: String,
    /// Remote-control port for this share's backend process.
    pub rc_port: u16,
    /// Warm-up phase name plus done/total progress counters.
    pub warmup_state: String,
    pub warmup_done: usize,
    pub warmup_total: usize,
}
/// `#dashboard-stats` partial: aggregate stat cards pushed via SSE.
#[derive(Template)]
#[template(path = "web/partials/dashboard_stats.html")]
struct DashboardStatsPartial {
    total_shares: usize,
    healthy_count: usize,
    // NOTE(review): presumably not referenced by the current template
    // (hence the dead_code allow) but kept computed — confirm before removing.
    #[allow(dead_code)]
    failed_count: usize,
    /// Pre-formatted total cache size (e.g. "1.5 GiB").
    total_cache_display: String,
    /// Pre-formatted aggregate speed, or "-" when effectively idle.
    aggregate_speed_display: String,
    active_transfers: u64,
    uptime: String,
}
/// `#share-rows` partial: the full share card list, swapped OOB.
#[derive(Template)]
#[template(path = "web/partials/share_rows.html")]
struct ShareRowsPartial {
    shares: Vec<SseShareView>,
}
/// `#protocol-badges` partial: SMB/NFS/WebDAV service status badges.
#[derive(Template)]
#[template(path = "web/partials/protocol_badges.html")]
struct ProtocolBadgesPartial {
    smbd_running: bool,
    nfs_exported: bool,
    webdav_running: bool,
}
/// Sync-status indicator partial, swapped OOB so the UI flips as soon as
/// the global dirty count reaches zero.
#[derive(Template)]
#[template(path = "web/partials/sync_status.html")]
struct SyncStatusPartial {
    all_synced: bool,
}