Address edge case with tabix offsets and partitioned files (#317)
* Do not read orphaned block end

Signed-off-by: Karen Feng <[email protected]>

* Clean up

Signed-off-by: Karen Feng <[email protected]>

* Try to manually scalafmt

Signed-off-by: Karen Feng <[email protected]>

* scalafmt

Signed-off-by: Karen Feng <[email protected]>

* Add extra check that partitioned file contains a header

Signed-off-by: Karen Feng <[email protected]>

* syntax

Signed-off-by: Karen Feng <[email protected]>

* Address comments

Signed-off-by: Karen Feng <[email protected]>

* Revive dropped comment

Signed-off-by: Karen Feng <[email protected]>
karenfeng committed Dec 21, 2020
1 parent 8af47ee commit e3711cb
Showing 2 changed files with 114 additions and 11 deletions.
45 changes: 34 additions & 11 deletions core/src/main/scala/io/projectglow/vcf/TabixIndexHelper.scala
@@ -422,6 +422,21 @@ object TabixIndexHelper extends GlowLogging {
    * If a check fails, generates an appropriate message
    * Otherwise generates block range by querying tabix index based
    * on the filteredSimpleInterval produced by makeFilteredInterval.
+   *
+   * Returns a block range if the partitioned file contains the start position of a gzip
+   * block that overlaps the filtered interval (based on the tabix index). The block
+   * range excludes any part of the file before the start of the first desired block,
+   * or after the end of the last desired block.
+   *
+   * No block range will be returned if:
+   * - The partitioned file does not include any of the desired blocks' start positions
+   * - The partitioned file ends before the start of the first desired block
+   * - The partitioned file starts after the start of the last desired block
+   *
+   * This is designed based on the following invariants:
+   * - Spark's files are partitioned arbitrarily
+   * - The bgzip codec requires that a block starts within the partitioned file
+   * - The bgzip codec reads an entire block, whether or not it fits within the partition
    */
   def getFileRangeToRead(
       hadoopFs: FileSystem,
@@ -482,17 +497,25 @@
     if (offsetList.isEmpty) {
       None
     } else {
-      val (minOverBlocks, maxOverBlocks) = offsetList
-        .foldLeft((offsetList(0).getStartPosition, offsetList(0).getEndPosition)) {
-          case ((myStart, myEnd), e) =>
-            (Math.min(myStart, e.getStartPosition), Math.max(myEnd, e.getEndPosition))
-        }
-      val blockRangeStart = Math.max(file.start, minOverBlocks >> 16) // Shift 16 bits to get
-      // file offset of the bin from bgzipped virtual file offset.
-      val blockRangeEnd = Math.min(file.start + file.length, (maxOverBlocks >> 16) + 0xFFFF)
-      // 0xFFFF is the maximum possible length of an uncompressed bin.
-      if (blockRangeStart <= blockRangeEnd) {
-        Some((blockRangeStart, blockRangeEnd))
+      // Flag used to avoid returning any files without a BGZIP header
+      var fileContainsBlockStart = false
+      // Shift 16 bits to get file offset of the bin from bgzipped virtual file offset.
+      val (firstStart, firstEnd) =
+        (offsetList(0).getStartPosition >> 16, offsetList(0).getEndPosition >> 16)
+      val (minOverBlocks, maxOverBlocks) = offsetList.foldLeft((firstStart, firstEnd)) {
+        case ((aStart, aEnd), o) =>
+          val bStart = o.getStartPosition >> 16
+          val bEnd = o.getEndPosition >> 16
+          if (!fileContainsBlockStart) {
+            fileContainsBlockStart = (file.start <= bStart) && (file.start + file.length >= bStart)
+          }
+          (Math.min(aStart, bStart), Math.max(aEnd, bEnd))
+      }
+      val blockRangeStart = Math.max(file.start, minOverBlocks)
+      val blockRangeEnd = Math.min(file.start + file.length, maxOverBlocks)
+      if (blockRangeStart <= blockRangeEnd && fileContainsBlockStart) {
+        // 0xFFFF is the maximum possible length of an uncompressed bin
+        Some((blockRangeStart, Math.min(file.start + file.length, blockRangeEnd + 0xFFFF)))
       } else {
         None
       }
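To make the byte arithmetic in this patch easier to follow, here is a small standalone Scala sketch. It is not part of the commit, and the object and parameter names (TabixRangeSketch, blockStarts, partitionStart, partitionLength) are illustrative. It shows how a tabix/BGZF virtual file offset splits into the file offset of a compressed block (obtained with >> 16) and an offset within the uncompressed block (the low 16 bits), and how a candidate block range can be clamped to a single Spark partition along the lines of the patched getFileRangeToRead.

object TabixRangeSketch {
  // A BGZF virtual offset packs (compressedBlockFileOffset << 16) | offsetWithinUncompressedBlock.
  def blockFileOffset(virtualOffset: Long): Long = virtualOffset >> 16
  def offsetWithinBlock(virtualOffset: Long): Int = (virtualOffset & 0xFFFF).toInt

  // Clamp the byte range spanned by the candidate blocks to one Spark partition, mirroring
  // the idea behind the patched getFileRangeToRead: a range is returned only if the partition
  // contains the start byte of at least one candidate block, and the end is padded by 0xFFFF
  // (at most one block's worth of bytes) without reading past the end of the partition.
  def clampToPartition(
      blockStarts: Seq[Long],
      partitionStart: Long,
      partitionLength: Long): Option[(Long, Long)] = {
    if (blockStarts.isEmpty) {
      None
    } else {
      val partitionEnd = partitionStart + partitionLength
      val containsBlockStart = blockStarts.exists(s => partitionStart <= s && s <= partitionEnd)
      val rangeStart = Math.max(partitionStart, blockStarts.min)
      val rangeEnd = Math.min(partitionEnd, blockStarts.max)
      if (rangeStart <= rangeEnd && containsBlockStart) {
        Some((rangeStart, Math.min(partitionEnd, rangeEnd + 0xFFFF)))
      } else {
        None
      }
    }
  }
}

The exists check plays the role of the new fileContainsBlockStart flag: a partition that holds no block start byte gets no range at all, so the bgzip codec is never handed an orphaned block end. The tests below exercise the real implementation with concrete offsets.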
80 changes: 80 additions & 0 deletions core/src/test/scala/io/projectglow/vcf/TabixHelperSuite.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.StructType
 
 class TabixHelperSuite extends GlowBaseTest with GlowLogging {
 
@@ -34,6 +35,7 @@ class TabixHelperSuite extends GlowBaseTest with GlowLogging {
   lazy val multiAllelicVcf = s"$tabixTestVcf/combined.chr20_18210071_18210093.g.vcf.gz"
   lazy val testNoTbiVcf = s"$tabixTestVcf/NA12878_21_10002403NoTbi.vcf.gz"
   lazy val oneRowGzipVcf = s"$testDataHome/vcf/1row_not_bgz.vcf.gz"
+  lazy val testMultiBlockVcf = s"$tabixTestVcf/CEUTrio.HiSeq.WGS.b37.NA12878.20.21.vcf.gz"
 
   def printFilterContig(filterContig: FilterContig): Unit = {
     filterContig.getContigName.foreach(i => logger.debug(s"$i"))
@@ -808,4 +810,82 @@ class TabixHelperSuite extends GlowBaseTest with GlowLogging {
         .getFileRangeToRead(fs, partitionedFile, conf, false, false, interval)
         .isEmpty)
   }
+
+  test("Overlapping partitions and tabix offsets") {
+    val path = new Path(testMultiBlockVcf)
+    val conf = sparkContext.hadoopConfiguration
+    val fs = path.getFileSystem(conf)
+    val interval = Some(SimpleInterval("20", 10132301, 10214079))
+    // Tabix offsets for this interval is (1005005266,1005032301) -> (15335,15335)
+
+    // Partition before offset start
+    val p1 = PartitionedFile(InternalRow.empty, testMultiBlockVcf, 0, 15334)
+    val r1 = TabixIndexHelper.getFileRangeToRead(fs, p1, conf, true, true, interval)
+    assert(r1.isEmpty)
+
+    // Partition overlapping with offset
+    val p2 = PartitionedFile(InternalRow.empty, testMultiBlockVcf, 15330, 1000)
+    val r2 = TabixIndexHelper.getFileRangeToRead(fs, p2, conf, true, true, interval)
+    assert(r2 == Some(15335, 16330))
+
+    // Partition starts within 0xFFFF of offset
+    val p3 = PartitionedFile(InternalRow.empty, testMultiBlockVcf, 15340, 1000)
+    val r3 = TabixIndexHelper.getFileRangeToRead(fs, p3, conf, true, true, interval)
+    assert(r3.isEmpty)
+
+    // Partition after offset
+    val p4 = PartitionedFile(InternalRow.empty, testMultiBlockVcf, 20000, 1000)
+    val r4 = TabixIndexHelper.getFileRangeToRead(fs, p4, conf, true, true, interval)
+    assert(r4.isEmpty)
+
+    // Do not exceed file start/end
+    val p5 = PartitionedFile(InternalRow.empty, testMultiBlockVcf, 15330, 65635)
+    val r5 = TabixIndexHelper.getFileRangeToRead(fs, p5, conf, true, true, interval)
+    assert(r5 == Some(15335, 15335 + 0xFFFF))
+  }
+
+  test("Check if partition includes BGZF block start") {
+    val path = new Path(testMultiBlockVcf)
+    val conf = sparkContext.hadoopConfiguration
+    val fs = path.getFileSystem(conf)
+    val interval = Some(SimpleInterval("20", 1, Int.MaxValue))
+
+    val p1 = PartitionedFile(InternalRow.empty, testMultiBlockVcf, 0, 100)
+    val r1 = TabixIndexHelper.getFileRangeToRead(fs, p1, conf, true, true, interval)
+    assert(r1 == Some(0, 100))
+
+    val p2 = PartitionedFile(InternalRow.empty, testMultiBlockVcf, 100, 200)
+    val r2 = TabixIndexHelper.getFileRangeToRead(fs, p2, conf, true, true, interval)
+    assert(r2.isEmpty)
+  }
+
+  test("Small partitions") {
+    val sess = spark.newSession()
+    sess.conf.set("spark.sql.files.maxPartitionBytes", "100")
+    val rows = sess
+      .read
+      .format(sourceName)
+      .load(testMultiBlockVcf)
+      .filter("contigName = '20' and start > 10012714 and end < 10014990")
+    assert(rows.count() == 3)
+  }
+
+  test("Get all of BGZIP block even if file partition ends partway through") {
+    val schema = spark.read.format(sourceName).load(testMultiBlockVcf).schema
+    val format = new VCFFileFormat()
+    val reader = format.buildReader(
+      spark,
+      dataSchema = schema,
+      partitionSchema = StructType(Nil),
+      requiredSchema = schema,
+      filters = Seq(EqualTo("contigName", "20")),
+      options = Map.empty,
+      hadoopConf = spark.sessionState.newHadoopConf()
+    )
+    val p1 = PartitionedFile(InternalRow.empty, testMultiBlockVcf, 0, 100)
+    val p2 = PartitionedFile(InternalRow.empty, testMultiBlockVcf, 0, 15335)
+    val allRowsSize = reader(p2).size
+    assert(allRowsSize == 280) // Sanity check
+    assert(reader(p1).size == allRowsSize)
+  }
 }
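As an illustrative check of the expectations in the "Overlapping partitions and tabix offsets" test above (not part of the commit; REPL-style Scala using values copied from the test), both tabix virtual offsets map to the same compressed block, and the expected ranges for the five partitions follow from the clamping rules:

// Both virtual offsets point into the same BGZF block at file offset 15335.
val startVirtual = 1005005266L
val endVirtual = 1005032301L
assert((startVirtual >> 16) == 15335L)
assert((endVirtual >> 16) == 15335L)

// p2 (start 15330, length 1000) contains the block start at 15335, so it gets
// (max(15330, 15335), min(15330 + 1000, 15335 + 0xFFFF)) = (15335, 16330).
// p1, p3, and p4 never contain the block start byte, so they get no range.
// p5 (start 15330, length 65635) contains the block start and ends at 80965, so its
// padded end is min(80965, 15335 + 0xFFFF) = 80870, i.e. (15335, 15335 + 0xFFFF).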
