This commit is contained in:
grabbit 2025-04-10 00:34:13 +08:00
parent b45f2d198f
commit 07d99d009f
4 changed files with 904 additions and 159 deletions

365
src/app.rs Normal file
View File

@ -0,0 +1,365 @@
//! Application orchestrator and dependency manager
//!
//! This module provides a centralized Application struct that handles
//! initialization, lifecycle management, and component wiring.
use anyhow::{Context, Result};
use chrono::Utc;
use log::{info, error, warn, debug};
use std::sync::{Arc, Mutex as StdMutex};
use tokio::sync::{Mutex, mpsc};
use tokio::task::{JoinHandle};
use tokio::signal;
use crate::camera::{self, CameraController, Frame, MeteorEvent};
use crate::config::{self, Config};
use crate::detection::{self, DetectionPipeline};
use crate::events::{EventBus, GpsStatusEvent, EnvironmentEvent, FrameEvent, DetectionEvent};
use crate::gps::{self, GpsController, GpsStatus};
use crate::hooks::{self, HookManager};
use crate::monitoring::{self, SystemMonitor};
use crate::overlay::{self, watermark};
use crate::sensors::{self, SensorController, EnvironmentData};
use crate::storage::{self, StorageManager};
use crate::streaming::{self, RtspServer};
use crate::communication::{self, CommunicationManager};
/// Main application orchestrator
pub struct Application {
/// Application configuration
config: Config,
/// Event bus for component communication
event_bus: EventBus,
/// Collection of background tasks
tasks: Vec<JoinHandle<()>>,
/// Set to true when application shutdown has been requested
shutdown_requested: bool,
// Core components
camera_controller: Option<Arc<Mutex<CameraController>>>,
gps_controller: Option<Arc<Mutex<GpsController>>>,
sensor_controller: Option<Arc<Mutex<SensorController>>>,
detection_pipeline: Option<Arc<Mutex<detection::DetectionPipeline>>>,
hook_manager: Option<Arc<Mutex<HookManager>>>,
watermark: Option<Arc<Mutex<watermark::Watermark>>>,
storage_manager: Option<storage::StorageManager>,
communication_manager: Option<Arc<Mutex<communication::CommunicationManager>>>,
rtsp_server: Option<Arc<Mutex<RtspServer>>>,
system_monitor: Option<monitoring::SystemMonitor>,
}
impl Application {
/// Create a new application instance with the given configuration
pub fn new(config: Config) -> Self {
Self {
config,
event_bus: EventBus::new(),
tasks: Vec::new(),
shutdown_requested: false,
camera_controller: None,
gps_controller: None,
sensor_controller: None,
detection_pipeline: None,
hook_manager: None,
watermark: None,
storage_manager: None,
communication_manager: None,
rtsp_server: None,
system_monitor: None,
}
}
/// Initialize the application components
pub async fn initialize(&mut self) -> Result<()> {
info!("Initializing application components");
// Initialize camera
let camera_controller = camera::CameraController::new(&self.config)
.await
.context("Failed to initialize camera")?;
self.camera_controller = Some(Arc::new(Mutex::new(camera_controller)));
// Initialize GPS
let gps_controller = gps::GpsController::new(&self.config)
.await
.context("Failed to initialize GPS")?;
self.gps_controller = Some(Arc::new(Mutex::new(gps_controller)));
// Initialize sensors
let sensor_controller = sensors::SensorController::new(&self.config)
.await
.context("Failed to initialize sensors")?;
self.sensor_controller = Some(Arc::new(Mutex::new(sensor_controller)));
// Initialize hook manager
let hook_manager = hooks::HookManager::new();
self.hook_manager = Some(Arc::new(Mutex::new(hook_manager)));
// Initialize watermark
// Note: This still uses the old approach with direct dependencies.
// In a future refactoring, this would be updated to use event subscriptions.
let watermark = {
let gps_status = self.gps_controller.as_ref().unwrap().lock().await.get_status();
let env_data = self.sensor_controller.as_ref().unwrap().lock().await.get_current_data();
overlay::watermark::Watermark::new(
self.config.watermark.clone(),
Arc::new(StdMutex::new(env_data)),
Arc::new(StdMutex::new(gps_status)),
)
};
self.watermark = Some(Arc::new(Mutex::new(watermark)));
// Initialize RTSP server if enabled
if self.config.rtsp.enabled {
let rtsp_server = streaming::RtspServer::new(self.config.rtsp.clone());
self.rtsp_server = Some(Arc::new(Mutex::new(rtsp_server)));
}
// Initialize detection pipeline
// Note: This still uses the original constructor that takes a direct dependency
// on the camera controller. In a future refactoring, this would be updated to
// use frame events instead.
let detection_pipeline = detection::DetectionPipeline::new(
self.camera_controller.as_ref().unwrap().clone(),
&self.config,
self.config.detection.pipeline.clone()
).context("Failed to initialize detection pipeline")?;
self.detection_pipeline = Some(Arc::new(Mutex::new(detection_pipeline)));
// Initialize storage
let storage_manager = storage::StorageManager::new(&self.config)
.await
.context("Failed to initialize storage")?;
self.storage_manager = Some(storage_manager);
// Initialize communication
// Note: This still uses the original constructor with direct dependencies.
// In a future refactoring, this would be updated to use event subscriptions.
let communication_manager = communication::CommunicationManager::new(
&self.config,
self.camera_controller.as_ref().unwrap().clone(),
self.gps_controller.as_ref().unwrap().clone(),
).await.context("Failed to initialize communication")?;
self.communication_manager = Some(Arc::new(Mutex::new(communication_manager)));
// Initialize monitoring
let system_monitor = monitoring::SystemMonitor::new(&self.config)
.await
.context("Failed to initialize system monitor")?;
self.system_monitor = Some(system_monitor);
info!("All application components initialized");
Ok(())
}
/// Start application components and set up event handlers
pub async fn start(&mut self) -> Result<()> {
info!("Starting application components");
// Setup frame hook for watermark
{
if let (Some(hook_manager), Some(watermark)) = (&self.hook_manager, &self.watermark) {
let mut manager = hook_manager.lock().await;
let watermark_clone = watermark.clone();
manager.register_hook(Box::new(hooks::BasicFrameHook::new(
"watermark",
"Watermark Overlay",
"Adds timestamp, GPS, and sensor data overlay to frames",
self.config.watermark.enabled,
move |frame, timestamp| {
// Using block_in_place to properly handle async operations in sync context
tokio::task::block_in_place(|| {
let mut guard = futures::executor::block_on(watermark_clone.lock());
guard.apply(frame, timestamp)
})
},
)));
}
}
// Start sensors
if let Some(controller) = &self.sensor_controller {
controller.lock().await.initialize().await
.context("Failed to initialize sensors")?;
controller.lock().await.start().await
.context("Failed to start sensors")?;
// Add task to bridge sensor data to event bus (publisher)
let event_sender = self.event_bus.environment_sender();
let controller_clone = controller.clone();
self.tasks.push(tokio::spawn(async move {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
loop {
interval.tick().await;
let env_data = match controller_clone.lock().await.get_current_data() {
data => data.clone(),
};
let event = EnvironmentEvent {
timestamp: Utc::now(),
temperature: env_data.temperature,
humidity: env_data.humidity,
sky_brightness: env_data.sky_brightness,
};
if let Err(e) = event_sender.send(event) {
warn!("Failed to publish environment event: {}", e);
}
}
}));
}
// Start GPS
if let Some(controller) = &self.gps_controller {
controller.lock().await.initialize().await
.context("Failed to initialize GPS")?;
controller.lock().await.start().await
.context("Failed to start GPS")?;
// Add task to bridge GPS data to event bus
let event_sender = self.event_bus.gps_sender();
let controller_clone = controller.clone();
self.tasks.push(tokio::spawn(async move {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
loop {
interval.tick().await;
let gps_status = match controller_clone.lock().await.get_status() {
status => status.clone(),
};
let event = GpsStatusEvent {
timestamp: Utc::now(),
latitude: gps_status.position.latitude,
longitude: gps_status.position.longitude,
altitude: gps_status.position.altitude,
azimuth: gps_status.camera_orientation.azimuth,
elevation: gps_status.camera_orientation.elevation,
sync_status: gps_status.sync_status.clone(),
time_accuracy_ms: gps_status.time_accuracy_ms,
satellites: gps_status.satellites,
};
if let Err(e) = event_sender.send(event) {
warn!("Failed to publish GPS event: {}", e);
}
}
}));
}
// Start RTSP server if enabled
if let Some(rtsp_server) = &self.rtsp_server {
rtsp_server.lock().await.start().await
.context("Failed to start RTSP server")?;
info!("RTSP server started at {}", rtsp_server.lock().await.get_url());
// Add task to feed frames to RTSP server
// This would be replaced with an event subscription in a future refactoring
let server_clone = rtsp_server.clone();
let rtsp_enabled = self.config.rtsp.enabled;
self.tasks.push(tokio::spawn(async move {
if rtsp_enabled {
info!("Starting RTSP streaming task");
// Implement a proper continuous loop to keep the task alive
loop {
// This would normally feed frames to the RTSP server
// For now, just sleep to keep the task alive
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
}));
}
// Start detection pipeline and connect it to the event bus
if let Some(pipeline) = &self.detection_pipeline {
// Spawn detection pipeline task
let pipeline_clone = pipeline.clone();
self.tasks.push(tokio::spawn(async move {
if let Err(e) = pipeline_clone.lock().await.run().await {
error!("Detection pipeline error: {}", e);
}
}));
// In a future refactoring, we'd add an adapter here to transform
// detection pipeline events into DetectionEvent messages on the event bus
}
// Start communication manager
if let Some(comms) = &self.communication_manager {
// Spawn communication task using a cloned Arc<Mutex<...>>
let comms_clone = comms.clone();
self.tasks.push(tokio::spawn(async move {
if let Err(e) = comms_clone.lock().await.run().await {
error!("Communication manager error: {}", e);
}
}));
// Note: We've moved the communication manager into the task
// In a future event-based architecture, we'd have a CommunicationService
// that would act on events from the event bus instead of having direct
// dependencies on other components
}
// Start system monitor
if let Some(monitor) = self.system_monitor.take() {
// Spawn monitor task and move ownership to the task
self.tasks.push(tokio::spawn(async move {
if let Err(e) = monitor.run().await {
error!("System monitor error: {}", e);
}
}));
// Note: We've moved the system monitor into the task
// In a future event-based architecture, the monitoring would
// likely subscribe to events from other components
}
info!("All application components started");
Ok(())
}
/// Run the application until shutdown is requested
pub async fn run(&mut self) -> Result<()> {
info!("Application running, press Ctrl+C to exit");
// Wait for shutdown signal
match signal::ctrl_c().await {
Ok(()) => {
info!("Shutdown signal received, preparing to exit");
self.shutdown_requested = true;
}
Err(err) => {
error!("Error waiting for Ctrl+C: {}", err);
}
}
// Shutdown all tasks
for task in self.tasks.drain(..) {
task.abort();
}
info!("Application shutdown complete");
Ok(())
}
/// Request application shutdown
pub fn request_shutdown(&mut self) {
self.shutdown_requested = true;
}
/// Get a reference to the event bus
pub fn event_bus(&self) -> &EventBus {
&self.event_bus
}
/// Get a mutable reference to the event bus
pub fn event_bus_mut(&mut self) -> &mut EventBus {
&mut self.event_bus
}
}

212
src/events/mod.rs Normal file
View File

@ -0,0 +1,212 @@
//! Event system for decoupled communication between components
//!
//! This module provides a publish-subscribe based event system that allows
//! components to communicate without direct dependencies on each other.
use anyhow::Result;
use chrono::{DateTime, Utc};
use log::debug;
use serde::{Serialize, Deserialize};
use std::fmt;
use tokio::sync::broadcast;
/// Maximum capacity for broadcast channels
const DEFAULT_CHANNEL_CAPACITY: usize = 32;
/// GPS position update event
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpsStatusEvent {
/// Timestamp of the update
pub timestamp: DateTime<Utc>,
/// Latitude in decimal degrees
pub latitude: f64,
/// Longitude in decimal degrees
pub longitude: f64,
/// Altitude in meters
pub altitude: f64,
/// Azimuth/heading in degrees
pub azimuth: f64,
/// Elevation in degrees
pub elevation: f64,
/// Time synchronization status
pub sync_status: String,
/// Estimated time accuracy in milliseconds
pub time_accuracy_ms: f64,
/// Number of satellites
pub satellites: u8,
}
impl fmt::Display for GpsStatusEvent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "GPS({:.6},{:.6},{:.1}m)",
self.latitude, self.longitude, self.altitude)
}
}
/// Environment sensor data update event
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnvironmentEvent {
/// Timestamp of the update
pub timestamp: DateTime<Utc>,
/// Temperature in Celsius
pub temperature: f32,
/// Humidity as percentage (0-100)
pub humidity: f32,
/// Sky brightness level (0-1, where 0 is completely dark)
pub sky_brightness: f32,
}
impl fmt::Display for EnvironmentEvent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Env({:.1}°C, {:.1}%, brightness={:.2})",
self.temperature, self.humidity, self.sky_brightness)
}
}
/// Frame available event
#[derive(Debug, Clone)]
pub struct FrameEvent {
/// Timestamp of the frame
pub timestamp: DateTime<Utc>,
/// Frame index
pub index: u64,
/// Frame data - using a Vec<u8> here to simplify the event system
/// In a real implementation, you might use a more efficient representation
/// or an ID that can be used to fetch the frame from a buffer
pub frame_id: u64,
}
/// Detection event
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DetectionEvent {
/// Timestamp of the detection
pub timestamp: DateTime<Utc>,
/// Confidence score (0.0-1.0)
pub confidence: f32,
/// Detector ID that triggered the detection
pub detector_id: String,
/// Bounding box in format (x, y, width, height)
pub bounding_box: Option<(i32, i32, i32, i32)>,
/// Frame index that triggered the detection
pub frame_index: u64,
/// Start frame index (for the event buffer)
pub start_frame: u64,
/// End frame index (for the event buffer)
pub end_frame: u64,
}
impl fmt::Display for DetectionEvent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Detection({}, conf={:.2})",
self.detector_id, self.confidence)
}
}
/// Event bus for managing various event channels
#[derive(Debug, Clone)]
pub struct EventBus {
/// GPS status update channel
gps_tx: broadcast::Sender<GpsStatusEvent>,
/// Environment data update channel
env_tx: broadcast::Sender<EnvironmentEvent>,
/// Frame available channel
frame_tx: broadcast::Sender<FrameEvent>,
/// Detection event channel
detection_tx: broadcast::Sender<DetectionEvent>,
}
impl EventBus {
/// Create a new event bus with default channel capacities
pub fn new() -> Self {
Self::with_capacity(DEFAULT_CHANNEL_CAPACITY)
}
/// Create a new event bus with specified channel capacity
pub fn with_capacity(capacity: usize) -> Self {
let (gps_tx, _) = broadcast::channel(capacity);
let (env_tx, _) = broadcast::channel(capacity);
let (frame_tx, _) = broadcast::channel(capacity);
let (detection_tx, _) = broadcast::channel(capacity);
Self {
gps_tx,
env_tx,
frame_tx,
detection_tx,
}
}
/// Get a GPS status event sender
pub fn gps_sender(&self) -> broadcast::Sender<GpsStatusEvent> {
self.gps_tx.clone()
}
/// Subscribe to GPS status events
pub fn subscribe_gps(&self) -> broadcast::Receiver<GpsStatusEvent> {
self.gps_tx.subscribe()
}
/// Get an environment event sender
pub fn environment_sender(&self) -> broadcast::Sender<EnvironmentEvent> {
self.env_tx.clone()
}
/// Subscribe to environment events
pub fn subscribe_environment(&self) -> broadcast::Receiver<EnvironmentEvent> {
self.env_tx.subscribe()
}
/// Get a frame event sender
pub fn frame_sender(&self) -> broadcast::Sender<FrameEvent> {
self.frame_tx.clone()
}
/// Subscribe to frame events
pub fn subscribe_frames(&self) -> broadcast::Receiver<FrameEvent> {
self.frame_tx.subscribe()
}
/// Get a detection event sender
pub fn detection_sender(&self) -> broadcast::Sender<DetectionEvent> {
self.detection_tx.clone()
}
/// Subscribe to detection events
pub fn subscribe_detections(&self) -> broadcast::Receiver<DetectionEvent> {
self.detection_tx.subscribe()
}
/// Publish a GPS status event
pub fn publish_gps(&self, event: GpsStatusEvent) -> Result<()> {
let count = self.gps_tx.send(event.clone())?;
debug!("Published GPS event {} to {} receivers", event, count);
Ok(())
}
/// Publish an environment event
pub fn publish_environment(&self, event: EnvironmentEvent) -> Result<()> {
let count = self.env_tx.send(event.clone())?;
debug!("Published environment event {} to {} receivers", event, count);
Ok(())
}
/// Publish a frame event
pub fn publish_frame(&self, event: FrameEvent) -> Result<()> {
let count = self.frame_tx.send(event)?;
debug!("Published frame event to {} receivers", count);
Ok(())
}
/// Publish a detection event
pub fn publish_detection(&self, event: DetectionEvent) -> Result<()> {
let count = self.detection_tx.send(event.clone())?;
debug!("Published detection event {} to {} receivers", event, count);
Ok(())
}
}
impl Default for EventBus {
fn default() -> Self {
Self::new()
}
}

