2025-04-22 01:46:51 +08:00

403 lines
17 KiB
Rust

//! Application orchestrator and dependency manager
//!
//! This module provides a centralized Application struct that handles
//! initialization, lifecycle management, and component wiring.
use anyhow::{Context, Result};
use chrono::Utc;
use log::{info, error, warn, debug};
use std::sync::{Arc, Mutex as StdMutex};
use tokio::sync::{Mutex, mpsc};
use tokio::task::{JoinHandle};
use tokio::signal;
use crate::camera::{self, CameraController, Frame, MeteorEvent};
use crate::config::{self, Config};
use crate::detection::{self, DetectionPipeline};
use crate::events::{EventBus, GpsStatusEvent, EnvironmentEvent, FrameEvent, DetectionEvent};
use crate::gps::{self, GpsController, GpsStatus};
use crate::hooks::{self, HookManager};
use crate::monitoring::{self, SystemMonitor};
use crate::overlay::{self, watermark, star_chart};
use crate::sensors::{self, SensorController, EnvironmentData};
use crate::storage::{self, StorageManager};
use crate::streaming::{self, RtspServer};
use crate::communication::{self, CommunicationManager};
/// Main application orchestrator
///
/// Owns the configuration, the event bus, the handles of every spawned
/// background task, and all components. Every component field starts out as
/// `None` (see `new`) and is populated by `initialize`.
pub struct Application {
/// Application configuration
config: Config,
/// Event bus for component communication
event_bus: EventBus,
/// Collection of background tasks
/// (filled by `start`, drained and aborted by `run`)
tasks: Vec<JoinHandle<()>>,
/// Set to true when application shutdown has been requested
// NOTE(review): this flag is written by `run` and `request_shutdown`, but
// nothing in the visible code reads it — confirm whether a reader exists.
shutdown_requested: bool,
// Core components
// Components wrapped in `Arc<Mutex<..>>` are shared with hooks, spawned
// tasks, or other components; the mutex is `tokio::sync::Mutex`.
camera_controller: Option<Arc<Mutex<CameraController>>>,
gps_controller: Option<Arc<Mutex<GpsController>>>,
sensor_controller: Option<Arc<Mutex<SensorController>>>,
detection_pipeline: Option<Arc<Mutex<detection::DetectionPipeline>>>,
hook_manager: Option<Arc<Mutex<HookManager>>>,
watermark: Option<Arc<Mutex<watermark::Watermark>>>,
star_chart: Option<Arc<Mutex<star_chart::StarChart>>>,
storage_manager: Option<storage::StorageManager>,
communication_manager: Option<Arc<Mutex<communication::CommunicationManager>>>,
// Stays `None` when RTSP is disabled in the configuration.
rtsp_server: Option<Arc<Mutex<RtspServer>>>,
// Taken (left as `None`) by `start`, which moves the monitor into its task.
system_monitor: Option<monitoring::SystemMonitor>,
}
impl Application {
/// Build an `Application` around `config` with no components constructed yet.
///
/// Only the event bus and the (empty) task list are created here; every
/// component slot is left as `None` until `initialize` fills it in.
pub fn new(config: Config) -> Self {
    let event_bus = EventBus::new();
    let tasks = Vec::new();

    Self {
        // Shared infrastructure
        config,
        event_bus,
        tasks,
        shutdown_requested: false,

        // Components, populated later by `initialize`
        camera_controller: None,
        gps_controller: None,
        sensor_controller: None,
        detection_pipeline: None,
        hook_manager: None,
        watermark: None,
        star_chart: None,
        storage_manager: None,
        communication_manager: None,
        rtsp_server: None,
        system_monitor: None,
    }
}
/// Construct every application component and store it on `self`.
///
/// Components are built in dependency order: camera, GPS and sensors first,
/// then the hook manager, the overlays (watermark, star chart), the optional
/// RTSP server, the detection pipeline, storage, communication and finally
/// the system monitor. Nothing is started here; see `start`.
///
/// # Errors
///
/// Returns the first construction error, annotated with the component that
/// failed. Components constructed before the failure remain stored on `self`.
pub async fn initialize(&mut self) -> Result<()> {
    info!("Initializing application components");

    // Camera, GPS and sensors are kept as local handles as well as stored on
    // `self`, so the components built below can share them without having to
    // unwrap the `Option` fields again.
    let camera_handle = Arc::new(Mutex::new(
        camera::CameraController::new(&self.config)
            .await
            .context("Failed to initialize camera")?,
    ));
    self.camera_controller = Some(Arc::clone(&camera_handle));

    let gps_handle = Arc::new(Mutex::new(
        gps::GpsController::new(&self.config)
            .await
            .context("Failed to initialize GPS")?,
    ));
    self.gps_controller = Some(Arc::clone(&gps_handle));

    let sensor_handle = Arc::new(Mutex::new(
        sensors::SensorController::new(&self.config)
            .await
            .context("Failed to initialize sensors")?,
    ));
    self.sensor_controller = Some(Arc::clone(&sensor_handle));

    // Hook manager
    self.hook_manager = Some(Arc::new(Mutex::new(hooks::HookManager::new())));

    // Watermark overlay
    // Note: This still uses the old approach with direct dependencies.
    // In a future refactoring, this would be updated to use event subscriptions.
    // The overlay receives snapshots of the GPS status and environment data
    // taken here, each wrapped in its own fresh mutex.
    let watermark = {
        let gps_status = gps_handle.lock().await.get_status();
        let env_data = sensor_handle.lock().await.get_current_data();
        overlay::watermark::Watermark::new(
            self.config.watermark.clone(),
            Arc::new(StdMutex::new(env_data)),
            Arc::new(StdMutex::new(gps_status)),
        )
    };
    self.watermark = Some(Arc::new(Mutex::new(watermark)));

    // Star chart overlay. The GPS status is read into a local first so the
    // GPS lock is released before the star chart's async construction runs,
    // instead of being held for the whole statement.
    let star_chart_gps_status = gps_handle.lock().await.get_status();
    let star_chart = star_chart::StarChart::new(
        self.config.star_chart.clone(),
        Arc::new(StdMutex::new(star_chart_gps_status)),
    )
    .await
    .context("Failed to initialize star chart overlay")?;
    self.star_chart = Some(Arc::new(Mutex::new(star_chart)));

    // RTSP server, only when enabled in the configuration
    if self.config.rtsp.enabled {
        let rtsp_server = streaming::RtspServer::new(self.config.rtsp.clone());
        self.rtsp_server = Some(Arc::new(Mutex::new(rtsp_server)));
    }

    // Detection pipeline
    // Note: This still uses the original constructor that takes a direct dependency
    // on the camera controller. In a future refactoring, this would be updated to
    // use frame events instead.
    let detection_pipeline = detection::DetectionPipeline::new(
        Arc::clone(&camera_handle),
        &self.config,
        self.config.detection.pipeline.clone(),
    )
    .context("Failed to initialize detection pipeline")?;
    self.detection_pipeline = Some(Arc::new(Mutex::new(detection_pipeline)));

    // Storage
    let storage_manager = storage::StorageManager::new(&self.config)
        .await
        .context("Failed to initialize storage")?;
    self.storage_manager = Some(storage_manager);

    // Communication
    // Note: This still uses the original constructor with direct dependencies.
    // In a future refactoring, this would be updated to use event subscriptions.
    let communication_manager =
        communication::CommunicationManager::new(&self.config, camera_handle, gps_handle)
            .await
            .context("Failed to initialize communication")?;
    self.communication_manager = Some(Arc::new(Mutex::new(communication_manager)));

    // Monitoring
    let system_monitor = monitoring::SystemMonitor::new(&self.config)
        .await
        .context("Failed to initialize system monitor")?;
    self.system_monitor = Some(system_monitor);

    info!("All application components initialized");
    Ok(())
}
/// Start application components and set up event handlers.
///
/// Runs, in order: overlay hook registration, sensors, GPS, the RTSP server,
/// the detection pipeline, the communication manager and the system monitor.
/// Each step is skipped when its component is `None`. Every spawned
/// background task is recorded in `self.tasks`.
///
/// # Errors
///
/// Returns an error when the sensors, the GPS or the RTSP server fail to
/// initialize or start. Tasks spawned before the failure stay in `self.tasks`.
pub async fn start(&mut self) -> Result<()> {
    info!("Starting application components");

    self.register_overlay_hooks().await;
    self.start_sensors().await?;
    self.start_gps().await?;
    self.start_rtsp().await?;
    self.spawn_detection_pipeline();
    self.spawn_communication_manager();
    self.spawn_system_monitor();

    info!("All application components started");
    Ok(())
}

/// Register the watermark and star chart overlays as frame hooks.
///
/// Takes `&mut self` (like `start`) so that splitting this code out of
/// `start` does not change the auto-trait requirements of its future.
async fn register_overlay_hooks(&mut self) {
    // Watermark overlay hook
    if let (Some(hook_manager), Some(watermark)) = (&self.hook_manager, &self.watermark) {
        let watermark_handle = Arc::clone(watermark);
        let mut manager = hook_manager.lock().await;
        manager.register_hook(Box::new(hooks::BasicFrameHook::new(
            "watermark",
            "Watermark Overlay",
            "Adds timestamp, GPS, and sensor data overlay to frames",
            self.config.watermark.enabled,
            move |frame, timestamp| {
                // The hook callback is synchronous, so the async mutex is
                // acquired by blocking inside `block_in_place`.
                tokio::task::block_in_place(|| {
                    let mut guard = futures::executor::block_on(watermark_handle.lock());
                    guard.apply(frame, timestamp)
                })
            },
        )));
    }

    // Star chart overlay hook
    if let (Some(hook_manager), Some(star_chart)) = (&self.hook_manager, &self.star_chart) {
        let star_chart_handle = Arc::clone(star_chart);
        let mut manager = hook_manager.lock().await;
        manager.register_hook(Box::new(hooks::BasicFrameHook::new(
            "star_chart",
            "Star Chart Overlay",
            "Adds calculated star chart overlay to frames",
            self.config.star_chart.enabled,
            move |frame, timestamp| {
                // Same bridging as the watermark hook; here `apply` itself is
                // async and is also driven to completion by blocking.
                tokio::task::block_in_place(|| {
                    let mut guard = futures::executor::block_on(star_chart_handle.lock());
                    futures::executor::block_on(guard.apply(frame, timestamp))
                })
            },
        )));
    }
}

/// Initialize and start the sensors, then spawn the task that publishes an
/// `EnvironmentEvent` on the event bus once per second.
async fn start_sensors(&mut self) -> Result<()> {
    if let Some(controller) = &self.sensor_controller {
        controller.lock().await.initialize().await
            .context("Failed to initialize sensors")?;
        controller.lock().await.start().await
            .context("Failed to start sensors")?;

        // Bridge sensor data to the event bus (publisher)
        let event_sender = self.event_bus.environment_sender();
        let sensor_handle = Arc::clone(controller);
        self.tasks.push(tokio::spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
            loop {
                interval.tick().await;
                // The lock guard is a temporary of this statement, so the
                // controller is unlocked again before the event is built.
                let env_data = sensor_handle.lock().await.get_current_data();
                let event = EnvironmentEvent {
                    timestamp: Utc::now(),
                    temperature: env_data.temperature,
                    humidity: env_data.humidity,
                    sky_brightness: env_data.sky_brightness,
                };
                // A failed publish is logged and the loop keeps running.
                if let Err(e) = event_sender.send(event) {
                    warn!("Failed to publish environment event: {}", e);
                }
            }
        }));
    }
    Ok(())
}

