Skip to content

Commit

Permalink
xdsclient: Add error type for NACKed resources (#8117)
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal authored Feb 25, 2025
1 parent 65c6718 commit e0ac3ac
Show file tree
Hide file tree
Showing 12 changed files with 56 additions and 33 deletions.
2 changes: 1 addition & 1 deletion xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,7 @@ func (s) TestResolverError(t *testing.T) {
}

// Push a resource-not-found-error this time around.
resolverErr = xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds resource not found error")
resolverErr = xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "xds resource not found error")
r.ReportError(resolverErr)

// Wait for the CDS resource to be not requested anymore, or the connection
Expand Down
4 changes: 2 additions & 2 deletions xds/internal/xdsclient/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func decodeResponse(opts *xdsresource.DecodeOptions, rType xdsresource.Type, res
perResourceErrors[name] = err
// Add place holder in the map so we know this resource name was in
// the response.
ret[name] = ads.DataAndErrTuple{Err: err}
ret[name] = ads.DataAndErrTuple{Err: xdsresource.NewError(xdsresource.ErrorTypeNACKed, err.Error())}
}

if len(topLevelErrors) == 0 && len(perResourceErrors) == 0 {
Expand All @@ -299,7 +299,7 @@ func decodeResponse(opts *xdsresource.DecodeOptions, rType xdsresource.Type, res
errRet := combineErrors(rType.TypeName(), topLevelErrors, perResourceErrors)
md.ErrState = &xdsresource.UpdateErrorMetadata{
Version: resp.Version,
Err: errRet,
Err: xdsresource.NewError(xdsresource.ErrorTypeNACKed, errRet.Error()),
Timestamp: timestamp,
}
return ret, md, errRet
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/xdsclient/clientimpl_watchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type wrappingWatcher struct {
}

func (w *wrappingWatcher) OnError(err error, done xdsresource.OnDoneFunc) {
w.ResourceWatcher.OnError(fmt.Errorf("[xDS node id: %v]: %v", w.nodeID, err), done)
w.ResourceWatcher.OnError(fmt.Errorf("[xDS node id: %v]: %w", w.nodeID, err), done)
}

// WatchResource uses xDS to discover the resource associated with the provided
Expand Down
5 changes: 2 additions & 3 deletions xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package xdsclient_test

import (
"context"
"errors"
"fmt"
"strings"
"testing"
Expand Down Expand Up @@ -162,7 +161,7 @@ func (s) TestADS_ACK_NACK_Simple(t *testing.T) {
}
gotResp = r.(*v3discoverypb.DiscoveryResponse)

wantNackErr := errors.New("unexpected http connection manager resource type")
wantNackErr := xdsresource.NewError(xdsresource.ErrorTypeNACKed, "unexpected http connection manager resource type")
if err := verifyListenerUpdate(ctx, lw.updateCh, listenerUpdateErrTuple{err: wantNackErr}); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -309,7 +308,7 @@ func (s) TestADS_NACK_InvalidFirstResponse(t *testing.T) {
gotResp := r.(*v3discoverypb.DiscoveryResponse)

// Verify that the error is propagated to the watcher.
var wantNackErr = errors.New("unexpected http connection manager resource type")
var wantNackErr = xdsresource.NewError(xdsresource.ErrorTypeNACKed, "unexpected http connection manager resource type")
if err := verifyListenerUpdate(ctx, lw.updateCh, listenerUpdateErrTuple{err: wantNackErr}); err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/xdsclient/tests/authority_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,6 @@ func (cw *clusterWatcherV2) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc
// resends resources which are NACKed by the xDS client, using a `Replace()`
// here simplifies tests that want access to the most recently received
// error.
cw.resourceNotFoundCh.Replace(xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Cluster not found in received response"))
cw.resourceNotFoundCh.Replace(xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "Cluster not found in received response"))
onDone()
}
6 changes: 3 additions & 3 deletions xds/internal/xdsclient/tests/cds_watchers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (cw *clusterWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
}

func (cw *clusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
cw.updateCh.Replace(clusterUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Cluster not found in received response")})
cw.updateCh.Replace(clusterUpdateErrTuple{err: xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "Cluster not found in received response")})
onDone()
}

Expand Down Expand Up @@ -670,7 +670,7 @@ func (s) TestCDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
// Verify that an empty update with the expected error is received.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
wantErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")
wantErr := xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "")
if err := verifyClusterUpdate(ctx, cw.updateCh, clusterUpdateErrTuple{err: wantErr}); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -847,7 +847,7 @@ func (s) TestCDSWatch_ResourceRemoved(t *testing.T) {

// The first watcher should receive a resource removed error, while the
// second watcher should not receive an update.
if err := verifyClusterUpdate(ctx, cw1.updateCh, clusterUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")}); err != nil {
if err := verifyClusterUpdate(ctx, cw1.updateCh, clusterUpdateErrTuple{err: xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "")}); err != nil {
t.Fatal(err)
}
if err := verifyNoClusterUpdate(ctx, cw2.updateCh); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions xds/internal/xdsclient/tests/eds_watchers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (ew *endpointsWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
}

func (ew *endpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
ew.updateCh.Replace(endpointsUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Endpoints not found in received response")})
ew.updateCh.Replace(endpointsUpdateErrTuple{err: xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "Endpoints not found in received response")})
onDone()
}

Expand Down Expand Up @@ -757,7 +757,7 @@ func (s) TestEDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
// Verify that an empty update with the expected error is received.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
wantErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")
wantErr := xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "")
if err := verifyEndpointsUpdate(ctx, ew.updateCh, endpointsUpdateErrTuple{err: wantErr}); err != nil {
t.Fatal(err)
}
Expand Down
39 changes: 25 additions & 14 deletions xds/internal/xdsclient/tests/lds_watchers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (lw *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
}

func (lw *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
lw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")})
lw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")})
onDone()
}

Expand All @@ -111,7 +111,7 @@ func (lw *listenerWatcherMultiple) OnError(err error, onDone xdsresource.OnDoneF
}

func (lw *listenerWatcherMultiple) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
lw.updateCh.Send(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")})
lw.updateCh.Send(listenerUpdateErrTuple{err: xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")})
onDone()
}

Expand All @@ -135,10 +135,6 @@ func badListenerResource(t *testing.T, name string) *v3listenerpb.Listener {
}
}

// xdsClient is expected to produce an error containing this string when an
// update is received containing a listener created using `badListenerResource`.
const wantListenerNACKErr = "no RouteSpecifier"

// verifyNoListenerUpdate verifies that no listener update is received on the
// provided update channel, and returns an error if an update is received.
//
Expand Down Expand Up @@ -195,6 +191,21 @@ func verifyListenerError(ctx context.Context, updateCh *testutils.Channel, wantE
return nil
}

func verifyErrorType(ctx context.Context, updateCh *testutils.Channel, wantErrType xdsresource.ErrorType, wantNodeID string) error {
u, err := updateCh.Receive(ctx)
if err != nil {
return fmt.Errorf("timeout when waiting for a listener error from the management server: %v", err)
}
gotErr := u.(listenerUpdateErrTuple).err
if got, want := xdsresource.ErrType(gotErr), wantErrType; got != want {
return fmt.Errorf("update received with error %v of type: %v, want %v", gotErr, got, want)
}
if !strings.Contains(gotErr.Error(), wantNodeID) {
return fmt.Errorf("update received with error: %v, want error with node ID: %q", gotErr, wantNodeID)
}
return nil
}

// TestLDSWatch covers the case where a single watcher exists for a single
// listener resource. The test verifies the following scenarios:
// 1. An update from the management server containing the resource being
Expand Down Expand Up @@ -726,7 +737,7 @@ func (s) TestLDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
// Verify that an empty update with the expected error is received.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
wantErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")
wantErr := xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "")
if err := verifyListenerUpdate(ctx, lw.updateCh, listenerUpdateErrTuple{err: wantErr}); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -900,7 +911,7 @@ func (s) TestLDSWatch_ResourceRemoved(t *testing.T) {
// The first watcher should receive a resource removed error, while the
// second watcher should not see an update.
if err := verifyListenerUpdate(ctx, lw1.updateCh, listenerUpdateErrTuple{
err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, ""),
err: xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, ""),
}); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1002,7 +1013,7 @@ func (s) TestLDSWatch_NewWatcherForRemovedResource(t *testing.T) {
}

// The existing watcher should receive a resource removed error.
updateError := listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")}
updateError := listenerUpdateErrTuple{err: xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "")}
if err := verifyListenerUpdate(ctx, lw1.updateCh, updateError); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1061,15 +1072,15 @@ func (s) TestLDSWatch_NACKError(t *testing.T) {
}

