I'm continuing to work on Dali-RP2, which helps collect transaction data from crypto exchanges and then matches it with historical pricing in order to prepare one's taxes. Recently, I ran into some issues pulling prices for crypto assets. Typically, to find the price of a given crypto asset at a given time, you can just access the public REST API of an exchange and it will return an OHLCV bar, which contains the Opening price, High price, Low price, Closing price, and Volume.
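For example, a single OHLCV bar can be pulled from Kraken's public OHLC endpoint with nothing more than an HTTP GET. The pair, interval, and since values below are just illustrative, and the exact response layout may change, so treat this as a sketch rather than production code:

# Sketch: fetch OHLCV bars from Kraken's public REST API.
# The pair, interval (minutes), and "since" timestamp are illustrative values.
import requests

response = requests.get(
    "https://api.kraken.com/0/public/OHLC",
    params={"pair": "XBTUSD", "interval": 1, "since": 1640995200},
    timeout=10,
)
result = response.json()["result"]

# The result is keyed by Kraken's internal pair name (plus a "last" cursor);
# each bar is [time, open, high, low, close, vwap, volume, count].
pair_key = next(key for key in result if key != "last")
time_, open_, high, low, close, vwap, volume, count = result[pair_key][0]
print(time_, open_, high, low, close, volume)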
That is all well and good, but some exchanges can be quite stingy with their APIs. This is understandable, since you don't want users spamming a server for data. However, if you happen to have 1,000 or so transactions you want to price, it can take a long, long time to price everything.
For instance, Kraken requires around 5 seconds between each call to its API. For 1,000 transactions, this would take around 7 hours to pull all the pricing I need. Luckily, Kraken has downloadable CSV files that cover everything but the last quarter. So, you can just download the files and save them to the cache, right?
Unfortunately, this creates a massive cache that requires a lot of memory, and even more when it is saved to disk. If you work with enough crypto assets, the program will eventually crash because it runs out of memory.
To avoid this problem, we have to chunk the CSV files into smaller files that we then read prices from.
Chunk It, You Must Chunk It
I was able to find a good, comprehensive overview of the myriad ways one can chunk things. From there, I adapted it to my particular situation.
# Imports the excerpt relies on:
from csv import reader, writer
from gzip import open as gopen
from os import path
from typing import Generator, List, Optional, Tuple

def __split_process(self, csv_file: str, chunk_size: int = _CHUNK_SIZE) -> Generator[Tuple[str, List[List[str]]], None, None]:
    chunk: List[List[str]] = []

    # csv_file holds the raw text of the downloaded file; parse it row by row.
    lines = reader(csv_file.splitlines())
    position = _PAIR_START
    next_timestamp: Optional[int] = None

    for line in lines:
        if next_timestamp is None:
            # Round the first timestamp up to the start of the next chunk.
            next_timestamp = ((int(line[self.__TIMESTAMP_INDEX]) + chunk_size) // chunk_size) * chunk_size

        # A bar that lands exactly on a chunk boundary, or past it, starts a new chunk.
        if int(line[self.__TIMESTAMP_INDEX]) % chunk_size == 0 or int(line[self.__TIMESTAMP_INDEX]) > next_timestamp:
            yield position, chunk
            if position == _PAIR_START:
                position = _PAIR_MIDDLE
            chunk = []
            next_timestamp += chunk_size
        chunk.append(line)

    # Whatever is left over is the last chunk of the file.
    if chunk:
        position = _PAIR_END
        yield position, chunk

def _split_chunks_size_n(self, file_name: str, csv_file: str, chunk_size: int = _CHUNK_SIZE) -> None:
    # File names look like "XBTUSD_60.csv": the trading pair, then the bar duration in minutes.
    pair, duration_in_minutes = file_name.strip(".csv").split("_", 1)

    # Longer-duration files contain fewer bars, so let their chunks cover a longer timespan.
    chunk_size *= min(int(duration_in_minutes), _MAX_MULTIPLIER)

    file_timestamp: str
    pair_start: Optional[int] = None
    pair_end: int
    pair_duration: str = pair + duration_in_minutes

    for position, chunk in self.__split_process(csv_file, chunk_size):
        # Name each chunk file after the chunk boundary its first bar falls into.
        file_timestamp = str((int(chunk[0][self.__TIMESTAMP_INDEX])) // chunk_size * chunk_size)

        # Record the first and last timestamps so lookups can skip timeframes we don't have.
        if position == _PAIR_END:
            pair_end = int(chunk[-1][self.__TIMESTAMP_INDEX])
            if pair_start is None:
                pair_start = int(chunk[0][self.__TIMESTAMP_INDEX])
        elif position == _PAIR_START:
            pair_start = int(chunk[0][self.__TIMESTAMP_INDEX])

        chunk_filename: str = f'{pair}_{file_timestamp}_{duration_in_minutes}.{"csv.gz"}'
        chunk_filepath: str = path.join(self.__CACHE_DIRECTORY, chunk_filename)

        # Write the chunk to the cache as a gzipped CSV.
        with gopen(chunk_filepath, "wt", encoding="utf-8", newline="") as chunk_file:
            csv_writer = writer(chunk_file)
            for row in chunk:
                csv_writer.writerow(row)

    if pair_start:
        self.__cached_pairs[pair_duration] = PairStartEnd(start=pair_start, end=pair_end)
Let's take a look at that line by line, starting with the __split_process function. We want to be able to tell _split_chunks_size_n the start and end times of the data, to make it easier to retrieve the data later. We don't want to waste time attempting to access a timeframe that doesn't exist. So, we keep track of where we are in each CSV file with the position variable.
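The excerpt above leans on a few module-level names that aren't shown. The definitions below are hypothetical stand-ins so the snippet can be followed on its own; Dali-RP2's actual values may differ:

# Hypothetical stand-ins for names the excerpt references but doesn't define.
from typing import NamedTuple

_CHUNK_SIZE = 86400          # made-up base chunk span, in seconds
_MAX_MULTIPLIER = 100        # made-up cap on the duration multiplier

_PAIR_START = "start"        # first chunk produced from a CSV file
_PAIR_MIDDLE = "middle"      # any chunk in between
_PAIR_END = "end"            # last chunk produced from a CSV file

class PairStartEnd(NamedTuple):
    start: int               # timestamp of the first cached bar for a pair/duration
    end: int                 # timestamp of the last cached bar for a pair/duration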
We want to yield the chunk when the timestamp (stored in the __TIMESTAMP_INDEX column) reaches a multiple of the chunk_size or goes above it. To do that, we need to find the next multiple of the chunk_size.
next_timestamp = ((int(line[self.__TIMESTAMP_INDEX]) + chunk_size) // chunk_size) * chunk_size
Above, we first add the chunk_size to the current timestamp in order to get a timestamp that falls in the next chunk. We then "floor" that timestamp by dividing it by chunk_size with integer division // and multiplying the result by chunk_size, which rounds it down to the nearest multiple of chunk_size.
Here is a simple example of the flooring step if we had a chunk_size of 10 and a timestamp of 36.
36 // 10 = 3
3 * 10 = 30
Plugging 36 into the full expression gives (36 + 10) // 10 * 10 = 40, which is the start of the next chunk.
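Wrapped in a tiny helper so you can check it yourself (the function name is just for illustration and is not part of Dali-RP2):

# Illustrative helper, not Dali-RP2 code: round a timestamp up to the
# start of the next chunk.
def next_chunk_boundary(timestamp: int, chunk_size: int) -> int:
    return ((timestamp + chunk_size) // chunk_size) * chunk_size

assert next_chunk_boundary(36, 10) == 40  # (36 + 10) // 10 = 4, then 4 * 10 = 40
assert next_chunk_boundary(40, 10) == 50  # exact multiples still move to the next boundary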
When we reach the next chunk, we yield the position and the chunk, reset the chunk, and advance next_timestamp to the next chunk boundary.
if int(line[self.__TIMESTAMP_INDEX]) % chunk_size == 0 or int(line[self.__TIMESTAMP_INDEX]) > next_timestamp:
    yield position, chunk
    if position == _PAIR_START:
        position = _PAIR_MIDDLE
    chunk = []
    next_timestamp += chunk_size
Otherwise, we keep appending lines to the chunk. Note that the bar whose timestamp is a multiple of the chunk_size, or the first one just beyond it, becomes the first bar of the next chunk.
When we reach the end of the file, we yield the last chunk and tell _split_chunks_size_n our position.
if chunk:
    position = _PAIR_END
    yield position, chunk
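To see the boundary logic in isolation, here is a simplified, standalone sketch of the same idea. It only chunks a list of timestamps rather than full OHLCV rows, drops the position tracking, and is not the actual Dali-RP2 code:

# Simplified, standalone sketch of the chunk-boundary logic (not Dali-RP2 code).
from typing import Iterator, List

def split_into_chunks(timestamps: List[int], chunk_size: int) -> Iterator[List[int]]:
    chunk: List[int] = []
    next_timestamp = None
    for timestamp in timestamps:
        if next_timestamp is None:
            next_timestamp = ((timestamp + chunk_size) // chunk_size) * chunk_size
        if timestamp % chunk_size == 0 or timestamp > next_timestamp:
            if chunk:  # small tweak: skip the empty chunk when data starts on a boundary
                yield chunk
            chunk = []
            next_timestamp += chunk_size
        chunk.append(timestamp)
    if chunk:
        yield chunk

# With a chunk_size of 10, the bar at 40 starts a new chunk, as does the bar at 52.
print(list(split_into_chunks([33, 36, 39, 40, 43, 47, 52], 10)))
# [[33, 36, 39], [40, 43, 47], [52]]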
Standardizing File Size
That worked pretty well, but I ended up with a lot of files: around 460 for each pair or asset I wanted to price. This isn't too bad, but I noticed that files with longer durations were smaller. That seemed like wasted space to me, so I wanted to make all the files roughly the same size.
The CSV files list pricing bars (OHLCV) of different durations. There are 6 files in total: 1 minute, 5 minute, 15 minute, 60 minute, 12 hour, and 24 hour. In theory, the 5 minute file contains only one fifth of the bars that the 1 minute file does, and so on. Based on this, I decided to apply a multiplier to the chunk_size, which increases the timespan covered by each chunk of the longer durations.
pair, duration_in_minutes = file_name.strip(".csv").split("_", 1)
chunk_size *= min(int(duration_in_minutes), _MAX_MULTIPLIER)
This reduced the number of files generated to about 140 per asset I want to price.
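To make the effect of the multiplier concrete, here is a rough illustration. The _CHUNK_SIZE and _MAX_MULTIPLIER values are made up for the example (they are not Dali-RP2's actual settings); the point is just that longer-duration files get chunks that span more time, up to a cap:

# Illustrative only: these values are made up, not Dali-RP2's actual settings.
_CHUNK_SIZE = 86400      # base span of one chunk, in seconds (one day)
_MAX_MULTIPLIER = 100    # cap so the longest durations don't blow up the span

for duration_in_minutes in ("1", "5", "15", "60", "720", "1440"):
    effective = _CHUNK_SIZE * min(int(duration_in_minutes), _MAX_MULTIPLIER)
    print(f"{duration_in_minutes:>4}-minute bars -> {effective // 86400} day(s) per chunk file")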
Summary
I've just started working with Python. Before that, I was working with Ruby on Rails, where everything just goes into a database. This was my first time dealing with a large amount of data and figuring out how to manage it. On the surface, it seems like a rather primitive approach, but it ended up being very fast for my needs. I'm able to price numerous assets in a relatively short period of time and, if need be, fall back on the REST API for the last quarter.
What do you think? Do you have other ways you deal with large pieces of data like this? I'd love to hear how others handle this. Let me know in the comments below.