Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix drop command for kv store #322

Merged
merged 3 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
add delete operations to drop command
  • Loading branch information
matthewpeterkort committed Feb 5, 2025
commit c55c2f0df3dfc4839a8033158b8f9a9ce4cdc203
5 changes: 3 additions & 2 deletions engine/core/processors_has.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/bmeg/grip/engine/logic"
"github.com/bmeg/grip/gdbi"
"github.com/bmeg/grip/gripql"
"github.com/bmeg/grip/util/setcmp"
)

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -50,7 +51,7 @@ func (h *HasLabel) Process(ctx context.Context, man gdbi.Manager, in gdbi.InPipe
out <- t
continue
}
if contains(labels, t.GetCurrent().Get().Label) {
if setcmp.ContainsString(labels, t.GetCurrent().Get().Label) {
out <- t
}
}
Expand Down Expand Up @@ -106,7 +107,7 @@ func (h *HasID) Process(ctx context.Context, man gdbi.Manager, in gdbi.InPipe, o
out <- t
continue
}
if contains(ids, t.GetCurrentID()) {
if setcmp.ContainsString(ids, t.GetCurrentID()) {
out <- t
}
}
Expand Down
9 changes: 0 additions & 9 deletions engine/core/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,6 @@ func debug(i ...interface{}) {
pretty.Println(i...)
}

func contains(a []string, v string) bool {
for _, i := range a {
if i == v {
return true
}
}
return false
}