// Verify that the expected error is propagated to the existing watcher.
if err := verifyListenerError(ctx, lw.updateCh, wantListenerNACKErr, nodeID); err != nil {
if err := verifyErrorType(ctx, lw.updateCh, xdsresource.ErrorTypeNACKed, nodeID); err != nil {
t.Fatal(err)
}

// Verify that the expected error is propagated to the new watcher as well.
lw2 := newListenerWatcher()
ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2)
defer ldsCancel2()
if err := verifyListenerError(ctx, lw2.updateCh, wantListenerNACKErr, nodeID); err != nil {
if err := verifyErrorType(ctx, lw2.updateCh, xdsresource.ErrorTypeNACKed, nodeID); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -1141,7 +1152,7 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
}

// Verify that the expected error is propagated to the existing watcher.
if err := verifyListenerError(ctx, lw1.updateCh, wantListenerNACKErr, nodeID); err != nil {
if err := verifyErrorType(ctx, lw1.updateCh, xdsresource.ErrorTypeNACKed, nodeID); err != nil {
t.Fatal(err)
}

Expand All @@ -1154,7 +1165,7 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
t.Fatal(err)
}
// Verify that the expected error is propagated to the existing watcher.
if err := verifyListenerError(ctx, lw2.updateCh, wantListenerNACKErr, nodeID); err != nil {
if err := verifyErrorType(ctx, lw2.updateCh, xdsresource.ErrorTypeNACKed, nodeID); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -1232,7 +1243,7 @@ func (s) TestLDSWatch_PartialValid(t *testing.T) {
// Verify that the expected error is propagated to the watcher which
// requested for the bad resource.
// Verify that the expected error is propagated to the existing watcher.
if err := verifyListenerError(ctx, lw1.updateCh, wantListenerNACKErr, nodeID); err != nil {
if err := verifyErrorType(ctx, lw1.updateCh, xdsresource.ErrorTypeNACKed, nodeID); err != nil {
t.Fatal(err)
}

Expand Down
2 changes: 1 addition & 1 deletion xds/internal/xdsclient/tests/misc_watchers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (rw *testRouteConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFu
}

func (rw *testRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
rw.updateCh.Replace(routeConfigUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "RouteConfiguration not found in received response")})
rw.updateCh.Replace(routeConfigUpdateErrTuple{err: xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "RouteConfiguration not found in received response")})
onDone()
}

