
default to ahash

ritchie46 committed Jul 8, 2022
1 parent 1f2116e commit 211e62a
Showing 25 changed files with 69 additions and 71 deletions.
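The change is mechanical throughout: `std::collections::HashMap` and `HashSet` give way to `ahash::AHashMap` and `ahash::AHashSet`, which expose the same API but hash with aHash instead of the standard library's SipHash. A minimal sketch of the swap — not taken from this commit, and assuming ahash 0.7 with its default `std` feature:

```rust
use ahash::AHashMap;
use std::collections::HashMap;

fn main() {
    // std's HashMap, hashed with the DoS-resistant but slower SipHash
    let mut slow: HashMap<&str, i32> = HashMap::new();
    slow.insert("a", 1);

    // ahash's AHashMap: same insert/get/entry API, faster hashing
    let mut fast: AHashMap<&str, i32> = AHashMap::new();
    fast.insert("a", 1);

    // call sites are unchanged apart from the import
    assert_eq!(slow.get("a"), fast.get("a"));
}
```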
4 changes: 2 additions & 2 deletions Cargo.toml
@@ -64,7 +64,7 @@ base64 = { version = "0.13.0", optional = true }
 futures = { version = "0.3", optional = true }

 # for faster hashing
-ahash = { version = "0.7", optional = true }
+ahash = { version = "0.7" }

 # parquet support
 parquet2 = { version = "0.14.0", optional = true, default_features = false }
@@ -175,7 +175,7 @@ compute_comparison = ["compute_take", "compute_boolean"]
 compute_concatenate = []
 compute_contains = []
 compute_filter = []
-compute_hash = ["multiversion", "ahash"]
+compute_hash = ["multiversion"]
 compute_if_then_else = []
 compute_length = []
 compute_like = ["regex"]
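With `optional = true` dropped, ahash compiles unconditionally, so the `compute_hash` feature only needs `multiversion`. The mechanism behind the map types is `ahash::RandomState`, a `BuildHasher` that can also be plugged into a plain std `HashMap`; a small sketch under the same ahash 0.7 assumption:

```rust
use ahash::RandomState;
use std::collections::HashMap;

fn main() {
    // A std HashMap driven by aHash via its BuildHasher parameter;
    // AHashMap is essentially this with the hasher baked in.
    let mut map: HashMap<u32, &str, RandomState> = HashMap::default();
    map.insert(1, "one");
    assert_eq!(map.get(&1), Some(&"one"));
}
```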
5 changes: 3 additions & 2 deletions arrow-parquet-integration-testing/src/main.rs
@@ -1,9 +1,10 @@
 use std::fs::File;
-use std::{collections::HashMap, io::Read};
+use std::{io::Read};

 use arrow2::array::Array;
 use arrow2::io::ipc::IpcField;
 use arrow2::{
+    AHashMap,
     chunk::Chunk,
     datatypes::{DataType, Schema},
     error::Result,
@@ -40,7 +41,7 @@ pub fn read_gzip_json(
     let (schema, ipc_fields) = read::deserialize_schema(&schema)?;

     // read dictionaries
-    let mut dictionaries = HashMap::new();
+    let mut dictionaries = AHashMap::new();
     if let Some(dicts) = arrow_json.dictionaries {
         for json_dict in dicts {
             // TODO: convert to a concrete Arrow type
6 changes: 3 additions & 3 deletions integration-testing/src/lib.rs
@@ -22,11 +22,11 @@ use arrow2::io::ipc::IpcField;
 use serde_json::Value;

 use arrow2::chunk::Chunk;
+use arrow2::AHashMap;
 use arrow2::datatypes::*;
 use arrow2::error::Result;
 use arrow2::io::json_integration::{read, ArrowJsonBatch, ArrowJsonDictionaryBatch};

-use std::collections::HashMap;
 use std::fs::File;
 use std::io::BufReader;

@@ -43,7 +43,7 @@ pub struct ArrowFile {
     pub fields: Vec<IpcField>,
     // we can evolve this into a concrete Arrow type
     // this is temporarily not being read from
-    pub _dictionaries: HashMap<i64, ArrowJsonDictionaryBatch>,
+    pub _dictionaries: AHashMap<i64, ArrowJsonDictionaryBatch>,
     pub chunks: Vec<Chunk<Box<dyn Array>>>,
 }

@@ -54,7 +54,7 @@ pub fn read_json_file(json_name: &str) -> Result<ArrowFile> {

     let (schema, fields) = read::deserialize_schema(&arrow_json["schema"])?;
     // read dictionaries
-    let mut dictionaries = HashMap::new();
+    let mut dictionaries = AHashMap::new();
     if let Some(dicts) = arrow_json.get("dictionaries") {
         for d in dicts
             .as_array()
4 changes: 2 additions & 2 deletions src/array/union/mod.rs
@@ -1,4 +1,4 @@
-use std::collections::HashMap;
+use ahash::AHashMap;

 use crate::{
     bitmap::Bitmap,
@@ -31,7 +31,7 @@ type UnionComponents<'a> = (&'a [Field], Option<&'a [i32]>, UnionMode);
 pub struct UnionArray {
     types: Buffer<i8>,
     // None represents when there is no typeid
-    fields_hash: Option<HashMap<i8, FieldEntry>>,
+    fields_hash: Option<AHashMap<i8, FieldEntry>>,
     fields: Vec<Box<dyn Array>>,
     offsets: Option<Buffer<i32>>,
     data_type: DataType,
6 changes: 3 additions & 3 deletions src/compute/like.rs
@@ -1,6 +1,6 @@
 //! Contains "like" operators such as [`like_utf8`] and [`like_utf8_scalar`].
-use std::collections::HashMap;
+use ahash::AHashMap;
 use regex::bytes::Regex as BytesRegex;
 use regex::Regex;

@@ -35,7 +35,7 @@ fn a_like_utf8<O: Offset, F: Fn(bool) -> bool>(

     let validity = combine_validities(lhs.validity(), rhs.validity());

-    let mut map = HashMap::new();
+    let mut map = AHashMap::new();

     let values =
         Bitmap::try_from_trusted_len_iter(lhs.iter().zip(rhs.iter()).map(|(lhs, rhs)| {
@@ -179,7 +179,7 @@ fn a_like_binary<O: Offset, F: Fn(bool) -> bool>(

     let validity = combine_validities(lhs.validity(), rhs.validity());

-    let mut map = HashMap::new();
+    let mut map = AHashMap::new();

     let values =
         Bitmap::try_from_trusted_len_iter(lhs.iter().zip(rhs.iter()).map(|(lhs, rhs)| {
5 changes: 3 additions & 2 deletions src/compute/merge_sort/mod.rs
@@ -56,8 +56,9 @@
 //! To serialize slices, e.g. for checkpointing or transfer via Arrow's IPC, you can store
 //! them as 3 non-null primitive arrays (e.g. `PrimitiveArray<i64>`).
+use ahash::AHashMap;
+use std::cmp::Ordering;
 use std::iter::once;
-use std::{cmp::Ordering, collections::HashMap};

 use itertools::Itertools;

@@ -498,7 +499,7 @@ pub fn build_comparator_impl<'a>(
                 .collect::<Result<Vec<_>>>()?;
             Ok(((lhs_index, rhs_index), multi_column_comparator))
         })
-        .collect::<Result<HashMap<(usize, usize), Vec<(IsValid, IsValid, DynComparator)>>>>()?;
+        .collect::<Result<AHashMap<(usize, usize), Vec<(IsValid, IsValid, DynComparator)>>>>()?;

     // prepare a comparison function taking into account _nulls_ and sort options
     let cmp = move |left_index, left_row, right_index, right_row| {
5 changes: 2 additions & 3 deletions src/compute/regex_match.rs
@@ -1,7 +1,6 @@
 //! Contains regex matching operators [`regex_match`] and [`regex_match_scalar`].
-use std::collections::HashMap;
-
+use ahash::AHashMap;
 use regex::Regex;

 use super::utils::combine_validities;
@@ -18,7 +17,7 @@ pub fn regex_match<O: Offset>(values: &Utf8Array<O>, regex: &Utf8Array<O>) -> Re
         ));
     }

-    let mut map = HashMap::new();
+    let mut map = AHashMap::new();
     let validity = combine_validities(values.validity(), regex.validity());

     let iterator = values.iter().zip(regex.iter()).map(|(haystack, regex)| {
2 changes: 1 addition & 1 deletion src/io/avro/mod.rs
@@ -48,7 +48,7 @@ macro_rules! avro_decode {

 macro_rules! read_header {
     ($reader:ident $($_await:tt)*) => {{
-        let mut items = HashMap::new();
+        let mut items = ahash::AHashMap::new();

         loop {
             let len = zigzag_i64($reader)$($_await)*? as usize;
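Note the fully qualified `ahash::AHashMap::new()` here: `read_header!` expands at its call sites, which cannot be assumed to have `AHashMap` imported. A sketch of the pattern with a hypothetical macro, not the crate's own:

```rust
// Hypothetical macro illustrating why the path is spelled out in full:
// the expansion site may not `use ahash::AHashMap;` itself.
macro_rules! new_byte_map {
    () => {
        ahash::AHashMap::<String, Vec<u8>>::new()
    };
}

fn main() {
    // No `use ahash::AHashMap;` needed at the call site.
    let mut items = new_byte_map!();
    items.insert("avro.codec".to_string(), b"null".to_vec());
    assert_eq!(items.len(), 1);
}
```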
5 changes: 2 additions & 3 deletions src/io/avro/read/header.rs
@@ -1,5 +1,4 @@
-use std::collections::HashMap;
-
+use ahash::AHashMap;
 use avro_schema::Schema;
 use serde_json;

@@ -9,7 +8,7 @@ use super::Compression;

 /// Deserializes the Avro header into an Avro [`Schema`] and optional [`Compression`].
 pub(crate) fn deserialize_header(
-    header: HashMap<String, Vec<u8>>,
+    header: AHashMap<String, Vec<u8>>,
 ) -> Result<(Schema, Option<Compression>)> {
     let schema = header
         .get("avro.schema")
4 changes: 2 additions & 2 deletions src/io/avro/read/util.rs
@@ -1,4 +1,4 @@
-use std::collections::HashMap;
+use ahash::AHashMap;
 use std::io::Read;

 use avro_schema::Schema;
@@ -29,7 +29,7 @@ fn _read_binary<R: Read>(reader: &mut R) -> Result<Vec<u8>> {
     Ok(buf)
 }

-fn read_header<R: Read>(reader: &mut R) -> Result<HashMap<String, Vec<u8>>> {
+fn read_header<R: Read>(reader: &mut R) -> Result<AHashMap<String, Vec<u8>>> {
     read_header!(reader)
 }

5 changes: 2 additions & 3 deletions src/io/avro/read_async/metadata.rs
@@ -1,6 +1,5 @@
 //! Async Avro
-use std::collections::HashMap;
-
+use ahash::AHashMap;
 use avro_schema::{Record, Schema as AvroSchema};
 use futures::AsyncRead;
 use futures::AsyncReadExt;
@@ -56,6 +55,6 @@ async fn _read_binary<R: AsyncRead + Unpin + Send>(reader: &mut R) -> Result<Vec

 async fn read_header<R: AsyncRead + Unpin + Send>(
     reader: &mut R,
-) -> Result<HashMap<String, Vec<u8>>> {
+) -> Result<AHashMap<String, Vec<u8>>> {
     read_header!(reader.await)
 }
7 changes: 3 additions & 4 deletions src/io/avro/write/header.rs
@@ -1,5 +1,4 @@
-use std::collections::HashMap;
-
+use ahash::AHashMap;
 use avro_schema::Schema;
 use serde_json;

@@ -11,10 +10,10 @@ use super::Compression;
 pub(crate) fn serialize_header(
     schema: &Schema,
     compression: Option<Compression>,
-) -> Result<HashMap<String, Vec<u8>>> {
+) -> Result<AHashMap<String, Vec<u8>>> {
     let schema = serde_json::to_string(schema).map_err(|e| Error::ExternalFormat(e.to_string()))?;

-    let mut header = HashMap::<String, Vec<u8>>::default();
+    let mut header = AHashMap::<String, Vec<u8>>::default();

     header.insert("avro.schema".to_string(), schema.into_bytes());
     if let Some(compression) = compression {
8 changes: 3 additions & 5 deletions src/io/csv/read/infer_schema.rs
@@ -1,7 +1,5 @@
-use std::{
-    collections::HashSet,
-    io::{Read, Seek},
-};
+use ahash::AHashSet;
+use std::io::{Read, Seek};

 use crate::datatypes::{DataType, Field};
 use crate::error::Result;
@@ -34,7 +32,7 @@ pub fn infer_schema<R: Read + Seek, F: Fn(&[u8]) -> DataType>(

     let header_length = headers.len();
     // keep track of inferred field types
-    let mut column_types: Vec<HashSet<DataType>> = vec![HashSet::new(); header_length];
+    let mut column_types: Vec<AHashSet<DataType>> = vec![AHashSet::new(); header_length];

     let mut records_count = 0;

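The set type follows the same pattern as the maps: `ahash::AHashSet` mirrors std's `HashSet`. A small sketch of the inference bookkeeping above, with hypothetical string stand-ins for `DataType`:

```rust
use ahash::AHashSet;

fn main() {
    // One set of candidate types per CSV column, as in infer_schema.
    let header_length = 2;
    let mut column_types: Vec<AHashSet<&str>> = vec![AHashSet::new(); header_length];

    // Each record inserts what its fields could parse as; duplicates collapse.
    column_types[0].insert("Int64");
    column_types[0].insert("Int64");
    column_types[1].insert("Utf8");

    assert_eq!(column_types[0].len(), 1);
}
```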
5 changes: 2 additions & 3 deletions src/io/csv/read_async/infer_schema.rs
@@ -1,6 +1,5 @@
-use std::collections::HashSet;
-
 use super::{AsyncReader, ByteRecord};
+use ahash::AHashSet;

 use crate::datatypes::{DataType, Field};
 use crate::error::Result;
@@ -41,7 +40,7 @@ where

     let header_length = headers.len();
     // keep track of inferred field types
-    let mut column_types: Vec<HashSet<DataType>> = vec![HashSet::new(); header_length];
+    let mut column_types: Vec<AHashSet<DataType>> = vec![AHashSet::new(); header_length];

     let mut records_count = 0;

7 changes: 3 additions & 4 deletions src/io/csv/utils.rs
@@ -1,6 +1,5 @@
-use std::collections::HashSet;
-
 use crate::datatypes::{DataType, Field, TimeUnit};
+use ahash::AHashSet;

 pub(super) const RFC3339: &str = "%Y-%m-%dT%H:%M:%S%.f%:z";

@@ -78,7 +77,7 @@ pub fn infer(bytes: &[u8]) -> DataType {
     }
 }

-fn merge_fields(field_name: &str, possibilities: &mut HashSet<DataType>) -> Field {
+fn merge_fields(field_name: &str, possibilities: &mut AHashSet<DataType>) -> Field {
     // determine data type based on possible types
     // if there are incompatible types, use DataType::Utf8
     let data_type = match possibilities.len() {
@@ -101,7 +100,7 @@ fn merge_fields(field_name: &str, possibilities: &mut HashSet<DataType>) -> Fiel

 pub(crate) fn merge_schema(
     headers: &[String],
-    column_types: &mut [HashSet<DataType>],
+    column_types: &mut [AHashSet<DataType>],
 ) -> Vec<Field> {
     headers
         .iter()
9 changes: 5 additions & 4 deletions src/io/ipc/read/common.rs
@@ -1,4 +1,5 @@
-use std::collections::{HashMap, VecDeque};
+use ahash::AHashMap;
+use std::collections::VecDeque;
 use std::io::{Read, Seek};

 use arrow_format;
@@ -322,14 +323,14 @@ mod tests {
 pub fn prepare_projection(
     fields: &[Field],
     mut projection: Vec<usize>,
-) -> (Vec<usize>, HashMap<usize, usize>, Vec<Field>) {
+) -> (Vec<usize>, AHashMap<usize, usize>, Vec<Field>) {
     let fields = projection.iter().map(|x| fields[*x].clone()).collect();

     // todo: find way to do this more efficiently
     let mut indices = (0..projection.len()).collect::<Vec<_>>();
     indices.sort_unstable_by_key(|&i| &projection[i]);
     let map = indices.iter().copied().enumerate().fold(
-        HashMap::default(),
+        AHashMap::default(),
         |mut acc, (index, new_index)| {
             acc.insert(index, new_index);
             acc
@@ -355,7 +356,7 @@ pub fn prepare_projection(

 pub fn apply_projection(
     chunk: Chunk<Box<dyn Array>>,
-    map: &HashMap<usize, usize>,
+    map: &AHashMap<usize, usize>,
 ) -> Chunk<Box<dyn Array>> {
     // re-order according to projection
     let arrays = chunk.into_arrays();
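Since `AHashMap` implements `Default`, it can seed the `fold` exactly as `HashMap::default()` did. A sketch of the index-map construction used by `prepare_projection`, with made-up input:

```rust
use ahash::AHashMap;

fn main() {
    // A projection whose sorted order differs from its given order.
    let projection = vec![2usize, 0, 1];

    // Positions of the projection, sorted by the column each selects,
    // mirroring the sort_unstable_by_key step above.
    let mut indices = (0..projection.len()).collect::<Vec<_>>();
    indices.sort_unstable_by_key(|&i| &projection[i]);

    // Fold into a map, seeded with AHashMap::default() as in the diff.
    let map = indices.iter().copied().enumerate().fold(
        AHashMap::<usize, usize>::default(),
        |mut acc, (index, new_index)| {
            acc.insert(index, new_index);
            acc
        },
    );

    assert_eq!(map.get(&0), Some(&1)); // position 0 maps to 1 here
}
```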
4 changes: 2 additions & 2 deletions src/io/ipc/read/file_async.rs
@@ -1,5 +1,5 @@
 //! Async reader for Arrow IPC files
-use std::collections::HashMap;
+use ahash::AHashMap;
 use std::io::SeekFrom;

 use arrow_format::ipc::{planus::ReadAsRoot, Block, MessageHeaderRef};
@@ -73,7 +73,7 @@ impl<'a> FileStream<'a> {
         mut reader: R,
         mut dictionaries: Option<Dictionaries>,
         metadata: FileMetadata,
-        projection: Option<(Vec<usize>, HashMap<usize, usize>)>,
+        projection: Option<(Vec<usize>, AHashMap<usize, usize>)>,
         limit: Option<usize>,
     ) -> BoxStream<'a, Result<Chunk<Box<dyn Array>>>>
     where
4 changes: 2 additions & 2 deletions src/io/ipc/read/mod.rs
@@ -4,7 +4,7 @@
 //! which provides arbitrary access to any of its messages, and the
 //! [`StreamReader`](stream::StreamReader), which only supports reading
 //! data in the order it was written in.
-use std::collections::HashMap;
+use ahash::AHashMap;

 use crate::array::Array;

@@ -35,7 +35,7 @@ pub use schema::deserialize_schema;
 pub use stream::{read_stream_metadata, StreamMetadata, StreamReader, StreamState};

 /// how dictionaries are tracked in this crate
-pub type Dictionaries = HashMap<i64, Box<dyn Array>>;
+pub type Dictionaries = AHashMap<i64, Box<dyn Array>>;

 pub(crate) type Node<'a> = arrow_format::ipc::FieldNodeRef<'a>;
 pub(crate) type IpcBuffer<'a> = arrow_format::ipc::BufferRef<'a>;
6 changes: 3 additions & 3 deletions src/io/ipc/read/reader.rs
@@ -1,4 +1,4 @@
-use std::collections::HashMap;
+use ahash::AHashMap;
 use std::convert::TryInto;
 use std::io::{Read, Seek, SeekFrom};

@@ -119,7 +119,7 @@ pub fn read_file_dictionaries<R: Read + Seek>(
     let blocks = if let Some(blocks) = metadata.dictionaries.as_deref() {
         blocks
     } else {
-        return Ok(HashMap::new());
+        return Ok(AHashMap::new());
     };
     // use a temporary smaller scratch for the messages
     let mut message_scratch = Default::default();
@@ -326,7 +326,7 @@ pub struct FileReader<R: Read + Seek> {
     // the dictionaries are going to be read
     dictionaries: Option<Dictionaries>,
     current_block: usize,
-    projection: Option<(Vec<usize>, HashMap<usize, usize>, Schema)>,
+    projection: Option<(Vec<usize>, AHashMap<usize, usize>, Schema)>,
     remaining: usize,
     data_scratch: Vec<u8>,
     message_scratch: Vec<u8>,