Follow

Follow
๐Ÿฌ AWS CDK 101 - ๐Ÿ”ฌ Adding Queue to buffer our stepfunction directly invoking lambda

๐Ÿฌ AWS CDK 101 - ๐Ÿ”ฌ Adding Queue to buffer our stepfunction directly invoking lambda

Aravind V's photo
Aravind V
ยทApr 18, 2022ยท

7 min read

Play this article

Table of contents

๐Ÿ”ฐ Beginners new to AWS CDK, please do look at my previous articles one by one in this series.

If in case missed my previous article, do find it with the below links.

๐Ÿ” Original previous post at ๐Ÿ”— Dev Post

๐Ÿ” Reposted previous post at ๐Ÿ”— dev to @aravindvcyber

In this article, let us refactor our previous stepfunction which directly invokes our lambda function by adding a similar queue that triggers lambda indirectly with a batch size limit. This could help us achieve better optimization in lambda concurrency like one in our previous article. scalable-event-driven-processing-using-eventbridge-and-sqs

Current flow โ˜˜๏ธ

statemachine workflow

Proposed ๐Ÿ€

statemachine workflow

Benefits achieved in this approach ๐Ÿšฃโ€โ™€๏ธ

There are multiple benefits associated with this approach as follows.

  • We can detach the direct invocation of our lambda by Stepfunction thereby triggering an indirect/buffered invocation from a standard SQS queue.

  • With this approach, we will be able to have the lambda concurrency limit at a minimum by batching the lambda triggers.

  • Also if you remember one thing our lambda is supposed to make a dynamodb write operation, which if running in high concurrency would need more write units while using our dynamodb table. With this approach, we can use maintain and utilize consistent write units in our table.

  • Besides this, we will be able to scale this easily in higher environments.

  • Also we can also make use of the dead letter queues for debugging and inspection.

Refining the queue used in our previous construct ๐Ÿ”–

Let us edit our previous construct construct/sfn-simple.ts by adding definitions for the dead letter queue and the working queue as follows.

Dead letter Queue ๐ŸŽพ

The dead letter queue needs to catch hold of the failed messages, so the working queue always has records to be processed.

 const sfnCommonEventProcessorQueueDLQ: DeadLetterQueue = {
      queue: new Queue(this, "sfnCommonEventProcessorQueueDLQ", {
        retentionPeriod: Duration.days(14),
        removalPolicy: RemovalPolicy.DESTROY,
        queueName: "sfnCommonEventProcessorQueueDLQ",
      }),
      maxReceiveCount: 100,
};

Standard Queue ๐ŸŠ

The standard queue can also be a FIFO queue based on the cost and business requirements as shown below.


const sfnCommonEventProcessorQueue = new Queue(
      this,
      "sfnCommonEventProcessorQueue",
      {
        retentionPeriod: Duration.days(5),
        removalPolicy: RemovalPolicy.DESTROY,
        deliveryDelay: Duration.seconds(3),
        queueName: "sfnCommonEventProcessorQueue",
        visibilityTimeout: Duration.minutes(100),
        deadLetterQueue: sfnCommonEventProcessorQueueDLQ,
      }
);

New sfn queues

New Job to push messages into our Queue ๐ŸŽฏ

Just like how we defined a new lambda invoke job, we can define a new job that will push a message into a queue that will be later polled by our existing lambda asynchronously.

You could also find that we have fully reused the payload, inputPath, resultPath, resultSelector, etc.

const recordingQueue = new tasks.SqsSendMessage(
      this,
      "Record using Queue",
      {
        inputPath: "$",
        messageBody: sfnTaskPayload,
        queue: sfnCommonEventProcessorQueue,
        comment: "Record message into dynamodb using SQS queue buffered lambda",
        integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
        resultSelector: {
          "Payload.$": "$",
          "StatusCode.$": "$.statusCode",
        },
        resultPath: "$.recordResult",
      }
);

Adding event source for the existing lambda โ›บ

Now we will connect our new queue to our existing lambda inside the same construct.

As you can see that we have set a batch size of the queue this is even though there is n number of state functions running concurrently. You may also use a FIFO queue if find that the order of messages is very important while recording and processing.

triggerFunction.addEventSource(
      new SqsEventSource(sfnCommonEventProcessorQueue, {
        batchSize: 2,
        maxBatchingWindow: Duration.seconds(10)
      })
    );

Here batchSize is the maximum number of records to retrieve per invocation and maxBatchingWindow defines the max time to wait before returning the records to lambda for processing. If not defined, the lambda returns immediately with as many records available under the batchSize.

lambda trigger

Callback Pattern ๐Ÿ‘“

Once again if you notice something, we have implemented the callback pattern, where we pause the execution of the stepfunction after we pushed the message to SQS and wait for lambda to post updates to the statemachine once it has done processing resulting in success or failure.

With this approach, we could understand where the actual work happening, and not keep polling for updates again and again until the result is ready.

callback pattern illustration โ™ฆ๏ธ

callback

callback pattern illustration when succeed ๐Ÿ’Ž

callback success

Changes in the lambda function ๐ŸŽผ

The current changes we have made look sufficient at first sight, but they will fail, this is because the payload lambda current gets will be different from the old invocation. This is because while SQS is polled by lambda, the actual message will be inside an object which contains an array of event records. And inside the event record, the body property will be giving the actual message, when we passed as payload in our previous article. So certainly we need to tweak our lambda a little bit as follows.

{
    "Records": [
        {
            "messageId": "ff11de7f-795a-4c85-8612-9d376e01df0d",
            "receiptHandle": "**********",
            "body": "{\"Record\":{\"createdAt\":\"2022-04-18T07:51:06Z\",\"messageId\":\"538a8436-9987-1f65-478d-815c77f00d0f\",\"event\":{\"message\":\"A secret message\"}},\"MyTaskToken\":\"**********\"}",
            "attributes": {
                "ApproximateReceiveCount": "3",
                "SentTimestamp": "1650268267274",
                "SenderId": "AROAYLZFFS6HZ3ESZEEBH",
                "ApproximateFirstReceiveTimestamp": "1650268270274"
            },
            "messageAttributes": {},
            "md5OfBody": "edddae585d76eb14439f5804dcef24a4",
            "eventSource": "aws:sqs",
            "eventSourceARN": "**********",
            "awsRegion": "ap-south-1"
        }
    ]
}

Rewriting the lambda code ๐Ÿ“

So, let us write the message recorder lambda function as shown below taking note of the above payload changes.

Let us think this gives us an additional step, rather this step helps us to process writing to lambda in a batch with records of messages during the polling process.

import { PutItemInput } from "aws-sdk/clients/dynamodb";
import { DynamoDB, StepFunctions } from "aws-sdk";
const sfn = new StepFunctions({ apiVersion: "2016-11-23" });
exports.processor = async function (event: any) {
  const dynamo = new DynamoDB();
  let result: any | undefined = undefined;
  await Promise.all(
    event.Records.map(async (Record: any) => {

      const msg = JSON.parse(Record.body).Record;
      const crt_time: number = new Date(msg.createdAt).getTime();
      const putData: PutItemInput = {
        TableName: process.env.MESSAGES_TABLE_NAME || "",
        Item: {
          messageId: { S: msg.messageId },
          createdAt: { N: `${crt_time}` },
          event: { S: JSON.stringify(msg.event) },
        },
        ReturnConsumedCapacity: "TOTAL",
      }; 
      try {
        result = await dynamo.putItem(putData).promise();
      } catch (err) {
        const sendFailure: StepFunctions.SendTaskFailureInput = {
          error: JSON.stringify(err),
          cause: JSON.stringify({
            statusCode: 500,
            headers: { "Content-Type": "text/json" },
            putStatus: {
              messageId: msg.messageId,
              ProcessorResult: err,
            },
          }),
          taskToken: JSON.parse(Record.body).MyTaskToken,
        };
        console.log(sendFailure);
        await sfn.sendTaskFailure(sendFailure, function (err: any, data: any) {
          if (err) console.log(err, err.stack); 
          else console.log(data); 
        });
        return sendFailure;
      }
      const sendSuccess: StepFunctions.SendTaskSuccessInput = {
        output: JSON.stringify({
          statusCode: 200,
          headers: { "Content-Type": "text/json" },
          putStatus: {
            messageId: msg.messageId,
            ProcessorResult: result,
          },
        }),
        taskToken: JSON.parse(Record.body).MyTaskToken,
      };

      console.log(sendSuccess);

      await sfn
        .sendTaskSuccess(sendSuccess, function (err: any, data: any) {
          if (err) console.log(err, err.stack); 
          else console.log(data); 
        })
        .promise();

      return sendSuccess;
    })
  );
};

lambda changes summary ๐Ÿ‹

exports.processor = async function (event: any) {
  const dynamo = new DynamoDB();
  let result: any | undefined = undefined;
  await Promise.all(
    event.Records.map(async (Record: any) => {  
      const msg = JSON.parse(Record.body).Record;
      /////rest the same like previous article
    })
  );
};

TaskToken changes ๐Ÿ†

Besides the taskToken has been also changed, inside the lambda from previous example to the below value.

Before

taskToken: Record.MyTaskToken,

After

taskToken: JSON.parse(Record.body).MyTaskToken,

Stepfunction Timeout ๐ŸŒผ

Note that the timeout set for the stepfunction will cancel and exit the execution if it is waiting for more time than the defined stepfunction timeout value.

b799d9d4-5c32-5f42-89bb-830315b023f7    INFO    TaskTimedOut: Task Timed Out: 'Provided task does not exist anymore'
    at Request.extractError (/var/runtime/node_modules/aws-sdk/lib/protocol/json.js:52:27)
    at Request.callListeners (/var/runtime/node_modules/aws-sdk/lib/sequential_executor.js:106:20)
    at Request.emit (/var/runtime/node_modules/aws-sdk/lib/sequential_executor.js:78:10)
    at Request.emit (/var/runtime/node_modules/aws-sdk/lib/request.js:686:14)
    at Request.transition (/var/runtime/node_modules/aws-sdk/lib/request.js:22:10)
    at AcceptorStateMachine.runTo (/var/runtime/node_modules/aws-sdk/lib/state_machine.js:14:12)
    at /var/runtime/node_modules/aws-sdk/lib/state_machine.js:26:10
    at Request.<anonymous> (/var/runtime/node_modules/aws-sdk/lib/request.js:38:9)
    at Request.<anonymous> (/var/runtime/node_modules/aws-sdk/lib/request.js:688:12)
    at Request.callListeners (/var/runtime/node_modules/aws-sdk/lib/sequential_executor.js:116:18) {
  code: 'TaskTimedOut',

Deploying our changes ๐Ÿช

While you deploy your changes you may get a prompt to accept the IAM change for granting the necessary privileges for sfn to send messages to SQS, similar to what we explicitly granted to lambda in our last article.

IAM policy changes

sqs to lambda policy

Sample execution log ๐ŸŒฐ

Execution log

Thus we have achieved the purpose of this article to make the stepfunction to push the messages as records into a queue, which is later polled by the lambda and processed further to return the status to the statemachine. Hence this architecture is much more scalable than the previous one with the direct invocation of the lambda function.

We will be adding more connections to our stack and making it more usable in the upcoming articles by creating new constructs, so do consider following and subscribing to my newsletter.

โญ We have our next article in serverless, do check out

๐ŸŽ‰ Thanks for supporting! ๐Ÿ™

Would be great if you like to โ˜• Buy Me a Coffee, to help boost my efforts.

๐Ÿ” Original post at ๐Ÿ”— Dev Post

๐Ÿ” Reposted at ๐Ÿ”— dev to @aravindvcyber

๐Ÿฌ AWS CDK 101 - ๐Ÿ”ฌ Adding Queue to buffer our stepfunction directly invoking lambda#typescript #serverless #messagequeue #awscdk #aws https://t.co/Fm785cuPf8

โ€” Aravind V (@Aravind_V7) April 19, 2022

Did you find this article valuable?

Support Aravind V by becoming a sponsor. Any amount is appreciated!

See recent sponsors |ย Learn more about Hashnode Sponsors
ย 
Share this