
KAFKA-3968: fsync the parent directory of a segment file when the file is created #10405

Merged Apr 3, 2021 · 27 commits
Changes from 1 commit
459facb
[KAFKA-3968] fsync the parent directory of a segment file when the fi…
ccding Mar 25, 2021
b4c8284
move import
ccding Mar 26, 2021
b260169
address comments (except the topic partition one)
ccding Mar 29, 2021
ba086e9
remove import
ccding Mar 29, 2021
2a19e0e
reuse the function in utils.java
ccding Mar 30, 2021
40a1abe
simplify logic
ccding Mar 30, 2021
1ac80b6
default changeFileSuffixes flush to true
ccding Mar 30, 2021
09cac0b
flush when mkdirs
ccding Mar 30, 2021
5be95aa
revert accidental change
ccding Mar 30, 2021
c9448c8
atomicMoveWithFallback
ccding Mar 30, 2021
daeb698
no flush parent dir in test
ccding Mar 30, 2021
0d4800b
check null pointer
ccding Mar 31, 2021
95a6c3f
fix unit test error
ccding Mar 31, 2021
8c859f3
set flag after flush
ccding Apr 1, 2021
fdc1faa
disable flushing on renameTo
ccding Apr 1, 2021
6795ec9
address comments based on offline discussion with Jun
ccding Apr 1, 2021
55ae3bc
Merge branch 'trunk' into fsync
ccding Apr 1, 2021
e653af4
check hadCleanShutdown for open FileRecord
ccding Apr 1, 2021
85861ee
address comments
ccding Apr 1, 2021
1578678
fix default values
ccding Apr 1, 2021
fffc353
more default value
ccding Apr 1, 2021
f66c545
do flush in the LogSegment class
ccding Apr 2, 2021
56be9d8
Merge branch 'trunk' into fsync
ccding Apr 2, 2021
1ecf94b
remove parameter from FileRecord.open
ccding Apr 2, 2021
080a79a
default to false
ccding Apr 2, 2021
61eee4a
add param to javadoc
ccding Apr 2, 2021
7543938
during flush -> during the next flush
ccding Apr 2, 2021
do flush in the LogSegment class
ccding committed Apr 2, 2021
commit f66c545aa3d12ca06176bdcc3deff3b2f08b600b
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -33,7 +33,6 @@
 import java.nio.file.StandardOpenOption;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;

 /**
@@ -51,7 +50,6 @@ public class FileRecords extends AbstractRecords implements Closeable {
     private final AtomicInteger size;
     private final FileChannel channel;
     private volatile File file;
-    private final AtomicBoolean needsFlushParentDir;

     /**
      * The {@code FileRecords.open} methods should be used instead of this constructor whenever possible.
@@ -61,15 +59,13 @@ public class FileRecords extends AbstractRecords implements Closeable {
                 FileChannel channel,
                 int start,
                 int end,
-                boolean isSlice,
-                boolean needsFlushParentDir) throws IOException {
+                boolean isSlice) throws IOException {
         this.file = file;
         this.channel = channel;
         this.start = start;
         this.end = end;
         this.isSlice = isSlice;
         this.size = new AtomicInteger();
-        this.needsFlushParentDir = new AtomicBoolean(needsFlushParentDir);

         if (isSlice) {
             // don't check the file size if this is just a slice view
@@ -140,7 +136,7 @@ public void readInto(ByteBuffer buffer, int position) throws IOException {
     public FileRecords slice(int position, int size) throws IOException {
         int availableBytes = availableBytes(position, size);
         int startPosition = this.start + position;
-        return new FileRecords(file, channel, startPosition, startPosition + availableBytes, true, false);
+        return new FileRecords(file, channel, startPosition, startPosition + availableBytes, true);
     }

     /**
@@ -199,16 +195,12 @@ public int append(MemoryRecords records) throws IOException {
      */
     public void flush() throws IOException {
         channel.force(true);
-        if (needsFlushParentDir.getAndSet(false)) {
-            Utils.flushParentDir(file.toPath());
-        }
     }

     /**
      * Flush the parent directory of a file to the physical disk, which makes sure the file is accessible after crashing.
      */
     public void flushParentDir() throws IOException {
-        needsFlushParentDir.set(false);
         Utils.flushParentDir(file.toPath());
     }
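Fsyncing a file's contents does not persist the directory entry that names the file; after a crash, a newly created or renamed file can vanish unless its parent directory is also fsynced. That is the whole point of this PR. A minimal Java sketch of what a helper like Utils.flushParentDir can look like on Linux — the body here is an assumption for illustration, not the PR's exact code:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public final class DirFlush {
    /**
     * Fsync the parent directory of the given path so that the directory
     * entry (creation or rename of the file) survives a crash. Sketch only;
     * opening a directory as a FileChannel works on Linux but not on Windows.
     */
    public static void flushParentDir(Path path) throws IOException {
        Path parent = path.toAbsolutePath().getParent();
        if (parent != null) {
            // A read-only channel on the directory gives us a descriptor
            // whose force() maps to fsync(2) on the directory itself.
            try (FileChannel dir = FileChannel.open(parent, StandardOpenOption.READ)) {
                dir.force(true);
            }
        }
    }
}
```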

@@ -258,9 +250,9 @@ public void updateParentDir(File parentDir) {
      * Rename the file that backs this message set
      * @throws IOException if rename fails.
      */
-    public void renameTo(File f, boolean needsFlushParentDir) throws IOException {
+    public void renameTo(File f) throws IOException {
         try {
-            Utils.atomicMoveWithFallback(file.toPath(), f.toPath(), needsFlushParentDir);
+            Utils.atomicMoveWithFallback(file.toPath(), f.toPath(), false);
         } finally {
             this.file = f;
         }
@@ -439,23 +431,22 @@ public static FileRecords open(File file,
                                    boolean mutable,
                                    boolean fileAlreadyExists,
                                    int initFileSize,
-                                   boolean preallocate,
-                                   boolean needsRecovery) throws IOException {
+                                   boolean preallocate) throws IOException {
         FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate);
         int end = (!fileAlreadyExists && preallocate) ? 0 : Integer.MAX_VALUE;
-        return new FileRecords(file, channel, 0, end, false, mutable && needsRecovery);
+        return new FileRecords(file, channel, 0, end, false);
     }

     public static FileRecords open(File file,
                                    boolean fileAlreadyExists,
                                    int initFileSize,
                                    boolean preallocate,
                                    boolean needsRecovery) throws IOException {
-        return open(file, true, fileAlreadyExists, initFileSize, preallocate, needsRecovery);
+        return open(file, true, fileAlreadyExists, initFileSize, preallocate);
     }

Reviewer (Contributor): This change seems unneeded?
Author (ccding): True.

     public static FileRecords open(File file, boolean mutable) throws IOException {
-        return open(file, mutable, false, 0, false, false);
+        return open(file, mutable, false, 0, false);
     }

public static FileRecords open(File file) throws IOException {
clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -90,7 +90,7 @@ public void testAppendProtectsFromOverflow() throws Exception {
         FileChannel fileChannelMock = mock(FileChannel.class);
         when(fileChannelMock.size()).thenReturn((long) Integer.MAX_VALUE);

-        FileRecords records = new FileRecords(fileMock, fileChannelMock, 0, Integer.MAX_VALUE, false, false);
+        FileRecords records = new FileRecords(fileMock, fileChannelMock, 0, Integer.MAX_VALUE, false);
         assertThrows(IllegalArgumentException.class, () -> append(records, values));
     }

@@ -100,7 +100,7 @@ public void testOpenOversizeFile() throws Exception {
         FileChannel fileChannelMock = mock(FileChannel.class);
         when(fileChannelMock.size()).thenReturn(Integer.MAX_VALUE + 5L);

-        assertThrows(KafkaException.class, () -> new FileRecords(fileMock, fileChannelMock, 0, Integer.MAX_VALUE, false, false));
+        assertThrows(KafkaException.class, () -> new FileRecords(fileMock, fileChannelMock, 0, Integer.MAX_VALUE, false));
     }

@Test
@@ -314,7 +314,7 @@ public void testTruncateNotCalledIfSizeIsSameAsTargetSize() throws IOException {
         when(channelMock.size()).thenReturn(42L);
         when(channelMock.position(42L)).thenReturn(null);

-        FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false, false);
+        FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false);
         fileRecords.truncateTo(42);

         verify(channelMock, atLeastOnce()).size();
@@ -331,7 +331,7 @@ public void testTruncateNotCalledIfSizeIsBiggerThanTargetSize() throws IOException {

         when(channelMock.size()).thenReturn(42L);

-        FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false, false);
+        FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false);

         try {
             fileRecords.truncateTo(43);
@@ -353,7 +353,7 @@ public void testTruncateIfSizeIsDifferentToTargetSize() throws IOException {
         when(channelMock.size()).thenReturn(42L);
         when(channelMock.truncate(anyLong())).thenReturn(channelMock);

-        FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false, false);
+        FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false);
         fileRecords.truncateTo(23);

         verify(channelMock, atLeastOnce()).size();
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -206,8 +206,8 @@ abstract class AbstractIndex(@volatile private var _file: File, val baseOffset:
    *
    * @throws IOException if rename fails
    */
-  def renameTo(f: File, needFlushParentDir: Boolean): Unit = {
-    try Utils.atomicMoveWithFallback(file.toPath, f.toPath, needFlushParentDir)
+  def renameTo(f: File): Unit = {
+    try Utils.atomicMoveWithFallback(file.toPath, f.toPath, false)
     finally _file = f
   }

12 changes: 6 additions & 6 deletions core/src/main/scala/kafka/log/LazyIndex.scala
@@ -74,9 +74,9 @@ class LazyIndex[T <: AbstractIndex] private (@volatile private var indexWrapper:
     }
   }

-  def renameTo(f: File, needFlushParentDir: Boolean): Unit = {
+  def renameTo(f: File): Unit = {
     inLock(lock) {
-      indexWrapper.renameTo(f, needFlushParentDir)
+      indexWrapper.renameTo(f)
     }
   }

@@ -114,7 +114,7 @@ object LazyIndex {

     def updateParentDir(f: File): Unit

-    def renameTo(f: File, needFlushParentDir: Boolean): Unit
+    def renameTo(f: File): Unit

     def deleteIfExists(): Boolean

@@ -130,8 +130,8 @@

     def updateParentDir(parentDir: File): Unit = _file = new File(parentDir, file.getName)

-    def renameTo(f: File, needFlushParentDir: Boolean): Unit = {
-      try Utils.atomicMoveWithFallback(file.toPath, f.toPath, needFlushParentDir)
+    def renameTo(f: File): Unit = {
+      try Utils.atomicMoveWithFallback(file.toPath, f.toPath, false)
       catch {
         case _: NoSuchFileException if !file.exists => ()
       }
@@ -152,7 +152,7 @@

     def updateParentDir(parentDir: File): Unit = index.updateParentDir(parentDir)

-    def renameTo(f: File, needFlushParentDir: Boolean): Unit = index.renameTo(f, needFlushParentDir)
+    def renameTo(f: File): Unit = index.renameTo(f)

     def deleteIfExists(): Boolean = index.deleteIfExists()

25 changes: 17 additions & 8 deletions core/src/main/scala/kafka/log/LogSegment.scala
@@ -20,6 +20,7 @@ import java.io.{File, IOException}
 import java.nio.file.{Files, NoSuchFileException}
 import java.nio.file.attribute.FileTime
 import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicBoolean
 import kafka.common.LogSegmentOffsetOverflowException
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.server.epoch.LeaderEpochFileCache
@@ -59,7 +60,8 @@ class LogSegment private[log] (val log: FileRecords,
                                val baseOffset: Long,
                                val indexIntervalBytes: Int,
                                val rollJitterMs: Long,
-                               val time: Time) extends Logging {
+                               val time: Time,
+                               val needsFlushParentDir: Boolean = false) extends Logging {

Reviewer (Contributor): Could we add the new param to the javadoc?
Author (ccding): Done.


def offsetIndex: OffsetIndex = lazyOffsetIndex.get

@@ -95,6 +97,9 @@ class LogSegment private[log] (val log: FileRecords,
   /* the number of bytes since we last added an entry in the offset index */
   private var bytesSinceLastIndexEntry = 0

+  /* whether or not we need to flush the parent dir during flush */
+  private val atomicNeedsFlushParentDir = new AtomicBoolean(needsFlushParentDir)
+
   // The timestamp we used for time based log rolling and for ensuring max compaction delay
   // volatile for LogCleaner to see the update
   @volatile private var rollingBasedTimestamp: Option[Long] = None

Reviewer (Contributor): during flush => during the first flush?
Author (ccding): Not really. We change the value of atomicNeedsFlushParentDir after the first flush; the needsFlushParentDir value passed to the constructor only applies until then. Do you have any suggestions on how to word the comment?
Author (ccding): Changed it to "during the next flush".
@@ -472,6 +477,9 @@ class LogSegment private[log] (val log: FileRecords,
       offsetIndex.flush()
       timeIndex.flush()
       txnIndex.flush()
+      // We only need to flush the parent of the log file because all other files share the same parent
+      if (atomicNeedsFlushParentDir.getAndSet(false))
+        log.flushParentDir()
     }
   }

@@ -490,13 +498,13 @@
    * Change the suffix for the index and log files for this log segment
    * IOException from this method should be handled by the caller
    */
-  def changeFileSuffixes(oldSuffix: String, newSuffix: String, needFlushParentDir: Boolean = true): Unit = {
-    log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)), false)
-    lazyOffsetIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyOffsetIndex.file.getPath, oldSuffix, newSuffix)), false)
-    lazyTimeIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyTimeIndex.file.getPath, oldSuffix, newSuffix)), false)
-    txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix)), false)
+  def changeFileSuffixes(oldSuffix: String, newSuffix: String, needsFlushParentDir: Boolean = true): Unit = {
+    log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
+    lazyOffsetIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyOffsetIndex.file.getPath, oldSuffix, newSuffix)))
+    lazyTimeIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyTimeIndex.file.getPath, oldSuffix, newSuffix)))
+    txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix)))
     // We only need to flush the parent of the log file because all other files share the same parent
-    if (needFlushParentDir)
+    if (needsFlushParentDir)
       log.flushParentDir()
   }
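The comment in changeFileSuffixes captures the optimization: all four renamed files live in the same directory, so one directory fsync after the last rename makes every rename durable at once. A hedged Java sketch of the atomic-move-with-fallback idea behind Utils.atomicMoveWithFallback — the exact fallback behavior in Kafka may differ from this:

```java
import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class MoveUtil {
    // Rename atomically where the filesystem supports it, otherwise fall
    // back to a plain replacing move. Note the rename only becomes durable
    // once the parent directory is flushed (see flushParentDir above),
    // which is why the PR defers that flush and performs it once per segment.
    static Path moveWithFallback(Path source, Path target) throws IOException {
        try {
            return Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            return Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }
}
```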

@@ -671,7 +679,8 @@ object LogSegment {
       baseOffset,
       indexIntervalBytes = config.indexInterval,
       rollJitterMs = config.randomSegmentJitter,
-      time)
+      time,
+      needsRecovery || !fileAlreadyExists)
   }

def deleteIfExists(dir: File, baseOffset: Long, fileSuffix: String = ""): Unit = {
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/TransactionIndex.scala
@@ -106,10 +106,10 @@ class TransactionIndex(val startOffset: Long, @volatile private var _file: File)
     maybeChannel = None
   }

-  def renameTo(f: File, needFlushParentDir: Boolean): Unit = {
+  def renameTo(f: File): Unit = {
     try {
       if (file.exists)
-        Utils.atomicMoveWithFallback(file.toPath, f.toPath, needFlushParentDir)
+        Utils.atomicMoveWithFallback(file.toPath, f.toPath, false)
     } finally _file = f
   }

4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -3022,7 +3022,7 @@ class LogTest {
     append(10)
     // Kind of a hack, but renaming the index to a directory ensures that the append
     // to the index will fail.
-    log.activeSegment.txnIndex.renameTo(log.dir, false)
+    log.activeSegment.txnIndex.renameTo(log.dir)
     assertThrows(classOf[KafkaStorageException], () => appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1))
     assertThrows(classOf[KafkaStorageException], () => log.appendAsLeader(TestUtils.singletonRecords(value = null), leaderEpoch = 0))
     assertThrows(classOf[KafkaStorageException], () => readLog(log, 0, 4096).records.records.iterator.next().offset)
@@ -4587,7 +4587,7 @@ class LogTest {

     // Kind of a hack, but renaming the index to a directory ensures that the append
     // to the index will fail.
-    log.activeSegment.txnIndex.renameTo(log.dir, false)
+    log.activeSegment.txnIndex.renameTo(log.dir)

     // The append will be written to the log successfully, but the write to the index will fail
     assertThrows(classOf[KafkaStorageException],
core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
@@ -159,7 +159,7 @@ class TransactionIndexTest {
     val renamed = TestUtils.tempFile()
     index.append(new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 2))

-    index.renameTo(renamed, false)
+    index.renameTo(renamed)
     index.append(new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 16))

     val abortedTxns = index.collectAbortedTxns(0L, 100L).abortedTransactions
raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java
@@ -73,8 +73,7 @@ public static FileRawSnapshotReader open(Path logDir, OffsetAndEpoch snapshotId)
             false, // mutable
             true, // fileAlreadyExists
             0, // initFileSize
-            false, // preallocate
-            false // needsRecovery
+            false // preallocate
         );

         return new FileRawSnapshotReader(fileRecords, snapshotId);