-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathredis_srv.go
200 lines (162 loc) · 4.47 KB
/
redis_srv.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
// Redis service
package main
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"os"
"strings"
"time"
"github.com/go-redis/redis/v8"
)
// REDIS_BROADCAST_CHANNEL is the name of the Redis pub/sub channel used to
// broadcast messages to all the nodes. setupRedisListener subscribes to it
// (alongside the node's own channel), and sendResolveMessage publishes on it.
const REDIS_BROADCAST_CHANNEL = "webrtc_cdn"
// setupRedisListener connects to Redis, subscribes to the broadcast channel
// and to the node's own channel (node.id), and hands the payload of every
// received message to node.receiveRedisMessage.
//
// It returns immediately when the node is in stand-alone mode. Otherwise it
// loops forever: receive errors are logged and retried after 10 seconds, so
// the function only exits if something panics. In that case the panic is
// recovered and logged, and the subscription and client are closed.
//
// Configuration is read from environment variables:
//   - REDIS_HOST: server host (defaults to "localhost")
//   - REDIS_PORT: server port (defaults to "6379")
//   - REDIS_PASSWORD: password passed to the client (may be empty)
//   - REDIS_TLS: when exactly "YES", a TLS configuration is set on the client
func setupRedisListener(node *WebRTC_CDN_Node) {
	if node.standAlone {
		return
	}

	// Registered first so it runs last, after the connections below are closed.
	defer func() {
		if err := recover(); err != nil {
			switch x := err.(type) {
			case string:
				LogError(errors.New(x))
			case error:
				LogError(x)
			default:
				LogError(errors.New("could not connect to redis"))
			}
		}
		LogWarning("Connection to Redis lost!")
	}()

	// Load configuration
	redisHost := os.Getenv("REDIS_HOST")
	if redisHost == "" {
		redisHost = "localhost"
	}

	redisPort := os.Getenv("REDIS_PORT")
	if redisPort == "" {
		redisPort = "6379"
	}

	options := &redis.Options{
		Addr:     redisHost + ":" + redisPort,
		Password: os.Getenv("REDIS_PASSWORD"),
	}
	if os.Getenv("REDIS_TLS") == "YES" {
		options.TLSConfig = &tls.Config{}
	}

	ctx := context.Background()

	// Connect. The client is released when the listener exits; the Close
	// error is ignored because nothing useful can be done with it here.
	redisClient := redis.NewClient(options)
	defer redisClient.Close()

	// Subscribe to the channels. The subscription holds its own connection,
	// which must be closed explicitly once it is no longer needed.
	subscriber := redisClient.Subscribe(ctx, REDIS_BROADCAST_CHANNEL, node.id)
	defer subscriber.Close()

	LogInfo("[REDIS] Listening for commands on channels '" + REDIS_BROADCAST_CHANNEL + "', '" + node.id + "'")

	for {
		msg, err := subscriber.ReceiveMessage(ctx)
		if err != nil {
			// Back off before retrying so a dead server is not hammered.
			LogWarning("Could not connect to Redis: " + err.Error())
			time.Sleep(10 * time.Second)
			continue
		}

		node.receiveRedisMessage(msg.Payload)
	}
}
// receiveRedisMessage decodes a message received from Redis and calls the
// handler that corresponds to its type.
//
// msg must be a JSON object whose values are all strings. The fields read are:
//   - "type": message type, matched case-insensitively
//   - "src": ID of the sending node
//   - "sid": stream ID
//   - "data": payload forwarded to the OFFER, ANSWER and CANDIDATE handlers
//   - "video", "audio": flags for OFFER, true only when exactly "true"
//
// A payload that fails to decode is logged and dropped rather than being
// dispatched from a partially filled map. Messages whose source is this node,
// and messages with an unknown type, are ignored.
func (node *WebRTC_CDN_Node) receiveRedisMessage(msg string) {
	msgData := map[string]string{}

	// Decode message
	if err := json.Unmarshal([]byte(msg), &msgData); err != nil {
		LogError(err)
		return
	}

	msgSource := msgData["src"]
	if msgSource == node.id {
		return // Ignore messages from self
	}

	sid := msgData["sid"]

	switch strings.ToUpper(msgData["type"]) {
	case "RESOLVE":
		if node.resolveSource(sid) {
			node.sendInfoMessage(msgSource, sid) // Tell the node who asked that we have that source
		}
	case "INFO":
		node.receiveInfoMessage(msgSource, sid)
	case "CONNECT":
		node.receiveConnectMessage(msgSource, sid)
	case "OFFER":
		hasVideo := msgData["video"] == "true"
		hasAudio := msgData["audio"] == "true"
		node.receiveOfferMessage(sid, msgData["data"], hasVideo, hasAudio)
	case "ANSWER":
		node.receiveAnswerMessage(msgSource, sid, msgData["data"])
	case "CANDIDATE":
		node.receiveCandidateMessage(msgSource, sid, msgData["data"])
	}
}
// sendRedisMessage encodes msg as JSON and publishes it on the given Redis
// channel using node.redisClient.
//
// Nothing is sent when the node is in stand-alone mode. Encoding and publish
// errors are logged; a successful publish is logged at debug level. The
// publish runs while node.mutexRedisSend is held.
func (node *WebRTC_CDN_Node) sendRedisMessage(channel string, msg *map[string]string) {
	if node.standAlone {
		return
	}

	encoded, err := json.Marshal(msg)
	if err != nil {
		LogError(err)
		return
	}
	payload := string(encoded)

	node.mutexRedisSend.Lock()
	defer node.mutexRedisSend.Unlock()

	result := node.redisClient.Publish(context.Background(), channel, payload)
	if result != nil && result.Err() != nil {
		LogError(result.Err())
		return
	}

	LogDebug("[REDIS] [SENT] Channel: " + channel + " | Message: " + payload)
}
// sendInfoMessage publishes an INFO message on the given channel.
// INFO tells the receiving node(s) that this node has a WebRTC source
// for the specified Stream ID (sid).
func (node *WebRTC_CDN_Node) sendInfoMessage(channel string, sid string) {
	info := map[string]string{
		"type": "INFO",
		"src":  node.id,
		"sid":  sid,
	}

	node.sendRedisMessage(channel, &info)
}
// sendResolveMessage publishes a RESOLVE message on the broadcast channel.
// RESOLVE asks the other nodes whether they have a WebRTC source
// for the specified Stream ID (sid).
// Nodes that have it answer with an INFO message.
func (node *WebRTC_CDN_Node) sendResolveMessage(sid string) {
	request := map[string]string{
		"type": "RESOLVE",
		"src":  node.id,
		"sid":  sid,
	}

	node.sendRedisMessage(REDIS_BROADCAST_CHANNEL, &request)
}
// sendConnectMessage publishes a CONNECT message on the channel named after
// the destination node (dst).
// CONNECT asks that node to open a connection
// to receive an external WebRTC source for the Stream ID (sid).
func (node *WebRTC_CDN_Node) sendConnectMessage(dst string, sid string) {
	request := map[string]string{
		"type": "CONNECT",
		"src":  node.id,
		"dst":  dst,
		"sid":  sid,
	}

	node.sendRedisMessage(dst, &request)
}