Skip to content

Commit 1fa6576

Browse files
committed
storage: Make render sources take a proper struct
Signed-off-by: Moritz Hoffmann <mh@materialize.com>
1 parent 68bb5ba commit 1fa6576

File tree

3 files changed

+43
-40
lines changed

3 files changed

+43
-40
lines changed

src/dataflow-types/src/client.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -153,23 +153,26 @@ pub struct CreateSourceCommand<T> {
153153
pub ts_bindings: Vec<(PartitionId, T, crate::sources::MzOffset)>,
154154
}
155155

156+
/// A command to render a single source
157+
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
158+
pub struct RenderSourcesCommand<T> {
159+
/// A human-readable name.
160+
pub debug_name: String,
161+
/// The dataflow's ID.
162+
pub dataflow_id: uuid::Uuid,
163+
/// An optional frontier to which the input should be advanced.
164+
pub as_of: Option<Antichain<T>>,
165+
/// Sources instantiations made available to the dataflow.
166+
pub source_imports: BTreeMap<GlobalId, SourceInstanceDesc<T>>,
167+
}
168+
156169
/// Commands related to the ingress and egress of collections.
157170
#[derive(Clone, Debug, Serialize, Deserialize)]
158171
pub enum StorageCommand<T = mz_repr::Timestamp> {
159172
/// Create the enumerated sources, each associated with its identifier.
160173
CreateSources(Vec<CreateSourceCommand<T>>),
161174
/// Render the enumerated sources.
162-
///
163-
/// Each source has a name for debugging purposes, an optional "as of" frontier and collection
164-
/// of sources to import.
165-
RenderSources(
166-
Vec<(
167-
/* debug_name */ String,
168-
/* dataflow_id */ uuid::Uuid,
169-
/* as_of */ Option<Antichain<T>>,
170-
/* source_imports*/ BTreeMap<GlobalId, SourceInstanceDesc<T>>,
171-
)>,
172-
),
175+
RenderSources(Vec<RenderSourcesCommand<T>>),
173176
/// Enable compaction in storage-managed collections.
174177
///
175178
/// Each entry in the vector names a collection and provides a frontier after which

src/storage/src/boundary/boundary.rs

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,13 @@ pub use boundary_hook::BoundaryHook;
114114
mod boundary_hook {
115115
use std::collections::BTreeMap;
116116
use std::fmt;
117+
use std::iter::once;
117118

118119
use async_trait::async_trait;
119120

120-
use mz_dataflow_types::client::{GenericClient, StorageCommand, StorageResponse};
121+
use mz_dataflow_types::client::{
122+
GenericClient, RenderSourcesCommand, StorageCommand, StorageResponse,
123+
};
121124
use mz_dataflow_types::sources::SourceDesc;
122125
use mz_dataflow_types::{SourceInstanceDesc, SourceInstanceId, SourceInstanceRequest};
123126
use mz_expr::GlobalId;
@@ -169,23 +172,22 @@ mod boundary_hook {
169172
for source in sources.iter() {
170173
if let Some(requests) = self.pending.remove(&source.id) {
171174
render_requests.extend(requests.into_iter().map(|request| {
172-
(
173-
format!(
175+
RenderSourcesCommand {
176+
debug_name: format!(
174177
"SourceDataflow({:?}, {:?})",
175178
request.dataflow_id, request.source_id
176179
),
177-
request.dataflow_id,
178-
Some(request.as_of.clone()),
179-
Some((
180+
dataflow_id: request.dataflow_id,
181+
as_of: Some(request.as_of.clone()),
182+
source_imports: once((
180183
request.source_id,
181184
SourceInstanceDesc {
182185
description: source.desc.clone(),
183186
arguments: request.arguments,
184187
},
185188
))
186-
.into_iter()
187189
.collect(),
188-
)
190+
}
189191
}));
190192
}
191193
self.sources.insert(source.id, source.desc.clone());
@@ -211,15 +213,15 @@ mod boundary_hook {
211213
let unique_id = request.unique_id();
212214
if !self.suppress.contains_key(&unique_id) {
213215
if let Some(source) = self.sources.get(&request.source_id) {
214-
let command = StorageCommand::RenderSources(vec![(
215-
format!("SourceDataflow({:?}, {:?})", request.dataflow_id, request.source_id),
216-
request.dataflow_id,
217-
Some(request.as_of.clone()),
218-
Some((request.source_id, SourceInstanceDesc {
216+
let command = StorageCommand::RenderSources(vec![RenderSourcesCommand {
217+
debug_name: format!("SourceDataflow({:?}, {:?})", request.dataflow_id, request.source_id),
218+
dataflow_id: request.dataflow_id,
219+
as_of: Some(request.as_of.clone()),
220+
source_imports: once((request.source_id, SourceInstanceDesc {
219221
description: source.clone(),
220222
arguments: request.arguments,
221-
})).into_iter().collect(),
222-
)]);
223+
})).collect(),
224+
}]);
223225
self.client.send(command).await.unwrap()
224226
} else {
225227
self.pending.entry(request.source_id).or_insert(Vec::new()).push(request);

src/storage/src/storage_state.rs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
//! Worker-local state for storage timely instances.
77
88
use std::cell::RefCell;
9-
use std::collections::{BTreeMap, HashMap};
9+
use std::collections::HashMap;
1010
use std::rc::{Rc, Weak};
1111
use std::time::{Duration, Instant};
1212

@@ -22,11 +22,11 @@ use tokio::sync::mpsc;
2222
use tracing::{debug, trace};
2323

2424
use mz_dataflow_types::client::{
25-
CreateSourceCommand, StorageCommand, StorageResponse, TimestampBindingFeedback,
25+
CreateSourceCommand, RenderSourcesCommand, StorageCommand, StorageResponse,
26+
TimestampBindingFeedback,
2627
};
2728
use mz_dataflow_types::sources::AwsExternalId;
2829
use mz_dataflow_types::sources::{ExternalSourceConnector, SourceConnector};
29-
use mz_dataflow_types::SourceInstanceDesc;
3030
use mz_expr::{GlobalId, PartitionId};
3131
use mz_ore::now::NowFn;
3232
use mz_persist::client::RuntimeClient;
@@ -117,7 +117,7 @@ pub struct ActiveStorageState<'a, A: Allocate, B: StorageCapture> {
117117

118118
impl<'a, A: Allocate, B: StorageCapture> ActiveStorageState<'a, A, B> {
119119
/// Sets up the timestamp binding machinery if needed for this source
120-
fn setup_timestamp_binding_state(&mut self, source: &CreateSourceCommand<u64>) {
120+
fn setup_timestamp_binding_state(&mut self, source: &CreateSourceCommand<Timestamp>) {
121121
let ts_history = if let SourceConnector::External {
122122
connector,
123123
ts_frequency,
@@ -347,16 +347,14 @@ impl<'a, A: Allocate, B: StorageCapture> ActiveStorageState<'a, A, B> {
347347
}
348348
}
349349

350-
fn build_storage_dataflow(
351-
&mut self,
352-
dataflows: Vec<(
353-
String,
354-
uuid::Uuid,
355-
Option<Antichain<Timestamp>>,
356-
BTreeMap<GlobalId, SourceInstanceDesc>,
357-
)>,
358-
) {
359-
for (debug_name, dataflow_id, as_of, source_imports) in dataflows {
350+
fn build_storage_dataflow(&mut self, dataflows: Vec<RenderSourcesCommand<Timestamp>>) {
351+
for RenderSourcesCommand {
352+
debug_name,
353+
dataflow_id,
354+
as_of,
355+
source_imports,
356+
} in dataflows
357+
{
360358
crate::render::build_storage_dataflow(
361359
self.timely_worker,
362360
&mut self.storage_state,

0 commit comments

Comments
 (0)