meteor_detect/ARCHITECTURE.md

33 KiB
Raw Blame History

流星监控系统 - 技术架构文档

1. 架构概述

1.1 系统定位

流星监控系统是一个基于Rust构建的高性能、实时天文观测平台专为流星检测和天文数据采集而设计。系统采用现代异步编程模型支持跨平台部署具备专业级的精度和可靠性。

1.2 设计原则

  • 异步优先: 基于Tokio的全异步架构最大化并发性能
  • 模块化设计: 松耦合的模块化组件,便于扩展和维护
  • 跨平台兼容: 支持Linux/macOS/Windows针对ARM64优化
  • 零拷贝优化: 最小化内存分配和数据复制
  • 故障隔离: 组件级错误处理,防止故障传播
  • 配置驱动: 通过配置文件控制系统行为,无需修改代码

1.3 核心技术栈

┌─────────────────────────────────────────────────────────────┐
│                    应用层 (Application Layer)                │
├─────────────────────────────────────────────────────────────┤
│  Rust (1.70+) │ Tokio (异步运行时) │ anyhow (错误处理)       │
│  serde (序列化) │ toml (配置) │ log (日志) │ uuid (ID生成)    │
├─────────────────────────────────────────────────────────────┤
│                   中间件层 (Middleware Layer)                │
├─────────────────────────────────────────────────────────────┤
│ OpenCV (计算机视觉) │ FFmpeg (视频编码) │ GStreamer (流媒体) │
│ SQLite (数据库) │ rumqttc (MQTT) │ actix-web (HTTP服务)    │
├─────────────────────────────────────────────────────────────┤
│                    硬件层 (Hardware Layer)                   │
├─────────────────────────────────────────────────────────────┤
│ V4L2 (Linux摄像头) │ rppal (树莓派GPIO) │ serialport (串口)  │
│ astrometry.net (星图解算) │ embedded-hal (硬件抽象)        │
└─────────────────────────────────────────────────────────────┘

2. 系统架构

2.1 整体架构图

                    ┌─────────────────────────────────────────┐
                    │              Application                │
                    │        (主应用协调器)                    │
                    └─────────────────┬───────────────────────┘
                                      │
                    ┌─────────────────┴───────────────────────┐
                    │              EventBus                  │
                    │         (事件总线系统)                   │
                    └─────────────────┬───────────────────────┘
                                      │
    ┌─────────────┬─────────────┬─────┴─────┬─────────────┬─────────────┐
    │             │             │           │             │             │
    ▼             ▼             ▼           ▼             ▼             ▼
┌─────────┐ ┌─────────┐ ┌─────────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│ Camera  │ │   GPS   │ │  Detection  │ │ Storage │ │  Comm   │ │  RTSP   │
│ System  │ │ System  │ │   Engine    │ │ Manager │ │ Manager │ │ Server  │
└─────────┘ └─────────┘ └─────────────┘ └─────────┘ └─────────┘ └─────────┘
    │             │             │           │             │             │
    ▼             ▼             ▼           ▼             ▼             ▼
┌─────────┐ ┌─────────┐ ┌─────────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│  Frame  │ │  NMEA   │ │ Multi-Algo  │ │  SQLite │ │  MQTT   │ │GStreamer│
│ Buffer  │ │ Parser  │ │  Pipeline   │ │Database │ │ Client  │ │Pipeline │
└─────────┘ └─────────┘ └─────────────┘ └─────────┘ └─────────┘ └─────────┘
    │             │             │           │             │             │
    ▼             ▼             ▼           ▼             ▼             ▼
┌─────────┐ ┌─────────┐ ┌─────────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│ OpenCV  │ │ Serial  │ │   Overlay   │ │  File   │ │ HTTP    │ │ Network │
│ Camera  │ │  Port   │ │   System    │ │ System  │ │   API   │ │ Stream  │
└─────────┘ └─────────┘ └─────────────┘ └─────────┘ └─────────┘ └─────────┘

2.2 数据流架构

