Skip to content

Introduce cluster support #299

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: cluster-support-feature-branch
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ features = {}

[dependencies.typedb-driver]
features = []
rev = "41e42b94ef0bee98d59ae627e46c45acb14a0700"
git = "https://github.com/typedb/typedb-driver"
tag = "3.4.0"
default-features = false

[dependencies.futures]
Expand Down
14 changes: 12 additions & 2 deletions dependencies/typedb/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,21 @@ def typedb_dependencies():
)

def typedb_driver():
# TODO: Return typedb
# native.local_repository(
# name = "typedb_driver",
# path = "../typedb-driver",
# )
git_repository(
name = "typedb_driver",
remote = "https://github.com/typedb/typedb-driver",
tag = "3.4.0", # sync-marker: do not remove this comment, this is used for sync-dependencies by @typedb_driver
remote = "https://github.com/farost/typedb-driver",
commit = "41e42b94ef0bee98d59ae627e46c45acb14a0700", # sync-marker: do not remove this comment, this is used for sync-dependencies by @typedb_driver
)
# git_repository(
# name = "typedb_driver",
# remote = "https://github.com/typedb/typedb-driver",
# tag = "3.4.0", # sync-marker: do not remove this comment, this is used for sync-dependencies by @typedb_driver
# )

def typeql():
git_repository(
Expand Down
28 changes: 23 additions & 5 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,27 @@ pub struct Args {
#[arg(long, value_name = "path to script file")]
pub script: Vec<String>,

/// TypeDB address to connect to. If using TLS encryption, this must start with "https://"
#[arg(long, value_name = "host:port")]
pub address: String,
/// TypeDB address to connect to (host:port). If using TLS encryption, this must start with "https://".
#[arg(long, value_name = "host:port", conflicts_with_all = ["addresses", "address_translation"])]
pub address: Option<String>,

/// A comma-separated list of TypeDB replica addresses of a single cluster to connect to.
#[arg(long, value_name = "host1:port1,host2:port2", conflicts_with_all = ["address", "address_translation"])]
pub addresses: Option<String>,

/// A comma-separated list of public=private address pairs. Public addresses are the user-facing
/// addresses of the replicas, and private addresses are the addresses used for the server-side
/// connection between replicas.
#[arg(long, value_name = "public=private,...", conflicts_with_all = ["address", "addresses"])]
pub address_translation: Option<String>,

/// If used in a cluster environment, disables attempts to redirect requests to server replicas,
/// limiting Console to communicate only with the single address specified in the `address`
/// argument.
/// Use for administrative / debug purposes to test a specific replica only: this option will
/// lower the success rate of Console's operations in production.
#[arg(long = "replication-disabled", default_value = "false")]
pub replication_disabled: bool,

/// Username for authentication
#[arg(long, value_name = "username")]
Expand All @@ -45,8 +63,8 @@ pub struct Args {

/// Disable error reporting. Error reporting helps TypeDB improve by reporting
/// errors and crashes to the development team.
#[arg(long = "diagnostics-disable", default_value = "false")]
pub diagnostics_disable: bool,
#[arg(long = "diagnostics-disabled", default_value = "false")]
pub diagnostics_disabled: bool,

/// Print the Console binary version
#[arg(long = "version")]
Expand Down
73 changes: 67 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ use std::{
rc::Rc,
sync::Arc,
};

use std::collections::HashMap;
use clap::Parser;
use home::home_dir;
use rustyline::error::ReadlineError;
use sentry::ClientOptions;
use typedb_driver::{Credentials, DriverOptions, Transaction, TransactionType, TypeDBDriver};
use typedb_driver::{Addresses, Credentials, DriverOptions, Transaction, TransactionType, TypeDBDriver};

use crate::{
cli::Args,
Expand All @@ -40,6 +40,7 @@ use crate::{
},
runtime::BackgroundRuntime,
};
use crate::operations::{replica_deregister, replica_list, replica_primary, replica_register, server_version};

mod cli;
mod completions;
Expand Down Expand Up @@ -99,10 +100,11 @@ fn main() {
if args.password.is_none() {
args.password = Some(LineReaderHidden::new().readline(&format!("password for '{}': ", args.username)));
}
if !args.diagnostics_disable {
if !args.diagnostics_disabled {
init_diagnostics()
}
if !args.tls_disabled && !args.address.starts_with("https:") {
let address_info = parse_addresses(&args);
if !args.tls_disabled && !address_info.only_https {
println!(
"\
TLS connections can only be enabled when connecting to HTTPS endpoints, for example using 'https://<ip>:port'. \
Expand All @@ -113,10 +115,11 @@ fn main() {
}
let runtime = BackgroundRuntime::new();
let tls_root_ca_path = args.tls_root_ca.as_ref().map(|value| Path::new(value));
let driver_options = DriverOptions::new().use_replication(!args.replication_disabled).is_tls_enabled(!args.tls_disabled).tls_root_ca(tls_root_ca_path).unwrap();
let driver = match runtime.run(TypeDBDriver::new(
args.address,
address_info.addresses,
Credentials::new(&args.username, args.password.as_ref().unwrap()),
DriverOptions::new(!args.tls_disabled, tls_root_ca_path).unwrap(),
driver_options,
)) {
Ok(driver) => Arc::new(driver),
Err(err) => {
Expand Down Expand Up @@ -275,6 +278,9 @@ fn execute_commands(
}

fn entry_repl(driver: Arc<TypeDBDriver>, runtime: BackgroundRuntime) -> Repl<ConsoleContext> {
let server_commands = Subcommand::new("server")
.add(CommandLeaf::new("version", "Retrieve server version.", server_version));

let database_commands = Subcommand::new("database")
.add(CommandLeaf::new("list", "List databases on the server.", database_list))
.add(CommandLeaf::new_with_input(
Expand Down Expand Up @@ -348,6 +354,25 @@ fn entry_repl(driver: Arc<TypeDBDriver>, runtime: BackgroundRuntime) -> Repl<Con
user_update_password,
));

let replica_commands = Subcommand::new("replica")
.add(CommandLeaf::new("list", "List replicas.", replica_list))
.add(CommandLeaf::new("primary", "Get current primary replica.", replica_primary))
.add(CommandLeaf::new_with_inputs(
"register",
"Register new replica.",
vec![
CommandInput::new("replica id", get_word, None, None),
CommandInput::new("address", get_word, None, None),
],
replica_register,
))
.add(CommandLeaf::new_with_input(
"deregister",
"Deregister existing replica.",
CommandInput::new("replica id", get_word, None, None),
replica_deregister,
));

let transaction_commands = Subcommand::new("transaction")
.add(CommandLeaf::new_with_input(
"read",
Expand All @@ -371,8 +396,10 @@ fn entry_repl(driver: Arc<TypeDBDriver>, runtime: BackgroundRuntime) -> Repl<Con
let history_path = home_dir().unwrap_or_else(|| temp_dir()).join(ENTRY_REPL_HISTORY);

let repl = Repl::new(PROMPT.to_owned(), history_path, false, None)
.add(server_commands)
.add(database_commands)
.add(user_commands)
.add(replica_commands)
.add(transaction_commands);

repl
Expand Down Expand Up @@ -425,6 +452,40 @@ fn transaction_type_str(transaction_type: TransactionType) -> &'static str {
}
}

struct AddressInfo {
only_https: bool,
addresses: Addresses,
}

fn parse_addresses(args: &Args) -> AddressInfo {
if let Some(address) = &args.address {
AddressInfo {only_https: is_https_address(address), addresses: Addresses::try_from_address_str(address).unwrap() }
} else if let Some(addresses) = &args.addresses {
let split = addresses.split(',').map(str::to_string).collect::<Vec<_>>();
println!("Split: {split:?}");
let only_https = split.iter().all(|address| is_https_address(address));
AddressInfo {only_https, addresses: Addresses::try_from_addresses_str(split).unwrap() }
} else if let Some(translation) = &args.address_translation {
let mut map = HashMap::new();
let mut only_https = true;
for pair in translation.split(',') {
let (public_address, private_address) = pair
.split_once('=')
.unwrap_or_else(|| panic!("Invalid address pair: {pair}. Must be of form public=private"));
only_https = only_https && is_https_address(public_address);
map.insert(public_address.to_string(), private_address.to_string());
}
println!("Translation map:: {map:?}");
AddressInfo {only_https, addresses: Addresses::try_from_translation_str(map).unwrap() }
} else {
panic!("At least one of --address, --addresses, or --address-translation must be provided.");
}
}

fn is_https_address(address: &str) -> bool {
address.starts_with("https:")
}

fn init_diagnostics() {
let _ = sentry::init((
DIAGNOSTICS_REPORTING_URI,
Expand Down
63 changes: 61 additions & 2 deletions src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@ use crate::{
transaction_repl, ConsoleContext,
};

pub(crate) fn server_version(context: &mut ConsoleContext, _input: &[String]) -> CommandResult {
let driver = context.driver.clone();
let server_version = context
.background_runtime
.run(async move { driver.server_version().await })
.map_err(|err| Box::new(err) as Box<dyn Error + Send>)?;
println!("{}", server_version);
Ok(())
}

pub(crate) fn database_list(context: &mut ConsoleContext, _input: &[String]) -> CommandResult {
let driver = context.driver.clone();
let databases = context
Expand Down Expand Up @@ -117,7 +127,7 @@ pub(crate) fn user_list(context: &mut ConsoleContext, _input: &[String]) -> Comm
println!("No users are present.");
} else {
for user in users {
println!("{}", user.name);
println!("{}", user.name());
}
}
Ok(())
Expand Down Expand Up @@ -175,7 +185,7 @@ pub(crate) fn user_update_password(context: &mut ConsoleContext, input: &[String
.expect("Could not fetch currently logged in user.");

user.update_password(new_password).await.map_err(|err| Box::new(err) as Box<dyn Error + Send>)?;
Ok(current_user.name == username)
Ok(current_user.name() == username)
})?;
if updated_current_user {
println!("Successfully updated current user's password, exiting console. Please log in with the updated credentials.");
Expand All @@ -186,6 +196,55 @@ pub(crate) fn user_update_password(context: &mut ConsoleContext, input: &[String
Ok(())
}

pub(crate) fn replica_list(context: &mut ConsoleContext, _input: &[String]) -> CommandResult {
let driver = context.driver.clone();
let replicas = driver.replicas();
if replicas.is_empty() {
println!("No replicas are present.");
} else {
for replica in replicas {
println!("{}", replica.address());
}
}
Ok(())
}

pub(crate) fn replica_primary(context: &mut ConsoleContext, _input: &[String]) -> CommandResult {
let driver = context.driver.clone();
let primary_replica = driver.primary_replica();
if let Some(primary_replica) = primary_replica {
println!("{}", primary_replica.address());
} else {
println!("No primary replica is present.");
}
Ok(())
}

pub(crate) fn replica_register(context: &mut ConsoleContext, input: &[String]) -> CommandResult {
let driver = context.driver.clone();
let replica_id: u64 = input[0].parse().map_err(|_| Box::new(ReplError { message: format!("Replica id '{}' must be a number.", input[0]) })
as Box<dyn Error + Send>)?;
let address = input[1].clone();
context
.background_runtime
.run(async move { driver.register_replica(replica_id, address).await })
.map_err(|err| Box::new(err) as Box<dyn Error + Send>)?;
println!("Successfully registered replica.");
Ok(())
}

pub(crate) fn replica_deregister(context: &mut ConsoleContext, input: &[String]) -> CommandResult {
let driver = context.driver.clone();
let replica_id: u64 = input[0].parse().map_err(|_| Box::new(ReplError { message: format!("Replica id '{}' must be a number.", input[0]) })
as Box<dyn Error + Send>)?;
context
.background_runtime
.run(async move { driver.deregister_replica(replica_id).await })
.map_err(|err| Box::new(err) as Box<dyn Error + Send>)?;
println!("Successfully deregistered replica.");
Ok(())
}

pub(crate) fn transaction_read(context: &mut ConsoleContext, input: &[String]) -> CommandResult {
let driver = context.driver.clone();
let db_name = &input[0];
Expand Down