Record Builder & Deduplication
JetstreamRecordBuilder is a fluent builder for attaching custom headers, per-request timeouts, and deduplication IDs to outbound messages. It follows the same record-builder pattern used by other NestJS transports (RmqRecord, NatsRecord).
Basic usage
Build a record and pass it as the payload to client.send() or client.emit():
import { JetstreamRecordBuilder } from '@horizon-republic/nestjs-jetstream';
const record = new JetstreamRecordBuilder({ orderId: 42 })
.setHeader('x-tenant', 'acme')
.setTimeout(5000)
.build();
// Fire-and-forget event
await lastValueFrom(this.client.emit('order.created', record));
// RPC request
const result = await lastValueFrom(this.client.send('get.order', record));
The builder is immutable after .build() — the returned JetstreamRecord is a frozen snapshot of the data, headers, timeout, and message ID at the time of construction.
Custom headers
Use setHeader() to attach metadata that travels alongside the payload. Headers are available in handlers via RpcContext.getHeader().
const record = new JetstreamRecordBuilder(data)
.setHeader('x-tenant', 'acme')
.setHeader('x-trace-id', traceId)
.build();
For multiple headers at once, use setHeaders():
const record = new JetstreamRecordBuilder(data)
.setHeaders({
'x-tenant': 'acme',
'x-trace-id': traceId,
'x-request-source': 'api-gateway',
})
.build();
Message ID & JetStream deduplication
Since v2.4.0JetStream has built-in server-side deduplication. When a message is published with a message ID, the server remembers that ID for a configurable time window. If a second message with the same ID arrives within the window, it is silently dropped — no duplicate processing occurs.
How the dedup window works
Each JetStream stream has a duplicate_window setting that controls how long the server remembers message IDs. The default window is 2 minutes for event, broadcast, and ordered streams, and 30 seconds for command (RPC) streams.
If you do not set a message ID, the transport generates a random UUID for every publish. This means no deduplication by default — each publish is treated as a unique message.
Setting a deterministic message ID
To enable deduplication, provide a deterministic ID derived from your domain data:
const record = new JetstreamRecordBuilder(orderData)
.setMessageId(`order-created-${order.id}`)
.build();
await lastValueFrom(this.client.emit('order.created', record));
Now if a network retry or application restart causes the same event to be published twice, the JetStream server drops the duplicate automatically.
Good message IDs are derived from business identifiers: order-${orderId}, payment-${paymentId}-refund, user-${userId}-email-changed. Avoid timestamps or random values — they defeat the purpose of deduplication.
Deduplication only works within the duplicate_window. If your retry logic can span longer than the window, you need application-level idempotency checks in your handlers as well.
Per-request timeout override
The setTimeout() method overrides the global RPC timeout for a single request. This is useful for operations that are known to be slow:
// This request gets 30 seconds instead of the global default
const record = new JetstreamRecordBuilder({ reportId: 'annual-2024' })
.setTimeout(30_000)
.build();
const report = await lastValueFrom(
this.client.send('generate.report', record),
);
The timeout only applies to RPC (client.send()). For fire-and-forget events (client.emit()), timeout has no effect since there is no response to wait for.
Scheduled delivery
Since v2.8.0Use scheduleAt() to delay message delivery to a future time. The message is held by the NATS server and delivered to the consumer at the specified time:
const record = new JetstreamRecordBuilder({ orderId: 42, type: 'reminder' })
.scheduleAt(new Date(Date.now() + 60 * 60 * 1000)) // deliver in 1 hour
.build();
await lastValueFrom(this.client.emit('order.reminder', record));
Scheduling requires NATS Server >= 2.12 and allow_msg_schedules: true on the event stream. The consumer handles scheduled messages like any normal event — no changes needed on the receiving side.
scheduleAt() only works with client.emit(). If used with client.send() (RPC), the schedule is silently ignored and a warning is logged.
See Scheduling (Delayed Jobs) for the full guide, including configuration, how it works under the hood, and max_age considerations.
Reserved headers
The transport uses three headers internally for RPC correlation. These are reserved and cannot be set via the builder:
| Header | Purpose |
|---|---|
x-correlation-id | Links an RPC request to its response |
x-reply-to | Inbox subject for the RPC response |
x-error | Marks error responses so the client can distinguish success from failure |
Attempting to set a reserved header throws an error immediately on setHeader() call:
// Throws: Header "x-correlation-id" is reserved by the JetStream transport
// and cannot be set manually.
new JetstreamRecordBuilder(data)
.setHeader('x-correlation-id', 'my-id')
.build();
The error is thrown on setHeader(), not on build(). This gives you immediate feedback at the call site.
Auto-set transport headers
In addition to reserved headers, the transport automatically sets two informational headers on every outbound message:
| Header | Value | Description |
|---|---|---|
x-subject | NATS subject | The original subject the message was published to |
x-caller-name | Service name | The internal name of the sending service |
These headers are read-only from the handler's perspective — you can access them via ctx.getHeader('x-subject') but cannot override them via the builder.
API summary
| Method | Description |
|---|---|
new JetstreamRecordBuilder(data?) | Create a builder, optionally with initial payload |
.setData(data) | Set or replace the payload |
.setHeader(key, value) | Add a single custom header |
.setHeaders(record) | Add multiple headers from a key-value object |
.setMessageId(id) | Set a deterministic message ID for deduplication |
.setTimeout(ms) | Override the global RPC timeout for this request |
.scheduleAt(date) | Schedule one-shot delayed delivery (NATS >= 2.12). Since v2.8.0 |
.build() | Return an immutable JetstreamRecord |
Next steps
- Scheduling (Delayed Jobs) — delay message delivery to a future time
- Handler Context — access headers and message metadata in your handlers
- Custom Codec — control how payloads are serialized
- Module Configuration — configure dedup windows via stream overrides