diff --git a/client-rs b/client-rs index 1a85ccc7a66..b09f9d3e61c 160000 --- a/client-rs +++ b/client-rs @@ -1 +1 @@ -Subproject commit 1a85ccc7a665be82a677c67185a28a1def99c834 +Subproject commit b09f9d3e61cf7361c819789f0bc2ee89044e36f9 diff --git a/internal/job/types.go b/internal/job/types.go index a0382a964ad..d54f9a20eb5 100644 --- a/internal/job/types.go +++ b/internal/job/types.go @@ -24,7 +24,6 @@ import ( type PreheatRequest struct { URL string `json:"url" validate:"required,url"` Tag string `json:"tag" validate:"omitempty"` - Digest string `json:"digest" validate:"omitempty"` FilteredQueryParams string `json:"filtered_query_params" validate:"omitempty"` Headers map[string]string `json:"headers" validate:"omitempty"` Application string `json:"application" validate:"omitempty"` diff --git a/manager/handlers/job.go b/manager/handlers/job.go index 9ba338b9e40..f7f17ae582a 100644 --- a/manager/handlers/job.go +++ b/manager/handlers/job.go @@ -70,6 +70,11 @@ func (h *Handlers) CreateJob(ctx *gin.Context) { return } + if json.Args.TaskID == "" && json.Args.URL == "" { + ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": "invalid params: task_id or url is required"}) + return + } + job, err := h.service.CreateGetTaskJob(ctx.Request.Context(), json) if err != nil { ctx.Error(err) // nolint: errcheck @@ -84,6 +89,11 @@ func (h *Handlers) CreateJob(ctx *gin.Context) { return } + if json.Args.TaskID == "" && json.Args.URL == "" { + ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": "invalid params: task_id or url is required"}) + return + } + job, err := h.service.CreateDeleteTaskJob(ctx.Request.Context(), json) if err != nil { ctx.Error(err) // nolint: errcheck diff --git a/manager/handlers/job_test.go b/manager/handlers/job_test.go index 37d288b0dad..5bcde5f72e6 100644 --- a/manager/handlers/job_test.go +++ b/manager/handlers/job_test.go @@ -43,13 +43,19 @@ var ( { "type": "get_task", "user_id": 4, - "bio": "bio" + "bio": "bio", + "args": { + "task_id": "7575d21d69495905a4709bf4e10d0e5cffcf7fd1e76e93171e0ef6e0abcf07a8" + } }` mockDeleteTaskJobReqBody = ` { "type": "delete_task", "user_id": 4, - "bio": "bio" + "bio": "bio", + "args": { + "task_id": "04a29122b0c4d0affde2d577fb36bb956caa3da10e9130375623c24a5f865a49" + } }` mockOtherJobReqBody = ` { @@ -66,11 +72,15 @@ var ( UserID: 4, Type: "get_task", BIO: "bio", + Args: types.GetTaskArgs{TaskID: "7575d21d69495905a4709bf4e10d0e5cffcf7fd1e76e93171e0ef6e0abcf07a8"}, } mockCreateDeleteTaskJobRequest = types.CreateDeleteTaskJobRequest{ UserID: 4, Type: "delete_task", BIO: "bio", + Args: types.DeleteTaskArgs{ + TaskID: "04a29122b0c4d0affde2d577fb36bb956caa3da10e9130375623c24a5f865a49", + }, } mockUpdateJobRequest = types.UpdateJobRequest{ UserID: 4, diff --git a/manager/job/task.go b/manager/job/task.go index 9ac70036163..de078f50511 100644 --- a/manager/job/task.go +++ b/manager/job/task.go @@ -32,6 +32,7 @@ import ( "d7y.io/dragonfly/v2/manager/config" "d7y.io/dragonfly/v2/manager/models" "d7y.io/dragonfly/v2/manager/types" + "d7y.io/dragonfly/v2/pkg/idgen" ) // Task is an interface for manager tasks. @@ -60,7 +61,14 @@ func (t *task) CreateGetTask(ctx context.Context, schedulers []models.Scheduler, span.SetAttributes(config.AttributeGetTaskID.String(json.TaskID)) defer span.End() - args, err := internaljob.MarshalRequest(json) + taskID := json.TaskID + if json.URL != "" { + taskID = idgen.TaskIDV2(json.URL, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams)) + } + + args, err := internaljob.MarshalRequest(internaljob.GetTaskRequest{ + TaskID: taskID, + }) if err != nil { logger.Errorf("get tasks marshal request: %v, error: %v", args, err) return nil, err @@ -111,7 +119,15 @@ func (t *task) CreateDeleteTask(ctx context.Context, schedulers []models.Schedul span.SetAttributes(config.AttributeDeleteTaskID.String(json.TaskID)) defer span.End() - args, err := internaljob.MarshalRequest(json) + taskID := json.TaskID + if json.URL != "" { + taskID = idgen.TaskIDV2(json.URL, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams)) + } + + args, err := internaljob.MarshalRequest(internaljob.DeleteTaskRequest{ + TaskID: taskID, + Timeout: json.Timeout, + }) if err != nil { logger.Errorf("delete task marshal request: %v, error: %v", args, err) return nil, err diff --git a/manager/types/job.go b/manager/types/job.go index 6b7bb53b12c..3036e5981f9 100644 --- a/manager/types/job.go +++ b/manager/types/job.go @@ -112,7 +112,20 @@ type CreateGetTaskJobRequest struct { } type GetTaskArgs struct { - TaskID string `json:"task_id" binding:"required"` + // TaskID is the task id for getting. + TaskID string `json:"task_id" binding:"omitempty"` + + // URL is the download url of the task. + URL string `json:"url" binding:"omitempty"` + + // Tag is the tag of the task. + Tag string `json:"tag" binding:"omitempty"` + + // Application is the application of the task. + Application string `json:"application" binding:"omitempty"` + + // FilteredQueryParams is the filtered query params of the task. + FilteredQueryParams string `json:"filtered_query_params" binding:"omitempty"` } type CreateDeleteTaskJobRequest struct { @@ -125,7 +138,19 @@ type CreateDeleteTaskJobRequest struct { type DeleteTaskArgs struct { // TaskID is the task id for deleting. - TaskID string `json:"task_id" binding:"required"` + TaskID string `json:"task_id" binding:"omitempty"` + + // URL is the download url of the task. + URL string `json:"url" binding:"omitempty"` + + // Tag is the tag of the task. + Tag string `json:"tag" binding:"omitempty"` + + // Application is the application of the task. + Application string `json:"application" binding:"omitempty"` + + // FilteredQueryParams is the filtered query params of the task. + FilteredQueryParams string `json:"filtered_query_params" binding:"omitempty"` // Timeout is the timeout for deleting, default is 30 minutes. Timeout time.Duration `json:"timeout" binding:"omitempty"` diff --git a/pkg/idgen/task_id.go b/pkg/idgen/task_id.go index 993e6fe3969..44f82847296 100644 --- a/pkg/idgen/task_id.go +++ b/pkg/idgen/task_id.go @@ -50,7 +50,7 @@ func taskIDV1(url string, meta *commonv1.UrlMeta, ignoreRange bool) string { return pkgdigest.SHA256FromStrings(url) } - filteredQueryParams := parseFilteredQueryParams(meta.Filter) + filteredQueryParams := ParseFilteredQueryParams(meta.Filter) var ( u string @@ -81,8 +81,8 @@ func taskIDV1(url string, meta *commonv1.UrlMeta, ignoreRange bool) string { return pkgdigest.SHA256FromStrings(data...) } -// parseFilteredQueryParams parses filtered query params. -func parseFilteredQueryParams(rawFilteredQueryParams string) []string { +// ParseFilteredQueryParams parses filtered query params. +func ParseFilteredQueryParams(rawFilteredQueryParams string) []string { if pkgstrings.IsBlank(rawFilteredQueryParams) { return nil } @@ -91,11 +91,11 @@ func parseFilteredQueryParams(rawFilteredQueryParams string) []string { } // TaskIDV2 generates v2 version of task id. -func TaskIDV2(url, digest, tag, application string, filteredQueryParams []string) string { +func TaskIDV2(url, tag, application string, filteredQueryParams []string) string { url, err := neturl.FilterQueryParams(url, filteredQueryParams) if err != nil { url = "" } - return pkgdigest.SHA256FromStrings(url, digest, tag, application) + return pkgdigest.SHA256FromStrings(url, tag, application) } diff --git a/pkg/idgen/task_id_test.go b/pkg/idgen/task_id_test.go index 11cf4d003e5..6b7e95f9a26 100644 --- a/pkg/idgen/task_id_test.go +++ b/pkg/idgen/task_id_test.go @@ -110,7 +110,6 @@ func TestTaskIDV2(t *testing.T) { tests := []struct { name string url string - digest string tag string application string filters []string @@ -119,22 +118,12 @@ func TestTaskIDV2(t *testing.T) { { name: "generate taskID", url: "https://example.com", - digest: "sha256:c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4", tag: "foo", application: "bar", filters: []string{}, expect: func(t *testing.T, d any) { assert := assert.New(t) - assert.Equal(d, "c8659b8372599cf22c7a2de260dd6e148fca6d4e1c2940703022867f739d071d") - }, - }, - { - name: "generate taskID with digest", - url: "https://example.com", - digest: "sha256:c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4", - expect: func(t *testing.T, d any) { - assert := assert.New(t) - assert.Equal(d, "60469c583429af631a45540f05e08805b31ca4f84e7974cad35cfc84c197bcf8") + assert.Equal(d, "160fa7f001d9d2e893130894fbb60a5fb006e1d61bff82955f2946582bc9de1d") }, }, { @@ -168,7 +157,7 @@ func TestTaskIDV2(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - tc.expect(t, TaskIDV2(tc.url, tc.digest, tc.tag, tc.application, tc.filters)) + tc.expect(t, TaskIDV2(tc.url, tc.tag, tc.application, tc.filters)) }) } } diff --git a/scheduler/job/job.go b/scheduler/job/job.go index 007c5c95391..a2392643d1f 100644 --- a/scheduler/job/job.go +++ b/scheduler/job/job.go @@ -170,7 +170,7 @@ func (j *job) preheat(ctx context.Context, data string) (string, error) { return "", err } - taskID := idgen.TaskIDV2(req.URL, req.Digest, req.Tag, req.Application, strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator)) + taskID := idgen.TaskIDV2(req.URL, req.Tag, req.Application, strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator)) log := logger.WithTask(taskID, req.URL) log.Infof("preheat %s request: %#v", req.URL, req) @@ -279,7 +279,6 @@ func (j *job) preheatAllPeers(ctx context.Context, taskID string, req *internalj taskID, &dfdaemonv2.DownloadTaskRequest{Download: &commonv2.Download{ Url: req.URL, - Digest: &req.Digest, Type: commonv2.TaskType_STANDARD, Tag: &req.Tag, Application: &req.Application, @@ -376,7 +375,6 @@ func (j *job) preheatAllPeers(ctx context.Context, taskID string, req *internalj // preheatV1 preheats job by v1 grpc protocol. func (j *job) preheatV1(ctx context.Context, taskID string, req *internaljob.PreheatRequest, log *logger.SugaredLoggerOnWith) (*internaljob.PreheatResponse, error) { urlMeta := &commonv1.UrlMeta{ - Digest: req.Digest, Tag: req.Tag, Filter: req.FilteredQueryParams, Header: req.Headers, @@ -424,7 +422,6 @@ func (j *job) preheatV2(ctx context.Context, taskID string, req *internaljob.Pre stream, err := j.resource.SeedPeer().Client().DownloadTask(ctx, taskID, &dfdaemonv2.DownloadTaskRequest{ Download: &commonv2.Download{ Url: req.URL, - Digest: &req.Digest, Type: commonv2.TaskType_STANDARD, Tag: &req.Tag, Application: &req.Application, diff --git a/scheduler/resource/standard/task_test.go b/scheduler/resource/standard/task_test.go index 63e0dd2b946..0547d5c435a 100644 --- a/scheduler/resource/standard/task_test.go +++ b/scheduler/resource/standard/task_test.go @@ -49,7 +49,7 @@ var ( mockTaskBackToSourceLimit int32 = 200 mockTaskURL = "http://example.com/foo" - mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams) + mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams) mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") mockTaskTag = "d7y" mockTaskApplication = "foo" diff --git a/scheduler/scheduling/evaluator/evaluator_base_test.go b/scheduler/scheduling/evaluator/evaluator_base_test.go index 7d7264d74b7..376be64fc0e 100644 --- a/scheduler/scheduling/evaluator/evaluator_base_test.go +++ b/scheduler/scheduling/evaluator/evaluator_base_test.go @@ -134,7 +134,7 @@ var ( mockTaskBackToSourceLimit int32 = 200 mockTaskURL = "http://example.com/foo" - mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams) + mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams) mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") mockTaskTag = "d7y" mockTaskApplication = "foo" diff --git a/scheduler/scheduling/scheduling_test.go b/scheduler/scheduling/scheduling_test.go index 01668709bb0..47b44d5f182 100644 --- a/scheduler/scheduling/scheduling_test.go +++ b/scheduler/scheduling/scheduling_test.go @@ -163,7 +163,7 @@ var ( mockTaskBackToSourceLimit int32 = 200 mockTaskURL = "http://example.com/foo" - mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams) + mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams) mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") mockTaskTag = "d7y" mockTaskApplication = "foo" diff --git a/scheduler/service/service_v1_test.go b/scheduler/service/service_v1_test.go index 3d3884aa63a..8628aad6886 100644 --- a/scheduler/service/service_v1_test.go +++ b/scheduler/service/service_v1_test.go @@ -185,7 +185,7 @@ var ( mockTaskBackToSourceLimit int32 = 200 mockTaskURL = "http://example.com/foo" - mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams) + mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams) mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") mockTaskTag = "d7y" mockTaskApplication = "foo"