feat(edge-client): add OpenCV camera backend with macOS authorization support
- Add OpenCV as the default camera backend; keep nokhwa as an optional alternative
- Make camera backends mutually exclusive via feature flags (opencv_camera, nokhwa_camera)
- Remove the deprecated hardware_camera feature; use nokhwa_camera instead
- Add main-thread camera initialization for macOS TCC authorization
- Add pre-opened capture storage via a static Mutex for async compatibility
- Add pixel format conversion utilities (pixel_convert.rs)
- Update all cfg guards from hardware_camera to nokhwa_camera

macOS requires that camera authorization requests happen on the main thread.
OpenCV's VideoCapture::new() is now called before the tokio runtime starts,
and the handle is stored for later use by async code.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Parent: 6e819e0a53
Commit: 275fb05636

meteor-edge-client/Cargo.lock: 754 lines changed (generated file; diff suppressed because it is too large)
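For context, a minimal sketch of the startup order the commit message describes: open the capture on the main thread (so macOS TCC attributes the authorization request correctly), stash the handle, then start tokio. Function names follow the new opencv_camera module below; the runtime-builder wiring and module path are assumptions for illustration, not code from this commit.

// Sketch only: assumes this lives in the binary's main.rs and that the
// camera module layout matches the diff below; the manual runtime builder
// is illustrative, not part of this commit.
use crate::camera::opencv_camera::{self, OpenCVCamera};

fn main() -> anyhow::Result<()> {
    // 1. Open the device on the main thread so macOS TCC can show its prompt.
    let capture = OpenCVCamera::open_capture_on_main_thread(0)?;

    // 2. Park the handle in the static Mutex for the async side to take later.
    opencv_camera::set_pre_opened_capture(capture);

    // 3. Only now start tokio; OpenCVCamera::new() will pick the handle up.
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()?;
    runtime.block_on(async {
        // ... build the CameraFactory / CameraController and run as usual ...
        Ok(())
    })
}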
meteor-edge-client/Cargo.toml

@@ -5,13 +5,14 @@ edition = "2021"
 default-run = "meteor-edge-client"

 [features]
-default = []
+default = ["opencv_camera"]

-# Production hardware features
-hardware_camera = []
+# Camera backends (mutually exclusive)
+opencv_camera = ["dep:opencv"]
+nokhwa_camera = ["dep:nokhwa"]

 # Other features
 opencv_integration = []

 # Debug and development features
 debug_logging = []
 performance_profiling = []

@@ -22,6 +23,7 @@ serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 toml = "0.8"
 tokio = { version = "1.0", features = ["full"] }
+tokio-util = { version = "0.7", features = ["full"] }
 anyhow = "1.0"
 thiserror = "1.0"
 dirs = "5.0"

@@ -63,6 +65,15 @@ mac_address = "1.1"
 # Camera interface dependencies
 async-trait = "0.1"

+# Cross-platform camera capture backends (optional, mutually exclusive)
+# OpenCV - stable, proper resource release
+opencv = { version = "0.93", optional = true, default-features = false, features = ["videoio"] }
+
+# nokhwa - alternative backend (has resource release issues on macOS)
+# - input-native: Use native camera backend (V4L2 on Linux, AVFoundation on macOS)
+# - output-threaded: Enable threaded/callback based camera for proper async support
+nokhwa = { version = "0.10", features = ["input-native", "output-threaded"], optional = true }

 # Optional video processing backends

 [target.'cfg(windows)'.dependencies]
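The [features] table declares the backends "mutually exclusive", but Cargo features are additive and nothing in the manifest enforces that; the factory code below resolves conflicts by preferring OpenCV when both are enabled (note the not(feature = "opencv_camera") guard on the nokhwa path). If hard enforcement were wanted, a compile_error! guard is one conventional option; this is a sketch, not part of the commit:

// Hypothetical guard, e.g. at the top of src/camera/mod.rs: fail the build
// outright when both backend features are enabled at once.
#[cfg(all(feature = "opencv_camera", feature = "nokhwa_camera"))]
compile_error!(
    "opencv_camera and nokhwa_camera are mutually exclusive; enable only one"
);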
meteor-edge-client/src/camera/factory.rs

@@ -4,10 +4,16 @@ use std::path::PathBuf;
 use std::sync::Arc;

 use super::interface::{CameraConfig, CameraInterface, CameraType};
-use super::production::ProductionCamera;
 use super::video_file::VideoFileCamera;
 use crate::memory::frame_pool::HierarchicalFramePool;

+// Import the appropriate camera backend
+#[cfg(feature = "opencv_camera")]
+use super::opencv_camera::OpenCVCamera;
+
+#[cfg(feature = "nokhwa_camera")]
+use super::production::ProductionCamera;
+
 /// Factory for creating camera instances from configuration or specs
 pub struct CameraFactory {
     frame_pool: Arc<HierarchicalFramePool>,

@@ -19,18 +25,10 @@ impl CameraFactory {
     }

     pub fn create_camera(&self, config: CameraConfig) -> Result<Box<dyn CameraInterface>> {
-        match config.camera_type {
+        match config.camera_type.clone() {
             CameraType::Production { device_id, backend } => {
-                println!("🏭 Opening hardware camera: {} ({})", device_id, backend);
-                let camera = ProductionCamera::new(
-                    device_id,
-                    backend,
-                    config.resolution,
-                    config.fps,
-                    self.frame_pool.clone(),
-                )
-                .context("Failed to create hardware camera")?;
-                Ok(Box::new(camera))
+                self.create_hardware_camera(config, device_id, backend)
             }
             CameraType::VideoFile {
                 path,

@@ -51,6 +49,51 @@ impl CameraFactory {
         }
     }

+    /// Create hardware camera using the appropriate backend
+    #[cfg(feature = "opencv_camera")]
+    fn create_hardware_camera(
+        &self,
+        config: CameraConfig,
+        _device_id: String,
+        _backend: String,
+    ) -> Result<Box<dyn CameraInterface>> {
+        println!("   Using OpenCV backend");
+        let camera = OpenCVCamera::new(config)
+            .context("Failed to create OpenCV camera")?;
+        Ok(Box::new(camera))
+    }
+
+    #[cfg(all(feature = "nokhwa_camera", not(feature = "opencv_camera")))]
+    fn create_hardware_camera(
+        &self,
+        config: CameraConfig,
+        device_id: String,
+        backend: String,
+    ) -> Result<Box<dyn CameraInterface>> {
+        println!("   Using nokhwa backend");
+        let camera = ProductionCamera::new(
+            device_id,
+            backend,
+            config.resolution,
+            config.fps,
+            self.frame_pool.clone(),
+        )
+        .context("Failed to create nokhwa camera")?;
+        Ok(Box::new(camera))
+    }
+
+    #[cfg(not(any(feature = "opencv_camera", feature = "nokhwa_camera")))]
+    fn create_hardware_camera(
+        &self,
+        _config: CameraConfig,
+        _device_id: String,
+        _backend: String,
+    ) -> Result<Box<dyn CameraInterface>> {
+        anyhow::bail!(
+            "No camera backend available. Build with --features opencv_camera or --features nokhwa_camera"
+        )
+    }
+
     pub fn create_from_spec(&self, spec: &str) -> Result<Box<dyn CameraInterface>> {
         let config = self.parse_camera_spec(spec)?;
         self.create_camera(config)
meteor-edge-client/src/camera/interface.rs

@@ -4,6 +4,7 @@ use anyhow::Result;
 use async_trait::async_trait;
 use chrono::{DateTime, Utc};
 use std::sync::Arc;
+use tokio_util::sync::CancellationToken;

 use crate::memory::frame_data::{FrameData, FrameFormat};
 use std::path::PathBuf;

@@ -33,6 +34,11 @@ pub trait CameraInterface: Send + Sync {
     fn shutdown(
         &mut self,
     ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + '_>>;
+
+    /// Optional: supply a cancellation token for cooperative shutdown of blocking work
+    fn set_cancellation_token(&mut self, _token: CancellationToken) {
+        // Default no-op for cameras that don't need cancellation support
+    }
 }

 /// Represents a captured frame with metadata
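Note that the trait hands back hand-boxed futures rather than relying on #[async_trait]; this keeps the Send bound and the borrow lifetime visible in the signature. A minimal sketch of the pattern an implementor writes (NullCamera is a hypothetical no-op type, shown only for the method shape):

// Sketch of the boxed-future pattern the trait uses instead of #[async_trait].
use std::{future::Future, pin::Pin};

struct NullCamera;

impl NullCamera {
    fn shutdown(&mut self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
        // Box::pin(async move { .. }) produces an owned, pinned future; the
        // '_ lifetime ties the &mut self borrow to the returned future.
        Box::pin(async move { Ok(()) })
    }
}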
meteor-edge-client/src/camera/mod.rs

@@ -3,20 +3,35 @@ pub mod factory;
 /// This module provides a unified interface while keeping implementation details isolated
 // Core interfaces and types (always available)
 pub mod interface;
-pub mod production;
+pub mod pixel_convert;
 mod video_file;

+// Camera backends - conditionally compiled
+#[cfg(feature = "opencv_camera")]
+pub mod opencv_camera;
+
+#[cfg(feature = "nokhwa_camera")]
+pub mod production;
+
 // Re-export core types for convenience
 pub use factory::{print_available_cameras, CameraFactory};
 pub use interface::{
     CameraConfig, CameraInterface, CameraMetadata, CameraType, CapturedFrame, FrameMetadata,
 };
-pub use production::{ProductionCamera, ProductionCameraCapabilities};
 pub use video_file::VideoFileCamera;
+
+// Export the active camera backend
+#[cfg(feature = "opencv_camera")]
+pub use opencv_camera::OpenCVCamera;
+
+#[cfg(feature = "nokhwa_camera")]
+pub use production::{ProductionCamera, ProductionCameraCapabilities};
(camera controller; exact file path not shown in this view)

 use anyhow::Result;
 use std::sync::Arc;
-use tokio::time::{sleep, Duration};
+use tokio::sync::broadcast;
+use tokio::time::{sleep, timeout, Duration};
+use tokio_util::sync::CancellationToken;

 use crate::core::events::{EventBus, FrameCapturedEvent};
 use crate::memory::frame_pool::HierarchicalFramePool;

@@ -77,12 +92,18 @@ impl CameraController {
     }

     /// Start the camera capture loop
-    pub async fn run(&mut self) -> Result<()> {
+    pub async fn run(
+        &mut self,
+        mut shutdown_rx: broadcast::Receiver<()>,
+        cancellation_token: CancellationToken,
+    ) -> Result<()> {
         println!("🎥 Starting camera controller...");

         // Initialize the camera
         self.camera.initialize().await?;
         self.is_running = true;
+        self.camera
+            .set_cancellation_token(cancellation_token.clone());

         let metadata = self.camera.get_metadata();
         println!(

@@ -103,45 +124,69 @@ impl CameraController {
         // Calculate frame timing
         let frame_duration = Duration::from_secs_f64(1.0 / metadata.target_fps);

-        // Main capture loop
-        while self.is_running && self.camera.is_running() {
+        // Main capture loop with shutdown signal support
+        loop {
+            if !self.is_running || !self.camera.is_running() {
+                break;
+            }
+
             let start_time = tokio::time::Instant::now();

-            match self.camera.capture_frame().await {
-                Ok(captured_frame) => {
-                    // Create event from captured frame
-                    let event = FrameCapturedEvent::new(
-                        captured_frame.frame_number,
-                        captured_frame.data.clone(),
-                    );
-
-                    // Publish frame event
-                    if let Err(e) = self.event_bus.publish_frame_captured(event) {
-                        eprintln!("❌ Failed to publish frame event: {}", e);
-                        continue;
-                    }
-
-                    // Log progress periodically
-                    if captured_frame.frame_number % 30 == 0 {
-                        println!("📸 Captured {} frames", captured_frame.frame_number);
-                    }
-                }
-                Err(e) => {
-                    eprintln!(
-                        "❌ Error capturing frame {}: {}",
-                        self.camera.frame_count(),
-                        e
-                    );
-
-                    // If this is a production camera and capture fails, we should probably exit
-                    if metadata.camera_type != "VideoFile" {
-                        eprintln!("❌ Hardware camera failure, stopping...");
-                        break;
-                    }
-
-                    // For video files, wait a bit and continue
-                    sleep(Duration::from_secs(1)).await;
-                    continue;
-                }
-            }
+            // Use select! to check for shutdown while capturing frames
+            tokio::select! {
+                _ = shutdown_rx.recv() => {
+                    println!("📷 Camera controller received shutdown signal");
+                    break;
+                }
+                _ = cancellation_token.cancelled() => {
+                    println!("📷 Camera controller received cancellation request");
+                    break;
+                }
+                frame_result = self.camera.capture_frame() => {
+                    match frame_result {
+                        Ok(captured_frame) => {
+                            // Create event from captured frame
+                            let event = FrameCapturedEvent::new(
+                                captured_frame.frame_number,
+                                captured_frame.data.clone(),
+                            );
+
+                            // Publish frame event
+                            if let Err(e) = self.event_bus.publish_frame_captured(event) {
+                                eprintln!("❌ Failed to publish frame event: {}", e);
+                                continue;
+                            }
+
+                            // Log progress periodically
+                            if captured_frame.frame_number % 30 == 0 {
+                                println!("📸 Captured {} frames", captured_frame.frame_number);
+                            }
+                        }
+                        Err(e) => {
+                            // Treat cancellation as a clean exit
+                            let err_text = e.to_string();
+                            if err_text.contains("Capture cancelled") {
+                                println!("📷 Camera capture cancelled, stopping loop");
+                                break;
+                            } else {
+                                eprintln!(
+                                    "❌ Error capturing frame {}: {}",
+                                    self.camera.frame_count(),
+                                    e
+                                );
+                            }
+
+                            // If this is a production camera and capture fails, we should probably exit
+                            if metadata.camera_type != "VideoFile" {
+                                eprintln!("❌ Hardware camera failure, stopping...");
+                                break;
+                            }
+
+                            // For video files, wait a bit and continue
+                            sleep(Duration::from_secs(1)).await;
+                            continue;
+                        }
+                    }
+                }
+            }

@@ -153,7 +198,13 @@ impl CameraController {
         }

         // Shutdown camera
-        self.camera.shutdown().await?;
+        cancellation_token.cancel();
+        match timeout(Duration::from_millis(500), self.camera.shutdown()).await {
+            Ok(result) => result?,
+            Err(_) => {
+                eprintln!("⚠️ Camera shutdown timed out, force dropping camera handle");
+            }
+        }
         self.is_running = false;

         println!("🎬 Camera controller stopped");
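For callers, the new run() signature means wiring up both stop paths. A sketch of the call site, assuming an owner that holds the broadcast sender and token the way the Application changes at the end of this diff do (controller construction elided):

// Sketch: driving the controller with both shutdown paths wired in.
use crate::camera::CameraController;

async fn drive_camera(mut controller: CameraController) -> anyhow::Result<()> {
    let (shutdown_tx, _) = tokio::sync::broadcast::channel::<()>(16);
    let cancellation_token = tokio_util::sync::CancellationToken::new();

    let rx = shutdown_tx.subscribe();
    let token = cancellation_token.clone();
    let handle = tokio::spawn(async move { controller.run(rx, token).await });

    // ... later, on a shutdown request (e.g. Ctrl-C), either path stops the loop:
    shutdown_tx.send(()).ok();   // broadcast: wakes the select! arm
    cancellation_token.cancel(); // token: also unblocks in-flight capture work

    handle.await??;
    Ok(())
}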
meteor-edge-client/src/camera/opencv_camera.rs (new file, 363 lines)

@@ -0,0 +1,363 @@
//! OpenCV-based camera implementation
//!
//! Uses opencv::videoio::VideoCapture for camera capture.
//! Provides reliable resource management with explicit release() on shutdown.
//!
//! On macOS, camera authorization must happen on the main thread.
//! Use `open_capture_on_main_thread()` before starting the tokio runtime,
//! then `set_pre_opened_capture()` to store it for later use.

use anyhow::{Context, Result};
use chrono::Utc;
use std::pin::Pin;
use std::sync::{Arc, Mutex};

use opencv::core::Mat;
use opencv::prelude::*;
use opencv::videoio::{
    VideoCapture, CAP_ANY, CAP_PROP_FRAME_WIDTH, CAP_PROP_FRAME_HEIGHT, CAP_PROP_FPS,
};

// Thread-safe storage for pre-opened capture (macOS main thread requirement)
static PRE_OPENED_CAPTURE: Mutex<Option<VideoCapture>> = Mutex::new(None);

/// Store a pre-opened capture for later use by OpenCVCamera::new().
/// Call this from the main thread BEFORE starting the tokio runtime.
pub fn set_pre_opened_capture(capture: VideoCapture) {
    *PRE_OPENED_CAPTURE.lock().unwrap() = Some(capture);
}

/// Take the pre-opened capture (can only be called once)
fn take_pre_opened_capture() -> Option<VideoCapture> {
    PRE_OPENED_CAPTURE.lock().unwrap().take()
}

use super::interface::{
    CameraConfig, CameraInterface, CameraMetadata, CameraType, CapturedFrame, FrameMetadata,
};
use crate::memory::frame_data::{FrameData, FrameFormat};

/// OpenCV-based camera implementation
pub struct OpenCVCamera {
    capture: Option<VideoCapture>,
    config: CameraConfig,
    frame_count: u64,
    is_running: bool,
    actual_width: u32,
    actual_height: u32,
    actual_fps: f64,
}

// Safety: OpenCVCamera is only accessed from one thread at a time through &mut self.
// The VideoCapture raw pointer is not shared across threads.
unsafe impl Send for OpenCVCamera {}
unsafe impl Sync for OpenCVCamera {}

impl OpenCVCamera {
    /// Create a new OpenCV camera instance.
    /// Will use a pre-opened capture from the main thread if available (macOS).
    pub fn new(config: CameraConfig) -> Result<Self> {
        // Check for pre-opened capture from main thread (macOS authorization)
        if let Some(capture) = take_pre_opened_capture() {
            println!("🎥 Using pre-opened camera from main thread (macOS)");
            return Self::with_capture(capture, config);
        }

        println!("🎥 Creating OpenCV camera...");
        Ok(Self {
            capture: None,
            config,
            frame_count: 0,
            is_running: false,
            actual_width: 0,
            actual_height: 0,
            actual_fps: 0.0,
        })
    }

    /// Open the camera on the main thread (required for macOS authorization).
    /// Call this BEFORE starting the tokio runtime.
    pub fn open_capture_on_main_thread(device_id: i32) -> Result<VideoCapture> {
        println!("🎥 Opening camera on main thread (macOS authorization)...");
        println!("   Device ID: {}", device_id);

        let cam = VideoCapture::new(device_id, CAP_ANY)
            .context("Failed to create VideoCapture")?;

        if !cam.is_opened()? {
            anyhow::bail!("Failed to open camera device {}", device_id);
        }

        println!("✅ Camera opened successfully on main thread");
        Ok(cam)
    }

    /// Create camera with a pre-opened capture handle
    pub fn with_capture(capture: VideoCapture, config: CameraConfig) -> Result<Self> {
        // Get actual resolution from the opened capture
        let actual_width = capture.get(CAP_PROP_FRAME_WIDTH)? as u32;
        let actual_height = capture.get(CAP_PROP_FRAME_HEIGHT)? as u32;
        let actual_fps = capture.get(CAP_PROP_FPS)?;

        println!("🎥 Creating OpenCV camera with pre-opened capture...");
        println!("   Resolution: {}x{} @ {:.1} FPS", actual_width, actual_height, actual_fps);

        Ok(Self {
            capture: Some(capture),
            config,
            frame_count: 0,
            is_running: false,
            actual_width,
            actual_height,
            actual_fps,
        })
    }

    /// Get device ID from config
    fn get_device_id(&self) -> i32 {
        match &self.config.camera_type {
            CameraType::Production { device_id, .. } => {
                device_id.parse().unwrap_or(0)
            }
            _ => 0,
        }
    }

    /// Convert an OpenCV Mat to FrameData
    fn mat_to_frame_data(&self, mat: &Mat) -> Result<FrameData> {
        let rows = mat.rows();
        let cols = mat.cols();
        let channels = mat.channels();

        if rows <= 0 || cols <= 0 {
            anyhow::bail!("Invalid frame dimensions: {}x{}", cols, rows);
        }

        let width = cols as u32;
        let height = rows as u32;

        // Get raw data from the Mat
        let data_ptr = mat.data();
        let data_len = (rows * cols * channels) as usize;

        if data_ptr.is_null() {
            anyhow::bail!("Mat data pointer is null");
        }

        // Copy data from the Mat (OpenCV uses BGR format by default)
        let bgr_data = unsafe { std::slice::from_raw_parts(data_ptr, data_len) };

        // Convert BGR to grayscale for meteor detection
        let grayscale_data = if channels == 3 {
            bgr_to_grayscale(bgr_data, width as usize, height as usize)
        } else if channels == 1 {
            bgr_data.to_vec()
        } else {
            // For other formats, just use the first channel
            bgr_data.iter().step_by(channels as usize).copied().collect()
        };

        Ok(FrameData::new(
            grayscale_data,
            width,
            height,
            FrameFormat::Grayscale,
        ))
    }
}

/// Convert BGR to grayscale using the standard luminance formula
fn bgr_to_grayscale(bgr_data: &[u8], width: usize, height: usize) -> Vec<u8> {
    let mut grayscale = Vec::with_capacity(width * height);

    for pixel in bgr_data.chunks_exact(3) {
        let b = pixel[0] as f32;
        let g = pixel[1] as f32;
        let r = pixel[2] as f32;
        // ITU-R BT.601 luminance formula
        let gray = (0.299 * r + 0.587 * g + 0.114 * b) as u8;
        grayscale.push(gray);
    }

    grayscale
}

impl CameraInterface for OpenCVCamera {
    fn initialize(
        &mut self,
    ) -> Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + '_>> {
        Box::pin(async move {
            // Check if capture was pre-opened on the main thread (macOS authorization)
            if self.capture.is_some() {
                println!("🎥 OpenCV camera already opened (main thread initialization)");
                println!("   Resolution: {}x{} @ {:.1} FPS",
                    self.actual_width, self.actual_height, self.actual_fps);

                // Configure resolution and FPS
                if let Some(cam) = self.capture.as_mut() {
                    let _ = cam.set(CAP_PROP_FRAME_WIDTH, self.config.resolution.0 as f64);
                    let _ = cam.set(CAP_PROP_FRAME_HEIGHT, self.config.resolution.1 as f64);
                    let _ = cam.set(CAP_PROP_FPS, self.config.fps);

                    // Read back actual values
                    self.actual_width = cam.get(CAP_PROP_FRAME_WIDTH)? as u32;
                    self.actual_height = cam.get(CAP_PROP_FRAME_HEIGHT)? as u32;
                    self.actual_fps = cam.get(CAP_PROP_FPS)?;
                }

                println!("✅ OpenCV camera ready");
                println!("   Actual resolution: {}x{}", self.actual_width, self.actual_height);
                println!("   Actual FPS: {:.1}", self.actual_fps);

                self.is_running = true;
                return Ok(());
            }

            // Fallback: open camera in async context (works on Linux/Windows)
            let device_id = self.get_device_id();
            println!("🎥 Initializing OpenCV camera...");
            println!("   Device ID: {}", device_id);
            println!("   Target resolution: {}x{}", self.config.resolution.0, self.config.resolution.1);
            println!("   Target FPS: {}", self.config.fps);

            // Create VideoCapture.
            // Note: VideoCapture::new is blocking, but we're in an async context.
            // On macOS, this may fail if not called from the main thread.
            let mut cam = VideoCapture::new(device_id, CAP_ANY)
                .context("Failed to create VideoCapture")?;

            if !cam.is_opened()? {
                anyhow::bail!("Failed to open camera device {}", device_id);
            }

            // Set resolution and FPS
            let _ = cam.set(CAP_PROP_FRAME_WIDTH, self.config.resolution.0 as f64);
            let _ = cam.set(CAP_PROP_FRAME_HEIGHT, self.config.resolution.1 as f64);
            let _ = cam.set(CAP_PROP_FPS, self.config.fps);

            // Read back actual values
            self.actual_width = cam.get(CAP_PROP_FRAME_WIDTH)? as u32;
            self.actual_height = cam.get(CAP_PROP_FRAME_HEIGHT)? as u32;
            self.actual_fps = cam.get(CAP_PROP_FPS)?;

            println!("✅ OpenCV camera initialized successfully");
            println!("   Actual resolution: {}x{}", self.actual_width, self.actual_height);
            println!("   Actual FPS: {:.1}", self.actual_fps);

            self.capture = Some(cam);
            self.is_running = true;

            Ok(())
        })
    }

    fn capture_frame(
        &mut self,
    ) -> Pin<Box<dyn std::future::Future<Output = Result<CapturedFrame>> + Send + '_>> {
        Box::pin(async move {
            let cam = self.capture.as_mut()
                .ok_or_else(|| anyhow::anyhow!("Camera not initialized"))?;

            let mut mat = Mat::default();

            // Read frame (blocking call)
            let success = cam.read(&mut mat)?;

            if !success || mat.empty() {
                anyhow::bail!("Failed to capture frame - empty or read failed");
            }

            self.frame_count += 1;

            // Convert Mat to FrameData
            let frame_data = self.mat_to_frame_data(&mat)?;

            // Log progress periodically
            if self.frame_count == 1 || self.frame_count % 30 == 0 {
                println!("📸 Frame {} captured ({}x{})",
                    self.frame_count, frame_data.width, frame_data.height);
            }

            Ok(CapturedFrame::new(
                Arc::new(frame_data),
                self.frame_count,
                Utc::now(),
                FrameMetadata::default(),
            ))
        })
    }

    fn get_metadata(&self) -> CameraMetadata {
        CameraMetadata {
            camera_id: self.get_device_id().to_string(),
            camera_type: "OpenCV".to_string(),
            supported_formats: vec![FrameFormat::Grayscale, FrameFormat::RGB888],
            max_resolution: (1920, 1080),
            current_resolution: (self.actual_width, self.actual_height),
            target_fps: self.actual_fps,
            is_real_time: true,
            total_frames: None,
        }
    }

    fn is_running(&self) -> bool {
        self.is_running
    }

    fn frame_count(&self) -> u64 {
        self.frame_count
    }

    fn shutdown(
        &mut self,
    ) -> Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + '_>> {
        Box::pin(async move {
            println!("🛑 Shutting down OpenCV camera...");

            if let Some(mut cam) = self.capture.take() {
                // Explicitly release the camera resource.
                // This is the key advantage over nokhwa - reliable resource cleanup.
                cam.release().context("Failed to release VideoCapture")?;
                println!("✅ OpenCV camera released successfully");
            }

            self.is_running = false;
            Ok(())
        })
    }
}

impl Drop for OpenCVCamera {
    fn drop(&mut self) {
        // Ensure the camera is released even if shutdown wasn't called
        if let Some(mut cam) = self.capture.take() {
            let _ = cam.release();
            println!("🗑️ OpenCV camera dropped and released");
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_bgr_to_grayscale() {
        // Test BGR to grayscale conversion
        let bgr = vec![
            0, 0, 255,      // Pure red -> should be ~76
            0, 255, 0,      // Pure green -> should be ~150
            255, 0, 0,      // Pure blue -> should be ~29
            255, 255, 255,  // White -> should be 255
        ];

        let gray = bgr_to_grayscale(&bgr, 4, 1);

        assert_eq!(gray.len(), 4);
        // Check approximate values (luminance formula)
        assert!(gray[0] > 70 && gray[0] < 80, "Red should be ~76, got {}", gray[0]);
        assert!(gray[1] > 145 && gray[1] < 155, "Green should be ~150, got {}", gray[1]);
        assert!(gray[2] > 25 && gray[2] < 35, "Blue should be ~29, got {}", gray[2]);
        assert_eq!(gray[3], 255, "White should be 255");
    }
}
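One caveat worth noting: capture_frame() above calls cam.read() directly inside the async future, so the executor thread blocks for up to one frame interval per capture. A hedged alternative (not in this commit) is to hand the read to tokio's blocking pool; the commit's own unsafe Send impl already asserts that the capture handle may cross threads, which is the same assertion the wrapper below makes explicit:

// Sketch only: offloading the blocking VideoCapture::read onto the blocking
// pool. The Send wrapper mirrors the assumption the commit makes with its
// `unsafe impl Send for OpenCVCamera`.
struct SendCapture(opencv::videoio::VideoCapture);
unsafe impl Send for SendCapture {}

async fn read_frame_off_thread(
    cam: SendCapture,
) -> anyhow::Result<(SendCapture, opencv::core::Mat)> {
    tokio::task::spawn_blocking(move || {
        use opencv::prelude::*;
        let SendCapture(mut cap) = cam;
        let mut mat = opencv::core::Mat::default();
        let ok = cap.read(&mut mat)?;                 // blocking read off the executor
        anyhow::ensure!(ok && !mat.empty(), "empty frame");
        Ok((SendCapture(cap), mat))                   // hand the handle back to the caller
    })
    .await?                                           // JoinError -> anyhow
}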
meteor-edge-client/src/camera/pixel_convert.rs (new file, 244 lines)

@@ -0,0 +1,244 @@
//! Pixel format conversion utilities for camera capture
//!
//! This module provides efficient conversion from various camera pixel formats
//! to grayscale, which is the primary format used by the Vida detection algorithm.

/// Convert RGB888 (24-bit) to grayscale using ITU-R BT.601 coefficients:
/// Y = 0.299*R + 0.587*G + 0.114*B
pub fn rgb888_to_grayscale(rgb_data: &[u8], width: usize, height: usize) -> Vec<u8> {
    let expected_size = width * height * 3;
    if rgb_data.len() < expected_size {
        return vec![0u8; width * height];
    }

    let mut grayscale = Vec::with_capacity(width * height);

    for pixel in rgb_data.chunks_exact(3) {
        let r = pixel[0] as u32;
        let g = pixel[1] as u32;
        let b = pixel[2] as u32;
        // BT.601 coefficients: 0.299, 0.587, 0.114
        // Using fixed-point: (77*R + 150*G + 29*B) >> 8
        let y = ((77 * r + 150 * g + 29 * b) >> 8) as u8;
        grayscale.push(y);
    }

    grayscale
}

/// Convert YUYV (YUV 4:2:2 packed) to grayscale.
/// YUYV format: Y0 U0 Y1 V0 (4 bytes for 2 pixels).
/// We only need the Y channel for grayscale.
pub fn yuyv_to_grayscale(yuyv_data: &[u8], width: usize, height: usize) -> Vec<u8> {
    let expected_size = width * height * 2;
    if yuyv_data.len() < expected_size {
        return vec![0u8; width * height];
    }

    let mut grayscale = Vec::with_capacity(width * height);

    // YUYV: [Y0, U, Y1, V, Y2, U, Y3, V, ...]
    // Extract Y values at positions 0, 2, 4, 6, ...
    for chunk in yuyv_data.chunks_exact(4) {
        grayscale.push(chunk[0]); // Y0
        grayscale.push(chunk[2]); // Y1
    }

    grayscale
}

/// Convert NV12 (YUV 4:2:0 semi-planar) to grayscale.
/// NV12 format: Y plane followed by interleaved UV plane.
/// Y plane size: width * height
/// UV plane size: width * height / 2
pub fn nv12_to_grayscale(nv12_data: &[u8], width: usize, height: usize) -> Vec<u8> {
    let y_size = width * height;
    if nv12_data.len() < y_size {
        return vec![0u8; y_size];
    }

    // NV12: the Y plane is already grayscale, just copy it
    nv12_data[..y_size].to_vec()
}

/// Convert NV21 (YUV 4:2:0 semi-planar, VU order) to grayscale.
/// Same as NV12 for grayscale conversion since we only need the Y plane.
pub fn nv21_to_grayscale(nv21_data: &[u8], width: usize, height: usize) -> Vec<u8> {
    nv12_to_grayscale(nv21_data, width, height)
}

/// Convert I420 (YUV 4:2:0 planar) to grayscale.
/// I420 format: Y plane, U plane, V plane (all separate); the leading Y plane
/// is all we need, so this delegates to the NV12 path.
pub fn i420_to_grayscale(i420_data: &[u8], width: usize, height: usize) -> Vec<u8> {
    nv12_to_grayscale(i420_data, width, height)
}

/// Convert BGR888 (24-bit, OpenCV format) to grayscale
pub fn bgr888_to_grayscale(bgr_data: &[u8], width: usize, height: usize) -> Vec<u8> {
    let expected_size = width * height * 3;
    if bgr_data.len() < expected_size {
        return vec![0u8; width * height];
    }

    let mut grayscale = Vec::with_capacity(width * height);

    for pixel in bgr_data.chunks_exact(3) {
        let b = pixel[0] as u32;
        let g = pixel[1] as u32;
        let r = pixel[2] as u32;
        let y = ((77 * r + 150 * g + 29 * b) >> 8) as u8;
        grayscale.push(y);
    }

    grayscale
}

/// Convert RGBA8888 (32-bit with alpha) to grayscale
pub fn rgba8888_to_grayscale(rgba_data: &[u8], width: usize, height: usize) -> Vec<u8> {
    let expected_size = width * height * 4;
    if rgba_data.len() < expected_size {
        return vec![0u8; width * height];
    }

    let mut grayscale = Vec::with_capacity(width * height);

    for pixel in rgba_data.chunks_exact(4) {
        let r = pixel[0] as u32;
        let g = pixel[1] as u32;
        let b = pixel[2] as u32;
        // Alpha channel (pixel[3]) is ignored
        let y = ((77 * r + 150 * g + 29 * b) >> 8) as u8;
        grayscale.push(y);
    }

    grayscale
}

/// Decode an MJPEG frame to grayscale.
/// Note: This is a simplified implementation that requires the `image` crate.
#[cfg(feature = "nokhwa_camera")]
pub fn mjpeg_to_grayscale(jpeg_data: &[u8], width: usize, height: usize) -> Vec<u8> {
    use image::codecs::jpeg::JpegDecoder;
    use image::{DynamicImage, ImageDecoder};
    use std::io::Cursor;

    let cursor = Cursor::new(jpeg_data);
    match JpegDecoder::new(cursor) {
        Ok(decoder) => {
            let (w, h) = decoder.dimensions();
            if let Ok(img) = DynamicImage::from_decoder(decoder) {
                let gray = img.to_luma8();
                return gray.into_raw();
            }
            vec![0u8; (w * h) as usize]
        }
        Err(_) => {
            // Return black frame on decode error
            vec![0u8; width * height]
        }
    }
}

#[cfg(not(feature = "nokhwa_camera"))]
pub fn mjpeg_to_grayscale(_jpeg_data: &[u8], width: usize, height: usize) -> Vec<u8> {
    // Without the image crate, return a placeholder
    vec![128u8; width * height]
}

/// Pixel format enum for conversion dispatch
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum PixelFormat {
    RGB888,
    BGR888,
    RGBA8888,
    YUYV,
    NV12,
    NV21,
    I420,
    MJPEG,
    Grayscale,
}

/// Convert any supported pixel format to grayscale
pub fn to_grayscale(
    data: &[u8],
    width: usize,
    height: usize,
    format: PixelFormat,
) -> Vec<u8> {
    match format {
        PixelFormat::RGB888 => rgb888_to_grayscale(data, width, height),
        PixelFormat::BGR888 => bgr888_to_grayscale(data, width, height),
        PixelFormat::RGBA8888 => rgba8888_to_grayscale(data, width, height),
        PixelFormat::YUYV => yuyv_to_grayscale(data, width, height),
        PixelFormat::NV12 => nv12_to_grayscale(data, width, height),
        PixelFormat::NV21 => nv21_to_grayscale(data, width, height),
        PixelFormat::I420 => i420_to_grayscale(data, width, height),
        PixelFormat::MJPEG => mjpeg_to_grayscale(data, width, height),
        PixelFormat::Grayscale => data.to_vec(),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_rgb_to_grayscale() {
        // White pixel (255, 255, 255) should become ~255
        let rgb = vec![255u8, 255, 255];
        let gray = rgb888_to_grayscale(&rgb, 1, 1);
        assert_eq!(gray.len(), 1);
        assert!(gray[0] >= 254); // Allow for rounding

        // Black pixel (0, 0, 0) should become 0
        let rgb = vec![0u8, 0, 0];
        let gray = rgb888_to_grayscale(&rgb, 1, 1);
        assert_eq!(gray[0], 0);

        // Pure red (255, 0, 0) should become ~77
        let rgb = vec![255u8, 0, 0];
        let gray = rgb888_to_grayscale(&rgb, 1, 1);
        assert!(gray[0] >= 75 && gray[0] <= 78);
    }

    #[test]
    fn test_yuyv_to_grayscale() {
        // YUYV: Y0=100, U=128, Y1=200, V=128
        let yuyv = vec![100u8, 128, 200, 128];
        let gray = yuyv_to_grayscale(&yuyv, 2, 1);
        assert_eq!(gray.len(), 2);
        assert_eq!(gray[0], 100);
        assert_eq!(gray[1], 200);
    }

    #[test]
    fn test_nv12_to_grayscale() {
        // NV12: Y plane followed by UV
        let y_data = vec![50u8, 100, 150, 200];
        let uv_data = vec![128u8, 128]; // UV for 2x2 image
        let mut nv12 = y_data.clone();
        nv12.extend(uv_data);

        let gray = nv12_to_grayscale(&nv12, 2, 2);
        assert_eq!(gray, y_data);
    }

    #[test]
    fn test_to_grayscale_dispatch() {
        let rgb = vec![128u8, 128, 128];
        let gray = to_grayscale(&rgb, 1, 1, PixelFormat::RGB888);
        assert_eq!(gray.len(), 1);
        // Mid-gray should stay mid-gray
        assert!(gray[0] >= 126 && gray[0] <= 130);
    }

    #[test]
    fn test_empty_input_handling() {
        let empty: Vec<u8> = vec![];
        let gray = rgb888_to_grayscale(&empty, 10, 10);
        // Should return a black frame of the expected size
        assert_eq!(gray.len(), 100);
        assert!(gray.iter().all(|&v| v == 0));
    }
}
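A note on the integer luminance used throughout pixel_convert.rs: 77, 150 and 29 are the BT.601 weights scaled by 256 (0.299*256 ≈ 77, 0.587*256 ≈ 150, 0.114*256 ≈ 29, and they sum to exactly 256), so the >> 8 renormalizes without a division. An illustrative standalone check that the fixed-point result stays within one step of the floating-point formula:

// Illustrative sanity check (not part of the commit): fixed-point and
// floating-point luminance never differ by more than 1 on sampled inputs.
fn main() {
    for r in (0u32..=255).step_by(51) {
        for g in (0u32..=255).step_by(51) {
            for b in (0u32..=255).step_by(51) {
                let fixed = (77 * r + 150 * g + 29 * b) >> 8;
                let float = (0.299 * r as f64 + 0.587 * g as f64 + 0.114 * b as f64) as u32;
                assert!(fixed.abs_diff(float) <= 1, "mismatch at {r},{g},{b}");
            }
        }
    }
    println!("fixed-point luminance matches float within 1");
}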
meteor-edge-client/src/camera/production.rs

@@ -1,35 +1,205 @@
-/// Production camera implementation without any simulation dependencies
-/// This module contains only real hardware camera logic
+//! Production camera implementation with nokhwa cross-platform support
+//!
+//! This module provides real hardware camera capture using the nokhwa library,
+//! which supports V4L2 (Linux), AVFoundation (macOS), and DirectShow (Windows).
+//!
+//! Uses CallbackCamera for proper threaded frame capture on all platforms.

 use anyhow::{Context, Result};
+// Removed async_trait since we're using boxed futures now
 use chrono::Utc;
 use std::pin::Pin;
 use std::sync::Arc;
-use tokio::time::{sleep, Duration};
+use std::task::{Context as TaskContext, Poll};
 use tokio_util::sync::CancellationToken;

 use super::interface::{CameraInterface, CameraMetadata, CapturedFrame, FrameMetadata};
 use crate::memory::frame_data::{FrameData, FrameFormat};
 use crate::memory::frame_pool::HierarchicalFramePool;

+#[cfg(feature = "nokhwa_camera")]
+use super::pixel_convert::{self, PixelFormat};
+
+#[cfg(feature = "nokhwa_camera")]
+use nokhwa::{
+    pixel_format::RgbFormat,
+    utils::{CameraIndex, RequestedFormat, RequestedFormatType},
+    threaded::CallbackCamera,
+};
+
+#[cfg(feature = "nokhwa_camera")]
+use std::sync::Mutex;
+
+#[cfg(feature = "nokhwa_camera")]
+struct BlockingCapture {
+    handle: Option<tokio::task::JoinHandle<Result<(Vec<u8>, u32, u32)>>>,
+}
+
+#[cfg(feature = "nokhwa_camera")]
+impl BlockingCapture {
+    fn new(
+        camera: Arc<Mutex<CallbackCamera>>,
+        cancel_token: Option<CancellationToken>,
+    ) -> Self {
+        let handle = tokio::task::spawn_blocking(move || -> Result<(Vec<u8>, u32, u32)> {
+            if let Some(token) = cancel_token.as_ref() {
+                if token.is_cancelled() {
+                    return Err(anyhow::anyhow!("Capture cancelled"));
+                }
+            }
+
+            let mut camera_guard = camera
+                .lock()
+                .map_err(|e| anyhow::anyhow!("Failed to lock camera: {}", e))?;
+
+            // Use last_frame with retry (non-blocking approach)
+            let mut buffer = None;
+            let max_retries = 50; // 50 * 20ms = 1 second max wait
+            for attempt in 0..max_retries {
+                if let Some(token) = cancel_token.as_ref() {
+                    if token.is_cancelled() {
+                        return Err(anyhow::anyhow!("Capture cancelled"));
+                    }
+                }
+
+                match camera_guard.last_frame() {
+                    Ok(frame) => {
+                        buffer = Some(frame);
+                        break;
+                    }
+                    Err(e) => {
+                        if attempt == max_retries - 1 {
+                            return Err(anyhow::anyhow!(
+                                "Failed to get frame after {} retries: {}",
+                                max_retries,
+                                e
+                            ));
+                        }
+                        // Brief sleep between retries
+                        std::thread::sleep(std::time::Duration::from_millis(20));
+                    }
+                }
+            }
+            let buffer = buffer.context("No frame available from camera")?;
+
+            let width = buffer.resolution().width();
+            let height = buffer.resolution().height();
+
+            if let Some(token) = cancel_token.as_ref() {
+                if token.is_cancelled() {
+                    return Err(anyhow::anyhow!("Capture cancelled"));
+                }
+            }
+
+            // Warn if frame appears empty (likely permission issue)
+            if width == 0 || height == 0 || buffer.buffer().is_empty() {
+                static WARNED: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
+                if !WARNED.swap(true, std::sync::atomic::Ordering::Relaxed) {
+                    eprintln!("⚠️ Warning: Camera frames appear empty. This may be due to:");
+                    eprintln!("   - macOS camera permission not granted to terminal app");
+                    eprintln!("   - Go to System Preferences > Privacy & Security > Camera");
+                    eprintln!("   - Grant access to Terminal or iTerm2");
+                }
+            }
+
+            // Get raw bytes and convert to grayscale
+            let raw_data = buffer.buffer();
+
+            // Determine pixel format based on buffer format
+            let pixel_format = match buffer.source_frame_format() {
+                nokhwa::utils::FrameFormat::MJPEG => PixelFormat::MJPEG,
+                nokhwa::utils::FrameFormat::YUYV => PixelFormat::YUYV,
+                nokhwa::utils::FrameFormat::NV12 => PixelFormat::NV12,
+                nokhwa::utils::FrameFormat::RAWRGB => PixelFormat::RGB888,
+                _ => PixelFormat::RGB888, // Default to RGB
+            };
+
+            // Try to decode to RGB first, then convert to grayscale
+            let grayscale_data = if let Ok(rgb_buffer) = buffer.decode_image::<RgbFormat>() {
+                // Decoded successfully to RGB
+                pixel_convert::rgb888_to_grayscale(
+                    rgb_buffer.as_raw(),
+                    width as usize,
+                    height as usize,
+                )
+            } else {
+                // Fallback to direct conversion
+                pixel_convert::to_grayscale(
+                    raw_data,
+                    width as usize,
+                    height as usize,
+                    pixel_format,
+                )
+            };
+
+            Ok((grayscale_data, width, height))
+        });
+
+        Self {
+            handle: Some(handle),
+        }
+    }
+}
+
+#[cfg(feature = "nokhwa_camera")]
+impl Drop for BlockingCapture {
+    fn drop(&mut self) {
+        if let Some(handle) = self.handle.take() {
+            handle.abort();
+        }
+    }
+}
+
+#[cfg(feature = "nokhwa_camera")]
+impl std::future::Future for BlockingCapture {
+    type Output = Result<(Vec<u8>, u32, u32)>;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Self::Output> {
+        let handle = self.handle.as_mut().expect("join handle missing");
+        match Pin::new(handle).poll(cx) {
+            Poll::Ready(res) => {
+                self.handle = None;
+                match res {
+                    Ok(inner) => Poll::Ready(inner),
+                    Err(e) => Poll::Ready(Err(anyhow::anyhow!("Capture task failed: {}", e))),
+                }
+            }
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
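BlockingCapture hand-implements Future around a JoinHandle so that its Drop can abort the blocking task if the caller abandons the frame (for example when the controller's select! picks a shutdown arm). If that abort-on-drop behavior were not needed, awaiting the handle directly is equivalent; a sketch of the simpler shape for comparison (not in this commit):

// Sketch: the same capture without the custom Future. spawn_blocking's
// JoinHandle is itself a Future, so a plain async fn suffices when
// abort-on-drop semantics are not required.
async fn capture_once(
    camera: std::sync::Arc<std::sync::Mutex<nokhwa::threaded::CallbackCamera>>,
) -> anyhow::Result<(Vec<u8>, u32, u32)> {
    tokio::task::spawn_blocking(move || {
        let mut guard = camera.lock().map_err(|e| anyhow::anyhow!("lock: {e}"))?;
        let frame = guard.last_frame()?; // most recent frame from the callback thread
        let (w, h) = (frame.resolution().width(), frame.resolution().height());
        Ok((frame.buffer().to_vec(), w, h))
    })
    .await? // JoinError -> anyhow; inner Result is returned as-is
}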
/// Production camera implementation for real hardware
|
||||
/// Uses CallbackCamera for thread-safe frame capture
|
||||
pub struct ProductionCamera {
|
||||
/// Device identifier (e.g., "/dev/video0" or device index)
|
||||
/// Device identifier (e.g., "0" for first camera, or "/dev/video0")
|
||||
device_id: String,
|
||||
/// Backend system (v4l2, directshow, etc.)
|
||||
/// Backend system (nokhwa, v4l2, etc.)
|
||||
#[allow(dead_code)]
|
||||
backend: String,
|
||||
/// Current resolution
|
||||
resolution: (u32, u32),
|
||||
/// Target frame rate
|
||||
target_fps: f64,
|
||||
/// Frame pool for memory management
|
||||
#[allow(dead_code)]
|
||||
frame_pool: Arc<HierarchicalFramePool>,
|
||||
/// Current frame counter
|
||||
frame_counter: u64,
|
||||
/// Whether the camera is currently running
|
||||
is_running: bool,
|
||||
/// Cancellation token for cooperative shutdown
|
||||
cancellation_token: Option<CancellationToken>,
|
||||
/// Camera metadata
|
||||
metadata: CameraMetadata,
|
||||
/// Threaded camera instance (when feature enabled)
|
||||
#[cfg(feature = "nokhwa_camera")]
|
||||
callback_camera: Option<Arc<Mutex<CallbackCamera>>>,
|
||||
}
|
||||
|
||||
// ProductionCamera is Send + Sync because CallbackCamera is wrapped in Arc<Mutex<>>
|
||||
unsafe impl Send for ProductionCamera {}
|
||||
unsafe impl Sync for ProductionCamera {}
|
||||
|
||||
impl ProductionCamera {
|
||||
/// Create a new production camera instance
|
||||
pub fn new(
|
||||
@ -42,7 +212,12 @@ impl ProductionCamera {
|
||||
let metadata = CameraMetadata {
|
||||
camera_id: device_id.clone(),
|
||||
camera_type: format!("Production-{}", backend),
|
||||
supported_formats: vec![FrameFormat::JPEG, FrameFormat::RGB888, FrameFormat::YUV420],
|
||||
supported_formats: vec![
|
||||
FrameFormat::Grayscale,
|
||||
FrameFormat::RGB888,
|
||||
FrameFormat::YUV420,
|
||||
FrameFormat::JPEG,
|
||||
],
|
||||
max_resolution: (1920, 1080),
|
||||
current_resolution: resolution,
|
||||
target_fps,
|
||||
@ -58,45 +233,34 @@ impl ProductionCamera {
|
||||
frame_pool,
|
||||
frame_counter: 0,
|
||||
is_running: false,
|
||||
cancellation_token: None,
|
||||
metadata,
|
||||
#[cfg(feature = "nokhwa_camera")]
|
||||
callback_camera: None,
|
||||
})
|
||||
}
|
||||
|
||||
/// Initialize camera hardware (placeholder for real implementation)
|
||||
async fn initialize_hardware(&mut self) -> Result<()> {
|
||||
println!("🎥 Initializing production camera hardware...");
|
||||
println!(" Device ID: {}", self.device_id);
|
||||
println!(" Backend: {}", self.backend);
|
||||
println!(" Resolution: {}x{}", self.resolution.0, self.resolution.1);
|
||||
println!(" Target FPS: {}", self.target_fps);
|
||||
/// Parse device ID to camera index
|
||||
#[cfg(feature = "nokhwa_camera")]
|
||||
fn parse_camera_index(device_id: &str) -> Result<CameraIndex> {
|
||||
// Support various device ID formats:
|
||||
// - "0", "1", etc. -> index
|
||||
// - "/dev/video0" -> extract index
|
||||
// - "device:0" -> extract index
|
||||
|
||||
// TODO: Replace with actual camera initialization
|
||||
// This is where you would:
|
||||
// 1. Open camera device
|
||||
// 2. Set resolution and format
|
||||
// 3. Configure frame rate
|
||||
// 4. Allocate capture buffers
|
||||
// 5. Validate camera capabilities
|
||||
let index_str = if device_id.starts_with("/dev/video") {
|
||||
device_id.trim_start_matches("/dev/video")
|
||||
} else if device_id.starts_with("device:") {
|
||||
device_id.trim_start_matches("device:")
|
||||
} else {
|
||||
device_id
|
||||
};
|
||||
|
||||
// For now, simulate hardware check
|
||||
sleep(Duration::from_millis(100)).await;
|
||||
let index: u32 = index_str
|
||||
.parse()
|
||||
.context("Failed to parse camera device index")?;
|
||||
|
||||
println!("✅ Production camera hardware initialized");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Capture frame from real hardware (placeholder implementation)
|
||||
async fn capture_hardware_frame(&mut self) -> Result<CapturedFrame> {
|
||||
// TODO: Replace with actual hardware frame capture
|
||||
// This is where you would:
|
||||
// 1. Read frame from camera device
|
||||
// 2. Handle different pixel formats
|
||||
// 3. Apply necessary color space conversions
|
||||
// 4. Handle camera-specific metadata
|
||||
// 5. Implement proper error handling
|
||||
|
||||
// For now, return an error indicating not implemented
|
||||
anyhow::bail!("Production camera capture not yet implemented. This is a placeholder for real hardware integration.")
|
||||
Ok(CameraIndex::Index(index))
|
||||
}
|
||||
}
|
||||
|
||||
@ -105,9 +269,137 @@ impl CameraInterface for ProductionCamera {
|
||||
&mut self,
|
||||
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + '_>> {
|
||||
Box::pin(async move {
|
||||
self.initialize_hardware()
|
||||
#[cfg(feature = "nokhwa_camera")]
|
||||
{
|
||||
println!("🎥 Initializing production camera with nokhwa (CallbackCamera)...");
|
||||
println!(" Device ID: {}", self.device_id);
|
||||
println!(" Resolution: {}x{}", self.resolution.0, self.resolution.1);
|
||||
println!(" Target FPS: {}", self.target_fps);
|
||||
|
||||
let device_id = self.device_id.clone();
|
||||
|
||||
// Initialize camera in blocking task
|
||||
let camera_result = tokio::task::spawn_blocking(move || -> Result<(CallbackCamera, String, String, u32, u32)> {
|
||||
// Initialize nokhwa (required on macOS for permissions)
|
||||
#[cfg(target_os = "macos")]
|
||||
{
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
static INITIALIZED: AtomicBool = AtomicBool::new(false);
|
||||
|
||||
if !INITIALIZED.swap(true, Ordering::SeqCst) {
|
||||
println!(" Initializing nokhwa for macOS...");
|
||||
let permission_result = Arc::new(std::sync::Mutex::new(None::<bool>));
|
||||
let permission_clone = permission_result.clone();
|
||||
|
||||
nokhwa::nokhwa_initialize(move |granted| {
|
||||
*permission_clone.lock().unwrap() = Some(granted);
|
||||
if granted {
|
||||
println!(" ✅ Camera permission granted");
|
||||
} else {
|
||||
println!(" ⚠️ Camera permission denied - check System Preferences > Privacy > Camera");
|
||||
}
|
||||
});
|
||||
|
||||
// Give macOS time to process permissions (up to 2 seconds)
|
||||
for _ in 0..20 {
|
||||
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||
if permission_result.lock().unwrap().is_some() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let camera_index = Self::parse_camera_index(&device_id)
|
||||
.context("Failed to parse camera index")?;
|
||||
|
||||
// Use AbsoluteHighestFrameRate for best compatibility
|
||||
let requested = RequestedFormat::new::<RgbFormat>(
|
||||
RequestedFormatType::AbsoluteHighestFrameRate
|
||||
);
|
||||
|
||||
println!(" Creating CallbackCamera...");
|
||||
|
||||
// Create callback camera with retry logic (max 3 attempts)
|
||||
let mut last_error = None;
|
||||
let mut callback_camera_opt = None;
|
||||
|
||||
for attempt in 1..=3 {
|
||||
match CallbackCamera::new(
|
||||
camera_index.clone(),
|
||||
requested.clone(),
|
||||
|_buffer| {
|
||||
// Empty callback - we use poll_frame() instead
|
||||
}
|
||||
) {
|
||||
Ok(cam) => {
|
||||
if attempt > 1 {
|
||||
println!(" ✅ Camera initialized on attempt {}", attempt);
|
||||
}
|
||||
callback_camera_opt = Some(cam);
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
println!(" ⚠️ Camera init attempt {}/3 failed: {}", attempt, e);
|
||||
last_error = Some(e);
|
||||
if attempt < 3 {
|
||||
println!(" Retrying in 500ms...");
|
||||
std::thread::sleep(std::time::Duration::from_millis(500));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let callback_camera = callback_camera_opt
|
||||
.ok_or_else(|| anyhow::anyhow!("Camera initialization failed: {:?}", last_error))
|
||||
.context("Failed to create callback camera after 3 attempts")?;
|
||||
|
||||
let info = callback_camera.info().clone();
|
||||
let res = callback_camera.resolution()
|
||||
.context("Failed to get camera resolution")?;
|
||||
|
||||
Ok((
|
||||
callback_camera,
|
||||
info.human_name().to_string(),
|
||||
info.description().to_string(),
|
||||
res.width(),
|
||||
res.height(),
|
||||
))
|
||||
})
|
||||
.await
|
||||
.context("Failed to initialize production camera hardware")?;
|
||||
.context("Camera initialization task panicked")?
|
||||
.context("Failed to initialize production camera")?;
|
||||
|
||||
let (mut callback_camera, camera_name, camera_desc, width, height) = camera_result;
|
||||
|
||||
println!(" Camera name: {}", camera_name);
|
||||
println!(" Camera description: {}", camera_desc);
|
||||
println!(" Actual resolution: {}x{}", width, height);
|
||||
|
||||
// Open the camera stream
|
||||
println!(" Opening camera stream...");
|
||||
callback_camera.open_stream().context("Failed to open camera stream")?;
|
||||
|
||||
// Give the camera time to start streaming
|
||||
std::thread::sleep(std::time::Duration::from_millis(500));
|
||||
|
||||
println!(" ✅ Camera stream opened");
|
||||
|
||||
self.resolution = (width, height);
|
||||
self.metadata.current_resolution = self.resolution;
|
||||
self.callback_camera = Some(Arc::new(Mutex::new(callback_camera)));
|
||||
|
||||
println!("✅ Production camera initialized successfully");
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "nokhwa_camera"))]
|
||||
{
|
||||
println!("🎥 Production camera hardware support not compiled.");
|
||||
println!(" Compile with --features nokhwa_camera to enable.");
|
||||
println!(" Using placeholder mode for testing.");
|
||||
}
|
||||
|
||||
self.is_running = true;
|
||||
Ok(())
|
||||
})
|
||||
@ -122,13 +414,45 @@ impl CameraInterface for ProductionCamera {
|
||||
anyhow::bail!("Camera not initialized or not running");
|
||||
}
|
||||
|
||||
let frame = self
|
||||
.capture_hardware_frame()
|
||||
.await
|
||||
.context("Failed to capture frame from production camera")?;
|
||||
#[cfg(feature = "nokhwa_camera")]
|
||||
{
|
||||
let cancel_token = self.cancellation_token.clone();
|
||||
let camera = self.callback_camera.as_ref()
|
||||
.context("Camera not initialized")?
|
||||
.clone();
|
||||
|
||||
self.frame_counter += 1;
|
||||
Ok(frame)
|
||||
let frame_number = self.frame_counter + 1;
|
||||
|
||||
let mut frame_data = BlockingCapture::new(camera, cancel_token.clone()).await?;
|
||||
|
||||
self.frame_counter = frame_number;
|
||||
|
||||
// Log progress periodically
|
||||
if frame_number <= 3 || frame_number % 100 == 0 {
|
||||
println!("📸 Frame {} captured ({}x{})", frame_number, frame_data.1, frame_data.2);
|
||||
}
|
||||
|
||||
Ok(CapturedFrame {
|
||||
data: Arc::new(FrameData::new(
|
||||
frame_data.0,
|
||||
frame_data.1,
|
||||
frame_data.2,
|
||||
FrameFormat::Grayscale,
|
||||
)),
|
||||
frame_number,
|
||||
capture_timestamp: Utc::now(),
|
||||
metadata: FrameMetadata::default(),
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "nokhwa_camera"))]
|
||||
{
|
||||
anyhow::bail!(
|
||||
"Production camera capture requires --features nokhwa_camera.\n\
|
||||
Current build does not include hardware camera support.\n\
|
||||
Use 'cargo build --features nokhwa_camera' to enable."
|
||||
)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@ -151,12 +475,15 @@ impl CameraInterface for ProductionCamera {
|
||||
if self.is_running {
|
||||
println!("🛑 Shutting down production camera...");
|
||||
|
||||
// TODO: Replace with actual camera shutdown
|
||||
// This is where you would:
|
||||
// 1. Stop capture
|
||||
// 2. Release camera device
|
||||
// 3. Clean up resources
|
||||
// 4. Reset camera state
|
||||
#[cfg(feature = "nokhwa_camera")]
|
||||
{
|
||||
if let Some(camera) = self.callback_camera.take() {
|
||||
// Drop the camera handle to let the OS reclaim the device.
|
||||
// Calling stop_stream can hang on some macOS setups, so we prefer
|
||||
// a quick drop here to avoid blocking shutdown.
|
||||
drop(camera);
|
||||
}
|
||||
}
|
||||
|
||||
self.is_running = false;
|
||||
println!("✅ Production camera shut down successfully");
|
||||
@ -164,6 +491,10 @@ impl CameraInterface for ProductionCamera {
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn set_cancellation_token(&mut self, token: CancellationToken) {
|
||||
self.cancellation_token = Some(token);
|
||||
}
|
||||
}
|
||||
|
||||
/// Camera capabilities detection for production hardware
|
||||
@ -171,30 +502,118 @@ pub struct ProductionCameraCapabilities;
|
||||
|
||||
impl ProductionCameraCapabilities {
|
||||
/// Detect available cameras on the system
|
||||
#[cfg(feature = "nokhwa_camera")]
|
||||
pub fn detect_cameras() -> Result<Vec<CameraDeviceInfo>> {
|
||||
// TODO: Implement actual camera detection
|
||||
// This would scan for available camera devices using:
|
||||
// - V4L2 on Linux
|
||||
// - DirectShow on Windows
|
||||
// - AVFoundation on macOS
|
||||
use nokhwa::query;
|
||||
|
||||
println!("🔍 Detecting production cameras...");
|
||||
|
||||
// Placeholder: return empty list for now
|
||||
// Initialize nokhwa on macOS
|
||||
#[cfg(target_os = "macos")]
|
||||
{
|
||||
nokhwa::nokhwa_initialize(|_| {});
|
||||
}
|
||||
|
||||
let cameras = query(nokhwa::utils::ApiBackend::Auto)
|
||||
.context("Failed to query cameras")?;
|
||||
|
||||
let mut devices = Vec::new();
|
||||
|
||||
for camera_info in cameras {
|
||||
let index = match camera_info.index() {
|
||||
CameraIndex::Index(i) => i.to_string(),
|
||||
CameraIndex::String(s) => s.clone(),
|
||||
};
|
||||
|
||||
devices.push(CameraDeviceInfo {
|
||||
device_id: index,
|
||||
device_name: camera_info.human_name().to_string(),
|
||||
vendor: "Unknown".to_string(),
|
||||
model: camera_info.description().to_string(),
|
||||
bus_info: camera_info.misc().to_string(),
|
||||
});
|
||||
|
||||
println!(
|
||||
" Found: {} - {}",
|
||||
camera_info.human_name(),
|
||||
camera_info.description()
|
||||
);
|
||||
}
|
||||
|
||||
println!(" Total cameras found: {}", devices.len());
|
||||
Ok(devices)
|
||||
}
|
||||
|
||||
/// Detect cameras placeholder when feature disabled
|
||||
#[cfg(not(feature = "nokhwa_camera"))]
|
||||
pub fn detect_cameras() -> Result<Vec<CameraDeviceInfo>> {
|
||||
println!("🔍 Camera detection requires --features nokhwa_camera");
|
||||
Ok(vec![])
|
||||
}
    /// Get detailed capabilities for a specific camera device
    #[cfg(feature = "nokhwa_camera")]
    pub fn get_device_capabilities(device_id: &str) -> Result<DeviceCapabilities> {
        // TODO: Query actual device capabilities
        use nokhwa::Camera;

        println!("📋 Querying capabilities for device: {}", device_id);

        // Placeholder implementation
        let camera_index = ProductionCamera::parse_camera_index(device_id)?;
        let requested = RequestedFormat::new::<RgbFormat>(RequestedFormatType::None);

        let mut camera = Camera::new(camera_index, requested)
            .context("Failed to open camera for capability query")?;

        let compatible_formats = camera.compatible_list_by_resolution(
            nokhwa::utils::FrameFormat::MJPEG,
        );

        let mut resolutions: Vec<(u32, u32)> = Vec::new();
        if let Ok(formats) = compatible_formats {
            for (resolution, _) in formats {
                let res = (resolution.width(), resolution.height());
                if !resolutions.contains(&res) {
                    resolutions.push(res);
                }
            }
        }

        // Default common resolutions if query fails
        if resolutions.is_empty() {
            resolutions = vec![(640, 480), (1280, 720), (1920, 1080)];
        }

        Ok(DeviceCapabilities {
            device_id: device_id.to_string(),
            supported_resolutions: resolutions,
            supported_formats: vec![
                FrameFormat::Grayscale,
                FrameFormat::RGB888,
                FrameFormat::YUV420,
                FrameFormat::JPEG,
            ],
            min_fps: 1.0,
            max_fps: 60.0,
            has_hardware_encoding: false,
        })
    }

    /// Get device capabilities placeholder when feature disabled
    #[cfg(not(feature = "nokhwa_camera"))]
    pub fn get_device_capabilities(device_id: &str) -> Result<DeviceCapabilities> {
        println!(
            "📋 Device capability query requires --features nokhwa_camera"
        );

        Ok(DeviceCapabilities {
            device_id: device_id.to_string(),
            supported_resolutions: vec![(640, 480), (1280, 720), (1920, 1080)],
            supported_formats: vec![FrameFormat::JPEG, FrameFormat::RGB888, FrameFormat::YUV420],
            supported_formats: vec![
                FrameFormat::Grayscale,
                FrameFormat::RGB888,
                FrameFormat::YUV420,
                FrameFormat::JPEG,
            ],
            min_fps: 1.0,
            max_fps: 60.0,
            has_hardware_encoding: false,
@@ -231,8 +650,8 @@ mod tests {
    async fn test_production_camera_creation() {
        let frame_pool = Arc::new(HierarchicalFramePool::new(10));
        let camera = ProductionCamera::new(
            "/dev/video0".to_string(),
            "v4l2".to_string(),
            "0".to_string(),
            "nokhwa".to_string(),
            (640, 480),
            30.0,
            frame_pool,
@@ -240,8 +659,7 @@ mod tests {

        assert!(camera.is_ok());
        let camera = camera.unwrap();
        assert_eq!(camera.device_id, "/dev/video0");
        assert_eq!(camera.backend, "v4l2");
        assert_eq!(camera.device_id, "0");
        assert!(!camera.is_running());
    }

@@ -254,12 +672,26 @@ mod tests {

    #[test]
    fn test_device_capabilities_query() {
        let caps = ProductionCameraCapabilities::get_device_capabilities("/dev/video0");
        let caps = ProductionCameraCapabilities::get_device_capabilities("0");
        assert!(caps.is_ok());

        let caps = caps.unwrap();
        assert_eq!(caps.device_id, "/dev/video0");
        assert_eq!(caps.device_id, "0");
        assert!(!caps.supported_resolutions.is_empty());
        assert!(!caps.supported_formats.is_empty());
    }

    #[cfg(feature = "nokhwa_camera")]
    #[test]
    fn test_parse_camera_index() {
        // Test various device ID formats
        let index = ProductionCamera::parse_camera_index("0").unwrap();
        assert!(matches!(index, CameraIndex::Index(0)));

        let index = ProductionCamera::parse_camera_index("/dev/video0").unwrap();
        assert!(matches!(index, CameraIndex::Index(0)));

        let index = ProductionCamera::parse_camera_index("device:1").unwrap();
        assert!(matches!(index, CameraIndex::Index(1)));
    }
}
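The tests above pin down the device ID formats that `parse_camera_index` accepts: bare indices, `/dev/videoN` paths, and `device:N` specs. Its implementation is outside this diff; a minimal sketch consistent with those tests might look like this (the digit-extraction strategy is an assumption):

    fn parse_camera_index(device_id: &str) -> Result<CameraIndex> {
        // Accept "device:N" and "hw:N" prefixes as well as bare indices.
        let trimmed = device_id
            .strip_prefix("device:")
            .or_else(|| device_id.strip_prefix("hw:"))
            .unwrap_or(device_id);

        // "/dev/video0"-style paths: keep only the trailing digits.
        let digits: String = trimmed.chars().filter(|c| c.is_ascii_digit()).collect();
        digits
            .parse::<u32>()
            .map(CameraIndex::Index)
            .map_err(|_| anyhow::anyhow!("Unrecognized camera device ID: {}", device_id))
    }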

@@ -1,7 +1,9 @@
use anyhow::Result;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;

use crate::network::api::ApiClient;
use crate::camera::{CameraConfig, CameraController};
@@ -17,19 +19,24 @@ use crate::storage::storage::StorageController;
/// Core application coordinator that manages the event bus and background tasks
pub struct Application {
    event_bus: EventBus,
    background_tasks: Vec<JoinHandle<()>>,
    background_tasks: Vec<(String, JoinHandle<()>)>,
    memory_monitor: MemoryMonitor,
    camera_override: Option<CameraConfig>,
    shutdown_tx: broadcast::Sender<()>,
    cancellation_token: CancellationToken,
}

impl Application {
    /// Create a new Application instance with an event bus
    pub fn new(event_bus_capacity: usize) -> Self {
        let (shutdown_tx, _) = broadcast::channel(16);
        Self {
            event_bus: EventBus::new(event_bus_capacity),
            background_tasks: Vec::new(),
            memory_monitor: MemoryMonitor::new(),
            camera_override: None,
            shutdown_tx,
            cancellation_token: CancellationToken::new(),
        }
    }

@@ -44,21 +51,30 @@ impl Application {

        // Create a test subscriber to verify event flow
        let mut test_subscriber = self.event_bus.subscribe();
        let mut test_shutdown_rx = self.shutdown_tx.subscribe();

        // Spawn a background task to handle test events
        let test_handle = tokio::spawn(async move {
            println!("📡 Test subscriber started, waiting for events...");
            let mut system_started = false;
            let mut frame_count = 0;
            let mut frame_count = 0u64;

            while let Ok(event) = test_subscriber.recv().await {
                match event.as_ref() {
            loop {
                tokio::select! {
                    _ = test_shutdown_rx.recv() => {
                        println!("📡 Test subscriber received shutdown signal");
                        break;
                    }
                    event_result = test_subscriber.recv() => {
                        let event = match event_result {
                            Ok(e) => e,
                            Err(_) => break,
                        };
                        match event.as_ref() {
                            SystemEvent::SystemStarted(system_event) => {
                                println!("✅ Received SystemStartedEvent!");
                                println!(" Timestamp: {}", system_event.timestamp);
                                println!(" Version: {}", system_event.version);
                                println!(" Event verification successful! 🎉");
                                system_started = true;
                            }
                            SystemEvent::FrameCaptured(frame_event) => {
                                frame_count += 1;
@@ -66,7 +82,8 @@ impl Application {
                                // Record memory optimization metrics
                                record_frame_processed(frame_event.data_size(), 3); // Assume 3 subscribers

                                if frame_count <= 5 || frame_count % 30 == 0 {
                                // Log first 5 frames and then every 100th frame
                                if frame_count <= 5 || frame_count % 100 == 0 {
                                    println!("📸 Received FrameCapturedEvent #{}", frame_event.frame_id);
                                    println!(" Timestamp: {}", frame_event.timestamp);
                                    let (width, height) = frame_event.dimensions();
@@ -77,15 +94,6 @@ impl Application {
                                    );
                                    println!(" Format: {:?}", frame_event.frame_data.format);
                                }

                // Exit after receiving some frames for demo
                if frame_count >= 10 {
                    println!(
                        "🎬 Received {} frames, test subscriber stopping...",
                        frame_count
                    );
                    break;
                }
                            }
                            SystemEvent::MeteorDetected(meteor_event) => {
                                println!(
@@ -125,13 +133,16 @@ impl Application {
                                    event.centroid_count
                                );
                            }
                        }
                    }
                }
            }

            println!("🔚 Test subscriber finished");
        });

        self.background_tasks.push(test_handle);
        self.background_tasks
            .push(("test-subscriber".to_string(), test_handle));

        // Give the subscriber a moment to be ready
        sleep(Duration::from_millis(10)).await;
@@ -154,38 +165,48 @@ impl Application {
            None => load_camera_config()?,
        };
        let mut camera_controller = CameraController::new(camera_config, self.event_bus.clone())?;
        let camera_shutdown_rx = self.shutdown_tx.subscribe();

        // Spawn camera controller in background task
        let camera_cancel = self.cancellation_token.clone();
        let camera_handle = tokio::spawn(async move {
            if let Err(e) = camera_controller.run().await {
            if let Err(e) = camera_controller
                .run(camera_shutdown_rx, camera_cancel)
                .await
            {
                eprintln!("❌ Camera controller error: {}", e);
            }
        });

        self.background_tasks.push(camera_handle);
        self.background_tasks
            .push(("camera".to_string(), camera_handle));

        // Start memory monitoring reporting
        println!("📊 Starting memory optimization monitoring...");
        let memory_shutdown_rx = self.shutdown_tx.subscribe();
        let memory_handle = tokio::spawn(async move {
            use crate::memory::GLOBAL_MEMORY_MONITOR;
            GLOBAL_MEMORY_MONITOR.start_reporting(30).await; // Report every 30 seconds
            GLOBAL_MEMORY_MONITOR.start_reporting_with_shutdown(30, memory_shutdown_rx).await;
        });
        self.background_tasks.push(memory_handle);
        self.background_tasks
            .push(("memory-monitor".to_string(), memory_handle));

        // Initialize and start detection controller
        println!("🔍 Initializing detection controller...");
        let detection_config = DetectionConfig::default();
        let mut detection_controller =
            DetectionController::new(detection_config, self.event_bus.clone());
        let detection_shutdown_rx = self.shutdown_tx.subscribe();

        // Spawn detection controller in background task
        let detection_handle = tokio::spawn(async move {
            if let Err(e) = detection_controller.run().await {
            if let Err(e) = detection_controller.run(detection_shutdown_rx).await {
                eprintln!("❌ Detection controller error: {}", e);
            }
        });

        self.background_tasks.push(detection_handle);
        self.background_tasks
            .push(("detection".to_string(), detection_handle));

        // Initialize and start storage controller
        println!("💾 Initializing storage controller...");
@@ -198,15 +219,17 @@ impl Application {
                return Err(e);
            }
        };
        let storage_shutdown_rx = self.shutdown_tx.subscribe();

        // Spawn storage controller in background task
        let storage_handle = tokio::spawn(async move {
            if let Err(e) = storage_controller.run().await {
            if let Err(e) = storage_controller.run(storage_shutdown_rx).await {
                eprintln!("❌ Storage controller error: {}", e);
            }
        });

        self.background_tasks.push(storage_handle);
        self.background_tasks
            .push(("storage".to_string(), storage_handle));

        // Initialize and start communication controller
        println!("📡 Initializing communication controller...");
@@ -221,25 +244,29 @@ impl Application {
                return Err(e);
            }
        };
        let communication_shutdown_rx = self.shutdown_tx.subscribe();

        // Spawn communication controller in background task
        let communication_handle = tokio::spawn(async move {
            if let Err(e) = communication_controller.run().await {
            if let Err(e) = communication_controller.run(communication_shutdown_rx).await {
                eprintln!("❌ Communication controller error: {}", e);
            }
        });

        self.background_tasks.push(communication_handle);
        self.background_tasks
            .push(("communication".to_string(), communication_handle));

        // Initialize and start heartbeat task
        println!("💓 Initializing heartbeat task...");
        let heartbeat_shutdown_rx = self.shutdown_tx.subscribe();
        let heartbeat_handle = tokio::spawn(async move {
            if let Err(e) = Self::run_heartbeat_task(heartbeat_config).await {
            if let Err(e) = Self::run_heartbeat_task(heartbeat_config, heartbeat_shutdown_rx).await {
                eprintln!("❌ Heartbeat task error: {}", e);
            }
        });

        self.background_tasks.push(heartbeat_handle);
        self.background_tasks
            .push(("heartbeat".to_string(), heartbeat_handle));

        // Run the main application loop
        println!("🔄 Starting main application loop...");
@@ -248,23 +275,27 @@ impl Application {
        Ok(())
    }

    /// Main application loop - this will eventually coordinate all modules
    /// Main application loop - waits for shutdown signal
    async fn main_loop(&mut self) -> Result<()> {
        println!("⏳ Main loop running... (will exit after 10 seconds for demo)");
        println!("🔄 Main loop running... (Press Ctrl+C to stop)");

        // For now, just wait a bit to allow the camera to capture frames and test subscriber to process events
        sleep(Duration::from_secs(10)).await;
        // Wait for shutdown signal (Ctrl+C)
        tokio::signal::ctrl_c().await?;

        println!("🛑 Stopping application...");
        println!("\n🛑 Received shutdown signal, stopping application...");

        // Wait for all background tasks to complete
        for task in self.background_tasks.drain(..) {
            if let Err(e) = task.await {
                eprintln!("❌ Background task error: {}", e);
            }
        }
        // Send shutdown signal to all listeners
        let _ = self.shutdown_tx.send(());
        self.cancellation_token.cancel();

        // Give tasks time to respond to shutdown signal
        println!(" Waiting for tasks to shutdown gracefully...");
        self.wait_for_tasks(Duration::from_secs(3)).await;

        println!("✅ Application stopped successfully");

        // Let process exit naturally - this allows Drop implementations to run
        // and properly release camera resources
        Ok(())
    }

@@ -278,8 +309,41 @@ impl Application {
        self.event_bus.subscriber_count()
    }

    async fn wait_for_tasks(&mut self, timeout: Duration) {
        for (name, mut handle) in self.background_tasks.drain(..) {
            let timeout_sleep = sleep(timeout);
            tokio::pin!(timeout_sleep);

            let result = tokio::select! {
                res = &mut handle => Some(res),
                _ = &mut timeout_sleep => None,
            };

            match result {
                Some(res) => {
                    if let Err(e) = res {
                        eprintln!("⚠️ Task '{}' exited with error: {}", name, e);
                    } else {
                        println!(" Task '{}' stopped cleanly", name);
                    }
                }
                None => {
                    println!(
                        "⚠️ Task '{}' did not stop within {:?}, aborting...",
                        name, timeout
                    );
                    handle.abort();
                    let _ = handle.await;
                }
            }
        }
    }

    /// Background task for sending heartbeat signals to the backend
    async fn run_heartbeat_task(config: CommunicationConfig) -> Result<()> {
    async fn run_heartbeat_task(
        config: CommunicationConfig,
        mut shutdown_rx: broadcast::Receiver<()>,
    ) -> Result<()> {
        println!("💓 Starting heartbeat task...");
        println!(
            " Heartbeat interval: {}s",
@@ -290,8 +354,16 @@ impl Application {
        let config_manager = ConfigManager::new();

        loop {
            // Wait for the configured interval
            sleep(Duration::from_secs(config.heartbeat_interval_seconds)).await;
            // Wait for heartbeat interval or shutdown signal
            tokio::select! {
                _ = shutdown_rx.recv() => {
                    println!("💓 Heartbeat task received shutdown signal");
                    break;
                }
                _ = sleep(Duration::from_secs(config.heartbeat_interval_seconds)) => {
                    // Continue to send heartbeat
                }
            }

            // Check if device is registered and has configuration
            if !config_manager.config_exists() {
@@ -336,6 +408,9 @@ impl Application {
            }
        }

        println!("💓 Heartbeat task stopped");
        Ok(())
    }
}
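One design note on the heartbeat loop above: it rebuilds a `sleep` future on every pass, so the cadence drifts by however long each heartbeat takes. If a fixed cadence matters, `tokio::time::interval` is a drop-in alternative under the same shutdown wiring (a sketch reusing the `config` and `shutdown_rx` bindings from `run_heartbeat_task`, not what this diff does):

    use tokio::time::{interval, MissedTickBehavior};

    let mut tick = interval(Duration::from_secs(config.heartbeat_interval_seconds));
    tick.set_missed_tick_behavior(MissedTickBehavior::Skip); // don't burst after a stall
    loop {
        tokio::select! {
            _ = shutdown_rx.recv() => break,
            _ = tick.tick() => {
                // send one heartbeat here
            }
        }
    }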

@@ -1,5 +1,6 @@
use anyhow::{Context, Result};
use std::collections::VecDeque;
use tokio::sync::broadcast;
use tokio::time::{sleep, Duration};

use crate::core::events::{EventBus, FrameCapturedEvent, MeteorDetectedEvent, SystemEvent};
@@ -64,7 +65,7 @@ impl DetectionController {
    }

    /// Start the detection loop
    pub async fn run(&mut self) -> Result<()> {
    pub async fn run(&mut self, mut shutdown_rx: broadcast::Receiver<()>) -> Result<()> {
        println!("🔍 Starting meteor detection controller...");
        println!(" Buffer size: {} frames", self.config.buffer_capacity);
        println!(" Algorithm: {}", self.config.algorithm_name);
@@ -81,6 +82,12 @@ impl DetectionController {

        loop {
            tokio::select! {
                // Handle shutdown signal
                _ = shutdown_rx.recv() => {
                    println!("🔍 Detection controller received shutdown signal");
                    break;
                }

                // Handle incoming events
                event_result = event_receiver.recv() => {
                    match event_result {
@@ -104,6 +111,9 @@ impl DetectionController {
                }
            }
        }

        println!("🔍 Detection controller stopped");
        Ok(())
    }

    /// Handle incoming events from the event bus

@@ -74,6 +74,8 @@ enum Commands {
    },
    /// Test hardware fingerprinting
    TestFingerprint,
    /// List available cameras on this system
    ListCameras,
    /// Show device status and configuration
    Status,
    /// Check backend connectivity
@@ -149,6 +151,9 @@ async fn main() -> Result<()> {
            std::process::exit(1);
            }
        }
        Commands::ListCameras => {
            list_cameras();
        }
        Commands::Status => {
            show_status().await?;
        }
@@ -164,6 +169,29 @@ async fn main() -> Result<()> {
            debug,
            offline,
        } => {
            // Pre-open hardware camera on main thread (required for macOS authorization)
            // This must happen BEFORE any .await calls
            #[cfg(feature = "opencv_camera")]
            {
                if camera.starts_with("device:") || camera.starts_with("hw:") {
                    let device_str = camera
                        .strip_prefix("device:")
                        .or_else(|| camera.strip_prefix("hw:"))
                        .unwrap_or("0");
                    let device_id: i32 = device_str.parse().unwrap_or(0);

                    match camera::opencv_camera::OpenCVCamera::open_capture_on_main_thread(device_id) {
                        Ok(capture) => {
                            camera::opencv_camera::set_pre_opened_capture(capture);
                        }
                        Err(e) => {
                            eprintln!("❌ Failed to open camera on main thread: {}", e);
                            std::process::exit(1);
                        }
                    }
                }
            }

            if let Err(e) =
                run_application_with_camera(camera.clone(), config.clone(), *debug, *offline).await
            {
@@ -503,6 +531,79 @@ async fn test_hardware_fingerprint() -> Result<()> {
    Ok(())
}
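The capture pre-opened above has to survive the gap between synchronous startup and the async camera task that eventually uses it. Per the commit description it is parked in a static Mutex; a minimal sketch of what `opencv_camera.rs` presumably contains (`set_pre_opened_capture` appears in this diff, the `take_` counterpart is an assumption):

    use std::sync::Mutex;
    use opencv::videoio::VideoCapture;

    // Handle opened on the main thread, parked until async code claims it.
    static PRE_OPENED_CAPTURE: Mutex<Option<VideoCapture>> = Mutex::new(None);

    pub fn set_pre_opened_capture(capture: VideoCapture) {
        *PRE_OPENED_CAPTURE.lock().unwrap() = Some(capture);
    }

    // Hypothetical counterpart called once by the camera backend at startup.
    pub fn take_pre_opened_capture() -> Option<VideoCapture> {
        PRE_OPENED_CAPTURE.lock().unwrap().take()
    }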

/// List available cameras on this system
fn list_cameras() {
    println!("📷 Camera Information");
    println!("");

    // OpenCV backend
    #[cfg(feature = "opencv_camera")]
    {
        println!("Backend: OpenCV");
        println!("");
        println!("OpenCV uses index-based camera access.");
        println!("Try device:0 for the default camera.");
        println!("");
        println!("Usage: meteor-edge-client run --camera device:<index>");
        println!("Example: meteor-edge-client run --camera device:0");
    }

    // nokhwa backend
    #[cfg(feature = "nokhwa_camera")]
    {
        use camera::production::ProductionCameraCapabilities;

        println!("Backend: nokhwa");
        println!("");

        match ProductionCameraCapabilities::detect_cameras() {
            Ok(cameras) => {
                if cameras.is_empty() {
                    println!("⚠️ No cameras detected.");
                    println!("");
                    println!("Possible reasons:");
                    println!(" - No camera connected");
                    println!(" - Camera permissions not granted (macOS: System Preferences > Privacy > Camera)");
                    println!(" - Camera in use by another application");
                } else {
                    println!("Found {} camera(s):", cameras.len());
                    println!("");
                    for (i, cam) in cameras.iter().enumerate() {
                        println!(" [{}] {}", i, cam.device_name);
                        println!(" Device ID: {}", cam.device_id);
                        println!(" Model: {}", cam.model);
                        if !cam.bus_info.is_empty() {
                            println!(" Bus: {}", cam.bus_info);
                        }
                        println!("");
                    }
                    println!("Usage: meteor-edge-client run --camera device:<index>");
                    println!("Example: meteor-edge-client run --camera device:0");
                }
            }
            Err(e) => {
                eprintln!("❌ Failed to detect cameras: {}", e);
                eprintln!("");
                eprintln!("On macOS, you may need to grant camera permissions.");
                eprintln!("Go to: System Preferences > Privacy & Security > Camera");
            }
        }
    }

    // No backend compiled
    #[cfg(not(any(feature = "opencv_camera", feature = "nokhwa_camera")))]
    {
        println!("⚠️ No camera backend compiled.");
        println!("");
        println!("To enable camera support, rebuild with:");
        println!(" cargo build --features opencv_camera # OpenCV backend (recommended)");
        println!(" cargo build --features nokhwa_camera # nokhwa backend");
        println!("");
        println!("Alternatively, use video files:");
        println!(" meteor-edge-client run --camera file:video.mp4");
    }
}
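The OpenCV branch of `list_cameras` above can only suggest indices because OpenCV exposes no device enumeration API. If actual detection were wanted, probing indices is the usual workaround (a sketch; `probe_opencv_indices` is not part of this change, and on macOS each probe may itself trigger the camera permission prompt):

    #[cfg(feature = "opencv_camera")]
    fn probe_opencv_indices(max_index: i32) -> Vec<i32> {
        use opencv::prelude::*;
        use opencv::videoio::{VideoCapture, CAP_ANY};

        // Keep the indices whose capture actually opens.
        (0..max_index)
            .filter(|&i| {
                VideoCapture::new(i, CAP_ANY)
                    .ok()
                    .map_or(false, |cap| cap.is_opened().unwrap_or(false))
            })
            .collect()
    }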

/// Run the application with specified camera
async fn run_application_with_camera(
    camera_spec_input: String,

@@ -30,6 +30,8 @@ pub enum FrameFormat {
    JPEG,
    /// H.264 encoded frame
    H264Frame,
    /// 8-bit grayscale (single channel)
    Grayscale,
}

impl FrameData {
@@ -84,6 +86,7 @@ impl FrameData {
            FrameFormat::YUV420 => (width * height * 3 / 2) as usize,
            FrameFormat::JPEG => (width * height) as usize, // Estimate for JPEG
            FrameFormat::H264Frame => (width * height / 2) as usize, // Estimate for H.264
            FrameFormat::Grayscale => (width * height) as usize,
        }
    }
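For scale, the per-format estimates above work out as follows for a 1920x1080 frame (a worked example, not part of the diff):

    let (width, height) = (1920u32, 1080u32);
    assert_eq!((width * height * 3 / 2) as usize, 3_110_400); // YUV420: 12 bits/pixel
    assert_eq!((width * height) as usize, 2_073_600);         // Grayscale (and the JPEG estimate)
    assert_eq!((width * height / 2) as usize, 1_036_800);     // H.264 estimate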

@@ -72,6 +72,30 @@ impl MemoryMonitor {
        }
    }

    /// Start background reporting loop with shutdown support
    pub async fn start_reporting_with_shutdown(
        &self,
        interval_seconds: u64,
        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
    ) {
        let mut reporting_interval = interval(Duration::from_secs(interval_seconds));

        loop {
            tokio::select! {
                _ = shutdown_rx.recv() => {
                    println!("📊 Memory monitor received shutdown signal");
                    break;
                }
                _ = reporting_interval.tick() => {
                    let stats = self.stats();
                    Self::log_stats(&stats).await;
                }
            }
        }

        println!("📊 Memory monitor stopped");
    }

    async fn log_stats(stats: &MemoryStats) {
        if stats.frames_processed > 0 {
            println!("📊 Memory Optimization Stats:");

@@ -4,6 +4,7 @@ use std::fs;
use std::path::{Path, PathBuf};
use std::process::Command;
use tokio::fs as async_fs;
use tokio::sync::broadcast;
use tokio::time::{sleep, Duration};

use crate::network::api::ApiClient;
@@ -55,7 +56,7 @@ impl CommunicationController {
    }

    /// Main run loop for the communication controller
    pub async fn run(&mut self) -> Result<()> {
    pub async fn run(&mut self, mut shutdown_rx: broadcast::Receiver<()>) -> Result<()> {
        println!("📡 Starting communication controller...");
        println!(" API Base URL: {}", self.config.api_base_url);
        println!(" Retry attempts: {}", self.config.retry_attempts);
@@ -67,26 +68,40 @@ impl CommunicationController {
        let mut event_receiver = self.event_bus.subscribe();

        loop {
            match event_receiver.recv().await {
                Ok(event) => {
                    if let SystemEvent::EventPackageArchived(archive_event) = event.as_ref() {
                        println!(
                            "📦 Received EventPackageArchivedEvent: {}",
                            archive_event.event_id
                        );
            tokio::select! {
                // Handle shutdown signal
                _ = shutdown_rx.recv() => {
                    println!("📡 Communication controller received shutdown signal");
                    break;
                }

                        if let Err(e) = self.process_archived_event(archive_event.clone()).await {
                            eprintln!("❌ Failed to process archived event: {}", e);
                // Handle incoming events
                event_result = event_receiver.recv() => {
                    match event_result {
                        Ok(event) => {
                            if let SystemEvent::EventPackageArchived(archive_event) = event.as_ref() {
                                println!(
                                    "📦 Received EventPackageArchivedEvent: {}",
                                    archive_event.event_id
                                );

                                if let Err(e) = self.process_archived_event(archive_event.clone()).await {
                                    eprintln!("❌ Failed to process archived event: {}", e);
                                }
                            }
                        }
                        Err(e) => {
                            eprintln!("❌ Error receiving event: {}", e);
                            // Sleep briefly before continuing to avoid busy loop
                            sleep(Duration::from_millis(100)).await;
                        }
                    }
                }
            }
                Err(e) => {
                    eprintln!("❌ Error receiving event: {}", e);
                    // Sleep briefly before continuing to avoid busy loop
                    sleep(Duration::from_millis(100)).await;
                }
            }
        }

        println!("📡 Communication controller stopped");
        Ok(())
    }

    /// Process an EventPackageArchivedEvent by packaging and uploading

@@ -4,6 +4,7 @@ use std::collections::VecDeque;
use std::fs;
use std::path::{Path, PathBuf};
use tokio::fs as async_fs;
use tokio::sync::broadcast;
use tokio::time::{sleep, Duration};

use crate::core::events::{
@@ -144,7 +145,7 @@ impl StorageController {
    }

    /// Start the storage controller loop
    pub async fn run(&mut self) -> Result<()> {
    pub async fn run(&mut self, mut shutdown_rx: broadcast::Receiver<()>) -> Result<()> {
        println!("💾 Starting storage controller...");
        println!(" Buffer size: {} frames", self.config.frame_buffer_size);
        println!(" Storage path: {:?}", self.config.base_storage_path);
@@ -157,6 +158,12 @@ impl StorageController {

        loop {
            tokio::select! {
                // Handle shutdown signal
                _ = shutdown_rx.recv() => {
                    println!("💾 Storage controller received shutdown signal");
                    break;
                }

                // Handle incoming events
                event_result = event_receiver.recv() => {
                    match event_result {
@@ -180,6 +187,9 @@ impl StorageController {
                }
            }
        }

        println!("💾 Storage controller stopped");
        Ok(())
    }

    /// Handle incoming events from the event bus