Skip to content

Commit

Permalink
slow notify init
Browse files Browse the repository at this point in the history
  • Loading branch information
absolutelightning committed Jun 15, 2024
1 parent ecc3510 commit 70a434e
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 20 deletions.
3 changes: 1 addition & 2 deletions tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1261,6 +1261,7 @@ func TestTrackMutate_SeekPrefixWatch(t *testing.T) {
txn := r.Txn()
txn.TrackMutate(true)
txn.Insert([]byte("foobarbaz"), nil)

switch i {
case 0:
r = txn.Commit()
Expand All @@ -1274,8 +1275,6 @@ func TestTrackMutate_SeekPrefixWatch(t *testing.T) {
if hasAnyClosedMutateCh(r) {
t.Fatalf("bad")
}
fmt.Println("from test")
fmt.Println(rootWatch)

// Verify root and parent triggered, and leaf affected
select {
Expand Down
29 changes: 11 additions & 18 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ package adaptive

import (
"bytes"
"fmt"
"strings"
)

const defaultModifiedCache = 8192
Expand Down Expand Up @@ -413,23 +411,29 @@ func (t *Txn[T]) slowNotify() {
return
}
snapElem := snapIter.Front()
cmp := 0

// If we've exhausted the nodes in the new root, we know we need
// to invalidate everything that remains in the old snapshot. We
// know from the loop condition there's something in the old
// snapshot.
if rootIter.Front() == nil {
close(snapElem.getMutateCh())
if !isClosed(snapElem.getMutateCh()) {
close(snapElem.getMutateCh())
}
if snapElem.isLeaf() {
close(snapElem.getNodeLeaf().getMutateCh())
if !isClosed(snapElem.getNodeLeaf().getMutateCh()) {
close(snapElem.getNodeLeaf().getMutateCh())
}
}
snapIter.Next()
continue
}

// Do one string compare so we can check the various conditions
// below without repeating the compare.
cmp := strings.Compare(snapIter.Path(), rootIter.Path())
rootElem := rootIter.Front()
if snapElem.getId() < rootElem.getId() {
cmp = -1
}

// If the snapshot is behind the root, then we must have deleted
// this node during the transaction.
Expand All @@ -442,19 +446,9 @@ func (t *Txn[T]) slowNotify() {
continue
}

// If the snapshot is ahead of the root, then we must have added
// this node during the transaction.
if cmp > 0 {
rootIter.Next()
continue
}

// If we have the same path, then we need to see if we mutated a
// node and possibly the leaf.
rootElem := rootIter.Front()
if snapElem != rootElem {
fmt.Println("from slow notify")
fmt.Println(snapElem.getMutateCh())
close(snapElem.getMutateCh())
if snapElem.getNodeLeaf() != nil && (snapElem.getNodeLeaf() != rootElem.getNodeLeaf()) {
close(snapElem.getNodeLeaf().getMutateCh())
Expand Down Expand Up @@ -647,7 +641,6 @@ func (t *Txn[T]) trackChannel(node Node[T]) {
t.trackChnMap = make(map[chan struct{}]struct{})
}
t.trackChnMap[ch] = struct{}{}
node.setMutateCh(make(chan struct{}))
}

// isClosed returns true if the given channel is closed.
Expand Down

0 comments on commit 70a434e

Please sign in to comment.