diff --git a/internal/dflog/logger.go b/internal/dflog/logger.go index 54c97da1c86..8ef75d32f08 100644 --- a/internal/dflog/logger.go +++ b/internal/dflog/logger.go @@ -141,6 +141,12 @@ func WithTask(taskID, url string) *SugaredLoggerOnWith { } } +func WithPersistentCacheTask(taskID string) *SugaredLoggerOnWith { + return &SugaredLoggerOnWith{ + withArgs: []any{"taskID", taskID}, + } +} + func WithHost(hostID, hostname, ip string) *SugaredLoggerOnWith { return &SugaredLoggerOnWith{ withArgs: []any{"hostID", hostID, "hostname", hostname, "ip", ip}, diff --git a/pkg/redis/redis.go b/pkg/redis/redis.go index eb65c858e0f..5f50f2dba3f 100644 --- a/pkg/redis/redis.go +++ b/pkg/redis/redis.go @@ -41,6 +41,18 @@ const ( // SchedulersNamespace prefix of schedulers namespace cache key. SchedulersNamespace = "schedulers" + // SchedulerClustersNamespace prefix of scheduler clusters namespace cache key. + SchedulerClustersNamespace = "scheduler-clusters" + + // TasksNamespace prefix of tasks namespace cache key. + PersistentCacheTasksNamespace = "persistent-cache-tasks" + + // PersistentCachePeersNamespace prefix of persistent cache peers namespace cache key. + PersistentCachePeersNamespace = "persistent-cache-peers" + + // PersistentCacheHostsNamespace prefix of persistent cache hosts namespace cache key. + PersistentCacheHostsNamespace = "persistent-cache-hosts" + // ApplicationsNamespace prefix of applications namespace cache key. ApplicationsNamespace = "applications" @@ -137,6 +149,35 @@ func MakeKeyInScheduler(namespace, id string) string { return fmt.Sprintf("%s:%s", MakeNamespaceKeyInScheduler(namespace), id) } +// MakeSchedulerClusterKeyInManager make scheduler cluster key in manager. +func MakePersistentCacheTaskKeyInScheduler(schedulerClusterID uint, taskID string) string { + return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s:%s", schedulerClusterID, PersistentCacheTasksNamespace, taskID)) +} + +// MakePersistentCacheTasksInScheduler make persistent cache tasks in scheduler. +func MakePersistentCacheTasksInScheduler(schedulerClusterID uint) string { + return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s", schedulerClusterID, PersistentCacheTasksNamespace)) +} + +// MakePersistentCachePeerKeyInScheduler make persistent cache peer key in scheduler. +func MakePersistentCachePeerKeyInScheduler(schedulerClusterID uint, peerID string) string { + return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s:%s", schedulerClusterID, PersistentCachePeersNamespace, peerID)) +} + +func MakePersistentCachePeersInScheduler(schedulerClusterID uint) string { + return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s", schedulerClusterID, PersistentCachePeersNamespace)) +} + +// MakePersistentCacheHostKeyInScheduler make persistent cache host key in scheduler. +func MakePersistentCacheHostKeyInScheduler(schedulerClusterID uint, hostID string) string { + return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s:%s", schedulerClusterID, PersistentCacheHostsNamespace, hostID)) +} + +// MakePersistentCacheHostsInScheduler make persistent cache hosts in scheduler. +func MakePersistentCacheHostsInScheduler(schedulerClusterID uint) string { + return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s", schedulerClusterID, PersistentCacheHostsNamespace)) +} + // MakeNetworkTopologyKeyInScheduler make network topology key in scheduler. func MakeNetworkTopologyKeyInScheduler(srcHostID, destHostID string) string { return MakeKeyInScheduler(NetworkTopologyNamespace, fmt.Sprintf("%s:%s", srcHostID, destHostID)) diff --git a/scheduler/resource/persistentcache/task.go b/scheduler/resource/persistentcache/task.go new file mode 100644 index 00000000000..f945734f338 --- /dev/null +++ b/scheduler/resource/persistentcache/task.go @@ -0,0 +1,144 @@ +/* + * Copyright 2024 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package persistentcache + +import ( + "context" + "time" + + "github.com/looplab/fsm" + + logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/digest" +) + +const ( + // Task has been created but did not start uploading. + TaskStatePending = "Pending" + + // Task is uploading resources for p2p cluster. + TaskStateUploading = "Uploading" + + // Task has been uploaded successfully. + TaskStateSucceeded = "Succeeded" + + // Task has been uploaded failed. + TaskStateFailed = "Failed" +) + +const ( + // Task is uploading. + TaskEventUpload = "Upload" + + // Task uploaded successfully. + TaskEventUploadSucceeded = "UploadSucceeded" + + // Task uploaded failed. + TaskEventUploadFailed = "UploadFailed" +) + +// Task contains content for persistent cache task. +type Task struct { + // ID is task id. + ID string + + // Replica count of the persistent cache task. The persistent cache task will + // not be deleted when dfdamon runs garbage collection. It only be deleted + // when the task is deleted by the user. + PersistentReplicaCount uint64 + + // Replica count of the cache task. If cache task is not persistent, + // the persistent cache task will be deleted when dfdaemon runs garbage collection. + ReplicaCount uint64 + + // Digest of the persistent cache task content, for example md5:xxx or sha256:yyy. + Digest *digest.Digest + + // Tag is used to distinguish different persistent cache tasks. + Tag string + + // Application of persistent cache task. + Application string + + // Persistet cache task piece length. + PieceLength int32 + + // ContentLength is persistent cache task total content length. + ContentLength int64 + + // TotalPieceCount is total piece count. + TotalPieceCount int32 + + // Persistent cache task state machine. + FSM *fsm.FSM + + // TTL is persistent cache task time to live. + TTL time.Duration + + // CreatedAt is persistent cache task create time. + CreatedAt time.Time + + // UpdatedAt is persistent cache task update time. + UpdatedAt time.Time + + // Persistent cache task log. + Log *logger.SugaredLoggerOnWith +} + +// New persistent cache task instance. +func NewTask(id, tag, application, state string, persistentReplicaCount uint64, replicaCount uint64, pieceLength int32, + contentLength int64, totalPieceCount int32, digest *digest.Digest, ttl time.Duration, createdAt, updatedAt time.Time, + log *logger.SugaredLoggerOnWith) *Task { + t := &Task{ + ID: id, + PersistentReplicaCount: persistentReplicaCount, + ReplicaCount: replicaCount, + Digest: digest, + Tag: tag, + Application: application, + ContentLength: contentLength, + TotalPieceCount: totalPieceCount, + TTL: time.Hour * 24, + CreatedAt: createdAt, + UpdatedAt: updatedAt, + Log: logger.WithPersistentCacheTask(id), + } + + // Initialize state machine. + t.FSM = fsm.NewFSM( + TaskStatePending, + fsm.Events{ + {Name: TaskEventUpload, Src: []string{TaskStatePending, TaskStateFailed}, Dst: TaskStateUploading}, + {Name: TaskEventUploadSucceeded, Src: []string{TaskStateUploading}, Dst: TaskStateSucceeded}, + {Name: TaskEventUploadFailed, Src: []string{TaskStateUploading}, Dst: TaskStateFailed}, + }, + fsm.Callbacks{ + TaskEventUpload: func(ctx context.Context, e *fsm.Event) { + t.Log.Infof("task state is %s", e.FSM.Current()) + }, + TaskEventUploadSucceeded: func(ctx context.Context, e *fsm.Event) { + t.Log.Infof("task state is %s", e.FSM.Current()) + }, + TaskEventUploadFailed: func(ctx context.Context, e *fsm.Event) { + t.Log.Infof("task state is %s", e.FSM.Current()) + }, + }, + ) + t.FSM.SetState(state) + + return t +} diff --git a/scheduler/resource/persistentcache/task_manager.go b/scheduler/resource/persistentcache/task_manager.go new file mode 100644 index 00000000000..6274c66a366 --- /dev/null +++ b/scheduler/resource/persistentcache/task_manager.go @@ -0,0 +1,215 @@ +/* + * Copyright 2024 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//go:generate mockgen -destination task_manager_mock.go -source task_manager.go -package persistentcache + +package persistentcache + +import ( + "context" + "fmt" + "strconv" + "time" + + "github.com/redis/go-redis/v9" + + logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/digest" + pkgredis "d7y.io/dragonfly/v2/pkg/redis" + "d7y.io/dragonfly/v2/scheduler/config" +) + +// TaskManager is the interface used for persistent cache task manager. +type TaskManager interface { + // Load returns persistent cache task for a key. + Load(context.Context, string) (*Task, bool) + + // Store sets persistent cache task. + Store(context.Context, *Task) error + + // Delete deletes persistent cache task for a key. + Delete(context.Context, string) + + // LoadAll returns all persistent cache tasks. + LoadAll(context.Context) ([]*Task, error) +} + +// taskManager contains content for persistent cache task manager. +type taskManager struct { + // Config is scheduler config. + config *config.Config + + // Redis universal client interface. + rdb redis.UniversalClient +} + +// TODO: Use newTaskManager for resource management. +// New persistent cache task manager interface. +// nolint +func newTaskManager(cfg *config.Config, rdb redis.UniversalClient) TaskManager { + return &taskManager{config: cfg, rdb: rdb} +} + +// Load returns persistent cache task for a key. +func (t *taskManager) Load(ctx context.Context, taskID string) (*Task, bool) { + rawTask, err := t.rdb.HGetAll(ctx, pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, taskID)).Result() + if err != nil { + fmt.Println("getting task failed from Redis:", err) + return nil, false + } + + // Set integer fields from raw task. + persistentReplicaCount, err := strconv.ParseUint(rawTask["persistent_replica_count"], 10, 64) + if err != nil { + fmt.Println("parsing persistent replica count failed:", err) + return nil, false + } + + replicaCount, err := strconv.ParseUint(rawTask["replica_count"], 10, 64) + if err != nil { + fmt.Println("parsing replica count failed:", err) + return nil, false + } + + pieceLength, err := strconv.ParseInt(rawTask["piece_length"], 10, 32) + if err != nil { + fmt.Println("parsing piece length failed:", err) + return nil, false + } + + contentLength, err := strconv.ParseInt(rawTask["content_length"], 10, 64) + if err != nil { + fmt.Println("parsing content length failed:", err) + return nil, false + } + + totalPieceCount, err := strconv.ParseInt(rawTask["total_piece_count"], 10, 32) + if err != nil { + fmt.Println("parsing total piece count failed:", err) + return nil, false + } + + // Set time fields from raw task. + ttl, err := strconv.Atoi(rawTask["ttl"]) + if err != nil { + fmt.Println("parsing ttl failed:", err) + return nil, false + } + + createdAt, err := time.Parse(time.RFC3339, rawTask["created_at"]) + if err != nil { + fmt.Println("parsing created at failed:", err) + return nil, false + } + + updatedAt, err := time.Parse(time.RFC3339, rawTask["updated_at"]) + if err != nil { + fmt.Println("parsing updated at failed:", err) + return nil, false + } + + // Set digest from raw task. + digest, err := digest.Parse(rawTask["digest"]) + if err != nil { + fmt.Println("parsing digest failed:", err) + return nil, false + } + + return NewTask( + rawTask["id"], + rawTask["tag"], + rawTask["application"], + rawTask["state"], + persistentReplicaCount, + replicaCount, + int32(pieceLength), + contentLength, + int32(totalPieceCount), + digest, + time.Duration(ttl), + createdAt, + updatedAt, + logger.WithPersistentCacheTask(rawTask["id"]), + ), true +} + +// Store sets task persistent cache task. +func (t *taskManager) Store(ctx context.Context, task *Task) error { + _, err := t.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error { + t.rdb.HSet(ctx, + pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, task.ID), + "id", task.ID, + "persistent_replica_count", task.PersistentReplicaCount, + "replica_count", task.ReplicaCount, + "digest", task.Digest.String(), + "tag", task.Tag, + "application", task.Application, + "piece_length", task.PieceLength, + "content_length", task.ContentLength, + "total_piece_count", task.TotalPieceCount, + "state", TaskStatePending, + "ttl", task.TTL, + "created_at", task.CreatedAt.Format(time.RFC3339), + "updated_at", task.UpdatedAt.Format(time.RFC3339)) + + t.rdb.Expire(ctx, pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, task.ID), task.TTL) + return nil + }) + + return err +} + +// Delete deletes persistent cache task for a key. +func (t *taskManager) Delete(ctx context.Context, taskID string) { + t.rdb.Del(ctx, pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, taskID)) +} + +// LoadAll returns all persistent cache tasks. +func (t *taskManager) LoadAll(ctx context.Context) ([]*Task, error) { + var ( + tasks []*Task + cursor uint64 + ) + + for { + var ( + taskKeys []string + err error + ) + + taskKeys, cursor, err = t.rdb.Scan(ctx, cursor, pkgredis.MakePersistentCacheTasksInScheduler(t.config.Manager.SchedulerClusterID), 10).Result() + if err != nil { + logger.Warn("scan tasks failed") + return nil, err + } + + for _, taskKey := range taskKeys { + task, loaded := t.Load(ctx, taskKey) + if !loaded { + logger.WithTaskID(taskKey).Warn("load task failed") + continue + } + + tasks = append(tasks, task) + } + + if cursor == 0 { + break + } + } + + return tasks, nil +} diff --git a/scheduler/resource/persistentcache/task_manager_mock.go b/scheduler/resource/persistentcache/task_manager_mock.go new file mode 100644 index 00000000000..5e2cbaed458 --- /dev/null +++ b/scheduler/resource/persistentcache/task_manager_mock.go @@ -0,0 +1,96 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: task_manager.go +// +// Generated by this command: +// +// mockgen -destination task_manager_mock.go -source task_manager.go -package persistentcache +// + +// Package persistentcache is a generated GoMock package. +package persistentcache + +import ( + context "context" + reflect "reflect" + + gomock "go.uber.org/mock/gomock" +) + +// MockTaskManager is a mock of TaskManager interface. +type MockTaskManager struct { + ctrl *gomock.Controller + recorder *MockTaskManagerMockRecorder +} + +// MockTaskManagerMockRecorder is the mock recorder for MockTaskManager. +type MockTaskManagerMockRecorder struct { + mock *MockTaskManager +} + +// NewMockTaskManager creates a new mock instance. +func NewMockTaskManager(ctrl *gomock.Controller) *MockTaskManager { + mock := &MockTaskManager{ctrl: ctrl} + mock.recorder = &MockTaskManagerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTaskManager) EXPECT() *MockTaskManagerMockRecorder { + return m.recorder +} + +// Delete mocks base method. +func (m *MockTaskManager) Delete(arg0 context.Context, arg1 string) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Delete", arg0, arg1) +} + +// Delete indicates an expected call of Delete. +func (mr *MockTaskManagerMockRecorder) Delete(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockTaskManager)(nil).Delete), arg0, arg1) +} + +// Load mocks base method. +func (m *MockTaskManager) Load(arg0 context.Context, arg1 string) (*Task, bool) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Load", arg0, arg1) + ret0, _ := ret[0].(*Task) + ret1, _ := ret[1].(bool) + return ret0, ret1 +} + +// Load indicates an expected call of Load. +func (mr *MockTaskManagerMockRecorder) Load(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Load", reflect.TypeOf((*MockTaskManager)(nil).Load), arg0, arg1) +} + +// LoadAll mocks base method. +func (m *MockTaskManager) LoadAll(arg0 context.Context) ([]*Task, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LoadAll", arg0) + ret0, _ := ret[0].([]*Task) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// LoadAll indicates an expected call of LoadAll. +func (mr *MockTaskManagerMockRecorder) LoadAll(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadAll", reflect.TypeOf((*MockTaskManager)(nil).LoadAll), arg0) +} + +// Store mocks base method. +func (m *MockTaskManager) Store(arg0 context.Context, arg1 *Task) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Store", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// Store indicates an expected call of Store. +func (mr *MockTaskManagerMockRecorder) Store(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Store", reflect.TypeOf((*MockTaskManager)(nil).Store), arg0, arg1) +} diff --git a/scheduler/resource/standard/resource.go b/scheduler/resource/standard/resource.go index 305982fa034..5507ce78657 100644 --- a/scheduler/resource/standard/resource.go +++ b/scheduler/resource/standard/resource.go @@ -19,7 +19,6 @@ package standard import ( - "github.com/redis/go-redis/v9" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -64,9 +63,6 @@ type resource struct { // Scheduler config. config *config.Config - // Redis universal client interface. - rdb redis.UniversalClient - // TransportCredentials stores the Authenticator required to setup a client connection. transportCredentials credentials.TransportCredentials } @@ -82,13 +78,6 @@ func WithTransportCredentials(creds credentials.TransportCredentials) Option { } } -// WithRedisClient returns a Option which configures the redis client. -func WithRedisClient(rdb redis.UniversalClient) Option { - return func(r *resource) { - r.rdb = rdb - } -} - // New returns Resource interface. func New(cfg *config.Config, gc gc.GC, dynconfig config.DynconfigInterface, options ...Option) (Resource, error) { resource := &resource{config: cfg} diff --git a/scheduler/resource/standard/task.go b/scheduler/resource/standard/task.go index 2e819fe6dff..2cd5bd459d5 100644 --- a/scheduler/resource/standard/task.go +++ b/scheduler/resource/standard/task.go @@ -28,7 +28,6 @@ import ( commonv2 "d7y.io/api/v2/pkg/apis/common/v2" schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1" - schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/container/set" @@ -529,32 +528,3 @@ func (t *Task) ReportPieceResultToPeers(peerPacket *schedulerv1.PeerPacket, even } } } - -// AnnouncePeers announces all peers in the task with the state code. -// Used only in v2 version of the grpc. -func (t *Task) AnnouncePeers(resp *schedulerv2.AnnouncePeerResponse, event string) { - for _, vertex := range t.DAG.GetVertices() { - peer := vertex.Value - if peer == nil { - continue - } - - if peer.FSM.Is(PeerStateRunning) { - stream, loaded := peer.LoadAnnouncePeerStream() - if !loaded { - continue - } - - if err := stream.Send(resp); err != nil { - t.Log.Errorf("send response to peer %s failed: %s", peer.ID, err.Error()) - continue - } - t.Log.Infof("task announces peer %s response %#v", peer.ID, resp.Response) - - if err := peer.FSM.Event(context.Background(), event); err != nil { - peer.Log.Errorf("peer fsm event failed: %s", err.Error()) - continue - } - } - } -} diff --git a/scheduler/resource/standard/task_test.go b/scheduler/resource/standard/task_test.go index 3251e6768ff..c05cc06e066 100644 --- a/scheduler/resource/standard/task_test.go +++ b/scheduler/resource/standard/task_test.go @@ -28,8 +28,6 @@ import ( commonv2 "d7y.io/api/v2/pkg/apis/common/v2" schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1" v1mocks "d7y.io/api/v2/pkg/apis/scheduler/v1/mocks" - schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2" - v2mocks "d7y.io/api/v2/pkg/apis/scheduler/v2/mocks" "d7y.io/dragonfly/v2/pkg/container/set" "d7y.io/dragonfly/v2/pkg/digest" @@ -1578,86 +1576,3 @@ func TestTask_ReportPieceResultToPeers(t *testing.T) { }) } } - -func TestTask_AnnouncePeers(t *testing.T) { - tests := []struct { - name string - run func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer, ms *v2mocks.MockScheduler_AnnouncePeerServerMockRecorder) - }{ - { - name: "peer state is PeerStatePending", - run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer, ms *v2mocks.MockScheduler_AnnouncePeerServerMockRecorder) { - mockPeer.FSM.SetState(PeerStatePending) - task.AnnouncePeers(&schedulerv2.AnnouncePeerResponse{}, PeerEventDownloadFailed) - - assert := assert.New(t) - assert.True(mockPeer.FSM.Is(PeerStatePending)) - }, - }, - { - name: "peer state is PeerStateRunning and stream is empty", - run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer, ms *v2mocks.MockScheduler_AnnouncePeerServerMockRecorder) { - mockPeer.FSM.SetState(PeerStateRunning) - task.AnnouncePeers(&schedulerv2.AnnouncePeerResponse{}, PeerEventDownloadFailed) - - assert := assert.New(t) - assert.True(mockPeer.FSM.Is(PeerStateRunning)) - }, - }, - { - name: "peer state is PeerStateRunning and stream sending failed", - run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer, ms *v2mocks.MockScheduler_AnnouncePeerServerMockRecorder) { - mockPeer.FSM.SetState(PeerStateRunning) - mockPeer.StoreAnnouncePeerStream(stream) - ms.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{})).Return(errors.New("foo")).Times(1) - - task.AnnouncePeers(&schedulerv2.AnnouncePeerResponse{}, PeerEventDownloadFailed) - - assert := assert.New(t) - assert.True(mockPeer.FSM.Is(PeerStateRunning)) - }, - }, - { - name: "peer state is PeerStateRunning and state changing failed", - run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer, ms *v2mocks.MockScheduler_AnnouncePeerServerMockRecorder) { - mockPeer.FSM.SetState(PeerStateRunning) - mockPeer.StoreAnnouncePeerStream(stream) - ms.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{})).Return(errors.New("foo")).Times(1) - - task.AnnouncePeers(&schedulerv2.AnnouncePeerResponse{}, PeerEventDownloadFailed) - - assert := assert.New(t) - assert.True(mockPeer.FSM.Is(PeerStateRunning)) - }, - }, - { - name: "peer state is PeerStateRunning and announce peer successfully", - run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer, ms *v2mocks.MockScheduler_AnnouncePeerServerMockRecorder) { - mockPeer.FSM.SetState(PeerStateRunning) - mockPeer.StoreAnnouncePeerStream(stream) - ms.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{})).Return(nil).Times(1) - - task.AnnouncePeers(&schedulerv2.AnnouncePeerResponse{}, PeerEventDownloadFailed) - - assert := assert.New(t) - assert.True(mockPeer.FSM.Is(PeerStateFailed)) - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - ctl := gomock.NewController(t) - defer ctl.Finish() - stream := v2mocks.NewMockScheduler_AnnouncePeerServer(ctl) - - mockHost := NewHost( - mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, - mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit) - mockPeer := NewPeer(mockPeerID, mockResourceConfig, task, mockHost) - task.StorePeer(mockPeer) - tc.run(t, task, mockPeer, stream, stream.EXPECT()) - }) - } -} diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index ce4f1ade356..5b84850f3e5 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -230,10 +230,6 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err resourceOptions = append(resourceOptions, resource.WithTransportCredentials(clientTransportCredentials)) } - if rdb != nil { - resourceOptions = append(resourceOptions, resource.WithRedisClient(rdb)) - } - resource, err := resource.New(cfg, s.gc, dynconfig, resourceOptions...) if err != nil { return nil, err