Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimize scheduler dynconfig #480

Merged
merged 9 commits into from
Jul 20, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cmd/dependency/dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,15 @@ func SetupQuitSignalHandler(handler func()) {
}()
}

func GetConfigPath(name string) string {
cfgFile := viper.GetString("config")
if cfgFile != "" {
return cfgFile
}

return filepath.Join(dfpath.DefaultConfigDir, fmt.Sprintf("%s.yaml", name))
}

// initConfig reads in config file and ENV variables if set.
func initConfig(useConfigFile bool, name string, config interface{}) {
// Use config file and read once.
Expand Down
1 change: 1 addition & 0 deletions internal/dfpath/config_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ import (
)

var DefaultConfigDir = filepath.Join(WorkHome, "config")
var DefaultDynconfigDir = filepath.Join(DefaultConfigDir, "dynconfig")
3 changes: 3 additions & 0 deletions internal/dfpath/config_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@

package dfpath

import "path/filepath"

var DefaultConfigDir = "/etc/dragonfly"
var DefaultDynconfigDir = filepath.Join(DefaultConfigDir, "dynconfig")
4 changes: 4 additions & 0 deletions internal/dfpath/dfpath.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func init() {
panic(err)
}

if err := fileutils.MkdirAll(DefaultDynconfigDir); err != nil {
panic(err)
}

if err := fileutils.MkdirAll(LogDir); err != nil {
panic(err)
}
Expand Down
27 changes: 4 additions & 23 deletions internal/dynconfig/dynconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const (
)

type strategy interface {
Unmarshal(rawVal interface{}, opts ...DecoderConfigOption) error
Unmarshal(rawVal interface{}) error
}

type Dynconfig struct {
Expand All @@ -56,10 +56,6 @@ type Option func(d *Dynconfig) error
// WithManagerClient set the manager client
func WithManagerClient(c ManagerClient) Option {
return func(d *Dynconfig) error {
if d.sourceType != ManagerSourceType {
return errors.New("the source type must be ManagerSourceType")
}

d.managerClient = c
return nil
}
Expand All @@ -68,10 +64,6 @@ func WithManagerClient(c ManagerClient) Option {
// WithLocalConfigPath set the file path
func WithLocalConfigPath(p string) Option {
return func(d *Dynconfig) error {
if d.sourceType != LocalSourceType {
return errors.New("the source type must be LocalSourceType")
}

d.localConfigPath = p
return nil
}
Expand All @@ -80,10 +72,6 @@ func WithLocalConfigPath(p string) Option {
// WithCachePath set the cache file path
func WithCachePath(p string) Option {
return func(d *Dynconfig) error {
if d.sourceType != ManagerSourceType {
return errors.New("the source type must be ManagerSourceType")
}

d.cachePath = p
return nil
}
Expand All @@ -92,10 +80,6 @@ func WithCachePath(p string) Option {
// WithExpireTime set the expire time for cache
func WithExpireTime(e time.Duration) Option {
return func(d *Dynconfig) error {
if d.sourceType != ManagerSourceType {
return errors.New("the source type must be ManagerSourceType")
}

d.expire = e
return nil
}
Expand Down Expand Up @@ -168,8 +152,8 @@ func (d *Dynconfig) validate() error {

// Unmarshal unmarshals the config into a Struct. Make sure that the tags
// on the fields of the structure are properly set.
func (d *Dynconfig) Unmarshal(rawVal interface{}, opts ...DecoderConfigOption) error {
return d.strategy.Unmarshal(rawVal, opts...)
func (d *Dynconfig) Unmarshal(rawVal interface{}) error {
return d.strategy.Unmarshal(rawVal)
}

// A DecoderConfigOption can be passed to dynconfig Unmarshal to configure
Expand All @@ -178,7 +162,7 @@ type DecoderConfigOption func(*mapstructure.DecoderConfig)

// defaultDecoderConfig returns default mapstructure.DecoderConfig with support
// of time.Duration values & string slices
func defaultDecoderConfig(output interface{}, opts ...DecoderConfigOption) *mapstructure.DecoderConfig {
func defaultDecoderConfig(output interface{}) *mapstructure.DecoderConfig {
c := &mapstructure.DecoderConfig{
Metadata: nil,
Result: output,
Expand All @@ -188,9 +172,6 @@ func defaultDecoderConfig(output interface{}, opts ...DecoderConfigOption) *maps
mapstructure.StringToSliceHookFunc(","),
),
}
for _, opt := range opts {
opt(c)
}
return c
}

Expand Down
10 changes: 5 additions & 5 deletions internal/dynconfig/dynconfig_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package dynconfig

import (
"encoding/json"
"errors"
"io/ioutil"

"gopkg.in/yaml.v3"
)

type dynconfigLocal struct {
Expand All @@ -37,11 +37,11 @@ func newDynconfigLocal(path string) (*dynconfigLocal, error) {

// Unmarshal unmarshals the config into a Struct. Make sure that the tags
// on the fields of the structure are properly set.
func (d *dynconfigLocal) Unmarshal(rawVal interface{}, opts ...DecoderConfigOption) error {
func (d *dynconfigLocal) Unmarshal(rawVal interface{}) error {
b, err := ioutil.ReadFile(d.filepath)
if err != nil {
return errors.New("can't find the local config data")
return err
}

return json.Unmarshal(b, rawVal)
return yaml.Unmarshal(b, rawVal)
}
4 changes: 2 additions & 2 deletions internal/dynconfig/dynconfig_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ func (d *dynconfigManager) get() (interface{}, error) {

// Unmarshal unmarshals the config into a Struct. Make sure that the tags
// on the fields of the structure are properly set.
func (d *dynconfigManager) Unmarshal(rawVal interface{}, opts ...DecoderConfigOption) error {
func (d *dynconfigManager) Unmarshal(rawVal interface{}) error {
dynconfig, err := d.get()
if err != nil {
return errors.New("can't find the cached data")
}

return decode(dynconfig, defaultDecoderConfig(rawVal, opts...))
return decode(dynconfig, defaultDecoderConfig(rawVal))
}

// Load dynamic config from manager
Expand Down
26 changes: 9 additions & 17 deletions scheduler/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,19 @@ func New() *Config {
}

func (c *Config) Validate() error {
if c.Dynconfig.Type == dc.LocalSourceType && c.Dynconfig.Path == "" {
return errors.New("dynconfig is LocalSourceType type requires parameter path")
if c.Dynconfig.CDNDirPath == "" {
if c.Dynconfig.Type == dc.LocalSourceType && c.Dynconfig.Data == nil {
return errors.New("dynconfig is LocalSourceType type requires parameter data")
}
}

if c.Dynconfig.Type == dc.ManagerSourceType {
if c.Dynconfig.ExpireTime == 0 {
return errors.New("dynconfig is ManagerSourceType type requires parameter expireTime")
}

if c.Dynconfig.CachePath == "" {
return errors.New("dynconfig is ManagerSourceType type requires parameter cachePath")
}

if c.Dynconfig.Addr == "" {
return errors.New("dynconfig is ManagerSourceType type requires parameter addr")
if c.Manager.Addr == "" {
return errors.New("dynconfig is ManagerSourceType type requires parameter manager addr")
}
}

Expand Down Expand Up @@ -93,17 +91,11 @@ type DynconfigOptions struct {
// ExpireTime is expire time for manager cache.
ExpireTime time.Duration `yaml:"expireTime" mapstructure:"expireTime"`

// Addr is dynconfig source address.
Addr string `yaml:"addr" mapstructure:"addr"`

// Path is dynconfig filepath.
Path string `yaml:"path" mapstructure:"path"`

// CachePath is cache filepath.
CachePath string `yaml:"cachePath" mapstructure:"cachePath"`

// CDNDirPath is cdn dir.
CDNDirPath string `yaml:"cdnDirPath" mapstructure:"cdnDirPath"`

// Data is dynconfig local data.
Data *DynconfigData `yaml:"data" mapstructure:"data"`
}

type SchedulerConfig struct {
Expand Down
2 changes: 0 additions & 2 deletions scheduler/config/config_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ var config = Config{
Dynconfig: &DynconfigOptions{
Type: dc.LocalSourceType,
ExpireTime: 30000 * 1000 * 1000,
Path: SchedulerDynconfigPath,
CachePath: SchedulerDynconfigCachePath,
},
Server: ServerConfig{
IP: iputils.HostIP,
Expand Down
2 changes: 0 additions & 2 deletions scheduler/config/config_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ var config = Config{
Dynconfig: &DynconfigOptions{
Type: dc.LocalSourceType,
ExpireTime: 30000 * 1000 * 1000,
Path: SchedulerDynconfigPath,
CachePath: SchedulerDynconfigCachePath,
},
Server: ServerConfig{
IP: iputils.HostIP,
Expand Down
3 changes: 0 additions & 3 deletions scheduler/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,7 @@ func TestSchedulerConfig_Load(t *testing.T) {
config := &Config{
Dynconfig: &DynconfigOptions{
Type: dc.LocalSourceType,
Path: "foo",
CachePath: "bar",
ExpireTime: 1000,
Addr: "127.0.0.1:8002",
CDNDirPath: "tmp",
},
Scheduler: SchedulerConfig{
Expand Down
53 changes: 39 additions & 14 deletions scheduler/config/dynconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,27 @@ import (
)

var (
SchedulerDynconfigPath = filepath.Join(dfpath.WorkHome, "dynconfig/scheduler.json")
SchedulerDynconfigCachePath = filepath.Join(dfpath.WorkHome, "dynconfig/scheduler")
SchedulerDynconfigCachePath = filepath.Join(dfpath.DefaultDynconfigDir, "scheduler")
)

var (
watchInterval = 1 * time.Second
)

type DynconfigData struct {
CDNs []*CDN `yaml:"cdns" mapstructure:"cdns"`
}

type CDN struct {
HostName string `yaml:"hostname" mapstructure:"hostname" json:"host_name"`
IP string `yaml:"ip" mapstructure:"ip" json:"ip"`
Port int32 `yaml:"port" mapstructure:"port" json:"port"`
DownloadPort int32 `yaml:"downloadPort" mapstructure:"downloadPort" json:"download_port"`
}

type DynconfigInterface interface {
// Get the dynamic config from manager.
Get() (*manager.Scheduler, error)
Get() (*DynconfigData, error)

// Register allows an instance to register itself to listen/observe events.
Register(Observer)
Expand All @@ -62,14 +72,15 @@ type DynconfigInterface interface {

type Observer interface {
// OnNotify allows an event to be "published" to interface implementations.
OnNotify(*manager.Scheduler)
OnNotify(*DynconfigData)
}

type dynconfig struct {
*dc.Dynconfig
observers map[Observer]struct{}
done chan bool
cdnDirPath string
sourceType dc.SourceType
}

// TODO(Gaius) Rely on manager to delete cdnDirPath
Expand All @@ -78,6 +89,7 @@ func NewDynconfig(sourceType dc.SourceType, cdnDirPath string, options ...dc.Opt
observers: map[Observer]struct{}{},
done: make(chan bool),
cdnDirPath: cdnDirPath,
sourceType: sourceType,
}

client, err := dc.New(sourceType, options...)
Expand All @@ -88,30 +100,43 @@ func NewDynconfig(sourceType dc.SourceType, cdnDirPath string, options ...dc.Opt
return d, nil
}

func (d *dynconfig) Get() (*manager.Scheduler, error) {
var config manager.Scheduler
func (d *dynconfig) Get() (*DynconfigData, error) {
var config DynconfigData
if d.cdnDirPath != "" {
cdn, err := d.getCDNFromDirPath()
cdns, err := d.getCDNFromDirPath()
if err != nil {
return nil, err
}
config.Cdns = cdn
} else {
config.CDNs = cdns
return &config, nil
}

if d.sourceType == dc.ManagerSourceType {
if err := d.Unmarshal(&config); err != nil {
return nil, err
}
return &config, nil
}

if err := d.Unmarshal(&struct {
Dynconfig *DynconfigOptions `yaml:"dynconfig" mapstructure:"dynconfig"`
}{
Dynconfig: &DynconfigOptions{
Data: &config,
},
}); err != nil {
return nil, err
}
return &config, nil
}

func (d *dynconfig) getCDNFromDirPath() ([]*manager.CDN, error) {
func (d *dynconfig) getCDNFromDirPath() ([]*CDN, error) {
files, err := ioutil.ReadDir(d.cdnDirPath)
if err != nil {
return nil, err
}

var data []*manager.CDN
var data []*CDN
for _, file := range files {
// skip directory
if file.IsDir() {
Expand All @@ -135,7 +160,7 @@ func (d *dynconfig) getCDNFromDirPath() ([]*manager.CDN, error) {
return nil, err
}

var s *manager.CDN
var s *CDN
if err := json.Unmarshal(b, &s); err != nil {
return nil, err
}
Expand Down Expand Up @@ -203,13 +228,13 @@ func NewManagerClient(client manager.ManagerClient) dc.ManagerClient {
}

func (mc *managerClient) Get() (interface{}, error) {
scConfig, err := mc.GetScheduler(context.Background(), &manager.GetSchedulerRequest{
scheduler, err := mc.GetScheduler(context.Background(), &manager.GetSchedulerRequest{
HostName: iputils.HostName,
SourceType: manager.SourceType_SCHEDULER_SOURCE,
})
if err != nil {
return nil, err
}

return scConfig, nil
return scheduler, nil
}
Loading