Module Configuration
This is the main surface you'll touch day-to-day. The library follows NestJS conventions with three registration methods: forRoot() for global setup, forRootAsync() for async/dynamic configuration, and forFeature() for per-module client registration. If you only read one configuration page, read this one.
forRoot()
forRoot() registers the transport globally. Call it once in your root AppModule. It creates the shared NATS connection, codec, event bus, and (optionally) the full consumer infrastructure.
import { Module } from '@nestjs/common';
import { JetstreamModule, TransportEvent, toNanos } from '@horizon-republic/nestjs-jetstream';
@Module({
imports: [
JetstreamModule.forRoot({
name: 'user-events',
servers: ['nats://localhost:4222'],
events: {
stream: {
max_age: toNanos(30, 'days'),
max_bytes: 10 * 1024 * 1024 * 1024, // 10 GB
num_replicas: 3,
},
consumer: {
max_ack_pending: 500,
ack_wait: toNanos(30, 'seconds'),
},
consume: { idle_heartbeat: 10_000 },
concurrency: 200,
ackExtension: true,
},
rpc: { mode: 'core', timeout: 10_000 },
shutdownTimeout: 15_000,
hooks: {
[TransportEvent.Error]: (err, ctx) => console.error(`[${ctx}]`, err),
[TransportEvent.Connect]: (server) => console.log(`Connected to ${server}`),
},
}),
],
})
export class AppModule {}
How name maps to streams and subjects
The name field drives all NATS resource naming. Given name: 'user-events':
- Event stream:
user-events__microservice_ev-stream - Event subjects:
user-events__microservice.ev.{pattern}(e.g.,user-events__microservice.ev.user.created) - Consumer:
user-events__microservice_ev-consumer
The __microservice suffix provides namespace isolation from other NATS clients on the same cluster. See Naming Conventions for the full naming table and helper functions.
forRootAsync()
For real-world applications, you'll typically load configuration from environment variables or a config service. forRootAsync() supports three patterns.
useFactory (most common)
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { JetstreamModule } from '@horizon-republic/nestjs-jetstream';
@Module({
imports: [
ConfigModule.forRoot(),
JetstreamModule.forRootAsync({
name: 'orders',
imports: [ConfigModule],
inject: [ConfigService],
useFactory: (config: ConfigService) => {
const mode = config.get<'core' | 'jetstream'>('RPC_MODE', 'core');
return {
servers: [config.getOrThrow('NATS_URL')],
rpc: mode === 'jetstream' ? { mode, timeout: 60_000 } : { mode },
shutdownTimeout: config.get('SHUTDOWN_TIMEOUT', 10_000),
};
},
}),
],
})
export class AppModule {}
name lives outside the factoryThe name property is defined at the top level of forRootAsync(), not inside the factory return value. This is by design — the name is needed upfront for DI token generation before the factory runs.
useExisting
Point to an already-registered provider that implements the options interface:
JetstreamModule.forRootAsync({
name: 'orders',
imports: [NatsConfigModule],
useExisting: NatsConfigService,
})
The NatsConfigService must be a class-based provider that directly implements Omit<JetstreamModuleOptions, 'name'> — the instance itself is used as the options object (NestJS does not call a factory method on it).
useClass
Similar to useExisting, but the class is instantiated by the module:
JetstreamModule.forRootAsync({
name: 'orders',
useClass: NatsConfigService,
})
forFeature()
forFeature() creates a lightweight JetstreamClient proxy for a target service. It reuses the shared NATS connection from forRoot() — no new connections are created.
Import it in each feature module that needs to communicate with a specific service:
import { Module } from '@nestjs/common';
import { JetstreamModule } from '@horizon-republic/nestjs-jetstream';
import { OrdersService } from './orders.service';
@Module({
imports: [
JetstreamModule.forFeature({ name: 'users' }),
JetstreamModule.forFeature({ name: 'payments' }),
],
providers: [OrdersService],
exports: [OrdersService],
})
export class OrdersModule {}
Injecting clients
Inject the client using @Inject() with the service name as the token:
import { Injectable, Inject } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { firstValueFrom, lastValueFrom } from 'rxjs';
@Injectable()
export class OrdersService {
constructor(
@Inject('users') private readonly usersClient: ClientProxy,
@Inject('payments') private readonly paymentsClient: ClientProxy,
) {}
async createOrder(userId: number) {
// RPC call to the users service
const user = await firstValueFrom(
this.usersClient.send('user.get', { id: userId }),
);
// Fire-and-forget event to the payments service
await lastValueFrom(
this.paymentsClient.emit('payment.initiate', {
userId,
amount: 99.99,
}),
);
}
}
Injection token
The injection token is the service name string you passed to forFeature({ name }). Use the standard NestJS pattern:
@Inject('users')
private readonly usersClient: ClientProxy;
The library exports a getClientToken(name) helper that returns the same string — it exists for code bases that prefer explicit symbolic tokens, but @Inject('users') is the canonical form and the one used throughout these docs.
Per-client codec override
Each forFeature() client can use a different codec, falling back to the global codec from forRoot() when omitted:
import { MsgPackCodec } from './codecs/msgpack.codec';
JetstreamModule.forFeature({
name: 'legacy-service',
codec: new MsgPackCodec(),
})
See Custom Codec for how to implement the Codec interface.
Full options reference
Below is every field in JetstreamModuleOptions with its type, default value, and guidance on when to change it.
Required options
| Field | Type | Description |
|---|---|---|
name | string | Service name. Used for stream, consumer, and subject naming. Must be unique per service in your system. |
servers | string[] | NATS server URLs (e.g., ['nats://localhost:4222']). |
Optional options
| Field | Type | Default | Description |
|---|---|---|---|
codec | Codec | JsonCodec | Global message serializer/deserializer. Swap for MessagePack, Protobuf, etc. |
rpc | RpcConfig | { mode: 'core' } | RPC transport mode and configuration. See RPC Config. |
consumer | boolean | true | Enable consumer infrastructure. Set to false for publisher-only services (e.g., API gateways). |
events | StreamConsumerOverrides | (production defaults) | Overrides for workqueue event stream and consumer config. To enable message scheduling, set events.stream.allow_msg_schedules: true (requires NATS >= 2.12). Since v2.8.0 |
broadcast | StreamConsumerOverrides | (production defaults) | Overrides for broadcast event stream and consumer config. |
ordered | OrderedEventOverrides | (production defaults) | Configuration for ordered event consumers. Since v2.4.0 |
hooks | Partial<TransportHooks> | (none) | Transport lifecycle hook handlers. Unset hooks are silently ignored. |
onDeadLetter | (info: DeadLetterInfo) => Promise<void> | (none) | Async callback for dead letter handling. Called and awaited when a message exhausts all delivery attempts. Since v2.2.0 |
dlq | { stream?: StreamConfigOverrides } | (none) | Built-in Dead Letter Queue stream. When set, exhausted messages are automatically republished to a dedicated DLQ stream with tracking headers. See Dead Letter Queue. Since v2.9.0 |
metadata | MetadataRegistryOptions | (auto-enabled if any handler has meta) | Handler metadata registry — publishes @EventPattern / @MessagePattern handler metadata to a NATS KV bucket for cross-service discovery. No-op when consumer: false (publisher-only services register nothing). See Handler Metadata. Since v2.9.0 |
allowDestructiveMigration | boolean | false | Allow automatic blue-green stream recreation for immutable property changes (e.g., storage). Without this flag, the transport logs a warning and keeps the existing stream config. See Stream Migration. Since v2.9.0 |
shutdownTimeout | number | 10_000 (10s) | Graceful shutdown timeout in milliseconds. Handlers exceeding this are abandoned. |
connectionOptions | Partial<ConnectionOptions> | (none) | Raw NATS ConnectionOptions pass-through for TLS, auth, reconnection, etc. |
RpcConfig
RPC configuration is a discriminated union on mode:
| Mode | Timeout default | Persistence | Best for |
|---|---|---|---|
'core' | 30_000 ms (30 s) | None (in-memory) | Low-latency RPC, simple request/reply |
'jetstream' | 180_000 ms (3 min) | JetStream stream | Commands that must survive handler downtime |
The timeout field is specified in milliseconds, not seconds. Writing timeout: 30 means 30 ms — almost certainly a bug. Use timeout: 30_000 for 30 seconds.
// Core mode (default) -- NATS native request/reply
rpc: { mode: 'core', timeout: 10_000 }
// JetStream mode -- commands persisted in a stream
rpc: {
mode: 'jetstream',
timeout: 60_000,
stream: { max_age: toNanos(1, 'minutes') }, // stream overrides
consumer: { max_deliver: 3 }, // consumer overrides
}
The timeout value controls both the client-side wait (how long the caller waits for a response) and the server-side handler limit (how long the handler is allowed to run before being terminated). Both sides use the value from their own forRoot() configuration.
See RPC Patterns for a full comparison of the two modes.
StreamConsumerOverrides
The events and broadcast fields accept stream and consumer configuration overrides:
import { toNanos } from '@horizon-republic/nestjs-jetstream';
JetstreamModule.forRoot({
name: 'orders',
servers: ['nats://localhost:4222'],
events: {
stream: {
max_age: toNanos(3, 'days'), // 3 days instead of default 7
max_bytes: 512 * 1024 * 1024, // 512 MB instead of default 5 GB
},
consumer: {
max_deliver: 5, // retry 5 times instead of default 3
ack_wait: toNanos(30, 'seconds'), // 30s ack timeout instead of default 10s
},
},
})
These overrides are merged with the production defaults. You only need to specify the fields you want to change.
NATS JetStream uses nanoseconds for all time-based configuration. The library exports a toNanos(value, unit) helper that converts human-readable durations to nanoseconds. Supported units: 'ms', 'seconds', 'minutes', 'hours', 'days'.
OrderedEventOverrides
Since v2.4.0Ordered events use a separate stream with Limits retention and deliver messages in strict sequential order. The configuration is simpler than workqueue/broadcast because ordered consumers are ephemeral and auto-managed by the @nats-io/jetstream client.
import { DeliverPolicy } from '@nats-io/jetstream';
JetstreamModule.forRoot({
name: 'orders',
servers: ['nats://localhost:4222'],
ordered: {
deliverPolicy: DeliverPolicy.New, // only new messages (default: All)
stream: {
max_age: toNanos(12, 'hours'), // 12 hours
},
},
})
| Field | Type | Default | Description |
|---|---|---|---|
stream | Partial<StreamConfig> | (production defaults) | Stream overrides (e.g., max_age, max_bytes). |
deliverPolicy | DeliverPolicy | DeliverPolicy.All | Where to start reading when the consumer is created. |
optStartSeq | number | (none) | Start sequence (only with DeliverPolicy.StartSequence). |
optStartTime | string | (none) | Start time ISO string (only with DeliverPolicy.StartTime). |
replayPolicy | ReplayPolicy | ReplayPolicy.Instant | Replay policy for historical messages. |
See Ordered Events for detailed usage.
connectionOptions
The connectionOptions field passes raw NATS ConnectionOptions (from @nats-io/transport-node) directly to the NATS client. Use it for TLS, authentication, and reconnection configuration.
The name and servers fields from the top-level options take precedence over anything set in connectionOptions. Don't duplicate them.
TLS
The tls block is passed straight through to @nats-io/transport-node, so any field supported by its TlsOptions works here — paths (certFile, keyFile, caFile), inline PEM (cert, key, ca), or an empty tls: {} for server-only TLS against a broker whose CA your system already trusts.
JetstreamModule.forRoot({
name: 'orders',
servers: ['nats://nats.prod.internal:4222'],
connectionOptions: {
tls: {
// mTLS with client cert + private key, plus a self-signed CA
certFile: '/certs/client.crt',
keyFile: '/certs/client.key',
caFile: '/certs/ca.crt',
},
},
})
For server-only TLS (no client certificate) against a publicly-trusted broker, tls: {} is enough — it tells the client to upgrade the connection without sending a client identity.
Authentication
// Token authentication
connectionOptions: {
token: process.env.NATS_TOKEN,
}
// User/password authentication
connectionOptions: {
user: process.env.NATS_USER,
pass: process.env.NATS_PASS,
}
Reconnection
connectionOptions: {
maxReconnectAttempts: -1, // unlimited reconnection attempts
reconnectTimeWait: 2_000, // 2s between reconnection attempts
reconnectJitter: 500, // add up to 500ms random jitter
}
Publisher-only mode
Set consumer: false to skip all consumer infrastructure. This is useful for API gateways or services that only publish messages and never handle them:
JetstreamModule.forRoot({
name: 'api-gateway',
servers: ['nats://localhost:4222'],
consumer: false, // no streams, consumers, or message routing
})
In publisher-only mode, the JetstreamStrategy provider resolves to null. Do not call app.connectMicroservice() or app.get(JetstreamStrategy).
const bootstrap = async () => {
const app = await NestFactory.create(AppModule);
// No microservice connection needed in publisher-only mode
await app.listen(3000);
};
void bootstrap();
What's next?
- RPC Patterns — Core vs JetStream mode, error handling, and timeouts
- Events & Broadcast — workqueue events and fan-out delivery
- Scheduling (Delayed Jobs) — one-shot delayed delivery via NATS 2.12
- Lifecycle Hooks — monitor connection state and transport events
- Default Configs — full list of production-ready stream and consumer defaults