DEV Community

Yitaek Hwang
Yitaek Hwang

Posted on

Ingesting Financial Tick Data Using a Time-Series Database

Compared to traditional financial markets, crypto markets experience more volatility with price swinging in either direction at a quicker pace. Price of each cryptocurrency also tends to vary across exchanges. Given such a dynamic
nature, investors and traders looking to navigate the market need fast andreliable data from various crypto exchanges. In this tutorial, we’ll take a look at three different ways to ingest crypto market data into QuestDB for further analysis:

  1. Using the Cryptofeed library
  2. Writing a custom data pipeline
  3. Via Change Data Capture (CDC)

Image description

Prerequisites

We will be using QuestDB to ingest and store crypto market data. Create a new directory and from the directory, run the following to start a local instance of QuestDB:

mkdir cryptofeed-questdb
cd cryptofeed-questdb
docker run \
  -p 9000:9000 -p 9009:9009 -p 8812:8812 -p 9003:9003 \
  -v "$(pwd):/var/lib/questdb" \
  questdb/questdb:7.0.1
Enter fullscreen mode Exit fullscreen mode

Method 1: ingesting data using the Cryptofeed library

One of the easiest ways to ingest market data is to use an open-source tool called Cryptofeed. The Python library establishes websocket connections to various exchanges including Binance, Coinbase, Gemini, and Kraken and returns trade, market, and book update data in a standardized format. Cryptofeed also has native integration with QuestDB, making it a great choice to ingest data rapidly.

To get started, create a virtual environment with Python 3.8+. We will use venv but you can use conda, poetry, or virtualenv as well. We will create a venv for cryptofeed:

$ python3 -m venv cryptofeed
$ source cryptofeed/bin/activate
Enter fullscreen mode Exit fullscreen mode

Then install cryptofeed: pip install cryptofeed

Navigate into the cryptofeed directory and create a new file questdb.py. We will then paste the following to ingest trade data for BTC-USD pair from Coinbase and Gemini:


from cryptofeed import FeedHandler
from cryptofeed.backends.quest import TradeQuest
from cryptofeed.defines import TRADES
from cryptofeed.exchanges import Coinbase, Gemini


QUEST_HOST = '127.0.0.1'
QUEST_PORT = 9009




def main():
   f = FeedHandler()
   f.add_feed(Coinbase(channels=[TRADES], symbols=['BTC-USD'], callbacks={TRADES: TradeQuest(host=QUEST_HOST, port=QUEST_PORT)}))
   f.add_feed(Gemini(channels=[TRADES], symbols=['BTC-USD'], callbacks={TRADES: TradeQuest(host=QUEST_HOST, port=QUEST_PORT)}))
   f.run()




if __name__ == '__main__':
   main()

Enter fullscreen mode Exit fullscreen mode

When you run this code, it will automatically create a socket connection with Coinbase and Gemini API and push data to QuestDB. Note that it may take a while to see data populated (especially from Gemini).

Navigate to localhost:9000 to access the web console. We can query data from Coinbase via SELECT * FROM trades-COINBASE:

Image description

You can see all the supported exchanges and supported channels (e.g., L1/L2/L3 books, trades, ticket, candles, open interest, etc) on the Cryptofeed GitHub page.

If you want to modify the structure of the data ingested into QuestDB, you can override the callback handler. For example, if you want to change the name of the table it writes to or the columns, you can specify the write function. In
fact, the QuestDB demo site implements cryptofeed to ingest data to the trades table with the following custom callback function:

from cryptofeed import FeedHandler
from cryptofeed.backends.backend import BackendCallback
from cryptofeed.backends.socket import SocketCallback
from cryptofeed.defines import TRADES
from cryptofeed.exchanges import Coinbase


QUEST_HOST = '127.0.0.1'
QUEST_PORT = 9009




class QuestCallback(SocketCallback):
   def __init__(self, host='127.0.0.1', port=9009, **kwargs):
       super().__init__(f"tcp://{host}", port=port, **kwargs)
       self.numeric_type = float
       self.none_to = None


   async def writer(self):
       while True:
           try:
               await self.connect()
           except:
               exit(-1)
           async with self.read_queue() as update:
               update = "\n".join(update) + "\n"
               try:
                   self.conn.write(update.encode())
               except:
                   exit(-2)


