Skip to content

Commit

Permalink
feat: Add DQM Logging on GRPC Server with FileLogStorage for Testing (#…
Browse files Browse the repository at this point in the history
…2403)

* Make a proof of concept

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Update

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* revert feature store

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* refactor

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Add time

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Add time

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* clean up

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Add comment

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Add pseudocode

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Refactor logging functionality to hide internals

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Refactor

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Revert changes

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Add tests

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Add new timeout test

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix python ci for m1 mac

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix lint

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Working state

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Move offline log store

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* refactor

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Update logs

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Update log storage

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* WOrking state

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Work

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Add tests for filestorage

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix logging

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Add more tests

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Clean up

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Update error

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* semi working state

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* b state

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Update types to be public

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Update structs to make fields public

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* clean up

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix go

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix issues

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix tests

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Working state

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Clean up code a bit

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fixes

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix tests

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Clean up

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Update schema functionality

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Remove xitongsys parquet reader

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Clean up

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix go mode

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix tests and errors and everything

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix tests

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Remove unused code

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Last working commit

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* work

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Address some changes

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* More addresses.

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix more review comments

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Rename

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Add request id

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* More fixes

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix odfv

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Address other changes

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Reorder for optimization

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Add more shcema tests

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix tests

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* refactor to clean

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Add initialized repo for testing

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Remove

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix tests

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix tests

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Text

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix?

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* remove entity map

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* remove Cache method from registry

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* clean up pre-initialized repo

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* git ignore full data directory in tests

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

Co-authored-by: pyalex <moskalenko.alexey@gmail.com>
  • Loading branch information
