diff --git a/Cargo.toml b/Cargo.toml index 41e422f..a85cca1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,11 +5,15 @@ edition = "2021" authors = ["Meteor Detection Team"] description = "A Raspberry Pi based meteor detection system" +[features] +default = [] +gpio = ["rppal", "embedded-hal"] # Feature to enable GPIO functionality + [dependencies] # Hardware interfaces -rppal = "0.22.1" # Raspberry Pi hardware access +rppal = { version = "0.22.1", optional = true } # Raspberry Pi hardware access serialport = "4.2.0" # Serial port for GPS -embedded-hal = "0.2.7" # Hardware abstraction layer +embedded-hal = { version = "0.2.7", optional = true } # Hardware abstraction layer # Video processing opencv = { version = "0.94.2" } # OpenCV bindings @@ -47,6 +51,7 @@ thiserror = "1.0.40" # Error definitions config = "0.13.3" # Configuration management uuid = { version = "1.3.3", features = ["v4", "serde"] } # UUIDs clap = { version = "4.2.5", features = ["derive"] } # Command line argument parsing +rand = "0.8.5" # Random number generation for sensor simulation [dev-dependencies] criterion = "0.4.0" # Benchmarking diff --git a/build.sh b/build.sh index 24e6d65..fc59c50 100755 --- a/build.sh +++ b/build.sh @@ -12,6 +12,24 @@ NC='\033[0m' # No Color # Default configuration CONFIG_DIR="$HOME/.config/meteor_detect" +# Detect OS platform +PLATFORM=$(uname) +if [[ "$PLATFORM" == "Linux" ]]; then + IS_RASPBERRY_PI=false + # Check if we're on a Raspberry Pi + if [ -f /proc/device-tree/model ]; then + MODEL=$(tr -d '\0' < /proc/device-tree/model) + if [[ "$MODEL" == *"Raspberry Pi"* ]]; then + IS_RASPBERRY_PI=true + fi + fi + PLATFORM_FEATURE="--features gpio" + echo -e "${GREEN}Detected Linux platform. Enabling GPIO support.${NC}" +else + PLATFORM_FEATURE="" + echo -e "${YELLOW}Detected non-Linux platform ($PLATFORM). GPIO support will be disabled.${NC}" +fi + # Print help message function print_help { echo -e "${YELLOW}Meteor Detection System Build Script${NC}" @@ -28,24 +46,28 @@ function print_help { echo " create-config Create a default configuration file" echo " help Show this help message" echo "" + if [[ "$PLATFORM" != "Linux" ]]; then + echo -e "${YELLOW}Note: Running on non-Linux platform. GPIO support is disabled.${NC}" + echo -e "${YELLOW} Only video processing and meteor detection will be available.${NC}" + fi } # Build the application in debug mode function build_debug { echo -e "${GREEN}Building in debug mode...${NC}" - cargo build + cargo build $PLATFORM_FEATURE } # Build the application in release mode function build_release { echo -e "${GREEN}Building in release mode...${NC}" - cargo build --release + cargo build --release $PLATFORM_FEATURE } # Run the application function run_app { echo -e "${GREEN}Running application...${NC}" - cargo run + cargo run $PLATFORM_FEATURE } # Clean build artifacts @@ -57,7 +79,7 @@ function clean { # Run tests function run_tests { echo -e "${GREEN}Running tests...${NC}" - cargo test + cargo test $PLATFORM_FEATURE } # Install development dependencies @@ -65,41 +87,54 @@ function setup { echo -e "${GREEN}Installing development dependencies...${NC}" # Check if running on Raspberry Pi - if [ -f /etc/os-release ]; then - . 
/etc/os-release - if [[ "$ID" == "raspbian" ]]; then - echo -e "${YELLOW}Detected Raspberry Pi OS${NC}" + if [[ "$IS_RASPBERRY_PI" == "true" ]]; then + echo -e "${YELLOW}Detected Raspberry Pi hardware${NC}" + + # Install system dependencies + echo -e "${GREEN}Installing system dependencies...${NC}" + sudo apt update + sudo apt install -y git curl build-essential pkg-config \ + libssl-dev libv4l-dev v4l-utils \ + libopencv-dev libsqlite3-dev libglib2.0-dev \ + libudev-dev libgstrtspserver-1.0-dev \ + gcc g++ cmake clang libclang-dev llvm-dev - # Install system dependencies - echo -e "${GREEN}Installing system dependencies...${NC}" - sudo apt update - sudo apt install -y git curl build-essential pkg-config \ - libssl-dev libv4l-dev v4l-utils \ - libopencv-dev libsqlite3-dev libglib2.0-dev \ - libudev-dev libgstrtspserver-1.0-dev \ - gcc g++ cmake clang libclang-dev llvm-dev \ - - sudo apt-get install libgstreamer1.0-dev \ - libgstreamer-plugins-base1.0-dev libgstreamer-plugins-bad1.0-dev \ - gstreamer1.0-plugins-base gstreamer1.0-plugins-good \ - gstreamer1.0-plugins-bad gstreamer1.0-plugins-ugly \ - gstreamer1.0-libav gstreamer1.0-tools \ - gstreamer1.0-x gstreamer1.0-alsa gstreamer1.0-gl \ - gstreamer1.0-gtk3 gstreamer1.0-qt5 gstreamer1.0-pulseaudio - # Install Rust if not already installed - if ! command -v rustc &> /dev/null; then - echo -e "${GREEN}Installing Rust...${NC}" - curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y - source $HOME/.cargo/env - fi - - # Create configuration directory - mkdir -p "$CONFIG_DIR" + sudo apt-get install -y libgstreamer1.0-dev \ + libgstreamer-plugins-base1.0-dev libgstreamer-plugins-bad1.0-dev \ + gstreamer1.0-plugins-base gstreamer1.0-plugins-good \ + gstreamer1.0-plugins-bad gstreamer1.0-plugins-ugly \ + gstreamer1.0-libav gstreamer1.0-tools \ + gstreamer1.0-x gstreamer1.0-alsa gstreamer1.0-gl \ + gstreamer1.0-gtk3 gstreamer1.0-qt5 gstreamer1.0-pulseaudio + elif [[ "$PLATFORM" == "Darwin" ]]; then + echo -e "${YELLOW}Detected macOS platform${NC}" + + # Check for Homebrew and install dependencies + if command -v brew &> /dev/null; then + echo -e "${GREEN}Installing macOS dependencies via Homebrew...${NC}" + brew install opencv gstreamer sqlite3 pkg-config + brew install gst-plugins-base gst-plugins-good gst-plugins-bad gst-plugins-ugly else - echo -e "${YELLOW}Not running on Raspberry Pi OS, skipping system dependencies${NC}" + echo -e "${RED}Homebrew not found. Please install it from https://brew.sh/${NC}" + echo -e "${YELLOW}Then install the required dependencies:${NC}" + echo "brew install opencv gstreamer sqlite3 pkg-config" + echo "brew install gst-plugins-base gst-plugins-good gst-plugins-bad gst-plugins-ugly" fi + else + echo -e "${YELLOW}Platform-specific setup not implemented for $PLATFORM${NC}" + echo -e "${YELLOW}You may need to manually install dependencies like OpenCV and GStreamer${NC}" fi + # Install Rust if not already installed + if ! command -v rustc &> /dev/null; then + echo -e "${GREEN}Installing Rust...${NC}" + curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y + source "$HOME/.cargo/env" + fi + + # Create configuration directory + mkdir -p "$CONFIG_DIR" + # Check for Rust installation if ! command -v rustc &> /dev/null; then echo -e "${RED}Rust is not installed. Please install Rust from https://rustup.rs/${NC}" @@ -165,4 +200,4 @@ case "$1" in ;; esac -exit 0 +exit 0 \ No newline at end of file
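Note on the new `gpio` feature: the Cargo.toml change only makes `rppal` and `embedded-hal` optional; any code that actually touches the Pi's GPIO still needs a `#[cfg(feature = "gpio")]` guard so the non-Linux builds produced by this script compile. A minimal sketch of that pattern (the `StatusLed` helper and pin number are hypothetical, not part of this patch):

```rust
// Compiled only when built with `cargo build --features gpio`.
#[cfg(feature = "gpio")]
mod status_led {
    use anyhow::Result;
    use rppal::gpio::{Gpio, OutputPin};

    pub struct StatusLed {
        pin: OutputPin,
    }

    impl StatusLed {
        /// Claim a GPIO pin (BCM numbering) for the status LED.
        pub fn new(bcm_pin: u8) -> Result<Self> {
            let pin = Gpio::new()?.get(bcm_pin)?.into_output();
            Ok(Self { pin })
        }

        pub fn set(&mut self, on: bool) {
            if on {
                self.pin.set_high();
            } else {
                self.pin.set_low();
            }
        }
    }
}

// No-op stand-in so the rest of the code base stays portable
// when the `gpio` feature is disabled.
#[cfg(not(feature = "gpio"))]
mod status_led {
    use anyhow::Result;

    pub struct StatusLed;

    impl StatusLed {
        pub fn new(_bcm_pin: u8) -> Result<Self> {
            Ok(StatusLed)
        }

        pub fn set(&mut self, _on: bool) {}
    }
}
```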
diff --git a/config-example.toml b/config-example.toml index fb199cc..0a356e2 100644 --- a/config-example.toml +++ b/config-example.toml @@ -8,7 +8,9 @@ log_level = "info" # Camera settings [camera] -# Camera device path +# Camera device: +# - Linux: device path (e.g., "/dev/video0") +# - macOS: device index (e.g., "0") or identifier device = "/dev/video0" # Resolution (options: HD1080p, HD720p, VGA) resolution = "HD720p" diff --git a/src/camera/controller.rs b/src/camera/controller.rs index e1f0463..02b5f1f 100644 --- a/src/camera/controller.rs +++ b/src/camera/controller.rs @@ -9,7 +9,7 @@ use tokio::time; use crate::camera::frame_buffer::{Frame, FrameBuffer, SharedFrameBuffer}; use crate::camera::opencv::{OpenCVCamera, OpenCVCaptureStream}; -use crate::camera::{CameraSettings, MeteorEvent, Resolution, ExposureMode}; +use crate::camera::{CameraSettings, ExposureMode, MeteorEvent, Resolution}; /// Camera controller manages camera operations and frame capture pub struct CameraController { @@ -35,20 +35,19 @@ impl CameraController { /// Create a new camera controller with the given configuration pub async fn new(config: &crate::Config) -> Result<Self> { // Extract camera settings from config - let settings = CameraSettings::default(); - + let settings = config.camera.clone(); + // Create frame buffer with capacity for 10 minutes of video at settings.fps let buffer_capacity = (10 * 60 * settings.fps) as usize; let frame_buffer = Arc::new(FrameBuffer::new(buffer_capacity)); - + // Create broadcast channel for frames let (frame_tx, _) = broadcast::channel(30); - + // Create events directory if it doesn't exist let events_dir = PathBuf::from("events"); - std::fs::create_dir_all(&events_dir) - .context("Failed to create events directory")?; - + std::fs::create_dir_all(&events_dir).context("Failed to create events directory")?; + Ok(Self { settings, camera: None, @@ -60,86 +59,96 @@ impl CameraController { events_dir, }) } - + /// Initialize the camera with current settings pub async fn initialize(&mut self) -> Result<()> { // Open the camera - let mut camera = OpenCVCamera::open(&self.settings.device) - .context("Failed to open camera")?; - + let mut camera = + OpenCVCamera::open(&self.settings.device).context("Failed to open camera")?; + // Configure camera parameters - camera.set_format(self.settings.resolution) + camera + .set_format(self.settings.resolution) .context("Failed to set camera format")?; - - camera.set_fps(self.settings.fps) + + camera + .set_fps(self.settings.fps) .context("Failed to set camera FPS")?; - - camera.set_exposure(self.settings.exposure) + + camera + .set_exposure(self.settings.exposure) .context("Failed to set camera exposure")?; - - camera.set_gain(self.settings.gain) + + camera + .set_gain(self.settings.gain) .context("Failed to set camera gain")?; - + if self.settings.focus_locked { - camera.lock_focus_at_infinity() + camera + .lock_focus_at_infinity() .context("Failed to lock focus at infinity")?; } - + self.camera = Some(camera); info!("Camera initialized successfully"); - + Ok(()) } - + /// Start camera capture in a background task pub async fn start_capture(&mut self) -> Result<()> { if self.is_running { warn!("Camera capture is already running"); return Ok(()); } - - let camera = self.camera.as_mut() + + let camera = self + .camera + .as_mut() .ok_or_else(|| anyhow::anyhow!("Camera not initialized"))?; - + //
Start the camera streaming - let stream = camera.start_streaming() + let stream = camera + .start_streaming() .context("Failed to start camera streaming")?; - + self.stream = Some(stream); self.is_running = true; - + // Clone necessary values for the capture task let frame_buffer = self.frame_buffer.clone(); let frame_tx = self.frame_tx.clone(); let fps = self.settings.fps; - let mut stream = self.stream.take() + let mut stream = self + .stream + .take() .expect("Stream just initialized but is None"); let mut frame_count = self.frame_count; - + // Start capture task tokio::spawn(async move { let frame_interval = Duration::from_secs_f64(1.0 / fps as f64); let mut interval = time::interval(frame_interval); - + info!("Starting camera capture at {} fps", fps); - + loop { interval.tick().await; - + match stream.capture_frame() { Ok(mat) => { // Create a new frame with timestamp let frame = Frame::new(mat, Utc::now(), frame_count); frame_count += 1; - + // Add to frame buffer if let Err(e) = frame_buffer.push(frame.clone()) { error!("Failed to add frame to buffer: {}", e); } - + // Broadcast frame to listeners let _ = frame_tx.send(frame); - }, + } Err(e) => { error!("Failed to capture frame: {}", e); // Small delay to avoid tight error loop @@ -148,46 +157,46 @@ impl CameraController { } } }); - + info!("Camera capture started"); Ok(()) } - + /// Stop camera capture pub async fn stop_capture(&mut self) -> Result<()> { if !self.is_running { warn!("Camera capture is not running"); return Ok(()); } - + // The stream will be stopped when dropped self.stream = None; - + if let Some(camera) = &mut self.camera { camera.stop_streaming()?; } - + self.is_running = false; info!("Camera capture stopped"); - + Ok(()) } - + /// Get a subscriber to receive new frames pub fn subscribe_to_frames(&self) -> broadcast::Receiver<Frame> { self.frame_tx.subscribe() } - + /// Get a clone of the frame buffer pub fn get_frame_buffer(&self) -> SharedFrameBuffer { self.frame_buffer.clone() } - + /// Check if the camera is currently running pub fn is_running(&self) -> bool { self.is_running } - + /// Update camera settings pub async fn update_settings(&mut self, new_settings: CameraSettings) -> Result<()> { // If camera is running, we need to stop it first @@ -195,10 +204,10 @@ if was_running { self.stop_capture().await?; } - + // Update settings self.settings = new_settings; - + // Re-initialize camera with new settings if let Some(mut camera) = self.camera.take() { // Configure camera parameters @@ -206,25 +215,25 @@ camera.set_fps(self.settings.fps)?; camera.set_exposure(self.settings.exposure)?; camera.set_gain(self.settings.gain)?; - + if self.settings.focus_locked { camera.lock_focus_at_infinity()?; } - + self.camera = Some(camera); } else { self.initialize().await?; } - + // Restart if it was running if was_running { self.start_capture().await?; } - + info!("Camera settings updated"); Ok(()) } - + /// Save a meteor event with video pub async fn save_meteor_event( &self, @@ -235,36 +244,33 @@ seconds_after: i64, ) -> Result<MeteorEvent> { // Extract frames from the buffer - let frames = self.frame_buffer.extract_event_frames( - timestamp, - seconds_before, - seconds_after, - ); - + let frames = + self.frame_buffer + .extract_event_frames(timestamp, seconds_before, seconds_after); + if frames.is_empty() { return Err(anyhow::anyhow!("No frames found for event")); } - + // Create a unique ID for the event let event_id = uuid::Uuid::new_v4(); - + // Create a directory 
for the event let event_dir = self.events_dir.join(event_id.to_string()); - std::fs::create_dir_all(&event_dir) - .context("Failed to create event directory")?; - + std::fs::create_dir_all(&event_dir).context("Failed to create event directory")?; + // Save frames to the event directory for (i, frame) in frames.iter().enumerate() { let frame_path = event_dir.join(format!("frame_{:04}.jpg", i)); frame.save_to_file(&frame_path)?; } - + // Create a video file name let video_path = event_dir.join("event.mp4").to_string_lossy().to_string(); - + // TODO: Call FFmpeg to convert frames to video // This would be done by spawning an external process - + // Create and return the event let event = MeteorEvent { id: event_id, @@ -273,8 +279,65 @@ bounding_box, video_path, }; - + info!("Saved meteor event: {}", event_id); Ok(event) } + + /// Get the current status of the camera + pub async fn get_status(&self) -> Result<serde_json::Value> { + let frame_buffer_stats = { + let buffer = self.frame_buffer.clone(); + let length = buffer.len(); + let capacity = buffer.capacity(); + + serde_json::json!({ + "length": length, + "capacity": capacity, + "utilization_percent": if capacity > 0 { (length as f64 / capacity as f64) * 100.0 } else { 0.0 } + }) + }; + + let recent_frame = { + // Get the most recent frame (index 0) + if let Some(frame) = self.frame_buffer.get(0) { + let timestamp = frame.timestamp.to_rfc3339(); + + serde_json::json!({ + "timestamp": timestamp, + "frame_number": frame.index + }) + } else { + serde_json::json!(null) + } + }; + + let camera_info = { + serde_json::json!({ + "device": self.settings.device, + "resolution": format!("{:?}", self.settings.resolution), + "fps": self.settings.fps, + "exposure_mode": format!("{:?}", self.settings.exposure), + "gain": self.settings.gain, + "focus_locked": self.settings.focus_locked + }) + }; + + let status = serde_json::json!({ + "running": self.is_running, + "frame_count": self.frame_count, + "settings": { + "device": self.settings.device, + "resolution": format!("{:?}", self.settings.resolution), + "fps": self.settings.fps + }, + "camera_info": camera_info, + "frame_buffer": frame_buffer_stats, + "recent_frame": recent_frame, + "events_dir": self.events_dir.to_string_lossy() + }); + + debug!("Camera status: {}", status); + Ok(status) + } }
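The `TODO` in `save_meteor_event` above leaves the frames-to-video step unimplemented. A minimal sketch of the external-process approach the comment describes, assuming an `ffmpeg` binary on `PATH` and the `frame_%04d.jpg` naming used above (hypothetical helper, not part of this patch):

```rust
use anyhow::{Context, Result};
use std::path::Path;
use tokio::process::Command;

/// Encode the saved frame_%04d.jpg sequence in `event_dir` into event.mp4
/// by shelling out to ffmpeg, as the TODO suggests.
async fn encode_event_video(event_dir: &Path, fps: u32) -> Result<()> {
    let status = Command::new("ffmpeg")
        .arg("-y") // overwrite any existing event.mp4
        .args(["-framerate", &fps.to_string()])
        .arg("-i")
        .arg(event_dir.join("frame_%04d.jpg"))
        .args(["-c:v", "libx264", "-pix_fmt", "yuv420p"])
        .arg(event_dir.join("event.mp4"))
        .status()
        .await
        .context("Failed to run ffmpeg")?;

    if !status.success() {
        anyhow::bail!("ffmpeg exited with status {}", status);
    }
    Ok(())
}
```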
captured meteor event with video and metadata -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct MeteorEvent { /// Unique identifier for the event pub id: uuid::Uuid, diff --git a/src/camera/opencv.rs b/src/camera/opencv.rs index 2de790d..e30bc50 100644 --- a/src/camera/opencv.rs +++ b/src/camera/opencv.rs @@ -26,22 +26,7 @@ impl OpenCVCamera { let path_str = path.as_ref().to_str() .ok_or_else(|| anyhow!("Invalid path"))?; - let is_device_path = path_str.starts_with("/dev/"); - let mut capture = if is_device_path { - // For device files like /dev/video0, we need to extract the number - if let Some(num_str) = path_str.strip_prefix("/dev/video") { - if let Ok(device_index) = num_str.parse::<i32>() { - videoio::VideoCapture::new(device_index, videoio::CAP_ANY)? - } else { - return Err(anyhow!("Invalid device number in path: {}", path_str)); - } - } else { - return Err(anyhow!("Unsupported device path format: {}", path_str)); - } - } else { - // For other paths, try to open directly (e.g., video files, URLs) - videoio::VideoCapture::from_file(path_str, videoio::CAP_ANY)? - }; + let mut capture = Self::create_capture_from_path(path_str)?; if !capture.is_opened()? { return Err(anyhow!("Failed to open camera: {}", path_str)); @@ -65,6 +50,51 @@ }) } + /// Create a VideoCapture instance from a path or device index + fn create_capture_from_path(path_str: &str) -> Result<videoio::VideoCapture> { + // Try to parse as integer index first + if let Ok(device_index) = path_str.parse::<i32>() { + return Ok(videoio::VideoCapture::new(device_index, videoio::CAP_ANY)?); + } + + // Handle platform-specific device paths + #[cfg(target_os = "linux")] + { + // For Linux device files like /dev/video0 + if let Some(num_str) = path_str.strip_prefix("/dev/video") { + if let Ok(device_index) = num_str.parse::<i32>() { + return Ok(videoio::VideoCapture::new(device_index, videoio::CAP_ANY)?); + } else { + return Err(anyhow!("Invalid device number in path: {}", path_str)); + } + } + } + + #[cfg(target_os = "macos")] + { + // macOS doesn't use /dev/video* paths, but it might have a special format + // If we get a path with "camera" or "facetime" or other macOS camera identifiers + if path_str.contains("camera") || path_str.contains("facetime") || path_str.contains("avfoundation") { + // For macOS, try to extract any numbers in the path + let nums: Vec<&str> = path_str.split(|c: char| !c.is_digit(10)) + .filter(|s| !s.is_empty()) + .collect(); + + if let Some(num_str) = nums.first() { + if let Ok(device_index) = num_str.parse::<i32>() { + return Ok(videoio::VideoCapture::new(device_index, videoio::CAP_AVFOUNDATION)?); + } + } + + // If we can't extract a number, try device 0 with AVFoundation + return Ok(videoio::VideoCapture::new(0, videoio::CAP_AVFOUNDATION)?); + } + } + + // For URLs, video files, or any other path type + Ok(videoio::VideoCapture::from_file(path_str, videoio::CAP_ANY)?) + } + /// Set the camera resolution and pixel format pub fn set_format(&mut self, resolution: Resolution) -> Result<()> { let (width, height) = resolution.dimensions(); @@ -165,26 +195,11 @@ // Create a separate VideoCapture for the stream to avoid concurrent access issues let device = self.device.clone(); - let is_device_path = device.starts_with("/dev/"); + let mut stream_capture = Self::create_capture_from_path(&device)?; - let stream_capture = if is_device_path { - if let Some(num_str) = device.strip_prefix("/dev/video") { - if let Ok(device_index) = num_str.parse::<i32>() { - // Open with same settings - let mut cap = videoio::VideoCapture::new(device_index, videoio::CAP_ANY)?; - cap.set(videoio::CAP_PROP_FRAME_WIDTH, self.width as f64)?; - cap.set(videoio::CAP_PROP_FRAME_HEIGHT, self.height as f64)?; - cap - } else { - return Err(anyhow!("Invalid device number in path: {}", device)); - } - } else { - return Err(anyhow!("Unsupported device path format: {}", device)); - } - } else { - // For other paths, try to open directly - videoio::VideoCapture::from_file(&device, videoio::CAP_ANY)? - }; + // Apply the same settings + stream_capture.set(videoio::CAP_PROP_FRAME_WIDTH, self.width as f64)?; + stream_capture.set(videoio::CAP_PROP_FRAME_HEIGHT, self.height as f64)?; if !stream_capture.is_opened()? { return Err(anyhow!("Failed to open camera stream"));
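With `create_capture_from_path` in place, `OpenCVCamera::open` accepts a bare index, a V4L2 device path, or a URL/file path, so call sites stay platform-agnostic. A short usage sketch (the device strings are illustrative):

```rust
use anyhow::Result;
use crate::camera::opencv::OpenCVCamera;

/// All three forms route through create_capture_from_path.
fn open_examples() -> Result<()> {
    let _by_index = OpenCVCamera::open("0")?;          // bare index; AVFoundation device 0 on macOS
    let _by_path = OpenCVCamera::open("/dev/video0")?; // Linux V4L2 device path
    let _by_url = OpenCVCamera::open("rtsp://camera.local/stream")?; // falls through to from_file
    Ok(())
}
```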
diff --git a/src/communication/mod.rs b/src/communication/mod.rs index b3f3976..cbe5f74 100644 --- a/src/communication/mod.rs +++ b/src/communication/mod.rs @@ -1,7 +1,8 @@ use anyhow::Result; -use log::{info, warn}; -use std::sync::{Arc, Mutex}; -use tokio::sync::mpsc; +use log::{info, warn, error}; +use tokio::sync::{Mutex, mpsc}; +use std::sync::Arc; +use futures::StreamExt; use crate::camera::MeteorEvent; use crate::config::Config; @@ -20,6 +21,8 @@ pub struct CommunicationManager { event_rx: Option<mpsc::Receiver<MeteorEvent>>, /// Whether the manager is running is_running: Arc<Mutex<bool>>, + /// Optional shutdown signal + shutdown_signal: Option<mpsc::Sender<()>>, } impl CommunicationManager { @@ -37,13 +40,14 @@ gps_controller, event_rx: None, is_running: Arc::new(Mutex::new(false)), + shutdown_signal: None, }) } /// Start the communication services - pub async fn run(&self) -> Result<()> { + pub async fn run(&mut self) -> Result<()> { { - let mut is_running = self.is_running.lock().unwrap(); + let mut is_running = self.is_running.lock().await; if *is_running { warn!("Communication manager is already running"); return Ok(()); @@ -53,11 +57,56 @@ info!("Starting communication manager"); - // In a real implementation, this would: - // 1. Connect to MQTT broker - // 2. Start HTTP API server - // 3. Subscribe to event channels - // 4. Process and forward events + // Create a shutdown channel + let (tx, mut rx) = mpsc::channel::<()>(1); + self.shutdown_signal = Some(tx); + + // Clone necessary references for our tasks + let is_running = Arc::clone(&self.is_running); + + // Check if we have an event receiver + if let Some(mut event_rx) = self.event_rx.take() { + // Spawn a task to process events + let event_task = { + let config = self.config.clone(); + tokio::spawn(async move { + info!("Started event processing task"); + + loop { + tokio::select! 
{ + // Process incoming events + Some(event) = event_rx.recv() => { + info!("Received meteor event: {}", event.id); + // Process and forward the event (implementation depends on requirements) + if config.mqtt.enabled { + // Send to MQTT broker + if let Err(e) = Self::send_to_mqtt(&event).await { + error!("Failed to send event to MQTT: {}", e); + } + } + } + // Check for shutdown signal + _ = rx.recv() => { + info!("Shutting down event processing task"); + break; + } + } + } + + info!("Event processing task stopped"); + }) + }; + + // If HTTP API is enabled, start it + if self.config.http_api.enabled { + self.start_http_api().await?; + } + + // If MQTT is enabled, connect to broker + if self.config.mqtt.enabled { + self.connect_mqtt_broker().await?; + } + } Ok(()) } @@ -65,7 +114,7 @@ impl CommunicationManager { /// Stop the communication services pub async fn stop(&self) -> Result<()> { { - let mut is_running = self.is_running.lock().unwrap(); + let mut is_running = self.is_running.lock().await; if !*is_running { warn!("Communication manager is not running"); return Ok(()); @@ -74,6 +123,23 @@ impl CommunicationManager { } info!("Stopping communication manager"); + + // Send shutdown signal + if let Some(tx) = &self.shutdown_signal { + if let Err(e) = tx.send(()).await { + error!("Failed to send shutdown signal: {}", e); + } + } + + // Cleanup connections + if self.config.mqtt.enabled { + self.disconnect_mqtt_broker().await?; + } + + if self.config.http_api.enabled { + self.stop_http_api().await?; + } + Ok(()) } @@ -84,17 +150,120 @@ impl CommunicationManager { /// Send a status update pub async fn send_status_update(&self) -> Result<()> { - // This is a placeholder implementation - // In a real system, this would send system status via MQTT + // Get current camera status + let camera_status = { + let camera = self.camera_controller.lock().await; + camera.get_status().await? + }; + + // Get current GPS status + let gps_status = { + let gps = self.gps_controller.lock().await; + gps.get_current_position().await? + }; + + // Combine status data + let status = serde_json::json!({ + "device_id": self.config.device.id, + "timestamp": chrono::Utc::now().timestamp(), + "camera": camera_status, + "gps": gps_status, + "battery": 100, // Placeholder - would be from actual battery sensor + "disk_space": 1024, // Placeholder - would be from actual disk monitoring + }); + + // Send to MQTT if enabled + if self.config.mqtt.enabled { + Self::publish_mqtt(&self.config.mqtt.status_topic, &status.to_string()).await?; + } + info!("Sent status update"); Ok(()) } /// Send an event notification pub async fn send_event_notification(&self, event: &MeteorEvent) -> Result<()> { - // This is a placeholder implementation - // In a real system, this would send event data via MQTT + + // Get GPS location first + let location = { + let gps = self.gps_controller.lock().await; + gps.get_current_position().await? 
+ }; + + // Add metadata to event + let enriched_event = serde_json::json!({ + "device_id": self.config.device.id, + "timestamp": chrono::Utc::now().timestamp(), + "event": event, + "location": location + }); + + // Send to MQTT if enabled + if self.config.mqtt.enabled { + Self::publish_mqtt(&self.config.mqtt.events_topic, &enriched_event.to_string()).await?; + } + info!("Sent notification for event {}", event.id); Ok(()) } + + // Helper methods for MQTT + + /// Connect to MQTT broker + async fn connect_mqtt_broker(&self) -> Result<()> { + info!("Connecting to MQTT broker at {}", self.config.mqtt.broker_url); + // In a real implementation, this would: + // - Connect to the broker + // - Set up subscriptions + // - Configure QoS and other parameters + Ok(()) + } + + /// Disconnect from MQTT broker + async fn disconnect_mqtt_broker(&self) -> Result<()> { + info!("Disconnecting from MQTT broker"); + // In a real implementation, this would: + // - Close the connection + // - Clean up resources + Ok(()) + } + + /// Send event to MQTT + async fn send_to_mqtt(event: &MeteorEvent) -> Result<()> { + // In a real implementation, this would: + // - Serialize the event + // - Publish to appropriate topic + info!("Published event {} to MQTT", event.id); + Ok(()) + } + + /// Publish message to MQTT topic + async fn publish_mqtt(topic: &str, message: &str) -> Result<()> { + // In a real implementation, this would: + // - Connect to the broker (or use existing connection) + // - Publish the message + info!("Published to topic: {}", topic); + Ok(()) + } + + // HTTP API methods + + /// Start HTTP API server + async fn start_http_api(&self) -> Result<()> { + info!("Starting HTTP API on port {}", self.config.http_api.port); + // In a real implementation, this would: + // - Set up routes + // - Start server + // - Configure middleware + Ok(()) + } + + /// Stop HTTP API server + async fn stop_http_api(&self) -> Result<()> { + info!("Stopping HTTP API server"); + // In a real implementation, this would: + // - Gracefully shutdown server + // - Close connections + Ok(()) + } }
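The MQTT helpers above are deliberate stubs. For reference, a minimal sketch of what `publish_mqtt` could look like on top of the `rumqttc` crate (not in Cargo.toml yet; the crate choice, options, and one-shot connection here are assumptions, not part of this patch):

```rust
use anyhow::Result;
use rumqttc::{AsyncClient, MqttOptions, QoS};
use std::time::Duration;

/// One-shot publish sketch. A real implementation would keep the client and
/// event loop alive for the lifetime of CommunicationManager instead of
/// reconnecting for every message.
async fn publish_mqtt(broker_host: &str, topic: &str, message: &str) -> Result<()> {
    let mut options = MqttOptions::new("meteor-detector", broker_host, 1883);
    options.set_keep_alive(Duration::from_secs(60));

    let (client, mut event_loop) = AsyncClient::new(options, 10);
    client
        .publish(topic, QoS::AtLeastOnce, false, message.as_bytes().to_vec())
        .await?;

    // Drive the event loop a few polls so the publish is actually flushed.
    for _ in 0..5 {
        event_loop.poll().await?;
    }
    Ok(())
}
```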
"meteor/status".to_string(), + qos: 1, + retain: false, + keep_alive_secs: 60, + use_tls: false, + ca_cert_path: None, + client_cert_path: None, + client_key_path: None, + } + } +} + +/// Configuration for HTTP API +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HttpApiConfig { + /// Whether HTTP API is enabled + pub enabled: bool, + /// HTTP API binding address + pub bind_address: String, + /// HTTP API port + pub port: u16, + /// Whether to enable SSL/TLS for HTTP API + pub use_tls: bool, + /// Path to SSL/TLS certificate (if TLS enabled) + pub cert_path: Option, + /// Path to SSL/TLS key (if TLS enabled) + pub key_path: Option, + /// Authentication token for API access (if None, no authentication) + pub auth_token: Option, + /// CORS allowed origins (empty means all origins) + pub cors_origins: Vec, + /// Rate limiting: requests per minute + pub rate_limit: Option, +} + +impl Default for HttpApiConfig { + fn default() -> Self { + Self { + enabled: false, + bind_address: "0.0.0.0".to_string(), + port: 8080, + use_tls: false, + cert_path: None, + key_path: None, + auth_token: None, + cors_origins: vec![], + rate_limit: Some(60), + } + } +} + +/// Configuration for communication (legacy structure - for backward compatibility) #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CommunicationConfig { /// MQTT broker URL @@ -113,39 +206,97 @@ impl Default for CommunicationConfig { } } +/// Device identification configuration +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DeviceConfig { + /// Unique ID for this meteor detector + pub id: String, + /// Human-readable name for this device + pub name: String, + /// Device location description + pub location: Option, + /// Additional device metadata + pub metadata: std::collections::HashMap, +} + +impl Default for DeviceConfig { + fn default() -> Self { + Self { + id: uuid::Uuid::new_v4().to_string(), + name: "Meteor Detector".to_string(), + location: None, + metadata: std::collections::HashMap::new(), + } + } +} + /// Main application configuration #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Config { - /// Unique ID for this meteor detector - pub device_id: String, + /// Device identification + #[serde(default)] + pub device: DeviceConfig, + + /// Unique ID for this meteor detector (legacy - for backward compatibility) + #[serde(skip_serializing_if = "Option::is_none")] + pub device_id: Option, + /// Camera configuration pub camera: CameraSettings, + /// GPS configuration pub gps: GpsConfig, + /// Sensor configuration pub sensors: SensorConfig, + /// Storage configuration pub storage: StorageConfig, + /// Detection configuration pub detection: DetectionConfig, - /// Communication configuration - pub communication: CommunicationConfig, + + /// MQTT configuration + #[serde(default)] + pub mqtt: MqttConfig, + + /// HTTP API configuration + #[serde(default)] + pub http_api: HttpApiConfig, + + /// Communication configuration (legacy - for backward compatibility) + #[serde(skip_serializing_if = "Option::is_none")] + pub communication: Option, + /// Watermark configuration #[serde(default)] pub watermark: WatermarkOptions, + /// RTSP streaming configuration #[serde(default)] pub rtsp: RtspConfig, + /// Logging level pub log_level: String, } impl Default for Config { fn default() -> Self { + // Default device based on platform + #[cfg(target_os = "linux")] + let default_device = "/dev/video0".to_string(); + + #[cfg(target_os = "macos")] + let default_device = "0".to_string(); // Use device index 0 for macOS 
+ + #[cfg(not(any(target_os = "linux", target_os = "macos")))] + let default_device = "0".to_string(); // Fallback to index 0 for other platforms + Self { - device_id: uuid::Uuid::new_v4().to_string(), + device: DeviceConfig::default(), + device_id: None, camera: CameraSettings { - device: "/dev/video0".to_string(), + device: default_device, resolution: Resolution::HD720p, fps: 30, exposure: ExposureMode::Auto, @@ -156,7 +307,9 @@ impl Default for Config { sensors: SensorConfig::default(), storage: StorageConfig::default(), detection: DetectionConfig::default(), - communication: CommunicationConfig::default(), + mqtt: MqttConfig::default(), + http_api: HttpApiConfig::default(), + communication: None, watermark: WatermarkOptions::default(), rtsp: RtspConfig::default(), log_level: "info".to_string(), @@ -170,9 +323,34 @@ impl Config { let config_str = fs::read_to_string(&path) .context(format!("Failed to read config file {:?}", path.as_ref()))?; - let config: Config = toml::from_str(&config_str) + let mut config: Config = toml::from_str(&config_str) .context("Failed to parse config file")?; + // Migrate legacy device_id if needed + if config.device_id.is_some() && config.device.id == DeviceConfig::default().id { + config.device.id = config.device_id.clone().unwrap(); + } + + // Migrate legacy communication config if needed + if let Some(comm) = &config.communication { + // Only migrate if MQTT and HTTP API are using defaults + if !config.mqtt.enabled && !config.http_api.enabled { + config.mqtt.enabled = true; + config.mqtt.broker_url = comm.mqtt_broker.clone(); + config.mqtt.client_id = comm.mqtt_client_id.clone(); + config.mqtt.username = comm.mqtt_username.clone(); + config.mqtt.password = comm.mqtt_password.clone(); + config.mqtt.events_topic = comm.event_topic.clone(); + config.mqtt.status_topic = comm.status_topic.clone(); + + config.http_api.enabled = true; + config.http_api.port = comm.api_port; + config.http_api.use_tls = comm.api_use_ssl; + config.http_api.cert_path = comm.api_cert_path.clone(); + config.http_api.key_path = comm.api_key_path.clone(); + } + } + info!("Loaded configuration from {:?}", path.as_ref()); Ok(config) } diff --git a/src/detection/brightness_detector.rs b/src/detection/brightness_detector.rs index b56b7bc..3a28200 100644 --- a/src/detection/brightness_detector.rs +++ b/src/detection/brightness_detector.rs @@ -1,6 +1,7 @@ use anyhow::{Context, Result}; use log::{debug, error, info, warn}; use opencv::{core, imgproc, prelude::*}; +use opencv::core::AlgorithmHint::ALGO_HINT_APPROX; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -87,7 +88,7 @@ impl BrightnessDetector { fn update_background(&mut self, frame: &core::Mat) -> Result<()> { // Convert frame to grayscale let mut gray = core::Mat::default(); - imgproc::cvt_color(frame, &mut gray, imgproc::COLOR_BGR2GRAY, 0)?; + imgproc::cvt_color(frame, &mut gray, imgproc::COLOR_BGR2GRAY, 0,ALGO_HINT_APPROX)?; match &mut self.background { Some(bg) => { // Gradually update background model (running average) @@ -111,7 +112,7 @@ impl BrightnessDetector { fn compute_difference(&mut self, frame: &core::Mat) -> Result { // Convert frame to grayscale let mut gray = core::Mat::default(); - imgproc::cvt_color(frame, &mut gray, imgproc::COLOR_BGR2GRAY, 0)?; + imgproc::cvt_color(frame, &mut gray, imgproc::COLOR_BGR2GRAY, 0,ALGO_HINT_APPROX)?; // Calculate absolute difference from background let mut diff = core::Mat::default(); @@ -160,7 +161,7 @@ impl BrightnessDetector { fn calculate_brightness(&self, frame: &core::Mat) 
-> Result<f64> { // Convert to grayscale let mut gray = core::Mat::default(); - imgproc::cvt_color(frame, &mut gray, imgproc::COLOR_BGR2GRAY, 0)?; + imgproc::cvt_color(frame, &mut gray, imgproc::COLOR_BGR2GRAY, 0, ALGO_HINT_APPROX)?; // Calculate mean brightness let mean = core::mean(&gray, &core::no_array())?; diff --git a/src/detection/cams_detector.rs b/src/detection/cams_detector.rs index 6d7f2c2..d2d0995 100644 --- a/src/detection/cams_detector.rs +++ b/src/detection/cams_detector.rs @@ -31,10 +31,14 @@ pub struct CamsDetectorParams { /// Prefix for saved files pub file_prefix: String, /// Unique ID for this detector instance - #[serde(default = "Uuid::new_v4")] + #[serde(default = "default_uuid_string")] pub id: String, } +fn default_uuid_string() -> String { + Uuid::new_v4().to_string() +} + impl Default for CamsDetectorParams { fn default() -> Self { Self { @@ -133,7 +137,7 @@ impl CamsDetector { // This could be optimized further for performance // Apply threshold to maxpixel image to find bright areas - let mut thresholded = core::Mat::default()?; + let mut thresholded = core::Mat::default(); opencv::imgproc::threshold( &features.maxpixel, &mut thresholded, @@ -144,12 +148,10 @@ // Find contours in the thresholded image let mut contours = core::Vector::<core::Vector<core::Point>>::new(); - let mut hierarchy = core::Vector::<core::Vec4i>::new(); - + imgproc::find_contours( &thresholded, &mut contours, - &mut hierarchy, imgproc::RETR_EXTERNAL, imgproc::CHAIN_APPROX_SIMPLE, core::Point::new(0, 0), @@ -166,15 +168,15 @@ // Function to check if a pixel is likely part of a meteor let is_meteor_pixel = |x: i32, y: i32| -> Result<bool> { // Get values from feature images - let max_val = *features.maxpixel.at::<u8>(y, x)?.get(0).unwrap_or(&0); - let avg_val = *features.avepixel.at::<u8>(y, x)?.get(0).unwrap_or(&0); + let max_val = *features.maxpixel.at_2d::<u8>(y, x)?; + let avg_val = *features.avepixel.at_2d::<u8>(y, x)?; // Avoid division by zero if avg_val == 0 { return Ok(false); } - let std_val = *features.stdpixel.at::<u8>(y, x)?.get(0).unwrap_or(&0); + let std_val = *features.stdpixel.at_2d::<u8>(y, x)?; let std_to_avg_ratio = std_val as f32 / avg_val as f32; diff --git a/src/detection/feature_images.rs b/src/detection/feature_images.rs new file mode 100644 index 0000000..b77fd95 --- /dev/null +++ b/src/detection/feature_images.rs @@ -0,0 +1,236 @@ +use anyhow::{Context, Result}; +use chrono::{DateTime, Utc}; +use log::{debug, info}; +use opencv::{core, imgcodecs, prelude::*}; +use std::path::Path; +use opencv::core::AlgorithmHint::ALGO_HINT_APPROX; + +/// CAMS FTP format feature images +/// Used for meteor detection analysis +#[derive(Debug, Clone)] +pub struct FeatureImages { + /// Maximum pixel value at each position + pub maxpixel: core::Mat, + /// Average pixel value (excluding maximum) + pub avepixel: core::Mat, + /// Standard deviation of pixel values + pub stdpixel: core::Mat, + /// Index of the frame with maximum pixel value + pub maxframe: core::Mat, + /// Start time of the stack + pub start_time: DateTime<Utc>, + /// End time of the stack + pub end_time: DateTime<Utc>, + /// Feature set ID + pub id: String, +} + +impl FeatureImages { + /// Create a new set of feature images + pub fn new( + maxpixel: core::Mat, + avepixel: core::Mat, + stdpixel: core::Mat, + maxframe: core::Mat, + start_time: DateTime<Utc>, + end_time: DateTime<Utc>, + id: String, + ) -> Self { + Self { + maxpixel, + avepixel, + stdpixel, + maxframe, + start_time, + end_time, + id, + } + } + + /// Create feature images from frame stacker output + pub fn 
from_stacked_frames(stacked: &crate::detection::StackedFrames) -> Self { + Self { + maxpixel: stacked.maxpixel.clone(), + avepixel: stacked.avepixel.clone(), + stdpixel: stacked.stdpixel.clone(), + maxframe: stacked.maxframe.clone(), + start_time: stacked.start_time, + end_time: stacked.end_time, + id: stacked.stack_id.clone(), + } + } + + /// Save feature images to files with the given base path + pub fn save_to_files(&self, base_path: &str) -> Result<()> { + // Create the parent directory if needed + if let Some(parent) = Path::new(base_path).parent() { + std::fs::create_dir_all(parent) + .context("Failed to create parent directory for feature images")?; + } + + // Save each feature image + let maxpixel_path = format!("{}_maxpixel.png", base_path); + let avepixel_path = format!("{}_avepixel.png", base_path); + let stdpixel_path = format!("{}_stdpixel.png", base_path); + let maxframe_path = format!("{}_maxframe.png", base_path); + + imgcodecs::imwrite(&maxpixel_path, &self.maxpixel, &core::Vector::new()) + .context("Failed to save maxpixel image")?; + imgcodecs::imwrite(&avepixel_path, &self.avepixel, &core::Vector::new()) + .context("Failed to save avepixel image")?; + imgcodecs::imwrite(&stdpixel_path, &self.stdpixel, &core::Vector::new()) + .context("Failed to save stdpixel image")?; + imgcodecs::imwrite(&maxframe_path, &self.maxframe, &core::Vector::new()) + .context("Failed to save maxframe image")?; + + debug!("Saved feature images to {}", base_path); + Ok(()) + } + + /// Load feature images from files + pub fn load_from_files(base_path: &str, id: &str) -> Result { + let maxpixel_path = format!("{}_maxpixel.png", base_path); + let avepixel_path = format!("{}_avepixel.png", base_path); + let stdpixel_path = format!("{}_stdpixel.png", base_path); + let maxframe_path = format!("{}_maxframe.png", base_path); + + let maxpixel = imgcodecs::imread(&maxpixel_path, imgcodecs::IMREAD_GRAYSCALE) + .context("Failed to load maxpixel image")?; + let avepixel = imgcodecs::imread(&avepixel_path, imgcodecs::IMREAD_GRAYSCALE) + .context("Failed to load avepixel image")?; + let stdpixel = imgcodecs::imread(&stdpixel_path, imgcodecs::IMREAD_GRAYSCALE) + .context("Failed to load stdpixel image")?; + let maxframe = imgcodecs::imread(&maxframe_path, imgcodecs::IMREAD_GRAYSCALE) + .context("Failed to load maxframe image")?; + + // Parse the timestamp from the file name if possible + let now = Utc::now(); + + Ok(Self { + maxpixel, + avepixel, + stdpixel, + maxframe, + start_time: now, // Default to current time + end_time: now, // Default to current time + id: id.to_string(), + }) + } + + /// Get the dimensions of the feature images + pub fn dimensions(&self) -> (i32, i32) { + (self.maxpixel.cols(), self.maxpixel.rows()) + } + + /// Check if all features have the same dimensions + pub fn validate_dimensions(&self) -> bool { + let (width, height) = self.dimensions(); + + self.avepixel.cols() == width && self.avepixel.rows() == height && + self.stdpixel.cols() == width && self.stdpixel.rows() == height && + self.maxframe.cols() == width && self.maxframe.rows() == height + } + + /// Create a debug visualization of the feature images + pub fn create_visualization(&self) -> Result { + // Create a 2x2 grid with all feature images + let (width, height) = self.dimensions(); + let grid_width = width * 2; + let grid_height = height * 2; + + // Create a Mat from zeros and explicitly convert to Mat + let zeros_expr = core::Mat::zeros(grid_height, grid_width, core::CV_8UC3)?; + let mut visualization = 
zeros_expr.to_mat()?; + + // Convert grayscale to color for display + let mut maxpixel_color = core::Mat::default(); + let mut avepixel_color = core::Mat::default(); + let mut stdpixel_color = core::Mat::default(); + let mut maxframe_color = core::Mat::default(); + + opencv::imgproc::cvt_color(&self.maxpixel, &mut maxpixel_color, opencv::imgproc::COLOR_GRAY2BGR, 0,ALGO_HINT_APPROX)?; + opencv::imgproc::cvt_color(&self.avepixel, &mut avepixel_color, opencv::imgproc::COLOR_GRAY2BGR, 0,ALGO_HINT_APPROX)?; + opencv::imgproc::cvt_color(&self.stdpixel, &mut stdpixel_color, opencv::imgproc::COLOR_GRAY2BGR, 0,ALGO_HINT_APPROX)?; + opencv::imgproc::cvt_color(&self.maxframe, &mut maxframe_color, opencv::imgproc::COLOR_GRAY2BGR, 0,ALGO_HINT_APPROX)?; + + // Create a region of interest for each quadrant + let roi_top_left = core::Rect::new(0, 0, width, height); + let roi_top_right = core::Rect::new(width, 0, width, height); + let roi_bottom_left = core::Rect::new(0, height, width, height); + let roi_bottom_right = core::Rect::new(width, height, width, height); + + // Copy each feature image to its position in the grid + let roi_rect = roi_top_left.clone(); + let mut roi = visualization.roi(roi_rect)?; + + maxpixel_color.copy_to(&mut roi.clone_pointee())?; + + let roi_rect = roi_top_right.clone(); + let mut roi = visualization.roi(roi_rect)?; + avepixel_color.copy_to(&mut roi.clone_pointee())?; + + let roi_rect = roi_bottom_left.clone(); + let mut roi = visualization.roi(roi_rect)?; + stdpixel_color.copy_to(&mut roi.clone_pointee())?; + + let roi_rect = roi_bottom_right.clone(); + let mut roi = visualization.roi(roi_rect)?; + maxframe_color.copy_to(&mut roi.clone_pointee())?; + + // Add labels + let font = opencv::imgproc::FONT_HERSHEY_SIMPLEX; + let font_scale = 0.8; + let color = core::Scalar::new(255.0, 255.0, 255.0, 0.0); + let thickness = 2; + + opencv::imgproc::put_text( + &mut visualization, + "MaxPixel", + core::Point::new(10, 30), + font, + font_scale, + color, + thickness, + opencv::imgproc::LINE_AA, + false, + )?; + + opencv::imgproc::put_text( + &mut visualization, + "AvePixel", + core::Point::new(width + 10, 30), + font, + font_scale, + color, + thickness, + opencv::imgproc::LINE_AA, + false, + )?; + + opencv::imgproc::put_text( + &mut visualization, + "StdPixel", + core::Point::new(10, height + 30), + font, + font_scale, + color, + thickness, + opencv::imgproc::LINE_AA, + false, + )?; + + opencv::imgproc::put_text( + &mut visualization, + "MaxFrame", + core::Point::new(width + 10, height + 30), + font, + font_scale, + color, + thickness, + opencv::imgproc::LINE_AA, + false, + )?; + + Ok(visualization) + } +} diff --git a/src/detection/frame_stacker.rs b/src/detection/frame_stacker.rs index 9f9224e..f88125e 100644 --- a/src/detection/frame_stacker.rs +++ b/src/detection/frame_stacker.rs @@ -11,8 +11,25 @@ use serde::{Deserialize, Serialize}; use std::collections::VecDeque; use std::path::PathBuf; use std::sync::{Arc, Mutex}; - +use opencv::core::AlgorithmHint::ALGO_HINT_APPROX; use crate::camera::Frame; +use crate::detection::feature_images::FeatureImages; + +/// Thread-safe shared frame stacker +pub type SharedFrameStacker = Arc>; + +/// Create a new shared frame stacker with default configuration +pub fn new_shared_stacker() -> SharedFrameStacker { + let stacker = FrameStacker::new(FrameStackerConfig::default()) + .expect("Failed to create frame stacker"); + Arc::new(Mutex::new(stacker)) +} + +/// Create a new shared frame stacker with the given configuration +pub fn 
new_shared_stacker_with_config(config: FrameStackerConfig) -> Result<SharedFrameStacker> { + let stacker = FrameStacker::new(config)?; + Ok(Arc::new(Mutex::new(stacker))) +} /// Configuration for the frame stacker #[derive(Debug, Clone, Serialize, Deserialize)] @@ -124,6 +141,80 @@ impl FrameStacker { self.state.lock().unwrap().clone() } + /// Add a frame to the stacker + /// Returns true if the batch is complete + pub fn push_frame(&mut self, frame: Frame) -> bool { + // Add frame to buffer + self.frame_buffer.push_back(frame.clone()); + + // Update state + { + let mut state = self.state.lock().unwrap(); + state.current_frame_count = self.frame_buffer.len(); + } + + // Set start time of stack if this is the first frame + if self.start_time.is_none() { + self.start_time = Some(frame.timestamp); + } + + // Return true if the batch is complete + self.frame_buffer.len() >= self.config.frames_per_stack + } + + /// Process the current batch of frames + /// Returns FeatureImages if processed, None if not enough frames + pub fn process_batch(&mut self) -> Result<Option<FeatureImages>> { + if self.frame_buffer.is_empty() { + return Ok(None); + } + + if self.frame_buffer.len() < self.config.frames_per_stack { + debug!("Not enough frames to process batch: {} < {}", + self.frame_buffer.len(), self.config.frames_per_stack); + return Ok(None); + } + + debug!("Processing batch of {} frames", self.frame_buffer.len()); + + // Process the frames + self.processing = true; + let stacked_result = self.stack_frames()?; + self.processing = false; + + match stacked_result { + Some(stacked_frames) => { + // Convert StackedFrames to FeatureImages + let features = FeatureImages::new( + stacked_frames.maxpixel, + stacked_frames.avepixel, + stacked_frames.stdpixel, + stacked_frames.maxframe, + stacked_frames.start_time, + stacked_frames.end_time, + stacked_frames.stack_id, + ); + + // Reset for next batch + self.current_stack_id = format!("stack_{}", Utc::now().timestamp()); + self.start_time = None; + self.frame_buffer.clear(); + + // Update state + { + let mut state = self.state.lock().unwrap(); + state.stacks_processed += 1; + state.last_stack_time = Some(Utc::now()); + state.last_stack_id = Some(self.current_stack_id.clone()); + state.current_frame_count = 0; + } + + Ok(Some(features)) + }, + None => Ok(None), + } + } + /// Process a new frame pub fn process_frame(&mut self, frame: Frame) -> Result<Option<StackedFrames>> { // Add frame to buffer @@ -197,7 +288,7 @@ // Convert to grayscale if needed let gray_frame = if frame.mat.channels() != 1 { let mut gray = core::Mat::default(); - imgproc::cvt_color(&frame.mat, &mut gray, imgproc::COLOR_BGR2GRAY, 0)?; + imgproc::cvt_color(&frame.mat, &mut gray, imgproc::COLOR_BGR2GRAY, 0, ALGO_HINT_APPROX)?; gray } else { frame.mat.clone() @@ -273,7 +364,7 @@ let mut stdpixel = core::Mat::zeros(height, width, core::CV_8UC1)?; for y in 0..height { for x in 0..width { - let maxpixel_mat = maxpixel.to_mat()?; // Convert to Mat once, outside the loop - let max_val = f64::from(*maxpixel_mat.at_2d::<u8>(y, x)?); + let maxpixel_mat = maxpixel.to_mat()?; let max_val = f64::from(*maxpixel_mat.at_2d::<u8>(y, x)?); let avgpixel_mat = avepixel.to_mat()?; let avg = f64::from(*avgpixel_mat.at_2d::<u8>(y, x)?); @@ -376,6 +467,11 @@ self.frame_buffer.len() } + /// Get the batch size + pub fn batch_size(&self) -> usize { + self.config.frames_per_stack + } + /// Get the stack progress (0.0 - 1.0) pub fn progress(&self) -> f32 { self.frame_buffer.len() as f32 / self.config.frames_per_stack as f32
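The new `push_frame`/`process_batch` pair separates accumulation from processing. A sketch of how a caller might drive a `SharedFrameStacker` (the frame source and callback wiring are illustrative, not part of this patch):

```rust
use anyhow::Result;
use crate::camera::Frame;
use crate::detection::{new_shared_stacker, FeatureImages};

/// Feed frames into a shared stacker and hand each completed feature set to a
/// callback. Locking per call keeps the stacker usable from other threads
/// between pushes.
fn drive_stacker(
    frames: impl Iterator<Item = Frame>,
    mut on_features: impl FnMut(FeatureImages),
) -> Result<()> {
    let stacker = new_shared_stacker();

    for frame in frames {
        let batch_ready = stacker.lock().unwrap().push_frame(frame);
        if batch_ready {
            if let Some(features) = stacker.lock().unwrap().process_batch()? {
                on_features(features);
            }
        }
    }
    Ok(())
}
```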
diff --git a/src/detection/mod.rs b/src/detection/mod.rs index bbef745..3001765 100644 --- a/src/detection/mod.rs +++ b/src/detection/mod.rs @@ -2,11 +2,13 @@ mod pipeline; mod frame_stacker; mod cams_detector; mod brightness_detector; +mod feature_images; pub use pipeline::{DetectionPipeline, PipelineConfig, AggregationStrategy}; -pub use frame_stacker::{FrameStacker, FrameStackerConfig, StackedFrames, FrameStackerState}; +pub use frame_stacker::{FrameStacker, FrameStackerConfig, StackedFrames, FrameStackerState, SharedFrameStacker, new_shared_stacker, new_shared_stacker_with_config}; pub use cams_detector::{CamsDetector, CamsDetectorParams}; pub use brightness_detector::{BrightnessDetector, BrightnessDetectorParams}; +pub use feature_images::FeatureImages; use anyhow::Result; use chrono::Utc; diff --git a/src/detection/pipeline.rs b/src/detection/pipeline.rs index 0b0a54a..9c118b1 100644 --- a/src/detection/pipeline.rs +++ b/src/detection/pipeline.rs @@ -68,8 +68,8 @@ pub struct DetectionPipeline { camera_controller: Arc<Mutex<CameraController>>, /// Pipeline configuration config: PipelineConfig, - /// Tokio runtime for async tasks - runtime: Runtime, + // /// Tokio runtime for async tasks + // runtime: Runtime, /// Channel for sending detected events event_tx: mpsc::Sender<MeteorEvent>, /// Channel for receiving events @@ -97,7 +97,7 @@ sensitivity: config.detection.sensitivity, id: format!("brightness-default"), }; - + PipelineConfig { detectors: vec![DetectorConfig::Brightness(brightness_params)], max_parallel_workers: 4, @@ -105,39 +105,39 @@ aggregation_strategy: AggregationStrategy::Any, } }); - + // Create detectors from config let mut detectors = Vec::with_capacity(pipeline_config.detectors.len()); for detector_config in &pipeline_config.detectors { let detector = DetectorFactory::create(detector_config); detectors.push(Arc::new(StdMutex::new(detector))); } - + // Create tokio runtime - let runtime = Runtime::new().context("Failed to create Tokio runtime")?; - + // let runtime = Runtime::new().context("Failed to create Tokio runtime")?; + // Create channel for event communication let (event_tx, event_rx) = mpsc::channel(32); - + Ok(Self { detectors, camera_controller, config: pipeline_config, - runtime, + // runtime, event_tx, event_rx, is_running: Arc::new(StdMutex::new(false)), frame_index: Arc::new(StdMutex::new(0)), }) } - + /// Add a detector to the pipeline pub fn add_detector(&mut self, config: DetectorConfig) -> Result<()> { let detector = DetectorFactory::create(&config); self.detectors.push(Arc::new(StdMutex::new(detector))); Ok(()) } - + /// Create from legacy config (for backward compatibility) pub fn from_legacy_config( camera_controller: Arc<Mutex<CameraController>>, @@ -145,7 +145,7 @@ ) -> Result<Self> { Self::new(camera_controller, config, None) } - + /// Process frame with all detectors in parallel async fn process_frame_parallel( &self, @@ -153,12 +153,12 @@ frame_index: u64, ) -> Vec<DetectionResult> { let mut handles = Vec::with_capacity(self.detectors.len()); - + // Create a task for each detector for detector in &self.detectors { let frame = frame.clone(); let detector = detector.clone(); - + // Spawn a task for this detector let handle = tokio::spawn(async move { // We use std::sync::Mutex for detectors, so we use unwrap() not await @@ -166,36 +166,36 @@ match detector.process_frame(&frame, frame_index) { Ok(result) => result, Err(e) => { - error!("Error processing frame with detector {}: {}", + error!("Error processing frame with detector {}: {}", detector.get_id(), e); DetectionResult::default() } } }); 
- + handles.push(handle); } - + // Wait for all detectors to complete let results = join_all(handles).await; - + // Unwrap results results.into_iter() .filter_map(|r| r.ok()) .collect() } - + /// Aggregate results from multiple detectors fn aggregate_results(&self, results: Vec) -> Option { // If no results, return None if results.is_empty() { return None; } - + // Count detections let total = results.len(); let detected = results.iter().filter(|r| r.detected).count(); - + // Check if we have a detection based on aggregation strategy let detection_triggered = match self.config.aggregation_strategy { AggregationStrategy::Any => detected > 0, @@ -205,19 +205,19 @@ impl DetectionPipeline { (detected as f32 / total as f32) >= threshold }, }; - + if !detection_triggered { return None; } - + // Find the result with the highest confidence let best_result = results.into_iter() .filter(|r| r.detected) .max_by(|a, b| a.confidence.partial_cmp(&b.confidence).unwrap()); - + best_result } - + /// Start the detection pipeline pub async fn run(&self) -> Result<()> { { @@ -228,9 +228,9 @@ impl DetectionPipeline { } *is_running = true; } - + info!("Starting meteor detection pipeline with {} detectors", self.detectors.len()); - + // Start background processing task let camera_controller = self.camera_controller.clone(); let detectors = self.detectors.clone(); @@ -238,12 +238,13 @@ impl DetectionPipeline { let buffer_seconds = self.config.event_buffer_seconds as i64; let is_running = self.is_running.clone(); let frame_index = self.frame_index.clone(); - + // Process task in a separate thread that doesn't require Send std::thread::spawn(move || { + // Create a runtime for this dedicated thread let rt = Runtime::new().expect("Failed to create tokio runtime for detection thread"); - + // Run the async logic in this thread rt.block_on(async { // Get frame receiver @@ -251,13 +252,13 @@ impl DetectionPipeline { let camera = camera_controller.lock().await; camera.subscribe_to_frames() }; - + // Get frame buffer reference let frame_buffer = { let camera = camera_controller.lock().await; camera.get_frame_buffer() }; - + while { let is_running = is_running.lock().unwrap(); *is_running @@ -271,19 +272,19 @@ impl DetectionPipeline { *idx += 1; *idx }; - + // Process frame with all detectors in parallel let results = Self::process_frame_parallel_static( &detectors, &frame.mat, current_frame_index).await; - + // Aggregate results if let Some(result) = Self::aggregate_results_static(&results) { // Handle detection if result.detected { debug!("Meteor detected: confidence={:.2}, bbox={:?}, detector={}", - result.confidence, result.bounding_box, + result.confidence, result.bounding_box, result.detector_id.as_deref().unwrap_or("unknown")); - + // Save event if let Some(bbox) = result.bounding_box { // Since we're in a dedicated thread, we can safely hold the lock @@ -311,7 +312,7 @@ impl DetectionPipeline { }, Ok(Err(e)) => { error!("Error receiving frame: {}", e); - + // Small delay to avoid tight error loop time::sleep(Duration::from_millis(100)).await; }, @@ -321,15 +322,15 @@ impl DetectionPipeline { } } } - + info!("Detection pipeline stopped"); }); - + }); - + Ok(()) } - + /// Static version of process_frame_parallel for use in async tasks async fn process_frame_parallel_static( detectors: &Vec>>>, @@ -337,12 +338,12 @@ impl DetectionPipeline { frame_index: u64, ) -> Vec { let mut handles = Vec::with_capacity(detectors.len()); - + // Create a task for each detector for detector in detectors { let frame = frame.clone(); 
-
+
     /// Static version of process_frame_parallel for use in async tasks
     async fn process_frame_parallel_static(
         detectors: &Vec<Arc<StdMutex<Box<dyn …>>>>,
@@ -337,12 +338,12 @@
         frame_index: u64,
     ) -> Vec<DetectionResult> {
         let mut handles = Vec::with_capacity(detectors.len());
-
+
         // Create a task for each detector
         for detector in detectors {
             let frame = frame.clone();
             let detector = detector.clone();
-
+
             // Spawn a task for this detector
             let handle = tokio::spawn(async move {
                 // Use a non-blocking operation to acquire the lock
@@ -356,10 +357,10 @@
                             return DetectionResult::default();
                         }
                     };
-
+
                     detector.process_frame(&frame, frame_index)
                 };
-
+
                 match detector_result {
                     Ok(result) => result,
                     Err(e) => {
@@ -368,40 +369,40 @@
                     }
                 }
             });
-
+
             handles.push(handle);
         }
-
+
         // Wait for all detectors to complete
         let results = join_all(handles).await;
-
+
         // Unwrap results
         results.into_iter()
             .filter_map(|r| r.ok())
             .collect()
     }
-
+
     /// Static version of aggregate_results for use in async tasks
     fn aggregate_results_static(results: &Vec<DetectionResult>) -> Option<DetectionResult> {
         // If no results, return None
         if results.is_empty() {
             return None;
         }
-
+
         // Check if any detector reported a detection
         let any_detected = results.iter().any(|r| r.detected);
-
+
         if !any_detected {
             return None;
         }
-
+
         // Find the result with the highest confidence
         results.iter()
             .filter(|r| r.detected)
             .max_by(|a, b| a.confidence.partial_cmp(&b.confidence).unwrap())
             .cloned()
     }
-
+
     /// Stop the detection pipeline
     pub async fn stop(&self) -> Result<()> {
         {
@@ -412,29 +413,29 @@
             }
             *is_running = false;
         }
-
+
         info!("Stopping detection pipeline");
         Ok(())
     }
-
+
     /// Reset all detectors
     pub fn reset(&self) -> Result<()> {
         for detector in &self.detectors {
             detector.lock().unwrap().reset();
         }
-
+
         // Reset frame index
         let mut frame_index = self.frame_index.lock().unwrap();
         *frame_index = 0;
-
+
         Ok(())
     }
-
+
     /// Get the number of detectors
     pub fn detector_count(&self) -> usize {
         self.detectors.len()
     }
-
+
     /// Get detector configurations
     pub fn get_detector_configs(&self) -> Vec<DetectorConfig> {
         self.detectors.iter()
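The `Threshold` strategy in `aggregate_results` is a plain ratio test over detector votes. A self-contained check of the arithmetic; the helper name is ours, not the crate's:

#[cfg(test)]
mod aggregation_sketch {
    // Mirrors `(detected as f32 / total as f32) >= threshold` above.
    fn threshold_triggered(detected: usize, total: usize, threshold: f32) -> bool {
        total > 0 && (detected as f32 / total as f32) >= threshold
    }

    #[test]
    fn three_of_four_passes_a_half_threshold() {
        assert!(threshold_triggered(3, 4, 0.5));  // 0.75 >= 0.5
        assert!(!threshold_triggered(1, 4, 0.5)); // 0.25 <  0.5
    }
}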
Using fallback position."); + self.gps_state.degraded = true; + + // Set fallback position + let mut pos = self.position.lock().unwrap(); + *pos = self.config.fallback_position; + + // Update satellites with simulated value + let mut sats = self.satellites.lock().unwrap(); + *sats = 8; // Simulate 8 satellites for non-Linux platforms + + return Ok(()); + } + + // Continue with normal initialization for Linux // Check if GPS is enabled in config if !self.config.enable_gps { info!("GPS module disabled in configuration. Using fallback position."); @@ -112,7 +141,6 @@ impl GpsController { let mut init_failed = false; // Open the serial port - let port = serialport::new(&self.config.port, self.config.baud_rate) .data_bits(Eight) .flow_control(serialport::FlowControl::None) @@ -143,6 +171,8 @@ impl GpsController { } // Initialize PPS pin if enabled and GPS initialized successfully + // This is only compiled on Linux + #[cfg(target_os = "linux")] if self.config.use_pps && !self.gps_state.degraded { match Gpio::new() { Ok(gpio) => { @@ -195,6 +225,71 @@ impl GpsController { /// Start GPS processing pub async fn start(&self) -> Result<()> { + // Handle simulated mode for non-Linux platforms + #[cfg(not(target_os = "linux"))] + { + info!("Starting GPS in simulated mode on non-Linux platform"); + + // Set fallback position and status + { + let mut pos = self.position.lock().unwrap(); + *pos = self.config.fallback_position; + + let mut status = self.sync_status.lock().unwrap(); + *status = SyncStatus::GpsOnly; // Simulate GPS sync + + // Send an initial position update with fallback + let _ = self.position_tx.send(self.config.fallback_position); + } + + { + let mut is_running = self.is_running.lock().unwrap(); + *is_running = true; + } + + // Start a simulation thread for periodic updates + let position_tx = self.position_tx.clone(); + let position = self.position.clone(); + let is_running = self.is_running.clone(); + let fallback_position = self.config.fallback_position; + + thread::spawn(move || { + info!("Starting GPS simulation thread"); + + while { + let is_running = is_running.lock().unwrap(); + *is_running + } { + // Sleep for a bit to simulate update interval + thread::sleep(time::Duration::from_secs(1)); + + // Generate small random variations to the position + let mut rng = rand::thread_rng(); + let lat_variation = (rng.gen::() - 0.5) * 0.0001; // ~10m variation + let lon_variation = (rng.gen::() - 0.5) * 0.0001; + + let new_position = GeoPosition { + latitude: fallback_position.latitude + lat_variation, + longitude: fallback_position.longitude + lon_variation, + altitude: fallback_position.altitude + (rng.gen::() - 0.5) * 2.0, + }; + + // Update position + { + let mut pos = position.lock().unwrap(); + *pos = new_position; + + // Send position update + let _ = position_tx.send(new_position); + } + } + + info!("GPS simulation thread stopped"); + }); + + return Ok(()); + } + // Handle degraded mode if self.gps_state.degraded { info!("Starting GPS in degraded mode (using fallback position)"); @@ -245,13 +340,14 @@ impl GpsController { // Clone fallback position in case we need it later let fallback_position = self.config.fallback_position; let allow_degraded = self.config.allow_degraded_mode; + let baud_rate = self.config.baud_rate; // Create a separate thread for GPS processing (blocking I/O) thread::spawn(move || { info!("Starting GPS processing on port {}", port_name); // Open the serial port - let port = serialport::new(&port_name, self.config.baud_rate) + let port = 
                .data_bits(Eight)
                .flow_control(serialport::FlowControl::None)
                .parity(serialport::Parity::None)
@@ -465,6 +561,66 @@
     pub fn get_camera_orientation(&self) -> CameraOrientation {
         self.config.camera_orientation
     }
+
+    /// Get current position with complete status information
+    pub async fn get_current_position(&self) -> Result<serde_json::Value> {
+        // Get the basic GPS status
+        let status = self.get_status();
+
+        // Get additional system time information
+        let system_time = Utc::now();
+        let precise_time = self.get_precise_time();
+        let time_diff_ms = (precise_time - system_time).num_milliseconds().abs() as f64;
+
+        // Calculate additional derived values
+        let fix_age_seconds = if let Some(last_update) = *self.last_update.lock().unwrap() {
+            (Utc::now() - last_update).num_seconds()
+        } else {
+            -1 // No fix yet
+        };
+
+        // Determine if the position is from fallback
+        let is_fallback = self.gps_state.degraded ||
+            !self.has_fix() ||
+            fix_age_seconds > 30; // Stale fix
+
+        // Construct a detailed status object
+        let position_status = serde_json::json!({
+            // Basic position data
+            "position": {
+                "latitude": status.position.latitude,
+                "longitude": status.position.longitude,
+                "altitude": status.position.altitude,
+                "is_fallback": is_fallback,
+                "satellites": status.satellites,
+            },
+
+            // Timing information
+            "timing": {
+                "timestamp": precise_time.to_rfc3339(),
+                "sync_status": status.sync_status,
+                "time_accuracy_ms": status.time_accuracy_ms,
+                "system_time_offset_ms": time_diff_ms,
+                "fix_age_seconds": fix_age_seconds,
+            },
+
+            // Camera orientation
+            "orientation": {
+                "azimuth": status.camera_orientation.azimuth,
+                "elevation": status.camera_orientation.elevation,
+            },
+
+            // System status
+            "system": {
+                "hardware_initialized": self.gps_state.initialized,
+                "running": *self.is_running.lock().unwrap(),
+                "degraded_mode": self.gps_state.degraded,
+                "has_fix": self.has_fix(),
+            }
+        });
+
+        Ok(position_status)
+    }
 }

 impl Drop for GpsController {
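Both the real reader thread and the simulation thread publish through the same `position_tx`, so downstream code is platform-agnostic. A consumer sketch, assuming `position_tx` is a `tokio::sync::broadcast::Sender<GeoPosition>` as the `broadcast` import and the `send` calls above suggest:

use tokio::sync::broadcast;

// `GeoPosition` is the type shown in gps/controller.rs above.
async fn log_position_updates(mut rx: broadcast::Receiver<GeoPosition>) {
    // `recv` returns Err once the sender is dropped or this receiver lags too far.
    while let Ok(pos) = rx.recv().await {
        log::info!("fix: lat={:.5} lon={:.5} alt={:.1}",
            pos.latitude, pos.longitude, pos.altitude);
    }
}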
diff --git a/src/main.rs b/src/main.rs
index e9a7878..805a8df 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,3 +1,10 @@
+// Platform detection
+#[cfg(target_os = "linux")]
+pub const PLATFORM_SUPPORTS_GPIO: bool = true;
+
+#[cfg(not(target_os = "linux"))]
+pub const PLATFORM_SUPPORTS_GPIO: bool = false;
+
 mod camera;
 mod config;
 mod detection;
@@ -11,32 +18,57 @@
 mod communication;
 mod monitoring;

 use anyhow::{Context, Result};
-use log::{info, error};
+use log::{info, error, warn};
 use std::path::PathBuf;
 use std::sync::{Arc, Mutex as StdMutex};
-use tokio::sync::Mutex;
+use gstreamer_rtsp_server::prelude::SettingsExt;
+use tokio::sync::{Mutex, MutexGuard};

 pub use config::Config;
+use crate::overlay::Watermark;

 /// Main entry point for the meteor detection system
 #[tokio::main]
 async fn main() -> Result<()> {
-    // Initialize logger with default settings
-    env_logger::init_from_env(
+    // Create a logger builder with default settings
+    let mut builder = env_logger::Builder::from_env(
         env_logger::Env::default().filter_or("RUST_LOG", "info")
     );
-
-    info!("Meteor detection system starting up");
-
+
     // Load configuration
     let config = config::load_config()
         .context("Failed to load configuration")?;
-
-    // Re-initialize logger with configured log level
+
+    // Update the logger with configured log level
     std::env::set_var("RUST_LOG", &config.log_level);
-    env_logger::builder().init();
+    builder.parse_env(env_logger::Env::default());
+    builder.init();
+
+    info!("Meteor detection system starting up");

-    info!("Loaded configuration with device ID: {}", config.device_id);
+    // Check if we're running on a platform that supports GPIO
+    if PLATFORM_SUPPORTS_GPIO {
+        info!("Running on a Linux platform with GPIO support");
+    } else {
+        info!("Running on a platform without GPIO support (macOS/Windows/etc). Using simulated sensors.");
+    }
+
+    info!("Loaded configuration with device ID: {}", config.device.id);

     // Initialize camera subsystem
     let camera_controller = camera::CameraController::new(&config)
@@ -80,17 +112,17 @@
     // Initialize detection pipeline
     let detection_pipeline = detection::DetectionPipeline::new(
         camera_controller.clone(),
-        &config,
-        config.detection.pipeline
+        &config.clone(),
+        config.detection.pipeline.clone()
     ).context("Failed to initialize detection pipeline")?;

     // Initialize storage system
-    let storage_manager = storage::StorageManager::new(&config)
+    let storage_manager = storage::StorageManager::new(&(config.clone()))
         .await
         .context("Failed to initialize storage")?;

     // Initialize communication module
-    let comms = communication::CommunicationManager::new(
+    let mut comms = communication::CommunicationManager::new(
         &config,
         camera_controller.clone(),
         gps_controller.clone(),
@@ -129,15 +161,37 @@
     // Add watermark hook
     {
         let mut manager = hook_manager.lock().await;
+        let watermark_clone = watermark.clone();
         manager.register_hook(Box::new(hooks::BasicFrameHook::new(
             "watermark",
             "Watermark Overlay",
             "Adds timestamp, GPS, and sensor data overlay to frames",
             config.watermark.enabled,
             move |frame, timestamp| {
-                let mut watermark_instance = watermark.lock().unwrap();
-                watermark_instance.apply(frame, timestamp)?;
-                Ok(())
+                // Bridge this sync callback into the async watermark mutex
+                tokio::task::block_in_place(|| {
+                    let mut guard = futures::executor::block_on(watermark_clone.lock());
+                    guard.apply(frame, timestamp)
+                })
             },
         )));
     }
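The watermark hook is the one place a synchronous callback has to reach into a `tokio::sync::Mutex`. The bridge used above, isolated into a reusable helper; a sketch, noting that `block_in_place` requires the multi-threaded runtime flavor, which `#[tokio::main]` provides by default:

use tokio::sync::Mutex;

// Run a closure against an async-mutex-guarded value from sync code.
// `block_in_place` moves this worker off the async scheduler so the
// `block_on` inside cannot stall other tasks.
fn with_blocking_lock<T, R>(m: &Mutex<T>, f: impl FnOnce(&mut T) -> R) -> R {
    tokio::task::block_in_place(|| {
        let mut guard = futures::executor::block_on(m.lock());
        f(&mut guard)
    })
}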
diff --git a/src/sensors/controller.rs b/src/sensors/controller.rs
index f1a5a8c..cea93c1 100644
--- a/src/sensors/controller.rs
+++ b/src/sensors/controller.rs
@@ -8,6 +8,7 @@
 use tokio::sync::broadcast;
 use tokio::time;

 use crate::sensors::dht22::Dht22Sensor;
+use crate::PLATFORM_SUPPORTS_GPIO;
 use crate::sensors::{EnvironmentData, LightSensor, SensorConfig, TemperatureHumiditySensor};

 /// A simple light sensor implementation that uses camera brightness as a proxy
@@ -120,21 +121,47 @@
         if self.config.use_dht22 {
             info!("Initializing DHT22 temperature/humidity sensor");

-            match Dht22Sensor::new(self.config.dht22_pin) {
-                Ok(dht22) => {
-                    self.temp_humidity_sensor = Some(Box::new(dht22));
-                    self.temp_sensor_state.initialized = true;
-                    info!("DHT22 temperature/humidity sensor initialized successfully");
-                },
-                Err(e) => {
-                    self.temp_sensor_state.init_failures += 1;
-                    self.temp_sensor_state.degraded = true;
-
-                    if self.config.allow_degraded_mode {
-                        warn!("Failed to initialize DHT22 sensor: {}. Using fallback values.", e);
-                        init_failed = true;
-                    } else {
-                        return Err(anyhow!("Failed to initialize temperature sensor and degraded mode is not allowed: {}", e));
+            #[cfg(feature = "gpio")]
+            {
+                match Dht22Sensor::new(self.config.dht22_pin) {
+                    Ok(dht22) => {
+                        self.temp_humidity_sensor = Some(Box::new(dht22));
+                        self.temp_sensor_state.initialized = true;
+                        info!("DHT22 temperature/humidity sensor initialized successfully");
+                    },
+                    Err(e) => {
+                        self.temp_sensor_state.init_failures += 1;
+                        self.temp_sensor_state.degraded = true;
+
+                        if self.config.allow_degraded_mode {
+                            warn!("Failed to initialize DHT22 sensor: {}. Using fallback values.", e);
+                            init_failed = true;
+                        } else {
+                            return Err(anyhow!("Failed to initialize temperature sensor and degraded mode is not allowed: {}", e));
+                        }
+                    }
+                }
+            }
+
+            #[cfg(not(feature = "gpio"))]
+            {
+                // On platforms without GPIO, use simulated sensor
+                match Dht22Sensor::new(self.config.dht22_pin) {
+                    Ok(dht22) => {
+                        self.temp_humidity_sensor = Some(Box::new(dht22));
+                        self.temp_sensor_state.initialized = true;
+                        info!("DHT22 temperature/humidity sensor initialized in simulation mode");
+                    },
+                    Err(e) => {
+                        self.temp_sensor_state.init_failures += 1;
+                        self.temp_sensor_state.degraded = true;
+
+                        if self.config.allow_degraded_mode {
+                            warn!("Failed to initialize DHT22 sensor simulation: {}. Using fallback values.", e);
+                            init_failed = true;
+                        } else {
+                            return Err(anyhow!("Failed to initialize temperature sensor simulation and degraded mode is not allowed: {}", e));
+                        }
+                    }
+                }
+            }
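Both the `gpio` and simulated branches above funnel failures through the same policy: count the failure, mark the sensor degraded, and bail out unless degraded mode is allowed. That policy in isolation; the struct and function names here are illustrative stand-ins for the `temp_sensor_state` fields used above:

use anyhow::{anyhow, Result};
use log::warn;

// Illustrative stand-in for the per-sensor health bookkeeping.
struct SensorHealth {
    init_failures: u32,
    degraded: bool,
}

fn handle_init_failure(state: &mut SensorHealth, allow_degraded: bool, e: anyhow::Error) -> Result<()> {
    state.init_failures += 1;
    state.degraded = true;
    if allow_degraded {
        warn!("Failed to initialize sensor: {}. Using fallback values.", e);
        Ok(()) // caller continues with fallback readings
    } else {
        Err(anyhow!("Failed to initialize sensor and degraded mode is not allowed: {}", e))
    }
}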
Using fallback values.", e); + init_failed = true; + } else { + return Err(anyhow!("Failed to initialize temperature sensor simulation and degraded mode is not allowed: {}", e)); + } } } } @@ -344,14 +371,20 @@ impl SensorController { camera_sensor.update_brightness(brightness); // Update the current data - let mut data = self.current_data.lock()?; - data.sky_brightness = brightness; - data.timestamp = Utc::now(); - - // Broadcast update - let _ = self.data_tx.send(data.clone()); - - return Ok(()); + match self.current_data.lock() { + Ok(mut data) => { + data.sky_brightness = brightness; + data.timestamp = Utc::now(); + + // Broadcast update + let _ = self.data_tx.send(data.clone()); + + return Ok(()); + }, + Err(e) => { + return Err(anyhow!("Failed to lock current_data: {}", e)); + } + } } } diff --git a/src/sensors/dht22.rs b/src/sensors/dht22.rs index e7f3ada..6bed672 100644 --- a/src/sensors/dht22.rs +++ b/src/sensors/dht22.rs @@ -1,8 +1,14 @@ use anyhow::{anyhow, Context, Result}; use log::{debug, error, warn}; -use rppal::gpio::{Gpio, Level, Mode, OutputPin}; use std::thread; use std::time::{Duration, Instant}; +use std::sync::Mutex; + +#[cfg(feature = "gpio")] +use rppal::gpio::{Gpio, Level, Mode, OutputPin}; + +#[cfg(not(feature = "gpio"))] +use rand::Rng; use crate::sensors::TemperatureHumiditySensor; @@ -10,6 +16,12 @@ use crate::sensors::TemperatureHumiditySensor; pub struct Dht22Sensor { /// GPIO pin number (BCM) pin: u8, + /// Mutable state protected by a mutex + state: Mutex, +} + +/// Mutable state for the sensor +struct SensorState { /// Last temperature reading last_temperature: f32, /// Last humidity reading @@ -23,16 +35,21 @@ impl Dht22Sensor { pub fn new(pin: u8) -> Result { Ok(Self { pin, - last_temperature: 0.0, - last_humidity: 0.0, - last_reading: Instant::now() - Duration::from_secs(10), + state: Mutex::new(SensorState { + last_temperature: 0.0, + last_humidity: 0.0, + last_reading: Instant::now() - Duration::from_secs(10), + }), }) } /// Read raw data from DHT22 sensor - fn read_raw(&mut self) -> Result<(f32, f32)> { + #[cfg(feature = "gpio")] + fn read_raw(&self) -> Result<(f32, f32)> { + let mut state = self.state.lock().map_err(|_| anyhow!("Failed to lock sensor state"))?; + // Ensure we don't read too frequently (DHT22 needs 2+ seconds between readings) - let elapsed = self.last_reading.elapsed(); + let elapsed = state.last_reading.elapsed(); if elapsed < Duration::from_secs(2) { thread::sleep(Duration::from_secs(2) - elapsed); } @@ -44,7 +61,7 @@ impl Dht22Sensor { .context(format!("Failed to access GPIO pin {}", self.pin))?; // Send start signal - let mut pin = pin.into_output_low(); + let mut pin = pin.into_output_low(); pin.set_low(); thread::sleep(Duration::from_millis(20)); // At least 18ms for DHT22 pin.set_high(); @@ -113,32 +130,67 @@ impl Dht22Sensor { }; // Store readings - self.last_temperature = temperature; - self.last_humidity = humidity; - self.last_reading = Instant::now(); + state.last_temperature = temperature; + state.last_humidity = humidity; + state.last_reading = Instant::now(); debug!("DHT22 read: temperature={:.1}°C, humidity={:.1}%", temperature, humidity); Ok((temperature, humidity)) } + + /// Simulated read for platforms without GPIO support + #[cfg(not(feature = "gpio"))] + fn read_raw(&self) -> Result<(f32, f32)> { + let mut state = self.state.lock().map_err(|_| anyhow!("Failed to lock sensor state"))?; + + // Ensure we don't read too frequently (simulate real sensor behavior) + let elapsed = state.last_reading.elapsed(); + if 
+        if elapsed < Duration::from_secs(2) {
+            thread::sleep(Duration::from_secs(2) - elapsed);
+        }
+
+        // Use fixed values when GPIO is not available
+        let mut rng = rand::thread_rng();
+        let temperature = 25.0 + (rng.gen::<f32>() * 2.0 - 1.0); // Default 25°C ± 1°C
+        let humidity = 50.0 + (rng.gen::<f32>() * 5.0 - 2.5); // Default 50% ± 2.5%
+
+        // Store readings
+        state.last_temperature = temperature;
+        state.last_humidity = humidity;
+        state.last_reading = Instant::now();
+
+        debug!("DHT22 simulation: temperature={:.1}°C, humidity={:.1}%", temperature, humidity);
+
+        Ok((temperature, humidity))
+    }
 }

 impl TemperatureHumiditySensor for Dht22Sensor {
     fn read_temperature(&self) -> Result<f32> {
+        // Lock state to check cached values
+        let state = self.state.lock().map_err(|_| anyhow!("Failed to lock sensor state"))?;
+
         // If we've read recently, return cached value
-        if self.last_reading.elapsed() < Duration::from_secs(2) {
-            return Ok(self.last_temperature);
+        if state.last_reading.elapsed() < Duration::from_secs(2) {
+            return Ok(state.last_temperature);
         }
-
+
+        // Release the lock before trying to read new values to avoid deadlock
+        drop(state);
+
         // Try to read new values
         match self.read_raw() {
             Ok((temp, _)) => Ok(temp),
             Err(e) => {
                 error!("Failed to read DHT22 temperature: {}", e);
-                if self.last_reading.elapsed() < Duration::from_secs(60) {
+
+                // Re-acquire the lock to check if we can use cached value
+                let state = self.state.lock().map_err(|_| anyhow!("Failed to lock sensor state"))?;
+                if state.last_reading.elapsed() < Duration::from_secs(60) {
                     // If last reading was recent, return cached value
                     warn!("Using cached temperature value");
-                    Ok(self.last_temperature)
+                    Ok(state.last_temperature)
                 } else {
                     Err(e)
                 }
@@ -147,20 +199,29 @@
     }

     fn read_humidity(&self) -> Result<f32> {
+        // Lock state to check cached values
+        let state = self.state.lock().map_err(|_| anyhow!("Failed to lock sensor state"))?;
+
         // If we've read recently, return cached value
-        if self.last_reading.elapsed() < Duration::from_secs(2) {
-            return Ok(self.last_humidity);
+        if state.last_reading.elapsed() < Duration::from_secs(2) {
+            return Ok(state.last_humidity);
         }

+        // Release the lock before trying to read new values to avoid deadlock
+        drop(state);
+
         // Try to read new values
         match self.read_raw() {
             Ok((_, humidity)) => Ok(humidity),
             Err(e) => {
                 error!("Failed to read DHT22 humidity: {}", e);
-                if self.last_reading.elapsed() < Duration::from_secs(60) {
+
+                // Re-acquire the lock to check if we can use cached value
+                let state = self.state.lock().map_err(|_| anyhow!("Failed to lock sensor state"))?;
+                if state.last_reading.elapsed() < Duration::from_secs(60) {
                     // If last reading was recent, return cached value
                     warn!("Using cached humidity value");
-                    Ok(self.last_humidity)
+                    Ok(state.last_humidity)
                 } else {
                     Err(e)
                 }
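The accessor methods above all follow the same lock discipline: check the cache under the lock, drop the guard before calling `read_raw` (which takes the lock itself), and re-lock only to serve a stale-but-recent value on failure. The skeleton of that pattern, with illustrative names:

use std::sync::Mutex;
use std::time::{Duration, Instant};

struct Cache { value: f32, at: Instant }

fn cached_or_read(cache: &Mutex<Cache>, read: impl Fn() -> Option<f32>) -> Option<f32> {
    {
        let c = cache.lock().ok()?;
        if c.at.elapsed() < Duration::from_secs(2) {
            return Some(c.value); // fresh enough, serve from cache
        }
    } // guard dropped here so `read` can take the lock itself
    match read() {
        Some(v) => Some(v),
        None => {
            // On failure, fall back to a recent cached value if one exists.
            let c = cache.lock().ok()?;
            (c.at.elapsed() < Duration::from_secs(60)).then(|| c.value)
        }
    }
}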
closed")) + } } } }