
Module Configuration

The library follows NestJS conventions with three registration methods: forRoot() for global setup, forRootAsync() for async/dynamic configuration, and forFeature() for per-module client registration.

forRoot()

forRoot() registers the transport globally. Call it once in your root AppModule. It creates the shared NATS connection, codec, event bus, and (optionally) the full consumer infrastructure.

src/app.module.ts
import { Module } from '@nestjs/common';
import { JetstreamModule, TransportEvent, toNanos } from '@horizon-republic/nestjs-jetstream';

@Module({
  imports: [
    JetstreamModule.forRoot({
      name: 'user-events',
      servers: ['nats://localhost:4222'],
      events: {
        stream: {
          max_age: toNanos(30, 'days'),
          max_bytes: 10 * 1024 * 1024 * 1024, // 10 GB
          num_replicas: 3,
        },
        consumer: {
          max_ack_pending: 500,
          ack_wait: toNanos(30, 'seconds'),
        },
        consume: { idle_heartbeat: 10_000 },
        concurrency: 200,
        ackExtension: true,
      },
      rpc: { mode: 'core', timeout: 10_000 },
      shutdownTimeout: 15_000,
      hooks: {
        [TransportEvent.Error]: (err, ctx) => console.error(`[${ctx}]`, err),
        [TransportEvent.Connect]: (server) => console.log(`Connected to ${server}`),
      },
    }),
  ],
})
export class AppModule {}

How name maps to streams and subjects

The name field drives all NATS resource naming. Given name: 'user-events':

  • Event stream: user-events__microservice_ev-stream
  • Event subjects: user-events__microservice.ev.{pattern} (e.g., user-events__microservice.ev.user.created)
  • Consumer: user-events__microservice_ev-consumer

The __microservice suffix provides namespace isolation from other NATS clients on the same cluster. See Naming Conventions for the full naming table and helper functions.
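
The mapping listed above can be sketched as plain string templates. This is a minimal illustration, not the library's implementation: the function names below are hypothetical, and the real helpers are the ones documented in Naming Conventions.

```typescript
// Hypothetical helpers for illustration only; they are not the library's exports.
const NAMESPACE_SUFFIX = '__microservice';

// Prefix shared by every NATS resource that belongs to one service.
function internalName(name: string): string {
  return `${name}${NAMESPACE_SUFFIX}`;
}

function eventStreamName(name: string): string {
  return `${internalName(name)}_ev-stream`;
}

function eventSubject(name: string, pattern: string): string {
  return `${internalName(name)}.ev.${pattern}`;
}

function eventConsumerName(name: string): string {
  return `${internalName(name)}_ev-consumer`;
}

console.log(eventStreamName('user-events'));              // user-events__microservice_ev-stream
console.log(eventSubject('user-events', 'user.created')); // user-events__microservice.ev.user.created
console.log(eventConsumerName('user-events'));            // user-events__microservice_ev-consumer
```

Because every resource name derives from name, renaming a service changes which stream and consumer it targets, so treat the value as a stable identifier.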

forRootAsync()

For real-world applications, you'll typically load configuration from environment variables or a config service. forRootAsync() supports three patterns.

useFactory (most common)

src/app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { JetstreamModule } from '@horizon-republic/nestjs-jetstream';

@Module({
  imports: [
    ConfigModule.forRoot(),
    JetstreamModule.forRootAsync({
      name: 'orders',
      imports: [ConfigModule],
      inject: [ConfigService],
      useFactory: (config: ConfigService) => ({
        servers: [config.getOrThrow('NATS_URL')],
        rpc: { mode: config.get('RPC_MODE', 'core') as 'core' | 'jetstream' },
        // Environment values arrive as strings, so convert before use.
        shutdownTimeout: Number(config.get('SHUTDOWN_TIMEOUT') ?? 10_000),
      }),
    }),
  ],
})
export class AppModule {}

The name lives outside the factory

The name property is defined at the top level of forRootAsync(), not inside the factory return value. This is by design — the name is needed upfront for DI token generation before the factory runs.

useExisting

Point to an already-registered provider that implements the options interface:

JetstreamModule.forRootAsync({
  name: 'orders',
  imports: [NatsConfigModule],
  useExisting: NatsConfigService,
})

The NatsConfigService must be a provider that, when resolved, returns an object matching Omit<JetstreamModuleOptions, 'name'>.

useClass

Similar to useExisting, but the class is instantiated by the module:

JetstreamModule.forRootAsync({
  name: 'orders',
  useClass: NatsConfigService,
})

forFeature()

forFeature() creates a lightweight JetstreamClient proxy for a target service. It reuses the shared NATS connection from forRoot() — no new connections are created.

Import it in each feature module that needs to communicate with a specific service:

src/orders/orders.module.ts
import { Module } from '@nestjs/common';
import { JetstreamModule } from '@horizon-republic/nestjs-jetstream';
import { OrdersService } from './orders.service';

@Module({
  imports: [
    JetstreamModule.forFeature({ name: 'users' }),
    JetstreamModule.forFeature({ name: 'payments' }),
  ],
  providers: [OrdersService],
  exports: [OrdersService],
})
export class OrdersModule {}

Injecting clients

Inject the client using @Inject() with the service name as the token:

src/orders/orders.service.ts
import { Injectable, Inject } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { firstValueFrom, lastValueFrom } from 'rxjs';

@Injectable()
export class OrdersService {
  constructor(
    @Inject('users') private readonly usersClient: ClientProxy,
    @Inject('payments') private readonly paymentsClient: ClientProxy,
  ) {}

  async createOrder(userId: number) {
    // RPC call to the users service
    const user = await firstValueFrom(
      this.usersClient.send('user.get', { id: userId }),
    );

    // Fire-and-forget event to the payments service
    await lastValueFrom(
      this.paymentsClient.emit('payment.initiate', {
        userId,
        amount: 99.99,
      }),
    );
  }
}

getClientToken()

The injection token is simply the service name string. The library exports a getClientToken() helper if you prefer explicit token references:

import { getClientToken } from '@horizon-republic/nestjs-jetstream';

@Inject(getClientToken('users'))
private readonly usersClient: ClientProxy;

// Equivalent to:
@Inject('users')
private readonly usersClient: ClientProxy;

Per-client codec override

Each forFeature() client can use a different codec, falling back to the global codec from forRoot() when omitted:

import { MsgPackCodec } from './codecs/msgpack.codec';

JetstreamModule.forFeature({
  name: 'legacy-service',
  codec: new MsgPackCodec(),
})

See Custom Codec for how to implement the Codec interface.

Full options reference

Below is every field in JetstreamModuleOptions with its type, default value, and guidance on when to change it.

Required options

| Field | Type | Description |
|---|---|---|
| name | string | Service name. Used for stream, consumer, and subject naming. Must be unique per service in your system. |
| servers | string[] | NATS server URLs (e.g., ['nats://localhost:4222']). |

Optional options

| Field | Type | Default | Description |
|---|---|---|---|
| codec | Codec | JsonCodec | Global message serializer/deserializer. Swap for MessagePack, Protobuf, etc. |
| rpc | RpcConfig | { mode: 'core' } | RPC transport mode and configuration. See RPC Config. |
| consumer | boolean | true | Enable consumer infrastructure. Set to false for publisher-only services (e.g., API gateways). |
| events | StreamConsumerOverrides | (production defaults) | Overrides for workqueue event stream and consumer config. |
| broadcast | StreamConsumerOverrides | (production defaults) | Overrides for broadcast event stream and consumer config. |
| ordered | OrderedEventOverrides | (production defaults) | Configuration for ordered event consumers. Since v2.4.0. |
| events.stream.allow_msg_schedules | boolean | false | Enable message scheduling on the event stream. Requires NATS >= 2.12. Since v2.8.0. |
| hooks | Partial<TransportHooks> | (none) | Transport lifecycle hook handlers. Unset hooks are silently ignored. |
| onDeadLetter | (info: DeadLetterInfo) => Promise<void> | (none) | Async callback for dead letter handling. Called when a message exhausts all delivery attempts. Since v2.2.0. |
| shutdownTimeout | number | 10_000 (10s) | Graceful shutdown timeout in milliseconds. Handlers exceeding this are abandoned. |
| connectionOptions | Partial<ConnectionOptions> | (none) | Raw NATS ConnectionOptions pass-through for TLS, auth, reconnection, etc. |

RpcConfig

RPC configuration is a discriminated union on mode:

| Mode | Timeout default | Persistence | Best for |
|---|---|---|---|
| 'core' | 30s | None (in-memory) | Low-latency RPC, simple request/reply |
| 'jetstream' | 3 min | JetStream stream | Commands that must survive handler downtime |

// Core mode (default) -- NATS native request/reply
rpc: { mode: 'core', timeout: 10_000 }

// JetStream mode -- commands persisted in a stream
rpc: {
  mode: 'jetstream',
  timeout: 60_000,
  stream: { max_age: toNanos(1, 'minutes') }, // stream overrides
  consumer: { max_deliver: 3 }, // consumer overrides
}

Timeout applies to both sides

The timeout value controls both the client-side wait (how long the caller waits for a response) and the server-side handler limit (how long the handler is allowed to run before being terminated). Both sides use the value from their own forRoot() configuration.

See RPC Patterns for a full comparison of the two modes.
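
To make the discriminated union concrete, here is a minimal sketch of how the mode field narrows the config and selects a timeout default. The type names and effectiveTimeoutMs() are illustrative stand-ins, not the library's exports; only the defaults (30s for 'core', 3 min for 'jetstream') come from the table above.

```typescript
// Illustrative stand-in types; the library's real RpcConfig may carry more fields.
type CoreRpcSketch = { mode: 'core'; timeout?: number };
type JetstreamRpcSketch = {
  mode: 'jetstream';
  timeout?: number;
  stream?: Record<string, unknown>;   // stream overrides
  consumer?: Record<string, unknown>; // consumer overrides
};
type RpcConfigSketch = CoreRpcSketch | JetstreamRpcSketch;

// Defaults from the mode table: 30s for core, 3 min for JetStream.
const DEFAULT_TIMEOUT_MS: Record<RpcConfigSketch['mode'], number> = {
  core: 30_000,
  jetstream: 180_000,
};

// An explicit timeout wins; otherwise fall back to the default for the mode.
function effectiveTimeoutMs(rpc: RpcConfigSketch = { mode: 'core' }): number {
  return rpc.timeout ?? DEFAULT_TIMEOUT_MS[rpc.mode];
}

// Checking `mode` narrows the union, so `stream` is only reachable in JetStream mode.
function hasStreamOverrides(rpc: RpcConfigSketch): boolean {
  return rpc.mode === 'jetstream' && rpc.stream !== undefined;
}

console.log(effectiveTimeoutMs());                                    // 30000
console.log(effectiveTimeoutMs({ mode: 'jetstream' }));               // 180000
console.log(effectiveTimeoutMs({ mode: 'core', timeout: 10_000 }));   // 10000
console.log(hasStreamOverrides({ mode: 'jetstream', stream: {} }));   // true
```

Since each side reads the timeout from its own configuration, a caller and a handler configured with different values will each enforce their own limit.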

StreamConsumerOverrides

The events and broadcast fields accept stream and consumer configuration overrides:

import { toNanos } from '@horizon-republic/nestjs-jetstream';

JetstreamModule.forRoot({
  name: 'orders',
  servers: ['nats://localhost:4222'],
  events: {
    stream: {
      max_age: toNanos(3, 'days'), // 3 days instead of default 7
      max_bytes: 512 * 1024 * 1024, // 512 MB instead of default 5 GB
    },
    consumer: {
      max_deliver: 5, // retry 5 times instead of default 3
      ack_wait: toNanos(30, 'seconds'), // 30s ack timeout instead of default 10s
    },
  },
})

These overrides are merged with the production defaults. You only need to specify the fields you want to change.
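
The merge can be pictured with the following sketch. It assumes a per-section shallow merge, which this page does not confirm, and mergeWithDefaults() is a hypothetical name. The default values are the ones mentioned in the example comments above (7 days, 5 GB, 3 deliveries, 10s ack wait).

```typescript
// Illustrative shapes; the real stream and consumer configs have many more fields.
interface StreamSketch { max_age: number; max_bytes: number }
interface ConsumerSketch { max_deliver: number; ack_wait: number }
interface OverridesSketch {
  stream?: Partial<StreamSketch>;
  consumer?: Partial<ConsumerSketch>;
}

const NANOS_PER_SECOND = 1_000_000_000;

// Defaults as quoted in the example comments above.
const DEFAULTS = {
  stream: {
    max_age: 7 * 24 * 60 * 60 * NANOS_PER_SECOND, // 7 days
    max_bytes: 5 * 1024 * 1024 * 1024,            // 5 GB
  },
  consumer: {
    max_deliver: 3,
    ack_wait: 10 * NANOS_PER_SECOND,              // 10s
  },
};

// Assumption: each section is merged shallowly, and overrides win field by field.
function mergeWithDefaults(overrides: OverridesSketch = {}) {
  return {
    stream: { ...DEFAULTS.stream, ...overrides.stream },
    consumer: { ...DEFAULTS.consumer, ...overrides.consumer },
  };
}

// Only max_deliver is overridden; every other field keeps its default.
const merged = mergeWithDefaults({ consumer: { max_deliver: 5 } });
console.log(merged.consumer.max_deliver); // 5
console.log(merged.consumer.ack_wait);    // 10000000000
console.log(merged.stream.max_bytes);     // 5368709120
```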

The toNanos() helper

NATS JetStream uses nanoseconds for all time-based configuration. The library exports a toNanos(value, unit) helper that converts human-readable durations to nanoseconds. Supported units: 'ms', 'seconds', 'minutes', 'hours', 'days'.
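
For intuition, the conversion amounts to a multiplication by a per-unit factor. The sketch below is a re-implementation for illustration, named toNanosSketch() to avoid confusion with the real export. Whether the library returns a number or validates its input is not stated here, so a plain number is assumed.

```typescript
// Units listed in the paragraph above.
type DurationUnit = 'ms' | 'seconds' | 'minutes' | 'hours' | 'days';

const NANOS_PER_UNIT: Record<DurationUnit, number> = {
  ms: 1_000_000,
  seconds: 1_000_000_000,
  minutes: 60 * 1_000_000_000,
  hours: 3_600 * 1_000_000_000,
  days: 86_400 * 1_000_000_000,
};

// Illustrative re-implementation; assumes a plain number return value.
function toNanosSketch(value: number, unit: DurationUnit): number {
  return value * NANOS_PER_UNIT[unit];
}

console.log(toNanosSketch(500, 'ms'));     // 500000000
console.log(toNanosSketch(30, 'seconds')); // 30000000000
console.log(toNanosSketch(7, 'days'));     // 604800000000000
```

Even multi-year durations stay below Number.MAX_SAFE_INTEGER (about 9 * 10^15 ns, roughly 104 days, is the limit), so values above that range would need care; typical stream ages of days or weeks are safe.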

OrderedEventOverrides

Since v2.4.0

Ordered events use a separate stream with Limits retention and deliver messages in strict sequential order. The configuration is simpler than workqueue/broadcast because ordered consumers are ephemeral and auto-managed by the @nats-io/jetstream client.

import { DeliverPolicy } from '@nats-io/jetstream';
import { JetstreamModule, toNanos } from '@horizon-republic/nestjs-jetstream';

JetstreamModule.forRoot({
  name: 'orders',
  servers: ['nats://localhost:4222'],
  ordered: {
    deliverPolicy: DeliverPolicy.New, // only new messages (default: All)
    stream: {
      max_age: toNanos(12, 'hours'), // 12 hours
    },
  },
})

| Field | Type | Default | Description |
|---|---|---|---|
| stream | Partial<StreamConfig> | (production defaults) | Stream overrides (e.g., max_age, max_bytes). |
| deliverPolicy | DeliverPolicy | DeliverPolicy.All | Where to start reading when the consumer is created. |
| optStartSeq | number | (none) | Start sequence (only with DeliverPolicy.StartSequence). |
| optStartTime | string | (none) | Start time ISO string (only with DeliverPolicy.StartTime). |
| replayPolicy | ReplayPolicy | ReplayPolicy.Instant | Replay policy for historical messages. |

See Ordered Events for detailed usage.

connectionOptions

The connectionOptions field passes raw NATS ConnectionOptions (from @nats-io/transport-node) directly to the NATS client. Use it for TLS, authentication, and reconnection configuration.

Precedence

The name and servers fields from the top-level options take precedence over anything set in connectionOptions. Don't duplicate them.
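
The precedence rule can be pictured as a spread in which the top-level fields are applied last. This is only a sketch: resolveConnectionOptions() and the types are hypothetical, and the exact connection name the library passes to NATS is not shown on this page.

```typescript
// Minimal stand-in for the NATS ConnectionOptions fields used in this sketch.
interface ConnectionOptionsSketch {
  name?: string;
  servers?: string | string[];
  token?: string;
}

interface TopLevelSketch {
  name: string;
  servers: string[];
  connectionOptions?: ConnectionOptionsSketch;
}

// Hypothetical resolution step: top-level name and servers are written last, so they win.
function resolveConnectionOptions(options: TopLevelSketch): ConnectionOptionsSketch {
  return {
    ...options.connectionOptions,
    name: options.name,
    servers: options.servers,
  };
}

const resolved = resolveConnectionOptions({
  name: 'orders',
  servers: ['nats://localhost:4222'],
  connectionOptions: {
    servers: 'nats://ignored:4222', // duplicated here, so it is overridden
    token: 'secret',
  },
});

console.log(resolved.servers); // [ 'nats://localhost:4222' ]
console.log(resolved.token);   // secret
```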

TLS

JetstreamModule.forRoot({
  name: 'orders',
  servers: ['nats://nats.prod.internal:4222'],
  connectionOptions: {
    tls: {
      certFile: '/certs/client.crt',
      keyFile: '/certs/client.key',
      caFile: '/certs/ca.crt',
    },
  },
})

Authentication

// Token authentication
connectionOptions: {
token: process.env.NATS_TOKEN,
}

// User/password authentication
connectionOptions: {
user: process.env.NATS_USER,
pass: process.env.NATS_PASS,
}

Reconnection

connectionOptions: {
  maxReconnectAttempts: -1, // unlimited reconnection attempts
  reconnectTimeWait: 2_000, // 2s between reconnection attempts
  reconnectJitter: 500, // add up to 500ms random jitter
}

Publisher-only mode

Set consumer: false to skip all consumer infrastructure. This is useful for API gateways or services that only publish messages and never handle them:

JetstreamModule.forRoot({
  name: 'api-gateway',
  servers: ['nats://localhost:4222'],
  consumer: false, // no streams, consumers, or message routing
})

In publisher-only mode, the JetstreamStrategy provider resolves to null. Do not call app.connectMicroservice() or app.get(JetstreamStrategy).

src/main.ts
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';

const bootstrap = async () => {
  const app = await NestFactory.create(AppModule);

  // No microservice connection needed in publisher-only mode
  await app.listen(3000);
};

void bootstrap();
