Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support searching task by url for GetTask and DeleteTask #3607

Merged
merged 1 commit into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/compatibility-e2e-v2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ jobs:
chart-name: scheduler
- module: client
image: client
image-tag: v0.1.113
image-tag: v0.1.115
chart-name: client
- module: seed-client
image: client
image-tag: v0.1.113
image-tag: v0.1.115
chart-name: seed-client

steps:
Expand Down
1 change: 0 additions & 1 deletion internal/job/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
type PreheatRequest struct {
URL string `json:"url" validate:"required,url"`
Tag string `json:"tag" validate:"omitempty"`
Digest string `json:"digest" validate:"omitempty"`
FilteredQueryParams string `json:"filtered_query_params" validate:"omitempty"`
Headers map[string]string `json:"headers" validate:"omitempty"`
Application string `json:"application" validate:"omitempty"`
Expand Down
10 changes: 10 additions & 0 deletions manager/handlers/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ func (h *Handlers) CreateJob(ctx *gin.Context) {
return
}

if json.Args.TaskID == "" && json.Args.URL == "" {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": "invalid params: task_id or url is required"})
return
}

job, err := h.service.CreateGetTaskJob(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err) // nolint: errcheck
Expand All @@ -84,6 +89,11 @@ func (h *Handlers) CreateJob(ctx *gin.Context) {
return
}

if json.Args.TaskID == "" && json.Args.URL == "" {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": "invalid params: task_id or url is required"})
return
}

job, err := h.service.CreateDeleteTaskJob(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err) // nolint: errcheck
Expand Down
14 changes: 12 additions & 2 deletions manager/handlers/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,19 @@ var (
{
"type": "get_task",
"user_id": 4,
"bio": "bio"
"bio": "bio",
"args": {
"task_id": "7575d21d69495905a4709bf4e10d0e5cffcf7fd1e76e93171e0ef6e0abcf07a8"
}
}`
mockDeleteTaskJobReqBody = `
{
"type": "delete_task",
"user_id": 4,
"bio": "bio"
"bio": "bio",
"args": {
"task_id": "04a29122b0c4d0affde2d577fb36bb956caa3da10e9130375623c24a5f865a49"
}
}`
mockOtherJobReqBody = `
{
Expand All @@ -66,11 +72,15 @@ var (
UserID: 4,
Type: "get_task",
BIO: "bio",
Args: types.GetTaskArgs{TaskID: "7575d21d69495905a4709bf4e10d0e5cffcf7fd1e76e93171e0ef6e0abcf07a8"},
}
mockCreateDeleteTaskJobRequest = types.CreateDeleteTaskJobRequest{
UserID: 4,
Type: "delete_task",
BIO: "bio",
Args: types.DeleteTaskArgs{
TaskID: "04a29122b0c4d0affde2d577fb36bb956caa3da10e9130375623c24a5f865a49",
},
}
mockUpdateJobRequest = types.UpdateJobRequest{
UserID: 4,
Expand Down
20 changes: 18 additions & 2 deletions manager/job/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"d7y.io/dragonfly/v2/manager/config"
"d7y.io/dragonfly/v2/manager/models"
"d7y.io/dragonfly/v2/manager/types"
"d7y.io/dragonfly/v2/pkg/idgen"
)

// Task is an interface for manager tasks.
Expand Down Expand Up @@ -60,7 +61,14 @@ func (t *task) CreateGetTask(ctx context.Context, schedulers []models.Scheduler,
span.SetAttributes(config.AttributeGetTaskID.String(json.TaskID))
defer span.End()

args, err := internaljob.MarshalRequest(json)
taskID := json.TaskID
if json.URL != "" {
taskID = idgen.TaskIDV2(json.URL, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams))
}

args, err := internaljob.MarshalRequest(internaljob.GetTaskRequest{
TaskID: taskID,
})
if err != nil {
logger.Errorf("get tasks marshal request: %v, error: %v", args, err)
return nil, err
Expand Down Expand Up @@ -111,7 +119,15 @@ func (t *task) CreateDeleteTask(ctx context.Context, schedulers []models.Schedul
span.SetAttributes(config.AttributeDeleteTaskID.String(json.TaskID))
defer span.End()

args, err := internaljob.MarshalRequest(json)
taskID := json.TaskID
if json.URL != "" {
taskID = idgen.TaskIDV2(json.URL, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams))
}

args, err := internaljob.MarshalRequest(internaljob.DeleteTaskRequest{
TaskID: taskID,
Timeout: json.Timeout,
})
if err != nil {
logger.Errorf("delete task marshal request: %v, error: %v", args, err)
return nil, err
Expand Down
29 changes: 27 additions & 2 deletions manager/types/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,20 @@ type CreateGetTaskJobRequest struct {
}

type GetTaskArgs struct {
TaskID string `json:"task_id" binding:"required"`
// TaskID is the task id for getting.
TaskID string `json:"task_id" binding:"omitempty"`

// URL is the download url of the task.
URL string `json:"url" binding:"omitempty"`

// Tag is the tag of the task.
Tag string `json:"tag" binding:"omitempty"`

// Application is the application of the task.
Application string `json:"application" binding:"omitempty"`

// FilteredQueryParams is the filtered query params of the task.
FilteredQueryParams string `json:"filtered_query_params" binding:"omitempty"`
}

type CreateDeleteTaskJobRequest struct {
Expand All @@ -125,7 +138,19 @@ type CreateDeleteTaskJobRequest struct {

type DeleteTaskArgs struct {
// TaskID is the task id for deleting.
TaskID string `json:"task_id" binding:"required"`
TaskID string `json:"task_id" binding:"omitempty"`

// URL is the download url of the task.
URL string `json:"url" binding:"omitempty"`

// Tag is the tag of the task.
Tag string `json:"tag" binding:"omitempty"`

// Application is the application of the task.
Application string `json:"application" binding:"omitempty"`

// FilteredQueryParams is the filtered query params of the task.
FilteredQueryParams string `json:"filtered_query_params" binding:"omitempty"`

// Timeout is the timeout for deleting, default is 30 minutes.
Timeout time.Duration `json:"timeout" binding:"omitempty"`
Expand Down
10 changes: 5 additions & 5 deletions pkg/idgen/task_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func taskIDV1(url string, meta *commonv1.UrlMeta, ignoreRange bool) string {
return pkgdigest.SHA256FromStrings(url)
}

filteredQueryParams := parseFilteredQueryParams(meta.Filter)
filteredQueryParams := ParseFilteredQueryParams(meta.Filter)

var (
u string
Expand Down Expand Up @@ -81,8 +81,8 @@ func taskIDV1(url string, meta *commonv1.UrlMeta, ignoreRange bool) string {
return pkgdigest.SHA256FromStrings(data...)
}

// parseFilteredQueryParams parses filtered query params.
func parseFilteredQueryParams(rawFilteredQueryParams string) []string {
// ParseFilteredQueryParams parses filtered query params.
func ParseFilteredQueryParams(rawFilteredQueryParams string) []string {
if pkgstrings.IsBlank(rawFilteredQueryParams) {
return nil
}
Expand All @@ -91,11 +91,11 @@ func parseFilteredQueryParams(rawFilteredQueryParams string) []string {
}

// TaskIDV2 generates v2 version of task id.
func TaskIDV2(url, digest, tag, application string, filteredQueryParams []string) string {
func TaskIDV2(url, tag, application string, filteredQueryParams []string) string {
url, err := neturl.FilterQueryParams(url, filteredQueryParams)
if err != nil {
url = ""
}

return pkgdigest.SHA256FromStrings(url, digest, tag, application)
return pkgdigest.SHA256FromStrings(url, tag, application)
}
15 changes: 2 additions & 13 deletions pkg/idgen/task_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ func TestTaskIDV2(t *testing.T) {
tests := []struct {
name string
url string
digest string
tag string
application string
filters []string
Expand All @@ -119,22 +118,12 @@ func TestTaskIDV2(t *testing.T) {
{
name: "generate taskID",
url: "https://example.com",
digest: "sha256:c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4",
tag: "foo",
application: "bar",
filters: []string{},
expect: func(t *testing.T, d any) {
assert := assert.New(t)
assert.Equal(d, "c8659b8372599cf22c7a2de260dd6e148fca6d4e1c2940703022867f739d071d")
},
},
{
name: "generate taskID with digest",
url: "https://example.com",
digest: "sha256:c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4",
expect: func(t *testing.T, d any) {
assert := assert.New(t)
assert.Equal(d, "60469c583429af631a45540f05e08805b31ca4f84e7974cad35cfc84c197bcf8")
assert.Equal(d, "160fa7f001d9d2e893130894fbb60a5fb006e1d61bff82955f2946582bc9de1d")
},
},
{
Expand Down Expand Up @@ -168,7 +157,7 @@ func TestTaskIDV2(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
tc.expect(t, TaskIDV2(tc.url, tc.digest, tc.tag, tc.application, tc.filters))
tc.expect(t, TaskIDV2(tc.url, tc.tag, tc.application, tc.filters))
})
}
}
5 changes: 1 addition & 4 deletions scheduler/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (j *job) preheat(ctx context.Context, data string) (string, error) {
return "", err
}

taskID := idgen.TaskIDV2(req.URL, req.Digest, req.Tag, req.Application, strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator))
taskID := idgen.TaskIDV2(req.URL, req.Tag, req.Application, strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator))
log := logger.WithTask(taskID, req.URL)
log.Infof("preheat %s request: %#v", req.URL, req)

Expand Down Expand Up @@ -279,7 +279,6 @@ func (j *job) preheatAllPeers(ctx context.Context, taskID string, req *internalj
taskID,
&dfdaemonv2.DownloadTaskRequest{Download: &commonv2.Download{
Url: req.URL,
Digest: &req.Digest,
Type: commonv2.TaskType_STANDARD,
Tag: &req.Tag,
Application: &req.Application,
Expand Down Expand Up @@ -376,7 +375,6 @@ func (j *job) preheatAllPeers(ctx context.Context, taskID string, req *internalj
// preheatV1 preheats job by v1 grpc protocol.
func (j *job) preheatV1(ctx context.Context, taskID string, req *internaljob.PreheatRequest, log *logger.SugaredLoggerOnWith) (*internaljob.PreheatResponse, error) {
urlMeta := &commonv1.UrlMeta{
Digest: req.Digest,
Tag: req.Tag,
Filter: req.FilteredQueryParams,
Header: req.Headers,
Expand Down Expand Up @@ -424,7 +422,6 @@ func (j *job) preheatV2(ctx context.Context, taskID string, req *internaljob.Pre
stream, err := j.resource.SeedPeer().Client().DownloadTask(ctx, taskID, &dfdaemonv2.DownloadTaskRequest{
Download: &commonv2.Download{
Url: req.URL,
Digest: &req.Digest,
Type: commonv2.TaskType_STANDARD,
Tag: &req.Tag,
Application: &req.Application,
Expand Down
2 changes: 1 addition & 1 deletion scheduler/resource/standard/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var (

mockTaskBackToSourceLimit int32 = 200
mockTaskURL = "http://example.com/foo"
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams)
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams)
mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4")
mockTaskTag = "d7y"
mockTaskApplication = "foo"
Expand Down
2 changes: 1 addition & 1 deletion scheduler/scheduling/evaluator/evaluator_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ var (

mockTaskBackToSourceLimit int32 = 200
mockTaskURL = "http://example.com/foo"
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams)
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams)
mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4")
mockTaskTag = "d7y"
mockTaskApplication = "foo"
Expand Down
2 changes: 1 addition & 1 deletion scheduler/scheduling/scheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ var (

mockTaskBackToSourceLimit int32 = 200
mockTaskURL = "http://example.com/foo"
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams)
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams)
mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4")
mockTaskTag = "d7y"
mockTaskApplication = "foo"
Expand Down
2 changes: 1 addition & 1 deletion scheduler/service/service_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ var (

mockTaskBackToSourceLimit int32 = 200
mockTaskURL = "http://example.com/foo"
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams)
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams)
mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4")
mockTaskTag = "d7y"
mockTaskApplication = "foo"
Expand Down
8 changes: 4 additions & 4 deletions test/e2e/v2/concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var _ = Describe("Download Concurrency", func() {
Expect(err).NotTo(HaveOccurred())

fileMetadata := util.FileMetadata{
ID: "de7d72a4f865bb1b1d3a9b7288bfd369a500277f5565736b2ba67aa205958df7",
ID: "14b31801ea6990788057b965fbc51e44bf73800462915fdfa0fda8182acca4d6",
Sha256: "fc44bbbba20490450c73530db3d1b935f893f38d7d8084ca132952a765ff5ff6",
}

Expand Down Expand Up @@ -67,7 +67,7 @@ var _ = Describe("Download Concurrency", func() {
Expect(err).NotTo(HaveOccurred())

fileMetadata := util.FileMetadata{
ID: "510f018dc34c7e6ced07db2e88654a4e565e7982d5c73994e48e901f633c8113",
ID: "958e177b56be708c9d7ec193ae8cef399b39faff8234af33efa4cbe097d1fc5f",
Sha256: "dc102987a36be20846821ac74648534863ff0fe8897d4250273a6ffc80481d91",
}

Expand Down Expand Up @@ -97,7 +97,7 @@ var _ = Describe("Download Concurrency", func() {
Expect(err).NotTo(HaveOccurred())

fileMetadata := util.FileMetadata{
ID: "381ee3f1dd0b55d151997e107e5517e4ac315677a4ed67c3cd814fe7b86481d1",
ID: "dd573cf9c3e1a79402b8423abcd1ba987c1b1ee9c49069d139d71106a260b055",
Sha256: "54e54b7ff54ef70d4db2adcd24a27e3b9af3cd99fc0213983bac1e8035429be6",
}

Expand Down Expand Up @@ -127,7 +127,7 @@ var _ = Describe("Download Concurrency", func() {
Expect(err).NotTo(HaveOccurred())

fileMetadata := util.FileMetadata{
ID: "0068ce9e9beaca3ec33911d537be56de2d12e1b201bf3230aefe803919c373a5",
ID: "f1957adc26ec326800ced850d72e583a03be0999ba80d9aa2e3ba57ef4ddaf17",
Sha256: "87c09b7c338f258809ca2d436bbe06ac94a3166b3f3e1125a86f35d9a9aa1d2f",
}

Expand Down
Loading
Loading