From 505cbf74bfe27431d323e4bb225417f7b77cb653 Mon Sep 17 00:00:00 2001
From: Karen Feng
Date: Tue, 14 Apr 2020 11:46:32 -0700
Subject: [PATCH] Do not read contig and filter header lines on VCF ingest (#189)

* Do not read contig and filter header lines on VCF ingest

Signed-off-by: Karen Feng

* Cleanup

Signed-off-by: Karen Feng
---
 .../io/projectglow/vcf/VCFFileFormat.scala    |  2 +-
 .../io/projectglow/vcf/VCFHeaderUtils.scala   | 40 ++++++++++++++-----
 .../projectglow/vcf/VCFDatasourceSuite.scala  | 11 +++++
 .../projectglow/vcf/VCFHeaderUtilsSuite.scala | 30 +++++++++-----
 test-data/vcf/missing_contig_length.vcf       |  7 ++++
 5 files changed, 68 insertions(+), 22 deletions(-)
 create mode 100644 test-data/vcf/missing_contig_length.vcf

diff --git a/core/src/main/scala/io/projectglow/vcf/VCFFileFormat.scala b/core/src/main/scala/io/projectglow/vcf/VCFFileFormat.scala
index 767cabe61..f326a2c4f 100644
--- a/core/src/main/scala/io/projectglow/vcf/VCFFileFormat.scala
+++ b/core/src/main/scala/io/projectglow/vcf/VCFFileFormat.scala
@@ -480,7 +480,7 @@ private[vcf] object SchemaDelegate {
     val infoHeaderLines = ArrayBuffer[VCFInfoHeaderLine]()
     val formatHeaderLines = ArrayBuffer[VCFFormatHeaderLine]()
     VCFHeaderUtils
-      .readHeaderLines(spark, files.map(_.getPath.toString))
+      .readHeaderLines(spark, files.map(_.getPath.toString), getNonSchemaHeaderLines = false)
       .foreach {
         case i: VCFInfoHeaderLine => infoHeaderLines += i
         case f: VCFFormatHeaderLine => formatHeaderLines += f
diff --git a/core/src/main/scala/io/projectglow/vcf/VCFHeaderUtils.scala b/core/src/main/scala/io/projectglow/vcf/VCFHeaderUtils.scala
index b466a2c30..923110713 100644
--- a/core/src/main/scala/io/projectglow/vcf/VCFHeaderUtils.scala
+++ b/core/src/main/scala/io/projectglow/vcf/VCFHeaderUtils.scala
@@ -103,20 +103,30 @@ object VCFHeaderUtils extends GlowLogging {
   }
 
   /**
-   * Find the unique header lines from an RDD of VCF headers.
+   * Find the unique desired header lines from an RDD of VCF headers.
    * If lines of the same class found with the same ID, we pick one unless they are incompatible.
    * If there are incompatible lines, [[IllegalArgumentException]] is thrown.
    * Incompatible lines are:
    * - FORMAT or INFO lines with the same ID but different types or counts
    * - contig lines with the same ID but different lengths
+   *
+   * @param headers VCF headers to parse header lines from
+   * @param getNonSchemaHeaderLines If false, parses only INFO and FORMAT lines.
+   *                                If true, also parses contig and filter lines.
    */
-  def getUniqueHeaderLines(headers: RDD[VCFHeader]): Seq[VCFHeaderLine] = {
+  def getUniqueHeaderLines(
+      headers: RDD[VCFHeader],
+      getNonSchemaHeaderLines: Boolean): Seq[VCFHeaderLine] = {
     headers.flatMap { header =>
-      val infoHeaderLines = header.getInfoHeaderLines.asScala
-      val formatHeaderLines = header.getFormatHeaderLines.asScala
-      val contigHeaderLines = header.getContigLines.asScala
-      val filterHeaderLines = header.getFilterLines.asScala
-      infoHeaderLines ++ formatHeaderLines ++ contigHeaderLines ++ filterHeaderLines
+      val schemaHeaderLines = header.getInfoHeaderLines.asScala ++ header
+        .getFormatHeaderLines
+        .asScala
+      val nonSchemaHeaderLines = if (getNonSchemaHeaderLines) {
+        header.getContigLines.asScala ++ header.getFilterLines.asScala
+      } else {
+        Seq.empty
+      }
+      schemaHeaderLines ++ nonSchemaHeaderLines
     }.keyBy(line => (line.getClass.getName, line.getID))
       .reduceByKey {
         case (line1: VCFCompoundHeaderLine, line2: VCFCompoundHeaderLine) =>
@@ -138,16 +148,26 @@ object VCFHeaderUtils extends GlowLogging {
                 s"and $line2. Header lines with the same ID must have the same length.")
Header lines with the same ID must have the same length.") } case (line1: VCFFilterHeaderLine, _: VCFFilterHeaderLine) => line1 + case (line1, _) => + throw new IllegalArgumentException( + s"Collected unexpected header line type: ${line1.getClass.getName}") } .values .collect() } /** - * A convenience function to parse the headers from a set of VCF files and return the unique + * A convenience function to parse the headers from a set of VCF files and return the desired * header lines. + * + * @param files VCF files to parse header lines from + * @param getNonSchemaHeaderLines If false, parses only INFO and FORMAT lines. + * If true, also parses contig and filter lines. */ - def readHeaderLines(spark: SparkSession, files: Seq[String]): Seq[VCFHeaderLine] = { - getUniqueHeaderLines(createHeaderRDD(spark, files)) + def readHeaderLines( + spark: SparkSession, + files: Seq[String], + getNonSchemaHeaderLines: Boolean): Seq[VCFHeaderLine] = { + getUniqueHeaderLines(createHeaderRDD(spark, files), getNonSchemaHeaderLines) } } diff --git a/core/src/test/scala/io/projectglow/vcf/VCFDatasourceSuite.scala b/core/src/test/scala/io/projectglow/vcf/VCFDatasourceSuite.scala index 844966b28..04b811e92 100644 --- a/core/src/test/scala/io/projectglow/vcf/VCFDatasourceSuite.scala +++ b/core/src/test/scala/io/projectglow/vcf/VCFDatasourceSuite.scala @@ -685,6 +685,17 @@ class VCFDatasourceSuite extends GlowBaseTest { test("Do not break when reading directory with index files") { spark.read.format(sourceName).load(s"$testDataHome/tabix-test-vcf") } + + test("Do not break when reading VCFs with contig lines missing length") { + // Read two copies of the same file to trigger a header line merge + // May break if we parse contig header lines missing length + spark + .read + .format(sourceName) + .load( + s"$testDataHome/vcf/missing_contig_length.vcf", + s"$testDataHome/vcf/missing_contig_length.vcf") + } } // For testing only: schema based on CEUTrio VCF header diff --git a/core/src/test/scala/io/projectglow/vcf/VCFHeaderUtilsSuite.scala b/core/src/test/scala/io/projectglow/vcf/VCFHeaderUtilsSuite.scala index ad4e4bf91..30e2dcea6 100644 --- a/core/src/test/scala/io/projectglow/vcf/VCFHeaderUtilsSuite.scala +++ b/core/src/test/scala/io/projectglow/vcf/VCFHeaderUtilsSuite.scala @@ -135,7 +135,7 @@ class VCFHeaderUtilsSuite extends GlowBaseTest { } } - test("merge header lines") { + gridTest("merge all header lines")(Seq(true, false)) { getNonSchemaHeaderLines => val file1 = s""" |##fileformat=VCFv4.2 @@ -155,28 +155,35 @@ class VCFHeaderUtilsSuite extends GlowBaseTest { |##contig= """.stripMargin val paths = writeVCFHeaders(Seq(file1, file2)) - val lines = VCFHeaderUtils.readHeaderLines(spark, paths) + val lines = VCFHeaderUtils.readHeaderLines(spark, paths, getNonSchemaHeaderLines) - val expectedLines = Set( + val expectedSchemaLines = Set( new VCFInfoHeaderLine("animal", 1, VCFHeaderLineType.String, "monkey"), new VCFInfoHeaderLine("color", VCFHeaderLineCount.G, VCFHeaderLineType.String, ""), new VCFFormatHeaderLine("AD", VCFHeaderLineCount.R, VCFHeaderLineType.Integer, ""), - new VCFFormatHeaderLine("DP", 1, VCFHeaderLineType.Integer, ""), - new VCFFilterHeaderLine("LowQual", "Low Quality"), - new VCFContigHeaderLine("", VCFHeaderVersion.VCF4_2, "contig", 0), - new VCFContigHeaderLine("", VCFHeaderVersion.VCF4_2, "contig", 1) + new VCFFormatHeaderLine("DP", 1, VCFHeaderLineType.Integer, "") ) + val expectedNonSchemaLines = if (getNonSchemaHeaderLines) { + Set( + new VCFFilterHeaderLine("LowQual", "Low Quality"), + 
+        new VCFContigHeaderLine("", VCFHeaderVersion.VCF4_2, "contig", 0),
+        new VCFContigHeaderLine("", VCFHeaderVersion.VCF4_2, "contig", 1)
+      )
+    } else {
+      Set.empty
+    }
 
     // We compare the string-encoded versions of the header lines to avoid direct object comparisons
     val sortedLines = lines.map(_.toString).toSet
-    val sortedExpectedLines = expectedLines.map(_.toString)
+    val sortedExpectedLines = (expectedSchemaLines ++ expectedNonSchemaLines).map(_.toString)
     assert(lines.size == sortedLines.size)
     assert(sortedLines == sortedExpectedLines)
   }
 
   def checkLinesIncompatible(file1: String, file2: String): Unit = {
     val paths = writeVCFHeaders(Seq(file1, file2))
-    val ex = intercept[SparkException](VCFHeaderUtils.readHeaderLines(spark, paths))
+    val ex = intercept[SparkException](
+      VCFHeaderUtils.readHeaderLines(spark, paths, getNonSchemaHeaderLines = true))
     assert(ex.getCause.isInstanceOf[IllegalArgumentException])
   }
 
@@ -226,7 +233,7 @@ class VCFHeaderUtilsSuite extends GlowBaseTest {
        |##FORMAT=
      """.stripMargin
     val paths = writeVCFHeaders(Seq(file1, file2))
-    VCFHeaderUtils.readHeaderLines(spark, paths) // no exception
+    VCFHeaderUtils.readHeaderLines(spark, paths, getNonSchemaHeaderLines = true) // no exception
   }
 
   test("does not try to read tabix indices") {
@@ -234,7 +241,8 @@ class VCFHeaderUtilsSuite extends GlowBaseTest {
     assert(
       VCFHeaderUtils
         .readHeaderLines(
           spark,
-          Seq(s"$testDataHome/tabix-test-vcf/NA12878_21_10002403NoTbi.vcf.gz.tbi"))
+          Seq(s"$testDataHome/tabix-test-vcf/NA12878_21_10002403NoTbi.vcf.gz.tbi"),
+          getNonSchemaHeaderLines = true)
         .isEmpty)
   }
 }
diff --git a/test-data/vcf/missing_contig_length.vcf b/test-data/vcf/missing_contig_length.vcf
new file mode 100644
index 000000000..726e14993
--- /dev/null
+++ b/test-data/vcf/missing_contig_length.vcf
@@ -0,0 +1,7 @@
+##fileformat=VCFv4.2
+##FORMAT=
+##INFO=
+##contig=
+##contig=
+#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT SAMPLE1
+1 1 a A G . . . AD 1,2
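
Usage sketch (illustrative only, not part of the commit above): how a caller would invoke the
updated readHeaderLines API after this change. It assumes an active SparkSession named spark;
the VCF path is a placeholder.

    import io.projectglow.vcf.VCFHeaderUtils

    // Placeholder input; any readable VCF (or set of VCFs) with a header works here.
    val vcfPaths = Seq("/data/example.vcf")

    // Schema-only read: collects INFO and FORMAT lines, matching the updated
    // VCFFileFormat call that no longer parses contig or filter header lines.
    val schemaLines = VCFHeaderUtils.readHeaderLines(spark, vcfPaths, getNonSchemaHeaderLines = false)

    // Full read: also collects contig and filter lines, as the header-merging tests do.
    val allLines = VCFHeaderUtils.readHeaderLines(spark, vcfPaths, getNonSchemaHeaderLines = true)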