Skip to content

Commit

Permalink
bigquery: support uploading structs directly
Browse files Browse the repository at this point in the history
Uploader.Put now accepts structs (or struct pointers) without needing to
wrap them in a StructSaver first.

The meaning is equivalent to using a StructSaver with a schema inferred
from the struct, and an empty InsertID.

Inferring a schema for every struct will be expensive, but we will
cache schema inference in a forthcoming CL.

Change-Id: I31c24e6f551aaae89ebd3ddb499cc6d59a825f10
Reviewed-on: https://code-review.googlesource.com/9755
Reviewed-by: Michael Darakananda <pongad@google.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
  • Loading branch information
jba committed Dec 9, 2016
1 parent 5bfd313 commit e80926d
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 17 deletions.
20 changes: 16 additions & 4 deletions bigquery/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,8 @@ func TestIntegration_UploadAndReadStructs(t *testing.T) {
table := newTable(t, schema)
defer table.Delete(ctx)

// Populate the table.
upl := table.Uploader()
// Populate the table using StructSavers explicitly.
scores := []score{
{Name: "a", Num: 12},
{Name: "b", Num: 18},
Expand All @@ -322,13 +322,25 @@ func TestIntegration_UploadAndReadStructs(t *testing.T) {
t.Fatal(err)
}

// Continue uploading to the table using structs and struct pointers.

scores2 := []interface{}{
score{Name: "d", Num: 12},
&score{Name: "e", Num: 18},
}
if err := upl.Put(ctx, scores2); err != nil {
t.Fatal(err)
}

// Wait until the data has been uploaded. This can take a few seconds, according
// to https://cloud.google.com/bigquery/streaming-data-into-bigquery.
if err := waitForRow(ctx, table); err != nil {
t.Fatal(err)
}

// Test iteration with structs.
// Read what we wrote.
want := []score{scores[0], scores[1], scores[2],
scores2[0].(score), *scores2[1].(*score)}
it := table.Read(ctx)
var got []score
for {
Expand All @@ -343,8 +355,8 @@ func TestIntegration_UploadAndReadStructs(t *testing.T) {
got = append(got, g)
}
sort.Sort(byName(got))
if !reflect.DeepEqual(got, scores) {
t.Errorf("got %+v, want %+v", got, scores)
if !reflect.DeepEqual(got, want) {
t.Errorf("got %+v, want %+v", got, want)
}
}

Expand Down
67 changes: 57 additions & 10 deletions bigquery/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,31 +56,78 @@ func (t *Table) Uploader() *Uploader {
return &Uploader{t: t}
}

// Put uploads one or more rows to the BigQuery service. src must implement ValueSaver or be a slice of ValueSavers.
// Put uploads one or more rows to the BigQuery service.
//
// If src is a ValueSaver, then its Save method is called to produce a row for uploading.
//
// If src is a struct or pointer to a struct, then a schema is inferred from it
// and used to create a StructSaver. The InsertID of the StructSaver will be
// empty.
//
// If src is a slice of ValueSavers, structs, or struct pointers, then each
// element of the slice is treated as above, and multiple rows are uploaded.
//
// Put returns a PutMultiError if one or more rows failed to be uploaded.
// The PutMultiError contains a RowInsertionError for each failed row.
func (u *Uploader) Put(ctx context.Context, src interface{}) error {
// TODO(mcgreevy): Support structs which do not implement ValueSaver as src, a la Datastore.

if saver, ok := src.(ValueSaver); ok {
return u.putMulti(ctx, []ValueSaver{saver})
savers, err := valueSavers(src)
if err != nil {
return err
}
return u.putMulti(ctx, savers)
}

func valueSavers(src interface{}) ([]ValueSaver, error) {
saver, ok, err := toValueSaver(src)
if err != nil {
return nil, err
}
if ok {
return []ValueSaver{saver}, nil
}
srcVal := reflect.ValueOf(src)
if srcVal.Kind() != reflect.Slice {
return fmt.Errorf("%T is not a ValueSaver or slice of ValueSavers", src)
}
return nil, fmt.Errorf("%T is not a ValueSaver, struct, struct pointer, or slice", src)

}
var savers []ValueSaver
for i := 0; i < srcVal.Len(); i++ {
s := srcVal.Index(i).Interface()
saver, ok := s.(ValueSaver)
saver, ok, err := toValueSaver(s)
if err != nil {
return nil, err
}
if !ok {
return fmt.Errorf("element %d of src is of type %T, which is not a ValueSaver", i, s)
return nil, fmt.Errorf("src[%d] has type %T, which is not a ValueSaver, struct or struct pointer", i, s)
}
savers = append(savers, saver)
}
return u.putMulti(ctx, savers)
return savers, nil
}

// toValueSaver converts x into a ValueSaver.
//
// If x already implements ValueSaver it is returned unchanged. If x is a
// struct or a non-nil pointer to a struct, a schema is inferred from the
// struct type and x is wrapped in a StructSaver carrying that schema; no
// InsertID is set on it.
//
// The boolean result reports whether a ValueSaver could be produced. It is
// false, with a nil error, when x is neither a ValueSaver, a struct, nor a
// non-nil struct pointer, so the caller can build its own error message.
// A non-nil error is returned only when schema inference fails.
func toValueSaver(x interface{}) (ValueSaver, bool, error) {
	if saver, ok := x.(ValueSaver); ok {
		return saver, true, nil
	}
	// reflect.ValueOf yields the concrete value stored in x, so the kind is
	// never reflect.Interface here, even for elements of a []interface{}.
	// Only a pointer needs to be unwrapped.
	v := reflect.ValueOf(x)
	if v.Kind() == reflect.Ptr {
		// For a nil pointer, Elem returns the zero Value, whose kind is
		// reflect.Invalid; the struct check below then rejects it.
		v = v.Elem()
	}
	if v.Kind() != reflect.Struct {
		return nil, false, nil
	}
	// TODO(jba): cache schema inference to speed this up.
	schema, err := inferStruct(v.Type())
	if err != nil {
		return nil, false, err
	}
	// Store the original x (struct or pointer), not the dereferenced value.
	return &StructSaver{Struct: x, Schema: schema}, true, nil
}

func (u *Uploader) putMulti(ctx context.Context, src []ValueSaver) error {
Expand Down
49 changes: 46 additions & 3 deletions bigquery/uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"reflect"
"testing"

"cloud.google.com/go/internal/pretty"

"golang.org/x/net/context"
)

Expand Down Expand Up @@ -53,7 +55,7 @@ func TestRejectsNonValueSavers(t *testing.T) {

for _, tc := range testCases {
if err := u.Put(context.Background(), tc.src); err == nil {
t.Errorf("put value: %v; got err: %v; want nil", tc.src, err)
t.Errorf("put value: %v; got nil, want error", tc.src)
}
}
}
Expand Down Expand Up @@ -119,8 +121,7 @@ func TestInsertsData(t *testing.T) {
projectID: "project-id",
service: irr,
}
table := client.Dataset("dataset-id").Table("table-id")
u := Uploader{t: table}
u := client.Dataset("dataset-id").Table("table-id").Uploader()
for _, batch := range tc.data {
if len(batch) == 0 {
continue
Expand Down Expand Up @@ -240,3 +241,45 @@ func TestUploadOptionsPropagate(t *testing.T) {
}
}
}

func TestValueSavers(t *testing.T) {
ts := &testSaver{ir: &insertionRow{}}
type T struct{ I int }
schema, err := InferSchema(T{})
if err != nil {
t.Fatal(err)
}
for _, test := range []struct {
in interface{}
want []ValueSaver
}{
{ts, []ValueSaver{ts}},
{T{I: 1}, []ValueSaver{&StructSaver{Schema: schema, Struct: T{I: 1}}}},
{[]ValueSaver{ts, ts}, []ValueSaver{ts, ts}},
{[]interface{}{ts, ts}, []ValueSaver{ts, ts}},
{[]T{{I: 1}, {I: 2}}, []ValueSaver{
&StructSaver{Schema: schema, Struct: T{I: 1}},
&StructSaver{Schema: schema, Struct: T{I: 2}},
}},
{[]interface{}{T{I: 1}, &T{I: 2}}, []ValueSaver{
&StructSaver{Schema: schema, Struct: T{I: 1}},
&StructSaver{Schema: schema, Struct: &T{I: 2}},
}},
} {
got, err := valueSavers(test.in)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(got, test.want) {

t.Errorf("%+v: got %v, want %v", test.in, pretty.Value(got), pretty.Value(test.want))
}
// Make sure Save is successful.
for i, vs := range got {
_, _, err := vs.Save()
if err != nil {
t.Fatalf("%+v, #%d: got error %v, want nil", test.in, i, err)
}
}
}
}

0 comments on commit e80926d

Please sign in to comment.