DEV Community

Roshan Chokshi

Using OpenMPI for Parallel Processing of Text Files

What is this about?

This article gives an overview of OpenMPI and shows how it can be used to speed up distributed computing tasks. It walks through a worked code example that uses OpenMPI to calculate the frequency of each letter in a text file, and it closes with the drawbacks of that example, which are worth weighing when deciding whether OpenMPI is a good fit for a particular problem.

What is OpenMPI?

OpenMPI is an open source implementation of the Message Passing Interface (MPI) standard. It is a library that provides high-performance communication among computers in a distributed system. The OpenMPI library includes a set of routines for point-to-point communication, collective communication, and remote memory operations.

Point-to-point communication allows two processes to communicate directly with each other. This is useful for exchanging data between two processes, such as a client and a server. Collective communication allows multiple processes to communicate at once. This is useful for tasks that require multiple processes to cooperate, such as sorting an array. Remote memory operations provide a way for processes to access data stored in memory on another process. This is useful for tasks that require sharing data between processes, such as a distributed search algorithm.

The main functionality of the OpenMPI library is to provide communication between processes. This communication is done through message passing. In message passing, each process sends a message to another process, which then receives the message and processes it. Message passing is an important part of distributed computing, as it allows processes to exchange data and coordinate their activities. It also allows for synchronization between processes, which is critical for distributed systems.

The OpenMPI library also provides support for other features, such as parallel programming, process management, and resource management. Parallel programming is a way of writing code that allows multiple processes to run concurrently. This can significantly speed up the execution of tasks. Process management allows processes to be created, managed, and monitored. Resource management allows processes to acquire and release resources, such as memory, in a distributed system.

What are we doing?

This is a program that uses MPI (Message Passing Interface) to process a text file in parallel. It splits the processes into four groups identified by colors (Blue, Yellow, Green, Red), and the file flows through the groups as a pipeline, with each group performing one stage:

  1. The Blue group reads its chunk of the file and removes punctuation.
  2. The Yellow group converts the text to lowercase.
  3. The Green group counts the frequency of each letter.
  4. The Red group combines the counts and writes the result.

System Architecture

The program uses the MPI_Intercomm_create function to build intercommunicators from pairs of intracommunicators. The intercommunicators let neighbouring groups in the pipeline communicate and pass data to each other. The program then writes the results to the output file named on the command line.

Implementation

This code is a C++ implementation of a program that partitions a number of processes into four groups (Blue, Yellow, Green, Red) and calculates the frequency of each letter in a text file. The program takes two command-line arguments, the input filename and the output filename, and uses MPI file views to split the text file among the processes in each group. After the pipeline stages run, the combined results are written to the output file.

#include <mpi.h>
#include <stdio.h>

#include <iostream>
#include <string>
#include <chrono>

using namespace std;

typedef std::chrono::high_resolution_clock::time_point TimeVar;

#define duration(a) std::chrono::duration_cast<std::chrono::nanoseconds>(a).count()
#define timeNow() std::chrono::high_resolution_clock::now()

The program begins by including the mpi.h and stdio.h headers, the iostream and string libraries for string manipulation, and the chrono library for timing. Four helper functions follow: removePunctuation() strips all non-alphabetic characters from a string, convertToLower() converts uppercase letters to lowercase, calculateFrequency() counts the frequency of each letter in a string, and create_Intercommunicator() sets up intercommunicators between neighbouring groups of processes.

Methods:
removePunctuation():

/* keep only alphabetic characters (strips punctuation, digits, and spaces) */
void removePunctuation(char *chunk)
{
    int i = 0, j = 0;
    for (i = 0; chunk[i] != '\0'; i++)
    {
        if ((chunk[i] >= 'a' && chunk[i] <= 'z') || (chunk[i] >= 'A' && chunk[i] <= 'Z'))
        {
            chunk[j] = chunk[i];
            j++;
        }
    }

    // terminate the compacted string
    chunk[j] = '\0';
}

convertToLower():

/* convert the string to lower case*/
void convertToLower(char *chunk)
{
    int i = 0;
    while (chunk[i] != '\0')
    {
        if (chunk[i] >= 'A' && chunk[i] <= 'Z')
        {
            chunk[i] = chunk[i] + 32;
        }
        i++;
    }
}

calculateFrequency():

/* count how often each lowercase letter occurs; returns the number of letters counted */
int calculateFrequency(char *chunk, int *freq)
{
    int i = 0;
    int counted = 0;
    while (chunk[i] != '\0')
    {
        if (chunk[i] >= 'a' && chunk[i] <= 'z')
        {
            freq[chunk[i] - 'a']++;
            counted++;
        }
        i++;
    }

    return counted;
}

create_Intercommunicator():

