-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsume_heartbeat_test.go
131 lines (119 loc) · 2.88 KB
/
consume_heartbeat_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package grq
import (
"fmt"
"strings"
"sync"
"testing"
"time"
"github.com/redis/go-redis/v9"
)
// testHeartbeatQueue is the queue name used by both TestGenerateOfflineQueue
// (which publishes into it) and TestHeartbeat (which reads from it).
const testHeartbeatQueue = "testHeartBeat"
// TestGenerateOfflineQueue fills the testHeartbeatQueue queue from a client
// that only publishes and never starts consuming, then closes that client.
//
// It publishes testSendLimit regular tasks followed by one task pushed with
// PublishFirst, and checks that Cancel and Age, when they fail, fail with the
// expected "not running" style messages. The test talks to a redis server at
// 127.0.0.1:6379.
func TestGenerateOfflineQueue(t *testing.T) {
	//t.SkipNow()
	rq3, err := NewFromOptions(testHeartbeatQueue, redis.Options{
		Network: "tcp",
		Addr:    "127.0.0.1:6379",
	})
	if err != nil {
		// Every step below calls a method on rq3, so there is nothing
		// meaningful left to run once construction has failed.
		t.Fatal(err)
	}
	t.Logf("Offline publisher %s started...", rq3.GetID())

	for i := 0; i < testSendLimit; i++ {
		err = rq3.Publish(fmt.Sprintf("task %v created on %s", i, time.Now().Format(time.Stamp)))
		if err != nil {
			t.Error(err)
		}
		t.Logf("Task %v published", i)
	}

	err = rq3.PublishFirst("this task will be executed as first one")
	if err != nil {
		t.Errorf("%s : while publishing 1st task", err)
	}

	// No consumer was started on rq3, so Cancel is allowed to fail, but only
	// with the "is not running" message.
	err = rq3.Cancel()
	if err != nil {
		if err.Error() != fmt.Sprintf("consumer %s is not running", testHeartbeatQueue) {
			t.Error(err)
		}
	}

	// Same for Age: an error is tolerated as long as it has the expected
	// "consumer ... of queue <name> is not running" shape.
	_, err = rq3.Age()
	if err != nil {
		if !strings.HasPrefix(err.Error(), "consumer") {
			t.Error(err)
		}
		if !strings.HasSuffix(err.Error(), fmt.Sprintf(" of queue %s is not running", testHeartbeatQueue)) {
			t.Error(err)
		}
	}

	err = rq3.Close()
	if err != nil {
		t.Error(err)
	}
	t.Logf("Tasks are created offline publisher is stopped")
}
// TestHeartbeat reads back the tasks stored in the testHeartbeatQueue queue.
//
// Steps:
//  1. GetTask must return the payload that was pushed with PublishFirst.
//  2. With a 10ms heartbeat, Consume must deliver testSendLimit messages.
//  3. A following GetTask must find nothing and report no error.
//  4. After Close, PublishFirst and a second Close may only fail with
//     "redis: client is closed".
//
// The test expects the queue to already hold the tasks published by
// TestGenerateOfflineQueue.
func TestHeartbeat(t *testing.T) {
	rq4, err := NewFromConnectionString(testHeartbeatQueue, DefaultConnectionString)
	if err != nil {
		// Every step below calls a method on rq4, so stop right away
		// instead of continuing with an unusable client.
		t.Fatalf("%s : while connecting to redis", err)
	}

	first, found, err := rq4.GetTask()
	if err != nil {
		t.Errorf("%s : while getting first task", err)
	}
	if !found {
		t.Errorf("first task not found?")
	}
	if first != "this task will be executed as first one" {
		t.Errorf("wrong first task payload")
	}
	t.Logf("First task payload is %s", first)

	rq4.SetHeartbeat(10 * time.Millisecond) // fast
	t.Logf("Consumer is starting...")

	hbwg := sync.WaitGroup{}
	hbwg.Add(1)
	go func() {
		// Deferred so that every exit path releases hbwg.Wait() below.
		defer hbwg.Done()

		feed, err := rq4.Consume()
		if err != nil {
			// t.Error (not t.Fatal) because this is not the test goroutine.
			// Return instead of ranging over feed: if feed is a nil channel
			// the range would block forever.
			t.Error(err)
			return
		}
		t.Logf("Feed created...")
		t.Log(feed)

		var i = 0
		for msg := range feed {
			t.Logf("Message %v received with payload >%s<", i, msg)
			i++
			// Stop once every task published with Publish has been seen.
			if i == testSendLimit {
				break
			}
		}
		t.Logf("We recovered %v messages from abandomed queue %s", i, rq4.GetQueueName())
	}()
	hbwg.Wait()

	// The queue is expected to be drained at this point.
	payload, found, err := rq4.GetTask()
	if payload != "" {
		t.Errorf("payload %s extracted from empty channel %s", payload, rq4.GetQueueName())
	}
	if found {
		t.Errorf("something extracted from empty channel %s", rq4.GetQueueName())
	}
	if err != nil {
		t.Error(err)
	}

	err = rq4.Close()
	if err != nil {
		t.Error(err)
	}

	err = rq4.PublishFirst("it will be rejected")
	if err != nil {
		if err.Error() != "redis: client is closed" {
			t.Errorf("%s : while publishing first task to be rejected because of closed channel", err)
		}
	}

	// Close the already closed client one more time to be sure.
	err = rq4.Close()
	if err != nil {
		if err.Error() != "redis: client is closed" {
			t.Error(err)
		}
	}
}