Francesco Tisiot

Originally published at aiven.io
Remove naughty words from your data using DataCater

Using PostgreSQL® and DataCater to get rid of naughty words

Oh ****, everyone knows that bad words shouldn't be used!

Even though you might not expect it, profanity exists everywhere, whether it's a single person talking, a social media comment, or a live show. Giving visibility to such bad words can have disastrous effects, and, in the world of real-time streaming, we can't rely on a human check. Therefore, we need to build streaming systems that automatically prevent bad words from being displayed. If you're interested in this, today's your lucky day, because we're going to create one with a couple of technologies and integrations!

In this blog post, we'll use the great Simpsons dataset, perfect for our scenario since it contains phrases with several bad words, like a live presenter cursing periodically. We'll demonstrate how we can clean up sentences in streaming mode using:

  • Aiven for PostgreSQL®, everyone's beloved database, for storing the Simpsons dataset.
  • ApiLayer Bad words API, for detecting bad words in the sentences.
  • Aiven for OpenSearch®, for storing the polished Simpsons dataset; we’ll use search filters to investigate the results and the amazing OpenSearch Dashboard to visualize them.
  • DataCater, an online service to define streaming data pipelines with an interesting option to write Python transformations; DataCater enables us to stream the data from PostgreSQL® to OpenSearch® and detect swears using ApiLayer’s Bad words API on the way.

Create the data storage

We need a PostgreSQL database that will act as storage for the sentences. For convenience, we are using Aiven for PostgreSQL; we can create it with the Aiven CLI and the following command:

avn service create demo-pg  \
    --service-type pg       \
    --plan hobbyist         \
    --cloud google-europe-west8

The minimal hobbyist plan is all we need for the Simpsons dataset. We also chose the google-europe-west8 region, one of the newest regions covered by Aiven, conveniently located in Milan. You can review the list of available clouds in the dedicated document.

We also need to create an Aiven for OpenSearch service which will contain the polished results:

avn service create demo-opensearch  \
    --service-type  opensearch      \
    --plan hobbyist                 \
    --cloud google-europe-west8

As before, we're using the hobbyist plan and the same region. Let's wait for both services to be up with:

avn service wait demo-pg
avn service wait demo-opensearch

Load the data in PostgreSQL

The Simpsons dialogue dataset is available on Kaggle. After creating a Kaggle account, we can download it as archive.zip and decompress it to get the data in CSV format in a file named simpsons_dataset.csv. The file contains all the dialogues, together with the character speaking each sentence.
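
Before loading anything, it can be useful to peek at the file and confirm it really contains the two columns we expect (the speaking character and the sentence). Here's a minimal Python sketch, assuming simpsons_dataset.csv is in the current directory:

import csv

# Print the header and the first couple of dialogue rows to verify the layout
with open("simpsons_dataset.csv", newline="", encoding="utf-8") as f:
    reader = csv.reader(f)
    print("Columns:", next(reader))
    for i, row in enumerate(reader):
        print(row)
        if i == 1:
            break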

To load it into the PostgreSQL database, we first need to connect to the PostgreSQL command prompt with:

avn service cli demo-pg

Then we need to create a table called simpsons_dialogues to host our dataset:

create table simpsons_dialogues(
    id serial primary key,
    character_speaking text,
    words text);

Copying the dataset over to PostgreSQL can be done using the PostgreSQL \copy command:

\copy simpsons_dialogues(character_speaking, words) from 'simpsons_dataset.csv' csv header

The data should be ingested in a matter of seconds... Once the upload is finished, you can check there's some data in the table with the following query:

select count(*) from simpsons_dialogues;

Now it's time to play.

Define the transformation flow in DataCater

After importing the data, we need to define how we'll transform the data. We can head to DataCater and sign up for a free trial, entitling us to define up to two pipelines. Once our account is confirmed, we will land in the main dashboard where we can create the data endpoints of our pipeline.

Define the PostgreSQL source

We need to tell DataCater where to source the data and where to sink it. Let's start by mapping the data source: click on the Data Sources link at the top and select Create a data source. The list of available options is quite wide and includes PostgreSQL, which is what we need to load the Simpsons dataset.

To define a datasource on top of Aiven for PostgreSQL we need to specify:

  • Name: the logical name of the datasource, we can use simpsons_source_data
  • Hostname or IP, Port, Username and Password: the connection details to use to reach Aiven for PostgreSQL. We can find all the details with the following Aiven CLI command:
  avn service get demo-pg --format '{service_uri_params}'
  • SSL: Aiven for PostgreSQL requires SSL, therefore we need to specify Use SSL
  • Database and Schema: the database and schema where the data is residing. We used the defaults, therefore the database is defaultdb and the schema is public
  • Table name: the table with the data, in our case it is simpsons_dialogues. We can either allow DataCater to fetch the list of tables and select from the dropdown, or fill it in explicitly.

We also need to change the Change Data Capture method, and select Logical replication with wal2json.

Logical replication wal2json
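
Logical replication requires the PostgreSQL wal_level parameter to be set to logical, which should already be the case on Aiven for PostgreSQL. If you want to double-check before wiring up the pipeline, here's a small sketch using psycopg2, assuming the PG_URI environment variable holds the service URI (for example from avn service get demo-pg --format '{service_uri}'):

import os
import psycopg2

# Connect with the service URI and confirm logical replication is available
conn = psycopg2.connect(os.environ["PG_URI"])
with conn.cursor() as cur:
    cur.execute("SHOW wal_level;")
    print("wal_level:", cur.fetchone()[0])  # we expect 'logical'
conn.close()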

With all the fields filled, we can check the connection and, if successful, click on Create datasource.

Define the OpenSearch target

A similar exercise can be done for the OpenSearch index, which we'll use as the data pipeline target. We need to head to Data sinks, click on Create data sink, select Elasticsearch, and provide the following details:

  • Name: the logical name of the data sink, we can use simpsons_sink_data
  • Hostname or IP, Port, Username and Password: the connection details to use to reach Aiven for OpenSearch. We can find all the details with the following Aiven CLI command:
  avn service get demo-opensearch --format '{service_uri_params}'
  • HTTP Scheme: we can use the HTTPS protocol, available by default in Aiven for OpenSearch
  • Index name: we can either leave this blank (an index named datacater_pipeline_[pipeline_id] will be used), or fill it in. In our case we'll fill it in with simpsons_cleaned_dialogues

Once we've checked that the connection is valid, we click on Create data sink to finalise the process.

Get the ApiLayer token and enable the Bad words API

As anticipated, we're going to use the ApiLayer Bad words API to clean up the sentences, which means we need to set up an account. Once that's created and verified, we can subscribe to the ApiLayer Bad words API free plan (enough for our little test).

Remember to make a note of the API Key from your account details on the ApiLayer website. You'll need it down the road.
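
Before building the pipeline, it's worth verifying that the key works. Here's a tiny smoke test in Python against the Bad words endpoint we'll use later; it assumes requests is installed and that you replace APILAYER_KEY with your own key:

import requests

# Send a single phrase to the Bad words API and print the raw response
url = "https://api.apilayer.com/bad_words?censor_character=*"
headers = {"apikey": "APILAYER_KEY"}
response = requests.post(url, headers=headers, data="what the heck".encode("utf-8"))
print(response.status_code)
print(response.text)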

Polish all the sentences

Time to create our data pipeline.

Let's head back to DataCater, select the Pipelines tab at the top of the window, and click on Create pipeline. We can now:

  • Select the data source named simpsons_source_data pointing to our PostgreSQL table.
  • Select the data sink named simpsons_sink_data pointing to our OpenSearch index simpsons_cleaned_dialogues.

Since the ApiLayer Bad words API free plan only allows us to clean up to 100 sentences a day, we'll focus on the most recent sentences of the iconic character Bart Simpson. To do this, let's navigate to the Filters tab and add the following filters:

  • In the character_speaking column, add a filter of type Require values that equal value with the value Bart Simpson
  • In the id column, add a filter of type Require values that are greater than value with the value 157500. The 157500 value has been carefully chosen to give us enough examples to play with without exceeding the API's free tier daily quota.

The Filters tab should look like the following image. Don't worry about the 100% drop rate: it's based on sample data, and we still have 54 rows to parse. You can check it with the following query in the PostgreSQL database:

select count(*)
from simpsons_dialogues
where character_speaking = 'Bart Simpson'
and id > 157500;

The Filter tab showing the two filters applied to the pipeline

Now we need to define the cleaning pipeline by navigating to the Transform tab, where the magic 🪄 happens.
If we click on the Apply transformation button below the character_speaking column, we can browse the long list of available transformations and select Capitalize to normalise the capitalization of the character names.
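
If you're curious what such a transformation looks like under the hood, a user-defined transformation with roughly the same effect could be written as a tiny Python function. This is only a sketch, not the built-in implementation; DataCater passes the current cell value and the full row to transform():

def transform(value, row):
  # Hypothetical equivalent of the built-in Capitalize transformation
  # (the built-in may handle multi-word values slightly differently)
  if value is None:
    return value
  return value.capitalize()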

Then we can move to the words column and apply the User-defined transformation, which allows us to write Python. Amazing! The DataCater Code Transformations documentation says that we can use the requests module to perform API calls to ApiLayer.

With Python we can call the ApiLayer Bad Words API, pass the phrase, and store the cleaned-up response. Pasting the following code (adapted from the ApiLayer documentation) and replacing the APILAYER_KEY placeholder with the value we saved earlier from the ApiLayer website will do the trick:

import requests

# Censor the bad words with the * character
url = "https://api.apilayer.com/bad_words?censor_character=*"

# Replace APILAYER_KEY with the API key saved earlier
headers = {
  "apikey": "APILAYER_KEY"
}

def transform(value, row):
  result = ''
  if row['words'] is not None:
    # Send the sentence to the Bad words API and keep the raw JSON response
    payload = row['words'].encode("utf-8")
    response = requests.request("POST", url, headers=headers, data=payload)

    status_code = response.status_code
    result = response.text
  if result is None:
    result = ''
  return str(result)

In the above code, we're setting the headers and the URL, then retrieving the words column from the current row (row['words']), encoding it and passing it to the ApiLayer API, and finally returning the API response as a string.
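
The API replies with a JSON document that contains, among other fields, a censored version of the sentence. If we wanted to store only the censored text rather than the whole response, a variant of the transformation could parse the JSON and return just that field. This is a sketch assuming the response exposes censored_content, as in the output shown later; the rest of this post keeps the full response, since we'll filter on bad_words_total afterwards:

import requests

url = "https://api.apilayer.com/bad_words?censor_character=*"
headers = {"apikey": "APILAYER_KEY"}

def transform(value, row):
  if row['words'] is None:
    return ''
  response = requests.post(url, headers=headers, data=row['words'].encode("utf-8"))
  # Keep only the censored text; fall back to the raw response if the field is missing
  return response.json().get("censored_content", response.text)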

Once we've finished our transformation definition, the tab should be similar to the following image.

Screenshot of the transformation tab including the capitalize and user defined transformation

Deploy the pipeline and check the results

It's finally time to deploy the pipeline. We can head to the Deploy tab and click on Create deployment. We just need to wait a couple of seconds for the deployment to be created and then we can hit the Start button. The DataCater UI also allows us to browse the deployment logs, which is quite handy for spotting if something goes wrong.

Where's our data gone? We can check it in OpenSearch Dashboards, available alongside our Aiven for OpenSearch service. We can find the URL and login credentials with:

avn service get demo-opensearch --json \
    | jq -r '.connection_info.opensearch_dashboards_uri'

We can head to Stack Management, select Index Patterns, and create an index pattern with the name simpsons*. If our data pipeline is working, we should see that the simpsons_cleaned_dialogues index exists.

OpenSearch Index pattern screen showing the simpsons_cleaned_dialogues index

After creating the index pattern, we can head to the Discover tab, add a filter for words.bad_words_total not being equal to 0, and review the dialogues that seem to contain bad words. Some of them are false positives... but hey, better safe than sorry!

List of censored words
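
The same check can be done outside the dashboard by querying the OpenSearch _search endpoint directly. Here's a sketch assuming the OPENSEARCH_URI environment variable holds the service URI (including credentials), for example from avn service get demo-opensearch --format '{service_uri}':

import os
import requests

# Count the dialogues where the API flagged at least one bad word
query = {"query": {"range": {"words.bad_words_total": {"gt": 0}}}}
url = os.environ["OPENSEARCH_URI"] + "/simpsons_cleaned_dialogues/_search?size=5"
response = requests.post(url, json=query)
hits = response.json()["hits"]
print("Flagged dialogues:", hits["total"]["value"])
for hit in hits["hits"]:
    print(hit["_source"]["words"]["censored_content"])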

Is it really streaming?

As of now, we have only polished a static set of sentences. Can we demonstrate that the flow is acting in streaming mode too?

With our terminal connected to the PostgreSQL instance, we can now take a deep breath and run the following insert statement, replacing BAD_WORDS_PHRASE with something really impolite:

insert into simpsons_dialogues (character_speaking, words)
    values ('Bart Simpson', 'BAD_WORDS_PHRASE');

And immediately we should see a new entry in the OpenSearch index. In my case... it worked!

By refreshing the OpenSearch Dashboard, scrolling to the bottom, checking the last entry and clicking on the little arrow > next to the message, we can see the JSON output in detail, represented below with some * polishing the bad words.

{
  "_index": "thesimpsons",
  "_type": "_doc",
  "_id": "158315",
  "_version": 0,
  "_score": 0,
  "_source": {
    "id": 158315,
    "character_speaking": "Bart Simpson",
    "words": {
      "content": "D*** A** S***",
      "bad_words_total": 3,
      "bad_words_list": [
        {
          "original": "D***",
          "word": "d***",
          "deviations": 0,
          "info": 2,
          "start": 0,
          "end": 4,
          "replacedLen": 4
        },
        {
          "original": "A**",
          "word": "a**",
          "deviations": 0,
          "info": 2,
          "start": 5,
          "end": 8,
          "replacedLen": 3
        },
        {
          "original": "S**",
          "word": "s**",
          "deviations": 0,
          "info": 2,
          "start": 9,
          "end": 13,
          "replacedLen": 4
        }
      ],
      "censored_content": "**** *** ****"
    }
  }
}
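
If you prefer the terminal to the dashboard, the same document can also be fetched via the OpenSearch document API; a quick sketch, reusing the OPENSEARCH_URI environment variable from before and the id of the freshly inserted row:

import os
import requests

# Retrieve a single cleaned dialogue by its id
doc_id = 158315
url = os.environ["OPENSEARCH_URI"] + f"/simpsons_cleaned_dialogues/_doc/{doc_id}"
print(requests.get(url).json()["_source"]["words"]["censored_content"])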

A real-time streaming solution that polishes sentences... This is **** brilliant!

Enhance streaming data pipelines with Python extensions

Jokes aside, this is just a simple example of what's achievable by plugging DataCater on top of Aiven services like Aiven for PostgreSQL and Aiven for OpenSearch. The rich set of pre-cooked transformations allows you to cover a great part of the typical data manipulations needed, and, for the rest, there's always the handy Python extension.

Some more resources that you might find useful:
