StreetJS provides three complementary systems for background work:
| System | Use Case |
| --- | --- |
| JobQueue + @Job | Durable task queue backed by PostgreSQL, with retries and DLQ |
| CronScheduler | Cron expression-based recurring tasks |
| WorkflowEngine | Multi-step saga workflows with compensation and distributed locking |
All three systems share the same PostgreSQL pool, so no Redis or external broker is required.
JobQueue
Setup
    import {
      JobQueue,
      Job,
      STREET_JOBS_MIGRATION_SQL,
      STREET_DLQ_MIGRATION_SQL,
    } from 'streetjs';

    // Run migrations once
    await pool.query(STREET_JOBS_MIGRATION_SQL);
    await pool.query(STREET_DLQ_MIGRATION_SQL);

    const queue = new JobQueue(pool, {
      pollIntervalMs: 1000, // Poll every second
      concurrency: 5,       // Process up to 5 jobs simultaneously
      maxRetries: 3,        // Retry failed jobs up to 3 times
    });
Defining Jobs with @Job
Use the @Job decorator to declare a job handler class. The first argument is the job name, which is used to route jobs to the correct handler.
    import { Job, type JobContext } from 'streetjs';

    @Job('send-email')
    class SendEmailJob {
      async run(
        data: { to: string; subject: string; body: string },
        ctx: JobContext,
      ): Promise<void> {
        // ctx.attempt: current attempt number (1-based)
        // ctx.jobId: unique job ID
        console.log(`Sending email to ${data.to} (attempt ${ctx.attempt})`);
        await sendEmailViaProvider(data.to, data.subject, data.body);
      }
    }
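Name-based routing can be pictured as a lookup table from job name to handler instance. The following is a minimal sketch of that idea, not StreetJS's implementation: `Handler`, `HandlerRegistry`, `resolve`, and `dispatch` are hypothetical names introduced only for illustration.

```typescript
// Hypothetical sketch of name-based job routing.
// None of these types or methods are part of StreetJS.
interface Handler {
  run(data: unknown, ctx: { jobId: string; attempt: number }): Promise<void>;
}

class HandlerRegistry {
  private handlers = new Map<string, Handler>();

  // Associate a job name with exactly one handler.
  register(name: string, handler: Handler): void {
    if (this.handlers.has(name)) {
      throw new Error(`Handler already registered for "${name}"`);
    }
    this.handlers.set(name, handler);
  }

  // Look up the handler for a job name, failing loudly on unknown names.
  resolve(name: string): Handler {
    const handler = this.handlers.get(name);
    if (!handler) {
      throw new Error(`No handler registered for "${name}"`);
    }
    return handler;
  }

  // Route a job payload to the handler registered under its name.
  async dispatch(
    name: string,
    data: unknown,
    ctx: { jobId: string; attempt: number },
  ): Promise<void> {
    await this.resolve(name).run(data, ctx);
  }
}
```

In this sketch, a job enqueued under a name with no registered handler fails at dispatch time, which is one reason to keep the name passed to the decorator and the name passed at enqueue time in a shared constant.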
Enqueuing Jobs
    // Enqueue a job to run immediately
    await queue.enqueue('send-email', {
      to: 'user@example.com',
      subject: 'Welcome!',
      body: 'Thanks for signing up.',
    });

    // Enqueue with a delay
    await queue.enqueue('send-email', { to: '...' }, {
      runAt: new Date(Date.now() + 60_000), // Run in 60 seconds
    });
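To see how `runAt` interacts with polling, it can help to model what a single poll might pick up. The sketch below is an in-memory illustration under stated assumptions, not how StreetJS queries PostgreSQL: `PendingJob` and `selectDueJobs` are hypothetical names, and the oldest-first ordering is an assumption.

```typescript
// Hypothetical in-memory model of one poll cycle.
interface PendingJob {
  id: string;
  name: string;
  runAt: Date;
}

// Return at most `limit` jobs whose runAt has passed, oldest first.
function selectDueJobs(jobs: PendingJob[], now: Date, limit: number): PendingJob[] {
  return jobs
    .filter((job) => job.runAt.getTime() <= now.getTime()) // filter() copies, so the input is not mutated
    .sort((a, b) => a.runAt.getTime() - b.runAt.getTime())
    .slice(0, limit);
}
```

Under this model, a delayed job starts at the first poll after its `runAt`, so its actual start time can trail `runAt` by up to one poll interval.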
Starting the Worker
    // Register handlers and start polling
    queue.register(new SendEmailJob());
    await queue.start();

    // Graceful shutdown
    process.on('SIGTERM', async () => {
      await queue.stop(); // Drains in-flight jobs before shutting down
      process.exit(0);
    });
DLQ Configuration
Failed jobs that exceed maxRetries are moved to the Dead Letter Queue (DLQ). Configure a DLQ handler to alert or archive them:
    queue.onDeadLetter(async (job) => {
      console.error('Job failed permanently:', job.name, job.data, job.error);
      // Send to Slack, PagerDuty, etc.
    });
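The retry-then-dead-letter decision can be sketched as a small pure function. This is an illustration only: `Outcome`, `nextOutcome`, and `simulate` are hypothetical names. It also assumes that `maxRetries` counts retries after the first attempt, so `maxRetries: 3` allows four attempts in total. The text above does not say whether the first attempt is included, so check the behavior of your StreetJS version.

```typescript
// Hypothetical sketch of the retry / dead-letter decision.
type Outcome = 'completed' | 'retry' | 'dead-letter';

// `attempt` is 1-based, matching ctx.attempt in the job handler.
// Assumption: maxRetries counts retries AFTER the first attempt.
function nextOutcome(attempt: number, maxRetries: number, succeeded: boolean): Outcome {
  if (succeeded) return 'completed';
  const retriesUsed = attempt - 1;
  return retriesUsed < maxRetries ? 'retry' : 'dead-letter';
}

// Simulate a job that fails `failures` times before it would succeed,
// and collect the outcome of every attempt.
function simulate(failures: number, maxRetries: number): Outcome[] {
  const outcomes: Outcome[] = [];
  for (let attempt = 1; ; attempt++) {
    const outcome = nextOutcome(attempt, maxRetries, attempt > failures);
    outcomes.push(outcome);
    if (outcome !== 'retry') return outcomes;
  }
}
```

Under these assumptions, `simulate(2, 3)` ends in `'completed'` on the third attempt, while `simulate(10, 3)` ends in `'dead-letter'` on the fourth, the point at which a dead-letter handler would be invoked.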
CronScheduler

    import { CronScheduler, CronParseError } from 'streetjs';

    const cron = new CronScheduler();

    // Run every hour
    cron.schedule('0 * * * *', async () => {
      console.log('Hourly cleanup task running');
      await cleanupExpiredSessions(pool);
    });

    // Run at midnight on the 1st of every month
    cron.schedule('0 0 1 * *', async () => {
      await generateMonthlyReport(pool);
    });

    // Run every 5 minutes
    cron.schedule('*/5 * * * *', async () => {
      await pool.query('REFRESH MATERIALIZED VIEW CONCURRENTLY analytics_summary');
    });

    cron.start();

    // Graceful shutdown
    process.on('SIGTERM', () => cron.stop());
An invalid cron expression throws CronParseError as soon as schedule() is called, so a misconfigured schedule fails at startup instead of surfacing later.
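To make the five-field syntax and the fail-fast parsing concrete, here is a deliberately small matcher. It is a sketch, not the parser StreetJS ships: `CronSyntaxError`, `parseField`, `parseCron`, and `matches` are hypothetical names. It supports only `*`, `*/n`, and single numbers, and it combines all five fields with AND, whereas traditional cron treats day-of-month and day-of-week specially when both are restricted.

```typescript
// Hypothetical, simplified cron matcher (supports `*`, `*/n`, and plain numbers only).
class CronSyntaxError extends Error {}

// Allowed ranges in field order: minute, hour, day of month, month, day of week.
const FIELD_RANGES: Array<[number, number]> = [[0, 59], [0, 23], [1, 31], [1, 12], [0, 6]];

// Expand one field into the set of values it allows.
function parseField(field: string, [min, max]: [number, number]): Set<number> {
  const values = new Set<number>();
  const stepMatch = /^\*\/(\d+)$/.exec(field);
  if (field === '*') {
    for (let v = min; v <= max; v++) values.add(v);
  } else if (stepMatch) {
    const step = Number(stepMatch[1]);
    if (step < 1) throw new CronSyntaxError(`Invalid step in "${field}"`);
    for (let v = min; v <= max; v += step) values.add(v);
  } else if (/^\d+$/.test(field)) {
    const v = Number(field);
    if (v < min || v > max) throw new CronSyntaxError(`Value ${v} out of range ${min}-${max}`);
    values.add(v);
  } else {
    throw new CronSyntaxError(`Unsupported field "${field}"`);
  }
  return values;
}

// Parse eagerly so a bad expression fails before anything is scheduled.
function parseCron(expr: string): Set<number>[] {
  const fields = expr.trim().split(/\s+/);
  if (fields.length !== 5) {
    throw new CronSyntaxError(`Expected 5 fields, got ${fields.length}`);
  }
  return fields.map((field, i) => parseField(field, FIELD_RANGES[i]!));
}

// True when `date` (in local time) satisfies every field of the expression.
function matches(expr: string, date: Date): boolean {
  const allowed = parseCron(expr);
  const actual = [
    date.getMinutes(),
    date.getHours(),
    date.getDate(),
    date.getMonth() + 1, // JavaScript months are 0-based
    date.getDay(),
  ];
  return allowed.every((set, i) => set.has(actual[i]!));
}
```

Because `parseCron` runs before any matching, an expression such as `'61 * * * *'` is rejected immediately, which mirrors the startup-time failure described above.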
WorkflowEngine
WorkflowEngine implements the Saga pattern for multi-step processes that need rollback on failure.
    import { type WorkflowStep, type WorkflowContext } from 'streetjs';

    // `engine` is assumed to be an existing WorkflowEngine instance;
    // its construction is not shown here.
    engine.define('user-onboarding', [
      {
        name: 'create-account',
        async run(input: { email: string; name: string }, ctx: WorkflowContext) {
          const userId = await createUser(input.email, input.name);
          return { userId };
        },
        async compensate(output: { userId: string }) {
          await deleteUser(output.userId); // Roll back if a later step fails
        },
      },
      {
        name: 'send-welcome-email',
        async run(input: { userId: string }) {
          await sendWelcomeEmail(input.userId);
          return input;
        },
        timeoutMs: 10_000, // Step-level timeout
      },
      {
        name: 'provision-workspace',
        async run(input: { userId: string }) {
          const workspaceId = await createWorkspace(input.userId);
          return { userId: input.userId, workspaceId };
        },
        async compensate(output: { workspaceId: string }) {
          await deleteWorkspace(output.workspaceId);
        },
      },
    ]);
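The compensation behavior of a saga can be illustrated with a small in-memory runner. This is a sketch of the general pattern, not WorkflowEngine's implementation: `SagaStep`, `SagaResult`, and `runSaga` are hypothetical names, and persistence, timeouts, and locking are left out. It assumes, as the step definitions above suggest, that each step's output becomes the next step's input and that `compensate` receives the output of the step it undoes.

```typescript
// Hypothetical in-memory saga runner; not part of StreetJS.
interface SagaStep {
  name: string;
  run(input: any): Promise<any>;
  compensate?(output: any): Promise<void>;
}

interface SagaResult {
  ok: boolean;
  output?: unknown;
  failedStep?: string;
  compensated: string[]; // names of the steps that were rolled back, in order
}

async function runSaga(steps: SagaStep[], initialInput: unknown): Promise<SagaResult> {
  const completed: Array<{ step: SagaStep; output: unknown }> = [];
  let input = initialInput;

  for (const step of steps) {
    try {
      const output = await step.run(input);
      completed.push({ step, output });
      input = output; // each step's output feeds the next step
    } catch {
      // Undo completed steps in reverse order, skipping steps without a compensate().
      const compensated: string[] = [];
      for (const done of [...completed].reverse()) {
        if (done.step.compensate) {
          await done.step.compensate(done.output);
          compensated.push(done.step.name);
        }
      }
      return { ok: false, failedStep: step.name, compensated };
    }
  }
  return { ok: true, output: input, compensated: [] };
}
```

Applied to the onboarding example, a failure in provision-workspace would call the create-account compensation, while send-welcome-email would be skipped because it defines no `compensate`. A production engine also has to decide what happens when a compensation itself fails, which this sketch does not address.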
If a process crashes mid-workflow, you can resume from the last completed step:
    await engine.resume(workflowId); // Picks up from `current_step` stored in the database
Distributed Locking
WorkflowEngine.resume() automatically acquires a PostgreSQL advisory lock (pg_try_advisory_lock) keyed to workflow:<workflowId> before executing any steps. This prevents two workers from executing the same workflow simultaneously in a multi-instance deployment. The lock is held for up to 30 seconds and released in a finally block.
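The try-lock and release-in-finally pattern can be sketched as follows. This is an illustration under stated assumptions, not WorkflowEngine's code: `QueryClient`, `lockKey`, and `withWorkflowLock` are hypothetical names, and the way the `workflow:<workflowId>` string is turned into a numeric key is an assumption. PostgreSQL advisory locks take integer keys, so a 32-bit FNV-1a style hash is used here as a stand-in, and collisions between different IDs are possible. The 30-second limit is omitted.

```typescript
// Minimal client shape, modeled loosely on the query method of a `pg` client.
interface QueryClient {
  query(text: string, values?: unknown[]): Promise<{ rows: Array<Record<string, unknown>> }>;
}

// Assumption: derive the integer lock key with a 32-bit FNV-1a style hash
// over the UTF-16 code units of "workflow:<id>". The real derivation may differ.
function lockKey(workflowId: string): number {
  const text = `workflow:${workflowId}`;
  let hash = 0x811c9dc5;
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193);
  }
  return hash | 0; // signed 32-bit integer, which fits in a PostgreSQL bigint
}

// Run `fn` only if the advisory lock can be taken without waiting.
// Returns undefined when another session already holds the lock.
async function withWorkflowLock<T>(
  client: QueryClient,
  workflowId: string,
  fn: () => Promise<T>,
): Promise<T | undefined> {
  const key = lockKey(workflowId);
  const result = await client.query(
    'SELECT pg_try_advisory_lock($1::bigint) AS locked',
    [key],
  );
  if (result.rows[0]?.locked !== true) {
    return undefined;
  }
  try {
    return await fn();
  } finally {
    // Always release, even if fn() throws.
    await client.query('SELECT pg_advisory_unlock($1::bigint)', [key]);
  }
}
```

Locks taken with `pg_try_advisory_lock` are session-level, so the lock and unlock queries must run on the same connection. With a connection pool, that means checking out a dedicated client for the duration rather than calling the pool's query method twice.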