Skip to content

Commit 79c54b9

Browse files
committed
lsat: add stream interceptor
1 parent bcd92ce commit 79c54b9

File tree

3 files changed

+273
-174
lines changed

3 files changed

+273
-174
lines changed

lsat/interceptor.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,52 @@ func (i *Interceptor) UnaryInterceptor(ctx context.Context, method string,
145145
return invoker(rpcCtx2, method, req, reply, cc, iCtx.opts...)
146146
}
147147

148+
// StreamInterceptor is an interceptor method that can be used directly by gRPC
149+
// for streaming calls. If the store contains a token, it is attached as
150+
// credentials to every stream establishment call before patching it through.
151+
// The response error is also intercepted for every initial stream initiation.
152+
// If there is an error returned and it is indicating a payment challenge, a
153+
// token is acquired and paid for automatically. The original request is then
154+
// repeated back to the server, now with the new token attached.
155+
func (i *Interceptor) StreamInterceptor(ctx context.Context,
156+
desc *grpc.StreamDesc, cc *grpc.ClientConn, method string,
157+
streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream,
158+
error) {
159+
160+
// To avoid paying for a token twice if two parallel requests are
161+
// happening, we require an exclusive lock here.
162+
i.lock.Lock()
163+
defer i.lock.Unlock()
164+
165+
// Create the context that we'll use to initiate the real request. This
166+
// contains the means to extract response headers and possibly also an
167+
// auth token, if we already have paid for one.
168+
iCtx, err := i.newInterceptContext(ctx, opts)
169+
if err != nil {
170+
return nil, err
171+
}
172+
173+
// Try establishing the stream now. If anything goes wrong, we only
174+
// handle the LSAT error message that comes in the form of a gRPC status
175+
// error. The context of a stream will be used for the whole lifetime of
176+
// it, so we can't really clamp down on the initial call with a timeout.
177+
stream, err := streamer(ctx, desc, cc, method, iCtx.opts...)
178+
if !isPaymentRequired(err) {
179+
return stream, err
180+
}
181+
182+
// Find out if we need to pay for a new token or perhaps resume
183+
// a previously aborted payment.
184+
err = i.handlePayment(iCtx)
185+
if err != nil {
186+
return nil, err
187+
}
188+
189+
// Execute the same request again, now with the LSAT token added
190+
// as an RPC credential.
191+
return streamer(ctx, desc, cc, method, iCtx.opts...)
192+
}
193+
148194
// newInterceptContext creates the initial intercept context that can capture
149195
// metadata from the server and sends the local token to the server if one
150196
// already exists.

0 commit comments

Comments
 (0)