class TradeQuest(QuestCallback, BackendCallback):
   default_key = 'trades'


   async def write(self, data):
       update = f'{self.key},symbol={data["symbol"]},side={data["side"]} price={data["price"]},amount={data["amount"]} {int(data["timestamp"] * 1_000_000_000)}'
       await self.queue.put(update)




def main():
   handler = FeedHandler()
   handler.add_feed(Coinbase(channels=[TRADES], symbols=['BTC-USD', 'ETH-USD'],
                             callbacks={TRADES: TradeQuest(host=QUEST_HOST, port=QUEST_PORT)}))
   hanlder.run()




if __name__ == '__main__':
   main()
Enter fullscreen mode Exit fullscreen mode

Underneath the hood, cryptofeed library utilizes plain socket connections via Influx Line Protocol (ILP) to push data to
QuestDB. As such, it is important to provide the raw ILP string in the write callback function.

Image description

The biggest advantage of using Cryptofeed is the large number of preconfigured integrations with various exchanges. The library does the heavy lifting of normalizing the data so ingesting it into QuestDB is very simple. However, if you need more control over the type or format of the data, you may need to call the exchange API directly.

Method 2: Build a custom market data pipeline with Cryptofeed data fetcher

If Cryptofeed does not support the exchange you are interested in or if you need more control over the type or format of the data, you can opt to write your own data ingestion function. With QuestDB, you have the option to use PostgreSQL wire or ILP. Since the ILP is faster and supports schemaless ingestion, we will show an example of using the InfluxDB Line Protocol via QuestDB Node.js SDK to ingest price data from Binance and Gemini:

const axios = require("axios")
const { Sender } = require("@questdb/nodejs-client");


async function main() {
 // create a sender with a 4k buffer
 const sender = new Sender({ bufferSize: 4096 });


 // connect to QuestDB
 // host and port are required in connect options
 await sender.connect({ port: 9009, host: "localhost" });


 async function getBinanceData() {
   const { data } = await axios.get(
     "https://api.binance.us/api/v3/avgPrice?symbol=BTCUSD",
   )


   // add rows to the buffer of the sender
   sender
     .table("prices")
     .symbol("pair", "BTCUSD")
     .stringColumn("exchange", "Binance")
     .floatColumn("bid", parseFloat(data.price))
     .atNow();


   await sender.flush();


   setTimeout(getBinanceData, 1000)
 }


 async function getGeminiData() {
   const { data } = await axios.get("https://api.gemini.com/v1/pricefeed")
   const { price } = data.find((i) => i.pair === "BTCUSD")


   // add rows to the buffer of the sender
   sender
     .table("prices")
     .symbol("pair", "BTCUSD")
     .stringColumn("exchange", "Gemini")
     .floatColumn("bid", parseFloat(price))
     .atNow();


   await sender.flush();
   setTimeout(getGeminiData, 1000)
 }

 getBinanceData()
 getGeminiData()
}


main()
Enter fullscreen mode Exit fullscreen mode

The code above polls the REST endpoints of Binance and Gemini API and writes the data to a table called prices:

Image description

While writing a custom data ingestion function is more work than simply using Cryptofeed, it can be a great option if you need to customize the fields or run some preprocessing logic prior to sending it to QuestDB.

Method 3: Ingest market data using Cange Data Capture (CDC)

Finally, you can ingest data via Change Data Capture (CDC) if you have an external data stream or database that you can listen on. For example, an external data market team might publish price data on Kafka or push updates to a relational database. Instead of polling this data directly, you could opt to leverage CDC patterns to stream changes to QuestDB instead.

An example of this architecture is detailed in
Realtime crypto tracker with QuestDB Kafka Connector.
This reference architecture has a function that polls Coinbase API for latest price data and publishes it to Kafka topics. QuestDB Kafka Connector in turn publishes that data to QuestDB.

Image description

Wrapping up

Wrapping up QuestDB offers various ways to ingest crypto market data quickly. For a starting point, utilize the Cryptofeed library to connect to various exchanges that are already supported, and optionally modify the ingestion by implementing your own callback. If you need to integrate with a data feed not supported by Cryptofeed, you can write a custom data ingestor and publish data over InfluxDB line protocol to QuestDB. Finally, if there’s an existing data
feed that Debezium supports (e.g., Kafka, PostgreSQL) then using CDC can be a great choice to minimize the infrastructure burden.

Additional resources

Top comments (0)