diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index e4084a23050c8..bedf945dab32a 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -123,6 +123,8 @@ jobs: with: ref: ${{ github.event.inputs.sui_repo_ref || github.ref }} - uses: taiki-e/install-action@nextest + - name: Add postgres to PATH + run: echo "/usr/lib/postgresql/14/bin" >> $GITHUB_PATH - name: Set Swap Space uses: pierotofy/set-swap-space@master with: @@ -406,40 +408,16 @@ jobs: if: needs.diff.outputs.isRust == 'true' timeout-minutes: 45 runs-on: [ ubuntu-ghcloud ] - services: - postgres: - image: postgres - env: - POSTGRES_PASSWORD: postgrespw - options: >- - --health-cmd pg_isready - --health-interval 10s - --health-timeout 5s - --health-retries 5 - --name postgres_container - ports: - - 5432:5432 steps: - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # Pin v4.1.1 with: ref: ${{ github.event.inputs.sui_repo_ref || github.ref }} - uses: taiki-e/install-action@nextest - - name: Setup db - run: | - PGPASSWORD=$POSTGRES_PASSWORD psql -h localhost -U $POSTGRES_USER -c 'CREATE DATABASE sui_indexer;' -c 'ALTER SYSTEM SET max_connections = 500;' - env: - POSTGRES_USER: postgres - POSTGRES_PASSWORD: postgrespw - - run: docker restart --time 0 postgres_container - - run: sleep 5 + - name: Add postgres to PATH + run: echo "/usr/lib/postgresql/14/bin" >> $GITHUB_PATH - name: tests-requiring-postgres run: | - cargo nextest run --test-threads 1 --package sui-graphql-rpc --test e2e_tests --test examples_validation_tests --test dot_move_e2e --features pg_integration - cargo nextest run --test-threads 1 --package sui-graphql-rpc --lib --features pg_integration -- test_query_cost - cargo nextest run --test-threads 4 --package sui-graphql-e2e-tests --features pg_integration - cargo nextest run --test-threads 1 --package sui-cluster-test --test local_cluster_test --features pg_integration - cargo nextest run --test-threads 1 --package sui-indexer --test ingestion_tests --features pg_integration - env: - POSTGRES_HOST: localhost - POSTGRES_PORT: 5432 + # The tests in these packages have been converted to use a temporary, ephemeral postgres database and so can be run in parallel + cargo nextest run --profile ci --package sui-indexer --package sui-graphql-e2e-tests --package sui-cluster-test --package sui-graphql-rpc --features pg_integration + diff --git a/Cargo.lock b/Cargo.lock index e75d30481f1cc..f5f0e60ca7919 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3676,18 +3676,6 @@ dependencies = [ "tokio-postgres", ] -[[package]] -name = "diesel-derive-enum" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81c5131a2895ef64741dad1d483f358c2a229a3a2d1b256778cdc5e146db64d4" -dependencies = [ - "heck 0.4.1", - "proc-macro2 1.0.78", - "quote 1.0.35", - "syn 2.0.48", -] - [[package]] name = "diesel_derives" version = "2.2.3" @@ -11839,31 +11827,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serial_test" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e56dd856803e253c8f298af3f4d7eb0ae5e23a737252cd90bb4f3b435033b2d" -dependencies = [ - "dashmap", - "futures", - "lazy_static", - "log", - "parking_lot 0.12.1", - "serial_test_derive", -] - -[[package]] -name = "serial_test_derive" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91d129178576168c589c9ec973feedf7d3126c01ac2bf08795109aa35b69fb8f" -dependencies = [ - "proc-macro2 1.0.78", - "quote 
1.0.35", - "syn 2.0.48", -] - [[package]] name = "sha-1" version = "0.9.8" @@ -13564,7 +13527,6 @@ dependencies = [ "serde_json", "serde_with 3.9.0", "serde_yaml 0.8.26", - "serial_test", "shared-crypto", "similar", "simulacrum", @@ -13625,13 +13587,14 @@ dependencies = [ "async-trait", "axum 0.7.5", "backoff", + "bb8", "bcs", "cached", "chrono", "clap", "criterion", "diesel", - "diesel-derive-enum", + "diesel-async", "diesel_migrations", "fastcrypto", "futures", diff --git a/Cargo.toml b/Cargo.toml index 284e8885efb48..3ef2a911284af 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -319,7 +319,6 @@ derive-syn-parse = "0.1.5" derive_builder = "0.12.0" derive_more = "0.99.17" diesel = "2.2" -diesel-derive-enum = "2.1" diesel_migrations = "2.2" diesel-async = "0.5" dirs = "4.0.0" @@ -439,7 +438,6 @@ rustyline = "9.1.2" rustyline-derive = "0.7.0" schemars = { version = "0.8.21", features = ["either"] } scopeguard = "1.1" -serial_test = "2.0.0" serde = { version = "1.0.144", features = ["derive", "rc"] } serde-name = "0.2.1" serde-reflection = "0.3.6" diff --git a/crates/sui-cluster-test/src/cluster.rs b/crates/sui-cluster-test/src/cluster.rs index 0c30fc06db556..03b0dfb58d48e 100644 --- a/crates/sui-cluster-test/src/cluster.rs +++ b/crates/sui-cluster-test/src/cluster.rs @@ -5,10 +5,12 @@ use super::config::{ClusterTestOpt, Env}; use async_trait::async_trait; use std::net::SocketAddr; use std::path::Path; +use sui_config::local_ip_utils::get_available_port; use sui_config::Config; use sui_config::{PersistedConfig, SUI_KEYSTORE_FILENAME, SUI_NETWORK_CONFIG}; use sui_graphql_rpc::config::{ConnectionConfig, ServiceConfig}; use sui_graphql_rpc::test_infra::cluster::start_graphql_server_with_fn_rpc; +use sui_indexer::tempdb::TempDb; use sui_indexer::test_utils::{start_test_indexer, ReaderWriterConfig}; use sui_keys::keystore::{AccountKeystore, FileBasedKeystore, Keystore}; use sui_sdk::sui_client_config::{SuiClientConfig, SuiEnv}; @@ -155,7 +157,12 @@ pub struct LocalNewCluster { faucet_key: AccountKeyPair, config_directory: tempfile::TempDir, #[allow(unused)] + data_ingestion_path: tempfile::TempDir, + #[allow(unused)] cancellation_tokens: Vec, + #[allow(unused)] + database: Option, + graphql_url: Option, } impl LocalNewCluster { @@ -163,27 +170,20 @@ impl LocalNewCluster { pub fn swarm(&self) -> &Swarm { &self.test_cluster.swarm } + + pub fn graphql_url(&self) -> &Option { + &self.graphql_url + } } #[async_trait] impl Cluster for LocalNewCluster { async fn start(options: &ClusterTestOpt) -> Result { - let data_ingestion_path = tempdir()?.into_path(); - // TODO: options should contain port instead of address - let fullnode_port = options.fullnode_address.as_ref().map(|addr| { - addr.parse::() - .expect("Unable to parse fullnode address") - .port() - }); - - let indexer_address = options.indexer_address.as_ref().map(|addr| { - addr.parse::() - .expect("Unable to parse indexer address") - }); + let data_ingestion_path = tempdir()?; let mut cluster_builder = TestClusterBuilder::new() .enable_fullnode_events() - .with_data_ingestion_dir(data_ingestion_path.clone()); + .with_data_ingestion_dir(data_ingestion_path.path().to_path_buf()); // Check if we already have a config directory that is passed if let Some(config_dir) = options.config_dir.clone() { @@ -211,10 +211,6 @@ impl Cluster for LocalNewCluster { } } - if let Some(rpc_port) = fullnode_port { - cluster_builder = cluster_builder.with_fullnode_rpc_port(rpc_port); - } - let mut test_cluster = cluster_builder.build().await; // Use the wealthy account 
for faucet @@ -226,36 +222,39 @@ impl Cluster for LocalNewCluster { let fullnode_url = test_cluster.fullnode_handle.rpc_url.clone(); let mut cancellation_tokens = vec![]; - if let (Some(pg_address), Some(indexer_address)) = - (options.pg_address.clone(), indexer_address) - { - // Start in writer mode + let (database, indexer_url, graphql_url) = if options.with_indexer_and_graphql { + let database = TempDb::new()?; + let pg_address = database.database().url().as_str().to_owned(); + let indexer_jsonrpc_address = format!("127.0.0.1:{}", get_available_port("127.0.0.1")); + let graphql_address = format!("127.0.0.1:{}", get_available_port("127.0.0.1")); + let graphql_url = format!("http://{graphql_address}"); + + // Start indexer writer let (_, _, writer_token) = start_test_indexer( Some(pg_address.clone()), fullnode_url.clone(), ReaderWriterConfig::writer_mode(None, None), - data_ingestion_path.clone(), + data_ingestion_path.path().to_path_buf(), ) .await; cancellation_tokens.push(writer_token.drop_guard()); - // Start in reader mode + // Start indexer jsonrpc service let (_, _, reader_token) = start_test_indexer( - Some(pg_address), + Some(pg_address.clone()), fullnode_url.clone(), - ReaderWriterConfig::reader_mode(indexer_address.to_string()), - data_ingestion_path, + ReaderWriterConfig::reader_mode(indexer_jsonrpc_address.clone()), + data_ingestion_path.path().to_path_buf(), ) .await; cancellation_tokens.push(reader_token.drop_guard()); - } - if let Some(graphql_address) = &options.graphql_address { + // Start the graphql service let graphql_address = graphql_address.parse::()?; let graphql_connection_config = ConnectionConfig::new( Some(graphql_address.port()), Some(graphql_address.ip().to_string()), - options.pg_address.clone(), + Some(pg_address), None, None, None, @@ -268,7 +267,15 @@ impl Cluster for LocalNewCluster { ServiceConfig::test_defaults(), ) .await; - } + + ( + Some(database), + Some(indexer_jsonrpc_address), + Some(graphql_url), + ) + } else { + (None, None, None) + }; // Let nodes connect to one another tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; @@ -279,8 +286,11 @@ impl Cluster for LocalNewCluster { fullnode_url, faucet_key, config_directory: tempfile::tempdir()?, - indexer_url: options.indexer_address.clone(), + data_ingestion_path, + indexer_url, cancellation_tokens, + database, + graphql_url, }) } diff --git a/crates/sui-cluster-test/src/config.rs b/crates/sui-cluster-test/src/config.rs index a40c72ff2af0c..9df8facf131d3 100644 --- a/crates/sui-cluster-test/src/config.rs +++ b/crates/sui-cluster-test/src/config.rs @@ -40,6 +40,11 @@ pub struct ClusterTestOpt { /// URL for the indexer RPC server #[clap(long)] pub graphql_address: Option, + /// Indicate that an indexer and graphql service should be started + /// + /// Only used with a local cluster + #[clap(long)] + pub with_indexer_and_graphql: bool, } fn obfuscated_pg_address(val: &Option, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -68,6 +73,7 @@ impl ClusterTestOpt { pg_address: None, config_dir: None, graphql_address: None, + with_indexer_and_graphql: false, } } } diff --git a/crates/sui-cluster-test/tests/local_cluster_test.rs b/crates/sui-cluster-test/tests/local_cluster_test.rs index 2995a95e26b86..a5bdc47236ef3 100644 --- a/crates/sui-cluster-test/tests/local_cluster_test.rs +++ b/crates/sui-cluster-test/tests/local_cluster_test.rs @@ -16,31 +16,19 @@ async fn test_sui_cluster() { use reqwest::StatusCode; use sui_cluster_test::cluster::Cluster; use 
sui_cluster_test::cluster::LocalNewCluster; - use sui_cluster_test::config::Env; use sui_graphql_rpc::client::simple_client::SimpleClient; use tokio::time::sleep; telemetry_subscribers::init_for_testing(); - let fullnode_rpc_port: u16 = 9020; - let indexer_rpc_port: u16 = 9124; - let pg_address = "postgres://postgres:postgrespw@localhost:5432/sui_indexer".to_string(); - let graphql_address = format!("127.0.0.1:{}", 8000); - let opts = ClusterTestOpt { - env: Env::NewLocal, - faucet_address: None, - fullnode_address: Some(format!("127.0.0.1:{}", fullnode_rpc_port)), - epoch_duration_ms: Some(60000), - indexer_address: Some(format!("127.0.0.1:{}", indexer_rpc_port)), - pg_address: Some(pg_address), - config_dir: None, - graphql_address: Some(graphql_address), + with_indexer_and_graphql: true, + ..ClusterTestOpt::new_local() }; - let _cluster = LocalNewCluster::start(&opts).await.unwrap(); + let cluster = LocalNewCluster::start(&opts).await.unwrap(); - let grphql_url: String = format!("http://127.0.0.1:{}", 8000); + let grphql_url = cluster.graphql_url().to_owned().unwrap(); sleep(std::time::Duration::from_secs(20)).await; diff --git a/crates/sui-graphql-rpc/Cargo.toml b/crates/sui-graphql-rpc/Cargo.toml index bbc3993caa2e4..803c6e0b35a77 100644 --- a/crates/sui-graphql-rpc/Cargo.toml +++ b/crates/sui-graphql-rpc/Cargo.toml @@ -40,7 +40,6 @@ prometheus.workspace = true rand.workspace = true # todo: cleanup test only deps regex.workspace = true reqwest.workspace = true -serial_test.workspace = true serde.workspace = true serde_json.workspace = true serde_with.workspace = true diff --git a/crates/sui-graphql-rpc/src/config.rs b/crates/sui-graphql-rpc/src/config.rs index c61e87fec45db..10e4f44a26f1b 100644 --- a/crates/sui-graphql-rpc/src/config.rs +++ b/crates/sui-graphql-rpc/src/config.rs @@ -45,13 +45,13 @@ pub struct ServerConfig { #[derive(Clone, Eq, PartialEq)] pub struct ConnectionConfig { /// Port to bind the server to - pub(crate) port: u16, + pub port: u16, /// Host to bind the server to - pub(crate) host: String, - pub(crate) db_url: String, - pub(crate) db_pool_size: u32, - pub(crate) prom_url: String, - pub(crate) prom_port: u16, + pub host: String, + pub db_url: String, + pub db_pool_size: u32, + pub prom_url: String, + pub prom_port: u16, } /// Configuration on features supported by the GraphQL service, passed in a TOML-based file. 
These @@ -373,27 +373,6 @@ impl ConnectionConfig { } } - pub fn ci_integration_test_cfg() -> Self { - Self { - db_url: "postgres://postgres:postgrespw@localhost:5432/sui_graphql_rpc_e2e_tests" - .to_string(), - ..Default::default() - } - } - - pub fn ci_integration_test_cfg_with_db_name( - db_name: String, - port: u16, - prom_port: u16, - ) -> Self { - Self { - db_url: format!("postgres://postgres:postgrespw@localhost:5432/{}", db_name), - port, - prom_port, - ..Default::default() - } - } - pub fn db_name(&self) -> String { self.db_url.split('/').last().unwrap().to_string() } diff --git a/crates/sui-graphql-rpc/src/data/pg.rs b/crates/sui-graphql-rpc/src/data/pg.rs index 1beeb69028ea7..c743e4f0cbae3 100644 --- a/crates/sui-graphql-rpc/src/data/pg.rs +++ b/crates/sui-graphql-rpc/src/data/pg.rs @@ -197,22 +197,22 @@ mod query_cost { #[cfg(all(test, feature = "pg_integration"))] mod tests { use super::*; - use crate::config::ConnectionConfig; use diesel::QueryDsl; use sui_framework::BuiltInFramework; use sui_indexer::{ db::{get_pool_connection, new_connection_pool, reset_database, ConnectionPoolConfig}, models::objects::StoredObject, schema::objects, + tempdb::TempDb, types::IndexedObject, }; #[test] fn test_query_cost() { - let connection_config = ConnectionConfig::default(); + let database = TempDb::new().unwrap(); let mut pool_config = ConnectionPoolConfig::default(); - pool_config.set_pool_size(connection_config.db_pool_size); - let pool = new_connection_pool(&connection_config.db_url, &pool_config).unwrap(); + pool_config.set_pool_size(5); + let pool = new_connection_pool(database.database().url().as_str(), &pool_config).unwrap(); let mut conn = get_pool_connection(&pool).unwrap(); reset_database(&mut conn).unwrap(); diff --git a/crates/sui-graphql-rpc/src/server/builder.rs b/crates/sui-graphql-rpc/src/server/builder.rs index cdcd59f208259..0d345e4addf32 100644 --- a/crates/sui-graphql-rpc/src/server/builder.rs +++ b/crates/sui-graphql-rpc/src/server/builder.rs @@ -15,7 +15,6 @@ use crate::data::{DataLoader, Db}; use crate::extensions::directive_checker::DirectiveChecker; use crate::metrics::Metrics; use crate::mutation::Mutation; -use crate::types::chain_identifier::ChainIdentifier; use crate::types::datatype::IMoveDatatype; use crate::types::move_object::IMoveObject; use crate::types::object::IObject; @@ -225,6 +224,7 @@ impl ServerBuilder { self } + #[cfg(all(test, feature = "pg_integration"))] fn build_schema(self) -> Schema { self.schema.finish() } @@ -677,8 +677,11 @@ async fn get_or_init_server_start_time() -> &'static Instant { ONCE.get_or_init(|| async move { Instant::now() }).await } +#[cfg(all(test, feature = "pg_integration"))] pub mod tests { use super::*; + use crate::test_infra::cluster::{prep_executor_cluster, start_cluster}; + use crate::types::chain_identifier::ChainIdentifier; use crate::{ config::{ConnectionConfig, Limits, ServiceConfig, Version}, context_data::db_data_provider::PgManager, @@ -691,18 +694,23 @@ pub mod tests { use serde_json::json; use std::sync::Arc; use std::time::Duration; - use sui_sdk::{wallet_context::WalletContext, SuiClient}; + use sui_indexer::tempdb::get_available_port; + use sui_sdk::SuiClient; use sui_types::digests::get_mainnet_chain_identifier; use sui_types::transaction::TransactionData; use uuid::Uuid; /// Prepares a schema for tests dealing with extensions. Returns a `ServerBuilder` that can be /// further extended with `context_data` and `extension` for testing. 
- fn prep_schema( - connection_config: Option, - service_config: Option, - ) -> ServerBuilder { - let connection_config = connection_config.unwrap_or_default(); + fn prep_schema(db_url: String, service_config: Option) -> ServerBuilder { + let connection_config = ConnectionConfig { + port: get_available_port(), + host: "127.0.0.1".to_owned(), + db_url, + db_pool_size: 5, + prom_url: "127.0.0.1".to_owned(), + prom_port: get_available_port(), + }; let service_config = service_config.unwrap_or_default(); let reader = PgManager::reader_with_config( @@ -747,13 +755,15 @@ pub mod tests { } fn metrics() -> Metrics { - let binding_address: SocketAddr = "0.0.0.0:9185".parse().unwrap(); + let binding_address: SocketAddr = format!("127.0.0.1:{}", get_available_port()) + .parse() + .unwrap(); let registry = mysten_metrics::start_prometheus_server(binding_address).default_registry(); Metrics::new(®istry) } fn ip_address() -> SocketAddr { - let binding_address: SocketAddr = "0.0.0.0:51515".parse().unwrap(); + let binding_address: SocketAddr = "127.0.0.1:51515".parse().unwrap(); binding_address } @@ -761,7 +771,18 @@ pub mod tests { Uuid::new_v4() } - pub async fn test_timeout_impl(wallet: &WalletContext) { + #[tokio::test] + async fn test_timeout() { + telemetry_subscribers::init_for_testing(); + let cluster = start_cluster(ServiceConfig::test_defaults()).await; + cluster + .wait_for_checkpoint_catchup(1, Duration::from_secs(10)) + .await; + // timeout test includes mutation timeout, which requies a [SuiClient] to be able to run + // the test, and a transaction. [WalletContext] gives access to everything that's needed. + let wallet = &cluster.network.validator_fullnode_handle.wallet; + let db_url = cluster.network.graphql_connection_config.db_url.clone(); + struct TimedExecuteExt { pub min_req_delay: Duration, } @@ -792,12 +813,13 @@ pub mod tests { timeout: Duration, query: &str, sui_client: &SuiClient, + db_url: String, ) -> Response { let mut cfg = ServiceConfig::default(); cfg.limits.request_timeout_ms = timeout.as_millis() as u32; cfg.limits.mutation_timeout_ms = timeout.as_millis() as u32; - let schema = prep_schema(None, Some(cfg)) + let schema = prep_schema(db_url, Some(cfg)) .context_data(Some(sui_client.clone())) .extension(Timeout) .extension(TimedExecuteExt { @@ -813,13 +835,13 @@ pub mod tests { let delay = Duration::from_millis(100); let sui_client = wallet.get_client().await.unwrap(); - test_timeout(delay, timeout, query, &sui_client) + test_timeout(delay, timeout, query, &sui_client, db_url.clone()) .await .into_result() .expect("Should complete successfully"); // Should timeout - let errs: Vec<_> = test_timeout(delay, delay, query, &sui_client) + let errs: Vec<_> = test_timeout(delay, delay, query, &sui_client, db_url.clone()) .await .into_result() .unwrap_err() @@ -862,7 +884,7 @@ pub mod tests { tx_bytes.encoded(), signature_base64.encoded() ); - let errs: Vec<_> = test_timeout(delay, delay, &query, &sui_client) + let errs: Vec<_> = test_timeout(delay, delay, &query, &sui_client, db_url.clone()) .await .into_result() .unwrap_err() @@ -874,10 +896,16 @@ pub mod tests { delay.as_secs_f32() ); assert_eq!(errs, vec![exp]); + + cluster.cleanup_resources().await } - pub async fn test_query_depth_limit_impl() { - async fn exec_query_depth_limit(depth: u32, query: &str) -> Response { + #[tokio::test] + async fn test_query_depth_limit() { + let cluster = prep_executor_cluster().await; + let db_url = cluster.graphql_connection_config.db_url.clone(); + + async fn exec_query_depth_limit(db_url: 
String, depth: u32, query: &str) -> Response { let service_config = ServiceConfig { limits: Limits { max_query_depth: depth, @@ -886,19 +914,20 @@ pub mod tests { ..Default::default() }; - let schema = prep_schema(None, Some(service_config)) + let schema = prep_schema(db_url, Some(service_config)) .context_data(PayloadSize(100)) .extension(QueryLimitsChecker) .build_schema(); schema.execute(query).await } - exec_query_depth_limit(1, "{ chainIdentifier }") + exec_query_depth_limit(db_url.clone(), 1, "{ chainIdentifier }") .await .into_result() .expect("Should complete successfully"); exec_query_depth_limit( + db_url.clone(), 5, "{ chainIdentifier protocolConfig { configs { value key }} }", ) @@ -907,7 +936,7 @@ pub mod tests { .expect("Should complete successfully"); // Should fail - let errs: Vec<_> = exec_query_depth_limit(0, "{ chainIdentifier }") + let errs: Vec<_> = exec_query_depth_limit(db_url.clone(), 0, "{ chainIdentifier }") .await .into_result() .unwrap_err() @@ -917,6 +946,7 @@ pub mod tests { assert_eq!(errs, vec!["Query nesting is over 0".to_string()]); let errs: Vec<_> = exec_query_depth_limit( + db_url.clone(), 2, "{ chainIdentifier protocolConfig { configs { value key }} }", ) @@ -929,8 +959,11 @@ pub mod tests { assert_eq!(errs, vec!["Query nesting is over 2".to_string()]); } - pub async fn test_query_node_limit_impl() { - async fn exec_query_node_limit(nodes: u32, query: &str) -> Response { + #[tokio::test] + async fn test_query_node_limit() { + let cluster = prep_executor_cluster().await; + let db_url = cluster.graphql_connection_config.db_url.clone(); + async fn exec_query_node_limit(db_url: String, nodes: u32, query: &str) -> Response { let service_config = ServiceConfig { limits: Limits { max_query_nodes: nodes, @@ -939,19 +972,20 @@ pub mod tests { ..Default::default() }; - let schema = prep_schema(None, Some(service_config)) + let schema = prep_schema(db_url, Some(service_config)) .context_data(PayloadSize(100)) .extension(QueryLimitsChecker) .build_schema(); schema.execute(query).await } - exec_query_node_limit(1, "{ chainIdentifier }") + exec_query_node_limit(db_url.clone(), 1, "{ chainIdentifier }") .await .into_result() .expect("Should complete successfully"); exec_query_node_limit( + db_url.clone(), 5, "{ chainIdentifier protocolConfig { configs { value key }} }", ) @@ -960,7 +994,7 @@ pub mod tests { .expect("Should complete successfully"); // Should fail - let err: Vec<_> = exec_query_node_limit(0, "{ chainIdentifier }") + let err: Vec<_> = exec_query_node_limit(db_url.clone(), 0, "{ chainIdentifier }") .await .into_result() .unwrap_err() @@ -970,6 +1004,7 @@ pub mod tests { assert_eq!(err, vec!["Query has over 0 nodes".to_string()]); let err: Vec<_> = exec_query_node_limit( + db_url.clone(), 4, "{ chainIdentifier protocolConfig { configs { value key }} }", ) @@ -982,7 +1017,11 @@ pub mod tests { assert_eq!(err, vec!["Query has over 4 nodes".to_string()]); } - pub async fn test_query_default_page_limit_impl(connection_config: ConnectionConfig) { + #[tokio::test] + async fn test_query_default_page_limit() { + let cluster = prep_executor_cluster().await; + let db_url = cluster.graphql_connection_config.db_url.clone(); + let service_config = ServiceConfig { limits: Limits { default_page_size: 1, @@ -990,7 +1029,7 @@ pub mod tests { }, ..Default::default() }; - let schema = prep_schema(Some(connection_config), Some(service_config)).build_schema(); + let schema = prep_schema(db_url, Some(service_config)).build_schema(); let resp = schema .execute("{ checkpoints { 
nodes { sequenceNumber } } }") @@ -1027,8 +1066,12 @@ pub mod tests { ); } - pub async fn test_query_max_page_limit_impl() { - let schema = prep_schema(None, None).build_schema(); + #[tokio::test] + async fn test_query_max_page_limit() { + telemetry_subscribers::init_for_testing(); + let cluster = prep_executor_cluster().await; + let db_url = cluster.graphql_connection_config.db_url.clone(); + let schema = prep_schema(db_url, None).build_schema(); schema .execute("{ objects(first: 1) { nodes { version } } }") @@ -1051,8 +1094,12 @@ pub mod tests { ); } - pub async fn test_query_complexity_metrics_impl() { - let server_builder = prep_schema(None, None).context_data(PayloadSize(100)); + #[tokio::test] + async fn test_query_complexity_metrics() { + telemetry_subscribers::init_for_testing(); + let cluster = prep_executor_cluster().await; + let db_url = cluster.graphql_connection_config.db_url.clone(); + let server_builder = prep_schema(db_url, None).context_data(PayloadSize(100)); let metrics = server_builder.state.metrics.clone(); let schema = server_builder .extension(QueryLimitsChecker) // QueryLimitsChecker is where we actually set the metrics @@ -1086,13 +1133,14 @@ pub mod tests { assert_eq!(req_metrics.query_depth.get_sample_sum(), 1. + 3.); } - pub async fn test_health_check_impl() { - let server_builder = prep_schema(None, None); + #[tokio::test] + pub async fn test_health_check() { + let cluster = prep_executor_cluster().await; + let url = format!( "http://{}:{}/health", - server_builder.state.connection.host, server_builder.state.connection.port + cluster.graphql_connection_config.host, cluster.graphql_connection_config.port ); - server_builder.build_schema(); let resp = reqwest::get(&url).await.unwrap(); assert_eq!(resp.status(), reqwest::StatusCode::OK); @@ -1100,17 +1148,18 @@ pub mod tests { let url_with_param = format!("{}?max_checkpoint_lag_ms=1", url); let resp = reqwest::get(&url_with_param).await.unwrap(); assert_eq!(resp.status(), reqwest::StatusCode::GATEWAY_TIMEOUT); + cluster.cleanup_resources().await } /// Execute a GraphQL request with `limits` in place, expecting an error to be returned. /// Returns the list of errors returned. - async fn execute_for_error(limits: Limits, request: Request) -> String { + async fn execute_for_error(db_url: &str, limits: Limits, request: Request) -> String { let service_config = ServiceConfig { limits, ..Default::default() }; - let schema = prep_schema(None, Some(service_config)) + let schema = prep_schema(db_url.to_owned(), Some(service_config)) .context_data(PayloadSize( // Payload size is usually set per request, and it is the size of the raw HTTP // request, which includes the query, variables, and surrounding JSON. 
Simulate for @@ -1133,9 +1182,13 @@ pub mod tests { errs.join("\n") } - pub async fn test_payload_read_exceeded_impl() { + #[tokio::test] + async fn test_payload_read_exceeded() { + let cluster = prep_executor_cluster().await; + let db_url = cluster.graphql_connection_config.db_url.clone(); assert_eq!( execute_for_error( + &db_url, Limits { max_tx_payload_size: 400, max_query_payload_size: 10, @@ -1160,9 +1213,13 @@ pub mod tests { ); } - pub async fn test_payload_mutation_exceeded_impl() { + #[tokio::test] + async fn test_payload_mutation_exceeded() { + let cluster = prep_executor_cluster().await; + let db_url = cluster.graphql_connection_config.db_url.clone(); assert_eq!( execute_for_error( + &db_url, Limits { max_tx_payload_size: 10, max_query_payload_size: 400, @@ -1187,9 +1244,13 @@ pub mod tests { ); } - pub async fn test_payload_dry_run_exceeded_impl() { + #[tokio::test] + async fn test_payload_dry_run_exceeded() { + let cluster = prep_executor_cluster().await; + let db_url = cluster.graphql_connection_config.db_url.clone(); assert_eq!( execute_for_error( + &db_url, Limits { max_tx_payload_size: 10, max_query_payload_size: 400, @@ -1215,9 +1276,13 @@ pub mod tests { ); } - pub async fn test_payload_total_exceeded_impl() { + #[tokio::test] + async fn test_payload_total_exceeded_impl() { + let cluster = prep_executor_cluster().await; + let db_url = cluster.graphql_connection_config.db_url.clone(); assert_eq!( execute_for_error( + &db_url, Limits { max_tx_payload_size: 10, max_query_payload_size: 10, @@ -1242,9 +1307,13 @@ pub mod tests { ); } - pub async fn test_payload_using_vars_mutation_exceeded_impl() { + #[tokio::test] + async fn test_payload_using_vars_mutation_exceeded() { + let cluster = prep_executor_cluster().await; + let db_url = cluster.graphql_connection_config.db_url.clone(); assert_eq!( execute_for_error( + &db_url, Limits { max_tx_payload_size: 10, max_query_payload_size: 500, @@ -1274,9 +1343,13 @@ pub mod tests { ); } - pub async fn test_payload_using_vars_read_exceeded_impl() { + #[tokio::test] + async fn test_payload_using_vars_read_exceeded() { + let cluster = prep_executor_cluster().await; + let db_url = cluster.graphql_connection_config.db_url.clone(); assert_eq!( execute_for_error( + &db_url, Limits { max_tx_payload_size: 500, max_query_payload_size: 10, @@ -1306,9 +1379,13 @@ pub mod tests { ); } - pub async fn test_payload_using_vars_dry_run_exceeded_impl() { + #[tokio::test] + async fn test_payload_using_vars_dry_run_exceeded() { + let cluster = prep_executor_cluster().await; + let db_url = cluster.graphql_connection_config.db_url.clone(); assert_eq!( execute_for_error( + &db_url, Limits { max_tx_payload_size: 10, max_query_payload_size: 400, @@ -1338,9 +1415,13 @@ pub mod tests { ); } - pub async fn test_payload_using_vars_dry_run_read_exceeded_impl() { + #[tokio::test] + async fn test_payload_using_vars_dry_run_read_exceeded() { + let cluster = prep_executor_cluster().await; + let db_url = cluster.graphql_connection_config.db_url.clone(); assert_eq!( execute_for_error( + &db_url, Limits { max_tx_payload_size: 400, max_query_payload_size: 10, @@ -1370,10 +1451,14 @@ pub mod tests { ); } - pub async fn test_payload_multiple_execution_exceeded_impl() { + #[tokio::test] + async fn test_payload_multiple_execution_exceeded() { + let cluster = prep_executor_cluster().await; + let db_url = cluster.graphql_connection_config.db_url.clone(); // First check that the limit is large enough to hold one transaction's parameters (by // checking that we hit the read limit). 
let err = execute_for_error( + &db_url, Limits { max_tx_payload_size: 30, max_query_payload_size: 320, @@ -1395,6 +1480,7 @@ pub mod tests { assert_eq!( execute_for_error( + &db_url, Limits { max_tx_payload_size: 30, max_query_payload_size: 800, @@ -1424,10 +1510,14 @@ pub mod tests { ); } - pub async fn test_payload_multiple_dry_run_exceeded_impl() { + #[tokio::test] + async fn test_payload_multiple_dry_run_exceeded() { + let cluster = prep_executor_cluster().await; + let db_url = cluster.graphql_connection_config.db_url.clone(); // First check that tx limit is large enough to hold one transaction's parameters (by // checking that we hit the read limit). let err = execute_for_error( + &db_url, Limits { max_tx_payload_size: 20, max_query_payload_size: 330, @@ -1450,6 +1540,7 @@ pub mod tests { assert_eq!( execute_for_error( + &db_url, Limits { max_tx_payload_size: 20, max_query_payload_size: 800, @@ -1481,10 +1572,14 @@ pub mod tests { ); } - pub async fn test_payload_execution_multiple_sigs_exceeded_impl() { + #[tokio::test] + async fn test_payload_execution_multiple_sigs_exceeded() { + let cluster = prep_executor_cluster().await; + let db_url = cluster.graphql_connection_config.db_url.clone(); // First check that the limit is large enough to hold a transaction with a single signature // (by checking that we hite the read limit). let err = execute_for_error( + &db_url, Limits { max_tx_payload_size: 30, max_query_payload_size: 320, @@ -1507,6 +1602,7 @@ pub mod tests { assert_eq!( execute_for_error( + &db_url, Limits { max_tx_payload_size: 30, max_query_payload_size: 500, @@ -1534,11 +1630,15 @@ pub mod tests { ) } - pub async fn test_payload_sig_var_execution_exceeded_impl() { + #[tokio::test] + async fn test_payload_sig_var_execution_exceeded() { + let cluster = prep_executor_cluster().await; + let db_url = cluster.graphql_connection_config.db_url.clone(); // Variables can show up in the sub-structure of a GraphQL value as well, and we need to // count those as well. assert_eq!( execute_for_error( + &db_url, Limits { max_tx_payload_size: 10, max_query_payload_size: 500, @@ -1575,7 +1675,10 @@ pub mod tests { && !err.starts_with("Transaction payload too large") } - pub async fn test_payload_reusing_vars_execution_impl() { + #[tokio::test] + async fn test_payload_reusing_vars_execution() { + let cluster = prep_executor_cluster().await; + let db_url = cluster.graphql_connection_config.db_url.clone(); // Test that when variables are re-used as execution params, the size of the variable is // only counted once. @@ -1583,6 +1686,7 @@ pub mod tests { // fail the initial payload check. assert!(!passed_tx_checks( &execute_for_error( + &db_url, Limits { max_tx_payload_size: 1, max_query_payload_size: 1, @@ -1612,6 +1716,7 @@ pub mod tests { // check. assert!(passed_tx_checks( &execute_for_error( + &db_url, limits.clone(), Request::new( r#" @@ -1635,6 +1740,7 @@ pub mod tests { // variable fails the transaction limit. assert!(!passed_tx_checks( &execute_for_error( + &db_url, limits.clone(), Request::new( r#" @@ -1658,6 +1764,7 @@ pub mod tests { // transaction payload limit again. 
assert!(passed_tx_checks( &execute_for_error( + &db_url, limits, Request::new( r#" @@ -1678,7 +1785,10 @@ pub mod tests { )); } - pub async fn test_payload_reusing_vars_dry_run_impl() { + #[tokio::test] + async fn test_payload_reusing_vars_dry_run() { + let cluster = prep_executor_cluster().await; + let db_url = cluster.graphql_connection_config.db_url.clone(); // Like `test_payload_reusing_vars_execution` but the variable is used in a dry-run. let limits = Limits { @@ -1690,6 +1800,7 @@ pub mod tests { // A single dry-run is under the limit. assert!(passed_tx_checks( &execute_for_error( + &db_url, limits.clone(), Request::new( r#" @@ -1713,6 +1824,7 @@ pub mod tests { // Duplicating the dry-run causes us to hit the limit. assert!(!passed_tx_checks( &execute_for_error( + &db_url, limits.clone(), Request::new( r#" @@ -1743,6 +1855,7 @@ pub mod tests { // And by re-using the variable, we are under the transaction limit again. assert!(passed_tx_checks( &execute_for_error( + &db_url, limits, Request::new( r#" @@ -1771,9 +1884,13 @@ pub mod tests { )); } - pub async fn test_payload_named_fragment_execution_exceeded_impl() { + #[tokio::test] + async fn test_payload_named_fragment_execution_exceeded() { + let cluster = prep_executor_cluster().await; + let db_url = cluster.graphql_connection_config.db_url.clone(); assert_eq!( execute_for_error( + &db_url, Limits { max_tx_payload_size: 10, max_query_payload_size: 500, @@ -1802,9 +1919,13 @@ pub mod tests { ); } - pub async fn test_payload_inline_fragment_execution_exceeded_impl() { + #[tokio::test] + async fn test_payload_inline_fragment_execution_exceeded() { + let cluster = prep_executor_cluster().await; + let db_url = cluster.graphql_connection_config.db_url.clone(); assert_eq!( execute_for_error( + &db_url, Limits { max_tx_payload_size: 10, max_query_payload_size: 500, @@ -1831,9 +1952,13 @@ pub mod tests { ); } - pub async fn test_payload_named_fragment_dry_run_exceeded_impl() { + #[tokio::test] + async fn test_payload_named_fragment_dry_run_exceeded() { + let cluster = prep_executor_cluster().await; + let db_url = cluster.graphql_connection_config.db_url.clone(); assert_eq!( execute_for_error( + &db_url, Limits { max_tx_payload_size: 10, max_query_payload_size: 500, @@ -1863,9 +1988,13 @@ pub mod tests { ); } - pub async fn test_payload_inline_fragment_dry_run_exceeded_impl() { + #[tokio::test] + async fn test_payload_inline_fragment_dry_run_exceeded() { + let cluster = prep_executor_cluster().await; + let db_url = cluster.graphql_connection_config.db_url.clone(); assert_eq!( execute_for_error( + &db_url, Limits { max_tx_payload_size: 10, max_query_payload_size: 500, diff --git a/crates/sui-graphql-rpc/src/test_infra/cluster.rs b/crates/sui-graphql-rpc/src/test_infra/cluster.rs index 1f6e3b00e7a3d..d1d8ee85cac4c 100644 --- a/crates/sui-graphql-rpc/src/test_infra/cluster.rs +++ b/crates/sui-graphql-rpc/src/test_infra/cluster.rs @@ -6,6 +6,9 @@ use crate::config::ServerConfig; use crate::config::ServiceConfig; use crate::config::Version; use crate::server::graphiql_server::start_graphiql_server; +use rand::rngs::StdRng; +use rand::SeedableRng; +use simulacrum::Simulacrum; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; @@ -15,11 +18,14 @@ pub use sui_indexer::config::SnapshotLagConfig; use sui_indexer::errors::IndexerError; use sui_indexer::store::indexer_store::IndexerStore; use sui_indexer::store::PgIndexerStore; -use sui_indexer::test_utils::force_delete_database; +use sui_indexer::tempdb::get_available_port; +use 
sui_indexer::tempdb::TempDb; use sui_indexer::test_utils::start_test_indexer_impl; use sui_indexer::test_utils::ReaderWriterConfig; use sui_swarm_config::genesis_config::{AccountConfig, DEFAULT_GAS_AMOUNT}; use sui_types::storage::RestStateReader; +use tempfile::tempdir; +use tempfile::TempDir; use test_cluster::TestCluster; use test_cluster::TestClusterBuilder; use tokio::join; @@ -33,8 +39,6 @@ const EPOCH_DURATION_MS: u64 = 15000; const ACCOUNT_NUM: usize = 20; const GAS_OBJECT_COUNT: usize = 3; -pub const DEFAULT_INTERNAL_DATA_SOURCE_PORT: u16 = 3000; - pub struct ExecutorCluster { pub executor_server_handle: JoinHandle<()>, pub indexer_store: PgIndexerStore, @@ -44,6 +48,9 @@ pub struct ExecutorCluster { pub snapshot_config: SnapshotLagConfig, pub graphql_connection_config: ConnectionConfig, pub cancellation_token: CancellationToken, + #[allow(unused)] + database: TempDb, + tempdir: Option, } pub struct Cluster { @@ -57,38 +64,36 @@ pub struct NetworkCluster { pub indexer_store: PgIndexerStore, pub indexer_join_handle: JoinHandle>, pub cancellation_token: CancellationToken, + #[allow(unused)] + data_ingestion_path: TempDir, + #[allow(unused)] + database: TempDb, + pub graphql_connection_config: ConnectionConfig, } /// Starts a validator, fullnode, indexer, and graphql service for testing. -pub async fn start_cluster( - graphql_connection_config: ConnectionConfig, - internal_data_source_rpc_port: Option, - service_config: ServiceConfig, -) -> Cluster { - let network_cluster = start_network_cluster( - graphql_connection_config.clone(), - internal_data_source_rpc_port, - ) - .await; +pub async fn start_cluster(service_config: ServiceConfig) -> Cluster { + let network_cluster = start_network_cluster().await; + let graphql_connection_config = network_cluster.graphql_connection_config.clone(); let fn_rpc_url: String = network_cluster .validator_fullnode_handle .rpc_url() .to_string(); + let server_url = format!( + "http://{}:{}/", + graphql_connection_config.host, graphql_connection_config.port + ); + let graphql_server_handle = start_graphql_server_with_fn_rpc( - graphql_connection_config.clone(), + graphql_connection_config, Some(fn_rpc_url), Some(network_cluster.cancellation_token.clone()), service_config, ) .await; - let server_url = format!( - "http://{}:{}/", - graphql_connection_config.host, graphql_connection_config.port - ); - // Starts graphql client let client = SimpleClient::new(server_url); wait_for_graphql_server(&client).await; @@ -103,26 +108,30 @@ pub async fn start_cluster( /// Starts a validator, fullnode, indexer (using data ingestion). Re-using GraphQL's ConnectionConfig for convenience. /// This does not start any GraphQL services, only the network cluster. You can start a GraphQL service /// calling `start_graphql_server`. 
-pub async fn start_network_cluster( - graphql_connection_config: ConnectionConfig, - internal_data_source_rpc_port: Option, -) -> NetworkCluster { - let data_ingestion_path = tempfile::tempdir().unwrap().into_path(); +pub async fn start_network_cluster() -> NetworkCluster { + let database = TempDb::new().unwrap(); + let graphql_connection_config = ConnectionConfig { + port: get_available_port(), + host: "127.0.0.1".to_owned(), + db_url: database.database().url().as_str().to_owned(), + db_pool_size: 5, + prom_url: "127.0.0.1".to_owned(), + prom_port: get_available_port(), + }; + let data_ingestion_path = tempfile::tempdir().unwrap(); let db_url = graphql_connection_config.db_url.clone(); let cancellation_token = CancellationToken::new(); // Starts validator+fullnode - let val_fn = - start_validator_with_fullnode(internal_data_source_rpc_port, data_ingestion_path.clone()) - .await; + let val_fn = start_validator_with_fullnode(data_ingestion_path.path().to_path_buf()).await; // Starts indexer let (pg_store, pg_handle) = start_test_indexer_impl( Some(db_url), val_fn.rpc_url().to_string(), ReaderWriterConfig::writer_mode(None, None), - /* reset_database */ true, - Some(data_ingestion_path), + /* reset_database */ false, + Some(data_ingestion_path.path().to_path_buf()), cancellation_token.clone(), ) .await; @@ -132,25 +141,35 @@ pub async fn start_network_cluster( indexer_store: pg_store, indexer_join_handle: pg_handle, cancellation_token, + data_ingestion_path, + database, + graphql_connection_config, } } /// Takes in a simulated instantiation of a Sui blockchain and builds a cluster around it. This /// cluster is typically used in e2e tests to emulate and test behaviors. pub async fn serve_executor( - graphql_connection_config: ConnectionConfig, - internal_data_source_rpc_port: u16, executor: Arc, snapshot_config: Option, epochs_to_keep: Option, data_ingestion_path: PathBuf, ) -> ExecutorCluster { + let database = TempDb::new().unwrap(); + let graphql_connection_config = ConnectionConfig { + port: get_available_port(), + host: "127.0.0.1".to_owned(), + db_url: database.database().url().as_str().to_owned(), + db_pool_size: 5, + prom_url: "127.0.0.1".to_owned(), + prom_port: get_available_port(), + }; let db_url = graphql_connection_config.db_url.clone(); // Creates a cancellation token and adds this to the ExecutorCluster, so that we can send a // cancellation token on cleanup let cancellation_token = CancellationToken::new(); - let executor_server_url: SocketAddr = format!("127.0.0.1:{}", internal_data_source_rpc_port) + let executor_server_url: SocketAddr = format!("127.0.0.1:{}", get_available_port()) .parse() .unwrap(); @@ -164,7 +183,7 @@ pub async fn serve_executor( Some(db_url), format!("http://{}", executor_server_url), ReaderWriterConfig::writer_mode(snapshot_config.clone(), epochs_to_keep), - /* reset_database */ true, + /* reset_database */ false, Some(data_ingestion_path), cancellation_token.clone(), ) @@ -196,9 +215,45 @@ pub async fn serve_executor( snapshot_config: snapshot_config.unwrap_or_default(), graphql_connection_config, cancellation_token, + database, + tempdir: None, } } +pub async fn prep_executor_cluster() -> ExecutorCluster { + let rng = StdRng::from_seed([12; 32]); + let data_ingestion_path = tempdir().unwrap(); + let mut sim = Simulacrum::new_with_rng(rng); + sim.set_data_ingestion_path(data_ingestion_path.path().to_path_buf()); + + sim.create_checkpoint(); + sim.create_checkpoint(); + sim.create_checkpoint(); + sim.advance_epoch(true); + sim.create_checkpoint(); + 
sim.advance_clock( + std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .unwrap(), + ); + sim.create_checkpoint(); + + let mut cluster = serve_executor( + Arc::new(sim), + None, + None, + data_ingestion_path.path().to_path_buf(), + ) + .await; + + cluster + .wait_for_checkpoint_catchup(6, Duration::from_secs(10)) + .await; + + cluster.tempdir = Some(data_ingestion_path); + cluster +} + pub async fn start_graphql_server( graphql_connection_config: ConnectionConfig, cancellation_token: CancellationToken, @@ -237,11 +292,8 @@ pub async fn start_graphql_server_with_fn_rpc( }) } -async fn start_validator_with_fullnode( - internal_data_source_rpc_port: Option, - data_ingestion_dir: PathBuf, -) -> TestCluster { - let mut test_cluster_builder = TestClusterBuilder::new() +async fn start_validator_with_fullnode(data_ingestion_dir: PathBuf) -> TestCluster { + TestClusterBuilder::new() .with_num_validators(VALIDATOR_COUNT) .with_epoch_duration_ms(EPOCH_DURATION_MS) .with_data_ingestion_dir(data_ingestion_dir) @@ -251,13 +303,9 @@ async fn start_validator_with_fullnode( gas_amounts: vec![DEFAULT_GAS_AMOUNT; GAS_OBJECT_COUNT], }; ACCOUNT_NUM - ]); - - if let Some(internal_data_source_rpc_port) = internal_data_source_rpc_port { - test_cluster_builder = - test_cluster_builder.with_fullnode_rpc_port(internal_data_source_rpc_port); - }; - test_cluster_builder.build().await + ]) + .build() + .await } /// Repeatedly ping the GraphQL server for 10s, until it responds @@ -440,8 +488,6 @@ impl ExecutorCluster { pub async fn cleanup_resources(self) { self.cancellation_token.cancel(); let _ = join!(self.graphql_server_join_handle, self.indexer_join_handle); - let db_url = self.graphql_connection_config.db_url.clone(); - force_delete_database(db_url).await; } pub async fn force_objects_snapshot_catchup(&self, start_cp: u64, end_cp: u64) { diff --git a/crates/sui-graphql-rpc/tests/dot_move_e2e.rs b/crates/sui-graphql-rpc/tests/dot_move_e2e.rs index 76e034caef3a4..16ecd05393724 100644 --- a/crates/sui-graphql-rpc/tests/dot_move_e2e.rs +++ b/crates/sui-graphql-rpc/tests/dot_move_e2e.rs @@ -13,6 +13,7 @@ mod tests { }, }; use sui_graphql_rpc_client::simple_client::SimpleClient; + use sui_indexer::tempdb::get_available_port; use sui_json_rpc_types::ObjectChange; use sui_move_build::BuildConfig; use sui_types::{ @@ -29,7 +30,6 @@ mod tests { const DEMO_PKG_V2: &str = "tests/dot_move/demo_v2/"; const DEMO_PKG_V3: &str = "tests/dot_move/demo_v3/"; - const DB_NAME: &str = "sui_graphql_rpc_e2e_tests"; const DEMO_TYPE: &str = "::demo::V1Type"; const DEMO_TYPE_V2: &str = "::demo::V2Type"; const DEMO_TYPE_V3: &str = "::demo::V3Type"; @@ -39,8 +39,7 @@ mod tests { #[tokio::test] async fn test_dot_move_e2e() { - let network_cluster = - start_network_cluster(gql_default_config(DB_NAME, 8000, 9184), None).await; + let network_cluster = start_network_cluster().await; let external_network_chain_id = network_cluster .validator_fullnode_handle @@ -86,8 +85,7 @@ mod tests { // The first cluster uses internal resolution (mimics our base network, does not rely on external chain). 
let internal_client = init_dot_move_gql( - 8000, - 9184, + network_cluster.graphql_connection_config.clone(), ServiceConfig::dot_move_test_defaults( false, None, @@ -99,8 +97,11 @@ mod tests { .await; let external_client = init_dot_move_gql( - 8001, - 9185, + ConnectionConfig { + port: get_available_port(), + prom_port: get_available_port(), + ..network_cluster.graphql_connection_config.clone() + }, ServiceConfig::dot_move_test_defaults( true, // external resolution Some(internal_client.url()), @@ -226,15 +227,17 @@ mod tests { } async fn init_dot_move_gql( - gql_port: u16, - prom_port: u16, + connection_config: ConnectionConfig, config: ServiceConfig, ) -> SimpleClient { - let cfg = gql_default_config(DB_NAME, gql_port, prom_port); + let _gql_handle = + start_graphql_server_with_fn_rpc(connection_config.clone(), None, None, config).await; - let _gql_handle = start_graphql_server_with_fn_rpc(cfg.clone(), None, None, config).await; - - let server_url = format!("http://{}:{}/", cfg.host(), cfg.port()); + let server_url = format!( + "http://{}:{}/", + connection_config.host(), + connection_config.port() + ); // Starts graphql client let client = SimpleClient::new(server_url); @@ -506,8 +509,4 @@ mod tests { fn type_query(named_type: &str) -> String { format!(r#"typeByName(name: "{}") {{ layout }}"#, named_type) } - - fn gql_default_config(db_name: &str, port: u16, prom_port: u16) -> ConnectionConfig { - ConnectionConfig::ci_integration_test_cfg_with_db_name(db_name.to_string(), port, prom_port) - } } diff --git a/crates/sui-graphql-rpc/tests/e2e_tests.rs b/crates/sui-graphql-rpc/tests/e2e_tests.rs index cf2f2263453e5..ed7bbb04d0863 100644 --- a/crates/sui-graphql-rpc/tests/e2e_tests.rs +++ b/crates/sui-graphql-rpc/tests/e2e_tests.rs @@ -7,18 +7,15 @@ mod tests { use rand::rngs::StdRng; use rand::SeedableRng; use serde_json::json; - use serial_test::serial; use simulacrum::Simulacrum; use std::sync::Arc; use std::time::Duration; use sui_graphql_rpc::client::simple_client::GraphqlQueryVariable; use sui_graphql_rpc::client::ClientError; - use sui_graphql_rpc::config::ConnectionConfig; use sui_graphql_rpc::config::Limits; use sui_graphql_rpc::config::ServiceConfig; + use sui_graphql_rpc::test_infra::cluster::prep_executor_cluster; use sui_graphql_rpc::test_infra::cluster::start_cluster; - use sui_graphql_rpc::test_infra::cluster::ExecutorCluster; - use sui_graphql_rpc::test_infra::cluster::DEFAULT_INTERNAL_DATA_SOURCE_PORT; use sui_types::digests::ChainIdentifier; use sui_types::gas_coin::GAS; use sui_types::transaction::CallArg; @@ -30,47 +27,11 @@ mod tests { use tempfile::tempdir; use tokio::time::sleep; - async fn prep_executor_cluster() -> (ConnectionConfig, ExecutorCluster) { - let rng = StdRng::from_seed([12; 32]); - let data_ingestion_path = tempdir().unwrap().into_path(); - let mut sim = Simulacrum::new_with_rng(rng); - sim.set_data_ingestion_path(data_ingestion_path.clone()); - - sim.create_checkpoint(); - sim.create_checkpoint(); - - let connection_config = ConnectionConfig::ci_integration_test_cfg(); - - let cluster = sui_graphql_rpc::test_infra::cluster::serve_executor( - connection_config.clone(), - DEFAULT_INTERNAL_DATA_SOURCE_PORT, - Arc::new(sim), - None, - None, - data_ingestion_path, - ) - .await; - - cluster - .wait_for_checkpoint_catchup(1, Duration::from_secs(10)) - .await; - - (connection_config, cluster) - } - #[tokio::test] - #[serial] async fn test_simple_client_validator_cluster() { - let _guard = telemetry_subscribers::TelemetryConfig::new() - .with_env() - .init(); + 
telemetry_subscribers::init_for_testing(); - let cluster = start_cluster( - ConnectionConfig::default(), - None, - ServiceConfig::test_defaults(), - ) - .await; + let cluster = start_cluster(ServiceConfig::test_defaults()).await; cluster .wait_for_checkpoint_catchup(1, Duration::from_secs(10)) @@ -105,12 +66,11 @@ mod tests { } #[tokio::test] - #[serial] async fn test_simple_client_simulator_cluster() { let rng = StdRng::from_seed([12; 32]); let mut sim = Simulacrum::new_with_rng(rng); - let data_ingestion_path = tempdir().unwrap().into_path(); - sim.set_data_ingestion_path(data_ingestion_path.clone()); + let data_ingestion_path = tempdir().unwrap(); + sim.set_data_ingestion_path(data_ingestion_path.path().to_path_buf()); sim.create_checkpoint(); sim.create_checkpoint(); @@ -127,12 +87,10 @@ mod tests { chain_id_actual ); let cluster = sui_graphql_rpc::test_infra::cluster::serve_executor( - ConnectionConfig::default(), - DEFAULT_INTERNAL_DATA_SOURCE_PORT, Arc::new(sim), None, None, - data_ingestion_path, + data_ingestion_path.path().to_path_buf(), ) .await; cluster @@ -154,9 +112,8 @@ mod tests { } #[tokio::test] - #[serial] async fn test_graphql_client_response() { - let (_, cluster) = prep_executor_cluster().await; + let cluster = prep_executor_cluster().await; let query = r#" { @@ -183,9 +140,8 @@ mod tests { } #[tokio::test] - #[serial] async fn test_graphql_client_variables() { - let (_, cluster) = prep_executor_cluster().await; + let cluster = prep_executor_cluster().await; let query = r#"{obj1: object(address: $framework_addr) {address} obj2: object(address: $deepbook_addr) {address}}"#; @@ -323,15 +279,10 @@ mod tests { } #[tokio::test] - #[serial] async fn test_transaction_execution() { - let _guard = telemetry_subscribers::TelemetryConfig::new() - .with_env() - .init(); + telemetry_subscribers::init_for_testing(); - let connection_config = ConnectionConfig::ci_integration_test_cfg(); - - let cluster = start_cluster(connection_config, None, ServiceConfig::test_defaults()).await; + let cluster = start_cluster(ServiceConfig::test_defaults()).await; let addresses = cluster .network @@ -431,7 +382,6 @@ mod tests { } #[tokio::test] - #[serial] async fn test_zklogin_sig_verify() { use shared_crypto::intent::Intent; use shared_crypto::intent::IntentMessage; @@ -442,16 +392,9 @@ mod tests { use sui_types::utils::load_test_vectors; use sui_types::zk_login_authenticator::ZkLoginAuthenticator; - let _guard = telemetry_subscribers::TelemetryConfig::new() - .with_env() - .init(); + telemetry_subscribers::init_for_testing(); - let cluster = start_cluster( - ConnectionConfig::default(), - None, - ServiceConfig::test_defaults(), - ) - .await; + let cluster = start_cluster(ServiceConfig::test_defaults()).await; let test_cluster = &cluster.network.validator_fullnode_handle; test_cluster.wait_for_epoch_all_nodes(1).await; @@ -557,18 +500,10 @@ mod tests { // TODO: add more test cases for transaction execution/dry run in transactional test runner. #[tokio::test] - #[serial] async fn test_transaction_dry_run() { - let _guard = telemetry_subscribers::TelemetryConfig::new() - .with_env() - .init(); + telemetry_subscribers::init_for_testing(); - let cluster = start_cluster( - ConnectionConfig::default(), - None, - ServiceConfig::test_defaults(), - ) - .await; + let cluster = start_cluster(ServiceConfig::test_defaults()).await; let addresses = cluster .network @@ -659,18 +594,10 @@ mod tests { // Test dry run where the transaction kind is provided instead of the full transaction. 
#[tokio::test] - #[serial] async fn test_transaction_dry_run_with_kind() { - let _guard = telemetry_subscribers::TelemetryConfig::new() - .with_env() - .init(); + telemetry_subscribers::init_for_testing(); - let cluster = start_cluster( - ConnectionConfig::default(), - None, - ServiceConfig::test_defaults(), - ) - .await; + let cluster = start_cluster(ServiceConfig::test_defaults()).await; let addresses = cluster .network @@ -738,18 +665,10 @@ mod tests { // Test that we can handle dry run with failures at execution stage too. #[tokio::test] - #[serial] async fn test_dry_run_failed_execution() { - let _guard = telemetry_subscribers::TelemetryConfig::new() - .with_env() - .init(); + telemetry_subscribers::init_for_testing(); - let cluster = start_cluster( - ConnectionConfig::default(), - None, - ServiceConfig::test_defaults(), - ) - .await; + let cluster = start_cluster(ServiceConfig::test_defaults()).await; let addresses = cluster .network @@ -836,18 +755,10 @@ mod tests { } #[tokio::test] - #[serial] async fn test_epoch_data() { - let _guard = telemetry_subscribers::TelemetryConfig::new() - .with_env() - .init(); + telemetry_subscribers::init_for_testing(); - let cluster = start_cluster( - ConnectionConfig::default(), - None, - ServiceConfig::test_defaults(), - ) - .await; + let cluster = start_cluster(ServiceConfig::test_defaults()).await; cluster .network @@ -885,210 +796,17 @@ mod tests { cluster.cleanup_resources().await } - use sui_graphql_rpc::server::builder::tests::*; - - #[tokio::test] - #[serial] - async fn test_timeout() { - let _guard = telemetry_subscribers::TelemetryConfig::new() - .with_env() - .init(); - let cluster = start_cluster( - ConnectionConfig::default(), - None, - ServiceConfig::test_defaults(), - ) - .await; - cluster - .wait_for_checkpoint_catchup(1, Duration::from_secs(10)) - .await; - // timeout test includes mutation timeout, which requies a [SuiClient] to be able to run - // the test, and a transaction. [WalletContext] gives access to everything that's needed. - let wallet = &cluster.network.validator_fullnode_handle.wallet; - test_timeout_impl(wallet).await; - cluster.cleanup_resources().await - } - - #[tokio::test] - #[serial] - async fn test_query_depth_limit() { - test_query_depth_limit_impl().await; - } - - #[tokio::test] - #[serial] - async fn test_query_node_limit() { - test_query_node_limit_impl().await; - } - - #[tokio::test] - #[serial] - async fn test_query_default_page_limit() { - let (connection_config, _) = prep_executor_cluster().await; - test_query_default_page_limit_impl(connection_config).await; - } - - #[tokio::test] - #[serial] - async fn test_query_max_page_limit() { - test_query_max_page_limit_impl().await; - } - - #[tokio::test] - #[serial] - async fn test_query_complexity_metrics() { - test_query_complexity_metrics_impl().await; - } - - #[tokio::test] - #[serial] - async fn test_health_check() { - let _guard = telemetry_subscribers::TelemetryConfig::new() - .with_env() - .init(); - let connection_config = ConnectionConfig::ci_integration_test_cfg(); - let cluster = sui_graphql_rpc::test_infra::cluster::start_cluster( - connection_config, - None, - ServiceConfig::test_defaults(), - ) - .await; - - // We wait until checkpoint 1 is indexed, to give enough time to the - // watermark task to pick up a valid checkpoint timestamp. 
- cluster - .wait_for_checkpoint_catchup(1, Duration::from_secs(10)) - .await; - test_health_check_impl().await; - cluster.cleanup_resources().await - } - - #[tokio::test] - #[serial] - async fn test_payload_total_exceeded() { - test_payload_total_exceeded_impl().await; - } - - #[tokio::test] - #[serial] - async fn test_payload_read_exceeded() { - test_payload_read_exceeded_impl().await; - } - - #[tokio::test] - #[serial] - async fn test_payload_mutation_exceeded() { - test_payload_mutation_exceeded_impl().await; - } - - #[tokio::test] - #[serial] - async fn test_payload_dry_run_exceeded() { - test_payload_dry_run_exceeded_impl().await; - } - - #[tokio::test] - #[serial] - async fn test_payload_using_vars_mutation_exceeded() { - test_payload_using_vars_mutation_exceeded_impl().await; - } - - #[tokio::test] - #[serial] - async fn test_payload_using_vars_read_exceeded() { - test_payload_using_vars_read_exceeded_impl().await; - } - - #[tokio::test] - #[serial] - async fn test_payload_using_vars_dry_run_read_exceeded() { - test_payload_using_vars_dry_run_read_exceeded_impl().await; - } - - #[tokio::test] - #[serial] - async fn test_payload_using_vars_dry_run_exceeded() { - test_payload_using_vars_dry_run_exceeded_impl().await; - } - #[tokio::test] - #[serial] - async fn test_payload_multiple_execution_exceeded() { - test_payload_multiple_execution_exceeded_impl().await; - } - - #[tokio::test] - #[serial] - async fn test_payload_multiple_dry_run_exceeded() { - test_payload_multiple_dry_run_exceeded_impl().await; - } - - #[tokio::test] - #[serial] - async fn test_payload_execution_multiple_sigs_exceeded() { - test_payload_execution_multiple_sigs_exceeded_impl().await; - } - - #[tokio::test] - #[serial] - async fn test_payload_sig_var_execution_exceeded() { - test_payload_sig_var_execution_exceeded_impl().await; - } - - #[tokio::test] - #[serial] - async fn test_payload_reusing_vars_execution() { - test_payload_reusing_vars_execution_impl().await; - } - - #[tokio::test] - #[serial] - async fn test_payload_reusing_vars_dry_run() { - test_payload_reusing_vars_dry_run_impl().await; - } - - #[tokio::test] - #[serial] - async fn test_payload_named_fragment_execution_exceeded() { - test_payload_named_fragment_execution_exceeded_impl().await; - } - - #[tokio::test] - #[serial] - async fn test_payload_inline_fragment_execution_exceeded() { - test_payload_inline_fragment_execution_exceeded_impl().await; - } - - #[tokio::test] - #[serial] - async fn test_payload_named_fragment_dry_run_exceeded() { - test_payload_named_fragment_dry_run_exceeded_impl().await; - } - - #[tokio::test] - #[serial] - async fn test_payload_inline_fragment_dry_run_exceeded() { - test_payload_inline_fragment_dry_run_exceeded_impl().await; - } - - #[tokio::test] - #[serial] async fn test_payload_using_vars_mutation_passes() { - let _guard = telemetry_subscribers::TelemetryConfig::new() - .with_env() - .init(); - let cluster = sui_graphql_rpc::test_infra::cluster::start_cluster( - ConnectionConfig::ci_integration_test_cfg(), - None, - ServiceConfig { - limits: Limits { - max_query_payload_size: 5000, - max_tx_payload_size: 6000, - ..Default::default() - }, - ..ServiceConfig::test_defaults() + telemetry_subscribers::init_for_testing(); + let cluster = sui_graphql_rpc::test_infra::cluster::start_cluster(ServiceConfig { + limits: Limits { + max_query_payload_size: 5000, + max_tx_payload_size: 6000, + ..Default::default() }, - ) + ..ServiceConfig::test_defaults() + }) .await; let addresses = cluster .network diff --git 
a/crates/sui-graphql-rpc/tests/examples_validation_tests.rs b/crates/sui-graphql-rpc/tests/examples_validation_tests.rs index 4ad8e9bb752d9..e0599c69405b2 100644 --- a/crates/sui-graphql-rpc/tests/examples_validation_tests.rs +++ b/crates/sui-graphql-rpc/tests/examples_validation_tests.rs @@ -4,19 +4,12 @@ #[cfg(feature = "pg_integration")] mod tests { use anyhow::{anyhow, Context, Result}; - use rand::rngs::StdRng; - use rand::SeedableRng; - use serial_test::serial; - use simulacrum::Simulacrum; use std::cmp::max; use std::collections::BTreeMap; use std::fs; use std::path::PathBuf; - use std::sync::Arc; - use sui_graphql_rpc::config::{ConnectionConfig, Limits}; - use sui_graphql_rpc::test_infra::cluster::ExecutorCluster; - use sui_graphql_rpc::test_infra::cluster::DEFAULT_INTERNAL_DATA_SOURCE_PORT; - use tempfile::tempdir; + use sui_graphql_rpc::config::Limits; + use sui_graphql_rpc::test_infra::cluster::{prep_executor_cluster, ExecutorCluster}; struct Example { contents: String, @@ -142,26 +135,10 @@ mod tests { } #[tokio::test] - #[serial] async fn good_examples_within_limits() { - let rng = StdRng::from_seed([12; 32]); - let data_ingestion_path = tempdir().unwrap().into_path(); - let mut sim = Simulacrum::new_with_rng(rng); + let cluster = prep_executor_cluster().await; let (mut max_nodes, mut max_output_nodes, mut max_depth, mut max_payload) = (0, 0, 0, 0); - sim.set_data_ingestion_path(data_ingestion_path.clone()); - sim.create_checkpoint(); - - let cluster = sui_graphql_rpc::test_infra::cluster::serve_executor( - ConnectionConfig::default(), - DEFAULT_INTERNAL_DATA_SOURCE_PORT, - Arc::new(sim), - None, - None, - data_ingestion_path, - ) - .await; - let mut errors = vec![]; for (name, example) in good_examples().expect("Could not load examples") { errors.extend( @@ -209,25 +186,9 @@ mod tests { } #[tokio::test] - #[serial] async fn bad_examples_fail() { - let rng = StdRng::from_seed([12; 32]); - let data_ingestion_path = tempdir().unwrap().into_path(); - let mut sim = Simulacrum::new_with_rng(rng); + let cluster = prep_executor_cluster().await; let (mut max_nodes, mut max_output_nodes, mut max_depth, mut max_payload) = (0, 0, 0, 0); - sim.set_data_ingestion_path(data_ingestion_path.clone()); - - sim.create_checkpoint(); - - let cluster = sui_graphql_rpc::test_infra::cluster::serve_executor( - ConnectionConfig::default(), - DEFAULT_INTERNAL_DATA_SOURCE_PORT, - Arc::new(sim), - None, - None, - data_ingestion_path, - ) - .await; for (name, example) in bad_examples() { let errors = test_query( diff --git a/crates/sui-indexer/Cargo.toml b/crates/sui-indexer/Cargo.toml index 66a1f842d8f4b..b206ca5f48748 100644 --- a/crates/sui-indexer/Cargo.toml +++ b/crates/sui-indexer/Cargo.toml @@ -17,7 +17,8 @@ serde_with.workspace = true clap = { workspace = true, features = ["env"] } tap.workspace = true diesel = { workspace = true, features = ["postgres", "chrono", "r2d2", "serde_json"] } -diesel-derive-enum = { workspace = true, features = ["postgres"] } +diesel-async = { workspace = true, features = ["bb8", "postgres", "async-connection-wrapper"] } +bb8 = "0.8.5" futures.workspace = true itertools.workspace = true jsonrpsee.workspace = true diff --git a/crates/sui-indexer/src/database.rs b/crates/sui-indexer/src/database.rs new file mode 100644 index 0000000000000..f872f25369e9b --- /dev/null +++ b/crates/sui-indexer/src/database.rs @@ -0,0 +1,160 @@ +// Copyright (c) Mysten Labs, Inc. 
+// SPDX-License-Identifier: Apache-2.0 + +use std::sync::Arc; + +use diesel::prelude::ConnectionError; +use diesel_async::pooled_connection::bb8::Pool; +use diesel_async::pooled_connection::bb8::PooledConnection; +use diesel_async::pooled_connection::bb8::RunError; +use diesel_async::pooled_connection::AsyncDieselConnectionManager; +use diesel_async::pooled_connection::PoolError; +use diesel_async::RunQueryDsl; +use diesel_async::{AsyncConnection, AsyncPgConnection}; +use futures::FutureExt; +use url::Url; + +use crate::db::ConnectionConfig; +use crate::db::ConnectionPoolConfig; + +pub struct ConnectionPool { + database_url: Arc<Url>, + pool: Pool<AsyncPgConnection>, +} + +impl ConnectionPool { + pub async fn new(database_url: Url, config: ConnectionPoolConfig) -> Result<Self, PoolError> { + let database_url = Arc::new(database_url); + let connection_config = config.connection_config(); + let mut manager_config = diesel_async::pooled_connection::ManagerConfig::default(); + manager_config.custom_setup = + Box::new(move |url| establish_connection(url, connection_config).boxed()); + let manager = + AsyncDieselConnectionManager::new_with_config(database_url.as_str(), manager_config); + + Pool::builder() + .max_size(config.pool_size) + .connection_timeout(config.connection_timeout) + .build(manager) + .await + .map(|pool| Self { database_url, pool }) + } + + /// Retrieves a connection from the pool. + pub async fn get(&self) -> Result<Connection<'_>, RunError> { + self.pool.get().await.map(Connection::PooledConnection) + } + + /// Get a new dedicated connection that will not be managed by the pool. + /// An application may want a persistent connection (e.g. to do a + /// postgres LISTEN) that will not be closed or repurposed by the pool. + /// + /// This method allows reusing the manager's configuration but otherwise + /// bypassing the pool + pub async fn dedicated_connection(&self) -> Result<Connection<'static>, PoolError> { + self.pool + .dedicated_connection() + .await + .map(Connection::Dedicated) + } + + /// Returns information about the current state of the pool.
+ pub fn state(&self) -> bb8::State { + self.pool.state() + } + + /// Returns the database url that this pool is configured with + pub fn url(&self) -> &Url { + &self.database_url + } +} + +pub enum Connection<'a> { + PooledConnection(PooledConnection<'a, AsyncPgConnection>), + Dedicated(AsyncPgConnection), +} + +impl Connection<'static> { + pub async fn dedicated(database_url: &Url) -> Result<Self, ConnectionError> { + AsyncPgConnection::establish(database_url.as_str()) + .await + .map(Connection::Dedicated) + } + + /// Run the provided Migrations + pub async fn run_migrations<M>( + self, + migrations: M, + ) -> diesel::migration::Result<Vec<diesel::migration::MigrationVersion<'static>>> + where + M: diesel::migration::MigrationSource<diesel::pg::Pg> + Send + 'static, + { + use diesel::migration::MigrationVersion; + use diesel_migrations::MigrationHarness; + + let mut connection = + diesel_async::async_connection_wrapper::AsyncConnectionWrapper::<Self>::from(self); + + tokio::task::spawn_blocking(move || { + connection + .run_pending_migrations(migrations) + .map(|versions| versions.iter().map(MigrationVersion::as_owned).collect()) + }) + .await + .unwrap() + } +} + +impl<'a> std::ops::Deref for Connection<'a> { + type Target = AsyncPgConnection; + + fn deref(&self) -> &Self::Target { + match self { + Connection::PooledConnection(pooled) => pooled.deref(), + Connection::Dedicated(dedicated) => dedicated, + } + } +} + +impl<'a> std::ops::DerefMut for Connection<'a> { + fn deref_mut(&mut self) -> &mut AsyncPgConnection { + match self { + Connection::PooledConnection(pooled) => pooled.deref_mut(), + Connection::Dedicated(dedicated) => dedicated, + } + } +} + +impl ConnectionConfig { + async fn apply(&self, connection: &mut AsyncPgConnection) -> Result<(), diesel::result::Error> { + diesel::sql_query(format!( + "SET statement_timeout = {}", + self.statement_timeout.as_millis(), + )) + .execute(connection) + .await?; + + if self.read_only { + diesel::sql_query("SET default_transaction_read_only = 'on'") + .execute(connection) + .await?; + } + + Ok(()) + } +} + +/// Function used by the Connection Pool Manager to establish and setup new connections +async fn establish_connection( + url: &str, + config: ConnectionConfig, +) -> Result<AsyncPgConnection, ConnectionError> { + let mut connection = AsyncPgConnection::establish(url).await?; + + config + .apply(&mut connection) + .await + .map_err(ConnectionError::CouldntSetupConfiguration)?; + + Ok(connection) +} diff --git a/crates/sui-indexer/src/db.rs b/crates/sui-indexer/src/db.rs index 8f38a66023c4f..9f7798df678a5 100644 --- a/crates/sui-indexer/src/db.rs +++ b/crates/sui-indexer/src/db.rs @@ -36,7 +36,7 @@ impl ConnectionPoolConfig { const DEFAULT_CONNECTION_TIMEOUT: u64 = 30; const DEFAULT_STATEMENT_TIMEOUT: u64 = 3600; - fn connection_config(&self) -> ConnectionConfig { + pub(crate) fn connection_config(&self) -> ConnectionConfig { ConnectionConfig { statement_timeout: self.statement_timeout, read_only: false, diff --git a/crates/sui-indexer/src/lib.rs b/crates/sui-indexer/src/lib.rs index a9d39609049b8..6de984c53022c 100644 --- a/crates/sui-indexer/src/lib.rs +++ b/crates/sui-indexer/src/lib.rs @@ -26,6 +26,7 @@ use crate::indexer_reader::IndexerReader; use errors::IndexerError; pub mod apis; +pub mod database; pub mod db; pub mod errors; pub mod handlers; @@ -36,6 +37,7 @@ pub mod models; pub mod schema; pub mod store; pub mod system_package_task; +pub mod tempdb; pub mod test_utils; pub mod types; diff --git a/crates/sui-indexer/src/store/indexer_store.rs b/crates/sui-indexer/src/store/indexer_store.rs index 2fd2c1531b76a..b7edb28a0fbd1 100644 ---
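For orientation, here is a minimal usage sketch of the new async pool in `crates/sui-indexer/src/database.rs`. `ConnectionPool`, `Connection`, and `ConnectionPoolConfig` come from this diff, and `ConnectionPoolConfig::default()` is used elsewhere in this PR; the database URL and query below are placeholders, so treat this as a sketch rather than code from the change set.

```rust
// Minimal sketch of the new diesel-async/bb8 pool API (not code from this PR).
use diesel_async::RunQueryDsl;
use sui_indexer::database::ConnectionPool;
use sui_indexer::db::ConnectionPoolConfig;
use url::Url;

async fn ping(database_url: Url) -> anyhow::Result<()> {
    // Build a pool whose connections are set up via establish_connection above.
    let pool = ConnectionPool::new(database_url, ConnectionPoolConfig::default()).await?;

    // `get` checks a connection out of the bb8 pool; it is returned when dropped.
    let mut conn = pool.get().await?;

    // `&mut *conn` derefs the Connection wrapper to the underlying AsyncPgConnection.
    diesel::sql_query("SELECT 1").execute(&mut *conn).await?;
    Ok(())
}
```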
a/crates/sui-indexer/src/store/indexer_store.rs +++ b/crates/sui-indexer/src/store/indexer_store.rs @@ -1,7 +1,6 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::any::Any; use std::collections::BTreeMap; use async_trait::async_trait; @@ -21,7 +20,7 @@ pub enum ObjectChangeToCommit { } #[async_trait] -pub trait IndexerStore: Any + Clone + Sync + Send + 'static { +pub trait IndexerStore: Clone + Sync + Send + 'static { async fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError>; async fn get_available_epoch_range(&self) -> Result<(u64, u64), IndexerError>; @@ -94,6 +93,4 @@ pub trait IndexerStore: Any + Clone + Sync + Send + 'static { &self, epoch: u64, ) -> Result; - - fn as_any(&self) -> &dyn Any; } diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs index 3a0a75a6975cc..9b2af81873db2 100644 --- a/crates/sui-indexer/src/store/pg_indexer_store.rs +++ b/crates/sui-indexer/src/store/pg_indexer_store.rs @@ -1,7 +1,6 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::any::Any as StdAny; use std::collections::hash_map::Entry; use std::collections::BTreeMap; use std::collections::HashMap; @@ -2175,10 +2174,6 @@ impl IndexerStore for PgIndexerStore { .await } - fn as_any(&self) -> &dyn StdAny { - self - } - /// Persist protocol configs and feature flags until the protocol version for the latest epoch /// we have stored in the db, inclusive. fn persist_protocol_configs_and_feature_flags( diff --git a/crates/sui-indexer/src/tempdb.rs b/crates/sui-indexer/src/tempdb.rs new file mode 100644 index 0000000000000..c7c025cef1b82 --- /dev/null +++ b/crates/sui-indexer/src/tempdb.rs @@ -0,0 +1,340 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::anyhow; +use anyhow::Context; +use anyhow::Result; +use std::fs::OpenOptions; +use std::{ + path::{Path, PathBuf}, + process::{Child, Command}, + time::{Duration, Instant}, +}; +use tracing::trace; +use url::Url; + +/// A temporary, local postgres database +pub struct TempDb { + database: LocalDatabase, + + // Directory used for the ephemeral database. + // + // On drop the directory will be cleaned and its contents deleted. + // + // NOTE: This needs to be the last entry in this struct so that the database is dropped first + // and has a chance to gracefully shut down before the directory is deleted. + dir: tempfile::TempDir, +} + +impl TempDb { + /// Create and start a new temporary postgres database. + /// + /// A fresh database will be initialized in a temporary directory that will be cleaned up on drop. + /// The running `postgres` service will be serving traffic on an available, os-assigned port.
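The NOTE above relies on Rust's guarantee that struct fields are dropped in declaration order. A standalone sketch of that behavior, with illustrative names that are not part of this PR:

```rust
// Standalone illustration of field drop order; not code from this PR.
struct Loud(&'static str);

impl Drop for Loud {
    fn drop(&mut self) {
        println!("dropping {}", self.0);
    }
}

struct TempDbLike {
    database: Loud, // declared first, dropped first: postgres can shut down cleanly
    dir: Loud,      // dropped second: only then is the data directory removed
}

fn main() {
    let _db = TempDbLike {
        database: Loud("database"),
        dir: Loud("dir"),
    };
    // Prints "dropping database" then "dropping dir".
}
```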
+ pub fn new() -> Result<Self> { + let dir = tempfile::TempDir::new()?; + let port = get_available_port(); + + let database = LocalDatabase::new_initdb(dir.path().to_owned(), port)?; + + Ok(Self { dir, database }) + } + + pub fn database(&self) -> &LocalDatabase { + &self.database + } + + pub fn database_mut(&mut self) -> &mut LocalDatabase { + &mut self.database + } + + pub fn dir(&self) -> &Path { + self.dir.path() + } +} + +#[derive(Debug)] +struct PostgresProcess { + dir: PathBuf, + inner: Child, +} + +impl PostgresProcess { + fn start(dir: PathBuf, port: u16) -> Result<Self> { + let child = Command::new("postgres") + // Set the data directory to use + .arg("-D") + .arg(&dir) + // Set the port to listen for incoming connections + .args(["-p", &port.to_string()]) + // Disable creating and listening on a UDS + .args(["-c", "unix_socket_directories="]) + // pipe stdout and stderr to files located in the data directory + .stdout( + OpenOptions::new() + .create(true) + .append(true) + .open(dir.join("stdout"))?, + ) + .stderr( + OpenOptions::new() + .create(true) + .append(true) + .open(dir.join("stderr"))?, + ) + .spawn() + .context("command not found: postgres")?; + + Ok(Self { dir, inner: child }) + } + + // https://www.postgresql.org/docs/16/app-pg-ctl.html + fn pg_ctl_stop(&mut self) -> Result<()> { + let output = Command::new("pg_ctl") + .arg("stop") + .arg("-D") + .arg(&self.dir) + .arg("-mfast") + .output() + .context("command not found: pg_ctl")?; + + if output.status.success() { + Ok(()) + } else { + Err(anyhow!("couldn't shut down postgres")) + } + } + + fn dump_stdout_stderr(&self) -> Result<(String, String)> { + let stdout = std::fs::read_to_string(self.dir.join("stdout"))?; + let stderr = std::fs::read_to_string(self.dir.join("stderr"))?; + + Ok((stdout, stderr)) + } +} + +impl Drop for PostgresProcess { + // When the Process struct goes out of scope we need to kill the child process + fn drop(&mut self) { + tracing::error!("dropping postgres"); + // check if the process has already been terminated + match self.inner.try_wait() { + // The child process has already terminated, perhaps due to a crash + Ok(Some(_)) => {} + + // The process is still running so we need to attempt to kill it + _ => { + if self.pg_ctl_stop().is_err() { + // Couldn't gracefully stop server so we'll just kill it + self.inner.kill().expect("postgres couldn't be killed"); + } + self.inner.wait().unwrap(); + } + } + + // Dump the contents of stdout/stderr if TRACE is enabled + if tracing::event_enabled!(tracing::Level::TRACE) { + if let Ok((stdout, stderr)) = self.dump_stdout_stderr() { + trace!("stdout: {stdout}"); + trace!("stderr: {stderr}"); + } + } + } +} + +/// Local instance of a `postgres` server. +/// +/// See <https://www.postgresql.org/docs/16/app-postgres.html> for more info. +pub struct LocalDatabase { + dir: PathBuf, + port: u16, + url: Url, + process: Option<PostgresProcess>, +} + +impl LocalDatabase { + /// Start a local `postgres` database service. + /// + /// `dir`: The location of the on-disk postgres database. The database must already exist at + /// the provided path. If you instead want to create a new database see `Self::new_initdb`. + /// + /// `port`: The port to listen for incoming connections on. + pub fn new(dir: PathBuf, port: u16) -> Result<Self> { + let url = format!( + "postgres://postgres:postgrespw@localhost:{port}/{db_name}", + db_name = "postgres" + ) + .parse() + .unwrap(); + let mut db = Self { + dir, + port, + url, + process: None, + }; + db.start()?; + Ok(db) + } + + /// Initialize and start a local `postgres` database service.
+ /// + /// Unlike `Self::new`, this will initialize a clean database at the provided path. + pub fn new_initdb(dir: PathBuf, port: u16) -> Result<Self> { + initdb(&dir)?; + Self::new(dir, port) + } + + /// Return the url used to connect to the database + pub fn url(&self) -> &Url { + &self.url + } + + fn start(&mut self) -> Result<()> { + if self.process.is_none() { + self.process = Some(PostgresProcess::start(self.dir.clone(), self.port)?); + self.wait_till_ready() + .map_err(|e| anyhow!("unable to start postgres: {e:?}"))?; + } + + Ok(()) + } + + fn health_check(&mut self) -> Result<(), HealthCheckError> { + if let Some(p) = &mut self.process { + match p.inner.try_wait() { + // This would mean the child process has crashed + Ok(Some(_)) => Err(HealthCheckError::NotRunning), + + // This is the case where the process is still running + Ok(None) => pg_isready(self.port), + + // Some other unknown error + Err(e) => Err(HealthCheckError::Unknown(e.to_string())), + } + } else { + Err(HealthCheckError::NotRunning) + } + } + + fn wait_till_ready(&mut self) -> Result<(), HealthCheckError> { + let start = Instant::now(); + + while start.elapsed() < Duration::from_secs(10) { + match self.health_check() { + Ok(()) => return Ok(()), + Err(HealthCheckError::NotReady) => {} + Err(HealthCheckError::NotRunning | HealthCheckError::Unknown(_)) => break, + } + + std::thread::sleep(Duration::from_millis(50)); + } + + Err(HealthCheckError::Unknown( + "timeout reached when waiting for service to be ready".to_owned(), + )) + } +} + +#[derive(Debug)] +enum HealthCheckError { + NotRunning, + NotReady, + #[allow(unused)] + Unknown(String), +} + +/// Run the postgres `pg_isready` command to get the status of the database +/// +/// See <https://www.postgresql.org/docs/16/app-pg-isready.html> for more info +fn pg_isready(port: u16) -> Result<(), HealthCheckError> { + let output = Command::new("pg_isready") + .arg("--host=localhost") + .arg("-p") + .arg(port.to_string()) + .arg("--username=postgres") + .output() + .map_err(|e| HealthCheckError::Unknown(format!("command not found: pg_isready: {e}")))?; + + trace!("pg_isready code: {:?}", output.status.code()); + trace!("pg_isready output: {}", output.stderr.escape_ascii()); + trace!("pg_isready output: {}", output.stdout.escape_ascii()); + if output.status.success() { + Ok(()) + } else { + Err(HealthCheckError::NotReady) + } +} + +/// Run the postgres `initdb` command to initialize a database at the provided path +/// +/// See <https://www.postgresql.org/docs/16/app-initdb.html> for more info +fn initdb(dir: &Path) -> Result<()> { + let output = Command::new("initdb") + .arg("-D") + .arg(dir) + .arg("--no-instructions") + .arg("--username=postgres") + .output() + .context("command not found: initdb")?; + + if output.status.success() { + Ok(()) + } else { + Err(anyhow!("unable to initialize database")) + } +} + +/// Return an ephemeral, available port. On unix systems, the port returned will be in the +/// TIME_WAIT state ensuring that the OS won't hand out this port for some grace period. +/// Callers should be able to bind to this port given they use SO_REUSEADDR.
+pub fn get_available_port() -> u16 { + const MAX_PORT_RETRIES: u32 = 1000; + + for _ in 0..MAX_PORT_RETRIES { + if let Ok(port) = get_ephemeral_port() { + return port; + } + } + + panic!("Error: could not find an available port"); +} + +fn get_ephemeral_port() -> std::io::Result<u16> { + // Request a random available port from the OS + let listener = std::net::TcpListener::bind(("127.0.0.1", 0))?; + let addr = listener.local_addr()?; + + // Create and accept a connection (which we'll promptly drop) in order to force the port + // into the TIME_WAIT state, ensuring that the port will be reserved for some limited + // amount of time (roughly 60s on some Linux systems) + let _sender = std::net::TcpStream::connect(addr)?; + let _incoming = listener.accept()?; + + Ok(addr.port()) +} + +#[cfg(test)] +mod test { + #[tokio::test] + async fn smoketest() { + use crate::database::Connection; + use crate::tempdb::TempDb; + use diesel_async::RunQueryDsl; + + telemetry_subscribers::init_for_testing(); + + let db = TempDb::new().unwrap(); + println!("dir: {:?}", db.dir.path()); + + let url = db.database.url(); + println!("url: {}", url.as_str()); + let mut connection = Connection::dedicated(url).await.unwrap(); + + // Run a simple query to verify the db can properly be queried + let resp = diesel::sql_query("SELECT datname FROM pg_database") + .execute(&mut connection) + .await + .unwrap(); + println!("resp: {:?}", resp); + } +} diff --git a/crates/sui-indexer/src/test_utils.rs b/crates/sui-indexer/src/test_utils.rs index 076082e646866..112bae5f3d088 100644 --- a/crates/sui-indexer/src/test_utils.rs +++ b/crates/sui-indexer/src/test_utils.rs @@ -94,8 +94,8 @@ pub async fn start_test_indexer_impl( // to prevent maxing out let pool_config = ConnectionPoolConfig { pool_size: 5, - connection_timeout: Duration::from_secs(5), - statement_timeout: Duration::from_secs(5), + connection_timeout: Duration::from_secs(10), + statement_timeout: Duration::from_secs(30), }; println!("db_url: {db_url}"); @@ -184,22 +184,6 @@ fn replace_db_name(db_url: &str, new_db_name: &str) -> (String, String) { ) } -pub async fn force_delete_database(db_url: String) { - // Replace the database name with the default `postgres`, which should be the last string after `/` - // This is necessary because you can't drop a database while being connected to it. - // Hence switch to the default `postgres` database to drop the active database.
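A hedged sketch of how `TempDb` and `Connection::run_migrations` can be combined to hand a test a freshly migrated database. The `MIGRATIONS` constant and the `"migrations"` path are assumptions for illustration; this PR does not show where the crate's embedded migrations are declared.

```rust
// Sketch only: combine TempDb with Connection::run_migrations to get a fresh,
// fully migrated database. The MIGRATIONS constant and its path are assumed.
use diesel_migrations::{embed_migrations, EmbeddedMigrations};
use sui_indexer::database::Connection;
use sui_indexer::tempdb::TempDb;

const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");

async fn fresh_migrated_db() -> anyhow::Result<TempDb> {
    // Start an ephemeral postgres server in a temporary directory.
    let db = TempDb::new()?;

    // A dedicated (non-pooled) connection is enough to apply migrations.
    let conn = Connection::dedicated(db.database().url()).await?;
    conn.run_migrations(MIGRATIONS)
        .await
        .map_err(|e| anyhow::anyhow!("running migrations failed: {e}"))?;

    Ok(db)
}
```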
- let (default_db_url, db_name) = replace_db_name(&db_url, "postgres"); - let mut pool_config = ConnectionPoolConfig::default(); - pool_config.set_pool_size(1); - - let blocking_pool = new_connection_pool(&default_db_url, &pool_config).unwrap(); - blocking_pool - .get() - .unwrap() - .batch_execute(&format!("DROP DATABASE IF EXISTS {} WITH (FORCE)", db_name)) - .unwrap(); -} - #[derive(Clone)] pub struct SuiTransactionBlockResponseBuilder<'a> { response: SuiTransactionBlockResponse, diff --git a/crates/sui-indexer/tests/ingestion_tests.rs b/crates/sui-indexer/tests/ingestion_tests.rs index 0d4807f394754..77764db6fa94b 100644 --- a/crates/sui-indexer/tests/ingestion_tests.rs +++ b/crates/sui-indexer/tests/ingestion_tests.rs @@ -16,6 +16,8 @@ mod ingestion_tests { use sui_indexer::models::{objects::StoredObject, transactions::StoredTransaction}; use sui_indexer::schema::{objects, transactions}; use sui_indexer::store::{indexer_store::IndexerStore, PgIndexerStore}; + use sui_indexer::tempdb::get_available_port; + use sui_indexer::tempdb::TempDb; use sui_indexer::test_utils::{start_test_indexer, ReaderWriterConfig}; use sui_types::base_types::SuiAddress; use sui_types::effects::TransactionEffectsAPI; @@ -35,9 +37,6 @@ mod ingestion_tests { }}; } - const DEFAULT_SERVER_PORT: u16 = 3000; - const DEFAULT_DB_URL: &str = "postgres://postgres:postgrespw@localhost:5432/sui_indexer"; - /// Set up a test indexer fetching from a REST endpoint served by the given Simulacrum. async fn set_up( sim: Arc, @@ -46,8 +45,10 @@ mod ingestion_tests { JoinHandle<()>, PgIndexerStore, JoinHandle>, + TempDb, ) { - let server_url: SocketAddr = format!("127.0.0.1:{}", DEFAULT_SERVER_PORT) + let database = TempDb::new().unwrap(); + let server_url: SocketAddr = format!("127.0.0.1:{}", get_available_port()) .parse() .unwrap(); @@ -58,13 +59,13 @@ mod ingestion_tests { }); // Starts indexer let (pg_store, pg_handle, _) = start_test_indexer( - Some(DEFAULT_DB_URL.to_owned()), + Some(database.database().url().as_str().to_owned()), format!("http://{}", server_url), ReaderWriterConfig::writer_mode(None, None), data_ingestion_path, ) .await; - (server_handle, pg_store, pg_handle) + (server_handle, pg_store, pg_handle, database) } /// Wait for the indexer to catch up to the given checkpoint sequence number. @@ -80,7 +81,7 @@ mod ingestion_tests { .unwrap(); cp_opt.is_none() || (cp_opt.unwrap() < checkpoint_sequence_number) } { - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_millis(100)).await; } }) .await @@ -90,8 +91,9 @@ mod ingestion_tests { #[tokio::test] pub async fn test_transaction_table() -> Result<(), IndexerError> { + let tempdir = tempdir().unwrap(); let mut sim = Simulacrum::new(); - let data_ingestion_path = tempdir().unwrap().into_path(); + let data_ingestion_path = tempdir.path().to_path_buf(); sim.set_data_ingestion_path(data_ingestion_path.clone()); // Execute a simple transaction. @@ -103,7 +105,7 @@ mod ingestion_tests { // Create a checkpoint which should include the transaction we executed. let checkpoint = sim.create_checkpoint(); - let (_, pg_store, _) = set_up(Arc::new(sim), data_ingestion_path).await; + let (_, pg_store, _, _database) = set_up(Arc::new(sim), data_ingestion_path).await; // Wait for the indexer to catch up to the checkpoint. 
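Note the `_database` binding in the updated call sites: the `TempDb` returned by `set_up` has to stay bound so that its `Drop` implementation does not shut the ephemeral postgres server down before the test finishes. A standalone illustration of the difference between `_` and a named binding, with illustrative names not taken from this PR:

```rust
// Standalone illustration; not code from this PR.
struct Ephemeral;

impl Drop for Ephemeral {
    fn drop(&mut self) {
        println!("postgres shut down, data dir deleted");
    }
}

fn make() -> Ephemeral {
    Ephemeral
}

fn main() {
    let _ = make();     // not bound: dropped immediately, before the "test" runs
    let _kept = make(); // bound: lives until the end of the enclosing scope
    println!("test body runs here");
}
```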
wait_for_checkpoint(&pg_store, 1).await?; @@ -135,8 +137,9 @@ mod ingestion_tests { #[tokio::test] pub async fn test_object_type() -> Result<(), IndexerError> { + let tempdir = tempdir().unwrap(); let mut sim = Simulacrum::new(); - let data_ingestion_path = tempdir().unwrap().into_path(); + let data_ingestion_path = tempdir.path().to_path_buf(); sim.set_data_ingestion_path(data_ingestion_path.clone()); // Execute a simple transaction. @@ -148,7 +151,7 @@ mod ingestion_tests { // Create a checkpoint which should include the transaction we executed. let _ = sim.create_checkpoint(); - let (_, pg_store, _) = set_up(Arc::new(sim), data_ingestion_path).await; + let (_, pg_store, _, _database) = set_up(Arc::new(sim), data_ingestion_path).await; // Wait for the indexer to catch up to the checkpoint. wait_for_checkpoint(&pg_store, 1).await?; diff --git a/crates/sui-transactional-test-runner/src/test_adapter.rs b/crates/sui-transactional-test-runner/src/test_adapter.rs index fce7f09ac3cb9..0137e16d29fab 100644 --- a/crates/sui-transactional-test-runner/src/test_adapter.rs +++ b/crates/sui-transactional-test-runner/src/test_adapter.rs @@ -40,9 +40,6 @@ use move_vm_runtime::session::SerializedReturnValues; use once_cell::sync::Lazy; use rand::{rngs::StdRng, Rng, SeedableRng}; use std::fmt::{self, Write}; -use std::hash::Hash; -use std::hash::Hasher; -use std::path::PathBuf; use std::time::Duration; use std::{ collections::{BTreeMap, BTreeSet}, @@ -52,7 +49,6 @@ use std::{ use sui_core::authority::test_authority_builder::TestAuthorityBuilder; use sui_core::authority::AuthorityState; use sui_framework::DEFAULT_FRAMEWORK_PATH; -use sui_graphql_rpc::config::ConnectionConfig; use sui_graphql_rpc::test_infra::cluster::ExecutorCluster; use sui_graphql_rpc::test_infra::cluster::{serve_executor, SnapshotLagConfig}; use sui_json_rpc_api::QUERY_MAX_RESULT_LIMIT; @@ -229,7 +225,7 @@ impl<'a> MoveTestAdapter<'a> for SuiTestAdapter { Self::ExtraInitArgs, )>, >, - path: &Path, + _path: &Path, ) -> (Self, Option) { let rng = StdRng::from_seed(RNG_SEED); assert!( @@ -341,7 +337,6 @@ impl<'a> MoveTestAdapter<'a> for SuiTestAdapter { custom_validator_account, reference_gas_price, snapshot_config, - path.to_path_buf(), epochs_to_keep, ) .await @@ -2103,7 +2098,6 @@ async fn init_sim_executor( custom_validator_account: bool, reference_gas_price: Option, snapshot_config: SnapshotLagConfig, - test_file_path: PathBuf, epochs_to_keep: Option, ) -> ( Box, @@ -2173,26 +2167,7 @@ async fn init_sim_executor( let data_ingestion_path = tempdir().unwrap().into_path(); sim.set_data_ingestion_path(data_ingestion_path.clone()); - // Hash the file path to create custom unique DB name - let mut hasher = std::collections::hash_map::DefaultHasher::new(); - test_file_path.hash(&mut hasher); - let hash = hasher.finish(); - let db_name = format!("sui_graphql_test_{}", hash); - - // Use the hash as a seed to generate a random port number - let base_port = hash as u16 % 8192; - - let graphql_port = 20000 + base_port; - let graphql_prom_port = graphql_port + 1; - let internal_data_port = graphql_prom_port + 1; - let cluster = serve_executor( - ConnectionConfig::ci_integration_test_cfg_with_db_name( - db_name, - graphql_port, - graphql_prom_port, - ), - internal_data_port, Arc::new(read_replica), Some(snapshot_config), epochs_to_keep,