Introduction

This post is the second in my Phemex series. You can find the background and more details in Part 1 here.

In Part 2 the goal is to ingest the stream of trades we subscribed to in Part 1, transform it into Parquet, and land it in the datalake.

More Rust and…Dragons

In Part 1 I defined my data model. I made a couple of modifications to fit in better with Rust’s borrow-checker.

The datatypes now look like this:

use serde::{Deserialize, Serialize};
use serde_json::Value;

#[derive(Serialize, Deserialize)]
struct Subscription {
    id: i32,
    method: String,
    params: Vec<String>,
}

#[derive(Serialize, Deserialize)]
struct TradeResponse {
    sequence: u64,
    symbol: String,
    trades: Vec<Vec<Value>>,
}

I also decided not to send the entire TradeResponse. Instead, I split each trade tuple out of the TradeResponse and send it on its own; the other information wasn't really relevant for the type of analysis I want to conduct:

#[derive(Serialize, Deserialize)]
struct Trade {
    timestamp: i64,
    direction: String,
    price: i64,
    quantity: i64,
}

OK, so now that I'm happy with the data model, I want to send this to Amazon Kinesis Firehose. Since Kinesis is designed for high concurrency, I want to make use of multiple threads to send my messages. Tokio has a great task-based concurrency API: a task is a lightweight, non-blocking unit of execution, similar to green threads in other languages like Go and Python.
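
As a toy illustration of what Tokio tasks look like (this is separate from the trading code, just a minimal sketch), spawning a handful of them is as simple as:

use tokio::task;

#[tokio::main]
async fn main() {
    // Spawn a few lightweight tasks; the Tokio runtime schedules them
    // rather than giving each one its own OS thread.
    let handles: Vec<_> = (0..4)
        .map(|i| task::spawn(async move { format!("task {} finished", i) }))
        .collect();

    // Await each handle to collect the results.
    for handle in handles {
        println!("{}", handle.await.unwrap());
    }
}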

Here Be (some small) Dragons

I'm using the Rusoto crate to communicate with AWS. Rusoto also uses Tokio as its runtime, so it fits in really well with the asynchronous model. I start by creating my Kinesis Firehose client:

// Imports used across the snippets in this post
use std::{env, sync::Arc};

use bytes::Bytes;
use log::{error, info};
use rusoto_core::Region;
use rusoto_firehose::{KinesisFirehose, KinesisFirehoseClient, PutRecordInput, Record};
use tokio::task;

#[tokio::main]
async fn main() {
    env::set_var("RUST_LOG", "info");
    env_logger::init();

    let fh_client = Arc::new(
        KinesisFirehoseClient::new(Region::UsEast2)
    );

Rust is all about ownership. I've decided to wrap the Kinesis Firehose client in an Arc, which allows for shared ownership between threads.

This is why I entitled this section Here Be (some small) Dragons: to be honest, I'm not 100% sure this is the best way to go. I'm new to Rust and this feels right, but I would love some feedback via Twitter if there's an easier way to do it. I'll name this the Dragon Caveat - you'll see it come up a few times!

Once the client is created, I can go back to my main event loop. The socket delivers a few different message types, and I only want to send actual trade messages, so I created a quick helper method to check that a message is a valid BTCUSD trade message before parsing out the Trade tuples. The initial message that comes back when you first connect is a snapshot of the 1000 most recent trades, and subsequent trades are marked as incremental; I just send everything to the Kinesis stream.
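
The helper itself isn't shown in this post; a minimal sketch of it might look something like this (my assumption of its logic, using the tungstenite Message type and serde_json from Part 1):

use serde_json::Value;
use tungstenite::Message;

// Treat a message as a trade message if it parses as JSON and carries a
// "trades" array for the BTCUSD symbol. Anything else (heartbeats,
// subscription acks, etc.) is ignored.
fn is_trade_message(msg: &Message) -> bool {
    msg.to_text()
        .ok()
        .and_then(|txt| serde_json::from_str::<Value>(txt).ok())
        .map(|v| v.get("trades").is_some() && v["symbol"] == "BTCUSD")
        .unwrap_or(false)
}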

if is_trade_message(&msg) {
    let trade_struct: TradeResponse =
        serde_json::from_str(&msg.to_text().expect("Cannot create text from Message"))
            .expect("Cannot serialize Message to TradeResponse");

    for trade in trade_struct.trades {
        let trade_info = Trade {
            timestamp: trade[0].as_i64().unwrap_or_default(),
            direction: String::from(trade[1].as_str().unwrap_or_default()),
            price: trade[2].as_i64().unwrap_or_default(),
            quantity: trade[3].as_i64().unwrap_or_default(),
        };
...

OK, I've got my trade tuple; let's send it into the stream and check for errors - if there are none, just log the record ID:

let trade_string = serde_json::to_string(&trade_info)
    .expect("Could not unmarshall TradeResponse to String.");

// Create the PutRecord
let pri = PutRecordInput {
    delivery_stream_name: "my-btc-delivery-stream".to_owned(),
    record: Record {
        data: Bytes::from(trade_string.to_owned()),
    },
};

// Clone the Arc handle to the client; the clone gets
// moved into the Tokio task below
let cloned_fh_client = fh_client.clone();

task::spawn(async move {
    match cloned_fh_client.put_record(pri).await {
        Err(error) => {
            error!("Encountered error writing to Firehose Stream: {}", error)
        }
        Ok(resp) => info!("Ingested : {}, {}", trade_string, resp.record_id),
    }
});

And that's really all it takes. Since the Tokio runtime schedules these non-blocking tasks across the physical threads on my machine, I can create thousands of sending tasks. By default, Kinesis Firehose will support up to 2000 transactions per second, so I have plenty of room to play with.
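
For what it's worth, the default #[tokio::main] already uses the multi-threaded scheduler; if I ever wanted to pin the number of worker threads explicitly, that's a one-line change (a hypothetical tweak, not something the code above needs):

// Hypothetical: explicitly size the worker thread pool.
#[tokio::main(flavor = "multi_thread", worker_threads = 8)]
async fn main() {
    // ... same body as before ...
}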

If I wanted to ingest more instrument types or pull in data from other exchanges, I might investigate raising the throughput limits on the delivery stream, but it's not required here. It's this kind of push-button scaling that makes Kinesis very easy to reason about.

Kinesis Firehose transforms the incoming messages into Parquet format and writes batches of them to the S3 bucket. As the files are written, they are automatically split into year, month, day and hour subfolders in S3 according to when they were sent to the stream; this is a setting in the Firehose configuration detailed below. It's important later on when we want to query by time range, and it makes those queries very efficient since it's easy to isolate and scan only the messages from a certain time period.

From there, AWS Glue can identify the message schema, associate each record stored in S3 with a partition key, and store this in the table metadata. Ultimately Athena uses the information stored in the Glue table to optimize queries against the data. Optimize here applies in terms of both performance and cost - Athena is billed by the amount of data scanned per query, so reducing the data that needs to be scanned also reduces the cost. This approach scales very well: you can store very large amounts of data in this manner and still query it efficiently.

AWS Configuration

Streaming configuration

I first need to create an AWS Glue database and table to hold my schema information. It's super simple since there are only 4 fields. I could also do this automatically from pre-existing messages by having an AWS Glue crawler crawl the S3 bucket and infer the schema that way.

[Image: AWS Glue Trade Schema + Partitions]
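
For reference, the four columns follow directly from the Rust struct; the Glue types below are the mapping I'd expect rather than a copy of the actual console configuration:

timestamp   bigint
direction   string
price       bigint
quantity    bigint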

OK, let's create a new delivery stream and have it transform the incoming records to Parquet. I choose the AWS Glue database and table I just created to tell Kinesis Firehose about the message schema. Just a note: remember to add the correct Prefix and Error Prefix - this will help the Amazon Athena and Amazon Quicksight queries later on.

[Image: Kinesis Firehose custom output prefix]
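
As a rough guide, a custom prefix pair along these lines gives the year/month/day/hour layout (the folder names here are illustrative, not copied from my actual configuration):

Prefix:        data/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/
Error Prefix:  errors/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/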

Serverless SQL

Once I have that set up, I can have an AWS Glue crawler run at regular intervals to pick up new partitions; mine runs once per hour. I think it's important to realize here that the AWS Glue Data Catalog holds metadata about the data source - the data itself is still held in S3. The database and table structure are then directly queryable by Amazon Athena, and it's easy to submit ad-hoc queries in SQL:

[Image: Athena query output]

I can also hook up Amazon Quicksight using Amazon Athena as a datasource to visualize the information.

[Image: Select Athena as a Datasource]

[Image: Select BTC Table for Analysis]

Business Intelligence

[Image: Quicksight dashboard showing price and direction]

In this dashboard, I take the top 50 distinct prices that come up and show them in a heat map based on trade direction. As expected, there is some clustering, with outliers on either side. In my second visualization I show how the price and the trade direction change over time.

The above dashboard took about ten minutes to put together. If I spent longer, I could do more sophisticated analysis and look into some of the ML Insights that Amazon Quicksight provides. That said, I think it illustrates my point - Amazon Quicksight is a powerful and flexible tool when used in conjunction with a serverless datalake for OLAP-style analytics.

Part 2 Wrap-Up

In this post we looked at some aspects of the Kinesis Firehose API, ingested and transformed the stream of trade data, and landed it in our S3 datalake. We then used AWS Glue to hold the schema information and conducted ad-hoc queries using Amazon Athena and SQL.

Finally Amazon Quicksight was used to visualize this information stored in the datalake and build out a simple dashboard showing price, quantity and buy/sell direction.

In Part 3, I'll explore streaming analytics using the Firehose stream and how to send those results elsewhere for more real-time analysis.

Al