
Commit 8ccb580

feat: telemetry report for parquet metrics (#25425)
- Added a mechanism within PersistedFiles to expose parquet-file-related metrics. The details are updated when a new snapshot is generated, and also when all snapshots are loaded at process startup.
- When the telemetry payload is created, these parquet metrics are looked up before the payload is sent to the server.

Closes: #25418
1 parent 7d37bbb commit 8ccb580
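For context on the bookkeeping this commit relies on: PersistedFiles keeps running totals for the persisted parquet files and exposes them through get_metrics(), which the telemetry store reads when it builds a payload. The snippet below is a minimal, hedged sketch of that idea only; the struct name ParquetTotals, the record_file helper, and everything except the (file count, size in MB, row count) tuple shape of get_metrics() are illustrative and not the actual influxdb3_write implementation.

```rust
// Illustrative sketch only -- not the actual PersistedFiles code in influxdb3_write.
// It shows the kind of running totals the commit relies on: counters that grow when a
// snapshot is persisted (or when existing snapshots are loaded at startup), plus a
// get_metrics() accessor with the (file count, size in MB, row count) shape used below.
#[derive(Debug, Default)]
struct ParquetTotals {
    file_count: u64,
    size_bytes: u64,
    row_count: u64,
}

impl ParquetTotals {
    /// Fold one persisted parquet file into the running totals.
    fn record_file(&mut self, size_bytes: u64, rows: u64) {
        self.file_count += 1;
        self.size_bytes += size_bytes;
        self.row_count += rows;
    }

    /// Same tuple shape as PersistedFiles::get_metrics() in the diff below:
    /// (file count, total size in MB, row count).
    fn get_metrics(&self) -> (u64, f64, u64) {
        let size_mb = self.size_bytes as f64 / (1024.0 * 1024.0);
        (self.file_count, size_mb, self.row_count)
    }
}

fn main() {
    let mut totals = ParquetTotals::default();
    totals.record_file(2 * 1024 * 1024, 10_000);
    assert_eq!(totals.get_metrics(), (1, 2.0, 10_000));
}
```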

8 files changed (+324, -86 lines)

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.

influxdb3/src/commands/serve.rs

Lines changed: 33 additions & 18 deletions
@@ -18,8 +18,11 @@ use influxdb3_server::{
 use influxdb3_telemetry::store::TelemetryStore;
 use influxdb3_wal::{Gen1Duration, WalConfig};
 use influxdb3_write::{
-    last_cache::LastCacheProvider, parquet_cache::create_cached_obj_store_and_oracle,
-    persister::Persister, write_buffer::WriteBufferImpl, WriteBuffer,
+    last_cache::LastCacheProvider,
+    parquet_cache::create_cached_obj_store_and_oracle,
+    persister::Persister,
+    write_buffer::{persisted_files::PersistedFiles, WriteBufferImpl},
+    WriteBuffer,
 };
 use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
 use iox_time::SystemProvider;
@@ -415,29 +418,21 @@ pub async fn command(config: Config) -> Result<()> {
         snapshot_size: config.wal_snapshot_size,
     };

-    let catalog = persister
-        .load_or_create_catalog()
-        .await
-        .map_err(Error::InitializePersistedCatalog)?;
+    let catalog = Arc::new(
+        persister
+            .load_or_create_catalog()
+            .await
+            .map_err(Error::InitializePersistedCatalog)?,
+    );

     let last_cache = LastCacheProvider::new_from_catalog(&catalog.clone_inner())
         .map_err(Error::InitializeLastCache)?;
     info!(instance_id = ?catalog.instance_id(), "Catalog initialized with");

-    let telemetry_store =
-        setup_telemetry_store(&config.object_store_config, catalog.instance_id(), num_cpus).await;
-
-    let common_state = CommonServerState::new(
-        Arc::clone(&metrics),
-        trace_exporter,
-        trace_header_parser,
-        Arc::clone(&telemetry_store),
-    )?;
-
-    let write_buffer: Arc<dyn WriteBuffer> = Arc::new(
+    let write_buffer_impl = Arc::new(
         WriteBufferImpl::new(
             Arc::clone(&persister),
-            Arc::new(catalog),
+            Arc::clone(&catalog),
             Arc::new(last_cache),
             Arc::<SystemProvider>::clone(&time_provider),
             Arc::clone(&exec),
@@ -447,6 +442,24 @@
         .await
         .map_err(|e| Error::WriteBufferInit(e.into()))?,
     );
+
+    let telemetry_store = setup_telemetry_store(
+        &config.object_store_config,
+        catalog.instance_id(),
+        num_cpus,
+        Arc::clone(&write_buffer_impl.persisted_files()),
+    )
+    .await;
+
+    let write_buffer: Arc<dyn WriteBuffer> = write_buffer_impl;
+
+    let common_state = CommonServerState::new(
+        Arc::clone(&metrics),
+        trace_exporter,
+        trace_header_parser,
+        Arc::clone(&telemetry_store),
+    )?;
+
     let query_executor = Arc::new(QueryExecutorImpl::new(
         write_buffer.catalog(),
         Arc::clone(&write_buffer),
@@ -486,6 +499,7 @@ async fn setup_telemetry_store(
     object_store_config: &ObjectStoreConfig,
     instance_id: Arc<str>,
     num_cpus: usize,
+    persisted_files: Arc<PersistedFiles>,
 ) -> Arc<TelemetryStore> {
     let os = std::env::consts::OS;
     let influxdb_pkg_version = env!("CARGO_PKG_VERSION");
@@ -503,6 +517,7 @@
         Arc::from(influx_version),
         Arc::from(storage_type),
         num_cpus,
+        persisted_files,
     )
     .await
 }
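The key change above is ordering: WriteBufferImpl is now constructed before the telemetry store so that its PersistedFiles handle can be passed into setup_telemetry_store, and CommonServerState (which needs the telemetry store) is created after both. The catalog is also wrapped in an Arc up front so the same handle can be cloned into the write buffer while still being available for catalog.instance_id() afterwards.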

influxdb3_server/src/lib.rs

Lines changed: 14 additions & 12 deletions
@@ -748,14 +748,6 @@ mod tests {
     async fn setup_server(start_time: i64) -> (String, CancellationToken, Arc<dyn WriteBuffer>) {
         let trace_header_parser = trace_http::ctx::TraceHeaderParser::new();
         let metrics = Arc::new(metric::Registry::new());
-        let dummy_telem_store = TelemetryStore::new_without_background_runners();
-        let common_state = crate::CommonServerState::new(
-            Arc::clone(&metrics),
-            None,
-            trace_header_parser,
-            dummy_telem_store,
-        )
-        .unwrap();
         let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::new());
         let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(start_time)));
         let (object_store, parquet_cache) =
@@ -777,9 +769,7 @@
         let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host"));
         let dummy_host_id = Arc::from("dummy-host-id");
         let instance_id = Arc::from("dummy-instance-id");
-        let telemetry_store = TelemetryStore::new_without_background_runners();
-
-        let write_buffer: Arc<dyn WriteBuffer> = Arc::new(
+        let write_buffer_impl = Arc::new(
             influxdb3_write::write_buffer::WriteBufferImpl::new(
                 Arc::clone(&persister),
                 Arc::new(Catalog::new(dummy_host_id, instance_id)),
@@ -792,6 +782,18 @@
             .await
             .unwrap(),
         );
+
+        let dummy_telem_store = TelemetryStore::new_without_background_runners(Arc::clone(
+            &write_buffer_impl.persisted_files(),
+        ));
+        let write_buffer: Arc<dyn WriteBuffer> = write_buffer_impl;
+        let common_state = crate::CommonServerState::new(
+            Arc::clone(&metrics),
+            None,
+            trace_header_parser,
+            Arc::clone(&dummy_telem_store),
+        )
+        .unwrap();
         let query_executor = crate::query_executor::QueryExecutorImpl::new(
             write_buffer.catalog(),
             Arc::clone(&write_buffer),
@@ -800,7 +802,7 @@
             Arc::new(HashMap::new()),
             10,
             10,
-            telemetry_store,
+            Arc::clone(&dummy_telem_store),
         );

         // bind to port 0 will assign a random available port:

influxdb3_server/src/query_executor.rs

Lines changed: 6 additions & 2 deletions
@@ -645,7 +645,7 @@ mod tests {
         let executor = make_exec(Arc::clone(&object_store));
         let host_id = Arc::from("dummy-host-id");
         let instance_id = Arc::from("instance-id");
-        let write_buffer: Arc<dyn WriteBuffer> = Arc::new(
+        let write_buffer_impl = Arc::new(
             WriteBufferImpl::new(
                 Arc::clone(&persister),
                 Arc::new(Catalog::new(host_id, instance_id)),
@@ -663,9 +663,13 @@
             .await
             .unwrap(),
         );
+
+        let dummy_telem_store = TelemetryStore::new_without_background_runners(Arc::clone(
+            &write_buffer_impl.persisted_files(),
+        ));
+        let write_buffer: Arc<dyn WriteBuffer> = write_buffer_impl;
         let metrics = Arc::new(Registry::new());
         let df_config = Arc::new(Default::default());
-        let dummy_telem_store = TelemetryStore::new_without_background_runners();
         let query_executor = QueryExecutorImpl::new(
             write_buffer.catalog(),
             Arc::clone(&write_buffer),

influxdb3_telemetry/Cargo.toml

Lines changed: 3 additions & 0 deletions
@@ -18,6 +18,9 @@ sysinfo.workspace = true
 num.workspace = true
 thiserror.workspace = true

+# Local Deps
+influxdb3_write = { path = "../influxdb3_write" }
+
 [dev-dependencies]
 test-log.workspace = true
 proptest.workspace = true

influxdb3_telemetry/src/sender.rs

Lines changed: 4 additions & 0 deletions
@@ -62,6 +62,10 @@ pub(crate) struct TelemetryPayload {
     pub query_requests_min: u64,
     pub query_requests_max: u64,
     pub query_requests_avg: u64,
+    // parquet files
+    pub parquet_file_count: u64,
+    pub parquet_file_size_mb: f64,
+    pub parquet_row_count: u64,
 }

 pub(crate) async fn send_telemetry_in_background(

influxdb3_telemetry/src/store.rs

Lines changed: 25 additions & 2 deletions
@@ -1,5 +1,6 @@
 use std::{sync::Arc, time::Duration};

+use influxdb3_write::write_buffer::persisted_files::PersistedFiles;
 use num::Float;
 use observability_deps::tracing::{debug, warn};

@@ -15,6 +16,7 @@ use crate::{
 #[derive(Debug)]
 pub struct TelemetryStore {
     inner: parking_lot::Mutex<TelemetryStoreInner>,
+    persisted_files: Arc<PersistedFiles>,
 }

 const SAMPLER_INTERVAL_SECS: u64 = 60;
@@ -27,6 +29,7 @@ impl TelemetryStore {
         influx_version: Arc<str>,
         storage_type: Arc<str>,
         cores: usize,
+        persisted_files: Arc<PersistedFiles>,
     ) -> Arc<Self> {
         debug!(
             instance_id = ?instance_id,
@@ -39,6 +42,7 @@
         let inner = TelemetryStoreInner::new(instance_id, os, influx_version, storage_type, cores);
         let store = Arc::new(TelemetryStore {
             inner: parking_lot::Mutex::new(inner),
+            persisted_files,
         });

         if !cfg!(test) {
@@ -52,7 +56,7 @@
         store
     }

-    pub fn new_without_background_runners() -> Arc<Self> {
+    pub fn new_without_background_runners(persisted_files: Arc<PersistedFiles>) -> Arc<Self> {
         let instance_id = Arc::from("dummy-instance-id");
         let os = Arc::from("Linux");
         let influx_version = Arc::from("influxdb3-0.1.0");
@@ -61,6 +65,7 @@
         let inner = TelemetryStoreInner::new(instance_id, os, influx_version, storage_type, cores);
         Arc::new(TelemetryStore {
             inner: parking_lot::Mutex::new(inner),
+            persisted_files,
         })
     }

@@ -100,7 +105,12 @@

     pub(crate) fn snapshot(&self) -> TelemetryPayload {
         let inner_store = self.inner.lock();
-        inner_store.snapshot()
+        let (file_count, size_mb, row_count) = self.persisted_files.get_metrics();
+        let mut payload = inner_store.snapshot();
+        payload.parquet_file_count = file_count;
+        payload.parquet_file_size_mb = size_mb;
+        payload.parquet_row_count = row_count;
+        payload
     }
 }

@@ -194,6 +204,11 @@
             query_requests_min: self.reads.num_queries.min,
             query_requests_max: self.reads.num_queries.max,
             query_requests_avg: self.reads.num_queries.avg,
+
+            // hmmm. will be nice to pass persisted file in?
+            parquet_file_count: 0,
+            parquet_file_size_mb: 0.0,
+            parquet_row_count: 0,
         }
     }

@@ -265,13 +280,18 @@ mod tests {

     #[test_log::test(tokio::test)]
     async fn test_telemetry_store_cpu_mem() {
+        let persisted_snapshots = Vec::new();
         // create store
+        let persisted_files = Arc::from(PersistedFiles::new_from_persisted_snapshots(
+            persisted_snapshots,
+        ));
         let store: Arc<TelemetryStore> = TelemetryStore::new(
             Arc::from("some-instance-id"),
             Arc::from("Linux"),
             Arc::from("OSS-v3.0"),
             Arc::from("Memory"),
             10,
+            persisted_files,
         )
         .await;
         // check snapshot
@@ -301,6 +321,9 @@
         assert_eq!(expected_mem_in_mb, snapshot.memory_used_mb_min);
         assert_eq!(128, snapshot.memory_used_mb_max);
         assert_eq!(122, snapshot.memory_used_mb_avg);
+        assert_eq!(0, snapshot.parquet_file_count);
+        assert_eq!(0.0, snapshot.parquet_file_size_mb);
+        assert_eq!(0, snapshot.parquet_row_count);

         // add some writes
         store.add_write_metrics(100, 100);
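Putting the pieces together, the new flow is essentially what the updated test above exercises: build a PersistedFiles handle, pass it to the telemetry store, and have snapshot() fold the parquet totals into the payload. The sketch below is a condensed, hedged restatement of that test (written as it would appear inside the influxdb3_telemetry crate, since snapshot() is pub(crate)); it adds nothing beyond what the diff already shows.

```rust
use std::sync::Arc;

use influxdb3_write::write_buffer::persisted_files::PersistedFiles;

use crate::store::TelemetryStore;

// Only compiles inside influxdb3_telemetry itself, because snapshot() is pub(crate).
fn parquet_metrics_flow() {
    // No snapshots persisted yet, so the parquet totals start at zero.
    let persisted_files = Arc::from(PersistedFiles::new_from_persisted_snapshots(Vec::new()));

    // The test-only constructor now also takes the PersistedFiles handle.
    let store = TelemetryStore::new_without_background_runners(persisted_files);

    // snapshot() reads get_metrics() from PersistedFiles and copies the values
    // into the payload just before it would be sent to the telemetry server.
    let payload = store.snapshot();
    assert_eq!(0, payload.parquet_file_count);
    assert_eq!(0.0, payload.parquet_file_size_mb);
    assert_eq!(0, payload.parquet_row_count);
}
```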
