Combining computer vision and real-time data processing creates lots of interesting opportunities. Imagine a world where cameras, sensors, and intelligent algorithms work together to interpret and respond to the visual world as it unfolds, instantaneously. This is real-time computer vision, a field where the ability to capture and process visual data with lightning speed opens up a myriad of possibilities across industries.
This article explains how you can generate and process computer vision events in real time using Pipeless and Kafka. Pipeless is an open-source computer vision framework to build and deploy apps in minutes. Kafka is a popular open-source distributed event streaming platform.
If you prefer playing with the code to reading, you can go directly to the ready-to-run example in this doc section. It contains everything you need to get started, including a docker-compose.yaml to create a Kafka cluster locally and step-by-step instructions.
Previous articles explained how to create a Pipeless project, load a model to identify cats in a video, and draw bounding boxes over the video. That's cool, but not really useful. Today, you will learn how to connect Pipeless with Kafka so that, instead of drawing bounding boxes over the input video, you can react to events in real time. In this case, the event will be a cat appearing in the video.
In case you missed the previous articles, you can find them below:
For this demonstration you will process a local video file, which is not very practical for real-world applications. Future tutorials will cover how to use Pipeless to process video streams from remote URLs and RTMP/RTSP flows.
The following schema represents the architecture of what you will deploy:
In short, you will take an input video stream, analyze it with Pipeless to identify what appears on the video and export events to a Kafka topic. Those events can be consumed to take any required actions depending on the application.
As an example from a real-world scenario, consider a potato processing plant where a camera continuously monitors the incoming potatoes. If a potato with an incorrect color appears, the system will trigger an air flow mechanism to eject the problematic potato from the production line. Or you can also envision a restaurant tracking the time patrons spend at their tables. Here the events would be people arriving at or leaving a table.
From a high-level point of view, you can see Kafka as a distributed system that is split into topics. You can post messages to topics (produce), and you can consume messages from topics.
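To make the produce/consume model concrete, here is a toy, in-memory sketch. It is not Kafka and uses no Kafka API; the `ToyTopic` class and its methods are hypothetical names used only for illustration. It shows two properties that matter later in this article: messages are appended to a log, and reading them does not remove them, so each consumer tracks its own position (offset).

```python
from collections import defaultdict


class ToyTopic:
    """In-memory stand-in for a single topic (illustration only, not Kafka)."""

    def __init__(self, name):
        self.name = name
        self._log = []                      # append-only list of messages
        self._offsets = defaultdict(int)    # next position to read, per consumer

    def produce(self, message):
        """Append a message to the end of the log."""
        self._log.append(message)

    def consume(self, consumer_id):
        """Return the unread messages for this consumer and advance its offset."""
        start = self._offsets[consumer_id]
        unread = self._log[start:]
        self._offsets[consumer_id] = len(self._log)
        return unread


topic = ToyTopic("pipeless")
topic.produce("There is a cat!")
topic.produce("There is a cat!")

first_read = topic.consume("notifier")    # both messages
second_read = topic.consume("notifier")   # nothing new for this consumer
other_read = topic.consume("dashboard")   # a different consumer still sees both
```

A real Kafka topic adds partitions, replication, and retention policies on top of this idea, but the append-and-read-by-offset model is the part you need for the rest of the article.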
Pipeless provides a plugin to easily send messages to a Kafka topic. It takes care of configuring the Kafka client, so you just need to provide some environment variables. The Kafka plugin is included in the pipeless-ai-plugins package, which you can install with pip. Connecting to Kafka is not a complex task, and using the Kafka plugin is not mandatory. It is just a wrapper around the Kafka producer client that makes things even easier; nothing stops you from using the Kafka client directly instead.
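To give an idea of what "a wrapper around the Kafka producer client" can look like, here is a minimal sketch. It is not the plugin's actual implementation. It assumes the confluent-kafka Python client, and both the class name `SimpleKafkaProducer` and the environment variable `EXAMPLE_KAFKA_BOOTSTRAP_SERVERS` are hypothetical names chosen for this example.

```python
import os


class SimpleKafkaProducer:
    """Hypothetical thin wrapper that configures a Kafka producer from the environment."""

    def __init__(self, client_factory=None, env=os.environ):
        # Hypothetical variable name; the real plugin documents its own variables.
        servers = env.get("EXAMPLE_KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
        if client_factory is None:
            # Imported lazily so the class can be exercised without the library.
            from confluent_kafka import Producer  # pip install confluent-kafka
            client_factory = Producer
        self._client = client_factory({"bootstrap.servers": servers})

    def produce(self, topic, message):
        """Queue a text message for delivery to the given topic."""
        self._client.produce(topic, value=message.encode("utf-8"))
        self._client.poll(0)  # serve delivery callbacks without blocking

    def flush(self):
        """Block until all queued messages have been delivered."""
        self._client.flush()
```

The `client_factory` parameter exists only so the wrapper can be tested with a stand-in client; in normal use you would call `SimpleKafkaProducer()` with the environment variable set.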
You can find the whole documentation about the Kafka plugin in this doc section.
Let's re-create the cats application so that, instead of drawing bounding boxes, it detects events and sends them to Kafka.
You can find the whole application ready to run, including step-by-step instructions and all the required resources, such as the cats detection model and a docker-compose.yaml with a Kafka cluster, in this doc section.
Please clone that repo and move to the examples/kafka directory to easily follow the next sections.
Find the Pipeless installation instructions and requirements here.
This particular scenario does not require a video output. The original example modified the video frames on the fly and produced a new video as its output. Now, however, we are interested in the events, not in the video itself, so we will modify the project configuration to disable the output video.
The following is the whole content of the configuration file:

    input:
      address:
        host: localhost
        port: 1234
      video:
        enable: true
        uri: file:///home/path/pipeless/examples/kafka/cats.mp4
    log_level: INFO
    output:
      video:
        enable: false
    worker:
      n_workers: 1
IMPORTANT: Remember to edit the uri of the input video to set the absolute path to your local directory.
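If you are unsure what the absolute `file://` URI for your copy of the video looks like, a small helper like the one below can print it. This is only a convenience sketch using the Python standard library; the function name `to_file_uri` is hypothetical and not part of Pipeless.

```python
from pathlib import Path


def to_file_uri(path):
    """Return an absolute file:// URI for a local path.

    Relative paths are resolved against the current working directory.
    """
    return Path(path).resolve().as_uri()


# Run this from the examples/kafka directory and paste the result into the config.
print(to_file_uri("cats.mp4"))
```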
For reference, the following are the differences with the original cats example:
     output:
    -  address:
    -    host: localhost
    -    port: 1237
       video:
    -    enable: true
    -    uri: file:///home/example/path/pipeless/examples/cats/cats-output.mp4
    +    enable: false
As you can see, we also removed the output address section, since we will not have an output component. Instead, we will send the events to Kafka directly from our application code.
To load the Kafka plugin, we just need to import it from the plugins package and create the producer in the before hook of the Pipeless application:

    from pipeless_ai_plugins.kafka import KafkaProducer
    ...
    def before(self):
        self.producer = KafkaProducer()
    ...
Once the plugin is loaded, we send messages to our Kafka topic from the process hook with the following line:

    self.producer.produce('pipeless', 'There is a cat!')
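A plain string is enough for this demo, but consumers usually need more context than that. One option, sketched below, is to serialize a small JSON payload and pass the resulting string to the producer. The `build_event` helper and its field names are hypothetical and not part of Pipeless or the plugin; the sketch only assumes that the producer accepts a string message, as in the line above.

```python
import json
import time


def build_event(label, count, timestamp=None):
    """Serialize a detection event to a JSON string (field names are illustrative)."""
    event = {
        "label": label,
        "count": count,
        # Wall-clock time of the detection, in seconds since the epoch.
        "timestamp": time.time() if timestamp is None else timestamp,
    }
    return json.dumps(event)


# Inside the process hook this could replace the plain string, for example:
#   self.producer.produce('pipeless', build_event('cat', len(bounding_boxes)))
message = build_event("cat", 2, timestamp=1700000000.0)
```

On the consumer side, `json.loads` turns the message back into a dictionary.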
The following is the whole code of the new app:

    from pipeless_ai.lib.app.app import PipelessApp
    from pipeless_ai_plugins.kafka import KafkaProducer
    import cv2

    class App(PipelessApp):
        def before(self):
            # Create the Kafka producer and load the cat detection model once
            self.producer = KafkaProducer()
            self.xml_data = cv2.CascadeClassifier('cats.xml')

        def process(self, frame):
            model = self.xml_data

            # Create reduced frame for faster detection
            original_height, original_width, _ = frame.shape
            aspect_ratio = original_width / original_height
            reduced_width = 600
            reduced_height = int(reduced_width / aspect_ratio)
            reduced_frame = cv2.resize(frame, (reduced_width, reduced_height))
            bounding_boxes = model.detectMultiScale(reduced_frame, minSize = (30, 30))

            # Notify that there is a cat
            if len(bounding_boxes) > 0:
                self.producer.produce('pipeless', 'There is a cat!')
Note that the only differences with the original cats example are the following:
         def process(self, frame):
             ...
    -        # Draw the bounding boxes over the original frame
    -        for box in bounding_boxes:
    -            a, b, width, height = box
    -            # Recalculate bounding box for the original image
    -            a = int(a * (original_width / reduced_width))
    -            b = int(b * (original_height / reduced_height))
    -            width = int(width * (original_width / reduced_width))
    -            height = int(height * (original_height / reduced_height))
    -            cv2.rectangle(frame, (a, b), (a + width, b + height), (255, 0, 255), 2)
    +        # Notify that there is a cat
    +        if len(bounding_boxes) > 0:
    +            self.producer.produce('pipeless', 'There is a cat!')
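Note that, as written, the app produces one message for every frame that contains a cat, so a cat that stays in view for ten seconds of a 30 fps video generates roughly 300 identical messages. If what you want is a single event when a cat appears, one option is to remember the previous frame's state and notify only on the transition. The sketch below shows that idea in isolation; `AppearanceDetector` is a hypothetical helper, not part of Pipeless.

```python
class AppearanceDetector:
    """Reports True only on the frame where the object first becomes visible."""

    def __init__(self):
        self._was_present = False

    def update(self, is_present):
        """Record the current frame's state and return True on an absent-to-present change."""
        appeared = is_present and not self._was_present
        self._was_present = is_present
        return appeared


detector = AppearanceDetector()
# Simulated per-frame detection results: absent, present three times, absent, present
frames = [False, True, True, True, False, True]
events = [detector.update(present) for present in frames]
# events is True only at index 1 and index 5 (the two appearances)
```

In the app, you could create the detector in the before hook and call the producer only when `self.detector.update(len(bounding_boxes) > 0)` returns True. Real detectors can flicker between frames, so you may also want to require several consecutive detections before treating the cat as present.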
Finally, let's configure the Kafka plugin with our cluster address. It is as simple as exporting an environment variable:
And that's all you need!
Now let's start a local Kafka using the docker-compose.yaml file provided in the examples/kafka directory. We won't go into details here, since that is out of the scope of this article. Just run the following command from the example directory:
docker compose up
And let's run Pipeless to start processing our video and sending events:
The commands in this section must be executed within the Kafka container. Exec into the container by running:
docker compose exec kafka bash
The included docker-compose.yaml configures Kafka to create topics automatically. You can verify that the pipeless topic was created the first time the app wrote to it by running:
kafka-topics.sh --list --bootstrap-server localhost:9094
The example code only sends information to Kafka. It does not consume from the topic, so the topic still contains all the messages we have sent to it. It is your task to listen for messages on the Kafka topics and take actions based on them. This is out of the Pipeless scope, since each application has its own requirements for what to do with the events. Let's run a consumer to verify that the information is arriving at the topic:
kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic pipeless --from-beginning
Press Ctrl + C to stop the consumer.
Now it is your turn to complete your application by consuming messages from the Kafka topic, processing that information, and taking any required actions. A simple example could be to send yourself a notification when there is a cat in the video, along with the time at which it appeared.
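A minimal consumer along those lines could look like the sketch below. It assumes the confluent-kafka Python client (any Kafka client would work). The broker address you pass in, the `cat-notifier` group id, and the `notify` callback are assumptions you will need to adapt. Also note that the timestamp used here is the one Kafka attaches to the record, not the position in the video.

```python
from datetime import datetime, timezone


def consume_events(consumer, notify, max_messages=None, timeout=1.0):
    """Poll a subscribed consumer and call notify(text, when) for each message."""
    handled = 0
    while max_messages is None or handled < max_messages:
        msg = consumer.poll(timeout)
        if msg is None:        # nothing arrived within the timeout
            continue
        if msg.error():        # error reported by the client for this poll
            print(f"Consumer error: {msg.error()}")
            continue
        _, millis = msg.timestamp()   # (timestamp type, milliseconds since epoch)
        when = datetime.fromtimestamp(millis / 1000, tz=timezone.utc)
        notify(msg.value().decode("utf-8"), when)
        handled += 1
    return handled


def run(bootstrap_servers):
    """Wire up a real consumer; requires a reachable broker and confluent-kafka."""
    from confluent_kafka import Consumer  # pip install confluent-kafka

    consumer = Consumer({
        "bootstrap.servers": bootstrap_servers,  # address your broker advertises
        "group.id": "cat-notifier",              # hypothetical consumer group name
        "auto.offset.reset": "earliest",         # start from the oldest message
    })
    consumer.subscribe(["pipeless"])
    try:
        # Replace the print with your own notification logic.
        consume_events(consumer, lambda text, when: print(f"{when.isoformat()} {text}"))
    finally:
        consumer.close()
```

Calling `run(...)` with your broker address starts the loop; `consume_events` is kept separate so the handling logic can be tested without a running cluster.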
Finally, if you feel confident, feel free to contribute to the codebase via GitHub pull requests!