use anyhow::{Context, Result}; use chrono::Utc; use log::{debug, error, info, warn}; use std::path::PathBuf; use std::sync::{Arc, Mutex}; use std::time::Duration; use tokio::sync::broadcast; use tokio::time; use crate::camera::frame_buffer::{Frame, FrameBuffer, SharedFrameBuffer}; use crate::camera::opencv::{OpenCVCamera, OpenCVCaptureStream}; use crate::camera::{CameraSettings, ExposureMode, MeteorEvent, Resolution}; /// Camera controller manages camera operations and frame capture pub struct CameraController { /// Camera settings settings: CameraSettings, /// The OpenCV camera driver camera: Option, /// The OpenCV capture stream stream: Option, /// Circular buffer for storing recent frames frame_buffer: SharedFrameBuffer, /// Frame counter frame_count: u64, /// Whether the camera is currently running is_running: bool, /// Channel for broadcasting new frames frame_tx: broadcast::Sender, /// Path to save event videos events_dir: PathBuf, } impl CameraController { /// Create a new camera controller with the given configuration pub async fn new(config: &crate::config::Config) -> Result { // Extract camera settings from config (placeholder for now) let settings = config.clone().camera; // Create frame buffer with capacity for 10 minutes of video at settings.fps let buffer_capacity = (10 * 60 * settings.fps) as usize; let frame_buffer = Arc::new(FrameBuffer::new(buffer_capacity)); // Create broadcast channel for frames let (frame_tx, _) = broadcast::channel(30); // Create events directory if it doesn't exist let events_dir = PathBuf::from("events"); std::fs::create_dir_all(&events_dir).context("Failed to create events directory")?; Ok(Self { settings, camera: None, stream: None, frame_buffer, frame_count: 0, is_running: false, frame_tx, events_dir, }) } /// Initialize the camera with current settings pub async fn initialize(&mut self) -> Result<()> { // 使用私有方法实现递归安全的异步初始化 self.initialize_impl().await } /// 内部实现,用于避免异步递归导致的无限大小 future 问题 async fn initialize_impl(&mut self) -> Result<()> { // Open the camera let mut camera = OpenCVCamera::open(&self.settings.device).context("Failed to open camera")?; // Configure camera parameters camera .set_format(self.settings.resolution) .context("Failed to set camera format")?; camera .set_fps(self.settings.fps) .context("Failed to set camera FPS")?; camera .set_exposure(self.settings.exposure) .context("Failed to set camera exposure")?; camera .set_gain(self.settings.gain) .context("Failed to set camera gain")?; if self.settings.focus_locked { camera .lock_focus_at_infinity() .context("Failed to lock focus at infinity")?; } // 给相机一点时间初始化 info!("Waiting for camera to stabilize..."); tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; // 尝试拍摄测试帧以确保相机设置正确 info!("Testing camera by capturing a test frame..."); // 直接使用已打开的主相机实例捕获测试帧,避免同时打开两个相机实例 let test_frame_result = camera.capture_test_frame(); // 处理测试帧结果 match test_frame_result { Ok(_) => { info!("Successfully captured test frame - camera is working correctly"); }, Err(e) => { error!("Failed to capture test frame: {}", e); // 检查是否在树莓派上,以及是否已经使用GStreamer let is_raspberry_pi = std::fs::read_to_string("/proc/cpuinfo") .map(|content| content.contains("Raspberry Pi") || content.contains("BCM")) .unwrap_or(false); if is_raspberry_pi && !self.settings.device.contains("!") { // 在树莓派上但没有使用GStreamer pipeline,尝试使用pipeline重新初始化 warn!("Running on Raspberry Pi without GStreamer pipeline. Attempting to reinitialize with GStreamer..."); // 创建更简单的GStreamer pipeline let pipeline = "v4l2src ! videoconvert ! appsink".to_string(); info!("Trying simple GStreamer pipeline: {}", pipeline); self.settings.device = pipeline; // 使用 Box::pin 来解决递归 async 调用问题 // 这样创建一个固定大小的 future return Box::pin(self.initialize_impl()).await; } return Err(anyhow::anyhow!("Camera initialization failed: {}", e)); } } // 给设备一点时间完全释放资源 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; self.camera = Some(camera); info!("Camera initialized successfully"); Ok(()) } /// Start camera capture in a background task pub async fn start_capture(&mut self) -> Result<()> { // 使用私有方法实现递归安全的异步函数 self.start_capture_impl().await } /// 内部实现,用于避免异步递归导致的无限大小 future 问题 async fn start_capture_impl(&mut self) -> Result<()> { if self.is_running { warn!("Camera capture is already running"); return Ok(()); } let camera = self .camera .as_mut() .ok_or_else(|| anyhow::anyhow!("Camera not initialized"))?; // 使用已有的相机实例,避免资源冲突 let camera_ref = self .camera .as_mut() .ok_or_else(|| anyhow::anyhow!("Camera not initialized"))?; // 确保相机没有在流式传输 if camera_ref.is_streaming() { camera_ref.stop_streaming()?; // 等待资源释放 tokio::time::sleep(tokio::time::Duration::from_millis(300)).await; } // 直接使用现有实例启动流 info!("Starting camera streaming with device: {}", self.settings.device); let stream_result = camera_ref.start_streaming(); let mut stream = match stream_result { Ok(stream) => stream, Err(e) => { error!("Failed to start camera streaming: {}", e); // 检测是否在树莓派上 let is_raspberry_pi = std::fs::read_to_string("/proc/cpuinfo") .map(|content| content.contains("Raspberry Pi") || content.contains("BCM")) .unwrap_or(false); if is_raspberry_pi && !self.settings.device.contains("!") { // 在树莓派上但没有使用GStreamer pipeline,尝试使用pipeline重新初始化 warn!("Running on Raspberry Pi without GStreamer pipeline. Attempting to reinitialize camera with GStreamer..."); // 创建更简单的GStreamer pipeline let pipeline = format!("v4l2src device={} ! videoconvert ! appsink", extract_device_path(&self.settings.device).unwrap_or("/dev/video0".to_string())); info!("Trying GStreamer pipeline: {}", pipeline); self.settings.device = pipeline; // 使用 Box::pin 来解决递归 async 调用问题 return Box::pin(self.start_capture_impl()).await; } return Err(anyhow::anyhow!("Failed to start camera streaming: {}", e)); } }; // 保存新的相机实例 self.camera = Some(new_camera); // 先保存一个测试帧副本 let test_frame = match stream.capture_frame() { Ok(frame) => { info!("Successfully captured test frame from stream - camera stream is working correctly"); Some(frame) }, Err(e) => { error!("Test frame capture failed: {}", e); return Err(anyhow::anyhow!("Camera stream test failed: {}", e)); } }; // 现在可以安全地移动stream的所有权 self.stream = Some(stream); self.is_running = true; // 如果有测试帧,则处理它(如果需要) if test_frame.is_some() { debug!("Test frame was successfully captured"); } // Clone necessary values for the capture task let frame_buffer = self.frame_buffer.clone(); let frame_tx = self.frame_tx.clone(); let fps = self.settings.fps; let device = self.settings.device.clone(); // 复制设备路径用于恢复 let mut stream = self .stream .take() .expect("Stream just initialized but is None"); let mut frame_count = self.frame_count; // Start capture task tokio::spawn(async move { let frame_interval = Duration::from_secs_f64(1.0 / fps as f64); let mut interval = time::interval(frame_interval); // Error tracking for recovery let mut consecutive_errors = 0; const ERROR_THRESHOLD: u32 = 10; // After 10 errors, try to reinitialize info!("Starting camera capture at {} fps", fps); loop { interval.tick().await; match stream.capture_frame() { Ok(mat) => { // Reset error counter on success consecutive_errors = 0; // Create a new frame with timestamp let frame = Frame::new(mat, Utc::now(), frame_count); frame_count += 1; // Add to frame buffer if let Err(e) = frame_buffer.push(frame.clone()) { error!("Failed to add frame to buffer: {}", e); } // Broadcast frame to listeners let _ = frame_tx.send(frame); } Err(e) => { error!("Failed to capture frame: {}", e); consecutive_errors += 1; if consecutive_errors >= ERROR_THRESHOLD { error!("Too many consecutive errors ({}), attempting to reinitialize camera stream", consecutive_errors); // 检测是否在树莓派上 let is_raspberry_pi = std::fs::read_to_string("/proc/cpuinfo") .map(|content| content.contains("Raspberry Pi") || content.contains("BCM")) .unwrap_or(false); // 判断是否需要尝试GStreamer let use_gstreamer = is_raspberry_pi && !device.contains("!"); if use_gstreamer { // 创建更简单的GStreamer pipeline let pipeline = "v4l2src ! videoconvert ! appsink".to_string(); error!("Raspberry Pi detected - trying simple GStreamer pipeline: {}", pipeline); // 尝试使用GStreamer打开摄像头 match OpenCVCamera::open(&pipeline) { Ok(mut camera) => { // 配置最低限度设置 let _ = camera.set_format(Resolution::HD720p); let _ = camera.set_fps(fps); // 尝试重新启动流 match camera.start_streaming() { Ok(new_stream) => { info!("Successfully reinitialized camera stream with GStreamer"); stream = new_stream; consecutive_errors = 0; }, Err(e) => error!("Failed to restart camera streaming with GStreamer: {}", e) } }, Err(e) => error!("Failed to reopen camera with GStreamer: {}", e) } } else { // 使用原始设备路径重新创建流 match OpenCVCamera::open(&device) { Ok(mut camera) => { // 配置最低限度设置 let _ = camera.set_format(Resolution::HD720p); let _ = camera.set_fps(fps); // 尝试重新启动流 match camera.start_streaming() { Ok(new_stream) => { info!("Successfully reinitialized camera stream"); stream = new_stream; consecutive_errors = 0; }, Err(e) => error!("Failed to restart camera streaming: {}", e) } }, Err(e) => error!("Failed to reopen camera: {}", e) } } } // Small delay to avoid tight error loop time::sleep(Duration::from_millis(100)).await; } } } }); info!("Camera capture started"); Ok(()) } /// Stop camera capture pub async fn stop_capture(&mut self) -> Result<()> { if !self.is_running { warn!("Camera capture is not running"); return Ok(()); } // The stream will be stopped when dropped self.stream = None; if let Some(camera) = &mut self.camera { camera.stop_streaming()?; } self.is_running = false; info!("Camera capture stopped"); Ok(()) } /// Get a subscriber to receive new frames pub fn subscribe_to_frames(&self) -> broadcast::Receiver { self.frame_tx.subscribe() } /// Get a clone of the frame buffer pub fn get_frame_buffer(&self) -> SharedFrameBuffer { self.frame_buffer.clone() } /// Check if the camera is currently running pub fn is_running(&self) -> bool { self.is_running } /// Update camera settings pub async fn update_settings(&mut self, new_settings: CameraSettings) -> Result<()> { // If camera is running, we need to stop it first let was_running = self.is_running(); if was_running { self.stop_capture().await?; } // Update settings self.settings = new_settings; // Re-initialize camera with new settings if let Some(mut camera) = self.camera.take() { // Configure camera parameters camera.set_format(self.settings.resolution)?; camera.set_fps(self.settings.fps)?; camera.set_exposure(self.settings.exposure)?; camera.set_gain(self.settings.gain)?; if self.settings.focus_locked { camera.lock_focus_at_infinity()?; } self.camera = Some(camera); } else { self.initialize_impl().await?; } // Restart if it was running if was_running { self.start_capture_impl().await?; } info!("Camera settings updated"); Ok(()) } /// Save a meteor event with video pub async fn save_meteor_event( &self, timestamp: chrono::DateTime, confidence: f32, bounding_box: (u32, u32, u32, u32), seconds_before: i64, seconds_after: i64, ) -> Result { // Extract frames from the buffer let frames = self.frame_buffer .extract_event_frames(timestamp, seconds_before, seconds_after); if frames.is_empty() { return Err(anyhow::anyhow!("No frames found for event")); } // Create a unique ID for the event let event_id = uuid::Uuid::new_v4(); // Create a directory for the event let event_dir = self.events_dir.join(event_id.to_string()); std::fs::create_dir_all(&event_dir).context("Failed to create event directory")?; // Save frames to the event directory for (i, frame) in frames.iter().enumerate() { let frame_path = event_dir.join(format!("frame_{:04}.jpg", i)); frame.save_to_file(&frame_path)?; } // Create a video file name let video_path = event_dir.join("event.mp4").to_string_lossy().to_string(); // Use tokio::process to call FFmpeg to convert frames to video use tokio::process::Command; info!("Converting frames to video using FFmpeg"); let ffmpeg_result = Command::new("ffmpeg") .args(&[ "-y", // Overwrite output file if it exists "-framerate", &self.settings.fps.to_string(), "-i", &event_dir.join("frame_%04d.jpg").to_string_lossy(), "-c:v", "libx264", "-preset", "medium", "-crf", "23", &video_path ]) .output() .await; match ffmpeg_result { Ok(output) => { if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); warn!("FFmpeg warning: {}", stderr); } else { info!("Successfully created video at: {}", video_path); } }, Err(e) => { warn!("Failed to execute FFmpeg: {}. Video will not be created, only individual frames saved.", e); } } // Create and return the event let event = MeteorEvent { id: event_id, timestamp, confidence, bounding_box, video_path, }; info!("Saved meteor event: {}", event_id); Ok(event) } /// Check the health of the camera system pub async fn check_health(&self) -> Result { // If camera is not running, consider it healthy if !self.is_running { return Ok(true); } // Check if we've received frames recently if let Some(frame) = self.frame_buffer.get(0) { let now = Utc::now(); let elapsed = now.signed_duration_since(frame.timestamp); // If no new frames in last 5 seconds, camera may be stuck if elapsed.num_seconds() > 5 { warn!("Camera health check: No new frames received in {} seconds", elapsed.num_seconds()); return Ok(false); } } else if self.is_running { // If no frames at all but camera is running, something is wrong warn!("Camera health check: Camera is running but no frames in buffer"); return Ok(false); } // All checks passed Ok(true) } /// Get the current status of the camera pub async fn get_status(&self) -> Result { let frame_buffer_stats = { let buffer = self.frame_buffer.clone(); let length = buffer.len(); let capacity = buffer.capacity(); serde_json::json!({ "length": length, "capacity": capacity, "utilization_percent": if capacity > 0 { (length as f64 / capacity as f64) * 100.0 } else { 0.0 } }) }; let recent_frame = { // Get the most recent frame (index 0) if let Some(frame) = self.frame_buffer.get(0) { let timestamp = frame.timestamp.to_rfc3339(); serde_json::json!({ "timestamp": timestamp, "frame_number": frame.index }) } else { serde_json::json!(null) } }; let camera_info = { serde_json::json!({ "device": self.settings.device, "resolution": format!("{:?}", self.settings.resolution), "fps": self.settings.fps, "exposure_mode": format!("{:?}", self.settings.exposure), "gain": self.settings.gain, "focus_locked": self.settings.focus_locked }) }; let status = serde_json::json!({ "running": self.is_running, "frame_count": self.frame_count, "settings": { "device": self.settings.device, "resolution": format!("{:?}", self.settings.resolution), "fps": self.settings.fps }, "camera_info": camera_info, "frame_buffer": frame_buffer_stats, "recent_frame": recent_frame, "events_dir": self.events_dir.to_string_lossy() }); debug!("Camera status: {}", status); Ok(status) } } /// 从设备字符串中提取/dev/video设备路径 fn extract_device_path(device_str: &str) -> Result { // 如果是简单的/dev/video路径,直接返回 if device_str.starts_with("/dev/video") { return Ok(device_str.to_string()); } // 如果是GStreamer pipeline,尝试提取device参数 if device_str.contains("v4l2src") && device_str.contains("device=") { // 匹配device=/dev/videoX 格式 if let Some(start_idx) = device_str.find("device=") { let device_part = &device_str[start_idx + 7..]; // "device=".len() == 7 // 如果device路径用引号括起来 if device_part.starts_with('"') || device_part.starts_with('\'') { let quote = device_part.chars().next().unwrap(); if let Some(end_idx) = device_part[1..].find(quote) { return Ok(device_part[1..=end_idx].to_string()); } } // 否则找到下一个空格或感叹号 else if let Some(end_idx) = device_part.find(|c| c == ' ' || c == '!') { return Ok(device_part[..end_idx].to_string()); } else { // 如果没有终止符,假设它是整个剩余部分 return Ok(device_part.to_string()); } } } // 没找到设备路径,返回默认值 Err(anyhow::anyhow!("Could not extract device path from: {}", device_str)) } impl Drop for CameraController { fn drop(&mut self) { info!("Cleaning up camera controller resources"); // Stop streaming if running if self.is_running { // Use block_in_place since drop can't be async if let Err(e) = tokio::task::block_in_place(|| { futures::executor::block_on(self.stop_capture()) }) { error!("Error stopping camera during cleanup: {}", e); } } // Release camera resources self.camera = None; self.stream = None; info!("Camera controller resources released"); } } // 为CameraController实现Clone trait impl Clone for CameraController { fn clone(&self) -> Self { // 创建新的广播通道 let (frame_tx, _) = tokio::sync::broadcast::channel(30); Self { settings: self.settings.clone(), camera: None, // 不克隆相机实例,因为它包含底层资源 stream: None, // 不克隆流,因为它也包含底层资源 frame_buffer: self.frame_buffer.clone(), frame_count: self.frame_count, is_running: self.is_running, frame_tx, // 使用新创建的广播通道 events_dir: self.events_dir.clone(), } } }