Skip to main content
Some workflows require human approval or input before proceeding. When combined with Upstash Realtime, you can create interactive workflows that pause for user input and provide real-time feedback to your frontend during the entire process. This guide shows you how to implement a human-in-the-loop workflow pattern with real-time updates using Upstash Workflow and Upstash Realtime.

How It Works

In a human-in-the-loop workflow:
  1. The workflow executes initial steps and emits progress events
  2. The workflow pauses at a specific point using context.waitForEvent()
  3. A “waiting for input” event is emitted to notify the frontend
  4. The user makes a decision in the frontend (approve/reject)
  5. The frontend calls an API to notify the workflow using client.notify()
  6. The workflow resumes with the user’s decision
  7. An “input resolved” event is emitted so the frontend can update its UI
  8. The workflow continues and completes based on the decision

Prerequisites

  • An Upstash account with:
    • A QStash project for workflows
    • A Redis database for Realtime
  • Next.js application set up
  • Completed the basic real-time workflow setup

Event Types

For human-in-the-loop workflows, extend your schema in lib/realtime.ts with these additional event types:
const schema = {
  workflow: {
    runFinish: z.object({}),
    stepFinish: z.object({
      stepName: z.string(),
      result: z.unknown().optional()
    }),
    waitingForInput: z.object({
      eventId: z.string(),
      message: z.string()
    }),
    inputResolved: z.object({
      eventId: z.string()
    }),
  }
}
The new event types are:
  • waitingForInput: Emitted when the workflow pauses and needs user input
  • inputResolved: Emitted when the user provides input, so the frontend knows to clear the waiting state

Building the Workflow

1. Create the Workflow Endpoint

Create your workflow at app/api/workflow/human-in-loop/route.ts:
import { serve } from "@upstash/workflow/nextjs";
import { realtime } from "@/lib/realtime";

type WorkflowPayload = {
  userId: string;
  action: string;
};

export const { POST } = serve<WorkflowPayload>(async (context) => {
  const { userId, action } = context.requestPayload;
  const workflowRunId = context.workflowRunId;
  const channel = realtime.channel(workflowRunId);

  // Step 1: Initial Processing
  await context.run("initial-processing", async () => {
    // Your processing logic
    const result = {
      preprocessed: true,
      userId,
      action,
      requiresApproval: true,
    };

    // Emit step completion
    await channel.emit("workflow.stepFinish", {
      stepName: "initial-processing",
      result,
    });
    return result;
  });

  // Step 2: Wait for Human Approval
  const eventId = `approval-${workflowRunId}`;

  const [{ eventData, timeout }] = await Promise.all([
    // Wait for approval event
    context.waitForEvent<{ approved: boolean }>("wait-for-approval", eventId, { timeout: "5m" }),
    // Notify frontend that we're waiting
    context.run("notify-waiting", () =>
      channel.emit("workflow.waitingForInput", {
        eventId,
        message: `Waiting for approval to process action: ${action}`,
      })
    ),
  ]);

  // Handle timeout
  if (timeout) {
    return { success: false, reason: "timeout" };
  }

  // Notify that input was resolved
  await context.run("input-resolved", () =>
    channel.emit("workflow.inputResolved", {
      eventId,
    })
  );

  const status = eventData.approved ? "approved" : "rejected";

  // Step 3: Process based on approval
  await context.run(`process-${status}`, async () => {
    const result = {
      status,
      processedAt: Date.now(),
      action,
      userId,
    };

    // Emit step completion
    await channel.emit("workflow.stepFinish", {
      stepName: `process-${status}`,
      result,
    });
    return result;
  });

  // Step 4: Finalize (only if approved)
  if (eventData.approved) {
    // Additional steps...
  }

  // Emit completion
  await context.run("run-finish", () => channel.emit("workflow.runFinish", {}));

  return {
    success: true,
    approved: eventData.approved,
    workflowRunId,
  };
});
Key patterns:
  1. Using Promise.all: We wait for the event and emit the “waiting” notification simultaneously, ensuring the frontend is notified immediately
  2. Unique event IDs: Use a unique eventId (like approval-${workflowRunId}) to identify which approval request this is
  3. Timeout handling: Always handle the timeout case when waiting for events
  4. Input resolved event: Emit inputResolved after receiving input so the frontend knows to clear the waiting UI

2. Create the Notify Endpoint

Create an endpoint at app/api/notify/route.ts to handle user input:
import { Client } from "@upstash/workflow";
import { NextRequest, NextResponse } from "next/server";

const workflowClient = new Client({
  baseUrl: process.env.QSTASH_URL!,
  token: process.env.QSTASH_TOKEN!,
})

export async function POST(request: NextRequest) {
  const body = await request.json();
  const { eventId, eventData } = body;

  if (!eventId) {
    return NextResponse.json(
      { success: false, error: "eventId is required" },
      { status: 400 }
    );
  }

  // Notify the workflow
  await workflowClient.notify({
    eventId,
    eventData,
  });

  return NextResponse.json({ success: true });
}

Building the Frontend

1. Extend the Custom Hook

Extend your hook from the basic example to handle waiting states:
"use client";

