diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index 34eafdd7c070..abc43c33a2b8 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -41,8 +41,14 @@ import ( "github.com/sirupsen/logrus" "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" + "go.etcd.io/etcd/client/pkg/v3/logutil" clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/credentials" snapshotv3 "go.etcd.io/etcd/etcdutl/v3/snapshot" + "go.uber.org/zap/zapcore" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -55,7 +61,7 @@ import ( ) const ( - testTimeout = time.Second * 30 + statusTimeout = time.Second * 30 manageTickerTime = time.Second * 15 learnerMaxStallTime = time.Minute * 5 memberRemovalTimeout = time.Minute * 1 @@ -206,34 +212,28 @@ func (e *ETCD) Test(ctx context.Context) error { return errors.New("etcd datastore is not started") } - ctx, cancel := context.WithTimeout(ctx, testTimeout) - defer cancel() - - endpoints := getEndpoints(e.config) - status, err := e.client.Status(ctx, endpoints[0]) + status, err := e.status(ctx) if err != nil { - return err - } - - if status.IsLearner { + return errors.Wrap(err, "failed to get etcd status") + } else if status.IsLearner { return errors.New("this server has not yet been promoted from learner to voting member") } + logrus.Infof("Connected to etcd %s - datastore using %d of %d bytes", status.Version, status.DbSizeInUse, status.DbSize) if err := e.defragment(ctx); err != nil { return errors.Wrap(err, "failed to defragment etcd database") } - if err := e.clearAlarms(ctx); err != nil { + if err := e.clearAlarms(ctx, status.Header.MemberId); err != nil { return errors.Wrap(err, "failed to report and disarm etcd alarms") } // refresh status to see if any errors remain after clearing alarms - status, err = e.client.Status(ctx, endpoints[0]) - if err != nil { + // in addition to errors exposed via the alarm API, this will also return + // an error if the cluster has no leader. + if status, err := e.status(ctx); err != nil { return err - } - - if len(status.Errors) > 0 { + } else if len(status.Errors) > 0 { return fmt.Errorf("etcd cluster errors: %s", strings.Join(status.Errors, ", ")) } @@ -242,6 +242,7 @@ func (e *ETCD) Test(ctx context.Context) error { return err } + // Ensure that there is a cluster member with our peerURL and name var memberNameUrls []string for _, member := range members.Members { for _, peerURL := range member.PeerURLs { @@ -253,6 +254,8 @@ func (e *ETCD) Test(ctx context.Context) error { memberNameUrls = append(memberNameUrls, member.Name+"="+member.PeerURLs[0]) } } + + // no matching PeerURL on any Member, return an error that indicates what was expected vs what we found. return &membershipError{members: memberNameUrls, self: e.name + "=" + e.peerURL()} } @@ -523,7 +526,7 @@ func (e *ETCD) startClient(ctx context.Context) error { e.config.Datastore.BackendTLSConfig.CertFile = e.config.Runtime.ClientETCDCert e.config.Datastore.BackendTLSConfig.KeyFile = e.config.Runtime.ClientETCDKey - client, err := getClient(ctx, e.config, endpoints...) + client, conn, err := getClient(ctx, e.config, endpoints...) if err != nil { return err } @@ -531,9 +534,8 @@ func (e *ETCD) startClient(ctx context.Context) error { go func() { <-ctx.Done() - client := e.client e.client = nil - client.Close() + conn.Close() }() return nil @@ -554,11 +556,11 @@ func (e *ETCD) join(ctx context.Context, clientAccessInfo *clientaccess.Info) er return err } - client, err := getClient(clientCtx, e.config, clientURLs...) + client, conn, err := getClient(clientCtx, e.config, clientURLs...) if err != nil { return err } - defer client.Close() + defer conn.Close() for _, member := range memberList.Members { for _, peer := range member.PeerURLs { @@ -725,13 +727,53 @@ func (e *ETCD) infoHandler() http.Handler { // If the runtime config does not list any endpoints, the default endpoint is used. // The returned client should be closed when no longer needed, in order to avoid leaking GRPC // client goroutines. -func getClient(ctx context.Context, control *config.Control, endpoints ...string) (*clientv3.Client, error) { +func getClient(ctx context.Context, control *config.Control, endpoints ...string) (*clientv3.Client, *grpc.ClientConn, error) { + logger, err := logutil.CreateDefaultZapLogger(zapcore.DebugLevel) + if err != nil { + return nil, nil, err + } + cfg, err := getClientConfig(ctx, control, endpoints...) if err != nil { - return nil, err + return nil, nil, err + } + + // Set up dialer and resolver options. + // This is normally handled by clientv3.New() but that wraps all the GRPC + // service with retry handlers and uses deprecated grpc.DialContext() which + // tries to establish a connection even when one isn't wanted. + if cfg.DialKeepAliveTime > 0 { + params := keepalive.ClientParameters{ + Time: cfg.DialKeepAliveTime, + Timeout: cfg.DialKeepAliveTimeout, + PermitWithoutStream: cfg.PermitWithoutStream, + } + cfg.DialOptions = append(cfg.DialOptions, grpc.WithKeepaliveParams(params)) } - return clientv3.New(*cfg) + if cfg.TLS != nil { + creds := credentials.NewBundle(credentials.Config{TLSConfig: cfg.TLS}).TransportCredentials() + cfg.DialOptions = append(cfg.DialOptions, grpc.WithTransportCredentials(creds)) + } else { + cfg.DialOptions = append(cfg.DialOptions, grpc.WithTransportCredentials(insecure.NewCredentials())) + } + + cfg.DialOptions = append(cfg.DialOptions, grpc.WithResolvers(NewSimpleResolver(cfg.Endpoints[0]))) + + target := fmt.Sprintf("%s://%p/%s", scheme, cfg, authority(cfg.Endpoints[0])) + conn, err := grpc.NewClient(target, cfg.DialOptions...) + if err != nil { + return nil, nil, err + } + + // Create a new client and wire up the GRPC service interfaces. + // Ref: /~https://github.com/etcd-io/etcd/blob/v3.5.16/client/v3/client.go#L87 + client := clientv3.NewCtxClient(ctx, clientv3.WithZapLogger(logger.Named(version.Program+"-etcd-client"))) + client.Cluster = clientv3.NewClusterFromClusterClient(etcdserverpb.NewClusterClient(conn), client) + client.KV = clientv3.NewKVFromKVClient(etcdserverpb.NewKVClient(conn), client) + client.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(etcdserverpb.NewMaintenanceClient(conn), client) + + return client, conn, nil } // getClientConfig generates an etcd client config connected to the specified endpoints. @@ -851,11 +893,11 @@ func (e *ETCD) migrateFromSQLite(ctx context.Context) error { } defer sqliteClient.Close() - etcdClient, err := getClient(ctx, e.config) + etcdClient, conn, err := getClient(ctx, e.config) if err != nil { return err } - defer etcdClient.Close() + defer conn.Close() values, err := sqliteClient.List(ctx, "/registry/", 0) if err != nil { @@ -984,7 +1026,7 @@ func (e *ETCD) StartEmbeddedTemporary(ctx context.Context) error { return errors.New("etcd datastore already started") } - client, err := getClient(ctx, e.config) + client, conn, err := getClient(ctx, e.config) if err != nil { return err } @@ -992,9 +1034,8 @@ func (e *ETCD) StartEmbeddedTemporary(ctx context.Context) error { go func() { <-ctx.Done() - client := e.client e.client = nil - client.Close() + conn.Close() }() if err := cp.Copy(etcdDataDir, tmpDataDir, cp.Options{PreserveOwner: true}); err != nil { @@ -1251,8 +1292,6 @@ func (e *ETCD) trackLearnerProgress(ctx context.Context, progress *learnerProgre } func (e *ETCD) getETCDStatus(ctx context.Context, url string) (*clientv3.StatusResponse, error) { - ctx, cancel := context.WithTimeout(ctx, defaultDialTimeout) - defer cancel() resp, err := e.client.Status(ctx, url) if err != nil { return resp, errors.Wrap(err, "failed to check etcd member status") @@ -1363,12 +1402,10 @@ func (e *ETCD) setLearnerProgress(ctx context.Context, status *learnerProgress) return err } -// clearAlarms checks for any alarms on the local etcd member. If found, they are -// reported and the alarm state is cleared. -func (e *ETCD) clearAlarms(ctx context.Context) error { - ctx, cancel := context.WithTimeout(ctx, testTimeout) - defer cancel() - +// clearAlarms checks for any NOSPACE alarms on the local etcd member. +// If found, they are reported and the alarm state is cleared. +// Other alarm types are not handled. +func (e *ETCD) clearAlarms(ctx context.Context, memberID uint64) error { if e.client == nil { return errors.New("etcd client was nil") } @@ -1380,21 +1417,36 @@ func (e *ETCD) clearAlarms(ctx context.Context) error { for _, alarm := range alarmList.Alarms { logrus.Warnf("Alarm on etcd member %d: %s", alarm.MemberID, alarm.Alarm) - } - - if len(alarmList.Alarms) > 0 { - if _, err := e.client.AlarmDisarm(ctx, &clientv3.AlarmMember{}); err != nil { - return fmt.Errorf("etcd alarm disarm failed: %v", err) + if alarm.MemberID != memberID { + continue + } + if alarm.Alarm == etcdserverpb.AlarmType_NOSPACE { + if _, err := e.client.AlarmDisarm(ctx, &clientv3.AlarmMember{MemberID: alarm.MemberID, Alarm: alarm.Alarm}); err != nil { + return fmt.Errorf("etcd alarm disarm failed: %v", err) + } + logrus.Infof("Alarm disarmed successfully") + } else { + return fmt.Errorf("unhandled etcd alarm type: %s", alarm.Alarm) } - logrus.Infof("Alarms disarmed on etcd server") } return nil } -func (e *ETCD) defragment(ctx context.Context) error { - ctx, cancel := context.WithTimeout(ctx, testTimeout) +// status returns status using the first etcd endpoint. +func (e *ETCD) status(ctx context.Context) (*clientv3.StatusResponse, error) { + if e.client == nil { + return nil, errors.New("etcd client was nil") + } + + ctx, cancel := context.WithTimeout(ctx, statusTimeout) defer cancel() + endpoints := getEndpoints(e.config) + return e.client.Status(ctx, endpoints[0]) +} + +// defragment defragments the etcd datastore using the first etcd endpoint +func (e *ETCD) defragment(ctx context.Context) error { if e.client == nil { return errors.New("etcd client was nil") } @@ -1550,11 +1602,11 @@ func backupDirWithRetention(dir string, maxBackupRetention int) (string, error) // GetAPIServerURLsFromETCD will try to fetch the version.Program/apiaddresses key from etcd // and unmarshal it to a list of apiserver endpoints. func GetAPIServerURLsFromETCD(ctx context.Context, cfg *config.Control) ([]string, error) { - cl, err := getClient(ctx, cfg) + cl, conn, err := getClient(ctx, cfg) if err != nil { return nil, err } - defer cl.Close() + defer conn.Close() etcdResp, err := cl.KV.Get(ctx, AddressKey) if err != nil { @@ -1576,9 +1628,6 @@ func GetAPIServerURLsFromETCD(ctx context.Context, cfg *config.Control) ([]strin // GetMembersClientURLs will list through the member lists in etcd and return // back a combined list of client urls for each member in the cluster func (e *ETCD) GetMembersClientURLs(ctx context.Context) ([]string, error) { - ctx, cancel := context.WithTimeout(ctx, testTimeout) - defer cancel() - members, err := e.client.MemberList(ctx) if err != nil { return nil, err @@ -1593,24 +1642,6 @@ func (e *ETCD) GetMembersClientURLs(ctx context.Context) ([]string, error) { return clientURLs, nil } -// GetMembersNames will list through the member lists in etcd and return -// back a combined list of member names -func (e *ETCD) GetMembersNames(ctx context.Context) ([]string, error) { - ctx, cancel := context.WithTimeout(ctx, testTimeout) - defer cancel() - - members, err := e.client.MemberList(ctx) - if err != nil { - return nil, err - } - - var memberNames []string - for _, member := range members.Members { - memberNames = append(memberNames, member.Name) - } - return memberNames, nil -} - // RemoveSelf will remove the member if it exists in the cluster. This should // only be called on a node that may have previously run etcd, but will not // currently run etcd, to ensure that it is not a member of the cluster. diff --git a/pkg/etcd/resolver.go b/pkg/etcd/resolver.go new file mode 100644 index 000000000000..b95242cbfa91 --- /dev/null +++ b/pkg/etcd/resolver.go @@ -0,0 +1,80 @@ +package etcd + +import ( + "net/url" + "path" + "strings" + + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/resolver/manual" +) + +const scheme = "etcd-endpoint" + +type EtcdSimpleResolver struct { + *manual.Resolver + endpoint string +} + +// Cribbed from /~https://github.com/etcd-io/etcd/blob/v3.5.16/client/v3/internal/resolver/resolver.go +// but only supports a single fixed endpoint. We use this instead of the internal etcd client resolver +// because the agent loadbalancer handles failover and we don't want etcd or grpc's special behavior. +func NewSimpleResolver(endpoint string) *EtcdSimpleResolver { + r := manual.NewBuilderWithScheme(scheme) + return &EtcdSimpleResolver{Resolver: r, endpoint: endpoint} +} + +func (r *EtcdSimpleResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { + res, err := r.Resolver.Build(target, cc, opts) + if err != nil { + return nil, err + } + + if r.CC != nil { + addr, serverName := interpret(r.endpoint) + r.UpdateState(resolver.State{ + Addresses: []resolver.Address{{Addr: addr, ServerName: serverName}}, + }) + } + + return res, nil +} + +func interpret(ep string) (string, string) { + if strings.HasPrefix(ep, "unix:") || strings.HasPrefix(ep, "unixs:") { + if strings.HasPrefix(ep, "unix:///") || strings.HasPrefix(ep, "unixs:///") { + _, absolutePath, _ := strings.Cut(ep, "://") + return "unix://" + absolutePath, path.Base(absolutePath) + } + if strings.HasPrefix(ep, "unix://") || strings.HasPrefix(ep, "unixs://") { + _, localPath, _ := strings.Cut(ep, "://") + return "unix:" + localPath, path.Base(localPath) + } + _, localPath, _ := strings.Cut(ep, ":") + return "unix:" + localPath, path.Base(localPath) + } + if strings.Contains(ep, "://") { + url, err := url.Parse(ep) + if err != nil { + return ep, ep + } + if url.Scheme == "http" || url.Scheme == "https" { + return url.Host, url.Host + } + return ep, url.Host + } + return ep, ep +} + +func authority(ep string) string { + if _, authority, ok := strings.Cut(ep, "://"); ok { + return authority + } + if suff, ok := strings.CutPrefix(ep, "unix:"); ok { + return suff + } + if suff, ok := strings.CutPrefix(ep, "unixs:"); ok { + return suff + } + return ep +} diff --git a/pkg/etcd/snapshot.go b/pkg/etcd/snapshot.go index 8f844981f5d9..90919c2403af 100644 --- a/pkg/etcd/snapshot.go +++ b/pkg/etcd/snapshot.go @@ -27,7 +27,7 @@ import ( "github.com/pkg/errors" "github.com/robfig/cron/v3" "github.com/sirupsen/logrus" - snapshotv3 "go.etcd.io/etcd/etcdutl/v3/snapshot" + snapshotv3 "go.etcd.io/etcd/client/v3/snapshot" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -243,7 +243,7 @@ func (e *ETCD) Snapshot(ctx context.Context) (*managed.SnapshotResult, error) { var sf *snapshot.File - if err := snapshotv3.NewV3(e.client.GetLogger()).Save(ctx, *cfg, snapshotPath); err != nil { + if err := snapshotv3.Save(ctx, e.client.GetLogger(), *cfg, snapshotPath); err != nil { sf = &snapshot.File{ Name: snapshotName, Location: "",