2025-11-03 00:11:13 +08:00

442 lines
15 KiB
Rust

use anyhow::{Context, Result};
use reqwest::multipart;
use std::fs;
use std::path::{Path, PathBuf};
use std::process::Command;
use tokio::fs as async_fs;
use tokio::time::{sleep, Duration};
use crate::network::api::ApiClient;
use crate::core::config::ConfigManager;
use crate::core::events::{EventBus, EventPackageArchivedEvent, SystemEvent};
use serde_json::json;
/// Configuration for the communication controller
#[derive(Debug, Clone)]
pub struct CommunicationConfig {
pub api_base_url: String,
pub retry_attempts: u32,
pub retry_delay_seconds: u64,
pub max_retry_delay_seconds: u64,
pub request_timeout_seconds: u64,
pub heartbeat_interval_seconds: u64,
}
impl Default for CommunicationConfig {
fn default() -> Self {
Self {
api_base_url: "http://localhost:3000".to_string(),
retry_attempts: 3,
retry_delay_seconds: 2,
max_retry_delay_seconds: 60,
request_timeout_seconds: 300, // 5 minutes for large file uploads
heartbeat_interval_seconds: 300, // 5 minutes for heartbeat
}
}
}
/// Communication controller that handles event package uploads
pub struct CommunicationController {
config: CommunicationConfig,
event_bus: EventBus,
api_client: ApiClient,
}
impl CommunicationController {
/// Create a new CommunicationController
pub fn new(config: CommunicationConfig, event_bus: EventBus) -> Result<Self> {
let api_client = ApiClient::new(config.api_base_url.clone());
Ok(Self {
config,
event_bus,
api_client,
})
}
/// Main run loop for the communication controller
pub async fn run(&mut self) -> Result<()> {
println!("📡 Starting communication controller...");
println!(" API Base URL: {}", self.config.api_base_url);
println!(" Retry attempts: {}", self.config.retry_attempts);
println!(
" Request timeout: {}s",
self.config.request_timeout_seconds
);
let mut event_receiver = self.event_bus.subscribe();
loop {
match event_receiver.recv().await {
Ok(event) => {
if let SystemEvent::EventPackageArchived(archive_event) = event.as_ref() {
println!(
"📦 Received EventPackageArchivedEvent: {}",
archive_event.event_id
);
if let Err(e) = self.process_archived_event(archive_event.clone()).await {
eprintln!("❌ Failed to process archived event: {}", e);
}
}
}
Err(e) => {
eprintln!("❌ Error receiving event: {}", e);
// Sleep briefly before continuing to avoid busy loop
sleep(Duration::from_millis(100)).await;
}
}
}
}
/// Process an EventPackageArchivedEvent by packaging and uploading
async fn process_archived_event(&self, event: EventPackageArchivedEvent) -> Result<()> {
println!("🗂️ Processing archived event: {}", event.event_id);
println!(" Event directory: {:?}", event.event_directory_path);
println!(" Total frames: {}", event.total_frames);
println!(" Archive size: {} bytes", event.archive_size_bytes);
// Step 1: Verify the event directory exists
if !event.event_directory_path.exists() {
anyhow::bail!(
"Event directory does not exist: {:?}",
event.event_directory_path
);
}
// Step 2: Create compressed archive
let archive_path = self
.create_compressed_archive(&event.event_directory_path, &event.event_id)
.await?;
println!("✅ Created compressed archive: {:?}", archive_path);
// Step 3: Build upload context (auth + payload)
let (jwt_token, event_data_json) = self.prepare_upload_context(&event).await?;
// Step 4: Upload with retry logic
let upload_result = self
.upload_with_retry(&archive_path, &event, &jwt_token, &event_data_json)
.await;
// Step 5: Cleanup regardless of upload result
match upload_result {
Ok(_) => {
println!("✅ Upload successful, cleaning up local files...");
// Delete the compressed archive
if let Err(e) = async_fs::remove_file(&archive_path).await {
eprintln!(
"⚠️ Failed to delete archive file {:?}: {}",
archive_path, e
);
}
// Delete the original event directory
if let Err(e) = async_fs::remove_dir_all(&event.event_directory_path).await {
eprintln!(
"⚠️ Failed to delete event directory {:?}: {}",
event.event_directory_path, e
);
} else {
println!(
"🗑️ Cleaned up event directory: {:?}",
event.event_directory_path
);
}
}
Err(e) => {
eprintln!("❌ Upload failed after retries: {}", e);
// Still clean up the temporary archive file
if let Err(cleanup_err) = async_fs::remove_file(&archive_path).await {
eprintln!(
"⚠️ Failed to delete archive file {:?}: {}",
archive_path, cleanup_err
);
}
return Err(e);
}
}
Ok(())
}
/// Create a compressed tar.gz archive of the event directory
async fn create_compressed_archive(&self, event_dir: &Path, event_id: &str) -> Result<PathBuf> {
let parent_dir = event_dir
.parent()
.context("Event directory must have a parent directory")?;
let dir_name = event_dir
.file_name()
.context("Event directory must have a name")?
.to_string_lossy();
let archive_filename = format!("{}.tar.gz", event_id);
let archive_path = parent_dir.join(&archive_filename);
println!("📦 Creating tar.gz archive: {:?}", archive_path);
println!(" Compressing directory: {:?}", event_dir);
// Use tar command to create compressed archive
let output = Command::new("tar")
.arg("-czf")
.arg(&archive_path)
.arg("-C")
.arg(parent_dir)
.arg(&*dir_name)
.output()
.context("Failed to execute tar command")?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("tar command failed: {}", stderr);
}
// Verify archive was created
if !archive_path.exists() {
anyhow::bail!("Archive file was not created: {:?}", archive_path);
}
let metadata =
fs::metadata(&archive_path).context("Failed to get archive file metadata")?;
println!("✅ Archive created successfully: {} bytes", metadata.len());
Ok(archive_path)
}
/// Upload file with exponential backoff retry logic
async fn upload_with_retry(
&self,
archive_path: &Path,
event: &EventPackageArchivedEvent,
jwt_token: &str,
event_data_json: &str,
) -> Result<()> {
let mut delay = Duration::from_secs(self.config.retry_delay_seconds);
let max_delay = Duration::from_secs(self.config.max_retry_delay_seconds);
for attempt in 1..=self.config.retry_attempts {
println!(
"📤 Upload attempt {}/{}",
attempt, self.config.retry_attempts
);
match self
.upload_archive(archive_path, event, jwt_token, event_data_json)
.await
{
Ok(_) => {
println!("✅ Upload successful on attempt {}", attempt);
return Ok(());
}
Err(e) => {
eprintln!("❌ Upload attempt {} failed: {}", attempt, e);
if attempt < self.config.retry_attempts {
println!("⏳ Waiting {}s before retry...", delay.as_secs());
sleep(delay).await;
// Exponential backoff: double the delay, but cap at max_delay
delay = std::cmp::min(delay * 2, max_delay);
} else {
return Err(anyhow::anyhow!(
"Upload failed after {} attempts: {}",
self.config.retry_attempts,
e
));
}
}
}
}
unreachable!()
}
/// Upload archive file to the backend API
async fn upload_archive(
&self,
archive_path: &Path,
event: &EventPackageArchivedEvent,
jwt_token: &str,
event_data_json: &str,
) -> Result<()> {
let url = format!("{}/api/v1/events/upload", self.config.api_base_url);
println!("🌐 Uploading to: {}", url);
println!(" File: {:?}", archive_path);
// Read the archive file
let file_content = async_fs::read(archive_path)
.await
.context("Failed to read archive file")?;
let file_name = archive_path
.file_name()
.context("Archive file must have a name")?
.to_string_lossy()
.to_string();
// Create multipart form data
let form = multipart::Form::new()
.part(
"file",
multipart::Part::bytes(file_content)
.file_name(file_name)
.mime_str("application/gzip")?,
)
.text("event_id", event.event_id.clone())
.text("trigger_frame_id", event.trigger_frame_id.to_string())
.text("total_frames", event.total_frames.to_string())
.text("archive_size_bytes", event.archive_size_bytes.to_string())
.text("archived_timestamp", event.archived_timestamp.to_rfc3339())
.text("eventData", event_data_json.to_string());
// Create HTTP client with timeout
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(self.config.request_timeout_seconds))
.build()
.context("Failed to create HTTP client")?;
// Send the request
let response = client
.post(&url)
.bearer_auth(jwt_token)
.multipart(form)
.send()
.await
.context("Failed to send upload request")?;
let status = response.status();
println!("📡 Upload response status: {}", status);
if status.is_success() {
println!("✅ File uploaded successfully!");
Ok(())
} else {
let response_text = response
.text()
.await
.unwrap_or_else(|_| "Unable to read response body".to_string());
anyhow::bail!("Upload failed with status {}: {}", status, response_text);
}
}
}
impl CommunicationController {
async fn prepare_upload_context(
&self,
event: &EventPackageArchivedEvent,
) -> Result<(String, String)> {
let config_manager = ConfigManager::new();
let device_config = config_manager
.load_config()
.context("Failed to load device configuration for upload")?;
if !device_config.registered {
anyhow::bail!("Device is not registered; cannot upload events");
}
let jwt_token = device_config
.jwt_token
.clone()
.context("Device configuration missing JWT token")?;
let metadata_path = event.event_directory_path.join("metadata.json");
let metadata_json = match async_fs::read_to_string(&metadata_path).await {
Ok(content) => serde_json::from_str::<serde_json::Value>(&content).ok(),
Err(_) => None,
};
let event_timestamp = metadata_json
.as_ref()
.and_then(|value| value.get("detection_timestamp"))
.and_then(|value| value.as_str())
.map(|s| s.to_string())
.unwrap_or_else(|| event.archived_timestamp.to_rfc3339());
let mut metadata_payload = json!({
"deviceId": device_config.device_id,
"eventId": event.event_id,
"triggerFrameId": event.trigger_frame_id,
"framesCaptured": event.total_frames,
"archiveSizeBytes": event.archive_size_bytes,
});
if let Some(value) = metadata_json {
if let Some(obj) = metadata_payload.as_object_mut() {
obj.insert("eventMetadata".to_string(), value);
}
}
let event_data = json!({
"eventType": "meteor",
"eventTimestamp": event_timestamp,
"metadata": metadata_payload,
});
let event_data_json =
serde_json::to_string(&event_data).context("Failed to serialize event data payload")?;
Ok((jwt_token, event_data_json))
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
use tempfile::TempDir;
#[tokio::test]
async fn test_create_compressed_archive() {
// Create temporary directory structure
let temp_dir = TempDir::new().unwrap();
let event_dir = temp_dir.path().join("test_event_123");
fs::create_dir_all(&event_dir).unwrap();
// Create some test files
fs::write(event_dir.join("video.mp4"), b"fake video data").unwrap();
fs::write(event_dir.join("metadata.json"), b"{\"test\": \"data\"}").unwrap();
let detection_dir = event_dir.join("detection_data");
fs::create_dir_all(&detection_dir).unwrap();
fs::write(detection_dir.join("frame_001.jpg"), b"fake image data").unwrap();
// Create communication controller
let config = CommunicationConfig::default();
let event_bus = EventBus::new(100);
let comm_controller = CommunicationController::new(config, event_bus).unwrap();
// Test archive creation
let archive_path = comm_controller
.create_compressed_archive(&event_dir, "test_event_123")
.await
.unwrap();
// Verify archive exists and has content
assert!(archive_path.exists());
assert!(archive_path.to_string_lossy().ends_with(".tar.gz"));
let metadata = fs::metadata(&archive_path).unwrap();
assert!(metadata.len() > 0);
// Clean up
let _ = fs::remove_file(&archive_path);
}
#[test]
fn test_communication_config_default() {
let config = CommunicationConfig::default();
assert_eq!(config.api_base_url, "http://localhost:3000");
assert_eq!(config.retry_attempts, 3);
assert_eq!(config.retry_delay_seconds, 2);
assert_eq!(config.max_retry_delay_seconds, 60);
assert_eq!(config.request_timeout_seconds, 300);
assert_eq!(config.heartbeat_interval_seconds, 300);
}
}