NATS event provider

Distributed pub/sub backed by NATS, with optional JetStream persistence, durable consumers, queue-group competing consumers, and replay.

Overview

The NATS provider implements events.Provider against nats.go and the Watermill NATS adapter. It satisfies the same port as GoChannel, so moving from in-process to clustered messaging is a one-line WithEventsProvider change. Code that uses the Router, Publisher, and Subscriber stays the same.

You own the provider lifecycle. Create the provider, call Start(ctx), register it with WithEventsProvider, and call Close() on shutdown. The container does not start or stop a provider you supply. The Wiring section below shows the order that matters.

The provider supports two modes. Core NATS gives at-most-once, fire-and-forget delivery without persistence. JetStream gives at-least-once delivery with durable consumers, replay, and acknowledgements. JetStream is on unless you set Config.JetStream.Disabled = true. The JetStream.AutoProvision flag (default true) creates streams on demand. That helps in development. In production, set it to false and pre-create streams with explicit retention, replication, and storage policies.

The provider builds queue groups by prepending Config.QueueGroupPrefix (default "piko") to topic names. Combined with Config.SubscribersCount, that gives competing-consumer load balancing across replicas. JetStream.TrackMessageID adds exactly-once de-duplication at a throughput cost. JetStream.AckAsync does the opposite: Ack() returns before the server confirms, trading delivery guarantees for throughput.
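
Queue-group load balancing is easiest to see in a simulation. In NATS, each message on a subject goes to one member of each queue group subscribed to it, so replicas that share a group split the work instead of each handling everything. The sketch below models one group with a shared channel. queueGroupName and deliverToGroup are hypothetical, and the "." separator in queueGroupName is an assumption: the exact way the provider joins Config.QueueGroupPrefix to a topic is not specified here.

```go
package main

import (
	"fmt"
	"sync"
)

// queueGroupName is hypothetical: it joins prefix and topic with "." for
// illustration only; the provider's actual naming scheme may differ.
func queueGroupName(prefix, topic string) string {
	return prefix + "." + topic
}

// deliverToGroup simulates one queue group with the given number of
// members. All members read from one channel, so each message is handled
// by exactly one member, whichever receives it first.
func deliverToGroup(msgs []string, members int) map[int][]string {
	work := make(chan string)
	got := make(map[int][]string)
	var mu sync.Mutex
	var wg sync.WaitGroup

	for id := 0; id < members; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for m := range work {
				mu.Lock()
				got[id] = append(got[id], m)
				mu.Unlock()
			}
		}(id)
	}
	for _, m := range msgs {
		work <- m
	}
	close(work)
	wg.Wait()
	return got
}

func main() {
	msgs := make([]string, 10)
	for i := range msgs {
		msgs[i] = fmt.Sprintf("order-%d", i)
	}
	group := queueGroupName("piko", "orders.created")
	for member, handled := range deliverToGroup(msgs, 4) {
		fmt.Printf("%s member %d handled %d messages\n", group, member, len(handled))
	}
}
```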

NATS gives cross-process and cross-replica delivery, persistence, and replay, at the cost of running a NATS server. The server is a single binary, but it is still a server to operate. Reach for GoChannel when every publisher and subscriber lives inside the same process and durability is not required.

The provider handles connection loss for you. It connects with unlimited reconnect attempts and a 2-second backoff, and registers reconnect, disconnect, and error handlers, so a transient NATS outage recovers without application code. It also emits OpenTelemetry metrics for start and close duration, connection attempts, reconnections, and errors, so the wired provider reports observability data from the first connection.

Requirements

  • A running NATS server reachable at Config.URL (default nats://127.0.0.1:4222). For JetStream, start the server with -js.
  • Network egress to the NATS cluster. Pass mTLS and auth options through Config.NATSOptions as standard nats.Option values from nats.go.
  • The piko.sh/piko/wdk/events/events_provider_nats module. It is a separate Go module that requires github.com/ThreeDotsLabs/watermill-nats/v2 and github.com/nats-io/nats.go. Add it to your go.mod. The provider is pure Go, so it needs no build tag or CGO and runs in both interpreted and compiled mode.

Configuration

NewNATSProvider does not apply any defaults. It stores the Config you pass. Start from DefaultConfig() and override the fields you need, so omitted fields keep their working values instead of dropping to zero.

import (
    "piko.sh/piko/wdk/events/events_provider_nats"
)

config := events_provider_nats.DefaultConfig()
config.URL = "nats://nats-server:4222"
config.ClusterID = "piko-events"
config.QueueGroupPrefix = "piko"
config.SubscribersCount = 4
config.JetStream.AutoProvision = false // pre-create streams in production

provider, err := events_provider_nats.NewNATSProvider(config)
if err != nil {
    return err
}

DefaultConfig() sets URL to nats://127.0.0.1:4222, ClusterID to piko-events, QueueGroupPrefix to piko, SubscribersCount to 1, AckWaitTimeout and CloseTimeout to 30 seconds, and RouterConfig.CloseTimeout to 30 seconds. JetStream defaults to AutoProvision true, DurablePrefix piko, and both TrackMessageID and AckAsync false. Build a Config literal from scratch and every omitted field becomes its zero value, which yields an empty URL and zero timeouts.

To switch to core NATS, set config.JetStream.Disabled = true.

Wiring

Start the provider before you register it, and close it on shutdown. The container does not manage a provider you supply, so an unstarted provider has a nil Publisher, Subscriber, and Router, and the event bus fails to build.

provider, err := events_provider_nats.NewNATSProvider(config)
if err != nil {
    return err
}
if err := provider.Start(ctx); err != nil {
    return err
}
defer provider.Close()

ssr := piko.New(
    piko.WithEventsProvider(provider),
)

Start(ctx) connects, sets up pub/sub, and runs the Watermill router. The provided context controls its lifetime. Close() drains the router, subscriber, and publisher, then shuts the connection down.

See also

Other event providers:

  • GoChannel, in-process Watermill provider for single-process workloads.
