Introduction
In today's world, real-time data processing and visualization play a critical role in decision-making for many applications. This article showcases the development of an air quality monitoring platform that integrates Apache Kafka and Couchbase Capella for real-time data capture and management, leveraging Streamlit to deploy a visually engaging interface.
Development
1. Setting up Couchbase Capella
Couchbase Capella is a high-performance NoSQL database ideal for real-time data storage. Follow these steps to configure it:
- Sign up at Couchbase Capella.
- Create a cluster and select a deployment region.
- Configure a bucket named TopicosAppU3.
- Generate the required credentials:
User: your_user
Password: your_password
Cluster endpoint: your_endpoint
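Once the cluster, bucket, and credentials are in place, a quick connectivity check from Python can save debugging time later. This is a minimal sketch assuming the Couchbase Python SDK 4.x and the placeholder credentials from the list above:

# Minimal connectivity check for Couchbase Capella (Python SDK 4.x).
# Replace the placeholders with the credentials generated above.
from datetime import timedelta

from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions

auth = PasswordAuthenticator("your_user", "your_password")
cluster = Cluster("couchbases://your_endpoint", ClusterOptions(auth))

# Block until the cluster is reachable; raises an exception on failure.
cluster.wait_until_ready(timedelta(seconds=10))
print("Connected to Capella")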
2. Configuring Apache Kafka on Confluent Cloud
Apache Kafka is a distributed messaging system that enables real-time data transmission. To set it up:
- Sign up at Confluent Cloud.
- Create a basic cluster.
- Set up a topic named calidad-del-aire.
- Generate API credentials:
API Key: your_api_key
API Secret: your_api_secret
Bootstrap server: your_bootstrap_server
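Before wiring Kafka into the app, it is worth confirming that the API credentials actually authenticate. A minimal sketch using confluent-kafka's AdminClient to list the cluster's topics, with the same placeholders as above:

# Sanity check for the Confluent Cloud credentials: list the cluster's topics.
from confluent_kafka.admin import AdminClient

admin = AdminClient({
    'bootstrap.servers': "your_bootstrap_server",
    'sasl.mechanisms': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': "your_api_key",
    'sasl.password': "your_api_secret",
})

# list_topics() queries cluster metadata; the timeout guards against hangs.
metadata = admin.list_topics(timeout=10)
print("Topics:", list(metadata.topics.keys()))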
3. Project Development
The project uses Python along with the streamlit, couchbase, and confluent-kafka libraries to integrate Couchbase and Kafka.
Dependency Configuration
Ensure you install the necessary dependencies with:
pip install streamlit couchbase confluent-kafka pandas
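For the deployment step described later, Streamlit Community Cloud reads dependencies from a requirements.txt file in the repository, so it is convenient to record the same packages there. A minimal version, mirroring the pip command above:

# requirements.txt
streamlit
couchbase
confluent-kafka
pandas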
Code Implementation
The code is structured as follows:
- Connecting to Couchbase:
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions
from couchbase.auth import PasswordAuthenticator

# Authenticate against Capella with the credentials created earlier.
cluster = Cluster(
    "couchbases://your_endpoint",
    ClusterOptions(PasswordAuthenticator("your_user", "your_password"))
)

# Open the bucket and its default collection for reads and writes.
bucket = cluster.bucket("TopicosAppU3")
collection = bucket.default_collection()
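As a quick smoke test, you can round-trip a document through the collection. The document ID test_doc here is just an illustrative placeholder, not part of the application's data model:

# Hypothetical smoke test: write a document and read it back.
collection.upsert("test_doc", {"status": "ok"})
result = collection.get("test_doc")
print(result.content_as[dict])   # -> {'status': 'ok'}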
- Configuring Kafka:
from confluent_kafka import Producer, Consumer

# Producer: publishes air quality readings to Confluent Cloud over SASL_SSL.
producer = Producer({
    'bootstrap.servers': "your_bootstrap_server",
    'sasl.mechanisms': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': "your_api_key",
    'sasl.password': "your_api_secret",
})

# Consumer: reads the same topic; 'earliest' replays messages from the
# beginning the first time this consumer group connects.
consumer = Consumer({
    'bootstrap.servers': "your_bootstrap_server",
    'sasl.mechanisms': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': "your_api_key",
    'sasl.password': "your_api_secret",
    'group.id': 'streamlit-consumer',
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(["calidad-del-aire"])
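Note that produce() is asynchronous, so delivery failures only surface through a callback. A minimal sketch of such a callback (the name delivery_report is our own, not part of the library):

# Hypothetical delivery callback: confluent-kafka invokes it once per message
# when the broker acknowledges (or rejects) the write.
def delivery_report(err, msg):
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [partition {msg.partition()}]")

# Usage: pass it when producing, then flush to wait for acknowledgements.
# producer.produce("calidad-del-aire", value=payload, callback=delivery_report)
# producer.flush()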
- Streamlit Interface:
import json
import random

import streamlit as st

# Simulate a sensor reading for a random Peruvian city.
def generate_data():
    return {
        "City": random.choice(["Lima", "Arequipa", "Cusco"]),
        "PM2.5": round(random.uniform(5, 150), 2),
        "PM10": round(random.uniform(10, 300), 2),
        "Humidity (%)": random.randint(30, 90),
        "Temperature (°C)": round(random.uniform(15, 35), 1),
    }

st.title("Air Quality Monitoring Platform")

if st.button("Generate and Save Data"):
    data = generate_data()
    collection.upsert("last_change", data)  # persist the latest reading in Couchbase
    st.json(data)
    # Serialize as JSON rather than str(dict) so consumers can parse it safely.
    producer.produce("calidad-del-aire", value=json.dumps(data))
    producer.flush()
    st.success("Data sent to Kafka.")

if st.button("Consume Kafka Data"):
    message = consumer.poll(1.0)
    if message is not None and message.error() is None:
        # json.loads replaces eval(), which would execute arbitrary input.
        st.json(json.loads(message.value().decode('utf-8')))
    else:
        st.warning("No messages available.")
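The pandas dependency installed earlier can turn consumed messages into a chart. A minimal sketch, assuming readings are accumulated in st.session_state (the key readings is our own invention):

import pandas as pd
import streamlit as st

# Hypothetical extension: accumulate consumed readings and plot PM levels.
if "readings" not in st.session_state:
    st.session_state.readings = []

# After a successful consumer.poll(), append the decoded dict, e.g.:
# st.session_state.readings.append(data)

if st.session_state.readings:
    df = pd.DataFrame(st.session_state.readings)
    st.dataframe(df)                      # tabular view of all readings
    st.line_chart(df[["PM2.5", "PM10"]])  # particulate trends over time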
4. Deploying with Streamlit Community Cloud
- Publish your code in a GitHub repository.
- Connect the repository to Streamlit Community Cloud.
- Configure the required environment variables in the Streamlit settings page (a sketch of how the app reads them appears after this list):
COUCHBASE_USER=your_user
COUCHBASE_PASSWORD=your_password
KAFKA_BOOTSTRAP_SERVERS=your_bootstrap_server
- GitHub repository: the project's source code is available on GitHub.
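Inside the app, those values can be read from the environment instead of being hard-coded, which keeps secrets out of the GitHub repository. A minimal sketch using os.environ (Streamlit Community Cloud also exposes values from its Secrets manager through st.secrets):

import os

# Read deployment credentials from environment variables so that no
# secrets are committed to source control.
COUCHBASE_USER = os.environ["COUCHBASE_USER"]
COUCHBASE_PASSWORD = os.environ["COUCHBASE_PASSWORD"]
KAFKA_BOOTSTRAP_SERVERS = os.environ["KAFKA_BOOTSTRAP_SERVERS"]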
Conclusion
This project demonstrates how to integrate Apache Kafka and Couchbase Capella to capture and visualize real-time data. With Kafka managing data transmission and Couchbase providing persistent storage, the application is scalable, efficient, and easy to implement using Streamlit. This combination is ideal for modern applications requiring real-time data processing and visualization.