Skip to content

Commit 8e6f6e1

Browse files
committed
1 parent 9a9e028 commit 8e6f6e1

File tree

5 files changed

+433
-0
lines changed

5 files changed

+433
-0
lines changed

.doc_gen/metadata/sqs_metadata.yaml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1211,3 +1211,22 @@ sqs_Scenario_UseJMS:
12111211
- javav2/example_code/sqs-jms/src/main/java/com/example/sqs/jms/SqsJmsExampleUtils.java
12121212
services:
12131213
sqs: {CreateQueue, DeleteQueue}
1214+
sqs_Scenario_SqsExtendedClient:
1215+
title: Manage large &SQS; messages using &S3; with an &AWS; SDK
1216+
title_abbrev: Manage large messages using S3
1217+
synopsis: use the Amazon SQS Extended Client Library to work with large &SQS; messages.
1218+
category: Scenarios
1219+
languages:
1220+
Java:
1221+
versions:
1222+
- sdk_version: 2
1223+
github: javav2/example_code/sqs
1224+
sdkguide: AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-s3-messages.html
1225+
excerpts:
1226+
- description:
1227+
snippet_tags:
1228+
- sqs.java2.sqs-extended-client.main
1229+
services:
1230+
sqs: {SendMessage, ReceiveMessage}
1231+
s3: {CreateBucket, PutBucketLifecycleConfiguration}
1232+

javav2/example_code/sqs/README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ Code examples that show you how to accomplish a specific task by calling multipl
5454
functions within the same service.
5555

5656
- [Create and publish to a FIFO topic](../sns/src/main/java/com/example/sns/PriceUpdateExample.java)
57+
- [Manage large messages using S3](src/main/java/com/example/sqs/SqsExtendedClientExample.java)
5758
- [Process S3 event notifications](../s3/src/main/java/com/example/s3/ProcessS3EventNotification.java)
5859
- [Publish messages to queues](../../usecases/topics_and_queues/src/main/java/com/example/sns/SNSWorkflow.java)
5960
- [Use the Amazon SQS Java Messaging Library to work with the JMS interface](../sqs-jms/src/main/java/com/example/sqs/jms/stdqueue/TextMessageSender.java)
@@ -89,6 +90,18 @@ This example shows you how to create and publish to a FIFO Amazon SNS topic.
8990
<!--custom.scenarios.sns_PublishFifoTopic.start-->
9091
<!--custom.scenarios.sns_PublishFifoTopic.end-->
9192

93+
#### Manage large messages using S3
94+
95+
This example shows you how to use the Amazon SQS Extended Client Library to work with large Amazon SQS messages.
96+
97+
98+
<!--custom.scenario_prereqs.sqs_Scenario_SqsExtendedClient.start-->
99+
<!--custom.scenario_prereqs.sqs_Scenario_SqsExtendedClient.end-->
100+
101+
102+
<!--custom.scenarios.sqs_Scenario_SqsExtendedClient.start-->
103+
<!--custom.scenarios.sqs_Scenario_SqsExtendedClient.end-->
104+
92105
#### Process S3 event notifications
93106

94107
This example shows you how to work with S3 event notifications in an object-oriented way.