/// Initialize and start the GPS, then spawn the task that publishes a
/// `GpsStatusEvent` on the event bus once per second.
async fn start_gps(&mut self) -> Result<()> {
    if let Some(controller) = &self.gps_controller {
        controller.lock().await.initialize().await
            .context("Failed to initialize GPS")?;
        controller.lock().await.start().await
            .context("Failed to start GPS")?;

        // Bridge GPS data to the event bus
        let event_sender = self.event_bus.gps_sender();
        let gps_handle = Arc::clone(controller);
        self.tasks.push(tokio::spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
            loop {
                interval.tick().await;
                // The lock is released at the end of this statement.
                let gps_status = gps_handle.lock().await.get_status();
                let event = GpsStatusEvent {
                    timestamp: Utc::now(),
                    latitude: gps_status.position.latitude,
                    longitude: gps_status.position.longitude,
                    altitude: gps_status.position.altitude,
                    azimuth: gps_status.camera_orientation.azimuth,
                    elevation: gps_status.camera_orientation.elevation,
                    sync_status: gps_status.sync_status.clone(),
                    time_accuracy_ms: gps_status.time_accuracy_ms,
                    satellites: gps_status.satellites,
                };
                // A failed publish is logged and the loop keeps running.
                if let Err(e) = event_sender.send(event) {
                    warn!("Failed to publish GPS event: {}", e);
                }
            }
        }));
    }
    Ok(())
}

