Skip to content

Commit

Permalink
wire TemporaryRootShardKcpSharedInformerFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
p0lyn0mial committed Aug 1, 2022
1 parent 4bc12d5 commit 64aa157
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 23 deletions.
53 changes: 44 additions & 9 deletions pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/clusters"
"k8s.io/kubernetes/pkg/genericcontrolplane"
"k8s.io/kubernetes/pkg/genericcontrolplane/aggregator"
Expand Down Expand Up @@ -86,6 +87,7 @@ type ExtraConfig struct {
KubeClusterClient kubernetes.ClusterInterface
ApiExtensionsClusterClient apiextensionsclient.ClusterInterface
KcpClusterClient kcpclient.ClusterInterface
RootShardKcpClusterClient kcpclient.ClusterInterface

// misc
preHandlerChainMux *handlerChainMuxes
Expand All @@ -100,6 +102,9 @@ type ExtraConfig struct {
// TODO(p0lyn0mial): get rid of TemporaryRootShardKcpSharedInformerFactory, in the future
// we should have multi-shard aware informers
//
// TODO(p0lyn0mial): wire it to the root shard, this will be needed to get bindings,
// eventually it will be replaced by replication
//
// TemporaryRootShardKcpSharedInformerFactory bring data from the root shard
TemporaryRootShardKcpSharedInformerFactory kcpexternalversions.SharedInformerFactory
}
Expand Down Expand Up @@ -164,14 +169,47 @@ func NewConfig(opts *kcpserveroptions.CompletedOptions) (*Config, error) {
c.GenericConfig.RequestInfoResolver = requestinfo.NewFactory() // must be set here early to avoid a crash in the EnableMultiCluster roundtrip wrapper

// Setup kcp * informers, but those will need the identities for the APIExports used to make the APIs available.
// The identities are not known before we can get them from the APIExports via the loopback client, hence we postpone
// this to getOrCreateKcpIdentities() in the kcp-start-informers post-start hook.
// The identities are not known before we can get them from the APIExports via the loopback client or from the root shard in case this is a non-root shard,
// hence we postpone this to getOrCreateKcpIdentities() in the kcp-start-informers post-start hook.
// The informers here are not used before the informers are actually started (i.e. no race).
nonIdentityKcpClusterClient, err := kcpclient.NewClusterForConfig(c.GenericConfig.LoopbackClientConfig) // can only used for wildcard requests of apis.kcp.dev
if err != nil {
return nil, err
if len(c.Options.Extra.RootShardKubeconfigFile) > 0 {
// TODO(p0lyn0mial): use kcp-admin instead of system:admin
nonIdentityRootKcpShardSystemAdminConfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(&clientcmd.ClientConfigLoadingRules{ExplicitPath: c.Options.Extra.RootShardKubeconfigFile}, &clientcmd.ConfigOverrides{CurrentContext: "system:admin"}).ClientConfig()
if err != nil {
return nil, fmt.Errorf("failed to load the kubeconfig from: %s, for the root shard, err: %w", c.Options.Extra.RootShardKubeconfigFile, err)
}
nonIdentityRootKcpShardClient, err := kcpclient.NewClusterForConfig(nonIdentityRootKcpShardSystemAdminConfig)
if err != nil {
return nil, err
}
var kcpShardIdentityRoundTripper func(rt http.RoundTripper) http.RoundTripper
kcpShardIdentityRoundTripper, c.resolveIdentities = boostrap.NewWildcardIdentitiesWrappingRoundTripper(boostrap.KcpRootGroupExportNames, boostrap.KcpRootGroupResourceExportNames, nonIdentityRootKcpShardClient.Cluster(tenancyv1alpha1.RootCluster))
rootKcpShardIdentityConfig := rest.CopyConfig(nonIdentityRootKcpShardSystemAdminConfig)
rootKcpShardIdentityConfig.Wrap(kcpShardIdentityRoundTripper)
c.RootShardKcpClusterClient, err = kcpclient.NewClusterForConfig(rootKcpShardIdentityConfig) // this is now generic to be used for all kcp API groups
if err != nil {
return nil, err
}
c.TemporaryRootShardKcpSharedInformerFactory = kcpexternalversions.NewSharedInformerFactoryWithOptions(
c.RootShardKcpClusterClient.Cluster(logicalcluster.Wildcard),
resyncPeriod,
kcpexternalversions.WithExtraClusterScopedIndexers(indexers.ClusterScoped()),
kcpexternalversions.WithExtraNamespaceScopedIndexers(indexers.NamespaceScoped()),
)

c.identityConfig = rest.CopyConfig(c.GenericConfig.LoopbackClientConfig)
c.identityConfig.Wrap(kcpShardIdentityRoundTripper)
} else {
// create an empty non-functional factory so that code that uses it but doesn't need it, doesn't have to check against the nil value
c.TemporaryRootShardKcpSharedInformerFactory = kcpexternalversions.NewSharedInformerFactory(nil, resyncPeriod)

// The informers here are not used before the informers are actually started (i.e. no race).
nonIdentityKcpClusterClient, err := kcpclient.NewClusterForConfig(c.GenericConfig.LoopbackClientConfig) // can only used for wildcard requests of apis.kcp.dev
if err != nil {
return nil, err
}
c.identityConfig, c.resolveIdentities = boostrap.NewConfigWithWildcardIdentities(c.GenericConfig.LoopbackClientConfig, boostrap.KcpRootGroupExportNames, boostrap.KcpRootGroupResourceExportNames, nonIdentityKcpClusterClient.Cluster(tenancyv1alpha1.RootCluster))
}
c.identityConfig, c.resolveIdentities = boostrap.NewConfigWithWildcardIdentities(c.GenericConfig.LoopbackClientConfig, boostrap.KcpRootGroupExportNames, boostrap.KcpRootGroupResourceExportNames, nonIdentityKcpClusterClient.Cluster(tenancyv1alpha1.RootCluster))
c.KcpClusterClient, err = kcpclient.NewClusterForConfig(c.identityConfig) // this is now generic to be used for all kcp API groups
if err != nil {
return nil, err
Expand All @@ -183,9 +221,6 @@ func NewConfig(opts *kcpserveroptions.CompletedOptions) (*Config, error) {
kcpexternalversions.WithExtraNamespaceScopedIndexers(indexers.NamespaceScoped()),
)

// create an empty non-functional factory so that code that uses it but doesn't need it, doesn't have to check against the nil value
c.TemporaryRootShardKcpSharedInformerFactory = kcpexternalversions.NewSharedInformerFactory(nil, resyncPeriod)

// Setup kube * informers
c.KubeClusterClient, err = kubernetes.NewClusterForConfig(c.GenericConfig.LoopbackClientConfig)
if err != nil {
Expand Down
81 changes: 67 additions & 14 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ package server

import (
"context"
"fmt"
"net/http"
_ "net/http/pprof"
"time"

"github.com/kcp-dev/logicalcluster/v2"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
genericapiserver "k8s.io/apiserver/pkg/server"
Expand Down Expand Up @@ -157,7 +159,6 @@ func (s *Server) Run(ctx context.Context) error {
// nolint:nilerr
return nil // don't klog.Fatal. This only happens when context is cancelled.
}

klog.Infof("Finished starting APIExport and APIBinding informers")

if s.Options.Extra.ShardName == tenancyv1alpha1.RootShard {
Expand All @@ -172,24 +173,76 @@ func (s *Server) Run(ctx context.Context) error {
return nil // don't klog.Fatal. This only happens when context is cancelled.
}
klog.Infof("Bootstrapped root workspace phase 0")
}

klog.Infof("Getting kcp APIExport identities")
klog.Infof("Getting kcp APIExport identities")
if err := wait.PollImmediateInfiniteWithContext(goContext(ctx), time.Millisecond*500, func(ctx context.Context) (bool, error) {
if err := s.resolveIdentities(ctx); err != nil {
klog.V(3).Infof("failed to resolve identities, keeping trying: %v", err)
return false, nil
}
return true, nil
}); err != nil {
klog.Errorf("failed to get or create identities: %v", err)
// nolint:nilerr
return nil // don't klog.Fatal. This only happens when context is cancelled.
}
klog.Infof("Finished getting kcp APIExport identities")
} else if len(s.Options.Extra.RootShardKubeconfigFile) > 0 {
klog.Info("Starting setting up kcp informers for the root shard")

go s.TemporaryRootShardKcpSharedInformerFactory.Apis().V1alpha1().APIExports().Informer().Run(ctx.StopCh)
go s.TemporaryRootShardKcpSharedInformerFactory.Apis().V1alpha1().APIBindings().Informer().Run(ctx.StopCh)

if err := wait.PollInfiniteWithContext(goContext(ctx), time.Millisecond*100, func(ctx context.Context) (bool, error) {
exportsSynced := s.TemporaryRootShardKcpSharedInformerFactory.Apis().V1alpha1().APIExports().Informer().HasSynced()
bindingsSynced := s.TemporaryRootShardKcpSharedInformerFactory.Apis().V1alpha1().APIBindings().Informer().HasSynced()
return exportsSynced && bindingsSynced, nil
}); err != nil {
klog.Errorf("failed to start APIExport and/or APIBinding informers for the root shard: %w", err)
// nolint:nilerr
return nil // don't klog.Fatal. This only happens when context is cancelled.
}
klog.Infof("Finished starting APIExport and APIBinding informers for the root shard")

klog.Infof("Getting kcp APIExport identities for the root shard")
if err := wait.PollImmediateInfiniteWithContext(goContext(ctx), time.Millisecond*500, func(ctx context.Context) (bool, error) {
if err := s.resolveIdentities(ctx); err != nil {
klog.V(3).Infof("failed to resolve identities for the root shard, keeping trying: %w", err)
return false, nil
}
return true, nil
}); err != nil {
klog.Errorf("failed to get or create identities for the root shard: %w", err)
// nolint:nilerr
return nil // don't klog.Fatal. This only happens when context is cancelled.
}

klog.Infof("Finished getting kcp APIExport identities for the root shard")

s.TemporaryRootShardKcpSharedInformerFactory.Start(ctx.StopCh)
s.TemporaryRootShardKcpSharedInformerFactory.WaitForCacheSync(ctx.StopCh)

if err := wait.PollImmediateInfiniteWithContext(goContext(ctx), time.Millisecond*500, func(ctx context.Context) (bool, error) {
if err := s.resolveIdentities(ctx); err != nil {
klog.V(3).Infof("failed to resolve identities, keeping trying: %v", err)
return false, nil
select {
case <-ctx.StopCh:
return nil // context closed, avoid reporting success below
default:
}
return true, nil
}); err != nil {
klog.Errorf("failed to get or create identities: %v", err)
// nolint:nilerr
return nil // don't klog.Fatal. This only happens when context is cancelled.
klog.Infof("Finished starting kcp informers for the root shard")

klog.Info("Creating ClusterWorkspaceShard resource in the root shard")
shard := &tenancyv1alpha1.ClusterWorkspaceShard{
ObjectMeta: metav1.ObjectMeta{Name: s.Options.Extra.ShardName},
Spec: tenancyv1alpha1.ClusterWorkspaceShardSpec{
BaseURL: fmt.Sprintf("https://%v", s.GenericConfig.ExternalAddress),
ExternalURL: fmt.Sprintf("https://%v", s.Options.Extra.ShardExternalURL),
},
}
if _, err := s.RootShardKcpClusterClient.Cluster(tenancyv1alpha1.RootCluster).TenancyV1alpha1().ClusterWorkspaceShards().Create(goContext(ctx), shard, metav1.CreateOptions{}); err != nil {
return err
}
klog.Info("Finished creating ClusterWorkspaceShard resource in the root shard")
}

klog.Infof("Finished getting kcp APIExport identities")

s.KcpSharedInformerFactory.Start(ctx.StopCh)
s.KcpSharedInformerFactory.WaitForCacheSync(ctx.StopCh)

Expand Down

0 comments on commit 64aa157

Please sign in to comment.