Clojure: Synchronous server sent events with virtual threads and channels


I have been playing around with Turbo and Datastar, both of which use server-sent events (SSE) to push updates to the client. Traditionally, server-sent events work best with asynchronous Ring handlers. Personally, though, I much prefer the simplicity of synchronous handlers, and with the advent of virtual threads we can have our cake and eat it too. This post goes over how to build a synchronous Ring handler for server-sent events.

Extending StreamableResponseBody

First, we want to extend the StreamableResponseBody protocol to support ManyToManyChannel. Even though we are using virtual threads, we still want a way for these threads to communicate with each other, and core.async channels are perfect for this. In theory BufferedWriter is thread-safe, so you could skip core.async if you really wanted to, but you'd miss out on useful features such as sliding-buffer.

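;; Requires and imports used by the snippets in this post
(require '[clojure.core.async :as a]
         '[clojure.java.io :as io]
         '[ring.adapter.jetty :refer [run-jetty]]
         '[ring.core.protocols :refer [StreamableResponseBody]])
(import '(java.io OutputStream)
        '(java.util.concurrent Executors)
        '(org.eclipse.jetty.util.thread QueuedThreadPool))

;; Extend core.async channel with StreamableResponseBody
(extend-type clojure.core.async.impl.channels.ManyToManyChannel
  StreamableResponseBody
  (write-body-to-stream [ch _response ^OutputStream output-stream]
    (with-open [out    output-stream
                writer (io/writer out)]
      (try
        ;; Block until a message arrives; a nil take means the channel
        ;; is closed and drained, which ends the loop.
        (loop []
          (when-let [^String msg (a/<!! ch)]
            (doto writer (.write msg) (.flush))
            (recur)))
        ;; If the client disconnects, writing to the output stream
        ;; throws an IOException.
        (catch java.io.IOException _)
        ;; Close channel after client disconnect.
        (finally (a/close! ch))))))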

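This code takes messages off the channel one at a time and writes each to the output stream. When a write fails, or the channel is closed and drained, the loop ends and the finally clause closes the channel. Note that this code uses a/<!!, so it blocks the calling thread while it waits for the next message.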
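To see the loop's termination behaviour without a server, here is a minimal sketch that runs the same take-and-write loop against a StringWriter. The helper name drain-to-writer! and the demo vars are hypothetical and not part of the example project.

```clojure
(require '[clojure.core.async :as a])

(defn drain-to-writer!
  "Hypothetical helper: the same loop shape as write-body-to-stream,
  but writing to any java.io.Writer."
  [ch ^java.io.Writer writer]
  (loop []
    ;; a/<!! returns nil once the channel is closed and empty,
    ;; so when-let ends the loop at that point.
    (when-let [^String msg (a/<!! ch)]
      (doto writer (.write msg) (.flush))
      (recur))))

(def drain-ch (a/chan 10))
(a/>!! drain-ch "data: one\n\n")
(a/>!! drain-ch "data: two\n\n")
;; Messages already buffered can still be taken after the close.
(a/close! drain-ch)

(def drain-out (java.io.StringWriter.))
(drain-to-writer! drain-ch drain-out)
(def drained (str drain-out))
```

Both buffered messages are written before the loop sees the closed channel and returns.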

Tracking client connections

Next, let's create an atom to keep track of client connections:

(defonce clients (atom #{}))

Sending messages

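Two small helpers for sending messages: format-event wraps a message body in the SSE data format, and send>!! puts a message on a client's channel.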

(defn format-event [body]
  (str "data: " body "\n\n"))

(defn send>!! [ch message]
  (let [v (a/>!! ch message)]
    (when-not v (swap! clients disj ch))
    ;; Keeps the return semantics of >!!
    v))

a/>!! returns false when the channel is already closed. When that happens, we remove the client's connection from the clients atom.
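The return value of the put is what drives the pruning, so it is worth seeing on its own. The following is a small sketch using a separate registry (demo-clients) and a copy of the function (demo-send>!!); both names are hypothetical and exist only so the snippet can run in isolation.

```clojure
(require '[clojure.core.async :as a])

;; A local registry standing in for the clients atom.
(def demo-clients (atom #{}))

(defn demo-send>!!
  "Same shape as send>!!, but against the demo registry."
  [ch message]
  (let [v (a/>!! ch message)]
    (when-not v (swap! demo-clients disj ch))
    v))

(def demo-ch (a/chan 10))
(swap! demo-clients conj demo-ch)

;; Put on an open channel: returns true, registry unchanged.
(def sent-while-open (demo-send>!! demo-ch "data: hi\n\n"))

(a/close! demo-ch)

;; Put on a closed channel: returns false and the channel is pruned.
(def sent-after-close (demo-send>!! demo-ch "data: bye\n\n"))
(def remaining (count @demo-clients))
```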

Heartbeat

There's no way for us to know that the client has closed the connection without trying to write to the output stream. To prune disconnected clients, so that we don't end up with an ever-expanding atom of zombie clients, we need to write to the output stream occasionally. To do this we send a \n\n message at a fixed interval (msec in the code below). This heartbeat runs in a virtual thread and stops when the channel is closed.

Note: this code is very similar to what you would normally write in a go block, but instead we use the blocking core.async constructs and run them in a virtual thread.

(defn heartbeat>!! [ch msec]
  (Thread/startVirtualThread
    #(loop []
       (Thread/sleep ^long msec)
       (when (send>!! ch "\n\n")
         (recur)))))
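For comparison, this is roughly what the go-block equivalent might look like. It is a sketch only: heartbeat-go is a hypothetical name, it uses the parking operations (a/timeout, a/<!, a/>!) in place of Thread/sleep and a/>!!, and it leaves out the clients bookkeeping that send>!! does.

```clojure
(require '[clojure.core.async :as a])

(defn heartbeat-go
  "Hypothetical go-block version of heartbeat>!!, for comparison only."
  [ch msec]
  (a/go-loop []
    ;; Park (rather than block a thread) for msec milliseconds.
    (a/<! (a/timeout msec))
    ;; a/>! returns false once ch is closed, which ends the loop.
    (when (a/>! ch "\n\n")
      (recur))))

(def hb-ch (a/chan 10))
(def hb-loop (heartbeat-go hb-ch 10))

;; Blocks until the first heartbeat arrives.
(def first-beat (a/<!! hb-ch))

(a/close! hb-ch)

;; The go-loop's own channel closes once the loop stops.
(def loop-result (a/<!! hb-loop))
```

The two versions have the same shape. The virtual-thread version simply lets you use ordinary blocking calls such as Thread/sleep inside the loop.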

The handler

Finally, our synchronous handler creates a channel, adds the channel to the clients atom, sends a connection message and starts a heartbeat. The channel is then returned as the response body.

(defn handler-sse [_]
  (let [ch (a/chan 10)]
    (swap! clients conj ch)
    (send>!! ch (format-event "Successfully connected"))
    ;; Every 10 seconds we send a heartbeat to check if output stream
    ;; is still open.
    (heartbeat>!! ch 10000)
    {:status  200
     :headers {"Content-Type"  "text/event-stream"
               "Cache-Control" "no-cache, no-store"}
     :body    ch}))

The headers are important here: the content type needs to be text/event-stream, and we don't want the client to cache responses.

Broadcast

We need a function to broadcast messages to clients. All it does is iterate through each channel in clients and send a message.

(defn broadcast-message-to-connected-clients! [message]
  (run! (fn [ch] (send>!! ch (format-event message))) @clients))
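One caveat with the fixed buffer created by (a/chan 10): once ten messages are waiting, a/>!! blocks until the writer thread takes one, so a single stalled client can hold up the run! loop for everyone else. The sliding-buffer mentioned earlier is one way around that, at the cost of dropping the oldest unsent messages for that client. A minimal sketch, using a hypothetical slow-client-ch with a buffer of two so the effect is easy to see:

```clojure
(require '[clojure.core.async :as a])

;; A channel whose buffer keeps only the two most recent messages.
(def slow-client-ch (a/chan (a/sliding-buffer 2)))

(a/>!! slow-client-ch "data: 1\n\n")
(a/>!! slow-client-ch "data: 2\n\n")
;; The buffer is full, but this put does not block:
;; the oldest message is dropped to make room.
(a/>!! slow-client-ch "data: 3\n\n")

(def kept [(a/<!! slow-client-ch) (a/<!! slow-client-ch)])
```

Whether dropping messages is acceptable depends on the application. It tends to suit updates where only the latest state matters.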

Simple server

We need a server configured to use virtual threads. Virtual threads mean we can have hundreds of thousands of threads with very little overhead. This is what enables us to write the handlers in a synchronous manner without running into performance problems.

Note: you'll need to be running Java 21+ to have access to virtual threads.

(def app
  (fn handler [{:keys [request-method uri] :as req}]
    (if (= [:get "/"] [request-method uri])
      (handler-sse req)
      {:status 404})))

(defn start-server [& {:as opts}]
  (let [thread-pool (new QueuedThreadPool)
        _           (.setVirtualThreadsExecutor thread-pool
                      (Executors/newVirtualThreadPerTaskExecutor))]
    (run-jetty #'app
      (merge
        {:port        8080
         :thread-pool thread-pool}
        opts))))

Start the server.

(def server
  (start-server :join? false))

Connecting clients

Connect a client from a terminal. You can connect multiple clients using different terminal windows.

curl localhost:8080 -vv

=>

*   Trying [::1]:8080...
* Connected to localhost (::1) port 8080
> GET / HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/8.4.0
> Accept: */*
> 
< HTTP/1.1 200 OK
< Server: Jetty(12.0.13)
< Content-Type: text/event-stream;charset=UTF-8
< Cache-Control: no-cache, no-store
< Transfer-Encoding: chunked
< 
data: Successfully connected

Broadcast messages

Send a message to all clients.

(broadcast-message-to-connected-clients! "Hello")

You should see the message printed in each connected terminal.

data: Hello
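One thing to watch when broadcasting larger payloads such as HTML fragments: in the SSE format every line of a message needs its own data: prefix, so a body containing newlines cannot be sent as a single data: line. Below is a sketch of a variant that handles this. The name format-multiline-event is hypothetical and is not part of the example project.

```clojure
(require '[clojure.string :as str])

(defn format-multiline-event
  "Hypothetical variant of format-event that emits one data: line
  per line of body, followed by the blank line that ends the event."
  [body]
  (str (->> (str/split-lines (str body))
            (map #(str "data: " %))
            (str/join "\n"))
       "\n\n"))

(def multiline-event
  (format-multiline-event "<div>\n  Hello\n</div>"))
;; => "data: <div>\ndata:   Hello\ndata: </div>\n\n"
```

For single-line messages it produces the same output as format-event.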

Check connected clients

With two terminals connected, derefing the clients atom shows two channels, one per client.

@clients
=>
#{#object[clojure.core.async.impl.channels.ManyToManyChannel 0x31de488f "clojure.core.async.impl.channels.ManyToManyChannel@31de488f"]
  #object[clojure.core.async.impl.channels.ManyToManyChannel 0x7870479b "clojure.core.async.impl.channels.ManyToManyChannel@7870479b"]}

Pruning disconnected clients

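If we close the terminal windows (disconnecting the clients), wait 30 seconds or so, and deref the clients atom again, it should be empty. Pruning is not immediate: a heartbeat write has to fail first, which closes the channel, and the client is only removed when a later send finds the channel closed.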

@clients
=>
#{}

Conclusion

Hopefully, this post provides a good starting point for anyone wanting to set up server-sent events with synchronous Ring handlers.

As an aside, core.async channels and their blocking constructs work really nicely with virtual threads. In my experience so far, it means you never have to reach for go blocks, which massively simplifies things.

The full example project can be found here.

Further Reading: