Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add fast-path optimizations for reading LibuvStreams #14667

Merged
merged 1 commit into from
Jan 14, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ end
# A dict of all libuv handles that are being waited on somewhere in the system
# and should thus not be garbage collected
const uvhandles = ObjectIdDict()
preserve_handle(x) = uvhandles[x] = get(uvhandles,x,0)+1
unpreserve_handle(x) = (v = uvhandles[x]; v == 1 ? pop!(uvhandles,x) : (uvhandles[x] = v-1); nothing)
preserve_handle(x) = uvhandles[x] = get(uvhandles,x,0)::Int+1
unpreserve_handle(x) = (v = uvhandles[x]::Int; v == 1 ? pop!(uvhandles,x) : (uvhandles[x] = v-1); nothing)

function stream_wait(x, c...) # for x::LibuvObject
preserve_handle(x)
Expand Down Expand Up @@ -87,8 +87,13 @@ end
nb_available(s::LibuvStream) = nb_available(s.buffer)

function eof(s::LibuvStream)
if isopen(s) # fast path
nb_available(s) > 0 && return true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this return false instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do add a test when you fix this

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in a0b5414

else
return nb_available(s) <= 0
end
wait_readnb(s,1)
!isopen(s) && nb_available(s)<=0
!isopen(s) && nb_available(s) <= 0
end

const DEFAULT_READ_BUFFER_SZ = 10485760 # 10 MB
Expand Down Expand Up @@ -330,6 +335,11 @@ function wait_connected(x::Union{LibuvStream, LibuvServer})
end

function wait_readbyte(x::LibuvStream, c::UInt8)
if isopen(x) # fast path
search(x.buffer, c) > 0 && return
else
return
end
preserve_handle(x)
try
while isopen(x) && search(x.buffer, c) <= 0
Expand All @@ -342,9 +352,15 @@ function wait_readbyte(x::LibuvStream, c::UInt8)
end
unpreserve_handle(x)
end
nothing
end

function wait_readnb(x::LibuvStream, nb::Int)
if isopen(x) # fast path
nb_available(x.buffer) >= nb && return
else
return
end
oldthrottle = x.throttle
preserve_handle(x)
try
Expand All @@ -362,12 +378,14 @@ function wait_readnb(x::LibuvStream, nb::Int)
end
unpreserve_handle(x)
end
nothing
end

function wait_close(x::Union{LibuvStream, LibuvServer})
if isopen(x)
stream_wait(x, x.closenotify)
end
nothing
end

function close(stream::Union{LibuvStream, LibuvServer})
Expand Down Expand Up @@ -876,6 +894,14 @@ function stop_reading(stream::LibuvStream)
end
end

function readbytes!(s::LibuvStream, b::AbstractArray{UInt8}, nb=length(b))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me this should share code with read!; I recall a significant amount of work went into that. Not clear we even need both read and readbytes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think they differ a bit on what happens with the stream is closed before the array is fully populated (not that i think we shouldn't change that)

wait_readnb(s, nb)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line breaks MbedTLS and everything that depends on it. I think wait_readnb should probably have a wider signature, Integer instead of Int?

function wait_readnb(x::LibuvStream, nb::Int)

(also in need of tests)

nr = nb_available(s)
resize!(b, nr) # shrink to just contain input data if was resized
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And this line won't work if you try to give it an array that shares data, cannot resize array with shared data - MbedTLS.jl does this via pointer_to_array with an array that is managed by the C library.

read!(s.buffer, b)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this won't work for UDPSocket or anything else without a buffer field

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it also doesn't support read or write (it uses send and recv), so i'm not too worried about that. we could extend the type-hierarchy to exclude it (because it isn't a stream) explicitly, but to what end?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a buggy type because of this field assumption. I forget the issue number, but sounds like this abstract type isn't appropriate for it

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why something should be an IO type if it doesn't have read and write.

return nr
end

function readbytes(stream::LibuvStream)
wait_readnb(stream, typemax(Int))
return takebuf_array(stream.buffer)
Expand Down