添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

Large Messages

The large message utility handles SQS and SNS messages which have had their payloads offloaded to S3 if they are larger than the maximum allowed size (256 KB).

Notice

The large message utility (available in the powertools-sqs module for versions v1.16.1 and earlier) is now deprecated and replaced by the powertools-large-messages described in this page. You can still get the documentation here and the migration guide here .

Features

  • Automatically retrieve the content of S3 objects when SQS or SNS messages have been offloaded to S3.
  • Automatically delete the S3 Objects after processing succeeds.
  • Compatible with the batch module (with SQS).
  • Background

    stateDiagram-v2
        direction LR
        Function : Lambda Function
        state Application {
            direction TB
            sendMsg: sendMessage(QueueUrl, MessageBody)
            extendLib: extended-client-lib
            [*] --> sendMsg
            sendMsg --> extendLib
            state extendLib {
                state if_big <<choice>>
                bigMsg: MessageBody > 256KB ?
                putObject: putObject(S3Bucket, S3Key, Body)
                updateMsg: Update MessageBody<br>with a pointer to S3<br>and add a message attribute
                bigMsg --> if_big
                if_big --> [*]: size(body) <= 256kb
                if_big --> putObject: size(body) > 256kb
                putObject --> updateMsg
                updateMsg --> [*]
        state Function {
            direction TB
            iterateMsgs: Iterate over messages
            ptLargeMsg: powertools-large-messages
            [*] --> Handler
            Handler --> iterateMsgs
            iterateMsgs --> ptLargeMsg
            state ptLargeMsg {
                state if_pointer <<choice>>
                pointer: Message attribute <br>for large message ?
                normalMsg: Small message,<br>body left unchanged
                getObject: getObject(S3Pointer)
                deleteObject: deleteObject(S3Pointer)
                updateBody: Update message body<br>with content from S3 object<br>and remove message attribute
                updateMD5: Update MD5 of the body<br>and attributes (SQS only)
                yourcode: <b>YOUR CODE HERE!</b>
                pointer --> if_pointer
                if_pointer --> normalMsg : False
                normalMsg --> [*]
                if_pointer --> getObject : True
                getObject --> updateBody
                updateBody --> updateMD5
                updateMD5 --> yourcode
                yourcode --> deleteObject
                deleteObject --> [*]
        [*] --> Application
        Application --> Function : Lambda Invocation
        Function --> [*]
    

    SQS and SNS message payload is limited to 256KB. If you wish to send messages with a larger payload, you can leverage the amazon-sqs-java-extended-client-lib or amazon-sns-java-extended-client-lib which offload the message to Amazon S3. See documentation (SQS / SNS)

    When offloaded to S3, the message contains a specific message attribute and the payload only contains a pointer to the S3 object (bucket and object key).

    This utility automatically retrieves messages which have been offloaded to S3 using the extended client libraries. Once a message's payload has been processed successfully, the utility deletes the payload from S3.

    This utility is compatible with versions 1.1.0+ of amazon-sqs-java-extended-client-lib and 1.0.0+ of amazon-sns-java-extended-client-lib.

    Install

    Depending on your version of Java (either Java 1.8 or 11+), the configuration slightly changes.

    <dependency> <groupId>software.amazon.lambda</groupId> <artifactId>powertools-large-messages</artifactId> <version>1.18.0</version> </dependency> </dependencies> <!-- configure the aspectj-maven-plugin to compile-time weave (CTW) the aws-lambda-powertools-java aspects into your project --> <build> <plugins> <plugin> <groupId>dev.aspectj</groupId> <artifactId>aspectj-maven-plugin</artifactId> <version>1.13.1</version> <configuration> <source>11</source> <!-- or higher --> <target>11</target> <!-- or higher --> <complianceLevel>11</complianceLevel> <!-- or higher --> <aspectLibraries> <aspectLibrary> <groupId>software.amazon.lambda</groupId> <artifactId>powertools-large-messages</artifactId> </aspectLibrary> </aspectLibraries> </configuration> <executions> <execution> <goals> <goal>compile</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
    40
    <dependencies>
        <dependency>
            <groupId>software.amazon.lambda</groupId>
            <artifactId>powertools-large-messages</artifactId>
            <version>1.18.0</version>
        </dependency>
    </dependencies>
    <!-- configure the aspectj-maven-plugin to compile-time weave (CTW) the aws-lambda-powertools-java aspects into your project -->
    <build>
        <plugins>
            <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>aspectj-maven-plugin</artifactId>
                 <version>1.14.0</version>
                 <configuration>
                     <source>1.8</source>
                     <target>1.8</target>
                     <complianceLevel>1.8</complianceLevel>
                     <aspectLibraries>
                         <aspectLibrary>
                             <groupId>software.amazon.lambda</groupId>
                             <artifactId>powertools-large-messages</artifactId>
                         </aspectLibrary>
                     </aspectLibraries>
                 </configuration>
                 <executions>
                     <execution>
                         <goals>
                             <goal>compile</goal>
                         </goals>
                     </execution>
                 </executions>
            </plugin>
        </plugins>
    </build>
    15
        plugins {
            id 'java'
            id 'io.freefair.aspectj.post-compile-weaving' version '8.1.0'
        repositories {
            mavenCentral()
        dependencies {
            aspect 'software.amazon.lambda:powertools-large-messages:1.18.0'
        sourceCompatibility = 11 // or higher
        targetCompatibility = 11 // or higher
    15
        plugins {
            id 'java'
            id 'io.freefair.aspectj.post-compile-weaving' version '6.6.3'
        repositories {
            mavenCentral()
        dependencies {
            aspect 'software.amazon.lambda:powertools-large-messages:1.18.0'
        sourceCompatibility = 1.8
        targetCompatibility = 1.8
    

    Permissions

    As the utility interacts with Amazon S3, the lambda function must have the following permissions on the S3 bucket used for the large messages offloading:

  • s3:GetObject
  • s3:DeleteObject
  • Annotation

    The annotation @LargeMessage can be used on any method where the first parameter is one of:

  • SQSEvent.SQSMessage
  • SNSEvent.SNSRecord
  • 17
    import software.amazon.lambda.powertools.largemessages.LargeMessage;
    public class SqsMessageHandler implements RequestHandler<SQSEvent, SQSBatchResponse> {
        @Override
        public SQSBatchResponse handleRequest(SQSEvent event, Context context) {
            for (SQSMessage message: event.getRecords()) {
                processRawMessage(message, context);
            return SQSBatchResponse.builder().build();
        @LargeMessage
        private void processRawMessage(SQSEvent.SQSMessage sqsMessage, Context context) {
            // sqsMessage.getBody() will contain the content of the S3 object
    15
    import software.amazon.lambda.powertools.largemessages.LargeMessage;
    public class SnsRecordHandler implements RequestHandler<SNSEvent, String> {
        @Override
        public String handleRequest(SNSEvent event, Context context) {
            processSNSRecord(event.records.get(0)); // there are always only one message 
            return "Hello World";
        @LargeMessage
        private void processSNSRecord(SNSEvent.SNSRecord snsRecord) {
            // snsRecord.getSNS().getMessage() will contain the content of the S3 object
    

    When the Lambda function is invoked with a SQS or SNS event, the utility first checks if the content was offloaded to S3. In the case of a large message, there is a message attribute specifying the size of the offloaded message and the message contains a pointer to the S3 object.

    If this is the case, the utility will retrieve the object from S3 using the getObject(bucket, key) API, and place the content of the object in the message payload. You can then directly use the content of the message. If there was an error during the S3 download, the function will fail with a LargeMessageProcessingException.

    After your code is invoked and returns without error, the object is deleted from S3 using the deleteObject(bucket, key) API. You can disable the deletion of S3 objects with the following configuration:

    4
    @LargeMessage(deleteS3Object = false)
    private void processRawMessage(SQSEvent.SQSMessage sqsMessage) {
        // do something with the message
    

    Use together with batch module

    This utility works perfectly together with the batch module (powertools-batch), especially for SQS:

    Combining batch and large message modules
     1
    19
    public class SqsBatchHandler implements RequestHandler<SQSEvent, SQSBatchResponse> {
        private final BatchMessageHandler<SQSEvent, SQSBatchResponse> handler;
        public SqsBatchHandler() {
            handler = new BatchMessageHandlerBuilder()
                    .withSqsBatchHandler()
                    .buildWithRawMessageHandler(this::processMessage);
        @Override
        public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
            return handler.processBatch(sqsEvent, context);
        @LargeMessage
        private void processMessage(SQSEvent.SQSMessage sqsMessage) {
            // do something with the message
    

    Use together with idempotency module

    This utility also works together with the idempotency module (powertools-idempotency). You can add both the @LargeMessage and @Idempotent annotations, in any order, to the same method. The @Idempotent takes precedence over the @LargeMessage annotation. It means Idempotency module will use the initial raw message (containing the S3 pointer) and not the large message.

    Combining idempotency and large message modules
     1
    28
    public class SqsBatchHandler implements RequestHandler<SQSEvent, SQSBatchResponse> {
        public SqsBatchHandler() {
            Idempotency.config().withConfig(
                        IdempotencyConfig.builder()
                                .withEventKeyJMESPath("body") // get the body of the message for the idempotency key
                                .build())
                .withPersistenceStore(
                        DynamoDBPersistenceStore.builder()
                                .withTableName(System.getenv("IDEMPOTENCY_TABLE"))
                                .build()
                ).configure();
        @Override
        public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
            for (SQSMessage message: event.getRecords()) {
                processRawMessage(message, context);
            return SQSBatchResponse.builder().build();
        @Idempotent
        @LargeMessage
        private String processRawMessage(@IdempotencyKey SQSEvent.SQSMessage sqsMessage, Context context) {
            // do something with the message
    

    Customizing S3 client configuration

    To interact with S3, the utility creates a default S3 Client :

    4
    S3Client client = S3Client.builder()
                        .httpClient(UrlConnectionHttpClient.builder().build())
                        .region(Region.of(System.getenv(AWS_REGION_ENV)))
                        .build();
    

    If you need to customize this S3Client, you can leverage the LargeMessageConfig singleton:

    19
    import software.amazon.lambda.powertools.largemessages.LargeMessage;
    public class SnsRecordHandler implements RequestHandler<SNSEvent, String> {
        public SnsRecordHandler() {
            LargeMessageConfig.init().withS3Client(/* put your custom S3Client here */);
        @Override
        public String handleRequest(SNSEvent event, Context context) {
            processSNSRecord(event.records.get(0)); 
            return "Hello World";
        @LargeMessage
        private void processSNSRecord(SNSEvent.SNSRecord snsRecord) {
            // snsRecord.getSNS().getMessage() will contain the content of the S3 object
    

    Migration from the SQS Large Message utility

  • Replace the dependency in maven / gradle: powertools-sqs ==> powertools-large-messages
  • Replace the annotation: @SqsLargeMessage ==> @LargeMessage (the new module handles both SQS and SNS)
  • Move the annotation away from the Lambda handleRequest method and put it on a method with SQSEvent.SQSMessage or SNSEvent.SNSRecord as first parameter.
  • The annotation now handles a single message, contrary to the previous version that was handling the complete batch. It gives more control, especially when dealing with partial failures with SQS (see the batch module).
  • The new module only provides an annotation, an equivalent to the SqsUtils class is not available anymore in this new version.
  • Finally, if you are still using the powertools-sqs library for batch processing, consider moving to powertools-batch at the same time to remove the dependency on this library completely; it has been deprecated and will be removed in v2.