kevjumba and pyalex authored Apr 13, 2022
1 parent cf7bbc2 commit 57a97d8
Show file tree
Hide file tree
Showing 28 changed files with 1,532 additions and 120 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ coverage.xml
.hypothesis/
.pytest_cache/
infra/scripts/*.conf
go/internal/test/feature_repo
go/cmd/server/logging/feature_repo/data/

# Translations
*.mo
Expand Down
30 changes: 14 additions & 16 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ require (
github.com/go-python/gopy v0.4.0
github.com/go-redis/redis/v8 v8.11.4
github.com/golang/protobuf v1.5.2
github.com/google/uuid v1.2.0
github.com/google/uuid v1.3.0
github.com/mattn/go-sqlite3 v1.14.12
github.com/spaolacci/murmur3 v1.1.0
github.com/stretchr/testify v1.7.0
google.golang.org/grpc v1.44.0
google.golang.org/protobuf v1.27.1
google.golang.org/grpc v1.45.0
google.golang.org/protobuf v1.28.0
)

require (
Expand All @@ -23,28 +23,26 @@ require (
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/goccy/go-json v0.7.10 // indirect
github.com/goccy/go-json v0.9.6 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/gonuts/commander v0.1.0 // indirect
github.com/gonuts/flag v0.1.0 // indirect
github.com/google/flatbuffers v2.0.5+incompatible // indirect
github.com/klauspost/asmfmt v1.3.1 // indirect
github.com/google/flatbuffers v2.0.6+incompatible // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.15.1 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/klauspost/cpuid/v2 v2.0.12 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/pierrec/lz4/v4 v4.1.12 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pierrec/lz4/v4 v4.1.14 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/zeebo/xxh3 v1.0.1 // indirect
golang.org/x/exp v0.0.0-20211216164055-b2b84827b756 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
golang.org/x/exp v0.0.0-20220407100705-7b9b53b0aca4 // indirect
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27 // indirect
golang.org/x/net v0.0.0-20220407224826-aac1ed45d8e3 // indirect
golang.org/x/sys v0.0.0-20220406163625-3f8b81556e12 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/tools v0.1.10 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/genproto v0.0.0-20220126215142-9970aeb2e350 // indirect
google.golang.org/genproto v0.0.0-20220407144326-9054f6ed7bac // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
Expand Down
53 changes: 34 additions & 19 deletions go.sum

Large diffs are not rendered by default.

Empty file.
Binary file not shown.
40 changes: 40 additions & 0 deletions go/cmd/server/logging/feature_repo/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# This is an example feature definition file

from google.protobuf.duration_pb2 import Duration

from feast import Entity, Feature, FeatureView, FileSource, ValueType, FeatureService

# Read data from parquet files. Parquet is convenient for local development mode. For
# production, you can use your favorite DWH, such as BigQuery. See Feast documentation
# for more info.
driver_hourly_stats = FileSource(
path="driver_stats.parquet",
event_timestamp_column="event_timestamp",
created_timestamp_column="created",
)

# Define an entity for the driver. You can think of entity as a primary key used to
# fetch features.
driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",)

# Our parquet files contain sample data that includes a driver_id column, timestamps and
# three feature column. Here we define a Feature View that will allow us to serve this
# data to our model online.
driver_hourly_stats_view = FeatureView(
name="driver_hourly_stats",
entities=["driver_id"],
ttl=Duration(seconds=86400 * 365 * 10),
features=[
Feature(name="conv_rate", dtype=ValueType.FLOAT),
Feature(name="acc_rate", dtype=ValueType.FLOAT),
Feature(name="avg_daily_trips", dtype=ValueType.INT64),
],
online=True,
batch_source=driver_hourly_stats,
tags={},
)

driver_stats_fs = FeatureService(
name="test_service",
features=[driver_hourly_stats_view]
)
5 changes: 5 additions & 0 deletions go/cmd/server/logging/feature_repo/feature_store.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
project: feature_repo
registry: data/registry.db
provider: local
online_store:
path: data/online_store.db
86 changes: 86 additions & 0 deletions go/cmd/server/logging/filelogstorage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package logging

import (
"errors"
"fmt"
"io"
"os"
"path/filepath"

"github.com/apache/arrow/go/v8/arrow/array"
"github.com/apache/arrow/go/v8/parquet"
"github.com/apache/arrow/go/v8/parquet/pqarrow"
"github.com/feast-dev/feast/go/internal/feast/registry"
)

type FileLogStorage struct {
// Feast project name
project string
path string
}

func GetFileConfig(config *registry.RepoConfig) (*OfflineLogStoreConfig, error) {
fileConfig := OfflineLogStoreConfig{
storeType: "file",
}
if onlineStorePath, ok := config.OfflineStore["path"]; ok {
path, success := onlineStorePath.(string)
if !success {
return &fileConfig, fmt.Errorf("path, %s, cannot be converted to string", path)
}
fileConfig.path = path
} else {
return nil, errors.New("need path for file log storage")
}
return &fileConfig, nil
}

// This offline store is currently only used for testing. It will be instantiated during go unit tests to log to file
// and the parquet files will be cleaned up after the test is run.
func NewFileOfflineStore(project string, offlineStoreConfig *OfflineLogStoreConfig) (*FileLogStorage, error) {
store := FileLogStorage{project: project}
var absPath string
var err error
// TODO(kevjumba) remove this default catch.
if offlineStoreConfig.path != "" {
absPath, err = filepath.Abs(offlineStoreConfig.path)
} else {
return nil, errors.New("need path for file log storage")
}
if err != nil {
return nil, err
}
store.path = absPath
return &store, nil
}

func openLogFile(absPath string) (*os.File, error) {
var _, err = os.Stat(absPath)

// create file if not exists
if os.IsNotExist(err) {
var file, err = os.Create(absPath)
if err != nil {
return nil, err
}
return file, nil
} else {
return nil, fmt.Errorf("path %s already exists", absPath)
}
}

func (f *FileLogStorage) FlushToStorage(tbl array.Table) error {
w, err := openLogFile(f.path)
var writer io.Writer = w
if err != nil {
return err
}
props := parquet.NewWriterProperties(parquet.WithDictionaryDefault(false))
arrProps := pqarrow.DefaultWriterProps()
err = pqarrow.WriteTable(tbl, writer, 100, props, arrProps)
if err != nil {
return err
}
return nil

}
70 changes: 70 additions & 0 deletions go/cmd/server/logging/filelogstorage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package logging

import (
"context"
"path/filepath"

"testing"

"github.com/apache/arrow/go/v8/arrow/array"
"github.com/apache/arrow/go/v8/arrow/memory"
"github.com/apache/arrow/go/v8/parquet/file"
"github.com/apache/arrow/go/v8/parquet/pqarrow"
"github.com/feast-dev/feast/go/internal/test"
"github.com/stretchr/testify/assert"
)

func TestFlushToStorage(t *testing.T) {
ctx := context.Background()
table, expectedSchema, expectedColumns, err := GetTestArrowTableAndExpectedResults()
defer table.Release()
assert.Nil(t, err)
offlineStoreConfig := OfflineLogStoreConfig{
storeType: "file",
path: "./log.parquet",
}
fileStore, err := NewFileOfflineStore("test", &offlineStoreConfig)
assert.Nil(t, err)
err = fileStore.FlushToStorage(array.Table(table))
assert.Nil(t, err)
logPath, err := filepath.Abs(offlineStoreConfig.path)
assert.Nil(t, err)
pf, err := file.OpenParquetFile(logPath, false)
assert.Nil(t, err)

reader, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
assert.Nil(t, err)

tbl, err := reader.ReadTable(ctx)
assert.Nil(t, err)
tr := array.NewTableReader(tbl, -1)
defer tbl.Release()

defer tr.Release()
for tr.Next() {
rec := tr.Record()
assert.NotNil(t, rec)
for _, field := range rec.Schema().Fields() {
assert.Contains(t, expectedSchema, field.Name)
assert.Equal(t, field.Type, expectedSchema[field.Name])
}
values, err := test.GetProtoFromRecord(rec)

assert.Nil(t, err)
for name, val := range values {
if name == "RequestId" {
// Ensure there are request ids in record.
assert.Greater(t, len(val.Val), 0)
} else {
assert.Equal(t, len(val.Val), len(expectedColumns[name].Val))
for idx, featureVal := range val.Val {
assert.Equal(t, featureVal.Val, expectedColumns[name].Val[idx].Val)
}
}
}
}

err = test.CleanUpFile(logPath)
assert.Nil(t, err)

}
Loading

0 comments on commit 57a97d8

Please sign in to comment.