Korai Docs
Infrastructure

Inngest Background Jobs

Serverless background job processing with Inngest

Inngest Background Jobs

The application uses Inngest for reliable, serverless background job processing. Inngest handles long-running tasks like video processing, clip identification, and export generation asynchronously without blocking API responses.

Inngest Client Configuration

Client Initialization

import { Inngest } from 'inngest';

export const inngest = new Inngest({
  id: 'korai-app',
  name: 'Korai App',
  eventKey: process.env.INNGEST_EVENT_KEY // Optional: only needed for production
});

How It Works:

  • Creates Inngest client for sending/receiving events
  • id: Unique identifier for the application
  • name: Human-readable application name
  • eventKey: Optional authentication key (required in production)

Development vs Production:

  • Development: Works without keys (local dev mode)
  • Production: Requires INNGEST_EVENT_KEY and INNGEST_SIGNING_KEY

API Route Handler

Inngest Serve Endpoint

import { serve } from 'inngest/next';
import { inngest } from '@/lib/inngest';
import { identifyClips, processClips } from '@/inngest/functions';

export const { GET, POST, PUT } = serve({
  client: inngest,
  functions: [identifyClips, processClips],
  signingKey: process.env.INNGEST_SIGNING_KEY // Optional: only needed for production
});

Location: /app/api/inngest/route.ts

How It Works:

  1. Exports HTTP handlers (GET, POST, PUT) for Next.js App Router
  2. Registers functions with Inngest platform
  3. Receives webhook events from Inngest
  4. Executes corresponding function when event triggered

Functions Registered:

  • identifyClips: Identifies viral clips from YouTube videos
  • processClips: Processes and exports selected clips

Signing Key:

  • Validates webhooks from Inngest
  • Prevents unauthorized event execution
  • Optional in development, required in production

Background Functions

Identify Clips Function

export const identifyClips = inngest.createFunction(
  { id: 'identify-clips', name: 'Identify Viral Clips from YouTube Video' },
  { event: 'video/identify.clips' },
  async ({ event, step }) => {
    const { youtubeUrl, prompt, userId } = event.data;

    // Step 1: Generate UUID and create initial video record
    const videoData = await step.run('create-video-record', async () => {
      const videoUuid = uuidv4();
      const s3Key = `youtube-videos/${videoUuid}/yt`;

      const video = await prisma.video.create({
        data: {
          userId,
          youtubeUrl,
          s3Key,
          prompt: prompt || ''
        }
      });

      return { video, s3Key };
    });

    // Step 2: Call the backend API to identify clips
    const clipsResponse = await step.run(
      'call-identify-clips-api',
      async () => {
        const apiUrl = process.env.CLIPS_API_URL;

        if (!apiUrl) {
          throw new Error('CLIPS_API_URL environment variable is not set');
        }

        const response = await fetch(apiUrl, {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json',
            Authorization: `Bearer ${process.env.CLIPS_API_TOKEN}`
          },
          body: JSON.stringify({
            youtube_url: youtubeUrl,
            s3_key_yt: videoData.s3Key,
            prompt: prompt || ''
          })
        });

        if (!response.ok) {
          throw new Error(`API request failed: ${response.statusText}`);
        }

        const data: IdentifyClipsResponse = await response.json();
        return data;
      }
    );

    // Step 3: Update video record with response data and create clips
    await step.run('save-clips-to-database', async () => {
      // Update video with metadata
      await prisma.video.update({
        where: { id: videoData.video.id },
        data: {
          totalClips:
            typeof clipsResponse.total_clips === 'number'
              ? clipsResponse.total_clips
              : parseInt(clipsResponse.total_clips) || 0,
          videoDuration: clipsResponse.video_duration?.toString() || null,
          detectedLanguage: clipsResponse.detected_language,
          s3Path: clipsResponse.s3_path
        }
      });

      // Create clips
      if (
        clipsResponse.identified_clips &&
        clipsResponse.identified_clips.length > 0
      ) {
        await prisma.clip.createMany({
          data: clipsResponse.identified_clips.map((clip) => ({
            videoId: videoData.video.id,
            start: clip.start.toString(),
            end: clip.end.toString(),
            title: clip.title,
            summary: clip.summary,
            viralityScore: clip.virality_score.toString(),
            relatedTopics: clip.related_topics,
            transcript: clip.transcript
          }))
        });
      }
    });

    return {
      videoId: videoData.video.id,
      s3Key: videoData.s3Key,
      totalClips: clipsResponse.total_clips
    };
  }
);

Function Configuration:

  • ID: identify-clips (unique function identifier)
  • Name: Identify Viral Clips from YouTube Video (display name)
  • Event: Triggered by video/identify.clips event

Event Data:

{
  youtubeUrl: string;
  prompt: string;
  userId: string;
}

Process Flow:

Step 1: Create Video Record

  • Generates UUID for unique video identifier
  • Creates S3 key: youtube-videos/{uuid}/yt
  • Inserts video record in database with:
    • userId: Owner of the video
    • youtubeUrl: Source video URL
    • s3Key: Storage location
    • prompt: Custom instructions for clip identification