/// Start the RTSP server (when one was created) and spawn its streaming task.
async fn start_rtsp(&mut self) -> Result<()> {
    if let Some(rtsp_server) = &self.rtsp_server {
        rtsp_server.lock().await.start().await
            .context("Failed to start RTSP server")?;
        info!("RTSP server started at {}", rtsp_server.lock().await.get_url());

        // Placeholder streaming task: it does not touch the server yet and
        // only keeps itself alive. This would be replaced with an event
        // subscription that feeds frames in a future refactoring.
        let rtsp_enabled = self.config.rtsp.enabled;
        self.tasks.push(tokio::spawn(async move {
            if rtsp_enabled {
                info!("Starting RTSP streaming task");
                loop {
                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                }
            }
        }));
    }
    Ok(())
}

/// Spawn the task that runs the detection pipeline.
fn spawn_detection_pipeline(&mut self) {
    if let Some(pipeline) = &self.detection_pipeline {
        let pipeline_handle = Arc::clone(pipeline);
        self.tasks.push(tokio::spawn(async move {
            // The pipeline mutex stays locked for as long as `run` executes.
            if let Err(e) = pipeline_handle.lock().await.run().await {
                error!("Detection pipeline error: {}", e);
            }
        }));
        // In a future refactoring, we'd add an adapter here to transform
        // detection pipeline events into DetectionEvent messages on the event bus
    }
}

