Skip to content

Commit

Permalink
HBASE-19616 Review of LogCleaner Class
Browse files Browse the repository at this point in the history
  • Loading branch information
saintstack committed Feb 5, 2019
1 parent 5f8bdd5 commit af92322
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
Expand All @@ -36,7 +37,9 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
* This Chore, every time it runs, will attempt to delete the WALs and Procedure WALs in the old
Expand All @@ -45,7 +48,7 @@
*/
@InterfaceAudience.Private
public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class.getName());
private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class);

public static final String OLD_WALS_CLEANER_THREAD_SIZE = "hbase.oldwals.cleaner.thread.size";
public static final int DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE = 2;
Expand All @@ -55,16 +58,9 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
@VisibleForTesting
static final long DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC = 60 * 1000L;

public static final String OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC =
"hbase.oldwals.cleaner.thread.check.interval.msec";
@VisibleForTesting
static final long DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC = 500L;


private final LinkedBlockingQueue<CleanerContext> pendingDelete;
private List<Thread> oldWALsCleaner;
private long cleanerThreadTimeoutMsec;
private long cleanerThreadCheckIntervalMsec;

/**
* @param period the period of time to sleep between each run
Expand All @@ -81,8 +77,6 @@ public LogCleaner(final int period, final Stoppable stopper, Configuration conf,
this.oldWALsCleaner = createOldWalsCleaner(size);
this.cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
this.cleanerThreadCheckIntervalMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC);
}

@Override
Expand All @@ -97,35 +91,33 @@ public void onConfigurationChange(Configuration conf) {

int newSize = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
if (newSize == oldWALsCleaner.size()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Size from configuration is the same as previous which is " +
newSize + ", no need to update.");
}
LOG.debug("Size from configuration is the same as previous which "
+ "is {}, no need to update.", newSize);
return;
}
interruptOldWALsCleaner();
oldWALsCleaner = createOldWalsCleaner(newSize);
cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
cleanerThreadCheckIntervalMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC);
}

@Override
protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
List<CleanerContext> results = new LinkedList<>();
for (FileStatus toDelete : filesToDelete) {
CleanerContext context = CleanerContext.createCleanerContext(toDelete,
cleanerThreadTimeoutMsec);
if (context != null) {
pendingDelete.add(context);
results.add(context);
List<CleanerContext> results = new ArrayList<>();
for (FileStatus file : filesToDelete) {
LOG.trace("Scheduling file {} for deletion", file);
if (file != null) {
results.add(new CleanerContext(file));
}
}

LOG.debug("Old WAL files pending deletion: {}", results);
pendingDelete.addAll(results);

int deletedFiles = 0;
for (CleanerContext res : results) {
deletedFiles += res.getResult(cleanerThreadCheckIntervalMsec) ? 1 : 0;
LOG.trace("Awaiting the results for deletion of old WAL file: {}", res);
deletedFiles += res.getResult(this.cleanerThreadTimeoutMsec) ? 1 : 0;
}
return deletedFiles;
}
Expand All @@ -146,13 +138,8 @@ long getCleanerThreadTimeoutMsec() {
return cleanerThreadTimeoutMsec;
}

@VisibleForTesting
long getCleanerThreadCheckIntervalMsec() {
return cleanerThreadCheckIntervalMsec;
}

private List<Thread> createOldWalsCleaner(int size) {
LOG.info("Creating OldWALs cleaners with size=" + size);
LOG.info("Creating {} OldWALs cleaner threads", size);

List<Thread> oldWALsCleaner = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
Expand All @@ -167,102 +154,86 @@ private List<Thread> createOldWalsCleaner(int size) {

private void interruptOldWALsCleaner() {
for (Thread cleaner : oldWALsCleaner) {
LOG.trace("Interrupting thread: {}", cleaner);
cleaner.interrupt();
}
oldWALsCleaner.clear();
}

private void deleteFile() {
while (true) {
CleanerContext context = null;
boolean succeed = false;
boolean interrupted = false;
try {
context = pendingDelete.take();
if (context != null) {
FileStatus toClean = context.getTargetToClean();
succeed = this.fs.delete(toClean.getPath(), false);
}
} catch (InterruptedException ite) {
// It's most likely from configuration changing request
if (context != null) {
LOG.warn("Interrupted while cleaning oldWALs " +
context.getTargetToClean() + ", try to clean it next round.");
}
interrupted = true;
} catch (IOException e) {
// fs.delete() fails.
LOG.warn("Failed to clean oldwals with exception: " + e);
succeed = false;
} finally {
if (context != null) {
final CleanerContext context = pendingDelete.take();
Preconditions.checkNotNull(context);
FileStatus oldWalFile = context.getTargetToClean();
try {
LOG.debug("Attempting to delete old WAL file: {}", oldWalFile);
boolean succeed = this.fs.delete(oldWalFile.getPath(), false);
context.setResult(succeed);
} catch (IOException e) {
// fs.delete() fails.
LOG.warn("Failed to clean old WAL file", e);
context.setResult(false);
}
if (interrupted) {
// Restore interrupt status
Thread.currentThread().interrupt();
break;
}
} catch (InterruptedException ite) {
// It is most likely from configuration changing request
LOG.warn("Interrupted while cleaning old WALs, will "
+ "try to clean it next round. Exiting.");
// Restore interrupt status
Thread.currentThread().interrupt();
return;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Exiting cleaner.");
LOG.debug("Exiting");
}
}

@Override
public synchronized void cancel(boolean mayInterruptIfRunning) {
LOG.debug("Cancelling LogCleaner");
super.cancel(mayInterruptIfRunning);
for (Thread t : oldWALsCleaner) {
t.interrupt();
}
interruptOldWALsCleaner();
}

private static final class CleanerContext {

final FileStatus target;
volatile boolean result;
volatile boolean setFromCleaner = false;
long timeoutMsec;

static CleanerContext createCleanerContext(FileStatus status, long timeoutMsec) {
return status != null ? new CleanerContext(status, timeoutMsec) : null;
}
final AtomicBoolean result;
final CountDownLatch remainingResults;

private CleanerContext(FileStatus status, long timeoutMsec) {
private CleanerContext(FileStatus status) {
this.target = status;
this.result = false;
this.timeoutMsec = timeoutMsec;
this.result = new AtomicBoolean(false);
this.remainingResults = new CountDownLatch(1);
}

synchronized void setResult(boolean res) {
this.result = res;
this.setFromCleaner = true;
notify();
void setResult(boolean res) {
this.result.set(res);
this.remainingResults.countDown();
}

synchronized boolean getResult(long waitIfNotFinished) {
long totalTimeMsec = 0;
boolean getResult(long waitIfNotFinished) {
try {
while (!setFromCleaner) {
long startTimeNanos = System.nanoTime();
wait(waitIfNotFinished);
totalTimeMsec += TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos,
TimeUnit.NANOSECONDS);
if (totalTimeMsec >= timeoutMsec) {
LOG.warn("Spend too much time " + totalTimeMsec + " ms to delete oldwals " + target);
return result;
}
boolean completed = this.remainingResults.await(waitIfNotFinished,
TimeUnit.MILLISECONDS);
if (!completed) {
LOG.warn("Spend too much time [{}ms] to delete old WAL file: {}",
waitIfNotFinished, target);
return false;
}
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting deletion of " + target);
return result;
LOG.warn("Interrupted while awaiting deletion of WAL file: {}", target);
return false;
}
return result;
return result.get();
}

FileStatus getTargetToClean() {
return target;
}

@Override
public String toString() {
return "CleanerContext [target=" + target + ", result=" + result + "]";
}
}
}
Loading

0 comments on commit af92322

Please sign in to comment.