Skip to content

Commit ba6b4e7

Browse files
authored
Merge pull request #145 from guggero/stream-interceptor
lsat: add stream interceptor
2 parents b785254 + 79c54b9 commit ba6b4e7

File tree

3 files changed

+365
-223
lines changed

3 files changed

+365
-223
lines changed

lsat/interceptor.go

Lines changed: 138 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,15 @@ func NewInterceptor(lnd *lndclient.LndServices, store Store,
8989
}
9090
}
9191

92+
// interceptContext is a struct that contains all information about a call that
93+
// is intercepted by the interceptor.
94+
type interceptContext struct {
95+
mainCtx context.Context
96+
opts []grpc.CallOption
97+
metadata *metadata.MD
98+
token *Token
99+
}
100+
92101
// UnaryInterceptor is an interceptor method that can be used directly by gRPC
93102
// for unary calls. If the store contains a token, it is attached as credentials
94103
// to every call before patching it through. The response error is also
@@ -105,38 +114,119 @@ func (i *Interceptor) UnaryInterceptor(ctx context.Context, method string,
105114
i.lock.Lock()
106115
defer i.lock.Unlock()
107116

108-
addLsatCredentials := func(token *Token) error {
109-
macaroon, err := token.PaidMacaroon()
110-
if err != nil {
111-
return err
112-
}
113-
opts = append(opts, grpc.PerRPCCredentials(
114-
macaroons.NewMacaroonCredential(macaroon),
115-
))
116-
return nil
117+
// Create the context that we'll use to initiate the real request. This
118+
// contains the means to extract response headers and possibly also an
119+
// auth token, if we already have paid for one.
120+
iCtx, err := i.newInterceptContext(ctx, opts)
121+
if err != nil {
122+
return err
123+
}
124+
125+
// Try executing the call now. If anything goes wrong, we only handle
126+
// the LSAT error message that comes in the form of a gRPC status error.
127+
rpcCtx, cancel := context.WithTimeout(ctx, i.callTimeout)
128+
defer cancel()
129+
err = invoker(rpcCtx, method, req, reply, cc, iCtx.opts...)
130+
if !isPaymentRequired(err) {
131+
return err
132+
}
133+
134+
// Find out if we need to pay for a new token or perhaps resume
135+
// a previously aborted payment.
136+
err = i.handlePayment(iCtx)
137+
if err != nil {
138+
return err
139+
}
140+
141+
// Execute the same request again, now with the LSAT
142+
// token added as an RPC credential.
143+
rpcCtx2, cancel2 := context.WithTimeout(ctx, i.callTimeout)
144+
defer cancel2()
145+
return invoker(rpcCtx2, method, req, reply, cc, iCtx.opts...)
146+
}
147+
148+
// StreamInterceptor is an interceptor method that can be used directly by gRPC
149+
// for streaming calls. If the store contains a token, it is attached as
150+
// credentials to every stream establishment call before patching it through.
151+
// The response error is also intercepted for every initial stream initiation.
152+
// If there is an error returned and it is indicating a payment challenge, a
153+
// token is acquired and paid for automatically. The original request is then
154+
// repeated back to the server, now with the new token attached.
155+
func (i *Interceptor) StreamInterceptor(ctx context.Context,
156+
desc *grpc.StreamDesc, cc *grpc.ClientConn, method string,
157+
streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream,
158+
error) {
159+
160+
// To avoid paying for a token twice if two parallel requests are
161+
// happening, we require an exclusive lock here.
162+
i.lock.Lock()
163+
defer i.lock.Unlock()
164+
165+
// Create the context that we'll use to initiate the real request. This
166+
// contains the means to extract response headers and possibly also an
167+
// auth token, if we already have paid for one.
168+
iCtx, err := i.newInterceptContext(ctx, opts)
169+
if err != nil {
170+
return nil, err
171+
}
172+
173+
// Try establishing the stream now. If anything goes wrong, we only
174+
// handle the LSAT error message that comes in the form of a gRPC status
175+
// error. The context of a stream will be used for the whole lifetime of
176+
// it, so we can't really clamp down on the initial call with a timeout.
177+
stream, err := streamer(ctx, desc, cc, method, iCtx.opts...)
178+
if !isPaymentRequired(err) {
179+
return stream, err
180+
}
181+
182+
// Find out if we need to pay for a new token or perhaps resume
183+
// a previously aborted payment.
184+
err = i.handlePayment(iCtx)
185+
if err != nil {
186+
return nil, err
187+
}
188+
189+
// Execute the same request again, now with the LSAT token added
190+
// as an RPC credential.
191+
return streamer(ctx, desc, cc, method, iCtx.opts...)
192+
}
193+
194+
// newInterceptContext creates the initial intercept context that can capture
195+
// metadata from the server and sends the local token to the server if one
196+
// already exists.
197+
func (i *Interceptor) newInterceptContext(ctx context.Context,
198+
opts []grpc.CallOption) (*interceptContext, error) {
199+
200+
iCtx := &interceptContext{
201+
mainCtx: ctx,
202+
opts: opts,
203+
metadata: &metadata.MD{},
117204
}
118205

119206
// Let's see if the store already contains a token and what state it
120207
// might be in. If a previous call was aborted, we might have a pending
121208
// token that needs to be handled separately.
122-
token, err := i.store.CurrentToken()
209+
var err error
210+
iCtx.token, err = i.store.CurrentToken()
123211
switch {
124212
// If there is no token yet, nothing to do at this point.
125213
case err == ErrNoToken:
126214

127215
// Some other error happened that we have to surface.
128216
case err != nil:
129217
log.Errorf("Failed to get token from store: %v", err)
130-
return fmt.Errorf("getting token from store failed: %v", err)
218+
return nil, fmt.Errorf("getting token from store failed: %v",
219+
err)
131220

132221
// Only if we have a paid token append it. We don't resume a pending
133222
// payment just yet, since we don't even know if a token is required for
134223
// this call. We also never send a pending payment to the server since
135224
// we know it's not valid.
136-
case !token.isPending():
137-
if err = addLsatCredentials(token); err != nil {
225+
case !iCtx.token.isPending():
226+
if err = i.addLsatCredentials(iCtx); err != nil {
138227
log.Errorf("Adding macaroon to request failed: %v", err)
139-
return fmt.Errorf("adding macaroon failed: %v", err)
228+
return nil, fmt.Errorf("adding macaroon failed: %v",
229+
err)
140230
}
141231
}
142232

@@ -145,60 +235,59 @@ func (i *Interceptor) UnaryInterceptor(ctx context.Context, method string,
145235
// option. We execute the request and inspect the error. If it's the
146236
// LSAT specific payment required error, we might execute the same
147237
// method again later with the paid LSAT token.
148-
trailerMetadata := &metadata.MD{}
149-
opts = append(opts, grpc.Trailer(trailerMetadata))
150-
rpcCtx, cancel := context.WithTimeout(ctx, i.callTimeout)
151-
defer cancel()
152-
err = invoker(rpcCtx, method, req, reply, cc, opts...)
153-
154-
// Only handle the LSAT error message that comes in the form of
155-
// a gRPC status error.
156-
if isPaymentRequired(err) {
157-
paidToken, err := i.handlePayment(ctx, token, trailerMetadata)
158-
if err != nil {
159-
return err
160-
}
161-
if err = addLsatCredentials(paidToken); err != nil {
162-
log.Errorf("Adding macaroon to request failed: %v", err)
163-
return fmt.Errorf("adding macaroon failed: %v", err)
164-
}
165-
166-
// Execute the same request again, now with the LSAT
167-
// token added as an RPC credential.
168-
rpcCtx2, cancel2 := context.WithTimeout(ctx, i.callTimeout)
169-
defer cancel2()
170-
return invoker(rpcCtx2, method, req, reply, cc, opts...)
171-
}
172-
return err
238+
iCtx.opts = append(iCtx.opts, grpc.Trailer(iCtx.metadata))
239+
return iCtx, nil
173240
}
174241

175242
// handlePayment tries to obtain a valid token by either tracking the payment
176243
// status of a pending token or paying for a new one.
177-
func (i *Interceptor) handlePayment(ctx context.Context, token *Token,
178-
md *metadata.MD) (*Token, error) {
179-
244+
func (i *Interceptor) handlePayment(iCtx *interceptContext) error {
180245
switch {
181246
// Resume/track a pending payment if it was interrupted for some reason.
182-
case token != nil && token.isPending():
247+
case iCtx.token != nil && iCtx.token.isPending():
183248
log.Infof("Payment of LSAT token is required, resuming/" +
184249
"tracking previous payment from pending LSAT token")
185-
err := i.trackPayment(ctx, token)
250+
err := i.trackPayment(iCtx.mainCtx, iCtx.token)
186251
if err != nil {
187-
return nil, err
252+
return err
188253
}
189-
return token, nil
190254

191255
// We don't have a token yet, try to get a new one.
192-
case token == nil:
256+
case iCtx.token == nil:
193257
// We don't have a token yet, get a new one.
194258
log.Infof("Payment of LSAT token is required, paying invoice")
195-
return i.payLsatToken(ctx, md)
259+
var err error
260+
iCtx.token, err = i.payLsatToken(iCtx.mainCtx, iCtx.metadata)
261+
if err != nil {
262+
return err
263+
}
196264

197265
// We have a token and it's valid, nothing more to do here.
198266
default:
199267
log.Debugf("Found valid LSAT token to add to request")
200-
return token, nil
201268
}
269+
270+
if err := i.addLsatCredentials(iCtx); err != nil {
271+
log.Errorf("Adding macaroon to request failed: %v", err)
272+
return fmt.Errorf("adding macaroon failed: %v", err)
273+
}
274+
return nil
275+
}
276+
277+
// addLsatCredentials adds an LSAT token to the given intercept context.
278+
func (i *Interceptor) addLsatCredentials(iCtx *interceptContext) error {
279+
if iCtx.token == nil {
280+
return fmt.Errorf("cannot add nil token to context")
281+
}
282+
283+
macaroon, err := iCtx.token.PaidMacaroon()
284+
if err != nil {
285+
return err
286+
}
287+
iCtx.opts = append(iCtx.opts, grpc.PerRPCCredentials(
288+
macaroons.NewMacaroonCredential(macaroon),
289+
))
290+
return nil
202291
}
203292

204293
// payLsatToken reads the payment challenge from the response metadata and tries

0 commit comments

Comments
 (0)