Skip to content

Commit

Permalink
Merge branch 'STORM-266' of https://github.com/Parth-Brahmbhatt/incub…
Browse files Browse the repository at this point in the history
…ator-storm into STORM-266

STORM-266: Adding shell process pid and name in the log message
  • Loading branch information
revans2 committed Jun 30, 2014
2 parents 5f2d05a + 0c43a37 commit afbcfcf
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 10 deletions.
7 changes: 4 additions & 3 deletions storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private void querySubprocess() {
return;
} else if (command.equals("log")) {
String msg = shellMsg.getMsg();
LOG.info("Shell msg: " + msg);
LOG.info("Shell msg: " + msg + _process.getProcessInfoString());
} else if (command.equals("emit")) {
String stream = shellMsg.getStream();
Long task = shellMsg.getTask();
Expand All @@ -115,8 +115,9 @@ private void querySubprocess() {
throw new RuntimeException("Unknown command received: " + command);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (Exception e) {
String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
throw new RuntimeException(processInfo, e);
}
}

Expand Down
9 changes: 6 additions & 3 deletions storm-core/src/jvm/backtype/storm/task/ShellBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void run() {
handleError(shellMsg.getMsg());
} else if (command.equals("log")) {
String msg = shellMsg.getMsg();
LOG.info("Shell msg: " + msg);
LOG.info("Shell msg: " + msg + _process.getProcessInfoString());
} else if (command.equals("emit")) {
handleEmit(shellMsg);
}
Expand Down Expand Up @@ -170,7 +170,8 @@ public void execute(Tuple input) {

_pendingWrites.put(boltMsg);
} catch(InterruptedException e) {
throw new RuntimeException("Error during multilang processing", e);
String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
throw new RuntimeException("Error during multilang processing " + processInfo, e);
}
}

Expand Down Expand Up @@ -225,6 +226,8 @@ private void handleEmit(ShellMsg shellMsg) throws InterruptedException {
}

private void die(Throwable exception) {
_exception = exception;
String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
_exception = new RuntimeException(processInfo, exception);
}

}
46 changes: 42 additions & 4 deletions storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public class ShellProcess implements Serializable {
private InputStream processErrorStream;
private String[] command;
public ISerializer serializer;
public Number pid;
public String componentName;

public ShellProcess(String[] command) {
this.command = command;
Expand All @@ -52,22 +54,22 @@ public Number launch(Map conf, TopologyContext context) {

ShellLogger = Logger.getLogger(context.getThisComponentId());

this.componentName = context.getThisComponentId();
this.serializer = getSerializer(conf);

Number pid;
try {
_subprocess = builder.start();
processErrorStream = _subprocess.getErrorStream();
serializer.initialize(_subprocess.getOutputStream(), _subprocess.getInputStream());
pid = serializer.connect(conf, context);
this.pid = serializer.connect(conf, context);
} catch (IOException e) {
throw new RuntimeException(
"Error when launching multilang subprocess\n"
+ getErrorsString(), e);
} catch (NoOutputException e) {
throw new RuntimeException(e + getErrorsString() + "\n");
}
return pid;
return this.pid;
}

private ISerializer getSerializer(Map conf) {
Expand Down Expand Up @@ -141,4 +143,40 @@ public String getErrorsString() {
return "";
}
}
}

/**
*
* @return pid, if the process has been launched, null otherwise.
*/
public Number getPid() {
return this.pid;
}

/**
*
* @return the name of component.
*/
public String getComponentName() {
return this.componentName;
}

/**
*
* @return exit code of the process if process is terminated, -1 if process is not started or terminated.
*/
public int getExitCode() {
try {
return this._subprocess != null ? this._subprocess.exitValue() : -1;
} catch(IllegalThreadStateException e) {
return -1;
}
}

public String getProcessInfoString() {
return String.format(" pid:%s, name:%s ", pid, componentName);
}

public String getProcessTerminationInfoString() {
return String.format(" exitCode:%s, errorString:%s ", getExitCode(), getErrorsString());
}
}

0 comments on commit afbcfcf

Please sign in to comment.