Skip to content

Commit

Permalink
Revert "child_process: do not keep list of sent sockets"
Browse files Browse the repository at this point in the history
This reverts commit db5ee0b.
  • Loading branch information
indutny committed Jan 17, 2013
1 parent ae6f4b3 commit 44cd121
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 448 deletions.
8 changes: 1 addition & 7 deletions doc/api/child_process.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,10 @@ process may not actually kill it. `kill` really just sends a signal to a proces

See `kill(2)`

### child.send(message, [sendHandle], [options])
### child.send(message, [sendHandle])

* `message` {Object}
* `sendHandle` {Handle object}
* `options` {Object}

When using `child_process.fork()` you can write to the child using
`child.send(message, [sendHandle])` and messages are received by
Expand Down Expand Up @@ -167,11 +166,6 @@ The `sendHandle` option to `child.send()` is for sending a TCP server or
socket object to another process. The child will receive the object as its
second argument to the `message` event.

The `options` object may have the following properties:

* `track` - Notify master process when `sendHandle` will be closed in child
process. (`false` by default)

**send server object**

Here is an example of sending a server:
Expand Down
11 changes: 1 addition & 10 deletions doc/api/net.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -231,19 +231,10 @@ with `child_process.fork()`.

The number of concurrent connections on the server.

This becomes `null` when sending a socket to a child with
`child_process.fork()`. To poll forks and get current number of active
connections use asynchronous `server.getConnections` instead.
This becomes `null` when sending a socket to a child with `child_process.fork()`.

`net.Server` is an [EventEmitter][] with the following events:

### server.getConnections(callback)

Asynchronously get the number of concurrent connections on the server. Works
when sockets were sent to forks.

Callback should take two arguments `err` and `count`.

### Event: 'listening'

Emitted when the server has been bound after calling `server.listen`.
Expand Down
191 changes: 54 additions & 137 deletions lib/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,43 +107,36 @@ var handleConversion = {
},

