-
Notifications
You must be signed in to change notification settings - Fork 1
/
message_workflow_v2_test.py
137 lines (122 loc) · 4.44 KB
/
message_workflow_v2_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import unittest
import time
from pathlib import Path
import random
from messagebus_kafka.admin import AdminApi
from messagebus_kafka.consumer_v2 import Consumer
from messagebus_kafka.producer_v2 import Producer
from threading import Thread
class MyConsumer(Consumer):
    """Test consumer that records the last message value it receives.

    ``received_message`` starts as ``None`` and is set by
    ``handle_message``; the test polls it to detect delivery.
    """

    def __init__(
        self,
        conf: dict,
        value_schema_str: str,
        topics: list,
        key_schema_str: str = None,
        batch_size: int = 5,
        logger=None,
    ):
        # All connection/deserialization wiring is handled by the
        # project's Consumer base class.
        super().__init__(
            conf, value_schema_str, topics, key_schema_str, batch_size, logger
        )
        # Last deserialized message value; None until a message arrives.
        self.received_message = None

    def handle_message(self, topic: str, key, value, headers: dict):
        """Store the received value and log the delivery details."""
        self.received_message = value
        self.log_debug("Message received for topic " + topic)
        self.log_debug("Key = {}".format(key))
        self.log_debug("Value = {}".format(value))
        self.log_debug("Headers = {}".format(headers))
class MyProducer(Producer):
    """Test producer that remembers the most recent delivery error."""

    def __init__(
        self,
        conf,
        value_schema_str: str,
        key_schema_str: str = None,
        logger=None,
        **kwargs,
    ):
        super().__init__(conf, value_schema_str, key_schema_str, logger, **kwargs)
        # No delivery error observed yet.
        self.error = None

    def delivery_report(self, err, msg, obj=None):
        """Delivery callback: print the outcome and capture any error."""
        if err is None:
            # Success path — report where the message landed.
            print(
                "MyProducer: Successfully produced to {} [{}] at offset {}".format(
                    msg.topic(), msg.partition(), msg.offset()
                )
            )
            return
        # Failure path — keep the error so the test can inspect it.
        self.error = err
        print("MyProducer: Error {}".format(err))
class MessageBusTest(unittest.TestCase):
    """End-to-end workflow test: produce one message and verify it is consumed.

    NOTE(review): requires a running Kafka broker on localhost:9092 and a
    schema registry on localhost:8081 — this is an integration test, not a
    unit test.
    """

    def __init__(self, methodName="runTest"):
        super().__init__(methodName)
        self.username = "username"
        self.password = "password"
        self.schema_registry_url = "http://localhost:8081"
        self.broker = "localhost:9092"
        # Repository root: this file lives one directory below it.
        self.script_location = Path(__file__).absolute().parent.parent
        self.conf = {
            "bootstrap.servers": self.broker,
        }
        # adminApi
        self.api = self._get_api()
        # Create two randomly-suffixed topics; the disjoint ranges keep the
        # two names distinct from each other.
        self.topic_test_1 = f"dev-python-messagebus-test{random.randint(1, 10)}"
        self.topic_test_2 = f"dev-python-messagebus-test{random.randint(11, 20)}"
        self.topics = [self.topic_test_1, self.topic_test_2]
        self.api.create_topics(self.topics)
        # create key and value schema
        self.val_schema = self._get_val_schema()
        self.producer = self._get_producer()
        self.consumer = self._get_consumer()

    def test_workflow(self):
        """Produce a message and wait (bounded) for the consumer to see it."""
        # Ensure the topics are deleted even if an assertion below fails;
        # the original only deleted them on the success path.
        self.addCleanup(self.api.delete_topics, self.topics)
        consume_thread = Thread(target=self.consumer.consume_auto, daemon=True)
        consume_thread.start()
        produce_result = self.producer.produce_async(
            self.topic_test_2,
            {"name": "Johny", "age": 29},
        )
        print("producer's produce_async result", produce_result)
        self.assertTrue(produce_result)
        # Bounded wait: the original looped forever when delivery never
        # happened; fail the test after 30 seconds instead of hanging.
        deadline = time.monotonic() + 30
        while self.consumer.received_message is None:
            if time.monotonic() >= deadline:
                self.consumer.shutdown()
                self.fail("Timed out waiting for the consumer to receive a message")
            time.sleep(1)
        self.consumer.shutdown()
        consume_thread.join()
        print("consumer's received_message", self.consumer.received_message)
        self.assertEqual(self.consumer.received_message["name"], "Johny")
        self.assertEqual(self.consumer.received_message["age"], 29)

    def _get_producer(self) -> MyProducer:
        """Build a MyProducer pointed at the local schema registry."""
        return MyProducer(
            {
                **self.conf,
                **{
                    "schema.registry.url": self.schema_registry_url,
                    # "on_delivery": on_delivery_callback, # you can use this item to catch the produce_sync callback
                },
            },
            self.val_schema,
        )

    def _get_consumer(self) -> MyConsumer:
        """Build a MyConsumer subscribed to the second test topic."""
        return MyConsumer(
            {
                **self.conf,
                **{
                    "auto.offset.reset": "earliest",
                    "group.id": "default",
                    "schema.registry.url": self.schema_registry_url,
                },
            },
            self.val_schema,
            [self.topic_test_2],
        )

    def _get_api(self) -> AdminApi:
        """Create the admin client and print the topics that already exist."""
        api = AdminApi(self.conf)
        for topic in api.list_topics():
            print("Topic {}".format(topic))
        return api

    def _get_val_schema(self) -> str:
        """Read the Avro value schema from the repo's schemas directory."""
        # pathlib read_text replaces the manual open/read pair.
        return (self.script_location / "schemas" / "johny_schema.avsc").read_text()