diff --git a/src/common/base/src/runtime/executor_stats/mod.rs b/src/common/base/src/runtime/executor_stats/mod.rs new file mode 100644 index 0000000000000..70d13581d99cb --- /dev/null +++ b/src/common/base/src/runtime/executor_stats/mod.rs @@ -0,0 +1,19 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod stats; + +pub use stats::ExecutorStats; +pub use stats::ExecutorStatsSlot; +pub use stats::ExecutorStatsSnapshot; diff --git a/src/common/base/src/runtime/executor_stats/stats.rs b/src/common/base/src/runtime/executor_stats/stats.rs new file mode 100644 index 0000000000000..e4fc49278a331 --- /dev/null +++ b/src/common/base/src/runtime/executor_stats/stats.rs @@ -0,0 +1,199 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; +use std::time::SystemTime; + +use crate::runtime::ThreadTracker; + +const RING_BUFFER_SIZE: usize = 10; +const TS_SHIFT: u32 = 32; +const VAL_MASK: u64 = 0xFFFFFFFF; + +const MICROS_PER_SEC: u64 = 1_000_000; + +/// Snapshot of executor statistics containing timestamp-value pairs for process time and rows. +#[derive(Debug, Clone)] +pub struct ExecutorStatsSnapshot { + pub process_time: Vec<(u32, u32)>, + pub process_rows: Vec<(u32, u32)>, +} + +/// Packs a timestamp (u32) and a value (u32) into a u64. +#[inline] +fn pack(timestamp: u32, value: u32) -> u64 { + (timestamp as u64) << TS_SHIFT | (value as u64) +} + +/// Unpacks an u64 into a timestamp (u32) and a value (u32). +#[inline] +fn unpack(packed: u64) -> (u32, u32) { + ((packed >> TS_SHIFT) as u32, (packed & VAL_MASK) as u32) +} + +/// A slot for storing executor statistics for a specific time window (1 second). +/// +/// It uses a single AtomicU64 to store both a Unix timestamp and a value. +/// - The upper 32 bits store the timestamp (seconds since Unix epoch). +/// - The lower 32 bits store the accumulated value (e.g., rows, duration in micros). +#[derive(Default)] +pub struct ExecutorStatsSlot(AtomicU64); + +impl ExecutorStatsSlot { + /// Creates a new empty ExecutorStatsSlot. + pub fn new() -> Self { + Self::default() + } + + /// Records a metric value using the provided timestamp. 
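+ /// Values larger than u32::MAX are clamped to u32::MAX and the timestamp is truncated to 32 bits before being packed into the slot.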
+ pub fn record_metric(&self, timestamp: usize, value: usize) { + // Convert to u32, clamping if necessary + let timestamp_u32 = timestamp as u32; + let value_u32 = if value > u32::MAX as usize { + u32::MAX + } else { + value as u32 + }; + self.add(timestamp_u32, value_u32); + } + + /// Adds a value to the slot for the given timestamp. + /// + /// This operation is thread-safe and uses a lock-free CAS loop. + /// If the time window has expired, the value is reset before adding. + pub fn add(&self, timestamp: u32, value_to_add: u32) { + let mut current_packed = self.0.load(Ordering::SeqCst); + loop { + let (current_ts, current_val) = unpack(current_packed); + let new_packed = if current_ts == timestamp { + pack(current_ts, current_val.saturating_add(value_to_add)) + } else { + pack(timestamp, value_to_add) + }; + match self.0.compare_exchange_weak( + current_packed, + new_packed, + Ordering::SeqCst, + Ordering::SeqCst, + ) { + Ok(_) => return, + Err(expected) => { + current_packed = expected; + } + } + } + } + + /// Gets the timestamp and value + pub fn get(&self) -> (u32, u32) { + let packed = self.0.load(Ordering::Acquire); + unpack(packed) + } +} + +// A ring-buffer thread-free implementation for storing scheduling profile +pub struct ExecutorStats { + pub process_time: [ExecutorStatsSlot; RING_BUFFER_SIZE], + pub process_rows: [ExecutorStatsSlot; RING_BUFFER_SIZE], +} + +impl ExecutorStats { + pub fn new() -> Self { + let process_time = std::array::from_fn(|_| ExecutorStatsSlot::new()); + let process_rows = std::array::from_fn(|_| ExecutorStatsSlot::new()); + ExecutorStats { + process_time, + process_rows, + } + } + + pub fn record_process(&self, begin: SystemTime, elapsed_micros: usize, rows: usize) { + let begin_micros = begin + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_micros() as u64; + + let end_micros = begin_micros + elapsed_micros as u64; + + let begin_timestamp_secs = begin_micros / MICROS_PER_SEC; + let end_timestamp_secs = end_micros / MICROS_PER_SEC; + + if begin_timestamp_secs == end_timestamp_secs { + // Single second case - record all in one slot + let slot_idx = (begin_timestamp_secs % RING_BUFFER_SIZE as u64) as usize; + self.process_time[slot_idx] + .record_metric(begin_timestamp_secs as usize, elapsed_micros); + self.process_rows[slot_idx].record_metric(begin_timestamp_secs as usize, rows); + } else { + // Cross-second case - distribute proportionally + let total_duration_micros = elapsed_micros as u64; + + for current_sec in begin_timestamp_secs..=end_timestamp_secs { + let slot_idx = (current_sec % RING_BUFFER_SIZE as u64) as usize; + + let sec_start_micros = if current_sec == begin_timestamp_secs { + begin_micros % MICROS_PER_SEC + } else { + 0 + }; + + let sec_end_micros = if current_sec == end_timestamp_secs { + end_micros % MICROS_PER_SEC + } else { + MICROS_PER_SEC + }; + + let sec_duration_micros = sec_end_micros - sec_start_micros; + let proportion = sec_duration_micros as f64 / total_duration_micros as f64; + + let allocated_micros = (elapsed_micros as f64 * proportion) as usize; + let allocated_rows = (rows as f64 * proportion) as usize; + + if allocated_micros > 0 { + self.process_time[slot_idx] + .record_metric(current_sec as usize, allocated_micros); + } + if allocated_rows > 0 { + self.process_rows[slot_idx].record_metric(current_sec as usize, allocated_rows); + } + } + } + } + + pub fn record_thread_tracker(rows: usize) { + ThreadTracker::with(|x| { + x.borrow() + .payload + .process_rows + .store(rows, Ordering::SeqCst) + }); + } + + 
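/// Collects the current (timestamp, value) pair from every slot of both ring buffers; + /// callers should ignore entries whose timestamp falls outside their window of interest. +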
pub fn dump_snapshot(&self) -> ExecutorStatsSnapshot { + let process_time_snapshot = self.process_time.iter().map(|slot| slot.get()).collect(); + let process_rows_snapshot = self.process_rows.iter().map(|slot| slot.get()).collect(); + + ExecutorStatsSnapshot { + process_time: process_time_snapshot, + process_rows: process_rows_snapshot, + } + } +} + +impl Default for ExecutorStats { + fn default() -> Self { + Self::new() + } +} diff --git a/src/common/base/src/runtime/mod.rs b/src/common/base/src/runtime/mod.rs index f68edd7d0c53a..83780ce530ae6 100644 --- a/src/common/base/src/runtime/mod.rs +++ b/src/common/base/src/runtime/mod.rs @@ -16,6 +16,7 @@ mod backtrace; mod catch_unwind; mod defer; pub mod error_info; +mod executor_stats; mod global_runtime; mod memory; pub mod metrics; @@ -35,6 +36,9 @@ pub use catch_unwind::catch_unwind; pub use catch_unwind::drop_guard; pub use catch_unwind::CatchUnwindFuture; pub use defer::defer; +pub use executor_stats::ExecutorStats; +pub use executor_stats::ExecutorStatsSlot; +pub use executor_stats::ExecutorStatsSnapshot; pub use global_runtime::GlobalIORuntime; pub use global_runtime::GlobalQueryRuntime; pub use memory::set_alloc_error_hook; diff --git a/src/common/base/src/runtime/runtime_tracker.rs b/src/common/base/src/runtime/runtime_tracker.rs index d59060826e617..4a1f0d95f45a2 100644 --- a/src/common/base/src/runtime/runtime_tracker.rs +++ b/src/common/base/src/runtime/runtime_tracker.rs @@ -45,6 +45,7 @@ use std::cell::RefCell; use std::future::Future; use std::pin::Pin; +use std::sync::atomic::AtomicUsize; use std::sync::Arc; use std::task::Context; use std::task::Poll; @@ -132,7 +133,6 @@ impl CaptureLogSettings { } } -#[derive(Clone)] pub struct TrackingPayload { pub query_id: Option, pub profile: Option>, @@ -143,6 +143,26 @@ pub struct TrackingPayload { pub local_time_series_profile: Option>, pub workload_group_resource: Option>, pub perf_enabled: bool, + pub process_rows: AtomicUsize, +} + +impl Clone for TrackingPayload { + fn clone(&self) -> Self { + TrackingPayload { + query_id: self.query_id.clone(), + profile: self.profile.clone(), + mem_stat: self.mem_stat.clone(), + metrics: self.metrics.clone(), + capture_log_settings: self.capture_log_settings.clone(), + time_series_profile: self.time_series_profile.clone(), + local_time_series_profile: self.local_time_series_profile.clone(), + workload_group_resource: self.workload_group_resource.clone(), + perf_enabled: self.perf_enabled, + process_rows: AtomicUsize::new( + self.process_rows.load(std::sync::atomic::Ordering::SeqCst), + ), + } + } } pub struct TrackingGuard { @@ -222,6 +242,7 @@ impl ThreadTracker { local_time_series_profile: None, workload_group_resource: None, perf_enabled: false, + process_rows: AtomicUsize::new(0), }), } } @@ -336,6 +357,18 @@ impl ThreadTracker { .ok() .and_then(|x| x) } + + pub fn process_rows() -> usize { + TRACKER + .try_with(|tracker| { + tracker + .borrow() + .payload + .process_rows + .load(std::sync::atomic::Ordering::SeqCst) + }) + .unwrap_or(0) + } } pin_project! { diff --git a/src/common/base/tests/it/executor_stats.rs b/src/common/base/tests/it/executor_stats.rs new file mode 100644 index 0000000000000..9abc675cd4aa6 --- /dev/null +++ b/src/common/base/tests/it/executor_stats.rs @@ -0,0 +1,185 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; +use std::time::Duration; +use std::time::SystemTime; + +use databend_common_base::runtime::ExecutorStats; +use databend_common_base::runtime::ExecutorStatsSlot; +use databend_common_base::runtime::Thread; + +#[test] +fn test_executor_stats_slot_basic() { + let slot = ExecutorStatsSlot::new(); + let timestamp = 1000; + + // Test initial state + let (ts, val) = slot.get(); + assert_eq!(ts, 0); + assert_eq!(val, 0); + + // Test adding values + slot.add(timestamp, 100); + let (ts, val) = slot.get(); + assert_eq!(ts, timestamp); + assert_eq!(val, 100); + + // Test accumulation with same timestamp + slot.add(timestamp, 200); + let (ts, val) = slot.get(); + assert_eq!(ts, timestamp); + assert_eq!(val, 300); + + // Test different timestamp resets value + slot.add(timestamp + 1, 50); + let (ts, val) = slot.get(); + assert_eq!(ts, timestamp + 1); + assert_eq!(val, 50); +} + +#[test] +fn test_executor_stats_slot_overflow() { + let slot = ExecutorStatsSlot::new(); + let timestamp = 2000; + + // Test saturation at u32::MAX + slot.add(timestamp, u32::MAX - 10); + slot.add(timestamp, 20); // Should saturate at u32::MAX + let (ts, val) = slot.get(); + assert_eq!(ts, timestamp); + assert_eq!(val, u32::MAX); +} + +#[test] +fn test_executor_stats_slot_concurrent() { + let slot = Arc::new(ExecutorStatsSlot::new()); + let timestamp = 4000; + let num_threads = 10; + let adds_per_thread = 100; + + let handles: Vec<_> = (0..num_threads) + .map(|_| { + let slot = slot.clone(); + Thread::spawn(move || { + for _ in 0..adds_per_thread { + slot.add(timestamp, 1); + } + }) + }) + .collect(); + + for handle in handles { + handle.join().unwrap(); + } + + // All threads should have accumulated their values + let (ts, val) = slot.get(); + assert_eq!(ts, timestamp); + assert_eq!(val, num_threads * adds_per_thread); +} + +// --- record_process function tests --- + +#[test] +fn test_record_process_single_second() { + let stats = ExecutorStats::new(); + let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1000); + let elapsed_micros = 500_000; // 500ms = 500,000 microseconds + let rows = 1000; + + stats.record_process(base_time, elapsed_micros, rows); + + let snapshot = stats.dump_snapshot(); + + // Find the slot with data + let mut found_time = false; + let mut found_rows = false; + + for (ts, micros) in snapshot.process_time { + if ts == 1000 && micros == 500_000 { + found_time = true; + break; + } + } + + for (ts, row_count) in snapshot.process_rows { + if ts == 1000 && row_count == 1000 { + found_rows = true; + break; + } + } + + assert!( + found_time, + "Should find process time recorded in single second" + ); + assert!( + found_rows, + "Should find process rows recorded in single second" + ); +} + +#[test] +fn test_record_process_proportional_allocation() { + let stats = ExecutorStats::new(); + // Start at 999.5 seconds, run for 1.0 seconds (exactly half in each second) + let base_time = SystemTime::UNIX_EPOCH + Duration::from_millis(999_500); + let elapsed_micros = 1_000_000; // 1 second = 1,000,000 microseconds + let rows = 1000; + + stats.record_process(base_time, 
elapsed_micros, rows); + + let snapshot = stats.dump_snapshot(); + + let mut micros_999 = 0; + let mut micros_1000 = 0; + let mut rows_999 = 0; + let mut rows_1000 = 0; + + for (ts, micros) in snapshot.process_time { + if ts == 999 { + micros_999 = micros; + } else if ts == 1000 { + micros_1000 = micros; + } + } + + for (ts, row_count) in snapshot.process_rows { + if ts == 999 { + rows_999 = row_count; + } else if ts == 1000 { + rows_1000 = row_count; + } + } + + // Due to floating point precision issues, we check ranges rather than exact values + assert!( + micros_999 + micros_1000 <= 1_000_000, + "Total micros should not exceed input" + ); + assert!( + rows_999 + rows_1000 <= 1000, + "Total rows should not exceed input" + ); + + // Each second should have some allocation + assert!(micros_999 > 0, "Second 999 should have some time allocated"); + assert!( + micros_1000 > 0, + "Second 1000 should have some time allocated" + ); + assert!(rows_999 > 0, "Second 999 should have some rows allocated"); + assert!(rows_1000 > 0, "Second 1000 should have some rows allocated"); +} diff --git a/src/common/base/tests/it/main.rs b/src/common/base/tests/it/main.rs index 9ec23a32818be..1fb7a2fec8535 100644 --- a/src/common/base/tests/it/main.rs +++ b/src/common/base/tests/it/main.rs @@ -14,6 +14,7 @@ use databend_common_base::mem_allocator::TrackingGlobalAllocator; +mod executor_stats; mod ext; mod fixed_heap; mod metrics; diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index 734f8319a02ee..7928fcd83f530 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -24,6 +24,7 @@ use std::time::SystemTime; use dashmap::DashMap; use databend_common_base::base::Progress; use databend_common_base::base::ProgressValues; +use databend_common_base::runtime::ExecutorStatsSnapshot; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_exception::ResultExt; @@ -448,6 +449,9 @@ pub trait TableContext: Send + Sync { fn set_nodes_perf(&self, _node: String, _perf: String) { unimplemented!() } + fn get_running_query_execution_stats(&self) -> Vec<(String, ExecutorStatsSnapshot)> { + unimplemented!() + } } pub type AbortChecker = Arc; diff --git a/src/query/pipeline/core/src/processors/port.rs b/src/query/pipeline/core/src/processors/port.rs index 6313f261f4dab..fa257bfa5d207 100644 --- a/src/query/pipeline/core/src/processors/port.rs +++ b/src/query/pipeline/core/src/processors/port.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use databend_common_base::runtime::drop_guard; use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; +use databend_common_base::runtime::ExecutorStats; use databend_common_base::runtime::QueryTimeSeriesProfile; use databend_common_base::runtime::TimeSeriesProfileName; use databend_common_exception::Result; @@ -190,7 +191,13 @@ impl InputPort { let unset_flags = HAS_DATA | NEED_DATA; match self.shared.swap(std::ptr::null_mut(), 0, unset_flags) { address if address.is_null() => None, - address => Some((*Box::from_raw(address)).0), + address => { + let block = (*Box::from_raw(address)).0; + if let Ok(data_block) = block.as_ref() { + ExecutorStats::record_thread_tracker(data_block.num_rows()); + } + Some(block) + } } } } diff --git a/src/query/service/src/databases/system/system_database.rs b/src/query/service/src/databases/system/system_database.rs index 55e6e614f7aab..0d32b7543f549 100644 --- 
a/src/query/service/src/databases/system/system_database.rs +++ b/src/query/service/src/databases/system/system_database.rs @@ -52,6 +52,7 @@ use databend_common_storages_system::PasswordPoliciesTable; use databend_common_storages_system::ProceduresTable; use databend_common_storages_system::ProcessesTable; use databend_common_storages_system::QueryCacheTable; +use databend_common_storages_system::QueryExecutionTable; use databend_common_storages_system::RolesTable; use databend_common_storages_system::SettingsTable; use databend_common_storages_system::StagesTable; @@ -165,6 +166,10 @@ impl SystemDatabase { sys_db_meta.next_table_id(), config.query.max_query_log_size, )), + QueryExecutionTable::create( + sys_db_meta.next_table_id(), + config.query.max_query_log_size, + ), ]); disable_system_table_load = config.query.disable_system_table_load; } else { diff --git a/src/query/service/src/pipelines/executor/executor_graph.rs b/src/query/service/src/pipelines/executor/executor_graph.rs index eae85cbdd53fa..48e7d3a97743b 100644 --- a/src/query/service/src/pipelines/executor/executor_graph.rs +++ b/src/query/service/src/pipelines/executor/executor_graph.rs @@ -23,11 +23,14 @@ use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; use std::sync::PoisonError; +use std::time::SystemTime; use databend_common_base::base::WatchNotify; use databend_common_base::runtime::error_info::NodeErrorType; use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; +use databend_common_base::runtime::ExecutorStats; +use databend_common_base::runtime::ExecutorStatsSnapshot; use databend_common_base::runtime::QueryTimeSeriesProfileBuilder; use databend_common_base::runtime::ThreadTracker; use databend_common_base::runtime::TimeSeriesProfiles; @@ -40,6 +43,7 @@ use databend_common_pipeline_core::processors::EventCause; use databend_common_pipeline_core::processors::PlanScope; use databend_common_pipeline_core::Pipeline; use databend_common_pipeline_core::PlanProfile; +use databend_common_storages_system::QueryExecutionStatsQueue; use fastrace::prelude::*; use log::debug; use log::trace; @@ -178,6 +182,7 @@ struct ExecutingGraph { finished_notify: Arc, finish_condvar_notify: Option, Condvar)>>, finished_error: Mutex>, + executor_stats: ExecutorStats, } type StateLockGuard = ExecutingGraph; @@ -193,6 +198,7 @@ impl ExecutingGraph { let mut time_series_profile_builder = QueryTimeSeriesProfileBuilder::new(query_id.to_string()); Self::init_graph(&mut pipeline, &mut graph, &mut time_series_profile_builder); + let executor_stats = ExecutorStats::new(); Ok(ExecutingGraph { graph, finished_nodes: AtomicUsize::new(0), @@ -203,6 +209,7 @@ impl ExecutingGraph { finished_notify: Arc::new(WatchNotify::new()), finish_condvar_notify, finished_error: Mutex::new(None), + executor_stats, }) } @@ -218,7 +225,7 @@ impl ExecutingGraph { for pipeline in &mut pipelines { Self::init_graph(pipeline, &mut graph, &mut time_series_profile_builder); } - + let executor_stats = ExecutorStats::new(); Ok(ExecutingGraph { finished_nodes: AtomicUsize::new(0), graph, @@ -229,6 +236,7 @@ impl ExecutingGraph { finished_notify: Arc::new(WatchNotify::new()), finish_condvar_notify, finished_error: Mutex::new(None), + executor_stats, }) } @@ -383,18 +391,19 @@ impl ExecutingGraph { if let Some(schedule_index) = need_schedule_nodes.pop_front() { let node = &locker.graph[schedule_index]; - - let event = { - let guard = 
ThreadTracker::tracking(node.tracking_payload.clone()); + let (event, process_rows) = { + let mut payload = node.tracking_payload.clone(); + payload.process_rows = AtomicUsize::new(0); + let guard = ThreadTracker::tracking(payload); if state_guard_cache.is_none() { state_guard_cache = Some(node.state.lock().unwrap()); } let event = node.processor.event(event_cause)?; - + let process_rows = ThreadTracker::process_rows(); match guard.flush() { - Ok(_) => Ok(event), + Ok(_) => Ok((event, process_rows)), Err(out_of_limit) => { Err(ErrorCode::PanicError(format!("{:?}", out_of_limit))) } @@ -420,6 +429,7 @@ impl ExecutingGraph { schedule_queue.push_sync(ProcessorWrapper { processor: node.processor.clone(), graph: graph.clone(), + process_rows, }); State::Processing } @@ -427,6 +437,7 @@ impl ExecutingGraph { schedule_queue.push_async(ProcessorWrapper { processor: node.processor.clone(), graph: graph.clone(), + process_rows, }); State::Processing } @@ -481,6 +492,7 @@ impl ExecutingGraph { pub struct ProcessorWrapper { pub processor: ProcessorPtr, pub graph: Arc, + pub process_rows: usize, } pub struct ScheduleQueue { @@ -616,37 +628,25 @@ impl ScheduleQueue { } } - if !self.sync_queue.is_empty() { - while let Some(processor) = self.sync_queue.pop_front() { - if processor - .graph - .can_perform_task(executor.epoch.load(Ordering::SeqCst)) - { - context.set_task(ExecutorTask::Sync(processor)); - break; - } else { - let mut tasks = VecDeque::with_capacity(1); - tasks.push_back(ExecutorTask::Sync(processor)); - global.push_tasks(context.get_worker_id(), None, tasks); - } + let mut tasks_to_global = VecDeque::with_capacity(self.sync_queue.len()); + + if let Some(processor) = self.sync_queue.pop_front() { + if processor + .graph + .can_perform_task(executor.epoch.load(Ordering::SeqCst)) + { + context.set_task(ExecutorTask::Sync(processor)); + } else { + tasks_to_global.push_back(ExecutorTask::Sync(processor)); } } - if !self.sync_queue.is_empty() { - let mut current_tasks = VecDeque::with_capacity(self.sync_queue.len()); - let mut next_tasks = VecDeque::with_capacity(self.sync_queue.len()); - while let Some(processor) = self.sync_queue.pop_front() { - if processor - .graph - .can_perform_task(executor.epoch.load(Ordering::SeqCst)) - { - current_tasks.push_back(ExecutorTask::Sync(processor)); - } else { - next_tasks.push_back(ExecutorTask::Sync(processor)); - } - } - let worker_id = context.get_worker_id(); - global.push_tasks(worker_id, Some(current_tasks), next_tasks); + // Add remaining tasks from sync queue to global queue + while let Some(processor) = self.sync_queue.pop_front() { + tasks_to_global.push_back(ExecutorTask::Sync(processor)); + } + if !tasks_to_global.is_empty() { + global.push_tasks(context.get_worker_id(), None, tasks_to_global); } } @@ -970,6 +970,25 @@ impl RunningGraph { pub fn change_priority(&self, priority: u64) { self.0.max_points.store(priority, Ordering::SeqCst); } + + pub fn record_process(&self, begin: SystemTime, elapsed_micros: usize, rows: usize) { + self.0 + .executor_stats + .record_process(begin, elapsed_micros, rows); + } + + pub fn get_query_execution_stats(&self) -> ExecutorStatsSnapshot { + self.0.executor_stats.dump_snapshot() + } +} + +impl Drop for RunningGraph { + fn drop(&mut self) { + let execution_stats = self.get_query_execution_stats(); + if let Ok(queue) = QueryExecutionStatsQueue::instance() { + let _ = queue.append_data((self.get_query_id().to_string(), execution_stats)); + } + } } impl Debug for Node { diff --git 
a/src/query/service/src/pipelines/executor/executor_worker_context.rs b/src/query/service/src/pipelines/executor/executor_worker_context.rs index 579c4bdc5930a..2559533d7b2c6 100644 --- a/src/query/service/src/pipelines/executor/executor_worker_context.rs +++ b/src/query/service/src/pipelines/executor/executor_worker_context.rs @@ -17,6 +17,7 @@ use std::fmt::Formatter; use std::intrinsics::assume; use std::sync::Arc; use std::time::Instant; +use std::time::SystemTime; use databend_common_base::runtime::error_info::NodeErrorType; use databend_common_base::runtime::profile::Profile; @@ -163,13 +164,16 @@ impl ExecutorWorkerContext { ) -> Result)>> { let payload = proc.graph.get_node_tracking_payload(proc.processor.id()); let guard = ThreadTracker::tracking(payload.clone()); - + let begin = SystemTime::now(); let instant = Instant::now(); proc.processor.process()?; let nanos = instant.elapsed().as_nanos(); assume(nanos < 18446744073709551615_u128); Profile::record_usize_profile(ProfileStatisticsName::CpuTime, nanos as usize); + let process_rows = proc.process_rows; + proc.graph + .record_process(begin, nanos as usize / 1_000, process_rows); if let Err(out_of_limit) = guard.flush() { return Err(ErrorCode::PanicError(format!("{:?}", out_of_limit))); diff --git a/src/query/service/src/pipelines/executor/pipeline_executor.rs b/src/query/service/src/pipelines/executor/pipeline_executor.rs index 53a82eba8da10..a05d163c32f58 100644 --- a/src/query/service/src/pipelines/executor/pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_executor.rs @@ -20,6 +20,7 @@ use std::time::Instant; use databend_common_base::base::WatchNotify; use databend_common_base::runtime::catch_unwind; use databend_common_base::runtime::defer; +use databend_common_base::runtime::ExecutorStatsSnapshot; use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::TrySpawn; use databend_common_exception::ErrorCode; @@ -296,4 +297,15 @@ impl PipelineExecutor { } } } + + pub fn get_query_execution_stats(&self) -> ExecutorStatsSnapshot { + match self { + PipelineExecutor::QueryPipelineExecutor(executor) => { + executor.get_query_execution_stats() + } + PipelineExecutor::QueriesPipelineExecutor(query_wrapper) => { + query_wrapper.graph.get_query_execution_stats() + } + } + } } diff --git a/src/query/service/src/pipelines/executor/queries_executor_tasks.rs b/src/query/service/src/pipelines/executor/queries_executor_tasks.rs index 61b9aefcbbf92..d491331a05696 100644 --- a/src/query/service/src/pipelines/executor/queries_executor_tasks.rs +++ b/src/query/service/src/pipelines/executor/queries_executor_tasks.rs @@ -121,7 +121,6 @@ impl QueriesExecutorTasksQueue { continue; } } - context.set_task(task); let workers_condvar = context.get_workers_condvar(); diff --git a/src/query/service/src/pipelines/executor/queries_pipeline_executor.rs b/src/query/service/src/pipelines/executor/queries_pipeline_executor.rs index 197288735bf33..7bbd489f560c0 100644 --- a/src/query/service/src/pipelines/executor/queries_pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/queries_pipeline_executor.rs @@ -147,8 +147,13 @@ impl QueriesPipelineExecutor { let mut context = ExecutorWorkerContext::create(thread_num, workers_condvar); while !self.global_tasks_queue.is_finished() { - // When there are not enough tasks, the thread will be blocked, so we need loop check. + // Load tasks from global queue into worker context when context is empty. 
+ // When context already contains tasks (new scheduled task triggered + // by previously executed processors in this context), + // those local tasks take priority and are executed first. while !self.global_tasks_queue.is_finished() && !context.has_task() { + // If no tasks are available in the global tasks queue, + // this steal_tasks_to_context will block here self.global_tasks_queue .steal_task_to_context(&mut context, self); } diff --git a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs index b45e707f30cc8..9ff2d9fa7eb24 100644 --- a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs @@ -19,6 +19,7 @@ use std::time::Instant; use databend_common_base::base::tokio; use databend_common_base::runtime::catch_unwind; use databend_common_base::runtime::error_info::NodeErrorType; +use databend_common_base::runtime::ExecutorStatsSnapshot; use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::LimitMemGuard; use databend_common_base::runtime::Runtime; @@ -445,6 +446,10 @@ impl QueryPipelineExecutor { false => self.graph.fetch_profiling(None), } } + + pub fn get_query_execution_stats(&self) -> ExecutorStatsSnapshot { + self.graph.get_query_execution_stats() + } } impl Drop for QueryPipelineExecutor { diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs index 1bb5af570df33..d2f9051a39d15 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs @@ -25,6 +25,7 @@ use arrow_flight::flight_service_client::FlightServiceClient; use arrow_flight::FlightData; use async_channel::Receiver; use databend_common_base::base::GlobalInstance; +use databend_common_base::runtime::ExecutorStatsSnapshot; use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::QueryPerf; use databend_common_base::runtime::Thread; @@ -114,6 +115,30 @@ impl DataExchangeManager { }) } + pub fn get_query_execution_stats(&self) -> Vec<(String, ExecutorStatsSnapshot)> { + let mut executors = Vec::new(); + { + let queries_coordinator_guard = self.queries_coordinator.lock(); + let queries_coordinator = unsafe { &mut *queries_coordinator_guard.deref().get() }; + for (query_id, query_coordinator) in queries_coordinator.iter() { + if let Some(info) = &query_coordinator.info { + if let Some(executor) = info.query_executor.clone() { + executors.push((query_id, executor)); + } + } + } + } + executors + .into_iter() + .map(|(query_id, executor)| { + ( + query_id.clone(), + executor.get_inner().get_query_execution_stats(), + ) + }) + .collect() + } + pub fn get_query_ctx(&self, query_id: &str) -> Result> { let queries_coordinator_guard = self.queries_coordinator.lock(); let queries_coordinator = unsafe { &mut *queries_coordinator_guard.deref().get() }; diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 1add9e6893719..654212af80ce3 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -38,6 +38,7 @@ use databend_common_base::base::ProgressValues; use databend_common_base::base::SpillProgress; use databend_common_base::runtime::profile::Profile; use 
databend_common_base::runtime::profile::ProfileStatisticsName; +use databend_common_base::runtime::ExecutorStatsSnapshot; use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::MemStat; use databend_common_base::runtime::ThreadTracker; @@ -1066,6 +1067,12 @@ impl TableContext for QueryContext { SessionManager::instance().processes_info() } + fn get_running_query_execution_stats(&self) -> Vec<(String, ExecutorStatsSnapshot)> { + let mut all = SessionManager::instance().get_query_execution_stats(); + all.extend(DataExchangeManager::instance().get_query_execution_stats()); + all + } + fn get_queued_queries(&self) -> Vec { let queries = QueriesQueueManager::instance() .list() diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index 9547f4fe1d539..ab0daff25207e 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -31,6 +31,7 @@ use databend_common_base::base::short_sql; use databend_common_base::base::Progress; use databend_common_base::base::SpillProgress; use databend_common_base::runtime::drop_guard; +use databend_common_base::runtime::ExecutorStatsSnapshot; use databend_common_base::runtime::MemStat; use databend_common_base::runtime::Runtime; use databend_common_catalog::catalog::Catalog; @@ -793,6 +794,13 @@ impl QueryContextShared { } } + pub fn get_query_execution_stats(&self) -> Option { + if let Some(executor) = self.executor.read().upgrade() { + return Some(executor.get_query_execution_stats()); + } + None + } + pub fn set_query_memory_tracking(&self, mem_stat: Option>) { let mut mem_stat_guard = self.mem_stat.write(); *mem_stat_guard = mem_stat; diff --git a/src/query/service/src/sessions/session_mgr.rs b/src/query/service/src/sessions/session_mgr.rs index 34b6b6fe87423..508efe71be88d 100644 --- a/src/query/service/src/sessions/session_mgr.rs +++ b/src/query/service/src/sessions/session_mgr.rs @@ -25,6 +25,7 @@ use databend_common_base::base::tokio; use databend_common_base::base::GlobalInstance; use databend_common_base::base::SignalStream; use databend_common_base::runtime::metrics::GLOBAL_METRICS_REGISTRY; +use databend_common_base::runtime::ExecutorStatsSnapshot; use databend_common_base::runtime::LimitMemGuard; use databend_common_catalog::session_type::SessionType; use databend_common_catalog::table_context::ProcessInfoState; @@ -394,6 +395,26 @@ impl SessionManager { ))) } + pub fn get_query_execution_stats(&self) -> Vec<(String, ExecutorStatsSnapshot)> { + let mut res = Vec::new(); + for weak_ptr in self.active_sessions_snapshot() { + let Some(arc_session) = weak_ptr.upgrade() else { + continue; + }; + + let session_ctx = arc_session.session_ctx.as_ref(); + + if let Some(context_shared) = session_ctx.get_query_context_shared() { + let query_id = context_shared.init_query_id.as_ref().read().clone(); + let stats = context_shared.get_query_execution_stats(); + if let Some(stats) = stats { + res.push((query_id, stats)); + } + } + } + res + } + fn active_sessions_snapshot(&self) -> Vec> { // Here the situation is the same of method `graceful_shutdown`: // diff --git a/src/query/service/tests/it/storages/testdata/columns_table.txt b/src/query/service/tests/it/storages/testdata/columns_table.txt index a4794f78a311d..7d43dcbff6eaf 100644 --- a/src/query/service/tests/it/storages/testdata/columns_table.txt +++ b/src/query/service/tests/it/storages/testdata/columns_table.txt @@ -277,6 +277,7 @@ DB.Table: 
'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo | 'node' | 'system' | 'malloc_stats_totals' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | NULL | NULL | NULL | | 'node' | 'system' | 'metrics' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | NULL | NULL | NULL | | 'node' | 'system' | 'processes' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | NULL | NULL | NULL | +| 'node' | 'system' | 'query_execution' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | NULL | NULL | NULL | | 'node' | 'system' | 'temporary_tables' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | NULL | NULL | NULL | | 'non_unique' | 'information_schema' | 'statistics' | 'NULL' | 'NULL' | '' | '' | 'NO' | '' | NULL | NULL | NULL | | 'null_count' | 'system' | 'columns' | 'Nullable(UInt64)' | 'BIGINT UNSIGNED' | '' | '' | 'YES' | '' | NULL | NULL | NULL | @@ -318,10 +319,13 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo | 'position_in_unique_constraint' | 'information_schema' | 'key_column_usage' | 'NULL' | 'NULL' | '' | '' | 'NO' | '' | NULL | NULL | NULL | | 'privileges' | 'information_schema' | 'columns' | 'NULL' | 'NULL' | '' | '' | 'NO' | '' | NULL | NULL | NULL | | 'procedure_id' | 'system' | 'procedures' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | '' | NULL | NULL | NULL | +| 'process_rows' | 'system' | 'query_execution' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | '' | NULL | NULL | NULL | +| 'process_time_in_micros' | 'system' | 'query_execution' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | '' | NULL | NULL | NULL | | 'processed' | 'system' | 'notification_history' | 'Nullable(Timestamp)' | 'TIMESTAMP' | '' | '' | 'YES' | '' | NULL | NULL | NULL | | 'query_id' | 'system' | 'backtrace' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | NULL | NULL | NULL | | 'query_id' | 'system' | 'locks' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | NULL | NULL | NULL | | 'query_id' | 'system' | 'query_cache' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | NULL | NULL | NULL | +| 'query_id' | 'system' | 'query_execution' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | NULL | NULL | NULL | | 'query_id' | 'system' | 'task_history' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | NULL | NULL | NULL | | 'range' | 'system' | 'settings' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | NULL | NULL | NULL | | 'referenced_column_name' | 'information_schema' | 'key_column_usage' | 'NULL' | 'NULL' | '' | '' | 'NO' | '' | NULL | NULL | NULL | @@ -409,6 +413,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo | 'table_version' | 'system' | 'streams' | 'Nullable(UInt64)' | 'BIGINT UNSIGNED' | '' | '' | 'YES' | '' | NULL | NULL | NULL | | 'target_features' | 'system' | 'build_options' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | NULL | NULL | NULL | | 'time' | 'system' | 'processes' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | '' | NULL | NULL | NULL | +| 'timestamp' | 'system' | 'query_execution' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' | NULL | NULL | NULL | | 'total_columns' | 'system' | 'tables' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | '' | NULL | NULL | NULL | | 'total_columns' | 'system' | 'tables_with_history' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | '' | NULL | NULL | NULL | | 'type' | 'system' | 'columns' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | NULL | NULL | NULL | diff --git a/src/query/storages/system/src/lib.rs b/src/query/storages/system/src/lib.rs index a03c00f73f1a2..0027761efbc2f 100644 --- 
a/src/query/storages/system/src/lib.rs +++ b/src/query/storages/system/src/lib.rs @@ -49,6 +49,7 @@ mod password_policies_table; mod procedures_table; mod processes_table; mod query_cache_table; +mod query_execution_table; mod query_log_table; mod roles_table; mod settings_table; @@ -101,6 +102,8 @@ pub use password_policies_table::PasswordPoliciesTable; pub use procedures_table::ProceduresTable; pub use processes_table::ProcessesTable; pub use query_cache_table::QueryCacheTable; +pub use query_execution_table::QueryExecutionStatsQueue; +pub use query_execution_table::QueryExecutionTable; pub use query_log_table::LogType; pub use query_log_table::QueryLogElement; pub use roles_table::RolesTable; diff --git a/src/query/storages/system/src/query_execution_table.rs b/src/query/storages/system/src/query_execution_table.rs new file mode 100644 index 0000000000000..35025afc590c2 --- /dev/null +++ b/src/query/storages/system/src/query_execution_table.rs @@ -0,0 +1,216 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; +use std::time::SystemTime; + +use databend_common_base::runtime::ExecutorStatsSnapshot; +use databend_common_catalog::table::DistributionLevel; +use databend_common_catalog::table::Table; +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::Result; +use databend_common_expression::types::NumberDataType; +use databend_common_expression::types::StringType; +use databend_common_expression::types::TimestampType; +use databend_common_expression::types::UInt32Type; +use databend_common_expression::ColumnBuilder; +use databend_common_expression::DataBlock; +use databend_common_expression::FromData; +use databend_common_expression::TableDataType; +use databend_common_expression::TableField; +use databend_common_expression::TableSchemaRef; +use databend_common_expression::TableSchemaRefExt; +use databend_common_meta_app::schema::TableIdent; +use databend_common_meta_app::schema::TableInfo; +use databend_common_meta_app::schema::TableMeta; + +use crate::SyncOneBlockSystemTable; +use crate::SyncSystemTable; +use crate::SystemLogElement; +use crate::SystemLogQueue; + +pub type QueryExecutionStatsQueue = SystemLogQueue; +pub type QueryExecutionStatsElement = (String, ExecutorStatsSnapshot); + +impl SystemLogElement for QueryExecutionStatsElement { + const TABLE_NAME: &'static str = "DUMMY"; + + fn schema() -> TableSchemaRef { + unimplemented!("Only used log queue, not a table"); + } + + fn fill_to_data_block( + &self, + _columns: &mut Vec, + ) -> databend_common_exception::Result<()> { + unimplemented!("Only used log queue, not a table"); + } +} + +pub struct QueryExecutionTable { + table_info: TableInfo, +} + +#[async_trait::async_trait] +impl SyncSystemTable for QueryExecutionTable { + const NAME: &'static str = "system.query_execution"; + + const DISTRIBUTION_LEVEL: DistributionLevel = DistributionLevel::Warehouse; + + fn get_table_info(&self) -> &TableInfo { 
+ &self.table_info + } + + fn get_full_data(&self, ctx: Arc) -> Result { + let mut running = ctx.get_running_query_execution_stats(); + let archive = self.get_archive()?; + running.extend(archive); + let local_id = ctx.get_cluster().local_id.clone(); + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs() as u32; + let valid_time_range = now - 10..now; + + let mut rows_by_timestamp: HashMap> = HashMap::new(); + let mut times_by_timestamp: HashMap> = HashMap::new(); + + for (query_id, stats) in running { + aggregate_stats_by_timestamp( + stats.process_rows, + &query_id, + &valid_time_range, + &mut rows_by_timestamp, + ); + aggregate_stats_by_timestamp( + stats.process_time, + &query_id, + &valid_time_range, + &mut times_by_timestamp, + ); + } + + let columns = self.build_data_columns( + local_id, + valid_time_range, + &rows_by_timestamp, + ×_by_timestamp, + )?; + + Ok(DataBlock::new_from_columns(columns)) + } +} + +impl QueryExecutionTable { + pub fn create(table_id: u64, max_rows: usize) -> Arc { + let schema = TableSchemaRefExt::create(vec![ + TableField::new("node", TableDataType::String), + TableField::new("timestamp", TableDataType::Timestamp), + TableField::new("query_id", TableDataType::String), + TableField::new( + "process_rows", + TableDataType::Number(NumberDataType::UInt64), + ), + TableField::new( + "process_time_in_micros", + TableDataType::Number(NumberDataType::UInt64), + ), + ]); + + let table_info = TableInfo { + desc: "'system'.'query_execution'".to_string(), + name: "query_execution".to_string(), + ident: TableIdent::new(table_id, 0), + meta: TableMeta { + schema, + engine: "SystemQueryExecution".to_string(), + ..Default::default() + }, + ..Default::default() + }; + + QueryExecutionStatsQueue::init(max_rows); + + SyncOneBlockSystemTable::create(QueryExecutionTable { table_info }) + } + + pub fn get_archive(&self) -> Result> { + let archive = QueryExecutionStatsQueue::instance()? 
+ .data + .read() + .event_queue + .iter() + .filter_map(|e| e.as_ref().map(|e| e.clone())) + .collect(); + Ok(archive) + } + + fn build_data_columns( + &self, + local_id: String, + valid_time_range: std::ops::Range, + rows_by_timestamp: &HashMap>, + times_by_timestamp: &HashMap>, + ) -> Result> { + let mut nodes = Vec::new(); + let mut timestamps = Vec::new(); + let mut query_ids = Vec::new(); + let mut process_rows = Vec::new(); + let mut process_times = Vec::new(); + + let empty_map = HashMap::new(); + for timestamp in valid_time_range { + let rows_for_timestamp = rows_by_timestamp.get(×tamp).unwrap_or(&empty_map); + let times_for_timestamp = times_by_timestamp.get(×tamp).unwrap_or(&empty_map); + + let mut all_query_ids = std::collections::HashSet::new(); + all_query_ids.extend(rows_for_timestamp.keys()); + all_query_ids.extend(times_for_timestamp.keys()); + + for query_id in all_query_ids { + nodes.push(local_id.clone()); + timestamps.push(timestamp as i64 * 1_000_000); + query_ids.push(query_id.clone()); + process_rows.push(rows_for_timestamp.get(query_id).copied().unwrap_or(0)); + process_times.push(times_for_timestamp.get(query_id).copied().unwrap_or(0)); + } + } + + Ok(vec![ + StringType::from_data(nodes), + TimestampType::from_data(timestamps), + StringType::from_data(query_ids), + UInt32Type::from_data(process_rows), + UInt32Type::from_data(process_times), + ]) + } +} + +fn aggregate_stats_by_timestamp( + stats_data: Vec<(u32, u32)>, + query_id: &str, + valid_time_range: &std::ops::Range, + target_map: &mut HashMap>, +) { + for (timestamp, value) in stats_data { + if !valid_time_range.contains(×tamp) { + continue; + } + target_map + .entry(timestamp) + .or_default() + .insert(query_id.to_string(), value); + } +} diff --git a/tests/logging/test-history-tables.sh b/tests/logging/test-history-tables.sh index 23e7367d76484..4bcc9b27a4e41 100644 --- a/tests/logging/test-history-tables.sh +++ b/tests/logging/test-history-tables.sh @@ -110,42 +110,3 @@ if [ "$response2_data" != "[]" ] && [ "$response2_data" != "null" ]; then else echo "✓ response2 data field is empty as expected" fi - -# **External -> Internal**: should not reset - -echo "Kill databend-query-1" -PORT=9091 - -# Find the PID listening on the specified port -PID=$(lsof -t -i :$PORT) - -# Check if a PID was found -if [ -z "$PID" ]; then - echo "No process found listening on port $PORT." -else - echo "Found process with PID $PID listening on port $PORT. Killing it..." - kill -9 "$PID" - echo "Process $PID killed." -fi - -echo 'Restart databend-query node-1' -nohup env RUST_BACKTRACE=1 target/${BUILD_PROFILE}/databend-query -c scripts/ci/deploy/config/databend-query-node-1.toml --internal-enable-sandbox-tenant >./.databend/query-1.out 2>&1 & - -echo "Waiting on node-1..." -python3 scripts/ci/wait_tcp.py --timeout 30 --port 9091 - -response3=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d "{\"sql\": \"select count(*) from system_history.query_history;\"}") -response3_data=$(echo "$response3" | jq -r '.data') - -if [ "$response3_data" = "[]" ] || [ "$response3_data" = "null" ] || [ "$response3_data" = "0" ]; then - echo "ERROR: response3 data field is empty or 0 but should contain data" - echo "response3: $response3" - exit 1 -else - echo "✓ response3 data field contains data as expected" -fi - -sleep 15 - -echo "Running test queries to test external history tables" -./tests/logging/check_logs_table.sh