Events (Workqueue)
Use when: each message should be handled by one consumer instance rather than by every instance (background jobs, billing, any work that must not be fanned out to all replicas). You get: at-least-once delivery, retries on throw, and a dead-letter queue when retries run out.
Workqueue events are fire-and-forget messages where each message is delivered to a single handler instance and redelivered only if that attempt is not acknowledged. This is the default event delivery model. Contrast with broadcast (all instances receive every message) and ordered events (strict sequential replay).
When to use
Imagine an e-commerce system: an order is created, and you need to send a confirmation email, update inventory, and notify the warehouse. Each of these tasks should happen once — you don't want three instances of the email service each sending the same confirmation.
Workqueue events solve this. When you publish an event, NATS JetStream delivers it to a single consumer in the group. If that consumer fails, the message is redelivered to another instance. If all retries are exhausted, the message is routed to a dead letter queue for investigation.
How it works
The workqueue flow, step by step:
- Publish: a service calls client.emit('order.created', data).
- Route: the transport publishes to the JetStream subject {service}__microservice.ev.order.created.
- Stream: the message is persisted in the service's event stream (workqueue retention).
- Consume: one durable pull consumer picks up the message from the stream.
- Dispatch: the EventRouter decodes the payload and invokes the matching @EventPattern handler.
- Acknowledge: on success, the message is ack'd and removed from the stream. On failure, it is nak'd for redelivery.
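The subject built in the Route step can be sketched as a small pure function. This only illustrates the naming pattern listed above; `eventSubject` is a hypothetical helper, not part of the library, and which service name fills `{service}` is decided by the transport.

```typescript
// Hypothetical helper: mirrors the documented subject pattern
// `{service}__microservice.ev.{pattern}`. Not a library API.
function eventSubject(serviceName: string, pattern: string): string {
  return `${serviceName}__microservice.ev.${pattern}`;
}

// Example: the subject used for 'order.created' in a service named 'orders'.
const exampleSubject = eventSubject('orders', 'order.created');
console.log(exampleSubject);
```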
Because the stream uses workqueue retention, a message is automatically deleted once acknowledged — keeping the stream compact.
Handlers run concurrently via RxJS mergeMap, bounded by the consumer's max_ack_pending (default: 100). Multiple messages can be in-flight at once.
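To make the in-flight cap concrete, here is a minimal sketch of bounded concurrency using plain promises. It is a stand-in for the idea behind a concurrency-limited mergeMap, not the transport's actual code; `runBounded` and its parameters are illustrative names.

```typescript
// Plain-promise stand-in for concurrency-bounded dispatch.
// `limit` plays the role of max_ack_pending in this sketch.
async function runBounded<T>(
  items: T[],
  limit: number,
  handler: (item: T) => Promise<void>,
): Promise<number> {
  let inFlight = 0; // handlers currently running
  let peak = 0;     // highest value inFlight ever reached
  let next = 0;     // index of the next item to hand out

  async function worker(): Promise<void> {
    while (next < items.length) {
      const item = items[next++] as T;
      inFlight++;
      peak = Math.max(peak, inFlight);
      try {
        await handler(item);
      } finally {
        inFlight--;
      }
    }
  }

  // Start at most `limit` workers; each pulls the next item when it is free.
  // A handler rejection stops that worker and rejects the returned promise.
  const workers = Array.from({ length: Math.min(limit, items.length) }, () => worker());
  await Promise.all(workers);
  return peak;
}
```

With ten items and a limit of three, no more than three handlers are ever running at the same time, which is the effect max_ack_pending has on message dispatch.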
Code examples
Sending events
Use the standard NestJS ClientProxy.emit() method. No prefix is needed for workqueue events.
import { Inject, Injectable } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { lastValueFrom } from 'rxjs';

// CreateOrderDto, Order, and this.orderRepository are application code,
// assumed to be defined and injected elsewhere (omitted for brevity).
@Injectable()
export class OrdersService {
  constructor(
    @Inject('orders') private readonly client: ClientProxy,
  ) {}

  async createOrder(dto: CreateOrderDto): Promise<Order> {
    const order = await this.orderRepository.save(dto);

    // Fire-and-forget: at-least-once delivery via JetStream
    await lastValueFrom(
      this.client.emit('order.created', {
        orderId: order.id,
        customerId: order.customerId,
        total: order.total,
      }),
    );

    return order;
  }
}
Handling events
Use @EventPattern with no extras. The handler return value is ignored.
import { Controller, Logger } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';

// this.emailService is application code, assumed to be injected elsewhere.
@Controller()
export class NotificationsController {
  private readonly logger = new Logger(NotificationsController.name);

  @EventPattern('order.created')
  async handleOrderCreated(
    @Payload() data: { orderId: number; customerId: string; total: number },
  ): Promise<void> {
    this.logger.log(`Sending confirmation for order ${data.orderId}`);
    await this.emailService.sendOrderConfirmation(data);
  }
}
If handleOrderCreated throws, the message is automatically nak'd and redelivered. No try/catch needed — the transport handles retries.
Consumer naming and @EventPattern extras
At-least-once delivery is on by default — every event handler registered with @EventPattern is automatically backed by a durable JetStream pull consumer with explicit ack. You don't enable it; you configure it.
Where the durable consumer name comes from
The library does not take a per-handler durable name. Instead, one durable consumer per service per stream kind serves every matching @EventPattern in the module. The name is derived from the name you pass to JetstreamModule.forRoot():
JetstreamModule.forRoot({
  name: 'orders', // ← this is the service name
  servers: ['nats://localhost:4222'],
});
| Stream kind | Generated durable consumer name |
|---|---|
| Workqueue events | orders__microservice_ev-consumer |
| Broadcast | orders__microservice_broadcast-consumer |
| Ordered | orders__microservice_ordered-consumer |
All @EventPattern handlers in the same orders service share the workqueue consumer above. JetStream load-balances across replicas of the same service via that single durable consumer, which is exactly what at-least-once delivery requires — multiple replicas, one cursor, ack semantics enforced server-side.
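The naming rule in the table above can be written as a one-line function. This is a sketch of the pattern only; `durableConsumerName` and the `StreamKind` type are hypothetical names introduced for illustration.

```typescript
// Stream kinds as they appear in the generated names in the table above.
type StreamKind = 'ev' | 'broadcast' | 'ordered';

// Hypothetical helper: reproduces the documented naming pattern
// `{service}__microservice_{kind}-consumer`. Not a library API.
function durableConsumerName(serviceName: string, kind: StreamKind): string {
  return `${serviceName}__microservice_${kind}-consumer`;
}

console.log(durableConsumerName('orders', 'ev'));
```

Because the name depends only on the service name and the stream kind, every replica of the same service resolves to the same durable consumer.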
If you need a different consumer for a specific subject, that's done by giving it its own service, not by overriding the decorator.
Decorator-level options (@EventPattern extras)
The second argument to @EventPattern is the only thing you set per-handler:
@EventPattern('user.created', { broadcast: true })
async onUserCreated(@Payload() user: User) { /* ... */ }

@EventPattern('order.status', { ordered: true })
async onOrderStatus(@Payload() data: OrderStatus) { /* ... */ }

@EventPattern('settings.changed', { meta: { tier: 'critical' } })
async onSettingsChanged(@Payload() s: Settings) { /* ... */ }
| Extra | Type | Effect |
|---|---|---|
| broadcast | boolean | Routes the handler to the shared broadcast stream, so every replica processes every message. See Broadcast. |
| ordered | boolean | Backs the handler with an ordered consumer for strict per-key delivery. |
| meta | Record<string, unknown> | Free-form metadata published to the handler metadata registry. |
There is no durable, ackWait, or maxDeliver on the decorator — those are stream-and-consumer-level concerns, configured once on the module under Custom configuration below.
Delivery semantics
The transport uses explicit acknowledgement. The outcome depends on what happens during message processing:
| Scenario | Action | Effect |
|---|---|---|
| Handler succeeds | ack | Message removed from stream |
| Handler throws an error | nak | Message redelivered after backoff |
| Payload cannot be decoded | term | Message terminated (no retry) |
| No handler registered for subject | term | Message terminated (no retry) |
| Max deliveries exhausted | term | Dead letter callback invoked, then terminated |
If the codec cannot deserialize a message (e.g., corrupted data, schema mismatch), the message is immediately terminated with term(). Retrying would produce the same error, so the transport avoids wasting delivery attempts.
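The delivery semantics table can be read as a single decision function. The sketch below encodes those five rows and nothing more; the `DeliveryOutcome` shape and the `settle` function are hypothetical and do not mirror the transport's internals.

```typescript
type Settlement = 'ack' | 'nak' | 'term';

// Hypothetical description of what happened to one delivery.
interface DeliveryOutcome {
  decoded: boolean;      // the codec could deserialize the payload
  handlerFound: boolean; // a handler is registered for the subject
  handlerThrew: boolean; // the handler raised an error
  deliveryCount: number; // 1-based delivery attempt number
  maxDeliver: number;    // configured max_deliver
}

function settle(outcome: DeliveryOutcome): Settlement {
  if (!outcome.decoded) return 'term';      // retrying cannot fix a bad payload
  if (!outcome.handlerFound) return 'term'; // nothing can process it
  if (!outcome.handlerThrew) return 'ack';  // success
  // Failure: dead letter on the final attempt, otherwise ask for redelivery.
  return outcome.deliveryCount >= outcome.maxDeliver ? 'term' : 'nak';
}
```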
Retry flow
When a handler throws an error, the following retry sequence occurs:
- The message is nak'd, signaling NATS to redeliver it.
- NATS redelivers the message to an available consumer instance.
- If the handler fails again, the first two steps repeat until the message has been delivered max_deliver times (default: 3).
- On the final delivery attempt, if the handler still fails, the transport detects that deliveryCount >= max_deliver and treats the message as a dead letter.
- The onDeadLetter callback is invoked (if configured), then the message is terminated with term().
If the onDeadLetter callback itself fails, the message is nak'd one more time, giving the dead letter hook another chance on the next cycle.
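The retry sequence can be simulated in a few lines to see how many times a failing handler runs before the message is dead-lettered. This is a synchronous toy model under the documented default of three deliveries; `simulateDeliveries` is a hypothetical name, and the case where the dead letter callback itself fails is not modeled.

```typescript
interface RetryResult {
  attempts: number;      // how many times the handler was invoked
  deadLettered: boolean; // true if every attempt failed
}

// Toy model: the handler throws to signal failure, returns to signal success.
function simulateDeliveries(
  handler: (attempt: number) => void,
  maxDeliver = 3,
): RetryResult {
  for (let attempt = 1; attempt <= maxDeliver; attempt++) {
    try {
      handler(attempt);
      return { attempts: attempt, deadLettered: false }; // ack
    } catch {
      // Failure: nak and redeliver, unless this was the final attempt.
    }
  }
  // Final attempt failed: dead letter callback, then term.
  return { attempts: maxDeliver, deadLettered: true };
}
```

A handler that fails once and then succeeds is invoked twice and acknowledged; a handler that always fails is invoked three times and then dead-lettered.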
For full details on dead letter handling, see the Dead Letter Queue guide.
Idempotency
Because the transport provides at-least-once delivery, a handler may receive the same message more than once — for example, if the service crashes after processing but before acknowledging. Your handlers must be idempotent: processing the same message twice should produce the same result.
Practical patterns
Database upsert — use a unique constraint or ON CONFLICT clause so re-processing the same event doesn't create duplicates:
@EventPattern('order.created')
async handleOrderCreated(@Payload() data: OrderCreatedEvent): Promise<void> {
  // Upsert: if this orderId already exists, the insert is a no-op
  await this.db.query(
    `INSERT INTO processed_orders (order_id, status)
     VALUES ($1, $2)
     ON CONFLICT (order_id) DO NOTHING`,
    [data.orderId, 'confirmed'],
  );
}
Idempotency key — track processed message IDs in a cache or database:
@EventPattern('payment.completed')
async handlePayment(@Payload() data: PaymentEvent, @Ctx() ctx: RpcContext): Promise<void> {
  // The stream sequence is unique within a stream, so it works as a dedup key
  const dedupKey = `payment:${ctx.getStream()}:${ctx.getSequence()}`;

  if (await this.cache.exists(dedupKey)) {
    return; // Already processed
  }

  await this.processPayment(data);
  await this.cache.set(dedupKey, '1', { ttl: 86400 });
}
See Handler Context for all typed accessors available on RpcContext.
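One caveat with checking a key and writing it afterwards is that two concurrent deliveries can both pass the check. A possible variation is to claim the key first and release it if processing fails. The sketch below uses an in-memory map purely for illustration; `InMemoryDedupStore` and `processOnce` are hypothetical names, and a multi-replica deployment would need the same claim to be an atomic set-if-absent in a shared store.

```typescript
// Illustrative in-memory store; real deployments need a store shared by all replicas.
class InMemoryDedupStore {
  private readonly expiresAt = new Map<string, number>();

  /** Returns true if the key was newly claimed, false if a live claim exists. */
  claim(key: string, ttlMs: number, now: number = Date.now()): boolean {
    const expiry = this.expiresAt.get(key);
    if (expiry !== undefined && expiry > now) return false;
    this.expiresAt.set(key, now + ttlMs);
    return true;
  }

  /** Drops a claim so that a redelivery can be processed. */
  release(key: string): void {
    this.expiresAt.delete(key);
  }
}

async function processOnce(
  store: InMemoryDedupStore,
  key: string,
  work: () => Promise<void>,
): Promise<'processed' | 'skipped'> {
  if (!store.claim(key, 86_400_000)) return 'skipped'; // 24 h claim, as in the example above
  try {
    await work();
    return 'processed';
  } catch (err) {
    store.release(key); // let the redelivered message try again
    throw err;
  }
}
```

Releasing the claim on failure matters: without it, a redelivery after a thrown error would be skipped as already processed.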
Message deduplication
NATS JetStream has built-in publish-side deduplication. If two messages with the same messageId arrive within the stream's duplicate_window, the second publish is silently dropped.
Use JetstreamRecordBuilder to set a deterministic message ID:
import { JetstreamRecordBuilder } from '@horizon-republic/nestjs-jetstream';
import { lastValueFrom } from 'rxjs';

const record = new JetstreamRecordBuilder({
  orderId: order.id,
  total: order.total,
})
  .setMessageId(`order-created-${order.id}`)
  .build();

await lastValueFrom(this.client.emit('order.created', record));
This prevents duplicate publishes in scenarios like:
- The publisher retries after a network timeout (but the first publish actually succeeded).
- A controller endpoint is called twice with the same data.
The default duplicate_window is 2 minutes — messages with the same ID published within that window are deduplicated. To extend it, override events.stream.duplicate_window in forRoot() (e.g. duplicate_window: toNanos(5, 'minutes')). See Custom configuration for the full override pattern.
When no message ID is set explicitly, the transport generates a random UUID for each publish — meaning no deduplication occurs by default. Always set a deterministic message ID when duplicate publishes are a concern.
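The effect of the deduplication window can be modeled with a map from message ID to the time it was first accepted. This is a simplified toy model of the behavior described above, not how the NATS server is implemented; `DuplicateWindowSketch` is a hypothetical name and time is passed in as milliseconds to keep the example deterministic.

```typescript
// Toy model of a publish-side dedup window.
class DuplicateWindowSketch {
  private readonly acceptedAt = new Map<string, number>();
  private readonly windowMs: number;

  constructor(windowMs: number) {
    this.windowMs = windowMs;
  }

  /** Returns true if the publish is stored, false if dropped as a duplicate. */
  publish(messageId: string, nowMs: number): boolean {
    const first = this.acceptedAt.get(messageId);
    if (first !== undefined && nowMs - first < this.windowMs) return false;
    this.acceptedAt.set(messageId, nowMs);
    return true;
  }
}

// Two-minute window, matching the documented default.
const dedupWindow = new DuplicateWindowSketch(2 * 60 * 1000);
```

In this model a second publish of `order-created-42` one minute after the first is dropped, while the same ID published after the window has elapsed is stored again, which is why the window should cover the longest retry delay you expect from publishers.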
Custom configuration
Stream overrides
The event stream is created with sensible defaults. Override them in forRoot() under events.stream:
JetstreamModule.forRoot({
  name: 'orders',
  servers: ['nats://localhost:4222'],
  events: {
    stream: {
      max_age: toNanos(14, 'days'),            // 14 days instead of 7
      max_bytes: 10 * 1024 * 1024 * 1024,      // 10 GB instead of 5 GB
      duplicate_window: toNanos(5, 'minutes'), // 5 min dedup window instead of 2 min
    },
  },
}),
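JetStream durations such as max_age and duplicate_window are expressed in nanoseconds, which is why the example converts them. The sketch below shows the arithmetic such a conversion performs; `toNanosSketch` is a hypothetical stand-in written for this page, not the library's toNanos, and the `'hours'` unit is an assumption.

```typescript
type DurationUnit = 'seconds' | 'minutes' | 'hours' | 'days';

// Nanoseconds per unit; all values stay within Number.MAX_SAFE_INTEGER
// for the durations used on this page.
const NANOS_PER_UNIT: Record<DurationUnit, number> = {
  seconds: 1e9,
  minutes: 60 * 1e9,
  hours: 3_600 * 1e9,
  days: 86_400 * 1e9,
};

// Hypothetical stand-in for a duration-to-nanoseconds helper.
function toNanosSketch(value: number, unit: DurationUnit): number {
  return value * NANOS_PER_UNIT[unit];
}

console.log(toNanosSketch(5, 'minutes')); // 300000000000
console.log(toNanosSketch(14, 'days'));   // 1209600000000000
```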
Consumer overrides
Override consumer settings under events.consumer:
JetstreamModule.forRoot({
  name: 'orders',
  servers: ['nats://localhost:4222'],
  events: {
    consumer: {
      max_deliver: 5,                   // 5 delivery attempts instead of 3
      ack_wait: toNanos(30, 'seconds'), // 30s ack timeout instead of 10s
      max_ack_pending: 50,              // Limit in-flight messages to 50
    },
  },
}),
If your handler calls a slow external API (e.g., sending emails, processing payments), increase ack_wait so that NATS doesn't redeliver the message before your handler finishes. The default is 10 seconds — long-running handlers may need 30s or more. For unbounded processing, consider ack extension instead.
The default of 3 delivery attempts works well for transient errors (network blips, temporary database unavailability). Increase it to 5 or higher if your handlers interact with unreliable external services where intermittent failures are common.
Default values — the ones you'll actually tune
| Setting | Default | Why it matters |
|---|---|---|
| max_deliver | 3 | How many delivery attempts are made before the message is treated as a dead letter |
| ack_wait | 10 seconds | Time a handler has to ack before NATS redelivers |
| max_ack_pending | 100 | In-flight cap; the primary backpressure control |
| max_age | 7 days | How long events live in the stream before being purged |
| duplicate_window | 2 minutes | Dedup window for setMessageId() |
See Default Configs — Event Stream and Event Consumer for the complete list of stream and consumer defaults.
Error handling
When a handler throws, the transport automatically nak's the message for redelivery. For most cases, this is all you need. However, some errors are non-recoverable — retrying will never succeed. For these, use ctx.terminate() to permanently discard the message.
import { Controller, Logger } from '@nestjs/common';
import { Ctx, EventPattern, Payload } from '@nestjs/microservices';
import { RpcContext } from '@horizon-republic/nestjs-jetstream';

@Controller()
export class OrdersController {
  private readonly logger = new Logger(OrdersController.name);

  @EventPattern('order.process')
  async handleOrder(
    @Payload() data: OrderPayload,
    @Ctx() ctx: RpcContext,
  ): Promise<void> {
    try {
      await this.ordersService.process(data);
      // Success: ack is called automatically by the transport
    } catch (error) {
      if (this.isNonRecoverable(error)) {
        // Non-recoverable: invalid payload, business rule violation, etc.
        // ctx.terminate() prevents redelivery; the message is permanently discarded.
        const reason = error instanceof Error ? error.message : String(error);
        ctx.terminate('Non-recoverable: ' + reason);
        this.logger.error(`Permanently discarding order`, error);
        return;
      }

      // Recoverable errors (DB timeout, external API down, etc.):
      // re-throw so the transport calls nak automatically and the
      // message is redelivered.
      // After max_deliver attempts, onDeadLetter is invoked.
      throw error;
    }
  }

  private isNonRecoverable(error: unknown): boolean {
    return error instanceof ValidationError
      || error instanceof BusinessRuleViolation;
  }
}
Message settlement outcomes
Every message ends in one of three states:
| Outcome | When | Effect |
|---|---|---|
| auto-ack (no action) | Handler succeeds | Message removed from stream. The transport calls ack() on the underlying message when the handler returns successfully. |
| ctx.retry() | Recoverable error | Message redelivered (optionally with { delayMs } delay). The transport applies the same retry semantics automatically when a handler throws, so you only need to call ctx.retry() manually when you want to return early without raising an exception. |
| ctx.terminate(reason) | Non-recoverable error | Message permanently discarded. Must be called manually in the handler. |
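As a sketch of returning early with an explicit settlement instead of throwing, the example below uses a small local interface that stands in for the two context methods named in the table. `SettlementContext`, `StockState`, and `handleReservation` are hypothetical and exist only for this illustration; the `{ delayMs }` option is taken from the table above.

```typescript
// Minimal stand-in for the context methods named in the table above.
interface SettlementContext {
  retry(options?: { delayMs?: number }): void;
  terminate(reason: string): void;
}

type StockState = 'available' | 'restocking' | 'discontinued';

function handleReservation(stock: StockState, ctx: SettlementContext): void {
  if (stock === 'discontinued') {
    // Retrying can never succeed, so discard the message for good.
    ctx.terminate('Item discontinued');
    return;
  }
  if (stock === 'restocking') {
    // Not an error, just not ready yet: ask for redelivery in five seconds.
    ctx.retry({ delayMs: 5_000 });
    return;
  }
  // 'available': returning normally lets the transport auto-ack.
}
```

The three branches line up with the three outcomes: terminate, retry, and the implicit ack on a normal return.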
Relationship with dead letter handling
When a message is nak'd repeatedly and reaches the max_deliver limit (default: 3), the transport treats it as a dead letter:
- The onDeadLetter callback is invoked (if configured) with full message context.
- The message is terminated with term().
This means ctx.terminate() is for errors you know will never succeed (validation failures, schema mismatches), while throw is for errors that might succeed on retry (timeouts, temporary unavailability). For messages that exhaust all retries, the dead letter mechanism provides a safety net.
See the Dead Letter Queue guide for how to configure and handle dead letters.
What's next?
- Broadcast Events — fan-out delivery to all service instances
- Dead Letter Queue — handle messages that exhaust all retries
- Record Builder — attach custom headers and message IDs
- Lifecycle Hooks — observe transport events like dead letters and message routing
- Performance Tuning — concurrency, ack extension, and backpressure
- Troubleshooting — diagnosing delivery and redelivery issues