[SPARK-20523][BUILD] Clean up build warnings for 2.2.0 release
## What changes were proposed in this pull request?

Fix build warnings, primarily related to Breeze 0.13 operator changes and Java style problems.

## How was this patch tested?

Existing tests

Author: Sean Owen <[email protected]>

Closes apache#17803 from srowen/SPARK-20523.
srowen committed May 3, 2017
1 parent db2fb84 commit 16fab6b
Showing 23 changed files with 54 additions and 43 deletions.
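Most of the Scala hunks below are a mechanical rename of Breeze's element-wise operators, which Breeze 0.13 deprecates in favour of the `*:*` family; the old forms still compile but emit the deprecation warnings this commit cleans up. A minimal sketch of the mapping (illustrative only, not part of the patch; assumes Breeze 0.13 on the classpath):

```scala
// Old (deprecated)      New (Breeze 0.13)
//   a :* b        ->    a *:* b      element-wise product
//   a :+ b        ->    a +:+ b      element-wise sum
//   a :/ b        ->    a /:/ b      element-wise quotient
//   a :- s        ->    a -:- s      element-wise difference
//   a :^ 2.0      ->    a ^:^ 2.0    element-wise power
//   a :> 0.0      ->    a >:> 0.0    element-wise comparison
import breeze.linalg.{all, DenseVector}

object BreezeOps {
  def main(args: Array[String]): Unit = {
    val a = DenseVector(1.0, 2.0, 3.0)
    val b = DenseVector(4.0, 5.0, 6.0)
    println(a *:* b)        // DenseVector(4.0, 10.0, 18.0)
    println(a +:+ b)        // DenseVector(5.0, 7.0, 9.0)
    println(a /:/ b)        // DenseVector(0.25, 0.4, 0.5)
    println(a ^:^ 2.0)      // DenseVector(1.0, 4.0, 9.0)
    println(all(a >:> 0.0)) // true
  }
}
```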
@@ -363,9 +363,9 @@ protected File initRecoveryDb(String dbName) {
// If another DB was initialized first just make sure all the DBs are in the same
// location.
Path newLoc = new Path(_recoveryPath, dbName);
Path copyFrom = new Path(f.toURI());
if (!newLoc.equals(copyFrom)) {
logger.info("Moving " + copyFrom + " to: " + newLoc);
try {
// The move here needs to handle moving non-empty directories across NFS mounts
FileSystem fs = FileSystem.getLocal(_conf);
@@ -48,7 +48,8 @@ public final class Platform {
boolean _unaligned;
String arch = System.getProperty("os.arch", "");
if (arch.equals("ppc64le") || arch.equals("ppc64")) {
// Since java.nio.Bits.unaligned() doesn't return true on ppc (See JDK-8165231), but ppc64 and ppc64le support it
// Since java.nio.Bits.unaligned() doesn't return true on ppc (See JDK-8165231), but
// ppc64 and ppc64le support it
_unaligned = true;
} else {
try {
@@ -155,7 +155,8 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
for (MemoryConsumer c: consumers) {
if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
long key = c.getUsed();
List<MemoryConsumer> list = sortedConsumers.computeIfAbsent(key, k -> new ArrayList<>(1));
List<MemoryConsumer> list =
sortedConsumers.computeIfAbsent(key, k -> new ArrayList<>(1));
list.add(c);
}
}
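The hunk above only re-wraps a long line, but the idiom it wraps is worth a note: `Map.computeIfAbsent` creates the per-key bucket lazily on first use. A rough sketch with a hypothetical `byLength` map (assuming Scala 2.12+ for the Java SAM conversion):

```scala
import java.util.{ArrayList, TreeMap, List => JList}

object ComputeIfAbsentSketch {
  def main(args: Array[String]): Unit = {
    val byLength = new TreeMap[Integer, JList[String]]()
    for (word <- Seq("spark", "breeze", "sbt", "scala")) {
      // Create the bucket only when the key is first seen, then append.
      val bucket = byLength.computeIfAbsent(word.length, k => new ArrayList[String](1))
      bucket.add(word)
    }
    println(byLength) // {3=[sbt], 5=[spark, scala], 6=[breeze]}
  }
}
```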
@@ -1070,11 +1070,12 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
sched.dagScheduler = mockDAGScheduler
val taskSet = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = new ManualClock(1))
when(mockDAGScheduler.taskEnded(any(), any(), any(), any(), any())).then(new Answer[Unit] {
override def answer(invocationOnMock: InvocationOnMock): Unit = {
assert(manager.isZombie === true)
}
})
when(mockDAGScheduler.taskEnded(any(), any(), any(), any(), any())).thenAnswer(
new Answer[Unit] {
override def answer(invocationOnMock: InvocationOnMock): Unit = {
assert(manager.isZombie)
}
})
val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
assert(taskOption.isDefined)
// this would fail, inside our mock dag scheduler, if it calls dagScheduler.taskEnded() too soon
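For context on the change above: the stub moves from Mockito's `then(...)` alias to the equivalent `thenAnswer(...)`, with the indentation reflowed for Scala style. A stripped-down sketch of the pattern, using a hypothetical `Scheduler` trait and assuming Mockito is on the classpath:

```scala
import org.mockito.Mockito.{mock, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer

trait Scheduler { def taskEnded(taskId: Int): Unit }

object ThenAnswerSketch {
  def main(args: Array[String]): Unit = {
    val sched = mock(classOf[Scheduler])
    // Run a side-effecting callback when the stubbed method is invoked.
    when(sched.taskEnded(42)).thenAnswer(new Answer[Unit] {
      override def answer(invocation: InvocationOnMock): Unit = {
        println(s"taskEnded(${invocation.getArguments()(0)})")
      }
    })
    sched.taskEnded(42)
  }
}
```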
@@ -18,6 +18,7 @@
package org.apache.spark.storage

import scala.collection.mutable
import scala.language.implicitConversions
import scala.util.Random

import org.scalatest.{BeforeAndAfter, Matchers}
4 changes: 4 additions & 0 deletions dev/checkstyle-suppressions.xml
@@ -44,4 +44,8 @@
files="src/main/java/org/apache/hive/service/server/ThreadWithGarbageCleanup.java"/>
<suppress checks="MethodName"
files="sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java"/>
<suppress checks="MethodName"
files="sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java"/>
<suppress checks="MethodName"
files="sql/catalyst/src/main/java/org/apache/spark/sql/streaming/Trigger.java"/>
</suppressions>
@@ -28,8 +28,6 @@
import java.sql.Timestamp;
import java.util.*;

import scala.Tuple2;

/**
* Counts words in UTF8 encoded, '\n' delimited text received from the network.
* <p>
14 changes: 7 additions & 7 deletions graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -226,18 +226,18 @@ object PageRank extends Logging {
// Propagates the message along outbound edges
// and adding start nodes back in with activation resetProb
val rankUpdates = rankGraph.aggregateMessages[BV[Double]](
ctx => ctx.sendToDst(ctx.srcAttr :* ctx.attr),
(a : BV[Double], b : BV[Double]) => a :+ b, TripletFields.Src)
ctx => ctx.sendToDst(ctx.srcAttr *:* ctx.attr),
(a : BV[Double], b : BV[Double]) => a +:+ b, TripletFields.Src)

rankGraph = rankGraph.outerJoinVertices(rankUpdates) {
(vid, oldRank, msgSumOpt) =>
val popActivations: BV[Double] = msgSumOpt.getOrElse(zero) :* (1.0 - resetProb)
val popActivations: BV[Double] = msgSumOpt.getOrElse(zero) *:* (1.0 - resetProb)
val resetActivations = if (sourcesInitMapBC.value contains vid) {
sourcesInitMapBC.value(vid) :* resetProb
sourcesInitMapBC.value(vid) *:* resetProb
} else {
zero
}
popActivations :+ resetActivations
popActivations +:+ resetActivations
}.cache()

rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
@@ -250,9 +250,9 @@
}

// SPARK-18847 If the graph has sinks (vertices with no outgoing edges) correct the sum of ranks
val rankSums = rankGraph.vertices.values.fold(zero)(_ :+ _)
val rankSums = rankGraph.vertices.values.fold(zero)(_ +:+ _)
rankGraph.mapVertices { (vid, attr) =>
Vectors.fromBreeze(attr :/ rankSums)
Vectors.fromBreeze(attr /:/ rankSums)
}
}

@@ -56,7 +56,7 @@ private[ann] class SigmoidLayerModelWithSquaredError
extends FunctionalLayerModel(new FunctionalLayer(new SigmoidFunction)) with LossFunction {
override def loss(output: BDM[Double], target: BDM[Double], delta: BDM[Double]): Double = {
ApplyInPlace(output, target, delta, (o: Double, t: Double) => o - t)
val error = Bsum(delta :* delta) / 2 / output.cols
val error = Bsum(delta *:* delta) / 2 / output.cols
ApplyInPlace(delta, output, delta, (x: Double, o: Double) => x * (o - o * o))
error
}
@@ -119,6 +119,6 @@ private[ann] class SoftmaxLayerModelWithCrossEntropyLoss extends LayerModel with

override def loss(output: BDM[Double], target: BDM[Double], delta: BDM[Double]): Double = {
ApplyInPlace(output, target, delta, (o: Double, t: Double) => o - t)
-Bsum( target :* brzlog(output)) / output.cols
-Bsum( target *:* brzlog(output)) / output.cols
}
}
@@ -472,7 +472,7 @@ class GaussianMixture @Since("2.0.0") (
*/
val cov = {
val ss = new DenseVector(new Array[Double](numFeatures)).asBreeze
slice.foreach(xi => ss += (xi.asBreeze - mean.asBreeze) :^ 2.0)
slice.foreach(xi => ss += (xi.asBreeze - mean.asBreeze) ^:^ 2.0)
val diagVec = Vectors.fromBreeze(ss)
BLAS.scal(1.0 / numSamples, diagVec)
val covVec = new DenseVector(Array.fill[Double](
@@ -271,7 +271,7 @@ class GaussianMixture private (
private def initCovariance(x: IndexedSeq[BV[Double]]): BreezeMatrix[Double] = {
val mu = vectorMean(x)
val ss = BDV.zeros[Double](x(0).length)
x.foreach(xi => ss += (xi - mu) :^ 2.0)
x.foreach(xi => ss += (xi - mu) ^:^ 2.0)
diag(ss / x.length.toDouble)
}
}
@@ -314,7 +314,7 @@ class LocalLDAModel private[spark] (
docBound += count * LDAUtils.logSumExp(Elogthetad + localElogbeta(idx, ::).t)
}
// E[log p(theta | alpha) - log q(theta | gamma)]
docBound += sum((brzAlpha - gammad) :* Elogthetad)
docBound += sum((brzAlpha - gammad) *:* Elogthetad)
docBound += sum(lgamma(gammad) - lgamma(brzAlpha))
docBound += lgamma(sum(brzAlpha)) - lgamma(sum(gammad))

@@ -324,7 +324,7 @@ class LocalLDAModel private[spark] (
// Bound component for prob(topic-term distributions):
// E[log p(beta | eta) - log q(beta | lambda)]
val sumEta = eta * vocabSize
val topicsPart = sum((eta - lambda) :* Elogbeta) +
val topicsPart = sum((eta - lambda) *:* Elogbeta) +
sum(lgamma(lambda) - lgamma(eta)) +
sum(lgamma(sumEta) - lgamma(sum(lambda(::, breeze.linalg.*))))

@@ -721,7 +721,7 @@ class DistributedLDAModel private[clustering] (
val N_wj = edgeContext.attr
val smoothed_N_wk: TopicCounts = edgeContext.dstAttr + (eta - 1.0)
val smoothed_N_kj: TopicCounts = edgeContext.srcAttr + (alpha - 1.0)
val phi_wk: TopicCounts = smoothed_N_wk :/ smoothed_N_k
val phi_wk: TopicCounts = smoothed_N_wk /:/ smoothed_N_k
val theta_kj: TopicCounts = normalize(smoothed_N_kj, 1.0)
val tokenLogLikelihood = N_wj * math.log(phi_wk.dot(theta_kj))
edgeContext.sendToDst(tokenLogLikelihood)
@@ -748,7 +748,7 @@ class DistributedLDAModel private[clustering] (
if (isTermVertex(vertex)) {
val N_wk = vertex._2
val smoothed_N_wk: TopicCounts = N_wk + (eta - 1.0)
val phi_wk: TopicCounts = smoothed_N_wk :/ smoothed_N_k
val phi_wk: TopicCounts = smoothed_N_wk /:/ smoothed_N_k
sumPrior + (eta - 1.0) * sum(phi_wk.map(math.log))
} else {
val N_kj = vertex._2
@@ -482,7 +482,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
stats.unpersist()
expElogbetaBc.destroy(false)
val batchResult = statsSum :* expElogbeta.t
val batchResult = statsSum *:* expElogbeta.t

// Note that this is an optimization to avoid batch.count
updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
@@ -522,7 +522,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {

val dalpha = -(gradf - b) / q

if (all((weight * dalpha + alpha) :> 0D)) {
if (all((weight * dalpha + alpha) >:> 0D)) {
alpha :+= weight * dalpha
this.alpha = Vectors.dense(alpha.toArray)
}
@@ -584,22 +584,22 @@ private[clustering] object OnlineLDAOptimizer {
val expElogthetad: BDV[Double] = exp(LDAUtils.dirichletExpectation(gammad)) // K
val expElogbetad = expElogbeta(ids, ::).toDenseMatrix // ids * K

val phiNorm: BDV[Double] = expElogbetad * expElogthetad :+ 1e-100 // ids
val phiNorm: BDV[Double] = expElogbetad * expElogthetad +:+ 1e-100 // ids
var meanGammaChange = 1D
val ctsVector = new BDV[Double](cts) // ids

// Iterate between gamma and phi until convergence
while (meanGammaChange > 1e-3) {
val lastgamma = gammad.copy
// K K * ids ids
gammad := (expElogthetad :* (expElogbetad.t * (ctsVector :/ phiNorm))) :+ alpha
gammad := (expElogthetad *:* (expElogbetad.t * (ctsVector /:/ phiNorm))) +:+ alpha
expElogthetad := exp(LDAUtils.dirichletExpectation(gammad))
// TODO: Keep more values in log space, and only exponentiate when needed.
phiNorm := expElogbetad * expElogthetad :+ 1e-100
phiNorm := expElogbetad * expElogthetad +:+ 1e-100
meanGammaChange = sum(abs(gammad - lastgamma)) / k
}

val sstatsd = expElogthetad.asDenseMatrix.t * (ctsVector :/ phiNorm).asDenseMatrix
val sstatsd = expElogthetad.asDenseMatrix.t * (ctsVector /:/ phiNorm).asDenseMatrix
(gammad, sstatsd, ids)
}
}
@@ -29,7 +29,7 @@ private[clustering] object LDAUtils {
*/
private[clustering] def logSumExp(x: BDV[Double]): Double = {
val a = max(x)
a + log(sum(exp(x :- a)))
a + log(sum(exp(x -:- a)))
}
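Operator rename aside, `logSumExp` above is the standard log-sum-exp trick: subtract the maximum before exponentiating so `exp` cannot overflow, then add it back outside the `log`. A small sketch of why that matters (illustrative only, assuming Breeze on the classpath):

```scala
import breeze.linalg.{max, sum, DenseVector}
import breeze.numerics.{exp, log}

object LogSumExpSketch {
  def main(args: Array[String]): Unit = {
    val x = DenseVector(1000.0, 1001.0, 1002.0)
    val a = max(x)
    val stable = a + log(sum(exp(x -:- a))) // finite, ~1002.41
    val naive  = log(sum(exp(x)))           // exp(1000) overflows, giving Infinity
    println(s"stable = $stable, naive = $naive")
  }
}
```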

/**
@@ -168,7 +168,7 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
assert(m1.pi ~== m2.pi relTol 0.01)
assert(m1.theta ~== m2.theta relTol 0.01)
}
val testParams = Seq(
val testParams = Seq[(String, Dataset[_])](
("bernoulli", bernoulliDataset),
("multinomial", dataset)
)
4 changes: 0 additions & 4 deletions pom.xml
@@ -58,10 +58,6 @@
<url>https://issues.apache.org/jira/browse/SPARK</url>
</issueManagement>

<prerequisites>
<maven>${maven.version}</maven>
</prerequisites>

<mailingLists>
<mailingList>
<name>Dev Mailing List</name>
@@ -16,6 +16,8 @@
*/
package org.apache.spark.scheduler.cluster

import scala.language.reflectiveCalls

import org.mockito.Mockito.when
import org.scalatest.mock.MockitoSugar

@@ -37,7 +37,9 @@ public class GroupStateTimeout {
* `map/flatMapGroupsWithState` by calling `GroupState.setTimeoutDuration()`. See documentation
* on `GroupState` for more details.
*/
public static GroupStateTimeout ProcessingTimeTimeout() { return ProcessingTimeTimeout$.MODULE$; }
public static GroupStateTimeout ProcessingTimeTimeout() {
return ProcessingTimeTimeout$.MODULE$;
}

/**
* Timeout based on event-time. The event-time timestamp for timeout can be set for each
@@ -51,4 +53,5 @@

/** No timeout. */
public static GroupStateTimeout NoTimeout() { return NoTimeout$.MODULE$; }

}
@@ -41,7 +41,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {

/* invalid json with leading nulls would trigger java.io.CharConversionException
in Jackson's JsonFactory.createParser(byte[]) due to RFC-4627 encoding detection */
val badJson = "\0\0\0A\1AAA"
val badJson = "\u0000\u0000\u0000A\u0001AAA"

test("$.store.bicycle") {
checkEvaluation(
@@ -66,7 +66,6 @@
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.util.AccumulatorV2;
import org.apache.spark.util.LongAccumulator;

/**
* Base class for custom RecordReaders for Parquet that directly materialize to `T`.
@@ -160,7 +159,9 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
if (taskContext != null) {
Option<AccumulatorV2<?, ?>> accu = taskContext.taskMetrics().externalAccums().lastOption();
if (accu.isDefined() && accu.get().getClass().getSimpleName().equals("NumRowGroupsAcc")) {
((AccumulatorV2<Integer, Integer>)accu.get()).add(blocks.size());
@SuppressWarnings("unchecked")
AccumulatorV2<Integer, Integer> intAccum = (AccumulatorV2<Integer, Integer>) accu.get();
intAccum.add(blocks.size());
}
}
}
@@ -18,6 +18,8 @@ package org.apache.spark.sql.execution

import java.util.Locale

import scala.language.reflectiveCalls

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
import org.apache.spark.sql.test.SharedSQLContext
@@ -21,6 +21,7 @@ import java.util.UUID

import scala.collection.mutable
import scala.concurrent.duration._
import scala.language.reflectiveCalls

import org.scalactic.TolerantNumerics
import org.scalatest.concurrent.AsyncAssertions.Waiter
@@ -183,7 +183,7 @@ class HiveDDLSuite
if (dbPath.isEmpty) {
hiveContext.sessionState.catalog.defaultTablePath(tableIdentifier)
} else {
new Path(new Path(dbPath.get), tableIdentifier.table)
new Path(new Path(dbPath.get), tableIdentifier.table).toUri
}
val filesystemPath = new Path(expectedTablePath.toString)
val fs = filesystemPath.getFileSystem(spark.sessionState.newHadoopConf())