2025-08-12 07:21:41 +08:00

558 lines
20 KiB
Rust

use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::time::{sleep, timeout};
use anyhow::Result;
use crate::ring_buffer::{
RingBuffer, RingBufferConfig, AstronomicalFrame,
create_meteor_frame_buffer, RingBufferMonitor
};
use crate::memory_mapping::{MappingConfig, MappingPool, AccessPattern};
use crate::frame_pool::HierarchicalFramePool;
/// Comprehensive test suite for Ring Buffer & Memory Mapping integration
pub async fn test_ring_buffer_system() -> Result<()> {
println!("🧪 Testing Phase 3 Week 1: Ring Buffer & Memory Mapping");
println!("========================================================");
// Test 1: Basic ring buffer operations
println!("\n📋 Test 1: Basic Ring Buffer Operations");
test_basic_ring_buffer().await?;
// Test 2: High-throughput astronomical frame streaming
println!("\n📋 Test 2: Astronomical Frame Streaming");
test_astronomical_frame_streaming().await?;
// Test 3: Concurrent producer-consumer patterns
println!("\n📋 Test 3: Concurrent Producer-Consumer");
test_concurrent_streaming().await?;
// Test 4: Memory mapping integration
println!("\n📋 Test 4: Memory Mapping Integration");
test_memory_mapping_integration().await?;
println!("\n✅ Ring buffer system tests completed successfully!");
Ok(())
}
/// Test basic ring buffer functionality
async fn test_basic_ring_buffer() -> Result<()> {
let config = RingBufferConfig {
capacity: 64,
enable_prefetch: true,
alignment: 64,
allow_overwrites: false,
enable_stats: true,
};
let buffer = RingBuffer::<u64>::new(config)?;
println!(" ✓ Created ring buffer (capacity: {})", buffer.capacity());
// Test basic write/read operations
for i in 0..32 {
buffer.write(i)?;
}
assert_eq!(buffer.used_slots(), 32);
assert_eq!(buffer.fill_percentage(), 50); // 32/64 * 100
println!(" ✓ Basic write operations completed");
// Test batch operations
let values = vec![100, 101, 102, 103, 104];
let written = buffer.write_batch(&values)?;
assert_eq!(written, 5);
let mut read_buffer = vec![0u64; 10];
let read_count = buffer.read_batch(&mut read_buffer)?;
assert_eq!(read_count, 10);
// Verify data integrity
for (i, &value) in read_buffer.iter().enumerate() {
assert_eq!(value, i as u64);
}
println!(" ✓ Batch operations completed");
// Test statistics
let stats = buffer.stats();
println!(" 📊 Buffer Statistics:");
println!(" Writes: {}, Reads: {}", stats.writes_total, stats.reads_total);
println!(" Peak fill: {}%, Current fill: {}%",
stats.peak_fill_percentage, stats.current_fill_percentage);
println!(" Used/Available: {}/{}", stats.used_slots, stats.available_slots);
assert!(stats.writes_total > 0);
assert!(stats.reads_total > 0);
Ok(())
}
/// Test astronomical frame streaming with realistic data
async fn test_astronomical_frame_streaming() -> Result<()> {
let buffer = create_meteor_frame_buffer(128)?;
println!(" 🌠 Simulating meteor detection frame stream");
// Generate realistic astronomical frames
let mut frames_written = 0;
let start_time = Instant::now();
for frame_id in 0..100 {
let frame = AstronomicalFrame {
frame_id,
timestamp_nanos: start_time.elapsed().as_nanos() as u64,
width: if frame_id % 10 == 0 { 1920 } else { 640 }, // Mix of resolutions
height: if frame_id % 10 == 0 { 1080 } else { 480 },
data_ptr: 0x10000 + (frame_id * 1000) as usize,
data_size: if frame_id % 10 == 0 { 1920 * 1080 * 3 } else { 640 * 480 * 3 },
brightness_sum: 100.0 + (frame_id as f32 * 0.5),
detection_flags: if frame_id % 20 == 0 { 0b0001 } else { 0b0000 }, // Occasional detections
};
match buffer.write(frame) {
Ok(()) => {
frames_written += 1;
// Simulate processing time
if frame_id % 25 == 0 {
sleep(Duration::from_micros(100)).await;
}
}
Err(e) => {
println!(" ⚠️ Frame {} write failed: {}", frame_id, e);
break;
}
}
}
println!(" ✓ Wrote {} astronomical frames", frames_written);
// Process frames in batches
let mut total_processed = 0;
let mut meteor_detections = 0;
let mut batch_buffer = vec![AstronomicalFrame {
frame_id: 0, timestamp_nanos: 0, width: 0, height: 0,
data_ptr: 0, data_size: 0, brightness_sum: 0.0, detection_flags: 0,
}; 10];
while !buffer.is_empty() {
match buffer.read_batch(&mut batch_buffer) {
Ok(count) => {
total_processed += count;
for frame in &batch_buffer[0..count] {
if frame.detection_flags & 0b0001 != 0 {
meteor_detections += 1;
}
}
}
Err(_) => break,
}
}
println!(" ✓ Processed {} frames ({} meteor detections)",
total_processed, meteor_detections);
// Verify statistics
let stats = buffer.stats();
println!(" 📊 Frame Stream Statistics:");
println!(" Total writes: {}, Total reads: {}", stats.writes_total, stats.reads_total);
println!(" Buffer efficiency: {:.1}%",
(stats.reads_total as f64 / stats.writes_total as f64) * 100.0);
assert_eq!(total_processed, frames_written);
assert!(meteor_detections > 0);
Ok(())
}
/// Test concurrent producer-consumer patterns
async fn test_concurrent_streaming() -> Result<()> {
let buffer = Arc::new(create_meteor_frame_buffer(256)?);
let mut monitor = RingBufferMonitor::new();
monitor.add_buffer(buffer.clone());
println!(" 🔄 Testing concurrent streaming with monitoring");
// Start monitoring in background
let monitor_handle = tokio::spawn(async move {
timeout(Duration::from_secs(3), monitor.start_monitoring(Duration::from_millis(500))).await
});
// Producer task - simulates camera feed
let producer_buffer = buffer.clone();
let producer = tokio::spawn(async move {
let mut frame_id = 0;
let start_time = Instant::now();
while start_time.elapsed() < Duration::from_secs(2) {
let frame = AstronomicalFrame {
frame_id,
timestamp_nanos: start_time.elapsed().as_nanos() as u64,
width: 1280,
height: 720,
data_ptr: 0x20000 + (frame_id * 2000) as usize,
data_size: 1280 * 720 * 3,
brightness_sum: 50.0 + (frame_id as f32 * 0.1),
detection_flags: if frame_id % 50 == 0 { 0b0001 } else { 0b0000 },
};
if producer_buffer.try_write(frame).is_ok() {
frame_id += 1;
}
// Simulate camera frame rate (30 FPS = ~33ms per frame)
sleep(Duration::from_millis(5)).await; // Faster for testing
}
frame_id
});
// Consumer task - simulates meteor detection processing
let consumer_buffer = buffer.clone();
let consumer = tokio::spawn(async move {
let mut processed_count = 0;
let mut meteor_count = 0;
let start_time = Instant::now();
while start_time.elapsed() < Duration::from_secs(2) {
match consumer_buffer.try_read() {
Ok(frame) => {
processed_count += 1;
if frame.detection_flags & 0b0001 != 0 {
meteor_count += 1;
println!(" 🌠 Meteor detected in frame {} (brightness: {:.1})",
frame.frame_id, frame.brightness_sum);
}
// Simulate processing time
if processed_count % 20 == 0 {
sleep(Duration::from_micros(500)).await;
}
}
Err(_) => {
// Buffer empty, wait a bit
sleep(Duration::from_micros(100)).await;
}
}
}
(processed_count, meteor_count)
});
// Wait for both tasks to complete
let (frames_produced_result, frames_consumed_result) = tokio::join!(producer, consumer);
let frames_produced = frames_produced_result?;
let (frames_consumed, meteors_detected) = frames_consumed_result?;
// Stop monitoring
drop(monitor_handle);
println!(" ✓ Concurrent streaming completed:");
println!(" Produced: {} frames", frames_produced);
println!(" Consumed: {} frames", frames_consumed);
println!(" Meteors detected: {}", meteors_detected);
// Get final statistics
let stats = buffer.stats();
println!(" 📊 Concurrent Statistics:");
println!(" Buffer efficiency: {:.1}%",
(stats.reads_total as f64 / stats.writes_total as f64) * 100.0);
println!(" Peak fill: {}%", stats.peak_fill_percentage);
println!(" Overwrites: {}, Underruns: {}", stats.overwrites, stats.underruns);
assert!(frames_produced > 0);
assert!(frames_consumed > 0);
Ok(())
}
/// Test memory mapping integration with ring buffers
async fn test_memory_mapping_integration() -> Result<()> {
// Create temporary file for astronomical data
use std::io::Write;
use std::fs::File;
let temp_path = std::env::temp_dir().join("test_astronomical_data.bin");
let mut temp_file = File::create(&temp_path)?;
// Write simulated astronomical data (FITS-like format)
let header = b"SIMPLE = T / file does conform to FITS standard";
let data_size = 1920 * 1080 * 4; // 4-byte pixels
temp_file.write_all(header)?;
temp_file.write_all(&vec![0u8; data_size])?;
temp_file.flush()?;
println!(" 🗺️ Testing memory mapping with {} MB astronomical file",
(header.len() + data_size) / 1024 / 1024);
// Create memory mapping pool
let mapping_pool = MappingPool::new(5);
let config = MappingConfig {
readable: true,
writable: false,
use_large_pages: true,
prefetch_on_map: true,
access_pattern: AccessPattern::Sequential,
lock_in_memory: false,
enable_stats: true,
};
let mapping = mapping_pool.get_mapping(&temp_path, config)?;
println!(" ✓ Created memory mapping ({} bytes)", mapping.size());
// Integrate with ring buffer for frame processing
let frame_buffer = create_meteor_frame_buffer(64)?;
// Simulate processing memory-mapped astronomical data
let chunk_size = 1920 * 720 * 3; // 720p frame size
let total_chunks = mapping.size() / chunk_size;
for chunk_id in 0..total_chunks.min(50) {
let offset = chunk_id * chunk_size;
let actual_size = chunk_size.min(mapping.size() - offset);
// Create frame reference to memory-mapped data
let frame = AstronomicalFrame {
frame_id: chunk_id as u64,
timestamp_nanos: (chunk_id as u64) * 33_333_333, // 30 FPS intervals
width: 1920,
height: 720,
data_ptr: offset, // Offset into memory-mapped file
data_size: actual_size,
brightness_sum: 75.0 + (chunk_id as f32 * 2.5),
detection_flags: if chunk_id % 15 == 0 { 0b0001 } else { 0b0000 },
};
frame_buffer.write(frame)?;
}
println!(" ✓ Queued {} memory-mapped frames for processing",
total_chunks.min(50));
// Process frames and access memory-mapped data
let mut processed_frames = 0;
let mut total_data_accessed = 0;
while !frame_buffer.is_empty() {
if let Ok(frame) = frame_buffer.read() {
// Simulate accessing the memory-mapped data
let mut buffer = vec![0u8; 4096.min(frame.data_size)];
let read_count = mapping.read_at(frame.data_ptr, &mut buffer)?;
total_data_accessed += read_count;
processed_frames += 1;
if frame.detection_flags & 0b0001 != 0 {
println!(" 🌠 Processing potential meteor in frame {} ({}MB)",
frame.frame_id, frame.data_size / 1024 / 1024);
}
}
}
println!(" ✓ Processed {} frames ({} MB of data accessed)",
processed_frames, total_data_accessed / 1024 / 1024);
// Check mapping and pool statistics
let mapping_stats = mapping.stats();
let pool_stats = mapping_pool.stats();
println!(" 📊 Memory Mapping Statistics:");
println!(" File: {}", mapping_stats.path.display());
println!(" Size: {} MB", mapping_stats.bytes_mapped / 1024 / 1024);
println!(" Accesses: {} reads", mapping_stats.read_accesses);
println!(" Pool cache hits: {}, misses: {}",
pool_stats.cache_hits, pool_stats.cache_misses);
assert!(mapping_stats.read_accesses > 0);
assert_eq!(processed_frames, total_chunks.min(50));
Ok(())
}
/// Performance benchmark for ring buffer throughput
pub async fn benchmark_ring_buffer_performance() -> Result<()> {
println!("\n🏁 Ring Buffer Performance Benchmark");
println!("====================================");
// Test different buffer sizes
let buffer_sizes = [64, 256, 1024, 4096];
for &size in &buffer_sizes {
let buffer = create_meteor_frame_buffer(size)?;
let start_time = Instant::now();
// Benchmark write performance
let write_start = Instant::now();
for frame_id in 0..size / 2 {
let frame = AstronomicalFrame {
frame_id: frame_id as u64,
timestamp_nanos: write_start.elapsed().as_nanos() as u64,
width: 1280,
height: 720,
data_ptr: 0x30000,
data_size: 1280 * 720 * 3,
brightness_sum: 60.0,
detection_flags: 0,
};
buffer.write(frame)?;
}
let write_duration = write_start.elapsed();
// Benchmark read performance
let read_start = Instant::now();
let mut frames_read = 0;
while !buffer.is_empty() {
if buffer.read().is_ok() {
frames_read += 1;
}
}
let read_duration = read_start.elapsed();
let total_duration = start_time.elapsed();
let frames_written = size / 2;
let write_throughput = frames_written as f64 / write_duration.as_secs_f64();
let read_throughput = frames_read as f64 / read_duration.as_secs_f64();
println!(" Buffer size {}: {} frames", size, frames_written);
println!(" Write: {:.0} frames/sec ({:.1} μs/frame)",
write_throughput, write_duration.as_micros() as f64 / frames_written as f64);
println!(" Read: {:.0} frames/sec ({:.1} μs/frame)",
read_throughput, read_duration.as_micros() as f64 / frames_read as f64);
println!(" Total: {:?}", total_duration);
// Validate performance targets
assert!(write_throughput > 1000.0, "Write throughput too low: {:.0} frames/sec", write_throughput);
assert!(read_throughput > 1000.0, "Read throughput too low: {:.0} frames/sec", read_throughput);
}
println!(" ✅ Performance benchmarks passed");
Ok(())
}
/// Integration test with existing frame pool system
pub async fn test_integration_with_frame_pools() -> Result<()> {
println!("\n🔗 Integration Test: Ring Buffers + Frame Pools");
println!("===============================================");
// Create frame pool and ring buffer
let frame_pool = Arc::new(HierarchicalFramePool::new(20));
let ring_buffer = Arc::new(create_meteor_frame_buffer(128)?);
println!(" 🔄 Testing integration with hierarchical frame pools");
// Producer: Get frames from pool and queue in ring buffer
let producer_pool = frame_pool.clone();
let producer_buffer = ring_buffer.clone();
let producer = tokio::spawn(async move {
let mut queued_frames = 0;
for frame_id in 0..100 {
// Get buffer from pool
let frame_data = producer_pool.acquire(1280 * 720 * 3);
// Create astronomical frame
let frame = AstronomicalFrame {
frame_id: frame_id as u64,
timestamp_nanos: (frame_id as u64) * 16_666_666, // 60 FPS
width: 1280,
height: 720,
data_ptr: frame_data.as_ref().as_ptr() as usize,
data_size: frame_data.as_ref().len(),
brightness_sum: 80.0 + (frame_id as f32 * 0.2),
detection_flags: if frame_id % 30 == 0 { 0b0001 } else { 0b0000 },
};
if producer_buffer.try_write(frame).is_ok() {
queued_frames += 1;
}
sleep(Duration::from_micros(100)).await; // Simulate frame rate
}
queued_frames
});
// Consumer: Process frames from ring buffer
let consumer_buffer = ring_buffer.clone();
let consumer = tokio::spawn(async move {
let mut processed_frames = 0;
let mut meteor_detections = 0;
sleep(Duration::from_millis(10)).await; // Let producer get ahead
while processed_frames < 100 {
match consumer_buffer.try_read() {
Ok(frame) => {
processed_frames += 1;
if frame.detection_flags & 0b0001 != 0 {
meteor_detections += 1;
}
// Simulate processing
if processed_frames % 10 == 0 {
sleep(Duration::from_micros(200)).await;
}
}
Err(_) => {
sleep(Duration::from_micros(50)).await;
}
}
}
(processed_frames, meteor_detections)
});
// Wait for completion
let (frames_queued_result, frames_processed_result) = tokio::join!(producer, consumer);
let frames_queued = frames_queued_result?;
let (frames_processed, meteors) = frames_processed_result?;
// Check frame pool statistics
let pool_stats = frame_pool.all_stats();
let ring_stats = ring_buffer.stats();
println!(" ✓ Integration test completed:");
println!(" Frames queued: {}", frames_queued);
println!(" Frames processed: {}", frames_processed);
println!(" Meteors detected: {}", meteors);
println!(" Pool allocations: {}", pool_stats.iter().map(|(_, s)| s.total_allocations).sum::<u64>());
println!(" Ring buffer efficiency: {:.1}%",
(ring_stats.reads_total as f64 / ring_stats.writes_total as f64) * 100.0);
assert!(frames_queued > 0);
assert!(frames_processed > 0);
assert_eq!(frames_processed, 100);
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_ring_buffer_integration() {
test_ring_buffer_system().await.unwrap();
}
#[tokio::test]
async fn test_performance_benchmark() {
benchmark_ring_buffer_performance().await.unwrap();
}
#[tokio::test]
async fn test_frame_pool_integration() {
test_integration_with_frame_pools().await.unwrap();
}
}