diff --git a/.doc_gen/metadata/sqs_metadata.yaml b/.doc_gen/metadata/sqs_metadata.yaml index 60d2b83ee2a..4f66fe2e9c2 100644 --- a/.doc_gen/metadata/sqs_metadata.yaml +++ b/.doc_gen/metadata/sqs_metadata.yaml @@ -867,8 +867,20 @@ sqs_Scenario_SendReceiveBatch: - description: Use the wrapper functions to send and receive messages in batches. snippet_tags: - python.example_code.sqs.Scenario_SendReceiveBatch + Java: + versions: + - sdk_version: 2 + github: javav2/example_code/sqs + sdkguide: + excerpts: + - description: Compare single-message SQS operations with automatic batching. + snippet_tags: + - sqs.java2.batch_demo.main + - description: Create functions to wrap &SQS; message functions and use them to send and receive messages in batches. + snippet_tags: + - sqs.java2.sendRecvBatch.main services: - sqs: {CreateQueue, DeleteQueue, SendMessageBatch, ReceiveMessage, DeleteMessageBatch} + sqs: {CreateQueue, DeleteQueue, SendMessageBatch, SendMessage, ReceiveMessage, DeleteMessageBatch, DeleteMessage} sqs_GetQueueAttributes: languages: .NET: diff --git a/javav2/example_code/sqs/README.md b/javav2/example_code/sqs/README.md index 02e10ae1769..7a6ac077c0b 100644 --- a/javav2/example_code/sqs/README.md +++ b/javav2/example_code/sqs/README.md @@ -57,6 +57,7 @@ functions within the same service. - [Manage large messages using S3](src/main/java/com/example/sqs/SqsExtendedClientExample.java) - [Process S3 event notifications](../s3/src/main/java/com/example/s3/ProcessS3EventNotification.java) - [Publish messages to queues](../../usecases/topics_and_queues/src/main/java/com/example/sns/SNSWorkflow.java) +- [Send and receive batches of messages](src/main/java/com/example/sqs/SimpleProducerConsumer.java) - [Use the Amazon SQS Java Messaging Library to work with the JMS interface](../sqs-jms/src/main/java/com/example/sqs/jms/stdqueue/TextMessageSender.java) - [Work with queue tags](src/main/java/com/example/sqs/TagExamples.java) @@ -130,6 +131,22 @@ This example shows you how to do the following: +#### Send and receive batches of messages + +This example shows you how to do the following: + +- Create an Amazon SQS queue. +- Send batches of messages to the queue. +- Receive batches of messages from the queue. +- Delete batches of messages from the queue. + + + + + + + + #### Use the Amazon SQS Java Messaging Library to work with the JMS interface This example shows you how to use the Amazon SQS Java Messaging Library to work with the JMS interface. diff --git a/javav2/example_code/sqs/src/main/java/com/example/sqs/SendRecvBatch.java b/javav2/example_code/sqs/src/main/java/com/example/sqs/SendRecvBatch.java new file mode 100644 index 00000000000..61fb9c1c88b --- /dev/null +++ b/javav2/example_code/sqs/src/main/java/com/example/sqs/SendRecvBatch.java @@ -0,0 +1,372 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.example.sqs; + +// snippet-start:[sqs.java2.sendRecvBatch.main] +// snippet-start:[sqs.java2.sendRecvBatch.import] + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; +import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResultEntry; +import software.amazon.awssdk.services.sqs.model.DeleteQueueRequest; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchResultEntry; +import software.amazon.awssdk.services.sqs.model.SqsException; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +// snippet-end:[sqs.java2.sendRecvBatch.import] + + +/** + * Before running this Java V2 code example, set up your development + * environment, including your credentials. + * + * For more information, see the following documentation topic: + * + * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html + */ + +/** + * This code demonstrates basic message operations in Amazon Simple Queue Service (Amazon SQS). + */ + +public class SendRecvBatch { + private static final Logger LOGGER = LoggerFactory.getLogger(SendRecvBatch.class); + private static final SqsClient sqsClient = SqsClient.create(); + + + public static void main(String[] args) { + String queueName = "testQueue" + System.currentTimeMillis(); + String queueUrl = createQueue(queueName); + + // Send messages to the queue + List messages = new ArrayList<>(); + messages.add(new MessageEntry("Hello world!", null)); + messages.add(new MessageEntry("Hello world 2!", null)); + messages.add(new MessageEntry("Hello world 3!", null)); + SendMessageBatchResponse response = sendMessages(queueUrl, messages); + + // Receive messages from the queue + List receivedMessages = receiveMessages(queueUrl, 10, 10); + + // Delete messages from the queue + deleteMessages(queueUrl, receivedMessages); + + // Delete the queue + deleteQueue(queueUrl); + } + // snippet-start:[sqs.java2.sendRecvBatch.sendBatch] + /** + * Send a batch of messages in a single request to an SQS queue. + * This request may return overall success even when some messages were not sent. + * The caller must inspect the Successful and Failed lists in the response and + * resend any failed messages. + * + * @param queueUrl The URL of the queue to receive the messages. + * @param messages The messages to send to the queue. Each message contains a body and attributes. + * @return The response from SQS that contains the list of successful and failed messages. + */ + public static SendMessageBatchResponse sendMessages( + String queueUrl, List messages) { + + try { + List entries = new ArrayList<>(); + + for (int i = 0; i < messages.size(); i++) { + MessageEntry message = messages.get(i); + entries.add(SendMessageBatchRequestEntry.builder() + .id(String.valueOf(i)) + .messageBody(message.getBody()) + .messageAttributes(message.getAttributes()) + .build()); + } + + SendMessageBatchRequest sendBatchRequest = SendMessageBatchRequest.builder() + .queueUrl(queueUrl) + .entries(entries) + .build(); + + SendMessageBatchResponse response = sqsClient.sendMessageBatch(sendBatchRequest); + + if (!response.successful().isEmpty()) { + for (SendMessageBatchResultEntry resultEntry : response.successful()) { + LOGGER.info("Message sent: " + resultEntry.messageId() + ": " + + messages.get(Integer.parseInt(resultEntry.id())).getBody()); + } + } + + if (!response.failed().isEmpty()) { + for (BatchResultErrorEntry errorEntry : response.failed()) { + LOGGER.warn("Failed to send: " + errorEntry.id() + ": " + + messages.get(Integer.parseInt(errorEntry.id())).getBody()); + } + } + + return response; + + } catch (SqsException e) { + LOGGER.error("Send messages failed to queue: " + queueUrl, e); + throw e; + } + } + // snippet-end:[sqs.java2.sendRecvBatch.sendBatch] + + // snippet-start:[sqs.java2.sendRecvBatch.recvBatch] + /** + * Receive a batch of messages in a single request from an SQS queue. + * + * @param queueUrl The URL of the queue from which to receive messages. + * @param maxNumber The maximum number of messages to receive. The actual number + * of messages received might be less. + * @param waitTime The maximum time to wait (in seconds) before returning. When + * this number is greater than zero, long polling is used. This + * can result in reduced costs and fewer false empty responses. + * @return The list of Message objects received. These each contain the body + * of the message and metadata and custom attributes. + */ + public static List receiveMessages(String queueUrl, int maxNumber, int waitTime) { + try { + ReceiveMessageRequest receiveRequest = ReceiveMessageRequest.builder() + .queueUrl(queueUrl) + .maxNumberOfMessages(maxNumber) + .waitTimeSeconds(waitTime) + .messageAttributeNames("All") + .build(); + + List messages = sqsClient.receiveMessage(receiveRequest).messages(); + + for (Message message : messages) { + LOGGER.info("Received message: " + message.messageId() + ": " + message.body()); + } + + return messages; + + } catch (SqsException e) { + LOGGER.error("Couldn't receive messages from queue: " + queueUrl, e); + throw e; + } + } + // snippet-end:[sqs.java2.sendRecvBatch.recvBatch] + + // snippet-start:[sqs.java2.sendRecvBatch.delBatch] + /** + * Delete a batch of messages from a queue in a single request. + * + * @param queueUrl The URL of the queue from which to delete the messages. + * @param messages The list of messages to delete. + * @return The response from SQS that contains the list of successful and failed + * message deletions. + */ + public static DeleteMessageBatchResponse deleteMessages(String queueUrl, List messages) { + try { + List entries = new ArrayList<>(); + + for (int i = 0; i < messages.size(); i++) { + entries.add(DeleteMessageBatchRequestEntry.builder() + .id(String.valueOf(i)) + .receiptHandle(messages.get(i).receiptHandle()) + .build()); + } + + DeleteMessageBatchRequest deleteRequest = DeleteMessageBatchRequest.builder() + .queueUrl(queueUrl) + .entries(entries) + .build(); + + DeleteMessageBatchResponse response = sqsClient.deleteMessageBatch(deleteRequest); + + if (!response.successful().isEmpty()) { + for (DeleteMessageBatchResultEntry resultEntry : response.successful()) { + LOGGER.info("Deleted " + messages.get(Integer.parseInt(resultEntry.id())).receiptHandle()); + } + } + + if (!response.failed().isEmpty()) { + for (BatchResultErrorEntry errorEntry : response.failed()) { + LOGGER.warn("Could not delete " + messages.get(Integer.parseInt(errorEntry.id())).receiptHandle()); + } + } + + return response; + + } catch (SqsException e) { + LOGGER.error("Couldn't delete messages from queue " + queueUrl, e); + throw e; + } + } + // snippet-end:[sqs.java2.sendRecvBatch.delBatch] + + // snippet-start:[sqs.java2.sendRecvBatch.scenario] + /** + * Helper class to represent a message with body and attributes. + */ + public static class MessageEntry { + private final String body; + private final Map attributes; + + public MessageEntry(String body, Map attributes) { + this.body = body; + this.attributes = attributes != null ? attributes : new HashMap<>(); + } + + public String getBody() { + return body; + } + + public Map getAttributes() { + return attributes; + } + } + + /** + * Shows how to: + * * Read the lines from this Java file and send the lines in + * batches of 10 as messages to a queue. + * * Receive the messages in batches until the queue is empty. + * * Reassemble the lines of the file and verify they match the original file. + */ + public static void usageDemo() { + System.out.println("-".repeat(88)); + System.out.println("Welcome to the Amazon Simple Queue Service (Amazon SQS) demo!"); + System.out.println("-".repeat(88)); + + // Create a queue for the demo. + String queueName = "sqs-usage-demo-message-wrapper-"+System.currentTimeMillis(); + CreateQueueRequest createRequest = CreateQueueRequest.builder() + .queueName(queueName) + .build(); + String queueUrl = sqsClient.createQueue(createRequest).queueUrl(); + System.out.println("Created queue: " + queueUrl); + + try { + // Read the lines from this Java file. + Path projectRoot = Paths.get(System.getProperty("user.dir")); + Path filePath = projectRoot.resolve("src/main/java/com/example/sqs/SendRecvBatch.java"); + List lines = Files.readAllLines(filePath); + + + // Send file lines in batches. + int batchSize = 10; + System.out.println("Sending file lines in batches of " + batchSize + " as messages."); + + for (int i = 0; i < lines.size(); i += batchSize) { + List messageBatch = new ArrayList<>(); + + for (int j = i; j < Math.min(i + batchSize, lines.size()); j++) { + String line = lines.get(j); + if (line == null || line.trim().isEmpty()) { + continue; // Skip empty lines. + } + + Map attributes = new HashMap<>(); + attributes.put("path", MessageAttributeValue.builder() + .dataType("String") + .stringValue(filePath.toString()) + .build()); + attributes.put("line", MessageAttributeValue.builder() + .dataType("String") + .stringValue(String.valueOf(j)) + .build()); + + messageBatch.add(new MessageEntry(lines.get(j), attributes)); + } + + sendMessages(queueUrl, messageBatch); + System.out.print("."); + System.out.flush(); + } + + System.out.println("\nDone. Sent " + lines.size() + " messages."); + + // Receive and process messages. + System.out.println("Receiving, handling, and deleting messages in batches of " + batchSize + "."); + String[] receivedLines = new String[lines.size()]; + boolean moreMessages = true; + + while (moreMessages) { + List receivedMessages = receiveMessages(queueUrl, batchSize, 5); + System.out.print("."); + System.out.flush(); + + for (Message message : receivedMessages) { + int lineNumber = Integer.parseInt(message.messageAttributes().get("line").stringValue()); + receivedLines[lineNumber] = message.body(); + } + + if (!receivedMessages.isEmpty()) { + deleteMessages(queueUrl, receivedMessages); + } else { + moreMessages = false; + } + } + + System.out.println("\nDone."); + + // Verify all lines were received correctly. + boolean allLinesMatch = true; + for (int i = 0; i < lines.size(); i++) { + String originalLine = lines.get(i); + String receivedLine = receivedLines[i] == null ? "" : receivedLines[i]; + + if (!originalLine.equals(receivedLine)) { + allLinesMatch = false; + break; + } + } + + if (allLinesMatch) { + System.out.println("Successfully reassembled all file lines!"); + } else { + System.out.println("Uh oh, some lines were missed!"); + } + + } catch (IOException e) { + LOGGER.error("Error reading file", e); + } finally { + // Clean up by deleting the queue. + DeleteQueueRequest deleteQueueRequest = DeleteQueueRequest.builder() + .queueUrl(queueUrl) + .build(); + sqsClient.deleteQueue(deleteQueueRequest); + System.out.println("Deleted queue: " + queueUrl); + } + + System.out.println("Thanks for watching!"); + System.out.println("-".repeat(88)); + } + + private static String createQueue(String queueName) { + CreateQueueRequest createRequest = CreateQueueRequest.builder() + .queueName(queueName) + .build(); + return sqsClient.createQueue(createRequest).queueUrl(); + } + + private static void deleteQueue(String queueUrl) { + DeleteQueueRequest deleteQueueRequest = DeleteQueueRequest.builder() + .queueUrl(queueUrl) + .build(); + sqsClient.deleteQueue(deleteQueueRequest); + } + } +// snippet-end:[sqs.java2.sendRecvBatch.scenario] +// snippet-end:[sqs.java2.sendRecvBatch.main] \ No newline at end of file diff --git a/javav2/example_code/sqs/src/main/java/com/example/sqs/SimpleProducerConsumer.java b/javav2/example_code/sqs/src/main/java/com/example/sqs/SimpleProducerConsumer.java new file mode 100644 index 00000000000..e84dc178c71 --- /dev/null +++ b/javav2/example_code/sqs/src/main/java/com/example/sqs/SimpleProducerConsumer.java @@ -0,0 +1,594 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +// snippet-start:[sqs.java2.batch_demo.main] +package com.example.sqs; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.batchmanager.SqsAsyncBatchManager; +import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; +import software.amazon.awssdk.services.sqs.model.DeleteMessageResponse; +import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageResponse; +import software.amazon.awssdk.core.exception.SdkException; + +import java.math.BigInteger; +import java.util.List; +import java.util.Random; +import java.util.Scanner; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Demonstrates the AWS SDK for Java 2.x Automatic Request Batching API for Amazon SQS. + * + * This example showcases the high-level SqsAsyncBatchManager library that provides + * efficient batching and buffering for SQS operations. The batch manager offers + * methods that directly mirror SqsAsyncClient methods—sendMessage, changeMessageVisibility, + * deleteMessage, and receiveMessage—making it a drop-in replacement with minimal code changes. + * + * Key features of the SqsAsyncBatchManager: + * - Automatic batching: The SDK automatically buffers individual requests and sends them + * as batches when maxBatchSize (default: 10) or sendRequestFrequency (default: 200ms) + * thresholds are reached + * - Familiar API: Method signatures match SqsAsyncClient exactly, requiring no learning curve + * - Background optimization: The batch manager maintains internal buffers and handles + * batching logic transparently + * - Asynchronous operations: All methods return CompletableFuture for non-blocking execution + * + * Performance benefits demonstrated: + * - Reduced API calls: Multiple individual requests are consolidated into single batch operations + * - Lower costs: Fewer API calls result in reduced SQS charges + * - Higher throughput: Batch operations process more messages per second + * - Efficient resource utilization: Fewer network round trips and better connection reuse + * + * This example compares: + * 1. Single-message operations using SqsAsyncClient directly + * 2. Batch operations using SqsAsyncBatchManager with identical method calls + * + * Usage patterns: + * - Set batch size to 1 to use SqsAsyncClient for baseline performance measurement + * - Set batch size > 1 to use SqsAsyncBatchManager for optimized batch processing + * - Monitor real-time throughput metrics to observe performance improvements + * + * Prerequisites: + * - AWS SDK for Java 2.x version 2.28.0 or later + * - An existing SQS queue + * - Valid AWS credentials configured + * + * The program displays real-time metrics showing the dramatic performance difference + * between individual operations and automatic batching. + */ +public class SimpleProducerConsumer { + + // The maximum runtime of the program. + private final static int MAX_RUNTIME_MINUTES = 60; + private final static Logger log = LoggerFactory.getLogger(SimpleProducerConsumer.class); + + /** + * Runs the SQS batching demonstration with user-configured parameters. + * + * Prompts for queue name, thread counts, batch size, message size, and runtime. + * Creates producer and consumer threads to demonstrate batching performance. + * + * @param args command line arguments (not used) + * @throws InterruptedException if thread operations are interrupted + */ + public static void main(String[] args) throws InterruptedException { + + final Scanner input = new Scanner(System.in); + + System.out.print("Enter the queue name: "); + final String queueName = input.nextLine(); + + System.out.print("Enter the number of producers: "); + final int producerCount = input.nextInt(); + + System.out.print("Enter the number of consumers: "); + final int consumerCount = input.nextInt(); + + System.out.print("Enter the number of messages per batch: "); + final int batchSize = input.nextInt(); + + System.out.print("Enter the message size in bytes: "); + final int messageSizeByte = input.nextInt(); + + System.out.print("Enter the run time in minutes: "); + final int runTimeMinutes = input.nextInt(); + + // Create SQS async client and batch manager for all operations. + // The SqsAsyncBatchManager is created from the SqsAsyncClient using the + // batchManager() factory method, which provides default batching configuration. + // This high-level library automatically handles request buffering and batching + // while maintaining the same method signatures as SqsAsyncClient. + final SqsAsyncClient sqsAsyncClient = SqsAsyncClient.create(); + final SqsAsyncBatchManager batchManager = sqsAsyncClient.batchManager(); + + final String queueUrl = sqsAsyncClient.getQueueUrl(GetQueueUrlRequest.builder() + .queueName(queueName) + .build()).join().queueUrl(); + + // The flag used to stop producer, consumer, and monitor threads. + final AtomicBoolean stop = new AtomicBoolean(false); + + // Start the producers. + final AtomicInteger producedCount = new AtomicInteger(); + final Thread[] producers = new Thread[producerCount]; + for (int i = 0; i < producerCount; i++) { + if (batchSize == 1) { + producers[i] = new Producer(sqsAsyncClient, queueUrl, messageSizeByte, + producedCount, stop); + } else { + producers[i] = new BatchProducer(batchManager, queueUrl, batchSize, + messageSizeByte, producedCount, stop); + } + producers[i].start(); + } + + // Start the consumers. + final AtomicInteger consumedCount = new AtomicInteger(); + final Thread[] consumers = new Thread[consumerCount]; + for (int i = 0; i < consumerCount; i++) { + if (batchSize == 1) { + consumers[i] = new Consumer(sqsAsyncClient, queueUrl, consumedCount, stop); + } else { + consumers[i] = new BatchConsumer(batchManager, queueUrl, batchSize, + consumedCount, stop); + } + consumers[i].start(); + } + + // Start the monitor thread. + final Thread monitor = new Monitor(producedCount, consumedCount, stop); + monitor.start(); + + // Wait for the specified amount of time then stop. + Thread.sleep(TimeUnit.MINUTES.toMillis(Math.min(runTimeMinutes, + MAX_RUNTIME_MINUTES))); + stop.set(true); + + // Join all threads. + for (int i = 0; i < producerCount; i++) { + producers[i].join(); + } + + for (int i = 0; i < consumerCount; i++) { + consumers[i].join(); + } + + monitor.interrupt(); + monitor.join(); + + // Close resources + batchManager.close(); + sqsAsyncClient.close(); + } + + /** + * Creates a random string of approximately the specified size in bytes. + * + * @param sizeByte the target size in bytes for the generated string + * @return a random string encoded in base-32 + */ + private static String makeRandomString(int sizeByte) { + final byte[] bs = new byte[(int) Math.ceil(sizeByte * 5 / 8)]; + new Random().nextBytes(bs); + bs[0] = (byte) ((bs[0] | 64) & 127); + return new BigInteger(bs).toString(32); + } + + /** + * Sends messages individually using SqsAsyncClient for baseline performance measurement. + * + * This producer demonstrates traditional single-message operations without batching. + * Each sendMessage() call results in a separate API request to SQS, providing + * a performance baseline for comparison with the batch operations. + * + * The sendMessage() method signature is identical to SqsAsyncBatchManager.sendMessage(), + * showing how the high-level batching library maintains API compatibility while + * adding automatic optimization behind the scenes. + */ + private static class Producer extends Thread { + final SqsAsyncClient sqsAsyncClient; + final String queueUrl; + final AtomicInteger producedCount; + final AtomicBoolean stop; + final String theMessage; + + /** + * Creates a producer thread for single-message operations. + * + * @param sqsAsyncClient the SQS client for sending messages + * @param queueUrl the URL of the target queue + * @param messageSizeByte the size of messages to generate + * @param producedCount shared counter for tracking sent messages + * @param stop shared flag to signal thread termination + */ + Producer(SqsAsyncClient sqsAsyncClient, String queueUrl, int messageSizeByte, + AtomicInteger producedCount, AtomicBoolean stop) { + this.sqsAsyncClient = sqsAsyncClient; + this.queueUrl = queueUrl; + this.producedCount = producedCount; + this.stop = stop; + this.theMessage = makeRandomString(messageSizeByte); + } + + /** + * Continuously sends messages until the stop flag is set. + * + * Uses SqsAsyncClient.sendMessage() directly, resulting in one API call per message. + * This approach provides baseline performance metrics for comparison with batching. + * Each call blocks until the individual message is sent, demonstrating traditional + * one-request-per-operation behavior. + */ + public void run() { + try { + while (!stop.get()) { + sqsAsyncClient.sendMessage(SendMessageRequest.builder() + .queueUrl(queueUrl) + .messageBody(theMessage) + .build()).join(); + producedCount.incrementAndGet(); + } + } catch (SdkException | java.util.concurrent.CompletionException e) { + // Handle both SdkException and CompletionException from async operations. + // If this unlikely condition occurs, stop. + log.error("Producer: " + e.getMessage()); + System.exit(1); + } + } + } + + /** + * Sends messages using SqsAsyncBatchManager for automatic request batching and optimization. + * + * This producer demonstrates the AWS SDK for Java 2.x high-level batching library. + * The SqsAsyncBatchManager automatically buffers individual sendMessage() calls and + * sends them as batches when thresholds are reached: + * - maxBatchSize: Maximum 10 messages per batch (default) + * - sendRequestFrequency: 200ms timeout before sending partial batches (default) + * + * Key advantages of the batching approach: + * - Identical API: batchManager.sendMessage() has the same signature as sqsAsyncClient.sendMessage() + * - Automatic optimization: No code changes needed to benefit from batching + * - Transparent buffering: The SDK handles batching logic internally + * - Reduced API calls: Multiple messages sent in single batch requests + * - Lower costs: Fewer API calls result in reduced SQS charges + * - Higher throughput: Batch operations process significantly more messages per second + */ + private static class BatchProducer extends Thread { + final SqsAsyncBatchManager batchManager; + final String queueUrl; + final int batchSize; + final AtomicInteger producedCount; + final AtomicBoolean stop; + final String theMessage; + + /** + * Creates a producer thread for batch operations. + * + * @param batchManager the batch manager for efficient message sending + * @param queueUrl the URL of the target queue + * @param batchSize the number of messages to send per batch + * @param messageSizeByte the size of messages to generate + * @param producedCount shared counter for tracking sent messages + * @param stop shared flag to signal thread termination + */ + BatchProducer(SqsAsyncBatchManager batchManager, String queueUrl, int batchSize, + int messageSizeByte, AtomicInteger producedCount, + AtomicBoolean stop) { + this.batchManager = batchManager; + this.queueUrl = queueUrl; + this.batchSize = batchSize; + this.producedCount = producedCount; + this.stop = stop; + this.theMessage = makeRandomString(messageSizeByte); + } + + /** + * Continuously sends batches of messages using the high-level batching library. + * + * Notice how batchManager.sendMessage() uses the exact same method signature + * and request builder pattern as SqsAsyncClient.sendMessage(). This demonstrates + * the drop-in replacement capability of the SqsAsyncBatchManager. + * + * The SDK automatically: + * - Buffers individual sendMessage() calls internally + * - Groups them into batch requests when thresholds are met + * - Sends SendMessageBatchRequest operations to SQS + * - Returns individual CompletableFuture responses for each message + * + * This transparent batching provides significant performance improvements + * without requiring changes to application logic or error handling patterns. + */ + public void run() { + try { + while (!stop.get()) { + // Send multiple messages using the high-level batch manager. + // Each batchManager.sendMessage() call uses identical syntax to + // sqsAsyncClient.sendMessage(), demonstrating API compatibility. + // The SDK automatically buffers these calls and sends them as + // batch operations when maxBatchSize (10) or sendRequestFrequency (200ms) + // thresholds are reached, significantly improving throughput. + for (int i = 0; i < batchSize; i++) { + CompletableFuture future = batchManager.sendMessage( + SendMessageRequest.builder() + .queueUrl(queueUrl) + .messageBody(theMessage) + .build()); + + // Handle the response asynchronously + future.whenComplete((response, throwable) -> { + if (throwable == null) { + producedCount.incrementAndGet(); + } else if (!(throwable instanceof java.util.concurrent.CancellationException) && + !(throwable.getMessage() != null && throwable.getMessage().contains("executor not accepting a task"))) { + log.error("BatchProducer: Failed to send message", throwable); + } + // Ignore CancellationException and executor shutdown errors - expected during shutdown + }); + } + + // Small delay to allow batching to occur + Thread.sleep(10); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("BatchProducer interrupted: " + e.getMessage()); + } catch (SdkException | java.util.concurrent.CompletionException e) { + log.error("BatchProducer: " + e.getMessage()); + System.exit(1); + } + } + } + + /** + * Receives and deletes messages individually using SqsAsyncClient for baseline measurement. + * + * This consumer demonstrates traditional single-message operations without batching. + * Each receiveMessage() and deleteMessage() call results in separate API requests, + * providing a performance baseline for comparison with batch operations. + * + * The method signatures are identical to SqsAsyncBatchManager methods: + * - receiveMessage() matches batchManager.receiveMessage() + * - deleteMessage() matches batchManager.deleteMessage() + * + * This API consistency allows easy migration to the high-level batching library. + */ + private static class Consumer extends Thread { + final SqsAsyncClient sqsAsyncClient; + final String queueUrl; + final AtomicInteger consumedCount; + final AtomicBoolean stop; + + /** + * Creates a consumer thread for single-message operations. + * + * @param sqsAsyncClient the SQS client for receiving messages + * @param queueUrl the URL of the source queue + * @param consumedCount shared counter for tracking processed messages + * @param stop shared flag to signal thread termination + */ + Consumer(SqsAsyncClient sqsAsyncClient, String queueUrl, AtomicInteger consumedCount, + AtomicBoolean stop) { + this.sqsAsyncClient = sqsAsyncClient; + this.queueUrl = queueUrl; + this.consumedCount = consumedCount; + this.stop = stop; + } + + /** + * Continuously receives and deletes messages using traditional single-request operations. + * + * Uses SqsAsyncClient methods directly: + * - receiveMessage(): One API call per receive operation + * - deleteMessage(): One API call per delete operation + * + * This approach demonstrates the baseline performance without batching optimization. + * Compare these method calls with the identical signatures used in BatchConsumer + * to see how the high-level batching library maintains API compatibility. + */ + public void run() { + try { + while (!stop.get()) { + try { + final ReceiveMessageResponse result = sqsAsyncClient.receiveMessage( + ReceiveMessageRequest.builder() + .queueUrl(queueUrl) + .build()).join(); + + if (!result.messages().isEmpty()) { + final Message m = result.messages().get(0); + // Note: deleteMessage() signature identical to batchManager.deleteMessage() + sqsAsyncClient.deleteMessage(DeleteMessageRequest.builder() + .queueUrl(queueUrl) + .receiptHandle(m.receiptHandle()) + .build()).join(); + consumedCount.incrementAndGet(); + } + } catch (SdkException | java.util.concurrent.CompletionException e) { + log.error(e.getMessage()); + } + } + } catch (SdkException | java.util.concurrent.CompletionException e) { + // Handle both SdkException and CompletionException from async operations. + // If this unlikely condition occurs, stop. + log.error("Consumer: " + e.getMessage()); + System.exit(1); + } + } + } + + /** + * Receives and deletes messages using SqsAsyncBatchManager for automatic optimization. + * + * This consumer demonstrates the AWS SDK for Java 2.x high-level batching library + * for message consumption. The SqsAsyncBatchManager provides two key optimizations: + * + * 1. Receive optimization: Maintains an internal buffer of messages fetched in the + * background, so receiveMessage() calls return immediately from the buffer + * 2. Delete batching: Automatically buffers deleteMessage() calls and sends them + * as DeleteMessageBatchRequest operations when thresholds are reached + * + * Key features: + * - Identical API: receiveMessage() and deleteMessage() have the same signatures + * as SqsAsyncClient methods, making this a true drop-in replacement + * - Background fetching: The batch manager continuously fetches messages to keep + * the internal buffer populated, reducing receive latency + * - Automatic delete batching: Individual deleteMessage() calls are buffered and + * sent as batch operations (up to 10 per batch, 200ms frequency) + * - Transparent optimization: No application logic changes needed to benefit + * + * Performance benefits: + * - Reduced API calls through automatic batching of delete operations + * - Lower latency for receives due to background message buffering + * - Higher overall throughput with fewer network round trips + */ + private static class BatchConsumer extends Thread { + final SqsAsyncBatchManager batchManager; + final String queueUrl; + final int batchSize; + final AtomicInteger consumedCount; + final AtomicBoolean stop; + + /** + * Creates a consumer thread for batch operations. + * + * @param batchManager the batch manager for efficient message processing + * @param queueUrl the URL of the source queue + * @param batchSize the maximum number of messages to receive per batch + * @param consumedCount shared counter for tracking processed messages + * @param stop shared flag to signal thread termination + */ + BatchConsumer(SqsAsyncBatchManager batchManager, String queueUrl, int batchSize, + AtomicInteger consumedCount, AtomicBoolean stop) { + this.batchManager = batchManager; + this.queueUrl = queueUrl; + this.batchSize = batchSize; + this.consumedCount = consumedCount; + this.stop = stop; + } + + /** + * Continuously receives and deletes messages using the high-level batching library. + * + * Demonstrates the key advantage of SqsAsyncBatchManager: identical method signatures + * with automatic optimization. Notice how: + * + * - batchManager.receiveMessage() uses the same syntax as sqsAsyncClient.receiveMessage() + * - batchManager.deleteMessage() uses the same syntax as sqsAsyncClient.deleteMessage() + * + * Behind the scenes, the batch manager: + * 1. Maintains an internal message buffer populated by background fetching + * 2. Returns messages immediately from the buffer (reduced latency) + * 3. Automatically batches deleteMessage() calls into DeleteMessageBatchRequest operations + * 4. Sends batch deletes when maxBatchSize (10) or sendRequestFrequency (200ms) is reached + * + * This provides significant performance improvements with zero code changes + * compared to traditional SqsAsyncClient usage patterns. + */ + public void run() { + try { + while (!stop.get()) { + // Receive messages using the high-level batch manager. + // This call uses identical syntax to sqsAsyncClient.receiveMessage() + // but benefits from internal message buffering for improved performance. + final ReceiveMessageResponse result = batchManager.receiveMessage( + ReceiveMessageRequest.builder() + .queueUrl(queueUrl) + .maxNumberOfMessages(Math.min(batchSize, 10)) + .build()).join(); + + if (!result.messages().isEmpty()) { + final List messages = result.messages(); + + // Delete messages using the batch manager. + // Each deleteMessage() call uses identical syntax to SqsAsyncClient + // but the SDK automatically buffers these calls and sends them + // as DeleteMessageBatchRequest operations for optimal performance. + for (Message message : messages) { + CompletableFuture future = batchManager.deleteMessage( + DeleteMessageRequest.builder() + .queueUrl(queueUrl) + .receiptHandle(message.receiptHandle()) + .build()); + + future.whenComplete((response, throwable) -> { + if (throwable == null) { + consumedCount.incrementAndGet(); + } else if (!(throwable instanceof java.util.concurrent.CancellationException) && + !(throwable.getMessage() != null && throwable.getMessage().contains("executor not accepting a task"))) { + log.error("BatchConsumer: Failed to delete message", throwable); + } + // Ignore CancellationException and executor shutdown errors - expected during shutdown + }); + } + } + + // Small delay to prevent tight polling + Thread.sleep(10); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("BatchConsumer interrupted: " + e.getMessage()); + } catch (SdkException | java.util.concurrent.CompletionException e) { + // Handle both SdkException and CompletionException from async operations. + // If this unlikely condition occurs, stop. + log.error("BatchConsumer: " + e.getMessage()); + System.exit(1); + } + } + } + + /** + * Displays real-time throughput statistics every second. + * + * This thread logs the current count of produced and consumed messages + * to help you monitor the performance comparison. + */ + private static class Monitor extends Thread { + private final AtomicInteger producedCount; + private final AtomicInteger consumedCount; + private final AtomicBoolean stop; + + /** + * Creates a monitoring thread that displays throughput statistics. + * + * @param producedCount shared counter for messages sent + * @param consumedCount shared counter for messages processed + * @param stop shared flag to signal thread termination + */ + Monitor(AtomicInteger producedCount, AtomicInteger consumedCount, + AtomicBoolean stop) { + this.producedCount = producedCount; + this.consumedCount = consumedCount; + this.stop = stop; + } + + /** + * Logs throughput statistics every second until stopped. + * + * Displays the current count of produced and consumed messages + * to help monitor the performance comparison between batching strategies. + */ + public void run() { + try { + while (!stop.get()) { + Thread.sleep(1000); + log.info("produced messages = " + producedCount.get() + + ", consumed messages = " + consumedCount.get()); + } + } catch (InterruptedException e) { + // Allow the thread to exit. + } + } + } +} +// snippet-end:[sqs.java2.batch_demo.main] \ No newline at end of file diff --git a/javav2/example_code/sqs/src/test/java/com/example/sqs/SendRecvBatchTest.java b/javav2/example_code/sqs/src/test/java/com/example/sqs/SendRecvBatchTest.java new file mode 100644 index 00000000000..e62e7d73053 --- /dev/null +++ b/javav2/example_code/sqs/src/test/java/com/example/sqs/SendRecvBatchTest.java @@ -0,0 +1,139 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.example.sqs; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.*; + +import java.util.*; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.*; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestMethodOrder; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +public class SendRecvBatchTest { + private static final Logger logger = LoggerFactory.getLogger(SendRecvBatchTest.class); + private static final SqsClient sqsClient = SqsClient.create(); + private String queueUrl = ""; + + @BeforeEach + void setUp() { + String queueName = "SendRecvBatch-queue-" + UUID.randomUUID().toString().replace("-", "").substring(0, 20); + queueUrl = sqsClient.createQueue(b -> b.queueName(queueName)).queueUrl(); + logger.info("Created test queue: {}", queueUrl); + } + + @AfterEach + void tearDown() { + sqsClient.deleteQueue(b -> b.queueUrl(queueUrl)); + logger.info("Deleted test queue: {}", queueUrl); + } + + private static Stream sendMessageBatchTestData() { + return Stream.of( + Arguments.of(List.of( + new SendRecvBatch.MessageEntry("Message 1", Collections.emptyMap()), + new SendRecvBatch.MessageEntry("Message 2", Collections.emptyMap()) + )), + Arguments.of(List.of( + new SendRecvBatch.MessageEntry("Message with attributes", Map.of( + "type", MessageAttributeValue.builder() + .stringValue("test") + .dataType("String") + .build() + )) + )) + ); + } + + @ParameterizedTest + @MethodSource("sendMessageBatchTestData") + @Order(1) + void testSendMessages(List messages) { + logger.info("Testing send messages with {} messages", messages.size()); + SendMessageBatchResponse response = SendRecvBatch.sendMessages(queueUrl, messages); + assertEquals(messages.size(), response.successful().size()); + logger.info("Successfully sent {} messages", response.successful().size()); + } + + @Test + @Order(2) + void testReceiveMessages() { + logger.info("Testing receive messages"); + // First send some messages + List messages = List.of( + new SendRecvBatch.MessageEntry("Test message 1", Collections.emptyMap()), + new SendRecvBatch.MessageEntry("Test message 2", Collections.emptyMap()) + ); + SendRecvBatch.sendMessages(queueUrl, messages); + logger.info("Sent {} messages for receive test", messages.size()); + + List receivedMessages = SendRecvBatch.receiveMessages(queueUrl, 10, 5); + assertFalse(receivedMessages.isEmpty()); + logger.info("Received {} messages", receivedMessages.size()); + } + + @Test + @Order(3) + void testDeleteMessages() { + logger.info("Testing delete messages"); + // First send and receive messages + List messages = List.of( + new SendRecvBatch.MessageEntry("Test message", Collections.emptyMap()) + ); + SendRecvBatch.sendMessages(queueUrl, messages); + logger.info("Sent {} messages for delete test", messages.size()); + + List receivedMessages = SendRecvBatch.receiveMessages(queueUrl, 10, 5); + assertFalse(receivedMessages.isEmpty()); + logger.info("Received {} messages to delete", receivedMessages.size()); + + DeleteMessageBatchResponse response = SendRecvBatch.deleteMessages(queueUrl, receivedMessages); + assertEquals(receivedMessages.size(), response.successful().size()); + logger.info("Successfully deleted {} messages", response.successful().size()); + } + + @Test + @Order(4) + void testMessageEntry() { + logger.info("Testing MessageEntry with attributes"); + Map attributes = Map.of( + "test", MessageAttributeValue.builder() + .stringValue("value") + .dataType("String") + .build() + ); + + SendRecvBatch.MessageEntry entry = new SendRecvBatch.MessageEntry("Test body", attributes); + + assertEquals("Test body", entry.getBody()); + assertEquals(attributes, entry.getAttributes()); + logger.info("MessageEntry test passed with body: {}", entry.getBody()); + } + + @Test + @Order(5) + void testMessageEntryWithNullAttributes() { + logger.info("Testing MessageEntry with null attributes"); + SendRecvBatch.MessageEntry entry = new SendRecvBatch.MessageEntry("Test body", null); + + assertEquals("Test body", entry.getBody()); + assertNotNull(entry.getAttributes()); + assertTrue(entry.getAttributes().isEmpty()); + logger.info("MessageEntry null attributes test passed"); + } +} diff --git a/javav2/example_code/sqs/src/test/java/com/example/sqs/SimpleProducerConsumerIntegrationTest.java b/javav2/example_code/sqs/src/test/java/com/example/sqs/SimpleProducerConsumerIntegrationTest.java new file mode 100644 index 00000000000..7446a6f07a8 --- /dev/null +++ b/javav2/example_code/sqs/src/test/java/com/example/sqs/SimpleProducerConsumerIntegrationTest.java @@ -0,0 +1,257 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package com.example.sqs; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.Logger; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.appender.AbstractAppender; +import org.apache.logging.log4j.core.config.Property; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; +import software.amazon.awssdk.services.sqs.model.DeleteQueueRequest; +import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Integration test for SimpleProducerConsumer that verifies the program works end-to-end. + * + * This test creates a temporary SQS queue and captures Log4J2 messages to verify message processing. + */ +public class SimpleProducerConsumerIntegrationTest { + + private String testQueueName; + private SqsAsyncClient sqsClient; + private String queueUrl; + private TestAppender testAppender; + private Logger logger; + + /** + * Custom Log4J2 appender to capture log events for testing + */ + private static class TestAppender extends AbstractAppender { + private final List logEvents = new ArrayList<>(); + + protected TestAppender() { + super("TestAppender", null, null, true, Property.EMPTY_ARRAY); + } + + @Override + public void append(LogEvent event) { + logEvents.add(event.toImmutable()); + } + + public List getLogEvents() { + return new ArrayList<>(logEvents); + } + + public void clear() { + logEvents.clear(); + } + } + + @BeforeEach + void setUp() { + // Generate unique queue name for each test + testQueueName = "test-queue-" + System.currentTimeMillis() + "-" + Thread.currentThread().getId(); + + // Create SQS client and test queue + sqsClient = SqsAsyncClient.create(); + + // Create queue + sqsClient.createQueue(CreateQueueRequest.builder() + .queueName(testQueueName) + .build()).join(); + + // Get queue URL + queueUrl = sqsClient.getQueueUrl(GetQueueUrlRequest.builder() + .queueName(testQueueName) + .build()).join().queueUrl(); + + // Set up log capture + LoggerContext context = (LoggerContext) LogManager.getContext(false); + logger = context.getLogger(SimpleProducerConsumer.class.getName()); + testAppender = new TestAppender(); + testAppender.start(); + logger.addAppender(testAppender); + logger.setLevel(Level.INFO); + } + + @AfterEach + void tearDown() { + // Clean up log appender + if (logger != null && testAppender != null) { + logger.removeAppender(testAppender); + testAppender.stop(); + } + + // Clean up queue + if (sqsClient != null && queueUrl != null) { + try { + sqsClient.deleteQueue(DeleteQueueRequest.builder() + .queueUrl(queueUrl) + .build()).join(); + } catch (Exception e) { + // Ignore cleanup errors + } + sqsClient.close(); + } + } + + + + /** + * Tests that the SimpleProducerConsumer program executes successfully with single-message operations. + * + * Verifies that: + * - The program accepts user input and starts without errors + * - At least one message is produced by the producer thread + * - Monitor thread logs message counts showing program activity + * - Program completes within the expected timeframe + * + * Uses configuration: 1 producer, 1 consumer, batch size 1, 100-byte messages, 1-minute runtime + */ + @Test + void testSimpleProducerConsumerExecutesSuccessfully() throws Exception { + // Simulate user input: queueName + producers + consumers + batchSize + messageSize + runtime + String simulatedInput = testQueueName + "\n1\n1\n1\n100\n1\n"; + + InputStream originalSystemIn = System.in; + + try { + System.setIn(new ByteArrayInputStream(simulatedInput.getBytes())); + + // Run the program in a separate thread with timeout + CompletableFuture programExecution = CompletableFuture.runAsync(() -> { + try { + SimpleProducerConsumer.main(new String[]{}); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + // Wait for program to complete or timeout after 70 seconds (1 minute runtime + 10 second buffer) + final int TIMEOUT_SECONDS = 70; + programExecution.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + + // Verify log messages were captured + List logEvents = testAppender.getLogEvents(); + assertFalse(logEvents.isEmpty(), "Expected log messages to be captured"); + + // Parse monitor log messages to extract message counts + MessageCounts counts = parseMessageCounts(logEvents); + + // Verify that messages were actually processed + assertTrue(counts.foundMessages, "Expected to find monitor log messages"); + assertTrue(counts.maxProduced > 0, "Expected messages to be produced, but got: " + counts.maxProduced); + assertTrue(counts.maxConsumed >= 0, "Expected non-negative consumed count, but got: " + counts.maxConsumed); + + System.out.println("Test passed - Produced: " + counts.maxProduced + ", Consumed: " + counts.maxConsumed); + + } finally { + System.setIn(originalSystemIn); + } + } + + /** + * Tests that the SimpleProducerConsumer program works correctly with batch operations. + * + * Verifies that: + * - The program can handle multiple producers and consumers concurrently + * - Batch operations (batch size > 1) produce messages successfully + * - Multiple threads coordinate properly without conflicts + * - Batching configuration is processed correctly + * + * Uses configuration: 2 producers, 2 consumers, batch size 5, 200-byte messages, 1-minute runtime + */ + @Test + void testMessageProcessingWithBatching() throws Exception { + // Simulate user input: queueName + producers + consumers + batchSize + messageSize + runtime + String simulatedInput = testQueueName + "\n2\n2\n5\n200\n1\n"; + + InputStream originalSystemIn = System.in; + + try { + System.setIn(new ByteArrayInputStream(simulatedInput.getBytes())); + + CompletableFuture programExecution = CompletableFuture.runAsync(() -> { + try { + SimpleProducerConsumer.main(new String[]{}); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + final int TIMEOUT_SECONDS = 70; + programExecution.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + + // Verify batching produced messages + List logEvents = testAppender.getLogEvents(); + MessageCounts counts = parseMessageCounts(logEvents); + + assertTrue(counts.maxProduced > 0, "Expected messages to be produced with batching"); + System.out.println("Batching test passed - Produced: " + counts.maxProduced + " messages"); + + } finally { + System.setIn(originalSystemIn); + } + } + + /** + * Helper method to parse message counts from log events. + * + * @param logEvents the log events to parse + * @return MessageCounts object containing parsed counts and status + */ + private MessageCounts parseMessageCounts(List logEvents) { + Pattern messagePattern = Pattern.compile("produced messages = (\\d+), consumed messages = (\\d+)"); + int maxProduced = 0; + int maxConsumed = 0; + boolean foundMessages = false; + + for (LogEvent event : logEvents) { + String message = event.getMessage().getFormattedMessage(); + Matcher matcher = messagePattern.matcher(message); + if (matcher.find()) { + foundMessages = true; + int produced = Integer.parseInt(matcher.group(1)); + int consumed = Integer.parseInt(matcher.group(2)); + maxProduced = Math.max(maxProduced, produced); + maxConsumed = Math.max(maxConsumed, consumed); + } + } + + return new MessageCounts(maxProduced, maxConsumed, foundMessages); + } + + /** + * Data class to hold parsed message count results. + */ + private static class MessageCounts { + final int maxProduced; + final int maxConsumed; + final boolean foundMessages; + + MessageCounts(int maxProduced, int maxConsumed, boolean foundMessages) { + this.maxProduced = maxProduced; + this.maxConsumed = maxConsumed; + this.foundMessages = foundMessages; + } + } +} \ No newline at end of file