Events (Workqueue)

Use when: exactly one consumer instance should handle each message (background jobs, billing, anything that shouldn't run twice). You get: at-least-once delivery, retries on throw, and a dead-letter queue when retries run out.

Workqueue events are fire-and-forget messages where exactly one handler instance processes each message. This is the default event delivery model. Contrast with broadcast (all instances receive every message) and ordered events (strict sequential replay).

When to use

Imagine an e-commerce system: an order is created, and you need to send a confirmation email, update inventory, and notify the warehouse. Each of these tasks should happen once — you don't want three instances of the email service each sending the same confirmation.

Workqueue events solve this. When you publish an event, NATS JetStream delivers it to a single consumer instance in the group. If that instance fails to process it, the message is redelivered to an available instance, which may or may not be the same replica. If all delivery attempts are exhausted, the message is routed to a dead letter queue for investigation.

How it works

The workqueue flow, step by step:

  1. Publish — a service calls client.emit('order.created', data).
  2. Route — the transport publishes to the JetStream subject {service}__microservice.ev.order.created.
  3. Stream — the message is persisted in the service's event stream (workqueue retention).
  4. Consume — one durable pull consumer picks up the message from the stream.
  5. Dispatch — the EventRouter decodes the payload and invokes the matching @EventPattern handler.
  6. Acknowledge — on success, the message is ack'd and removed from the stream. On failure, it is nak'd for redelivery.

Because the stream uses workqueue retention, a message is automatically deleted once acknowledged — keeping the stream compact.
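
Step 2 can be made concrete with a small helper. This is only a sketch of the subject layout described above: the transport builds the subject internally, and the function name eventSubject is hypothetical.

```typescript
// Hypothetical helper mirroring the subject layout from step 2:
// {service}__microservice.ev.{pattern}
function eventSubject(service: string, pattern: string): string {
  return `${service}__microservice.ev.${pattern}`;
}

// For the 'orders' service emitting 'order.created':
const orderCreatedSubject = eventSubject('orders', 'order.created');
// orderCreatedSubject === 'orders__microservice.ev.order.created'
```

Knowing the resolved subject is mostly useful when inspecting the stream or reading server logs.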

Handlers run concurrently via RxJS mergeMap, bounded by the consumer's max_ack_pending (default: 100). Multiple messages can be in-flight at once.
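
To picture what the max_ack_pending bound means, here is a simplified in-memory model. It is not the transport's implementation (which relies on RxJS and the NATS server); the class InFlightWindow and its methods are made up for illustration.

```typescript
// Simplified model of max_ack_pending: no new message is handed out
// while this many messages are delivered but not yet settled.
class InFlightWindow {
  private pending = 0;
  private readonly maxAckPending: number;

  constructor(maxAckPending: number) {
    this.maxAckPending = maxAckPending;
  }

  // Returns true if another message may be delivered right now.
  tryDeliver(): boolean {
    if (this.pending >= this.maxAckPending) return false;
    this.pending += 1;
    return true;
  }

  // Called once a handler finishes and its message is ack'd, nak'd, or term'd.
  settle(): void {
    if (this.pending > 0) this.pending -= 1;
  }

  get inFlight(): number {
    return this.pending;
  }
}

const inFlightWindow = new InFlightWindow(2);
const first = inFlightWindow.tryDeliver();  // true
const second = inFlightWindow.tryDeliver(); // true
const third = inFlightWindow.tryDeliver();  // false: cap of 2 reached
inFlightWindow.settle();                    // one handler finished
const fourth = inFlightWindow.tryDeliver(); // true: a slot was freed
```

A slow handler therefore holds a slot until it settles, which is why max_ack_pending acts as the main backpressure control.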

Code examples

Sending events

Use the standard NestJS ClientProxy.emit() method. No prefix is needed for workqueue events.

src/orders/orders.service.ts
import { Inject, Injectable } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { lastValueFrom } from 'rxjs';
// CreateOrderDto, Order, and OrderRepository are application types
// (placeholder names, imports omitted).

@Injectable()
export class OrdersService {
  constructor(
    @Inject('orders') private readonly client: ClientProxy,
    private readonly orderRepository: OrderRepository,
  ) {}

  async createOrder(dto: CreateOrderDto): Promise<Order> {
    const order = await this.orderRepository.save(dto);

    // Fire-and-forget: at-least-once delivery via JetStream
    await lastValueFrom(
      this.client.emit('order.created', {
        orderId: order.id,
        customerId: order.customerId,
        total: order.total,
      }),
    );

    return order;
  }
}

Handling events

Use @EventPattern with no extras. The handler return value is ignored.

src/notifications/notifications.controller.ts
import { Controller, Logger } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';
// EmailService is an application service (placeholder name, import omitted).

@Controller()
export class NotificationsController {
  private readonly logger = new Logger(NotificationsController.name);

  constructor(private readonly emailService: EmailService) {}

  @EventPattern('order.created')
  async handleOrderCreated(
    @Payload() data: { orderId: number; customerId: string; total: number },
  ): Promise<void> {
    this.logger.log(`Sending confirmation for order ${data.orderId}`);
    await this.emailService.sendOrderConfirmation(data);
  }
}

If handleOrderCreated throws, the message is automatically nak'd and redelivered. No try/catch needed — the transport handles retries.

Consumer naming and @EventPattern extras

At-least-once delivery is on by default — every event handler registered with @EventPattern is automatically backed by a durable JetStream pull consumer with explicit ack. You don't enable it; you configure it.

Where the durable consumer name comes from

The library does not take a per-handler durable name. Instead, one durable consumer per service per stream kind serves every matching @EventPattern in the module. The name is derived from the name you pass to JetstreamModule.forRoot():

JetstreamModule.forRoot({
  name: 'orders', // ← this is the service name
  servers: ['nats://localhost:4222'],
});

| Stream kind | Generated durable consumer name |
| --- | --- |
| Workqueue events | orders__microservice_ev-consumer |
| Broadcast | orders__microservice_broadcast-consumer |
| Ordered | orders__microservice_ordered-consumer |

All @EventPattern handlers in the same orders service share the workqueue consumer above. JetStream load-balances across replicas of the same service via that single durable consumer, which is exactly what at-least-once delivery requires — multiple replicas, one cursor, ack semantics enforced server-side.

If you need a different consumer for a specific subject, that's done by giving it its own service, not by overriding the decorator.
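
The naming rule in the table can be written down as a small function. This is a sketch derived only from the three names listed above; durableConsumerName and the StreamKind type are hypothetical and not part of the library's API.

```typescript
// Stream kinds as they appear inside the generated names in the table.
type StreamKind = 'ev' | 'broadcast' | 'ordered';

// Hypothetical helper reproducing the pattern
// {service}__microservice_{kind}-consumer
function durableConsumerName(service: string, kind: StreamKind): string {
  return `${service}__microservice_${kind}-consumer`;
}

const workqueueConsumer = durableConsumerName('orders', 'ev');
// workqueueConsumer === 'orders__microservice_ev-consumer'
```

Because the name depends only on the service name and the stream kind, every replica of orders resolves to the same durable consumer.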

Decorator-level options (@EventPattern extras)

The second argument to @EventPattern is the only thing you set per-handler:

@EventPattern('user.created', { broadcast: true })
async onUserCreated(@Payload() user: User) { /* ... */ }

@EventPattern('order.status', { ordered: true })
async onOrderStatus(@Payload() data: OrderStatus) { /* ... */ }

@EventPattern('settings.changed', { meta: { tier: 'critical' } })
async onSettingsChanged(@Payload() s: Settings) { /* ... */ }

| Extra | Type | Effect |
| --- | --- | --- |
| broadcast | boolean | Routes the handler to the shared broadcast stream, so every replica processes every message. See Broadcast. |
| ordered | boolean | Backs the handler with an ordered consumer for strict per-key delivery. |
| meta | Record<string, unknown> | Free-form metadata published to the handler metadata registry. |

There is no durable, ackWait, or maxDeliver on the decorator — those are stream-and-consumer-level concerns, configured once on the module under Custom configuration below.

Delivery semantics

The transport uses explicit acknowledgement. The outcome depends on what happens during message processing:

| Scenario | Action | Effect |
| --- | --- | --- |
| Handler succeeds | ack | Message removed from stream |
| Handler throws an error | nak | Message redelivered after backoff |
| Payload cannot be decoded | term | Message terminated (no retry) |
| No handler registered for subject | term | Message terminated (no retry) |
| Max deliveries exhausted | term | Dead letter callback invoked, then terminated |

Decode errors are terminal

If the codec cannot deserialize a message (e.g., corrupted data, schema mismatch), the message is immediately terminated with term(). Retrying would produce the same error, so the transport avoids wasting delivery attempts.
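
The table can be read as a single decision function. The sketch below encodes exactly those five rows; the DeliveryOutcome shape and the name decideSettlement are assumptions for illustration, not the transport's internal types.

```typescript
type Settlement = 'ack' | 'nak' | 'term';

// Illustrative description of what happened to one delivery.
interface DeliveryOutcome {
  decoded: boolean;      // the codec could deserialize the payload
  handlerFound: boolean; // an @EventPattern handler matches the subject
  handlerThrew: boolean; // the handler raised an error
  deliveryCount: number; // 1 on the first delivery
  maxDeliver: number;    // consumer's max_deliver setting
}

function decideSettlement(o: DeliveryOutcome): Settlement {
  if (!o.decoded) return 'term';      // retrying would fail the same way
  if (!o.handlerFound) return 'term'; // nothing can process this subject
  if (!o.handlerThrew) return 'ack';  // success
  // Handler failed: nak for redelivery unless this was the last allowed
  // attempt, in which case the dead letter callback runs before term.
  return o.deliveryCount >= o.maxDeliver ? 'term' : 'nak';
}
```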

Retry flow

When a handler throws an error, the following retry sequence occurs:

  1. The message is nak'd, signaling NATS to redeliver it.
  2. NATS redelivers the message to an available consumer instance.
  3. If the handler fails again, steps 1 and 2 repeat until the message has been delivered max_deliver times in total (default: 3).
  4. On the final delivery attempt, if the handler still fails, the transport detects that deliveryCount >= max_deliver and treats the message as a dead letter.
  5. The onDeadLetter callback is invoked (if configured), then the message is terminated with term().

If the onDeadLetter callback itself fails, the message is nak'd one more time, giving the dead letter hook another chance on the next cycle.
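
The retry sequence can be traced with a small synchronous simulation. It is a sketch under simplifying assumptions: there is no real NATS connection, the names simulateDeliveries and Action are invented here, and the case where onDeadLetter itself fails is left out.

```typescript
type Action = 'ack' | 'nak' | 'dead-letter' | 'term';

// The handler receives the 1-based delivery count and throws to signal failure.
function simulateDeliveries(
  handler: (deliveryCount: number) => void,
  maxDeliver: number,
  onDeadLetter?: () => void,
): Action[] {
  const actions: Action[] = [];
  for (let deliveryCount = 1; deliveryCount <= maxDeliver; deliveryCount++) {
    try {
      handler(deliveryCount);
      actions.push('ack');
      return actions;
    } catch {
      if (deliveryCount < maxDeliver) {
        actions.push('nak'); // steps 1-2: ask for redelivery
        continue;
      }
      // Steps 4-5: the final attempt failed, so treat it as a dead letter.
      if (onDeadLetter) {
        onDeadLetter();
        actions.push('dead-letter');
      }
      actions.push('term');
    }
  }
  return actions;
}

// A handler that always fails, with the default max_deliver of 3:
const alwaysFails = simulateDeliveries(() => { throw new Error('boom'); }, 3, () => {});
// alwaysFails is ['nak', 'nak', 'dead-letter', 'term']

// A handler that fails once and then succeeds:
const failsOnce = simulateDeliveries((n) => { if (n < 2) throw new Error('flaky'); }, 3);
// failsOnce is ['nak', 'ack']
```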

For full details on dead letter handling, see the Dead Letter Queue guide.

Idempotency

Because the transport provides at-least-once delivery, a handler may receive the same message more than once, for example if the service crashes after processing but before acknowledging. Your handlers must be idempotent: processing the same message twice must leave the system in the same state as processing it once.

Practical patterns

Database upsert — use a unique constraint or ON CONFLICT clause so re-processing the same event doesn't create duplicates:

@EventPattern('order.created')
async handleOrderCreated(@Payload() data: OrderCreatedEvent): Promise<void> {
  // Upsert: if this orderId already exists, the insert is a no-op
  await this.db.query(
    `INSERT INTO processed_orders (order_id, status)
     VALUES ($1, $2)
     ON CONFLICT (order_id) DO NOTHING`,
    [data.orderId, 'confirmed'],
  );
}

Idempotency key — track processed message IDs in a cache or database:

@EventPattern('payment.completed')
async handlePayment(@Payload() data: PaymentEvent, @Ctx() ctx: RpcContext): Promise<void> {
  // The stream sequence is unique within a stream and stays the same across
  // redeliveries of one message, so it works well as a dedup key.
  const dedupKey = `payment:${ctx.getStream()}:${ctx.getSequence()}`;

  if (await this.cache.exists(dedupKey)) {
    return; // Already processed
  }

  await this.processPayment(data);
  await this.cache.set(dedupKey, '1', { ttl: 86400 });
}
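
One caveat with the check-then-set pattern above: two concurrent deliveries can both pass the exists check before either writes the key. A common way to narrow that gap is to claim the key with an atomic set-if-absent operation and release it if processing fails. The sketch below uses an in-memory stand-in; ClaimStore and processOnce are hypothetical names, and a real deployment would need a store whose claim is a single atomic command.

```typescript
// In-memory stand-in for a cache offering an atomic "set if absent".
class ClaimStore {
  private readonly keys = new Set<string>();

  // Returns true only for the first caller that claims the key.
  claim(key: string): boolean {
    if (this.keys.has(key)) return false;
    this.keys.add(key);
    return true;
  }

  release(key: string): void {
    this.keys.delete(key);
  }
}

// Runs `work` at most once per dedup key. Returns false for duplicates.
function processOnce(store: ClaimStore, dedupKey: string, work: () => void): boolean {
  if (!store.claim(dedupKey)) return false; // duplicate delivery: skip
  try {
    work();
    return true;
  } catch (error) {
    store.release(dedupKey); // let the redelivery try again
    throw error;
  }
}
```

Releasing the claim on failure keeps the transport's retry behaviour intact: the rethrown error still leads to a nak, and the next delivery can claim the key again.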

See Handler Context for all typed accessors available on RpcContext.

Message deduplication

NATS JetStream has built-in publish-side deduplication. If two messages with the same messageId arrive within the stream's duplicate_window, the second publish is silently dropped.

Use JetstreamRecordBuilder to set a deterministic message ID:

import { JetstreamRecordBuilder } from '@horizon-republic/nestjs-jetstream';
import { lastValueFrom } from 'rxjs';

const record = new JetstreamRecordBuilder({
  orderId: order.id,
  total: order.total,
})
  .setMessageId(`order-created-${order.id}`)
  .build();

await lastValueFrom(this.client.emit('order.created', record));

This prevents duplicate publishes in scenarios like:

  • The publisher retries after a network timeout (but the first publish actually succeeded).
  • A controller endpoint is called twice with the same data.

The default duplicate_window is 2 minutes — messages with the same ID published within that window are deduplicated. To extend it, override events.stream.duplicate_window in forRoot() (e.g. duplicate_window: toNanos(5, 'minutes')). See Custom configuration for the full override pattern.

When no message ID is set explicitly, the transport generates a random UUID for each publish — meaning no deduplication occurs by default. Always set a deterministic message ID when duplicate publishes are a concern.
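
To see how the duplicate window behaves, here is a simplified in-memory model of publish-side deduplication. It only illustrates the rule described above; the server's actual bookkeeping and boundary handling may differ, and the DedupWindow class is invented for this sketch.

```typescript
// Simplified model: a message ID is rejected if the same ID was
// accepted less than `windowMs` milliseconds earlier.
class DedupWindow {
  private readonly seen = new Map<string, number>(); // messageId -> accept time (ms)
  private readonly windowMs: number;

  constructor(windowMs: number) {
    this.windowMs = windowMs;
  }

  // Returns true if the publish is stored, false if dropped as a duplicate.
  publish(messageId: string, nowMs: number): boolean {
    const acceptedAt = this.seen.get(messageId);
    if (acceptedAt !== undefined && nowMs - acceptedAt < this.windowMs) {
      return false;
    }
    this.seen.set(messageId, nowMs);
    return true;
  }
}

const twoMinutesMs = 2 * 60 * 1000; // matches the default duplicate_window
const dedup = new DedupWindow(twoMinutesMs);

const firstPublish = dedup.publish('order-created-42', 0);       // true: stored
const quickRetry = dedup.publish('order-created-42', 30_000);    // false: inside the window
const latePublish = dedup.publish('order-created-42', 150_000);  // true: window has elapsed
const otherOrder = dedup.publish('order-created-43', 30_000);    // true: different ID
```

The last line shows why random per-publish IDs never deduplicate: two publishes only collide when they carry the same ID.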

Custom configuration

Stream overrides

The event stream is created with sensible defaults. Override them in forRoot() under events.stream:

JetstreamModule.forRoot({
  name: 'orders',
  servers: ['nats://localhost:4222'],
  events: {
    stream: {
      max_age: toNanos(14, 'days'),            // 14 days instead of 7
      max_bytes: 10 * 1024 * 1024 * 1024,      // 10 GB instead of 5 GB
      duplicate_window: toNanos(5, 'minutes'), // 5 min dedup window instead of 2 min
    },
  },
}),
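
The duration overrides above are expressed in nanoseconds, which is the unit JetStream uses for its duration settings. The following stand-in shows the arithmetic behind calls such as toNanos(5, 'minutes'). It is not the library's implementation: toNanosSketch is a hypothetical name, and it covers only the units that appear on this page.

```typescript
type DurationUnit = 'seconds' | 'minutes' | 'days';

const NANOS_PER_SECOND = 1_000_000_000;
const SECONDS_PER_UNIT: Record<DurationUnit, number> = {
  seconds: 1,
  minutes: 60,
  days: 86_400,
};

// Converts a human-readable duration into nanoseconds.
function toNanosSketch(value: number, unit: DurationUnit): number {
  return value * SECONDS_PER_UNIT[unit] * NANOS_PER_SECOND;
}

const fiveMinutes = toNanosSketch(5, 'minutes'); // 300_000_000_000
const fourteenDays = toNanosSketch(14, 'days');  // 1_209_600_000_000_000
```

Both values stay below Number.MAX_SAFE_INTEGER, so plain numbers are sufficient for durations of this size.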

Consumer overrides

Override consumer settings under events.consumer:

JetstreamModule.forRoot({
  name: 'orders',
  servers: ['nats://localhost:4222'],
  events: {
    consumer: {
      max_deliver: 5,                   // 5 delivery attempts instead of 3
      ack_wait: toNanos(30, 'seconds'), // 30s ack timeout instead of 10s
      max_ack_pending: 50,              // Limit in-flight messages to 50
    },
  },
}),

When to increase ack_wait

If your handler calls a slow external API (e.g., sending emails, processing payments), increase ack_wait so that NATS doesn't redeliver the message before your handler finishes. The default is 10 seconds — long-running handlers may need 30s or more. For unbounded processing, consider ack extension instead.

When to increase max_deliver

The default of 3 delivery attempts works well for transient errors (network blips, temporary database unavailability). Increase it to 5 or higher if your handlers interact with unreliable external services where intermittent failures are common.

Default values — the ones you'll actually tune

| Setting | Default | Why it matters |
| --- | --- | --- |
| max_deliver | 3 | Total delivery attempts (first delivery included) before the message is treated as a dead letter |
| ack_wait | 10 seconds | Time a handler has to ack before NATS redelivers |
| max_ack_pending | 100 | In-flight cap, the primary backpressure control |
| max_age | 7 days | How long events live in the stream before being purged |
| duplicate_window | 2 minutes | Dedup window for setMessageId() |

See Default Configs — Event Stream and Event Consumer for the complete list of stream and consumer defaults.
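
One way to think about overrides is as a merge on top of the defaults in the table: anything you leave out keeps its default. The sketch below assumes a shallow merge, which is a guess about how the module combines the two. The names ConsumerSettings, consumerDefaults, and resolveConsumer are illustrative only.

```typescript
interface ConsumerSettings {
  max_deliver: number;
  ack_wait: number; // nanoseconds
  max_ack_pending: number;
}

const SECOND_NS = 1_000_000_000;

// Defaults taken from the table above.
const consumerDefaults: ConsumerSettings = {
  max_deliver: 3,
  ack_wait: 10 * SECOND_NS,
  max_ack_pending: 100,
};

// Assumed behaviour: overrides win, missing keys fall back to defaults.
function resolveConsumer(overrides: Partial<ConsumerSettings>): ConsumerSettings {
  return { ...consumerDefaults, ...overrides };
}

const resolved = resolveConsumer({ max_deliver: 5 });
// resolved.max_deliver is 5; ack_wait and max_ack_pending keep their defaults
```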

Error handling

When a handler throws, the transport automatically nak's the message for redelivery. For most cases, this is all you need. However, some errors are non-recoverable — retrying will never succeed. For these, use ctx.terminate() to permanently discard the message.

src/orders/orders.controller.ts
import { Controller, Logger } from '@nestjs/common';
import { Ctx, EventPattern, Payload } from '@nestjs/microservices';
import { RpcContext } from '@horizon-republic/nestjs-jetstream';
// OrdersService, OrderPayload, ValidationError, and BusinessRuleViolation are
// application types (imports omitted).

@Controller()
export class OrdersController {
  private readonly logger = new Logger(OrdersController.name);

  constructor(private readonly ordersService: OrdersService) {}

  @EventPattern('order.process')
  async handleOrder(
    @Payload() data: OrderPayload,
    @Ctx() ctx: RpcContext,
  ): Promise<void> {
    try {
      await this.ordersService.process(data);
      // Success: the transport acks the message automatically
    } catch (error) {
      if (this.isNonRecoverable(error)) {
        // Non-recoverable: invalid payload, business rule violation, etc.
        // ctx.terminate() prevents redelivery; the message is permanently discarded.
        const reason = error instanceof Error ? error.message : String(error);
        ctx.terminate('Non-recoverable: ' + reason);
        this.logger.error('Permanently discarding order', error);
        return;
      }

      // Recoverable errors (DB timeout, external API down, etc.):
      // re-throw so the transport naks the message and NATS redelivers it.
      // Once max_deliver attempts are used up, onDeadLetter is invoked.
      throw error;
    }
  }

  private isNonRecoverable(error: unknown): boolean {
    return error instanceof ValidationError
      || error instanceof BusinessRuleViolation;
  }
}

Message settlement outcomes

Every message ends in one of three states:

| Outcome | When | Effect |
| --- | --- | --- |
| auto-ack (no action) | Handler succeeds | Message removed from stream. The transport calls ack() on the underlying message when the handler returns successfully. |
| ctx.retry() | Recoverable error | Message redelivered (optionally with { delayMs } delay). The transport applies the same retry semantics automatically when a handler throws, so you only need to call ctx.retry() manually when you want to return early without raising an exception. |
| ctx.terminate(reason) | Non-recoverable error | Message permanently discarded. Must be called manually in the handler. |
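
The early-return style mentioned for ctx.retry() might look like the sketch below. To keep it self-contained it uses a minimal stand-in interface rather than the real RpcContext. SettleContext, StockEvent, handleStockEvent, and the warehouseOnline flag are all hypothetical; only retry({ delayMs }) and terminate(reason) come from the table above.

```typescript
// Minimal stand-in for the two settlement methods named in the table.
interface SettleContext {
  retry(options?: { delayMs?: number }): void;
  terminate(reason: string): void;
}

interface StockEvent {
  sku: string;
  quantity: number;
}

// Hypothetical handler body that settles explicitly instead of throwing.
function handleStockEvent(
  data: StockEvent,
  ctx: SettleContext,
  warehouseOnline: boolean,
): void {
  if (data.quantity <= 0) {
    ctx.terminate('quantity must be positive'); // retrying can never succeed
    return;
  }
  if (!warehouseOnline) {
    ctx.retry({ delayMs: 5_000 }); // transient: ask for redelivery in 5 seconds
    return;
  }
  // Normal path: returning without settling lets the transport ack.
}
```

Returning early like this keeps expected conditions, such as a dependency being offline, out of the error logs while still using the same redelivery path as a thrown error.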

Relationship with dead letter handling

When a message is nak'd repeatedly and reaches the max_deliver limit (default: 3), the transport treats it as a dead letter:

  1. The onDeadLetter callback is invoked (if configured) with full message context.
  2. The message is terminated with term().

This means ctx.terminate() is for errors you know will never succeed (validation failures, schema mismatches), while throw is for errors that might succeed on retry (timeouts, temporary unavailability). For messages that exhaust all retries, the dead letter mechanism provides a safety net.

See the Dead Letter Queue guide for how to configure and handle dead letters.

What's next?