javav2/example_code/sqs/pom.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@
5959
<groupId>software.amazon.awssdk</groupId>
6060
<artifactId>secretsmanager</artifactId>
6161
</dependency>
62+
<dependency>
63+
<groupId>com.amazonaws</groupId>
64+
<artifactId>amazon-sqs-java-extended-client-lib</artifactId>
65+
<version>2.1.1</version>
66+
</dependency>
6267
<dependency>
6368
<groupId>com.google.code.gson</groupId>
6469
<artifactId>gson</artifactId>
@@ -109,6 +114,12 @@
109114
<groupId>org.apache.logging.log4j</groupId>
110115
<artifactId>log4j-1.2-api</artifactId>
111116
</dependency>
117+
<!-- Joda Time dependency -->
118+
<dependency>
119+
<groupId>joda-time</groupId>
120+
<artifactId>joda-time</artifactId>
121+
<version>2.12.6</version>
122+
</dependency>
112123
<dependency>
113124
<groupId>org.mockito</groupId>
114125
<artifactId>mockito-core</artifactId>
Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package com.example.sqs;
5+
// snippet-start:[sqs.java2.sqs-extended-client.main]
6+
import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient;
7+
import com.amazon.sqs.javamessaging.ExtendedClientConfiguration;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
import org.joda.time.DateTime;
11+
import org.joda.time.format.DateTimeFormat;
12+
import software.amazon.awssdk.services.s3.S3Client;
13+
import software.amazon.awssdk.services.s3.model.BucketLifecycleConfiguration;
14+
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
15+
import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
16+
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
17+
import software.amazon.awssdk.services.s3.model.ExpirationStatus;
18+
import software.amazon.awssdk.services.s3.model.LifecycleExpiration;
19+
import software.amazon.awssdk.services.s3.model.LifecycleRule;
20+
import software.amazon.awssdk.services.s3.model.LifecycleRuleFilter;
21+
import software.amazon.awssdk.services.s3.model.ListObjectVersionsRequest;
22+
import software.amazon.awssdk.services.s3.model.ListObjectVersionsResponse;
23+
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
24+
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
25+
import software.amazon.awssdk.services.s3.model.PutBucketLifecycleConfigurationRequest;
26+
import software.amazon.awssdk.services.sqs.SqsClient;
27+
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
28+
import software.amazon.awssdk.services.sqs.model.CreateQueueResponse;
29+
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
30+
import software.amazon.awssdk.services.sqs.model.DeleteQueueRequest;
31+
import software.amazon.awssdk.services.sqs.model.Message;
32+
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
33+
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
34+
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
35+
36+
import java.util.Arrays;
37+
import java.util.List;
38+
import java.util.UUID;
39+
40+
/**
41+
* Example of using Amazon SQS Extended Client Library for Java 2.x.
42+
*/
43+
public class SqsExtendedClientExample {
44+
private static final Logger logger = LoggerFactory.getLogger(SqsExtendedClientExample.class);
45+
46+
private String s3BucketName;
47+
private String queueUrl;
48+
private final String queueName;
49+
private final S3Client s3Client;
50+
private final SqsClient sqsExtendedClient;
51+
private final int messageSize;
52+
53+
/**
54+
* Constructor with default clients and message size.
55+
*/
56+
public SqsExtendedClientExample() {
57+
this(S3Client.create(), 300000);
58+
}
59+
60+
/**
61+
* Constructor with custom S3 client and message size.
62+
*
63+
* @param s3Client The S3 client to use
64+
* @param messageSize The size of the test message to create
65+
*/
66+
public SqsExtendedClientExample(S3Client s3Client, int messageSize) {
67+
this.s3Client = s3Client;
68+
this.messageSize = messageSize;
69+
70+
// Generate a unique bucket name.
71+
this.s3BucketName = UUID.randomUUID() + "-" +
72+
DateTimeFormat.forPattern("yyMMdd-hhmmss").print(new DateTime());
73+
74+
// Generate a unique queue name.
75+
this.queueName = "MyQueue-" + UUID.randomUUID();
76+
77+
// Configure the SQS extended client.
78+
final ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration()
79+
.withPayloadSupportEnabled(s3Client, s3BucketName);
80+
81+
this.sqsExtendedClient = new AmazonSQSExtendedClient(SqsClient.builder().build(), extendedClientConfig);
82+
}
83+
84+
public static void main(String[] args) {
85+
SqsExtendedClientExample example = new SqsExtendedClientExample();
86+
try {
87+
example.setup();
88+
example.sendAndReceiveMessage();
89+
} finally {
90+
example.cleanup();
91+
}
92+
}
93+
94+
/**
95+
* Send a large message and receive it back.
96+
*
97+
* @return The received message
98+
*/
99+
public Message sendAndReceiveMessage() {
100+
try {
101+
// Create a large message.
102+
char[] chars = new char[messageSize];
103+
Arrays.fill(chars, 'x');
104+
String largeMessage = new String(chars);
105+
106+
// Send the message.
107+
final SendMessageRequest sendMessageRequest = SendMessageRequest.builder()
108+
.queueUrl(queueUrl)
109+
.messageBody(largeMessage)
110+
.build();
111+
112+
sqsExtendedClient.sendMessage(sendMessageRequest);
113+
logger.info("Sent message of size: {}", largeMessage.length());
114+
115+
// Receive and return the message.
116+
final ReceiveMessageResponse receiveMessageResponse = sqsExtendedClient.receiveMessage(
117+
ReceiveMessageRequest.builder().queueUrl(queueUrl).build());
118+
119+
List<Message> messages = receiveMessageResponse.messages();
120+
if (messages.isEmpty()) {
121+
throw new RuntimeException("No messages received");
122+
}
123+
124+
Message message = messages.getFirst();
125+
logger.info("\nMessage received.");
126+
logger.info(" ID: {}", message.messageId());
127+
logger.info(" Receipt handle: {}", message.receiptHandle());
128+
logger.info(" Message body size: {}", message.body().length());
129+
logger.info(" Message body (first 5 characters): {}", message.body().substring(0, 5));
130+
131+
return message;
132+
} catch (RuntimeException e) {
133+
logger.error("Error during message processing: {}", e.getMessage(), e);
134+
throw e;
135+
}
136+
}
137+
// snippet-end:[sqs.java2.sqs-extended-client.main]
138+
/**
139+
* Set up the S3 bucket and SQS queue.
140+
*/
141+
public void setup() {
142+
try {
143+
// Create and configure the S3 bucket.
144+
createAndConfigureS3Bucket();
145+
146+
// Create the SQS queue.
147+
createSqsQueue();
148+
} catch (RuntimeException e) {
149+
logger.error("Error during setup: {}", e.getMessage(), e);
150+
cleanup(); // Clean up any resources that were created before the error
151+
throw e;
152+
}
153+
}
154+
155+
/**
156+
* Clean up all AWS resources
157+
*/
158+
public void cleanup() {
159+
try {
160+
// Delete the queue if it was created
161+
if (queueUrl != null) {
162+
sqsExtendedClient.deleteQueue(DeleteQueueRequest.builder().queueUrl(queueUrl).build());
163+
logger.info("Deleted the queue: {}", queueUrl);
164+
queueUrl = null;
165+
}
166+
167+
// Delete the S3 bucket and its contents if it was created
168+
if (s3BucketName != null) {
169+
deleteBucketAndAllContents();
170+
logger.info("Deleted the bucket: {}", s3BucketName);
171+
s3BucketName = null;
172+
}
173+
} catch (RuntimeException e) {
174+
logger.error("Error during cleanup: {}", e.getMessage(), e);
175+
}
176+
}
177+
178+
/**
179+
* Create and configure the S3 bucket with lifecycle rules
180+
*/
181+
private void createAndConfigureS3Bucket() {
182+
final LifecycleRule lifeCycleRule = LifecycleRule.builder()
183+
.expiration(LifecycleExpiration.builder().days(14).build())
184+
.filter(LifecycleRuleFilter.builder().prefix("").build())
185+
.status(ExpirationStatus.ENABLED)
186+
.build();
187+
188+
final BucketLifecycleConfiguration lifecycleConfig = BucketLifecycleConfiguration.builder()
189+
.rules(lifeCycleRule)
190+
.build();
191+
192+
s3Client.createBucket(CreateBucketRequest.builder().bucket(s3BucketName).build());
193+
s3Client.putBucketLifecycleConfiguration(PutBucketLifecycleConfigurationRequest.builder()
194+
.bucket(s3BucketName)
195+
.lifecycleConfiguration(lifecycleConfig)
196+
.build());
197+
198+
logger.info("Bucket created and configured: {}", s3BucketName);
199+
}
200+
201+
/**
202+
* Create the SQS queue
203+
*/
204+
private void createSqsQueue() {
205+
final CreateQueueResponse createQueueResponse = sqsExtendedClient.createQueue(
206+
CreateQueueRequest.builder().queueName(queueName).build());
207+
queueUrl = createQueueResponse.queueUrl();
208+
logger.info("Queue created: {}", queueUrl);
209+
}
210+
211+
/**
212+
* Delete the message from the SQS queue
213+
*
214+
* @param message The message to delete
215+
*/
216+
public void deleteMessage(Message message) {
217+
sqsExtendedClient.deleteMessage(
218+
DeleteMessageRequest.builder()
219+
.queueUrl(queueUrl)
220+
.receiptHandle(message.receiptHandle())
221+
.build());
222+
223+
logger.info("Deleted the message: {}", message.messageId());
224+
}
225+
226+
/**
227+
* Delete the S3 bucket and all its contents
228+
*/
229+
private void deleteBucketAndAllContents() {
230+
ListObjectsV2Response listObjectsResponse = s3Client.listObjectsV2(
231+
ListObjectsV2Request.builder().bucket(s3BucketName).build());
232+
233+
listObjectsResponse.contents().forEach(object -> {
234+
s3Client.deleteObject(DeleteObjectRequest.builder()
235+
.bucket(s3BucketName)
236+
.key(object.key())
237+
.build());
238+
logger.info("Deleted S3 object: {}", object.key());
239+
});
240+
241+
ListObjectVersionsResponse listVersionsResponse = s3Client.listObjectVersions(
242+
ListObjectVersionsRequest.builder().bucket(s3BucketName).build());
243+
244+
listVersionsResponse.versions().forEach(version -> s3Client.deleteObject(DeleteObjectRequest.builder()
245+
.bucket(s3BucketName)
246+
.key(version.key())
247+
.versionId(version.versionId())
248+
.build()));
249+
250+
s3Client.deleteBucket(DeleteBucketRequest.builder().bucket(s3BucketName).build());
251+
}
252+
253+
/**
254+
* Get the S3 bucket name
255+
*
256+
* @return The S3 bucket name
257+
*/
258+
public String getS3BucketName() {
259+
return s3BucketName;
260+
}
261+
262+
/**
263+
* Get the SQS queue URL
264+
*
265+
* @return The SQS queue URL
266+
*/
267+
public String getQueueUrl() {
268+
return queueUrl;
269+
}
270+
271+
/**
272+
* Get the SQS queue name
273+
*
274+
* @return The SQS queue name
275+
*/
276+
public String getQueueName() {
277+
return queueName;
278+
}
279+
280+
}

0 commit comments

Comments
 (0)