DEV Community

Cover image for Amazon MSK 101 with Python

Amazon MSK 101 with Python

Professionals familiar with microservices, as well as high throughput and low latency applications, have come to recognize the critical role of distributed event streaming services. Across a wide spectrum of uses, from IT applications to cutting-edge 5G Core Edge deployments, Kafka has become an indispensable tool for enhancing performance.

What is Kafka?

Apache Kafka is a robust platform for distributed event streaming, engineered to manage daily event volumes reaching into the trillions. Initially conceived as a messaging queue system, Kafka has evolved to support a broader range of workloads through continuous development.

Basic Apache Kafka and MSK Terminology

  • An Apache Kafka Cluster is made up of at least three server instances, known as Brokers.
  • For internal cluster configuration data storage and management, Apache Kafka employs Zookeeper. -Kafka organizes data records within Topics.
  • Records are written to topics by a Data Producer and read from them by a Data Consumer.
  • A single broker may host multiple topics.
  • Topics are segmented into Partitions, which are then distributed and replicated across various brokers.
  • The service Amazon Managed Streaming for Apache Kafka is commonly abbreviated as Amazon MSK.
  • Amazon MSK provides full compatibility with the native Apache Kafka APIs for both producers and consumers.
  • In the context of Amazon MSK, brokers are sometimes called Nodes.

collage

Main Use cases:

  • Messaging Preference: Apache Kafka is often likened to other messaging systems, such as ActiveMQ or RabbitMQ.
  • User Activity Monitoring: Kafka's initial application was for monitoring user behavior on websites.
  • Statistical Aggregation: Kafka serves as a tool for compiling statistics from various distributed services or applications.
  • Log Collection: Given its distributed nature, Kafka is well-equipped to manage the substantial throughput demands of log processing.
  • Data Stream Management: Kafka acts as the central framework in a data processing pipeline, facilitating data flow and processing before storage in topics.
  • State Management via Event Sourcing: Kafka is adept at maintaining application states, offering the advantage of data replayability when required.
  • Transaction Logging: As a ledger of transactions, Kafka is particularly valuable for distributed systems with high volumes of transactions.

Hands-on

In the following steps, I will explain the steps to create an Amazon MSK cluster, brokers, and a basic configuration and use case using a Python script to ingest Data into each broker.

Note: Big thanks to #cloudacademy and #AWSBites for sharing all this knowledge about Amazon MSK. I tried to get the best of both worlds to prepare this demo.

1. Create Amazon MSK Cluster

We start by selecting Amazon MSK Cluster from list of services in AWS Console, then select the "Create Cluster" option
Cluster

2. Select MSK Cluster Type

For this demo, I have selected:

  • "Provisioned type"
  • "2.21" Apache Kafka version (reason of this version to be explained later in the demo)
  • Broker type as "Kafka.m5.large"
  • 2vCPU, 8GiB and Up to 10Gbps
  • Storage of 100 GB.

Cluster Type

3. Acknowledge Cluster settings and limitations

This single table will summarize networking information of the MSK cluster to be created, highlighting in red the values we CANNOT edit after the cluster is launched. VPC and subnet dimensioning are critical to avoid re-deployments in the future.

Network and SW

Encryption

Finally we will click on "Create cluster"

4. Amazon MSK Cluster verification

Once the MSK cluster is completed (it may take up to 20 minutes, as the minimum number of brokers is 3 and will be distributed in different zones within the same region), we can examine the different features from this cluster

features

Monitoring: Amazon EKS handles the whole cluster monitoring by tracking Brokers' CPU, Disk, and Memory usage, among other KPIs.

Amazon MSK offers three tiers of Amazon CloudWatch metrics:

  • Basic Monitoring: Provides fundamental metrics at both the cluster and broker levels.
  • Enhanced Broker-Level Monitoring: Offers all the metrics from basic monitoring, plus additional detailed metrics for brokers.
  • Enhanced Topic-Level Monitoring: Builds upon enhanced broker-level monitoring by adding advanced metrics specific to topics.

monitoring

The "view Client Information" will allow us to check the host-port pairs to establish a connection to this cluster (both plain text & TLS)

- Bootstrap Servers:

  • Displays the broker addresses for TLS (secure) and Plaintext (non-secure) connections.
  • These details will be obtained later during the lab via the AWS CLI. - Zookeeper Connection:
  • Zookeeper manages and disseminates internal configuration data within the cluster.
  • Typically, direct interaction with Zookeeper is reserved for complex administrative and configuration efforts.

Image description

5. Amazon MSK Cluster Configuration

