DEV Community

Cover image for Dealing with Large CSV files in Python by Chunking
Neal Chambers
Neal Chambers

Posted on

Dealing with Large CSV files in Python by Chunking

I'm continuing to work on Dali-RP2, which helps collect data from crypto exchanges and then matches that with historical pricing in order to prepare one's taxes. Recently, I ran into some issues pulling prices for crypto assets. Typically, to find a price for a given crypto asset for a given time, you can just access the public REST API of an exchange and it will return a OHLCV bar, which contains the Opening price, High price, Low price, Closing price and Volume.

That is all fine and good, but some exchanges can be quite stingy with their API. This is understandable, since you don't want users spamming a server for data. However, if you happen to have 1000 or so transactions you want to price it can take a long long time to price everything.

For instance, Kraken requires around 5 seconds between each call to its API. For 1000 transactions, this would take around 7 hours to pull all the pricing I need. Luckily, they have downloadable CSV files that cover everything but the last quarter. So, you can just download the files and save them to cache, right?

Unfortunately, this creates a massive cache that requires a lot of memory, and requires even more memory when it is saved to disk. Eventually, if you work with enough crypto assets this will cause the program to crash because it will run out of memory.

To avoid this problem, we have to chunk the csv files into smaller files that we then read prices from.

Chunk It, You must Chunk It

I was able to find a good comprehensive overview on the myriad of ways one can chunk things. From there, I adapted it for my particular situation.

def __split_process(self, csv_file: str, chunk_size: int = _CHUNK_SIZE) -> Generator[Tuple[str, List[List[str]]], None, None]:
    chunk: List[List[str]] = []

    lines = reader(csv_file.splitlines())
    position = _PAIR_START
    next_timestamp: Optional[int] = None

    for line in lines:
        if next_timestamp is None:
            next_timestamp = ((int(line[self.__TIMESTAMP_INDEX]) + chunk_size) // chunk_size) * chunk_size

        if int(line[self.__TIMESTAMP_INDEX]) % chunk_size == 0 or int(line[self.__TIMESTAMP_INDEX]) > next_timestamp:
            yield position, chunk
            if position == _PAIR_START:
                position = _PAIR_MIDDLE
            chunk = []
            next_timestamp += chunk_size
        chunk.append(line)
    if chunk:
        position = _PAIR_END
        yield position, chunk

def _split_chunks_size_n(self, file_name: str, csv_file: str, chunk_size: int = _CHUNK_SIZE) -> None:

    pair, duration_in_minutes = file_name.strip(".csv").split("_", 1)
    chunk_size *= min(int(duration_in_minutes), _MAX_MULTIPLIER)
    file_timestamp: str
    pair_start: Optional[int] = None
    pair_end: int
    pair_duration: str = pair + duration_in_minutes

    for position, chunk in self.__split_process(csv_file, chunk_size):
        file_timestamp = str((int(chunk[0][self.__TIMESTAMP_INDEX])) // chunk_size * chunk_size)
        if position == _PAIR_END:
            pair_end = int(chunk[-1][self.__TIMESTAMP_INDEX])
            if pair_start is None:
                pair_start = int(chunk[0][self.__TIMESTAMP_INDEX])
        elif position == _PAIR_START:
            pair_start = int(chunk[0][self.__TIMESTAMP_INDEX])

        chunk_filename: str = f'{pair}_{file_timestamp}_{duration_in_minutes}.{"csv.gz"}'
        chunk_filepath: str = path.join(self.__CACHE_DIRECTORY, chunk_filename)

        with gopen(chunk_filepath, "wt", encoding="utf-8", newline="") as chunk_file:
            csv_writer = writer(chunk_file)
            for row in chunk:
                csv_writer.writerow(row)

    if pair_start:
        self.__cached_pairs[pair_duration] = PairStartEnd(start=pair_start, end=pair_end)
Enter fullscreen mode Exit fullscreen mode

Let's take a look at that line by line. First in the __split_process function. We want to be able to tell _split_chunk_size_n the start and end times of the data to make it easier to retrieve the data later. We don't want to waste time attempting to access a time frame that doesn't exist. So, we keep track of where we are in each CSV file with the position variable.

We want to yield the chunk when the timestamp (stored in the __TIMESTAMP_INDEX column) reaches a multiplier of the chunk_size or goes above it. To do that, we need to find the next multiplier of the chunk_size.

next_timestamp = ((int(line[self.__TIMESTAMP_INDEX]) + chunk_size) // chunk_size) * chunk_size
Enter fullscreen mode Exit fullscreen mode

Above, we first add the chunk_size to the current timestamp in order to get a timestamp that is in the next chunk. We then "floor" the timestamp by using integer division // to divide it by chunk_size and then multiply it by the chunk_size. This will round the number down to the nearest chunk_size.

Here is a simple example of what we are doing if we had a chunk_size of 10.

36 // 10 = 3
3 * 10 = 30

When we reach the next chunk we yield the position and chunk, reset the chunk, and set the next_timestamp to the next chunk.

if int(line[self.__TIMESTAMP_INDEX]) % chunk_size == 0 or int(line[self.__TIMESTAMP_INDEX]) > next_timestamp:
    yield position, chunk
    if position == _PAIR_START:
        position = _PAIR_MIDDLE
    chunk = []
    next_timestamp += chunk_size
Enter fullscreen mode Exit fullscreen mode

Otherwise, we keep appending lines to the chunk. Note that the timestamp that is a multiple of the chunk_size or just beyond it is the first bar in the next chunk.

When we reach the end of the file we yield the last chunk and tell _split_chunk_size_n our position.

if chunk:
    position = _PAIR_END
    yield position, chunk
Enter fullscreen mode Exit fullscreen mode

Standardizing File Size

That worked pretty well, but I ended up with a lot of files. I had around 460 files for each pair or asset I wanted to price. This isn't too bad, but I noticed files with longer durations were smaller. That seemed like wasted space to me, so I wanted to make the files all around the same size.

The CSV files list pricing bars (OHLCV) of different durations. There are 6 files in total - 1 minute, 5 minute, 15 minute, 60 minute, 12 hour, and 24 hour. In theory, the 5 minute files would only contain one fifth of the bars that the 1 minute files did, and so forth and so on. Based on this, I decided to add a multiplier to the chunk_size, which would increase the timespan covered by the larger durations.

pair, duration_in_minutes = file_name.strip(".csv").split("_", 1)
chunk_size *= min(int(duration_in_minutes), _MAX_MULTIPLIER)
Enter fullscreen mode Exit fullscreen mode

This reduced the number of files generated to only about 140 or so per asset I want to price.

Summary

I've just started working with Python. Before that, I was working with Ruby on Rails where everything just goes into a database. This was my first time to deal with a large chunk of data and how to manage it. On the surface of it, it seems like a rather primitive way to handle it, but it ended up being very fast for my needs. I'm able to price numerous assets in a relatively short period of time, and if need be fall back on the REST API for the last quarter.

What do you think? Do you have other ways you deal with large pieces of data like this? I'd love to hear how others handle this. Let me know in the comments below.

Top comments (0)