Ruma Sinha

Working with the map() function in Python, PySpark and Apache Beam

Map() function

map() is a built-in function in Python. It executes a specified function for each item in an iterable, where an iterable can be a list, a set, a dictionary, or a tuple.
The syntax of map() with a list iterable is:
map(funcA, [item1, item2, item3, ..., itemN])
The function funcA() is applied to each list element, and the result is a new iterator. This process is known as mapping.
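
For instance, map() accepts any iterable, not just a list; here is a minimal sketch applying a lambda to a tuple of numbers:

numbers = (1, 2, 3, 4)
list(map(lambda n: n * n, numbers))
[1, 4, 9, 16]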

Examples with Python code

For loop
list_of_states = ['new jersey','new york','texas','california','florida']

# for loop to convert each element to upper case
modified_list_of_states = []
for state in list_of_states:
  modified_list_of_states.append(str.upper(state))

modified_list_of_states
['NEW JERSEY', 'NEW YORK', 'TEXAS', 'CALIFORNIA', 'FLORIDA']

Lambda and Map function
modified_list_of_states = map(lambda state: str.upper(state), list_of_states)

list(modified_list_of_states)

['NEW JERSEY', 'NEW YORK', 'TEXAS', 'CALIFORNIA', 'FLORIDA']
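
Since str.upper is itself a function of one argument, the lambda is optional here; passing the function directly is equivalent:

modified_list_of_states = map(str.upper, list_of_states)
list(modified_list_of_states)
['NEW JERSEY', 'NEW YORK', 'TEXAS', 'CALIFORNIA', 'FLORIDA']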

Map function with a defined function that takes each list item as input and returns the modified value:
def convert_case_f(val):
    return str.upper(val)

modified_list_of_states = map(convert_case_f, list_of_states)
modified_list_of_states # map() returns an iterator; to display the items we use list()
<map at 0x7f08a15d6ee0>

list(modified_list_of_states)
['NEW JERSEY', 'NEW YORK', 'TEXAS', 'CALIFORNIA', 'FLORIDA']
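
Note that the map object is a one-shot iterator: once list() has consumed it, iterating over it again yields nothing.

list(modified_list_of_states) # the iterator is already exhausted
[]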

Map function in PySpark

We have the movie ratings dataset from Kaggle. Each row of the data file holds four whitespace-separated columns: user id, movie id, rating, and timestamp.

The third column holds the movie rating, and our use case is to find the total count per rating.
Using the map() function, we can extract the ratings column.

For every record, the lambda function splits the line on whitespace, and the third column, the rating, is extracted. The map() function applies this transformation to every row in the dataset.
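
For instance, split() on a hypothetical whitespace-separated record (the values are purely illustrative) picks out the rating at index 2:

"196 242 3 881250949".split()[2]
'3'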

from pyspark import SparkConf, SparkContext
import collections

# Setting up the SparkContext object
conf = SparkConf().setMaster("local").setAppName("MovieRatingsData")
sc = SparkContext(conf = conf)

# Loading data
movie_ratings_data = sc.textFile("/content/u.data")

# Extracting the ratings data with the map() function
ratings = movie_ratings_data.map(lambda x: x.split()[2])

# Count the total per rating
ratings_count = ratings.countByValue()

# Display the result
collections.OrderedDict(sorted(ratings_count.items()))
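
As a minimal sketch of what countByValue() does, here is the same idea on a tiny in-memory RDD (assuming the same SparkContext sc):

sc.parallelize(['5', '3', '5', '4']).countByValue()
defaultdict(<class 'int'>, {'5': 2, '3': 1, '4': 1})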

Map() function with Apache Beam

With the same use case, let's see a working example with Apache Beam. First, install the apache-beam library. Then create the pipeline p and a PCollection movie_data that stores the result of all the transformations.
Pipeline p applies the transformations in sequence. It first reads the data file into a collection with a read transform and then splits each row into its columns. Next, we build a key-value pair for each rating, where the key is the rating and the value is 1. The pairs are then combined per key and summed, and the final result is written out.

import apache_beam as beam

p = beam.Pipeline()

movie_data = (
    p
    | 'Read from data file' >> beam.io.ReadFromText('/content/u.data')
    | 'Split rows' >> beam.Map(lambda record: record.split('\t'))
    | 'Fetch ratings data' >> beam.Map(lambda record: (record[2], 1))
    | 'Total count per rating' >> beam.CombinePerKey(sum)
    | 'Write results for rating' >> beam.io.WriteToText('results')
)

p.run() # creates the result output file
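
WriteToText shards its output, so the counts land in a file named along the lines of results-00000-of-00001 rather than results itself. A quick way to inspect it (the exact shard name is an assumption):

# read back the pipeline output; adjust the file name to the shard you get
with open('results-00000-of-00001') as f:
    print(f.read())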