Skip to content

refactor(query): reduce memory usage during aggregate spill #17550

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 85 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
85 commits
Select commit Hold shift + click to select a range
f953fff
refactor(query): refactor aggreagte spill code
zhang2014 Mar 4, 2025
d6e7d7c
refactor(query): refactor aggreagte spill code
zhang2014 Mar 5, 2025
259fec8
refactor(query): refactor aggreagte spill code
zhang2014 Mar 6, 2025
3e397cd
refactor(query): refactor aggreagte spill code
zhang2014 Mar 11, 2025
e03ce66
refactor(query): refactor aggreagte spill code
zhang2014 Mar 13, 2025
e57cbe1
Merge branch 'main' into refactor/aggregate_spill
zhang2014 Mar 13, 2025
be7c1da
refactor(query): refactor aggreagte spill code
zhang2014 Mar 13, 2025
b7cf072
refactor(query): refactor aggreagte spill code
zhang2014 Mar 13, 2025
bf59696
refactor(query): refactor aggreagte spill code
zhang2014 Mar 13, 2025
b77c041
refactor(query): refactor aggreagte spill code
zhang2014 Mar 13, 2025
78a0199
refactor(query): refactor aggreagte spill code
zhang2014 Mar 13, 2025
22dad6e
refactor(query): refactor aggreagte spill code
zhang2014 Mar 16, 2025
417f64d
refactor(query): refactor aggreagte spill code
zhang2014 Mar 16, 2025
4193c6c
refactor(query): refactor aggreagte spill code
zhang2014 Mar 16, 2025
20ca2a9
refactor(query): refactor aggreagte spill code
zhang2014 Mar 16, 2025
472aa37
refactor(query): refactor aggreagte spill code
zhang2014 Mar 17, 2025
2f62d3d
refactor(query): refactor aggreagte spill code
zhang2014 Mar 17, 2025
3a0652b
refactor(query): refactor aggreagte spill code
zhang2014 Mar 17, 2025
9a56d22
refactor(query): refactor aggreagte spill code
zhang2014 Mar 17, 2025
552bcda
refactor(query): refactor aggreagte spill code
zhang2014 Mar 19, 2025
7d46563
refactor(query): refactor aggreagte spill code
zhang2014 Mar 19, 2025
f6465f3
Merge branch 'main' into refactor/aggregate_spill
zhang2014 Mar 19, 2025
5f1fbbc
refactor(query): refactor aggreagte spill code
zhang2014 Mar 19, 2025
d61d925
refactor(query): refactor aggreagte spill code
zhang2014 Mar 20, 2025
1206211
refactor(query): refactor aggreagte spill code
zhang2014 Mar 24, 2025
0374a7f
refactor(query): refactor aggreagte spill code
zhang2014 Mar 24, 2025
ce6e754
refactor(query): refactor aggreagte spill code
zhang2014 Mar 28, 2025
6a53857
refactor(query): refactor aggreagte spill code
zhang2014 Mar 29, 2025
713d2c7
Merge branch 'main' into refactor/aggregate_spill
zhang2014 Mar 30, 2025
31037d1
refactor(query): refactor aggreagte spill code
zhang2014 Mar 30, 2025
7038f10
Merge branch 'refactor/aggregate_spill' of github.com:zhang2014/dataf…
zhang2014 Mar 30, 2025
f687164
refactor(query): refactor aggreagte spill code
zhang2014 Apr 1, 2025
7e5963f
refactor(query): refactor aggreagte spill code
zhang2014 Apr 1, 2025
eaee69a
Merge branch 'main' of https://github.com/datafuselabs/databend into …
zhang2014 Apr 1, 2025
55f522d
refactor(query): refactor aggreagte spill code
zhang2014 Apr 1, 2025
2a7f2bc
refactor(query): refactor aggreagte spill code
zhang2014 Apr 1, 2025
7af4255
refactor(query): refactor aggreagte spill code
zhang2014 Apr 2, 2025
9728660
refactor(query): refactor aggreagte spill code
zhang2014 Apr 2, 2025
b37b273
refactor(query): refactor aggreagte spill code
zhang2014 Apr 2, 2025
5ae50c7
refactor(query): refactor aggreagte spill code
zhang2014 Apr 2, 2025
62bf28b
refactor(query): refactor aggreagte spill code
zhang2014 Apr 2, 2025
cd8160a
refactor(query): refactor aggreagte spill code
zhang2014 Apr 2, 2025
0da9b3b
refactor(query): refactor aggreagte spill code
zhang2014 Apr 3, 2025
715a50d
refactor(query): refactor aggreagte spill code
zhang2014 Apr 3, 2025
404af38
refactor(query): refactor aggreagte spill code
zhang2014 Apr 3, 2025
48237d6
refactor(query): refactor aggreagte spill code
zhang2014 Apr 5, 2025
00f9096
refactor(query): refactor aggreagte spill code
zhang2014 Apr 5, 2025
e39e64b
refactor(query): refactor aggreagte spill code
zhang2014 Apr 5, 2025
a6b2dc2
refactor(query): refactor aggreagte spill code
zhang2014 Apr 5, 2025
8ae9c4c
refactor(query): refactor aggreagte spill code
zhang2014 Apr 5, 2025
b6b4fd9
refactor(query): refactor aggreagte spill code
zhang2014 Apr 5, 2025
b8d280d
refactor(query): refactor aggreagte spill code
zhang2014 Apr 5, 2025
dcbcd53
refactor(query): refactor aggreagte spill code
zhang2014 Apr 6, 2025
9dc9d1d
refactor(query): refactor aggreagte spill code
zhang2014 Apr 6, 2025
b90b6c1
refactor(query): refactor aggreagte spill code
zhang2014 Apr 6, 2025
be1e1ba
Merge branch 'main' into refactor/aggregate_spill
zhang2014 Apr 6, 2025
34d6770
refactor(query): refactor aggreagte spill code
zhang2014 Apr 7, 2025
e03481a
Merge branch 'refactor/aggregate_spill' of github.com:zhang2014/dataf…
zhang2014 Apr 7, 2025
6612c9d
Merge branch 'main' into refactor/aggregate_spill
zhang2014 Apr 7, 2025
b90f249
refactor(query): refactor aggreagte spill code
zhang2014 Apr 8, 2025
31069ce
Merge branch 'refactor/aggregate_spill' of github.com:zhang2014/dataf…
zhang2014 Apr 8, 2025
76eabfe
refactor(query): refactor aggreagte spill code
zhang2014 Apr 8, 2025
12ba91b
refactor(query): refactor aggreagte spill code
zhang2014 Apr 8, 2025
98b58d8
refactor(query): refactor aggreagte spill code
zhang2014 Apr 8, 2025
b54d32a
refactor(query): refactor aggreagte spill code
zhang2014 Apr 8, 2025
6a6bace
refactor(query): refactor aggreagte spill code
zhang2014 Apr 8, 2025
1f80959
refactor(query): refactor aggreagte spill code
zhang2014 Apr 8, 2025
2d6bf9a
refactor(query): refactor aggreagte spill code
zhang2014 Apr 8, 2025
b8fad43
refactor(query): refactor aggreagte spill code
zhang2014 Apr 8, 2025
bb7ca69
refactor(query): refactor aggreagte spill code
zhang2014 Apr 9, 2025
2be5e4a
refactor(query): refactor aggreagte spill code
zhang2014 Apr 9, 2025
1551f0e
refactor(query): refactor aggreagte spill code
zhang2014 Apr 9, 2025
c7bc7da
refactor(query): refactor aggreagte spill code
zhang2014 Apr 9, 2025
b06e8ef
refactor(query): refactor aggreagte spill code
zhang2014 Apr 9, 2025
66ade37
refactor(query): refactor aggreagte spill code
zhang2014 Apr 9, 2025
0b9305a
refactor(query): refactor aggreagte spill code
zhang2014 Apr 9, 2025
87c911b
refactor(query): refactor aggreagte spill code
zhang2014 Apr 13, 2025
d827b8c
refactor(query): refactor aggreagte spill code
zhang2014 Apr 13, 2025
d7e2a36
refactor(query): refactor aggreagte spill code
zhang2014 Apr 13, 2025
eb7cb15
refactor(query): refactor aggreagte spill code
zhang2014 Apr 14, 2025
5090728
refactor(query): refactor aggreagte spill code
zhang2014 Apr 14, 2025
580440b
refactor(query): refactor aggreagte spill code
zhang2014 Apr 14, 2025
9631855
refactor(query): refactor aggreagte spill code
zhang2014 Apr 14, 2025
d8fe94d
refactor(query): refactor aggreagte spill code
zhang2014 Apr 14, 2025
68cf7c0
refactor(query): refactor aggreagte spill code
zhang2014 Apr 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 111 additions & 99 deletions Cargo.lock

