Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add AllSeedPeersScope for preheating #3698

Merged
merged 1 commit into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion manager/service/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (s *service) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncP

func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheatJobRequest) (*models.Job, error) {
if json.Args.Scope == "" {
json.Args.Scope = types.SinglePeerScope
json.Args.Scope = types.SingleSeedPeerScope
}

if json.Args.ConcurrentCount == 0 {
Expand Down
11 changes: 7 additions & 4 deletions manager/types/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ package types
import "time"

const (
// SinglePeerScope represents the scope that only single peer will be preheated.
SinglePeerScope = "single_peer"
// SingleSeedPeerScope represents the scope that only single seed peer will be preheated.
SingleSeedPeerScope = "single_seed_peer"

// AllSeedPeersScope represents the scope that all seed peers will be preheated.
AllSeedPeersScope = "all_seed_peers"

// AllPeersScope represents the scope that all peers will be preheated.
AllPeersScope = "all_peers"
Expand Down Expand Up @@ -126,8 +129,8 @@ type PreheatArgs struct {
// The image type preheating task can specify the image architecture type. eg: linux/amd64.
Platform string `json:"platform" binding:"omitempty"`

// Scope is the scope for preheating, default is single_peer.
Scope string `json:"scope" binding:"omitempty,oneof=single_peer all_peers"`
// Scope is the scope for preheating, default is single_seed_peer.
Scope string `json:"scope" binding:"omitempty"`

// ConcurrentCount is the concurrent count for preheating all peers, default is 50.
ConcurrentCount int64 `json:"concurrent_count" binding:"omitempty,gte=1,lte=500"`
Expand Down
179 changes: 170 additions & 9 deletions scheduler/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,22 @@ func (j *job) preheat(ctx context.Context, data string) (string, error) {
defer cancel()

switch req.Scope {
case managertypes.SinglePeerScope:
log.Info("preheat single peer")
case managertypes.SingleSeedPeerScope:
log.Info("preheat single seed peer")
resp, err := j.preheatSinglePeer(ctx, taskID, req, log)
if err != nil {
return "", err
}

resp.SchedulerClusterID = j.config.Manager.SchedulerClusterID
return internaljob.MarshalResponse(resp)
case managertypes.AllSeedPeersScope:
log.Info("preheat all seed peers")
resp, err := j.preheatAllSeedPeers(ctx, taskID, req, log)
if err != nil {
return "", err
}

resp.SchedulerClusterID = j.config.Manager.SchedulerClusterID
return internaljob.MarshalResponse(resp)
case managertypes.AllPeersScope:
Expand Down Expand Up @@ -237,26 +246,178 @@ func (j *job) preheatSinglePeer(ctx context.Context, taskID string, req *interna
return resp, nil
}

// preheatAllPeers preheats job by all peers, only supported by v2 protocol. Scheduler will trigger all peers to download task.
// If all the peer download task failed, return error. If some of the peer download task failed, return success tasks and failure tasks.
// preheatAllSeedPeers preheats job by all seed peers, only supported by v2 protocol. Scheduler will
// trigger every seed peer reported by the seed peer client to download the task.
//
// If at least one seed peer downloads the task successfully, the response carrying both the success
// tasks and the failure tasks is returned with a nil error, notifying the client that the preheat is
// successful. If every seed peer fails (or seed peers are disabled or unavailable), an error is returned.
func (j *job) preheatAllSeedPeers(ctx context.Context, taskID string, req *internaljob.PreheatRequest, log *logger.SugaredLoggerOnWith) (*internaljob.PreheatResponse, error) {
	// If seed peer is disabled, return error.
	if !j.config.SeedPeer.Enable {
		return nil, fmt.Errorf("cluster %d scheduler %s has disabled seed peer", j.config.Manager.SchedulerClusterID, j.config.Server.AdvertiseIP)
	}

	// If scheduler has no available seed peer, return error.
	seedPeers := j.resource.SeedPeer().Client().SeedPeers()
	if len(seedPeers) == 0 {
		return nil, fmt.Errorf("cluster %d scheduler %s has no available seed peer", j.config.Manager.SchedulerClusterID, j.config.Server.AdvertiseIP)
	}

	// Results are keyed by seed peer IP and written concurrently by the download goroutines.
	var (
		successTasks sync.Map
		failureTasks sync.Map
	)

	// The derived context is intentionally discarded: one seed peer failing must not
	// cancel the downloads that are still running on the other seed peers.
	eg, _ := errgroup.WithContext(ctx)

	// A limit of zero would make every eg.Go call block forever, so an unset
	// concurrent count leaves the group unlimited instead of hanging the job.
	if limit := int(req.ConcurrentCount); limit != 0 {
		eg.SetLimit(limit)
	}

	for _, seedPeer := range seedPeers {
		var (
			hostname = seedPeer.Hostname
			ip       = seedPeer.Ip
			port     = seedPeer.Port
		)

		target := fmt.Sprintf("%s:%d", ip, port)
		peerLog := logger.WithHost(idgen.HostIDV2(ip, hostname, true), hostname, ip)

		// fail logs the error, records the failure task for this seed peer and
		// hands the error back so it can be returned to the errgroup.
		fail := func(err error) error {
			peerLog.Errorf("preheat failed: %s", err.Error())
			failureTasks.Store(ip, &internaljob.PreheatFailureTask{
				URL:         req.URL,
				Hostname:    hostname,
				IP:          ip,
				Description: fmt.Sprintf("task %s failed: %s", taskID, err.Error()),
			})

			return err
		}

		eg.Go(func() error {
			peerLog.Info("preheat started")
			dialOptions := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
			dfdaemonClient, err := dfdaemonclient.GetV2ByAddr(ctx, target, dialOptions...)
			if err != nil {
				return fail(err)
			}

			stream, err := dfdaemonClient.DownloadTask(
				ctx,
				taskID,
				&dfdaemonv2.DownloadTaskRequest{Download: &commonv2.Download{
					Url:                 req.URL,
					Type:                commonv2.TaskType_STANDARD,
					Tag:                 &req.Tag,
					Application:         &req.Application,
					Priority:            commonv2.Priority(req.Priority),
					FilteredQueryParams: strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator),
					RequestHeader:       req.Headers,
					Timeout:             durationpb.New(req.Timeout),
					CertificateChain:    req.CertificateChain,
				}})
			if err != nil {
				return fail(err)
			}

			// Wait for the download task to complete; io.EOF marks a clean end of the stream.
			for {
				if _, err := stream.Recv(); err != nil {
					if err == io.EOF {
						peerLog.Info("preheat succeeded")
						successTasks.Store(ip, &internaljob.PreheatSuccessTask{
							URL:      req.URL,
							Hostname: hostname,
							IP:       ip,
						})

						return nil
					}

					return fail(err)
				}
			}
		})
	}

	// Wait for all tasks to complete and print the first error; per-peer failures
	// are already captured in failureTasks.
	if err := eg.Wait(); err != nil {
		log.Errorf("preheat failed: %s", err.Error())
	}

	// Collect the failure tasks first so that success tasks can be checked against them.
	var preheatResponse internaljob.PreheatResponse
	failureTasks.Range(func(_, value any) bool {
		if failureTask, ok := value.(*internaljob.PreheatFailureTask); ok {
			preheatResponse.FailureTasks = append(preheatResponse.FailureTasks, failureTask)
		}

		return true
	})

	// A success is dropped when the same IP also has a recorded failure.
	successTasks.Range(func(_, value any) bool {
		if successTask, ok := value.(*internaljob.PreheatSuccessTask); ok {
			for _, failureTask := range preheatResponse.FailureTasks {
				if failureTask.IP == successTask.IP {
					return true
				}
			}

			preheatResponse.SuccessTasks = append(preheatResponse.SuccessTasks, successTask)
		}

		return true
	})

	// If successTasks is not empty, return success tasks and failure tasks.
	// Notify the client that the preheat is successful.
	if len(preheatResponse.SuccessTasks) > 0 {
		return &preheatResponse, nil
	}

	msg := "no error message"
	if len(preheatResponse.FailureTasks) > 0 {
		first := preheatResponse.FailureTasks[0]
		msg = fmt.Sprintf("%s %s %s %s", taskID, first.IP, first.Hostname, first.Description)
	}

	return nil, fmt.Errorf("all seed peers preheat failed: %s", msg)
}

// preheatAllPeers preheats job by all peers, only supported by v2 protocol. Scheduler will trigger all peers to download task.
// If all the peers download task failed, return error. If some of the peers download task failed, return success tasks and
// failure tasks. Notify the client that the preheat is successful.
func (j *job) preheatAllPeers(ctx context.Context, taskID string, req *internaljob.PreheatRequest, log *logger.SugaredLoggerOnWith) (*internaljob.PreheatResponse, error) {
// If scheduler has no available peer, return error.
peers := j.resource.HostManager().LoadAll()
if len(peers) == 0 {
return nil, fmt.Errorf("cluster %d scheduler %s has no available peer", j.config.Manager.SchedulerClusterID, j.config.Server.AdvertiseIP)
}

var (
successTasks = sync.Map{}
failureTasks = sync.Map{}
)

eg, _ := errgroup.WithContext(ctx)
eg.SetLimit(int(req.ConcurrentCount))
for _, host := range j.resource.HostManager().LoadAll() {
for _, peer := range peers {
var (
hostname = host.Hostname
ip = host.IP
port = host.Port
hostname = peer.Hostname
ip = peer.IP
port = peer.Port
)

target := fmt.Sprintf("%s:%d", ip, port)
log := logger.WithHost(host.ID, hostname, ip)
log := logger.WithHost(peer.ID, hostname, ip)

eg.Go(func() error {
log.Info("preheat started")
Expand Down
8 changes: 8 additions & 0 deletions scheduler/resource/standard/seed_peer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ type SeedPeerClient interface {
// Addrs returns the addresses of seed peers.
Addrs() []string

// SeedPeers returns the seed peers working for the scheduler.
SeedPeers() []*managerv2.SeedPeer

// Client is cdnsystem grpc client interface.
cdnsystemclient.Client

Expand Down Expand Up @@ -132,6 +135,11 @@ func (sc *seedPeerClient) Addrs() []string {
return addrs
}

// SeedPeers reports the seed peers attached to the scheduler entry of the
// data currently held by the client. The slice is handed back as stored,
// without copying.
func (sc *seedPeerClient) SeedPeers() []*managerv2.SeedPeer {
	seedPeers := sc.data.Scheduler.SeedPeers
	return seedPeers
}

// Dynamic config notify function.
func (sc *seedPeerClient) OnNotify(data *config.DynconfigData) {
if reflect.DeepEqual(sc.data, data) {
Expand Down
15 changes: 15 additions & 0 deletions scheduler/resource/standard/seed_peer_client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading