Skip to content

Commit

Permalink
[ZEPPELIN-3778] Cluster synchronize notes & authorization
Browse files Browse the repository at this point in the history
### What is this PR for?
In cluster mode, The user creates, modifies, and deletes the note on any of the zeppelin servers.
All need to be notified to all the zeppelin servers in the cluster to synchronize the update of Notebook. Failure to do so will result in the user not being able to continue while switching to another server.

1. Listen for note create/delete/update events
2. Listen for note Notebook Authorization events
3. Broadcast note update event

### What type of PR is it?
[Feature]

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3778

### How should this be tested?
* [CI Pass](https://travis-ci.org/liuxunorg/zeppelin/builds/546647637)

### Screenshots (if appropriate)
### Sync note event
![syncNote](https://user-images.githubusercontent.com/3677382/59598792-9a50fc80-912f-11e9-940c-c983dff1f373.gif)

### Sync note Authorization
![sync-auth](https://user-images.githubusercontent.com/3677382/59598816-a5a42800-912f-11e9-93fd-9fba426ea7d6.gif)

### Questions:
* Does the licenses files need update?
* Is there breaking changes for older versions?
* Does this needs documentation?

Author: Xun Liu <[email protected]>

Closes apache#3387 from liuxunorg/ZEPPELIN-3778 and squashes the following commits:

437d108 [Xun Liu] Organize the code format
05b75ad [Xun Liu] [ZEPPELIN-3778] Cluster synchronize notes & authorization
  • Loading branch information
xunliu committed Jun 24, 2019
1 parent c8a82bc commit 0845d35
Show file tree
Hide file tree
Showing 14 changed files with 1,450 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,10 @@
import org.apache.zeppelin.cluster.event.ClusterEventListener;
import org.apache.zeppelin.cluster.meta.ClusterMeta;
import org.apache.zeppelin.cluster.protocol.RaftServerMessagingProtocol;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -76,9 +67,15 @@ public class ClusterManagerServer extends ClusterManager {
// Connect to the interpreter process that has been created
public static String CONNET_EXISTING_PROCESS = "CONNET_EXISTING_PROCESS";

private List<ClusterEventListener> clusterEventListeners = new ArrayList<>();
private List<ClusterEventListener> clusterIntpEventListeners = new ArrayList<>();
private List<ClusterEventListener> clusterNoteEventListeners = new ArrayList<>();
private List<ClusterEventListener> clusterAuthEventListeners = new ArrayList<>();

// zeppelin cluster event
public static String ZEPL_CLUSTER_EVENT_TOPIC = "ZEPL_CLUSTER_EVENT_TOPIC";
public static String CLUSTER_INTP_EVENT_TOPIC = "CLUSTER_INTP_EVENT_TOPIC";
public static String CLUSTER_NOTE_EVENT_TOPIC = "CLUSTER_NOTE_EVENT_TOPIC";
public static String CLUSTER_AUTH_EVENT_TOPIC = "CLUSTER_AUTH_EVENT_TOPIC";
public static String CLUSTER_NB_AUTH_EVENT_TOPIC = "CLUSTER_NB_AUTH_EVENT_TOPIC";

private ClusterManagerServer() {
super();
Expand Down Expand Up @@ -206,8 +203,12 @@ public BroadcastService getBroadcastService() {
raftServer = builder.build();
raftServer.bootstrap(clusterMemberIds);

messagingService.registerHandler(ZEPL_CLUSTER_EVENT_TOPIC,
subscribeClusterEvent, MoreExecutors.directExecutor());
messagingService.registerHandler(CLUSTER_INTP_EVENT_TOPIC,
subscribeClusterIntpEvent, MoreExecutors.directExecutor());
messagingService.registerHandler(CLUSTER_NOTE_EVENT_TOPIC,
subscribeClusterNoteEvent, MoreExecutors.directExecutor());
messagingService.registerHandler(CLUSTER_AUTH_EVENT_TOPIC,
subscribeClusterAuthEvent, MoreExecutors.directExecutor());

LOGGER.info("RaftServer run() <<<");
}
Expand Down Expand Up @@ -273,12 +274,12 @@ public HashMap<String, Object> getIdleNodeMeta() {
return idleNodeMeta;
}

public void unicastClusterEvent(String host, int port, String msg) {
public void unicastClusterEvent(String host, int port, String topic, String msg) {
LOGGER.info("send unicastClusterEvent message {}", msg);

Address address = Address.from(host, port);
CompletableFuture<byte[]> response = messagingService.sendAndReceive(address,
ZEPL_CLUSTER_EVENT_TOPIC, msg.getBytes(), Duration.ofSeconds(2));
topic, msg.getBytes(), Duration.ofSeconds(2));
response.whenComplete((r, e) -> {
if (null == e) {
LOGGER.error(e.getMessage(), e);
Expand All @@ -288,7 +289,7 @@ public void unicastClusterEvent(String host, int port, String msg) {
});
}

public void broadcastClusterEvent(String msg) {
public void broadcastClusterEvent(String topic, String msg) {
LOGGER.info("send broadcastClusterEvent message {}", msg);

for (Node node : clusterNodes) {
Expand All @@ -299,7 +300,7 @@ public void broadcastClusterEvent(String msg) {
}

CompletableFuture<byte[]> response = messagingService.sendAndReceive(node.address(),
ZEPL_CLUSTER_EVENT_TOPIC, msg.getBytes(), Duration.ofSeconds(2));
topic, msg.getBytes(), Duration.ofSeconds(2));
response.whenComplete((r, e) -> {
if (null == e) {
LOGGER.error(e.getMessage(), e);
Expand All @@ -310,18 +311,51 @@ public void broadcastClusterEvent(String msg) {
}
}

private BiFunction<Address, byte[], byte[]> subscribeClusterEvent = (address, data) -> {
private BiFunction<Address, byte[], byte[]> subscribeClusterIntpEvent = (address, data) -> {
String message = new String(data);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("subscribeClusterIntpEvent() {}", message);
}
for (ClusterEventListener eventListener : clusterIntpEventListeners) {
eventListener.onClusterEvent(message);
}

return null;
};

private BiFunction<Address, byte[], byte[]> subscribeClusterNoteEvent = (address, data) -> {
String message = new String(data);
LOGGER.info("subscribeClusterEvent() {}", message);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("subscribeClusterNoteEvent() {}", message);
}
for (ClusterEventListener eventListener : clusterNoteEventListeners) {
eventListener.onClusterEvent(message);
}

for (ClusterEventListener eventListener : clusterEventListeners) {
return null;
};

private BiFunction<Address, byte[], byte[]> subscribeClusterAuthEvent = (address, data) -> {
String message = new String(data);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("subscribeClusterAuthEvent() {}", message);
}
for (ClusterEventListener eventListener : clusterAuthEventListeners) {
eventListener.onClusterEvent(message);
}

return null;
};

public void addClusterEventListeners(ClusterEventListener listener) {
clusterEventListeners.add(listener);
public void addClusterEventListeners(String topic, ClusterEventListener listener) {
if (StringUtils.equals(topic, CLUSTER_INTP_EVENT_TOPIC)) {
clusterIntpEventListeners.add(listener);
} else if (StringUtils.equals(topic, CLUSTER_NOTE_EVENT_TOPIC)) {
clusterNoteEventListeners.add(listener);
} else if (StringUtils.equals(topic, CLUSTER_AUTH_EVENT_TOPIC)) {
clusterAuthEventListeners.add(listener);
} else {
LOGGER.error("Unknow cluster event topic : {}", topic);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,21 @@
* Cluster Event
*/
public enum ClusterEvent {
CREATE_INTP_PROCESS
// CLUSTER_INTP_EVENT_TOPIC
CREATE_INTP_PROCESS,
// CLUSTER_NOTE_EVENT_TOPIC
BROADCAST_NOTE,
BROADCAST_NOTE_LIST,
BROADCAST_PARAGRAPH,
BROADCAST_PARAGRAPHS,
BROADCAST_NEW_PARAGRAPH,
UPDATE_NOTE_PERMISSIONS,
// CLUSTER_AUTH_EVENT_TOPIC
SET_ROLES,
SET_READERS_PERMISSIONS,
SET_RUNNERS_PERMISSIONS,
SET_WRITERS_PERMISSIONS,
SET_OWNERS_PERMISSIONS,
CLEAR_PERMISSION,
SET_NEW_NOTE_PERMISSIONS
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.cluster.event;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.zeppelin.display.Input;

import java.util.HashMap;
import java.util.Map;

public class ClusterMessage {
public ClusterEvent clusterEvent;
private Map<String, String> data = new HashMap<>();

private static Gson gson = new GsonBuilder()
.setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ")
.setPrettyPrinting()
.registerTypeAdapterFactory(Input.TypeAdapterFactory).create();

public ClusterMessage(ClusterEvent event) {
this.clusterEvent = event;
}

public ClusterMessage put(String k, String v) {
data.put(k, v);
return this;
}

public String get(String k) {
return data.get(k);
}

public Map<String, String> getData() {
return data;
}

public static ClusterMessage deserializeMessage(String msg) {
return gson.fromJson(msg, ClusterMessage.class);
}

public static String serializeMessage(ClusterMessage m) {
return gson.toJson(m);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map;
import java.util.function.Predicate;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.commons.configuration.tree.ConfigurationNode;
Expand Down Expand Up @@ -312,6 +313,11 @@ public String getServerAddress() {
return getString(ConfVars.ZEPPELIN_ADDR);
}

@VisibleForTesting
public void setServerPort(int port) {
properties.put(ConfVars.ZEPPELIN_PORT.getVarName(), String.valueOf(port));
}

public int getServerPort() {
return getInt(ConfVars.ZEPPELIN_PORT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
public ClusterInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage)
throws IOException {
super(zConf, recoveryStorage);
clusterServer.addClusterEventListeners(this);
clusterServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_INTP_EVENT_TOPIC, this);
}

@Override
Expand Down Expand Up @@ -104,7 +104,8 @@ public InterpreterClient launch(InterpreterLaunchContext context) throws IOExcep
mapEvent.put(CLUSTER_EVENT, CREATE_INTP_PROCESS);
mapEvent.put(CLUSTER_EVENT_MSG, sContext);
String strEvent = gson.toJson(mapEvent);
clusterServer.unicastClusterEvent(srvHost, srvPort, strEvent);
clusterServer.unicastClusterEvent(
srvHost, srvPort, ClusterManagerServer.CLUSTER_INTP_EVENT_TOPIC, strEvent);

HashMap<String, Object> intpMeta = clusterServer
.getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
Expand Down Expand Up @@ -145,9 +146,13 @@ public InterpreterClient launch(InterpreterLaunchContext context) throws IOExcep
}

@Override
public void onClusterEvent(String event) {
public void onClusterEvent(String msg) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(msg);
}

Gson gson = new Gson();
Map<String, Object> mapEvent = gson.fromJson(event,
Map<String, Object> mapEvent = gson.fromJson(msg,
new TypeToken<Map<String, Object>>(){}.getType());
String sEvent = (String) mapEvent.get(CLUSTER_EVENT);
ClusterEvent clusterEvent = ClusterEvent.valueOf(sEvent);
Expand All @@ -157,7 +162,7 @@ public void onClusterEvent(String event) {
onCreateIntpProcess(mapEvent);
break;
default:
LOGGER.error("Unknown Cluster Event : {}", clusterEvent);
LOGGER.error("Unknown clusterEvent:{}, msg:{} ", clusterEvent, msg);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,12 @@ private static void setupNotebookServer(

private static void setupClusterManagerServer(ServiceLocator serviceLocator) {
if (conf.isClusterMode()) {
ClusterManagerServer.getInstance().start();
ClusterManagerServer clusterManagerServer = ClusterManagerServer.getInstance();
NotebookServer notebookServer = serviceLocator.getService(NotebookServer.class);
AuthorizationService authorizationService = serviceLocator.getService(AuthorizationService.class);
clusterManagerServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_NOTE_EVENT_TOPIC, notebookServer);
clusterManagerServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_AUTH_EVENT_TOPIC, authorizationService);
clusterManagerServer.start();
}
}

Expand Down
Loading

0 comments on commit 0845d35

Please sign in to comment.