Skip to content

Commit

Permalink
feat: optimize implement of the sync peers
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi committed Nov 29, 2024
1 parent d0e41b5 commit 0a2873b
Show file tree
Hide file tree
Showing 19 changed files with 359 additions and 558 deletions.
13 changes: 13 additions & 0 deletions api/manager/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4202,28 +4202,34 @@ const docTemplate = `{
],
"properties": {
"args": {
"description": "Args is the arguments of the job.",
"type": "object",
"additionalProperties": {}
},
"bio": {
"description": "BIO is the description of the job.",
"type": "string"
},
"scheduler_cluster_ids": {
"description": "SchedulerClusterIDs is the scheduler cluster ids of the job.",
"type": "array",
"items": {
"type": "integer"
}
},
"seed_peer_cluster_ids": {
"description": "SeedPeerClusterIDs is the seed peer cluster ids of the job.",
"type": "array",
"items": {
"type": "integer"
}
},
"type": {
"description": "Type is the type of the job.",
"type": "string"
},
"user_id": {
"description": "UserID is the user id of the job.",
"type": "integer"
}
}
Expand Down Expand Up @@ -4682,6 +4688,11 @@ const docTemplate = `{
"type": "integer",
"maximum": 1000,
"minimum": 10
},
"job_rate_limit": {
"type": "integer",
"maximum": 1000000,
"minimum": 1
}
}
},
Expand Down Expand Up @@ -4881,9 +4892,11 @@ const docTemplate = `{
"type": "object",
"properties": {
"bio": {
"description": "BIO is the description of the job.",
"type": "string"
},
"user_id": {
"description": "UserID is the user id of the job.",
"type": "integer"
}
}
Expand Down
13 changes: 13 additions & 0 deletions api/manager/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -4196,28 +4196,34 @@
],
"properties": {
"args": {
"description": "Args is the arguments of the job.",
"type": "object",
"additionalProperties": {}
},
"bio": {
"description": "BIO is the description of the job.",
"type": "string"
},
"scheduler_cluster_ids": {
"description": "SchedulerClusterIDs is the scheduler cluster ids of the job.",
"type": "array",
"items": {
"type": "integer"
}
},
"seed_peer_cluster_ids": {
"description": "SeedPeerClusterIDs is the seed peer cluster ids of the job.",
"type": "array",
"items": {
"type": "integer"
}
},
"type": {
"description": "Type is the type of the job.",
"type": "string"
},
"user_id": {
"description": "UserID is the user id of the job.",
"type": "integer"
}
}
Expand Down Expand Up @@ -4676,6 +4682,11 @@
"type": "integer",
"maximum": 1000,
"minimum": 10
},
"job_rate_limit": {
"type": "integer",
"maximum": 1000000,
"minimum": 1
}
}
},
Expand Down Expand Up @@ -4875,9 +4886,11 @@
"type": "object",
"properties": {
"bio": {
"description": "BIO is the description of the job.",
"type": "string"
},
"user_id": {
"description": "UserID is the user id of the job.",
"type": "integer"
}
}
Expand Down
12 changes: 12 additions & 0 deletions api/manager/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -461,20 +461,26 @@ definitions:
properties:
args:
additionalProperties: {}
description: Args is the arguments of the job.
type: object
bio:
description: BIO is the description of the job.
type: string
scheduler_cluster_ids:
description: SchedulerClusterIDs is the scheduler cluster ids of the job.
items:
type: integer
type: array
seed_peer_cluster_ids:
description: SeedPeerClusterIDs is the seed peer cluster ids of the job.
items:
type: integer
type: array
type:
description: Type is the type of the job.
type: string
user_id:
description: UserID is the user id of the job.
type: integer
required:
- type
Expand Down Expand Up @@ -792,6 +798,10 @@ definitions:
maximum: 1000
minimum: 10
type: integer
job_rate_limit:
maximum: 1000000
minimum: 1
type: integer
type: object
d7y_io_dragonfly_v2_manager_types.SchedulerClusterScopes:
properties:
Expand Down Expand Up @@ -925,8 +935,10 @@ definitions:
d7y_io_dragonfly_v2_manager_types.UpdateJobRequest:
properties:
bio:
description: BIO is the description of the job.
type: string
user_id:
description: UserID is the user id of the job.
type: integer
type: object
d7y_io_dragonfly_v2_manager_types.UpdateOauthRequest:
Expand Down
2 changes: 1 addition & 1 deletion client-rs
Submodule client-rs updated 45 files
+1 −1 .github/workflows/ci.yml
+417 −75 Cargo.lock
+19 −15 Cargo.toml
+1 −1 ci/Dockerfile
+1 −1 ci/Dockerfile.dfinit
+1 −1 dragonfly-client-backend/Cargo.toml
+274 −0 dragonfly-client-backend/src/hdfs.rs
+80 −43 dragonfly-client-backend/src/http.rs
+57 −19 dragonfly-client-backend/src/lib.rs
+81 −72 dragonfly-client-backend/src/object_storage.rs
+19 −2 dragonfly-client-config/build.rs
+17 −2 dragonfly-client-config/src/dfdaemon.rs
+6 −1 dragonfly-client-core/src/error/mod.rs
+1 −0 dragonfly-client-init/Cargo.toml
+71 −0 dragonfly-client-init/src/container_runtime/containerd.rs
+51 −0 dragonfly-client-init/src/container_runtime/crio.rs
+202 −4 dragonfly-client-init/src/container_runtime/docker.rs
+1 −0 dragonfly-client-storage/Cargo.toml
+95 −46 dragonfly-client-storage/src/content.rs
+65 −50 dragonfly-client-storage/src/lib.rs
+74 −128 dragonfly-client-storage/src/metadata.rs
+3 −0 dragonfly-client-storage/src/storage_engine/mod.rs
+28 −20 dragonfly-client-storage/src/storage_engine/rocksdb.rs
+1 −0 dragonfly-client-util/Cargo.toml
+39 −9 dragonfly-client-util/src/digest/mod.rs
+29 −105 dragonfly-client-util/src/http/mod.rs
+4 −0 dragonfly-client-util/src/id_generator/mod.rs
+5 −4 dragonfly-client/Cargo.toml
+21 −6 dragonfly-client/src/announcer/mod.rs
+28 −7 dragonfly-client/src/bin/dfdaemon/main.rs
+45 −21 dragonfly-client/src/bin/dfget/main.rs
+36 −30 dragonfly-client/src/grpc/dfdaemon_download.rs
+40 −35 dragonfly-client/src/grpc/dfdaemon_upload.rs
+1 −1 dragonfly-client/src/grpc/mod.rs
+11 −11 dragonfly-client/src/grpc/scheduler.rs
+275 −121 dragonfly-client/src/metrics/mod.rs
+121 −0 dragonfly-client/src/proxy/cache.rs
+13 −31 dragonfly-client/src/proxy/header.rs
+275 −188 dragonfly-client/src/proxy/mod.rs
+25 −32 dragonfly-client/src/resource/persistent_cache_task.rs
+78 −73 dragonfly-client/src/resource/piece.rs
+12 −15 dragonfly-client/src/resource/piece_collector.rs
+137 −125 dragonfly-client/src/resource/task.rs
+3 −2 dragonfly-client/src/tracing/mod.rs
+1 −1 rust-toolchain.toml
6 changes: 5 additions & 1 deletion manager/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ type SyncPeersConfig struct {
// Timeout is the timeout for syncing peers information from the single scheduler.
Timeout time.Duration `yaml:"timeout" mapstructure:"timeout"`

// BatchSize is the batch size when operating gorm.
// BatchSize is the batch size when operating gorm database.
BatchSize int `yaml:"batchSize" mapstructure:"batchSize"`
}

Expand Down Expand Up @@ -641,6 +641,10 @@ func (cfg *Config) Validate() error {
return errors.New("syncPeers requires parameter timeout")
}

if cfg.Job.SyncPeers.BatchSize == 0 {
return errors.New("syncPeers requires parameter batchSize")
}

if cfg.ObjectStorage.Enable {
if cfg.ObjectStorage.Name == "" {
return errors.New("objectStorage requires parameter name")
Expand Down
20 changes: 18 additions & 2 deletions manager/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,9 @@ func TestConfig_Load(t *testing.T) {
},
},
SyncPeers: SyncPeersConfig{
Interval: 13 * time.Hour,
Timeout: 2 * time.Minute,
Interval: 13 * time.Hour,
Timeout: 2 * time.Minute,
BatchSize: 50,
},
},
ObjectStorage: ObjectStorageConfig{
Expand Down Expand Up @@ -809,6 +810,21 @@ func TestConfig_Validate(t *testing.T) {
assert.EqualError(err, "syncPeers requires parameter timeout")
},
},
{
name: "syncPeers requires parameter batchSize",
config: New(),
mock: func(cfg *Config) {
cfg.Auth.JWT = mockJWTConfig
cfg.Database.Type = DatabaseTypeMysql
cfg.Database.Mysql = mockMysqlConfig
cfg.Database.Redis = mockRedisConfig
cfg.Job.SyncPeers.BatchSize = 0
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.EqualError(err, "syncPeers requires parameter batchSize")
},
},
{
name: "objectStorage requires parameter name",
config: New(),
Expand Down
3 changes: 2 additions & 1 deletion manager/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ const (
// DefaultClusterJobRateLimit is default rate limit(requests per second) for job Open API by cluster.
DefaultClusterJobRateLimit = 10

// DefaultJobSyncPeersBatchSize is the default batch size for syncing all peers information from the scheduler.
// DefaultJobSyncPeersBatchSize is the default batch size for syncing all peers information from the scheduler and
// operating on the database.
DefaultJobSyncPeersBatchSize = 500
)

Expand Down
1 change: 1 addition & 0 deletions manager/config/testdata/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ job:
syncPeers:
interval: 13h
timeout: 2m
batchSize: 50

objectStorage:
enable: true
Expand Down
8 changes: 5 additions & 3 deletions manager/handlers/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,15 @@ func (h *Handlers) CreateJob(ctx *gin.Context) {
return
}

job, err := h.service.CreateSyncPeersJob(ctx.Request.Context(), json)
if err != nil {
// CreateSyncPeersJob is a sync operation, so don't need to return the job id,
// and not record the job information in the database. If return success, need to
// query the peers table to get the latest data.
if err := h.service.CreateSyncPeersJob(ctx.Request.Context(), json); err != nil {
ctx.Error(err) // nolint: errcheck
return
}

ctx.JSON(http.StatusOK, job)
ctx.JSON(http.StatusOK, http.StatusText(http.StatusOK))
case job.GetTaskJob:
var json types.CreateGetTaskJobRequest
if err := ctx.ShouldBindBodyWith(&json, binding.JSON); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion manager/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
)

// DefaultTaskPollingInterval is the default interval for polling task.
const DefaultTaskPollingInterval = 5 * time.Second
const DefaultTaskPollingInterval = 10 * time.Second

// tracer is a global tracer for job.
var tracer = otel.Tracer("manager")
Expand Down
14 changes: 7 additions & 7 deletions manager/job/mocks/sync_peers_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 0a2873b

Please sign in to comment.