Introduction

They say imitation is the sincerest form of flattery. In that light, I was inspired by an ex-Morgan Stanley colleague - Kyle Downey - to start my own blog. This is my first post in that endeavor, and I decided to put my cloudy spin on one of Kyle's latest posts.

You can find Kyle's post here. He discusses connecting to the Phemex Cryptocurrency Derivatives Exchange via Python WebSockets, then ingesting and processing the data with his asynchronous functional reactive library, Tau. It's a great post, and if you haven't read it, I highly recommend going through it and the other posts in the series.

Rusty Rust Rust

I'm a developer at heart and I'm always on the lookout for new and exciting technologies to get my teeth into, whether that's Machine Learning, Distributed Systems, Serverless - you name it. I have been learning Rust since November 2019 and have tried out a few experiments along the way.

So, I decided to replicate Kyle's approach using Rust and AWS.

Let's break down some of the requirements:

  1. Connect to the Phemex Crypto API via websocket
  2. Send a subscription JSON message for the BTCUSD trade feed
  3. Send a periodic heartbeat message
  4. Ingest the feed
  5. Tee off the raw data into a datalake for OLAP-style analysis
  6. Hook up a Business Intelligence tool to visualize the raw data
  7. Stretch goal - do some real-time streaming analysis on the feed to look for anomalous price swings and send the results to the datalake.

I'll cover the first three requirements in this first post. The second part will cover points 4, 5 and 6. The final point will be covered in the third post in this series.

In my case, the datalake uses Amazon S3 to store the raw data. I'm a big proponent of schema-on-read, especially in today's big data analytics world, where storage is cheap, compute is ephemeral, and data attributes are in constant flux.

The solution architecture will consist of a Rust service that will send raw data to a Kinesis Firehose stream. From here, Kinesis Firehose delivers the raw data to an S3 bucket, transformed into the Parquet format.
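As a rough preview of that first hop (I'll cover it properly in Part 2), the Firehose call from Rust looks something like the sketch below - the rusoto_firehose crate and the "phemex-trades" delivery stream name are assumptions of mine for illustration, not fixed choices:

// Rough sketch of pushing a raw payload to Kinesis Firehose via rusoto.
// The crate choice and the "phemex-trades" stream name are assumptions.
use rusoto_firehose::{KinesisFirehose, KinesisFirehoseClient, PutRecordInput, Record};

async fn send_to_firehose(client: &KinesisFirehoseClient, payload: String) {
    let input = PutRecordInput {
        delivery_stream_name: "phemex-trades".to_string(),
        record: Record {
            data: payload.into_bytes().into(),
        },
    };

    client
        .put_record(input)
        .await
        .expect("Could not put record to Firehose");
}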

I then use AWS Glue to crawl the bucket and infer a schema, after which I can execute arbitrary SQL queries against it using Amazon Athena. I can then hook up Amazon QuickSight and create visualizations to my heart's content. Minimal infrastructure, unlimited scale, cost effective - just the way I like it.

My stretch goal will take the form of Kinesis Data Analytics. Because I'm already ingesting the feed via Kinesis, it's fairly trivial to enable this and compute real-time statistics.

The Rust Bit

So, where to start? I know that Phemex has a WebSocket API and that the documentation lives here. Reading it tells me two things:

  • I need to send a JSON message as a subscription for the BTCUSD feed
  • In order to be a good API Citizen, I should send a PING message every 5 seconds or so to act as a heartbeat, or risk server-side disconnection.

OK, with that in mind, let's write some Rust.

I'm using the incredible serde_json crate here and, coming from other languages, it.is.magical.
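Before the type definitions, here's roughly the set of use statements these snippets assume - a sketch only, since the exact paths depend on your crate versions:

// Approximate imports used by the snippets in this post (crate versions may vary).
use log::debug;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::env;
use std::sync::mpsc;
use tungstenite::{connect, Message};
use url::Url;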

First off, let's define my Subscription type:

#[derive(Serialize, Deserialize)]
struct Subscription {
    id: i32,
    method: &'static str,
    params: Vec<&'static str>,
}
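Just to show what Phemex will actually receive on the wire, serializing one of these with serde_json gives the following (field names simply follow the struct above):

// Quick illustration: what the Subscription struct above serializes to.
let sub = Subscription {
    id: 1,
    method: "trade.subscribe",
    params: vec!["BTCUSD"],
};
println!("{}", serde_json::to_string(&sub).unwrap());
// prints: {"id":1,"method":"trade.subscribe","params":["BTCUSD"]}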

I also want a type to marshal the JSON response from Phemex:

#[derive(Serialize, Deserialize)]
struct TradeResponse {
    sequence: u64,
    // An owned String (rather than &'static str) so serde can
    // deserialize from messages read at runtime.
    symbol: String,
    trades: Vec<Vec<Value>>,
}
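To make the trades field concrete, here's what parsing one of the raw messages (like the ones shown at the end of this post) would look like - each inner array appears to be [timestamp, side, scaled price, quantity], which is why Value is handy for the mixed types:

// Illustration: parsing one raw Phemex trade message into TradeResponse.
// Unknown fields like "type" are ignored by serde's default behaviour.
let raw = r#"{"sequence":857609649,"symbol":"BTCUSD","trades":[[1587650681818879665,"Buy",72960000,552]],"type":"incremental"}"#;
let parsed: TradeResponse = serde_json::from_str(raw)
    .expect("Could not parse trade message");
println!("{} trade(s) for {}", parsed.trades.len(), parsed.symbol);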

I'll be using the Tungstenite WebSocket library to connect to Phemex because it's both simple and lightweight.

Let's build a Subscription instance:

#[tokio::main]
async fn main() {
    env::set_var("RUST_LOG", "info");
    env_logger::init();

    let btcusd_subscription = Subscription {
        id: 1,
        method: "trade.subscribe",
        params: vec!["BTCUSD"],
    };

We also need to send a PING message, so I can reuse the Subscription type for that. Perhaps an enhancement would be to give it its own type, but for now this will do:

let ping_message = Subscription {
    id: 1,
    method: "server.ping",
    params: vec![],
};

// Serialize it once up front; the timer thread clones this string on each tick.
let p_string = serde_json::to_string(&ping_message)
    .expect("Could not serialize ping message to JSON");

OK, we have our datatypes; let's make a connection to Phemex:

let (mut socket, response) =
    connect(Url::parse("wss://phemex.com/ws")
            .expect("Invalid Websocket URI"))
        .expect("Could Not Connect to Websocket Address");

For those who aren't familiar with Rust syntax and error handling, this is a great guide.
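As a small illustration of the alternative, the same connection could be written with the ? operator inside a function that returns a Result - just a sketch; the rest of this post sticks with expect() for brevity:

// Sketch: the ?-operator alternative to calling expect() everywhere.
fn connect_to_phemex() -> Result<(), Box<dyn std::error::Error>> {
    let url = Url::parse("wss://phemex.com/ws")?;
    let (_socket, _response) = connect(url)?;
    // ... subscribe and read messages here
    Ok(())
}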

I’ll send the subscription message now for BTCUSD:

// Convert the struct to a JSON string
let btc_sub = serde_json::to_string(&btcusd_subscription)
        .expect("Could not serialize Subscription to JSON");

// Wrap the JSON string in a Tungstenite text Message
let btc_sub = Message::Text(btc_sub);

// Send the subscription message to the service
socket
    .write_message(btc_sub)
    .expect("Could not send subscription request.");

As part of being good API citizens, Phemex requests that we send a heartbeat message every 5 seconds. For this, I decided to use a timer-based approach that sends a message to a channel.

One of the massive benefits of Rust is the notion of Fearless Concurrency. The timer thread holds one end of an MPSC (Multi-Producer, Single-Consumer) channel. As part of my message processing loop, I make a non-blocking call on the receiving end of this channel; if there's a message waiting, I send it on to the websocket.

// Setup the Timer
let (tx, rx) = mpsc::channel();
let ping_timer = timer::Timer::new();

// The move closure is executed on another thread.
let _guard = ping_timer.schedule_repeating(
    chrono::Duration::seconds(5), move || {
        debug!("Sending PING message.");
        tx.send(p_string.clone())
            .expect("Cannot send ping message.");
});

My main processing is just a simple infinite loop that reads the websocket for new data and also does a try_recv() call on the timer channel we set up. If there's a message from the timer, great - send it on to the websocket; otherwise, don't block the loop and carry on receiving messages.

There's probably a much more elegant solution to this waiting somewhere out there, but hey, I'm a Rust newbie:

loop {
    // Block until the next message arrives on the websocket
    // (the message itself gets used by the Firehose code below).
    let msg = socket
        .read_message()
        .expect("Error reading message from socket.");

    // Non-blocking check for a pending ping from the timer thread.
    if let Ok(ping) = rx.try_recv() {
        debug!("Got PING message, sending.");
        socket
            .write_message(Message::Ping(ping.into_bytes()))
            .expect("Could not write PING Message.");
    }
    //...Firehose code
}
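For what it's worth, a simpler variant (a sketch only, untested against the exchange) would drop the timer thread and channel entirely and just track elapsed time inside the loop. It shares the same caveat as above: a ping can only go out once read_message() returns.

// Sketch of a timer-less variant: track elapsed time in the loop itself.
use std::time::{Duration, Instant};

let mut last_ping = Instant::now();
loop {
    let msg = socket
        .read_message()
        .expect("Error reading message from socket.");

    // Piggyback the heartbeat check on each received message.
    if last_ping.elapsed() >= Duration::from_secs(5) {
        socket
            .write_message(Message::Ping(p_string.clone().into_bytes()))
            .expect("Could not write PING Message.");
        last_ping = Instant::now();
    }
    //...Firehose code
}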

Lo and behold, trades!

{"sequence":857609649,"symbol":"BTCUSD","trades":[[1587650681818879665,"Buy",72960000,552]],"type":"incremental"}
{"sequence":857609857,"symbol":"BTCUSD","trades":[[1587650682134103601,"Buy",72960000,248]],"type":"incremental"}
{"sequence":857610273,"symbol":"BTCUSD","trades":[[1587650682376529787,"Sell",72985000,20]],"type":"incremental"}
...

This ingest service would ideally run in a container, since I'm pulling from an external service. Because I don't particularly want to run, maintain, and care for my own container system, AWS Fargate would be ideal here.

No offense to K8s; I just don't particularly need all those features for this, since the main bulk of my processing and data storage will run natively and serverlessly in the cloud. I can also get away with a very modest container instance here; the entire Rust binary runs in less than 5MB of memory.

In Part 2 (coming soon) we'll explore the AWS componentry, including setting up and sending messages to Kinesis Firehose, and where to land and process our data.

Al