// meteor_detect/src/camera/controller.rs
//
// Note: this file contains non-ASCII (Unicode) characters.

use anyhow::{Context, Result};
use chrono::Utc;
use log::{debug, error, info, warn};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::time;
use crate::camera::frame_buffer::{Frame, FrameBuffer, SharedFrameBuffer};
use crate::camera::opencv::{OpenCVCamera, OpenCVCaptureStream};
use crate::camera::{CameraSettings, ExposureMode, MeteorEvent, Resolution};
/// Camera controller manages camera operations and frame capture.
///
/// Holds the camera settings, the (optional) OpenCV camera handle and capture
/// stream, the shared frame buffer, and the broadcast sender on which captured
/// frames are published.
pub struct CameraController {
/// Camera settings
settings: CameraSettings,
/// The OpenCV camera driver; `None` until the camera has been initialized
camera: Option<OpenCVCamera>,
/// The OpenCV capture stream.
/// NOTE(review): `start_capture_impl` moves the stream into the spawned
/// capture task, so this appears to be `None` while capture is running.
stream: Option<OpenCVCaptureStream>,
/// Circular buffer for storing recent frames
frame_buffer: SharedFrameBuffer,
/// Frame counter.
/// NOTE(review): within this file it is initialized to 0 and only read; the
/// capture task increments its own copy.
frame_count: u64,
/// Whether the camera is currently running
is_running: bool,
/// Channel for broadcasting new frames
frame_tx: broadcast::Sender<Frame>,
/// Path to save event videos
events_dir: PathBuf,
}
impl CameraController {
/// Create a new camera controller with the given configuration.
///
/// Copies the camera section out of `config`, allocates a frame buffer sized
/// for ten minutes of video at the configured frame rate, creates the frame
/// broadcast channel and makes sure the `events` directory exists.
///
/// The camera itself is not opened here; call `initialize` afterwards.
///
/// # Errors
/// Returns an error if the events directory cannot be created.
pub async fn new(config: &crate::config::Config) -> Result<Self> {
    // Clone only the camera section instead of deep-copying the whole config.
    let settings = config.camera.clone();

    // Capacity for 10 minutes of video at settings.fps.
    let buffer_capacity = (10 * 60 * settings.fps) as usize;
    let frame_buffer = Arc::new(FrameBuffer::new(buffer_capacity));

    // Broadcast channel for frames; the initial receiver is not needed because
    // consumers subscribe on demand.
    let (frame_tx, _) = broadcast::channel(30);

    // Create events directory if it doesn't exist.
    let events_dir = PathBuf::from("events");
    std::fs::create_dir_all(&events_dir).context("Failed to create events directory")?;

    Ok(Self {
        settings,
        camera: None,
        stream: None,
        frame_buffer,
        frame_count: 0,
        is_running: false,
        frame_tx,
        events_dir,
    })
}
/// Initialize the camera with current settings.
///
/// Thin public entry point: all work happens in `initialize_impl`.
pub async fn initialize(&mut self) -> Result<()> {
// Delegate to the private implementation, which is written so that it can
// safely re-enter itself (async recursion) during initialization.
self.initialize_impl().await
}
/// Internal implementation of camera initialization.
///
/// Opens the configured device, applies format, FPS, exposure, gain and
/// (optionally) the focus lock, then starts a stream and captures one test
/// frame to verify the camera works. On success the camera handle is stored
/// in `self.camera`.
///
/// If the test stream cannot be started on a Raspberry Pi and the configured
/// device is not already a GStreamer pipeline (no `!` in it), the device is
/// replaced by a `v4l2src` pipeline and initialization is retried once. The
/// retry re-enters this function through `Box::pin`, which keeps the
/// recursive future at a fixed size. If the retry fails, the original device
/// setting is restored.
///
/// # Errors
/// Returns an error if the camera cannot be opened or configured, if the
/// test stream cannot be started, or if the test frame cannot be captured.
async fn initialize_impl(&mut self) -> Result<()> {
    // Open the camera
    let mut camera =
        OpenCVCamera::open(&self.settings.device).context("Failed to open camera")?;

    // Configure camera parameters
    camera
        .set_format(self.settings.resolution)
        .context("Failed to set camera format")?;
    camera
        .set_fps(self.settings.fps)
        .context("Failed to set camera FPS")?;
    camera
        .set_exposure(self.settings.exposure)
        .context("Failed to set camera exposure")?;
    camera
        .set_gain(self.settings.gain)
        .context("Failed to set camera gain")?;
    if self.settings.focus_locked {
        camera
            .lock_focus_at_infinity()
            .context("Failed to lock focus at infinity")?;
    }

    // Try to capture a test frame to make sure the camera is set up correctly.
    info!("Testing camera by capturing a test frame...");
    let mut test_stream = match camera.start_streaming() {
        Ok(stream) => stream,
        Err(e) => {
            error!("Failed to start test stream: {}", e);

            // Check whether we are on a Raspberry Pi and not yet using GStreamer.
            let is_raspberry_pi = std::fs::read_to_string("/proc/cpuinfo")
                .map(|content| content.contains("Raspberry Pi") || content.contains("BCM"))
                .unwrap_or(false);

            if is_raspberry_pi && !self.settings.device.contains('!') {
                warn!("Running on Raspberry Pi without GStreamer pipeline. Attempting to reinitialize with GStreamer...");

                // Resolve the V4L2 device node for the pipeline: keep explicit
                // /dev paths, map a bare index ("1") to /dev/video1, and only
                // fall back to /dev/video0 for anything else.
                let configured = self.settings.device.as_str();
                let source = if configured.starts_with("/dev/") {
                    configured.to_string()
                } else if !configured.is_empty()
                    && configured.bytes().all(|b| b.is_ascii_digit())
                {
                    format!("/dev/video{}", configured)
                } else {
                    String::from("/dev/video0")
                };

                let dims = self.settings.resolution.dimensions();
                let pipeline = format!(
                    "v4l2src device={} ! video/x-raw,width={},height={},framerate={}/1 ! videoconvert ! appsink",
                    source, dims.0, dims.1, self.settings.fps
                );
                info!("Trying GStreamer pipeline: {}", pipeline);

                let previous_device = std::mem::replace(&mut self.settings.device, pipeline);

                // Release the failed handle before reopening the same device.
                drop(camera);

                // Box::pin gives the recursive async call a fixed-size future.
                let retry = Box::pin(self.initialize_impl()).await;
                if retry.is_err() {
                    // Do not leave the settings pointing at a pipeline that failed.
                    self.settings.device = previous_device;
                }
                return retry;
            }

            return Err(anyhow::anyhow!("Camera initialization failed: {}", e));
        }
    };

    // Capture the test frame.
    if let Err(e) = test_stream.capture_frame() {
        error!("Failed to capture test frame: {}", e);
        return Err(anyhow::anyhow!(
            "Camera initialization failed: Unable to capture test frame: {}",
            e
        ));
    }
    info!("Successfully captured test frame - camera is working correctly");

    self.camera = Some(camera);
    info!("Camera initialized successfully");
    Ok(())
}
/// Start camera capture in a background task.
///
/// Thin public entry point: all work happens in `start_capture_impl`.
pub async fn start_capture(&mut self) -> Result<()> {
// Delegate to the private implementation, which is written so that it can
// safely re-enter itself (async recursion).
self.start_capture_impl().await
}
/// 内部实现,用于避免异步递归导致的无限大小 future 问题
async fn start_capture_impl(&mut self) -> Result<()> {
if self.is_running {
warn!("Camera capture is already running");
return Ok(());
}
let camera = self
.camera
.as_mut()
.ok_or_else(|| anyhow::anyhow!("Camera not initialized"))?;
// Start the camera streaming
info!("Starting camera streaming with device: {}", self.settings.device);
let stream_result = camera.start_streaming();
let stream = match stream_result {
Ok(stream) => stream,
Err(e) => {
error!("Failed to start camera streaming: {}", e);
// 检测是否在树莓派上
let is_raspberry_pi = std::fs::read_to_string("/proc/cpuinfo")
.map(|content| content.contains("Raspberry Pi") || content.contains("BCM"))
.unwrap_or(false);
if is_raspberry_pi && !self.settings.device.contains("!") {
// 在树莓派上但没有使用GStreamer pipeline尝试使用pipeline重新初始化
warn!("Running on Raspberry Pi without GStreamer pipeline. Attempting to reinitialize camera with GStreamer...");
// 关闭当前相机
self.camera = None;
// 创建GStreamer pipeline
let device_num = self.settings.device.strip_prefix("/dev/video")
.unwrap_or("0");
let pipeline = format!(
"v4l2src device=/dev/video{} ! video/x-raw,width={},height={},framerate={}/1 ! videoconvert ! appsink",
device_num, self.settings.resolution.dimensions().0,
self.settings.resolution.dimensions().1, self.settings.fps
);
info!("Trying GStreamer pipeline: {}", pipeline);
self.settings.device = pipeline;
// 重新初始化相机
match OpenCVCamera::open(&self.settings.device) {
Ok(mut camera) => {
// 基本配置
let _ = camera.set_format(self.settings.resolution);
let _ = camera.set_fps(self.settings.fps);
self.camera = Some(camera);
// 使用 Box::pin 来解决递归 async 调用问题
return Box::pin(self.start_capture_impl()).await;
},
Err(e) => {
error!("Failed to reinitialize camera with GStreamer: {}", e);
return Err(anyhow::anyhow!("Camera streaming failed on Raspberry Pi: {}", e));
}
}
}
return Err(anyhow::anyhow!("Failed to start camera streaming: {}", e));
}
};
self.stream = Some(stream);
self.is_running = true;
// 尝试捕获一个测试帧,确保流工作正常
if let Some(mut stream_test) = self.stream.as_mut() {
match stream_test.capture_frame() {
Ok(_) => info!("Successfully captured test frame - camera stream is working correctly"),
Err(e) => {
error!("Test frame capture failed: {}", e);
self.is_running = false;
self.stream = None;
return Err(anyhow::anyhow!("Camera stream test failed: {}", e));
}
}
}
// Clone necessary values for the capture task
let frame_buffer = self.frame_buffer.clone();
let frame_tx = self.frame_tx.clone();
let fps = self.settings.fps;
let device = self.settings.device.clone(); // 复制设备路径用于恢复
let mut stream = self
.stream
.take()
.expect("Stream just initialized but is None");
let mut frame_count = self.frame_count;
// Start capture task
tokio::spawn(async move {
let frame_interval = Duration::from_secs_f64(1.0 / fps as f64);
let mut interval = time::interval(frame_interval);
// Error tracking for recovery
let mut consecutive_errors = 0;
const ERROR_THRESHOLD: u32 = 10; // After 10 errors, try to reinitialize
info!("Starting camera capture at {} fps", fps);
loop {
interval.tick().await;
match stream.capture_frame() {
Ok(mat) => {
// Reset error counter on success
consecutive_errors = 0;
// Create a new frame with timestamp
let frame = Frame::new(mat, Utc::now(), frame_count);
frame_count += 1;
// Add to frame buffer
if let Err(e) = frame_buffer.push(frame.clone()) {
error!("Failed to add frame to buffer: {}", e);
}
// Broadcast frame to listeners
let _ = frame_tx.send(frame);
}
Err(e) => {
error!("Failed to capture frame: {}", e);
consecutive_errors += 1;
if consecutive_errors >= ERROR_THRESHOLD {
error!("Too many consecutive errors ({}), attempting to reinitialize camera stream", consecutive_errors);
// 检测是否在树莓派上
let is_raspberry_pi = std::fs::read_to_string("/proc/cpuinfo")
.map(|content| content.contains("Raspberry Pi") || content.contains("BCM"))
.unwrap_or(false);
// 判断是否需要尝试GStreamer
let use_gstreamer = is_raspberry_pi && !device.contains("!");
if use_gstreamer {
// 创建GStreamer pipeline
let device_num = device.strip_prefix("/dev/video").unwrap_or("0");
let pipeline = format!(
"v4l2src device=/dev/video{} ! video/x-raw,width={},height={},framerate={}/1 ! videoconvert ! appsink",
device_num, 1280, 720, fps
);
error!("Raspberry Pi detected - trying GStreamer pipeline: {}", pipeline);
// 尝试使用GStreamer打开摄像头
match OpenCVCamera::open(&pipeline) {
Ok(mut camera) => {
// 配置最低限度设置
let _ = camera.set_format(Resolution::HD720p);
let _ = camera.set_fps(fps);
// 尝试重新启动流
match camera.start_streaming() {
Ok(new_stream) => {
info!("Successfully reinitialized camera stream with GStreamer");
stream = new_stream;
consecutive_errors = 0;
},
Err(e) => error!("Failed to restart camera streaming with GStreamer: {}", e)
}
},
Err(e) => error!("Failed to reopen camera with GStreamer: {}", e)
}
} else {
// 使用原始设备路径重新创建流
match OpenCVCamera::open(&device) {
Ok(mut camera) => {
// 配置最低限度设置
let _ = camera.set_format(Resolution::HD720p);
let _ = camera.set_fps(fps);
// 尝试重新启动流
match camera.start_streaming() {
Ok(new_stream) => {
info!("Successfully reinitialized camera stream");
stream = new_stream;
consecutive_errors = 0;
},
Err(e) => error!("Failed to restart camera streaming: {}", e)
}
},
Err(e) => error!("Failed to reopen camera: {}", e)
}
}
}
// Small delay to avoid tight error loop
time::sleep(Duration::from_millis(100)).await;
}
}
}
});
info!("Camera capture started");
Ok(())
}
/// Stop camera capture.
///
/// Logs a warning and does nothing when capture is not running. Otherwise
/// clears the stored stream, asks the camera (if any) to stop streaming and
/// clears the running flag.
///
/// NOTE(review): `start_capture_impl` moves the stream into its spawned task
/// and keeps no handle to that task, so this method does not appear to stop
/// the capture loop itself - confirm.
///
/// # Errors
/// Propagates a failure from the camera's `stop_streaming`; in that case the
/// running flag is left set.
pub async fn stop_capture(&mut self) -> Result<()> {
    if self.is_running {
        // The stream will be stopped when dropped
        self.stream = None;

        if let Some(camera) = self.camera.as_mut() {
            camera.stop_streaming()?;
        }

        self.is_running = false;
        info!("Camera capture stopped");
    } else {
        warn!("Camera capture is not running");
    }
    Ok(())
}
/// Get a subscriber to receive new frames.
///
/// Returns a new receiver on this controller's frame broadcast channel.
pub fn subscribe_to_frames(&self) -> broadcast::Receiver<Frame> {
self.frame_tx.subscribe()
}
/// Get a clone of the frame buffer
pub fn get_frame_buffer(&self) -> SharedFrameBuffer {
self.frame_buffer.clone()
}
/// Check if the camera is currently running.
///
/// Returns the controller's `is_running` flag.
pub fn is_running(&self) -> bool {
self.is_running
}
/// Update camera settings.
///
/// Stops capture if it is running, stores `new_settings`, applies them to the
/// camera and restarts capture if it was running before.
///
/// If the device changed, or no camera is open yet, the camera is opened and
/// configured from scratch through `initialize_impl`. Otherwise the open
/// camera is reconfigured in place, so a failing setter no longer drops the
/// camera handle.
///
/// # Errors
/// Returns an error if stopping capture, configuring or initializing the
/// camera, or restarting capture fails. The new settings are already stored
/// when such an error is returned.
pub async fn update_settings(&mut self, new_settings: CameraSettings) -> Result<()> {
    // If camera is running, we need to stop it first
    let was_running = self.is_running();
    if was_running {
        self.stop_capture().await?;
    }

    // Update settings
    let device_changed = self.settings.device != new_settings.device;
    self.settings = new_settings;

    // An open handle is bound to the previous device; release it so the new
    // device is actually opened below.
    if device_changed {
        self.camera = None;
    }

    if self.camera.is_none() {
        self.initialize_impl().await?;
    } else if let Some(camera) = self.camera.as_mut() {
        // Reconfigure the existing camera in place.
        camera.set_format(self.settings.resolution)?;
        camera.set_fps(self.settings.fps)?;
        camera.set_exposure(self.settings.exposure)?;
        camera.set_gain(self.settings.gain)?;
        if self.settings.focus_locked {
            camera.lock_focus_at_infinity()?;
        }
    }

    // Restart if it was running
    if was_running {
        self.start_capture_impl().await?;
    }

    info!("Camera settings updated");
    Ok(())
}
/// Save a meteor event with video.
///
/// Extracts the frames around `timestamp` from the frame buffer, writes them
/// as numbered JPEG files into a new directory named after a fresh UUID
/// under the events directory, and then runs FFmpeg to encode them into
/// `event.mp4`.
///
/// An FFmpeg failure is only logged: the event is still returned, with
/// `video_path` set to the intended output location.
///
/// # Errors
/// Returns an error if no frames are found for the event, if the event
/// directory cannot be created, or if saving a frame fails.
pub async fn save_meteor_event(
    &self,
    timestamp: chrono::DateTime<Utc>,
    confidence: f32,
    bounding_box: (u32, u32, u32, u32),
    seconds_before: i64,
    seconds_after: i64,
) -> Result<MeteorEvent> {
    use tokio::process::Command;

    // Pull the relevant frames out of the buffer.
    let frames = self
        .frame_buffer
        .extract_event_frames(timestamp, seconds_before, seconds_after);
    if frames.is_empty() {
        return Err(anyhow::anyhow!("No frames found for event"));
    }

    // Each event gets its own directory, named after a fresh UUID.
    let event_id = uuid::Uuid::new_v4();
    let event_dir = self.events_dir.join(event_id.to_string());
    std::fs::create_dir_all(&event_dir).context("Failed to create event directory")?;

    // Write the frames as frame_0000.jpg, frame_0001.jpg, ...
    for (index, frame) in frames.iter().enumerate() {
        let target = event_dir.join(format!("frame_{:04}.jpg", index));
        frame.save_to_file(&target)?;
    }

    // Assemble the FFmpeg invocation.
    let video_path = event_dir.join("event.mp4").to_string_lossy().to_string();
    let framerate = self.settings.fps.to_string();
    let input_pattern = event_dir
        .join("frame_%04d.jpg")
        .to_string_lossy()
        .into_owned();
    let ffmpeg_args = [
        "-y", // Overwrite output file if it exists
        "-framerate",
        framerate.as_str(),
        "-i",
        input_pattern.as_str(),
        "-c:v",
        "libx264",
        "-preset",
        "medium",
        "-crf",
        "23",
        video_path.as_str(),
    ];

    info!("Converting frames to video using FFmpeg");
    match Command::new("ffmpeg").args(&ffmpeg_args).output().await {
        Ok(output) if output.status.success() => {
            info!("Successfully created video at: {}", video_path);
        }
        Ok(output) => {
            warn!("FFmpeg warning: {}", String::from_utf8_lossy(&output.stderr));
        }
        Err(e) => {
            warn!("Failed to execute FFmpeg: {}. Video will not be created, only individual frames saved.", e);
        }
    }

    info!("Saved meteor event: {}", event_id);
    Ok(MeteorEvent {
        id: event_id,
        timestamp,
        confidence,
        bounding_box,
        video_path,
    })
}
/// Check the health of the camera system.
///
/// Returns `Ok(true)` when capture is not running, or when the frame at
/// buffer index 0 is at most 5 seconds old. Returns `Ok(false)` (after
/// logging a warning) when capture is running but the buffer is empty or
/// that frame is older than 5 seconds.
pub async fn check_health(&self) -> Result<bool> {
    // A camera that is not running is considered healthy.
    if !self.is_running {
        return Ok(true);
    }

    match self.frame_buffer.get(0) {
        // Running but nothing buffered: something is wrong.
        None => {
            warn!("Camera health check: Camera is running but no frames in buffer");
            Ok(false)
        }
        Some(frame) => {
            let stale_secs = Utc::now()
                .signed_duration_since(frame.timestamp)
                .num_seconds();
            if stale_secs > 5 {
                // No new frames in the last 5 seconds: camera may be stuck.
                warn!("Camera health check: No new frames received in {} seconds", stale_secs);
                Ok(false)
            } else {
                Ok(true)
            }
        }
    }
}
/// Get the current status of the camera as a JSON value.
///
/// The result contains the running flag, the controller's frame count, the
/// main settings, a more detailed `camera_info` object, frame buffer
/// statistics, the frame at buffer index 0 (or `null` when the buffer is
/// empty) and the events directory. The status is also logged at debug level.
pub async fn get_status(&self) -> Result<serde_json::Value> {
    let settings = &self.settings;

    // Frame buffer statistics
    let length = self.frame_buffer.len();
    let capacity = self.frame_buffer.capacity();
    let utilization_percent = if capacity > 0 {
        (length as f64 / capacity as f64) * 100.0
    } else {
        0.0
    };
    let frame_buffer_stats = serde_json::json!({
        "length": length,
        "capacity": capacity,
        "utilization_percent": utilization_percent
    });

    // Most recent frame (index 0), if any
    let recent_frame = match self.frame_buffer.get(0) {
        Some(frame) => serde_json::json!({
            "timestamp": frame.timestamp.to_rfc3339(),
            "frame_number": frame.index
        }),
        None => serde_json::Value::Null,
    };

    let camera_info = serde_json::json!({
        "device": settings.device,
        "resolution": format!("{:?}", settings.resolution),
        "fps": settings.fps,
        "exposure_mode": format!("{:?}", settings.exposure),
        "gain": settings.gain,
        "focus_locked": settings.focus_locked
    });

    let status = serde_json::json!({
        "running": self.is_running,
        "frame_count": self.frame_count,
        "settings": {
            "device": settings.device,
            "resolution": format!("{:?}", settings.resolution),
            "fps": settings.fps
        },
        "camera_info": camera_info,
        "frame_buffer": frame_buffer_stats,
        "recent_frame": recent_frame,
        "events_dir": self.events_dir.to_string_lossy()
    });

    debug!("Camera status: {}", status);
    Ok(status)
}
}
/// Stops capture (if running) and releases the camera and stream when the
/// controller is dropped.
impl Drop for CameraController {
    fn drop(&mut self) {
        info!("Cleaning up camera controller resources");

        // Stop streaming if running
        if self.is_running {
            // `drop` cannot be async. `stop_capture` has no await points, so
            // driving its future with `block_on` completes immediately. This
            // avoids `tokio::task::block_in_place`, which panics on a
            // current-thread runtime - and a panic inside `drop` can abort.
            if let Err(e) = futures::executor::block_on(self.stop_capture()) {
                error!("Error stopping camera during cleanup: {}", e);
            }
        }

        // Release camera resources
        self.camera = None;
        self.stream = None;
        info!("Camera controller resources released");
    }
}
// 为CameraController实现Clone trait
impl Clone for CameraController {
fn clone(&self) -> Self {
// 创建新的广播通道
let (frame_tx, _) = tokio::sync::broadcast::channel(30);
Self {
settings: self.settings.clone(),
camera: None, // 不克隆相机实例,因为它包含底层资源
stream: None, // 不克隆流,因为它也包含底层资源
frame_buffer: self.frame_buffer.clone(),
frame_count: self.frame_count,
is_running: self.is_running,
frame_tx, // 使用新创建的广播通道
events_dir: self.events_dir.clone(),
}
}
}