diff --git a/go.mod b/go.mod index c40d0e3a82..485f992594 100644 --- a/go.mod +++ b/go.mod @@ -70,7 +70,6 @@ require ( go.etcd.io/etcd/client/pkg/v3 v3.6.0-alpha.0 go.etcd.io/etcd/client/v3 v3.6.0-alpha.0 go.etcd.io/etcd/server/v3 v3.6.0-alpha.0 - go.etcd.io/etcd/tests/v3 v3.6.0-alpha.0 go.opentelemetry.io/collector v0.45.0 go.opentelemetry.io/otel v1.4.1 go.opentelemetry.io/otel/bridge/opentracing v1.4.1 @@ -121,7 +120,6 @@ require ( github.com/go-playground/locales v0.13.0 // indirect github.com/go-playground/universal-translator v0.17.0 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect - github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/google/btree v1.0.1 // indirect github.com/gorilla/handlers v1.5.1 // indirect github.com/gorilla/websocket v1.4.2 // indirect diff --git a/go.sum b/go.sum index 770fa1c583..22fa99d1b7 100644 --- a/go.sum +++ b/go.sum @@ -1592,8 +1592,6 @@ go.etcd.io/etcd/raft/v3 v3.6.0-alpha.0 h1:BQ6CnNP4pIpy5rusFlTBxAacDgPXhuiHFwoTsB go.etcd.io/etcd/raft/v3 v3.6.0-alpha.0/go.mod h1:/kZdrBXlc5fUgYXfIEQ0B5sb7ejXPKbtF4jWzF1exiQ= go.etcd.io/etcd/server/v3 v3.6.0-alpha.0 h1:BQUVqBqNFZZyrRbfydrRLzq9hYvCcRj97SsX1YwD7CA= go.etcd.io/etcd/server/v3 v3.6.0-alpha.0/go.mod h1:3QM2rLq3B3hSXmVEvgVt3vEEbG/AumSs0Is7EgrlKzU= -go.etcd.io/etcd/tests/v3 v3.6.0-alpha.0 h1:3qrZ3p/E7CxdV1kKtAU75hHOcUoXcSTwC7ELKWyzMJo= -go.etcd.io/etcd/tests/v3 v3.6.0-alpha.0/go.mod h1:hFQkP/cTsZIXXvUv+BsGHZ3TK+76XZMi5GToYA94iac= go.mongodb.org/mongo-driver v1.0.3/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.mongodb.org/mongo-driver v1.1.1/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.mongodb.org/mongo-driver v1.1.2/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= diff --git a/src/aggregator/integration/custom_aggregations_test.go b/src/aggregator/integration/custom_aggregations_test.go index d9c58cbe41..6bae441f46 100644 --- a/src/aggregator/integration/custom_aggregations_test.go +++ b/src/aggregator/integration/custom_aggregations_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2016 Uber Technologies, Inc. // @@ -68,7 +68,6 @@ func testCustomAggregations(t *testing.T, metadataFns [4]metadataFn) { if testing.Short() { t.SkipNow() } - aggTypesOpts := aggregation.NewTypesOptions(). SetCounterTypeStringTransformFn(aggregation.SuffixTransform). SetTimerTypeStringTransformFn(aggregation.SuffixTransform). @@ -179,7 +178,7 @@ func testCustomAggregations(t *testing.T, metadataFns [4]metadataFn) { // must be the longer than the lowest resolution across all policies. finalTime := end.Add(6 * time.Second) clock.SetNow(finalTime) - time.Sleep(6 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/election.go b/src/aggregator/integration/election.go index 1f24e02263..221996347d 100644 --- a/src/aggregator/integration/election.go +++ b/src/aggregator/integration/election.go @@ -26,9 +26,9 @@ import ( "github.com/m3db/m3/src/cluster/services" "github.com/m3db/m3/src/cluster/services/leader" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" ) var ( @@ -40,27 +40,38 @@ var ( ) type testCluster struct { - t *testing.T - cluster *integration.Cluster + t *testing.T + cluster *integration.Cluster + leaderService services.LeaderService } func newTestCluster(t *testing.T) *testCluster { integration.BeforeTestExternal(t) - return &testCluster{ + cluster := &testCluster{ t: t, cluster: integration.NewCluster(t, &integration.ClusterConfig{ Size: testClusterSize, + // UseBridge: true, }), } + return cluster } func (tc *testCluster) LeaderService() services.LeaderService { + if tc.leaderService != nil { + return tc.leaderService + } + svc, err := leader.NewService(tc.etcdClient(), tc.options()) require.NoError(tc.t, err) - return svc + tc.leaderService = svc + return tc.leaderService } func (tc *testCluster) Close() { + if tc.leaderService != nil { + require.NoError(tc.t, tc.leaderService.Close()) + } tc.cluster.Terminate(tc.t) } diff --git a/src/aggregator/integration/metadata_change_test.go b/src/aggregator/integration/metadata_change_test.go index fa93ce7baf..9822579101 100644 --- a/src/aggregator/integration/metadata_change_test.go +++ b/src/aggregator/integration/metadata_change_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2016 Uber Technologies, Inc. // @@ -138,7 +138,7 @@ func testMetadataChange(t *testing.T, oldMetadataFn, newMetadataFn metadataFn) { // must be the longer than the lowest resolution across all policies. finalTime := end.Add(6 * time.Second) clock.SetNow(finalTime) - time.Sleep(6 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/multi_client_one_type_test.go b/src/aggregator/integration/multi_client_one_type_test.go index d05185a9ab..2af620205f 100644 --- a/src/aggregator/integration/multi_client_one_type_test.go +++ b/src/aggregator/integration/multi_client_one_type_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2016 Uber Technologies, Inc. // @@ -126,7 +126,7 @@ func testMultiClientOneType(t *testing.T, metadataFn metadataFn) { // must be the longer than the lowest resolution across all policies. finalTime := stop.Add(6 * time.Second) clock.SetNow(finalTime) - time.Sleep(4 * time.Second) + time.Sleep(waitForDataToFlush) for i := 0; i < numClients; i++ { require.NoError(t, clients[i].close()) diff --git a/src/aggregator/integration/multi_server_forwarding_pipeline_test.go b/src/aggregator/integration/multi_server_forwarding_pipeline_test.go index ec83c8f362..fb1cdbd407 100644 --- a/src/aggregator/integration/multi_server_forwarding_pipeline_test.go +++ b/src/aggregator/integration/multi_server_forwarding_pipeline_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2018 Uber Technologies, Inc. // diff --git a/src/aggregator/integration/multi_server_resend_test.go b/src/aggregator/integration/multi_server_resend_test.go index 6b7c2fb71e..7833e8d991 100644 --- a/src/aggregator/integration/multi_server_resend_test.go +++ b/src/aggregator/integration/multi_server_resend_test.go @@ -1,5 +1,4 @@ //go:build integration -// +build integration // Copyright (c) 2018 Uber Technologies, Inc. // @@ -142,6 +141,7 @@ func TestMultiServerResendAggregatedValues(t *testing.T) { // Election cluster setup. electionCluster := newTestCluster(t) + defer electionCluster.Close() // Sharding function maps all metrics to shard 0 except for the rollup metric, // which gets mapped to the last shard. diff --git a/src/aggregator/integration/one_client_multi_type_forwarded_test.go b/src/aggregator/integration/one_client_multi_type_forwarded_test.go index 9ead187784..039eeae391 100644 --- a/src/aggregator/integration/one_client_multi_type_forwarded_test.go +++ b/src/aggregator/integration/one_client_multi_type_forwarded_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2018 Uber Technologies, Inc. // @@ -121,7 +121,7 @@ func TestOneClientMultiTypeForwardedMetrics(t *testing.T) { // Move time forward and wait for flushing to happen. finalTime := stop.Add(2 * time.Second) clock.SetNow(finalTime) - time.Sleep(2 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/one_client_multi_type_timed_test.go b/src/aggregator/integration/one_client_multi_type_timed_test.go index 8449f5858b..80f8d6ec3f 100644 --- a/src/aggregator/integration/one_client_multi_type_timed_test.go +++ b/src/aggregator/integration/one_client_multi_type_timed_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2018 Uber Technologies, Inc. // @@ -119,7 +119,7 @@ func TestOneClientMultiTypeTimedMetrics(t *testing.T) { // Move time forward and wait for flushing to happen. finalTime := stop.Add(time.Minute + 2*time.Second) clock.SetNow(finalTime) - time.Sleep(2 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/one_client_multi_type_untimed_test.go b/src/aggregator/integration/one_client_multi_type_untimed_test.go index 0af650f26e..0adc3a5c2d 100644 --- a/src/aggregator/integration/one_client_multi_type_untimed_test.go +++ b/src/aggregator/integration/one_client_multi_type_untimed_test.go @@ -26,9 +26,17 @@ import ( "testing" "time" + "github.com/m3db/m3/src/cluster/placement" + "github.com/stretchr/testify/require" +) - "github.com/m3db/m3/src/cluster/placement" +const ( + // waitForDataToFlush is the amount of time we will wait in these tests between finishing writing data to + // the aggregator, and attempting to assert that data went through. + // The aggregator generally, and these tests specifically are quite sensitive to time. + // The tests probably need a bit of a rethink to wait on (or poll for) an actual condition instead of sleeping. + waitForDataToFlush = 10 * time.Second ) func TestOneClientMultiTypeUntimedMetricsWithStagedMetadatas(t *testing.T) { @@ -114,7 +122,7 @@ func testOneClientMultiType(t *testing.T, metadataFn metadataFn) { // must be the longer than the lowest resolution across all policies. finalTime := stop.Add(6 * time.Second) clock.SetNow(finalTime) - time.Sleep(4 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/one_client_passthru_test.go b/src/aggregator/integration/one_client_passthru_test.go index 75a6f49e25..489730a123 100644 --- a/src/aggregator/integration/one_client_passthru_test.go +++ b/src/aggregator/integration/one_client_passthru_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2020 Uber Technologies, Inc. // diff --git a/src/aggregator/integration/placement_change_test.go b/src/aggregator/integration/placement_change_test.go index 99683d7f3d..9f798b0ee1 100644 --- a/src/aggregator/integration/placement_change_test.go +++ b/src/aggregator/integration/placement_change_test.go @@ -1,5 +1,4 @@ //go:build integration -// +build integration // Copyright (c) 2018 Uber Technologies, Inc. // @@ -227,9 +226,9 @@ func TestPlacementChange(t *testing.T) { } clock.SetNow(start2) - time.Sleep(6 * time.Second) + time.Sleep(waitForDataToFlush) setPlacement(t, placementKey, clusterClient, finalPlacement) - time.Sleep(6 * time.Second) + time.Sleep(waitForDataToFlush) for _, data := range datasets[1] { clock.SetNow(data.timestamp) @@ -245,7 +244,7 @@ func TestPlacementChange(t *testing.T) { // Move time forward and wait for flushing to happen. clock.SetNow(finalTime) - time.Sleep(6 * time.Second) + time.Sleep(waitForDataToFlush) // Remove all the topic consumers before closing clients and servers. This allows to close the // connections between servers while they still are running. Otherwise, during server shutdown, diff --git a/src/aggregator/integration/resend_stress_test.go b/src/aggregator/integration/resend_stress_test.go index c1510ac7ee..691edad117 100644 --- a/src/aggregator/integration/resend_stress_test.go +++ b/src/aggregator/integration/resend_stress_test.go @@ -1,5 +1,4 @@ //go:build integration -// +build integration // Copyright (c) 2018 Uber Technologies, Inc. // diff --git a/src/aggregator/integration/same_id_multi_type_test.go b/src/aggregator/integration/same_id_multi_type_test.go index 09974adb2d..fe6b70d999 100644 --- a/src/aggregator/integration/same_id_multi_type_test.go +++ b/src/aggregator/integration/same_id_multi_type_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2016 Uber Technologies, Inc. // @@ -138,7 +138,7 @@ func testSameIDMultiType(t *testing.T, metadataFn metadataFn) { // must be the longer than the lowest resolution across all policies. finalTime := stop.Add(6 * time.Second) clock.SetNow(finalTime) - time.Sleep(4 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/setup.go b/src/aggregator/integration/setup.go index b0723392dc..3ed6726d6e 100644 --- a/src/aggregator/integration/setup.go +++ b/src/aggregator/integration/setup.go @@ -88,6 +88,7 @@ type testServerSetup struct { // Signals. doneCh chan struct{} closedCh chan struct{} + stopped bool } func newTestServerSetup(t *testing.T, opts testServerOptions) *testServerSetup { @@ -448,6 +449,10 @@ func (ts *testServerSetup) sortedResults() []aggregated.MetricWithStoragePolicy } func (ts *testServerSetup) stopServer() error { + if ts.stopped { + return nil + } + ts.stopped = true if err := ts.aggregator.Close(); err != nil { return err } @@ -460,6 +465,9 @@ func (ts *testServerSetup) stopServer() error { func (ts *testServerSetup) close() { ts.electionCluster.Close() + if err := ts.stopServer(); err != nil { + panic(err.Error()) + } } func (tss testServerSetups) newClient(t *testing.T) *client { diff --git a/src/cluster/etcd/watchmanager/manager_test.go b/src/cluster/etcd/watchmanager/manager_test.go index d65758e8ff..ddc8df306c 100644 --- a/src/cluster/etcd/watchmanager/manager_test.go +++ b/src/cluster/etcd/watchmanager/manager_test.go @@ -22,16 +22,15 @@ package watchmanager import ( "fmt" - "runtime" "sync/atomic" "testing" "time" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" "golang.org/x/net/context" "github.com/m3db/m3/src/x/clock" @@ -164,114 +163,117 @@ func TestWatchRecreate(t *testing.T) { <-doneCh } -func TestWatchNoLeader(t *testing.T) { - t.Skip("flaky, started to fail very consistently on CI") - const ( - watchInitAndRetryDelay = 200 * time.Millisecond - watchCheckInterval = 50 * time.Millisecond - ) - - integration.BeforeTestExternal(t) - ecluster := integration.NewCluster(t, &integration.ClusterConfig{Size: 3}) - defer ecluster.Terminate(t) - - var ( - ec = ecluster.Client(0) - tickDuration = 10 * time.Millisecond - electionTimeout = time.Duration(3*ecluster.Members[0].ElectionTicks) * tickDuration - doneCh = make(chan struct{}, 1) - eventLog = []*clientv3.Event{} - updateCalled int32 - shouldStop int32 - ) - - opts := NewOptions(). - SetClient(ec). - SetUpdateFn( - func(_ string, e []*clientv3.Event) error { - atomic.AddInt32(&updateCalled, 1) - if len(e) > 0 { - eventLog = append(eventLog, e...) - } - return nil - }, - ). - SetTickAndStopFn( - func(string) bool { - if atomic.LoadInt32(&shouldStop) == 0 { - return false - } - - close(doneCh) - - return true - }, - ). - SetWatchChanInitTimeout(watchInitAndRetryDelay). - SetWatchChanResetInterval(watchInitAndRetryDelay). - SetWatchChanCheckInterval(watchCheckInterval) - - integration.WaitClientV3(t, ec) - - wh, err := NewWatchManager(opts) - require.NoError(t, err) - - go wh.Watch("foo") - - runtime.Gosched() - time.Sleep(10 * time.Millisecond) - - // there should be a valid watch now, trigger a notification - _, err = ec.Put(context.Background(), "foo", "bar") - require.NoError(t, err) - - leaderIdx := ecluster.WaitLeader(t) - require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Members), "got invalid leader") - - // simulate quorum loss - ecluster.Members[1].Stop(t) - ecluster.Members[2].Stop(t) - - // wait for election timeout, then member[0] will not have a leader. - time.Sleep(electionTimeout) - - require.NoError(t, ecluster.Members[1].Restart(t)) - require.NoError(t, ecluster.Members[2].Restart(t)) - - // wait for leader + election delay just in case - time.Sleep(time.Duration(3*ecluster.Members[0].ElectionTicks) * tickDuration) - - leaderIdx = ecluster.WaitLeader(t) - require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Members), "got invalid leader") - integration.WaitClientV3(t, ec) // wait for client to be ready again - - _, err = ec.Put(context.Background(), "foo", "baz") - require.NoError(t, err) - - // give some time for watch to be updated - require.True(t, clock.WaitUntil(func() bool { - return atomic.LoadInt32(&updateCalled) >= 2 - }, 10*time.Second)) - - updates := atomic.LoadInt32(&updateCalled) - if updates < 2 { - require.Fail(t, - "insufficient update calls", - "expected at least 2 update attempts, got %d during a partition", - updates) - } - - atomic.AddInt32(&shouldStop, 1) - <-doneCh - - require.Len(t, eventLog, 2) - require.NotNil(t, eventLog[0]) - require.Equal(t, eventLog[0].Kv.Key, []byte("foo")) - require.Equal(t, eventLog[0].Kv.Value, []byte("bar")) - require.NotNil(t, eventLog[1]) - require.Equal(t, eventLog[1].Kv.Key, []byte("foo")) - require.Equal(t, eventLog[1].Kv.Value, []byte("baz")) -} +// TODO: this test has been skipped for a while, and now doesn't work with the docker based etcd integration package. +// Revive it if it's useful, and make it no longer flake. +//nolint:gocritic +//func TestWatchNoLeader(t *testing.T) { +// t.Skip("flaky, started to fail very consistently on CI") +// const ( +// watchInitAndRetryDelay = 200 * time.Millisecond +// watchCheckInterval = 50 * time.Millisecond +// ) +// +// integration.BeforeTestExternal(t) +// ecluster := integration.NewCluster(t, &integration.ClusterConfig{Size: 3}) +// defer ecluster.Terminate(t) +// +// var ( +// ec = ecluster.Client(0) +// tickDuration = 10 * time.Millisecond +// electionTimeout = time.Duration(3*ecluster.Address[0].ElectionTicks) * tickDuration +// doneCh = make(chan struct{}, 1) +// eventLog = []*clientv3.Event{} +// updateCalled int32 +// shouldStop int32 +// ) +// +// opts := NewOptions(). +// SetClient(ec). +// SetUpdateFn( +// func(_ string, e []*clientv3.Event) error { +// atomic.AddInt32(&updateCalled, 1) +// if len(e) > 0 { +// eventLog = append(eventLog, e...) +// } +// return nil +// }, +// ). +// SetTickAndStopFn( +// func(string) bool { +// if atomic.LoadInt32(&shouldStop) == 0 { +// return false +// } +// +// close(doneCh) +// +// return true +// }, +// ). +// SetWatchChanInitTimeout(watchInitAndRetryDelay). +// SetWatchChanResetInterval(watchInitAndRetryDelay). +// SetWatchChanCheckInterval(watchCheckInterval) +// +// integration.WaitClientV3(t, ec) +// +// wh, err := NewWatchManager(opts) +// require.NoError(t, err) +// +// go wh.Watch("foo") +// +// runtime.Gosched() +// time.Sleep(10 * time.Millisecond) +// +// // there should be a valid watch now, trigger a notification +// _, err = ec.Put(context.Background(), "foo", "bar") +// require.NoError(t, err) +// +// leaderIdx := ecluster.WaitLeader(t) +// require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Address), "got invalid leader") +// +// // simulate quorum loss +// ecluster.Address[1].Stop(t) +// ecluster.Address[2].Stop(t) +// +// // wait for election timeout, then member[0] will not have a leader. +// time.Sleep(electionTimeout) +// +// require.NoError(t, ecluster.Address[1].Restart(t)) +// require.NoError(t, ecluster.Address[2].Restart(t)) +// +// // wait for leader + election delay just in case +// time.Sleep(time.Duration(3*ecluster.Address[0].ElectionTicks) * tickDuration) +// +// leaderIdx = ecluster.WaitLeader(t) +// require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Address), "got invalid leader") +// integration.WaitClientV3(t, ec) // wait for client to be ready again +// +// _, err = ec.Put(context.Background(), "foo", "baz") +// require.NoError(t, err) +// +// // give some time for watch to be updated +// require.True(t, clock.WaitUntil(func() bool { +// return atomic.LoadInt32(&updateCalled) >= 2 +// }, 10*time.Second)) +// +// updates := atomic.LoadInt32(&updateCalled) +// if updates < 2 { +// require.Fail(t, +// "insufficient update calls", +// "expected at least 2 update attempts, got %d during a partition", +// updates) +// } +// +// atomic.AddInt32(&shouldStop, 1) +// <-doneCh +// +// require.Len(t, eventLog, 2) +// require.NotNil(t, eventLog[0]) +// require.Equal(t, eventLog[0].Kv.Key, []byte("foo")) +// require.Equal(t, eventLog[0].Kv.Value, []byte("bar")) +// require.NotNil(t, eventLog[1]) +// require.Equal(t, eventLog[1].Kv.Key, []byte("foo")) +// require.Equal(t, eventLog[1].Kv.Value, []byte("baz")) +//} func TestWatchCompactedRevision(t *testing.T) { wh, ec, updateCalled, shouldStop, doneCh, closer := testSetup(t) diff --git a/src/cluster/integration/etcd/etcd.go b/src/cluster/integration/etcd/etcd.go index 2c93bf1200..b836e36967 100644 --- a/src/cluster/integration/etcd/etcd.go +++ b/src/cluster/integration/etcd/etcd.go @@ -21,26 +21,24 @@ package etcd import ( + "context" "encoding/json" "fmt" "io/ioutil" "net/http" - "net/url" - "os" "strings" - "time" "github.com/m3db/m3/src/cluster/client" etcdclient "github.com/m3db/m3/src/cluster/client/etcd" "github.com/m3db/m3/src/cluster/services" - xclock "github.com/m3db/m3/src/x/clock" - "github.com/m3db/m3/src/x/errors" + "github.com/m3db/m3/src/integration/resources/docker/dockerexternal" + "github.com/m3db/m3/src/x/instrument" - "go.etcd.io/etcd/server/v3/embed" + "github.com/ory/dockertest/v3" ) type embeddedKV struct { - etcd *embed.Etcd + etcd *dockerexternal.EtcdNode opts Options dir string } @@ -51,12 +49,12 @@ func New(opts Options) (EmbeddedKV, error) { if err != nil { return nil, err } - cfg := embed.NewConfig() - cfg.Dir = dir - cfg.Logger = "zap" - setRandomPorts(cfg) - e, err := embed.StartEtcd(cfg) + pool, err := dockertest.NewPool("") + if err != nil { + return nil, fmt.Errorf("constructing dockertest.Pool for EmbeddedKV: %w", err) + } + e, err := dockerexternal.NewEtcd(pool, instrument.NewOptions()) if err != nil { return nil, fmt.Errorf("unable to start etcd, err: %v", err) } @@ -67,56 +65,16 @@ func New(opts Options) (EmbeddedKV, error) { }, nil } -func setRandomPorts(cfg *embed.Config) { - randomPortURL, err := url.Parse("http://localhost:0") - if err != nil { - panic(err.Error()) - } - - cfg.LPUrls = []url.URL{*randomPortURL} - cfg.APUrls = []url.URL{*randomPortURL} - cfg.LCUrls = []url.URL{*randomPortURL} - cfg.ACUrls = []url.URL{*randomPortURL} - - cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) -} - func (e *embeddedKV) Close() error { - var multi errors.MultiError - - // see if there's any errors - select { - case err := <-e.etcd.Err(): - multi = multi.Add(err) - default: - } - - // shutdown and release - e.etcd.Server.Stop() - e.etcd.Close() - - multi = multi.Add(os.RemoveAll(e.dir)) - return multi.FinalError() + return e.etcd.Close(context.TODO()) } func (e *embeddedKV) Start() error { timeout := e.opts.InitTimeout() - select { - case <-e.etcd.Server.ReadyNotify(): - break - case <-time.After(timeout): - return fmt.Errorf("etcd server took too long to start") - } - - // ensure v3 api endpoints are available, /~https://github.com/coreos/etcd/pull/7075 - apiVersionEndpoint := fmt.Sprintf("http://%s/version", e.etcd.Clients[0].Addr().String()) - fn := func() bool { return version3Available(apiVersionEndpoint) } - ok := xclock.WaitUntil(fn, timeout) - if !ok { - return fmt.Errorf("api version 3 not available") - } + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() - return nil + return e.etcd.Setup(ctx) } type versionResponse struct { @@ -144,11 +102,7 @@ func version3Available(endpoint string) bool { } func (e *embeddedKV) Endpoints() []string { - addresses := make([]string, 0, len(e.etcd.Clients)) - for _, c := range e.etcd.Clients { - addresses = append(addresses, c.Addr().String()) - } - return addresses + return []string{e.etcd.Address()} } func (e *embeddedKV) ConfigServiceClient(fns ...ClientOptFn) (client.Client, error) { @@ -156,7 +110,9 @@ func (e *embeddedKV) ConfigServiceClient(fns ...ClientOptFn) (client.Client, err SetInstrumentOptions(e.opts.InstrumentOptions()). SetServicesOptions(services.NewOptions().SetInitTimeout(e.opts.InitTimeout())). SetClusters([]etcdclient.Cluster{ - etcdclient.NewCluster().SetZone(e.opts.Zone()).SetEndpoints(e.Endpoints()), + etcdclient.NewCluster().SetZone(e.opts.Zone()). + SetEndpoints(e.Endpoints()). + SetAutoSyncInterval(-1), }). SetService(e.opts.ServiceID()). SetEnv(e.opts.Environment()). diff --git a/src/cluster/integration/etcd/options.go b/src/cluster/integration/etcd/options.go index 2c986bebfb..4382996a3a 100644 --- a/src/cluster/integration/etcd/options.go +++ b/src/cluster/integration/etcd/options.go @@ -27,7 +27,7 @@ import ( ) const ( - defaulTimeout = 5 * time.Second + defaultTimeout = 30 * time.Second defaultDir = "etcd.dir" defaultServiceID = "integration.service" defaultEnv = "integration.env" @@ -48,7 +48,7 @@ func NewOptions() Options { return &opts{ iopts: instrument.NewOptions(), workingDir: defaultDir, - initTimeout: defaulTimeout, + initTimeout: defaultTimeout, serviceID: defaultServiceID, env: defaultEnv, zone: defaultZone, diff --git a/src/cluster/kv/etcd/store_test.go b/src/cluster/kv/etcd/store_test.go index a4f4773934..64c7e781d2 100644 --- a/src/cluster/kv/etcd/store_test.go +++ b/src/cluster/kv/etcd/store_test.go @@ -36,10 +36,10 @@ import ( "github.com/m3db/m3/src/x/retry" "github.com/golang/protobuf/proto" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" "golang.org/x/net/context" ) @@ -513,7 +513,7 @@ func TestWatchNonBlocking(t *testing.T) { ecluster, opts, closeFn := testCluster(t) defer closeFn() - ec := ecluster.Client(0) + ec := ecluster.RandClient() opts = opts.SetWatchChanResetInterval(200 * time.Millisecond).SetWatchChanInitTimeout(500 * time.Millisecond) diff --git a/src/cluster/services/heartbeat/etcd/store_test.go b/src/cluster/services/heartbeat/etcd/store_test.go index 4950becc1e..b38025259b 100644 --- a/src/cluster/services/heartbeat/etcd/store_test.go +++ b/src/cluster/services/heartbeat/etcd/store_test.go @@ -28,9 +28,9 @@ import ( "github.com/m3db/m3/src/cluster/placement" "github.com/m3db/m3/src/cluster/services" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" ) func TestKeys(t *testing.T) { diff --git a/src/cluster/services/leader/client_test.go b/src/cluster/services/leader/client_test.go index 766d9e59fd..0fb7b75d4c 100644 --- a/src/cluster/services/leader/client_test.go +++ b/src/cluster/services/leader/client_test.go @@ -30,10 +30,10 @@ import ( "github.com/m3db/m3/src/cluster/services/leader/campaign" "github.com/m3db/m3/src/cluster/services/leader/election" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" "golang.org/x/net/context" ) @@ -127,11 +127,15 @@ func TestNewClient(t *testing.T) { assert.NotNil(t, svc) } +// TODO: this test most likely wasn't testing what we thought it was. While using etcd/testing/framework/integration, +// the client gets closed func TestNewClient_BadCluster(t *testing.T) { + t.Skip("This test only works with the etcd/testing/framework/integration package, " + + "and doesn't provide much signal on correctness of our code.") tc := newTestCluster(t) cl := tc.etcdClient() tc.close() - + require.NoError(t, cl.Close()) _, err := newClient(cl, tc.options(), "") assert.Error(t, err) } diff --git a/src/cluster/services/leader/election/client_test.go b/src/cluster/services/leader/election/client_test.go index af9c0b1759..6cc2db348d 100644 --- a/src/cluster/services/leader/election/client_test.go +++ b/src/cluster/services/leader/election/client_test.go @@ -25,11 +25,11 @@ import ( "testing" "time" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" - "go.etcd.io/etcd/tests/v3/framework/integration" ) type testCluster struct { diff --git a/src/cmd/services/m3dbnode/main/main_test.go b/src/cmd/services/m3dbnode/main/main_test.go index f488662251..f46e6c39aa 100644 --- a/src/cmd/services/m3dbnode/main/main_test.go +++ b/src/cmd/services/m3dbnode/main/main_test.go @@ -1,4 +1,6 @@ +//go:build big // +build big + // // Copyright (c) 2017 Uber Technologies, Inc. // @@ -51,7 +53,7 @@ import ( // TestConfig tests booting a server using file based configuration. func TestConfig(t *testing.T) { // Embedded kv - embeddedKV, err := etcd.New(etcd.NewOptions()) + embeddedKV, err := etcd.New(etcd.NewOptions().SetInitTimeout(30 * time.Second)) require.NoError(t, err) defer func() { require.NoError(t, embeddedKV.Close()) @@ -631,6 +633,7 @@ db: etcdClusters: - zone: {{.ServiceZone}} endpoints: {{.EtcdEndpoints}} + autoSyncInterval: -1 ` embeddedKVConfigPortion = `