Introduction

Hello, this is a surprise posting, well surprise to me at least in the best possible way. I posted parts 1 and 2 here of this series to Reddit and another user pointed out that there was a much nicer way to perform the asynchronous sending and receiving portion that I had written. This is great, it’s exactly the reason I wanted to start a blog - continual growth and learning. I’m adding what I learnt here so that others may also see the difference and benefit from the much cleaner code.

Sync and a miss

The websocket library that I was using - Tungstenite - also has a Tokio version. I refactored the code to take advantage of this new library and, based on the example code in the github repository. This allowed me to separate the Ping and exchange messages out into separate Futures.

The main reason for this was it was pointed out that my original code actually blocks on message retrieval. This can stop the sending of a heartbeat if there are no messages received for more than 30s or so. In practice I didn’t see this due to the fact that there are messages constantly being received from the exchange, but still I took it as a learning opportunity.

Subscription code before :

let subscription_message = serde_json::to_string(&btcusd_subscription)
    .expect("Could not deserialize MarketTicket to JSON");

let ping_message =
    serde_json::to_string(&ping_message).expect("Could not deserialize PING message to JSON.");

let (mut socket, response) =
    connect(Url::parse("wss://phemex.com/ws").expect("Invalid Websocket URI"))
        .expect("Could Not Connect to Websocket Address");

let btc_sub = Message::Text(subscription_message);

socket
    .write_message(btc_sub)
    .expect("Could not send subscription request.");

let (tx, rx) = mpsc::channel();
let ping_timer = timer::Timer::new();

let _guard = ping_timer.schedule_repeating(chrono::Duration::seconds(10), move || {
    debug!("Sending PING message.");
    tx.send(ping_message.clone())
        .expect("Cannot send ping message.");
});

Subscription code afterwards :

let phemex_addr = String::from("wss://phemex.com/ws");
let (wss_stream, _resp) = connect_async(phemex_addr)
    .await
    .expect("Could not connect to the exchange.");

let mut interval = tokio::time::interval(Duration::from_secs(5));

let (mut tx, mut rx) = wss_stream.split();
let msg = Message::Text(serde_json::to_string(&btcusd_subscription).unwrap_or_default());

tx.send(msg)
    .await
    .expect("Could not send subscription message to exchange.");

You can see here, as well as being a lot cleaner, the tx/rx portions are split out from the wss_stream and that interval is now a tokio::time::duration which makes it easier to use later on in my message processing code.

The message processing code now looks like this :

loop {
    match select(rx.next(), interval.next()).await {
        Either::Left((msg, _tick_fut_continue)) => match msg {
            Some(msg) => {
                let msg = msg.expect("Could not marshall message.");
                if (msg.is_text() || msg.is_binary()) && is_trade_message(&msg) {
                    let trade_struct: TradeResponse = serde_json::from_str(
                        &msg.to_text().expect("Cannot create text from Message"),
                    )
                    .expect("Cannot serialize Message to TradeResponse");

                    # ...Process and send trade to Kinesis

And the interval is handled on the other side of the select clause in an Either::Right :

        Either::Right((_, _msg_fut_continue)) => {
            tx.send(Message::Ping(vec![]))
                .await
                .expect("Could not send PING keepalive.");
            info!("Sent PING Message as keepalive.");
        }

Much cleaner result, and now my message code is truly independent of my PING heartbeat.

Onwards to Timeseries databases in Part 3 !

Al