Skip to content

Commit

Permalink
Merge pull request #322 from bmeg/fix/kv-drop-command
Browse files Browse the repository at this point in the history
Fix drop command for kv store
  • Loading branch information
kellrott authored Feb 6, 2025
2 parents 54c6baa + a9b9bd3 commit 96023b5
Show file tree
Hide file tree
Showing 15 changed files with 99 additions and 101 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ jobs:
uses: actions/checkout@v4
- name: run unit tests
run: |
go test ./test/... -config badger.yml
go test ./test/... -config pebble.yml
badgerTest:
Expand Down Expand Up @@ -217,7 +217,7 @@ jobs:
run: |
# start grip server
chmod +x grip
./grip server --rpc-port 18202 --http-port 18201 --config ./test/badger-auth.yml &
./grip server --rpc-port 18202 --http-port 18201 --config ./test/pebble-auth.yml &
sleep 5
# simple auth
# run tests without credentials, should fail
Expand All @@ -228,7 +228,7 @@ jobs:
echo "Got expected auth error"
fi
# run specialized role based tests
make test-authorization ARGS="--grip_config_file_path test/badger-auth.yml"
make test-authorization ARGS="--grip_config_file_path test/pebble-auth.yml"
Expand Down
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ func TestifyConfig(c *Config) {
a := "grip.db." + rand
d.Badger = &a
}
if d.Pebble != nil {
a := "grip.db." + rand
d.Pebble = &a
}
if d.MongoDB != nil {
d.MongoDB.DBName = "gripdb-" + rand
}
Expand Down
5 changes: 3 additions & 2 deletions engine/core/processors_has.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/bmeg/grip/engine/logic"
"github.com/bmeg/grip/gdbi"
"github.com/bmeg/grip/gripql"
"github.com/bmeg/grip/util/setcmp"
)

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -50,7 +51,7 @@ func (h *HasLabel) Process(ctx context.Context, man gdbi.Manager, in gdbi.InPipe
out <- t
continue
}
if contains(labels, t.GetCurrent().Get().Label) {
if setcmp.ContainsString(labels, t.GetCurrent().Get().Label) {
out <- t
}
}
Expand Down Expand Up @@ -106,7 +107,7 @@ func (h *HasID) Process(ctx context.Context, man gdbi.Manager, in gdbi.InPipe, o
out <- t
continue
}
if contains(ids, t.GetCurrentID()) {
if setcmp.ContainsString(ids, t.GetCurrentID()) {
out <- t
}
}
Expand Down
9 changes: 0 additions & 9 deletions engine/core/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,6 @@ func debug(i ...interface{}) {
pretty.Println(i...)
}

func contains(a []string, v string) bool {
for _, i := range a {
if i == v {
return true
}
}
return false
}