┌───────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│  Camera   │───▶│ Frame Buffer│───▶│  Detection  │───▶│   Storage   │
│  Hardware │    │  (Ring)     │    │   Pipeline  │    │   Manager   │
└───────────┘    └─────────────┘    └─────────────┘    └─────────────┘
                         │                   │                   │
                         ▼                   ▼                   ▼
                 ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
                 │  RTSP       │    │  Event      │    │  Cloud      │
                 │  Streaming  │    │  Broadcast  │    │  Upload     │
                 └─────────────┘    └─────────────┘    └─────────────┘
                                             │
                                             ▼
                                    ┌─────────────┐
                                    │  External   │
                                    │  Clients    │
                                    └─────────────┘

3. 核心组件详述

3.1 应用协调器 (Application)

3.1.1 架构设计

pub struct Application {
    config: Arc<Config>,
    event_bus: Arc<EventBus>,
    camera_controller: Arc<Mutex<CameraController>>,
    detection_engine: Arc<DetectionEngine>,
    // ... 其他组件
}

impl Application {
    pub async fn initialize(&mut self) -> Result<()> {
        // 1. 初始化事件总线
        // 2. 初始化各个组件
        // 3. 建立组件间依赖关系
        // 4. 启动监控系统
    }
    
    pub async fn start(&mut self) -> Result<()> {
        // 1. 启动所有组件
        // 2. 建立数据流管道
        // 3. 启动事件循环
    }
    
    pub async fn run(&mut self) -> Result<()> {
        // 主事件循环,处理系统信号和组件通信
    }
}

3.1.2 生命周期管理

  • 初始化阶段: 加载配置、创建组件实例、验证硬件
  • 启动阶段: 建立通信通道、启动后台任务、开始数据处理
  • 运行阶段: 监控系统状态、处理事件、维护组件健康
  • 关闭阶段: 优雅停止所有组件、保存状态、清理资源

3.2 事件总线系统 (EventBus)

3.2.1 事件类型定义

#[derive(Debug, Clone)]
pub enum Event {
    // 帧事件
    FrameCapture { timestamp: DateTime<Utc>, frame: Arc<Frame> },
    
    // 检测事件
    DetectionResult { 
        timestamp: DateTime<Utc>, 
        result: DetectionResult,
        metadata: EventMetadata 
    },
    
    // GPS事件
    GpsUpdate { 
        timestamp: DateTime<Utc>, 
        position: Position, 
        time_sync: bool 
    },
    
    // 系统事件
    SystemStatus { 
        timestamp: DateTime<Utc>, 
        status: SystemHealth 
    },
    
    // 传感器事件
    SensorData { 
        timestamp: DateTime<Utc>, 
        sensor_type: SensorType, 
        data: SensorReading 
    },
}

3.2.2 事件分发机制

pub struct EventBus {
    channels: HashMap<EventType, broadcast::Sender<Event>>,
    subscribers: HashMap<ComponentId, Vec<EventType>>,
}

impl EventBus {
    pub async fn publish(&self, event: Event) -> Result<()> {
        // 根据事件类型分发到对应的channel
        // 支持事件过滤和路由
        // 处理背压和错误恢复
    }
    
    pub fn subscribe(&mut self, component_id: ComponentId, event_types: Vec<EventType>) -> Receiver<Event> {
        // 为组件创建订阅通道
        // 支持动态订阅和取消订阅
    }
}

3.3 摄像头系统 (Camera System)

3.3.1 摄像头控制器架构

pub struct CameraController {
    camera: Box<dyn Camera + Send + Sync>,
    frame_buffer: Arc<Mutex<FrameBuffer>>,
    settings: CameraSettings,
    broadcast_tx: broadcast::Sender<Frame>,
}

pub trait Camera: Send + Sync {
    async fn initialize(&mut self) -> Result<()>;
    async fn start_capture(&mut self) -> Result<()>;
    async fn capture_frame(&mut self) -> Result<Frame>;
    async fn stop_capture(&mut self) -> Result<()>;
    fn get_capabilities(&self) -> CameraCapabilities;
    fn set_parameters(&mut self, params: CameraParameters) -> Result<()>;
}

