Skip to content

Commit

Permalink
fix: taint master node in leave host test
Browse files Browse the repository at this point in the history
Signed-off-by: BruceAko <chongzhi@hust.edu.cn>
  • Loading branch information
BruceAko committed Oct 19, 2024
1 parent f96fb35 commit 75b8d03
Show file tree
Hide file tree
Showing 17 changed files with 134 additions and 81 deletions.
2 changes: 1 addition & 1 deletion client/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
}

host := &schedulerv1.PeerHost{
Id: idgen.HostIDV2(opt.Host.AdvertiseIP.String(), opt.Host.Hostname),
Id: idgen.HostIDV2(opt.Host.AdvertiseIP.String(), opt.Host.Hostname, opt.Scheduler.Manager.SeedPeer.Enable),
Ip: opt.Host.AdvertiseIP.String(),
RpcPort: int32(opt.Download.PeerGRPC.TCPListen.PortRange.Start),
DownPort: 0,
Expand Down
7 changes: 6 additions & 1 deletion manager/job/sync_peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"d7y.io/dragonfly/v2/manager/config"
"d7y.io/dragonfly/v2/manager/models"
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/types"
resource "d7y.io/dragonfly/v2/scheduler/resource/standard"
)

Expand Down Expand Up @@ -196,7 +197,11 @@ func (s *syncPeers) mergePeers(ctx context.Context, scheduler models.Scheduler,

// If the peer exists in the sync peer results, update the peer data in the database with
// the sync peer results and delete the sync peer from the sync peers map.
id := idgen.HostIDV2(peer.IP, peer.Hostname)
isSeedPeer := false
if types.ParseHostType(peer.Type) != types.HostTypeNormal {
isSeedPeer = true
}
id := idgen.HostIDV2(peer.IP, peer.Hostname, isSeedPeer)
if syncPeer, ok := syncPeers[id]; ok {
if err := s.db.WithContext(ctx).First(&models.Peer{}, peer.ID).Updates(models.Peer{
Type: syncPeer.Type.Name(),
Expand Down
9 changes: 5 additions & 4 deletions pkg/idgen/host_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package idgen

import (
"fmt"

"d7y.io/dragonfly/v2/pkg/digest"
)

// HostIDV1 generates v1 version of host id.
Expand All @@ -28,6 +26,9 @@ func HostIDV1(hostname string, port int32) string {
}

// HostIDV2 generates v2 version of host id.
func HostIDV2(ip, hostname string) string {
return digest.SHA256FromStrings(ip, hostname)
func HostIDV2(ip, hostname string, isSeedPeer bool) string {
if isSeedPeer {
return fmt.Sprintf("%s-%s-seed", ip, hostname)
}
return fmt.Sprintf("%s-%s", ip, hostname)
}
48 changes: 31 additions & 17 deletions pkg/idgen/host_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,43 +67,57 @@ func TestHostIDV1(t *testing.T) {

func TestHostIDV2(t *testing.T) {
tests := []struct {
name string
ip string
hostname string
expect func(t *testing.T, d string)
name string
ip string
hostname string
isSeedPeer bool
expect func(t *testing.T, d string)
}{
{
name: "generate HostID",
ip: "127.0.0.1",
hostname: "foo",
name: "generate HostID for peer",
ip: "127.0.0.1",
hostname: "foo",
isSeedPeer: false,
expect: func(t *testing.T, d string) {
assert := assert.New(t)
assert.Equal(d, "52727e8408e0ee1f999086f241ec43d5b3dbda666f1a06ef1fcbe75b4e90fa17")
assert.Equal(d, "127.0.0.1-foo")
},
},
{
name: "generate HostID with empty ip",
ip: "",
hostname: "foo",
name: "generate HostID for seed peer",
ip: "127.0.0.1",
hostname: "foo",
isSeedPeer: true,
expect: func(t *testing.T, d string) {
assert := assert.New(t)
assert.Equal(d, "2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae")
assert.Equal(d, "127.0.0.1-foo-seed")
},
},
{
name: "generate HostID with empty host",
ip: "127.0.0.1",
hostname: "",
name: "generate HostID with empty ip for seed peer",
ip: "",
hostname: "foo",
isSeedPeer: true,
expect: func(t *testing.T, d string) {
assert := assert.New(t)
assert.Equal(d, "-foo-seed")
},
},
{
name: "generate HostID with empty host for seed peer",
ip: "127.0.0.1",
hostname: "",
isSeedPeer: true,
expect: func(t *testing.T, d string) {
assert := assert.New(t)
assert.Equal(d, "12ca17b49af2289436f303e0166030a21e525d266e209267433801a8fd4071a0")
assert.Equal(d, "127.0.0.1--seed")
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
tc.expect(t, HostIDV2(tc.ip, tc.hostname))
tc.expect(t, HostIDV2(tc.ip, tc.hostname, tc.isSeedPeer))
})
}
}
4 changes: 4 additions & 0 deletions scheduler/resource/standard/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,3 +451,7 @@ func (h *Host) LeavePeers() {
func (h *Host) FreeUploadCount() int32 {
return h.ConcurrentUploadLimit.Load() - h.ConcurrentUploadCount.Load()
}

func (h *Host) IsSeedPeer() bool {
return h.Type == types.HostTypeSuperSeed || h.Type == types.HostTypeStrongSeed || h.Type == types.HostTypeWeakSeed
}
4 changes: 2 additions & 2 deletions scheduler/resource/standard/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ var (

mockAnnounceInterval = 5 * time.Minute

mockHostID = idgen.HostIDV2("127.0.0.1", "foo")
mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar")
mockHostID = idgen.HostIDV2("127.0.0.1", "foo", false)
mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar", true)
mockHostLocation = "baz"
mockHostIDC = "bas"
)
Expand Down
4 changes: 2 additions & 2 deletions scheduler/resource/standard/seed_peer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package standard
import (
"context"
"fmt"
reflect "reflect"
"reflect"

"github.com/hashicorp/go-multierror"
"google.golang.org/grpc"
Expand Down Expand Up @@ -156,7 +156,7 @@ func (sc *seedPeerClient) updateSeedPeersForHostManager(seedPeers []*managerv2.S
concurrentUploadLimit = int32(config.LoadLimit)
}

id := idgen.HostIDV2(seedPeer.Ip, seedPeer.Hostname)
id := idgen.HostIDV2(seedPeer.Ip, seedPeer.Hostname, true)
seedPeerHost, loaded := sc.hostManager.Load(id)
if !loaded {
options := []HostOption{WithNetwork(Network{
Expand Down
4 changes: 2 additions & 2 deletions scheduler/scheduling/evaluator/evaluator_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ var (
mockTaskFilteredQueryParams = []string{"bar"}
mockTaskHeader = map[string]string{"content-length": "100"}
mockTaskPieceLength int32 = 2048
mockHostID = idgen.HostIDV2("127.0.0.1", "foo")
mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar")
mockHostID = idgen.HostIDV2("127.0.0.1", "foo", false)
mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar", true)
mockHostLocation = "bas"
mockHostIDC = "baz"
mockPeerID = idgen.PeerIDV2()
Expand Down
10 changes: 5 additions & 5 deletions scheduler/scheduling/scheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ var (
mockTaskFilteredQueryParams = []string{"bar"}
mockTaskHeader = map[string]string{"content-length": "100"}
mockTaskPieceLength int32 = 2048
mockHostID = idgen.HostIDV2("127.0.0.1", "foo")
mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar")
mockHostID = idgen.HostIDV2("127.0.0.1", "foo", false)
mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar", true)
mockHostLocation = "baz"
mockHostIDC = "bas"
mockPeerID = idgen.PeerIDV2()
Expand Down Expand Up @@ -1040,7 +1040,7 @@ func TestScheduling_FindCandidateParents(t *testing.T) {
var mockPeers []*resource.Peer
for i := 0; i < 11; i++ {
mockHost := resource.NewHost(
idgen.HostIDV2("127.0.0.1", uuid.New().String()), mockRawHost.IP, mockRawHost.Hostname,
idgen.HostIDV2("127.0.0.1", uuid.New().String(), false), mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
peer := resource.NewPeer(idgen.PeerIDV1(fmt.Sprintf("127.0.0.%d", i)), mockTask, mockHost)
mockPeers = append(mockPeers, peer)
Expand Down Expand Up @@ -1357,7 +1357,7 @@ func TestScheduling_FindParentAndCandidateParents(t *testing.T) {
var mockPeers []*resource.Peer
for i := 0; i < 11; i++ {
mockHost := resource.NewHost(
idgen.HostIDV2("127.0.0.1", uuid.New().String()), mockRawHost.IP, mockRawHost.Hostname,
idgen.HostIDV2("127.0.0.1", uuid.New().String(), false), mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
peer := resource.NewPeer(idgen.PeerIDV1(fmt.Sprintf("127.0.0.%d", i)), mockTask, mockHost)
mockPeers = append(mockPeers, peer)
Expand Down Expand Up @@ -1618,7 +1618,7 @@ func TestScheduling_FindSuccessParent(t *testing.T) {
var mockPeers []*resource.Peer
for i := 0; i < 11; i++ {
mockHost := resource.NewHost(
idgen.HostIDV2("127.0.0.1", uuid.New().String()), mockRawHost.IP, mockRawHost.Hostname,
idgen.HostIDV2("127.0.0.1", uuid.New().String(), false), mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
peer := resource.NewPeer(idgen.PeerIDV1(fmt.Sprintf("127.0.0.%d", i)), mockTask, mockHost)
mockPeers = append(mockPeers, peer)
Expand Down
6 changes: 3 additions & 3 deletions scheduler/service/service_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ var (
mockTaskFilteredQueryParams = []string{"bar"}
mockTaskHeader = map[string]string{"Content-Length": "100", "Range": "bytes=0-99"}
mockTaskPieceLength int32 = 2048
mockHostID = idgen.HostIDV2("127.0.0.1", "foo")
mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar")
mockHostID = idgen.HostIDV2("127.0.0.1", "foo", false)
mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar", true)
mockHostLocation = "bas"
mockHostIDC = "baz"
mockPeerID = idgen.PeerIDV2()
Expand Down Expand Up @@ -2559,7 +2559,7 @@ func TestServiceV1_LeaveHost(t *testing.T) {

tc.mock(host, mockPeer, hostManager, scheduling.EXPECT(), res.EXPECT(), hostManager.EXPECT())
tc.expect(t, mockPeer, svc.LeaveHost(context.Background(), &schedulerv1.LeaveHostRequest{
Id: idgen.HostIDV2(host.IP, host.Hostname),
Id: idgen.HostIDV2(host.IP, host.Hostname, true),
}))
})
}
Expand Down
1 change: 1 addition & 0 deletions scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,7 @@ func (v *V2) DeleteHost(ctx context.Context, req *schedulerv2.DeleteHostRequest)

// Leave peers in host.
host.LeavePeers()
v.resource.HostManager().Delete(req.GetHostId())
return nil
}

Expand Down
4 changes: 4 additions & 0 deletions scheduler/service/service_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,8 @@ func TestServiceV2_DeleteHost(t *testing.T) {
gomock.InOrder(
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Any()).Return(host, true).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Delete(gomock.Any()).Times(1),
)
},
expect: func(t *testing.T, peer *resource.Peer, err error) {
Expand All @@ -1026,6 +1028,8 @@ func TestServiceV2_DeleteHost(t *testing.T) {
gomock.InOrder(
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Any()).Return(host, true).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Delete(gomock.Any()).Times(1),
)
},
expect: func(t *testing.T, peer *resource.Peer, err error) {
Expand Down
16 changes: 8 additions & 8 deletions test/e2e/v2/containerd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ var _ = Describe("Containerd with CRI support", func() {
},
}

clientPods, err := util.ClientExecAll()
clientPod, err := util.ClientExec()
fmt.Println(err)
Expect(err).NotTo(HaveOccurred())
for _, taskMetadata := range taskMetadatas {
sha256sum, err := util.CalculateSha256ByTaskID(clientPods, taskMetadata.ID)
sha256sum, err := util.CalculateSha256ByTaskID([]*util.PodExec{clientPod}, taskMetadata.ID)
Expect(err).NotTo(HaveOccurred())
Expect(taskMetadata.Sha256).To(Equal(sha256sum))
}
Expand Down Expand Up @@ -115,12 +115,12 @@ var _ = Describe("Containerd with CRI support", func() {
},
}

clientPods, err := util.ClientExecAll()
clientPod, err := util.ClientExec()
fmt.Println(err)
Expect(err).NotTo(HaveOccurred())

for _, taskMetadata := range taskMetadatas {
sha256sum, err := util.CalculateSha256ByTaskID(clientPods, taskMetadata.ID)
sha256sum, err := util.CalculateSha256ByTaskID([]*util.PodExec{clientPod}, taskMetadata.ID)
Expect(err).NotTo(HaveOccurred())
Expect(taskMetadata.Sha256).To(Equal(sha256sum))
}
Expand Down Expand Up @@ -179,12 +179,12 @@ var _ = Describe("Containerd with CRI support", func() {
},
}

clientPods, err := util.ClientExecAll()
clientPod, err := util.ClientExec()
fmt.Println(err)
Expect(err).NotTo(HaveOccurred())

for _, taskMetadata := range taskMetadatas {
sha256sum, err := util.CalculateSha256ByTaskID(clientPods, taskMetadata.ID)
sha256sum, err := util.CalculateSha256ByTaskID([]*util.PodExec{clientPod}, taskMetadata.ID)
Expect(err).NotTo(HaveOccurred())
Expect(taskMetadata.Sha256).To(Equal(sha256sum))
}
Expand Down Expand Up @@ -231,12 +231,12 @@ var _ = Describe("Containerd with CRI support", func() {
},
}

clientPods, err := util.ClientExecAll()
clientPod, err := util.ClientExec()
fmt.Println(err)
Expect(err).NotTo(HaveOccurred())

for _, taskMetadata := range taskMetadatas {
sha256sum, err := util.CalculateSha256ByTaskID(clientPods, taskMetadata.ID)
sha256sum, err := util.CalculateSha256ByTaskID([]*util.PodExec{clientPod}, taskMetadata.ID)
Expect(err).NotTo(HaveOccurred())
Expect(taskMetadata.Sha256).To(Equal(sha256sum))
}
Expand Down
3 changes: 3 additions & 0 deletions test/e2e/v2/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strconv"
"strings"
"testing"
"time"

. "github.com/onsi/ginkgo/v2" //nolint
. "github.com/onsi/gomega" //nolint
Expand Down Expand Up @@ -86,6 +87,8 @@ var _ = BeforeSuite(func() {
Expect(err).NotTo(HaveOccurred())
gitCommit := strings.Fields(string(rawGitCommit))[0]
fmt.Printf("git commit: %s\n", gitCommit)
// Wait for peers to start and announce.
time.Sleep(5 * time.Minute)
})

// TestE2E is the root of e2e test function
Expand Down
Loading

0 comments on commit 75b8d03

Please sign in to comment.