Skip to content

Commit

Permalink
Allow WriteOptions to be given if desired.
Browse files Browse the repository at this point in the history
  • Loading branch information
Psykar committed Aug 24, 2020
1 parent d618510 commit 989e5fc
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 20 deletions.
17 changes: 9 additions & 8 deletions prefix_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/opt"
)

// prefixDelimiter defines the delimiter used to separate a prefix from an
Expand Down Expand Up @@ -71,7 +72,7 @@ func OpenPrefixQueue(dataDir string) (*PrefixQueue, error) {
}

// Enqueue adds an item to the queue.
func (pq *PrefixQueue) Enqueue(prefix, value []byte) (*Item, error) {
func (pq *PrefixQueue) Enqueue(prefix, value []byte, opts ...*opt.WriteOptions) (*Item, error) {
pq.Lock()
defer pq.Unlock()

Expand All @@ -94,7 +95,7 @@ func (pq *PrefixQueue) Enqueue(prefix, value []byte) (*Item, error) {
}

// Add it to the queue.
if err := pq.db.Put(item.Key, item.Value, nil); err != nil {
if err := pq.db.Put(item.Key, item.Value, getOpts(opts)); err != nil {
return nil, err
}

Expand Down Expand Up @@ -248,7 +249,7 @@ func (pq *PrefixQueue) PeekByIDString(prefix string, id uint64) (*Item, error) {
}

// Update updates an item in the given queue without changing its position.
func (pq *PrefixQueue) Update(prefix []byte, id uint64, newValue []byte) (*Item, error) {
func (pq *PrefixQueue) Update(prefix []byte, id uint64, newValue []byte, opts ...*opt.WriteOptions) (*Item, error) {
pq.Lock()
defer pq.Unlock()

Expand Down Expand Up @@ -276,7 +277,7 @@ func (pq *PrefixQueue) Update(prefix []byte, id uint64, newValue []byte) (*Item,
}

// Update this item in the queue.
if err := pq.db.Put(item.Key, item.Value, nil); err != nil {
if err := pq.db.Put(item.Key, item.Value, getOpts(opts)); err != nil {
return nil, err
}

Expand Down Expand Up @@ -392,7 +393,7 @@ func (pq *PrefixQueue) getOrCreateQueue(prefix []byte) (*queue, error) {
}

// savePrefixQueue saves the given queue for the given prefix.
func (pq *PrefixQueue) saveQueue(prefix []byte, q *queue) error {
func (pq *PrefixQueue) saveQueue(prefix []byte, q *queue, opts ...*opt.WriteOptions) error {
// Encode the queue using gob.
var buffer bytes.Buffer
enc := gob.NewEncoder(&buffer)
Expand All @@ -401,14 +402,14 @@ func (pq *PrefixQueue) saveQueue(prefix []byte, q *queue) error {
}

// Save it to the database.
return pq.db.Put(generateKeyPrefixData(prefix), buffer.Bytes(), nil)
return pq.db.Put(generateKeyPrefixData(prefix), buffer.Bytes(), getOpts(opts))
}

// save saves the main prefix queue data.
func (pq *PrefixQueue) save() error {
func (pq *PrefixQueue) save(opts ...*opt.WriteOptions) error {
val := make([]byte, 8)
binary.BigEndian.PutUint64(val, pq.size)
return pq.db.Put(pq.getDataKey(), val, nil)
return pq.db.Put(pq.getDataKey(), val, getOpts(opts))
}

// getDataKey generates the main prefix queue data key.
Expand Down
9 changes: 5 additions & 4 deletions priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"

"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
)

Expand Down Expand Up @@ -82,7 +83,7 @@ func OpenPriorityQueue(dataDir string, order order) (*PriorityQueue, error) {
}

// Enqueue adds an item to the priority queue.
func (pq *PriorityQueue) Enqueue(priority uint8, value []byte) (*PriorityItem, error) {
func (pq *PriorityQueue) Enqueue(priority uint8, value []byte, opts ...*opt.WriteOptions) (*PriorityItem, error) {
pq.Lock()
defer pq.Unlock()

Expand All @@ -103,7 +104,7 @@ func (pq *PriorityQueue) Enqueue(priority uint8, value []byte) (*PriorityItem, e
}

// Add it to the priority queue.
if err := pq.db.Put(item.Key, item.Value, nil); err != nil {
if err := pq.db.Put(item.Key, item.Value, getOpts(opts)); err != nil {
return nil, err
}

Expand Down Expand Up @@ -264,7 +265,7 @@ func (pq *PriorityQueue) PeekByPriorityID(priority uint8, id uint64) (*PriorityI

// Update updates an item in the priority queue without changing its
// position.
func (pq *PriorityQueue) Update(priority uint8, id uint64, newValue []byte) (*PriorityItem, error) {
func (pq *PriorityQueue) Update(priority uint8, id uint64, newValue []byte, opts ...*opt.WriteOptions) (*PriorityItem, error) {
pq.Lock()
defer pq.Unlock()

Expand All @@ -287,7 +288,7 @@ func (pq *PriorityQueue) Update(priority uint8, id uint64, newValue []byte) (*Pr
}

// Update this item in the queue.
if err := pq.db.Put(item.Key, item.Value, nil); err != nil {
if err := pq.db.Put(item.Key, item.Value, getOpts(opts)); err != nil {
return nil, err
}

Expand Down
9 changes: 5 additions & 4 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"

"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
)

// Queue is a standard FIFO (first in, first out) queue.
Expand Down Expand Up @@ -55,7 +56,7 @@ func OpenQueue(dataDir string) (*Queue, error) {
}

// Enqueue adds an item to the queue.
func (q *Queue) Enqueue(value []byte) (*Item, error) {
func (q *Queue) Enqueue(value []byte, opts ...*opt.WriteOptions) (*Item, error) {
q.Lock()
defer q.Unlock()

Expand All @@ -72,7 +73,7 @@ func (q *Queue) Enqueue(value []byte) (*Item, error) {
}

// Add it to the queue.
if err := q.db.Put(item.Key, item.Value, nil); err != nil {
if err := q.db.Put(item.Key, item.Value, getOpts(opts)); err != nil {
return nil, err
}

Expand Down Expand Up @@ -188,7 +189,7 @@ func (q *Queue) PeekByID(id uint64) (*Item, error) {
}

// Update updates an item in the queue without changing its position.
func (q *Queue) Update(id uint64, newValue []byte) (*Item, error) {
func (q *Queue) Update(id uint64, newValue []byte, opts ...*opt.WriteOptions) (*Item, error) {
q.Lock()
defer q.Unlock()

Expand All @@ -210,7 +211,7 @@ func (q *Queue) Update(id uint64, newValue []byte) (*Item, error) {
}

// Update this item in the queue.
if err := q.db.Put(item.Key, item.Value, nil); err != nil {
if err := q.db.Put(item.Key, item.Value, getOpts(opts)); err != nil {
return nil, err
}

Expand Down
9 changes: 5 additions & 4 deletions stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"

"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
)

// Stack is a standard LIFO (last in, first out) stack.
Expand Down Expand Up @@ -55,7 +56,7 @@ func OpenStack(dataDir string) (*Stack, error) {
}

// Push adds an item to the stack.
func (s *Stack) Push(value []byte) (*Item, error) {
func (s *Stack) Push(value []byte, opts ...*opt.WriteOptions) (*Item, error) {
s.Lock()
defer s.Unlock()

Expand All @@ -72,7 +73,7 @@ func (s *Stack) Push(value []byte) (*Item, error) {
}

// Add it to the stack.
if err := s.db.Put(item.Key, item.Value, nil); err != nil {
if err := s.db.Put(item.Key, item.Value, getOpts(opts)); err != nil {
return nil, err
}

Expand Down Expand Up @@ -188,7 +189,7 @@ func (s *Stack) PeekByID(id uint64) (*Item, error) {
}

// Update updates an item in the stack without changing its position.
func (s *Stack) Update(id uint64, newValue []byte) (*Item, error) {
func (s *Stack) Update(id uint64, newValue []byte, opts ...*opt.WriteOptions) (*Item, error) {
s.Lock()
defer s.Unlock()

Expand All @@ -210,7 +211,7 @@ func (s *Stack) Update(id uint64, newValue []byte) (*Item, error) {
}

// Update this item in the stack.
if err := s.db.Put(item.Key, item.Value, nil); err != nil {
if err := s.db.Put(item.Key, item.Value, getOpts(opts)); err != nil {
return nil, err
}

Expand Down

0 comments on commit 989e5fc

Please sign in to comment.