Skip to content

Commit

Permalink
feat: when the redis is disabled, AnnounceHost need to skip store red…
Browse files Browse the repository at this point in the history
…is (#3712)

Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi authored Dec 17, 2024
1 parent a370da1 commit 063a20d
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 5 deletions.
10 changes: 6 additions & 4 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,12 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
}

// Initialize persistent cache resource.
s.persistentCacheResource, err = persistentcache.New(cfg, s.gc, rdb, peerClientTransportCredentials)
if err != nil {
logger.Errorf("failed to create persistent cache resource: %v", err)
return nil, err
if rdb != nil {
s.persistentCacheResource, err = persistentcache.New(cfg, s.gc, rdb, peerClientTransportCredentials)
if err != nil {
logger.Errorf("failed to create persistent cache resource: %v", err)
return nil, err
}
}

// Initialize job service.
Expand Down
39 changes: 38 additions & 1 deletion scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,12 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
}
}

// Handle the persistent cache host.
// Handle the persistent cache host. If redis is not enabled,
// it will not support the persistent cache feature.
if v.persistentCacheResource == nil {
return nil
}

persistentCacheHost, loaded := v.persistentCacheResource.HostManager().Load(ctx, req.Host.GetId())
if !loaded {
options := []persistentcache.HostOption{}
Expand Down Expand Up @@ -1573,11 +1578,19 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, taskID string, download
// TODO Implement the following methods.
// AnnouncePersistentCachePeer announces persistent cache peer to scheduler.
func (v *V2) AnnouncePersistentCachePeer(stream schedulerv2.Scheduler_AnnouncePersistentCachePeerServer) error {
if v.persistentCacheResource == nil {
return status.Error(codes.FailedPrecondition, "redis is not enabled")
}

return nil
}

// StatPersistentCachePeer checks information of persistent cache peer.
func (v *V2) StatPersistentCachePeer(ctx context.Context, req *schedulerv2.StatPersistentCachePeerRequest) (*commonv2.PersistentCachePeer, error) {
if v.persistentCacheResource == nil {
return nil, status.Error(codes.FailedPrecondition, "redis is not enabled")
}

log := logger.WithPeer(req.HostId, req.TaskId, req.PeerId)
peer, loaded := v.persistentCacheResource.PeerManager().Load(ctx, req.GetPeerId())
if !loaded {
Expand Down Expand Up @@ -1694,6 +1707,10 @@ func (v *V2) StatPersistentCachePeer(ctx context.Context, req *schedulerv2.StatP

// DeletePersistentCachePeer releases persistent cache peer in scheduler.
func (v *V2) DeletePersistentCachePeer(ctx context.Context, req *schedulerv2.DeletePersistentCachePeerRequest) error {
if v.persistentCacheResource == nil {
return status.Error(codes.FailedPrecondition, "redis is not enabled")
}

log := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId())
if err := v.persistentCacheResource.PeerManager().Delete(ctx, req.GetPeerId()); err != nil {
log.Errorf("delete persistent cache peer %s error %s", req.GetPeerId(), err)
Expand All @@ -1707,6 +1724,10 @@ func (v *V2) DeletePersistentCachePeer(ctx context.Context, req *schedulerv2.Del

// UploadPersistentCacheTaskStarted uploads the metadata of the persistent cache task started.
func (v *V2) UploadPersistentCacheTaskStarted(ctx context.Context, req *schedulerv2.UploadPersistentCacheTaskStartedRequest) error {
if v.persistentCacheResource == nil {
return status.Error(codes.FailedPrecondition, "redis is not enabled")
}

log := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId())
host, loaded := v.persistentCacheResource.HostManager().Load(ctx, req.GetHostId())
if !loaded {
Expand Down Expand Up @@ -1764,6 +1785,10 @@ func (v *V2) UploadPersistentCacheTaskStarted(ctx context.Context, req *schedule

// UploadPersistentCacheTaskFinished uploads the metadata of the persistent cache task finished.
func (v *V2) UploadPersistentCacheTaskFinished(ctx context.Context, req *schedulerv2.UploadPersistentCacheTaskFinishedRequest) (*commonv2.PersistentCacheTask, error) {
if v.persistentCacheResource == nil {
return nil, status.Error(codes.FailedPrecondition, "redis is not enabled")
}

log := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId())
// Handle peer with task finished request, load peer and update it.
peer, loaded := v.persistentCacheResource.PeerManager().Load(ctx, req.GetPeerId())
Expand Down Expand Up @@ -1830,6 +1855,10 @@ func (v *V2) UploadPersistentCacheTaskFinished(ctx context.Context, req *schedul

// UploadPersistentCacheTaskFailed uploads the metadata of the persistent cache task failed.
func (v *V2) UploadPersistentCacheTaskFailed(ctx context.Context, req *schedulerv2.UploadPersistentCacheTaskFailedRequest) error {
if v.persistentCacheResource == nil {
return status.Error(codes.FailedPrecondition, "redis is not enabled")
}

log := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId())
// Handle peer with task failed request, load peer and update it.
peer, loaded := v.persistentCacheResource.PeerManager().Load(ctx, req.GetPeerId())
Expand Down Expand Up @@ -1866,6 +1895,10 @@ func (v *V2) UploadPersistentCacheTaskFailed(ctx context.Context, req *scheduler

// StatPersistentCacheTask checks information of persistent cache task.
func (v *V2) StatPersistentCacheTask(ctx context.Context, req *schedulerv2.StatPersistentCacheTaskRequest) (*commonv2.PersistentCacheTask, error) {
if v.persistentCacheResource == nil {
return nil, status.Error(codes.FailedPrecondition, "redis is not enabled")
}

log := logger.WithHostAndTaskID(req.GetHostId(), req.GetTaskId())
task, loaded := v.persistentCacheResource.TaskManager().Load(ctx, req.GetTaskId())
if !loaded {
Expand Down Expand Up @@ -1904,6 +1937,10 @@ func (v *V2) StatPersistentCacheTask(ctx context.Context, req *schedulerv2.StatP

// DeletePersistentCacheTask releases persistent cache task in scheduler.
func (v *V2) DeletePersistentCacheTask(ctx context.Context, req *schedulerv2.DeletePersistentCacheTaskRequest) error {
if v.persistentCacheResource == nil {
return status.Error(codes.FailedPrecondition, "redis is not enabled")
}

log := logger.WithHostAndTaskID(req.GetHostId(), req.GetTaskId())
if err := v.persistentCacheResource.PeerManager().DeleteAllByTaskID(ctx, req.GetTaskId()); err != nil {
log.Errorf("delete persistent cache peers by task %s error %s", req.GetTaskId(), err)
Expand Down

0 comments on commit 063a20d

Please sign in to comment.