Skip to content

Commit 92a6bee

Browse files
committed
fix: add contexts in logging and reduce parallelism in default config creation
1 parent 6c03f7d commit 92a6bee

File tree

6 files changed

+192
-58
lines changed

airborne_server/src/main.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,10 @@ use superposition_sdk::config::Config as SrsConfig;
4141
use tracing_actix_web::TracingLogger;
4242
use utils::{db, kms::decrypt_kms, transaction_manager::start_cleanup_job};
4343

44-
use crate::{dashboard::configuration, middleware::request::request_id_mw};
45-
44+
use crate::{
45+
dashboard::configuration,
46+
middleware::request::{req_id_header_mw, WithRequestId},
47+
};
4648
const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
4749

4850
#[actix_web::main]
@@ -203,8 +205,8 @@ async fn main() -> std::io::Result<()> {
203205

204206
HttpServer::new(move || {
205207
App::new()
206-
.wrap(actix_web::middleware::from_fn(request_id_mw))
207-
.wrap(TracingLogger::default())
208+
.wrap(TracingLogger::<WithRequestId>::new())
209+
.wrap(actix_web::middleware::from_fn(req_id_header_mw))
208210
.app_data(web::Data::from(app_state.clone()))
209211
.wrap(actix_web::middleware::Compress::default())
210212
.wrap(actix_web::middleware::Logger::default())
airborne_server/src/middleware/request.rs

Lines changed: 77 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,85 @@
1-
use actix_web::middleware::Next;
21
use actix_web::{
3-
body::BoxBody,
42
dev::{ServiceRequest, ServiceResponse},
5-
Error,
3+
http::{header::HeaderName, header::HeaderValue},
4+
middleware::Next,
5+
Error, HttpMessage,
66
};
7-
use tracing::{info_span, Instrument};
8-
9-
pub async fn request_id_mw(
10-
mut req: ServiceRequest,
11-
next: Next<BoxBody>,
12-
) -> Result<ServiceResponse<BoxBody>, Error> {
13-
let rid = req
14-
.headers()
15-
.get("x-request-id")
16-
.and_then(|v| v.to_str().ok())
17-
.map(|s| s.to_string())
18-
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
19-
20-
req.headers_mut().insert(
21-
actix_web::http::header::HeaderName::from_static("x-request-id"),
22-
rid.parse().unwrap(),
23-
);
24-
25-
let span = info_span!("http_request", request_id = %rid);
26-
27-
// instrument the future so all logs during it are attached to the span
28-
let res = async move {
29-
let mut res = next.call(req).await?;
30-
// add request_id to response headers
7+
use tracing::Span;
8+
use tracing_actix_web::RootSpanBuilder;
9+
use uuid::Uuid;
10+
11+
#[derive(Clone)]
12+
struct RequestId(pub String);
13+
14+
pub struct WithRequestId;
15+
16+
impl RootSpanBuilder for WithRequestId {
17+
fn on_request_start(req: &ServiceRequest) -> Span {
18+
let req_id = req
19+
.headers()
20+
.get("x-request-id")
21+
.and_then(|h| h.to_str().ok())
22+
.map(str::to_owned)
23+
.unwrap_or_else(|| Uuid::new_v4().to_string());
24+
25+
req.extensions_mut().insert(RequestId(req_id.clone()));
26+
27+
let dimensions = req
28+
.headers()
29+
.get("x-dimension")
30+
.and_then(|h| h.to_str().ok())
31+
.map(str::to_owned)
32+
.unwrap_or_default();
33+
34+
let org = req
35+
.headers()
36+
.get("x-organisation")
37+
.and_then(|h| h.to_str().ok())
38+
.map(str::to_owned)
39+
.or_else(|| req.match_info().get("org").map(str::to_owned))
40+
.or_else(|| req.match_info().get("organisation").map(str::to_owned))
41+
.unwrap_or_default();
42+
43+
let app = req
44+
.headers()
45+
.get("x-application")
46+
.and_then(|h| h.to_str().ok())
47+
.map(str::to_owned)
48+
.or_else(|| req.match_info().get("app").map(str::to_owned))
49+
.or_else(|| req.match_info().get("application").map(str::to_owned))
50+
.unwrap_or_default();
51+
52+
tracing::info_span!(
53+
"HTTP request",
54+
request_id = %req_id,
55+
dimensions = %dimensions,
56+
method = %req.method(),
57+
org_id = %org,
58+
app_id = %app,
59+
superposition_workspace = tracing::field::Empty,
60+
route = %req.match_pattern().unwrap_or("<unmatched>".to_string()),
61+
)
62+
}
63+
64+
fn on_request_end<B: actix_web::body::MessageBody>(
65+
_: Span,
66+
_: &Result<ServiceResponse<B>, Error>,
67+
) {
68+
}
69+
}
70+
71+
pub async fn req_id_header_mw<B>(
72+
req: ServiceRequest,
73+
next: Next<B>,
74+
) -> Result<ServiceResponse<B>, Error> {
75+
let mut res = next.call(req).await?;
76+
77+
let req_id = res.request().extensions().get::<RequestId>().cloned();
78+
if let Some(RequestId(id)) = req_id {
3179
res.headers_mut().insert(
32-
actix_web::http::header::HeaderName::from_static("x-request-id"),
33-
rid.parse().unwrap(),
80+
HeaderName::from_static("x-request-id"),
81+
HeaderValue::from_str(&id).unwrap_or(HeaderValue::from_static("invalid-req-id")),
3482
);
35-
Ok::<_, actix_web::Error>(res)
3683
}
37-
.instrument(span)
38-
.await?;
3984
Ok(res)
4085
}

airborne_server/src/organisation/application/properties.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -299,9 +299,20 @@ async fn put_properties_schema_api(
299299
Err(e) => match e {
300300
transaction::TxnError::Operation { source, .. } => return Err(source.into()),
301301
transaction::TxnError::Join { source, .. } => {
302-
return Err(
303-
ABError::InternalServerError(format!("Task join error: {}", source)).into(),
304-
)
302+
log::error!("Task Join Error: {:?}", source);
303+
return Err(ABError::InternalServerError("service error".to_string()).into());
304+
}
305+
transaction::TxnError::Panic { index } => {
306+
log::error!(
307+
"Task Panic at: {:?}, task_metadata: {:?}",
308+
index,
309+
task_metadata.get(index)
310+
);
311+
return Err(ABError::InternalServerError("service error".to_string()).into());
312+
}
313+
transaction::TxnError::MissingResult { index } => {
314+
log::error!("Missing result for task: {:?}", index);
315+
return Err(ABError::InternalServerError("service error".to_string()).into());
305316
}
306317
},
307318
}

airborne_server/src/organisation/application/properties/transaction.rs

Lines changed: 83 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use futures::Future;
15+
use futures::{Future, FutureExt};
1616
use std::{fmt::Debug, pin::Pin};
1717
use thiserror::Error;
1818
use tokio::task::{JoinError, JoinSet};
@@ -45,9 +45,13 @@ pub enum TxnError<E: std::error::Error + Send + Sync + 'static> {
4545
#[source]
4646
source: JoinError,
4747
},
48+
#[error("task {index} panicked during execution")]
49+
Panic { index: usize },
50+
#[error("internal error: missing result for operation {index}")]
51+
MissingResult { index: usize },
4852
}
4953

50-
/// Run all operations concurrently and wait for all to complete.
54+
/// Run N-1 operations concurrently, then run the last operation after they complete.
5155
/// If any operation fails:
5256
/// 1) wait for all operations to finish
5357
/// 2) call `rollback` with all indices that completed successfully
@@ -65,9 +69,38 @@ where
6569
RFut: Future<Output = ()> + Send + 'static,
6670
{
6771
let n = operations.len();
68-
let mut set = JoinSet::new();
6972

70-
// Track success indices and store results by original index.
73+
if n == 0 {
74+
return Ok(Vec::new());
75+
}
76+
if n == 1 {
77+
if let Some(op) = operations.into_iter().next() {
78+
match op().await {
79+
Ok(result) => return Ok(vec![result]),
80+
Err(e) => {
81+
rollback(vec![]).await;
82+
return Err(TxnError::Operation {
83+
index: 0,
84+
source: e,
85+
});
86+
}
87+
}
88+
} else {
89+
return Ok(Vec::new());
90+
}
91+
}
92+
93+
// Split operations: first N-1 and the last one
94+
let mut operations = operations;
95+
let last_operation = match operations.pop() {
96+
Some(op) => op,
97+
None => {
98+
return Ok(Vec::new());
99+
}
100+
};
101+
let last_index = n - 1;
102+
103+
let mut set = JoinSet::new();
71104
let mut successes: Vec<usize> = Vec::with_capacity(n);
72105
let mut results: Vec<Option<R>> = Vec::with_capacity(n);
73106
let mut first_error: Option<TxnError<E>> = None;
@@ -76,48 +109,80 @@ where
76109
results.push(None);
77110
}
78111

79-
// Spawn each operation as its own task, tagged with its index.
112+
// Phase 1: Run first N-1 operations in parallel
80113
for (i, op) in operations.into_iter().enumerate() {
81-
set.spawn(async move { (i, op().await) });
114+
set.spawn(async move {
115+
let result = std::panic::AssertUnwindSafe(op()).catch_unwind().await;
116+
117+
match result {
118+
Ok(operation_result) => (i, Ok(operation_result)),
119+
Err(_panic_payload) => (i, Err(())),
120+
}
121+
});
82122
}
83123

84-
// Process completions as they finish; collect all results before deciding.
124+
// Wait for all N-1 operations to complete
85125
while let Some(joined) = set.join_next().await {
86126
match joined {
87-
Ok((i, Ok(val))) => {
127+
Ok((i, Ok(Ok(val)))) => {
88128
results[i] = Some(val);
89129
successes.push(i);
90130
}
91-
Ok((i, Err(e))) => {
92-
// Store the first error but continue waiting for other operations
131+
Ok((i, Ok(Err(e)))) => {
93132
if first_error.is_none() {
94133
first_error = Some(TxnError::Operation {
95134
index: i,
96135
source: e,
97136
});
98137
}
99138
}
139+
Ok((i, Err(()))) => {
140+
if first_error.is_none() {
141+
first_error = Some(TxnError::Panic { index: i });
142+
}
143+
}
100144
Err(join_err) => {
101-
// Store the first join error but continue waiting for other operations
102145
if first_error.is_none() {
103146
first_error = Some(TxnError::Join {
104-
index: usize::MAX, // unknown which one if task panicked before tagging
147+
index: usize::MAX,
105148
source: join_err,
106149
});
107150
}
108151
}
109152
}
110153
}
111154

112-
// If any operation failed, rollback all successful operations
113155
if let Some(err) = first_error {
114156
rollback(successes).await;
115157
return Err(err);
116158
}
117159

118-
// All succeeded; unwrap in original order.
119-
Ok(results
120-
.into_iter()
121-
.map(|o| o.expect("logic error: missing result"))
122-
.collect())
160+
// Phase 2: All N-1 operations succeeded, now run the last operation
161+
match last_operation().await {
162+
Ok(last_result) => {
163+
results[last_index] = Some(last_result);
164+
successes.push(last_index);
165+
166+
// All succeeded
167+
let mut final_results = Vec::with_capacity(n);
168+
for (i, result_opt) in results.into_iter().enumerate() {
169+
match result_opt {
170+
Some(result) => final_results.push(result),
171+
None => {
172+
rollback(successes).await;
173+
return Err(TxnError::MissingResult { index: i });
174+
}
175+
}
176+
}
177+
Ok(final_results)
178+
}
179+
Err(e) => {
180+
// Last operation failed, rollback all successful operations
181+
rollback(successes).await;
182+
Err(TxnError::Operation {
183+
index: last_index,
184+
source: e,
185+
})
186+
}
187+
}
123188
}

airborne_server/src/release.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1081,6 +1081,10 @@ async fn serve_release(
10811081
let (organisation, application) = path.into_inner();
10821082
let superposition_org_id_from_env = state.env.superposition_org_id.clone();
10831083

1084+
let span = tracing::Span::current();
1085+
span.record("org_id", tracing::field::display(&organisation));
1086+
span.record("app_id", tracing::field::display(&application));
1087+
10841088
info!(
10851089
"Serving release for organisation: {}, application: {}",
10861090
organisation, application

airborne_server/src/utils/workspace.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,14 @@ pub async fn get_workspace_name_for_application(
3737
});
3838

3939
match workspace_result {
40-
Ok(name) => Ok(name.workspace_name),
40+
Ok(name) => {
41+
let span = tracing::Span::current();
42+
span.record(
43+
"superposition_workspace",
44+
tracing::field::display(&name.workspace_name),
45+
);
46+
Ok(name.workspace_name)
47+
}
4148
Err(ABError::NotFound(_)) => Err(error::ErrorNotFound("workspace not found")),
4249
Err(e) => Err(error::ErrorInternalServerError(e.to_string())),
4350
}

0 commit comments

Comments
 (0)