Apache Kafka is a distributed streaming platform used for building real-time data pipelines and streaming applications. This tutorial will guide you through the basics of using Kafka with Python, including setting up a Kafka broker, creating a producer, and building a consumer.
1. Install Required Libraries
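You need to install the confluent-kafka package (imported in Python as confluent_kafka) to interact with Kafka. It is Confluent's Python client, built on the librdkafka C library, and it provides producer, consumer, and admin client APIs.

```bash
pip install confluent-kafka
```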
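To confirm the installation, you can import the library and print its version along with the bundled librdkafka version. This is a minimal sketch: the helper name client_versions is hypothetical, and it returns None instead of raising when the package is missing.

```python
def client_versions():
    """Return (confluent_kafka version, librdkafka version), or None if not installed."""
    try:
        import confluent_kafka
    except ImportError:
        return None
    # version() and libversion() each return a (version_string, version_int) tuple
    return confluent_kafka.version()[0], confluent_kafka.libversion()[0]


versions = client_versions()
if versions is None:
    print("confluent_kafka is not installed")
else:
    print(f"confluent_kafka {versions[0]}, librdkafka {versions[1]}")
```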
2. Setting Up Kafka
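Before using Kafka with Python, you need a running Kafka broker. The steps below run Kafka in ZooKeeper mode, which is available in Kafka releases before 4.0. Kafka 4.0 and later run only in KRaft mode and no longer include ZooKeeper; with those releases, skip step 2.2 and follow the KRaft setup in that version's quickstart guide to format the storage directory and start the broker.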
2.1 Download Kafka
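Download a Kafka binary release from the Apache Kafka downloads page (a 3.x release if you want to follow the ZooKeeper steps below), extract the archive, and run the commands in this tutorial from the extracted directory.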
2.2 Start ZooKeeper
In ZooKeeper mode, Kafka stores its cluster metadata in ZooKeeper, which also coordinates the brokers. Start ZooKeeper using the provided script:

```bash
bin/zookeeper-server-start.sh config/zookeeper.properties
```
2.3 Start Kafka Broker
Once ZooKeeper is running, start the Kafka broker in a separate terminal:

```bash
bin/kafka-server-start.sh config/server.properties
```
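Before moving on, you can check from Python that the services are accepting connections. The sketch below simply retries a TCP connection until it succeeds or times out. It assumes the default ports from the bundled configuration files (2181 for ZooKeeper, 9092 for the broker), and the helper names wait_for_port and check_services are hypothetical. An open port only shows that something is listening, not that the broker is fully ready; in KRaft mode only the broker check applies.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Return True once a TCP connection to host:port succeeds, False after timeout seconds."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError if nothing is listening on the port
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


# Assumed default ports from config/zookeeper.properties and config/server.properties
SERVICES = {"ZooKeeper": 2181, "Kafka broker": 9092}


def check_services(host="localhost", timeout=5.0):
    """Map each service name to True if its port accepted a connection."""
    return {name: wait_for_port(host, port, timeout) for name, port in SERVICES.items()}
```

Calling `print(check_services())` prints a dict mapping each service name to True or False.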
3. Create a Kafka Topic
Before producing or consuming messages, create a topic. The following command creates test_topic with one partition and a replication factor of 1, which is enough for a single-broker development setup:

```bash
bin/kafka-topics.sh --create --topic test_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
```
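You can also create the topic from Python with the library's AdminClient. create_topics() is asynchronous and returns a dict mapping each topic name to a future, so the sketch below waits on each future and reports the outcome. The helper names report_topic_results and create_test_topic are hypothetical, and treating every exception as a failure (including "topic already exists") is a simplification.

```python
def report_topic_results(futures):
    """Wait on {topic: future} results and return {topic: error message or None}."""
    results = {}
    for topic, future in futures.items():
        try:
            future.result()  # blocks until the broker answers; raises on failure
            results[topic] = None
        except Exception as exc:
            results[topic] = str(exc)
    return results


def create_test_topic(bootstrap_servers="localhost:9092"):
    # Imported here so report_topic_results can be reused without the library installed
    from confluent_kafka.admin import AdminClient, NewTopic

    admin = AdminClient({"bootstrap.servers": bootstrap_servers})
    # Same settings as the kafka-topics.sh command above
    new_topic = NewTopic("test_topic", num_partitions=1, replication_factor=1)
    return report_topic_results(admin.create_topics([new_topic]))
```

With a broker running, `create_test_topic()` returns `{"test_topic": None}` on success or the error message otherwise.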
4. Kafka Producer Example
To produce messages to a Kafka topic, use the Producer class from the confluent_kafka library. produce() is asynchronous: it queues the message locally, and delivery reports reach your callback when you call poll() or flush().

```python
from confluent_kafka import Producer

# Callback invoked once per message with its delivery result
def delivery_report(err, msg):
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

# Create a Kafka Producer instance
producer = Producer({'bootstrap.servers': 'localhost:9092'})

# Produce a message (this only enqueues it for sending)
producer.produce('test_topic', key='key', value='Hello, Kafka!', callback=delivery_report)

# Wait for any outstanding messages to be delivered and delivery reports to be received
producer.flush()
```
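When sending many messages, a common pattern is to call poll(0) after each produce() so delivery callbacks are served as you go, and to back off with poll() when the local queue is full, which produce() signals by raising BufferError. The sketch below applies that pattern to JSON-encoded events; the helper names encode_event and produce_events, the event format, and the key_field parameter are hypothetical.

```python
import json


def encode_event(event):
    """Serialize a dict to UTF-8 JSON bytes with a stable key order."""
    return json.dumps(event, sort_keys=True).encode("utf-8")


def produce_events(producer, topic, events, key_field="id", callback=None):
    """Send each event keyed by event[key_field]; return the number still queued after flush."""
    for event in events:
        key = str(event[key_field])  # with the default partitioner, equal keys share a partition
        value = encode_event(event)
        while True:
            try:
                producer.produce(topic, key=key, value=value, callback=callback)
                break
            except BufferError:
                # Local queue is full: serve delivery callbacks to free space, then retry
                producer.poll(1.0)
        producer.poll(0)  # serve pending delivery callbacks without blocking
    return producer.flush(10.0)  # number of messages not yet delivered
```

With the producer from the example above, `produce_events(producer, 'test_topic', [{"id": 1, "text": "hello"}], callback=delivery_report)` sends one message keyed "1".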
5. Kafka Consumer Example
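To consume messages from a Kafka topic, use the Consumer class from the confluent_kafka library. The loop below polls until you stop it with Ctrl+C:

```python
from confluent_kafka import Consumer, KafkaException

# Create a Kafka Consumer instance
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my_group',          # consumers sharing a group.id split the topic's partitions
    'auto.offset.reset': 'earliest'  # start from the beginning if the group has no committed offset
})

# Subscribe to the topic
consumer.subscribe(['test_topic'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)  # wait up to 1 second for a message
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        print(f"Received message: {msg.value().decode('utf-8')} from topic {msg.topic()}")
except KeyboardInterrupt:
    pass  # stop cleanly on Ctrl+C
finally:
    # Commit final offsets (when auto-commit is enabled) and leave the group
    consumer.close()
```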
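By default the consumer commits offsets automatically in the background (enable.auto.commit is true), so an offset can be committed before your code has finished processing the message. If you need at-least-once processing, one option is to disable auto-commit and commit each message's offset only after handling it. The sketch below assumes a handler you supply and stop limits so the loop can end; the helper name consume_batch and its parameters are hypothetical.

```python
def consume_batch(consumer, handle, max_messages=10, timeout=1.0, max_empty_polls=5):
    """Handle up to max_messages, stopping after max_empty_polls empty polls in total.

    Assumes the consumer was created with 'enable.auto.commit': False and is already
    subscribed. Returns the number of messages handled.
    """
    handled = 0
    empty_polls = 0
    while handled < max_messages and empty_polls < max_empty_polls:
        msg = consumer.poll(timeout)
        if msg is None:
            empty_polls += 1
            continue
        if msg.error():
            # The main example raises KafkaException; RuntimeError keeps this sketch import-free
            raise RuntimeError(str(msg.error()))
        handle(msg.key(), msg.value())
        # Commit only after processing succeeded; synchronous so a failed commit raises here
        consumer.commit(message=msg, asynchronous=False)
        handled += 1
    return handled
```

To use it, create the consumer as above with `'enable.auto.commit': False` added to the configuration, subscribe, call `consume_batch(consumer, lambda k, v: print(k, v))`, and close the consumer afterwards.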
6. Summary
This tutorial covered the basics of using Kafka with Python. You learned how to set up Kafka, create a topic, and use Python to produce and consume messages. Kafka is a powerful tool for building real-time data applications, and Python provides an easy way to integrate with it using the confluent_kafka library.