3.3.2 帧缓冲区设计

pub struct FrameBuffer {
    buffer: VecDeque<Arc<Frame>>,
    capacity: usize,
    total_size: AtomicUsize,
    max_size: usize,
}

impl FrameBuffer {
    pub fn push(&mut self, frame: Arc<Frame>) {
        // 环形缓冲区逻辑
        // 自动清理旧帧
        // 内存使用监控
    }
    
    pub fn get_recent_frames(&self, duration: Duration) -> Vec<Arc<Frame>> {
        // 根据时间范围获取帧
        // 支持高效的时间索引
    }
}

3.3.3 跨平台摄像头支持

// Linux V4L2 支持
#[cfg(target_os = "linux")]
pub struct V4L2Camera {
    device_path: String,
    capture: opencv::videoio::VideoCapture,
}

// macOS AVFoundation 支持
#[cfg(target_os = "macos")]
pub struct AVFoundationCamera {
    device_index: i32,
    capture: opencv::videoio::VideoCapture,
}

// 文件输入支持 (跨平台)
pub struct FileInputCamera {
    file_path: PathBuf,
    capture: opencv::videoio::VideoCapture,
    loop_video: bool,
}

3.4 检测引擎 (Detection Engine)

3.4.1 多检测器架构

pub struct DetectionEngine {
    detectors: Vec<Box<dyn Detector + Send + Sync>>,
    aggregator: Box<dyn ResultAggregator + Send + Sync>,
    config: DetectorConfig,
}

pub trait Detector: Send + Sync {
    fn name(&self) -> &str;
    async fn detect(&mut self, frame: &Frame, context: &DetectionContext) -> Result<DetectionResult>;
    fn configure(&mut self, config: &DetectorConfig) -> Result<()>;
    fn get_metrics(&self) -> DetectorMetrics;
}

3.4.2 检测器实现

// 亮度变化检测器
pub struct BrightnessDetector {
    background_model: BackgroundSubtractor,
    threshold: f32,
    min_area: u32,
}

// CAMS兼容检测器
pub struct CamsDetector {
    frame_stack: FrameStack,
    detection_params: CamsParameters,
}

// 帧叠加检测器
pub struct FrameStackerDetector {
    stack_size: usize,
    noise_reduction: NoiseReductionFilter,
}

3.4.3 结果聚合策略

pub enum AggregationStrategy {
    Any,      // 任一检测器检测到即触发
    All,      // 所有检测器都检测到才触发
    Majority, // 大多数检测器检测到才触发
    Threshold(f32), // 检测置信度阈值
    Custom(Box<dyn Fn(&[DetectionResult]) -> bool>), // 自定义策略
}

pub struct ResultAggregator {
    strategy: AggregationStrategy,
    history: VecDeque<Vec<DetectionResult>>,
}

3.5 GPS系统 (GPS System)

3.5.1 GPS控制器设计

pub struct GpsController {
    port: Box<dyn SerialPort>,
    parser: NmeaParser,
    pps_handler: Option<PpsHandler>,
    position_cache: Arc<Mutex<Option<Position>>>,
    time_sync_status: Arc<AtomicBool>,
}

impl GpsController {
    pub async fn start(&mut self) -> Result<()> {
        // 1. 打开串口连接
        // 2. 启动NMEA数据读取循环
        // 3. 启动PPS信号处理
        // 4. 启动位置数据发布
    }
    
    async fn read_nmea_loop(&mut self) -> Result<()> {
        // 持续读取NMEA数据
        // 解析GPS信息
        // 发布位置更新事件
    }
}

3.4.2 NMEA解析器

pub struct NmeaParser {
    sentence_parsers: HashMap<String, Box<dyn SentenceParser>>,
}

pub trait SentenceParser {
    fn parse(&self, sentence: &str) -> Result<ParsedData>;
}

