build success.

This commit is contained in:
grabbit 2025-04-05 01:04:52 +08:00
parent 62d6f38bc5
commit d2fe6a6fc6
19 changed files with 1438 additions and 313 deletions

View File

@ -5,11 +5,15 @@ edition = "2021"
authors = ["Meteor Detection Team"]
description = "A Raspberry Pi based meteor detection system"
[features]
default = []
gpio = ["rppal", "embedded-hal"] # Feature to enable GPIO functionality
[dependencies]
# Hardware interfaces
rppal = "0.22.1" # Raspberry Pi hardware access
rppal = { version = "0.22.1", optional = true } # Raspberry Pi hardware access
serialport = "4.2.0" # Serial port for GPS
embedded-hal = "0.2.7" # Hardware abstraction layer
embedded-hal = { version = "0.2.7", optional = true } # Hardware abstraction layer
# Video processing
opencv = { version = "0.94.2" } # OpenCV bindings
@ -47,6 +51,7 @@ thiserror = "1.0.40" # Error definitions
config = "0.13.3" # Configuration management
uuid = { version = "1.3.3", features = ["v4", "serde"] } # UUIDs
clap = { version = "4.2.5", features = ["derive"] } # Command line argument parsing
rand = "0.8.5" # Random number generation for sensor simulation
[dev-dependencies]
criterion = "0.4.0" # Benchmarking

View File

