use anyhow::{Result, Context}; use std::collections::VecDeque; use std::path::{Path, PathBuf}; use tokio::time::{sleep, Duration}; use serde::{Serialize, Deserialize}; use std::fs; use tokio::fs as async_fs; use crate::events::{EventBus, SystemEvent, FrameCapturedEvent, MeteorDetectedEvent, EventPackageArchivedEvent}; /// Configuration for the storage controller #[derive(Debug, Clone)] pub struct StorageConfig { pub frame_buffer_size: usize, pub base_storage_path: PathBuf, pub retention_days: u32, pub video_quality: VideoQuality, pub cleanup_interval_hours: u64, } impl Default for StorageConfig { fn default() -> Self { Self { frame_buffer_size: 200, // Store 200 frames (about 6.7 seconds at 30 FPS) base_storage_path: PathBuf::from("./meteor_events"), retention_days: 30, video_quality: VideoQuality::Medium, cleanup_interval_hours: 24, // Daily cleanup } } } /// Video encoding quality options #[derive(Debug, Clone)] pub enum VideoQuality { Low, Medium, High, } /// Stored frame information for buffering #[derive(Debug, Clone)] struct StoredFrame { frame_id: u64, timestamp: chrono::DateTime, width: u32, height: u32, frame_data: Vec, } impl From for StoredFrame { fn from(event: FrameCapturedEvent) -> Self { Self { frame_id: event.frame_id, timestamp: event.timestamp, width: event.frame_data.width, height: event.frame_data.height, frame_data: event.frame_data.as_slice().to_vec(), // Convert from Arc to Vec } } } /// Metadata structure for meteor events #[derive(Debug, Serialize, Deserialize)] pub struct EventMetadata { pub event_id: String, pub trigger_frame_id: u64, pub detection_timestamp: chrono::DateTime, pub confidence_score: f64, pub algorithm_name: String, pub video_info: VideoInfo, pub detection_info: DetectionInfo, pub system_info: SystemInfo, } #[derive(Debug, Serialize, Deserialize)] pub struct VideoInfo { pub filename: String, pub format: String, pub width: u32, pub height: u32, pub frame_count: usize, pub start_frame_id: u64, pub end_frame_id: u64, pub duration_seconds: f64, } #[derive(Debug, Serialize, Deserialize)] pub struct DetectionInfo { pub algorithm_version: String, pub detection_frames: Vec, pub confidence_threshold: f64, } #[derive(Debug, Serialize, Deserialize)] pub struct DetectionFrame { pub frame_id: u64, pub filename: String, pub confidence: f64, pub timestamp: chrono::DateTime, } #[derive(Debug, Serialize, Deserialize)] pub struct SystemInfo { pub created_at: chrono::DateTime, pub client_version: String, pub frame_buffer_size: usize, pub total_frames_captured: u64, } /// Storage controller that manages event archiving pub struct StorageController { config: StorageConfig, event_bus: EventBus, frame_buffer: VecDeque, total_frames_captured: u64, last_cleanup: chrono::DateTime, } impl StorageController { /// Create a new storage controller pub fn new(config: StorageConfig, event_bus: EventBus) -> Result { // Ensure base storage directory exists if !config.base_storage_path.exists() { fs::create_dir_all(&config.base_storage_path) .context("Failed to create base storage directory")?; } let buffer_capacity = config.frame_buffer_size; Ok(Self { config, event_bus, frame_buffer: VecDeque::with_capacity(buffer_capacity), total_frames_captured: 0, last_cleanup: chrono::Utc::now(), }) } /// Start the storage controller loop pub async fn run(&mut self) -> Result<()> { println!("๐Ÿ’พ Starting storage controller..."); println!(" Buffer size: {} frames", self.config.frame_buffer_size); println!(" Storage path: {:?}", self.config.base_storage_path); println!(" Retention: {} days", self.config.retention_days); let mut event_receiver = self.event_bus.subscribe(); let cleanup_interval = Duration::from_secs(self.config.cleanup_interval_hours * 3600); println!("โœ… Storage controller initialized, starting event loop..."); loop { tokio::select! { // Handle incoming events event_result = event_receiver.recv() => { match event_result { Ok(event) => { if let Err(e) = self.handle_event(event.as_ref()).await { eprintln!("โŒ Error handling storage event: {}", e); } } Err(e) => { eprintln!("โŒ Error receiving storage event: {}", e); tokio::time::sleep(Duration::from_secs(1)).await; } } } // Periodic cleanup check _ = sleep(cleanup_interval) => { if let Err(e) = self.run_cleanup().await { eprintln!("โŒ Error in storage cleanup: {}", e); } } } } } /// Handle incoming events from the event bus async fn handle_event(&mut self, event: &SystemEvent) -> Result<()> { match event { SystemEvent::FrameCaptured(frame_event) => { self.handle_frame_captured(frame_event.clone()).await?; } SystemEvent::MeteorDetected(meteor_event) => { self.handle_meteor_detected(meteor_event.clone()).await?; } SystemEvent::SystemStarted(_) => { println!("๐Ÿ’พ Storage controller received system started event"); } SystemEvent::EventPackageArchived(_) => { // Storage controller doesn't need to handle its own archived events } } Ok(()) } /// Handle frame captured events by adding to buffer async fn handle_frame_captured(&mut self, frame_event: FrameCapturedEvent) -> Result<()> { let stored_frame = StoredFrame::from(frame_event); // Add to circular buffer self.frame_buffer.push_back(stored_frame); // Maintain buffer size while self.frame_buffer.len() > self.config.frame_buffer_size { self.frame_buffer.pop_front(); } self.total_frames_captured += 1; if self.total_frames_captured % 100 == 0 { println!("๐Ÿ’พ Stored {} frames, buffer size: {}", self.total_frames_captured, self.frame_buffer.len() ); } Ok(()) } /// Handle meteor detected events by creating archive async fn handle_meteor_detected(&mut self, meteor_event: MeteorDetectedEvent) -> Result<()> { println!("๐ŸŒŸ Creating meteor event archive for frame #{}", meteor_event.trigger_frame_id); // Create unique event directory let event_dir = self.create_event_directory(&meteor_event).await?; println!(" Event directory: {:?}", event_dir); // Extract frames around the detection let (video_frames, detection_frames) = self.extract_event_frames(&meteor_event)?; // Save video file self.save_video_file(&event_dir, &video_frames).await?; // Save detection frame images self.save_detection_frames(&event_dir, &detection_frames).await?; // Create and save metadata self.save_metadata(&event_dir, &meteor_event, &video_frames, &detection_frames).await?; // Calculate archive size let archive_size = self.calculate_directory_size(&event_dir).await?; // Get event ID from directory name let event_id = event_dir.file_name() .and_then(|name| name.to_str()) .unwrap_or("unknown") .to_string(); // Publish EventPackageArchivedEvent let archived_event = EventPackageArchivedEvent::new( event_id, event_dir.clone(), meteor_event.trigger_frame_id, video_frames.len(), archive_size, ); if let Err(e) = self.event_bus.publish_event_package_archived(archived_event) { eprintln!("โš ๏ธ Failed to publish EventPackageArchivedEvent: {}", e); } else { println!("๐Ÿ“ฆ Published EventPackageArchivedEvent for archive: {:?}", event_dir); } println!("โœ… Meteor event archive created successfully"); Ok(()) } /// Create unique directory for meteor event async fn create_event_directory(&self, meteor_event: &MeteorDetectedEvent) -> Result { let timestamp_str = meteor_event.detection_timestamp .format("%Y%m%d_%H%M%S") .to_string(); let event_id = format!("meteor_{}_{}", timestamp_str, meteor_event.trigger_frame_id); let event_dir = self.config.base_storage_path.join(&event_id); // Create main event directory async_fs::create_dir_all(&event_dir).await .context("Failed to create event directory")?; // Create detection_data subdirectory let detection_dir = event_dir.join("detection_data"); async_fs::create_dir_all(&detection_dir).await .context("Failed to create detection_data directory")?; Ok(event_dir) } /// Extract frames for video and detection analysis fn extract_event_frames(&self, meteor_event: &MeteorDetectedEvent) -> Result<(Vec, Vec)> { if self.frame_buffer.is_empty() { return Ok((Vec::new(), Vec::new())); } let trigger_frame_id = meteor_event.trigger_frame_id; // Find the trigger frame in buffer let trigger_pos = self.frame_buffer.iter() .position(|frame| frame.frame_id == trigger_frame_id); let mut video_frames = Vec::new(); let mut detection_frames = Vec::new(); match trigger_pos { Some(pos) => { // Extract frames for video (e.g., 2 seconds before and after at 30 FPS = 60 frames each side) let video_range = 60; let start_video = pos.saturating_sub(video_range); let end_video = (pos + video_range + 1).min(self.frame_buffer.len()); for i in start_video..end_video { if let Some(frame) = self.frame_buffer.get(i) { video_frames.push(frame.clone()); } } // Extract detection frames (trigger frame and neighbors) let detection_range = 2; // 2 frames before and after let start_detection = pos.saturating_sub(detection_range); let end_detection = (pos + detection_range + 1).min(self.frame_buffer.len()); for i in start_detection..end_detection { if let Some(frame) = self.frame_buffer.get(i) { detection_frames.push(frame.clone()); } } } None => { // If trigger frame not found, use recent frames let recent_count = 120.min(self.frame_buffer.len()); // Last 4 seconds at 30 FPS let start_idx = self.frame_buffer.len().saturating_sub(recent_count); for i in start_idx..self.frame_buffer.len() { if let Some(frame) = self.frame_buffer.get(i) { video_frames.push(frame.clone()); // Use last few frames as detection frames if i >= self.frame_buffer.len().saturating_sub(5) { detection_frames.push(frame.clone()); } } } } } println!(" Extracted {} video frames, {} detection frames", video_frames.len(), detection_frames.len()); Ok((video_frames, detection_frames)) } /// Save video file from frames (simplified implementation) async fn save_video_file(&self, event_dir: &Path, frames: &[StoredFrame]) -> Result<()> { if frames.is_empty() { return Ok(()); } let video_path = event_dir.join("video.mp4"); // For now, create a simple text file with frame information // In a real implementation, this would use ffmpeg or similar for actual video encoding let mut video_info = String::new(); video_info.push_str("# Meteor Event Video File\n"); video_info.push_str(&format!("# Generated at: {}\n", chrono::Utc::now())); video_info.push_str(&format!("# Frame count: {}\n", frames.len())); video_info.push_str(&format!("# Resolution: {}x{}\n", frames[0].width, frames[0].height)); video_info.push_str("# Frame Information:\n"); for frame in frames { video_info.push_str(&format!("Frame {}: {} bytes at {}\n", frame.frame_id, frame.frame_data.len(), frame.timestamp )); } async_fs::write(&video_path, video_info).await .context("Failed to save video file")?; println!(" Saved video placeholder: {:?}", video_path); Ok(()) } /// Save detection frame images async fn save_detection_frames(&self, event_dir: &Path, frames: &[StoredFrame]) -> Result<()> { let detection_dir = event_dir.join("detection_data"); for frame in frames { let frame_filename = format!("frame_{}.jpg", frame.frame_id); let frame_path = detection_dir.join(&frame_filename); // Save the raw frame data (in a real implementation, this would be proper image encoding) async_fs::write(&frame_path, &frame.frame_data).await .with_context(|| format!("Failed to save detection frame {}", frame.frame_id))?; } println!(" Saved {} detection frames to detection_data/", frames.len()); Ok(()) } /// Create and save event metadata async fn save_metadata(&self, event_dir: &Path, meteor_event: &MeteorDetectedEvent, video_frames: &[StoredFrame], detection_frames: &[StoredFrame]) -> Result<()> { let event_id = event_dir.file_name() .and_then(|name| name.to_str()) .unwrap_or("unknown") .to_string(); let video_info = VideoInfo { filename: "video.mp4".to_string(), format: "MP4".to_string(), width: video_frames.first().map(|f| f.width).unwrap_or(0), height: video_frames.first().map(|f| f.height).unwrap_or(0), frame_count: video_frames.len(), start_frame_id: video_frames.first().map(|f| f.frame_id).unwrap_or(0), end_frame_id: video_frames.last().map(|f| f.frame_id).unwrap_or(0), duration_seconds: video_frames.len() as f64 / 30.0, // Assuming 30 FPS }; let detection_info = DetectionInfo { algorithm_version: meteor_event.algorithm_name.clone(), detection_frames: detection_frames.iter().map(|frame| DetectionFrame { frame_id: frame.frame_id, filename: format!("detection_data/frame_{}.jpg", frame.frame_id), confidence: if frame.frame_id == meteor_event.trigger_frame_id { meteor_event.confidence_score } else { 0.0 // Unknown confidence for neighboring frames }, timestamp: frame.timestamp, }).collect(), confidence_threshold: 0.05, // From detection config }; let system_info = SystemInfo { created_at: chrono::Utc::now(), client_version: env!("CARGO_PKG_VERSION").to_string(), frame_buffer_size: self.config.frame_buffer_size, total_frames_captured: self.total_frames_captured, }; let metadata = EventMetadata { event_id, trigger_frame_id: meteor_event.trigger_frame_id, detection_timestamp: meteor_event.detection_timestamp, confidence_score: meteor_event.confidence_score, algorithm_name: meteor_event.algorithm_name.clone(), video_info, detection_info, system_info, }; let metadata_json = serde_json::to_string_pretty(&metadata) .context("Failed to serialize metadata")?; let metadata_path = event_dir.join("metadata.json"); async_fs::write(&metadata_path, metadata_json).await .context("Failed to save metadata file")?; println!(" Saved metadata: {:?}", metadata_path); Ok(()) } /// Run periodic cleanup of old event directories async fn run_cleanup(&mut self) -> Result<()> { let now = chrono::Utc::now(); // Only run cleanup once per day if now.signed_duration_since(self.last_cleanup).num_hours() < 24 { return Ok(()); } println!("๐Ÿงน Running storage cleanup..."); let retention_duration = chrono::Duration::days(self.config.retention_days as i64); let cutoff_date = now - retention_duration; let mut entries = async_fs::read_dir(&self.config.base_storage_path).await .context("Failed to read storage directory")?; let mut deleted_count = 0; while let Some(entry) = entries.next_entry().await? { let path = entry.path(); if path.is_dir() { if let Ok(metadata) = entry.metadata().await { if let Ok(created) = metadata.created() { let created_dt = chrono::DateTime::::from(created); if created_dt < cutoff_date { println!(" Deleting expired event directory: {:?}", path); if let Err(e) = async_fs::remove_dir_all(&path).await { eprintln!("โŒ Failed to delete directory {:?}: {}", path, e); } else { deleted_count += 1; } } } } } } self.last_cleanup = now; if deleted_count > 0 { println!("โœ… Cleanup completed, deleted {} expired event directories", deleted_count); } else { println!("โœ… Cleanup completed, no expired directories found"); } Ok(()) } /// Calculate total size of a directory and its contents async fn calculate_directory_size(&self, dir_path: &Path) -> Result { self.calculate_directory_size_recursive(dir_path).await } /// Recursive helper for calculating directory size fn calculate_directory_size_recursive<'a>(&'a self, dir_path: &'a Path) -> std::pin::Pin> + Send + '_>> { Box::pin(async move { let mut total_size = 0u64; let mut entries = async_fs::read_dir(dir_path).await .context("Failed to read directory for size calculation")?; while let Some(entry) = entries.next_entry().await? { let path = entry.path(); let metadata = entry.metadata().await .context("Failed to get file metadata")?; if metadata.is_file() { total_size += metadata.len(); } else if metadata.is_dir() { // Recursively calculate subdirectory size total_size += self.calculate_directory_size_recursive(&path).await?; } } Ok(total_size) }) } /// Get current storage statistics pub fn get_stats(&self) -> StorageStats { StorageStats { buffer_size: self.frame_buffer.len(), buffer_capacity: self.config.frame_buffer_size, total_frames_captured: self.total_frames_captured, storage_path: self.config.base_storage_path.clone(), retention_days: self.config.retention_days, } } } /// Storage system statistics #[derive(Debug, Clone)] pub struct StorageStats { pub buffer_size: usize, pub buffer_capacity: usize, pub total_frames_captured: u64, pub storage_path: PathBuf, pub retention_days: u32, } #[cfg(test)] mod tests { use super::*; use crate::events::EventBus; #[test] fn test_storage_config_default() { let config = StorageConfig::default(); assert_eq!(config.frame_buffer_size, 200); assert_eq!(config.retention_days, 30); assert_eq!(config.cleanup_interval_hours, 24); } #[test] fn test_stored_frame_conversion() { let frame_event = FrameCapturedEvent::new(1, 640, 480, vec![1, 2, 3, 4]); let stored_frame = StoredFrame::from(frame_event.clone()); assert_eq!(stored_frame.frame_id, frame_event.frame_id); assert_eq!(stored_frame.width, frame_event.width); assert_eq!(stored_frame.height, frame_event.height); assert_eq!(stored_frame.frame_data, frame_event.frame_data); } #[tokio::test] async fn test_storage_controller_creation() { let temp_dir = std::env::temp_dir().join("test_meteor_storage"); let config = StorageConfig { base_storage_path: temp_dir.clone(), ..StorageConfig::default() }; let event_bus = EventBus::new(100); let controller = StorageController::new(config, event_bus); assert!(controller.is_ok()); // Cleanup if temp_dir.exists() { let _ = std::fs::remove_dir_all(&temp_dir); } } }