Skip to content

Commit

Permalink
Merge pull request #14667 from JuliaLang/jn/fast-io
Browse files Browse the repository at this point in the history
add fast-path optimizations for reading LibuvStreams
  • Loading branch information
vtjnash committed Jan 14, 2016
2 parents 9d14d39 + 9dbc3ab commit 918f14e
Showing 1 changed file with 29 additions and 3 deletions.
32 changes: 29 additions & 3 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ end
# A dict of all libuv handles that are being waited on somewhere in the system
# and should thus not be garbage collected
const uvhandles = ObjectIdDict()
preserve_handle(x) = uvhandles[x] = get(uvhandles,x,0)+1
unpreserve_handle(x) = (v = uvhandles[x]; v == 1 ? pop!(uvhandles,x) : (uvhandles[x] = v-1); nothing)
preserve_handle(x) = uvhandles[x] = get(uvhandles,x,0)::Int+1
unpreserve_handle(x) = (v = uvhandles[x]::Int; v == 1 ? pop!(uvhandles,x) : (uvhandles[x] = v-1); nothing)

function stream_wait(x, c...) # for x::LibuvObject
preserve_handle(x)
Expand Down Expand Up @@ -87,8 +87,13 @@ end
nb_available(s::LibuvStream) = nb_available(s.buffer)

function eof(s::LibuvStream)
if isopen(s) # fast path
nb_available(s) > 0 && return true
else
return nb_available(s) <= 0
end
wait_readnb(s,1)
!isopen(s) && nb_available(s)<=0
!isopen(s) && nb_available(s) <= 0
end

const DEFAULT_READ_BUFFER_SZ = 10485760 # 10 MB
Expand Down Expand Up @@ -330,6 +335,11 @@ function wait_connected(x::Union{LibuvStream, LibuvServer})
end

function wait_readbyte(x::LibuvStream, c::UInt8)
if isopen(x) # fast path
search(x.buffer, c) > 0 && return
else
return
end
preserve_handle(x)
try
while isopen(x) && search(x.buffer, c) <= 0
Expand All @@ -342,9 +352,15 @@ function wait_readbyte(x::LibuvStream, c::UInt8)
end
unpreserve_handle(x)
end
nothing
end

function wait_readnb(x::LibuvStream, nb::Int)
if isopen(x) # fast path
nb_available(x.buffer) >= nb && return
else
return
end
oldthrottle = x.throttle
preserve_handle(x)
try
Expand All @@ -362,12 +378,14 @@ function wait_readnb(x::LibuvStream, nb::Int)
end
unpreserve_handle(x)
end
nothing
end

function wait_close(x::Union{LibuvStream, LibuvServer})
if isopen(x)
stream_wait(x, x.closenotify)
end
nothing
end

function close(stream::Union{LibuvStream, LibuvServer})
Expand Down Expand Up @@ -876,6 +894,14 @@ function stop_reading(stream::LibuvStream)
end
end

function readbytes!(s::LibuvStream, b::AbstractArray{UInt8}, nb=length(b))
wait_readnb(s, nb)
nr = nb_available(s)
resize!(b, nr) # shrink to just contain input data if was resized
read!(s.buffer, b)
return nr
end

function readbytes(stream::LibuvStream)
wait_readnb(stream, typemax(Int))
return takebuf_array(stream.buffer)
Expand Down

0 comments on commit 918f14e

Please sign in to comment.