DEV Community 👩‍💻👨‍💻

Cover image for Multiprocessing communication with FIFO queues
Marcell Cruz
Marcell Cruz

Posted on

Multiprocessing communication with FIFO queues

This is part of a series of posts talking about FIFO and techniques for IPC(inter process comunication) today I'm gonna explore IPC between parent and child processes for asynchronous programming, one problem that arises when you fork a process is communication between the parent process and the child process or between child processes, usually this is solved in the standard library of the language or a third party library, node's exec and spawn for example handles communication between the process for you using something very similar with what we're gonna do, But how does that work and what can we do if we want to have more control over it, Let's explore some options.

Simple Queue Spawning Multiple Processes Without Error handling

The code

I always like to start with the code, because reading the code will give you a general idea of what's going on, the following is very simple ruby code, it's written in Ruby but the idea can be used in any language, and the point is the idea not the implementation.

read_from_fifo reads from the fifo file line by line, push_to_fifo writes a line to the fifo file and process_queue reads from the input file forks the process passes the url that was read from the fifo file to the child process, the child process makes a http request and writes the result to an output fifo file, since process_queue is a recursive function that runs forever, any new line added to the fifo file will be processed.

require 'uri'
require 'net/http'
require 'json'

OUTPUT_FIFO = './output.fifo'
INPUT_FIFO = './input.fifo' 
unless File.exist?(OUTPUT_FIFO)
  File.mkfifo(OUTPUT_FIFO)
  File.chmod(0666, OUTPUT_FIFO)
end

unless File.exist?(INPUT_FIFO)
  File.mkfifo(INPUT_FIFO)
  File.chmod(0666, INPUT_FIFO)
end

def read_from_fifo(fifo) 
  rpipe = File.open(fifo, 'r+')
  res = ""
  while (r = rpipe.read(1)) != "\n"
    res << r
  end
  res
end

def push_to_fifo(endpoint, fifo)
  wpipe = File.open(fifo, 'w+')
  wpipe.write "#{endpoint}\n"
  wpipe.flush
end

def process_queue()
  rpipe = File.open(INPUT_FIFO, 'r+')
  url = ""
  while (r = rpipe.read(1)) != "\n"
    url << r
  end
  fork do
    puts "Processing: #{url} PID: #{Process.pid}"
    uri = URI(url)
    res = Net::HTTP.get_response(uri)
    puts JSON.parse(res.body)['quote']
    push_to_fifo(JSON.parse(res.body)['quote'], OUTPUT_FIFO)
  end
  process_queue
end

push_to_fifo('https://api.kanye.rest/', INPUT_FIFO)
push_to_fifo('https://api.kanye.rest/', INPUT_FIFO)
push_to_fifo('https://api.kanye.rest/', INPUT_FIFO)

process_queue()
Enter fullscreen mode Exit fullscreen mode

Image description

Above is a terminal running the program, and below is another terminal where I pushed the url to the fifo file, the following is my attempt at drawing what is happening in more detail

Image description

Simple Queue Spawning Multiple Processes With Error Handling

One thing that was missing from our last implementation was error handling, if something goes wrong in the process that tries to make the GET request, the message is lost, the process is going to crash or hang forever we're not gonna try again or get any response.

We want to improve the resilience or our application and make it more fault tolerant.

What we can do in this situation is create another queue to process errors, also called a dead letter queue, any time we have a error processing a message we push the message to the dead letter queue to be processed later.

The Code

To do that we just need to push the error to the dead letter queue and to also process the dead letter, to do add we add another process just to process the dead letter queue, both processes, the main that processes the input queue and the process responsible for the dead letter queue run at the same time and wait for messages to get to the queue.

require 'uri'
require 'net/http'
require 'json'

OUTPUT_FIFO = './output.fifo'
INPUT_FIFO = './input.fifo' 
DEADLETTER_FIFO = './deadletter.fifo' 
unless File.exist?(OUTPUT_FIFO)
  File.mkfifo(OUTPUT_FIFO)
  File.chmod(0666, OUTPUT_FIFO)
end

unless File.exist?(INPUT_FIFO)
  File.mkfifo(INPUT_FIFO)
  File.chmod(0666, INPUT_FIFO)
end

unless File.exist?(DEADLETTER_FIFO)
  File.mkfifo(DEADLETTER_FIFO)
  File.chmod(0666, DEADLETTER_FIFO)
end

def read_from_fifo(fifo) 
  rpipe = File.open(fifo, 'r+')
  res = ""
  while (r = rpipe.read(1)) != "\n"
    res << r
  end
  res
end

def push_to_fifo(endpoint, fifo)
  wpipe = File.open(fifo, 'w+')
  wpipe.write "#{endpoint}\n"
  wpipe.flush
end

def process_deadletter() 
  puts "Waiting for messages in the dead letter queue"
  rpipe = File.open(DEADLETTER_FIFO, 'r+')
  url = ""
  while (r = rpipe.read(1)) != "\n"
    url << r
  end
  fork do
    begin
      puts "Dead Letter Processing: #{url} PID: #{Process.pid}"
      uri = URI(url)
      res = Net::HTTP.get_response(uri)
      puts JSON.parse(res.body)['quote']
      push_to_fifo(JSON.parse(res.body)['quote'], OUTPUT_FIFO)
    rescue => e
      puts "error: #{e}"
    end
  end

  process_deadletter
end

def process_queue()
  puts "Waiting for messages in the process queue"
  rpipe = File.open(INPUT_FIFO, 'r+')
  url = ""
  while (r = rpipe.read(1)) != "\n"
    url << r
  end
  fork do
    puts "Processing: #{url} PID: #{Process.pid}"
    begin
      uri = URI(url)
      res = Net::HTTP.get_response(uri)
      puts JSON.parse(res.body)['quote']
      push_to_fifo(JSON.parse(res.body)['quote'], OUTPUT_FIFO)
    rescue => e
      puts "Error trying to process #{url} sending message to deadletter queue"
      push_to_fifo(url, DEADLETTER_FIFO)
    end
  end
  process_queue
end

fork do
  process_deadletter
end

process_queue
Enter fullscreen mode Exit fullscreen mode

If we run the previous code we get the following response

Image description

Now we have two processes waiting for messages in the queue,
if we append a message to the input.fifo file with

echo 'https://api.kanye.rest/' >> input.fifo
Enter fullscreen mode Exit fullscreen mode

We get the response

Image description

Nothing new here, we pass a valid url and everything worked fine, but if we pass a invalid URL we can see the dead letter queue working

echo 'invalid url' >> input.fifo
Enter fullscreen mode Exit fullscreen mode

Image description

The first process fails and send the message to the dead letter queue, the dead letter queue process starts processing the new message in the queue, it also fails since in this case it's a invalid URL

Adding A Delay To The Second Call

In a real situation we would want to add a delay to the second call, if the service that we're hitting is not working now, it's unlikely that it's going to be working some milliseconds later, what we want to do in this case is to add a delay to the process that handles the dead letter queue, we have a lot of different approaches to do that, we can increase the time we do the retry for every retry, we can filter out some errors like the invalid URL case, since it's never going to work in this case.

Next post

In the next post I'm gonna experiment with other ways of setting up the queues and processing the messages.

Top comments (0)

🌚 Life is too short to browse without dark mode