void create_Intercommunicator(MPI_Comm &comm, int color, MPI_Comm &a, MPI_Comm &b, MPI_Comm &c, MPI_Comm &d)
{
    /* Each call creates an intercommunicator from two intracommunicators.
       The tag (1, 12, 123) must match on both sides of each link. */
    if (color == 0)
    { // Blue: removes punctuation; links forward to Yellow
        MPI_Intercomm_create(comm, 0, MPI_COMM_WORLD, 1, 1, &a);
    }
    else if (color == 1)
    { // Yellow: converts to lowercase; links back to Blue and forward to Green
        MPI_Intercomm_create(comm, 0, MPI_COMM_WORLD, 0, 1, &a);
        MPI_Intercomm_create(comm, 0, MPI_COMM_WORLD, 2, 12, &b);
    }
    else if (color == 2)
    { // Green: counts letter frequency; links back to Yellow and forward to Red
        MPI_Intercomm_create(comm, 0, MPI_COMM_WORLD, 1, 12, &b);
        MPI_Intercomm_create(comm, 0, MPI_COMM_WORLD, 3, 123, &c);
    }
    else if (color == 3)
    { // Red: writes the result; links back to Green
        MPI_Intercomm_create(comm, 0, MPI_COMM_WORLD, 2, 123, &c);
    }
}

The main() function begins by initializing the MPI environment and declaring the color names, communicators, and file-related variables.

int main(int argc, char const *argv[])
{
    TimeVar t1 = timeNow();
    string colors[4] = {"Blue", "Yellow", "Green", "Red"};
    MPI_Comm group_comm, BY_comm, YG_comm, GR_comm, RD_comm;
    MPI_File in, out;
    MPI_Offset filesize; /*  integer type of size sufficient to represent the size (in bytes) */
    MPI_Status status;   /* Status returned from read */
    int world_rank, world_size, group_rank, group_size;
    int initialized, finalized;
    int ierr;
    int bufsize;
    int freqsum[26] = {0};

    MPI_Initialized(&initialized);
    if (!initialized)
        MPI_Init(NULL, NULL);

    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);

It then validates the argument count and exits with a usage message if it is wrong. The code then opens the input and output files and queries the input file's size.


    /* Check the arguments */
    if (argc != 3)
    {
        if (world_rank == 0)
            fprintf(stderr, "Usage: %s infilename outfilename\n", argv[0]);
        MPI_Finalize();
        exit(1);
    }

    /* Read the input file */
    ierr = MPI_File_open(MPI_COMM_WORLD, argv[1], MPI_MODE_RDONLY, MPI_INFO_NULL, &in);
    if (ierr)
    {
        if (world_rank == 0)
            fprintf(stderr, "%s: Couldn't open file %s\n", argv[0], argv[1]);
        MPI_Finalize();
        exit(2);
    }

    /* Get the size of the file */
    ierr = MPI_File_get_size(in, &filesize);
    if (ierr)
    {
        if (world_rank == 0)
            fprintf(stderr, "%s: Couldn't read file size of %s\n", argv[0], argv[1]);
        MPI_Finalize();
        exit(3);
    }

    /* Open the output file */
    ierr = MPI_File_open(MPI_COMM_WORLD, argv[2], MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &out);
    if (ierr)
    {
        if (world_rank == 0)
            fprintf(stderr, "%s: Couldn't open output file %s\n", argv[0], argv[2]);
        MPI_Finalize();
        exit(4);
    }

The code then splits MPI_COMM_WORLD into four groups based on each process's rank (world_rank % 4), sets each process's view of the input and output files so that every group member works on its own chunk, and creates the intercommunicators between neighbouring groups.


    /* split into groups*/
    int color = world_rank % 4;
    MPI_Comm_split(MPI_COMM_WORLD, color, world_rank, &group_comm);
    MPI_Comm_rank(group_comm, &group_rank);

    MPI_Comm_size(group_comm, &group_size);

    printf("Rank %d/%d in original comm, group [%s] %d/%d in new comm\n", world_rank, world_size, colors[color].c_str(), group_rank, group_size);

    /* Calculate how many elements that is */
    filesize = filesize / sizeof(char);
    /* Calculate how many elements each processor gets */
    bufsize = filesize / group_size;

    ierr = MPI_File_set_view(in, group_rank * bufsize, MPI_CHAR, MPI_CHAR, "native", MPI_INFO_NULL); // split the file for group members
    if (ierr)
    {
        if (group_rank == 0)
            fprintf(stderr, "%s: Couldn't set file view for %s", argv[0], argv[1]);
        MPI_Finalize();
        exit(5);
    }

    ierr = MPI_File_set_view(out, group_rank * bufsize, MPI_CHAR, MPI_CHAR, "native", MPI_INFO_NULL); // split the file for group members
    if (ierr)
    {
        if (group_rank == 0)
            fprintf(stderr, "%s: Couldn't set file view for %s", argv[0], argv[1]);
        MPI_Finalize();
        exit(6);
    }

    /* create intercommunicator for groups */
    create_Intercommunicator(group_comm, color, BY_comm, YG_comm, GR_comm, RD_comm);

    MPI_Barrier(MPI_COMM_WORLD);

