From 9963f77c260df6d3c2c7e8cf1dba42c0cd656e40 Mon Sep 17 00:00:00 2001 From: jordy25519 Date: Tue, 15 Apr 2025 21:49:57 +0800 Subject: [PATCH 01/11] stubbing grpc client --- Cargo.lock | 613 ++++++++++++++++++++++++++++++++++++++++++++-- Cargo.toml | 2 + crates/src/lib.rs | 5 +- 3 files changed, 591 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 10a1c18..d628c01 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -215,7 +215,7 @@ dependencies = [ "anchor-syn", "anyhow", "bs58", - "heck", + "heck 0.3.3", "proc-macro2", "quote", "serde_json", @@ -288,7 +288,7 @@ checksum = "32e8599d21995f68e296265aa5ab0c3cef582fd58afec014d01bd0bce18a4418" dependencies = [ "anchor-lang-idl-spec", "anyhow", - "heck", + "heck 0.3.3", "serde", "serde_json", "sha2 0.10.8", @@ -312,7 +312,7 @@ checksum = "564685b759db12a2424d1b2688cfdf0fec26a023813bc461274754fb0e5d97b0" dependencies = [ "anyhow", "bs58", - "heck", + "heck 0.3.3", "proc-macro2", "quote", "serde", @@ -554,6 +554,28 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "async-trait" version = "0.1.87" @@ -565,6 +587,12 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "atty" version = "0.2.14" @@ -582,6 +610,62 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" +[[package]] +name = "autotools" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef941527c41b0fc0dd48511a8154cd5fc7e29200a0ff8b7203c5d777dbc795cf" +dependencies = [ + "cc", +] + +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core", + "bytes", + "futures-util", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper 1.0.2", + "tower 0.5.2", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper 1.0.2", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -950,6 +1034,16 @@ dependencies = [ "libc", ] +[[package]] +name = "core-foundation" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b55271e5c8c478ad3f38ad24ef34923091e0548492a266d19b3c0b4d82574c63" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -1260,6 +1354,8 @@ dependencies = [ "tokio-stream", "tokio-tungstenite", "toml 0.8.20", + "yellowstone-grpc-client", + "yellowstone-grpc-proto", ] [[package]] @@ -1429,6 +1525,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94474d15a76982be62ca8a39570dccce148d98c238ebb7408b0a21b2c4bdddc4" +[[package]] +name = "fixedbitset" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" + [[package]] name = "flate2" version = "1.1.0" @@ -1639,7 +1741,26 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap", + "indexmap 2.8.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "h2" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75249d144030531f8dee69fe9cea04d3edf809a017ae445e2abdff6629e86633" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http 1.2.0", + "indexmap 2.8.0", "slab", "tokio", "tokio-util", @@ -1655,6 +1776,12 @@ dependencies = [ "byteorder", ] +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + [[package]] name = "hashbrown" version = "0.13.2" @@ -1685,6 +1812,12 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -1769,6 +1902,29 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http 1.2.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" +dependencies = [ + "bytes", + "futures-core", + "http 1.2.0", + "http-body 1.0.1", + "pin-project-lite", +] + [[package]] name = "httparse" version = "1.10.1" @@ -1797,9 +1953,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.3.26", "http 0.2.12", - "http-body", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -1811,6 +1967,27 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc2b571658e38e0c01b1fdca3bbbe93c00d3d71693ff2770043f8c29bc7d6f80" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.9", + "http 1.2.0", + "http-body 1.0.1", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", + "want", +] + [[package]] name = "hyper-rustls" version = "0.24.2" @@ -1819,10 +1996,43 @@ checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", "http 0.2.12", - "hyper", - "rustls", + "hyper 0.14.32", + "rustls 0.21.12", + "tokio", + "tokio-rustls 0.24.1", +] + +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper 1.6.0", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + +[[package]] +name = "hyper-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "497bbc33a26fdd4af9ed9c70d63f61cf56a938375fbb32df34db9b1cd6d643f2" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.2.0", + "http-body 1.0.1", + "hyper 1.6.0", + "libc", + "pin-project-lite", + "socket2", "tokio", - "tokio-rustls", + "tower-service", + "tracing", ] [[package]] @@ -1993,6 +2203,16 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.8.0" @@ -2136,9 +2356,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.170" +version = "0.2.172" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "875b3680cb2f8f71bdcf9a30f38d48282f5d3c95cbf9b3fa57269bb5d5c06828" +checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" [[package]] name = "libloading" @@ -2226,6 +2446,12 @@ version = "0.4.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30bde2b3dc3671ae49d8e2e9f044c7c005836e7a023ee57cffa25ab82764bb9e" +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "memchr" version = "2.7.4" @@ -2298,6 +2524,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "multimap" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" + [[package]] name = "native-tls" version = "0.2.14" @@ -2310,7 +2542,7 @@ dependencies = [ "openssl-probe", "openssl-sys", "schannel", - "security-framework", + "security-framework 2.11.1", "security-framework-sys", "tempfile", ] @@ -2557,6 +2789,36 @@ dependencies = [ "num", ] +[[package]] +name = "petgraph" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" +dependencies = [ + "fixedbitset", + "indexmap 2.8.0", +] + +[[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -2611,6 +2873,16 @@ dependencies = [ "zerocopy 0.8.23", ] +[[package]] +name = "prettyplease" +version = "0.2.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "664ec5419c51e34154eec046ebcba56312d5a2fc3b09a06da188e1ad21afadf6" +dependencies = [ + "proc-macro2", + "syn 2.0.100", +] + [[package]] name = "proc-macro-crate" version = "0.1.5" @@ -2638,6 +2910,67 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" +dependencies = [ + "heck 0.5.0", + "itertools 0.12.1", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn 2.0.100", + "tempfile", +] + +[[package]] +name = "prost-derive" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools 0.12.1", + "proc-macro2", + "quote", + "syn 2.0.100", +] + +[[package]] +name = "prost-types" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" +dependencies = [ + "prost", +] + +[[package]] +name = "protobuf-src" +version = "1.1.0+21.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7ac8852baeb3cc6fb83b93646fb93c0ffe5d14bf138c945ceb4b9948ee0e3c1" +dependencies = [ + "autotools", +] + [[package]] name = "qstring" version = "0.7.2" @@ -2836,10 +3169,10 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", + "h2 0.3.26", "http 0.2.12", - "http-body", - "hyper", + "http-body 0.4.6", + "hyper 0.14.32", "hyper-rustls", "ipnet", "js-sys", @@ -2849,15 +3182,15 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls", - "rustls-pemfile", + "rustls 0.21.12", + "rustls-pemfile 1.0.4", "serde", "serde_json", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 0.1.2", "system-configuration", "tokio", - "tokio-rustls", + "tokio-rustls 0.24.1", "tokio-util", "tower-service", "url", @@ -2933,10 +3266,37 @@ checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" dependencies = [ "log", "ring", - "rustls-webpki", + "rustls-webpki 0.101.7", "sct", ] +[[package]] +name = "rustls" +version = "0.23.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df51b5869f3a441595eac5e8ff14d486ff285f7b8c0df8770e49c3b56351f0f0" +dependencies = [ + "log", + "once_cell", + "ring", + "rustls-pki-types", + "rustls-webpki 0.103.1", + "subtle", + "zeroize", +] + +[[package]] +name = "rustls-native-certs" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fcff2dd52b58a8d98a70243663a0d234c4e2b79235637849d15913394a247d3" +dependencies = [ + "openssl-probe", + "rustls-pki-types", + "schannel", + "security-framework 3.2.0", +] + [[package]] name = "rustls-pemfile" version = "1.0.4" @@ -2946,6 +3306,21 @@ dependencies = [ "base64 0.21.7", ] +[[package]] +name = "rustls-pemfile" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" +dependencies = [ + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c" + [[package]] name = "rustls-webpki" version = "0.101.7" @@ -2956,6 +3331,17 @@ dependencies = [ "untrusted", ] +[[package]] +name = "rustls-webpki" +version = "0.103.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fef8b8769aaccf73098557a87cd1816b4f9c7c16811c9c77142aa695c16f2c03" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", +] + [[package]] name = "rustversion" version = "1.0.20" @@ -3000,7 +3386,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ "bitflags 2.9.0", - "core-foundation", + "core-foundation 0.9.4", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316" +dependencies = [ + "bitflags 2.9.0", + "core-foundation 0.10.0", "core-foundation-sys", "libc", "security-framework-sys", @@ -3215,9 +3614,9 @@ checksum = "7fcf8323ef1faaee30a44a340193b1ac6814fd9b7b4e88e9d4519a3e4abe1cfd" [[package]] name = "socket2" -version = "0.5.8" +version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8" +checksum = "4f5fd57c80058a56cf5c777ab8a126398ece8e442983605d280a44ce79d0edef" dependencies = [ "libc", "windows-sys 0.52.0", @@ -5440,6 +5839,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "sync_wrapper" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" + [[package]] name = "synstructure" version = "0.13.1" @@ -5458,7 +5863,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" dependencies = [ "bitflags 1.3.2", - "core-foundation", + "core-foundation 0.9.4", "system-configuration-sys", ] @@ -5614,7 +6019,17 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls", + "rustls 0.21.12", + "tokio", +] + +[[package]] +name = "tokio-rustls" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b" +dependencies = [ + "rustls 0.23.26", "tokio", ] @@ -5692,13 +6107,115 @@ version = "0.22.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "17b4795ff5edd201c7cd6dca065ae59972ce77d1b80fa0a84d94950ece7d1474" dependencies = [ - "indexmap", + "indexmap 2.8.0", "serde", "serde_spanned", "toml_datetime", "winnow", ] +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.22.1", + "bytes", + "flate2", + "h2 0.4.9", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.6.0", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost", + "rustls-native-certs", + "rustls-pemfile 2.2.0", + "socket2", + "tokio", + "tokio-rustls 0.26.2", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", + "zstd", +] + +[[package]] +name = "tonic-build" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9557ce109ea773b399c9b9e5dca39294110b74f1f342cb347a80d1fce8c26a11" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "prost-types", + "quote", + "syn 2.0.100", +] + +[[package]] +name = "tonic-health" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1eaf34ddb812120f5c601162d5429933c9b527d901ab0e7f930d3147e33a09b2" +dependencies = [ + "async-stream", + "prost", + "tokio", + "tokio-stream", + "tonic", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite", + "rand 0.8.5", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper 1.0.2", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + [[package]] name = "tower-service" version = "0.3.3" @@ -5712,9 +6229,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "tracing-core" version = "0.1.33" @@ -6248,6 +6777,38 @@ version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" +[[package]] +name = "yellowstone-grpc-client" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7504e16bd8edc6af4ba11be0b42fba24ffb42b383b1373c5d26dc644a327070a" +dependencies = [ + "bytes", + "futures", + "thiserror 1.0.69", + "tonic", + "tonic-health", + "yellowstone-grpc-proto", +] + +[[package]] +name = "yellowstone-grpc-proto" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a19211525073ab3811f57f30056633011f836fd61781a1f3b24ed4404a4c214" +dependencies = [ + "anyhow", + "bincode", + "prost", + "prost-types", + "protobuf-src", + "solana-account-decoder", + "solana-sdk", + "solana-transaction-status", + "tonic", + "tonic-build", +] + [[package]] name = "yoke" version = "0.7.5" diff --git a/Cargo.toml b/Cargo.toml index 4e8f930..12dfcc6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,8 @@ thiserror = "1" tokio = { version = "1.42", features = ["full"] } tokio-stream = "0.1.17" tokio-tungstenite = { version = "0.26", features = ["native-tls"] } +yellowstone-grpc-client = "6.0.0" +yellowstone-grpc-proto = "6.0.0" drift-pubsub-client = { version = "0.1.1", path = "crates/pubsub-client" } diff --git a/crates/src/lib.rs b/crates/src/lib.rs index 06ad4d8..0178593 100644 --- a/crates/src/lib.rs +++ b/crates/src/lib.rs @@ -52,6 +52,7 @@ pub mod drift_idl; pub mod types; // internal infra +pub mod grpc; pub mod polled_account_subscriber; pub mod websocket_account_subscriber; pub mod websocket_program_account_subscriber; @@ -65,14 +66,12 @@ pub mod swift_order_subscriber; pub mod jit_client; +pub mod account_map; pub mod marketmap; pub mod oraclemap; pub mod slot_subscriber; pub mod usermap; -// wrappers -pub mod account_map; - #[cfg(feature = "dlob")] pub mod dlob; From 497b3878b891ba78e437b234ae09cffc314ed5b4 Mon Sep 17 00:00:00 2001 From: jordy25519 Date: Wed, 16 Apr 2025 14:37:04 +0800 Subject: [PATCH 02/11] tests --- crates/src/ffi.rs | 2 +- crates/src/grpc/grpc_subscriber.rs | 522 +++++++++++++++++++++++++++ crates/src/grpc/mod.rs | 1 + crates/src/lib.rs | 40 +- crates/src/marketmap.rs | 18 + crates/src/oraclemap.rs | 77 +++- crates/src/swift_order_subscriber.rs | 2 - crates/src/types.rs | 3 + tests/integration.rs | 31 ++ 9 files changed, 687 insertions(+), 9 deletions(-) create mode 100644 crates/src/grpc/grpc_subscriber.rs create mode 100644 crates/src/grpc/mod.rs diff --git a/crates/src/ffi.rs b/crates/src/ffi.rs index b1cd584..f6ca8dd 100644 --- a/crates/src/ffi.rs +++ b/crates/src/ffi.rs @@ -386,8 +386,8 @@ pub mod abi_types { pub latest_slot: Slot, } + #[cfg(test)] impl<'a> AccountsList<'a> { - #[cfg(test)] pub fn new( perp_markets: &'a mut [AccountWithKey], spot_markets: &'a mut [AccountWithKey], diff --git a/crates/src/grpc/grpc_subscriber.rs b/crates/src/grpc/grpc_subscriber.rs new file mode 100644 index 0000000..a32e758 --- /dev/null +++ b/crates/src/grpc/grpc_subscriber.rs @@ -0,0 +1,522 @@ +use std::{collections::HashMap, time::Duration}; + +use crate::constants::PROGRAM_ID as DRIFT_PROGRAM_ID; + +use ahash::HashSet; +use futures_util::{sink::SinkExt, stream::StreamExt}; +use log::{error, info, warn}; +use solana_rpc_client_api::filter::Memcmp; +use solana_sdk::{ + clock::{Epoch, Slot}, + commitment_config::CommitmentLevel, + pubkey::Pubkey, +}; +use yellowstone_grpc_client::{ + ClientTlsConfig, GeyserGrpcBuilderError, GeyserGrpcClient, GeyserGrpcClientError, Interceptor, +}; +use yellowstone_grpc_proto::{ + geyser::{CommitmentLevel as GeyserCommitmentLevel, SubscribeUpdateAccountInfo}, + prelude::{ + subscribe_request_filter_accounts_filter::Filter as AccountsFilterOneof, + subscribe_request_filter_accounts_filter_memcmp::Data as AccountsFilterMemcmpOneof, + subscribe_update::UpdateOneof, SubscribeRequest, SubscribeRequestFilterAccounts, + SubscribeRequestFilterAccountsFilter, SubscribeRequestFilterAccountsFilterMemcmp, + SubscribeRequestFilterSlots, SubscribeRequestPing, + }, + tonic::transport::Certificate, +}; + +type SlotsFilterMap = HashMap; +type AccountFilterMap = HashMap; +type HookFn = dyn Fn(&AccountUpdate) + Send + Sync + 'static; +type Hooks = Vec<(AccountFilter, Box)>; + +/// Account update from gRPC +#[derive(PartialEq, Eq, Clone)] +pub struct AccountUpdate<'a> { + /// the account's pubkey + pub pubkey: Pubkey, + /// lamports in the account + pub lamports: u64, + /// data held in the account + pub data: &'a [u8], + /// the program that owns the account. If executable, the program that loads the account. + pub owner: Pubkey, + /// the account's data contains a loaded program (and is now read-only) + pub executable: bool, + /// the epoch at which the account will next owe rent + pub rent_epoch: Epoch, + /// Slot the update was retrieved + pub slot: Slot, +} + +/// Provides filter criteria for accounts over gRPC +/// +/// There are two filter modes: +/// +/// * `full` - requires all filters to trigger a match +/// * `partial` - any one filter will trigger a match +/// +/// ```example(no_run) +/// // match on discriminator AND memcmp +/// let full = AccountFilter::full() +/// .with_discriminator(User::DISCRIMINATOR) +/// .with_memcmp(..); +/// +/// // match on discriminator OR accounts +/// let partial = AccountFilter::partial() +/// .with_discriminator(User::DISCRIMINATOR) +/// .with_accounts([acc1,acc2]) +/// ``` +#[derive(Clone, Default, Debug)] +pub struct AccountFilter { + /// optionally filter updates by discriminator + discriminator: Option<&'static [u8]>, + /// optionally filter updates by Solana Memcmp matches + memcmp: Option, + /// optionally filter updates by pubkey + accounts: Option>, + /// true = full match mode, false = partial + is_full: bool, +} + +impl AccountFilter { + /// Create a filter that matches ALL accounts! + pub fn firehose() -> Self { + AccountFilter::full() + } + /// Create a filter that matches iff all parameters are satisfied + pub fn full() -> Self { + AccountFilter { + is_full: true, + ..Default::default() + } + } + /// Create a filter that matches when any criteria is satisfied + pub fn partial() -> Self { + AccountFilter { + is_full: false, + ..Default::default() + } + } + /// add filter for given `pubkeys` + pub fn with_accounts(mut self, pubkeys: impl Iterator) -> Self { + self.accounts = Some(ahash::HashSet::from_iter(pubkeys)); + self + } + /// add filter for given anchor account `discriminator` + pub fn with_discriminator(mut self, discriminator: &'static [u8]) -> Self { + self.discriminator = Some(discriminator); + self + } + /// add filter for given memcmp filter + pub fn with_memcmp(mut self, memcmp: Memcmp) -> Self { + self.memcmp = Some(memcmp); + self + } + /// Returns true if pubkey/account matches the filter + pub fn matches(&self, pubkey: &Pubkey, account: &SubscribeUpdateAccountInfo) -> bool { + if !self.is_full { + // partial matches + self.discriminator.is_some_and(|x| x == &account.data[..8]) + || self.accounts.as_ref().is_some_and(|x| x.contains(pubkey)) + || self + .memcmp + .as_ref() + .is_some_and(|x| x.bytes_match(&account.data)) + } else { + // full matches + (match self.discriminator { + Some(x) => x == &account.data[..8], + None => true, + }) && (match self.accounts.as_ref() { + Some(x) => x.contains(pubkey), + None => true, + }) && (match self.memcmp.as_ref() { + Some(x) => x.bytes_match(&account.data), + None => true, + }) + } + } +} + +#[derive(Debug, Clone)] +pub struct GrpcOpts { + /// Apply a timeout to connecting to the uri. + connect_timeout_ms: Option, + /// Sets the tower service default internal buffer size, default is 1024 + buffer_size: Option, + /// Sets whether to use an adaptive flow control. Uses hyper’s default otherwise. + http2_adaptive_window: Option, + /// Set http2 KEEP_ALIVE_TIMEOUT. Uses hyper’s default otherwise. + http2_keep_alive_interval_ms: Option, + /// Sets the max connection-level flow control for HTTP2, default is 65,535 + initial_connection_window_size: Option, + ///Sets the SETTINGS_INITIAL_WINDOW_SIZE option for HTTP2 stream-level flow control, default is 65,535 + initial_stream_window_size: Option, + ///Set http2 KEEP_ALIVE_TIMEOUT. Uses hyper’s default otherwise. + keep_alive_timeout_ms: Option, + /// Set http2 KEEP_ALIVE_WHILE_IDLE. Uses hyper’s default otherwise. + keep_alive_while_idle: Option, + /// Set whether TCP keepalive messages are enabled on accepted connections. + tcp_keepalive_ms: Option, + /// Set the value of TCP_NODELAY option for accepted connections. Enabled by default. + tcp_nodelay: Option, + /// Apply a timeout to each request. + timeout_ms: Option, + /// Max message size before decoding, full blocks can be super large, default is 1GiB + max_decoding_message_size: usize, +} + +impl Default for GrpcOpts { + fn default() -> Self { + Self { + connect_timeout_ms: None, + buffer_size: None, + http2_adaptive_window: None, + http2_keep_alive_interval_ms: None, + initial_connection_window_size: None, + initial_stream_window_size: None, + keep_alive_timeout_ms: None, + keep_alive_while_idle: None, + tcp_keepalive_ms: None, + tcp_nodelay: None, + timeout_ms: None, + max_decoding_message_size: 1024 * 1024 * 1024, + } + } +} + +/// Options for gRPC subscription +#[derive(Debug, Default, Clone)] +pub struct SubscribeOpts { + /// Filter by presence of field txn_signature + accounts_nonempty_txn_signature: Option, + /// Filter by Account Pubkey + accounts_account: Vec, + /// Filter by Offset and Data, format: `offset,data in base58` + accounts_memcmp: Vec, + /// Filter by Data size + accounts_datasize: Option, + /// Re-send message from slot + from_slot: Option, + /// Send ping in subscribe request + ping: Option, +} + +#[derive(Debug, thiserror::Error)] +/// drift gRPC error +pub enum GrpcError { + #[error("grpc connect err: {0}")] + Geyser(GeyserGrpcBuilderError), + #[error("grpc request err: {0}")] + Client(GeyserGrpcClientError), +} + +/// specialized Drift gRPC client +pub struct DriftGrpcClient { + endpoint: String, + x_token: String, + grpc_opts: Option, + on_account_hooks: Hooks, + on_slot: Box, +} + +impl DriftGrpcClient { + /// Create a new `DriftGrpcClient` + /// + /// It can be started by calling `subscribe` + pub fn new(endpoint: String, x_token: String) -> Self { + Self { + endpoint, + x_token, + on_account_hooks: Default::default(), + grpc_opts: None, + on_slot: Box::new(move |_slot| {}), + } + } + + /// Set gRPC network options + pub fn grpc_opts(&mut self, grpc_opts: GrpcOpts) { + let _ = self.grpc_opts.insert(grpc_opts); + } + + /// Add a callback on slot updates + /// + /// `on_slot` must prioritize fast handling or risk blocking the gRPC thread + pub fn on_slot(&mut self, on_slot: F) { + self.on_slot = Box::new(on_slot); + } + + /// Add a callback on account updates matching `filter` + /// + /// This may be called many times to define multiple callbacks + /// + /// * `filter` - filter accounts by criteria + /// * `on_account` - fn to receive callback on filter match + /// + /// DEV: `on_account` must prioritize fast handling or risk blocking the gRPC thread + pub fn on_account( + &mut self, + filter: AccountFilter, + on_account: T, + ) { + self.on_account_hooks.push((filter, Box::new(on_account))); + } + + /// Start subscription for geyser updates + pub async fn subscribe( + self, + commitment: CommitmentLevel, + subscribe_opts: SubscribeOpts, + ) -> Result<(), GrpcError> { + let mut grpc_client = grpc_connect( + self.endpoint.as_str(), + self.x_token.as_str(), + self.grpc_opts.clone().unwrap_or_default(), + ) + .await + .map_err(|err| { + error!(target: "grpc", "connect failed: {err:?}"); + GrpcError::Geyser(err) + })?; + + grpc_client.ping(1).await.map_err(GrpcError::Client)?; + info!("gRPC connected 🔌"); + let request = subscribe_opts.to_subscribe_request(commitment); + info!(target: "grpc", "gRPC subscribing: {request:?}"); + + tokio::spawn({ + async move { + Self::geyser_subscribe(grpc_client, request, self.on_account_hooks, self.on_slot) + .await + } + }); + info!("gRPC subscribed ⚡️"); + + Ok(()) + } + + /// Run the gRPC subscription task + /// + /// It receives all configured updates and routes them to registered callbacks + async fn geyser_subscribe( + mut client: GeyserGrpcClient, + request: SubscribeRequest, + on_account: Hooks, + on_slot: impl Fn(Slot), + ) { + let max_retries = 3; + let mut retry_count = 0; + let mut latest_slot = 0; + loop { + if retry_count >= max_retries { + log::warn!(target: "grpc", "max retry attempts reached. disconnecting..."); + break; + } + let (mut subscribe_tx, mut stream) = + match client.subscribe_with_request(Some(request.clone())).await { + Ok(res) => { + retry_count = 0; + res + } + Err(err) => { + log::warn!(target: "grpc", "failed subscription: {err:?}"); + retry_count += 1; + continue; + } + }; + + while let Some(message) = stream.next().await { + match message { + Ok(msg) => { + match msg.update_oneof { + Some(UpdateOneof::Account(account_update)) => { + let account = match account_update.account { + Some(ref account) => account, + None => { + warn!(target: "grpc", "empty account update: {account_update:?}"); + continue; + } + }; + let pubkey = Pubkey::new_from_array( + account.pubkey.as_slice().try_into().unwrap(), + ); + log::trace!(target: "grpc", "account update: {pubkey}"); + let update = AccountUpdate { + owner: DRIFT_PROGRAM_ID, // assuming not subscribed to any other accounts.. + pubkey, + slot: latest_slot, + lamports: account.lamports, + executable: account.executable, + rent_epoch: account.rent_epoch, + data: &account.data, + }; + + for (filter, hook) in &on_account { + if filter.matches(&pubkey, account) { + hook(&update); + } + } + } + Some(UpdateOneof::Slot(msg)) => { + log::debug!(target: "grpc", "slot: {}", msg.slot); + if msg.slot > latest_slot { + latest_slot = msg.slot; + on_slot(latest_slot); + } + } + Some(UpdateOneof::Ping(_)) => { + // This is necessary to keep load balancers that expect client pings alive. If your load balancer doesn't + // require periodic client pings then this is unnecessary + log::debug!(target: "grpc", "ping"); + // TODO: set timeout + if let Err(err) = subscribe_tx + .send(SubscribeRequest { + ping: Some(SubscribeRequestPing { id: 1 }), + ..Default::default() + }) + .await + { + log::warn!(target: "grpc", "ping failed: {err:?}"); + } + } + Some(UpdateOneof::Pong(_)) => { + log::debug!(target: "grpc", "pong"); + } + Some(other_update) => { + warn!(target: "grpc", "unhandled update: {other_update:?}"); + } + None => { + error!(target: "grpc", "update not found in the message"); + break; + } + } + } + Err(error) => { + error!(target: "grpc", "stream error: {error:?}"); + break; + } + } + } + } + + warn!(target: "grpc", "gRPC stream closed"); + } +} + +impl SubscribeOpts { + fn to_subscribe_request(&self, commitment: CommitmentLevel) -> SubscribeRequest { + let mut accounts = AccountFilterMap::default(); + let mut filters = vec![]; + for filter in self.accounts_memcmp.iter() { + filters.push(SubscribeRequestFilterAccountsFilter { + filter: Some(AccountsFilterOneof::Memcmp( + SubscribeRequestFilterAccountsFilterMemcmp { + offset: filter.offset() as u64, + data: filter + .bytes() + .map(|b| AccountsFilterMemcmpOneof::Bytes(b.to_vec())), + }, + )), + }); + } + if let Some(datasize) = self.accounts_datasize { + filters.push(SubscribeRequestFilterAccountsFilter { + filter: Some(AccountsFilterOneof::Datasize(datasize)), + }); + } + + accounts.insert( + "client".to_owned(), + SubscribeRequestFilterAccounts { + nonempty_txn_signature: self.accounts_nonempty_txn_signature, + account: self.accounts_account.clone(), + owner: vec![DRIFT_PROGRAM_ID.to_string()], + filters, + }, + ); + + let mut slots = SlotsFilterMap::default(); + slots.insert( + "client".to_owned(), + SubscribeRequestFilterSlots { + filter_by_commitment: Some(true), + interslot_updates: Some(false), + }, + ); + + let ping = self.ping.map(|id| SubscribeRequestPing { id }); + + SubscribeRequest { + slots, + accounts, + commitment: Some(match commitment { + CommitmentLevel::Confirmed => GeyserCommitmentLevel::Confirmed, + CommitmentLevel::Processed => GeyserCommitmentLevel::Processed, + CommitmentLevel::Finalized => GeyserCommitmentLevel::Finalized, + } as i32), + ping, + from_slot: self.from_slot, + ..Default::default() + } + } +} + +/// Connect to gRPC endpoint +/// +/// Returns a new `GeyserGrpcClient` +async fn grpc_connect( + endpoint: &str, + x_token: &str, + opts: GrpcOpts, +) -> Result, GeyserGrpcBuilderError> { + info!(target: "grpc", "gRPC connecting: {endpoint}..."); + let mut tls_config = ClientTlsConfig::new().with_native_roots(); + if let Ok(path) = &std::env::var("GRPC_CA_CERT") { + let bytes = tokio::fs::read(path) + .await + .expect("GRPC_CA_CERT path exists"); + tls_config = tls_config.ca_certificate(Certificate::from_pem(bytes)); + } + let mut builder = GeyserGrpcClient::build_from_shared(endpoint.to_string())? + .x_token(Some(x_token))? + .tls_config(tls_config)? + .max_decoding_message_size(opts.max_decoding_message_size); + + if let Some(duration) = opts.connect_timeout_ms { + builder = builder.connect_timeout(Duration::from_millis(duration)); + } + if let Some(sz) = opts.buffer_size { + builder = builder.buffer_size(sz); + } + if let Some(enabled) = opts.http2_adaptive_window { + builder = builder.http2_adaptive_window(enabled); + } + if let Some(duration) = opts.http2_keep_alive_interval_ms { + builder = builder.http2_keep_alive_interval(Duration::from_millis(duration)); + } + if let Some(sz) = opts.initial_connection_window_size { + builder = builder.initial_connection_window_size(sz); + } + if let Some(sz) = opts.initial_stream_window_size { + builder = builder.initial_stream_window_size(sz); + } + if let Some(duration) = opts.keep_alive_timeout_ms { + builder = builder.keep_alive_timeout(Duration::from_millis(duration)); + } + if let Some(enabled) = opts.keep_alive_while_idle { + builder = builder.keep_alive_while_idle(enabled); + } + if let Some(duration) = opts.tcp_keepalive_ms { + builder = builder.tcp_keepalive(Some(Duration::from_millis(duration))); + } + if let Some(enabled) = opts.tcp_nodelay { + builder = builder.tcp_nodelay(enabled); + } + if let Some(duration) = opts.timeout_ms { + builder = builder.timeout(Duration::from_millis(duration)); + } + + builder.connect().await +} diff --git a/crates/src/grpc/mod.rs b/crates/src/grpc/mod.rs new file mode 100644 index 0000000..e662963 --- /dev/null +++ b/crates/src/grpc/mod.rs @@ -0,0 +1 @@ +pub mod grpc_subscriber; diff --git a/crates/src/lib.rs b/crates/src/lib.rs index 0178593..6255e1b 100644 --- a/crates/src/lib.rs +++ b/crates/src/lib.rs @@ -2,15 +2,17 @@ use std::{borrow::Cow, collections::BTreeSet, sync::Arc, time::Duration}; -use anchor_lang::{AccountDeserialize, InstructionData}; +use anchor_lang::{AccountDeserialize, Discriminator, InstructionData}; pub use drift_pubsub_client::PubsubClient; use futures_util::TryFutureExt; +use grpc::grpc_subscriber::AccountFilter; use log::debug; pub use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_rpc_client_api::response::Response; use solana_sdk::{ account::Account, clock::Slot, + commitment_config::CommitmentLevel, compute_budget::ComputeBudgetInstruction, hash::Hash, instruction::{AccountMeta, Instruction}, @@ -27,6 +29,7 @@ use crate::{ ProgramData, DEFAULT_PUBKEY, SYSVAR_INSTRUCTIONS_PUBKEY, }, drift_idl::traits::ToAccountMetas, + grpc::grpc_subscriber::DriftGrpcClient, marketmap::MarketMap, oraclemap::{Oracle, OracleMap}, swift_order_subscriber::{SignedOrderInfo, SwiftOrderStream}, @@ -711,6 +714,11 @@ impl DriftClient { self.backend.oracle_map.map() } + /// Subscribe to all: markets, oracles, and slot updates over gRPC + pub async fn grpc_subscribe(&self, endpoint: String, x_token: String) -> SdkResult<()> { + self.backend.grpc_subscribe(endpoint, x_token).await + } + /// Return a reference to the internal backend #[cfg(feature = "unsafe_pub")] pub fn backend(&self) -> &'static DriftClientBackend { @@ -825,6 +833,36 @@ impl DriftClientBackend { self.oracle_map.subscribe(markets).await } + /// Subscribe to all: markets, oracles, and slot updates over gRPC + async fn grpc_subscribe(&self, endpoint: String, x_token: String) -> SdkResult<()> { + let mut grpc = DriftGrpcClient::new(endpoint, x_token); + grpc.on_account( + AccountFilter::partial().with_discriminator(SpotMarket::DISCRIMINATOR), + self.spot_market_map.on_account_fn(), + ); + grpc.on_account( + AccountFilter::partial().with_discriminator(PerpMarket::DISCRIMINATOR), + self.perp_market_map.on_account_fn(), + ); + let oracles: Vec = self + .oracle_map + .oracle_by_market + .iter() + .map(|x| x.1 .0) + .collect(); + grpc.on_account( + AccountFilter::partial().with_accounts(oracles.into_iter()), + self.oracle_map.on_account_fn(), + ); + grpc.on_slot(|slot| { + log::info!("new slot: {slot}"); + }); + + grpc.subscribe(CommitmentLevel::Confirmed, Default::default()) + .await + .map_err(Into::into) + } + /// End subscriptions to live program data async fn unsubscribe(&self) -> SdkResult<()> { self.blockhash_subscriber.unsubscribe(); diff --git a/crates/src/marketmap.rs b/crates/src/marketmap.rs index 1125cfa..a14b087 100644 --- a/crates/src/marketmap.rs +++ b/crates/src/marketmap.rs @@ -24,6 +24,7 @@ use crate::{ accounts::State, constants::{self, derive_perp_market_account, derive_spot_market_account, state_account}, drift_idl::types::OracleSource, + grpc::grpc_subscriber::AccountUpdate, memcmp::get_market_filter, types::MapOf, websocket_account_subscriber::WebsocketAccountSubscriber, @@ -103,6 +104,23 @@ where Arc::clone(&self.marketmap) } + /// Returns a hook for driving the map with new `Account` updates + pub(crate) fn on_account_fn(&self) -> impl Fn(&AccountUpdate) { + let marketmap = self.map(); + move |update: &AccountUpdate| { + dbg!("grpc update market!!"); + let market = T::deserialize(&mut &update.data[8..]).expect("deser market"); + let idx = market.market_index(); + marketmap.insert( + idx, + DataAndSlot { + slot: update.slot, + data: market, + }, + ); + } + } + /// Subscribe to market account updates pub async fn subscribe(&self, markets: &[MarketId]) -> SdkResult<()> { log::debug!(target: LOG_TARGET, "subscribing: {:?}", T::MARKET_TYPE); diff --git a/crates/src/oraclemap.rs b/crates/src/oraclemap.rs index 0d7b258..48eb0a3 100644 --- a/crates/src/oraclemap.rs +++ b/crates/src/oraclemap.rs @@ -16,8 +16,9 @@ use solana_sdk::{ use crate::{ drift_idl::types::OracleSource, ffi::{get_oracle_price, OraclePriceData}, + grpc::grpc_subscriber::AccountUpdate, types::MapOf, - websocket_account_subscriber::{AccountUpdate, WebsocketAccountSubscriber}, + websocket_account_subscriber::{AccountUpdate as WsAccountUpdate, WebsocketAccountSubscriber}, MarketId, SdkError, SdkResult, UnsubHandle, }; @@ -183,11 +184,10 @@ impl OracleMap { let futs_iter = pending_subscriptions.into_iter().map(|sub_fut| { let oraclemap = Arc::clone(&self.oraclemap); - let oracle_shared_mode = self + let oracle_shared_mode = *(self .shared_oracles .get(&sub_fut.pubkey) - .expect("oracle exists") - .clone(); + .expect("oracle exists")); async move { let unsub = @@ -354,13 +354,80 @@ impl OracleMap { pub fn map(&self) -> Arc> { Arc::clone(&self.oraclemap) } + + /// Returns a hook for driving the map with new `Account` updates + pub(crate) fn on_account_fn(&self) -> impl Fn(&AccountUpdate) { + let marketmap = self.map(); + let oracle_lookup = self.shared_oracles.clone(); + + move |update: &AccountUpdate| match oracle_lookup.get(&update.pubkey).unwrap() { + OracleShareMode::Dual { + spot: _, + perp: _, + source, + } => { + update_handler_grpc(update, *source, &marketmap); + } + OracleShareMode::Isolated { market: _, source } => { + update_handler_grpc(update, *source, &marketmap); + } + OracleShareMode::DualMixed { spot, perp } => { + update_handler_grpc(update, spot.1, &marketmap); + update_handler_grpc(update, perp.1, &marketmap); + } + } + } } /// Handler fn for new oracle account data -fn update_handler( +#[inline] +fn update_handler_grpc( update: &AccountUpdate, oracle_source: OracleSource, oracle_map: &DashMap<(Pubkey, u8), Oracle, ahash::RandomState>, +) { + let lamports = update.lamports; + let slot = update.slot; + match get_oracle_price( + oracle_source, + &mut ( + update.pubkey, + Account { + owner: update.owner, + data: update.data.to_vec(), + lamports, + ..Default::default() + }, + ), + slot, + ) { + Ok(price_data) => { + oracle_map + .entry((update.pubkey, oracle_source as u8)) + .and_modify(|o| { + o.data = price_data; + o.slot = slot; + o.raw.clone_from_slice(update.data); + }) + .or_insert(Oracle { + pubkey: update.pubkey, + source: oracle_source, + data: price_data, + slot, + raw: update.data.to_vec(), + }); + } + Err(err) => { + log::error!("Failed to get oracle price: {err:?}, {:?}", update.pubkey) + } + } +} + +/// Handler fn for new oracle account data +fn update_handler( + update: &WsAccountUpdate, + oracle_source: OracleSource, + oracle_map: &DashMap<(Pubkey, u8), Oracle, ahash::RandomState>, ) { let oracle_pubkey = update.pubkey; let lamports = update.lamports; diff --git a/crates/src/swift_order_subscriber.rs b/crates/src/swift_order_subscriber.rs index 199dd9c..288865b 100644 --- a/crates/src/swift_order_subscriber.rs +++ b/crates/src/swift_order_subscriber.rs @@ -43,8 +43,6 @@ pub const SWIFT_MAINNET_WS_URL: &str = "wss://swift.drift.trade"; const LOG_TARGET: &str = "swift"; -/// Wrapper for a signed order message (aka swift order) - /// Common fields of signed message types pub struct SignedMessageInfo { pub taker_pubkey: Pubkey, diff --git a/crates/src/types.rs b/crates/src/types.rs index 2da4c6d..46c0e3e 100644 --- a/crates/src/types.rs +++ b/crates/src/types.rs @@ -30,6 +30,7 @@ pub use crate::drift_idl::{ use crate::{ constants::{ids, LUTS_DEVNET, LUTS_MAINNET}, drift_idl::errors::ErrorCode, + grpc::grpc_subscriber::GrpcError, Wallet, }; @@ -333,6 +334,8 @@ pub enum SdkError { LibDriftVersion, #[error("wallet signing disabled")] WalletSigningDisabled, + #[error("{0}")] + Grpc(#[from] GrpcError), } impl SdkError { diff --git a/tests/integration.rs b/tests/integration.rs index b5a3659..41cfe7a 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -100,6 +100,37 @@ async fn client_sync_subscribe_mainnet() { dbg!(price); } +#[tokio::test] +async fn client_sync_subscribe_mainnet_grpc() { + let _ = env_logger::try_init(); + let client = DriftClient::new( + Context::MainNet, + RpcClient::new(mainnet_endpoint()), + Keypair::new().into(), + ) + .await + .expect("connects"); + assert!(client + .grpc_subscribe( + "https://api.rpcpool.com".into(), + std::env::var("TEST_GRPC_X_TOKEN").expect("TEST_GRPC_X_TOKEN set"), + ) + .await + .is_ok()); + + tokio::time::sleep(Duration::from_secs(4)).await; + + let price = client.oracle_price(MarketId::perp(1)).await.expect("ok"); + assert!(price > 0); + dbg!(price); + let price = client.oracle_price(MarketId::perp(4)).await.expect("ok"); + assert!(price > 0); + dbg!(price); + let price = client.oracle_price(MarketId::spot(32)).await.expect("ok"); + assert!(price > 0); + dbg!(price); +} + #[tokio::test] async fn place_and_cancel_orders() { let _ = env_logger::try_init(); From abbf0ad53e101cf59197d59deb23ca015a1702ec Mon Sep 17 00:00:00 2001 From: jordy25519 Date: Wed, 16 Apr 2025 15:59:47 +0800 Subject: [PATCH 03/11] subscribe users and user stats --- crates/src/account_map.rs | 109 ++++++++++++++++++++++++++------------ crates/src/lib.rs | 69 +++++++++++++++++++++--- crates/src/marketmap.rs | 1 - crates/src/oraclemap.rs | 1 + tests/integration.rs | 7 ++- 5 files changed, 143 insertions(+), 44 deletions(-) diff --git a/crates/src/account_map.rs b/crates/src/account_map.rs index 4647889..833cc16 100644 --- a/crates/src/account_map.rs +++ b/crates/src/account_map.rs @@ -1,6 +1,6 @@ //! Hybrid solana account map backed by Ws or RPC polling use std::{ - sync::{Arc, Mutex, RwLock}, + sync::{Arc, Mutex}, time::Duration, }; @@ -12,8 +12,9 @@ use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_sdk::{clock::Slot, commitment_config::CommitmentConfig, pubkey::Pubkey}; use crate::{ - polled_account_subscriber::PolledAccountSubscriber, types::DataAndSlot, - websocket_account_subscriber::WebsocketAccountSubscriber, SdkResult, UnsubHandle, + grpc::grpc_subscriber::AccountUpdate, polled_account_subscriber::PolledAccountSubscriber, + types::DataAndSlot, websocket_account_subscriber::WebsocketAccountSubscriber, SdkResult, + UnsubHandle, }; const LOG_TARGET: &str = "accountmap"; @@ -31,7 +32,7 @@ pub struct AccountMap { pubsub: Arc, rpc: Arc, commitment: CommitmentConfig, - inner: DashMap, ahash::RandomState>, + inner: Arc, ahash::RandomState>>, } impl AccountMap { @@ -44,7 +45,7 @@ impl AccountMap { pubsub, rpc, commitment, - inner: Default::default(), + inner: Arc::default(), } } /// Subscribe account with Ws @@ -58,7 +59,7 @@ impl AccountMap { debug!(target: LOG_TARGET, "subscribing: {account:?}"); let user = AccountSub::new(Arc::clone(&self.pubsub), self.commitment, *account); - let user = user.subscribe().await?; + let user = user.subscribe(Arc::clone(&self.inner)).await?; self.inner.insert(*account, user); @@ -80,12 +81,36 @@ impl AccountMap { debug!(target: LOG_TARGET, "subscribing: {account:?} @ {interval:?}"); let user = AccountSub::polled(Arc::clone(&self.rpc), *account, interval); - let user = user.subscribe().await?; + let user = user.subscribe(Arc::clone(&self.inner)).await?; self.inner.insert(*account, user); Ok(()) } + pub(crate) fn on_account_fn(&self) -> impl Fn(&AccountUpdate) + Send + Sync + 'static { + let accounts = Arc::clone(&self.inner); + move |update| { + dbg!("grpc update user!!", update.pubkey); + accounts + .entry(update.pubkey) + .and_modify(|x| { + x.state.data_and_slot.slot = update.slot; + x.state.data_and_slot.raw.resize(update.data.len(), 0); + x.state.data_and_slot.raw.clone_from_slice(update.data); + }) + .or_insert(AccountSub:: { + pubkey: update.pubkey, + subscription: SubscriptionImpl::Grpc, + state: Subscribed { + data_and_slot: AccountSlot { + slot: update.slot, + raw: update.data.to_vec(), + }, + unsub: Mutex::default(), + }, + }); + } + } /// Unsubscribe user account pub fn unsubscribe_account(&self, account: &Pubkey) { if let Some((acc, unsub)) = self.inner.remove(account) { @@ -102,14 +127,15 @@ impl AccountMap { &self, account: &Pubkey, ) -> Option> { - self.inner - .get(account) - .map(|u| u.get_account_data_and_slot()) + match self.inner.get(account) { + Some(entry) => entry.get_account_data_and_slot(), + None => None, + } } } struct Subscribed { - data_and_slot: Arc>, + data_and_slot: AccountSlot, unsub: Mutex>, } struct Unsubscribed; @@ -151,35 +177,43 @@ impl AccountSub { } /// Start the subscriber task - pub async fn subscribe(self) -> SdkResult> { - let data_and_slot = Arc::new(RwLock::new(AccountSlot::default())); - + pub async fn subscribe( + self, + accounts: Arc, ahash::RandomState>>, + ) -> SdkResult> { let unsub = match self.subscription { SubscriptionImpl::Ws(ref ws) => { - let data_and_slot = Arc::clone(&data_and_slot); - ws.subscribe(Self::SUBSCRIPTION_ID, true, move |update| { - let mut guard = data_and_slot.write().expect("acquired"); - guard.raw.clone_from(&update.data); - guard.slot = update.slot; - }) - .await? + let unsub = ws + .subscribe(Self::SUBSCRIPTION_ID, true, move |update| { + accounts.entry(update.pubkey).and_modify(|x| { + x.state.data_and_slot.slot = update.slot; + x.state.data_and_slot.raw.clone_from(&update.data); + }); + }) + .await?; + Some(unsub) } SubscriptionImpl::Polled(ref poll) => { - let data_and_slot = Arc::clone(&data_and_slot); - poll.subscribe(move |update| { - let mut guard = data_and_slot.write().expect("acquired"); - guard.raw.clone_from(&update.data); - guard.slot = update.slot; - }) + let unsub = poll.subscribe(move |update| { + accounts.entry(update.pubkey).and_modify(|x| { + x.state.data_and_slot.slot = update.slot; + x.state.data_and_slot.raw.clone_from(&update.data); + }); + }); + Some(unsub) } + SubscriptionImpl::Grpc => None, }; - Ok(AccountSub { + Ok(AccountSub:: { pubkey: self.pubkey, subscription: self.subscription, state: Subscribed { - data_and_slot, - unsub: Mutex::new(Some(unsub)), + data_and_slot: AccountSlot { + raw: vec![], + slot: 0, + }, + unsub: Mutex::new(unsub), }, }) } @@ -189,12 +223,16 @@ impl AccountSub { /// Return the latest value of the account data along with last updated slot /// # Panics /// Panics if account data cannot be deserialized as `T` - pub fn get_account_data_and_slot(&self) -> DataAndSlot { - let guard = self.state.data_and_slot.read().expect("acquired"); - DataAndSlot { - slot: guard.slot, - data: T::try_deserialize_unchecked(&mut guard.raw.as_slice()).expect("desrializes"), + pub fn get_account_data_and_slot(&self) -> Option> { + if self.state.data_and_slot.raw.is_empty() { + return None; } + + Some(DataAndSlot { + slot: self.state.data_and_slot.slot, + data: T::try_deserialize_unchecked(&mut self.state.data_and_slot.raw.as_slice()) + .expect("deserializes"), + }) } /// Stop the user subscriber task, if it exists @@ -217,6 +255,7 @@ impl AccountSub { enum SubscriptionImpl { Ws(WebsocketAccountSubscriber), Polled(PolledAccountSubscriber), + Grpc, } #[cfg(test)] diff --git a/crates/src/lib.rs b/crates/src/lib.rs index 6255e1b..78077f0 100644 --- a/crates/src/lib.rs +++ b/crates/src/lib.rs @@ -5,7 +5,6 @@ use std::{borrow::Cow, collections::BTreeSet, sync::Arc, time::Duration}; use anchor_lang::{AccountDeserialize, Discriminator, InstructionData}; pub use drift_pubsub_client::PubsubClient; use futures_util::TryFutureExt; -use grpc::grpc_subscriber::AccountFilter; use log::debug; pub use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_rpc_client_api::response::Response; @@ -29,7 +28,7 @@ use crate::{ ProgramData, DEFAULT_PUBKEY, SYSVAR_INSTRUCTIONS_PUBKEY, }, drift_idl::traits::ToAccountMetas, - grpc::grpc_subscriber::DriftGrpcClient, + grpc::grpc_subscriber::{AccountFilter, DriftGrpcClient}, marketmap::MarketMap, oraclemap::{Oracle, OracleMap}, swift_order_subscriber::{SignedOrderInfo, SwiftOrderStream}, @@ -715,8 +714,28 @@ impl DriftClient { } /// Subscribe to all: markets, oracles, and slot updates over gRPC - pub async fn grpc_subscribe(&self, endpoint: String, x_token: String) -> SdkResult<()> { - self.backend.grpc_subscribe(endpoint, x_token).await + pub async fn grpc_subscribe( + &self, + endpoint: String, + x_token: String, + all_users: bool, + all_user_stats: bool, + on_slot: Option, + ) -> SdkResult<()> { + let first_3_sub_accounts = (0_u16..=3) + .into_iter() + .map(|i| self.wallet.sub_account(i)) + .collect(); + self.backend + .grpc_subscribe( + endpoint, + x_token, + Some(first_3_sub_accounts), + all_users, + all_user_stats, + on_slot, + ) + .await } /// Return a reference to the internal backend @@ -834,7 +853,15 @@ impl DriftClientBackend { } /// Subscribe to all: markets, oracles, and slot updates over gRPC - async fn grpc_subscribe(&self, endpoint: String, x_token: String) -> SdkResult<()> { + async fn grpc_subscribe( + &self, + endpoint: String, + x_token: String, + users: Option>, + all_users: bool, + all_user_stats: bool, + on_slot: Option, + ) -> SdkResult<()> { let mut grpc = DriftGrpcClient::new(endpoint, x_token); grpc.on_account( AccountFilter::partial().with_discriminator(SpotMarket::DISCRIMINATOR), @@ -854,8 +881,33 @@ impl DriftClientBackend { AccountFilter::partial().with_accounts(oracles.into_iter()), self.oracle_map.on_account_fn(), ); - grpc.on_slot(|slot| { - log::info!("new slot: {slot}"); + + // subscribe to custom `User` accounts + users.map(|u| { + grpc.on_account( + AccountFilter::full() + .with_discriminator(User::DISCRIMINATOR) + .with_accounts(u.into_iter()), + self.account_map.on_account_fn(), + ); + }); + + if all_users { + grpc.on_account( + AccountFilter::partial().with_discriminator(User::DISCRIMINATOR), + self.account_map.on_account_fn(), + ); + } + + if all_user_stats { + grpc.on_account( + AccountFilter::partial().with_discriminator(UserStats::DISCRIMINATOR), + self.account_map.on_account_fn(), + ); + } + + on_slot.map(|f| { + grpc.on_slot(f); }); grpc.subscribe(CommitmentLevel::Confirmed, Default::default()) @@ -975,6 +1027,9 @@ impl DriftClientBackend { Ok(value) } else { let account_data = self.rpc_client.get_account_data(account).await?; + if account_data.is_empty() { + return Err(SdkError::NoAccountData(*account)); + } T::try_deserialize(&mut account_data.as_slice()) .map_err(|err| SdkError::Anchor(Box::new(err))) } diff --git a/crates/src/marketmap.rs b/crates/src/marketmap.rs index a14b087..bb9dd64 100644 --- a/crates/src/marketmap.rs +++ b/crates/src/marketmap.rs @@ -108,7 +108,6 @@ where pub(crate) fn on_account_fn(&self) -> impl Fn(&AccountUpdate) { let marketmap = self.map(); move |update: &AccountUpdate| { - dbg!("grpc update market!!"); let market = T::deserialize(&mut &update.data[8..]).expect("deser market"); let idx = market.market_index(); marketmap.insert( diff --git a/crates/src/oraclemap.rs b/crates/src/oraclemap.rs index 48eb0a3..1e9d4fa 100644 --- a/crates/src/oraclemap.rs +++ b/crates/src/oraclemap.rs @@ -407,6 +407,7 @@ fn update_handler_grpc( .and_modify(|o| { o.data = price_data; o.slot = slot; + o.raw.resize(update.data.len(), 0); o.raw.clone_from_slice(update.data); }) .or_insert(Oracle { diff --git a/tests/integration.rs b/tests/integration.rs index 41cfe7a..1bf3346 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -114,11 +114,16 @@ async fn client_sync_subscribe_mainnet_grpc() { .grpc_subscribe( "https://api.rpcpool.com".into(), std::env::var("TEST_GRPC_X_TOKEN").expect("TEST_GRPC_X_TOKEN set"), + true, + true, + Some(move |slot| { + println!("slot: {slot}"); + }) ) .await .is_ok()); - tokio::time::sleep(Duration::from_secs(4)).await; + tokio::time::sleep(Duration::from_secs(8)).await; let price = client.oracle_price(MarketId::perp(1)).await.expect("ok"); assert!(price > 0); From 48e5796a118e10f0da40374553b93b0917731f6b Mon Sep 17 00:00:00 2001 From: jordy25519 Date: Thu, 17 Apr 2025 10:51:14 +0800 Subject: [PATCH 04/11] Add GrpcSubscriptionOpts struct --- crates/src/account_map.rs | 3 +- crates/src/grpc/grpc_subscriber.rs | 66 ++++++++++++-------- crates/src/lib.rs | 98 +++++++++++++++++++----------- crates/src/types.rs | 94 ++++++++++++++++++++++++++-- tests/integration.rs | 26 +++++--- 5 files changed, 210 insertions(+), 77 deletions(-) diff --git a/crates/src/account_map.rs b/crates/src/account_map.rs index 833cc16..51b6160 100644 --- a/crates/src/account_map.rs +++ b/crates/src/account_map.rs @@ -87,10 +87,9 @@ impl AccountMap { Ok(()) } - pub(crate) fn on_account_fn(&self) -> impl Fn(&AccountUpdate) + Send + Sync + 'static { + pub(crate) fn on_account_fn(&self) -> impl Fn(&AccountUpdate) { let accounts = Arc::clone(&self.inner); move |update| { - dbg!("grpc update user!!", update.pubkey); accounts .entry(update.pubkey) .and_modify(|x| { diff --git a/crates/src/grpc/grpc_subscriber.rs b/crates/src/grpc/grpc_subscriber.rs index a32e758..b93b9b1 100644 --- a/crates/src/grpc/grpc_subscriber.rs +++ b/crates/src/grpc/grpc_subscriber.rs @@ -28,7 +28,7 @@ use yellowstone_grpc_proto::{ type SlotsFilterMap = HashMap; type AccountFilterMap = HashMap; -type HookFn = dyn Fn(&AccountUpdate) + Send + Sync + 'static; +type HookFn = dyn Fn(&AccountUpdate) + 'static; type Hooks = Vec<(AccountFilter, Box)>; /// Account update from gRPC @@ -141,7 +141,7 @@ impl AccountFilter { } #[derive(Debug, Clone)] -pub struct GrpcOpts { +pub struct GrpcConnectionOpts { /// Apply a timeout to connecting to the uri. connect_timeout_ms: Option, /// Sets the tower service default internal buffer size, default is 1024 @@ -168,7 +168,7 @@ pub struct GrpcOpts { max_decoding_message_size: usize, } -impl Default for GrpcOpts { +impl Default for GrpcConnectionOpts { fn default() -> Self { Self { connect_timeout_ms: None, @@ -217,9 +217,9 @@ pub enum GrpcError { pub struct DriftGrpcClient { endpoint: String, x_token: String, - grpc_opts: Option, + grpc_opts: Option, on_account_hooks: Hooks, - on_slot: Box, + on_slot: Box, } impl DriftGrpcClient { @@ -237,14 +237,15 @@ impl DriftGrpcClient { } /// Set gRPC network options - pub fn grpc_opts(&mut self, grpc_opts: GrpcOpts) { + pub fn grpc_connection_opts(mut self, grpc_opts: GrpcConnectionOpts) -> Self { let _ = self.grpc_opts.insert(grpc_opts); + self } /// Add a callback on slot updates /// /// `on_slot` must prioritize fast handling or risk blocking the gRPC thread - pub fn on_slot(&mut self, on_slot: F) { + pub fn on_slot(&mut self, on_slot: F) { self.on_slot = Box::new(on_slot); } @@ -256,7 +257,7 @@ impl DriftGrpcClient { /// * `on_account` - fn to receive callback on filter match /// /// DEV: `on_account` must prioritize fast handling or risk blocking the gRPC thread - pub fn on_account( + pub fn on_account( &mut self, filter: AccountFilter, on_account: T, @@ -286,13 +287,15 @@ impl DriftGrpcClient { let request = subscribe_opts.to_subscribe_request(commitment); info!(target: "grpc", "gRPC subscribing: {request:?}"); - tokio::spawn({ - async move { - Self::geyser_subscribe(grpc_client, request, self.on_account_hooks, self.on_slot) - .await - } - }); - info!("gRPC subscribed ⚡️"); + // gRPC receives updates very frequently, don't want tokio scheduling processing elsewhere + let localset = tokio::task::LocalSet::new(); + localset.spawn_local(Self::geyser_subscribe( + grpc_client, + request, + self.on_account_hooks, + self.on_slot, + )); + info!(target: "grpc", "gRPC subscribed ⚡️"); Ok(()) } @@ -323,6 +326,7 @@ impl DriftGrpcClient { Err(err) => { log::warn!(target: "grpc", "failed subscription: {err:?}"); retry_count += 1; + tokio::time::sleep(Duration::from_secs(2_u64.pow(retry_count + 1))).await; continue; } }; @@ -360,7 +364,7 @@ impl DriftGrpcClient { } } Some(UpdateOneof::Slot(msg)) => { - log::debug!(target: "grpc", "slot: {}", msg.slot); + log::trace!(target: "grpc", "slot: {}", msg.slot); if msg.slot > latest_slot { latest_slot = msg.slot; on_slot(latest_slot); @@ -370,19 +374,27 @@ impl DriftGrpcClient { // This is necessary to keep load balancers that expect client pings alive. If your load balancer doesn't // require periodic client pings then this is unnecessary log::debug!(target: "grpc", "ping"); - // TODO: set timeout - if let Err(err) = subscribe_tx - .send(SubscribeRequest { - ping: Some(SubscribeRequestPing { id: 1 }), - ..Default::default() - }) - .await + let ping = SubscribeRequest { + ping: Some(SubscribeRequestPing { id: 1 }), + ..Default::default() + }; + match tokio::time::timeout( + Duration::from_secs(5), + subscribe_tx.send(ping), + ) + .await { - log::warn!(target: "grpc", "ping failed: {err:?}"); + Ok(Ok(_)) => (), + Ok(Err(err)) => { + log::warn!(target: "grpc", "ping failed: {err:?}"); + } + Err(_) => { + log::warn!(target: "grpc", "ping timeout"); + } } } Some(UpdateOneof::Pong(_)) => { - log::debug!(target: "grpc", "pong"); + log::trace!(target: "grpc", "pong"); } Some(other_update) => { warn!(target: "grpc", "unhandled update: {other_update:?}"); @@ -401,7 +413,7 @@ impl DriftGrpcClient { } } - warn!(target: "grpc", "gRPC stream closed"); + error!(target: "grpc", "gRPC stream closed"); } } @@ -469,7 +481,7 @@ impl SubscribeOpts { async fn grpc_connect( endpoint: &str, x_token: &str, - opts: GrpcOpts, + opts: GrpcConnectionOpts, ) -> Result, GeyserGrpcBuilderError> { info!(target: "grpc", "gRPC connecting: {endpoint}..."); let mut tls_config = ClientTlsConfig::new().with_native_roots(); diff --git a/crates/src/lib.rs b/crates/src/lib.rs index 78077f0..638afb2 100644 --- a/crates/src/lib.rs +++ b/crates/src/lib.rs @@ -1,10 +1,16 @@ //! Drift SDK -use std::{borrow::Cow, collections::BTreeSet, sync::Arc, time::Duration}; +use std::{ + borrow::Cow, + collections::BTreeSet, + sync::{atomic::AtomicBool, Arc}, + time::Duration, +}; use anchor_lang::{AccountDeserialize, Discriminator, InstructionData}; pub use drift_pubsub_client::PubsubClient; use futures_util::TryFutureExt; +use grpc::grpc_subscriber::SubscribeOpts; use log::debug; pub use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_rpc_client_api::response::Response; @@ -699,43 +705,42 @@ impl DriftClient { } /// Return a reference to the internal spot market map + #[cfg(feature = "unsafe_pub")] pub fn spot_market_map(&self) -> Arc>> { self.backend.spot_market_map.map() } /// Return a reference to the internal perp market map + #[cfg(feature = "unsafe_pub")] pub fn perp_market_map(&self) -> Arc>> { self.backend.perp_market_map.map() } /// Return a reference to the internal oracle map + #[cfg(feature = "unsafe_pub")] pub fn oracle_map(&self) -> Arc> { self.backend.oracle_map.map() } - /// Subscribe to all: markets, oracles, and slot updates over gRPC + /// Subscribe to all: markets, oracles, users, and slot updates over gRPC + /// + /// Updates are transparently handled by the `DriftClient` and calls to get User accounts, markets, oracles, etc. + /// will utilize the latest cached updates from the gRPC subscription. + /// + /// use `opts` to control what is _cached_ by the client. The gRPC connection will always subscribe + /// to all drift accounts regardless. + /// + /// * `endpoint` - the gRPC endpoint + /// * `x_token` - gRPC authentication X token + /// * `opts` - configure callbacks and caching + /// pub async fn grpc_subscribe( &self, endpoint: String, x_token: String, - all_users: bool, - all_user_stats: bool, - on_slot: Option, + opts: GrpcSubscribeOpts, ) -> SdkResult<()> { - let first_3_sub_accounts = (0_u16..=3) - .into_iter() - .map(|i| self.wallet.sub_account(i)) - .collect(); - self.backend - .grpc_subscribe( - endpoint, - x_token, - Some(first_3_sub_accounts), - all_users, - all_user_stats, - on_slot, - ) - .await + self.backend.grpc_subscribe(endpoint, x_token, opts).await } /// Return a reference to the internal backend @@ -756,6 +761,7 @@ pub struct DriftClientBackend { perp_market_map: MarketMap, spot_market_map: MarketMap, oracle_map: OracleMap, + grpc_subscribed: AtomicBool, } impl DriftClientBackend { /// Initialize a new `DriftClientBackend` @@ -825,9 +831,16 @@ impl DriftClientBackend { perp_market_map, spot_market_map, oracle_map, + grpc_subscribed: AtomicBool::new(false), }) } + /// Returns true if `DriftClientBackend` is subscribed via gRPC + pub fn is_grpc_subscribed(&self) -> bool { + self.grpc_subscribed + .load(std::sync::atomic::Ordering::Relaxed) + } + /// Start subscription for latest block hashes async fn subscribe_blockhashes(&self) -> SdkResult<()> { self.blockhash_subscriber.subscribe(); @@ -836,6 +849,11 @@ impl DriftClientBackend { /// Start subscriptions for market accounts async fn subscribe_markets(&self, markets: &[MarketId]) -> SdkResult<()> { + if self.is_grpc_subscribed() { + log::info!("already subscribed markets via gRPC"); + return Err(SdkError::AlreadySubscribed); + } + let (perps, spot) = markets .iter() .partition::, _>(|x| x.is_perp()); @@ -849,6 +867,11 @@ impl DriftClientBackend { /// Start subscriptions for market oracle accounts async fn subscribe_oracles(&self, markets: &[MarketId]) -> SdkResult<()> { + if self.is_grpc_subscribed() { + log::info!("already subscribed oracles via gRPC"); + return Err(SdkError::AlreadySubscribed); + } + self.oracle_map.subscribe(markets).await } @@ -857,12 +880,11 @@ impl DriftClientBackend { &self, endpoint: String, x_token: String, - users: Option>, - all_users: bool, - all_user_stats: bool, - on_slot: Option, + opts: GrpcSubscribeOpts, ) -> SdkResult<()> { - let mut grpc = DriftGrpcClient::new(endpoint, x_token); + let mut grpc = + DriftGrpcClient::new(endpoint, x_token).grpc_connection_opts(opts.connection_opts); + grpc.on_account( AccountFilter::partial().with_discriminator(SpotMarket::DISCRIMINATOR), self.spot_market_map.on_account_fn(), @@ -882,35 +904,39 @@ impl DriftClientBackend { self.oracle_map.on_account_fn(), ); - // subscribe to custom `User` accounts - users.map(|u| { + if opts.usermap { grpc.on_account( - AccountFilter::full() - .with_discriminator(User::DISCRIMINATOR) - .with_accounts(u.into_iter()), + AccountFilter::partial().with_discriminator(User::DISCRIMINATOR), self.account_map.on_account_fn(), ); - }); - - if all_users { + } else { + // when usermap is on, the custom accounts are already included + // usermap off: subscribe to custom `User` accounts grpc.on_account( - AccountFilter::partial().with_discriminator(User::DISCRIMINATOR), + AccountFilter::full() + .with_discriminator(User::DISCRIMINATOR) + .with_accounts(opts.user_accounts.into_iter()), self.account_map.on_account_fn(), ); } - if all_user_stats { + if opts.user_stats_map { grpc.on_account( AccountFilter::partial().with_discriminator(UserStats::DISCRIMINATOR), self.account_map.on_account_fn(), ); } - on_slot.map(|f| { + // set custom callbacks + opts.on_account.map(|(filter, on_account)| { + grpc.on_account(filter, on_account); + }); + opts.on_slot.map(|f| { grpc.on_slot(f); }); - grpc.subscribe(CommitmentLevel::Confirmed, Default::default()) + // start subscription + grpc.subscribe(CommitmentLevel::Confirmed, SubscribeOpts::default()) .await .map_err(Into::into) } diff --git a/crates/src/types.rs b/crates/src/types.rs index 46c0e3e..28a81e0 100644 --- a/crates/src/types.rs +++ b/crates/src/types.rs @@ -6,15 +6,16 @@ use std::{ use dashmap::DashMap; pub use solana_rpc_client_api::config::RpcSendTransactionConfig; -pub use solana_sdk::{ - commitment_config::CommitmentConfig, message::VersionedMessage, - transaction::VersionedTransaction, -}; use solana_sdk::{ + clock::Slot, instruction::{AccountMeta, InstructionError}, pubkey::Pubkey, transaction::TransactionError, }; +pub use solana_sdk::{ + commitment_config::CommitmentConfig, message::VersionedMessage, + transaction::VersionedTransaction, +}; use thiserror::Error; use tokio::sync::oneshot; use tokio_tungstenite::tungstenite; @@ -30,10 +31,93 @@ pub use crate::drift_idl::{ use crate::{ constants::{ids, LUTS_DEVNET, LUTS_MAINNET}, drift_idl::errors::ErrorCode, - grpc::grpc_subscriber::GrpcError, + grpc::grpc_subscriber::{AccountFilter, AccountUpdate, GrpcConnectionOpts, GrpcError}, Wallet, }; +/// Config options for drift gRPC subscription +/// +/// ```example(no_run) +/// // subscribe to all user and users stats accounts +/// let opts = GrpcSubscribeOpts::default() +/// .usermap_on() // subscribe to ALL user accounts +/// .statsmap_on(); // subscribe to ALL user stats accounts +/// +/// // cache specific user accounts only and set a new slot callback +/// let first_3_subaccounts = (0_u16..3).into_iter().map(|i| wallet.sub_account(i)).collect(); +/// let opts = GrpcSubscribeOpts::default() +/// .user_accounts(first_3_subaccounts); +/// .on_slot(move |new_slot| {}) // slot callback +/// ``` +/// +#[derive(Default)] +pub struct GrpcSubscribeOpts { + /// toggle usermap + pub usermap: bool, + /// toggle user stats map + pub user_stats_map: bool, + /// list of user (sub)accounts to subscribe + pub user_accounts: Vec, + /// callback for slot updates + pub on_slot: Option>, + /// custom callback for account updates + pub on_account: Option<(AccountFilter, Box)>, + /// Network level connection config + pub connection_opts: GrpcConnectionOpts, +} + +impl GrpcSubscribeOpts { + /// Cache ALL drift `User` account updates + /// + /// useful for e.g. building the DLOB, fast TX building for makers + /// + /// note: memory requirements ~2GiB + pub fn usermap_on(mut self) -> Self { + self.usermap = true; + self + } + /// Cache ALL drift `UserStats` account updates + /// + /// useful for e.g. fast TX building for makers + pub fn statsmap_on(mut self) -> Self { + self.user_stats_map = true; + self + } + /// Cache account updates for given `users` only + pub fn user_accounts(mut self, users: Vec) -> Self { + self.user_accounts = users; + self + } + /// Set a callback to invoke on new slot updates + /// + /// * `on_slot` - the callback for new slot updates + /// + /// ! `on_slot` must not block the gRPC task + pub fn on_slot(mut self, on_slot: impl Fn(Slot) + 'static) -> Self { + self.on_slot = Some(Box::new(on_slot)); + self + } + /// Register a custom callback for account updates + /// + /// * `filter` - accounts matching filter will invoke the callback + /// * `on_account` - fn to invoke on matching account update + /// + /// ! `on_account` must not block the gRPC task + pub fn on_account( + mut self, + filter: AccountFilter, + on_account: impl Fn(&AccountUpdate) + 'static, + ) -> Self { + self.on_account = Some((filter, Box::new(on_account))); + self + } + /// Set network level connection opts + pub fn connection_opts(mut self, opts: GrpcConnectionOpts) -> Self { + self.connection_opts = opts; + self + } +} + /// Map from K => V pub type MapOf = DashMap; diff --git a/tests/integration.rs b/tests/integration.rs index 1bf3346..8e16729 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -1,4 +1,7 @@ -use std::time::Duration; +use std::{ + sync::{atomic::AtomicU64, Arc}, + time::Duration, +}; use drift_rs::{ event_subscriber::RpcClient, @@ -110,30 +113,39 @@ async fn client_sync_subscribe_mainnet_grpc() { ) .await .expect("connects"); + + let latest_slot = Arc::new(AtomicU64::default()); + let latest_slot_ref = Arc::clone(&latest_slot); assert!(client .grpc_subscribe( "https://api.rpcpool.com".into(), std::env::var("TEST_GRPC_X_TOKEN").expect("TEST_GRPC_X_TOKEN set"), true, true, - Some(move |slot| { - println!("slot: {slot}"); + Some(move |new_slot| { + println!("slot: {new_slot}"); + latest_slot_ref.store(new_slot, std::sync::atomic::Ordering::Relaxed); }) ) .await .is_ok()); - tokio::time::sleep(Duration::from_secs(8)).await; + tokio::time::sleep(Duration::from_secs(5)).await; - let price = client.oracle_price(MarketId::perp(1)).await.expect("ok"); - assert!(price > 0); - dbg!(price); + // oracles available let price = client.oracle_price(MarketId::perp(4)).await.expect("ok"); assert!(price > 0); dbg!(price); let price = client.oracle_price(MarketId::spot(32)).await.expect("ok"); assert!(price > 0); dbg!(price); + + // markets available + assert!(client.try_get_perp_market_account(1).is_ok()); + assert!(client.try_get_spot_market_account(1).is_ok()); + + // slot updated + assert!(latest_slot.load(std::sync::atomic::Ordering::Relaxed) > 0); } #[tokio::test] From 697a2f4a9361c996664c5ac1e529ecc1bf53920f Mon Sep 17 00:00:00 2001 From: jordy25519 Date: Thu, 17 Apr 2025 12:12:41 +0800 Subject: [PATCH 05/11] tidying up --- crates/src/grpc/grpc_subscriber.rs | 180 +++++++++++++++++++++++++---- crates/src/lib.rs | 33 ++++-- crates/src/types.rs | 11 +- tests/integration.rs | 57 +++++---- 4 files changed, 227 insertions(+), 54 deletions(-) diff --git a/crates/src/grpc/grpc_subscriber.rs b/crates/src/grpc/grpc_subscriber.rs index b93b9b1..ea26a5d 100644 --- a/crates/src/grpc/grpc_subscriber.rs +++ b/crates/src/grpc/grpc_subscriber.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, time::Duration}; -use crate::constants::PROGRAM_ID as DRIFT_PROGRAM_ID; +use crate::{constants::PROGRAM_ID as DRIFT_PROGRAM_ID, types::UnsubHandle}; use ahash::HashSet; use futures_util::{sink::SinkExt, stream::StreamExt}; @@ -28,7 +28,7 @@ use yellowstone_grpc_proto::{ type SlotsFilterMap = HashMap; type AccountFilterMap = HashMap; -type HookFn = dyn Fn(&AccountUpdate) + 'static; +type HookFn = dyn Fn(&AccountUpdate) + Send + Sync + 'static; type Hooks = Vec<(AccountFilter, Box)>; /// Account update from gRPC @@ -146,17 +146,17 @@ pub struct GrpcConnectionOpts { connect_timeout_ms: Option, /// Sets the tower service default internal buffer size, default is 1024 buffer_size: Option, - /// Sets whether to use an adaptive flow control. Uses hyper’s default otherwise. + /// Sets whether to use an adaptive flow control. Uses hyper's default otherwise. http2_adaptive_window: Option, - /// Set http2 KEEP_ALIVE_TIMEOUT. Uses hyper’s default otherwise. + /// Set http2 KEEP_ALIVE_TIMEOUT. Uses hyper's default otherwise. http2_keep_alive_interval_ms: Option, /// Sets the max connection-level flow control for HTTP2, default is 65,535 initial_connection_window_size: Option, ///Sets the SETTINGS_INITIAL_WINDOW_SIZE option for HTTP2 stream-level flow control, default is 65,535 initial_stream_window_size: Option, - ///Set http2 KEEP_ALIVE_TIMEOUT. Uses hyper’s default otherwise. + ///Set http2 KEEP_ALIVE_TIMEOUT. Uses hyper's default otherwise. keep_alive_timeout_ms: Option, - /// Set http2 KEEP_ALIVE_WHILE_IDLE. Uses hyper’s default otherwise. + /// Set http2 KEEP_ALIVE_WHILE_IDLE. Uses hyper's default otherwise. keep_alive_while_idle: Option, /// Set whether TCP keepalive messages are enabled on accepted connections. tcp_keepalive_ms: Option, @@ -219,7 +219,7 @@ pub struct DriftGrpcClient { x_token: String, grpc_opts: Option, on_account_hooks: Hooks, - on_slot: Box, + on_slot: Box, } impl DriftGrpcClient { @@ -245,7 +245,7 @@ impl DriftGrpcClient { /// Add a callback on slot updates /// /// `on_slot` must prioritize fast handling or risk blocking the gRPC thread - pub fn on_slot(&mut self, on_slot: F) { + pub fn on_slot(&mut self, on_slot: F) { self.on_slot = Box::new(on_slot); } @@ -257,7 +257,7 @@ impl DriftGrpcClient { /// * `on_account` - fn to receive callback on filter match /// /// DEV: `on_account` must prioritize fast handling or risk blocking the gRPC thread - pub fn on_account( + pub fn on_account( &mut self, filter: AccountFilter, on_account: T, @@ -266,11 +266,13 @@ impl DriftGrpcClient { } /// Start subscription for geyser updates + /// + /// Returns an unsub handle on success pub async fn subscribe( self, commitment: CommitmentLevel, subscribe_opts: SubscribeOpts, - ) -> Result<(), GrpcError> { + ) -> Result { let mut grpc_client = grpc_connect( self.endpoint.as_str(), self.x_token.as_str(), @@ -282,22 +284,62 @@ impl DriftGrpcClient { GrpcError::Geyser(err) })?; - grpc_client.ping(1).await.map_err(GrpcError::Client)?; - info!("gRPC connected 🔌"); + let resp = grpc_client.get_version().await.map_err(GrpcError::Client)?; + info!("gRPC connected 🔌: {}", resp.version); let request = subscribe_opts.to_subscribe_request(commitment); info!(target: "grpc", "gRPC subscribing: {request:?}"); - // gRPC receives updates very frequently, don't want tokio scheduling processing elsewhere - let localset = tokio::task::LocalSet::new(); - localset.spawn_local(Self::geyser_subscribe( - grpc_client, - request, - self.on_account_hooks, - self.on_slot, - )); + let (subscribe_tx, subscribe_rx) = tokio::sync::oneshot::channel::<()>(); + let (unsub_tx, unsub_rx) = tokio::sync::oneshot::channel::<()>(); + + // gRPC receives updates very frequently, don't want tokio scheduler moving it + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build(); + if let Err(ref err) = rt { + log::error!("failed grpc thread init: {err:?}"); + } + let rt = rt.unwrap(); + + rt.spawn(async move { + info!("HELLO FROM CT RT"); + info!("HELLO FROM CT RT"); + info!("HELLO FROM CT RT"); + info!("HELLO FROM CT RT"); + info!("HELLO FROM CT RT"); + info!("HELLO FROM CT RT"); + }); + + let _task = rt.spawn(Self::geyser_subscribe( + grpc_client, + request, + self.on_account_hooks, + self.on_slot, + )); + + rt.spawn(async move { + info!("HELLO FROM CT RT"); + info!("HELLO FROM CT RT"); + info!("HELLO FROM CT RT"); + info!("HELLO FROM CT RT"); + info!("HELLO FROM CT RT"); + info!("HELLO FROM CT RT"); + }); + + subscribe_tx.send(()).expect("sent"); + // nb: will cause grpc task to drop but doesn't call any 'unsub' endpoint + let _ = unsub_rx.blocking_recv(); + info!(target: "grpc", "gRPC connection unsubscribed"); + }); + + if let Err(err) = subscribe_rx.await { + error!("WTF: {err:?}"); + } + info!(target: "grpc", "gRPC subscribed ⚡️"); - Ok(()) + Ok(unsub_tx) } /// Run the gRPC subscription task @@ -332,6 +374,7 @@ impl DriftGrpcClient { }; while let Some(message) = stream.next().await { + dbg!("update"); match message { Ok(msg) => { match msg.update_oneof { @@ -348,7 +391,7 @@ impl DriftGrpcClient { ); log::trace!(target: "grpc", "account update: {pubkey}"); let update = AccountUpdate { - owner: DRIFT_PROGRAM_ID, // assuming not subscribed to any other accounts.. + owner: DRIFT_PROGRAM_ID, // assuming not subscribed to any other program accounts.. pubkey, slot: latest_slot, lamports: account.lamports, @@ -532,3 +575,96 @@ async fn grpc_connect( builder.connect().await } + +#[cfg(test)] +mod test { + use super::*; + use solana_sdk::pubkey::Pubkey; + + fn create_test_account(data: Vec) -> SubscribeUpdateAccountInfo { + SubscribeUpdateAccountInfo { + data, + ..Default::default() + } + } + + #[test] + fn grpc_partial_match_discriminator() { + let discriminator = &[1, 2, 3, 4, 5, 6, 7, 8]; + let filter = AccountFilter::partial().with_discriminator(discriminator); + + // Test matching discriminator + let account = create_test_account(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + assert!(filter.matches(&Pubkey::new_unique(), &account)); + + // Test non-matching discriminator + let account = create_test_account(vec![8, 7, 6, 5, 4, 3, 2, 1, 9, 10]); + assert!(!filter.matches(&Pubkey::new_unique(), &account)); + } + + #[test] + fn grpc_partial_match_accounts() { + let pubkey = Pubkey::new_unique(); + let filter = AccountFilter::partial().with_accounts([pubkey].into_iter()); + + // Test matching pubkey + let account = create_test_account(vec![1, 2, 3, 4]); + assert!(filter.matches(&pubkey, &account)); + + // Test non-matching pubkey + let other_pubkey = Pubkey::new_unique(); + assert!(!filter.matches(&other_pubkey, &account)); + } + + #[test] + fn grpc_partial_match_memcmp() { + let memcmp = Memcmp::new_raw_bytes(0, vec![1, 2, 3]); + let filter = AccountFilter::partial().with_memcmp(memcmp); + + // Test matching memcmp + let account = create_test_account(vec![1, 2, 3, 4, 5]); + assert!(filter.matches(&Pubkey::new_unique(), &account)); + + // Test non-matching memcmp + let account = create_test_account(vec![3, 2, 1, 4, 5]); + assert!(!filter.matches(&Pubkey::new_unique(), &account)); + } + + #[test] + fn grpc_full_match_all_filters() { + let pubkey = Pubkey::new_unique(); + let discriminator = &[1, 2, 3, 4, 5, 6, 7, 8]; + let memcmp = Memcmp::new_raw_bytes(8, vec![9, 10]); + + let filter = AccountFilter::full() + .with_discriminator(discriminator) + .with_accounts([pubkey].into_iter()) + .with_memcmp(memcmp); + + // Test all match + let account = create_test_account(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + assert!(filter.matches(&pubkey, &account)); + + // Test discriminator mismatch + let account = create_test_account(vec![8, 7, 6, 5, 4, 3, 2, 1, 9, 10]); + assert!(!filter.matches(&pubkey, &account)); + + // Test pubkey mismatch + let other_pubkey = Pubkey::new_unique(); + let account = create_test_account(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + assert!(!filter.matches(&other_pubkey, &account)); + + // Test memcmp mismatch + let account = create_test_account(vec![1, 2, 3, 4, 5, 6, 7, 8, 10, 9]); + assert!(!filter.matches(&pubkey, &account)); + } + + #[test] + fn grpc_firehose_matches_everything() { + let filter = AccountFilter::firehose(); + let pubkey = Pubkey::new_unique(); + let account = create_test_account(vec![1, 2, 3, 4]); + + assert!(filter.matches(&pubkey, &account)); + } +} diff --git a/crates/src/lib.rs b/crates/src/lib.rs index 638afb2..4b758c3 100644 --- a/crates/src/lib.rs +++ b/crates/src/lib.rs @@ -3,7 +3,7 @@ use std::{ borrow::Cow, collections::BTreeSet, - sync::{atomic::AtomicBool, Arc}, + sync::{Arc, RwLock}, time::Duration, }; @@ -743,6 +743,11 @@ impl DriftClient { self.backend.grpc_subscribe(endpoint, x_token, opts).await } + /// Unsubscribe the gRPC connection + pub fn grpc_unsubscribe(&self) { + self.backend.grpc_unsubscribe(); + } + /// Return a reference to the internal backend #[cfg(feature = "unsafe_pub")] pub fn backend(&self) -> &'static DriftClientBackend { @@ -761,7 +766,7 @@ pub struct DriftClientBackend { perp_market_map: MarketMap, spot_market_map: MarketMap, oracle_map: OracleMap, - grpc_subscribed: AtomicBool, + grpc_unsub: RwLock>, } impl DriftClientBackend { /// Initialize a new `DriftClientBackend` @@ -831,14 +836,14 @@ impl DriftClientBackend { perp_market_map, spot_market_map, oracle_map, - grpc_subscribed: AtomicBool::new(false), + grpc_unsub: RwLock::default(), }) } /// Returns true if `DriftClientBackend` is subscribed via gRPC pub fn is_grpc_subscribed(&self) -> bool { - self.grpc_subscribed - .load(std::sync::atomic::Ordering::Relaxed) + let unsub = self.grpc_unsub.read().unwrap(); + unsub.is_some() } /// Start subscription for latest block hashes @@ -936,9 +941,22 @@ impl DriftClientBackend { }); // start subscription - grpc.subscribe(CommitmentLevel::Confirmed, SubscribeOpts::default()) + let grpc_unsub = grpc + .subscribe(CommitmentLevel::Confirmed, SubscribeOpts::default()) .await - .map_err(Into::into) + .map_err(|err| SdkError::Grpc(err))?; + + let mut unsub = self.grpc_unsub.write().unwrap(); + let _ = unsub.insert(grpc_unsub); + + Ok(()) + } + + /// Unsubscribe the gRPC connection + fn grpc_unsubscribe(&self) { + let mut guard = self.grpc_unsub.write().unwrap(); + let unsub = guard.take(); + unsub.map(|u| u.send(())); } /// End subscriptions to live program data @@ -2115,6 +2133,7 @@ mod tests { Arc::clone(&rpc_client), CommitmentConfig::processed(), ), + grpc_unsub: Default::default(), }; DriftClient { diff --git a/crates/src/types.rs b/crates/src/types.rs index 28a81e0..9a73fa0 100644 --- a/crates/src/types.rs +++ b/crates/src/types.rs @@ -59,9 +59,12 @@ pub struct GrpcSubscribeOpts { /// list of user (sub)accounts to subscribe pub user_accounts: Vec, /// callback for slot updates - pub on_slot: Option>, + pub on_slot: Option>, /// custom callback for account updates - pub on_account: Option<(AccountFilter, Box)>, + pub on_account: Option<( + AccountFilter, + Box, + )>, /// Network level connection config pub connection_opts: GrpcConnectionOpts, } @@ -93,7 +96,7 @@ impl GrpcSubscribeOpts { /// * `on_slot` - the callback for new slot updates /// /// ! `on_slot` must not block the gRPC task - pub fn on_slot(mut self, on_slot: impl Fn(Slot) + 'static) -> Self { + pub fn on_slot(mut self, on_slot: impl Fn(Slot) + Send + Sync + 'static) -> Self { self.on_slot = Some(Box::new(on_slot)); self } @@ -106,7 +109,7 @@ impl GrpcSubscribeOpts { pub fn on_account( mut self, filter: AccountFilter, - on_account: impl Fn(&AccountUpdate) + 'static, + on_account: impl Fn(&AccountUpdate) + Send + Sync + 'static, ) -> Self { self.on_account = Some((filter, Box::new(on_account))); self diff --git a/tests/integration.rs b/tests/integration.rs index 8e16729..1b33a76 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -1,17 +1,17 @@ -use std::{ - sync::{atomic::AtomicU64, Arc}, - time::Duration, -}; +use std::time::Duration; +use anchor_lang::Discriminator; use drift_rs::{ + constants::DEFAULT_PUBKEY, event_subscriber::RpcClient, + grpc::grpc_subscriber::AccountFilter, math::constants::{BASE_PRECISION_I64, LAMPORTS_PER_SOL_I64, PRICE_PRECISION_U64}, - types::{accounts::User, Context, MarketId, NewOrder, PostOnlyParam}, + types::{accounts::User, Context, GrpcSubscribeOpts, MarketId, NewOrder, PostOnlyParam}, utils::test_envs::{devnet_endpoint, mainnet_endpoint, test_keypair}, - DriftClient, TransactionBuilder, Wallet, + DriftClient, Pubkey, TransactionBuilder, Wallet, }; use futures_util::StreamExt; -use solana_sdk::signature::Keypair; +use solana_sdk::{clock::Slot, signature::Keypair}; #[tokio::test] async fn client_sync_subscribe_all_devnet() { @@ -103,7 +103,8 @@ async fn client_sync_subscribe_mainnet() { dbg!(price); } -#[tokio::test] +// run with multithreaded RT otherwise the gRPC worker thread will block the test +#[tokio::test(flavor = "multi_thread")] async fn client_sync_subscribe_mainnet_grpc() { let _ = env_logger::try_init(); let client = DriftClient::new( @@ -114,23 +115,33 @@ async fn client_sync_subscribe_mainnet_grpc() { .await .expect("connects"); - let latest_slot = Arc::new(AtomicU64::default()); - let latest_slot_ref = Arc::clone(&latest_slot); + let (slot_update_tx, mut slot_update_rx) = tokio::sync::mpsc::channel::(1); + let (user_update_tx, mut user_update_rx) = tokio::sync::mpsc::channel::(1); + assert!(client .grpc_subscribe( "https://api.rpcpool.com".into(), std::env::var("TEST_GRPC_X_TOKEN").expect("TEST_GRPC_X_TOKEN set"), - true, - true, - Some(move |new_slot| { - println!("slot: {new_slot}"); - latest_slot_ref.store(new_slot, std::sync::atomic::Ordering::Relaxed); - }) + GrpcSubscribeOpts::default() + .usermap_on() + .on_slot(move |new_slot| { + println!("slot: {new_slot}"); + slot_update_tx.try_send(new_slot).expect("update sent"); + }) + .on_account( + AccountFilter::partial().with_discriminator(User::DISCRIMINATOR), + move |account| { + println!("account: {}", account.pubkey); + user_update_tx + .try_send(account.pubkey) + .expect("update sent"); + } + ) ) .await .is_ok()); - tokio::time::sleep(Duration::from_secs(5)).await; + tokio::time::sleep(Duration::from_secs(10)).await; // oracles available let price = client.oracle_price(MarketId::perp(4)).await.expect("ok"); @@ -140,12 +151,16 @@ async fn client_sync_subscribe_mainnet_grpc() { assert!(price > 0); dbg!(price); + // slot update received + assert!(slot_update_rx.try_recv().is_ok_and(|s| s > 0)); + + // user update received + assert!(user_update_rx.try_recv().is_ok_and(|u| u != DEFAULT_PUBKEY)); + // markets available - assert!(client.try_get_perp_market_account(1).is_ok()); + assert!(client.try_get_perp_market_account(0).is_ok()); assert!(client.try_get_spot_market_account(1).is_ok()); - - // slot updated - assert!(latest_slot.load(std::sync::atomic::Ordering::Relaxed) > 0); + } #[tokio::test] From 3f08eedf2bdcefe48a271562738d92aa7a2eaa9c Mon Sep 17 00:00:00 2001 From: jordy25519 Date: Thu, 17 Apr 2025 14:21:46 +0800 Subject: [PATCH 06/11] fixes --- crates/src/grpc/grpc_subscriber.rs | 84 +++++++++++++++--------------- crates/src/lib.rs | 4 +- tests/integration.rs | 22 ++++---- 3 files changed, 53 insertions(+), 57 deletions(-) diff --git a/crates/src/grpc/grpc_subscriber.rs b/crates/src/grpc/grpc_subscriber.rs index ea26a5d..c2a1e70 100644 --- a/crates/src/grpc/grpc_subscriber.rs +++ b/crates/src/grpc/grpc_subscriber.rs @@ -3,7 +3,10 @@ use std::{collections::HashMap, time::Duration}; use crate::{constants::PROGRAM_ID as DRIFT_PROGRAM_ID, types::UnsubHandle}; use ahash::HashSet; -use futures_util::{sink::SinkExt, stream::StreamExt}; +use futures_util::{ + sink::SinkExt, + stream::{FuturesUnordered, StreamExt}, +}; use log::{error, info, warn}; use solana_rpc_client_api::filter::Memcmp; use solana_sdk::{ @@ -23,7 +26,7 @@ use yellowstone_grpc_proto::{ SubscribeRequestFilterAccountsFilter, SubscribeRequestFilterAccountsFilterMemcmp, SubscribeRequestFilterSlots, SubscribeRequestPing, }, - tonic::transport::Certificate, + tonic::{transport::Certificate, Status}, }; type SlotsFilterMap = HashMap; @@ -211,6 +214,8 @@ pub enum GrpcError { Geyser(GeyserGrpcBuilderError), #[error("grpc request err: {0}")] Client(GeyserGrpcClientError), + #[error("grpc stream err: {0}")] + Stream(Status), } /// specialized Drift gRPC client @@ -289,56 +294,46 @@ impl DriftGrpcClient { let request = subscribe_opts.to_subscribe_request(commitment); info!(target: "grpc", "gRPC subscribing: {request:?}"); - let (subscribe_tx, subscribe_rx) = tokio::sync::oneshot::channel::<()>(); - let (unsub_tx, unsub_rx) = tokio::sync::oneshot::channel::<()>(); + let (unsub_tx, mut unsub_rx) = tokio::sync::oneshot::channel::<()>(); // gRPC receives updates very frequently, don't want tokio scheduler moving it - std::thread::spawn(move || { + std::thread::spawn(|| { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .build(); - if let Err(ref err) = rt { - log::error!("failed grpc thread init: {err:?}"); - } - let rt = rt.unwrap(); - - rt.spawn(async move { - info!("HELLO FROM CT RT"); - info!("HELLO FROM CT RT"); - info!("HELLO FROM CT RT"); - info!("HELLO FROM CT RT"); - info!("HELLO FROM CT RT"); - info!("HELLO FROM CT RT"); - }); - - let _task = rt.spawn(Self::geyser_subscribe( + .build() + .unwrap(); + let ls = tokio::task::LocalSet::new(); + let geyser_task = ls.spawn_local(Self::geyser_subscribe( grpc_client, request, self.on_account_hooks, self.on_slot, )); - - rt.spawn(async move { - info!("HELLO FROM CT RT"); - info!("HELLO FROM CT RT"); - info!("HELLO FROM CT RT"); - info!("HELLO FROM CT RT"); - info!("HELLO FROM CT RT"); - info!("HELLO FROM CT RT"); + let mut waiter = FuturesUnordered::new(); + waiter.push(geyser_task); + + // nb: will cause grpc task to drop when triggered but- + // it doesn't call any 'unsub' endpoint + let _ = ls.block_on(&rt, async move { + loop { + tokio::select! { + biased; + _ = &mut unsub_rx => break, + res = waiter.next() => { + if let Ok(Some(err)) = res.unwrap() { + log::error!(target: "grpc", "subscription task failed: {err:?}"); + } else { + log::error!(target: "grpc", "subscription task ended unexpectedly"); + } + break; + } + } + } }); - - subscribe_tx.send(()).expect("sent"); - // nb: will cause grpc task to drop but doesn't call any 'unsub' endpoint - let _ = unsub_rx.blocking_recv(); info!(target: "grpc", "gRPC connection unsubscribed"); }); - if let Err(err) = subscribe_rx.await { - error!("WTF: {err:?}"); - } - info!(target: "grpc", "gRPC subscribed ⚡️"); - Ok(unsub_tx) } @@ -350,10 +345,11 @@ impl DriftGrpcClient { request: SubscribeRequest, on_account: Hooks, on_slot: impl Fn(Slot), - ) { + ) -> Option { let max_retries = 3; let mut retry_count = 0; let mut latest_slot = 0; + let mut last_error: Option = None; loop { if retry_count >= max_retries { log::warn!(target: "grpc", "max retry attempts reached. disconnecting..."); @@ -369,12 +365,12 @@ impl DriftGrpcClient { log::warn!(target: "grpc", "failed subscription: {err:?}"); retry_count += 1; tokio::time::sleep(Duration::from_secs(2_u64.pow(retry_count + 1))).await; + let _ = last_error.insert(GrpcError::Client(err)); continue; } }; while let Some(message) = stream.next().await { - dbg!("update"); match message { Ok(msg) => { match msg.update_oneof { @@ -443,13 +439,14 @@ impl DriftGrpcClient { warn!(target: "grpc", "unhandled update: {other_update:?}"); } None => { - error!(target: "grpc", "update not found in the message"); + warn!(target: "grpc", "received empty update"); break; } } } - Err(error) => { - error!(target: "grpc", "stream error: {error:?}"); + Err(status) => { + error!(target: "grpc", "stream error: {status:?}"); + let _ = last_error.insert(GrpcError::Stream(status)); break; } } @@ -457,6 +454,7 @@ impl DriftGrpcClient { } error!(target: "grpc", "gRPC stream closed"); + last_error } } diff --git a/crates/src/lib.rs b/crates/src/lib.rs index 4b758c3..fd6291d 100644 --- a/crates/src/lib.rs +++ b/crates/src/lib.rs @@ -972,7 +972,7 @@ impl DriftClientBackend { &self, market_index: u16, ) -> Option> { - if self.perp_market_map.is_subscribed(market_index) { + if self.is_grpc_subscribed() || self.perp_market_map.is_subscribed(market_index) { self.perp_market_map.get(&market_index) } else { None @@ -983,7 +983,7 @@ impl DriftClientBackend { &self, market_index: u16, ) -> Option> { - if self.spot_market_map.is_subscribed(market_index) { + if self.is_grpc_subscribed() || self.spot_market_map.is_subscribed(market_index) { self.spot_market_map.get(&market_index) } else { None diff --git a/tests/integration.rs b/tests/integration.rs index 1b33a76..edd34db 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -103,8 +103,7 @@ async fn client_sync_subscribe_mainnet() { dbg!(price); } -// run with multithreaded RT otherwise the gRPC worker thread will block the test -#[tokio::test(flavor = "multi_thread")] +#[tokio::test(flavor = "multi_thread", worker_threads = 3)] async fn client_sync_subscribe_mainnet_grpc() { let _ = env_logger::try_init(); let client = DriftClient::new( @@ -126,22 +125,21 @@ async fn client_sync_subscribe_mainnet_grpc() { .usermap_on() .on_slot(move |new_slot| { println!("slot: {new_slot}"); - slot_update_tx.try_send(new_slot).expect("update sent"); + let _ = slot_update_tx.try_send(new_slot); }) .on_account( AccountFilter::partial().with_discriminator(User::DISCRIMINATOR), move |account| { println!("account: {}", account.pubkey); - user_update_tx - .try_send(account.pubkey) - .expect("update sent"); + let _ = user_update_tx.try_send(account.pubkey); } ) ) .await .is_ok()); - tokio::time::sleep(Duration::from_secs(10)).await; + // wait for updates + tokio::time::sleep(Duration::from_secs(6)).await; // oracles available let price = client.oracle_price(MarketId::perp(4)).await.expect("ok"); @@ -151,16 +149,16 @@ async fn client_sync_subscribe_mainnet_grpc() { assert!(price > 0); dbg!(price); + // markets available + assert!(client.try_get_perp_market_account(0).is_ok()); + assert!(client.try_get_spot_market_account(1).is_ok()); + // slot update received assert!(slot_update_rx.try_recv().is_ok_and(|s| s > 0)); // user update received assert!(user_update_rx.try_recv().is_ok_and(|u| u != DEFAULT_PUBKEY)); - - // markets available - assert!(client.try_get_perp_market_account(0).is_ok()); - assert!(client.try_get_spot_market_account(1).is_ok()); - + client.grpc_unsubscribe(); } #[tokio::test] From fee83042f42f384cc658df7368f6c7bae7a9a916 Mon Sep 17 00:00:00 2001 From: jordy25519 Date: Thu, 17 Apr 2025 16:43:36 +0800 Subject: [PATCH 07/11] update README --- README.md | 35 ++++++++++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 4879477..7b5e7be 100644 --- a/README.md +++ b/README.md @@ -29,8 +29,12 @@ _*_ crates.io requires [libdrift](https://github.com/drift-labs/drift-ffi-sys/?t ## Use +The `DriftClient` struct provides methods for reading drift program accounts and crafting transactions. +It is built on a subscription model where live account updates are transparently cached and made accessible via accessor methods. +The client may be subscribed either via Ws or gRPC. + ```rust -use drift_rs::{DriftClient, Wallet}; +use drift_rs::{AccountFilter, DriftClient, GrpcSubscribeOpts, Wallet}; use solana_sdk::signature::Keypair; async fn main() { @@ -42,11 +46,36 @@ async fn main() { .await .expect("connects"); - // Subscribe to Ws-based live market and price changes + // Subscribe via WebSocket + // + // 1) Ws-based live market and price changes let markets = [MarketId::spot(1), MarketId::perp(0)]; client.subscribe_markets(&markets).await.unwrap(); client.subscribe_oracles(&markets).await.unwrap(); -} + client.subscribe_account("SUBACCOUNT_1"); + + // OR 2) subscribe via gRPC (advanced) + // gRPC automatically subscribes to all markets and oracles + client.grpc_subscribe( + "https://grpc.example.com".into(), + "API-X-TOKEN".into(), + GrpcSubscribeOpts::default() + .user_accounts("SUBACCOUNT_1", "SUB_ACCOUNT_2") + .on_slot(move |new_slot| { + // do something on slot + }) + .on_account( + AccountFilter::partial().with_discriminator(User::DISCRIMINATOR), + move |account| { + // do something on user account updates + }) + ).await; + + // + // Fetch latest values + /// + let sol_perp_price = client.oracle_price(MarketId::perp(0)); + let subaccount_1: User = client.try_get_account("SUBACCOUNT_1")); ``` ## Setup From d9010c67d62215519e4066c7989c956c7136dd01 Mon Sep 17 00:00:00 2001 From: jordy25519 Date: Thu, 17 Apr 2025 16:55:46 +0800 Subject: [PATCH 08/11] fmt --- crates/src/drift_idl.rs | 3 ++- crates/src/ffi.rs | 5 ++++- crates/src/grpc/grpc_subscriber.rs | 7 ++++--- crates/src/grpc/mod.rs | 1 + crates/src/lib.rs | 3 +-- crates/src/math/liquidation.rs | 8 +++++--- crates/src/swift_order_subscriber.rs | 6 ++++-- 7 files changed, 21 insertions(+), 12 deletions(-) diff --git a/crates/src/drift_idl.rs b/crates/src/drift_idl.rs index 7a3d1b9..3aff1c3 100644 --- a/crates/src/drift_idl.rs +++ b/crates/src/drift_idl.rs @@ -2105,8 +2105,9 @@ pub mod instructions { } pub mod types { #![doc = r" IDL types"] - use super::*; use std::ops::Mul; + + use super::*; #[doc = ""] #[doc = " backwards compatible u128 deserializing data from rust <=1.76.0 when u/i128 was 8-byte aligned"] #[doc = " https://solana.stackexchange.com/questions/7720/using-u128-without-sacrificing-alignment-8"] diff --git a/crates/src/ffi.rs b/crates/src/ffi.rs index f6ca8dd..89091f7 100644 --- a/crates/src/ffi.rs +++ b/crates/src/ffi.rs @@ -457,7 +457,10 @@ mod tests { use super::{simulate_place_perp_order, AccountWithKey, AccountsList, MarginContextMode}; use crate::{ accounts::State, - constants::{self, ids::pyth_program}, + constants::{ + ids::pyth_program, + {self}, + }, create_account_info, drift_idl::{ accounts::{PerpMarket, SpotMarket, User}, diff --git a/crates/src/grpc/grpc_subscriber.rs b/crates/src/grpc/grpc_subscriber.rs index c2a1e70..a5d0a35 100644 --- a/crates/src/grpc/grpc_subscriber.rs +++ b/crates/src/grpc/grpc_subscriber.rs @@ -1,7 +1,5 @@ use std::{collections::HashMap, time::Duration}; -use crate::{constants::PROGRAM_ID as DRIFT_PROGRAM_ID, types::UnsubHandle}; - use ahash::HashSet; use futures_util::{ sink::SinkExt, @@ -29,6 +27,8 @@ use yellowstone_grpc_proto::{ tonic::{transport::Certificate, Status}, }; +use crate::{constants::PROGRAM_ID as DRIFT_PROGRAM_ID, types::UnsubHandle}; + type SlotsFilterMap = HashMap; type AccountFilterMap = HashMap; type HookFn = dyn Fn(&AccountUpdate) + Send + Sync + 'static; @@ -576,9 +576,10 @@ async fn grpc_connect( #[cfg(test)] mod test { - use super::*; use solana_sdk::pubkey::Pubkey; + use super::*; + fn create_test_account(data: Vec) -> SubscribeUpdateAccountInfo { SubscribeUpdateAccountInfo { data, diff --git a/crates/src/grpc/mod.rs b/crates/src/grpc/mod.rs index e662963..5fc4540 100644 --- a/crates/src/grpc/mod.rs +++ b/crates/src/grpc/mod.rs @@ -1 +1,2 @@ +//! Drift gRPC module pub mod grpc_subscriber; diff --git a/crates/src/lib.rs b/crates/src/lib.rs index fd6291d..e44593a 100644 --- a/crates/src/lib.rs +++ b/crates/src/lib.rs @@ -10,7 +10,6 @@ use std::{ use anchor_lang::{AccountDeserialize, Discriminator, InstructionData}; pub use drift_pubsub_client::PubsubClient; use futures_util::TryFutureExt; -use grpc::grpc_subscriber::SubscribeOpts; use log::debug; pub use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_rpc_client_api::response::Response; @@ -34,7 +33,7 @@ use crate::{ ProgramData, DEFAULT_PUBKEY, SYSVAR_INSTRUCTIONS_PUBKEY, }, drift_idl::traits::ToAccountMetas, - grpc::grpc_subscriber::{AccountFilter, DriftGrpcClient}, + grpc::grpc_subscriber::{AccountFilter, DriftGrpcClient, SubscribeOpts}, marketmap::MarketMap, oraclemap::{Oracle, OracleMap}, swift_order_subscriber::{SignedOrderInfo, SwiftOrderStream}, diff --git a/crates/src/math/liquidation.rs b/crates/src/math/liquidation.rs index 588cabf..51a423b 100644 --- a/crates/src/math/liquidation.rs +++ b/crates/src/math/liquidation.rs @@ -4,6 +4,7 @@ use std::ops::Neg; +use super::get_oracle_normalization_factor; use crate::{ ffi::{ self, calculate_margin_requirement_and_total_collateral_and_liability_info, AccountsList, @@ -23,8 +24,6 @@ use crate::{ DriftClient, MarginMode, MarketId, SdkError, SdkResult, SpotPosition, }; -use super::get_oracle_normalization_factor; - /// Info on a position's liquidation price and unrealized PnL #[derive(Debug)] pub struct LiquidationAndPnlInfo { @@ -375,7 +374,10 @@ mod tests { use super::*; use crate::{ - constants::{self, ids::pyth_program}, + constants::{ + ids::pyth_program, + {self}, + }, drift_idl::types::{HistoricalOracleData, MarketStatus, OracleSource, SpotPosition, AMM}, math::constants::{ AMM_RESERVE_PRECISION, BASE_PRECISION_I64, LIQUIDATION_FEE_PRECISION, PEG_PRECISION, diff --git a/crates/src/swift_order_subscriber.rs b/crates/src/swift_order_subscriber.rs index 288865b..c0a958f 100644 --- a/crates/src/swift_order_subscriber.rs +++ b/crates/src/swift_order_subscriber.rs @@ -16,8 +16,10 @@ use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature}; use tokio_stream::wrappers::ReceiverStream; use tokio_tungstenite::{connect_async, tungstenite::Message}; -pub use crate::types::SignedMsgOrderParamsDelegateMessage as SignedDelegateOrder; -pub use crate::types::SignedMsgOrderParamsMessage as SignedOrder; +pub use crate::types::{ + SignedMsgOrderParamsDelegateMessage as SignedDelegateOrder, + SignedMsgOrderParamsMessage as SignedOrder, +}; use crate::{ constants::MarketExt, types::{Context, MarketId, OrderParams, SdkError, SdkResult}, From 5f84e93a7a17e3d2700ce5b272c6eccafa33d376 Mon Sep 17 00:00:00 2001 From: jordy25519 Date: Thu, 17 Apr 2025 17:47:20 +0800 Subject: [PATCH 09/11] Fix accountmap test --- crates/src/account_map.rs | 141 ++++++++++++++++------------- crates/src/drift_idl.rs | 3 +- crates/src/grpc/grpc_subscriber.rs | 59 ++++-------- crates/src/grpc/mod.rs | 113 +++++++++++++++++++++++ crates/src/lib.rs | 16 ++-- crates/src/marketmap.rs | 2 +- crates/src/oraclemap.rs | 2 +- crates/src/types.rs | 97 +------------------- 8 files changed, 223 insertions(+), 210 deletions(-) diff --git a/crates/src/account_map.rs b/crates/src/account_map.rs index 51b6160..b1eb51e 100644 --- a/crates/src/account_map.rs +++ b/crates/src/account_map.rs @@ -12,9 +12,8 @@ use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_sdk::{clock::Slot, commitment_config::CommitmentConfig, pubkey::Pubkey}; use crate::{ - grpc::grpc_subscriber::AccountUpdate, polled_account_subscriber::PolledAccountSubscriber, - types::DataAndSlot, websocket_account_subscriber::WebsocketAccountSubscriber, SdkResult, - UnsubHandle, + grpc::AccountUpdate, polled_account_subscriber::PolledAccountSubscriber, types::DataAndSlot, + websocket_account_subscriber::WebsocketAccountSubscriber, SdkResult, UnsubHandle, }; const LOG_TARGET: &str = "accountmap"; @@ -32,7 +31,8 @@ pub struct AccountMap { pubsub: Arc, rpc: Arc, commitment: CommitmentConfig, - inner: Arc, ahash::RandomState>>, + inner: Arc>, + subscriptions: Arc, ahash::RandomState>>, } impl AccountMap { @@ -46,6 +46,7 @@ impl AccountMap { rpc, commitment, inner: Arc::default(), + subscriptions: Arc::default(), } } /// Subscribe account with Ws @@ -59,9 +60,8 @@ impl AccountMap { debug!(target: LOG_TARGET, "subscribing: {account:?}"); let user = AccountSub::new(Arc::clone(&self.pubsub), self.commitment, *account); - let user = user.subscribe(Arc::clone(&self.inner)).await?; - - self.inner.insert(*account, user); + let sub = user.subscribe(Arc::clone(&self.inner)).await?; + self.subscriptions.insert(*account, sub); Ok(()) } @@ -81,40 +81,50 @@ impl AccountMap { debug!(target: LOG_TARGET, "subscribing: {account:?} @ {interval:?}"); let user = AccountSub::polled(Arc::clone(&self.rpc), *account, interval); - let user = user.subscribe(Arc::clone(&self.inner)).await?; - - self.inner.insert(*account, user); + let sub = user.subscribe(Arc::clone(&self.inner)).await?; + self.subscriptions.insert(*account, sub); Ok(()) } + /// On account update callback for gRPC hook pub(crate) fn on_account_fn(&self) -> impl Fn(&AccountUpdate) { let accounts = Arc::clone(&self.inner); + let subscriptions = Arc::clone(&self.subscriptions); move |update| { accounts .entry(update.pubkey) .and_modify(|x| { - x.state.data_and_slot.slot = update.slot; - x.state.data_and_slot.raw.resize(update.data.len(), 0); - x.state.data_and_slot.raw.clone_from_slice(update.data); + x.slot = update.slot; + x.raw.resize(update.data.len(), 0); + x.raw.clone_from_slice(update.data); + if update.lamports == 0 { + accounts.remove(&update.pubkey); + } }) - .or_insert(AccountSub:: { - pubkey: update.pubkey, - subscription: SubscriptionImpl::Grpc, - state: Subscribed { - data_and_slot: AccountSlot { - slot: update.slot, - raw: update.data.to_vec(), + .or_insert({ + subscriptions.insert( + update.pubkey, + AccountSub { + pubkey: update.pubkey, + subscription: SubscriptionImpl::Grpc, + state: Subscribed { + unsub: Mutex::default(), + }, }, - unsub: Mutex::default(), - }, + ); + AccountSlot { + slot: update.slot, + raw: update.data.to_vec(), + } }); } } /// Unsubscribe user account pub fn unsubscribe_account(&self, account: &Pubkey) { - if let Some((acc, unsub)) = self.inner.remove(account) { + if let Some((acc, sub)) = self.subscriptions.remove(account) { debug!(target: LOG_TARGET, "unsubscribing: {acc:?}"); - let _ = unsub.unsubscribe(); + self.inner.remove(account); + let _ = sub.unsubscribe(); } } /// Return data of the given `account` as T, if it exists @@ -126,15 +136,14 @@ impl AccountMap { &self, account: &Pubkey, ) -> Option> { - match self.inner.get(account) { - Some(entry) => entry.get_account_data_and_slot(), - None => None, - } + self.inner.get(account).map(|x| DataAndSlot { + slot: x.slot, + data: T::try_deserialize_unchecked(&mut x.raw.as_slice()).expect("deserializes"), + }) } } struct Subscribed { - data_and_slot: AccountSlot, unsub: Mutex>, } struct Unsubscribed; @@ -178,40 +187,54 @@ impl AccountSub { /// Start the subscriber task pub async fn subscribe( self, - accounts: Arc, ahash::RandomState>>, + accounts: Arc>, ) -> SdkResult> { let unsub = match self.subscription { SubscriptionImpl::Ws(ref ws) => { let unsub = ws .subscribe(Self::SUBSCRIPTION_ID, true, move |update| { - accounts.entry(update.pubkey).and_modify(|x| { - x.state.data_and_slot.slot = update.slot; - x.state.data_and_slot.raw.clone_from(&update.data); - }); + accounts + .entry(update.pubkey) + .and_modify(|x| { + x.slot = update.slot; + x.raw.clone_from(&update.data); + if update.lamports == 0 { + accounts.remove(&update.pubkey); + } + }) + .or_insert(AccountSlot { + raw: update.data.clone(), + slot: update.slot, + }); }) .await?; Some(unsub) } SubscriptionImpl::Polled(ref poll) => { let unsub = poll.subscribe(move |update| { - accounts.entry(update.pubkey).and_modify(|x| { - x.state.data_and_slot.slot = update.slot; - x.state.data_and_slot.raw.clone_from(&update.data); - }); + accounts + .entry(update.pubkey) + .and_modify(|x| { + x.slot = update.slot; + x.raw.clone_from(&update.data); + if update.lamports == 0 { + accounts.remove(&update.pubkey); + } + }) + .or_insert(AccountSlot { + raw: update.data.clone(), + slot: update.slot, + }); }); Some(unsub) } SubscriptionImpl::Grpc => None, }; - Ok(AccountSub:: { + Ok(AccountSub { pubkey: self.pubkey, subscription: self.subscription, state: Subscribed { - data_and_slot: AccountSlot { - raw: vec![], - slot: 0, - }, unsub: Mutex::new(unsub), }, }) @@ -219,21 +242,6 @@ impl AccountSub { } impl AccountSub { - /// Return the latest value of the account data along with last updated slot - /// # Panics - /// Panics if account data cannot be deserialized as `T` - pub fn get_account_data_and_slot(&self) -> Option> { - if self.state.data_and_slot.raw.is_empty() { - return None; - } - - Some(DataAndSlot { - slot: self.state.data_and_slot.slot, - data: T::try_deserialize_unchecked(&mut self.state.data_and_slot.raw.as_slice()) - .expect("deserializes"), - }) - } - /// Stop the user subscriber task, if it exists pub fn unsubscribe(self) -> AccountSub { let mut guard = self.state.unsub.lock().expect("acquire"); @@ -266,7 +274,8 @@ mod tests { use super::*; use crate::{ accounts::User, - constants::DEFAULT_PUBKEY, + constants::{state_account, DEFAULT_PUBKEY}, + types::accounts::State, utils::{get_ws_url, test_envs::mainnet_endpoint}, Wallet, }; @@ -290,23 +299,27 @@ mod tests { 0, ); - let (res1, res2) = tokio::join!( + let (res1, res2, res3) = tokio::join!( account_map.subscribe_account(&user_1), account_map.subscribe_account(&user_2), + account_map.subscribe_account_polled(state_account(), Some(Duration::from_secs(2))), ); - assert!(res1.and(res2).is_ok()); + assert!(res1.and(res2).and(res3).is_ok()); let handle = tokio::spawn(async move { - tokio::time::sleep(Duration::from_secs(5)).await; + tokio::time::sleep(Duration::from_secs(8)).await; let account_data = account_map.account_data::(&user_1); assert!(account_data.is_some_and(|x| x.authority != DEFAULT_PUBKEY)); account_map.unsubscribe_account(&user_1); + let account_data = account_map.account_data::(&user_1); + assert!(account_data.is_none()); + let account_data = account_map.account_data::(&user_2); assert!(account_data.is_some_and(|x| x.authority != DEFAULT_PUBKEY)); - let account_data = account_map.account_data::(&user_1); - assert!(account_data.is_none()); + let state_account = account_map.account_data::(state_account()); + assert!(state_account.is_some()); }); assert!(handle.await.is_ok()); diff --git a/crates/src/drift_idl.rs b/crates/src/drift_idl.rs index 3aff1c3..7a3d1b9 100644 --- a/crates/src/drift_idl.rs +++ b/crates/src/drift_idl.rs @@ -2105,9 +2105,8 @@ pub mod instructions { } pub mod types { #![doc = r" IDL types"] - use std::ops::Mul; - use super::*; + use std::ops::Mul; #[doc = ""] #[doc = " backwards compatible u128 deserializing data from rust <=1.76.0 when u/i128 was 8-byte aligned"] #[doc = " https://solana.stackexchange.com/questions/7720/using-u128-without-sacrificing-alignment-8"] diff --git a/crates/src/grpc/grpc_subscriber.rs b/crates/src/grpc/grpc_subscriber.rs index a5d0a35..9210b90 100644 --- a/crates/src/grpc/grpc_subscriber.rs +++ b/crates/src/grpc/grpc_subscriber.rs @@ -7,11 +7,7 @@ use futures_util::{ }; use log::{error, info, warn}; use solana_rpc_client_api::filter::Memcmp; -use solana_sdk::{ - clock::{Epoch, Slot}, - commitment_config::CommitmentLevel, - pubkey::Pubkey, -}; +use solana_sdk::{clock::Slot, commitment_config::CommitmentLevel, pubkey::Pubkey}; use yellowstone_grpc_client::{ ClientTlsConfig, GeyserGrpcBuilderError, GeyserGrpcClient, GeyserGrpcClientError, Interceptor, }; @@ -29,29 +25,11 @@ use yellowstone_grpc_proto::{ use crate::{constants::PROGRAM_ID as DRIFT_PROGRAM_ID, types::UnsubHandle}; +use super::{AccountUpdate, OnAccountFn}; + type SlotsFilterMap = HashMap; type AccountFilterMap = HashMap; -type HookFn = dyn Fn(&AccountUpdate) + Send + Sync + 'static; -type Hooks = Vec<(AccountFilter, Box)>; - -/// Account update from gRPC -#[derive(PartialEq, Eq, Clone)] -pub struct AccountUpdate<'a> { - /// the account's pubkey - pub pubkey: Pubkey, - /// lamports in the account - pub lamports: u64, - /// data held in the account - pub data: &'a [u8], - /// the program that owns the account. If executable, the program that loads the account. - pub owner: Pubkey, - /// the account's data contains a loaded program (and is now read-only) - pub executable: bool, - /// the epoch at which the account will next owe rent - pub rent_epoch: Epoch, - /// Slot the update was retrieved - pub slot: Slot, -} +type Hooks = Vec<(AccountFilter, Box)>; /// Provides filter criteria for accounts over gRPC /// @@ -190,9 +168,9 @@ impl Default for GrpcConnectionOpts { } } -/// Options for gRPC subscription +/// Options for the geyser subscription request #[derive(Debug, Default, Clone)] -pub struct SubscribeOpts { +pub struct GeyserSubscribeOpts { /// Filter by presence of field txn_signature accounts_nonempty_txn_signature: Option, /// Filter by Account Pubkey @@ -276,7 +254,7 @@ impl DriftGrpcClient { pub async fn subscribe( self, commitment: CommitmentLevel, - subscribe_opts: SubscribeOpts, + subscribe_opts: GeyserSubscribeOpts, ) -> Result { let mut grpc_client = grpc_connect( self.endpoint.as_str(), @@ -314,18 +292,15 @@ impl DriftGrpcClient { // nb: will cause grpc task to drop when triggered but- // it doesn't call any 'unsub' endpoint - let _ = ls.block_on(&rt, async move { - loop { - tokio::select! { - biased; - _ = &mut unsub_rx => break, - res = waiter.next() => { - if let Ok(Some(err)) = res.unwrap() { - log::error!(target: "grpc", "subscription task failed: {err:?}"); - } else { - log::error!(target: "grpc", "subscription task ended unexpectedly"); - } - break; + ls.block_on(&rt, async move { + tokio::select! { + biased; + _ = &mut unsub_rx => (), + res = waiter.next() => { + if let Ok(Some(err)) = res.unwrap() { + log::error!(target: "grpc", "subscription task failed: {err:?}"); + } else { + log::error!(target: "grpc", "subscription task ended unexpectedly"); } } } @@ -458,7 +433,7 @@ impl DriftGrpcClient { } } -impl SubscribeOpts { +impl GeyserSubscribeOpts { fn to_subscribe_request(&self, commitment: CommitmentLevel) -> SubscribeRequest { let mut accounts = AccountFilterMap::default(); let mut filters = vec![]; diff --git a/crates/src/grpc/mod.rs b/crates/src/grpc/mod.rs index 5fc4540..1da5853 100644 --- a/crates/src/grpc/mod.rs +++ b/crates/src/grpc/mod.rs @@ -1,2 +1,115 @@ //! Drift gRPC module + +use solana_sdk::{ + clock::{Epoch, Slot}, + pubkey::Pubkey, +}; pub mod grpc_subscriber; +use grpc_subscriber::{AccountFilter, GrpcConnectionOpts}; + +/// grpc account update callback +pub type OnAccountFn = dyn Fn(&AccountUpdate) + Send + Sync + 'static; +/// grpc slot update callback +pub type OnSlotFn = dyn Fn(Slot) + Send + Sync + 'static; + +/// Account update from gRPC +#[derive(PartialEq, Eq, Clone)] +pub struct AccountUpdate<'a> { + /// the account's pubkey + pub pubkey: Pubkey, + /// lamports in the account + pub lamports: u64, + /// data held in the account + pub data: &'a [u8], + /// the program that owns the account. If executable, the program that loads the account. + pub owner: Pubkey, + /// the account's data contains a loaded program (and is now read-only) + pub executable: bool, + /// the epoch at which the account will next owe rent + pub rent_epoch: Epoch, + /// Slot the update was retrieved + pub slot: Slot, +} + +/// Config options for drift gRPC subscription +/// +/// ```example(no_run) +/// // subscribe to all user and users stats accounts +/// let opts = GrpcSubscribeOpts::default() +/// .usermap_on() // subscribe to ALL user accounts +/// .statsmap_on(); // subscribe to ALL user stats accounts +/// +/// // cache specific user accounts only and set a new slot callback +/// let first_3_subaccounts = (0_u16..3).into_iter().map(|i| wallet.sub_account(i)).collect(); +/// let opts = GrpcSubscribeOpts::default() +/// .user_accounts(first_3_subaccounts); +/// .on_slot(move |new_slot| {}) // slot callback +/// ``` +/// +#[derive(Default)] +pub struct GrpcSubscribeOpts { + /// toggle usermap + pub usermap: bool, + /// toggle user stats map + pub user_stats_map: bool, + /// list of user (sub)accounts to subscribe + pub user_accounts: Vec, + /// callback for slot updates + pub on_slot: Option>, + /// custom callback for account updates + pub on_account: Option<(AccountFilter, Box)>, + /// Network level connection config + pub connection_opts: GrpcConnectionOpts, +} + +impl GrpcSubscribeOpts { + /// Cache ALL drift `User` account updates + /// + /// useful for e.g. building the DLOB, fast TX building for makers + /// + /// note: memory requirements ~2GiB + pub fn usermap_on(mut self) -> Self { + self.usermap = true; + self + } + /// Cache ALL drift `UserStats` account updates + /// + /// useful for e.g. fast TX building for makers + pub fn statsmap_on(mut self) -> Self { + self.user_stats_map = true; + self + } + /// Cache account updates for given `users` only + pub fn user_accounts(mut self, users: Vec) -> Self { + self.user_accounts = users; + self + } + /// Set a callback to invoke on new slot updates + /// + /// * `on_slot` - the callback for new slot updates + /// + /// ! `on_slot` must not block the gRPC task + pub fn on_slot(mut self, on_slot: impl Fn(Slot) + Send + Sync + 'static) -> Self { + self.on_slot = Some(Box::new(on_slot)); + self + } + /// Register a custom callback for account updates + /// + /// * `filter` - accounts matching filter will invoke the callback + /// * `on_account` - fn to invoke on matching account update + /// + /// ! `on_account` must not block the gRPC task + pub fn on_account( + mut self, + filter: AccountFilter, + on_account: impl Fn(&AccountUpdate) + Send + Sync + 'static, + ) -> Self { + self.on_account = Some((filter, Box::new(on_account))); + self + } + /// Set network level connection opts + pub fn connection_opts(mut self, opts: GrpcConnectionOpts) -> Self { + self.connection_opts = opts; + self + } +} diff --git a/crates/src/lib.rs b/crates/src/lib.rs index e44593a..bd22dd0 100644 --- a/crates/src/lib.rs +++ b/crates/src/lib.rs @@ -33,7 +33,7 @@ use crate::{ ProgramData, DEFAULT_PUBKEY, SYSVAR_INSTRUCTIONS_PUBKEY, }, drift_idl::traits::ToAccountMetas, - grpc::grpc_subscriber::{AccountFilter, DriftGrpcClient, SubscribeOpts}, + grpc::grpc_subscriber::{AccountFilter, DriftGrpcClient, GeyserSubscribeOpts}, marketmap::MarketMap, oraclemap::{Oracle, OracleMap}, swift_order_subscriber::{SignedOrderInfo, SwiftOrderStream}, @@ -43,7 +43,7 @@ use crate::{ }, utils::{get_http_url, get_ws_url}, }; -pub use crate::{types::Context, wallet::Wallet}; +pub use crate::{grpc::GrpcSubscribeOpts, types::Context, wallet::Wallet}; // utils pub mod async_utils; @@ -932,18 +932,18 @@ impl DriftClientBackend { } // set custom callbacks - opts.on_account.map(|(filter, on_account)| { + if let Some((filter, on_account)) = opts.on_account { grpc.on_account(filter, on_account); - }); - opts.on_slot.map(|f| { + } + if let Some(f) = opts.on_slot { grpc.on_slot(f); - }); + } // start subscription let grpc_unsub = grpc - .subscribe(CommitmentLevel::Confirmed, SubscribeOpts::default()) + .subscribe(CommitmentLevel::Confirmed, GeyserSubscribeOpts::default()) .await - .map_err(|err| SdkError::Grpc(err))?; + .map_err(SdkError::Grpc)?; let mut unsub = self.grpc_unsub.write().unwrap(); let _ = unsub.insert(grpc_unsub); diff --git a/crates/src/marketmap.rs b/crates/src/marketmap.rs index bb9dd64..ef93e81 100644 --- a/crates/src/marketmap.rs +++ b/crates/src/marketmap.rs @@ -24,7 +24,7 @@ use crate::{ accounts::State, constants::{self, derive_perp_market_account, derive_spot_market_account, state_account}, drift_idl::types::OracleSource, - grpc::grpc_subscriber::AccountUpdate, + grpc::AccountUpdate, memcmp::get_market_filter, types::MapOf, websocket_account_subscriber::WebsocketAccountSubscriber, diff --git a/crates/src/oraclemap.rs b/crates/src/oraclemap.rs index 1e9d4fa..91acacf 100644 --- a/crates/src/oraclemap.rs +++ b/crates/src/oraclemap.rs @@ -16,7 +16,7 @@ use solana_sdk::{ use crate::{ drift_idl::types::OracleSource, ffi::{get_oracle_price, OraclePriceData}, - grpc::grpc_subscriber::AccountUpdate, + grpc::AccountUpdate, types::MapOf, websocket_account_subscriber::{AccountUpdate as WsAccountUpdate, WebsocketAccountSubscriber}, MarketId, SdkError, SdkResult, UnsubHandle, diff --git a/crates/src/types.rs b/crates/src/types.rs index 9a73fa0..46c0e3e 100644 --- a/crates/src/types.rs +++ b/crates/src/types.rs @@ -6,16 +6,15 @@ use std::{ use dashmap::DashMap; pub use solana_rpc_client_api::config::RpcSendTransactionConfig; +pub use solana_sdk::{ + commitment_config::CommitmentConfig, message::VersionedMessage, + transaction::VersionedTransaction, +}; use solana_sdk::{ - clock::Slot, instruction::{AccountMeta, InstructionError}, pubkey::Pubkey, transaction::TransactionError, }; -pub use solana_sdk::{ - commitment_config::CommitmentConfig, message::VersionedMessage, - transaction::VersionedTransaction, -}; use thiserror::Error; use tokio::sync::oneshot; use tokio_tungstenite::tungstenite; @@ -31,96 +30,10 @@ pub use crate::drift_idl::{ use crate::{ constants::{ids, LUTS_DEVNET, LUTS_MAINNET}, drift_idl::errors::ErrorCode, - grpc::grpc_subscriber::{AccountFilter, AccountUpdate, GrpcConnectionOpts, GrpcError}, + grpc::grpc_subscriber::GrpcError, Wallet, }; -/// Config options for drift gRPC subscription -/// -/// ```example(no_run) -/// // subscribe to all user and users stats accounts -/// let opts = GrpcSubscribeOpts::default() -/// .usermap_on() // subscribe to ALL user accounts -/// .statsmap_on(); // subscribe to ALL user stats accounts -/// -/// // cache specific user accounts only and set a new slot callback -/// let first_3_subaccounts = (0_u16..3).into_iter().map(|i| wallet.sub_account(i)).collect(); -/// let opts = GrpcSubscribeOpts::default() -/// .user_accounts(first_3_subaccounts); -/// .on_slot(move |new_slot| {}) // slot callback -/// ``` -/// -#[derive(Default)] -pub struct GrpcSubscribeOpts { - /// toggle usermap - pub usermap: bool, - /// toggle user stats map - pub user_stats_map: bool, - /// list of user (sub)accounts to subscribe - pub user_accounts: Vec, - /// callback for slot updates - pub on_slot: Option>, - /// custom callback for account updates - pub on_account: Option<( - AccountFilter, - Box, - )>, - /// Network level connection config - pub connection_opts: GrpcConnectionOpts, -} - -impl GrpcSubscribeOpts { - /// Cache ALL drift `User` account updates - /// - /// useful for e.g. building the DLOB, fast TX building for makers - /// - /// note: memory requirements ~2GiB - pub fn usermap_on(mut self) -> Self { - self.usermap = true; - self - } - /// Cache ALL drift `UserStats` account updates - /// - /// useful for e.g. fast TX building for makers - pub fn statsmap_on(mut self) -> Self { - self.user_stats_map = true; - self - } - /// Cache account updates for given `users` only - pub fn user_accounts(mut self, users: Vec) -> Self { - self.user_accounts = users; - self - } - /// Set a callback to invoke on new slot updates - /// - /// * `on_slot` - the callback for new slot updates - /// - /// ! `on_slot` must not block the gRPC task - pub fn on_slot(mut self, on_slot: impl Fn(Slot) + Send + Sync + 'static) -> Self { - self.on_slot = Some(Box::new(on_slot)); - self - } - /// Register a custom callback for account updates - /// - /// * `filter` - accounts matching filter will invoke the callback - /// * `on_account` - fn to invoke on matching account update - /// - /// ! `on_account` must not block the gRPC task - pub fn on_account( - mut self, - filter: AccountFilter, - on_account: impl Fn(&AccountUpdate) + Send + Sync + 'static, - ) -> Self { - self.on_account = Some((filter, Box::new(on_account))); - self - } - /// Set network level connection opts - pub fn connection_opts(mut self, opts: GrpcConnectionOpts) -> Self { - self.connection_opts = opts; - self - } -} - /// Map from K => V pub type MapOf = DashMap; From d6447826bc5e329e271b42d23a57cf96fb560a6c Mon Sep 17 00:00:00 2001 From: jordy25519 Date: Thu, 17 Apr 2025 17:58:54 +0800 Subject: [PATCH 10/11] fix --- README.md | 2 +- tests/integration.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 7b5e7be..d406fed 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ It is built on a subscription model where live account updates are transparently The client may be subscribed either via Ws or gRPC. ```rust -use drift_rs::{AccountFilter, DriftClient, GrpcSubscribeOpts, Wallet}; +use drift_rs::{AccountFilter, DriftClient, Wallet, grpc::GrpcSubscribeOpts}; use solana_sdk::signature::Keypair; async fn main() { diff --git a/tests/integration.rs b/tests/integration.rs index edd34db..ef4588f 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -6,9 +6,9 @@ use drift_rs::{ event_subscriber::RpcClient, grpc::grpc_subscriber::AccountFilter, math::constants::{BASE_PRECISION_I64, LAMPORTS_PER_SOL_I64, PRICE_PRECISION_U64}, - types::{accounts::User, Context, GrpcSubscribeOpts, MarketId, NewOrder, PostOnlyParam}, + types::{accounts::User, Context, MarketId, NewOrder, PostOnlyParam}, utils::test_envs::{devnet_endpoint, mainnet_endpoint, test_keypair}, - DriftClient, Pubkey, TransactionBuilder, Wallet, + DriftClient, GrpcSubscribeOpts, Pubkey, TransactionBuilder, Wallet, }; use futures_util::StreamExt; use solana_sdk::{clock::Slot, signature::Keypair}; From b3e96142995fdfefaaafc93e6bb5b737a72c708b Mon Sep 17 00:00:00 2001 From: jordy25519 Date: Thu, 17 Apr 2025 18:10:46 +0800 Subject: [PATCH 11/11] ci: set env --- .github/workflows/build.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 4f041cc..3cb0f0b 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -66,3 +66,4 @@ jobs: TEST_DEVNET_RPC_ENDPOINT: ${{ secrets.DEVNET_RPC_ENDPOINT }} TEST_MAINNET_RPC_ENDPOINT: ${{ secrets.MAINNET_RPC_ENDPOINT }} TEST_PRIVATE_KEY: ${{ secrets.TEST_PRIVATE_KEY }} + TEST_GRPC_X_TOKEN: ${{ secrets.TEST_GRPC_X_TOKEN }}