Commit 43de923

refactor(metactl): change to subcommands & support transfer-leader (#16254)
1 parent 81deb35 commit 43de923

9 files changed, +442 -209 lines changed


Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.

src/meta/binaries/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -44,6 +44,7 @@ fastrace = { workspace = true }
 futures = { workspace = true }
 log = { workspace = true }
 rand = { workspace = true }
+reqwest = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
 tokio = { workspace = true }
Lines changed: 92 additions & 0 deletions
@@ -0,0 +1,92 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use reqwest::Client;
+use serde::Deserialize;
+
+pub struct MetaAdminClient {
+    client: Client,
+    endpoint: String,
+}
+
+impl MetaAdminClient {
+    pub fn new(addr: &str) -> Self {
+        let client = Client::new();
+        MetaAdminClient {
+            client,
+            endpoint: format!("http://{}", addr),
+        }
+    }
+
+    pub async fn status(&self) -> anyhow::Result<AdminStatusResponse> {
+        let resp = self
+            .client
+            .get(format!("{}/v1/cluster/status", self.endpoint))
+            .send()
+            .await?;
+        let status = resp.status();
+        if status.is_success() {
+            let result = resp.json::<AdminStatusResponse>().await?;
+            Ok(result)
+        } else {
+            let data = resp.bytes().await?;
+            let msg = String::from_utf8_lossy(&data);
+            Err(anyhow::anyhow!("status code: {}, msg: {}", status, msg))
+        }
+    }
+
+    pub async fn transfer_leader(
+        &self,
+        target: Option<u64>,
+    ) -> anyhow::Result<AdminTransferLeaderResponse> {
+        let resp = match target {
+            Some(to) => {
+                self.client
+                    .get(format!(
+                        "{}/v1/ctrl/trigger_transfer_leader?to={}",
+                        self.endpoint, to
+                    ))
+                    .send()
+                    .await?
+            }
+            None => {
+                self.client
+                    .get(format!("{}/v1/ctrl/trigger_transfer_leader", self.endpoint))
+                    .send()
+                    .await?
+            }
+        };
+        let status = resp.status();
+        if status.is_success() {
+            let result = resp.json::<AdminTransferLeaderResponse>().await?;
+            Ok(result)
+        } else {
+            let data = resp.bytes().await?;
+            let msg = String::from_utf8_lossy(&data);
+            Err(anyhow::anyhow!("status code: {}, msg: {}", status, msg))
+        }
+    }
+}
+
+#[derive(Deserialize, Debug)]
+pub struct AdminStatusResponse {
+    pub name: String,
+}
+
+#[derive(Deserialize, Debug)]
+pub struct AdminTransferLeaderResponse {
+    pub from: u64,
+    pub to: u64,
+    pub voter_ids: Vec<u64>,
+}
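
For orientation, a minimal sketch of how the new client might be driven from a metactl subcommand. Only MetaAdminClient, status, transfer_leader and the two response structs come from the diff above; the async main, the example admin address and the hard-coded target node id are illustrative assumptions.

// Hypothetical driver code; assumes the MetaAdminClient module above is in
// scope, and that tokio (with the "macros" runtime features) and anyhow are
// available, as they are in this crate.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Admin API address of a running meta-service node (example value).
    let client = MetaAdminClient::new("127.0.0.1:28002");

    // Report which node we are talking to.
    let status = client.status().await?;
    eprintln!("connected to node: {}", status.name);

    // Ask the current leader to hand leadership off. `None` lets the server
    // choose a target voter; `Some(id)` names one explicitly.
    let resp = client.transfer_leader(Some(2)).await?;
    eprintln!(
        "leadership transferred from {} to {} (voters: {:?})",
        resp.from, resp.to, resp.voter_ids
    );
    Ok(())
}

The same endpoints can also be exercised directly over HTTP, e.g. a GET to http://<admin-addr>/v1/cluster/status or http://<admin-addr>/v1/ctrl/trigger_transfer_leader?to=2, since the client above is only a thin reqwest wrapper around them.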

src/meta/binaries/metactl/export_from_disk.rs

Lines changed: 7 additions & 8 deletions
@@ -21,30 +21,29 @@ use databend_meta::store::StoreInner;
 use futures::TryStreamExt;
 
 use crate::upgrade;
-use crate::Config;
+use crate::ExportArgs;
 
 /// Print the entire sled db.
 ///
 /// The output encodes every key-value into one line:
 /// `[sled_tree_name, {key_space: {key, value}}]`
 /// E.g.:
 /// `["state_machine/0",{"GenericKV":{"key":"wow","value":{"seq":3,"meta":null,"data":[119,111,119]}}}`
-pub async fn export_from_dir(config: &Config) -> anyhow::Result<()> {
-    upgrade::upgrade(config).await?;
+pub async fn export_from_dir(args: &ExportArgs) -> anyhow::Result<()> {
+    let raft_config: RaftConfig = args.clone().into();
+    upgrade::upgrade(&raft_config).await?;
 
     eprintln!();
     eprintln!("Export:");
 
-    let raft_config: RaftConfig = config.clone().into();
-
     let sto_inn = StoreInner::open_create(&raft_config, Some(()), None).await?;
     let mut lines = Arc::new(sto_inn).export();
 
     eprintln!(" From: {}", raft_config.raft_dir);
 
-    let file: Option<File> = if !config.db.is_empty() {
-        eprintln!(" To: File: {}", config.db);
-        Some((File::create(&config.db))?)
+    let file: Option<File> = if !args.db.is_empty() {
+        eprintln!(" To: File: {}", args.db);
+        Some((File::create(&args.db))?)
     } else {
         eprintln!(" To: <stdout>");
         None

src/meta/binaries/metactl/export_from_grpc.rs

Lines changed: 9 additions & 14 deletions
@@ -25,24 +25,19 @@ use databend_common_meta_types::protobuf;
 use tokio::net::TcpSocket;
 use tokio_stream::StreamExt;
 
-use crate::Config;
+use crate::ExportArgs;
 
 /// Dump metasrv data, raft-log, state machine etc in json to stdout.
-pub async fn export_from_running_node(config: &Config) -> Result<(), anyhow::Error> {
+pub async fn export_from_running_node(args: &ExportArgs) -> Result<(), anyhow::Error> {
     eprintln!();
     eprintln!("Export:");
-    eprintln!(" From: online meta-service: {}", config.grpc_api_address);
-    eprintln!(" Export To: {}", config.db);
-    eprintln!(" Export Chunk Size: {:?}", config.export_chunk_size);
-
-    let grpc_api_addr = get_available_socket_addr(&config.grpc_api_address).await?;
-
-    export_from_grpc(
-        grpc_api_addr.to_string().as_str(),
-        config.db.clone(),
-        config.export_chunk_size,
-    )
-    .await?;
+    eprintln!(" From: online meta-service: {}", args.grpc_api_address);
+    eprintln!(" Export To: {}", args.db);
+    eprintln!(" Export Chunk Size: {:?}", args.chunk_size);
+
+    let grpc_api_addr = get_available_socket_addr(args.grpc_api_address.as_str()).await?;
+    let addr = grpc_api_addr.to_string();
+    export_from_grpc(addr.as_str(), args.db.clone(), args.chunk_size).await?;
     Ok(())
 }
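
The ExportArgs type used by both export paths is introduced elsewhere in this PR (in the metactl main module) and is not part of this diff. Judging only from how it is used here, the grpc_api_address, db, raft_dir and chunk_size fields plus the args.clone().into() conversion to RaftConfig, a clap-derived sketch could look roughly like the following; field types, attributes and defaults are inferred, not taken from the commit.

// Inferred sketch only: the real ExportArgs in the PR may differ in names,
// types, defaults and doc comments.
use clap::Args;

#[derive(Debug, Clone, Args)]
pub struct ExportArgs {
    /// gRPC API address of a running meta-service node to export from.
    #[clap(long, default_value = "127.0.0.1:9191")]
    pub grpc_api_address: String,

    /// Local raft directory to export from when exporting from disk.
    #[clap(long)]
    pub raft_dir: Option<String>,

    /// Output file; empty means stdout.
    #[clap(long, default_value = "")]
    pub db: String,

    /// Number of records per exported chunk.
    #[clap(long)]
    pub chunk_size: Option<u64>,
}

A matching impl From<ExportArgs> for RaftConfig, implied by the args.clone().into() calls above, would map raft_dir and related fields onto the raft configuration.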

src/meta/binaries/metactl/import.rs

Lines changed: 29 additions & 30 deletions
@@ -55,38 +55,38 @@ use url::Url;
 
 use crate::reading;
 use crate::upgrade;
-use crate::Config;
+use crate::ImportArgs;
 
-pub async fn import_data(config: &Config) -> anyhow::Result<()> {
-    let raft_dir = config.raft_dir.clone().unwrap_or_default();
+pub async fn import_data(args: &ImportArgs) -> anyhow::Result<()> {
+    let raft_dir = args.raft_dir.clone().unwrap_or_default();
 
     eprintln!();
     eprintln!("Import:");
     eprintln!(" Into Meta Dir: '{}'", raft_dir);
-    eprintln!(" Initialize Cluster with Id: {}, cluster: {{", config.id);
-    for peer in config.initial_cluster.clone() {
+    eprintln!(" Initialize Cluster with Id: {}, cluster: {{", args.id);
+    for peer in args.initial_cluster.clone() {
         eprintln!(" Peer: {}", peer);
     }
     eprintln!(" }}");
 
-    let nodes = build_nodes(config.initial_cluster.clone(), config.id)?;
+    let nodes = build_nodes(args.initial_cluster.clone(), args.id)?;
 
     init_sled_db(raft_dir.clone(), 64 * 1024 * 1024 * 1024);
 
-    clear(config)?;
-    let max_log_id = import_from_stdin_or_file(config).await?;
+    clear(args)?;
+    let max_log_id = import_from_stdin_or_file(args).await?;
 
-    if config.initial_cluster.is_empty() {
+    if args.initial_cluster.is_empty() {
         return Ok(());
     }
 
-    init_new_cluster(config, nodes, max_log_id, config.id).await?;
+    init_new_cluster(args, nodes, max_log_id).await?;
     Ok(())
 }
 
 /// Import from lines of exported data and Return the max log id that is found.
 async fn import_lines<B: BufRead + 'static>(
-    config: &Config,
+    raft_config: RaftConfig,
     lines: Lines<B>,
 ) -> anyhow::Result<Option<LogId>> {
     #[allow(clippy::useless_conversion)]
@@ -106,8 +106,8 @@ async fn import_lines<B: BufRead + 'static>(
                 please use an older version databend-metactl to import from V001"
             ));
         }
-        DataVersion::V002 => import_v002(config, it).await?,
-        DataVersion::V003 => import_v003(config, it).await?,
+        DataVersion::V002 => import_v002(raft_config, it).await?,
+        DataVersion::V003 => import_v003(raft_config, it).await?,
     };
 
     Ok(max_log_id)
@@ -119,11 +119,11 @@ async fn import_lines<B: BufRead + 'static>(
 ///
 /// It write logs and related entries to sled trees, and state_machine entries to a snapshot.
 async fn import_v002(
-    config: &Config,
+    raft_config: RaftConfig,
     lines: impl IntoIterator<Item = Result<String, io::Error>>,
 ) -> anyhow::Result<Option<LogId>> {
     // v002 and v003 share the same exported data format.
-    import_v003(config, lines).await
+    import_v003(raft_config, lines).await
 }
 
 /// Import serialized lines for `DataVersion::V003`
@@ -132,11 +132,9 @@ async fn import_v002(
 ///
 /// It write logs and related entries to sled trees, and state_machine entries to a snapshot.
 async fn import_v003(
-    config: &Config,
+    raft_config: RaftConfig,
     lines: impl IntoIterator<Item = Result<String, io::Error>>,
 ) -> anyhow::Result<Option<LogId>> {
-    let raft_config: RaftConfig = config.clone().into();
-
     let db = get_sled_db();
 
     let mut n = 0;
@@ -221,24 +219,26 @@
 /// Insert them into sled db and flush.
 ///
 /// Finally upgrade the data in raft_dir to the latest version.
-async fn import_from_stdin_or_file(config: &Config) -> anyhow::Result<Option<LogId>> {
-    let restore = config.db.clone();
+async fn import_from_stdin_or_file(args: &ImportArgs) -> anyhow::Result<Option<LogId>> {
+    let restore = args.db.clone();
 
+    let raft_config: RaftConfig = args.clone().into();
     let max_log_id = if restore.is_empty() {
         eprintln!(" From: <stdin>");
         let lines = io::stdin().lines();
 
-        import_lines(config, lines).await?
+        import_lines(raft_config, lines).await?
     } else {
-        eprintln!(" From: {}", config.db);
+        eprintln!(" From: {}", args.db);
         let file = File::open(restore)?;
         let reader = BufReader::new(file);
         let lines = reader.lines();
 
-        import_lines(config, lines).await?
+        import_lines(raft_config, lines).await?
     };
 
-    upgrade::upgrade(config).await?;
+    let raft_config: RaftConfig = args.clone().into();
+    upgrade::upgrade(&raft_config).await?;
 
     Ok(max_log_id)
 }
@@ -298,16 +298,15 @@ fn build_nodes(initial_cluster: Vec<String>, id: u64) -> anyhow::Result<BTreeMap
 
 // initial_cluster format: node_id=endpoint,grpc_api_addr;
 async fn init_new_cluster(
-    config: &Config,
+    args: &ImportArgs,
     nodes: BTreeMap<NodeId, Node>,
     max_log_id: Option<LogId>,
-    id: u64,
 ) -> anyhow::Result<()> {
     eprintln!();
     eprintln!("Initialize Cluster with: {:?}", nodes);
 
     let db = get_sled_db();
-    let raft_config: RaftConfig = config.clone().into();
+    let raft_config: RaftConfig = args.clone().into();
 
     let mut sto = RaftStore::open_create(&raft_config, Some(()), None).await?;
 
@@ -375,13 +374,13 @@ async fn init_new_cluster(
 
     // Reset node id
     let raft_state = RaftState::open_create(&db, &raft_config, Some(()), None).await?;
-    raft_state.set_node_id(id).await?;
+    raft_state.set_node_id(args.id).await?;
 
     Ok(())
 }
 
 /// Clear all sled data and on-disk snapshot.
-fn clear(config: &Config) -> anyhow::Result<()> {
+fn clear(args: &ImportArgs) -> anyhow::Result<()> {
     eprintln!();
     eprintln!("Clear All Sled Trees Before Import:");
     let db = get_sled_db();
@@ -394,7 +393,7 @@ fn clear(config: &Config) -> anyhow::Result<()> {
         eprintln!(" Cleared sled tree: {}", name);
     }
 
-    let df_meta_path = format!("{}/df_meta", config.raft_dir.clone().unwrap_or_default());
+    let df_meta_path = format!("{}/df_meta", args.raft_dir.clone().unwrap_or_default());
     if Path::new(&df_meta_path).exists() {
         remove_dir_all(&df_meta_path)?;
     }
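
As with ExportArgs, the ImportArgs struct is defined outside this diff. From the fields used in this file (raft_dir, db, id, initial_cluster) and the args.clone().into() conversion to RaftConfig, an inferred clap sketch might look like the following; names, types and defaults are assumptions, not taken from the commit.

// Inferred sketch only: the real ImportArgs in the PR may differ.
use clap::Args;

#[derive(Debug, Clone, Args)]
pub struct ImportArgs {
    /// Raft directory to import the exported data into.
    #[clap(long)]
    pub raft_dir: Option<String>,

    /// Input file with exported lines; empty means stdin.
    #[clap(long, default_value = "")]
    pub db: String,

    /// Node id to assign to this node when re-initializing the cluster.
    #[clap(long, default_value = "0")]
    pub id: u64,

    /// Peers for the new cluster, one `node_id=endpoint,grpc_api_addr` entry each;
    /// leaving it empty skips cluster re-initialization.
    #[clap(long)]
    pub initial_cluster: Vec<String>,
}

Together with the transfer-leader support above, these per-subcommand argument structs are what replace the former single Config used across metactl, which is the refactor described in the commit title.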

0 commit comments
