MINOR: Fix StreamsOptimizedTest (#9911)
We have seen recent system test timeouts associated with this test.
Analysis revealed an excessive amount of time spent searching
for test conditions in the logs.

This change addresses the issue by dropping some unnecessary
checks and using a more efficient log search mechanism.

Reviewers: Bill Bejeck <[email protected]>, Guozhang Wang <[email protected]>
vvcephei authored Jan 19, 2021
1 parent 4c6f900 commit be88f5a
Showing 1 changed file with 29 additions and 26 deletions.
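For context before the diff: the "more efficient log search mechanism" mentioned in the commit message replaces a monitor_log-based wait (which only watches output appended after the monitor is opened) with a single remote grep, bounded by --max-count 1, polled through ducktape's wait_until. A minimal sketch of the two styles, using hypothetical helper names and assuming a ducktape node like the ones driving this test:

    from ducktape.utils.util import wait_until

    # Old style (sketch): open a log monitor and wait for the pattern to appear
    # in output produced after the monitor was opened.
    def wait_for_pattern_with_monitor(node, log_file, pattern, timeout_sec=60):
        with node.account.monitor_log(log_file) as monitor:
            monitor.wait_until(pattern,
                               timeout_sec=timeout_sec,
                               err_msg="Never saw %s in %s" % (pattern, log_file))

    # New style (sketch): poll a remote grep that searches the whole file and
    # exits 0 as soon as the first match is found.
    def wait_for_pattern_with_grep(node, log_file, pattern, timeout_sec=60):
        wait_until(
            lambda: node.account.ssh("grep --max-count 1 '%s' %s" % (pattern, log_file),
                                     allow_fail=True) == 0,
            timeout_sec=timeout_sec,
            err_msg="Never saw %s in %s" % (pattern, log_file))

Because the grep scans the entire existing file rather than only newly appended output, the test now rotates the phase-one log, stdout, stderr, and config files (the mv ... .1 calls in the diff below), presumably so the optimized-phase checks cannot match output left over from the unoptimized run.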
55 changes: 29 additions & 26 deletions tests/kafkatest/tests/streams/streams_optimized_test.py
@@ -15,6 +15,7 @@
 
 import time
 from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.streams import StreamsOptimizedUpgradeTestService
 from kafkatest.services.streams import StreamsResetter
@@ -65,30 +66,41 @@ def test_upgrade_optimized_topology(self):
 
         processors = [processor1, processor2, processor3]
 
-        # produce records continually during the test
+        self.logger.info("produce records continually during the test")
         self.producer.start()
 
-        # start all processors unoptimized
+        self.logger.info("start all processors unoptimized")
         for processor in processors:
             self.set_topics(processor)
             processor.CLEAN_NODE_ENABLED = False
             self.verify_running_repartition_topic_count(processor, 4)
 
+        self.logger.info("verify unoptimized")
         self.verify_processing(processors, verify_individual_operations=False)
 
+        self.logger.info("stop unoptimized")
         stop_processors(processors, self.stopped_message)
 
+        self.logger.info("reset")
         self.reset_application()
+        for processor in processors:
+            processor.node.account.ssh("mv " + processor.LOG_FILE + " " + processor.LOG_FILE + ".1", allow_fail=False)
+            processor.node.account.ssh("mv " + processor.STDOUT_FILE + " " + processor.STDOUT_FILE + ".1", allow_fail=False)
+            processor.node.account.ssh("mv " + processor.STDERR_FILE + " " + processor.STDERR_FILE + ".1", allow_fail=False)
+            processor.node.account.ssh("mv " + processor.CONFIG_FILE + " " + processor.CONFIG_FILE + ".1", allow_fail=False)
 
-        # start again with topology optimized
+        self.logger.info("start again with topology optimized")
         for processor in processors:
             processor.OPTIMIZED_CONFIG = 'all'
             self.verify_running_repartition_topic_count(processor, 1)
 
+        self.logger.info("verify optimized")
         self.verify_processing(processors, verify_individual_operations=True)
 
+        self.logger.info("stop optimized")
         stop_processors(processors, self.stopped_message)
 
+        self.logger.info("teardown")
         self.producer.stop()
         self.kafka.stop()
         self.zookeeper.stop()
@@ -110,34 +122,25 @@ def verify_running_repartition_topic_count(processor, repartition_topic_count):
                                        % repartition_topic_count + str(processor.node.account))
 
     def verify_processing(self, processors, verify_individual_operations):
+        # This test previously had logic to account for skewed assignments, in which not all processors may
+        # receive active assignments. I don't think this will happen anymore, but keep an eye out if we see
+        # test failures here. If that does resurface, note that the prior implementation was not correct.
+        # A better approach would be to make sure we see processing of each partition across the whole cluster
+        # instead of just expecting to see each node perform some processing.
         for processor in processors:
-            if not self.all_source_subtopology_tasks(processor):
-                if verify_individual_operations:
-                    for operation in self.operation_pattern.split('\|'):
-                        self.do_verify(processor, operation)
-                else:
-                    self.do_verify(processor, self.operation_pattern)
+            if verify_individual_operations:
+                for operation in self.operation_pattern.split('\|'):
+                    self.do_verify(processor, operation)
             else:
-                self.logger.info("Skipping processor %s with all source tasks" % processor.node.account)
+                self.do_verify(processor, self.operation_pattern)
 
     def do_verify(self, processor, pattern):
         self.logger.info("Verifying %s processing pattern in STDOUT_FILE" % pattern)
-        with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
-            monitor.wait_until(pattern,
-                               timeout_sec=60,
-                               err_msg="Never saw processing of %s " % pattern + str(processor.node.account))
-
-    def all_source_subtopology_tasks(self, processor):
-        retries = 0
-        while retries < 5:
-            found = list(processor.node.account.ssh_capture("sed -n 's/.*current active tasks: \[\(\(0_[0-9], \)\{3\}0_[0-9]\)\].*/\1/p' %s" % processor.LOG_FILE, allow_fail=True))
-            self.logger.info("Returned %s from assigned task check" % found)
-            if len(found) > 0:
-                return True
-            retries += 1
-            time.sleep(1)
-
-        return False
+        self.logger.info(list(processor.node.account.ssh_capture("ls -lh %s" % (processor.STDOUT_FILE), allow_fail=True)))
+        wait_until(
+            lambda: processor.node.account.ssh("grep --max-count 1 '%s' %s" % (pattern, processor.STDOUT_FILE), allow_fail=True) == 0,
+            timeout_sec=60
+        )
 
     def set_topics(self, processor):
         processor.INPUT_TOPIC = self.input_topic

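The new comment in verify_processing also sketches a stricter check that this patch does not implement: confirm that every input partition is processed somewhere in the cluster, rather than expecting each node to perform some processing. A rough illustration of that idea, under the assumption (not taken from the test) that each processor's STDOUT could be made to tag processed records with a marker like "partition=<n>":

    def verify_all_partitions_processed(self, processors, num_partitions):
        # Hypothetical sketch: collect the partitions each node reports processing,
        # then require the union to cover the whole input topic.
        seen = set()
        for processor in processors:
            lines = processor.node.account.ssh_capture(
                "grep -o 'partition=[0-9]*' %s | sort -u" % processor.STDOUT_FILE,
                allow_fail=True)
            for line in lines:
                seen.add(int(line.strip().split('=')[1]))
        missing = set(range(num_partitions)) - seen
        assert not missing, "No processing observed for partitions %s" % sorted(missing)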