Skip to content

Commit

Permalink
ZEPPELIN-273 Spark 1.5 support
Browse files Browse the repository at this point in the history
https://issues.apache.org/jira/browse/ZEPPELIN-273

"spark-1.5" profile is added

Author: Lee moon soo <[email protected]>

Closes apache#269 from Leemoonsoo/spark_1.5 and squashes the following commits:

6ba2dce [Lee moon soo] Add missing import after rebase
5279d26 [Lee moon soo] improve
8c19b09 [Lee moon soo] Add SparkVersion enum and test
699b05b [Lee moon soo] Add spark-1.5 profile
67023fa [Lee moon soo] allow spark 1.5
  • Loading branch information
Leemoonsoo committed Sep 4, 2015
1 parent 48f875d commit 754c55e
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 38 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ Spark 1.1.x
```
mvn clean package -Pspark-1.1 -Dhadoop.version=2.2.0 -Phadoop-2.2 -DskipTests
```
Spark 1.5.x
```
mvn clean package -Pspark-1.5 -Dhadoop.version=2.2.0 -Phadoop-2.2 -DskipTests
```
CDH 5.X
```
mvn clean package -Pspark-1.2 -Dhadoop.version=2.5.0-cdh5.3.0 -Phadoop-2.4 -DskipTests
Expand Down
13 changes: 13 additions & 0 deletions spark-dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,19 @@
</dependencies>
</profile>

<profile>
<id>spark-1.5</id>
<properties>
<spark.version>1.5.0</spark.version>
<akka.group>com.typesafe.akka</akka.group>
<akka.version>2.3.11</akka.version>
<protobuf.version>2.5.0</protobuf.version>
</properties>

<dependencies>
</dependencies>
</profile>

<profile>
<id>hadoop-0.23</id>
<!-- SPARK-1121: Adds an explicit dependency on Avro to work around a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void open() {
CommandLine cmd = CommandLine.parse(getProperty("zeppelin.pyspark.python"));
cmd.addArgument(scriptPath, false);
cmd.addArgument(Integer.toString(port), false);
cmd.addArgument(getJavaSparkContext().version(), false);
cmd.addArgument(Integer.toString(getSparkInterpreter().getSparkVersion().toNumber()), false);
executor = new DefaultExecutor();
outputStream = new ByteArrayOutputStream();
PipedOutputStream ps = new PipedOutputStream();
Expand Down Expand Up @@ -286,9 +286,7 @@ public InterpreterResult interpret(String st, InterpreterContext context) {
}

SparkInterpreter sparkInterpreter = getSparkInterpreter();
if (!sparkInterpreter.getSparkContext().version().startsWith("1.2") &&
!sparkInterpreter.getSparkContext().version().startsWith("1.3") &&
!sparkInterpreter.getSparkContext().version().startsWith("1.4")) {
if (!sparkInterpreter.getSparkVersion().isPysparkSupported()) {
return new InterpreterResult(Code.ERROR, "pyspark "
+ sparkInterpreter.getSparkContext().version() + " is not supported");
}
Expand Down
37 changes: 13 additions & 24 deletions spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ public class SparkInterpreter extends Interpreter {

private Map<String, Object> binder;
private SparkEnv env;
private SparkVersion sparkVersion;


public SparkInterpreter(Properties property) {
Expand Down Expand Up @@ -438,6 +439,8 @@ public void open() {
sc.taskScheduler().rootPool().addSchedulable(pool);
}

sparkVersion = SparkVersion.fromVersionString(sc.version());

sqlc = getSQLContext();

dep = getDependencyResolver();
Expand All @@ -462,15 +465,9 @@ public void open() {
+ "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]");
intp.interpret("import org.apache.spark.SparkContext._");

if (sc.version().startsWith("1.1")) {
intp.interpret("import sqlContext._");
} else if (sc.version().startsWith("1.2")) {
if (sparkVersion.oldSqlContextImplicits()) {
intp.interpret("import sqlContext._");
} else if (sc.version().startsWith("1.3")) {
intp.interpret("import sqlContext.implicits._");
intp.interpret("import sqlContext.sql");
intp.interpret("import org.apache.spark.sql.functions._");
} else if (sc.version().startsWith("1.4")) {
} else {
intp.interpret("import sqlContext.implicits._");
intp.interpret("import sqlContext.sql");
intp.interpret("import org.apache.spark.sql.functions._");
Expand All @@ -488,14 +485,10 @@ public void open() {
*/

try {
if (sc.version().startsWith("1.1") || sc.version().startsWith("1.2")) {
if (sparkVersion.oldLoadFilesMethodName()) {
Method loadFiles = this.interpreter.getClass().getMethod("loadFiles", Settings.class);
loadFiles.invoke(this.interpreter, settings);
} else if (sc.version().startsWith("1.3")) {
Method loadFiles = this.interpreter.getClass().getMethod(
"org$apache$spark$repl$SparkILoop$$loadFiles", Settings.class);
loadFiles.invoke(this.interpreter, settings);
} else if (sc.version().startsWith("1.4")) {
} else {
Method loadFiles = this.interpreter.getClass().getMethod(
"org$apache$spark$repl$SparkILoop$$loadFiles", Settings.class);
loadFiles.invoke(this.interpreter, settings);
Expand Down Expand Up @@ -682,18 +675,10 @@ public int getProgress(InterpreterContext context) {
int[] progressInfo = null;
try {
Object finalStage = job.getClass().getMethod("finalStage").invoke(job);
if (sc.version().startsWith("1.0")) {
if (sparkVersion.getProgress1_0()) {
progressInfo = getProgressFromStage_1_0x(sparkListener, finalStage);
} else if (sc.version().startsWith("1.1")) {
progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage);
} else if (sc.version().startsWith("1.2")) {
progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage);
} else if (sc.version().startsWith("1.3")) {
progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage);
} else if (sc.version().startsWith("1.4")) {
progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage);
} else {
continue;
progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage);
}
} catch (IllegalAccessException | IllegalArgumentException
| InvocationTargetException | NoSuchMethodException
Expand Down Expand Up @@ -818,4 +803,8 @@ public Scheduler getScheduler() {
// Accessor for the ZeppelinContext instance held in field z.
public ZeppelinContext getZeppelinContext() {
return z;
}

// Accessor for the SparkVersion detected in open() via SparkVersion.fromVersionString(sc.version()).
public SparkVersion getSparkVersion() {
return sparkVersion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ public int getProgress(InterpreterContext context) {
return sparkInterpreter.getProgress(context);
}


@Override
public Scheduler getScheduler() {
if (concurrentSQL()) {
Expand Down
94 changes: 94 additions & 0 deletions spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.spark;

/**
 * Parses and compares the Spark version string reported by {@code SparkContext.version()}.
 *
 * Each constant is derived from its own name: {@code SPARK_1_2_0} maps to the numeric
 * form 120 (used for ordering) and the dotted string "1.2.0" (used for matching the
 * string returned by SparkContext).
 */
public enum SparkVersion {
  SPARK_1_0_0,
  SPARK_1_0_1,
  SPARK_1_1_0,
  SPARK_1_1_1,
  SPARK_1_2_0,
  SPARK_1_2_1,
  SPARK_1_2_2,
  SPARK_1_3_0,
  SPARK_1_3_1,
  SPARK_1_4_0,
  SPARK_1_4_1,
  SPARK_1_5_0;

  // Numeric representation derived from the constant name, e.g. SPARK_1_2_0 -> 120.
  private final int version;

  SparkVersion() {
    version = Integer.parseInt(name().substring("SPARK_".length()).replaceAll("_", ""));
  }

  /** Returns the numeric form of this version, e.g. 120 for Spark 1.2.0. */
  public int toNumber() {
    return version;
  }

  /** Returns the dotted version string, e.g. "1.2.0", matching SparkContext.version(). */
  @Override
  public String toString() {
    return name().substring("SPARK_".length()).replaceAll("_", ".");
  }

  /**
   * Resolves a version string such as "1.2.0" to its enum constant.
   *
   * @param versionString dotted version string as returned by SparkContext.version()
   * @return the matching constant
   * @throws IllegalArgumentException if the string matches no supported version
   */
  public static SparkVersion fromVersionString(String versionString) {
    for (SparkVersion v : values()) {
      if (v.toString().equals(versionString)) {
        return v;
      }
    }
    // Include the offending value so the error is actionable when a new Spark release
    // is not yet listed above.
    throw new IllegalArgumentException("Unsupported spark version: " + versionString);
  }

  /** pyspark integration is only wired up for Spark 1.2.0 and later. */
  public boolean isPysparkSupported() {
    return this.newerThanEquals(SPARK_1_2_0);
  }

  /** True for versions (1.4.0+) where the DataFrame-based code paths are used. */
  public boolean hasDataFrame() {
    return this.newerThanEquals(SPARK_1_4_0);
  }

  /** True for versions (before 1.1.0) that need the 1.0.x progress-reporting path. */
  public boolean getProgress1_0() {
    return this.olderThan(SPARK_1_1_0);
  }

  /** True for versions (before 1.3.0) where the REPL method is named "loadFiles". */
  public boolean oldLoadFilesMethodName() {
    return this.olderThan(SPARK_1_3_0);
  }

  /** True for versions (before 1.3.0) that import "sqlContext._" instead of implicits. */
  public boolean oldSqlContextImplicits() {
    return this.olderThan(SPARK_1_3_0);
  }

  /** Strict comparison: this version is strictly newer than the argument. */
  public boolean newerThan(SparkVersion versionToCompare) {
    return version > versionToCompare.version;
  }

  public boolean newerThanEquals(SparkVersion versionToCompare) {
    return version >= versionToCompare.version;
  }

  /** Strict comparison: this version is strictly older than the argument. */
  public boolean olderThan(SparkVersion versionToCompare) {
    return version < versionToCompare.version;
  }

  public boolean olderThanEquals(SparkVersion versionToCompare) {
    return version <= versionToCompare.version;
  }
}
28 changes: 19 additions & 9 deletions spark/src/main/resources/python/zeppelin_pyspark.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,28 @@ def put(self, key, value):
def get(self, key):
return self.__getitem__(key)

class SparkVersion(object):
    """Wraps the numeric Spark version handed over from the JVM side.

    The version arrives as an integer such as 140 for Spark 1.4.0 and is
    only used for threshold checks against the feature cut-off constants.
    """

    # Numeric cut-offs for version-dependent behavior.
    SPARK_1_4_0 = 140
    SPARK_1_3_0 = 130

    def __init__(self, versionNumber):
        # versionNumber: int form of the Spark version, e.g. 140 for 1.4.0.
        self.version = versionNumber

    def isAutoConvertEnabled(self):
        """py4j auto_convert is turned on for Spark 1.4.0 and later."""
        return not self.version < SparkVersion.SPARK_1_4_0

    def isImportAllPackageUnderSparkSql(self):
        """From Spark 1.3.0 on, org.apache.spark.sql.* is wildcard-imported."""
        return not self.version < SparkVersion.SPARK_1_3_0


output = Logger()
sys.stdout = output
sys.stderr = output

client = GatewayClient(port=int(sys.argv[1]))
sparkVersion = sys.argv[2]
sparkVersion = SparkVersion(int(sys.argv[2]))

if sparkVersion.startswith("1.4"):
if sparkVersion.isAutoConvertEnabled():
gateway = JavaGateway(client, auto_convert = True)
else:
gateway = JavaGateway(client)
Expand All @@ -102,17 +115,14 @@ def get(self, key):

jsc = intp.getJavaSparkContext()

if sparkVersion.startswith("1.2"):
if sparkVersion.isImportAllPackageUnderSparkSql():
java_import(gateway.jvm, "org.apache.spark.sql.*")
java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
else:
java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")
java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext")
java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
elif sparkVersion.startswith("1.3"):
java_import(gateway.jvm, "org.apache.spark.sql.*")
java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
elif sparkVersion.startswith("1.4"):
java_import(gateway.jvm, "org.apache.spark.sql.*")
java_import(gateway.jvm, "org.apache.spark.sql.hive.*")


java_import(gateway.jvm, "scala.Tuple2")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.spark;

import static org.junit.Assert.*;

import org.junit.Test;

public class SparkVersionTest {

  @Test
  public void testSparkVersion() {
    // fromVersionString must resolve to the canonical enum constant;
    // assertSame reports both values on failure, unlike assertTrue(a == b).
    assertSame(SparkVersion.SPARK_1_2_0, SparkVersion.fromVersionString("1.2.0"));

    // newerThan is strict: a version is not newer than itself.
    assertFalse(SparkVersion.SPARK_1_2_0.newerThan(SparkVersion.SPARK_1_2_0));
    assertFalse(SparkVersion.SPARK_1_2_0.newerThan(SparkVersion.SPARK_1_3_0));
    assertTrue(SparkVersion.SPARK_1_2_0.newerThan(SparkVersion.SPARK_1_1_0));

    // newerThanEquals is inclusive.
    assertTrue(SparkVersion.SPARK_1_2_0.newerThanEquals(SparkVersion.SPARK_1_2_0));
    assertFalse(SparkVersion.SPARK_1_2_0.newerThanEquals(SparkVersion.SPARK_1_3_0));
    assertTrue(SparkVersion.SPARK_1_2_0.newerThanEquals(SparkVersion.SPARK_1_1_0));

    // olderThan is strict: a version is not older than itself.
    assertFalse(SparkVersion.SPARK_1_2_0.olderThan(SparkVersion.SPARK_1_2_0));
    assertFalse(SparkVersion.SPARK_1_2_0.olderThan(SparkVersion.SPARK_1_1_0));
    assertTrue(SparkVersion.SPARK_1_2_0.olderThan(SparkVersion.SPARK_1_3_0));

    // olderThanEquals is inclusive.
    assertTrue(SparkVersion.SPARK_1_2_0.olderThanEquals(SparkVersion.SPARK_1_2_0));
    assertFalse(SparkVersion.SPARK_1_2_0.olderThanEquals(SparkVersion.SPARK_1_1_0));
    assertTrue(SparkVersion.SPARK_1_2_0.olderThanEquals(SparkVersion.SPARK_1_3_0));

    // Conversions: numeric form and dotted string form.
    assertEquals(120, SparkVersion.SPARK_1_2_0.toNumber());
    assertEquals("1.2.0", SparkVersion.SPARK_1_2_0.toString());
  }
}

0 comments on commit 754c55e

Please sign in to comment.