Warning:
Be mindful of the following constraints with Amazon MSK:

  • A maximum of thirty brokers for each Amazon MSK cluster.
  • A limit of ninety brokers across your AWS account.
  • A cap of one hundred cluster configurations per AWS account.
  • Broker storage limits:
    • Minimum storage capacity: 1 GiB
    • Maximum storage capacity: 16384 GiB

Pre-requisite:
For this part of the lab, you will need an EC2 instance within the same VPC and reachable to our AWS MSK cluster

EC2

We connect to this EC2 through Instance Connect (user ec2-user)

Instance_Connect

Next, I will install the Apache Kafka command-line tools on an EC2 instance and use them to create 1 topic in an Amazon MSK cluster.

wget https://clouda-labs-assets.s3-us-west-2.amazonaws.com/amazon-msk/kafka_2.12-2.4.0.tgz
tar -xzf kafka_2.12-2.4.0.tgz
mv kafka_2.12-2.4.0 kafka
export PATH="$PATH:/home/ec2-user/kafka/bin"
Enter fullscreen mode Exit fullscreen mode
[ec2-user@ip-10-0-0-79 ~]$ aws kafka list-clusters
{
    "ClusterInfoList": [
        {
            "BrokerNodeGroupInfo": {
                "BrokerAZDistribution": "DEFAULT",
                "ClientSubnets": [
                    "subnet-0167105441aa53e15",
                    "subnet-0d0a11e136154ee56",
                    "subnet-045aa3b9f6af13f05"
                ],
                "InstanceType": "kafka.t3.small",
                "SecurityGroups": [
                    "sg-02ff01498307ca426"
                ],
                "StorageInfo": {
                    "EbsStorageInfo": {
                        "VolumeSize": 5
                    }
                }
            },
            "ClusterArn": "arn:aws:kafka:us-west-2:610080016601:cluster/MSKCluster/59748c30-487e-4911-b4ab-84723ba21999-13",
            "ClusterName": "MSKCluster",
            "CreationTime": "2024-02-12T08:04:57.942Z",
            "CurrentBrokerSoftwareInfo": {
                "KafkaVersion": "2.2.1"
            },
            "CurrentVersion": "K3AEGXETSR30VB",
            "EncryptionInfo": {
                "EncryptionAtRest": {
                    "DataVolumeKMSKeyId": "arn:aws:kms:us-west-2:610080016601:key/d605ad91-f2d8-4d0f-8b1b-7d99336034fc"
                },
                "EncryptionInTransit": {
                    "ClientBroker": "TLS_PLAINTEXT",
                    "InCluster": true
                }
            },
            "EnhancedMonitoring": "DEFAULT",
            "OpenMonitoring": {
                "Prometheus": {
                    "JmxExporter": {
                        "EnabledInBroker": false
                    },
                    "NodeExporter": {
                        "EnabledInBroker": false
                    }
                }
            },
            "NumberOfBrokerNodes": 3,
            "State": "ACTIVE",
            "Tags": {
                "ca-environment": "production",
                "ca-laboratory-uuid": "9ca6bcad-eac8-40b7-b460-eb30b03e0b9b",
                "ca-environment-session-id": "2235578",
                "ca-creator": "system",
                "ca-external-user-id": "60b01d0fbdc40c0050f226ba",
                "ca-external-account-id": "5fb421b64b3602071b5a538f",
                "ca-scope": "lab",
                "ca-persistent": "false",
                "ca-environment-session-uuid": "aa8c6df6-073f-4219-86a8-7938b42d8437",
                "ca-laboratory-id": "682"
            },
            "ZookeeperConnectString": "z-2.mskcluster.xo9fsh.c13.kafka.us-west-2.amazonaws.com:2181,z-1.mskcluster.xo9fsh.c13.kafka.us-west-2.amazonaws.com:2181,z-3.mskcluster.xo9fsh.c13.kafka.us-west-2.amazonaws.com:2181"
        }
    ]
}

Enter fullscreen mode Exit fullscreen mode

You've added the bin directory with Apache Kafka command-line tools to your Linux shell's PATH variable. This lets you use these tools without typing the full path. The tools are installed here for lab convenience.

In real-world settings, choose the installation location carefully, considering conventions and who will manage the Kafka cluster.

AWS command-line interface to fetch the Amazon MSK cluster's broker addresses

[ec2-user@ip-10-0-0-79 ~]$ export PATH="$PATH:/home/ec2-user/kafka/bin"
[ec2-user@ip-10-0-0-79 ~]$ CLUSTER_ARN=$(aws kafka list-clusters --query "ClusterInfoList[0].ClusterArn" --output text)
[ec2-user@ip-10-0-0-79 ~]$ echo $CLUSTER_ARN
arn:aws:kafka:us-west-2:610080016601:cluster/MSKCluster/59748c30-487e-4911-b4ab-84723ba21999-13
Enter fullscreen mode Exit fullscreen mode