import { useRealtime } from "@/lib/realtime-client";
import { useState, useCallback } from "react";

interface WorkflowStep {
  stepName: string;
  result?: unknown;
}

interface WaitingState {
  eventId: string;
  message: string;
}

export function useWorkflowWithRealtime() {
  const [workflowRunId, setWorkflowRunId] = useState<string | null>(null);
  const [steps, setSteps] = useState<WorkflowStep[]>([]);
  const [waitingState, setWaitingState] = useState<WaitingState | null>(null);
  const [isTriggering, setIsTriggering] = useState(false);
  const [isRunFinished, setIsRunFinished] = useState(false);

  useRealtime({
    enabled: !!workflowRunId,
    channels: workflowRunId ? [workflowRunId] : [],
    events: [
      "workflow.stepFinish",
      "workflow.runFinish",
      "workflow.waitingForInput",
      "workflow.inputResolved",
    ],
    onData({ event, data }) {
      if (event === "workflow.stepFinish") {
        setSteps((prev) => [
          ...prev,
          {
            stepName: data.stepName,
            result: data.result,
          },
        ]);
      } else if (event === "workflow.runFinish") {
        setIsRunFinished(true);
      } else if (event === "workflow.inputResolved") {
        // Clear waiting state if it matches
        setWaitingState((prev) => 
          prev?.eventId === data.eventId ? null : prev
        );
      } else if (event === "workflow.waitingForInput") {
        setWaitingState({
          eventId: data.eventId,
          message: data.message,
        });
      }
    },
  });

  const trigger = useCallback(async () => {
    setIsTriggering(true);
    setSteps([]);
    setWaitingState(null);
    setIsRunFinished(false);

    const response = await fetch("/api/trigger", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ workflowType: "human-in-loop" }),
    });

    const data = await response.json();
    setWorkflowRunId(data.workflowRunId);
    setIsTriggering(false);
  }, []);

  const continueWorkflow = useCallback(
    async (data: { approved: boolean }) => {
      if (!waitingState) {
        throw new Error("No workflow waiting for input");
      }

      const response = await fetch("/api/notify", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({
          eventId: waitingState.eventId,
          eventData: data,
        }),
      });

      if (!response.ok) {
        throw new Error("Failed to notify workflow");
      }

      // The waiting state will be cleared when we receive inputResolved event
    },
    [waitingState]
  );

  return {
    trigger,
    continueWorkflow,
    isTriggering,
    workflowRunId,
    steps,
    waitingState,
    isRunFinished,
  };
}
Key additions:
  • waitingState: Tracks when the workflow is waiting for input
  • continueWorkflow: Function to submit user decisions back to the workflow
  • Multiple events subscription: Uses events array to subscribe to multiple event types
  • Input resolved handling: Clears the waiting state when the workflow receives the user’s input

2. Use the Hook with Approval UI

"use client";

import { useWorkflowWithRealtime } from "@/hooks/useWorkflowWithRealtime";

export default function WorkflowPage() {
  const {
    trigger,
    isTriggering,
    steps,
    isRunFinished,
    waitingState,
    continueWorkflow,
  } = useWorkflowWithRealtime();

  return (
    <div
      style={{
        maxWidth: "600px",
        margin: "40px auto",
        fontFamily: "Arial, sans-serif",
      }}
    >
      <button onClick={trigger} disabled={isTriggering}>
        {isTriggering ? "Starting..." : "Click to Trigger Workflow"}
      </button>

      {isRunFinished && (
        <h3 style={{ marginTop: "20px" }}>Workflow Finished!</h3>
      )}

      {/* Show workflow steps */}
      <h3 style={{ marginTop: "20px" }}>Workflow Steps:</h3>
      <div>
        {steps.map((step, index) => (
          <div key={index}>
            <strong>{step.stepName}</strong>
            {Boolean(step.result) && <span>: {JSON.stringify(step.result)}</span>}
          </div>
        ))}
      </div>

      {/* Show approval UI when waiting for input */}
      {waitingState && (
        <div style={{ marginTop: "20px" }} className="approval-prompt">
          <p>{waitingState.message}</p>
          <p>
            <button onClick={() => continueWorkflow({ approved: true })}>
              Click to Approve
            </button>
          </p>
          <p>
            <button onClick={() => continueWorkflow({ approved: false })}>
              Click to Reject
            </button>
          </p>
        </div>
      )}
    </div>
  );
}

How the Pattern Works

Timeline of Events

  1. Initial processing: stepFinish event → Frontend shows completed step
  2. Waiting for approval: waitingForInput event → Frontend shows approval buttons
  3. User clicks approve/reject: Frontend calls /api/notify
  4. Workflow resumes: inputResolved event → Frontend hides approval buttons
  5. Processing continues: More stepFinish events as workflow continues
  6. Workflow completes: runFinish event → Frontend shows “Workflow Finished!”

Benefits

  • Real-time feedback: Users see exactly when their approval is needed
  • No polling: Instant updates via Server-Sent Events
  • Timeout handling: Workflows don’t hang indefinitely waiting for input

Full Example

For a complete working example with all steps, error handling, and full UI components, check out the Upstash Realtime example on GitHub.

Next Steps