Skip to content

Commit

Permalink
stream_base: various improvements
Browse files Browse the repository at this point in the history
Expose and use in TLSWrap an `v8::External` wrap of the
`StreamBase*` pointer instead of guessing the ancestor C++ class in
`node_wrap.h`.

Make use of `StreamBase::Callback` structure for storing/passing both
callback and context in a single object.

Introduce `GetObject()` for future user-land usage, when a child class
is not going to be inherited from AsyncWrap.

PR-URL: nodejs#2351
Reviewed-By: Trevor Norris <[email protected]>
  • Loading branch information
indutny committed Aug 20, 2015
1 parent 0d39d35 commit 291b310
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 65 deletions.
4 changes: 3 additions & 1 deletion lib/_tls_wrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,9 @@ TLSSocket.prototype._wrapHandle = function(wrap) {
var context = options.secureContext ||
options.credentials ||
tls.createSecureContext();
res = tls_wrap.wrap(handle, context.context, options.isServer);
res = tls_wrap.wrap(handle._externalStream,
context.context,
options.isServer);
res._parent = handle;
res._parentWrap = wrap;
res._secureContext = context;
Expand Down
1 change: 1 addition & 0 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ namespace node {
V(exponent_string, "exponent") \
V(exports_string, "exports") \
V(ext_key_usage_string, "ext_key_usage") \
V(external_stream_string, "_externalStream") \
V(family_string, "family") \
V(fatal_exception_string, "_fatalException") \
V(fd_string, "fd") \
Expand Down
15 changes: 0 additions & 15 deletions src/node_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,6 @@ namespace node {
} \
} while (0)

#define WITH_GENERIC_STREAM(env, obj, BODY) \
do { \
WITH_GENERIC_UV_STREAM(env, obj, BODY, { \
if (env->tls_wrap_constructor_template().IsEmpty() == false && \
env->tls_wrap_constructor_template()->HasInstance(obj)) { \
TLSWrap* const wrap = Unwrap<TLSWrap>(obj); \
BODY \
} else if (env->jsstream_constructor_template().IsEmpty() == false && \
env->jsstream_constructor_template()->HasInstance(obj)) { \
JSStream* const wrap = Unwrap<JSStream>(obj); \
BODY \
} \
}); \
} while (0)

inline uv_stream_t* HandleToStream(Environment* env,
v8::Local<v8::Object> obj) {
v8::HandleScope scope(env->isolate());
Expand Down
18 changes: 18 additions & 0 deletions src/stream_base-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

namespace node {

using v8::External;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::Handle;
Expand All @@ -36,6 +37,13 @@ void StreamBase::AddMethods(Environment* env,
v8::DEFAULT,
attributes);

t->InstanceTemplate()->SetAccessor(env->external_stream_string(),
GetExternal<Base>,
nullptr,
env->as_external(),
v8::DEFAULT,
attributes);

env->SetProtoMethod(t, "readStart", JSMethod<Base, &StreamBase::ReadStart>);
env->SetProtoMethod(t, "readStop", JSMethod<Base, &StreamBase::ReadStop>);
if ((flags & kFlagNoShutdown) == 0)
Expand Down Expand Up @@ -72,6 +80,16 @@ void StreamBase::GetFD(Local<String> key,
}


template <class Base>
void StreamBase::GetExternal(Local<String> key,
const PropertyCallbackInfo<Value>& args) {
StreamBase* wrap = Unwrap<Base>(args.Holder());

Local<External> ext = External::New(args.GetIsolate(), wrap);
args.GetReturnValue().Set(ext);
}


template <class Base,
int (StreamBase::*Method)(const FunctionCallbackInfo<Value>& args)>
void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
Expand Down
27 changes: 22 additions & 5 deletions src/stream_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,14 @@ void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) {

// The wrap and request objects should still be there.
CHECK_EQ(req_wrap->persistent().IsEmpty(), false);
CHECK_EQ(wrap->GetAsyncWrap()->persistent().IsEmpty(), false);

HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());

Local<Object> req_wrap_obj = req_wrap->object();
Local<Value> argv[3] = {
Integer::New(env->isolate(), status),
wrap->GetAsyncWrap()->object(),
wrap->GetObject(),
req_wrap_obj
};

Expand Down Expand Up @@ -370,7 +369,6 @@ void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {

// The wrap and request objects should still be there.
CHECK_EQ(req_wrap->persistent().IsEmpty(), false);
CHECK_EQ(wrap->GetAsyncWrap()->persistent().IsEmpty(), false);

// Unref handle property
Local<Object> req_wrap_obj = req_wrap->object();
Expand All @@ -379,7 +377,7 @@ void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {

Local<Value> argv[] = {
Integer::New(env->isolate(), status),
wrap->GetAsyncWrap()->object(),
wrap->GetObject(),
req_wrap_obj,
Undefined(env->isolate())
};
Expand Down Expand Up @@ -414,7 +412,16 @@ void StreamBase::EmitData(ssize_t nread,
if (argv[2].IsEmpty())
argv[2] = Undefined(env->isolate());

GetAsyncWrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv);
AsyncWrap* async = GetAsyncWrap();
if (async == nullptr) {
node::MakeCallback(env,
GetObject(),
env->onread_string(),
ARRAY_SIZE(argv),
argv);
} else {
async->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv);
}
}


Expand All @@ -428,6 +435,16 @@ int StreamBase::GetFD() {
}


AsyncWrap* StreamBase::GetAsyncWrap() {
return nullptr;
}


Local<Object> StreamBase::GetObject() {
return GetAsyncWrap()->object();
}


int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
// No TryWrite by default
return 0;
Expand Down
72 changes: 42 additions & 30 deletions src/stream_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,18 +106,31 @@ class WriteWrap: public ReqWrap<uv_write_t>,

class StreamResource {
public:
template <class T>
struct Callback {
Callback() : fn(nullptr), ctx(nullptr) {}
Callback(T fn, void* ctx) : fn(fn), ctx(ctx) {}
Callback(const Callback&) = default;

inline bool is_empty() { return fn == nullptr; }
inline void clear() {
fn = nullptr;
ctx = nullptr;
}

T fn;
void* ctx;
};

typedef void (*AfterWriteCb)(WriteWrap* w, void* ctx);
typedef void (*AllocCb)(size_t size, uv_buf_t* buf, void* ctx);
typedef void (*ReadCb)(ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending,
void* ctx);

StreamResource() : after_write_cb_(nullptr),
alloc_cb_(nullptr),
read_cb_(nullptr) {
StreamResource() {
}

virtual ~StreamResource() = default;

virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
Expand All @@ -131,44 +144,37 @@ class StreamResource {

// Events
inline void OnAfterWrite(WriteWrap* w) {
if (after_write_cb_ != nullptr)
after_write_cb_(w, after_write_ctx_);
if (!after_write_cb_.is_empty())
after_write_cb_.fn(w, after_write_cb_.ctx);
}

inline void OnAlloc(size_t size, uv_buf_t* buf) {
if (alloc_cb_ != nullptr)
alloc_cb_(size, buf, alloc_ctx_);
if (!alloc_cb_.is_empty())
alloc_cb_.fn(size, buf, alloc_cb_.ctx);
}

inline void OnRead(size_t nread,
const uv_buf_t* buf,
uv_handle_type pending = UV_UNKNOWN_HANDLE) {
if (read_cb_ != nullptr)
read_cb_(nread, buf, pending, read_ctx_);
if (!read_cb_.is_empty())
read_cb_.fn(nread, buf, pending, read_cb_.ctx);
}

inline void set_after_write_cb(AfterWriteCb cb, void* ctx) {
after_write_ctx_ = ctx;
after_write_cb_ = cb;
inline void set_after_write_cb(Callback<AfterWriteCb> c) {
after_write_cb_ = c;
}

inline void set_alloc_cb(AllocCb cb, void* ctx) {
alloc_cb_ = cb;
alloc_ctx_ = ctx;
}
inline void set_alloc_cb(Callback<AllocCb> c) { alloc_cb_ = c; }
inline void set_read_cb(Callback<ReadCb> c) { read_cb_ = c; }

inline void set_read_cb(ReadCb cb, void* ctx) {
read_cb_ = cb;
read_ctx_ = ctx;
}
inline Callback<AfterWriteCb> after_write_cb() { return after_write_cb_; }
inline Callback<AllocCb> alloc_cb() { return alloc_cb_; }
inline Callback<ReadCb> read_cb() { return read_cb_; }

private:
AfterWriteCb after_write_cb_;
void* after_write_ctx_;
AllocCb alloc_cb_;
void* alloc_ctx_;
ReadCb read_cb_;
void* read_ctx_;
Callback<AfterWriteCb> after_write_cb_;
Callback<AllocCb> alloc_cb_;
Callback<ReadCb> read_cb_;
};

class StreamBase : public StreamResource {
Expand Down Expand Up @@ -211,7 +217,9 @@ class StreamBase : public StreamResource {

virtual ~StreamBase() = default;

virtual AsyncWrap* GetAsyncWrap() = 0;
// One of these must be implemented
virtual AsyncWrap* GetAsyncWrap();
virtual v8::Local<v8::Object> GetObject();

// Libuv callbacks
static void AfterShutdown(ShutdownWrap* req, int status);
Expand All @@ -227,8 +235,12 @@ class StreamBase : public StreamResource {
int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args);

template <class Base>
static void GetFD(v8::Local<v8::String>,
const v8::PropertyCallbackInfo<v8::Value>&);
static void GetFD(v8::Local<v8::String> key,
const v8::PropertyCallbackInfo<v8::Value>& args);

template <class Base>
static void GetExternal(v8::Local<v8::String> key,
const v8::PropertyCallbackInfo<v8::Value>& args);

template <class Base,
int (StreamBase::*Method)( // NOLINT(whitespace/parens)
Expand Down
6 changes: 3 additions & 3 deletions src/stream_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ StreamWrap::StreamWrap(Environment* env,
parent),
StreamBase(env),
stream_(stream) {
set_after_write_cb(OnAfterWriteImpl, this);
set_alloc_cb(OnAllocImpl, this);
set_read_cb(OnReadImpl, this);
set_after_write_cb({ OnAfterWriteImpl, this });
set_alloc_cb({ OnAllocImpl, this });
set_read_cb({ OnReadImpl, this });
}


Expand Down
18 changes: 7 additions & 11 deletions src/tls_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include "node_crypto_bio.h" // NodeBIO
#include "node_crypto_clienthello.h" // ClientHelloParser
#include "node_crypto_clienthello-inl.h"
#include "node_wrap.h" // WithGenericStream
#include "node_counters.h"
#include "node_internals.h"
#include "stream_base.h"
Expand Down Expand Up @@ -63,12 +62,12 @@ TLSWrap::TLSWrap(Environment* env,
SSL_CTX_sess_set_new_cb(sc_->ctx_, SSLWrap<TLSWrap>::NewSessionCallback);

stream_->Consume();
stream_->set_after_write_cb(OnAfterWriteImpl, this);
stream_->set_alloc_cb(OnAllocImpl, this);
stream_->set_read_cb(OnReadImpl, this);
stream_->set_after_write_cb({ OnAfterWriteImpl, this });
stream_->set_alloc_cb({ OnAllocImpl, this });
stream_->set_read_cb({ OnReadImpl, this });

set_alloc_cb(OnAllocSelf, this);
set_read_cb(OnReadSelf, this);
set_alloc_cb({ OnAllocSelf, this });
set_read_cb({ OnReadSelf, this });

InitSSL();
}
Expand Down Expand Up @@ -177,15 +176,12 @@ void TLSWrap::Wrap(const FunctionCallbackInfo<Value>& args) {
if (args.Length() < 3 || !args[2]->IsBoolean())
return env->ThrowTypeError("Third argument should be boolean");

Local<Object> stream_obj = args[0].As<Object>();
Local<External> stream_obj = args[0].As<External>();
Local<Object> sc = args[1].As<Object>();
Kind kind = args[2]->IsTrue() ? SSLWrap<TLSWrap>::kServer :
SSLWrap<TLSWrap>::kClient;

StreamBase* stream = nullptr;
WITH_GENERIC_STREAM(env, stream_obj, {
stream = wrap;
});
StreamBase* stream = static_cast<StreamBase*>(stream_obj->Value());
CHECK_NE(stream, nullptr);

TLSWrap* res = new TLSWrap(env, kind, stream, Unwrap<SecureContext>(sc));
Expand Down

0 comments on commit 291b310

Please sign in to comment.