// 支持的NMEA语句类型
impl NmeaParser {
    pub fn new() -> Self {
        let mut parsers = HashMap::new();
        parsers.insert("GPGGA".to_string(), Box::new(GgaParser::new()));
        parsers.insert("GPRMC".to_string(), Box::new(RmcParser::new()));
        parsers.insert("GPGSA".to_string(), Box::new(GsaParser::new()));
        // ... 其他解析器
    }
}

3.6 叠加系统 (Overlay System)

3.6.1 水印叠加架构

pub struct WatermarkOverlay {
    elements: Vec<Box<dyn OverlayElement>>,
    position: Position,
    style: TextStyle,
}

pub trait OverlayElement: Send + Sync {
    fn render(&self, context: &RenderContext) -> Result<String>;
    fn get_dependencies(&self) -> Vec<DataSource>;
}

// 水印元素实现
pub struct TimestampElement {
    format: String,
    timezone: chrono_tz::Tz,
}

pub struct GpsCoordinateElement {
    format: CoordinateFormat,
    precision: u8,
}

pub struct EnvironmentDataElement {
    sensors: Vec<SensorType>,
    format: String,
}

3.6.2 星图叠加系统

pub struct StarChartOverlay {
    solver: AstrometrySolver,
    catalog: StarCatalog,
    solution_cache: Arc<Mutex<Option<PlateSolution>>>,
    update_interval: Duration,
}

impl StarChartOverlay {
    pub async fn update_solution(&mut self, frame: &Frame) -> Result<()> {
        // 1. 提取星点
        // 2. 调用astrometry.net求解
        // 3. 缓存解算结果
        // 4. 生成星图叠加
    }
    
    pub fn render_stars(&self, solution: &PlateSolution) -> Result<Vec<StarPosition>> {
        // 根据解算结果渲染星图
        // 包括星点、星座线、天区网格
    }
}

3.7 存储管理器 (Storage Manager)

3.7.1 存储策略

pub struct StorageManager {
    event_storage: EventStorage,
    raw_video_storage: RawVideoStorage,
    metadata_db: MetadataDatabase,
    cleanup_scheduler: CleanupScheduler,
}

pub struct StoragePolicy {
    pub event_retention_days: u32,
    pub max_disk_usage_gb: u64,
    pub compression_enabled: bool,
    pub backup_enabled: bool,
}

impl StorageManager {
    pub async fn store_event(&self, event: DetectionEvent) -> Result<EventId> {
        // 1. 提取相关视频片段
        // 2. 保存帧序列
        // 3. 生成元数据
        // 4. 存储到数据库
        // 5. 触发清理策略
    }
}

3.7.2 数据库设计

-- 事件表
CREATE TABLE events (
    id TEXT PRIMARY KEY,
    timestamp DATETIME NOT NULL,
    duration_ms INTEGER NOT NULL,
    detection_confidence REAL NOT NULL,
    video_path TEXT NOT NULL,
    metadata_json TEXT NOT NULL,
    gps_lat REAL,
    gps_lon REAL,
    gps_alt REAL,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);

-- 传感器数据表
CREATE TABLE sensor_data (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    event_id TEXT NOT NULL,
    sensor_type TEXT NOT NULL,
    timestamp DATETIME NOT NULL,
    value REAL NOT NULL,
    unit TEXT NOT NULL,
    FOREIGN KEY (event_id) REFERENCES events(id)
);

-- 系统状态表
CREATE TABLE system_status (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp DATETIME NOT NULL,
    component TEXT NOT NULL,
    status TEXT NOT NULL,
    metrics_json TEXT,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);

3.8 通信管理器 (Communication Manager)

3.8.1 MQTT客户端

pub struct MqttManager {
    client: rumqttc::AsyncClient,
    eventloop: rumqttc::EventLoop,
    config: MqttConfig,
    publish_queue: Arc<Mutex<VecDeque<PublishMessage>>>,
}

impl MqttManager {
    pub async fn publish_event(&self, event: &DetectionEvent) -> Result<()> {
        let message = serde_json::to_string(&event)?;
        self.client.publish(
            &self.config.events_topic,
            rumqttc::QoS::AtLeastOnce,
            false,
            message
        ).await?;
        Ok(())
    }
    
