Skip to content

Commit aedc223

Browse files
authored
[TT-14446] Add an integration test for Input (http_server) → Output(amqp_1) scenario (#7026)
### **User description** <details open> <summary><a href="https://tyktech.atlassian.net/browse/TT-14446" title="TT-14446" target="_blank">TT-14446</a></summary> <br /> <table> <tr> <th>Summary</th> <td>Investigate flaky amqp integration tests</td> </tr> <tr> <th>Type</th> <td> <img alt="Task" src="https://tyktech.atlassian.net/rest/api/2/universal_avatar/view/type/issuetype/avatar/10318?size=medium" /> Task </td> </tr> <tr> <th>Status</th> <td>In Code Review</td> </tr> <tr> <th>Points</th> <td>N/A</td> </tr> <tr> <th>Labels</th> <td>-</td> </tr> </table> </details> <!-- do not remove this marker as it will break jira-lint's functionality. added_by_jira_lint --> --- This PR adds an integration test for `Input (http_server) → Output(amqp_1)` scenario. See warpstreamlabs/bento#302 for details. ___ ### **PR Type** Tests, Enhancement ___ ### **Description** - Add integration test for HTTP input to AMQP 1.0 output scenario - Implement AMQP 1.0 output test logic in test context - Extend test runner to support AMQP 1.0 output validation - Ensure message mapping and pipeline processor are included for AMQP 1.0 ___ ### **Changes walkthrough** 📝 <table><thead><tr><th></th><th align="left">Relevant files</th></tr></thead><tbody><tr><td><strong>Tests</strong></td><td><table> <tr> <td> <details> <summary><strong>mw_streaming_amqp_test.go</strong><dd><code>Add and integrate AMQP 1.0 output test in streaming integration tests</code></dd></summary> <hr> gateway/mw_streaming_amqp_test.go <li>Added <code>testAMQP1Output</code> method for AMQP 1.0 message validation<br> <li> Extended <code>testTykStreamAMQPIntegration</code> to handle AMQP 1.0 output<br> <li> Introduced a new integration test for HTTP input to AMQP 1.0 output<br> <li> Ensured proper session, receiver, and message handling for AMQP 1.0 </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/7026/files#diff-3599fefd35a5eddd4f464208edce54054fc3e5045d1694473367e834c331f9dc">+81/-0</a>&nbsp; &nbsp; </td> </tr> </table></td></tr></tr></tbody></table> ___ > <details> <summary> Need help?</summary><li>Type <code>/help how to ...</code> in the comments thread for any questions about PR-Agent usage.</li><li>Check out the <a href="https://qodo-merge-docs.qodo.ai/usage-guide/">documentation</a> for more information.</li></details>
1 parent a9092e5 commit aedc223

File tree

1 file changed

+81
-0
lines changed

1 file changed

+81
-0
lines changed

gateway/mw_streaming_amqp_test.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,7 @@ func (t *amqpTestContext) testAMQP09Output(expectedMessages [][]byte) {
310310
result = append(result, d.Body)
311311
if len(result) == len(expectedMessages) {
312312
close(done)
313+
break
313314
}
314315
}
315316
}()
@@ -322,6 +323,48 @@ func (t *amqpTestContext) testAMQP09Output(expectedMessages [][]byte) {
322323
assert.Equal(t.t, expectedMessages, result)
323324
}
324325

326+
func (t *amqpTestContext) testAMQP1Output(expectedMessages [][]byte) {
327+
baseCtx := context.Background()
328+
conn, err := amqp1.Dial(baseCtx, t.amqpURL, nil)
329+
require.NoErrorf(t.t, err, "Failed to connect to RabbitMQ")
330+
331+
session, err := conn.NewSession(baseCtx, nil)
332+
require.NoErrorf(t.t, err, "Failed to create a session")
333+
334+
// create a new receiver
335+
receiver, err := session.NewReceiver(baseCtx, t.queueName, nil)
336+
require.NoErrorf(t.t, err, "Failed to create a receiver")
337+
338+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
339+
defer cancel()
340+
341+
done := make(chan struct{})
342+
var result [][]byte
343+
go func() {
344+
for {
345+
msg, receiverErr := receiver.Receive(ctx, nil)
346+
if receiverErr != nil {
347+
t.t.Logf("Failed to receive message: %v", receiverErr)
348+
break
349+
}
350+
351+
result = append(result, msg.GetData())
352+
if len(result) == len(expectedMessages) {
353+
close(done)
354+
break
355+
}
356+
require.NoError(t.t, receiver.AcceptMessage(ctx, msg))
357+
}
358+
}()
359+
360+
select {
361+
case <-baseCtx.Done():
362+
case <-done:
363+
}
364+
365+
assert.Equal(t.t, expectedMessages, result)
366+
}
367+
325368
func (t *amqpTestContext) testTykStreamAMQPIntegration() {
326369
t.t.Helper()
327370

@@ -359,6 +402,8 @@ func (t *amqpTestContext) testTykStreamAMQPIntegration() {
359402
testWebsocketOutput(t.t, wsClients, messageToSend, numMessages)
360403
} else if t.output == "amqp_0_9" {
361404
t.testAMQP09Output(messages)
405+
} else if t.output == "amqp_1" {
406+
t.testAMQP1Output(messages)
362407
} else {
363408
require.Fail(t.t, "Invalid output type")
364409
}
@@ -527,4 +572,40 @@ streams:
527572
testCtx.initializeAMQP09Environment()
528573
testCtx.testTykStreamAMQPIntegration()
529574
})
575+
576+
t.Run("Publish messages to http input then consume messages from amqp_1 output", func(t *testing.T) {
577+
// We need to add pipeline processor for amqp_1 output,
578+
// see https://github.com/warpstreamlabs/bento/issues/302
579+
queueName := "test-queue-input-http-amqp-1-output"
580+
streamingConfig := fmt.Sprintf(`
581+
streams:
582+
test:
583+
input:
584+
http_server:
585+
path: /post
586+
timeout: 1s
587+
pipeline:
588+
processors:
589+
- mapping: |
590+
meta = deleted()
591+
output:
592+
amqp_1:
593+
urls:
594+
- %s
595+
target_address: %s
596+
`, amqpURL, queueName)
597+
tykStreamingOAS, oasErr := setupOASForStreamAPI(streamingConfig)
598+
require.NoError(t, oasErr)
599+
apiName := setupStreamingAPIForOAS(t, ts, &tykStreamingOAS)
600+
testCtx := &amqpTestContext{
601+
t: t,
602+
ts: ts,
603+
apiName: apiName,
604+
queueName: queueName,
605+
amqpURL: amqpURL,
606+
input: "http_server",
607+
output: "amqp_1",
608+
}
609+
testCtx.testTykStreamAMQPIntegration()
610+
})
530611
}

0 commit comments

Comments
 (0)