Skip to content

Commit

Permalink
pkg/accesslogs: test shipment counts in processor test
Browse files Browse the repository at this point in the history
Change-Id: If0c416e2cb4218c20ed0ff5b6ebe3896e63e932d
  • Loading branch information
halkyon committed Aug 20, 2024
1 parent 691964b commit 7ae6c2c
Showing 1 changed file with 74 additions and 51 deletions.
125 changes: 74 additions & 51 deletions pkg/accesslogs/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,58 +87,81 @@ func TestProcessor(t *testing.T) {
func TestProcessorWithShipment(t *testing.T) {
t.Parallel()

ctx := testcontext.New(t)
defer ctx.Cleanup()

log := zaptest.NewLogger(t)
defer ctx.Check(log.Sync)

s := newInMemoryStorage()
p := NewProcessor(log, Options{
DefaultShipmentLimit: 20 * memory.B,
})
defer ctx.Check(p.Close)

ctx.Go(p.Run)

uuid1, err := uuid.New()
require.NoError(t, err)
uuid2, err := uuid.New()
require.NoError(t, err)
key1 := Key{
PublicProjectID: uuid1,
Bucket: "bucket1",
Prefix: "prefix1",
}
key2 := Key{
PublicProjectID: uuid2,
Bucket: "bucket2",
Prefix: "prefix2/",
}
entry1 := newTestEntry("entry1")
entry2 := newTestEntry("entry2")

for i := 0; i < 10; i++ {
require.NoError(t, p.QueueEntry(s, key1, entry1))
require.NoError(t, p.QueueEntry(s, key2, entry1))
require.NoError(t, p.QueueEntry(s, key1, entry2))
require.NoError(t, p.QueueEntry(s, key2, entry2))
tests := []struct {
name string
shipmentLimit memory.Size
expectedShipments int
}{
{
name: "small shipment limit",
shipmentLimit: 20 * memory.B,
expectedShipments: 10,
},
{
name: "big shipment limit",
shipmentLimit: 64 * memory.MiB,
expectedShipments: 1,
},
}

require.NoError(t, p.Close()) // sync, don't wait until the deferred call

for _, bucket := range []string{key1.Bucket, key2.Bucket} {
buf := bytes.NewBuffer(nil)

for _, v := range s.getBucketContents(bucket) {
buf.Write(v)
}

bucketContents := buf.String()
require.Equal(t, 20, strings.Count(bucketContents, "\n"))
bucketContents = strings.Replace(bucketContents, entry1.String()+"\n", "", 10)
bucketContents = strings.Replace(bucketContents, entry2.String()+"\n", "", 10)
require.Empty(t, bucketContents)
for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
ctx := testcontext.New(t)
defer ctx.Cleanup()

log := zaptest.NewLogger(t)
defer ctx.Check(log.Sync)

s := newInMemoryStorage()
p := NewProcessor(log, Options{
DefaultShipmentLimit: tc.shipmentLimit,
})
defer ctx.Check(p.Close)

ctx.Go(p.Run)

uuid1, err := uuid.New()
require.NoError(t, err)
uuid2, err := uuid.New()
require.NoError(t, err)
key1 := Key{
PublicProjectID: uuid1,
Bucket: "bucket1",
Prefix: "prefix1",
}
key2 := Key{
PublicProjectID: uuid2,
Bucket: "bucket2",
Prefix: "prefix2/",
}
entry1 := newTestEntry("entry1")
entry2 := newTestEntry("entry2")

for i := 0; i < 10; i++ {
require.NoError(t, p.QueueEntry(s, key1, entry1))
require.NoError(t, p.QueueEntry(s, key2, entry1))
require.NoError(t, p.QueueEntry(s, key1, entry2))
require.NoError(t, p.QueueEntry(s, key2, entry2))
}

require.NoError(t, p.Close()) // sync, don't wait until the deferred call

for _, bucket := range []string{key1.Bucket, key2.Bucket} {
buf := bytes.NewBuffer(nil)

require.Len(t, s.getBucketContents(bucket), tc.expectedShipments)

for _, v := range s.getBucketContents(bucket) {
buf.Write(v)
}

bucketContents := buf.String()
require.Equal(t, 20, strings.Count(bucketContents, "\n"))
bucketContents = strings.Replace(bucketContents, entry1.String()+"\n", "", 10)
bucketContents = strings.Replace(bucketContents, entry2.String()+"\n", "", 10)
require.Empty(t, bucketContents)
}
})
}
}

Expand Down

0 comments on commit 7ae6c2c

Please sign in to comment.