kata-ai/messagebus-kafka-python

Kafka Message Bus

Overview

Requirements

  • Python 3.6+
  • confluent-kafka[avro]

Dev Requirements

docker-compose up -d

pip install pipenv
pipenv install
pipenv shell

Documentation

  • Configuration properties in librdkafka
  • Confluent producer configuration
  • Confluent consumer configuration
  • Python confluent-kafka client

Installation

pip3 install .

Usage

This package provides producer and consumer APIs for publishing messages to and reading messages from Kafka, with payloads serialized via AvroSerializer.

Examples

Producer and Consumer V1

A full example is available in the repository's tests.

  • Producer implementation
class MyProducer(Producer):

    # Kafka delivery callback handler
    def delivery_report(self, err, msg, obj=None):
        if err is not None:
            # error handler
            # code here
            pass
        else:
            # success handler
            # code here
            pass
  • Consumer implementation
class MyConsumer(Consumer):

    # message handler overrider
    def handle_message(self, topic: str, key, value):
        # code here
        pass
  • Produce a message
producer = MyProducer(
    {
        "bootstrap.servers": "localhost:9092",
        "schema.registry.url": "http://localhost:8081"
    },
    "<string(json string) avro schema of value>",
)
produce_result = producer.produce_async(
    "test_topic",
    {"name": "Johny", "age": 29},
)
  • Consume a message
from threading import Thread

consumer = MyConsumer(
    {
        "bootstrap.servers": "localhost:9092",
        "schema.registry.url": "http://localhost:8081",
        "auto.offset.reset": "earliest",
        "group.id": "default",
    },
    "<string(json string) avro schema of value>",
    "test_topic",
)
consume_thread = Thread(target=consumer.consume_auto, daemon=True)
consume_thread.start()
# ... process messages, then stop the consumer
consumer.shutdown()
consume_thread.join()
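The schema placeholders above expect an Avro schema passed as a JSON string. A minimal sketch of a value schema matching the example record (the `namespace` and record `name` here are illustrative assumptions, not part of the package's API):

```python
import json

# A hypothetical Avro record schema for {"name": "Johny", "age": 29},
# serialized to the JSON string the constructors expect.
value_schema = json.dumps({
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
    ],
})
```

The resulting string can be passed wherever `"<string(json string) avro schema of value>"` appears in the examples.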

Producer and Consumer V2

A full example is available in the repository's tests.

  • Producer implementation
class MyProducer(Producer):

    # Kafka delivery callback handler
    def delivery_report(self, err, msg, obj=None):
        if err is not None:
            # error handler
            # code here
            pass
        else:
            # success handler
            # code here
            pass
  • Consumer implementation
class MyConsumer(Consumer):

    # message handler overrider
    def handle_message(self, topic: str, key, value, headers: dict):
        # code here
        pass
  • Produce a message
producer = MyProducer(
    {
        "bootstrap.servers": "localhost:9092",
        "schema.registry.url": "http://localhost:8081"
    },
    "<string(json string) avro schema of value>",
    "<string(json string) avro schema of key>",
)
produce_result = producer.produce_async(
    "test_topic",
    {"name": "Johny", "age": 29},
    key="<UUID>",  # optional (when using a custom key schema)
)
  • Consume a message
from threading import Thread

consumer = MyConsumer(
    {
        "bootstrap.servers": "localhost:9092",
        "schema.registry.url": "http://localhost:8081",
        "auto.offset.reset": "earliest",
        "group.id": "default",
    },
    "<string(json string) avro schema of value>",
    "test_topic",
    "<string(json string) avro schema of key>",
)
consume_thread = Thread(target=consumer.consume_auto, daemon=True)
consume_thread.start()
# ... process messages, then stop the consumer
consumer.shutdown()
consume_thread.join()
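In V2 the key schema is also passed as a JSON string. Assuming the key is a plain UUID rendered as text (as the `key="<UUID>"` placeholder suggests), an Avro primitive string schema is enough; a sketch:

```python
import json

# Avro primitive schema for a string key (e.g. a UUID rendered as text).
key_schema = json.dumps({"type": "string"})
```

This string can be passed wherever `"<string(json string) avro schema of key>"` appears in the V2 examples.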

Testing

tox -e py

# or

cd messagebus
pytest -v -rPx
