diff --git a/lib/_tls_legacy.js b/lib/_tls_legacy.js index 4148085503fc64..fc0d115aee2e45 100644 --- a/lib/_tls_legacy.js +++ b/lib/_tls_legacy.js @@ -92,11 +92,11 @@ function onCryptoStreamFinish() { // Generate close notify // NOTE: first call checks if client has sent us shutdown, // second call enqueues shutdown into the BIO. - if (this.pair.ssl.shutdown() !== 1) { + if (this.pair.ssl.shutdownSSL() !== 1) { if (this.pair.ssl && this.pair.ssl.error) return this.pair.error(); - this.pair.ssl.shutdown(); + this.pair.ssl.shutdownSSL(); } if (this.pair.ssl && this.pair.ssl.error) diff --git a/lib/_tls_wrap.js b/lib/_tls_wrap.js index fb63667581c54f..10221b99c30847 100644 --- a/lib/_tls_wrap.js +++ b/lib/_tls_wrap.js @@ -11,14 +11,23 @@ const debug = util.debuglog('tls'); const Timer = process.binding('timer_wrap').Timer; const tls_wrap = process.binding('tls_wrap'); -// Lazy load -var tls_legacy; +// constructor for lazy loading +function createTCP() { + var TCP = process.binding('tcp_wrap').TCP; + return new TCP(); +} + +// constructor for lazy loading +function createPipe() { + var Pipe = process.binding('pipe_wrap').Pipe; + return new Pipe(); +} function onhandshakestart() { debug('onhandshakestart'); var self = this; - var ssl = self.ssl; + var ssl = self._handle; var now = Timer.now(); assert(now >= ssl.lastHandshakeTime); @@ -63,7 +72,7 @@ function loadSession(self, hello, cb) { // NOTE: That we have disabled OpenSSL's internal session storage in // `node_crypto.cc` and hence its safe to rely on getting servername only // from clienthello or this place. - var ret = self.ssl.loadSession(session); + var ret = self._handle.loadSession(session); cb(null, ret); } @@ -92,9 +101,9 @@ function loadSNI(self, servername, cb) { // TODO(indutny): eventually disallow raw `SecureContext` if (context) - self.ssl.sni_context = context.context || context; + self._handle.sni_context = context.context || context; - cb(null, self.ssl.sni_context); + cb(null, self._handle.sni_context); }); } @@ -127,7 +136,7 @@ function requestOCSP(self, hello, ctx, cb) { return cb(err); if (response) - self.ssl.setOCSPResponse(response); + self._handle.setOCSPResponse(response); cb(null); } } @@ -161,7 +170,7 @@ function onclienthello(hello) { if (err) return self.destroy(err); - self.ssl.endParser(); + self._handle.endParser(); }); }); }); @@ -184,7 +193,7 @@ function onnewsession(key, session) { return; once = true; - self.ssl.newSessionDone(); + self._handle.newSessionDone(); self._newSessionPending = false; if (self._securePending) @@ -204,29 +213,12 @@ function onocspresponse(resp) { */ function TLSSocket(socket, options) { - // Disallow wrapping TLSSocket in TLSSocket - assert(!(socket instanceof TLSSocket)); - - net.Socket.call(this, { - handle: socket && socket._handle, - allowHalfOpen: socket && socket.allowHalfOpen, - readable: false, - writable: false - }); - if (socket) { - this._parent = socket; - - // To prevent assertion in afterConnect() - this._connecting = socket._connecting; - } - this._tlsOptions = options; this._secureEstablished = false; this._securePending = false; this._newSessionPending = false; this._controlReleased = false; this._SNICallback = null; - this.ssl = null; this.servername = null; this.npnProtocol = null; this.authorized = false; @@ -236,15 +228,19 @@ function TLSSocket(socket, options) { // distinguishable from regular ones. this.encrypted = true; + net.Socket.call(this, { + handle: this._wrapHandle(socket && socket._handle), + allowHalfOpen: socket && socket.allowHalfOpen, + readable: false, + writable: false + }); + + // Proxy for API compatibility + this.ssl = this._handle; + this.on('error', this._tlsError); - if (!this._handle) { - this.once('connect', function() { - this._init(null); - }); - } else { - this._init(socket); - } + this._init(socket); // Make sure to setup all required properties like: `_connecting` before // starting the flow of the data @@ -255,23 +251,53 @@ function TLSSocket(socket, options) { util.inherits(TLSSocket, net.Socket); exports.TLSSocket = TLSSocket; -TLSSocket.prototype._init = function(socket) { - assert(this._handle); +var proxiedMethods = [ + 'close', 'ref', 'unref', 'open', 'bind', 'listen', 'connect', 'bind6', + 'connect6', 'getsockname', 'getpeername', 'setNoDelay', 'setKeepAlive', + 'setSimultaneousAccepts', 'setBlocking', - // lib/net.js expect this value to be non-zero if write hasn't been flushed - // immediately - // TODO(indutny): rewise this solution, it might be 1 before handshake and - // represent real writeQueueSize during regular writes. - this._handle.writeQueueSize = 1; + // PipeWrap + 'setPendingInstances' +]; + +TLSSocket.prototype._wrapHandle = function(handle) { + var res; - var self = this; var options = this._tlsOptions; + if (!handle) { + handle = options.pipe ? createPipe() : createTCP(); + handle.owner = this; + } // Wrap socket's handle var context = options.secureContext || options.credentials || tls.createSecureContext(); - this.ssl = tls_wrap.wrap(this._handle, context.context, options.isServer); + res = tls_wrap.wrap(handle, context.context, options.isServer); + res._parent = handle; + res._reading = handle._reading; + + // Proxy HandleWrap, PipeWrap and TCPWrap methods + proxiedMethods.forEach(function(name) { + res[name] = function methodProxy() { + return handle[name].apply(handle, arguments); + }; + }); + + return res; +}; + +TLSSocket.prototype._init = function(socket) { + var self = this; + var options = this._tlsOptions; + var ssl = this._handle; + + // lib/net.js expect this value to be non-zero if write hasn't been flushed + // immediately + // TODO(indutny): rewise this solution, it might be 1 before handshake and + // represent real writeQueueSize during regular writes. + ssl.writeQueueSize = 1; + this.server = options.server || null; // For clients, we will always have either a given ca list or be using @@ -282,32 +308,32 @@ TLSSocket.prototype._init = function(socket) { this._requestCert = requestCert; this._rejectUnauthorized = rejectUnauthorized; if (requestCert || rejectUnauthorized) - this.ssl.setVerifyMode(requestCert, rejectUnauthorized); + ssl.setVerifyMode(requestCert, rejectUnauthorized); if (options.isServer) { - this.ssl.onhandshakestart = onhandshakestart.bind(this); - this.ssl.onhandshakedone = onhandshakedone.bind(this); - this.ssl.onclienthello = onclienthello.bind(this); - this.ssl.onnewsession = onnewsession.bind(this); - this.ssl.lastHandshakeTime = 0; - this.ssl.handshakes = 0; + ssl.onhandshakestart = onhandshakestart.bind(this); + ssl.onhandshakedone = onhandshakedone.bind(this); + ssl.onclienthello = onclienthello.bind(this); + ssl.onnewsession = onnewsession.bind(this); + ssl.lastHandshakeTime = 0; + ssl.handshakes = 0; if (this.server && (listenerCount(this.server, 'resumeSession') > 0 || listenerCount(this.server, 'newSession') > 0 || listenerCount(this.server, 'OCSPRequest') > 0)) { - this.ssl.enableSessionCallbacks(); + ssl.enableSessionCallbacks(); } } else { - this.ssl.onhandshakestart = function() {}; - this.ssl.onhandshakedone = this._finishInit.bind(this); - this.ssl.onocspresponse = onocspresponse.bind(this); + ssl.onhandshakestart = function() {}; + ssl.onhandshakedone = this._finishInit.bind(this); + ssl.onocspresponse = onocspresponse.bind(this); if (options.session) - this.ssl.setSession(options.session); + ssl.setSession(options.session); } - this.ssl.onerror = function(err) { + ssl.onerror = function(err) { if (self._writableState.errorEmitted) return; self._writableState.errorEmitted = true; @@ -337,11 +363,11 @@ TLSSocket.prototype._init = function(socket) { options.server._contexts.length)) { assert(typeof options.SNICallback === 'function'); this._SNICallback = options.SNICallback; - this.ssl.enableHelloParser(); + ssl.enableHelloParser(); } if (process.features.tls_npn && options.NPNProtocols) - this.ssl.setNPNProtocols(options.NPNProtocols); + ssl.setNPNProtocols(options.NPNProtocols); if (options.handshakeTimeout > 0) this.setTimeout(options.handshakeTimeout, this._handleTimeout); @@ -350,8 +376,23 @@ TLSSocket.prototype._init = function(socket) { if (socket && socket._readableState.length) { var buf; while ((buf = socket.read()) !== null) - this.ssl.receive(buf); + ssl.receive(buf); + } + + if (socket) { + this._parent = socket; + + // To prevent assertion in afterConnect() and properly kick off readStart + this._connecting = socket._connecting; + socket.once('connect', function() { + self._connecting = false; + self.emit('connect'); + }); } + + // Assume `tls.connect()` + if (!socket) + this._connecting = true; }; TLSSocket.prototype.renegotiate = function(options, callback) { @@ -365,11 +406,11 @@ TLSSocket.prototype.renegotiate = function(options, callback) { if (requestCert !== this._requestCert || rejectUnauthorized !== this._rejectUnauthorized) { - this.ssl.setVerifyMode(requestCert, rejectUnauthorized); + this._handle.setVerifyMode(requestCert, rejectUnauthorized); this._requestCert = requestCert; this._rejectUnauthorized = rejectUnauthorized; } - if (!this.ssl.renegotiate()) { + if (!this._handle.renegotiate()) { if (callback) { process.nextTick(function() { callback(new Error('Failed to renegotiate')); @@ -391,11 +432,11 @@ TLSSocket.prototype.renegotiate = function(options, callback) { }; TLSSocket.prototype.setMaxSendFragment = function setMaxSendFragment(size) { - return this.ssl.setMaxSendFragment(size) == 1; + return this._handle.setMaxSendFragment(size) == 1; }; TLSSocket.prototype.getTLSTicket = function getTLSTicket() { - return this.ssl.getTLSTicket(); + return this._handle.getTLSTicket(); }; TLSSocket.prototype._handleTimeout = function() { @@ -424,11 +465,11 @@ TLSSocket.prototype._finishInit = function() { } if (process.features.tls_npn) { - this.npnProtocol = this.ssl.getNegotiatedProtocol(); + this.npnProtocol = this._handle.getNegotiatedProtocol(); } if (process.features.tls_sni && this._tlsOptions.isServer) { - this.servername = this.ssl.getServername(); + this.servername = this._handle.getServername(); } debug('secure established'); @@ -439,49 +480,56 @@ TLSSocket.prototype._finishInit = function() { }; TLSSocket.prototype._start = function() { + if (this._connecting) { + this.once('connect', function() { + this._start(); + }); + return; + } + if (this._tlsOptions.requestOCSP) - this.ssl.requestOCSP(); - this.ssl.start(); + this._handle.requestOCSP(); + this._handle.start(); }; TLSSocket.prototype.setServername = function(name) { - this.ssl.setServername(name); + this._handle.setServername(name); }; TLSSocket.prototype.setSession = function(session) { if (typeof session === 'string') session = new Buffer(session, 'binary'); - this.ssl.setSession(session); + this._handle.setSession(session); }; TLSSocket.prototype.getPeerCertificate = function(detailed) { - if (this.ssl) { + if (this._handle) { return common.translatePeerCertificate( - this.ssl.getPeerCertificate(detailed)); + this._handle.getPeerCertificate(detailed)); } return null; }; TLSSocket.prototype.getSession = function() { - if (this.ssl) { - return this.ssl.getSession(); + if (this._handle) { + return this._handle.getSession(); } return null; }; TLSSocket.prototype.isSessionReused = function() { - if (this.ssl) { - return this.ssl.isSessionReused(); + if (this._handle) { + return this._handle.isSessionReused(); } return null; }; TLSSocket.prototype.getCipher = function(err) { - if (this.ssl) { - return this.ssl.getCurrentCipher(); + if (this._handle) { + return this._handle.getCurrentCipher(); } else { return null; } @@ -620,7 +668,7 @@ function Server(/* [options], listener */) { socket.on('secure', function() { if (socket._requestCert) { - var verifyError = socket.ssl.verifyError(); + var verifyError = socket._handle.verifyError(); if (verifyError) { socket.authorizationError = verifyError.code; @@ -775,28 +823,6 @@ function normalizeConnectArgs(listArgs) { return (cb) ? [options, cb] : [options]; } -function legacyConnect(hostname, options, NPN, context) { - assert(options.socket); - if (!tls_legacy) - tls_legacy = require('_tls_legacy'); - - var pair = tls_legacy.createSecurePair(context, - false, - true, - !!options.rejectUnauthorized, - { - NPNProtocols: NPN.NPNProtocols, - servername: hostname - }); - tls_legacy.pipe(pair, options.socket); - pair.cleartext._controlReleased = true; - pair.on('error', function(err) { - pair.cleartext.emit('error', err); - }); - - return pair; -} - exports.connect = function(/* [port, host], options, cb */) { var args = normalizeConnectArgs(arguments); var options = args[0]; @@ -819,51 +845,21 @@ exports.connect = function(/* [port, host], options, cb */) { context = tls.createSecureContext(options); tls.convertNPNProtocols(options.NPNProtocols, NPN); - // Wrapping TLS socket inside another TLS socket was requested - - // create legacy secure pair - var socket; - var legacy; - var result; - if (options.socket instanceof TLSSocket) { - debug('legacy connect'); - legacy = true; - socket = legacyConnect(hostname, options, NPN, context); - result = socket.cleartext; - } else { - legacy = false; - socket = new TLSSocket(options.socket, { - secureContext: context, - isServer: false, - requestCert: true, - rejectUnauthorized: options.rejectUnauthorized, - session: options.session, - NPNProtocols: NPN.NPNProtocols, - requestOCSP: options.requestOCSP - }); - result = socket; - } - - if (socket._handle && !socket._connecting) { - onHandle(); - } else { - // Not even started connecting yet (or probably resolving dns address), - // catch socket errors and assign handle. - if (!legacy && options.socket) { - options.socket.once('connect', function() { - assert(options.socket._handle); - socket._handle = options.socket._handle; - socket._handle.owner = socket; - socket.emit('connect'); - }); - } - socket.once('connect', onHandle); - } + var socket = new TLSSocket(options.socket, { + pipe: options.path && !options.port, + secureContext: context, + isServer: false, + requestCert: true, + rejectUnauthorized: options.rejectUnauthorized, + session: options.session, + NPNProtocols: NPN.NPNProtocols, + requestOCSP: options.requestOCSP + }); if (cb) - result.once('secureConnect', cb); + socket.once('secureConnect', cb); if (!options.socket) { - assert(!legacy); var connect_opt; if (options.path && !options.port) { connect_opt = { path: options.path }; @@ -874,63 +870,62 @@ exports.connect = function(/* [port, host], options, cb */) { localAddress: options.localAddress }; } - socket.connect(connect_opt); + socket.connect(connect_opt, function() { + socket._start(); + }); } - return result; + socket._releaseControl(); - function onHandle() { - if (!legacy) - socket._releaseControl(); + if (options.session) + socket.setSession(options.session); - if (options.session) - socket.setSession(options.session); + if (options.servername) + socket.setServername(options.servername); - if (!legacy) { - if (options.servername) - socket.setServername(options.servername); + if (options.socket) + socket._start(); - socket._start(); - } - socket.on('secure', function() { - var verifyError = socket.ssl.verifyError(); + socket.on('secure', function() { + var verifyError = socket._handle.verifyError(); - // Verify that server's identity matches it's certificate's names - if (!verifyError) { - var cert = result.getPeerCertificate(); - verifyError = options.checkServerIdentity(hostname, cert); - } + // Verify that server's identity matches it's certificate's names + if (!verifyError) { + var cert = socket.getPeerCertificate(); + verifyError = options.checkServerIdentity(hostname, cert); + } - if (verifyError) { - result.authorized = false; - result.authorizationError = verifyError.code || verifyError.message; + if (verifyError) { + socket.authorized = false; + socket.authorizationError = verifyError.code || verifyError.message; - if (options.rejectUnauthorized) { - result.emit('error', verifyError); - result.destroy(); - return; - } else { - result.emit('secureConnect'); - } + if (options.rejectUnauthorized) { + socket.emit('error', verifyError); + socket.destroy(); + return; } else { - result.authorized = true; - result.emit('secureConnect'); + socket.emit('secureConnect'); } + } else { + socket.authorized = true; + socket.emit('secureConnect'); + } - // Uncork incoming data - result.removeListener('end', onHangUp); - }); + // Uncork incoming data + socket.removeListener('end', onHangUp); + }); - function onHangUp() { - // NOTE: This logic is shared with _http_client.js - if (!socket._hadError) { - socket._hadError = true; - var error = new Error('socket hang up'); - error.code = 'ECONNRESET'; - socket.destroy(); - socket.emit('error', error); - } + function onHangUp() { + // NOTE: This logic is shared with _http_client.js + if (!socket._hadError) { + socket._hadError = true; + var error = new Error('socket hang up'); + error.code = 'ECONNRESET'; + socket.destroy(); + socket.emit('error', error); } - result.once('end', onHangUp); } + socket.once('end', onHangUp); + + return socket; }; diff --git a/lib/net.js b/lib/net.js index a6c95c6857b9b2..2ca4b743459705 100644 --- a/lib/net.js +++ b/lib/net.js @@ -961,7 +961,9 @@ function afterConnect(status, handle, req, readable, writable) { return; } - assert(handle === self._handle, 'handle != self._handle'); + // Update handle if it was wrapped + // TODO(indutny): assert that the handle is actually an ancestor of old one + handle = self._handle; debug('afterConnect'); diff --git a/node.gyp b/node.gyp index 01a67a08c86bba..996121ee45cfe7 100644 --- a/node.gyp +++ b/node.gyp @@ -115,6 +115,7 @@ 'src/smalloc.cc', 'src/spawn_sync.cc', 'src/string_bytes.cc', + 'src/stream_base.cc', 'src/stream_wrap.cc', 'src/tcp_wrap.cc', 'src/timer_wrap.cc', @@ -151,6 +152,7 @@ 'src/req-wrap.h', 'src/req-wrap-inl.h', 'src/string_bytes.h', + 'src/stream_base.h', 'src/stream_wrap.h', 'src/tree.h', 'src/util.h', diff --git a/src/env.h b/src/env.h index ccacbb09f52c2f..c9b4cc0736301c 100644 --- a/src/env.h +++ b/src/env.h @@ -234,8 +234,10 @@ namespace node { V(tcp_constructor_template, v8::FunctionTemplate) \ V(tick_callback_function, v8::Function) \ V(tls_wrap_constructor_function, v8::Function) \ + V(tls_wrap_constructor_template, v8::FunctionTemplate) \ V(tty_constructor_template, v8::FunctionTemplate) \ V(udp_constructor_function, v8::Function) \ + V(write_wrap_constructor_function, v8::Function) \ class Environment; diff --git a/src/node_crypto.cc b/src/node_crypto.cc index 230231080b4d87..912320771e3f65 100644 --- a/src/node_crypto.cc +++ b/src/node_crypto.cc @@ -3,7 +3,7 @@ #include "node_crypto.h" #include "node_crypto_bio.h" #include "node_crypto_groups.h" -#include "tls_wrap.h" // TLSCallbacks +#include "tls_wrap.h" // TLSWrap #include "async-wrap.h" #include "async-wrap-inl.h" @@ -98,28 +98,28 @@ const char* const root_certs[] = { X509_STORE* root_cert_store; // Just to generate static methods -template class SSLWrap; -template void SSLWrap::AddMethods(Environment* env, - Handle t); -template void SSLWrap::InitNPN(SecureContext* sc); -template SSL_SESSION* SSLWrap::GetSessionCallback( +template class SSLWrap; +template void SSLWrap::AddMethods(Environment* env, + Handle t); +template void SSLWrap::InitNPN(SecureContext* sc); +template SSL_SESSION* SSLWrap::GetSessionCallback( SSL* s, unsigned char* key, int len, int* copy); -template int SSLWrap::NewSessionCallback(SSL* s, - SSL_SESSION* sess); -template void SSLWrap::OnClientHello( +template int SSLWrap::NewSessionCallback(SSL* s, + SSL_SESSION* sess); +template void SSLWrap::OnClientHello( void* arg, const ClientHelloParser::ClientHello& hello); #ifdef OPENSSL_NPN_NEGOTIATED -template int SSLWrap::AdvertiseNextProtoCallback( +template int SSLWrap::AdvertiseNextProtoCallback( SSL* s, const unsigned char** data, unsigned int* len, void* arg); -template int SSLWrap::SelectNextProtoCallback( +template int SSLWrap::SelectNextProtoCallback( SSL* s, unsigned char** out, unsigned char* outlen, @@ -127,7 +127,7 @@ template int SSLWrap::SelectNextProtoCallback( unsigned int inlen, void* arg); #endif -template int SSLWrap::TLSExtStatusCallback(SSL* s, void* arg); +template int SSLWrap::TLSExtStatusCallback(SSL* s, void* arg); static void crypto_threadid_cb(CRYPTO_THREADID* tid) { @@ -973,7 +973,7 @@ void SSLWrap::AddMethods(Environment* env, Handle t) { env->SetProtoMethod(t, "getCurrentCipher", GetCurrentCipher); env->SetProtoMethod(t, "endParser", EndParser); env->SetProtoMethod(t, "renegotiate", Renegotiate); - env->SetProtoMethod(t, "shutdown", Shutdown); + env->SetProtoMethod(t, "shutdownSSL", Shutdown); env->SetProtoMethod(t, "getTLSTicket", GetTLSTicket); env->SetProtoMethod(t, "newSessionDone", NewSessionDone); env->SetProtoMethod(t, "setOCSPResponse", SetOCSPResponse); diff --git a/src/node_wrap.h b/src/node_wrap.h index 80d679606e9169..ddd7bd16e0d8c5 100644 --- a/src/node_wrap.h +++ b/src/node_wrap.h @@ -14,7 +14,7 @@ namespace node { -#define WITH_GENERIC_STREAM(env, obj, BODY) \ +#define WITH_GENERIC_UV_STREAM(env, obj, BODY, ELSE) \ do { \ if (env->tcp_constructor_template().IsEmpty() == false && \ env->tcp_constructor_template()->HasInstance(obj)) { \ @@ -28,16 +28,29 @@ namespace node { env->pipe_constructor_template()->HasInstance(obj)) { \ PipeWrap* const wrap = Unwrap(obj); \ BODY \ + } else { \ + ELSE \ } \ } while (0) +#define WITH_GENERIC_STREAM(env, obj, BODY) \ + do { \ + WITH_GENERIC_UV_STREAM(env, obj, BODY, { \ + if (env->tls_wrap_constructor_template().IsEmpty() == false && \ + env->tls_wrap_constructor_template()->HasInstance(obj)) { \ + TLSWrap* const wrap = Unwrap(obj); \ + BODY \ + } \ + }); \ + } while (0) + inline uv_stream_t* HandleToStream(Environment* env, v8::Local obj) { v8::HandleScope scope(env->isolate()); - WITH_GENERIC_STREAM(env, obj, { + WITH_GENERIC_UV_STREAM(env, obj, { return reinterpret_cast(wrap->UVHandle()); - }); + }, {}); return nullptr; } diff --git a/src/pipe_wrap.cc b/src/pipe_wrap.cc index 55d5f84ff49858..08fed68741f614 100644 --- a/src/pipe_wrap.cc +++ b/src/pipe_wrap.cc @@ -77,30 +77,11 @@ void PipeWrap::Initialize(Handle target, t->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "Pipe")); t->InstanceTemplate()->SetInternalFieldCount(1); - enum PropertyAttribute attributes = - static_cast(v8::ReadOnly | v8::DontDelete); - t->InstanceTemplate()->SetAccessor(env->fd_string(), - StreamWrap::GetFD, - nullptr, - Handle(), - v8::DEFAULT, - attributes); - env->SetProtoMethod(t, "close", HandleWrap::Close); env->SetProtoMethod(t, "unref", HandleWrap::Unref); env->SetProtoMethod(t, "ref", HandleWrap::Ref); - env->SetProtoMethod(t, "setBlocking", StreamWrap::SetBlocking); - - env->SetProtoMethod(t, "readStart", StreamWrap::ReadStart); - env->SetProtoMethod(t, "readStop", StreamWrap::ReadStop); - env->SetProtoMethod(t, "shutdown", StreamWrap::Shutdown); - - env->SetProtoMethod(t, "writeBuffer", StreamWrap::WriteBuffer); - env->SetProtoMethod(t, "writeAsciiString", StreamWrap::WriteAsciiString); - env->SetProtoMethod(t, "writeUtf8String", StreamWrap::WriteUtf8String); - env->SetProtoMethod(t, "writeUcs2String", StreamWrap::WriteUcs2String); - env->SetProtoMethod(t, "writeBinaryString", StreamWrap::WriteBinaryString); + StreamWrap::AddMethods(env, t); env->SetProtoMethod(t, "bind", Bind); env->SetProtoMethod(t, "listen", Listen); diff --git a/src/stream_base.cc b/src/stream_base.cc new file mode 100644 index 00000000000000..0a1324bb5872c5 --- /dev/null +++ b/src/stream_base.cc @@ -0,0 +1,495 @@ +#include "stream_base.h" +#include "stream_wrap.h" + +#include "node.h" +#include "node_buffer.h" +#include "env.h" +#include "env-inl.h" +#include "string_bytes.h" +#include "tls_wrap.h" +#include "util.h" +#include "util-inl.h" +#include "v8.h" + +#include // INT_MAX + +namespace node { + +using v8::Array; +using v8::Context; +using v8::FunctionCallbackInfo; +using v8::FunctionTemplate; +using v8::Handle; +using v8::HandleScope; +using v8::Integer; +using v8::Local; +using v8::Number; +using v8::Object; +using v8::PropertyAttribute; +using v8::PropertyCallbackInfo; +using v8::String; +using v8::Value; + +template void StreamBase::AddMethods(Environment* env, + Handle t); +template void StreamBase::AddMethods(Environment* env, + Handle t); + + +template +void StreamBase::AddMethods(Environment* env, Handle t) { + HandleScope scope(env->isolate()); + + enum PropertyAttribute attributes = + static_cast(v8::ReadOnly | v8::DontDelete); + t->InstanceTemplate()->SetAccessor(env->fd_string(), + GetFD, + nullptr, + Handle(), + v8::DEFAULT, + attributes); + + env->SetProtoMethod(t, "readStart", JSMethod); + env->SetProtoMethod(t, "readStop", JSMethod); + env->SetProtoMethod(t, "shutdown", JSMethod); + env->SetProtoMethod(t, "writev", JSMethod); + env->SetProtoMethod(t, + "writeBuffer", + JSMethod); + env->SetProtoMethod(t, + "writeAsciiString", + JSMethod >); + env->SetProtoMethod(t, + "writeUtf8String", + JSMethod >); + env->SetProtoMethod(t, + "writeUcs2String", + JSMethod >); + env->SetProtoMethod(t, + "writeBinaryString", + JSMethod >); +} + + +template +void StreamBase::GetFD(Local key, + const PropertyCallbackInfo& args) { + StreamBase* wrap = Unwrap(args.Holder()); + + if (!wrap->IsAlive()) + return args.GetReturnValue().Set(UV_EINVAL); + + args.GetReturnValue().Set(wrap->GetFD()); +} + + +template & args)> +void StreamBase::JSMethod(const FunctionCallbackInfo& args) { + StreamBase* wrap = Unwrap(args.Holder()); + + if (!wrap->IsAlive()) + return args.GetReturnValue().Set(UV_EINVAL); + + args.GetReturnValue().Set((wrap->*Method)(args)); +} + + +int StreamBase::ReadStart(const FunctionCallbackInfo& args) { + return ReadStart(); +} + + +int StreamBase::ReadStop(const FunctionCallbackInfo& args) { + return ReadStop(); +} + + +int StreamBase::Shutdown(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + + CHECK(args[0]->IsObject()); + Local req_wrap_obj = args[0].As(); + + ShutdownWrap* req_wrap = new ShutdownWrap(env, + req_wrap_obj, + this, + AfterShutdown); + + int err = DoShutdown(req_wrap); + req_wrap->Dispatched(); + if (err) + delete req_wrap; + return err; +} + + +void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) { + StreamBase* wrap = req_wrap->wrap(); + Environment* env = req_wrap->env(); + + // The wrap and request objects should still be there. + CHECK_EQ(req_wrap->persistent().IsEmpty(), false); + CHECK_EQ(wrap->GetAsyncWrap()->persistent().IsEmpty(), false); + + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); + + Local req_wrap_obj = req_wrap->object(); + Local argv[3] = { + Integer::New(env->isolate(), status), + wrap->GetAsyncWrap()->object(), + req_wrap_obj + }; + + if (req_wrap->object()->Has(env->oncomplete_string())) + req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv); + + delete req_wrap; +} + + +int StreamBase::Writev(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + + CHECK(args[0]->IsObject()); + CHECK(args[1]->IsArray()); + + Local req_wrap_obj = args[0].As(); + Local chunks = args[1].As(); + + size_t count = chunks->Length() >> 1; + + uv_buf_t bufs_[16]; + uv_buf_t* bufs = bufs_; + + // Determine storage size first + size_t storage_size = 0; + for (size_t i = 0; i < count; i++) { + Handle chunk = chunks->Get(i * 2); + + if (Buffer::HasInstance(chunk)) + continue; + // Buffer chunk, no additional storage required + + // String chunk + Handle string = chunk->ToString(env->isolate()); + enum encoding encoding = ParseEncoding(env->isolate(), + chunks->Get(i * 2 + 1)); + size_t chunk_size; + if (encoding == UTF8 && string->Length() > 65535) + chunk_size = StringBytes::Size(env->isolate(), string, encoding); + else + chunk_size = StringBytes::StorageSize(env->isolate(), string, encoding); + + storage_size += chunk_size + 15; + } + + if (storage_size > INT_MAX) + return UV_ENOBUFS; + + if (ARRAY_SIZE(bufs_) < count) + bufs = new uv_buf_t[count]; + + storage_size += sizeof(WriteWrap); + char* storage = new char[storage_size]; + WriteWrap* req_wrap = + new(storage) WriteWrap(env, req_wrap_obj, this, AfterWrite); + + uint32_t bytes = 0; + size_t offset = sizeof(WriteWrap); + for (size_t i = 0; i < count; i++) { + Handle chunk = chunks->Get(i * 2); + + // Write buffer + if (Buffer::HasInstance(chunk)) { + bufs[i].base = Buffer::Data(chunk); + bufs[i].len = Buffer::Length(chunk); + bytes += bufs[i].len; + continue; + } + + // Write string + offset = ROUND_UP(offset, 16); + CHECK_LT(offset, storage_size); + char* str_storage = storage + offset; + size_t str_size = storage_size - offset; + + Handle string = chunk->ToString(env->isolate()); + enum encoding encoding = ParseEncoding(env->isolate(), + chunks->Get(i * 2 + 1)); + str_size = StringBytes::Write(env->isolate(), + str_storage, + str_size, + string, + encoding); + bufs[i].base = str_storage; + bufs[i].len = str_size; + offset += str_size; + bytes += str_size; + } + + int err = DoWrite(req_wrap, bufs, count, nullptr); + + // Deallocate space + if (bufs != bufs_) + delete[] bufs; + + req_wrap->Dispatched(); + req_wrap->object()->Set(env->async(), True(env->isolate())); + req_wrap->object()->Set(env->bytes_string(), + Number::New(env->isolate(), bytes)); + const char* msg = Error(); + if (msg != nullptr) { + req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); + ClearError(); + } + + if (err) { + req_wrap->~WriteWrap(); + delete[] storage; + } + + return err; +} + + + + +int StreamBase::WriteBuffer(const FunctionCallbackInfo& args) { + CHECK(args[0]->IsObject()); + CHECK(Buffer::HasInstance(args[1])); + Environment* env = Environment::GetCurrent(args); + + Local req_wrap_obj = args[0].As(); + const char* data = Buffer::Data(args[1]); + size_t length = Buffer::Length(args[1]); + + char* storage; + WriteWrap* req_wrap; + uv_buf_t buf; + buf.base = const_cast(data); + buf.len = length; + + // Try writing immediately without allocation + uv_buf_t* bufs = &buf; + size_t count = 1; + int err = DoTryWrite(&bufs, &count); + if (err != 0) + goto done; + if (count == 0) + goto done; + CHECK_EQ(count, 1); + + // Allocate, or write rest + storage = new char[sizeof(WriteWrap)]; + req_wrap = new(storage) WriteWrap(env, req_wrap_obj, this, AfterWrite); + + err = DoWrite(req_wrap, bufs, count, nullptr); + req_wrap->Dispatched(); + req_wrap_obj->Set(env->async(), True(env->isolate())); + + if (err) { + req_wrap->~WriteWrap(); + delete[] storage; + } + + done: + const char* msg = Error(); + if (msg != nullptr) { + req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); + ClearError(); + } + req_wrap_obj->Set(env->bytes_string(), + Integer::NewFromUnsigned(env->isolate(), length)); + return err; +} + + +template +int StreamBase::WriteString(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + CHECK(args[0]->IsObject()); + CHECK(args[1]->IsString()); + + Local req_wrap_obj = args[0].As(); + Local string = args[1].As(); + Local send_handle_obj; + if (args[2]->IsObject()) + send_handle_obj = args[2].As(); + + int err; + + // Compute the size of the storage that the string will be flattened into. + // For UTF8 strings that are very long, go ahead and take the hit for + // computing their actual size, rather than tripling the storage. + size_t storage_size; + if (enc == UTF8 && string->Length() > 65535) + storage_size = StringBytes::Size(env->isolate(), string, enc); + else + storage_size = StringBytes::StorageSize(env->isolate(), string, enc); + + if (storage_size > INT_MAX) + return UV_ENOBUFS; + + // Try writing immediately if write size isn't too big + char* storage; + WriteWrap* req_wrap; + char* data; + char stack_storage[16384]; // 16kb + size_t data_size; + uv_buf_t buf; + + bool try_write = storage_size + 15 <= sizeof(stack_storage) && + (!IsIPCPipe() || send_handle_obj.IsEmpty()); + if (try_write) { + data_size = StringBytes::Write(env->isolate(), + stack_storage, + storage_size, + string, + enc); + buf = uv_buf_init(stack_storage, data_size); + + uv_buf_t* bufs = &buf; + size_t count = 1; + err = DoTryWrite(&bufs, &count); + + // Failure + if (err != 0) + goto done; + + // Success + if (count == 0) + goto done; + + // Partial write + CHECK_EQ(count, 1); + } + + storage = new char[sizeof(WriteWrap) + storage_size + 15]; + req_wrap = new(storage) WriteWrap(env, req_wrap_obj, this, AfterWrite); + + data = reinterpret_cast(ROUND_UP( + reinterpret_cast(storage) + sizeof(WriteWrap), 16)); + + if (try_write) { + // Copy partial data + memcpy(data, buf.base, buf.len); + data_size = buf.len; + } else { + // Write it + data_size = StringBytes::Write(env->isolate(), + data, + storage_size, + string, + enc); + } + + CHECK_LE(data_size, storage_size); + + buf = uv_buf_init(data, data_size); + + if (!IsIPCPipe()) { + err = DoWrite(req_wrap, &buf, 1, nullptr); + } else { + uv_handle_t* send_handle = nullptr; + + if (!send_handle_obj.IsEmpty()) { + HandleWrap* wrap = Unwrap(send_handle_obj); + send_handle = wrap->GetHandle(); + // Reference StreamWrap instance to prevent it from being garbage + // collected before `AfterWrite` is called. + CHECK_EQ(false, req_wrap->persistent().IsEmpty()); + req_wrap->object()->Set(env->handle_string(), send_handle_obj); + } + + err = DoWrite( + req_wrap, + &buf, + 1, + reinterpret_cast(send_handle)); + } + + req_wrap->Dispatched(); + req_wrap->object()->Set(env->async(), True(env->isolate())); + + if (err) { + req_wrap->~WriteWrap(); + delete[] storage; + } + + done: + const char* msg = Error(); + if (msg != nullptr) { + req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); + ClearError(); + } + req_wrap_obj->Set(env->bytes_string(), + Integer::NewFromUnsigned(env->isolate(), data_size)); + return err; +} + + +void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) { + StreamBase* wrap = req_wrap->wrap(); + Environment* env = req_wrap->env(); + + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); + + // The wrap and request objects should still be there. + CHECK_EQ(req_wrap->persistent().IsEmpty(), false); + CHECK_EQ(wrap->GetAsyncWrap()->persistent().IsEmpty(), false); + + // Unref handle property + Local req_wrap_obj = req_wrap->object(); + req_wrap_obj->Delete(env->handle_string()); + wrap->OnAfterWrite(req_wrap); + + Local argv[] = { + Integer::New(env->isolate(), status), + wrap->GetAsyncWrap()->object(), + req_wrap_obj, + Undefined(env->isolate()) + }; + + const char* msg = wrap->Error(); + if (msg != nullptr) { + argv[3] = OneByteString(env->isolate(), msg); + wrap->ClearError(); + } + + if (req_wrap->object()->Has(env->oncomplete_string())) + req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv); + + req_wrap->~WriteWrap(); + delete[] reinterpret_cast(req_wrap); +} + + +void StreamBase::EmitData(ssize_t nread, + Local buf, + Local handle) { + Environment* env = env_; + + Local argv[] = { + Integer::New(env->isolate(), nread), + buf, + handle + }; + + if (argv[1].IsEmpty()) + argv[1] = Undefined(env->isolate()); + + if (argv[2].IsEmpty()) + argv[2] = Undefined(env->isolate()); + + GetAsyncWrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv); +} + + +AsyncWrap* StreamBase::GetAsyncWrap() { + return nullptr; +} + +} // namespace node diff --git a/src/stream_base.h b/src/stream_base.h new file mode 100644 index 00000000000000..d6b3a555b0596b --- /dev/null +++ b/src/stream_base.h @@ -0,0 +1,223 @@ +#ifndef SRC_STREAM_BASE_H_ +#define SRC_STREAM_BASE_H_ + +#include "env.h" +#include "async-wrap.h" +#include "req-wrap.h" +#include "req-wrap-inl.h" +#include "node.h" + +#include "v8.h" + +namespace node { + +// Forward declarations +class StreamBase; + +template +class StreamReq { + public: + typedef void (*DoneCb)(Req* req, int status); + + explicit StreamReq(DoneCb cb) : cb_(cb) { + } + + inline void Done(int status) { + cb_(static_cast(this), status); + } + + private: + DoneCb cb_; +}; + +class ShutdownWrap : public ReqWrap, + public StreamReq { + public: + ShutdownWrap(Environment* env, + v8::Local req_wrap_obj, + StreamBase* wrap, + DoneCb cb) + : ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP), + StreamReq(cb), + wrap_(wrap) { + Wrap(req_wrap_obj, this); + } + + static void NewShutdownWrap(const v8::FunctionCallbackInfo& args) { + CHECK(args.IsConstructCall()); + } + + inline StreamBase* wrap() const { return wrap_; } + + private: + StreamBase* const wrap_; +}; + +class WriteWrap: public ReqWrap, + public StreamReq { + public: + WriteWrap(Environment* env, + v8::Local obj, + StreamBase* wrap, + DoneCb cb) + : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP), + StreamReq(cb), + wrap_(wrap) { + Wrap(obj, this); + } + + void* operator new(size_t size, char* storage) { return storage; } + + // This is just to keep the compiler happy. It should never be called, since + // we don't use exceptions in node. + void operator delete(void* ptr, char* storage) { UNREACHABLE(); } + + inline StreamBase* wrap() const { + return wrap_; + } + + static void NewWriteWrap(const v8::FunctionCallbackInfo& args) { + CHECK(args.IsConstructCall()); + } + + private: + // People should not be using the non-placement new and delete operator on a + // WriteWrap. Ensure this never happens. + void* operator new(size_t size) { UNREACHABLE(); } + void operator delete(void* ptr) { UNREACHABLE(); } + + StreamBase* const wrap_; +}; + +class StreamResource { + public: + typedef void (*AfterWriteCb)(WriteWrap* w, void* ctx); + typedef void (*AllocCb)(size_t size, uv_buf_t* buf, void* ctx); + typedef void (*ReadCb)(ssize_t nread, + const uv_buf_t* buf, + uv_handle_type pending, + void* ctx); + + StreamResource() : after_write_cb_(nullptr), + alloc_cb_(nullptr), + read_cb_(nullptr) { + } + + virtual ~StreamResource() = default; + + virtual int DoShutdown(ShutdownWrap* req_wrap) = 0; + virtual int DoTryWrite(uv_buf_t** bufs, size_t* count) = 0; + virtual int DoWrite(WriteWrap* w, + uv_buf_t* bufs, + size_t count, + uv_stream_t* send_handle) = 0; + virtual const char* Error() const = 0; + virtual void ClearError() = 0; + + // Events + inline void OnAfterWrite(WriteWrap* w) { + if (after_write_cb_ != nullptr) + after_write_cb_(w, after_write_ctx_); + } + + inline void OnAlloc(size_t size, uv_buf_t* buf) { + if (alloc_cb_ != nullptr) + alloc_cb_(size, buf, alloc_ctx_); + } + + inline void OnRead(size_t nread, + const uv_buf_t* buf, + uv_handle_type pending) { + if (read_cb_ != nullptr) + read_cb_(nread, buf, pending, read_ctx_); + } + + inline void set_after_write_cb(AfterWriteCb cb, void* ctx) { + after_write_ctx_ = ctx; + after_write_cb_ = cb; + } + + inline void set_alloc_cb(AllocCb cb, void* ctx) { + alloc_cb_ = cb; + alloc_ctx_ = ctx; + } + + inline void set_read_cb(ReadCb cb, void* ctx) { + read_cb_ = cb; + read_ctx_ = ctx; + } + + private: + AfterWriteCb after_write_cb_; + void* after_write_ctx_; + AllocCb alloc_cb_; + void* alloc_ctx_; + ReadCb read_cb_; + void* read_ctx_; +}; + +class StreamBase : public StreamResource { + public: + template + static void AddMethods(Environment* env, + v8::Handle target); + + virtual void* Cast() = 0; + virtual bool IsAlive() const = 0; + virtual bool IsClosing() const = 0; + virtual bool IsIPCPipe() const = 0; + virtual int GetFD() const = 0; + + virtual int ReadStart() = 0; + virtual int ReadStop() = 0; + + inline void Consume() { + CHECK_EQ(consumed_, false); + consumed_ = true; + } + + template + inline Outer* Cast() { return static_cast(Cast()); } + + void EmitData(ssize_t nread, + v8::Local buf, + v8::Local handle); + + protected: + explicit StreamBase(Environment* env) : env_(env), consumed_(false) { + } + + virtual ~StreamBase() = default; + + virtual AsyncWrap* GetAsyncWrap() = 0; + + // Libuv callbacks + static void AfterShutdown(ShutdownWrap* req, int status); + static void AfterWrite(WriteWrap* req, int status); + + // JS Methods + int ReadStart(const v8::FunctionCallbackInfo& args); + int ReadStop(const v8::FunctionCallbackInfo& args); + int Shutdown(const v8::FunctionCallbackInfo& args); + int Writev(const v8::FunctionCallbackInfo& args); + int WriteBuffer(const v8::FunctionCallbackInfo& args); + template + int WriteString(const v8::FunctionCallbackInfo& args); + + template + static void GetFD(v8::Local, + const v8::PropertyCallbackInfo&); + + template & args)> + static void JSMethod(const v8::FunctionCallbackInfo& args); + + private: + Environment* env_; + bool consumed_; +}; + +} // namespace node + +#endif // SRC_STREAM_BASE_H_ diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index a9f89e47bb9813..3b50f638eb0fc7 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -38,8 +38,8 @@ using v8::Value; void StreamWrap::Initialize(Handle target, - Handle unused, - Handle context) { + Handle unused, + Handle context) { Environment* env = Environment::GetCurrent(context); Local sw = @@ -55,6 +55,7 @@ void StreamWrap::Initialize(Handle target, ww->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "WriteWrap")); target->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "WriteWrap"), ww->GetFunction()); + env->set_write_wrap_constructor_function(ww->GetFunction()); } @@ -68,23 +69,53 @@ StreamWrap::StreamWrap(Environment* env, reinterpret_cast(stream), provider, parent), - stream_(stream), - default_callbacks_(this), - callbacks_(&default_callbacks_), - callbacks_gc_(false) { + StreamBase(env), + stream_(stream) { + set_after_write_cb(OnAfterWriteImpl, this); + set_alloc_cb(OnAllocImpl, this); + set_read_cb(OnReadImpl, this); } -void StreamWrap::GetFD(Local, const PropertyCallbackInfo& args) { -#if !defined(_WIN32) - HandleScope scope(args.GetIsolate()); - StreamWrap* wrap = Unwrap(args.Holder()); +void StreamWrap::AddMethods(Environment* env, + v8::Handle target) { + env->SetProtoMethod(target, "setBlocking", SetBlocking); + StreamBase::AddMethods(env, target); +} + + +int StreamWrap::GetFD() const { int fd = -1; - if (wrap != nullptr && wrap->stream() != nullptr) { - fd = wrap->stream()->io_watcher.fd; - } - args.GetReturnValue().Set(fd); +#if !defined(_WIN32) + if (stream() != nullptr) + fd = stream()->io_watcher.fd; #endif + return fd; +} + + +bool StreamWrap::IsAlive() const { + return HandleWrap::IsAlive(this); +} + + +bool StreamWrap::IsClosing() const { + return uv_is_closing(reinterpret_cast(stream())); +} + + +void* StreamWrap::Cast() { + return reinterpret_cast(this); +} + + +AsyncWrap* StreamWrap::GetAsyncWrap() { + return static_cast(this); +} + + +bool StreamWrap::IsIPCPipe() const { + return is_named_pipe_ipc(); } @@ -96,22 +127,13 @@ void StreamWrap::UpdateWriteQueueSize() { } -void StreamWrap::ReadStart(const FunctionCallbackInfo& args) { - StreamWrap* wrap = Unwrap(args.Holder()); - if (!IsAlive(wrap)) - return args.GetReturnValue().Set(UV_EINVAL); - - int err = uv_read_start(wrap->stream(), OnAlloc, OnRead); - args.GetReturnValue().Set(err); +int StreamWrap::ReadStart() { + return uv_read_start(stream(), OnAlloc, OnRead); } -void StreamWrap::ReadStop(const FunctionCallbackInfo& args) { - StreamWrap* wrap = Unwrap(args.Holder()); - if (!IsAlive(wrap)) - return args.GetReturnValue().Set(UV_EINVAL); - int err = uv_read_stop(wrap->stream()); - args.GetReturnValue().Set(err); +int StreamWrap::ReadStop() { + return uv_read_stop(stream()); } @@ -120,14 +142,25 @@ void StreamWrap::OnAlloc(uv_handle_t* handle, uv_buf_t* buf) { StreamWrap* wrap = static_cast(handle->data); CHECK_EQ(wrap->stream(), reinterpret_cast(handle)); - wrap->callbacks()->DoAlloc(handle, suggested_size, buf); + + return static_cast(wrap)->OnAlloc(suggested_size, buf); +} + + +void StreamWrap::OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx) { + buf->base = static_cast(malloc(size)); + buf->len = size; + + if (buf->base == nullptr && size > 0) { + FatalError( + "node::StreamWrap::DoAlloc(size_t, uv_buf_t*, void*)", + "Out Of Memory"); + } } template -static Local AcceptHandle(Environment* env, - uv_stream_t* pipe, - AsyncWrap* parent) { +static Local AcceptHandle(Environment* env, StreamWrap* parent) { EscapableHandleScope scope(env->isolate()); Local wrap_obj; UVType* handle; @@ -139,13 +172,54 @@ static Local AcceptHandle(Environment* env, WrapType* wrap = Unwrap(wrap_obj); handle = wrap->UVHandle(); - if (uv_accept(pipe, reinterpret_cast(handle))) + if (uv_accept(parent->stream(), reinterpret_cast(handle))) abort(); return scope.Escape(wrap_obj); } +void StreamWrap::OnReadImpl(ssize_t nread, + const uv_buf_t* buf, + uv_handle_type pending, + void* ctx) { + StreamWrap* wrap = static_cast(ctx); + Environment* env = wrap->env(); + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); + + Local pending_obj; + + if (nread < 0) { + if (buf->base != nullptr) + free(buf->base); + wrap->EmitData(nread, Local(), pending_obj); + return; + } + + if (nread == 0) { + if (buf->base != nullptr) + free(buf->base); + return; + } + + char* base = static_cast(realloc(buf->base, nread)); + CHECK_LE(static_cast(nread), buf->len); + + if (pending == UV_TCP) { + pending_obj = AcceptHandle(env, wrap); + } else if (pending == UV_NAMED_PIPE) { + pending_obj = AcceptHandle(env, wrap); + } else if (pending == UV_UDP) { + pending_obj = AcceptHandle(env, wrap); + } else { + CHECK_EQ(pending, UV_UNKNOWN_HANDLE); + } + + wrap->EmitData(nread, Buffer::Use(env, base, nread), pending_obj); +} + + void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf, @@ -164,7 +238,7 @@ void StreamWrap::OnReadCommon(uv_stream_t* handle, } } - wrap->callbacks()->DoRead(handle, nread, buf, pending); + static_cast(wrap)->OnRead(nread, buf, pending); } @@ -183,437 +257,26 @@ void StreamWrap::OnRead(uv_stream_t* handle, } -size_t StreamWrap::WriteBuffer(Handle val, uv_buf_t* buf) { - CHECK(Buffer::HasInstance(val)); - - // Simple non-writev case - buf->base = Buffer::Data(val); - buf->len = Buffer::Length(val); - - return buf->len; -} - - -void StreamWrap::WriteBuffer(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - - StreamWrap* wrap = Unwrap(args.Holder()); - if (!IsAlive(wrap)) - return args.GetReturnValue().Set(UV_EINVAL); - - CHECK(args[0]->IsObject()); - CHECK(Buffer::HasInstance(args[1])); - - Local req_wrap_obj = args[0].As(); - Local buf_obj = args[1].As(); - - size_t length = Buffer::Length(buf_obj); - - char* storage; - WriteWrap* req_wrap; - uv_buf_t buf; - WriteBuffer(buf_obj, &buf); - - // Try writing immediately without allocation - uv_buf_t* bufs = &buf; - size_t count = 1; - int err = wrap->callbacks()->TryWrite(&bufs, &count); - if (err != 0) - goto done; - if (count == 0) - goto done; - CHECK_EQ(count, 1); - - // Allocate, or write rest - storage = new char[sizeof(WriteWrap)]; - req_wrap = new(storage) WriteWrap(env, req_wrap_obj, wrap); - - err = wrap->callbacks()->DoWrite(req_wrap, - bufs, - count, - nullptr, - StreamWrap::AfterWrite); - req_wrap->Dispatched(); - req_wrap_obj->Set(env->async(), True(env->isolate())); - - if (err) { - req_wrap->~WriteWrap(); - delete[] storage; - } - - done: - const char* msg = wrap->callbacks()->Error(); - if (msg != nullptr) { - req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); - wrap->callbacks()->ClearError(); - } - req_wrap_obj->Set(env->bytes_string(), - Integer::NewFromUnsigned(env->isolate(), length)); - args.GetReturnValue().Set(err); -} - - -template -void StreamWrap::WriteStringImpl(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - int err; - - StreamWrap* wrap = Unwrap(args.Holder()); - if (!IsAlive(wrap)) - return args.GetReturnValue().Set(UV_EINVAL); - - CHECK(args[0]->IsObject()); - CHECK(args[1]->IsString()); - - Local req_wrap_obj = args[0].As(); - Local string = args[1].As(); - - // Compute the size of the storage that the string will be flattened into. - // For UTF8 strings that are very long, go ahead and take the hit for - // computing their actual size, rather than tripling the storage. - size_t storage_size; - if (encoding == UTF8 && string->Length() > 65535) - storage_size = StringBytes::Size(env->isolate(), string, encoding); - else - storage_size = StringBytes::StorageSize(env->isolate(), string, encoding); - - if (storage_size > INT_MAX) { - args.GetReturnValue().Set(UV_ENOBUFS); - return; - } - - // Try writing immediately if write size isn't too big - char* storage; - WriteWrap* req_wrap; - char* data; - char stack_storage[16384]; // 16kb - size_t data_size; - uv_buf_t buf; - - bool try_write = storage_size + 15 <= sizeof(stack_storage) && - (!wrap->is_named_pipe_ipc() || !args[2]->IsObject()); - if (try_write) { - data_size = StringBytes::Write(env->isolate(), - stack_storage, - storage_size, - string, - encoding); - buf = uv_buf_init(stack_storage, data_size); - - uv_buf_t* bufs = &buf; - size_t count = 1; - err = wrap->callbacks()->TryWrite(&bufs, &count); - - // Failure - if (err != 0) - goto done; - - // Success - if (count == 0) - goto done; - - // Partial write - CHECK_EQ(count, 1); - } - - storage = new char[sizeof(WriteWrap) + storage_size + 15]; - req_wrap = new(storage) WriteWrap(env, req_wrap_obj, wrap); - - data = reinterpret_cast(ROUND_UP( - reinterpret_cast(storage) + sizeof(WriteWrap), 16)); - - if (try_write) { - // Copy partial data - memcpy(data, buf.base, buf.len); - data_size = buf.len; - } else { - // Write it - data_size = StringBytes::Write(env->isolate(), - data, - storage_size, - string, - encoding); - } - - CHECK_LE(data_size, storage_size); - - buf = uv_buf_init(data, data_size); - - if (!wrap->is_named_pipe_ipc()) { - err = wrap->callbacks()->DoWrite(req_wrap, - &buf, - 1, - nullptr, - StreamWrap::AfterWrite); - } else { - uv_handle_t* send_handle = nullptr; - - if (args[2]->IsObject()) { - Local send_handle_obj = args[2].As(); - HandleWrap* wrap = Unwrap(send_handle_obj); - send_handle = wrap->GetHandle(); - // Reference StreamWrap instance to prevent it from being garbage - // collected before `AfterWrite` is called. - CHECK_EQ(false, req_wrap->persistent().IsEmpty()); - req_wrap->object()->Set(env->handle_string(), send_handle_obj); - } - - err = wrap->callbacks()->DoWrite( - req_wrap, - &buf, - 1, - reinterpret_cast(send_handle), - StreamWrap::AfterWrite); - } - - req_wrap->Dispatched(); - req_wrap->object()->Set(env->async(), True(env->isolate())); - - if (err) { - req_wrap->~WriteWrap(); - delete[] storage; - } - - done: - const char* msg = wrap->callbacks()->Error(); - if (msg != nullptr) { - req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); - wrap->callbacks()->ClearError(); - } - req_wrap_obj->Set(env->bytes_string(), - Integer::NewFromUnsigned(env->isolate(), data_size)); - args.GetReturnValue().Set(err); -} - - -void StreamWrap::Writev(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - - StreamWrap* wrap = Unwrap(args.Holder()); - if (!IsAlive(wrap)) - return args.GetReturnValue().Set(UV_EINVAL); - - CHECK(args[0]->IsObject()); - CHECK(args[1]->IsArray()); - - Local req_wrap_obj = args[0].As(); - Local chunks = args[1].As(); - size_t count = chunks->Length() >> 1; - - uv_buf_t bufs_[16]; - uv_buf_t* bufs = bufs_; - - // Determine storage size first - size_t storage_size = 0; - for (size_t i = 0; i < count; i++) { - Handle chunk = chunks->Get(i * 2); - - if (Buffer::HasInstance(chunk)) - continue; - // Buffer chunk, no additional storage required - - // String chunk - Handle string = chunk->ToString(env->isolate()); - enum encoding encoding = ParseEncoding(env->isolate(), - chunks->Get(i * 2 + 1)); - size_t chunk_size; - if (encoding == UTF8 && string->Length() > 65535) - chunk_size = StringBytes::Size(env->isolate(), string, encoding); - else - chunk_size = StringBytes::StorageSize(env->isolate(), string, encoding); - - storage_size += chunk_size + 15; - } - - if (storage_size > INT_MAX) { - args.GetReturnValue().Set(UV_ENOBUFS); - return; - } - - if (ARRAY_SIZE(bufs_) < count) - bufs = new uv_buf_t[count]; - - storage_size += sizeof(WriteWrap); - char* storage = new char[storage_size]; - WriteWrap* req_wrap = - new(storage) WriteWrap(env, req_wrap_obj, wrap); - - uint32_t bytes = 0; - size_t offset = sizeof(WriteWrap); - for (size_t i = 0; i < count; i++) { - Handle chunk = chunks->Get(i * 2); - - // Write buffer - if (Buffer::HasInstance(chunk)) { - bufs[i].base = Buffer::Data(chunk); - bufs[i].len = Buffer::Length(chunk); - bytes += bufs[i].len; - continue; - } - - // Write string - offset = ROUND_UP(offset, 16); - CHECK_LT(offset, storage_size); - char* str_storage = storage + offset; - size_t str_size = storage_size - offset; - - Handle string = chunk->ToString(env->isolate()); - enum encoding encoding = ParseEncoding(env->isolate(), - chunks->Get(i * 2 + 1)); - str_size = StringBytes::Write(env->isolate(), - str_storage, - str_size, - string, - encoding); - bufs[i].base = str_storage; - bufs[i].len = str_size; - offset += str_size; - bytes += str_size; - } - - int err = wrap->callbacks()->DoWrite(req_wrap, - bufs, - count, - nullptr, - StreamWrap::AfterWrite); - - // Deallocate space - if (bufs != bufs_) - delete[] bufs; - - req_wrap->Dispatched(); - req_wrap->object()->Set(env->async(), True(env->isolate())); - req_wrap->object()->Set(env->bytes_string(), - Number::New(env->isolate(), bytes)); - const char* msg = wrap->callbacks()->Error(); - if (msg != nullptr) { - req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); - wrap->callbacks()->ClearError(); - } - - if (err) { - req_wrap->~WriteWrap(); - delete[] storage; - } - - args.GetReturnValue().Set(err); -} - - -void StreamWrap::WriteAsciiString(const FunctionCallbackInfo& args) { - WriteStringImpl(args); -} - - -void StreamWrap::WriteUtf8String(const FunctionCallbackInfo& args) { - WriteStringImpl(args); -} - - -void StreamWrap::WriteUcs2String(const FunctionCallbackInfo& args) { - WriteStringImpl(args); -} - -void StreamWrap::WriteBinaryString(const FunctionCallbackInfo& args) { - WriteStringImpl(args); -} - void StreamWrap::SetBlocking(const FunctionCallbackInfo& args) { StreamWrap* wrap = Unwrap(args.Holder()); - if (!IsAlive(wrap)) - return args.GetReturnValue().Set(UV_EINVAL); - CHECK_GT(args.Length(), 0); - int err = uv_stream_set_blocking(wrap->stream(), args[0]->IsTrue()); - args.GetReturnValue().Set(err); -} - -void StreamWrap::AfterWrite(uv_write_t* req, int status) { - WriteWrap* req_wrap = ContainerOf(&WriteWrap::req_, req); - StreamWrap* wrap = req_wrap->wrap(); - Environment* env = wrap->env(); - - HandleScope handle_scope(env->isolate()); - Context::Scope context_scope(env->context()); - - // The wrap and request objects should still be there. - CHECK_EQ(req_wrap->persistent().IsEmpty(), false); - CHECK_EQ(wrap->persistent().IsEmpty(), false); - - // Unref handle property - Local req_wrap_obj = req_wrap->object(); - req_wrap_obj->Delete(env->handle_string()); - wrap->callbacks()->AfterWrite(req_wrap); - - Local argv[] = { - Integer::New(env->isolate(), status), - wrap->object(), - req_wrap_obj, - Undefined(env->isolate()) - }; - - const char* msg = wrap->callbacks()->Error(); - if (msg != nullptr) { - argv[3] = OneByteString(env->isolate(), msg); - wrap->callbacks()->ClearError(); - } - - req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv); - - req_wrap->~WriteWrap(); - delete[] reinterpret_cast(req_wrap); -} - -void StreamWrap::Shutdown(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - - StreamWrap* wrap = Unwrap(args.Holder()); - if (!IsAlive(wrap)) + CHECK_GT(args.Length(), 0); + if (!wrap->IsAlive()) return args.GetReturnValue().Set(UV_EINVAL); - CHECK(args[0]->IsObject()); - Local req_wrap_obj = args[0].As(); - - ShutdownWrap* req_wrap = new ShutdownWrap(env, req_wrap_obj); - int err = wrap->callbacks()->DoShutdown(req_wrap, AfterShutdown); - req_wrap->Dispatched(); - if (err) - delete req_wrap; - args.GetReturnValue().Set(err); -} - - -void StreamWrap::AfterShutdown(uv_shutdown_t* req, int status) { - ShutdownWrap* req_wrap = static_cast(req->data); - StreamWrap* wrap = static_cast(req->handle->data); - Environment* env = wrap->env(); - - // The wrap and request objects should still be there. - CHECK_EQ(req_wrap->persistent().IsEmpty(), false); - CHECK_EQ(wrap->persistent().IsEmpty(), false); - - HandleScope handle_scope(env->isolate()); - Context::Scope context_scope(env->context()); - - Local req_wrap_obj = req_wrap->object(); - Local argv[3] = { - Integer::New(env->isolate(), status), - wrap->object(), - req_wrap_obj - }; - - req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv); - - delete req_wrap; + bool enable = args[0]->IsTrue(); + args.GetReturnValue().Set(uv_stream_set_blocking(wrap->stream(), enable)); } -const char* StreamWrapCallbacks::Error() const { - return nullptr; +int StreamWrap::DoShutdown(ShutdownWrap* req_wrap) { + return uv_shutdown(&req_wrap->req_, stream(), AfterShutdown); } -void StreamWrapCallbacks::ClearError() { +void StreamWrap::AfterShutdown(uv_shutdown_t* req, int status) { + ShutdownWrap* req_wrap = ContainerOf(&ShutdownWrap::req_, req); + req_wrap->Done(status); } @@ -621,13 +284,13 @@ void StreamWrapCallbacks::ClearError() { // values, shifting their base and decrementing their length. This is // required in order to skip the data that was successfully written via // uv_try_write(). -int StreamWrapCallbacks::TryWrite(uv_buf_t** bufs, size_t* count) { +int StreamWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) { int err; size_t written; uv_buf_t* vbufs = *bufs; size_t vcount = *count; - err = uv_try_write(wrap()->stream(), vbufs, vcount); + err = uv_try_write(stream(), vbufs, vcount); if (err == UV_ENOSYS || err == UV_EAGAIN) return 0; if (err < 0) @@ -657,106 +320,53 @@ int StreamWrapCallbacks::TryWrite(uv_buf_t** bufs, size_t* count) { } -int StreamWrapCallbacks::DoWrite(WriteWrap* w, - uv_buf_t* bufs, - size_t count, - uv_stream_t* send_handle, - uv_write_cb cb) { +int StreamWrap::DoWrite(WriteWrap* w, + uv_buf_t* bufs, + size_t count, + uv_stream_t* send_handle) { int r; if (send_handle == nullptr) { - r = uv_write(&w->req_, wrap()->stream(), bufs, count, cb); + r = uv_write(&w->req_, stream(), bufs, count, AfterWrite); } else { - r = uv_write2(&w->req_, wrap()->stream(), bufs, count, send_handle, cb); + r = uv_write2(&w->req_, stream(), bufs, count, send_handle, AfterWrite); } if (!r) { size_t bytes = 0; for (size_t i = 0; i < count; i++) bytes += bufs[i].len; - if (wrap()->stream()->type == UV_TCP) { + if (stream()->type == UV_TCP) { NODE_COUNT_NET_BYTES_SENT(bytes); - } else if (wrap()->stream()->type == UV_NAMED_PIPE) { + } else if (stream()->type == UV_NAMED_PIPE) { NODE_COUNT_PIPE_BYTES_SENT(bytes); } } - wrap()->UpdateWriteQueueSize(); + UpdateWriteQueueSize(); return r; } -void StreamWrapCallbacks::AfterWrite(WriteWrap* w) { - wrap()->UpdateWriteQueueSize(); +void StreamWrap::AfterWrite(uv_write_t* req, int status) { + WriteWrap* req_wrap = ContainerOf(&WriteWrap::req_, req); + req_wrap->Done(status); } -void StreamWrapCallbacks::DoAlloc(uv_handle_t* handle, - size_t suggested_size, - uv_buf_t* buf) { - buf->base = static_cast(malloc(suggested_size)); - buf->len = suggested_size; - - if (buf->base == nullptr && suggested_size > 0) { - FatalError( - "node::StreamWrapCallbacks::DoAlloc(uv_handle_t*, size_t, uv_buf_t*)", - "Out Of Memory"); - } +void StreamWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) { + StreamWrap* wrap = static_cast(ctx); + wrap->UpdateWriteQueueSize(); } -void StreamWrapCallbacks::DoRead(uv_stream_t* handle, - ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending) { - Environment* env = wrap()->env(); - HandleScope handle_scope(env->isolate()); - Context::Scope context_scope(env->context()); - - Local argv[] = { - Integer::New(env->isolate(), nread), - Undefined(env->isolate()), - Undefined(env->isolate()) - }; - - if (nread < 0) { - if (buf->base != nullptr) - free(buf->base); - wrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv); - return; - } - - if (nread == 0) { - if (buf->base != nullptr) - free(buf->base); - return; - } - - char* base = static_cast(realloc(buf->base, nread)); - CHECK_LE(static_cast(nread), buf->len); - argv[1] = Buffer::Use(env, base, nread); - - Local pending_obj; - if (pending == UV_TCP) { - pending_obj = AcceptHandle(env, handle, wrap()); - } else if (pending == UV_NAMED_PIPE) { - pending_obj = AcceptHandle(env, handle, wrap()); - } else if (pending == UV_UDP) { - pending_obj = AcceptHandle(env, handle, wrap()); - } else { - CHECK_EQ(pending, UV_UNKNOWN_HANDLE); - } - - if (!pending_obj.IsEmpty()) { - argv[2] = pending_obj; - } - - wrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv); +const char* StreamWrap::Error() const { + return nullptr; } -int StreamWrapCallbacks::DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb) { - return uv_shutdown(&req_wrap->req_, wrap()->stream(), cb); +void StreamWrap::ClearError() { + // No-op } } // namespace node diff --git a/src/stream_wrap.h b/src/stream_wrap.h index 5148228112eb1e..ca673b4ef11879 100644 --- a/src/stream_wrap.h +++ b/src/stream_wrap.h @@ -1,10 +1,10 @@ #ifndef SRC_STREAM_WRAP_H_ #define SRC_STREAM_WRAP_H_ +#include "stream_base.h" + #include "env.h" #include "handle_wrap.h" -#include "req-wrap.h" -#include "req-wrap-inl.h" #include "string_bytes.h" #include "v8.h" @@ -13,126 +13,31 @@ namespace node { // Forward declaration class StreamWrap; -class ShutdownWrap : public ReqWrap { - public: - ShutdownWrap(Environment* env, v8::Local req_wrap_obj) - : ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP) { - Wrap(req_wrap_obj, this); - } - - static void NewShutdownWrap(const v8::FunctionCallbackInfo& args) { - CHECK(args.IsConstructCall()); - } -}; - -class WriteWrap: public ReqWrap { - public: - // TODO(trevnorris): WrapWrap inherits from ReqWrap, which I've globbed - // into the same provider. How should these be broken apart? - WriteWrap(Environment* env, v8::Local obj, StreamWrap* wrap) - : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP), - wrap_(wrap) { - Wrap(obj, this); - } - - void* operator new(size_t size, char* storage) { return storage; } - - // This is just to keep the compiler happy. It should never be called, since - // we don't use exceptions in node. - void operator delete(void* ptr, char* storage) { UNREACHABLE(); } - - inline StreamWrap* wrap() const { - return wrap_; - } - - static void NewWriteWrap(const v8::FunctionCallbackInfo& args) { - CHECK(args.IsConstructCall()); - } - - private: - // People should not be using the non-placement new and delete operator on a - // WriteWrap. Ensure this never happens. - void* operator new(size_t size) { UNREACHABLE(); } - void operator delete(void* ptr) { UNREACHABLE(); } - - StreamWrap* const wrap_; -}; - -// Overridable callbacks' types -class StreamWrapCallbacks { - public: - explicit StreamWrapCallbacks(StreamWrap* wrap) : wrap_(wrap) { - } - - explicit StreamWrapCallbacks(StreamWrapCallbacks* old) : wrap_(old->wrap()) { - } - - virtual ~StreamWrapCallbacks() = default; - - virtual const char* Error() const; - virtual void ClearError(); - - virtual int TryWrite(uv_buf_t** bufs, size_t* count); - - virtual int DoWrite(WriteWrap* w, - uv_buf_t* bufs, - size_t count, - uv_stream_t* send_handle, - uv_write_cb cb); - virtual void AfterWrite(WriteWrap* w); - virtual void DoAlloc(uv_handle_t* handle, - size_t suggested_size, - uv_buf_t* buf); - virtual void DoRead(uv_stream_t* handle, - ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending); - virtual int DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb); - - protected: - inline StreamWrap* wrap() const { - return wrap_; - } - - private: - StreamWrap* const wrap_; -}; - -class StreamWrap : public HandleWrap { +class StreamWrap : public HandleWrap, public StreamBase { public: static void Initialize(v8::Handle target, v8::Handle unused, v8::Handle context); - void OverrideCallbacks(StreamWrapCallbacks* callbacks, bool gc) { - StreamWrapCallbacks* old = callbacks_; - callbacks_ = callbacks; - callbacks_gc_ = gc; - if (old != &default_callbacks_) - delete old; - } - - static void GetFD(v8::Local, - const v8::PropertyCallbackInfo&); + int GetFD() const override; + void* Cast() override; + bool IsAlive() const override; + bool IsClosing() const override; + bool IsIPCPipe() const override; // JavaScript functions - static void ReadStart(const v8::FunctionCallbackInfo& args); - static void ReadStop(const v8::FunctionCallbackInfo& args); - static void Shutdown(const v8::FunctionCallbackInfo& args); - - static void Writev(const v8::FunctionCallbackInfo& args); - static void WriteBuffer(const v8::FunctionCallbackInfo& args); - static void WriteAsciiString(const v8::FunctionCallbackInfo& args); - static void WriteUtf8String(const v8::FunctionCallbackInfo& args); - static void WriteUcs2String(const v8::FunctionCallbackInfo& args); - static void WriteBinaryString( - const v8::FunctionCallbackInfo& args); - - static void SetBlocking(const v8::FunctionCallbackInfo& args); - - inline StreamWrapCallbacks* callbacks() const { - return callbacks_; - } + int ReadStart() override; + int ReadStop() override; + + // Resource implementation + int DoShutdown(ShutdownWrap* req_wrap) override; + int DoTryWrite(uv_buf_t** bufs, size_t* count) override; + int DoWrite(WriteWrap* w, + uv_buf_t* bufs, + size_t count, + uv_stream_t* send_handle) override; + const char* Error() const override; + void ClearError() override; inline uv_stream_t* stream() const { return stream_; @@ -152,8 +57,6 @@ class StreamWrap : public HandleWrap { } protected: - static size_t WriteBuffer(v8::Handle val, uv_buf_t* buf); - StreamWrap(Environment* env, v8::Local object, uv_stream_t* stream, @@ -161,22 +64,21 @@ class StreamWrap : public HandleWrap { AsyncWrap* parent = nullptr); ~StreamWrap() { - if (!callbacks_gc_ && callbacks_ != &default_callbacks_) { - delete callbacks_; - } - callbacks_ = nullptr; } - void StateChange() { } + AsyncWrap* GetAsyncWrap() override; void UpdateWriteQueueSize(); + static void AddMethods(Environment* env, + v8::Handle target); + private: + static void SetBlocking(const v8::FunctionCallbackInfo& args); + // Callbacks for libuv - static void AfterWrite(uv_write_t* req, int status); static void OnAlloc(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); - static void AfterShutdown(uv_shutdown_t* req, int status); static void OnRead(uv_stream_t* handle, ssize_t nread, @@ -185,16 +87,18 @@ class StreamWrap : public HandleWrap { ssize_t nread, const uv_buf_t* buf, uv_handle_type pending); + static void AfterWrite(uv_write_t* req, int status); + static void AfterShutdown(uv_shutdown_t* req, int status); - template - static void WriteStringImpl(const v8::FunctionCallbackInfo& args); + // Resource interface implementation + static void OnAfterWriteImpl(WriteWrap* w, void* ctx); + static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx); + static void OnReadImpl(ssize_t nread, + const uv_buf_t* buf, + uv_handle_type pending, + void* ctx); uv_stream_t* const stream_; - StreamWrapCallbacks default_callbacks_; - StreamWrapCallbacks* callbacks_; // Overridable callbacks - bool callbacks_gc_; - - friend class StreamWrapCallbacks; }; diff --git a/src/tcp_wrap.cc b/src/tcp_wrap.cc index 3f011422ef8837..a823e758ee100d 100644 --- a/src/tcp_wrap.cc +++ b/src/tcp_wrap.cc @@ -72,15 +72,6 @@ void TCPWrap::Initialize(Handle target, t->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "TCP")); t->InstanceTemplate()->SetInternalFieldCount(1); - enum PropertyAttribute attributes = - static_cast(v8::ReadOnly | v8::DontDelete); - t->InstanceTemplate()->SetAccessor(env->fd_string(), - StreamWrap::GetFD, - nullptr, - Handle(), - v8::DEFAULT, - attributes); - // Init properties t->InstanceTemplate()->Set(String::NewFromUtf8(env->isolate(), "reading"), Boolean::New(env->isolate(), false)); @@ -98,16 +89,7 @@ void TCPWrap::Initialize(Handle target, env->SetProtoMethod(t, "ref", HandleWrap::Ref); env->SetProtoMethod(t, "unref", HandleWrap::Unref); - env->SetProtoMethod(t, "readStart", StreamWrap::ReadStart); - env->SetProtoMethod(t, "readStop", StreamWrap::ReadStop); - env->SetProtoMethod(t, "shutdown", StreamWrap::Shutdown); - - env->SetProtoMethod(t, "writeBuffer", StreamWrap::WriteBuffer); - env->SetProtoMethod(t, "writeAsciiString", StreamWrap::WriteAsciiString); - env->SetProtoMethod(t, "writeUtf8String", StreamWrap::WriteUtf8String); - env->SetProtoMethod(t, "writeUcs2String", StreamWrap::WriteUcs2String); - env->SetProtoMethod(t, "writeBinaryString", StreamWrap::WriteBinaryString); - env->SetProtoMethod(t, "writev", StreamWrap::Writev); + StreamWrap::AddMethods(env, t); env->SetProtoMethod(t, "open", Open); env->SetProtoMethod(t, "bind", Bind); diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc index 9aafe3925dc8e2..ab8db6951bdc30 100644 --- a/src/tls_wrap.cc +++ b/src/tls_wrap.cc @@ -33,17 +33,20 @@ using v8::String; using v8::Value; -TLSCallbacks::TLSCallbacks(Environment* env, - Kind kind, - Handle sc, - StreamWrapCallbacks* old) - : SSLWrap(env, Unwrap(sc), kind), - StreamWrapCallbacks(old), +TLSWrap::TLSWrap(Environment* env, + Kind kind, + StreamBase* stream, + Handle stream_obj, + Handle sc) + : SSLWrap(env, Unwrap(sc), kind), + StreamBase(env), AsyncWrap(env, env->tls_wrap_constructor_function()->NewInstance(), AsyncWrap::PROVIDER_TLSWRAP), sc_(Unwrap(sc)), sc_handle_(env->isolate(), sc), + stream_(stream), + stream_handle_(env->isolate(), stream_obj), enc_in_(nullptr), enc_out_(nullptr), clear_in_(nullptr), @@ -58,14 +61,22 @@ TLSCallbacks::TLSCallbacks(Environment* env, MakeWeak(this); // We've our own session callbacks - SSL_CTX_sess_set_get_cb(sc_->ctx_, SSLWrap::GetSessionCallback); - SSL_CTX_sess_set_new_cb(sc_->ctx_, SSLWrap::NewSessionCallback); + SSL_CTX_sess_set_get_cb(sc_->ctx_, SSLWrap::GetSessionCallback); + SSL_CTX_sess_set_new_cb(sc_->ctx_, SSLWrap::NewSessionCallback); + + stream_->Consume(); + stream_->set_after_write_cb(OnAfterWriteImpl, this); + stream_->set_alloc_cb(OnAllocImpl, this); + stream_->set_read_cb(OnReadImpl, this); + + set_alloc_cb(OnAllocSelf, this); + set_read_cb(OnReadSelf, this); InitSSL(); } -TLSCallbacks::~TLSCallbacks() { +TLSWrap::~TLSWrap() { enc_in_ = nullptr; enc_out_ = nullptr; delete clear_in_; @@ -73,6 +84,7 @@ TLSCallbacks::~TLSCallbacks() { sc_ = nullptr; sc_handle_.Reset(); + stream_handle_.Reset(); persistent().Reset(); #ifdef SSL_CTRL_SET_TLSEXT_SERVERNAME_CB @@ -90,12 +102,12 @@ TLSCallbacks::~TLSCallbacks() { } -void TLSCallbacks::MakePending() { +void TLSWrap::MakePending() { write_item_queue_.MoveBack(&pending_write_items_); } -bool TLSCallbacks::InvokeQueued(int status) { +bool TLSWrap::InvokeQueued(int status) { if (pending_write_items_.IsEmpty()) return false; @@ -103,7 +115,7 @@ bool TLSCallbacks::InvokeQueued(int status) { WriteItemList queue; pending_write_items_.MoveBack(&queue); while (WriteItem* wi = queue.PopFront()) { - wi->cb_(&wi->w_->req_, status); + wi->w_->Done(status); delete wi; } @@ -111,12 +123,12 @@ bool TLSCallbacks::InvokeQueued(int status) { } -void TLSCallbacks::NewSessionDoneCb() { +void TLSWrap::NewSessionDoneCb() { Cycle(); } -void TLSCallbacks::InitSSL() { +void TLSWrap::InitSSL() { // Initialize SSL enc_in_ = NodeBIO::New(); enc_out_ = NodeBIO::New(); @@ -158,7 +170,7 @@ void TLSCallbacks::InitSSL() { } -void TLSCallbacks::Wrap(const FunctionCallbackInfo& args) { +void TLSWrap::Wrap(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); if (args.Length() < 1 || !args[0]->IsObject()) { @@ -172,42 +184,39 @@ void TLSCallbacks::Wrap(const FunctionCallbackInfo& args) { if (args.Length() < 3 || !args[2]->IsBoolean()) return env->ThrowTypeError("Third argument should be boolean"); - Local stream = args[0].As(); + Local stream_obj = args[0].As(); Local sc = args[1].As(); - Kind kind = args[2]->IsTrue() ? SSLWrap::kServer : - SSLWrap::kClient; + Kind kind = args[2]->IsTrue() ? SSLWrap::kServer : + SSLWrap::kClient; - TLSCallbacks* callbacks = nullptr; - WITH_GENERIC_STREAM(env, stream, { - callbacks = new TLSCallbacks(env, kind, sc, wrap->callbacks()); - wrap->OverrideCallbacks(callbacks, true); + StreamBase* stream = nullptr; + WITH_GENERIC_STREAM(env, stream_obj, { + stream = wrap; }); + CHECK_NE(stream, nullptr); - if (callbacks == nullptr) { - return args.GetReturnValue().SetNull(); - } + TLSWrap* res = new TLSWrap(env, kind, stream, stream_obj, sc); - args.GetReturnValue().Set(callbacks->persistent()); + args.GetReturnValue().Set(res->persistent()); } -void TLSCallbacks::Receive(const FunctionCallbackInfo& args) { - TLSCallbacks* wrap = Unwrap(args.Holder()); +void TLSWrap::Receive(const FunctionCallbackInfo& args) { + TLSWrap* wrap = Unwrap(args.Holder()); CHECK(Buffer::HasInstance(args[0])); char* data = Buffer::Data(args[0]); size_t len = Buffer::Length(args[0]); uv_buf_t buf; - uv_stream_t* stream = wrap->wrap()->stream(); // Copy given buffer entirely or partiall if handle becomes closed - while (len > 0 && !uv_is_closing(reinterpret_cast(stream))) { - wrap->DoAlloc(reinterpret_cast(stream), len, &buf); + while (len > 0 && !wrap->IsClosing()) { + wrap->stream_->OnAlloc(len, &buf); size_t copy = buf.len > len ? len : buf.len; memcpy(buf.base, data, copy); buf.len = copy; - wrap->DoRead(stream, buf.len, &buf, UV_UNKNOWN_HANDLE); + wrap->stream_->OnRead(buf.len, &buf, UV_UNKNOWN_HANDLE); data += copy; len -= copy; @@ -215,10 +224,10 @@ void TLSCallbacks::Receive(const FunctionCallbackInfo& args) { } -void TLSCallbacks::Start(const FunctionCallbackInfo& args) { +void TLSWrap::Start(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); - TLSCallbacks* wrap = Unwrap(args.Holder()); + TLSWrap* wrap = Unwrap(args.Holder()); if (wrap->started_) return env->ThrowError("Already started."); @@ -231,14 +240,14 @@ void TLSCallbacks::Start(const FunctionCallbackInfo& args) { } -void TLSCallbacks::SSLInfoCallback(const SSL* ssl_, int where, int ret) { +void TLSWrap::SSLInfoCallback(const SSL* ssl_, int where, int ret) { if (!(where & (SSL_CB_HANDSHAKE_START | SSL_CB_HANDSHAKE_DONE))) return; // Be compatible with older versions of OpenSSL. SSL_get_app_data() wants // a non-const SSL* in OpenSSL <= 0.9.7e. SSL* ssl = const_cast(ssl_); - TLSCallbacks* c = static_cast(SSL_get_app_data(ssl)); + TLSWrap* c = static_cast(SSL_get_app_data(ssl)); Environment* env = c->env(); HandleScope handle_scope(env->isolate()); Context::Scope context_scope(env->context()); @@ -261,7 +270,7 @@ void TLSCallbacks::SSLInfoCallback(const SSL* ssl_, int where, int ret) { } -void TLSCallbacks::EncOut() { +void TLSWrap::EncOut() { // Ignore cycling data if ClientHello wasn't yet parsed if (!hello_parser_.IsEnded()) return; @@ -291,47 +300,49 @@ void TLSCallbacks::EncOut() { write_size_ = NodeBIO::FromBIO(enc_out_)->PeekMultiple(data, size, &count); CHECK(write_size_ != 0 && count != 0); - write_req_.data = this; + Local req_wrap_obj = + env()->write_wrap_constructor_function()->NewInstance(); + char* storage = new char[sizeof(WriteWrap)]; + WriteWrap* write_req = new(storage) WriteWrap(env(), + req_wrap_obj, + this, + EncOutCb); + uv_buf_t buf[ARRAY_SIZE(data)]; for (size_t i = 0; i < count; i++) buf[i] = uv_buf_init(data[i], size[i]); - int r = uv_write(&write_req_, wrap()->stream(), buf, count, EncOutCb); + int r = stream_->DoWrite(write_req, buf, count, nullptr); // Ignore errors, this should be already handled in js - if (!r) { - if (wrap()->is_tcp()) { - NODE_COUNT_NET_BYTES_SENT(write_size_); - } else if (wrap()->is_named_pipe()) { - NODE_COUNT_PIPE_BYTES_SENT(write_size_); - } - } + if (!r) + NODE_COUNT_NET_BYTES_SENT(write_size_); } -void TLSCallbacks::EncOutCb(uv_write_t* req, int status) { - TLSCallbacks* callbacks = static_cast(req->data); +void TLSWrap::EncOutCb(WriteWrap* req_wrap, int status) { + TLSWrap* wrap = req_wrap->wrap()->Cast(); // Handle error if (status) { // Ignore errors after shutdown - if (callbacks->shutdown_) + if (wrap->shutdown_) return; // Notify about error - callbacks->InvokeQueued(status); + wrap->InvokeQueued(status); return; } // Commit - NodeBIO::FromBIO(callbacks->enc_out_)->Read(nullptr, callbacks->write_size_); + NodeBIO::FromBIO(wrap->enc_out_)->Read(nullptr, wrap->write_size_); // Try writing more data - callbacks->write_size_ = 0; - callbacks->EncOut(); + wrap->write_size_ = 0; + wrap->EncOut(); } -Local TLSCallbacks::GetSSLError(int status, int* err, const char** msg) { +Local TLSWrap::GetSSLError(int status, int* err, const char** msg) { EscapableHandleScope scope(env()->isolate()); *err = SSL_get_error(ssl_, status); @@ -373,7 +384,7 @@ Local TLSCallbacks::GetSSLError(int status, int* err, const char** msg) { } -void TLSCallbacks::ClearOut() { +void TLSWrap::ClearOut() { // Ignore cycling data if ClientHello wasn't yet parsed if (!hello_parser_.IsEnded()) return; @@ -389,22 +400,30 @@ void TLSCallbacks::ClearOut() { char out[kClearOutChunkSize]; int read; - do { + for (;;) { read = SSL_read(ssl_, out, sizeof(out)); - if (read > 0) { - Local argv[] = { - Integer::New(env()->isolate(), read), - Buffer::New(env(), out, read) - }; - wrap()->MakeCallback(env()->onread_string(), ARRAY_SIZE(argv), argv); + + if (read <= 0) + break; + + while (read > 0) { + int avail = read; + + uv_buf_t buf; + OnAlloc(avail, &buf); + if (static_cast(buf.len) < avail) + avail = buf.len; + memcpy(buf.base, out, avail); + OnRead(avail, &buf, UV_UNKNOWN_HANDLE); + + read -= avail; } - } while (read > 0); + } int flags = SSL_get_shutdown(ssl_); if (!eof_ && flags & SSL_RECEIVED_SHUTDOWN) { eof_ = true; - Local arg = Integer::New(env()->isolate(), UV_EOF); - wrap()->MakeCallback(env()->onread_string(), 1, &arg); + OnRead(UV_EOF, nullptr, UV_UNKNOWN_HANDLE); } if (read == -1) { @@ -427,7 +446,7 @@ void TLSCallbacks::ClearOut() { } -bool TLSCallbacks::ClearIn() { +bool TLSWrap::ClearIn() { // Ignore cycling data if ClientHello wasn't yet parsed if (!hello_parser_.IsEnded()) return false; @@ -466,28 +485,67 @@ bool TLSCallbacks::ClearIn() { } -const char* TLSCallbacks::Error() const { +void* TLSWrap::Cast() { + return reinterpret_cast(this); +} + + +AsyncWrap* TLSWrap::GetAsyncWrap() { + return static_cast(this); +} + + +bool TLSWrap::IsIPCPipe() const { + return stream_->IsIPCPipe(); +} + + +int TLSWrap::GetFD() const { + return stream_->GetFD(); +} + + +bool TLSWrap::IsAlive() const { + return stream_->IsAlive(); +} + + +bool TLSWrap::IsClosing() const { + return stream_->IsClosing(); +} + + +int TLSWrap::ReadStart() { + return stream_->ReadStart(); +} + + +int TLSWrap::ReadStop() { + return stream_->ReadStop(); +} + + +const char* TLSWrap::Error() const { return error_; } -void TLSCallbacks::ClearError() { +void TLSWrap::ClearError() { delete[] error_; error_ = nullptr; } -int TLSCallbacks::TryWrite(uv_buf_t** bufs, size_t* count) { +int TLSWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) { // TODO(indutny): Support it return 0; } -int TLSCallbacks::DoWrite(WriteWrap* w, - uv_buf_t* bufs, - size_t count, - uv_stream_t* send_handle, - uv_write_cb cb) { +int TLSWrap::DoWrite(WriteWrap* w, + uv_buf_t* bufs, + size_t count, + uv_stream_t* send_handle) { CHECK_EQ(send_handle, nullptr); bool empty = true; @@ -504,11 +562,11 @@ int TLSCallbacks::DoWrite(WriteWrap* w, // However if there any data that should be written to socket, // callback should not be invoked immediately if (BIO_pending(enc_out_) == 0) - return uv_write(&w->req_, wrap()->stream(), bufs, count, cb); + return stream_->DoWrite(w, bufs, count, send_handle); } // Queue callback to execute it on next tick - write_item_queue_.PushBack(new WriteItem(w, cb)); + write_item_queue_.PushBack(new WriteItem(w)); // Write queued data if (empty) { @@ -552,24 +610,51 @@ int TLSCallbacks::DoWrite(WriteWrap* w, } -void TLSCallbacks::AfterWrite(WriteWrap* w) { +void TLSWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) { // Intentionally empty } -void TLSCallbacks::DoAlloc(uv_handle_t* handle, - size_t suggested_size, - uv_buf_t* buf) { +void TLSWrap::OnAllocImpl(size_t suggested_size, uv_buf_t* buf, void* ctx) { + TLSWrap* wrap = static_cast(ctx); + size_t size = 0; - buf->base = NodeBIO::FromBIO(enc_in_)->PeekWritable(&size); + buf->base = NodeBIO::FromBIO(wrap->enc_in_)->PeekWritable(&size); buf->len = size; } -void TLSCallbacks::DoRead(uv_stream_t* handle, - ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending) { +void TLSWrap::OnReadImpl(ssize_t nread, + const uv_buf_t* buf, + uv_handle_type pending, + void* ctx) { + TLSWrap* wrap = static_cast(ctx); + wrap->DoRead(nread, buf, pending); +} + + +void TLSWrap::OnAllocSelf(size_t suggested_size, uv_buf_t* buf, void* ctx) { + buf->base = static_cast(malloc(suggested_size)); + CHECK_NE(buf->base, nullptr); + buf->len = suggested_size; +} + + +void TLSWrap::OnReadSelf(ssize_t nread, + const uv_buf_t* buf, + uv_handle_type pending, + void* ctx) { + TLSWrap* wrap = static_cast(ctx); + Local buf_obj; + if (buf != nullptr) + buf_obj = Buffer::Use(wrap->env(), buf->base, buf->len); + wrap->EmitData(nread, buf_obj, Local()); +} + + +void TLSWrap::DoRead(ssize_t nread, + const uv_buf_t* buf, + uv_handle_type pending) { if (nread < 0) { // Error should be emitted only after all data was read ClearOut(); @@ -583,8 +668,7 @@ void TLSCallbacks::DoRead(uv_stream_t* handle, HandleScope handle_scope(env()->isolate()); Context::Scope context_scope(env()->context()); - Local arg = Integer::New(env()->isolate(), nread); - wrap()->MakeCallback(env()->onread_string(), 1, &arg); + OnRead(nread, nullptr, UV_UNKNOWN_HANDLE); return; } @@ -608,19 +692,19 @@ void TLSCallbacks::DoRead(uv_stream_t* handle, } -int TLSCallbacks::DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb) { +int TLSWrap::DoShutdown(ShutdownWrap* req_wrap) { if (SSL_shutdown(ssl_) == 0) SSL_shutdown(ssl_); shutdown_ = true; EncOut(); - return StreamWrapCallbacks::DoShutdown(req_wrap, cb); + return stream_->DoShutdown(req_wrap); } -void TLSCallbacks::SetVerifyMode(const FunctionCallbackInfo& args) { +void TLSWrap::SetVerifyMode(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); - TLSCallbacks* wrap = Unwrap(args.Holder()); + TLSWrap* wrap = Unwrap(args.Holder()); if (args.Length() < 2 || !args[0]->IsBoolean() || !args[1]->IsBoolean()) return env->ThrowTypeError("Bad arguments, expected two booleans"); @@ -647,34 +731,34 @@ void TLSCallbacks::SetVerifyMode(const FunctionCallbackInfo& args) { } -void TLSCallbacks::EnableSessionCallbacks( +void TLSWrap::EnableSessionCallbacks( const FunctionCallbackInfo& args) { - TLSCallbacks* wrap = Unwrap(args.Holder()); + TLSWrap* wrap = Unwrap(args.Holder()); wrap->enable_session_callbacks(); EnableHelloParser(args); } -void TLSCallbacks::EnableHelloParser(const FunctionCallbackInfo& args) { - TLSCallbacks* wrap = Unwrap(args.Holder()); +void TLSWrap::EnableHelloParser(const FunctionCallbackInfo& args) { + TLSWrap* wrap = Unwrap(args.Holder()); NodeBIO::FromBIO(wrap->enc_in_)->set_initial(kMaxHelloLength); - wrap->hello_parser_.Start(SSLWrap::OnClientHello, + wrap->hello_parser_.Start(SSLWrap::OnClientHello, OnClientHelloParseEnd, wrap); } -void TLSCallbacks::OnClientHelloParseEnd(void* arg) { - TLSCallbacks* c = static_cast(arg); +void TLSWrap::OnClientHelloParseEnd(void* arg) { + TLSWrap* c = static_cast(arg); c->Cycle(); } #ifdef SSL_CTRL_SET_TLSEXT_SERVERNAME_CB -void TLSCallbacks::GetServername(const FunctionCallbackInfo& args) { +void TLSWrap::GetServername(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); - TLSCallbacks* wrap = Unwrap(args.Holder()); + TLSWrap* wrap = Unwrap(args.Holder()); const char* servername = SSL_get_servername(wrap->ssl_, TLSEXT_NAMETYPE_host_name); @@ -686,10 +770,10 @@ void TLSCallbacks::GetServername(const FunctionCallbackInfo& args) { } -void TLSCallbacks::SetServername(const FunctionCallbackInfo& args) { +void TLSWrap::SetServername(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); - TLSCallbacks* wrap = Unwrap(args.Holder()); + TLSWrap* wrap = Unwrap(args.Holder()); if (args.Length() < 1 || !args[0]->IsString()) return env->ThrowTypeError("First argument should be a string"); @@ -707,8 +791,8 @@ void TLSCallbacks::SetServername(const FunctionCallbackInfo& args) { } -int TLSCallbacks::SelectSNIContextCallback(SSL* s, int* ad, void* arg) { - TLSCallbacks* p = static_cast(SSL_get_app_data(s)); +int TLSWrap::SelectSNIContextCallback(SSL* s, int* ad, void* arg) { + TLSWrap* p = static_cast(SSL_get_app_data(s)); Environment* env = p->env(); const char* servername = SSL_get_servername(s, TLSEXT_NAMETYPE_host_name); @@ -744,12 +828,12 @@ int TLSCallbacks::SelectSNIContextCallback(SSL* s, int* ad, void* arg) { #endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB -void TLSCallbacks::Initialize(Handle target, +void TLSWrap::Initialize(Handle target, Handle unused, Handle context) { Environment* env = Environment::GetCurrent(context); - env->SetMethod(target, "wrap", TLSCallbacks::Wrap); + env->SetMethod(target, "wrap", TLSWrap::Wrap); Local t = FunctionTemplate::New(env->isolate()); t->InstanceTemplate()->SetInternalFieldCount(1); @@ -761,16 +845,18 @@ void TLSCallbacks::Initialize(Handle target, env->SetProtoMethod(t, "enableSessionCallbacks", EnableSessionCallbacks); env->SetProtoMethod(t, "enableHelloParser", EnableHelloParser); - SSLWrap::AddMethods(env, t); + StreamBase::AddMethods(env, t); + SSLWrap::AddMethods(env, t); #ifdef SSL_CTRL_SET_TLSEXT_SERVERNAME_CB env->SetProtoMethod(t, "getServername", GetServername); env->SetProtoMethod(t, "setServername", SetServername); #endif // SSL_CRT_SET_TLSEXT_SERVERNAME_CB + env->set_tls_wrap_constructor_template(t); env->set_tls_wrap_constructor_function(t->GetFunction()); } } // namespace node -NODE_MODULE_CONTEXT_AWARE_BUILTIN(tls_wrap, node::TLSCallbacks::Initialize) +NODE_MODULE_CONTEXT_AWARE_BUILTIN(tls_wrap, node::TLSWrap::Initialize) diff --git a/src/tls_wrap.h b/src/tls_wrap.h index 3815878d586c15..42452055ced275 100644 --- a/src/tls_wrap.h +++ b/src/tls_wrap.h @@ -21,33 +21,33 @@ namespace crypto { class SecureContext; } -class TLSCallbacks : public crypto::SSLWrap, - public StreamWrapCallbacks, - public AsyncWrap { +class TLSWrap : public crypto::SSLWrap, + public StreamBase, + public AsyncWrap { public: - ~TLSCallbacks() override; + ~TLSWrap() override; static void Initialize(v8::Handle target, v8::Handle unused, v8::Handle context); - const char* Error() const override; - void ClearError() override; - int TryWrite(uv_buf_t** bufs, size_t* count) override; + void* Cast() override; + int GetFD() const override; + bool IsAlive() const override; + bool IsClosing() const override; + + // JavaScript functions + int ReadStart() override; + int ReadStop() override; + + int DoShutdown(ShutdownWrap* req_wrap) override; + int DoTryWrite(uv_buf_t** bufs, size_t* count) override; int DoWrite(WriteWrap* w, uv_buf_t* bufs, size_t count, - uv_stream_t* send_handle, - uv_write_cb cb) override; - void AfterWrite(WriteWrap* w) override; - void DoAlloc(uv_handle_t* handle, - size_t suggested_size, - uv_buf_t* buf) override; - void DoRead(uv_stream_t* handle, - ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending) override; - int DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb) override; + uv_stream_t* send_handle) override; + const char* Error() const override; + void ClearError() override; void NewSessionDoneCb(); @@ -66,27 +66,26 @@ class TLSCallbacks : public crypto::SSLWrap, // Write callback queue's item class WriteItem { public: - WriteItem(WriteWrap* w, uv_write_cb cb) : w_(w), cb_(cb) { + explicit WriteItem(WriteWrap* w) : w_(w) { } ~WriteItem() { w_ = nullptr; - cb_ = nullptr; } WriteWrap* w_; - uv_write_cb cb_; ListNode member_; }; - TLSCallbacks(Environment* env, - Kind kind, - v8::Handle sc, - StreamWrapCallbacks* old); + TLSWrap(Environment* env, + Kind kind, + StreamBase* steram, + v8::Handle stream_obj, + v8::Handle sc); static void SSLInfoCallback(const SSL* ssl_, int where, int ret); void InitSSL(); void EncOut(); - static void EncOutCb(uv_write_t* req, int status); + static void EncOutCb(WriteWrap* req_wrap, int status); bool ClearIn(); void ClearOut(); void MakePending(); @@ -104,6 +103,25 @@ class TLSCallbacks : public crypto::SSLWrap, } } + AsyncWrap* GetAsyncWrap() override; + bool IsIPCPipe() const override; + + // Resource implementation + static void OnAfterWriteImpl(WriteWrap* w, void* ctx); + static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx); + static void OnReadImpl(ssize_t nread, + const uv_buf_t* buf, + uv_handle_type pending, + void* ctx); + static void OnAfterWriteSelf(WriteWrap* w, void* ctx); + static void OnAllocSelf(size_t size, uv_buf_t* buf, void* ctx); + static void OnReadSelf(ssize_t nread, + const uv_buf_t* buf, + uv_handle_type pending, + void* ctx); + + void DoRead(ssize_t nread, const uv_buf_t* buf, uv_handle_type pending); + // If |msg| is not nullptr, caller is responsible for calling `delete[] *msg`. v8::Local GetSSLError(int status, int* err, const char** msg); @@ -125,10 +143,11 @@ class TLSCallbacks : public crypto::SSLWrap, crypto::SecureContext* sc_; v8::Persistent sc_handle_; + StreamBase* stream_; + v8::Persistent stream_handle_; BIO* enc_in_; BIO* enc_out_; NodeBIO* clear_in_; - uv_write_t write_req_; size_t write_size_; size_t write_queue_size_; typedef ListHead WriteItemList; diff --git a/src/tty_wrap.cc b/src/tty_wrap.cc index 08c50d911f7482..186f2f0100162e 100644 --- a/src/tty_wrap.cc +++ b/src/tty_wrap.cc @@ -36,26 +36,10 @@ void TTYWrap::Initialize(Handle target, t->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "TTY")); t->InstanceTemplate()->SetInternalFieldCount(1); - enum PropertyAttribute attributes = - static_cast(v8::ReadOnly | v8::DontDelete); - t->InstanceTemplate()->SetAccessor(env->fd_string(), - StreamWrap::GetFD, - nullptr, - Handle(), - v8::DEFAULT, - attributes); - env->SetProtoMethod(t, "close", HandleWrap::Close); env->SetProtoMethod(t, "unref", HandleWrap::Unref); - env->SetProtoMethod(t, "readStart", StreamWrap::ReadStart); - env->SetProtoMethod(t, "readStop", StreamWrap::ReadStop); - - env->SetProtoMethod(t, "writeBuffer", StreamWrap::WriteBuffer); - env->SetProtoMethod(t, "writeAsciiString", StreamWrap::WriteAsciiString); - env->SetProtoMethod(t, "writeUtf8String", StreamWrap::WriteUtf8String); - env->SetProtoMethod(t, "writeUcs2String", StreamWrap::WriteUcs2String); - env->SetProtoMethod(t, "writeBinaryString", StreamWrap::WriteBinaryString); + StreamWrap::AddMethods(env, t); env->SetProtoMethod(t, "getWindowSize", TTYWrap::GetWindowSize); env->SetProtoMethod(t, "setRawMode", SetRawMode); diff --git a/test/parallel/test-tls-client-default-ciphers.js b/test/parallel/test-tls-client-default-ciphers.js index 1eb74e6981fd69..dfae4a7bb9a9e5 100644 --- a/test/parallel/test-tls-client-default-ciphers.js +++ b/test/parallel/test-tls-client-default-ciphers.js @@ -2,13 +2,21 @@ var assert = require('assert'); var common = require('../common'); var tls = require('tls'); +function Done() {} + function test1() { var ciphers = ''; + tls.createSecureContext = function(options) { - ciphers = options.ciphers + ciphers = options.ciphers; + throw new Done(); + } + + try { + var s = tls.connect(common.PORT); + } catch (e) { + assert(e instanceof Done); } - var s = tls.connect(common.PORT); - s.destroy(); assert.equal(ciphers, tls.DEFAULT_CIPHERS); } test1(); diff --git a/test/parallel/test-tls-close-notify.js b/test/parallel/test-tls-close-notify.js index 54f7314e2f7da0..c5decad5e51fb8 100644 --- a/test/parallel/test-tls-close-notify.js +++ b/test/parallel/test-tls-close-notify.js @@ -17,8 +17,8 @@ var server = tls.createServer({ cert: fs.readFileSync(common.fixturesDir + '/keys/agent1-cert.pem') }, function(c) { // Send close-notify without shutting down TCP socket - if (c.ssl.shutdown() !== 1) - c.ssl.shutdown(); + if (c._handle.shutdownSSL() !== 1) + c._handle.shutdownSSL(); }).listen(common.PORT, function() { var c = tls.connect(common.PORT, { rejectUnauthorized: false diff --git a/test/parallel/test-tls-multi-key.js b/test/parallel/test-tls-multi-key.js index cdf85008745e2e..657d9084d44d78 100644 --- a/test/parallel/test-tls-multi-key.js +++ b/test/parallel/test-tls-multi-key.js @@ -28,15 +28,14 @@ var server = tls.createServer(options, function(conn) { ciphers: 'ECDHE-ECDSA-AES256-GCM-SHA384', rejectUnauthorized: false }, function() { + ciphers.push(ecdsa.getCipher()); var rsa = tls.connect(common.PORT, { ciphers: 'ECDHE-RSA-AES256-GCM-SHA384', rejectUnauthorized: false }, function() { + ciphers.push(rsa.getCipher()); ecdsa.destroy(); rsa.destroy(); - - ciphers.push(ecdsa.getCipher()); - ciphers.push(rsa.getCipher()); server.close(); }); });