
Originally published at memgraph.com

Visualizing and Analyzing Reddit in Real-Time With Kafka and Memgraph

We held a company-wide hackathon where we challenged each other to build compelling, useful applications using a streaming data source, Kafka, Memgraph, and a Web Application backend. First up, visualizing and performing sentiment analysis on Reddit posts in real-time.

Our team spent Hackathon week building a graph application on top of streaming Reddit data. If you want to jump right to the code, check out the GitHub repo, and if you want to learn more about it, join our Discord Community Chat!

The Data Source

We knew that we wanted to experiment with some sentiment analysis algorithms on the graph we created, and the real-time requirement meant that we needed something with a good amount of text. Something where there might be feelings involved. Sound familiar? Reddit is a resource we all know and use, so it seemed like a great fit.

The Data Model

First, we defined the data model. We wanted to keep it as simple as possible.

There are three kinds of nodes in the graph:

  • SUBMISSION: A Reddit post that contains the properties title, body, created_at, id, url, and sentiment. The sentiment property denotes if the sentiment of the submission is positive, negative, or neutral.
  • COMMENT: Similar to the SUBMISSION nodes, comments contain the properties body, created_at, id, and sentiment.
  • REDDITOR: This node contains information about the user who posted the submission or comment. The only properties are id and name.

[Image: the Reddit graph data model]

Here's how to understand these relationships:

  • A SUBMISSION node is CREATED_BY a REDDITOR
  • A COMMENT node is CREATED_BY a REDDITOR
  • A COMMENT can be:
    • a REPLY_TO another COMMENT, or
    • a REPLY_TO a SUBMISSION.

And that's all!
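To make the model concrete, here's a small sketch of how you could query it once data starts flowing in. It uses the gqlalchemy Python client, which isn't part of this project, and assumes Memgraph is reachable on localhost:7687; the Cypher itself relies only on the labels and relationships described above.

from gqlalchemy import Memgraph

# Assumes a local Memgraph instance; adjust host/port for your setup.
memgraph = Memgraph(host="127.0.0.1", port=7687)

# Ten most-commented submissions (counting top-level comments) with their authors.
query = (
    "MATCH (c:COMMENT)-[:REPLY_TO]->(s:SUBMISSION)-[:CREATED_BY]->(r:REDDITOR) "
    "RETURN s.title AS title, r.name AS author, count(c) AS comments "
    "ORDER BY comments DESC LIMIT 10;")

for row in memgraph.execute_and_fetch(query):
    print(row["title"], row["author"], row["comments"])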

The Full App Architecture

Our app consists of five main services:

  1. reddit-stream: This Python script connects to the Reddit API using the PRAW library, retrieves new posts, and sends them to the Kafka cluster (a rough sketch of this producer follows the architecture diagram below).
  2. kafka: A Kafka cluster consisting of four topics. There are two consumers (one in memgraph and the other in the backend-app) and three producers (in memgraph, backend-app, and reddit-stream).
  3. memgraph: The graph analytics platform that stores the incoming Reddit data from Kafka and performs sentiment analysis for each comment and submission.
  4. backend-app: A Flask server that consumes the Kafka stream and sends it via WebSocket to the frontend-app.
  5. frontend-app: An Angular app that visualizes the Reddit network with the D3.js library.

[Image: the full application architecture]
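Here's a rough sketch of what the reddit-stream producer from step 1 might look like, using PRAW and the kafka-python client. The credentials, the subreddit name, and the Kafka address are placeholders, and the real service also streams comments and handles failures.

import json

import praw
from kafka import KafkaProducer

# Placeholder credentials: supply your own Reddit API keys.
reddit = praw.Reddit(client_id="YOUR_CLIENT_ID",
                     client_secret="YOUR_CLIENT_SECRET",
                     user_agent="reddit-stream")

producer = KafkaProducer(
    bootstrap_servers="kafka:9092",
    value_serializer=lambda value: json.dumps(value).encode("utf8"))

# Stream new submissions from the subreddit and forward them to the submissions topic.
for submission in reddit.subreddit("your_subreddit").stream.submissions(skip_existing=True):
    producer.send("submissions", {
        "id": submission.id,
        "title": submission.title,
        "body": submission.selftext,
        "url": submission.url,
        "created_at": submission.created_utc,
        "redditor": {"id": submission.author.id, "name": submission.author.name}})

The payload keys mirror the parameters that the transformation method shown later expects.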

Kafka Configuration

The cluster has four topics in total:

  • comments: When a new comment is posted to the subreddit, the reddit-stream service sends it to this topic.
  • submissions: When a new submission is posted to the subreddit, the reddit-stream service sends it to this topic.
  • created_objects: Used for objects newly created in Memgraph. A trigger fires whenever a new SUBMISSION or COMMENT node is created and publishes the new object to this topic.
  • node_deleter: We only want to retain Reddit data in Memgraph for a certain amount of time. This topic receives a timestamp that indicates what data to delete.
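As a rough illustration of the node_deleter topic, a small helper could periodically publish a cutoff timestamp like this. The ten-minute retention window, the publishing interval, and the Kafka address are all assumptions made for the sketch.

import json
import time

from kafka import KafkaProducer

RETENTION_SECONDS = 600  # assumed retention window of ten minutes

producer = KafkaProducer(
    bootstrap_servers="kafka:9092",
    value_serializer=lambda value: json.dumps(value).encode("utf8"))

while True:
    # Everything created before this timestamp becomes eligible for deletion.
    cutoff = time.time() - RETENTION_SECONDS
    producer.send("node_deleter", {"timestamp": cutoff})
    time.sleep(60)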

Processing Incoming Streaming Data

You can create native streams in Memgraph and connect them to Kafka topics. When messages arrive on these streams, Memgraph calls the transformation functions you specify with that data. This makes it possible to build sophisticated transformations on incoming data, and it's the key to turning streaming data into a live graph.
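For example, wiring the submissions topic to its transformation procedure might look roughly like this. The stream name, the module name reddit (assuming the transformation below is saved as reddit.py in Memgraph's query modules directory), and the broker address are assumptions, and the same statements can just as well be run from mgconsole or Memgraph Lab.

from gqlalchemy import Memgraph

memgraph = Memgraph(host="memgraph", port=7687)

# Connect the Kafka topic to the transformation procedure and start consuming.
memgraph.execute(
    "CREATE KAFKA STREAM submissions_stream "
    "TOPICS submissions "
    "TRANSFORM reddit.submissions "
    "BOOTSTRAP_SERVERS 'kafka:9092'")
memgraph.execute("START STREAM submissions_stream")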

Here is an example transformation method that receives a Reddit submission from the Kafka topic submissions and creates a
new SUBMISSION node in Memgraph as well as a REDDITOR node if the user
doesn't already exist.

import json

import mgp


@mgp.transformation
def submissions(messages: mgp.Messages
                ) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]):
    result_queries = []

    for i in range(messages.total_messages()):
        message = messages.message_at(i)
        submission_info = json.loads(message.payload().decode('utf8'))
        result_queries.append(
            mgp.Record(
                query=("CALL sentiment_analyzer.run($title) YIELD sentiment "
                       "CREATE (s:SUBMISSION {id: $id, title: $title, body: $body, url: $url, created_at: $created_at, sentiment: sentiment}) "
                       "MERGE (r:REDDITOR {id: $redditor_id, name: $redditor_name}) "
                       "CREATE (s)-[:CREATED_BY]->(r)"),
                parameters={
                    "title": submission_info["title"],
                    "body": submission_info["body"],
                    "url": submission_info["url"],
                    "created_at": submission_info["created_at"],
                    "id": submission_info["id"],
                    "redditor_id": submission_info["redditor"]["id"],
                    "redditor_name": submission_info["redditor"]["name"]}))

    return result_queries

Another interesting transformation is node_deleter(), which receives a timestamp from the Kafka topic node_deleter. It executes a Cypher query that deletes every submission and comment whose created_at property is older than the received timestamp, along with any node left without connections. This is what the query looks like:

...
query=("MATCH (n) "
       "WHERE n.created_at < $delete_limit OR degree(n) = 0 "
       "DETACH DELETE n"),
parameters={'delete_limit': delete_info['timestamp']}
...

Connecting The Backend

The backend server is implemented in Python using Flask. A Kafka
consumer regularly checks for new messages and sends them via WebSocket to all
connected clients. There is only one scenario where the server queries Memgraph directly: we want to visualize a few submissions and comments right after a user opens the app, so we can't wait for new ones to be posted on the subreddit.
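A condensed sketch of that consumer loop might look like the following. It uses kafka-python and Flask-SocketIO as stand-ins (the actual project may use a different WebSocket layer), and the topic name and addresses are assumptions.

import json
from threading import Thread

from flask import Flask
from flask_socketio import SocketIO
from kafka import KafkaConsumer

app = Flask(__name__)
socketio = SocketIO(app, cors_allowed_origins="*")


def consume_created_objects():
    # Read the graph objects created in Memgraph and push them to all connected clients.
    consumer = KafkaConsumer(
        "created_objects",
        bootstrap_servers="kafka:9092",
        value_deserializer=lambda value: json.loads(value.decode("utf8")))
    for message in consumer:
        socketio.emit("new_node", message.value)


if __name__ == "__main__":
    Thread(target=consume_created_objects, daemon=True).start()
    socketio.run(app, host="0.0.0.0", port=5000)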

The API Endpoints

The web application contains the following endpoints:

  • /test (GET): This is just a testing endpoint to see if the WebSocket connection is up and running. If you don't see anything in the Angular client (on localhost:4200), check this endpoint to make sure that messages are actually arriving.
  • /api/graph (GET): This endpoint returns 10 nodes from Memgraph so they can be visualized (a sketch of this handler follows the list). Why is this necessary? Because otherwise, there wouldn't be any rendered nodes in the app when it's started. If the subreddit isn't active, you could be looking at a blank screen for quite some time.
  • /connect (WEBSOCKET): This is where the client can connect to the WebSocket and receive live updates of new submissions and comments.
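For instance, the /api/graph handler could be as simple as the following sketch, which pulls ten nodes out of Memgraph with the gqlalchemy client (again a stand-in; the original backend may use a different driver).

from flask import Flask, jsonify
from gqlalchemy import Memgraph

app = Flask(__name__)
memgraph = Memgraph(host="memgraph", port=7687)


@app.route("/api/graph", methods=["GET"])
def get_graph():
    # Fetch a handful of nodes so the visualization isn't empty on startup.
    results = memgraph.execute_and_fetch(
        "MATCH (n) RETURN n.id AS id, labels(n) AS labels LIMIT 10")
    return jsonify(list(results))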

Conclusion

The idea behind the hackathon was to see how easy it would be to take off-the-shelf components and wire them together to create a graph on top of incoming streaming data. It's a testament to the excellent open-source ecosystem that it was actually quite easy!

If we had more time, we would probably explore more advanced algorithms on the data, like community detection, PageRank, betweenness centrality, or online link prediction.

If this sounds cool to you, check out the GitHub repo, and let us know what you think in our Discord Community Chat!

Read more about real-time analytics on memgraph.com
