xdsclient: stop batching writes on the ADS stream #8627
Conversation
Codecov Report

Additional details and impacted files:

@@            Coverage Diff             @@
##           master    #8627      +/-   ##
==========================================
+ Coverage   83.22%   83.32%   +0.10%
==========================================
  Files         417      415       -2
  Lines       32180    32114      -66
==========================================
- Hits        26781    26759      -22
+ Misses       4025     3996      -29
+ Partials     1374     1359      -15
Could you explain how the new logic prevents the following race condition that can cause a channel to briefly enter TRANSIENT_FAILURE? How does this PR ensure that the watcher for Channel 1 isn't incorrectly notified of a missing resource in this situation?

The condition that you mention will not lead to Channel 1 entering TRANSIENT_FAILURE. In fact, this is a case that can happen even without any of the race conditions involving the xDS client, and it is also explicitly mentioned in the envoy xDS documentation here: https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#knowing-when-a-requested-resource-does-not-exist Hope this helps.
LGTM
I see. The difference between the scenario I asked about and the problem caused by the initial fix is the presence of the LDS resource in the cache. If L0 is present in the cache but not in the latest update from the management server, the channel is put in TF. However, if L0 is not present in the cache, we wait for the expiry timer to expire.
Though the PR description doesn't explicitly call out how the change addresses the race condition, my understanding is that the race is caused because the writes for subscribe and unsubscribe calls are batched. After batching, it may appear that nothing has changed in the list of subscribed resources, even though a resource was removed and added again. By removing batching of writes, the management server will see the unsubscription and re-subscription of the resource and send the necessary update.
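To make this concrete, here is a toy sketch (a hypothetical helper, not grpc-go code) of how coalescing a burst of subscribe/unsubscribe operations into a resource set can hide an unsubscribe that is immediately followed by a re-subscribe:

```go
package main

import (
	"fmt"
	"sort"
)

// batchedNames models the old batching behavior: a burst of subscribe ("+")
// and unsubscribe ("-") operations is coalesced into a single set of
// currently subscribed names before anything is written to the stream.
func batchedNames(ops []string, initial []string) []string {
	subscribed := map[string]bool{}
	for _, n := range initial {
		subscribed[n] = true
	}
	for _, op := range ops {
		name := op[1:]
		if op[0] == '+' {
			subscribed[name] = true
		} else {
			delete(subscribed, name)
		}
	}
	out := make([]string, 0, len(subscribed))
	for n := range subscribed {
		out = append(out, n)
	}
	sort.Strings(out)
	return out
}

func main() {
	// Unsubscribe from A and immediately re-subscribe. After batching, the
	// resulting set is identical to the initial one, so the management
	// server never observes the churn and has no reason to resend A.
	fmt.Println(batchedNames([]string{"-A", "+A"}, []string{"A"}))
}
```

Without batching, that same sequence is written as separate requests, so the server sees the unsubscription and the re-subscription explicitly.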
That is true. And the first part about the resource being in the cache, but not in a response from the management server, resulting in the channel moving to TF applies only to LDS and CDS. For RDS and EDS, if a similar scenario happens, we would simply continue using the resource in the cache.
Your understanding is correct. I've clarified the exact underlying issue causing the race, and how the change addresses it, in the PR description. Thanks.
//
// It's expected to be used in scenarios where the buffered data is no longer
// relevant, and needs to be cleared.
func (b *Unbounded) Reset() {
This is inherently racy with other calls to Put() (and Load(), of course).  I hope we are using it correctly! :)  And if we have an external lock that is used for all things that call Put and Reset, I wonder if a different data structure might be more efficient?
This is inherently racy with other calls to Put() (and Load(), of course). I hope we are using it correctly!
I had the same question. My understanding is that Reset acts as an optimization to avoid redundant requests, but it isn't required for correctness.
Even so, you're right about the potential for races. I think adding a godoc comment to warn about this is a good idea.
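To illustrate the race being discussed, here is a toy buffer (a hypothetical type, not grpc-go's actual internal/buffer.Unbounded) showing, as a sequential interleaving, how an unsynchronized Reset can silently drop a freshly Put item:

```go
package main

import "fmt"

// miniBuffer is a toy stand-in for an unbounded buffer with a Reset method.
type miniBuffer struct{ backlog []string }

func (b *miniBuffer) Put(v string) { b.backlog = append(b.backlog, v) }
func (b *miniBuffer) Reset()       { b.backlog = nil }
func (b *miniBuffer) Len() int     { return len(b.backlog) }

func main() {
	b := &miniBuffer{}
	// Interleaving that makes Reset racy without an external lock:
	// one goroutine Puts a fresh subscribe request...
	b.Put("subscribe A")
	// ...while another, restarting the stream, calls Reset intending to
	// drop only stale requests. The fresh request is silently lost too.
	b.Reset()
	fmt.Println(b.Len()) // the "subscribe A" request is gone
}
```

As noted above, this is tolerable if Reset is only an optimization, but it is exactly why an external lock (or a godoc warning) is worth having.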
I've ended up using a list type from the stdlib for the implementation of the request queue, which will now be guarded by the adsStreamImpl's mutex.
While this eliminates the possibility of the race (which existed in the previous approach), the need for a condition variable to know when the request queue becomes non-empty (and to block until it does), combined with the fact that sync.Cond doesn't allow blocking in a select alongside other ways of getting unblocked, has made the code a bit uglier and more complicated in my opinion.
Another option would be to create a new type like SynchronizedUnboundedBuffer that takes a sync.Locker and guards access to all of its methods with the given lock. But even in that case, since Get returns a channel, I don't believe it is a viable option.
Another reason I went with the stdlib list instead of trying to synchronize access to the unbounded buffer (while also supporting a Reset method) was that the unbounded buffer is currently thread-safe and adding a new method which moves the responsibility of synchronization to the caller didn't seem very appealing.
Would like to hear your thoughts on these.
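A minimal sketch (with assumed names, not the actual code) of the cond-var-guarded stdlib list described above; the for/Wait loop in get is the part that cannot participate in a select:

```go
package main

import (
	"container/list"
	"fmt"
	"sync"
)

// requestQueue guards a stdlib list with a mutex and uses a sync.Cond to
// block until the queue becomes non-empty. Unlike a channel receive, the
// Wait() call below cannot be combined with other cases in a select, which
// is the complication discussed above.
type requestQueue struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items *list.List
}

func newRequestQueue() *requestQueue {
	q := &requestQueue{items: list.New()}
	q.cond = sync.NewCond(&q.mu)
	return q
}

func (q *requestQueue) put(v any) {
	q.mu.Lock()
	q.items.PushBack(v)
	q.mu.Unlock()
	q.cond.Signal()
}

// get blocks until an item is available, then removes and returns it.
func (q *requestQueue) get() any {
	q.mu.Lock()
	defer q.mu.Unlock()
	for q.items.Len() == 0 {
		q.cond.Wait() // no way to also wait on a channel here
	}
	e := q.items.Front()
	q.items.Remove(e)
	return e.Value
}

// reset drops all queued items under the same lock, so it cannot race
// with put.
func (q *requestQueue) reset() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items.Init()
}

func main() {
	q := newRequestQueue()
	q.put("sub-A")
	fmt.Println(q.get())
}
```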
I thought of a similar approach that uses a channel instead of a sync.Cond.
Add the following fields in adsStreamImpl:
- `notifySender: make(chan struct{}, 1)`. This channel will be used by `subscribe` and `unsubscribe` to inform the `send` goroutine that there "may" be buffered requests in the queue.
- `pendingRequests: []request{}`. This will be protected by `mu`.

`subscribe()` and `unsubscribe()` will do the following:

- Acquire the `mu` lock.
- Append to the `pendingRequests` slice.
- Release the `mu` lock.
- Send to the `notifySender` channel (with a `default` case to avoid blocking).

The send goroutine will do the following:

- Run a `select` statement waiting on `s.streamCh` and `s.notifySender`.
- If the `s.streamCh` case is selected: call `sendExisting`, which will acquire the `mu` lock and swap out `pendingRequests` with an empty slice.
- If the `s.notifySender` case is selected: acquire the `mu` lock, swap out `pendingRequests` keeping the original slice, release the `mu` lock, and then call `sendNew` for each request in the original slice.
Notice that sending on the notifySender channel is done after appending to the pendingRequests slice. This ensures that the send goroutine doesn't miss any notifications. This is because if there's an object in the notifySender channel, it's guaranteed that at some point in the future, the send goroutine will read from the notifySender channel, acquire mu, and see the new elements in the pendingRequests slice.
Do you think this makes the implementation simpler, and still correct?
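A compile-ready sketch of the proposed pattern (field and method names follow the comment above, but they are assumptions, not the actual grpc-go implementation):

```go
package main

import (
	"fmt"
	"sync"
)

// sender sketches the notify-channel pattern: producers append to a slice
// under a mutex, then do a non-blocking send on a 1-buffered channel to
// wake the send goroutine.
type sender struct {
	mu              sync.Mutex
	pendingRequests []string
	notifySender    chan struct{}
}

func newSender() *sender {
	return &sender{notifySender: make(chan struct{}, 1)}
}

// enqueue is what subscribe/unsubscribe would do: append under the lock,
// then send on notifySender with a default case to avoid blocking.
func (s *sender) enqueue(req string) {
	s.mu.Lock()
	s.pendingRequests = append(s.pendingRequests, req)
	s.mu.Unlock()
	select {
	case s.notifySender <- struct{}{}:
	default: // a notification is already pending; it covers this request too
	}
}

// drain is what the send goroutine does when the notifySender case of its
// select fires: swap out the slice under the lock, then process it unlocked.
func (s *sender) drain() []string {
	s.mu.Lock()
	reqs := s.pendingRequests
	s.pendingRequests = nil
	s.mu.Unlock()
	return reqs
}

func main() {
	s := newSender()
	s.enqueue("subscribe A")
	s.enqueue("unsubscribe A")
	s.enqueue("subscribe A")
	<-s.notifySender // the select case fires
	// Each buffered request is preserved in order; nothing is batched away.
	fmt.Println(s.drain())
}
```

Because the send on `notifySender` happens only after the append, a buffered token guarantees the send goroutine will eventually acquire `mu` and observe every appended request, exactly as argued above.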
state, ok := s.resourceTypeState[typ]
if !ok {
	// State is created when the first subscription for this type is made.
	panic(fmt.Sprintf("no state exists for resource type %v", typ))
Do we want a production panic? Or should this be log.Errorf() (which fails all tests) plus a return of <some error> (or nil, since the stream is still usable, depending on what it means to return an error)? The latter feels safer.
Interestingly, we were not even checking if the value existed in the map before. We were blindly using it. So, the potential for a real panic seems almost non-existent. Literally, the ADS stream implementation adds the entry to the map when it first sees a subscribe request for this resource type and then queues the request. So, we are definitely going to have the entry in the map. I'll remove the panic, so it doesn't cause confusion to the reader.
And we never remove entries from this map. So, we are safe.
// Clear any queued requests. Previously subscribed to resources will be
// resent below.
s.requestCh.Reset()
Do we need to mandate that s.mu is held when calling Put or Reset?
Maybe we should wrap accesses to it somehow to guarantee that?
Replied on the other comment.
(Whoops, forgot to reassign after reviewing.)
Force-pushed 7152711 to 3a11e3e.
Force-pushed 3a11e3e to 364c27f.
Apologies for the rebase and the force push. I had to import the changes here into google3 to ensure that they don't introduce any flakiness, and since a whole bunch of changes have gone into the repo since I first sent this PR out, doing that without a rebase was untenable.
Fixes #8125
The original race in the xDS client:
The original fix:
Delay the resource's removal from the cache until the unsubscribe request was transmitted over the wire, a change implemented in #8369. However, this solution introduced new complications:
The root cause of the previously seen races can be put down to two things:
How does this PR address the above issue?
This PR simplifies the implementation of the ADS stream by removing two pieces of functionality.
For example, if the client subscribes to resources `A`, `B`, and `C`, the stream would now send three requests: `[A]`, `[A B]`, `[A B C]`.

RELEASE NOTES: