diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 4f041cc..3cb0f0b 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -66,3 +66,4 @@ jobs: TEST_DEVNET_RPC_ENDPOINT: ${{ secrets.DEVNET_RPC_ENDPOINT }} TEST_MAINNET_RPC_ENDPOINT: ${{ secrets.MAINNET_RPC_ENDPOINT }} TEST_PRIVATE_KEY: ${{ secrets.TEST_PRIVATE_KEY }} + TEST_GRPC_X_TOKEN: ${{ secrets.TEST_GRPC_X_TOKEN }} diff --git a/Cargo.lock b/Cargo.lock index 10a1c18..d628c01 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -215,7 +215,7 @@ dependencies = [ "anchor-syn", "anyhow", "bs58", - "heck", + "heck 0.3.3", "proc-macro2", "quote", "serde_json", @@ -288,7 +288,7 @@ checksum = "32e8599d21995f68e296265aa5ab0c3cef582fd58afec014d01bd0bce18a4418" dependencies = [ "anchor-lang-idl-spec", "anyhow", - "heck", + "heck 0.3.3", "serde", "serde_json", "sha2 0.10.8", @@ -312,7 +312,7 @@ checksum = "564685b759db12a2424d1b2688cfdf0fec26a023813bc461274754fb0e5d97b0" dependencies = [ "anyhow", "bs58", - "heck", + "heck 0.3.3", "proc-macro2", "quote", "serde", @@ -554,6 +554,28 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "async-trait" version = "0.1.87" @@ -565,6 +587,12 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "atty" version = "0.2.14" @@ -582,6 +610,62 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" +[[package]] +name = "autotools" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef941527c41b0fc0dd48511a8154cd5fc7e29200a0ff8b7203c5d777dbc795cf" +dependencies = [ + "cc", +] + +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core", + "bytes", + "futures-util", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper 1.0.2", + "tower 0.5.2", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper 1.0.2", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -950,6 +1034,16 @@ dependencies = [ "libc", ] +[[package]] +name = "core-foundation" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b55271e5c8c478ad3f38ad24ef34923091e0548492a266d19b3c0b4d82574c63" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -1260,6 +1354,8 @@ dependencies = [ "tokio-stream", "tokio-tungstenite", "toml 0.8.20", + "yellowstone-grpc-client", + "yellowstone-grpc-proto", ] [[package]] @@ -1429,6 +1525,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94474d15a76982be62ca8a39570dccce148d98c238ebb7408b0a21b2c4bdddc4" +[[package]] +name = "fixedbitset" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" + [[package]] name = "flate2" version = "1.1.0" @@ -1639,7 +1741,26 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap", + "indexmap 2.8.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "h2" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75249d144030531f8dee69fe9cea04d3edf809a017ae445e2abdff6629e86633" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http 1.2.0", + "indexmap 2.8.0", "slab", "tokio", "tokio-util", @@ -1655,6 +1776,12 @@ dependencies = [ "byteorder", ] +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + [[package]] name = "hashbrown" version = "0.13.2" @@ -1685,6 +1812,12 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -1769,6 +1902,29 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http 1.2.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" +dependencies = [ + "bytes", + "futures-core", + "http 1.2.0", + "http-body 1.0.1", + "pin-project-lite", +] + [[package]] name = "httparse" version = "1.10.1" @@ -1797,9 +1953,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.3.26", "http 0.2.12", - "http-body", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -1811,6 +1967,27 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc2b571658e38e0c01b1fdca3bbbe93c00d3d71693ff2770043f8c29bc7d6f80" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.9", + "http 1.2.0", + "http-body 1.0.1", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", + "want", +] + [[package]] name = "hyper-rustls" version = "0.24.2" @@ -1819,10 +1996,43 @@ checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", "http 0.2.12", - "hyper", - "rustls", + "hyper 0.14.32", + "rustls 0.21.12", + "tokio", + "tokio-rustls 0.24.1", +] + +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper 1.6.0", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + +[[package]] +name = "hyper-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "497bbc33a26fdd4af9ed9c70d63f61cf56a938375fbb32df34db9b1cd6d643f2" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.2.0", + "http-body 1.0.1", + "hyper 1.6.0", + "libc", + "pin-project-lite", + "socket2", "tokio", - "tokio-rustls", + "tower-service", + "tracing", ] [[package]] @@ -1993,6 +2203,16 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.8.0" @@ -2136,9 +2356,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.170" +version = "0.2.172" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "875b3680cb2f8f71bdcf9a30f38d48282f5d3c95cbf9b3fa57269bb5d5c06828" +checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" [[package]] name = "libloading" @@ -2226,6 +2446,12 @@ version = "0.4.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30bde2b3dc3671ae49d8e2e9f044c7c005836e7a023ee57cffa25ab82764bb9e" +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "memchr" version = "2.7.4" @@ -2298,6 +2524,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "multimap" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" + [[package]] name = "native-tls" version = "0.2.14" @@ -2310,7 +2542,7 @@ dependencies = [ "openssl-probe", "openssl-sys", "schannel", - "security-framework", + "security-framework 2.11.1", "security-framework-sys", "tempfile", ] @@ -2557,6 +2789,36 @@ dependencies = [ "num", ] +[[package]] +name = "petgraph" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" +dependencies = [ + "fixedbitset", + "indexmap 2.8.0", +] + +[[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -2611,6 +2873,16 @@ dependencies = [ "zerocopy 0.8.23", ] +[[package]] +name = "prettyplease" +version = "0.2.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "664ec5419c51e34154eec046ebcba56312d5a2fc3b09a06da188e1ad21afadf6" +dependencies = [ + "proc-macro2", + "syn 2.0.100", +] + [[package]] name = "proc-macro-crate" version = "0.1.5" @@ -2638,6 +2910,67 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" +dependencies = [ + "heck 0.5.0", + "itertools 0.12.1", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn 2.0.100", + "tempfile", +] + +[[package]] +name = "prost-derive" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools 0.12.1", + "proc-macro2", + "quote", + "syn 2.0.100", +] + +[[package]] +name = "prost-types" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" +dependencies = [ + "prost", +] + +[[package]] +name = "protobuf-src" +version = "1.1.0+21.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7ac8852baeb3cc6fb83b93646fb93c0ffe5d14bf138c945ceb4b9948ee0e3c1" +dependencies = [ + "autotools", +] + [[package]] name = "qstring" version = "0.7.2" @@ -2836,10 +3169,10 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", + "h2 0.3.26", "http 0.2.12", - "http-body", - "hyper", + "http-body 0.4.6", + "hyper 0.14.32", "hyper-rustls", "ipnet", "js-sys", @@ -2849,15 +3182,15 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls", - "rustls-pemfile", + "rustls 0.21.12", + "rustls-pemfile 1.0.4", "serde", "serde_json", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 0.1.2", "system-configuration", "tokio", - "tokio-rustls", + "tokio-rustls 0.24.1", "tokio-util", "tower-service", "url", @@ -2933,10 +3266,37 @@ checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" dependencies = [ "log", "ring", - "rustls-webpki", + "rustls-webpki 0.101.7", "sct", ] +[[package]] +name = "rustls" +version = "0.23.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df51b5869f3a441595eac5e8ff14d486ff285f7b8c0df8770e49c3b56351f0f0" +dependencies = [ + "log", + "once_cell", + "ring", + "rustls-pki-types", + "rustls-webpki 0.103.1", + "subtle", + "zeroize", +] + +[[package]] +name = "rustls-native-certs" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fcff2dd52b58a8d98a70243663a0d234c4e2b79235637849d15913394a247d3" +dependencies = [ + "openssl-probe", + "rustls-pki-types", + "schannel", + "security-framework 3.2.0", +] + [[package]] name = "rustls-pemfile" version = "1.0.4" @@ -2946,6 +3306,21 @@ dependencies = [ "base64 0.21.7", ] +[[package]] +name = "rustls-pemfile" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" +dependencies = [ + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c" + [[package]] name = "rustls-webpki" version = "0.101.7" @@ -2956,6 +3331,17 @@ dependencies = [ "untrusted", ] +[[package]] +name = "rustls-webpki" +version = "0.103.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fef8b8769aaccf73098557a87cd1816b4f9c7c16811c9c77142aa695c16f2c03" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", +] + [[package]] name = "rustversion" version = "1.0.20" @@ -3000,7 +3386,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ "bitflags 2.9.0", - "core-foundation", + "core-foundation 0.9.4", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316" +dependencies = [ + "bitflags 2.9.0", + "core-foundation 0.10.0", "core-foundation-sys", "libc", "security-framework-sys", @@ -3215,9 +3614,9 @@ checksum = "7fcf8323ef1faaee30a44a340193b1ac6814fd9b7b4e88e9d4519a3e4abe1cfd" [[package]] name = "socket2" -version = "0.5.8" +version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8" +checksum = "4f5fd57c80058a56cf5c777ab8a126398ece8e442983605d280a44ce79d0edef" dependencies = [ "libc", "windows-sys 0.52.0", @@ -5440,6 +5839,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "sync_wrapper" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" + [[package]] name = "synstructure" version = "0.13.1" @@ -5458,7 +5863,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" dependencies = [ "bitflags 1.3.2", - "core-foundation", + "core-foundation 0.9.4", "system-configuration-sys", ] @@ -5614,7 +6019,17 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls", + "rustls 0.21.12", + "tokio", +] + +[[package]] +name = "tokio-rustls" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b" +dependencies = [ + "rustls 0.23.26", "tokio", ] @@ -5692,13 +6107,115 @@ version = "0.22.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "17b4795ff5edd201c7cd6dca065ae59972ce77d1b80fa0a84d94950ece7d1474" dependencies = [ - "indexmap", + "indexmap 2.8.0", "serde", "serde_spanned", "toml_datetime", "winnow", ] +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.22.1", + "bytes", + "flate2", + "h2 0.4.9", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.6.0", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost", + "rustls-native-certs", + "rustls-pemfile 2.2.0", + "socket2", + "tokio", + "tokio-rustls 0.26.2", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", + "zstd", +] + +[[package]] +name = "tonic-build" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9557ce109ea773b399c9b9e5dca39294110b74f1f342cb347a80d1fce8c26a11" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "prost-types", + "quote", + "syn 2.0.100", +] + +[[package]] +name = "tonic-health" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1eaf34ddb812120f5c601162d5429933c9b527d901ab0e7f930d3147e33a09b2" +dependencies = [ + "async-stream", + "prost", + "tokio", + "tokio-stream", + "tonic", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite", + "rand 0.8.5", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper 1.0.2", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + [[package]] name = "tower-service" version = "0.3.3" @@ -5712,9 +6229,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "tracing-core" version = "0.1.33" @@ -6248,6 +6777,38 @@ version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" +[[package]] +name = "yellowstone-grpc-client" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7504e16bd8edc6af4ba11be0b42fba24ffb42b383b1373c5d26dc644a327070a" +dependencies = [ + "bytes", + "futures", + "thiserror 1.0.69", + "tonic", + "tonic-health", + "yellowstone-grpc-proto", +] + +[[package]] +name = "yellowstone-grpc-proto" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a19211525073ab3811f57f30056633011f836fd61781a1f3b24ed4404a4c214" +dependencies = [ + "anyhow", + "bincode", + "prost", + "prost-types", + "protobuf-src", + "solana-account-decoder", + "solana-sdk", + "solana-transaction-status", + "tonic", + "tonic-build", +] + [[package]] name = "yoke" version = "0.7.5" diff --git a/Cargo.toml b/Cargo.toml index 4e8f930..12dfcc6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,8 @@ thiserror = "1" tokio = { version = "1.42", features = ["full"] } tokio-stream = "0.1.17" tokio-tungstenite = { version = "0.26", features = ["native-tls"] } +yellowstone-grpc-client = "6.0.0" +yellowstone-grpc-proto = "6.0.0" drift-pubsub-client = { version = "0.1.1", path = "crates/pubsub-client" } diff --git a/README.md b/README.md index 4879477..d406fed 100644 --- a/README.md +++ b/README.md @@ -29,8 +29,12 @@ _*_ crates.io requires [libdrift](https://github.com/drift-labs/drift-ffi-sys/?t ## Use +The `DriftClient` struct provides methods for reading drift program accounts and crafting transactions. +It is built on a subscription model where live account updates are transparently cached and made accessible via accessor methods. +The client may be subscribed either via Ws or gRPC. + ```rust -use drift_rs::{DriftClient, Wallet}; +use drift_rs::{AccountFilter, DriftClient, Wallet, grpc::GrpcSubscribeOpts}; use solana_sdk::signature::Keypair; async fn main() { @@ -42,11 +46,36 @@ async fn main() { .await .expect("connects"); - // Subscribe to Ws-based live market and price changes + // Subscribe via WebSocket + // + // 1) Ws-based live market and price changes let markets = [MarketId::spot(1), MarketId::perp(0)]; client.subscribe_markets(&markets).await.unwrap(); client.subscribe_oracles(&markets).await.unwrap(); -} + client.subscribe_account("SUBACCOUNT_1"); + + // OR 2) subscribe via gRPC (advanced) + // gRPC automatically subscribes to all markets and oracles + client.grpc_subscribe( + "https://grpc.example.com".into(), + "API-X-TOKEN".into(), + GrpcSubscribeOpts::default() + .user_accounts("SUBACCOUNT_1", "SUB_ACCOUNT_2") + .on_slot(move |new_slot| { + // do something on slot + }) + .on_account( + AccountFilter::partial().with_discriminator(User::DISCRIMINATOR), + move |account| { + // do something on user account updates + }) + ).await; + + // + // Fetch latest values + /// + let sol_perp_price = client.oracle_price(MarketId::perp(0)); + let subaccount_1: User = client.try_get_account("SUBACCOUNT_1")); ``` ## Setup diff --git a/crates/src/account_map.rs b/crates/src/account_map.rs index 4647889..b1eb51e 100644 --- a/crates/src/account_map.rs +++ b/crates/src/account_map.rs @@ -1,6 +1,6 @@ //! Hybrid solana account map backed by Ws or RPC polling use std::{ - sync::{Arc, Mutex, RwLock}, + sync::{Arc, Mutex}, time::Duration, }; @@ -12,7 +12,7 @@ use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_sdk::{clock::Slot, commitment_config::CommitmentConfig, pubkey::Pubkey}; use crate::{ - polled_account_subscriber::PolledAccountSubscriber, types::DataAndSlot, + grpc::AccountUpdate, polled_account_subscriber::PolledAccountSubscriber, types::DataAndSlot, websocket_account_subscriber::WebsocketAccountSubscriber, SdkResult, UnsubHandle, }; @@ -31,7 +31,8 @@ pub struct AccountMap { pubsub: Arc, rpc: Arc, commitment: CommitmentConfig, - inner: DashMap, ahash::RandomState>, + inner: Arc>, + subscriptions: Arc, ahash::RandomState>>, } impl AccountMap { @@ -44,7 +45,8 @@ impl AccountMap { pubsub, rpc, commitment, - inner: Default::default(), + inner: Arc::default(), + subscriptions: Arc::default(), } } /// Subscribe account with Ws @@ -58,9 +60,8 @@ impl AccountMap { debug!(target: LOG_TARGET, "subscribing: {account:?}"); let user = AccountSub::new(Arc::clone(&self.pubsub), self.commitment, *account); - let user = user.subscribe().await?; - - self.inner.insert(*account, user); + let sub = user.subscribe(Arc::clone(&self.inner)).await?; + self.subscriptions.insert(*account, sub); Ok(()) } @@ -80,17 +81,50 @@ impl AccountMap { debug!(target: LOG_TARGET, "subscribing: {account:?} @ {interval:?}"); let user = AccountSub::polled(Arc::clone(&self.rpc), *account, interval); - let user = user.subscribe().await?; - - self.inner.insert(*account, user); + let sub = user.subscribe(Arc::clone(&self.inner)).await?; + self.subscriptions.insert(*account, sub); Ok(()) } + /// On account update callback for gRPC hook + pub(crate) fn on_account_fn(&self) -> impl Fn(&AccountUpdate) { + let accounts = Arc::clone(&self.inner); + let subscriptions = Arc::clone(&self.subscriptions); + move |update| { + accounts + .entry(update.pubkey) + .and_modify(|x| { + x.slot = update.slot; + x.raw.resize(update.data.len(), 0); + x.raw.clone_from_slice(update.data); + if update.lamports == 0 { + accounts.remove(&update.pubkey); + } + }) + .or_insert({ + subscriptions.insert( + update.pubkey, + AccountSub { + pubkey: update.pubkey, + subscription: SubscriptionImpl::Grpc, + state: Subscribed { + unsub: Mutex::default(), + }, + }, + ); + AccountSlot { + slot: update.slot, + raw: update.data.to_vec(), + } + }); + } + } /// Unsubscribe user account pub fn unsubscribe_account(&self, account: &Pubkey) { - if let Some((acc, unsub)) = self.inner.remove(account) { + if let Some((acc, sub)) = self.subscriptions.remove(account) { debug!(target: LOG_TARGET, "unsubscribing: {acc:?}"); - let _ = unsub.unsubscribe(); + self.inner.remove(account); + let _ = sub.unsubscribe(); } } /// Return data of the given `account` as T, if it exists @@ -102,14 +136,14 @@ impl AccountMap { &self, account: &Pubkey, ) -> Option> { - self.inner - .get(account) - .map(|u| u.get_account_data_and_slot()) + self.inner.get(account).map(|x| DataAndSlot { + slot: x.slot, + data: T::try_deserialize_unchecked(&mut x.raw.as_slice()).expect("deserializes"), + }) } } struct Subscribed { - data_and_slot: Arc>, unsub: Mutex>, } struct Unsubscribed; @@ -151,52 +185,63 @@ impl AccountSub { } /// Start the subscriber task - pub async fn subscribe(self) -> SdkResult> { - let data_and_slot = Arc::new(RwLock::new(AccountSlot::default())); - + pub async fn subscribe( + self, + accounts: Arc>, + ) -> SdkResult> { let unsub = match self.subscription { SubscriptionImpl::Ws(ref ws) => { - let data_and_slot = Arc::clone(&data_and_slot); - ws.subscribe(Self::SUBSCRIPTION_ID, true, move |update| { - let mut guard = data_and_slot.write().expect("acquired"); - guard.raw.clone_from(&update.data); - guard.slot = update.slot; - }) - .await? + let unsub = ws + .subscribe(Self::SUBSCRIPTION_ID, true, move |update| { + accounts + .entry(update.pubkey) + .and_modify(|x| { + x.slot = update.slot; + x.raw.clone_from(&update.data); + if update.lamports == 0 { + accounts.remove(&update.pubkey); + } + }) + .or_insert(AccountSlot { + raw: update.data.clone(), + slot: update.slot, + }); + }) + .await?; + Some(unsub) } SubscriptionImpl::Polled(ref poll) => { - let data_and_slot = Arc::clone(&data_and_slot); - poll.subscribe(move |update| { - let mut guard = data_and_slot.write().expect("acquired"); - guard.raw.clone_from(&update.data); - guard.slot = update.slot; - }) + let unsub = poll.subscribe(move |update| { + accounts + .entry(update.pubkey) + .and_modify(|x| { + x.slot = update.slot; + x.raw.clone_from(&update.data); + if update.lamports == 0 { + accounts.remove(&update.pubkey); + } + }) + .or_insert(AccountSlot { + raw: update.data.clone(), + slot: update.slot, + }); + }); + Some(unsub) } + SubscriptionImpl::Grpc => None, }; Ok(AccountSub { pubkey: self.pubkey, subscription: self.subscription, state: Subscribed { - data_and_slot, - unsub: Mutex::new(Some(unsub)), + unsub: Mutex::new(unsub), }, }) } } impl AccountSub { - /// Return the latest value of the account data along with last updated slot - /// # Panics - /// Panics if account data cannot be deserialized as `T` - pub fn get_account_data_and_slot(&self) -> DataAndSlot { - let guard = self.state.data_and_slot.read().expect("acquired"); - DataAndSlot { - slot: guard.slot, - data: T::try_deserialize_unchecked(&mut guard.raw.as_slice()).expect("desrializes"), - } - } - /// Stop the user subscriber task, if it exists pub fn unsubscribe(self) -> AccountSub { let mut guard = self.state.unsub.lock().expect("acquire"); @@ -217,6 +262,7 @@ impl AccountSub { enum SubscriptionImpl { Ws(WebsocketAccountSubscriber), Polled(PolledAccountSubscriber), + Grpc, } #[cfg(test)] @@ -228,7 +274,8 @@ mod tests { use super::*; use crate::{ accounts::User, - constants::DEFAULT_PUBKEY, + constants::{state_account, DEFAULT_PUBKEY}, + types::accounts::State, utils::{get_ws_url, test_envs::mainnet_endpoint}, Wallet, }; @@ -252,23 +299,27 @@ mod tests { 0, ); - let (res1, res2) = tokio::join!( + let (res1, res2, res3) = tokio::join!( account_map.subscribe_account(&user_1), account_map.subscribe_account(&user_2), + account_map.subscribe_account_polled(state_account(), Some(Duration::from_secs(2))), ); - assert!(res1.and(res2).is_ok()); + assert!(res1.and(res2).and(res3).is_ok()); let handle = tokio::spawn(async move { - tokio::time::sleep(Duration::from_secs(5)).await; + tokio::time::sleep(Duration::from_secs(8)).await; let account_data = account_map.account_data::(&user_1); assert!(account_data.is_some_and(|x| x.authority != DEFAULT_PUBKEY)); account_map.unsubscribe_account(&user_1); + let account_data = account_map.account_data::(&user_1); + assert!(account_data.is_none()); + let account_data = account_map.account_data::(&user_2); assert!(account_data.is_some_and(|x| x.authority != DEFAULT_PUBKEY)); - let account_data = account_map.account_data::(&user_1); - assert!(account_data.is_none()); + let state_account = account_map.account_data::(state_account()); + assert!(state_account.is_some()); }); assert!(handle.await.is_ok()); diff --git a/crates/src/ffi.rs b/crates/src/ffi.rs index b1cd584..89091f7 100644 --- a/crates/src/ffi.rs +++ b/crates/src/ffi.rs @@ -386,8 +386,8 @@ pub mod abi_types { pub latest_slot: Slot, } + #[cfg(test)] impl<'a> AccountsList<'a> { - #[cfg(test)] pub fn new( perp_markets: &'a mut [AccountWithKey], spot_markets: &'a mut [AccountWithKey], @@ -457,7 +457,10 @@ mod tests { use super::{simulate_place_perp_order, AccountWithKey, AccountsList, MarginContextMode}; use crate::{ accounts::State, - constants::{self, ids::pyth_program}, + constants::{ + ids::pyth_program, + {self}, + }, create_account_info, drift_idl::{ accounts::{PerpMarket, SpotMarket, User}, diff --git a/crates/src/grpc/grpc_subscriber.rs b/crates/src/grpc/grpc_subscriber.rs new file mode 100644 index 0000000..9210b90 --- /dev/null +++ b/crates/src/grpc/grpc_subscriber.rs @@ -0,0 +1,644 @@ +use std::{collections::HashMap, time::Duration}; + +use ahash::HashSet; +use futures_util::{ + sink::SinkExt, + stream::{FuturesUnordered, StreamExt}, +}; +use log::{error, info, warn}; +use solana_rpc_client_api::filter::Memcmp; +use solana_sdk::{clock::Slot, commitment_config::CommitmentLevel, pubkey::Pubkey}; +use yellowstone_grpc_client::{ + ClientTlsConfig, GeyserGrpcBuilderError, GeyserGrpcClient, GeyserGrpcClientError, Interceptor, +}; +use yellowstone_grpc_proto::{ + geyser::{CommitmentLevel as GeyserCommitmentLevel, SubscribeUpdateAccountInfo}, + prelude::{ + subscribe_request_filter_accounts_filter::Filter as AccountsFilterOneof, + subscribe_request_filter_accounts_filter_memcmp::Data as AccountsFilterMemcmpOneof, + subscribe_update::UpdateOneof, SubscribeRequest, SubscribeRequestFilterAccounts, + SubscribeRequestFilterAccountsFilter, SubscribeRequestFilterAccountsFilterMemcmp, + SubscribeRequestFilterSlots, SubscribeRequestPing, + }, + tonic::{transport::Certificate, Status}, +}; + +use crate::{constants::PROGRAM_ID as DRIFT_PROGRAM_ID, types::UnsubHandle}; + +use super::{AccountUpdate, OnAccountFn}; + +type SlotsFilterMap = HashMap; +type AccountFilterMap = HashMap; +type Hooks = Vec<(AccountFilter, Box)>; + +/// Provides filter criteria for accounts over gRPC +/// +/// There are two filter modes: +/// +/// * `full` - requires all filters to trigger a match +/// * `partial` - any one filter will trigger a match +/// +/// ```example(no_run) +/// // match on discriminator AND memcmp +/// let full = AccountFilter::full() +/// .with_discriminator(User::DISCRIMINATOR) +/// .with_memcmp(..); +/// +/// // match on discriminator OR accounts +/// let partial = AccountFilter::partial() +/// .with_discriminator(User::DISCRIMINATOR) +/// .with_accounts([acc1,acc2]) +/// ``` +#[derive(Clone, Default, Debug)] +pub struct AccountFilter { + /// optionally filter updates by discriminator + discriminator: Option<&'static [u8]>, + /// optionally filter updates by Solana Memcmp matches + memcmp: Option, + /// optionally filter updates by pubkey + accounts: Option>, + /// true = full match mode, false = partial + is_full: bool, +} + +impl AccountFilter { + /// Create a filter that matches ALL accounts! + pub fn firehose() -> Self { + AccountFilter::full() + } + /// Create a filter that matches iff all parameters are satisfied + pub fn full() -> Self { + AccountFilter { + is_full: true, + ..Default::default() + } + } + /// Create a filter that matches when any criteria is satisfied + pub fn partial() -> Self { + AccountFilter { + is_full: false, + ..Default::default() + } + } + /// add filter for given `pubkeys` + pub fn with_accounts(mut self, pubkeys: impl Iterator) -> Self { + self.accounts = Some(ahash::HashSet::from_iter(pubkeys)); + self + } + /// add filter for given anchor account `discriminator` + pub fn with_discriminator(mut self, discriminator: &'static [u8]) -> Self { + self.discriminator = Some(discriminator); + self + } + /// add filter for given memcmp filter + pub fn with_memcmp(mut self, memcmp: Memcmp) -> Self { + self.memcmp = Some(memcmp); + self + } + /// Returns true if pubkey/account matches the filter + pub fn matches(&self, pubkey: &Pubkey, account: &SubscribeUpdateAccountInfo) -> bool { + if !self.is_full { + // partial matches + self.discriminator.is_some_and(|x| x == &account.data[..8]) + || self.accounts.as_ref().is_some_and(|x| x.contains(pubkey)) + || self + .memcmp + .as_ref() + .is_some_and(|x| x.bytes_match(&account.data)) + } else { + // full matches + (match self.discriminator { + Some(x) => x == &account.data[..8], + None => true, + }) && (match self.accounts.as_ref() { + Some(x) => x.contains(pubkey), + None => true, + }) && (match self.memcmp.as_ref() { + Some(x) => x.bytes_match(&account.data), + None => true, + }) + } + } +} + +#[derive(Debug, Clone)] +pub struct GrpcConnectionOpts { + /// Apply a timeout to connecting to the uri. + connect_timeout_ms: Option, + /// Sets the tower service default internal buffer size, default is 1024 + buffer_size: Option, + /// Sets whether to use an adaptive flow control. Uses hyper's default otherwise. + http2_adaptive_window: Option, + /// Set http2 KEEP_ALIVE_TIMEOUT. Uses hyper's default otherwise. + http2_keep_alive_interval_ms: Option, + /// Sets the max connection-level flow control for HTTP2, default is 65,535 + initial_connection_window_size: Option, + ///Sets the SETTINGS_INITIAL_WINDOW_SIZE option for HTTP2 stream-level flow control, default is 65,535 + initial_stream_window_size: Option, + ///Set http2 KEEP_ALIVE_TIMEOUT. Uses hyper's default otherwise. + keep_alive_timeout_ms: Option, + /// Set http2 KEEP_ALIVE_WHILE_IDLE. Uses hyper's default otherwise. + keep_alive_while_idle: Option, + /// Set whether TCP keepalive messages are enabled on accepted connections. + tcp_keepalive_ms: Option, + /// Set the value of TCP_NODELAY option for accepted connections. Enabled by default. + tcp_nodelay: Option, + /// Apply a timeout to each request. + timeout_ms: Option, + /// Max message size before decoding, full blocks can be super large, default is 1GiB + max_decoding_message_size: usize, +} + +impl Default for GrpcConnectionOpts { + fn default() -> Self { + Self { + connect_timeout_ms: None, + buffer_size: None, + http2_adaptive_window: None, + http2_keep_alive_interval_ms: None, + initial_connection_window_size: None, + initial_stream_window_size: None, + keep_alive_timeout_ms: None, + keep_alive_while_idle: None, + tcp_keepalive_ms: None, + tcp_nodelay: None, + timeout_ms: None, + max_decoding_message_size: 1024 * 1024 * 1024, + } + } +} + +/// Options for the geyser subscription request +#[derive(Debug, Default, Clone)] +pub struct GeyserSubscribeOpts { + /// Filter by presence of field txn_signature + accounts_nonempty_txn_signature: Option, + /// Filter by Account Pubkey + accounts_account: Vec, + /// Filter by Offset and Data, format: `offset,data in base58` + accounts_memcmp: Vec, + /// Filter by Data size + accounts_datasize: Option, + /// Re-send message from slot + from_slot: Option, + /// Send ping in subscribe request + ping: Option, +} + +#[derive(Debug, thiserror::Error)] +/// drift gRPC error +pub enum GrpcError { + #[error("grpc connect err: {0}")] + Geyser(GeyserGrpcBuilderError), + #[error("grpc request err: {0}")] + Client(GeyserGrpcClientError), + #[error("grpc stream err: {0}")] + Stream(Status), +} + +/// specialized Drift gRPC client +pub struct DriftGrpcClient { + endpoint: String, + x_token: String, + grpc_opts: Option, + on_account_hooks: Hooks, + on_slot: Box, +} + +impl DriftGrpcClient { + /// Create a new `DriftGrpcClient` + /// + /// It can be started by calling `subscribe` + pub fn new(endpoint: String, x_token: String) -> Self { + Self { + endpoint, + x_token, + on_account_hooks: Default::default(), + grpc_opts: None, + on_slot: Box::new(move |_slot| {}), + } + } + + /// Set gRPC network options + pub fn grpc_connection_opts(mut self, grpc_opts: GrpcConnectionOpts) -> Self { + let _ = self.grpc_opts.insert(grpc_opts); + self + } + + /// Add a callback on slot updates + /// + /// `on_slot` must prioritize fast handling or risk blocking the gRPC thread + pub fn on_slot(&mut self, on_slot: F) { + self.on_slot = Box::new(on_slot); + } + + /// Add a callback on account updates matching `filter` + /// + /// This may be called many times to define multiple callbacks + /// + /// * `filter` - filter accounts by criteria + /// * `on_account` - fn to receive callback on filter match + /// + /// DEV: `on_account` must prioritize fast handling or risk blocking the gRPC thread + pub fn on_account( + &mut self, + filter: AccountFilter, + on_account: T, + ) { + self.on_account_hooks.push((filter, Box::new(on_account))); + } + + /// Start subscription for geyser updates + /// + /// Returns an unsub handle on success + pub async fn subscribe( + self, + commitment: CommitmentLevel, + subscribe_opts: GeyserSubscribeOpts, + ) -> Result { + let mut grpc_client = grpc_connect( + self.endpoint.as_str(), + self.x_token.as_str(), + self.grpc_opts.clone().unwrap_or_default(), + ) + .await + .map_err(|err| { + error!(target: "grpc", "connect failed: {err:?}"); + GrpcError::Geyser(err) + })?; + + let resp = grpc_client.get_version().await.map_err(GrpcError::Client)?; + info!("gRPC connected 🔌: {}", resp.version); + let request = subscribe_opts.to_subscribe_request(commitment); + info!(target: "grpc", "gRPC subscribing: {request:?}"); + + let (unsub_tx, mut unsub_rx) = tokio::sync::oneshot::channel::<()>(); + + // gRPC receives updates very frequently, don't want tokio scheduler moving it + std::thread::spawn(|| { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let ls = tokio::task::LocalSet::new(); + let geyser_task = ls.spawn_local(Self::geyser_subscribe( + grpc_client, + request, + self.on_account_hooks, + self.on_slot, + )); + let mut waiter = FuturesUnordered::new(); + waiter.push(geyser_task); + + // nb: will cause grpc task to drop when triggered but- + // it doesn't call any 'unsub' endpoint + ls.block_on(&rt, async move { + tokio::select! { + biased; + _ = &mut unsub_rx => (), + res = waiter.next() => { + if let Ok(Some(err)) = res.unwrap() { + log::error!(target: "grpc", "subscription task failed: {err:?}"); + } else { + log::error!(target: "grpc", "subscription task ended unexpectedly"); + } + } + } + }); + info!(target: "grpc", "gRPC connection unsubscribed"); + }); + + info!(target: "grpc", "gRPC subscribed ⚡️"); + Ok(unsub_tx) + } + + /// Run the gRPC subscription task + /// + /// It receives all configured updates and routes them to registered callbacks + async fn geyser_subscribe( + mut client: GeyserGrpcClient, + request: SubscribeRequest, + on_account: Hooks, + on_slot: impl Fn(Slot), + ) -> Option { + let max_retries = 3; + let mut retry_count = 0; + let mut latest_slot = 0; + let mut last_error: Option = None; + loop { + if retry_count >= max_retries { + log::warn!(target: "grpc", "max retry attempts reached. disconnecting..."); + break; + } + let (mut subscribe_tx, mut stream) = + match client.subscribe_with_request(Some(request.clone())).await { + Ok(res) => { + retry_count = 0; + res + } + Err(err) => { + log::warn!(target: "grpc", "failed subscription: {err:?}"); + retry_count += 1; + tokio::time::sleep(Duration::from_secs(2_u64.pow(retry_count + 1))).await; + let _ = last_error.insert(GrpcError::Client(err)); + continue; + } + }; + + while let Some(message) = stream.next().await { + match message { + Ok(msg) => { + match msg.update_oneof { + Some(UpdateOneof::Account(account_update)) => { + let account = match account_update.account { + Some(ref account) => account, + None => { + warn!(target: "grpc", "empty account update: {account_update:?}"); + continue; + } + }; + let pubkey = Pubkey::new_from_array( + account.pubkey.as_slice().try_into().unwrap(), + ); + log::trace!(target: "grpc", "account update: {pubkey}"); + let update = AccountUpdate { + owner: DRIFT_PROGRAM_ID, // assuming not subscribed to any other program accounts.. + pubkey, + slot: latest_slot, + lamports: account.lamports, + executable: account.executable, + rent_epoch: account.rent_epoch, + data: &account.data, + }; + + for (filter, hook) in &on_account { + if filter.matches(&pubkey, account) { + hook(&update); + } + } + } + Some(UpdateOneof::Slot(msg)) => { + log::trace!(target: "grpc", "slot: {}", msg.slot); + if msg.slot > latest_slot { + latest_slot = msg.slot; + on_slot(latest_slot); + } + } + Some(UpdateOneof::Ping(_)) => { + // This is necessary to keep load balancers that expect client pings alive. If your load balancer doesn't + // require periodic client pings then this is unnecessary + log::debug!(target: "grpc", "ping"); + let ping = SubscribeRequest { + ping: Some(SubscribeRequestPing { id: 1 }), + ..Default::default() + }; + match tokio::time::timeout( + Duration::from_secs(5), + subscribe_tx.send(ping), + ) + .await + { + Ok(Ok(_)) => (), + Ok(Err(err)) => { + log::warn!(target: "grpc", "ping failed: {err:?}"); + } + Err(_) => { + log::warn!(target: "grpc", "ping timeout"); + } + } + } + Some(UpdateOneof::Pong(_)) => { + log::trace!(target: "grpc", "pong"); + } + Some(other_update) => { + warn!(target: "grpc", "unhandled update: {other_update:?}"); + } + None => { + warn!(target: "grpc", "received empty update"); + break; + } + } + } + Err(status) => { + error!(target: "grpc", "stream error: {status:?}"); + let _ = last_error.insert(GrpcError::Stream(status)); + break; + } + } + } + } + + error!(target: "grpc", "gRPC stream closed"); + last_error + } +} + +impl GeyserSubscribeOpts { + fn to_subscribe_request(&self, commitment: CommitmentLevel) -> SubscribeRequest { + let mut accounts = AccountFilterMap::default(); + let mut filters = vec![]; + for filter in self.accounts_memcmp.iter() { + filters.push(SubscribeRequestFilterAccountsFilter { + filter: Some(AccountsFilterOneof::Memcmp( + SubscribeRequestFilterAccountsFilterMemcmp { + offset: filter.offset() as u64, + data: filter + .bytes() + .map(|b| AccountsFilterMemcmpOneof::Bytes(b.to_vec())), + }, + )), + }); + } + if let Some(datasize) = self.accounts_datasize { + filters.push(SubscribeRequestFilterAccountsFilter { + filter: Some(AccountsFilterOneof::Datasize(datasize)), + }); + } + + accounts.insert( + "client".to_owned(), + SubscribeRequestFilterAccounts { + nonempty_txn_signature: self.accounts_nonempty_txn_signature, + account: self.accounts_account.clone(), + owner: vec![DRIFT_PROGRAM_ID.to_string()], + filters, + }, + ); + + let mut slots = SlotsFilterMap::default(); + slots.insert( + "client".to_owned(), + SubscribeRequestFilterSlots { + filter_by_commitment: Some(true), + interslot_updates: Some(false), + }, + ); + + let ping = self.ping.map(|id| SubscribeRequestPing { id }); + + SubscribeRequest { + slots, + accounts, + commitment: Some(match commitment { + CommitmentLevel::Confirmed => GeyserCommitmentLevel::Confirmed, + CommitmentLevel::Processed => GeyserCommitmentLevel::Processed, + CommitmentLevel::Finalized => GeyserCommitmentLevel::Finalized, + } as i32), + ping, + from_slot: self.from_slot, + ..Default::default() + } + } +} + +/// Connect to gRPC endpoint +/// +/// Returns a new `GeyserGrpcClient` +async fn grpc_connect( + endpoint: &str, + x_token: &str, + opts: GrpcConnectionOpts, +) -> Result, GeyserGrpcBuilderError> { + info!(target: "grpc", "gRPC connecting: {endpoint}..."); + let mut tls_config = ClientTlsConfig::new().with_native_roots(); + if let Ok(path) = &std::env::var("GRPC_CA_CERT") { + let bytes = tokio::fs::read(path) + .await + .expect("GRPC_CA_CERT path exists"); + tls_config = tls_config.ca_certificate(Certificate::from_pem(bytes)); + } + let mut builder = GeyserGrpcClient::build_from_shared(endpoint.to_string())? + .x_token(Some(x_token))? + .tls_config(tls_config)? + .max_decoding_message_size(opts.max_decoding_message_size); + + if let Some(duration) = opts.connect_timeout_ms { + builder = builder.connect_timeout(Duration::from_millis(duration)); + } + if let Some(sz) = opts.buffer_size { + builder = builder.buffer_size(sz); + } + if let Some(enabled) = opts.http2_adaptive_window { + builder = builder.http2_adaptive_window(enabled); + } + if let Some(duration) = opts.http2_keep_alive_interval_ms { + builder = builder.http2_keep_alive_interval(Duration::from_millis(duration)); + } + if let Some(sz) = opts.initial_connection_window_size { + builder = builder.initial_connection_window_size(sz); + } + if let Some(sz) = opts.initial_stream_window_size { + builder = builder.initial_stream_window_size(sz); + } + if let Some(duration) = opts.keep_alive_timeout_ms { + builder = builder.keep_alive_timeout(Duration::from_millis(duration)); + } + if let Some(enabled) = opts.keep_alive_while_idle { + builder = builder.keep_alive_while_idle(enabled); + } + if let Some(duration) = opts.tcp_keepalive_ms { + builder = builder.tcp_keepalive(Some(Duration::from_millis(duration))); + } + if let Some(enabled) = opts.tcp_nodelay { + builder = builder.tcp_nodelay(enabled); + } + if let Some(duration) = opts.timeout_ms { + builder = builder.timeout(Duration::from_millis(duration)); + } + + builder.connect().await +} + +#[cfg(test)] +mod test { + use solana_sdk::pubkey::Pubkey; + + use super::*; + + fn create_test_account(data: Vec) -> SubscribeUpdateAccountInfo { + SubscribeUpdateAccountInfo { + data, + ..Default::default() + } + } + + #[test] + fn grpc_partial_match_discriminator() { + let discriminator = &[1, 2, 3, 4, 5, 6, 7, 8]; + let filter = AccountFilter::partial().with_discriminator(discriminator); + + // Test matching discriminator + let account = create_test_account(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + assert!(filter.matches(&Pubkey::new_unique(), &account)); + + // Test non-matching discriminator + let account = create_test_account(vec![8, 7, 6, 5, 4, 3, 2, 1, 9, 10]); + assert!(!filter.matches(&Pubkey::new_unique(), &account)); + } + + #[test] + fn grpc_partial_match_accounts() { + let pubkey = Pubkey::new_unique(); + let filter = AccountFilter::partial().with_accounts([pubkey].into_iter()); + + // Test matching pubkey + let account = create_test_account(vec![1, 2, 3, 4]); + assert!(filter.matches(&pubkey, &account)); + + // Test non-matching pubkey + let other_pubkey = Pubkey::new_unique(); + assert!(!filter.matches(&other_pubkey, &account)); + } + + #[test] + fn grpc_partial_match_memcmp() { + let memcmp = Memcmp::new_raw_bytes(0, vec![1, 2, 3]); + let filter = AccountFilter::partial().with_memcmp(memcmp); + + // Test matching memcmp + let account = create_test_account(vec![1, 2, 3, 4, 5]); + assert!(filter.matches(&Pubkey::new_unique(), &account)); + + // Test non-matching memcmp + let account = create_test_account(vec![3, 2, 1, 4, 5]); + assert!(!filter.matches(&Pubkey::new_unique(), &account)); + } + + #[test] + fn grpc_full_match_all_filters() { + let pubkey = Pubkey::new_unique(); + let discriminator = &[1, 2, 3, 4, 5, 6, 7, 8]; + let memcmp = Memcmp::new_raw_bytes(8, vec![9, 10]); + + let filter = AccountFilter::full() + .with_discriminator(discriminator) + .with_accounts([pubkey].into_iter()) + .with_memcmp(memcmp); + + // Test all match + let account = create_test_account(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + assert!(filter.matches(&pubkey, &account)); + + // Test discriminator mismatch + let account = create_test_account(vec![8, 7, 6, 5, 4, 3, 2, 1, 9, 10]); + assert!(!filter.matches(&pubkey, &account)); + + // Test pubkey mismatch + let other_pubkey = Pubkey::new_unique(); + let account = create_test_account(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + assert!(!filter.matches(&other_pubkey, &account)); + + // Test memcmp mismatch + let account = create_test_account(vec![1, 2, 3, 4, 5, 6, 7, 8, 10, 9]); + assert!(!filter.matches(&pubkey, &account)); + } + + #[test] + fn grpc_firehose_matches_everything() { + let filter = AccountFilter::firehose(); + let pubkey = Pubkey::new_unique(); + let account = create_test_account(vec![1, 2, 3, 4]); + + assert!(filter.matches(&pubkey, &account)); + } +} diff --git a/crates/src/grpc/mod.rs b/crates/src/grpc/mod.rs new file mode 100644 index 0000000..1da5853 --- /dev/null +++ b/crates/src/grpc/mod.rs @@ -0,0 +1,115 @@ +//! Drift gRPC module + +use solana_sdk::{ + clock::{Epoch, Slot}, + pubkey::Pubkey, +}; +pub mod grpc_subscriber; +use grpc_subscriber::{AccountFilter, GrpcConnectionOpts}; + +/// grpc account update callback +pub type OnAccountFn = dyn Fn(&AccountUpdate) + Send + Sync + 'static; +/// grpc slot update callback +pub type OnSlotFn = dyn Fn(Slot) + Send + Sync + 'static; + +/// Account update from gRPC +#[derive(PartialEq, Eq, Clone)] +pub struct AccountUpdate<'a> { + /// the account's pubkey + pub pubkey: Pubkey, + /// lamports in the account + pub lamports: u64, + /// data held in the account + pub data: &'a [u8], + /// the program that owns the account. If executable, the program that loads the account. + pub owner: Pubkey, + /// the account's data contains a loaded program (and is now read-only) + pub executable: bool, + /// the epoch at which the account will next owe rent + pub rent_epoch: Epoch, + /// Slot the update was retrieved + pub slot: Slot, +} + +/// Config options for drift gRPC subscription +/// +/// ```example(no_run) +/// // subscribe to all user and users stats accounts +/// let opts = GrpcSubscribeOpts::default() +/// .usermap_on() // subscribe to ALL user accounts +/// .statsmap_on(); // subscribe to ALL user stats accounts +/// +/// // cache specific user accounts only and set a new slot callback +/// let first_3_subaccounts = (0_u16..3).into_iter().map(|i| wallet.sub_account(i)).collect(); +/// let opts = GrpcSubscribeOpts::default() +/// .user_accounts(first_3_subaccounts); +/// .on_slot(move |new_slot| {}) // slot callback +/// ``` +/// +#[derive(Default)] +pub struct GrpcSubscribeOpts { + /// toggle usermap + pub usermap: bool, + /// toggle user stats map + pub user_stats_map: bool, + /// list of user (sub)accounts to subscribe + pub user_accounts: Vec, + /// callback for slot updates + pub on_slot: Option>, + /// custom callback for account updates + pub on_account: Option<(AccountFilter, Box)>, + /// Network level connection config + pub connection_opts: GrpcConnectionOpts, +} + +impl GrpcSubscribeOpts { + /// Cache ALL drift `User` account updates + /// + /// useful for e.g. building the DLOB, fast TX building for makers + /// + /// note: memory requirements ~2GiB + pub fn usermap_on(mut self) -> Self { + self.usermap = true; + self + } + /// Cache ALL drift `UserStats` account updates + /// + /// useful for e.g. fast TX building for makers + pub fn statsmap_on(mut self) -> Self { + self.user_stats_map = true; + self + } + /// Cache account updates for given `users` only + pub fn user_accounts(mut self, users: Vec) -> Self { + self.user_accounts = users; + self + } + /// Set a callback to invoke on new slot updates + /// + /// * `on_slot` - the callback for new slot updates + /// + /// ! `on_slot` must not block the gRPC task + pub fn on_slot(mut self, on_slot: impl Fn(Slot) + Send + Sync + 'static) -> Self { + self.on_slot = Some(Box::new(on_slot)); + self + } + /// Register a custom callback for account updates + /// + /// * `filter` - accounts matching filter will invoke the callback + /// * `on_account` - fn to invoke on matching account update + /// + /// ! `on_account` must not block the gRPC task + pub fn on_account( + mut self, + filter: AccountFilter, + on_account: impl Fn(&AccountUpdate) + Send + Sync + 'static, + ) -> Self { + self.on_account = Some((filter, Box::new(on_account))); + self + } + /// Set network level connection opts + pub fn connection_opts(mut self, opts: GrpcConnectionOpts) -> Self { + self.connection_opts = opts; + self + } +} diff --git a/crates/src/lib.rs b/crates/src/lib.rs index 06ad4d8..bd22dd0 100644 --- a/crates/src/lib.rs +++ b/crates/src/lib.rs @@ -1,8 +1,13 @@ //! Drift SDK -use std::{borrow::Cow, collections::BTreeSet, sync::Arc, time::Duration}; +use std::{ + borrow::Cow, + collections::BTreeSet, + sync::{Arc, RwLock}, + time::Duration, +}; -use anchor_lang::{AccountDeserialize, InstructionData}; +use anchor_lang::{AccountDeserialize, Discriminator, InstructionData}; pub use drift_pubsub_client::PubsubClient; use futures_util::TryFutureExt; use log::debug; @@ -11,6 +16,7 @@ use solana_rpc_client_api::response::Response; use solana_sdk::{ account::Account, clock::Slot, + commitment_config::CommitmentLevel, compute_budget::ComputeBudgetInstruction, hash::Hash, instruction::{AccountMeta, Instruction}, @@ -27,6 +33,7 @@ use crate::{ ProgramData, DEFAULT_PUBKEY, SYSVAR_INSTRUCTIONS_PUBKEY, }, drift_idl::traits::ToAccountMetas, + grpc::grpc_subscriber::{AccountFilter, DriftGrpcClient, GeyserSubscribeOpts}, marketmap::MarketMap, oraclemap::{Oracle, OracleMap}, swift_order_subscriber::{SignedOrderInfo, SwiftOrderStream}, @@ -36,7 +43,7 @@ use crate::{ }, utils::{get_http_url, get_ws_url}, }; -pub use crate::{types::Context, wallet::Wallet}; +pub use crate::{grpc::GrpcSubscribeOpts, types::Context, wallet::Wallet}; // utils pub mod async_utils; @@ -52,6 +59,7 @@ pub mod drift_idl; pub mod types; // internal infra +pub mod grpc; pub mod polled_account_subscriber; pub mod websocket_account_subscriber; pub mod websocket_program_account_subscriber; @@ -65,14 +73,12 @@ pub mod swift_order_subscriber; pub mod jit_client; +pub mod account_map; pub mod marketmap; pub mod oraclemap; pub mod slot_subscriber; pub mod usermap; -// wrappers -pub mod account_map; - #[cfg(feature = "dlob")] pub mod dlob; @@ -698,20 +704,49 @@ impl DriftClient { } /// Return a reference to the internal spot market map + #[cfg(feature = "unsafe_pub")] pub fn spot_market_map(&self) -> Arc>> { self.backend.spot_market_map.map() } /// Return a reference to the internal perp market map + #[cfg(feature = "unsafe_pub")] pub fn perp_market_map(&self) -> Arc>> { self.backend.perp_market_map.map() } /// Return a reference to the internal oracle map + #[cfg(feature = "unsafe_pub")] pub fn oracle_map(&self) -> Arc> { self.backend.oracle_map.map() } + /// Subscribe to all: markets, oracles, users, and slot updates over gRPC + /// + /// Updates are transparently handled by the `DriftClient` and calls to get User accounts, markets, oracles, etc. + /// will utilize the latest cached updates from the gRPC subscription. + /// + /// use `opts` to control what is _cached_ by the client. The gRPC connection will always subscribe + /// to all drift accounts regardless. + /// + /// * `endpoint` - the gRPC endpoint + /// * `x_token` - gRPC authentication X token + /// * `opts` - configure callbacks and caching + /// + pub async fn grpc_subscribe( + &self, + endpoint: String, + x_token: String, + opts: GrpcSubscribeOpts, + ) -> SdkResult<()> { + self.backend.grpc_subscribe(endpoint, x_token, opts).await + } + + /// Unsubscribe the gRPC connection + pub fn grpc_unsubscribe(&self) { + self.backend.grpc_unsubscribe(); + } + /// Return a reference to the internal backend #[cfg(feature = "unsafe_pub")] pub fn backend(&self) -> &'static DriftClientBackend { @@ -730,6 +765,7 @@ pub struct DriftClientBackend { perp_market_map: MarketMap, spot_market_map: MarketMap, oracle_map: OracleMap, + grpc_unsub: RwLock>, } impl DriftClientBackend { /// Initialize a new `DriftClientBackend` @@ -799,9 +835,16 @@ impl DriftClientBackend { perp_market_map, spot_market_map, oracle_map, + grpc_unsub: RwLock::default(), }) } + /// Returns true if `DriftClientBackend` is subscribed via gRPC + pub fn is_grpc_subscribed(&self) -> bool { + let unsub = self.grpc_unsub.read().unwrap(); + unsub.is_some() + } + /// Start subscription for latest block hashes async fn subscribe_blockhashes(&self) -> SdkResult<()> { self.blockhash_subscriber.subscribe(); @@ -810,6 +853,11 @@ impl DriftClientBackend { /// Start subscriptions for market accounts async fn subscribe_markets(&self, markets: &[MarketId]) -> SdkResult<()> { + if self.is_grpc_subscribed() { + log::info!("already subscribed markets via gRPC"); + return Err(SdkError::AlreadySubscribed); + } + let (perps, spot) = markets .iter() .partition::, _>(|x| x.is_perp()); @@ -823,9 +871,93 @@ impl DriftClientBackend { /// Start subscriptions for market oracle accounts async fn subscribe_oracles(&self, markets: &[MarketId]) -> SdkResult<()> { + if self.is_grpc_subscribed() { + log::info!("already subscribed oracles via gRPC"); + return Err(SdkError::AlreadySubscribed); + } + self.oracle_map.subscribe(markets).await } + /// Subscribe to all: markets, oracles, and slot updates over gRPC + async fn grpc_subscribe( + &self, + endpoint: String, + x_token: String, + opts: GrpcSubscribeOpts, + ) -> SdkResult<()> { + let mut grpc = + DriftGrpcClient::new(endpoint, x_token).grpc_connection_opts(opts.connection_opts); + + grpc.on_account( + AccountFilter::partial().with_discriminator(SpotMarket::DISCRIMINATOR), + self.spot_market_map.on_account_fn(), + ); + grpc.on_account( + AccountFilter::partial().with_discriminator(PerpMarket::DISCRIMINATOR), + self.perp_market_map.on_account_fn(), + ); + let oracles: Vec = self + .oracle_map + .oracle_by_market + .iter() + .map(|x| x.1 .0) + .collect(); + grpc.on_account( + AccountFilter::partial().with_accounts(oracles.into_iter()), + self.oracle_map.on_account_fn(), + ); + + if opts.usermap { + grpc.on_account( + AccountFilter::partial().with_discriminator(User::DISCRIMINATOR), + self.account_map.on_account_fn(), + ); + } else { + // when usermap is on, the custom accounts are already included + // usermap off: subscribe to custom `User` accounts + grpc.on_account( + AccountFilter::full() + .with_discriminator(User::DISCRIMINATOR) + .with_accounts(opts.user_accounts.into_iter()), + self.account_map.on_account_fn(), + ); + } + + if opts.user_stats_map { + grpc.on_account( + AccountFilter::partial().with_discriminator(UserStats::DISCRIMINATOR), + self.account_map.on_account_fn(), + ); + } + + // set custom callbacks + if let Some((filter, on_account)) = opts.on_account { + grpc.on_account(filter, on_account); + } + if let Some(f) = opts.on_slot { + grpc.on_slot(f); + } + + // start subscription + let grpc_unsub = grpc + .subscribe(CommitmentLevel::Confirmed, GeyserSubscribeOpts::default()) + .await + .map_err(SdkError::Grpc)?; + + let mut unsub = self.grpc_unsub.write().unwrap(); + let _ = unsub.insert(grpc_unsub); + + Ok(()) + } + + /// Unsubscribe the gRPC connection + fn grpc_unsubscribe(&self) { + let mut guard = self.grpc_unsub.write().unwrap(); + let unsub = guard.take(); + unsub.map(|u| u.send(())); + } + /// End subscriptions to live program data async fn unsubscribe(&self) -> SdkResult<()> { self.blockhash_subscriber.unsubscribe(); @@ -839,7 +971,7 @@ impl DriftClientBackend { &self, market_index: u16, ) -> Option> { - if self.perp_market_map.is_subscribed(market_index) { + if self.is_grpc_subscribed() || self.perp_market_map.is_subscribed(market_index) { self.perp_market_map.get(&market_index) } else { None @@ -850,7 +982,7 @@ impl DriftClientBackend { &self, market_index: u16, ) -> Option> { - if self.spot_market_map.is_subscribed(market_index) { + if self.is_grpc_subscribed() || self.spot_market_map.is_subscribed(market_index) { self.spot_market_map.get(&market_index) } else { None @@ -938,6 +1070,9 @@ impl DriftClientBackend { Ok(value) } else { let account_data = self.rpc_client.get_account_data(account).await?; + if account_data.is_empty() { + return Err(SdkError::NoAccountData(*account)); + } T::try_deserialize(&mut account_data.as_slice()) .map_err(|err| SdkError::Anchor(Box::new(err))) } @@ -1997,6 +2132,7 @@ mod tests { Arc::clone(&rpc_client), CommitmentConfig::processed(), ), + grpc_unsub: Default::default(), }; DriftClient { diff --git a/crates/src/marketmap.rs b/crates/src/marketmap.rs index 1125cfa..ef93e81 100644 --- a/crates/src/marketmap.rs +++ b/crates/src/marketmap.rs @@ -24,6 +24,7 @@ use crate::{ accounts::State, constants::{self, derive_perp_market_account, derive_spot_market_account, state_account}, drift_idl::types::OracleSource, + grpc::AccountUpdate, memcmp::get_market_filter, types::MapOf, websocket_account_subscriber::WebsocketAccountSubscriber, @@ -103,6 +104,22 @@ where Arc::clone(&self.marketmap) } + /// Returns a hook for driving the map with new `Account` updates + pub(crate) fn on_account_fn(&self) -> impl Fn(&AccountUpdate) { + let marketmap = self.map(); + move |update: &AccountUpdate| { + let market = T::deserialize(&mut &update.data[8..]).expect("deser market"); + let idx = market.market_index(); + marketmap.insert( + idx, + DataAndSlot { + slot: update.slot, + data: market, + }, + ); + } + } + /// Subscribe to market account updates pub async fn subscribe(&self, markets: &[MarketId]) -> SdkResult<()> { log::debug!(target: LOG_TARGET, "subscribing: {:?}", T::MARKET_TYPE); diff --git a/crates/src/math/liquidation.rs b/crates/src/math/liquidation.rs index 588cabf..51a423b 100644 --- a/crates/src/math/liquidation.rs +++ b/crates/src/math/liquidation.rs @@ -4,6 +4,7 @@ use std::ops::Neg; +use super::get_oracle_normalization_factor; use crate::{ ffi::{ self, calculate_margin_requirement_and_total_collateral_and_liability_info, AccountsList, @@ -23,8 +24,6 @@ use crate::{ DriftClient, MarginMode, MarketId, SdkError, SdkResult, SpotPosition, }; -use super::get_oracle_normalization_factor; - /// Info on a position's liquidation price and unrealized PnL #[derive(Debug)] pub struct LiquidationAndPnlInfo { @@ -375,7 +374,10 @@ mod tests { use super::*; use crate::{ - constants::{self, ids::pyth_program}, + constants::{ + ids::pyth_program, + {self}, + }, drift_idl::types::{HistoricalOracleData, MarketStatus, OracleSource, SpotPosition, AMM}, math::constants::{ AMM_RESERVE_PRECISION, BASE_PRECISION_I64, LIQUIDATION_FEE_PRECISION, PEG_PRECISION, diff --git a/crates/src/oraclemap.rs b/crates/src/oraclemap.rs index 0d7b258..91acacf 100644 --- a/crates/src/oraclemap.rs +++ b/crates/src/oraclemap.rs @@ -16,8 +16,9 @@ use solana_sdk::{ use crate::{ drift_idl::types::OracleSource, ffi::{get_oracle_price, OraclePriceData}, + grpc::AccountUpdate, types::MapOf, - websocket_account_subscriber::{AccountUpdate, WebsocketAccountSubscriber}, + websocket_account_subscriber::{AccountUpdate as WsAccountUpdate, WebsocketAccountSubscriber}, MarketId, SdkError, SdkResult, UnsubHandle, }; @@ -183,11 +184,10 @@ impl OracleMap { let futs_iter = pending_subscriptions.into_iter().map(|sub_fut| { let oraclemap = Arc::clone(&self.oraclemap); - let oracle_shared_mode = self + let oracle_shared_mode = *(self .shared_oracles .get(&sub_fut.pubkey) - .expect("oracle exists") - .clone(); + .expect("oracle exists")); async move { let unsub = @@ -354,13 +354,81 @@ impl OracleMap { pub fn map(&self) -> Arc> { Arc::clone(&self.oraclemap) } + + /// Returns a hook for driving the map with new `Account` updates + pub(crate) fn on_account_fn(&self) -> impl Fn(&AccountUpdate) { + let marketmap = self.map(); + let oracle_lookup = self.shared_oracles.clone(); + + move |update: &AccountUpdate| match oracle_lookup.get(&update.pubkey).unwrap() { + OracleShareMode::Dual { + spot: _, + perp: _, + source, + } => { + update_handler_grpc(update, *source, &marketmap); + } + OracleShareMode::Isolated { market: _, source } => { + update_handler_grpc(update, *source, &marketmap); + } + OracleShareMode::DualMixed { spot, perp } => { + update_handler_grpc(update, spot.1, &marketmap); + update_handler_grpc(update, perp.1, &marketmap); + } + } + } } /// Handler fn for new oracle account data -fn update_handler( +#[inline] +fn update_handler_grpc( update: &AccountUpdate, oracle_source: OracleSource, oracle_map: &DashMap<(Pubkey, u8), Oracle, ahash::RandomState>, +) { + let lamports = update.lamports; + let slot = update.slot; + match get_oracle_price( + oracle_source, + &mut ( + update.pubkey, + Account { + owner: update.owner, + data: update.data.to_vec(), + lamports, + ..Default::default() + }, + ), + slot, + ) { + Ok(price_data) => { + oracle_map + .entry((update.pubkey, oracle_source as u8)) + .and_modify(|o| { + o.data = price_data; + o.slot = slot; + o.raw.resize(update.data.len(), 0); + o.raw.clone_from_slice(update.data); + }) + .or_insert(Oracle { + pubkey: update.pubkey, + source: oracle_source, + data: price_data, + slot, + raw: update.data.to_vec(), + }); + } + Err(err) => { + log::error!("Failed to get oracle price: {err:?}, {:?}", update.pubkey) + } + } +} + +/// Handler fn for new oracle account data +fn update_handler( + update: &WsAccountUpdate, + oracle_source: OracleSource, + oracle_map: &DashMap<(Pubkey, u8), Oracle, ahash::RandomState>, ) { let oracle_pubkey = update.pubkey; let lamports = update.lamports; diff --git a/crates/src/swift_order_subscriber.rs b/crates/src/swift_order_subscriber.rs index 199dd9c..c0a958f 100644 --- a/crates/src/swift_order_subscriber.rs +++ b/crates/src/swift_order_subscriber.rs @@ -16,8 +16,10 @@ use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature}; use tokio_stream::wrappers::ReceiverStream; use tokio_tungstenite::{connect_async, tungstenite::Message}; -pub use crate::types::SignedMsgOrderParamsDelegateMessage as SignedDelegateOrder; -pub use crate::types::SignedMsgOrderParamsMessage as SignedOrder; +pub use crate::types::{ + SignedMsgOrderParamsDelegateMessage as SignedDelegateOrder, + SignedMsgOrderParamsMessage as SignedOrder, +}; use crate::{ constants::MarketExt, types::{Context, MarketId, OrderParams, SdkError, SdkResult}, @@ -43,8 +45,6 @@ pub const SWIFT_MAINNET_WS_URL: &str = "wss://swift.drift.trade"; const LOG_TARGET: &str = "swift"; -/// Wrapper for a signed order message (aka swift order) - /// Common fields of signed message types pub struct SignedMessageInfo { pub taker_pubkey: Pubkey, diff --git a/crates/src/types.rs b/crates/src/types.rs index 2da4c6d..46c0e3e 100644 --- a/crates/src/types.rs +++ b/crates/src/types.rs @@ -30,6 +30,7 @@ pub use crate::drift_idl::{ use crate::{ constants::{ids, LUTS_DEVNET, LUTS_MAINNET}, drift_idl::errors::ErrorCode, + grpc::grpc_subscriber::GrpcError, Wallet, }; @@ -333,6 +334,8 @@ pub enum SdkError { LibDriftVersion, #[error("wallet signing disabled")] WalletSigningDisabled, + #[error("{0}")] + Grpc(#[from] GrpcError), } impl SdkError { diff --git a/tests/integration.rs b/tests/integration.rs index b5a3659..ef4588f 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -1,14 +1,17 @@ use std::time::Duration; +use anchor_lang::Discriminator; use drift_rs::{ + constants::DEFAULT_PUBKEY, event_subscriber::RpcClient, + grpc::grpc_subscriber::AccountFilter, math::constants::{BASE_PRECISION_I64, LAMPORTS_PER_SOL_I64, PRICE_PRECISION_U64}, types::{accounts::User, Context, MarketId, NewOrder, PostOnlyParam}, utils::test_envs::{devnet_endpoint, mainnet_endpoint, test_keypair}, - DriftClient, TransactionBuilder, Wallet, + DriftClient, GrpcSubscribeOpts, Pubkey, TransactionBuilder, Wallet, }; use futures_util::StreamExt; -use solana_sdk::signature::Keypair; +use solana_sdk::{clock::Slot, signature::Keypair}; #[tokio::test] async fn client_sync_subscribe_all_devnet() { @@ -100,6 +103,64 @@ async fn client_sync_subscribe_mainnet() { dbg!(price); } +#[tokio::test(flavor = "multi_thread", worker_threads = 3)] +async fn client_sync_subscribe_mainnet_grpc() { + let _ = env_logger::try_init(); + let client = DriftClient::new( + Context::MainNet, + RpcClient::new(mainnet_endpoint()), + Keypair::new().into(), + ) + .await + .expect("connects"); + + let (slot_update_tx, mut slot_update_rx) = tokio::sync::mpsc::channel::(1); + let (user_update_tx, mut user_update_rx) = tokio::sync::mpsc::channel::(1); + + assert!(client + .grpc_subscribe( + "https://api.rpcpool.com".into(), + std::env::var("TEST_GRPC_X_TOKEN").expect("TEST_GRPC_X_TOKEN set"), + GrpcSubscribeOpts::default() + .usermap_on() + .on_slot(move |new_slot| { + println!("slot: {new_slot}"); + let _ = slot_update_tx.try_send(new_slot); + }) + .on_account( + AccountFilter::partial().with_discriminator(User::DISCRIMINATOR), + move |account| { + println!("account: {}", account.pubkey); + let _ = user_update_tx.try_send(account.pubkey); + } + ) + ) + .await + .is_ok()); + + // wait for updates + tokio::time::sleep(Duration::from_secs(6)).await; + + // oracles available + let price = client.oracle_price(MarketId::perp(4)).await.expect("ok"); + assert!(price > 0); + dbg!(price); + let price = client.oracle_price(MarketId::spot(32)).await.expect("ok"); + assert!(price > 0); + dbg!(price); + + // markets available + assert!(client.try_get_perp_market_account(0).is_ok()); + assert!(client.try_get_spot_market_account(1).is_ok()); + + // slot update received + assert!(slot_update_rx.try_recv().is_ok_and(|s| s > 0)); + + // user update received + assert!(user_update_rx.try_recv().is_ok_and(|u| u != DEFAULT_PUBKEY)); + client.grpc_unsubscribe(); +} + #[tokio::test] async fn place_and_cancel_orders() { let _ = env_logger::try_init();