Serverless Event Sourcing and CQRS with Next.js and Upstash

Kay Plößer, Developer and Author (Guest Author)

Microservices are a widespread software architecture, and serverless technologies that allow for granular deployments have made them even more relevant. Event sourcing and command query responsibility segregation (CQRS) are architectural patterns that help you get the most out of your microservices.

What is Event Sourcing?

The primary idea of event sourcing is to think of all operations in your system as events. You then save the events that modify your system state in an event store instead of applying their changes immediately. Other services will then load the events from the store and apply the changes asynchronously.

The event store is a ledger that keeps track of everything that happened to your system. You don’t delete anything; you save a delete event. This way, services you add to the system can recreate their internal state by reading the events. If someone accidentally deleted or overwrote data, it’s still in the event store. You have exact timestamps for every interaction with your system.
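As a minimal illustration (the event shapes here are simplified, not taken from the example repository), the event store for a to-do list is an append-only list, and the current state is whatever you get by replaying it from the top:

// An append-only ledger: a deletion is just another event.
// Replaying these three events yields an empty task list,
// yet the full history of task "1" is preserved.
const ledger = [
  { type: "create-task", data: { id: "1", text: "Do the dishes!", done: false } },
  { type: "update-task", data: { id: "1", text: "Do the dishes!", done: true } },
  { type: "delete-task", data: { id: "1" } },
]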

What is CQRS?

CQRS is an alternative to the create, read, update, and delete (CRUD) model. Instead of having one data model for all your operations, you create separate models for your commands (the writing operations) and queries (the reading operations). This gives you more flexibility in modeling your data and also simplifies the use of event sourcing. Every command becomes an event that goes to your event store. Every query reads from a database with the latest state created by applying events. These databases are often called projections.
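To make the split concrete, here is a sketch of what separate write and read models can look like for a to-do list (these type names are illustrative, not from the example repository):

// Write side: each command captures an intent and becomes an event.
type CreateTaskCommand = { text: string }
type UpdateTaskCommand = { id: string; text: string; done: boolean }
type DeleteTaskCommand = { id: string }

// Read side: the projection stores exactly the shape queries need.
type TaskView = { id: string; text: string; done: boolean }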

We will build a simple to-do list to learn how you can apply these patterns in a serverless way.

Features

The to-do list will have an API that allows users to create, read, update, and delete tasks.

  • The writing operations land in an event store.
  • The event store will notify other services about new events so that they can update their state.
  • The reading operations hit the projection store.
  • The projection store listens to event notifications and calculates its state from the events.

Technology

The bread and butter of this system is Upstash services: everything is serverless, and the system scales automatically.

  • Upstash Kafka as an event store
  • Upstash Redis as a projection store
  • QStash as a notification service

Additionally, we will use Next.js with Apollo GraphQL to build the gateway to our serverless system.

Note: Upstash Kafka’s free tier only persists events for 7 days; in a production environment, a paid account offers unlimited persistence so your events are never deleted.

Prerequisites

You need an Upstash account to create the required databases; the free tier is enough for this tutorial. You also need an up-to-date installation of Node.js. You can install everything else via npm.

I also recommend using GitHub Codespaces to run the example because you need a public hostname for QStash.

Getting the Full Code

This article will only explain the crucial parts of the system. To run the example, clone the repository from GitHub before you move to the Functional Steps section.

Implementation

Let’s look at the architecture of the whole system first. Figure 1 illustrates how the Upstash services and our serverless functions work together.

Figure 1: Serverless CQRS and event store architecture

The first function handles client requests with a GraphQL API. GraphQL queries handle the reads (queries) of the CQRS pattern, and GraphQL mutations handle the writes (commands).

Let’s check out the GraphQL schema:

schema/typedefs.ts
...
type Task {
  id: String!
  text: String!
  done: Boolean!
}
type Query {
  list: [Task]
}
type Mutation {
  createTask(text: String!): Task
  updateTask(id: String!, text: String!, done: Boolean!): Task
  deleteTask(id: String!): String
}
...

Because GraphQL schemas require separate types for queries and mutations, we get one model for read access (Query) and one for write access (Mutation) out of the box.
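The article doesn’t show how the schema is mounted, but one common way to serve it from a Next.js app router route is Apollo Server with the @as-integrations/next adapter. A sketch (the repository may wire this up differently, and the typeDefs export name is an assumption):

app/graphql/route.ts (sketch)
import { ApolloServer } from "@apollo/server"
import { startServerAndCreateNextHandler } from "@as-integrations/next"
import { typeDefs } from "../../schema/typedefs"
import { resolvers } from "../../schema/resolvers"

// One Apollo Server instance handles both CQRS sides:
// queries (reads) and mutations (commands).
const server = new ApolloServer({ typeDefs, resolvers })
const handler = startServerAndCreateNextHandler(server)

export { handler as GET, handler as POST }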

The query resolvers use ProjectionStore, a TypeScript class that works as a client to Upstash Redis.

The mutation resolvers use EventStore, a TypeScript class that works as a client to Upstash Kafka.

The EventStore calls a Notifier (implemented in utils/notificator.ts) when it writes a new event to Upstash Kafka; the Notifier acts as a client to QStash.

Our stores are just abstractions over Upstash services.

The second function gets notified by QStash when new events are stored in Upstash Kafka. It reads the new events via the EventStore, processes them, and writes the results via the ProjectionStore. The system’s clients never use this function; it just runs in the background.

Implementing the Commands

We will start by implementing the commands of CQRS with GraphQL mutation resolvers. Let’s look at the following code:

schema/resolvers.ts
...
export const resolvers = {
  Query: {...},
  Mutation: {
    async createTask(_: unknown, input: CreateTaskInput) {
      const newTask = {
        ...input,
        id: String(Date.now() + Math.random()),
        done: false,
      }
      await eventStore.saveEvent({
        type: "create-task",
        data: newTask
      })
      return newTask
    },
    async updateTask(_: unknown, input: UpdateTaskInput) {
      await eventStore.saveEvent({
        type: "update-task",
        data: input
      })
      return { ...input }
    },
    async deleteTask(_: unknown, input: DeleteTaskInput) {
      await eventStore.saveEvent({
        type: "delete-task",
        data: { id: input.id, text: "", done: false },
      })
      return input.id
    },
  },
}
...

As we saw in the schema before, we need three resolvers:

  1. createTask constructs a new task object, puts it into an event object, and sends it to the EventStore.
  2. updateTask puts the resolver’s input into a new event object and sends it to the EventStore.
  3. deleteTask puts the input.id into a new event object and sends it to the EventStore.

Besides the task construction in createTask, there is not much logic in these resolvers. Everything is handed off to the EventStore. So, let’s look at the saveEvent method of our EventStore.

utils/eventstore.ts
...
async saveEvent(event: Omit<TaskEvent, "offset">) {
  await fetch(
    this.producerUrl + JSON.stringify(event),
    this.fetchConfig
  )
  await notify()
}
...

Nothing fancy here either; we just send a GET request to Upstash Kafka’s producer endpoint with our new event appended to the URL, and then call notify.
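For context, here is a sketch of how producerUrl and fetchConfig might be assembled (an assumption based on Upstash Kafka’s REST API, not code shown in the article). The producer endpoint accepts GET /produce/&lt;topic&gt;/&lt;message&gt; with Basic auth, so the serialized event should ideally be URL-encoded before it’s appended:

// Sketch: building the producer URL and fetch config for Upstash Kafka's
// REST API. The repository's EventStore constructor may differ.
const credentials = Buffer.from(
  `${process.env.UPSTASH_KAFKA_USERNAME}:${process.env.UPSTASH_KAFKA_PASSWORD}`
).toString("base64")

const producerUrl =
  `${process.env.UPSTASH_KAFKA_URL}/produce/${process.env.UPSTASH_KAFKA_TOPIC}/`
const fetchConfig = {
  headers: { Authorization: `Basic ${credentials}` },
}

For completeness, let’s also look at the implementation of the notify function.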

utils/notificator.ts
...
export const notify = async () => {
  const qstashClient = new Client({
    token: process.env.QSTASH_TOKEN!
  })
  await qstashClient.publish({
    topic: process.env.QSTASH_TOPIC!,
    body: String(Math.random()),
    headers: {
       content: "application/json",
    },
  })
}
...

Since QStash handles all the work, there is not much to see here either. While that might not be exciting, it shows how little code we must write to set up the whole thing.

Implementing the Query

There is just one query resolver, and it lists all the tasks.

schema/resolvers.ts
...
export const resolvers = {
  Query: {
    async list() {
      return await projectionStore.loadTasks()
    },
  },
  Mutation: {...},
}
...

The resolver calls loadTasks on the projectionStore, which handles the work. The projectionStore uses Upstash Redis to store the state calculated from all the events we put in Upstash Kafka. Let’s look at the loadTasks method.

utils/projectionstore.ts
...
async loadTasks(): Promise<Task[]> {
  const taskIds = await this.redisClient.smembers("tasks")
  const pipeline = this.redisClient.pipeline()
  for (let taskId of taskIds) pipeline.get(taskId)
  return await pipeline.exec<Task[]>()
}
...

First, we load the taskIds, which are stored in a Redis Set. Then we use each ID to fetch the corresponding task. A pipeline ensures we send all get commands in one request to Upstash.
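To picture the data layout, here is what the projection might contain after a single create-task event, assuming an @upstash/redis client created from the environment (values are illustrative):

import { Redis } from "@upstash/redis"

// Reads UPSTASH_REDIS_REST_URL and UPSTASH_REDIS_REST_TOKEN.
const redis = Redis.fromEnv()

await redis.smembers("tasks")
// -> ["1679311303901.0488"]
await redis.get("1679311303901.0488")
// -> { id: "1679311303901.0488", text: "Do the dishes!", done: false }
await redis.get("lastOffset")
// -> 1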

Implementing the Projection

Now that the GraphQL resolvers are ready, we must calculate our read state. We need to read all events from our EventStore and apply them to our ProjectionStore. This happens in a second serverless function, which our notify function triggers via QStash.

The handler of our serverless function is quite simple:

app/projection/route.ts
...
export const POST = validate(async (request: Request) => {
  const offset = await projectionStore.loadOffset()
  const events = await eventStore.loadEvents(offset)
  await projectionStore.processEvents(events)
  return new Response()
})
...

When QStash calls this endpoint, we load the last offset stored in the projectionStore to know which events haven’t been processed yet. Then we load the newer events from the eventStore and feed them to the projectionStore so it can calculate the new state.

The function handler is wrapped in a validate function from our notificator, which ensures only QStash can call this endpoint.

utils/notificator.ts
...
export const validate = (handler: RequestHandler) => {
  const qstashReceiver = new Receiver({
    currentSigningKey: process.env.QSTASH_CURRENT_SIGNING_KEY!,
    nextSigningKey: process.env.QSTASH_NEXT_SIGNING_KEY!,
  })
 
  return async (request: Request) => {
    const isValid = await qstashReceiver.verify({
      signature: request.headers.get("upstash-signature")!,
      body: await request.text(),
    })
 
    if (!isValid) {
      return new Response("Unauthorized", {
        status: 401,
        statusText: "Unauthorized",
      })
    }
 
    return await handler(request)
  }
}
...

The validate function uses the QStash signing keys to check every request before calling the actual handler of an endpoint.

The loadEvents method of our EventStore looks like this:

utils/eventstore.ts
...
async loadEvents(offset: number): Promise<TaskEvent[]> {
  const response = await fetch(this.fetchUrl, {
    ...this.fetchConfig,
    method: "POST",
    body: JSON.stringify({
      topic: process.env.UPSTASH_KAFKA_TOPIC,
      partition: 0,
      offset,
    }),
  })
  const messages: { value: string; offset: number }[] =
    await response.json()
  return messages.map(({ value, offset }) => {
    const event = JSON.parse(value)
    return { ...event, offset }
  })
}
...

The method expects an offset to ensure that only new events are loaded.

Background: Every message in Kafka gets an offset number, which is incremented with each new message. These offsets are used to prevent processing a message twice, which is exactly what we want here.

We fetch all messages since the offset and extract the events from them.
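For context, fetchUrl presumably points at Upstash Kafka’s REST fetch endpoint; here is a sketch of the round trip (an assumption based on the REST API, not code from the repository):

// Sketch: the fetch endpoint lives under the cluster URL and returns
// the raw messages of a partition, starting at the requested offset.
const fetchUrl = `${process.env.UPSTASH_KAFKA_URL}/fetch`

// Example response for a single stored event (other fields omitted):
// [{ "value": "{\"type\":\"create-task\",\"data\":{...}}", "offset": 1 }]
// loadEvents parses each value and attaches the broker-assigned offset.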

Finally, the processEvents method of the projectionStore will calculate our state for the query resolver:

utils/projectionstore.ts
...
async processEvents(events: TaskEvent[]) {
  if (events.length === 0) return
  const pipeline = this.redisClient.pipeline()
  for (let event of events) {
    switch (event.type) {
      case "create-task":
        pipeline.set(event.data.id, event.data)
        pipeline.sadd("tasks", event.data.id)
        break
      case "update-task":
        pipeline.set(event.data.id, event.data)
        break
      case "delete-task":
        pipeline.srem("tasks", event.data.id)
        pipeline.del(event.data.id)
        break
    }
  }
  pipeline.set("lastOffset", offset)
  await pipeline.exec()
}
...

Again, we use the pipeline feature of Upstash Redis to ensure we only send one request with all Redis commands.

Each event adds at least one command to the pipeline. The create-task and delete-task events also require an update to the Redis Set that stores our task IDs, so they use two commands each. Ultimately, we add the last event offset so our projectionStore remembers where it left off when the next notification arrives.

The pipeline ensures we only send one request to Upstash. Note that a pipeline batches commands without making them atomic; if you want all-or-nothing semantics, so that lastOffset is only updated when the state really changed, the Upstash Redis client also offers multi(), which wraps the commands in a transaction.
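A minimal sketch of the difference, assuming an @upstash/redis client created from the environment (this is not code from the repository):

import { Redis } from "@upstash/redis"

const redis = Redis.fromEnv()

// pipeline(): one HTTP round trip, commands run in order, not atomic.
const pipeline = redis.pipeline()
pipeline.set("task-1", { id: "task-1", text: "Do the dishes!", done: false })
pipeline.sadd("tasks", "task-1")
pipeline.set("lastOffset", 1)
await pipeline.exec()

// multi(): same chaining API, but wrapped in a Redis transaction,
// so either all commands apply or none of them do.
const tx = redis.multi()
tx.set("task-1", { id: "task-1", text: "Do the dishes!", done: false })
tx.sadd("tasks", "task-1")
tx.set("lastOffset", 1)
await tx.exec()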

Functional Steps

Make sure you have cloned the whole example from GitHub before moving on.

In the next steps, we will set up the infrastructure for our system. We will use Upstash services, so you need an Upstash account for this, but we only use the free tier here.

We will write all Upstash credentials into a .env.local file, so they are available in the environment variables of our development machine. Here is a template for this file:

.env.local
UPSTASH_REDIS_REST_URL=...
UPSTASH_REDIS_REST_TOKEN=...
 
UPSTASH_KAFKA_URL=...
UPSTASH_KAFKA_TOPIC=...
UPSTASH_KAFKA_USERNAME=...
UPSTASH_KAFKA_PASSWORD=...
 
QSTASH_URL=...
QSTASH_TOPIC=...
QSTASH_TOKEN=...
QSTASH_CURRENT_SIGNING_KEY=...
QSTASH_NEXT_SIGNING_KEY=...

Creating an Upstash Redis Database

  1. Go to the Upstash console
  2. Click the “Create Database” button
  3. Fill out the form shown in figure 2
  4. Click “Create”

Figure 2: Create Database

After creating the database, you can scroll down to the “REST API” section and copy the UPSTASH_REDIS_REST_URL and UPSTASH_REDIS_REST_TOKEN to the .env.local file.

Creating an Upstash Kafka Database

  1. Open the Upstash Kafka Console
  2. Click the “Create Cluster” button
  3. Fill out the form shown in figure 3
  4. Click “Create Cluster”
  5. Fill out the form shown in figure 4
  6. Click “Create Topic”

Figure 3: Create cluster

Figure 4: Create topic

After you create the cluster, scroll to the “REST API” section and copy the UPSTASH_KAFKA_URL, UPSTASH_KAFKA_USERNAME, and UPSTASH_KAFKA_PASSWORD to your .env.local file. Also, add the topic name “events” as UPSTASH_KAFKA_TOPIC to this file.

Creating an Upstash QStash Queue

  1. Open the QStash Console
  2. Navigate to the “Topics” tab
  3. Click the “Create Topic” button
  4. Enter “events” as the new topic’s name
  5. Click “Create”
  6. Select the “Add new URL” input
  7. Enter your projection URL as seen in figure 5
  8. Click “Add”

Figure 5: Add new topic URL

Copy the QSTASH_URL, QSTASH_TOKEN, QSTASH_CURRENT_SIGNING_KEY, and QSTASH_NEXT_SIGNING_KEY from the “Details” tab (seen in figure 6) to your .env.local file. Also, add QSTASH_TOPIC with “events” to the file.

Figure 6: QStash credentials

Testing the System

Now that we have implemented and configured everything, we can test the system with Apollo’s GraphQL UI, which is served on the /graphql endpoint.

Start the server with the following commands:

$ npm i
$ npm run dev

If you’re running on GitHub Codespaces, you must make the server’s port public, as seen in figure 7, so the Apollo GraphQL UI can read the schema from the server.

Figure 7: Port configuration

Creating a New Task

Command:

mutation {
  createTask(text: "Do the dishes!") {
    id
    text
    done
  }
}

Console output:

[EventStore] Saving Event...
[EventStore] Event saved!
[EventStore] Notifying other services...
[Notifier] Sending notification...
[Notifier] Notification sent!
[EventStore] Services notified!
[Notifier] Validating request...
[Notifier] Request valid!
[ProjectionHandler] Received notification!
[ProjectionStore] Loading offset...
[ProjectionStore] No offset found! Starting from 1.
[EventStore] Loading events at offset 1
[EventStore] 1 events loaded!
[ProjectionStore] Processing events...
[ProjectionStore] Creating new task with ID: 1679311303901.0488
[ProjectionStore] Upading offset...
[ProjectionStore] Events processed!
[ProjectionHandler] Done!

Listing All Tasks

Query:

query {
  list {
    id
    text
    done
  }
}

Console Output:

[ProjectionStore] Loading tasks...
[ProjectionStore] 1 tasks loaded!

Updating an Existing Task

Command:

mutation {
  updateTask(id: "...", text: "Do the dishes!", done: true) {
    id
    text
    done
  }
}

Console Output:

[EventStore] Saving Event...
[EventStore] Event saved!
[EventStore] Notifying other services...
[Notifier] Sending notification...
[Notifier] Notification sent!
[EventStore] Services notified!
[Notifier] Validating request...
[Notifier] Request valid!
[ProjectionHandler] Received notification!
[ProjectionStore] Loading offset...
[ProjectionStore] Offset at 2
[EventStore] Loading events at offset 2
[EventStore] 1 events loaded!
[ProjectionStore] Processing events...
[ProjectionStore] Updating task with ID: 1679311303901.0488
[ProjectionStore] Upading offset...
[ProjectionStore] Events processed!
[ProjectionHandler] Done!

Deleting a Task

Command:

mutation {
  deleteTask(id: "...")
}

Console Output:

[EventStore] Saving Event...
[EventStore] Event saved!
[EventStore] Notifying other services...
[Notifier] Sending notification...
[Notifier] Notification sent!
[EventStore] Services notified!
[Notifier] Validating request...
[Notifier] Request valid!
[ProjectionHandler] Received notification!
[ProjectionStore] Loading offset...
[ProjectionStore] Offset at 3
[EventStore] Loading events at offset 3
[EventStore] 1 events loaded!
[ProjectionStore] Processing events...
[ProjectionStore] Deleting task with ID: 1679311303901.0488
[ProjectionStore] Upading offset...
[ProjectionStore] Events processed!
[ProjectionHandler] Done!

Summary

Applying the CQRS and event sourcing patterns to serverless computing can help structure your backends, making them more flexible in terms of data modeling and scalability.

Upstash services are ideally suited for this task; they bring everything you need. You can store your events in Upstash Kafka, your projected state in Upstash Redis, and let QStash ensure all your services know when updates happen.

Next Steps

The resolvers all run in one function, which makes scaling them independently impossible. But since resolvers are asynchronous by nature, you could move the actual work into separate API endpoints, so the resolvers just have to forward requests. This way, you can scale each resolver as required, as shown in the sketch below.
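A sketch of that idea, using a hypothetical /api/commands/create-task endpoint and BASE_URL variable (neither exists in the example repository):

// Sketch: a forwarding resolver. The heavy lifting happens in the
// separate endpoint, which can be deployed and scaled on its own.
export const resolvers = {
  Mutation: {
    async createTask(_: unknown, input: { text: string }) {
      const response = await fetch(
        `${process.env.BASE_URL}/api/commands/create-task`,
        { method: "POST", body: JSON.stringify(input) }
      )
      return await response.json()
    },
  },
}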

Additional Resources

You can find the full code repository on GitHub.