Caleb Winston

A language for data flow and analysis

For many people, Python is the go-to language for data analysis. This is in large part because Python makes it very easy to think about and write functions that operate on units of data. For example, this function determines which numbered age segment a person of a given age belongs to.

def age_segment(age):
    if age < 12: return 0
    elif age < 18: return 1
    elif age < 28: return 2
    elif age < 48: return 3
    elif age < 68: return 4
    elif age < 88: return 5
    else: return 6

Here's another function. It takes in an age segment number and returns whether or not the person is legally a minor.

def is_under_age(age_segment):
    if age_segment <= 1:
        return True
    return False

And here's one last one. This function's job is to pretty-print an age segment. That's its only job.

def print_age(age_segment):
    if age_segment == 0: print("0-12")
    elif age_segment == 1: print("12-18")
    elif age_segment == 2: print("18-28")
    elif age_segment == 3: print("28-48")
    elif age_segment == 4: print("48-68")
    elif age_segment == 5: print("68-88")
    elif age_segment == 6: print("88+")

All of these functions operate on "single units of data". But a very common pattern in data analysis is to have a collection of such units and a sort-of "pipeline" of functions that each unit passes through, with each function transforming the data a step further.

As an example, let's say we have the list of ages below, and we want to pass each element through a pipeline that sends it first to age_segment, then to is_under_age (keeping only the segments that are under age), and finally to print_age.

ages = [9, 19, 37, 28, 48, 13]

Now, you might realize that you could just use Python's for loop to iterate over this list and call the functions in order on each age. That is what many people will do, but you will run into a few issues:

  • You will have to know Python to change the structure of the pipeline (e.g., to remove age_segment because your data already consists of age segments).
  • You will have to write what is (in my opinion) messy code to make the functions run in parallel. Python provides the multiprocessing library for running functions at the same time, instead of one after the other, and for passing data between them, but it introduces quite a bit of extra code.
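For comparison, the sequential for-loop version might look like the minimal sketch below. It is self-contained, so it restates the three functions (with print_age's labels moved into a list), and it treats is_under_age as a filter, the way the pipeline does. run_pipeline and AGE_LABELS are hypothetical names introduced for illustration.

```python
def age_segment(age):
    # Map an age to a numbered segment (0 = under 12, ..., 6 = 88 and over).
    if age < 12: return 0
    elif age < 18: return 1
    elif age < 28: return 2
    elif age < 48: return 3
    elif age < 68: return 4
    elif age < 88: return 5
    else: return 6


def is_under_age(age_segment):
    # Segments 0 and 1 cover ages below 18.
    return age_segment <= 1


# Hypothetical lookup table replacing print_age's if/elif chain.
AGE_LABELS = ["0-12", "12-18", "18-28", "28-48", "48-68", "68-88", "88+"]


def print_age(age_segment):
    print(AGE_LABELS[age_segment])


def run_pipeline(ages):
    # Hypothetical helper: run every age through the three steps, one after the other.
    for age in ages:
        segment = age_segment(age)
        if is_under_age(segment):  # is_under_age acts as a filter
            print_age(segment)


ages = [9, 19, 37, 28, 48, 13]
run_pipeline(ages)  # prints "0-12", then "12-18"
```

Every step runs one after the other in a single process, which is exactly the limitation the second bullet describes.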

In fact, here is what the parallel version of this pipeline looks like with multiprocessing:

from multiprocessing import Process, Queue
from ages_utils import age_segment as age_segment
from ages_utils import is_under_age as is_under_age
from ages_utils import ages as ages
from ages_utils import print_age as print_age
class PLPipeSentinel: pass
def run_ages(stream, out_queue):
    for data in stream:
        out_queue.put(data)
    out_queue.put(PLPipeSentinel())

def run_age_segment(in_queue, out_queue):
    while 1:
        inp = in_queue.get()
        if isinstance(inp, PLPipeSentinel):
            outp = PLPipeSentinel()
        if not isinstance(inp, PLPipeSentinel):
            outp = age_segment(inp)
        if out_queue is not None:
            out_queue.put(outp)
        if isinstance(inp, PLPipeSentinel):
            break

def run_is_under_age(in_queue, out_queue):
    while 1:
        inp = in_queue.get()
        if isinstance(inp, PLPipeSentinel):
            outp = PLPipeSentinel()
        if not isinstance(inp, PLPipeSentinel):
            result = is_under_age(inp)
            if result:
                outp = inp
            else:
                continue
        if out_queue is not None:
            out_queue.put(outp)
        if isinstance(inp, PLPipeSentinel):
            break

def run_print_age(in_queue, out_queue):
    while 1:
        inp = in_queue.get()
        if isinstance(inp, PLPipeSentinel):
            outp = PLPipeSentinel()
        if not isinstance(inp, PLPipeSentinel):
            outp = print_age(inp)
        if out_queue is not None:
            out_queue.put(outp)
        if isinstance(inp, PLPipeSentinel):
            break

if __name__ == "__main__":
    data = ages()
    in_age_segment = Queue()
    in_is_under_age = Queue()
    in_print_age = Queue()
    ages_process = Process(target=run_ages, args=(data, in_age_segment))
    age_segment_process = Process(target=run_age_segment, args=(in_age_segment,in_is_under_age,))
    is_under_age_process = Process(target=run_is_under_age, args=(in_is_under_age,in_print_age,))
    print_age_process = Process(target=run_print_age, args=(in_print_age,None,))
    ages_process.start()
    age_segment_process.start()
    is_under_age_process.start()
    print_age_process.start()
    age_segment_process.join()
    is_under_age_process.join()
    print_age_process.join()

Maybe this is legible to you but it certainly isn't for me.

What if there were a simple high-level language for describing the structure of a pipeline, with a compiler that compiles it to Python? This would be a language that anybody can use: a scientist, a business executive, literally anyone.

You would work with the components of a pipeline knowing what each one does (e.g., is_under_age tells you whether an age segment is legally under age) without needing to know how it is implemented. Someone who knows Python writes the component implementations; after that, the pipeline can be written, rewritten, and restructured without consulting them, because it is expressed in a simple high-level language that abstracts away the functions it is built from.

This high-level language exists! It's called Pipelines. With Pipelines, the 63 lines of Python code above shrink to the five-line description below.

from ages_utils import ages
from ages_utils import age_segment
from ages_utils import is_under_age
from ages_utils import print_age

ages |> age_segment /> is_under_age |> print_age

The |> operator is a transform: each element that reaches age_segment or print_age is passed to that function, and the result is passed on to the next function in the pipeline, if there is one.

The /> operator is a filter: each element is passed to is_under_age, and if the result is True, the element itself is passed on unchanged. If the result is False, the element is dropped.
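To make the two operators concrete, here is a rough sequential model of what each one does to a stream, written with Python generators. It is not how the Pipelines runtime works (that runtime is designed for parallel execution); it only models the semantics. transform, keep_if, run, BOUNDARIES, and age_label are hypothetical names, and age_segment is restated compactly with the standard bisect module, which yields the same segments as the if/elif version.

```python
from bisect import bisect_right
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def transform(fn: Callable[[T], U]) -> Callable[[Iterable[T]], Iterator[U]]:
    """Model of `|>`: apply fn to every element and pass the result on."""
    def stage(stream: Iterable[T]) -> Iterator[U]:
        for item in stream:
            yield fn(item)
    return stage


def keep_if(pred: Callable[[T], bool]) -> Callable[[Iterable[T]], Iterator[T]]:
    """Model of `/>`: pass an element on unchanged only if pred(element) is True."""
    def stage(stream: Iterable[T]) -> Iterator[T]:
        for item in stream:
            if pred(item):
                yield item
    return stage


def run(source, *stages):
    # Chain the stages left to right, then drain the final stream into a list.
    stream = iter(source)
    for stage in stages:
        stream = stage(stream)
    return list(stream)


# Hypothetical compact stand-ins for the article's functions.
BOUNDARIES = [12, 18, 28, 48, 68, 88]
LABELS = ["0-12", "12-18", "18-28", "28-48", "48-68", "68-88", "88+"]

def age_segment(age): return bisect_right(BOUNDARIES, age)
def is_under_age(segment): return segment <= 1
def age_label(segment): return LABELS[segment]  # returns instead of printing


# Roughly: ages |> age_segment /> is_under_age |> age_label
result = run([9, 19, 37, 28, 48, 13],
             transform(age_segment), keep_if(is_under_age), transform(age_label))
print(result)  # ['0-12', '12-18']
```

Because each stage is a generator, elements flow through one at a time, mirroring the element-by-element behavior of the two operators, though without any parallelism.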

There's much more to the language than pipes and filters, including features designed for more complex data flow. Everything is documented in the README on the GitHub repository.

calebwin / pipelines

Pipelines is a language for high-level data analysis.

Pipelines is a language and runtime for crafting massively parallel pipelines. Unlike other languages for defining data flow, the Pipelines language requires components to be implemented separately, in Python. This keeps the implementation details separate from the structure of the pipeline, while providing access to thousands of active libraries for machine learning, data analysis, and processing. Skip to Getting Started to install the Pipelines compiler.

An example

As an introductory example, a simple pipeline for Fizz Buzz on even numbers could be written as follows:

from fizzbuzz import numbers
from fizzbuzz import even
from fizzbuzz import fizzbuzz
from fizzbuzz import printer
numbers
/> even 
|> fizzbuzz where (number=*, fizz="Fizz", buzz="Buzz")
|> printer

Meanwhile, the implementations of the components would be written in Python:

def numbers():
    for ...
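The component implementations are not shown in full here. As a purely hypothetical sketch (not the README's actual code), the four components might look like this; the range of numbers in particular is an assumption. The keyword arguments mirror the `where (number=*, fizz="Fizz", buzz="Buzz")` clause, assuming `*` stands for the element currently flowing through the pipeline.

```python
def numbers():
    # Hypothetical source: the real range used in the README is not shown here.
    for number in range(1, 31):
        yield number


def even(number):
    # Used with `/>`: only even numbers are passed on.
    return number % 2 == 0


def fizzbuzz(number, fizz, buzz):
    # Used with `|>`: replace the number with a word where Fizz Buzz rules apply.
    if number % 15 == 0:
        return fizz + buzz
    if number % 3 == 0:
        return fizz
    if number % 5 == 0:
        return buzz
    return number


def printer(value):
    print(value)


if __name__ == "__main__":
    # Sequential equivalent of:
    #   numbers /> even |> fizzbuzz where (number=*, fizz="Fizz", buzz="Buzz") |> printer
    for n in numbers():
        if even(n):
            printer(fizzbuzz(number=n, fizz="Fizz", buzz="Buzz"))
```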

If you decide to use Pipelines in your own data analysis work, please let me know; I would love to see what you make! If not, and you find this interesting and potentially useful, I would appreciate it if you starred the GitHub repository for later.
