diff --git a/balancer/roundrobin/roundrobin.go b/balancer/roundrobin/roundrobin.go index 80a42d22510c..35da5d1ec9d9 100644 --- a/balancer/roundrobin/roundrobin.go +++ b/balancer/roundrobin/roundrobin.go @@ -22,12 +22,13 @@ package roundrobin import ( - rand "math/rand/v2" - "sync/atomic" + "fmt" "google.golang.org/grpc/balancer" - "google.golang.org/grpc/balancer/base" + "google.golang.org/grpc/balancer/endpointsharding" + "google.golang.org/grpc/balancer/pickfirst/pickfirstleaf" "google.golang.org/grpc/grpclog" + internalgrpclog "google.golang.org/grpc/internal/grpclog" ) // Name is the name of round_robin balancer. @@ -35,47 +36,44 @@ const Name = "round_robin" var logger = grpclog.Component("roundrobin") -// newBuilder creates a new roundrobin balancer builder. -func newBuilder() balancer.Builder { - return base.NewBalancerBuilder(Name, &rrPickerBuilder{}, base.Config{HealthCheck: true}) -} - func init() { - balancer.Register(newBuilder()) + balancer.Register(builder{}) } -type rrPickerBuilder struct{} +type builder struct{} -func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker { - logger.Infof("roundrobinPicker: Build called with info: %v", info) - if len(info.ReadySCs) == 0 { - return base.NewErrPicker(balancer.ErrNoSubConnAvailable) - } - scs := make([]balancer.SubConn, 0, len(info.ReadySCs)) - for sc := range info.ReadySCs { - scs = append(scs, sc) - } - return &rrPicker{ - subConns: scs, - // Start at a random index, as the same RR balancer rebuilds a new - // picker when SubConn states change, and we don't want to apply excess - // load to the first server in the list. - next: uint32(rand.IntN(len(scs))), +func (bb builder) Name() string { + return Name +} + +func (bb builder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { + childBuilder := balancer.Get(pickfirstleaf.Name).Build + bal := &rrBalancer{ + cc: cc, + Balancer: endpointsharding.NewBalancer(cc, opts, childBuilder, endpointsharding.Options{}), } + bal.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[%p] ", bal)) + bal.logger.Infof("Created") + return bal } -type rrPicker struct { - // subConns is the snapshot of the roundrobin balancer when this picker was - // created. The slice is immutable. Each Get() will do a round robin - // selection from it and return the selected SubConn. - subConns []balancer.SubConn - next uint32 +type rrBalancer struct { + balancer.Balancer + cc balancer.ClientConn + logger *internalgrpclog.PrefixLogger } -func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) { - subConnsLen := uint32(len(p.subConns)) - nextIndex := atomic.AddUint32(&p.next, 1) +func (b *rrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error { + return b.Balancer.UpdateClientConnState(balancer.ClientConnState{ + // Enable the health listener in pickfirst children for client side health + // checks and outlier detection, if configured. + ResolverState: pickfirstleaf.EnableHealthListener(ccs.ResolverState), + }) +} - sc := p.subConns[nextIndex%subConnsLen] - return balancer.PickResult{SubConn: sc}, nil +func (b *rrBalancer) ExitIdle() { + // Should always be ok, as child is endpoint sharding. 
+ if ei, ok := b.Balancer.(balancer.ExitIdler); ok { + ei.ExitIdle() + } } diff --git a/balancer/weightedroundrobin/balancer.go b/balancer/weightedroundrobin/balancer.go index e09b478334a7..09c8df13d469 100644 --- a/balancer/weightedroundrobin/balancer.go +++ b/balancer/weightedroundrobin/balancer.go @@ -395,6 +395,7 @@ func (b *wrrBalancer) Close() { ew.stopORCAListener() } } + b.child.Close() } func (b *wrrBalancer) ExitIdle() { diff --git a/balancer/weightedtarget/weightedtarget_test.go b/balancer/weightedtarget/weightedtarget_test.go index 5251ed6c1049..acaae430fbe6 100644 --- a/balancer/weightedtarget/weightedtarget_test.go +++ b/balancer/weightedtarget/weightedtarget_test.go @@ -129,12 +129,14 @@ func (b *testConfigBalancer) UpdateClientConnState(s balancer.ClientConnState) e return fmt.Errorf("unexpected balancer config with type %T", s.BalancerConfig) } - addrsWithAttr := make([]resolver.Address, len(s.ResolverState.Addresses)) - for i, addr := range s.ResolverState.Addresses { - addrsWithAttr[i] = setConfigKey(addr, c.configStr) + for i, ep := range s.ResolverState.Endpoints { + addrsWithAttr := make([]resolver.Address, len(ep.Addresses)) + for j, addr := range ep.Addresses { + addrsWithAttr[j] = setConfigKey(addr, c.configStr) + } + s.ResolverState.Endpoints[i].Addresses = addrsWithAttr } s.BalancerConfig = nil - s.ResolverState.Addresses = addrsWithAttr return b.Balancer.UpdateClientConnState(s) } @@ -188,7 +190,9 @@ func (s) TestWeightedTarget(t *testing.T) { // Send the config, and an address with hierarchy path ["cluster_1"]. addr1 := resolver.Address{Addr: testBackendAddrStrs[1], Attributes: nil} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr1, []string{"cluster_1"})}}, + ResolverState: resolver.State{Endpoints: []resolver.Endpoint{ + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr1}}, []string{"cluster_1"}), + }}, BalancerConfig: config1, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) @@ -229,7 +233,9 @@ func (s) TestWeightedTarget(t *testing.T) { // Send the config, and one address with hierarchy path "cluster_2". addr2 := resolver.Address{Addr: testBackendAddrStrs[2], Attributes: nil} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr2, []string{"cluster_2"})}}, + ResolverState: resolver.State{Endpoints: []resolver.Endpoint{ + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr2}}, []string{"cluster_2"}), + }}, BalancerConfig: config2, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) @@ -241,6 +247,12 @@ func (s) TestWeightedTarget(t *testing.T) { // The subconn for cluster_1 should be shut down. scShutdown := <-cc.ShutdownSubConnCh + // The same SubConn is closed by gracefulswitch and pickfirstleaf when they + // are closed. Remove duplicate events. + // TODO: /~https://github.com/grpc/grpc-go/issues/6472 - Remove this + // workaround once pickfirst is the only leaf policy and responsible for + // shutting down SubConns. + <-cc.ShutdownSubConnCh if scShutdown != sc1 { t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scShutdown) } @@ -277,7 +289,9 @@ func (s) TestWeightedTarget(t *testing.T) { // Send the config, and an address with hierarchy path ["cluster_2"]. 
addr3 := resolver.Address{Addr: testBackendAddrStrs[3], Attributes: nil} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr3, []string{"cluster_2"})}}, + ResolverState: resolver.State{Endpoints: []resolver.Endpoint{ + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr3}}, []string{"cluster_2"}), + }}, BalancerConfig: config3, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) @@ -286,8 +300,15 @@ func (s) TestWeightedTarget(t *testing.T) { // The subconn from the test_config_balancer should be shut down. scShutdown = <-cc.ShutdownSubConnCh + // The same SubConn is closed by gracefulswitch and pickfirstleaf when they + // are closed. Remove duplicate events. + // TODO: /~https://github.com/grpc/grpc-go/issues/6472 - Remove this + // workaround once pickfirst is the only leaf policy and responsible for + // shutting down SubConns. + <-cc.ShutdownSubConnCh + if scShutdown != sc2 { - t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scShutdown) + t.Fatalf("ShutdownSubConn, want %v, got %v", sc2, scShutdown) } scShutdown.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) @@ -350,7 +371,9 @@ func (s) TestWeightedTarget_OneSubBalancer_AddRemoveBackend(t *testing.T) { // Send the config, and an address with hierarchy path ["cluster_1"]. addr1 := resolver.Address{Addr: testBackendAddrStrs[1]} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr1, []string{"cluster_1"})}}, + ResolverState: resolver.State{Endpoints: []resolver.Endpoint{ + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr1}}, []string{"cluster_1"}), + }}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) @@ -375,9 +398,9 @@ func (s) TestWeightedTarget_OneSubBalancer_AddRemoveBackend(t *testing.T) { // Send two addresses. addr2 := resolver.Address{Addr: testBackendAddrStrs[2]} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{Addresses: []resolver.Address{ - hierarchy.Set(addr1, []string{"cluster_1"}), - hierarchy.Set(addr2, []string{"cluster_1"}), + ResolverState: resolver.State{Endpoints: []resolver.Endpoint{ + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr1}}, []string{"cluster_1"}), + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr2}}, []string{"cluster_1"}), }}, BalancerConfig: config, }); err != nil { @@ -401,7 +424,9 @@ func (s) TestWeightedTarget_OneSubBalancer_AddRemoveBackend(t *testing.T) { // Remove the first address. 
if err := wtb.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr2, []string{"cluster_1"})}}, + ResolverState: resolver.State{Endpoints: []resolver.Endpoint{ + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr2}}, []string{"cluster_1"}), + }}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) @@ -453,16 +478,18 @@ func (s) TestWeightedTarget_TwoSubBalancers_OneBackend(t *testing.T) { addr1 := resolver.Address{Addr: testBackendAddrStrs[1]} addr2 := resolver.Address{Addr: testBackendAddrStrs[2]} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{Addresses: []resolver.Address{ - hierarchy.Set(addr1, []string{"cluster_1"}), - hierarchy.Set(addr2, []string{"cluster_2"}), + ResolverState: resolver.State{Endpoints: []resolver.Endpoint{ + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr1}}, []string{"cluster_1"}), + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr2}}, []string{"cluster_2"}), }}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } - scs := waitForNewSubConns(t, cc, 2) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + scs := waitForNewSubConns(ctx, t, cc, 2) verifySubConnAddrs(t, scs, map[string][]resolver.Address{ "cluster_1": {addr1}, "cluster_2": {addr2}, @@ -472,13 +499,14 @@ func (s) TestWeightedTarget_TwoSubBalancers_OneBackend(t *testing.T) { sc1 := scs["cluster_1"][0].sc.(*testutils.TestSubConn) sc2 := scs["cluster_2"][0].sc.(*testutils.TestSubConn) + // The CONNECTING picker should be sent by all leaf pickfirst policies on + // receiving the first resolver update. + <-cc.NewPickerCh // Send state changes for both SubConns, and wait for the picker. 
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - <-cc.NewPickerCh sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - <-cc.NewPickerCh sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) p := <-cc.NewPickerCh @@ -521,18 +549,20 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) { addr3 := resolver.Address{Addr: testBackendAddrStrs[3]} addr4 := resolver.Address{Addr: testBackendAddrStrs[4]} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{Addresses: []resolver.Address{ - hierarchy.Set(addr1, []string{"cluster_1"}), - hierarchy.Set(addr2, []string{"cluster_1"}), - hierarchy.Set(addr3, []string{"cluster_2"}), - hierarchy.Set(addr4, []string{"cluster_2"}), + ResolverState: resolver.State{Endpoints: []resolver.Endpoint{ + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr1}}, []string{"cluster_1"}), + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr2}}, []string{"cluster_1"}), + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr3}}, []string{"cluster_2"}), + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr4}}, []string{"cluster_2"}), }}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } - scs := waitForNewSubConns(t, cc, 4) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + scs := waitForNewSubConns(ctx, t, cc, 4) verifySubConnAddrs(t, scs, map[string][]resolver.Address{ "cluster_1": {addr1, addr2}, "cluster_2": {addr3, addr4}, @@ -544,21 +574,21 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) { sc3 := scs["cluster_2"][0].sc.(*testutils.TestSubConn) sc4 := scs["cluster_2"][1].sc.(*testutils.TestSubConn) + // The CONNECTING picker should be sent by all leaf pickfirst policies on + // receiving the first resolver update. + <-cc.NewPickerCh + // Send state changes for all SubConns, and wait for the picker. sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - <-cc.NewPickerCh sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - <-cc.NewPickerCh sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - <-cc.NewPickerCh sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - <-cc.NewPickerCh sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) p := <-cc.NewPickerCh @@ -570,7 +600,13 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) { } // Turn sc2's connection down, should be RR between balancers. 
- sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) + wantSubConnErr := errors.New("subConn connection error") + sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle}) + sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc2.UpdateState(balancer.SubConnState{ + ConnectivityState: connectivity.TransientFailure, + ConnectionError: wantSubConnErr, + }) p = <-cc.NewPickerCh want = []balancer.SubConn{sc1, sc1, sc3, sc4} if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil { @@ -579,10 +615,10 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) { // Shut down subConn corresponding to addr3. if err := wtb.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{Addresses: []resolver.Address{ - hierarchy.Set(addr1, []string{"cluster_1"}), - hierarchy.Set(addr2, []string{"cluster_1"}), - hierarchy.Set(addr4, []string{"cluster_2"}), + ResolverState: resolver.State{Endpoints: []resolver.Endpoint{ + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr1}}, []string{"cluster_1"}), + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr2}}, []string{"cluster_1"}), + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr4}}, []string{"cluster_2"}), }}, BalancerConfig: config, }); err != nil { @@ -600,7 +636,8 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) { } // Turn sc1's connection down. - wantSubConnErr := errors.New("subConn connection error") + sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle}) + sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) sc1.UpdateState(balancer.SubConnState{ ConnectivityState: connectivity.TransientFailure, ConnectionError: wantSubConnErr, @@ -612,6 +649,7 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) { } // Turn last connection to connecting. 
+ sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle}) sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) p = <-cc.NewPickerCh for i := 0; i < 5; i++ { @@ -626,8 +664,6 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) { ConnectionError: wantSubConnErr, }) - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() if err := cc.WaitForPicker(ctx, pickAndCheckError(wantSubConnErr)); err != nil { t.Fatal(err) } @@ -665,18 +701,20 @@ func (s) TestWeightedTarget_TwoSubBalancers_DifferentWeight_MoreBackends(t *test addr3 := resolver.Address{Addr: testBackendAddrStrs[3]} addr4 := resolver.Address{Addr: testBackendAddrStrs[4]} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{Addresses: []resolver.Address{ - hierarchy.Set(addr1, []string{"cluster_1"}), - hierarchy.Set(addr2, []string{"cluster_1"}), - hierarchy.Set(addr3, []string{"cluster_2"}), - hierarchy.Set(addr4, []string{"cluster_2"}), + ResolverState: resolver.State{Endpoints: []resolver.Endpoint{ + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr1}}, []string{"cluster_1"}), + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr2}}, []string{"cluster_1"}), + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr3}}, []string{"cluster_2"}), + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr4}}, []string{"cluster_2"}), }}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } - scs := waitForNewSubConns(t, cc, 4) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + scs := waitForNewSubConns(ctx, t, cc, 4) verifySubConnAddrs(t, scs, map[string][]resolver.Address{ "cluster_1": {addr1, addr2}, "cluster_2": {addr3, addr4}, @@ -688,21 +726,21 @@ func (s) TestWeightedTarget_TwoSubBalancers_DifferentWeight_MoreBackends(t *test sc3 := scs["cluster_2"][0].sc.(*testutils.TestSubConn) sc4 := scs["cluster_2"][1].sc.(*testutils.TestSubConn) + // The CONNECTING picker should be sent by all leaf pickfirst policies on + // receiving the first resolver update. + <-cc.NewPickerCh + // Send state changes for all SubConns, and wait for the picker. 
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - <-cc.NewPickerCh sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - <-cc.NewPickerCh sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - <-cc.NewPickerCh sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - <-cc.NewPickerCh sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) p := <-cc.NewPickerCh @@ -749,17 +787,19 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) { addr2 := resolver.Address{Addr: testBackendAddrStrs[2]} addr3 := resolver.Address{Addr: testBackendAddrStrs[3]} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{Addresses: []resolver.Address{ - hierarchy.Set(addr1, []string{"cluster_1"}), - hierarchy.Set(addr2, []string{"cluster_2"}), - hierarchy.Set(addr3, []string{"cluster_3"}), + ResolverState: resolver.State{Endpoints: []resolver.Endpoint{ + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr1}}, []string{"cluster_1"}), + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr2}}, []string{"cluster_2"}), + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr3}}, []string{"cluster_3"}), }}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } - scs := waitForNewSubConns(t, cc, 3) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + scs := waitForNewSubConns(ctx, t, cc, 3) verifySubConnAddrs(t, scs, map[string][]resolver.Address{ "cluster_1": {addr1}, "cluster_2": {addr2}, @@ -772,16 +812,17 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) { sc3 := scs["cluster_3"][0].sc.(*testutils.TestSubConn) // Send state changes for all SubConns, and wait for the picker. - sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + // The CONNECTING picker should be sent by all leaf pickfirst policies on + // receiving the first resolver update. 
<-cc.NewPickerCh + sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - <-cc.NewPickerCh sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh + <-sc3.ConnectCh sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - <-cc.NewPickerCh sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) p := <-cc.NewPickerCh @@ -808,9 +849,9 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) { t.Fatalf("failed to parse balancer config: %v", err) } if err := wtb.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{Addresses: []resolver.Address{ - hierarchy.Set(addr1, []string{"cluster_1"}), - hierarchy.Set(addr3, []string{"cluster_3"}), + ResolverState: resolver.State{Endpoints: []resolver.Endpoint{ + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr1}}, []string{"cluster_1"}), + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr3}}, []string{"cluster_3"}), }}, BalancerConfig: config, }); err != nil { @@ -822,6 +863,12 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) { p = <-cc.NewPickerCh scShutdown := <-cc.ShutdownSubConnCh + // The same SubConn is closed by gracefulswitch and pickfirstleaf when they + // are closed. Remove duplicate events. + // TODO: /~https://github.com/grpc/grpc-go/issues/6472 - Remove this + // workaround once pickfirst is the only leaf policy and responsible for + // shutting down SubConns. + <-cc.ShutdownSubConnCh if scShutdown != sc2 { t.Fatalf("ShutdownSubConn, want %v, got %v", sc2, scShutdown) } @@ -831,6 +878,9 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) { } // Move balancer 3 into transient failure. + sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle}) + <-sc3.ConnectCh + sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) wantSubConnErr := errors.New("subConn connection error") sc3.UpdateState(balancer.SubConnState{ ConnectivityState: connectivity.TransientFailure, @@ -852,8 +902,8 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) { t.Fatalf("failed to parse balancer config: %v", err) } if err := wtb.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{Addresses: []resolver.Address{ - hierarchy.Set(addr3, []string{"cluster_3"}), + ResolverState: resolver.State{Endpoints: []resolver.Endpoint{ + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr3}}, []string{"cluster_3"}), }}, BalancerConfig: config, }); err != nil { @@ -862,14 +912,17 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) { // Removing a subBalancer causes the weighted target LB policy to push a new // picker which ensures that the removed subBalancer is not picked for RPCs. - scShutdown = <-cc.ShutdownSubConnCh + // The same SubConn is closed by gracefulswitch and pickfirstleaf when they + // are closed. Remove duplicate events. + // TODO: /~https://github.com/grpc/grpc-go/issues/6472 - Remove this + // workaround once pickfirst is the only leaf policy and responsible for + // shutting down SubConns. 
+	scShutdown = <-cc.ShutdownSubConnCh
+ <-cc.ShutdownSubConnCh if scShutdown != sc1 { t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scShutdown) } - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() if err := cc.WaitForPicker(ctx, pickAndCheckError(wantSubConnErr)); err != nil { t.Fatal(err) } @@ -907,18 +960,20 @@ func (s) TestWeightedTarget_TwoSubBalancers_ChangeWeight_MoreBackends(t *testing addr3 := resolver.Address{Addr: testBackendAddrStrs[3]} addr4 := resolver.Address{Addr: testBackendAddrStrs[4]} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{Addresses: []resolver.Address{ - hierarchy.Set(addr1, []string{"cluster_1"}), - hierarchy.Set(addr2, []string{"cluster_1"}), - hierarchy.Set(addr3, []string{"cluster_2"}), - hierarchy.Set(addr4, []string{"cluster_2"}), + ResolverState: resolver.State{Endpoints: []resolver.Endpoint{ + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr1}}, []string{"cluster_1"}), + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr2}}, []string{"cluster_1"}), + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr3}}, []string{"cluster_2"}), + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr4}}, []string{"cluster_2"}), }}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } - scs := waitForNewSubConns(t, cc, 4) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + scs := waitForNewSubConns(ctx, t, cc, 4) verifySubConnAddrs(t, scs, map[string][]resolver.Address{ "cluster_1": {addr1, addr2}, "cluster_2": {addr3, addr4}, @@ -930,21 +985,21 @@ func (s) TestWeightedTarget_TwoSubBalancers_ChangeWeight_MoreBackends(t *testing sc3 := scs["cluster_2"][0].sc.(*testutils.TestSubConn) sc4 := scs["cluster_2"][1].sc.(*testutils.TestSubConn) + // The CONNECTING picker should be sent by all leaf pickfirst policies on + // receiving the first resolver update. + <-cc.NewPickerCh + // Send state changes for all SubConns, and wait for the picker. 
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - <-cc.NewPickerCh sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - <-cc.NewPickerCh sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - <-cc.NewPickerCh sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - <-cc.NewPickerCh sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) p := <-cc.NewPickerCh @@ -973,11 +1028,11 @@ func (s) TestWeightedTarget_TwoSubBalancers_ChangeWeight_MoreBackends(t *testing t.Fatalf("failed to parse balancer config: %v", err) } if err := wtb.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{Addresses: []resolver.Address{ - hierarchy.Set(addr1, []string{"cluster_1"}), - hierarchy.Set(addr2, []string{"cluster_1"}), - hierarchy.Set(addr3, []string{"cluster_2"}), - hierarchy.Set(addr4, []string{"cluster_2"}), + ResolverState: resolver.State{Endpoints: []resolver.Endpoint{ + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr1}}, []string{"cluster_1"}), + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr2}}, []string{"cluster_1"}), + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr3}}, []string{"cluster_2"}), + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr4}}, []string{"cluster_2"}), }}, BalancerConfig: config, }); err != nil { @@ -1023,16 +1078,18 @@ func (s) TestWeightedTarget_InitOneSubBalancerTransientFailure(t *testing.T) { addr1 := resolver.Address{Addr: testBackendAddrStrs[1]} addr2 := resolver.Address{Addr: testBackendAddrStrs[2]} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{Addresses: []resolver.Address{ - hierarchy.Set(addr1, []string{"cluster_1"}), - hierarchy.Set(addr2, []string{"cluster_2"}), + ResolverState: resolver.State{Endpoints: []resolver.Endpoint{ + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr1}}, []string{"cluster_1"}), + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr2}}, []string{"cluster_2"}), }}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } - scs := waitForNewSubConns(t, cc, 2) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + scs := waitForNewSubConns(ctx, t, cc, 2) verifySubConnAddrs(t, scs, map[string][]resolver.Address{ "cluster_1": {addr1}, "cluster_2": {addr2}, @@ -1084,17 +1141,21 @@ func (s) TestBalancerGroup_SubBalancerTurnsConnectingFromTransientFailure(t *tes // Send the config with one address for each cluster. 
addr1 := resolver.Address{Addr: testBackendAddrStrs[1]} addr2 := resolver.Address{Addr: testBackendAddrStrs[2]} + ep1 := resolver.Endpoint{Addresses: []resolver.Address{addr1}} + ep2 := resolver.Endpoint{Addresses: []resolver.Address{addr2}} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{Addresses: []resolver.Address{ - hierarchy.Set(addr1, []string{"cluster_1"}), - hierarchy.Set(addr2, []string{"cluster_2"}), + ResolverState: resolver.State{Endpoints: []resolver.Endpoint{ + hierarchy.SetInEndpoint(ep1, []string{"cluster_1"}), + hierarchy.SetInEndpoint(ep2, []string{"cluster_2"}), }}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } - scs := waitForNewSubConns(t, cc, 2) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + scs := waitForNewSubConns(ctx, t, cc, 2) verifySubConnAddrs(t, scs, map[string][]resolver.Address{ "cluster_1": {addr1}, "cluster_2": {addr2}, @@ -1139,13 +1200,13 @@ func (s) TestBalancerGroup_SubBalancerTurnsConnectingFromTransientFailure(t *tes } } -// Verify that a SubConn is created with the expected address and hierarchy -// path cleared. +// Verify that a SubConn is created with the expected address. func verifyAddressInNewSubConn(t *testing.T, cc *testutils.BalancerClientConn, addr resolver.Address) { t.Helper() gotAddr := <-cc.NewSubConnAddrsCh - wantAddr := []resolver.Address{hierarchy.Set(addr, []string{})} + wantAddr := []resolver.Address{addr} + gotAddr[0].BalancerAttributes = nil if diff := cmp.Diff(gotAddr, wantAddr, cmp.AllowUnexported(attributes.Attributes{})); diff != "" { t.Fatalf("got unexpected new subconn addrs: %v", diff) } @@ -1163,12 +1224,17 @@ type subConnWithAddr struct { // // Returned value is a map from subBalancer (identified by its config) to // subConns created by it. -func waitForNewSubConns(t *testing.T, cc *testutils.BalancerClientConn, num int) map[string][]subConnWithAddr { +func waitForNewSubConns(ctx context.Context, t *testing.T, cc *testutils.BalancerClientConn, num int) map[string][]subConnWithAddr { t.Helper() scs := make(map[string][]subConnWithAddr) for i := 0; i < num; i++ { - addrs := <-cc.NewSubConnAddrsCh + var addrs []resolver.Address + select { + case <-ctx.Done(): + t.Fatalf("Timed out waiting for addresses for new SubConn.") + case addrs = <-cc.NewSubConnAddrsCh: + } if len(addrs) != 1 { t.Fatalf("received subConns with %d addresses, want 1", len(addrs)) } @@ -1176,7 +1242,12 @@ func waitForNewSubConns(t *testing.T, cc *testutils.BalancerClientConn, num int) if !ok { t.Fatalf("received subConn address %v contains no attribute for balancer config", addrs[0]) } - sc := <-cc.NewSubConnCh + var sc balancer.SubConn + select { + case <-ctx.Done(): + t.Fatalf("Timed out waiting for new SubConn.") + case sc = <-cc.NewSubConnCh: + } scWithAddr := subConnWithAddr{sc: sc, addr: addrs[0]} scs[cfg] = append(scs[cfg], scWithAddr) } @@ -1253,7 +1324,9 @@ func (s) TestInitialIdle(t *testing.T) { // Send the config, and an address with hierarchy path ["cluster_1"]. 
addrs := []resolver.Address{{Addr: testBackendAddrStrs[0], Attributes: nil}} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addrs[0], []string{"cds:cluster_1"})}}, + ResolverState: resolver.State{Endpoints: []resolver.Endpoint{ + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addrs[0]}}, []string{"cds:cluster_1"}), + }}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) @@ -1295,7 +1368,9 @@ func (s) TestIgnoreSubBalancerStateTransitions(t *testing.T) { // Send the config, and an address with hierarchy path ["cluster_1"]. addr := resolver.Address{Addr: testBackendAddrStrs[0], Attributes: nil} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr, []string{"cluster_1"})}}, + ResolverState: resolver.State{Endpoints: []resolver.Endpoint{ + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addr}}, []string{"cluster_1"}), + }}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) @@ -1354,7 +1429,9 @@ func (s) TestUpdateStatePauses(t *testing.T) { // Send the config, and an address with hierarchy path ["cluster_1"]. addrs := []resolver.Address{{Addr: testBackendAddrStrs[0], Attributes: nil}} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addrs[0], []string{"cds:cluster_1"})}}, + ResolverState: resolver.State{Endpoints: []resolver.Endpoint{ + hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{addrs[0]}}, []string{"cds:cluster_1"}), + }}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) diff --git a/internal/balancergroup/balancergroup_test.go b/internal/balancergroup/balancergroup_test.go index c154c029d8f2..78bd78e030ce 100644 --- a/internal/balancergroup/balancergroup_test.go +++ b/internal/balancergroup/balancergroup_test.go @@ -19,6 +19,7 @@ package balancergroup import ( "context" "encoding/json" + "errors" "fmt" "testing" "time" @@ -42,16 +43,19 @@ const ( ) var ( - rrBuilder = balancer.Get(roundrobin.Name) - testBalancerIDs = []string{"b1", "b2", "b3"} - testBackendAddrs []resolver.Address + rrBuilder = balancer.Get(roundrobin.Name) + testBalancerIDs = []string{"b1", "b2", "b3"} + testBackendAddrs []resolver.Address + testBackendEndpoints []resolver.Endpoint ) const testBackendAddrsCount = 12 func init() { for i := 0; i < testBackendAddrsCount; i++ { - testBackendAddrs = append(testBackendAddrs, resolver.Address{Addr: fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i)}) + addr := resolver.Address{Addr: fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i)} + testBackendAddrs = append(testBackendAddrs, addr) + testBackendEndpoints = append(testBackendEndpoints, resolver.Endpoint{Addresses: []resolver.Address{addr}}) } } @@ -88,18 +92,18 @@ func (s) TestBalancerGroup_start_close(t *testing.T) { // balancers. 
gator.Add(testBalancerIDs[0], 2) bg.Add(testBalancerIDs[0], rrBuilder) - bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}}) + bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Endpoints: testBackendEndpoints[0:2]}}) gator.Add(testBalancerIDs[1], 1) bg.Add(testBalancerIDs[1], rrBuilder) - bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}}) + bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Endpoints: testBackendEndpoints[2:4]}}) bg.Start() - m1 := make(map[resolver.Address]balancer.SubConn) + m1 := make(map[string]balancer.SubConn) for i := 0; i < 4; i++ { addrs := <-cc.NewSubConnAddrsCh sc := <-cc.NewSubConnCh - m1[addrs[0]] = sc + m1[addrs[0].Addr] = sc sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) } @@ -107,9 +111,9 @@ func (s) TestBalancerGroup_start_close(t *testing.T) { // Test roundrobin on the last picker. p1 := <-cc.NewPickerCh want := []balancer.SubConn{ - m1[testBackendAddrs[0]], m1[testBackendAddrs[0]], - m1[testBackendAddrs[1]], m1[testBackendAddrs[1]], - m1[testBackendAddrs[2]], m1[testBackendAddrs[3]], + m1[testBackendAddrs[0].Addr], m1[testBackendAddrs[0].Addr], + m1[testBackendAddrs[1].Addr], m1[testBackendAddrs[1].Addr], + m1[testBackendAddrs[2].Addr], m1[testBackendAddrs[3].Addr], } if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p1)); err != nil { t.Fatalf("want %v, got %v", want, err) @@ -124,7 +128,7 @@ func (s) TestBalancerGroup_start_close(t *testing.T) { // Add b3, weight 1, backends [1,2]. gator.Add(testBalancerIDs[2], 1) bg.Add(testBalancerIDs[2], rrBuilder) - bg.UpdateClientConnState(testBalancerIDs[2], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[1:3]}}) + bg.UpdateClientConnState(testBalancerIDs[2], balancer.ClientConnState{ResolverState: resolver.State{Endpoints: testBackendEndpoints[1:3]}}) // Remove b1. gator.Remove(testBalancerIDs[0]) @@ -132,16 +136,16 @@ func (s) TestBalancerGroup_start_close(t *testing.T) { // Update b2 to weight 3, backends [0,3]. gator.UpdateWeight(testBalancerIDs[1], 3) - bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: append([]resolver.Address(nil), testBackendAddrs[0], testBackendAddrs[3])}}) + bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Endpoints: append([]resolver.Endpoint(nil), testBackendEndpoints[0], testBackendEndpoints[3])}}) gator.Start() bg.Start() - m2 := make(map[resolver.Address]balancer.SubConn) + m2 := make(map[string]balancer.SubConn) for i := 0; i < 4; i++ { addrs := <-cc.NewSubConnAddrsCh sc := <-cc.NewSubConnCh - m2[addrs[0]] = sc + m2[addrs[0].Addr] = sc sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) } @@ -149,9 +153,9 @@ func (s) TestBalancerGroup_start_close(t *testing.T) { // Test roundrobin on the last picker. 
p2 := <-cc.NewPickerCh want = []balancer.SubConn{ - m2[testBackendAddrs[0]], m2[testBackendAddrs[0]], m2[testBackendAddrs[0]], - m2[testBackendAddrs[3]], m2[testBackendAddrs[3]], m2[testBackendAddrs[3]], - m2[testBackendAddrs[1]], m2[testBackendAddrs[2]], + m2[testBackendAddrs[0].Addr], m2[testBackendAddrs[0].Addr], m2[testBackendAddrs[0].Addr], + m2[testBackendAddrs[3].Addr], m2[testBackendAddrs[3].Addr], m2[testBackendAddrs[3].Addr], + m2[testBackendAddrs[1].Addr], m2[testBackendAddrs[2].Addr], } if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p2)); err != nil { t.Fatalf("want %v, got %v", want, err) @@ -203,7 +207,7 @@ func (s) TestBalancerGroup_start_close_deadlock(t *testing.T) { // Two rr balancers are added to bg, each with 2 ready subConns. A sub-balancer // is removed later, so the balancer group returned has one sub-balancer in its // own map, and one sub-balancer in cache. -func initBalancerGroupForCachingTest(t *testing.T, idleCacheTimeout time.Duration) (*weightedaggregator.Aggregator, *BalancerGroup, *testutils.BalancerClientConn, map[resolver.Address]*testutils.TestSubConn) { +func initBalancerGroupForCachingTest(t *testing.T, idleCacheTimeout time.Duration) (*weightedaggregator.Aggregator, *BalancerGroup, *testutils.BalancerClientConn, map[string]*testutils.TestSubConn) { cc := testutils.NewBalancerClientConn(t) gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR) gator.Start() @@ -219,18 +223,18 @@ func initBalancerGroupForCachingTest(t *testing.T, idleCacheTimeout time.Duratio // balancers. gator.Add(testBalancerIDs[0], 2) bg.Add(testBalancerIDs[0], rrBuilder) - bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}}) + bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Endpoints: testBackendEndpoints[0:2]}}) gator.Add(testBalancerIDs[1], 1) bg.Add(testBalancerIDs[1], rrBuilder) - bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}}) + bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Endpoints: testBackendEndpoints[2:4]}}) bg.Start() - m1 := make(map[resolver.Address]*testutils.TestSubConn) + m1 := make(map[string]*testutils.TestSubConn) for i := 0; i < 4; i++ { addrs := <-cc.NewSubConnAddrsCh sc := <-cc.NewSubConnCh - m1[addrs[0]] = sc + m1[addrs[0].Addr] = sc sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) } @@ -238,9 +242,9 @@ func initBalancerGroupForCachingTest(t *testing.T, idleCacheTimeout time.Duratio // Test roundrobin on the last picker. p1 := <-cc.NewPickerCh want := []balancer.SubConn{ - m1[testBackendAddrs[0]], m1[testBackendAddrs[0]], - m1[testBackendAddrs[1]], m1[testBackendAddrs[1]], - m1[testBackendAddrs[2]], m1[testBackendAddrs[3]], + m1[testBackendAddrs[0].Addr], m1[testBackendAddrs[0].Addr], + m1[testBackendAddrs[1].Addr], m1[testBackendAddrs[1].Addr], + m1[testBackendAddrs[2].Addr], m1[testBackendAddrs[3].Addr], } if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p1)); err != nil { t.Fatalf("want %v, got %v", want, err) @@ -261,7 +265,7 @@ func initBalancerGroupForCachingTest(t *testing.T, idleCacheTimeout time.Duratio // Test roundrobin on the with only sub-balancer0. 
p2 := <-cc.NewPickerCh want = []balancer.SubConn{ - m1[testBackendAddrs[0]], m1[testBackendAddrs[1]], + m1[testBackendAddrs[0].Addr], m1[testBackendAddrs[1].Addr], } if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p2)); err != nil { t.Fatalf("want %v, got %v", want, err) @@ -277,7 +281,10 @@ func (s) TestBalancerGroup_locality_caching(t *testing.T) { // Turn down subconn for addr2, shouldn't get picker update because // sub-balancer1 was removed. - addrToSC[testBackendAddrs[2]].UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) + addrToSC[testBackendAddrs[2].Addr].UpdateState(balancer.SubConnState{ + ConnectivityState: connectivity.TransientFailure, + ConnectionError: errors.New("test error"), + }) for i := 0; i < 10; i++ { select { case <-cc.NewPickerCh: @@ -295,10 +302,10 @@ func (s) TestBalancerGroup_locality_caching(t *testing.T) { p3 := <-cc.NewPickerCh want := []balancer.SubConn{ - addrToSC[testBackendAddrs[0]], addrToSC[testBackendAddrs[0]], - addrToSC[testBackendAddrs[1]], addrToSC[testBackendAddrs[1]], + addrToSC[testBackendAddrs[0].Addr], addrToSC[testBackendAddrs[0].Addr], + addrToSC[testBackendAddrs[1].Addr], addrToSC[testBackendAddrs[1].Addr], // addr2 is down, b2 only has addr3 in READY state. - addrToSC[testBackendAddrs[3]], addrToSC[testBackendAddrs[3]], + addrToSC[testBackendAddrs[3].Addr], addrToSC[testBackendAddrs[3].Addr], } if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p3)); err != nil { t.Fatalf("want %v, got %v", want, err) @@ -324,10 +331,10 @@ func (s) TestBalancerGroup_locality_caching_close_group(t *testing.T) { // The balancer group is closed. The subconns should be shutdown immediately. shutdownTimeout := time.After(time.Millisecond * 500) scToShutdown := map[balancer.SubConn]int{ - addrToSC[testBackendAddrs[0]]: 1, - addrToSC[testBackendAddrs[1]]: 1, - addrToSC[testBackendAddrs[2]]: 1, - addrToSC[testBackendAddrs[3]]: 1, + addrToSC[testBackendAddrs[0].Addr]: 1, + addrToSC[testBackendAddrs[1].Addr]: 1, + addrToSC[testBackendAddrs[2].Addr]: 1, + addrToSC[testBackendAddrs[3].Addr]: 1, } for i := 0; i < len(scToShutdown); i++ { select { @@ -353,8 +360,8 @@ func (s) TestBalancerGroup_locality_caching_not_read_within_timeout(t *testing.T ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() scToShutdown := map[balancer.SubConn]int{ - addrToSC[testBackendAddrs[2]]: 1, - addrToSC[testBackendAddrs[3]]: 1, + addrToSC[testBackendAddrs[2].Addr]: 1, + addrToSC[testBackendAddrs[3].Addr]: 1, } for i := 0; i < len(scToShutdown); i++ { select { @@ -399,8 +406,8 @@ func (s) TestBalancerGroup_locality_caching_read_with_different_builder(t *testi // shut down immediately. 
shutdownTimeout := time.After(time.Millisecond * 500) scToShutdown := map[balancer.SubConn]int{ - addrToSC[testBackendAddrs[2]]: 1, - addrToSC[testBackendAddrs[3]]: 1, + addrToSC[testBackendAddrs[2].Addr]: 1, + addrToSC[testBackendAddrs[3].Addr]: 1, } for i := 0; i < len(scToShutdown); i++ { select { @@ -415,23 +422,23 @@ func (s) TestBalancerGroup_locality_caching_read_with_different_builder(t *testi } } - bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[4:6]}}) + bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Endpoints: testBackendEndpoints[4:6]}}) newSCTimeout := time.After(time.Millisecond * 500) - scToAdd := map[resolver.Address]int{ - testBackendAddrs[4]: 1, - testBackendAddrs[5]: 1, + scToAdd := map[string]int{ + testBackendAddrs[4].Addr: 1, + testBackendAddrs[5].Addr: 1, } for i := 0; i < len(scToAdd); i++ { select { case addr := <-cc.NewSubConnAddrsCh: - c := scToAdd[addr[0]] + c := scToAdd[addr[0].Addr] if c == 0 { t.Fatalf("Got newSubConn for %v when there's %d new expected", addr, c) } - scToAdd[addr[0]] = c - 1 + scToAdd[addr[0].Addr] = c - 1 sc := <-cc.NewSubConnCh - addrToSC[addr[0]] = sc + addrToSC[addr[0].Addr] = sc sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) case <-newSCTimeout: @@ -442,9 +449,9 @@ func (s) TestBalancerGroup_locality_caching_read_with_different_builder(t *testi // Test roundrobin on the new picker. p3 := <-cc.NewPickerCh want := []balancer.SubConn{ - addrToSC[testBackendAddrs[0]], addrToSC[testBackendAddrs[0]], - addrToSC[testBackendAddrs[1]], addrToSC[testBackendAddrs[1]], - addrToSC[testBackendAddrs[4]], addrToSC[testBackendAddrs[5]], + addrToSC[testBackendAddrs[0].Addr], addrToSC[testBackendAddrs[0].Addr], + addrToSC[testBackendAddrs[1].Addr], addrToSC[testBackendAddrs[1].Addr], + addrToSC[testBackendAddrs[4].Addr], addrToSC[testBackendAddrs[5].Addr], } if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p3)); err != nil { t.Fatalf("want %v, got %v", want, err) @@ -572,17 +579,17 @@ func (s) TestBalancerGracefulSwitch(t *testing.T) { }) gator.Add(testBalancerIDs[0], 1) bg.Add(testBalancerIDs[0], rrBuilder) - bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}}) + bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Endpoints: testBackendEndpoints[0:2]}}) bg.Start() defer bg.Close() - m1 := make(map[resolver.Address]balancer.SubConn) + m1 := make(map[string]balancer.SubConn) scs := make(map[balancer.SubConn]bool) for i := 0; i < 2; i++ { addrs := <-cc.NewSubConnAddrsCh sc := <-cc.NewSubConnCh - m1[addrs[0]] = sc + m1[addrs[0].Addr] = sc scs[sc] = true sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) @@ -590,7 +597,7 @@ func (s) TestBalancerGracefulSwitch(t *testing.T) { p1 := <-cc.NewPickerCh want := []balancer.SubConn{ - m1[testBackendAddrs[0]], m1[testBackendAddrs[1]], + m1[testBackendAddrs[0].Addr], m1[testBackendAddrs[1].Addr], } if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p1)); err != nil { t.Fatal(err) @@ -609,7 +616,7 @@ func (s) TestBalancerGracefulSwitch(t *testing.T) { bd.Data.(balancer.Balancer).Close() }, UpdateClientConnState: 
func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { - ccs.ResolverState.Addresses = ccs.ResolverState.Addresses[1:] + ccs.ResolverState.Endpoints = ccs.ResolverState.Endpoints[1:] bal := bd.Data.(balancer.Balancer) return bal.UpdateClientConnState(ccs) }, @@ -620,7 +627,7 @@ func (s) TestBalancerGracefulSwitch(t *testing.T) { t.Fatalf("ParseConfig(%s) failed: %v", string(cfgJSON), err) } if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ - ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}, + ResolverState: resolver.State{Endpoints: testBackendEndpoints[2:4]}, BalancerConfig: lbCfg, }); err != nil { t.Fatalf("error updating ClientConn state: %v", err) diff --git a/test/roundrobin_test.go b/test/roundrobin_test.go index 815e244946fe..7fa92aaaedc1 100644 --- a/test/roundrobin_test.go +++ b/test/roundrobin_test.go @@ -51,6 +51,7 @@ func testRoundRobinBasic(ctx context.Context, t *testing.T, opts ...grpc.DialOpt const backendCount = 5 backends := make([]*stubserver.StubServer, backendCount) + endpoints := make([]resolver.Endpoint, backendCount) addrs := make([]resolver.Address, backendCount) for i := 0; i < backendCount; i++ { backend := &stubserver.StubServer{ @@ -64,6 +65,7 @@ func testRoundRobinBasic(ctx context.Context, t *testing.T, opts ...grpc.DialOpt backends[i] = backend addrs[i] = resolver.Address{Addr: backend.Address} + endpoints[i] = resolver.Endpoint{Addresses: []resolver.Address{addrs[i]}} } dopts := []grpc.DialOption{ @@ -115,10 +117,10 @@ func (s) TestRoundRobin_AddressesRemoved(t *testing.T) { // Send a resolver update with no addresses. This should push the channel into // TransientFailure. - r.UpdateState(resolver.State{Addresses: []resolver.Address{}}) + r.UpdateState(resolver.State{Endpoints: []resolver.Endpoint{}}) testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure) - const msgWant = "produced zero addresses" + const msgWant = "no children to pick from" client := testgrpc.NewTestServiceClient(cc) if _, err := client.EmptyCall(ctx, &testpb.Empty{}); !strings.Contains(status.Convert(err).Message(), msgWant) { t.Fatalf("EmptyCall() = %v, want Contains(Message(), %q)", err, msgWant) diff --git a/xds/internal/balancer/clusterimpl/balancer_test.go b/xds/internal/balancer/clusterimpl/balancer_test.go index f1e99fab9bf9..600897344e51 100644 --- a/xds/internal/balancer/clusterimpl/balancer_test.go +++ b/xds/internal/balancer/clusterimpl/balancer_test.go @@ -62,8 +62,8 @@ const ( ) var ( - testBackendAddrs = []resolver.Address{{Addr: "1.1.1.1:1"}} - cmpOpts = cmp.Options{ + testBackendEndpoints = []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "1.1.1.1:1"}}}} + cmpOpts = cmp.Options{ cmpopts.EquateEmpty(), cmpopts.IgnoreFields(load.Data{}, "ReportInterval"), } @@ -109,7 +109,7 @@ func (s) TestDropByCategory(t *testing.T) { t.Fatalf("Failed to create LRS server config for testing: %v", err) } if err := b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), + ResolverState: xdsclient.SetClient(resolver.State{Endpoints: testBackendEndpoints}, xdsC), BalancerConfig: &LBConfig{ Cluster: testClusterName, EDSServiceName: testServiceName, @@ -205,7 +205,7 @@ func (s) TestDropByCategory(t *testing.T) { dropDenominator2 = 4 ) if err := b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), + ResolverState: 
xdsclient.SetClient(resolver.State{Endpoints: testBackendEndpoints}, xdsC), BalancerConfig: &LBConfig{ Cluster: testClusterName, EDSServiceName: testServiceName, @@ -284,7 +284,7 @@ func (s) TestDropCircuitBreaking(t *testing.T) { t.Fatalf("Failed to create LRS server config for testing: %v", err) } if err := b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), + ResolverState: xdsclient.SetClient(resolver.State{Endpoints: testBackendEndpoints}, xdsC), BalancerConfig: &LBConfig{ Cluster: testClusterName, EDSServiceName: testServiceName, @@ -430,7 +430,7 @@ func (s) TestPickerUpdateAfterClose(t *testing.T) { var maxRequest uint32 = 50 if err := b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), + ResolverState: xdsclient.SetClient(resolver.State{Endpoints: testBackendEndpoints}, xdsC), BalancerConfig: &LBConfig{ Cluster: testClusterName, EDSServiceName: testServiceName, @@ -474,7 +474,7 @@ func (s) TestClusterNameInAddressAttributes(t *testing.T) { defer b.Close() if err := b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), + ResolverState: xdsclient.SetClient(resolver.State{Endpoints: testBackendEndpoints}, xdsC), BalancerConfig: &LBConfig{ Cluster: testClusterName, EDSServiceName: testServiceName, @@ -494,7 +494,7 @@ func (s) TestClusterNameInAddressAttributes(t *testing.T) { } addrs1 := <-cc.NewSubConnAddrsCh - if got, want := addrs1[0].Addr, testBackendAddrs[0].Addr; got != want { + if got, want := addrs1[0].Addr, testBackendEndpoints[0].Addresses[0].Addr; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) } cn, ok := xds.GetXDSHandshakeClusterName(addrs1[0].Attributes) @@ -511,7 +511,7 @@ func (s) TestClusterNameInAddressAttributes(t *testing.T) { const testClusterName2 = "test-cluster-2" var addr2 = resolver.Address{Addr: "2.2.2.2"} if err := b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: xdsclient.SetClient(resolver.State{Addresses: []resolver.Address{addr2}}, xdsC), + ResolverState: xdsclient.SetClient(resolver.State{Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{addr2}}}}, xdsC), BalancerConfig: &LBConfig{ Cluster: testClusterName2, EDSServiceName: testServiceName, @@ -549,7 +549,7 @@ func (s) TestReResolution(t *testing.T) { defer b.Close() if err := b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), + ResolverState: xdsclient.SetClient(resolver.State{Endpoints: testBackendEndpoints}, xdsC), BalancerConfig: &LBConfig{ Cluster: testClusterName, EDSServiceName: testServiceName, @@ -568,7 +568,10 @@ func (s) TestReResolution(t *testing.T) { t.Fatal(err.Error()) } - sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) + sc1.UpdateState(balancer.SubConnState{ + ConnectivityState: connectivity.TransientFailure, + ConnectionError: errors.New("test error"), + }) // This should get the transient failure picker. 
if err := cc.WaitForErrPicker(ctx); err != nil { t.Fatal(err.Error()) @@ -615,9 +618,12 @@ func (s) TestLoadReporting(t *testing.T) { b := builder.Build(cc, balancer.BuildOptions{}) defer b.Close() - addrs := make([]resolver.Address, len(testBackendAddrs)) - for i, a := range testBackendAddrs { - addrs[i] = xdsinternal.SetLocalityID(a, testLocality) + endpoints := make([]resolver.Endpoint, len(testBackendEndpoints)) + for i, e := range testBackendEndpoints { + endpoints[i] = xdsinternal.SetLocalityIDInEndpoint(e, testLocality) + for j, a := range e.Addresses { + endpoints[i].Addresses[j] = xdsinternal.SetLocalityID(a, testLocality) + } } testLRSServerConfig, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{ URI: "trafficdirector.googleapis.com:443", @@ -627,7 +633,7 @@ func (s) TestLoadReporting(t *testing.T) { t.Fatalf("Failed to create LRS server config for testing: %v", err) } if err := b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC), + ResolverState: xdsclient.SetClient(resolver.State{Endpoints: endpoints}, xdsC), BalancerConfig: &LBConfig{ Cluster: testClusterName, EDSServiceName: testServiceName, @@ -661,7 +667,7 @@ func (s) TestLoadReporting(t *testing.T) { scs := balancer.SubConnState{ConnectivityState: connectivity.Ready} sca := internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address)) - sca(&scs, addrs[0]) + sca(&scs, endpoints[0].Addresses[0]) sc1.UpdateState(scs) // Test pick with one backend. const successCount = 5 @@ -748,9 +754,9 @@ func (s) TestUpdateLRSServer(t *testing.T) { b := builder.Build(cc, balancer.BuildOptions{}) defer b.Close() - addrs := make([]resolver.Address, len(testBackendAddrs)) - for i, a := range testBackendAddrs { - addrs[i] = xdsinternal.SetLocalityID(a, testLocality) + endpoints := make([]resolver.Endpoint, len(testBackendEndpoints)) + for i, e := range testBackendEndpoints { + endpoints[i] = xdsinternal.SetLocalityIDInEndpoint(e, testLocality) } testLRSServerConfig, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{ URI: "trafficdirector.googleapis.com:443", @@ -760,7 +766,7 @@ func (s) TestUpdateLRSServer(t *testing.T) { t.Fatalf("Failed to create LRS server config for testing: %v", err) } if err := b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC), + ResolverState: xdsclient.SetClient(resolver.State{Endpoints: endpoints}, xdsC), BalancerConfig: &LBConfig{ Cluster: testClusterName, EDSServiceName: testServiceName, @@ -794,7 +800,7 @@ func (s) TestUpdateLRSServer(t *testing.T) { // Update LRS server to a different name. if err := b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC), + ResolverState: xdsclient.SetClient(resolver.State{Endpoints: endpoints}, xdsC), BalancerConfig: &LBConfig{ Cluster: testClusterName, EDSServiceName: testServiceName, @@ -819,7 +825,7 @@ func (s) TestUpdateLRSServer(t *testing.T) { // Update LRS server to nil, to disable LRS. 
 	if err := b.UpdateClientConnState(balancer.ClientConnState{
-		ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
+		ResolverState: xdsclient.SetClient(resolver.State{Endpoints: endpoints}, xdsC),
 		BalancerConfig: &LBConfig{
 			Cluster: testClusterName,
 			EDSServiceName: testServiceName,
@@ -876,7 +882,7 @@ func (s) TestChildPolicyUpdatedOnConfigUpdate(t *testing.T) {
 
 	// Initial config update with childPolicyName1
 	if err := b.UpdateClientConnState(balancer.ClientConnState{
-		ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
+		ResolverState: xdsclient.SetClient(resolver.State{Endpoints: testBackendEndpoints}, xdsC),
 		BalancerConfig: &LBConfig{
 			Cluster: testClusterName,
 			ChildPolicy: &internalserviceconfig.BalancerConfig{
@@ -893,7 +899,7 @@ func (s) TestChildPolicyUpdatedOnConfigUpdate(t *testing.T) {
 
 	// Second config update with childPolicyName2
 	if err := b.UpdateClientConnState(balancer.ClientConnState{
-		ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
+		ResolverState: xdsclient.SetClient(resolver.State{Endpoints: testBackendEndpoints}, xdsC),
 		BalancerConfig: &LBConfig{
 			Cluster: testClusterName,
 			ChildPolicy: &internalserviceconfig.BalancerConfig{
@@ -929,7 +935,7 @@ func (s) TestFailedToParseChildPolicyConfig(t *testing.T) {
 	})
 
 	err := b.UpdateClientConnState(balancer.ClientConnState{
-		ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
+		ResolverState: xdsclient.SetClient(resolver.State{Endpoints: testBackendEndpoints}, xdsC),
 		BalancerConfig: &LBConfig{
 			Cluster: testClusterName,
 			ChildPolicy: &internalserviceconfig.BalancerConfig{
@@ -992,7 +998,7 @@ func (s) TestPickerUpdatedSynchronouslyOnConfigUpdate(t *testing.T) {
 	})
 
 	if err := b.UpdateClientConnState(balancer.ClientConnState{
-		ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
+		ResolverState: xdsclient.SetClient(resolver.State{Endpoints: testBackendEndpoints}, xdsC),
 		BalancerConfig: &LBConfig{
 			Cluster: testClusterName,
 			EDSServiceName: testServiceName,
diff --git a/xds/internal/balancer/clustermanager/clustermanager_test.go b/xds/internal/balancer/clustermanager/clustermanager_test.go
index b606cb9e5e34..dd6baf65b80f 100644
--- a/xds/internal/balancer/clustermanager/clustermanager_test.go
+++ b/xds/internal/balancer/clustermanager/clustermanager_test.go
@@ -96,9 +96,9 @@ func TestClusterPicks(t *testing.T) {
 		{Addr: testBackendAddrStrs[1], BalancerAttributes: nil},
 	}
 	if err := bal.UpdateClientConnState(balancer.ClientConnState{
-		ResolverState: resolver.State{Addresses: []resolver.Address{
-			hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
-			hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}),
+		ResolverState: resolver.State{Endpoints: []resolver.Endpoint{
+			hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{wantAddrs[0]}}, []string{"cds:cluster_1"}),
+			hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{wantAddrs[1]}}, []string{"cds:cluster_2"}),
 		}},
 		BalancerConfig: config1,
 	}); err != nil {
@@ -175,9 +175,9 @@ func TestConfigUpdateAddCluster(t *testing.T) {
 		{Addr: testBackendAddrStrs[1], BalancerAttributes: nil},
 	}
 	if err := bal.UpdateClientConnState(balancer.ClientConnState{
-		ResolverState: resolver.State{Addresses: []resolver.Address{
-			hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
-			hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}),
+		ResolverState: resolver.State{Endpoints: []resolver.Endpoint{
+			hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{wantAddrs[0]}}, []string{"cds:cluster_1"}),
+			hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{wantAddrs[1]}}, []string{"cds:cluster_2"}),
 		}},
 		BalancerConfig: config1,
 	}); err != nil {
@@ -243,10 +243,10 @@ func TestConfigUpdateAddCluster(t *testing.T) {
 	}
 	wantAddrs = append(wantAddrs, resolver.Address{Addr: testBackendAddrStrs[2], BalancerAttributes: nil})
 	if err := bal.UpdateClientConnState(balancer.ClientConnState{
-		ResolverState: resolver.State{Addresses: []resolver.Address{
-			hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
-			hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}),
-			hierarchy.Set(wantAddrs[2], []string{"cds:cluster_3"}),
+		ResolverState: resolver.State{Endpoints: []resolver.Endpoint{
+			hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{wantAddrs[0]}}, []string{"cds:cluster_1"}),
+			hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{wantAddrs[1]}}, []string{"cds:cluster_2"}),
+			hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{wantAddrs[2]}}, []string{"cds:cluster_3"}),
 		}},
 		BalancerConfig: config2,
 	}); err != nil {
@@ -333,9 +333,9 @@ func TestRoutingConfigUpdateDeleteAll(t *testing.T) {
 		{Addr: testBackendAddrStrs[1], BalancerAttributes: nil},
 	}
 	if err := bal.UpdateClientConnState(balancer.ClientConnState{
-		ResolverState: resolver.State{Addresses: []resolver.Address{
-			hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
-			hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}),
+		ResolverState: resolver.State{Endpoints: []resolver.Endpoint{
+			hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{wantAddrs[0]}}, []string{"cds:cluster_1"}),
+			hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{wantAddrs[1]}}, []string{"cds:cluster_2"}),
 		}},
 		BalancerConfig: config1,
 	}); err != nil {
@@ -417,9 +417,9 @@ func TestRoutingConfigUpdateDeleteAll(t *testing.T) {
 
 	// Resend the previous config with clusters
 	if err := bal.UpdateClientConnState(balancer.ClientConnState{
-		ResolverState: resolver.State{Addresses: []resolver.Address{
-			hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
-			hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}),
+		ResolverState: resolver.State{Endpoints: []resolver.Endpoint{
+			hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{wantAddrs[0]}}, []string{"cds:cluster_1"}),
+			hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{wantAddrs[1]}}, []string{"cds:cluster_2"}),
 		}},
 		BalancerConfig: config1,
 	}); err != nil {
@@ -576,8 +576,8 @@ func TestInitialIdle(t *testing.T) {
 		{Addr: testBackendAddrStrs[0], BalancerAttributes: nil},
 	}
 	if err := bal.UpdateClientConnState(balancer.ClientConnState{
-		ResolverState: resolver.State{Addresses: []resolver.Address{
-			hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
+		ResolverState: resolver.State{Endpoints: []resolver.Endpoint{
+			hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{wantAddrs[0]}}, []string{"cds:cluster_1"}),
 		}},
 		BalancerConfig: config1,
 	}); err != nil {
@@ -623,8 +623,8 @@ func TestClusterGracefulSwitch(t *testing.T) {
 		{Addr: testBackendAddrStrs[1], BalancerAttributes: nil},
 	}
 	if err := bal.UpdateClientConnState(balancer.ClientConnState{
-		ResolverState: resolver.State{Addresses: []resolver.Address{
-			hierarchy.Set(wantAddrs[0], []string{"csp:cluster"}),
+		ResolverState: resolver.State{Endpoints: []resolver.Endpoint{
+			hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{wantAddrs[0]}}, []string{"csp:cluster"}),
 		}},
 		BalancerConfig: config1,
 	}); err != nil {
@@ -664,8 +664,8 @@ func TestClusterGracefulSwitch(t *testing.T) {
 		t.Fatalf("failed to parse balancer config: %v", err)
 	}
 	if err := bal.UpdateClientConnState(balancer.ClientConnState{
-		ResolverState: resolver.State{Addresses: []resolver.Address{
-			hierarchy.Set(wantAddrs[1], []string{"csp:cluster"}),
+		ResolverState: resolver.State{Endpoints: []resolver.Endpoint{
+			hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{wantAddrs[1]}}, []string{"csp:cluster"}),
 		}},
 		BalancerConfig: config2,
 	}); err != nil {
@@ -751,8 +751,8 @@ func (s) TestUpdateStatePauses(t *testing.T) {
 		{Addr: testBackendAddrStrs[0], BalancerAttributes: nil},
 	}
 	if err := bal.UpdateClientConnState(balancer.ClientConnState{
-		ResolverState: resolver.State{Addresses: []resolver.Address{
-			hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
+		ResolverState: resolver.State{Endpoints: []resolver.Endpoint{
+			hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{wantAddrs[0]}}, []string{"cds:cluster_1"}),
 		}},
 		BalancerConfig: config1,
 	}); err != nil {
diff --git a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go
index 67ba02c4cbeb..c127e0db19ab 100644
--- a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go
+++ b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go
@@ -1164,8 +1164,8 @@ func waitForProducedZeroAddressesError(ctx context.Context, t *testing.T, client
 			t.Logf("EmptyCall() returned code: %v, want: %v", code, codes.Unavailable)
 			continue
 		}
-		if !strings.Contains(err.Error(), "produced zero addresses") {
-			t.Logf("EmptyCall() = %v, want %v", err, "produced zero addresses")
+		if !strings.Contains(err.Error(), "no children to pick from") {
+			t.Logf("EmptyCall() = %v, want %v", err, "no children to pick from")
 			continue
 		}
 		return nil
diff --git a/xds/internal/balancer/priority/balancer_test.go b/xds/internal/balancer/priority/balancer_test.go
index 79740dc24f71..7183cefc32fc 100644
--- a/xds/internal/balancer/priority/balancer_test.go
+++ b/xds/internal/balancer/priority/balancer_test.go
@@ -20,6 +20,7 @@ package priority
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"testing"
 	"time"
@@ -89,9 +90,9 @@ func (s) TestPriority_HighPriorityReady(t *testing.T) {
 	// Two children, with priorities [0, 1], each with one backend.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[1]}, []string{"child-1"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}}, []string{"child-1"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -123,10 +124,10 @@ func (s) TestPriority_HighPriorityReady(t *testing.T) {
 	// Add p2, it shouldn't cause any updates.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[1]}, []string{"child-1"}),
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[2]}, []string{"child-2"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}}, []string{"child-1"}),
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[2]}}}, []string{"child-2"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -157,9 +158,9 @@ func (s) TestPriority_HighPriorityReady(t *testing.T) {
 	// Remove p2, no updates.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[1]}, []string{"child-1"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}}, []string{"child-1"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -203,9 +204,9 @@ func (s) TestPriority_SwitchPriority(t *testing.T) {
 	t.Log("Two localities, with priorities [0, 1], each with one backend.")
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[1]}, []string{"child-1"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}}, []string{"child-1"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -248,6 +249,7 @@ func (s) TestPriority_SwitchPriority(t *testing.T) {
 		t.Fatalf("sc is created with addr %v, want %v", got, want)
 	}
 	sc1 := <-cc.NewSubConnCh
+	<-sc1.ConnectCh
 	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
 	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
 
@@ -259,10 +261,10 @@ func (s) TestPriority_SwitchPriority(t *testing.T) {
 	t.Log("Add p2, it shouldn't cause any updates.")
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[1]}, []string{"child-1"}),
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[2]}, []string{"child-2"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}}, []string{"child-1"}),
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[2]}}}, []string{"child-2"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -286,7 +288,13 @@ func (s) TestPriority_SwitchPriority(t *testing.T) {
 	}
 
 	t.Log("Turn down 1, use 2.")
-	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
+	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
+	<-sc1.ConnectCh
+	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
+	sc1.UpdateState(balancer.SubConnState{
+		ConnectivityState: connectivity.TransientFailure,
+		ConnectionError:   errors.New("test error"),
+	})
 
 	// Before 2 gets READY, picker should return NoSubConnAvailable, so RPCs
 	// will retry.
@@ -310,9 +318,9 @@ func (s) TestPriority_SwitchPriority(t *testing.T) {
 	t.Log("Remove 2, use 1.")
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[1]}, []string{"child-1"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}}, []string{"child-1"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -328,6 +336,12 @@ func (s) TestPriority_SwitchPriority(t *testing.T) {
 
 	// p2 SubConns are shut down.
 	scToShutdown := <-cc.ShutdownSubConnCh
+	// The same SubConn is closed by gracefulswitch and pickfirstleaf when they
+	// are closed. Remove duplicate events.
+	// TODO: /~https://github.com/grpc/grpc-go/issues/6472 - Remove this
+	// workaround once pickfirst is the only leaf policy and responsible for
+	// shutting down SubConns.
+	<-cc.ShutdownSubConnCh
 	if scToShutdown != sc2 {
 		t.Fatalf("ShutdownSubConn, want %v, got %v", sc2, scToShutdown)
 	}
@@ -366,9 +380,9 @@ func (s) TestPriority_HighPriorityToConnectingFromReady(t *testing.T) {
 	// Two localities, with priorities [0, 1], each with one backend.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[1]}, []string{"child-1"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}}, []string{"child-1"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -426,6 +440,12 @@ func (s) TestPriority_HighPriorityToConnectingFromReady(t *testing.T) {
 
 	// p1 subconn should be shut down.
 	scToShutdown := <-cc.ShutdownSubConnCh
+	// The same SubConn is closed by gracefulswitch and pickfirstleaf when they
+	// are closed. Remove duplicate events.
+	// TODO: /~https://github.com/grpc/grpc-go/issues/6472 - Remove this
+	// workaround once pickfirst is the only leaf policy and responsible for
+	// shutting down SubConns.
+	<-cc.ShutdownSubConnCh
 	if scToShutdown != sc1 {
 		t.Fatalf("ShutdownSubConn, want %v, got %v", sc0, scToShutdown)
 	}
@@ -450,9 +470,9 @@ func (s) TestPriority_HigherDownWhileAddingLower(t *testing.T) {
 	// Two localities, with different priorities, each with one backend.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[1]}, []string{"child-1"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}}, []string{"child-1"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -499,10 +519,10 @@ func (s) TestPriority_HigherDownWhileAddingLower(t *testing.T) {
 	t.Log("Add p2, it should create a new SubConn.")
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[1]}, []string{"child-1"}),
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[2]}, []string{"child-2"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}}, []string{"child-1"}),
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[2]}}}, []string{"child-2"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -551,10 +571,10 @@ func (s) TestPriority_HigherReadyCloseAllLower(t *testing.T) {
 	// Three localities, with priorities [0,1,2], each with one backend.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[1]}, []string{"child-1"}),
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[2]}, []string{"child-2"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}}, []string{"child-1"}),
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[2]}}}, []string{"child-2"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -617,7 +637,17 @@ func (s) TestPriority_HigherReadyCloseAllLower(t *testing.T) {
 	//
 	// With localities caching, the lower priorities are closed after a timeout,
 	// in goroutines. The order is no longer guaranteed.
-	scToShutdown := []balancer.SubConn{<-cc.ShutdownSubConnCh, <-cc.ShutdownSubConnCh}
+	// The same SubConn is closed by gracefulswitch and pickfirstleaf when they
+	// are closed. Remove duplicate events.
+	// TODO: /~https://github.com/grpc/grpc-go/issues/6472 - Remove this
+	// workaround once pickfirst is the only leaf policy and responsible for
+	// shutting down SubConns.
+	scToShutdown := [2]balancer.SubConn{}
+	scToShutdown[0] = <-cc.ShutdownSubConnCh
+	<-cc.ShutdownSubConnCh
+	scToShutdown[1] = <-cc.ShutdownSubConnCh
+	<-cc.ShutdownSubConnCh
+
 	if !(scToShutdown[0] == sc1 && scToShutdown[1] == sc2) && !(scToShutdown[0] == sc2 && scToShutdown[1] == sc1) {
 		t.Errorf("ShutdownSubConn, want [%v, %v], got %v", sc1, sc2, scToShutdown)
 	}
@@ -653,9 +683,9 @@ func (s) TestPriority_InitTimeout(t *testing.T) {
 	// Two localities, with different priorities, each with one backend.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[1]}, []string{"child-1"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}}, []string{"child-1"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -724,9 +754,9 @@ func (s) TestPriority_RemovesAllPriorities(t *testing.T) {
 	// Two localities, with different priorities, each with one backend.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[1]}, []string{"child-1"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}}, []string{"child-1"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -768,6 +798,12 @@ func (s) TestPriority_RemovesAllPriorities(t *testing.T) {
 
 	// p0 subconn should be shut down.
 	scToShutdown := <-cc.ShutdownSubConnCh
+	// The same SubConn is closed by gracefulswitch and pickfirstleaf when they
+	// are closed. Remove duplicate events.
+	// TODO: /~https://github.com/grpc/grpc-go/issues/6472 - Remove this
+	// workaround once pickfirst is the only leaf policy and responsible for
+	// shutting down SubConns.
+	<-cc.ShutdownSubConnCh
 	if scToShutdown != sc0 {
 		t.Fatalf("ShutdownSubConn, want %v, got %v", sc0, scToShutdown)
 	}
@@ -780,9 +816,9 @@ func (s) TestPriority_RemovesAllPriorities(t *testing.T) {
 	// Re-add two localities, with previous priorities, but different backends.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[2]}, []string{"child-0"}),
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[3]}, []string{"child-1"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[2]}}}, []string{"child-0"}),
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[3]}}}, []string{"child-1"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -823,8 +859,8 @@ func (s) TestPriority_RemovesAllPriorities(t *testing.T) {
 	// Remove p1, to fallback to p0.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[2]}, []string{"child-0"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[2]}}}, []string{"child-0"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -839,6 +875,12 @@ func (s) TestPriority_RemovesAllPriorities(t *testing.T) {
 
 	// p1 subconn should be shut down.
 	scToShutdown1 := <-cc.ShutdownSubConnCh
+	// The same SubConn is closed by gracefulswitch and pickfirstleaf when they
+	// are closed. Remove duplicate events.
+	// TODO: /~https://github.com/grpc/grpc-go/issues/6472 - Remove this
+	// workaround once pickfirst is the only leaf policy and responsible for
+	// shutting down SubConns.
+	<-cc.ShutdownSubConnCh
 	if scToShutdown1 != sc11 {
 		t.Fatalf("ShutdownSubConn, want %v, got %v", sc11, scToShutdown1)
 	}
@@ -883,9 +925,9 @@ func (s) TestPriority_HighPriorityNoEndpoints(t *testing.T) {
 	// Two localities, with priorities [0, 1], each with one backend.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[1]}, []string{"child-1"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}}, []string{"child-1"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -917,8 +959,8 @@ func (s) TestPriority_HighPriorityNoEndpoints(t *testing.T) {
 	// Remove addresses from priority 0, should use p1.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[1]}, []string{"child-1"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}}, []string{"child-1"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -975,8 +1017,8 @@ func (s) TestPriority_FirstPriorityUnavailable(t *testing.T) {
 	// One localities, with priorities [0], each with one backend.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -1021,9 +1063,9 @@ func (s) TestPriority_MoveChildToHigherPriority(t *testing.T) {
 	// Two children, with priorities [0, 1], each with one backend.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[1]}, []string{"child-1"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}}, []string{"child-1"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -1056,9 +1098,9 @@ func (s) TestPriority_MoveChildToHigherPriority(t *testing.T) {
 	// higher priority, and be used. The old SubConn should be closed.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[1]}, []string{"child-1"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}}, []string{"child-1"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -1082,6 +1124,12 @@ func (s) TestPriority_MoveChildToHigherPriority(t *testing.T) {
 
 	// Old subconn should be shut down.
 	scToShutdown := <-cc.ShutdownSubConnCh
+	// The same SubConn is closed by gracefulswitch and pickfirstleaf when they
+	// are closed. Remove duplicate events.
+	// TODO: /~https://github.com/grpc/grpc-go/issues/6472 - Remove this
+	// workaround once pickfirst is the only leaf policy and responsible for
+	// shutting down SubConns.
+	<-cc.ShutdownSubConnCh
 	if scToShutdown != sc1 {
 		t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scToShutdown)
 	}
@@ -1118,9 +1166,9 @@ func (s) TestPriority_MoveReadyChildToHigherPriority(t *testing.T) {
 	// Two children, with priorities [0, 1], each with one backend.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[1]}, []string{"child-1"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}}, []string{"child-1"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -1165,9 +1213,9 @@ func (s) TestPriority_MoveReadyChildToHigherPriority(t *testing.T) {
 	// higher priority, and be used. The old SubConn should be closed.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[1]}, []string{"child-1"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}}, []string{"child-1"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -1183,6 +1231,12 @@ func (s) TestPriority_MoveReadyChildToHigherPriority(t *testing.T) {
 
 	// Old subconn from child-0 should be removed.
 	scToShutdown := <-cc.ShutdownSubConnCh
+	// The same SubConn is closed by gracefulswitch and pickfirstleaf when they
+	// are closed. Remove duplicate events.
+	// TODO: /~https://github.com/grpc/grpc-go/issues/6472 - Remove this
+	// workaround once pickfirst is the only leaf policy and responsible for
+	// shutting down SubConns.
+	<-cc.ShutdownSubConnCh
 	if scToShutdown != sc0 {
 		t.Fatalf("ShutdownSubConn, want %v, got %v", sc0, scToShutdown)
 	}
@@ -1214,9 +1268,9 @@ func (s) TestPriority_RemoveReadyLowestChild(t *testing.T) {
 	// Two children, with priorities [0, 1], each with one backend.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[1]}, []string{"child-1"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}}, []string{"child-1"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -1237,7 +1291,10 @@ func (s) TestPriority_RemoveReadyLowestChild(t *testing.T) {
 	sc0 := <-cc.NewSubConnCh
 
 	// p0 is down.
-	sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
+	sc0.UpdateState(balancer.SubConnState{
+		ConnectivityState: connectivity.TransientFailure,
+		ConnectionError:   errors.New("test error"),
+	})
 	// Before 1 gets READY, picker should return NoSubConnAvailable, so RPCs
 	// will retry.
 	if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
@@ -1260,8 +1317,8 @@ func (s) TestPriority_RemoveReadyLowestChild(t *testing.T) {
 	// Remove child with p1, the child at higher priority should now be used.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -1276,6 +1333,12 @@ func (s) TestPriority_RemoveReadyLowestChild(t *testing.T) {
 
 	// Old subconn from child-1 should be shut down.
 	scToShutdown := <-cc.ShutdownSubConnCh
+	// The same SubConn is closed by gracefulswitch and pickfirstleaf when they
+	// are closed. Remove duplicate events.
+	// TODO: /~https://github.com/grpc/grpc-go/issues/6472 - Remove this
+	// workaround once pickfirst is the only leaf policy and responsible for
+	// shutting down SubConns.
+	<-cc.ShutdownSubConnCh
 	if scToShutdown != sc1 {
 		t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scToShutdown)
 	}
@@ -1318,8 +1381,8 @@ func (s) TestPriority_ReadyChildRemovedButInCache(t *testing.T) {
 	// One children, with priorities [0], with one backend.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -1372,8 +1435,8 @@ func (s) TestPriority_ReadyChildRemovedButInCache(t *testing.T) {
 	// Re-add the child, shouldn't create new connections.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -1417,8 +1480,8 @@ func (s) TestPriority_ChildPolicyChange(t *testing.T) {
 	// One children, with priorities [0], with one backend.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -1450,8 +1513,8 @@ func (s) TestPriority_ChildPolicyChange(t *testing.T) {
 	// name).
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -1466,6 +1529,12 @@ func (s) TestPriority_ChildPolicyChange(t *testing.T) {
 
 	// Old subconn should be shut down.
 	scToShutdown := <-cc.ShutdownSubConnCh
+	// The same SubConn is closed by gracefulswitch and pickfirstleaf when they
+	// are closed. Remove duplicate events.
+	// TODO: /~https://github.com/grpc/grpc-go/issues/6472 - Remove this
+	// workaround once pickfirst is the only leaf policy and responsible for
+	// shutting down SubConns.
+	<-cc.ShutdownSubConnCh
 	if scToShutdown != sc1 {
 		t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scToShutdown)
 	}
@@ -1516,8 +1585,8 @@ func (s) TestPriority_ChildPolicyUpdatePickerInline(t *testing.T) {
 	// One children, with priorities [0], with one backend.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -1560,8 +1629,8 @@ func (s) TestPriority_IgnoreReresolutionRequest(t *testing.T) {
 	// ignored.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -1600,8 +1669,8 @@ func (s) TestPriority_IgnoreReresolutionRequest(t *testing.T) {
 	// Send another update to set IgnoreReresolutionRequests to false.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -1659,9 +1728,9 @@ func (s) TestPriority_IgnoreReresolutionRequestTwoChildren(t *testing.T) {
 	// Reresolution is ignored for p0.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[1]}, []string{"child-1"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}}, []string{"child-1"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -1748,7 +1817,7 @@ func init() {
 			})
 		}
-		sc, err := bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{StateListener: lis})
+		sc, err := bd.ClientConn.NewSubConn(opts.ResolverState.Endpoints[0].Addresses, balancer.NewSubConnOptions{StateListener: lis})
 		if err != nil {
 			return err
 		}
@@ -1780,9 +1849,9 @@ func (s) TestPriority_HighPriorityInitIdle(t *testing.T) {
 	// Two children, with priorities [0, 1], each with one backend.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[1]}, []string{"child-1"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}}, []string{"child-1"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -1846,8 +1915,8 @@ func (s) TestPriority_AddLowPriorityWhenHighIsInIdle(t *testing.T) {
 	// One child, with priorities [0], one backend.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -1875,9 +1944,9 @@ func (s) TestPriority_AddLowPriorityWhenHighIsInIdle(t *testing.T) {
 	// Add 1, should keep using 0.
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[1]}, []string{"child-1"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}}, []string{"child-1"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -1927,9 +1996,9 @@ func (s) TestPriority_HighPriorityUpdatesWhenLowInUse(t *testing.T) {
 	t.Log("Two localities, with priorities [0, 1], each with one backend.")
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[1]}, []string{"child-1"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, []string{"child-0"}),
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}}, []string{"child-1"}),
 			},
 		},
 		BalancerConfig: &LBConfig{
@@ -1985,9 +2054,9 @@ func (s) TestPriority_HighPriorityUpdatesWhenLowInUse(t *testing.T) {
 	t.Log("Change p0 to use new address.")
 	if err := pb.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{
-			Addresses: []resolver.Address{
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[2]}, []string{"child-0"}),
-				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[3]}, []string{"child-1"}),
+			Endpoints: []resolver.Endpoint{
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[2]}}}, []string{"child-0"}),
+				hierarchy.SetInEndpoint(resolver.Endpoint{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[3]}}}, []string{"child-1"}),
 			},
 		},
 		BalancerConfig: &LBConfig{