[SC-69350][DELTA] Do not store write options in the catalog for Delta
Write options such as `replaceWhere` and `mergeSchema` could end up stored in the transaction log, and also in the catalog, when using the DataFrameWriter with `saveAsTable`. This is a bug: write options should not be stored in the transaction log, and Delta does not need to store anything in the catalog at all. This PR cleans these properties out of both the catalog and the transaction log.
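
To illustrate the new default behavior, here is a minimal sketch mirroring the unit tests added to `DeltaTableCreationSuite` in this change (the table name and option values are only examples):

  spark.range(10).write.format("delta")
    .option("mergeSchema", "true")       // generic write option: applied to the write only
    .option("delta.appendOnly", "true")  // delta.-prefixed option: still persisted as a table property
    .saveAsTable("t")
  // After this change the transaction log contains "delta.appendOnly" but not "mergeSchema",
  // and the catalog entry carries no write options in its storage properties.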

However, there may be users who depend on this behavior. Therefore we do two things:
  1. Introduce a legacy flag so that users can revert to the old behavior (see the sketch after this list)
  2. Continue to store Delta-specific configurations, i.e. those prefixed with `delta.`, in the transaction log
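
A minimal sketch of opting back into the old behavior via the legacy flag, again following the new tests (shown with the `withSQLConf` test helper; in a regular session the corresponding SQL conf would be set on the session instead):

  withSQLConf(DeltaSQLConf.DELTA_LEGACY_STORE_WRITER_OPTIONS_AS_PROPS.key -> "true") {
    spark.range(10).write.format("delta")
      .option("mergeSchema", "true")
      .saveAsTable("t")
    // With the flag enabled, "mergeSchema" is stored in the transaction log and in the
    // catalog's storage properties, matching the pre-change behavior.
  }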

Tested with new unit tests in `DeltaTableCreationSuite`.

Author: Burak Yavuz <[email protected]>

GitOrigin-RevId: 3bf8db1b25f94096b13855a806a24122871a1585
brkyvz authored and rahulsmahadev committed Mar 9, 2021
1 parent 577f101 commit c01396f
Showing 4 changed files with 180 additions and 27 deletions.
File: DeltaCatalog.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.delta.catalog

import java.util
+ import java.util.Locale

// scalastyle:off import.ordering.noEmptyLine
import scala.collection.JavaConverters._
@@ -28,7 +29,7 @@ import org.apache.spark.sql.delta.DeltaTableIdentifier.gluePermissionError
import org.apache.spark.sql.delta.commands.{AlterTableAddColumnsDeltaCommand, AlterTableChangeColumnDeltaCommand, AlterTableSetLocationDeltaCommand, AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand, CreateDeltaTableCommand, TableCreationModes}
import org.apache.spark.sql.delta.commands.{AlterTableAddConstraintDeltaCommand, AlterTableDropConstraintDeltaCommand, WriteIntoDelta}
import org.apache.spark.sql.delta.constraints.{AddConstraint, DropConstraint}
- import org.apache.spark.sql.delta.sources.DeltaSourceUtils
+ import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf}
import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging
@@ -65,8 +66,9 @@ class DeltaCatalog extends DelegatingCatalogExtension
* @param ident The identifier of the table
* @param schema The schema of the table
* @param partitions The partition transforms for the table
- * @param properties The table properties. Right now it also includes write options for backwards
- * compatibility
+ * @param allTableProperties The table properties that configure the behavior of the table or
+ * provide information about the table
+ * @param writeOptions Options specific to the write during table creation or replacement
* @param sourceQuery A query if this CREATE request came from a CTAS or RTAS
* @param operation The specific table creation mode, whether this is a Create/Replace/Create or
* Replace
@@ -75,12 +77,13 @@ class DeltaCatalog extends DelegatingCatalogExtension
ident: Identifier,
schema: StructType,
partitions: Array[Transform],
- properties: util.Map[String, String],
+ allTableProperties: util.Map[String, String],
+ writeOptions: Map[String, String],
sourceQuery: Option[DataFrame],
operation: TableCreationModes.CreationMode): Table = {
- // These two keys are properties in data source v2 but not in v1, so we have to filter
+ // These two keys are tableProperties in data source v2 but not in v1, so we have to filter
// them out. Otherwise property consistency checks will fail.
- val tableProperties = properties.asScala.filterKeys {
+ val tableProperties = allTableProperties.asScala.filterKeys {
case TableCatalog.PROP_LOCATION => false
case TableCatalog.PROP_PROVIDER => false
case TableCatalog.PROP_COMMENT => false
@@ -96,15 +99,16 @@ class DeltaCatalog extends DelegatingCatalogExtension
val location = if (isByPath) {
Option(ident.name())
} else {
- Option(properties.get("location"))
+ Option(allTableProperties.get("location"))
}
val locUriOpt = location.map(CatalogUtils.stringToURI)
- val storage = DataSource.buildStorageFormatFromOptions(tableProperties.toMap)
+ val storage = DataSource.buildStorageFormatFromOptions(writeOptions)
.copy(locationUri = locUriOpt)
val tableType =
if (location.isDefined) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED
val id = TableIdentifier(ident.name(), ident.namespace().lastOption)
val loc = new Path(locUriOpt.getOrElse(spark.sessionState.catalog.defaultTablePath(id)))
+ val commentOpt = Option(allTableProperties.get("comment"))

val tableDesc = new CatalogTable(
identifier = id,
@@ -115,7 +119,7 @@ class DeltaCatalog extends DelegatingCatalogExtension
partitionColumnNames = partitionColumns,
bucketSpec = maybeBucketSpec,
properties = tableProperties.toMap,
- comment = Option(properties.get("comment")))
+ comment = commentOpt)
// END: copy-paste from the super method finished.

val withDb = verifyTableAndSolidify(tableDesc, None)
@@ -127,7 +131,7 @@ class DeltaCatalog extends DelegatingCatalogExtension
operation.mode,
new DeltaOptions(withDb.storage.properties, spark.sessionState.conf),
withDb.partitionColumnNames,
- withDb.properties + ("comment" -> properties.get("comment")),
+ withDb.properties ++ commentOpt.map("comment" -> _),
df)
}

@@ -180,7 +184,13 @@ class DeltaCatalog extends DelegatingCatalogExtension
properties: util.Map[String, String]): Table = {
if (DeltaSourceUtils.isDeltaDataSourceName(getProvider(properties))) {
createDeltaTable(
- ident, schema, partitions, properties, sourceQuery = None, TableCreationModes.Create)
+ ident,
+ schema,
+ partitions,
+ properties,
+ Map.empty,
+ sourceQuery = None,
+ TableCreationModes.Create)
} else {
super.createTable(ident, schema, partitions, properties)
}
@@ -320,14 +330,45 @@ class DeltaCatalog extends DelegatingCatalogExtension
operation: TableCreationModes.CreationMode) extends StagedTable with SupportsWrite {

private var asSelectQuery: Option[DataFrame] = None
- private var writeOptions: Map[String, String] = properties.asScala.toMap
+ private var writeOptions: Map[String, String] = Map.empty

override def commitStagedChanges(): Unit = {
+ val conf = spark.sessionState.conf
+ val props = new util.HashMap[String, String]()
+ // Options passed in through the SQL API will show up both with an "option." prefix and
+ // without in Spark 3.1, so we need to remove those from the properties
+ val optionsThroughProperties = properties.asScala.collect {
+ case (k, _) if k.startsWith("option.") => k.stripPrefix("option.")
+ }.toSet
+ val sqlWriteOptions = new util.HashMap[String, String]()
+ properties.asScala.foreach { case (k, v) =>
+ if (!k.startsWith("option.") && !optionsThroughProperties.contains(k)) {
+ // Do not add to properties
+ props.put(k, v)
+ } else if (optionsThroughProperties.contains(k)) {
+ sqlWriteOptions.put(k, v)
+ }
+ }
+ if (writeOptions.isEmpty && !sqlWriteOptions.isEmpty) {
+ writeOptions = sqlWriteOptions.asScala.toMap
+ }
+ if (conf.getConf(DeltaSQLConf.DELTA_LEGACY_STORE_WRITER_OPTIONS_AS_PROPS)) {
+ // Legacy behavior
+ writeOptions.foreach { case (k, v) => props.put(k, v) }
+ } else {
+ writeOptions.foreach { case (k, v) =>
+ // Continue putting in Delta prefixed options to avoid breaking workloads
+ if (k.toLowerCase(Locale.ROOT).startsWith("delta.")) {
+ props.put(k, v)
+ }
+ }
+ }
createDeltaTable(
ident,
schema,
partitions,
- writeOptions.asJava,
+ props,
+ writeOptions,
asSelectQuery,
operation)
}
@@ -339,11 +380,7 @@ class DeltaCatalog extends DelegatingCatalogExtension
override def capabilities(): util.Set[TableCapability] = Set(V1_BATCH_WRITE).asJava

override def newWriteBuilder(info: LogicalWriteInfo): V1WriteBuilder = {
- // TODO: We now pass both properties and options into CreateDeltaTableCommand, because
- // it wasn't supported in the initial APIs, but with DFWriterV2, we should actually separate
- // them
- val combinedProps = info.options.asCaseSensitiveMap().asScala ++ properties.asScala
- writeOptions = combinedProps.toMap
+ writeOptions = info.options.asCaseSensitiveMap().asScala.toMap
new DeltaV1WriteBuilder
}

File: CreateDeltaTableCommand.scala
@@ -365,10 +365,21 @@ case class CreateDeltaTableCommand(

/** Clean up the information we pass on to store in the catalog. */
private def cleanupTableDefinition(table: CatalogTable, snapshot: Snapshot): CatalogTable = {
+ // These actually have no effect on the usability of Delta, but feature flagging legacy
+ // behavior for now
+ val storageProps = if (conf.getConf(DeltaSQLConf.DELTA_LEGACY_STORE_WRITER_OPTIONS_AS_PROPS)) {
+ // Legacy behavior
+ table.storage
+ } else {
+ table.storage.copy(properties = Map.empty)
+ }
+
table.copy(
schema = new StructType(),
properties = Map.empty,
partitionColumnNames = Nil,
+ // Remove write specific options when updating the catalog
+ storage = storageProps,
tracksPartitionsInCatalog = true)
}

File: DeltaSQLConf.scala
@@ -372,6 +372,20 @@ trait DeltaSQLConfBase {
|""".stripMargin)
.booleanConf
.createWithDefault(true)

+ val DELTA_LEGACY_STORE_WRITER_OPTIONS_AS_PROPS =
+ buildConf("legacy.storeOptionsAsProperties")
+ .internal()
+ .doc("""
+ |Delta was unintentionally storing options provided by the DataFrameWriter in the
+ |saveAsTable method as table properties in the transaction log. This was unsupported
+ |behavior (it was a bug), and it has security implications (accidental storage of
+ |credentials). This flag prevents the storage of arbitrary options as table properties.
+ |Set this flag to true to continue setting non-delta prefixed table properties through
+ |table options.
+ |""".stripMargin)
+ .booleanConf
+ .createWithDefault(false)
}

object DeltaSQLConf extends DeltaSQLConfBase
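
Note: the flag above is defined with `buildConf("legacy.storeOptionsAsProperties")`, so outside of tests it would be set through its fully resolved SQL conf key. Assuming `buildConf` applies the usual `spark.databricks.delta.` prefix (an assumption, not shown in this diff), enabling the legacy behavior on a session would look roughly like:

  // Hypothetical full key, assuming the standard spark.databricks.delta. prefix:
  spark.conf.set("spark.databricks.delta.legacy.storeOptionsAsProperties", "true")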
File: DeltaTableCreationSuite.scala
@@ -25,6 +25,7 @@ import scala.language.implicitConversions

import org.apache.spark.sql.delta.DeltaOperations.ManualUpdate
import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
+ import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.hadoop.fs.Path

@@ -1630,15 +1631,6 @@ class DeltaTableCreationSuite
assert(deltaLog.snapshot.version === 1)
assert(deltaLog.snapshot.schema === new StructType().add("col", "string"))

- val e = intercept[IllegalArgumentException] {
- sql(
- s"""REPLACE TABLE delta_test (col2 string)
- |USING delta
- |LOCATION '${dir.getAbsolutePath}'
- |OPTIONS (overwriteSchema = 'false')
- """.stripMargin)
- }
- assert(e.getMessage.contains("overwriteSchema is not allowed"))

val e2 = intercept[AnalysisException] {
sql(
@@ -1829,4 +1821,103 @@ class DeltaTableCreationSuite
Seq("format", "description"))
}
}

+ /**
+ * Verifies that the correct table properties are stored in the transaction log as well as the
+ * catalog.
+ */
+ private def verifyTableProperties(
+ tableName: String,
+ deltaLogPropertiesContains: Seq[String],
+ deltaLogPropertiesMissing: Seq[String],
+ catalogStorageProps: Seq[String] = Nil): Unit = {
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+
+ if (catalogStorageProps.isEmpty) {
+ assert(table.storage.properties.isEmpty)
+ } else {
+ assert(catalogStorageProps.forall(table.storage.properties.contains),
+ s"Catalog didn't contain properties: ${catalogStorageProps}.\n" +
"Catalog: ${table.storage.properties}")
+ }
+ val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tableName))
+
+ deltaLogPropertiesContains.foreach { prop =>
+ assert(deltaLog.snapshot.getProperties.contains(prop))
+ }
+
+ deltaLogPropertiesMissing.foreach { prop =>
+ assert(!deltaLog.snapshot.getProperties.contains(prop))
+ }
+ }
+
+ test("do not store write options in the catalog - DataFrameWriter") {
+ withTempDir { dir =>
+ withTable("t") {
+ spark.range(10).write.format("delta")
+ .option("path", dir.getCanonicalPath)
+ .option("mergeSchema", "true")
+ .option("delta.appendOnly", "true")
+ .saveAsTable("t")
+
+ verifyTableProperties(
+ "t",
+ // Still allow delta prefixed confs
+ Seq("delta.appendOnly"),
+ Seq("mergeSchema")
+ )
+ // Sanity check that table is readable
+ checkAnswer(spark.table("t"), spark.range(10).toDF())
+ }
+ }
+ }
+
+ test("do not store write options in the catalog - DataFrameWriterV2") {
+ withTempDir { dir =>
+ withTable("t") {
+ spark.range(10).writeTo("t").using("delta")
+ .option("path", dir.getCanonicalPath)
+ .option("mergeSchema", "true")
+ .option("delta.appendOnly", "true")
+ .tableProperty("key", "value")
+ .create()
+
+ verifyTableProperties(
+ "t",
+ Seq(
+ "delta.appendOnly", // Still allow delta prefixed confs
+ "key" // Explicit properties should work
+ ),
+ Seq("mergeSchema")
+ )
+ // Sanity check that table is readable
+ checkAnswer(spark.table("t"), spark.range(10).toDF())
+ }
+ }
+ }
+
+ test("do not store write options in the catalog - legacy flag") {
+ withTempDir { dir =>
+ withTable("t") {
+ withSQLConf(DeltaSQLConf.DELTA_LEGACY_STORE_WRITER_OPTIONS_AS_PROPS.key -> "true") {
+ spark.range(10).write.format("delta")
+ .option("path", dir.getCanonicalPath)
+ .option("mergeSchema", "true")
+ .option("delta.appendOnly", "true")
+ .saveAsTable("t")
+
+ verifyTableProperties(
+ "t",
+ // Everything gets stored in the transaction log
+ Seq("delta.appendOnly", "mergeSchema"),
+ Nil,
+ // Things get stored in the catalog props as well
+ Seq("delta.appendOnly", "mergeSchema")
+ )
+
+ checkAnswer(spark.table("t"), spark.range(10).toDF())
+ }
+ }
+ }
+ }
}