func dedupStringSlice(s []string) []string {
seen := make(map[string]struct{}, len(s))
j := 0
Expand Down
9 changes: 0 additions & 9 deletions gripql/inspect/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,6 @@ func arrayEq(a, b []string) bool {
return true
}

func contains(a []string, n string) bool {
for _, c := range a {
if c == n {
return true
}
}
return false
}

// PipelineSteps create an array, the same length at stmts that labels the
// step id for each of the GraphStatements
func PipelineSteps(stmts []*gripql.GraphStatement) []string {
Expand Down
28 changes: 15 additions & 13 deletions kvgraph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,12 @@ import (
"github.com/bmeg/grip/kvi"
"github.com/bmeg/grip/kvindex"
"github.com/bmeg/grip/log"
"github.com/bmeg/grip/util/setcmp"
"google.golang.org/protobuf/proto"

multierror "github.com/hashicorp/go-multierror"
)

func contains(a []string, v string) bool {
for _, i := range a {
if i == v {
return true
}
}
return false
}

// GetTimestamp returns the update timestamp
func (kgdb *KVInterfaceGDB) GetTimestamp() string {
return kgdb.kvg.ts.Get(kgdb.graph)
Expand Down Expand Up @@ -199,6 +191,10 @@ func (kgdb *KVInterfaceGDB) DelEdge(eid string) error {
if err := kgdb.kvg.kv.Delete(dkey); err != nil {
return err
}
if err := kgdb.kvg.idx.RemoveDoc(eid); err != nil {
return err
}

kgdb.kvg.ts.Touch(kgdb.graph)
return nil
}
Expand Down Expand Up @@ -235,11 +231,17 @@ func (kgdb *KVInterfaceGDB) DelVertex(id string) error {
if err := tx.Delete(vid); err != nil {
return err
}

for _, k := range delKeys {
if err := tx.Delete(k); err != nil {
return err
}
}

if err := kgdb.kvg.idx.RemoveDoc(kvindex.FieldKeyParse(vid)); err != nil {
return err
}

kgdb.kvg.ts.Touch(kgdb.graph)
return nil
})
Expand Down Expand Up @@ -378,7 +380,7 @@ func (kgdb *KVInterfaceGDB) GetOutChannel(ctx context.Context, reqChan chan gdbi
for it.Seek(skeyPrefix); it.Valid() && bytes.HasPrefix(it.Key(), skeyPrefix); it.Next() {
keyValue := it.Key()
_, _, dst, _, label, etype := SrcEdgeKeyParse(keyValue)
if len(edgeLabels) == 0 || contains(edgeLabels, label) {
if len(edgeLabels) == 0 || setcmp.ContainsString(edgeLabels, label) {
vkey := VertexKey(kgdb.graph, dst)
if etype == edgeSingle {
vertexChan <- elementData{
Expand Down Expand Up @@ -456,7 +458,7 @@ func (kgdb *KVInterfaceGDB) GetInChannel(ctx context.Context, reqChan chan gdbi.
for it.Seek(dkeyPrefix); it.Valid() && bytes.HasPrefix(it.Key(), dkeyPrefix); it.Next() {
keyValue := it.Key()
_, src, _, _, label, _ := DstEdgeKeyParse(keyValue)
if len(edgeLabels) == 0 || contains(edgeLabels, label) {
if len(edgeLabels) == 0 || setcmp.ContainsString(edgeLabels, label) {
vkey := VertexKey(kgdb.graph, src)
dataValue, err := it.Get(vkey)
if err == nil {
Expand Down Expand Up @@ -506,7 +508,7 @@ func (kgdb *KVInterfaceGDB) GetOutEdgeChannel(ctx context.Context, reqChan chan
for it.Seek(skeyPrefix); it.Valid() && bytes.HasPrefix(it.Key(), skeyPrefix); it.Next() {
keyValue := it.Key()
_, src, dst, eid, label, edgeType := SrcEdgeKeyParse(keyValue)
if len(edgeLabels) == 0 || contains(edgeLabels, label) {
if len(edgeLabels) == 0 || setcmp.ContainsString(edgeLabels, label) {
if edgeType == edgeSingle {
e := gdbi.Edge{}
if load {
Expand Down Expand Up @@ -563,7 +565,7 @@ func (kgdb *KVInterfaceGDB) GetInEdgeChannel(ctx context.Context, reqChan chan g
for it.Seek(dkeyPrefix); it.Valid() && bytes.HasPrefix(it.Key(), dkeyPrefix); it.Next() {
keyValue := it.Key()
_, src, dst, eid, label, edgeType := DstEdgeKeyParse(keyValue)
if len(edgeLabels) == 0 || contains(edgeLabels, label) {
if len(edgeLabels) == 0 || setcmp.ContainsString(edgeLabels, label) {
if edgeType == edgeSingle {
e := gdbi.Edge{}
if load {
Expand Down
19 changes: 10 additions & 9 deletions kvgraph/test/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"

"github.com/bmeg/grip/kvindex"
"github.com/bmeg/grip/util/setcmp"
)

var docs = `[
Expand Down Expand Up @@ -61,7 +62,7 @@ func TestFieldListing(t *testing.T) {

count := 0
for _, field := range idx.ListFields() {
if !contains(newFields, field) {
if !setcmp.ContainsString(newFields, field) {
t.Errorf("Bad field return: %s", field)
}
count++
Expand Down Expand Up @@ -89,7 +90,7 @@ func TestLoadDoc(t *testing.T) {

count := 0
for d := range idx.GetTermMatch(context.Background(), "v.label", "Person", -1) {
if !contains(personDocs, d) {
if !setcmp.ContainsString(personDocs, d) {
t.Errorf("Bad doc return: %s", d)
}
count++
Expand All @@ -100,7 +101,7 @@ func TestLoadDoc(t *testing.T) {

count = 0
for d := range idx.GetTermMatch(context.Background(), "v.data.firstName", "Bob", -1) {
if !contains(bobDocs, d) {
if !setcmp.ContainsString(bobDocs, d) {
t.Errorf("Bad doc return: %s", d)
}
count++
Expand Down Expand Up @@ -128,7 +129,7 @@ func TestTermEnum(t *testing.T) {
count := 0
for d := range idx.FieldTerms("v.data.lastName") {
count++
if !contains(lastNames, d.(string)) {
if !setcmp.ContainsString(lastNames, d.(string)) {
t.Errorf("Bad term return: %s", d)
}
}
Expand All @@ -139,7 +140,7 @@ func TestTermEnum(t *testing.T) {
count = 0
for d := range idx.FieldTerms("v.data.firstName") {
count++
if !contains(firstNames, d.(string)) {
if !setcmp.ContainsString(firstNames, d.(string)) {
t.Errorf("Bad term return: %s", d)
}
}
Expand All @@ -166,7 +167,7 @@ func TestTermCount(t *testing.T) {
count := 0
for d := range idx.FieldStringTermCounts("v.data.lastName") {
count++
if !contains(lastNames, d.String) {
if !setcmp.ContainsString(lastNames, d.String) {
t.Errorf("Bad term return: %s", d.String)
}
if d.String == "Smith" {
Expand All @@ -182,7 +183,7 @@ func TestTermCount(t *testing.T) {
count = 0
for d := range idx.FieldTermCounts("v.data.firstName") {
count++
if !contains(firstNames, d.String) {
if !setcmp.ContainsString(firstNames, d.String) {
t.Errorf("Bad term return: %s", d.String)
}
}
Expand Down Expand Up @@ -212,7 +213,7 @@ func TestDocDelete(t *testing.T) {
count := 0
for d := range idx.FieldStringTermCounts("v.data.lastName") {
count++
if !contains(lastNames, d.String) {
if !setcmp.ContainsString(lastNames, d.String) {
t.Errorf("Bad term return: %s", d.String)
}
if d.String == "Smith" {
Expand All @@ -233,7 +234,7 @@ func TestDocDelete(t *testing.T) {
count = 0
for d := range idx.FieldStringTermCounts("v.data.lastName") {
count++
if !contains(lastNames, d.String) {
if !setcmp.ContainsString(lastNames, d.String) {
t.Errorf("Bad term return: %s", d.String)
}
if d.String == "Smith" {
Expand Down
9 changes: 0 additions & 9 deletions kvgraph/test/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,6 @@ func resetKVInterface() {
}
}

func contains(a []string, v string) bool {
for _, i := range a {
if i == v {
return true
}
}
return false
}

func TestMain(m *testing.M) {
var err error
var exit = 1
Expand Down
23 changes: 2 additions & 21 deletions kvi/pebbledb/pebble_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,27 +69,8 @@ func (pdb *PebbleKV) Delete(id []byte) error {

// DeletePrefix deletes all elements in kvstore that begin with prefix `id`
func (pdb *PebbleKV) DeletePrefix(prefix []byte) error {
deleteBlockSize := 10000
for found := true; found; {
found = false
wb := make([][]byte, 0, deleteBlockSize)
it, err := pdb.db.NewIter(&pebble.IterOptions{LowerBound: prefix})
if err != nil {
return err
}
for ; it.Valid() && bytes.HasPrefix(it.Key(), prefix) && len(wb) < deleteBlockSize-1; it.Next() {
wb = append(wb, copyBytes(it.Key()))
}
it.Close()
for _, i := range wb {
err := pdb.db.Delete(i, nil)
if err != nil {
return err
}
found = true
}
}
return nil
nextPrefix := append(prefix, 0xFF)
return pdb.db.DeleteRange(prefix, nextPrefix, nil)
}

// HasKey returns true if the key is exists in kvstore
Expand Down
9 changes: 6 additions & 3 deletions kvindex/entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"encoding/binary"
"fmt"
"math"

"github.com/bmeg/grip/util/setcmp"
)

// TermType defines in a term is a Number or a String
Expand Down Expand Up @@ -37,18 +39,19 @@ func fieldScan(docID string, doc map[string]interface{}, fieldPrefix string, fie
if containsPrefix(f, fields) {
if x, ok := v.(map[string]interface{}); ok {
fieldScan(docID, x, fmt.Sprintf("%s.%s", fieldPrefix, k), fields, out)
} else if contains(f, fields) {
} else if setcmp.ContainsString(fields, f) {
out <- newEntry(docID, f, v)
}
}
}
}

func mapDig(i map[string]interface{}, path []string) interface{} {
// Given a list of fields (graphs), return term (label) of doc (graph element) if it exists on field
func getTermOnField(i map[string]interface{}, path []string) interface{} {
if x, ok := i[path[0]]; ok {
if len(path) > 1 {
if y, ok := x.(map[string]interface{}); ok {
return mapDig(y, path[1:])
return getTermOnField(y, path[1:])
}
} else {
return x
Expand Down
8 changes: 6 additions & 2 deletions kvindex/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,25 @@ import (

// Fields
// key: f | field
// Known as 'graph' in grip
// val:
var idxFieldPrefix = []byte("f")

// Terms
// key: t | field | TermType | term
// key: t | field | TermType'
// Known as 'label' in grip
// val: count
var idxTermPrefix = []byte("t")

// Entries
// key: i | field | TermType | term | docid
// What links the graph + label to the doc via an id
// val:
var idxEntryPrefix = []byte("i")

// Docs
// key: d | docid
// Known as 'data' in grip
// val: Doc entry list
var idxDocPrefix = []byte("D")

Expand Down Expand Up @@ -75,7 +79,7 @@ func EntryPrefix(field string) []byte {
return bytes.Join([][]byte{idxEntryPrefix, []byte(field), {}}, []byte{0})
}

// EntryTypePrefix get prefix for all entries for a single field
// EntryTypePrefix get prefix for all entries for a single field an a single type
func EntryTypePrefix(field string, ttype TermType) []byte {
return bytes.Join([][]byte{idxEntryPrefix, []byte(field), {byte(ttype)}, {}}, []byte{0})
}
Expand Down
Loading

0 comments on commit 96023b5

Please sign in to comment.