Skip to content

Commit

Permalink
[ZEPPELIN-1610] - Add notebook watcher
Browse files Browse the repository at this point in the history
### What is this PR for?
Add a Simple way to switch a websocket connection to a new state; watcher.
A websocket watcher is a special connection that will watch most of the web socket even in Zeppelin, this cam be used to monitor zeppelin server activity.

### What type of PR is it?
[Feature]

### Todos
* [x] - Add watcher Queue
* [x] - Add endpoint to switch from regular client to watcher
* [x] - Add a way to generate a uniq key when zeppelin server restart
* [x] - Add example on how to use watcher.

### What is the Jira issue?
* [ZEPPELIN-1610](https://issues.apache.org/jira/browse/ZEPPELIN-1610)

### How should this be tested?
You will have to create your own websocket client and provide a valid http header (`X-Watcher-Key`) when you connect to zeppelin ws
 something like
```
private Session openWatcherSession() {
    ClientUpgradeRequest request = new ClientUpgradeRequest();
    request.setHeader(WatcherSecurityKey.HTTP_HEADER, WatcherSecurityKey.getKey());
    WatcherWebsocket socket = WatcherWebsocket.createInstace();
    Future<Session> future = null;
    Session session = null;
    try {
      future = wsClient.connect(socket, zeppelinWebsocketUrl, request);
      session = future.get();
    } catch (IOException | InterruptedException | ExecutionException e) {
      LOG.error("Couldn't establish websocket connection to Zeppelin ", e);
      return session;
    }
    return session;
  }
```

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? Yes

Author: Anthony Corbacho <[email protected]>

Closes apache#1588 from anthonycorbacho/feat/updateWebsocketInZeppelinHubRepo and squashes the following commits:

26cf53f [Anthony Corbacho] Move broadcastToWatcher
8413e9b [Anthony Corbacho] Remove redundant broadcast
fb2c260 [Anthony Corbacho] Remove TODO in socket/Message
716c92c [Anthony Corbacho] Fix checkstyle
a10ba13 [Anthony Corbacho] Add remove connection from note back in test
89d70f2 [Anthony Corbacho] fix test
092791e [Anthony Corbacho] Light refactoring :: add missing header, add comment, refacto some methods
8f7e1b3 [Anthony Corbacho] Add X-Watcher-Key in request header for watcher client
e2d3053 [Anthony Corbacho] Add simple check for ws before switching ws client to watcher, client should provide a header X-Watcher-Key with a valid key (generated at runtime), if key invalid client wont be accepted
e25ea1e [Anthony Corbacho] Add simple Key generation for Watcher ws client
4affe25 [Anthony Corbacho] Handle remoing wssession from notebook map once the session is close :: avoiding socket connection to be ide
c32192a [Anthony Corbacho] rework watcher creation and ws session with notes
3bd3482 [Anthony Corbacho] Reorder import :: Google check style
bde5db4 [Anthony Corbacho] Update ping routine
ede1f18 [Anthony Corbacho] make private field public for accessibility
aa55a5a [Anthony Corbacho] Strting to rework ZeppelinClient
e5b3a1d [Anthony Corbacho] Add zeppelinhub notebook watcher
9d6c93f [Anthony Corbacho] Add new OP watcher
0d7f493 [Anthony Corbacho] Added new WS queue called watcher, watcher will be abler to listen to almost every note action performed in zeppelin notebook websocket server
45849ce [Anthony Corbacho] Add new message type :: Watcher message, this class will wrapp zeppelin ws message and add extra information such as noteId and user
  • Loading branch information
anthonycorbacho authored and Leemoonsoo committed Nov 16, 2016
1 parent e093057 commit 5e85e6e
Show file tree
Hide file tree
Showing 10 changed files with 378 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,24 @@
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.notebook.*;
import org.apache.zeppelin.notebook.JobListenerFactory;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.NotebookAuthorization;
import org.apache.zeppelin.notebook.NotebookEventListener;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.notebook.ParagraphJobListener;
import org.apache.zeppelin.notebook.repo.NotebookRepo.Revision;
import org.apache.zeppelin.notebook.socket.Message;
import org.apache.zeppelin.notebook.socket.Message.OP;
import org.apache.zeppelin.notebook.socket.WatcherMessage;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.server.ZeppelinServer;
import org.apache.zeppelin.ticket.TicketContainer;
import org.apache.zeppelin.types.InterpreterSettingsList;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.util.WatcherSecurityKey;
import org.apache.zeppelin.utils.InterpreterBindingUtils;
import org.apache.zeppelin.utils.SecurityUtils;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
Expand All @@ -67,6 +75,7 @@
import org.slf4j.LoggerFactory;

import com.google.common.base.Strings;
import com.google.common.collect.Queues;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
Expand Down Expand Up @@ -97,6 +106,14 @@ String getKey() {
final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
final Map<String, Queue<NotebookSocket>> userConnectedSockets = new ConcurrentHashMap<>();

/**
* This is a special endpoint in the notebook websoket, Every connection in this Queue
* will be able to watch every websocket event, it doesnt need to be listed into the map of
* noteSocketMap. This can be used to get information about websocket traffic and watch what
* is going on.
*/
final Queue<NotebookSocket> watcherSockets = Queues.newConcurrentLinkedQueue();

private Notebook notebook() {
return ZeppelinServer.notebook;
}
Expand Down Expand Up @@ -275,6 +292,9 @@ public void onMessage(NotebookSocket conn, String msg) {
case GET_INTERPRETER_SETTINGS:
getInterpreterSettings(conn, subject);
break;
case WATCHER:
switchConnectionToWatcher(conn, messagereceived);
break;
default:
break;
}
Expand Down Expand Up @@ -389,6 +409,7 @@ private void broadcastToNoteBindedInterpreter(String interpreterGroupId,

private void broadcast(String noteId, Message m) {
synchronized (noteSocketMap) {
broadcastToWatchers(noteId, StringUtils.EMPTY, m);
List<NotebookSocket> socketLists = noteSocketMap.get(noteId);
if (socketLists == null || socketLists.size() == 0) {
return;
Expand All @@ -406,6 +427,7 @@ private void broadcast(String noteId, Message m) {

private void broadcastExcept(String noteId, Message m, NotebookSocket exclude) {
synchronized (noteSocketMap) {
broadcastToWatchers(noteId, StringUtils.EMPTY, m);
List<NotebookSocket> socketLists = noteSocketMap.get(noteId);
if (socketLists == null || socketLists.size() == 0) {
return;
Expand All @@ -431,11 +453,7 @@ private void multicastToUser(String user, Message m) {
}

for (NotebookSocket conn: userConnectedSockets.get(user)) {
try {
conn.send(serializeMessage(m));
} catch (IOException e) {
LOG.error("socket error", e);
}
unicast(m, conn);
}
}

Expand All @@ -445,6 +463,7 @@ private void unicast(Message m, NotebookSocket conn) {
} catch (IOException e) {
LOG.error("socket error", e);
}
broadcastToWatchers(StringUtils.EMPTY, StringUtils.EMPTY, m);
}

public void unicastNoteJobInfo(NotebookSocket conn, Message fromMessage) throws IOException {
Expand Down Expand Up @@ -545,10 +564,8 @@ public void broadcastNote(Note note) {
broadcast(note.getId(), new Message(OP.NOTE).put("note", note));
}

public void broadcastInterpreterBindings(String noteId,
List settingList) {
broadcast(noteId, new Message(OP.INTERPRETER_BINDINGS)
.put("interpreterBindings", settingList));
public void broadcastInterpreterBindings(String noteId, List settingList) {
broadcast(noteId, new Message(OP.INTERPRETER_BINDINGS).put("interpreterBindings", settingList));
}

public void broadcastNoteList(AuthenticationInfo subject, HashSet userAndRoles) {
Expand Down Expand Up @@ -1770,6 +1787,50 @@ public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos)
.get(settingId);
interpreterSetting.setInfos(metaInfos);
}


private void switchConnectionToWatcher(NotebookSocket conn, Message messagereceived)
throws IOException {
if (!isSessionAllowedToSwitchToWatcher(conn)) {
LOG.error("Cannot switch this client to watcher, invalid security key");
return;
}
LOG.info("Going to add {} to watcher socket", conn);
// add the connection to the watcher.
if (watcherSockets.contains(conn)) {
LOG.info("connection alrerady present in the watcher");
return;
}
watcherSockets.add(conn);

// remove this connection from regular zeppelin ws usage.
removeConnectionFromAllNote(conn);
connectedSockets.remove(conn);
removeUserConnection(conn.getUser(), conn);
}

private boolean isSessionAllowedToSwitchToWatcher(NotebookSocket session) {
String watcherSecurityKey = session.getRequest().getHeader(WatcherSecurityKey.HTTP_HEADER);
return !(StringUtils.isBlank(watcherSecurityKey)
|| !watcherSecurityKey.equals(WatcherSecurityKey.getKey()));
}

private void broadcastToWatchers(String noteId, String subject, Message message) {
synchronized (watcherSockets) {
if (watcherSockets.isEmpty()) {
return;
}
for (NotebookSocket watcher : watcherSockets) {
try {
watcher.send(WatcherMessage
.builder(noteId)
.subject(subject)
.message(serializeMessage(message))
.build()
.serialize());
} catch (IOException e) {
LOG.error("Cannot broadcast message to watcher", e);
}
}
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
Expand All @@ -57,6 +52,11 @@
import org.apache.zeppelin.search.SearchService;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.user.Credentials;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.gson.Gson;

/**
* Binded interpreters for a note
Expand Down
Loading

0 comments on commit 5e85e6e

Please sign in to comment.