[SPARK-16964][SQL] Remove private[sql] and private[spark] from sql.execution package

## What changes were proposed in this pull request?
This package is meant to be internal, so it does not make sense to mark things as private[sql] or private[spark]. These modifiers simply make debugging harder when Spark developers need to inspect the plans at runtime.

This patch removes all private[sql] and private[spark] visibility modifiers in org.apache.spark.sql.execution.
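
To make the scope of the change concrete, here is a minimal sketch of what Scala's package-qualified modifier does; the class names (`InternalOnlyExample`, `PublicExample`) are hypothetical and are not taken from the patch itself.

```scala
package org.apache.spark.sql.execution

// With the qualifier: visible only to code under org.apache.spark.sql
// (and its sub-packages), so callers outside Spark SQL cannot reference it.
private[sql] class InternalOnlyExample

// Without the qualifier (what this patch switches to): public at the
// language level; the execution package stays internal by convention only.
class PublicExample
```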

## How was this patch tested?
N/A - just visibility changes.
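
As a rough illustration of the debugging motivation above (not part of this patch), with the execution nodes public a developer can inspect a physical plan from any package, for example in spark-shell. The local session setup and the Parquet path below are placeholders; `FileSourceScanExec` and its `metrics` map are classes touched by this change.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.FileSourceScanExec

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val df = spark.read.parquet("/tmp/example")  // placeholder input path

df.collect()  // run the query so the scan metric gets populated

// Walk the physical plan and read a scan node's metric; after this patch the
// node class and its `metrics` map are reachable outside org.apache.spark.sql.
df.queryExecution.executedPlan.foreach {
  case scan: FileSourceScanExec => println(scan.metrics("numOutputRows").value)
  case _ =>
}
```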

Author: Reynold Xin <[email protected]>

Closes apache#14554 from rxin/remote-private.
rxin authored and cloud-fan committed Aug 9, 2016
1 parent 62e6212 commit 511f52f
Showing 63 changed files with 170 additions and 177 deletions.
@@ -31,7 +31,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK

/** Holds a cached logical plan and its data */
-private[sql] case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
+case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)

/**
* Provides support in a SQLContext for caching query results and automatically using these cached
@@ -41,7 +41,7 @@ private[sql] case class CachedData(plan: LogicalPlan, cachedRepresentation: InMe
*
* Internal to Spark SQL.
*/
-private[sql] class CacheManager extends Logging {
+class CacheManager extends Logging {

@transient
private val cachedData = new scala.collection.mutable.ArrayBuffer[CachedData]
@@ -68,13 +68,13 @@ private[sql] class CacheManager extends Logging {
}

/** Clears all cached tables. */
-private[sql] def clearCache(): Unit = writeLock {
+def clearCache(): Unit = writeLock {
cachedData.foreach(_.cachedRepresentation.cachedColumnBuffers.unpersist())
cachedData.clear()
}

/** Checks if the cache is empty. */
-private[sql] def isEmpty: Boolean = readLock {
+def isEmpty: Boolean = readLock {
cachedData.isEmpty
}

@@ -83,7 +83,7 @@ private[sql] class CacheManager extends Logging {
* Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
* recomputing the in-memory columnar representation of the underlying table is expensive.
*/
-private[sql] def cacheQuery(
+def cacheQuery(
query: Dataset[_],
tableName: Option[String] = None,
storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
@@ -108,7 +108,7 @@ private[sql] class CacheManager extends Logging {
* Tries to remove the data for the given [[Dataset]] from the cache.
* No operation, if it's already uncached.
*/
-private[sql] def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Boolean = writeLock {
+def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Boolean = writeLock {
val planToCache = query.queryExecution.analyzed
val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
val found = dataIndex >= 0
@@ -120,17 +120,17 @@ private[sql] class CacheManager extends Logging {
}

/** Optionally returns cached data for the given [[Dataset]] */
-private[sql] def lookupCachedData(query: Dataset[_]): Option[CachedData] = readLock {
+def lookupCachedData(query: Dataset[_]): Option[CachedData] = readLock {
lookupCachedData(query.queryExecution.analyzed)
}

/** Optionally returns cached data for the given [[LogicalPlan]]. */
-private[sql] def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock {
+def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock {
cachedData.find(cd => plan.sameResult(cd.plan))
}

/** Replaces segments of the given logical plan with cached versions where possible. */
-private[sql] def useCachedData(plan: LogicalPlan): LogicalPlan = {
+def useCachedData(plan: LogicalPlan): LogicalPlan = {
plan transformDown {
case currentFragment =>
lookupCachedData(currentFragment)
@@ -143,7 +143,7 @@ private[sql] class CacheManager extends Logging {
* Invalidates the cache of any data that contains `plan`. Note that it is possible that this
* function will over invalidate.
*/
-private[sql] def invalidateCache(plan: LogicalPlan): Unit = writeLock {
+def invalidateCache(plan: LogicalPlan): Unit = writeLock {
cachedData.foreach {
case data if data.plan.collect { case p if p.sameResult(plan) => p }.nonEmpty =>
data.cachedRepresentation.recache()
@@ -155,7 +155,7 @@ private[sql] class CacheManager extends Logging {
* Invalidates the cache of any data that contains `resourcePath` in one or more
* `HadoopFsRelation` node(s) as part of its logical plan.
*/
-private[sql] def invalidateCachedPath(
+def invalidateCachedPath(
sparkSession: SparkSession, resourcePath: String): Unit = writeLock {
val (fs, qualifiedPath) = {
val path = new Path(resourcePath)
@@ -38,7 +38,7 @@ import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.util.Utils

-private[sql] trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
+trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
val relation: BaseRelation
val metastoreTableIdentifier: Option[TableIdentifier]

@@ -48,7 +48,7 @@ private[sql] trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
}

/** Physical plan node for scanning data from a relation. */
-private[sql] case class RowDataSourceScanExec(
+case class RowDataSourceScanExec(
output: Seq[Attribute],
rdd: RDD[InternalRow],
@transient relation: BaseRelation,
@@ -57,7 +57,7 @@ private[sql] case class RowDataSourceScanExec(
override val metastoreTableIdentifier: Option[TableIdentifier])
extends DataSourceScanExec {

-private[sql] override lazy val metrics =
+override lazy val metrics =
Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

val outputUnsafeRows = relation match {
@@ -138,7 +138,7 @@ private[sql] case class RowDataSourceScanExec(
* @param dataFilters Data source filters to use for filtering data within partitions.
* @param metastoreTableIdentifier
*/
-private[sql] case class FileSourceScanExec(
+case class FileSourceScanExec(
@transient relation: HadoopFsRelation,
output: Seq[Attribute],
outputSchema: StructType,
@@ -211,7 +211,7 @@ private[sql] case class FileSourceScanExec(
inputRDD :: Nil
}

-private[sql] override lazy val metrics =
+override lazy val metrics =
Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))

@@ -67,7 +67,7 @@ object RDDConversions {
}
}

-private[sql] object ExternalRDD {
+object ExternalRDD {

def apply[T: Encoder](rdd: RDD[T], session: SparkSession): LogicalPlan = {
val externalRdd = ExternalRDD(CatalystSerde.generateObjAttr[T], rdd)(session)
@@ -76,7 +76,7 @@ private[sql] object ExternalRDD {
}

/** Logical plan node for scanning data from an RDD. */
-private[sql] case class ExternalRDD[T](
+case class ExternalRDD[T](
outputObjAttr: Attribute,
rdd: RDD[T])(session: SparkSession)
extends LeafNode with ObjectProducer with MultiInstanceRelation {
@@ -103,11 +103,11 @@ private[sql] case class ExternalRDD[T](
}

/** Physical plan node for scanning data from an RDD. */
-private[sql] case class ExternalRDDScanExec[T](
+case class ExternalRDDScanExec[T](
outputObjAttr: Attribute,
rdd: RDD[T]) extends LeafExecNode with ObjectProducerExec {

-private[sql] override lazy val metrics = Map(
+override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

protected override def doExecute(): RDD[InternalRow] = {
@@ -128,7 +128,7 @@ private[sql] case class ExternalRDDScanExec[T](
}

/** Logical plan node for scanning data from an RDD of InternalRow. */
-private[sql] case class LogicalRDD(
+case class LogicalRDD(
output: Seq[Attribute],
rdd: RDD[InternalRow])(session: SparkSession)
extends LeafNode with MultiInstanceRelation {
@@ -155,12 +155,12 @@ private[sql] case class LogicalRDD(
}

/** Physical plan node for scanning data from an RDD of InternalRow. */
-private[sql] case class RDDScanExec(
+case class RDDScanExec(
output: Seq[Attribute],
rdd: RDD[InternalRow],
override val nodeName: String) extends LeafExecNode {

-private[sql] override lazy val metrics = Map(
+override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

protected override def doExecute(): RDD[InternalRow] = {
@@ -39,7 +39,7 @@ case class ExpandExec(
child: SparkPlan)
extends UnaryExecNode with CodegenSupport {

-private[sql] override lazy val metrics = Map(
+override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

// The GroupExpressions can output data with arbitrary partitioning, so set it
@@ -22,7 +22,7 @@ package org.apache.spark.sql.execution
* the list of paths that it returns will be returned to a user who calls `inputPaths` on any
* DataFrame that queries this relation.
*/
-private[sql] trait FileRelation {
+trait FileRelation {
/** Returns the list of files that will be read when scanning this relation. */
def inputFiles: Array[String]
}
@@ -55,7 +55,7 @@ case class GenerateExec(
child: SparkPlan)
extends UnaryExecNode {

-private[sql] override lazy val metrics = Map(
+override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

override def producedAttributes: AttributeSet = AttributeSet(output)
@@ -26,11 +26,11 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
/**
* Physical plan node for scanning data from a local collection.
*/
-private[sql] case class LocalTableScanExec(
+case class LocalTableScanExec(
output: Seq[Attribute],
rows: Seq[InternalRow]) extends LeafExecNode {

-private[sql] override lazy val metrics = Map(
+override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

private val unsafeRows: Array[InternalRow] = {
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.InternalRow
* iterator to consume the next row, whereas RowIterator combines these calls into a single
* [[advanceNext()]] method.
*/
-private[sql] abstract class RowIterator {
+abstract class RowIterator {
/**
* Advance this iterator by a single row. Returns `false` if this iterator has no more rows
* and `true` otherwise. If this returns `true`, then the new row can be retrieved by calling
@@ -24,7 +24,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd,
SparkListenerSQLExecutionStart}

-private[sql] object SQLExecution {
+object SQLExecution {

val EXECUTION_ID_KEY = "spark.sql.execution.id"

@@ -22,11 +22,9 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, GenerateUnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.plans.physical.{Distribution, OrderedDistribution, UnspecifiedDistribution}
import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types._
-import org.apache.spark.util.collection.unsafe.sort.RadixSort;

/**
* Performs (external) sorting.
@@ -52,7 +50,7 @@ case class SortExec(

private val enableRadixSort = sqlContext.conf.enableRadixSort

-override private[sql] lazy val metrics = Map(
+override lazy val metrics = Map(
"sortTime" -> SQLMetrics.createTimingMetric(sparkContext, "sort time"),
"peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
"spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
@@ -72,24 +72,24 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
/**
* Return all metadata that describes more details of this SparkPlan.
*/
-private[sql] def metadata: Map[String, String] = Map.empty
+def metadata: Map[String, String] = Map.empty

/**
* Return all metrics containing metrics of this SparkPlan.
*/
-private[sql] def metrics: Map[String, SQLMetric] = Map.empty
+def metrics: Map[String, SQLMetric] = Map.empty

/**
* Reset all the metrics.
*/
-private[sql] def resetMetrics(): Unit = {
+def resetMetrics(): Unit = {
metrics.valuesIterator.foreach(_.reset())
}

/**
* Return a LongSQLMetric according to the name.
*/
-private[sql] def longMetric(name: String): SQLMetric = metrics(name)
+def longMetric(name: String): SQLMetric = metrics(name)

// TODO: Move to `DistributedPlan`
/** Specifies how data is partitioned across different nodes in the cluster. */
@@ -395,7 +395,7 @@ object SparkPlan {
ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
}

-private[sql] trait LeafExecNode extends SparkPlan {
+trait LeafExecNode extends SparkPlan {
override def children: Seq[SparkPlan] = Nil
override def producedAttributes: AttributeSet = outputSet
}
@@ -407,15 +407,15 @@ object UnaryExecNode {
}
}

-private[sql] trait UnaryExecNode extends SparkPlan {
+trait UnaryExecNode extends SparkPlan {
def child: SparkPlan

override def children: Seq[SparkPlan] = child :: Nil

override def outputPartitioning: Partitioning = child.outputPartitioning
}

-private[sql] trait BinaryExecNode extends SparkPlan {
+trait BinaryExecNode extends SparkPlan {
def left: SparkPlan
def right: SparkPlan

@@ -47,7 +47,7 @@ class SparkPlanInfo(
}
}

-private[sql] object SparkPlanInfo {
+private[execution] object SparkPlanInfo {

def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
val children = plan match {
@@ -17,7 +17,6 @@

package org.apache.spark.sql.execution

-import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{execution, SaveMode, Strategy}
import org.apache.spark.sql.catalyst.InternalRow
@@ -43,13 +42,12 @@ import org.apache.spark.sql.streaming.StreamingQuery
* writing libraries should instead consider using the stable APIs provided in
* [[org.apache.spark.sql.sources]]
*/
-@DeveloperApi
abstract class SparkStrategy extends GenericStrategy[SparkPlan] {

override protected def planLater(plan: LogicalPlan): SparkPlan = PlanLater(plan)
}

-private[sql] case class PlanLater(plan: LogicalPlan) extends LeafExecNode {
+case class PlanLater(plan: LogicalPlan) extends LeafExecNode {

override def output: Seq[Attribute] = plan.output

@@ -58,7 +56,7 @@ private[sql] case class PlanLater(plan: LogicalPlan) extends LeafExecNode {
}
}

-private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
+abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
self: SparkPlanner =>

/**
@@ -40,12 +40,12 @@ import org.apache.spark.unsafe.Platform
*
* @param numFields the number of fields in the row being serialized.
*/
-private[sql] class UnsafeRowSerializer(
+class UnsafeRowSerializer(
numFields: Int,
dataSize: SQLMetric = null) extends Serializer with Serializable {
override def newInstance(): SerializerInstance =
new UnsafeRowSerializerInstance(numFields, dataSize)
-override private[spark] def supportsRelocationOfSerializedObjects: Boolean = true
+override def supportsRelocationOfSerializedObjects: Boolean = true
}

private class UnsafeRowSerializerInstance(
@@ -295,7 +295,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
override def outputPartitioning: Partitioning = child.outputPartitioning
override def outputOrdering: Seq[SortOrder] = child.outputOrdering

-override private[sql] lazy val metrics = Map(
+override lazy val metrics = Map(
"pipelineTime" -> SQLMetrics.createTimingMetric(sparkContext,
WholeStageCodegenExec.PIPELINE_DURATION_METRIC))
