DEV Community

Stefen
Building a Scalable RSS Feed Pipeline with Apache Airflow, Kafka, MongoDB, and a Flask API

In today’s data-driven world, processing large volumes of data in real time has become essential for many organizations. The Extract, Transform, Load (ETL) process is a common way to manage the flow of data between systems. In this article, we’ll walk through how to build a scalable ETL pipeline for RSS news feeds using Python, Apache Airflow, Kafka, MongoDB, and Flask.

In this pipeline, the RSS feeds are read with a Python library called feedparser, which parses the XML of each feed and exposes the fields of every entry. Each entry is then converted into a standardized JSON record using Python's built-in json library. The record includes fields such as id, title, summary, link, and language, which make the data easier to analyze and consume.
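As a rough sketch of that normalization step (not the pipeline's actual code), the function below maps one parsed entry to a JSON record. It relies only on the fact that feedparser entries can be read like dictionaries; the name `to_record` and the sample entry are made up for illustration.

```python
import json


def to_record(entry, language):
    """Map one feed entry (any dict-like object) to a flat, JSON-ready dict."""
    return {
        "title": entry.get("title", "").strip(),
        "summary": entry.get("summary", "").strip(),
        "link": entry.get("link", ""),
        "language": language,
    }


# Hypothetical entry, shaped like the items in feedparser's `feed.entries`.
sample_entry = {
    "title": " Example headline ",
    "summary": "Short description of the story.",
    "link": "https://example.com/story",
}

record = to_record(sample_entry, "en")
# ensure_ascii=False keeps accented characters readable in the output.
print(json.dumps(record, ensure_ascii=False, indent=2))
```

Using `.get()` with a default means an entry that lacks a summary still produces a record, which a later validation step can then reject.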

NEWS_FEEDS = {
        "en": [
            "https://www.cnn.com/rss/edition.rss",
            "https://www.bbc.com/news/10628494",
            "https://www.nbcnews.com/id/303207/device/rss/rss.xml",
            "https://www.foxnews.com/about/rss/"
        ],
        "pl": [
            "https://www.tvn24.pl/najnowsze.xml",
            "https://www.rmf24.pl/fakty/polska/feed",
            "https://wiadomosci.wp.pl/rss",
            "https://www.money.pl/rss/wszystkie"
        ],
        "es": [
            "https://www.elpais.com/rss/feed.html?feedId=1022",
            "https://www.abc.es/rss/feeds/abc_EspanaEspana.xml",
            "https://www.elconfidencial.com/rss/",
            "https://www.elperiodico.com/es/rss/"
        ],
        "de": [
            "https://www.tagesschau.de/xml/rss2",
            "https://www.faz.net/rss/aktuell/",
            "https://www.zeit.de/rss",
            "https://www.spiegel.de/schlagzeilen/tops/index.rss"
        ],
        "fr": [
            "https://www.lemonde.fr/rss/une.xml",
            "https://www.lefigaro.fr/rss/figaro_actualites.xml",
            "https://www.liberation.fr/rss/",
            "https://www.lci.fr/rss"
        ]
    }

What is Apache Airflow?

Apache Airflow is a platform used to programmatically author, schedule, and monitor workflows. It allows developers to create complex workflows by defining tasks and their dependencies. Airflow makes it easy to monitor the execution of tasks and provides an intuitive web interface to visualize the workflow.
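To make "tasks and their dependencies" concrete, here is a small standard-library sketch of how a dependency graph turns into a run order. It does not use Airflow itself; the task names are the four tasks of the DAG built later in this article, and `graphlib` stands in for the scheduling that Airflow performs.

```python
from graphlib import TopologicalSorter

# Each key is a task; its value is the set of tasks that must finish first.
dependencies = {
    "extract_news": {"update_proxypool"},
    "validate_data": {"extract_news"},
    "send_to_kafka": {"validate_data"},
}

# static_order() yields tasks so that every task comes after its prerequisites.
run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)
# ['update_proxypool', 'extract_news', 'validate_data', 'send_to_kafka']
```

Because the graph is acyclic, a valid order always exists; a cycle would make `TopologicalSorter` raise `CycleError`, which is why a workflow has to be a directed acyclic graph.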

What is Kafka?

Apache Kafka is a distributed event streaming platform that allows you to publish and subscribe to streams of records. Kafka provides high-throughput, low-latency, and fault-tolerant data transport. Kafka can be used for real-time data processing, streaming analytics, and log aggregation.

Implementing the ETL pipeline

To implement the ETL pipeline, we’ll use Python and the following libraries:

  • feedparser: A Python library that parses RSS feeds

  • beautifulsoup4: A Python library that extracts data from HTML and XML files

  • kafka-python: A Python library that provides a Kafka client

  • redis: A Python library that provides a Redis client

First, we’ll define a DAG (Directed Acyclic Graph) in Airflow to run the pipeline on a schedule. The DAG consists of four tasks:

  1. Update the proxy pool: This task loads a list of proxy servers from Redis or, when the cache is empty, from a public proxy list, tests their connectivity, and passes the working proxies to the next task. We’ll use the proxies to avoid getting blocked by the RSS feed servers.

  2. Extract news: This task reads the RSS feeds through the valid proxies, extracts the news articles, and stores them in a list. A thread pool fetches the feeds concurrently to speed up extraction.

  3. Validate data: This task checks that each news article has all the required fields (title, link, and summary) and keeps the valid articles in a separate list.

  4. Send to Kafka: This task serializes the validated news articles as JSON and sends them to a Kafka topic.

    import concurrent.futures
    import datetime
    import html
    import json
    import random
    from datetime import timedelta

    import feedparser
    import redis
    import requests
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.utils.dates import days_ago
    from bs4 import BeautifulSoup
    from kafka import KafkaProducer
    from kafka.errors import KafkaError

    # RSS feeds to scrape, grouped by language code
    NEWS_FEEDS = {
        "en": [
            "https://www.cnn.com/rss/edition.rss",
            "https://www.bbc.com/news/10628494",
            "https://www.nbcnews.com/id/303207/device/rss/rss.xml",
            "https://www.foxnews.com/about/rss/"
        ],
        "pl": [
            "https://www.tvn24.pl/najnowsze.xml",
            "https://www.rmf24.pl/fakty/polska/feed",
            "https://wiadomosci.wp.pl/rss",
            "https://www.money.pl/rss/wszystkie"
        ],
        "es": [
            "https://www.elpais.com/rss/feed.html?feedId=1022",
            "https://www.abc.es/rss/feeds/abc_EspanaEspana.xml",
            "https://www.elconfidencial.com/rss/",
            "https://www.elperiodico.com/es/rss/"
        ],
        "de": [
            "https://www.tagesschau.de/xml/rss2",
            "https://www.faz.net/rss/aktuell/",
            "https://www.zeit.de/rss",
            "https://www.spiegel.de/schlagzeilen/tops/index.rss"
        ],
        "fr": [
            "https://www.lemonde.fr/rss/une.xml",
            "https://www.lefigaro.fr/rss/figaro_actualites.xml",
            "https://www.liberation.fr/rss/",
            "https://www.lci.fr/rss"
        ]
    }

    # Browser-like request headers; one set is picked at random per request
    headers_list = [
        {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:77.0) Gecko/20100101 Firefox/77.0",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "Referer": "https://www.google.com/",
            "DNT": "1",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1"
        },
        {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:77.0) Gecko/20100101 Firefox/77.0",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "Referer": "https://www.google.com/",
            "DNT": "1",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1"
        },
        {
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Encoding": "gzip, deflate",
            "Accept-Language": "en-GB,en-US;q=0.9,en;q=0.8",
            "Dnt": "1",
            "Referer": "https://www.google.com/",
            "Upgrade-Insecure-Requests": "1",
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36",
            "X-Amzn-Trace-Id": "Root=1-5ee7bae0-82260c065baf5ad7f0b3a3e3"
        },
        {
            "User-Agent": 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:55.0) Gecko/20100101 Firefox/55.0',
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "pl-PL,pl;q=0.9,en-US;q=0.8,en;q=0.7",
            "Referer": "https://www.reddit.com/",
            "DNT": "1",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1"
        }
    ]

    # Define the default_args dictionary to pass to the DAG
    ARGS = {
        "owner": "stefentaime",
        "start_date": days_ago(0),
        "retries": 1,
        "retry_delay": timedelta(seconds=30)
    }

    dag = DAG(
        dag_id="ETL-Pipeline",
        default_args=ARGS,
        description="",
        schedule_interval="0 0 1 * *",  # cron: midnight on the first day of each month
        tags=["ETL", "kafka", "Scraping"]
    )

    # Redis cache and proxy-testing settings
    REDIS_CONFIG = {'host': 'redis', 'port': 6379, 'decode_responses': True}
    REDIS_KEY = 'proxies'
    PROXY_WEBPAGE = 'https://free-proxy-list.net/'
    TESTING_URL = 'https://httpbin.org/ip'
    MAX_WORKERS = 20
    PROXY_EXPIRATION = timedelta(minutes=5)

    def get_proxies():
        """Return the cached proxy list from Redis, scraping a fresh one if the cache is empty."""
        r = redis.Redis(**REDIS_CONFIG)
        proxies = r.lrange(REDIS_KEY, 0, -1) if r.exists(REDIS_KEY) else []
        if proxies and r.ttl(REDIS_KEY) == -1:
            # The cached key has no expiry yet, so give it one.
            # Redis drops the key itself once PROXY_EXPIRATION has passed.
            r.expire(REDIS_KEY, PROXY_EXPIRATION)
        if not proxies:
            headers = random.choice(headers_list)
            page = requests.get(PROXY_WEBPAGE, headers=headers, timeout=10)
            soup = BeautifulSoup(page.content, 'html.parser')
            for row in soup.find('tbody').find_all('tr'):
                cells = row.find_all('td')
                # The first two columns hold the IP address and the port
                proxies.append(cells[0].text + ':' + cells[1].text)
            if proxies:
                r.rpush(REDIS_KEY, *proxies)
                r.expire(REDIS_KEY, PROXY_EXPIRATION)
        return proxies

    def test_single_proxy(proxy):
        """Return True if a request routed through the proxy succeeds."""
        headers = random.choice(headers_list)
        try:
            resp = requests.get(TESTING_URL, headers=headers, proxies={"http": proxy, "https": proxy}, timeout=3)
            return resp.status_code == 200
        except requests.RequestException:
            return False

    def test_proxy(proxies):
        """Test all proxies concurrently and yield the ones that work."""
        with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            results = list(executor.map(test_single_proxy, proxies))
        return (proxy for valid, proxy in zip(results, proxies) if valid)

    # Define the task to update the proxy pool
    def update_proxypool(**kwargs):
        proxies = get_proxies()
        valid_proxies = list(test_proxy(proxies))
        kwargs['ti'].xcom_push(key='valid_proxies', value=valid_proxies)

    import datetime

    # Running counter used to build article ids
    next_id = 1

    def extract_website_name(link):
        # Extract the website name (host) from the link
        website_name = link.split('//')[1].split('/')[0]
        # Remove "www." from the website name
        website_name = website_name.replace('www.', '')
        return website_name

    def extract_article_data(entry, language):
        """Build the article record, or return None if the entry is not from the last hour."""
        global next_id
        title = entry.title.encode('ascii', 'ignore').decode()
        soup = BeautifulSoup(entry.summary, 'html.parser')
        summary = html.unescape(soup.get_text().strip().replace('\xa0', ' '))
        link = entry.link
        date_published = entry.get('published_parsed', None)
        if date_published is not None:
            date_published = datetime.datetime(*date_published[:6])
            time_since_published = datetime.datetime.utcnow() - date_published
            if time_since_published < datetime.timedelta(hours=1):
                today = datetime.datetime.utcnow().strftime("%d-%m-%Y")
                website_name = extract_website_name(link)
                unique_id = f"{language.upper()}{next_id:02d}-{website_name}-01-{today}"
                next_id += 1
                return {
                    'id': unique_id,
                    'title': title,
                    'link': link,
                    'summary': summary,
                    'language': language
                }
        return None

    def extract_news_feed(feed_url, language, proxy):
        """Fetch one feed through the given proxy and return up to two recent articles."""
        try:
            # Route the request through the proxy, then hand the raw XML to feedparser
            resp = requests.get(feed_url, headers=random.choice(headers_list),
                                proxies={"http": proxy, "https": proxy}, timeout=10)
        except requests.RequestException:
            return []
        feed = feedparser.parse(resp.content)
        articles = []
        extracted_articles = set()
        for entry in feed.entries:
            if len(articles) >= 2:
                break
            link = entry.link
            title = entry.title.encode('ascii', 'ignore').decode()
            unique_id = f'{language}-{link}-{title}'
            if unique_id in extracted_articles:
                continue
            extracted_articles.add(unique_id)
            article_data = extract_article_data(entry, language)
            if article_data is not None:
                articles.append(article_data)
        return articles

    def extract_news(**kwargs):
        valid_proxies = set(kwargs['ti'].xcom_pull(key='valid_proxies') or [])
        articles = []
        seen_links = set()
        with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            futures = [executor.submit(extract_news_feed, feed_url, language, proxy)
                       for language in NEWS_FEEDS
                       for feed_url in NEWS_FEEDS[language]
                       for proxy in valid_proxies]
            for future in concurrent.futures.as_completed(futures):
                # Every feed is tried through every proxy, so keep each link only once
                for article in future.result():
                    if article['link'] not in seen_links:
                        seen_links.add(article['link'])
                        articles.append(article)
        kwargs['ti'].xcom_push(key='articles', value=articles)
        return articles

    # Define the task to validate the quality of the data
    def validate_data(**kwargs):
        articles = kwargs['ti'].xcom_pull(key='articles', task_ids='extract_news')
        # Keep only the articles where every required field is present and non-empty
        validated_articles = [article for article in articles if all(article.get(k) for k in ('title', 'link', 'summary'))]
        kwargs['ti'].xcom_push(key='validated_articles', value=validated_articles)
        return validated_articles

    # Define the task to send data to the Kafka topic
    def send_to_kafka(**kwargs):
        validated_articles = kwargs['ti'].xcom_pull(key='validated_articles', task_ids='validate_data')
        producer = KafkaProducer(bootstrap_servers='broker:29092')
        for article in validated_articles:
            try:
                # The title is the message key; the JSON-encoded article is the value
                producer.send('rss_feeds', key=article['title'].encode(), value=json.dumps(article).encode())
            except KafkaError as e:
                print(f"Failed to send message to Kafka: {e}")
        producer.flush()
        print("Data sent to Kafka successfully.")

    # Define the tasks
    update_proxypool_task = PythonOperator(task_id='update_proxypool', python_callable=update_proxypool, provide_context=True, dag=dag)
    extract_news_task = PythonOperator(task_id='extract_news', python_callable=extract_news, provide_context=True, dag=dag)
    validate_data_task = PythonOperator(task_id='validate_data', python_callable=validate_data, provide_context=True, dag=dag)
    send_to_kafka_task = PythonOperator(task_id='send_to_kafka', python_callable=send_to_kafka, provide_context=True, dag=dag)

    # Set the task dependencies
    update_proxypool_task >> extract_news_task >> validate_data_task >> send_to_kafka_task
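Once the DAG has run, it can be useful to confirm that messages actually reached the topic. The sketch below is one way to do that with kafka-python; it is not part of the original project. The address `localhost:9092` is an assumption about how the broker is exposed to the host (the DAG itself uses `broker:29092` inside the Docker network), and the function names are made up for illustration.

```python
import json


def decode_article(raw_value):
    """Turn the raw bytes of one Kafka message value back into a dict."""
    return json.loads(raw_value.decode("utf-8"))


def print_articles(bootstrap_servers="localhost:9092", topic="rss_feeds"):
    """Read the topic from the beginning and print language and title per article."""
    # Imported here so decode_article works even without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset="earliest",  # start from the oldest retained message
        consumer_timeout_ms=10000,     # stop iterating after 10 s without new data
    )
    for message in consumer:
        article = decode_article(message.value)
        print(article.get("language"), article.get("title"))
    consumer.close()
```

Calling `print_articles()` from a machine that can reach the broker should list the articles produced by `send_to_kafka`; an empty result suggests the DAG sent nothing or the address is wrong.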

Next, we’ll deploy a Kafka connector to consume the news articles from the Kafka topic and load them into MongoDB. We’ll use the MongoSinkConnector from the mongo-kafka-connect library, which provides an efficient and reliable way to integrate Kafka with MongoDB. The connector is configured to read the news articles from the Kafka topic, and write them to a MongoDB collection in the demo database.

{
    "name": "mongodb-sink-connector",
    "config": {
      "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
      "tasks.max": "1",
      "topics": "rss_feeds",
      "connection.uri": "mongodb://debezium:dbz@mongo:27017/demo?authSource=admin",
      "database": "demo",
      "collection": "rss_feeds_collection",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": "false",
      "value.converter.schemas.enable": "false"
    }
  }
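Kafka Connect accepts a configuration like the one above through its REST API, by sending it in a POST request to the `/connectors` endpoint. The sketch below shows one way to do that from Python using only the standard library. The URL `http://localhost:8083` is an assumption (8083 is Kafka Connect's default REST port), the helper names are hypothetical, and the dictionary is a trimmed-down stand-in for the full configuration.

```python
import json
import urllib.request

# Assumption: the Kafka Connect REST API is reachable on its default port.
CONNECT_URL = "http://localhost:8083/connectors"


def build_connector_request(connector, url=CONNECT_URL):
    """Build a POST request that asks Kafka Connect to create a connector."""
    return urllib.request.Request(
        url,
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def register_connector(connector, url=CONNECT_URL):
    """Send the request and return the HTTP status code."""
    with urllib.request.urlopen(build_connector_request(connector, url)) as resp:
        return resp.status


# Trimmed-down stand-in for the sink connector configuration.
example = {
    "name": "mongodb-sink-connector",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "topics": "rss_feeds",
    },
}
request = build_connector_request(example)
print(request.get_method(), request.full_url)
```

Calling `register_connector(...)` with the full configuration would perform the actual request; it is left out of the module-level code so the sketch runs without a Connect worker.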

To run the pipeline, you need to set up the following components:

  • Apache Airflow: Use pip to install Airflow, and create a Python script that defines the DAG.

  • Redis: Set up a Redis instance to store the proxy servers.

  • Kafka: Install and configure a Kafka cluster with a single broker, and create a Kafka topic named rss_feeds.

  • MongoDB: Install and configure a MongoDB cluster, and create a database named demo.

  • Kafka Connector: Deploy the mongo-kafka-connect connector to a Kafka Connect worker attached to your Kafka cluster, and configure it to read from the rss_feeds topic and write to the rss_feeds_collection collection in the demo database.

The last component is a Flask web application that serves the news articles stored in MongoDB, both as a JSON API and as a rendered web page:

from pymongo import MongoClient
from flask import Flask, request, jsonify, render_template

client = MongoClient('mongodb://debezium:dbz@localhost:27017/?authSource=admin')
db = client['demo']
collection = db['rss_feeds_collection']


app = Flask(__name__, template_folder='/path/template')

# get all news articles
@app.route('/news', methods=['GET'])
def get_all_news():
    cursor = collection.find({}, {"_id": 0})
    news = []
    for item in cursor:
        news.append({'title': item['title'], 'summary': item['summary'], 'link': item['link'], 'language': item['language'], 'id': item['id']})
    return jsonify({'news': news})

# get a news article by id
@app.route('/news/<id>', methods=['GET'])
def get_news_by_id(id):
    item = collection.find_one({'id': id})
    if item:
        return jsonify({'_id': str(item['_id']), 'title': item['title'], 'summary': item['summary'], 'link': item['link'], 'language': item['language']})
    else:
        # 404 lets clients tell a missing article from a successful response
        return jsonify({'error': 'News article not found'}), 404

# update a news article by id
@app.route('/news/<id>', methods=['PUT'])
def update_news_by_id(id):
    item = collection.find_one({'id': id})
    if item:
        data = request.get_json()
        collection.update_one({'id': id}, {'$set': data})
        return jsonify({'message': 'News article updated successfully'})
    else:
        return jsonify({'error': 'News article not found'}), 404

# delete a news article by id
@app.route('/news/<id>', methods=['DELETE'])
def delete_news_by_id(id):
    item = collection.find_one({'id': id})
    if item:
        collection.delete_one({'id': id})
        return jsonify({'message': 'News article deleted successfully'})
    else:
        return jsonify({'error': 'News article not found'}), 404


# number of articles shown per page
PAGE_SIZE = 8

# render a web page with news articles
@app.route('/', methods=['GET'])
def news_page():
    page = request.args.get('page', 1, type=int)
    language = request.args.get('language')

    # build query for language filtering
    query = {} if not language else {'language': language}

    # retrieve total count and paginated news articles
    count = collection.count_documents(query)
    # skip and limit must use the same page size, or pages overlap
    cursor = collection.find(query, {"_id": 0}).skip((page - 1) * PAGE_SIZE).limit(PAGE_SIZE)
    news = []
    for item in cursor:
        news.append({'title': item['title'], 'summary': item['summary'], 'link': item['link'], 'language': item['language'], 'id': item['id']})

    # calculate number of pages for pagination (ceiling division)
    num_pages = (count + PAGE_SIZE - 1) // PAGE_SIZE

    return render_template('index.html', news=news, page=page, language=language, num_pages=num_pages)

if __name__ == '__main__':
    app.run(debug=True)

The application exposes the following endpoints:

  • /news: GET returns all news articles in the database

  • /news/<id>: GET returns the news article with the specified id

  • /news/<id>: PUT updates the news article with the specified id

  • /news/<id>: DELETE deletes the news article with the specified id

  • /: GET renders a web page of paginated news articles with an optional language filter
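The pagination behind the `/` endpoint comes down to two small calculations: how many documents to skip for a given page, and how many pages a result set needs. The sketch below works through both with a page size of 8, the limit used in the route; the helper names are made up for illustration.

```python
import math

PAGE_SIZE = 8  # articles per page, matching the limit used in the route


def page_window(page, page_size=PAGE_SIZE):
    """Return (skip, limit) for a 1-based page number."""
    page = max(page, 1)  # treat 0 or negative pages as the first page
    return (page - 1) * page_size, page_size


def page_count(total, page_size=PAGE_SIZE):
    """Number of pages needed to show `total` articles."""
    return math.ceil(total / page_size)


print(page_window(3))  # (16, 8): page 3 starts after the first 16 articles
print(page_count(17))  # 3: two full pages plus one page with a single article
```

The skip offset has to be a multiple of the same page size that the limit uses. If the two differ, consecutive pages either overlap or leave gaps.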

Prerequisites

Before we start, make sure you have the following installed:

  • Python 3

  • Docker and Docker Compose

  • A text editor

Steps To Run:

  1. Clone the project to your desired location.

  2. Create the .env file containing the Airflow UID needed by docker-compose:

     $ echo -e "AIRFLOW_UID=$(id -u)" > .env

  3. Build the Docker images:

     $ docker-compose build

  4. Initialize the Airflow database:

     $ docker-compose up airflow-init

  5. Start the containers:

     $ docker-compose up -d

  6. When everything is done, check that all the containers are running:

     $ docker ps

You can now access the Airflow web interface at http://localhost:8080 and log in with the default user defined in docker-compose.yml (username and password: airflow). From there, trigger the DAG and watch the tasks run.
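Besides the web interface, Airflow 2 also exposes a REST API that can start a DAG run. The sketch below builds such a request with the standard library. It assumes the API is enabled with basic authentication and that the default airflow/airflow credentials are in use; the helper name is hypothetical, and the DAG has to be unpaused for the run to start.

```python
import base64
import json
import urllib.request


def build_trigger_request(dag_id, base_url="http://localhost:8080",
                          user="airflow", password="airflow"):
    """Build a POST request that creates a new run of the given DAG."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        f"{base_url}/api/v1/dags/{dag_id}/dagRuns",
        data=json.dumps({"conf": {}}).encode("utf-8"),  # no run-specific settings
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
        method="POST",
    )


request = build_trigger_request("ETL-Pipeline")
print(request.get_method(), request.full_url)
```

Passing the request to `urllib.request.urlopen` would send it; that step is left out so the sketch runs without a live Airflow instance.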

To set up Kafka and MongoDB, change into the mongo-kafka directory:

  $ cd mongo-kafka

Start the Kafka and MongoDB containers:

  $ docker-compose up -d

Then create the MongoDB sink connector using the connector configuration shown earlier.

Finally, start the API:

  $ python api.py

Conclusion:

In conclusion, this article has covered a variety of topics related to building a scalable RSS feed pipeline. We started by discussing RSS feeds and how to scrape them using Python. We then explored the use of Apache Airflow for orchestrating the pipeline and scheduling tasks.

Next, we looked at how to use Kafka as a message broker to handle the data flow between the different components of the pipeline. We also examined the use of Kafka Connect to integrate Kafka with MongoDB and to enable easy data ingestion.

To visualize the data ingested into MongoDB, we built a simple Flask API with Jinja templates to render a web page with paginated news articles. We used Bootstrap to make the page responsive and added filtering capabilities based on the language of the news articles.

https://medium.com/@stefentaime_10958/building-a-scalable-rss-feed-pipeline-with-apache-airflow-kafka-and-mongodb-flask-api-da379cc2e3fb
