diff --git a/fhevm-engine/Cargo.lock b/fhevm-engine/Cargo.lock index 056ba87d..5ae2c321 100644 --- a/fhevm-engine/Cargo.lock +++ b/fhevm-engine/Cargo.lock @@ -1768,7 +1768,6 @@ dependencies = [ "fhevm-engine-common", "hex", "lazy_static", - "log", "lru", "opentelemetry", "opentelemetry-otlp", @@ -1780,7 +1779,6 @@ dependencies = [ "serde_json", "sha3", "sqlx", - "structured-logger", "strum", "testcontainers", "tfhe", @@ -1791,6 +1789,8 @@ dependencies = [ "tonic-health", "tonic-types", "tonic-web", + "tracing", + "tracing-subscriber", ] [[package]] @@ -2161,16 +2161,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" -[[package]] -name = "erased-serde" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24e2389d65ab4fab27dc2a5de7b191e1f6617d1f1c8855c0dc569c94a4cbb18d" -dependencies = [ - "serde", - "typeid", -] - [[package]] name = "errno" version = "0.3.9" @@ -3151,10 +3141,6 @@ name = "log" version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" -dependencies = [ - "serde", - "value-bag", -] [[package]] name = "lru" @@ -3297,6 +3283,16 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num-bigint" version = "0.4.6" @@ -3537,6 +3533,12 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parity-scale-codec" version = "3.6.12" @@ -4542,15 +4544,6 @@ dependencies = [ "syn 2.0.75", ] -[[package]] -name = "serde_fmt" -version = "1.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1d4ddca14104cd60529e8c7f7ba71a2c8acd8f7f5cfcdc2faf97eeb7c3010a4" -dependencies = [ - "serde", -] - [[package]] name = "serde_json" version = "1.0.125" @@ -4658,6 +4651,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -4996,19 +4998,6 @@ dependencies = [ "syn 2.0.75", ] -[[package]] -name = "structured-logger" -version = "1.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16524b1ef57fd2e253216ab20ec44f0dc32b29155a4b3e6bef0a857d8c9f5f08" -dependencies = [ - "log", - "parking_lot", - "serde", - "serde_json", - "tokio", -] - [[package]] name = "strum" version = "0.26.3" @@ -5037,84 +5026,6 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" -[[package]] -name = "sval" -version = "2.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eaf38d1fa2ce984086ea42fb856a9f374d94680a4f796831a7fc868d7f2af1b9" - -[[package]] -name = "sval_buffer" -version = "2.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81682ff859964ca1d7cf3d3d0f9ec7204ea04c2c32acb8cc2cf68ecbd3127354" -dependencies = [ - "sval", - "sval_ref", -] - -[[package]] -name = "sval_dynamic" -version = "2.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a213b93bb4c6f4c9f9b17f2e740e077fd18746bbf7c80c72bbadcac68fa7ee4" -dependencies = [ - "sval", -] - -[[package]] -name = "sval_fmt" -version = "2.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6902c6d3fb52c89206fe0dc93546c0123f7d48b5997fd14e61c9e64ff0b63275" -dependencies = [ - "itoa", - "ryu", - "sval", -] - -[[package]] -name = "sval_json" -version = "2.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11a28041ea78cdc394b930ae6b897d36246dc240a29a6edf82d76562487fb0b4" -dependencies = [ - "itoa", - "ryu", - "sval", -] - -[[package]] -name = "sval_nested" -version = "2.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "850346e4b0742a7f2fd2697d703ff80084d0b658f0f2e336d71b8a06abf9b68e" -dependencies = [ - "sval", - "sval_buffer", - "sval_ref", -] - -[[package]] -name = "sval_ref" -version = "2.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "824afd97a8919f28a35b0fdea979845cc2ae461a8a3aaa129455cb89c88bb77a" -dependencies = [ - "sval", -] - -[[package]] -name = "sval_serde" -version = "2.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ada7520dd719ed672c786c7db7de4f5230f4d504b0821bd8305cd30ca442315" -dependencies = [ - "serde", - "sval", - "sval_nested", -] - [[package]] name = "syn" version = "1.0.109" @@ -5364,6 +5275,16 @@ dependencies = [ "syn 2.0.75", ] +[[package]] +name = "thread_local" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" +dependencies = [ + "cfg-if", + "once_cell", +] + [[package]] name = "threadpool" version = "1.8.1" @@ -5715,19 +5636,52 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", + "valuable", ] [[package]] -name = "try-lock" -version = "0.2.5" +name = "tracing-log" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] [[package]] -name = "typeid" -version = "1.0.2" +name = "tracing-serde" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1" +dependencies = [ + "serde", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +dependencies = [ + "nu-ansi-term", + "serde", + "serde_json", + "sharded-slab", + "smallvec", + "thread_local", + "tracing-core", + "tracing-log", + "tracing-serde", +] + +[[package]] +name = "try-lock" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e13db2e0ccd5e14a544e8a246ba2312cd25223f616442d7f2cb0e3db614236e" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "typenum" @@ -5855,42 +5809,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" -[[package]] -name = "value-bag" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a84c137d37ab0142f0f2ddfe332651fdbf252e7b7dbb4e67b6c1f1b2e925101" -dependencies = [ - "value-bag-serde1", - "value-bag-sval2", -] - -[[package]] -name = "value-bag-serde1" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccacf50c5cb077a9abb723c5bcb5e0754c1a433f1e1de89edc328e2760b6328b" -dependencies = [ - "erased-serde", - "serde", - "serde_fmt", -] - -[[package]] -name = "value-bag-sval2" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1785bae486022dfb9703915d42287dcb284c1ee37bd1080eeba78cc04721285b" -dependencies = [ - "sval", - "sval_buffer", - "sval_dynamic", - "sval_fmt", - "sval_json", - "sval_ref", - "sval_serde", -] - [[package]] name = "vcpkg" version = "0.2.15" diff --git a/fhevm-engine/Cargo.toml b/fhevm-engine/Cargo.toml index b1d40a85..e474d5da 100644 --- a/fhevm-engine/Cargo.toml +++ b/fhevm-engine/Cargo.toml @@ -4,7 +4,7 @@ members = ["coprocessor", "executor", "fhevm-engine-common"] [workspace.dependencies] clap = { version = "4.5", features = ["derive"] } -tokio = { version = "1.0", features = ["rt-multi-thread", "macros"] } +tokio = { version = "1.38.0", features = ["full"] } prost = "0.13" tonic = { version = "0.12", features = ["server"] } bincode = "1.3.3" @@ -13,7 +13,8 @@ anyhow = "1.0.86" daggy = "0.8.0" serde = "1.0.210" prometheus = "0.13.4" -log = { version = "0.4.22", features = ["kv"] } +tracing = "0.1.40" +tracing-subscriber = { version = "0.3.18", features = ["fmt", "json"] } [profile.dev.package.tfhe] overflow-checks = false diff --git a/fhevm-engine/coprocessor/Cargo.toml b/fhevm-engine/coprocessor/Cargo.toml index 8fb9d1ab..1cf62c9d 100644 --- a/fhevm-engine/coprocessor/Cargo.toml +++ b/fhevm-engine/coprocessor/Cargo.toml @@ -32,12 +32,12 @@ strum = { version = "0.26", features = ["derive"] } bincode.workspace = true sha3.workspace = true prometheus.workspace = true -log.workspace = true -structured-logger = "1.0.3" +tracing.workspace = true +tracing-subscriber.workspace = true actix-web = "4.9.0" -opentelemetry = "0.25" -opentelemetry-otlp = "0.25" -opentelemetry_sdk = { version = "0.25", features = ["rt-tokio"] } +opentelemetry = "0.25.0" +opentelemetry-otlp = "0.25.0" +opentelemetry_sdk = { version = "0.25.0", features = ["rt-tokio"] } [dev-dependencies] testcontainers = "0.21" diff --git a/fhevm-engine/coprocessor/src/main.rs b/fhevm-engine/coprocessor/src/main.rs index 1c2c501d..bb0f4732 100644 --- a/fhevm-engine/coprocessor/src/main.rs +++ b/fhevm-engine/coprocessor/src/main.rs @@ -1,16 +1,19 @@ +use std::sync::Once; + +use ::tracing::{error, info}; use fhevm_engine_common::keys::{FhevmKeys, SerializedFhevmKeys}; use tokio::task::JoinSet; mod cli; mod db_queries; +mod metrics; mod server; #[cfg(test)] mod tests; mod tfhe_worker; +mod tracing; mod types; mod utils; -mod metrics; -mod tracing; fn main() { let args = crate::cli::parse_args(); @@ -43,27 +46,30 @@ pub fn start_runtime( tokio::select! { main = async_main(args) => { if let Err(e) = main { - log::error!(target: "main_wchannel", error = e.to_string(); "Runtime error"); + error!(target: "main_wchannel", { error = e }, "Runtime error"); } } _ = close_recv.changed() => { - log::info!(target: "main_wchannel", "Service stopped voluntarily"); + info!(target: "main_wchannel", "Service stopped voluntarily"); } } } else { if let Err(e) = async_main(args).await { - log::error!(target: "main", error = e.to_string(); "Runtime error"); + error!(target: "main", { error = e }, "Runtime error"); } } }) } +// Used for testing as we would call `async_main()` multiple times. +static TRACING_INIT: Once = Once::new(); + async fn async_main( args: crate::cli::Args, ) -> Result<(), Box> { - structured_logger::Builder::new() - .with_default_writer(structured_logger::async_json::new_writer(tokio::io::stdout())) - .init(); + TRACING_INIT.call_once(|| { + tracing_subscriber::fmt().json().with_level(true).init(); + }); if let Err(err) = tracing::setup_tracing() { panic!("Error while initializing tracing: {:?}", err); @@ -71,17 +77,17 @@ async fn async_main( let mut set = JoinSet::new(); if args.run_server { - log::info!(target: "async_main", "Initializing api server"); + info!(target: "async_main", "Initializing api server"); set.spawn(crate::server::run_server(args.clone())); } if args.run_bg_worker { - log::info!(target: "async_main", "Initializing background worker"); + info!(target: "async_main", "Initializing background worker"); set.spawn(crate::tfhe_worker::run_tfhe_worker(args.clone())); } if !args.metrics_addr.is_empty() { - log::info!(target: "async_main", "Initializing metrics server"); + info!(target: "async_main", "Initializing metrics server"); set.spawn(crate::metrics::run_metrics_server(args.clone())); } diff --git a/fhevm-engine/coprocessor/src/metrics.rs b/fhevm-engine/coprocessor/src/metrics.rs index 821d234a..a9422696 100644 --- a/fhevm-engine/coprocessor/src/metrics.rs +++ b/fhevm-engine/coprocessor/src/metrics.rs @@ -1,7 +1,11 @@ +use tracing::info; + async fn metrics() -> impl actix_web::Responder { let encoder = prometheus::TextEncoder::new(); let metric_families = prometheus::gather(); - encoder.encode_to_string(&metric_families).expect("can't encode metrics") + encoder + .encode_to_string(&metric_families) + .expect("can't encode metrics") } async fn healthcheck() -> impl actix_web::Responder { @@ -11,7 +15,7 @@ async fn healthcheck() -> impl actix_web::Responder { pub async fn run_metrics_server( args: crate::cli::Args, ) -> Result<(), Box> { - log::info!("metrics server listening at {}", args.metrics_addr); + info!("metrics server listening at {}", args.metrics_addr); let _ = actix_web::HttpServer::new(|| { actix_web::App::new() .route("/metrics", actix_web::web::to(metrics)) @@ -24,4 +28,4 @@ pub async fn run_metrics_server( .await?; Ok(()) -} \ No newline at end of file +} diff --git a/fhevm-engine/coprocessor/src/server.rs b/fhevm-engine/coprocessor/src/server.rs index bff92d03..845f6725 100644 --- a/fhevm-engine/coprocessor/src/server.rs +++ b/fhevm-engine/coprocessor/src/server.rs @@ -32,6 +32,7 @@ use sha3::{Digest, Keccak256}; use sqlx::{query, Acquire}; use tokio::task::spawn_blocking; use tonic::transport::Server; +use tracing::{error, info}; pub mod common { tonic::include_proto!("fhevm.common"); @@ -96,7 +97,7 @@ pub async fn run_server( ) -> Result<(), Box> { loop { if let Err(e) = run_server_iteration(args.clone()).await { - log::error!(target: "grpc_server", error = e.to_string(); "Error running server, retrying shortly"); + error!(target: "grpc_server", { error = e }, "Error running server, retrying shortly"); } tokio::time::sleep(tokio::time::Duration::from_millis(5000)).await; @@ -115,9 +116,9 @@ pub async fn run_server_iteration( let coprocessor_key_file = tokio::fs::read_to_string(&args.coprocessor_private_key).await?; let signer = PrivateKeySigner::from_str(coprocessor_key_file.trim())?; - log::info!(target: "grpc_server", address = signer.address().to_string(); "Coprocessor signer initiated"); + info!(target: "grpc_server", { address = signer.address().to_string() }, "Coprocessor signer initiated"); - log::info!("Coprocessor listening on {}", addr); + info!("Coprocessor listening on {}", addr); let pool = sqlx::postgres::PgPoolOptions::new() .max_connections(args.pg_pool_max_connections) .connect(&db_url) @@ -196,13 +197,21 @@ impl GrpcTracer { } pub fn set_error(&mut self, e: impl Error) { - self.ctx.span().set_status(opentelemetry::trace::Status::Error { description: e.to_string().into() }); + self.ctx + .span() + .set_status(opentelemetry::trace::Status::Error { + description: e.to_string().into(), + }); } } impl Clone for GrpcTracer { fn clone(&self) -> Self { - GrpcTracer { ctx: self.ctx.clone(), name: self.name, tracer: opentelemetry::global::tracer(self.name) } + GrpcTracer { + ctx: self.ctx.clone(), + name: self.name, + tracer: opentelemetry::global::tracer(self.name), + } } } @@ -222,10 +231,12 @@ impl coprocessor::fhevm_coprocessor_server::FhevmCoprocessor for CoprocessorServ ) -> std::result::Result, tonic::Status> { UPLOAD_INPUTS_COUNTER.inc(); let mut tracer = grpc_tracer("upload_inputs"); - self.upload_inputs_impl(request, &tracer).await.inspect_err(|e| { - tracer.set_error(e); - UPLOAD_INPUTS_ERRORS.inc(); - }) + self.upload_inputs_impl(request, &tracer) + .await + .inspect_err(|e| { + tracer.set_error(e); + UPLOAD_INPUTS_ERRORS.inc(); + }) } async fn async_compute( @@ -234,10 +245,12 @@ impl coprocessor::fhevm_coprocessor_server::FhevmCoprocessor for CoprocessorServ ) -> std::result::Result, tonic::Status> { ASYNC_COMPUTE_COUNTER.inc(); let mut tracer = grpc_tracer("async_compute"); - self.async_compute_impl(request, &tracer).await.inspect_err(|e| { - tracer.set_error(e); - ASYNC_COMPUTE_ERRORS.inc(); - }) + self.async_compute_impl(request, &tracer) + .await + .inspect_err(|e| { + tracer.set_error(e); + ASYNC_COMPUTE_ERRORS.inc(); + }) } async fn trivial_encrypt_ciphertexts( @@ -261,10 +274,12 @@ impl coprocessor::fhevm_coprocessor_server::FhevmCoprocessor for CoprocessorServ { GET_CIPHERTEXTS_COUNTER.inc(); let mut tracer = grpc_tracer("get_ciphertexts"); - self.get_ciphertexts_impl(request, &tracer).await.inspect_err(|e| { - tracer.set_error(e); - GET_CIPHERTEXTS_ERRORS.inc(); - }) + self.get_ciphertexts_impl(request, &tracer) + .await + .inspect_err(|e| { + tracer.set_error(e); + GET_CIPHERTEXTS_ERRORS.inc(); + }) } } @@ -361,12 +376,9 @@ impl CoprocessorService { let tracer = tracer.clone(); let mut blocking_span = tracer.child_span("blocking_ciphertext_list_expand"); - blocking_span.set_attributes(vec![ - KeyValue::new("idx", idx as i64), - ]); + blocking_span.set_attributes(vec![KeyValue::new("idx", idx as i64)]); tfhe_work_set.spawn_blocking( move || -> Result<_, (Box<(dyn std::error::Error + Send + Sync)>, usize)> { - let mut span = tracer.child_span("set_server_key"); tfhe::set_server_key(server_key.clone()); span.end(); @@ -445,9 +457,7 @@ impl CoprocessorService { .expect("we should have all results computed now"); let mut span = tracer.child_span("db_insert_input_blob"); - span.set_attributes(vec![ - KeyValue::new("idx", idx as i64), - ]); + span.set_attributes(vec![KeyValue::new("idx", idx as i64)]); // save blob for audits and historical reference let _ = sqlx::query!( " @@ -554,9 +564,7 @@ impl CoprocessorService { } let mut span = tracer.child_span("eip_712_signature"); - span.set_attributes(vec![ - KeyValue::new("blob_idx", idx as i64), - ]); + span.set_attributes(vec![KeyValue::new("blob_idx", idx as i64)]); let signing_hash = ct_verification.eip712_signing_hash(&eip_712_domain); let eip_712_signature = self.signer.sign_hash_sync(&signing_hash).map_err(|e| { CoprocessorError::Eip712SigningFailure { @@ -597,7 +605,6 @@ impl CoprocessorService { return Ok(tonic::Response::new(GenericResponse { response_code: 0 })); } - let mut span = tracer.child_span("sort_computations_by_dependencies"); // computations are now sorted based on dependencies or error should have // been returned if there's circular dependency @@ -680,9 +687,10 @@ impl CoprocessorService { CoprocessorError::FhevmError(FhevmError::UnknownFheOperation(comp.operation)) })?; let mut span = tracer.child_span("insert_computation"); - span.set_attributes(vec![ - KeyValue::new("handle", format!("0x{}", hex::encode(&comp.output_handle))) - ]); + span.set_attributes(vec![KeyValue::new( + "handle", + format!("0x{}", hex::encode(&comp.output_handle)), + )]); let res = query!( " INSERT INTO computations( diff --git a/fhevm-engine/coprocessor/src/tfhe_worker.rs b/fhevm-engine/coprocessor/src/tfhe_worker.rs index 4055d470..26f0a066 100644 --- a/fhevm-engine/coprocessor/src/tfhe_worker.rs +++ b/fhevm-engine/coprocessor/src/tfhe_worker.rs @@ -5,14 +5,15 @@ use fhevm_engine_common::{ types::SupportedFheOperations, }; use lazy_static::lazy_static; -use opentelemetry::KeyValue; use opentelemetry::trace::{Span, TraceContextExt, Tracer}; +use opentelemetry::KeyValue; use prometheus::{register_int_counter, IntCounter}; use sqlx::{postgres::PgListener, query, Acquire}; use std::{ collections::{BTreeSet, HashMap}, num::NonZeroUsize, }; +use tracing::{error, info}; lazy_static! { static ref WORKER_ERRORS_COUNTER: IntCounter = @@ -51,7 +52,7 @@ pub async fn run_tfhe_worker( // here we log the errors and make sure we retry if let Err(cycle_error) = tfhe_worker_cycle(&args).await { WORKER_ERRORS_COUNTER.inc(); - log::error!(target: "tfhe_worker", error = cycle_error.to_string(); "Error in background worker, retrying shortly"); + error!(target: "tfhe_worker", { error = cycle_error }, "Error in background worker, retrying shortly"); } tokio::time::sleep(tokio::time::Duration::from_millis(5000)).await; } @@ -83,11 +84,11 @@ async fn tfhe_worker_cycle( tokio::select! { _ = listener.try_recv() => { WORK_ITEMS_NOTIFICATIONS_COUNTER.inc(); - log::info!(target: "tfhe_worker", "Received work_available notification from postgres"); + info!(target: "tfhe_worker", "Received work_available notification from postgres"); }, _ = tokio::time::sleep(tokio::time::Duration::from_millis(5000)) => { WORK_ITEMS_POLL_COUNTER.inc(); - log::info!(target: "tfhe_worker", "Polling the database for more work on timer"); + info!(target: "tfhe_worker", "Polling the database for more work on timer"); }, }; } @@ -138,8 +139,7 @@ async fn tfhe_worker_cycle( } WORK_ITEMS_FOUND_COUNTER.inc_by(the_work.len() as u64); - log::info!(target: "tfhe_worker", count = the_work.len(); "Processing work items"); - + info!(target: "tfhe_worker", { count = the_work.len() }, "Processing work items"); // make sure we process each tenant sequentially not to // load different keys in cache by different tenants @@ -170,7 +170,10 @@ async fn tfhe_worker_cycle( let keys_to_query = keys_to_query.into_iter().collect::>(); s.set_attribute(KeyValue::new("keys_to_query", keys_to_query.len() as i64)); - s.set_attribute(KeyValue::new("tenants_to_query", tenants_to_query.len() as i64)); + s.set_attribute(KeyValue::new( + "tenants_to_query", + tenants_to_query.len() as i64, + )); populate_cache_with_tenant_keys(keys_to_query, trx.as_mut(), &tenant_key_cache).await?; s.end(); @@ -206,7 +209,6 @@ async fn tfhe_worker_cycle( let mut tfhe_work_set = tokio::task::JoinSet::new(); // process every tenant by tenant id because we must switch keys for each tenant for w in the_work { - let tenant_key_cache = tenant_key_cache.clone(); let fhe_op: SupportedFheOperations = w .fhe_operation @@ -282,9 +284,16 @@ async fn tfhe_worker_cycle( let (db_type, db_bytes) = res.compress(); s.set_attribute(KeyValue::new("fhe_operation", w.fhe_operation as i64)); - s.set_attribute(KeyValue::new("handle", format!("0x{}", hex::encode(&w.output_handle)))); + s.set_attribute(KeyValue::new( + "handle", + format!("0x{}", hex::encode(&w.output_handle)), + )); s.set_attribute(KeyValue::new("output_type", db_type as i64)); - let input_types = deserialized_cts.iter().map(|i| i.type_num().to_string()).collect::>().join(","); + let input_types = deserialized_cts + .iter() + .map(|i| i.type_num().to_string()) + .collect::>() + .join(","); s.set_attribute(KeyValue::new("input_types", input_types)); s.end(); Ok((w, db_type, db_bytes)) @@ -300,7 +309,10 @@ async fn tfhe_worker_cycle( Ok((w, db_type, db_bytes)) => { let mut s = tracer.start_with_context("insert_ct_into_db", &loop_ctx); s.set_attribute(KeyValue::new("tenant_id", w.tenant_id as i64)); - s.set_attribute(KeyValue::new("handle", format!("0x{}", hex::encode(&w.output_handle)))); + s.set_attribute(KeyValue::new( + "handle", + format!("0x{}", hex::encode(&w.output_handle)), + )); s.set_attribute(KeyValue::new("ciphertext_type", db_type as i64)); let _ = query!(" INSERT INTO ciphertexts(tenant_id, handle, ciphertext, ciphertext_version, ciphertext_type) @@ -312,7 +324,10 @@ async fn tfhe_worker_cycle( s.end(); let mut s = tracer.start_with_context("update_computation", &loop_ctx); s.set_attribute(KeyValue::new("tenant_id", w.tenant_id as i64)); - s.set_attribute(KeyValue::new("handle", format!("0x{}", hex::encode(&w.output_handle)))); + s.set_attribute(KeyValue::new( + "handle", + format!("0x{}", hex::encode(&w.output_handle)), + )); s.set_attribute(KeyValue::new("ciphertext_type", db_type as i64)); let _ = query!( " @@ -331,17 +346,20 @@ async fn tfhe_worker_cycle( } Err((err, tenant_id, output_handle)) => { WORKER_ERRORS_COUNTER.inc(); - log::error!(target: "tfhe_worker", - tenant_id, - output_handle = format!("0x{}", hex::encode(&output_handle)), - error = err.to_string(); + error!(target: "tfhe_worker", + { tenant_id = tenant_id, error = err, output_handle = format!("0x{}", hex::encode(&output_handle)) }, "error while processing work item" ); let mut s = tracer.start_with_context("set_computation_error_in_db", &loop_ctx); s.set_attribute(KeyValue::new("tenant_id", tenant_id as i64)); - s.set_attribute(KeyValue::new("handle", format!("0x{}", hex::encode(&output_handle)))); + s.set_attribute(KeyValue::new( + "handle", + format!("0x{}", hex::encode(&output_handle)), + )); let err_string = err.to_string(); - s.set_status(opentelemetry::trace::Status::Error { description: err_string.clone().into() }); + s.set_status(opentelemetry::trace::Status::Error { + description: err_string.clone().into(), + }); let _ = query!( " UPDATE computations diff --git a/fhevm-engine/coprocessor/src/tracing.rs b/fhevm-engine/coprocessor/src/tracing.rs index 92f83023..267f6e93 100644 --- a/fhevm-engine/coprocessor/src/tracing.rs +++ b/fhevm-engine/coprocessor/src/tracing.rs @@ -1,6 +1,6 @@ pub fn setup_tracing() -> Result<(), Box> { - let otlp_exporter = opentelemetry_otlp::new_exporter() - .tonic(); + let otlp_exporter = opentelemetry_otlp::new_exporter().tonic(); + let trace_provider = opentelemetry_otlp::new_pipeline() .tracing() .with_exporter(otlp_exporter) @@ -9,4 +9,4 @@ pub fn setup_tracing() -> Result<(), Box