xdsclient: invoke connectivity failure callback only after all listed servers have failed (#8075)
easwars authored Feb 11, 2025
1 parent ad5cd32 commit 4b5608f
Showing 3 changed files with 344 additions and 63 deletions.
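In essence, the change moves the watcher error notification from the first stream failure to the point where every server in the bootstrap list has been tried. A minimal sketch of that control flow, with illustrative stand-in names (tryConnect, notifyWatchers) rather than real grpc-go identifiers:

package main

import "fmt"

type server struct{ uri string }

// tryConnect stands in for creating an xdsChannel to the server and waiting
// on the ADS stream; in this toy example every attempt fails.
func tryConnect(s server) error { return fmt.Errorf("connection to %s failed", s.uri) }

func notifyWatchers(err error) { fmt.Println("watcher OnError:", err) }

func main() {
	servers := []server{{uri: "primary:443"}, {uri: "fallback:443"}}

	// Old behavior (removed in this commit): notify watchers on the first
	// failure. New behavior: walk the remaining servers in priority order and
	// surface the error only once the whole list is exhausted, per gRFC A71.
	var lastErr error
	for _, s := range servers {
		if lastErr = tryConnect(s); lastErr == nil {
			return // fallback succeeded; watchers see no connectivity error
		}
	}
	notifyWatchers(lastErr)
}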
118 changes: 55 additions & 63 deletions xds/internal/xdsclient/authority.go
@@ -196,18 +196,6 @@ func (a *authority) handleADSStreamFailure(serverConfig *bootstrap.ServerConfig,
return
}

// Propagate the connection error from the transport layer to all watchers.
for _, rType := range a.resources {
for _, state := range rType {
for watcher := range state.watchers {
watcher := watcher
a.watcherCallbackSerializer.TrySchedule(func(context.Context) {
watcher.OnError(xdsresource.NewErrorf(xdsresource.ErrorTypeConnection, "xds: error received from xDS stream: %v", err), func() {})
})
}
}
}

// Two conditions need to be met for fallback to be triggered:
// 1. There is a connectivity failure on the ADS stream, as described in
// gRFC A57. For us, this means that the ADS stream was closed before the
@@ -221,88 +209,97 @@ func (a *authority) handleADSStreamFailure(serverConfig *bootstrap.ServerConfig,
if a.logger.V(2) {
a.logger.Infof("No watchers for uncached resources. Not triggering fallback")
}
// Since we are not triggering fallback, propagate the connectivity
// error to all watchers and return early.
a.propagateConnectivityErrorToAllWatchers(err)
return
}
a.fallbackToNextServerIfPossible(serverConfig)

// Attempt to fall back to servers with lower priority than the failing one.
currentServerIdx := a.serverIndexForConfig(serverConfig)
for i := currentServerIdx + 1; i < len(a.xdsChannelConfigs); i++ {
if a.fallbackToServer(a.xdsChannelConfigs[i]) {
// Since we have successfully triggered fallback, we don't have to
// notify watchers about the connectivity error.
return
}
}

// Having exhausted all available servers, we must notify watchers of the
// connectivity error - A71.
a.propagateConnectivityErrorToAllWatchers(err)
}

// propagateConnectivityErrorToAllWatchers propagates the given connection error
// to all watchers of all resources.
//
// Only executed in the context of a serializer callback.
func (a *authority) propagateConnectivityErrorToAllWatchers(err error) {
for _, rType := range a.resources {
for _, state := range rType {
for watcher := range state.watchers {
a.watcherCallbackSerializer.TrySchedule(func(context.Context) {
watcher.OnError(xdsresource.NewErrorf(xdsresource.ErrorTypeConnection, "xds: error received from xDS stream: %v", err), func() {})
})
}
}
}
}

// serverIndexForConfig returns the index of the xdsChannelConfig that matches
// the provided ServerConfig. If no match is found, it returns the length of the
// xdsChannelConfigs slice, which represents the index of a non-existent config.
// serverIndexForConfig returns the index of the xdsChannelConfig matching the
// provided server config, panicking if no match is found (which indicates a
// programming error).
func (a *authority) serverIndexForConfig(sc *bootstrap.ServerConfig) int {
for i, cfg := range a.xdsChannelConfigs {
if cfg.serverConfig.Equal(sc) {
return i
}
}
return len(a.xdsChannelConfigs)
panic(fmt.Sprintf("no server config matching %v found", sc))
}

// Determines the server to fall back to and triggers fallback to it. If
// required, creates an xdsChannel to that server, and re-subscribes to all
// existing resources.
//
// Only executed in the context of a serializer callback.
func (a *authority) fallbackToNextServerIfPossible(failingServerConfig *bootstrap.ServerConfig) {
func (a *authority) fallbackToServer(xc *xdsChannelWithConfig) bool {
if a.logger.V(2) {
a.logger.Infof("Attempting to initiate fallback after failure from server %q", failingServerConfig)
a.logger.Infof("Attempting to initiate fallback to server %q", xc.serverConfig)
}

// The server to fallback to is the next server on the list. If the current
// server is the last server, then there is nothing that can be done.
currentServerIdx := a.serverIndexForConfig(failingServerConfig)
if currentServerIdx == len(a.xdsChannelConfigs) {
// This can never happen.
a.logger.Errorf("Received error from an unknown server: %s", failingServerConfig)
return
}
if currentServerIdx == len(a.xdsChannelConfigs)-1 {
if xc.channel != nil {
if a.logger.V(2) {
a.logger.Infof("No more servers to fallback to")
a.logger.Infof("Channel to the next server in the list %q already exists", xc.serverConfig)
}
return
return false
}
fallbackServerIdx := currentServerIdx + 1
fallbackChannel := a.xdsChannelConfigs[fallbackServerIdx]

// If the server to fallback to already has an xdsChannel, it means that
// this connectivity error is from a server with a higher priority. There
// is not much we can do here.
if fallbackChannel.channel != nil {
if a.logger.V(2) {
a.logger.Infof("Channel to the next server in the list %q already exists", fallbackChannel.serverConfig)
}
return
}

// Create an xdsChannel for the fallback server.
if a.logger.V(2) {
a.logger.Infof("Initiating fallback to server %s", fallbackChannel.serverConfig)
}
xc, cleanup, err := a.getChannelForADS(fallbackChannel.serverConfig, a)
channel, cleanup, err := a.getChannelForADS(xc.serverConfig, a)
if err != nil {
a.logger.Errorf("Failed to create XDS channel: %v", err)
return
a.logger.Errorf("Failed to create xDS channel: %v", err)
return false
}
fallbackChannel.channel = xc
fallbackChannel.cleanup = cleanup
a.activeXDSChannel = fallbackChannel
xc.channel = channel
xc.cleanup = cleanup
a.activeXDSChannel = xc

// Subscribe to all existing resources from the new management server.
for typ, resources := range a.resources {
for name, state := range resources {
if a.logger.V(2) {
a.logger.Infof("Resubscribing to resource of type %q and name %q", typ.TypeName(), name)
}
xc.subscribe(typ, name)
xc.channel.subscribe(typ, name)

// Add the fallback channel to the list of xdsChannels from which
// this resource has been requested from. Retain the cached resource
// and the set of existing watchers (and other metadata fields) in
// the resource state.
state.xdsChannelConfigs[fallbackChannel] = true
// Add the new channel to the list of xdsChannels from which this
// resource has been requested. Retain the cached resource and
// the set of existing watchers (and other metadata fields) in the
// resource state.
state.xdsChannelConfigs[xc] = true
}
}
return true
}

// adsResourceUpdate is called to notify the authority about a resource update
@@ -546,11 +543,6 @@ func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *bootstrap.ServerConfig,
// to revert back to it. This method guarantees that when an update is
// received from a server, all lower priority servers are closed.
serverIdx := a.serverIndexForConfig(serverConfig)
if serverIdx == len(a.xdsChannelConfigs) {
// This can never happen.
a.logger.Errorf("Received update from an unknown server: %s", serverConfig)
return
}
a.activeXDSChannel = a.xdsChannelConfigs[serverIdx]

// Close all lower priority channels.
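The resubscription step in fallbackToServer above can be illustrated in isolation: once a channel to the fallback server exists, every resource that was watched on the failed server is subscribed again on the new channel, while its cache and watcher set carry over. A simplified sketch, assuming toy channel and resourceState types in place of the real xdsChannel plumbing:

package main

import "fmt"

type channel struct{ name string }

func (c *channel) subscribe(typeName, resourceName string) {
	fmt.Printf("subscribing to %s %q on %s\n", typeName, resourceName, c.name)
}

// resourceState mirrors the idea behind state.xdsChannelConfigs in the diff:
// the cached resource and watchers are retained; only the set of channels the
// resource has been requested from grows.
type resourceState struct {
	channels map[*channel]bool
}

func main() {
	fallback := &channel{name: "fallback-server"}
	resources := map[string]map[string]*resourceState{
		"Cluster": {"cluster-a": {channels: make(map[*channel]bool)}},
	}
	for typ, byName := range resources {
		for name, state := range byName {
			fallback.subscribe(typ, name)
			state.channels[fallback] = true
		}
	}
}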
162 changes: 162 additions & 0 deletions xds/internal/xdsclient/tests/authority_test.go
@@ -24,6 +24,8 @@ import (
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
@@ -211,3 +213,163 @@ func (s) TestAuthority_XDSChannelClose(t *testing.T) {
t.Fatal("Timeout when waiting for connection to management server to be closed")
}
}

// Tests the scenario where the primary management server is unavailable at
// startup and the xDS client falls back to the secondary. The test verifies
// that the resource watcher is not notified of the connectivity failure until
// all servers have failed.
func (s) TestAuthority_Fallback(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

// Create primary and secondary management servers with restartable
// listeners.
l, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
primaryLis := testutils.NewRestartableListener(l)
primaryMgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{Listener: primaryLis})
l, err = testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
secondaryLis := testutils.NewRestartableListener(l)
secondaryMgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{Listener: secondaryLis})

// Create bootstrap configuration with the above primary and fallback
// management servers, and an xDS client with that configuration.
nodeID := uuid.New().String()
bootstrapContents, err := bootstrap.NewContentsForTesting(bootstrap.ConfigOptionsForTesting{
Servers: []byte(fmt.Sprintf(`
[
{
"server_uri": %q,
"channel_creds": [{"type": "insecure"}]
},
{
"server_uri": %q,
"channel_creds": [{"type": "insecure"}]
}
]`, primaryMgmtServer.Address, secondaryMgmtServer.Address)),
Node: []byte(fmt.Sprintf(`{"id": "%s"}`, nodeID)),
})
if err != nil {
t.Fatalf("Failed to create bootstrap configuration: %v", err)
}
config, err := bootstrap.NewConfigFromContents(bootstrapContents)
if err != nil {
t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents), err)
}
pool := xdsclient.NewPool(config)
xdsC, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{Name: t.Name()})
if err != nil {
t.Fatalf("Failed to create an xDS client: %v", err)
}
defer close()

const clusterName = "cluster"
const edsPrimaryName = "eds-primary"
const edsSecondaryName = "eds-secondary"

// Create a Cluster resource on the primary.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{
e2e.DefaultCluster(clusterName, edsPrimaryName, e2e.SecurityLevelNone),
},
SkipValidation: true,
}
if err := primaryMgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update primary management server with resources: %v, err: %v", resources, err)
}

// Create a Cluster resource on the secondary.
resources = e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{
e2e.DefaultCluster(clusterName, edsSecondaryName, e2e.SecurityLevelNone),
},
SkipValidation: true,
}
if err := secondaryMgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update primary management server with resources: %v, err: %v", resources, err)
}

// Stop the primary.
primaryLis.Close()

// Register a watch.
watcher := newClusterWatcherV2()
cdsCancel := xdsresource.WatchCluster(xdsC, clusterName, watcher)
defer cdsCancel()

// Ensure that the connectivity error callback is not called.
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if v, err := watcher.errCh.Receive(sCtx); err != context.DeadlineExceeded {
t.Fatalf("Error callback on the watcher with error: %v", v.(error))
}

// Ensure that the resource update callback is invoked.
v, err := watcher.updateCh.Receive(ctx)
if err != nil {
t.Fatalf("Error when waiting for a resource update callback: %v", err)
}
gotUpdate := v.(xdsresource.ClusterUpdate)
wantUpdate := xdsresource.ClusterUpdate{
ClusterName: clusterName,
EDSServiceName: edsSecondaryName,
}
cmpOpts := []cmp.Option{cmpopts.EquateEmpty(), cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw", "LBPolicy", "TelemetryLabels")}
if diff := cmp.Diff(wantUpdate, gotUpdate, cmpOpts...); diff != "" {
t.Fatalf("Diff in the cluster resource update: (-want, got):\n%s", diff)
}

// Stop the secondary.
secondaryLis.Close()

// Ensure that the connectivity error callback is called.
if _, err := watcher.errCh.Receive(ctx); err != nil {
t.Fatal("Timeout when waiting for error callback on the watcher")
}
}

// TODO: Get rid of the clusterWatcher type in cds_watchers_test.go and use this
// one instead. Also, rename this to clusterWatcher as part of that refactor.
type clusterWatcherV2 struct {
updateCh *testutils.Channel // Messages of type xdsresource.ClusterUpdate
errCh *testutils.Channel // Messages of type error
resourceNotFoundCh *testutils.Channel // Messages of type error
}

func newClusterWatcherV2() *clusterWatcherV2 {
return &clusterWatcherV2{
updateCh: testutils.NewChannel(),
errCh: testutils.NewChannel(),
resourceNotFoundCh: testutils.NewChannel(),
}
}

func (cw *clusterWatcherV2) OnUpdate(update *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) {
cw.updateCh.Send(update.Resource)
onDone()
}

func (cw *clusterWatcherV2) OnError(err error, onDone xdsresource.OnDoneFunc) {
// When used with a go-control-plane management server that continuously
// resends resources which are NACKed by the xDS client, using a `Replace()`
// here simplifies tests that want access to the most recently received
// error.
cw.errCh.Replace(err)
onDone()
}

func (cw *clusterWatcherV2) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
// When used with a go-control-plane management server that continuously
// resends resources which are NACKed by the xDS client, using a `Replace()`
// here simplifies tests that want access to the most recently received
// error.
cw.resourceNotFoundCh.Replace(xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Cluster not found in received response"))
onDone()
}
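The test above leans on a pattern worth calling out: proving that a callback was not invoked by receiving from its channel under a short deadline and expecting context.DeadlineExceeded. A self-contained approximation of that pattern, using a plain channel and a hypothetical receive helper in place of testutils.Channel:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// receive approximates testutils.Channel.Receive: it returns the next value
// from ch, or the context error if the deadline fires first.
func receive(ctx context.Context, ch chan error) (any, error) {
	select {
	case v := <-ch:
		return v, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func main() {
	errCh := make(chan error, 1) // no error callback is ever delivered here

	sCtx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	if v, err := receive(sCtx, errCh); !errors.Is(err, context.DeadlineExceeded) {
		fmt.Println("unexpected error callback:", v)
		return
	}
	fmt.Println("no callback within the short timeout, as expected")
}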