Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion airborne_server/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,5 @@ GCP_SERVICE_ACCOUNT_PATH=/app/airborne-gcp.json

RUST_LOG=debug,info,error,actix_web=info,error
LOG_FORMAT=
RUN_DB_MIGRATIONS=true
SUPERPOSITION_MIGRATION_STRATEGY=PATCH
MIGRATIONS_TO_RUN_ON_BOOT=db,superposition
38 changes: 32 additions & 6 deletions airborne_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,14 @@ use superposition_sdk::config::Config as SrsConfig;
use tracing_actix_web::TracingLogger;
use utils::{db, kms::decrypt_kms, transaction_manager::start_cleanup_job};

use crate::dashboard::configuration;
use crate::middleware::auth::Auth;
use crate::middleware::request::request_id_mw;
use crate::{
dashboard::configuration,
utils::migrations::{
get_default_configs_from_file, migrate_superposition, SuperpositionMigrationStrategy,
},
};

const MIGRATIONS: EmbeddedMigrations = embed_migrations!();

Expand Down Expand Up @@ -109,10 +114,15 @@ async fn main() -> std::io::Result<()> {
let backlog = std::env::var("BACKLOG")
.map_or(1024, |v| v.parse().expect("BACKLOG must be a valid number"));

let run_pending_migrations = std::env::var("RUN_DB_MIGRATIONS")
.ok()
.and_then(|v| v.parse::<bool>().ok())
.unwrap_or_default();
let superposition_migration_strategy = SuperpositionMigrationStrategy::from(
std::env::var("SUPERPOSITION_MIGRATION_STRATEGY").unwrap_or_else(|_| "PATCH".into()),
);

let migrations_to_run_on_boot: Vec<String> = std::env::var("MIGRATIONS_TO_RUN_ON_BOOT")
.unwrap_or_else(|_| "".into())
.split(',')
.map(|s| s.trim().into())
.collect();

//Need to check if this ENV exists on pod
let uses_local_stack = std::env::var("AWS_ENDPOINT_URL");
Expand All @@ -126,7 +136,7 @@ async fn main() -> std::io::Result<()> {
let aws_kms_client = aws_sdk_kms::Client::new(&shared_config);
let aws_cloudfront_client = aws_sdk_cloudfront::Client::new(&shared_config);

if run_pending_migrations {
if migrations_to_run_on_boot.contains(&"db".to_string()) {
info!("Running pending database migrations");
let mut conn = db::establish_connection(&aws_kms_client).await;
conn.run_pending_migrations(MIGRATIONS)
Expand Down Expand Up @@ -174,6 +184,9 @@ async fn main() -> std::io::Result<()> {
organisation_creation_disabled,
google_spreadsheet_id: spreadsheet_id.clone(),
cloudfront_distribution_id: cf_distribution_id.clone(),
default_configs: get_default_configs_from_file()
.await
.expect("Failed to load superposition default configs from file"),
};

// This is required for localStack
Expand Down Expand Up @@ -233,6 +246,19 @@ async fn main() -> std::io::Result<()> {
info!("Started transaction cleanup background job");
info!("Using server prefix {}", server_path_prefix);

if migrations_to_run_on_boot.contains(&"superposition".to_string()) {
let superposition_migration =
migrate_superposition(&app_state_data, superposition_migration_strategy).await;
if superposition_migration.is_err() {
panic!(
"Superposition migration failed: {:?}",
superposition_migration.err()
);
} else {
println!("Superposition migration completed successfully");
}
}

HttpServer::new(move || {
App::new()
.wrap(actix_web::middleware::from_fn(request_id_mw))
Expand Down
214 changes: 22 additions & 192 deletions airborne_server/src/organisation/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ use superposition_sdk::Client;

use crate::middleware::auth::{validate_user, AuthResponse, ADMIN};
use crate::types::{ABError, AppState};
use crate::utils::document::{schema_doc_to_hashmap, value_to_document};
use crate::utils::document::schema_doc_to_hashmap;
use crate::utils::migrations::{migrate_superposition_workspace, SuperpositionMigrationStrategy};
use diesel::RunQueryDsl;

use crate::utils::db::models::{NewWorkspaceName, WorkspaceName};
Expand Down Expand Up @@ -66,7 +67,7 @@ struct ApplicationCreateRequest {
application: String,
}

fn default_config<T: Clone>(
pub fn default_config<T: Clone>(
superposition_client: Client,
workspace_name: String,
superposition_org: String,
Expand Down Expand Up @@ -321,7 +322,7 @@ async fn add_application(
superposition_org_id_from_env
);
// Insert and get the inserted row (to get the id)
let inserted_workspace: WorkspaceName = diesel::insert_into(workspace_names::table)
let mut inserted_workspace: WorkspaceName = diesel::insert_into(workspace_names::table)
.values(&new_workspace_name)
.get_result(&mut conn)
.map_err(|e| {
Expand All @@ -330,6 +331,7 @@ async fn add_application(

let generated_id = inserted_workspace.id;
let generated_workspace_name = format!("workspace{}", generated_id);
inserted_workspace.workspace_name = generated_workspace_name.clone();

// Update the workspace_name to "workspace{id}"
diesel::update(workspace_names::table.filter(workspace_names::id.eq(generated_id)))
Expand Down Expand Up @@ -375,37 +377,15 @@ async fn add_application(
}
};

// Step 5: Create default configurations
let create_default_config_string = default_config::<String>(
state.superposition_client.clone(),
generated_workspace_name.clone(),
superposition_org_id_from_env.clone(), // Use ID from env
);
let create_default_config_int = default_config::<i32>(
state.superposition_client.clone(),
generated_workspace_name.clone(),
superposition_org_id_from_env.clone(), // Use ID from env
);
let create_default_config_doc = default_config::<Document>(
state.superposition_client.clone(),
generated_workspace_name.clone(),
superposition_org_id_from_env.clone(), // Use ID from env
);
let create_default_config_array = default_config::<Vec<Document>>(
state.superposition_client.clone(),
generated_workspace_name.clone(),
superposition_org_id_from_env.clone(), // Use ID from env
);

// Helper function to create default config with error handling
async fn create_config_with_tx<T, E>(
create_fn: impl futures::Future<Output = Result<T, E>>,
async fn create_config_with_tx<E>(
create_fn: impl futures::Future<Output = Result<(), E>>,
key: &str,
transaction: &TransactionManager,
admin: &KeycloakAdmin,
realm: &str,
state: &web::Data<AppState>,
) -> Result<T, actix_web::Error>
) -> Result<(), ABError>
where
E: std::fmt::Display,
{
Expand All @@ -423,183 +403,33 @@ async fn add_application(
info!("Rollback failed: {}", rollback_err);
}

Err(error::ErrorInternalServerError(format!(
Err(ABError::InternalServerError(format!(
"Failed to create configuration for {}: {}",
key, e
)))
}
}
}

// Create all configurations with transaction-aware error handling
create_config_with_tx(
create_default_config_string(
"config.version".to_string(),
"0.0.0".to_string(),
"Value indicating the version of the release config -> config".to_string(),
),
"config.version",
&transaction,
&admin,
&realm,
&state,
)
.await
.map_err(|e| ABError::InternalServerError(format!("{}", e)))?;

create_config_with_tx(
create_default_config_int(
"config.release_config_timeout".to_string(),
1000,
"Value indicating the version of the release config".to_string(),
),
"config.release_config_timeout",
&transaction,
&admin,
&realm,
&state,
)
.await
.map_err(|e| ABError::InternalServerError(format!("{}", e)))?;

create_config_with_tx(
create_default_config_int(
"config.boot_timeout".to_string(),
1000,
"Indicating the timeout for downloading the package block".to_string(),
),
"config.boot_timeout",
&transaction,
&admin,
&realm,
&state,
)
.await
.map_err(|e| ABError::InternalServerError(format!("{}", e)))?;

create_config_with_tx(
create_default_config_doc(
"config.properties".to_string(),
value_to_document(&serde_json::json!({})),
"Value indicating the properties of the config".to_string(),
),
"config.properties",
&transaction,
&admin,
&realm,
&state,
)
.await
.map_err(|e| ABError::InternalServerError(format!("{}", e)))?;

info!(
"Creating default configuration (string): key=package.name, value={}",
generated_workspace_name
);
create_config_with_tx(
create_default_config_string(
"package.name".to_string(),
generated_workspace_name.clone(),
"Value indicating the version of the release config".to_string(),
),
"package.name",
&transaction,
&admin,
&realm,
&state,
)
.await
.map_err(|e| ABError::InternalServerError(format!("{}", e)))?;

create_config_with_tx(
create_default_config_int(
"package.version".to_string(),
0,
"Value indicating the version of the package".to_string(),
),
"package.version",
&transaction,
&admin,
&realm,
&state,
)
.await
.map_err(|e| ABError::InternalServerError(format!("{}", e)))?;

create_config_with_tx(
create_default_config_doc(
"package.properties".to_string(),
value_to_document(&serde_json::json!({})),
"Value indicating the properties of the package".to_string(),
),
"package.properties",
&transaction,
&admin,
&realm,
&state,
)
.await
.map_err(|e| ABError::InternalServerError(format!("{}", e)))?;

create_config_with_tx(
create_default_config_string(
"package.index".to_string(),
"".to_string(),
"Value indicating the index of the package".to_string(),
),
"package.index",
&transaction,
&admin,
&realm,
&state,
)
.await
.map_err(|e| ABError::InternalServerError(format!("{}", e)))?;

create_config_with_tx(
create_default_config_array(
"package.important".to_string(),
vec![],
"Value indicating the important block of the package".to_string(),
),
"package.important",
&transaction,
&admin,
&realm,
&state,
)
.await
.map_err(|e| ABError::InternalServerError(format!("{}", e)))?;

create_config_with_tx(
create_default_config_array(
"package.lazy".to_string(),
vec![],
"Value indicating the lazy block of the package".to_string(),
),
"package.lazy",
&transaction,
&admin,
&realm,
&state,
)
.await
.map_err(|e| ABError::InternalServerError(format!("{}", e)))?;

create_config_with_tx(
create_default_config_array(
"resources".to_string(),
vec![],
"Value indicating the resources block of the release config".to_string(),
),
"resources",
async {
migrate_superposition_workspace(
&inserted_workspace,
&state,
&SuperpositionMigrationStrategy::Patch,
)
.await
.map_err(|e| {
ABError::InternalServerError(format!("Workspace migration error: {}", e))
})
},
"migrate_superposition_workspace",
&transaction,
&admin,
&realm,
&state,
)
.await
.map_err(|e| ABError::InternalServerError(format!("{}", e)))?;
.await?;

// Mark transaction as complete since all operations have succeeded
transaction.set_database_inserted();
Expand Down
3 changes: 2 additions & 1 deletion airborne_server/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use serde::Serialize;
use superposition_sdk::Client;
use thiserror::Error;

use crate::utils::db;
use crate::utils::{db, migrations::SuperpositionDefaultConfig};

#[derive(Clone)]
pub struct AppState {
Expand Down Expand Up @@ -49,6 +49,7 @@ pub struct Environment {
pub organisation_creation_disabled: bool,
pub google_spreadsheet_id: String,
pub cloudfront_distribution_id: String,
pub default_configs: Vec<SuperpositionDefaultConfig>,
}
pub trait AppError: std::error::Error + Send + Sync + 'static {
fn code(&self) -> &'static str;
Expand Down
1 change: 1 addition & 0 deletions airborne_server/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod document;
pub mod encryption;
pub mod keycloak;
pub mod kms;
pub mod migrations;
pub mod s3;
pub mod semver;
pub mod transaction_manager;
Expand Down
Loading
Loading