Skip to content

Commit

Permalink
Requeue the reconcile request on async update error
Browse files Browse the repository at this point in the history
  • Loading branch information
turkenh authored and ulucinar committed Jul 14, 2023
1 parent b70acc1 commit e54d702
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 36 deletions.
92 changes: 78 additions & 14 deletions pkg/controller/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,31 @@ package controller

import (
"context"
"sync"

xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
xpresource "github.com/crossplane/crossplane-runtime/pkg/resource"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
ctrl "sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/upbound/upjet/pkg/resource"
"github.com/upbound/upjet/pkg/terraform"
)

const (
errGet = "cannot get resource"
errGetFmt = "cannot get resource %s/%s after an async %s"
errUpdateStatusFmt = "cannot update status of resource %s/%s after an async %s"
)

var _ CallbackProvider = &APICallbacks{}

// APISecretClient is a client for getting k8s secrets
type APISecretClient struct {
kube client.Client
Expand Down Expand Up @@ -67,39 +74,96 @@ func NewAPICallbacks(m ctrl.Manager, of xpresource.ManagedKind) *APICallbacks {
return &APICallbacks{
kube: m.GetClient(),
newTerraformed: nt,
EventHandler: &eventHandler{
mu: &sync.Mutex{},
rateLimiter: workqueue.DefaultControllerRateLimiter(),
},
}
}

// APICallbacks providers callbacks that work on API resources.
type APICallbacks struct {
EventHandler *eventHandler

kube client.Client
newTerraformed func() resource.Terraformed
}

// Apply makes sure the error is saved in async operation condition.
func (ac *APICallbacks) Apply(name string) terraform.CallbackFn {
func (ac *APICallbacks) callbackFn(name, op string) terraform.CallbackFn {
return func(err error, ctx context.Context) error {
nn := types.NamespacedName{Name: name}
tr := ac.newTerraformed()
if kErr := ac.kube.Get(ctx, nn, tr); kErr != nil {
return errors.Wrap(kErr, errGet)
return errors.Wrapf(kErr, errGetFmt, tr.GetObjectKind().GroupVersionKind().String(), name, op)
}
tr.SetConditions(resource.LastAsyncOperationCondition(err))
tr.SetConditions(resource.AsyncOperationFinishedCondition())
return errors.Wrap(ac.kube.Status().Update(ctx, tr), errStatusUpdate)
return errors.Wrapf(ac.kube.Status().Update(ctx, tr), errUpdateStatusFmt, tr.GetObjectKind().GroupVersionKind().String(), name, op)
}
}

// Destroy makes sure the error is saved in async operation condition.
func (ac *APICallbacks) Destroy(name string) terraform.CallbackFn {
// Create makes sure the error is saved in async operation condition.
func (ac *APICallbacks) Create(name string) terraform.CallbackFn {
return ac.callbackFn(name, "create")
}

// Update makes sure the error is saved in async operation condition.
func (ac *APICallbacks) Update(name string) terraform.CallbackFn {
return func(err error, ctx context.Context) error {
nn := types.NamespacedName{Name: name}
tr := ac.newTerraformed()
if kErr := ac.kube.Get(ctx, nn, tr); kErr != nil {
return errors.Wrap(kErr, errGet)
cErr := ac.callbackFn(name, "update")(err, ctx)
switch {
case err != nil:
ac.EventHandler.requestReconcile(name)
default:
ac.EventHandler.rateLimiter.Forget(reconcile.Request{
NamespacedName: types.NamespacedName{
Name: name,
},
})
}
tr.SetConditions(resource.LastAsyncOperationCondition(err))
tr.SetConditions(resource.AsyncOperationFinishedCondition())
return errors.Wrap(ac.kube.Status().Update(ctx, tr), errStatusUpdate)
return cErr
}
}

// Destroy makes sure the error is saved in async operation condition.
func (ac *APICallbacks) Destroy(name string) terraform.CallbackFn {
return ac.callbackFn(name, "destroy")
}

type eventHandler struct {
queue workqueue.RateLimitingInterface
rateLimiter workqueue.RateLimiter
mu *sync.Mutex
}

func (e *eventHandler) requestReconcile(name string) bool {
e.mu.Lock()
defer e.mu.Unlock()
if e.queue == nil {
return false
}
item := reconcile.Request{
NamespacedName: types.NamespacedName{
Name: name,
},
}
e.queue.AddAfter(item, e.rateLimiter.When(item))
return true
}

func (e *eventHandler) Create(_ context.Context, _ event.CreateEvent, limitingInterface workqueue.RateLimitingInterface) {
e.mu.Lock()
defer e.mu.Unlock()
if e.queue == nil {
e.queue = limitingInterface
}
}

func (e *eventHandler) Update(_ context.Context, _ event.UpdateEvent, _ workqueue.RateLimitingInterface) {
}

func (e *eventHandler) Delete(_ context.Context, _ event.DeleteEvent, _ workqueue.RateLimitingInterface) {
}

func (e *eventHandler) Generic(_ context.Context, _ event.GenericEvent, _ workqueue.RateLimitingInterface) {
}
98 changes: 90 additions & 8 deletions pkg/controller/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
tjerrors "github.com/upbound/upjet/pkg/terraform/errors"
)

func TestAPICallbacks_Apply(t *testing.T) {
func TestAPICallbacksCreate(t *testing.T) {
type args struct {
mgr ctrl.Manager
mg xpresource.ManagedKind
Expand All @@ -48,7 +48,7 @@ func TestAPICallbacks_Apply(t *testing.T) {
args
want
}{
"ApplyOperationFailed": {
"CreateOperationFailed": {
reason: "It should update the condition with error if async apply failed",
args: args{
mg: xpresource.ManagedKind(xpfake.GVK(&fake.Terraformed{})),
Expand All @@ -58,7 +58,89 @@ func TestAPICallbacks_Apply(t *testing.T) {
MockStatusUpdate: func(ctx context.Context, obj client.Object, opts ...client.SubResourceUpdateOption) error {
got := obj.(resource.Terraformed).GetCondition(resource.TypeLastAsyncOperation)
if diff := cmp.Diff(resource.LastAsyncOperationCondition(tjerrors.NewApplyFailed(nil)), got); diff != "" {
t.Errorf("\nApply(...): -want error, +got error:\n%s", diff)
t.Errorf("\nCreate(...): -want error, +got error:\n%s", diff)
}
return nil
},
},
Scheme: xpfake.SchemeWith(&fake.Terraformed{}),
},
err: tjerrors.NewApplyFailed(nil),
},
},
"CreateOperationSucceeded": {
reason: "It should update the condition with success if the apply operation does not report error",
args: args{
mg: xpresource.ManagedKind(xpfake.GVK(&fake.Terraformed{})),
mgr: &xpfake.Manager{
Client: &test.MockClient{
MockGet: test.NewMockGetFn(nil),
MockStatusUpdate: func(ctx context.Context, obj client.Object, opts ...client.SubResourceUpdateOption) error {
got := obj.(resource.Terraformed).GetCondition(resource.TypeLastAsyncOperation)
if diff := cmp.Diff(resource.LastAsyncOperationCondition(nil), got); diff != "" {
t.Errorf("\nCreate(...): -want error, +got error:\n%s", diff)
}
return nil
},
},
Scheme: xpfake.SchemeWith(&fake.Terraformed{}),
},
},
},
"CannotGet": {
reason: "It should return error if it cannot get the resource to update",
args: args{
mg: xpresource.ManagedKind(xpfake.GVK(&fake.Terraformed{})),
mgr: &xpfake.Manager{
Client: &test.MockClient{
MockGet: func(_ context.Context, _ client.ObjectKey, _ client.Object) error {
return errBoom
},
},
Scheme: xpfake.SchemeWith(&fake.Terraformed{}),
},
},
want: want{
err: errors.Wrapf(errBoom, errGetFmt, "", ", Kind=/name", "create"),
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
e := NewAPICallbacks(tc.args.mgr, tc.args.mg)
err := e.Create("name")(tc.args.err, context.TODO())
if diff := cmp.Diff(tc.want.err, err, test.EquateErrors()); diff != "" {
t.Errorf("\n%s\nCreate(...): -want error, +got error:\n%s", tc.reason, diff)
}
})
}
}

func TestAPICallbacksUpdate(t *testing.T) {
type args struct {
mgr ctrl.Manager
mg xpresource.ManagedKind
err error
}
type want struct {
err error
}
cases := map[string]struct {
reason string
args
want
}{
"UpdateOperationFailed": {
reason: "It should update the condition with error if async apply failed",
args: args{
mg: xpresource.ManagedKind(xpfake.GVK(&fake.Terraformed{})),
mgr: &xpfake.Manager{
Client: &test.MockClient{
MockGet: test.NewMockGetFn(nil),
MockStatusUpdate: func(ctx context.Context, obj client.Object, opts ...client.SubResourceUpdateOption) error {
got := obj.(resource.Terraformed).GetCondition(resource.TypeLastAsyncOperation)
if diff := cmp.Diff(resource.LastAsyncOperationCondition(tjerrors.NewApplyFailed(nil)), got); diff != "" {
t.Errorf("\nUpdate(...): -want error, +got error:\n%s", diff)
}
return nil
},
Expand All @@ -78,7 +160,7 @@ func TestAPICallbacks_Apply(t *testing.T) {
MockStatusUpdate: func(ctx context.Context, obj client.Object, opts ...client.SubResourceUpdateOption) error {
got := obj.(resource.Terraformed).GetCondition(resource.TypeLastAsyncOperation)
if diff := cmp.Diff(resource.LastAsyncOperationCondition(nil), got); diff != "" {
t.Errorf("\nApply(...): -want error, +got error:\n%s", diff)
t.Errorf("\nUpdate(...): -want error, +got error:\n%s", diff)
}
return nil
},
Expand All @@ -101,16 +183,16 @@ func TestAPICallbacks_Apply(t *testing.T) {
},
},
want: want{
err: errors.Wrap(errBoom, errGet),
err: errors.Wrapf(errBoom, errGetFmt, "", ", Kind=/name", "update"),
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
e := NewAPICallbacks(tc.args.mgr, tc.args.mg)
err := e.Apply("name")(tc.args.err, context.TODO())
err := e.Update("name")(tc.args.err, context.TODO())
if diff := cmp.Diff(tc.want.err, err, test.EquateErrors()); diff != "" {
t.Errorf("\n%s\nApply(...): -want error, +got error:\n%s", tc.reason, diff)
t.Errorf("\n%s\nUpdate(...): -want error, +got error:\n%s", tc.reason, diff)
}
})
}
Expand Down Expand Up @@ -183,7 +265,7 @@ func TestAPICallbacks_Destroy(t *testing.T) {
},
},
want: want{
err: errors.Wrap(errBoom, errGet),
err: errors.Wrapf(errBoom, errGetFmt, "", ", Kind=/name", "destroy"),
},
},
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/controller/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ const (
errStartAsyncDestroy = "cannot start async destroy"
errApply = "cannot apply"
errDestroy = "cannot destroy"
errStatusUpdate = "cannot update status of custom resource"
errScheduleProvider = "cannot schedule native Terraform provider process"
)

Expand Down Expand Up @@ -281,7 +280,7 @@ func (e *external) Create(ctx context.Context, mg xpresource.Managed) (managed.E
}
defer e.stopProvider()
if e.config.UseAsync {
return managed.ExternalCreation{}, errors.Wrap(e.workspace.ApplyAsync(e.callback.Apply(mg.GetName())), errStartAsyncApply)
return managed.ExternalCreation{}, errors.Wrap(e.workspace.ApplyAsync(e.callback.Create(mg.GetName())), errStartAsyncApply)
}
tr, ok := mg.(resource.Terraformed)
if !ok {
Expand Down Expand Up @@ -312,7 +311,7 @@ func (e *external) Update(ctx context.Context, mg xpresource.Managed) (managed.E
}
defer e.stopProvider()
if e.config.UseAsync {
return managed.ExternalUpdate{}, errors.Wrap(e.workspace.ApplyAsync(e.callback.Apply(mg.GetName())), errStartAsyncApply)
return managed.ExternalUpdate{}, errors.Wrap(e.workspace.ApplyAsync(e.callback.Update(mg.GetName())), errStartAsyncApply)
}
tr, ok := mg.(resource.Terraformed)
if !ok {
Expand Down
35 changes: 25 additions & 10 deletions pkg/controller/external_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
/*
Copyright 2021 Upbound Inc.
*/
// Copyright 2023 Upbound Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controller

Expand Down Expand Up @@ -92,12 +102,17 @@ func (s StoreFns) Workspace(ctx context.Context, c resource.SecretClient, tr res
}

type CallbackFns struct {
ApplyFn func(string) terraform.CallbackFn
CreateFn func(string) terraform.CallbackFn
UpdateFn func(string) terraform.CallbackFn
DestroyFn func(string) terraform.CallbackFn
}

func (c CallbackFns) Apply(name string) terraform.CallbackFn {
return c.ApplyFn(name)
func (c CallbackFns) Create(name string) terraform.CallbackFn {
return c.CreateFn(name)
}

func (c CallbackFns) Update(name string) terraform.CallbackFn {
return c.UpdateFn(name)
}

func (c CallbackFns) Destroy(name string) terraform.CallbackFn {
Expand Down Expand Up @@ -474,7 +489,7 @@ func TestCreate(t *testing.T) {
UseAsync: true,
},
c: CallbackFns{
ApplyFn: func(s string) terraform.CallbackFn {
CreateFn: func(s string) terraform.CallbackFn {
return nil
},
},
Expand Down Expand Up @@ -540,14 +555,14 @@ func TestUpdate(t *testing.T) {
err: errors.New(errUnexpectedObject),
},
},
"AsyncFailed": {
"AsyncUpdateFailed": {
reason: "It should return error if it cannot trigger the async apply",
args: args{
cfg: &config.Resource{
UseAsync: true,
},
c: CallbackFns{
ApplyFn: func(s string) terraform.CallbackFn {
UpdateFn: func(s string) terraform.CallbackFn {
return nil
},
},
Expand All @@ -562,7 +577,7 @@ func TestUpdate(t *testing.T) {
err: errors.Wrap(errBoom, errStartAsyncApply),
},
},
"SyncApplyFailed": {
"SyncUpdateFailed": {
reason: "It should return error if it cannot apply in sync mode",
args: args{
cfg: &config.Resource{},
Expand Down
Loading

0 comments on commit e54d702

Please sign in to comment.