Skip to content

Commit

Permalink
feat: add E2E tests for cases that peers going offline
Browse files Browse the repository at this point in the history
Signed-off-by: BruceAko <chongzhi@hust.edu.cn>
  • Loading branch information
BruceAko committed Sep 26, 2024
1 parent 1afe79e commit a86b706
Show file tree
Hide file tree
Showing 12 changed files with 236 additions and 11 deletions.
2 changes: 1 addition & 1 deletion deploy/docker-compose/template/scheduler.template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ scheduler:
# then the task will also be reclaimed.
taskGCInterval: 30m
# hostGCInterval is the interval of host gc.
hostGCInterval: 6h
hostGCInterval: 5m
# hostTTL is time to live of host. If host announces message to scheduler,
# then HostTTl will be reset.
hostTTL: 1h
Expand Down
2 changes: 1 addition & 1 deletion pkg/rpc/scheduler/client/client_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func GetV1(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOpt
}, nil
}

// GetV1ByAddr returns v2 version of the scheduler client by address.
// GetV1ByAddr returns v1 version of the scheduler client by address.
func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V1, error) {
conn, err := grpc.DialContext(
ctx,
Expand Down
16 changes: 16 additions & 0 deletions pkg/rpc/scheduler/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/protobuf/types/known/emptypb"

commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2"
Expand Down Expand Up @@ -145,6 +146,9 @@ type V2 interface {
// AnnounceHost announces host to scheduler.
AnnounceHost(context.Context, *schedulerv2.AnnounceHostRequest, ...grpc.CallOption) error

// ListHosts lists hosts in scheduler.
ListHosts(ctx context.Context, opts ...grpc.CallOption) (*schedulerv2.ListHostsResponse, error)

// DeleteHost releases host in scheduler.
DeleteHost(context.Context, *schedulerv2.DeleteHostRequest, ...grpc.CallOption) error

Expand Down Expand Up @@ -253,6 +257,18 @@ func (v *v2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
return eg.Wait()
}

// ListHosts lists host in all schedulers.
func (v *v2) ListHosts(ctx context.Context, opts ...grpc.CallOption) (*schedulerv2.ListHostsResponse, error) {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()

return v.SchedulerClient.ListHosts(
context.WithValue(ctx, pkgbalancer.ContextKey, ""),
new(emptypb.Empty),
opts...,
)
}

// DeleteHost releases host in all schedulers.
func (v *v2) DeleteHost(ctx context.Context, req *schedulerv2.DeleteHostRequest, opts ...grpc.CallOption) error {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
Expand Down
20 changes: 20 additions & 0 deletions pkg/rpc/scheduler/client/mocks/client_v2_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions scheduler/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,20 @@ var (
}, []string{"os", "platform", "platform_family", "platform_version",
"kernel_version", "git_version", "git_commit", "go_version", "build_platform"})

ListHostsCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "list_hosts_total",
Help: "Counter of the number of the list hosts.",
})

ListHostsCountFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "list_hosts_failure_total",
Help: "Counter of the number of failed of the list hosts.",
})

LeaveHostCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Expand Down
12 changes: 10 additions & 2 deletions scheduler/rpcserver/scheduler_server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,18 @@ func (s *schedulerServerV2) AnnounceHost(ctx context.Context, req *schedulerv2.A
return new(emptypb.Empty), nil
}

// TODO Implement the following methods.
// ListHosts lists hosts in scheduler.
func (s *schedulerServerV2) ListHosts(ctx context.Context, _ *emptypb.Empty) (*schedulerv2.ListHostsResponse, error) {
return nil, nil
// Collect ListHostsCount metrics.
metrics.ListHostsCount.Inc()
resp, err := s.service.ListHosts(ctx)
if err != nil {
// Collect ListHostsFailureCount metrics.
metrics.ListHostsCountFailureCount.Inc()
return nil, err
}

return resp, nil
}

// DeleteHost releases host in scheduler.
Expand Down
78 changes: 78 additions & 0 deletions scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,84 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
return nil
}

// ListHosts lists hosts in scheduler.
func (v *V2) ListHosts(ctx context.Context) (*schedulerv2.ListHostsResponse, error) {
hosts := v.resource.HostManager().LoadAll()

resHosts := make([]*commonv2.Host, len(hosts))
for i, host := range hosts {
resHosts[i] = &commonv2.Host{
Id: host.ID,
Type: uint32(host.Type),
Hostname: host.Hostname,
Ip: host.IP,
Port: host.Port,
DownloadPort: host.DownloadPort,
Os: host.OS,
Platform: host.Platform,
PlatformFamily: host.PlatformFamily,
PlatformVersion: host.PlatformVersion,
KernelVersion: host.KernelVersion,
Cpu: &commonv2.CPU{
LogicalCount: host.CPU.LogicalCount,
PhysicalCount: host.CPU.PhysicalCount,
Percent: host.CPU.Percent,
ProcessPercent: host.CPU.ProcessPercent,
Times: &commonv2.CPUTimes{
User: host.CPU.Times.User,
System: host.CPU.Times.System,
Idle: host.CPU.Times.Idle,
Nice: host.CPU.Times.Nice,
Iowait: host.CPU.Times.Iowait,
Irq: host.CPU.Times.Irq,
Softirq: host.CPU.Times.Softirq,
Steal: host.CPU.Times.Steal,
Guest: host.CPU.Times.Guest,
GuestNice: host.CPU.Times.GuestNice,
},
},
Memory: &commonv2.Memory{
Total: host.Memory.Total,
Available: host.Memory.Available,
Used: host.Memory.Used,
UsedPercent: host.Memory.UsedPercent,
ProcessUsedPercent: host.Memory.ProcessUsedPercent,
Free: host.Memory.Free,
},
Network: &commonv2.Network{
TcpConnectionCount: host.Network.TCPConnectionCount,
UploadTcpConnectionCount: host.Network.UploadTCPConnectionCount,
Location: &host.Network.Location,
Idc: &host.Network.IDC,
},
Disk: &commonv2.Disk{
Total: host.Disk.Total,
Free: host.Disk.Free,
Used: host.Disk.Used,
UsedPercent: host.Disk.UsedPercent,
InodesTotal: host.Disk.InodesTotal,
InodesUsed: host.Disk.InodesUsed,
InodesFree: host.Disk.InodesFree,
InodesUsedPercent: host.Disk.InodesUsedPercent,
},
Build: &commonv2.Build{
GitVersion: host.Build.GitVersion,
GitCommit: &host.Build.GitCommit,
GoVersion: &host.Build.GoVersion,
Platform: &host.Build.Platform,
},
SchedulerClusterId: host.SchedulerClusterID,
DisableShared: host.DisableShared,
}
}

resp := &schedulerv2.ListHostsResponse{
Hosts: resHosts,
}

return resp, nil
}

