011: Instant messaging
A real-time chat application where multiple browser tabs exchange messages via Piko's SSE streaming. Messages flow through a server-side hub: chat.Send broadcasts to all connected chat.Listen streams. If a connection drops, withRetryStream() reconnects and replays missed messages using event IDs.
What this demonstrates
The chat.Send and chat.Listen actions mediate real-time messaging through a shared in-memory hub. The chat.Listen stream is long-lived. It keeps the connection open indefinitely, never calls SendComplete, and runs until the client disconnects. On the client, withRetryStream() configures reconnection with exponential backoff. The action calls stream.SendWithID() so each message's SSE id is the hub message ID. On reconnect the browser sends Last-Event-ID, and the chat.Listen action replays the messages the client lost by reading the hub's in-memory ring buffer (skipping any message whose ID the client already has).
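To make the role of the event ID concrete, here is a minimal sketch of what one event looks like on the wire under the standard SSE text format. formatSSEEvent is a hypothetical helper written for illustration only; how Piko's SendWithID serializes events internally is not shown in this example, so treat this as a picture of the protocol rather than of Piko's code.

```go
package main

import (
	"fmt"
	"strings"
)

// formatSSEEvent renders a single Server-Sent Events frame.
// Hypothetical helper for illustration; not part of Piko's API.
// The id field is the value the browser echoes back in the
// Last-Event-ID request header when it reconnects.
func formatSSEEvent(id, event, data string) string {
	var b strings.Builder
	fmt.Fprintf(&b, "id: %s\n", id)
	fmt.Fprintf(&b, "event: %s\n", event)
	// A payload containing newlines is split into one data line each.
	for _, line := range strings.Split(data, "\n") {
		fmt.Fprintf(&b, "data: %s\n", line)
	}
	b.WriteString("\n") // A blank line terminates the event.
	return b.String()
}
```

Because the id is the hub message ID, the value the browser sends back after a drop can be compared directly against the IDs in the hub's history.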
The server sends heartbeat pings every 15 seconds to prevent proxy timeouts. Calling Subscribe() before reading history makes sure no messages slip through between the history read and the subscription. Fan-out is non-blocking: Broadcast uses select with a default case to skip subscribers whose channel is full, so one slow client cannot stall delivery to the fast ones.
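The hub behaviour described above (bounded history plus non-blocking fan-out) could be sketched roughly as follows. This is not the code in hub.go: the Message fields, the subscriber buffer size of 16, and the use of a trimmed slice in place of a true ring buffer are all assumptions made to keep the sketch short.

```go
package main

import "sync"

// Message is a hypothetical chat message; the real struct may differ.
type Message struct {
	ID     uint64
	Sender string
	Text   string
}

// historySize mirrors the "last 100 messages" mentioned in the text.
const historySize = 100

// Hub is a sketch of an in-memory hub with Subscribe/Broadcast/History.
type Hub struct {
	mu      sync.Mutex
	nextID  uint64
	history []Message
	subs    map[chan Message]struct{}
}

// NewHub returns an empty hub with no subscribers.
func NewHub() *Hub {
	return &Hub{subs: make(map[chan Message]struct{})}
}

// Subscribe registers a buffered channel and returns it together with
// a function that removes the subscription again.
func (h *Hub) Subscribe() (<-chan Message, func()) {
	ch := make(chan Message, 16) // buffer size is an assumption
	h.mu.Lock()
	h.subs[ch] = struct{}{}
	h.mu.Unlock()
	return ch, func() {
		h.mu.Lock()
		delete(h.subs, ch)
		h.mu.Unlock()
	}
}

// History returns a copy of the retained messages, oldest first.
func (h *Hub) History() []Message {
	h.mu.Lock()
	defer h.mu.Unlock()
	return append([]Message(nil), h.history...)
}

// Broadcast assigns the next ID, records the message, and fans it out.
func (h *Hub) Broadcast(sender, text string) Message {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.nextID++
	msg := Message{ID: h.nextID, Sender: sender, Text: text}
	// Keep only the newest historySize messages (stand-in for a ring buffer).
	h.history = append(h.history, msg)
	if len(h.history) > historySize {
		h.history = h.history[len(h.history)-historySize:]
	}
	for ch := range h.subs {
		select {
		case ch <- msg:
		default: // subscriber's buffer is full: drop instead of blocking
		}
	}
	return msg
}
```

In this sketch a subscriber that falls behind silently loses live messages once its buffer fills. That is the trade-off of a select with default: delivery to everyone else never waits on the slowest reader.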
Project structure
src/
  actions/
    chat/
      hub.go      Chat hub singleton with Subscribe/Broadcast/History
      send.go     POST action: broadcast a message
      listen.go   SSE streaming action: subscribe and forward
  pages/
    index.pk      Chat UI with login, message feed, send controls
How it works
The hub manages subscribers and a ring buffer of the last 100 messages. The Listen action subscribes and forwards indefinitely:
// Requires the strconv and time packages.
func (a *ListenAction) StreamProgress(stream *piko.SSEStream) error {
	// Subscribe before reading history so no message falls into the gap.
	msgCh, unsubscribe := hub.Subscribe()
	defer unsubscribe()

	// Heartbeat ticker: one ping every 15 seconds keeps proxies from timing out.
	heartbeat := time.NewTicker(15 * time.Second)
	defer heartbeat.Stop()

	// Skip messages the client already has, using its Last-Event-ID.
	var lastSeenID uint64
	if lastID := stream.LastEventID(); lastID != "" {
		if parsed, err := strconv.ParseUint(lastID, 10, 64); err == nil {
			lastSeenID = parsed
		}
	}

	// Replay missed history from the hub's ring buffer.
	for _, msg := range hub.History() {
		if msg.ID <= lastSeenID {
			continue
		}
		stream.SendWithID(strconv.FormatUint(msg.ID, 10), "chat", msg)
	}

	// Forward live messages until the client disconnects.
	for {
		select {
		case <-stream.Done():
			return nil
		case msg := <-msgCh:
			stream.SendWithID(strconv.FormatUint(msg.ID, 10), "chat", msg)
		case <-heartbeat.C:
			stream.SendHeartbeat()
		}
	}
}
The client connects with auto-reconnection:
await action.chat.Listen({})
  .withOnProgress((data, eventType) => { /* append message */ })
  .withRetryStream({ maxReconnects: Infinity, baseDelay: 2000, backoff: 'exponential' })
  .call();
How to run this example
In the root directory of the Piko repository:
cd examples/scenarios/011_live_notifications/src/
go mod tidy
air