Step 2: Call Clips API

  • POSTs to external clips identification service
  • Sends:
    • youtube_url: Video to process
    • s3_key_yt: Where to store downloaded video
    • prompt: AI instructions for clip identification
  • Authenticates with Bearer token
  • Returns clips data with timestamps, titles, summaries, virality scores

Step 3: Save Clips to Database

  • Updates video record with metadata:
    • totalClips: Number of clips found
    • videoDuration: Length of video
    • detectedLanguage: Language detected
    • s3Path: Full S3 path to video
  • Creates clip records in batch using createMany:
    • Links each clip to video via videoId
    • Stores start/end times, title, summary
    • Stores virality score and related topics
    • Stores transcript segment

Return Value:

{
  videoId: string;
  s3Key: string;
  totalClips: number;
}

Why Steps?:

  • Each step.run() is atomic and retriable
  • If function fails, resumes from last completed step
  • Prevents duplicate work on retry
  • Provides progress visibility in Inngest dashboard

Process Clips Function

export const processClips = inngest.createFunction(
  { id: 'process-clips', name: 'Process and Export Viral Clips' },
  { event: 'clips/process' },
  async ({ event, step }) => {
    const {
      videoId,
      s3Key,
      selectedClips,
      targetLanguage,
      aspectRatio,
      userId
    } = event.data;

    // Step 1: Call the backend API to process clips
    const processedResponse = await step.run(
      'call-process-clips-api',
      async () => {
        const apiUrl = process.env.PROCESS_CLIPS_API_URL;

        if (!apiUrl) {
          throw new Error(
            'PROCESS_CLIPS_API_URL environment variable is not set'
          );
        }

        const payload: ProcessClipsPayload = {
          s3_key: s3Key,
          clips: selectedClips.map((clip: { start: string; end: string }) => ({
            start: parseFloat(clip.start),
            end: parseFloat(clip.end)
          })),
          prompt: '',
          target_language: targetLanguage,
          aspect_ratio: aspectRatio,
          subtitles: true,
          subtitle_customization: {
            enabled: true,
            position: 'middle',
            font_size: 120,
            font_family: 'Anton',
            font_color: '#FFFFFF',
            outline_color: '#000000',
            outline_width: 2.5,
            background_color: null,
            background_opacity: 0.0,
            shadow_enabled: true,
            shadow_color: '#808080',
            shadow_offset: 3.0,
            max_words_per_line: 3,
            margin_horizontal: 60,
            margin_vertical: 180,
            fade_in_duration: 0,
            fade_out_duration: 0,
            karaoke_enabled: true,
            karaoke_highlight_color: '#0DE050',
            karaoke_popup_scale: 1.25
          }
        };

        const response = await fetch(apiUrl, {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json',
            Authorization: `Bearer ${process.env.CLIPS_API_TOKEN}`
          },
          body: JSON.stringify(payload)
        });

        if (!response.ok) {
          throw new Error(`API request failed: ${response.statusText}`);
        }

        const data: ProcessClipsResponse = await response.json();
        return data;
      }
    );

    // Step 2: Save exported clips to database
    await step.run('save-exported-clips-to-database', async () => {
      if (
        processedResponse.processed_clips &&
        processedResponse.processed_clips.length > 0
      ) {
        await prisma.exportedClip.createMany({
          data: processedResponse.processed_clips.map((clip) => ({
            videoId,
            start: clip.start.toString(),
            end: clip.end.toString(),
            s3Key: clip.s3_key,
            aspectRatio,
            targetLanguage
          }))
        });
      }
    });

    return {
      videoId,
      processedClips: processedResponse.processed_clips.length
    };
  }
);

Function Configuration:

  • ID: process-clips (unique function identifier)
  • Name: Process and Export Viral Clips (display name)
  • Event: Triggered by clips/process event

Event Data:

{
  videoId: string;
  s3Key: string;
  selectedClips: Array<{ start: string; end: string }>;
  targetLanguage: string | null;
  aspectRatio: string;
  userId: string;
}

Process Flow:

