Skip to content

Commit

Permalink
feat: support searching task by url for GetTask and DeleteTask
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi committed Oct 24, 2024
1 parent bad6972 commit 2e7bb47
Show file tree
Hide file tree
Showing 13 changed files with 80 additions and 34 deletions.
1 change: 0 additions & 1 deletion internal/job/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
type PreheatRequest struct {
URL string `json:"url" validate:"required,url"`
Tag string `json:"tag" validate:"omitempty"`
Digest string `json:"digest" validate:"omitempty"`
FilteredQueryParams string `json:"filtered_query_params" validate:"omitempty"`
Headers map[string]string `json:"headers" validate:"omitempty"`
Application string `json:"application" validate:"omitempty"`
Expand Down
10 changes: 10 additions & 0 deletions manager/handlers/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ func (h *Handlers) CreateJob(ctx *gin.Context) {
return
}

if json.Args.TaskID == "" && json.Args.URL == "" {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": "invalid params: task_id or url is required"})
return
}

job, err := h.service.CreateGetTaskJob(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err) // nolint: errcheck
Expand All @@ -84,6 +89,11 @@ func (h *Handlers) CreateJob(ctx *gin.Context) {
return
}

if json.Args.TaskID == "" && json.Args.URL == "" {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": "invalid params: task_id or url is required"})
return
}

job, err := h.service.CreateDeleteTaskJob(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err) // nolint: errcheck
Expand Down
14 changes: 12 additions & 2 deletions manager/handlers/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,19 @@ var (
{
"type": "get_task",
"user_id": 4,
"bio": "bio"
"bio": "bio",
"args": {
"task_id": "7575d21d69495905a4709bf4e10d0e5cffcf7fd1e76e93171e0ef6e0abcf07a8"
}
}`
mockDeleteTaskJobReqBody = `
{
"type": "delete_task",
"user_id": 4,
"bio": "bio"
"bio": "bio",
"args": {
"task_id": "04a29122b0c4d0affde2d577fb36bb956caa3da10e9130375623c24a5f865a49"
}
}`
mockOtherJobReqBody = `
{
Expand All @@ -66,11 +72,15 @@ var (
UserID: 4,
Type: "get_task",
BIO: "bio",
Args: types.GetTaskArgs{TaskID: "7575d21d69495905a4709bf4e10d0e5cffcf7fd1e76e93171e0ef6e0abcf07a8"},
}
mockCreateDeleteTaskJobRequest = types.CreateDeleteTaskJobRequest{
UserID: 4,
Type: "delete_task",
BIO: "bio",
Args: types.DeleteTaskArgs{
TaskID: "04a29122b0c4d0affde2d577fb36bb956caa3da10e9130375623c24a5f865a49",
},
}
mockUpdateJobRequest = types.UpdateJobRequest{
UserID: 4,
Expand Down
20 changes: 18 additions & 2 deletions manager/job/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"d7y.io/dragonfly/v2/manager/config"
"d7y.io/dragonfly/v2/manager/models"
"d7y.io/dragonfly/v2/manager/types"
"d7y.io/dragonfly/v2/pkg/idgen"
)

// Task is an interface for manager tasks.
Expand Down Expand Up @@ -60,7 +61,14 @@ func (t *task) CreateGetTask(ctx context.Context, schedulers []models.Scheduler,
span.SetAttributes(config.AttributeGetTaskID.String(json.TaskID))
defer span.End()

args, err := internaljob.MarshalRequest(json)
taskID := json.TaskID
if json.URL != "" {
taskID = idgen.TaskIDV2(json.URL, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams))
}

args, err := internaljob.MarshalRequest(internaljob.GetTaskRequest{
TaskID: taskID,
})
if err != nil {
logger.Errorf("get tasks marshal request: %v, error: %v", args, err)
return nil, err
Expand Down Expand Up @@ -111,7 +119,15 @@ func (t *task) CreateDeleteTask(ctx context.Context, schedulers []models.Schedul
span.SetAttributes(config.AttributeDeleteTaskID.String(json.TaskID))
defer span.End()

args, err := internaljob.MarshalRequest(json)
taskID := json.TaskID
if json.URL != "" {
taskID = idgen.TaskIDV2(json.URL, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams))
}

args, err := internaljob.MarshalRequest(internaljob.DeleteTaskRequest{
TaskID: taskID,
Timeout: json.Timeout,
})
if err != nil {
logger.Errorf("delete task marshal request: %v, error: %v", args, err)
return nil, err
Expand Down
29 changes: 27 additions & 2 deletions manager/types/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,20 @@ type CreateGetTaskJobRequest struct {
}

type GetTaskArgs struct {
TaskID string `json:"task_id" binding:"required"`
// TaskID is the task id for getting.
TaskID string `json:"task_id" binding:"omitempty"`

// URL is the download url of the task.
URL string `json:"url" binding:"omitempty"`

// Tag is the tag of the task.
Tag string `json:"tag" binding:"omitempty"`

// Application is the application of the task.
Application string `json:"application" binding:"omitempty"`

// FilteredQueryParams is the filtered query params of the task.
FilteredQueryParams string `json:"filtered_query_params" binding:"omitempty"`
}

type CreateDeleteTaskJobRequest struct {
Expand All @@ -125,7 +138,19 @@ type CreateDeleteTaskJobRequest struct {

type DeleteTaskArgs struct {
// TaskID is the task id for deleting.
TaskID string `json:"task_id" binding:"required"`
TaskID string `json:"task_id" binding:"omitempty"`

// URL is the download url of the task.
URL string `json:"url" binding:"omitempty"`

// Tag is the tag of the task.
Tag string `json:"tag" binding:"omitempty"`

// Application is the application of the task.
Application string `json:"application" binding:"omitempty"`

// FilteredQueryParams is the filtered query params of the task.
FilteredQueryParams string `json:"filtered_query_params" binding:"omitempty"`

// Timeout is the timeout for deleting, default is 30 minutes.
Timeout time.Duration `json:"timeout" binding:"omitempty"`
Expand Down
10 changes: 5 additions & 5 deletions pkg/idgen/task_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func taskIDV1(url string, meta *commonv1.UrlMeta, ignoreRange bool) string {
return pkgdigest.SHA256FromStrings(url)
}

filteredQueryParams := parseFilteredQueryParams(meta.Filter)
filteredQueryParams := ParseFilteredQueryParams(meta.Filter)

var (
u string
Expand Down Expand Up @@ -81,8 +81,8 @@ func taskIDV1(url string, meta *commonv1.UrlMeta, ignoreRange bool) string {
return pkgdigest.SHA256FromStrings(data...)
}

// parseFilteredQueryParams parses filtered query params.
func parseFilteredQueryParams(rawFilteredQueryParams string) []string {
// ParseFilteredQueryParams parses filtered query params.
func ParseFilteredQueryParams(rawFilteredQueryParams string) []string {
if pkgstrings.IsBlank(rawFilteredQueryParams) {
return nil
}
Expand All @@ -91,11 +91,11 @@ func parseFilteredQueryParams(rawFilteredQueryParams string) []string {
}

// TaskIDV2 generates v2 version of task id.
func TaskIDV2(url, digest, tag, application string, filteredQueryParams []string) string {
func TaskIDV2(url, tag, application string, filteredQueryParams []string) string {
url, err := neturl.FilterQueryParams(url, filteredQueryParams)
if err != nil {
url = ""
}

return pkgdigest.SHA256FromStrings(url, digest, tag, application)
return pkgdigest.SHA256FromStrings(url, tag, application)
}
15 changes: 2 additions & 13 deletions pkg/idgen/task_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ func TestTaskIDV2(t *testing.T) {
tests := []struct {
name string
url string
digest string
tag string
application string
filters []string
Expand All @@ -119,22 +118,12 @@ func TestTaskIDV2(t *testing.T) {
{
name: "generate taskID",
url: "https://example.com",
digest: "sha256:c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4",
tag: "foo",
application: "bar",
filters: []string{},
expect: func(t *testing.T, d any) {
assert := assert.New(t)
assert.Equal(d, "c8659b8372599cf22c7a2de260dd6e148fca6d4e1c2940703022867f739d071d")
},
},
{
name: "generate taskID with digest",
url: "https://example.com",
digest: "sha256:c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4",
expect: func(t *testing.T, d any) {
assert := assert.New(t)
assert.Equal(d, "60469c583429af631a45540f05e08805b31ca4f84e7974cad35cfc84c197bcf8")
assert.Equal(d, "160fa7f001d9d2e893130894fbb60a5fb006e1d61bff82955f2946582bc9de1d")
},
},
{
Expand Down Expand Up @@ -168,7 +157,7 @@ func TestTaskIDV2(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
tc.expect(t, TaskIDV2(tc.url, tc.digest, tc.tag, tc.application, tc.filters))
tc.expect(t, TaskIDV2(tc.url, tc.tag, tc.application, tc.filters))
})
}
}
5 changes: 1 addition & 4 deletions scheduler/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (j *job) preheat(ctx context.Context, data string) (string, error) {
return "", err
}

taskID := idgen.TaskIDV2(req.URL, req.Digest, req.Tag, req.Application, strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator))
taskID := idgen.TaskIDV2(req.URL, req.Tag, req.Application, strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator))
log := logger.WithTask(taskID, req.URL)
log.Infof("preheat %s request: %#v", req.URL, req)

Expand Down Expand Up @@ -279,7 +279,6 @@ func (j *job) preheatAllPeers(ctx context.Context, taskID string, req *internalj
taskID,
&dfdaemonv2.DownloadTaskRequest{Download: &commonv2.Download{
Url: req.URL,
Digest: &req.Digest,
Type: commonv2.TaskType_STANDARD,
Tag: &req.Tag,
Application: &req.Application,
Expand Down Expand Up @@ -376,7 +375,6 @@ func (j *job) preheatAllPeers(ctx context.Context, taskID string, req *internalj
// preheatV1 preheats job by v1 grpc protocol.
func (j *job) preheatV1(ctx context.Context, taskID string, req *internaljob.PreheatRequest, log *logger.SugaredLoggerOnWith) (*internaljob.PreheatResponse, error) {
urlMeta := &commonv1.UrlMeta{
Digest: req.Digest,
Tag: req.Tag,
Filter: req.FilteredQueryParams,
Header: req.Headers,
Expand Down Expand Up @@ -424,7 +422,6 @@ func (j *job) preheatV2(ctx context.Context, taskID string, req *internaljob.Pre
stream, err := j.resource.SeedPeer().Client().DownloadTask(ctx, taskID, &dfdaemonv2.DownloadTaskRequest{
Download: &commonv2.Download{
Url: req.URL,
Digest: &req.Digest,
Type: commonv2.TaskType_STANDARD,
Tag: &req.Tag,
Application: &req.Application,
Expand Down
2 changes: 1 addition & 1 deletion scheduler/resource/standard/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var (

mockTaskBackToSourceLimit int32 = 200
mockTaskURL = "http://example.com/foo"
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams)
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams)
mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4")
mockTaskTag = "d7y"
mockTaskApplication = "foo"
Expand Down
2 changes: 1 addition & 1 deletion scheduler/scheduling/evaluator/evaluator_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ var (

mockTaskBackToSourceLimit int32 = 200
mockTaskURL = "http://example.com/foo"
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams)
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams)
mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4")
mockTaskTag = "d7y"
mockTaskApplication = "foo"
Expand Down
2 changes: 1 addition & 1 deletion scheduler/scheduling/scheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ var (

mockTaskBackToSourceLimit int32 = 200
mockTaskURL = "http://example.com/foo"
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams)
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams)
mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4")
mockTaskTag = "d7y"
mockTaskApplication = "foo"
Expand Down
2 changes: 1 addition & 1 deletion scheduler/service/service_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ var (

mockTaskBackToSourceLimit int32 = 200
mockTaskURL = "http://example.com/foo"
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams)
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams)
mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4")
mockTaskTag = "d7y"
mockTaskApplication = "foo"
Expand Down

0 comments on commit 2e7bb47

Please sign in to comment.