This post is an adaptation of the one I originally published in the Orchest blog.
Preface: pandas is awesome, but it is not enough
pandas was created in 2008 by Wes McKinney as a "skunkworks project" for his data analysis tasks, and since then it has grown to be one of the key enablers of Python's growth in the Data Science industry. Its powerful CSV reading capabilities, its SQL-like aggregation and grouping, its rich time series processing methods, and its integration with Jupyter have made pandas an essential tool in any Data Scientist's toolbelt.
In [1]: import pandas as pd
In [2]: df = pd.read_csv("...")
In [3]: df.head()
Growth of pandas in Stack Overflow compared to other Python-related tags (source)
However, as described by its own creator, pandas has some design flaws that cannot be easily amended and suffers from a few shortcomings that, in practice, restrict it to small and medium-sized datasets. This blog post describes some of these limitations, which can be summarized as follows:
(1) Many pandas operations don't take advantage of multiple cores or query planning
pandas was not designed with big datasets in mind and uses an eager evaluation model; as such, complex chained operations create many intermediate objects, which in some cases can be quite big. Moreover, even though there have been some recent efforts to leverage multiple cores in pandas, the results are somewhat heterogeneous, and in many cases pandas is bound by Python's Global Interpreter Lock, which allows only one thread to execute Python bytecode at a time.
(2) Lousy memory management
There are several ways to handle missing data, and each one has its tradeoffs: pandas' choice of using sentinel values reduces the amount of memory needed, but at the same time it introduces small inconsistencies across data types (which are being addressed now) and makes it more difficult for the CPU to apply vectorized mathematical operations. In addition, the lack of support for memory-mapping (more on that below) and the way strings and categories are handled also reduce the efficiency of some operations.
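As a quick sketch of the sentinel approach in action (plain pandas, default dtypes):

import pandas as pd

# A single missing value upcasts the whole column to float64,
# because NaN is used as the sentinel for "missing"
print(pd.Series([1, 2, None]).dtype)  # float64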
Because of these limitations, several libraries have tried to extend pandas to bigger workloads or create faster, more efficient alternatives.
Demystifying Apache Arrow
Apache Arrow (Arrow for short) is an open source project that defines itself as "a language-independent columnar memory format" (more on that later). It is part of the Apache Software Foundation, and as such is governed by a community of several stakeholders. It has implementations in several languages (C++ and also Rust, Julia, Go, and even JavaScript) and bindings for Python, R and others that wrap the C++ implementation.
One of its creators is Wes McKinney himself, so it is no surprise that Python is one of the main targets of Arrow!
But... what is Arrow exactly?
There has been widespread confusion around Arrow and how it compares with things like Parquet. The topic deserves some clarification.
Arrow defines two binary representations: the Arrow IPC Streaming Format and the Arrow IPC File (or Random Access) Format. The former is optimized for dealing with batches of data of arbitrary length (hence the "Streaming"), while the latter requires a fixed number of batches and in turn supports seek operations (hence the "Random Access").
In light of these somewhat confusing names, it is important to insist on what Arrow isn't:
- Arrow is not a file format. When we talk about file formats, we usually think about something that is stored on disk, and Arrow is all about "runtime in-memory representation". Neither the Arrow IPC File Format nor the Arrow IPC Streaming Format defines encoding rules to save data on disk. Instead, think of them as data structures that you use in your code.
- Arrow is not meant for long-term storage. Instead, it is meant for ephemeral, or transient, in-memory storage.
You might be thinking: okay, if Arrow is not a file format and is designed to be an in-memory representation, then how does one serialize or store some Arrow data on disk? For that, there are two major options:
- Apache Parquet (Parquet for short), which nowadays is an industry standard for storing columnar data on disk. It compresses the data with high efficiency and provides fast read and write speeds. As written in the Arrow documentation, "Arrow is an ideal in-memory transport layer for data that is being read or written with Parquet files".
- Feather File Format (Feather for short), which encodes the Arrow IPC File Format (finite, unlike an endless stream). Like Parquet, Feather files are also compressed and optimized for columnar data.
The Feather format was created alongside Arrow, and nowadays it provides decent compression (although Parquet files are usually smaller) and excellent read and write speeds (even better than Parquet). On the other hand, the Parquet format has much wider adoption and is more interoperable. If you are not sure which one to choose and you are not concerned about squeezing out every last bit of speed, you can safely pick Parquet.
File size of Feather vs other file formats (source)
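As a minimal sketch of both options, using the PyArrow bindings introduced in the next section (the file names are just placeholders):

import pyarrow as pa
import pyarrow.feather as feather
import pyarrow.parquet as pq

# A tiny in-memory Arrow table
table = pa.table({"trip_distance": [1.59, 3.3], "total_amount": [17.05, 17.8]})

# Persist the same data as Parquet and as Feather (the Arrow IPC File Format on disk)
pq.write_table(table, "example.parquet")
feather.write_feather(table, "example.feather")

# Both round-trip back into Arrow tables
pq.read_table("example.parquet")
feather.read_table("example.feather")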
First steps with PyArrow
To install the Python bindings for Arrow, you can either use conda/mamba or pip:
$ mamba install "pyarrow=7.0"
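If you prefer pip, the equivalent would be:

$ pip install "pyarrow==7.0"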
Let's work with the classic NYC Taxi dataset. We picked the year 2015 because the files are quite big, while not being too old (there were some schema changes along the way). Download the relevant file from the terminal:
$ wget -v "https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2015-01.csv" -O "/data/yellow_tripdata_2015-01.csv"
And now you're ready to read the CSV data into Arrow:
import pyarrow as pa
from pyarrow import csv
nyc = csv.read_csv("/data/yellow_tripdata_2015-01.csv")
print(len(nyc))
A PyArrow table with its schema
Notice that the dataset contains over 12 million rows. Let's inspect the schema and compute how much RAM is needed to store this data:
In [5]: nyc.schema
Out [5]:
VendorID: int64
tpep_pickup_datetime: timestamp[s]
tpep_dropoff_datetime: timestamp[s]
passenger_count: int64
trip_distance: double
pickup_longitude: double
pickup_latitude: double
RateCodeID: int64
store_and_fwd_flag: string
dropoff_longitude: double
dropoff_latitude: double
payment_type: int64
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
In [6]: print("RSS (RAM): {}MB".format(pa.total_allocated_bytes() >> 20))
RSS (RAM): 1812MB
The whole CSV takes a bit more than 2.5 GB on disk, but only 1.8 GB in memory. Observe also that two columns were automatically detected as timestamps, which departs from the default behavior of pandas.read_csv.
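If you want explicit control over that type inference, you can pass a ConvertOptions object to read_csv. A short sketch (nyc_typed is just a throwaway name, and the column names follow the schema shown above):

# Pin the dtypes of specific columns instead of relying on automatic inference
convert_options = csv.ConvertOptions(
    column_types={
        "tpep_pickup_datetime": pa.timestamp("s"),
        "tpep_dropoff_datetime": pa.timestamp("s"),
    }
)
nyc_typed = csv.read_csv(
    "/data/yellow_tripdata_2015-01.csv", convert_options=convert_options
)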
Arrow's read_csv function returned a Table object, which contains a collection of columns. Each of those columns is a ChunkedArray, one of the many array types available in Arrow:
In [7]: nyc["trip_distance"]
Out [7]:
<pyarrow.lib.ChunkedArray object at 0x7f2cec1023b0>
[
[
1.59,
3.3,
1.8,
0.5,
3,
9,
2.2,
0.8,
18.2,
0.9,
...
As with pandas DataFrames, indexing works by column. To select more than one column, you can use the .select method:
In [8]: nyc.select(["trip_distance", "total_amount"])
Out [8]: pyarrow.Table
trip_distance: double
total_amount: double
----
trip_distance: [[1.59,3.3,1.8,0.5,3,9,2.2,0.8,18.2,0.9,...
total_amount: [[17.05,17.8,10.8,4.8,16.3,40.33,15.3,9.96,58.13,9.35,...
And to slice specific rows, you can use either the .slice or .take methods:
# Zero-copy window of 3 rows starting at offset 100
nyc.slice(100, 3).to_pandas()
# Gather specific row indices into a new table
nyc.take([100, 101, 102]).to_pandas()
Some differences with pandas
Arrow departs from pandas in some interesting ways that are immediately noticeable upon first use. Most importantly, data is immutable:
nyc["trip_distance"] = 0 # Raises TypeError!
As stated in the documentation, "Many Arrow objects are immutable: once constructed, their logical properties cannot change anymore. This makes it possible to use them in multi-threaded scenarios without requiring tedious and error-prone synchronization".
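To "modify" data you instead derive a new Table. Here is a minimal sketch, assuming the nyc table from above (the miles-to-kilometers conversion is purely illustrative):

import pyarrow.compute as pc

# set_column returns a new Table and leaves `nyc` untouched
idx = nyc.schema.get_field_index("trip_distance")
distance_km = pc.multiply(nyc["trip_distance"], 1.60934)
nyc_km = nyc.set_column(idx, "trip_distance_km", distance_km)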
However, there are ways to, say, efficiently append rows to an existing Table: pyarrow.concat_tables will perform a zero-copy concatenation if the schemas of both tables are the same:
pa.concat_tables(
    [nyc.slice(1_000, 3), nyc.slice(2_000, 3)]
).to_pandas()
Another interesting difference is how missing values are handled: pandas is now experimenting with mask-based approaches, but Arrow has used them from the start. Moreover, since Arrow arrays store the number of missing values, the underlying code can skip some checks if no values are missing:
In [12]: nyc["tip_amount"].null_count
Out [12]: 0
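For a toy illustration of how this looks on a freshly built array:

# Arrow arrays carry an explicit validity bitmap plus a null count,
# rather than hiding a sentinel value inside the data buffer
arr = pa.array([1.0, None, 3.0])
print(arr.null_count)  # 1
print(arr.is_valid())  # [true, false, true]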
Arrow awesomeness
One of the most interesting capabilities of Arrow is the ability to deal with memory-mapped files. This allows Arrow to work with datasets that are bigger than the available RAM without incurring additional memory costs.
For example, you can memory-map the same CSV file from the sections above:
In [13]: mmap = pa.memory_map("/data/yellow_tripdata_2015-01.csv")
And verify that 0 bytes of memory were allocated:
In [14]: print("RSS: {} MB".format(pa.total_allocated_bytes() >> 20))
RSS: 0 MB
This memory-mapped file can be read in batches, so that you don't need to load all the contents of the file in memory:
from pyarrow.csv import open_csv

# Create a CSVStreamingReader from the memory-mapped file
reader = open_csv(mmap)

# Read the next batch of the file into a pandas DataFrame
reader.read_next_batch().to_pandas()
A possible use case is converting a huge CSV file to Parquet by batches, as follows:
import pyarrow.parquet as pq

# "Rewind" the CSV file
mmap.seek(0)
reader = open_csv(mmap)

# Open parquet file for writing with same schema as the CSV file
with pq.ParquetWriter("/data/yellow_tripdata_2015-01.parquet", reader.schema) as writer:
    while True:
        try:
            batch = reader.read_next_batch()
            writer.write_batch(batch)
        except StopIteration:
            break

# Load data directly from Parquet
reloaded_nyc = pq.read_table("/data/yellow_tripdata_2015-01.parquet")
In fact, Arrow supports reading and writing data batches from arbitrary file-like objects, which could be files on disk, sockets, or in-memory objects:
import io

buf = io.BytesIO()

# Create a new stream wrapping the BytesIO object,
# using the NYC table schema
with pa.ipc.new_stream(buf, reloaded_nyc.schema) as writer:
    # Write the first 7 batches (indices 0 through 6)
    for index, batch in enumerate(reloaded_nyc.to_batches()):
        writer.write_batch(batch)
        if index > 5:
            break

print(writer.stats)  # WriteStats(num_messages=8, num_record_batches=7, ...

# "Rewind" the BytesIO object
buf.seek(0)

# Open the BytesIO for reading
with pa.ipc.open_stream(buf) as reader:
    schema = reader.schema
    batches = [b for b in reader]

# Create a PyArrow Table from the batches
pa.Table.from_batches(batches)
Magic!
Should you use (Py)Arrow?
First of all, chances are that you are already using PyArrow: pandas can optionally use PyArrow for reading CSV and Parquet files, and other dataframe libraries in the ecosystem leverage Arrow for performance.
Arrow shines as a building block for other high-level libraries, which leverage its capabilities to squeeze the most performance out of the system. However, as you can see, PyArrow itself is quite low-level. Yes, some operations like groupbys and aggregations are supported, but some higher-level constructs from pandas are not present.
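For instance, a simple aggregation can be expressed directly on a Table (Table.group_by is available from pyarrow 7.0 onwards):

# Mean fare grouped by passenger count, computed entirely in Arrow
nyc.group_by("passenger_count").aggregate([("total_amount", "mean")])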
Therefore, as some general recommendations:
- Use PyArrow if you are building a high-performance application where efficiency is important and you want to leverage Arrow's chunking and streaming capabilities.
- Don't use PyArrow (directly) if you are looking for a faster alternative to pandas that you can easily migrate to.
In the next articles of this series, we will describe some of those alternatives. Stay tuned!