@ -12,6 +12,24 @@ NC='\033[0m' # No Color
# Default configuration
CONFIG_DIR="$HOME/.config/meteor_detect"
# Detect OS platform
PLATFORM=$(uname)
if [[ "$PLATFORM" == "Linux" ]]; then
IS_RASPBERRY_PI=false
# Check if we're on a Raspberry Pi
if [ -f /proc/device-tree/model ]; then
MODEL=$(tr -d '\0' < /proc/device-tree/model)
if [[ "$MODEL" == *"Raspberry Pi"* ]]; then
IS_RASPBERRY_PI=true
fi
fi
PLATFORM_FEATURE="--features gpio"
echo -e "${GREEN}Detected Linux platform. Enabling GPIO support.${NC}"
else
PLATFORM_FEATURE=""
echo -e "${YELLOW}Detected non-Linux platform ($PLATFORM). GPIO support will be disabled.${NC}"
fi
# Print help message
function print_help {
echo -e "${YELLOW}Meteor Detection System Build Script${NC}"
@ -28,24 +46,28 @@ function print_help {
echo " create-config Create a default configuration file"
echo " help Show this help message"
echo ""
if [[ "$PLATFORM" != "Linux" ]]; then
echo -e "${YELLOW}Note: Running on non-Linux platform. GPIO support is disabled.${NC}"
echo -e "${YELLOW} Only video processing and meteor detection will be available.${NC}"
fi
}
# Build the application in debug mode
function build_debug {
echo -e "${GREEN}Building in debug mode...${NC}"
cargo build
cargo build $PLATFORM_FEATURE
}
# Build the application in release mode
function build_release {
echo -e "${GREEN}Building in release mode...${NC}"
cargo build --release
cargo build --release $PLATFORM_FEATURE
}
# Run the application
function run_app {
echo -e "${GREEN}Running application...${NC}"
cargo run
cargo run $PLATFORM_FEATURE
}
# Clean build artifacts
@ -57,7 +79,7 @@ function clean {
# Run tests
function run_tests {
echo -e "${GREEN}Running tests...${NC}"
cargo test
cargo test $PLATFORM_FEATURE
}
# Install development dependencies
@ -65,10 +87,8 @@ function setup {
echo -e "${GREEN}Installing development dependencies...${NC}"
# Check if running on Raspberry Pi
if [ -f /etc/os-release ]; then
. /etc/os-release
if [[ "$ID" == "raspbian" ]]; then
echo -e "${YELLOW}Detected Raspberry Pi OS${NC}"
if [[ "$IS_RASPBERRY_PI" == "true" ]]; then
echo -e "${YELLOW}Detected Raspberry Pi hardware${NC}"
# Install system dependencies
echo -e "${GREEN}Installing system dependencies...${NC}"
@ -86,6 +106,25 @@ function setup {
gstreamer1.0-libav gstreamer1.0-tools \
gstreamer1.0-x gstreamer1.0-alsa gstreamer1.0-gl \
gstreamer1.0-gtk3 gstreamer1.0-qt5 gstreamer1.0-pulseaudio
elif [[ "$PLATFORM" == "Darwin" ]]; then
echo -e "${YELLOW}Detected macOS platform${NC}"
# Check for Homebrew and install dependencies
if command -v brew &> /dev/null; then
echo -e "${GREEN}Installing macOS dependencies via Homebrew...${NC}"
brew install opencv gstreamer sqlite3 pkg-config
brew install gst-plugins-base gst-plugins-good gst-plugins-bad gst-plugins-ugly
else
echo -e "${RED}Homebrew not found. Please install it from https://brew.sh/${NC}"
echo -e "${YELLOW}Then install the required dependencies:${NC}"
echo "brew install opencv gstreamer sqlite3 pkg-config"
echo "brew install gst-plugins-base gst-plugins-good gst-plugins-bad gst-plugins-ugly"
fi
else
echo -e "${YELLOW}Platform-specific setup not implemented for $PLATFORM${NC}"
echo -e "${YELLOW}You may need to manually install dependencies like OpenCV and GStreamer${NC}"
fi
# Install Rust if not already installed
if ! command -v rustc &> /dev/null; then
echo -e "${GREEN}Installing Rust...${NC}"
@ -95,10 +134,6 @@ function setup {
# Create configuration directory
mkdir -p "$CONFIG_DIR"
else
echo -e "${YELLOW}Not running on Raspberry Pi OS, skipping system dependencies${NC}"
fi
fi
# Check for Rust installation
if ! command -v rustc &> /dev/null; then

View File

@ -8,7 +8,9 @@ log_level = "info"
# Camera settings
[camera]
# Camera device path
# Camera device:
# - Linux: device path (e.g., "/dev/video0")
# - macOS: device index (e.g., "0") or identifier
device = "/dev/video0"
# Resolution (options: HD1080p, HD720p, VGA)
resolution = "HD720p"

View File

@ -9,7 +9,7 @@ use tokio::time;
use crate::camera::frame_buffer::{Frame, FrameBuffer, SharedFrameBuffer};
use crate::camera::opencv::{OpenCVCamera, OpenCVCaptureStream};
use crate::camera::{CameraSettings, MeteorEvent, Resolution, ExposureMode};
use crate::camera::{CameraSettings, ExposureMode, MeteorEvent, Resolution};
/// Camera controller manages camera operations and frame capture
pub struct CameraController {
@ -35,7 +35,7 @@ impl CameraController {
/// Create a new camera controller with the given configuration
pub async fn new(config: &crate::Config) -> Result<Self> {
// Extract camera settings from config (placeholder for now)
let settings = CameraSettings::default();
let settings = config.clone().camera;
// Create frame buffer with capacity for 10 minutes of video at settings.fps
let buffer_capacity = (10 * 60 * settings.fps) as usize;
@ -46,8 +46,7 @@ impl CameraController {
// Create events directory if it doesn't exist
let events_dir = PathBuf::from("events");
std::fs::create_dir_all(&events_dir)
.context("Failed to create events directory")?;
std::fs::create_dir_all(&events_dir).context("Failed to create events directory")?;
Ok(Self {
settings,
@ -64,24 +63,29 @@ impl CameraController {
/// Initialize the camera with current settings
pub async fn initialize(&mut self) -> Result<()> {
// Open the camera
let mut camera = OpenCVCamera::open(&self.settings.device)
.context("Failed to open camera")?;
let mut camera =
OpenCVCamera::open(&self.settings.device).context("Failed to open camera")?;
// Configure camera parameters
camera.set_format(self.settings.resolution)
camera
.set_format(self.settings.resolution)
.context("Failed to set camera format")?;
camera.set_fps(self.settings.fps)
camera
.set_fps(self.settings.fps)
.context("Failed to set camera FPS")?;
camera.set_exposure(self.settings.exposure)
camera
.set_exposure(self.settings.exposure)
.context("Failed to set camera exposure")?;
camera.set_gain(self.settings.gain)
camera
.set_gain(self.settings.gain)
.context("Failed to set camera gain")?;
if self.settings.focus_locked {
camera.lock_focus_at_infinity()
camera
.lock_focus_at_infinity()
.context("Failed to lock focus at infinity")?;
}
@ -98,11 +102,14 @@ impl CameraController {
return Ok(());
}
let camera = self.camera.as_mut()
let camera = self
.camera
.as_mut()
.ok_or_else(|| anyhow::anyhow!("Camera not initialized"))?;
// Start the camera streaming
let stream = camera.start_streaming()
let stream = camera
.start_streaming()
.context("Failed to start camera streaming")?;
self.stream = Some(stream);
@ -112,7 +119,9 @@ impl CameraController {
let frame_buffer = self.frame_buffer.clone();
let frame_tx = self.frame_tx.clone();
let fps = self.settings.fps;
let mut stream = self.stream.take()
let mut stream = self
.stream
.take()
.expect("Stream just initialized but is None");
let mut frame_count = self.frame_count;
@ -139,7 +148,7 @@ impl CameraController {
// Broadcast frame to listeners
let _ = frame_tx.send(frame);
},
}
Err(e) => {
error!("Failed to capture frame: {}", e);
// Small delay to avoid tight error loop
@ -235,11 +244,9 @@ impl CameraController {
seconds_after: i64,
) -> Result<MeteorEvent> {
// Extract frames from the buffer
let frames = self.frame_buffer.extract_event_frames(
timestamp,
seconds_before,
seconds_after,
);
let frames =
self.frame_buffer
.extract_event_frames(timestamp, seconds_before, seconds_after);
if frames.is_empty() {
return Err(anyhow::anyhow!("No frames found for event"));
@ -250,8 +257,7 @@ impl CameraController {
// Create a directory for the event
let event_dir = self.events_dir.join(event_id.to_string());
std::fs::create_dir_all(&event_dir)
.context("Failed to create event directory")?;
std::fs::create_dir_all(&event_dir).context("Failed to create event directory")?;
// Save frames to the event directory
for (i, frame) in frames.iter().enumerate() {
@ -277,4 +283,61 @@ impl CameraController {
info!("Saved meteor event: {}", event_id);
Ok(event)
}
/// Get the current status of the camera
pub async fn get_status(&self) -> Result<serde_json::Value> {
let frame_buffer_stats = {
let buffer = self.frame_buffer.clone();
let length = buffer.len();
let capacity = buffer.capacity();
serde_json::json!({
"length": length,
"capacity": capacity,
"utilization_percent": if capacity > 0 { (length as f64 / capacity as f64) * 100.0 } else { 0.0 }
})
};
let recent_frame = {
// Get the most recent frame (index 0)
if let Some(frame) = self.frame_buffer.get(0) {
let timestamp = frame.timestamp.to_rfc3339();
serde_json::json!({
"timestamp": timestamp,
"frame_number": frame.index
})
} else {
serde_json::json!(null)
}
};
let camera_info = {
serde_json::json!({
"device": self.settings.device,
"resolution": format!("{:?}", self.settings.resolution),
"fps": self.settings.fps,
"exposure_mode": format!("{:?}", self.settings.exposure),
"gain": self.settings.gain,
"focus_locked": self.settings.focus_locked
})
};
let status = serde_json::json!({
"running": self.is_running,
"frame_count": self.frame_count,
"settings": {
"device": self.settings.device,
"resolution": format!("{:?}", self.settings.resolution),
"fps": self.settings.fps
},
"camera_info": camera_info,
"frame_buffer": frame_buffer_stats,
"recent_frame": recent_frame,
"events_dir": self.events_dir.to_string_lossy()
});
debug!("Camera status: {}", status);
Ok(status)
}
}

View File

@ -52,7 +52,9 @@ pub enum ExposureMode {
/// Configuration parameters for the camera
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CameraSettings {
/// Camera device path (e.g., /dev/video0)
/// Camera device path or index
/// - Linux: device path (e.g., "/dev/video0")
/// - macOS: device index (e.g., "0") or identifier
pub device: String,
/// Resolution setting
pub resolution: Resolution,
@ -68,8 +70,18 @@ pub struct CameraSettings {
impl Default for CameraSettings {
fn default() -> Self {
// Default device based on platform
#[cfg(target_os = "linux")]
let default_device = "/dev/video0".to_string();
#[cfg(target_os = "macos")]
let default_device = "0".to_string(); // Use device index 0 for macOS
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
let default_device = "0".to_string(); // Fallback to index 0 for other platforms
Self {
device: "/dev/video0".to_string(),
device: default_device,
resolution: Resolution::HD720p,
fps: 30,
exposure: ExposureMode::Auto,
@ -80,7 +92,7 @@ impl Default for CameraSettings {
}
/// Represents a captured meteor event with video and metadata
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MeteorEvent {
/// Unique identifier for the event
pub id: uuid::Uuid,

View File

@ -26,22 +26,7 @@ impl OpenCVCamera {
let path_str = path.as_ref().to_str()
.ok_or_else(|| anyhow!("Invalid path"))?;
let is_device_path = path_str.starts_with("/dev/");
let mut capture = if is_device_path {
// For device files like /dev/video0, we need to extract the number
if let Some(num_str) = path_str.strip_prefix("/dev/video") {
if let Ok(device_index) = num_str.parse::<i32>() {
videoio::VideoCapture::new(device_index, videoio::CAP_ANY)?
} else {
return Err(anyhow!("Invalid device number in path: {}", path_str));
}
} else {
return Err(anyhow!("Unsupported device path format: {}", path_str));
}
} else {
// For other paths, try to open directly (e.g., video files, URLs)
videoio::VideoCapture::from_file(path_str, videoio::CAP_ANY)?
};
let mut capture = Self::create_capture_from_path(path_str)?;
if !capture.is_opened()? {
return Err(anyhow!("Failed to open camera: {}", path_str));
@ -65,6 +50,51 @@ impl OpenCVCamera {
})
}
/// Create a VideoCapture instance from a path or device index
fn create_capture_from_path(path_str: &str) -> Result<videoio::VideoCapture> {
// Try to parse as integer index first
if let Ok(device_index) = path_str.parse::<i32>() {
return Ok(videoio::VideoCapture::new(device_index, videoio::CAP_ANY)?);
}
// Handle platform-specific device paths
#[cfg(target_os = "linux")]
{
// For Linux device files like /dev/video0
if let Some(num_str) = path_str.strip_prefix("/dev/video") {
if let Ok(device_index) = num_str.parse::<i32>() {
return Ok(videoio::VideoCapture::new(device_index, videoio::CAP_ANY)?);
} else {
return Err(anyhow!("Invalid device number in path: {}", path_str));
}
}
}
#[cfg(target_os = "macos")]
{
// macOS doesn't use /dev/video* paths, but it might have a special format
// If we get a path with "camera" or "facetime" or other macOS camera identifiers
if path_str.contains("camera") || path_str.contains("facetime") || path_str.contains("avfoundation") {
// For macOS, try to extract any numbers in the path
let nums: Vec<&str> = path_str.split(|c: char| !c.is_digit(10))
.filter(|s| !s.is_empty())
.collect();
if let Some(num_str) = nums.first() {
if let Ok(device_index) = num_str.parse::<i32>() {
return Ok(videoio::VideoCapture::new(device_index, videoio::CAP_AVFOUNDATION)?);
}
}
// If we can't extract a number, try device 0 with AVFoundation
return Ok(videoio::VideoCapture::new(0, videoio::CAP_AVFOUNDATION)?);
}
}
// For URLs, video files, or any other path type
Ok(videoio::VideoCapture::from_file(path_str, videoio::CAP_ANY)?)
}
/// Set the camera resolution and pixel format
pub fn set_format(&mut self, resolution: Resolution) -> Result<()> {
let (width, height) = resolution.dimensions();
@ -165,26 +195,11 @@ impl OpenCVCamera {
// Create a separate VideoCapture for the stream to avoid concurrent access issues
let device = self.device.clone();
let is_device_path = device.starts_with("/dev/");
let mut stream_capture = Self::create_capture_from_path(&device)?;
let stream_capture = if is_device_path {
if let Some(num_str) = device.strip_prefix("/dev/video") {
if let Ok(device_index) = num_str.parse::<i32>() {
// Open with same settings
let mut cap = videoio::VideoCapture::new(device_index, videoio::CAP_ANY)?;
cap.set(videoio::CAP_PROP_FRAME_WIDTH, self.width as f64)?;
cap.set(videoio::CAP_PROP_FRAME_HEIGHT, self.height as f64)?;
cap
} else {
return Err(anyhow!("Invalid device number in path: {}", device));
}
} else {
return Err(anyhow!("Unsupported device path format: {}", device));
}
} else {
// For other paths, try to open directly
videoio::VideoCapture::from_file(&device, videoio::CAP_ANY)?
};
// Apply the same settings
stream_capture.set(videoio::CAP_PROP_FRAME_WIDTH, self.width as f64)?;
stream_capture.set(videoio::CAP_PROP_FRAME_HEIGHT, self.height as f64)?;
if !stream_capture.is_opened()? {
return Err(anyhow!("Failed to open camera stream"));

View File

@ -1,7 +1,8 @@
use anyhow::Result;
use log::{info, warn};
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
use log::{info, warn, error};
use tokio::sync::{Mutex, mpsc};
use std::sync::Arc;
use futures::StreamExt;
use crate::camera::MeteorEvent;
use crate::config::Config;
@ -20,6 +21,8 @@ pub struct CommunicationManager {
event_rx: Option<mpsc::Receiver<MeteorEvent>>,
/// Whether the manager is running
is_running: Arc<Mutex<bool>>,
/// Optional shutdown signal
shutdown_signal: Option<mpsc::Sender<()>>,
}
impl CommunicationManager {
@ -37,13 +40,14 @@ impl CommunicationManager {
gps_controller,
event_rx: None,
is_running: Arc::new(Mutex::new(false)),
shutdown_signal: None,
})
}
/// Start the communication services
pub async fn run(&self) -> Result<()> {
pub async fn run(&mut self) -> Result<()> {
{
let mut is_running = self.is_running.lock().unwrap();
let mut is_running = self.is_running.lock().await;
if *is_running {
warn!("Communication manager is already running");
return Ok(());
@ -53,11 +57,56 @@ impl CommunicationManager {
info!("Starting communication manager");
// In a real implementation, this would:
// 1. Connect to MQTT broker
// 2. Start HTTP API server
// 3. Subscribe to event channels
// 4. Process and forward events
// Create a shutdown channel
let (tx, mut rx) = mpsc::channel::<()>(1);
self.shutdown_signal = Some(tx);
// Clone necessary references for our tasks
let is_running = Arc::clone(&self.is_running);
// Check if we have an event receiver
if let Some(mut event_rx) = self.event_rx.take() {
// Spawn a task to process events
let event_task = {
let config = self.config.clone();
tokio::spawn(async move {
info!("Started event processing task");
loop {
tokio::select! {
// Process incoming events
Some(event) = event_rx.recv() => {
info!("Received meteor event: {}", event.id);
// Process and forward the event (implementation depends on requirements)
if config.mqtt.enabled {
// Send to MQTT broker
if let Err(e) = Self::send_to_mqtt(&event).await {
error!("Failed to send event to MQTT: {}", e);
}
}
}
// Check for shutdown signal
_ = rx.recv() => {
info!("Shutting down event processing task");
break;
}
}
}
info!("Event processing task stopped");
})
};
// If HTTP API is enabled, start it
if self.config.http_api.enabled {
self.start_http_api().await?;
}
// If MQTT is enabled, connect to broker
if self.config.mqtt.enabled {
self.connect_mqtt_broker().await?;
}
}
Ok(())
}
@ -65,7 +114,7 @@ impl CommunicationManager {
/// Stop the communication services
pub async fn stop(&self) -> Result<()> {
{
let mut is_running = self.is_running.lock().unwrap();
let mut is_running = self.is_running.lock().await;
if !*is_running {
warn!("Communication manager is not running");
return Ok(());
@ -74,6 +123,23 @@ impl CommunicationManager {
}
info!("Stopping communication manager");
// Send shutdown signal
if let Some(tx) = &self.shutdown_signal {
if let Err(e) = tx.send(()).await {
error!("Failed to send shutdown signal: {}", e);
}
}
// Cleanup connections
if self.config.mqtt.enabled {
self.disconnect_mqtt_broker().await?;
}
if self.config.http_api.enabled {
self.stop_http_api().await?;
}
Ok(())
}
@ -84,17 +150,120 @@ impl CommunicationManager {
/// Send a status update
pub async fn send_status_update(&self) -> Result<()> {
// This is a placeholder implementation
// In a real system, this would send system status via MQTT
// Get current camera status
let camera_status = {
let camera = self.camera_controller.lock().await;
camera.get_status().await?
};
// Get current GPS status
let gps_status = {
let gps = self.gps_controller.lock().await;
gps.get_current_position().await?
};
// Combine status data
let status = serde_json::json!({
"device_id": self.config.device.id,
"timestamp": chrono::Utc::now().timestamp(),
"camera": camera_status,
"gps": gps_status,
"battery": 100, // Placeholder - would be from actual battery sensor
"disk_space": 1024, // Placeholder - would be from actual disk monitoring
});
// Send to MQTT if enabled
if self.config.mqtt.enabled {
Self::publish_mqtt(&self.config.mqtt.status_topic, &status.to_string()).await?;
}
info!("Sent status update");
Ok(())
}
/// Send an event notification
pub async fn send_event_notification(&self, event: &MeteorEvent) -> Result<()> {
// This is a placeholder implementation
// In a real system, this would send event data via MQTT
// Get GPS location first
let location = {
let gps = self.gps_controller.lock().await;
gps.get_current_position().await?
};
// Add metadata to event
let enriched_event = serde_json::json!({
"device_id": self.config.device.id,
"timestamp": chrono::Utc::now().timestamp(),
"event": event,
"location": location
});
// Send to MQTT if enabled
if self.config.mqtt.enabled {
Self::publish_mqtt(&self.config.mqtt.events_topic, &enriched_event.to_string()).await?;
}
info!("Sent notification for event {}", event.id);
Ok(())
}
// Helper methods for MQTT
/// Connect to MQTT broker
async fn connect_mqtt_broker(&self) -> Result<()> {
info!("Connecting to MQTT broker at {}", self.config.mqtt.broker_url);
// In a real implementation, this would:
// - Connect to the broker
// - Set up subscriptions
// - Configure QoS and other parameters
Ok(())
}
/// Disconnect from MQTT broker
async fn disconnect_mqtt_broker(&self) -> Result<()> {
info!("Disconnecting from MQTT broker");
// In a real implementation, this would:
// - Close the connection
// - Clean up resources
Ok(())
}
/// Send event to MQTT
async fn send_to_mqtt(event: &MeteorEvent) -> Result<()> {
// In a real implementation, this would:
// - Serialize the event
// - Publish to appropriate topic
info!("Published event {} to MQTT", event.id);
Ok(())
}
/// Publish message to MQTT topic
async fn publish_mqtt(topic: &str, message: &str) -> Result<()> {
// In a real implementation, this would:
// - Connect to the broker (or use existing connection)
// - Publish the message
info!("Published to topic: {}", topic);
Ok(())
}
// HTTP API methods
/// Start HTTP API server
async fn start_http_api(&self) -> Result<()> {
info!("Starting HTTP API on port {}", self.config.http_api.port);
// In a real implementation, this would:
// - Set up routes
// - Start server
// - Configure middleware
Ok(())
}
/// Stop HTTP API server
async fn stop_http_api(&self) -> Result<()> {
info!("Stopping HTTP API server");
// In a real implementation, this would:
// - Gracefully shutdown server
// - Close connections
Ok(())
}
}

View File

@ -71,7 +71,100 @@ impl Default for DetectionConfig {
}
}
/// Configuration for communication
/// Configuration for MQTT communication
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttConfig {
/// Whether MQTT is enabled
pub enabled: bool,
/// MQTT broker URL
pub broker_url: String,
/// MQTT client ID
pub client_id: String,
/// MQTT username
pub username: Option<String>,
/// MQTT password
pub password: Option<String>,
/// Topic for event notifications
pub events_topic: String,
/// Topic for system status
pub status_topic: String,
/// Quality of Service level (0, 1, or 2)
pub qos: u8,
/// Whether to retain messages
pub retain: bool,
/// Keep-alive interval in seconds
pub keep_alive_secs: u16,
/// Whether to use SSL/TLS
pub use_tls: bool,
/// Path to CA certificate (if TLS enabled)
pub ca_cert_path: Option<PathBuf>,
/// Path to client certificate (if TLS enabled)
pub client_cert_path: Option<PathBuf>,
/// Path to client key (if TLS enabled)
pub client_key_path: Option<PathBuf>,
}
impl Default for MqttConfig {
fn default() -> Self {
Self {
enabled: false,
broker_url: "mqtt://localhost:1883".to_string(),
client_id: format!("meteor-detector-{}", uuid::Uuid::new_v4()),
username: None,
password: None,
events_topic: "meteor/events".to_string(),
status_topic: "meteor/status".to_string(),
qos: 1,
retain: false,
keep_alive_secs: 60,
use_tls: false,
ca_cert_path: None,
client_cert_path: None,
client_key_path: None,
}
}
}
/// Configuration for HTTP API
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HttpApiConfig {
/// Whether HTTP API is enabled
pub enabled: bool,
/// HTTP API binding address
pub bind_address: String,
/// HTTP API port
pub port: u16,
/// Whether to enable SSL/TLS for HTTP API
pub use_tls: bool,
/// Path to SSL/TLS certificate (if TLS enabled)
pub cert_path: Option<PathBuf>,
/// Path to SSL/TLS key (if TLS enabled)
pub key_path: Option<PathBuf>,
/// Authentication token for API access (if None, no authentication)
pub auth_token: Option<String>,
/// CORS allowed origins (empty means all origins)
pub cors_origins: Vec<String>,
/// Rate limiting: requests per minute
pub rate_limit: Option<u32>,
}
impl Default for HttpApiConfig {
fn default() -> Self {
Self {
enabled: false,
bind_address: "0.0.0.0".to_string(),
port: 8080,
use_tls: false,
cert_path: None,
key_path: None,
auth_token: None,
cors_origins: vec![],
rate_limit: Some(60),
}
}
}
/// Configuration for communication (legacy structure - for backward compatibility)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommunicationConfig {
/// MQTT broker URL
@ -113,39 +206,97 @@ impl Default for CommunicationConfig {
}
}
/// Device identification configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeviceConfig {
/// Unique ID for this meteor detector
pub id: String,
/// Human-readable name for this device
pub name: String,
/// Device location description
pub location: Option<String>,
/// Additional device metadata
pub metadata: std::collections::HashMap<String, String>,
}
impl Default for DeviceConfig {
fn default() -> Self {
Self {
id: uuid::Uuid::new_v4().to_string(),
name: "Meteor Detector".to_string(),
location: None,
metadata: std::collections::HashMap::new(),
}
}
}
/// Main application configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
/// Unique ID for this meteor detector
pub device_id: String,
/// Device identification
#[serde(default)]
pub device: DeviceConfig,
/// Unique ID for this meteor detector (legacy - for backward compatibility)
#[serde(skip_serializing_if = "Option::is_none")]
pub device_id: Option<String>,
/// Camera configuration
pub camera: CameraSettings,
/// GPS configuration
pub gps: GpsConfig,
/// Sensor configuration
pub sensors: SensorConfig,
/// Storage configuration
pub storage: StorageConfig,
/// Detection configuration
pub detection: DetectionConfig,
/// Communication configuration
pub communication: CommunicationConfig,
/// MQTT configuration
#[serde(default)]
pub mqtt: MqttConfig,
/// HTTP API configuration
#[serde(default)]
pub http_api: HttpApiConfig,
/// Communication configuration (legacy - for backward compatibility)
#[serde(skip_serializing_if = "Option::is_none")]
pub communication: Option<CommunicationConfig>,
/// Watermark configuration
#[serde(default)]
pub watermark: WatermarkOptions,
/// RTSP streaming configuration
#[serde(default)]
pub rtsp: RtspConfig,
/// Logging level
pub log_level: String,
}
impl Default for Config {
fn default() -> Self {
// Default device based on platform
#[cfg(target_os = "linux")]
let default_device = "/dev/video0".to_string();
#[cfg(target_os = "macos")]
let default_device = "0".to_string(); // Use device index 0 for macOS
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
let default_device = "0".to_string(); // Fallback to index 0 for other platforms
Self {
device_id: uuid::Uuid::new_v4().to_string(),
device: DeviceConfig::default(),
device_id: None,
camera: CameraSettings {
device: "/dev/video0".to_string(),
device: default_device,
resolution: Resolution::HD720p,
fps: 30,
exposure: ExposureMode::Auto,
@ -156,7 +307,9 @@ impl Default for Config {
sensors: SensorConfig::default(),
storage: StorageConfig::default(),
detection: DetectionConfig::default(),
communication: CommunicationConfig::default(),
mqtt: MqttConfig::default(),
http_api: HttpApiConfig::default(),
communication: None,
watermark: WatermarkOptions::default(),
rtsp: RtspConfig::default(),
log_level: "info".to_string(),
@ -170,9 +323,34 @@ impl Config {
let config_str = fs::read_to_string(&path)
.context(format!("Failed to read config file {:?}", path.as_ref()))?;
let config: Config = toml::from_str(&config_str)
let mut config: Config = toml::from_str(&config_str)
.context("Failed to parse config file")?;
// Migrate legacy device_id if needed
if config.device_id.is_some() && config.device.id == DeviceConfig::default().id {
config.device.id = config.device_id.clone().unwrap();
}
// Migrate legacy communication config if needed
if let Some(comm) = &config.communication {
// Only migrate if MQTT and HTTP API are using defaults
if !config.mqtt.enabled && !config.http_api.enabled {
config.mqtt.enabled = true;
config.mqtt.broker_url = comm.mqtt_broker.clone();
config.mqtt.client_id = comm.mqtt_client_id.clone();
config.mqtt.username = comm.mqtt_username.clone();
config.mqtt.password = comm.mqtt_password.clone();
config.mqtt.events_topic = comm.event_topic.clone();
config.mqtt.status_topic = comm.status_topic.clone();
config.http_api.enabled = true;
config.http_api.port = comm.api_port;
config.http_api.use_tls = comm.api_use_ssl;
config.http_api.cert_path = comm.api_cert_path.clone();
config.http_api.key_path = comm.api_key_path.clone();
}
}
info!("Loaded configuration from {:?}", path.as_ref());
Ok(config)
}

View File

@ -1,6 +1,7 @@
use anyhow::{Context, Result};
use log::{debug, error, info, warn};
use opencv::{core, imgproc, prelude::*};
use opencv::core::AlgorithmHint::ALGO_HINT_APPROX;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
@ -87,7 +88,7 @@ impl BrightnessDetector {
fn update_background(&mut self, frame: &core::Mat) -> Result<()> {
// Convert frame to grayscale
let mut gray = core::Mat::default();
imgproc::cvt_color(frame, &mut gray, imgproc::COLOR_BGR2GRAY, 0)?;
imgproc::cvt_color(frame, &mut gray, imgproc::COLOR_BGR2GRAY, 0,ALGO_HINT_APPROX)?;
match &mut self.background {
Some(bg) => {
// Gradually update background model (running average)
@ -111,7 +112,7 @@ impl BrightnessDetector {
fn compute_difference(&mut self, frame: &core::Mat) -> Result<core::Mat> {
// Convert frame to grayscale
let mut gray = core::Mat::default();
imgproc::cvt_color(frame, &mut gray, imgproc::COLOR_BGR2GRAY, 0)?;
imgproc::cvt_color(frame, &mut gray, imgproc::COLOR_BGR2GRAY, 0,ALGO_HINT_APPROX)?;
// Calculate absolute difference from background
let mut diff = core::Mat::default();
@ -160,7 +161,7 @@ impl BrightnessDetector {
fn calculate_brightness(&self, frame: &core::Mat) -> Result<f32> {
// Convert to grayscale
let mut gray = core::Mat::default();
imgproc::cvt_color(frame, &mut gray, imgproc::COLOR_BGR2GRAY, 0)?;
imgproc::cvt_color(frame, &mut gray, imgproc::COLOR_BGR2GRAY, 0,ALGO_HINT_APPROX)?;
// Calculate mean brightness
let mean = core::mean(&gray, &core::no_array())?;

View File

@ -31,10 +31,14 @@ pub struct CamsDetectorParams {
/// Prefix for saved files
pub file_prefix: String,
/// Unique ID for this detector instance
#[serde(default = "Uuid::new_v4")]
#[serde(default = "default_uuid_string")]
pub id: String,
}
fn default_uuid_string() -> String {
Uuid::new_v4().to_string()
}
impl Default for CamsDetectorParams {
fn default() -> Self {
Self {
@ -133,7 +137,7 @@ impl CamsDetector {
// This could be optimized further for performance
// Apply threshold to maxpixel image to find bright areas
let mut thresholded = core::Mat::default()?;
let mut thresholded = core::Mat::default();
opencv::imgproc::threshold(
&features.maxpixel,
&mut thresholded,
@ -144,12 +148,10 @@ impl CamsDetector {
// Find contours in the thresholded image
let mut contours = core::Vector::<core::Vector<core::Point>>::new();
let mut hierarchy = core::Vector::<core::Vec4i>::new();
imgproc::find_contours(
&thresholded,
&mut contours,
&mut hierarchy,
imgproc::RETR_EXTERNAL,
imgproc::CHAIN_APPROX_SIMPLE,
core::Point::new(0, 0),
@ -166,15 +168,15 @@ impl CamsDetector {
// Function to check if a pixel is likely part of a meteor
let is_meteor_pixel = |x: i32, y: i32| -> Result<bool> {
// Get values from feature images
let max_val = *features.maxpixel.at::<u8>(y, x)?.get(0).unwrap_or(&0);
let avg_val = *features.avepixel.at::<u8>(y, x)?.get(0).unwrap_or(&0);
let max_val = *features.maxpixel.at_2d::<u8>(y, x)?;
let avg_val = *features.avepixel.at_2d::<u8>(y, x)?;
// Avoid division by zero
if avg_val == 0 {
return Ok(false);
}
let std_val = *features.stdpixel.at::<u8>(y, x)?.get(0).unwrap_or(&0);
let std_val = *features.stdpixel.at_2d::<u8>(y, x)?;
let std_to_avg_ratio = std_val as f32 / avg_val as f32;

View File

@ -0,0 +1,236 @@
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use log::{debug, info};
use opencv::{core, imgcodecs, prelude::*};
use std::path::Path;
use opencv::core::AlgorithmHint::ALGO_HINT_APPROX;
/// CAMS FTP format feature images
/// Used for meteor detection analysis
#[derive(Debug, Clone)]
pub struct FeatureImages {
/// Maximum pixel value at each position
pub maxpixel: core::Mat,
/// Average pixel value (excluding maximum)
pub avepixel: core::Mat,
/// Standard deviation of pixel values
pub stdpixel: core::Mat,
/// Index of the frame with maximum pixel value
pub maxframe: core::Mat,
/// Start time of the stack
pub start_time: DateTime<Utc>,
/// End time of the stack
pub end_time: DateTime<Utc>,
/// Feature set ID
pub id: String,
}
impl FeatureImages {
/// Create a new set of feature images
pub fn new(
maxpixel: core::Mat,
avepixel: core::Mat,
stdpixel: core::Mat,
maxframe: core::Mat,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
id: String,
) -> Self {
Self {
maxpixel,
avepixel,
stdpixel,
maxframe,
start_time,
end_time,
id,
}
}
/// Create feature images from frame stacker output
pub fn from_stacked_frames(stacked: &crate::detection::StackedFrames) -> Self {
Self {
maxpixel: stacked.maxpixel.clone(),
avepixel: stacked.avepixel.clone(),
stdpixel: stacked.stdpixel.clone(),
maxframe: stacked.maxframe.clone(),
start_time: stacked.start_time,
end_time: stacked.end_time,
id: stacked.stack_id.clone(),
}
}
/// Save feature images to files with the given base path
pub fn save_to_files(&self, base_path: &str) -> Result<()> {
// Create the parent directory if needed
if let Some(parent) = Path::new(base_path).parent() {
std::fs::create_dir_all(parent)
.context("Failed to create parent directory for feature images")?;
}
// Save each feature image
let maxpixel_path = format!("{}_maxpixel.png", base_path);
let avepixel_path = format!("{}_avepixel.png", base_path);
let stdpixel_path = format!("{}_stdpixel.png", base_path);
let maxframe_path = format!("{}_maxframe.png", base_path);
imgcodecs::imwrite(&maxpixel_path, &self.maxpixel, &core::Vector::new())
.context("Failed to save maxpixel image")?;
imgcodecs::imwrite(&avepixel_path, &self.avepixel, &core::Vector::new())
.context("Failed to save avepixel image")?;
imgcodecs::imwrite(&stdpixel_path, &self.stdpixel, &core::Vector::new())
.context("Failed to save stdpixel image")?;
imgcodecs::imwrite(&maxframe_path, &self.maxframe, &core::Vector::new())
.context("Failed to save maxframe image")?;
debug!("Saved feature images to {}", base_path);
Ok(())
}
/// Load feature images from files
pub fn load_from_files(base_path: &str, id: &str) -> Result<Self> {
let maxpixel_path = format!("{}_maxpixel.png", base_path);
let avepixel_path = format!("{}_avepixel.png", base_path);
let stdpixel_path = format!("{}_stdpixel.png", base_path);
let maxframe_path = format!("{}_maxframe.png", base_path);
let maxpixel = imgcodecs::imread(&maxpixel_path, imgcodecs::IMREAD_GRAYSCALE)
.context("Failed to load maxpixel image")?;
let avepixel = imgcodecs::imread(&avepixel_path, imgcodecs::IMREAD_GRAYSCALE)
.context("Failed to load avepixel image")?;
let stdpixel = imgcodecs::imread(&stdpixel_path, imgcodecs::IMREAD_GRAYSCALE)
.context("Failed to load stdpixel image")?;
let maxframe = imgcodecs::imread(&maxframe_path, imgcodecs::IMREAD_GRAYSCALE)
.context("Failed to load maxframe image")?;
// Parse the timestamp from the file name if possible
let now = Utc::now();
Ok(Self {
maxpixel,
avepixel,
stdpixel,
maxframe,
start_time: now, // Default to current time
end_time: now, // Default to current time
id: id.to_string(),
})
}
/// Get the dimensions of the feature images
pub fn dimensions(&self) -> (i32, i32) {
(self.maxpixel.cols(), self.maxpixel.rows())
}
/// Check if all features have the same dimensions
pub fn validate_dimensions(&self) -> bool {
let (width, height) = self.dimensions();
self.avepixel.cols() == width && self.avepixel.rows() == height &&
self.stdpixel.cols() == width && self.stdpixel.rows() == height &&
self.maxframe.cols() == width && self.maxframe.rows() == height
}
/// Create a debug visualization of the feature images
pub fn create_visualization(&self) -> Result<core::Mat> {
// Create a 2x2 grid with all feature images
let (width, height) = self.dimensions();
let grid_width = width * 2;
let grid_height = height * 2;
// Create a Mat from zeros and explicitly convert to Mat
let zeros_expr = core::Mat::zeros(grid_height, grid_width, core::CV_8UC3)?;
let mut visualization = zeros_expr.to_mat()?;
// Convert grayscale to color for display
let mut maxpixel_color = core::Mat::default();
let mut avepixel_color = core::Mat::default();
let mut stdpixel_color = core::Mat::default();
let mut maxframe_color = core::Mat::default();
opencv::imgproc::cvt_color(&self.maxpixel, &mut maxpixel_color, opencv::imgproc::COLOR_GRAY2BGR, 0,ALGO_HINT_APPROX)?;
opencv::imgproc::cvt_color(&self.avepixel, &mut avepixel_color, opencv::imgproc::COLOR_GRAY2BGR, 0,ALGO_HINT_APPROX)?;
opencv::imgproc::cvt_color(&self.stdpixel, &mut stdpixel_color, opencv::imgproc::COLOR_GRAY2BGR, 0,ALGO_HINT_APPROX)?;
opencv::imgproc::cvt_color(&self.maxframe, &mut maxframe_color, opencv::imgproc::COLOR_GRAY2BGR, 0,ALGO_HINT_APPROX)?;
// Create a region of interest for each quadrant
let roi_top_left = core::Rect::new(0, 0, width, height);
let roi_top_right = core::Rect::new(width, 0, width, height);
let roi_bottom_left = core::Rect::new(0, height, width, height);
let roi_bottom_right = core::Rect::new(width, height, width, height);
// Copy each feature image to its position in the grid
let roi_rect = roi_top_left.clone();
let mut roi = visualization.roi(roi_rect)?;
maxpixel_color.copy_to(&mut roi.clone_pointee())?;
let roi_rect = roi_top_right.clone();
let mut roi = visualization.roi(roi_rect)?;
avepixel_color.copy_to(&mut roi.clone_pointee())?;
let roi_rect = roi_bottom_left.clone();
let mut roi = visualization.roi(roi_rect)?;
stdpixel_color.copy_to(&mut roi.clone_pointee())?;
let roi_rect = roi_bottom_right.clone();
let mut roi = visualization.roi(roi_rect)?;
maxframe_color.copy_to(&mut roi.clone_pointee())?;
// Add labels
let font = opencv::imgproc::FONT_HERSHEY_SIMPLEX;
let font_scale = 0.8;
let color = core::Scalar::new(255.0, 255.0, 255.0, 0.0);
let thickness = 2;
opencv::imgproc::put_text(
&mut visualization,
"MaxPixel",
core::Point::new(10, 30),
font,
font_scale,
color,
thickness,
opencv::imgproc::LINE_AA,
false,
)?;
opencv::imgproc::put_text(
&mut visualization,
"AvePixel",
core::Point::new(width + 10, 30),
font,
font_scale,
color,
thickness,
opencv::imgproc::LINE_AA,
false,
)?;
opencv::imgproc::put_text(
&mut visualization,
"StdPixel",
core::Point::new(10, height + 30),
font,
font_scale,
color,
thickness,
opencv::imgproc::LINE_AA,
false,
)?;
opencv::imgproc::put_text(
&mut visualization,
"MaxFrame",
core::Point::new(width + 10, height + 30),
font,
font_scale,
color,
thickness,
opencv::imgproc::LINE_AA,
false,
)?;
Ok(visualization)
}
}

View File

@ -11,8 +11,25 @@ use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use opencv::core::AlgorithmHint::ALGO_HINT_APPROX;
use crate::camera::Frame;
use crate::detection::feature_images::FeatureImages;
/// Thread-safe shared frame stacker
pub type SharedFrameStacker = Arc<Mutex<FrameStacker>>;
/// Create a new shared frame stacker with default configuration
pub fn new_shared_stacker() -> SharedFrameStacker {
let stacker = FrameStacker::new(FrameStackerConfig::default())
.expect("Failed to create frame stacker");
Arc::new(Mutex::new(stacker))
}
/// Create a new shared frame stacker with the given configuration
pub fn new_shared_stacker_with_config(config: FrameStackerConfig) -> Result<SharedFrameStacker> {
let stacker = FrameStacker::new(config)?;
Ok(Arc::new(Mutex::new(stacker)))
}
/// Configuration for the frame stacker
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -124,6 +141,80 @@ impl FrameStacker {
self.state.lock().unwrap().clone()
}
/// Add a frame to the stacker
/// Returns true if the batch is complete
pub fn push_frame(&mut self, frame: Frame) -> bool {
// Add frame to buffer
self.frame_buffer.push_back(frame.clone());
// Update state
{
let mut state = self.state.lock().unwrap();
state.current_frame_count = self.frame_buffer.len();
}
// Set start time of stack if this is the first frame
if self.start_time.is_none() {
self.start_time = Some(frame.timestamp);
}
// Return true if the batch is complete
self.frame_buffer.len() >= self.config.frames_per_stack
}
/// Process the current batch of frames
/// Returns FeatureImages if processed, None if not enough frames
pub fn process_batch(&mut self) -> Result<Option<FeatureImages>> {
if self.frame_buffer.is_empty() {
return Ok(None);
}
if self.frame_buffer.len() < self.config.frames_per_stack {
debug!("Not enough frames to process batch: {} < {}",
self.frame_buffer.len(), self.config.frames_per_stack);
return Ok(None);
}
debug!("Processing batch of {} frames", self.frame_buffer.len());
// Process the frames
self.processing = true;
let stacked_result = self.stack_frames()?;
self.processing = false;
match stacked_result {
Some(stacked_frames) => {
// Convert StackedFrames to FeatureImages
let features = FeatureImages::new(
stacked_frames.maxpixel,
stacked_frames.avepixel,
stacked_frames.stdpixel,
stacked_frames.maxframe,
stacked_frames.start_time,
stacked_frames.end_time,
stacked_frames.stack_id,
);
// Reset for next batch
self.current_stack_id = format!("stack_{}", Utc::now().timestamp());
self.start_time = None;
self.frame_buffer.clear();
// Update state
{
let mut state = self.state.lock().unwrap();
state.stacks_processed += 1;
state.last_stack_time = Some(Utc::now());
state.last_stack_id = Some(self.current_stack_id.clone());
state.current_frame_count = 0;
}
Ok(Some(features))
},
None => Ok(None),
}
}
/// Process a new frame
pub fn process_frame(&mut self, frame: Frame) -> Result<Option<StackedFrames>> {
// Add frame to buffer
@ -197,7 +288,7 @@ impl FrameStacker {
// Convert to grayscale if needed
let gray_frame = if frame.mat.channels() != 1 {
let mut gray = core::Mat::default();
imgproc::cvt_color(&frame.mat, &mut gray, imgproc::COLOR_BGR2GRAY, 0)?;
imgproc::cvt_color(&frame.mat, &mut gray, imgproc::COLOR_BGR2GRAY, 0,ALGO_HINT_APPROX)?;
gray
} else {
frame.mat.clone()
@ -273,7 +364,7 @@ impl FrameStacker {
let mut stdpixel = core::Mat::zeros(height, width, core::CV_8UC1)?;
for y in 0..height {
for x in 0..width {
let maxpixel_mat = maxpixel.to_mat()?; // 在循环外一次性转换为 `Mat`
let maxpixel_mat = maxpixel.to_mat()?;
let max_val = f64::from(*maxpixel_mat.at_2d::<u8>(y, x)?);
let avgpixel_mat = avepixel.to_mat()?;
let avg = f64::from(*avgpixel_mat.at_2d::<u8>(y, x)?);
@ -376,6 +467,11 @@ impl FrameStacker {
self.frame_buffer.len()
}
/// Get the batch size
pub fn batch_size(&self) -> usize {
self.config.frames_per_stack
}
/// Get the stack progress (0.0 - 1.0)
pub fn progress(&self) -> f32 {
self.frame_buffer.len() as f32 / self.config.frames_per_stack as f32

View File

@ -2,11 +2,13 @@ mod pipeline;
mod frame_stacker;
mod cams_detector;
mod brightness_detector;
mod feature_images;
pub use pipeline::{DetectionPipeline, PipelineConfig, AggregationStrategy};
pub use frame_stacker::{FrameStacker, FrameStackerConfig, StackedFrames, FrameStackerState};
pub use frame_stacker::{FrameStacker, FrameStackerConfig, StackedFrames, FrameStackerState, SharedFrameStacker, new_shared_stacker, new_shared_stacker_with_config};
pub use cams_detector::{CamsDetector, CamsDetectorParams};
pub use brightness_detector::{BrightnessDetector, BrightnessDetectorParams};
pub use feature_images::FeatureImages;
use anyhow::Result;
use chrono::Utc;

View File

@ -68,8 +68,8 @@ pub struct DetectionPipeline {
camera_controller: Arc<Mutex<CameraController>>,
/// Pipeline configuration
config: PipelineConfig,
/// Tokio runtime for async tasks
runtime: Runtime,
// /// Tokio runtime for async tasks
// runtime: Runtime,
/// Channel for sending detected events
event_tx: mpsc::Sender<MeteorEvent>,
/// Channel for receiving events
@ -114,7 +114,7 @@ impl DetectionPipeline {
}
// Create tokio runtime
let runtime = Runtime::new().context("Failed to create Tokio runtime")?;
// let runtime = Runtime::new().context("Failed to create Tokio runtime")?;
// Create channel for event communication
let (event_tx, event_rx) = mpsc::channel(32);
@ -123,7 +123,7 @@ impl DetectionPipeline {
detectors,
camera_controller,
config: pipeline_config,
runtime,
// runtime,
event_tx,
event_rx,
is_running: Arc::new(StdMutex::new(false)),
@ -241,6 +241,7 @@ impl DetectionPipeline {
// Process task in a separate thread that doesn't require Send
std::thread::spawn(move || {
// Create a runtime for this dedicated thread
let rt = Runtime::new().expect("Failed to create tokio runtime for detection thread");

View File

@ -1,7 +1,6 @@
use anyhow::{anyhow, Context, Result};
use chrono::{DateTime, Duration, Utc};
use log::{debug, error, info, warn};
use rppal::gpio::{Gpio, InputPin, Trigger};
use serialport::{SerialPort};
use std::io::{BufRead, BufReader};
use std::path::Path;
@ -11,6 +10,14 @@ use std::time;
use serialport::DataBits::Eight;
use tokio::sync::broadcast;
// Import GPIO only on Linux
#[cfg(target_os = "linux")]
use rppal::gpio::{Gpio, InputPin, Trigger};
// Import rand for non-Linux platforms for simulating data
#[cfg(not(target_os = "linux"))]
use rand::Rng;
use crate::gps::nmea::{parse_nmea_sentence, NmeaPosition};
use crate::gps::{GeoPosition, GpsConfig, GpsStatus, SyncStatus, CameraOrientation};
@ -44,8 +51,12 @@ pub struct GpsController {
config: GpsConfig,
/// Serial port for GPS communication
port: Option<Box<dyn SerialPort>>,
/// GPIO pin for PPS signal
/// GPIO pin for PPS signal (Linux only)
#[cfg(target_os = "linux")]
pps_pin: Option<InputPin>,
/// Placeholder for non-Linux platforms
#[cfg(not(target_os = "linux"))]
pps_pin: Option<()>,
/// Last known position
position: Arc<Mutex<GeoPosition>>,
/// Current synchronization status
@ -97,6 +108,24 @@ impl GpsController {
/// Initialize the GPS module
pub async fn initialize(&mut self) -> Result<()> {
// Check if we're on a non-Linux platform
#[cfg(not(target_os = "linux"))]
{
info!("GPS module running on non-Linux platform. Using fallback position.");
self.gps_state.degraded = true;
// Set fallback position
let mut pos = self.position.lock().unwrap();
*pos = self.config.fallback_position;
// Update satellites with simulated value
let mut sats = self.satellites.lock().unwrap();
*sats = 8; // Simulate 8 satellites for non-Linux platforms
return Ok(());
}
// Continue with normal initialization for Linux
// Check if GPS is enabled in config
if !self.config.enable_gps {
info!("GPS module disabled in configuration. Using fallback position.");
@ -112,7 +141,6 @@ impl GpsController {
let mut init_failed = false;
// Open the serial port
let port = serialport::new(&self.config.port, self.config.baud_rate)
.data_bits(Eight)
.flow_control(serialport::FlowControl::None)
@ -143,6 +171,8 @@ impl GpsController {
}
// Initialize PPS pin if enabled and GPS initialized successfully
// This is only compiled on Linux
#[cfg(target_os = "linux")]
if self.config.use_pps && !self.gps_state.degraded {
match Gpio::new() {
Ok(gpio) => {
@ -195,6 +225,71 @@ impl GpsController {
/// Start GPS processing
pub async fn start(&self) -> Result<()> {
// Handle simulated mode for non-Linux platforms
#[cfg(not(target_os = "linux"))]
{
info!("Starting GPS in simulated mode on non-Linux platform");
// Set fallback position and status
{
let mut pos = self.position.lock().unwrap();
*pos = self.config.fallback_position;
let mut status = self.sync_status.lock().unwrap();
*status = SyncStatus::GpsOnly; // Simulate GPS sync
// Send an initial position update with fallback
let _ = self.position_tx.send(self.config.fallback_position);
}
{
let mut is_running = self.is_running.lock().unwrap();
*is_running = true;
}
// Start a simulation thread for periodic updates
let position_tx = self.position_tx.clone();
let position = self.position.clone();
let is_running = self.is_running.clone();
let fallback_position = self.config.fallback_position;
thread::spawn(move || {
info!("Starting GPS simulation thread");
while {
let is_running = is_running.lock().unwrap();
*is_running
} {
// Sleep for a bit to simulate update interval
thread::sleep(time::Duration::from_secs(1));
// Generate small random variations to the position
let mut rng = rand::thread_rng();
let lat_variation = (rng.gen::<f64>() - 0.5) * 0.0001; // ~10m variation
let lon_variation = (rng.gen::<f64>() - 0.5) * 0.0001;
let new_position = GeoPosition {
latitude: fallback_position.latitude + lat_variation,
longitude: fallback_position.longitude + lon_variation,
altitude: fallback_position.altitude + (rng.gen::<f64>() - 0.5) * 2.0,
};
// Update position
{
let mut pos = position.lock().unwrap();
*pos = new_position;
// Send position update
let _ = position_tx.send(new_position);
}
}
info!("GPS simulation thread stopped");
});
return Ok(());
}
// Handle degraded mode
if self.gps_state.degraded {
info!("Starting GPS in degraded mode (using fallback position)");
@ -245,13 +340,14 @@ impl GpsController {
// Clone fallback position in case we need it later
let fallback_position = self.config.fallback_position;
let allow_degraded = self.config.allow_degraded_mode;
let baud_rate = self.config.baud_rate;
// Create a separate thread for GPS processing (blocking I/O)
thread::spawn(move || {
info!("Starting GPS processing on port {}", port_name);
// Open the serial port
let port = serialport::new(&port_name, self.config.baud_rate)
let port = serialport::new(&port_name, baud_rate)
.data_bits(Eight)
.flow_control(serialport::FlowControl::None)
.parity(serialport::Parity::None)
@ -465,6 +561,66 @@ impl GpsController {
pub fn get_camera_orientation(&self) -> CameraOrientation {
self.config.camera_orientation
}
/// Get current position with complete status information
pub async fn get_current_position(&self) -> Result<serde_json::Value> {
// Get the basic GPS status
let status = self.get_status();
// Get additional system time information
let system_time = Utc::now();
let precise_time = self.get_precise_time();
let time_diff_ms = (precise_time - system_time).num_milliseconds().abs() as f64;
// Calculate additional derived values
let fix_age_seconds = if let Some(last_update) = *self.last_update.lock().unwrap() {
(Utc::now() - last_update).num_seconds()
} else {
-1 // No fix yet
};
// Determine if the position is from fallback
let is_fallback = self.gps_state.degraded ||
!self.has_fix() ||
fix_age_seconds > 30; // Stale fix
// Construct a detailed status object
let position_status = serde_json::json!({
// Basic position data
"position": {
"latitude": status.position.latitude,
"longitude": status.position.longitude,
"altitude": status.position.altitude,
"is_fallback": is_fallback,
"satellites": status.satellites,
},
// Timing information
"timing": {
"timestamp": precise_time.to_rfc3339(),
"sync_status": status.sync_status,
"time_accuracy_ms": status.time_accuracy_ms,
"system_time_offset_ms": time_diff_ms,
"fix_age_seconds": fix_age_seconds,
},
// Camera orientation
"orientation": {
"azimuth": status.camera_orientation.azimuth,
"elevation": status.camera_orientation.elevation,
},
// System status
"system": {
"hardware_initialized": self.gps_state.initialized,
"running": *self.is_running.lock().unwrap(),
"degraded_mode": self.gps_state.degraded,
"has_fix": self.has_fix(),
}
});
Ok(position_status)
}
}
impl Drop for GpsController {

View File

@ -1,3 +1,10 @@
// Platform detection
#[cfg(target_os = "linux")]
pub const PLATFORM_SUPPORTS_GPIO: bool = true;
#[cfg(not(target_os = "linux"))]
pub const PLATFORM_SUPPORTS_GPIO: bool = false;
mod camera;
mod config;
mod detection;
@ -11,32 +18,57 @@ mod communication;
mod monitoring;
use anyhow::{Context, Result};
use log::{info, error};
use log::{info, error, warn};
use std::path::PathBuf;
use std::sync::{Arc,Mutex as StdMutex};
use tokio::sync::Mutex;
use gstreamer_rtsp_server::prelude::SettingsExt;
use tokio::sync::{Mutex, MutexGuard};
pub use config::Config;
use crate::overlay::Watermark;
/// Main entry point for the meteor detection system
#[tokio::main]
async fn main() -> Result<()> {
// Initialize logger with default settings
env_logger::init_from_env(
// Create a logger builder with default settings
let mut builder = env_logger::Builder::from_env(
env_logger::Env::default().filter_or("RUST_LOG", "info")
);
info!("Meteor detection system starting up");
// Load configuration
let config = config::load_config()
.context("Failed to load configuration")?;
// Re-initialize logger with configured log level
// Update the logger with configured log level
std::env::set_var("RUST_LOG", &config.log_level);
env_logger::builder().init();
builder.parse_env(env_logger::Env::default());
builder.init();
info!("Loaded configuration with device ID: {}", config.device_id);
info!("Meteor detection system starting up");
// // Initialize logger with default settings
// env_logger::init_from_env(
// env_logger::Env::default().filter_or("RUST_LOG", "info")
// );
//
// info!("Meteor detection system starting up");
// Check if we're running on a platform that supports GPIO
if PLATFORM_SUPPORTS_GPIO {
info!("Running on a Linux platform with GPIO support");
} else {
info!("Running on a platform without GPIO support (macOS/Windows/etc). Using simulated sensors.");
}
// // Load configuration
// let config = config::load_config()
// .context("Failed to load configuration")?;
//
// // Re-initialize logger with configured log level
// std::env::set_var("RUST_LOG", &config.log_level);
// env_logger::builder().init();
//
info!("Loaded configuration with device ID: {}", config.device.id);
// Initialize camera subsystem
let camera_controller = camera::CameraController::new(&config)
@ -80,17 +112,17 @@ async fn main() -> Result<()> {
// Initialize detection pipeline
let detection_pipeline = detection::DetectionPipeline::new(
camera_controller.clone(),
&config,
config.detection.pipeline
&config.clone(),
config.detection.pipeline.clone()
).context("Failed to initialize detection pipeline")?;
// Initialize storage system
let storage_manager = storage::StorageManager::new(&config)
let storage_manager = storage::StorageManager::new(&(config.clone()))
.await
.context("Failed to initialize storage")?;
// Initialize communication module
let comms = communication::CommunicationManager::new(
let mut comms = communication::CommunicationManager::new(
&config,
camera_controller.clone(),
gps_controller.clone(),
@ -129,15 +161,37 @@ async fn main() -> Result<()> {
// Add watermark hook
{
let mut manager = hook_manager.lock().await;
let watermark_clone = watermark.clone();
manager.register_hook(Box::new(hooks::BasicFrameHook::new(
"watermark",
"Watermark Overlay",
"Adds timestamp, GPS, and sensor data overlay to frames",
config.watermark.enabled,
move |frame, timestamp| {
let mut watermark_instance = watermark.lock().unwrap();
watermark_instance.apply(frame, timestamp)?;
Ok(())
// Using try_lock to avoid the async issue in a sync context
tokio::task::block_in_place(|| {
let mut guard = futures::executor::block_on(watermark_clone.lock());
guard.apply(frame, timestamp)
})
// tokio::task::block_in_place(|| {
// match watermark_clone.try_lock() {
// Ok(mut guard) => guard.apply(frame, timestamp),
// Err(e) => Err(anyhow::anyhow!("Failed to lock watermark mutex: {}", e))
// }
// })
// tokio::task::block_in_place(|| {
// match futures::executor::block_on(watermark_clone.lock()) {
// Ok(mut guard) => guard.apply(frame, timestamp),
// Err(e) => Err(anyhow::anyhow!("Failed to lock watermark mutex: {}", e))
// }
// })
// tokio::task::block_in_place(|| {
// let mutex_guard = futures::executor::block_on(watermark_clone.lock());
// match mutex_guard {
// Ok(mut guard) => guard.apply(frame, timestamp),
// Err(e) => Err(anyhow::anyhow!("Failed to lock watermark mutex: {}", e))
// }
// })
},
)));
}

View File

@ -8,6 +8,7 @@ use tokio::sync::broadcast;
use tokio::time;
use crate::sensors::dht22::Dht22Sensor;
use crate::PLATFORM_SUPPORTS_GPIO;
use crate::sensors::{EnvironmentData, LightSensor, SensorConfig, TemperatureHumiditySensor};
/// A simple light sensor implementation that uses camera brightness as a proxy
@ -120,6 +121,8 @@ impl SensorController {
if self.config.use_dht22 {
info!("Initializing DHT22 temperature/humidity sensor");
#[cfg(feature = "gpio")]
{
match Dht22Sensor::new(self.config.dht22_pin) {
Ok(dht22) => {
self.temp_humidity_sensor = Some(Box::new(dht22));
@ -138,6 +141,30 @@ impl SensorController {
}
}
}
}
#[cfg(not(feature = "gpio"))]
{
// On platforms without GPIO, use simulated sensor
match Dht22Sensor::new(self.config.dht22_pin) {
Ok(dht22) => {
self.temp_humidity_sensor = Some(Box::new(dht22));
self.temp_sensor_state.initialized = true;
info!("DHT22 temperature/humidity sensor initialized in simulation mode");
},
Err(e) => {
self.temp_sensor_state.init_failures += 1;
self.temp_sensor_state.degraded = true;
if self.config.allow_degraded_mode {
warn!("Failed to initialize DHT22 sensor simulation: {}. Using fallback values.", e);
init_failed = true;
} else {
return Err(anyhow!("Failed to initialize temperature sensor simulation and degraded mode is not allowed: {}", e));
}
}
}
}
} else {
// Sensor is not enabled, mark as degraded and use fallback
info!("Temperature sensor disabled in config. Using fallback values.");
@ -344,7 +371,8 @@ impl SensorController {
camera_sensor.update_brightness(brightness);
// Update the current data
let mut data = self.current_data.lock()?;
match self.current_data.lock() {
Ok(mut data) => {
data.sky_brightness = brightness;
data.timestamp = Utc::now();
@ -352,6 +380,11 @@ impl SensorController {
let _ = self.data_tx.send(data.clone());
return Ok(());
},
Err(e) => {
return Err(anyhow!("Failed to lock current_data: {}", e));
}
}
}
}

View File

@ -1,8 +1,14 @@
use anyhow::{anyhow, Context, Result};
use log::{debug, error, warn};
use rppal::gpio::{Gpio, Level, Mode, OutputPin};
use std::thread;
use std::time::{Duration, Instant};
use std::sync::Mutex;
#[cfg(feature = "gpio")]
use rppal::gpio::{Gpio, Level, Mode, OutputPin};
#[cfg(not(feature = "gpio"))]
use rand::Rng;
use crate::sensors::TemperatureHumiditySensor;
@ -10,6 +16,12 @@ use crate::sensors::TemperatureHumiditySensor;
pub struct Dht22Sensor {
/// GPIO pin number (BCM)
pin: u8,
/// Mutable state protected by a mutex
state: Mutex<SensorState>,
}
/// Mutable state for the sensor
struct SensorState {
/// Last temperature reading
last_temperature: f32,
/// Last humidity reading
@ -23,16 +35,21 @@ impl Dht22Sensor {
pub fn new(pin: u8) -> Result<Self> {
Ok(Self {
pin,
state: Mutex::new(SensorState {
last_temperature: 0.0,
last_humidity: 0.0,
last_reading: Instant::now() - Duration::from_secs(10),
}),
})
}
/// Read raw data from DHT22 sensor
fn read_raw(&mut self) -> Result<(f32, f32)> {
#[cfg(feature = "gpio")]
fn read_raw(&self) -> Result<(f32, f32)> {
let mut state = self.state.lock().map_err(|_| anyhow!("Failed to lock sensor state"))?;
// Ensure we don't read too frequently (DHT22 needs 2+ seconds between readings)
let elapsed = self.last_reading.elapsed();
let elapsed = state.last_reading.elapsed();
if elapsed < Duration::from_secs(2) {
thread::sleep(Duration::from_secs(2) - elapsed);
}
@ -113,32 +130,67 @@ impl Dht22Sensor {
};
// Store readings
self.last_temperature = temperature;
self.last_humidity = humidity;
self.last_reading = Instant::now();
state.last_temperature = temperature;
state.last_humidity = humidity;
state.last_reading = Instant::now();
debug!("DHT22 read: temperature={:.1}°C, humidity={:.1}%", temperature, humidity);
Ok((temperature, humidity))
}
/// Simulated read for platforms without GPIO support
#[cfg(not(feature = "gpio"))]
fn read_raw(&self) -> Result<(f32, f32)> {
let mut state = self.state.lock().map_err(|_| anyhow!("Failed to lock sensor state"))?;
// Ensure we don't read too frequently (simulate real sensor behavior)
let elapsed = state.last_reading.elapsed();
if elapsed < Duration::from_secs(2) {
thread::sleep(Duration::from_secs(2) - elapsed);
}
// Use fixed values when GPIO is not available
let mut rng = rand::thread_rng();
let temperature = 25.0 + (rng.gen::<f32>() * 2.0 - 1.0); // Default 25°C ± 1°C
let humidity = 50.0 + (rng.gen::<f32>() * 5.0 - 2.5); // Default 50% ± 2.5%
// Store readings
state.last_temperature = temperature;
state.last_humidity = humidity;
state.last_reading = Instant::now();
debug!("DHT22 simulation: temperature={:.1}°C, humidity={:.1}%", temperature, humidity);
Ok((temperature, humidity))
}
}
impl TemperatureHumiditySensor for Dht22Sensor {
fn read_temperature(&self) -> Result<f32> {
// Lock state to check cached values
let state = self.state.lock().map_err(|_| anyhow!("Failed to lock sensor state"))?;
// If we've read recently, return cached value
if self.last_reading.elapsed() < Duration::from_secs(2) {
return Ok(self.last_temperature);
if state.last_reading.elapsed() < Duration::from_secs(2) {
return Ok(state.last_temperature);
}
// Release the lock before trying to read new values to avoid deadlock
drop(state);
// Try to read new values
match self.read_raw() {
Ok((temp, _)) => Ok(temp),
Err(e) => {
error!("Failed to read DHT22 temperature: {}", e);
if self.last_reading.elapsed() < Duration::from_secs(60) {
// Re-acquire the lock to check if we can use cached value
let state = self.state.lock().map_err(|_| anyhow!("Failed to lock sensor state"))?;
if state.last_reading.elapsed() < Duration::from_secs(60) {
// If last reading was recent, return cached value
warn!("Using cached temperature value");
Ok(self.last_temperature)
Ok(state.last_temperature)
} else {
Err(e)
}
@ -147,20 +199,29 @@ impl TemperatureHumiditySensor for Dht22Sensor {
}
fn read_humidity(&self) -> Result<f32> {
// Lock state to check cached values
let state = self.state.lock().map_err(|_| anyhow!("Failed to lock sensor state"))?;
// If we've read recently, return cached value
if self.last_reading.elapsed() < Duration::from_secs(2) {
return Ok(self.last_humidity);
if state.last_reading.elapsed() < Duration::from_secs(2) {
return Ok(state.last_humidity);
}
// Release the lock before trying to read new values to avoid deadlock
drop(state);
// Try to read new values
match self.read_raw() {
Ok((_, humidity)) => Ok(humidity),
Err(e) => {
error!("Failed to read DHT22 humidity: {}", e);
if self.last_reading.elapsed() < Duration::from_secs(60) {
// Re-acquire the lock to check if we can use cached value
let state = self.state.lock().map_err(|_| anyhow!("Failed to lock sensor state"))?;
if state.last_reading.elapsed() < Duration::from_secs(60) {
// If last reading was recent, return cached value
warn!("Using cached humidity value");
Ok(self.last_humidity)
Ok(state.last_humidity)
} else {
Err(e)
}

View File

@ -225,12 +225,16 @@ impl RtspServer {
Ok(())
},
Err(e) => {
if e.is_full() {
// Check which kind of error it is
match e {
mpsc::error::TrySendError::Full(_) => {
// Channel is full, drop frame
debug!("RTSP frame channel full, dropping frame");
Ok(())
} else {
Err(anyhow!("Failed to send frame to RTSP server: {}", e))
},
mpsc::error::TrySendError::Closed(_) => {
Err(anyhow!("Failed to send frame to RTSP server: channel closed"))
}
}
}
}