// DeleteHost releases host in scheduler.
func (v *V2) DeleteHost(ctx context.Context, req *schedulerv2.DeleteHostRequest) error {
log := logger.WithHostID(req.GetHostId())
Expand Down
75 changes: 75 additions & 0 deletions test/e2e/v2/leave_host_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2024 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package e2e

import (
"context"
"fmt"
"time"

. "github.com/onsi/ginkgo/v2" //nolint
. "github.com/onsi/gomega" //nolint
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
"d7y.io/dragonfly/v2/test/e2e/v2/util"
)

var _ = Describe("Clients go offline normally and abnormally", func() {
Context("scheduler clears peer metadata", func() {
It("number of hosts should be ok", Label("host", "leave"), func() {
grpcCredentials := insecure.NewCredentials()
schedulerClient, err := schedulerclient.GetV2ByAddr(context.Background(), ":8002", grpc.WithTransportCredentials(grpcCredentials))
Expect(err).NotTo(HaveOccurred())

response, err := schedulerClient.ListHosts(context.Background())
fmt.Println(response, err)
Expect(err).NotTo(HaveOccurred())

hostCount := len(response.Hosts)

podName, err := util.GetSeedClientPodName(3)
Expect(err).NotTo(HaveOccurred())

out, err := util.KubeCtlCommand("-n", util.DragonflyNamespace, "delete", "pod", podName, "--grace-period=30").CombinedOutput()
hostCount--
fmt.Println(string(out))
Expect(err).NotTo(HaveOccurred())

time.Sleep(30 * time.Second)
response, err = schedulerClient.ListHosts(context.Background())
fmt.Println(response, err)
Expect(err).NotTo(HaveOccurred())
Expect(len(response.Hosts)).To(Equal(hostCount))

podName, err = util.GetSeedClientPodName(4)
Expect(err).NotTo(HaveOccurred())

out, err = util.KubeCtlCommand("-n", util.DragonflyNamespace, "delete", "pod", podName, "--force", "--grace-period=0").CombinedOutput()
hostCount--
fmt.Println(string(out))
Expect(err).NotTo(HaveOccurred())

time.Sleep(10 * time.Minute)
response, err = schedulerClient.ListHosts(context.Background())
fmt.Println(response, err)
Expect(err).NotTo(HaveOccurred())
Expect(len(response.Hosts)).To(Equal(hostCount))
})
})
})
18 changes: 13 additions & 5 deletions test/e2e/v2/util/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,10 @@ func ClientExec() (*PodExec, error) {
}

func SeedClientExec(n int) (*PodExec, error) {
out, err := KubeCtlCommand("-n", DragonflyNamespace, "get", "pod", "-l", "component=seed-client",
"-o", fmt.Sprintf("jsonpath='{range .items[%d]}{.metadata.name}{end}'", n)).CombinedOutput()
podName, err := GetSeedClientPodName(n)
if err != nil {
return nil, err
}

podName := strings.Trim(string(out), "'")
fmt.Println(podName)
return NewPodExec(DragonflyNamespace, podName, "seed-client"), nil
}

Expand All @@ -137,3 +133,15 @@ func ManagerExec(n int) (*PodExec, error) {
fmt.Println(podName)
return NewPodExec(DragonflyNamespace, podName, "manager"), nil
}

func GetSeedClientPodName(n int) (string, error) {
out, err := KubeCtlCommand("-n", DragonflyNamespace, "get", "pod", "-l", "component=seed-client",
"-o", fmt.Sprintf("jsonpath='{range .items[%d]}{.metadata.name}{end}'", n)).CombinedOutput()
if err != nil {
return "", err
}

podName := strings.Trim(string(out), "'")
fmt.Println(podName)
return podName, nil
}
5 changes: 4 additions & 1 deletion test/testdata/charts/config-v2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ scheduler:
limits:
cpu: "2"
memory: "4Gi"
service:
type: NodePort
nodePort: 30802
extraVolumeMounts:
- name: logs
mountPath: "/var/log/"
Expand All @@ -57,7 +60,7 @@ scheduler:

seedClient:
enable: true
replicas: 3
replicas: 5
image:
repository: dragonflyoss/client
tag: latest
Expand Down
3 changes: 3 additions & 0 deletions test/testdata/kind/config-v2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ nodes:
- containerPort: 4003
hostPort: 4003
protocol: TCP
- containerPort: 30802
hostPort: 8002
protocol: TCP
extraMounts:
- hostPath: ./test/testdata/containerd/config-v2.toml
containerPath: /etc/containerd/config.toml
Expand Down

0 comments on commit a86b706

Please sign in to comment.