diff --git a/Cargo.lock b/Cargo.lock index 185d2e0a..d64b52d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -115,6 +115,54 @@ version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" +[[package]] +name = "amq-protocol" +version = "7.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "587d313f3a8b4a40f866cc84b6059fe83133bf172165ac3b583129dd211d8e1c" +dependencies = [ + "amq-protocol-tcp", + "amq-protocol-types", + "amq-protocol-uri", + "cookie-factory", + "nom", + "serde", +] + +[[package]] +name = "amq-protocol-tcp" +version = "7.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc707ab9aa964a85d9fc25908a3fdc486d2e619406883b3105b48bf304a8d606" +dependencies = [ + "amq-protocol-uri", + "tcp-stream", + "tracing", +] + +[[package]] +name = "amq-protocol-types" +version = "7.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf99351d92a161c61ec6ecb213bc7057f5b837dd4e64ba6cb6491358efd770c4" +dependencies = [ + "cookie-factory", + "nom", + "serde", + "serde_json", +] + +[[package]] +name = "amq-protocol-uri" +version = "7.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f89f8273826a676282208e5af38461a07fe939def57396af6ad5997fcf56577d" +dependencies = [ + "amq-protocol-types", + "percent-encoding", + "url", +] + [[package]] name = "android-tzdata" version = "0.1.1" @@ -285,6 +333,7 @@ dependencies = [ "futures-util", "hdfs-native-object-store", "humantime", + "lapin", "lazy_static", "mockall", "num_cpus", @@ -541,7 +590,7 @@ version = "55.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73a47aa0c771b5381de2b7f16998d351a6f4eb839f1e13d48353e17e873d969b" dependencies = [ - "bitflags", + "bitflags 2.9.1", "serde", ] @@ -585,6 +634,57 @@ dependencies = [ "term", ] +[[package]] +name = "asn1-rs" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56624a96882bb8c26d61312ae18cb45868e5a9992ea73c58e45c3101e56a1e60" +dependencies = [ + "asn1-rs-derive", + "asn1-rs-impl", + "displaydoc", + "nom", + "num-traits", + "rusticata-macros", + "thiserror 2.0.12", + "time", +] + +[[package]] +name = "asn1-rs-derive" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3109e49b1e4909e9db6515a30c633684d68cdeaa252f215214cb4fa1a5bfee2c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", + "synstructure", +] + +[[package]] +name = "asn1-rs-impl" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + +[[package]] +name = "async-channel" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-compression" version = "0.4.19" @@ -602,13 +702,100 @@ dependencies = [ "zstd-safe", ] +[[package]] +name = "async-executor" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb812ffb58524bdd10860d7d974e2f01cc0950c2438a74ee5ec2e2280c6c4ffa" +dependencies = [ + "async-task", + "concurrent-queue", + "fastrand 2.3.0", + "futures-lite 2.6.0", + "pin-project-lite", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13f937e26114b93193065fd44f507aa2e9169ad0cdabbb996920b1fe1ddea7ba" +dependencies = [ + "async-channel", + "async-executor", + "async-io 2.4.1", + "async-lock 3.4.0", + "blocking", + "futures-lite 2.6.0", +] + +[[package]] +name = "async-global-executor-trait" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9af57045d58eeb1f7060e7025a1631cbc6399e0a1d10ad6735b3d0ea7f8346ce" +dependencies = [ + "async-global-executor", + "async-trait", + "executor-trait", +] + +[[package]] +name = "async-io" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" +dependencies = [ + "async-lock 2.8.0", + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-lite 1.13.0", + "log", + "parking", + "polling 2.8.0", + "rustix 0.37.28", + "slab", + "socket2 0.4.10", + "waker-fn", +] + +[[package]] +name = "async-io" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1237c0ae75a0f3765f58910ff9cdd0a12eeb39ab2f4c7de23262f337f0aacbb3" +dependencies = [ + "async-lock 3.4.0", + "cfg-if", + "concurrent-queue", + "futures-io", + "futures-lite 2.6.0", + "parking", + "polling 3.8.0", + "rustix 1.0.7", + "slab", + "tracing", + "windows-sys 0.59.0", +] + +[[package]] +name = "async-lock" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" +dependencies = [ + "event-listener 2.5.3", +] + [[package]] name = "async-lock" version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" dependencies = [ - "event-listener", + "event-listener 5.4.0", "event-listener-strategy", "pin-project-lite", ] @@ -649,6 +836,18 @@ dependencies = [ "url", ] +[[package]] +name = "async-reactor-trait" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6012d170ad00de56c9ee354aef2e358359deb1ec504254e0e5a3774771de0e" +dependencies = [ + "async-io 1.13.0", + "async-trait", + "futures-core", + "reactor-trait", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -671,6 +870,12 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "async-task" +version = "4.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" + [[package]] name = "async-trait" version = "0.1.88" @@ -722,7 +927,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "fastrand", + "fastrand 2.3.0", "hex", "http 1.3.1", "ring", @@ -801,7 +1006,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "fastrand", + "fastrand 2.3.0", "http 0.2.12", "http-body 0.4.6", "percent-encoding", @@ -826,7 +1031,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "fastrand", + "fastrand 2.3.0", "http 0.2.12", "regex-lite", "tracing", @@ -848,7 +1053,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "fastrand", + "fastrand 2.3.0", "http 0.2.12", "regex-lite", "tracing", @@ -871,7 +1076,7 @@ dependencies = [ "aws-smithy-types", "aws-smithy-xml", "aws-types", - "fastrand", + "fastrand 2.3.0", "http 0.2.12", "regex-lite", "tracing", @@ -999,7 +1204,7 @@ dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", "bytes", - "fastrand", + "fastrand 2.3.0", "http 0.2.12", "http 1.3.1", "http-body 0.4.6", @@ -1137,7 +1342,7 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd0b50b1b78dbadd44ab18b3c794e496f3a139abb9fbc27d9c94c4eebbb96496" dependencies = [ - "fastrand", + "fastrand 2.3.0", ] [[package]] @@ -1352,7 +1557,7 @@ version = "0.69.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "271383c67ccabffb7381723dea0672a673f292304fcb45c01cc648c7a8d58088" dependencies = [ - "bitflags", + "bitflags 2.9.1", "cexpr", "clang-sys", "itertools 0.12.1", @@ -1375,7 +1580,7 @@ version = "0.71.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f58bf3d7db68cfbac37cfc485a8d711e87e064c3d0fe0435b92f7a407f9d6b3" dependencies = [ - "bitflags", + "bitflags 2.9.1", "cexpr", "clang-sys", "itertools 0.13.0", @@ -1402,6 +1607,12 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e764a1d40d510daf35e07be9eb06e75770908c27d411ee6c92109c9840eaaf7" +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.9.1" @@ -1463,6 +1674,19 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "703f41c54fc768e63e091340b424302bb1c29ef4aa0c7f10fe849dfb114d29ea" +dependencies = [ + "async-channel", + "async-task", + "futures-io", + "futures-lite 2.6.0", + "piper", +] + [[package]] name = "borsh" version = "1.5.7" @@ -1860,6 +2084,18 @@ dependencies = [ "cc", ] +[[package]] +name = "cms" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b77c319abfd5219629c45c34c89ba945ed3c5e49fcde9d16b6c3885f118a730" +dependencies = [ + "const-oid", + "der", + "spki", + "x509-cert", +] + [[package]] name = "codespan-reporting" version = "0.12.0" @@ -2002,6 +2238,12 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "cookie-factory" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9885fa71e26b8ab7855e2ec7cae6e9b380edff76cd052e07c683a0319d51b3a2" + [[package]] name = "core-foundation" version = "0.9.4" @@ -2989,10 +3231,37 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e7c1832837b905bbfb5101e07cc24c8deddf52f93225eee6ead5f4d63d53ddcb" dependencies = [ "const-oid", + "der_derive", + "flagset", "pem-rfc7468", "zeroize", ] +[[package]] +name = "der-parser" +version = "10.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07da5016415d5a3c4dd39b11ed26f915f52fc4e0dc197d87908bc916e51bc1a6" +dependencies = [ + "asn1-rs", + "displaydoc", + "nom", + "num-bigint", + "num-traits", + "rusticata-macros", +] + +[[package]] +name = "der_derive" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8034092389675178f570469e6c3b0465d3d30b4505c294a6550db47f3c17ad18" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "deranged" version = "0.4.0" @@ -3043,7 +3312,7 @@ checksum = "e5766087c2235fec47fafa4cfecc81e494ee679d0fd4a59887ea0919bfb0e4fc" dependencies = [ "cfg-if", "libc", - "socket2", + "socket2 0.5.10", "windows-sys 0.48.0", ] @@ -3181,7 +3450,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cea14ef9355e3beab063703aa9dab15afd25f0667c341310c1e5274bb1d0da18" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -3195,6 +3464,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + [[package]] name = "event-listener" version = "5.4.0" @@ -3212,10 +3487,19 @@ version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" dependencies = [ - "event-listener", + "event-listener 5.4.0", "pin-project-lite", ] +[[package]] +name = "executor-trait" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c39dff9342e4e0e16ce96be751eb21a94e94a87bb2f6e63ad1961c2ce109bf" +dependencies = [ + "async-trait", +] + [[package]] name = "fallible-iterator" version = "0.2.0" @@ -3245,6 +3529,15 @@ dependencies = [ "regex-syntax 0.8.5", ] +[[package]] +name = "fastrand" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +dependencies = [ + "instant", +] + [[package]] name = "fastrand" version = "2.3.0" @@ -3275,13 +3568,19 @@ version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" +[[package]] +name = "flagset" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7ac824320a75a52197e8f2d787f6a38b6718bb6897a35142d749af3c0e8f4fe" + [[package]] name = "flatbuffers" version = "25.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1045398c1bfd89168b5fd3f1fc11f6e70b34f6f66300c87d44d3de849463abf1" dependencies = [ - "bitflags", + "bitflags 2.9.1", "rustc_version", ] @@ -3443,6 +3742,34 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand 1.9.0", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + +[[package]] +name = "futures-lite" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5edaec856126859abb19ed65f39e90fea3a9574b9707f13539acf4abf7eb532" +dependencies = [ + "fastrand 2.3.0", + "futures-core", + "futures-io", + "parking", + "pin-project-lite", +] + [[package]] name = "futures-macro" version = "0.3.31" @@ -3702,7 +4029,7 @@ checksum = "fe9a986a98854573dfbc130f42f81e92f6d4581e23060708842fede6edef0f1f" dependencies = [ "aes", "base64 0.22.1", - "bitflags", + "bitflags 2.9.1", "bytes", "cbc", "chrono", @@ -3726,7 +4053,7 @@ dependencies = [ "rand 0.8.5", "regex", "roxmltree", - "socket2", + "socket2 0.5.10", "thiserror 2.0.12", "tokio", "url", @@ -3762,6 +4089,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hermit-abi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" + [[package]] name = "hermit-abi" version = "0.5.1" @@ -3903,7 +4236,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2", + "socket2 0.5.10", "tokio", "tower-service", "tracing", @@ -4012,7 +4345,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2", + "socket2 0.5.10", "system-configuration", "tokio", "tower-service", @@ -4227,19 +4560,39 @@ dependencies = [ "generic-array", ] +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", +] + [[package]] name = "integer-encoding" version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "io-lifetimes" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" +dependencies = [ + "hermit-abi 0.3.9", + "libc", + "windows-sys 0.48.0", +] + [[package]] name = "ipconfig" version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b58db92f96b720de98181bbbe63c831e87005ab460c1bf306eb2622b4707997f" dependencies = [ - "socket2", + "socket2 0.5.10", "widestring", "windows-sys 0.48.0", "winreg", @@ -4384,6 +4737,28 @@ dependencies = [ "rustversion", ] +[[package]] +name = "lapin" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4273975142078ed200dedd77f09c8903dec110d0b02a0c8ad45796b39b691ea9" +dependencies = [ + "amq-protocol", + "async-global-executor-trait", + "async-reactor-trait", + "async-trait", + "executor-trait", + "flume", + "futures-core", + "futures-io", + "parking_lot", + "pinky-swear", + "reactor-trait", + "serde", + "tracing", + "waker-fn", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -4516,7 +4891,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667" dependencies = [ "cfg-if", - "windows-targets 0.53.0", + "windows-targets 0.52.6", ] [[package]] @@ -4541,7 +4916,7 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" dependencies = [ - "bitflags", + "bitflags 2.9.1", "libc", "redox_syscall", ] @@ -4583,6 +4958,12 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" +[[package]] +name = "linux-raw-sys" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" + [[package]] name = "linux-raw-sys" version = "0.4.15" @@ -4797,11 +5178,11 @@ version = "0.12.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9321642ca94a4282428e6ea4af8cc2ca4eac48ac7a6a4ea8f33f76d0ce70926" dependencies = [ - "async-lock", + "async-lock 3.4.0", "crossbeam-channel", "crossbeam-epoch", "crossbeam-utils", - "event-listener", + "event-listener 5.4.0", "futures-util", "loom", "parking_lot", @@ -4858,7 +5239,7 @@ dependencies = [ "rand 0.9.1", "serde", "serde_json", - "socket2", + "socket2 0.5.10", "thiserror 2.0.12", "tokio", "tokio-native-tls", @@ -4875,7 +5256,7 @@ checksum = "6e0ec195e788c95f36b7cf88127d538465fc2f7773e6e47af01834738eab0aee" dependencies = [ "base64 0.22.1", "bigdecimal", - "bitflags", + "bitflags 2.9.1", "btoi", "byteorder", "bytes", @@ -5083,7 +5464,7 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" dependencies = [ - "hermit-abi", + "hermit-abi 0.5.1", "libc", ] @@ -5196,6 +5577,15 @@ dependencies = [ "cipher", ] +[[package]] +name = "oid-registry" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12f40cff3dde1b6087cc5d5f5d4d65712f34016a03ed60e9c08dcc392736b5b7" +dependencies = [ + "asn1-rs", +] + [[package]] name = "once_cell" version = "1.21.3" @@ -5214,7 +5604,7 @@ version = "6.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "336b9c63443aceef14bea841b899035ae3abe89b7c486aaf4c5bd8aafedac3f0" dependencies = [ - "bitflags", + "bitflags 2.9.1", "libc", "once_cell", "onig_sys", @@ -5242,7 +5632,7 @@ version = "0.10.73" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8505734d46c8ab1e19a1dce3aef597ad87dcb4c37e7188231769bd6bd51cebf8" dependencies = [ - "bitflags", + "bitflags 2.9.1", "cfg-if", "foreign-types", "libc", @@ -5330,6 +5720,28 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "p12-keystore" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cae83056e7cb770211494a0ecf66d9fa7eba7d00977e5bb91f0e925b40b937f" +dependencies = [ + "cbc", + "cms", + "der", + "des", + "hex", + "hmac", + "pkcs12", + "pkcs5", + "rand 0.9.1", + "rc2", + "sha1", + "sha2", + "thiserror 2.0.12", + "x509-parser", +] + [[package]] name = "parking" version = "2.2.1" @@ -5431,6 +5843,16 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" +[[package]] +name = "pbkdf2" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2" +dependencies = [ + "digest", + "hmac", +] + [[package]] name = "peeking_take_while" version = "1.0.0" @@ -5587,6 +6009,29 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pinky-swear" +version = "6.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6cfae3ead413ca051a681152bd266438d3bfa301c9bdf836939a14c721bb2a21" +dependencies = [ + "doc-comment", + "flume", + "parking_lot", + "tracing", +] + +[[package]] +name = "piper" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96c8c490f422ef9a4efd2cb5b42b76c8613d7e7dfc1caf667b8a3350a5acc066" +dependencies = [ + "atomic-waker", + "fastrand 2.3.0", + "futures-io", +] + [[package]] name = "pkcs1" version = "0.7.5" @@ -5598,6 +6043,36 @@ dependencies = [ "spki", ] +[[package]] +name = "pkcs12" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "695b3df3d3cc1015f12d70235e35b6b79befc5fa7a9b95b951eab1dd07c9efc2" +dependencies = [ + "cms", + "const-oid", + "der", + "digest", + "spki", + "x509-cert", + "zeroize", +] + +[[package]] +name = "pkcs5" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e847e2c91a18bfa887dd028ec33f2fe6f25db77db3619024764914affe8b69a6" +dependencies = [ + "aes", + "cbc", + "der", + "pbkdf2", + "scrypt", + "sha2", + "spki", +] + [[package]] name = "pkcs8" version = "0.10.2" @@ -5614,6 +6089,37 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +[[package]] +name = "polling" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" +dependencies = [ + "autocfg", + "bitflags 1.3.2", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys 0.48.0", +] + +[[package]] +name = "polling" +version = "3.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b53a684391ad002dd6a596ceb6c74fd004fdce75f4be2e3f615068abbea5fd50" +dependencies = [ + "cfg-if", + "concurrent-queue", + "hermit-abi 0.5.1", + "pin-project-lite", + "rustix 1.0.7", + "tracing", + "windows-sys 0.59.0", +] + [[package]] name = "poly1305" version = "0.8.0" @@ -6040,7 +6546,7 @@ dependencies = [ "quinn-udp", "rustc-hash 2.1.1", "rustls 0.23.27", - "socket2", + "socket2 0.5.10", "thiserror 2.0.12", "tokio", "tracing", @@ -6077,9 +6583,9 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2", + "socket2 0.5.10", "tracing", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -6179,6 +6685,15 @@ dependencies = [ "getrandom 0.3.3", ] +[[package]] +name = "rc2" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62c64daa8e9438b84aaae55010a93f396f8e60e3911590fcba770d04643fc1dd" +dependencies = [ + "cipher", +] + [[package]] name = "rdkafka" version = "0.37.0" @@ -6214,6 +6729,17 @@ dependencies = [ "zstd-sys", ] +[[package]] +name = "reactor-trait" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "438a4293e4d097556730f4711998189416232f009c137389e0f961d2bc0ddc58" +dependencies = [ + "async-trait", + "futures-core", + "futures-io", +] + [[package]] name = "recursive" version = "0.1.1" @@ -6258,7 +6784,7 @@ dependencies = [ "rand 0.9.1", "ryu", "sha1_smol", - "socket2", + "socket2 0.5.10", "tokio", "tokio-native-tls", "tokio-util", @@ -6271,7 +6797,7 @@ version = "0.5.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "928fca9cf2aa042393a8325b9ead81d2f0df4cb12e1e24cef072922ccd99c5af" dependencies = [ - "bitflags", + "bitflags 2.9.1", ] [[package]] @@ -6507,7 +7033,7 @@ version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e" dependencies = [ - "bitflags", + "bitflags 2.9.1", "fallible-iterator 0.3.0", "fallible-streaming-iterator", "hashlink 0.9.1", @@ -6558,17 +7084,40 @@ dependencies = [ "semver", ] +[[package]] +name = "rusticata-macros" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "faf0c4a6ece9950b9abdb62b1cfcf2a68b3b67a10ba445b3bb85be2a293d0632" +dependencies = [ + "nom", +] + +[[package]] +name = "rustix" +version = "0.37.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "519165d378b97752ca44bbe15047d5d3409e875f39327546b42ac81d7e18c1b6" +dependencies = [ + "bitflags 1.3.2", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys 0.3.8", + "windows-sys 0.48.0", +] + [[package]] name = "rustix" version = "0.38.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" dependencies = [ - "bitflags", + "bitflags 2.9.1", "errno", "libc", "linux-raw-sys 0.4.15", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -6577,11 +7126,11 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266" dependencies = [ - "bitflags", + "bitflags 2.9.1", "errno", "libc", "linux-raw-sys 0.9.4", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -6625,6 +7174,19 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-connector" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70cc376c6ba1823ae229bacf8ad93c136d93524eab0e4e5e0e4f96b9c4e5b212" +dependencies = [ + "log", + "rustls 0.23.27", + "rustls-native-certs 0.7.3", + "rustls-pki-types", + "rustls-webpki 0.103.3", +] + [[package]] name = "rustls-native-certs" version = "0.6.3" @@ -6801,6 +7363,17 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "scrypt" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0516a385866c09368f0b5bcd1caff3366aace790fcd46e2bb032697bb172fd1f" +dependencies = [ + "pbkdf2", + "salsa20", + "sha2", +] + [[package]] name = "sct" version = "0.7.1" @@ -6860,7 +7433,7 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ - "bitflags", + "bitflags 2.9.1", "core-foundation 0.9.4", "core-foundation-sys", "libc", @@ -6873,7 +7446,7 @@ version = "3.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316" dependencies = [ - "bitflags", + "bitflags 2.9.1", "core-foundation 0.10.0", "core-foundation-sys", "libc", @@ -7191,6 +7764,16 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" +[[package]] +name = "socket2" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "socket2" version = "0.5.10" @@ -7286,7 +7869,7 @@ dependencies = [ "crc", "crossbeam-queue", "either", - "event-listener", + "event-listener 5.4.0", "futures-core", "futures-intrusive", "futures-io", @@ -7357,7 +7940,7 @@ checksum = "0afdd3aa7a629683c2d750c2df343025545087081ab5942593a5288855b1b7a7" dependencies = [ "atoi", "base64 0.22.1", - "bitflags", + "bitflags 2.9.1", "byteorder", "bytes", "crc", @@ -7399,7 +7982,7 @@ checksum = "a0bedbe1bbb5e2615ef347a5e9d8cd7680fb63e77d9dafc0f29be15e53f1ebe6" dependencies = [ "atoi", "base64 0.22.1", - "bitflags", + "bitflags 2.9.1", "byteorder", "crc", "dotenvy", @@ -7468,7 +8051,7 @@ dependencies = [ "cfg-if", "libc", "psm", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -7620,7 +8203,7 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" dependencies = [ - "bitflags", + "bitflags 2.9.1", "core-foundation 0.9.4", "system-configuration-sys", ] @@ -7664,17 +8247,29 @@ version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e502f78cdbb8ba4718f566c418c52bc729126ffd16baee5baa718cf25dd5a69a" +[[package]] +name = "tcp-stream" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "495b0abdce3dc1f8fd27240651c9e68890c14e9d9c61527b1ce44d8a5a7bd3d5" +dependencies = [ + "cfg-if", + "p12-keystore", + "rustls-connector", + "rustls-pemfile 2.2.0", +] + [[package]] name = "tempfile" version = "3.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8a64e3985349f2441a1a9ef0b853f869006c3855f2cda6862a94d26ebb9d6a1" dependencies = [ - "fastrand", + "fastrand 2.3.0", "getrandom 0.3.3", "once_cell", "rustix 1.0.7", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -7841,7 +8436,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.5.10", "tokio-macros", "windows-sys 0.52.0", ] @@ -7904,7 +8499,7 @@ dependencies = [ "postgres-protocol", "postgres-types", "rand 0.9.1", - "socket2", + "socket2 0.5.10", "tokio", "tokio-util", "whoami", @@ -8095,7 +8690,7 @@ dependencies = [ "percent-encoding", "pin-project", "prost", - "socket2", + "socket2 0.5.10", "tokio", "tokio-stream", "tower 0.4.13", @@ -8160,7 +8755,7 @@ version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" dependencies = [ - "bitflags", + "bitflags 2.9.1", "bytes", "futures-util", "http 1.3.1", @@ -8671,6 +9266,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "waker-fn" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "317211a0dc0ceedd78fb2ca9a44aed3d7b9b26f81870d485c07122b4350673b7" + [[package]] name = "walkdir" version = "2.5.0" @@ -9197,7 +9798,7 @@ version = "0.39.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" dependencies = [ - "bitflags", + "bitflags 2.9.1", ] [[package]] @@ -9225,6 +9826,34 @@ dependencies = [ "tap", ] +[[package]] +name = "x509-cert" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1301e935010a701ae5f8655edc0ad17c44bad3ac5ce8c39185f75453b720ae94" +dependencies = [ + "const-oid", + "der", + "spki", +] + +[[package]] +name = "x509-parser" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4569f339c0c402346d4a75a9e39cf8dad310e287eef1ff56d4c68e5067f53460" +dependencies = [ + "asn1-rs", + "data-encoding", + "der-parser", + "lazy_static", + "nom", + "oid-registry", + "rusticata-macros", + "thiserror 2.0.12", + "time", +] + [[package]] name = "xattr" version = "1.5.0" diff --git a/crates/arkflow-plugin/Cargo.toml b/crates/arkflow-plugin/Cargo.toml index 57c0bf66..449b7ec6 100644 --- a/crates/arkflow-plugin/Cargo.toml +++ b/crates/arkflow-plugin/Cargo.toml @@ -75,6 +75,9 @@ tokio-tungstenite = { version = "0.26", features = ["native-tls"] } # NATS async-nats = "0.41" +# RabbitMQ +lapin = "2.5" + # modbus tokio-modbus = { version = "0.16", default-features = false, features = ["tcp"] } diff --git a/crates/arkflow-plugin/src/input/amqp09.rs b/crates/arkflow-plugin/src/input/amqp09.rs new file mode 100644 index 00000000..84fc4b5e --- /dev/null +++ b/crates/arkflow-plugin/src/input/amqp09.rs @@ -0,0 +1,373 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//! Amqp09 input component +//! +//! Receive data from Amqp09 broker + +use arkflow_core::input::{register_input_builder, Ack, Input, InputBuilder}; +use arkflow_core::{Error, MessageBatch, Resource}; + +use async_trait::async_trait; +use flume::{Receiver, Sender}; +use futures_util::stream::StreamExt; +use lapin::{ + options::*, types::FieldTable, Channel, Connection, ConnectionProperties, Consumer, + ExchangeKind, +}; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use tokio::sync::Mutex; +use tokio_util::sync::CancellationToken; +use tracing::error; + +/// Amqp09 input configuration +#[derive(Debug, Clone, Serialize, Deserialize)] +struct Amqp09InputConfig { + /// Amqp09 broker URL (e.g., "amqp://localhost:5672") + url: String, + /// Queue name to consume from + queue: String, + /// Exchange name (optional) + exchange: Option, + /// Exchange type (optional, default: "direct") + exchange_type: Option, + /// Routing key (optional) + routing_key: Option, + /// Whether to declare the queue if it doesn't exist + declare_queue: Option, + /// Whether to declare the exchange if it doesn't exist + declare_exchange: Option, + /// Queue durability + durable: Option, + /// Auto-delete queue when no consumers + auto_delete: Option, + /// Exclusive queue + exclusive: Option, + /// Consumer tag + consumer_tag: Option, + /// Auto-acknowledge messages + auto_ack: Option, + /// Prefetch count + prefetch_count: Option, +} + +/// Amqp09 input component +pub struct Amqp09Input { + input_name: Option, + config: Amqp09InputConfig, + connection: Arc>>, + channel: Arc>>, + consumer: Arc>>, + sender: Sender, + receiver: Receiver, + cancellation_token: CancellationToken, +} + +enum Amqp09Msg { + Delivery(lapin::message::Delivery), + Err(Error), +} + +impl Amqp09Input { + /// Create a new Amqp09 input component + fn new(name: Option<&String>, config: Amqp09InputConfig) -> Result { + let (sender, receiver) = flume::bounded::(1000); + let cancellation_token = CancellationToken::new(); + Ok(Self { + input_name: name.cloned(), + config, + connection: Arc::new(Mutex::new(None)), + channel: Arc::new(Mutex::new(None)), + consumer: Arc::new(Mutex::new(None)), + sender, + receiver, + cancellation_token, + }) + } +} + +#[async_trait] +impl Input for Amqp09Input { + async fn connect(&self) -> Result<(), Error> { + // Create connection + let conn = Connection::connect(&self.config.url, ConnectionProperties::default()) + .await + .map_err(|e| Error::Connection(format!("Failed to connect to Amqp09: {}", e)))?; + + // Create channel + let channel = conn + .create_channel() + .await + .map_err(|e| Error::Connection(format!("Failed to create channel: {}", e)))?; + + // Set prefetch count if specified + if let Some(prefetch_count) = self.config.prefetch_count { + channel + .basic_qos(prefetch_count, BasicQosOptions::default()) + .await + .map_err(|e| Error::Connection(format!("Failed to set QoS: {}", e)))?; + } + + // Declare exchange if needed + if let Some(exchange) = &self.config.exchange { + if self.config.declare_exchange.unwrap_or(false) { + let exchange_type = match self.config.exchange_type.as_deref() { + Some("direct") => ExchangeKind::Direct, + Some("fanout") => ExchangeKind::Fanout, + Some("topic") => ExchangeKind::Topic, + Some("headers") => ExchangeKind::Headers, + _ => ExchangeKind::Direct, + }; + + channel + .exchange_declare( + exchange, + exchange_type, + ExchangeDeclareOptions { + durable: self.config.durable.unwrap_or(true), + auto_delete: self.config.auto_delete.unwrap_or(false), + ..Default::default() + }, + FieldTable::default(), + ) + .await + .map_err(|e| Error::Connection(format!("Failed to declare exchange: {}", e)))?; + } + } + + // Declare queue if needed + if self.config.declare_queue.unwrap_or(false) { + channel + .queue_declare( + &self.config.queue, + QueueDeclareOptions { + durable: self.config.durable.unwrap_or(true), + exclusive: self.config.exclusive.unwrap_or(false), + auto_delete: self.config.auto_delete.unwrap_or(false), + ..Default::default() + }, + FieldTable::default(), + ) + .await + .map_err(|e| Error::Connection(format!("Failed to declare queue: {}", e)))?; + } + + // Bind queue to exchange if both are specified + if let (Some(exchange), Some(routing_key)) = + (&self.config.exchange, &self.config.routing_key) + { + channel + .queue_bind( + &self.config.queue, + exchange, + routing_key, + QueueBindOptions::default(), + FieldTable::default(), + ) + .await + .map_err(|e| Error::Connection(format!("Failed to bind queue: {}", e)))?; + } + + // Create consumer + let consumer = channel + .basic_consume( + &self.config.queue, + self.config.consumer_tag.as_deref().unwrap_or(""), + BasicConsumeOptions { + no_ack: self.config.auto_ack.unwrap_or(false), + ..Default::default() + }, + FieldTable::default(), + ) + .await + .map_err(|e| Error::Connection(format!("Failed to create consumer: {}", e)))?; + + // Store connection, channel, and consumer + { + let mut conn_guard = self.connection.lock().await; + *conn_guard = Some(conn); + } + { + let mut channel_guard = self.channel.lock().await; + *channel_guard = Some(channel); + } + { + let mut consumer_guard = self.consumer.lock().await; + *consumer_guard = Some(consumer); + } + + // Start consuming messages + let consumer_arc = Arc::clone(&self.consumer); + let sender_clone = self.sender.clone(); + let cancellation_token = self.cancellation_token.clone(); + + tokio::spawn(async move { + loop { + tokio::select! { + _ = cancellation_token.cancelled() => { + break; + } + result = async { + let mut consumer_guard = consumer_arc.lock().await; + if let Some(consumer) = &mut *consumer_guard { + consumer.next().await + } else { + None + } + } => { + match result { + Some(delivery_result) => { + match delivery_result { + Ok(delivery) => { + if let Err(_) = sender_clone.send_async(Amqp09Msg::Delivery(delivery)).await { + break; + } + } + Err(e) => { + error!("Amqp09 delivery error: {}", e); + if let Err(_) = sender_clone.send_async(Amqp09Msg::Err(Error::Disconnection)).await { + break; + } + } + } + } + None => { + // Consumer ended + if let Err(_) = sender_clone.send_async(Amqp09Msg::Err(Error::EOF)).await { + break; + } + } + } + } + } + } + }); + + Ok(()) + } + + async fn read(&self) -> Result<(MessageBatch, Arc), Error> { + { + let connection_guard = self.connection.lock().await; + if connection_guard.is_none() { + return Err(Error::Disconnection); + } + } + + let cancellation_token = self.cancellation_token.clone(); + + tokio::select! { + result = self.receiver.recv_async() => { + match result { + Ok(msg) => { + match msg { + Amqp09Msg::Delivery(delivery) => { + let payload = delivery.data.to_vec(); + let mut msg = MessageBatch::new_binary(vec![payload])?; + msg.set_input_name(self.input_name.clone()); + + Ok((msg, Arc::new(Amqp09Ack { + channel: Arc::clone(&self.channel), + delivery_tag: delivery.delivery_tag, + auto_ack: self.config.auto_ack.unwrap_or(false), + }))) + } + Amqp09Msg::Err(e) => Err(e), + } + } + Err(_) => Err(Error::EOF), + } + } + _ = cancellation_token.cancelled() => { + Err(Error::EOF) + } + } + } + + async fn close(&self) -> Result<(), Error> { + self.cancellation_token.cancel(); + + // Close consumer + { + let mut consumer_guard = self.consumer.lock().await; + *consumer_guard = None; + } + + // Close channel + { + let mut channel_guard = self.channel.lock().await; + if let Some(channel) = channel_guard.take() { + let _ = channel.close(200, "Normal shutdown").await; + } + } + + // Close connection + { + let mut conn_guard = self.connection.lock().await; + if let Some(conn) = conn_guard.take() { + let _ = conn.close(200, "Normal shutdown").await; + } + } + + Ok(()) + } +} + +struct Amqp09Ack { + channel: Arc>>, + delivery_tag: u64, + auto_ack: bool, +} + +#[async_trait] +impl Ack for Amqp09Ack { + async fn ack(&self) { + if !self.auto_ack { + let channel_guard = self.channel.lock().await; + if let Some(channel) = &*channel_guard { + if let Err(e) = channel + .basic_ack(self.delivery_tag, BasicAckOptions::default()) + .await + { + error!("Failed to ack message: {}", e); + } + } + } + } +} + +pub(crate) struct Amqp09InputBuilder; + +impl InputBuilder for Amqp09InputBuilder { + fn build( + &self, + name: Option<&String>, + config: &Option, + _resource: &Resource, + ) -> Result, Error> { + if config.is_none() { + return Err(Error::Config( + "Amqp0.9 input configuration is missing".to_string(), + )); + } + + let config: Amqp09InputConfig = serde_json::from_value(config.clone().unwrap())?; + Ok(Arc::new(Amqp09Input::new(name, config)?)) + } +} + +pub fn init() -> Result<(), Error> { + register_input_builder("amqp09", Arc::new(Amqp09InputBuilder)) +} diff --git a/crates/arkflow-plugin/src/input/mod.rs b/crates/arkflow-plugin/src/input/mod.rs index 67c35e07..d97c3709 100644 --- a/crates/arkflow-plugin/src/input/mod.rs +++ b/crates/arkflow-plugin/src/input/mod.rs @@ -18,6 +18,7 @@ use arkflow_core::Error; +pub mod amqp09; pub mod file; pub mod generate; pub mod http; @@ -38,6 +39,7 @@ pub fn init() -> Result<(), Error> { memory::init()?; mqtt::init()?; nats::init()?; + amqp09::init()?; redis::init()?; sql::init()?; websocket::init()?; diff --git a/crates/arkflow-plugin/src/output/amqp09.rs b/crates/arkflow-plugin/src/output/amqp09.rs new file mode 100644 index 00000000..07611e9c --- /dev/null +++ b/crates/arkflow-plugin/src/output/amqp09.rs @@ -0,0 +1,236 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//! Amqp09 output component +//! +//! Send data to Amqp09 broker + +use arkflow_core::output::{register_output_builder, Output, OutputBuilder}; +use arkflow_core::{Error, MessageBatch, Resource}; + +use crate::expr::Expr; +use async_trait::async_trait; +use lapin::{ + options::*, types::FieldTable, BasicProperties, Channel, Connection, ConnectionProperties, + ExchangeKind, +}; +use serde::{Deserialize, Serialize}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use tokio::sync::Mutex; + +/// Amqp09 output configuration +#[derive(Debug, Clone, Serialize, Deserialize)] +struct Amqp09OutputConfig { + /// Amqp09 broker URL (e.g., "amqp://localhost:5672") + url: String, + /// Exchange name to publish to + exchange: String, + /// Exchange type (optional, default: "direct") + exchange_type: Option, + /// Routing key for publishing + routing_key: Expr, + /// Whether to declare the exchange if it doesn't exist + declare_exchange: Option, + /// Exchange durability + durable: Option, + /// Auto-delete exchange when no queues are bound + auto_delete: Option, + /// Message persistence + persistent: Option, + /// Publisher confirms + confirm: Option, + /// Message priority + priority: Option, + /// Message expiration (in milliseconds) + expiration: Option, + /// Content type + content_type: Option, + /// Content encoding + content_encoding: Option, + /// Value field to extract from messages + value_field: Option, +} + +/// Amqp09 output component +struct Amqp09Output { + config: Amqp09OutputConfig, + connection: Arc>>, + channel: Arc>>, + connected: AtomicBool, +} + +impl Amqp09Output { + /// Create a new Amqp09 output component + fn new(config: Amqp09OutputConfig) -> Result { + Ok(Self { + config, + connection: Arc::new(Mutex::new(None)), + channel: Arc::new(Mutex::new(None)), + connected: AtomicBool::new(false), + }) + } +} + +#[async_trait] +impl Output for Amqp09Output { + async fn connect(&self) -> Result<(), Error> { + // Create connection + let conn = Connection::connect(&self.config.url, ConnectionProperties::default()) + .await + .map_err(|e| Error::Connection(format!("Failed to connect to Amqp09: {}", e)))?; + + // Create channel + let channel = conn + .create_channel() + .await + .map_err(|e| Error::Connection(format!("Failed to create channel: {}", e)))?; + + // Enable publisher confirms if requested + if self.config.confirm.unwrap_or(false) { + channel + .confirm_select(ConfirmSelectOptions::default()) + .await + .map_err(|e| { + Error::Connection(format!("Failed to enable publisher confirms: {}", e)) + })?; + } + + // Declare exchange if needed + if self.config.declare_exchange.unwrap_or(false) { + let exchange_type = match self.config.exchange_type.as_deref() { + Some("direct") => ExchangeKind::Direct, + Some("fanout") => ExchangeKind::Fanout, + Some("topic") => ExchangeKind::Topic, + Some("headers") => ExchangeKind::Headers, + _ => ExchangeKind::Direct, + }; + + channel + .exchange_declare( + &self.config.exchange, + exchange_type, + ExchangeDeclareOptions { + durable: self.config.durable.unwrap_or(true), + auto_delete: self.config.auto_delete.unwrap_or(false), + ..Default::default() + }, + FieldTable::default(), + ) + .await + .map_err(|e| Error::Connection(format!("Failed to declare exchange: {}", e)))?; + } + + // Store connection and channel + { + let mut conn_guard = self.connection.lock().await; + *conn_guard = Some(conn); + } + { + let mut channel_guard = self.channel.lock().await; + *channel_guard = Some(channel); + } + + self.connected.store(true, Ordering::SeqCst); + Ok(()) + } + + async fn write(&self, batch: MessageBatch) -> Result<(), Error> { + if !self.connected.load(Ordering::SeqCst) { + return Err(Error::Process("Amqp09 output is not connected".to_string())); + } + + let channel_arc = self.channel.clone(); + let channel_guard = channel_arc.lock().await; + let channel = channel_guard + .as_ref() + .ok_or_else(|| Error::Process("Amqp09 channel is not initialized".to_string()))?; + + let value_field = self.config.value_field.as_deref().unwrap_or("data"); + + // Get the message content + let payloads = match batch.to_binary(value_field) { + Ok(v) => v.to_vec(), + Err(e) => { + return Err(e); + } + }; + + let routing_keys = self.config.routing_key.evaluate_expr(&batch).await?; + + for (i, payload) in payloads.into_iter().enumerate() { + let routing_key = routing_keys.get(i).map_or("", |v| v.as_str()); + + channel + .basic_publish( + &self.config.exchange, + routing_key, + BasicPublishOptions::default(), + &payload, + BasicProperties::default(), + ) + .await + .map_err(|e| Error::Process(format!("Failed to publish message: {}", e)))? + .await + .map_err(|e| Error::Process(format!("Failed to confirm message: {}", e)))?; + } + + Ok(()) + } + + async fn close(&self) -> Result<(), Error> { + // Close channel + { + let mut channel_guard = self.channel.lock().await; + if let Some(channel) = channel_guard.take() { + let _ = channel.close(200, "Normal shutdown").await; + } + } + + // Close connection + { + let mut conn_guard = self.connection.lock().await; + if let Some(conn) = conn_guard.take() { + let _ = conn.close(200, "Normal shutdown").await; + } + } + + self.connected.store(false, Ordering::SeqCst); + Ok(()) + } +} + +struct Amqp09OutputBuilder; + +impl OutputBuilder for Amqp09OutputBuilder { + fn build( + &self, + _name: Option<&String>, + config: &Option, + _resource: &Resource, + ) -> Result, Error> { + if config.is_none() { + return Err(Error::Config( + "Amqp09 output configuration is missing".to_string(), + )); + } + + let config: Amqp09OutputConfig = serde_json::from_value(config.clone().unwrap())?; + Ok(Arc::new(Amqp09Output::new(config)?)) + } +} + +pub fn init() -> Result<(), Error> { + register_output_builder("amqp09", Arc::new(Amqp09OutputBuilder)) +} diff --git a/crates/arkflow-plugin/src/output/mod.rs b/crates/arkflow-plugin/src/output/mod.rs index eb144a76..d8349e25 100644 --- a/crates/arkflow-plugin/src/output/mod.rs +++ b/crates/arkflow-plugin/src/output/mod.rs @@ -18,13 +18,14 @@ use arkflow_core::Error; +pub mod amqp09; pub mod drop; pub mod http; pub mod kafka; pub mod mqtt; -pub mod sql; pub mod nats; pub mod redis; +pub mod sql; pub mod stdout; pub fn init() -> Result<(), Error> { @@ -32,9 +33,10 @@ pub fn init() -> Result<(), Error> { http::init()?; kafka::init()?; mqtt::init()?; - stdout::init()?; - sql::init()?; nats::init()?; + amqp09::init()?; redis::init()?; + sql::init()?; + stdout::init()?; Ok(()) }