Large diffs are not rendered by default.

40 changes: 16 additions & 24 deletions src/common/base/src/runtime/memory/mem_stat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ pub struct MemStat {
name: Option<String>,

pub(crate) used: AtomicI64,
pub(crate) peek_used: AtomicI64,
pub(crate) peak_used: AtomicI64,

/// The limit of max used memory for this tracker.
///
/// Set to 0 to disable the limit.
limit: AtomicI64,

parent_memory_stat: Vec<Arc<MemStat>>,
parent_memory_stat: Option<Arc<MemStat>>,
}

impl MemStat {
Expand All @@ -56,17 +56,17 @@ impl MemStat {
id: 0,
name: None,
used: AtomicI64::new(0),
peek_used: AtomicI64::new(0),
peak_used: AtomicI64::new(0),
limit: AtomicI64::new(0),
parent_memory_stat: vec![],
parent_memory_stat: None,
}
}

pub fn create(name: String) -> Arc<MemStat> {
MemStat::create_child(name, vec![])
MemStat::create_child(name, None)
}

pub fn create_child(name: String, parent_memory_stat: Vec<Arc<MemStat>>) -> Arc<MemStat> {
pub fn create_child(name: String, parent_memory_stat: Option<Arc<MemStat>>) -> Arc<MemStat> {
let id = match GlobalSequence::next() {
0 => GlobalSequence::next(),
id => id,
Expand All @@ -76,16 +76,12 @@ impl MemStat {
id,
name: Some(name),
used: AtomicI64::new(0),
peek_used: AtomicI64::new(0),
peak_used: AtomicI64::new(0),
limit: AtomicI64::new(0),
parent_memory_stat,
})
}

pub fn get_parent_memory_stat(&self) -> Vec<Arc<MemStat>> {
self.parent_memory_stat.clone()
}

pub fn set_limit(&self, mut size: i64) {
// It may cause the process unable to run if memory limit is too low.
if size > 0 && size < MINIMUM_MEMORY_LIMIT {
Expand All @@ -107,19 +103,15 @@ impl MemStat {
let mut used = self.used.fetch_add(batch_memory_used, Ordering::Relaxed);

used += batch_memory_used;
self.peek_used.fetch_max(used, Ordering::Relaxed);
self.peak_used.fetch_max(used, Ordering::Relaxed);

for (idx, parent_memory_stat) in self.parent_memory_stat.iter().enumerate() {
if let Some(parent_memory_stat) = self.parent_memory_stat.as_ref() {
if let Err(cause) = parent_memory_stat
.record_memory::<NEED_ROLLBACK>(batch_memory_used, current_memory_alloc)
{
if NEED_ROLLBACK {
// We only roll back the memory that alloc failed
self.used.fetch_sub(current_memory_alloc, Ordering::Relaxed);

for index in 0..idx {
self.parent_memory_stat[index].rollback(current_memory_alloc);
}
}

return Err(cause);
Expand All @@ -142,8 +134,8 @@ impl MemStat {
pub fn rollback(&self, memory_usage: i64) {
self.used.fetch_sub(memory_usage, Ordering::Relaxed);

for parent_memory_stat in &self.parent_memory_stat {
parent_memory_stat.rollback(memory_usage)
if let Some(parent_memory_stat) = &self.parent_memory_stat {
parent_memory_stat.rollback(memory_usage);
}
}

Expand Down Expand Up @@ -171,7 +163,7 @@ impl MemStat {

#[inline]
pub fn get_peek_memory_usage(&self) -> i64 {
self.peek_used.load(Ordering::Relaxed)
self.peak_used.load(Ordering::Relaxed)
}
}

Expand Down Expand Up @@ -268,7 +260,7 @@ mod tests {
fn test_multiple_level_mem_stat() -> Result<()> {
let mem_stat = MemStat::create("TEST".to_string());
let child_mem_stat =
MemStat::create_child("TEST_CHILD".to_string(), vec![mem_stat.clone()]);
MemStat::create_child("TEST_CHILD".to_string(), Some(mem_stat.clone()));

mem_stat.record_memory::<false>(1, 1).unwrap();
mem_stat.record_memory::<false>(2, 2).unwrap();
Expand All @@ -292,7 +284,7 @@ mod tests {
let mem_stat = MemStat::create("TEST".to_string());
mem_stat.set_limit(MINIMUM_MEMORY_LIMIT * 2);
let child_mem_stat =
MemStat::create_child("TEST_CHILD".to_string(), vec![mem_stat.clone()]);
MemStat::create_child("TEST_CHILD".to_string(), Some(mem_stat.clone()));
child_mem_stat.set_limit(MINIMUM_MEMORY_LIMIT);

mem_stat.record_memory::<false>(1, 1).unwrap();
Expand Down Expand Up @@ -322,7 +314,7 @@ mod tests {
let mem_stat = MemStat::create("TEST".to_string());
mem_stat.set_limit(MINIMUM_MEMORY_LIMIT);
let child_mem_stat =
MemStat::create_child("TEST_CHILD".to_string(), vec![mem_stat.clone()]);
MemStat::create_child("TEST_CHILD".to_string(), Some(mem_stat.clone()));
child_mem_stat.set_limit(MINIMUM_MEMORY_LIMIT * 2);

assert!(child_mem_stat
Expand All @@ -335,7 +327,7 @@ mod tests {
let mem_stat = MemStat::create("TEST".to_string());
mem_stat.set_limit(MINIMUM_MEMORY_LIMIT * 2);
let child_mem_stat =
MemStat::create_child("TEST_CHILD".to_string(), vec![mem_stat.clone()]);
MemStat::create_child("TEST_CHILD".to_string(), Some(mem_stat.clone()));
child_mem_stat.set_limit(MINIMUM_MEMORY_LIMIT);

assert!(child_mem_stat
Expand Down
4 changes: 2 additions & 2 deletions src/common/base/src/runtime/memory/stat_buffer_global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl GlobalStatBuffer {
.used
.fetch_add(memory_usage, Ordering::Relaxed);
self.global_mem_stat
.peek_used
.peak_used
.fetch_max(used + memory_usage, Ordering::Relaxed);
return Ok(());
}
Expand Down Expand Up @@ -126,7 +126,7 @@ impl GlobalStatBuffer {
.used
.fetch_add(memory_usage, Ordering::Relaxed);
self.global_mem_stat
.peek_used
.peak_used
.fetch_max(used + memory_usage, Ordering::Relaxed);
return;
}
Expand Down
4 changes: 2 additions & 2 deletions src/common/base/src/runtime/memory/stat_buffer_mem_stat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl MemStatBuffer {
if self.destroyed_thread_local_macro {
let used = mem_stat.used.fetch_add(usage, Ordering::Relaxed);
mem_stat
.peek_used
.peak_used
.fetch_max(used + usage, Ordering::Relaxed);
return Ok(());
}
Expand Down Expand Up @@ -134,7 +134,7 @@ impl MemStatBuffer {
if self.destroyed_thread_local_macro {
let used = mem_stat.used.fetch_add(memory_usage, Ordering::Relaxed);
mem_stat
.peek_used
.peak_used
.fetch_max(used + memory_usage, Ordering::Relaxed);
return;
}
Expand Down
14 changes: 14 additions & 0 deletions src/query/expression/src/aggregate/partitioned_payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use std::sync::Arc;

use bumpalo::Bump;
use itertools::Itertools;
use serde::Deserializer;
use serde::Serializer;

use super::payload::Payload;
use super::probe_state::ProbeState;
Expand Down Expand Up @@ -50,6 +52,18 @@ pub struct PartitionedPayload {
unsafe impl Send for PartitionedPayload {}
unsafe impl Sync for PartitionedPayload {}

/// `PartitionedPayload` is never exchanged between nodes; the impl exists
/// only to satisfy a `Serialize` bound (NOTE(review): presumably required by
/// surrounding generic exchange code — confirm) and panics if ever invoked.
impl serde::Serialize for PartitionedPayload {
    fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        unreachable!("PartitionedPayload must not be exchanged between multiple nodes.")
    }
}

/// Mirror of the `Serialize` impl: `PartitionedPayload` must stay node-local,
/// so deserialization is a hard error (NOTE(review): impl presumably exists
/// only to satisfy a trait bound — confirm) and panics if ever invoked.
impl<'de> serde::Deserialize<'de> for PartitionedPayload {
    fn deserialize<D>(_deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        unreachable!("PartitionedPayload must not be exchanged between multiple nodes.")
    }
}

impl PartitionedPayload {
pub fn new(
group_types: Vec<DataType>,
Expand Down
36 changes: 30 additions & 6 deletions src/query/expression/src/aggregate/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,11 @@ impl Payload {
);
}

pub fn scatter(&self, state: &mut PayloadFlushState, partition_count: usize) -> bool {
pub fn scatter_with_seed<const SEED: u64>(
&self,
state: &mut PayloadFlushState,
partitions: usize,
) -> bool {
if state.flush_page >= self.pages.len() {
return false;
}
Expand All @@ -397,23 +401,27 @@ impl Payload {
state.flush_page += 1;
state.flush_page_row = 0;
state.row_count = 0;
return self.scatter(state, partition_count);
return self.scatter_with_seed::<SEED>(state, partitions);
}

let end = (state.flush_page_row + BATCH_SIZE).min(page.rows);
let rows = end - state.flush_page_row;
state.row_count = rows;

state.probe_state.reset_partitions(partition_count);
state.probe_state.reset_partitions(partitions);

let mods: StrengthReducedU64 = StrengthReducedU64::new(partitions as u64);

let mods: StrengthReducedU64 = StrengthReducedU64::new(partition_count as u64);
for idx in 0..rows {
state.addresses[idx] = self.data_ptr(page, idx + state.flush_page_row);

let hash = unsafe { read::<u64>(state.addresses[idx].add(self.hash_offset) as _) };
let mut hash = unsafe { read::<u64>(state.addresses[idx].add(self.hash_offset) as _) };

let partition_idx = (hash % mods) as usize;
if SEED != 0 {
hash = Self::combine_hash(hash, SEED);
}

let partition_idx = (hash % mods) as usize;
let sel = &mut state.probe_state.partition_entries[partition_idx];
sel[state.probe_state.partition_count[partition_idx]] = idx;
state.probe_state.partition_count[partition_idx] += 1;
Expand All @@ -422,6 +430,10 @@ impl Payload {
true
}

/// Scatter a batch of rows into `partitions` buckets using the stored group
/// hash directly (seed 0, so `scatter_with_seed` skips the extra hash mixing).
/// Returns `false` once all pages have been flushed.
pub fn scatter(&self, state: &mut PayloadFlushState, partitions: usize) -> bool {
    self.scatter_with_seed::<0>(state, partitions)
}

pub fn empty_block(&self, fake_rows: Option<usize>) -> DataBlock {
let fake_rows = fake_rows.unwrap_or(0);
let columns = (0..self.aggrs.len())
Expand All @@ -434,6 +446,18 @@ impl Payload {
.collect_vec();
DataBlock::new_from_columns(columns)
}

/// Mix `hash` with a partitioning `seed`.
///
/// This is the 128-to-64-bit finalizer from CityHash/FarmHash
/// (`Hash128to64`): two multiply–xor–shift rounds with the constant
/// `KMUL`. `scatter_with_seed` applies it only when `SEED != 0`, so that
/// repartitioning with a fresh seed redistributes rows instead of sending
/// every row of a bucket to the same target bucket again.
fn combine_hash(hash: u64, seed: u64) -> u64 {
    // Multiplication constant from CityHash's Hash128to64.
    const KMUL: u64 = 0x9ddfea08eb382d69;

    let mut a = (seed ^ hash).wrapping_mul(KMUL);
    a ^= a >> 47;

    let mut b = (hash ^ a).wrapping_mul(KMUL);
    b ^= b >> 47;
    b.wrapping_mul(KMUL)
}
}

impl Drop for Payload {
Expand Down
16 changes: 0 additions & 16 deletions src/query/expression/src/aggregate/payload_flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,22 +164,6 @@ impl Payload {
Ok(Some(DataBlock::new_from_columns(cols)))
}

pub fn group_by_flush_all(&self) -> Result<DataBlock> {
let mut state = PayloadFlushState::default();
let mut blocks = vec![];

while self.flush(&mut state) {
let cols = state.take_group_columns();
blocks.push(DataBlock::new_from_columns(cols));
}

if blocks.is_empty() {
return Ok(self.empty_block(None));
}

DataBlock::concat(&blocks)
}

pub fn flush(&self, state: &mut PayloadFlushState) -> bool {
if state.flush_page >= self.pages.len() {
return false;
Expand Down
2 changes: 2 additions & 0 deletions src/query/pipeline/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#![feature(once_cell_try)]
#![feature(variant_count)]
#![feature(associated_type_defaults)]
#![feature(adt_const_params)]
#![feature(let_chains)]
#![allow(clippy::arc_with_non_send_sync)]
#![allow(clippy::useless_asref)]

Expand Down
14 changes: 11 additions & 3 deletions src/query/pipeline/core/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Instant;

use databend_common_base::base::tokio::sync::Barrier;
use databend_common_base::runtime::defer;
use databend_common_base::runtime::drop_guard;
use databend_common_exception::ErrorCode;
Expand Down Expand Up @@ -456,11 +457,18 @@ impl Pipeline {
let input_len = pipe.output_length;
let mut items = Vec::with_capacity(input_len);

for _index in 0..input_len {
let barrier = Arc::new(Barrier::new(input_len));
for index in 0..input_len {
let input = InputPort::create();
let outputs: Vec<_> = (0..n).map(|_| OutputPort::create()).collect();
items.push(PipeItem::create(
PartitionProcessor::create(input.clone(), outputs.clone(), exchange.clone()),
PartitionProcessor::create(
input.clone(),
outputs.clone(),
exchange.clone(),
index,
barrier.clone(),
),
vec![input],
outputs,
));
Expand All @@ -481,7 +489,7 @@ impl Pipeline {
let output = OutputPort::create();
let inputs: Vec<_> = (0..input_len).map(|_| InputPort::create()).collect();
items.push(PipeItem::create(
MergePartitionProcessor::create(
MergePartitionProcessor::<T>::create(
inputs.clone(),
output.clone(),
exchange.clone(),
Expand Down
1 change: 1 addition & 0 deletions src/query/pipeline/core/src/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,6 @@ pub use resize_processor::create_resize_item;
pub use resize_processor::ResizeProcessor;
pub use shuffle_processor::Exchange;
pub use shuffle_processor::MergePartitionProcessor;
pub use shuffle_processor::MultiwayStrategy;
pub use shuffle_processor::PartitionProcessor;
pub use shuffle_processor::ShuffleProcessor;
20 changes: 20 additions & 0 deletions src/query/pipeline/core/src/processors/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,22 @@ pub trait Processor: Send {
Err(ErrorCode::Unimplemented("Unimplemented async_process."))
}

/// Prepare a payload for spilling (synchronous part of the spill protocol).
///
/// Default implementation errors: most processors never spill.
/// NOTE(review): the meaning of the returned bool is defined by
/// implementors (presumably "has a payload ready to flush") — confirm.
fn prepare_spill_payload(&mut self) -> Result<bool> {
    Err(ErrorCode::Unimplemented(
        "Unimplemented prepare_spill_payload",
    ))
}

/// Flush a previously prepared spill payload (asynchronous part of the
/// spill protocol; pairs with `prepare_spill_payload`).
///
/// Default implementation errors: most processors never spill.
/// NOTE(review): the meaning of the returned bool (more payloads
/// pending?) is defined by implementors — confirm.
async fn flush_spill_payload(&mut self) -> Result<bool> {
    Err(ErrorCode::Unimplemented(
        "Unimplemented flush_spill_payload",
    ))
}

/// Inform the processor of its peer nodes.
/// NOTE(review): presumably cluster node ids used by distribution-aware
/// processors; only those are expected to override this — confirm.
fn configure_peer_nodes(&mut self, _nodes: &[String]) {
    // do nothing by default
}

fn details_status(&self) -> Option<String> {
None
}
Expand Down Expand Up @@ -198,6 +214,10 @@ impl ProcessorPtr {
.boxed()
}

/// Forward `configure_peer_nodes` to the wrapped processor.
// SAFETY: dereferences `self.inner.get()` without synchronization, the same
// unguarded access pattern the sibling accessors use. NOTE(review): unlike
// those neighbours this method is neither `unsafe fn` nor documented with a
// `# Safety` section — confirm the scheduler guarantees exclusive access at
// the point this is called, or mark it `unsafe` for consistency.
pub fn configure_peer_nodes(&self, nodes: &[String]) {
    unsafe { (*self.inner.get()).configure_peer_nodes(nodes) }
}

/// # Safety
pub unsafe fn details_status(&self) -> Option<String> {
(*self.inner.get()).details_status()
Expand Down
Loading
Loading