diff --git a/internal/job/types.go b/internal/job/types.go index 2c845002139..cdbdb55a760 100644 --- a/internal/job/types.go +++ b/internal/job/types.go @@ -24,6 +24,7 @@ type PreheatRequest struct { Headers map[string]string `json:"headers" validate:"omitempty"` Application string `json:"application" validate:"omitempty"` Priority int32 `json:"priority" validate:"omitempty"` + PieceLength uint32 `json:"pieceLength" validate:"omitempty"` } type PreheatResponse struct { diff --git a/manager/job/preheat.go b/manager/job/preheat.go index e5cf77dde44..abee3a92f47 100644 --- a/manager/job/preheat.go +++ b/manager/job/preheat.go @@ -109,6 +109,7 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Schedul URL: json.URL, Tag: json.Tag, FilteredQueryParams: json.FilteredQueryParams, + PieceLength: json.PieceLength, Headers: json.Headers, }, } @@ -304,6 +305,7 @@ func (p *preheat) parseLayers(manifests []distribution.Manifest, args types.Preh URL: image.blobsURL(v.Digest.String()), Tag: args.Tag, FilteredQueryParams: args.FilteredQueryParams, + PieceLength: args.PieceLength, Headers: nethttp.HeaderToMap(header), } diff --git a/manager/service/job.go b/manager/service/job.go index f312bfb4143..f2836a05fc9 100644 --- a/manager/service/job.go +++ b/manager/service/job.go @@ -37,6 +37,10 @@ func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheat return nil, err } + if json.Args.PieceLength == 0 { + json.Args.PieceLength = types.DefaultPreheatJobPieceLength + } + groupJobState, err := s.job.CreatePreheat(ctx, candidateSchedulers, json.Args) if err != nil { return nil, err diff --git a/manager/service/preheat.go b/manager/service/preheat.go index 83f43ced3ae..cb4345a91eb 100644 --- a/manager/service/preheat.go +++ b/manager/service/preheat.go @@ -51,6 +51,7 @@ func (s *service) CreateV1Preheat(ctx context.Context, json types.CreateV1Prehea Type: json.Type, URL: json.URL, FilteredQueryParams: json.FilteredQueryParams, + PieceLength: 
types.DefaultPreheatJobPieceLength, Headers: json.Headers, }, }) diff --git a/manager/types/job.go b/manager/types/job.go index 056476e7a7f..4d3a286ce33 100644 --- a/manager/types/job.go +++ b/manager/types/job.go @@ -16,6 +16,11 @@ package types +const ( + // DefaultPreheatJobPieceLength is the default piece length for preheating. + DefaultPreheatJobPieceLength = 4 * 1024 * 1024 +) + type CreateJobRequest struct { BIO string `json:"bio" binding:"omitempty"` Type string `json:"type" binding:"required"` @@ -65,6 +70,9 @@ type PreheatArgs struct { // FilteredQueryParams is the filtered query params for preheating. FilteredQueryParams string `json:"filteredQueryParams" binding:"omitempty"` + // PieceLength is the piece length for preheating. + PieceLength uint32 `json:"pieceLength" binding:"omitempty"` + // Headers is the http headers for authentication. Headers map[string]string `json:"headers" binding:"omitempty"` diff --git a/scheduler/config/dynconfig.go b/scheduler/config/dynconfig.go index d979950ece6..dfd1fd392d0 100644 --- a/scheduler/config/dynconfig.go +++ b/scheduler/config/dynconfig.go @@ -419,9 +419,9 @@ func (mc *managerClient) Get() (any, error) { Ip: mc.config.Server.AdvertiseIP.String(), }) if err != nil { - if s, ok := status.FromError(err); ok { + if st, ok := status.FromError(err); ok { // TODO Compatible with old version manager. 
- if slices.Contains([]codes.Code{codes.Unimplemented, codes.NotFound}, s.Code()) { + if slices.Contains([]codes.Code{codes.Unimplemented, codes.NotFound}, st.Code()) { return DynconfigData{ Scheduler: getSchedulerResp, Applications: nil, diff --git a/scheduler/job/job.go b/scheduler/job/job.go index 3331048a062..59f074149a2 100644 --- a/scheduler/job/job.go +++ b/scheduler/job/job.go @@ -22,20 +22,23 @@ import ( "context" "errors" "fmt" + "io" "strings" "time" "github.com/RichardKnop/machinery/v1" - "github.com/go-http-utils/headers" "github.com/go-playground/validator/v10" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" cdnsystemv1 "d7y.io/api/v2/pkg/apis/cdnsystem/v1" commonv1 "d7y.io/api/v2/pkg/apis/common/v1" + commonv2 "d7y.io/api/v2/pkg/apis/common/v2" + dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2" logger "d7y.io/dragonfly/v2/internal/dflog" internaljob "d7y.io/dragonfly/v2/internal/job" - "d7y.io/dragonfly/v2/pkg/net/http" "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/resource" ) @@ -148,8 +151,9 @@ func (j *job) Serve() { }() } -// preheat is a job to preheat. -func (j *job) preheat(ctx context.Context, req string) error { +// preheat is a job to preheat. Preheating with range requests +// is not supported. 
+func (j *job) preheat(ctx context.Context, data string) error { ctx, cancel := context.WithTimeout(ctx, preheatTimeout) defer cancel() @@ -163,63 +167,120 @@ func (j *job) preheat(ctx context.Context, req string) error { return fmt.Errorf("cluster %d scheduler %s has no available seed peer", j.config.Manager.SchedulerClusterID, j.config.Server.AdvertiseIP) } - preheat := &internaljob.PreheatRequest{} - if err := internaljob.UnmarshalRequest(req, preheat); err != nil { - logger.Errorf("unmarshal request err: %s, request body: %s", err.Error(), req) + req := &internaljob.PreheatRequest{} + if err := internaljob.UnmarshalRequest(data, req); err != nil { + logger.Errorf("unmarshal request err: %s, request body: %s", err.Error(), data) return err } - if err := validator.New().Struct(preheat); err != nil { - logger.Errorf("preheat %s validate failed: %s", preheat.URL, err.Error()) + if err := validator.New().Struct(req); err != nil { + logger.Errorf("preheat %s validate failed: %s", req.URL, err.Error()) return err } - urlMeta := &commonv1.UrlMeta{ - Digest: preheat.Digest, - Tag: preheat.Tag, - Filter: preheat.FilteredQueryParams, - Header: preheat.Headers, - Application: preheat.Application, - Priority: commonv1.Priority(preheat.Priority), - } - if preheat.Headers != nil { - if r, ok := preheat.Headers[headers.Range]; ok { - // Range in dragonfly is without "bytes=". - urlMeta.Range = strings.TrimPrefix(r, http.RangePrefix) + // Preheat by v2 grpc protocol. If seed peer does not support + // v2 protocol, preheat by v1 grpc protocol. + if err := j.preheatV2(ctx, req); err != nil { + logger.Errorf("preheat %s failed: %s", req.URL, err.Error()) + + if st, ok := status.FromError(err); ok { + if st.Code() == codes.Unimplemented { + if err := j.preheatV1(ctx, req); err != nil { + return err + } + + return nil + } } + + return err + } + + return nil +} + +// preheatV1 preheats job by v1 grpc protocol. 
+func (j *job) preheatV1(ctx context.Context, req *internaljob.PreheatRequest) error { + urlMeta := &commonv1.UrlMeta{ + Digest: req.Digest, + Tag: req.Tag, + Filter: req.FilteredQueryParams, + Header: req.Headers, + Application: req.Application, + Priority: commonv1.Priority(req.Priority), } // Trigger seed peer download seeds. - taskID := idgen.TaskIDV1(preheat.URL, urlMeta) - log := logger.WithTask(taskID, preheat.URL) - log.Infof("preheat %s tag: %s, range: %s, filtered query params: %s, digest: %s", - preheat.URL, urlMeta.Tag, urlMeta.Range, urlMeta.Filter, urlMeta.Digest) - log.Debugf("preheat %s headers: %#v", preheat.URL, urlMeta.Header) + taskID := idgen.TaskIDV1(req.URL, urlMeta) + log := logger.WithTask(taskID, req.URL) + log.Infof("preheat(v1) %s tag: %s, filtered query params: %s, digest: %s, headers: %#v", + req.URL, urlMeta.Tag, urlMeta.Filter, urlMeta.Digest, urlMeta.Header) stream, err := j.resource.SeedPeer().Client().ObtainSeeds(ctx, &cdnsystemv1.SeedRequest{ TaskId: taskID, - Url: preheat.URL, + Url: req.URL, UrlMeta: urlMeta, }) if err != nil { - log.Errorf("preheat %s failed: %s", preheat.URL, err.Error()) + log.Errorf("preheat(v1) %s failed: %s", req.URL, err.Error()) return err } for { piece, err := stream.Recv() if err != nil { - log.Errorf("preheat %s recive piece failed: %s", preheat.URL, err.Error()) + log.Errorf("preheat(v1) %s receive piece failed: %s", req.URL, err.Error()) return err } if piece.Done == true { - log.Infof("preheat %s succeeded", preheat.URL) + log.Infof("preheat(v1) %s succeeded", req.URL) return nil } } } +// preheatV2 preheats job by v2 grpc protocol. 
+func (j *job) preheatV2(ctx context.Context, req *internaljob.PreheatRequest) error { + filteredQueryParams := strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator) + taskID := idgen.TaskIDV2(req.URL, req.Digest, req.Tag, req.Application, int32(req.PieceLength), filteredQueryParams) + + log := logger.WithTask(taskID, req.URL) + log.Infof("preheat(v2) %s tag: %s, filtered query params: %s, digest: %s, headers: %#v", + req.URL, req.Tag, req.FilteredQueryParams, req.Digest, req.Headers) + + stream, err := j.resource.SeedPeer().Client().DownloadTask(ctx, taskID, &dfdaemonv2.DownloadTaskRequest{ + Download: &commonv2.Download{ + Url: req.URL, + Digest: &req.Digest, + Type: commonv2.TaskType_DFDAEMON, + Tag: &req.Tag, + Application: &req.Application, + Priority: commonv2.Priority(req.Priority), + FilteredQueryParams: filteredQueryParams, + RequestHeader: req.Headers, + PieceLength: uint32(req.PieceLength), + }}) + if err != nil { + log.Errorf("preheat(v2) %s failed: %s", req.URL, err.Error()) + return err + } + + // Wait for the download task to complete. + for { + _, err := stream.Recv() + if err != nil { + if err == io.EOF { + log.Infof("preheat(v2) %s succeeded", req.URL) + return nil + } + + log.Errorf("preheat(v2) %s receive piece failed: %s", req.URL, err.Error()) + return err + } + } +} + // syncPeers is a job to sync peers. func (j *job) syncPeers() (string, error) { var hosts []*resource.Host diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index 6dc81fad18c..27ce8978963 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -819,7 +819,7 @@ func (v *V2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesServer) error { // handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest. 
func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) error { // Handle resource included host, task, and peer. - _, task, peer, err := v.handleResource(ctx, stream, hostID, taskID, peerID, req.GetDownload()) + host, task, peer, err := v.handleResource(ctx, stream, hostID, taskID, peerID, req.GetDownload()) if err != nil { return err } @@ -836,7 +836,7 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S // If scheduler trigger seed peer download back-to-source, // the needBackToSource flag should be true. case download.GetNeedBackToSource(): - peer.Log.Infof("peer need back to source") + peer.Log.Info("peer need back to source") peer.NeedBackToSource.Store(true) // If task is pending, failed, leave, or succeeded and has no available peer, // scheduler trigger seed peer download back-to-source. @@ -845,18 +845,27 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S task.FSM.Is(resource.TaskStateLeave) || task.FSM.Is(resource.TaskStateSucceeded) && !task.HasAvailablePeer(blocklist): - // If trigger the seed peer download back-to-source, - // the need back-to-source flag should be true. - download.NeedBackToSource = true - - // Output path should be empty, prevent the seed peer - // copy file to output path. - download.OutputPath = nil - if err := v.downloadTaskBySeedPeer(ctx, taskID, download, peer); err != nil { - // Collect RegisterPeerFailureCount metrics. - metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() - return err + + // If HostType is normal, trigger seed peer download back-to-source. + if host.Type == types.HostTypeNormal { + // If trigger the seed peer download back-to-source, + // the need back-to-source flag should be true. 
+ download.NeedBackToSource = true + + // Output path should be empty, prevent the seed peer + // copy file to output path. + download.OutputPath = nil + if err := v.downloadTaskBySeedPeer(ctx, taskID, download, peer); err != nil { + // Collect RegisterPeerFailureCount metrics. + metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), + peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + return err + } + } else { + // If HostType is not normal, peer is seed peer, and + // trigger seed peer download back-to-source directly. + peer.Log.Info("peer need back to source") + peer.NeedBackToSource.Store(true) } }