ZEPPELIN-2685. Improvement on Interpreter class
### What is this PR for?
Follow-up of apache#2577. Main changes to `Interpreter`:
* Declare the checked exception `InterpreterException` on the abstract methods of `Interpreter`. This forces interpreter implementations to throw `InterpreterException` instead of letting failures escape unchecked.
* Field name refactoring:

     * `property` -> `properties`
     * `getProperty()` -> `getProperties()`
* Introduce a launcher layer for interpreter launching. Currently we only use a shell script to launch interpreters, but any other service or component could do it, such as the Livy server or other third-party tools; we may even create a separate module for the interpreter launcher.

     * Abstract class `InterpreterLauncher`.
     * For now there are only two implementations: `ShellScriptLauncher` & `SparkInterpreterLauncher`. We could add a method in class `Interpreter` to let an interpreter specify its own launcher class, but that can be future work.
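The first two changes above can be sketched as follows. This is a minimal illustration under stated assumptions, not Zeppelin's actual class (the real `Interpreter` has many more members); only the names `Interpreter`, `InterpreterException`, `properties`, and `getProperties()` come from this PR.

```java
import java.util.Properties;

// Checked exception declared by the interpreter lifecycle methods.
class InterpreterException extends Exception {
    public InterpreterException(String message) { super(message); }
}

abstract class Interpreter {
    // Renamed from `property` in this PR.
    protected Properties properties;

    public Interpreter(Properties properties) { this.properties = properties; }

    // Renamed from getProperty() in this PR.
    public Properties getProperties() { return properties; }

    public String getProperty(String key) { return properties.getProperty(key); }

    // Abstract methods now declare the checked exception, so every
    // implementation must handle or declare it at compile time.
    public abstract void open() throws InterpreterException;
    public abstract void close() throws InterpreterException;
}
```

A concrete interpreter that used to throw unchecked exceptions from `open()` now either catches them or wraps them in `InterpreterException`, which the compiler enforces.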
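The launcher layer might look roughly like this. `launch` returning a command string is an illustrative simplification of how `ShellScriptLauncher` shells out to `bin/interpreter.sh`; the real classes deal with process and RPC objects, and the method signature here is an assumption.

```java
import java.util.Properties;

// Strategy for starting an interpreter process: the shell script is just
// one implementation; a Livy server or other third-party tool could be
// plugged in as another.
abstract class InterpreterLauncher {
    protected Properties properties;

    public InterpreterLauncher(Properties properties) { this.properties = properties; }

    // Illustrative signature: produce the command used to start the
    // interpreter process for the given interpreter group.
    public abstract String launch(String interpreterGroupId);
}

// Launches via the existing interpreter.sh script.
class ShellScriptLauncher extends InterpreterLauncher {
    public ShellScriptLauncher(Properties properties) { super(properties); }

    @Override
    public String launch(String interpreterGroupId) {
        return "bin/interpreter.sh -d " + interpreterGroupId;
    }
}
```

With this shape, `SparkInterpreterLauncher` can subclass `ShellScriptLauncher` and add Spark-specific environment handling without the rest of Zeppelin knowing how the process is started.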

### What type of PR is it?
[Improvement | Refactoring]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-2685

### How should this be tested?
Unit tests are included: `ShellScriptLauncherTest` & `SparkInterpreterLauncherTest`.

### Screenshots (if appropriate)

### Questions:
* Do the license files need to be updated? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <[email protected]>

Closes apache#2592 from zjffdu/ZEPPELIN-2685 and squashes the following commits:

17dc2f1 [Jeff Zhang] address comments
e545cc3 [Jeff Zhang] ZEPPELIN-2685. Improvement on Interpreter class
zjffdu committed Oct 14, 2017
1 parent ed8755d commit 9812e26
Showing 94 changed files with 1,287 additions and 605 deletions.
2 changes: 1 addition & 1 deletion bin/common.sh
@@ -122,7 +122,7 @@ JAVA_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
export JAVA_OPTS

JAVA_INTP_OPTS="${ZEPPELIN_INTP_JAVA_OPTS} -Dfile.encoding=${ZEPPELIN_ENCODING}"
if [[ -z "${SPARK_YARN_CLUSTER}" ]]; then
if [[ -z "${ZEPPELIN_SPARK_YARN_CLUSTER}" ]]; then
JAVA_INTP_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
else
JAVA_INTP_OPTS+=" -Dlog4j.configuration=log4j_yarn_cluster.properties"
1 change: 1 addition & 0 deletions bin/interpreter.sh
@@ -148,6 +148,7 @@ if [[ "${INTERPRETER_ID}" == "spark" ]]; then

if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}"
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR}
else
# autodetect HADOOP_CONF_HOME by heuristic
if [[ -n "${HADOOP_HOME}" ]] && [[ -z "${HADOOP_CONF_DIR}" ]]; then
6 changes: 6 additions & 0 deletions docs/interpreter/spark.md
@@ -181,13 +181,19 @@ For example,
* **local[*]** in local mode
* **spark://master:7077** in standalone cluster
* **yarn-client** in Yarn client mode
* **yarn-cluster** in Yarn cluster mode
* **mesos://host:5050** in Mesos cluster

That's it. Zeppelin will work with any version of Spark and any deployment type without rebuilding Zeppelin in this way.
For the further information about Spark & Zeppelin version compatibility, please refer to "Available Interpreters" section in [Zeppelin download page](https://zeppelin.apache.org/download.html).

> Note that without exporting `SPARK_HOME`, it's running in local mode with included version of Spark. The included version may vary depending on the build profile.
### 3. Yarn mode
Zeppelin support both yarn client and yarn cluster mode (yarn cluster mode is supported from 0.8.0). For yarn mode, you must specify `SPARK_HOME` & `HADOOP_CONF_DIR`.
You can either specify them in `zeppelin-env.sh`, or in interpreter setting page. Specifying them in `zeppelin-env.sh` means you can use only one version of `spark` & `hadoop`. Specifying them
in interpreter setting page means you can use multiple versions of `spark` & `hadoop` in one zeppelin instance.

## SparkContext, SQLContext, SparkSession, ZeppelinContext
SparkContext, SQLContext and ZeppelinContext are automatically created and exposed as variable names `sc`, `sqlContext` and `z`, respectively, in Scala, Python and R environments.
Starting from 0.6.1 SparkSession is available as variable `spark` when you are using Spark 2.x.
@@ -112,7 +112,7 @@ public ElasticsearchInterpreter(Properties property) {

@Override
public void open() {
logger.info("Properties: {}", getProperty());
logger.info("Properties: {}", getProperties());

String clientType = getProperty(ELASTICSEARCH_CLIENT_TYPE);
clientType = clientType == null ? null : clientType.toLowerCase();
@@ -123,15 +123,15 @@ public void open() {
catch (final NumberFormatException e) {
this.resultSize = 10;
logger.error("Unable to parse " + ELASTICSEARCH_RESULT_SIZE + " : " +
property.get(ELASTICSEARCH_RESULT_SIZE), e);
getProperty(ELASTICSEARCH_RESULT_SIZE), e);
}

try {
if (StringUtils.isEmpty(clientType) || "transport".equals(clientType)) {
elsClient = new TransportBasedClient(getProperty());
elsClient = new TransportBasedClient(getProperties());
}
else if ("http".equals(clientType)) {
elsClient = new HttpBasedClient(getProperty());
elsClient = new HttpBasedClient(getProperties());
}
else {
logger.error("Unknown type of Elasticsearch client: " + clientType);
@@ -20,6 +20,7 @@

import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
@@ -86,7 +87,7 @@ public void parseArgs() {

// Functions that each file system implementation must override

public abstract String listAll(String path);
public abstract String listAll(String path) throws InterpreterException;

public abstract boolean isDirectory(String path);

@@ -202,7 +202,7 @@ public String listFile(String filePath) {
return "No such File or directory";
}

public String listAll(String path) {
public String listAll(String path) throws InterpreterException {
String all = "";
if (exceptionOnConnect != null)
return "Error connecting to provided endpoint.";
@@ -17,7 +17,6 @@
*/
package org.apache.zeppelin.flink;

import java.lang.reflect.InvocationTargetException;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
@@ -34,10 +33,8 @@
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterUtils;
@@ -46,11 +43,8 @@
import org.slf4j.LoggerFactory;

import scala.Console;
import scala.None;
import scala.Option;
import scala.Some;
import scala.collection.JavaConversions;
import scala.collection.immutable.Nil;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.AbstractFunction0;
import scala.tools.nsc.Settings;
@@ -80,7 +74,7 @@ public FlinkInterpreter(Properties property) {
public void open() {
out = new ByteArrayOutputStream();
flinkConf = new org.apache.flink.configuration.Configuration();
Properties intpProperty = getProperty();
Properties intpProperty = getProperties();
for (Object k : intpProperty.keySet()) {
String key = (String) k;
String val = toString(intpProperty.get(key));
6 changes: 3 additions & 3 deletions groovy/src/main/java/org/apache/zeppelin/groovy/GObject.java
@@ -316,12 +316,12 @@ public void run(String paragraphId) {
@ZeppelinApi
public void run(String noteId, String paragraphId, InterpreterContext context) {
if (paragraphId.equals(context.getParagraphId())) {
throw new InterpreterException("Can not run current Paragraph");
throw new RuntimeException("Can not run current Paragraph");
}
List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, paragraphId,
context);
if (runners.size() <= 0) {
throw new InterpreterException("Paragraph " + paragraphId + " not found " + runners.size());
throw new RuntimeException("Paragraph " + paragraphId + " not found " + runners.size());
}
for (InterpreterContextRunner r : runners) {
r.run();
@@ -338,7 +338,7 @@ public void runNote(String noteId, InterpreterContext context) {
List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, context);

if (runners.size() <= 0) {
throw new InterpreterException("Note " + noteId + " not found " + runners.size());
throw new RuntimeException("Note " + noteId + " not found " + runners.size());
}

for (InterpreterContextRunner r : runners) {
@@ -17,19 +17,15 @@

package org.apache.zeppelin.groovy;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.StringWriter;
import java.io.PrintWriter;
import java.io.File;
import java.util.*;

import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Scheduler;
@@ -40,7 +36,6 @@
import groovy.lang.GroovyShell;
import groovy.lang.Script;
import org.codehaus.groovy.control.CompilerConfiguration;
import org.codehaus.groovy.runtime.ResourceGroovyMethods;
import org.codehaus.groovy.runtime.StackTraceUtils;

import java.util.concurrent.ConcurrentHashMap;
@@ -167,7 +162,7 @@ public InterpreterResult interpret(String cmd, InterpreterContext contextInterpr
//put shared bindings evaluated in this interpreter
bindings.putAll(sharedBindings);
//put predefined bindings
bindings.put("g", new GObject(log, out, this.getProperty(), contextInterpreter, bindings));
bindings.put("g", new GObject(log, out, this.getProperties(), contextInterpreter, bindings));
bindings.put("out", new PrintWriter(out, true));

script.run();
@@ -68,7 +68,7 @@ public HbaseInterpreter(Properties property) {
}

@Override
public void open() {
public void open() throws InterpreterException {
this.scriptingContainer = new ScriptingContainer(LocalContextScope.SINGLETON);
this.writer = new StringWriter();
scriptingContainer.setOutput(this.writer);
@@ -88,7 +88,7 @@ public void open() {
}

logger.info("Absolute Ruby Source:" + abs_ruby_src.toString());
// hirb.rb:41 requires the following system property to be set.
// hirb.rb:41 requires the following system properties to be set.
Properties sysProps = System.getProperties();
sysProps.setProperty(HBASE_RUBY_SRC, abs_ruby_src.toString());

@@ -15,6 +15,7 @@
package org.apache.zeppelin.hbase;

import org.apache.log4j.BasicConfigurator;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -35,7 +36,7 @@ public class HbaseInterpreterTest {
private static HbaseInterpreter hbaseInterpreter;

@BeforeClass
public static void setUp() throws NullPointerException {
public static void setUp() throws NullPointerException, InterpreterException {
BasicConfigurator.configure();
Properties properties = new Properties();
properties.put("hbase.home", "");
@@ -74,7 +74,8 @@ public void rerun() {
}

@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
public InterpreterResult interpret(String st, InterpreterContext context)
throws InterpreterException {
this.context = context;
try {
return interpreterEvent.interpret(st, context);
@@ -93,7 +93,7 @@ public void open() {
}

@Override
public void close() {
public void close() throws InterpreterException {
try {
if (conn != null) {
conn.close();
@@ -27,6 +27,7 @@
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
@@ -82,7 +83,7 @@ public void setUp() {
}

@After
public void tearDown() {
public void tearDown() throws InterpreterException {
intp.close();
ignite.close();
}
25 changes: 12 additions & 13 deletions jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
@@ -45,8 +45,6 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
@@ -172,7 +170,7 @@ public HashMap<String, Properties> getPropertiesMap() {
@Override
public void open() {
super.open();
for (String propertyKey : property.stringPropertyNames()) {
for (String propertyKey : properties.stringPropertyNames()) {
logger.debug("propertyKey: {}", propertyKey);
String[] keyValue = propertyKey.split("\\.", 2);
if (2 == keyValue.length) {
@@ -185,7 +183,7 @@ public void open() {
prefixProperties = new Properties();
basePropretiesMap.put(keyValue[0].trim(), prefixProperties);
}
prefixProperties.put(keyValue[1].trim(), property.getProperty(propertyKey));
prefixProperties.put(keyValue[1].trim(), getProperty(propertyKey));
}
}

@@ -211,8 +209,8 @@ public void open() {


protected boolean isKerboseEnabled() {
if (!isEmpty(property.getProperty("zeppelin.jdbc.auth.type"))) {
UserGroupInformation.AuthenticationMethod authType = JDBCSecurityImpl.getAuthtype(property);
if (!isEmpty(getProperty("zeppelin.jdbc.auth.type"))) {
UserGroupInformation.AuthenticationMethod authType = JDBCSecurityImpl.getAuthtype(properties);
if (authType.equals(KERBEROS)) {
return true;
}
@@ -356,7 +354,7 @@ private void closeDBPool(String user, String propertyKey) throws SQLException {
}

private void setUserProperty(String propertyKey, InterpreterContext interpreterContext)
throws SQLException, IOException {
throws SQLException, IOException, InterpreterException {

String user = interpreterContext.getAuthenticationInfo().getUser();

@@ -424,18 +422,19 @@ public Connection getConnection(String propertyKey, InterpreterContext interpret
final Properties properties = jdbcUserConfigurations.getPropertyMap(propertyKey);
final String url = properties.getProperty(URL_KEY);

if (isEmpty(property.getProperty("zeppelin.jdbc.auth.type"))) {
if (isEmpty(getProperty("zeppelin.jdbc.auth.type"))) {
connection = getConnectionFromPool(url, user, propertyKey, properties);
} else {
UserGroupInformation.AuthenticationMethod authType = JDBCSecurityImpl.getAuthtype(property);
UserGroupInformation.AuthenticationMethod authType =
JDBCSecurityImpl.getAuthtype(getProperties());

final String connectionUrl = appendProxyUserToURL(url, user, propertyKey);

JDBCSecurityImpl.createSecureConfiguration(property, authType);
JDBCSecurityImpl.createSecureConfiguration(getProperties(), authType);
switch (authType) {
case KERBEROS:
if (user == null || "false".equalsIgnoreCase(
property.getProperty("zeppelin.jdbc.auth.kerberos.proxy.enable"))) {
getProperty("zeppelin.jdbc.auth.kerberos.proxy.enable"))) {
connection = getConnectionFromPool(connectionUrl, user, propertyKey, properties);
} else {
if (basePropretiesMap.get(propertyKey).containsKey("proxy.user.property")) {
@@ -497,7 +496,7 @@ private String appendProxyUserToURL(String url, String user, String propertyKey)
return connectionUrl.toString();
}

private String getPassword(Properties properties) throws IOException {
private String getPassword(Properties properties) throws IOException, InterpreterException {
if (isNotEmpty(properties.getProperty(PASSWORD_KEY))) {
return properties.getProperty(PASSWORD_KEY);
} else if (isNotEmpty(properties.getProperty(JDBC_JCEKS_FILE))
@@ -850,7 +849,7 @@ public Scheduler getScheduler() {

@Override
public List<InterpreterCompletion> completion(String buf, int cursor,
InterpreterContext interpreterContext) {
InterpreterContext interpreterContext) throws InterpreterException {
List<InterpreterCompletion> candidates = new ArrayList<>();
String propertyKey = getPropertyKey(buf);
String sqlCompleterKey =
@@ -37,6 +37,7 @@

import org.apache.zeppelin.completer.CompletionType;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.FIFOScheduler;
@@ -349,7 +350,7 @@ public void concurrentSettingTest() {
}

@Test
public void testAutoCompletion() throws SQLException, IOException {
public void testAutoCompletion() throws SQLException, IOException, InterpreterException {
Properties properties = new Properties();
properties.setProperty("common.max_count", "1000");
properties.setProperty("common.max_retry", "3");