Skip to content

feat: refreshable source, SourceExec part #22620

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

xxchan
Copy link
Member

@xxchan xxchan commented Jul 16, 2025

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Second part of #22690. See the tracking issue for the whole picture of the user journey.

Source part.

Key changes include:

  • Added a new BatchSourceSplit interface in connector, and use it in SourceExec.
  • Added a new batch_posix_fs connector for testing purporse
  • Added a mechanism to trigger LoadFinish barrier: Similar to MV creation progress reporting, SourceExec will report load finish to barrier worker, and reported to meta as part of BarrierCompleteResponse. When handling this, meta will trigger a LoadFinish barrier.

Checklist

  • I have written necessary rustdoc comments.
  • I have added necessary unit tests and integration tests.
  • I have added test labels as necessary.
  • I have added fuzzing tests or opened an issue to track them.
  • My PR contains breaking changes.
  • My PR changes performance-critical code, so I will run (micro) benchmarks and present the results.
  • I have checked the Release Timeline and Currently Supported Versions to determine which release branches I need to cherry-pick this PR into.

Documentation

  • My PR needs documentation updates.

Copy link
Member Author

xxchan commented Jul 16, 2025

@github-actions github-actions bot added the type/feature Type: New feature. label Jul 16, 2025
@xxchan xxchan force-pushed the 07-14-feat_refreshable_source_sourceexec_part branch from e00eb0f to b3d8058 Compare July 22, 2025 05:37
@xxchan xxchan force-pushed the xxchan/protective-jackal branch from 75e6f3a to 09dedc4 Compare July 22, 2025 05:37
@xxchan xxchan force-pushed the 07-14-feat_refreshable_source_sourceexec_part branch from b3d8058 to 0a9032a Compare July 22, 2025 06:27
@xxchan xxchan force-pushed the xxchan/protective-jackal branch from 09dedc4 to 5b518e5 Compare July 22, 2025 06:42
@xxchan xxchan force-pushed the 07-14-feat_refreshable_source_sourceexec_part branch 3 times, most recently from 6743e88 to e0f1777 Compare July 22, 2025 08:35
@xxchan xxchan force-pushed the xxchan/protective-jackal branch from 5b518e5 to a409585 Compare July 22, 2025 08:35
@xxchan xxchan force-pushed the xxchan/protective-jackal branch from a409585 to 6f04b2b Compare July 22, 2025 10:07
@xxchan xxchan force-pushed the 07-14-feat_refreshable_source_sourceexec_part branch from e0f1777 to de09f3b Compare July 22, 2025 10:07
@xxchan xxchan marked this pull request as ready for review July 23, 2025 02:17
@xxchan xxchan force-pushed the xxchan/protective-jackal branch from 6f04b2b to ca59314 Compare July 23, 2025 02:19
@xxchan xxchan force-pushed the 07-14-feat_refreshable_source_sourceexec_part branch from de09f3b to 3707c1b Compare July 23, 2025 02:19
Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: StreamChunk Filtering Blocks Batch Source Completion

The SourceExecutor incorrectly filters out empty chunks (cardinality = 0). This prevents the StreamChunk::default() signal, which batch sources use to indicate completion, from propagating downstream, thereby preventing proper reporting of load completion for batch sources.

src/stream/src/executor/source/source_executor.rs#L810-L814

prune_additional_cols(&chunk, split_idx, offset_idx, &source_desc.columns);
if card > 0 {
yield Message::Chunk(chunk);
}
self.try_flush_data().await?;

Fix in CursorFix in Web


Bugbot free trial expires on July 29, 2025
Learn more in the Cursor dashboard.

Was this report helpful? Give feedback by reacting with 👍 or 👎

