011: Instant messaging

A real-time chat application where multiple browser tabs exchange messages via Piko's SSE streaming. Messages flow through a server-side hub: chat.Send broadcasts to all connected chat.Listen streams. If a connection drops, withRetryStream() reconnects and replays missed messages using event IDs.

What this demonstrates

The chat.Send and chat.Listen actions mediate real-time messaging through a shared in-memory hub. The chat.Listen stream is long-lived. It keeps the connection open indefinitely, never calls SendComplete, and runs until the client disconnects. On the client, withRetryStream() configures reconnection with exponential backoff. The action calls stream.SendWithID() so each message's SSE id is the hub message ID. On reconnect the browser sends Last-Event-ID, and the chat.Listen action replays the messages the client lost by reading the hub's in-memory ring buffer (skipping any message whose ID the client already has).

The server sends heartbeat pings every 15 seconds to prevent proxy timeouts. Calling Subscribe() before reading history makes sure no messages slip through between the history read and the subscription. Non-blocking fan-out keeps fast clients responsive. Broadcast uses select with default to skip slow subscribers.

Project structure

src/
  actions/
    chat/
      hub.go                          Chat hub singleton with Subscribe/Broadcast/History
      send.go                         POST action: broadcast a message
      listen.go                       SSE streaming action: subscribe and forward
  pages/
    index.pk                          Chat UI with login, message feed, send controls

How it works

The hub manages subscribers and a ring buffer of the last 100 messages. The Listen action subscribes and forwards indefinitely:

func (a *ListenAction) StreamProgress(stream *piko.SSEStream) error {
    msgCh, unsubscribe := hub.Subscribe()
    defer unsubscribe()
    // Skip messages the client already has, using its Last-Event-ID.
    var lastSeenID uint64
    if lastID := stream.LastEventID(); lastID != "" {
        if parsed, err := strconv.ParseUint(lastID, 10, 64); err == nil {
            lastSeenID = parsed
        }
    }
    // Replay missed history from the hub's ring buffer.
    for _, msg := range hub.History() {
        if msg.ID <= lastSeenID { continue }
        stream.SendWithID(strconv.FormatUint(msg.ID, 10), "chat", msg)
    }
    // Forward live messages until the client disconnects.
    for {
        select {
        case <-stream.Done(): return nil
        case msg := <-msgCh: stream.SendWithID(strconv.FormatUint(msg.ID, 10), "chat", msg)
        case <-heartbeat.C: stream.SendHeartbeat()
        }
    }
}

The client connects with auto-reconnection:

await action.chat.Listen({})
    .withOnProgress((data, eventType) => { /* append message */ })
    .withRetryStream({ maxReconnects: Infinity, baseDelay: 2000, backoff: 'exponential' })
    .call();

How to run this example

In the root directory of the Piko repository:

cd examples/scenarios/011_live_notifications/src/
go mod tidy
air

See also