Example – database-first event sourcing
This example builds on the cloud-native database and event stream examples and pieces them together to demonstrate the publishing side of the Database-First Variant. As depicted in the diagram at the top of the pattern, a command writes to the database (AWS DynamoDB) and a stream processor (AWS Lambda) consumes the database events, transforms them to domain events, and then publishes the domain events to the event stream (AWS Kinesis). The following fragment from a Serverless Framework serverless.yml file demonstrates provisioning a function to be triggered by a cloud-native database, such as AWS DynamoDB. It is very similar to consuming from an event stream. This also shows configuring environment variables, such as the target event stream name, to parameterize the execution of the function.
functions:
trigger:
handler: handler.trigger
events:
- stream:
type: dynamodb
arn:
Fn::GetAtt:
- Table
- StreamArn
batchSize: 25
startingPosition: TRIM_HORIZON
environment:
STREAM_NAME:
Ref: Stream
Next, we have the example of the stream processor itself. This example builds on the example in the event streaming pattern and leverages the functional reactive programming model to process a micro-batch of cloud-native database events. First, we map the database event to a domain event that can be published downstream without exposing the details of how the component is implemented. Thus, we can change our design decisions and dispose of the internal architecture of a component without effecting downstream components. We also include all the available context information in the event, such as the old and new versions of the data, so that consumers have the optimal amount of information to work with. We also use the entity's ID, which is a version 4 UUID, as the partition key, for the scalability reasons discussed in the Event Streaming pattern. Next, we wrap and batch the events for publication to the stream. Finally, the function callback is invoked once all the events in the micro-batch have flowed through the pipeline. If the function fails, it will retry to ensure that our database transaction ultimately results in the domain event being atomically published to the stream. We will discuss error handling in the Stream Circuit Breaker pattern.
export const trigger = (event, context, cb) => {
_(event.Records)
.map(toEvent)
.map(toRecord)
.batch(25)
.flatMap(publish)
.collect().toCallback(cb);
};
const toEvent = record => ({
id: uuid.v1(),
type: `item-${record.eventName.toLowerCase()}`,
timestamp: record.dynamodb.ApproximateCreationDateTime,
partitionKey: record.dynamodb.Keys.id.S,
item: {
old: record.dynamodb.OldImage,
new: record.dynamodb.NewImage,
},
});
const toRecord = event => ( {
PartitionKey: event.partitionKey,
Data: new Buffer(JSON.stringify(event)),
})
const publish = records => {
const params = {
StreamName: process.env.STREAM_NAME,
Records: records
};
const kinesis = new aws.Kinesis();
return _(kinesis.putRecords(params).promise());
}