@xxchan xxchan force-pushed the xxchan/protective-jackal branch from ca59314 to e06c1e8 Compare July 23, 2025 02:40
@xxchan xxchan force-pushed the 07-14-feat_refreshable_source_sourceexec_part branch from 3707c1b to 293dd5e Compare July 23, 2025 02:40
@xxchan xxchan force-pushed the xxchan/protective-jackal branch from e06c1e8 to 553fc73 Compare July 23, 2025 02:56
@xxchan xxchan force-pushed the 07-14-feat_refreshable_source_sourceexec_part branch from 293dd5e to 9969797 Compare July 23, 2025 02:56
@xxchan xxchan force-pushed the xxchan/protective-jackal branch from 553fc73 to 23ca770 Compare July 23, 2025 05:19
@xxchan xxchan force-pushed the 07-14-feat_refreshable_source_sourceexec_part branch from 9969797 to 462614d Compare July 23, 2025 05:19
@xxchan xxchan requested review from Copilot and tabVersion July 25, 2025 07:33
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements the SourceExec part of the refreshable source feature, which allows batch sources to be refreshed with new data. The implementation focuses on adding mechanisms to track and report when batch sources have finished loading data.

  • Added BatchSourceSplit trait and implementation for batch posix fs connector as a testing framework
  • Introduced load finish reporting mechanism through barrier system to notify when batch sources complete data loading
  • Added refresh functionality to trigger re-enumeration of splits when refresh commands are received

Reviewed Changes

Copilot reviewed 32 out of 32 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
src/stream/src/task/barrier_worker/mod.rs Added load_finished_source_ids field to BarrierCompleteResult for tracking completed batch sources
src/stream/src/task/barrier_worker/managed_state.rs Enhanced barrier state management to handle source load finished events and track them per epoch
src/stream/src/task/barrier_manager/mod.rs Added ReportSourceLoadFinished event and methods for reporting load completion
src/stream/src/executor/source/source_executor.rs Implemented batch source detection, refresh logic, and load finish reporting in SourceExecutor
src/stream/src/executor/source/reader_stream.rs Added logic to mark batch splits as finished when stream consumption completes
src/stream/src/executor/source/executor_core.rs Added batch source detection and batch split retrieval methods to StreamSourceCore
src/meta/src/barrier/worker.rs Updated GlobalBarrierWorker to include barrier scheduler for handling load finish commands
src/meta/src/barrier/context/context_impl.rs Implemented handling of load finished source IDs by scheduling LoadFinish commands
src/meta/src/barrier/complete_task.rs Added load_finished_source_ids field to CompleteBarrierTask for async handling
src/connector/src/source/filesystem/opendal_source/batch_posix_fs_source.rs New batch posix fs connector implementation for testing refreshable sources
src/connector/src/source/batch.rs Added BatchSourceSplit trait and BatchSourceSplitImpl enum for batch source abstractions

@xxchan xxchan force-pushed the 07-14-feat_refreshable_source_sourceexec_part branch from 894d2ee to bfd6233 Compare July 25, 2025 08:15
Comment on lines +207 to +210
// TODO: enable this when implementation done
// self.get(UPSTREAM_SOURCE_KEY)
// .map(|s| s.eq_ignore_ascii_case(BATCH_POSIX_FS_CONNECTOR))
// .unwrap_or(false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it done?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We enable this after the whole feature is done, i.e., in #22695 (which has e2e test)


// handle_load_finished_source_ids is called when collecting barrier,
// so we need to spawn a new task to yield a LoadFinish barrier to avoid deadlock.
tokio::spawn(async move {
Copy link
Contributor

@chenzl25 chenzl25 Jul 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wenym1 Is it ok to inject a command within collect_barrier like this(using a spawn task)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we spawn a task here without awaiting on it, I guess here we only want to schedule a command and don't need to wait for the command to be executed? If so, it will be more straightforward to just enqueue the command to BarrierScheduler without waiting on the notifier. Such operation does not even require async context, because the queue is protected by a sync lock.

We can add a new method in BarrierScheduler with name like run_command_no_wait, which calls self.push with the command and an empty notifier, and then return immediately.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

@xxchan xxchan force-pushed the 07-14-feat_refreshable_source_sourceexec_part branch from bfd6233 to 47f55da Compare July 28, 2025 07:11
Copy link
Member

@BugenZhao BugenZhao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

6

Comment on lines 65 to +69
) -> Self {
let is_batch_source = source_desc_builder
.with_properties()
.is_refreshable_connector();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we try unifying the terminology (batch vs refreshable)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which one do you prefer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or just "refreshable batch source"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or "batch source" + "refreshable table"?

break 'consume;
}
}
}
if !is_error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any other cases where the connector read stream can be finite (that is, finishes without error)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, for normal connector e.g., datagen, they are finite. Before this change, it will be retried indefinitely.

