
Conversation

@easwars
Contributor

@easwars easwars commented Oct 3, 2025

Fixes #8125

The original race in the xDS client:

  • Resource watch is cancelled by the user of the xdsClient (e.g. xdsResolver)
  • xdsClient removes the resource from its cache and queues an unsubscribe request to the ADS stream.
  • A watch for the same resource is registered immediately, and the xdsClient instructs the ADS stream to subscribe (as it's not in cache).
  • The ADS stream sends a redundant request (same resources, version, nonce) which the management server ignores.
  • The new resource watch sees a "resource-not-found" error once the watch timer fires.

The original fix:

Delay the resource's removal from the cache until the unsubscribe request had been transmitted over the wire; this change was implemented in #8369. However, this solution introduced new complications:

  • The resource's removal from the xdsClient's cache became an asynchronous operation, occurring while the unsubscribe request was being sent.
  • This asynchronous behavior meant the state maintained within the ADS stream could still diverge from the cache's state.
  • A critical section was absent between the ADS stream's message transmission logic and the xdsClient's cache access, which is performed during subscription/unsubscription by its users.

The root cause of the previously seen races comes down to two things:

  • Batching of writes for subscribe and unsubscribe calls
    • After batching, it may appear that nothing has changed in the list of subscribed resources, even though a resource was removed and added again, and therefore the management server would not send any response. It is important that the management server see the exact sequence of subscribe and unsubscribe calls.
  • State maintained in the ADS stream going out of sync with the state maintained in the resource cache

How does this PR address the above issue?

This PR simplifies the implementation of the ADS stream by making the following changes:

  • Stop batching of writes on the ADS stream
    • If the user registers multiple watches, e.g. resource A, B, and C, the stream would now send three requests: [A], [A B], [A B C].
  • Queue the exact request to be sent out based on the current state
    • As part of handling a subscribe/unsubscribe request, the ADS stream implementation will queue the exact request to be sent out. When asynchronously sending the request out, it will not use the current state, but instead just write the queued request on the wire.
  • Don't buffer writes when waiting for flow control
    • Flow control is already blocking reads from the stream. Blocking writes as well during this period might provide some additional flow control, but not much, and removing this logic simplifies the stream implementation quite a bit.

RELEASE NOTES:

  • xdsclient: fix a race in the xdsClient that could lead to resource-not-found errors

@easwars easwars added the Type: Bug and Area: xDS labels Oct 3, 2025
@easwars easwars added this to the 1.77 Release milestone Oct 3, 2025
@codecov

codecov bot commented Oct 3, 2025

Codecov Report

❌ Patch coverage is 85.50725% with 10 lines in your changes missing coverage. Please review.
✅ Project coverage is 83.32%. Comparing base (0d49384) to head (364c27f).
⚠️ Report is 7 commits behind head on master.

Files with missing lines Patch % Lines
internal/xds/clients/xdsclient/ads_stream.go 86.56% 5 Missing and 4 partials ⚠️
internal/xds/clients/internal/buffer/unbounded.go 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #8627      +/-   ##
==========================================
+ Coverage   83.22%   83.32%   +0.10%     
==========================================
  Files         417      415       -2     
  Lines       32180    32114      -66     
==========================================
- Hits        26781    26759      -22     
+ Misses       4025     3996      -29     
+ Partials     1374     1359      -15     
Files with missing lines Coverage Δ
internal/xds/clients/xdsclient/channel.go 81.25% <100.00%> (ø)
internal/xds/clients/internal/buffer/unbounded.go 91.89% <0.00%> (-8.11%) ⬇️
internal/xds/clients/xdsclient/ads_stream.go 84.89% <86.56%> (-1.94%) ⬇️

... and 33 files with indirect coverage changes


@easwars easwars requested review from arjan-bal and dfawley October 7, 2025 23:32
@easwars
Contributor Author

easwars commented Oct 7, 2025

@danielzhaotongliu

@arjan-bal
Contributor

Could you explain how the new logic prevents the following race condition that can cause a channel to briefly enter TRANSIENT_FAILURE?
I'm thinking of this specific scenario:

  1. Channel 0, which is watching LDS resource L0, is closed. The xDS client queues an unsubscribe request for L0 to be sent to the management server.
  2. Immediately after, a new channel (Channel 1) starts and its xDS resolver registers a watch for the exact same resource, L0. The client now queues a new subscription request for L0.
  3. Before the management server processes the new subscription for L0, it sends a DiscoveryResponse based on the previous state (where the client had unsubscribed). This response therefore excludes L0.
  4. The xDS client receives this response and notifies the active watcher for L0 (belonging to Channel 1) that the resource is missing. This causes Channel 1 to enter TRANSIENT_FAILURE until the next update from the server includes L0.

How does this PR ensure that the watcher for Channel 1 isn't incorrectly notified of a missing resource in this situation?

@arjan-bal arjan-bal assigned easwars and unassigned arjan-bal Oct 13, 2025
@easwars
Contributor Author

easwars commented Oct 13, 2025

The above condition that you mention will not lead to Channel 1 entering TRANSIENT_FAILURE. This case is taken care of explicitly here:

if state.cache == nil {

In fact, this is a case that can happen even without any of the race conditions involving the xDS client, and it is explicitly mentioned in the Envoy xDS documentation here: https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#knowing-when-a-requested-resource-does-not-exist

Hope this helps.

@easwars easwars assigned arjan-bal and unassigned easwars Oct 13, 2025
Contributor

@arjan-bal arjan-bal left a comment


LGTM

@arjan-bal arjan-bal removed their assignment Oct 16, 2025
@arjan-bal
Contributor

The above condition that you mention will not lead to Channel 1 entering TRANSIENT_FAILURE. This case is taken care of explicitly here:

if state.cache == nil {

In fact, this is a case that can happen even without any of the race conditions involving the xDS client, and it is explicitly mentioned in the Envoy xDS documentation here: https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#knowing-when-a-requested-resource-does-not-exist

Hope this helps.

I see. The difference between the scenario I asked about and the problem caused by the initial fix is the presence of the LDS resource in the cache. If L0 is present in the cache but not in the latest update from the management server, the channel is put in TF. However, if L0 is not present in the cache, we wait for the expiry timer to fire.

@arjan-bal
Contributor

Though the PR description doesn't explicitly call out how the change addresses the race condition, my understanding is that the race is caused because the writes for subscribe and unsubscribe calls are batched. After batching, it may appear that nothing has changed in the list of subscribed resources, even though a resource was removed and added again. By removing batching of writes, the management server will see the unsubscription and re-subscription of the resource and send the necessary update.

@easwars
Contributor Author

easwars commented Oct 16, 2025

If L0 is present in the cache but not in the latest update from the management server, the channel is put in TF. However, if L0 is not present in the cache, we wait for the expiry timer to expire.

That is true.

Also, the first part, where a resource that is present in the cache but missing from a response from the management server moves the channel to TF, applies only to LDS and CDS. For RDS and EDS, if a similar scenario happens, we would simply continue using the resource in the cache.

@easwars
Contributor Author

easwars commented Oct 16, 2025

Though the PR description doesn't explicitly call out how the change addresses the race condition, my understanding is that the race is caused because the writes for subscribe and unsubscribe calls are batched. After batching, it may appear that nothing has changed in the list of subscribed resources, even though a resource was removed and added again. By removing batching of writes, the management server will see the unsubscription and re-subscription of the resource and send the necessary update.

Your understanding is correct and I've clarified the exact underlying issue causing the race and how the change addresses them in the PR description. Thanks.

//
// It's expected to be used in scenarios where the buffered data is no longer
// relevant, and needs to be cleared.
func (b *Unbounded) Reset() {
Member


This is inherently racy with other calls to Put() (and Load(), of course). I hope we are using it correctly! :) And if we have an external lock that is used for all things that call Put and Reset, I wonder if a different data structure might be more efficient?

Contributor


This is inherently racy with other calls to Put() (and Load(), of course). I hope we are using it correctly!

I had the same question. My understanding is that Reset acts as an optimization to avoid redundant requests, but it isn't required for correctness.

Even so, you're right about the potential for races. I think adding a godoc comment to warn about this is a good idea.

Contributor Author


I've ended up using a list type from the stdlib for the implementation of the request queue, which will now be guarded by the adsStreamImpl's mutex.

While this eliminates the possibility of the race (which existed in the previous approach), the use of a condition variable to know when the request queue is empty (and to block until it becomes non-empty), combined with the fact that sync.Cond doesn't allow blocking in a select with other ways of getting unblocked, has made the code a bit uglier and more complicated in my opinion.

Another option would be to create a new type like SynchronizedUnboundedBuffer that takes a sync.Locker and guards access to all of its methods with the given lock. But even in that case, since Get returns a channel, it is not a viable option I believe.

Another reason I went with the stdlib list, instead of trying to synchronize access to the unbounded buffer (while also supporting a Reset method), was that the unbounded buffer is currently thread-safe, and adding a new method that moves the responsibility of synchronization to the caller didn't seem very appealing.

Would like to hear your thoughts on these.

Contributor

@arjan-bal arjan-bal Oct 31, 2025


I thought of a similar approach that uses a channel instead of a sync.Cond.

Add the following fields in adsStreamImpl:

  1. notifySender: make(chan struct{}, 1). This channel will be used by subscribe and unsubscribe to inform the send goroutine that there "may" be buffered requests in the queue.
  2. pendingRequests: []request{}. This will be protected by mu.

subscribe() and unsubscribe() will do the following:

  1. Acquire the mu lock.
  2. Append to the pendingRequests slice.
  3. Release the mu lock.
  4. Send to the notifySender channel (with a default case to avoid blocking).

The send goroutine will do the following:

  1. Run a select statement waiting on s.streamCh and s.notifySender.
    1. If the s.streamCh case is selected: Call sendExisting, which will acquire the mu lock and swap out pendingRequests with an empty slice.
    2. If the s.notifySender case is selected: Acquire the mu lock, swap out pendingRequests keeping the original slice, release the mu lock, and then call sendNew for each request in the original slice.

Notice that sending on the notifySender channel is done after appending to the pendingRequests slice. This ensures that the send goroutine doesn't miss any notifications. This is because if there's an object in the notifySender channel, it's guaranteed that at some point in the future, the send goroutine will read from the notifySender channel, acquire mu, and see the new elements in the pendingRequests slice.

Do you think this makes the implementation simpler, and still correct?

state, ok := s.resourceTypeState[typ]
if !ok {
// State is created when the first subscription for this type is made.
panic(fmt.Sprintf("no state exists for resource type %v", typ))
Member


Do we want a production panic? Or should this be log.Errorf() (which fails all tests) and a return <some error> (or nil since the stream is still usable, depending on what it means to return an error?)? The latter feels safer.

Contributor Author


Interestingly, we were not even checking if the value existed in the map before; we were blindly using it. So the potential for a real panic seems almost non-existent: the ADS stream implementation adds the entry to the map when it first sees a subscribe request for this resource type, and only then queues the request, so we are definitely going to have the entry in the map. I'll remove the panic so it doesn't cause confusion to the reader.

Contributor Author


And we never remove entries from this map. So, we are safe.


// Clear any queued requests. Previously subscribed to resources will be
// resent below.
s.requestCh.Reset()
Member


Do we need to mandate that s.mu is held when calling Put or Reset?

Maybe we should wrap accesses to it somehow to guarantee that?

Contributor Author


Replied on the other comment.

@dfawley dfawley assigned easwars and unassigned dfawley Oct 27, 2025
@dfawley
Member

dfawley commented Oct 27, 2025

(Whoops, forgot to reassign after reviewing.)

@easwars easwars force-pushed the xdsclient_unsubscribe_resubscribe_race branch from 7152711 to 3a11e3e Compare October 29, 2025 21:26
@arjan-bal arjan-bal removed this from the 1.77 Release milestone Oct 30, 2025
@arjan-bal arjan-bal added this to the 1.78 Release milestone Oct 30, 2025
@easwars easwars force-pushed the xdsclient_unsubscribe_resubscribe_race branch from 3a11e3e to 364c27f Compare October 30, 2025 18:44
Contributor Author

@easwars easwars left a comment


Apologies for the rebase and the force-push. I had to import the changes here to google3 to ensure that they don't introduce any flakiness, and since a whole bunch of changes have gone into the repo since I first sent this PR out, it was untenable to do that without rebasing.


@easwars easwars assigned dfawley and arjan-bal and unassigned easwars Oct 30, 2025
@arjan-bal arjan-bal assigned easwars and unassigned arjan-bal Oct 31, 2025

Labels

Area: xDS, Type: Bug

Projects

None yet

Development

Successfully merging this pull request may close these issues.

xdsclient: race around resource subscriptions and unsubscriptions

3 participants