    pub async fn publish_status(&self, status: &SystemStatus) -> Result<()> {
        // 发布系统状态信息
    }
}

3.8.2 HTTP API服务器

use actix_web::{web, App, HttpServer, Result as ActixResult};

pub struct HttpApiServer {
    config: HttpApiConfig,
    app_data: web::Data<AppState>,
}

// API端点定义
pub async fn get_system_status(data: web::Data<AppState>) -> ActixResult<impl Responder> {
    // 返回系统状态信息
}

pub async fn get_events(
    data: web::Data<AppState>,
    query: web::Query<EventQuery>
) -> ActixResult<impl Responder> {
    // 返回事件列表
}

pub async fn get_event_details(
    data: web::Data<AppState>,
    path: web::Path<String>
) -> ActixResult<impl Responder> {
    // 返回特定事件详情
}

pub async fn update_config(
    data: web::Data<AppState>,
    config: web::Json<Config>
) -> ActixResult<impl Responder> {
    // 更新系统配置
}

3.9 RTSP流媒体服务器

3.9.1 GStreamer集成

pub struct RtspServer {
    server: gstreamer_rtsp_server::RTSPServer,
    factory: gstreamer_rtsp_server::RTSPMediaFactory,
    config: RtspConfig,
}

impl RtspServer {
    pub fn new(config: RtspConfig) -> Result<Self> {
        let server = gstreamer_rtsp_server::RTSPServer::new();
        
        // 配置RTSP服务器
        server.set_service(&config.port.to_string());
        
        // 创建媒体工厂
        let factory = gstreamer_rtsp_server::RTSPMediaFactory::new();
        factory.set_launch(&format!(
            "appsrc name=source ! videoconvert ! x264enc ! rtph264pay name=pay0 pt=96"
        ));
        
        Ok(Self { server, factory, config })
    }
    
    pub async fn start(&mut self) -> Result<()> {
        // 启动RTSP服务器
        // 注册媒体工厂
        // 开始流媒体传输
    }
}

4. 配置系统设计

4.1 配置结构设计

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
    pub device: DeviceConfig,
    pub camera: CameraSettings,
    pub gps: GpsConfig,
    pub sensors: SensorConfig,
    pub detection: DetectionConfig,
    pub storage: StorageConfig,
    pub mqtt: MqttConfig,
    pub http_api: HttpApiConfig,
    pub rtsp: RtspConfig,
    pub watermark: WatermarkOptions,
    pub star_chart: StarChartOptions,
    pub log_level: String,
}

4.2 配置加载与验证

impl Config {
    pub fn load<P: AsRef<Path>>(path: P) -> Result<Self> {
        // 1. 读取TOML配置文件
        // 2. 反序列化为Config结构
        // 3. 验证配置有效性
        // 4. 应用默认值
        // 5. 处理向后兼容性
    }
    
    pub fn validate(&self) -> Result<()> {
        // 验证配置参数的有效性
        // 检查文件路径是否存在
        // 验证网络配置
        // 检查硬件兼容性
    }
    
    pub fn migrate_legacy(&mut self) -> Result<()> {
        // 处理旧版本配置文件的迁移
        // 保持向后兼容性
    }
}

5. 性能优化策略

5.1 异步编程优化

// 使用Tokio的异步运行时
#[tokio::main]
async fn main() -> Result<()> {
    // 创建多线程异步运行时
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(num_cpus::get())
        .enable_all()
        .build()?;
    
    // 运行应用
    rt.block_on(async {
        let mut app = Application::new(config);
        app.run().await
    })
}

// 并发处理优化
async fn process_frame_concurrent(frame: Arc<Frame>, detectors: &[Detector]) -> Result<Vec<DetectionResult>> {
    let tasks: Vec<_> = detectors.iter().map(|detector| {
        let frame = Arc::clone(&frame);
        tokio::spawn(async move {
            detector.detect(&frame).await
        })
    }).collect();
    
    let results = futures::future::join_all(tasks).await;
    // 处理结果...
}

