feat: refreshable source, SourceExec part #22620
Conversation
This stack of pull requests is managed by Graphite.
Bug: StreamChunk Filtering Blocks Batch Source Completion

The SourceExecutor incorrectly filters out empty chunks (cardinality = 0). This prevents the `StreamChunk::default()` signal, which batch sources use to indicate completion, from propagating downstream, thereby preventing proper reporting of load completion for batch sources.

`src/stream/src/executor/source/source_executor.rs`, lines 810 to 814 in 3707c1b:

```rust
prune_additional_cols(&chunk, split_idx, offset_idx, &source_desc.columns);
if card > 0 {
    yield Message::Chunk(chunk);
}
self.try_flush_data().await?;
```
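One way this could be addressed, sketched as a hypothetical suggestion: let empty chunks through for batch sources so the completion signal reaches downstream. The `is_batch_source` flag here is assumed from this PR's batch-source detection in `SourceExecutor`; this is not necessarily the actual fix.

```rust
// Hypothetical suggestion, not the PR's actual change: forward empty chunks
// for batch sources so the empty-chunk completion signal is not swallowed
// by the cardinality filter.
prune_additional_cols(&chunk, split_idx, offset_idx, &source_desc.columns);
if card > 0 || self.is_batch_source {
    yield Message::Chunk(chunk);
}
self.try_flush_data().await?;
```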
Pull Request Overview
This PR implements the SourceExec part of the refreshable source feature, which allows batch sources to be refreshed with new data. The implementation focuses on adding mechanisms to track and report when batch sources have finished loading data.
- Added `BatchSourceSplit` trait and an implementation for the batch posix fs connector as a testing framework (a hedged sketch of the trait follows this list)
- Introduced a load-finish reporting mechanism through the barrier system to notify when batch sources complete data loading
- Added refresh functionality to trigger re-enumeration of splits when refresh commands are received
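A hedged sketch of what such a trait could look like, inferred only from the behaviors this PR describes; the method names are illustrative assumptions, not the exact RisingWave API:

```rust
/// Illustrative sketch of a batch-source split abstraction; the real
/// `BatchSourceSplit` trait in this PR may differ in names and signatures.
pub trait BatchSourceSplit: Send + Sync {
    /// Whether this split has finished loading all of its data.
    fn finished(&self) -> bool;

    /// Mark the split as finished once the reader stream completes without error.
    fn finish(&mut self);

    /// Reset the split so a refresh command re-reads the data from scratch.
    fn refresh(&mut self);
}
```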
Reviewed Changes
Copilot reviewed 32 out of 32 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| src/stream/src/task/barrier_worker/mod.rs | Added `load_finished_source_ids` field to `BarrierCompleteResult` for tracking completed batch sources |
| src/stream/src/task/barrier_worker/managed_state.rs | Enhanced barrier state management to handle source load-finished events and track them per epoch |
| src/stream/src/task/barrier_manager/mod.rs | Added `ReportSourceLoadFinished` event and methods for reporting load completion |
| src/stream/src/executor/source/source_executor.rs | Implemented batch source detection, refresh logic, and load-finish reporting in `SourceExecutor` |
| src/stream/src/executor/source/reader_stream.rs | Added logic to mark batch splits as finished when stream consumption completes |
| src/stream/src/executor/source/executor_core.rs | Added batch source detection and batch split retrieval methods to `StreamSourceCore` |
| src/meta/src/barrier/worker.rs | Updated `GlobalBarrierWorker` to include the barrier scheduler for handling load-finish commands |
| src/meta/src/barrier/context/context_impl.rs | Implemented handling of load-finished source IDs by scheduling `LoadFinish` commands |
| src/meta/src/barrier/complete_task.rs | Added `load_finished_source_ids` field to `CompleteBarrierTask` for async handling |
| src/connector/src/source/filesystem/opendal_source/batch_posix_fs_source.rs | New batch posix fs connector implementation for testing refreshable sources |
| src/connector/src/source/batch.rs | Added `BatchSourceSplit` trait and `BatchSourceSplitImpl` enum for batch source abstractions |
```rust
// TODO: enable this when implementation done
// self.get(UPSTREAM_SOURCE_KEY)
//     .map(|s| s.eq_ignore_ascii_case(BATCH_POSIX_FS_CONNECTOR))
//     .unwrap_or(false)
```
Is it done?
We enable this after the whole feature is done, i.e., in #22695 (which has an e2e test).
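For reference, the enabled check would presumably be just the uncommented body above (an assumption; the actual enabling happens in the follow-up PR):

```rust
// Assumed enabled form of the commented check above; turned on in the
// follow-up PR once the whole feature is complete.
self.get(UPSTREAM_SOURCE_KEY)
    .map(|s| s.eq_ignore_ascii_case(BATCH_POSIX_FS_CONNECTOR))
    .unwrap_or(false)
```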
```rust
// handle_load_finished_source_ids is called when collecting barrier,
// so we need to spawn a new task to yield a LoadFinish barrier to avoid deadlock.
tokio::spawn(async move {
```
@wenym1 Is it ok to inject a command within `collect_barrier` like this (using a spawned task)?
Since we spawn a task here without awaiting on it, I guess we only want to schedule a command and don't need to wait for it to be executed? If so, it would be more straightforward to just enqueue the command to `BarrierScheduler` without waiting on the notifier. Such an operation does not even require an async context, because the queue is protected by a sync lock.

We can add a new method to `BarrierScheduler` with a name like `run_command_no_wait`, which calls `self.push` with the command and an empty notifier, and then returns immediately. A minimal sketch of the idea follows.
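This sketch is self-contained and heavily simplified: `Command`, `Notifier`, and the queue layout are stand-ins for the real RisingWave types; only the idea (push without waiting) is the point.

```rust
use std::sync::Mutex;

pub struct Command;
#[derive(Default)]
pub struct Notifier;

pub struct BarrierScheduler {
    // The real queue is likewise protected by a synchronous lock, which is
    // why no async context is needed to enqueue.
    queue: Mutex<Vec<(Command, Notifier)>>,
}

impl BarrierScheduler {
    // Analogous to the existing `self.push` mentioned above.
    fn push(&self, command: Command, notifier: Notifier) {
        self.queue.lock().unwrap().push((command, notifier));
    }

    /// Enqueue a command and return immediately, without waiting for the
    /// command to be executed.
    pub fn run_command_no_wait(&self, command: Command) {
        self.push(command, Notifier::default());
    }
}
```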
added
```rust
) -> Self {
    let is_batch_source = source_desc_builder
        .with_properties()
        .is_refreshable_connector();
```
Can we try unifying the terminology (batch vs refreshable)?
which one do you prefer?
Or just "refreshable batch source"?
Or "batch source" + "refreshable table"?
```rust
            break 'consume;
        }
    }
}
if !is_error {
```
Are there any other cases where the connector read stream can be finite (that is, finishes without error)?
Actually, for normal connectors, e.g., `datagen`, they are finite. Before this change, it would be retried indefinitely.
Looks like this PR extends new SQL syntax or updates existing ones.
```rust
pub fn into_batch_split(self) -> Option<BatchSourceSplitImpl> {
    #[expect(clippy::match_single_binding)]
    match self {
        // SplitImpl::BatchPosixFs(batch_posix_fs_split) => {
        //     Some(BatchSourceSplitImpl::BatchPosixFs(batch_posix_fs_split))
        // }
        _ => None,
    }
}
```
This is commented out here because we separated the batch posix file source into another PR, right?
yes
```rust
StreamChunk::empty(
    self.source_desc
        .columns
        .iter()
        .map(|c| c.data_type.clone())
        .collect_vec()
        .as_slice(),
),
latest_splits_info.clone(),
);
break 'build_consume_loop;
```
Why do we need to send an empty chunk?
The item type of the stream is `StreamChunkWithState`, i.e., `(StreamChunk, HashMap<SplitId, SplitImpl>)`. What I want to send is the `latest_splits_info` here (then in the main loop of `SourceExecutor`, it can check `batch_split.finished()`). The least invasive change I can come up with is to pair it with an empty chunk here.. 🤪 (A stand-in sketch of that pairing follows.)
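For readers outside the codebase, a minimal stand-in for that pairing; the placeholder structs below are not the real RisingWave definitions:

```rust
use std::collections::HashMap;

// Placeholder types; the real `StreamChunk` and `SplitImpl` live in the
// connector crate and are far richer.
pub struct StreamChunk;
pub struct SplitImpl;
pub type SplitId = String;

// Each item of the reader stream pairs a chunk with the latest split states,
// so split progress (e.g., a batch split's finished flag) can piggyback on
// an empty chunk when there is no data left to emit.
pub type StreamChunkWithState = (StreamChunk, HashMap<SplitId, SplitImpl>);
```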
LGTM
@BugenZhao would you like to take another look?
Let me confirm the behavior:

- Upon receiving the `RefreshBatchSplits` mutation, the source converts the current split assignment to a batch split, then rebuilds the reader to start from the state in the batch split (but this lacks a way to define a finite stream).
- During the re-snapshot process, the intermediate state is also written to the state table, so it can resume if recovery happens.
- The executor reports to the local barrier manager when the re-snapshot is done. How do we switch back to normal splits?
```rust
debug_assert_eq!(
    self.latest_split_info.len(),
    1,
    "batch source should have only one split"
);
```
Why does this statement hold? I don't see where we define a refreshable connector, and I don't see that each connector parallelism is assigned exactly one split.
```rust
        .collect_vec()
        .as_slice(),
),
latest_splits_info.clone(),
```
Will this state be written to the state table?
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Second part of #22690. See the tracking issue for the whole picture of the user journey.
Source part.
Key changes include:

- `BatchSourceSplit` interface in connector, and use of it in SourceExec.
- `batch_posix_fs` connector for testing purposes.
- `LoadFinish` barrier: similar to MV creation progress reporting, SourceExec reports load finish to the barrier worker, which is reported to meta as part of `BarrierCompleteResponse`. When handling this, meta triggers a `LoadFinish` barrier. (A hedged sketch of this reporting path follows the list.)
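A hedged sketch of the reporting path, using names from the file summary above (`ReportSourceLoadFinished`, `load_finished_source_ids`, `LoadFinish`); the shapes and fields are illustrative assumptions, not the exact code:

```rust
// Illustrative event/flow types; the real definitions live in
// src/stream/src/task/barrier_manager and src/meta/src/barrier.
pub type SourceId = u32;

// SourceExec side: the executor reports completion to the local barrier worker.
pub enum LocalBarrierEvent {
    ReportSourceLoadFinished { source_id: SourceId },
    // ... other events elided
}

// Worker side: finished source ids are collected per epoch and attached to
// the result reported to meta in BarrierCompleteResponse.
pub struct BarrierCompleteResult {
    pub load_finished_source_ids: Vec<SourceId>,
    // ... other fields elided
}

// Meta side: when handling the response, meta schedules a LoadFinish command,
// which injects a LoadFinish barrier into the streaming graph.
pub enum Command {
    LoadFinish { source_id: SourceId },
    // ... other commands elided
}
```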
barrier.Checklist
Documentation