From 6749c3fad648881e7a0795773c80ff8c0786e65d Mon Sep 17 00:00:00 2001 From: Andrew Mains Date: Wed, 31 Aug 2022 11:18:21 -0400 Subject: [PATCH] =?UTF-8?q?etcd=5Fdocker=203:=20Incorporate=20docker=20bas?= =?UTF-8?q?ed=20etcd=20integration=20package=20into=E2=80=A6=20(#4147)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * etcd_docker 2: Add a docker based etcdintegration package PR 2 for /~https://github.com/m3db/m3/issues/4144 High level approach is as described in /~https://github.com/m3db/m3/issues/4144 . This PR adds: - Functions to spin up a 1 node etcd cluster using docker (in `dockerexternal`) - A drop in replacement for the etcd/integration package using `dockerexternal` commit-id:e4e80f1d * etcd_docker 3: Incorporate docker based etcd integration package into unittests. PR 3 for /~https://github.com/m3db/m3/issues/4144 High level approach is as described in /~https://github.com/m3db/m3/issues/4144 . This PR incorporates the new test package into our unittests. Usage is via the `etcdintegration` package, which makes it transparent to the test code; it simply gets an etcd server started via different means. One piece of weirdness to call out here: the package currently relies on autosync being *disabled* on the client side. This is because the advertise client URL (aka what etcd tells clients to connect to) isn't correct for the open port on the host. That is we have: - etcd: listen on container port 0.0.0.0:2379, advertise 0.0.0.0:2379 - docker: expose etcd port 2379 to 0.0.0.0:0 on host machine (random free port) - client: connect to etcd via host machine. We could probably make this better. commit-id:263fed13 --- docker-compose.yml | 11 + go.mod | 2 - go.sum | 2 - .../integration/custom_aggregations_test.go | 5 +- src/aggregator/integration/election.go | 21 +- .../integration/metadata_change_test.go | 4 +- .../integration/multi_client_one_type_test.go | 4 +- .../multi_server_forwarding_pipeline_test.go | 2 +- .../integration/multi_server_resend_test.go | 2 +- .../one_client_multi_type_forwarded_test.go | 4 +- .../one_client_multi_type_timed_test.go | 4 +- .../one_client_multi_type_untimed_test.go | 14 +- .../integration/one_client_passthru_test.go | 2 +- .../integration/placement_change_test.go | 7 +- .../integration/resend_stress_test.go | 1 - .../integration/same_id_multi_type_test.go | 4 +- src/aggregator/integration/setup.go | 8 + src/cluster/client/etcd/client.go | 8 +- src/cluster/client/etcd/client_test.go | 24 +- src/cluster/client/etcd/config.go | 27 +- src/cluster/client/etcd/config_test.go | 7 + src/cluster/client/etcd/types.go | 5 + src/cluster/etcd/watchmanager/manager_test.go | 222 +++++++------- src/cluster/integration/etcd/etcd.go | 80 ++--- src/cluster/integration/etcd/options.go | 4 +- src/cluster/kv/etcd/store_test.go | 4 +- .../services/heartbeat/etcd/store_test.go | 2 +- src/cluster/services/leader/client_test.go | 8 +- .../services/leader/election/client_test.go | 2 +- src/cmd/services/m3dbnode/main/main_test.go | 5 +- .../resources/docker/dockerexternal/etcd.go | 279 +++++++++++++++++ .../docker/dockerexternal/etcd_options.go | 57 ++++ .../docker/dockerexternal/etcd_test.go | 184 +++++++++++ .../etcdintegration/bridge/README.md | 11 + .../etcdintegration/bridge/bridge.go | 256 ++++++++++++++++ .../dockerexternal/etcdintegration/cluster.go | 287 ++++++++++++++++++ .../etcdintegration/cluster_test.go | 80 +++++ .../dockerexternal/etcdintegration/types.go | 34 +++ src/x/dockertest/common.go | 5 +- 
src/x/dockertest/docker_resource.go | 3 + 40 files changed, 1462 insertions(+), 229 deletions(-) create mode 100644 src/integration/resources/docker/dockerexternal/etcd.go create mode 100644 src/integration/resources/docker/dockerexternal/etcd_options.go create mode 100644 src/integration/resources/docker/dockerexternal/etcd_test.go create mode 100644 src/integration/resources/docker/dockerexternal/etcdintegration/bridge/README.md create mode 100644 src/integration/resources/docker/dockerexternal/etcdintegration/bridge/bridge.go create mode 100644 src/integration/resources/docker/dockerexternal/etcdintegration/cluster.go create mode 100644 src/integration/resources/docker/dockerexternal/etcdintegration/cluster_test.go create mode 100644 src/integration/resources/docker/dockerexternal/etcdintegration/types.go diff --git a/docker-compose.yml b/docker-compose.yml index f8146f372c..b45b16aea8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,6 +8,17 @@ services: volumes: - .:/go/src/github.com/m3db/m3 - /usr/bin/buildkite-agent:/usr/bin/buildkite-agent + # Support running docker within docker. That is, buildkite jobs themselves run in a container; that container + # needs to be able to spin up functioning docker containers. + - /var/run/docker.sock:/var/run/docker.sock + extra_hosts: + # Allow routing from the buildkite container to the host machine, as host.docker.internal. This allows us to do + # the following: + # - Spin up an etcd container with ports published to the host machine + # - Connect to the etcd container from the buildkite test process using host.docker.internal + # See + # https://medium.com/@TimvanBaarsen/how-to-connect-to-the-docker-host-from-inside-a-docker-container-112b4c71bc66 + - "host.docker.internal:host-gateway" environment: - CI - BUILDKITE diff --git a/go.mod b/go.mod index c40d0e3a82..485f992594 100644 --- a/go.mod +++ b/go.mod @@ -70,7 +70,6 @@ require ( go.etcd.io/etcd/client/pkg/v3 v3.6.0-alpha.0 go.etcd.io/etcd/client/v3 v3.6.0-alpha.0 go.etcd.io/etcd/server/v3 v3.6.0-alpha.0 - go.etcd.io/etcd/tests/v3 v3.6.0-alpha.0 go.opentelemetry.io/collector v0.45.0 go.opentelemetry.io/otel v1.4.1 go.opentelemetry.io/otel/bridge/opentracing v1.4.1 @@ -121,7 +120,6 @@ require ( github.com/go-playground/locales v0.13.0 // indirect github.com/go-playground/universal-translator v0.17.0 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect - github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/google/btree v1.0.1 // indirect github.com/gorilla/handlers v1.5.1 // indirect github.com/gorilla/websocket v1.4.2 // indirect diff --git a/go.sum b/go.sum index 770fa1c583..22fa99d1b7 100644 --- a/go.sum +++ b/go.sum @@ -1592,8 +1592,6 @@ go.etcd.io/etcd/raft/v3 v3.6.0-alpha.0 h1:BQ6CnNP4pIpy5rusFlTBxAacDgPXhuiHFwoTsB go.etcd.io/etcd/raft/v3 v3.6.0-alpha.0/go.mod h1:/kZdrBXlc5fUgYXfIEQ0B5sb7ejXPKbtF4jWzF1exiQ= go.etcd.io/etcd/server/v3 v3.6.0-alpha.0 h1:BQUVqBqNFZZyrRbfydrRLzq9hYvCcRj97SsX1YwD7CA= go.etcd.io/etcd/server/v3 v3.6.0-alpha.0/go.mod h1:3QM2rLq3B3hSXmVEvgVt3vEEbG/AumSs0Is7EgrlKzU= -go.etcd.io/etcd/tests/v3 v3.6.0-alpha.0 h1:3qrZ3p/E7CxdV1kKtAU75hHOcUoXcSTwC7ELKWyzMJo= -go.etcd.io/etcd/tests/v3 v3.6.0-alpha.0/go.mod h1:hFQkP/cTsZIXXvUv+BsGHZ3TK+76XZMi5GToYA94iac= go.mongodb.org/mongo-driver v1.0.3/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.mongodb.org/mongo-driver v1.1.1/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.mongodb.org/mongo-driver v1.1.2/go.mod 
h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= diff --git a/src/aggregator/integration/custom_aggregations_test.go b/src/aggregator/integration/custom_aggregations_test.go index d9c58cbe41..6bae441f46 100644 --- a/src/aggregator/integration/custom_aggregations_test.go +++ b/src/aggregator/integration/custom_aggregations_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2016 Uber Technologies, Inc. // @@ -68,7 +68,6 @@ func testCustomAggregations(t *testing.T, metadataFns [4]metadataFn) { if testing.Short() { t.SkipNow() } - aggTypesOpts := aggregation.NewTypesOptions(). SetCounterTypeStringTransformFn(aggregation.SuffixTransform). SetTimerTypeStringTransformFn(aggregation.SuffixTransform). @@ -179,7 +178,7 @@ func testCustomAggregations(t *testing.T, metadataFns [4]metadataFn) { // must be the longer than the lowest resolution across all policies. finalTime := end.Add(6 * time.Second) clock.SetNow(finalTime) - time.Sleep(6 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/election.go b/src/aggregator/integration/election.go index 1f24e02263..221996347d 100644 --- a/src/aggregator/integration/election.go +++ b/src/aggregator/integration/election.go @@ -26,9 +26,9 @@ import ( "github.com/m3db/m3/src/cluster/services" "github.com/m3db/m3/src/cluster/services/leader" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" ) var ( @@ -40,27 +40,38 @@ var ( ) type testCluster struct { - t *testing.T - cluster *integration.Cluster + t *testing.T + cluster *integration.Cluster + leaderService services.LeaderService } func newTestCluster(t *testing.T) *testCluster { integration.BeforeTestExternal(t) - return &testCluster{ + cluster := &testCluster{ t: t, cluster: integration.NewCluster(t, &integration.ClusterConfig{ Size: testClusterSize, + // UseBridge: true, }), } + return cluster } func (tc *testCluster) LeaderService() services.LeaderService { + if tc.leaderService != nil { + return tc.leaderService + } + svc, err := leader.NewService(tc.etcdClient(), tc.options()) require.NoError(tc.t, err) - return svc + tc.leaderService = svc + return tc.leaderService } func (tc *testCluster) Close() { + if tc.leaderService != nil { + require.NoError(tc.t, tc.leaderService.Close()) + } tc.cluster.Terminate(tc.t) } diff --git a/src/aggregator/integration/metadata_change_test.go b/src/aggregator/integration/metadata_change_test.go index fa93ce7baf..9822579101 100644 --- a/src/aggregator/integration/metadata_change_test.go +++ b/src/aggregator/integration/metadata_change_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2016 Uber Technologies, Inc. // @@ -138,7 +138,7 @@ func testMetadataChange(t *testing.T, oldMetadataFn, newMetadataFn metadataFn) { // must be the longer than the lowest resolution across all policies. 
finalTime := end.Add(6 * time.Second) clock.SetNow(finalTime) - time.Sleep(6 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/multi_client_one_type_test.go b/src/aggregator/integration/multi_client_one_type_test.go index d05185a9ab..2af620205f 100644 --- a/src/aggregator/integration/multi_client_one_type_test.go +++ b/src/aggregator/integration/multi_client_one_type_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2016 Uber Technologies, Inc. // @@ -126,7 +126,7 @@ func testMultiClientOneType(t *testing.T, metadataFn metadataFn) { // must be the longer than the lowest resolution across all policies. finalTime := stop.Add(6 * time.Second) clock.SetNow(finalTime) - time.Sleep(4 * time.Second) + time.Sleep(waitForDataToFlush) for i := 0; i < numClients; i++ { require.NoError(t, clients[i].close()) diff --git a/src/aggregator/integration/multi_server_forwarding_pipeline_test.go b/src/aggregator/integration/multi_server_forwarding_pipeline_test.go index ec83c8f362..fb1cdbd407 100644 --- a/src/aggregator/integration/multi_server_forwarding_pipeline_test.go +++ b/src/aggregator/integration/multi_server_forwarding_pipeline_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2018 Uber Technologies, Inc. // diff --git a/src/aggregator/integration/multi_server_resend_test.go b/src/aggregator/integration/multi_server_resend_test.go index 6b7c2fb71e..7833e8d991 100644 --- a/src/aggregator/integration/multi_server_resend_test.go +++ b/src/aggregator/integration/multi_server_resend_test.go @@ -1,5 +1,4 @@ //go:build integration -// +build integration // Copyright (c) 2018 Uber Technologies, Inc. // @@ -142,6 +141,7 @@ func TestMultiServerResendAggregatedValues(t *testing.T) { // Election cluster setup. electionCluster := newTestCluster(t) + defer electionCluster.Close() // Sharding function maps all metrics to shard 0 except for the rollup metric, // which gets mapped to the last shard. diff --git a/src/aggregator/integration/one_client_multi_type_forwarded_test.go b/src/aggregator/integration/one_client_multi_type_forwarded_test.go index 9ead187784..039eeae391 100644 --- a/src/aggregator/integration/one_client_multi_type_forwarded_test.go +++ b/src/aggregator/integration/one_client_multi_type_forwarded_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2018 Uber Technologies, Inc. // @@ -121,7 +121,7 @@ func TestOneClientMultiTypeForwardedMetrics(t *testing.T) { // Move time forward and wait for flushing to happen. finalTime := stop.Add(2 * time.Second) clock.SetNow(finalTime) - time.Sleep(2 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/one_client_multi_type_timed_test.go b/src/aggregator/integration/one_client_multi_type_timed_test.go index 8449f5858b..80f8d6ec3f 100644 --- a/src/aggregator/integration/one_client_multi_type_timed_test.go +++ b/src/aggregator/integration/one_client_multi_type_timed_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2018 Uber Technologies, Inc. // @@ -119,7 +119,7 @@ func TestOneClientMultiTypeTimedMetrics(t *testing.T) { // Move time forward and wait for flushing to happen. 
finalTime := stop.Add(time.Minute + 2*time.Second) clock.SetNow(finalTime) - time.Sleep(2 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/one_client_multi_type_untimed_test.go b/src/aggregator/integration/one_client_multi_type_untimed_test.go index 59f6800b66..0adc3a5c2d 100644 --- a/src/aggregator/integration/one_client_multi_type_untimed_test.go +++ b/src/aggregator/integration/one_client_multi_type_untimed_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2016 Uber Technologies, Inc. // @@ -26,9 +26,17 @@ import ( "testing" "time" + "github.com/m3db/m3/src/cluster/placement" + "github.com/stretchr/testify/require" +) - "github.com/m3db/m3/src/cluster/placement" +const ( + // waitForDataToFlush is the amount of time we will wait in these tests between finishing writing data to + // the aggregator, and attempting to assert that data went through. + // The aggregator generally, and these tests specifically are quite sensitive to time. + // The tests probably need a bit of a rethink to wait on (or poll for) an actual condition instead of sleeping. + waitForDataToFlush = 10 * time.Second ) func TestOneClientMultiTypeUntimedMetricsWithStagedMetadatas(t *testing.T) { @@ -114,7 +122,7 @@ func testOneClientMultiType(t *testing.T, metadataFn metadataFn) { // must be the longer than the lowest resolution across all policies. finalTime := stop.Add(6 * time.Second) clock.SetNow(finalTime) - time.Sleep(4 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/one_client_passthru_test.go b/src/aggregator/integration/one_client_passthru_test.go index 75a6f49e25..489730a123 100644 --- a/src/aggregator/integration/one_client_passthru_test.go +++ b/src/aggregator/integration/one_client_passthru_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2020 Uber Technologies, Inc. // diff --git a/src/aggregator/integration/placement_change_test.go b/src/aggregator/integration/placement_change_test.go index 99683d7f3d..9f798b0ee1 100644 --- a/src/aggregator/integration/placement_change_test.go +++ b/src/aggregator/integration/placement_change_test.go @@ -1,5 +1,4 @@ //go:build integration -// +build integration // Copyright (c) 2018 Uber Technologies, Inc. // @@ -227,9 +226,9 @@ func TestPlacementChange(t *testing.T) { } clock.SetNow(start2) - time.Sleep(6 * time.Second) + time.Sleep(waitForDataToFlush) setPlacement(t, placementKey, clusterClient, finalPlacement) - time.Sleep(6 * time.Second) + time.Sleep(waitForDataToFlush) for _, data := range datasets[1] { clock.SetNow(data.timestamp) @@ -245,7 +244,7 @@ func TestPlacementChange(t *testing.T) { // Move time forward and wait for flushing to happen. clock.SetNow(finalTime) - time.Sleep(6 * time.Second) + time.Sleep(waitForDataToFlush) // Remove all the topic consumers before closing clients and servers. This allows to close the // connections between servers while they still are running. Otherwise, during server shutdown, diff --git a/src/aggregator/integration/resend_stress_test.go b/src/aggregator/integration/resend_stress_test.go index c1510ac7ee..691edad117 100644 --- a/src/aggregator/integration/resend_stress_test.go +++ b/src/aggregator/integration/resend_stress_test.go @@ -1,5 +1,4 @@ //go:build integration -// +build integration // Copyright (c) 2018 Uber Technologies, Inc. 
// diff --git a/src/aggregator/integration/same_id_multi_type_test.go b/src/aggregator/integration/same_id_multi_type_test.go index 09974adb2d..fe6b70d999 100644 --- a/src/aggregator/integration/same_id_multi_type_test.go +++ b/src/aggregator/integration/same_id_multi_type_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2016 Uber Technologies, Inc. // @@ -138,7 +138,7 @@ func testSameIDMultiType(t *testing.T, metadataFn metadataFn) { // must be the longer than the lowest resolution across all policies. finalTime := stop.Add(6 * time.Second) clock.SetNow(finalTime) - time.Sleep(4 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/setup.go b/src/aggregator/integration/setup.go index b0723392dc..3ed6726d6e 100644 --- a/src/aggregator/integration/setup.go +++ b/src/aggregator/integration/setup.go @@ -88,6 +88,7 @@ type testServerSetup struct { // Signals. doneCh chan struct{} closedCh chan struct{} + stopped bool } func newTestServerSetup(t *testing.T, opts testServerOptions) *testServerSetup { @@ -448,6 +449,10 @@ func (ts *testServerSetup) sortedResults() []aggregated.MetricWithStoragePolicy } func (ts *testServerSetup) stopServer() error { + if ts.stopped { + return nil + } + ts.stopped = true if err := ts.aggregator.Close(); err != nil { return err } @@ -460,6 +465,9 @@ func (ts *testServerSetup) stopServer() error { func (ts *testServerSetup) close() { ts.electionCluster.Close() + if err := ts.stopServer(); err != nil { + panic(err.Error()) + } } func (tss testServerSetups) newClient(t *testing.T) *client { diff --git a/src/cluster/client/etcd/client.go b/src/cluster/client/etcd/client.go index af4a71828a..6dd5f0a338 100644 --- a/src/cluster/client/etcd/client.go +++ b/src/cluster/client/etcd/client.go @@ -339,8 +339,14 @@ func newConfigFromCluster(rnd randInt63N, cluster Cluster) (clientv3.Config, err if err != nil { return clientv3.Config{}, err } + + // Support disabling autosync if a user very explicitly requests it (via negative duration). + autoSyncInterval := cluster.AutoSyncInterval() + if autoSyncInterval < 0 { + autoSyncInterval = 0 + } cfg := clientv3.Config{ - AutoSyncInterval: cluster.AutoSyncInterval(), + AutoSyncInterval: autoSyncInterval, DialTimeout: cluster.DialTimeout(), DialOptions: cluster.DialOptions(), Endpoints: cluster.Endpoints(), diff --git a/src/cluster/client/etcd/client_test.go b/src/cluster/client/etcd/client_test.go index 3d1d0b60db..343842eb4e 100644 --- a/src/cluster/client/etcd/client_test.go +++ b/src/cluster/client/etcd/client_test.go @@ -25,18 +25,23 @@ import ( "testing" "time" + "github.com/m3db/m3/src/cluster/kv" + "github.com/m3db/m3/src/cluster/services" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" + "github.com/m3db/m3/src/x/retry" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" "google.golang.org/grpc" - - "github.com/m3db/m3/src/cluster/kv" - "github.com/m3db/m3/src/cluster/services" ) func TestETCDClientGen(t *testing.T) { - cs, err := NewConfigServiceClient(testOptions()) + cs, err := NewConfigServiceClient( + testOptions(). + // These are error cases; don't retry for no reason. 
+ SetRetryOptions(retry.NewOptions().SetMaxRetries(0)), + ) require.NoError(t, err) c := cs.(*csclient) @@ -414,6 +419,15 @@ func Test_newConfigFromCluster(t *testing.T) { ) }) + t.Run("negative autosync on M3 disables autosync for etcd", func(t *testing.T) { + inputCfg := newFullConfig() + inputCfg.AutoSyncInterval = -1 + etcdCfg, err := newConfigFromCluster(testRnd, inputCfg.NewCluster()) + require.NoError(t, err) + + assert.Equal(t, time.Duration(0), etcdCfg.AutoSyncInterval) + }) + // Separate test just because the assert.Equal won't work for functions. t.Run("passes through dial options", func(t *testing.T) { clusterCfg := newFullConfig() diff --git a/src/cluster/client/etcd/config.go b/src/cluster/client/etcd/config.go index 877f2051e5..520afa385e 100644 --- a/src/cluster/client/etcd/config.go +++ b/src/cluster/client/etcd/config.go @@ -35,12 +35,22 @@ import ( // ClusterConfig is the config for a zoned etcd cluster. type ClusterConfig struct { - Zone string `yaml:"zone"` - Endpoints []string `yaml:"endpoints"` - KeepAlive *KeepAliveConfig `yaml:"keepAlive"` - TLS *TLSConfig `yaml:"tls"` - AutoSyncInterval time.Duration `yaml:"autoSyncInterval"` - DialTimeout time.Duration `yaml:"dialTimeout"` + Zone string `yaml:"zone"` + Endpoints []string `yaml:"endpoints"` + KeepAlive *KeepAliveConfig `yaml:"keepAlive"` + TLS *TLSConfig `yaml:"tls"` + // AutoSyncInterval configures the etcd client's AutoSyncInterval + // (go.etcd.io/etcd/client/v3@v3.6.0-alpha.0/config.go:32). + // By default, it is 1m. + // + // Advanced: + // + // One important difference from the etcd config: we have autosync *on* by default (unlike etcd), meaning that + // the zero value here doesn't indicate autosync off. + // Instead, users should pass in a negative value to indicate "disable autosync" + // Only do this if you truly have a good reason for it! Most production use cases want autosync on. + AutoSyncInterval time.Duration `yaml:"autoSyncInterval"` + DialTimeout time.Duration `yaml:"dialTimeout"` DialOptions []grpc.DialOption `yaml:"-"` // nonserializable } @@ -59,7 +69,10 @@ func (c ClusterConfig) NewCluster() Cluster { SetKeepAliveOptions(keepAliveOpts). SetTLSOptions(c.TLS.newOptions()) - if c.AutoSyncInterval > 0 { + // Autosync should *always* be on, unless the user very explicitly requests it to be off. They can do this via a + // negative value (in which case we can assume they know what they're doing). + // Therefore, only update if it's nonzero, on the assumption that zero is just the empty value. 
+ if c.AutoSyncInterval != 0 { cluster = cluster.SetAutoSyncInterval(c.AutoSyncInterval) } diff --git a/src/cluster/client/etcd/config_test.go b/src/cluster/client/etcd/config_test.go index 1bc8959117..278b41bb1f 100644 --- a/src/cluster/client/etcd/config_test.go +++ b/src/cluster/client/etcd/config_test.go @@ -181,3 +181,10 @@ func TestDefaultConfig(t *testing.T) { require.Equal(t, defaultDialTimeout, cluster.DialTimeout()) require.Equal(t, defaultAutoSyncInterval, cluster.AutoSyncInterval()) } + +func TestConfig_negativeAutosync(t *testing.T) { + cluster := ClusterConfig{ + AutoSyncInterval: -5, + }.NewCluster() + require.Equal(t, time.Duration(-5), cluster.AutoSyncInterval()) +} diff --git a/src/cluster/client/etcd/types.go b/src/cluster/client/etcd/types.go index 9caf012e70..5b60832801 100644 --- a/src/cluster/client/etcd/types.go +++ b/src/cluster/client/etcd/types.go @@ -159,6 +159,11 @@ type Cluster interface { SetTLSOptions(TLSOptions) Cluster AutoSyncInterval() time.Duration + + // SetAutoSyncInterval sets the etcd client to autosync cluster endpoints periodically. This defaults to + // 1 minute (defaultAutoSyncInterval). If negative or zero, it will disable autosync. This differs slightly + // from the underlying etcd configuration its setting, which only supports 0 for disabling. We do this because + // there's otherwise no good way to specify "disable" in our configs (which default to SetAutoSyncInterval(1m)). SetAutoSyncInterval(value time.Duration) Cluster DialTimeout() time.Duration diff --git a/src/cluster/etcd/watchmanager/manager_test.go b/src/cluster/etcd/watchmanager/manager_test.go index d65758e8ff..ddc8df306c 100644 --- a/src/cluster/etcd/watchmanager/manager_test.go +++ b/src/cluster/etcd/watchmanager/manager_test.go @@ -22,16 +22,15 @@ package watchmanager import ( "fmt" - "runtime" "sync/atomic" "testing" "time" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" "golang.org/x/net/context" "github.com/m3db/m3/src/x/clock" @@ -164,114 +163,117 @@ func TestWatchRecreate(t *testing.T) { <-doneCh } -func TestWatchNoLeader(t *testing.T) { - t.Skip("flaky, started to fail very consistently on CI") - const ( - watchInitAndRetryDelay = 200 * time.Millisecond - watchCheckInterval = 50 * time.Millisecond - ) - - integration.BeforeTestExternal(t) - ecluster := integration.NewCluster(t, &integration.ClusterConfig{Size: 3}) - defer ecluster.Terminate(t) - - var ( - ec = ecluster.Client(0) - tickDuration = 10 * time.Millisecond - electionTimeout = time.Duration(3*ecluster.Members[0].ElectionTicks) * tickDuration - doneCh = make(chan struct{}, 1) - eventLog = []*clientv3.Event{} - updateCalled int32 - shouldStop int32 - ) - - opts := NewOptions(). - SetClient(ec). - SetUpdateFn( - func(_ string, e []*clientv3.Event) error { - atomic.AddInt32(&updateCalled, 1) - if len(e) > 0 { - eventLog = append(eventLog, e...) - } - return nil - }, - ). - SetTickAndStopFn( - func(string) bool { - if atomic.LoadInt32(&shouldStop) == 0 { - return false - } - - close(doneCh) - - return true - }, - ). - SetWatchChanInitTimeout(watchInitAndRetryDelay). - SetWatchChanResetInterval(watchInitAndRetryDelay). 
- SetWatchChanCheckInterval(watchCheckInterval) - - integration.WaitClientV3(t, ec) - - wh, err := NewWatchManager(opts) - require.NoError(t, err) - - go wh.Watch("foo") - - runtime.Gosched() - time.Sleep(10 * time.Millisecond) - - // there should be a valid watch now, trigger a notification - _, err = ec.Put(context.Background(), "foo", "bar") - require.NoError(t, err) - - leaderIdx := ecluster.WaitLeader(t) - require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Members), "got invalid leader") - - // simulate quorum loss - ecluster.Members[1].Stop(t) - ecluster.Members[2].Stop(t) - - // wait for election timeout, then member[0] will not have a leader. - time.Sleep(electionTimeout) - - require.NoError(t, ecluster.Members[1].Restart(t)) - require.NoError(t, ecluster.Members[2].Restart(t)) - - // wait for leader + election delay just in case - time.Sleep(time.Duration(3*ecluster.Members[0].ElectionTicks) * tickDuration) - - leaderIdx = ecluster.WaitLeader(t) - require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Members), "got invalid leader") - integration.WaitClientV3(t, ec) // wait for client to be ready again - - _, err = ec.Put(context.Background(), "foo", "baz") - require.NoError(t, err) - - // give some time for watch to be updated - require.True(t, clock.WaitUntil(func() bool { - return atomic.LoadInt32(&updateCalled) >= 2 - }, 10*time.Second)) - - updates := atomic.LoadInt32(&updateCalled) - if updates < 2 { - require.Fail(t, - "insufficient update calls", - "expected at least 2 update attempts, got %d during a partition", - updates) - } - - atomic.AddInt32(&shouldStop, 1) - <-doneCh - - require.Len(t, eventLog, 2) - require.NotNil(t, eventLog[0]) - require.Equal(t, eventLog[0].Kv.Key, []byte("foo")) - require.Equal(t, eventLog[0].Kv.Value, []byte("bar")) - require.NotNil(t, eventLog[1]) - require.Equal(t, eventLog[1].Kv.Key, []byte("foo")) - require.Equal(t, eventLog[1].Kv.Value, []byte("baz")) -} +// TODO: this test has been skipped for a while, and now doesn't work with the docker based etcd integration package. +// Revive it if it's useful, and make it no longer flake. +//nolint:gocritic +//func TestWatchNoLeader(t *testing.T) { +// t.Skip("flaky, started to fail very consistently on CI") +// const ( +// watchInitAndRetryDelay = 200 * time.Millisecond +// watchCheckInterval = 50 * time.Millisecond +// ) +// +// integration.BeforeTestExternal(t) +// ecluster := integration.NewCluster(t, &integration.ClusterConfig{Size: 3}) +// defer ecluster.Terminate(t) +// +// var ( +// ec = ecluster.Client(0) +// tickDuration = 10 * time.Millisecond +// electionTimeout = time.Duration(3*ecluster.Address[0].ElectionTicks) * tickDuration +// doneCh = make(chan struct{}, 1) +// eventLog = []*clientv3.Event{} +// updateCalled int32 +// shouldStop int32 +// ) +// +// opts := NewOptions(). +// SetClient(ec). +// SetUpdateFn( +// func(_ string, e []*clientv3.Event) error { +// atomic.AddInt32(&updateCalled, 1) +// if len(e) > 0 { +// eventLog = append(eventLog, e...) +// } +// return nil +// }, +// ). +// SetTickAndStopFn( +// func(string) bool { +// if atomic.LoadInt32(&shouldStop) == 0 { +// return false +// } +// +// close(doneCh) +// +// return true +// }, +// ). +// SetWatchChanInitTimeout(watchInitAndRetryDelay). +// SetWatchChanResetInterval(watchInitAndRetryDelay). 
+// SetWatchChanCheckInterval(watchCheckInterval) +// +// integration.WaitClientV3(t, ec) +// +// wh, err := NewWatchManager(opts) +// require.NoError(t, err) +// +// go wh.Watch("foo") +// +// runtime.Gosched() +// time.Sleep(10 * time.Millisecond) +// +// // there should be a valid watch now, trigger a notification +// _, err = ec.Put(context.Background(), "foo", "bar") +// require.NoError(t, err) +// +// leaderIdx := ecluster.WaitLeader(t) +// require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Address), "got invalid leader") +// +// // simulate quorum loss +// ecluster.Address[1].Stop(t) +// ecluster.Address[2].Stop(t) +// +// // wait for election timeout, then member[0] will not have a leader. +// time.Sleep(electionTimeout) +// +// require.NoError(t, ecluster.Address[1].Restart(t)) +// require.NoError(t, ecluster.Address[2].Restart(t)) +// +// // wait for leader + election delay just in case +// time.Sleep(time.Duration(3*ecluster.Address[0].ElectionTicks) * tickDuration) +// +// leaderIdx = ecluster.WaitLeader(t) +// require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Address), "got invalid leader") +// integration.WaitClientV3(t, ec) // wait for client to be ready again +// +// _, err = ec.Put(context.Background(), "foo", "baz") +// require.NoError(t, err) +// +// // give some time for watch to be updated +// require.True(t, clock.WaitUntil(func() bool { +// return atomic.LoadInt32(&updateCalled) >= 2 +// }, 10*time.Second)) +// +// updates := atomic.LoadInt32(&updateCalled) +// if updates < 2 { +// require.Fail(t, +// "insufficient update calls", +// "expected at least 2 update attempts, got %d during a partition", +// updates) +// } +// +// atomic.AddInt32(&shouldStop, 1) +// <-doneCh +// +// require.Len(t, eventLog, 2) +// require.NotNil(t, eventLog[0]) +// require.Equal(t, eventLog[0].Kv.Key, []byte("foo")) +// require.Equal(t, eventLog[0].Kv.Value, []byte("bar")) +// require.NotNil(t, eventLog[1]) +// require.Equal(t, eventLog[1].Kv.Key, []byte("foo")) +// require.Equal(t, eventLog[1].Kv.Value, []byte("baz")) +//} func TestWatchCompactedRevision(t *testing.T) { wh, ec, updateCalled, shouldStop, doneCh, closer := testSetup(t) diff --git a/src/cluster/integration/etcd/etcd.go b/src/cluster/integration/etcd/etcd.go index 2c93bf1200..b836e36967 100644 --- a/src/cluster/integration/etcd/etcd.go +++ b/src/cluster/integration/etcd/etcd.go @@ -21,26 +21,24 @@ package etcd import ( + "context" "encoding/json" "fmt" "io/ioutil" "net/http" - "net/url" - "os" "strings" - "time" "github.com/m3db/m3/src/cluster/client" etcdclient "github.com/m3db/m3/src/cluster/client/etcd" "github.com/m3db/m3/src/cluster/services" - xclock "github.com/m3db/m3/src/x/clock" - "github.com/m3db/m3/src/x/errors" + "github.com/m3db/m3/src/integration/resources/docker/dockerexternal" + "github.com/m3db/m3/src/x/instrument" - "go.etcd.io/etcd/server/v3/embed" + "github.com/ory/dockertest/v3" ) type embeddedKV struct { - etcd *embed.Etcd + etcd *dockerexternal.EtcdNode opts Options dir string } @@ -51,12 +49,12 @@ func New(opts Options) (EmbeddedKV, error) { if err != nil { return nil, err } - cfg := embed.NewConfig() - cfg.Dir = dir - cfg.Logger = "zap" - setRandomPorts(cfg) - e, err := embed.StartEtcd(cfg) + pool, err := dockertest.NewPool("") + if err != nil { + return nil, fmt.Errorf("constructing dockertest.Pool for EmbeddedKV: %w", err) + } + e, err := dockerexternal.NewEtcd(pool, instrument.NewOptions()) if err != nil { return nil, fmt.Errorf("unable to start etcd, err: %v", err) } @@ -67,56 
+65,16 @@ func New(opts Options) (EmbeddedKV, error) { }, nil } -func setRandomPorts(cfg *embed.Config) { - randomPortURL, err := url.Parse("http://localhost:0") - if err != nil { - panic(err.Error()) - } - - cfg.LPUrls = []url.URL{*randomPortURL} - cfg.APUrls = []url.URL{*randomPortURL} - cfg.LCUrls = []url.URL{*randomPortURL} - cfg.ACUrls = []url.URL{*randomPortURL} - - cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) -} - func (e *embeddedKV) Close() error { - var multi errors.MultiError - - // see if there's any errors - select { - case err := <-e.etcd.Err(): - multi = multi.Add(err) - default: - } - - // shutdown and release - e.etcd.Server.Stop() - e.etcd.Close() - - multi = multi.Add(os.RemoveAll(e.dir)) - return multi.FinalError() + return e.etcd.Close(context.TODO()) } func (e *embeddedKV) Start() error { timeout := e.opts.InitTimeout() - select { - case <-e.etcd.Server.ReadyNotify(): - break - case <-time.After(timeout): - return fmt.Errorf("etcd server took too long to start") - } - - // ensure v3 api endpoints are available, /~https://github.com/coreos/etcd/pull/7075 - apiVersionEndpoint := fmt.Sprintf("http://%s/version", e.etcd.Clients[0].Addr().String()) - fn := func() bool { return version3Available(apiVersionEndpoint) } - ok := xclock.WaitUntil(fn, timeout) - if !ok { - return fmt.Errorf("api version 3 not available") - } + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() - return nil + return e.etcd.Setup(ctx) } type versionResponse struct { @@ -144,11 +102,7 @@ func version3Available(endpoint string) bool { } func (e *embeddedKV) Endpoints() []string { - addresses := make([]string, 0, len(e.etcd.Clients)) - for _, c := range e.etcd.Clients { - addresses = append(addresses, c.Addr().String()) - } - return addresses + return []string{e.etcd.Address()} } func (e *embeddedKV) ConfigServiceClient(fns ...ClientOptFn) (client.Client, error) { @@ -156,7 +110,9 @@ func (e *embeddedKV) ConfigServiceClient(fns ...ClientOptFn) (client.Client, err SetInstrumentOptions(e.opts.InstrumentOptions()). SetServicesOptions(services.NewOptions().SetInitTimeout(e.opts.InitTimeout())). SetClusters([]etcdclient.Cluster{ - etcdclient.NewCluster().SetZone(e.opts.Zone()).SetEndpoints(e.Endpoints()), + etcdclient.NewCluster().SetZone(e.opts.Zone()). + SetEndpoints(e.Endpoints()). + SetAutoSyncInterval(-1), }). SetService(e.opts.ServiceID()). SetEnv(e.opts.Environment()). 
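The rewritten EmbeddedKV above is now a thin wrapper over dockerexternal. For illustration only (a sketch, not part of the diff itself), driving the new EtcdNode directly looks roughly like the following; it assumes a reachable local Docker daemon, and startEtcdForTest is a hypothetical helper name:

package dockerexternal_test

import (
	"context"
	"testing"
	"time"

	"github.com/m3db/m3/src/integration/resources/docker/dockerexternal"
	"github.com/m3db/m3/src/x/instrument"

	"github.com/ory/dockertest/v3"
	"github.com/stretchr/testify/require"
	clientv3 "go.etcd.io/etcd/client/v3"
)

// startEtcdForTest sketches the EtcdNode lifecycle: construct with a dockertest
// pool, Setup to start the container and wait for health, Address to find the
// port published on the host, Close to tear the container down.
func startEtcdForTest(t *testing.T) *clientv3.Client {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	pool, err := dockertest.NewPool("")
	require.NoError(t, err)

	node, err := dockerexternal.NewEtcd(pool, instrument.NewOptions())
	require.NoError(t, err)
	require.NoError(t, node.Setup(ctx))
	t.Cleanup(func() { require.NoError(t, node.Close(context.Background())) })

	// Connect to the published host port (host.docker.internal when the test
	// process itself runs inside a container, e.g. on buildkite).
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{node.Address()},
		DialTimeout: 5 * time.Second,
	})
	require.NoError(t, err)
	t.Cleanup(func() { require.NoError(t, cli.Close()) })
	return cli
}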
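The same machinery is what the converted unit tests consume, transparently, through the drop-in etcdintegration package. A minimal sketch of that test-side usage, mirroring the calls visible in election.go and kv/etcd/store_test.go in this patch (TestWithDockerEtcd is a hypothetical test name; like the real tests, it needs a local Docker daemon):

package etcdintegration_test

import (
	"context"
	"testing"
	"time"

	integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration"

	"github.com/stretchr/testify/require"
)

// TestWithDockerEtcd sketches the drop-in API: the call sites match the ones
// used with etcd's tests/v3/framework/integration package, but the cluster is
// a single etcd node running in a docker container.
func TestWithDockerEtcd(t *testing.T) {
	integration.BeforeTestExternal(t)

	cluster := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})
	defer cluster.Terminate(t)

	// RandClient returns a client already pointed at the containerized node.
	cli := cluster.RandClient()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	_, err := cli.Put(ctx, "foo", "bar")
	require.NoError(t, err)
}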
diff --git a/src/cluster/integration/etcd/options.go b/src/cluster/integration/etcd/options.go index 2c986bebfb..4382996a3a 100644 --- a/src/cluster/integration/etcd/options.go +++ b/src/cluster/integration/etcd/options.go @@ -27,7 +27,7 @@ import ( ) const ( - defaulTimeout = 5 * time.Second + defaultTimeout = 30 * time.Second defaultDir = "etcd.dir" defaultServiceID = "integration.service" defaultEnv = "integration.env" @@ -48,7 +48,7 @@ func NewOptions() Options { return &opts{ iopts: instrument.NewOptions(), workingDir: defaultDir, - initTimeout: defaulTimeout, + initTimeout: defaultTimeout, serviceID: defaultServiceID, env: defaultEnv, zone: defaultZone, diff --git a/src/cluster/kv/etcd/store_test.go b/src/cluster/kv/etcd/store_test.go index a4f4773934..64c7e781d2 100644 --- a/src/cluster/kv/etcd/store_test.go +++ b/src/cluster/kv/etcd/store_test.go @@ -36,10 +36,10 @@ import ( "github.com/m3db/m3/src/x/retry" "github.com/golang/protobuf/proto" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" "golang.org/x/net/context" ) @@ -513,7 +513,7 @@ func TestWatchNonBlocking(t *testing.T) { ecluster, opts, closeFn := testCluster(t) defer closeFn() - ec := ecluster.Client(0) + ec := ecluster.RandClient() opts = opts.SetWatchChanResetInterval(200 * time.Millisecond).SetWatchChanInitTimeout(500 * time.Millisecond) diff --git a/src/cluster/services/heartbeat/etcd/store_test.go b/src/cluster/services/heartbeat/etcd/store_test.go index 4950becc1e..b38025259b 100644 --- a/src/cluster/services/heartbeat/etcd/store_test.go +++ b/src/cluster/services/heartbeat/etcd/store_test.go @@ -28,9 +28,9 @@ import ( "github.com/m3db/m3/src/cluster/placement" "github.com/m3db/m3/src/cluster/services" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" ) func TestKeys(t *testing.T) { diff --git a/src/cluster/services/leader/client_test.go b/src/cluster/services/leader/client_test.go index 766d9e59fd..0fb7b75d4c 100644 --- a/src/cluster/services/leader/client_test.go +++ b/src/cluster/services/leader/client_test.go @@ -30,10 +30,10 @@ import ( "github.com/m3db/m3/src/cluster/services/leader/campaign" "github.com/m3db/m3/src/cluster/services/leader/election" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" "golang.org/x/net/context" ) @@ -127,11 +127,15 @@ func TestNewClient(t *testing.T) { assert.NotNil(t, svc) } +// TODO: this test most likely wasn't testing what we thought it was. 
While using etcd/testing/framework/integration, +// the client gets closed func TestNewClient_BadCluster(t *testing.T) { + t.Skip("This test only works with the etcd/testing/framework/integration package, " + + "and doesn't provide much signal on correctness of our code.") tc := newTestCluster(t) cl := tc.etcdClient() tc.close() - + require.NoError(t, cl.Close()) _, err := newClient(cl, tc.options(), "") assert.Error(t, err) } diff --git a/src/cluster/services/leader/election/client_test.go b/src/cluster/services/leader/election/client_test.go index af9c0b1759..6cc2db348d 100644 --- a/src/cluster/services/leader/election/client_test.go +++ b/src/cluster/services/leader/election/client_test.go @@ -25,11 +25,11 @@ import ( "testing" "time" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" - "go.etcd.io/etcd/tests/v3/framework/integration" ) type testCluster struct { diff --git a/src/cmd/services/m3dbnode/main/main_test.go b/src/cmd/services/m3dbnode/main/main_test.go index f488662251..f46e6c39aa 100644 --- a/src/cmd/services/m3dbnode/main/main_test.go +++ b/src/cmd/services/m3dbnode/main/main_test.go @@ -1,4 +1,6 @@ +//go:build big // +build big + // // Copyright (c) 2017 Uber Technologies, Inc. // @@ -51,7 +53,7 @@ import ( // TestConfig tests booting a server using file based configuration. func TestConfig(t *testing.T) { // Embedded kv - embeddedKV, err := etcd.New(etcd.NewOptions()) + embeddedKV, err := etcd.New(etcd.NewOptions().SetInitTimeout(30 * time.Second)) require.NoError(t, err) defer func() { require.NoError(t, embeddedKV.Close()) @@ -631,6 +633,7 @@ db: etcdClusters: - zone: {{.ServiceZone}} endpoints: {{.EtcdEndpoints}} + autoSyncInterval: -1 ` embeddedKVConfigPortion = ` diff --git a/src/integration/resources/docker/dockerexternal/etcd.go b/src/integration/resources/docker/dockerexternal/etcd.go new file mode 100644 index 0000000000..71383df7cc --- /dev/null +++ b/src/integration/resources/docker/dockerexternal/etcd.go @@ -0,0 +1,279 @@ +// Copyright (c) 2022 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package dockerexternal + +import ( + "context" + "errors" + "fmt" + "math/rand" + "net" + "strconv" + "time" + + "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration/bridge" + xdockertest "github.com/m3db/m3/src/x/dockertest" + xerrors "github.com/m3db/m3/src/x/errors" + "github.com/m3db/m3/src/x/instrument" + "github.com/m3db/m3/src/x/retry" + + "github.com/ory/dockertest/v3" + "github.com/ory/dockertest/v3/docker" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +var ( + etcdImage = xdockertest.Image{ + Name: "bitnami/etcd", + Tag: "3.5.4", + } +) + +// NewEtcd constructs a single etcd node, running in a docker container. +func NewEtcd( + pool *dockertest.Pool, + instrumentOpts instrument.Options, + options ...EtcdClusterOption, +) (*EtcdNode, error) { + logger := instrumentOpts.Logger() + if logger == nil { + logger = zap.NewNop() + instrumentOpts = instrumentOpts.SetLogger(logger) + } + + var opts etcdClusterOptions + for _, o := range options { + o.apply(&opts) + } + + return &EtcdNode{ + pool: pool, + instrumentOpts: instrumentOpts, + logger: logger, + opts: opts, + // Solely for mocking in tests--unfortunately we don't want to take in the etcd client as a dependency here + // (we don't know the endpoints, and therefore need to construct it ourselves). + // Thus, we do two hops (mock newClient returning mock memberClient) + newClient: func(config clientv3.Config) (memberClient, error) { + return clientv3.New(config) + }, + }, nil +} + +// EtcdNode is a single etcd node, running via a docker container. +//nolint:maligned +type EtcdNode struct { + instrumentOpts instrument.Options + logger *zap.Logger + pool *dockertest.Pool + opts etcdClusterOptions + + // namePrefix is used to name the cluster. Exists solely for unittests in this package; otherwise a const + namePrefix string + newClient func(config clientv3.Config) (memberClient, error) + + // initialized by Setup + address string + resource *xdockertest.Resource + etcdCli *clientv3.Client + bridge *bridge.Bridge + + stopped bool +} + +// Setup starts the docker container. +func (c *EtcdNode) Setup(ctx context.Context) (closeErr error) { + if c.resource != nil { + return errors.New("etcd cluster already started") + } + + // nolint:gosec + id := rand.New(rand.NewSource(time.Now().UnixNano())).Int() + + namePrefix := "m3-test-etcd-" + if c.namePrefix != "" { + // support overriding for tests + namePrefix = c.namePrefix + } + + // Roughly, runs: + // docker run --rm --env ALLOW_NONE_AUTHENTICATION=yes -it --name Etcd bitnami/etcd + // Port 2379 on the container is bound to a free port on the host + resource, err := xdockertest.NewDockerResource(c.pool, xdockertest.ResourceOptions{ + OverrideDefaults: false, + // TODO: what even is this? 
+ Source: "etcd", + ContainerName: fmt.Sprintf("%s%d", namePrefix, id), + Image: etcdImage, + Env: []string{"ALLOW_NONE_AUTHENTICATION=yes"}, + InstrumentOpts: c.instrumentOpts, + PortMappings: map[docker.Port][]docker.PortBinding{ + "2379/tcp": {{ + HostIP: "0.0.0.0", + HostPort: strconv.Itoa(c.opts.port), + }}, + }, + NoNetworkOverlay: true, + }) + + if err != nil { + return fmt.Errorf("starting etcd container: %w", err) + } + + container := resource.Resource().Container + c.logger.Info("etcd container started", + zap.String("containerID", container.ID), + zap.Any("ports", container.NetworkSettings.Ports), + // Uncomment if you need gory details about the container printed; equivalent of `docker inspect + // zap.Any("container", container), + ) + // Extract the port on which we are listening. + // This is coming from the equivalent of docker inspect + portBinds := container.NetworkSettings.Ports["2379/tcp"] + + // If running in a docker container e.g. on buildkite, route to etcd using the published port on the *host* machine. + // See also http://github.com/m3db/m3/blob/master/docker-compose.yml#L16-L16 + ipAddr := "127.0.0.1" + _, err = net.ResolveIPAddr("ip4", "host.docker.internal") + if err == nil { + c.logger.Info("Running tests within a docker container (e.g. for buildkite. " + + "Using host.docker.internal to talk to etcd") + ipAddr = "host.docker.internal" + } + + c.resource = resource + c.address = fmt.Sprintf("%s:%s", ipAddr, portBinds[0].HostPort) + + etcdCli, err := clientv3.New( + clientv3.Config{ + Endpoints: []string{c.address}, + DialTimeout: 5 * time.Second, + DialOptions: []grpc.DialOption{grpc.WithBlock()}, + Logger: c.logger, + }, + ) + if err != nil { + return fmt.Errorf("constructing etcd client: %w", err) + } + + defer func() { + if err := etcdCli.Close(); err != nil { + var merr xerrors.MultiError + closeErr = merr. + Add(closeErr). + Add(fmt.Errorf("closing etcd client: %w", err)). + FinalError() + } + }() + + return c.waitForHealth(ctx, etcdCli) +} + +func (c *EtcdNode) containerHostPort() string { + portBinds := c.resource.Resource().Container.NetworkSettings.Ports["2379/tcp"] + + return fmt.Sprintf("127.0.0.1:%s", portBinds[0].HostPort) +} + +func (c *EtcdNode) waitForHealth(ctx context.Context, memberCli memberClient) error { + retrier := retry.NewRetrier(retry.NewOptions(). + SetForever(true). + SetMaxBackoff(5 * time.Second), + ) + + var timeout time.Duration + deadline, ok := ctx.Deadline() + if ok { + timeout = deadline.Sub(time.Now()) + } + c.logger.Info( + "Waiting for etcd to report healthy (via member list)", + zap.String("timeout", timeout.String()), + ) + err := retrier.AttemptContext(ctx, func() error { + _, err := memberCli.MemberList(ctx) + if err != nil { + c.logger.Info( + "Failed connecting to etcd while waiting for container to come up", + zap.Error(err), + zap.String("endpoints", c.address), + ) + } + return err + }) + if err == nil { + c.logger.Info("etcd is healthy") + return nil + } + return fmt.Errorf("waiting for etcd to become healthy: %w", err) +} + +// Close stops the etcd node, and removes it. +func (c *EtcdNode) Close(ctx context.Context) error { + var err xerrors.MultiError + err = err. + Add(c.resource.Close()) + return err.FinalError() +} + +// Address is the host:port of the etcd node for use by etcd clients. +func (c *EtcdNode) Address() string { + return c.address +} + +// Stop stops the etcd container, but does not purge it. A stopped container can be restarted with Restart. 
+func (c *EtcdNode) Stop(ctx context.Context) error {
+	if c.stopped {
+		return errors.New("etcd node is already stopped")
+	}
+	if err := c.pool.Client.StopContainerWithContext(c.resource.Resource().Container.ID, 0, ctx); err != nil {
+		return err
+	}
+	c.stopped = true
+	return nil
+}
+
+// Restart restarts the etcd container. If it isn't currently stopped, the etcd container will be stopped and then
+// started; else it will just be started.
+func (c *EtcdNode) Restart(ctx context.Context) error {
+	if !c.stopped {
+		c.logger.Info("Stopping etcd node")
+
+		if err := c.Stop(ctx); err != nil {
+			return fmt.Errorf("stopping etcd node for Restart: %w", err)
+		}
+	}
+	err := c.pool.Client.StartContainerWithContext(c.resource.Resource().Container.ID, nil, ctx)
+	if err != nil {
+		return fmt.Errorf("starting etcd node for Restart: %w", err)
+	}
+	c.stopped = false
+	return nil
+}
+
+var _ memberClient = (*clientv3.Client)(nil)
+
+// memberClient exposes just one method of *clientv3.Client, for purposes of tests.
+type memberClient interface {
+	MemberList(ctx context.Context) (*clientv3.MemberListResponse, error)
+}
diff --git a/src/integration/resources/docker/dockerexternal/etcd_options.go b/src/integration/resources/docker/dockerexternal/etcd_options.go
new file mode 100644
index 0000000000..e4a9addee5
--- /dev/null
+++ b/src/integration/resources/docker/dockerexternal/etcd_options.go
@@ -0,0 +1,57 @@
+// Copyright (c) 2022 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package dockerexternal
+
+// EtcdClusterOption configures an etcd cluster.
+type EtcdClusterOption interface {
+	apply(opts *etcdClusterOptions)
+}
+
+type etcdClusterOptions struct {
+	useBridge bool
+	port      int
+}
+
+// EtcdClusterUseBridge configures an EtcdNode to insert a networking "bridge" between the etcd container and the
+// calling processes. The bridge intercepts network traffic, and forwards it, unless told not to via e.g. Blackhole().
+// See the bridge package.
+// As noted in that package, this implementation is lifted directly from the etcd/integration package; all credit goes
+// to etcd authors for the approach.
+func EtcdClusterUseBridge(shouldUseBridge bool) EtcdClusterOption {
+	return useBridge(shouldUseBridge)
+}
+
+type useBridge bool
+
+func (u useBridge) apply(opts *etcdClusterOptions) {
+	opts.useBridge = bool(u)
+}
+
+// EtcdClusterPort sets a specific port for etcd to listen on. Default is to listen on :0 (any free port).
+func EtcdClusterPort(port int) EtcdClusterOption { + return withPort(port) +} + +type withPort int + +func (w withPort) apply(opts *etcdClusterOptions) { + opts.port = int(w) +} diff --git a/src/integration/resources/docker/dockerexternal/etcd_test.go b/src/integration/resources/docker/dockerexternal/etcd_test.go new file mode 100644 index 0000000000..a267afb75c --- /dev/null +++ b/src/integration/resources/docker/dockerexternal/etcd_test.go @@ -0,0 +1,184 @@ +// Copyright (c) 2022 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package dockerexternal + +import ( + "context" + "math/rand" + "strconv" + "strings" + "testing" + "time" + + "github.com/m3db/m3/src/x/instrument" + + "github.com/ory/dockertest/v3" + "github.com/ory/dockertest/v3/docker" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" +) + +const ( + testKey = "etcd-test" +) + +var ( + testLogger, _ = zap.NewDevelopment() +) + +type etcdTestDeps struct { + Pool *dockertest.Pool + InstrumentOpts instrument.Options + Etcd *EtcdNode +} + +func setupEtcdTest(t *testing.T) etcdTestDeps { + pool, err := dockertest.NewPool("") + require.NoError(t, err) + + iopts := instrument.NewOptions().SetLogger(testLogger) + c, err := NewEtcd(pool, iopts) + require.NoError(t, err) + + return etcdTestDeps{ + Pool: pool, + Etcd: c, + InstrumentOpts: iopts, + } +} + +func TestCluster(t *testing.T) { + t.Run("starts a functioning cluster", func(t *testing.T) { + ctx, cancel := newTestContext() + defer cancel() + + deps := setupEtcdTest(t) + require.NoError(t, deps.Etcd.Setup(ctx)) + + t.Cleanup(func() { + require.NoError(t, deps.Etcd.Close(ctx)) + }) + + cli, err := clientv3.New( + clientv3.Config{ + Endpoints: []string{deps.Etcd.Address()}, + }, + ) + require.NoError(t, err) + + //nolint:gosec + testVal := strconv.Itoa(rand.Intn(10000)) + _, err = cli.Put(ctx, testKey, testVal) + require.NoError(t, err) + + actualVal, err := cli.Get(ctx, testKey) + require.NoError(t, err) + + assert.Equal(t, testVal, string(actualVal.Kvs[0].Value)) + }) + + t.Run("can run multiple at once", func(t *testing.T) { + ctx, cancel := newTestContext() + defer cancel() + + deps := setupEtcdTest(t) + + require.NoError(t, deps.Etcd.Setup(ctx)) + defer func() { + require.NoError(t, deps.Etcd.Close(ctx)) + }() + + c2, err := NewEtcd(deps.Pool, deps.InstrumentOpts) + require.NoError(t, err) + require.NoError(t, c2.Setup(ctx)) + defer func() 
{ + require.NoError(t, c2.Close(ctx)) + }() + }) + + t.Run("cleans up containers on shutdown", func(t *testing.T) { + ctx, cancel := newTestContext() + defer cancel() + + deps := setupEtcdTest(t) + testPrefix := "cleanup-test-" + deps.Etcd.namePrefix = testPrefix + + findContainers := func(namePrefix string, pool *dockertest.Pool) ([]docker.APIContainers, error) { + containers, err := deps.Pool.Client.ListContainers(docker.ListContainersOptions{}) + if err != nil { + return nil, err + } + + var rtn []docker.APIContainers + for _, ct := range containers { + for _, name := range ct.Names { + // Docker response prefixes the container name with / regardless of what you give it as input. + if strings.HasPrefix(name, "/"+namePrefix) { + rtn = append(rtn, ct) + break + } + } + } + return rtn, nil + } + + require.NoError(t, deps.Etcd.Setup(ctx)) + + cts, err := findContainers(testPrefix, deps.Pool) + require.NoError(t, err) + assert.Len(t, cts, 1) + + require.NoError(t, deps.Etcd.Close(ctx)) + cts, err = findContainers(testPrefix, deps.Pool) + require.NoError(t, err) + assert.Len(t, cts, 0) + }) +} + +func TestCluster_waitForHealth(t *testing.T) { + t.Run("errors when context is canceled", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + deps := setupEtcdTest(t) + etcdCli := fakeMemberClient{err: assert.AnError} + cancel() + require.EqualError( + t, + deps.Etcd.waitForHealth(ctx, etcdCli), + "waiting for etcd to become healthy: context canceled while retrying: context canceled", + ) + }) +} + +type fakeMemberClient struct { + err error +} + +func (f fakeMemberClient) MemberList(ctx context.Context) (*clientv3.MemberListResponse, error) { + return nil, f.err +} + +func newTestContext() (context.Context, func()) { + return context.WithTimeout(context.Background(), 10*time.Second) +} diff --git a/src/integration/resources/docker/dockerexternal/etcdintegration/bridge/README.md b/src/integration/resources/docker/dockerexternal/etcdintegration/bridge/README.md new file mode 100644 index 0000000000..860656226a --- /dev/null +++ b/src/integration/resources/docker/dockerexternal/etcdintegration/bridge/README.md @@ -0,0 +1,11 @@ +# Fork Notice + +This code is taken very directly from the etcd codebase (as is allowed by the Apache License). +The original license is included in all files. + +The file was copied from: /~https://github.com/etcd-io/etcd/blob/cdd2b737f04812a919a5735380fdaa1f932346d0/tests/framework/integration/bridge.go + +As of this notice only slight modifications have been made--primarily to satisfy linters +(lint ignores, public method comments). + +Thank you to the etcd maintainers for use of this code! \ No newline at end of file diff --git a/src/integration/resources/docker/dockerexternal/etcdintegration/bridge/bridge.go b/src/integration/resources/docker/dockerexternal/etcdintegration/bridge/bridge.go new file mode 100644 index 0000000000..f8ff1525e4 --- /dev/null +++ b/src/integration/resources/docker/dockerexternal/etcdintegration/bridge/bridge.go @@ -0,0 +1,256 @@ +// Copyright (c) 2022 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bridge + +import ( + "io" + "net" + "sync" +) + +// Dialer makes TCP connections. +type Dialer interface { + Dial() (net.Conn, error) +} + +// Bridge proxies connections between listener and dialer, making it possible +// to disconnect grpc network connections without closing the logical grpc connection. +type Bridge struct { + dialer Dialer + l net.Listener + conns map[*bridgeConn]struct{} + + stopc chan struct{} + pausec chan struct{} + blackholec chan struct{} + wg sync.WaitGroup + + mu sync.Mutex +} + +// New constructs a bridge listening to the given listener and connecting using the given dialer. +func New(dialer Dialer, listener net.Listener) (*Bridge, error) { + b := &Bridge{ + // Bridge "port" is ("%05d%05d0", port, pid) since go1.8 expects the port to be a number + dialer: dialer, + l: listener, + conns: make(map[*bridgeConn]struct{}), + stopc: make(chan struct{}), + pausec: make(chan struct{}), + blackholec: make(chan struct{}), + } + close(b.pausec) + b.wg.Add(1) + go b.serveListen() + return b, nil +} + +// Close stops the bridge. +func (b *Bridge) Close() { + //nolint:errcheck + b.l.Close() + b.mu.Lock() + select { + case <-b.stopc: + default: + close(b.stopc) + } + b.mu.Unlock() + b.wg.Wait() +} + +// DropConnections drops connections to the bridge. +func (b *Bridge) DropConnections() { + b.mu.Lock() + defer b.mu.Unlock() + for bc := range b.conns { + bc.Close() + } + b.conns = make(map[*bridgeConn]struct{}) +} + +// PauseConnections pauses all connections. +func (b *Bridge) PauseConnections() { + b.mu.Lock() + b.pausec = make(chan struct{}) + b.mu.Unlock() +} + +// UnpauseConnections unpauses all connections. 
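+// It closes the pause channel (unless it is already closed), releasing any
+// connections that serveListen accepted while the bridge was paused.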
+func (b *Bridge) UnpauseConnections() { + b.mu.Lock() + select { + case <-b.pausec: + default: + close(b.pausec) + } + b.mu.Unlock() +} + +func (b *Bridge) serveListen() { + defer func() { + //nolint:errcheck + b.l.Close() + b.mu.Lock() + for bc := range b.conns { + bc.Close() + } + b.mu.Unlock() + b.wg.Done() + }() + + for { + inc, ierr := b.l.Accept() + if ierr != nil { + return + } + b.mu.Lock() + pausec := b.pausec + b.mu.Unlock() + select { + case <-b.stopc: + //nolint:errcheck + inc.Close() + return + case <-pausec: + } + + outc, oerr := b.dialer.Dial() + if oerr != nil { + //nolint:errcheck + inc.Close() + return + } + + bc := &bridgeConn{inc, outc, make(chan struct{})} + b.wg.Add(1) + b.mu.Lock() + b.conns[bc] = struct{}{} + go b.serveConn(bc) + b.mu.Unlock() + } +} + +func (b *Bridge) serveConn(bc *bridgeConn) { + defer func() { + close(bc.donec) + bc.Close() + b.mu.Lock() + delete(b.conns, bc) + b.mu.Unlock() + b.wg.Done() + }() + + var wg sync.WaitGroup + wg.Add(2) + go func() { + //nolint:errcheck + b.ioCopy(bc.out, bc.in) + bc.close() + wg.Done() + }() + go func() { + //nolint:errcheck + b.ioCopy(bc.in, bc.out) + bc.close() + wg.Done() + }() + wg.Wait() +} + +type bridgeConn struct { + in net.Conn + out net.Conn + donec chan struct{} +} + +func (bc *bridgeConn) Close() { + bc.close() + <-bc.donec +} + +func (bc *bridgeConn) close() { + //nolint:errcheck + bc.in.Close() + //nolint:errcheck + bc.out.Close() +} + +// Blackhole stops connections to the bridge. +func (b *Bridge) Blackhole() { + b.mu.Lock() + close(b.blackholec) + b.mu.Unlock() +} + +// Unblackhole stops connections to the bridge. +func (b *Bridge) Unblackhole() { + b.mu.Lock() + for bc := range b.conns { + bc.Close() + } + b.conns = make(map[*bridgeConn]struct{}) + b.blackholec = make(chan struct{}) + b.mu.Unlock() +} + +// ref. /~https://github.com/golang/go/blob/master/src/io/io.go copyBuffer +func (b *Bridge) ioCopy(dst io.Writer, src io.Reader) (err error) { + buf := make([]byte, 32*1024) + for { + select { + case <-b.blackholec: + //nolint:errcheck + io.Copy(io.Discard, src) + return nil + default: + } + nr, er := src.Read(buf) + if nr > 0 { + nw, ew := dst.Write(buf[0:nr]) + if ew != nil { + return ew + } + if nr != nw { + return io.ErrShortWrite + } + } + if er != nil { + err = er + break + } + } + return err +} diff --git a/src/integration/resources/docker/dockerexternal/etcdintegration/cluster.go b/src/integration/resources/docker/dockerexternal/etcdintegration/cluster.go new file mode 100644 index 0000000000..a83af92fe1 --- /dev/null +++ b/src/integration/resources/docker/dockerexternal/etcdintegration/cluster.go @@ -0,0 +1,287 @@ +// Copyright (c) 2022 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Package etcdintegration is a mostly drop-in replacement for the etcd integration +// (github.com/etcd-io/etcd/tests/v3/framework/integration) package. +// Instead of starting etcd within this Go process, it starts etcd using a docker container. +package etcdintegration + +import ( + "context" + "fmt" + "math/rand" + "net" + "time" + + "github.com/m3db/m3/src/integration/resources/docker/dockerexternal" + "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration/bridge" + xerrors "github.com/m3db/m3/src/x/errors" + "github.com/m3db/m3/src/x/instrument" + "github.com/m3db/m3/src/x/retry" + + "github.com/ory/dockertest/v3" + "github.com/stretchr/testify/require" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" + "google.golang.org/grpc" +) + +const ( + startTimeout = 30 * time.Second + stopTimeout = 30 * time.Second + + clientHealthTimeout = 30 * time.Second +) + +// ClusterConfig configures an etcd integration test cluster. +type ClusterConfig struct { + // Size is the number of nodes in the cluster. Provided as a parameter to be API compatible with the etcd package, + // but currently only one node is supported. + Size int + + // UseBridge enables a networking bridge on etcd members, accessible via Node.Bridge(). This allows manipulation + // of connections to particular members. + UseBridge bool +} + +// Cluster is an etcd cluster. Currently, the implementation is such that only one node clusters are allowed. +type Cluster struct { + // Members are the etcd nodes that make up the cluster. + Members []*Node + + terminated bool +} + +// NewCluster starts an etcd cluster using docker. +func NewCluster(t testingT, cfg *ClusterConfig) *Cluster { + if cfg.Size > 1 { + t.Errorf("NewCluster currently only supports single node clusters") + t.FailNow() + return nil + } + + logger := zaptest.NewLogger(t) + + pool, err := dockertest.NewPool("") + require.NoError(t, err) + + r, err := dockerexternal.NewEtcd(pool, instrument.NewOptions(), dockerexternal.EtcdClusterUseBridge(cfg.UseBridge)) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), startTimeout) + defer cancel() + + cluster := &Cluster{ + Members: []*Node{newNode(r, logger, cfg)}, + } + + require.NoError(t, cluster.start(ctx)) + + // Paranoia: try to ensure that we cleanup the containers, even if our callers mess up. + t.Cleanup(func() { + if !cluster.terminated { + cluster.Terminate(t) + } + }) + return cluster +} + +// start is private because NewCluster is intended to always start the cluster. +func (c *Cluster) start(ctx context.Context) error { + var merr xerrors.MultiError + for _, m := range c.Members { + merr = merr.Add(m.start(ctx)) + } + if err := merr.FinalError(); err != nil { + return fmt.Errorf("failed starting etcd cluster: %w", err) + } + return nil +} + +// RandClient returns a client from any member in the cluster. +func (c *Cluster) RandClient() *clientv3.Client { + //nolint:gosec + return c.Members[rand.Intn(len(c.Members))].Client +} + +// Terminate stops all nodes in the cluster. 
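+// It closes each member's client connection and removes the backing etcd
+// containers, failing the test if cleanup does not complete cleanly.
+// Typical usage pairs NewCluster with a deferred Terminate, for example:
+//
+//	c := NewCluster(t, &ClusterConfig{Size: 1})
+//	defer c.Terminate(t)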
+func (c *Cluster) Terminate(t testingT) { + ctx, cancel := context.WithTimeout(context.Background(), stopTimeout) + defer cancel() + + c.terminated = true + + var err xerrors.MultiError + for _, node := range c.Members { + err = err.Add(node.close(ctx)) + } + require.NoError(t, err.FinalError()) +} + +// Node is a single etcd server process, running in a docker container. +type Node struct { + Client *clientv3.Client + + resource dockerEtcd + cfg *ClusterConfig + logger *zap.Logger + bridge *bridge.Bridge +} + +func newNode(r dockerEtcd, logger *zap.Logger, cfg *ClusterConfig) *Node { + return &Node{ + resource: r, + logger: logger, + cfg: cfg, + } +} + +// Stop stops the etcd container, but doesn't remove it. +func (n *Node) Stop(t testingT) { + ctx, cancel := context.WithTimeout(context.Background(), stopTimeout) + defer cancel() + require.NoError(t, n.resource.Stop(ctx)) + + if n.bridge != nil { + n.bridge.Close() + } +} + +// Bridge can be used to manipulate connections to this etcd node. It +// is a man-in-the-middle listener which mostly transparently forwards connections, unless told to drop them via e.g. +// the Blackhole method. +// Bridge will only be active if cfg.UseBridge is true; calling this method otherwise will panic. +func (n *Node) Bridge() *bridge.Bridge { + if !n.cfg.UseBridge { + panic("EtcdNode wasn't configured to use a Bridge; pass EtcdClusterUseBridge(true) to enable.") + } + return n.bridge +} + +// Restart starts a stopped etcd container, stopping it first if it's not already. +func (n *Node) Restart(t testingT) error { + ctx, cancel := context.WithTimeout(context.Background(), startTimeout) + defer cancel() + require.NoError(t, n.resource.Restart(ctx)) + return nil +} + +// start starts the etcd node. It is private because it isn't part of the etcd/integration package API, and +// should only be called by Cluster.start. +func (n *Node) start(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, startTimeout) + defer cancel() + + if err := n.resource.Setup(ctx); err != nil { + return err + } + + address := n.resource.Address() + if n.cfg.UseBridge { + addr, err := n.setupBridge() + if err != nil { + return fmt.Errorf("setting up connection bridge for etcd node: %w", err) + } + address = addr + } + + etcdCli, err := clientv3.New(clientv3.Config{ + Endpoints: []string{"http://" + address}, + DialOptions: []grpc.DialOption{grpc.WithBlock()}, + DialTimeout: 5 * time.Second, + Logger: n.logger, + }) + + if err != nil { + return fmt.Errorf("constructing etcd client for member: %w", err) + } + + n.logger.Info("Connecting to docker etcd using host machine port", + zap.String("endpoint", address), + ) + + n.Client = etcdCli + return nil +} + +// setupBridge puts a man-in-the-middle listener in between the etcd docker process and the client. See Bridge() for +// details. +// Returns the new address of the bridge, which clients should connect to. 
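+// The bridge listens on an ephemeral localhost port and, for each incoming
+// connection, dials the etcd container's published port on the host machine.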
+func (n *Node) setupBridge() (string, error) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return "", fmt.Errorf("setting up listener for bridge: %w", err) + } + + n.logger.Info("etcd bridge is listening", zap.String("addr", listener.Addr().String())) + + // dialer = make connections to the etcd container + // listener = the bridge's inbounds + n.bridge, err = bridge.New(dialer{hostport: n.resource.Address()}, listener) + if err != nil { + return "", err + } + + return listener.Addr().String(), nil +} + +func (n *Node) close(ctx context.Context) error { + var err xerrors.MultiError + err = err.Add(n.Client.Close()) + return err.Add(n.resource.Close(ctx)).FinalError() +} + +type dialer struct { + hostport string +} + +func (d dialer) Dial() (net.Conn, error) { + return net.Dial("tcp", d.hostport) +} + +// testingT wraps *testing.T. Allows us to not directly depend on *testing package. +type testingT interface { + zaptest.TestingT + require.TestingT + + Cleanup(func()) +} + +// BeforeTestExternal -- solely here to match etcd API's. +func BeforeTestExternal(t testingT) {} + +// WaitClientV3 waits for an etcd client to be healthy. +func WaitClientV3(t testingT, kv clientv3.KV) { + ctx, cancel := context.WithTimeout(context.Background(), clientHealthTimeout) + defer cancel() + + err := retry.NewRetrier(retry.NewOptions().SetForever(true)).AttemptContext( + ctx, + func() error { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + _, err := kv.Get(ctx, "/") + return err + }, + ) + + require.NoError(t, err) +} diff --git a/src/integration/resources/docker/dockerexternal/etcdintegration/cluster_test.go b/src/integration/resources/docker/dockerexternal/etcdintegration/cluster_test.go new file mode 100644 index 0000000000..ed84e57362 --- /dev/null +++ b/src/integration/resources/docker/dockerexternal/etcdintegration/cluster_test.go @@ -0,0 +1,80 @@ +// Copyright (c) 2022 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
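+
+// The tests below exercise the docker-backed cluster end to end; they assume a
+// reachable Docker daemon on the machine running the tests.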
+ +package etcdintegration + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + testKey = "test-key" +) + +func TestCluster(t *testing.T) { + c := NewCluster(t, &ClusterConfig{Size: 1}) + + defer c.Terminate(t) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + testVal := newTestValue(t) + _, err := c.RandClient().Put(ctx, testKey, testVal) + require.NoError(t, err) + + resp, err := c.RandClient().Get(ctx, testKey) + require.NoError(t, err) + assert.Equal(t, testVal, string(resp.Kvs[0].Value)) +} + +func TestCluster_withBridge(t *testing.T) { + c := NewCluster(t, &ClusterConfig{Size: 1, UseBridge: true}) + + defer c.Terminate(t) + + c.Members[0].Bridge().DropConnections() + c.Members[0].Bridge().Blackhole() + + ctx := context.Background() + + blackholedCtx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + + _, err := c.RandClient().MemberList(blackholedCtx) + require.EqualError(t, err, context.DeadlineExceeded.Error()) + + c.Members[0].Bridge().Unblackhole() + + availCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + _, err = c.RandClient().MemberList(availCtx) + require.NoError(t, err) +} + +func newTestValue(t *testing.T) string { + return fmt.Sprintf("%s-%d", t.Name(), time.Now().Unix()) +} diff --git a/src/integration/resources/docker/dockerexternal/etcdintegration/types.go b/src/integration/resources/docker/dockerexternal/etcdintegration/types.go new file mode 100644 index 0000000000..7e9dc35ca9 --- /dev/null +++ b/src/integration/resources/docker/dockerexternal/etcdintegration/types.go @@ -0,0 +1,34 @@ +// Copyright (c) 2022 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package etcdintegration + +import "context" + +// dockerEtcd are the operations we need out of a docker based etcd process. 
It's implemented by at least +// resources/dockerexternal.EtcdNode +type dockerEtcd interface { + Setup(ctx context.Context) error + Close(ctx context.Context) error + + Stop(ctx context.Context) error + Restart(ctx context.Context) error + Address() string +} diff --git a/src/x/dockertest/common.go b/src/x/dockertest/common.go index 01558b0d98..0cb4938651 100644 --- a/src/x/dockertest/common.go +++ b/src/x/dockertest/common.go @@ -58,6 +58,8 @@ type ResourceOptions struct { Image Image PortList []int PortMappings map[dc.Port][]dc.PortBinding + + // NoNetworkOverlay if set, disables use of the default integration testing network we create (networkName). NoNetworkOverlay bool // Env is the environment for the docker container; it corresponds 1:1 with dockertest.RunOptions. @@ -111,8 +113,7 @@ func (o ResourceOptions) WithDefaults( func newOptions(name string) *dockertest.RunOptions { return &dockertest.RunOptions{ - Name: name, - NetworkID: networkName, + Name: name, } } diff --git a/src/x/dockertest/docker_resource.go b/src/x/dockertest/docker_resource.go index c47ca1f69e..927539bcea 100644 --- a/src/x/dockertest/docker_resource.go +++ b/src/x/dockertest/docker_resource.go @@ -83,6 +83,9 @@ func NewDockerResource( } opts := newOptions(containerName) + if !resourceOpts.NoNetworkOverlay { + opts.NetworkID = networkName + } opts, err := exposePorts(opts, portList, resourceOpts.PortMappings) if err != nil { return nil, err