5.2 内存管理优化

// 零拷贝帧共享
pub struct Frame {
    data: Arc<opencv::core::Mat>,
    timestamp: DateTime<Utc>,
    metadata: FrameMetadata,
}

impl Frame {
    pub fn share(&self) -> Arc<Frame> {
        // 共享帧数据,避免内存拷贝
        Arc::new(Frame {
            data: Arc::clone(&self.data),
            timestamp: self.timestamp,
            metadata: self.metadata.clone(),
        })
    }
}

// 内存池管理
pub struct FramePool {
    pool: Vec<Frame>,
    available: VecDeque<usize>,
}

impl FramePool {
    pub fn acquire(&mut self) -> Option<Frame> {
        // 从池中获取可用帧
        // 避免频繁内存分配
    }
    
    pub fn release(&mut self, frame: Frame) {
        // 将帧返回池中重用
    }
}

5.3 I/O优化

// 异步文件I/O
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

async fn save_event_async(event: &DetectionEvent, path: &Path) -> Result<()> {
    let mut file = File::create(path).await?;
    let data = serde_json::to_vec(event)?;
    file.write_all(&data).await?;
    file.sync_all().await?;
    Ok(())
}

// 批量I/O操作
pub struct BatchWriter {
    buffer: Vec<WriteOperation>,
    batch_size: usize,
    flush_interval: Duration,
}

impl BatchWriter {
    pub async fn write(&mut self, operation: WriteOperation) -> Result<()> {
        self.buffer.push(operation);
        if self.buffer.len() >= self.batch_size {
            self.flush().await?;
        }
        Ok(())
    }
    
    async fn flush(&mut self) -> Result<()> {
        // 批量执行I/O操作
        // 减少系统调用开销
    }
}

6. 错误处理与恢复

6.1 分层错误处理

