From 275fb05636cc54d9a18d485f7ce60e910ce05934 Mon Sep 17 00:00:00 2001 From: grabbit Date: Thu, 8 Jan 2026 23:26:43 +0800 Subject: [PATCH] feat(edge-client): add OpenCV camera backend with macOS authorization support - Add OpenCV as default camera backend, nokhwa as optional alternative - Make camera backends mutually exclusive via feature flags (opencv_camera, nokhwa_camera) - Remove deprecated hardware_camera feature, use nokhwa_camera instead - Add main thread camera initialization for macOS TCC authorization - Add pre-opened capture storage via static Mutex for async compatibility - Add pixel format conversion utilities (pixel_convert.rs) - Update all cfg guards from hardware_camera to nokhwa_camera macOS requires camera authorization requests on main thread. OpenCV's VideoCapture::new() is now called before tokio runtime starts, with the handle stored for later use by async code. Co-Authored-By: Claude Opus 4.5 --- meteor-edge-client/Cargo.lock | 754 ++++++++++++++++-- meteor-edge-client/Cargo.toml | 21 +- meteor-edge-client/src/camera/factory.rs | 65 +- meteor-edge-client/src/camera/interface.rs | 6 + meteor-edge-client/src/camera/mod.rs | 129 ++- .../src/camera/opencv_camera.rs | 363 +++++++++ .../src/camera/pixel_convert.rs | 244 ++++++ meteor-edge-client/src/camera/production.rs | 568 +++++++++++-- meteor-edge-client/src/core/app.rs | 161 +++- meteor-edge-client/src/detection/detector.rs | 12 +- meteor-edge-client/src/main.rs | 101 +++ meteor-edge-client/src/memory/frame_data.rs | 3 + .../src/memory/memory_monitor.rs | 24 + .../src/network/communication.rs | 45 +- meteor-edge-client/src/storage/storage.rs | 12 +- 15 files changed, 2278 insertions(+), 230 deletions(-) create mode 100644 meteor-edge-client/src/camera/opencv_camera.rs create mode 100644 meteor-edge-client/src/camera/pixel_convert.rs diff --git a/meteor-edge-client/Cargo.lock b/meteor-edge-client/Cargo.lock index 2b819f3..82e1945 100644 --- a/meteor-edge-client/Cargo.lock +++ b/meteor-edge-client/Cargo.lock @@ -97,6 +97,12 @@ version = "1.0.98" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" +[[package]] +name = "arrayvec" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" + [[package]] name = "asn1-rs" version = "0.6.2" @@ -109,7 +115,7 @@ dependencies = [ "nom", "num-traits", "rusticata-macros", - "thiserror", + "thiserror 1.0.69", "time", ] @@ -169,7 +175,7 @@ version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbfd150b5dbdb988bcc8fb1fe787eb6b7ee6180ca24da683b61ea5405f3d43ff" dependencies = [ - "bindgen", + "bindgen 0.69.5", "cc", "cmake", "dunce", @@ -183,7 +189,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6806a6321ec58106fea15becdad98371e28d92ccbc7c8f1b3b6dd724fe8f1002" dependencies = [ "addr2line", - "cfg-if", + "cfg-if 1.0.1", "libc", "miniz_oxide", "object", @@ -203,6 +209,29 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "bindgen" +version = "0.65.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfdf7b466f9a4903edc73f95d6d2bcd5baf8ae620638762244d3f60143643cc5" +dependencies = [ + "bitflags 1.3.2", + "cexpr", + "clang-sys", + "lazy_static", + "lazycell", + "log", + "peeking_take_while", + "prettyplease", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn", + "which", +] + [[package]] name = "bindgen" version = "0.69.5" @@ -244,6 +273,12 @@ version = "2.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967" +[[package]] +name = "block" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d8c1fef690941d3e7788d328517591fecc684c084084702d6ff1641e993699a" + [[package]] name = "block-buffer" version = "0.10.4" @@ -303,6 +338,12 @@ dependencies = [ "nom", ] +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" + [[package]] name = "cfg-if" version = "1.0.1" @@ -327,7 +368,17 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-link", + "windows-link 0.1.3", +] + +[[package]] +name = "clang" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84c044c781163c001b913cd018fc95a628c50d0d2dfea8bca77dad71edb16e37" +dependencies = [ + "clang-sys", + "libc", ] [[package]] @@ -390,6 +441,34 @@ dependencies = [ "cc", ] +[[package]] +name = "cocoa" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c49e86fc36d5704151f5996b7b3795385f50ce09e3be0f47a0cfde869681cf8" +dependencies = [ + "bitflags 1.3.2", + "block", + "core-foundation 0.7.0", + "core-graphics", + "foreign-types", + "libc", + "objc", +] + +[[package]] +name = "cocoa-foundation" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81411967c50ee9a1fc11365f8c585f863a22a9697c89239c452292c40ba79b0d" +dependencies = [ + "bitflags 2.9.1", + "block", + "core-foundation 0.10.1", + "core-graphics-types", + "objc", +] + [[package]] name = "color_quant" version = "1.1.0" @@ -402,22 +481,96 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" +[[package]] +name = "core-foundation" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57d24c7a13c43e870e37c1556b74555437870a04514f7685f5b354e090567171" +dependencies = [ + "core-foundation-sys 0.7.0", + "libc", +] + [[package]] name = "core-foundation" version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" dependencies = [ - "core-foundation-sys", + "core-foundation-sys 0.8.7", "libc", ] +[[package]] +name = "core-foundation" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2a6cd9ae233e7f62ba4e9353e81a88df7fc8a5987b8d445b4d90c879bd156f6" +dependencies = [ + "core-foundation-sys 0.8.7", + "libc", +] + +[[package]] +name = "core-foundation-sys" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3a71ab494c0b5b860bdc8407ae08978052417070c2ced38573a9157ad75b8ac" + [[package]] name = "core-foundation-sys" version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "core-graphics" +version = "0.19.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3889374e6ea6ab25dba90bb5d96202f61108058361f6dc72e8b03e6f8bbe923" +dependencies = [ + "bitflags 1.3.2", + "core-foundation 0.7.0", + "foreign-types", + "libc", +] + +[[package]] +name = "core-graphics-types" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d44a101f213f6c4cdc1853d4b78aef6db6bdfa3468798cc1d9912f4735013eb" +dependencies = [ + "bitflags 2.9.1", + "core-foundation 0.10.1", + "libc", +] + +[[package]] +name = "core-media-sys" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "273bf3fc5bf51fd06a7766a84788c1540b6527130a0bce39e00567d6ab9f31f1" +dependencies = [ + "cfg-if 0.1.10", + "core-foundation-sys 0.7.0", + "libc", +] + +[[package]] +name = "core-video-sys" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34ecad23610ad9757664d644e369246edde1803fcb43ed72876565098a5d3828" +dependencies = [ + "cfg-if 0.1.10", + "core-foundation-sys 0.7.0", + "core-graphics", + "libc", + "metal", + "objc", +] + [[package]] name = "cpufeatures" version = "0.2.17" @@ -433,7 +586,7 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9481c1c90cbf2ac953f07c8d4a58aa3945c425b7185c9154d67a65e4230da511" dependencies = [ - "cfg-if", + "cfg-if 1.0.1", ] [[package]] @@ -576,7 +729,7 @@ version = "0.8.35" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" dependencies = [ - "cfg-if", + "cfg-if 1.0.1", ] [[package]] @@ -635,12 +788,39 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flume" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "spin", +] + [[package]] name = "fnv" version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -761,7 +941,7 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" dependencies = [ - "cfg-if", + "cfg-if 1.0.1", "js-sys", "libc", "wasi 0.11.1+wasi-snapshot-preview1", @@ -774,7 +954,7 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" dependencies = [ - "cfg-if", + "cfg-if 1.0.1", "libc", "r-efi", "wasi 0.14.2+wasi-0.2.4", @@ -827,7 +1007,7 @@ version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "459196ed295495a68f7d7fe1d84f6c4b7ff0e21fe3017b2f283c6fac3ad803c9" dependencies = [ - "cfg-if", + "cfg-if 1.0.1", "crunchy", ] @@ -974,7 +1154,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b0c919e5debc312ad217002b8048a17b7d83f80703865bbfcfebb0458b0b27d8" dependencies = [ "android_system_properties", - "core-foundation-sys", + "core-foundation-sys 0.8.7", "iana-time-zone-haiku", "js-sys", "log", @@ -1144,7 +1324,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d93587f37623a1a17d94ef2bc9ada592f5465fe7732084ab7beefabe5c77c0c4" dependencies = [ "bitflags 2.9.1", - "cfg-if", + "cfg-if 1.0.1", "libc", ] @@ -1247,7 +1427,7 @@ version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667" dependencies = [ - "cfg-if", + "cfg-if 1.0.1", "windows-targets 0.52.6", ] @@ -1305,6 +1485,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "malloc_buf" +version = "0.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62bb907fe88d54d8d9ce32a3cceab4218ed2f6b7d35617cafe9adf84e43919cb" +dependencies = [ + "libc", +] + [[package]] name = "match_cfg" version = "0.1.0" @@ -1335,6 +1524,21 @@ dependencies = [ "autocfg", ] +[[package]] +name = "metal" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e198a0ee42bdbe9ef2c09d0b9426f3b2b47d90d93a4a9b0395c4cea605e92dc0" +dependencies = [ + "bitflags 1.3.2", + "block", + "cocoa", + "core-graphics", + "foreign-types", + "log", + "objc", +] + [[package]] name = "meteor-edge-client" version = "0.1.0" @@ -1358,7 +1562,9 @@ dependencies = [ "lazy_static", "libc", "mac_address", + "nokhwa", "num_cpus", + "opencv", "qrcode", "rand", "reqwest", @@ -1371,9 +1577,10 @@ dependencies = [ "sys-info", "sysinfo", "tempfile", - "thiserror", + "thiserror 1.0.69", "tokio", "tokio-tungstenite", + "tokio-util", "toml", "tracing", "tracing-appender", @@ -1427,6 +1634,50 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "mozjpeg" +version = "0.10.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7891b80aaa86097d38d276eb98b3805d6280708c4e0a1e6f6aed9380c51fec9" +dependencies = [ + "arrayvec", + "bytemuck", + "libc", + "mozjpeg-sys", + "rgb", +] + +[[package]] +name = "mozjpeg-sys" +version = "2.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f0dc668bf9bf888c88e2fb1ab16a406d2c380f1d082b20d51dd540ab2aa70c1" +dependencies = [ + "cc", + "dunce", + "libc", + "nasm-rs", +] + +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom 0.2.16", +] + +[[package]] +name = "nasm-rs" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34f676553b60ccbb76f41f9ae8f2428dac3f259ff8f1c2468a174778d06a1af9" +dependencies = [ + "jobserver", + "log", +] + [[package]] name = "nix" version = "0.29.0" @@ -1434,12 +1685,79 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" dependencies = [ "bitflags 2.9.1", - "cfg-if", + "cfg-if 1.0.1", "cfg_aliases", "libc", "memoffset", ] +[[package]] +name = "nokhwa" +version = "0.10.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4cae50786bfa1214ed441f98addbea51ca1b9aaa9e4bf5369cda36654b3efaa" +dependencies = [ + "flume", + "image 0.25.6", + "nokhwa-bindings-linux", + "nokhwa-bindings-macos", + "nokhwa-bindings-windows", + "nokhwa-core", + "parking_lot", + "paste", + "thiserror 2.0.17", +] + +[[package]] +name = "nokhwa-bindings-linux" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bd666aaa41d14357817bd9a981773a73c4d00b34d344cfc244e47ebd397b1ec" +dependencies = [ + "nokhwa-core", + "v4l", +] + +[[package]] +name = "nokhwa-bindings-macos" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de78eb4a2d47a68f490899aa0516070d7a972f853ec2bb374ab53be0bd39b60f" +dependencies = [ + "block", + "cocoa-foundation", + "core-foundation 0.10.1", + "core-media-sys", + "core-video-sys", + "flume", + "nokhwa-core", + "objc", + "once_cell", +] + +[[package]] +name = "nokhwa-bindings-windows" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "899799275c93ef69bbe8cb888cf6f8249abe751cbc50be5299105022aec14a1c" +dependencies = [ + "nokhwa-core", + "once_cell", + "windows 0.62.2", +] + +[[package]] +name = "nokhwa-core" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "109975552bbd690894f613bce3d408222911e317197c72b2e8b9a1912dc261ae" +dependencies = [ + "bytes", + "image 0.25.6", + "mozjpeg", + "thiserror 2.0.17", +] + [[package]] name = "nom" version = "7.1.3" @@ -1513,6 +1831,25 @@ dependencies = [ "libc", ] +[[package]] +name = "objc" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "915b1b472bc21c53464d6c8461c9d3af805ba1ef837e1cac254428f4a77177b1" +dependencies = [ + "malloc_buf", + "objc_exception", +] + +[[package]] +name = "objc_exception" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad970fb455818ad6cba4c122ad012fae53ae8b4795f86378bce65e4f6bab2ca4" +dependencies = [ + "cc", +] + [[package]] name = "object" version = "0.36.7" @@ -1543,6 +1880,41 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" +[[package]] +name = "opencv" +version = "0.93.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "756181caf50547e4fe39fd5dc2258647e2c0413c8f6ad1725bf76102e5b7392a" +dependencies = [ + "cc", + "dunce", + "jobserver", + "libc", + "num-traits", + "once_cell", + "opencv-binding-generator", + "pkg-config", + "semver", + "shlex", + "vcpkg", + "windows 0.59.0", +] + +[[package]] +name = "opencv-binding-generator" +version = "0.93.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cef95a75b87e0f8052af822f04d6fbc48c414907d01d33fd5f3f4184f10f896a" +dependencies = [ + "clang", + "clang-sys", + "dunce", + "once_cell", + "percent-encoding", + "regex", + "shlex", +] + [[package]] name = "openssl-probe" version = "0.1.6" @@ -1577,13 +1949,25 @@ version = "0.9.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5" dependencies = [ - "cfg-if", + "cfg-if 1.0.1", "libc", "redox_syscall", "smallvec", "windows-targets 0.52.6", ] +[[package]] +name = "paste" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" + +[[package]] +name = "peeking_take_while" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" + [[package]] name = "percent-encoding" version = "2.3.1" @@ -1602,6 +1986,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkg-config" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" + [[package]] name = "png" version = "0.17.16" @@ -1758,7 +2148,7 @@ checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43" dependencies = [ "getrandom 0.2.16", "libredox", - "thiserror", + "thiserror 1.0.69", ] [[package]] @@ -1847,6 +2237,15 @@ dependencies = [ "winreg", ] +[[package]] +name = "rgb" +version = "0.8.52" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c6a884d2998352bb4daf0183589aec883f16a6da1f4dde84d8e2e9a5409a1ce" +dependencies = [ + "bytemuck", +] + [[package]] name = "ring" version = "0.17.14" @@ -1854,7 +2253,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7" dependencies = [ "cc", - "cfg-if", + "cfg-if 1.0.1", "getrandom 0.2.16", "libc", "untrusted", @@ -2067,8 +2466,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ "bitflags 2.9.1", - "core-foundation", - "core-foundation-sys", + "core-foundation 0.9.4", + "core-foundation-sys 0.8.7", "libc", "security-framework-sys", ] @@ -2079,10 +2478,16 @@ version = "2.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49db231d56a190491cb4aeda9527f1ad45345af50b0851622a7adb8c03b01c32" dependencies = [ - "core-foundation-sys", + "core-foundation-sys 0.8.7", "libc", ] +[[package]] +name = "semver" +version = "1.0.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" + [[package]] name = "serde" version = "1.0.219" @@ -2142,7 +2547,7 @@ version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ - "cfg-if", + "cfg-if 1.0.1", "cpufeatures", "digest", ] @@ -2153,7 +2558,7 @@ version = "0.10.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" dependencies = [ - "cfg-if", + "cfg-if 1.0.1", "cpufeatures", "digest", ] @@ -2220,6 +2625,15 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -2282,13 +2696,13 @@ version = "0.30.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0a5b4ddaee55fb2bea2bf0e5000747e5f5c0de765e5a5ff87f4cd106439f4bb3" dependencies = [ - "cfg-if", - "core-foundation-sys", + "cfg-if 1.0.1", + "core-foundation-sys 0.8.7", "libc", "ntapi", "once_cell", "rayon", - "windows", + "windows 0.52.0", ] [[package]] @@ -2298,7 +2712,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" dependencies = [ "bitflags 1.3.2", - "core-foundation", + "core-foundation 0.9.4", "system-configuration-sys", ] @@ -2308,7 +2722,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" dependencies = [ - "core-foundation-sys", + "core-foundation-sys 0.8.7", "libc", ] @@ -2331,7 +2745,16 @@ version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" dependencies = [ - "thiserror-impl", + "thiserror-impl 1.0.69", +] + +[[package]] +name = "thiserror" +version = "2.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8" +dependencies = [ + "thiserror-impl 2.0.17", ] [[package]] @@ -2345,13 +2768,24 @@ dependencies = [ "syn", ] +[[package]] +name = "thiserror-impl" +version = "2.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "thread_local" version = "1.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" dependencies = [ - "cfg-if", + "cfg-if 1.0.1", ] [[package]] @@ -2482,8 +2916,12 @@ checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", + "futures-util", + "hashbrown", "pin-project-lite", + "slab", "tokio", ] @@ -2552,7 +2990,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf" dependencies = [ "crossbeam-channel", - "thiserror", + "thiserror 1.0.69", "time", "tracing-subscriber", ] @@ -2644,7 +3082,7 @@ dependencies = [ "rustls-native-certs", "rustls-pki-types", "sha1", - "thiserror", + "thiserror 1.0.69", "url", "utf-8", ] @@ -2713,12 +3151,38 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "v4l" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8fbfea44a46799d62c55323f3c55d06df722fbe577851d848d328a1041c3403" +dependencies = [ + "bitflags 1.3.2", + "libc", + "v4l2-sys-mit", +] + +[[package]] +name = "v4l2-sys-mit" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6779878362b9bacadc7893eac76abe69612e8837ef746573c4a5239daf11990b" +dependencies = [ + "bindgen 0.65.1", +] + [[package]] name = "valuable" version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" @@ -2755,7 +3219,7 @@ version = "0.2.100" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5" dependencies = [ - "cfg-if", + "cfg-if 1.0.1", "once_cell", "rustversion", "wasm-bindgen-macro", @@ -2781,7 +3245,7 @@ version = "0.4.50" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "555d470ec0bc3bb57890405e5d4322cc9ea83cebb085523ced7be4144dac1e61" dependencies = [ - "cfg-if", + "cfg-if 1.0.1", "js-sys", "once_cell", "wasm-bindgen", @@ -2886,6 +3350,37 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f919aee0a93304be7f62e8e5027811bbba96bcb1de84d6618be56e43f8a32a1" +dependencies = [ + "windows-core 0.59.0", + "windows-targets 0.53.5", +] + +[[package]] +name = "windows" +version = "0.62.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "527fadee13e0c05939a6a05d5bd6eec6cd2e3dbd648b9f8e447c6518133d8580" +dependencies = [ + "windows-collections", + "windows-core 0.62.2", + "windows-future", + "windows-numerics", +] + +[[package]] +name = "windows-collections" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23b2d95af1a8a14a3c7367e1ed4fc9c20e0a26e79551b1454d72583c97cc6610" +dependencies = [ + "windows-core 0.62.2", +] + [[package]] name = "windows-core" version = "0.52.0" @@ -2895,24 +3390,72 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-core" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "810ce18ed2112484b0d4e15d022e5f598113e220c53e373fb31e67e21670c1ce" +dependencies = [ + "windows-implement 0.59.0", + "windows-interface", + "windows-result 0.3.4", + "windows-strings 0.3.1", + "windows-targets 0.53.5", +] + [[package]] name = "windows-core" version = "0.61.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3" dependencies = [ - "windows-implement", + "windows-implement 0.60.2", "windows-interface", - "windows-link", - "windows-result", - "windows-strings", + "windows-link 0.1.3", + "windows-result 0.3.4", + "windows-strings 0.4.2", +] + +[[package]] +name = "windows-core" +version = "0.62.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" +dependencies = [ + "windows-implement 0.60.2", + "windows-interface", + "windows-link 0.2.1", + "windows-result 0.4.1", + "windows-strings 0.5.1", +] + +[[package]] +name = "windows-future" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d6f90251fe18a279739e78025bd6ddc52a7e22f921070ccdc67dde84c605cb" +dependencies = [ + "windows-core 0.62.2", + "windows-link 0.2.1", + "windows-threading", ] [[package]] name = "windows-implement" -version = "0.60.0" +version = "0.59.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836" +checksum = "83577b051e2f49a058c308f17f273b570a6a758386fc291b5f6a934dd84e48c1" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-implement" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" dependencies = [ "proc-macro2", "quote", @@ -2921,9 +3464,9 @@ dependencies = [ [[package]] name = "windows-interface" -version = "0.59.1" +version = "0.59.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8" +checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" dependencies = [ "proc-macro2", "quote", @@ -2936,13 +3479,47 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + +[[package]] +name = "windows-numerics" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e2e40844ac143cdb44aead537bbf727de9b044e107a0f1220392177d15b0f26" +dependencies = [ + "windows-core 0.62.2", + "windows-link 0.2.1", +] + [[package]] name = "windows-result" version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6" dependencies = [ - "windows-link", + "windows-link 0.1.3", +] + +[[package]] +name = "windows-result" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5" +dependencies = [ + "windows-link 0.2.1", +] + +[[package]] +name = "windows-strings" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87fa48cc5d406560701792be122a10132491cff9d0aeb23583cc2dcafc847319" +dependencies = [ + "windows-link 0.1.3", ] [[package]] @@ -2951,7 +3528,16 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57" dependencies = [ - "windows-link", + "windows-link 0.1.3", +] + +[[package]] +name = "windows-strings" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091" +dependencies = [ + "windows-link 0.2.1", ] [[package]] @@ -3005,13 +3591,39 @@ dependencies = [ "windows_aarch64_gnullvm 0.52.6", "windows_aarch64_msvc 0.52.6", "windows_i686_gnu 0.52.6", - "windows_i686_gnullvm", + "windows_i686_gnullvm 0.52.6", "windows_i686_msvc 0.52.6", "windows_x86_64_gnu 0.52.6", "windows_x86_64_gnullvm 0.52.6", "windows_x86_64_msvc 0.52.6", ] +[[package]] +name = "windows-targets" +version = "0.53.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" +dependencies = [ + "windows-link 0.2.1", + "windows_aarch64_gnullvm 0.53.1", + "windows_aarch64_msvc 0.53.1", + "windows_i686_gnu 0.53.1", + "windows_i686_gnullvm 0.53.1", + "windows_i686_msvc 0.53.1", + "windows_x86_64_gnu 0.53.1", + "windows_x86_64_gnullvm 0.53.1", + "windows_x86_64_msvc 0.53.1", +] + +[[package]] +name = "windows-threading" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3949bd5b99cafdf1c7ca86b43ca564028dfe27d66958f2470940f73d86d75b37" +dependencies = [ + "windows-link 0.2.1", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" @@ -3024,6 +3636,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" @@ -3036,6 +3654,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" +[[package]] +name = "windows_aarch64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" + [[package]] name = "windows_i686_gnu" version = "0.48.5" @@ -3048,12 +3672,24 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" +[[package]] +name = "windows_i686_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3" + [[package]] name = "windows_i686_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" +[[package]] +name = "windows_i686_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" + [[package]] name = "windows_i686_msvc" version = "0.48.5" @@ -3066,6 +3702,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" +[[package]] +name = "windows_i686_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" @@ -3078,6 +3720,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" +[[package]] +name = "windows_x86_64_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" @@ -3090,6 +3738,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" @@ -3102,6 +3756,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "windows_x86_64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" + [[package]] name = "winnow" version = "0.7.12" @@ -3117,7 +3777,7 @@ version = "0.50.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" dependencies = [ - "cfg-if", + "cfg-if 1.0.1", "windows-sys 0.48.0", ] @@ -3149,7 +3809,7 @@ dependencies = [ "nom", "oid-registry", "rusticata-macros", - "thiserror", + "thiserror 1.0.69", "time", ] diff --git a/meteor-edge-client/Cargo.toml b/meteor-edge-client/Cargo.toml index 3dd0e73..b6d7af3 100644 --- a/meteor-edge-client/Cargo.toml +++ b/meteor-edge-client/Cargo.toml @@ -5,13 +5,14 @@ edition = "2021" default-run = "meteor-edge-client" [features] -default = [] +default = ["opencv_camera"] -# Production hardware features -hardware_camera = [] +# Camera backends (mutually exclusive) +opencv_camera = ["dep:opencv"] +nokhwa_camera = ["dep:nokhwa"] + +# Other features opencv_integration = [] - -# Debug and development features debug_logging = [] performance_profiling = [] @@ -22,6 +23,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" toml = "0.8" tokio = { version = "1.0", features = ["full"] } +tokio-util = { version = "0.7", features = ["full"] } anyhow = "1.0" thiserror = "1.0" dirs = "5.0" @@ -63,6 +65,15 @@ mac_address = "1.1" # Camera interface dependencies async-trait = "0.1" +# Cross-platform camera capture backends (optional, mutually exclusive) +# OpenCV - stable, proper resource release +opencv = { version = "0.93", optional = true, default-features = false, features = ["videoio"] } + +# nokhwa - alternative backend (has resource release issues on macOS) +# - input-native: Use native camera backend (V4L2 on Linux, AVFoundation on macOS) +# - output-threaded: Enable threaded/callback based camera for proper async support +nokhwa = { version = "0.10", features = ["input-native", "output-threaded"], optional = true } + # Optional video processing backends [target.'cfg(windows)'.dependencies] diff --git a/meteor-edge-client/src/camera/factory.rs b/meteor-edge-client/src/camera/factory.rs index 47647b3..7d3f2e5 100644 --- a/meteor-edge-client/src/camera/factory.rs +++ b/meteor-edge-client/src/camera/factory.rs @@ -4,10 +4,16 @@ use std::path::PathBuf; use std::sync::Arc; use super::interface::{CameraConfig, CameraInterface, CameraType}; -use super::production::ProductionCamera; use super::video_file::VideoFileCamera; use crate::memory::frame_pool::HierarchicalFramePool; +// Import the appropriate camera backend +#[cfg(feature = "opencv_camera")] +use super::opencv_camera::OpenCVCamera; + +#[cfg(feature = "nokhwa_camera")] +use super::production::ProductionCamera; + /// Factory for creating camera instances from configuration or specs pub struct CameraFactory { frame_pool: Arc, @@ -19,18 +25,10 @@ impl CameraFactory { } pub fn create_camera(&self, config: CameraConfig) -> Result> { - match config.camera_type { + match config.camera_type.clone() { CameraType::Production { device_id, backend } => { println!("šŸ­ Opening hardware camera: {} ({})", device_id, backend); - let camera = ProductionCamera::new( - device_id, - backend, - config.resolution, - config.fps, - self.frame_pool.clone(), - ) - .context("Failed to create hardware camera")?; - Ok(Box::new(camera)) + self.create_hardware_camera(config, device_id, backend) } CameraType::VideoFile { path, @@ -51,6 +49,51 @@ impl CameraFactory { } } + /// Create hardware camera using the appropriate backend + #[cfg(feature = "opencv_camera")] + fn create_hardware_camera( + &self, + config: CameraConfig, + _device_id: String, + _backend: String, + ) -> Result> { + println!(" Using OpenCV backend"); + let camera = OpenCVCamera::new(config) + .context("Failed to create OpenCV camera")?; + Ok(Box::new(camera)) + } + + #[cfg(all(feature = "nokhwa_camera", not(feature = "opencv_camera")))] + fn create_hardware_camera( + &self, + config: CameraConfig, + device_id: String, + backend: String, + ) -> Result> { + println!(" Using nokhwa backend"); + let camera = ProductionCamera::new( + device_id, + backend, + config.resolution, + config.fps, + self.frame_pool.clone(), + ) + .context("Failed to create nokhwa camera")?; + Ok(Box::new(camera)) + } + + #[cfg(not(any(feature = "opencv_camera", feature = "nokhwa_camera")))] + fn create_hardware_camera( + &self, + _config: CameraConfig, + _device_id: String, + _backend: String, + ) -> Result> { + anyhow::bail!( + "No camera backend available. Build with --features opencv_camera or --features nokhwa_camera" + ) + } + pub fn create_from_spec(&self, spec: &str) -> Result> { let config = self.parse_camera_spec(spec)?; self.create_camera(config) diff --git a/meteor-edge-client/src/camera/interface.rs b/meteor-edge-client/src/camera/interface.rs index 5955aed..0e728de 100644 --- a/meteor-edge-client/src/camera/interface.rs +++ b/meteor-edge-client/src/camera/interface.rs @@ -4,6 +4,7 @@ use anyhow::Result; use async_trait::async_trait; use chrono::{DateTime, Utc}; use std::sync::Arc; +use tokio_util::sync::CancellationToken; use crate::memory::frame_data::{FrameData, FrameFormat}; use std::path::PathBuf; @@ -33,6 +34,11 @@ pub trait CameraInterface: Send + Sync { fn shutdown( &mut self, ) -> std::pin::Pin> + Send + '_>>; + + /// Optional: supply a cancellation token for cooperative shutdown of blocking work + fn set_cancellation_token(&mut self, _token: CancellationToken) { + // Default no-op for cameras that don't need cancellation support + } } /// Represents a captured frame with metadata diff --git a/meteor-edge-client/src/camera/mod.rs b/meteor-edge-client/src/camera/mod.rs index f5f1da5..5e52015 100644 --- a/meteor-edge-client/src/camera/mod.rs +++ b/meteor-edge-client/src/camera/mod.rs @@ -3,20 +3,35 @@ pub mod factory; /// This module provides a unified interface while keeping implementation details isolated // Core interfaces and types (always available) pub mod interface; -pub mod production; +pub mod pixel_convert; mod video_file; +// Camera backends - conditionally compiled +#[cfg(feature = "opencv_camera")] +pub mod opencv_camera; + +#[cfg(feature = "nokhwa_camera")] +pub mod production; + // Re-export core types for convenience pub use factory::{print_available_cameras, CameraFactory}; pub use interface::{ CameraConfig, CameraInterface, CameraMetadata, CameraType, CapturedFrame, FrameMetadata, }; -pub use production::{ProductionCamera, ProductionCameraCapabilities}; pub use video_file::VideoFileCamera; +// Export the active camera backend +#[cfg(feature = "opencv_camera")] +pub use opencv_camera::OpenCVCamera; + +#[cfg(feature = "nokhwa_camera")] +pub use production::{ProductionCamera, ProductionCameraCapabilities}; + use anyhow::Result; use std::sync::Arc; -use tokio::time::{sleep, Duration}; +use tokio::sync::broadcast; +use tokio::time::{sleep, timeout, Duration}; +use tokio_util::sync::CancellationToken; use crate::core::events::{EventBus, FrameCapturedEvent}; use crate::memory::frame_pool::HierarchicalFramePool; @@ -77,12 +92,18 @@ impl CameraController { } /// Start the camera capture loop - pub async fn run(&mut self) -> Result<()> { + pub async fn run( + &mut self, + mut shutdown_rx: broadcast::Receiver<()>, + cancellation_token: CancellationToken, + ) -> Result<()> { println!("šŸŽ„ Starting camera controller..."); // Initialize the camera self.camera.initialize().await?; self.is_running = true; + self.camera + .set_cancellation_token(cancellation_token.clone()); let metadata = self.camera.get_metadata(); println!( @@ -103,45 +124,69 @@ impl CameraController { // Calculate frame timing let frame_duration = Duration::from_secs_f64(1.0 / metadata.target_fps); - // Main capture loop - while self.is_running && self.camera.is_running() { + // Main capture loop with shutdown signal support + loop { + if !self.is_running || !self.camera.is_running() { + break; + } + let start_time = tokio::time::Instant::now(); - match self.camera.capture_frame().await { - Ok(captured_frame) => { - // Create event from captured frame - let event = FrameCapturedEvent::new( - captured_frame.frame_number, - captured_frame.data.clone(), - ); - - // Publish frame event - if let Err(e) = self.event_bus.publish_frame_captured(event) { - eprintln!("āŒ Failed to publish frame event: {}", e); - continue; - } - - // Log progress periodically - if captured_frame.frame_number % 30 == 0 { - println!("šŸ“ø Captured {} frames", captured_frame.frame_number); - } + // Use select! to check for shutdown while capturing frames + tokio::select! { + _ = shutdown_rx.recv() => { + println!("šŸ“· Camera controller received shutdown signal"); + break; } - Err(e) => { - eprintln!( - "āŒ Error capturing frame {}: {}", - self.camera.frame_count(), - e - ); + _ = cancellation_token.cancelled() => { + println!("šŸ“· Camera controller received cancellation request"); + break; + } + frame_result = self.camera.capture_frame() => { + match frame_result { + Ok(captured_frame) => { + // Create event from captured frame + let event = FrameCapturedEvent::new( + captured_frame.frame_number, + captured_frame.data.clone(), + ); - // If this is a production camera and capture fails, we should probably exit - if metadata.camera_type != "VideoFile" { - eprintln!("āŒ Hardware camera failure, stopping..."); - break; + // Publish frame event + if let Err(e) = self.event_bus.publish_frame_captured(event) { + eprintln!("āŒ Failed to publish frame event: {}", e); + continue; + } + + // Log progress periodically + if captured_frame.frame_number % 30 == 0 { + println!("šŸ“ø Captured {} frames", captured_frame.frame_number); + } + } + Err(e) => { + // Treat cancellation as a clean exit + let err_text = e.to_string(); + if err_text.contains("Capture cancelled") { + println!("šŸ“· Camera capture cancelled, stopping loop"); + break; + } else { + eprintln!( + "āŒ Error capturing frame {}: {}", + self.camera.frame_count(), + e + ); + } + + // If this is a production camera and capture fails, we should probably exit + if metadata.camera_type != "VideoFile" { + eprintln!("āŒ Hardware camera failure, stopping..."); + break; + } + + // For video files, wait a bit and continue + sleep(Duration::from_secs(1)).await; + continue; + } } - - // For video files, wait a bit and continue - sleep(Duration::from_secs(1)).await; - continue; } } @@ -153,7 +198,13 @@ impl CameraController { } // Shutdown camera - self.camera.shutdown().await?; + cancellation_token.cancel(); + match timeout(Duration::from_millis(500), self.camera.shutdown()).await { + Ok(result) => result?, + Err(_) => { + eprintln!("āš ļø Camera shutdown timed out, force dropping camera handle"); + } + } self.is_running = false; println!("šŸŽ¬ Camera controller stopped"); diff --git a/meteor-edge-client/src/camera/opencv_camera.rs b/meteor-edge-client/src/camera/opencv_camera.rs new file mode 100644 index 0000000..9eff78f --- /dev/null +++ b/meteor-edge-client/src/camera/opencv_camera.rs @@ -0,0 +1,363 @@ +//! OpenCV-based camera implementation +//! +//! Uses opencv::videoio::VideoCapture for camera capture. +//! Provides reliable resource management with explicit release() on shutdown. +//! +//! On macOS, camera authorization must happen on the main thread. +//! Use `open_capture_on_main_thread()` before starting tokio runtime, +//! then `set_pre_opened_capture()` to store it for later use. + +use anyhow::{Context, Result}; +use chrono::Utc; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; + +use opencv::core::Mat; +use opencv::prelude::*; +use opencv::videoio::{ + VideoCapture, CAP_ANY, CAP_PROP_FRAME_WIDTH, CAP_PROP_FRAME_HEIGHT, CAP_PROP_FPS, +}; + +// Thread-safe storage for pre-opened capture (macOS main thread requirement) +static PRE_OPENED_CAPTURE: Mutex> = Mutex::new(None); + +/// Store a pre-opened capture for later use by OpenCVCamera::new() +/// Call this from main thread BEFORE starting tokio runtime +pub fn set_pre_opened_capture(capture: VideoCapture) { + *PRE_OPENED_CAPTURE.lock().unwrap() = Some(capture); +} + +/// Take the pre-opened capture (can only be called once) +fn take_pre_opened_capture() -> Option { + PRE_OPENED_CAPTURE.lock().unwrap().take() +} + +use super::interface::{ + CameraConfig, CameraInterface, CameraMetadata, CameraType, CapturedFrame, FrameMetadata, +}; +use crate::memory::frame_data::{FrameData, FrameFormat}; + +/// OpenCV-based camera implementation +pub struct OpenCVCamera { + capture: Option, + config: CameraConfig, + frame_count: u64, + is_running: bool, + actual_width: u32, + actual_height: u32, + actual_fps: f64, +} + +// Safety: OpenCVCamera is only accessed from one thread at a time through &mut self +// The VideoCapture raw pointer is not shared across threads +unsafe impl Send for OpenCVCamera {} +unsafe impl Sync for OpenCVCamera {} + +impl OpenCVCamera { + /// Create a new OpenCV camera instance + /// Will use pre-opened capture from main thread if available (macOS) + pub fn new(config: CameraConfig) -> Result { + // Check for pre-opened capture from main thread (macOS authorization) + if let Some(capture) = take_pre_opened_capture() { + println!("šŸŽ„ Using pre-opened camera from main thread (macOS)"); + return Self::with_capture(capture, config); + } + + println!("šŸŽ„ Creating OpenCV camera..."); + Ok(Self { + capture: None, + config, + frame_count: 0, + is_running: false, + actual_width: 0, + actual_height: 0, + actual_fps: 0.0, + }) + } + + /// Open camera on main thread (required for macOS authorization) + /// Call this BEFORE starting tokio runtime + pub fn open_capture_on_main_thread(device_id: i32) -> Result { + println!("šŸŽ„ Opening camera on main thread (macOS authorization)..."); + println!(" Device ID: {}", device_id); + + let cam = VideoCapture::new(device_id, CAP_ANY) + .context("Failed to create VideoCapture")?; + + if !cam.is_opened()? { + anyhow::bail!("Failed to open camera device {}", device_id); + } + + println!("āœ… Camera opened successfully on main thread"); + Ok(cam) + } + + /// Create camera with pre-opened capture handle + pub fn with_capture(capture: VideoCapture, config: CameraConfig) -> Result { + // Get actual resolution from the opened capture + let actual_width = capture.get(CAP_PROP_FRAME_WIDTH)? as u32; + let actual_height = capture.get(CAP_PROP_FRAME_HEIGHT)? as u32; + let actual_fps = capture.get(CAP_PROP_FPS)?; + + println!("šŸŽ„ Creating OpenCV camera with pre-opened capture..."); + println!(" Resolution: {}x{} @ {:.1} FPS", actual_width, actual_height, actual_fps); + + Ok(Self { + capture: Some(capture), + config, + frame_count: 0, + is_running: false, + actual_width, + actual_height, + actual_fps, + }) + } + + /// Get device ID from config + fn get_device_id(&self) -> i32 { + match &self.config.camera_type { + CameraType::Production { device_id, .. } => { + device_id.parse().unwrap_or(0) + } + _ => 0, + } + } + + /// Convert OpenCV Mat to FrameData + fn mat_to_frame_data(&self, mat: &Mat) -> Result { + let rows = mat.rows(); + let cols = mat.cols(); + let channels = mat.channels(); + + if rows <= 0 || cols <= 0 { + anyhow::bail!("Invalid frame dimensions: {}x{}", cols, rows); + } + + let width = cols as u32; + let height = rows as u32; + + // Get raw data from Mat + let data_ptr = mat.data(); + let data_len = (rows * cols * channels) as usize; + + if data_ptr.is_null() { + anyhow::bail!("Mat data pointer is null"); + } + + // Copy data from Mat (OpenCV uses BGR format by default) + let bgr_data = unsafe { std::slice::from_raw_parts(data_ptr, data_len) }; + + // Convert BGR to grayscale for meteor detection + let grayscale_data = if channels == 3 { + bgr_to_grayscale(bgr_data, width as usize, height as usize) + } else if channels == 1 { + bgr_data.to_vec() + } else { + // For other formats, just use first channel or convert + bgr_data.iter().step_by(channels as usize).copied().collect() + }; + + Ok(FrameData::new( + grayscale_data, + width, + height, + FrameFormat::Grayscale, + )) + } +} + +/// Convert BGR to grayscale using standard luminance formula +fn bgr_to_grayscale(bgr_data: &[u8], width: usize, height: usize) -> Vec { + let mut grayscale = Vec::with_capacity(width * height); + + for pixel in bgr_data.chunks_exact(3) { + let b = pixel[0] as f32; + let g = pixel[1] as f32; + let r = pixel[2] as f32; + // ITU-R BT.601 luminance formula + let gray = (0.299 * r + 0.587 * g + 0.114 * b) as u8; + grayscale.push(gray); + } + + grayscale +} + +impl CameraInterface for OpenCVCamera { + fn initialize( + &mut self, + ) -> Pin> + Send + '_>> { + Box::pin(async move { + // Check if capture was pre-opened on main thread (macOS authorization) + if self.capture.is_some() { + println!("šŸŽ„ OpenCV camera already opened (main thread initialization)"); + println!(" Resolution: {}x{} @ {:.1} FPS", + self.actual_width, self.actual_height, self.actual_fps); + + // Configure resolution and FPS + if let Some(cam) = self.capture.as_mut() { + let _ = cam.set(CAP_PROP_FRAME_WIDTH, self.config.resolution.0 as f64); + let _ = cam.set(CAP_PROP_FRAME_HEIGHT, self.config.resolution.1 as f64); + let _ = cam.set(CAP_PROP_FPS, self.config.fps); + + // Read back actual values + self.actual_width = cam.get(CAP_PROP_FRAME_WIDTH)? as u32; + self.actual_height = cam.get(CAP_PROP_FRAME_HEIGHT)? as u32; + self.actual_fps = cam.get(CAP_PROP_FPS)?; + } + + println!("āœ… OpenCV camera ready"); + println!(" Actual resolution: {}x{}", self.actual_width, self.actual_height); + println!(" Actual FPS: {:.1}", self.actual_fps); + + self.is_running = true; + return Ok(()); + } + + // Fallback: open camera in async context (works on Linux/Windows) + let device_id = self.get_device_id(); + println!("šŸŽ„ Initializing OpenCV camera..."); + println!(" Device ID: {}", device_id); + println!(" Target resolution: {}x{}", self.config.resolution.0, self.config.resolution.1); + println!(" Target FPS: {}", self.config.fps); + + // Create VideoCapture + // Note: VideoCapture::new is blocking, but we're in an async context + // On macOS, this may fail if not called from main thread + let mut cam = VideoCapture::new(device_id, CAP_ANY) + .context("Failed to create VideoCapture")?; + + if !cam.is_opened()? { + anyhow::bail!("Failed to open camera device {}", device_id); + } + + // Set resolution and FPS + let _ = cam.set(CAP_PROP_FRAME_WIDTH, self.config.resolution.0 as f64); + let _ = cam.set(CAP_PROP_FRAME_HEIGHT, self.config.resolution.1 as f64); + let _ = cam.set(CAP_PROP_FPS, self.config.fps); + + // Read back actual values + self.actual_width = cam.get(CAP_PROP_FRAME_WIDTH)? as u32; + self.actual_height = cam.get(CAP_PROP_FRAME_HEIGHT)? as u32; + self.actual_fps = cam.get(CAP_PROP_FPS)?; + + println!("āœ… OpenCV camera initialized successfully"); + println!(" Actual resolution: {}x{}", self.actual_width, self.actual_height); + println!(" Actual FPS: {:.1}", self.actual_fps); + + self.capture = Some(cam); + self.is_running = true; + + Ok(()) + }) + } + + fn capture_frame( + &mut self, + ) -> Pin> + Send + '_>> { + Box::pin(async move { + let cam = self.capture.as_mut() + .ok_or_else(|| anyhow::anyhow!("Camera not initialized"))?; + + let mut mat = Mat::default(); + + // Read frame (blocking call) + let success = cam.read(&mut mat)?; + + if !success || mat.empty() { + anyhow::bail!("Failed to capture frame - empty or read failed"); + } + + self.frame_count += 1; + + // Convert Mat to FrameData + let frame_data = self.mat_to_frame_data(&mat)?; + + // Log progress periodically + if self.frame_count == 1 || self.frame_count % 30 == 0 { + println!("šŸ“ø Frame {} captured ({}x{})", + self.frame_count, frame_data.width, frame_data.height); + } + + Ok(CapturedFrame::new( + Arc::new(frame_data), + self.frame_count, + Utc::now(), + FrameMetadata::default(), + )) + }) + } + + fn get_metadata(&self) -> CameraMetadata { + CameraMetadata { + camera_id: self.get_device_id().to_string(), + camera_type: "OpenCV".to_string(), + supported_formats: vec![FrameFormat::Grayscale, FrameFormat::RGB888], + max_resolution: (1920, 1080), + current_resolution: (self.actual_width, self.actual_height), + target_fps: self.actual_fps, + is_real_time: true, + total_frames: None, + } + } + + fn is_running(&self) -> bool { + self.is_running + } + + fn frame_count(&self) -> u64 { + self.frame_count + } + + fn shutdown( + &mut self, + ) -> Pin> + Send + '_>> { + Box::pin(async move { + println!("šŸ›‘ Shutting down OpenCV camera..."); + + if let Some(mut cam) = self.capture.take() { + // Explicitly release the camera resource + // This is the key advantage over nokhwa - reliable resource cleanup + cam.release().context("Failed to release VideoCapture")?; + println!("āœ… OpenCV camera released successfully"); + } + + self.is_running = false; + Ok(()) + }) + } +} + +impl Drop for OpenCVCamera { + fn drop(&mut self) { + // Ensure camera is released even if shutdown wasn't called + if let Some(mut cam) = self.capture.take() { + let _ = cam.release(); + println!("šŸ—‘ļø OpenCV camera dropped and released"); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_bgr_to_grayscale() { + // Test BGR to grayscale conversion + let bgr = vec![ + 0, 0, 255, // Pure red -> should be ~76 + 0, 255, 0, // Pure green -> should be ~150 + 255, 0, 0, // Pure blue -> should be ~29 + 255, 255, 255, // White -> should be 255 + ]; + + let gray = bgr_to_grayscale(&bgr, 4, 1); + + assert_eq!(gray.len(), 4); + // Check approximate values (luminance formula) + assert!(gray[0] > 70 && gray[0] < 80, "Red should be ~76, got {}", gray[0]); + assert!(gray[1] > 145 && gray[1] < 155, "Green should be ~150, got {}", gray[1]); + assert!(gray[2] > 25 && gray[2] < 35, "Blue should be ~29, got {}", gray[2]); + assert_eq!(gray[3], 255, "White should be 255"); + } +} diff --git a/meteor-edge-client/src/camera/pixel_convert.rs b/meteor-edge-client/src/camera/pixel_convert.rs new file mode 100644 index 0000000..39fe50f --- /dev/null +++ b/meteor-edge-client/src/camera/pixel_convert.rs @@ -0,0 +1,244 @@ +//! Pixel format conversion utilities for camera capture +//! +//! This module provides efficient conversion from various camera pixel formats +//! to grayscale, which is the primary format used by the Vida detection algorithm. + +/// Convert RGB888 (24-bit) to grayscale using ITU-R BT.601 coefficients +/// Y = 0.299*R + 0.587*G + 0.114*B +pub fn rgb888_to_grayscale(rgb_data: &[u8], width: usize, height: usize) -> Vec { + let expected_size = width * height * 3; + if rgb_data.len() < expected_size { + return vec![0u8; width * height]; + } + + let mut grayscale = Vec::with_capacity(width * height); + + for pixel in rgb_data.chunks_exact(3) { + let r = pixel[0] as u32; + let g = pixel[1] as u32; + let b = pixel[2] as u32; + // BT.601 coefficients: 0.299, 0.587, 0.114 + // Using fixed-point: (77*R + 150*G + 29*B) >> 8 + let y = ((77 * r + 150 * g + 29 * b) >> 8) as u8; + grayscale.push(y); + } + + grayscale +} + +/// Convert YUYV (YUV 4:2:2 packed) to grayscale +/// YUYV format: Y0 U0 Y1 V0 (4 bytes for 2 pixels) +/// We only need the Y channel for grayscale +pub fn yuyv_to_grayscale(yuyv_data: &[u8], width: usize, height: usize) -> Vec { + let expected_size = width * height * 2; + if yuyv_data.len() < expected_size { + return vec![0u8; width * height]; + } + + let mut grayscale = Vec::with_capacity(width * height); + + // YUYV: [Y0, U, Y1, V, Y2, U, Y3, V, ...] + // Extract Y values at positions 0, 2, 4, 6, ... + for chunk in yuyv_data.chunks_exact(4) { + grayscale.push(chunk[0]); // Y0 + grayscale.push(chunk[2]); // Y1 + } + + grayscale +} + +/// Convert NV12 (YUV 4:2:0 semi-planar) to grayscale +/// NV12 format: Y plane followed by interleaved UV plane +/// Y plane size: width * height +/// UV plane size: width * height / 2 +pub fn nv12_to_grayscale(nv12_data: &[u8], width: usize, height: usize) -> Vec { + let y_size = width * height; + if nv12_data.len() < y_size { + return vec![0u8; y_size]; + } + + // NV12: Y plane is already grayscale, just copy it + nv12_data[..y_size].to_vec() +} + +/// Convert NV21 (YUV 4:2:0 semi-planar, VU order) to grayscale +/// Same as NV12 for grayscale conversion since we only need Y plane +pub fn nv21_to_grayscale(nv21_data: &[u8], width: usize, height: usize) -> Vec { + nv12_to_grayscale(nv21_data, width, height) +} + +/// Convert I420 (YUV 4:2:0 planar) to grayscale +/// I420 format: Y plane, U plane, V plane (all separate) +pub fn i420_to_grayscale(i420_data: &[u8], width: usize, height: usize) -> Vec { + nv12_to_grayscale(i420_data, width, height) +} + +/// Convert BGR888 (24-bit, OpenCV format) to grayscale +pub fn bgr888_to_grayscale(bgr_data: &[u8], width: usize, height: usize) -> Vec { + let expected_size = width * height * 3; + if bgr_data.len() < expected_size { + return vec![0u8; width * height]; + } + + let mut grayscale = Vec::with_capacity(width * height); + + for pixel in bgr_data.chunks_exact(3) { + let b = pixel[0] as u32; + let g = pixel[1] as u32; + let r = pixel[2] as u32; + let y = ((77 * r + 150 * g + 29 * b) >> 8) as u8; + grayscale.push(y); + } + + grayscale +} + +/// Convert RGBA8888 (32-bit with alpha) to grayscale +pub fn rgba8888_to_grayscale(rgba_data: &[u8], width: usize, height: usize) -> Vec { + let expected_size = width * height * 4; + if rgba_data.len() < expected_size { + return vec![0u8; width * height]; + } + + let mut grayscale = Vec::with_capacity(width * height); + + for pixel in rgba_data.chunks_exact(4) { + let r = pixel[0] as u32; + let g = pixel[1] as u32; + let b = pixel[2] as u32; + // Alpha channel (pixel[3]) is ignored + let y = ((77 * r + 150 * g + 29 * b) >> 8) as u8; + grayscale.push(y); + } + + grayscale +} + +/// Decode MJPEG frame to grayscale +/// Note: This is a simplified implementation that requires the `image` crate +#[cfg(feature = "nokhwa_camera")] +pub fn mjpeg_to_grayscale(jpeg_data: &[u8], width: usize, height: usize) -> Vec { + use image::codecs::jpeg::JpegDecoder; + use image::{DynamicImage, ImageDecoder}; + use std::io::Cursor; + + let cursor = Cursor::new(jpeg_data); + match JpegDecoder::new(cursor) { + Ok(decoder) => { + let (w, h) = decoder.dimensions(); + if let Ok(img) = DynamicImage::from_decoder(decoder) { + let gray = img.to_luma8(); + return gray.into_raw(); + } + vec![0u8; (w * h) as usize] + } + Err(_) => { + // Return black frame on decode error + vec![0u8; width * height] + } + } +} + +#[cfg(not(feature = "nokhwa_camera"))] +pub fn mjpeg_to_grayscale(_jpeg_data: &[u8], width: usize, height: usize) -> Vec { + // Without image crate, return placeholder + vec![128u8; width * height] +} + +/// Pixel format enum for conversion dispatch +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum PixelFormat { + RGB888, + BGR888, + RGBA8888, + YUYV, + NV12, + NV21, + I420, + MJPEG, + Grayscale, +} + +/// Convert any supported pixel format to grayscale +pub fn to_grayscale( + data: &[u8], + width: usize, + height: usize, + format: PixelFormat, +) -> Vec { + match format { + PixelFormat::RGB888 => rgb888_to_grayscale(data, width, height), + PixelFormat::BGR888 => bgr888_to_grayscale(data, width, height), + PixelFormat::RGBA8888 => rgba8888_to_grayscale(data, width, height), + PixelFormat::YUYV => yuyv_to_grayscale(data, width, height), + PixelFormat::NV12 => nv12_to_grayscale(data, width, height), + PixelFormat::NV21 => nv21_to_grayscale(data, width, height), + PixelFormat::I420 => i420_to_grayscale(data, width, height), + PixelFormat::MJPEG => mjpeg_to_grayscale(data, width, height), + PixelFormat::Grayscale => data.to_vec(), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_rgb_to_grayscale() { + // White pixel (255, 255, 255) should become ~255 + let rgb = vec![255u8, 255, 255]; + let gray = rgb888_to_grayscale(&rgb, 1, 1); + assert_eq!(gray.len(), 1); + assert!(gray[0] >= 254); // Allow for rounding + + // Black pixel (0, 0, 0) should become 0 + let rgb = vec![0u8, 0, 0]; + let gray = rgb888_to_grayscale(&rgb, 1, 1); + assert_eq!(gray[0], 0); + + // Pure red (255, 0, 0) should become ~77 + let rgb = vec![255u8, 0, 0]; + let gray = rgb888_to_grayscale(&rgb, 1, 1); + assert!(gray[0] >= 75 && gray[0] <= 78); + } + + #[test] + fn test_yuyv_to_grayscale() { + // YUYV: Y0=100, U=128, Y1=200, V=128 + let yuyv = vec![100u8, 128, 200, 128]; + let gray = yuyv_to_grayscale(&yuyv, 2, 1); + assert_eq!(gray.len(), 2); + assert_eq!(gray[0], 100); + assert_eq!(gray[1], 200); + } + + #[test] + fn test_nv12_to_grayscale() { + // NV12: Y plane followed by UV + let y_data = vec![50u8, 100, 150, 200]; + let uv_data = vec![128u8, 128]; // UV for 2x2 image + let mut nv12 = y_data.clone(); + nv12.extend(uv_data); + + let gray = nv12_to_grayscale(&nv12, 2, 2); + assert_eq!(gray, y_data); + } + + #[test] + fn test_to_grayscale_dispatch() { + let rgb = vec![128u8, 128, 128]; + let gray = to_grayscale(&rgb, 1, 1, PixelFormat::RGB888); + assert_eq!(gray.len(), 1); + // Mid-gray should stay mid-gray + assert!(gray[0] >= 126 && gray[0] <= 130); + } + + #[test] + fn test_empty_input_handling() { + let empty: Vec = vec![]; + let gray = rgb888_to_grayscale(&empty, 10, 10); + // Should return black frame of expected size + assert_eq!(gray.len(), 100); + assert!(gray.iter().all(|&v| v == 0)); + } +} diff --git a/meteor-edge-client/src/camera/production.rs b/meteor-edge-client/src/camera/production.rs index 6b8acc6..df56d88 100644 --- a/meteor-edge-client/src/camera/production.rs +++ b/meteor-edge-client/src/camera/production.rs @@ -1,35 +1,205 @@ -/// Production camera implementation without any simulation dependencies -/// This module contains only real hardware camera logic +//! Production camera implementation with nokhwa cross-platform support +//! +//! This module provides real hardware camera capture using the nokhwa library, +//! which supports V4L2 (Linux), AVFoundation (macOS), and DirectShow (Windows). +//! +//! Uses CallbackCamera for proper threaded frame capture on all platforms. + use anyhow::{Context, Result}; -// Removed async_trait since we're using boxed futures now use chrono::Utc; +use std::pin::Pin; use std::sync::Arc; -use tokio::time::{sleep, Duration}; +use std::task::{Context as TaskContext, Poll}; +use tokio_util::sync::CancellationToken; use super::interface::{CameraInterface, CameraMetadata, CapturedFrame, FrameMetadata}; use crate::memory::frame_data::{FrameData, FrameFormat}; use crate::memory::frame_pool::HierarchicalFramePool; +#[cfg(feature = "nokhwa_camera")] +use super::pixel_convert::{self, PixelFormat}; + +#[cfg(feature = "nokhwa_camera")] +use nokhwa::{ + pixel_format::RgbFormat, + utils::{CameraIndex, RequestedFormat, RequestedFormatType}, + threaded::CallbackCamera, +}; + +#[cfg(feature = "nokhwa_camera")] +use std::sync::Mutex; + +#[cfg(feature = "nokhwa_camera")] +struct BlockingCapture { + handle: Option, u32, u32)>>>, +} + +#[cfg(feature = "nokhwa_camera")] +impl BlockingCapture { + fn new( + camera: Arc>, + cancel_token: Option, + ) -> Self { + let handle = tokio::task::spawn_blocking(move || -> Result<(Vec, u32, u32)> { + if let Some(token) = cancel_token.as_ref() { + if token.is_cancelled() { + return Err(anyhow::anyhow!("Capture cancelled")); + } + } + + let mut camera_guard = camera + .lock() + .map_err(|e| anyhow::anyhow!("Failed to lock camera: {}", e))?; + + // Use last_frame with retry (non-blocking approach) + let mut buffer = None; + let max_retries = 50; // 50 * 20ms = 1 second max wait + for attempt in 0..max_retries { + if let Some(token) = cancel_token.as_ref() { + if token.is_cancelled() { + return Err(anyhow::anyhow!("Capture cancelled")); + } + } + + match camera_guard.last_frame() { + Ok(frame) => { + buffer = Some(frame); + break; + } + Err(e) => { + if attempt == max_retries - 1 { + return Err(anyhow::anyhow!( + "Failed to get frame after {} retries: {}", + max_retries, + e + )); + } + // Brief sleep between retries + std::thread::sleep(std::time::Duration::from_millis(20)); + } + } + } + let buffer = buffer.context("No frame available from camera")?; + + let width = buffer.resolution().width(); + let height = buffer.resolution().height(); + + if let Some(token) = cancel_token.as_ref() { + if token.is_cancelled() { + return Err(anyhow::anyhow!("Capture cancelled")); + } + } + + // Warn if frame appears empty (likely permission issue) + if width == 0 || height == 0 || buffer.buffer().is_empty() { + static WARNED: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false); + if !WARNED.swap(true, std::sync::atomic::Ordering::Relaxed) { + eprintln!("āš ļø Warning: Camera frames appear empty. This may be due to:"); + eprintln!(" - macOS camera permission not granted to terminal app"); + eprintln!(" - Go to System Preferences > Privacy & Security > Camera"); + eprintln!(" - Grant access to Terminal or iTerm2"); + } + } + + // Get raw bytes and convert to grayscale + let raw_data = buffer.buffer(); + + // Determine pixel format based on buffer format + let pixel_format = match buffer.source_frame_format() { + nokhwa::utils::FrameFormat::MJPEG => PixelFormat::MJPEG, + nokhwa::utils::FrameFormat::YUYV => PixelFormat::YUYV, + nokhwa::utils::FrameFormat::NV12 => PixelFormat::NV12, + nokhwa::utils::FrameFormat::RAWRGB => PixelFormat::RGB888, + _ => PixelFormat::RGB888, // Default to RGB + }; + + // Try to decode to RGB first, then convert to grayscale + let grayscale_data = if let Ok(rgb_buffer) = buffer.decode_image::() { + // Decoded successfully to RGB + pixel_convert::rgb888_to_grayscale( + rgb_buffer.as_raw(), + width as usize, + height as usize, + ) + } else { + // Fallback to direct conversion + pixel_convert::to_grayscale( + raw_data, + width as usize, + height as usize, + pixel_format, + ) + }; + + Ok((grayscale_data, width, height)) + }); + + Self { + handle: Some(handle), + } + } +} + +#[cfg(feature = "nokhwa_camera")] +impl Drop for BlockingCapture { + fn drop(&mut self) { + if let Some(handle) = self.handle.take() { + handle.abort(); + } + } +} + +#[cfg(feature = "nokhwa_camera")] +impl std::future::Future for BlockingCapture { + type Output = Result<(Vec, u32, u32)>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll { + let handle = self.handle.as_mut().expect("join handle missing"); + match Pin::new(handle).poll(cx) { + Poll::Ready(res) => { + self.handle = None; + match res { + Ok(inner) => Poll::Ready(inner), + Err(e) => Poll::Ready(Err(anyhow::anyhow!("Capture task failed: {}", e))), + } + } + Poll::Pending => Poll::Pending, + } + } +} + /// Production camera implementation for real hardware +/// Uses CallbackCamera for thread-safe frame capture pub struct ProductionCamera { - /// Device identifier (e.g., "/dev/video0" or device index) + /// Device identifier (e.g., "0" for first camera, or "/dev/video0") device_id: String, - /// Backend system (v4l2, directshow, etc.) + /// Backend system (nokhwa, v4l2, etc.) + #[allow(dead_code)] backend: String, /// Current resolution resolution: (u32, u32), /// Target frame rate target_fps: f64, /// Frame pool for memory management + #[allow(dead_code)] frame_pool: Arc, /// Current frame counter frame_counter: u64, /// Whether the camera is currently running is_running: bool, + /// Cancellation token for cooperative shutdown + cancellation_token: Option, /// Camera metadata metadata: CameraMetadata, + /// Threaded camera instance (when feature enabled) + #[cfg(feature = "nokhwa_camera")] + callback_camera: Option>>, } +// ProductionCamera is Send + Sync because CallbackCamera is wrapped in Arc> +unsafe impl Send for ProductionCamera {} +unsafe impl Sync for ProductionCamera {} + impl ProductionCamera { /// Create a new production camera instance pub fn new( @@ -42,7 +212,12 @@ impl ProductionCamera { let metadata = CameraMetadata { camera_id: device_id.clone(), camera_type: format!("Production-{}", backend), - supported_formats: vec![FrameFormat::JPEG, FrameFormat::RGB888, FrameFormat::YUV420], + supported_formats: vec![ + FrameFormat::Grayscale, + FrameFormat::RGB888, + FrameFormat::YUV420, + FrameFormat::JPEG, + ], max_resolution: (1920, 1080), current_resolution: resolution, target_fps, @@ -58,45 +233,34 @@ impl ProductionCamera { frame_pool, frame_counter: 0, is_running: false, + cancellation_token: None, metadata, + #[cfg(feature = "nokhwa_camera")] + callback_camera: None, }) } - /// Initialize camera hardware (placeholder for real implementation) - async fn initialize_hardware(&mut self) -> Result<()> { - println!("šŸŽ„ Initializing production camera hardware..."); - println!(" Device ID: {}", self.device_id); - println!(" Backend: {}", self.backend); - println!(" Resolution: {}x{}", self.resolution.0, self.resolution.1); - println!(" Target FPS: {}", self.target_fps); + /// Parse device ID to camera index + #[cfg(feature = "nokhwa_camera")] + fn parse_camera_index(device_id: &str) -> Result { + // Support various device ID formats: + // - "0", "1", etc. -> index + // - "/dev/video0" -> extract index + // - "device:0" -> extract index - // TODO: Replace with actual camera initialization - // This is where you would: - // 1. Open camera device - // 2. Set resolution and format - // 3. Configure frame rate - // 4. Allocate capture buffers - // 5. Validate camera capabilities + let index_str = if device_id.starts_with("/dev/video") { + device_id.trim_start_matches("/dev/video") + } else if device_id.starts_with("device:") { + device_id.trim_start_matches("device:") + } else { + device_id + }; - // For now, simulate hardware check - sleep(Duration::from_millis(100)).await; + let index: u32 = index_str + .parse() + .context("Failed to parse camera device index")?; - println!("āœ… Production camera hardware initialized"); - Ok(()) - } - - /// Capture frame from real hardware (placeholder implementation) - async fn capture_hardware_frame(&mut self) -> Result { - // TODO: Replace with actual hardware frame capture - // This is where you would: - // 1. Read frame from camera device - // 2. Handle different pixel formats - // 3. Apply necessary color space conversions - // 4. Handle camera-specific metadata - // 5. Implement proper error handling - - // For now, return an error indicating not implemented - anyhow::bail!("Production camera capture not yet implemented. This is a placeholder for real hardware integration.") + Ok(CameraIndex::Index(index)) } } @@ -105,9 +269,137 @@ impl CameraInterface for ProductionCamera { &mut self, ) -> std::pin::Pin> + Send + '_>> { Box::pin(async move { - self.initialize_hardware() + #[cfg(feature = "nokhwa_camera")] + { + println!("šŸŽ„ Initializing production camera with nokhwa (CallbackCamera)..."); + println!(" Device ID: {}", self.device_id); + println!(" Resolution: {}x{}", self.resolution.0, self.resolution.1); + println!(" Target FPS: {}", self.target_fps); + + let device_id = self.device_id.clone(); + + // Initialize camera in blocking task + let camera_result = tokio::task::spawn_blocking(move || -> Result<(CallbackCamera, String, String, u32, u32)> { + // Initialize nokhwa (required on macOS for permissions) + #[cfg(target_os = "macos")] + { + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; + static INITIALIZED: AtomicBool = AtomicBool::new(false); + + if !INITIALIZED.swap(true, Ordering::SeqCst) { + println!(" Initializing nokhwa for macOS..."); + let permission_result = Arc::new(std::sync::Mutex::new(None::)); + let permission_clone = permission_result.clone(); + + nokhwa::nokhwa_initialize(move |granted| { + *permission_clone.lock().unwrap() = Some(granted); + if granted { + println!(" āœ… Camera permission granted"); + } else { + println!(" āš ļø Camera permission denied - check System Preferences > Privacy > Camera"); + } + }); + + // Give macOS time to process permissions (up to 2 seconds) + for _ in 0..20 { + std::thread::sleep(std::time::Duration::from_millis(100)); + if permission_result.lock().unwrap().is_some() { + break; + } + } + } + } + + let camera_index = Self::parse_camera_index(&device_id) + .context("Failed to parse camera index")?; + + // Use AbsoluteHighestFrameRate for best compatibility + let requested = RequestedFormat::new::( + RequestedFormatType::AbsoluteHighestFrameRate + ); + + println!(" Creating CallbackCamera..."); + + // Create callback camera with retry logic (max 3 attempts) + let mut last_error = None; + let mut callback_camera_opt = None; + + for attempt in 1..=3 { + match CallbackCamera::new( + camera_index.clone(), + requested.clone(), + |_buffer| { + // Empty callback - we use poll_frame() instead + } + ) { + Ok(cam) => { + if attempt > 1 { + println!(" āœ… Camera initialized on attempt {}", attempt); + } + callback_camera_opt = Some(cam); + break; + } + Err(e) => { + println!(" āš ļø Camera init attempt {}/3 failed: {}", attempt, e); + last_error = Some(e); + if attempt < 3 { + println!(" Retrying in 500ms..."); + std::thread::sleep(std::time::Duration::from_millis(500)); + } + } + } + } + + let callback_camera = callback_camera_opt + .ok_or_else(|| anyhow::anyhow!("Camera initialization failed: {:?}", last_error)) + .context("Failed to create callback camera after 3 attempts")?; + + let info = callback_camera.info().clone(); + let res = callback_camera.resolution() + .context("Failed to get camera resolution")?; + + Ok(( + callback_camera, + info.human_name().to_string(), + info.description().to_string(), + res.width(), + res.height(), + )) + }) .await - .context("Failed to initialize production camera hardware")?; + .context("Camera initialization task panicked")? + .context("Failed to initialize production camera")?; + + let (mut callback_camera, camera_name, camera_desc, width, height) = camera_result; + + println!(" Camera name: {}", camera_name); + println!(" Camera description: {}", camera_desc); + println!(" Actual resolution: {}x{}", width, height); + + // Open the camera stream + println!(" Opening camera stream..."); + callback_camera.open_stream().context("Failed to open camera stream")?; + + // Give the camera time to start streaming + std::thread::sleep(std::time::Duration::from_millis(500)); + + println!(" āœ… Camera stream opened"); + + self.resolution = (width, height); + self.metadata.current_resolution = self.resolution; + self.callback_camera = Some(Arc::new(Mutex::new(callback_camera))); + + println!("āœ… Production camera initialized successfully"); + } + + #[cfg(not(feature = "nokhwa_camera"))] + { + println!("šŸŽ„ Production camera hardware support not compiled."); + println!(" Compile with --features nokhwa_camera to enable."); + println!(" Using placeholder mode for testing."); + } + self.is_running = true; Ok(()) }) @@ -122,13 +414,45 @@ impl CameraInterface for ProductionCamera { anyhow::bail!("Camera not initialized or not running"); } - let frame = self - .capture_hardware_frame() - .await - .context("Failed to capture frame from production camera")?; + #[cfg(feature = "nokhwa_camera")] + { + let cancel_token = self.cancellation_token.clone(); + let camera = self.callback_camera.as_ref() + .context("Camera not initialized")? + .clone(); - self.frame_counter += 1; - Ok(frame) + let frame_number = self.frame_counter + 1; + + let mut frame_data = BlockingCapture::new(camera, cancel_token.clone()).await?; + + self.frame_counter = frame_number; + + // Log progress periodically + if frame_number <= 3 || frame_number % 100 == 0 { + println!("šŸ“ø Frame {} captured ({}x{})", frame_number, frame_data.1, frame_data.2); + } + + Ok(CapturedFrame { + data: Arc::new(FrameData::new( + frame_data.0, + frame_data.1, + frame_data.2, + FrameFormat::Grayscale, + )), + frame_number, + capture_timestamp: Utc::now(), + metadata: FrameMetadata::default(), + }) + } + + #[cfg(not(feature = "nokhwa_camera"))] + { + anyhow::bail!( + "Production camera capture requires --features nokhwa_camera.\n\ + Current build does not include hardware camera support.\n\ + Use 'cargo build --features nokhwa_camera' to enable." + ) + } }) } @@ -151,12 +475,15 @@ impl CameraInterface for ProductionCamera { if self.is_running { println!("šŸ›‘ Shutting down production camera..."); - // TODO: Replace with actual camera shutdown - // This is where you would: - // 1. Stop capture - // 2. Release camera device - // 3. Clean up resources - // 4. Reset camera state + #[cfg(feature = "nokhwa_camera")] + { + if let Some(camera) = self.callback_camera.take() { + // Drop the camera handle to let the OS reclaim the device. + // Calling stop_stream can hang on some macOS setups, so we prefer + // a quick drop here to avoid blocking shutdown. + drop(camera); + } + } self.is_running = false; println!("āœ… Production camera shut down successfully"); @@ -164,6 +491,10 @@ impl CameraInterface for ProductionCamera { Ok(()) }) } + + fn set_cancellation_token(&mut self, token: CancellationToken) { + self.cancellation_token = Some(token); + } } /// Camera capabilities detection for production hardware @@ -171,30 +502,118 @@ pub struct ProductionCameraCapabilities; impl ProductionCameraCapabilities { /// Detect available cameras on the system + #[cfg(feature = "nokhwa_camera")] pub fn detect_cameras() -> Result> { - // TODO: Implement actual camera detection - // This would scan for available camera devices using: - // - V4L2 on Linux - // - DirectShow on Windows - // - AVFoundation on macOS + use nokhwa::query; println!("šŸ” Detecting production cameras..."); - // Placeholder: return empty list for now + // Initialize nokhwa on macOS + #[cfg(target_os = "macos")] + { + nokhwa::nokhwa_initialize(|_| {}); + } + + let cameras = query(nokhwa::utils::ApiBackend::Auto) + .context("Failed to query cameras")?; + + let mut devices = Vec::new(); + + for camera_info in cameras { + let index = match camera_info.index() { + CameraIndex::Index(i) => i.to_string(), + CameraIndex::String(s) => s.clone(), + }; + + devices.push(CameraDeviceInfo { + device_id: index, + device_name: camera_info.human_name().to_string(), + vendor: "Unknown".to_string(), + model: camera_info.description().to_string(), + bus_info: camera_info.misc().to_string(), + }); + + println!( + " Found: {} - {}", + camera_info.human_name(), + camera_info.description() + ); + } + + println!(" Total cameras found: {}", devices.len()); + Ok(devices) + } + + /// Detect cameras placeholder when feature disabled + #[cfg(not(feature = "nokhwa_camera"))] + pub fn detect_cameras() -> Result> { + println!("šŸ” Camera detection requires --features nokhwa_camera"); Ok(vec![]) } /// Get detailed capabilities for a specific camera device + #[cfg(feature = "nokhwa_camera")] pub fn get_device_capabilities(device_id: &str) -> Result { - // TODO: Query actual device capabilities + use nokhwa::Camera; println!("šŸ“‹ Querying capabilities for device: {}", device_id); - // Placeholder implementation + let camera_index = ProductionCamera::parse_camera_index(device_id)?; + let requested = RequestedFormat::new::(RequestedFormatType::None); + + let mut camera = Camera::new(camera_index, requested) + .context("Failed to open camera for capability query")?; + + let compatible_formats = camera.compatible_list_by_resolution( + nokhwa::utils::FrameFormat::MJPEG, + ); + + let mut resolutions: Vec<(u32, u32)> = Vec::new(); + if let Ok(formats) = compatible_formats { + for (resolution, _) in formats { + let res = (resolution.width(), resolution.height()); + if !resolutions.contains(&res) { + resolutions.push(res); + } + } + } + + // Default common resolutions if query fails + if resolutions.is_empty() { + resolutions = vec![(640, 480), (1280, 720), (1920, 1080)]; + } + + Ok(DeviceCapabilities { + device_id: device_id.to_string(), + supported_resolutions: resolutions, + supported_formats: vec![ + FrameFormat::Grayscale, + FrameFormat::RGB888, + FrameFormat::YUV420, + FrameFormat::JPEG, + ], + min_fps: 1.0, + max_fps: 60.0, + has_hardware_encoding: false, + }) + } + + /// Get device capabilities placeholder when feature disabled + #[cfg(not(feature = "nokhwa_camera"))] + pub fn get_device_capabilities(device_id: &str) -> Result { + println!( + "šŸ“‹ Device capability query requires --features nokhwa_camera" + ); + Ok(DeviceCapabilities { device_id: device_id.to_string(), supported_resolutions: vec![(640, 480), (1280, 720), (1920, 1080)], - supported_formats: vec![FrameFormat::JPEG, FrameFormat::RGB888, FrameFormat::YUV420], + supported_formats: vec![ + FrameFormat::Grayscale, + FrameFormat::RGB888, + FrameFormat::YUV420, + FrameFormat::JPEG, + ], min_fps: 1.0, max_fps: 60.0, has_hardware_encoding: false, @@ -231,8 +650,8 @@ mod tests { async fn test_production_camera_creation() { let frame_pool = Arc::new(HierarchicalFramePool::new(10)); let camera = ProductionCamera::new( - "/dev/video0".to_string(), - "v4l2".to_string(), + "0".to_string(), + "nokhwa".to_string(), (640, 480), 30.0, frame_pool, @@ -240,8 +659,7 @@ mod tests { assert!(camera.is_ok()); let camera = camera.unwrap(); - assert_eq!(camera.device_id, "/dev/video0"); - assert_eq!(camera.backend, "v4l2"); + assert_eq!(camera.device_id, "0"); assert!(!camera.is_running()); } @@ -254,12 +672,26 @@ mod tests { #[test] fn test_device_capabilities_query() { - let caps = ProductionCameraCapabilities::get_device_capabilities("/dev/video0"); + let caps = ProductionCameraCapabilities::get_device_capabilities("0"); assert!(caps.is_ok()); let caps = caps.unwrap(); - assert_eq!(caps.device_id, "/dev/video0"); + assert_eq!(caps.device_id, "0"); assert!(!caps.supported_resolutions.is_empty()); assert!(!caps.supported_formats.is_empty()); } + + #[cfg(feature = "nokhwa_camera")] + #[test] + fn test_parse_camera_index() { + // Test various device ID formats + let index = ProductionCamera::parse_camera_index("0").unwrap(); + assert!(matches!(index, CameraIndex::Index(0))); + + let index = ProductionCamera::parse_camera_index("/dev/video0").unwrap(); + assert!(matches!(index, CameraIndex::Index(0))); + + let index = ProductionCamera::parse_camera_index("device:1").unwrap(); + assert!(matches!(index, CameraIndex::Index(1))); + } } diff --git a/meteor-edge-client/src/core/app.rs b/meteor-edge-client/src/core/app.rs index 8f96705..9283e43 100644 --- a/meteor-edge-client/src/core/app.rs +++ b/meteor-edge-client/src/core/app.rs @@ -1,7 +1,9 @@ use anyhow::Result; use std::time::Duration; +use tokio::sync::broadcast; use tokio::task::JoinHandle; use tokio::time::sleep; +use tokio_util::sync::CancellationToken; use crate::network::api::ApiClient; use crate::camera::{CameraConfig, CameraController}; @@ -17,19 +19,24 @@ use crate::storage::storage::StorageController; /// Core application coordinator that manages the event bus and background tasks pub struct Application { event_bus: EventBus, - background_tasks: Vec>, + background_tasks: Vec<(String, JoinHandle<()>)>, memory_monitor: MemoryMonitor, camera_override: Option, + shutdown_tx: broadcast::Sender<()>, + cancellation_token: CancellationToken, } impl Application { /// Create a new Application instance with an event bus pub fn new(event_bus_capacity: usize) -> Self { + let (shutdown_tx, _) = broadcast::channel(16); Self { event_bus: EventBus::new(event_bus_capacity), background_tasks: Vec::new(), memory_monitor: MemoryMonitor::new(), camera_override: None, + shutdown_tx, + cancellation_token: CancellationToken::new(), } } @@ -44,21 +51,30 @@ impl Application { // Create a test subscriber to verify event flow let mut test_subscriber = self.event_bus.subscribe(); + let mut test_shutdown_rx = self.shutdown_tx.subscribe(); // Spawn a background task to handle test events let test_handle = tokio::spawn(async move { println!("šŸ“” Test subscriber started, waiting for events..."); - let mut system_started = false; - let mut frame_count = 0; + let mut frame_count = 0u64; - while let Ok(event) = test_subscriber.recv().await { - match event.as_ref() { + loop { + tokio::select! { + _ = test_shutdown_rx.recv() => { + println!("šŸ“” Test subscriber received shutdown signal"); + break; + } + event_result = test_subscriber.recv() => { + let event = match event_result { + Ok(e) => e, + Err(_) => break, + }; + match event.as_ref() { SystemEvent::SystemStarted(system_event) => { println!("āœ… Received SystemStartedEvent!"); println!(" Timestamp: {}", system_event.timestamp); println!(" Version: {}", system_event.version); println!(" Event verification successful! šŸŽ‰"); - system_started = true; } SystemEvent::FrameCaptured(frame_event) => { frame_count += 1; @@ -66,7 +82,8 @@ impl Application { // Record memory optimization metrics record_frame_processed(frame_event.data_size(), 3); // Assume 3 subscribers - if frame_count <= 5 || frame_count % 30 == 0 { + // Log first 5 frames and then every 100th frame + if frame_count <= 5 || frame_count % 100 == 0 { println!("šŸ“ø Received FrameCapturedEvent #{}", frame_event.frame_id); println!(" Timestamp: {}", frame_event.timestamp); let (width, height) = frame_event.dimensions(); @@ -77,15 +94,6 @@ impl Application { ); println!(" Format: {:?}", frame_event.frame_data.format); } - - // Exit after receiving some frames for demo - if frame_count >= 10 { - println!( - "šŸŽ¬ Received {} frames, test subscriber stopping...", - frame_count - ); - break; - } } SystemEvent::MeteorDetected(meteor_event) => { println!( @@ -125,13 +133,16 @@ impl Application { event.centroid_count ); } + } + } } } println!("šŸ”š Test subscriber finished"); }); - self.background_tasks.push(test_handle); + self.background_tasks + .push(("test-subscriber".to_string(), test_handle)); // Give the subscriber a moment to be ready sleep(Duration::from_millis(10)).await; @@ -154,38 +165,48 @@ impl Application { None => load_camera_config()?, }; let mut camera_controller = CameraController::new(camera_config, self.event_bus.clone())?; + let camera_shutdown_rx = self.shutdown_tx.subscribe(); // Spawn camera controller in background task + let camera_cancel = self.cancellation_token.clone(); let camera_handle = tokio::spawn(async move { - if let Err(e) = camera_controller.run().await { + if let Err(e) = camera_controller + .run(camera_shutdown_rx, camera_cancel) + .await + { eprintln!("āŒ Camera controller error: {}", e); } }); - self.background_tasks.push(camera_handle); + self.background_tasks + .push(("camera".to_string(), camera_handle)); // Start memory monitoring reporting println!("šŸ“Š Starting memory optimization monitoring..."); + let memory_shutdown_rx = self.shutdown_tx.subscribe(); let memory_handle = tokio::spawn(async move { use crate::memory::GLOBAL_MEMORY_MONITOR; - GLOBAL_MEMORY_MONITOR.start_reporting(30).await; // Report every 30 seconds + GLOBAL_MEMORY_MONITOR.start_reporting_with_shutdown(30, memory_shutdown_rx).await; }); - self.background_tasks.push(memory_handle); + self.background_tasks + .push(("memory-monitor".to_string(), memory_handle)); // Initialize and start detection controller println!("šŸ” Initializing detection controller..."); let detection_config = DetectionConfig::default(); let mut detection_controller = DetectionController::new(detection_config, self.event_bus.clone()); + let detection_shutdown_rx = self.shutdown_tx.subscribe(); // Spawn detection controller in background task let detection_handle = tokio::spawn(async move { - if let Err(e) = detection_controller.run().await { + if let Err(e) = detection_controller.run(detection_shutdown_rx).await { eprintln!("āŒ Detection controller error: {}", e); } }); - self.background_tasks.push(detection_handle); + self.background_tasks + .push(("detection".to_string(), detection_handle)); // Initialize and start storage controller println!("šŸ’¾ Initializing storage controller..."); @@ -198,15 +219,17 @@ impl Application { return Err(e); } }; + let storage_shutdown_rx = self.shutdown_tx.subscribe(); // Spawn storage controller in background task let storage_handle = tokio::spawn(async move { - if let Err(e) = storage_controller.run().await { + if let Err(e) = storage_controller.run(storage_shutdown_rx).await { eprintln!("āŒ Storage controller error: {}", e); } }); - self.background_tasks.push(storage_handle); + self.background_tasks + .push(("storage".to_string(), storage_handle)); // Initialize and start communication controller println!("šŸ“” Initializing communication controller..."); @@ -221,25 +244,29 @@ impl Application { return Err(e); } }; + let communication_shutdown_rx = self.shutdown_tx.subscribe(); // Spawn communication controller in background task let communication_handle = tokio::spawn(async move { - if let Err(e) = communication_controller.run().await { + if let Err(e) = communication_controller.run(communication_shutdown_rx).await { eprintln!("āŒ Communication controller error: {}", e); } }); - self.background_tasks.push(communication_handle); + self.background_tasks + .push(("communication".to_string(), communication_handle)); // Initialize and start heartbeat task println!("šŸ’“ Initializing heartbeat task..."); + let heartbeat_shutdown_rx = self.shutdown_tx.subscribe(); let heartbeat_handle = tokio::spawn(async move { - if let Err(e) = Self::run_heartbeat_task(heartbeat_config).await { + if let Err(e) = Self::run_heartbeat_task(heartbeat_config, heartbeat_shutdown_rx).await { eprintln!("āŒ Heartbeat task error: {}", e); } }); - self.background_tasks.push(heartbeat_handle); + self.background_tasks + .push(("heartbeat".to_string(), heartbeat_handle)); // Run the main application loop println!("šŸ”„ Starting main application loop..."); @@ -248,23 +275,27 @@ impl Application { Ok(()) } - /// Main application loop - this will eventually coordinate all modules + /// Main application loop - waits for shutdown signal async fn main_loop(&mut self) -> Result<()> { - println!("ā³ Main loop running... (will exit after 10 seconds for demo)"); + println!("šŸ”„ Main loop running... (Press Ctrl+C to stop)"); - // For now, just wait a bit to allow the camera to capture frames and test subscriber to process events - sleep(Duration::from_secs(10)).await; + // Wait for shutdown signal (Ctrl+C) + tokio::signal::ctrl_c().await?; - println!("šŸ›‘ Stopping application..."); + println!("\nšŸ›‘ Received shutdown signal, stopping application..."); - // Wait for all background tasks to complete - for task in self.background_tasks.drain(..) { - if let Err(e) = task.await { - eprintln!("āŒ Background task error: {}", e); - } - } + // Send shutdown signal to all listeners + let _ = self.shutdown_tx.send(()); + self.cancellation_token.cancel(); + + // Give tasks time to respond to shutdown signal + println!(" Waiting for tasks to shutdown gracefully..."); + self.wait_for_tasks(Duration::from_secs(3)).await; println!("āœ… Application stopped successfully"); + + // Let process exit naturally - this allows Drop implementations to run + // and properly release camera resources Ok(()) } @@ -278,8 +309,41 @@ impl Application { self.event_bus.subscriber_count() } + async fn wait_for_tasks(&mut self, timeout: Duration) { + for (name, mut handle) in self.background_tasks.drain(..) { + let timeout_sleep = sleep(timeout); + tokio::pin!(timeout_sleep); + + let result = tokio::select! { + res = &mut handle => Some(res), + _ = &mut timeout_sleep => None, + }; + + match result { + Some(res) => { + if let Err(e) = res { + eprintln!("āš ļø Task '{}' exited with error: {}", name, e); + } else { + println!(" Task '{}' stopped cleanly", name); + } + } + None => { + println!( + "āš ļø Task '{}' did not stop within {:?}, aborting...", + name, timeout + ); + handle.abort(); + let _ = handle.await; + } + } + } + } + /// Background task for sending heartbeat signals to the backend - async fn run_heartbeat_task(config: CommunicationConfig) -> Result<()> { + async fn run_heartbeat_task( + config: CommunicationConfig, + mut shutdown_rx: broadcast::Receiver<()>, + ) -> Result<()> { println!("šŸ’“ Starting heartbeat task..."); println!( " Heartbeat interval: {}s", @@ -290,8 +354,16 @@ impl Application { let config_manager = ConfigManager::new(); loop { - // Wait for the configured interval - sleep(Duration::from_secs(config.heartbeat_interval_seconds)).await; + // Wait for heartbeat interval or shutdown signal + tokio::select! { + _ = shutdown_rx.recv() => { + println!("šŸ’“ Heartbeat task received shutdown signal"); + break; + } + _ = sleep(Duration::from_secs(config.heartbeat_interval_seconds)) => { + // Continue to send heartbeat + } + } // Check if device is registered and has configuration if !config_manager.config_exists() { @@ -336,6 +408,9 @@ impl Application { } } } + + println!("šŸ’“ Heartbeat task stopped"); + Ok(()) } } diff --git a/meteor-edge-client/src/detection/detector.rs b/meteor-edge-client/src/detection/detector.rs index 11e9529..4133619 100644 --- a/meteor-edge-client/src/detection/detector.rs +++ b/meteor-edge-client/src/detection/detector.rs @@ -1,5 +1,6 @@ use anyhow::{Context, Result}; use std::collections::VecDeque; +use tokio::sync::broadcast; use tokio::time::{sleep, Duration}; use crate::core::events::{EventBus, FrameCapturedEvent, MeteorDetectedEvent, SystemEvent}; @@ -64,7 +65,7 @@ impl DetectionController { } /// Start the detection loop - pub async fn run(&mut self) -> Result<()> { + pub async fn run(&mut self, mut shutdown_rx: broadcast::Receiver<()>) -> Result<()> { println!("šŸ” Starting meteor detection controller..."); println!(" Buffer size: {} frames", self.config.buffer_capacity); println!(" Algorithm: {}", self.config.algorithm_name); @@ -81,6 +82,12 @@ impl DetectionController { loop { tokio::select! { + // Handle shutdown signal + _ = shutdown_rx.recv() => { + println!("šŸ” Detection controller received shutdown signal"); + break; + } + // Handle incoming events event_result = event_receiver.recv() => { match event_result { @@ -104,6 +111,9 @@ impl DetectionController { } } } + + println!("šŸ” Detection controller stopped"); + Ok(()) } /// Handle incoming events from the event bus diff --git a/meteor-edge-client/src/main.rs b/meteor-edge-client/src/main.rs index edc9423..0714577 100644 --- a/meteor-edge-client/src/main.rs +++ b/meteor-edge-client/src/main.rs @@ -74,6 +74,8 @@ enum Commands { }, /// Test hardware fingerprinting TestFingerprint, + /// List available cameras on this system + ListCameras, /// Show device status and configuration Status, /// Check backend connectivity @@ -149,6 +151,9 @@ async fn main() -> Result<()> { std::process::exit(1); } } + Commands::ListCameras => { + list_cameras(); + } Commands::Status => { show_status().await?; } @@ -164,6 +169,29 @@ async fn main() -> Result<()> { debug, offline, } => { + // Pre-open hardware camera on main thread (required for macOS authorization) + // This must happen BEFORE any .await calls + #[cfg(feature = "opencv_camera")] + { + if camera.starts_with("device:") || camera.starts_with("hw:") { + let device_str = camera + .strip_prefix("device:") + .or_else(|| camera.strip_prefix("hw:")) + .unwrap_or("0"); + let device_id: i32 = device_str.parse().unwrap_or(0); + + match camera::opencv_camera::OpenCVCamera::open_capture_on_main_thread(device_id) { + Ok(capture) => { + camera::opencv_camera::set_pre_opened_capture(capture); + } + Err(e) => { + eprintln!("āŒ Failed to open camera on main thread: {}", e); + std::process::exit(1); + } + } + } + } + if let Err(e) = run_application_with_camera(camera.clone(), config.clone(), *debug, *offline).await { @@ -503,6 +531,79 @@ async fn test_hardware_fingerprint() -> Result<()> { Ok(()) } +/// List available cameras on this system +fn list_cameras() { + println!("šŸ“· Camera Information"); + println!(""); + + // OpenCV backend + #[cfg(feature = "opencv_camera")] + { + println!("Backend: OpenCV"); + println!(""); + println!("OpenCV uses index-based camera access."); + println!("Try device:0 for the default camera."); + println!(""); + println!("Usage: meteor-edge-client run --camera device:"); + println!("Example: meteor-edge-client run --camera device:0"); + } + + // nokhwa backend + #[cfg(feature = "nokhwa_camera")] + { + use camera::production::ProductionCameraCapabilities; + + println!("Backend: nokhwa"); + println!(""); + + match ProductionCameraCapabilities::detect_cameras() { + Ok(cameras) => { + if cameras.is_empty() { + println!("āš ļø No cameras detected."); + println!(""); + println!("Possible reasons:"); + println!(" - No camera connected"); + println!(" - Camera permissions not granted (macOS: System Preferences > Privacy > Camera)"); + println!(" - Camera in use by another application"); + } else { + println!("Found {} camera(s):", cameras.len()); + println!(""); + for (i, cam) in cameras.iter().enumerate() { + println!(" [{}] {}", i, cam.device_name); + println!(" Device ID: {}", cam.device_id); + println!(" Model: {}", cam.model); + if !cam.bus_info.is_empty() { + println!(" Bus: {}", cam.bus_info); + } + println!(""); + } + println!("Usage: meteor-edge-client run --camera device:"); + println!("Example: meteor-edge-client run --camera device:0"); + } + } + Err(e) => { + eprintln!("āŒ Failed to detect cameras: {}", e); + eprintln!(""); + eprintln!("On macOS, you may need to grant camera permissions."); + eprintln!("Go to: System Preferences > Privacy & Security > Camera"); + } + } + } + + // No backend compiled + #[cfg(not(any(feature = "opencv_camera", feature = "nokhwa_camera")))] + { + println!("āš ļø No camera backend compiled."); + println!(""); + println!("To enable camera support, rebuild with:"); + println!(" cargo build --features opencv_camera # OpenCV backend (recommended)"); + println!(" cargo build --features nokhwa_camera # nokhwa backend"); + println!(""); + println!("Alternatively, use video files:"); + println!(" meteor-edge-client run --camera file:video.mp4"); + } +} + /// Run the application with specified camera async fn run_application_with_camera( camera_spec_input: String, diff --git a/meteor-edge-client/src/memory/frame_data.rs b/meteor-edge-client/src/memory/frame_data.rs index 44b3baa..d3eea8b 100644 --- a/meteor-edge-client/src/memory/frame_data.rs +++ b/meteor-edge-client/src/memory/frame_data.rs @@ -30,6 +30,8 @@ pub enum FrameFormat { JPEG, /// H.264 encoded frame H264Frame, + /// 8-bit grayscale (single channel) + Grayscale, } impl FrameData { @@ -84,6 +86,7 @@ impl FrameData { FrameFormat::YUV420 => (width * height * 3 / 2) as usize, FrameFormat::JPEG => (width * height) as usize, // Estimate for JPEG FrameFormat::H264Frame => (width * height / 2) as usize, // Estimate for H.264 + FrameFormat::Grayscale => (width * height) as usize, } } diff --git a/meteor-edge-client/src/memory/memory_monitor.rs b/meteor-edge-client/src/memory/memory_monitor.rs index 60eb58a..a97dd13 100644 --- a/meteor-edge-client/src/memory/memory_monitor.rs +++ b/meteor-edge-client/src/memory/memory_monitor.rs @@ -72,6 +72,30 @@ impl MemoryMonitor { } } + /// Start background reporting loop with shutdown support + pub async fn start_reporting_with_shutdown( + &self, + interval_seconds: u64, + mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, + ) { + let mut reporting_interval = interval(Duration::from_secs(interval_seconds)); + + loop { + tokio::select! { + _ = shutdown_rx.recv() => { + println!("šŸ“Š Memory monitor received shutdown signal"); + break; + } + _ = reporting_interval.tick() => { + let stats = self.stats(); + Self::log_stats(&stats).await; + } + } + } + + println!("šŸ“Š Memory monitor stopped"); + } + async fn log_stats(stats: &MemoryStats) { if stats.frames_processed > 0 { println!("šŸ“Š Memory Optimization Stats:"); diff --git a/meteor-edge-client/src/network/communication.rs b/meteor-edge-client/src/network/communication.rs index 1c167c0..f107909 100644 --- a/meteor-edge-client/src/network/communication.rs +++ b/meteor-edge-client/src/network/communication.rs @@ -4,6 +4,7 @@ use std::fs; use std::path::{Path, PathBuf}; use std::process::Command; use tokio::fs as async_fs; +use tokio::sync::broadcast; use tokio::time::{sleep, Duration}; use crate::network::api::ApiClient; @@ -55,7 +56,7 @@ impl CommunicationController { } /// Main run loop for the communication controller - pub async fn run(&mut self) -> Result<()> { + pub async fn run(&mut self, mut shutdown_rx: broadcast::Receiver<()>) -> Result<()> { println!("šŸ“” Starting communication controller..."); println!(" API Base URL: {}", self.config.api_base_url); println!(" Retry attempts: {}", self.config.retry_attempts); @@ -67,26 +68,40 @@ impl CommunicationController { let mut event_receiver = self.event_bus.subscribe(); loop { - match event_receiver.recv().await { - Ok(event) => { - if let SystemEvent::EventPackageArchived(archive_event) = event.as_ref() { - println!( - "šŸ“¦ Received EventPackageArchivedEvent: {}", - archive_event.event_id - ); + tokio::select! { + // Handle shutdown signal + _ = shutdown_rx.recv() => { + println!("šŸ“” Communication controller received shutdown signal"); + break; + } - if let Err(e) = self.process_archived_event(archive_event.clone()).await { - eprintln!("āŒ Failed to process archived event: {}", e); + // Handle incoming events + event_result = event_receiver.recv() => { + match event_result { + Ok(event) => { + if let SystemEvent::EventPackageArchived(archive_event) = event.as_ref() { + println!( + "šŸ“¦ Received EventPackageArchivedEvent: {}", + archive_event.event_id + ); + + if let Err(e) = self.process_archived_event(archive_event.clone()).await { + eprintln!("āŒ Failed to process archived event: {}", e); + } + } + } + Err(e) => { + eprintln!("āŒ Error receiving event: {}", e); + // Sleep briefly before continuing to avoid busy loop + sleep(Duration::from_millis(100)).await; } } } - Err(e) => { - eprintln!("āŒ Error receiving event: {}", e); - // Sleep briefly before continuing to avoid busy loop - sleep(Duration::from_millis(100)).await; - } } } + + println!("šŸ“” Communication controller stopped"); + Ok(()) } /// Process an EventPackageArchivedEvent by packaging and uploading diff --git a/meteor-edge-client/src/storage/storage.rs b/meteor-edge-client/src/storage/storage.rs index 594118f..8e3e768 100644 --- a/meteor-edge-client/src/storage/storage.rs +++ b/meteor-edge-client/src/storage/storage.rs @@ -4,6 +4,7 @@ use std::collections::VecDeque; use std::fs; use std::path::{Path, PathBuf}; use tokio::fs as async_fs; +use tokio::sync::broadcast; use tokio::time::{sleep, Duration}; use crate::core::events::{ @@ -144,7 +145,7 @@ impl StorageController { } /// Start the storage controller loop - pub async fn run(&mut self) -> Result<()> { + pub async fn run(&mut self, mut shutdown_rx: broadcast::Receiver<()>) -> Result<()> { println!("šŸ’¾ Starting storage controller..."); println!(" Buffer size: {} frames", self.config.frame_buffer_size); println!(" Storage path: {:?}", self.config.base_storage_path); @@ -157,6 +158,12 @@ impl StorageController { loop { tokio::select! { + // Handle shutdown signal + _ = shutdown_rx.recv() => { + println!("šŸ’¾ Storage controller received shutdown signal"); + break; + } + // Handle incoming events event_result = event_receiver.recv() => { match event_result { @@ -180,6 +187,9 @@ impl StorageController { } } } + + println!("šŸ’¾ Storage controller stopped"); + Ok(()) } /// Handle incoming events from the event bus