Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions airborne_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ use superposition_sdk::config::Config as SrsConfig;
use tracing_actix_web::TracingLogger;
use utils::{db, kms::decrypt_kms, transaction_manager::start_cleanup_job};

use crate::{dashboard::configuration, middleware::request::request_id_mw};

use crate::{
dashboard::configuration,
middleware::request::{req_id_header_mw, WithRequestId},
};
const MIGRATIONS: EmbeddedMigrations = embed_migrations!();

#[actix_web::main]
Expand Down Expand Up @@ -203,8 +205,8 @@ async fn main() -> std::io::Result<()> {

HttpServer::new(move || {
App::new()
.wrap(actix_web::middleware::from_fn(request_id_mw))
.wrap(TracingLogger::default())
.wrap(TracingLogger::<WithRequestId>::new())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this PR have other changes, Or is this needed to sequentialise

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has changes for request_id other than sequentialise

.wrap(actix_web::middleware::from_fn(req_id_header_mw))
.app_data(web::Data::from(app_state.clone()))
.wrap(actix_web::middleware::Compress::default())
.wrap(actix_web::middleware::Logger::default())
Expand Down
109 changes: 77 additions & 32 deletions airborne_server/src/middleware/request.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,85 @@
use actix_web::middleware::Next;
use actix_web::{
body::BoxBody,
dev::{ServiceRequest, ServiceResponse},
Error,
http::{header::HeaderName, header::HeaderValue},
middleware::Next,
Error, HttpMessage,
};
use tracing::{info_span, Instrument};

pub async fn request_id_mw(
mut req: ServiceRequest,
next: Next<BoxBody>,
) -> Result<ServiceResponse<BoxBody>, Error> {
let rid = req
.headers()
.get("x-request-id")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string())
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());

req.headers_mut().insert(
actix_web::http::header::HeaderName::from_static("x-request-id"),
rid.parse().unwrap(),
);

let span = info_span!("http_request", request_id = %rid);

// instrument the future so all logs during it are attached to the span
let res = async move {
let mut res = next.call(req).await?;
// add request_id to response headers
use tracing::Span;
use tracing_actix_web::RootSpanBuilder;
use uuid::Uuid;

#[derive(Clone)]
struct RequestId(pub String);

pub struct WithRequestId;

impl RootSpanBuilder for WithRequestId {
fn on_request_start(req: &ServiceRequest) -> Span {
let req_id = req
.headers()
.get("x-request-id")
.and_then(|h| h.to_str().ok())
.map(str::to_owned)
.unwrap_or_else(|| Uuid::new_v4().to_string());

req.extensions_mut().insert(RequestId(req_id.clone()));

let dimensions = req
.headers()
.get("x-dimension")
.and_then(|h| h.to_str().ok())
.map(str::to_owned)
.unwrap_or_default();

let org = req
.headers()
.get("x-organisation")
.and_then(|h| h.to_str().ok())
.map(str::to_owned)
.or_else(|| req.match_info().get("org").map(str::to_owned))
.or_else(|| req.match_info().get("organisation").map(str::to_owned))
.unwrap_or_default();

let app = req
.headers()
.get("x-application")
.and_then(|h| h.to_str().ok())
.map(str::to_owned)
.or_else(|| req.match_info().get("app").map(str::to_owned))
.or_else(|| req.match_info().get("application").map(str::to_owned))
.unwrap_or_default();

tracing::info_span!(
"HTTP request",
request_id = %req_id,
dimensions = %dimensions,
method = %req.method(),
org_id = %org,
app_id = %app,
superposition_workspace = tracing::field::Empty,
route = %req.match_pattern().unwrap_or("<unmatched>".to_string()),
)
}

fn on_request_end<B: actix_web::body::MessageBody>(
_: Span,
_: &Result<ServiceResponse<B>, Error>,
) {
}
}

pub async fn req_id_header_mw<B>(
req: ServiceRequest,
next: Next<B>,
) -> Result<ServiceResponse<B>, Error> {
let mut res = next.call(req).await?;

let req_id = res.request().extensions().get::<RequestId>().cloned();
if let Some(RequestId(id)) = req_id {
res.headers_mut().insert(
actix_web::http::header::HeaderName::from_static("x-request-id"),
rid.parse().unwrap(),
HeaderName::from_static("x-request-id"),
HeaderValue::from_str(&id).unwrap_or(HeaderValue::from_static("invalid-req-id")),
);
Ok::<_, actix_web::Error>(res)
}
.instrument(span)
.await?;
Ok(res)
}
17 changes: 14 additions & 3 deletions airborne_server/src/organisation/application/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,20 @@ async fn put_properties_schema_api(
Err(e) => match e {
transaction::TxnError::Operation { source, .. } => return Err(source.into()),
transaction::TxnError::Join { source, .. } => {
return Err(
ABError::InternalServerError(format!("Task join error: {}", source)).into(),
)
log::error!("Task Join Error: {:?}", source);
return Err(ABError::InternalServerError("service error".to_string()).into());
}
transaction::TxnError::Panic { index } => {
log::error!(
"Task Panic at: {:?}, task_metadata: {:?}",
index,
task_metadata.get(index)
);
return Err(ABError::InternalServerError("service error".to_string()).into());
}
transaction::TxnError::MissingResult { index } => {
log::error!("Missing result for task: {:?}", index);
return Err(ABError::InternalServerError("service error".to_string()).into());
}
},
}
Expand Down
101 changes: 83 additions & 18 deletions airborne_server/src/organisation/application/properties/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use futures::Future;
use futures::{Future, FutureExt};
use std::{fmt::Debug, pin::Pin};
use thiserror::Error;
use tokio::task::{JoinError, JoinSet};
Expand Down Expand Up @@ -45,9 +45,13 @@ pub enum TxnError<E: std::error::Error + Send + Sync + 'static> {
#[source]
source: JoinError,
},
#[error("task {index} panicked during execution")]
Panic { index: usize },
#[error("internal error: missing result for operation {index}")]
MissingResult { index: usize },
}

/// Run all operations concurrently and wait for all to complete.
/// Run N-1 operations concurrently, then run the last operation after they complete.
/// If any operation fails:
/// 1) wait for all operations to finish
/// 2) call `rollback` with all indices that completed successfully
Expand All @@ -65,9 +69,38 @@ where
RFut: Future<Output = ()> + Send + 'static,
{
let n = operations.len();
let mut set = JoinSet::new();

// Track success indices and store results by original index.
if n == 0 {
return Ok(Vec::new());
}
if n == 1 {
if let Some(op) = operations.into_iter().next() {
match op().await {
Ok(result) => return Ok(vec![result]),
Err(e) => {
rollback(vec![]).await;
return Err(TxnError::Operation {
index: 0,
source: e,
});
}
}
} else {
return Ok(Vec::new());
}
}

// Split operations: first N-1 and the last one
let mut operations = operations;
let last_operation = match operations.pop() {
Some(op) => op,
None => {
return Ok(Vec::new());
}
};
let last_index = n - 1;

let mut set = JoinSet::new();
let mut successes: Vec<usize> = Vec::with_capacity(n);
let mut results: Vec<Option<R>> = Vec::with_capacity(n);
let mut first_error: Option<TxnError<E>> = None;
Expand All @@ -76,48 +109,80 @@ where
results.push(None);
}

// Spawn each operation as its own task, tagged with its index.
// Phase 1: Run first N-1 operations in parallel
for (i, op) in operations.into_iter().enumerate() {
set.spawn(async move { (i, op().await) });
set.spawn(async move {
let result = std::panic::AssertUnwindSafe(op()).catch_unwind().await;

match result {
Ok(operation_result) => (i, Ok(operation_result)),
Err(_panic_payload) => (i, Err(())),
}
});
}

// Process completions as they finish; collect all results before deciding.
// Wait for all N-1 operations to complete
while let Some(joined) = set.join_next().await {
match joined {
Ok((i, Ok(val))) => {
Ok((i, Ok(Ok(val)))) => {
results[i] = Some(val);
successes.push(i);
}
Ok((i, Err(e))) => {
// Store the first error but continue waiting for other operations
Ok((i, Ok(Err(e)))) => {
if first_error.is_none() {
first_error = Some(TxnError::Operation {
index: i,
source: e,
});
}
}
Ok((i, Err(()))) => {
if first_error.is_none() {
first_error = Some(TxnError::Panic { index: i });
}
}
Err(join_err) => {
// Store the first join error but continue waiting for other operations
if first_error.is_none() {
first_error = Some(TxnError::Join {
index: usize::MAX, // unknown which one if task panicked before tagging
index: usize::MAX,
source: join_err,
});
}
}
}
}

// If any operation failed, rollback all successful operations
if let Some(err) = first_error {
rollback(successes).await;
return Err(err);
}

// All succeeded; unwrap in original order.
Ok(results
.into_iter()
.map(|o| o.expect("logic error: missing result"))
.collect())
// Phase 2: All N-1 operations succeeded, now run the last operation
match last_operation().await {
Ok(last_result) => {
results[last_index] = Some(last_result);
successes.push(last_index);

// All succeeded
let mut final_results = Vec::with_capacity(n);
for (i, result_opt) in results.into_iter().enumerate() {
match result_opt {
Some(result) => final_results.push(result),
None => {
rollback(successes).await;
return Err(TxnError::MissingResult { index: i });
}
}
}
Ok(final_results)
}
Err(e) => {
// Last operation failed, rollback all successful operations
rollback(successes).await;
Err(TxnError::Operation {
index: last_index,
source: e,
})
}
}
}
4 changes: 4 additions & 0 deletions airborne_server/src/release.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1081,6 +1081,10 @@ async fn serve_release(
let (organisation, application) = path.into_inner();
let superposition_org_id_from_env = state.env.superposition_org_id.clone();

let span = tracing::Span::current();
span.record("org_id", tracing::field::display(&organisation));
span.record("app_id", tracing::field::display(&application));

info!(
"Serving release for organisation: {}, application: {}",
organisation, application
Expand Down
9 changes: 8 additions & 1 deletion airborne_server/src/utils/workspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,14 @@ pub async fn get_workspace_name_for_application(
});

match workspace_result {
Ok(name) => Ok(name.workspace_name),
Ok(name) => {
let span = tracing::Span::current();
span.record(
"superposition_workspace",
tracing::field::display(&name.workspace_name),
);
Ok(name.workspace_name)
}
Err(ABError::NotFound(_)) => Err(error::ErrorNotFound("workspace not found")),
Err(e) => Err(error::ErrorInternalServerError(e.to_string())),
}
Expand Down
Loading