Commands to fetch Cluster Information status and verify it's "ACTIVE" before proceeding:

[ec2-user@ip-10-0-0-79 ~]$ CLUSTER_STATE=$(aws kafka list-clusters --query 'ClusterInfoList[0].State' | tr -d '"')
[ec2-user@ip-10-0-0-79 ~]$ while [ $CLUSTER_STATE != "ACTIVE" ]; do
>   echo $CLUSTER_STATE
>   sleep 10
>   CLUSTER_STATE=$(aws kafka list-clusters --query 'ClusterInfoList[0].State' | tr -d '"')
> done
[ec2-user@ip-10-0-0-79 ~]$ echo $CLUSTER_STATE
ACTIVE
Enter fullscreen mode Exit fullscreen mode

Collecting get-bootstrap-brokers information:

[ec2-user@ip-10-0-0-79 ~]$ aws kafka get-bootstrap-brokers --cluster-arn $CLUSTER_ARN
{
    "BootstrapBrokerString": "b-3.mskcluster.xo9fsh.c13.kafka.us-west-2.amazonaws.com:9092,b-1.mskcluster.xo9fsh.c13.kafka.us-west-2.amazonaws.com:9092,b-2.mskcluster.xo9fsh.c13.kafka.us-west-2.amazonaws.com:9092",
    "BootstrapBrokerStringTls": "b-3.mskcluster.xo9fsh.c13.kafka.us-west-2.amazonaws.com:9094,b-1.mskcluster.xo9fsh.c13.kafka.us-west-2.amazonaws.com:9094,b-2.mskcluster.xo9fsh.c13.kafka.us-west-2.amazonaws.com:9094"
}
[ec2-user@ip-10-0-0-79 ~]$ 
Enter fullscreen mode Exit fullscreen mode

5.1 Create a Topic

[ec2-user@ip-10-0-0-79 ~]$ kafka-topics.sh --list --bootstrap-server $BROKER_STRING
__amazon_msk_canary
__consumer_offsets
aggregated_data
raw_data
Enter fullscreen mode Exit fullscreen mode

You will see the raw unquoted broker string displayed.

Next, you will use the command-line tools and the broker string to list and create topics in the Amazon MSK cluster.

Find below command to create a topic with replication factor of 2

kafka-topics.sh --create \
  --bootstrap-server $BROKER_STRING \
  --topic raw_data \
  --partitions 1 \
  --replication-factor 2
Enter fullscreen mode Exit fullscreen mode

The Replication Factor determines the number of brokers across which a topic and its partitions are duplicated.

The replication factor should be more than one to ensure that the topic data remains accessible, even if a broker goes offline.

Use the following command to list your active topics:

[ec2-user@ip-10-0-0-79 ~]$ kafka-topics.sh --list --bootstrap-server $BROKER_STRING
__amazon_msk_canary
__consumer_offsets
raw_data
Enter fullscreen mode Exit fullscreen mode

I will use Faust 1.10.4 Python package for stream processing and ingesting data to 1 topic.

Faust is a Python library designed for creating event-streaming applications using Apache Kafka, drawing inspiration from Kafka's official Java stream processing library.

pip install faust==1.10.4
Enter fullscreen mode Exit fullscreen mode

Please choose any desired IDE tool to add the below Python script. In my case, I have used Theia-IDE for convenience.

Python file name: windowed_raw_data

from datetime import datetime, timedelta
from time import time
import random
import faust
import os

class RawModel(faust.Record):
    date: datetime
    value: float


class AggModel(faust.Record):
    date: datetime
    count: int
    mean: float

TOPIC = 'raw_data'
TABLE = 'tumbling_table'
BROKER_STRING = os.environ['BROKER_STRING'].replace(',', ';')
KAFKA = 'kafka://' + BROKER_STRING
CLEANUP_INTERVAL = 1.0
WINDOW = 10
WINDOW_EXPIRES = 1
PARTITIONS = 1

app = faust.App('windowed-agg', broker=KAFKA, version=1, topic_partitions=PARTITIONS)

app.conf.table_cleanup_interval = CLEANUP_INTERVAL
source = app.topic(TOPIC, value_type=RawModel)
sink = app.topic(SINK, value_type=AggModel)

def window_processor(key, events):
    timestamp = key[1][0]
    values = [event.value for event in events]
    count = len(values)
    mean = sum(values) / count

    print(
        f'processing window:'
        f'{len(values)} events,'
        f'mean: {mean:.2f},'
        f'timestamp {timestamp}',
    )

    sink.send_soon(value=AggModel(date=timestamp, count=count, mean=mean))