/// Spawn the task that runs the communication manager.
fn spawn_communication_manager(&mut self) {
    if let Some(comms) = &self.communication_manager {
        // Only a clone of the shared handle moves into the task; the manager
        // itself stays reachable through `self.communication_manager`.
        let comms_handle = Arc::clone(comms);
        self.tasks.push(tokio::spawn(async move {
            if let Err(e) = comms_handle.lock().await.run().await {
                error!("Communication manager error: {}", e);
            }
        }));
        // In a future event-based architecture, we'd have a CommunicationService
        // that would act on events from the event bus instead of having direct
        // dependencies on other components
    }
}

/// Move the system monitor into its own task and run it there.
fn spawn_system_monitor(&mut self) {
    // `take` leaves `self.system_monitor` as `None`: the task owns the monitor.
    if let Some(monitor) = self.system_monitor.take() {
        self.tasks.push(tokio::spawn(async move {
            if let Err(e) = monitor.run().await {
                error!("System monitor error: {}", e);
            }
        }));
        // In a future event-based architecture, the monitoring would
        // likely subscribe to events from other components
    }
}
/// Run the application until shutdown is requested
pub async fn run(&mut self) -> Result<()> {
info!("Application running, press Ctrl+C to exit");
// Wait for shutdown signal
match signal::ctrl_c().await {
Ok(()) => {
info!("Shutdown signal received, preparing to exit");
self.shutdown_requested = true;
}
Err(err) => {
error!("Error waiting for Ctrl+C: {}", err);
}
}
// Shutdown the star chart component
if let Some(star_chart) = &self.star_chart {
if let Err(e) = star_chart.lock().await.shutdown().await {
error!("Error shutting down star chart: {}", e);
}
}
// Shutdown all tasks
for task in self.tasks.drain(..) {
task.abort();
}
info!("Application shutdown complete");
Ok(())
}
/// Mark the application as having a shutdown requested.
///
/// This only raises the `shutdown_requested` flag.
// NOTE(review): nothing in the visible code reads the flag — confirm that a
// consumer exists before relying on this to stop `run`.
pub fn request_shutdown(&mut self) {
    self.shutdown_requested = true;
}
/// Borrow the event bus used for component communication.
pub fn event_bus(&self) -> &EventBus {
    let bus = &self.event_bus;
    bus
}
/// Mutably borrow the event bus used for component communication.
pub fn event_bus_mut(&mut self) -> &mut EventBus {
    let bus = &mut self.event_bus;
    bus
}
}