How to publish and subscribe to events

This guide shows how to publish messages to Piko's event bus and how to register a handler that consumes them. The event bus is built on Watermill, so messages use Watermill's message.Message type. See the events reference for the full API.

Get the publisher and subscriber

package main

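import (
    "context"
    "encoding/json"

    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill/message"
    "piko.sh/piko/wdk/events"
)

// publishCustomerCreated publishes a customer.created event carrying the customer ID.
func publishCustomerCreated(ctx context.Context, customerID string) error {
    publisher, err := events.GetPublisher()
    if err != nil {
        return err
    }

    payload, err := json.Marshal(map[string]string{"id": customerID})
    if err != nil {
        return err
    }

    // Give every message its own UUID; the customer ID travels in the payload.
    msg := message.NewMessage(watermill.NewUUID(), payload)
    // Attach the caller's context so it is available to the publisher and middleware.
    msg.SetContext(ctx)
    return publisher.Publish("customer.created", msg)
}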

The publisher is the shared instance configured at bootstrap. The topic name is arbitrary, so pick a convention such as <bounded-context>.<event> (for example, customer.created) and document it.
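
One way to keep topic names consistent is to build them through a single helper instead of scattering string literals. The sketch below is only an illustration: topicName and its lowercase, hyphen-separated naming rule are assumptions, not part of Piko or Watermill.

```go
package main

import (
    "fmt"
    "regexp"
)

// segment matches one lowercase, hyphen-separated name such as
// "customer" or "order-line". This rule is an assumed convention.
var segment = regexp.MustCompile(`^[a-z][a-z0-9]*(-[a-z0-9]+)*$`)

// topicName is a hypothetical helper that builds a
// "<bounded-context>.<event>" topic and rejects malformed parts.
func topicName(boundedContext, event string) (string, error) {
    if !segment.MatchString(boundedContext) {
        return "", fmt.Errorf("invalid bounded context %q", boundedContext)
    }
    if !segment.MatchString(event) {
        return "", fmt.Errorf("invalid event %q", event)
    }
    return boundedContext + "." + event, nil
}

func main() {
    topic, err := topicName("customer", "created")
    if err != nil {
        panic(err)
    }
    fmt.Println(topic) // customer.created
}
```

The returned string can be passed to publisher.Publish and to the handler registration, so both sides share the same name.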

Handle events on startup

Register handlers on Piko's router during bootstrap:

package main

import (
    "context"
    "encoding/json"

    "github.com/ThreeDotsLabs/watermill/message"
    "piko.sh/piko"
    "piko.sh/piko/wdk/events"
)

func main() {
    ssr := piko.New()

    ctx := context.Background()
    router, err := events.GetRouter()
    if err != nil {
        panic(err)
    }

    subscriber, err := events.GetSubscriber()
    if err != nil {
        panic(err)
    }

    router.AddNoPublisherHandler(
        "send-welcome-email",
        "customer.created",
        subscriber,
        func(msg *message.Message) error {
            var payload struct {
                ID string `json:"id"`
            }
            if err := json.Unmarshal(msg.Payload, &payload); err != nil {
                return err
            }
            // sendWelcomeEmail is application code and is not shown here.
            return sendWelcomeEmail(ctx, payload.ID)
        },
    )

    ssr.Run()
}

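AddNoPublisherHandler is appropriate when the handler only consumes messages. If the handler also publishes further events, use AddHandler instead. In Watermill's router, an AddHandler function returns the messages to publish, and the router publishes them before acknowledging the incoming message, so a failed publish is not silently acknowledged.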
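
The publisher above marshals a map while the handler decodes into an anonymous struct, so the two can drift apart. A shared payload type is one way to keep them in sync. The sketch below uses only the standard library; CustomerCreated, encodeCustomerCreated, and decodeCustomerCreated are hypothetical names, and rejecting an empty ID is an assumed validation rule.

```go
package main

import (
    "encoding/json"
    "errors"
    "fmt"
)

// CustomerCreated is a hypothetical shared payload type for the
// "customer.created" topic. The JSON tag matches the "id" key used above.
type CustomerCreated struct {
    ID string `json:"id"`
}

// encodeCustomerCreated produces the bytes a publisher would pass
// to message.NewMessage.
func encodeCustomerCreated(id string) ([]byte, error) {
    return json.Marshal(CustomerCreated{ID: id})
}

// decodeCustomerCreated parses the bytes a handler would read from
// msg.Payload and rejects payloads without an ID.
func decodeCustomerCreated(payload []byte) (CustomerCreated, error) {
    var event CustomerCreated
    if err := json.Unmarshal(payload, &event); err != nil {
        return CustomerCreated{}, fmt.Errorf("decode customer.created: %w", err)
    }
    if event.ID == "" {
        return CustomerCreated{}, errors.New("decode customer.created: missing id")
    }
    return event, nil
}

func main() {
    payload, err := encodeCustomerCreated("cus_123")
    if err != nil {
        panic(err)
    }
    event, err := decodeCustomerCreated(payload)
    if err != nil {
        panic(err)
    }
    fmt.Println(string(payload), event.ID) // {"id":"cus_123"} cus_123
}
```

In the handler, the decode call would replace the inline json.Unmarshal, and returning its error leaves the decision about the failed message to the router.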

Configure a non-default backend

The default provider is the in-process GoChannel, which delivers events only within a single process. For distributed messaging, register the NATS provider at bootstrap:

package main

import (
    "piko.sh/piko"
    "piko.sh/piko/wdk/events/events_provider_nats"
)

func main() {
    // Build the NATS provider first; it is passed to piko.New below.
    natsProvider, err := events_provider_nats.NewNATSProvider(events_provider_nats.Config{
        URL: "nats://localhost:4222",
    })
    if err != nil {
        panic(err)
    }

    ssr := piko.New(
        piko.WithEventsProvider(natsProvider),
    )

    ssr.Run()
}

Two providers ship with Piko: the in-process events_provider_gochannel (the default) and events_provider_nats for distributed messaging. Both live under wdk/events/events_provider_*.
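
A common pattern is to keep the in-process default for local development and switch to NATS only when a URL is configured. The sketch below shows just the configuration step, using the standard library. The EVENTS_NATS_URL variable name and the natsURL helper are assumptions, and the sketch accepts only the nats:// scheme.

```go
package main

import (
    "fmt"
    "net/url"
    "os"
)

// natsURL is a hypothetical helper. It reads EVENTS_NATS_URL (an assumed
// variable name) through getenv and reports whether NATS is configured.
// An empty value means "use the default in-process provider".
func natsURL(getenv func(string) string) (string, bool, error) {
    raw := getenv("EVENTS_NATS_URL")
    if raw == "" {
        return "", false, nil
    }
    parsed, err := url.Parse(raw)
    if err != nil {
        return "", false, fmt.Errorf("parse EVENTS_NATS_URL: %w", err)
    }
    if parsed.Scheme != "nats" || parsed.Host == "" {
        return "", false, fmt.Errorf("EVENTS_NATS_URL must look like nats://host:port, got %q", raw)
    }
    return raw, true, nil
}

func main() {
    addr, useNATS, err := natsURL(os.Getenv)
    if err != nil {
        panic(err)
    }
    if useNATS {
        fmt.Println("events backend: NATS at", addr)
        return
    }
    fmt.Println("events backend: in-process GoChannel")
}
```

When useNATS is true, the returned address would go into the URL field of events_provider_nats.Config; otherwise piko.New can be called without piko.WithEventsProvider.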

See also