Skip to content

Commit

Permalink
Merge pull request #147 from Kitware/jupyter
Browse files Browse the repository at this point in the history
Initial implementation for Jupyter backend
  • Loading branch information
jourdain committed Sep 27, 2023
2 parents bb133ce + db1bb39 commit 63b20dd
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 1 deletion.
4 changes: 4 additions & 0 deletions js/src/SmartConnect/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ function smartConnect(publicAPI, model) {
if (model.config.sessionURL) {
// We have a direct connection URL
session = wsConnect(publicAPI, model);
} else if (model.config.wsProxy) {
// Provide fake url if missing since we rely on a proxy
model.config.sessionURL = model.config.sessionURL || "wss://proxy/";
session = wsConnect(publicAPI, model);
} else {
// We need to use the Launcher
const launcher = ProcessLauncher.newInstance({
Expand Down
2 changes: 1 addition & 1 deletion js/src/WebsocketConnection/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ function WebsocketConnection(publicAPI, model) {
}
try {
if (model.wsProxy) {
model.connection = new WSLINK.WebSocket(transports[0].url);
model.connection = WSLINK.createWebSocket(transports[0].url);
} else {
model.connection = new WebSocket(transports[0].url);
}
Expand Down
10 changes: 10 additions & 0 deletions python/src/wslink/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ def create_webserver(server_config, backend="aiohttp"):

return create_webserver(server_config)

if backend == "jupyter":
from .jupyter import create_webserver

return create_webserver(server_config)

raise Exception(f"{backend} backend is not implemented")


Expand All @@ -33,4 +38,9 @@ def launcher_start(args, config, backend="aiohttp"):

return startWebServer(args, config)

if backend == "jupyter":
from .jupyter import startWebServer

return startWebServer(args, config)

raise Exception(f"{backend} backend is not implemented")
6 changes: 6 additions & 0 deletions python/src/wslink/backends/jupyter/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from .core import create_webserver, startWebServer

__ALL__ = [
"create_webserver",
"startWebServer",
]
139 changes: 139 additions & 0 deletions python/src/wslink/backends/jupyter/core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import asyncio
from functools import partial
from wslink.backends.generic.core import GenericServer
from IPython.core.getipython import get_ipython


class EventEmitter:
def __init__(self):
self._listeners = {}

def clear(self):
self._listeners = {}

def emit(self, event, *args, **kwargs):
listeners = self._listeners.get(event)
if listeners is None:
return

loop = asyncio.get_running_loop()
coroutine_run = (
loop.create_task if (loop and loop.is_running()) else asyncio.run
)

for listener in listeners:
if asyncio.iscoroutinefunction(listener):
coroutine_run(listener(*args, **kwargs))
else:
listener(*args, **kwargs)

def add_event_listener(self, event, listener):
listeners = self._listeners.get(event)
if listeners is None:
listeners = set()
self._listeners[event] = listeners

listeners.add(listener)

def remove_event_listener(self, event, listener):
listeners = self._listeners.get(event)
if listeners is None:
return

if listener in listeners:
listeners.remove(listener)


class WsJupyterComm(EventEmitter):
def __init__(self, kernel=None):
super().__init__()
self.comm = None
self.kernel = get_ipython().kernel if kernel is None else kernel
self.kernel.comm_manager.register_target("wslink_comm", self.on_open)

def send(self, data, buffers):
if self.comm is not None:
self.comm.send(data=data, buffers=buffers)

def on_message(self, msg):
self.emit("message", msg["content"]["data"], msg["buffers"])

def on_close(self, msg):
self.comm = None

def on_open(self, comm, msg):
self.comm = comm
comm.on_msg(self.on_message)
comm.on_close(self.on_close)


JUPYTER_COMM = None


def get_jupyter_comm(kernel=None):
global JUPYTER_COMM
if JUPYTER_COMM is None:
JUPYTER_COMM = WsJupyterComm(kernel)

return JUPYTER_COMM


class GenericMessage:
def __init__(self, data):
self.data = data


class JupyterGenericServer(GenericServer):
def __init__(self, server_config):
super().__init__(server_config)

self.trame_comm = get_jupyter_comm()
self._endpoint = self[self.ws_endpoints[0]]
self._name = self._endpoint.serverProtocol.server.name
self._connections = {}
self.trame_comm.add_event_listener("message", self.on_msg_from_comm)

async def on_msg_from_server(self, client_id, binary, content):
buffers = []
data = {"server": self._name, "client": client_id}

if binary:
buffers.append(content)
else:
data["payload"] = content

self.trame_comm.send(data, buffers)

async def on_msg_from_comm(self, data, buffers):
server_name = data["server"]
client_id = data["client"]

if server_name != self._name:
return

connection = self._connections.get(client_id, None)

if connection is None:
connection = self._endpoint.connect()
connection.on_message(partial(self.on_msg_from_server, client_id))
self._connections[client_id] = connection

is_binary = len(buffers) > 0

message = None

if is_binary:
message = GenericMessage(buffers[0])
else:
message = GenericMessage(data["payload"])

await connection.send(is_binary, message)


def startWebServer(*args, **kwargs):
raise NotImplementedError("Generic backend does not provide a launcher")


def create_webserver(server_config):
jupyter_generic_server = JupyterGenericServer(server_config)
return jupyter_generic_server

0 comments on commit 63b20dd

Please sign in to comment.