Skip to content

Commit

Permalink
feat: add garbage collection for persistent cache host (#3642)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi authored Nov 11, 2024
1 parent 8711108 commit 555a132
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 18 deletions.
69 changes: 56 additions & 13 deletions scheduler/resource/persistentcache/host_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,20 @@ import (
"strconv"
"time"

"github.com/redis/go-redis/v9"
redis "github.com/redis/go-redis/v9"

logger "d7y.io/dragonfly/v2/internal/dflog"
pkggc "d7y.io/dragonfly/v2/pkg/gc"
pkgredis "d7y.io/dragonfly/v2/pkg/redis"
pkgtypes "d7y.io/dragonfly/v2/pkg/types"
"d7y.io/dragonfly/v2/scheduler/config"
)

const (
// GC persistent cache host id.
GCHostID = "persistent-cache-host"
)

// HostManager is the interface used for host manager.
type HostManager interface {
// Load returns host by a key.
Expand All @@ -44,6 +50,9 @@ type HostManager interface {

// LoadAll returns all hosts.
LoadAll(context.Context) ([]*Host, error)

// RunGC runs garbage collection.
RunGC() error
}

// hostManager contains content for host manager.
Expand All @@ -56,14 +65,25 @@ type hostManager struct {
}

// New host manager interface.
func newHostManager(cfg *config.Config, rdb redis.UniversalClient) HostManager {
return &hostManager{config: cfg, rdb: rdb}
func newHostManager(cfg *config.Config, gc pkggc.GC, rdb redis.UniversalClient) (HostManager, error) {
h := &hostManager{config: cfg, rdb: rdb}

if err := gc.Add(pkggc.Task{
ID: GCHostID,
Interval: cfg.Scheduler.GC.HostGCInterval,
Timeout: cfg.Scheduler.GC.HostGCInterval,
Runner: h,
}); err != nil {
return nil, err
}

return h, nil
}

// Load returns host by a key.
func (t *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {
func (h *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {
log := logger.WithHostID(hostID)
rawHost, err := t.rdb.HGetAll(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(t.config.Manager.SchedulerClusterID, hostID)).Result()
rawHost, err := h.rdb.HGetAll(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, hostID)).Result()
if err != nil {
log.Errorf("getting host failed from redis: %v", err)
return nil, false
Expand Down Expand Up @@ -427,9 +447,9 @@ func (t *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {
}

// Store sets host.
func (t *hostManager) Store(ctx context.Context, host *Host) error {
_, err := t.rdb.HSet(ctx,
pkgredis.MakePersistentCacheHostKeyInScheduler(t.config.Manager.SchedulerClusterID, host.ID),
func (h *hostManager) Store(ctx context.Context, host *Host) error {
_, err := h.rdb.HSet(ctx,
pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, host.ID),
"id", host.ID,
"type", host.Type.Name(),
"hostname", host.Hostname,
Expand Down Expand Up @@ -494,13 +514,13 @@ func (t *hostManager) Store(ctx context.Context, host *Host) error {
}

// Delete deletes host by a key.
func (t *hostManager) Delete(ctx context.Context, hostID string) error {
_, err := t.rdb.Del(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(t.config.Manager.SchedulerClusterID, hostID)).Result()
func (h *hostManager) Delete(ctx context.Context, hostID string) error {
_, err := h.rdb.Del(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, hostID)).Result()
return err
}

// LoadAll returns all hosts.
func (t *hostManager) LoadAll(ctx context.Context) ([]*Host, error) {
func (h *hostManager) LoadAll(ctx context.Context) ([]*Host, error) {
var (
hosts []*Host
cursor uint64
Expand All @@ -512,14 +532,14 @@ func (t *hostManager) LoadAll(ctx context.Context) ([]*Host, error) {
err error
)

hostKeys, cursor, err = t.rdb.Scan(ctx, cursor, pkgredis.MakePersistentCacheHostsInScheduler(t.config.Manager.SchedulerClusterID), 10).Result()
hostKeys, cursor, err = h.rdb.Scan(ctx, cursor, pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID), 10).Result()
if err != nil {
logger.Error("scan hosts failed")
return nil, err
}

for _, hostKey := range hostKeys {
host, loaded := t.Load(ctx, hostKey)
host, loaded := h.Load(ctx, hostKey)
if !loaded {
logger.WithHostID(hostKey).Error("load host failed")
continue
Expand All @@ -535,3 +555,26 @@ func (t *hostManager) LoadAll(ctx context.Context) ([]*Host, error) {

return hosts, nil
}

// RunGC runs garbage collection.
func (h *hostManager) RunGC() error {
hosts, err := h.LoadAll(context.Background())
if err != nil {
logger.Error("load all hosts failed")
return err
}

for _, host := range hosts {
// If the host's elapsed exceeds twice the announcing interval,
// then leave peers in host.
elapsed := time.Since(host.UpdatedAt)
if host.AnnounceInterval > 0 && elapsed > host.AnnounceInterval*2 {
host.Log.Info("host has been reclaimed")
if err := h.Delete(context.Background(), host.ID); err != nil {
host.Log.Errorf("delete host failed: %v", err)
}
}
}

return nil
}
14 changes: 14 additions & 0 deletions scheduler/resource/persistentcache/host_manager_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

61 changes: 60 additions & 1 deletion scheduler/resource/persistentcache/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ type PeerManager interface {

// DeleteAllByTaskID deletes all peers by task id.
DeleteAllByTaskID(context.Context, string) error

// LoadAllByHostID returns all peers by host id.
LoadAllByHostID(context.Context, string) ([]*Peer, error)

// DeleteAllByHostID deletes all peers by host id.
DeleteAllByHostID(context.Context, string) error
}

// peerManager contains content for peer manager.
Expand Down Expand Up @@ -366,6 +372,59 @@ func (p *peerManager) DeleteAllByTaskID(ctx context.Context, taskID string) erro
}
}

p.taskManager.Delete(ctx, taskID)
return nil
}

// LoadAllByHostID returns all persistent cache peers by host id.
func (p *peerManager) LoadAllByHostID(ctx context.Context, hostID string) ([]*Peer, error) {
log := logger.WithHostID(hostID)
peerIDs, err := p.rdb.SMembers(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, hostID)).Result()
if err != nil {
log.Error("get peer ids failed")
return nil, err
}

peers := make([]*Peer, 0, len(peerIDs))
for _, peerID := range peerIDs {
peer, loaded := p.Load(ctx, peerID)
if !loaded {
log.Errorf("load peer %s failed", peerID)
continue
}

peers = append(peers, peer)
}

return peers, nil
}

// DeleteAllByHostID deletes all persistent cache peers by host id.
func (p *peerManager) DeleteAllByHostID(ctx context.Context, hostID string) error {
log := logger.WithTaskID(hostID)
peers, err := p.LoadAllByHostID(ctx, hostID)
if err != nil {
log.Error("load peers failed")
return err
}

for _, peer := range peers {
addr := fmt.Sprintf("%s:%d", peer.Host.IP, peer.Host.Port)
client, err := dfdaemonclient.GetV2ByAddr(ctx, addr, grpc.WithTransportCredentials(p.transportCredentials))
if err != nil {
log.Errorf("get dfdaemon client failed: %v", err)
continue
}

if err := client.DeletePersistentCacheTask(ctx, &dfdaemonv2.DeletePersistentCacheTaskRequest{TaskId: peer.Task.ID}); err != nil {
log.Errorf("delete task %s failed", peer.Task.ID)
continue
}

if err := p.Delete(ctx, peer.ID); err != nil {
log.Errorf("delete peer %s failed", peer.ID)
continue
}
}

return nil
}
29 changes: 29 additions & 0 deletions scheduler/resource/persistentcache/peer_manager_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 10 additions & 3 deletions scheduler/resource/persistentcache/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
redis "github.com/redis/go-redis/v9"
"google.golang.org/grpc/credentials"

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/gc"
"d7y.io/dragonfly/v2/scheduler/config"
)

Expand Down Expand Up @@ -50,11 +52,16 @@ type resource struct {
}

// New returns Resource interface.
func New(cfg *config.Config, rdb redis.UniversalClient, transportCredentials credentials.TransportCredentials) Resource {
func New(cfg *config.Config, gc gc.GC, rdb redis.UniversalClient, transportCredentials credentials.TransportCredentials) (Resource, error) {
taskManager := newTaskManager(cfg, rdb)
hostManager := newHostManager(cfg, rdb)
hostManager, err := newHostManager(cfg, gc, rdb)
if err != nil {
logger.Errorf("failed to create host manager: %v", err)
return nil, err
}

peerManager := newPeerManager(cfg, rdb, taskManager, hostManager, transportCredentials)
return &resource{peerManager, taskManager, hostManager}
return &resource{peerManager, taskManager, hostManager}, nil
}

// Host manager interface.
Expand Down
6 changes: 5 additions & 1 deletion scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,11 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
}

// Initialize persistent cache resource.
s.persistentCacheResource = persistentcache.New(cfg, rdb, peerClientTransportCredentials)
s.persistentCacheResource, err = persistentcache.New(cfg, s.gc, rdb, peerClientTransportCredentials)
if err != nil {
logger.Errorf("failed to create persistent cache resource: %v", err)
return nil, err
}

// Initialize job service.
if cfg.Job.Enable && rdb != nil {
Expand Down

0 comments on commit 555a132

Please sign in to comment.