This repository has been archived by the owner on Jan 28, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathmemorydatabase.go
91 lines (81 loc) · 2.78 KB
/
memorydatabase.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package naffka
import (
"fmt"
"sync"
"github.com/matrix-org/naffka/types"
)
// A MemoryDatabase stores the message history as slices in memory.
// It can be used to run unit tests.
// If the process is stopped then any messages that haven't been
// processed by a consumer are lost forever and all offsets become
// invalid.
//
// The zero value is ready to use: the topics map is allocated lazily by
// getTopic the first time a topic is requested.
type MemoryDatabase struct {
// topicsMutex guards the topics map.
topicsMutex sync.Mutex
// topics maps a topic name to the in-memory message log for that topic.
topics map[string]*memoryDatabaseTopic
}
// memoryDatabaseTopic holds the messages stored for a single topic.
type memoryDatabaseTopic struct {
// messagesMutex guards the messages slice.
messagesMutex sync.Mutex
// messages is the topic's log. Within this file it is only ever appended
// to, and a batch is accepted only when the Offset of its first message
// equals the current length of the log.
messages []types.Message
}
// addMessages appends msgs to the end of the topic's message log.
//
// The Offset of the first message must equal the number of messages already
// stored; only the first message of the batch is checked. If it does not
// match, an error is returned and nothing is stored. An empty msgs is a no-op
// that returns nil.
func (t *memoryDatabaseTopic) addMessages(msgs []types.Message) error {
	// Nothing to store. Bail out before msgs[0] is read below, which would
	// otherwise panic with an index out of range on an empty batch.
	if len(msgs) == 0 {
		return nil
	}

	t.messagesMutex.Lock()
	defer t.messagesMutex.Unlock()

	// The second value reported in the error is the current message count,
	// i.e. the offset the batch was expected to start at.
	if int64(len(t.messages)) != msgs[0].Offset {
		return fmt.Errorf("message offset %d is not immediately after the previous offset %d", msgs[0].Offset, len(t.messages))
	}
	t.messages = append(t.messages, msgs...)
	return nil
}
// getMessages returns a snapshot of the messages currently stored for the
// topic.
// The returned slice carries its own copy of the length, so messages that
// addMessages appends later do not show up in it.
// It may share its backing array with the slice that new messages are
// appended to. Reading the returned elements is safe because the stored slice
// is only ever appended to; writing to or appending to the returned slice is
// not safe.
func (t *memoryDatabaseTopic) getMessages() []types.Message {
	t.messagesMutex.Lock()
	snapshot := t.messages
	t.messagesMutex.Unlock()
	return snapshot
}
// getTopic returns the in-memory log for topicName, creating and registering
// an empty one if the topic has no (non-nil) entry yet. The topics map itself
// is allocated on first use, so a zero-value MemoryDatabase works.
func (m *MemoryDatabase) getTopic(topicName string) *memoryDatabaseTopic {
	m.topicsMutex.Lock()
	defer m.topicsMutex.Unlock()

	// Reading from a nil map is fine and simply yields nil.
	if existing := m.topics[topicName]; existing != nil {
		return existing
	}

	// Writing to a nil map would panic, so allocate it before inserting.
	if m.topics == nil {
		m.topics = make(map[string]*memoryDatabaseTopic)
	}
	created := new(memoryDatabaseTopic)
	m.topics[topicName] = created
	return created
}
// StoreMessages implements Database.
// It looks up the topic's in-memory log via getTopic (which creates it on
// first use) and appends the messages to it, returning whatever error
// addMessages reports.
func (m *MemoryDatabase) StoreMessages(topic string, messages []types.Message) error {
	target := m.getTopic(topic)
	return target.addMessages(messages)
}
// FetchMessages implements Database.
// It returns the messages stored for topic at positions startOffset
// (inclusive) up to endOffset (exclusive) of the topic's in-memory log.
//
// An error is returned if endOffset is beyond the number of stored messages,
// if startOffset is not strictly less than endOffset, or if startOffset is
// negative; the checks run in that order.
//
// The returned slice shares its backing array with the stored log, so its
// elements must not be modified. Its capacity is capped at its length, so a
// caller that appends to it gets a fresh array instead of overwriting
// messages stored after endOffset.
//
// Note that looking up the topic goes through getTopic, so fetching from an
// unknown topic registers it as an empty topic.
func (m *MemoryDatabase) FetchMessages(topic string, startOffset, endOffset int64) ([]types.Message, error) {
	messages := m.getTopic(topic).getMessages()

	switch {
	case endOffset > int64(len(messages)):
		return nil, fmt.Errorf("end offset %d out of range %d", endOffset, len(messages))
	case startOffset >= endOffset:
		return nil, fmt.Errorf("start offset %d greater than or equal to end offset %d", startOffset, endOffset)
	case startOffset < 0:
		return nil, fmt.Errorf("start offset %d less than 0", startOffset)
	}

	// Full slice expression: limit the capacity to endOffset so the result
	// cannot grow into the part of the backing array that holds later
	// messages.
	return messages[startOffset:endOffset:endOffset], nil
}
// MaxOffsets implements Database.
// For every topic currently registered it reports the number of stored
// messages minus one, so a topic with no messages is reported as -1.
// The returned map is never nil and the error is always nil.
func (m *MemoryDatabase) MaxOffsets() (map[string]int64, error) {
	m.topicsMutex.Lock()
	defer m.topicsMutex.Unlock()

	offsets := make(map[string]int64, len(m.topics))
	for topicName, topic := range m.topics {
		count := len(topic.getMessages())
		offsets[topicName] = int64(count) - 1
	}
	return offsets, nil
}