feat: implement delete peer and task in persistent cache #3623

Merged 1 commit on Oct 31, 2024
Changes from all commits
go.mod (1 addition, 1 deletion)
@@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.23.0

require (
d7y.io/api/v2 v2.0.168
d7y.io/api/v2 v2.0.169
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.8
github.com/Showmax/go-fqdn v1.0.0
go.sum (2 additions, 2 deletions)
@@ -53,8 +53,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
d7y.io/api/v2 v2.0.168 h1:4ewK+RN9b608Cng2eIdt9G98thcLAkVh0EfHoi5mDa8=
d7y.io/api/v2 v2.0.168/go.mod h1:yT5MhUI0My91HWiq8ThPzQu8FNydTpRUix5LgpDE8bw=
d7y.io/api/v2 v2.0.169 h1:CKxPnhXJ0FNOtyATZ5pw5yolRhV6mhlFnEOvgBs9cRA=
d7y.io/api/v2 v2.0.169/go.mod h1:s3ovYyCQQ9RHUC+RMpAZYI075vkaz/PcLpoyTZqvvOg=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
pkg/redis/redis.go (8 additions)
@@ -49,6 +49,9 @@ const (
// PersistentCachePeersNamespace prefix of persistent cache peers namespace cache key.
PersistentCachePeersNamespace = "persistent-cache-peers"

// PersistentPeersNamespace prefix of persistent peers namespace cache key.
PersistentPeersNamespace = "persistent-peers"

// PersistentCacheHostsNamespace prefix of persistent cache hosts namespace cache key.
PersistentCacheHostsNamespace = "persistent-cache-hosts"

@@ -146,6 +149,11 @@ func MakePersistentCachePeersOfPersistentCacheTaskInScheduler(schedulerClusterID
return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s:%s:%s", schedulerClusterID, PersistentCacheTasksNamespace, taskID, PersistentCachePeersNamespace))
}

// MakePersistentPeersOfPersistentCacheTaskInScheduler make persistent peers of persistent cache task in scheduler.
func MakePersistentPeersOfPersistentCacheTaskInScheduler(schedulerClusterID uint, taskID string) string {
return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s:%s:%s", schedulerClusterID, PersistentCacheTasksNamespace, taskID, PersistentPeersNamespace))
}

// MakePersistentCachePeerKeyInScheduler make persistent cache peer key in scheduler.
func MakePersistentCachePeerKeyInScheduler(schedulerClusterID uint, peerID string) string {
return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s:%s", schedulerClusterID, PersistentCachePeersNamespace, peerID))
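For reference, here is the key layout the new helper produces. This is a minimal sketch, assuming MakeKeyInScheduler simply prepends the scheduler namespace to the colon-separated suffix; the exact prefix lives elsewhere in pkg/redis and may differ, and makePersistentPeersKey is a hypothetical stand-in for MakePersistentPeersOfPersistentCacheTaskInScheduler:

package main

import "fmt"

// makePersistentPeersKey mirrors only the fmt.Sprintf suffix structure shown
// in the diff above; the real helper wraps it in MakeKeyInScheduler.
func makePersistentPeersKey(schedulerClusterID uint, taskID string) string {
    return fmt.Sprintf("scheduler-clusters:%d:persistent-cache-tasks:%s:persistent-peers",
        schedulerClusterID, taskID)
}

func main() {
    // Prints: scheduler-clusters:1:persistent-cache-tasks:task-1:persistent-peers
    fmt.Println(makePersistentPeersKey(1, "task-1"))
}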
scheduler/resource/persistentcache/peer_manager.go (25 additions)
@@ -205,6 +205,19 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error {
return err
}

// Store the joint-set with task for persistent peer and set expiration.
if peer.Persistent {
if _, err := pipe.SAdd(ctx, pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), peer.ID).Result(); err != nil {
peer.Log.Errorf("add persistent peer id to task joint-set failed: %v", err)
return err
}

if _, err := pipe.Expire(ctx, pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), ttl).Result(); err != nil {
peer.Log.Errorf("set task joint-set ttl failed: %v", err)
return err
}
}

// Store the joint-set with host.
if _, err := pipe.SAdd(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), peer.ID).Result(); err != nil {
peer.Log.Errorf("add peer id to host joint-set failed: %v", err)
@@ -239,6 +252,18 @@ func (p *peerManager) Delete(ctx context.Context, peerID string) error {
return err
}

persistent, err := strconv.ParseBool(rawPeer["persistent"])
if err != nil {
log.Errorf("parsing persistent failed: %v", err)
return err
}

if persistent {
if _, err := pipe.SRem(ctx, pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, rawPeer["task_id"]), peerID).Result(); err != nil {
log.Errorf("delete persistent peer id from task joint-set failed: %v", err)
return err
}
}

if _, err := pipe.SRem(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, rawPeer["host_id"]), peerID).Result(); err != nil {
log.Errorf("delete peer id from host joint-set failed: %v", err)
return err
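The two hunks above add mirrored bookkeeping: Store enrolls a persistent peer in a per-task joint-set that expires with the task TTL, and Delete reads the peer hash back to decide whether that membership must also be dropped. Below is a self-contained sketch of both paths using go-redis v9 with hypothetical key names; the real code runs every command inside one transactional pipeline per operation, so treat this as an illustration of the pattern, not the scheduler's actual wiring:

package sketch

import (
    "context"
    "strconv"
    "time"

    "github.com/redis/go-redis/v9"
)

// storePersistentPeer queues the joint-set membership and its TTL atomically;
// both commands execute when the pipeline commits.
func storePersistentPeer(ctx context.Context, rdb *redis.Client, taskID, peerID string, ttl time.Duration) error {
    _, err := rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
        pipe.SAdd(ctx, "persistent-peers:"+taskID, peerID)
        pipe.Expire(ctx, "persistent-peers:"+taskID, ttl)
        return nil
    })
    return err
}

// deletePeer reads the peer hash back; hash fields arrive as strings, so the
// persistent flag is parsed before the joint-set membership is removed.
func deletePeer(ctx context.Context, rdb *redis.Client, peerID string) error {
    rawPeer, err := rdb.HGetAll(ctx, "persistent-cache-peers:"+peerID).Result()
    if err != nil {
        return err
    }

    persistent, err := strconv.ParseBool(rawPeer["persistent"])
    if err != nil {
        return err
    }

    if persistent {
        if err := rdb.SRem(ctx, "persistent-peers:"+rawPeer["task_id"], peerID).Err(); err != nil {
            return err
        }
    }

    return rdb.Del(ctx, "persistent-cache-peers:"+peerID).Err()
}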
scheduler/resource/persistentcache/task.go (1 addition, 6 deletions)
@@ -61,10 +61,6 @@ type Task struct {
// when the task is deleted by the user.
PersistentReplicaCount uint64

// Replica count of the cache task. If cache task is not persistent,
// the persistent cache task will be deleted when dfdaemon runs garbage collection.
ReplicaCount uint64

// Digest of the persistent cache task content, for example md5:xxx or sha256:yyy.
Digest *digest.Digest

@@ -100,13 +96,12 @@ type Task struct {
}

// New persistent cache task instance.
func NewTask(id, tag, application, state string, persistentReplicaCount uint64, replicaCount uint64, pieceLength int32,
func NewTask(id, tag, application, state string, persistentReplicaCount uint64, pieceLength int32,
contentLength int64, totalPieceCount int32, digest *digest.Digest, ttl time.Duration, createdAt, updatedAt time.Time,
log *logger.SugaredLoggerOnWith) *Task {
t := &Task{
ID: id,
PersistentReplicaCount: persistentReplicaCount,
ReplicaCount: replicaCount,
Digest: digest,
Tag: tag,
Application: application,
scheduler/resource/persistentcache/task_manager.go (16 additions, 8 deletions)
@@ -36,6 +36,12 @@ type TaskManager interface {
// Load returns persistent cache task by a key.
Load(context.Context, string) (*Task, bool)

// LoadCurrentReplicaCount returns current replica count of the persistent cache task.
LoadCurrentReplicaCount(context.Context, string) (int64, error)

// LoadCurrentPersistentReplicaCount returns current persistent replica count of the persistent cache task.
LoadCurrentPersistentReplicaCount(context.Context, string) (int64, error)

// Store sets persistent cache task.
Store(context.Context, *Task) error

@@ -76,12 +82,6 @@ func (t *taskManager) Load(ctx context.Context, taskID string) (*Task, bool) {
return nil, false
}

replicaCount, err := strconv.ParseUint(rawTask["replica_count"], 10, 64)
if err != nil {
log.Errorf("parsing replica count failed: %v", err)
return nil, false
}

pieceLength, err := strconv.ParseInt(rawTask["piece_length"], 10, 32)
if err != nil {
log.Errorf("parsing piece length failed: %v", err)
@@ -132,7 +132,6 @@
rawTask["application"],
rawTask["state"],
persistentReplicaCount,
replicaCount,
int32(pieceLength),
contentLength,
int32(totalPieceCount),
@@ -144,14 +143,23 @@
), true
}

// LoadCurrentReplicaCount returns current replica count of the persistent cache task.
func (t *taskManager) LoadCurrentReplicaCount(ctx context.Context, taskID string) (int64, error) {
return t.rdb.SCard(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(t.config.Manager.SchedulerClusterID, taskID)).Result()
}

// LoadCurrentPersistentReplicaCount returns current persistent replica count of the persistent cache task.
func (t *taskManager) LoadCurrentPersistentReplicaCount(ctx context.Context, taskID string) (int64, error) {
return t.rdb.SCard(ctx, pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(t.config.Manager.SchedulerClusterID, taskID)).Result()
}

// Store sets persistent cache task.
func (t *taskManager) Store(ctx context.Context, task *Task) error {
if _, err := t.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
if _, err := pipe.HSet(ctx,
pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, task.ID),
"id", task.ID,
"persistent_replica_count", task.PersistentReplicaCount,
"replica_count", task.ReplicaCount,
"digest", task.Digest.String(),
"tag", task.Tag,
"application", task.Application,
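With the stored replica_count field gone, both counters are now derived on demand from joint-set cardinality, which is what the two new SCard calls above do. A short sketch of the idea with hypothetical key names; SCARD is O(1) in Redis, so computing the counts per request stays cheap, and they can never drift from the actual peer memberships:

package sketch

import (
    "context"

    "github.com/redis/go-redis/v9"
)

// currentReplicaCounts derives both counters from set cardinality instead of
// a stored field: one set tracks persistent peers, the other all peers.
func currentReplicaCounts(ctx context.Context, rdb *redis.Client, taskID string) (persistent, total int64, err error) {
    if persistent, err = rdb.SCard(ctx, "persistent-peers:"+taskID).Result(); err != nil {
        return 0, 0, err
    }

    if total, err = rdb.SCard(ctx, "persistent-cache-peers:"+taskID).Result(); err != nil {
        return 0, 0, err
    }

    return persistent, total, nil
}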
scheduler/service/service_v2.go (99 additions, 43 deletions)
@@ -1402,11 +1402,25 @@ func (v *V2) AnnouncePersistentCachePeer(stream schedulerv2.Scheduler_AnnouncePe

// StatPersistentCachePeer checks information of persistent cache peer.
func (v *V2) StatPersistentCachePeer(ctx context.Context, req *schedulerv2.StatPersistentCachePeerRequest) (*commonv2.PersistentCachePeer, error) {
log := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId())
peer, loaded := v.persistentCacheResource.PeerManager().Load(ctx, req.GetPeerId())
if !loaded {
log.Errorf("persistent cache peer %s not found", req.GetPeerId())
return nil, status.Errorf(codes.NotFound, "persistent cache peer %s not found", req.GetPeerId())
}

currentPersistentReplicaCount, err := v.persistentCacheResource.TaskManager().LoadCurrentPersistentReplicaCount(ctx, peer.Task.ID)
if err != nil {
log.Errorf("load current persistent replica count failed %s", err.Error())
return nil, status.Errorf(codes.Internal, err.Error())
}

currentReplicaCount, err := v.persistentCacheResource.TaskManager().LoadCurrentReplicaCount(ctx, peer.Task.ID)
if err != nil {
log.Errorf("load current replica count failed %s", err.Error())
return nil, status.Errorf(codes.Internal, err.Error())
}

return &commonv2.PersistentCachePeer{
Id: peer.ID,
Persistent: peer.Persistent,
@@ -1415,18 +1429,19 @@ CreatedAt: timestamppb.New(peer.CreatedAt),
CreatedAt: timestamppb.New(peer.CreatedAt),
UpdatedAt: timestamppb.New(peer.UpdatedAt),
Task: &commonv2.PersistentCacheTask{
Id: peer.Task.ID,
PersistentReplicaCount: peer.Task.PersistentReplicaCount,
ReplicaCount: peer.Task.ReplicaCount,
Digest: peer.Task.Digest.String(),
Tag: &peer.Task.Tag,
Application: &peer.Task.Application,
PieceLength: uint64(peer.Task.PieceLength),
ContentLength: uint64(peer.Task.ContentLength),
PieceCount: uint32(peer.Task.TotalPieceCount),
State: peer.Task.FSM.Current(),
CreatedAt: timestamppb.New(peer.Task.CreatedAt),
UpdatedAt: timestamppb.New(peer.Task.UpdatedAt),
Id: peer.Task.ID,
PersistentReplicaCount: peer.Task.PersistentReplicaCount,
CurrentPersistentReplicaCount: uint64(currentPersistentReplicaCount),
CurrentReplicaCount: uint64(currentReplicaCount),
Digest: peer.Task.Digest.String(),
Tag: &peer.Task.Tag,
Application: &peer.Task.Application,
PieceLength: uint64(peer.Task.PieceLength),
ContentLength: uint64(peer.Task.ContentLength),
PieceCount: uint32(peer.Task.TotalPieceCount),
State: peer.Task.FSM.Current(),
CreatedAt: timestamppb.New(peer.Task.CreatedAt),
UpdatedAt: timestamppb.New(peer.Task.UpdatedAt),
},
Host: &commonv2.Host{
Id: peer.Host.ID,
@@ -1499,9 +1514,16 @@ func (v *V2) StatPersistentCachePeer(ctx context.Context, req *schedulerv2.StatP
}, nil
}

// TODO Implement the following methods.
// DeletePersistentCachePeer releases persistent cache peer in scheduler.
func (v *V2) DeletePersistentCachePeer(ctx context.Context, req *schedulerv2.DeletePersistentCachePeerRequest) error {
log := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId())
if err := v.persistentCacheResource.PeerManager().Delete(ctx, req.GetPeerId()); err != nil {
log.Errorf("delete persistent cache peer %s error %s", req.GetPeerId(), err)
return status.Errorf(codes.Internal, err.Error())
}

// TODO(gaius) Implement copying the replica to other peers.
// Select the remote peer to copy the replica and trigger the download task asynchronously.
return nil
}

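For context, a hedged sketch of how a client might invoke the newly implemented RPC. The request fields follow the schedulerv2 proto exercised in this diff, but the target address, port, and credentials are assumptions:

package main

import (
    "context"
    "log"

    schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
)

func main() {
    // Assumed wiring: the scheduler address and plaintext credentials are placeholders.
    conn, err := grpc.NewClient("scheduler:8002", grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    client := schedulerv2.NewSchedulerClient(conn)
    if _, err := client.DeletePersistentCachePeer(context.Background(), &schedulerv2.DeletePersistentCachePeerRequest{
        HostId: "host-1",
        TaskId: "task-1",
        PeerId: "peer-1",
    }); err != nil {
        log.Fatalf("delete persistent cache peer failed: %v", err)
    }
}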
@@ -1528,7 +1550,7 @@ func (v *V2) UploadPersistentCacheTaskStarted(ctx context.Context, req *schedule
}

task = persistentcache.NewTask(req.GetTaskId(), req.GetTag(), req.GetApplication(), persistentcache.TaskStatePending, req.GetPersistentReplicaCount(),
0, int32(req.GetPieceLength()), int64(req.GetContentLength()), int32(req.GetPieceCount()), digest, req.GetTtl().AsDuration(), time.Now(), time.Now(), log)
int32(req.GetPieceLength()), int64(req.GetContentLength()), int32(req.GetPieceCount()), digest, req.GetTtl().AsDuration(), time.Now(), time.Now(), log)

if err := task.FSM.Event(ctx, persistentcache.TaskEventUpload); err != nil {
log.Errorf("task fsm event failed: %s", err.Error())
@@ -1586,7 +1608,6 @@ func (v *V2) UploadPersistentCacheTaskFinished(ctx context.Context, req *schedul
}

// Handle task with peer finished request, load task and update it.
peer.Task.ReplicaCount++
if err := peer.Task.FSM.Event(ctx, persistentcache.TaskEventSucceeded); err != nil {
log.Errorf("task fsm event failed: %s", err.Error())
return nil, status.Errorf(codes.Internal, err.Error())
@@ -1598,24 +1619,34 @@
return nil, status.Errorf(codes.Internal, err.Error())
}

// TODO(gaius) Implement copy multiple replicas to the other peers.
// Select the remote peer to copy the replica and trigger the download task with asynchronous.
if peer.Task.ReplicaCount < peer.Task.PersistentReplicaCount {
currentPersistentReplicaCount, err := v.persistentCacheResource.TaskManager().LoadCurrentPersistentReplicaCount(ctx, peer.Task.ID)
if err != nil {
log.Errorf("load current persistent replica count failed %s", err)
return nil, status.Errorf(codes.Internal, err.Error())
}

currentReplicaCount, err := v.persistentCacheResource.TaskManager().LoadCurrentReplicaCount(ctx, peer.Task.ID)
if err != nil {
log.Errorf("load current replica count failed %s", err)
return nil, status.Errorf(codes.Internal, err.Error())
}

// TODO(gaius) Implement copying multiple replicas to other peers.
// Select the remote peer to copy the replica and trigger the download task asynchronously.
return &commonv2.PersistentCacheTask{
Id: peer.Task.ID,
PersistentReplicaCount: peer.Task.PersistentReplicaCount,
ReplicaCount: peer.Task.ReplicaCount,
Digest: peer.Task.Digest.String(),
Tag: &peer.Task.Tag,
Application: &peer.Task.Application,
PieceLength: uint64(peer.Task.PieceLength),
ContentLength: uint64(peer.Task.ContentLength),
PieceCount: uint32(peer.Task.TotalPieceCount),
State: peer.Task.FSM.Current(),
CreatedAt: timestamppb.New(peer.Task.CreatedAt),
UpdatedAt: timestamppb.New(peer.Task.UpdatedAt),
Id: peer.Task.ID,
PersistentReplicaCount: peer.Task.PersistentReplicaCount,
CurrentPersistentReplicaCount: uint64(currentPersistentReplicaCount),
CurrentReplicaCount: uint64(currentReplicaCount),
Digest: peer.Task.Digest.String(),
Tag: &peer.Task.Tag,
Application: &peer.Task.Application,
PieceLength: uint64(peer.Task.PieceLength),
ContentLength: uint64(peer.Task.ContentLength),
PieceCount: uint32(peer.Task.TotalPieceCount),
State: peer.Task.FSM.Current(),
CreatedAt: timestamppb.New(peer.Task.CreatedAt),
UpdatedAt: timestamppb.New(peer.Task.UpdatedAt),
}, nil
}

@@ -1657,28 +1688,53 @@ func (v *V2) UploadPersistentCacheTaskFailed(ctx context.Context, req *scheduler

// StatPersistentCacheTask checks information of persistent cache task.
func (v *V2) StatPersistentCacheTask(ctx context.Context, req *schedulerv2.StatPersistentCacheTaskRequest) (*commonv2.PersistentCacheTask, error) {
log := logger.WithHostAndTaskID(req.GetHostId(), req.GetTaskId())
task, loaded := v.persistentCacheResource.TaskManager().Load(ctx, req.GetTaskId())
if !loaded {
log.Errorf("persistent cache task %s not found", req.GetTaskId())
return nil, status.Errorf(codes.NotFound, "persistent cache task %s not found", req.GetTaskId())
}

currentPersistentReplicaCount, err := v.persistentCacheResource.TaskManager().LoadCurrentPersistentReplicaCount(ctx, task.ID)
if err != nil {
log.Errorf("load current persistent replica count failed %s", err)
return nil, status.Errorf(codes.Internal, err.Error())
}

currentReplicaCount, err := v.persistentCacheResource.TaskManager().LoadCurrentReplicaCount(ctx, task.ID)
if err != nil {
log.Errorf("load current replica count failed %s", err)
return nil, status.Errorf(codes.Internal, err.Error())
}

return &commonv2.PersistentCacheTask{
Id: task.ID,
PersistentReplicaCount: task.PersistentReplicaCount,
ReplicaCount: task.ReplicaCount,
Digest: task.Digest.String(),
Tag: &task.Tag,
Application: &task.Application,
PieceLength: uint64(task.PieceLength),
ContentLength: uint64(task.ContentLength),
PieceCount: uint32(task.TotalPieceCount),
State: task.FSM.Current(),
CreatedAt: timestamppb.New(task.CreatedAt),
UpdatedAt: timestamppb.New(task.UpdatedAt),
Id: task.ID,
PersistentReplicaCount: task.PersistentReplicaCount,
CurrentPersistentReplicaCount: uint64(currentPersistentReplicaCount),
CurrentReplicaCount: uint64(currentReplicaCount),
Digest: task.Digest.String(),
Tag: &task.Tag,
Application: &task.Application,
PieceLength: uint64(task.PieceLength),
ContentLength: uint64(task.ContentLength),
PieceCount: uint32(task.TotalPieceCount),
State: task.FSM.Current(),
CreatedAt: timestamppb.New(task.CreatedAt),
UpdatedAt: timestamppb.New(task.UpdatedAt),
}, nil
}

// DeletePersistentCacheTask releases persistent cache task in scheduler.
func (v *V2) DeletePersistentCacheTask(ctx context.Context, req *schedulerv2.DeletePersistentCacheTaskRequest) error {
return v.persistentCacheResource.PeerManager().DeleteAllByTaskID(ctx, req.GetTaskId())
log := logger.WithHostAndTaskID(req.GetHostId(), req.GetTaskId())
if err := v.persistentCacheResource.PeerManager().DeleteAllByTaskID(ctx, req.GetTaskId()); err != nil {
log.Errorf("delete persistent cache peers by task %s error %s", req.GetTaskId(), err)
return status.Errorf(codes.Internal, err.Error())
}

if err := v.persistentCacheResource.TaskManager().Delete(ctx, req.GetTaskId()); err != nil {
log.Errorf("delete persistent cache task %s error %s", req.GetTaskId(), err)
return status.Errorf(codes.Internal, err.Error())
}

return nil
}
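Taken together, task deletion now cascades: every peer registered for the task is released first, then the task hash itself. Below is a hedged sketch of a stat-then-delete flow; the client interface, request fields, and return types follow the schedulerv2 proto as used in this diff, while the surrounding wiring is assumed:

package sketch

import (
    "context"

    commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
    schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2"
)

// statThenDelete inspects the derived replica counts, then releases the task,
// which removes its peers before deleting the task hash (see Delete above).
func statThenDelete(ctx context.Context, client schedulerv2.SchedulerClient, hostID, taskID string) (*commonv2.PersistentCacheTask, error) {
    task, err := client.StatPersistentCacheTask(ctx, &schedulerv2.StatPersistentCacheTaskRequest{
        HostId: hostID,
        TaskId: taskID,
    })
    if err != nil {
        return nil, err
    }

    if _, err := client.DeletePersistentCacheTask(ctx, &schedulerv2.DeletePersistentCacheTaskRequest{
        HostId: hostID,
        TaskId: taskID,
    }); err != nil {
        return nil, err
    }

    return task, nil
}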