// 系统级错误
#[derive(thiserror::Error, Debug)]
pub enum SystemError {
    #[error("Hardware initialization failed: {0}")]
    HardwareError(#[from] HardwareError),
    
    #[error("Configuration error: {0}")]
    ConfigError(#[from] ConfigError),
    
    #[error("Network error: {0}")]
    NetworkError(#[from] NetworkError),
}

// 组件级错误
#[derive(thiserror::Error, Debug)]
pub enum CameraError {
    #[error("Camera not found: {device}")]
    DeviceNotFound { device: String },
    
    #[error("Invalid camera parameters: {msg}")]
    InvalidParameters { msg: String },
    
    #[error("Frame capture timeout")]
    CaptureTimeout,
}

// 错误恢复策略
pub struct ErrorRecoveryManager {
    strategies: HashMap<ErrorType, RecoveryStrategy>,
    retry_count: HashMap<ComponentId, usize>,
    max_retries: usize,
}

impl ErrorRecoveryManager {
    pub async fn handle_error(&mut self, error: SystemError, component: ComponentId) -> RecoveryAction {
        match self.strategies.get(&error.error_type()) {
            Some(RecoveryStrategy::Retry) => {
                let count = self.retry_count.entry(component).or_insert(0);
                *count += 1;
                if *count <= self.max_retries {
                    RecoveryAction::Retry
                } else {
                    RecoveryAction::Disable
                }
            },
            Some(RecoveryStrategy::Restart) => RecoveryAction::Restart,
            Some(RecoveryStrategy::Ignore) => RecoveryAction::Continue,
            None => RecoveryAction::Shutdown,
        }
    }
}

6.2 健康监控系统

pub struct HealthMonitor {
    components: HashMap<ComponentId, ComponentHealth>,
    check_interval: Duration,
    alerting: AlertingSystem,
}

pub struct ComponentHealth {
    pub status: HealthStatus,
    pub last_heartbeat: DateTime<Utc>,
    pub error_count: usize,
    pub metrics: ComponentMetrics,
}

impl HealthMonitor {
    pub async fn start_monitoring(&mut self) -> Result<()> {
        let mut interval = tokio::time::interval(self.check_interval);
        
        loop {
            interval.tick().await;
            self.check_all_components().await;
        }
    }
    
    async fn check_all_components(&mut self) {
        for (component_id, health) in &mut self.components {
            if self.is_component_unhealthy(health) {
                self.handle_unhealthy_component(*component_id, health).await;
            }
        }
    }
}

7. 安全性设计

7.1 网络安全

// TLS配置
pub struct TlsConfig {
    pub cert_path: PathBuf,
    pub key_path: PathBuf,
    pub ca_cert_path: Option<PathBuf>,
    pub verify_client: bool,
}

// API认证
pub struct ApiAuthentication {
    pub tokens: HashSet<String>,
    pub token_expiry: HashMap<String, DateTime<Utc>>,
    pub rate_limiter: RateLimiter,
}

impl ApiAuthentication {
    pub fn verify_token(&self, token: &str) -> Result<bool> {
        if !self.tokens.contains(token) {
            return Ok(false);
        }
        
        if let Some(expiry) = self.token_expiry.get(token) {
            if *expiry < Utc::now() {
                return Ok(false);
            }
        }
        
        Ok(true)
    }
}

7.2 数据保护

// 敏感数据加密
pub struct SecureStorage {
    cipher: Aes256Gcm,
    key: [u8; 32],
}

impl SecureStorage {
    pub fn encrypt(&self, data: &[u8]) -> Result<Vec<u8>> {
        let nonce = Aes256Gcm::generate_nonce(&mut OsRng);
        let ciphertext = self.cipher.encrypt(&nonce, data)?;
        
        let mut result = nonce.to_vec();
        result.extend_from_slice(&ciphertext);
        Ok(result)
    }
    
    pub fn decrypt(&self, encrypted_data: &[u8]) -> Result<Vec<u8>> {
        let (nonce, ciphertext) = encrypted_data.split_at(12);
        let nonce = Nonce::from_slice(nonce);
        let plaintext = self.cipher.decrypt(nonce, ciphertext)?;
        Ok(plaintext)
    }
}

8. 测试策略

8.1 单元测试

#[cfg(test)]
mod tests {
    use super::*;
    use mockall::predicate::*;
    use tokio_test;

    #[tokio::test]
    async fn test_frame_detection() {
        let mut mock_detector = MockDetector::new();
        mock_detector
            .expect_detect()
            .with(always())
            .times(1)
            .returning(|_| Ok(DetectionResult::new(0.8, vec![])));

        let frame = create_test_frame();
        let result = mock_detector.detect(&frame).await;
        
        assert!(result.is_ok());
        assert_eq!(result.unwrap().confidence, 0.8);
    }
}

8.2 集成测试

#[tokio::test]
async fn test_end_to_end_detection() {
    // 创建测试配置
    let config = create_test_config();
    
    // 初始化应用
    let mut app = Application::new(config);
    app.initialize().await.unwrap();
    
    // 注入测试帧
    let test_frame = create_meteor_frame();
    app.process_frame(test_frame).await.unwrap();
    
    // 验证检测结果
    let events = app.get_detected_events().await;
    assert!(!events.is_empty());
}

8.3 性能基准测试

use criterion::{black_box, criterion_group, criterion_main, Criterion};

fn bench_detection_pipeline(c: &mut Criterion) {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let detector = BrightnessDetector::new();
    let frame = create_test_frame();
    
    c.bench_function("detection_pipeline", |b| {
        b.iter(|| {
            rt.block_on(async {
                detector.detect(black_box(&frame)).await
            })
        })
    });
}

criterion_group!(benches, bench_detection_pipeline);
criterion_main!(benches);

9. 部署架构

9.1 容器化部署

FROM rust:1.70-slim as builder

WORKDIR /app
COPY Cargo.toml Cargo.lock ./
COPY src ./src

RUN apt-get update && apt-get install -y \
    libopencv-dev \
    libssl-dev \
    pkg-config

RUN cargo build --release

FROM debian:bullseye-slim

RUN apt-get update && apt-get install -y \
    libopencv-core4.5 \
    libopencv-imgproc4.5 \
    libopencv-videoio4.5 \
    astrometry.net \
    && rm -rf /var/lib/apt/lists/*

COPY --from=builder /app/target/release/meteor_detect /usr/local/bin/
COPY config-example.toml /etc/meteor_detect/config.toml

EXPOSE 8080 8554

CMD ["meteor_detect"]

9.2 系统服务配置

# /etc/systemd/system/meteor-detect.service
[Unit]
Description=Meteor Detection System
After=network.target

[Service]
Type=simple
User=meteor
Group=meteor
WorkingDirectory=/opt/meteor-detect
ExecStart=/opt/meteor-detect/bin/meteor_detect
Restart=always
RestartSec=10
Environment=RUST_LOG=info

[Install]
WantedBy=multi-user.target

10. 监控与运维

10.1 指标收集

use prometheus::{Counter, Histogram, Gauge, Registry};

pub struct Metrics {
    pub frames_processed: Counter,
    pub detection_latency: Histogram,
    pub system_memory: Gauge,
    pub disk_usage: Gauge,
}

impl Metrics {
    pub fn new() -> Self {
        Self {
            frames_processed: Counter::new("frames_processed_total", "Total frames processed")
                .expect("metric can be created"),
            detection_latency: Histogram::with_opts(
                prometheus::HistogramOpts::new("detection_latency_seconds", "Detection latency")
                    .buckets(vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0])
            ).expect("metric can be created"),
            system_memory: Gauge::new("system_memory_bytes", "System memory usage")
                .expect("metric can be created"),
            disk_usage: Gauge::new("disk_usage_bytes", "Disk usage")
                .expect("metric can be created"),
        }
    }
}

10.2 日志管理

use tracing::{info, warn, error, debug};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

pub fn init_logging(config: &LogConfig) -> Result<()> {
    let file_appender = tracing_appender::rolling::daily(&config.log_dir, "meteor-detect.log");
    let (non_blocking, _guard) = tracing_appender::non_blocking(file_appender);
    
    tracing_subscriber::registry()
        .with(tracing_subscriber::EnvFilter::new(&config.level))
        .with(tracing_subscriber::fmt::layer().with_writer(std::io::stdout))
        .with(tracing_subscriber::fmt::layer()
            .with_writer(non_blocking)
            .with_ansi(false))
        .init();
    
    Ok(())
}

11. 扩展性设计

11.1 插件架构

pub trait DetectorPlugin: Send + Sync {
    fn name(&self) -> &str;
    fn version(&self) -> &str;
    fn create_detector(&self, config: &DetectorConfig) -> Result<Box<dyn Detector>>;
}

pub struct PluginManager {
    plugins: HashMap<String, Box<dyn DetectorPlugin>>,
    plugin_dir: PathBuf,
}

impl PluginManager {
    pub fn load_plugins(&mut self) -> Result<()> {
        // 扫描插件目录
        // 动态加载插件库
        // 注册插件接口
    }
    
    pub fn create_detector(&self, plugin_name: &str, config: &DetectorConfig) -> Result<Box<dyn Detector>> {
        let plugin = self.plugins.get(plugin_name)
            .ok_or_else(|| anyhow!("Plugin not found: {}", plugin_name))?;
        plugin.create_detector(config)
    }
}

11.2 微服务架构支持

// 服务发现
pub struct ServiceRegistry {
    services: HashMap<String, ServiceEndpoint>,
    health_checker: HealthChecker,
}

// 负载均衡
pub struct LoadBalancer {
    backends: Vec<Backend>,
    strategy: LoadBalancingStrategy,
}

// 服务间通信
pub struct ServiceClient {
    http_client: reqwest::Client,
    base_url: Url,
    auth_token: Option<String>,
}

本技术架构文档详细描述了流星监控系统的完整技术架构涵盖了从底层硬件抽象到上层应用逻辑的各个层面。系统采用现代化的Rust异步编程模型具备高性能、高可靠性和强扩展性的特点能够满足专业级天文观测的需求。