use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; use reqwest::{multipart, Client}; use serde::{Deserialize, Serialize}; use std::path::PathBuf; use std::time::{Duration, Instant}; use tokio::{fs, time}; use crate::config::Config; use crate::logging::{LogFileManager, StructuredLogger, generate_correlation_id}; /// Configuration for log upload functionality #[derive(Debug, Clone)] pub struct LogUploadConfig { pub backend_url: String, pub device_id: String, pub upload_interval_hours: u64, pub max_retry_attempts: u32, pub retry_delay_seconds: u64, pub max_upload_size_mb: u64, pub auth_token: Option, } impl Default for LogUploadConfig { fn default() -> Self { Self { backend_url: "http://localhost:3000".to_string(), device_id: "unknown".to_string(), upload_interval_hours: 1, max_retry_attempts: 3, retry_delay_seconds: 300, // 5 minutes max_upload_size_mb: 50, auth_token: None, } } } /// Response from the log upload endpoint #[derive(Debug, Serialize, Deserialize)] pub struct LogUploadResponse { pub success: bool, #[serde(rename = "uploadId")] pub upload_id: String, #[serde(rename = "processedEntries")] pub processed_entries: u32, pub message: String, } /// Log uploader service for batch uploading log files pub struct LogUploader { config: LogUploadConfig, logger: StructuredLogger, http_client: Client, log_file_manager: LogFileManager, } impl LogUploader { pub fn new( config: LogUploadConfig, logger: StructuredLogger, log_directory: PathBuf, ) -> Self { let http_client = Client::builder() .timeout(Duration::from_secs(300)) // 5 minute timeout .build() .expect("Failed to create HTTP client"); let log_file_manager = LogFileManager::new(log_directory); Self { config, logger, http_client, log_file_manager, } } /// Start the log upload background task pub async fn start_upload_task(self) -> Result<()> { let correlation_id = generate_correlation_id(); self.logger.startup_event( "log_uploader", "1.0.0", Some(&correlation_id) ); self.logger.info( &format!( "Starting log upload task with interval: {} hours", self.config.upload_interval_hours ), Some(&correlation_id) ); let mut interval = time::interval(Duration::from_secs( self.config.upload_interval_hours * 3600 )); loop { interval.tick().await; let upload_correlation_id = generate_correlation_id(); self.logger.info( "Starting scheduled log upload", Some(&upload_correlation_id) ); match self.upload_logs(&upload_correlation_id).await { Ok(uploaded_count) => { self.logger.info( &format!("Log upload completed successfully: {} files uploaded", uploaded_count), Some(&upload_correlation_id) ); } Err(e) => { self.logger.error( "Log upload failed", Some(&*e), Some(&upload_correlation_id) ); } } // Clean up old logs to prevent disk space issues if let Err(e) = self.cleanup_old_logs(&upload_correlation_id).await { self.logger.warn( &format!("Failed to cleanup old logs: {}", e), Some(&upload_correlation_id) ); } } } /// Upload all eligible log files async fn upload_logs(&self, correlation_id: &str) -> Result { let uploadable_files = self.log_file_manager.get_uploadable_log_files().await .context("Failed to get uploadable log files")?; if uploadable_files.is_empty() { self.logger.debug("No log files ready for upload", Some(correlation_id)); return Ok(0); } self.logger.info( &format!("Found {} log files ready for upload", uploadable_files.len()), Some(correlation_id) ); let mut uploaded_count = 0; for file_path in uploadable_files { match self.upload_single_file(&file_path, correlation_id).await { Ok(_) => { uploaded_count += 1; // Remove the original file after successful upload if let Err(e) = self.log_file_manager.remove_log_file(&file_path).await { self.logger.warn( &format!("Failed to remove uploaded log file {}: {}", file_path.display(), e), Some(correlation_id) ); } else { self.logger.debug( &format!("Removed uploaded log file: {}", file_path.display()), Some(correlation_id) ); } } Err(e) => { self.logger.error( &format!("Failed to upload log file {}: {}", file_path.display(), e), Some(&*e), Some(correlation_id) ); // Continue with other files even if one fails } } } Ok(uploaded_count) } /// Upload a single log file with retry logic async fn upload_single_file(&self, file_path: &PathBuf, correlation_id: &str) -> Result { let mut last_error = None; for attempt in 1..=self.config.max_retry_attempts { self.logger.debug( &format!("Uploading log file {} (attempt {}/{})", file_path.display(), attempt, self.config.max_retry_attempts), Some(correlation_id) ); match self.perform_upload(file_path, correlation_id).await { Ok(response) => { self.logger.info( &format!( "Successfully uploaded log file: {} (upload_id: {}, processed_entries: {})", file_path.display(), response.upload_id, response.processed_entries ), Some(correlation_id) ); return Ok(response); } Err(e) => { last_error = Some(e); if attempt < self.config.max_retry_attempts { self.logger.warn( &format!( "Upload attempt {} failed for {}, retrying in {} seconds", attempt, file_path.display(), self.config.retry_delay_seconds ), Some(correlation_id) ); time::sleep(Duration::from_secs(self.config.retry_delay_seconds)).await; } } } } Err(last_error.unwrap_or_else(|| anyhow::anyhow!("Upload failed after all retry attempts"))) } /// Perform the actual HTTP upload async fn perform_upload(&self, file_path: &PathBuf, correlation_id: &str) -> Result { let start_time = Instant::now(); // Check file size let metadata = std::fs::metadata(file_path) .context("Failed to get file metadata")?; let file_size_mb = metadata.len() / (1024 * 1024); if file_size_mb > self.config.max_upload_size_mb { return Err(anyhow::anyhow!( "File too large: {}MB > {}MB limit", file_size_mb, self.config.max_upload_size_mb )); } // Compress the log file let compressed_path = self.log_file_manager.compress_log_file(file_path).await .context("Failed to compress log file")?; // Ensure compressed file is cleaned up let _cleanup_guard = FileCleanupGuard::new(compressed_path.clone()); // Read compressed file let file_content = fs::read(&compressed_path).await .context("Failed to read compressed log file")?; // Create multipart form let filename = compressed_path.file_name() .and_then(|n| n.to_str()) .unwrap_or("log.gz") .to_string(); let part = multipart::Part::bytes(file_content) .file_name(filename) .mime_str("application/gzip")?; let form = multipart::Form::new() .part("logFile", part) .text("deviceId", self.config.device_id.clone()) .text("source", "edge_client") .text("description", format!("Automated upload from {}", file_path.display())); // Prepare request let url = format!("{}/api/v1/logs/upload", self.config.backend_url); let mut request_builder = self.http_client .post(&url) .header("x-correlation-id", correlation_id) .multipart(form); // Add authentication if available if let Some(ref token) = self.config.auth_token { request_builder = request_builder.bearer_auth(token); } // Send request let response = request_builder.send().await .context("Failed to send upload request")?; let status = response.status(); let duration = start_time.elapsed(); self.logger.communication_event( "log_upload", &url, Some(status.as_u16()), Some(correlation_id) ); self.logger.performance_event( "log_upload", duration.as_millis() as u64, Some(correlation_id) ); if !status.is_success() { let error_text = response.text().await.unwrap_or_else(|_| "Unknown error".to_string()); return Err(anyhow::anyhow!( "Upload failed with status {}: {}", status, error_text )); } // Parse response let upload_response: LogUploadResponse = response.json().await .context("Failed to parse upload response")?; Ok(upload_response) } /// Clean up old log files to prevent disk space issues async fn cleanup_old_logs(&self, correlation_id: &str) -> Result<()> { let max_total_size = 500 * 1024 * 1024; // 500MB max total log storage let total_size_before = self.log_file_manager.get_total_log_size().await?; if total_size_before > max_total_size { self.logger.info( &format!( "Log directory size ({} bytes) exceeds limit ({} bytes), cleaning up old logs", total_size_before, max_total_size ), Some(correlation_id) ); self.log_file_manager.cleanup_old_logs(max_total_size).await?; let total_size_after = self.log_file_manager.get_total_log_size().await?; self.logger.info( &format!( "Log cleanup completed: {} bytes -> {} bytes", total_size_before, total_size_after ), Some(correlation_id) ); } Ok(()) } /// Update authentication token pub fn update_auth_token(&mut self, token: Option) { self.config.auth_token = token; } } /// RAII guard to ensure file cleanup struct FileCleanupGuard { file_path: PathBuf, } impl FileCleanupGuard { fn new(file_path: PathBuf) -> Self { Self { file_path } } } impl Drop for FileCleanupGuard { fn drop(&mut self) { if self.file_path.exists() { if let Err(e) = std::fs::remove_file(&self.file_path) { eprintln!("Failed to cleanup temporary file {}: {}", self.file_path.display(), e); } } } } /// Create log uploader from configuration pub fn create_log_uploader( config: &Config, logger: StructuredLogger, log_directory: PathBuf, ) -> LogUploader { let upload_config = LogUploadConfig { backend_url: config.backend_url.clone(), device_id: config.device_id.clone(), upload_interval_hours: config.log_upload_interval_hours.unwrap_or(1), max_retry_attempts: 3, retry_delay_seconds: 300, max_upload_size_mb: 50, auth_token: config.auth_token.clone(), }; LogUploader::new(upload_config, logger, log_directory) }