Build a Distributed Streaming System with Apache Kafka and Python

Mwaleh Muturi

What is Apache Kafka?

Kafka is an open-source distributed streaming platform that simplifies data integration between systems. A stream is a pipeline through which your applications receive data continuously. As a streaming platform, Kafka has two primary uses:

  • Data integration: Kafka captures streams of events or data changes and feeds them to other data systems such as relational databases, key-value stores, or data warehouses.
  • Stream processing: Kafka accepts each stream of events and stores it in an append-only queue called a log. Because information in the log is immutable, Kafka enables continuous, real-time processing and transformation of these streams and makes the results available system-wide (sketched below).
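
To make the log-and-offset idea concrete, here is a minimal sketch in plain Python. This is an analogy, not Kafka code:

# Conceptually, a Kafka log is an append-only sequence of records
log = []
log.append('event-1')   # producers only ever append to the end
log.append('event-2')
# Each consumer tracks its own offset: how far into the log it has read
offset = 0
while offset < len(log):
    print(log[offset])  # records are read in order and never modified
    offset += 1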

Compared to other technologies, Kafka has better throughput, built-in partitioning, replication, and fault tolerance, which makes it a good solution for large-scale message-processing applications.

Kafka system has three main components:

  1. A producer: the service that emits the source data.
  2. A broker: Kafka acts as an intermediary between the producer and the consumer, using APIs to receive and broadcast data.
  3. A consumer: the service that uses the data the broker broadcasts.

You can find more info on Kafka's official site.

Project

We are going to build a simple streaming application that streams a video file from our producer and displays it in a web browser. This project aims to showcase data integration and stream processing properties of Kafka.

Project requirements:

This project introduces the basics of Kafka and messaging. Basic knowledge of Python is needed.

Installing Kafka

  • Install Kafka on macOS by running brew install kafka

    • After installation, start it with brew services start kafka
  • Linux users can follow the installation instructions from here.
  • Kafka listens on port 9092 by default

Setting up:

Our project will consist of:

  • A video as our source of data. I recommend a .mp4 file smaller than 5 MB for this project.
  • A simple producer that sends video frames to Kafka
  • A consumer that fetches the data and displays it in a web browser
  • Kafka as the broker

Create a project directory:

$ mkdir kafka && cd kafka

Create a virtualenv and activate it:

$ virtualenv env && source env/bin/activate

Install the required dependencies: we need kafka-python, OpenCV, and Flask.

pip install kafka-python opencv-python Flask
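
Before going further, you can sanity-check that the broker is reachable from Python. This is an optional sketch, assuming Kafka is already running on localhost:9092:

# check_broker.py -- optional sanity check (the file name is illustrative)
from kafka import KafkaConsumer

# Connecting raises an error if no broker is listening on localhost:9092
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
print(consumer.topics())  # prints the set of topics the broker knows about
consumer.close()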

Creating the Producer

A producer is a service that sends messages to the Kafka broker. One thing to note: the producer is not concerned with the various systems that will eventually consume or load the broadcast data.

Let's create it.

Create a producer.py file and add this code.

# producer.py

import time
import cv2
# KafkaProducer replaces the old SimpleProducer/KafkaClient pair,
# which was removed from recent kafka-python releases
from kafka import KafkaProducer

# Connect to Kafka
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# Assign a topic
topic = 'my-topic'

Creating the Message:

The message will consist of images sent in binary form. OpenCV enables us to read our movie file and convert each frame into bytes before sending it to Kafka. We need a function that takes in the video file, reads it frame by frame, and converts each frame into bytes for Kafka. For this tutorial, place the video file in the same folder as the producer.

Sending the message:

Kafka messages are byte strings, so each image needs to be encoded before it is sent.

Here is the full producer code.

# producer.py

import time
import cv2
# KafkaProducer replaces the old SimpleProducer/KafkaClient pair,
# which was removed from recent kafka-python releases
from kafka import KafkaProducer

# Connect to Kafka
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# Assign a topic
topic = 'my-topic'

def video_emitter(video):
    # Open the video file
    video = cv2.VideoCapture(video)
    print('emitting.....')

    # Read the file frame by frame
    while video.isOpened():
        # Read the image in each frame
        success, image = video.read()
        # Stop once the file has been read to the end
        if not success:
            break
        # Encode the frame as PNG
        ret, png = cv2.imencode('.png', image)
        # Convert the image to bytes and send it to Kafka
        producer.send(topic, png.tobytes())
        # Sleep for 0.2 seconds to reduce CPU usage
        time.sleep(0.2)
    # Release the capture
    video.release()
    # Make sure every buffered frame reaches the broker before exiting
    producer.flush()
    print('done emitting')

if __name__ == '__main__':
    video_emitter('video.mp4')

Great! We are done with the producer.
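
If you want to confirm that frames actually reached the broker before building the consumer, here is a quick hedged check (the file name and message limit are illustrative):

# check_topic.py -- peek at the first few frames on the topic
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',  # start reading from the beginning of the log
    consumer_timeout_ms=5000,      # stop iterating if no message arrives for 5s
)
for i, msg in enumerate(consumer):
    print('frame', i, '->', len(msg.value), 'bytes')
    if i >= 4:  # five frames are enough to confirm delivery
        break
consumer.close()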

Creating the Consumer

The consumer is a service that listens to and consumes messages from Kafka brokers. Our consumer should listen for messages on the topic my-topic and display them. We shall use Flask, a Python microframework, to display the received video images.

Continuous Listening:

Updates and new messages from the broker are fetched by continuously listening to what is broadcast. A generator is used to keep the connection open: a generator is a function that yields a sequence of results lazily instead of returning a single value. Since images are streamed sequentially, our response will use the multipart/x-mixed-replace MIME type.
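
If generators are new to you, here is a minimal standalone example of the idea, unrelated to Kafka:

# A generator function yields values one at a time, on demand,
# instead of returning once and exiting
def count_up(limit):
    for i in range(limit):
        yield i

for value in count_up(3):
    print(value)  # prints 0, then 1, then 2

Flask can stream such a generator to the browser, which is exactly what our consumer does below.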

Here is the consumer.py code.

from flask import Flask, Response
from kafka import KafkaConsumer

# Connect to the Kafka server and pass the topic we want to consume
consumer = KafkaConsumer('my-topic', group_id='view',
                         bootstrap_servers=['localhost:9092'])

app = Flask(__name__)

@app.route('/')
def index():
    # Return a multipart response
    return Response(kafkastream(),
                    mimetype='multipart/x-mixed-replace; boundary=frame')

# Continuously listen to the broker and yield each message as it is received
def kafkastream():
    for msg in consumer:
        yield (b'--frame\r\n'
               b'Content-Type: image/png\r\n\r\n' + msg.value + b'\r\n\r\n')

if __name__ == '__main__':
    app.run(host='0.0.0.0', debug=True)

Running the program

Make sure Kafka is running (start it with brew services start kafka if needed).

Next, open two terminals.

  • In the first terminal, run the producer:
  (env)$ python producer.py

  • In the second terminal, run the consumer:
  (env)$ python consumer.py

This runs our Flask web server.

Next, open your browser and navigate to http://localhost:5000.

Observations

  1. Reloading the browser does not restart the video. Kafka uses message offsets to track how far into the log the consumer has read.
  2. If the browser is closed while the video is playing, the next time you reopen it and navigate to the link, the video picks up from where it left off.
  3. The producer does not need to be running for the video to play. Kafka persists the messages and makes them available when the consumer is ready to receive them.
  4. When both the producer and the consumer are running, the images are received almost in real time.
  5. The video processing is sequential.
  6. Because Kafka persists the log, multiple consumers can share the same messages, so the producer never has to send the images more than once.
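
Observations 1 and 2 follow from Kafka's offset tracking. If you ever want the consumer to replay the video from the start, one option is to rewind its offsets. A hedged sketch, not part of the project code:

# Rewind a consumer to the beginning of the log
from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic', group_id='view',
                         bootstrap_servers=['localhost:9092'])
consumer.poll(timeout_ms=1000)  # poll once so partitions get assigned
consumer.seek_to_beginning()    # move the read position back to offset 0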

Where to use Kafka:

  • Microservices: Kafka is a natural conduit for services that need to communicate with each other continuously and asynchronously.
  • Databases: one way to avoid dumping whole databases into data warehouses is to create Kafka producers and consumers that detect and save only the changes made to the databases.
  • Data ingestion: producers embedded in websites collect click events or page views in real time (see the sketch after this list).
  • Sensor and device data
  • Stock tickers
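
As an example of the data-ingestion case, here is a hedged sketch of a producer sending JSON click events. The topic name 'clicks' and the event fields are made up for illustration:

# click_producer.py -- illustrative click-event producer
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    # Serialize each event dict to JSON bytes before sending
    value_serializer=lambda event: json.dumps(event).encode('utf-8'),
)
producer.send('clicks', {'page': '/home', 'referrer': '/search'})
producer.flush()  # make sure the event leaves the client's buffer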

Conclusion:

Kafka is a fast, scalable, and easy-to-use distributed streaming system. To use the system, you need to know:

  • The topic your producer will publish to the brokers, or
  • The topic your consumer will listen to when the broker publishes information.

We created a simple streaming application that demonstrates the advantages of streaming data, how fast it is, and how Kafka works as a broker.

I hope you now have a picture of how a streaming system works.

Mwaleh Muturi

A coding enthusiast with a wild curiosity about how things work. Loves exploring and explaining stuff.