fix compile error

grabbit 2025-03-21 23:51:14 +08:00
parent c3bc0fe4eb
commit d2ca715eeb
8 changed files with 451 additions and 402 deletions

View File

@@ -22,6 +22,8 @@ async-trait = "0.1.68" # Async traits
 futures = "0.3.28" # Future utilities

 # Data handling
+dirs = "6.0.0"
+toml = "0.8.20"
 serde = { version = "1.0.160", features = ["derive"] } # Serialization
 serde_json = "1.0.96" # JSON support
 chrono = { version = "0.4.24", features = ["serde"] } # Date and time

View File

@@ -1,405 +1,407 @@
This hunk comments out the entire demo, line for line, and ends the new file with an empty fn main(){} stub. The original code, shown once:

use anyhow::{Context, Result};
use chrono::Utc;
use clap::Parser;
use log::{debug, error, info, warn};
use opencv::{core, highgui, imgcodecs, imgproc, prelude::*, videoio};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::runtime::Runtime;

use meteor_detect::camera::frame_buffer::Frame;
use meteor_detect::detection::{
    AggregationStrategy, BrightnessDetector, BrightnessDetectorParams,
    CamsDetector, CamsDetectorParams, DetectionResult, DetectorConfig,
    DetectorFactory, MeteorDetector, PipelineConfig
};

/// Command line arguments for detector demo
#[derive(Parser, Debug)]
#[clap(author, version, about = "Meteor detection demo")]
struct Args {
    /// Input video file path or camera device number
    #[clap(short, long)]
    input: String,

    /// Output directory for feature images
    #[clap(short, long, default_value = "output")]
    output_dir: PathBuf,

    /// Brightness threshold for meteor detection
    #[clap(long, default_value = "30")]
    brightness_threshold: u8,

    /// Std-to-avg ratio threshold for meteor detection
    #[clap(long, default_value = "1.5")]
    std_ratio_threshold: f32,

    /// Minimum pixel count to trigger detection
    #[clap(long, default_value = "10")]
    min_pixel_count: u32,

    /// Minimum trajectory length
    #[clap(long, default_value = "5")]
    min_trajectory_length: u32,

    /// Save all feature images, not just detections
    #[clap(long)]
    save_all: bool,

    /// Display frames in real-time
    #[clap(short, long)]
    display: bool,

    /// Batch size (number of frames to process)
    #[clap(short, long, default_value = "256")]
    batch_size: usize,

    /// Detector type (cams, brightness, both)
    #[clap(short, long, default_value = "cams")]
    detector: String,

    /// Minimum brightness delta for brightness detector
    #[clap(long, default_value = "30.0")]
    min_brightness_delta: f32,

    /// Minimum pixel change for brightness detector
    #[clap(long, default_value = "10")]
    min_pixel_change: u32,

    /// Minimum consecutive frames for brightness detector
    #[clap(long, default_value = "3")]
    min_frames: u32,

    /// Sensitivity for brightness detector
    #[clap(long, default_value = "0.7")]
    sensitivity: f32,

    /// Run detection in parallel
    #[clap(long)]
    parallel: bool,

    /// Result aggregation strategy (any, all, majority)
    #[clap(long, default_value = "any")]
    aggregation: String,
}

fn main() -> Result<()> {
    // Set up logging
    env_logger::init();

    // Parse command line arguments
    let args = Args::parse();

    // Create output directory if it doesn't exist
    std::fs::create_dir_all(&args.output_dir)?;

    // Set up detector parameters
    let cams_params = CamsDetectorParams {
        brightness_threshold: args.brightness_threshold,
        std_to_avg_ratio_threshold: args.std_ratio_threshold,
        min_pixel_count: args.min_pixel_count,
        min_trajectory_length: args.min_trajectory_length,
        save_all_feature_images: args.save_all,
        output_dir: args.output_dir.clone(),
        file_prefix: "meteor".to_string(),
        id: "cams-demo".to_string(),
    };

    let brightness_params = BrightnessDetectorParams {
        min_brightness_delta: args.min_brightness_delta,
        min_pixel_change: args.min_pixel_change,
        min_frames: args.min_frames,
        sensitivity: args.sensitivity,
        id: "brightness-demo".to_string(),
    };

    // Create detectors based on command line argument
    let mut detectors: Vec<Box<dyn MeteorDetector>> = Vec::new();
    match args.detector.as_str() {
        "cams" => {
            detectors.push(Box::new(CamsDetector::with_params(cams_params)));
            info!("Using CAMS detector");
        },
        "brightness" => {
            detectors.push(Box::new(BrightnessDetector::with_params(brightness_params)));
            info!("Using brightness detector");
        },
        "both" => {
            detectors.push(Box::new(CamsDetector::with_params(cams_params)));
            detectors.push(Box::new(BrightnessDetector::with_params(brightness_params)));
            info!("Using both CAMS and brightness detectors");
        },
        _ => {
            return Err(anyhow::anyhow!("Unknown detector type: {}", args.detector));
        }
    }

    // Set up detection pipeline config if running in parallel
    let aggregation_strategy = match args.aggregation.as_str() {
        "any" => AggregationStrategy::Any,
        "all" => AggregationStrategy::All,
        "majority" => AggregationStrategy::Majority,
        _ => AggregationStrategy::Any,
    };

    // Open video source
    let mut cap = if args.input.chars().all(char::is_numeric) {
        // Input is a camera device number
        let device_id = args.input.parse::<i32>().unwrap_or(0);
        videoio::VideoCapture::new(device_id, videoio::CAP_ANY)?
    } else {
        // Input is a video file
        videoio::VideoCapture::from_file(&args.input, videoio::CAP_ANY)?
    };

    // Check if video source is opened
    if !cap.is_opened()? {
        return Err(anyhow::anyhow!("Failed to open video source: {}", args.input));
    }

    // Get video properties
    let fps = cap.get(videoio::CAP_PROP_FPS)?;
    let frame_width = cap.get(videoio::CAP_PROP_FRAME_WIDTH)? as i32;
    let frame_height = cap.get(videoio::CAP_PROP_FRAME_HEIGHT)? as i32;
    let frame_count = cap.get(videoio::CAP_PROP_FRAME_COUNT)? as i64;
    info!(
        "Video source: {}x{}, {:.2} fps, {} frames",
        frame_width, frame_height, fps, frame_count
    );

    // Create window if display is enabled
    if args.display {
        highgui::named_window("Video", highgui::WINDOW_NORMAL)?;
        highgui::resize_window("Video", 800, 600)?;
    }

    // Initialize frame counter and runtime
    let mut frame_idx: u64 = 0;
    let mut runtime = if args.parallel {
        Some(Runtime::new()?)
    } else {
        None
    };

    // Start time
    let start_time = Instant::now();

    // Process frames
    let mut frame = core::Mat::default();
    let mut detections = 0;

    // Convert detectors to thread-safe Arc<Mutex<Box<dyn MeteorDetector>>> for parallel mode
    let shared_detectors: Vec<Arc<Mutex<Box<dyn MeteorDetector>>>> = detectors
        .into_iter()
        .map(|d| Arc::new(Mutex::new(d)))
        .collect();

    while cap.read(&mut frame)? {
        // Skip empty frames
        if frame.empty() {
            warn!("Empty frame received, skipping");
            continue;
        }

        // Process frame with detector(s)
        let results = if args.parallel && runtime.is_some() {
            // Process in parallel using tokio runtime
            let runtime = runtime.as_ref().unwrap();
            runtime.block_on(async {
                // Create a future for each detector
                let mut handles = Vec::with_capacity(shared_detectors.len());
                for detector in &shared_detectors {
                    let frame = frame.clone();
                    let detector = detector.clone();
                    let handle = tokio::spawn(async move {
                        let mut detector = detector.lock().unwrap();
                        match detector.process_frame(&frame, frame_idx) {
                            Ok(result) => Some(result),
                            Err(e) => {
                                error!("Error processing frame with detector {}: {}",
                                    detector.get_id(), e);
                                None
                            }
                        }
                    });
                    handles.push(handle);
                }

                // Wait for all detectors to complete
                let results = futures::future::join_all(handles).await;

                // Collect results
                results.into_iter()
                    .filter_map(|r| r.ok().flatten())
                    .collect::<Vec<_>>()
            })
        } else {
            // Process sequentially
            let mut results = Vec::new();
            for detector in &shared_detectors {
                let mut detector = detector.lock().unwrap();
                match detector.process_frame(&frame, frame_idx) {
                    Ok(result) => {
                        results.push(result);
                    }
                    Err(e) => {
                        error!("Error processing frame with detector {}: {}",
                            detector.get_id(), e);
                    }
                }
            }
            results
        };

        // Aggregate results based on strategy
        let detected = match aggregation_strategy {
            AggregationStrategy::Any => results.iter().any(|r| r.detected),
            AggregationStrategy::All => results.iter().all(|r| r.detected) && !results.is_empty(),
            AggregationStrategy::Majority => {
                let count = results.iter().filter(|r| r.detected).count();
                count > results.len() / 2 && !results.is_empty()
            },
            _ => results.iter().any(|r| r.detected),
        };

        // Find result with highest confidence
        let result = if detected {
            results.iter()
                .filter(|r| r.detected)
                .max_by(|a, b| a.confidence.partial_cmp(&b.confidence).unwrap())
                .cloned()
        } else {
            results.first().cloned()
        };

        if let Some(result) = result {
            if result.detected {
                detections += 1;
                info!(
                    "Meteor detected at frame {}: confidence={:.2}, pixels={}, detector={}",
                    frame_idx, result.confidence, result.pixel_change,
                    result.detector_id.as_deref().unwrap_or("unknown")
                );

                // Draw bounding box if we have one
                if let Some(bbox) = result.bounding_box {
                    let rect = core::Rect::new(
                        bbox.0 as i32,
                        bbox.1 as i32,
                        bbox.2 as i32,
                        bbox.3 as i32,
                    );
                    let mut display_frame = frame.clone();
                    imgproc::rectangle(
                        &mut display_frame,
                        rect,
                        core::Scalar::new(0.0, 255.0, 0.0, 0.0),
                        2,
                        imgproc::LINE_8,
                        0,
                    )?;

                    // Save detection frame
                    let detection_path = args
                        .output_dir
                        .join(format!("detection_{}_frame.jpg", frame_idx));
                    imgcodecs::imwrite(
                        detection_path.to_str().unwrap(),
                        &display_frame,
                        &core::Vector::new(),
                    )?;
                }
            }

            // Display progress
            if frame_idx % 100 == 0 {
                let elapsed = start_time.elapsed();
                let fps = frame_idx as f64 / elapsed.as_secs_f64();
                debug!(
                    "Processed {} frames ({:.2} fps), {} detections",
                    frame_idx,
                    fps,
                    detections,
                );
            }

            // Display frame if enabled
            if args.display {
                // Draw text with frame information
                let mut display_frame = frame.clone();
                let text = format!(
                    "Frame: {} | Detections: {} | {}",
                    frame_idx,
                    detections,
                    if detected {
                        format!("METEOR DETECTED - Confidence: {:.2}", result.confidence)
                    } else {
                        "No detection".to_string()
                    }
                );
                imgproc::put_text(
                    &mut display_frame,
                    &text,
                    core::Point::new(20, 40),
                    imgproc::FONT_HERSHEY_SIMPLEX,
                    0.7,
                    core::Scalar::new(0.0, 255.0, 0.0, 0.0),
                    2,
                    imgproc::LINE_8,
                    false,
                )?;

                // Show frame
                highgui::imshow("Video", &display_frame)?;

                // Wait for key press (30ms delay for video playback)
                let key = highgui::wait_key(30)?;
                if key > 0 && key != 255 {
                    // 'q' or ESC key pressed
                    if key == 113 || key == 27 {
                        info!("User requested exit");
                        break;
                    }
                }
            }
        }

        frame_idx += 1;
    }

    // Close video display window if enabled
    if args.display {
        highgui::destroy_all_windows()?;
    }

    // Calculate overall statistics
    let elapsed = start_time.elapsed();
    let overall_fps = frame_idx as f64 / elapsed.as_secs_f64();
    info!(
        "Processing complete: {} frames, {} detections, {:.2} fps average",
        frame_idx, detections, overall_fps
    );

    // Display detection mode info
    info!(
        "Detection mode: {}, {}",
        if args.parallel { "parallel" } else { "sequential" },
        match aggregation_strategy {
            AggregationStrategy::Any => "any detector",
            AggregationStrategy::All => "all detectors",
            AggregationStrategy::Majority => "majority of detectors",
            _ => "custom strategy",
        }
    );

    Ok(())
}

The new file keeps every line above behind // comments and ends with the stub:

fn main(){}

View File

@@ -35,6 +35,10 @@ impl Frame {
         )?;
         Ok(())
     }
+
+    pub fn is_full() {
+    }
+
 }

 /// A circular buffer for storing video frames

View File

@@ -49,7 +49,7 @@ pub enum ExposureMode {
 }

 /// Configuration parameters for the camera
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct CameraSettings {
     /// Camera device path (e.g., /dev/video0)
     pub device: String,
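
The derive above only compiles if every field type of CameraSettings implements the traits as well, so nested types like Resolution and ExposureMode need the same derive. A minimal sketch of the constraint; only `device` appears in this hunk, the other fields here are assumptions:

    use serde::{Deserialize, Serialize};

    // Nested types must derive the traits too, or the outer derive fails with
    // "the trait `Serialize` is not implemented for `Resolution`".
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct Resolution {
        pub width: u32,  // assumed field
        pub height: u32, // assumed field
    }

    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct CameraSettings {
        /// Camera device path (e.g., /dev/video0)
        pub device: String,
        pub resolution: Resolution, // assumed field
    }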

View File

@@ -3,6 +3,8 @@ use log::info;
 use serde::{Deserialize, Serialize};
 use std::fs;
 use std::path::{Path, PathBuf};
+use dirs;
+use toml;

 use crate::camera::{CameraSettings, Resolution, ExposureMode};
 use crate::detection::{DetectorConfig, PipelineConfig};
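
The new `dirs` and `toml` imports, together with the Cargo.toml additions above, point to TOML config loading from a per-user directory. A hedged sketch of how the two crates are typically combined; the path, file name, and Config shape are illustrative, not this crate's actual API:

    use serde::Deserialize;
    use std::fs;
    use std::path::PathBuf;

    #[derive(Debug, Deserialize)]
    struct Config {
        output_dir: Option<PathBuf>, // illustrative field
    }

    fn load_config() -> anyhow::Result<Config> {
        // dirs::config_dir() resolves the platform config root, e.g. ~/.config on Linux
        let path = dirs::config_dir()
            .ok_or_else(|| anyhow::anyhow!("no config directory"))?
            .join("meteor-detect/config.toml"); // assumed location
        let text = fs::read_to_string(&path)?;
        // toml::from_str deserializes via serde, which is why Config derives Deserialize
        Ok(toml::from_str(&text)?)
    }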

View File

@@ -19,10 +19,14 @@ pub struct BrightnessDetectorParams {
     /// Detection sensitivity (0.0-1.0)
     pub sensitivity: f32,
     /// Unique ID for this detector instance
-    #[serde(default = "Uuid::new_v4")]
+    #[serde(default = "default_uuid_string")]
     pub id: String,
 }

+fn default_uuid_string() -> String {
+    Uuid::new_v4().to_string()
+}
+
 impl Default for BrightnessDetectorParams {
     fn default() -> Self {
         Self {
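
The original attribute was rejected because `#[serde(default = "...")]` must name a function whose return type matches the field: `Uuid::new_v4` returns a `Uuid`, while `id` is a `String`, hence the wrapper. The pattern in isolation:

    use serde::Deserialize;
    use uuid::Uuid;

    fn default_uuid_string() -> String {
        Uuid::new_v4().to_string()
    }

    #[derive(Deserialize)]
    struct Params {
        // serde calls default_uuid_string() whenever `id` is missing from the input
        #[serde(default = "default_uuid_string")]
        id: String,
    }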
@@ -84,13 +88,16 @@ impl BrightnessDetector {
         // Convert frame to grayscale
         let mut gray = core::Mat::default();
         imgproc::cvt_color(frame, &mut gray, imgproc::COLOR_BGR2GRAY, 0)?;

         match &mut self.background {
             Some(bg) => {
                 // Gradually update background model (running average)
                 let alpha = 0.05; // Update rate
-                core::add_weighted(bg, 1.0 - alpha, &gray, alpha, 0.0, bg, -1)?;
-            },
+                // Create a new Mat to store results
+                let mut new_bg = core::Mat::default();
+                core::add_weighted(bg, 1.0 - alpha, &gray, alpha, 0.0, &mut new_bg, -1)?;
+                // Assign the result back to bg
+                *bg = new_bg;
+            }
             None => {
                 // Initialize background model
                 self.background = Some(gray.clone());
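
The original call failed to borrow-check because `bg` was passed as both source and destination, which would require an immutable and a mutable borrow of the same `Mat` at once; blending into a scratch `Mat` and moving it back avoids the aliasing. The fix reduced to a standalone function, a sketch assuming the opencv crate's `add_weighted` signature:

    use opencv::{core, prelude::*};

    fn update_background(bg: &mut core::Mat, gray: &core::Mat) -> opencv::Result<()> {
        let alpha = 0.05;
        // core::add_weighted(bg, 1.0 - alpha, gray, alpha, 0.0, bg, -1)?;
        // ^ rejected: `bg` would be borrowed immutably (input) and mutably (output) at once
        let mut new_bg = core::Mat::default();
        core::add_weighted(bg, 1.0 - alpha, gray, alpha, 0.0, &mut new_bg, -1)?;
        *bg = new_bg; // move the blended result back into the background model
        Ok(())
    }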

View File

@@ -216,24 +216,27 @@ impl FrameStacker {
         for y in 0..height {
             for x in 0..width {
                 let current = gray_8bit.at_2d::<u8>(y, x)?;
-                let current_max = maxpixel.at_2d::<u8>(y, x)?;
+                let maxpixel_mat = maxpixel.to_mat()?;
+                let current_max = maxpixel_mat.at_2d::<u8>(y, x)?;
                 if current > current_max {
                     // Update maximum pixel value
                     unsafe {
-                        *maxpixel.at_2d_mut::<u8>(y, x)? = current;
+                        *maxpixel.to_mat().unwrap().at_2d_mut::<u8>(y, x)? = *current;
                         // Record the frame number (0-255)
-                        *maxframe.at_2d_mut::<u8>(y, x)? = frame_idx as u8;
+                        *maxframe.to_mat().unwrap().at_2d_mut::<u8>(y, x)? = frame_idx as u8;
                     }
                 }

                 // Add to the sums for average and std dev calculations
-                let current_f64 = f64::from(current);
+                let current_f64 = f64::from(*current);
                 unsafe {
-                    let sum = sum_pixel.at_2d_mut::<f64>(y, x)?;
+                    let mut sumpixel_mat = sum_pixel.to_mat()?;
+                    let sum = sumpixel_mat.at_2d_mut::<f64>(y, x)?;
                     *sum += current_f64;
-                    let sum_sq = sum_sq_pixel.at_2d_mut::<f64>(y, x)?;
+                    let mut sum_sq_pixel_mat = sum_sq_pixel.to_mat().unwrap();
+                    let sum_sq = sum_sq_pixel_mat.at_2d_mut::<f64>(y, x)?;
                     *sum_sq += current_f64 * current_f64;
                 }
             }
@@ -244,8 +247,9 @@ impl FrameStacker {
         let mut avepixel = core::Mat::zeros(height, width, core::CV_8UC1)?;
         for y in 0..height {
             for x in 0..width {
-                let max_val = f64::from(maxpixel.at_2d::<u8>(y, x)?);
-                let sum = *sum_pixel.at_2d::<f64>(y, x)?;
+                let maxpixel_mat = maxpixel.to_mat().unwrap();
+                let max_val = f64::from(*maxpixel_mat.at_2d::<u8>(y, x)?);
+                let sum = *sum_pixel.to_mat().unwrap().at_2d::<f64>(y, x)?;

                 // Subtract the maximum value and divide by (N-1)
                 let avg = (sum - max_val) / (self.frame_buffer.len() as f64 - 1.0);
@@ -260,7 +264,7 @@ impl FrameStacker {
                 };

                 unsafe {
-                    *avepixel.at_2d_mut::<u8>(y, x)? = avg_u8;
+                    *avepixel.to_mat().unwrap().at_2d_mut::<u8>(y, x)? = avg_u8;
                 }
             }
         }
@@ -269,11 +273,13 @@ impl FrameStacker {
         let mut stdpixel = core::Mat::zeros(height, width, core::CV_8UC1)?;
         for y in 0..height {
             for x in 0..width {
-                let max_val = f64::from(maxpixel.at_2d::<u8>(y, x)?);
-                let avg = f64::from(avepixel.at_2d::<u8>(y, x)?);
+                let maxpixel_mat = maxpixel.to_mat()?; // convert to a `Mat` once, outside the loop
+                let max_val = f64::from(*maxpixel_mat.at_2d::<u8>(y, x)?);
+                let avgpixel_mat = avepixel.to_mat()?;
+                let avg = f64::from(*avgpixel_mat.at_2d::<u8>(y, x)?);

                 // Get sum of squares of remaining values
-                let sum_sq = *sum_sq_pixel.at_2d::<f64>(y, x)?;
+                let sum_sq = *sum_sq_pixel.to_mat().unwrap().at_2d::<f64>(y, x)?;

                 // Sum of squared differences can be computed as:
                 // sum((x_i - avg)^2) = sum(x_i^2) - N * avg^2
@@ -292,7 +298,7 @@ impl FrameStacker {
                 };

                 unsafe {
-                    *stdpixel.at_2d_mut::<u8>(y, x)? = std_u8;
+                    *stdpixel.to_mat().unwrap().at_2d_mut::<u8>(y, x)? = std_u8;
                 }
             }
         }
@@ -301,6 +307,10 @@ impl FrameStacker {
         let first_frame_time = self.frame_buffer.front().unwrap().timestamp;
         let last_frame_time = self.frame_buffer.back().unwrap().timestamp;

+        let maxpixel = maxpixel.to_mat()?;
+        let avepixel = avepixel.to_mat()?;
+        let stdpixel = stdpixel.to_mat()?;
+        let maxframe = maxframe.to_mat()?;
         let stacked = StackedFrames {
             maxpixel,
             avepixel,
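
The underlying error in this file is that `core::Mat::zeros` in the opencv crate returns a lazy `MatExpr`, which has no `at_2d`/`at_2d_mut`; the patch converts with `to_mat()` at each access. Materializing each buffer once, before the pixel loops, would achieve the same fix without the per-pixel copies. A sketch of that alternative, assuming a recent opencv crate where `at_2d_mut` is a safe method:

    use opencv::{core, prelude::*};

    fn stack_buffers(height: i32, width: i32) -> opencv::Result<core::Mat> {
        // Mat::zeros yields a MatExpr; materialize it once so at_2d/at_2d_mut
        // are available and no conversion happens inside the loops.
        let mut maxpixel = core::Mat::zeros(height, width, core::CV_8UC1)?.to_mat()?;
        for y in 0..height {
            for x in 0..width {
                let v = *maxpixel.at_2d::<u8>(y, x)?;
                *maxpixel.at_2d_mut::<u8>(y, x)? = v.saturating_add(1);
            }
        }
        Ok(maxpixel)
    }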

View File

@@ -4,12 +4,13 @@ use futures::future::join_all;
 use log::{debug, error, info, warn};
 use opencv::{core, prelude::*};
 use serde::{Deserialize, Serialize};
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, Mutex as StdMutex};
+use tokio::sync::Mutex;
 use std::time::{Duration, Instant};
 use tokio::runtime::Runtime;
 use tokio::sync::{broadcast, mpsc};
 use tokio::time;
-use tokio::task::JoinSet;
+use tokio::task::{self, JoinSet};

 use crate::camera::{Frame};
 use crate::camera::{CameraController, MeteorEvent};
@@ -62,7 +63,7 @@ pub enum AggregationStrategy {
 /// Detection pipeline for meteor events with parallel detector support
 pub struct DetectionPipeline {
     /// List of detector instances
-    detectors: Vec<Arc<Mutex<Box<dyn MeteorDetector>>>>,
+    detectors: Vec<Arc<StdMutex<Box<dyn MeteorDetector>>>>,
     /// Camera controller
     camera_controller: Arc<Mutex<CameraController>>,
     /// Pipeline configuration
@@ -74,9 +75,9 @@ pub struct DetectionPipeline {
     /// Channel for receiving events
     event_rx: mpsc::Receiver<MeteorEvent>,
     /// Whether the pipeline is running
-    is_running: Arc<Mutex<bool>>,
+    is_running: Arc<StdMutex<bool>>,
     /// Current frame index
-    frame_index: Arc<Mutex<u64>>,
+    frame_index: Arc<StdMutex<u64>>,
 }

 impl DetectionPipeline {
@@ -109,7 +110,7 @@ impl DetectionPipeline {
         let mut detectors = Vec::with_capacity(pipeline_config.detectors.len());
         for detector_config in &pipeline_config.detectors {
             let detector = DetectorFactory::create(detector_config);
-            detectors.push(Arc::new(Mutex::new(detector)));
+            detectors.push(Arc::new(StdMutex::new(detector)));
         }

         // Create tokio runtime
@@ -125,15 +126,15 @@ impl DetectionPipeline {
             runtime,
             event_tx,
             event_rx,
-            is_running: Arc::new(Mutex::new(false)),
-            frame_index: Arc::new(Mutex::new(0)),
+            is_running: Arc::new(StdMutex::new(false)),
+            frame_index: Arc::new(StdMutex::new(0)),
         })
     }

     /// Add a detector to the pipeline
     pub fn add_detector(&mut self, config: DetectorConfig) -> Result<()> {
         let detector = DetectorFactory::create(&config);
-        self.detectors.push(Arc::new(Mutex::new(detector)));
+        self.detectors.push(Arc::new(StdMutex::new(detector)));
         Ok(())
     }
@@ -160,6 +161,7 @@ impl DetectionPipeline {
             // Spawn a task for this detector
             let handle = tokio::spawn(async move {
+                // We use std::sync::Mutex for detectors, so we use unwrap() not await
                 let mut detector = detector.lock().unwrap();
                 match detector.process_frame(&frame, frame_index) {
                     Ok(result) => result,
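
The `Mutex as StdMutex` split above is the crux of the fix: a `std::sync::Mutex` is locked synchronously with `lock().unwrap()` and its guard should not live across an `.await` (the future stops being `Send`), while a `tokio::sync::Mutex` is locked with `.await` and its guard may be held across suspension points. A contrast sketch:

    use std::sync::{Arc, Mutex as StdMutex};
    use tokio::sync::Mutex;

    async fn demo(counter: Arc<StdMutex<u64>>, label: Arc<Mutex<String>>) {
        {
            // std mutex: synchronous lock; keep the scope short and drop the
            // guard before any .await so the future stays Send.
            let mut n = counter.lock().unwrap();
            *n += 1;
        }
        // tokio mutex: asynchronous lock; holding the guard across .await is fine.
        let mut s = label.lock().await;
        s.push('x');
        tokio::task::yield_now().await;
    }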
@@ -237,17 +239,22 @@ impl DetectionPipeline {
         let is_running = self.is_running.clone();
         let frame_index = self.frame_index.clone();

-        // Process incoming frames
-        let process_task = tokio::spawn(async move {
+        // Process task in a separate thread that doesn't require Send
+        std::thread::spawn(move || {
+            // Create a runtime for this dedicated thread
+            let rt = Runtime::new().expect("Failed to create tokio runtime for detection thread");
+            // Run the async logic in this thread
+            rt.block_on(async {
             // Get frame receiver
             let mut frame_rx = {
-                let camera = camera_controller.lock().unwrap();
+                let camera = camera_controller.lock().await;
                 camera.subscribe_to_frames()
             };

             // Get frame buffer reference
             let frame_buffer = {
-                let camera = camera_controller.lock().unwrap();
+                let camera = camera_controller.lock().await;
                 camera.get_frame_buffer()
             };
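
Moving the loop onto a dedicated OS thread with its own runtime is a standard workaround when a future cannot be `Send`: `tokio::spawn` demands `Send + 'static`, but `Runtime::block_on` only needs the future to stay on the current thread. The shape of the change, reduced to a sketch with illustrative names:

    use std::rc::Rc; // Rc is !Send, so this future could not go through tokio::spawn
    use tokio::runtime::Runtime;

    fn spawn_detection_thread() -> std::thread::JoinHandle<()> {
        std::thread::spawn(move || {
            // One runtime owned by this thread; nothing inside has to be Send.
            let rt = Runtime::new().expect("Failed to create tokio runtime");
            rt.block_on(async {
                let state = Rc::new(0u32);
                tokio::task::yield_now().await; // suspension point; Rc still alive
                println!("detection loop running, state = {}", state);
            });
        })
    }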
@@ -279,8 +286,9 @@ impl DetectionPipeline {
                     // Save event
                     if let Some(bbox) = result.bounding_box {
-                        let event = {
-                            let mut camera = camera_controller.lock().unwrap();
+                        // Since we're in a dedicated thread, we can safely hold the lock
+                        // across await points without Send concerns
+                        let mut camera = camera_controller.lock().await;
                         match camera.save_meteor_event(
                             frame.timestamp,
                             result.confidence,
@@ -288,16 +296,15 @@ impl DetectionPipeline {
                             buffer_seconds,
                             buffer_seconds
                         ).await {
-                            Ok(event) => event,
+                            Ok(event) => {
+                                // Send event notification
+                                let _ = event_tx.send(event).await;
+                            },
                             Err(e) => {
                                 error!("Failed to save meteor event: {}", e);
                                 continue;
                             }
                         }
-                        };
-
-                        // Send event notification
-                        let _ = event_tx.send(event).await;
                     }
                 }
             }
@@ -318,12 +325,14 @@ impl DetectionPipeline {
             info!("Detection pipeline stopped");
         });
+        });

         Ok(())
     }

     /// Static version of process_frame_parallel for use in async tasks
     async fn process_frame_parallel_static(
-        detectors: &Vec<Arc<Mutex<Box<dyn MeteorDetector>>>>,
+        detectors: &Vec<Arc<StdMutex<Box<dyn MeteorDetector>>>>,
         frame: &core::Mat,
         frame_index: u64,
     ) -> Vec<DetectionResult> {
@@ -336,12 +345,25 @@ impl DetectionPipeline {
             // Spawn a task for this detector
             let handle = tokio::spawn(async move {
-                let mut detector = detector.lock().unwrap();
-                match detector.process_frame(&frame, frame_index) {
+                // Use a non-blocking operation to acquire the lock
+                // We deliberately avoid an async lock here since the detector
+                // implementation isn't expecting async operations
+                let detector_result = {
+                    let mut detector = match detector.try_lock() {
+                        Ok(guard) => guard,
+                        Err(_) => {
+                            error!("Failed to acquire detector lock");
+                            return DetectionResult::default();
+                        }
+                    };
+                    detector.process_frame(&frame, frame_index)
+                };
+                match detector_result {
                     Ok(result) => result,
                     Err(e) => {
-                        error!("Error processing frame with detector {}: {}",
-                            detector.get_id(), e);
+                        error!("Error processing frame: {}", e);
                         DetectionResult::default()
                     }
                 }