This project took me forever to figure out, so I'm going to document it as much as possible. What I've been trying to get to is a simple, Async Rust codebase that would allow me to plug in web-based server-side events or web socket events and then send them over to one or more clients as needed. A lot of this code comes from a variety of examples that I stitched together, and getting them to work well was a headache, so now you get to share in it.

Since this is an async project, I'm going to use Tokio as my runtime and Axum as my server. They're well-documented and more or less standard, as far as most people are concerned, so let's go with it. Minus all the metadata about tracing and tracking and all that, the main function for an Axum project is just:

<main>= (U->)
#[tokio::main]
async fn main() {
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await.unwrap();
    let app = app();
    axum::serve(listener, app).await.unwrap();
}

Nothing could be simpler: build a listener, build an app, start them running.

So, what does the app look like? Well, we need:

  • something to display the results on the browser side
  • a static file handler to deliver that something
  • a route to the thing producing our text
<app>= (U->)
fn app() -> Router {
    let assets_dir = PathBuf::from(env!("ASSETS_PATH"));
    let static_files_service = ServeDir::new(assets_dir).append_index_html_on_directories(true);
    Router::new()
        .fallback_service(static_files_service)
        .route("/heartbeat", get(heartbeat))
}

For a brief aside, that ASSETS_PATH there isn't a standard environment variable, so where does it come from? I have a build.rs script that just defines where to find things:

<build.rs>=
use std::path::{Path, PathBuf};

fn path_to_assets() -> PathBuf {
    Path::new(env!("CARGO_MANIFEST_DIR")).join("assets")
}

fn main() {
    let assets: PathBuf = path_to_assets();
    println!("cargo:rustc-env=ASSETS_PATH={}", &assets.display());
}

Nothing fancy, but I wanted to understand what one could do with build.rs, and this was my first real experiment with it. I could have just hacked the CARGO_MANIFEST_DIR line directly into my source code, but then I wouldn't have learned how to create new environment variables on the fly at build-time, would I?

Back to the heartbeat. Here, I create an MPSC (Multiple Producer, Single Consumer) channel (although in this case there's only going to be one producer) of type String, and then I'm going to send the transmit half of the channel off to our actual generator.

Then I'm going to create and return the SSE (Server Side Event) producer that the app wants to route to. SSE wants a stream, and I want to stream the contents of the heartbeat generator, which doesn't produce one. Fortunately, the try_stream! macro will re-write a lot of this and replace the yield <something> keywords with code to output a stream.

<heartbeat>= (U->)
async fn heartbeat() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let (tx, mut rx) = mpsc::channel::<String>(8);
    let _sender = task::spawn(async move { heartbeat_generator(tx).await });

    Sse::new(try_stream! {
        while let Some(received) = rx.recv().await {
            let event = Event::default().data(received);
            yield event;
        }
    })
}

Now I'm going to admit that this all makes me a bit uncomfortable. I understand that Sse is something that produces a Response object on a schedule, but I don't really understand how Axum receives those objects or what it does with them. I don't understand (yet) the bookkeeping that goes into task::spawn, or how Tokio is keeping track of all this eventing. It feels very esoteric and abstract to me; in a way, it's much worse than writing Haskell code, because so much is hidden away behind curtains. Nonetheless, this did what I wanted it to do.

The heartbeat generator is just a loop that takes the transmitter and sends messages over it. There's really nothing esoteric here, except of course for that we need to use tokio's sleep function so that the sleeping goes into the generator, rather than the usual Unix thing of just waiting on a signal.

<heartbeat generator>= (U->)
async fn heartbeat_generator(tx: mpsc::Sender<String>) {
    loop {
        let message = String::from("Hello!");
        if let Err(_e) = tx.send(message).await {
            eprintln!("Something broke in the heartbeat generator?");
            break;
        }
        let _ = sleep(Duration::from_secs(2)).await;
    }
}

Along with the header, our code now is just:

<main.rs>=
use async_stream::try_stream;
use axum::{
    response::sse::{Event, Sse},
    routing::get,
    Router,
};
use futures::stream::Stream;
use std::{convert::Infallible, path::PathBuf};
use tokio::{
    sync::mpsc,
    task,
    time::{sleep, Duration},
};
use tower_http::services::ServeDir;

<main>
<app>
<heartbeat>
<heartbeat generator>

Now, to receive this content, we need an HTML page to host it. Something like:

<index.html>=
<html>
    <head>
        <title>Heartbeat revealer</title>
    </head>
    <body>
        <h3>Message for you, sir:</h3>
        <ul id="results"></ul>
    </body>
    <script src="script.js"></script>
</html>

And the JavaScript to receive it:

<script.js>=
var eventSource = new EventSource('heartbeat');
eventSource.onmessage = function(event) {
    const ul = document.getElementById("results");
    const li = document.createElement("li");
    li.appendChild(document.createTextNode(event.data));
    ul.appendChild(li);
}

Overall, this project told me that there's a ton of stuff I still need to understand before I start going around making Async Rust code that does anything of import. Still, the fact that I got this far reassures me that it's not all impossible. I just hate the size of the thing, y'know? Rust Async is sprawling, and I feel lost every time I start to deal with it. Narrowing it down to "I need an app, and this example shows me how to do SSE, but I don't want to just do SSE, I want to send the stuff over SSE from somewhere else..." just made me feel a little nutty.

The point of this project is to be a stepping stone toward a series of headless applications. Readmes that start with "It's X, but for the command line!" confuse me. Just make an 'X' library and wrap it in whatever user experience you want.