Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: register service to manager #475

Merged
merged 2 commits into from
Jul 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 5 additions & 19 deletions cdnsystem/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,7 @@ func (s *Server) register(ctx context.Context) error {
location := s.config.Host.Location
downloadPort := int32(s.config.DownloadPort)

var cdn *manager.CDN
var err error
cdn, err = s.managerClient.CreateCDN(ctx, &manager.CreateCDNRequest{
cdn, err := s.managerClient.UpdateCDN(ctx, &manager.UpdateCDNRequest{
SourceType: manager.SourceType_CDN_SOURCE,
HostName: iputils.HostName,
Ip: ip,
Expand All @@ -176,30 +174,18 @@ func (s *Server) register(ctx context.Context) error {
DownloadPort: downloadPort,
})
if err != nil {
cdn, err = s.managerClient.UpdateCDN(ctx, &manager.UpdateCDNRequest{
SourceType: manager.SourceType_CDN_SOURCE,
HostName: iputils.HostName,
Ip: ip,
Port: port,
Idc: idc,
Location: location,
DownloadPort: downloadPort,
})
if err != nil {
logger.Errorf("update cdn to manager failed %v", err)
return err
}
logger.Infof("update cdn %s successfully", cdn.HostName)
logger.Errorf("update cdn %s to manager failed %v", cdn.HostName, err)
return err
}
logger.Infof("create cdn %s successfully", cdn.HostName)
logger.Infof("update cdn %s to manager successfully", cdn.HostName)

cdnClusterID := s.config.Manager.CDNClusterID
if cdnClusterID != 0 {
if _, err := s.managerClient.AddCDNToCDNCluster(ctx, &manager.AddCDNToCDNClusterRequest{
CdnId: cdn.Id,
CdnClusterId: cdnClusterID,
}); err != nil {
logger.Warnf("add cdn to cdn cluster failed %v", err)
logger.Warnf("add cdn %s to cdn cluster %s failed %v", cdn.HostName, cdnClusterID, err)
return err
}
logger.Infof("add cdn %s to cdn cluster %s successfully", cdn.HostName, cdnClusterID)
Expand Down
57 changes: 36 additions & 21 deletions manager/service/service_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package service

import (
"context"
"errors"
"io"

logger "d7y.io/dragonfly/v2/internal/dflog"
Expand Down Expand Up @@ -127,7 +128,7 @@ func (s *GRPC) GetCDN(ctx context.Context, req *manager.GetCDNRequest) (*manager
return &pbCDN, nil
}

func (s *GRPC) CreateCDN(ctx context.Context, req *manager.CreateCDNRequest) (*manager.CDN, error) {
func (s *GRPC) createCDN(ctx context.Context, req *manager.UpdateCDNRequest) (*manager.CDN, error) {
if err := req.Validate(); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
Expand All @@ -142,7 +143,7 @@ func (s *GRPC) CreateCDN(ctx context.Context, req *manager.CreateCDNRequest) (*m
}

if err := s.db.Create(&cdn).Error; err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}

return &manager.CDN{
Expand All @@ -162,14 +163,21 @@ func (s *GRPC) UpdateCDN(ctx context.Context, req *manager.UpdateCDNRequest) (*m
}

cdn := model.CDN{}
if err := s.db.First(&cdn, model.CDN{HostName: req.HostName}).Updates(model.CDN{
if err := s.db.First(&cdn, model.CDN{HostName: req.HostName}).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return s.createCDN(ctx, req)
}
return nil, status.Error(codes.Unknown, err.Error())
}

if err := s.db.Model(&cdn).Updates(model.CDN{
IDC: req.Idc,
Location: req.Location,
IP: req.Ip,
Port: req.Port,
DownloadPort: req.DownloadPort,
}).Error; err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}

if err := s.cache.Delete(
Expand Down Expand Up @@ -197,16 +205,16 @@ func (s *GRPC) AddCDNToCDNCluster(ctx context.Context, req *manager.AddCDNToCDNC

cdnCluster := model.CDNCluster{}
if err := s.db.First(&cdnCluster, req.CdnClusterId).Error; err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}

cdn := model.CDN{}
if err := s.db.First(&cdn, req.CdnId).Error; err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}

if err := s.db.Model(&cdnCluster).Association("CDNs").Append(&cdn); err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}

if err := s.cache.Delete(
Expand Down Expand Up @@ -314,7 +322,7 @@ func (s *GRPC) GetScheduler(ctx context.Context, req *manager.GetSchedulerReques
return &pbScheduler, nil
}

func (s *GRPC) CreateScheduler(ctx context.Context, req *manager.CreateSchedulerRequest) (*manager.Scheduler, error) {
func (s *GRPC) createScheduler(ctx context.Context, req *manager.UpdateSchedulerRequest) (*manager.Scheduler, error) {
if err := req.Validate(); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
Expand All @@ -337,7 +345,7 @@ func (s *GRPC) CreateScheduler(ctx context.Context, req *manager.CreateScheduler
}

if err := s.db.Create(&scheduler).Error; err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}

return &manager.Scheduler{
Expand All @@ -358,23 +366,30 @@ func (s *GRPC) UpdateScheduler(ctx context.Context, req *manager.UpdateScheduler
return nil, status.Error(codes.InvalidArgument, err.Error())
}

scheduler := model.Scheduler{}
if err := s.db.First(&scheduler, model.Scheduler{HostName: req.HostName}).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return s.createScheduler(ctx, req)
}
return nil, status.Error(codes.Unknown, err.Error())
}

var netConfig datatypes.JSONMap
if len(req.NetConfig) > 0 {
if err := netConfig.UnmarshalJSON(req.NetConfig); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
}

scheduler := model.Scheduler{}
if err := s.db.First(&scheduler, model.Scheduler{HostName: req.HostName}).Updates(model.Scheduler{
if err := s.db.Model(&scheduler).Updates(model.Scheduler{
VIPs: req.Vips,
IDC: req.Idc,
Location: req.Location,
NetConfig: netConfig,
IP: req.Ip,
Port: req.Port,
}).Error; err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}

if err := s.cache.Delete(
Expand Down Expand Up @@ -404,16 +419,16 @@ func (s *GRPC) AddSchedulerClusterToSchedulerCluster(ctx context.Context, req *m

schedulerCluster := model.SchedulerCluster{}
if err := s.db.First(&schedulerCluster, req.SchedulerClusterId).Error; err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}

scheduler := model.Scheduler{}
if err := s.db.First(&scheduler, req.SchedulerId).Error; err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}

if err := s.db.Model(&schedulerCluster).Association("Schedulers").Append(&scheduler); err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}

if err := s.cache.Delete(
Expand Down Expand Up @@ -500,7 +515,7 @@ func (s *GRPC) KeepAlive(m manager.Manager_KeepAliveServer) error {
req, err := m.Recv()
if err != nil {
logger.Errorf("keepalive failed for the first time: %v", err)
return err
return status.Error(codes.Unknown, err.Error())
}
if err := req.Validate(); err != nil {
return status.Error(codes.InvalidArgument, err.Error())
Expand All @@ -518,7 +533,7 @@ func (s *GRPC) KeepAlive(m manager.Manager_KeepAliveServer) error {
}).Updates(model.Scheduler{
Status: model.SchedulerStatusActive,
}).Error; err != nil {
return err
return status.Error(codes.Unknown, err.Error())
}

if err := s.cache.Delete(
Expand All @@ -537,7 +552,7 @@ func (s *GRPC) KeepAlive(m manager.Manager_KeepAliveServer) error {
}).Updates(model.CDN{
Status: model.CDNStatusActive,
}).Error; err != nil {
return err
return status.Error(codes.Unknown, err.Error())
}

if err := s.cache.Delete(
Expand All @@ -559,7 +574,7 @@ func (s *GRPC) KeepAlive(m manager.Manager_KeepAliveServer) error {
}).Updates(model.Scheduler{
Status: model.SchedulerStatusInactive,
}).Error; err != nil {
return err
return status.Error(codes.Unknown, err.Error())
}

if err := s.cache.Delete(
Expand All @@ -578,7 +593,7 @@ func (s *GRPC) KeepAlive(m manager.Manager_KeepAliveServer) error {
}).Updates(model.CDN{
Status: model.CDNStatusInactive,
}).Error; err != nil {
return err
return status.Error(codes.Unknown, err.Error())
}

if err := s.cache.Delete(
Expand All @@ -594,7 +609,7 @@ func (s *GRPC) KeepAlive(m manager.Manager_KeepAliveServer) error {
return nil
}
logger.Errorf("%s keepalive failed: %v", hostName, err)
return err
return status.Error(codes.Unknown, err.Error())
}

logger.Debugf("%s type of %s send keepalive request", sourceType, hostName)
Expand Down
10 changes: 7 additions & 3 deletions pkg/rpc/cdnsystem/cdnsystem_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions pkg/rpc/dfdaemon/dfdaemon_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading