Skip to content

Commit

Permalink
feat: update new task type(TaskType_STANDARD, TaskType_PERSISTENT, Ta…
Browse files Browse the repository at this point in the history
…skType_PERSISTENT_CACHE) (#3540)

Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi authored Sep 27, 2024
1 parent ea850f7 commit 9cd6f41
Show file tree
Hide file tree
Showing 24 changed files with 345 additions and 345 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.21

require (
d7y.io/api/v2 v2.0.159
d7y.io/api/v2 v2.0.162
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.8
github.com/Showmax/go-fqdn v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
d7y.io/api/v2 v2.0.159 h1:xSLq0GjqV0F8TgfZ13EDJa+eqaWcqhrEepybAoT9OnI=
d7y.io/api/v2 v2.0.159/go.mod h1:VOnTWgLrGtivgyyofZCfiSDTAKDJ9ohVqM6l3S8EPCE=
d7y.io/api/v2 v2.0.162 h1:u96AbaVT1wGwYMONjm+3B6AdFtrz0m2y0b8eDaBOwW8=
d7y.io/api/v2 v2.0.162/go.mod h1:VOnTWgLrGtivgyyofZCfiSDTAKDJ9ohVqM6l3S8EPCE=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
30 changes: 15 additions & 15 deletions pkg/rpc/dfdaemon/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,14 @@ type V2 interface {
// DeleteTask deletes task from p2p network.
DeleteTask(context.Context, *dfdaemonv2.DeleteTaskRequest, ...grpc.CallOption) error

// DownloadCacheTask downloads cache task from p2p network.
DownloadCacheTask(context.Context, *dfdaemonv2.DownloadCacheTaskRequest, ...grpc.CallOption) (dfdaemonv2.DfdaemonUpload_DownloadCacheTaskClient, error)
// DownloadPersistentCacheTask downloads persistent cache task from p2p network.
DownloadPersistentCacheTask(context.Context, *dfdaemonv2.DownloadPersistentCacheTaskRequest, ...grpc.CallOption) (dfdaemonv2.DfdaemonUpload_DownloadPersistentCacheTaskClient, error)

// StatCacheTask stats cache task information.
StatCacheTask(context.Context, *dfdaemonv2.StatCacheTaskRequest, ...grpc.CallOption) (*commonv2.CacheTask, error)
// StatPersistentCacheTask stats persistent cache task information.
StatPersistentCacheTask(context.Context, *dfdaemonv2.StatPersistentCacheTaskRequest, ...grpc.CallOption) (*commonv2.PersistentCacheTask, error)

// DeleteCacheTask deletes cache task from p2p network.
DeleteCacheTask(context.Context, *dfdaemonv2.DeleteCacheTaskRequest, ...grpc.CallOption) error
// DeletePersistentCacheTask deletes persistent cache task from p2p network.
DeletePersistentCacheTask(context.Context, *dfdaemonv2.DeletePersistentCacheTaskRequest, ...grpc.CallOption) error

// Close tears down the ClientConn and all underlying connections.
Close() error
Expand Down Expand Up @@ -207,24 +207,24 @@ func (v *v2) DeleteTask(ctx context.Context, req *dfdaemonv2.DeleteTaskRequest,
return err
}

// DownloadCacheTask downloads cache task from p2p network.
func (v *v2) DownloadCacheTask(ctx context.Context, req *dfdaemonv2.DownloadCacheTaskRequest, opts ...grpc.CallOption) (dfdaemonv2.DfdaemonUpload_DownloadCacheTaskClient, error) {
return v.DfdaemonUploadClient.DownloadCacheTask(ctx, req, opts...)
// DownloadPersistentCacheTask downloads persistent cache task from p2p network.
func (v *v2) DownloadPersistentCacheTask(ctx context.Context, req *dfdaemonv2.DownloadPersistentCacheTaskRequest, opts ...grpc.CallOption) (dfdaemonv2.DfdaemonUpload_DownloadPersistentCacheTaskClient, error) {
return v.DfdaemonUploadClient.DownloadPersistentCacheTask(ctx, req, opts...)
}

// StatCacheTask stats cache task information.
func (v *v2) StatCacheTask(ctx context.Context, req *dfdaemonv2.StatCacheTaskRequest, opts ...grpc.CallOption) (*commonv2.CacheTask, error) {
// StatPersistentCacheTask stats persistent cache task information.
func (v *v2) StatPersistentCacheTask(ctx context.Context, req *dfdaemonv2.StatPersistentCacheTaskRequest, opts ...grpc.CallOption) (*commonv2.PersistentCacheTask, error) {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()

return v.DfdaemonUploadClient.StatCacheTask(ctx, req, opts...)
return v.DfdaemonUploadClient.StatPersistentCacheTask(ctx, req, opts...)
}

// DeleteCacheTask deletes cache task from p2p network.
func (v *v2) DeleteCacheTask(ctx context.Context, req *dfdaemonv2.DeleteCacheTaskRequest, opts ...grpc.CallOption) error {
// DeletePersistentCacheTask deletes persistent cache task from p2p network.
func (v *v2) DeletePersistentCacheTask(ctx context.Context, req *dfdaemonv2.DeletePersistentCacheTaskRequest, opts ...grpc.CallOption) error {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()

_, err := v.DfdaemonUploadClient.DeleteCacheTask(ctx, req, opts...)
_, err := v.DfdaemonUploadClient.DeletePersistentCacheTask(ctx, req, opts...)
return err
}
40 changes: 20 additions & 20 deletions pkg/rpc/dfdaemon/client/mocks/client_v2_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 9 additions & 9 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,25 +140,25 @@ func ParseHostType(name string) HostType {
func TaskTypeV1ToV2(typ commonv1.TaskType) commonv2.TaskType {
switch typ {
case commonv1.TaskType_Normal:
return commonv2.TaskType_DFDAEMON
case commonv1.TaskType_DfCache:
return commonv2.TaskType_DFCACHE
return commonv2.TaskType_STANDARD
case commonv1.TaskType_DfStore:
return commonv2.TaskType_DFSTORE
return commonv2.TaskType_PERSISTENT
case commonv1.TaskType_DfCache:
return commonv2.TaskType_PERSISTENT_CACHE
}

return commonv2.TaskType_DFDAEMON
return commonv2.TaskType_STANDARD
}

// TaskTypeV2ToV1 converts task type from v2 to v1.
func TaskTypeV2ToV1(typ commonv2.TaskType) commonv1.TaskType {
switch typ {
case commonv2.TaskType_DFDAEMON:
case commonv2.TaskType_STANDARD:
return commonv1.TaskType_Normal
case commonv2.TaskType_DFCACHE:
return commonv1.TaskType_DfCache
case commonv2.TaskType_DFSTORE:
case commonv2.TaskType_PERSISTENT:
return commonv1.TaskType_DfStore
case commonv2.TaskType_PERSISTENT_CACHE:
return commonv1.TaskType_DfCache
}

return commonv1.TaskType_Normal
Expand Down
4 changes: 2 additions & 2 deletions scheduler/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (j *job) preheatAllPeers(ctx context.Context, taskID string, req *internalj
&dfdaemonv2.DownloadTaskRequest{Download: &commonv2.Download{
Url: req.URL,
Digest: &req.Digest,
Type: commonv2.TaskType_DFDAEMON,
Type: commonv2.TaskType_STANDARD,
Tag: &req.Tag,
Application: &req.Application,
Priority: commonv2.Priority(req.Priority),
Expand Down Expand Up @@ -425,7 +425,7 @@ func (j *job) preheatV2(ctx context.Context, taskID string, req *internaljob.Pre
Download: &commonv2.Download{
Url: req.URL,
Digest: &req.Digest,
Type: commonv2.TaskType_DFDAEMON,
Type: commonv2.TaskType_STANDARD,
Tag: &req.Tag,
Application: &req.Application,
Priority: commonv2.Priority(req.Priority),
Expand Down
62 changes: 31 additions & 31 deletions scheduler/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,115 +289,115 @@ var (
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
})

AnnounceCachePeerCount = promauto.NewCounter(prometheus.CounterOpts{
AnnouncePersistentCachePeerCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "announce_cache_peer_total",
Name: "announce_persistent_cache_peer_total",
Help: "Counter of the number of the announcing cache peer.",
})

AnnounceCachePeerFailureCount = promauto.NewCounter(prometheus.CounterOpts{
AnnouncePersistentCachePeerFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "announce_cache_peer_failure_total",
Name: "announce_persistent_cache_peer_failure_total",
Help: "Counter of the number of failed of the announcing cache peer.",
})

StatCachePeerCount = promauto.NewCounter(prometheus.CounterOpts{
StatPersistentCachePeerCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "stat_cache_peer_total",
Name: "stat_persistent_cache_peer_total",
Help: "Counter of the number of the stat cache peer.",
})

StatCachePeerFailureCount = promauto.NewCounter(prometheus.CounterOpts{
StatPersistentCachePeerFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "stat_cache_peer_failure_total",
Name: "stat_persistent_cache_peer_failure_total",
Help: "Counter of the number of failed of the stat cache peer.",
})

DeleteCachePeerCount = promauto.NewCounter(prometheus.CounterOpts{
DeletePersistentCachePeerCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "delete_cache_peer_total",
Name: "delete_persistent_cache_peer_total",
Help: "Counter of the number of the deleting cache peer.",
})

DeleteCachePeerFailureCount = promauto.NewCounter(prometheus.CounterOpts{
DeletePersistentCachePeerFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "delete_cache_peer_failure_total",
Name: "delete_persistent_cache_peer_failure_total",
Help: "Counter of the number of failed of the deleting cache peer.",
})

UploadCacheTaskStartedCount = promauto.NewCounter(prometheus.CounterOpts{
UploadPersistentCacheTaskStartedCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "upload_cache_peer_started_total",
Name: "upload_persistent_cache_task_started_total",
Help: "Counter of the number of the started uploading cache peer.",
})

UploadCacheTaskStartedFailureCount = promauto.NewCounter(prometheus.CounterOpts{
UploadPersistentCacheTaskStartedFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "upload_cache_peer_started_failure_total",
Name: "upload_persistent_cache_task_started_failure_total",
Help: "Counter of the number of failed of the started uploading cache peer.",
})

UploadCacheTaskFinishedCount = promauto.NewCounter(prometheus.CounterOpts{
UploadPersistentCacheTaskFinishedCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "upload_cache_peer_finished_total",
Name: "upload_persistent_cache_task_finished_total",
Help: "Counter of the number of the finished uploading cache peer.",
})

UploadCacheTaskFinishedFailureCount = promauto.NewCounter(prometheus.CounterOpts{
UploadPersistentCacheTaskFinishedFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "upload_cache_peer_finished_failure_total",
Name: "upload_persistent_cache_task_finished_failure_total",
Help: "Counter of the number of failed of the finished uploading cache peer.",
})

UploadCacheTaskFailedCount = promauto.NewCounter(prometheus.CounterOpts{
UploadPersistentCacheTaskFailedCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "upload_cache_peer_failed_total",
Name: "upload_persistent_cache_task_failed_total",
Help: "Counter of the number of the failed uploading cache peer.",
})

UploadCacheTaskFailedFailureCount = promauto.NewCounter(prometheus.CounterOpts{
UploadPersistentCacheTaskFailedFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "upload_cache_peer_failed_failure_total",
Help: "Counter of the number of failed of the failed uploading cache peer.",
})

StatCacheTaskCount = promauto.NewCounter(prometheus.CounterOpts{
StatPersistentCacheTaskCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "stat_cache_task_total",
Name: "stat_persistent_cache_task_total",
Help: "Counter of the number of the stat cache task.",
})

StatCacheTaskFailureCount = promauto.NewCounter(prometheus.CounterOpts{
StatPersistentCacheTaskFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "stat_cache_task_failure_total",
Name: "stat_persistent_cache_task_failure_total",
Help: "Counter of the number of failed of the stat cache task.",
})

DeleteCacheTaskCount = promauto.NewCounter(prometheus.CounterOpts{
DeletePersistentCacheTaskCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "delete_cache_task_total",
Name: "delete_persistent_cache_task_total",
Help: "Counter of the number of the delete cache task.",
})

DeleteCacheTaskFailureCount = promauto.NewCounter(prometheus.CounterOpts{
DeletePersistentCacheTaskFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "delete_cache_task_failure_total",
Name: "delete_persistent_cache_task_failure_total",
Help: "Counter of the number of failed of the delete cache task.",
})

Expand Down
2 changes: 1 addition & 1 deletion scheduler/resource/host_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ func TestHostManager_RunGC(t *testing.T) {
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit)
mockPeer := NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
hostManager, err := newHostManager(mockHostGCConfig, gc)
if err != nil {
Expand Down
Loading

0 comments on commit 9cd6f41

Please sign in to comment.