Skip to content

Commit

Permalink
Add support for int32 columns (#647)
Browse files Browse the repository at this point in the history
Signed-off-by: Nicolas Takashi <nicolas.tcs@hotmail.com>
  • Loading branch information
nicolastakashi authored Jan 2, 2024
1 parent db1b0e5 commit 8b67228
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 6 deletions.
2 changes: 2 additions & 0 deletions dynparquet/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,8 @@ func storageLayoutToParquetNode(l StorageLayout) (parquet.Node, error) {
node = parquet.Leaf(parquet.DoubleType)
case int32(schemapb.StorageLayout_TYPE_BOOL):
node = parquet.Leaf(parquet.BooleanType)
case int32(schemapb.StorageLayout_TYPE_INT32):
node = parquet.Int(32)
default:
return nil, fmt.Errorf("unknown storage layout type: %v", l.GetTypeInt32())
}
Expand Down
11 changes: 8 additions & 3 deletions gen/proto/go/frostdb/schema/v1alpha1/schema.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 8 additions & 3 deletions gen/proto/go/frostdb/schema/v1alpha2/schema.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions pqarrow/arrowutils/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ func (h cursorHeap) Less(i, j int) bool {
continue
}
return v1 < v2
case *array.Int32:
arr2 := c2.r.Column(i).(*array.Int32)
v1 := arr1.Value(c1.curIdx)
v2 := arr2.Value(c2.curIdx)
if v1 == v2 {
continue
}
return v1 < v2
case *array.Dictionary:
switch dict := arr1.Dictionary().(type) {
case *array.Binary:
Expand Down
5 changes: 5 additions & 0 deletions pqarrow/builder/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func RollbackPrevious(cb ColumnBuilder) error {
b.ResetToLength(b.Len() - 1)
case *array.Int64Builder:
b.Resize(b.Len() - 1)

case *array.StringBuilder:
b.Resize(b.Len() - 1)
case *array.BinaryBuilder:
Expand Down Expand Up @@ -65,6 +66,8 @@ func AppendValue(cb ColumnBuilder, arr arrow.Array, i int) error {
b.AppendSingle(arr.(*array.Boolean).Value(i))
case *array.Int64Builder:
b.Append(arr.(*array.Int64).Value(i))
case *array.Int32Builder:
b.Append(arr.(*array.Int32).Value(i))
case *array.Float64Builder:
b.Append(arr.(*array.Float64).Value(i))
case *array.StringBuilder:
Expand Down Expand Up @@ -163,6 +166,8 @@ func AppendGoValue(cb ColumnBuilder, v any) error {
b.AppendSingle(v.(bool))
case *array.Int64Builder:
b.Append(v.(int64))
case *array.Int32Builder:
b.Append(v.(int32))
case *array.StringBuilder:
b.Append(v.(string))
case *array.BinaryBuilder:
Expand Down
2 changes: 2 additions & 0 deletions pqarrow/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ func ArrowScalarToParquetValue(sc scalar.Scalar) (parquet.Value, error) {
return parquet.ValueOf(string(s.Data())), nil
case *scalar.Int64:
return parquet.ValueOf(s.Value), nil
case *scalar.Int32:
return parquet.ValueOf(s.Value), nil
case *scalar.FixedSizeBinary:
width := s.Type.(*arrow.FixedSizeBinaryType).ByteWidth
v := [16]byte{}
Expand Down
2 changes: 2 additions & 0 deletions proto/frostdb/schema/v1alpha1/schema.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ message StorageLayout {
TYPE_DOUBLE = 3;
// Represents a boolean type.
TYPE_BOOL = 4;
// Represents a int32 type.
TYPE_INT32 = 5;
}

// Type of the column.
Expand Down
2 changes: 2 additions & 0 deletions proto/frostdb/schema/v1alpha2/schema.proto
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ message StorageLayout {
TYPE_DOUBLE = 3;
// Represents a boolean type.
TYPE_BOOL = 4;
// Represents a int32 type.
TYPE_INT32 = 5;
}

// Type of the column.
Expand Down
108 changes: 108 additions & 0 deletions query/physicalplan/binaryscalarexpr.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,23 @@ func BinaryScalarOperation(left arrow.Array, right scalar.Scalar, operator logic
default:
panic("something terrible has happened, this should have errored previously during validation")
}
case arrow.PrimitiveTypes.Int32:
switch operator {
case logicalplan.OpEq:
return Int32ArrayScalarEqual(left.(*array.Int32), right.(*scalar.Int32))
case logicalplan.OpNotEq:
return Int32ArrayScalarNotEqual(left.(*array.Int32), right.(*scalar.Int32))
case logicalplan.OpLt:
return Int32ArrayScalarLessThan(left.(*array.Int32), right.(*scalar.Int32))
case logicalplan.OpLtEq:
return Int32ArrayScalarLessThanOrEqual(left.(*array.Int32), right.(*scalar.Int32))
case logicalplan.OpGt:
return Int32ArrayScalarGreaterThan(left.(*array.Int32), right.(*scalar.Int32))
case logicalplan.OpGtEq:
return Int32ArrayScalarGreaterThanOrEqual(left.(*array.Int32), right.(*scalar.Int32))
default:
panic("something terrible has happened, this should have errored previously during validation")
}
}

switch arr := left.(type) {
Expand Down Expand Up @@ -457,3 +474,94 @@ func BooleanArrayScalarNotEqual(left *array.Boolean, right *scalar.Boolean) (*Bi

return res, nil
}

func Int32ArrayScalarEqual(left *array.Int32, right *scalar.Int32) (*Bitmap, error) {
res := NewBitmap()

for i := 0; i < left.Len(); i++ {
if left.IsNull(i) {
continue
}
if left.Value(i) == right.Value {
res.Add(uint32(i))
}
}

return res, nil
}

func Int32ArrayScalarNotEqual(left *array.Int32, right *scalar.Int32) (*Bitmap, error) {
res := NewBitmap()

for i := 0; i < left.Len(); i++ {
if left.IsNull(i) {
res.Add(uint32(i))
continue
}
if left.Value(i) != right.Value {
res.Add(uint32(i))
}
}

return res, nil
}

func Int32ArrayScalarLessThan(left *array.Int32, right *scalar.Int32) (*Bitmap, error) {
res := NewBitmap()

for i := 0; i < left.Len(); i++ {
if left.IsNull(i) {
continue
}
if left.Value(i) < right.Value {
res.Add(uint32(i))
}
}

return res, nil
}

func Int32ArrayScalarLessThanOrEqual(left *array.Int32, right *scalar.Int32) (*Bitmap, error) {
res := NewBitmap()

for i := 0; i < left.Len(); i++ {
if left.IsNull(i) {
continue
}
if left.Value(i) <= right.Value {
res.Add(uint32(i))
}
}

return res, nil
}

func Int32ArrayScalarGreaterThan(left *array.Int32, right *scalar.Int32) (*Bitmap, error) {
res := NewBitmap()

for i := 0; i < left.Len(); i++ {
if left.IsNull(i) {
continue
}
if left.Value(i) > right.Value {
res.Add(uint32(i))
}
}

return res, nil
}

func Int32ArrayScalarGreaterThanOrEqual(left *array.Int32, right *scalar.Int32) (*Bitmap, error) {
res := NewBitmap()

for i := 0; i < left.Len(); i++ {
if left.IsNull(i) {
continue
}
if left.Value(i) >= right.Value {
res.Add(uint32(i))
}
}

return res, nil
}

0 comments on commit 8b67228

Please sign in to comment.