func dedupStringSlice(s []string) []string {
seen := make(map[string]struct{}, len(s))
j := 0
Expand Down
9 changes: 0 additions & 9 deletions gripql/inspect/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,6 @@ func arrayEq(a, b []string) bool {
return true
}

func contains(a []string, n string) bool {
for _, c := range a {
if c == n {
return true
}
}
return false
}

// PipelineSteps create an array, the same length at stmts that labels the
// step id for each of the GraphStatements
func PipelineSteps(stmts []*gripql.GraphStatement) []string {
Expand Down
71 changes: 58 additions & 13 deletions kvgraph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,12 @@ import (
"github.com/bmeg/grip/kvi"
"github.com/bmeg/grip/kvindex"
"github.com/bmeg/grip/log"
"github.com/bmeg/grip/util/setcmp"
"google.golang.org/protobuf/proto"

multierror "github.com/hashicorp/go-multierror"
)

func contains(a []string, v string) bool {
for _, i := range a {
if i == v {
return true
}
}
return false
}

// GetTimestamp returns the update timestamp
func (kgdb *KVInterfaceGDB) GetTimestamp() string {
return kgdb.kvg.ts.Get(kgdb.graph)
Expand Down Expand Up @@ -199,6 +191,10 @@ func (kgdb *KVInterfaceGDB) DelEdge(eid string) error {
if err := kgdb.kvg.kv.Delete(dkey); err != nil {
return err
}
if err := kgdb.kvg.idx.RemoveDoc(eid); err != nil {
return err
}

kgdb.kvg.ts.Touch(kgdb.graph)
return nil
}
Expand Down Expand Up @@ -235,6 +231,10 @@ func (kgdb *KVInterfaceGDB) DelVertex(id string) error {
if err := tx.Delete(vid); err != nil {
return err
}
if err := kgdb.kvg.idx.RemoveDoc(kvindex.FieldKeyParse(vid)); err != nil {
return err
}

for _, k := range delKeys {
if err := tx.Delete(k); err != nil {
return err
Expand Down Expand Up @@ -378,7 +378,7 @@ func (kgdb *KVInterfaceGDB) GetOutChannel(ctx context.Context, reqChan chan gdbi
for it.Seek(skeyPrefix); it.Valid() && bytes.HasPrefix(it.Key(), skeyPrefix); it.Next() {
keyValue := it.Key()
_, _, dst, _, label, etype := SrcEdgeKeyParse(keyValue)
if len(edgeLabels) == 0 || contains(edgeLabels, label) {
if len(edgeLabels) == 0 || setcmp.ContainsString(edgeLabels, label) {
vkey := VertexKey(kgdb.graph, dst)
if etype == edgeSingle {
vertexChan <- elementData{
Expand Down Expand Up @@ -456,7 +456,7 @@ func (kgdb *KVInterfaceGDB) GetInChannel(ctx context.Context, reqChan chan gdbi.
for it.Seek(dkeyPrefix); it.Valid() && bytes.HasPrefix(it.Key(), dkeyPrefix); it.Next() {
keyValue := it.Key()
_, src, _, _, label, _ := DstEdgeKeyParse(keyValue)
if len(edgeLabels) == 0 || contains(edgeLabels, label) {
if len(edgeLabels) == 0 || setcmp.ContainsString(edgeLabels, label) {
vkey := VertexKey(kgdb.graph, src)
dataValue, err := it.Get(vkey)
if err == nil {
Expand Down Expand Up @@ -506,7 +506,7 @@ func (kgdb *KVInterfaceGDB) GetOutEdgeChannel(ctx context.Context, reqChan chan
for it.Seek(skeyPrefix); it.Valid() && bytes.HasPrefix(it.Key(), skeyPrefix); it.Next() {
keyValue := it.Key()
_, src, dst, eid, label, edgeType := SrcEdgeKeyParse(keyValue)
if len(edgeLabels) == 0 || contains(edgeLabels, label) {
if len(edgeLabels) == 0 || setcmp.ContainsString(edgeLabels, label) {
if edgeType == edgeSingle {
e := gdbi.Edge{}
if load {
Expand Down Expand Up @@ -563,7 +563,7 @@ func (kgdb *KVInterfaceGDB) GetInEdgeChannel(ctx context.Context, reqChan chan g
for it.Seek(dkeyPrefix); it.Valid() && bytes.HasPrefix(it.Key(), dkeyPrefix); it.Next() {
keyValue := it.Key()
_, src, dst, eid, label, edgeType := DstEdgeKeyParse(keyValue)
if len(edgeLabels) == 0 || contains(edgeLabels, label) {
if len(edgeLabels) == 0 || setcmp.ContainsString(edgeLabels, label) {
if edgeType == edgeSingle {
e := gdbi.Edge{}
if load {
Expand Down Expand Up @@ -678,6 +678,51 @@ func (kgdb *KVInterfaceGDB) GetVertexList(ctx context.Context, loadProp bool) <-
return o
}

func (kgdb *KVInterfaceGDB) DeleteAllData(ctx context.Context, graph string) error {
go func() {
kgdb.kvg.kv.View(func(it kvi.KVIterator) error {
ePrefix := EdgeListPrefix(graph)
for it.Seek(ePrefix); it.Valid() && bytes.HasPrefix(it.Key(), ePrefix); it.Next() {
select {
case <-ctx.Done():
return nil
default:
}
keyValue := it.Key()
_, eid, _, _, _, etype := EdgeKeyParse(keyValue)
if etype == edgeSingle {
kgdb.DelEdge(string(eid))
}
}
return nil
})
}()

go func() {
kgdb.kvg.kv.View(func(it kvi.KVIterator) error {
vPrefix := VertexListPrefix(graph)

for it.Seek(vPrefix); it.Valid() && bytes.HasPrefix(it.Key(), vPrefix); it.Next() {
select {
case <-ctx.Done():
return nil
default:
}
gv := &gripql.Vertex{}
dataValue, _ := it.Value()
proto.Unmarshal(dataValue, gv)
keyValue := it.Key()
_, vid := VertexKeyParse(keyValue)
_ = kgdb.DelVertex(vid)

}
return nil
})
}()

return nil
}

// ListVertexLabels returns a list of vertex types in the graph
func (kgdb *KVInterfaceGDB) ListVertexLabels() ([]string, error) {
labelField := fmt.Sprintf("%s.v.label", kgdb.graph)
Expand Down
4 changes: 4 additions & 0 deletions kvgraph/graphdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kvgraph

import (
"bytes"
"context"
"fmt"

"github.com/bmeg/grip/gdbi"
Expand Down Expand Up @@ -43,6 +44,9 @@ func (kgraph *KVGraph) DeleteGraph(graph string) error {
graphKey := GraphKey(graph)
kgraph.kv.Delete(graphKey)

kvgdb := KVInterfaceGDB{kvg: kgraph, graph: graph}
kvgdb.DeleteAllData(context.Background(), graph)

kgraph.deleteGraphIndex(graph)

return nil
Expand Down
19 changes: 10 additions & 9 deletions kvgraph/test/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"

"github.com/bmeg/grip/kvindex"
"github.com/bmeg/grip/util/setcmp"
)

var docs = `[
Expand Down Expand Up @@ -61,7 +62,7 @@ func TestFieldListing(t *testing.T) {

count := 0
for _, field := range idx.ListFields() {
if !contains(newFields, field) {
if !setcmp.ContainsString(newFields, field) {
t.Errorf("Bad field return: %s", field)
}
count++
Expand Down Expand Up @@ -89,7 +90,7 @@ func TestLoadDoc(t *testing.T) {

count := 0
for d := range idx.GetTermMatch(context.Background(), "v.label", "Person", -1) {
if !contains(personDocs, d) {
if !setcmp.ContainsString(personDocs, d) {
t.Errorf("Bad doc return: %s", d)
}
count++
Expand All @@ -100,7 +101,7 @@ func TestLoadDoc(t *testing.T) {

count = 0
for d := range idx.GetTermMatch(context.Background(), "v.data.firstName", "Bob", -1) {
if !contains(bobDocs, d) {
if !setcmp.ContainsString(bobDocs, d) {
t.Errorf("Bad doc return: %s", d)
}
count++
Expand Down Expand Up @@ -128,7 +129,7 @@ func TestTermEnum(t *testing.T) {
count := 0
for d := range idx.FieldTerms("v.data.lastName") {
count++
if !contains(lastNames, d.(string)) {
if !setcmp.ContainsString(lastNames, d.(string)) {
t.Errorf("Bad term return: %s", d)
}
}
Expand All @@ -139,7 +140,7 @@ func TestTermEnum(t *testing.T) {
count = 0
for d := range idx.FieldTerms("v.data.firstName") {
count++
if !contains(firstNames, d.(string)) {
if !setcmp.ContainsString(firstNames, d.(string)) {
t.Errorf("Bad term return: %s", d)
}
}
Expand All @@ -166,7 +167,7 @@ func TestTermCount(t *testing.T) {
count := 0
for d := range idx.FieldStringTermCounts("v.data.lastName") {
count++
if !contains(lastNames, d.String) {
if !setcmp.ContainsString(lastNames, d.String) {
t.Errorf("Bad term return: %s", d.String)
}
if d.String == "Smith" {
Expand All @@ -182,7 +183,7 @@ func TestTermCount(t *testing.T) {
count = 0
for d := range idx.FieldTermCounts("v.data.firstName") {
count++
if !contains(firstNames, d.String) {
if !setcmp.ContainsString(firstNames, d.String) {
t.Errorf("Bad term return: %s", d.String)
}
}
Expand Down Expand Up @@ -212,7 +213,7 @@ func TestDocDelete(t *testing.T) {
count := 0
for d := range idx.FieldStringTermCounts("v.data.lastName") {
count++
if !contains(lastNames, d.String) {
if !setcmp.ContainsString(lastNames, d.String) {
t.Errorf("Bad term return: %s", d.String)
}
if d.String == "Smith" {
Expand All @@ -233,7 +234,7 @@ func TestDocDelete(t *testing.T) {
count = 0
for d := range idx.FieldStringTermCounts("v.data.lastName") {
count++
if !contains(lastNames, d.String) {
if !setcmp.ContainsString(lastNames, d.String) {
t.Errorf("Bad term return: %s", d.String)
}
if d.String == "Smith" {
Expand Down
9 changes: 0 additions & 9 deletions kvgraph/test/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,6 @@ func resetKVInterface() {
}
}

func contains(a []string, v string) bool {
for _, i := range a {
if i == v {
return true
}
}
return false
}

func TestMain(m *testing.M) {
var err error
var exit = 1
Expand Down
9 changes: 6 additions & 3 deletions kvindex/entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"encoding/binary"
"fmt"
"math"

"github.com/bmeg/grip/util/setcmp"
)

// TermType defines in a term is a Number or a String
Expand Down Expand Up @@ -37,18 +39,19 @@ func fieldScan(docID string, doc map[string]interface{}, fieldPrefix string, fie
if containsPrefix(f, fields) {
if x, ok := v.(map[string]interface{}); ok {
fieldScan(docID, x, fmt.Sprintf("%s.%s", fieldPrefix, k), fields, out)
} else if contains(f, fields) {
} else if setcmp.ContainsString(fields, f) {
out <- newEntry(docID, f, v)
}
}
}
}

func mapDig(i map[string]interface{}, path []string) interface{} {
// Given a list of fields (graphs), return term (label) of doc (graph element) if it exists on field
func getTermOnField(i map[string]interface{}, path []string) interface{} {
if x, ok := i[path[0]]; ok {
if len(path) > 1 {
if y, ok := x.(map[string]interface{}); ok {
return mapDig(y, path[1:])
return getTermOnField(y, path[1:])
}
} else {
return x
Expand Down
8 changes: 6 additions & 2 deletions kvindex/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,25 @@ import (

// Fields
// key: f | field
// Known as 'graph' in grip
// val:
var idxFieldPrefix = []byte("f")

// Terms
// key: t | field | TermType | term
// key: t | field | TermType'
// Known as 'label' in grip
// val: count
var idxTermPrefix = []byte("t")

// Entries
// key: i | field | TermType | term | docid
// What links the graph + label to the doc via an id
// val:
var idxEntryPrefix = []byte("i")

// Docs
// key: d | docid
// Known as 'data' in grip
// val: Doc entry list
var idxDocPrefix = []byte("D")

Expand Down Expand Up @@ -75,7 +79,7 @@ func EntryPrefix(field string) []byte {
return bytes.Join([][]byte{idxEntryPrefix, []byte(field), {}}, []byte{0})
}

// EntryTypePrefix get prefix for all entries for a single field
// EntryTypePrefix get prefix for all entries for a single field an a single type
func EntryTypePrefix(field string, ttype TermType) []byte {
return bytes.Join([][]byte{idxEntryPrefix, []byte(field), {byte(ttype)}, {}}, []byte{0})
}
Expand Down
Loading
Loading