use anyhow::{Context, Result}; use chrono::Utc; use clap::Parser; use log::{debug, error, info, warn}; use opencv::{core, highgui, imgcodecs, imgproc, prelude::*, videoio}; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use tokio::runtime::Runtime; use meteor_detect::camera::frame_buffer::Frame; use meteor_detect::detection::{ AggregationStrategy, BrightnessDetector, BrightnessDetectorParams, CamsDetector, CamsDetectorParams, DetectionResult, DetectorConfig, DetectorFactory, MeteorDetector, PipelineConfig }; /// Command line arguments for detector demo #[derive(Parser, Debug)] #[clap(author, version, about = "Meteor detection demo")] struct Args { /// Input video file path or camera device number #[clap(short, long)] input: String, /// Output directory for feature images #[clap(short, long, default_value = "output")] output_dir: PathBuf, /// Brightness threshold for meteor detection #[clap(long, default_value = "30")] brightness_threshold: u8, /// Std-to-avg ratio threshold for meteor detection #[clap(long, default_value = "1.5")] std_ratio_threshold: f32, /// Minimum pixel count to trigger detection #[clap(long, default_value = "10")] min_pixel_count: u32, /// Minimum trajectory length #[clap(long, default_value = "5")] min_trajectory_length: u32, /// Save all feature images, not just detections #[clap(long)] save_all: bool, /// Display frames in real-time #[clap(short, long)] display: bool, /// Batch size (number of frames to process) #[clap(short, long, default_value = "256")] batch_size: usize, /// Detector type (cams, brightness, both) #[clap(short, long, default_value = "cams")] detector: String, /// Minimum brightness delta for brightness detector #[clap(long, default_value = "30.0")] min_brightness_delta: f32, /// Minimum pixel change for brightness detector #[clap(long, default_value = "10")] min_pixel_change: u32, /// Minimum consecutive frames for brightness detector #[clap(long, default_value = "3")] min_frames: u32, /// Sensitivity for brightness detector #[clap(long, default_value = "0.7")] sensitivity: f32, /// Run detection in parallel #[clap(long)] parallel: bool, /// Result aggregation strategy (any, all, majority) #[clap(long, default_value = "any")] aggregation: String, } fn main() -> Result<()> { // Set up logging env_logger::init(); // Parse command line arguments let args = Args::parse(); // Create output directory if it doesn't exist std::fs::create_dir_all(&args.output_dir)?; // Set up detector parameters let cams_params = CamsDetectorParams { brightness_threshold: args.brightness_threshold, std_to_avg_ratio_threshold: args.std_ratio_threshold, min_pixel_count: args.min_pixel_count, min_trajectory_length: args.min_trajectory_length, save_all_feature_images: args.save_all, output_dir: args.output_dir.clone(), file_prefix: "meteor".to_string(), id: "cams-demo".to_string(), }; let brightness_params = BrightnessDetectorParams { min_brightness_delta: args.min_brightness_delta, min_pixel_change: args.min_pixel_change, min_frames: args.min_frames, sensitivity: args.sensitivity, id: "brightness-demo".to_string(), }; // Create detectors based on command line argument let mut detectors: Vec> = Vec::new(); match args.detector.as_str() { "cams" => { detectors.push(Box::new(CamsDetector::with_params(cams_params))); info!("Using CAMS detector"); }, "brightness" => { detectors.push(Box::new(BrightnessDetector::with_params(brightness_params))); info!("Using brightness detector"); }, "both" => { detectors.push(Box::new(CamsDetector::with_params(cams_params))); detectors.push(Box::new(BrightnessDetector::with_params(brightness_params))); info!("Using both CAMS and brightness detectors"); }, _ => { return Err(anyhow::anyhow!("Unknown detector type: {}", args.detector)); } } // Set up detection pipeline config if running in parallel let aggregation_strategy = match args.aggregation.as_str() { "any" => AggregationStrategy::Any, "all" => AggregationStrategy::All, "majority" => AggregationStrategy::Majority, _ => AggregationStrategy::Any, }; // Open video source let mut cap = if args.input.chars().all(char::is_numeric) { // Input is a camera device number let device_id = args.input.parse::().unwrap_or(0); videoio::VideoCapture::new(device_id, videoio::CAP_ANY)? } else { // Input is a video file videoio::VideoCapture::from_file(&args.input, videoio::CAP_ANY)? }; // Check if video source is opened if !cap.is_opened()? { return Err(anyhow::anyhow!("Failed to open video source: {}", args.input)); } // Get video properties let fps = cap.get(videoio::CAP_PROP_FPS)?; let frame_width = cap.get(videoio::CAP_PROP_FRAME_WIDTH)? as i32; let frame_height = cap.get(videoio::CAP_PROP_FRAME_HEIGHT)? as i32; let frame_count = cap.get(videoio::CAP_PROP_FRAME_COUNT)? as i64; info!( "Video source: {}x{}, {:.2} fps, {} frames", frame_width, frame_height, fps, frame_count ); // Create window if display is enabled if args.display { highgui::named_window("Video", highgui::WINDOW_NORMAL)?; highgui::resize_window("Video", 800, 600)?; } // Initialize frame counter and runtime let mut frame_idx: u64 = 0; let mut runtime = if args.parallel { Some(Runtime::new()?) } else { None }; // Start time let start_time = Instant::now(); // Process frames let mut frame = core::Mat::default(); let mut detections = 0; // Convert detectors to thread-safe Arc>> for parallel mode let shared_detectors: Vec>>> = detectors .into_iter() .map(|d| Arc::new(Mutex::new(d))) .collect(); while cap.read(&mut frame)? { // Skip empty frames if frame.empty() { warn!("Empty frame received, skipping"); continue; } // Process frame with detector(s) let results = if args.parallel && runtime.is_some() { // Process in parallel using tokio runtime let runtime = runtime.as_ref().unwrap(); runtime.block_on(async { // Create a future for each detector let mut handles = Vec::with_capacity(shared_detectors.len()); for detector in &shared_detectors { let frame = frame.clone(); let detector = detector.clone(); let handle = tokio::spawn(async move { let mut detector = detector.lock().unwrap(); match detector.process_frame(&frame, frame_idx) { Ok(result) => Some(result), Err(e) => { error!("Error processing frame with detector {}: {}", detector.get_id(), e); None } } }); handles.push(handle); } // Wait for all detectors to complete let results = futures::future::join_all(handles).await; // Collect results results.into_iter() .filter_map(|r| r.ok().flatten()) .collect::>() }) } else { // Process sequentially let mut results = Vec::new(); for detector in &shared_detectors { let mut detector = detector.lock().unwrap(); match detector.process_frame(&frame, frame_idx) { Ok(result) => { results.push(result); } Err(e) => { error!("Error processing frame with detector {}: {}", detector.get_id(), e); } } } results }; // Aggregate results based on strategy let detected = match aggregation_strategy { AggregationStrategy::Any => results.iter().any(|r| r.detected), AggregationStrategy::All => results.iter().all(|r| r.detected) && !results.is_empty(), AggregationStrategy::Majority => { let count = results.iter().filter(|r| r.detected).count(); count > results.len() / 2 && !results.is_empty() }, _ => results.iter().any(|r| r.detected), }; // Find result with highest confidence let result = if detected { results.iter() .filter(|r| r.detected) .max_by(|a, b| a.confidence.partial_cmp(&b.confidence).unwrap()) .cloned() } else { results.first().cloned() }; if let Some(result) = result { if result.detected { detections += 1; info!( "Meteor detected at frame {}: confidence={:.2}, pixels={}, detector={}", frame_idx, result.confidence, result.pixel_change, result.detector_id.as_deref().unwrap_or("unknown") ); // Draw bounding box if we have one if let Some(bbox) = result.bounding_box { let rect = core::Rect::new( bbox.0 as i32, bbox.1 as i32, bbox.2 as i32, bbox.3 as i32, ); let mut display_frame = frame.clone(); imgproc::rectangle( &mut display_frame, rect, core::Scalar::new(0.0, 255.0, 0.0, 0.0), 2, imgproc::LINE_8, 0, )?; // Save detection frame let detection_path = args .output_dir .join(format!("detection_{}_frame.jpg", frame_idx)); imgcodecs::imwrite( detection_path.to_str().unwrap(), &display_frame, &core::Vector::new(), )?; } } // Display progress if frame_idx % 100 == 0 { let elapsed = start_time.elapsed(); let fps = frame_idx as f64 / elapsed.as_secs_f64(); debug!( "Processed {} frames ({:.2} fps), {} detections", frame_idx, fps, detections, ); } // Display frame if enabled if args.display { // Draw text with frame information let mut display_frame = frame.clone(); let text = format!( "Frame: {} | Detections: {} | {}", frame_idx, detections, if detected { format!("METEOR DETECTED - Confidence: {:.2}", result.confidence) } else { "No detection".to_string() } ); imgproc::put_text( &mut display_frame, &text, core::Point::new(20, 40), imgproc::FONT_HERSHEY_SIMPLEX, 0.7, core::Scalar::new(0.0, 255.0, 0.0, 0.0), 2, imgproc::LINE_8, false, )?; // Show frame highgui::imshow("Video", &display_frame)?; // Wait for key press (30ms delay for video playback) let key = highgui::wait_key(30)?; if key > 0 && key != 255 { // 'q' or ESC key pressed if key == 113 || key == 27 { info!("User requested exit"); break; } } } } frame_idx += 1; } // Close video display window if enabled if args.display { highgui::destroy_all_windows()?; } // Calculate overall statistics let elapsed = start_time.elapsed(); let overall_fps = frame_idx as f64 / elapsed.as_secs_f64(); info!( "Processing complete: {} frames, {} detections, {:.2} fps average", frame_idx, detections, overall_fps ); // Display detection mode info info!( "Detection mode: {}, {}", if args.parallel { "parallel" } else { "sequential" }, match aggregation_strategy { AggregationStrategy::Any => "any detector", AggregationStrategy::All => "all detectors", AggregationStrategy::Majority => "majority of detectors", _ => "custom strategy", } ); Ok(()) }