Skip to content

Commit b06eaf8

Browse files
committed
deployment_id in spans, LogRecorder for Batcher
1 parent 88499f0 commit b06eaf8

File tree

9 files changed

+37
-39
lines changed

9 files changed

+37
-39
lines changed

common/src/log.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ pub struct DeploymentLogLayer<R>
148148
where
149149
R: LogRecorder + Send + Sync,
150150
{
151-
pub recorder: R,
151+
pub log_recorder: R,
152152
pub internal_service: Backend,
153153
}
154154

@@ -170,7 +170,7 @@ where
170170
let extensions = span.extensions();
171171

172172
if let Some(details) = extensions.get::<ScopeDetails>() {
173-
self.recorder.record(LogItem::new(
173+
self.log_recorder.record(LogItem::new(
174174
details.deployment_id,
175175
self.internal_service.clone(),
176176
format_event(event),
@@ -210,7 +210,7 @@ where
210210
metadata.name().blue(),
211211
);
212212

213-
self.recorder.record(LogItem::new(
213+
self.log_recorder.record(LogItem::new(
214214
details.deployment_id,
215215
self.internal_service.clone(),
216216
message,

deployer/src/deployment/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ impl DeploymentManager {
238238
self.queue_send.send(queued).await.unwrap();
239239
}
240240

241-
#[instrument(skip(self), fields(id = %built.id, state = %State::Built))]
241+
#[instrument(skip(self), fields(deployment_id = %built.id, state = %State::Built))]
242242
pub async fn run_push(&self, built: Built) {
243243
self.run_send.send(built).await.unwrap();
244244
}

deployer/src/deployment/queue.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ pub async fn task(
112112
}
113113
}
114114

115-
#[instrument(skip(_id), fields(id = %_id, state = %State::Crashed))]
115+
#[instrument(skip(_id), fields(deployment_id = %_id, state = %State::Crashed))]
116116
fn build_failed(_id: &Uuid, error: impl std::error::Error + 'static) {
117117
error!(
118118
error = &error as &dyn std::error::Error,
@@ -148,7 +148,7 @@ async fn remove_from_queue(queue_client: impl BuildQueueClient, id: Uuid) {
148148
}
149149
}
150150

151-
#[instrument(skip(run_send), fields(id = %built.id, state = %State::Built))]
151+
#[instrument(skip(run_send), fields(deployment_id = %built.id, state = %State::Built))]
152152
async fn promote_to_run(mut built: Built, run_send: RunSender) {
153153
let cx = Span::current().context();
154154

@@ -173,7 +173,7 @@ pub struct Queued {
173173
}
174174

175175
impl Queued {
176-
#[instrument(skip(self, storage_manager, deployment_updater, log_recorder, secret_recorder), fields(id = %self.id, state = %State::Building))]
176+
#[instrument(skip(self, storage_manager, deployment_updater, log_recorder, secret_recorder), fields(deployment_id = %self.id, state = %State::Building))]
177177
async fn handle(
178178
self,
179179
storage_manager: ArtifactsStorageManager,

deployer/src/deployment/run.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -167,25 +167,25 @@ async fn kill_old_deployments(
167167
Ok(())
168168
}
169169

170-
#[instrument(skip(_id), fields(id = %_id, state = %State::Completed))]
170+
#[instrument(skip(_id), fields(deployment_id = %_id, state = %State::Completed))]
171171
fn completed_cleanup(_id: &Uuid) {
172172
info!("service finished all on its own");
173173
}
174174

175-
#[instrument(skip(_id), fields(id = %_id, state = %State::Stopped))]
175+
#[instrument(skip(_id), fields(deployment_id = %_id, state = %State::Stopped))]
176176
fn stopped_cleanup(_id: &Uuid) {
177177
info!("service was stopped by the user");
178178
}
179179

180-
#[instrument(skip(_id), fields(id = %_id, state = %State::Crashed))]
180+
#[instrument(skip(_id), fields(deployment_id = %_id, state = %State::Crashed))]
181181
fn crashed_cleanup(_id: &Uuid, error: impl std::error::Error + 'static) {
182182
error!(
183183
error = &error as &dyn std::error::Error,
184184
"service encountered an error"
185185
);
186186
}
187187

188-
#[instrument(skip(_id), fields(id = %_id, state = %State::Crashed))]
188+
#[instrument(skip(_id), fields(deployment_id = %_id, state = %State::Crashed))]
189189
fn start_crashed_cleanup(_id: &Uuid, error: impl std::error::Error + 'static) {
190190
error!(
191191
error = &error as &dyn std::error::Error,
@@ -215,7 +215,7 @@ pub struct Built {
215215
}
216216

217217
impl Built {
218-
#[instrument(skip(self, storage_manager, secret_getter, resource_manager, runtime_manager, deployment_updater, kill_old_deployments, cleanup), fields(id = %self.id, state = %State::Loading))]
218+
#[instrument(skip(self, storage_manager, secret_getter, resource_manager, runtime_manager, deployment_updater, kill_old_deployments, cleanup), fields(deployment_id = %self.id, state = %State::Loading))]
219219
#[allow(clippy::too_many_arguments)]
220220
async fn handle(
221221
self,

deployer/src/deployment/state_change_layer.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
//! This is very similar to Aspect Oriented Programming where we use the annotations from the function to trigger the recording of a new state.
88
//! This annotation is a [#[instrument]](https://docs.rs/tracing-attributes/latest/tracing_attributes/attr.instrument.html) with an `id` and `state` field as follow:
99
//! ```no-test
10-
//! #[instrument(fields(id = %built.id, state = %State::Built))]
10+
//! #[instrument(fields(deployment_id = %built.id, state = %State::Built))]
1111
//! pub async fn new_state_fn(built: Built) {
1212
//! // Get built ready for starting
1313
//! }
@@ -20,6 +20,7 @@
2020
use std::str::FromStr;
2121

2222
use chrono::Utc;
23+
use shuttle_proto::logger::{Batcher, VecReceiver};
2324
use tracing::{field::Visit, span, warn, Metadata, Subscriber};
2425
use tracing_subscriber::Layer;
2526
use uuid::Uuid;
@@ -63,19 +64,19 @@ where
6364
let mut visitor = NewStateVisitor::default();
6465
attrs.record(&mut visitor);
6566

66-
if visitor.id.is_nil() {
67+
if visitor.deployment_id.is_nil() {
6768
warn!("scope details does not have a valid id");
6869
return;
6970
}
7071

7172
// To deployer persistence
7273
self.state_recorder.record_state(DeploymentState {
73-
id: visitor.id,
74+
id: visitor.deployment_id,
7475
state: visitor.state,
7576
});
7677
// To logger
7778
self.log_recorder.record(LogItem::new(
78-
visitor.id,
79+
visitor.deployment_id,
7980
Backend::Deployer,
8081
format!(
8182
"{} {}",
@@ -86,16 +87,16 @@ where
8687
}
8788
}
8889

89-
/// To extract `id` and `state` fields for scopes that have them
90+
/// To extract `deployment_id` and `state` fields for scopes that have them
9091
#[derive(Default)]
9192
struct NewStateVisitor {
92-
id: Uuid,
93+
deployment_id: Uuid,
9394
state: State,
9495
}
9596

9697
impl NewStateVisitor {
9798
/// Field containing the deployment identifier
98-
const ID_IDENT: &'static str = "id";
99+
const ID_IDENT: &'static str = "deployment_id";
99100

100101
/// Field containing the deployment state identifier
101102
const STATE_IDENT: &'static str = "state";
@@ -112,7 +113,7 @@ impl Visit for NewStateVisitor {
112113
if field.name() == Self::STATE_IDENT {
113114
self.state = State::from_str(&format!("{value:?}")).unwrap_or_default();
114115
} else if field.name() == Self::ID_IDENT {
115-
self.id = Uuid::try_parse(&format!("{value:?}")).unwrap_or_default();
116+
self.deployment_id = Uuid::try_parse(&format!("{value:?}")).unwrap_or_default();
116117
}
117118
}
118119
}

deployer/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use hyper::{
55
server::conn::AddrStream,
66
service::{make_service_fn, service_fn},
77
};
8+
use shuttle_common::log::LogRecorder;
89
use shuttle_proto::logger::logger_client::LoggerClient;
910
use tokio::sync::Mutex;
1011
use tracing::{error, info};
@@ -28,6 +29,7 @@ pub use crate::runtime_manager::RuntimeManager;
2829
pub async fn start(
2930
persistence: Persistence,
3031
runtime_manager: Arc<Mutex<RuntimeManager>>,
32+
log_recorder: impl LogRecorder,
3133
log_fetcher: LoggerClient<
3234
shuttle_common::claims::ClaimService<
3335
shuttle_common::claims::InjectPropagation<tonic::transport::Channel>,
@@ -37,7 +39,7 @@ pub async fn start(
3739
) {
3840
// when _set is dropped once axum exits, the deployment tasks will be aborted.
3941
let deployment_manager = DeploymentManager::builder()
40-
.build_log_recorder(log_fetcher.clone())
42+
.build_log_recorder(log_recorder)
4143
.secret_recorder(persistence.clone())
4244
.active_deployment_getter(persistence.clone())
4345
.artifacts_path(args.artifacts_path)

deployer/src/main.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use shuttle_common::{
77
log::{Backend, DeploymentLogLayer},
88
};
99
use shuttle_deployer::{start, start_proxy, Args, Persistence, RuntimeManager, StateChangeLayer};
10-
use shuttle_proto::logger::logger_client::LoggerClient;
10+
use shuttle_proto::logger::{logger_client::LoggerClient, Batcher};
1111
use tokio::select;
1212
use tower::ServiceBuilder;
1313
use tracing::{error, trace};
@@ -40,16 +40,17 @@ async fn main() {
4040
.expect("failed to connect to logger"),
4141
);
4242
let logger_client = LoggerClient::new(channel);
43+
let logger_batcher = Batcher::wrap(logger_client.clone());
4344

4445
setup_tracing(
4546
tracing_subscriber::registry()
4647
.with(StateChangeLayer {
47-
log_recorder: logger_client.clone(),
48+
log_recorder: logger_batcher.clone(),
4849
state_recorder: persistence.clone(),
4950
})
5051
// TODO: Make all relevant backends set this up in this way
5152
.with(DeploymentLogLayer {
52-
recorder: logger_client.clone(),
53+
log_recorder: logger_batcher.clone(),
5354
internal_service: Backend::Deployer,
5455
}),
5556
Backend::Deployer,
@@ -59,15 +60,15 @@ async fn main() {
5960
args.artifacts_path.clone(),
6061
args.provisioner_address.uri().to_string(),
6162
args.logger_uri.uri().to_string(),
62-
logger_client.clone(),
63+
logger_batcher.clone(),
6364
Some(args.auth_uri.to_string()),
6465
);
6566

6667
select! {
6768
_ = start_proxy(args.proxy_address, args.proxy_fqdn.clone(), persistence.clone()) => {
6869
error!("Proxy stopped.")
6970
},
70-
_ = start(persistence, runtime_manager, logger_client, args) => {
71+
_ = start(persistence, runtime_manager, logger_batcher, logger_client, args) => {
7172
error!("Deployment service stopped.")
7273
},
7374
}

proto/src/lib.rs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -236,8 +236,6 @@ pub mod logger {
236236
use std::time::Duration;
237237

238238
use chrono::{DateTime, NaiveDateTime, Utc};
239-
use shuttle_common::log::{LogItem as LogItemCommon, LogRecorder};
240-
241239
use prost::bytes::Bytes;
242240
use tokio::{select, sync::mpsc, time::interval};
243241
use tonic::{
@@ -247,6 +245,8 @@ pub mod logger {
247245
};
248246
use tracing::error;
249247

248+
use shuttle_common::log::{LogItem as LogItemCommon, LogRecorder};
249+
250250
use self::logger_client::LoggerClient;
251251

252252
include!("generated/logger.rs");
@@ -276,18 +276,12 @@ pub mod logger {
276276
}
277277
}
278278

279-
impl LogRecorder
280-
for logger_client::LoggerClient<
281-
shuttle_common::claims::ClaimService<
282-
shuttle_common::claims::InjectPropagation<tonic::transport::Channel>,
283-
>,
284-
>
279+
impl<I> LogRecorder for Batcher<I>
280+
where
281+
I: VecReceiver<Item = LogItemCommon> + Clone + 'static,
285282
{
286283
fn record(&self, log: LogItemCommon) {
287-
// TODO: Make async + error handling?
288-
// self.send_logs(request)
289-
// .await
290-
// .expect("Failed to sens log line");
284+
self.send(log);
291285
}
292286
}
293287

0 commit comments

Comments
 (0)