-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient.go
126 lines (117 loc) · 3.16 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package pegasus
import (
"crypto/rand"
"fmt"
"math/big"
"sync"
"time"
"6.824/labrpc"
)
// nrand returns a random int64 in the range [0, 2^62), drawn from
// crypto/rand. The error returned by rand.Int is discarded.
func nrand() int64 {
	bound := new(big.Int).Lsh(big.NewInt(1), 62)
	n, _ := rand.Int(rand.Reader, bound)
	return n.Int64()
}
// MakeClerk allocates a Clerk for the given servers, gives it a random
// client id from nrand, logs that id under CK_SETUP, and returns the clerk.
func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
	clerk := &Clerk{}
	clerk.client_id = nrand()
	clerk.servers = servers

	initMsg := fmt.Sprintf("Clerk initialized with id %v", clerk.client_id)
	clerk.logMsg(CK_SETUP, initMsg)
	return clerk
}
// GetPutAppend is the request loop shared by Get, Put and Append.
//
// It repeatedly sends opArgs to ck.servers[ck.currentLeader] through the
// "KVServer.AddRaftOp" RPC and only returns once a reply arrives with an
// empty Err, yielding that reply's Value. When the RPC itself fails, or the
// server answers ErrWrongLeader, updateCurrentLeader is called before the
// next attempt; any other error simply triggers an immediate resend.
func (ck *Clerk) GetPutAppend(opArgs OpArgs) string {
	for {
		var reply OpReply
		ck.logMsg(CK_GETPUTAPPEND, fmt.Sprintf("Sending %v req for key %v and val %v to currentLeader", opArgs.Op, opArgs.Key, opArgs.Value))

		delivered := ck.servers[ck.currentLeader].Call("KVServer.AddRaftOp", &opArgs, &reply)
		if !delivered {
			ck.logMsg(CK_GETPUTAPPEND, "PutAppend RPC failed!")
			ck.updateCurrentLeader()
			continue
		}

		switch {
		case reply.Err == ErrWrongLeader:
			ck.logMsg(CK_GETPUTAPPEND, fmt.Sprintf("Contacted wrong leader (%v), updating leader list...", ck.currentLeader))
			ck.updateCurrentLeader()
		case reply.Err == "":
			// No error: the operation went through, hand back its value.
			result := reply.Value
			ck.logMsg(CK_GETPUTAPPEND, fmt.Sprintf("Returning value %v for key %v!", result, opArgs.Key))
			return result
		default:
			// Every other error: loop around and resend the same request.
			ck.logMsg(CK_GETPUTAPPEND, fmt.Sprintf("Got err %v, re-sending req!", reply.Err))
		}
	}
}
// Get submits a GetVal operation for key via GetPutAppend and returns the
// value that call produces. The request carries a fresh random RequestId
// and this clerk's client id.
func (ck *Clerk) Get(key string) string {
	request := OpArgs{
		ClientId:  ck.client_id,
		RequestId: nrand(),
		Op:        GetVal,
		Key:       key,
		Value:     "",
	}
	result := ck.GetPutAppend(request)
	return result
}
// Put submits a PutVal operation carrying key and value via GetPutAppend.
// The request is tagged with a fresh random RequestId and this clerk's
// client id; the value returned by GetPutAppend is ignored.
func (ck *Clerk) Put(key string, value string) {
	ck.GetPutAppend(OpArgs{
		ClientId:  ck.client_id,
		RequestId: nrand(),
		Op:        PutVal,
		Key:       key,
		Value:     value,
	})
}
// Append submits an AppendVal operation carrying key and value via
// GetPutAppend. The request is tagged with a fresh random RequestId and this
// clerk's client id; the value returned by GetPutAppend is ignored.
func (ck *Clerk) Append(key string, value string) {
	var request OpArgs
	request.Key = key
	request.Value = value
	request.Op = AppendVal
	request.RequestId = nrand()
	request.ClientId = ck.client_id

	ck.GetPutAppend(request)
}
// updateCurrentLeader polls every server in ck.servers concurrently with the
// "KVServer.IsLeader" RPC and stores the index of the first server that
// claims leadership in ck.currentLeader. It blocks until such a server is
// found.
//
// Each polling goroutine retries every LEADER_WAIT milliseconds until either
// its own server claims leadership or another goroutine has already recorded
// a leader.
//
// Only the first goroutine to observe a leader sends on newLeader, and the
// channel has capacity 1, so that send can never block. Goroutines whose
// server claims leadership later simply return: the caller receives exactly
// once, so a second send would block forever and leak the goroutine. The
// mutex guards leaderFound only and is never held across a send, an RPC or a
// sleep.
func (ck *Clerk) updateCurrentLeader() {
	var mutex sync.Mutex // guards leaderFound
	leaderFound := false
	newLeader := make(chan int, 1)

	for i, server := range ck.servers {
		go func(i int, server *labrpc.ClientEnd) {
			for {
				findLeaderArgs := FindLeaderArgs{}
				findLeaderReply := FindLeaderReply{}
				ok := server.Call("KVServer.IsLeader", &findLeaderArgs, &findLeaderReply)

				if ok && findLeaderReply.IsLeader {
					// Claim the result slot atomically; only the first
					// claimant is allowed to report.
					mutex.Lock()
					first := !leaderFound
					leaderFound = true
					mutex.Unlock()

					if first {
						ck.logMsg(CK_UPDATE_LEADER, fmt.Sprintf("Found new leader K%v", i))
						newLeader <- i
					}
					return
				}

				if ok {
					ck.logMsg(CK_UPDATE_LEADER, fmt.Sprintf("K%v is not the leader", i))
				} else {
					ck.logMsg(CK_UPDATE_LEADER, fmt.Sprintf("Failed to contact server K%v", i))
				}

				// Stop polling once some other goroutine has found a leader.
				mutex.Lock()
				exit := leaderFound
				mutex.Unlock()
				if exit {
					return
				}

				// no one claims to be a leader. Wait for a while for an election, then try again.
				ck.logMsg(CK_UPDATE_LEADER, fmt.Sprintf("Going to sleep since %v is not the leader", i))
				time.Sleep(time.Millisecond * time.Duration(LEADER_WAIT))
			}
		}(i, server)
	}

	ck.currentLeader = <-newLeader
}