Skip to content

Commit

Permalink
STORM-266: Adding shell process pid and name in the log message and p…
Browse files Browse the repository at this point in the history
…id,name,exitcode and stderr in case te shell process encounters error.
  • Loading branch information
Parth-Brahmbhatt committed Jun 17, 2014
1 parent ecac64f commit 0c43a37
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 10 deletions.
7 changes: 4 additions & 3 deletions storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private void querySubprocess() {
return;
} else if (command.equals("log")) {
String msg = shellMsg.getMsg();
LOG.info("Shell msg: " + msg);
LOG.info("Shell msg: " + msg + _process.getProcessInfoString());
} else if (command.equals("emit")) {
String stream = shellMsg.getStream();
Long task = shellMsg.getTask();
Expand All @@ -115,8 +115,9 @@ private void querySubprocess() {
throw new RuntimeException("Unknown command received: " + command);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (Exception e) {
String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
throw new RuntimeException(processInfo, e);
}
}

Expand Down
9 changes: 6 additions & 3 deletions storm-core/src/jvm/backtype/storm/task/ShellBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void run() {
handleError(shellMsg.getMsg());
} else if (command.equals("log")) {
String msg = shellMsg.getMsg();
LOG.info("Shell msg: " + msg);
LOG.info("Shell msg: " + msg + _process.getProcessInfoString());
} else if (command.equals("emit")) {
handleEmit(shellMsg);
}
Expand Down Expand Up @@ -170,7 +170,8 @@ public void execute(Tuple input) {

_pendingWrites.put(boltMsg);
} catch(InterruptedException e) {
throw new RuntimeException("Error during multilang processing", e);
String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
throw new RuntimeException("Error during multilang processing " + processInfo, e);
}
}

Expand Down Expand Up @@ -225,6 +226,8 @@ private void handleEmit(ShellMsg shellMsg) throws InterruptedException {
}

private void die(Throwable exception) {
_exception = exception;
String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
_exception = new RuntimeException(processInfo, exception);
}

}
46 changes: 42 additions & 4 deletions storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public class ShellProcess implements Serializable {
private InputStream processErrorStream;
private String[] command;
public ISerializer serializer;
public Number pid;
public String componentName;

public ShellProcess(String[] command) {
this.command = command;
Expand All @@ -52,22 +54,22 @@ public Number launch(Map conf, TopologyContext context) {

ShellLogger = Logger.getLogger(context.getThisComponentId());

this.componentName = context.getThisComponentId();
this.serializer = getSerializer(conf);

Number pid;
try {
_subprocess = builder.start();
processErrorStream = _subprocess.getErrorStream();
serializer.initialize(_subprocess.getOutputStream(), _subprocess.getInputStream());
pid = serializer.connect(conf, context);
this.pid = serializer.connect(conf, context);
} catch (IOException e) {
throw new RuntimeException(
"Error when launching multilang subprocess\n"
+ getErrorsString(), e);
} catch (NoOutputException e) {
throw new RuntimeException(e + getErrorsString() + "\n");
}
return pid;
return this.pid;
}

private ISerializer getSerializer(Map conf) {
Expand Down Expand Up @@ -141,4 +143,40 @@ public String getErrorsString() {
return "";
}
}
}

/**
*
* @return pid, if the process has been launched, null otherwise.
*/
public Number getPid() {
return this.pid;
}

/**
*
* @return the name of component.
*/
public String getComponentName() {
return this.componentName;
}

/**
*
* @return exit code of the process if process is terminated, -1 if process is not started or terminated.
*/
public int getExitCode() {
try {
return this._subprocess != null ? this._subprocess.exitValue() : -1;
} catch(IllegalThreadStateException e) {
return -1;
}
}

public String getProcessInfoString() {
return String.format(" pid:%s, name:%s ", pid, componentName);
}

public String getProcessTerminationInfoString() {
return String.format(" exitCode:%s, errorString:%s ", getExitCode(), getErrorsString());
}
}

0 comments on commit 0c43a37

Please sign in to comment.