Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow WriteOptions to be given if desired. #26

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package goque

import "github.com/syndtr/goleveldb/leveldb/opt"

func getOpts(o []*opt.WriteOptions) *opt.WriteOptions {
if len(o) == 1 {
return o[1]
}
return nil
}
17 changes: 9 additions & 8 deletions prefix_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/opt"
)

// prefixDelimiter defines the delimiter used to separate a prefix from an
Expand Down Expand Up @@ -71,7 +72,7 @@ func OpenPrefixQueue(dataDir string) (*PrefixQueue, error) {
}

// Enqueue adds an item to the queue.
func (pq *PrefixQueue) Enqueue(prefix, value []byte) (*Item, error) {
func (pq *PrefixQueue) Enqueue(prefix, value []byte, opts ...*opt.WriteOptions) (*Item, error) {
pq.Lock()
defer pq.Unlock()

Expand All @@ -94,7 +95,7 @@ func (pq *PrefixQueue) Enqueue(prefix, value []byte) (*Item, error) {
}

// Add it to the queue.
if err := pq.db.Put(item.Key, item.Value, nil); err != nil {
if err := pq.db.Put(item.Key, item.Value, getOpts(opts)); err != nil {
return nil, err
}

Expand Down Expand Up @@ -248,7 +249,7 @@ func (pq *PrefixQueue) PeekByIDString(prefix string, id uint64) (*Item, error) {
}

// Update updates an item in the given queue without changing its position.
func (pq *PrefixQueue) Update(prefix []byte, id uint64, newValue []byte) (*Item, error) {
func (pq *PrefixQueue) Update(prefix []byte, id uint64, newValue []byte, opts ...*opt.WriteOptions) (*Item, error) {
pq.Lock()
defer pq.Unlock()

Expand Down Expand Up @@ -276,7 +277,7 @@ func (pq *PrefixQueue) Update(prefix []byte, id uint64, newValue []byte) (*Item,
}

// Update this item in the queue.
if err := pq.db.Put(item.Key, item.Value, nil); err != nil {
if err := pq.db.Put(item.Key, item.Value, getOpts(opts)); err != nil {
return nil, err
}

Expand Down Expand Up @@ -392,7 +393,7 @@ func (pq *PrefixQueue) getOrCreateQueue(prefix []byte) (*queue, error) {
}

// savePrefixQueue saves the given queue for the given prefix.
func (pq *PrefixQueue) saveQueue(prefix []byte, q *queue) error {
func (pq *PrefixQueue) saveQueue(prefix []byte, q *queue, opts ...*opt.WriteOptions) error {
// Encode the queue using gob.
var buffer bytes.Buffer
enc := gob.NewEncoder(&buffer)
Expand All @@ -401,14 +402,14 @@ func (pq *PrefixQueue) saveQueue(prefix []byte, q *queue) error {
}

// Save it to the database.
return pq.db.Put(generateKeyPrefixData(prefix), buffer.Bytes(), nil)
return pq.db.Put(generateKeyPrefixData(prefix), buffer.Bytes(), getOpts(opts))
}

// save saves the main prefix queue data.
func (pq *PrefixQueue) save() error {
func (pq *PrefixQueue) save(opts ...*opt.WriteOptions) error {
val := make([]byte, 8)
binary.BigEndian.PutUint64(val, pq.size)
return pq.db.Put(pq.getDataKey(), val, nil)
return pq.db.Put(pq.getDataKey(), val, getOpts(opts))
}

// getDataKey generates the main prefix queue data key.
Expand Down
9 changes: 5 additions & 4 deletions priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"

"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
)

Expand Down Expand Up @@ -82,7 +83,7 @@ func OpenPriorityQueue(dataDir string, order order) (*PriorityQueue, error) {
}

// Enqueue adds an item to the priority queue.
func (pq *PriorityQueue) Enqueue(priority uint8, value []byte) (*PriorityItem, error) {
func (pq *PriorityQueue) Enqueue(priority uint8, value []byte, opts ...*opt.WriteOptions) (*PriorityItem, error) {
pq.Lock()
defer pq.Unlock()

Expand All @@ -103,7 +104,7 @@ func (pq *PriorityQueue) Enqueue(priority uint8, value []byte) (*PriorityItem, e
}

// Add it to the priority queue.
if err := pq.db.Put(item.Key, item.Value, nil); err != nil {
if err := pq.db.Put(item.Key, item.Value, getOpts(opts)); err != nil {
return nil, err
}

Expand Down Expand Up @@ -264,7 +265,7 @@ func (pq *PriorityQueue) PeekByPriorityID(priority uint8, id uint64) (*PriorityI

// Update updates an item in the priority queue without changing its
// position.
func (pq *PriorityQueue) Update(priority uint8, id uint64, newValue []byte) (*PriorityItem, error) {
func (pq *PriorityQueue) Update(priority uint8, id uint64, newValue []byte, opts ...*opt.WriteOptions) (*PriorityItem, error) {
pq.Lock()
defer pq.Unlock()

Expand All @@ -287,7 +288,7 @@ func (pq *PriorityQueue) Update(priority uint8, id uint64, newValue []byte) (*Pr
}

// Update this item in the queue.
if err := pq.db.Put(item.Key, item.Value, nil); err != nil {
if err := pq.db.Put(item.Key, item.Value, getOpts(opts)); err != nil {
return nil, err
}

Expand Down
9 changes: 5 additions & 4 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"

"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
)

// Queue is a standard FIFO (first in, first out) queue.
Expand Down Expand Up @@ -55,7 +56,7 @@ func OpenQueue(dataDir string) (*Queue, error) {
}

// Enqueue adds an item to the queue.
func (q *Queue) Enqueue(value []byte) (*Item, error) {
func (q *Queue) Enqueue(value []byte, opts ...*opt.WriteOptions) (*Item, error) {
q.Lock()
defer q.Unlock()

Expand All @@ -72,7 +73,7 @@ func (q *Queue) Enqueue(value []byte) (*Item, error) {
}

// Add it to the queue.
if err := q.db.Put(item.Key, item.Value, nil); err != nil {
if err := q.db.Put(item.Key, item.Value, getOpts(opts)); err != nil {
return nil, err
}

Expand Down Expand Up @@ -188,7 +189,7 @@ func (q *Queue) PeekByID(id uint64) (*Item, error) {
}

// Update updates an item in the queue without changing its position.
func (q *Queue) Update(id uint64, newValue []byte) (*Item, error) {
func (q *Queue) Update(id uint64, newValue []byte, opts ...*opt.WriteOptions) (*Item, error) {
q.Lock()
defer q.Unlock()

Expand All @@ -210,7 +211,7 @@ func (q *Queue) Update(id uint64, newValue []byte) (*Item, error) {
}

// Update this item in the queue.
if err := q.db.Put(item.Key, item.Value, nil); err != nil {
if err := q.db.Put(item.Key, item.Value, getOpts(opts)); err != nil {
return nil, err
}

Expand Down
9 changes: 5 additions & 4 deletions stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"

"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
)

// Stack is a standard LIFO (last in, first out) stack.
Expand Down Expand Up @@ -55,7 +56,7 @@ func OpenStack(dataDir string) (*Stack, error) {
}

// Push adds an item to the stack.
func (s *Stack) Push(value []byte) (*Item, error) {
func (s *Stack) Push(value []byte, opts ...*opt.WriteOptions) (*Item, error) {
s.Lock()
defer s.Unlock()

Expand All @@ -72,7 +73,7 @@ func (s *Stack) Push(value []byte) (*Item, error) {
}

// Add it to the stack.
if err := s.db.Put(item.Key, item.Value, nil); err != nil {
if err := s.db.Put(item.Key, item.Value, getOpts(opts)); err != nil {
return nil, err
}

Expand Down Expand Up @@ -188,7 +189,7 @@ func (s *Stack) PeekByID(id uint64) (*Item, error) {
}

// Update updates an item in the stack without changing its position.
func (s *Stack) Update(id uint64, newValue []byte) (*Item, error) {
func (s *Stack) Update(id uint64, newValue []byte, opts ...*opt.WriteOptions) (*Item, error) {
s.Lock()
defer s.Unlock()

Expand All @@ -210,7 +211,7 @@ func (s *Stack) Update(id uint64, newValue []byte) (*Item, error) {
}

// Update this item in the stack.
if err := s.db.Put(item.Key, item.Value, nil); err != nil {
if err := s.db.Put(item.Key, item.Value, getOpts(opts)); err != nil {
return nil, err
}

Expand Down