diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go
index 17940c0e9f7f..d92698219f39 100644
--- a/xds/internal/xdsclient/authority.go
+++ b/xds/internal/xdsclient/authority.go
@@ -196,18 +196,6 @@ func (a *authority) handleADSStreamFailure(serverConfig *bootstrap.ServerConfig,
         return
     }

-    // Propagate the connection error from the transport layer to all watchers.
-    for _, rType := range a.resources {
-        for _, state := range rType {
-            for watcher := range state.watchers {
-                watcher := watcher
-                a.watcherCallbackSerializer.TrySchedule(func(context.Context) {
-                    watcher.OnError(xdsresource.NewErrorf(xdsresource.ErrorTypeConnection, "xds: error received from xDS stream: %v", err), func() {})
-                })
-            }
-        }
-    }
-
     // Two conditions need to be met for fallback to be triggered:
     // 1. There is a connectivity failure on the ADS stream, as described in
     //    gRFC A57. For us, this means that the ADS stream was closed before the
@@ -221,21 +209,53 @@ func (a *authority) handleADSStreamFailure(serverConfig *bootstrap.ServerConfig,
         if a.logger.V(2) {
             a.logger.Infof("No watchers for uncached resources. Not triggering fallback")
         }
+        // Since we are not triggering fallback, propagate the connectivity
+        // error to all watchers and return early.
+        a.propagateConnectivityErrorToAllWatchers(err)
         return
     }
-    a.fallbackToNextServerIfPossible(serverConfig)
+
+    // Attempt to fallback to servers with lower priority than the failing one.
+    currentServerIdx := a.serverIndexForConfig(serverConfig)
+    for i := currentServerIdx + 1; i < len(a.xdsChannelConfigs); i++ {
+        if a.fallbackToServer(a.xdsChannelConfigs[i]) {
+            // Since we have successfully triggered fallback, we don't have to
+            // notify watchers about the connectivity error.
+            return
+        }
+    }
+
+    // Having exhausted all available servers, we must notify watchers of the
+    // connectivity error, as described in gRFC A71.
+    a.propagateConnectivityErrorToAllWatchers(err)
+}
+
+// propagateConnectivityErrorToAllWatchers propagates the given connection error
+// to all watchers of all resources.
+//
+// Only executed in the context of a serializer callback.
+func (a *authority) propagateConnectivityErrorToAllWatchers(err error) {
+    for _, rType := range a.resources {
+        for _, state := range rType {
+            for watcher := range state.watchers {
+                a.watcherCallbackSerializer.TrySchedule(func(context.Context) {
+                    watcher.OnError(xdsresource.NewErrorf(xdsresource.ErrorTypeConnection, "xds: error received from xDS stream: %v", err), func() {})
+                })
+            }
+        }
+    }
 }

-// serverIndexForConfig returns the index of the xdsChannelConfig that matches
-// the provided ServerConfig. If no match is found, it returns the length of the
-// xdsChannelConfigs slice, which represents the index of a non-existent config.
+// serverIndexForConfig returns the index of the xdsChannelConfig matching the
+// provided server config, panicking if no match is found (which indicates a
+// programming error).
 func (a *authority) serverIndexForConfig(sc *bootstrap.ServerConfig) int {
     for i, cfg := range a.xdsChannelConfigs {
         if cfg.serverConfig.Equal(sc) {
             return i
         }
     }
-    return len(a.xdsChannelConfigs)
+    panic(fmt.Sprintf("no server config matching %v found", sc))
 }

 // Determines the server to fallback to and triggers fallback to the same. If
