Skip to main content

Handler Context

Every @EventPattern and @MessagePattern handler can inject RpcContext to access message metadata, JetStream delivery info, and control message settlement. This works identically for both event and RPC handlers.

Injecting the context

Use the standard NestJS @Ctx() decorator:

import { Controller } from '@nestjs/common';
import { EventPattern, Payload, Ctx } from '@nestjs/microservices';
import { RpcContext } from '@horizon-republic/nestjs-jetstream';

@Controller()
export class OrdersController {
@EventPattern('order.created')
async handleOrderCreated(
@Payload() data: OrderCreatedDto,
@Ctx() ctx: RpcContext,
) {
const subject = ctx.getSubject();
const tenant = ctx.getHeader('x-tenant');
// ...
}
}

Methods reference

Message accessors

MethodReturn typeDescription
getSubject()stringThe NATS subject this message was published to
getHeader(key)string | undefinedValue of a single header, or undefined if missing
getHeaders()MsgHdrs | undefinedAll NATS message headers (the raw NATS MsgHdrs object from @nats-io/transport-node)
isJetStream()booleanType guard — returns true when the message is a JetStream message
getMessage()JsMsg | MsgThe raw NATS message (type depends on transport mode)

JetStream metadata

Since v2.7.0

These return undefined for Core NATS messages — no type guard needed.

MethodReturn typeDescription
getDeliveryCount()number | undefinedHow many times this message has been delivered
getStream()string | undefinedThe JetStream stream name
getSequence()number | undefinedStream sequence number
getTimestamp()Date | undefinedMessage publish timestamp
getCallerName()string | undefinedName of the service that sent the message

Settlement actions

Since v2.7.0

Control how the transport acknowledges the message — without throwing errors.

MethodEffectUse case
ctx.retry({ delayMs? })msg.nak(delayMs) — redeliverBusiness-level retry (external service unavailable, resource locked)
ctx.terminate(reason?)msg.term(reason) — permanent rejectMessage no longer relevant (order cancelled, entity deleted)
(no action)msg.ack() — acknowledgeSuccessful processing (default)

JetStream metadata

Access delivery info directly without type narrowing:

@EventPattern('order.created')
async handle(@Payload() data: OrderCreatedDto, @Ctx() ctx: RpcContext) {
console.log('Attempt:', ctx.getDeliveryCount()); // 1, 2, 3...
console.log('Stream:', ctx.getStream()); // 'orders__microservice_ev-stream'
console.log('Sequence:', ctx.getSequence()); // 42
console.log('Published:', ctx.getTimestamp()); // Date object
console.log('From:', ctx.getCallerName()); // 'api-gateway__microservice'
}

Use getDeliveryCount() for fallback logic on retries:

@EventPattern('payment.process')
async handle(@Payload() data: PaymentDto, @Ctx() ctx: RpcContext) {
if (ctx.getDeliveryCount()! >= 3) {
// 3rd attempt — try a different payment provider
await this.fallbackProvider.process(data);
return;
}

await this.primaryProvider.process(data);
}

Controlling message settlement

By default, the transport automatically acks on success and naks on error. Use retry() and terminate() to override this without throwing:

Business retry

When conditions aren't right to process the message now, but will be later:

@EventPattern('order.fulfill')
async handle(@Payload() data: FulfillDto, @Ctx() ctx: RpcContext) {
if (!await this.inventoryService.isAvailable()) {
ctx.retry({ delayMs: 30_000 }); // try again in 30 seconds
return;
}

await this.fulfillOrder(data);
// auto-ack — no flags set
}

Without delayMs, the message is redelivered immediately:

ctx.retry(); // immediate redelivery

Terminate

When the message is no longer relevant and should not be retried or sent to DLQ:

@EventPattern('order.process')
async handle(@Payload() data: OrderDto, @Ctx() ctx: RpcContext) {
const order = await this.orderService.find(data.orderId);

if (order.status === 'cancelled') {
ctx.terminate('Order already cancelled');
return;
}

await this.process(order);
}

Settlement decision flow

Mutual exclusivity

retry() and terminate() cannot both be called in the same handler — the second call throws an Error. Choose one intent per message.

Scope

Settlement actions only affect JetStream event handlers (workqueue and broadcast). They have no effect on ordered events (auto-acknowledged) or RPC handlers (separate settlement logic).

Extracting custom headers

Headers set via JetstreamRecordBuilder.setHeader() are available through getHeader():

Publisher
const record = new JetstreamRecordBuilder(data)
.setHeader('x-tenant', 'acme')
.setHeader('x-trace-id', crypto.randomUUID())
.build();

await lastValueFrom(this.client.emit('order.created', record));
Handler
@EventPattern('order.created')
async handle(@Payload() data: OrderCreatedDto, @Ctx() ctx: RpcContext) {
const tenant = ctx.getHeader('x-tenant'); // 'acme'
const traceId = ctx.getHeader('x-trace-id'); // uuid string
const missing = ctx.getHeader('x-unknown'); // undefined
}

The isJetStream() type guard

RpcContext can wrap either a JetStream message (JsMsg) or a Core NATS message (Msg). The isJetStream() method narrows the return type of getMessage():

if (ctx.isJetStream()) {
const msg = ctx.getMessage(); // TypeScript knows this is JsMsg
console.log('Redelivered:', msg.redelivered);
}
When is it not JetStream?

This check is useful when writing code that works across both Core RPC mode (Msg) and JetStream mode (JsMsg). The new metadata getters (getDeliveryCount(), etc.) already handle this internally — they return undefined for Core messages.

Accessing the raw NATS message

For advanced use cases, getMessage() gives direct access to the underlying NATS message object. Prefer the typed accessors when possible.

if (ctx.isJetStream()) {
const msg = ctx.getMessage();
console.log('Pending:', msg.info.pending);
console.log('Stream sequence:', msg.info.streamSequence);
}
Manual acknowledgment

Calling msg.ack(), msg.nak(), or msg.term() directly bypasses the transport's settlement logic. Use ctx.retry() and ctx.terminate() instead — they integrate with ack extension, dead letter handling, and observability hooks.

See also