Skip to content

Commit

Permalink
cds: stop child policies on resource-not-found errors (#8122)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars authored Feb 26, 2025
1 parent dbf92b4 commit feaf942
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 50 deletions.
36 changes: 24 additions & 12 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,10 +344,14 @@ func (b *cdsBalancer) UpdateClientConnState(state balancer.ClientConnState) erro
// ResolverError handles errors reported by the xdsResolver.
func (b *cdsBalancer) ResolverError(err error) {
b.serializer.TrySchedule(func(context.Context) {
// Resource not found error is reported by the resolver when the
// top-level cluster resource is removed by the management server.
// Missing Listener or RouteConfiguration on the management server
// results in a 'resource not found' error from the xDS resolver. In
// these cases, we should stop watching all of the current clusters
// being watched.
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
b.closeAllWatchers()
b.closeChildPolicyAndReportTF(err)
return
}
var root string
if b.lbCfg != nil {
Expand All @@ -372,6 +376,22 @@ func (b *cdsBalancer) closeAllWatchers() {
}
}

// closeChildPolicyAndReportTF tears down the child policy, when one has been
// created, and then moves the channel to TransientFailure by pushing an error
// picker built from err.
//
// Must only be invoked from within a serializer callback.
func (b *cdsBalancer) closeChildPolicyAndReportTF(err error) {
	// Close the child first and only then drop the reference to it.
	if child := b.childLB; child != nil {
		child.Close()
		b.childLB = nil
	}

	tfState := balancer.State{
		ConnectivityState: connectivity.TransientFailure,
		Picker:            base.NewErrPicker(err),
	}
	b.ccw.UpdateState(tfState)
}

// Close cancels the CDS watch, closes the child policy and closes the
// cdsBalancer.
func (b *cdsBalancer) Close() {
Expand Down Expand Up @@ -537,16 +557,8 @@ func (b *cdsBalancer) onClusterError(name string, err error) {
//
// Only executed in the context of a serializer callback.
func (b *cdsBalancer) onClusterResourceNotFound(name string) {
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Cluster not found in received response", name)
if b.childLB != nil {
b.childLB.ResolverError(err)
} else {
// If child balancer was never created, fail the RPCs with errors.
b.ccw.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: base.NewErrPicker(err),
})
}
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "cluster %q not found", name)
b.closeChildPolicyAndReportTF(err)
}

// Generates discovery mechanisms for the cluster graph rooted at `name`. This
Expand Down
125 changes: 88 additions & 37 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,9 +672,6 @@ func (s) TestClusterUpdate_SuccessWithLRS(t *testing.T) {
// - when a bad cluster resource update is received after a previous good
// update from the management server, the cds LB policy is expected to
// continue using the previous good update.
// - when the cluster resource is removed after a previous good
// update from the management server, the cds LB policy is expected to put
// the channel in TRANSIENT_FAILURE.
func (s) TestClusterUpdate_Failure(t *testing.T) {
_, resolverErrCh, _, _ := registerWrappedClusterResolverPolicy(t)
mgmtServer, nodeID, cc, _, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServer(t)
Expand Down Expand Up @@ -778,34 +775,6 @@ func (s) TestClusterUpdate_Failure(t *testing.T) {
case <-ctx.Done():
t.Fatal("Timeout when waiting for resolver error to be pushed to the child policy")
}

// Remove the cluster resource from the management server, triggering a
// resource-not-found error.
resources = e2e.UpdateOptions{
NodeID: nodeID,
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Verify that the watch for the cluster resource is not cancelled.
sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
select {
case <-sCtx.Done():
case <-cdsResourceCanceledCh:
t.Fatal("Watch for cluster resource is cancelled when not expected to")
}

testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)

// Ensure RPC fails with Unavailable. The actual error message depends on
// the picker returned from the priority LB policy, and therefore not
// checking for it here.
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable {
t.Fatalf("EmptyCall() failed with code: %v, want %v", status.Code(err), codes.Unavailable)
}
}

// Tests the following scenarios for resolver errors:
Expand All @@ -822,7 +791,7 @@ func (s) TestClusterUpdate_Failure(t *testing.T) {
// is expected to push the error down the child policy and put the channel in
// TRANSIENT_FAILURE. It is also expected to cancel the CDS watch.
func (s) TestResolverError(t *testing.T) {
_, resolverErrCh, _, _ := registerWrappedClusterResolverPolicy(t)
_, resolverErrCh, _, childPolicyCloseCh := registerWrappedClusterResolverPolicy(t)
lis := testutils.NewListenerWrapper(t, nil)
mgmtServer, nodeID, cc, r, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServerAndListener(t, lis)

Expand Down Expand Up @@ -938,12 +907,9 @@ func (s) TestResolverError(t *testing.T) {

// Verify that the child policy is closed.
select {
case err := <-resolverErrCh:
if err != resolverErr {
t.Fatalf("Error pushed to child policy is %v, want %v", err, resolverErr)
}
case <-childPolicyCloseCh:
case <-ctx.Done():
t.Fatal("Timeout when waiting for resolver error to be pushed to the child policy")
t.Fatal("Timeout when waiting for child policy to be closed")
}

testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
Expand All @@ -956,6 +922,91 @@ func (s) TestResolverError(t *testing.T) {
}
}

// Tests the behavior of the cds LB policy when its cluster resource is removed
// from, and later restored on, the management server.
//
//   - removing the cluster resource after a previous good update is expected
//     to move the channel to TRANSIENT_FAILURE, with RPCs failing with an
//     Unavailable status that mentions the missing cluster. The watch for the
//     cluster resource is expected to stay registered.
//   - re-sending the cluster resource is expected to make RPCs succeed again.
func (s) TestClusterUpdate_ResourceNotFound(t *testing.T) {
	mgmtServer, nodeID, cc, _, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServer(t)

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	// The configured cluster resource must be requested first.
	if err := waitForResourceNames(ctx, cdsResourceRequestedCh, []string{clusterName}); err != nil {
		t.Fatal(err)
	}

	// Bring up a backend for the endpoints resource to point at.
	backend := stubserver.StartTestService(t, nil)
	t.Cleanup(backend.Stop)

	// goodResources builds the cluster and endpoints resources that route
	// traffic to the backend started above.
	goodResources := func() e2e.UpdateOptions {
		return e2e.UpdateOptions{
			NodeID:         nodeID,
			Clusters:       []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)},
			Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, backend.Address)})},
			SkipValidation: true,
		}
	}
	// pushResources sends opts to the management server and fails the test
	// if the update is rejected.
	pushResources := func(opts e2e.UpdateOptions) {
		if err := mgmtServer.Update(ctx, opts); err != nil {
			t.Fatal(err)
		}
	}

	// With both resources configured, an RPC must go through.
	pushResources(goodResources())
	client := testgrpc.NewTestServiceClient(cc)
	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("EmptyCall() failed: %v", err)
	}

	// Drop only the cluster resource, keeping the endpoints, to trigger a
	// resource-not-found error.
	withoutCluster := goodResources()
	withoutCluster.Clusters = nil
	pushResources(withoutCluster)

	// The cluster watch must stay registered: nothing should show up on the
	// cancel channel before the short timeout expires.
	shortCtx, shortCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
	defer shortCancel()
	select {
	case <-cdsResourceCanceledCh:
		t.Fatal("Watch for cluster resource is cancelled when not expected to")
	case <-shortCtx.Done():
	}

	testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)

	// RPCs must now fail with Unavailable and name the missing cluster.
	_, rpcErr := client.EmptyCall(ctx, &testpb.Empty{})
	if got, want := status.Code(rpcErr), codes.Unavailable; got != want {
		t.Fatalf("EmptyCall() failed with code: %v, want %v", got, want)
	}
	if wantErr := fmt.Sprintf("cluster %q not found", clusterName); !strings.Contains(rpcErr.Error(), wantErr) {
		t.Fatalf("EmptyCall() failed with error: %v, want %v", rpcErr, wantErr)
	}

	// Restore the cluster resource; RPCs are expected to recover.
	pushResources(goodResources())
	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("EmptyCall() failed: %v", err)
	}
}

// Tests that closing the cds LB policy results in the child policy being
// closed.
func (s) TestClose(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,11 +278,12 @@ func (s) TestErrorFromParentLB_ResourceNotFound(t *testing.T) {
}

// Ensure that RPCs start to fail with expected error.
wantErr := fmt.Sprintf("cluster %q not found", clusterName)
for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
_, err := client.EmptyCall(sCtx, &testpb.Empty{})
if status.Code(err) == codes.Unavailable && strings.Contains(err.Error(), "all priorities are removed") {
if status.Code(err) == codes.Unavailable && strings.Contains(err.Error(), wantErr) {
break
}
if err != nil {
Expand Down

0 comments on commit feaf942

Please sign in to comment.