'net.Socket': {
send: function(message, socket, options) {
// if the socket was created by net.Server
send: function(message, socket) {
// pause socket so no data is lost, will be resumed later

// if the socket wsa created by net.Server
if (socket.server) {
// the slave should keep track of the socket
message.key = socket.server._connectionKey;

var firstTime = !this._channel.sockets.send[message.key];

// add socket to connections list
var socketList = getSocketList('send', this, message.key);
socketList.add(socket);

if (options && options.track) {
// Keep track of socket's status
message.id = socketList.add(socket);
} else {
// the server should no longer expose a .connection property
// and when asked to close it should query the socket status from
// the slaves
if (firstTime) socket.server._setupSlave(socketList);

// Act like socket is detached
socket.server._connections--;
// the server should no longer expose a .connection property
// and when asked to close it should query the socket status from slaves
if (firstTime) {
socket.server._setupSlave(socketList);
}
}

// remove handle from socket object, it will be closed when the socket
// will be sent
// has been send
var handle = socket._handle;
handle.onread = function() {};
socket._handle = null;

return handle;
},

postSend: function(handle) {
// Close the Socket handle after sending it
handle.close();
},

got: function(message, handle, emit) {
var socket = new net.Socket({handle: handle});
socket.readable = socket.writable = true;
Expand All @@ -153,10 +146,7 @@ var handleConversion = {

// add socket to connections list
var socketList = getSocketList('got', this, message.key);
socketList.add({
id: message.id,
socket: socket
});
socketList.add(socket);
}

emit(socket);
Expand All @@ -171,98 +161,39 @@ function SocketListSend(slave, key) {
var self = this;

this.key = key;
this.slave = slave;

// These two arrays are used to store the list of sockets and the freelist of
// indexes in this list. After insertion, item will have persistent index
// until it'll be removed. This way we can use this index as an identifier for
// sockets.
this.list = [];
this.freelist = [];
this.slave = slave;

slave.once('disconnect', function() {
self.flush();
});

this.slave.on('internalMessage', function(msg) {
if (msg.cmd !== 'NODE_SOCKET_CLOSED' || msg.key !== self.key) return;
self.remove(msg.id);
self.flush();
});
}
util.inherits(SocketListSend, EventEmitter);

SocketListSend.prototype.add = function(socket) {
var index;

// Pick one of free indexes, or insert in the end of the list
if (this.freelist.length > 0) {
index = this.freelist.pop();
this.list[index] = socket;
} else {
index = this.list.push(socket) - 1;
}

return index;
};

SocketListSend.prototype.remove = function(index) {
var socket = this.list[index];
if (!socket) return;

// Create a hole in the list and move index to the freelist
this.list[index] = null;
this.freelist.push(index);

socket.destroy();
this.list.push(socket);
};

SocketListSend.prototype.flush = function() {
var list = this.list;
this.list = [];
this.freelist = [];

list.forEach(function(socket) {
if (socket) socket.destroy();
socket.destroy();
});
};

SocketListSend.prototype._request = function request(msg, cmd, callback) {
var self = this;

if (!this.slave.connected) return onslaveclose();
this.slave.send(msg);

function onclose() {
self.slave.removeListener('internalMessage', onreply);
callback(new Error('Slave closed before reply'));
};

function onreply(msg) {
if (msg.cmd !== cmd || msg.key !== self.key) return;
self.slave.removeListener('disconnect', onclose);
self.slave.removeListener('internalMessage', onreply);

callback(null, msg);
};

this.slave.once('disconnect', onclose);
this.slave.on('internalMessage', onreply);
};

SocketListSend.prototype.close = function close(callback) {
this._request({
cmd: 'NODE_SOCKET_NOTIFY_CLOSE',
key: this.key
}, 'NODE_SOCKET_ALL_CLOSED', callback);
};
SocketListSend.prototype.update = function() {
if (this.slave.connected === false) return;

SocketListSend.prototype.getConnections = function getConnections(callback) {
this._request({
cmd: 'NODE_SOCKET_GET_COUNT',
this.slave.send({
cmd: 'NODE_SOCKET_FETCH',
key: this.key
}, 'NODE_SOCKET_COUNT', function(err, msg) {
if (err) return callback(err);
callback(null, msg.count);
});
};

Expand All @@ -272,59 +203,45 @@ function SocketListReceive(slave, key) {

var self = this;

this.connections = 0;
this.key = key;
this.list = [];
this.slave = slave;

function onempty() {
if (!self.slave.connected) return;

self.slave.send({
cmd: 'NODE_SOCKET_ALL_CLOSED',
key: self.key
});
}
slave.on('internalMessage', function(msg) {
if (msg.cmd !== 'NODE_SOCKET_FETCH' || msg.key !== self.key) return;

this.slave.on('internalMessage', function(msg) {
if (msg.key !== self.key) return;

if (msg.cmd === 'NODE_SOCKET_NOTIFY_CLOSE') {
// Already empty
if (self.connections === 0) return onempty();

// Wait for sockets to get closed
self.once('empty', onempty);
} else if (msg.cmd === 'NODE_SOCKET_GET_COUNT') {
if (!self.slave.connected) return;
self.slave.send({
cmd: 'NODE_SOCKET_COUNT',
key: self.key,
count: self.connections
});
if (self.list.length === 0) {
self.flush();
return;
}

self.on('itemRemoved', function removeMe() {
if (self.list.length !== 0) return;
self.removeListener('itemRemoved', removeMe);
self.flush();
});
});
}
util.inherits(SocketListReceive, EventEmitter);

SocketListReceive.prototype.add = function(obj) {
var self = this;

this.connections++;
SocketListReceive.prototype.flush = function() {
this.list = [];

// Notify previous owner of socket about its state change
obj.socket.once('close', function() {
self.connections--;
if (this.slave.connected) {
this.slave.send({
cmd: 'NODE_SOCKET_CLOSED',
key: this.key
});
}
};

if (obj.id !== undefined && self.slave.connected) {
// Master wants to keep eye on socket status
self.slave.send({
cmd: 'NODE_SOCKET_CLOSED',
key: self.key,
id: obj.id
});
}
SocketListReceive.prototype.add = function(socket) {
var self = this;
this.list.push(socket);

if (self.connections === 0) self.emit('empty');
socket.on('close', function() {
self.list.splice(self.list.indexOf(socket), 1);
self.emit('itemRemoved');
});
};

Expand Down Expand Up @@ -449,16 +366,17 @@ function setupChannel(target, channel) {
var string = JSON.stringify(message) + '\n';
var writeReq = channel.writeUtf8String(string, handle);

// Close the Socket handle after sending it
if (message && message.type === 'net.Socket') {
handle.close();
}

if (!writeReq) {
var er = errnoException(errno, 'write', 'cannot write to IPC channel.');
this.emit('error', er);
}

if (obj && obj.postSend) {
writeReq.oncomplete = obj.postSend.bind(null, handle);
} else {
writeReq.oncomplete = nop;
}
writeReq.oncomplete = nop;

/* If the master is > 2 read() calls behind, please stop sending. */
return channel.writeQueueSize < (65536 * 2);
Expand Down Expand Up @@ -738,7 +656,6 @@ function ChildProcess() {

this._closesNeeded = 1;
this._closesGot = 0;
this.connected = false;

this.signalCode = null;
this.exitCode = null;
Expand Down
Loading

0 comments on commit 44cd121

Please sign in to comment.