Skip to content

Commit

Permalink
chore(bigtable/bttest): Add read stats to the emulator and a simple t…
Browse files Browse the repository at this point in the history
…est (#7019)

Closes #7017
  • Loading branch information
igorbernstein2 authored Nov 10, 2022
1 parent c9acf47 commit bc008db
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 11 deletions.
81 changes: 81 additions & 0 deletions bigtable/bigtable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,87 @@ func TestReadRowsInvalidRowSet(t *testing.T) {
}
}

func TestReadRowsRequestStats(t *testing.T) {
testEnv, err := NewEmulatedEnv(IntegrationTestConfig{})
if err != nil {
t.Fatalf("NewEmulatedEnv failed: %v", err)
}
conn, err := grpc.Dial(testEnv.server.Addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(100<<20), grpc.MaxCallRecvMsgSize(100<<20)),
)
if err != nil {
t.Fatalf("grpc.Dial failed: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
adminClient, err := NewAdminClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn))
if err != nil {
t.Fatalf("NewClient failed: %v", err)
}
defer adminClient.Close()
tableConf := &TableConf{
TableID: testEnv.config.Table,
Families: map[string]GCPolicy{
"f": NoGcPolicy(),
},
}
if err := adminClient.CreateTableFromConf(ctx, tableConf); err != nil {
t.Fatalf("CreateTable(%v) failed: %v", testEnv.config.Table, err)
}

client, err := NewClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn))
if err != nil {
t.Fatalf("NewClient failed: %v", err)
}
defer client.Close()
table := client.Open(testEnv.config.Table)

m := NewMutation()
m.Set("f", "q", ServerTime, []byte("value"))

if err = table.Apply(ctx, "row1", m); err != nil {
t.Fatalf("Apply failed: %v", err)
}

m = NewMutation()
m.Set("f", "q", ServerTime, []byte("value"))
m.Set("f", "q2", ServerTime, []byte("value2"))
if err = table.Apply(ctx, "row2", m); err != nil {
t.Fatalf("Apply failed: %v", err)
}

m = NewMutation()
m.Set("f", "excluded", ServerTime, []byte("value"))
if err = table.Apply(ctx, "row3", m); err != nil {
t.Fatalf("Apply failed: %v", err)
}

statsChannel := make(chan FullReadStats, 1)

readStart := time.Now()
if err := table.ReadRows(ctx, RowRange{}, func(r Row) bool { return true }, WithFullReadStats(func(s *FullReadStats) { statsChannel <- *s }), RowFilter(ColumnFilter("q.*"))); err != nil {
t.Fatalf("NewClient failed: %v", err)
}
readElapsed := time.Since(readStart)

got := <-statsChannel

wantIter := ReadIterationStats{
RowsSeenCount: 3,
RowsReturnedCount: 2,
CellsSeenCount: 4,
CellsReturnedCount: 3,
}

if diff := cmp.Diff(wantIter, got.ReadIterationStats); diff != "" {
t.Errorf("ReadRows RequestStats are incorrect (-want +got):\n%s", diff)
}

if got.RequestLatencyStats.FrontendServerLatency > readElapsed || got.RequestLatencyStats.FrontendServerLatency <= 0 {
t.Fatalf("ReadRows FrontendServerLatency should be in range 0, %v", readElapsed)
}
}

// TestHeaderPopulatedWithAppProfile verifies that request params header is populated with table name and app profile
func TestHeaderPopulatedWithAppProfile(t *testing.T) {
testEnv, err := NewEmulatedEnv(IntegrationTestConfig{})
Expand Down
53 changes: 42 additions & 11 deletions bigtable/bttest/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
"rsc.io/binaryregexp"
)
Expand Down Expand Up @@ -402,6 +403,7 @@ func (s *server) DeleteSnapshot(context.Context, *btapb.DeleteSnapshotRequest) (
}

func (s *server) ReadRows(req *btpb.ReadRowsRequest, stream btpb.Bigtable_ReadRowsServer) error {
start := time.Now()
s.mu.Lock()
tbl, ok := s.tables[req.TableName]
s.mu.Unlock()
Expand Down Expand Up @@ -479,36 +481,65 @@ func (s *server) ReadRows(req *btpb.ReadRowsRequest, stream btpb.Bigtable_ReadRo
sort.Sort(byRowKey(rows))

limit := int(req.RowsLimit)
count := 0
if limit == 0 {
limit = len(rows)
}

iterStats := &btpb.ReadIterationStats{}

for _, r := range rows {
if limit > 0 && count >= limit {
return nil
if int(iterStats.RowsReturnedCount) >= limit {
break
}
streamed, err := streamRow(stream, r, req.Filter)
if err != nil {

if err := streamRow(stream, r, req.Filter, iterStats); err != nil {
return err
}
if streamed {
count++
}

elapsed := time.Since(start)
if req.RequestStatsView == btpb.ReadRowsRequest_REQUEST_STATS_FULL {
rrr := &btpb.ReadRowsResponse{}
rrr.RequestStats = &btpb.RequestStats{
StatsView: &btpb.RequestStats_FullReadStatsView{
FullReadStatsView: &btpb.FullReadStatsView{
ReadIterationStats: iterStats,
RequestLatencyStats: &btpb.RequestLatencyStats{
FrontendServerLatency: durationpb.New(elapsed),
},
},
},
}

return stream.Send(rrr)
}
return nil
}

// streamRow filters the given row and sends it via the given stream.
// Returns true if at least one cell matched the filter and was streamed, false otherwise.
func streamRow(stream btpb.Bigtable_ReadRowsServer, r *row, f *btpb.RowFilter) (bool, error) {
func streamRow(stream btpb.Bigtable_ReadRowsServer, r *row, f *btpb.RowFilter, s *btpb.ReadIterationStats) error {
r.mu.Lock()
nr := r.copy()
r.mu.Unlock()
r = nr

s.RowsSeenCount++
for _, f := range r.families {
s.CellsSeenCount += int64(len(f.cells))
}

match, err := filterRow(f, r)
if err != nil {
return false, err
return err
}
if !match {
return false, nil
return nil
}

s.RowsReturnedCount++
for _, f := range r.families {
s.CellsReturnedCount += int64(len(f.cells))
}

rrr := &btpb.ReadRowsResponse{}
Expand Down Expand Up @@ -537,7 +568,7 @@ func streamRow(stream btpb.Bigtable_ReadRowsServer, r *row, f *btpb.RowFilter) (
rrr.Chunks[len(rrr.Chunks)-1].RowStatus = &btpb.ReadRowsResponse_CellChunk_CommitRow{CommitRow: true}
}

return true, stream.Send(rrr)
return stream.Send(rrr)
}

// filterRow modifies a row with the given filter. Returns true if at least one cell from the row matches,
Expand Down

0 comments on commit bc008db

Please sign in to comment.