Skip to content

Commit

Permalink
feat: reuse connections and limit the number of connections for prehe…
Browse files Browse the repository at this point in the history
…ating (#3683)

Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi authored Dec 6, 2024
1 parent a97584a commit 49ae448
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 42 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/compatibility-e2e-v2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ jobs:
chart-name: scheduler
- module: client
image: client
image-tag: v0.1.119
image-tag: v0.1.123
chart-name: client
- module: seed-client
image: client
image-tag: v0.1.119
image-tag: v0.1.123
chart-name: seed-client

steps:
Expand Down
77 changes: 39 additions & 38 deletions manager/job/preheat.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ const (
PreheatFileType PreheatType = "file"
)

// defaultHTTPTransport is the default http transport.
var defaultHTTPTransport = &http.Transport{
MaxIdleConns: 400,
MaxIdleConnsPerHost: 20,
MaxConnsPerHost: 50,
IdleConnTimeout: 120 * time.Second,
}

// accessURLPattern is the pattern of access url.
var accessURLPattern, _ = regexp.Compile("^(.*)://(.*)/v2/(.*)/manifests/(.*)")

Expand All @@ -77,20 +85,26 @@ type Preheat interface {
// preheat is an implementation of Preheat.
type preheat struct {
job *internaljob.Job
registryTimeout time.Duration
rootCAs *x509.CertPool
certificateChain [][]byte
rootCAs *x509.CertPool
insecureSkipVerify bool
registryTimeout time.Duration
}

// newPreheat creates a new Preheat.
func newPreheat(job *internaljob.Job, registryTimeout time.Duration, rootCAs *x509.CertPool, insecureSkipVerify bool) (Preheat, error) {
var certificateChain [][]byte
p := &preheat{
job: job,
rootCAs: rootCAs,
insecureSkipVerify: insecureSkipVerify,
registryTimeout: registryTimeout,
}

if rootCAs != nil {
certificateChain = rootCAs.Subjects()
p.certificateChain = rootCAs.Subjects()
}

return &preheat{job, registryTimeout, rootCAs, certificateChain, insecureSkipVerify}, nil
return p, nil
}

// CreatePreheat creates a preheat job.
Expand Down Expand Up @@ -191,26 +205,29 @@ func (p *preheat) getImageLayers(ctx context.Context, args types.PreheatArgs) ([
return nil, err
}

opts := []imageAuthClientOption{
withHTTPClient(&http.Client{
Timeout: p.registryTimeout,
Transport: &http.Transport{
DialContext: nethttp.NewSafeDialer().DialContext,
TLSClientConfig: &tls.Config{RootCAs: p.rootCAs, InsecureSkipVerify: p.insecureSkipVerify},
},
}),
withBasicAuth(args.Username, args.Password),
}
// Background:
// Harbor uses the V1 preheat request and will carry the auth info in the headers.
// Harbor uses the V1 preheat request and will carry the auth info in the headers.
options := []imageAuthClientOption{}
header := nethttp.MapToHeader(args.Headers)
if token := header.Get("Authorization"); len(token) > 0 {
opts = append(opts, withIssuedToken(token))
options = append(options, withIssuedToken(token))
header.Set("Authorization", token)
}

httpClient := &http.Client{
Timeout: p.registryTimeout,
Transport: &http.Transport{
DialContext: nethttp.NewSafeDialer().DialContext,
TLSClientConfig: &tls.Config{RootCAs: p.rootCAs, InsecureSkipVerify: p.insecureSkipVerify},
MaxIdleConns: defaultHTTPTransport.MaxIdleConns,
MaxIdleConnsPerHost: defaultHTTPTransport.MaxIdleConnsPerHost,
MaxConnsPerHost: defaultHTTPTransport.MaxConnsPerHost,
IdleConnTimeout: defaultHTTPTransport.IdleConnTimeout,
},
}

// Init docker auth client.
client, err := newImageAuthClient(image, opts...)
client, err := newImageAuthClient(image, httpClient, &typesregistry.AuthConfig{Username: args.Username, Password: args.Password}, options...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -352,23 +369,6 @@ func (p *preheat) parseLayers(manifests []distribution.Manifest, args types.Preh
// imageAuthClientOption is an option for imageAuthClient.
type imageAuthClientOption func(*imageAuthClient)

// withBasicAuth sets basic auth for imageAuthClient.
func withBasicAuth(username, password string) imageAuthClientOption {
return func(c *imageAuthClient) {
c.authConfig = &typesregistry.AuthConfig{
Username: username,
Password: password,
}
}
}

// withHTTPClient sets http client for imageAuthClient.
func withHTTPClient(client *http.Client) imageAuthClientOption {
return func(c *imageAuthClient) {
c.httpClient = client
}
}

// withIssuedToken sets the issuedToken for imageAuthClient.
func withIssuedToken(token string) imageAuthClientOption {
return func(c *imageAuthClient) {
Expand All @@ -394,17 +394,18 @@ type imageAuthClient struct {
}

// newImageAuthClient creates a new imageAuthClient.
func newImageAuthClient(image *preheatImage, opts ...imageAuthClientOption) (*imageAuthClient, error) {
func newImageAuthClient(image *preheatImage, httpClient *http.Client, authConfig *typesregistry.AuthConfig, opts ...imageAuthClientOption) (*imageAuthClient, error) {
d := &imageAuthClient{
httpClient: http.DefaultClient,
httpClient: httpClient,
authConfig: authConfig,
interceptorTokenHandler: newInterceptorTokenHandler(),
}

for _, opt := range opts {
opt(d)
}

// return earlier if issued token is not empty
// Return earlier if issued token is not empty.
if len(d.issuedToken) > 0 {
return d, nil
}
Expand Down

0 comments on commit 49ae448

Please sign in to comment.