Skip to content

Commit

Permalink
Merge pull request #65 from exposr/buffer-initial-writes
Browse files Browse the repository at this point in the history
fix: buffer initial writes before the underlying socket is open
  • Loading branch information
fredriklindberg authored Dec 8, 2023
2 parents d4f2289 + dc10ff4 commit d2212c9
Showing 1 changed file with 45 additions and 13 deletions.
58 changes: 45 additions & 13 deletions src/transport/ssh/ssh-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ class SSHTransportSocket extends Duplex {
public bytesWritten?: number;
public bytesRead?: number;
public bufferSize: number;
private writeBuffer: Array<{ chunk: Buffer, encoding: BufferEncoding, callback: (error: Error | null | undefined) => void }>;
private writer: (chunk: Buffer, encoding: BufferEncoding, callback: (error: Error | null | undefined) => void) => void;

private readBuffer: Array<Buffer>;
private readBufferSize: number;
Expand Down Expand Up @@ -224,6 +226,8 @@ class SSHTransportSocket extends Duplex {
this.bytesWritten = 0;
this.readBuffer = [];
this.readBufferSize = 0;
this.writeBuffer = [];
this.writer = this.bufferedWriter;

this._client = opts.client;

Expand Down Expand Up @@ -257,14 +261,15 @@ class SSHTransportSocket extends Duplex {

this.readyState = "opening";
this.connecting = true;
this.cork();

const connectionCallback = typeof host == 'function' ?
(host as () => void) :
(typeof connectionListener == 'function' ? (connectionListener as () => void) : undefined);
typeof connectionCallback == 'function' && this.once('connect', connectionCallback);

const connectPort = Hostname.getPort(target);
this._client?.forwardOut(target.hostname, connectPort, options.remoteAddr, connectPort, (err, stream) => {
this._client?.forwardOut(target.hostname, connectPort, options.remoteAddr, connectPort, async (err, stream) => {
if (err) {
this.destroy(err);
return;
Expand All @@ -273,7 +278,7 @@ class SSHTransportSocket extends Duplex {
let socket: Duplex = stream;

const isTLS = Hostname.isTLS(target);
let tlsSock: tls.TLSSocket | undefined;
let tlsSock: tls.TLSSocket;
if (isTLS) {
const tlsOpts: tls.ConnectionOptions = {
servername: target.hostname,
Expand All @@ -283,7 +288,20 @@ class SSHTransportSocket extends Duplex {
tlsOpts['checkServerIdentity'] = () => undefined;
tlsOpts['rejectUnauthorized'] = false;
}
tlsSock = tls.connect(tlsOpts);

try {
tlsSock = await new Promise((resolve, reject) => {
const tlsSock = tls.connect(tlsOpts, () => {
resolve(tlsSock);
});
tlsSock.once('error', (err: Error) => {
reject(err);
});
});
} catch (e: any) {
this.destroy(e);
return;
}

socket.once('error', (err: Error) => {
this.destroy(err)
Expand Down Expand Up @@ -322,6 +340,9 @@ class SSHTransportSocket extends Duplex {
this.pending = false;
this.readyState = "open";

this.flushWriteBuffer();
this.writer = this.socket.write.bind(this.socket);

typeof this.constructCallback == 'function' && this.constructCallback();
this.emit('connect');
this.emit('ready');
Expand Down Expand Up @@ -371,23 +392,34 @@ class SSHTransportSocket extends Duplex {
typeof callback === 'function' && callback(error);
}

private bufferedWriter(chunk: Buffer, encoding: BufferEncoding , callback: (error: Error | null | undefined) => void): void {
const data = { chunk, encoding };
this.writeBuffer.push({chunk, encoding, callback});
}

private flushWriteBuffer(): void {
while (true) {
const buffer = this.writeBuffer.shift();
if (!buffer) {
break;
}
this.socket.write(buffer.chunk, buffer.encoding, buffer.callback);
}
}

_write(data: Buffer, encoding: BufferEncoding, callback: (error: Error | null | undefined) => void): void {
assert(this._destroyed == false, "_write on destroyed");
this.socket.write(data, encoding, callback);
this.writer(data, encoding, callback);
this.resetTimeout();
}

_writev(chunks: Array<{ chunk: any; encoding: BufferEncoding; }>, callback: (error: Error | null | undefined) => void): void {
if (this.socket._writev) {
return this.socket._writev(chunks, callback);
} else {
for (let i = 0; i < (chunks.length - 1); i++) {
const {chunk, encoding} = chunks[i];
this.socket.write(chunk, encoding);
}
const {chunk, encoding} = chunks[chunks.length - 1];
this.socket.write(chunk, encoding, callback);
for (let i = 0; i < (chunks.length - 1); i++) {
const {chunk, encoding} = chunks[i];
this.writer(chunk, encoding, () => undefined);
}
const {chunk, encoding} = chunks[chunks.length - 1];
this.writer(chunk, encoding, callback);
this.resetTimeout();
}

Expand Down

0 comments on commit d2212c9

Please sign in to comment.