Skip to content

Commit

Permalink
lib: merge stream code for http2 streams & net.Socket
Browse files Browse the repository at this point in the history
Squashed from:

- lib: separate writev responsibilities from writeGeneric
- lib: fix calling of cb twice
- lib: extract streamId out of stream_base to caller
- lib: add symbols instead of methods to hide impl details
- lib: remove unneeded lines
- lib: use Object.assign instead of apply
- lib: rename mixin StreamBase to StreamSharedMethods
- lib: use stream shared funcs as top level instead of
  properties of prototypes
- lib: mv lib/internal/stream_shared_methods.js
  lib/internal/stream_base_commons.js
- lib: add comment for readability
- lib: refactor _writev in Http2Stream
- lib: rephrase comment
- lib: revert usage of const,let for perf reasons

PR-URL: nodejs#19527
Refs: nodejs#19060
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Trivikram Kamat <[email protected]>
Reviewed-By: Luigi Pinca <[email protected]>
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Anna Henningsen <[email protected]>
  • Loading branch information
aks- authored and addaleax committed Mar 27, 2018
1 parent c46e36b commit c2835e5
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 107 deletions.
61 changes: 14 additions & 47 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,12 @@ const binding = process.binding('http2');
const { FileHandle } = process.binding('fs');
const { StreamPipe } = internalBinding('stream_pipe');
const assert = require('assert');
const { Buffer } = require('buffer');
const EventEmitter = require('events');
const net = require('net');
const tls = require('tls');
const util = require('util');
const fs = require('fs');
const {
errnoException,
codes: {
ERR_HTTP2_ALTSVC_INVALID_ORIGIN,
ERR_HTTP2_ALTSVC_LENGTH,
Expand Down Expand Up @@ -107,8 +105,13 @@ const {
validateTimerDuration,
refreshFnSymbol
} = require('internal/timers');
const {
createWriteWrap,
writeGeneric,
writevGeneric
} = require('internal/stream_base_commons');

const { ShutdownWrap, WriteWrap } = process.binding('stream_wrap');
const { ShutdownWrap } = process.binding('stream_wrap');
const { constants, nameForErrorCode } = binding;

const NETServer = net.Server;
Expand Down Expand Up @@ -1429,28 +1432,6 @@ class ClientHttp2Session extends Http2Session {
}
}

function createWriteReq(req, handle, data, encoding) {
switch (encoding) {
case 'utf8':
case 'utf-8':
return handle.writeUtf8String(req, data);
case 'ascii':
return handle.writeAsciiString(req, data);
case 'ucs2':
case 'ucs-2':
case 'utf16le':
case 'utf-16le':
return handle.writeUcs2String(req, data);
case 'latin1':
case 'binary':
return handle.writeLatin1String(req, data);
case 'buffer':
return handle.writeBuffer(req, data);
default:
return handle.writeBuffer(req, Buffer.from(data, encoding));
}
}

function trackWriteState(stream, bytes) {
const session = stream[kSession];
stream[kState].writeQueueSize += bytes;
Expand Down Expand Up @@ -1674,16 +1655,12 @@ class Http2Stream extends Duplex {
if (!this.headersSent)
this[kProceed]();

const handle = this[kHandle];
const req = new WriteWrap();
const req = createWriteWrap(this[kHandle], afterDoStreamWrite);
req.stream = this[kID];
req.handle = handle;
req.callback = cb;
req.oncomplete = afterDoStreamWrite;
req.async = false;
const err = createWriteReq(req, handle, data, encoding);
if (err)
return this.destroy(errnoException(err, 'write', req.error), cb);

writeGeneric(this, req, data, encoding, cb);

trackWriteState(this, req.bytes);
}

Expand Down Expand Up @@ -1711,22 +1688,12 @@ class Http2Stream extends Duplex {
if (!this.headersSent)
this[kProceed]();

const handle = this[kHandle];
const req = new WriteWrap();
var req = createWriteWrap(this[kHandle], afterDoStreamWrite);
req.stream = this[kID];
req.handle = handle;
req.callback = cb;
req.oncomplete = afterDoStreamWrite;
req.async = false;
const chunks = new Array(data.length << 1);
for (var i = 0; i < data.length; i++) {
const entry = data[i];
chunks[i * 2] = entry.chunk;
chunks[i * 2 + 1] = entry.encoding;
}
const err = handle.writev(req, chunks);
if (err)
return this.destroy(errnoException(err, 'write', req.error), cb);

writevGeneric(this, req, data, cb);

trackWriteState(this, req.bytes);
}

Expand Down
79 changes: 79 additions & 0 deletions lib/internal/stream_base_commons.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
'use strict';

const { Buffer } = require('buffer');
const errors = require('internal/errors');
const { WriteWrap } = process.binding('stream_wrap');

const errnoException = errors.errnoException;

function handleWriteReq(req, data, encoding) {
const { handle } = req;

switch (encoding) {
case 'buffer':
return handle.writeBuffer(req, data);
case 'latin1':
case 'binary':
return handle.writeLatin1String(req, data);
case 'utf8':
case 'utf-8':
return handle.writeUtf8String(req, data);
case 'ascii':
return handle.writeAsciiString(req, data);
case 'ucs2':
case 'ucs-2':
case 'utf16le':
case 'utf-16le':
return handle.writeUcs2String(req, data);
default:
return handle.writeBuffer(req, Buffer.from(data, encoding));
}
}

function createWriteWrap(handle, oncomplete) {
var req = new WriteWrap();

req.handle = handle;
req.oncomplete = oncomplete;
req.async = false;

return req;
}

function writevGeneric(self, req, data, cb) {
var allBuffers = data.allBuffers;
var chunks;
var i;
if (allBuffers) {
chunks = data;
for (i = 0; i < data.length; i++)
data[i] = data[i].chunk;
} else {
chunks = new Array(data.length << 1);
for (i = 0; i < data.length; i++) {
var entry = data[i];
chunks[i * 2] = entry.chunk;
chunks[i * 2 + 1] = entry.encoding;
}
}
var err = req.handle.writev(req, chunks, allBuffers);

// Retain chunks
if (err === 0) req._chunks = chunks;

if (err)
return self.destroy(errnoException(err, 'write', req.error), cb);
}

function writeGeneric(self, req, data, encoding, cb) {
var err = handleWriteReq(req, data, encoding);

if (err)
return self.destroy(errnoException(err, 'write', req.error), cb);
}

module.exports = {
createWriteWrap,
writevGeneric,
writeGeneric
};
74 changes: 14 additions & 60 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,17 @@ const { TCP, constants: TCPConstants } = process.binding('tcp_wrap');
const { Pipe, constants: PipeConstants } = process.binding('pipe_wrap');
const { TCPConnectWrap } = process.binding('tcp_wrap');
const { PipeConnectWrap } = process.binding('pipe_wrap');
const { ShutdownWrap, WriteWrap } = process.binding('stream_wrap');
const { ShutdownWrap } = process.binding('stream_wrap');
const {
newAsyncId,
defaultTriggerAsyncIdScope,
symbols: { async_id_symbol }
} = require('internal/async_hooks');
const {
createWriteWrap,
writevGeneric,
writeGeneric
} = require('internal/stream_base_commons');
const errors = require('internal/errors');
const {
ERR_INVALID_ADDRESS_FAMILY,
Expand Down Expand Up @@ -740,38 +745,15 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
return false;
}

var req = new WriteWrap();
req.handle = this._handle;
req.oncomplete = afterWrite;
req.async = false;
var err;

if (writev) {
var allBuffers = data.allBuffers;
var chunks;
var i;
if (allBuffers) {
chunks = data;
for (i = 0; i < data.length; i++)
data[i] = data[i].chunk;
} else {
chunks = new Array(data.length << 1);
for (i = 0; i < data.length; i++) {
var entry = data[i];
chunks[i * 2] = entry.chunk;
chunks[i * 2 + 1] = entry.encoding;
}
}
err = this._handle.writev(req, chunks, allBuffers);

// Retain chunks
if (err === 0) req._chunks = chunks;
} else {
err = createWriteReq(req, this._handle, data, encoding);
}
var ret;
var req = createWriteWrap(this._handle, afterWrite);
if (writev)
ret = writevGeneric(this, req, data, cb);
else
ret = writeGeneric(this, req, data, encoding, cb);

if (err)
return this.destroy(errnoException(err, 'write', req.error), cb);
// Bail out if handle.write* returned an error
if (ret) return ret;

this._bytesDispatched += req.bytes;

Expand All @@ -794,34 +776,6 @@ Socket.prototype._write = function(data, encoding, cb) {
this._writeGeneric(false, data, encoding, cb);
};

function createWriteReq(req, handle, data, encoding) {
switch (encoding) {
case 'latin1':
case 'binary':
return handle.writeLatin1String(req, data);

case 'buffer':
return handle.writeBuffer(req, data);

case 'utf8':
case 'utf-8':
return handle.writeUtf8String(req, data);

case 'ascii':
return handle.writeAsciiString(req, data);

case 'ucs2':
case 'ucs-2':
case 'utf16le':
case 'utf-16le':
return handle.writeUcs2String(req, data);

default:
return handle.writeBuffer(req, Buffer.from(data, encoding));
}
}


protoGetter('bytesWritten', function bytesWritten() {
var bytes = this._bytesDispatched;
const state = this._writableState;
Expand Down
1 change: 1 addition & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@
'lib/internal/v8_prof_polyfill.js',
'lib/internal/v8_prof_processor.js',
'lib/internal/vm/Module.js',
'lib/internal/stream_base_commons.js',
'lib/internal/streams/lazy_transform.js',
'lib/internal/streams/async_iterator.js',
'lib/internal/streams/BufferList.js',
Expand Down

0 comments on commit c2835e5

Please sign in to comment.