DEV Community

Jorge PM

Extract csv data and load it to PostgreSQL using Meltano ELT

Index

  1. Introduction
  2. Environment preparation
  3. Using our setup
  4. Conclusion and final thoughts

Remember: in my posts you can always skip the first section and go straight to "show me the code!!" in Section 2.

1. Introduction

I ran into these two related projects (meltano and singer) and, coming from a more "traditional" Airflow-based pipelines background, I decided to give them a try.

To give you a quick overview, meltano (which uses singer) works roughly like this.

Meltano sits on top of an orchestrator (e.g. Airflow) and uses singer taps and targets to extract data from a source and load it somewhere else. Then meltano triggers dbt for your transformation stage.

Also, meltano used to have a hello world tutorial similar to this one but it is now giving me a 404 :(. At least it isn't a teapot 418.

Let's take a step back. What are these libraries and how do they fit in your data stack?

When it comes to meltano, a tool that promises to be an open source ELT, the question is how it fits into a more traditional stack. I had two particular questions about meltano that were conveniently answered by someone working at meltano a couple of years ago in the dataengineering subreddit.

First, how does this compare to Airflow, and second, how does something like Spark work with meltano? MeltanoDouwe answered these questions for the community:

The idea is that Airflow can be used to orchestrate data pipelines, but these data pipelines still first need to be written, as do their components (the extractor, loader, and (optionally) transformations). Airflow doesn't help with that, and shouldn't, because the whole point is that it's programmable and generically useful.

Meltano sits on top of Airflow (or another orchestrator) to specifically help people build robust EL pipelines out of open source extractors and loaders following the Singer specification. It provides a CLI and YAML-based configuration format that allow people to set up their EL(T) pipelines in a much friendlier way than having to manually write Airflow DAGs stringing together hard-to-monitor commands.

And when it comes to Spark:

Meltano primarily concerns itself with getting data from various sources and integrating them in a single place, with optional SQL-based dbt transformations applied. What happens after that is up to you and out of scope for Meltano, but you could imagine setting up Airflow with a DAG that follows a simple BashOperator that runs meltano elt with an operator that kicks off some kind of Spark processing job pointed at the same data warehouse or other storage location targeted by Meltano

Here is the original post https://www.reddit.com/r/dataengineering/comments/gj722d/why_gitlab_is_building_meltano_an_open_source/

So in a nutshell, meltano helps you avoid writing bespoke code for well-known operations and data transfers that you can simply pick up as a singer tap. That said, there is significant overlap with Airflow operators. However, I can see how using the right tool for the job (meltano just to run your ELT and Airflow for the orchestration) could give you a better experience than trying to get Airflow operators under control (this is especially true if you are in an ecosystem like Composer, where you very often end up in dependency hell).
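To make the tap idea a bit more concrete, here is a minimal Python sketch of the kind of messages a singer tap writes out: one SCHEMA message describing the stream, followed by one RECORD message per row. This is not the code of tap-csv. The function name csv_to_singer_messages and the sample data are made up for illustration, and a real tap also deals with things this sketch ignores, such as STATE messages and proper type handling.

```python
import csv
import io
import json


def csv_to_singer_messages(csv_text, stream, key_properties):
    """Turn CSV text into Singer-style SCHEMA and RECORD messages (as dicts)."""
    reader = csv.DictReader(io.StringIO(csv_text))
    fieldnames = reader.fieldnames or []
    # A CSV carries no type information, so every column is declared as a string here.
    schema = {
        "type": "object",
        "properties": {name: {"type": "string"} for name in fieldnames},
    }
    messages = [
        {
            "type": "SCHEMA",
            "stream": stream,
            "schema": schema,
            "key_properties": key_properties,
        }
    ]
    for row in reader:
        messages.append({"type": "RECORD", "stream": stream, "record": dict(row)})
    return messages


if __name__ == "__main__":
    # Illustrative sample data, not taken from a real source.
    sample = "id,text,value\n1,hello,34\n2,bye,65\n"
    for message in csv_to_singer_messages(sample, "values", ["id"]):
        # A tap writes one JSON message per line to stdout; a target reads them from stdin.
        print(json.dumps(message))
```

A target (the loader side) consumes that stream of JSON lines and writes the records to the destination, which is why any tap can in principle be paired with any target.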

2. Environment preparation

When it comes to installing meltano, the guide on its website is pretty good. This is just a summary of it: https://meltano.com/docs/installation.html#local-installation

The process is simple: create your venv, activate it and install meltano with pip (run this from a pre-created folder where you want the project to live).

```bash
python3 -m venv venv
source venv/bin/activate

# to avoid any issues during the installation we will update pip
pip install -U pip
pip install meltano
```

Now, let's set up meltano. First, let's create our meltano project. We will call it dags. The rest of the commands in this post are run from inside the project folder, so we move into it right away.

```bash
meltano init dags
cd dags
```

We are now going to need extractors and loaders to extract data from a source and load it somewhere else. Remember, once it's loaded, we could transform it with dbt.

We will use a csv extractor and we will load it to an instance of PostgreSQL. So before we move on, let's configure an instance of PostgreSQL we can use to test meltano.

I will use docker and simply run a vanilla PostgreSQL instance in it.

```bash
docker run --name db -e POSTGRES_PASSWORD=password -e POSTGRES_DB=datadb -p 5432:5432 -d postgres
```

This instance will have a default user called postgres and will create a database called datadb. For more details, you can check the official PostgreSQL docker page https://hub.docker.com/_/postgres. Remember that you might need sudo to run docker on linux.

To check everything is working as expected, use your favorite PostgreSQL client. I use pgcli

```text
(venv) user@computer:~/dags$ pgcli -h localhost -u postgres -d datadb
Password for postgres:
Server: PostgreSQL 13.3 (Debian 13.3-1.pgdg100+1)
Version: 2.2.0

postgres@localhost:datadb>
```

As you can see, I'm connected to datadb in our docker instance. You can exit pgcli now (you can hit ctrl+d to exit pgcli or pretty much anything).

Finally, let's prepare a sample csv file. We will add a couple of rows and a header and put them in a file called values.csv in the extract folder (this folder was created by meltano when initialising the dags project).

```bash
echo $'id,text,value\n1,hello,34\n2,bye,65' > extract/values.csv
```
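The $'...' quoting used in that echo command is a bash feature, so it may not behave the same in every shell. As an alternative, here is a small Python sketch that writes the same three lines with the standard csv module. The helper name write_sample_csv is made up for this post, and the default path assumes you run it from the project folder.

```python
import csv
from pathlib import Path

# Same header and rows as the echo command.
SAMPLE_ROWS = [
    ["id", "text", "value"],
    ["1", "hello", "34"],
    ["2", "bye", "65"],
]


def write_sample_csv(path="extract/values.csv"):
    """Write the sample rows to `path`, creating the parent folder if needed."""
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    # newline="" lets the csv module control line endings itself.
    with target.open("w", newline="", encoding="utf-8") as handle:
        csv.writer(handle, lineterminator="\n").writerows(SAMPLE_ROWS)
    return target
```

Calling write_sample_csv() with no arguments should leave you with the same extract/values.csv file as the shell one-liner.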

All good! Let's go back to meltano.

Setting up the extractor and the loader

Now that we have our db instance up and running, let's set up a csv extractor.
To find the right extractor, we can list the available ones with:

```bash
meltano discover extractors
```

And then we can add it (and test it):

```bash
meltano add extractor tap-csv --variant=meltano
meltano invoke tap-csv --version
```

For more details see https://hub.meltano.com/extractors/csv

Similarly, we can add our loader, which is required for loading the data from the csv file into PostgreSQL:

```bash
meltano add loader target-postgres
```

Now, let's configure our plugins in the meltano.yml file that meltano created within the dags folder when we initialised it.

This file will have some configuration and we will add extra configuration for the extractor and the loader. Modify this file so it looks like this (your project_id will be different):

```yaml
version: 1
send_anonymous_usage_stats: false
project_id: 59aca8ad-597d-47fc-a9f4-f1327774bd55
plugins:
  extractors:
  - name: tap-csv
    variant: meltano
    pip_url: git+https://gitlab.com/meltano/tap-csv.git
    config:
      files:
        - entity: values
          file: extract/values.csv
          keys:
            - id
  loaders:
  - name: target-postgres
    variant: transferwise
    pip_url: pipelinewise-target-postgres
    config:
      host: localhost
      port: 5432
      user: postgres
      dbname: datadb
```
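Since the keys entry in the extractor config has to name columns that actually exist in the file, a quick pre-flight check can save a confusing failure later. The following is only a sketch: missing_key_columns is a hypothetical helper written for this post, not something meltano or tap-csv provides.

```python
import csv


def missing_key_columns(csv_path, keys):
    """Return the entries of `keys` that are not present in the CSV header."""
    with open(csv_path, newline="", encoding="utf-8") as handle:
        # The first row of the file is the header; an empty file yields an empty header.
        header = next(csv.reader(handle), [])
    return [key for key in keys if key not in header]
```

For the file and config used here, missing_key_columns("extract/values.csv", ["id"]) should come back empty, because id is one of the header columns.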

For the PostgreSQL password, we use the .env file (remember to use the same password as the one you used when running the docker container):

```bash
echo 'export TARGET_POSTGRES_PASSWORD=password' > .env
```
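The variable name is not arbitrary. As far as I understand it, meltano looks up plugin settings in environment variables named after the plugin and the setting, upper-cased and with dashes turned into underscores. The sketch below just illustrates that naming convention for simple, flat setting names. The function setting_env_var is made up for this post and is not part of meltano.

```python
def setting_env_var(plugin_name, setting_name):
    """Build the environment variable name for a flat plugin setting.

    Assumes the <PLUGIN_NAME>_<SETTING_NAME> convention; nested settings are not handled.
    """
    return f"{plugin_name}_{setting_name}".replace("-", "_").upper()


if __name__ == "__main__":
    # The password setting of the target-postgres loader.
    print(setting_env_var("target-postgres", "password"))
```

So the password setting of target-postgres ends up as TARGET_POSTGRES_PASSWORD, which is the name written to .env.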

3. Using our setup

Now, we can run our pipeline using the elt command. We will skip the transformation (dbt) step for now.

```bash
meltano elt tap-csv target-postgres --transform=skip
```
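If you want to trigger the same run from a script (which is roughly what an orchestrator task would end up doing), a thin wrapper around the command line is enough. This is a minimal sketch that assumes the meltano executable is on your PATH (for example because the venv is active). The names build_elt_command and run_elt are my own.

```python
import subprocess


def build_elt_command(extractor, loader, transform="skip"):
    """Return the argument list for `meltano elt` with the given plugins."""
    return ["meltano", "elt", extractor, loader, f"--transform={transform}"]


def run_elt(extractor, loader, transform="skip", project_dir="."):
    """Run the pipeline from `project_dir` and raise if meltano exits with an error."""
    command = build_elt_command(extractor, loader, transform)
    # check=True turns a non-zero exit code into a CalledProcessError.
    return subprocess.run(command, cwd=project_dir, check=True)
```

Calling run_elt("tap-csv", "target-postgres") from the project folder should be equivalent to typing the command by hand.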

Let's check the results with pgcli

```text
(venv) user@computer:~/dags$ pgcli -h localhost -u postgres -d datadb
Server: PostgreSQL 13.3 (Debian 13.3-1.pgdg100+1)
Version: 2.2.0

postgres@localhost:datadb> select * from tap_csv.values
+------+--------+---------+
| id   | text   | value   |
|------+--------+---------|
| 1    | hello  | 34      |
| 2    | bye    | 65      |
+------+--------+---------+
SELECT 2
Time: 0.012s
postgres@localhost:datadb>
```

4. Conclusion and final thoughts

This is a very simple but fully functional example. You can explore further loaders https://hub.meltano.com/loaders/ and extractors https://hub.meltano.com/extractors/ and you will see how easily you could productionise these pipelines.

It is important to note that I did have some issues with the default tap-csv variant (meltanolabs). The issue was related to that tap not being able to use the discovery functionality, but there was next to no information about how to solve it and I couldn't find a single step-by-step tutorial either. This worries me a bit, but it could be a matter of my lack of experience with singer. However, while looking into singer, I found that the number one issue flagged by the community is that open source taps tend to be buggy and need to be used carefully.

I would happily give meltano a shot. I believe it is a very interesting technology and I get the motivation behind it. Perhaps you can try it as a rewrite of simple pipelines that run in Airflow with bespoke python code, but I would be cautious about a full migration to a framework that could still be in its early days in comparison with the battle-tested alternatives.
