diff --git a/base/stream.jl b/base/stream.jl index 01d5c7c15c3a2..74147f2489605 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -50,8 +50,8 @@ end # A dict of all libuv handles that are being waited on somewhere in the system # and should thus not be garbage collected const uvhandles = ObjectIdDict() -preserve_handle(x) = uvhandles[x] = get(uvhandles,x,0)+1 -unpreserve_handle(x) = (v = uvhandles[x]; v == 1 ? pop!(uvhandles,x) : (uvhandles[x] = v-1); nothing) +preserve_handle(x) = uvhandles[x] = get(uvhandles,x,0)::Int+1 +unpreserve_handle(x) = (v = uvhandles[x]::Int; v == 1 ? pop!(uvhandles,x) : (uvhandles[x] = v-1); nothing) function stream_wait(x, c...) # for x::LibuvObject preserve_handle(x) @@ -87,8 +87,13 @@ end nb_available(s::LibuvStream) = nb_available(s.buffer) function eof(s::LibuvStream) + if isopen(s) # fast path + nb_available(s) > 0 && return true + else + return nb_available(s) <= 0 + end wait_readnb(s,1) - !isopen(s) && nb_available(s)<=0 + !isopen(s) && nb_available(s) <= 0 end const DEFAULT_READ_BUFFER_SZ = 10485760 # 10 MB @@ -330,6 +335,11 @@ function wait_connected(x::Union{LibuvStream, LibuvServer}) end function wait_readbyte(x::LibuvStream, c::UInt8) + if isopen(x) # fast path + search(x.buffer, c) > 0 && return + else + return + end preserve_handle(x) try while isopen(x) && search(x.buffer, c) <= 0 @@ -342,9 +352,15 @@ function wait_readbyte(x::LibuvStream, c::UInt8) end unpreserve_handle(x) end + nothing end function wait_readnb(x::LibuvStream, nb::Int) + if isopen(x) # fast path + nb_available(x.buffer) >= nb && return + else + return + end oldthrottle = x.throttle preserve_handle(x) try @@ -362,12 +378,14 @@ function wait_readnb(x::LibuvStream, nb::Int) end unpreserve_handle(x) end + nothing end function wait_close(x::Union{LibuvStream, LibuvServer}) if isopen(x) stream_wait(x, x.closenotify) end + nothing end function close(stream::Union{LibuvStream, LibuvServer}) @@ -876,6 +894,14 @@ function stop_reading(stream::LibuvStream) end end +function readbytes!(s::LibuvStream, b::AbstractArray{UInt8}, nb=length(b)) + wait_readnb(s, nb) + nr = nb_available(s) + resize!(b, nr) # shrink to just contain input data if was resized + read!(s.buffer, b) + return nr +end + function readbytes(stream::LibuvStream) wait_readnb(stream, typemax(Int)) return takebuf_array(stream.buffer)