Skip to content

Commit 45ae588

Browse files
authored
[TT-14794] fix issue where an invalid stream path results in 500 (#7047)
### **User description** <details open> <summary><a href="https://tyktech.atlassian.net/browse/TT-14794" title="TT-14794" target="_blank">TT-14794</a></summary> <br /> <table> <tr> <th>Summary</th> <td>Calling a wrong path on streams API returns `There was a problem proxying the request`</td> </tr> <tr> <th>Type</th> <td> <img alt="Bug" src="https://tyktech.atlassian.net/rest/api/2/universal_avatar/view/type/issuetype/avatar/10303?size=medium" /> Bug </td> </tr> <tr> <th>Status</th> <td>In Dev</td> </tr> <tr> <th>Points</th> <td>N/A</td> </tr> <tr> <th>Labels</th> <td>-</td> </tr> </table> </details> <!-- do not remove this marker as it will break jira-lint's functionality. added_by_jira_lint --> --- This PR fixes an issue, where an invalid path would result in a 500 Internal Server Error without giving any feedback to the user. Example: SSE Output path on `/get/stream`. When calling `/get/steam` would result in `500`. Now it will respond with `404`. ___ ### **PR Type** Bug fix, Tests ___ ### **Description** - Fix 500 error for invalid stream paths, return 404 instead - Add tests for SSE output path error handling and success - Implement test SSE client for streaming endpoint verification - Improve debug logging for missing stream paths ___ ### **Changes walkthrough** 📝 <table><thead><tr><th></th><th align="left">Relevant files</th></tr></thead><tbody><tr><td><strong>Bug fix</strong></td><td><table> <tr> <td> <details> <summary><strong>middleware.go</strong><dd><code>Return 404 and log when stream path is missing</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></summary> <hr> ee/middleware/streams/middleware.go <li>Return 404 Not Found with error for missing stream paths<br> <li> Add debug log for unmatched stream paths </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/7047/files#diff-0ce428c0f09dca65e3df6e72d01fee63b6f237785e41e6ecf0ce34a8b65c74a5">+2/-1</a>&nbsp; &nbsp; &nbsp; </td> </tr> </table></td></tr><tr><td><strong>Tests</strong></td><td><table> <tr> <td> <details> <summary><strong>mw_streaming_test.go</strong><dd><code>Add tests for SSE output path error and success</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></summary> <hr> gateway/mw_streaming_test.go <li>Add test to verify 404 for invalid SSE output paths<br> <li> Add test for successful SSE connection and message receipt<br> <li> Implement helper SSE client for testing streaming endpoints </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/7047/files#diff-a0d1bd0196a741537a3c850e340225c8993e49d709c838af0f1b48b9893af1da">+148/-0</a>&nbsp; </td> </tr> </table></td></tr></tr></tbody></table> ___ > <details> <summary> Need help?</summary><li>Type <code>/help how to ...</code> in the comments thread for any questions about PR-Agent usage.</li><li>Check out the <a href="https://qodo-merge-docs.qodo.ai/usage-guide/">documentation</a> for more information.</li></details>
1 parent 7e70a02 commit 45ae588

File tree

2 files changed

+150
-1
lines changed

2 files changed

+150
-1
lines changed

ee/middleware/streams/middleware.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,8 @@ func (s *Middleware) processStreamsConfig(r *http.Request, streams map[string]an
227227
func (s *Middleware) ProcessRequest(w http.ResponseWriter, r *http.Request, _ interface{}) (error, int) {
228228
strippedPath := s.Spec.StripListenPath(r.URL.Path)
229229
if !s.defaultManager.hasPath(strippedPath) {
230-
return nil, http.StatusOK
230+
s.Logger().Debugf("Path not found: %s", strippedPath)
231+
return errors.New("not found"), http.StatusNotFound
231232
}
232233

233234
s.Logger().Debugf("Processing request: %s, %s", r.URL.Path, strippedPath)

gateway/mw_streaming_test.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
package gateway
44

55
import (
6+
"bufio"
67
"bytes"
78
"context"
89
"crypto/tls"
@@ -11,6 +12,7 @@ import (
1112
"net/http"
1213
"os"
1314
"strings"
15+
"sync"
1416
"testing"
1517
"time"
1618

@@ -185,6 +187,7 @@ streams:
185187
output:
186188
http_server:
187189
ws_path: /subscribe
190+
stream_path: /get/stream
188191
`
189192

190193
func TestStreamingAPISingleClient(t *testing.T) {
@@ -1041,3 +1044,148 @@ func TestStreamingAPIGarbageCollection(t *testing.T) {
10411044
})
10421045
require.Equal(t, 0, streamManagersAfterGC)
10431046
}
1047+
1048+
func TestStreaming_HttpOutputPaths(t *testing.T) {
1049+
ts := StartTest(func(globalConf *config.Config) {
1050+
globalConf.Streaming.Enabled = true
1051+
})
1052+
t.Cleanup(ts.Close)
1053+
1054+
oasAPI, err := setupOASForStreamAPI(bentoHTTPServerTemplate)
1055+
require.NoError(t, err)
1056+
1057+
apiName := "output-paths-test-api"
1058+
ts.Gw.BuildAndLoadAPI(func(spec *APISpec) {
1059+
spec.Proxy.ListenPath = fmt.Sprintf("/%s", apiName)
1060+
spec.UseKeylessAccess = true
1061+
spec.IsOAS = true
1062+
spec.OAS = oasAPI
1063+
spec.OAS.Fill(*spec.APIDefinition)
1064+
})
1065+
1066+
apiUrl := fmt.Sprintf("%s/%s", ts.URL, apiName)
1067+
1068+
t.Run("should fail with 404 Not Found if the path is invalid", func(t *testing.T) {
1069+
pathWithTypo := "get/steam"
1070+
targetUrl := fmt.Sprintf("%s/%s", apiUrl, pathWithTypo)
1071+
1072+
sseClient := newTestStreamSSEClient(context.Background(), targetUrl)
1073+
statusCode, err := sseClient.Connect()
1074+
1075+
assert.NoError(t, err)
1076+
assert.Equal(t, http.StatusNotFound, statusCode)
1077+
})
1078+
1079+
t.Run("should connect to SSE endpoint and consume messages as expected", func(t *testing.T) {
1080+
correctPath := "get/stream"
1081+
targetUrl := fmt.Sprintf("%s/%s", apiUrl, correctPath)
1082+
1083+
sseContext, cancelFn := context.WithCancel(context.Background())
1084+
sseClient := newTestStreamSSEClient(sseContext, targetUrl)
1085+
go func() {
1086+
statusCode, err := sseClient.Connect()
1087+
require.NoError(t, err)
1088+
require.Equal(t, http.StatusOK, statusCode)
1089+
}()
1090+
1091+
defer func() {
1092+
err := sseClient.Close()
1093+
require.NoError(t, err)
1094+
cancelFn()
1095+
}()
1096+
1097+
receiveMessageChan := make(chan string)
1098+
go func() {
1099+
err := sseClient.ReadMessages(receiveMessageChan)
1100+
require.NoError(t, err)
1101+
}()
1102+
1103+
testMessage := `{"test": "message"}`
1104+
publishURL := fmt.Sprintf("%s/post", apiUrl)
1105+
publishResp, err := http.Post(publishURL, "application/json", bytes.NewReader([]byte(testMessage)))
1106+
require.NoError(t, err)
1107+
require.Equal(t, http.StatusOK, publishResp.StatusCode)
1108+
1109+
var message string
1110+
assert.Eventuallyf(t, func() bool {
1111+
message = <-receiveMessageChan
1112+
return true
1113+
}, 3*time.Second, 10*time.Millisecond, "SSE message not received")
1114+
assert.Equal(t, testMessage, message)
1115+
})
1116+
}
1117+
1118+
type testStreamSSEClient struct {
1119+
ctx context.Context
1120+
url string
1121+
client http.Client
1122+
resp *http.Response
1123+
done chan struct{}
1124+
wg *sync.WaitGroup
1125+
}
1126+
1127+
func newTestStreamSSEClient(ctx context.Context, url string) *testStreamSSEClient {
1128+
wg := sync.WaitGroup{}
1129+
wg.Add(1)
1130+
1131+
return &testStreamSSEClient{
1132+
ctx: ctx,
1133+
url: url,
1134+
client: http.Client{},
1135+
done: make(chan struct{}),
1136+
wg: &wg,
1137+
}
1138+
}
1139+
1140+
func (sse *testStreamSSEClient) Connect() (statusCode int, err error) {
1141+
req, err := http.NewRequestWithContext(sse.ctx, http.MethodGet, sse.url, nil)
1142+
if err != nil {
1143+
return 0, fmt.Errorf("creating request: %w", err)
1144+
}
1145+
1146+
req.Header.Set("Accept", "text/event-stream")
1147+
req.Header.Set("Cache-Control", "no-cache")
1148+
req.Header.Set("Connection", "keep-alive")
1149+
1150+
sse.resp, err = sse.client.Do(req)
1151+
sse.wg.Done()
1152+
if err != nil {
1153+
close(sse.done)
1154+
if sse.resp != nil {
1155+
statusCode = sse.resp.StatusCode
1156+
}
1157+
return statusCode, fmt.Errorf("doing request: %w", err)
1158+
}
1159+
1160+
return sse.resp.StatusCode, nil
1161+
}
1162+
1163+
func (sse *testStreamSSEClient) Close() error {
1164+
close(sse.done)
1165+
return nil
1166+
}
1167+
1168+
func (sse *testStreamSSEClient) ReadMessages(messageChan chan<- string) (err error) {
1169+
sse.wg.Wait() // we need to make sure that the connection is ready
1170+
scanner := bufio.NewScanner(sse.resp.Body)
1171+
1172+
defer func() {
1173+
err = sse.resp.Body.Close()
1174+
}()
1175+
1176+
for scanner.Scan() {
1177+
select {
1178+
case <-sse.ctx.Done():
1179+
return nil
1180+
case <-sse.done:
1181+
return nil
1182+
default:
1183+
message := scanner.Text()
1184+
if message != "" {
1185+
messageChan <- message
1186+
}
1187+
}
1188+
}
1189+
1190+
return nil
1191+
}

0 commit comments

Comments
 (0)