diff --git a/deploy/docker-compose/template/scheduler.template.yaml b/deploy/docker-compose/template/scheduler.template.yaml index c7e5d33c692..0eb27b47d52 100644 --- a/deploy/docker-compose/template/scheduler.template.yaml +++ b/deploy/docker-compose/template/scheduler.template.yaml @@ -61,7 +61,7 @@ scheduler: # then the task will also be reclaimed. taskGCInterval: 30m # hostGCInterval is the interval of host gc. - hostGCInterval: 6h + hostGCInterval: 5m # hostTTL is time to live of host. If host announces message to scheduler, # then HostTTl will be reset. hostTTL: 1h diff --git a/deploy/helm-charts b/deploy/helm-charts index 4322fe6ba5a..92a2470199a 160000 --- a/deploy/helm-charts +++ b/deploy/helm-charts @@ -1 +1 @@ -Subproject commit 4322fe6ba5ab7ac758e71afb70233bdd426ec759 +Subproject commit 92a2470199a00f785f01529f91a5e4db8813f389 diff --git a/pkg/rpc/scheduler/client/client_v1.go b/pkg/rpc/scheduler/client/client_v1.go index badbcf8e47f..00ad33603c7 100644 --- a/pkg/rpc/scheduler/client/client_v1.go +++ b/pkg/rpc/scheduler/client/client_v1.go @@ -90,7 +90,7 @@ func GetV1(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOpt }, nil } -// GetV1ByAddr returns v2 version of the scheduler client by address. +// GetV1ByAddr returns v1 version of the scheduler client by address. func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V1, error) { conn, err := grpc.DialContext( ctx, diff --git a/pkg/rpc/scheduler/client/client_v2.go b/pkg/rpc/scheduler/client/client_v2.go index 588b98d7643..8b40990ec55 100644 --- a/pkg/rpc/scheduler/client/client_v2.go +++ b/pkg/rpc/scheduler/client/client_v2.go @@ -30,6 +30,7 @@ import ( "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/balancer" + "google.golang.org/protobuf/types/known/emptypb" commonv2 "d7y.io/api/v2/pkg/apis/common/v2" schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2" @@ -145,6 +146,9 @@ type V2 interface { // AnnounceHost announces host to scheduler. AnnounceHost(context.Context, *schedulerv2.AnnounceHostRequest, ...grpc.CallOption) error + // ListHosts lists hosts in scheduler. + ListHosts(ctx context.Context, opts ...grpc.CallOption) (*schedulerv2.ListHostsResponse, error) + // DeleteHost releases host in scheduler. DeleteHost(context.Context, *schedulerv2.DeleteHostRequest, ...grpc.CallOption) error @@ -253,6 +257,18 @@ func (v *v2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ return eg.Wait() } +// ListHosts lists host in all schedulers. +func (v *v2) ListHosts(ctx context.Context, opts ...grpc.CallOption) (*schedulerv2.ListHostsResponse, error) { + ctx, cancel := context.WithTimeout(ctx, contextTimeout) + defer cancel() + + return v.SchedulerClient.ListHosts( + context.WithValue(ctx, pkgbalancer.ContextKey, ""), + new(emptypb.Empty), + opts..., + ) +} + // DeleteHost releases host in all schedulers. func (v *v2) DeleteHost(ctx context.Context, req *schedulerv2.DeleteHostRequest, opts ...grpc.CallOption) error { ctx, cancel := context.WithTimeout(ctx, contextTimeout) diff --git a/pkg/rpc/scheduler/client/mocks/client_v2_mock.go b/pkg/rpc/scheduler/client/mocks/client_v2_mock.go index bf2e5bc7fd3..b499306d93e 100644 --- a/pkg/rpc/scheduler/client/mocks/client_v2_mock.go +++ b/pkg/rpc/scheduler/client/mocks/client_v2_mock.go @@ -152,6 +152,26 @@ func (mr *MockV2MockRecorder) DeleteTask(arg0, arg1 any, arg2 ...any) *gomock.Ca return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTask", reflect.TypeOf((*MockV2)(nil).DeleteTask), varargs...) } +// ListHosts mocks base method. +func (m *MockV2) ListHosts(ctx context.Context, opts ...grpc.CallOption) (*scheduler.ListHostsResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ListHosts", varargs...) + ret0, _ := ret[0].(*scheduler.ListHostsResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListHosts indicates an expected call of ListHosts. +func (mr *MockV2MockRecorder) ListHosts(ctx any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListHosts", reflect.TypeOf((*MockV2)(nil).ListHosts), varargs...) +} + // StatPeer mocks base method. func (m *MockV2) StatPeer(arg0 context.Context, arg1 *scheduler.StatPeerRequest, arg2 ...grpc.CallOption) (*common.Peer, error) { m.ctrl.T.Helper() diff --git a/scheduler/metrics/metrics.go b/scheduler/metrics/metrics.go index 01923b4a857..d41d7a88c77 100644 --- a/scheduler/metrics/metrics.go +++ b/scheduler/metrics/metrics.go @@ -224,6 +224,20 @@ var ( }, []string{"os", "platform", "platform_family", "platform_version", "kernel_version", "git_version", "git_commit", "go_version", "build_platform"}) + ListHostsCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: types.MetricsNamespace, + Subsystem: types.SchedulerMetricsName, + Name: "list_hosts_total", + Help: "Counter of the number of the list hosts.", + }) + + ListHostsCountFailureCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: types.MetricsNamespace, + Subsystem: types.SchedulerMetricsName, + Name: "list_hosts_failure_total", + Help: "Counter of the number of failed of the list hosts.", + }) + LeaveHostCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, diff --git a/scheduler/rpcserver/scheduler_server_v2.go b/scheduler/rpcserver/scheduler_server_v2.go index a476bd7acd7..271e5bf9585 100644 --- a/scheduler/rpcserver/scheduler_server_v2.go +++ b/scheduler/rpcserver/scheduler_server_v2.go @@ -138,10 +138,18 @@ func (s *schedulerServerV2) AnnounceHost(ctx context.Context, req *schedulerv2.A return new(emptypb.Empty), nil } -// TODO Implement the following methods. // ListHosts lists hosts in scheduler. func (s *schedulerServerV2) ListHosts(ctx context.Context, _ *emptypb.Empty) (*schedulerv2.ListHostsResponse, error) { - return nil, nil + // Collect ListHostsCount metrics. + metrics.ListHostsCount.Inc() + resp, err := s.service.ListHosts(ctx) + if err != nil { + // Collect ListHostsFailureCount metrics. + metrics.ListHostsCountFailureCount.Inc() + return nil, err + } + + return resp, nil } // DeleteHost releases host in scheduler. diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index da9ffd221ad..365b22d7feb 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -693,6 +693,84 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ return nil } +// ListHosts lists hosts in scheduler. +func (v *V2) ListHosts(ctx context.Context) (*schedulerv2.ListHostsResponse, error) { + hosts := v.resource.HostManager().LoadAll() + + resHosts := make([]*commonv2.Host, len(hosts)) + for i, host := range hosts { + resHosts[i] = &commonv2.Host{ + Id: host.ID, + Type: uint32(host.Type), + Hostname: host.Hostname, + Ip: host.IP, + Port: host.Port, + DownloadPort: host.DownloadPort, + Os: host.OS, + Platform: host.Platform, + PlatformFamily: host.PlatformFamily, + PlatformVersion: host.PlatformVersion, + KernelVersion: host.KernelVersion, + Cpu: &commonv2.CPU{ + LogicalCount: host.CPU.LogicalCount, + PhysicalCount: host.CPU.PhysicalCount, + Percent: host.CPU.Percent, + ProcessPercent: host.CPU.ProcessPercent, + Times: &commonv2.CPUTimes{ + User: host.CPU.Times.User, + System: host.CPU.Times.System, + Idle: host.CPU.Times.Idle, + Nice: host.CPU.Times.Nice, + Iowait: host.CPU.Times.Iowait, + Irq: host.CPU.Times.Irq, + Softirq: host.CPU.Times.Softirq, + Steal: host.CPU.Times.Steal, + Guest: host.CPU.Times.Guest, + GuestNice: host.CPU.Times.GuestNice, + }, + }, + Memory: &commonv2.Memory{ + Total: host.Memory.Total, + Available: host.Memory.Available, + Used: host.Memory.Used, + UsedPercent: host.Memory.UsedPercent, + ProcessUsedPercent: host.Memory.ProcessUsedPercent, + Free: host.Memory.Free, + }, + Network: &commonv2.Network{ + TcpConnectionCount: host.Network.TCPConnectionCount, + UploadTcpConnectionCount: host.Network.UploadTCPConnectionCount, + Location: &host.Network.Location, + Idc: &host.Network.IDC, + }, + Disk: &commonv2.Disk{ + Total: host.Disk.Total, + Free: host.Disk.Free, + Used: host.Disk.Used, + UsedPercent: host.Disk.UsedPercent, + InodesTotal: host.Disk.InodesTotal, + InodesUsed: host.Disk.InodesUsed, + InodesFree: host.Disk.InodesFree, + InodesUsedPercent: host.Disk.InodesUsedPercent, + }, + Build: &commonv2.Build{ + GitVersion: host.Build.GitVersion, + GitCommit: &host.Build.GitCommit, + GoVersion: &host.Build.GoVersion, + Platform: &host.Build.Platform, + }, + SchedulerClusterId: host.SchedulerClusterID, + DisableShared: host.DisableShared, + } + } + + resp := &schedulerv2.ListHostsResponse{ + Hosts: resHosts, + } + + return resp, nil +} + // DeleteHost releases host in scheduler. func (v *V2) DeleteHost(ctx context.Context, req *schedulerv2.DeleteHostRequest) error { log := logger.WithHostID(req.GetHostId()) diff --git a/test/e2e/v2/leave_host_test.go b/test/e2e/v2/leave_host_test.go new file mode 100644 index 00000000000..0e29fa793f7 --- /dev/null +++ b/test/e2e/v2/leave_host_test.go @@ -0,0 +1,75 @@ +/* + * Copyright 2024 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package e2e + +import ( + "context" + "fmt" + "time" + + . "github.com/onsi/ginkgo/v2" //nolint + . "github.com/onsi/gomega" //nolint + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client" + "d7y.io/dragonfly/v2/test/e2e/v2/util" +) + +var _ = Describe("Clients go offline normally and abnormally", func() { + Context("scheduler clears peer metadata", func() { + It("number of hosts should be ok", Label("host", "leave"), func() { + grpcCredentials := insecure.NewCredentials() + schedulerClient, err := schedulerclient.GetV2ByAddr(context.Background(), ":8002", grpc.WithTransportCredentials(grpcCredentials)) + Expect(err).NotTo(HaveOccurred()) + + response, err := schedulerClient.ListHosts(context.Background()) + fmt.Println(response, err) + Expect(err).NotTo(HaveOccurred()) + + hostCount := len(response.Hosts) + + podName, err := util.GetSeedClientPodName(3) + Expect(err).NotTo(HaveOccurred()) + + out, err := util.KubeCtlCommand("-n", util.DragonflyNamespace, "delete", "pod", podName, "--grace-period=30").CombinedOutput() + hostCount-- + fmt.Println(string(out)) + Expect(err).NotTo(HaveOccurred()) + + time.Sleep(30 * time.Second) + response, err = schedulerClient.ListHosts(context.Background()) + fmt.Println(response, err) + Expect(err).NotTo(HaveOccurred()) + Expect(len(response.Hosts)).To(Equal(hostCount)) + + podName, err = util.GetSeedClientPodName(4) + Expect(err).NotTo(HaveOccurred()) + + out, err = util.KubeCtlCommand("-n", util.DragonflyNamespace, "delete", "pod", podName, "--force", "--grace-period=0").CombinedOutput() + hostCount-- + fmt.Println(string(out)) + Expect(err).NotTo(HaveOccurred()) + + time.Sleep(10 * time.Minute) + response, err = schedulerClient.ListHosts(context.Background()) + fmt.Println(response, err) + Expect(err).NotTo(HaveOccurred()) + Expect(len(response.Hosts)).To(Equal(hostCount)) + }) + }) +}) diff --git a/test/e2e/v2/util/exec.go b/test/e2e/v2/util/exec.go index 1d69789e163..7188452a213 100644 --- a/test/e2e/v2/util/exec.go +++ b/test/e2e/v2/util/exec.go @@ -115,14 +115,10 @@ func ClientExec() (*PodExec, error) { } func SeedClientExec(n int) (*PodExec, error) { - out, err := KubeCtlCommand("-n", DragonflyNamespace, "get", "pod", "-l", "component=seed-client", - "-o", fmt.Sprintf("jsonpath='{range .items[%d]}{.metadata.name}{end}'", n)).CombinedOutput() + podName, err := GetSeedClientPodName(n) if err != nil { return nil, err } - - podName := strings.Trim(string(out), "'") - fmt.Println(podName) return NewPodExec(DragonflyNamespace, podName, "seed-client"), nil } @@ -137,3 +133,15 @@ func ManagerExec(n int) (*PodExec, error) { fmt.Println(podName) return NewPodExec(DragonflyNamespace, podName, "manager"), nil } + +func GetSeedClientPodName(n int) (string, error) { + out, err := KubeCtlCommand("-n", DragonflyNamespace, "get", "pod", "-l", "component=seed-client", + "-o", fmt.Sprintf("jsonpath='{range .items[%d]}{.metadata.name}{end}'", n)).CombinedOutput() + if err != nil { + return "", err + } + + podName := strings.Trim(string(out), "'") + fmt.Println(podName) + return podName, nil +} diff --git a/test/testdata/charts/config-v2.yaml b/test/testdata/charts/config-v2.yaml index 230e4ba8e7d..9b270718b1a 100644 --- a/test/testdata/charts/config-v2.yaml +++ b/test/testdata/charts/config-v2.yaml @@ -38,6 +38,9 @@ scheduler: limits: cpu: "2" memory: "4Gi" + service: + type: NodePort + nodePort: 30802 extraVolumeMounts: - name: logs mountPath: "/var/log/" @@ -57,7 +60,7 @@ scheduler: seedClient: enable: true - replicas: 3 + replicas: 5 image: repository: dragonflyoss/client tag: latest diff --git a/test/testdata/kind/config-v2.yaml b/test/testdata/kind/config-v2.yaml index 644e910e230..030b79c88b0 100644 --- a/test/testdata/kind/config-v2.yaml +++ b/test/testdata/kind/config-v2.yaml @@ -12,6 +12,9 @@ nodes: - containerPort: 4003 hostPort: 4003 protocol: TCP + - containerPort: 30802 + hostPort: 8002 + protocol: TCP extraMounts: - hostPath: ./test/testdata/containerd/config-v2.toml containerPath: /etc/containerd/config.toml