Introduction
If you are a machine learning engineer who has started to productionize your model prediction pipeline, you may have come across questions like these:
- How do I write memory-efficient prediction data pipelines in Python?
- How do I work with a dataset that is too large to fit into memory?
Then this post is for you. I will go over how to write memory-efficient data pipelines using generators and batching.
Let's set up the dataset for our example. We will use NYC Parking Violations Issued - Fiscal Year 2021, which contains violations issued during the 2021 fiscal year. This dataset contains several data quality problems and is a great example for demonstrating data profiling and data cleaning steps.
The dataset consists of over 15 million rows and the compressed data file is about 500 MB in size (~2.7 GB uncompressed). The dataset is available for download via the Socrata Open Data API (SODA). To download the dataset, openclean includes a SODA wrapper that provides access to all datasets available via the API using their unique identifier.
# List all datasets from NYC Open Data that contain 'parking'
# and 'violation' in their name.
from openclean.data.source.socrata import Socrata

for dataset in Socrata().catalog(domain='data.cityofnewyork.us'):
    if 'parking' in dataset.name.lower() and 'violation' in dataset.name.lower():
        print(f'{dataset.identifier}\t{dataset.domain}\t{dataset.name}')
# Output
nc67-uf89 data.cityofnewyork.us Open Parking and Camera Violations
pvqr-7yc4 data.cityofnewyork.us Parking Violations Issued - Fiscal Year 2022
ncbg-6agr data.cityofnewyork.us DOF Parking Violation Codes
jt7v-77mi data.cityofnewyork.us Parking Violations Issued - Fiscal Year 2014
kiv2-tbus data.cityofnewyork.us Parking Violations Issued - Fiscal Year 2016
2bnn-yakx data.cityofnewyork.us Parking Violations Issued - Fiscal Year 2017
c284-tqph data.cityofnewyork.us Parking Violations Issued - Fiscal Year 2015
a5td-mswe data.cityofnewyork.us Parking Violations Issued - Fiscal Year 2018
faiq-9dfq data.cityofnewyork.us Parking Violations Issued - Fiscal Year 2019
p7t3-5i9s data.cityofnewyork.us Parking Violations Issued - Fiscal Year 2020
kvfd-bves data.cityofnewyork.us Parking Violations Issued - Fiscal Year 2021
Dataset
The parking violations dataset has the unique identifier kvfd-bves. The identifier is part of the dataset URL https://data.cityofnewyork.us/City-Government/Parking-Violations-Issued-Fiscal-Year-2021/kvfd-bves. The following code downloads the dataset in tab-delimited CSV format and stores it in a local file called kvfd-bves.tsv.gz.
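Here is a minimal sketch of that download step using openclean's SODA wrapper; the dataset() and write() helpers are based on the openclean tutorials, so double-check them against the version of openclean you have installed.
import gzip
import os

from openclean.data.source.socrata import Socrata

datafile = './kvfd-bves.tsv.gz'

# Download the dataset once and cache it locally as a gzipped, tab-delimited file.
if not os.path.isfile(datafile):
    with gzip.open(datafile, 'wb') as f:
        Socrata().dataset('kvfd-bves').write(f)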
What Are Generators and How Can You Use Them?
In Python, a generator is a function that returns an iterator that produces a sequence of values when iterated over.
Generators are useful when we want to produce a large sequence of values, but we don't want to store all of them in memory at once.
Creating a generator is very similar to creating a function. You can use the generator expression () or define a function that uses yield instead of return.
Generators are usually used in a loop. For each iteration of the calling loop:
- Control is passed from the calling loop to the generator function.
- The generator yields the next value, and control is passed back to the loop.
- Steps 1 and 2 are repeated until the generator is exhausted.
You can also get data directly from a generator using next(generator_obj).
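Here is a quick, self-contained illustration of both forms using a trivial sequence of squares (nothing to do with the parking dataset yet):
# Generator expression: values are computed lazily, one at a time.
squares = (n * n for n in range(5))
print(next(squares))  # 0
print(next(squares))  # 1

# Equivalent generator function using yield.
def gen_squares(limit):
    for n in range(limit):
        yield n * n

for value in gen_squares(5):
    print(value)  # 0, 1, 4, 9, 16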
Let’s say our goal is to process the 2021 parking violations dataset with the following steps:
- Keep only the violations issued by the police (denoted by P in the data) to vehicles with the make TOYOTA registered in NJ.
- Replace P with Police.
- Concatenate the house number, street name, and registration state fields into a single address field.
- Write the results to a CSV file with the header vehicle_make,issuing_agency,address.
Using Generator Expression ()
We will create generators with the () generator-expression syntax and connect multiple generators to form a data pipeline; this is called chaining.
import csv
#define your input and output datasets
input_file_name = "./parking-violations-year-2021.csv"
output_file_name = "./nj_toyota_police_2021.csv"
# 1. stream data from input file
read_file_csv = open(input_file_name, "r")
extractor = csv.reader(read_file_csv)  # the csv reader yields rows lazily, like a generator
# 2. keep only required fields
# field index => field;
# 2 => registration state, 7 => vehicle make, 8 => issuing agency, 23 => house number, 24 => street name
col_filtered_df = ([row[2], row[7], row[8], row[23], row[24]] for row in extractor)
# 3. keep only violations issued by police, to vehicles with the make TOYOTA in NJ
value_filtered_df = filter(
    lambda x: all([x[0] == "NJ", x[1] == "TOYOTA", x[2] == "P"]), col_filtered_df
)
# 4. replace P with police
transformed_df = (
    [op[0], op[1], "Police", op[3], op[4]]
    for op in value_filtered_df
)
# 5. concat house number, street name, registration state into a single address field
final_df = (
    [op[1], op[2], ", ".join([op[3], op[4], op[0]])]
    for op in transformed_df
)
final_df # this is a generator object and has not yet started generating data
# 6. write a header row for output data
write_file_object = open(output_file_name, "w")
loader = csv.writer(
write_file_object, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL
)
header = ["vehicle_make", "issuing_agency", "address"]
loader.writerow(header)
# 7. stream data into an output file
loader.writerows(final_df)  # the loader pulls rows from final_df, which pulls from the upstream generators
# 8. close the input and output file handles
write_file_object.close()
read_file_csv.close()
Using a Generator Function (yield)
You can use the yield statement to create your own generator inside a function, as shown below:
file_path = "./parking-violations-year-2021.csv"

def gen_file_reader(file_path):
    for row in open(file_path, "r"):
        yield row  # yield one row at a time instead of reading the whole file

def gen_print_row_count():
    count = 0
    for row in gen_file_reader(file_path):
        count += 1
    print(f"Total count is {count}")

gen_print_row_count()
This is a simple example, but because the generator yields one row at a time, the file is processed lazily instead of being loaded into memory all at once.
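For contrast, here is a sketch of the eager version we want to avoid (eager_print_row_count is just an illustrative counterpart to gen_print_row_count): readlines() materializes every row of the multi-GB file in memory before we can count anything.
def eager_print_row_count():
    # readlines() loads the entire file into a list in memory
    rows = open(file_path, "r").readlines()
    print(f"Total count is {len(rows)}")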
Batching
Consider an example where your data pipeline has to call an external paid API service. Let’s assume that this service charges per call: whether you call it with 1 row or 10,000 rows, the price is the same.
In this scenario, you can accumulate rows in memory and, once they reach a specified threshold, make the API call.
import time
import csv
input_file_name = "./parking-violations-year-2021.csv"
output_file_name = "./api_call_enriched.csv"
def api_call(rows):
    time.sleep(10)  # simulating an api call that takes 10 seconds
    return rows
def batched_api_transforms(rows, batch_size=10000):
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) >= batch_size:
            yield from api_call(batch)
            batch = []
    if batch:  # flush the final, possibly smaller, batch
        yield from api_call(batch)
# 1. stream data from input file
read_file_object = open(input_file_name, "r")
extractor = csv.reader(read_file_object)
# 2. make batched calls to the external service
final_df = batched_api_transforms(extractor)
next(final_df)  # you will notice a 10s wait, simulating the external service call
[
    next(final_df) for i in range(9999)
]  # you will notice almost no wait, since this data is held in process memory
# we have iterated through the first batch of 10,000
# the next call will invoke the api_call function again, sleeping for another 10s
next(final_df)  # you will notice another 10s wait
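As a side note, the accumulation loop above can also be written with the standard library's itertools.islice. This is just an alternative sketch of the same batching helper, reusing the api_call function from above:
from itertools import islice

def batched_api_transforms_islice(rows, batch_size=10000):
    iterator = iter(rows)
    while True:
        # islice lazily pulls at most batch_size rows from the iterator
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield from api_call(batch)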
Conclusion
In this post, we saw how to use generators and batching to process large datasets. The next time you have to build a data pipeline for a larger-than-memory dataset, try using generators and batching.
Some Final Words
If this blog was helpful and you wish to show a little support, you could:
- 👍 300 times for this story
- Follow me on LinkedIn: https://www.linkedin.com/in/raju-n-203b2115/
These actions really really really help me out, and are much appreciated!