Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Livy:337 Binding RPCServer to user provided port and not random port #334

Merged
merged 25 commits into from
Jun 8, 2017
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
bc8f2ac
Code changes in RPCserver for user provided port
pralabhkumar May 20, 2017
968f584
Indentation Changes
pralabhkumar May 20, 2017
8e3c0ef
Indentation Changes
pralabhkumar May 20, 2017
a8c53b8
Indentation Changes
pralabhkumar May 20, 2017
94d23e5
Indentation Changes
pralabhkumar May 20, 2017
994ac16
Configuring Port Range
pralabhkumar May 21, 2017
238d238
Documentation Changed
pralabhkumar May 21, 2017
951739c
launcher.port.range will take care of launching RPC
pralabhkumar May 29, 2017
dbaf50a
Checkstyle changes
pralabhkumar May 29, 2017
4a7e219
Checkstyle changes
pralabhkumar May 29, 2017
5ac6512
Dummy push
pralabhkumar Jun 1, 2017
02c51b3
Code changes
pralabhkumar Jun 1, 2017
a2938d5
Changed BindException Handling to SocketException Handling
pralabhkumar Jun 1, 2017
e6524d4
Changed Import Order
pralabhkumar Jun 1, 2017
a6ea902
Code changes to increase port range
pralabhkumar Jun 2, 2017
789ae88
Set Port isConntect to true
pralabhkumar Jun 2, 2017
edbf6c9
Indentation Changes & port range in livy-client.conf.template
pralabhkumar Jun 5, 2017
2c4189c
Indentation changes
pralabhkumar Jun 5, 2017
2b91398
Changed visibilty of method private
pralabhkumar Jun 5, 2017
a8e29d1
Indentation Changes
pralabhkumar Jun 5, 2017
e59e9a7
Indenetation Changes
pralabhkumar Jun 5, 2017
59683d3
Unit test case to test port range
pralabhkumar Jun 7, 2017
57b6c51
Checkstyle changes
pralabhkumar Jun 7, 2017
a22222e
Unit test case for port range
pralabhkumar Jun 7, 2017
3f58233
Added comment for Port Range Configuration and increase port range fo…
pralabhkumar Jun 8, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ public static enum Entry implements ConfEntry {

// Address for the RSC driver to connect back with it's connection info.
LAUNCHER_ADDRESS("launcher.address", null),
LAUNCHER_PORT_RANGE("launcher.port.range", "10000~10010"),
// Setting up of this propety by user has no benefit. It is currently being used
// to pass port information from ContextLauncher to RSCDriver
LAUNCHER_PORT("launcher.port", -1),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove LAUNCHER_PORT, seems it is not used any more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zjffdu
As mentioned in the Comment for launcher port . This property is still being used to pass port information from contextlauncher to RSCDriver.
ContextLauncher.java ,set this property
conf.set(LAUNCHER_PORT, factory.getServer().getPort());

And RSCDriver takes that information
int launcherPort = livyConf.getInt(LAUNCHER_PORT);

So this property still needed.
But user setting it up will have no benefit. So we should not mention this property in livy-client.conf.template.

Please let me if its ok .

Other comments are indentation related , I'll fix them and push .


// How long will the RSC wait for a connection for a Livy server before shutting itself down.
SERVER_IDLE_TIMEOUT("server.idle-timeout", "10m"),

Expand Down
81 changes: 66 additions & 15 deletions rsc/src/main/java/com/cloudera/livy/rsc/rpc/RpcServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import java.io.Closeable;
import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.security.SecureRandom;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -61,18 +63,74 @@ public class RpcServer implements Closeable {
private static final SecureRandom RND = new SecureRandom();

private final String address;
private final Channel channel;
private Channel channel;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 spaces

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed

private final EventLoopGroup group;
private final int port;
private final ConcurrentMap<String, ClientInfo> pendingClients;
private final RSCConf config;

private final String portRange;
private static enum PortRangeSchema{START_PORT, END_PORT, max};
private final String PORT_DELIMITER="~";
/**
* Creating RPC Server
* @param lconf
* @throws IOException
* @throws InterruptedException
*/
public RpcServer(RSCConf lconf) throws IOException, InterruptedException {
this.config = lconf;
this.portRange=config.get(LAUNCHER_PORT_RANGE);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

miss space around =

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed

this.group = new NioEventLoopGroup(
this.config.getInt(RPC_MAX_THREADS),
Utils.newDaemonThreadFactory("RPC-Handler-%d"));
this.channel = new ServerBootstrap()
this.config.getInt(RPC_MAX_THREADS),
Copy link
Contributor

@zjffdu zjffdu Jun 3, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code style issue, should have indention here. Seems travis still didn't detect it. @jerryshao Do you have any clues ?

Utils.newDaemonThreadFactory("RPC-Handler-%d"));
int [] portData=getPortNumberAndRange();
int startingPortNumber=portData[PortRangeSchema.START_PORT.ordinal()];
int endPort=portData[PortRangeSchema.END_PORT.ordinal()];
for(int tries = startingPortNumber-1 ; tries <= endPort ; tries++){
try {
startingPortNumber++;
this.channel = getChannel(startingPortNumber);
break;
}catch(BindException e){
LOG.warn("RPC not able to connect port "+ startingPortNumber);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would happen when there is no available port in this range

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have Changed the code , to handle the case if no available port . It will throws exception and fails gracefully.

Please let me know , if that ok .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zjffdu Working on this .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zjffdu done. Graceful failure if not port available (as would have happened if the random port is not available)

this.port = ((InetSocketAddress) channel.localAddress()).getPort();
this.pendingClients = new ConcurrentHashMap<>();
LOG.warn("Connected to the port " + this.port);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change to info level

String address = config.get(RPC_SERVER_ADDRESS);
if (address == null) {
address = config.findLocalAddress();
}
this.address = address;
}

/**
* Get Port Numbers
*/
public int[] getPortNumberAndRange() throws ArrayIndexOutOfBoundsException, NumberFormatException{
String[] split = this.portRange.split(PORT_DELIMITER);
int [] portRange=new int [PortRangeSchema.max.ordinal()];
try {
portRange[PortRangeSchema.START_PORT.ordinal()] =
Integer.parseInt(split[PortRangeSchema.START_PORT.ordinal()]);
portRange[PortRangeSchema.END_PORT.ordinal()] =
Integer.parseInt(split[PortRangeSchema.END_PORT.ordinal()]);
}catch(ArrayIndexOutOfBoundsException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code style issue

LOG.error("Port Range format is not correct " + this.portRange);
throw e;
}
catch(NumberFormatException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code style issue

LOG.error("Port are not in numeric format " + this.portRange);
throw e;
}
return portRange;
}
/**
* @throws InterruptedException
**/
public Channel getChannel(int portNumber) throws BindException, InterruptedException{
Channel channel = new ServerBootstrap()
.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
Expand All @@ -97,19 +155,11 @@ public void run() {
.option(ChannelOption.SO_BACKLOG, 1)
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.bind(0)
.bind(portNumber)
.sync()
.channel();
this.port = ((InetSocketAddress) channel.localAddress()).getPort();
this.pendingClients = new ConcurrentHashMap<>();

String address = config.get(RPC_SERVER_ADDRESS);
if (address == null) {
address = config.findLocalAddress();
}
this.address = address;
return channel;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indention

}

/**
* Tells the RPC server to expect connections from clients.
*
Expand Down Expand Up @@ -310,3 +360,4 @@ private ClientInfo(String id, String secret, ClientCallback callback) {
}

}