-
-
Notifications
You must be signed in to change notification settings - Fork 5.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add fast-path optimizations for reading LibuvStreams #14667
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -50,8 +50,8 @@ end | |||
# A dict of all libuv handles that are being waited on somewhere in the system | ||||
# and should thus not be garbage collected | ||||
const uvhandles = ObjectIdDict() | ||||
preserve_handle(x) = uvhandles[x] = get(uvhandles,x,0)+1 | ||||
unpreserve_handle(x) = (v = uvhandles[x]; v == 1 ? pop!(uvhandles,x) : (uvhandles[x] = v-1); nothing) | ||||
preserve_handle(x) = uvhandles[x] = get(uvhandles,x,0)::Int+1 | ||||
unpreserve_handle(x) = (v = uvhandles[x]::Int; v == 1 ? pop!(uvhandles,x) : (uvhandles[x] = v-1); nothing) | ||||
|
||||
function stream_wait(x, c...) # for x::LibuvObject | ||||
preserve_handle(x) | ||||
|
@@ -87,8 +87,13 @@ end | |||
nb_available(s::LibuvStream) = nb_available(s.buffer) | ||||
|
||||
function eof(s::LibuvStream) | ||||
if isopen(s) # fast path | ||||
nb_available(s) > 0 && return true | ||||
else | ||||
return nb_available(s) <= 0 | ||||
end | ||||
wait_readnb(s,1) | ||||
!isopen(s) && nb_available(s)<=0 | ||||
!isopen(s) && nb_available(s) <= 0 | ||||
end | ||||
|
||||
const DEFAULT_READ_BUFFER_SZ = 10485760 # 10 MB | ||||
|
@@ -330,6 +335,11 @@ function wait_connected(x::Union{LibuvStream, LibuvServer}) | |||
end | ||||
|
||||
function wait_readbyte(x::LibuvStream, c::UInt8) | ||||
if isopen(x) # fast path | ||||
search(x.buffer, c) > 0 && return | ||||
else | ||||
return | ||||
end | ||||
preserve_handle(x) | ||||
try | ||||
while isopen(x) && search(x.buffer, c) <= 0 | ||||
|
@@ -342,9 +352,15 @@ function wait_readbyte(x::LibuvStream, c::UInt8) | |||
end | ||||
unpreserve_handle(x) | ||||
end | ||||
nothing | ||||
end | ||||
|
||||
function wait_readnb(x::LibuvStream, nb::Int) | ||||
if isopen(x) # fast path | ||||
nb_available(x.buffer) >= nb && return | ||||
else | ||||
return | ||||
end | ||||
oldthrottle = x.throttle | ||||
preserve_handle(x) | ||||
try | ||||
|
@@ -362,12 +378,14 @@ function wait_readnb(x::LibuvStream, nb::Int) | |||
end | ||||
unpreserve_handle(x) | ||||
end | ||||
nothing | ||||
end | ||||
|
||||
function wait_close(x::Union{LibuvStream, LibuvServer}) | ||||
if isopen(x) | ||||
stream_wait(x, x.closenotify) | ||||
end | ||||
nothing | ||||
end | ||||
|
||||
function close(stream::Union{LibuvStream, LibuvServer}) | ||||
|
@@ -876,6 +894,14 @@ function stop_reading(stream::LibuvStream) | |||
end | ||||
end | ||||
|
||||
function readbytes!(s::LibuvStream, b::AbstractArray{UInt8}, nb=length(b)) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems to me this should share code with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think they differ a bit on what happens with the stream is closed before the array is fully populated (not that i think we shouldn't change that) |
||||
wait_readnb(s, nb) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This line breaks MbedTLS and everything that depends on it. I think Line 358 in 0a00109
(also in need of tests) |
||||
nr = nb_available(s) | ||||
resize!(b, nr) # shrink to just contain input data if was resized | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And this line won't work if you try to give it an array that shares data, |
||||
read!(s.buffer, b) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this won't work for UDPSocket or anything else without a buffer field There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it also doesn't support There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's a buggy type because of this field assumption. I forget the issue number, but sounds like this abstract type isn't appropriate for it There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure why something should be an |
||||
return nr | ||||
end | ||||
|
||||
function readbytes(stream::LibuvStream) | ||||
wait_readnb(stream, typemax(Int)) | ||||
return takebuf_array(stream.buffer) | ||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this return
false
instead?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do add a test when you fix this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done in a0b5414