Serverless Event Sourcing and CQRS with Next.js and Upstash
Microservices are a widespread software architecture, and with serverless technologies that allow for granular deployments, they have become even more critical. Event sourcing and command query responsibility segregation (CQRS) are architectural patterns that help you to get the most out of your microservices.
What is Event Sourcing?
The primary idea of event sourcing is to think of all operations in your system as events. You then save the events that modify your system state in an event store instead of applying their changes immediately. Other services will then load the events from the store and apply the changes asynchronously.
The event store is a ledger that keeps track of everything that happened to your system. You don’t delete anything; you save a delete event. This way, services you add to the system can recreate their internal state by reading the events. If someone accidentally deleted or overrode data, it’s still in the event store. You have exact timestamps for every interaction with your system.
What is CQRS?
CQRS is an alternative to the create, read, update, and delete (CRUD) model. Instead of having one data model for all your operations, you create models for your commands (the writing operations ) and queries (the reading operations ). This gives you more flexibility in modeling your data and also simplifies the use of event sourcing. Every command becomes an event that goes to your event store. Every query reads from a database with the latest state created by applying events. Often these databases are called projections.
We will build a simple to-do list to learn how you can apply these patterns in a serverless way.
Features
The to-do list will have an API that allows users to create, read, update, and delete tasks.
- The writing operations land in an event store.
- The event store will notify other services about new events so that they can update their state.
- The reading operations hit the projection store.
- The projection store listens to event notifications and calculates its state from the events.
Technology
The bread and butter of this system are Upstash services. This way, everything is serverless, and the system scales automatically.
- Upstash Kafka as an event store
- Upstash Redis as a projection store
- QStash as a notification service
Additionally, we will use Next.js with Apollo GraphQL to build the gateway to our serverless system.
Note: Upstash Kafka’s free tier only persists events for 7 days; in a production environment, you can use unlimited persistence of a paid account to prevent events from deletion.
Prerequisites
You need an Upstash account to create the required databases, but the free tier is enough for this tutorial. You also need an up-to-date installation of Node.js. You can install everything else via NPM.
I also recommend using GitHub Codespaces to run the example because you need a public hostname for QStash.
Getting the Full Code
This article will only explain the crucial parts of the system. To run the example, clone the repository from GitHub before you move to the Functional Steps section.
Implementation
Let’s look at the architecture of the whole system first. Figure 1 illustrates how the Upstash services and our serverless functions work together.
Figure 1: Serverless CQRS and event store architecture
The first function handles client requests with a GraphQL API. GraphQL queries handle the reading queries of the CQRS pattern, and GraphQL mutations handle the writing commands.
Let’s check out the GraphQL schema:
...
type Task {
id: String!
text: String!
done: Boolean!
}
type Query {
list: [Task]
}
type Mutation {
createTask(text: String!): Task
updateTask(id: String!, text: String!, done: Boolean!): Task
deleteTask(id: String!): String
}
...
...
type Task {
id: String!
text: String!
done: Boolean!
}
type Query {
list: [Task]
}
type Mutation {
createTask(text: String!): Task
updateTask(id: String!, text: String!, done: Boolean!): Task
deleteTask(id: String!): String
}
...
GraphQL schemas require separate types for queries and mutations, we get one model for read access (Query
) and one for write access (Mutation
) out of the box.
The query resolvers use ProjectionStore
, a TypeScript class that works as a client to Upstash Redis.
The mutation resolvers use EventStore
, a TypeScript class that works as a client to Upstash Kafka.
The EventStore
will call a Notificator
when it writes a new event to Upstash Kafka. This Notifier
is a TypeScript class that acts as a client to QStash.
Our stores are just abstractions over Upstash services.
The second function gets notified by QStash when new events are stored in Upstash Kafka. It will read the new events via EventStore
and process and write them via ProjectionStore
. The system's clients never use this function; it just runs in the background.
Implementing the Commands
We will start by implementing the commands of CQRS with GraphQL mutation resolvers. Let’s look at the following code:
...
export const resolvers = {
Query: {...},
Mutation: {
async createTask(_: unknown, input: CreateTaskInput) {
const newTask = {
...input,
id: String(Date.now() + Math.random()),
done: false,
}
await eventStore.saveEvent({
type: "create-task",
data: newTask
})
return newTask
},
async updateTask(_: unknown, input: UpdateTaskInput) {
await eventStore.saveEvent({
type: "update-task",
data: input
})
return { ...input }
},
async deleteTask(_: unknown, input: DeleteTaskInput) {
await eventStore.saveEvent({
type: "delete-task",
data: { id: input.id, text: "", done: false },
})
return input.id
},
},
}
...
...
export const resolvers = {
Query: {...},
Mutation: {
async createTask(_: unknown, input: CreateTaskInput) {
const newTask = {
...input,
id: String(Date.now() + Math.random()),
done: false,
}
await eventStore.saveEvent({
type: "create-task",
data: newTask
})
return newTask
},
async updateTask(_: unknown, input: UpdateTaskInput) {
await eventStore.saveEvent({
type: "update-task",
data: input
})
return { ...input }
},
async deleteTask(_: unknown, input: DeleteTaskInput) {
await eventStore.saveEvent({
type: "delete-task",
data: { id: input.id, text: "", done: false },
})
return input.id
},
},
}
...
As we saw in the schema before, we need three resolvers:
createTask
constructs a new task object, puts it into an event object, and sends it to theEventStore
.updateTask
puts the resolvers input into a new event object and sends it to theEventStore
.deleteTask
puts theinput.id
into a new event object and sends it to theEventStore
.
Besides the task construction in createTask
, there is not much logic in these resolvers. Everything is handed off to the EventStore
. So, let’s look at the saveEvent
method of our EventStore
.
...
async saveEvent(event: Omit<TaskEvent, "offset">) {
await fetch(
this.producerUrl + JSON.stringify(event),
this.fetchConfig
)
await notify()
}
...
...
async saveEvent(event: Omit<TaskEvent, "offset">) {
await fetch(
this.producerUrl + JSON.stringify(event),
this.fetchConfig
)
await notify()
}
...
Nothing fancy here either; just sending a GET
request with our new event to Upstash Kafka and calling notify
. For completeness, let’s also look at the implementation of the notify
function.
...
export const notify = async () => {
const qstashClient = new Client({
token: process.env.QSTASH_TOKEN!
})
await qstashClient.publish({
topic: process.env.QSTASH_TOPIC!,
body: String(Math.random()),
headers: {
content: "application/json",
},
})
}
...
...
export const notify = async () => {
const qstashClient = new Client({
token: process.env.QSTASH_TOKEN!
})
await qstashClient.publish({
topic: process.env.QSTASH_TOPIC!,
body: String(Math.random()),
headers: {
content: "application/json",
},
})
}
...
Since QStash handles all the work, there is not much to see here either. While that might not be exciting, it shows how little code we must write to set up the whole thing.
Implementing the Query
There is just one query resolver, and it lists all the tasks.
...
export const resolvers = {
Query: {
async list() {
return await projectionStore.loadTasks()
},
},
Mutation: {...},
}
...
...
export const resolvers = {
Query: {
async list() {
return await projectionStore.loadTasks()
},
},
Mutation: {...},
}
...
The resolver calls loadTasks
on the projectionStore
, which handles the work. The projectionStore
uses Upstash Redis to store the state calculated from all the events we put in Upstash Kafka. Let’s look at the loadTasks
method.
...
async loadTasks(): Promise<Task[]> {
const taskIds = await this.redisClient.smembers("tasks")
const pipeline = this.redisClient.pipeline()
for (let taskId of taskIds) pipeline.get(taskId)
return await pipeline.exec<Task[]>()
}
...
...
async loadTasks(): Promise<Task[]> {
const taskIds = await this.redisClient.smembers("tasks")
const pipeline = this.redisClient.pipeline()
for (let taskId of taskIds) pipeline.get(taskId)
return await pipeline.exec<Task[]>()
}
...
First, we load the taskIds
, which are stored in a Redis Set. Then we use each ID to fetch the corresponding task. A pipeline ensures we send all get commands in one request to Upstash.
Implementing the Projection
Now that the GraphQL resolvers are ready, we must calculate our read state. We need to read all events from our EventStore
and execute them on our ProjectionStore
. This is done in a new serverless function that our notify
function notifies via QStash.
The handler of our serverless function is quite simple:
...
export const POST = validate(async (request: Request) => {
const offset = await projectionStore.loadOffset()
const events = await eventStore.loadEvents(offset)
await projectionStore.processEvents(events)
return new Response()
})
...
...
export const POST = validate(async (request: Request) => {
const offset = await projectionStore.loadOffset()
const events = await eventStore.loadEvents(offset)
await projectionStore.processEvents(events)
return new Response()
})
...
If the request came from QStash, we would load the last offset
stored in the projectionStore
to know which events aren’t processed yet. Then we load the latest events from the eventStore
and feed them to the projectionStore
so it can calculate the new state.
The function handler is wrapped in a validate
function from our notificator
, which ensures only QStash can call this endpoint.
...
export const validate = (handler: RequestHandler) => {
const qstashReceiver = new Receiver({
currentSigningKey: process.env.QSTASH_CURRENT_SIGNING_KEY!,
nextSigningKey: process.env.QSTASH_NEXT_SIGNING_KEY!,
})
return async (request: Request) => {
const isValid = await qstashReceiver.verify({
signature: request.headers.get("upstash-signature")!,
body: await request.text(),
})
if (!isValid) {
return new Response("Unauthorized", {
status: 401,
statusText: "Unauthorized",
})
}
return await handler(request)
}
}
...
...
export const validate = (handler: RequestHandler) => {
const qstashReceiver = new Receiver({
currentSigningKey: process.env.QSTASH_CURRENT_SIGNING_KEY!,
nextSigningKey: process.env.QSTASH_NEXT_SIGNING_KEY!,
})
return async (request: Request) => {
const isValid = await qstashReceiver.verify({
signature: request.headers.get("upstash-signature")!,
body: await request.text(),
})
if (!isValid) {
return new Response("Unauthorized", {
status: 401,
statusText: "Unauthorized",
})
}
return await handler(request)
}
}
...
The validate
function uses the QStash signing keys to check every request before calling the actual handler of an endpoint.
The loadEvents
method of our EventStore
looks like this:
...
async loadEvents(offset: number): Promise<TaskEvent[]> {
const response = await fetch(this.fetchUrl, {
...this.fetchConfig,
method: "POST",
body: JSON.stringify({
topic: process.env.UPSTASH_KAFKA_TOPIC,
partition: 0,
offset,
}),
})
const messages: {value: string; offset: number }[] =
await response.json()
return messages.map(({ value, offset }) => {
const event = JSON.parse(value)
return { ...event, offset }
})
}
...
...
async loadEvents(offset: number): Promise<TaskEvent[]> {
const response = await fetch(this.fetchUrl, {
...this.fetchConfig,
method: "POST",
body: JSON.stringify({
topic: process.env.UPSTASH_KAFKA_TOPIC,
partition: 0,
offset,
}),
})
const messages: {value: string; offset: number }[] =
await response.json()
return messages.map(({ value, offset }) => {
const event = JSON.parse(value)
return { ...event, offset }
})
}
...
The method expects an offset
, to ensure that only new events are loaded.
Background: Every message in Kafka gets an offset number. This number is incremented with each new message. These offsets are used to prevent the double processing of a message. Which is exactly what we want to do.
We fetch
all messages
since the offset
and extract the events from them.
Finally, the processEvents
method of the projectionStore
will calculate our state for the query resolver:
...
async processEvents(events: TaskEvent[]) {
if (events.length === 0) return
const pipeline = this.redisClient.pipeline()
for (let event of events) {
switch (event.type) {
case "create-task":
pipeline.set(event.data.id, event.data)
pipeline.sadd("tasks", event.data.id)
break
case "update-task":
pipeline.set(event.data.id, event.data)
break
case "delete-task":
peline.srem("tasks", event.data.id)
pipeline.del(event.data.id)
break
}
}
pipeline.set("lastOffset", offset)
await pipeline.exec()
}
...
...
async processEvents(events: TaskEvent[]) {
if (events.length === 0) return
const pipeline = this.redisClient.pipeline()
for (let event of events) {
switch (event.type) {
case "create-task":
pipeline.set(event.data.id, event.data)
pipeline.sadd("tasks", event.data.id)
break
case "update-task":
pipeline.set(event.data.id, event.data)
break
case "delete-task":
peline.srem("tasks", event.data.id)
pipeline.del(event.data.id)
break
}
}
pipeline.set("lastOffset", offset)
await pipeline.exec()
}
...
Again, we use the pipeline feature of Upstash Redis to ensure we only send one request with all Redis commands.
We will add at least one command to the pipeline. The create-tasks
and delete-task
event requires an update to the Redis Set that stores our ID, so it will use two commands. Ultimately, we add the last event offset
so our projectionStore
remembers where it left off when the next notification arrives.
The pipeline ensures we only send one request to Upstash, but it also puts the commands in a transaction, which ensures that we only execute all or none of them. This way, we can be sure the lastOffset
only gets updated when we really updated the state.
Functional Steps
Make sure you cloned the whole example from GitHub before moving on.
In the next steps, we will set up the infrastructure for our system. We will use Upstash services, so you need an Upstash account for this, but we only use the free tier here.
We will write all Upstash credentials into a .env.local
file, so they are available in the environment variables of our development machine. Here is a template for this file:
UPSTASH_REDIS_REST_URL=...
UPSTASH_REDIS_REST_TOKEN=...
UPSTASH_KAFKA_URL=...
UPSTASH_KAFKA_TOPIC=...
UPSTASH_KAFKA_USERNAME=...
UPSTASH_KAFKA_PASSWORD=...
QSTASH_URL=...
QSTASH_TOPIC=...
QSTASH_TOKEN=...
QSTASH_CURRENT_SIGNING_KEY=...
QSTASH_NEXT_SIGNING_KEY=...
UPSTASH_REDIS_REST_URL=...
UPSTASH_REDIS_REST_TOKEN=...
UPSTASH_KAFKA_URL=...
UPSTASH_KAFKA_TOPIC=...
UPSTASH_KAFKA_USERNAME=...
UPSTASH_KAFKA_PASSWORD=...
QSTASH_URL=...
QSTASH_TOPIC=...
QSTASH_TOKEN=...
QSTASH_CURRENT_SIGNING_KEY=...
QSTASH_NEXT_SIGNING_KEY=...
Creating an Upstash Redis Database
- Go to the Upstash console
- Click the “Create Database” button
- Fill out the form shown in figure 2
- Click “Create”
Figure 2: Create Database
After creating the database, you can scroll down to the “REST API” section and copy the UPSTASH_REDIS_REST_UR
L and UPSTASH_REDIS_REST_TOKEN
to the .env.local
file.
Creating an Upstash Kafka Database
- Open the Upstash Kafka Console
- Click the “Create Cluster” button
- Fill out the form shown in figure 3
- Click “Create Cluster”
- Fill out the form shown in figure 4
- Click “Create Topic”
Figure 3: Create cluster
Figure 4: Create topic
After you created the database, scroll to the “REST API” section and copy the UPSTASH_KAFKA_URL
, UPSTASH_KAFKA_USERNAME
, and UPSTASH_KAFKA_PASSWORD
to your .env.local
file. Also, add the topic name “events” as UPSTASH_KAFKA_TOPIC
to this file.
Creating an Upstash QStash Queue
- Open the QStash Console
- Navigate to the “Topics” tab
- Click the “Create Topic” button
- Enter “events” as the new topics name
- Click “Create”
- Select the “Add new URL” input
- Write your projection URL like seen in figure 5
- Click “Add”
Figure 5: Add new topic URL
Copy the QSTASH_URL
, QSTASH_TOKEN
, QSTASH_CURRENT_SIGNING_KEY
, and QSTASH_NEXT_SIGNING_KEY
from the “Details” tab (seen in figure 6) to your .env.local
file. Also, add QSTASH_TOPIC
with “events” to the file.
Figure 6: QStash credentials
Testing the System
Now that we implemented and configured everything, we can test the system with Apollo’s GraphQL UI, which Next.js automatically starts on the /graphql
endpoint.
Start the server with the following commands:
$ npm i $ npm run dev
If you’re running on GitHub Codespaces, you must make the server's port public, as seen in figure 7, to Apollo GraphQL UI can read the schema from the server.
Figure 7: Port configuration
Creating a New Task
Command:
mutation {
createTask(text: "Do the dishes!") {
id
text
done
}
}
mutation {
createTask(text: "Do the dishes!") {
id
text
done
}
}
Console output:
[EventStore] Saving Event...
[EventStore] Event saved!
[EventStore] Notifying other services...
[Notifier] Sending notification...
[Notifier] Notification sent!
[EventStore] Services notified!
[Notifier] Validating request...
[Notifier] Request valid!
[ProjectionHandler] Received notification!
[ProjectionStore] Loading offset...
[ProjectionStore] No offset found! Starting from 1.
[EventStore] Loading events at offset 1
[EventStore] 1 events loaded!
[ProjectionStore] Processing events...
[ProjectionStore] Creating new task with ID: 1679311303901.0488
[ProjectionStore] Upading offset...
[ProjectionStore] Events processed!
[ProjectionHandler] Done!
[EventStore] Saving Event...
[EventStore] Event saved!
[EventStore] Notifying other services...
[Notifier] Sending notification...
[Notifier] Notification sent!
[EventStore] Services notified!
[Notifier] Validating request...
[Notifier] Request valid!
[ProjectionHandler] Received notification!
[ProjectionStore] Loading offset...
[ProjectionStore] No offset found! Starting from 1.
[EventStore] Loading events at offset 1
[EventStore] 1 events loaded!
[ProjectionStore] Processing events...
[ProjectionStore] Creating new task with ID: 1679311303901.0488
[ProjectionStore] Upading offset...
[ProjectionStore] Events processed!
[ProjectionHandler] Done!
Listing all Tasks
Query:
query {
list {
id
text
done
}
}
query {
list {
id
text
done
}
}
Console Output:
[ProjectionStore] Loading tasks...
[ProjectionStore] 1 tasks loaded!
[ProjectionStore] Loading tasks...
[ProjectionStore] 1 tasks loaded!
Updating an Existing Task
Command:
mutation {
updateTask(id: "...", text: "Do the dishes!", done: true) {
id
text
done
}
}
mutation {
updateTask(id: "...", text: "Do the dishes!", done: true) {
id
text
done
}
}
Console Output:
[EventStore] Saving Event...
[EventStore] Event saved!
[EventStore] Notifying other services...
[Notifier] Sending notification...
[Notifier] Notification sent!
[EventStore] Services notified!
[Notifier] Validating request...
[Notifier] Request valid!
[ProjectionHandler] Received notification!
[ProjectionStore] Loading offset...
[ProjectionStore] Offset at 2
[EventStore] Loading events at offset 2
[EventStore] 1 events loaded!
[ProjectionStore] Processing events...
[ProjectionStore] Updating task with ID: 1679311303901.0488
[ProjectionStore] Upading offset...
[ProjectionStore] Events processed!
[ProjectionHandler] Done!
[EventStore] Saving Event...
[EventStore] Event saved!
[EventStore] Notifying other services...
[Notifier] Sending notification...
[Notifier] Notification sent!
[EventStore] Services notified!
[Notifier] Validating request...
[Notifier] Request valid!
[ProjectionHandler] Received notification!
[ProjectionStore] Loading offset...
[ProjectionStore] Offset at 2
[EventStore] Loading events at offset 2
[EventStore] 1 events loaded!
[ProjectionStore] Processing events...
[ProjectionStore] Updating task with ID: 1679311303901.0488
[ProjectionStore] Upading offset...
[ProjectionStore] Events processed!
[ProjectionHandler] Done!
Deleting a Task
Command:
mutation {
deleteTask(id: "...")
}
mutation {
deleteTask(id: "...")
}
Console Output:
[EventStore] Saving Event...
[EventStore] Event saved!
[EventStore] Notifying other services...
[Notifier] Sending notification...
[Notifier] Notification sent!
[EventStore] Services notified!
[Notifier] Validating request...
[Notifier] Request valid!
[ProjectionHandler] Received notification!
[ProjectionStore] Loading offset...
[ProjectionStore] Offset at 3
[EventStore] Loading events at offset 3
[EventStore] 1 events loaded!
[ProjectionStore] Processing events...
[ProjectionStore] Deleting task with ID: 1679311303901.0488
[ProjectionStore] Upading offset...
[ProjectionStore] Events processed!
[ProjectionHandler] Done!
[EventStore] Saving Event...
[EventStore] Event saved!
[EventStore] Notifying other services...
[Notifier] Sending notification...
[Notifier] Notification sent!
[EventStore] Services notified!
[Notifier] Validating request...
[Notifier] Request valid!
[ProjectionHandler] Received notification!
[ProjectionStore] Loading offset...
[ProjectionStore] Offset at 3
[EventStore] Loading events at offset 3
[EventStore] 1 events loaded!
[ProjectionStore] Processing events...
[ProjectionStore] Deleting task with ID: 1679311303901.0488
[ProjectionStore] Upading offset...
[ProjectionStore] Events processed!
[ProjectionHandler] Done!
Summary
Applying the CQRS and Event-Sourcing patterns to serverless computing can help to structure your backends. This makes them more flexible in terms of data modeling and scalability.
Upstash services are ideally suited for this task; they bring all you need. You can store your events in Upstash Kafka, your projected state in Upstash Redis, and let QStash ensure all your services know when updates happen.
Next Steps
The resolvers all run in one function, which makes scaling them independently impossible. But since resolvers are asynchronous in nature, you could move the actual work in a separate API endpoint, so the resolvers just have to redirect. This way, you can scale each resolver as required.