Each group then runs its stage of the pipeline: Blue reads its chunk and strips punctuation, Yellow converts it to lowercase, Green counts the letter frequencies, and Red reduces the counts and writes them to the output file. Finally, the program closes both files and exits after printing the total time taken.

    if (color == 0)
    { // Blue: remove the punctuation and spaces in the text file
        int nrchar = 0;
        char *buf = new char[bufsize + 1];
        /* Reads a file starting at the location specified by the individual file pointer (blocking, noncollective) */
        ierr = MPI_File_read(in, buf, bufsize, MPI_CHAR, &status);
        if (ierr)
        {
            if (group_rank == 0)
                fprintf(stderr, "%s: Couldn't read from file %s", argv[0], argv[1]);
            MPI_Finalize();
            exit(7);
        }
        /* Gets the number of top-level elements received. */
        MPI_Get_count(&status, MPI_CHAR, &nrchar);
        /* Add a null character to the end of the buffer */
        buf[nrchar] = '\0';
        removePunctuation(buf);
        MPI_Send(buf, bufsize, MPI_CHAR, group_rank, 0, BY_comm);
        delete[] buf;
    }
    else if (color == 1)
    { // Yellow: convert to lowercase
        char *buf = new char[bufsize + 1];
        /* receive the data from blue */
        MPI_Recv(buf, bufsize, MPI_CHAR, group_rank, 0, BY_comm, &status);
        buf[bufsize] = '\0'; /* guarantee termination even if the chunk fills the buffer */
        convertToLower(buf);
        MPI_Send(buf, bufsize, MPI_CHAR, group_rank, 0, YG_comm);
        delete[] buf;
    }
    else if (color == 2)
    { // Green: calculate the char frequency
        char *buf = new char[bufsize + 1];
        int freq[26] = {0};
        /* receive the data from yellow */
        MPI_Recv(buf, bufsize, MPI_CHAR, group_rank, 0, YG_comm, &status);
        buf[bufsize] = '\0'; /* guarantee termination even if the chunk fills the buffer */
        calculateFrequency(buf, freq);
        MPI_Send(freq, 26, MPI_INT, group_rank, 0, GR_comm);
        delete[] buf;
    }
    else if (color == 3)
    { // Red: write the result to file
        int freq[26] = {0};
        MPI_Recv(freq, 26, MPI_INT, group_rank, 0, GR_comm, &status);
        MPI_Reduce(freq, freqsum, 26, MPI_INT, MPI_SUM, 0, group_comm);
        if (group_rank == 0)
        {
            char outbuf[1000];
            int index = 0;
            for (int i = 0; i < 26; i++)
            {
                index += sprintf(outbuf + index, "%c: %d\n", 'a' + i, freqsum[i]);
            }

            ierr = MPI_File_write(out, outbuf, index, MPI_CHAR, &status);
            if (ierr)
            {
                if (group_rank == 0)
                    fprintf(stderr, "%s: Couldn't write to file %s", argv[0], argv[2]);
                MPI_Finalize();
                exit(8);
            }
        }
    }

    MPI_File_close(&in);
    MPI_File_close(&out);

    MPI_Finalized(&finalized);
    if (!finalized)
        MPI_Finalize();
    cout << "Total Time: " << duration(timeNow() - t1) << " ns" << endl; /* duration() is in nanoseconds */
    return 0;
}

How to run?

To run this program, you need an MPI implementation (such as OpenMPI) installed on your system. Compile the program with
mpic++ main.cpp -o main.out
and run it with
mpirun --hostfile mpi.config -np 4 ./main.out test.txt out.txt
where test.txt is the input file and out.txt is the output file. The --hostfile option points at a file listing the hosts (computers) to use for the MPI job, and -np 4 sets the number of processes to run. For this program the process count should be a multiple of four, so that every color group has the same size.

Results:
[Benchmark figure]

Drawbacks:
One drawback of this code is that the pipeline is fixed at four stages, so the number of processes must be a multiple of four: every group must have the same size, because each process sends to the process with the same rank in the next group. Another drawback is how the file is split: bufsize is computed with integer division (filesize / group_size), so when the file size is not evenly divisible by the group size, the trailing bytes are silently dropped and the end of the file is never processed.