@@ -243,50 +263,26 @@ func (a *authority) serverIndexForConfig(sc *bootstrap.ServerConfig) int {
 // existing resources.
 //
 // Only executed in the context of a serializer callback.
-func (a *authority) fallbackToNextServerIfPossible(failingServerConfig *bootstrap.ServerConfig) {
+func (a *authority) fallbackToServer(xc *xdsChannelWithConfig) bool {
     if a.logger.V(2) {
-        a.logger.Infof("Attempting to initiate fallback after failure from server %q", failingServerConfig)
+        a.logger.Infof("Attempting to initiate fallback to server %q", xc.serverConfig)
     }

-    // The server to fallback to is the next server on the list. If the current
-    // server is the last server, then there is nothing that can be done.
-    currentServerIdx := a.serverIndexForConfig(failingServerConfig)
-    if currentServerIdx == len(a.xdsChannelConfigs) {
-        // This can never happen.
-        a.logger.Errorf("Received error from an unknown server: %s", failingServerConfig)
-        return
-    }
-    if currentServerIdx == len(a.xdsChannelConfigs)-1 {
+    if xc.channel != nil {
         if a.logger.V(2) {
-            a.logger.Infof("No more servers to fallback to")
+            a.logger.Infof("Channel to the next server in the list %q already exists", xc.serverConfig)
         }
-        return
+        return false
     }
-    fallbackServerIdx := currentServerIdx + 1
-    fallbackChannel := a.xdsChannelConfigs[fallbackServerIdx]

-    // If the server to fallback to already has an xdsChannel, it means that
-    // this connectivity error is from a server with a higher priority. There
-    // is not much we can do here.
-    if fallbackChannel.channel != nil {
-        if a.logger.V(2) {
-            a.logger.Infof("Channel to the next server in the list %q already exists", fallbackChannel.serverConfig)
-        }
-        return
-    }
-
-    // Create an xdsChannel for the fallback server.
-    if a.logger.V(2) {
-        a.logger.Infof("Initiating fallback to server %s", fallbackChannel.serverConfig)
-    }
-    xc, cleanup, err := a.getChannelForADS(fallbackChannel.serverConfig, a)
+    channel, cleanup, err := a.getChannelForADS(xc.serverConfig, a)
     if err != nil {
-        a.logger.Errorf("Failed to create XDS channel: %v", err)
-        return
+        a.logger.Errorf("Failed to create xDS channel: %v", err)
+        return false
     }
-    fallbackChannel.channel = xc
-    fallbackChannel.cleanup = cleanup
-    a.activeXDSChannel = fallbackChannel
+    xc.channel = channel
+    xc.cleanup = cleanup
+    a.activeXDSChannel = xc

     // Subscribe to all existing resources from the new management server.
     for typ, resources := range a.resources {
@@ -294,15 +290,16 @@ func (a *authority) fallbackToNextServerIfPossible(failingServerConfig *bootstra
             if a.logger.V(2) {
                 a.logger.Infof("Resubscribing to resource of type %q and name %q", typ.TypeName(), name)
             }
-            xc.subscribe(typ, name)
+            xc.channel.subscribe(typ, name)

-            // Add the fallback channel to the list of xdsChannels from which
-            // this resource has been requested from. Retain the cached resource
-            // and the set of existing watchers (and other metadata fields) in
-            // the resource state.
-            state.xdsChannelConfigs[fallbackChannel] = true
+            // Add the new channel to the list of xdsChannels from which this
+            // resource has been requested. Retain the cached resource and
+            // the set of existing watchers (and other metadata fields) in the
+            // resource state.
+            state.xdsChannelConfigs[xc] = true
         }
     }
+    return true
 }

 // adsResourceUpdate is called to notify the authority about a resource update
@@ -546,11 +543,6 @@ func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *bootstrap.Ser
     // to revert back to it. This method guarantees that when an update is
     // received from a server, all lower priority servers are closed.
     serverIdx := a.serverIndexForConfig(serverConfig)
-    if serverIdx == len(a.xdsChannelConfigs) {
-        // This can never happen.
-        a.logger.Errorf("Received update from an unknown server: %s", serverConfig)
-        return
-    }
     a.activeXDSChannel = a.xdsChannelConfigs[serverIdx]

     // Close all lower priority channels.
diff --git a/xds/internal/xdsclient/tests/authority_test.go b/xds/internal/xdsclient/tests/authority_test.go
index ed897b2457cf..2202437859f1 100644
--- a/xds/internal/xdsclient/tests/authority_test.go
+++ b/xds/internal/xdsclient/tests/authority_test.go
@@ -24,6 +24,8 @@ import (
     "fmt"
     "testing"

+    "github.com/google/go-cmp/cmp"
+    "github.com/google/go-cmp/cmp/cmpopts"
     "github.com/google/uuid"
     "google.golang.org/grpc/internal/testutils"
     "google.golang.org/grpc/internal/testutils/xds/e2e"
@@ -211,3 +213,163 @@ func (s) TestAuthority_XDSChannelClose(t *testing.T) {
         t.Fatal("Timeout when waiting for connection to management server to be closed")
     }
 }
+
+// Tests the scenario where the primary management server is unavailable at
+// startup and the xDS client falls back to the secondary. The test verifies
+// that the resource watcher is not notified of the connectivity failure until
+// all servers have failed.
+func (s) TestAuthority_Fallback(t *testing.T) {
+    ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+    defer cancel()
+
+    // Create primary and secondary management servers with restartable
+    // listeners.
+    l, err := testutils.LocalTCPListener()
+    if err != nil {
+        t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
+    }
+    primaryLis := testutils.NewRestartableListener(l)
+    primaryMgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{Listener: primaryLis})
+    l, err = testutils.LocalTCPListener()
+    if err != nil {
+        t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
+    }
+    secondaryLis := testutils.NewRestartableListener(l)
+    secondaryMgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{Listener: secondaryLis})
+
+    // Create bootstrap configuration with the above primary and fallback
+    // management servers, and an xDS client with that configuration.
+    nodeID := uuid.New().String()
+    bootstrapContents, err := bootstrap.NewContentsForTesting(bootstrap.ConfigOptionsForTesting{
+        Servers: []byte(fmt.Sprintf(`
+        [
+            {
+                "server_uri": %q,
+                "channel_creds": [{"type": "insecure"}]
+            },
+            {
+                "server_uri": %q,
+                "channel_creds": [{"type": "insecure"}]
+            }
+        ]`, primaryMgmtServer.Address, secondaryMgmtServer.Address)),
+        Node: []byte(fmt.Sprintf(`{"id": "%s"}`, nodeID)),
+    })
+    if err != nil {
+        t.Fatalf("Failed to create bootstrap configuration: %v", err)
+    }
+    config, err := bootstrap.NewConfigFromContents(bootstrapContents)
+    if err != nil {
+        t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents), err)
+    }
+    pool := xdsclient.NewPool(config)
+    xdsC, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{Name: t.Name()})
+    if err != nil {
+        t.Fatalf("Failed to create an xDS client: %v", err)
+    }
+    defer close()
+
+    const clusterName = "cluster"
+    const edsPrimaryName = "eds-primary"
+    const edsSecondaryName = "eds-secondary"
+
+    // Create a Cluster resource on the primary.
+    resources := e2e.UpdateOptions{
+        NodeID: nodeID,
+        Clusters: []*v3clusterpb.Cluster{
+            e2e.DefaultCluster(clusterName, edsPrimaryName, e2e.SecurityLevelNone),
+        },
+        SkipValidation: true,
+    }
+    if err := primaryMgmtServer.Update(ctx, resources); err != nil {
+        t.Fatalf("Failed to update primary management server with resources: %v, err: %v", resources, err)
+    }
+
+    // Create a Cluster resource on the secondary.
+    resources = e2e.UpdateOptions{
+        NodeID: nodeID,
+        Clusters: []*v3clusterpb.Cluster{
+            e2e.DefaultCluster(clusterName, edsSecondaryName, e2e.SecurityLevelNone),
+        },
+        SkipValidation: true,
+    }
+    if err := secondaryMgmtServer.Update(ctx, resources); err != nil {
+        t.Fatalf("Failed to update secondary management server with resources: %v, err: %v", resources, err)
+    }
+
+    // Stop the primary.
+    primaryLis.Close()
+
+    // Register a watch.
+    watcher := newClusterWatcherV2()
+    cdsCancel := xdsresource.WatchCluster(xdsC, clusterName, watcher)
+    defer cdsCancel()
+
+    // Ensure that the connectivity error callback is not called.
+    sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+    defer sCancel()
+    if v, err := watcher.errCh.Receive(sCtx); err != context.DeadlineExceeded {
+        t.Fatalf("Error callback on the watcher with error: %v", v.(error))
+    }
+
+    // Ensure that the resource update callback is invoked.
+    v, err := watcher.updateCh.Receive(ctx)
+    if err != nil {
+        t.Fatalf("Error when waiting for a resource update callback: %v", err)
+    }
+    gotUpdate := v.(xdsresource.ClusterUpdate)
+    wantUpdate := xdsresource.ClusterUpdate{
+        ClusterName:    clusterName,
+        EDSServiceName: edsSecondaryName,
+    }
+    cmpOpts := []cmp.Option{cmpopts.EquateEmpty(), cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw", "LBPolicy", "TelemetryLabels")}
+    if diff := cmp.Diff(wantUpdate, gotUpdate, cmpOpts...); diff != "" {
+        t.Fatalf("Diff in the cluster resource update: (-want, +got):\n%s", diff)
+    }
+
+    // Stop the secondary.
+    secondaryLis.Close()
+
+    // Ensure that the connectivity error callback is called.
+    if _, err := watcher.errCh.Receive(ctx); err != nil {
+        t.Fatal("Timeout when waiting for error callback on the watcher")
+    }
+}
+
+// TODO: Get rid of the clusterWatcher type in cds_watchers_test.go and use this
+// one instead. Also, rename this to clusterWatcher as part of that refactor.
+type clusterWatcherV2 struct {
+    updateCh           *testutils.Channel // Messages of type xdsresource.ClusterUpdate
+    errCh              *testutils.Channel // Messages of type error
+    resourceNotFoundCh *testutils.Channel // Messages of type error
+}
+
+func newClusterWatcherV2() *clusterWatcherV2 {
+    return &clusterWatcherV2{
+        updateCh:           testutils.NewChannel(),
+        errCh:              testutils.NewChannel(),
+        resourceNotFoundCh: testutils.NewChannel(),
+    }
+}
+
+func (cw *clusterWatcherV2) OnUpdate(update *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) {
+    cw.updateCh.Send(update.Resource)
+    onDone()
+}
+
+func (cw *clusterWatcherV2) OnError(err error, onDone xdsresource.OnDoneFunc) {
+    // When used with a go-control-plane management server that continuously
+    // resends resources which are NACKed by the xDS client, using a `Replace()`
+    // here simplifies tests that want access to the most recently received
+    // error.
+    cw.errCh.Replace(err)
+    onDone()
+}
+
+func (cw *clusterWatcherV2) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
+    // When used with a go-control-plane management server that continuously
+    // resends resources which are NACKed by the xDS client, using a `Replace()`
+    // here simplifies tests that want access to the most recently received
+    // error.
+    cw.resourceNotFoundCh.Replace(xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Cluster not found in received response"))
+    onDone()
+}
diff --git a/xds/internal/xdsclient/tests/fallback_test.go b/xds/internal/xdsclient/tests/fallback_test.go
index cfb38f123ecb..cdff9f4298f2 100644
--- a/xds/internal/xdsclient/tests/fallback_test.go
+++ b/xds/internal/xdsclient/tests/fallback_test.go
@@ -601,3 +601,130 @@ func (s) TestFallback_MidStartup(t *testing.T) {
         t.Fatalf("Connection to fallback server not closed once primary becomes ready: %v", err)
     }
 }
