Compared to traditional financial markets, crypto markets are more volatile, with prices swinging in either direction at a faster pace. The price of each cryptocurrency also tends to vary across exchanges. Given this dynamic nature, investors and traders looking to navigate the market need fast and reliable data from various crypto exchanges. In this tutorial, we’ll look at three different ways to ingest crypto market data into QuestDB for further analysis:
- Using the Cryptofeed library
- Writing a custom data pipeline
- Via Change Data Capture (CDC)
Prerequisites
We will be using QuestDB to ingest and store crypto market data. Create a new directory and from the directory, run the following to start a local instance of QuestDB:
mkdir cryptofeed-questdb
cd cryptofeed-questdb
docker run \
-p 9000:9000 -p 9009:9009 -p 8812:8812 -p 9003:9003 \
-v "$(pwd):/var/lib/questdb" \
questdb/questdb:7.0.1
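Before moving on, it can help to confirm that the container is actually listening. The sketch below is not part of the original setup; it assumes the default QuestDB port roles (9000 for the REST API and web console, 9009 for ILP, 8812 for the PostgreSQL wire protocol, 9003 for health checks and metrics) and simply tries to open a TCP connection to each one. The helper names `port_open` and `check_questdb` are made up for illustration.

```python
import socket

# Ports published by the docker command above and what QuestDB serves on each
# (default QuestDB port assignments, assumed unchanged in your setup).
QUESTDB_PORTS = {
    9000: "REST API and web console",
    9009: "InfluxDB Line Protocol (ILP) over TCP",
    8812: "PostgreSQL wire protocol",
    9003: "health check and metrics",
}


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


def check_questdb(host: str = "127.0.0.1") -> dict:
    """Map each expected QuestDB port to whether it accepts connections."""
    return {port: port_open(host, port) for port in QUESTDB_PORTS}
```

Calling `check_questdb()` once the container has started should return `True` for every port; a `False` entry usually points to a missing `-p` mapping or a container that is still booting.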
Method 1: Ingest data using the Cryptofeed library
One of the easiest ways to ingest market data is to use an open-source tool called Cryptofeed. The Python library establishes websocket connections to various exchanges including Binance, Coinbase, Gemini, and Kraken and returns trade, market, and book update data in a standardized format. Cryptofeed also has native integration with QuestDB, making it a great choice to ingest data rapidly.
To get started, create a virtual environment with Python 3.8+. We will use venv here, but conda, poetry, or virtualenv work as well:
$ python3 -m venv cryptofeed
$ source cryptofeed/bin/activate
Then install cryptofeed: pip install cryptofeed
Navigate into the cryptofeed directory and create a new file questdb.py. We will then paste the following to ingest trade data for the BTC-USD pair from Coinbase and Gemini:
from cryptofeed import FeedHandler
from cryptofeed.backends.quest import TradeQuest
from cryptofeed.defines import TRADES
from cryptofeed.exchanges import Coinbase, Gemini
QUEST_HOST = '127.0.0.1'
QUEST_PORT = 9009

def main():
    f = FeedHandler()
    f.add_feed(Coinbase(channels=[TRADES], symbols=['BTC-USD'], callbacks={TRADES: TradeQuest(host=QUEST_HOST, port=QUEST_PORT)}))
    f.add_feed(Gemini(channels=[TRADES], symbols=['BTC-USD'], callbacks={TRADES: TradeQuest(host=QUEST_HOST, port=QUEST_PORT)}))
    f.run()

if __name__ == '__main__':
    main()
When you run this code, it will automatically open socket connections to the Coinbase and Gemini APIs and push data to QuestDB. Note that it may take a while to see data populated (especially from Gemini).
Navigate to localhost:9000 to access the web console. We can query data from Coinbase via SELECT * FROM 'trades-COINBASE' (the table name is quoted because it contains a hyphen).
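The same query can also be run outside the web console. The following is a minimal sketch that assumes QuestDB's REST endpoint `/exec` on port 9000, which returns JSON with `columns` and `dataset` keys; the helper names `build_exec_url`, `rows_from_payload`, and `run_query` are made up for illustration. The table name is quoted because it contains a hyphen.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen


def build_exec_url(query: str, host: str = "localhost", port: int = 9000) -> str:
    """Build the URL for QuestDB's REST /exec endpoint with an encoded query."""
    return f"http://{host}:{port}/exec?{urlencode({'query': query})}"


def rows_from_payload(payload: dict) -> list:
    """Turn the /exec JSON payload into a list of {column name: value} dicts."""
    names = [col["name"] for col in payload["columns"]]
    return [dict(zip(names, row)) for row in payload["dataset"]]


def run_query(query: str, host: str = "localhost", port: int = 9000) -> list:
    """Run a SQL query against a running QuestDB instance and return its rows."""
    with urlopen(build_exec_url(query, host, port), timeout=10) as resp:
        return rows_from_payload(json.load(resp))


# Example (requires the QuestDB container and the Cryptofeed script to be running):
# for row in run_query("SELECT * FROM 'trades-COINBASE' LIMIT 5"):
#     print(row)
```

Keeping the URL building and payload parsing in separate functions makes it easy to check them without a live database.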
You can see all the supported exchanges and supported channels (e.g., L1/L2/L3 books, trades, ticker, candles, open interest, etc.) on the Cryptofeed GitHub page.
If you want to modify the structure of the data ingested into QuestDB, you can override the callback handler. For example, if you want to change the name of the table it writes to or the columns, you can define your own write function. In fact, the QuestDB demo site uses Cryptofeed to ingest data into the trades table with the following custom callback function:
from cryptofeed import FeedHandler
from cryptofeed.backends.backend import BackendCallback
from cryptofeed.backends.socket import SocketCallback
from cryptofeed.defines import TRADES
from cryptofeed.exchanges import Coinbase

QUEST_HOST = '127.0.0.1'
QUEST_PORT = 9009

class QuestCallback(SocketCallback):
    def __init__(self, host='127.0.0.1', port=9009, **kwargs):
        super().__init__(f"tcp://{host}", port=port, **kwargs)
        self.numeric_type = float
        self.none_to = None

    async def writer(self):
        while True:
            # Make sure the TCP connection to QuestDB is open before writing
            try:
                await self.connect()
            except:
                exit(-1)
            # Drain the queued ILP lines and send them as one newline-terminated batch
            async with self.read_queue() as update:
                update = "\n".join(update) + "\n"
                try:
                    self.conn.write(update.encode())
                except:
                    exit(-2)

class TradeQuest(QuestCallback, BackendCallback):
    # Table name used as the ILP measurement
    default_key = 'trades'

    async def write(self, data):
        # ILP line: table,tags fields timestamp (in nanoseconds)
        update = f'{self.key},symbol={data["symbol"]},side={data["side"]} price={data["price"]},amount={data["amount"]} {int(data["timestamp"] * 1_000_000_000)}'
        await self.queue.put(update)

def main():
    handler = FeedHandler()
    handler.add_feed(Coinbase(channels=[TRADES], symbols=['BTC-USD', 'ETH-USD'],
                              callbacks={TRADES: TradeQuest(host=QUEST_HOST, port=QUEST_PORT)}))
    handler.run()

if __name__ == '__main__':
    main()
Under the hood, the Cryptofeed library uses plain socket connections and the InfluxDB Line Protocol (ILP) to push data to QuestDB. As such, the write callback function must produce a raw ILP string.
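To make the shape of such a string concrete, here is a small standalone sketch that builds one ILP line from a trade dictionary. It is not Cryptofeed's own code: the function names `escape_tag` and `trade_to_ilp` are hypothetical, the input keys mirror those used in the callback (`symbol`, `side`, `price`, `amount`, `timestamp`), and the timestamp is assumed to be in seconds as a float. An ILP line has the form `table,tag=value,... field=value,... timestamp_ns`, and commas, equals signs, and spaces inside tag values need a backslash escape.

```python
def escape_tag(value: str) -> str:
    """Escape the characters ILP treats as delimiters inside tag values."""
    for ch in (",", "=", " "):
        value = value.replace(ch, "\\" + ch)
    return value


def trade_to_ilp(table: str, trade: dict) -> str:
    """Format one trade as an ILP line: table,tags fields timestamp_ns."""
    tags = f'symbol={escape_tag(trade["symbol"])},side={escape_tag(trade["side"])}'
    fields = f'price={float(trade["price"])},amount={float(trade["amount"])}'
    # ILP timestamps are nanoseconds since the Unix epoch.
    ts_ns = int(trade["timestamp"] * 1_000_000_000)
    return f"{table},{tags} {fields} {ts_ns}"


# Illustrative values only, not real market data.
sample_trade = {
    "symbol": "BTC-USD",
    "side": "buy",
    "price": 27000.5,
    "amount": 0.25,
    "timestamp": 1_700_000_000.0,
}
line = trade_to_ilp("trades", sample_trade)
```

For the sample trade, `line` is `trades,symbol=BTC-USD,side=buy price=27000.5,amount=0.25 1700000000000000000`. Tags (before the first space) become SYMBOL columns in QuestDB, while fields (after it) become regular typed columns.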
The biggest advantage of using Cryptofeed is the large number of preconfigured integrations with various exchanges. The library does the heavy lifting of normalizing the data so ingesting it into QuestDB is very simple. However, if you need more control over the type or format of the data, you may need to call the exchange API directly.
Method 2: Build a custom market data pipeline
If Cryptofeed does not support the exchange you are interested in or if you need more control over the type or format of the data, you can opt to write your own data ingestion function. With QuestDB, you have the option to use the PostgreSQL wire protocol or ILP. Since ILP is faster and supports schemaless ingestion, we will show an example that uses the QuestDB Node.js client to ingest price data from Binance and Gemini over the InfluxDB Line Protocol:
const axios = require("axios")
const { Sender } = require("@questdb/nodejs-client");

async function main() {
  // create a sender with a 4k buffer
  const sender = new Sender({ bufferSize: 4096 });

  // connect to QuestDB
  // host and port are required in connect options
  await sender.connect({ port: 9009, host: "localhost" });

  async function getBinanceData() {
    const { data } = await axios.get(
      "https://api.binance.us/api/v3/avgPrice?symbol=BTCUSD",
    )

    // add rows to the buffer of the sender
    sender
      .table("prices")
      .symbol("pair", "BTCUSD")
      .stringColumn("exchange", "Binance")
      .floatColumn("bid", parseFloat(data.price))
      .atNow();

    await sender.flush();
    setTimeout(getBinanceData, 1000)
  }

  async function getGeminiData() {
    const { data } = await axios.get("https://api.gemini.com/v1/pricefeed")
    const { price } = data.find((i) => i.pair === "BTCUSD")

    // add rows to the buffer of the sender
    sender
      .table("prices")
      .symbol("pair", "BTCUSD")
      .stringColumn("exchange", "Gemini")
      .floatColumn("bid", parseFloat(price))
      .atNow();

    await sender.flush();
    setTimeout(getGeminiData, 1000)
  }

  getBinanceData()
  getGeminiData()
}

main()
The code above polls the REST endpoints of the Binance and Gemini APIs roughly once per second and writes the data to a table called prices.
While writing a custom data ingestion function is more work than simply using Cryptofeed, it can be a great option if you need to customize the fields or run some preprocessing logic prior to sending it to QuestDB.
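As one example of such preprocessing, a poller that runs every second will often see the same price several times in a row. The sketch below, written in Python rather than Node.js and not taken from the original pipeline, shows a small filter that only lets a row through when the price for an (exchange, pair) combination has changed since the last write. The class name `ChangeFilter` and its method are hypothetical.

```python
class ChangeFilter:
    """Remember the last written price per (exchange, pair) and skip repeats."""

    def __init__(self):
        self._last = {}

    def should_write(self, exchange: str, pair: str, price: float) -> bool:
        """Return True only if the price differs from the last one written."""
        key = (exchange, pair)
        if self._last.get(key) == price:
            return False
        self._last[key] = price
        return True


# Illustrative values only: the second identical Gemini tick is dropped.
price_filter = ChangeFilter()
ticks = [
    ("Gemini", "BTCUSD", 27000.0),
    ("Gemini", "BTCUSD", 27000.0),
    ("Binance", "BTCUSD", 27000.0),
    ("Gemini", "BTCUSD", 27001.5),
]
to_write = [t for t in ticks if price_filter.should_write(*t)]
```

In a real poller, the check would sit just before the row is added to the sender buffer. Whether dropping unchanged prices is desirable depends on the analysis: it saves storage, but it also removes the evidence that a poll happened at that time.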
Method 3: Ingest market data using Change Data Capture (CDC)
Finally, you can ingest data via Change Data Capture (CDC) if you have an external data stream or database that you can listen to. For example, an external market data team might publish price data on Kafka or push updates to a relational database. Instead of polling this data directly, you could use CDC patterns to stream changes to QuestDB.
An example of this architecture is detailed in Realtime crypto tracker with QuestDB Kafka Connector. This reference architecture has a function that polls the Coinbase API for the latest price data and publishes it to Kafka topics. The QuestDB Kafka Connector in turn writes that data to QuestDB.
Wrapping up
QuestDB offers various ways to ingest crypto market data quickly. As a starting point, use the Cryptofeed library to connect to the exchanges it already supports, and optionally modify the ingestion by implementing your own callback. If you need to integrate with a data feed not supported by Cryptofeed, you can write a custom data ingestor and publish data to QuestDB over the InfluxDB Line Protocol. Finally, if there’s an existing data feed that Debezium supports (e.g., Kafka, PostgreSQL), then using CDC can be a great choice to minimize the infrastructure burden.