tumbling_table = (
    app.Table(
        TABLE,
        default=list,
        partitions=PARTITIONS,
        on_window_close=window_processor,
    )
    .tumbling(WINDOW, expires=timedelta(seconds=WINDOW_EXPIRES))
    .relative_to_field(RawModel.date)
)

@app.agent(source)
async def print_windowed_events(stream):
    async for event in stream:
        value_list = tumbling_table['events'].value()
        value_list.append(event)
        tumbling_table['events'] = value_list

@app.timer(0.1)
async def produce():
    await source.send(value=RawModel(value=random.random(), date=int(time())))


if __name__ == '__main__':
    app.main()

Enter fullscreen mode Exit fullscreen mode

This code snippet highlights a few key points:

  • A function named produce is designed to dispatch a random value ranging from zero to one to the raw_data topic.
    • In real-world scenarios, this function would handle actual data streams like page views, clicks, transactions, etc.
    • It generates a random value for the value attribute and a Unix timestamp for the date attribute of your RawModel.
  • The function is adorned with a timer decorator and utilizes the async keyword.
    • This enables it to run at regular intervals and concurrently with other asynchronous functions.

To create a topic for the table in your Faust script in the Amazon MSK cluster, in the Linux shell, enter the following command:

kafka-topics.sh --create \
  --bootstrap-server $BROKER_STRING \
  --topic windowed-agg-tumbling_table-changelog \
  --replication-factor 2 \
  --partitions 1

Enter fullscreen mode Exit fullscreen mode

To run your script, in the Linux shell, enter the following command:

[ec2-user@ip-10-0-0-79 ~]$ python3 windowed_raw_data.py worker

┌ƒaµS† v1.10.4┬──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ id          │ windowed-agg                                                                                                                                                                                                                             │
│ transport   │ [URL('kafka://b-1.mskcluster.xo9fsh.c13.kafka.us-west-2.amazonaws.com:9092'), URL('kafka://b-3.mskcluster.xo9fsh.c13.kafka.us-west-2.amazonaws.com:9092'), URL('kafka://b-2.mskcluster.xo9fsh.c13.kafka.us-west-2.amazonaws.com:9092')]  │
│ store       │ memory:                                                                                                                                                                                                                                  │
│ web         │ http://localhost:6066/                                                                                                                                                                                                                   │
│ log         │ -stderr- (warn)                                                                                                                                                                                                                          │
│ pid         │ 21260                                                                                                                                                                                                                                    │
│ hostname    │ ip-10-0-0-79.us-west-2.compute.internal                                                                                                                                                                                                  │
│ platform    │ CPython 3.7.16 (Linux x86_64)                                                                                                                                                                                                            │
│ drivers     │                                                                                                                                                                                                                                          │
│   transport │ aiokafka=1.1.6                                                                                                                                                                                                                           │
│   web       │ aiohttp=3.8.6                                                                                                                                                                                                                            │
│ datadir     │ /home/ec2-user/windowed-agg-data                                                                                                                                                                                                         │
│ appdir      │ /home/ec2-user/windowed-agg-data/v1                                                                                                                                                                                                      │
└─────────────┴──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
starting➢ 😊
[2024-02-12 09:44:30,078] [21260] [WARNING] processing window:56 events,mean: 0.48,timestamp 1707731060.0 
[2024-02-12 09:44:40,084] [21260] [WARNING] processing window:98 events,mean: 0.53,timestamp 1707731070.0 
[2024-02-12 09:44:50,090] [21260] [WARNING] processing window:99 events,mean: 0.45,timestamp 1707731080.0 
[2024-02-12 09:45:00,097] [21260] [WARNING] processing window:98 events,mean: 0.53,timestamp 1707731090.0 
Enter fullscreen mode Exit fullscreen mode

And...IT'S A WRAP!, During this small demo you:

  • Established a new cluster configuration for an Amazon MSK cluster
  • Set up Apache Kafka command-line tools on an EC2 instance
  • Utilized these tools to initiate topics within the Amazon MSK cluster.
  • Additionally, you developed a Python script to generate events, employing the Faust stream processing library designed for Apache Kafka.

You now start playing with different features, even testing the struggles after upgrade Kafka from one version to another!

Happy Learning!

Top comments (1)

Collapse
 
divya828 profile image
divya828 • Edited

What is the authentication mechanism used by the faust client in python application to authenticate itself to Kafka cluster via IAM?
I am trying to use kafka-python library to stream messages and authenticate the application using aws_msk_iam_sasl_signer but I'am unable to connect the complete flow for this.