From aa78396155dacd085f661ff9f79376aa20f62538 Mon Sep 17 00:00:00 2001
From: Gaius
Date: Fri, 29 Nov 2024 21:13:12 +0800
Subject: [PATCH] feat: optimize the implementation of sync peers (#3677)

Signed-off-by: Gaius
---
 .github/workflows/compatibility-e2e-v2.yml |   8 +-
 api/manager/docs.go                        |  13 +
 api/manager/swagger.json                   |  13 +
 api/manager/swagger.yaml                   |  12 +
 client-rs                                  |   2 +-
 go.mod                                     |   2 +-
 go.sum                                     |   4 +-
 manager/config/config.go                   |   6 +-
 manager/config/config_test.go              |  20 +-
 manager/config/constants.go                |   3 +-
 manager/config/testdata/manager.yaml       |   1 +
 manager/handlers/job.go                    |   8 +-
 manager/job/job.go                         |   2 +-
 manager/job/mocks/sync_peers_mock.go       |  14 +-
 manager/job/sync_peers.go                  | 331 +++++++++------------
 manager/job/sync_peers_test.go             | 118 --------
 manager/service/job.go                     | 101 ++++---
 manager/service/mocks/service_mock.go      |   7 +-
 manager/service/service.go                 |   2 +-
 manager/types/job.go                       | 123 ++++++--
 pkg/container/slice/slice.go               |  44 ---
 pkg/container/slice/slice_test.go          | 102 -------
 pkg/digest/digest.go                       |   2 +-
 scheduler/service/service_v1.go            |   8 +-
 scheduler/service/service_v1_test.go       |  22 ++
 scheduler/service/service_v2.go            |   8 +-
 scheduler/service/service_v2_test.go       |   4 +
 27 files changed, 409 insertions(+), 571 deletions(-)
 delete mode 100644 manager/job/sync_peers_test.go
 delete mode 100644 pkg/container/slice/slice.go
 delete mode 100644 pkg/container/slice/slice_test.go

diff --git a/.github/workflows/compatibility-e2e-v2.yml b/.github/workflows/compatibility-e2e-v2.yml
index 778aacc071f..9bdbc61e155 100644
--- a/.github/workflows/compatibility-e2e-v2.yml
+++ b/.github/workflows/compatibility-e2e-v2.yml
@@ -31,19 +31,19 @@ jobs:
         include:
           - module: manager
             image: manager
-            image-tag: v2.1.60
+            image-tag: v2.1.65
             chart-name: manager
           - module: scheduler
            image: scheduler
-            image-tag: v2.1.60
+            image-tag: v2.1.65
             chart-name: scheduler
           - module: client
             image: client
-            image-tag: v0.1.115
+            image-tag: v0.1.119
             chart-name: client
           - module: seed-client
             image: client
-            image-tag: v0.1.115
+            image-tag: v0.1.119
             chart-name: seed-client

     steps:
diff --git a/api/manager/docs.go b/api/manager/docs.go
index 50d21d177e6..81e8cc1b2c1 100644
--- a/api/manager/docs.go
+++ b/api/manager/docs.go
@@ -4202,28 +4202,34 @@ const docTemplate = `{
             ],
             "properties": {
                 "args": {
+                    "description": "Args is the arguments of the job.",
                     "type": "object",
                     "additionalProperties": {}
                 },
                 "bio": {
+                    "description": "BIO is the description of the job.",
                     "type": "string"
                 },
                 "scheduler_cluster_ids": {
+                    "description": "SchedulerClusterIDs is the scheduler cluster ids of the job.",
                    "type": "array",
                     "items": {
                         "type": "integer"
                     }
                 },
                 "seed_peer_cluster_ids": {
+                    "description": "SeedPeerClusterIDs is the seed peer cluster ids of the job.",
                     "type": "array",
                     "items": {
                         "type": "integer"
                     }
                 },
                 "type": {
+                    "description": "Type is the type of the job.",
                     "type": "string"
                 },
                 "user_id": {
+                    "description": "UserID is the user id of the job.",
                     "type": "integer"
                 }
             }
@@ -4682,6 +4688,11 @@ const docTemplate = `{
                 "type": "integer",
                 "maximum": 1000,
                 "minimum": 10
+            },
+            "job_rate_limit": {
+                "type": "integer",
+                "maximum": 1000000,
+                "minimum": 1
             }
         }
     },
@@ -4881,9 +4892,11 @@ const docTemplate = `{
             "type": "object",
             "properties": {
                 "bio": {
+                    "description": "BIO is the description of the job.",
                     "type": "string"
                 },
                 "user_id": {
+                    "description": "UserID is the user id of the job.",
                     "type": "integer"
                 }
             }
diff --git a/api/manager/swagger.json b/api/manager/swagger.json
index 63997b5a11f..917c40d9b5e 100644
--- a/api/manager/swagger.json
+++
b/api/manager/swagger.json @@ -4196,28 +4196,34 @@ ], "properties": { "args": { + "description": "Args is the arguments of the job.", "type": "object", "additionalProperties": {} }, "bio": { + "description": "BIO is the description of the job.", "type": "string" }, "scheduler_cluster_ids": { + "description": "SchedulerClusterIDs is the scheduler cluster ids of the job.", "type": "array", "items": { "type": "integer" } }, "seed_peer_cluster_ids": { + "description": "SeedPeerClusterIDs is the seed peer cluster ids of the job.", "type": "array", "items": { "type": "integer" } }, "type": { + "description": "Type is the type of the job.", "type": "string" }, "user_id": { + "description": "UserID is the user id of the job.", "type": "integer" } } @@ -4676,6 +4682,11 @@ "type": "integer", "maximum": 1000, "minimum": 10 + }, + "job_rate_limit": { + "type": "integer", + "maximum": 1000000, + "minimum": 1 } } }, @@ -4875,9 +4886,11 @@ "type": "object", "properties": { "bio": { + "description": "BIO is the description of the job.", "type": "string" }, "user_id": { + "description": "UserID is the user id of the job.", "type": "integer" } } diff --git a/api/manager/swagger.yaml b/api/manager/swagger.yaml index 3d281176b0f..996c0a74216 100644 --- a/api/manager/swagger.yaml +++ b/api/manager/swagger.yaml @@ -461,20 +461,26 @@ definitions: properties: args: additionalProperties: {} + description: Args is the arguments of the job. type: object bio: + description: BIO is the description of the job. type: string scheduler_cluster_ids: + description: SchedulerClusterIDs is the scheduler cluster ids of the job. items: type: integer type: array seed_peer_cluster_ids: + description: SeedPeerClusterIDs is the seed peer cluster ids of the job. items: type: integer type: array type: + description: Type is the type of the job. type: string user_id: + description: UserID is the user id of the job. type: integer required: - type @@ -792,6 +798,10 @@ definitions: maximum: 1000 minimum: 10 type: integer + job_rate_limit: + maximum: 1000000 + minimum: 1 + type: integer type: object d7y_io_dragonfly_v2_manager_types.SchedulerClusterScopes: properties: @@ -925,8 +935,10 @@ definitions: d7y_io_dragonfly_v2_manager_types.UpdateJobRequest: properties: bio: + description: BIO is the description of the job. type: string user_id: + description: UserID is the user id of the job. 
       type: integer
     type: object
   d7y_io_dragonfly_v2_manager_types.UpdateOauthRequest:
diff --git a/client-rs b/client-rs
index ee21989120b..9fd770ffab3 160000
--- a/client-rs
+++ b/client-rs
@@ -1 +1 @@
-Subproject commit ee21989120b16db4e8456e7d163f9ba06aad98c9
+Subproject commit 9fd770ffab34893da27ab40c6febe1125d6c077c
diff --git a/go.mod b/go.mod
index 25ed66ef2ca..fc4c128842f 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
 go 1.23.0

 require (
-	d7y.io/api/v2 v2.0.175
+	d7y.io/api/v2 v2.0.177
 	github.com/MysteriousPotato/go-lockable v1.0.0
 	github.com/RichardKnop/machinery v1.10.8
 	github.com/Showmax/go-fqdn v1.0.0
diff --git a/go.sum b/go.sum
index abfdcfbebc2..8c27f5abe90 100644
--- a/go.sum
+++ b/go.sum
@@ -53,8 +53,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo
 cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
 cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
 cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
-d7y.io/api/v2 v2.0.175 h1:yE1FeYnBEK/geHmDJbqXB0pUXtPBtqk9E7xijIVh0AA=
-d7y.io/api/v2 v2.0.175/go.mod h1:+l4ErhthKmcIhcRU6F01Km8q+yDyICF7JImefg0t6HY=
+d7y.io/api/v2 v2.0.177 h1:iC+Jm4n7lKs3N1JIO25XOdtELbKSlis85LEoGbYlp98=
+d7y.io/api/v2 v2.0.177/go.mod h1:+l4ErhthKmcIhcRU6F01Km8q+yDyICF7JImefg0t6HY=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
 github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
 github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
diff --git a/manager/config/config.go b/manager/config/config.go
index 4760ff459cd..306ff33f4c0 100644
--- a/manager/config/config.go
+++ b/manager/config/config.go
@@ -339,7 +339,7 @@ type SyncPeersConfig struct {
 	// Timeout is the timeout for syncing peers information from the single scheduler.
 	Timeout time.Duration `yaml:"timeout" mapstructure:"timeout"`

-	// BatchSize is the batch size when operating gorm.
+	// BatchSize is the batch size used when operating on the database with gorm.
 	BatchSize int `yaml:"batchSize" mapstructure:"batchSize"`
 }

@@ -641,6 +641,10 @@ func (cfg *Config) Validate() error {
 		return errors.New("syncPeers requires parameter timeout")
 	}

+	if cfg.Job.SyncPeers.BatchSize == 0 {
+		return errors.New("syncPeers requires parameter batchSize")
+	}
+
 	if cfg.ObjectStorage.Enable {
 		if cfg.ObjectStorage.Name == "" {
 			return errors.New("objectStorage requires parameter name")
diff --git a/manager/config/config_test.go b/manager/config/config_test.go
index 4ca6989b72a..352e4b9b048 100644
--- a/manager/config/config_test.go
+++ b/manager/config/config_test.go
@@ -188,8 +188,9 @@ func TestConfig_Load(t *testing.T) {
 				},
 			},
 			SyncPeers: SyncPeersConfig{
-				Interval: 13 * time.Hour,
-				Timeout:  2 * time.Minute,
+				Interval:  13 * time.Hour,
+				Timeout:   2 * time.Minute,
+				BatchSize: 50,
 			},
 		},
 		ObjectStorage: ObjectStorageConfig{
@@ -809,6 +810,21 @@ func TestConfig_Validate(t *testing.T) {
 				assert.EqualError(err, "syncPeers requires parameter timeout")
 			},
 		},
+		{
+			name:   "syncPeers requires parameter batchSize",
+			config: New(),
+			mock: func(cfg *Config) {
+				cfg.Auth.JWT = mockJWTConfig
+				cfg.Database.Type = DatabaseTypeMysql
+				cfg.Database.Mysql = mockMysqlConfig
+				cfg.Database.Redis = mockRedisConfig
+				cfg.Job.SyncPeers.BatchSize = 0
+			},
+			expect: func(t *testing.T, err error) {
+				assert := assert.New(t)
+				assert.EqualError(err, "syncPeers requires parameter batchSize")
+			},
+		},
 		{
 			name:   "objectStorage requires parameter name",
 			config: New(),
diff --git a/manager/config/constants.go b/manager/config/constants.go
index 79f9a2d9ec6..5bda9c5af06 100644
--- a/manager/config/constants.go
+++ b/manager/config/constants.go
@@ -108,7 +108,8 @@ const (
 	// DefaultClusterJobRateLimit is default rate limit(requests per second) for job Open API by cluster.
 	DefaultClusterJobRateLimit = 10

-	// DefaultJobSyncPeersBatchSize is the default batch size for syncing all peers information from the scheduler.
+	// DefaultJobSyncPeersBatchSize is the default batch size for syncing all peers information from the scheduler and
+	// operating on the database.
 	DefaultJobSyncPeersBatchSize = 500
 )
diff --git a/manager/config/testdata/manager.yaml b/manager/config/testdata/manager.yaml
index 5ea161eb160..3efde7bb4fb 100644
--- a/manager/config/testdata/manager.yaml
+++ b/manager/config/testdata/manager.yaml
@@ -79,6 +79,7 @@ job:
   syncPeers:
     interval: 13h
     timeout: 2m
+    batchSize: 50

 objectStorage:
   enable: true
diff --git a/manager/handlers/job.go b/manager/handlers/job.go
index 4172948eae1..0e8f7be5be2 100644
--- a/manager/handlers/job.go
+++ b/manager/handlers/job.go
@@ -70,13 +70,15 @@ func (h *Handlers) CreateJob(ctx *gin.Context) {
 			return
 		}

-		job, err := h.service.CreateSyncPeersJob(ctx.Request.Context(), json)
-		if err != nil {
+		// CreateSyncPeersJob is a synchronous operation, so it does not need to return a
+		// job id or record the job information in the database. If it returns success,
+		// query the peers table to get the latest data.
+		if err := h.service.CreateSyncPeersJob(ctx.Request.Context(), json); err != nil {
 			ctx.Error(err) // nolint: errcheck
 			return
 		}

-		ctx.JSON(http.StatusOK, job)
+		ctx.JSON(http.StatusOK, http.StatusText(http.StatusOK))
 	case job.GetTaskJob:
 		var json types.CreateGetTaskJobRequest
 		if err := ctx.ShouldBindBodyWith(&json, binding.JSON); err != nil {
diff --git a/manager/job/job.go b/manager/job/job.go
index f652dd5e741..45e34516379 100644
--- a/manager/job/job.go
+++ b/manager/job/job.go
@@ -30,7 +30,7 @@ import (
 )

 // DefaultTaskPollingInterval is the default interval for polling task.
-const DefaultTaskPollingInterval = 5 * time.Second
+const DefaultTaskPollingInterval = 10 * time.Second

 // tracer is a global tracer for job.
 var tracer = otel.Tracer("manager")
diff --git a/manager/job/mocks/sync_peers_mock.go b/manager/job/mocks/sync_peers_mock.go
index 49f2ace24c1..ea4ab3159de 100644
--- a/manager/job/mocks/sync_peers_mock.go
+++ b/manager/job/mocks/sync_peers_mock.go
@@ -13,7 +13,7 @@ import (
 	context "context"
 	reflect "reflect"

-	job "d7y.io/dragonfly/v2/manager/job"
+	models "d7y.io/dragonfly/v2/manager/models"
 	gomock "go.uber.org/mock/gomock"
 )

@@ -41,18 +41,18 @@ func (m *MockSyncPeers) EXPECT() *MockSyncPeersMockRecorder {
 	return m.recorder
 }

-// Run mocks base method.
-func (m *MockSyncPeers) Run(arg0 context.Context, arg1 job.SyncPeersArgs) error {
+// CreateSyncPeers mocks base method.
+func (m *MockSyncPeers) CreateSyncPeers(arg0 context.Context, arg1 []models.Scheduler) error {
 	m.ctrl.T.Helper()
-	ret := m.ctrl.Call(m, "Run", arg0, arg1)
+	ret := m.ctrl.Call(m, "CreateSyncPeers", arg0, arg1)
 	ret0, _ := ret[0].(error)
 	return ret0
 }

-// Run indicates an expected call of Run.
-func (mr *MockSyncPeersMockRecorder) Run(arg0, arg1 any) *gomock.Call {
+// CreateSyncPeers indicates an expected call of CreateSyncPeers.
+func (mr *MockSyncPeersMockRecorder) CreateSyncPeers(arg0, arg1 any) *gomock.Call {
 	mr.mock.ctrl.T.Helper()
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockSyncPeers)(nil).Run), arg0, arg1)
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateSyncPeers", reflect.TypeOf((*MockSyncPeers)(nil).CreateSyncPeers), arg0, arg1)
 }

 // Serve mocks base method.
diff --git a/manager/job/sync_peers.go b/manager/job/sync_peers.go
index 9862a8b5a40..7b53773e655 100644
--- a/manager/job/sync_peers.go
+++ b/manager/job/sync_peers.go
@@ -20,6 +20,7 @@ package job

 import (
 	"context"
+	"errors"
 	"fmt"
 	"sync"
 	"time"
@@ -28,22 +29,22 @@ import (
 	"github.com/google/uuid"
 	"go.opentelemetry.io/otel/trace"
 	"gorm.io/gorm"
-	"gorm.io/gorm/clause"

 	logger "d7y.io/dragonfly/v2/internal/dflog"
 	internaljob "d7y.io/dragonfly/v2/internal/job"
 	"d7y.io/dragonfly/v2/manager/config"
 	"d7y.io/dragonfly/v2/manager/models"
-	"d7y.io/dragonfly/v2/pkg/container/slice"
 	"d7y.io/dragonfly/v2/pkg/idgen"
-	"d7y.io/dragonfly/v2/pkg/types"
+	pkgtypes "d7y.io/dragonfly/v2/pkg/types"
 	resource "d7y.io/dragonfly/v2/scheduler/resource/standard"
 )

 // SyncPeers is an interface for sync peers.
 type SyncPeers interface {
-	// Run execute action to sync peers, which is async.
-	Run(context.Context, SyncPeersArgs) error
+	// CreateSyncPeers creates a sync peers job and merges the sync peer results with the
+	// data in the peer table in the database. It is a synchronous operation that returns
+	// an error if the sync peers job fails.
+	CreateSyncPeers(context.Context, []models.Scheduler) error

 	// Serve started sync peers server.
 	Serve()

@@ -57,70 +58,84 @@ type syncPeers struct {
 	config *config.Config
 	job    *internaljob.Job
 	db     *gorm.DB
+	mu     *sync.Mutex
 	done   chan struct{}
-
-	syncLocker sync.Mutex
-	workChan   chan SyncPeersArgs
-}
-
-type SyncPeersArgs struct {
-	CandidateSchedulerClusters []models.SchedulerCluster
-	TaskID                     string
 }

 // newSyncPeers returns a new SyncPeers.
 func newSyncPeers(cfg *config.Config, job *internaljob.Job, gdb *gorm.DB) (SyncPeers, error) {
 	return &syncPeers{
-		config:     cfg,
-		db:         gdb,
-		job:        job,
-		done:       make(chan struct{}),
-		workChan:   make(chan SyncPeersArgs, 10),
-		syncLocker: sync.Mutex{},
+		config: cfg,
+		job:    job,
+		db:     gdb,
+		mu:     &sync.Mutex{},
+		done:   make(chan struct{}),
 	}, nil
 }

-// Run start to sync peers.
-func (s *syncPeers) Run(ctx context.Context, args SyncPeersArgs) error {
-	if len(args.CandidateSchedulerClusters) == 0 {
-		if err := s.db.WithContext(ctx).Find(&args.CandidateSchedulerClusters).Error; err != nil {
-			return fmt.Errorf("failed to get candidate scheduler clusters: %v", err)
+// CreateSyncPeers creates the sync peers job.
+func (s *syncPeers) CreateSyncPeers(ctx context.Context, schedulers []models.Scheduler) error {
+	// Avoid running multiple sync peers jobs at the same time.
+	if !s.mu.TryLock() {
+		return errors.New("sync peers job is running")
+	}
+	defer s.mu.Unlock()
+
+	// Send sync peer requests to all available schedulers, and merge the sync peer results
+	// with the data in the peer table in the database.
+	for _, scheduler := range schedulers {
+		log := logger.WithScheduler(scheduler.Hostname, scheduler.IP, uint64(scheduler.SchedulerClusterID))
+
+		// Send sync peer request to scheduler.
+		results, err := s.createSyncPeers(ctx, scheduler)
+		if err != nil {
+			log.Error(err)
+			continue
 		}
+		log.Infof("sync peers count is %d", len(results))
+
+		// Merge sync peer results with the data in the peer table.
+		s.mergePeers(ctx, scheduler, results, log)
 	}

-	s.workChan <- args
 	return nil
 }

 // Serve started sync peers server.
 func (s *syncPeers) Serve() {
-	ticker := time.NewTicker(s.config.Job.SyncPeers.Interval)
-	defer ticker.Stop()
+	tick := time.NewTicker(s.config.Job.SyncPeers.Interval)
+	defer tick.Stop()
+
 	for {
 		select {
-		case <-ticker.C:
-			logger.Debugf("start to sync peers periodically")
-			if err := s.syncPeers(context.Background(), nil); err != nil {
-				logger.Errorf("sync peers failed periodically: %v", err)
-			}
-		case args := <-s.workChan:
-			logger.Debugf("start to sync peers for request")
-			err := s.syncPeers(context.Background(), args.CandidateSchedulerClusters)
-			if err != nil {
-				logger.Errorf("sync peers failed for request: %v", err)
+		case <-tick.C:
+			ctx, cancel := context.WithTimeout(context.Background(), s.config.Job.SyncPeers.Timeout)
+			defer cancel()
+
+			// Find all of the scheduler clusters that have active schedulers.
+			var schedulerClusters []models.SchedulerCluster
+			if err := s.db.WithContext(ctx).Find(&schedulerClusters).Error; err != nil {
+				logger.Errorf("sync peers find scheduler clusters failed: %v", err)
 			}

-			if args.TaskID != "" {
-				job := models.Job{}
-				state := machineryv1tasks.StateFailure
-				if err == nil {
-					state = machineryv1tasks.StateSuccess
-				}
-				if updateErr := s.db.WithContext(context.Background()).First(&job, "task_id = ?", args.TaskID).Updates(models.Job{
-					State: state,
-				}).Error; updateErr != nil {
-					logger.Errorf("update sync peers job result failed for request: %v", updateErr)
+			// Find all of the schedulers that have an active scheduler cluster.
+			var schedulers []models.Scheduler
+			for _, schedulerCluster := range schedulerClusters {
+				var scheduler models.Scheduler
+				if err := s.db.WithContext(ctx).Preload("SchedulerCluster").First(&scheduler, models.Scheduler{
+					SchedulerClusterID: schedulerCluster.ID,
+					State:              models.SchedulerStateActive,
+				}).Error; err != nil {
+					continue
 				}
+
+				logger.Infof("sync peers find scheduler cluster %s", schedulerCluster.Name)
+				schedulers = append(schedulers, scheduler)
+			}
+			logger.Infof("sync peers find schedulers count is %d", len(schedulers))
+
+			if err := s.CreateSyncPeers(ctx, schedulers); err != nil {
+				logger.Errorf("sync peers failed: %v", err)
 			}
 		case <-s.done:
 			return
@@ -133,55 +148,6 @@ func (s *syncPeers) Stop() {
 	close(s.done)
 }

-// syncPeers is the real working function in synchronous mode.
-func (s *syncPeers) syncPeers(ctx context.Context, candidateSchedulerClusters []models.SchedulerCluster) error {
-	if !s.syncLocker.TryLock() {
-		return fmt.Errorf("another sync peers is already running")
-	}
-	defer s.syncLocker.Unlock()
-
-	if len(candidateSchedulerClusters) == 0 {
-		if err := s.db.WithContext(ctx).Find(&candidateSchedulerClusters).Error; err != nil {
-			return err
-		}
-	}
-
-	// Find all of the schedulers that has active scheduler cluster.
-	var candidateSchedulers []models.Scheduler
-	for _, candidateSchedulerCluster := range candidateSchedulerClusters {
-		var scheduler models.Scheduler
-		if err := s.db.WithContext(ctx).Preload("SchedulerCluster").First(&scheduler, models.Scheduler{
-			SchedulerClusterID: candidateSchedulerCluster.ID,
-			State:              models.SchedulerStateActive,
-		}).Error; err != nil {
-			continue
-		}
-
-		logger.Infof("sync peers find candidate scheduler cluster %s", candidateSchedulerCluster.Name)
-		candidateSchedulers = append(candidateSchedulers, scheduler)
-	}
-	logger.Infof("sync peers find candidate schedulers count is %d", len(candidateSchedulers))
-
-	// Send sync peer requests to all available schedulers,
-	// and merge the sync peer results with the data in
-	// the peer table in the database.
-	for _, scheduler := range candidateSchedulers {
-		log := logger.WithScheduler(scheduler.Hostname, scheduler.IP, uint64(scheduler.SchedulerClusterID))
-
-		// Send sync peer request to scheduler.
-		results, err := s.createSyncPeers(ctx, scheduler)
-		if err != nil {
-			log.Error(err)
-			continue
-		}
-		log.Infof("sync peers count is %d", len(results))
-
-		// Merge sync peer results with the data in the peer table.
-		s.mergePeers(ctx, scheduler, results, log)
-	}
-	return nil
-}
-
 // createSyncPeers creates sync peers.
 func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Scheduler) ([]*resource.Host, error) {
 	var span trace.Span
@@ -205,7 +171,7 @@ func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Schedu
 	logger.Infof("create sync peers in queue %v, task: %#v", queue, task)
 	asyncResult, err := s.job.Server.SendTaskWithContext(ctx, task)
 	if err != nil {
 		logger.Errorf("create sync peers in queue %v failed: %v", queue, err)
 		return nil, err
 	}

@@ -226,111 +192,92 @@ func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Schedu
 // Merge sync peer results with the data in the peer table.
 func (s *syncPeers) mergePeers(ctx context.Context, scheduler models.Scheduler, results []*resource.Host, log *logger.SugaredLoggerOnWith) {
-	// Fetch existing peers from the database
-	var existingPeers []models.Peer
-	var count int64
-
-	if err := s.db.Model(&models.Peer{}).
-		Where("scheduler_cluster_id = ?", scheduler.SchedulerClusterID).
-		Count(&count).
-		Error; err != nil {
-		log.Error("failed to count existing peers: ", err)
-		return
+	// Convert sync peer results from slice to map.
+	syncPeers := make(map[string]*resource.Host, len(results))
+	for _, result := range results {
+		syncPeers[result.ID] = result
 	}
-	log.Infof("total peers count: %d", count)
-
-	pageSize := s.config.Job.SyncPeers.BatchSize
-	totalPages := (count + int64(pageSize-1)) / int64(pageSize)

+	oldPeers := make([]*models.Peer, 0, s.config.Job.SyncPeers.BatchSize)
+	if err := s.db.WithContext(ctx).Model(&models.Peer{}).Where("scheduler_cluster_id = ?", scheduler.SchedulerClusterID).FindInBatches(&oldPeers, s.config.Job.SyncPeers.BatchSize, func(tx *gorm.DB, batch int) error {
+		peers := make([]*models.Peer, 0, s.config.Job.SyncPeers.BatchSize)
+		for _, oldPeer := range oldPeers {
+			// If the peer exists in the sync peer results, update the peer data in the database with
+			// the sync peer results and delete the sync peer from the sync peers map.
+			isSeedPeer := pkgtypes.ParseHostType(oldPeer.Type) != pkgtypes.HostTypeNormal
+			id := idgen.HostIDV2(oldPeer.IP, oldPeer.Hostname, isSeedPeer)
+			if syncPeer, ok := syncPeers[id]; ok {
+				peers = append(peers, &models.Peer{
+					Hostname:           syncPeer.Hostname,
+					Type:               syncPeer.Type.Name(),
+					IDC:                syncPeer.Network.IDC,
+					Location:           syncPeer.Network.Location,
+					IP:                 syncPeer.IP,
+					Port:               syncPeer.Port,
+					DownloadPort:       syncPeer.DownloadPort,
+					ObjectStoragePort:  syncPeer.ObjectStoragePort,
+					State:              models.PeerStateActive,
+					OS:                 syncPeer.OS,
+					Platform:           syncPeer.Platform,
+					PlatformFamily:     syncPeer.PlatformFamily,
+					PlatformVersion:    syncPeer.PlatformVersion,
+					KernelVersion:      syncPeer.KernelVersion,
+					GitVersion:         syncPeer.Build.GitVersion,
+					GitCommit:          syncPeer.Build.GitCommit,
+					BuildPlatform:      syncPeer.Build.Platform,
+					SchedulerClusterID: uint(syncPeer.SchedulerClusterID),
+				})
+
+				// Delete the sync peer from the sync peers map.
+				delete(syncPeers, id)
+			} else {
+				// If the peer does not exist in the sync peer results, delete the peer in the database.
+				if err := tx.Unscoped().Delete(&models.Peer{}, oldPeer.ID).Error; err != nil {
+					log.Error(err)
+				}
+			}
+		}

-	for page := 1; page <= int(totalPages); page++ {
-		var batchPeers []models.Peer
-		if err := s.db.Preload("SchedulerCluster").
-			Scopes(models.Paginate(page, pageSize)).
-			Where("scheduler_cluster_id = ?", scheduler.SchedulerClusterID).
-			Find(&batchPeers).
-			Error; err != nil {
-			log.Error("Failed to fetch peers in batch: ", err)
-			return
+		// Avoid saving an empty slice.
+		if len(peers) > 0 {
+			tx.Save(&peers)
 		}
-		existingPeers = append(existingPeers, batchPeers...)
+		return nil
+	}).Error; err != nil {
+		log.Error(err)
+		return
 	}

-	// Calculate differences using diffPeers function
-	toUpsert, toDelete := diffPeers(existingPeers, results)
-
-	// Perform batch upsert
-	if len(toUpsert) > 0 {
-		// Construct the upsert query
-		if err := s.db.WithContext(ctx).
-			Clauses(clause.OnConflict{
-				Columns:   []clause.Column{{Name: "id"}},
-				UpdateAll: true,
-			}).
-			CreateInBatches(toUpsert, s.config.Job.SyncPeers.BatchSize).
-			Error; err != nil {
-			log.Error(err)
-		}
+	// Insert the sync peers that do not exist in the database into the peer table.
+	peers := make([]*models.Peer, 0, len(syncPeers))
+	for _, syncPeer := range syncPeers {
+		peers = append(peers, &models.Peer{
+			Hostname:           syncPeer.Hostname,
+			Type:               syncPeer.Type.Name(),
+			IDC:                syncPeer.Network.IDC,
+			Location:           syncPeer.Network.Location,
+			IP:                 syncPeer.IP,
+			Port:               syncPeer.Port,
+			DownloadPort:       syncPeer.DownloadPort,
+			ObjectStoragePort:  syncPeer.ObjectStoragePort,
+			State:              models.PeerStateActive,
+			OS:                 syncPeer.OS,
+			Platform:           syncPeer.Platform,
+			PlatformFamily:     syncPeer.PlatformFamily,
+			PlatformVersion:    syncPeer.PlatformVersion,
+			KernelVersion:      syncPeer.KernelVersion,
+			GitVersion:         syncPeer.Build.GitVersion,
+			GitCommit:          syncPeer.Build.GitCommit,
+			BuildPlatform:      syncPeer.Build.Platform,
+			SchedulerClusterID: uint(syncPeer.SchedulerClusterID),
+		})
 	}

-	// Perform batch delete
-	if len(toDelete) > 0 {
-		if err := s.db.WithContext(ctx).
-			Delete(&toDelete).
-			Error; err != nil {
+	// Avoid saving an empty slice.
+	if len(peers) > 0 {
+		if err := s.db.WithContext(ctx).CreateInBatches(peers, len(peers)).Error; err != nil {
 			log.Error(err)
 		}
 	}
 }
-
-func diffPeers(existingPeers []models.Peer, currentPeers []*resource.Host) (toUpsert, toDelete []models.Peer) {
-	// Convert current peers to a map for quick lookup
-	currentPeersMap := slice.KeyBy[string, *resource.Host](currentPeers, func(item *resource.Host) string {
-		return item.ID
-	})
-
-	// Convert existing peers to a map for quick lookup
-	existingPeersMap := slice.KeyBy[string, models.Peer](existingPeers, func(item models.Peer) string {
-		return idgen.HostIDV2(item.IP, item.Hostname, types.ParseHostType(item.Type) != types.HostTypeNormal)
-	})
-
-	// Calculate differences
-	for id, currentPeer := range currentPeersMap {
-		if _, ok := existingPeersMap[id]; ok {
-			// Remove from existingPeersMap to mark it as processed
-			delete(existingPeersMap, id)
-		}
-		// Add all current peers to upsert list
-		toUpsert = append(toUpsert, convertToModelPeer(*currentPeer))
-	}
-
-	// Peers left in existingPeersMap are to be deleted
-	toDelete = slice.Values(existingPeersMap)
-
-	return toUpsert, toDelete
-}
-
-// Helper function to convert resource.Host to models.Peer
-func convertToModelPeer(peer resource.Host) models.Peer {
-	return models.Peer{
-		Hostname:           peer.Hostname,
-		Type:               peer.Type.Name(),
-		IDC:                peer.Network.IDC,
-		Location:           peer.Network.Location,
-		IP:                 peer.IP,
-		Port:               peer.Port,
-		DownloadPort:       peer.DownloadPort,
-		ObjectStoragePort:  peer.ObjectStoragePort,
-		State:              models.PeerStateActive,
-		OS:                 peer.OS,
-		Platform:           peer.Platform,
-		PlatformFamily:     peer.PlatformFamily,
-		PlatformVersion:    peer.PlatformVersion,
-		KernelVersion:      peer.KernelVersion,
-		GitVersion:         peer.Build.GitVersion,
-		GitCommit:          peer.Build.GitCommit,
-		BuildPlatform:      peer.Build.Platform,
-		SchedulerClusterID: uint(peer.SchedulerClusterID),
-	}
-}
diff --git a/manager/job/sync_peers_test.go b/manager/job/sync_peers_test.go
deleted file mode 100644
index caf7891fb72..00000000000
--- a/manager/job/sync_peers_test.go
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright 2024 The Dragonfly Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package job - -import ( - "sort" - "testing" - - "github.com/stretchr/testify/assert" - - "d7y.io/dragonfly/v2/manager/models" - "d7y.io/dragonfly/v2/pkg/idgen" - "d7y.io/dragonfly/v2/pkg/types" - resource "d7y.io/dragonfly/v2/scheduler/resource/standard" -) - -func Test_diffPeers(t *testing.T) { - type args struct { - existingPeers []models.Peer - currentPeers []*resource.Host - } - tests := []struct { - name string - args args - wantToUpsert []models.Peer - wantToDelete []models.Peer - }{ - { - name: "append", - args: args{ - existingPeers: []models.Peer{ - // delete for not existing - generateModePeer("127.0.0.6", "foo6", 80, 80, types.HostTypeSuperSeed), - // delete for original HostTypeNormal - generateModePeer("127.0.0.5", "foo5", 80, 80, types.HostTypeNormal), - // delete for type changed - generateModePeer("127.0.0.4", "foo4", 80, 80, types.HostTypeNormal), - // update for port changed - generateModePeer("127.0.0.1", "foo1", 80, 443, types.HostTypeSuperSeed), - // update for type changed - generateModePeer("127.0.0.2", "foo2", 80, 80, types.HostTypeStrongSeed), - }, - currentPeers: []*resource.Host{ - resource.NewHost( - idgen.HostIDV2("127.0.0.1", "foo1", true), - "127.0.0.1", - "foo1", - 80, - 80, - types.HostTypeSuperSeed), - resource.NewHost( - idgen.HostIDV2("127.0.0.2", "foo2", true), - "127.0.0.2", - "foo2", - 80, - 80, - types.HostTypeSuperSeed), - resource.NewHost( - idgen.HostIDV2("127.0.0.3", "foo3", true), - "127.0.0.3", - "foo3", - 80, - 80, - types.HostTypeSuperSeed), // append only - }, - }, - wantToUpsert: []models.Peer{ - generateModePeer("127.0.0.1", "foo1", 80, 80, types.HostTypeSuperSeed), - generateModePeer("127.0.0.2", "foo2", 80, 80, types.HostTypeSuperSeed), - generateModePeer("127.0.0.3", "foo3", 80, 80, types.HostTypeSuperSeed), - }, - wantToDelete: []models.Peer{ - generateModePeer("127.0.0.4", "foo4", 80, 80, types.HostTypeNormal), - generateModePeer("127.0.0.5", "foo5", 80, 80, types.HostTypeNormal), - generateModePeer("127.0.0.6", "foo6", 80, 80, types.HostTypeSuperSeed), - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - gotToUpdate, gotToDelete := diffPeers(tt.args.existingPeers, tt.args.currentPeers) - // sort the result to compare - sort.Slice(gotToUpdate, func(i, j int) bool { - return gotToUpdate[i].IP < gotToUpdate[j].IP - }) - sort.Slice(gotToDelete, func(i, j int) bool { - return gotToDelete[i].IP < gotToDelete[j].IP - }) - assert.Equalf(t, tt.wantToUpsert, gotToUpdate, "diffPeers toUpsert(%v, %v)", tt.args.existingPeers, tt.args.currentPeers) - assert.Equalf(t, tt.wantToDelete, gotToDelete, "diffPeers toDelete(%v, %v)", tt.args.existingPeers, tt.args.currentPeers) - }) - } -} - -func generateModePeer(ip, hostname string, port, downloadPort int32, typ types.HostType) models.Peer { - return models.Peer{ - Hostname: hostname, - Type: typ.Name(), - IP: ip, - Port: port, - State: models.PeerStateActive, - DownloadPort: downloadPort, - } -} diff --git a/manager/service/job.go b/manager/service/job.go index 2fdbbbf3dea..8677b661cf0 100644 --- a/manager/service/job.go +++ 
b/manager/service/job.go @@ -22,11 +22,9 @@ import ( "fmt" machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks" - "github.com/google/uuid" logger "d7y.io/dragonfly/v2/internal/dflog" internaljob "d7y.io/dragonfly/v2/internal/job" - "d7y.io/dragonfly/v2/manager/job" "d7y.io/dragonfly/v2/manager/metrics" "d7y.io/dragonfly/v2/manager/models" "d7y.io/dragonfly/v2/manager/types" @@ -36,47 +34,13 @@ import ( "d7y.io/dragonfly/v2/pkg/structure" ) -func (s *service) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) (*models.Job, error) { - args, err := structure.StructToMap(json) +func (s *service) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) error { + schedulers, err := s.findSchedulerInClusters(ctx, json.SchedulerClusterIDs) if err != nil { - return nil, err - } - - candidateSchedulers, err := s.findCandidateSchedulers(ctx, json.SchedulerClusterIDs, nil) - if err != nil { - return nil, err - } - - var candidateClusters []models.SchedulerCluster - for _, scheduler := range candidateSchedulers { - candidateClusters = append(candidateClusters, scheduler.SchedulerCluster) - } - - taskID := fmt.Sprintf("manager_%v", uuid.New().String()) - - if err = s.job.SyncPeers.Run(ctx, job.SyncPeersArgs{ - CandidateSchedulerClusters: candidateClusters, - TaskID: taskID, - }); err != nil { - return nil, err - } - - // job here is a local one controlled by the manager self. - job := models.Job{ - TaskID: taskID, - BIO: json.BIO, - Args: args, - Type: json.Type, - State: machineryv1tasks.StateStarted, - UserID: json.UserID, - SchedulerClusters: candidateClusters, - } - - if err = s.db.WithContext(ctx).Create(&job).Error; err != nil { - return nil, err + return err } - return &job, nil + return s.job.SyncPeers.CreateSyncPeers(ctx, schedulers) } func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheatJobRequest) (*models.Job, error) { @@ -101,7 +65,7 @@ func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheat return nil, err } - candidateSchedulers, err := s.findCandidateSchedulers(ctx, json.SchedulerClusterIDs, []string{types.SchedulerFeaturePreheat}) + candidateSchedulers, err := s.findAllCandidateSchedulersInClusters(ctx, json.SchedulerClusterIDs, []string{types.SchedulerFeaturePreheat}) if err != nil { return nil, err } @@ -144,7 +108,7 @@ func (s *service) CreateGetTaskJob(ctx context.Context, json types.CreateGetTask return nil, err } - schedulers, err := s.findSchedulers(ctx, json.SchedulerClusterIDs) + schedulers, err := s.findAllSchedulersInClusters(ctx, json.SchedulerClusterIDs) if err != nil { return nil, err } @@ -191,7 +155,7 @@ func (s *service) CreateDeleteTaskJob(ctx context.Context, json types.CreateDele return nil, err } - schedulers, err := s.findSchedulers(ctx, json.SchedulerClusterIDs) + schedulers, err := s.findAllSchedulersInClusters(ctx, json.SchedulerClusterIDs) if err != nil { return nil, err } @@ -224,7 +188,54 @@ func (s *service) CreateDeleteTaskJob(ctx context.Context, json types.CreateDele return &job, nil } -func (s *service) findSchedulers(ctx context.Context, schedulerClusterIDs []uint) ([]models.Scheduler, error) { +func (s *service) findSchedulerInClusters(ctx context.Context, schedulerClusterIDs []uint) ([]models.Scheduler, error) { + var activeSchedulers []models.Scheduler + if len(schedulerClusterIDs) != 0 { + // Find the scheduler clusters by request. 
+		for _, schedulerClusterID := range schedulerClusterIDs {
+			schedulerCluster := models.SchedulerCluster{}
+			if err := s.db.WithContext(ctx).First(&schedulerCluster, schedulerClusterID).Error; err != nil {
+				return nil, fmt.Errorf("scheduler cluster id %d: %w", schedulerClusterID, err)
+			}
+
+			scheduler := models.Scheduler{}
+			if err := s.db.WithContext(ctx).Preload("SchedulerCluster").First(&scheduler, models.Scheduler{
+				SchedulerClusterID: schedulerCluster.ID,
+				State:              models.SchedulerStateActive,
+			}).Error; err != nil {
+				return nil, fmt.Errorf("scheduler cluster id %d: %w", schedulerClusterID, err)
+			}
+
+			activeSchedulers = append(activeSchedulers, scheduler)
+		}
+	} else {
+		// Find all of the scheduler clusters that have an active scheduler.
+		var schedulerClusters []models.SchedulerCluster
+		if err := s.db.WithContext(ctx).Find(&schedulerClusters).Error; err != nil {
+			return nil, err
+		}
+
+		for _, schedulerCluster := range schedulerClusters {
+			scheduler := models.Scheduler{}
+			if err := s.db.WithContext(ctx).Preload("SchedulerCluster").First(&scheduler, models.Scheduler{
+				SchedulerClusterID: schedulerCluster.ID,
+				State:              models.SchedulerStateActive,
+			}).Error; err != nil {
+				continue
+			}
+
+			activeSchedulers = append(activeSchedulers, scheduler)
+		}
+	}
+
+	if len(activeSchedulers) == 0 {
+		return nil, errors.New("active schedulers not found")
+	}
+
+	return activeSchedulers, nil
+}
+
+func (s *service) findAllSchedulersInClusters(ctx context.Context, schedulerClusterIDs []uint) ([]models.Scheduler, error) {
 	var activeSchedulers []models.Scheduler
 	if len(schedulerClusterIDs) != 0 {
 		// Find the scheduler clusters by request.
@@ -271,7 +282,7 @@ func (s *service) findSchedulers(ctx context.Context, schedulerClusterIDs []uint
 	return activeSchedulers, nil
 }

-func (s *service) findCandidateSchedulers(ctx context.Context, schedulerClusterIDs []uint, features []string) ([]models.Scheduler, error) {
+func (s *service) findAllCandidateSchedulersInClusters(ctx context.Context, schedulerClusterIDs []uint, features []string) ([]models.Scheduler, error) {
 	var candidateSchedulers []models.Scheduler
 	if len(schedulerClusterIDs) != 0 {
 		// Find the scheduler clusters by request.
diff --git a/manager/service/mocks/service_mock.go b/manager/service/mocks/service_mock.go
index 638b46b3d2f..fd654b94a0e 100644
--- a/manager/service/mocks/service_mock.go
+++ b/manager/service/mocks/service_mock.go
@@ -341,12 +341,11 @@ func (mr *MockServiceMockRecorder) CreateSeedPeerCluster(arg0, arg1 any) *gomock
 }

 // CreateSyncPeersJob mocks base method.
-func (m *MockService) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) (*models.Job, error) {
+func (m *MockService) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) error {
 	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "CreateSyncPeersJob", ctx, json)
-	ret0, _ := ret[0].(*models.Job)
-	ret1, _ := ret[1].(error)
-	return ret0, ret1
+	ret0, _ := ret[0].(error)
+	return ret0
 }

 // CreateSyncPeersJob indicates an expected call of CreateSyncPeersJob.
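For reference, a minimal sketch of how a client might invoke the now-synchronous sync peers Open API after this patch. The manager address, the /api/v1/jobs path, the "sync_peers" type string, and the omission of authentication are assumptions for illustration; the request body follows CreateSyncPeersJobRequest, and per the handler change above the manager replies with a plain 200 and no job id, since the job is no longer recorded in the database:

package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// Assumed manager address and job endpoint; adjust to the actual deployment.
	url := "http://127.0.0.1:8080/api/v1/jobs"

	// scheduler_cluster_ids is optional; when omitted, every cluster with an
	// active scheduler is synced.
	body := []byte(`{"type": "sync_peers", "scheduler_cluster_ids": [1]}`)

	resp, err := http.Post(url, "application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// On success the handler responds with http.StatusText(http.StatusOK), i.e. "OK";
	// the peers table can then be queried for the merged data.
	fmt.Println(resp.Status)
}

Note that the request now blocks until every selected scheduler has been synced, so the client timeout should be at least the manager's syncPeers timeout.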
diff --git a/manager/service/service.go b/manager/service/service.go
index b849b006445..1650bab1eee 100644
--- a/manager/service/service.go
+++ b/manager/service/service.go
@@ -115,7 +115,7 @@ type Service interface {
 	GetConfigs(context.Context, types.GetConfigsQuery) ([]models.Config, int64, error)

 	CreatePreheatJob(context.Context, types.CreatePreheatJobRequest) (*models.Job, error)
-	CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) (*models.Job, error)
+	CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) error
 	CreateDeleteTaskJob(context.Context, types.CreateDeleteTaskJobRequest) (*models.Job, error)
 	CreateGetTaskJob(context.Context, types.CreateGetTaskJobRequest) (*models.Job, error)
 	DestroyJob(context.Context, uint) error
diff --git a/manager/types/job.go b/manager/types/job.go
index e0e4f6e13dc..58160664aaf 100644
--- a/manager/types/job.go
+++ b/manager/types/job.go
@@ -35,37 +35,70 @@ const (
 )

 type CreateJobRequest struct {
-	BIO                 string         `json:"bio" binding:"omitempty"`
-	Type                string         `json:"type" binding:"required"`
-	Args                map[string]any `json:"args" binding:"omitempty"`
-	UserID              uint           `json:"user_id" binding:"omitempty"`
-	SeedPeerClusterIDs  []uint         `json:"seed_peer_cluster_ids" binding:"omitempty"`
-	SchedulerClusterIDs []uint         `json:"scheduler_cluster_ids" binding:"omitempty"`
+	// BIO is the description of the job.
+	BIO string `json:"bio" binding:"omitempty"`
+
+	// Type is the type of the job.
+	Type string `json:"type" binding:"required"`
+
+	// Args is the arguments of the job.
+	Args map[string]any `json:"args" binding:"omitempty"`
+
+	// UserID is the user id of the job.
+	UserID uint `json:"user_id" binding:"omitempty"`
+
+	// SeedPeerClusterIDs is the seed peer cluster ids of the job.
+	SeedPeerClusterIDs []uint `json:"seed_peer_cluster_ids" binding:"omitempty"`
+
+	// SchedulerClusterIDs is the scheduler cluster ids of the job.
+	SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"`
 }

 type UpdateJobRequest struct {
-	BIO    string `json:"bio" binding:"omitempty"`
-	UserID uint   `json:"user_id" binding:"omitempty"`
+	// BIO is the description of the job.
+	BIO string `json:"bio" binding:"omitempty"`
+
+	// UserID is the user id of the job.
+	UserID uint `json:"user_id" binding:"omitempty"`
 }

 type JobParams struct {
+	// ID is the id of the job.
 	ID uint `uri:"id" binding:"required"`
 }

 type GetJobsQuery struct {
-	Type    string `form:"type" binding:"omitempty"`
-	State   string `form:"state" binding:"omitempty,oneof=PENDING RECEIVED STARTED RETRY SUCCESS FAILURE"`
-	UserID  uint   `form:"user_id" binding:"omitempty"`
-	Page    int    `form:"page" binding:"omitempty,gte=1"`
-	PerPage int    `form:"per_page" binding:"omitempty,gte=1,lte=10000000"`
+	// Type is the type of the job.
+	Type string `form:"type" binding:"omitempty"`
+
+	// State is the state of the job.
+	State string `form:"state" binding:"omitempty,oneof=PENDING RECEIVED STARTED RETRY SUCCESS FAILURE"`
+
+	// UserID is the user id of the job.
+	UserID uint `form:"user_id" binding:"omitempty"`
+
+	// Page is the page number of the job list.
+	Page int `form:"page" binding:"omitempty,gte=1"`
+
+	// PerPage is the item count per page of the job list.
+	PerPage int `form:"per_page" binding:"omitempty,gte=1,lte=10000000"`
 }

 type CreatePreheatJobRequest struct {
-	BIO                 string      `json:"bio" binding:"omitempty"`
-	Type                string      `json:"type" binding:"required"`
-	Args                PreheatArgs `json:"args" binding:"omitempty"`
-	UserID              uint        `json:"user_id" binding:"omitempty"`
-	SchedulerClusterIDs []uint      `json:"scheduler_cluster_ids" binding:"omitempty"`
+	// BIO is the description of the job.
+	BIO string `json:"bio" binding:"omitempty"`
+
+	// Type is the preheating type, which supports image and file.
+	Type string `json:"type" binding:"required"`
+
+	// Args is the arguments of the preheating job.
+	Args PreheatArgs `json:"args" binding:"omitempty"`
+
+	// UserID is the user id of the job.
+	UserID uint `json:"user_id" binding:"omitempty"`
+
+	// SchedulerClusterIDs is the scheduler cluster ids of the job.
+	SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"`
 }

 type PreheatArgs struct {
@@ -104,19 +137,34 @@ type PreheatArgs struct {
 }

 type CreateSyncPeersJobRequest struct {
-	BIO                 string        `json:"bio" binding:"omitempty"`
-	Type                string        `json:"type" binding:"required"`
-	UserID              uint          `json:"user_id" binding:"omitempty"`
-	SchedulerClusterIDs []uint        `json:"scheduler_cluster_ids" binding:"omitempty"`
-	Timeout             time.Duration `json:"timeout" binding:"omitempty"`
+	// BIO is the description of the job.
+	BIO string `json:"bio" binding:"omitempty"`
+
+	// Type is the type of the job.
+	Type string `json:"type" binding:"required"`
+
+	// UserID is the user id of the job.
+	UserID uint `json:"user_id" binding:"omitempty"`
+
+	// SchedulerClusterIDs is the scheduler cluster ids of the job.
+	SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"`
 }

 type CreateGetTaskJobRequest struct {
-	BIO                 string      `json:"bio" binding:"omitempty"`
-	Type                string      `json:"type" binding:"required"`
-	Args                GetTaskArgs `json:"args" binding:"omitempty"`
-	UserID              uint        `json:"user_id" binding:"omitempty"`
-	SchedulerClusterIDs []uint      `json:"scheduler_cluster_ids" binding:"omitempty"`
+	// BIO is the description of the job.
+	BIO string `json:"bio" binding:"omitempty"`
+
+	// Type is the type of the job.
+	Type string `json:"type" binding:"required"`
+
+	// Args is the arguments of the job.
+	Args GetTaskArgs `json:"args" binding:"omitempty"`
+
+	// UserID is the user id of the job.
+	UserID uint `json:"user_id" binding:"omitempty"`
+
+	// SchedulerClusterIDs is the scheduler cluster ids of the job.
+	SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"`
 }

 type GetTaskArgs struct {
@@ -137,11 +185,20 @@ type GetTaskArgs struct {
 }

 type CreateDeleteTaskJobRequest struct {
-	BIO                 string         `json:"bio" binding:"omitempty"`
-	Type                string         `json:"type" binding:"required"`
-	Args                DeleteTaskArgs `json:"args" binding:"omitempty"`
-	UserID              uint           `json:"user_id" binding:"omitempty"`
-	SchedulerClusterIDs []uint         `json:"scheduler_cluster_ids" binding:"omitempty"`
+	// BIO is the description of the job.
+	BIO string `json:"bio" binding:"omitempty"`
+
+	// Type is the type of the job.
+	Type string `json:"type" binding:"required"`
+
+	// Args is the arguments of the job.
+	Args DeleteTaskArgs `json:"args" binding:"omitempty"`
+
+	// UserID is the user id of the job.
+	UserID uint `json:"user_id" binding:"omitempty"`
+
+	// SchedulerClusterIDs is the scheduler cluster ids of the job.
+ SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"` } type DeleteTaskArgs struct { diff --git a/pkg/container/slice/slice.go b/pkg/container/slice/slice.go deleted file mode 100644 index 432b81f76eb..00000000000 --- a/pkg/container/slice/slice.go +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2023 The Dragonfly Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package slice - -func KeyBy[K comparable, V any](collection []V, iteratee func(item V) K) map[K]V { - result := make(map[K]V, len(collection)) - - for i := range collection { - k := iteratee(collection[i]) - result[k] = collection[i] - } - - return result -} - -func Values[K comparable, V any](in ...map[K]V) []V { - size := 0 - for i := range in { - size += len(in[i]) - } - result := make([]V, 0, size) - - for i := range in { - for k := range in[i] { - result = append(result, in[i][k]) - } - } - - return result -} diff --git a/pkg/container/slice/slice_test.go b/pkg/container/slice/slice_test.go deleted file mode 100644 index 8fd3846a446..00000000000 --- a/pkg/container/slice/slice_test.go +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Copyright 2023 The Dragonfly Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package slice - -import ( - "reflect" - "sort" - "testing" -) - -// TestKeyBy tests the KeyBy function -func TestKeyBy(t *testing.T) { - type Person struct { - ID int - Name string - } - - people := []Person{ - {ID: 1, Name: "Alice"}, - {ID: 2, Name: "Bob"}, - {ID: 3, Name: "Charlie"}, - } - - // Test case 1: Key by ID - keyByID := KeyBy(people, func(p Person) int { - return p.ID - }) - expectedKeyByID := map[int]Person{ - 1: {ID: 1, Name: "Alice"}, - 2: {ID: 2, Name: "Bob"}, - 3: {ID: 3, Name: "Charlie"}, - } - if !reflect.DeepEqual(keyByID, expectedKeyByID) { - t.Errorf("KeyBy by ID failed, expected %v, got %v", expectedKeyByID, keyByID) - } - - // Test case 2: Key by Name - keyByName := KeyBy(people, func(p Person) string { - return p.Name - }) - expectedKeyByName := map[string]Person{ - "Alice": {ID: 1, Name: "Alice"}, - "Bob": {ID: 2, Name: "Bob"}, - "Charlie": {ID: 3, Name: "Charlie"}, - } - if !reflect.DeepEqual(keyByName, expectedKeyByName) { - t.Errorf("KeyBy by Name failed, expected %v, got %v", expectedKeyByName, keyByName) - } -} - -// TestValues tests the Values function -func TestValues(t *testing.T) { - map1 := map[int]string{ - 1: "one", - 2: "two", - } - map2 := map[int]string{ - 3: "three", - 4: "four", - } - - // Test case 1: Values from one map - values1 := Values(map1) - expectedValues1 := []string{"one", "two"} - - sort.Strings(values1) - sort.Strings(expectedValues1) - if !reflect.DeepEqual(values1, expectedValues1) { - t.Errorf("Values from one map failed, expected %v, got %v", expectedValues1, values1) - } - - // Test case 2: Values from multiple maps - values2 := Values(map1, map2) - expectedValues2 := []string{"one", "two", "three", "four"} - - sort.Strings(values2) - sort.Strings(expectedValues2) - if !reflect.DeepEqual(values2, expectedValues2) { - t.Errorf("Values from multiple maps failed, expected %v, got %v", expectedValues2, values2) - } - - // Test case 3: Values from empty maps - values3 := Values(map[int]string{}, map[int]string{}) - expectedValues3 := []string{} - if !reflect.DeepEqual(values3, expectedValues3) { - t.Errorf("Values from empty maps failed, expected %v, got %v", expectedValues3, values3) - } -} diff --git a/pkg/digest/digest.go b/pkg/digest/digest.go index 4358e7d9bbc..618b5fc2f2b 100644 --- a/pkg/digest/digest.go +++ b/pkg/digest/digest.go @@ -123,7 +123,7 @@ func Parse(digest string) (*Digest, error) { switch algorithm { case AlgorithmCRC32: - if len(encoded) != 8 && len(encoded) != 10 { + if len(encoded) <= 0 { return nil, errors.New("invalid encoded") } case AlgorithmBlake3: diff --git a/scheduler/service/service_v1.go b/scheduler/service/service_v1.go index 5d47d4652b3..50c9590e361 100644 --- a/scheduler/service/service_v1.go +++ b/scheduler/service/service_v1.go @@ -472,6 +472,7 @@ func (v *V1) AnnounceHost(ctx context.Context, req *schedulerv1.AnnounceHostRequ resource.WithPlatformFamily(req.GetPlatformFamily()), resource.WithPlatformVersion(req.GetPlatformVersion()), resource.WithKernelVersion(req.GetKernelVersion()), + resource.WithSchedulerClusterID(uint64(v.config.Manager.SchedulerClusterID)), } if concurrentUploadLimit > 0 { @@ -541,10 +542,6 @@ func (v *V1) AnnounceHost(ctx context.Context, req *schedulerv1.AnnounceHostRequ })) } - if req.GetSchedulerClusterId() != 0 { - options = append(options, resource.WithSchedulerClusterID(uint64(v.config.Manager.SchedulerClusterID))) - } - if req.GetObjectStoragePort() != 0 { options = append(options, resource.WithObjectStoragePort(req.GetObjectStoragePort())) } @@ -654,6 
+651,9 @@ func (v *V1) LeaveHost(ctx context.Context, req *schedulerv1.LeaveHostRequest) e // Leave peers in host. host.LeavePeers() + + // Delete host in scheduler. + v.resource.HostManager().Delete(host.ID) return nil } diff --git a/scheduler/service/service_v1_test.go b/scheduler/service/service_v1_test.go index 697fdfcbf50..ec051f5a57c 100644 --- a/scheduler/service/service_v1_test.go +++ b/scheduler/service/service_v1_test.go @@ -2232,6 +2232,8 @@ func TestServiceV1_LeaveHost(t *testing.T) { gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Delete(gomock.Any()).Return().Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, err error) { @@ -2247,6 +2249,8 @@ func TestServiceV1_LeaveHost(t *testing.T) { gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Delete(gomock.Any()).Return().Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, err error) { @@ -2262,6 +2266,8 @@ func TestServiceV1_LeaveHost(t *testing.T) { gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Delete(gomock.Any()).Return().Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, err error) { @@ -2278,6 +2284,8 @@ func TestServiceV1_LeaveHost(t *testing.T) { gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Delete(gomock.Any()).Return().Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, err error) { @@ -2294,6 +2302,8 @@ func TestServiceV1_LeaveHost(t *testing.T) { gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Delete(gomock.Any()).Return().Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, err error) { @@ -2310,6 +2320,8 @@ func TestServiceV1_LeaveHost(t *testing.T) { gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Delete(gomock.Any()).Return().Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, err error) { @@ -2326,6 +2338,8 @@ func TestServiceV1_LeaveHost(t *testing.T) { gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Delete(gomock.Any()).Return().Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, err error) { @@ -2342,6 +2356,8 @@ func TestServiceV1_LeaveHost(t *testing.T) { gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Delete(gomock.Any()).Return().Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, err error) { @@ -2358,6 +2374,8 @@ func TestServiceV1_LeaveHost(t *testing.T) { gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Delete(gomock.Any()).Return().Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, err error) { @@ -2374,6 +2392,8 @@ func 
TestServiceV1_LeaveHost(t *testing.T) { gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Delete(gomock.Any()).Return().Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, err error) { @@ -2390,6 +2410,8 @@ func TestServiceV1_LeaveHost(t *testing.T) { gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Delete(gomock.Any()).Return().Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, err error) { diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index 349a08c9124..17d5acb6b59 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -521,6 +521,7 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ standard.WithPlatformFamily(req.Host.GetPlatformFamily()), standard.WithPlatformVersion(req.Host.GetPlatformVersion()), standard.WithKernelVersion(req.Host.GetKernelVersion()), + standard.WithSchedulerClusterID(uint64(v.config.Manager.SchedulerClusterID)), } if concurrentUploadLimit > 0 { @@ -596,10 +597,6 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ })) } - if req.Host.GetSchedulerClusterId() != 0 { - options = append(options, standard.WithSchedulerClusterID(uint64(v.config.Manager.SchedulerClusterID))) - } - if req.GetInterval() != nil { options = append(options, standard.WithAnnounceInterval(req.GetInterval().AsDuration())) } @@ -979,6 +976,9 @@ func (v *V2) DeleteHost(ctx context.Context, req *schedulerv2.DeleteHostRequest) // Leave peers in host. host.LeavePeers() + + // Delete host in scheduler. + v.resource.HostManager().Delete(req.GetHostId()) return nil } diff --git a/scheduler/service/service_v2_test.go b/scheduler/service/service_v2_test.go index f0043152e80..f010a0244d8 100644 --- a/scheduler/service/service_v2_test.go +++ b/scheduler/service/service_v2_test.go @@ -1591,6 +1591,8 @@ func TestServiceV2_DeleteHost(t *testing.T) { gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Delete(gomock.Any()).Return().Times(1), ) }, expect: func(t *testing.T, peer *standard.Peer, err error) { @@ -1606,6 +1608,8 @@ func TestServiceV2_DeleteHost(t *testing.T) { gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Delete(gomock.Any()).Return().Times(1), ) }, expect: func(t *testing.T, peer *standard.Peer, err error) {
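To close, a self-contained sketch of the GORM FindInBatches merge pattern that the new mergePeers in manager/job/sync_peers.go is built on. The Peer model, key function, and batch size here are illustrative stand-ins for the project's actual types, not the real schema:

package main

import (
	"gorm.io/driver/sqlite"
	"gorm.io/gorm"
)

// Peer is a simplified stand-in for the manager's peer model.
type Peer struct {
	ID       uint
	IP       string
	Hostname string
}

// mergePeers walks the peer table in fixed-size batches: rows still present in
// results are refreshed, rows missing from results are hard-deleted, and each
// consumed result is removed from the map so only brand-new peers remain.
func mergePeers(db *gorm.DB, results map[string]Peer, batchSize int) error {
	var batch []Peer
	if err := db.Model(&Peer{}).FindInBatches(&batch, batchSize, func(tx *gorm.DB, _ int) error {
		var updates []Peer
		for _, old := range batch {
			if fresh, ok := results[old.IP]; ok {
				fresh.ID = old.ID // keep the primary key so Save updates in place
				updates = append(updates, fresh)
				delete(results, old.IP)
			} else if err := tx.Unscoped().Delete(&Peer{}, old.ID).Error; err != nil {
				return err
			}
		}
		// Avoid saving an empty slice.
		if len(updates) > 0 {
			tx.Save(&updates)
		}
		return nil
	}).Error; err != nil {
		return err
	}

	// Whatever is left in results was never seen in the table: insert it in batches.
	var inserts []Peer
	for _, p := range results {
		inserts = append(inserts, p)
	}
	if len(inserts) > 0 {
		return db.CreateInBatches(inserts, batchSize).Error
	}
	return nil
}

func main() {
	db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
	if err != nil {
		panic(err)
	}
	_ = db.AutoMigrate(&Peer{})
	_ = mergePeers(db, map[string]Peer{"10.0.0.1": {IP: "10.0.0.1", Hostname: "foo"}}, 500)
}

Compared with the deleted diffPeers approach, which loaded every existing row into memory before diffing, this keeps at most one batch of rows resident at a time, which is why the batchSize default and validation were added to the manager config in this patch.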