diff --git a/.github/workflows/reuse.linux.yml b/.github/workflows/reuse.linux.yml index 51695f7813311..88b66953c5e73 100644 --- a/.github/workflows/reuse.linux.yml +++ b/.github/workflows/reuse.linux.yml @@ -120,6 +120,9 @@ jobs: needs: [ build, check ] steps: - uses: actions/checkout@v4 + - uses: ./.github/actions/setup_license + with: + runner_provider: ${{ inputs.runner_provider }} - uses: ./.github/actions/test_metactl timeout-minutes: 10 @@ -128,6 +131,9 @@ jobs: needs: [ build, check ] steps: - uses: actions/checkout@v4 + - uses: ./.github/actions/setup_license + with: + runner_provider: ${{ inputs.runner_provider }} - uses: ./.github/actions/test_compat_meta_query timeout-minutes: 10 @@ -136,6 +142,9 @@ jobs: needs: [ build, check ] steps: - uses: actions/checkout@v4 + - uses: ./.github/actions/setup_license + with: + runner_provider: ${{ inputs.runner_provider }} - uses: ./.github/actions/test_compat_fuse timeout-minutes: 20 @@ -144,6 +153,9 @@ jobs: needs: [ build, check ] steps: - uses: actions/checkout@v4 + - uses: ./.github/actions/setup_license + with: + runner_provider: ${{ inputs.runner_provider }} - uses: ./.github/actions/test_compat_meta_meta timeout-minutes: 20 @@ -160,6 +172,9 @@ jobs: needs: [ build, check ] steps: - uses: actions/checkout@v4 + - uses: ./.github/actions/setup_license + with: + runner_provider: ${{ inputs.runner_provider }} - uses: ./.github/actions/test_meta_cluster timeout-minutes: 10 diff --git a/Cargo.lock b/Cargo.lock index 095c067a7238f..611ee2ffbd6a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4868,7 +4868,12 @@ dependencies = [ name = "databend-enterprise-meta" version = "0.1.0" dependencies = [ + "anyhow", + "databend-common-base", "databend-common-building", + "databend-common-license", + "jwt-simple 0.11.9", + "log", ] [[package]] @@ -5002,6 +5007,7 @@ dependencies = [ "databend-common-building", "databend-common-grpc", "databend-common-http", + "databend-common-license", "databend-common-meta-api", "databend-common-meta-client", "databend-common-meta-kvapi", @@ -5011,6 +5017,7 @@ dependencies = [ "databend-common-meta-types", "databend-common-metrics", "databend-common-tracing", + "databend-enterprise-meta", "deepsize", "derive_more", "env_logger", diff --git a/src/meta/ee/Cargo.toml b/src/meta/ee/Cargo.toml index e1635b15a063e..40871bb186407 100644 --- a/src/meta/ee/Cargo.toml +++ b/src/meta/ee/Cargo.toml @@ -12,6 +12,11 @@ doctest = false test = true [dependencies] +anyhow = { workspace = true } +databend-common-base = { workspace = true } +databend-common-license = { workspace = true } +jwt-simple = { workspace = true } +log = { workspace = true } [build-dependencies] databend-common-building = { workspace = true } diff --git a/src/meta/ee/src/lib.rs b/src/meta/ee/src/lib.rs index 793348751919e..67a7dd1c53aa5 100644 --- a/src/meta/ee/src/lib.rs +++ b/src/meta/ee/src/lib.rs @@ -11,3 +11,120 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + +use std::sync::Arc; +use std::sync::Mutex; +use std::time::Duration; + +use databend_common_base::display::display_unix_epoch::DisplayUnixTimeStampExt; +use databend_common_license::license::LicenseInfo; +use jwt_simple::algorithms::ECDSAP256KeyPairLike; +use jwt_simple::algorithms::ECDSAP256PublicKeyLike; +use jwt_simple::algorithms::ES256KeyPair; +use jwt_simple::algorithms::ES256PublicKey; +use jwt_simple::claims::Claims; +use jwt_simple::claims::JWTClaims; +use jwt_simple::prelude::Clock; + +#[derive(Clone, Default)] +pub struct MetaServiceEnterpriseGate { + /// License ES256 signed jwt claim token. + license_token: Arc>>, + + /// The public key to verify the license token. + public_key: String, +} + +impl MetaServiceEnterpriseGate { + const LICENSE_PUBLIC_KEY: &'static str = r#"-----BEGIN PUBLIC KEY----- +MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEGsKCbhXU7j56VKZ7piDlLXGhud0a +pWjW3wxSdeARerxs/BeoWK7FspDtfLaAT8iJe4YEmR0JpkRQ8foWs0ve3w== +-----END PUBLIC KEY-----"#; + + pub fn new(token: Option) -> Self { + Self { + license_token: Arc::new(Mutex::new(token)), + public_key: Self::LICENSE_PUBLIC_KEY.to_string(), + } + } + + /// Create with a temp license for testing. + pub fn new_testing() -> Self { + let lic = LicenseInfo { + r#type: Some("trial".to_string()), + org: Some("databend".to_string()), + tenants: Some(vec!["test".to_string()]), + features: None, + }; + + let key_pair = ES256KeyPair::generate(); + let claims = Claims::with_custom_claims(lic, jwt_simple::prelude::Duration::from_hours(2)); + let token = key_pair.sign(claims).unwrap(); + + let public_key = key_pair.public_key().to_pem().unwrap(); + + Self { + license_token: Arc::new(Mutex::new(Some(token))), + public_key, + } + } + + /// Parse the JWT token and restore the claims. + pub fn parse_jwt_token(&self, raw: &str) -> Result, anyhow::Error> { + let public_key = ES256PublicKey::from_pem(&self.public_key)?; + + let claim = public_key.verify_token::(raw, None)?; + + Ok(claim) + } + + fn check_license(&self, raw: &str) -> Result<(), anyhow::Error> { + let claim = self.parse_jwt_token(raw)?; + + let now = Clock::now_since_epoch(); + + if Some(now) > claim.expires_at { + let expires_at = claim.expires_at.unwrap_or_default(); + let unix_timestamp = Duration::from_millis(expires_at.as_millis()); + + return Err(anyhow::anyhow!(format!( + "License is expired at: {}", + unix_timestamp.display_unix_timestamp() + ))); + } + + Ok(()) + } + + pub fn assert_cluster_enabled(&self) -> Result<(), anyhow::Error> { + let token = { + let x = self.license_token.lock().unwrap(); + x.clone() + }; + + let Some(token) = token.as_ref() else { + log::error!( + "No license set in config `databend_enterprise_license`, clustering is disabled", + ); + return Err(anyhow::anyhow!( + "No license set in config `databend_enterprise_license`, clustering is disabled" + )); + }; + + if let Err(e) = self.check_license(token) { + log::error!("Check license failed: {}", e); + return Err(e); + } + + Ok(()) + } + + pub fn update_license(&self, license: String) -> Result<(), anyhow::Error> { + self.check_license(&license)?; + + let mut x = self.license_token.lock().unwrap(); + *x = Some(license); + + Ok(()) + } +} diff --git a/src/meta/raft-store/src/config.rs b/src/meta/raft-store/src/config.rs index 9c4f383cfa46d..ad0a4061493d1 100644 --- a/src/meta/raft-store/src/config.rs +++ b/src/meta/raft-store/src/config.rs @@ -131,6 +131,12 @@ pub struct RaftConfig { /// Max timeout(in milli seconds) when waiting a cluster leader. pub wait_leader_timeout: u64, + + /// License token in string to enable enterprise features(including: `cluster`) + pub databend_enterprise_license: Option, + + /// For test only: whether to fake an enterprise license. + pub fake_ee_license: bool, } pub fn get_default_raft_advertise_host() -> String { @@ -172,6 +178,8 @@ impl Default for RaftConfig { sled_max_cache_size_mb: 10 * 1024, cluster_name: "foo_cluster".to_string(), wait_leader_timeout: 70000, + databend_enterprise_license: None, + fake_ee_license: false, } } } diff --git a/src/meta/service/Cargo.toml b/src/meta/service/Cargo.toml index d4f541a72bb01..e4db9231ee9b0 100644 --- a/src/meta/service/Cargo.toml +++ b/src/meta/service/Cargo.toml @@ -31,6 +31,7 @@ databend-common-arrow = { workspace = true } databend-common-base = { workspace = true } databend-common-grpc = { workspace = true } databend-common-http = { workspace = true } +databend-common-license = { workspace = true } databend-common-meta-api = { workspace = true } databend-common-meta-client = { workspace = true } databend-common-meta-kvapi = { workspace = true } @@ -40,6 +41,7 @@ databend-common-meta-stoerr = { workspace = true } databend-common-meta-types = { workspace = true } databend-common-metrics = { workspace = true } databend-common-tracing = { workspace = true } +databend-enterprise-meta = { workspace = true } deepsize = { workspace = true } derive_more = { workspace = true } fastrace = { workspace = true } diff --git a/src/meta/service/src/api/http/v1/ctrl.rs b/src/meta/service/src/api/http/v1/ctrl.rs index 2578db12c70b8..5e51be9eb087c 100644 --- a/src/meta/service/src/api/http/v1/ctrl.rs +++ b/src/meta/service/src/api/http/v1/ctrl.rs @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::BTreeMap; use std::sync::Arc; +use databend_common_license::display_jwt_claims::DisplayJWTClaimsExt; use databend_common_meta_sled_store::openraft::async_runtime::watch::WatchReceiver; use databend_common_meta_types::NodeId; use http::StatusCode; @@ -127,3 +129,52 @@ pub async fn trigger_transfer_leader( voter_ids, })) } + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct UpdateLicenseQuery { + pub(crate) license: String, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct UpdateLicenseResponse { + pub from: NodeId, + pub to: NodeId, + pub voter_ids: Vec, +} + +/// Update the `databend_enterprise_license` of this meta node. +#[poem::handler] +pub async fn update_license( + meta_node: Data<&Arc>, + query: Option>, +) -> poem::Result { + let Some(query) = query else { + return Err(poem::Error::from_string( + "Invalid license", + StatusCode::BAD_REQUEST, + )); + }; + + let metrics = meta_node.raft.metrics().borrow_watched().clone(); + let id = metrics.id; + + let saved = meta_node + .ee_gate + .parse_jwt_token(query.license.as_str()) + .map_err(|e| { + poem::Error::from_string(format!("Invalid license: {}", e), StatusCode::BAD_REQUEST) + })?; + + meta_node + .ee_gate + .update_license(query.license.clone()) + .map_err(|e| poem::Error::from_string(e.to_string(), StatusCode::BAD_REQUEST))?; + + let claim_str = saved.display_jwt_claims().to_string(); + info!("id={} Updated license: {}", id, claim_str); + + let mut resp = BTreeMap::new(); + resp.insert("Success", claim_str); + + Ok(Json(resp)) +} diff --git a/src/meta/service/src/api/http_service.rs b/src/meta/service/src/api/http_service.rs index d376a77698cd0..43f0ee68a5ff9 100644 --- a/src/meta/service/src/api/http_service.rs +++ b/src/meta/service/src/api/http_service.rs @@ -65,6 +65,10 @@ impl HttpService { "/v1/ctrl/trigger_transfer_leader", get(super::http::v1::ctrl::trigger_transfer_leader), ) + .at( + "/v1/ctrl/update_license", + get(super::http::v1::ctrl::update_license), + ) .at( "/v1/cluster/nodes", get(super::http::v1::cluster_state::nodes_handler), diff --git a/src/meta/service/src/configs/outer_v0.rs b/src/meta/service/src/configs/outer_v0.rs index 68e61cba82ad6..f76a3413ccbf9 100644 --- a/src/meta/service/src/configs/outer_v0.rs +++ b/src/meta/service/src/configs/outer_v0.rs @@ -303,6 +303,7 @@ pub struct ConfigViaEnv { pub sled_tree_prefix: String, pub sled_max_cache_size_mb: u64, pub cluster_name: String, + pub databend_enterprise_license: Option, } impl Default for ConfigViaEnv { @@ -356,6 +357,7 @@ impl From for ConfigViaEnv { sled_tree_prefix: cfg.raft_config.sled_tree_prefix, sled_max_cache_size_mb: cfg.raft_config.sled_max_cache_size_mb, cluster_name: cfg.raft_config.cluster_name, + databend_enterprise_license: cfg.raft_config.databend_enterprise_license, } } } @@ -393,6 +395,7 @@ impl Into for ConfigViaEnv { sled_tree_prefix: self.sled_tree_prefix, sled_max_cache_size_mb: self.sled_max_cache_size_mb, cluster_name: self.cluster_name, + databend_enterprise_license: self.databend_enterprise_license, }; let log_config = LogConfig { file: FileLogConfig { @@ -564,6 +567,10 @@ pub struct RaftConfig { /// Max timeout(in milli seconds) when waiting a cluster leader. #[clap(long, default_value = "180000")] pub wait_leader_timeout: u64, + + /// License token in string to enable enterprise features(including: `cluster`) + #[clap(long, value_name = "VALUE")] + pub databend_enterprise_license: Option, } // TODO(rotbl): should not be used. @@ -602,6 +609,8 @@ impl From for InnerRaftConfig { sled_max_cache_size_mb: x.sled_max_cache_size_mb, cluster_name: x.cluster_name, wait_leader_timeout: x.wait_leader_timeout, + databend_enterprise_license: x.databend_enterprise_license, + fake_ee_license: false, } } } @@ -635,6 +644,7 @@ impl From for RaftConfig { sled_max_cache_size_mb: inner.sled_max_cache_size_mb, cluster_name: inner.cluster_name, wait_leader_timeout: inner.wait_leader_timeout, + databend_enterprise_license: inner.databend_enterprise_license, } } } diff --git a/src/meta/service/src/meta_service/meta_node.rs b/src/meta/service/src/meta_service/meta_node.rs index b2f8777ec1d65..bf0b2b07de6f5 100644 --- a/src/meta/service/src/meta_service/meta_node.rs +++ b/src/meta/service/src/meta_service/meta_node.rs @@ -58,6 +58,7 @@ use databend_common_meta_types::Node; use databend_common_meta_types::NodeId; use databend_common_meta_types::RaftMetrics; use databend_common_meta_types::TypeConfig; +use databend_enterprise_meta::MetaServiceEnterpriseGate; use fastrace::func_name; use fastrace::prelude::*; use futures::channel::oneshot; @@ -107,6 +108,7 @@ pub struct MetaNode { pub sto: RaftStore, pub dispatcher_handle: EventDispatcherHandle, pub raft: MetaRaft, + pub(crate) ee_gate: MetaServiceEnterpriseGate, pub running_tx: watch::Sender<()>, pub running_rx: watch::Receiver<()>, pub join_handles: Mutex>>>, @@ -122,6 +124,7 @@ impl Opened for MetaNode { pub struct MetaNodeBuilder { node_id: Option, raft_config: Option, + ee_gate: MetaServiceEnterpriseGate, sto: Option, raft_service_endpoint: Option, } @@ -142,7 +145,9 @@ impl MetaNodeBuilder { .take() .ok_or_else(|| MetaStartupError::InvalidConfig(String::from("sto is not set")))?; - let net = NetworkFactory::new(sto.clone()); + let ee_gate = self.ee_gate.clone(); + + let net = NetworkFactory::new(sto.clone(), ee_gate.clone()); let log_store = sto.clone(); let sm_store = sto.clone(); @@ -163,6 +168,7 @@ impl MetaNodeBuilder { sto: sto.clone(), dispatcher_handle: EventDispatcherHandle::new(dispatcher_tx), raft: raft.clone(), + ee_gate, running_tx: tx, running_rx: rx, join_handles: Mutex::new(Vec::new()), @@ -208,11 +214,27 @@ impl MetaNodeBuilder { impl MetaNode { pub fn builder(config: &RaftConfig) -> MetaNodeBuilder { + let ee_gate = if config.fake_ee_license { + MetaServiceEnterpriseGate::new_testing() + } else { + // read env var QUERY_DATABEND_ENTERPRISE_LICENSE: + // - if it is set, use it as the license. + // - if it is not set, use the license in config. + + let license = std::env::var("QUERY_DATABEND_ENTERPRISE_LICENSE"); + if let Ok(token) = license { + MetaServiceEnterpriseGate::new(Some(token)) + } else { + MetaServiceEnterpriseGate::new(config.databend_enterprise_license.clone()) + } + }; + let raft_config = MetaNode::new_raft_config(config); MetaNodeBuilder { node_id: None, raft_config: Some(raft_config), + ee_gate, sto: None, raft_service_endpoint: None, } diff --git a/src/meta/service/src/meta_service/raft_service_impl.rs b/src/meta/service/src/meta_service/raft_service_impl.rs index fb229a3934828..e98041f47c3c9 100644 --- a/src/meta/service/src/meta_service/raft_service_impl.rs +++ b/src/meta/service/src/meta_service/raft_service_impl.rs @@ -50,6 +50,7 @@ use databend_common_meta_types::StorageError; use databend_common_meta_types::Vote; use databend_common_meta_types::VoteRequest; use databend_common_metrics::count::Count; +use databend_common_tracing::start_trace_for_remote_request; use fastrace::full_name; use fastrace::prelude::*; use futures::TryStreamExt; @@ -276,7 +277,7 @@ impl RaftServiceImpl { #[async_trait::async_trait] impl RaftService for RaftServiceImpl { async fn forward(&self, request: Request) -> Result, Status> { - let root = databend_common_tracing::start_trace_for_remote_request(full_name!(), &request); + let root = start_trace_for_remote_request(full_name!(), &request); async { let forward_req: ForwardRequest = GrpcHelper::parse_req(request)?; @@ -299,7 +300,7 @@ impl RaftService for RaftServiceImpl { &self, request: Request, ) -> Result, Status> { - let root = databend_common_tracing::start_trace_for_remote_request(full_name!(), &request); + let root = start_trace_for_remote_request(full_name!(), &request); async { let forward_req: ForwardRequest = GrpcHelper::parse_req(request)?; @@ -323,7 +324,7 @@ impl RaftService for RaftServiceImpl { &self, request: Request, ) -> Result, Status> { - let root = databend_common_tracing::start_trace_for_remote_request(full_name!(), &request); + let root = start_trace_for_remote_request(full_name!(), &request); async { self.incr_meta_metrics_recv_bytes_from_peer(&request); @@ -351,7 +352,7 @@ impl RaftService for RaftServiceImpl { &self, request: Request, ) -> Result, Status> { - let root = databend_common_tracing::start_trace_for_remote_request(full_name!(), &request); + let root = start_trace_for_remote_request(full_name!(), &request); self.do_install_snapshot_v1(request).in_span(root).await } @@ -359,12 +360,12 @@ impl RaftService for RaftServiceImpl { &self, request: Request>, ) -> Result, Status> { - let root = databend_common_tracing::start_trace_for_remote_request(full_name!(), &request); + let root = start_trace_for_remote_request(full_name!(), &request); self.do_install_snapshot_v003(request).in_span(root).await } async fn vote(&self, request: Request) -> Result, Status> { - let root = databend_common_tracing::start_trace_for_remote_request(full_name!(), &request); + let root = start_trace_for_remote_request(full_name!(), &request); async { self.incr_meta_metrics_recv_bytes_from_peer(&request); @@ -391,7 +392,7 @@ impl RaftService for RaftServiceImpl { &self, request: Request, ) -> Result, Status> { - let root = databend_common_tracing::start_trace_for_remote_request(full_name!(), &request); + let root = start_trace_for_remote_request(full_name!(), &request); let fu = async { let req = request.into_inner(); let req: TransferLeaderRequest = req.try_into()?; diff --git a/src/meta/service/src/network.rs b/src/meta/service/src/network.rs index a7e39547c3d1e..22ac52ae8b2c7 100644 --- a/src/meta/service/src/network.rs +++ b/src/meta/service/src/network.rs @@ -24,6 +24,7 @@ use backon::ExponentialBuilder; use databend_common_base::base::tokio; use databend_common_base::base::tokio::sync::mpsc; use databend_common_base::base::tokio::time::Instant; +use databend_common_base::display::display_slice::DisplaySliceExt; use databend_common_base::future::TimedFutureExt; use databend_common_base::runtime; use databend_common_meta_raft_store::leveled_store::db_exporter::DBExporter; @@ -68,6 +69,7 @@ use databend_common_meta_types::Vote; use databend_common_meta_types::VoteRequest; use databend_common_meta_types::VoteResponse; use databend_common_metrics::count::Count; +use databend_enterprise_meta::MetaServiceEnterpriseGate; use fastrace::func_name; use futures::FutureExt; use futures::TryStreamExt; @@ -75,7 +77,6 @@ use log::debug; use log::error; use log::info; use log::warn; -use tokio::sync::Mutex; use tokio_stream::wrappers::ReceiverStream; use crate::metrics::raft_metrics; @@ -133,17 +134,20 @@ impl Default for Backoff { } } -#[derive(Clone)] pub struct NetworkFactory { sto: RaftStore, + /// Enterprise edition features gate + ee_gate: MetaServiceEnterpriseGate, + backoff: Backoff, } impl NetworkFactory { - pub fn new(sto: RaftStore) -> NetworkFactory { + pub fn new(sto: RaftStore, ee_gate: MetaServiceEnterpriseGate) -> NetworkFactory { NetworkFactory { sto, + ee_gate, backoff: Backoff::default(), } } @@ -159,7 +163,10 @@ pub struct Network { /// The endpoint of the target node. endpoint: Endpoint, - client: Mutex>, + /// Enterprise edition features gate + ee_gate: MetaServiceEnterpriseGate, + + raft_client: std::sync::Mutex>, sto: RaftStore, @@ -170,22 +177,31 @@ impl Network { /// Create a new RaftClient to the specified target node. #[logcall::logcall(err = "debug")] #[fastrace::trace] - pub async fn new_client(&self, addr: &str) -> Result { - info!(id = self.id; "Raft NetworkConnection connect: target={}: {}", self.target, addr); + pub async fn new_raft_client(&self, addr: &str) -> Result { + info!( + "id={} Raft NetworkConnection connect: target={}: {}", + self.id, self.target, addr + ); + + if let Err(e) = self.ee_gate.assert_cluster_enabled() { + return Err(Unreachable::new(&AnyError::error(e.to_string()))); + }; - let channel = tonic::transport::Endpoint::new(addr.to_string())? + let channel = tonic::transport::Endpoint::new(addr.to_string()) + .map_err(|e| Unreachable::new(&e))? .connect() .log_elapsed_debug(format!( - "Raft NetworkConnection new_client: connect target: {}", - self.target + "id={} Raft NetworkConnection new_client: connect target: {}", + self.id, self.target )) - .await?; + .await + .map_err(|e| Unreachable::new(&e))?; let client = RaftClientApi::new(self.target, self.endpoint.clone(), channel); info!( - "Raft NetworkConnection connected to: target={}: {}", - self.target, addr + "id={} Raft NetworkConnection connected to: target={}: {}", + self.id, self.target, addr ); Ok(client) @@ -195,13 +211,17 @@ impl Network { #[logcall::logcall(err = "debug")] #[fastrace::trace] async fn take_client(&mut self) -> Result { - let mut client = self.client.lock().await; + { + let mut client = self.raft_client.lock().unwrap(); - if let Some(c) = client.take() { - return Ok(c); + if let Some(c) = client.take() { + return Ok(c); + } } let n = 3; + let mut errors = vec![]; + for _i in 0..n { let endpoint = self .lookup_target_address() @@ -225,7 +245,7 @@ impl Network { let addr = format!("http://{}", self.endpoint); - let res = self.new_client(&addr).await; + let res = self.new_raft_client(&addr).await; match res { Ok(c) => { return Ok(c); @@ -235,14 +255,17 @@ impl Network { "Raft NetworkConnection fail to connect: target={}: addr={}: {}", self.target, &addr, e ); + errors.push(e); tokio::time::sleep(Duration::from_millis(50)).await; } } } let any_err = AnyError::error(format!( - "Raft NetworkConnection fail to connect: target={}, retry={}", - self.target, n + "Raft NetworkConnection fail to connect: target={}, retry={}, errors={}", + self.target, + n, + errors.display() )); error!("{}", any_err); @@ -502,7 +525,7 @@ impl Network { match &grpc_res { Ok(_) => { - self.client.lock().await.replace(client); + self.raft_client.lock().unwrap().replace(client); } Err(e) => { warn!(target = self.target; "install_snapshot failed: {}", e); @@ -686,7 +709,7 @@ impl RaftNetworkV2 for Network { match &grpc_res { Ok(_) => { - self.client.lock().await.replace(client); + self.raft_client.lock().unwrap().replace(client); } Err(e) => { warn!(target = self.target, rpc = rpc.summary(); "append_entries failed: {}", e); @@ -787,7 +810,7 @@ impl RaftNetworkV2 for Network { match &grpc_res { Ok(_) => { - self.client.lock().await.replace(client); + self.raft_client.lock().unwrap().replace(client); } Err(e) => { warn!(target = self.target, rpc = rpc.summary(); "vote failed: {}", e); @@ -823,7 +846,7 @@ impl RaftNetworkV2 for Network { match &grpc_res { Ok(_) => { - self.client.lock().await.replace(client); + self.raft_client.lock().unwrap().replace(client); } Err(e) => { warn!(target = self.target; "{} failed: {}", func_name!(), e); @@ -861,7 +884,8 @@ impl RaftNetworkFactory for NetworkFactory { sto: self.sto.clone(), backoff: self.backoff.clone(), endpoint: Default::default(), - client: Default::default(), + ee_gate: self.ee_gate.clone(), + raft_client: Default::default(), } } } diff --git a/src/meta/service/src/raft_client.rs b/src/meta/service/src/raft_client.rs index 5428893fbe216..626e619a71ec3 100644 --- a/src/meta/service/src/raft_client.rs +++ b/src/meta/service/src/raft_client.rs @@ -57,6 +57,7 @@ impl RaftClientApi for RaftClient { let cli = RaftServiceClient::new(channel) .max_decoding_message_size(GrpcConfig::MAX_DECODING_SIZE) .max_encoding_message_size(GrpcConfig::MAX_ENCODING_SIZE); + count::WithCount::new(cli, PeerCounter { target, endpoint, diff --git a/src/meta/service/tests/it/meta_node/meta_node_lifecycle.rs b/src/meta/service/tests/it/meta_node/meta_node_lifecycle.rs index dfafa3b783131..4798350b4bca2 100644 --- a/src/meta/service/tests/it/meta_node/meta_node_lifecycle.rs +++ b/src/meta/service/tests/it/meta_node/meta_node_lifecycle.rs @@ -550,7 +550,7 @@ async fn test_meta_node_forbid_leave_leader_via_leader() -> anyhow::Result<()> { async fn test_meta_node_restart() -> anyhow::Result<()> { // - Start a leader and a non-voter; // - Restart them. - // - Check old data an new written data. + // - Check old data and new written data. let tc0 = { let mut tc = MetaSrvTestContext::new(0); @@ -615,7 +615,8 @@ async fn test_meta_node_restart() -> anyhow::Result<()> { info!("restart all"); // restart - let config = configs::Config::default(); + let mut config = configs::Config::default(); + config.raft_config.fake_ee_license = true; let mn0 = MetaNode::builder(&config.raft_config) .node_id(0) .sto(sto0) diff --git a/src/meta/service/tests/it/tests/service.rs b/src/meta/service/tests/it/tests/service.rs index c8a43f53f5879..c54de3a8e990b 100644 --- a/src/meta/service/tests/it/tests/service.rs +++ b/src/meta/service/tests/it/tests/service.rs @@ -158,6 +158,9 @@ impl MetaSrvTestContext { // We use a single sled db for all unit test. Every unit test need a unique prefix so that it opens different tree. config.raft_config.sled_tree_prefix = format!("test-{}-", config_id); + // For testing, we use a fake license to enable clustering functions + config.raft_config.fake_ee_license = true; + { let grpc_port = next_port(); config.grpc_api_address = format!("{}:{}", host, grpc_port);