From e0ac3acff4e8b8e4f5035396ad44f4e19d09f87a Mon Sep 17 00:00:00 2001 From: Arjan Singh Bal <46515553+arjan-bal@users.noreply.github.com> Date: Tue, 25 Feb 2025 11:15:16 +0530 Subject: [PATCH] xdsclient: Add error type for NACKed resources (#8117) --- .../balancer/cdsbalancer/cdsbalancer_test.go | 2 +- xds/internal/xdsclient/channel.go | 4 +- xds/internal/xdsclient/clientimpl_watchers.go | 2 +- .../tests/ads_stream_ack_nack_test.go | 5 +-- .../xdsclient/tests/authority_test.go | 2 +- .../xdsclient/tests/cds_watchers_test.go | 6 +-- .../xdsclient/tests/eds_watchers_test.go | 4 +- .../xdsclient/tests/lds_watchers_test.go | 39 ++++++++++++------- .../xdsclient/tests/misc_watchers_test.go | 2 +- .../xdsclient/tests/rds_watchers_test.go | 4 +- .../xdsclient/transport/ads/ads_stream.go | 2 +- xds/internal/xdsclient/xdsresource/errors.go | 17 +++++++- 12 files changed, 56 insertions(+), 33 deletions(-) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index 8e56cc25882d..34f92738d732 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -923,7 +923,7 @@ func (s) TestResolverError(t *testing.T) { } // Push a resource-not-found-error this time around. - resolverErr = xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds resource not found error") + resolverErr = xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "xds resource not found error") r.ReportError(resolverErr) // Wait for the CDS resource to be not requested anymore, or the connection diff --git a/xds/internal/xdsclient/channel.go b/xds/internal/xdsclient/channel.go index df6162ad632a..60ab9290b553 100644 --- a/xds/internal/xdsclient/channel.go +++ b/xds/internal/xdsclient/channel.go @@ -287,7 +287,7 @@ func decodeResponse(opts *xdsresource.DecodeOptions, rType xdsresource.Type, res perResourceErrors[name] = err // Add place holder in the map so we know this resource name was in // the response. - ret[name] = ads.DataAndErrTuple{Err: err} + ret[name] = ads.DataAndErrTuple{Err: xdsresource.NewError(xdsresource.ErrorTypeNACKed, err.Error())} } if len(topLevelErrors) == 0 && len(perResourceErrors) == 0 { @@ -299,7 +299,7 @@ func decodeResponse(opts *xdsresource.DecodeOptions, rType xdsresource.Type, res errRet := combineErrors(rType.TypeName(), topLevelErrors, perResourceErrors) md.ErrState = &xdsresource.UpdateErrorMetadata{ Version: resp.Version, - Err: errRet, + Err: xdsresource.NewError(xdsresource.ErrorTypeNACKed, errRet.Error()), Timestamp: timestamp, } return ret, md, errRet diff --git a/xds/internal/xdsclient/clientimpl_watchers.go b/xds/internal/xdsclient/clientimpl_watchers.go index cc8e0849598a..c080a0b4c895 100644 --- a/xds/internal/xdsclient/clientimpl_watchers.go +++ b/xds/internal/xdsclient/clientimpl_watchers.go @@ -34,7 +34,7 @@ type wrappingWatcher struct { } func (w *wrappingWatcher) OnError(err error, done xdsresource.OnDoneFunc) { - w.ResourceWatcher.OnError(fmt.Errorf("[xDS node id: %v]: %v", w.nodeID, err), done) + w.ResourceWatcher.OnError(fmt.Errorf("[xDS node id: %v]: %w", w.nodeID, err), done) } // WatchResource uses xDS to discover the resource associated with the provided diff --git a/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go b/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go index fe5b67ba4ca0..090faaa00de0 100644 --- a/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go +++ b/xds/internal/xdsclient/tests/ads_stream_ack_nack_test.go @@ -20,7 +20,6 @@ package xdsclient_test import ( "context" - "errors" "fmt" "strings" "testing" @@ -162,7 +161,7 @@ func (s) TestADS_ACK_NACK_Simple(t *testing.T) { } gotResp = r.(*v3discoverypb.DiscoveryResponse) - wantNackErr := errors.New("unexpected http connection manager resource type") + wantNackErr := xdsresource.NewError(xdsresource.ErrorTypeNACKed, "unexpected http connection manager resource type") if err := verifyListenerUpdate(ctx, lw.updateCh, listenerUpdateErrTuple{err: wantNackErr}); err != nil { t.Fatal(err) } @@ -309,7 +308,7 @@ func (s) TestADS_NACK_InvalidFirstResponse(t *testing.T) { gotResp := r.(*v3discoverypb.DiscoveryResponse) // Verify that the error is propagated to the watcher. - var wantNackErr = errors.New("unexpected http connection manager resource type") + var wantNackErr = xdsresource.NewError(xdsresource.ErrorTypeNACKed, "unexpected http connection manager resource type") if err := verifyListenerUpdate(ctx, lw.updateCh, listenerUpdateErrTuple{err: wantNackErr}); err != nil { t.Fatal(err) } diff --git a/xds/internal/xdsclient/tests/authority_test.go b/xds/internal/xdsclient/tests/authority_test.go index 2202437859f1..a73822787b36 100644 --- a/xds/internal/xdsclient/tests/authority_test.go +++ b/xds/internal/xdsclient/tests/authority_test.go @@ -370,6 +370,6 @@ func (cw *clusterWatcherV2) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc // resends resources which are NACKed by the xDS client, using a `Replace()` // here simplifies tests that want access to the most recently received // error. - cw.resourceNotFoundCh.Replace(xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Cluster not found in received response")) + cw.resourceNotFoundCh.Replace(xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "Cluster not found in received response")) onDone() } diff --git a/xds/internal/xdsclient/tests/cds_watchers_test.go b/xds/internal/xdsclient/tests/cds_watchers_test.go index 219117a3af84..7cd14a16620f 100644 --- a/xds/internal/xdsclient/tests/cds_watchers_test.go +++ b/xds/internal/xdsclient/tests/cds_watchers_test.go @@ -82,7 +82,7 @@ func (cw *clusterWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { } func (cw *clusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - cw.updateCh.Replace(clusterUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Cluster not found in received response")}) + cw.updateCh.Replace(clusterUpdateErrTuple{err: xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "Cluster not found in received response")}) onDone() } @@ -670,7 +670,7 @@ func (s) TestCDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) { // Verify that an empty update with the expected error is received. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - wantErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "") + wantErr := xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "") if err := verifyClusterUpdate(ctx, cw.updateCh, clusterUpdateErrTuple{err: wantErr}); err != nil { t.Fatal(err) } @@ -847,7 +847,7 @@ func (s) TestCDSWatch_ResourceRemoved(t *testing.T) { // The first watcher should receive a resource removed error, while the // second watcher should not receive an update. - if err := verifyClusterUpdate(ctx, cw1.updateCh, clusterUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")}); err != nil { + if err := verifyClusterUpdate(ctx, cw1.updateCh, clusterUpdateErrTuple{err: xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "")}); err != nil { t.Fatal(err) } if err := verifyNoClusterUpdate(ctx, cw2.updateCh); err != nil { diff --git a/xds/internal/xdsclient/tests/eds_watchers_test.go b/xds/internal/xdsclient/tests/eds_watchers_test.go index 212638617178..fd393991079a 100644 --- a/xds/internal/xdsclient/tests/eds_watchers_test.go +++ b/xds/internal/xdsclient/tests/eds_watchers_test.go @@ -91,7 +91,7 @@ func (ew *endpointsWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { } func (ew *endpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - ew.updateCh.Replace(endpointsUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Endpoints not found in received response")}) + ew.updateCh.Replace(endpointsUpdateErrTuple{err: xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "Endpoints not found in received response")}) onDone() } @@ -757,7 +757,7 @@ func (s) TestEDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) { // Verify that an empty update with the expected error is received. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - wantErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "") + wantErr := xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "") if err := verifyEndpointsUpdate(ctx, ew.updateCh, endpointsUpdateErrTuple{err: wantErr}); err != nil { t.Fatal(err) } diff --git a/xds/internal/xdsclient/tests/lds_watchers_test.go b/xds/internal/xdsclient/tests/lds_watchers_test.go index ce66fbf250b0..333fb1e50a66 100644 --- a/xds/internal/xdsclient/tests/lds_watchers_test.go +++ b/xds/internal/xdsclient/tests/lds_watchers_test.go @@ -86,7 +86,7 @@ func (lw *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) { } func (lw *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - lw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")}) + lw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")}) onDone() } @@ -111,7 +111,7 @@ func (lw *listenerWatcherMultiple) OnError(err error, onDone xdsresource.OnDoneF } func (lw *listenerWatcherMultiple) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - lw.updateCh.Send(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")}) + lw.updateCh.Send(listenerUpdateErrTuple{err: xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")}) onDone() } @@ -135,10 +135,6 @@ func badListenerResource(t *testing.T, name string) *v3listenerpb.Listener { } } -// xdsClient is expected to produce an error containing this string when an -// update is received containing a listener created using `badListenerResource`. -const wantListenerNACKErr = "no RouteSpecifier" - // verifyNoListenerUpdate verifies that no listener update is received on the // provided update channel, and returns an error if an update is received. // @@ -195,6 +191,21 @@ func verifyListenerError(ctx context.Context, updateCh *testutils.Channel, wantE return nil } +func verifyErrorType(ctx context.Context, updateCh *testutils.Channel, wantErrType xdsresource.ErrorType, wantNodeID string) error { + u, err := updateCh.Receive(ctx) + if err != nil { + return fmt.Errorf("timeout when waiting for a listener error from the management server: %v", err) + } + gotErr := u.(listenerUpdateErrTuple).err + if got, want := xdsresource.ErrType(gotErr), wantErrType; got != want { + return fmt.Errorf("update received with error %v of type: %v, want %v", gotErr, got, want) + } + if !strings.Contains(gotErr.Error(), wantNodeID) { + return fmt.Errorf("update received with error: %v, want error with node ID: %q", gotErr, wantNodeID) + } + return nil +} + // TestLDSWatch covers the case where a single watcher exists for a single // listener resource. The test verifies the following scenarios: // 1. An update from the management server containing the resource being @@ -726,7 +737,7 @@ func (s) TestLDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) { // Verify that an empty update with the expected error is received. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - wantErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "") + wantErr := xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "") if err := verifyListenerUpdate(ctx, lw.updateCh, listenerUpdateErrTuple{err: wantErr}); err != nil { t.Fatal(err) } @@ -900,7 +911,7 @@ func (s) TestLDSWatch_ResourceRemoved(t *testing.T) { // The first watcher should receive a resource removed error, while the // second watcher should not see an update. if err := verifyListenerUpdate(ctx, lw1.updateCh, listenerUpdateErrTuple{ - err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, ""), + err: xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, ""), }); err != nil { t.Fatal(err) } @@ -1002,7 +1013,7 @@ func (s) TestLDSWatch_NewWatcherForRemovedResource(t *testing.T) { } // The existing watcher should receive a resource removed error. - updateError := listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")} + updateError := listenerUpdateErrTuple{err: xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "")} if err := verifyListenerUpdate(ctx, lw1.updateCh, updateError); err != nil { t.Fatal(err) } @@ -1061,7 +1072,7 @@ func (s) TestLDSWatch_NACKError(t *testing.T) { } // Verify that the expected error is propagated to the existing watcher. - if err := verifyListenerError(ctx, lw.updateCh, wantListenerNACKErr, nodeID); err != nil { + if err := verifyErrorType(ctx, lw.updateCh, xdsresource.ErrorTypeNACKed, nodeID); err != nil { t.Fatal(err) } @@ -1069,7 +1080,7 @@ func (s) TestLDSWatch_NACKError(t *testing.T) { lw2 := newListenerWatcher() ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2) defer ldsCancel2() - if err := verifyListenerError(ctx, lw2.updateCh, wantListenerNACKErr, nodeID); err != nil { + if err := verifyErrorType(ctx, lw2.updateCh, xdsresource.ErrorTypeNACKed, nodeID); err != nil { t.Fatal(err) } } @@ -1141,7 +1152,7 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { } // Verify that the expected error is propagated to the existing watcher. - if err := verifyListenerError(ctx, lw1.updateCh, wantListenerNACKErr, nodeID); err != nil { + if err := verifyErrorType(ctx, lw1.updateCh, xdsresource.ErrorTypeNACKed, nodeID); err != nil { t.Fatal(err) } @@ -1154,7 +1165,7 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) { t.Fatal(err) } // Verify that the expected error is propagated to the existing watcher. - if err := verifyListenerError(ctx, lw2.updateCh, wantListenerNACKErr, nodeID); err != nil { + if err := verifyErrorType(ctx, lw2.updateCh, xdsresource.ErrorTypeNACKed, nodeID); err != nil { t.Fatal(err) } } @@ -1232,7 +1243,7 @@ func (s) TestLDSWatch_PartialValid(t *testing.T) { // Verify that the expected error is propagated to the watcher which // requested for the bad resource. // Verify that the expected error is propagated to the existing watcher. - if err := verifyListenerError(ctx, lw1.updateCh, wantListenerNACKErr, nodeID); err != nil { + if err := verifyErrorType(ctx, lw1.updateCh, xdsresource.ErrorTypeNACKed, nodeID); err != nil { t.Fatal(err) } diff --git a/xds/internal/xdsclient/tests/misc_watchers_test.go b/xds/internal/xdsclient/tests/misc_watchers_test.go index 3fcb20bd889f..a9b8b61c7d0d 100644 --- a/xds/internal/xdsclient/tests/misc_watchers_test.go +++ b/xds/internal/xdsclient/tests/misc_watchers_test.go @@ -90,7 +90,7 @@ func (rw *testRouteConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFu } func (rw *testRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - rw.updateCh.Replace(routeConfigUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "RouteConfiguration not found in received response")}) + rw.updateCh.Replace(routeConfigUpdateErrTuple{err: xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "RouteConfiguration not found in received response")}) onDone() } diff --git a/xds/internal/xdsclient/tests/rds_watchers_test.go b/xds/internal/xdsclient/tests/rds_watchers_test.go index 1ccb9a82a82f..6209e3336d41 100644 --- a/xds/internal/xdsclient/tests/rds_watchers_test.go +++ b/xds/internal/xdsclient/tests/rds_watchers_test.go @@ -81,7 +81,7 @@ func (rw *routeConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) } func (rw *routeConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) { - rw.updateCh.Replace(routeConfigUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "RouteConfiguration not found in received response")}) + rw.updateCh.Replace(routeConfigUpdateErrTuple{err: xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "RouteConfiguration not found in received response")}) onDone() } @@ -759,7 +759,7 @@ func (s) TestRDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) { // Verify that an empty update with the expected error is received. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - wantErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "") + wantErr := xdsresource.NewError(xdsresource.ErrorTypeResourceNotFound, "") if err := verifyRouteConfigUpdate(ctx, rw.updateCh, routeConfigUpdateErrTuple{err: wantErr}); err != nil { t.Fatal(err) } diff --git a/xds/internal/xdsclient/transport/ads/ads_stream.go b/xds/internal/xdsclient/transport/ads/ads_stream.go index bf7510058c5f..fc41b38edade 100644 --- a/xds/internal/xdsclient/transport/ads/ads_stream.go +++ b/xds/internal/xdsclient/transport/ads/ads_stream.go @@ -664,7 +664,7 @@ func (s *StreamImpl) onError(err error, msgReceived bool) { // connection hitting its max connection age limit. // (see [gRFC A9](/~https://github.com/grpc/proposal/blob/master/A9-server-side-conn-mgt.md)). if msgReceived { - err = xdsresource.NewErrorf(xdsresource.ErrTypeStreamFailedAfterRecv, "%s", err.Error()) + err = xdsresource.NewError(xdsresource.ErrTypeStreamFailedAfterRecv, err.Error()) } s.eventHandler.OnADSStreamError(err) diff --git a/xds/internal/xdsclient/xdsresource/errors.go b/xds/internal/xdsclient/xdsresource/errors.go index d47d6283fe15..06909fbd4bc3 100644 --- a/xds/internal/xdsclient/xdsresource/errors.go +++ b/xds/internal/xdsclient/xdsresource/errors.go @@ -18,7 +18,10 @@ package xdsresource -import "fmt" +import ( + "errors" + "fmt" +) // ErrorType is the type of the error that the watcher will receive from the xds // client. @@ -40,6 +43,9 @@ const ( // ErrTypeStreamFailedAfterRecv indicates an ADS stream error, after // successful receipt of at least one message from the server. ErrTypeStreamFailedAfterRecv + // ErrorTypeNACKed indicates that configuration provided by the xDS management + // server was NACKed. + ErrorTypeNACKed ) type xdsClientError struct { @@ -57,9 +63,16 @@ func NewErrorf(t ErrorType, format string, args ...any) error { return &xdsClientError{t: t, desc: fmt.Sprintf(format, args...)} } +// NewError creates an xDS client error. The callbacks are called with this +// error, to pass additional information about the error. +func NewError(t ErrorType, message string) error { + return NewErrorf(t, "%s", message) +} + // ErrType returns the error's type. func ErrType(e error) ErrorType { - if xe, ok := e.(*xdsClientError); ok { + var xe *xdsClientError + if ok := errors.As(e, &xe); ok { return xe.t } return ErrorTypeUnknown