Turn on fast reader by default (#307)
* option for fast reader

Signed-off-by: Henry D <[email protected]>

* fix . problem

Signed-off-by: Henry D <[email protected]>

* turn on by default

Signed-off-by: Henry D <[email protected]>

* Fix stringency; add test; add doc

Signed-off-by: Henry D <[email protected]>
henrydavidge committed Nov 16, 2020
1 parent 63981a8 commit 09ce16a
Showing 5 changed files with 63 additions and 32 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/io/projectglow/sql/GlowConf.scala
@@ -24,5 +24,5 @@ object GlowConf {
     .buildConf("io.projectglow.vcf.fastReaderEnabled")
     .doc("Use fast VCF reader")
     .booleanConf
-    .createWithDefault(false)
+    .createWithDefault(true)
 }
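With the default flipped above, the htsjdk-based reader must now be requested explicitly. A minimal sketch of how a user could do that when building a session — the config key comes from the diff above; the builder API is standard Spark, and the app name is just a placeholder:

import org.apache.spark.sql.SparkSession

// Override the new default and fall back to the htsjdk-based VCF reader.
val spark = SparkSession
  .builder()
  .appName("glow-htsjdk-reader") // placeholder app name
  .config("io.projectglow.vcf.fastReaderEnabled", "false")
  .getOrCreate()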
11 changes: 5 additions & 6 deletions core/src/main/scala/io/projectglow/vcf/VCFHeaderUtils.scala
@@ -18,19 +18,18 @@ package io.projectglow.vcf

 import java.io.StringReader

+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
 import com.google.common.annotations.VisibleForTesting
-import htsjdk.variant.vcf.{VCFCodec, VCFCompoundHeaderLine, VCFContigHeaderLine, VCFFilterHeaderLine, VCFHeader, VCFHeaderLine}
+import htsjdk.variant.vcf._
+import io.projectglow.common.GlowLogging
+import io.projectglow.sql.util.SerializableConfiguration
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types.StructType

-import io.projectglow.common.GlowLogging
-import io.projectglow.sql.util.SerializableConfiguration
-import scala.collection.JavaConverters._
-import scala.util.control.NonFatal
-
 object VCFHeaderUtils extends GlowLogging {

48 changes: 27 additions & 21 deletions core/src/main/scala/io/projectglow/vcf/VCFLineToInternalRowConverter.scala
@@ -253,19 +253,20 @@ class VCFLineToInternalRowConverter(
       }
       var i = 0
       while (!ctx.isTab && i < typeAndIdx.length) {
-        if (i == gtIdx) {
-          ctx.parseCallsAndPhasing(gRow, phasedIdx, callsIdx)
-        } else if (typeAndIdx(i) == null) {
-          // Eat this value as a string since we don't need the parsed value
-          ctx.parseString(':')
-        } else {
-          val (typ, idx) = typeAndIdx(i)
-          val value = ctx.parseFormatVal(typ)
-          gRow.update(idx, value)
+        tryWithWarning(fieldNames(i).asInstanceOf[UTF8String], FieldTypes.FORMAT) {
+          if (i == gtIdx) {
+            ctx.parseCallsAndPhasing(gRow, phasedIdx, callsIdx)
+          } else if (typeAndIdx(i) == null) {
+            // Eat this value as a string since we don't need the parsed value
+            ctx.parseString(':')
+          } else {
+            val (typ, idx) = typeAndIdx(i)
+            val value = ctx.parseFormatVal(typ)
+            gRow.update(idx, value)
+          }
+          ctx.eat(':')
+          i += 1
         }
-        ctx.eat(':')
-        i += 1
       }
       ctx.eat('\t')
       genotypeHolder(sampleIdx) = gRow
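The change above wraps each FORMAT field in tryWithWarning, so a malformed value is handled according to the reader's validation stringency instead of always failing the read. The helper's body is not part of this diff; below is a hypothetical sketch of the behavior, using htsjdk's ValidationStringency enum. The class name and the println logging are stand-ins, not Glow's actual implementation:

import htsjdk.samtools.ValidationStringency
import scala.util.control.NonFatal

// Hypothetical stand-in for the tryWithWarning helper; not the actual Glow code.
class StringencyGuard(stringency: ValidationStringency) {
  def tryWithWarning(fieldName: String, fieldType: String)(parse: => Unit): Unit = {
    try {
      parse
    } catch {
      case NonFatal(e) =>
        stringency match {
          case ValidationStringency.STRICT => throw e // strict: fail the query
          case ValidationStringency.LENIENT => // lenient: log and move on
            println(s"Could not parse $fieldType field $fieldName: ${e.getMessage}")
          case _ => () // silent: skip the field quietly
        }
    }
  }
}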
@@ -318,17 +319,21 @@ class LineCtx(text: Text) {
   }

   def parseString(extraStopChar1: Byte = '\0', extraStopChar2: Byte = '\0'): UTF8String = {
-    if (pos >= text.getLength || line(pos) == '.') {
-      pos += 1
+    var stop = pos
+    while (stop < text.getLength && line(stop) != delimiter && line(stop) != '\t' && line(
+        stop) != extraStopChar1 && line(stop) != extraStopChar2) {
+      stop += 1
+    }
+
+    if (stop - pos == 0) {
+      return null
+    }
+
+    val out = UTF8String.fromBytes(line, pos, stop - pos)
+    pos = stop
+    if (out == LineCtx.MISSING) {
       null
     } else {
-      var stop = pos
-      while (stop < text.getLength && line(stop) != delimiter && line(stop) != '\t' && line(
-          stop) != extraStopChar1 && line(stop) != extraStopChar2) {
-        stop += 1
-      }
-      val out = UTF8String.fromBytes(line, pos, stop - pos)
-      pos = stop
       out
     }
   }
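The rewritten parseString no longer treats any token that starts with '.' as missing: it reads the whole token and only maps an exact "." (the new LineCtx.MISSING constant below) to null. The rule, restated as a sketch on plain strings — the real code compares UTF8String bytes:

// Sketch of the missing-value rule; plain String stands in for UTF8String.
def parseToken(token: String): Option[String] =
  if (token.isEmpty || token == ".") None else Some(token)

assert(parseToken(".") == None) // a bare dot is a missing value
assert(parseToken("") == None) // an empty field is also missing
assert(parseToken(".monkey") == Some(".monkey")) // kept: the ".monkey" case tested below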
@@ -540,4 +545,5 @@ object LineCtx {
   val POS_NAN = UTF8String.fromString("+nan")
   val NEG_NAN = UTF8String.fromString("-nan")
   val END = UTF8String.fromString("END")
+  val MISSING = UTF8String.fromString(".")
 }
25 changes: 24 additions & 1 deletion core/src/test/scala/io/projectglow/vcf/VCFDatasourceSuite.scala
@@ -180,7 +180,8 @@ class VCFDatasourceSuite extends GlowBaseTest {
       line: String,
       extraHeaderLines: String = "",
       nSamples: Int = 1,
-      schema: Option[StructType] = None): DataFrame = {
+      schema: Option[StructType] = None,
+      options: Map[String, String] = Map.empty): DataFrame = {
     val file = Files.createTempFile("test-vcf", ".vcf")
     val samples = (1 to nSamples).map(n => s"sample_$n").mkString("\t")
     val baseHeader =
@@ -193,6 +194,7 @@
     FileUtils.writeStringToFile(file.toFile, headers + line)
     val baseReader = spark
       .read
+      .options(options)
       .format(sourceName)
     val reader = schema match {
       case None => baseReader // infer schema
Expand Down Expand Up @@ -374,6 +376,19 @@ class VCFDatasourceSuite extends GlowBaseTest {
assertThrows[SparkException](ds.collect)
}

test("validation stringency (format fields)") {
def doTest(stringency: String): Unit = {
parseVcfContents(
makeVcfLine(Seq(".", "FIELD", "monkey")), // FIELD is a string instead of an integer
extraHeaderLines = "##FORMAT=<ID=FIELD,Number=1,Type=Integer,Description=\"\"\n",
options = Map("validationStringency" -> stringency)
).collect()
}
intercept[SparkException](doTest("strict"))
doTest("silent")
doTest("lenient")
}

test("invalid validation stringency") {
assertThrows[IllegalArgumentException] {
spark
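The new test above drives the reader through all three stringency modes. A sketch of how an end user would select one when loading a VCF — `path` is a placeholder, and `validationStringency` is the reader option exercised by the test:

// Read a VCF leniently: malformed fields produce warnings instead of errors.
val df = spark
  .read
  .format("vcf")
  .option("validationStringency", "lenient") // or "strict" / "silent"
  .load(path)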
@@ -788,6 +803,14 @@ class FastVCFDatasourceSuite extends VCFDatasourceSuite {
         Double.PositiveInfinity,
         Double.PositiveInfinity))
   }
+
+  test("read string that starts with .") {
+    import sess.implicits._
+    val df = parseVcfContents(
+      makeVcfLine(Seq("STR=.monkey")),
+      extraHeaderLines = "##INFO=<ID=STR,Number=1,Type=String,Description=\"\">\n")
+    assert(df.selectExpr("INFO_STR").as[String].head == ".monkey")
+  }
 }

// For testing only: schema based on CEUTrio VCF header
9 changes: 6 additions & 3 deletions docs/source/etl/variant-data.rst
@@ -37,9 +37,6 @@ the DataFrame API using Python, R, Scala, or SQL.

 .. code-block:: python

     df = spark.read.format("vcf").load(path)

 .. invisible-code-block: python

     assert_rows_equal(df.select("contigName", "start").head(), Row(contigName='17', start=504217))
@@ -72,6 +69,12 @@ You can control the behavior of the VCF reader with a few parameters. All parame

 Starting from Glow 0.4.0, the ``splitToBiallelic`` option for the VCF reader no longer exists. To split multiallelic variants to biallelics use the :ref:`split_multiallelics<split_multiallelics>` transformer after loading the VCF.

+.. note::
+
+   Glow includes a VCF reader that uses `htsjdk <https://github.com/samtools/htsjdk>`_ for initial parsing as well as a reader that parses VCF lines to Spark rows directly.
+
+   As of release 0.7.0, the direct reader is enabled by default. To use the htsjdk based reader, set the Spark config ``io.projectglow.vcf.fastReaderEnabled`` to ``false``.
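For example, to switch an existing session back to the htsjdk-based reader — a sketch in Scala; the config key is the one documented in the note above:

.. code-block:: scala

    spark.conf.set("io.projectglow.vcf.fastReaderEnabled", false)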


.. important:: The VCF reader uses the 0-start, half-open (zero-based) coordinate system. This means
that the ``start`` values in the DataFrame will be 1 lower than the values that appear in the VCF
