Building a real-time data pipeline using Serverless and Kinesis Streams
What is a real-time data pipeline?
At the end of this post we are going to deploy a real-time data pipeline that can support 3,000 PUTs per second at an input rate of 3 MB/sec and output rate of 6 MB/sec.
Before we dive into the nuts and bolts of building a real-time data pipeline using serverless, we should define what we mean. To do that, it’s helpful to step back from the serverless and infrastructure pieces altogether.
In a real-time use case, we are generally thinking of data streaming in. That is, as the data gets created, it streams to a common location where others can read it.
Once data is sent it needs to land somewhere, or put another way, it needs to enter our data pipeline. Think of the client sending data as the starting point for our pipeline. The receiver of that data is the underlying message stream or buffer that our applications are going to read from. The applications that read the data from this stream are the services that are going to perform business logic.
When we say real-time data pipeline, we are thinking about writing data as fast as possible to a common message stream. That data is then read as fast as the data is coming in by applications or services in our architecture.
Here are a few use cases where real-time data processing is powerful:
- Internet of Things (IoT) devices that emit data in real-time that should be analyzed.
- Website or App clickstreams to determine user behaviors.
- Stock market tickers where price changes need to be handled in near real-time.
These are some common use cases where we see real-time data pipelines come up, but the list is not exhaustive. The example we are going to work through applies to any application or service that is looking to process data as fast as possible.
Kinesis Streams for our streaming data
In this post, we are going to take advantage of Kinesis Data Streams as our streaming data service. We are going to create a new stream that our real-time applications can write their data to. Our consumer services can then read the data from the stream as it is coming in to perform their own operations on it.
Before we dive into creating our stream let’s talk about the terminology that surrounds Kinesis. As you read down the list, think of each definition as one layer of the stack, from the services at the edges down to the individual record.
- Services that write data to our stream are often called producers. The application services that read data off our stream are often called consumers.
- In Kinesis, a stream is a grouping of shards that we can continuously stream data to.
- Think of a shard as a lane on a freeway where our stream is all the lanes of the freeway. It is an append-only log that contains a sequence of data records ordered by their arrival time. A single shard can ingest up to 1000 records per second.
- A data record is what a data producer writes to our stream. It consists of a sequence number, a partition key, and a blob of data that can be up to 1 MB in size. The partition key is a key that producers send with each record, and Kinesis uses it to spread data out across the shards in our stream. The sequence number is a value added by Kinesis when records are written to the stream.
We are going to be using this terminology as we start building up our example. Keep in mind that a stream is nothing more than a grouping of shards. It’s analogous to a freeway being a collection of lanes.
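To make the role of the partition key more concrete, here is a minimal sketch of how a key could map to a shard. Kinesis hashes each partition key with MD5 into a 128-bit integer, and each shard owns a range of that hash key space. The helper name `shardIndexFor` is hypothetical, and the sketch assumes the shards split the hash key space into equal ranges, which may not hold for a stream that has been resharded.

```typescript
import { createHash } from 'crypto';

// Size of the hash key space: partition keys hash to 128-bit integers.
const HASH_KEY_SPACE = BigInt(1) << BigInt(128);

// Hypothetical helper: estimates which shard (0-based) a partition key
// lands on, ASSUMING the shards divide the hash key space evenly.
function shardIndexFor(partitionKey: string, shardCount: number): number {
  const md5Hex = createHash('md5').update(partitionKey, 'utf8').digest('hex');
  const hashKey = BigInt('0x' + md5Hex);
  const rangeSize = HASH_KEY_SPACE / BigInt(shardCount);
  const index = Number(hashKey / rangeSize);
  // The last shard also absorbs the remainder left by the integer division.
  return Math.min(index, shardCount - 1);
}

// The same key always maps to the same shard, so records that share a
// partition key stay ordered relative to each other.
console.log(shardIndexFor('device-42', 3));
```

The practical takeaway is that a producer that always sends the same partition key writes to a single shard, while varied keys spread the load across all of them.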
Our Simple Real-Time Pipeline
To get started with our simple real-time data pipeline we need to take care of a few prerequisites. We are going to use the Serverless Framework to provision both our example Lambda function, which reads from the Kinesis stream, and the stream itself. Make sure you get the framework installed by following the guide here.
With the framework installed, we can set up our serverless.yml to provision our Lambda function that will be invoked by our Kinesis stream.
```yaml
service:
  name: serverless-kinesis-demo

custom:
  webpack:
    webpackConfig: ./webpack.config.js
    includeModules: true

# Add the serverless-webpack plugin
plugins:
  - serverless-webpack

provider:
  name: aws
  runtime: nodejs10.x
  environment:
    AWS_NODEJS_CONNECTION_REUSE_ENABLED: 1
  iamManagedPolicies:
    - "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
  iamRoleStatements:
    - Effect: 'Allow'
      Action:
        - 'kinesis:PutRecord'
        # The IAM action for reading records is GetRecords (plural)
        - 'kinesis:GetRecords'
      Resource:
        Fn::GetAtt:
          - OurStream
          - Arn

functions:
  ingestion:
    handler: handler.ingestStream
    events:
      - stream:
          type: kinesis
          arn:
            Fn::GetAtt:
              - OurStream
              - Arn

resources:
  Resources:
    OurStream:
      Type: AWS::Kinesis::Stream
      Properties:
        Name: OurStream
        RetentionPeriodHours: 24
        ShardCount: 3
```
Here we see our ingestion function defined. It has an event source of stream where we reference our Kinesis stream defined further down. Our stream is defined in the resources section using CloudFormation syntax. For our simple pipeline, we have a basic Kinesis stream with three shards and a retention period of 24 hours.
Before we actually provision our initial data pipeline, let’s update our handler.ts to read from the stream and print out some data.
```typescript
import { KinesisStreamHandler } from 'aws-lambda';
import 'source-map-support/register';

export const ingestStream: KinesisStreamHandler = async (event, _context) => {
  event.Records.forEach(record => {
    console.log(record.kinesis.data);
  });
}
```
Our Lambda function is going to be triggered by our stream as data comes in. By default, Lambda polls the stream and invokes our function as soon as records are available. This means that even if a batch contains a single record, our function is still invoked.
Let’s get this initial pipeline provisioned in our AWS account so we can see it in action.
```
$ serverless deploy
Serverless: Bundling with Webpack...
Time: 1217ms
Built at: 2019-11-10 12:20:39
Serverless: Stack update finished...
Service Information
service: serverless-kinesis-demo
stage: dev
region: us-east-1
stack: serverless-kinesis-demo-dev
resources: 8
api keys:
  None
endpoints:
  None
functions:
  ingestion: serverless-kinesis-demo-dev-ingestion
layers:
  None
```
Now that our initial data pipeline is configured, how can we test it? Well, we can create another Lambda function that pushes data into our stream.
Simulating real-time data
To simulate feeding real-time data into our Kinesis stream we are going to create another Lambda function that writes data to our stream. This allows us to have a method of testing that our downstream consumers are reading from the stream and taking their own actions.
First, we need to define another function inside of our serverless.yml file. The functions section of the template should now look like this.
```yaml
functions:
  ingestion:
    handler: handler.ingestStream
    events:
      - stream:
          type: kinesis
          arn:
            Fn::GetAtt:
              - OurStream
              - Arn
  realtimeData:
    handler: handler.feedStream
    timeout: 120
    environment:
      KINESIS_STREAM: OurStream
```
Here we have a new function, realtimeData. It calls the handler feedStream that we are going to add in a moment. This new function has a timeout of two minutes. Every time we invoke it we will write data to our stream for up to two minutes. The last thing to call out here is the environment variable we are adding to our function, KINESIS_STREAM. This has the name of the Kinesis stream defined down in our resources section so we know which one to write to.
Now, let’s update handler.ts to have our new function handler for the realtimeData Lambda.
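```typescript
import { Context, Handler, KinesisStreamHandler } from 'aws-lambda';
import { Kinesis } from 'aws-sdk';
import 'source-map-support/register';

export const ingestStream: KinesisStreamHandler = async (event, _context) => {
  event.Records.forEach(record => {
    console.log(record.kinesis.data);
  });
}

export const feedStream: Handler = async (_, _context: Context) => {
  // The stream name comes from the environment variable set in serverless.yml.
  const stream = process.env.KINESIS_STREAM;
  if (!stream) {
    throw new Error('KINESIS_STREAM is not set');
  }
  const client = new Kinesis({region: 'us-east-1'});

  let loopIter = 0;
  // Keep writing until the invocation is almost out of time.
  while(_context.getRemainingTimeInMillis() > 30) {
    loopIter++;
    const partKey = `loop_${loopIter}`;
    const payload = JSON.stringify({
      'event_x': Math.random() * 10,
      'event_y': Math.random() * 20
    });

    // .promise() sends the request; awaiting it gives us the actual response.
    const resp = await client.putRecord({
      StreamName: stream,
      PartitionKey: partKey,
      Data: payload
    }).promise();

    console.log(resp);
  }
}
```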
We see in our feedStream handler that we are going to write data objects that consist of a random x and y value. We will keep putting records into our Kinesis stream until we are almost out of time in our invocation. Let’s go ahead and deploy this second Lambda function to our account.
```
$ sls deploy
Serverless: Stack update finished...
Service Information
service: serverless-kinesis-demo
stage: dev
region: us-east-1
stack: serverless-kinesis-demo-dev
resources: 11
api keys:
  None
endpoints:
  None
functions:
  ingestion: serverless-kinesis-demo-dev-ingestion
  realtimeData: serverless-kinesis-demo-dev-realtimeData
layers:
  None
```
Now that we have both of our Lambda functions provisioned, let’s invoke the realtimeData function manually and confirm that it triggers our ingestion function from the Kinesis stream. We can also invoke our function from the AWS Console under the Lambda service. We would just use the default test event and invoke our function that way.
```
$ aws lambda invoke --function-name serverless-kinesis-demo-dev-realtimeData response.json
```
After invoking our real-time data simulator function we should be able to look at the CloudWatch logs for that function. In the logs we see messages like this one:
```
{
  ShardId: 'shardId-000000000001',
  SequenceNumber: '49601280698984557860252853423873190438335673992564703250'
}
```
This log message shows that our record was written to our stream. Specifically, it was written to the shard with the ID shardId-000000000001. Now if we look at the logs for our ingestion function in CloudWatch we should see that the record triggered the function as expected.
```
INFO eyJldmVudF94Ijo3Ljc4NDIzMjk0MDY0MjI4MywiZXZlbnRfeSI6Ni42ODgxODAwNzA0NjQxMTh9
```
It looks like our function was invoked as expected, but why is the record data a bunch of garbage? This is because Kinesis treats record data as a binary blob, and the event that Lambda passes to our function carries that blob base64 encoded. This makes it simpler to send binary data through the stream. So if we base64 decode the record data above we should see the true output that was written to the stream.
```
$ echo eyJldmVudF94Ijo3Ljc4NDIzMjk0MDY0MjI4MywiZXZlbnRfeSI6Ni42ODgxODAwNzA0NjQxMTh9 | base64 -d
{"event_x":7.784232940642283,"event_y":6.688180070464118}
```
There we go, we now have our original event that was written to the stream. Depending on what our real-time data pipeline is operating on, we could do the base64 decode right in our code. We will leave that for a future exercise as it may or may not fit into your use case.
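If decoding in code does fit your use case, a minimal sketch could look like the following. The `DemoEvent` interface and the `decodeRecordData` helper are hypothetical names introduced here for illustration. The sketch assumes the producer wrote UTF-8 JSON, as our feedStream handler does.

```typescript
// Shape of the payload written by the feedStream handler in this post.
interface DemoEvent {
  event_x: number;
  event_y: number;
}

// Hypothetical helper: turns the base64 string found in
// record.kinesis.data back into the JSON object the producer sent.
function decodeRecordData(data: string): DemoEvent {
  const json = Buffer.from(data, 'base64').toString('utf8');
  return JSON.parse(json) as DemoEvent;
}

// The record data we saw in the CloudWatch logs above.
const sample = 'eyJldmVudF94Ijo3Ljc4NDIzMjk0MDY0MjI4MywiZXZlbnRfeSI6Ni42ODgxODAwNzA0NjQxMTh9';
console.log(decodeRecordData(sample));
```

Inside the ingestion handler this would be called once per record, for example `decodeRecordData(record.kinesis.data)`.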
Where we can go from here
We now have a functioning real-time data pipeline. It demonstrates some of the power of Kinesis data streams for very fast writes and reads. We are currently simulating the real-time data via a Lambda function that is writing data to the stream in a loop. We then have another function that is reading the data from that stream and printing it to the logs.
This is a basic demo, but it highlights the throughput we can get from Kinesis on both the data producer and consumer side of things. But where can this type of architecture go from here?
When it comes to Kinesis data streams our throughput is determined by the number of shards our stream has. This is the basic unit we can use to reason about how fast we can write data to our stream and how fast we can read it off. With our current configuration of three shards, we can support 3,000 PUTs per second at an input rate of 3 MB/sec and output rate of 6 MB/sec. This is because a single shard can support 1,000 PUTs per second, 1 MB/sec of input data, and 2 MB/sec of output data.
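The arithmetic above is simple enough to capture in a few lines. This is only a sketch that multiplies the per-shard limits quoted in this post by the shard count. The `streamCapacity` function and its field names are hypothetical, not part of any AWS SDK.

```typescript
// Per-shard limits as quoted in this post.
const PUTS_PER_SHARD = 1000;  // records written per second
const MB_IN_PER_SHARD = 1;    // MB written per second
const MB_OUT_PER_SHARD = 2;   // MB read per second

interface StreamCapacity {
  putsPerSec: number;
  mbInPerSec: number;
  mbOutPerSec: number;
}

// Hypothetical helper: total stream capacity for a given number of shards.
function streamCapacity(shardCount: number): StreamCapacity {
  return {
    putsPerSec: shardCount * PUTS_PER_SHARD,
    mbInPerSec: shardCount * MB_IN_PER_SHARD,
    mbOutPerSec: shardCount * MB_OUT_PER_SHARD,
  };
}

// Our stream from serverless.yml has ShardCount: 3.
console.log(streamCapacity(3));
```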
When it comes to thinking about where we can go next, we need to think about what performance our use case requires. For example, imagine we had 100 IoT devices that needed to write to this stream. Each device writes 1 KB of data every tenth of a second, so each device makes 10 PUT requests totaling 10 KB every second. This means across all 100 devices, we are going to make 1,000 PUT requests with 1,000 KB, or about 1 MB, of data every second.
In this architecture, we could reason that our Kinesis stream is over-provisioned. In essence, we could scale down the number of shards allocated to our stream from three to one.
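The same reasoning can be turned around to estimate how many shards a workload needs. The sketch below is a back-of-the-napkin calculator, not an official sizing tool. The `Workload` interface and `requiredShards` function are hypothetical, and it assumes 1 KB is 1,024 bytes and 1 MB is 1,024 KB.

```typescript
interface Workload {
  devices: number;
  recordsPerDevicePerSec: number;
  bytesPerRecord: number;
}

// Per-shard write limits quoted in this post (1 MB assumed to be 1024 * 1024 bytes).
const MAX_RECORDS_PER_SHARD = 1000;
const MAX_BYTES_PER_SHARD = 1024 * 1024;

// Hypothetical helper: the smallest shard count that satisfies both the
// records-per-second and the bytes-per-second write limits.
function requiredShards(w: Workload): number {
  const recordsPerSec = w.devices * w.recordsPerDevicePerSec;
  const bytesPerSec = recordsPerSec * w.bytesPerRecord;
  return Math.max(
    1,
    Math.ceil(recordsPerSec / MAX_RECORDS_PER_SHARD),
    Math.ceil(bytesPerSec / MAX_BYTES_PER_SHARD),
  );
}

// 100 devices, each writing a 1 KB record ten times per second.
console.log(requiredShards({ devices: 100, recordsPerDevicePerSec: 10, bytesPerRecord: 1024 }));
```

Note that this example workload sits right at the record limit of a single shard, so in practice it may be worth leaving some headroom rather than sizing to the exact minimum.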
What this example shows is that taking our real-time data pipeline to the next level requires thinking about performance. Leveraging Kinesis streams to their full potential requires a back-of-the-napkin estimate of how much data is going to get written to the stream per second.
There are a lot of strategies we can use to make efficient use of Kinesis as well. For example, we can make use of Application Auto Scaling to scale the number of shards in our stream up and down. This acknowledges that our back-of-the-napkin calculations from above are only a best guess. By scaling our shards we give ourselves a bit of room to grow and shrink as our workloads change.
We can also optimize things on the consumer side. By default, all consumers reading from a shard share its 2 MB/sec of output. The stream can also support enhanced fan-out consumers, which gives each registered consumer its own dedicated 2 MB/sec of read throughput per shard. This allows us to add consumers and still maintain our desired output throughput. Our earlier example consisted of a single consumer that was reading from the stream. But in a true real-time data pipeline we could have many consumers reading the data from the stream.
Conclusion
A real-time data pipeline is an architecture that demands high throughput writes and reads. We need to be able to write a lot of data very fast so that our consumers can quickly read it off and take action. This architecture requires an underlying infrastructure that can support that level of throughput.
Thus, we often think of infrastructure that supports streaming data to consumers. As we saw in this post, Kinesis data streams give us exactly that. We can provision a stream with N shards, which allows us to write a lot of data very quickly. Our consumers can then read that data off the stream very quickly and in parallel.
This type of architecture works great for all kinds of real-time operations. Application click streams, IoT device streams, video game insights and debug streams are a few examples. If we need to stream a lot of critical data fast, Kinesis is a great fit. If we are OK with batching that data up and shipping it off on some kind of timer, a data stream might be overkill for our scenario.
I hope that this post has given you a better understanding of how you can build a serverless real-time data pipeline using Kinesis and Serverless Framework. By no means is this the only way you can build a pipeline of this nature, but it’s at least a place to start and get familiar with the concept.
If you have any questions about serverless, data pipelines, or anything related to Amazon Web Services, hit me up on Twitter @kylegalbraith or drop a message below. Also check out my weekly Learn by Doing newsletter or my Learn AWS By Using It course to learn even more about the cloud, coding, and DevOps.