October 13, 2024

Kafka Tutorial in Python

Apache Kafka is a distributed streaming platform used for building real-time data pipelines and streaming applications. This tutorial will guide you through the basics of using Kafka with Python, including setting up a Kafka broker, creating a producer, and building a consumer.

1. Install Required Libraries

Install the confluent_kafka library, Confluent's Python client for Kafka, which is built on the librdkafka C library. It provides producer, consumer, and admin APIs.

pip install confluent_kafka
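To check that the package is visible to the Python interpreter you will run the examples with, you can query its installed version using only the standard library. This is a small sketch: the helper name installed_confluent_kafka_version is our own, and it assumes the PyPI distribution is named confluent-kafka (pip treats confluent_kafka and confluent-kafka as the same package name).

```python
from importlib import metadata
from typing import Optional


def installed_confluent_kafka_version() -> Optional[str]:
    """Return the installed confluent-kafka version, or None if it is missing."""
    try:
        # PyPI distribution name (hyphenated); the import name is confluent_kafka
        return metadata.version("confluent-kafka")
    except metadata.PackageNotFoundError:
        return None


version = installed_confluent_kafka_version()
if version is None:
    print("confluent-kafka is not installed; run: pip install confluent-kafka")
else:
    print(f"confluent-kafka {version} is installed")
```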

2. Setting Up Kafka

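Before using Kafka with Python, you need a running Kafka broker. The steps below run Kafka in ZooKeeper mode, which Kafka 3.x releases support; newer releases run in KRaft mode without ZooKeeper (Kafka 4.0 removed ZooKeeper support), so follow the official quickstart for those. Follow these steps: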

2.1 Download Kafka

Download a Kafka 3.x release from the Apache Kafka downloads page, extract the archive, and run the commands below from the extracted directory.

2.2 Start ZooKeeper

In ZooKeeper mode, Kafka relies on ZooKeeper to store cluster metadata and coordinate the brokers, for example to elect the controller. Start ZooKeeper using the provided script:

bin/zookeeper-server-start.sh config/zookeeper.properties

2.3 Start Kafka Broker

Once ZooKeeper is running, start the Kafka broker in a separate terminal:

bin/kafka-server-start.sh config/server.properties
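The broker can take a few seconds to start listening after the script is launched, so a script that runs the Python examples may want to wait for its port first. The sketch below is a hypothetical helper (wait_for_port is not part of Kafka or confluent_kafka) that uses only the standard library. An open port only shows that something is listening on localhost:9092, not that the broker has finished initializing.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0) -> bool:
    """Poll until a TCP connection to host:port succeeds or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # create_connection raises OSError (e.g. connection refused) if nothing is listening
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            time.sleep(0.5)  # back off briefly before trying again
    return False
```

For example, wait_for_port("localhost", 9092) returns True once the broker is listening, or False after 30 seconds.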

3. Create a Kafka Topic

Create a topic before producing or consuming messages. The following command creates test_topic with one partition and a replication factor of 1, which is the most a single local broker can support:

bin/kafka-topics.sh --create --topic test_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
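If you prefer to create topics from Python, confluent_kafka also provides an AdminClient whose create_topics() method returns a dictionary mapping each topic name to a future. The sketch below wraps that call and waits for each result. The helper name create_topics and its return format are our own, and the admin argument can be any object with the same interface, which keeps the helper testable without a broker.

```python
from concurrent.futures import Future
from typing import Dict, Iterable, Optional


def create_topics(admin, new_topics: Iterable) -> Dict[str, Optional[BaseException]]:
    """Request topic creation and wait for each result.

    admin is expected to behave like confluent_kafka.admin.AdminClient, whose
    create_topics() returns a dict mapping topic name -> Future.
    Returns a dict mapping topic name -> None on success, or the raised exception.
    """
    futures: Dict[str, Future] = admin.create_topics(list(new_topics))
    results: Dict[str, Optional[BaseException]] = {}
    for topic, future in futures.items():
        try:
            future.result()  # blocks until the broker responds; raises on failure
            results[topic] = None
        except Exception as exc:  # for example, the topic already exists
            results[topic] = exc
    return results


# Usage against the local broker (requires confluent_kafka):
# from confluent_kafka.admin import AdminClient, NewTopic
# admin = AdminClient({'bootstrap.servers': 'localhost:9092'})
# print(create_topics(admin, [NewTopic('test_topic', num_partitions=1, replication_factor=1)]))
```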

4. Kafka Producer Example

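To produce messages to a Kafka topic, use the Producer class from the confluent_kafka library. Sending is asynchronous: produce() queues the message, and the delivery report callback runs later, when the producer serves events during poll() or flush():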

from confluent_kafka import Producer

# Callback function to handle delivery reports
def delivery_report(err, msg):
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

# Create a Kafka Producer instance
producer = Producer({'bootstrap.servers': 'localhost:9092'})

# Queue a message for sending; produce() returns without waiting for delivery
producer.produce('test_topic', key='key', value='Hello, Kafka!', callback=delivery_report)

# Wait for any outstanding messages to be delivered and delivery reports to be received
producer.flush()
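The example above sends a single string. For several structured records, a common pattern is to serialize each record to bytes, call poll(0) after each produce() so delivery callbacks are served, and retry when produce() raises BufferError because the local send queue is full. The sketch below assumes JSON is an acceptable message format for your consumers; produce_json_records is a hypothetical helper name, and producer can be the Producer created above or any object with the same produce(), poll(), and flush() methods.

```python
import json
from typing import Any, Callable, Iterable, Optional, Tuple


def produce_json_records(
    producer,
    topic: str,
    records: Iterable[Tuple[str, Any]],
    on_delivery: Optional[Callable] = None,
    flush_timeout: float = 10.0,
) -> int:
    """Send (key, record) pairs as UTF-8 JSON and return the count still undelivered."""
    for key, record in records:
        value = json.dumps(record).encode("utf-8")  # serialize the record to bytes
        while True:
            try:
                producer.produce(topic, key=key, value=value, on_delivery=on_delivery)
                break
            except BufferError:
                # Local send queue is full: serve delivery callbacks to free space, then retry
                producer.poll(1.0)
        # Serve delivery callbacks for earlier produce() calls without blocking
        producer.poll(0)
    # flush() waits for outstanding messages and returns how many are still queued
    return producer.flush(flush_timeout)


# Usage with the producer and delivery_report defined above (requires a running broker):
# remaining = produce_json_records(
#     producer, 'test_topic',
#     [('user-1', {'action': 'login'}), ('user-2', {'action': 'logout'})],
#     on_delivery=delivery_report,
# )
```

A non-zero return value means some messages were still queued when the flush timeout expired.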

5. Kafka Consumer Example

To consume messages from a Kafka topic, use the Consumer class from the confluent_kafka library:

from confluent_kafka import Consumer, KafkaException

# Create a Kafka Consumer instance
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my_group',
    'auto.offset.reset': 'earliest'
})

# Subscribe to the topic
consumer.subscribe(['test_topic'])

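try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue  # no message arrived within the timeout
        if msg.error():
            raise KafkaException(msg.error())
        print(f"Received message: {msg.value().decode('utf-8')} from topic {msg.topic()}")
except KeyboardInterrupt:
    pass  # stop consuming on Ctrl+C
finally:
    # Commit final offsets (auto-commit is enabled by default) and leave the consumer group
    consumer.close()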
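By default the consumer commits offsets automatically in the background, so an offset can be committed before your code has finished processing the message. If you need at-least-once processing, one option is to disable auto-commit and commit each message's offset only after it has been handled. The sketch below assumes a consumer created with 'enable.auto.commit': False; consume_with_manual_commit and its handle callback are hypothetical names, and unlike the loop above it logs message errors and keeps polling instead of raising. If handle() raises, the offset is not committed, so the message is redelivered after a restart.

```python
from typing import Callable, Optional


def consume_with_manual_commit(
    consumer,
    handle: Callable[[Optional[bytes]], None],
    max_messages: int,
    poll_timeout: float = 1.0,
    max_empty_polls: int = 10,
) -> int:
    """Process up to max_messages, committing each offset only after handle() succeeds.

    consumer is expected to behave like confluent_kafka.Consumer (poll(), commit()).
    Stops early after max_empty_polls consecutive polls return nothing.
    Returns the number of messages processed.
    """
    processed = 0
    empty_polls = 0
    while processed < max_messages and empty_polls < max_empty_polls:
        msg = consumer.poll(poll_timeout)
        if msg is None:
            empty_polls += 1
            continue
        empty_polls = 0
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        handle(msg.value())
        # Synchronously commit this message's offset (+1) after successful processing
        consumer.commit(message=msg, asynchronous=False)
        processed += 1
    return processed


# Usage (requires confluent_kafka and a running broker):
# from confluent_kafka import Consumer
# consumer = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'my_group',
#                      'auto.offset.reset': 'earliest', 'enable.auto.commit': False})
# consumer.subscribe(['test_topic'])
# try:
#     consume_with_manual_commit(consumer, print, max_messages=100)
# finally:
#     consumer.close()
```

Committing synchronously after every message is simple but slow; committing every N messages is a common refinement, at the cost of reprocessing up to N messages after a crash.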

6. Summary

This tutorial covered the basics of using Kafka with Python. You learned how to start a local Kafka broker, create a topic, and use Python to produce and consume messages. Kafka is well suited to building real-time data pipelines, and the confluent_kafka library makes it straightforward to integrate Kafka into Python applications.