[SPARK-53785][SS] Memory Source for RTM #52502
Conversation
import org.apache.spark.util.{Clock, SystemClock}

/* The singleton object to control the time in testing */
object LowLatencyClock {
Planning to add additional code to this file, i.e. the actual implementation of RealTimeStreamScanExec. Adding LowLatencyClock here since it is used by LowLatencyMemorySource. Trying to keep the PRs small :)
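Since the diff only shows the opening of the object, here is a minimal, hypothetical sketch of what a test-controllable clock singleton of this shape could look like; the embedded Clock trait and the names setClock/advance are assumptions for illustration, not the PR's actual API:

// Hypothetical sketch (names and API are assumptions, not the PR's code):
// a process-wide clock holder that defaults to real system time but can be
// swapped for a manually advanced clock in tests, so time-dependent
// low-latency paths can be driven deterministically.
object LowLatencyClockSketch {
  trait Clock { def getTimeMillis(): Long }

  private object SystemClock extends Clock {
    override def getTimeMillis(): Long = System.currentTimeMillis()
  }

  // A clock whose time only moves when a test advances it explicitly.
  final class ManualClock(private var timeMs: Long = 0L) extends Clock {
    override def getTimeMillis(): Long = synchronized { timeMs }
    def advance(deltaMs: Long): Unit = synchronized { timeMs += deltaMs }
  }

  @volatile private var clock: Clock = SystemClock

  def getTimeMillis(): Long = clock.getTimeMillis()

  // Test-only hook: install a controllable clock.
  def setClock(newClock: Clock): Unit = { clock = newClock }
}

In a test, one would install the manual clock before starting the query and advance it explicitly to exercise time-based behavior deterministically.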
Do you mean you will add the code of RealTimeStreamScanExec in another PR?
This is the follow-up PR, to provide more context: #52620
cc @HeartSaVioR
  }
}

unnecessary change.
 * available.
 *
 * If numPartitions is provided, the rows will be redistributed to the given number of partitions.
 */
Hm, this doc is the same as and copied from MemoryStream. Do we need to update the doc so it is more specific, or so it reflects the difference between MemoryStream and LowLatencyMemoryStream?
Let me remove this. The difference is described in the LowLatencyMemoryStream class doc.
override def planInputPartitions(start: OffsetV2): Array[InputPartition] = {
  val startOffset = start.asInstanceOf[LowLatencyMemoryStreamOffset]
  synchronized {
    val endpointName = s"ContinuousRecordEndpoint-${java.util.UUID.randomUUID()}-$id"
RealTimeRecordEndpoint? Or LowLatencyRecordEndpoint? As this is not for ContinuousStream, ContinuousRecordEndpoint looks a bit confusing.
I will rename it, but the reason is that ContinuousRecordEndpoint is borrowed from ContinuousMemoryStream.
Hmm, oh, you are reusing ContinuousRecordEndpoint. I see.
val startOffset = start.asInstanceOf[LowLatencyMemoryStreamOffset]
val endOffset = end.asInstanceOf[LowLatencyMemoryStreamOffset]
synchronized {
  val endpointName = s"ContinuousRecordEndpoint-${java.util.UUID.randomUUID()}-$id"
ditto
    with SupportsRealTimeMode {
  private implicit val formats: Formats = Serialization.formats(NoTypeHints)

  // LowLatencyReader implementation
Is this comment supposed to be here? Seems unrelated to the code.
will remove
}

override def latestOffset(startOffset: OffsetV2, limit: ReadLimit): OffsetV2 = {
  LowLatencyMemoryStreamOffset((0 until numPartitions).map(i => (i, records(i).size)).toMap)
Do we also need to add synchronized to latestOffset, as it also accesses records? Although it only reads, the size might be inconsistent.
This is an interesting point. For RTM, the offset returned from latestOffset is actually not used: latestOffset defines the end offset of a batch only for non-RTM streaming queries, while in RTM the end offset of a batch is calculated when the batch finishes. However, this source also supports non-RTM queries. In streaming we typically use the StreamTest framework, which executes test actions and batches in synchronized steps, so a race should not happen. Still, as a best practice I will add synchronized to the method.
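To illustrate the concern with a simplified, hypothetical model (the class and method names below are illustrative, not the PR's actual code): appends and the size snapshot take the same lock, so an offset never observes partially updated per-partition buffers.

import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified model of the discussion above: one growable
// buffer per partition; addData appends under the object lock, and the
// size snapshot used to build an offset takes the same lock.
class PartitionedRecordsSketch(numPartitions: Int) {
  private val records = Array.fill(numPartitions)(new ArrayBuffer[String]())

  def addData(partition: Int, row: String): Unit = synchronized {
    records(partition) += row
  }

  // Analogous to latestOffset: snapshot per-partition sizes under the lock.
  def sizeSnapshot(): Map[Int, Int] = synchronized {
    (0 until numPartitions).map(i => (i, records(i).size)).toMap
  }
}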

override def reset(): Unit = {
  super.reset()
  records.foreach(_.clear())
Need synchronized too?
will add
@viirya thank you for the review. I have addressed your comments. PTAL!
Merged to master. Thanks @jerrypeng
@viirya thank you for the review and merging!
What changes were proposed in this pull request?
Add a memory source implementation that supports Real-time Mode (RTM). This source will be used to test Real-time Mode. In this implementation of the memory source, an RPC server is set up on the driver and source tasks constantly poll it for new data. This differs from the existing memory source implementation, in which data is sent to tasks once, as part of the Partition/Split metadata at the beginning of a batch.
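A simplified, hypothetical sketch of the polling pattern described above (plain method calls stand in for the RPC layer, and all names are illustrative, not the PR's actual classes):

// Hypothetical illustration, not the PR's implementation: a driver-side
// "endpoint" holds the per-partition data, and each task repeatedly asks it
// for the next record past its current index instead of receiving all data
// once in the input partition metadata.
object PollingMemorySourceSketch {
  import scala.collection.mutable.ArrayBuffer

  // Stand-in for the driver-side RPC endpoint backed by in-memory buffers.
  class DriverEndpoint(data: Array[ArrayBuffer[String]]) {
    def getRecord(partition: Int, index: Int): Option[String] = synchronized {
      if (index < data(partition).size) Some(data(partition)(index)) else None
    }
  }

  // Stand-in for the task-side reader: poll until the batch deadline passes.
  def readPartition(endpoint: DriverEndpoint, partition: Int, deadlineMs: Long): Seq[String] = {
    val out = ArrayBuffer.empty[String]
    var index = 0
    while (System.currentTimeMillis() < deadlineMs) {
      endpoint.getRecord(partition, index) match {
        case Some(row) => out += row; index += 1
        case None => Thread.sleep(10) // no new data yet; poll again shortly
      }
    }
    out.toSeq
  }
}

In contrast, the existing MemoryStream packs the rows for a batch into the input partitions up front, so tasks never go back to the driver for data during the batch.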
Why are the changes needed?
To test Real-time Mode queries.
Does this PR introduce any user-facing change?
No, this source is purely to help testing.
How was this patch tested?
Actual unit tests exercising RTM will be added in the future, once the engine supports actually running queries in RTM.
Was this patch authored or co-authored using generative AI tooling?
No