Step 1: Call Process API

  • POSTs to external video processing service
  • Payload includes:
    • s3_key: Original video location
    • clips: Array of clip timestamps to extract
    • target_language: Translation language (null if none)
    • aspect_ratio: Output format (1:1, 16:9, 9:16)
    • subtitles: Enable subtitle burning
    • subtitle_customization: Detailed subtitle styling:
      • Position: Subtitle placement (middle, top, bottom)
      • Font: Family (Anton), size (120), color (#FFFFFF)
      • Outline: Color (#000000), width (2.5)
      • Shadow: Enabled, color (#808080), offset (3.0)
      • Layout: Max words per line (3), margins (60, 180)
      • Animation: Fade durations, karaoke effect
      • Karaoke: Highlight color (#0DE050), popup scale (1.25)
  • API processes video:
    • Extracts clip segments
    • Re-encodes to aspect ratio
    • Burns subtitles with styling
    • Translates if target language specified
    • Uploads to S3
  • Returns array of processed clips with S3 keys

Step 2: Save Exported Clips

  • Creates ExportedClip records for each processed clip
  • Stores:
    • videoId: Parent video reference
    • start/end: Original clip timestamps
    • s3Key: Location of processed video file
    • aspectRatio: Export format
    • targetLanguage: Translation language (if any)

Return Value:

{
  videoId: string;
  processedClips: number;
}

Triggering Functions

Sending Events

From API Route:

import { inngest } from '@/lib/inngest';

export async function POST(req: Request) {
  const { youtubeUrl, prompt } = await req.json();
  const { userId } = await auth();

  // Trigger identify clips function
  await inngest.send({
    name: 'video/identify.clips',
    data: {
      youtubeUrl,
      prompt,
      userId
    }
  });

  return Response.json({ 
    message: 'Video processing started',
    status: 'pending'
  });
}

Event Properties:

  • name: Event identifier matching function trigger
  • data: Payload passed to function

From Client Action:

export async function startClipProcessing(
  videoId: string,
  selectedClips: Clip[],
  options: ProcessOptions
) {
  const response = await fetch('/api/clips/process', {
    method: 'POST',
    body: JSON.stringify({
      videoId,
      selectedClips,
      ...options
    })
  });
  
  // API route triggers Inngest event
  return response.json();
}

Key Benefits

Reliability

  • Automatic Retries: Failed steps retry with exponential backoff
  • Durability: Events persisted, not lost if function crashes
  • Idempotency: Steps won't re-execute if already completed
  • Error Handling: Catches and logs errors with full context

Scalability

  • Serverless: No servers to manage
  • Auto-Scaling: Handles any event volume
  • Concurrent Execution: Multiple functions run in parallel
  • No Timeouts: Long-running jobs supported (hours)

Developer Experience

  • Local Development: Works without external dependencies
  • Dashboard: View function runs, logs, and errors
  • Replay: Re-run failed functions from dashboard
  • Debugging: Step-by-step execution visibility

Use Cases

  • Video Processing: Long operations without API timeouts
  • Batch Operations: Process many items asynchronously
  • External API Calls: Retry on failure automatically
  • Scheduled Tasks: Cron-like functionality
  • Webhooks: Handle incoming webhooks reliably

Monitoring

Inngest Dashboard

View in Inngest dashboard:

  • Function Runs: All executions with status
  • Step Progress: Which steps completed
  • Logs: Console logs from function
  • Errors: Stack traces and error details
  • Duration: How long each step took
  • Retry Attempts: Number of retries per step

Access: https://app.inngest.com

Status Tracking

In Database:

// Add status field to Video model
model Video {
  // ... existing fields
  processingStatus String @default("pending") // pending, processing, completed, failed
}

// Update status throughout process
await prisma.video.update({
  where: { id: videoId },
  data: { processingStatus: 'processing' }
});

Polling Endpoint:

// GET /api/videos/:id/status
export async function GET(
  req: Request,
  { params }: { params: { id: string } }
) {
  const video = await prisma.video.findUnique({
    where: { id: params.id },
    select: {
      processingStatus: true,
      totalClips: true,
      _count: {
        select: { exportedClips: true }
      }
    }
  });

  return Response.json(video);
}

Environment Variables

# Inngest Configuration
INNGEST_EVENT_KEY=your-event-key-here
INNGEST_SIGNING_KEY=your-signing-key-here

# External API Configuration
CLIPS_API_URL=https://api.example.com/identify-clips
PROCESS_CLIPS_API_URL=https://api.example.com/process-clips
CLIPS_API_TOKEN=your-api-token-here

Development:

  • Event and signing keys optional for local dev
  • Inngest Dev Server runs locally
  • No external Inngest account needed

Production:

  • Event and signing keys required
  • Get keys from Inngest dashboard
  • Configure webhook endpoint: https://yourdomain.com/api/inngest

Error Handling

Automatic Retries

await step.run('api-call-with-retry', async () => {
  const response = await fetch(apiUrl, { method: 'POST', body: data });
  
  if (!response.ok) {
    throw new Error(`API failed: ${response.statusText}`);
  }
  
  return response.json();
});

Retry Behavior:

  • Step throws error → Inngest retries automatically
  • Exponential backoff: 1s, 2s, 4s, 8s, 16s, ...
  • Max retries configurable (default: 3)
  • After max retries, function marked as failed

Custom Error Handling

await step.run('process-with-fallback', async () => {
  try {
    return await primaryAPI.process(data);
  } catch (error) {
    console.error('Primary API failed:', error);
    
    // Try fallback
    try {
      return await fallbackAPI.process(data);
    } catch (fallbackError) {
      console.error('Fallback also failed:', fallbackError);
      throw new Error('Both APIs failed');
    }
  }
});

Partial Success Handling

await step.run('process-clips-individually', async () => {
  const results = [];
  const errors = [];

  for (const clip of clips) {
    try {
      const result = await processClip(clip);
      results.push(result);
    } catch (error) {
      errors.push({ clip, error });
    }
  }

  if (errors.length > 0) {
    console.warn('Some clips failed:', errors);
  }

  if (results.length === 0) {
    throw new Error('All clips failed to process');
  }

  return { results, errors };
});

This allows continuing with successful clips even if some fail.