Skip to content

Commit

Permalink
Do not read contig and filter header lines on VCF ingest (#189)
Browse files Browse the repository at this point in the history
* Do not read contig and filter header lines on VCF ingest

Signed-off-by: Karen Feng <[email protected]>

* Cleanup

Signed-off-by: Karen Feng <[email protected]>
  • Loading branch information
karenfeng committed Apr 14, 2020
1 parent 28b8068 commit 505cbf7
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 22 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/io/projectglow/vcf/VCFFileFormat.scala
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ private[vcf] object SchemaDelegate {
val infoHeaderLines = ArrayBuffer[VCFInfoHeaderLine]()
val formatHeaderLines = ArrayBuffer[VCFFormatHeaderLine]()
VCFHeaderUtils
.readHeaderLines(spark, files.map(_.getPath.toString))
.readHeaderLines(spark, files.map(_.getPath.toString), getNonSchemaHeaderLines = false)
.foreach {
case i: VCFInfoHeaderLine => infoHeaderLines += i
case f: VCFFormatHeaderLine => formatHeaderLines += f
Expand Down
40 changes: 30 additions & 10 deletions core/src/main/scala/io/projectglow/vcf/VCFHeaderUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -103,20 +103,30 @@ object VCFHeaderUtils extends GlowLogging {
}

/**
* Find the unique header lines from an RDD of VCF headers.
* Find the unique desired header lines from an RDD of VCF headers.
* If lines of the same class are found with the same ID, we pick one unless they are incompatible.
* If there are incompatible lines, [[IllegalArgumentException]] is thrown.
* Incompatible lines are:
* - FORMAT or INFO lines with the same ID but different types or counts
* - contig lines with the same ID but different lengths
*
* @param headers VCF headers to parse header lines from
* @param getNonSchemaHeaderLines If false, parses only INFO and FORMAT lines.
* If true, also parses contig and filter lines.
*/
def getUniqueHeaderLines(headers: RDD[VCFHeader]): Seq[VCFHeaderLine] = {
def getUniqueHeaderLines(
headers: RDD[VCFHeader],
getNonSchemaHeaderLines: Boolean): Seq[VCFHeaderLine] = {
headers.flatMap { header =>
val infoHeaderLines = header.getInfoHeaderLines.asScala
val formatHeaderLines = header.getFormatHeaderLines.asScala
val contigHeaderLines = header.getContigLines.asScala
val filterHeaderLines = header.getFilterLines.asScala
infoHeaderLines ++ formatHeaderLines ++ contigHeaderLines ++ filterHeaderLines
val schemaHeaderLines = header.getInfoHeaderLines.asScala ++ header
.getFormatHeaderLines
.asScala
val nonSchemaHeaderLines = if (getNonSchemaHeaderLines) {
header.getContigLines.asScala ++ header.getFilterLines.asScala
} else {
Seq.empty
}
schemaHeaderLines ++ nonSchemaHeaderLines
}.keyBy(line => (line.getClass.getName, line.getID))
.reduceByKey {
case (line1: VCFCompoundHeaderLine, line2: VCFCompoundHeaderLine) =>
Expand All @@ -138,16 +148,26 @@ object VCFHeaderUtils extends GlowLogging {
s"and $line2. Header lines with the same ID must have the same length.")
}
case (line1: VCFFilterHeaderLine, _: VCFFilterHeaderLine) => line1
case (line1, _) =>
throw new IllegalArgumentException(
s"Collected unexpected header line type: ${line1.getClass.getName}")
}
.values
.collect()
}

/**
* A convenience function to parse the headers from a set of VCF files and return the unique
* A convenience function to parse the headers from a set of VCF files and return the desired
* header lines.
*
* @param files VCF files to parse header lines from
* @param getNonSchemaHeaderLines If false, parses only INFO and FORMAT lines.
* If true, also parses contig and filter lines.
*/
def readHeaderLines(spark: SparkSession, files: Seq[String]): Seq[VCFHeaderLine] = {
getUniqueHeaderLines(createHeaderRDD(spark, files))
def readHeaderLines(
    spark: SparkSession,
    files: Seq[String],
    getNonSchemaHeaderLines: Boolean): Seq[VCFHeaderLine] = {
  // Build the per-file header RDD first, then collapse it into the unique
  // set of desired header lines.
  val headerRdd = createHeaderRDD(spark, files)
  getUniqueHeaderLines(headerRdd, getNonSchemaHeaderLines)
}
}
11 changes: 11 additions & 0 deletions core/src/test/scala/io/projectglow/vcf/VCFDatasourceSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,17 @@ class VCFDatasourceSuite extends GlowBaseTest {
test("Do not break when reading directory with index files") {
  // The directory contains tabix index files alongside VCFs; loading it
  // must not fail on the non-VCF entries.
  val indexedVcfDir = s"$testDataHome/tabix-test-vcf"
  spark.read.format(sourceName).load(indexedVcfDir)
}

test("Do not break when reading VCFs with contig lines missing length") {
  // Loading two copies of the same file triggers a header line merge,
  // which may break if contig header lines missing a length are parsed
  val vcfPath = s"$testDataHome/vcf/missing_contig_length.vcf"
  spark
    .read
    .format(sourceName)
    .load(vcfPath, vcfPath)
}
}

// For testing only: schema based on CEUTrio VCF header
Expand Down
30 changes: 19 additions & 11 deletions core/src/test/scala/io/projectglow/vcf/VCFHeaderUtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class VCFHeaderUtilsSuite extends GlowBaseTest {
}
}

test("merge header lines") {
gridTest("merge all header lines")(Seq(true, false)) { getNonSchemaHeaderLines =>
val file1 =
s"""
|##fileformat=VCFv4.2
Expand All @@ -155,28 +155,35 @@ class VCFHeaderUtilsSuite extends GlowBaseTest {
|##contig=<ID=21,length=48129895>
""".stripMargin
val paths = writeVCFHeaders(Seq(file1, file2))
val lines = VCFHeaderUtils.readHeaderLines(spark, paths)
val lines = VCFHeaderUtils.readHeaderLines(spark, paths, getNonSchemaHeaderLines)

val expectedLines = Set(
val expectedSchemaLines = Set(
new VCFInfoHeaderLine("animal", 1, VCFHeaderLineType.String, "monkey"),
new VCFInfoHeaderLine("color", VCFHeaderLineCount.G, VCFHeaderLineType.String, ""),
new VCFFormatHeaderLine("AD", VCFHeaderLineCount.R, VCFHeaderLineType.Integer, ""),
new VCFFormatHeaderLine("DP", 1, VCFHeaderLineType.Integer, ""),
new VCFFilterHeaderLine("LowQual", "Low Quality"),
new VCFContigHeaderLine("<ID=20,length=63025520>", VCFHeaderVersion.VCF4_2, "contig", 0),
new VCFContigHeaderLine("<ID=21,length=48129895>", VCFHeaderVersion.VCF4_2, "contig", 1)
new VCFFormatHeaderLine("DP", 1, VCFHeaderLineType.Integer, "")
)
val expectedNonSchemaLines = if (getNonSchemaHeaderLines) {
Set(
new VCFFilterHeaderLine("LowQual", "Low Quality"),
new VCFContigHeaderLine("<ID=20,length=63025520>", VCFHeaderVersion.VCF4_2, "contig", 0),
new VCFContigHeaderLine("<ID=21,length=48129895>", VCFHeaderVersion.VCF4_2, "contig", 1)
)
} else {
Set.empty
}

// We compare the string-encoded versions of the header lines to avoid direct object comparisons
val sortedLines = lines.map(_.toString).toSet
val sortedExpectedLines = expectedLines.map(_.toString)
val sortedExpectedLines = (expectedSchemaLines ++ expectedNonSchemaLines).map(_.toString)
assert(lines.size == sortedLines.size)
assert(sortedLines == sortedExpectedLines)
}

def checkLinesIncompatible(file1: String, file2: String): Unit = {
val paths = writeVCFHeaders(Seq(file1, file2))
val ex = intercept[SparkException](VCFHeaderUtils.readHeaderLines(spark, paths))
val ex = intercept[SparkException](
VCFHeaderUtils.readHeaderLines(spark, paths, getNonSchemaHeaderLines = true))
assert(ex.getCause.isInstanceOf[IllegalArgumentException])
}

Expand Down Expand Up @@ -226,15 +233,16 @@ class VCFHeaderUtilsSuite extends GlowBaseTest {
|##FORMAT=<ID=animal,Number=2,Type=String,Description="monkey">
""".stripMargin
val paths = writeVCFHeaders(Seq(file1, file2))
VCFHeaderUtils.readHeaderLines(spark, paths) // no exception
VCFHeaderUtils.readHeaderLines(spark, paths, getNonSchemaHeaderLines = true) // no exception
}

test("does not try to read tabix indices") {
assert(
VCFHeaderUtils
.readHeaderLines(
spark,
Seq(s"$testDataHome/tabix-test-vcf/NA12878_21_10002403NoTbi.vcf.gz.tbi"))
Seq(s"$testDataHome/tabix-test-vcf/NA12878_21_10002403NoTbi.vcf.gz.tbi"),
getNonSchemaHeaderLines = true)
.isEmpty)
}
}
7 changes: 7 additions & 0 deletions test-data/vcf/missing_contig_length.vcf
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
##fileformat=VCFv4.2
##FORMAT=<ID=AD,Number=1,Type=Integer,Description="Allelic depths for the ref and alt alleles in the order listed">
##INFO=<ID=DP,Number=1,Type=Integer,Description="Approximate read depth; some reads may have been filtered">
##contig=<ID=1>
##contig=<ID=2>
#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT SAMPLE1
1 1 a A G . . . AD 1,2

0 comments on commit 505cbf7

Please sign in to comment.