Table of contents
- Benefits of using S3 data
- Construction Plan
- Refactoring put operation inside the lambda into a helper function
- Refactoring the sendSuccess Helper
- Refactoring the sendFailure Helper
- Granting read Object access to lambda
- Changes inside the lambda
- Removing the event content from statemachine invocation payload
- Integration testing
- Keypoint async-await
Beginners new to AWS CDK, please go through my previous articles in this series one by one.
If you missed my previous article, you can find it at the links below.
Original previous post at Dev Post
Reposted previous post at dev to @aravindvcyber
In this article, let us refactor one of our previous step functions, which invoked a lambda that puts records into DynamoDB using the full message from the event data, so that the lambda instead reads the message data from S3 using the bucket and key carried in the event data. This is a direct follow-up to my previous article mentioned above.
Benefits of using S3 data
These are not the only benefits; they are simply what I observed in my own deployments.
Using S3 data keeps the event detail payload at a constant, very small size.
When a huge message is pumped into the API Gateway, S3 captures the full message payload and only the S3 object identifier is pushed into the event detail, so we are not pushing large payloads through the actual moving parts.
This also makes it simple to preprocess the received message, for example checking its format and data integrity, transforming it, or even scanning it for viruses.
S3 itself can trigger various event-driven actions asynchronously through its event notifications.
Pumping the event details into the step functions is now much more efficient, since we no longer need to worry about the size of the data (Step Functions limits each state's input and output to 256 KB).
Also, here we are using S3 as staging; if we need high performance and high throughput, we can switch to DynamoDB for storing this staging data.
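The size argument above is the claim-check pattern: the heavy body stays in S3 and only a pointer travels through the pipeline. A minimal sketch, assuming a hypothetical `toPointer` helper and the `uploads/<uuid>.json` key layout that the entry handler uses later in this article; `approxSize` counts JSON characters, which equals bytes only for ASCII content.

```typescript
// Step Functions limits state input/output to 256 KB (262,144 bytes).
const SFN_PAYLOAD_LIMIT = 262_144;

// Hypothetical pointer shape: only the S3 location travels through the pipeline.
interface StagedPointer {
  bucket: string;
  key: string;
}

// Approximate serialized size; for ASCII-only JSON, characters equal bytes.
function approxSize(value: unknown): number {
  return JSON.stringify(value).length;
}

// Claim check: the body stays in S3, the event carries a fixed-shape reference.
function toPointer(bucket: string, uuid: string): StagedPointer {
  return { bucket, key: `uploads/${uuid}.json` };
}

// A 300,000-character message would not fit inline in a state's input...
const bigMessage = { message: new Array(300_001).join("x") };
console.log(approxSize(bigMessage) > SFN_PAYLOAD_LIMIT); // true

// ...while the pointer stays the same small size however large the message is.
console.log(approxSize(toPointer("stg-msg-bucket", "1234"))); // 53
```

The pointer's size depends only on the bucket name and key length, never on the message, which is what keeps every downstream invocation light.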
Construction Plan
As mentioned earlier, we want our message recorder lambda function, which we refined in previous articles for writing to DynamoDB, to read the message JSON from S3. For that, the lambda needs an S3 client:
import { S3, DynamoDB, StepFunctions } from "aws-sdk";
const s3 = new S3();
const sfn = new StepFunctions();
Refactoring put operation inside the lambda into a helper function
We have refactored the previously used message recorder function for better reuse of the components as shown below.
// Writes one recorded message to DynamoDB and returns the SDK response (with ConsumedCapacity)
// so the caller can report it to the state machine. Record is kept for call-site compatibility.
const dbPut = async (Record: any, msg: any) => {
  const dynamo = new DynamoDB();
  const crt_time: number = new Date(msg.createdAt).getTime();
  const putData: DynamoDB.PutItemInput = {
    TableName: process.env.MESSAGES_TABLE_NAME || "",
    Item: {
      messageId: { S: msg.messageId },
      createdAt: { N: `${crt_time}` },
      event: { S: msg.event },
    },
    ReturnConsumedCapacity: "TOTAL",
  };
  console.log("putData", JSON.stringify(putData, undefined, 2));
  return await dynamo.putItem(putData).promise();
};
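The item that `dbPut` writes can also be built by a pure function, which makes the attribute mapping easy to test without DynamoDB. A minimal sketch, assuming hypothetical local types (`RecordedMessage`, `PutItemParams`) in place of the SDK's `DynamoDB.PutItemInput`; unlike the helper above, it rejects an unparsable `createdAt` instead of sending `"NaN"` as the numeric value.

```typescript
// Local stand-ins for the DynamoDB attribute value shapes used by dbPut
// (S = string, N = number sent as a string).
type AttributeValue = { S: string } | { N: string };

interface RecordedMessage {
  messageId: string;
  createdAt: string; // ISO-8601 timestamp, e.g. the EventBridge "time" field
  event: string; // raw JSON text read from S3
}

interface PutItemParams {
  TableName: string;
  Item: { [attribute: string]: AttributeValue };
  ReturnConsumedCapacity: "TOTAL";
}

// Hypothetical pure builder mirroring dbPut's putData.
function buildPutItem(msg: RecordedMessage, tableName: string): PutItemParams {
  const createdAtMs = new Date(msg.createdAt).getTime();
  if (isNaN(createdAtMs)) {
    // Fail early rather than storing "NaN" in the numeric attribute.
    throw new Error(`Invalid createdAt: ${msg.createdAt}`);
  }
  return {
    TableName: tableName,
    Item: {
      messageId: { S: msg.messageId },
      createdAt: { N: `${createdAtMs}` },
      event: { S: msg.event },
    },
    ReturnConsumedCapacity: "TOTAL",
  };
}

const params = buildPutItem(
  { messageId: "m-1", createdAt: "2022-05-08T00:00:00Z", event: '{"hello":"world"}' },
  "messages"
);
console.log(JSON.stringify(params.Item.createdAt)); // {"N":"1651968000000"}
```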
Refactoring the sendSuccess Helper
We will also use this opportunity to refactor the existing SendTaskSuccess statement into a dedicated function that we can reuse.
// Reports success for the waiting task token; awaiting the SDK promise keeps the
// lambda alive until Step Functions has actually received the callback.
const funcSuccess = async (res: any, mid: string, token: string) => {
  console.log("sending success ", { res });
  const sendSuccess: StepFunctions.SendTaskSuccessInput = {
    output: JSON.stringify({
      statusCode: 200,
      headers: { "Content-Type": "application/json" },
      putStatus: {
        messageId: mid,
        ProcessorResult: res,
      },
    }),
    taskToken: token,
  };
  try {
    const resultStatus = await sfn.sendTaskSuccess(sendSuccess).promise();
    console.log("sent success: ", { resultStatus, sendSuccess });
  } catch (err: any) {
    console.log(err, err.stack);
  }
};
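Step Functions caps the `output` of `SendTaskSuccess` at 262,144 characters, so a large `ProcessorResult` can make the callback itself fail. A minimal sketch of building the success payload as a pure function with that check, assuming a hypothetical `buildSuccessOutput` name:

```typescript
// SendTaskSuccess accepts at most 262,144 characters in "output".
const MAX_TASK_OUTPUT = 262_144;

// Hypothetical pure builder mirroring the output JSON sent by funcSuccess.
function buildSuccessOutput(res: unknown, messageId: string): string {
  const output = JSON.stringify({
    statusCode: 200,
    headers: { "Content-Type": "application/json" },
    putStatus: { messageId, ProcessorResult: res },
  });
  if (output.length > MAX_TASK_OUTPUT) {
    throw new Error(`output has ${output.length} characters, limit is ${MAX_TASK_OUTPUT}`);
  }
  return output;
}

const ok = buildSuccessOutput({ ConsumedCapacity: { CapacityUnits: 1 } }, "m-1");
console.log(JSON.parse(ok).putStatus.messageId); // m-1
```

Checking the size before calling the API turns a remote validation error into a local one that the handler can route to the failure path.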
Refactoring the sendFailure Helper
Similarly, we will refactor the sendTaskFailure call into a new helper function.
// Reports failure for the waiting task token. Error objects serialize to "{}" with
// JSON.stringify, so the name and message are copied out explicitly.
const funcFailure = async (err: any, mid: string, token: string) => {
  console.log("sending failure ", { err });
  const sendFailure: StepFunctions.SendTaskFailureInput = {
    error: String(err?.name || "Error").slice(0, 256), // API limit: 256 characters
    cause: JSON.stringify({
      statusCode: 500,
      headers: { "Content-Type": "application/json" },
      putStatus: {
        messageId: mid,
        ProcessorResult: err?.message || String(err),
      },
    }),
    taskToken: token,
  };
  try {
    const resultStatus = await sfn.sendTaskFailure(sendFailure).promise();
    console.log("sent failure: ", { resultStatus, sendFailure });
  } catch (sendErr: any) {
    console.log(sendErr, sendErr.stack);
  }
};
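`SendTaskFailure` limits `error` to 256 characters and `cause` to 32,768, and `JSON.stringify(new Error("boom"))` returns `"{}"` because an error's `name` and `message` are not enumerable own properties. A minimal sketch of a failure builder that handles both, with hypothetical helper names (`describeError`, `buildFailure`):

```typescript
// SendTaskFailure limits: "error" up to 256 characters, "cause" up to 32,768.
const MAX_ERROR_LEN = 256;
const MAX_CAUSE_LEN = 32_768;

interface TaskFailure {
  error: string;
  cause: string;
}

// Copy the useful fields out of an Error, since JSON.stringify(new Error("x")) is "{}".
function describeError(err: unknown): { name: string; message: string } {
  if (err instanceof Error) return { name: err.name, message: err.message };
  if (typeof err === "string") return { name: "Error", message: err };
  return { name: "Error", message: String(JSON.stringify(err)) };
}

// Hypothetical pure builder mirroring the error/cause pair sent by funcFailure.
function buildFailure(err: unknown, messageId: string): TaskFailure {
  const { name, message } = describeError(err);
  const cause = JSON.stringify({
    statusCode: 500,
    headers: { "Content-Type": "application/json" },
    putStatus: { messageId, ProcessorResult: message },
  });
  return {
    error: name.slice(0, MAX_ERROR_LEN),
    // Truncation keeps the call valid but may cut the JSON; acceptable for a diagnostic field.
    cause: cause.slice(0, MAX_CAUSE_LEN),
  };
}

const failure = buildFailure(new Error("boom"), "m-1");
console.log(failure.error); // Error
console.log(JSON.parse(failure.cause).putStatus.ProcessorResult); // boom
```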
Granting read Object access to lambda
In our previous article, we granted the entry handler access to putObject; here we also grant the recorder function access to read the data, as follows.
stgMsgBucket.grantWrite(eventCounterBus.handler);
stgMsgBucket.grantRead(messageRecorder);
Changes inside the lambda
The changes below are used inside the lambda handler to read every message using the event.detail content, which now carries the bucket name and the object key.
Here you can see that the bucket name and object key are extracted from the message and used to retrieve the object from S3, which is then put into DynamoDB for recording using the helper functions created above.
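await Promise.all(
  event.Records.map(async (Record: any) => {
    console.log("Received message:", JSON.stringify(Record, undefined, 2));
    // The SQS body carries the task token plus the Record built by the state machine
    const body = JSON.parse(Record.body);
    const msg = body.Record;
    const token = body.MyTaskToken;
    try {
      // Fetch the full message staged in S3 by the entry handler
      const s3Get = await s3
        .getObject({
          Bucket: msg.bucket,
          Key: msg.key,
        })
        .promise();
      const data = s3Get.Body?.toString("utf-8");
      if (!data) throw new Error(`No data at s3://${msg.bucket}/${msg.key}`);
      msg.event = data;
      // Write to DynamoDB only after the S3 read has completed
      const res = await dbPut(Record, msg);
      await funcSuccess(res, msg.messageId, token);
    } catch (err: any) {
      // Always answer the task token so the execution is not left waiting on it
      await funcFailure(err, msg.messageId, token);
    }
  })
);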
Once we get the message, we update the message object with the message content and insert it into DynamoDB with the dbPut helper we refactored earlier.
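The ordering in the handler (read from S3, then write to DynamoDB, then answer the task token) can be exercised without AWS by injecting the side effects. A minimal sketch, assuming a hypothetical `Deps` interface whose functions stand in for `s3.getObject`, `dbPut`, `funcSuccess` and `funcFailure`; the record shape follows the `MyTaskToken`/`Record` body parsed in the handler.

```typescript
// Hypothetical SQS record and task message shapes for illustration.
interface SqsRecord {
  body: string;
}

interface TaskMessage {
  messageId: string;
  createdAt: string;
  bucket: string;
  key: string;
  event?: string;
}

// Side effects are injected so the control flow can be tested without AWS.
interface Deps {
  getObjectText: (bucket: string, key: string) => Promise<string | undefined>;
  putMessage: (msg: TaskMessage) => Promise<unknown>;
  sendSuccess: (res: unknown, messageId: string, token: string) => Promise<void>;
  sendFailure: (err: unknown, messageId: string, token: string) => Promise<void>;
}

async function processRecords(records: SqsRecord[], deps: Deps): Promise<void> {
  await Promise.all(
    records.map(async (record) => {
      const body = JSON.parse(record.body);
      const msg: TaskMessage = body.Record;
      const token: string = body.MyTaskToken;
      try {
        // Each await completes before the next step, so the put only sees fetched data.
        const data = await deps.getObjectText(msg.bucket, msg.key);
        if (!data) {
          throw new Error(`No data at s3://${msg.bucket}/${msg.key}`);
        }
        msg.event = data;
        const res = await deps.putMessage(msg);
        await deps.sendSuccess(res, msg.messageId, token);
      } catch (err) {
        // Every token gets an answer, so no execution waits on a lost callback.
        await deps.sendFailure(err, msg.messageId, token);
      }
    })
  );
}
```

Passing the clients in as functions is a design choice for testability; in the real lambda they would simply be the SDK calls and helpers shown in this article.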
Removing the event content from statemachine invocation payload
Now we can remove the actual message body from the entire pipeline, which reduces the payload size transmitted across the various invocations.
const sfnTaskPayload = sfn.TaskInput.fromObject({
MyTaskToken: sfn.JsonPath.taskToken,
Record: {
"messageId.$": "$.id",
"createdAt.$": "$.time",
// "event.$": "States.StringToJson($.detail.message)",
// "event.$": "$.detail.message",
"bucket.$": "$.detail.message.bucket",
"key.$": "$.detail.message.key"
},
});
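Assuming this payload is delivered as the SQS message body (as the handler's `JSON.parse(Record.body)` suggests), the lambda now receives only a pointer. A hypothetical local function mimicking the JsonPath mapping makes that shape explicit; Step Functions performs the real substitution, not this code.

```typescript
// Hypothetical subset of the EventBridge event that the state machine reads.
interface CounterEvent {
  id: string;
  time: string;
  detail: { message: { bucket: string; key: string } };
}

interface TaskBody {
  MyTaskToken: string;
  Record: { messageId: string; createdAt: string; bucket: string; key: string };
}

// Mirrors "$.id", "$.time", "$.detail.message.bucket" and "$.detail.message.key";
// no "event" field is copied any more.
function toTaskBody(event: CounterEvent, taskToken: string): TaskBody {
  return {
    MyTaskToken: taskToken,
    Record: {
      messageId: event.id,
      createdAt: event.time,
      bucket: event.detail.message.bucket,
      key: event.detail.message.key,
    },
  };
}

const sqsBody = JSON.stringify(
  toTaskBody(
    {
      id: "evt-1",
      time: "2022-05-08T00:00:00Z",
      detail: { message: { bucket: "stg-msg-bucket", key: "uploads/u-1.json" } },
    },
    "token-123"
  )
);
// The lambda side reads it the same way as JSON.parse(Record.body).Record
const parsed = JSON.parse(sqsBody);
console.log(parsed.Record.key); // uploads/u-1.json
```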
We also update the entry handler function discussed in the last article, as shown below, so that the message body is no longer forwarded in the event, since S3 already holds it.
const message = JSON.parse(event.body);
message.uuid = getUuid();
message.handler = context.awsRequestId;
message.key = `uploads/${message.uuid}.json`;
message.bucket = process.env.BucketName || "";
console.log("Initial request:", JSON.stringify(message, undefined, 2));
delete message.message; //new line added, since s3 will have the data
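The order in the entry handler matters: the full payload has to be captured for S3 before `message.message` is deleted. A minimal sketch as a pure function, assuming (hypothetically) that the S3 object holds the whole request body; the actual `putObject` call from the previous article is not reproduced here, and `stageMessage` is an illustrative name.

```typescript
// Hypothetical shape of the API Gateway request body; "message" is the heavy part.
interface IncomingMessage {
  message?: unknown;
  [field: string]: unknown;
}

interface Staged {
  objectKey: string;
  objectBody: string; // would be written to S3 first
  detail: IncomingMessage; // would be published, without the body
}

function stageMessage(raw: string, uuid: string, requestId: string, bucket: string): Staged {
  const message: IncomingMessage = JSON.parse(raw);
  const objectKey = `uploads/${uuid}.json`;
  message.uuid = uuid;
  message.handler = requestId;
  message.key = objectKey;
  message.bucket = bucket;
  // Serialize the full payload before removing anything from it.
  const objectBody = JSON.stringify(message);
  const detail: IncomingMessage = { ...message };
  delete detail.message; // S3 now holds the data
  return { objectKey, objectBody, detail };
}

const staged = stageMessage('{"message":"hello","source":"test"}', "u-1", "req-1", "stg-msg-bucket");
console.log(JSON.stringify(staged.detail));
// {"source":"test","uuid":"u-1","handler":"req-1","key":"uploads/u-1.json","bucket":"stg-msg-bucket"}
```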
Integration testing
Keypoint async-await
One important thing to learn here is how we handle async-await to fetch from S3 first and only then write to DynamoDB, among other async operations.
Beginners are likely to go wrong with async-await and end up with execution leaks when callbacks are implemented poorly.
In that case, the trace below can help you understand where the problem lies.
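The execution leak described above is easy to reproduce: a callback-style call inside an `async` function is not awaited, so the function resolves before the callback runs, and in Lambda the environment can be frozen at that point. A minimal sketch with a hypothetical `sendWithCallback` standing in for a callback-based SDK call such as `sfn.sendTaskSuccess(params, cb)`; `setTimeout` simulates network latency.

```typescript
// Hypothetical callback-style client; setTimeout simulates network latency.
function sendWithCallback(log: string[], cb: (err: Error | null) => void): void {
  setTimeout(() => {
    log.push("sent");
    cb(null);
  }, 20);
}

// Leaky: nothing is awaited, so the returned promise resolves before "sent" is logged.
async function leaky(log: string[]): Promise<void> {
  sendWithCallback(log, () => undefined);
}

// Fixed: wrap the callback in a Promise and await it before returning.
async function fixed(log: string[]): Promise<void> {
  await new Promise<void>((resolve, reject) =>
    sendWithCallback(log, (err) => (err ? reject(err) : resolve()))
  );
}

const leakyLog: string[] = [];
leaky(leakyLog).then(() => console.log("leaky:", leakyLog.length)); // leaky: 0

const fixedLog: string[] = [];
fixed(fixedLog).then(() => console.log("fixed:", fixedLog.length)); // fixed: 1
```

With AWS SDK v2, calling `.promise()` on the request and awaiting it gives the same effect as the hand-written wrapper.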
We will be adding more connections to our stack and making it more usable in the upcoming articles by creating new constructs, so do consider following and subscribing to my newsletter.
We have our next article in serverless, do check it out.
Thanks for supporting!
It would be great if you would like to Buy Me a Coffee to help boost my efforts.
Original post at Dev Post
Reposted at dev to @aravindvcyber
AWS CDK 101 - Fetching JSON from dynamodb vs S3 through stepfunction @hashnode
Aravind V (@Aravind_V7), May 8, 2022
Check out the full collection https://t.co/CuYxnKr0Ig