KAFKA-6624; Prevent concurrent log flush and log deletion #4663

Merged. 2 commits merged into apache:trunk on Mar 13, 2018.

Conversation

lindong28 (Member)

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@ijuma requested a review from junrao on March 8, 2018 16:56
@junrao (Contributor) commented Mar 8, 2018

@lindong28 : Thanks for the patch. The fix has the correct logic. I am not sure about the performance impact, though. In particular, Log.append() also synchronizes on the lock, so with this change any log flush will block the producer, which is not ideal.
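To illustrate the concern (a minimal sketch of the lock-sharing pattern described above, not the actual Log.scala code; the method bodies are hypothetical placeholders):

```scala
// Sketch of why flushing under the same lock that append() takes stalls producers.
class SketchLog {
  private val lock = new Object

  def append(records: Array[Byte]): Unit = lock synchronized {
    writeToSegment(records) // produce requests go through here
  }

  def flush(): Unit = lock synchronized {
    // While the (potentially slow) fsync is in progress, every append() is blocked.
    fsyncSegments()
  }

  private def writeToSegment(records: Array[Byte]): Unit = ()
  private def fsyncSegments(): Unit = ()
}
```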

@junrao (Contributor) commented Mar 12, 2018

@lindong28 : Since we are going to do another RC for 1.1, do you think you could provide a new patch in the next day or two so that we can include it in 1.1? cc @dguy

@lindong28 (Member, Author)

@junrao Sorry for the late reply. Certainly, I will provide a new patch today.

Just to double check, we would like to use a flag to avoid blocking produce/fetch when the flush is going on, right?

@lindong28 (Member, Author)

Last week I was mostly focused on the other bug related to the negative high watermark.

@junrao (Contributor) commented Mar 12, 2018

@lindong28 : Thanks. Having a separate flag is one way to avoid blocking the publishers during flushing. But there may be other ways too.
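One possible shape of that flag-based idea (a rough sketch under my own assumptions, not the patch itself and not necessarily what is meant here): the flush path marks the log as busy, and the deletion path skips or retries logs that are currently being flushed, so append() and flush() do not have to share a lock.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Rough sketch of a flag-based approach; all names are hypothetical.
class FlaggedLog {
  private val flushing = new AtomicBoolean(false)

  def flush(): Unit = {
    flushing.set(true)
    try fsyncSegments()    // placeholder for fsyncing the dirty segments
    finally flushing.set(false)
  }

  // Called by the background deletion task; returns false so the caller can
  // retry later when a flush is in progress. A real implementation would also
  // have to close the check-then-act race here, e.g. by re-checking the flag
  // under a small lock shared only by these two paths.
  def tryDelete(): Boolean = {
    if (flushing.get) false
    else {
      deleteSegmentFiles() // placeholder for removing the log directory
      true
    }
  }

  private def fsyncSegments(): Unit = ()
  private def deleteSegmentFiles(): Unit = ()
}
```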

@lindong28 (Member, Author)

@junrao I have updated the patch so that a log is only deleted fileDeleteDelayMs after it is scheduled for deletion. This approach enforces the existing fileDeleteDelayMs delay for log deletion uniformly. fileDeleteDelayMs is 60 seconds by default, so as long as log.flush for a given log takes less than 60 seconds, this approach should prevent concurrent log flush and log deletion.

```scala
if (removedLog != null) {
  try {
    val waitingTimeMs = scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
    if (waitingTimeMs > 0)
      Thread.sleep(waitingTimeMs)
```
@tedyu (Contributor)

There may be more than one entry in logsToBeDeleted. Can we pick an entry whose wait time is short so that the while loop is more efficient?

@lindong28 (Member, Author) commented Mar 13, 2018

@tedyu Thanks for the comment. This is a FIFO queue and each log should be deleted with the same delay. So the first element in the queue should always have the smallest wait time.
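As a small illustration of that FIFO property (illustrative only, not the patch): entries are enqueued with the current time as their schedule time, so schedule times are non-decreasing in queue order and the head always has the smallest remaining wait.

```scala
import java.util.concurrent.LinkedBlockingQueue

object DeletionQueueSketch {
  // (log name, scheduleTimeMs); the real queue holds (Log, Long).
  val logsToBeDeleted = new LinkedBlockingQueue[(String, Long)]()

  def markForDeletion(logName: String, nowMs: Long): Unit =
    logsToBeDeleted.add((logName, nowMs))

  // Remaining wait for the head of the queue, if any. Since enqueue times are
  // non-decreasing, no later entry can have a smaller remaining wait.
  def headWaitMs(fileDeleteDelayMs: Long, nowMs: Long): Option[Long] =
    Option(logsToBeDeleted.peek()).map { case (_, scheduleTimeMs) =>
      math.max(0L, scheduleTimeMs + fileDeleteDelayMs - nowMs)
    }
}
```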

@junrao (Contributor) left a comment

@lindong28 : Thanks for the updated patch. One more comment below.

```scala
if (removedLog != null) {
  try {
    val waitingTimeMs = scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
```
@junrao (Contributor)

Hmm, if you look at the code at line 726, we already have the logic of delaying the deletion by currentDefaultConfig.fileDeleteDelayMs. So it's kind of weird to delay it again here.

I am also not sure this completely solves the problem, since people may customize fileDeleteDelayMs. It would be useful to have a solution that works regardless of fileDeleteDelayMs and the flush time.

@lindong28 (Member, Author) commented Mar 12, 2018

@junrao Thanks for the comment, Jun. Line 726 only determines when the broker next runs the deletion task. It is still possible for a file to be scheduled for deletion right before the next run of the task, in which case the file would be deleted almost immediately instead of waiting for file.delete.delay.ms. So it seems reasonable to make this change to correctly enforce file.delete.delay.ms according to its documentation.

Yeah, the delay does not completely solve the problem. In the rare case that log.flush takes more than 60 seconds, the broker will still see an IOException. To prevent this completely, we can either use a flag to avoid the IOException, or grab the lock when flushing the log. If we have time, I would recommend we do the performance test with the lock-based approach; in general we need to do such a performance benchmark for each major release anyway. If we don't have time, and we think it is important to completely prevent this (after we have the 60-second delay), I will implement the flag-based approach. What do you think?
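For a concrete feel of the numbers (illustrative values only, plugged into the waitingTimeMs formula from the patch): suppose file.delete.delay.ms is the default 60000 ms, a log is scheduled for deletion at t = 59 s, and the periodic delete task happens to fire at t = 60 s.

```scala
// Worked example with illustrative values.
val fileDeleteDelayMs = 60000L
val scheduleTimeMs    = 59000L  // when the log was enqueued for deletion
val nowMs             = 60000L  // when the delete task runs
val waitingTimeMs = scheduleTimeMs + fileDeleteDelayMs - nowMs
// waitingTimeMs == 59000: the per-entry wait preserves the full ~60 s delay,
// whereas relying only on the task's schedule would have deleted the log
// after roughly one second.
```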

@junrao (Contributor)

@lindong28 : Thanks for the explanation. It makes sense. In the interest of time, we can just take what you had for now and work on a more complete fix in a separate jira.

@junrao (Contributor) left a comment

@lindong28 : Thanks for the explanation. It makes sense. In the interest of time, we can just take what you had for now and work on a more complete fix in a separate jira. A few minor comments below.

```diff
@@ -75,7 +75,7 @@ class LogManager(logDirs: Seq[File],
   // from one log directory to another log directory on the same broker. The directory of the future log will be renamed
   // to replace the current log of the partition after the future log catches up with the current log
   private val futureLogs = new Pool[TopicPartition, Log]()
-  private val logsToBeDeleted = new LinkedBlockingQueue[Log]()
+  private val logsToBeDeleted = new LinkedBlockingQueue[(Log, Long)]()
```
@junrao (Contributor)

Could we add a comment on what the long value in the queue means?

@lindong28 (Member, Author)

Sure. It is fixed now.
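For reference, the resulting declaration presumably carries a comment along these lines (paraphrased, not quoted from the final code):

```scala
// The Long is the time (from time.milliseconds()) at which the log was scheduled
// for deletion, so the delete task can enforce file.delete.delay.ms relative to
// that schedule time.
private val logsToBeDeleted = new LinkedBlockingQueue[(Log, Long)]()
```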

```diff
@@ -260,7 +260,7 @@ class LogManager(logDirs: Seq[File],
     logDirFailureChannel = logDirFailureChannel)

   if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
-    this.logsToBeDeleted.add(log)
+    this.logsToBeDeleted.add((log, time.milliseconds()))
```
@junrao (Contributor)

Could we add a private method that does the above statement and reuse it?

@lindong28 (Member, Author)

Sure. It is fixed now.
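The reused helper is presumably something like the following (a sketch of what the fix likely added to LogManager; the exact name and form may differ):

```scala
// Enqueue a log for deletion together with the time at which it was scheduled,
// so the delete task can wait out file.delete.delay.ms before removing the files.
private def addLogToBeDeleted(log: Log): Unit =
  this.logsToBeDeleted.add((log, time.milliseconds()))
```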

@junrao (Contributor) left a comment

@lindong28 : Thanks for the patch. LGTM

@junrao merged commit 1ea07b9 into apache:trunk on Mar 13, 2018
junrao pushed a commit that referenced this pull request Mar 13, 2018
KAFKA-6624; Prevent concurrent log flush and log deletion

Reviewers: Ted Yu <[email protected]>, Jun Rao <[email protected]>
@junrao (Contributor) commented Mar 13, 2018

Merged to trunk and 1.1 branch. CC @dguy

@lindong28 deleted the KAFKA-6624 branch on March 14, 2018 19:13
isolis pushed a commit to linkedin/kafka that referenced this pull request Sep 13, 2018