Expand Down
4 changes: 2 additions & 2 deletions xds/internal/xdsclient/tests/rds_watchers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (rw *routeConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFunc)
}

func (rw *routeConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
rw.updateCh.Replace(routeConfigUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "RouteConfiguration not found in received response")})
rw.updateCh.Replace(routeConfigUpdateErrTuple{err: xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "RouteConfiguration not found in received response")})
onDone()
}

Expand Down Expand Up @@ -759,7 +759,7 @@ func (s) TestRDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
// Verify that an empty update with the expected error is received.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
wantErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")
wantErr := xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "")
if err := verifyRouteConfigUpdate(ctx, rw.updateCh, routeConfigUpdateErrTuple{err: wantErr}); err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/xdsclient/transport/ads/ads_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ func (s *StreamImpl) onError(err error, msgReceived bool) {
// connection hitting its max connection age limit.
// (see [gRFC A9](/~https://github.com/grpc/proposal/blob/master/A9-server-side-conn-mgt.md)).
if msgReceived {
err = xdsresource.NewErrorf(xdsresource.ErrTypeStreamFailedAfterRecv, "%s", err.Error())
err = xdsresource.NewError(xdsresource.ErrTypeStreamFailedAfterRecv, err.Error())
}

s.eventHandler.OnADSStreamError(err)
Expand Down
17 changes: 15 additions & 2 deletions xds/internal/xdsclient/xdsresource/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@

package xdsresource

import "fmt"
import (
"errors"
"fmt"
)

// ErrorType is the type of the error that the watcher will receive from the xds
// client.
Expand All @@ -40,6 +43,9 @@ const (
// ErrTypeStreamFailedAfterRecv indicates an ADS stream error, after
// successful receipt of at least one message from the server.
ErrTypeStreamFailedAfterRecv
// ErrorTypeNACKed indicates that configuration provided by the xDS management
// server was NACKed.
ErrorTypeNACKed
)

type xdsClientError struct {
Expand All @@ -57,9 +63,16 @@ func NewErrorf(t ErrorType, format string, args ...any) error {
return &xdsClientError{t: t, desc: fmt.Sprintf(format, args...)}
}

// NewError creates an xDS client error. The callbacks are called with this
// error, to pass additional information about the error.
func NewError(t ErrorType, message string) error {
return NewErrorf(t, "%s", message)
}

// ErrType returns the error's type.
func ErrType(e error) ErrorType {
if xe, ok := e.(*xdsClientError); ok {
var xe *xdsClientError
if ok := errors.As(e, &xe); ok {
return xe.t
}
return ErrorTypeUnknown
Expand Down

0 comments on commit e0ac3ac

Please sign in to comment.