Skip to content

Commit ad5a247

Browse files
authored
[TT-14253] Unload Streams Properly (#7033)
### **User description** PR for https://tyktech.atlassian.net/browse/TT-14253 This story is related to https://tyktech.atlassian.net/browse/TT-14252 I cannot find a reliable way to implement an integration test for this particular case. The test suite and Tyk Streams require a refactor to test this case in a programatic way. It's not possible to test it via tyk-analytics integration tests because the effect of calling `Unload` method is hidden under layers of abstraction. We can only understand the stream is unloaded from GW logs if the debug mode is enabled. It's possible to reproduce TT-14253 and TT-14252 with the following streams config: ```yaml input: label: "" http_client: url: "" # No default (required) verb: GET headers: {} rate_limit: "" # No default (optional) timeout: 5s payload: "" # No default (optional) stream: enabled: false reconnect: true scanner: lines: {} auto_replay_nacks: true output: label: "" http_client: url: "" # No default (required) verb: POST headers: {} rate_limit: "" # No default (optional) timeout: 5s max_in_flight: 64 batching: count: 0 byte_size: 0 period: "" check: "" ``` ___ ### **PR Type** Enhancement ___ ### **Description** - Properly unload Tyk Streams when an API is removed - Add Unload method to middleware interface and wrappers - Ensure all middleware implement Unload, even if empty - Improve stream reset logic and logging in Streams middleware ___ ### **Changes walkthrough** 📝 <table><thead><tr><th></th><th align="left">Relevant files</th></tr></thead><tbody><tr><td><strong>Enhancement</strong></td><td><table> <tr> <td> <details> <summary><strong>middleware.go</strong><dd><code>Refactor and enhance stream unloading logic</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></summary> <hr> ee/middleware/streams/middleware.go <li>Refactored stream reset logic into a helper method<br> <li> Enhanced Unload to reset all cached and default streams<br> <li> Improved logging for stream reset errors<br> <li> Clarified Unload's purpose in comments </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/7033/files#diff-0ce428c0f09dca65e3df6e72d01fee63b6f237785e41e6ecf0ce34a8b65c74a5">+19/-7</a>&nbsp; &nbsp; </td> </tr> <tr> <td> <details> <summary><strong>middleware.go</strong><dd><code>Add Unload method to UpstreamBasicAuth middleware</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></summary> <hr> ee/middleware/upstreambasicauth/middleware.go <li>Added Unload method with no-op implementation<br> <li> Ensured middleware conforms to new interface </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/7033/files#diff-4738eae6b1fa23f58598d492a9fa4fa6682545bf00bf122040180425a5e9dc60">+4/-0</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td> <details> <summary><strong>middleware.go</strong><dd><code>Add Unload method to UpstreamOAuth middleware</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></summary> <hr> ee/middleware/upstreamoauth/middleware.go <li>Added Unload method with no-op implementation<br> <li> Ensured middleware conforms to new interface </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/7033/files#diff-1347b256f3728407a2697f1e824391eaa329162e644741d8321f25c7a0630363">+4/-0</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td> <details> <summary><strong>api_loader.go</strong><dd><code>Detect and prepare removed APIs for unloading</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></summary> <hr> gateway/api_loader.go <li>Identified removed API specs for unloading<br> <li> Prepared specsToUnload list for proper cleanup </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/7033/files#diff-cdf0b7f176c9d18e1a314b78ddefc2cb3a94b3de66f1f360174692c915734c68">+7/-0</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td> <details> <summary><strong>middleware_wrap.go</strong><dd><code>Add Unload to middleware wrapper and fix receiver names</code>&nbsp; &nbsp; </dd></summary> <hr> gateway/middleware_wrap.go <li>Implemented Unload method in wrapMiddleware<br> <li> Fixed receiver naming for consistency<br> <li> Ensured interface compliance </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/7033/files#diff-1da43bd02220acad12bce8d8c5600b4acfee3c40f90c53825802747004c9fb0a">+7/-3</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td> <details> <summary><strong>interfaces.go</strong><dd><code>Extend Middleware interface with Unload method</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></summary> <hr> internal/model/interfaces.go <li>Added Unload method to Middleware interface<br> <li> Updated interface documentation accordingly </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/7033/files#diff-43ba6dd4a8d193850dea32e8af5c361470cd62bfa390c580a39f7142a56bd391">+1/-0</a>&nbsp; &nbsp; &nbsp; </td> </tr> </table></td></tr></tr></tbody></table> ___ > <details> <summary> Need help?</summary><li>Type <code>/help how to ...</code> in the comments thread for any questions about PR-Agent usage.</li><li>Check out the <a href="https://qodo-merge-docs.qodo.ai/usage-guide/">documentation</a> for more information.</li></details>
1 parent 3984fa6 commit ad5a247

File tree

7 files changed

+53
-22
lines changed

7 files changed

+53
-22
lines changed

ee/middleware/streams/middleware.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -263,30 +263,42 @@ func (s *Middleware) ProcessRequest(w http.ResponseWriter, r *http.Request, _ in
263263
return nil, middleware.StatusRespond
264264
}
265265

266-
// Unload closes and remove active streams.
266+
func (s *Middleware) resetStream(streamValue any) {
267+
if stream, ok := streamValue.(*Stream); ok {
268+
if err := stream.Reset(); err != nil {
269+
s.Logger().WithError(err).Error("Failed to reset stream")
270+
}
271+
}
272+
}
273+
274+
// Unload closes and remove active streams. This method is called when the API is removed.
267275
func (s *Middleware) Unload() {
268276
s.Logger().Debugf("Unloading streaming middleware %s", s.Spec.Name)
269277

270278
totalStreams := 0
271279
s.cancel()
272280

281+
// Reset cached streams and stop the underlying Bento instances
273282
s.StreamManagerCache.Range(func(_, value interface{}) bool {
274283
manager, ok := value.(*Manager)
275284
if !ok {
276285
return true
277286
}
278287
manager.streams.Range(func(_, streamValue interface{}) bool {
279288
totalStreams++
280-
if stream, ok := streamValue.(*Stream); ok {
281-
if err := stream.Reset(); err != nil {
282-
s.Logger().WithError(err).Error("Failed to reset stream")
283-
}
284-
}
285-
return true
289+
s.resetStream(streamValue)
290+
return true // continue iterating
286291
})
287292
return true
288293
})
289294

295+
// Finally, reset the default manager and stop the underlying Bento instance
296+
s.defaultManager.streams.Range(func(_, streamValue interface{}) bool {
297+
totalStreams++
298+
s.resetStream(streamValue)
299+
return true // continue iterating
300+
})
301+
290302
GlobalStreamCounter.Add(-int64(totalStreams))
291303
s.StreamManagerCache = sync.Map{}
292304
s.Logger().Info("All streams successfully removed")

ee/middleware/upstreambasicauth/middleware.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ type Middleware struct {
1919
base BaseMiddleware
2020
}
2121

22+
func (m *Middleware) Unload() {
23+
// nothing to do here
24+
}
25+
2226
// Middleware implements model.Middleware.
2327
var _ model.Middleware = &Middleware{}
2428

ee/middleware/upstreamoauth/middleware.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ type Middleware struct {
2323
passwordStorageHandler Storage
2424
}
2525

26+
func (m *Middleware) Unload() {
27+
// nothing to do here
28+
}
29+
2630
// Middleware implements model.Middleware.
2731
var _ model.Middleware = &Middleware{}
2832

gateway/api_loader.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1070,6 +1070,13 @@ func (gw *Gateway) loadApps(specs []*APISpec) {
10701070
}
10711071
}
10721072

1073+
// Find the removed specs to unload them
1074+
for apiID, curSpec := range gw.apisByID {
1075+
if _, ok := tmpSpecRegister[apiID]; !ok {
1076+
specsToUnload = append(specsToUnload, curSpec)
1077+
}
1078+
}
1079+
10731080
gw.apisByID = tmpSpecRegister
10741081
gw.apisHandlesByID = tmpSpecHandles
10751082

gateway/middleware_wrap.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ type wrapMiddleware struct {
1313
mw model.Middleware
1414
}
1515

16-
var _ TykMiddleware = &wrapMiddleware{}
16+
var _ TykMiddleware = (*wrapMiddleware)(nil)
1717

1818
// WrapMiddleware returns a new TykMiddleware with the provided base middleware,
1919
// and the smaller model.Middleware interface. It allows to implement model.Middleware,
@@ -41,8 +41,8 @@ func (w *wrapMiddleware) Name() string {
4141
return w.mw.Name()
4242
}
4343

44-
func (s *wrapMiddleware) Logger() *logrus.Entry {
45-
return s.mw.Logger()
44+
func (w *wrapMiddleware) Logger() *logrus.Entry {
45+
return w.mw.Logger()
4646
}
4747

4848
func (w *wrapMiddleware) EnabledForSpec() bool {
@@ -52,3 +52,7 @@ func (w *wrapMiddleware) EnabledForSpec() bool {
5252
func (w *wrapMiddleware) ProcessRequest(rw http.ResponseWriter, r *http.Request, data interface{}) (error, int) {
5353
return w.mw.ProcessRequest(rw, r, data)
5454
}
55+
56+
func (w *wrapMiddleware) Unload() {
57+
w.mw.Unload()
58+
}

internal/graphengine/engine_v2.go

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -239,24 +239,23 @@ func (e *EngineV2) HandleReverseProxy(params ReverseProxyParams) (res *http.Resp
239239
return nil, false, err
240240
}
241241

242-
gqlRequest := e.ctxRetrieveRequestFunc(params.OutRequest)
243-
// Cleanup method, frees allocated resources, if they are eligible for freeing up.
244-
// Currently, it only frees up the allocated resources of a GraphQL query that
245-
// has a cached query plan.
246-
//
247-
// graphql-go-tools uses the parsed query (ast.Document in graphql-go-tools codebase)
248-
// in the planner and caches the plans. If a plan has been cached, we can reset the created
249-
// ast.Document struct and put it back to the pool for later use. By this way, we can reduce the GC
250-
// pressure and number of allocations per GraphQL query.
251-
// See TT-9864 for the details.
252-
defer gqlRequest.Cleanup()
253-
254242
switch reverseProxyType {
255243
case ReverseProxyTypeIntrospection:
256244
return e.gqlTools.handleIntrospection(e.Schema)
257245
case ReverseProxyTypeWebsocketUpgrade:
258246
return e.handoverWebSocketConnectionToGraphQLExecutionEngine(&params)
259247
case ReverseProxyTypeGraphEngine:
248+
gqlRequest := e.ctxRetrieveRequestFunc(params.OutRequest)
249+
// Cleanup method frees allocated resources if they are eligible for freeing up.
250+
// Currently, it only frees up the allocated resources of a GraphQL query that
251+
// has a cached query plan.
252+
//
253+
// graphql-go-tools uses the parsed query (ast.Document in graphql-go-tools codebase)
254+
// in the planner and caches the plans. If a plan has been cached, we can reset the created
255+
// ast.Document struct and put it back in the pool for later use. In this way, we can reduce the GC
256+
// pressure and number of allocations per GraphQL query.
257+
// See TT-9864 for the details.
258+
defer gqlRequest.Cleanup()
260259
return e.handoverRequestToGraphQLExecutionEngine(gqlRequest, params.OutRequest)
261260
case ReverseProxyTypePreFlight:
262261
if e.ApiDefinition.GraphQL.ExecutionMode == apidef.GraphQLExecutionModeProxyOnly {

internal/model/interfaces.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type Middleware interface {
2727
Logger() *logrus.Entry
2828
ProcessRequest(w http.ResponseWriter, r *http.Request, conf interface{}) (error, int) // Handles request
2929
EnabledForSpec() bool
30+
Unload()
3031
}
3132

3233
// LoggerProvider returns a new *logrus.Entry for the request.

0 commit comments

Comments
 (0)