View File

@ -2,6 +2,7 @@
mod camera;
mod config;
mod detection;
mod events;
mod gps;
mod hooks;
mod overlay;
@ -25,6 +26,8 @@ pub use config::Config;
use meteor_detect::PLATFORM_SUPPORTS_GPIO;
use crate::overlay::Watermark;
mod app;
/// Main entry point for the meteor detection system
#[tokio::main]
async fn main() -> Result<()> {
@ -53,168 +56,21 @@ async fn main() -> Result<()> {
info!("Loaded configuration with device ID: {}", config.device.id);
// Initialize camera subsystem
let camera_controller = camera::CameraController::new(&config)
.await
.context("Failed to initialize camera")?;
let camera_controller = Arc::new(Mutex::new(camera_controller));
// Create the application
let mut application = app::Application::new(config);
// Initialize GPS module
let gps_controller = gps::GpsController::new(&config)
.await
.context("Failed to initialize GPS")?;
let gps_controller = Arc::new(Mutex::new(gps_controller));
// Initialize all components
application.initialize().await
.context("Failed to initialize application components")?;
// Initialize sensor controller
let sensor_controller = sensors::SensorController::new(&config)
.await
.context("Failed to initialize sensors")?;
let sensor_controller = Arc::new(Mutex::new(sensor_controller));
// Start all components and set up event handlers
application.start().await
.context("Failed to start application components")?;
// Initialize watermark overlay
let watermark = {
let gps_status = gps_controller.lock().await.get_status();
let env_data = sensor_controller.lock().await.get_current_data();
overlay::watermark::Watermark::new(
config.watermark.clone(),
Arc::new(StdMutex::new(env_data)),
Arc::new(StdMutex::new(gps_status)),
)
};
let watermark = Arc::new(Mutex::new(watermark));
// Run the application until shutdown
application.run().await
.context("Application runtime error")?;
// Initialize frame hook manager
let hook_manager = hooks::HookManager::new();
let hook_manager = Arc::new(Mutex::new(hook_manager));
// Initialize RTSP streaming server
let rtsp_server = streaming::RtspServer::new(config.rtsp.clone());
let rtsp_server = Arc::new(Mutex::new(rtsp_server));
// Initialize detection pipeline
let detection_pipeline = detection::DetectionPipeline::new(
camera_controller.clone(),
&config.clone(),
config.detection.pipeline.clone()
).context("Failed to initialize detection pipeline")?;
// Initialize storage system
let storage_manager = storage::StorageManager::new(&(config.clone()))
.await
.context("Failed to initialize storage")?;
// Initialize communication module
let mut comms = communication::CommunicationManager::new(
&config,
camera_controller.clone(),
gps_controller.clone(),
).await.context("Failed to initialize communication")?;
// Initialize health monitoring
let monitor = monitoring::SystemMonitor::new(&config)
.await
.context("Failed to initialize system monitor")?;
// Start all subsystems
info!("All subsystems initialized, starting main processing loop");
// Initialize sensors
sensor_controller.lock().await.initialize().await
.context("Failed to initialize sensors")?;
sensor_controller.lock().await.start().await
.context("Failed to start sensors")?;
// Initialize GPS
gps_controller.lock().await.initialize().await
.context("Failed to initialize GPS")?;
gps_controller.lock().await.start().await
.context("Failed to start GPS")?;
// Start RTSP server if enabled
if config.rtsp.enabled {
rtsp_server.lock().await.start().await
.context("Failed to start RTSP server")?;
info!("RTSP server started at {}", rtsp_server.lock().await.get_url());
}
// Run the main event loop
let mut tasks = Vec::new();
// Add watermark hook
{
let mut manager = hook_manager.lock().await;
let watermark_clone = watermark.clone();
manager.register_hook(Box::new(hooks::BasicFrameHook::new(
"watermark",
"Watermark Overlay",
"Adds timestamp, GPS, and sensor data overlay to frames",
config.watermark.enabled,
move |frame, timestamp| {
// Using block_in_place to properly handle async operations in sync context
tokio::task::block_in_place(|| {
let mut guard = futures::executor::block_on(watermark_clone.lock());
guard.apply(frame, timestamp)
})
},
)));
}
// Spawn detection pipeline task
tasks.push(tokio::spawn(async move {
if let Err(e) = detection_pipeline.run().await {
error!("Detection pipeline error: {}", e);
}
}));
// Spawn communication manager task
tasks.push(tokio::spawn(async move {
if let Err(e) = comms.run().await {
error!("Communication manager error: {}", e);
}
}));
// Spawn system monitor task
tasks.push(tokio::spawn(async move {
if let Err(e) = monitor.run().await {
error!("System monitor error: {}", e);
}
}));
// Add RTSP streaming task with proper loop to keep it alive
let rtsp_config_enabled = config.rtsp.enabled;
tasks.push(tokio::spawn(async move {
if rtsp_config_enabled {
info!("Starting RTSP streaming task");
// Implement a proper continuous loop to keep the task alive
loop {
// This would normally feed frames to the RTSP server
// For now, just sleep to keep the task alive
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
// Add some actual implementation here later
}
}
}));
// Wait for the Ctrl+C signal
info!("Press Ctrl+C to stop the application");
match signal::ctrl_c().await {
Ok(()) => {
info!("Shutdown signal received, preparing to exit");
// Here you would add cleanup code
}
Err(err) => {
error!("Error waiting for Ctrl+C: {}", err);
}
}
// Now that we've received a shutdown signal, cancel all tasks
for task in tasks {
task.abort();
}
info!("Meteor detection system shutting down");
info!("Meteor detection system shutdown complete");
Ok(())
}

View File

@ -0,0 +1,312 @@
//! Event-based watermark implementation
//!
//! This module provides a version of the watermark component that uses
//! the event bus to receive updates instead of directly accessing shared state.
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use log::{debug, error, info, warn};
use opencv::{core, imgproc, prelude::*, types};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
use crate::events::{EventBus, GpsStatusEvent, EnvironmentEvent};
use crate::gps::{GeoPosition, GpsStatus, CameraOrientation};
use crate::overlay::watermark::{WatermarkOptions, WatermarkPosition, WatermarkContent};
use crate::sensors::EnvironmentData;
/// Event-based watermark for overlaying information on video frames
pub struct EventWatermark {
/// Watermark configuration options
options: WatermarkOptions,
/// Current environment data (updated via events)
environment: Arc<RwLock<EnvironmentData>>,
/// Current GPS status (updated via events)
gps_status: Arc<RwLock<GpsStatus>>,
}
impl EventWatermark {
/// Create a new event-based watermark with the given options
pub async fn new(
options: WatermarkOptions,
event_bus: &EventBus,
) -> Self {
// Create empty data stores
let environment = Arc::new(RwLock::new(EnvironmentData {
temperature: 0.0,
humidity: 0.0,
sky_brightness: 0.0,
timestamp: Utc::now(),
}));
let gps_status = Arc::new(RwLock::new(GpsStatus {
timestamp: Utc::now(),
position: GeoPosition {
latitude: 0.0,
longitude: 0.0,
altitude: 0.0,
},
camera_orientation: CameraOrientation {
azimuth: 0.0,
elevation: 0.0,
},
sync_status: "NoSync".to_string(),
time_accuracy_ms: 0.0,
satellites: 0,
}));
// Setup event subscriptions
Self::setup_subscriptions(event_bus, environment.clone(), gps_status.clone());
Self {
options,
environment,
gps_status,
}
}
/// Setup event subscriptions
fn setup_subscriptions(
event_bus: &EventBus,
environment: Arc<RwLock<EnvironmentData>>,
gps_status: Arc<RwLock<GpsStatus>>,
) {
// Subscribe to environment events
let mut env_rx = event_bus.subscribe_environment();
let env_data = environment.clone();
tokio::spawn(async move {
while let Ok(event) = env_rx.recv().await {
let mut data = env_data.write().await;
data.temperature = event.temperature;
data.humidity = event.humidity;
data.sky_brightness = event.sky_brightness;
data.timestamp = event.timestamp;
debug!("Updated environment data: temp={}, humidity={}, brightness={}",
event.temperature, event.humidity, event.sky_brightness);
}
});
// Subscribe to GPS events
let mut gps_rx = event_bus.subscribe_gps();
let gps_data = gps_status.clone();
tokio::spawn(async move {
while let Ok(event) = gps_rx.recv().await {
let mut status = gps_data.write().await;
status.timestamp = event.timestamp;
status.position.latitude = event.latitude;
status.position.longitude = event.longitude;
status.position.altitude = event.altitude;
status.camera_orientation.azimuth = event.azimuth;
status.camera_orientation.elevation = event.elevation;
status.fix_quality = event.fix_quality;
status.satellites = event.satellites;
debug!("Updated GPS status: lat={}, lon={}",
event.latitude, event.longitude);
}
});
}
/// Update watermark options
pub fn set_options(&mut self, options: WatermarkOptions) {
self.options = options;
}
/// Convert GPS coordinates to formatted string
async fn format_coordinates(&self) -> String {
let position = {
let gps = self.gps_status.read().await;
gps.position.clone()
};
if self.options.coordinate_format == "dms" {
// Convert decimal degrees to degrees, minutes, seconds
let lat_deg = position.latitude.abs().trunc() as i32;
let lat_min = ((position.latitude.abs() - lat_deg as f64) * 60.0).trunc() as i32;
let lat_sec = ((position.latitude.abs() - lat_deg as f64 - lat_min as f64 / 60.0) * 3600.0).round() as i32;
let lat_dir = if position.latitude >= 0.0 { "N" } else { "S" };
let lon_deg = position.longitude.abs().trunc() as i32;
let lon_min = ((position.longitude.abs() - lon_deg as f64) * 60.0).trunc() as i32;
let lon_sec = ((position.longitude.abs() - lon_deg as f64 - lon_min as f64 / 60.0) * 3600.0).round() as i32;
let lon_dir = if position.longitude >= 0.0 { "E" } else { "W" };
format!(
"{}°{:02}'{:02}\"{} {}°{:02}'{:02}\"{} Alt: {:.1}m",
lat_deg, lat_min, lat_sec, lat_dir,
lon_deg, lon_min, lon_sec, lon_dir,
position.altitude
)
} else {
// Decimal degrees format
format!(
"Lat: {:.6}° Lon: {:.6}° Alt: {:.1}m",
position.latitude, position.longitude, position.altitude
)
}
}
/// Convert temperature to formatted string based on options
async fn format_temperature(&self) -> String {
let temp_c = {
let env = self.environment.read().await;
env.temperature
};
if self.options.temperature_format == "F" {
// Convert to Fahrenheit
let temp_f = temp_c * 9.0 / 5.0 + 32.0;
format!("{:.1}°F", temp_f)
} else {
// Default to Celsius
format!("{:.1}°C", temp_c)
}
}
/// Apply watermark to the given frame
pub async fn apply(&self, frame: &mut core::Mat, timestamp: DateTime<Utc>) -> Result<()> {
if !self.options.enabled {
return Ok(());
}
// Build the text lines to display
let mut lines = Vec::new();
for content in &self.options.content {
match content {
WatermarkContent::Timestamp => {
lines.push(timestamp.format(&self.options.time_format).to_string());
},
WatermarkContent::GpsCoordinates => {
lines.push(self.format_coordinates().await);
},
WatermarkContent::Environment => {
let humidity = {
let env = self.environment.read().await;
env.humidity
};
lines.push(format!(
"Temp: {} Humidity: {:.1}%",
self.format_temperature().await,
humidity
));
},
WatermarkContent::CameraOrientation => {
let orientation = {
let gps = self.gps_status.read().await;
gps.camera_orientation.clone()
};
lines.push(format!(
"Az: {:.1}° El: {:.1}°",
orientation.azimuth,
orientation.elevation
));
},
WatermarkContent::Custom(text) => {
lines.push(text.clone());
},
}
}
// Skip if no content
if lines.is_empty() {
return Ok(());
}
// Get frame dimensions
let width = frame.cols();
let height = frame.rows();
// Calculate text size and position
let font = imgproc::FONT_HERSHEY_SIMPLEX;
let font_scale = self.options.font_scale;
let thickness = self.options.thickness;
let padding = self.options.padding;
// Calculate total height of all lines
let mut line_heights = Vec::with_capacity(lines.len());
let mut max_width = 0;
for line in &lines {
let size = imgproc::get_text_size(line, font, font_scale, thickness, &mut 0)?;
line_heights.push(size.height);
max_width = max_width.max(size.width);
}
let total_height: i32 = line_heights.iter().sum();
let line_spacing = 4; // Space between lines
let total_space_height = line_spacing * (lines.len() as i32 - 1);
let text_block_height = total_height + total_space_height + padding * 2;
let text_block_width = max_width + padding * 2;
// Calculate watermark position
let (x, y) = match self.options.position {
WatermarkPosition::TopLeft => (padding, padding),
WatermarkPosition::TopRight => (width - text_block_width - padding, padding),
WatermarkPosition::BottomLeft => (padding, height - text_block_height - padding),
WatermarkPosition::BottomRight => (
width - text_block_width - padding,
height - text_block_height - padding
),
WatermarkPosition::Custom(x, y) => (x as i32, y as i32),
};
// Draw background rectangle if enabled
if self.options.background {
let bg_color = core::Scalar::new(
self.options.background_color.0 as f64,
self.options.background_color.1 as f64,
self.options.background_color.2 as f64,
self.options.background_color.3 as f64,
);
let rect = core::Rect::new(
x,
y,
text_block_width,
text_block_height
);
imgproc::rectangle(
frame,
rect,
bg_color,
-1, // Fill
imgproc::LINE_8,
0,
)?;
}
// Draw text lines
let text_color = core::Scalar::new(
self.options.color.0 as f64,
self.options.color.1 as f64,
self.options.color.2 as f64,
self.options.color.3 as f64,
);
let mut current_y = y + padding + line_heights[0];
for (i, line) in lines.iter().enumerate() {
imgproc::put_text(
frame,
line,
core::Point::new(x + padding, current_y),
font,
font_scale,
text_color,
thickness,
imgproc::LINE_AA,
false,
)?;
if i < lines.len() - 1 {
current_y += line_heights[i] + line_spacing;
}
}
Ok(())
}
}