Inngest Background Jobs
Serverless background job processing with Inngest
Inngest Background Jobs
The application uses Inngest for reliable, serverless background job processing. Inngest handles long-running tasks like video processing, clip identification, and export generation asynchronously without blocking API responses.
Inngest Client Configuration
Client Initialization
import { Inngest } from 'inngest';
export const inngest = new Inngest({
id: 'korai-app',
name: 'Korai App',
eventKey: process.env.INNGEST_EVENT_KEY // Optional: only needed for production
});
How It Works:
- Creates Inngest client for sending/receiving events
id
: Unique identifier for the applicationname
: Human-readable application nameeventKey
: Optional authentication key (required in production)
Development vs Production:
- Development: Works without keys (local dev mode)
- Production: Requires
INNGEST_EVENT_KEY
andINNGEST_SIGNING_KEY
API Route Handler
Inngest Serve Endpoint
import { serve } from 'inngest/next';
import { inngest } from '@/lib/inngest';
import { identifyClips, processClips } from '@/inngest/functions';
export const { GET, POST, PUT } = serve({
client: inngest,
functions: [identifyClips, processClips],
signingKey: process.env.INNGEST_SIGNING_KEY // Optional: only needed for production
});
Location: /app/api/inngest/route.ts
How It Works:
- Exports HTTP handlers (
GET
,POST
,PUT
) for Next.js App Router - Registers functions with Inngest platform
- Receives webhook events from Inngest
- Executes corresponding function when event triggered
Functions Registered:
identifyClips
: Identifies viral clips from YouTube videosprocessClips
: Processes and exports selected clips
Signing Key:
- Validates webhooks from Inngest
- Prevents unauthorized event execution
- Optional in development, required in production
Background Functions
Identify Clips Function
export const identifyClips = inngest.createFunction(
{ id: 'identify-clips', name: 'Identify Viral Clips from YouTube Video' },
{ event: 'video/identify.clips' },
async ({ event, step }) => {
const { youtubeUrl, prompt, userId } = event.data;
// Step 1: Generate UUID and create initial video record
const videoData = await step.run('create-video-record', async () => {
const videoUuid = uuidv4();
const s3Key = `youtube-videos/${videoUuid}/yt`;
const video = await prisma.video.create({
data: {
userId,
youtubeUrl,
s3Key,
prompt: prompt || ''
}
});
return { video, s3Key };
});
// Step 2: Call the backend API to identify clips
const clipsResponse = await step.run(
'call-identify-clips-api',
async () => {
const apiUrl = process.env.CLIPS_API_URL;
if (!apiUrl) {
throw new Error('CLIPS_API_URL environment variable is not set');
}
const response = await fetch(apiUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${process.env.CLIPS_API_TOKEN}`
},
body: JSON.stringify({
youtube_url: youtubeUrl,
s3_key_yt: videoData.s3Key,
prompt: prompt || ''
})
});
if (!response.ok) {
throw new Error(`API request failed: ${response.statusText}`);
}
const data: IdentifyClipsResponse = await response.json();
return data;
}
);
// Step 3: Update video record with response data and create clips
await step.run('save-clips-to-database', async () => {
// Update video with metadata
await prisma.video.update({
where: { id: videoData.video.id },
data: {
totalClips:
typeof clipsResponse.total_clips === 'number'
? clipsResponse.total_clips
: parseInt(clipsResponse.total_clips) || 0,
videoDuration: clipsResponse.video_duration?.toString() || null,
detectedLanguage: clipsResponse.detected_language,
s3Path: clipsResponse.s3_path
}
});
// Create clips
if (
clipsResponse.identified_clips &&
clipsResponse.identified_clips.length > 0
) {
await prisma.clip.createMany({
data: clipsResponse.identified_clips.map((clip) => ({
videoId: videoData.video.id,
start: clip.start.toString(),
end: clip.end.toString(),
title: clip.title,
summary: clip.summary,
viralityScore: clip.virality_score.toString(),
relatedTopics: clip.related_topics,
transcript: clip.transcript
}))
});
}
});
return {
videoId: videoData.video.id,
s3Key: videoData.s3Key,
totalClips: clipsResponse.total_clips
};
}
);
Function Configuration:
- ID:
identify-clips
(unique function identifier) - Name:
Identify Viral Clips from YouTube Video
(display name) - Event: Triggered by
video/identify.clips
event
Event Data:
{
youtubeUrl: string;
prompt: string;
userId: string;
}
Process Flow:
Step 1: Create Video Record
- Generates UUID for unique video identifier
- Creates S3 key:
youtube-videos/{uuid}/yt
- Inserts video record in database with:
userId
: Owner of the videoyoutubeUrl
: Source video URLs3Key
: Storage locationprompt
: Custom instructions for clip identification
Step 2: Call Clips API
- POSTs to external clips identification service
- Sends:
youtube_url
: Video to processs3_key_yt
: Where to store downloaded videoprompt
: AI instructions for clip identification
- Authenticates with
Bearer
token - Returns clips data with timestamps, titles, summaries, virality scores
Step 3: Save Clips to Database
- Updates video record with metadata:
totalClips
: Number of clips foundvideoDuration
: Length of videodetectedLanguage
: Language detecteds3Path
: Full S3 path to video
- Creates clip records in batch using
createMany
:- Links each clip to video via
videoId
- Stores start/end times, title, summary
- Stores virality score and related topics
- Stores transcript segment
- Links each clip to video via
Return Value:
{
videoId: string;
s3Key: string;
totalClips: number;
}
Why Steps?:
- Each
step.run()
is atomic and retriable - If function fails, resumes from last completed step
- Prevents duplicate work on retry
- Provides progress visibility in Inngest dashboard
Process Clips Function
export const processClips = inngest.createFunction(
{ id: 'process-clips', name: 'Process and Export Viral Clips' },
{ event: 'clips/process' },
async ({ event, step }) => {
const {
videoId,
s3Key,
selectedClips,
targetLanguage,
aspectRatio,
userId
} = event.data;
// Step 1: Call the backend API to process clips
const processedResponse = await step.run(
'call-process-clips-api',
async () => {
const apiUrl = process.env.PROCESS_CLIPS_API_URL;
if (!apiUrl) {
throw new Error(
'PROCESS_CLIPS_API_URL environment variable is not set'
);
}
const payload: ProcessClipsPayload = {
s3_key: s3Key,
clips: selectedClips.map((clip: { start: string; end: string }) => ({
start: parseFloat(clip.start),
end: parseFloat(clip.end)
})),
prompt: '',
target_language: targetLanguage,
aspect_ratio: aspectRatio,
subtitles: true,
subtitle_customization: {
enabled: true,
position: 'middle',
font_size: 120,
font_family: 'Anton',
font_color: '#FFFFFF',
outline_color: '#000000',
outline_width: 2.5,
background_color: null,
background_opacity: 0.0,
shadow_enabled: true,
shadow_color: '#808080',
shadow_offset: 3.0,
max_words_per_line: 3,
margin_horizontal: 60,
margin_vertical: 180,
fade_in_duration: 0,
fade_out_duration: 0,
karaoke_enabled: true,
karaoke_highlight_color: '#0DE050',
karaoke_popup_scale: 1.25
}
};
const response = await fetch(apiUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${process.env.CLIPS_API_TOKEN}`
},
body: JSON.stringify(payload)
});
if (!response.ok) {
throw new Error(`API request failed: ${response.statusText}`);
}
const data: ProcessClipsResponse = await response.json();
return data;
}
);
// Step 2: Save exported clips to database
await step.run('save-exported-clips-to-database', async () => {
if (
processedResponse.processed_clips &&
processedResponse.processed_clips.length > 0
) {
await prisma.exportedClip.createMany({
data: processedResponse.processed_clips.map((clip) => ({
videoId,
start: clip.start.toString(),
end: clip.end.toString(),
s3Key: clip.s3_key,
aspectRatio,
targetLanguage
}))
});
}
});
return {
videoId,
processedClips: processedResponse.processed_clips.length
};
}
);
Function Configuration:
- ID:
process-clips
(unique function identifier) - Name:
Process and Export Viral Clips
(display name) - Event: Triggered by
clips/process
event
Event Data:
{
videoId: string;
s3Key: string;
selectedClips: Array<{ start: string; end: string }>;
targetLanguage: string | null;
aspectRatio: string;
userId: string;
}
Process Flow:
Step 1: Call Process API
- POSTs to external video processing service
- Payload includes:
s3_key
: Original video locationclips
: Array of clip timestamps to extracttarget_language
: Translation language (null if none)aspect_ratio
: Output format (1:1, 16:9, 9:16)subtitles
: Enable subtitle burningsubtitle_customization
: Detailed subtitle styling:- Position: Subtitle placement (middle, top, bottom)
- Font: Family (Anton), size (120), color (#FFFFFF)
- Outline: Color (#000000), width (2.5)
- Shadow: Enabled, color (#808080), offset (3.0)
- Layout: Max words per line (3), margins (60, 180)
- Animation: Fade durations, karaoke effect
- Karaoke: Highlight color (#0DE050), popup scale (1.25)
- API processes video:
- Extracts clip segments
- Re-encodes to aspect ratio
- Burns subtitles with styling
- Translates if target language specified
- Uploads to S3
- Returns array of processed clips with S3 keys
Step 2: Save Exported Clips
- Creates
ExportedClip
records for each processed clip - Stores:
videoId
: Parent video referencestart
/end
: Original clip timestampss3Key
: Location of processed video fileaspectRatio
: Export formattargetLanguage
: Translation language (if any)
Return Value:
{
videoId: string;
processedClips: number;
}
Triggering Functions
Sending Events
From API Route:
import { inngest } from '@/lib/inngest';
export async function POST(req: Request) {
const { youtubeUrl, prompt } = await req.json();
const { userId } = await auth();
// Trigger identify clips function
await inngest.send({
name: 'video/identify.clips',
data: {
youtubeUrl,
prompt,
userId
}
});
return Response.json({
message: 'Video processing started',
status: 'pending'
});
}
Event Properties:
name
: Event identifier matching function triggerdata
: Payload passed to function
From Client Action:
export async function startClipProcessing(
videoId: string,
selectedClips: Clip[],
options: ProcessOptions
) {
const response = await fetch('/api/clips/process', {
method: 'POST',
body: JSON.stringify({
videoId,
selectedClips,
...options
})
});
// API route triggers Inngest event
return response.json();
}
Key Benefits
Reliability
- Automatic Retries: Failed steps retry with exponential backoff
- Durability: Events persisted, not lost if function crashes
- Idempotency: Steps won't re-execute if already completed
- Error Handling: Catches and logs errors with full context
Scalability
- Serverless: No servers to manage
- Auto-Scaling: Handles any event volume
- Concurrent Execution: Multiple functions run in parallel
- No Timeouts: Long-running jobs supported (hours)
Developer Experience
- Local Development: Works without external dependencies
- Dashboard: View function runs, logs, and errors
- Replay: Re-run failed functions from dashboard
- Debugging: Step-by-step execution visibility
Use Cases
- Video Processing: Long operations without API timeouts
- Batch Operations: Process many items asynchronously
- External API Calls: Retry on failure automatically
- Scheduled Tasks: Cron-like functionality
- Webhooks: Handle incoming webhooks reliably
Monitoring
Inngest Dashboard
View in Inngest dashboard:
- Function Runs: All executions with status
- Step Progress: Which steps completed
- Logs: Console logs from function
- Errors: Stack traces and error details
- Duration: How long each step took
- Retry Attempts: Number of retries per step
Access: https://app.inngest.com
Status Tracking
In Database:
// Add status field to Video model
model Video {
// ... existing fields
processingStatus String @default("pending") // pending, processing, completed, failed
}
// Update status throughout process
await prisma.video.update({
where: { id: videoId },
data: { processingStatus: 'processing' }
});
Polling Endpoint:
// GET /api/videos/:id/status
export async function GET(
req: Request,
{ params }: { params: { id: string } }
) {
const video = await prisma.video.findUnique({
where: { id: params.id },
select: {
processingStatus: true,
totalClips: true,
_count: {
select: { exportedClips: true }
}
}
});
return Response.json(video);
}
Environment Variables
# Inngest Configuration
INNGEST_EVENT_KEY=your-event-key-here
INNGEST_SIGNING_KEY=your-signing-key-here
# External API Configuration
CLIPS_API_URL=https://api.example.com/identify-clips
PROCESS_CLIPS_API_URL=https://api.example.com/process-clips
CLIPS_API_TOKEN=your-api-token-here
Development:
- Event and signing keys optional for local dev
- Inngest Dev Server runs locally
- No external Inngest account needed
Production:
- Event and signing keys required
- Get keys from Inngest dashboard
- Configure webhook endpoint:
https://yourdomain.com/api/inngest
Error Handling
Automatic Retries
await step.run('api-call-with-retry', async () => {
const response = await fetch(apiUrl, { method: 'POST', body: data });
if (!response.ok) {
throw new Error(`API failed: ${response.statusText}`);
}
return response.json();
});
Retry Behavior:
- Step throws error → Inngest retries automatically
- Exponential backoff: 1s, 2s, 4s, 8s, 16s, ...
- Max retries configurable (default: 3)
- After max retries, function marked as failed
Custom Error Handling
await step.run('process-with-fallback', async () => {
try {
return await primaryAPI.process(data);
} catch (error) {
console.error('Primary API failed:', error);
// Try fallback
try {
return await fallbackAPI.process(data);
} catch (fallbackError) {
console.error('Fallback also failed:', fallbackError);
throw new Error('Both APIs failed');
}
}
});
Partial Success Handling
await step.run('process-clips-individually', async () => {
const results = [];
const errors = [];
for (const clip of clips) {
try {
const result = await processClip(clip);
results.push(result);
} catch (error) {
errors.push({ clip, error });
}
}
if (errors.length > 0) {
console.warn('Some clips failed:', errors);
}
if (results.length === 0) {
throw new Error('All clips failed to process');
}
return { results, errors };
});
This allows continuing with successful clips even if some fail.