Caching LLM Output On-Demand with LangChain, Redis and QStash
In some applications, it may not be feasible to query an LLM when a user requests it. The process of sending a prompt to an API and waiting for its response can be particularly time-consuming. For more complex tasks that involve LangChain, like processing data from a PDF or audio file and feeding it to the LLM, the delay becomes an even bigger hit to user experience.
While streaming is a suitable solution in many cases, it may be more convenient to do everything at some arbitrary point before the user even visits your application. This way, the user can be served the cached response near instantaneously, without having to wait for the LLM to generate it. This is especially useful for applications that use the same input for every user, like a book or news summarizer. But without input from the user, how can we get LangChain to cache a response?
We'll need a microservice that we can call on-demand, capable of sending a prompt to LangChain and caching the response using Upstash Redis. We'll also be using Upstash's Rate Limiting SDK to ensure that we don't accidentally exceed our rate limit. QStash is the most versatile way to call our microservice: we can set it up to cache responses whenever we want, even on a cron schedule if our application requires it. It also provides a dashboard that we can use to monitor our microservice's usage. If our endpoint is overloaded or something else goes wrong, QStash will retry the HTTP request and ensure that our message is delivered.
Our microservice will be built using Hono.js, a lightweight and fast web framework for the edge. In this demo, we'll be using a Cloudflare Worker to host our microservice. Thanks to Upstash, however, it can be deployed virtually anywhere, including other edge/serverless runtimes.
You can find the full source code for this demo here.
Prerequisites
To follow along, you'll need Bun installed, a Cloudflare account to deploy the Worker to, an Upstash account with a Redis database and QStash enabled (for the REST URL, token, and signing keys), and an OpenAI API key.
Getting started
Creating the project
Unlike many other create-<package> npm packages, create-hono requires you to be in an empty directory. First, create a new empty directory for your project and navigate to it:
mkdir langchain-qstash
cd langchain-qstash
With the recent release of v1.0.0, Bun will be used to scaffold this project. However, it is also possible to use create-hono with npm, pnpm, and yarn. Be sure to select cloudflare-workers when prompted for the template. You may see bun as a separate option, but this is not the template used in the demo.
bun create hono@latest
Installing dependencies
Now you can run the following command with the package manager of your choice to install the remaining dependencies:
bun install @upstash/qstash @upstash/ratelimit @upstash/redis langchain openai
Configuring the project
As of the time of writing, create-hono ignores lockfiles in its .gitignore by default. While not strictly required, you can update your .gitignore to remove the lockfile entries and instead ignore wrangler.toml, which will hold our secrets:
node_modules
dist
.wrangler
.dev.vars
wrangler.toml
Now, we can set the environment variables from the prerequisites. They can be appended to your wrangler.toml, which should already be excluded from source control.
[vars]
QSTASH_CURRENT_SIGNING_KEY="sig_********"
QSTASH_NEXT_SIGNING_KEY="sig_********"
UPSTASH_REDIS_REST_URL="https://********.upstash.io"
UPSTASH_REDIS_REST_TOKEN="********"
OPENAI_API_KEY="sk-********"
Finally, modify your src/index.ts to add typing for the environment variables from earlier:
type Bindings = {
QSTASH_CURRENT_SIGNING_KEY: string;
QSTASH_NEXT_SIGNING_KEY: string;
UPSTASH_REDIS_REST_URL: string;
UPSTASH_REDIS_REST_TOKEN: string;
OPENAI_API_KEY: string;
};
const app = new Hono<{ Bindings: Bindings }>();
The benefit of having two signing keys is that you can roll them once without having to update your environment variables: the receiver will automatically fall back to the next signing key if verification with the current one fails.
Development
At this stage, wrangler should be able to read your environment variables and deploy your project. To do so, you can run the following command with your preferred package manager:
bun run dev
wrangler will spin up your Hono.js server and provide you with a local URL on port 8787 to test your project. If you would like to start an edge preview session directly instead of testing locally, modify the dev script in your package.json as follows:
"dev": "wrangler dev src/index.ts --remote",
"dev": "wrangler dev src/index.ts --remote",
In order to view your Cloudflare Worker's configuration and logs, you'll first have to deploy your worker:
bun run deploy
This will walk you through signing in to Cloudflare and, once authenticated, automatically deploy your project for the first time.
Creating middleware
For debugging purposes, it will be useful to log the requests we receive from QStash. They'll be visible in the dashboard for our Cloudflare Worker under the "Logs" tab once you press Begin log stream. Hono.js provides a logger middleware that we can add to our router:
import { Hono } from "hono";
import { logger } from "hono/logger";
// snip
const app = new Hono<{ Bindings: Bindings }>();
app.use("*", logger());
// snip
Connecting QStash to Cloudflare Workers
Before we can start developing an API endpoint using Hono.js, we need a way to intercept messages sent by QStash, and discard the request if it has an invalid signature. Thankfully, Hono.js provides a way to implement our own custom middleware, which always runs before handlers. It is robust enough that we can organize our middleware into separate files as we see fit.
In our middleware, we can make use of QStash's receiver. Let's create a new file called src/middleware/verify.ts, and export a function typed with MiddlewareHandler:
import { Receiver } from "@upstash/qstash";
import { type MiddlewareHandler } from "hono";
declare global {
interface Response {
locals: {
query: string;
};
}
}
export const verify: MiddlewareHandler = async (ctx, next) => {
const receiver = new Receiver({
currentSigningKey: ctx.env.QSTASH_CURRENT_SIGNING_KEY,
nextSigningKey: ctx.env.QSTASH_NEXT_SIGNING_KEY,
});
};
Hono.js passes a ctx (context) object to each middleware and handler. This isn't quite the same as Cloudflare Workers' ExecutionContext, but it does contain the same information: Hono's ctx object is roughly the request object, environment, and execution context passed to a default Cloudflare Workers fetch handler, combined into one object.
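To make that comparison concrete, here is a minimal sketch (not part of the demo) of what a plain module Worker's fetch handler receives, reusing the Bindings type from earlier; Hono folds these three arguments into ctx:
export default {
  async fetch(request: Request, env: Bindings, executionCtx: ExecutionContext): Promise<Response> {
    // request      -> ctx.req (wrapped as a HonoRequest)
    // env          -> ctx.env
    // executionCtx -> ctx.executionCtx (or ctx.event)
    return new Response("ok");
  },
};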
We also modify the global Response interface to include a custom locals property. Unlike Express, Hono does not create a res.locals object by default; we'll use this to pass the query to the handler later. Next, we access the typed environment variables from earlier to construct our receiver.
We can now use the receiver to verify the signature of the request:
// snip
const body = await ctx.req.text();
ctx.res.locals = {
query: JSON.parse(body).query,
};
const isValid = await receiver
.verify({
signature: ctx.req.headers.get("Upstash-Signature")!,
body,
})
.catch((err) => {
console.error(err);
return false;
});
if (!isValid) {
return new Response("Invalid signature", { status: 401 });
}
await next();
// snip
First, we get the request object from the context. Hono.js conveniently uses only Web Standard APIs, like fetch, URL, Request, and Response. Although we are using it on Cloudflare Workers in this demo, this enables it to run in countless other environments, including other edge and serverless runtimes.
From the request object, we read the body of the request as text, as well as the Upstash-Signature header, which contains a custom JWT. We can use the receiver to verify the signature contained in the JWT by passing it the signature and body of the request. In the .catch handler, we make sure to log the error and return false to indicate that the signature is invalid.
Because we are consuming the body of the request here, we won't have access to it in our actual handler later: the body is a ReadableStream that can only be consumed once. Instead, to pass the query to the handler, we add it to the locals object on the response. Finally, if the signature is invalid, we return a 401 Unauthorized response. Otherwise, we call next() to continue to the next middleware or handler.
Adding rate-limiting
Because we're going to be connecting this endpoint to our personal OpenAI API keys, it is important to ensure that we don't accidentally exceed our rate limit. Thankfully, Upstash provides a Rate Limiting SDK that we can use to easily add rate-limiting to our endpoint. If the number of requests exceeds a given threshold, we will return a 429 Too Many Requests response.
Let's start by creating a new middleware in src/middleware/ratelimit.ts:
import { Ratelimit } from "@upstash/ratelimit";
import { Redis } from "@upstash/redis/cloudflare";
import { type MiddlewareHandler } from "hono";
export const ratelimit: MiddlewareHandler = async (ctx, next) => {
const redis = new Redis({
url: ctx.env.UPSTASH_REDIS_REST_URL,
token: ctx.env.UPSTASH_REDIS_REST_TOKEN,
});
const ratelimit = new Ratelimit({
redis,
limiter: Ratelimit.slidingWindow(10, "10 s"),
analytics: true,
});
await next();
};
Here, we are connecting our Upstash Redis database to the Rate Limiting SDK. We must create the Redis instance manually using the environment variables from ctx.env, because Redis.fromEnv() fails to read them automatically when using Cloudflare Workers. When analytics is true, the SDK will automatically call Redis to keep a cache of calls for each identifier. It uses @upstash/ratelimit as the prefix by default.
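If you want the rate-limit entries to be easier to spot in the Redis data browser, the constructor also accepts a prefix option. A minimal sketch; the value "langchain-qstash" is only an example namespace, not something the demo uses:
const ratelimit = new Ratelimit({
  redis,
  limiter: Ratelimit.slidingWindow(10, "10 s"),
  analytics: true,
  prefix: "langchain-qstash", // example namespace; defaults to "@upstash/ratelimit"
});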
The SDK also supports using an ephemeral cache that uses a Map instead of Redis, which can save time and resources under extreme load:
const cache = new Map(); // outside of the middleware handler
// snip
const ratelimit = new Ratelimit({
ephemeralCache: cache,
// snip
});
We also provide a limiter to the SDK. This is a function that tells the SDK how to rate limit requests. We are using the slidingWindow limiter, configured to allow up to 10 requests every 10 seconds.
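For reference, the SDK ships with other limiters as well; here is a small sketch of two of them, with parameters chosen purely for illustration:
// Fixed window: allows a burst of up to 10 requests, then blocks until the next 10 s window starts.
const fixedWindowLimiter = Ratelimit.fixedWindow(10, "10 s");

// Token bucket: refills 5 tokens every 10 seconds, up to a maximum of 10 tokens.
const tokenBucketLimiter = Ratelimit.tokenBucket(5, "10 s", 10);

// Either one can be passed as the limiter option in place of slidingWindow.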
The SDK also lets us provide an identifier for each request, and it will automatically keep track of the number of requests made within a given time period. In a real-world application, we can use the user's IP address as the identifier, but for the sake of this demo, we will use a constant string to limit all requests with a single ratelimit:
// snip
const identifier = "openai";
const { success } = await ratelimit.limit(identifier);
if (!success) {
return new Response("Too many requests", { status: 429 });
}
await next();
// snip
If the request is rate-limited, we return a 429 Too Many Requests response. Otherwise, we call next() to continue to the next middleware or handler. If we had connected multiple Redis databases to the SDK, it would have to synchronize between them. This can lead to dangling promises on Vercel Edge and Cloudflare Workers, which we can take care of like this:
const { pending, success } = await ratelimit.limit(identifier);
ctx.event.waitUntil(pending);
The way to do this can vary depending on the library you are using. Since Hono uses Web Standard APIs, we can use the event.waitUntil method to keep the worker alive until the promise resolves. However, for this demo, we will only be using a single Redis database, so we don't have to worry about dangling promises.
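As noted above, a real-world application would typically derive the identifier from the user rather than a constant. A minimal sketch, assuming Cloudflare's CF-Connecting-IP header is what you want to key on (keep in mind that in this particular setup the caller is QStash itself, so an IP-based identifier mostly makes sense for user-facing endpoints):
// Hypothetical: rate limit per client IP instead of a single shared identifier.
const identifier = ctx.req.headers.get("CF-Connecting-IP") ?? "anonymous";
const { success } = await ratelimit.limit(identifier);
if (!success) {
  return new Response("Too many requests", { status: 429 });
}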
Receiving messages from QStash
When we send an HTTP request to QStash, the destination we specify will be the URL of our Cloudflare Workers endpoint. We can use Hono.js to create a handler for this endpoint. Let's add a new one in src/index.ts, while enabling our middleware from before:
// snip
import { ratelimit } from "./middleware/ratelimit";
import { verify } from "./middleware/verify";
// snip
app.post("/api/announce", ratelimit, verify, async (ctx) => {});
// snip
In this handler, we can use LangChain to generate a response to the given prompt, using Upstash Redis to cache the result:
// snip
import { Redis } from "@upstash/redis/cloudflare";
import { UpstashRedisCache } from "langchain/cache/upstash_redis";
import { OpenAI } from "langchain/llms/openai";
// snip
app.post("/api/announce", ratelimit, verify, async (ctx) => {
const redis = new Redis({
url: ctx.env.UPSTASH_REDIS_REST_URL,
token: ctx.env.UPSTASH_REDIS_REST_TOKEN,
});
const cache = new UpstashRedisCache({ client: redis });
const model = new OpenAI({
cache,
openAIApiKey: ctx.env.OPENAI_API_KEY,
});
const query = ctx.res.locals.query;
const result = await model
.call(query)
.then((result) => {
console.log(result);
return result;
})
.catch((err) => console.error(err));
return new Response(result ?? "", { status: 200 });
});
// snip
Here, we import the classes required to set up caching for LangChain. Then, we construct the OpenAI model with the Upstash Redis cache, passing an Upstash Redis client to the cache. Again, we'll have to pass our OpenAI API key to the model manually, as it can't be read automatically on Cloudflare Workers.
Then, we access the query that was previously saved to res.locals, using it to call the model and log the result. Finally, we return the result as a response, or an empty string if something went wrong. At this stage, we can test our endpoint by sending a POST request to it and checking the response. First, rerun the deploy script in your package.json:
bun run deploy
After you retrieve the URL of your Cloudflare Workers endpoint, you can publish a message to it through QStash using curl:
curl -XPOST \
"https://qstash.upstash.io/v2/publish/https://<YOUR_API_URL>.workers.dev/api/announce" \
-H "Authorization: Bearer <YOUR_QSTASH_TOKEN>" \
-H "Content-Type: application/json" \
-d "{ \"query\": \"What's the derivative of e^x?\" }"
Upstash provides a QStash console that you can use to send these requests more easily, and it's just as simple to have QStash run the request repeatedly on a cron schedule. QStash passes the body of the request as-is to our endpoint. We're sending the following JSON payload, where query is the prompt for the model:
{
"query": "What's the derivative of e^x?"
}
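If you'd rather not hand-write curl commands, the same message can be published programmatically with the QStash TypeScript client. A minimal sketch, assuming QSTASH_TOKEN holds the token shown as <YOUR_QSTASH_TOKEN> above, run from a script or server rather than from the Worker itself:
import { Client } from "@upstash/qstash";

const qstash = new Client({ token: process.env.QSTASH_TOKEN! });

// Publishes the JSON body to QStash, which delivers it to our /api/announce endpoint.
await qstash.publishJSON({
  url: "https://<YOUR_API_URL>.workers.dev/api/announce",
  body: { query: "What's the derivative of e^x?" },
});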
Conclusion
The Rate Limiting SDK successfully records the number of calls per identifier in Redis. Likewise, generated content from LangChain gets cached in our Upstash Redis database, and the response is logged in our Cloudflare Worker's logs.
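To actually serve the cached response to users, a read path can reuse the same cache. Here is a minimal sketch (not part of the demo; the route name and hard-coded query are assumptions): because UpstashRedisCache keys on the prompt and the model settings, calling the same model with the same query returns the cached generation near instantly instead of hitting OpenAI again.
// Hypothetical read-side endpoint, reusing the imports from src/index.ts.
app.get("/api/announcement", async (ctx) => {
  const redis = new Redis({
    url: ctx.env.UPSTASH_REDIS_REST_URL,
    token: ctx.env.UPSTASH_REDIS_REST_TOKEN,
  });
  const cache = new UpstashRedisCache({ client: redis });
  const model = new OpenAI({ cache, openAIApiKey: ctx.env.OPENAI_API_KEY });

  // Must match the query that was published through QStash for the cache to hit.
  const result = await model.call("What's the derivative of e^x?");
  return new Response(result, { status: 200 });
});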