Skip to content

Conversation

jerrypeng
Copy link
Contributor

@jerrypeng jerrypeng commented Oct 2, 2025

What changes were proposed in this pull request?

Add a memory source implementation that support Real-time Mode. This source is going to be used to test Real-time Mode. In this implementation of the memory source, a RPC server is set up on the driver and source tasks will constantly pull this RPC server for new data. This differs from the existing memory source implementation as data is sent once to tasks as part of the Partition/Split metadata at the beginning of a batch.

Why are the changes needed?

To test Real-time Mode queries.

Does this PR introduce any user-facing change?

No, this source is purely to help testing.

How was this patch tested?

Actual unit tests to test RTM will be added in the future once the engine supports actually running queries in RTM.

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot removed the CORE label Oct 13, 2025
@jerrypeng jerrypeng changed the title [WIP] [SPARK-53785][SS] Memory Source for RTM [SPARK-53785][SS] Memory Source for RTM Oct 14, 2025
import org.apache.spark.util.{Clock, SystemClock}

/* The singleton object to control the time in testing */
object LowLatencyClock {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Planning to add additional code to this file i.e. the actual implementation of RealTimeStreamScanExec. Adding LowLatencyClock here since it is used by LowLatencyMemorySource. Trying to keep the PRs small :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean you will add the code of RealTimeStreamScanExec in other PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the follow PR to provide more context:
#52620

Copy link
Member

@viirya viirya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
}


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary change.

* available.
*
* If numPartitions is provided, the rows will be redistributed to the given number of partitions.
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, this doc is same and copied from MemoryStream. Do we need to update the doc of MemoryStream so it can be more specified or reflect the difference between it and LowLatencyMemoryStream?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me remove this. The difference is described in the class LowLatencyMemoryStream

override def planInputPartitions(start: OffsetV2): Array[InputPartition] = {
val startOffset = start.asInstanceOf[LowLatencyMemoryStreamOffset]
synchronized {
val endpointName = s"ContinuousRecordEndpoint-${java.util.UUID.randomUUID()}-$id"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RealTimeRecordEndpoint? Or LowLatencyRecordEndpoint? As this is not for ContinuousStream, ContinuousRecordEndpoint looks a bit confused.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will rename but the reason is the ContinuousRecordEndpoint is borrowed from "ContinuousMemoryStream".

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, oh, you are reusing ContinuousRecordEndpoint. I see.

val startOffset = start.asInstanceOf[LowLatencyMemoryStreamOffset]
val endOffset = end.asInstanceOf[LowLatencyMemoryStreamOffset]
synchronized {
val endpointName = s"ContinuousRecordEndpoint-${java.util.UUID.randomUUID()}-$id"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

with SupportsRealTimeMode {
private implicit val formats: Formats = Serialization.formats(NoTypeHints)

// LowLatencyReader implementation
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this comment supposed to be here? Seems unrelated to the code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will remove

}

override def latestOffset(startOffset: OffsetV2, limit: ReadLimit): OffsetV2 = {
LowLatencyMemoryStreamOffset((0 until numPartitions).map(i => (i, records(i).size)).toMap)
Copy link
Member

@viirya viirya Oct 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we also need to add synchronized to latestOffset as it also access records? Although it just reads, but the size might be inconsistent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an interesting point. For RTM, offset returned from latestOffset is actually not used. The offset returned from latestOffset defines the end offset of a batch for non-rtm streaming queries. In RTM, the end offset of a batch is calculated when the batch finishes. However, this source also support non-RTM queries. Though in streaming we typically use StreamTest framework that executes test actions and batches in synchronized steps so any race should not happen. Though for best practices I will add synchronized to the method.


override def reset(): Unit = {
super.reset()
records.foreach(_.clear())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need synchronized too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will add

@jerrypeng
Copy link
Contributor Author

@viirya thank you for the review. I have addressed your comments. PTAL!

@jerrypeng jerrypeng requested a review from viirya October 15, 2025 21:43
@viirya viirya closed this in 8499a62 Oct 17, 2025
@viirya
Copy link
Member

viirya commented Oct 17, 2025

Merged to master. Thanks @jerrypeng

@jerrypeng
Copy link
Contributor Author

@viirya thank you for the review and merging!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants