AWS CDK 101 - Using batched DynamoDB streams to delete items on another DynamoDB table
Table of contents
- Benefits in this approach
- Planning
- Construction
- New handler function logic
- Helper function dynamodb deleteItem
- Minor changes to the dynamodb table definition
- Defining the new lambda
- Grant permission for the handler to write to the table
- Adding Event source to the lambda function
- Sample dynamodb stream object
- Console log during execution
Beginners new to AWS CDK, please do look at my previous articles one by one in this series.
In case you missed my previous article, you can find it at the links below.
Original previous post at Dev Post
Reposted previous post at dev.to @aravindvcyber
In this article, let us add a new asynchronous batch integration to our messages DynamoDB table, which will help us delete the processed records from the staging DynamoDB table.
Benefits in this approach
- In this approach, we clear the staging table data asynchronously, using the DynamoDB stream to directly invoke a lambda.
- So we can use this in systems where we cannot afford to do this scavenging synchronously inside a compute-intensive workload, saving compute by avoiding wait time on these I/O calls.
- Again, some may argue that making a separate lambda call elsewhere could be thought of as a good decoupling strategy as well.
- But invocation charges and compute still apply there. One more thing to note is that the stream records can be read in batches by our lambda; the default batch size is 100, so that is a single invocation.
- At the same time, the handler can delete the data from DynamoDB with batch requests.
- So not only can you read a batch of stream records, you can also perform batch deletes, which are fast: the delete operations are chunked to the maximum of 25 items per request, which gives a great reduction in the number of I/O operations from the lambda (see the sketch after this list).
- Maybe I will write a separate article about that later; for now, we will stick to the 1:1 delete operation here.
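As referenced in the list above, here is a hedged sketch of what such a chunked batch delete could look like. It is not part of this article's code; the helper name batchDeleteKeys and the aws-sdk v2 client usage are assumptions for illustration only.

// Hypothetical helper (not used in this article): delete stream keys in
// BatchWriteItem chunks, since BatchWriteItem accepts at most 25 requests per call.
import { DynamoDB } from "aws-sdk";

const dynamo = new DynamoDB();

const batchDeleteKeys = async (tableName: string, keys: DynamoDB.Key[]) => {
  for (let i = 0; i < keys.length; i += 25) {
    const chunk = keys.slice(i, i + 25);
    const result = await dynamo
      .batchWriteItem({
        RequestItems: {
          [tableName]: chunk.map((Key) => ({ DeleteRequest: { Key } })),
        },
      })
      .promise();
    // Production code should also retry anything left in result.UnprocessedItems.
    console.log(JSON.stringify(result, undefined, 2));
  }
};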
Planning
Here we will make use of DynamoDB streams to trigger the deleteItem action by invoking a new lambda handler function, which achieves our objective.
Construction
We need new lambda function code with a fresh handler to receive the DynamoDB stream event object and process it, as shown below.
Create a new lambda function in the file lambda/message-stream.ts.
Here you may find that we only act on records whose event name is INSERT; likewise, we can have finer control over our desired outcome during these stream invocations, as shown below.
New handler function logic
// Handler invoked with a batch of DynamoDB stream records.
exports.created = async function (event: any) {
  console.log("Received stream:", JSON.stringify(event, undefined, 2));
  const results: any[] = [];
  await Promise.all(
    event.Records.map(async (Record: any) => {
      console.log(JSON.stringify(Record, undefined, 2));
      // Only act on newly inserted items; ignore MODIFY and REMOVE records.
      if (Record.eventName === "INSERT") {
        results.push(await deleteDbItem(Record.dynamodb.Keys));
      }
    })
  );
  results.map((res) => console.log(res));
};
Helper function dynamodb deleteItem
A simple helper function to perform deleteItem on a DynamoDB table.
// aws-sdk v2 is bundled with the NODEJS_14_X runtime.
import { DynamoDB } from "aws-sdk";
import { DeleteItemInput } from "aws-sdk/clients/dynamodb";

const dynamo = new DynamoDB();

// Delete a single item from the staging table, keyed by the stream record's Keys.
const deleteDbItem: any = async (keys: any) => {
  console.log("Deleting: ", { keys });
  const deleteData: DeleteItemInput = {
    TableName: process.env.STAGING_MESSAGES_TABLE_NAME || "",
    Key: keys,
    ReturnConsumedCapacity: "TOTAL",
  };
  console.log("deleteItem: ", JSON.stringify(deleteData, undefined, 2));
  return await dynamo.deleteItem(deleteData).promise();
};
Minor changes to the dynamodb table definition
I have highlighted the necessary changes we need to enable DynamoDB stream generation for our table.
Most importantly, I have requested both the new and old images, which will contain all the necessary data; however, we are not going to use all of it. This is only for demonstration purposes.
const messages = new dynamodb.Table(this, "MessagesTable", {
tableName: process.env.messagesTable,
sortKey: { name: "createdAt", type: dynamodb.AttributeType.NUMBER },
partitionKey: { name: "messageId", type: dynamodb.AttributeType.STRING },
encryption: dynamodb.TableEncryption.AWS_MANAGED,
readCapacity: 5,
writeCapacity: 5,
//New item added below
stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES
});
Defining the new lambda
Here we will use the handler code above to provision our lambda inside our master stack, as shown below.
const messageStreamFunc = new lambda.Function(this, "messageStreamFunc", {
runtime: lambda.Runtime.NODEJS_14_X,
code: lambda.Code.fromAsset("lambda"),
handler: "message-stream.created",
logRetention: logs.RetentionDays.ONE_MONTH,
tracing: Tracing.ACTIVE,
layers: [nodejsUtils, nodejsXray],
environment: {
STAGING_MESSAGES_TABLE_NAME: envParams.messages.stgTableName || "",
},
});
messageStreamFunc.applyRemovalPolicy(RemovalPolicy.DESTROY);
Grant permission for the handler to write to the table
Also, our handler must have sufficient access to delete items from the other DynamoDB table, stgMessages. Note that grantWriteData includes the dynamodb:DeleteItem permission.
stgMessages.grantWriteData(messageStreamFunc);
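If you prefer least privilege over the broader write grant, the CDK Table construct also exposes a generic grant method. A minimal sketch granting only the delete action the handler actually uses:

// Hypothetical narrower alternative to grantWriteData above.
stgMessages.grant(messageStreamFunc, "dynamodb:DeleteItem");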
Adding Event source to the lambda function
It is time now to connect the handler function to the DynamoDB event source as follows.
messageStreamFunc.addEventSource(
new DynamoEventSource(messages, {
startingPosition: lambda.StartingPosition.LATEST,
})
)
Or you can tune the batch size and the batching window in seconds for longer polling, as follows.
messageStreamFunc.addEventSource(
  new DynamoEventSource(messages, {
    startingPosition: lambda.StartingPosition.LATEST,
    batchSize: 100,
    maxBatchingWindow: Duration.seconds(60),
  })
);
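Since the handler only acts on INSERT records, you could also filter at the event source so the lambda is not invoked for other event types at all. This is a sketch assuming a CDK release that supports filter criteria on the DynamoEventSource:

// Hypothetical variant: deliver only INSERT records to the handler.
messageStreamFunc.addEventSource(
  new DynamoEventSource(messages, {
    startingPosition: lambda.StartingPosition.LATEST,
    batchSize: 100,
    maxBatchingWindow: Duration.seconds(60),
    filters: [
      lambda.FilterCriteria.filter({
        eventName: lambda.FilterRule.isEqual("INSERT"),
      }),
    ],
  })
);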
Sample dynamodb stream object
I have shared below the DynamoDB stream object used as the payload to invoke our handler lambda.
{
  "Records": [
    {
      "eventID": "5c9aa5395f970324e088a32578ee0a66",
      "eventName": "INSERT",
      "eventVersion": "1.1",
      "eventSource": "aws:dynamodb",
      "awsRegion": "ap-south-1",
      "dynamodb": {
        "ApproximateCreationDateTime": 1652355068,
        "Keys": {
          "createdAt": {
            "N": "1652355039000"
          },
          "messageId": {
            "S": "47d97141-01e7-42a1-b6b3-0c59a6a3827e"
          }
        },
        "NewImage": {
          "createdAt": {
            "N": "1652355039000"
          },
          "messageId": {
            "S": "47d97141-01e7-42a1-b6b3-0c59a6a3827e"
          },
          "event": {
            "S": "{\n \"message\": {\"new\": \"A secret message\"}\n}"
          }
        },
        "SequenceNumber": "20927100000000035334874026",
        "SizeBytes": 173,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:ap-south-1:8888888:table/MessagesTable/stream/2022-05-*****"
    }
  ]
}
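As an aside, if you want stronger typing than any for this payload, the @types/aws-lambda package ships DynamoDBStreamEvent and DynamoDBRecord types matching the shape above. A minimal hypothetical variant of the handler, reusing the deleteDbItem helper from earlier:

// Hypothetical typed variant of the handler, assuming @types/aws-lambda is installed.
import { DynamoDBStreamEvent, DynamoDBRecord } from "aws-lambda";

exports.created = async function (event: DynamoDBStreamEvent) {
  const results = await Promise.all(
    event.Records.filter(
      (record: DynamoDBRecord) => record.eventName === "INSERT"
    ).map((record: DynamoDBRecord) => deleteDbItem(record.dynamodb?.Keys))
  );
  results.forEach((res) => console.log(res));
};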
Console log during execution
Finally, post-execution, we can see the above JSON payload received in the event object, which is then used to delete from our staging table; you may find the results in the CloudWatch logs below.
In the next article, we will demonstrate how to use a similar approach to delete an object from S3 that we have previously created.
We will be adding more connections to our stack and making it more usable in the upcoming articles by creating new constructs, so do consider following and subscribing to my newsletter.
We have our next article in serverless, do check it out.
Thanks for supporting!
It would be great if you would like to Buy Me a Coffee, to help boost my efforts.
Original post at Dev Post
Reposted at dev.to @aravindvcyber