Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client): add Resolve, used by HttpConnector #1674

Merged
merged 1 commit into from
Oct 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
feat(client): add Resolve, used by HttpConnector
This introduces a `Resolve` trait to describe asynchronous DNS
resolution. The `HttpConnector` can be configured with a resolver,
allowing a user to still use all the functionality of the
`HttpConnector`, while customizing the DNS resolution.

To prevent a breaking change, the `HttpConnector` has its `Resolve`
generic set by default to `GaiResolver`. This is same as the existing
resolver, which uses `getaddrinfo` inside a thread pool.

Closes #1517
  • Loading branch information
seanmonstar committed Oct 17, 2018
commit 70782c56351e4d4a81f9be04c3e23e988b3ec3ca
196 changes: 179 additions & 17 deletions src/client/connect/dns.rs
Original file line number Diff line number Diff line change
@@ -1,45 +1,183 @@
use std::io;
use std::{fmt, io, vec};
use std::net::{
Ipv4Addr, Ipv6Addr,
IpAddr, Ipv4Addr, Ipv6Addr,
SocketAddr, ToSocketAddrs,
SocketAddrV4, SocketAddrV6,
};
use std::vec;
use std::sync::Arc;

use ::futures::{Async, Future, Poll};
use futures::{Async, Future, Poll};
use futures::future::{Executor, ExecuteError};
use futures::sync::oneshot;
use futures_cpupool::{Builder as CpuPoolBuilder};

pub struct Work {
use self::sealed::GaiTask;

/// Resolve a hostname to a set of IP addresses.
pub trait Resolve {
/// The set of IP addresses to try to connect to.
type Addrs: Iterator<Item=IpAddr>;
/// A Future of the resolved set of addresses.
type Future: Future<Item=Self::Addrs, Error=io::Error>;
/// Resolve a hostname.
fn resolve(&self, name: Name) -> Self::Future;
}

/// A domain name to resolve into IP addresses.
pub struct Name {
host: String,
}

/// A resolver using blocking `getaddrinfo` calls in a threadpool.
#[derive(Clone)]
pub struct GaiResolver {
executor: GaiExecutor,
}

pub struct GaiAddrs {
inner: IpAddrs,
}

pub struct GaiFuture {
rx: oneshot::SpawnHandle<IpAddrs, io::Error>,
}

impl Name {
pub(super) fn new(host: String) -> Name {
Name {
host,
}
}

/// View the hostname as a string slice.
pub fn as_str(&self) -> &str {
&self.host
}
}

impl fmt::Debug for Name {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.host, f)
}
}

impl GaiResolver {
/// Construct a new `GaiResolver`.
///
/// Takes number of DNS worker threads.
pub fn new(threads: usize) -> Self {
let pool = CpuPoolBuilder::new()
.name_prefix("hyper-dns")
.pool_size(threads)
.create();
GaiResolver::new_with_executor(pool)
}

/// Construct a new `GaiResolver` with a shared thread pool executor.
///
/// Takes an executor to run blocking `getaddrinfo` tasks on.
pub fn new_with_executor<E: 'static>(executor: E) -> Self
where
E: Executor<GaiTask> + Send + Sync,
{
GaiResolver {
executor: GaiExecutor(Arc::new(executor)),
}
}
}

impl Resolve for GaiResolver {
type Addrs = GaiAddrs;
type Future = GaiFuture;

fn resolve(&self, name: Name) -> Self::Future {
let blocking = GaiBlocking::new(name.host);
let rx = oneshot::spawn(blocking, &self.executor);
GaiFuture {
rx,
}
}
}

impl fmt::Debug for GaiResolver {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("GaiResolver")
}
}

impl Future for GaiFuture {
type Item = GaiAddrs;
type Error = io::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let addrs = try_ready!(self.rx.poll());
Ok(Async::Ready(GaiAddrs {
inner: addrs,
}))
}
}

impl fmt::Debug for GaiFuture {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("GaiFuture")
}
}

impl Iterator for GaiAddrs {
type Item = IpAddr;

fn next(&mut self) -> Option<Self::Item> {
self.inner.next().map(|sa| sa.ip())
}
}

impl fmt::Debug for GaiAddrs {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("GaiAddrs")
}
}

#[derive(Clone)]
struct GaiExecutor(Arc<Executor<GaiTask> + Send + Sync>);

impl Executor<oneshot::Execute<GaiBlocking>> for GaiExecutor {
fn execute(&self, future: oneshot::Execute<GaiBlocking>) -> Result<(), ExecuteError<oneshot::Execute<GaiBlocking>>> {
self.0.execute(GaiTask { work: future })
.map_err(|err| ExecuteError::new(err.kind(), err.into_future().work))
}
}

pub(super) struct GaiBlocking {
host: String,
port: u16
}

impl Work {
pub fn new(host: String, port: u16) -> Work {
Work { host: host, port: port }
impl GaiBlocking {
pub(super) fn new(host: String) -> GaiBlocking {
GaiBlocking { host }
}
}

impl Future for Work {
impl Future for GaiBlocking {
type Item = IpAddrs;
type Error = io::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
debug!("resolving host={:?}, port={:?}", self.host, self.port);
(&*self.host, self.port).to_socket_addrs()
debug!("resolving host={:?}", self.host);
(&*self.host, 0).to_socket_addrs()
.map(|i| Async::Ready(IpAddrs { iter: i }))
}
}

pub struct IpAddrs {
pub(super) struct IpAddrs {
iter: vec::IntoIter<SocketAddr>,
}

impl IpAddrs {
pub fn new(addrs: Vec<SocketAddr>) -> Self {
pub(super) fn new(addrs: Vec<SocketAddr>) -> Self {
IpAddrs { iter: addrs.into_iter() }
}

pub fn try_parse(host: &str, port: u16) -> Option<IpAddrs> {
pub(super) fn try_parse(host: &str, port: u16) -> Option<IpAddrs> {
if let Ok(addr) = host.parse::<Ipv4Addr>() {
let addr = SocketAddrV4::new(addr, port);
return Some(IpAddrs { iter: vec![SocketAddr::V4(addr)].into_iter() })
Expand All @@ -51,7 +189,7 @@ impl IpAddrs {
None
}

pub fn split_by_preference(self) -> (IpAddrs, IpAddrs) {
pub(super) fn split_by_preference(self) -> (IpAddrs, IpAddrs) {
let preferring_v6 = self.iter
.as_slice()
.first()
Expand All @@ -64,7 +202,7 @@ impl IpAddrs {
(IpAddrs::new(preferred), IpAddrs::new(fallback))
}

pub fn is_empty(&self) -> bool {
pub(super) fn is_empty(&self) -> bool {
self.iter.as_slice().is_empty()
}
}
Expand All @@ -77,6 +215,30 @@ impl Iterator for IpAddrs {
}
}

// Make this Future unnameable outside of this crate.
pub(super) mod sealed {
use super::*;
// Blocking task to be executed on a thread pool.
pub struct GaiTask {
pub(super) work: oneshot::Execute<GaiBlocking>
}

impl fmt::Debug for GaiTask {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("GaiTask")
}
}

impl Future for GaiTask {
type Item = ();
type Error = ();

fn poll(&mut self) -> Poll<(), ()> {
self.work.poll()
}
}
}

#[cfg(test)]
mod tests {
use std::net::{Ipv4Addr, Ipv6Addr};
Expand Down
Loading