From 0bce4bff1ace56c6c3271e0a0d4087a9b679d860 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Thu, 21 Jul 2022 14:09:21 +0200 Subject: [PATCH 1/3] sharded-test-server: pass a shard name and a path to the kubeconfig for the root shard --- cmd/sharded-test-server/shard.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/sharded-test-server/shard.go b/cmd/sharded-test-server/shard.go index 4fa73562101..d2d7358b467 100644 --- a/cmd/sharded-test-server/shard.go +++ b/cmd/sharded-test-server/shard.go @@ -60,7 +60,8 @@ func startShard(ctx context.Context, n int, args []string, servingCA *crypto.CA, } if n > 0 { - // args = append(args, "--root-kubeconfig=.kcp-0/root.kubeconfig") + args = append(args, fmt.Sprintf("--shard-name=shard-%d", n)) + args = append(args, "--root-shard-kubeconfig-file=.kcp-0/admin.kubeconfig") args = append(args, fmt.Sprintf("--embedded-etcd-client-port=%d", 2379+n+1)) args = append(args, fmt.Sprintf("--embedded-etcd-peer-port=%d", (2379+n+1)+1)) // prev value +1 } From 4bc12d59fcc17c079d52250768866bdc01f61628 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Tue, 26 Jul 2022 16:17:09 +0200 Subject: [PATCH 2/3] introduce NewWildcardIdentitiesWrappingRoundTripper NewWildcardIdentitiesWrappingRoundTripper creates a HTTP RoundTripper that injected resource identities for individual group or group resources. Each group or resource is coming from one APIExport whose names are passed in as a map. The RoundTripper is exposed as a function that allows wrapping the RoundTripper. The method also returns the resolve function that gets the APIExports and extract the identities. The resolve func might return an error if the APIExport is not found or for other reason. Only after it succeeds a client using the returned RoundTripper can use the group and group resources with identities. --- pkg/server/bootstrap/identity.go | 32 ++++++++++++++++++++++++++------ 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/pkg/server/bootstrap/identity.go b/pkg/server/bootstrap/identity.go index 2963c6c4b2c..41178aed728 100644 --- a/pkg/server/bootstrap/identity.go +++ b/pkg/server/bootstrap/identity.go @@ -84,7 +84,24 @@ func NewConfigWithWildcardIdentities(config *rest.Config, groupExportNames map[string]string, grouptResourceExportNames map[schema.GroupResource]string, kcpClient kcpclient.Interface) (identityConfig *rest.Config, resolve func(ctx context.Context) error) { + identityRoundTripper, identityResolver := NewWildcardIdentitiesWrappingRoundTripper(groupExportNames, grouptResourceExportNames, kcpClient) + identityConfig = rest.CopyConfig(config) + identityConfig.Wrap(identityRoundTripper) + return identityConfig, identityResolver +} +// NewWildcardIdentitiesWrappingRoundTripper creates an HTTP RoundTripper +// that injected resource identities for individual group or group resources. +// Each group or resource is coming from one APIExport whose names are passed in as a map. +// The RoundTripper is exposed as a function that allows wrapping the RoundTripper +// +// The method also returns the resolve function that gets the APIExports and extract the identities. +// The resolve func might return an error if the APIExport is not found or for other reason. Only +// after it succeeds a client using the returned RoundTripper can use the group and group resources +// with identities. +func NewWildcardIdentitiesWrappingRoundTripper(groupExportNames map[string]string, + groupResourceExportNames map[schema.GroupResource]string, + kcpClient kcpclient.Interface) (func(rt http.RoundTripper) http.RoundTripper, func(ctx context.Context) error) { ids := &identities{ groupIdentities: map[string]string{}, groupResourceIdentities: map[schema.GroupResource]string{}, @@ -93,14 +110,18 @@ func NewConfigWithWildcardIdentities(config *rest.Config, for group := range groupExportNames { ids.groupIdentities[group] = "" } - for gr := range grouptResourceExportNames { + for gr := range groupResourceExportNames { ids.groupResourceIdentities[gr] = "" } - identityConfig = rest.CopyConfig(config) - identityConfig.Wrap(injectKcpIdentities(ids)) + return injectKcpIdentities(ids), wildcardIdentitiesResolver(ids, groupExportNames, groupResourceExportNames, kcpClient) +} - return identityConfig, func(ctx context.Context) error { +func wildcardIdentitiesResolver(ids *identities, + groupExportNames map[string]string, + groupResourceExportNames map[schema.GroupResource]string, + kcpClient kcpclient.Interface) func(ctx context.Context) error { + return func(ctx context.Context) error { var errs []error for group, name := range groupExportNames { ids.lock.RLock() @@ -127,7 +148,7 @@ func NewConfigWithWildcardIdentities(config *rest.Config, klog.V(2).Infof("APIExport %s|%s for group %q has identity %s", logicalcluster.From(apiExport), name, group, apiExport.Status.IdentityHash) } - for gr, name := range grouptResourceExportNames { + for gr, name := range groupResourceExportNames { ids.lock.RLock() id := ids.groupResourceIdentities[gr] ids.lock.RUnlock() @@ -152,7 +173,6 @@ func NewConfigWithWildcardIdentities(config *rest.Config, klog.V(2).Infof("APIExport %s|%s for resource %s.%s has identity %s", logicalcluster.From(apiExport), name, gr.Resource, gr.Group, apiExport.Status.IdentityHash) } - return errorsutil.NewAggregate(errs) } } From c881323808f357ad8bf93ad3a1fbbca34f1adc2f Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Mon, 1 Aug 2022 09:02:55 +0200 Subject: [PATCH 3/3] wire TemporaryRootShardKcpSharedInformerFactory --- pkg/server/config.go | 53 ++++++++++++++++++++++++----- pkg/server/server.go | 81 ++++++++++++++++++++++++++++++++++++-------- 2 files changed, 111 insertions(+), 23 deletions(-) diff --git a/pkg/server/config.go b/pkg/server/config.go index f2905008c6d..c4cce895eb5 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -38,6 +38,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clusters" "k8s.io/kubernetes/pkg/genericcontrolplane" "k8s.io/kubernetes/pkg/genericcontrolplane/aggregator" @@ -86,6 +87,7 @@ type ExtraConfig struct { KubeClusterClient kubernetes.ClusterInterface ApiExtensionsClusterClient apiextensionsclient.ClusterInterface KcpClusterClient kcpclient.ClusterInterface + RootShardKcpClusterClient kcpclient.ClusterInterface // misc preHandlerChainMux *handlerChainMuxes @@ -100,6 +102,9 @@ type ExtraConfig struct { // TODO(p0lyn0mial): get rid of TemporaryRootShardKcpSharedInformerFactory, in the future // we should have multi-shard aware informers // + // TODO(p0lyn0mial): wire it to the root shard, this will be needed to get bindings, + // eventually it will be replaced by replication + // // TemporaryRootShardKcpSharedInformerFactory bring data from the root shard TemporaryRootShardKcpSharedInformerFactory kcpexternalversions.SharedInformerFactory } @@ -164,14 +169,47 @@ func NewConfig(opts *kcpserveroptions.CompletedOptions) (*Config, error) { c.GenericConfig.RequestInfoResolver = requestinfo.NewFactory() // must be set here early to avoid a crash in the EnableMultiCluster roundtrip wrapper // Setup kcp * informers, but those will need the identities for the APIExports used to make the APIs available. - // The identities are not known before we can get them from the APIExports via the loopback client, hence we postpone - // this to getOrCreateKcpIdentities() in the kcp-start-informers post-start hook. + // The identities are not known before we can get them from the APIExports via the loopback client or from the root shard in case this is a non-root shard, + // hence we postpone this to getOrCreateKcpIdentities() in the kcp-start-informers post-start hook. // The informers here are not used before the informers are actually started (i.e. no race). - nonIdentityKcpClusterClient, err := kcpclient.NewClusterForConfig(c.GenericConfig.LoopbackClientConfig) // can only used for wildcard requests of apis.kcp.dev - if err != nil { - return nil, err + if len(c.Options.Extra.RootShardKubeconfigFile) > 0 { + // TODO(p0lyn0mial): use kcp-admin instead of system:admin + nonIdentityRootKcpShardSystemAdminConfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(&clientcmd.ClientConfigLoadingRules{ExplicitPath: c.Options.Extra.RootShardKubeconfigFile}, &clientcmd.ConfigOverrides{CurrentContext: "system:admin"}).ClientConfig() + if err != nil { + return nil, fmt.Errorf("failed to load the kubeconfig from: %s, for the root shard, err: %w", c.Options.Extra.RootShardKubeconfigFile, err) + } + nonIdentityRootKcpShardClient, err := kcpclient.NewClusterForConfig(nonIdentityRootKcpShardSystemAdminConfig) // can only used for wildcard requests of apis.kcp.dev + if err != nil { + return nil, err + } + var kcpShardIdentityRoundTripper func(rt http.RoundTripper) http.RoundTripper + kcpShardIdentityRoundTripper, c.resolveIdentities = boostrap.NewWildcardIdentitiesWrappingRoundTripper(boostrap.KcpRootGroupExportNames, boostrap.KcpRootGroupResourceExportNames, nonIdentityRootKcpShardClient.Cluster(tenancyv1alpha1.RootCluster)) + rootKcpShardIdentityConfig := rest.CopyConfig(nonIdentityRootKcpShardSystemAdminConfig) + rootKcpShardIdentityConfig.Wrap(kcpShardIdentityRoundTripper) + c.RootShardKcpClusterClient, err = kcpclient.NewClusterForConfig(rootKcpShardIdentityConfig) + if err != nil { + return nil, err + } + c.TemporaryRootShardKcpSharedInformerFactory = kcpexternalversions.NewSharedInformerFactoryWithOptions( + c.RootShardKcpClusterClient.Cluster(logicalcluster.Wildcard), + resyncPeriod, + kcpexternalversions.WithExtraClusterScopedIndexers(indexers.ClusterScoped()), + kcpexternalversions.WithExtraNamespaceScopedIndexers(indexers.NamespaceScoped()), + ) + + c.identityConfig = rest.CopyConfig(c.GenericConfig.LoopbackClientConfig) + c.identityConfig.Wrap(kcpShardIdentityRoundTripper) + } else { + // create an empty non-functional factory so that code that uses it but doesn't need it, doesn't have to check against the nil value + c.TemporaryRootShardKcpSharedInformerFactory = kcpexternalversions.NewSharedInformerFactory(nil, resyncPeriod) + + // The informers here are not used before the informers are actually started (i.e. no race). + nonIdentityKcpClusterClient, err := kcpclient.NewClusterForConfig(c.GenericConfig.LoopbackClientConfig) // can only used for wildcard requests of apis.kcp.dev + if err != nil { + return nil, err + } + c.identityConfig, c.resolveIdentities = boostrap.NewConfigWithWildcardIdentities(c.GenericConfig.LoopbackClientConfig, boostrap.KcpRootGroupExportNames, boostrap.KcpRootGroupResourceExportNames, nonIdentityKcpClusterClient.Cluster(tenancyv1alpha1.RootCluster)) } - c.identityConfig, c.resolveIdentities = boostrap.NewConfigWithWildcardIdentities(c.GenericConfig.LoopbackClientConfig, boostrap.KcpRootGroupExportNames, boostrap.KcpRootGroupResourceExportNames, nonIdentityKcpClusterClient.Cluster(tenancyv1alpha1.RootCluster)) c.KcpClusterClient, err = kcpclient.NewClusterForConfig(c.identityConfig) // this is now generic to be used for all kcp API groups if err != nil { return nil, err @@ -183,9 +221,6 @@ func NewConfig(opts *kcpserveroptions.CompletedOptions) (*Config, error) { kcpexternalversions.WithExtraNamespaceScopedIndexers(indexers.NamespaceScoped()), ) - // create an empty non-functional factory so that code that uses it but doesn't need it, doesn't have to check against the nil value - c.TemporaryRootShardKcpSharedInformerFactory = kcpexternalversions.NewSharedInformerFactory(nil, resyncPeriod) - // Setup kube * informers c.KubeClusterClient, err = kubernetes.NewClusterForConfig(c.GenericConfig.LoopbackClientConfig) if err != nil { diff --git a/pkg/server/server.go b/pkg/server/server.go index d68367b5f9f..240f1012f7a 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -18,12 +18,14 @@ package server import ( "context" + "fmt" "net/http" _ "net/http/pprof" "time" "github.com/kcp-dev/logicalcluster/v2" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" genericapiserver "k8s.io/apiserver/pkg/server" @@ -157,7 +159,6 @@ func (s *Server) Run(ctx context.Context) error { // nolint:nilerr return nil // don't klog.Fatal. This only happens when context is cancelled. } - klog.Infof("Finished starting APIExport and APIBinding informers") if s.Options.Extra.ShardName == tenancyv1alpha1.RootShard { @@ -172,24 +173,76 @@ func (s *Server) Run(ctx context.Context) error { return nil // don't klog.Fatal. This only happens when context is cancelled. } klog.Infof("Bootstrapped root workspace phase 0") - } - klog.Infof("Getting kcp APIExport identities") + klog.Infof("Getting kcp APIExport identities") + if err := wait.PollImmediateInfiniteWithContext(goContext(ctx), time.Millisecond*500, func(ctx context.Context) (bool, error) { + if err := s.resolveIdentities(ctx); err != nil { + klog.V(3).Infof("failed to resolve identities, keeping trying: %v", err) + return false, nil + } + return true, nil + }); err != nil { + klog.Errorf("failed to get or create identities: %v", err) + // nolint:nilerr + return nil // don't klog.Fatal. This only happens when context is cancelled. + } + klog.Infof("Finished getting kcp APIExport identities") + } else if len(s.Options.Extra.RootShardKubeconfigFile) > 0 { + klog.Info("Starting setting up kcp informers for the root shard") + + go s.TemporaryRootShardKcpSharedInformerFactory.Apis().V1alpha1().APIExports().Informer().Run(ctx.StopCh) + go s.TemporaryRootShardKcpSharedInformerFactory.Apis().V1alpha1().APIBindings().Informer().Run(ctx.StopCh) + + if err := wait.PollInfiniteWithContext(goContext(ctx), time.Millisecond*100, func(ctx context.Context) (bool, error) { + exportsSynced := s.TemporaryRootShardKcpSharedInformerFactory.Apis().V1alpha1().APIExports().Informer().HasSynced() + bindingsSynced := s.TemporaryRootShardKcpSharedInformerFactory.Apis().V1alpha1().APIBindings().Informer().HasSynced() + return exportsSynced && bindingsSynced, nil + }); err != nil { + klog.Errorf("failed to start APIExport and/or APIBinding informers for the root shard: %w", err) + // nolint:nilerr + return nil // don't klog.Fatal. This only happens when context is cancelled. + } + klog.Infof("Finished starting APIExport and APIBinding informers for the root shard") + + klog.Infof("Getting kcp APIExport identities for the root shard") + if err := wait.PollImmediateInfiniteWithContext(goContext(ctx), time.Millisecond*500, func(ctx context.Context) (bool, error) { + if err := s.resolveIdentities(ctx); err != nil { + klog.V(3).Infof("failed to resolve identities for the root shard, keeping trying: %w", err) + return false, nil + } + return true, nil + }); err != nil { + klog.Errorf("failed to get or create identities for the root shard: %w", err) + // nolint:nilerr + return nil // don't klog.Fatal. This only happens when context is cancelled. + } + + klog.Infof("Finished getting kcp APIExport identities for the root shard") + + s.TemporaryRootShardKcpSharedInformerFactory.Start(ctx.StopCh) + s.TemporaryRootShardKcpSharedInformerFactory.WaitForCacheSync(ctx.StopCh) - if err := wait.PollImmediateInfiniteWithContext(goContext(ctx), time.Millisecond*500, func(ctx context.Context) (bool, error) { - if err := s.resolveIdentities(ctx); err != nil { - klog.V(3).Infof("failed to resolve identities, keeping trying: %v", err) - return false, nil + select { + case <-ctx.StopCh: + return nil // context closed, avoid reporting success below + default: } - return true, nil - }); err != nil { - klog.Errorf("failed to get or create identities: %v", err) - // nolint:nilerr - return nil // don't klog.Fatal. This only happens when context is cancelled. + klog.Infof("Finished starting kcp informers for the root shard") + + klog.Info("Creating ClusterWorkspaceShard resource in the root shard") + shard := &tenancyv1alpha1.ClusterWorkspaceShard{ + ObjectMeta: metav1.ObjectMeta{Name: s.Options.Extra.ShardName}, + Spec: tenancyv1alpha1.ClusterWorkspaceShardSpec{ + BaseURL: fmt.Sprintf("https://%v", s.GenericConfig.ExternalAddress), + ExternalURL: fmt.Sprintf("https://%v", s.Options.Extra.ShardExternalURL), + }, + } + if _, err := s.RootShardKcpClusterClient.Cluster(tenancyv1alpha1.RootCluster).TenancyV1alpha1().ClusterWorkspaceShards().Create(goContext(ctx), shard, metav1.CreateOptions{}); err != nil { + return err + } + klog.Info("Finished creating ClusterWorkspaceShard resource in the root shard") } - klog.Infof("Finished getting kcp APIExport identities") - s.KcpSharedInformerFactory.Start(ctx.StopCh) s.KcpSharedInformerFactory.WaitForCacheSync(ctx.StopCh)