Base automatically changed from xxchan/protective-jackal to main July 28, 2025 08:27
Copy link

graphite-app bot commented Jul 28, 2025

Looks like this PR extends new SQL syntax or updates existing ones. Make sure that:

  • Test cases about the new/updated syntax are added in src/sqlparser/tests/testdata. Especially, double check the formatted_sql is still a valid SQL #20713
  • The meaning of each enum variant is documented in PR description. Additionally, document what it means when each optional clause is omitted.

@xxchan xxchan force-pushed the 07-14-feat_refreshable_source_sourceexec_part branch from 47f55da to f1e5606 Compare July 28, 2025 08:41
@xxchan xxchan mentioned this pull request Jul 28, 2025
8 tasks
@xxchan xxchan force-pushed the 07-14-feat_refreshable_source_sourceexec_part branch from f1e5606 to 82f268b Compare July 28, 2025 09:01
@xxchan xxchan requested a review from BugenZhao July 28, 2025 09:01
@xxchan xxchan force-pushed the 07-14-feat_refreshable_source_sourceexec_part branch 5 times, most recently from 92a451a to be94be6 Compare July 29, 2025 15:39
Comment on lines +709 to +717
pub fn into_batch_split(self) -> Option<BatchSourceSplitImpl> {
#[expect(clippy::match_single_binding)]
match self {
// SplitImpl::BatchPosixFs(batch_posix_fs_split) => {
// Some(BatchSourceSplitImpl::BatchPosixFs(batch_posix_fs_split))
// }
_ => None,
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment here because we separate the batch posix file source PR, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Comment on lines +232 to +242
StreamChunk::empty(
self.source_desc
.columns
.iter()
.map(|c| c.data_type.clone())
.collect_vec()
.as_slice(),
),
latest_splits_info.clone(),
);
break 'build_consume_loop;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to send an empty chunk?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The signature of the stream is StreamChunkWithState, i.e., (StreamChunk, HashMap<SplitId, SplitImpl>).

What I want to send is the latest_splits_info here (Then in the main loop of SourceExecutor, it can check batch_split.finished()). The least invasive change I can come up with is to pair an empty chunk here.. 🤪

Copy link
Contributor

@chenzl25 chenzl25 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member Author

@xxchan xxchan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BugenZhao would you like to take another look?

xxchan added 2 commits July 30, 2025 15:27
Signed-off-by: xxchan <xxchan22f@gmail.com>
@xxchan xxchan force-pushed the 07-14-feat_refreshable_source_sourceexec_part branch from be94be6 to 0266dcb Compare July 30, 2025 07:32
Copy link
Contributor

@tabVersion tabVersion left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me confirm the behavior:

  1. upon source receives the RefreshBatchSplits mutation, it converts the current split assignment to batch_split, then rebuild the reader to start with state in batch_split. (but lacks how to define a finite stream)
  2. In the re-snapshot process, the intermediate state is also written to state table, and can resume if recover happens
  3. report to local barrier manager if re-snapshot is done. How we switch back to normal splits?

Comment on lines +92 to +96
debug_assert_eq!(
self.latest_split_info.len(),
1,
"batch source should have only one split"
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the statement holds? I dont see how we define refreshable-connector and I don't see each connector parallelism is assigned to exactly one split.

.collect_vec()
.as_slice(),
),
latest_splits_info.clone(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this state written to state table?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants