- Python 3.6+
- confluent-kafka[avro]
docker-compose up -d
pip install pipenv
pipenv install
pipenv shell
- Configuration properties for librdkafka
- Confluent producer configuration reference
- Confluent consumer configuration reference
- Python confluent-kafka client documentation
pip3 install .
This package implements producer/consumer interfaces for pushing messages to and reading messages from Kafka via AvroSerializer.
The example is available in this test
- Producer implementation
class MyProducer(Producer):
# kafka delivery callback handler
def delivery_report(self, err, msg, obj=None):
if err is not None:
# error handler
# code here
pass
else:
# success handler
# code here
pass
- Consumer implementation
class MyConsumer(Consumer):
# message handler overrider
def handle_message(self, topic: str, key, value):
# code here
pass
- Produce a message
producer = MyProducer(
{
"bootstrap.servers": "localhost:9092",
"schema.registry.url": "http://localhost:8081"
},
"<string(json string) avro schema of value>",
)
produce_result = producer.produce_async(
"test_topic",
{"name": "Johny", "age": 29},
)
- Consume a message
consumer = MyConsumer(
{
"bootstrap.servers": "localhost:9092",
"schema.registry.url": "http://localhost:8081",
"auto.offset.reset": "earliest",
"group.id": "default",
},
"<string(json string) avro schema of value>",
"test_topic",
)
consume_thread = Thread(target=consumer.consume_auto, daemon=True)
consume_thread.start()
consumer.shutdown()
consume_thread.join()
The example is available in this test
- Producer implementation
class MyProducer(Producer):
# kafka delivery callback handler
def delivery_report(self, err, msg, obj=None):
if err is not None:
# error handler
# code here
pass
else:
# success handler
# code here
pass
- Consumer implementation
class MyConsumer(Consumer):
# message handler overrider
def handle_message(self, topic: str, key, value, headers: dict):
# code here
pass
- Produce a message
producer = MyProducer(
{
"bootstrap.servers": "localhost:9092",
"schema.registry.url": "http://localhost:8081"
},
"<string(json string) avro schema of value>",
"<string(json string) avro schema of key>",
)
produce_result = producer.produce_async(
"test_topic",
{"name": "Johny", "age": 29},
key="<UUID>"  # optional (only needed when using a custom key schema)
)
- Consume a message
consumer = MyConsumer(
{
"bootstrap.servers": "localhost:9092",
"schema.registry.url": "http://localhost:8081",
"auto.offset.reset": "earliest",
"group.id": "default",
},
"<string(json string) avro schema of value>",
"test_topic",
"<string(json string) avro schema of key>",
)
consume_thread = Thread(target=consumer.consume_auto, daemon=True)
consume_thread.start()
consumer.shutdown()
consume_thread.join()
tox -e py
# or
cd messagebus
pytest -v -rPx