Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Merge pull request #1354 from weaveworks/rewrite/image-fetch-backoff
Browse files Browse the repository at this point in the history
Vary sampling rate in image metadata cache, and back off on HTTP 429
  • Loading branch information
squaremo authored Sep 18, 2018
2 parents 5e2410f + 1c3a901 commit 2691f5e
Show file tree
Hide file tree
Showing 13 changed files with 394 additions and 138 deletions.
22 changes: 13 additions & 9 deletions cmd/fluxd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@ var version = "unversioned"
const (
product = "weave-flux"

// The number of connections chosen for memcache and remote GETs should match for best performance (hence the single hardcoded value)
// Value chosen through performance tests on sock-shop. I was unable to get higher performance than this.
defaultRemoteConnections = 125 // Chosen performance tests on sock-shop. Unable to get higher performance than this.
defaultMemcacheConnections = 10 // This doesn't need to be high. The user is only requesting one tag/image at a time.
// This is used as the "burst" value for rate limiting, and
// therefore also as the limit to the number of concurrent fetches
// and memcached connections, since these in general can't do any
// more work than is allowed by the burst amount.
defaultRemoteConnections = 10

// There are running systems that assume these defaults (by not
// supplying a value for one or both). Don't change them.
Expand Down Expand Up @@ -98,9 +99,8 @@ func main() {
memcachedHostname = fs.String("memcached-hostname", "memcached", "Hostname for memcached service.")
memcachedTimeout = fs.Duration("memcached-timeout", time.Second, "Maximum time to wait before giving up on memcached requests.")
memcachedService = fs.String("memcached-service", "memcached", "SRV service used to discover memcache servers.")
registryCacheExpiry = fs.Duration("registry-cache-expiry", 1*time.Hour, "Duration to keep cached image info. Must be < 1 month.")
registryPollInterval = fs.Duration("registry-poll-interval", 5*time.Minute, "period at which to check for updated images")
registryRPS = fs.Int("registry-rps", 200, "maximum registry requests per second per host")
registryRPS = fs.Float64("registry-rps", 50, "maximum registry requests per second per host")
registryBurst = fs.Int("registry-burst", defaultRemoteConnections, "maximum number of warmer connections to remote and memcache")
registryTrace = fs.Bool("registry-trace", false, "output trace of image registry requests to log")
registryInsecure = fs.StringSlice("registry-insecure-host", []string{}, "use HTTP for this image registry domain (e.g., registry.cluster.local), instead of HTTPS")
Expand All @@ -119,7 +119,10 @@ func main() {
token = fs.String("token", "", "Authentication token for upstream service")

dockerConfig = fs.String("docker-config", "", "path to a docker config to use for image registry credentials")

_ = fs.Duration("registry-cache-expiry", 0, "")
)
fs.MarkDeprecated("registry-cache-expiry", "no longer used; cache entries are expired adaptively according to how often they change")

err := fs.Parse(os.Args[1:])
switch {
Expand Down Expand Up @@ -281,7 +284,6 @@ func main() {
memcacheClient := registryMemcache.NewMemcacheClient(registryMemcache.MemcacheConfig{
Host: *memcachedHostname,
Service: *memcachedService,
Expiry: *registryCacheExpiry,
Timeout: *memcachedTimeout,
UpdateInterval: 1 * time.Minute,
Logger: log.With(logger, "component", "memcached"),
Expand All @@ -298,8 +300,9 @@ func main() {
// Remote client, for warmer to refresh entries
registryLogger := log.With(logger, "component", "registry")
registryLimits := &registryMiddleware.RateLimiters{
RPS: *registryRPS,
Burst: *registryBurst,
RPS: *registryRPS,
Burst: *registryBurst,
Logger: log.With(logger, "component", "ratelimiter"),
}
remoteFactory := &registry.RemoteClientFactory{
Logger: registryLogger,
Expand Down Expand Up @@ -441,6 +444,7 @@ func main() {

cacheWarmer.Notify = daemon.AskForImagePoll
cacheWarmer.Priority = daemon.ImageRefresh
cacheWarmer.Trace = *registryTrace
shutdownWg.Add(1)
go cacheWarmer.Loop(log.With(logger, "component", "warmer"), shutdown, shutdownWg, imageCreds)

Expand Down
36 changes: 26 additions & 10 deletions image/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ type Info struct {
ImageID string `json:",omitempty"`
// the time at which the image pointed at was created
CreatedAt time.Time `json:",omitempty"`
// the last time this image manifest was fetched
LastFetched time.Time `json:",omitempty"`
}

// MarshalJSON returns the Info value in JSON (as bytes). It is
Expand All @@ -245,14 +247,18 @@ type Info struct {
// detect.
func (im Info) MarshalJSON() ([]byte, error) {
type InfoAlias Info // alias to shed existing MarshalJSON implementation
var t string
var ca, lf string
if !im.CreatedAt.IsZero() {
t = im.CreatedAt.UTC().Format(time.RFC3339Nano)
ca = im.CreatedAt.UTC().Format(time.RFC3339Nano)
}
if !im.LastFetched.IsZero() {
lf = im.LastFetched.UTC().Format(time.RFC3339Nano)
}
encode := struct {
InfoAlias
CreatedAt string `json:",omitempty"`
}{InfoAlias(im), t}
CreatedAt string `json:",omitempty"`
LastFetched string `json:",omitempty"`
}{InfoAlias(im), ca, lf}
return json.Marshal(encode)
}

Expand All @@ -262,18 +268,28 @@ func (im *Info) UnmarshalJSON(b []byte) error {
type InfoAlias Info
unencode := struct {
InfoAlias
CreatedAt string `json:",omitempty"`
CreatedAt string `json:",omitempty"`
LastFetched string `json:",omitempty"`
}{}
json.Unmarshal(b, &unencode)
*im = Info(unencode.InfoAlias)
if unencode.CreatedAt == "" {
im.CreatedAt = time.Time{}

var err error
if err = decodeTime(unencode.CreatedAt, &im.CreatedAt); err == nil {
err = decodeTime(unencode.LastFetched, &im.LastFetched)
}
return err
}

func decodeTime(s string, t *time.Time) error {
if s == "" {
*t = time.Time{}
} else {
t, err := time.Parse(time.RFC3339, unencode.CreatedAt)
var err error
*t, err = time.Parse(time.RFC3339, s)
if err != nil {
return err
}
im.CreatedAt = t.UTC()
}
return nil
}
Expand Down Expand Up @@ -311,7 +327,7 @@ func NewerBySemver(lhs, rhs *Info) bool {
}

// Sort orders the given image infos according to `newer` func.
func Sort(infos []Info, newer func (a, b *Info) bool) {
func Sort(infos []Info, newer func(a, b *Info) bool) {
if newer == nil {
newer = NewerByCreated
}
Expand Down
8 changes: 3 additions & 5 deletions image/image_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package image
import (
"encoding/json"
"fmt"
"reflect"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -149,9 +148,11 @@ func mustMakeInfo(ref string, created time.Time) Info {

func TestImageInfoSerialisation(t *testing.T) {
t0 := time.Now().UTC() // UTC so it has nil location, otherwise it won't compare
t1 := time.Now().Add(5 * time.Minute).UTC()
info := mustMakeInfo("my/image:tag", t0)
info.Digest = "sha256:digest"
info.ImageID = "sha256:layerID"
info.LastFetched = t1
bytes, err := json.Marshal(info)
if err != nil {
t.Fatal(err)
Expand All @@ -160,9 +161,7 @@ func TestImageInfoSerialisation(t *testing.T) {
if err = json.Unmarshal(bytes, &info1); err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(info, info1) {
t.Errorf("roundtrip serialisation failed:\n original: %#v\nroundtripped: %#v", info, info1)
}
assert.Equal(t, info, info1)
}

func TestImageInfoCreatedAtZero(t *testing.T) {
Expand Down Expand Up @@ -248,4 +247,3 @@ func reverse(imgs []Info) {
imgs[i], imgs[opp] = imgs[opp], imgs[i]
}
}

4 changes: 3 additions & 1 deletion registry/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import (
)

type Reader interface {
// GetKey gets the value at a key, along with its refresh deadline
GetKey(k Keyer) ([]byte, time.Time, error)
}

type Writer interface {
SetKey(k Keyer, v []byte) error
// SetKey sets the value at a key, along with its refresh deadline
SetKey(k Keyer, deadline time.Time, v []byte) error
}

type Client interface {
Expand Down
58 changes: 29 additions & 29 deletions registry/cache/memcached/memcached.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
/* This package implements an image DB cache using memcached.
Items are given an expiry based on their refresh deadline, with a
minimum duration to try and ensure things will expire well after they
would have been refreshed (i.e., only if they truly need garbage
collection).
memcached will still evict things when under memory pressure. We can
recover from that -- we'll just get a cache miss, and fetch it again.
*/
package memcached

import (
Expand All @@ -16,7 +27,8 @@ import (
)

const (
DefaultExpiry = time.Hour
// The minimum expiry given to an entry.
MinExpiry = time.Hour
)

// MemcacheClient is a memcache client that gets its server list from SRV
Expand All @@ -26,7 +38,6 @@ type MemcacheClient struct {
serverList *memcache.ServerList
hostname string
service string
ttl time.Duration
logger log.Logger

quit chan struct{}
Expand All @@ -37,7 +48,6 @@ type MemcacheClient struct {
type MemcacheConfig struct {
Host string
Service string
Expiry time.Duration
Timeout time.Duration
UpdateInterval time.Duration
Logger log.Logger
Expand All @@ -55,15 +65,10 @@ func NewMemcacheClient(config MemcacheConfig) *MemcacheClient {
serverList: &servers,
hostname: config.Host,
service: config.Service,
ttl: config.Expiry,
logger: config.Logger,
quit: make(chan struct{}),
}

if newClient.ttl == 0 {
newClient.ttl = DefaultExpiry
}

err := newClient.updateMemcacheServers()
if err != nil {
config.Logger.Log("err", errors.Wrapf(err, "Error setting memcache servers to '%v'", config.Host))
Expand All @@ -86,25 +91,14 @@ func NewFixedServerMemcacheClient(config MemcacheConfig, addresses ...string) *M
serverList: &servers,
hostname: config.Host,
service: config.Service,
ttl: config.Expiry,
logger: config.Logger,
quit: make(chan struct{}),
}

if newClient.ttl == 0 {
newClient.ttl = DefaultExpiry
}

return newClient
}

// The memcached client does not report the expiry when you GET a
// value, but we do want to know it, so we can refresh items that are
// soon to expire (and ignore items that are not). For that reason, we
// prepend the expiry to the value when setting, and read it back when
// getting.

// GetKey gets the value and its expiry time from the cache.
// GetKey gets the value and its refresh deadline from the cache.
func (c *MemcacheClient) GetKey(k cache.Keyer) ([]byte, time.Time, error) {
cacheItem, err := c.client.Get(k.Key())
if err != nil {
Expand All @@ -116,19 +110,25 @@ func (c *MemcacheClient) GetKey(k cache.Keyer) ([]byte, time.Time, error) {
return []byte{}, time.Time{}, err
}
}
exTime := binary.BigEndian.Uint32(cacheItem.Value)
return cacheItem.Value[4:], time.Unix(int64(exTime), 0), nil
deadlineTime := binary.BigEndian.Uint32(cacheItem.Value)
return cacheItem.Value[4:], time.Unix(int64(deadlineTime), 0), nil
}

// SetKey sets the value at a key.
func (c *MemcacheClient) SetKey(k cache.Keyer, v []byte) error {
exTime := time.Now().Add(c.ttl).Unix()
exBytes := make([]byte, 4, 4)
binary.BigEndian.PutUint32(exBytes, uint32(exTime))
// SetKey sets the value and its refresh deadline at a key. NB the key
// expiry is set _longer_ than the deadline, to give us a grace period
// in which to refresh the value.
func (c *MemcacheClient) SetKey(k cache.Keyer, refreshDeadline time.Time, v []byte) error {
expiry := refreshDeadline.Sub(time.Now()) * 2
if expiry < MinExpiry {
expiry = MinExpiry
}

deadlineBytes := make([]byte, 4, 4)
binary.BigEndian.PutUint32(deadlineBytes, uint32(refreshDeadline.Unix()))
if err := c.client.Set(&memcache.Item{
Key: k.Key(),
Value: append(exBytes, v...),
Expiration: int32(exTime),
Value: append(deadlineBytes, v...),
Expiration: int32(expiry.Seconds()),
}); err != nil {
c.logger.Log("err", errors.Wrap(err, "storing in memcache"))
return err
Expand Down
12 changes: 5 additions & 7 deletions registry/cache/memcached/memcached_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,18 @@ func TestMemcache_ExpiryReadWrite(t *testing.T) {
}, strings.Fields(*memcachedIPs)...)

// Set some dummy data
err := mc.SetKey(key, val)
now := time.Now().Round(time.Second)
err := mc.SetKey(key, now, val)
if err != nil {
t.Fatal(err)
}

cached, expiry, err := mc.GetKey(key)
cached, deadline, err := mc.GetKey(key)
if err != nil {
t.Fatal(err)
}
if expiry.IsZero() {
t.Fatal("Time should not be zero")
}
if expiry.Before(time.Now()) {
t.Fatal("Expiry should be in the future")
if !deadline.Equal(now) {
t.Fatalf("Deadline should be %s, but is %s", now.String(), deadline.String())
}

if string(cached) != string(val) {
Expand Down
4 changes: 2 additions & 2 deletions registry/cache/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ func (i *instrumentedClient) GetKey(k Keyer) (_ []byte, ex time.Time, err error)
return i.next.GetKey(k)
}

func (i *instrumentedClient) SetKey(k Keyer, v []byte) (err error) {
func (i *instrumentedClient) SetKey(k Keyer, d time.Time, v []byte) (err error) {
defer func(begin time.Time) {
cacheRequestDuration.With(
fluxmetrics.LabelMethod, "SetKey",
fluxmetrics.LabelSuccess, fmt.Sprint(err == nil),
).Observe(time.Since(begin).Seconds())
}(time.Now())
return i.next.SetKey(k, v)
return i.next.SetKey(k, d, v)
}
Loading

0 comments on commit 2691f5e

Please sign in to comment.