diff --git a/src/app.rs b/src/app.rs new file mode 100644 index 0000000..bc63735 --- /dev/null +++ b/src/app.rs @@ -0,0 +1,365 @@ +//! Application orchestrator and dependency manager +//! +//! This module provides a centralized Application struct that handles +//! initialization, lifecycle management, and component wiring. + +use anyhow::{Context, Result}; +use chrono::Utc; +use log::{info, error, warn, debug}; +use std::sync::{Arc, Mutex as StdMutex}; +use tokio::sync::{Mutex, mpsc}; +use tokio::task::{JoinHandle}; +use tokio::signal; + +use crate::camera::{self, CameraController, Frame, MeteorEvent}; +use crate::config::{self, Config}; +use crate::detection::{self, DetectionPipeline}; +use crate::events::{EventBus, GpsStatusEvent, EnvironmentEvent, FrameEvent, DetectionEvent}; +use crate::gps::{self, GpsController, GpsStatus}; +use crate::hooks::{self, HookManager}; +use crate::monitoring::{self, SystemMonitor}; +use crate::overlay::{self, watermark}; +use crate::sensors::{self, SensorController, EnvironmentData}; +use crate::storage::{self, StorageManager}; +use crate::streaming::{self, RtspServer}; +use crate::communication::{self, CommunicationManager}; + +/// Main application orchestrator +pub struct Application { + /// Application configuration + config: Config, + /// Event bus for component communication + event_bus: EventBus, + /// Collection of background tasks + tasks: Vec>, + /// Set to true when application shutdown has been requested + shutdown_requested: bool, + + // Core components + camera_controller: Option>>, + gps_controller: Option>>, + sensor_controller: Option>>, + detection_pipeline: Option>>, + hook_manager: Option>>, + watermark: Option>>, + storage_manager: Option, + communication_manager: Option>>, + rtsp_server: Option>>, + system_monitor: Option, +} + +impl Application { + /// Create a new application instance with the given configuration + pub fn new(config: Config) -> Self { + Self { + config, + event_bus: EventBus::new(), + tasks: Vec::new(), + shutdown_requested: false, + + camera_controller: None, + gps_controller: None, + sensor_controller: None, + detection_pipeline: None, + hook_manager: None, + watermark: None, + storage_manager: None, + communication_manager: None, + rtsp_server: None, + system_monitor: None, + } + } + + /// Initialize the application components + pub async fn initialize(&mut self) -> Result<()> { + info!("Initializing application components"); + + // Initialize camera + let camera_controller = camera::CameraController::new(&self.config) + .await + .context("Failed to initialize camera")?; + self.camera_controller = Some(Arc::new(Mutex::new(camera_controller))); + + // Initialize GPS + let gps_controller = gps::GpsController::new(&self.config) + .await + .context("Failed to initialize GPS")?; + self.gps_controller = Some(Arc::new(Mutex::new(gps_controller))); + + // Initialize sensors + let sensor_controller = sensors::SensorController::new(&self.config) + .await + .context("Failed to initialize sensors")?; + self.sensor_controller = Some(Arc::new(Mutex::new(sensor_controller))); + + // Initialize hook manager + let hook_manager = hooks::HookManager::new(); + self.hook_manager = Some(Arc::new(Mutex::new(hook_manager))); + + // Initialize watermark + // Note: This still uses the old approach with direct dependencies. + // In a future refactoring, this would be updated to use event subscriptions. + let watermark = { + let gps_status = self.gps_controller.as_ref().unwrap().lock().await.get_status(); + let env_data = self.sensor_controller.as_ref().unwrap().lock().await.get_current_data(); + + overlay::watermark::Watermark::new( + self.config.watermark.clone(), + Arc::new(StdMutex::new(env_data)), + Arc::new(StdMutex::new(gps_status)), + ) + }; + self.watermark = Some(Arc::new(Mutex::new(watermark))); + + // Initialize RTSP server if enabled + if self.config.rtsp.enabled { + let rtsp_server = streaming::RtspServer::new(self.config.rtsp.clone()); + self.rtsp_server = Some(Arc::new(Mutex::new(rtsp_server))); + } + + // Initialize detection pipeline + // Note: This still uses the original constructor that takes a direct dependency + // on the camera controller. In a future refactoring, this would be updated to + // use frame events instead. + let detection_pipeline = detection::DetectionPipeline::new( + self.camera_controller.as_ref().unwrap().clone(), + &self.config, + self.config.detection.pipeline.clone() + ).context("Failed to initialize detection pipeline")?; + self.detection_pipeline = Some(Arc::new(Mutex::new(detection_pipeline))); + + // Initialize storage + let storage_manager = storage::StorageManager::new(&self.config) + .await + .context("Failed to initialize storage")?; + self.storage_manager = Some(storage_manager); + + // Initialize communication + // Note: This still uses the original constructor with direct dependencies. + // In a future refactoring, this would be updated to use event subscriptions. + let communication_manager = communication::CommunicationManager::new( + &self.config, + self.camera_controller.as_ref().unwrap().clone(), + self.gps_controller.as_ref().unwrap().clone(), + ).await.context("Failed to initialize communication")?; + self.communication_manager = Some(Arc::new(Mutex::new(communication_manager))); + + // Initialize monitoring + let system_monitor = monitoring::SystemMonitor::new(&self.config) + .await + .context("Failed to initialize system monitor")?; + self.system_monitor = Some(system_monitor); + + info!("All application components initialized"); + Ok(()) + } + + /// Start application components and set up event handlers + pub async fn start(&mut self) -> Result<()> { + info!("Starting application components"); + + // Setup frame hook for watermark + { + if let (Some(hook_manager), Some(watermark)) = (&self.hook_manager, &self.watermark) { + let mut manager = hook_manager.lock().await; + let watermark_clone = watermark.clone(); + manager.register_hook(Box::new(hooks::BasicFrameHook::new( + "watermark", + "Watermark Overlay", + "Adds timestamp, GPS, and sensor data overlay to frames", + self.config.watermark.enabled, + move |frame, timestamp| { + // Using block_in_place to properly handle async operations in sync context + tokio::task::block_in_place(|| { + let mut guard = futures::executor::block_on(watermark_clone.lock()); + guard.apply(frame, timestamp) + }) + }, + ))); + } + } + + // Start sensors + if let Some(controller) = &self.sensor_controller { + controller.lock().await.initialize().await + .context("Failed to initialize sensors")?; + controller.lock().await.start().await + .context("Failed to start sensors")?; + + // Add task to bridge sensor data to event bus (publisher) + let event_sender = self.event_bus.environment_sender(); + let controller_clone = controller.clone(); + self.tasks.push(tokio::spawn(async move { + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1)); + + loop { + interval.tick().await; + + let env_data = match controller_clone.lock().await.get_current_data() { + data => data.clone(), + }; + + let event = EnvironmentEvent { + timestamp: Utc::now(), + temperature: env_data.temperature, + humidity: env_data.humidity, + sky_brightness: env_data.sky_brightness, + }; + + if let Err(e) = event_sender.send(event) { + warn!("Failed to publish environment event: {}", e); + } + } + })); + } + + // Start GPS + if let Some(controller) = &self.gps_controller { + controller.lock().await.initialize().await + .context("Failed to initialize GPS")?; + controller.lock().await.start().await + .context("Failed to start GPS")?; + + // Add task to bridge GPS data to event bus + let event_sender = self.event_bus.gps_sender(); + let controller_clone = controller.clone(); + self.tasks.push(tokio::spawn(async move { + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1)); + + loop { + interval.tick().await; + + let gps_status = match controller_clone.lock().await.get_status() { + status => status.clone(), + }; + + let event = GpsStatusEvent { + timestamp: Utc::now(), + latitude: gps_status.position.latitude, + longitude: gps_status.position.longitude, + altitude: gps_status.position.altitude, + azimuth: gps_status.camera_orientation.azimuth, + elevation: gps_status.camera_orientation.elevation, + sync_status: gps_status.sync_status.clone(), + time_accuracy_ms: gps_status.time_accuracy_ms, + satellites: gps_status.satellites, + }; + + if let Err(e) = event_sender.send(event) { + warn!("Failed to publish GPS event: {}", e); + } + } + })); + } + + // Start RTSP server if enabled + if let Some(rtsp_server) = &self.rtsp_server { + rtsp_server.lock().await.start().await + .context("Failed to start RTSP server")?; + info!("RTSP server started at {}", rtsp_server.lock().await.get_url()); + + // Add task to feed frames to RTSP server + // This would be replaced with an event subscription in a future refactoring + let server_clone = rtsp_server.clone(); + let rtsp_enabled = self.config.rtsp.enabled; + self.tasks.push(tokio::spawn(async move { + if rtsp_enabled { + info!("Starting RTSP streaming task"); + + // Implement a proper continuous loop to keep the task alive + loop { + // This would normally feed frames to the RTSP server + // For now, just sleep to keep the task alive + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } + } + })); + } + + // Start detection pipeline and connect it to the event bus + if let Some(pipeline) = &self.detection_pipeline { + // Spawn detection pipeline task + let pipeline_clone = pipeline.clone(); + self.tasks.push(tokio::spawn(async move { + if let Err(e) = pipeline_clone.lock().await.run().await { + error!("Detection pipeline error: {}", e); + } + })); + + // In a future refactoring, we'd add an adapter here to transform + // detection pipeline events into DetectionEvent messages on the event bus + } + + // Start communication manager + if let Some(comms) = &self.communication_manager { + // Spawn communication task using a cloned Arc> + let comms_clone = comms.clone(); + self.tasks.push(tokio::spawn(async move { + if let Err(e) = comms_clone.lock().await.run().await { + error!("Communication manager error: {}", e); + } + })); + + // Note: We've moved the communication manager into the task + // In a future event-based architecture, we'd have a CommunicationService + // that would act on events from the event bus instead of having direct + // dependencies on other components + } + + // Start system monitor + if let Some(monitor) = self.system_monitor.take() { + // Spawn monitor task and move ownership to the task + self.tasks.push(tokio::spawn(async move { + if let Err(e) = monitor.run().await { + error!("System monitor error: {}", e); + } + })); + + // Note: We've moved the system monitor into the task + // In a future event-based architecture, the monitoring would + // likely subscribe to events from other components + } + + info!("All application components started"); + Ok(()) + } + + /// Run the application until shutdown is requested + pub async fn run(&mut self) -> Result<()> { + info!("Application running, press Ctrl+C to exit"); + + // Wait for shutdown signal + match signal::ctrl_c().await { + Ok(()) => { + info!("Shutdown signal received, preparing to exit"); + self.shutdown_requested = true; + } + Err(err) => { + error!("Error waiting for Ctrl+C: {}", err); + } + } + + // Shutdown all tasks + for task in self.tasks.drain(..) { + task.abort(); + } + + info!("Application shutdown complete"); + Ok(()) + } + + /// Request application shutdown + pub fn request_shutdown(&mut self) { + self.shutdown_requested = true; + } + + /// Get a reference to the event bus + pub fn event_bus(&self) -> &EventBus { + &self.event_bus + } + + /// Get a mutable reference to the event bus + pub fn event_bus_mut(&mut self) -> &mut EventBus { + &mut self.event_bus + } +} diff --git a/src/events/mod.rs b/src/events/mod.rs new file mode 100644 index 0000000..83bf348 --- /dev/null +++ b/src/events/mod.rs @@ -0,0 +1,212 @@ +//! Event system for decoupled communication between components +//! +//! This module provides a publish-subscribe based event system that allows +//! components to communicate without direct dependencies on each other. + +use anyhow::Result; +use chrono::{DateTime, Utc}; +use log::debug; +use serde::{Serialize, Deserialize}; +use std::fmt; +use tokio::sync::broadcast; + +/// Maximum capacity for broadcast channels +const DEFAULT_CHANNEL_CAPACITY: usize = 32; + +/// GPS position update event +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GpsStatusEvent { + /// Timestamp of the update + pub timestamp: DateTime, + /// Latitude in decimal degrees + pub latitude: f64, + /// Longitude in decimal degrees + pub longitude: f64, + /// Altitude in meters + pub altitude: f64, + /// Azimuth/heading in degrees + pub azimuth: f64, + /// Elevation in degrees + pub elevation: f64, + /// Time synchronization status + pub sync_status: String, + /// Estimated time accuracy in milliseconds + pub time_accuracy_ms: f64, + /// Number of satellites + pub satellites: u8, +} + +impl fmt::Display for GpsStatusEvent { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "GPS({:.6},{:.6},{:.1}m)", + self.latitude, self.longitude, self.altitude) + } +} + +/// Environment sensor data update event +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EnvironmentEvent { + /// Timestamp of the update + pub timestamp: DateTime, + /// Temperature in Celsius + pub temperature: f32, + /// Humidity as percentage (0-100) + pub humidity: f32, + /// Sky brightness level (0-1, where 0 is completely dark) + pub sky_brightness: f32, +} + +impl fmt::Display for EnvironmentEvent { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Env({:.1}°C, {:.1}%, brightness={:.2})", + self.temperature, self.humidity, self.sky_brightness) + } +} + +/// Frame available event +#[derive(Debug, Clone)] +pub struct FrameEvent { + /// Timestamp of the frame + pub timestamp: DateTime, + /// Frame index + pub index: u64, + /// Frame data - using a Vec here to simplify the event system + /// In a real implementation, you might use a more efficient representation + /// or an ID that can be used to fetch the frame from a buffer + pub frame_id: u64, +} + +/// Detection event +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DetectionEvent { + /// Timestamp of the detection + pub timestamp: DateTime, + /// Confidence score (0.0-1.0) + pub confidence: f32, + /// Detector ID that triggered the detection + pub detector_id: String, + /// Bounding box in format (x, y, width, height) + pub bounding_box: Option<(i32, i32, i32, i32)>, + /// Frame index that triggered the detection + pub frame_index: u64, + /// Start frame index (for the event buffer) + pub start_frame: u64, + /// End frame index (for the event buffer) + pub end_frame: u64, +} + +impl fmt::Display for DetectionEvent { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Detection({}, conf={:.2})", + self.detector_id, self.confidence) + } +} + +/// Event bus for managing various event channels +#[derive(Debug, Clone)] +pub struct EventBus { + /// GPS status update channel + gps_tx: broadcast::Sender, + /// Environment data update channel + env_tx: broadcast::Sender, + /// Frame available channel + frame_tx: broadcast::Sender, + /// Detection event channel + detection_tx: broadcast::Sender, +} + +impl EventBus { + /// Create a new event bus with default channel capacities + pub fn new() -> Self { + Self::with_capacity(DEFAULT_CHANNEL_CAPACITY) + } + + /// Create a new event bus with specified channel capacity + pub fn with_capacity(capacity: usize) -> Self { + let (gps_tx, _) = broadcast::channel(capacity); + let (env_tx, _) = broadcast::channel(capacity); + let (frame_tx, _) = broadcast::channel(capacity); + let (detection_tx, _) = broadcast::channel(capacity); + + Self { + gps_tx, + env_tx, + frame_tx, + detection_tx, + } + } + + /// Get a GPS status event sender + pub fn gps_sender(&self) -> broadcast::Sender { + self.gps_tx.clone() + } + + /// Subscribe to GPS status events + pub fn subscribe_gps(&self) -> broadcast::Receiver { + self.gps_tx.subscribe() + } + + /// Get an environment event sender + pub fn environment_sender(&self) -> broadcast::Sender { + self.env_tx.clone() + } + + /// Subscribe to environment events + pub fn subscribe_environment(&self) -> broadcast::Receiver { + self.env_tx.subscribe() + } + + /// Get a frame event sender + pub fn frame_sender(&self) -> broadcast::Sender { + self.frame_tx.clone() + } + + /// Subscribe to frame events + pub fn subscribe_frames(&self) -> broadcast::Receiver { + self.frame_tx.subscribe() + } + + /// Get a detection event sender + pub fn detection_sender(&self) -> broadcast::Sender { + self.detection_tx.clone() + } + + /// Subscribe to detection events + pub fn subscribe_detections(&self) -> broadcast::Receiver { + self.detection_tx.subscribe() + } + + /// Publish a GPS status event + pub fn publish_gps(&self, event: GpsStatusEvent) -> Result<()> { + let count = self.gps_tx.send(event.clone())?; + debug!("Published GPS event {} to {} receivers", event, count); + Ok(()) + } + + /// Publish an environment event + pub fn publish_environment(&self, event: EnvironmentEvent) -> Result<()> { + let count = self.env_tx.send(event.clone())?; + debug!("Published environment event {} to {} receivers", event, count); + Ok(()) + } + + /// Publish a frame event + pub fn publish_frame(&self, event: FrameEvent) -> Result<()> { + let count = self.frame_tx.send(event)?; + debug!("Published frame event to {} receivers", count); + Ok(()) + } + + /// Publish a detection event + pub fn publish_detection(&self, event: DetectionEvent) -> Result<()> { + let count = self.detection_tx.send(event.clone())?; + debug!("Published detection event {} to {} receivers", event, count); + Ok(()) + } +} + +impl Default for EventBus { + fn default() -> Self { + Self::new() + } +} diff --git a/src/main.rs b/src/main.rs index eb49b36..702f06b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ mod camera; mod config; mod detection; +mod events; mod gps; mod hooks; mod overlay; @@ -25,6 +26,8 @@ pub use config::Config; use meteor_detect::PLATFORM_SUPPORTS_GPIO; use crate::overlay::Watermark; +mod app; + /// Main entry point for the meteor detection system #[tokio::main] async fn main() -> Result<()> { @@ -53,168 +56,21 @@ async fn main() -> Result<()> { info!("Loaded configuration with device ID: {}", config.device.id); - // Initialize camera subsystem - let camera_controller = camera::CameraController::new(&config) - .await - .context("Failed to initialize camera")?; - let camera_controller = Arc::new(Mutex::new(camera_controller)); + // Create the application + let mut application = app::Application::new(config); - // Initialize GPS module - let gps_controller = gps::GpsController::new(&config) - .await - .context("Failed to initialize GPS")?; - let gps_controller = Arc::new(Mutex::new(gps_controller)); + // Initialize all components + application.initialize().await + .context("Failed to initialize application components")?; - // Initialize sensor controller - let sensor_controller = sensors::SensorController::new(&config) - .await - .context("Failed to initialize sensors")?; - let sensor_controller = Arc::new(Mutex::new(sensor_controller)); + // Start all components and set up event handlers + application.start().await + .context("Failed to start application components")?; - // Initialize watermark overlay - let watermark = { - let gps_status = gps_controller.lock().await.get_status(); - let env_data = sensor_controller.lock().await.get_current_data(); - - overlay::watermark::Watermark::new( - config.watermark.clone(), - Arc::new(StdMutex::new(env_data)), - Arc::new(StdMutex::new(gps_status)), - ) - }; - let watermark = Arc::new(Mutex::new(watermark)); + // Run the application until shutdown + application.run().await + .context("Application runtime error")?; - // Initialize frame hook manager - let hook_manager = hooks::HookManager::new(); - let hook_manager = Arc::new(Mutex::new(hook_manager)); - - // Initialize RTSP streaming server - let rtsp_server = streaming::RtspServer::new(config.rtsp.clone()); - let rtsp_server = Arc::new(Mutex::new(rtsp_server)); - - // Initialize detection pipeline - let detection_pipeline = detection::DetectionPipeline::new( - camera_controller.clone(), - &config.clone(), - config.detection.pipeline.clone() - ).context("Failed to initialize detection pipeline")?; - - // Initialize storage system - let storage_manager = storage::StorageManager::new(&(config.clone())) - .await - .context("Failed to initialize storage")?; - - // Initialize communication module - let mut comms = communication::CommunicationManager::new( - &config, - camera_controller.clone(), - gps_controller.clone(), - ).await.context("Failed to initialize communication")?; - - // Initialize health monitoring - let monitor = monitoring::SystemMonitor::new(&config) - .await - .context("Failed to initialize system monitor")?; - - // Start all subsystems - info!("All subsystems initialized, starting main processing loop"); - - // Initialize sensors - sensor_controller.lock().await.initialize().await - .context("Failed to initialize sensors")?; - sensor_controller.lock().await.start().await - .context("Failed to start sensors")?; - - // Initialize GPS - gps_controller.lock().await.initialize().await - .context("Failed to initialize GPS")?; - gps_controller.lock().await.start().await - .context("Failed to start GPS")?; - - // Start RTSP server if enabled - if config.rtsp.enabled { - rtsp_server.lock().await.start().await - .context("Failed to start RTSP server")?; - info!("RTSP server started at {}", rtsp_server.lock().await.get_url()); - } - - // Run the main event loop - let mut tasks = Vec::new(); - - // Add watermark hook - { - let mut manager = hook_manager.lock().await; - let watermark_clone = watermark.clone(); - manager.register_hook(Box::new(hooks::BasicFrameHook::new( - "watermark", - "Watermark Overlay", - "Adds timestamp, GPS, and sensor data overlay to frames", - config.watermark.enabled, - move |frame, timestamp| { - // Using block_in_place to properly handle async operations in sync context - tokio::task::block_in_place(|| { - let mut guard = futures::executor::block_on(watermark_clone.lock()); - guard.apply(frame, timestamp) - }) - }, - ))); - } - - // Spawn detection pipeline task - tasks.push(tokio::spawn(async move { - if let Err(e) = detection_pipeline.run().await { - error!("Detection pipeline error: {}", e); - } - })); - - // Spawn communication manager task - tasks.push(tokio::spawn(async move { - if let Err(e) = comms.run().await { - error!("Communication manager error: {}", e); - } - })); - - // Spawn system monitor task - tasks.push(tokio::spawn(async move { - if let Err(e) = monitor.run().await { - error!("System monitor error: {}", e); - } - })); - - // Add RTSP streaming task with proper loop to keep it alive - let rtsp_config_enabled = config.rtsp.enabled; - tasks.push(tokio::spawn(async move { - if rtsp_config_enabled { - info!("Starting RTSP streaming task"); - - // Implement a proper continuous loop to keep the task alive - loop { - // This would normally feed frames to the RTSP server - // For now, just sleep to keep the task alive - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - - // Add some actual implementation here later - } - } - })); - - // Wait for the Ctrl+C signal - info!("Press Ctrl+C to stop the application"); - match signal::ctrl_c().await { - Ok(()) => { - info!("Shutdown signal received, preparing to exit"); - // Here you would add cleanup code - } - Err(err) => { - error!("Error waiting for Ctrl+C: {}", err); - } - } - - // Now that we've received a shutdown signal, cancel all tasks - for task in tasks { - task.abort(); - } - - info!("Meteor detection system shutting down"); + info!("Meteor detection system shutdown complete"); Ok(()) } diff --git a/src/overlay/event_watermark.rs b/src/overlay/event_watermark.rs new file mode 100644 index 0000000..54bb998 --- /dev/null +++ b/src/overlay/event_watermark.rs @@ -0,0 +1,312 @@ +//! Event-based watermark implementation +//! +//! This module provides a version of the watermark component that uses +//! the event bus to receive updates instead of directly accessing shared state. + +use anyhow::{Context, Result}; +use chrono::{DateTime, Utc}; +use log::{debug, error, info, warn}; +use opencv::{core, imgproc, prelude::*, types}; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use tokio::sync::{Mutex, RwLock}; + +use crate::events::{EventBus, GpsStatusEvent, EnvironmentEvent}; +use crate::gps::{GeoPosition, GpsStatus, CameraOrientation}; +use crate::overlay::watermark::{WatermarkOptions, WatermarkPosition, WatermarkContent}; +use crate::sensors::EnvironmentData; + +/// Event-based watermark for overlaying information on video frames +pub struct EventWatermark { + /// Watermark configuration options + options: WatermarkOptions, + /// Current environment data (updated via events) + environment: Arc>, + /// Current GPS status (updated via events) + gps_status: Arc>, +} + +impl EventWatermark { + /// Create a new event-based watermark with the given options + pub async fn new( + options: WatermarkOptions, + event_bus: &EventBus, + ) -> Self { + // Create empty data stores + let environment = Arc::new(RwLock::new(EnvironmentData { + temperature: 0.0, + humidity: 0.0, + sky_brightness: 0.0, + timestamp: Utc::now(), + })); + + let gps_status = Arc::new(RwLock::new(GpsStatus { + timestamp: Utc::now(), + position: GeoPosition { + latitude: 0.0, + longitude: 0.0, + altitude: 0.0, + }, + camera_orientation: CameraOrientation { + azimuth: 0.0, + elevation: 0.0, + }, + sync_status: "NoSync".to_string(), + time_accuracy_ms: 0.0, + satellites: 0, + })); + + // Setup event subscriptions + Self::setup_subscriptions(event_bus, environment.clone(), gps_status.clone()); + + Self { + options, + environment, + gps_status, + } + } + + /// Setup event subscriptions + fn setup_subscriptions( + event_bus: &EventBus, + environment: Arc>, + gps_status: Arc>, + ) { + // Subscribe to environment events + let mut env_rx = event_bus.subscribe_environment(); + let env_data = environment.clone(); + tokio::spawn(async move { + while let Ok(event) = env_rx.recv().await { + let mut data = env_data.write().await; + data.temperature = event.temperature; + data.humidity = event.humidity; + data.sky_brightness = event.sky_brightness; + data.timestamp = event.timestamp; + debug!("Updated environment data: temp={}, humidity={}, brightness={}", + event.temperature, event.humidity, event.sky_brightness); + } + }); + + // Subscribe to GPS events + let mut gps_rx = event_bus.subscribe_gps(); + let gps_data = gps_status.clone(); + tokio::spawn(async move { + while let Ok(event) = gps_rx.recv().await { + let mut status = gps_data.write().await; + status.timestamp = event.timestamp; + status.position.latitude = event.latitude; + status.position.longitude = event.longitude; + status.position.altitude = event.altitude; + status.camera_orientation.azimuth = event.azimuth; + status.camera_orientation.elevation = event.elevation; + status.fix_quality = event.fix_quality; + status.satellites = event.satellites; + debug!("Updated GPS status: lat={}, lon={}", + event.latitude, event.longitude); + } + }); + } + + /// Update watermark options + pub fn set_options(&mut self, options: WatermarkOptions) { + self.options = options; + } + + /// Convert GPS coordinates to formatted string + async fn format_coordinates(&self) -> String { + let position = { + let gps = self.gps_status.read().await; + gps.position.clone() + }; + + if self.options.coordinate_format == "dms" { + // Convert decimal degrees to degrees, minutes, seconds + let lat_deg = position.latitude.abs().trunc() as i32; + let lat_min = ((position.latitude.abs() - lat_deg as f64) * 60.0).trunc() as i32; + let lat_sec = ((position.latitude.abs() - lat_deg as f64 - lat_min as f64 / 60.0) * 3600.0).round() as i32; + let lat_dir = if position.latitude >= 0.0 { "N" } else { "S" }; + + let lon_deg = position.longitude.abs().trunc() as i32; + let lon_min = ((position.longitude.abs() - lon_deg as f64) * 60.0).trunc() as i32; + let lon_sec = ((position.longitude.abs() - lon_deg as f64 - lon_min as f64 / 60.0) * 3600.0).round() as i32; + let lon_dir = if position.longitude >= 0.0 { "E" } else { "W" }; + + format!( + "{}°{:02}'{:02}\"{} {}°{:02}'{:02}\"{} Alt: {:.1}m", + lat_deg, lat_min, lat_sec, lat_dir, + lon_deg, lon_min, lon_sec, lon_dir, + position.altitude + ) + } else { + // Decimal degrees format + format!( + "Lat: {:.6}° Lon: {:.6}° Alt: {:.1}m", + position.latitude, position.longitude, position.altitude + ) + } + } + + /// Convert temperature to formatted string based on options + async fn format_temperature(&self) -> String { + let temp_c = { + let env = self.environment.read().await; + env.temperature + }; + + if self.options.temperature_format == "F" { + // Convert to Fahrenheit + let temp_f = temp_c * 9.0 / 5.0 + 32.0; + format!("{:.1}°F", temp_f) + } else { + // Default to Celsius + format!("{:.1}°C", temp_c) + } + } + + /// Apply watermark to the given frame + pub async fn apply(&self, frame: &mut core::Mat, timestamp: DateTime) -> Result<()> { + if !self.options.enabled { + return Ok(()); + } + + // Build the text lines to display + let mut lines = Vec::new(); + + for content in &self.options.content { + match content { + WatermarkContent::Timestamp => { + lines.push(timestamp.format(&self.options.time_format).to_string()); + }, + WatermarkContent::GpsCoordinates => { + lines.push(self.format_coordinates().await); + }, + WatermarkContent::Environment => { + let humidity = { + let env = self.environment.read().await; + env.humidity + }; + + lines.push(format!( + "Temp: {} Humidity: {:.1}%", + self.format_temperature().await, + humidity + )); + }, + WatermarkContent::CameraOrientation => { + let orientation = { + let gps = self.gps_status.read().await; + gps.camera_orientation.clone() + }; + + lines.push(format!( + "Az: {:.1}° El: {:.1}°", + orientation.azimuth, + orientation.elevation + )); + }, + WatermarkContent::Custom(text) => { + lines.push(text.clone()); + }, + } + } + + // Skip if no content + if lines.is_empty() { + return Ok(()); + } + + // Get frame dimensions + let width = frame.cols(); + let height = frame.rows(); + + // Calculate text size and position + let font = imgproc::FONT_HERSHEY_SIMPLEX; + let font_scale = self.options.font_scale; + let thickness = self.options.thickness; + let padding = self.options.padding; + + // Calculate total height of all lines + let mut line_heights = Vec::with_capacity(lines.len()); + let mut max_width = 0; + + for line in &lines { + let size = imgproc::get_text_size(line, font, font_scale, thickness, &mut 0)?; + line_heights.push(size.height); + max_width = max_width.max(size.width); + } + + let total_height: i32 = line_heights.iter().sum(); + let line_spacing = 4; // Space between lines + let total_space_height = line_spacing * (lines.len() as i32 - 1); + let text_block_height = total_height + total_space_height + padding * 2; + let text_block_width = max_width + padding * 2; + + // Calculate watermark position + let (x, y) = match self.options.position { + WatermarkPosition::TopLeft => (padding, padding), + WatermarkPosition::TopRight => (width - text_block_width - padding, padding), + WatermarkPosition::BottomLeft => (padding, height - text_block_height - padding), + WatermarkPosition::BottomRight => ( + width - text_block_width - padding, + height - text_block_height - padding + ), + WatermarkPosition::Custom(x, y) => (x as i32, y as i32), + }; + + // Draw background rectangle if enabled + if self.options.background { + let bg_color = core::Scalar::new( + self.options.background_color.0 as f64, + self.options.background_color.1 as f64, + self.options.background_color.2 as f64, + self.options.background_color.3 as f64, + ); + + let rect = core::Rect::new( + x, + y, + text_block_width, + text_block_height + ); + + imgproc::rectangle( + frame, + rect, + bg_color, + -1, // Fill + imgproc::LINE_8, + 0, + )?; + } + + // Draw text lines + let text_color = core::Scalar::new( + self.options.color.0 as f64, + self.options.color.1 as f64, + self.options.color.2 as f64, + self.options.color.3 as f64, + ); + + let mut current_y = y + padding + line_heights[0]; + + for (i, line) in lines.iter().enumerate() { + imgproc::put_text( + frame, + line, + core::Point::new(x + padding, current_y), + font, + font_scale, + text_color, + thickness, + imgproc::LINE_AA, + false, + )?; + + if i < lines.len() - 1 { + current_y += line_heights[i] + line_spacing; + } + } + + Ok(()) + } +}