+
+// Tests that RPCs succeed at startup when the primary management server is
+// down, but the secondary is available.
+func (s) TestFallback_OnStartup_RPCSuccess(t *testing.T) {
+    ctx, cancel := context.WithTimeout(context.Background(), defaultFallbackTestTimeout)
+    defer cancel()
+
+    // Create two listeners for the two management servers. The test can
+    // start/stop these listeners.
+    l, err := testutils.LocalTCPListener()
+    if err != nil {
+        t.Fatalf("Failed to create listener: %v", err)
+    }
+    primaryLis := testutils.NewRestartableListener(l)
+    l, err = testutils.LocalTCPListener()
+    if err != nil {
+        t.Fatalf("Failed to create listener: %v", err)
+    }
+    fallbackLis := testutils.NewRestartableListener(l)
+
+    // Start two management servers, primary and fallback, with the above
+    // listeners.
+    primaryManagementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{Listener: primaryLis})
+    fallbackManagementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{Listener: fallbackLis})
+
+    // Start two test service backends.
+    backend1 := stubserver.StartTestService(t, nil)
+    defer backend1.Stop()
+    backend2 := stubserver.StartTestService(t, nil)
+    defer backend2.Stop()
+
+    // Configure xDS resources on the primary management server, with a cluster
+    // resource that contains an endpoint for backend1.
+    nodeID := uuid.New().String()
+    const serviceName = "my-service-fallback-xds"
+    resources := e2e.DefaultClientResources(e2e.ResourceParams{
+        DialTarget: serviceName,
+        NodeID:     nodeID,
+        Host:       "localhost",
+        Port:       testutils.ParsePort(t, backend1.Address),
+        SecLevel:   e2e.SecurityLevelNone,
+    })
+    if err := primaryManagementServer.Update(ctx, resources); err != nil {
+        t.Fatal(err)
+    }
+
+    // Configure xDS resources on the secondary management server, with a cluster
+    // resource that contains an endpoint for backend2. Only the listener
+    // resource has the same name on both servers.
+    fallbackRouteConfigName := "fallback-route-" + serviceName
+    fallbackClusterName := "fallback-cluster-" + serviceName
+    fallbackEndpointsName := "fallback-endpoints-" + serviceName
+    resources = e2e.UpdateOptions{
+        NodeID:    nodeID,
+        Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, fallbackRouteConfigName)},
+        Routes:    []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(fallbackRouteConfigName, serviceName, fallbackClusterName)},
+        Clusters:  []*v3clusterpb.Cluster{e2e.DefaultCluster(fallbackClusterName, fallbackEndpointsName, e2e.SecurityLevelNone)},
+        Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(fallbackEndpointsName, "localhost", []uint32{testutils.ParsePort(t, backend2.Address)})},
+    }
+    if err := fallbackManagementServer.Update(ctx, resources); err != nil {
+        t.Fatal(err)
+    }
+
+    // Shut down the primary management server before starting the gRPC client
+    // to trigger fallback on startup.
+    primaryLis.Stop()
+
+    // Generate bootstrap configuration with the above two servers.
+    bootstrapContents, err := bootstrap.NewContentsForTesting(bootstrap.ConfigOptionsForTesting{
+        Servers: []byte(fmt.Sprintf(`[
+        {
+            "server_uri": %q,
+            "channel_creds": [{"type": "insecure"}]
+        },
+        {
+            "server_uri": %q,
+            "channel_creds": [{"type": "insecure"}]
+        }]`, primaryManagementServer.Address, fallbackManagementServer.Address)),
+        Node: []byte(fmt.Sprintf(`{"id": "%s"}`, nodeID)),
+    })
+    if err != nil {
+        t.Fatalf("Failed to create bootstrap file: %v", err)
+    }
+
+    // Create an xDS client with the above bootstrap configuration.
+    config, err := bootstrap.NewConfigFromContents(bootstrapContents)
+    if err != nil {
+        t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents), err)
+    }
+    pool := xdsclient.NewPool(config)
+    if err != nil {
+        t.Fatalf("Failed to create xDS client: %v", err)
+    }
+
+    // Get the xDS resolver to use the above xDS client.
+    resolverBuilder := internal.NewXDSResolverWithPoolForTesting.(func(*xdsclient.Pool) (resolver.Builder, error))
+    resolver, err := resolverBuilder(pool)
+    if err != nil {
+        t.Fatalf("Failed to create xDS resolver for testing: %v", err)
+    }
+
+    // Start a gRPC client that uses the above xDS resolver.
+    cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
+    if err != nil {
+        t.Fatalf("Failed to create gRPC client: %v", err)
+    }
+    defer cc.Close()
+
+    // Make an RPC (without the `wait_for_ready` call option) and expect it to
+    // succeed since the fallback management server is up and running.
+    client := testgrpc.NewTestServiceClient(cc)
+    var peer peer.Peer
+    if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil {
+        t.Fatalf("EmptyCall() failed: %v", err)
+    }
+    if got, want := peer.Addr.String(), backend2.Address; got != want {
+        t.Fatalf("Unexpected peer address: got %q, want %q", got, want)
+    }
+
+    // Start the primary server. It can take a while before the xDS client
+    // notices this, since the ADS stream implementation uses a backoff before
+    // retrying the stream.
+    primaryLis.Restart()
+    if err := waitForRPCsToReachBackend(ctx, client, backend1.Address); err != nil {
+        t.Fatal(err)
+    }
+}