diff --git a/hack/logcheck.out b/hack/logcheck.out index 4f007ab1972..983b624b451 100644 --- a/hack/logcheck.out +++ b/hack/logcheck.out @@ -27,8 +27,8 @@ /pkg/admission/webhook/generic_webhook.go:143:3: function "Infof" should not be used, convert to contextual logging /pkg/admission/webhook/generic_webhook.go:143:3: function "V" should not be used, convert to contextual logging /pkg/authorization/delegated/authorizer.go:45:3: function "Errorf" should not be used, convert to contextual logging -/pkg/cliplugins/workload/plugin/sync.go:616:4: function "Infof" should not be used, convert to contextual logging -/pkg/cliplugins/workload/plugin/sync.go:616:4: function "V" should not be used, convert to contextual logging +/pkg/cliplugins/workload/plugin/sync.go:615:4: function "Infof" should not be used, convert to contextual logging +/pkg/cliplugins/workload/plugin/sync.go:615:4: function "V" should not be used, convert to contextual logging /pkg/dns/plugin/nsmap/namespace.go:68:2: function "Info" should not be used, convert to contextual logging /pkg/dns/plugin/nsmap/namespace.go:68:2: function "V" should not be used, convert to contextual logging /pkg/embeddedetcd/server.go:44:2: function "Info" should not be used, convert to contextual logging @@ -61,10 +61,10 @@ /pkg/informer/informer.go:595:4: function "V" should not be used, convert to contextual logging /pkg/informer/informer.go:597:4: function "InfoS" should not be used, convert to contextual logging /pkg/informer/informer.go:597:4: function "V" should not be used, convert to contextual logging +/pkg/informer/informer.go:927:4: function "InfoS" should not be used, convert to contextual logging +/pkg/informer/informer.go:927:4: function "V" should not be used, convert to contextual logging /pkg/informer/informer.go:929:4: function "InfoS" should not be used, convert to contextual logging /pkg/informer/informer.go:929:4: function "V" should not be used, convert to contextual logging -/pkg/informer/informer.go:931:4: function "InfoS" should not be used, convert to contextual logging -/pkg/informer/informer.go:931:4: function "V" should not be used, convert to contextual logging /pkg/logging/constants.go:100:9: Additional arguments to WithValues should always be Key Value pairs. Please check if there is any key or value missing. /pkg/logging/constants.go:52:9: Key positional arguments are expected to be inlined constant strings. Please replace ReconcilerKey provided with string value. /pkg/logging/constants.go:57:9: Key positional arguments are expected to be inlined constant strings. Please replace QueueKeyKey provided with string value. @@ -75,52 +75,52 @@ /pkg/reconciler/apis/permissionclaimlabel/permissionclaimlabel_resource_reconcile.go:67:4: function "V" should not be used, convert to contextual logging /pkg/reconciler/core/logicalcluster/logicalcluster_reconcile_metadata.go:80:5: Key positional arguments are expected to be inlined constant strings. Please replace &{tenancyv1alpha1 ExperimentalWorkspaceOwnerAnnotationKey} provided with string value. /pkg/reconciler/tenancy/workspace/workspace_reconcile_metadata.go:55:5: Key positional arguments are expected to be inlined constant strings. Please replace &{tenancyv1alpha1 ExperimentalWorkspaceOwnerAnnotationKey} provided with string value. 
-/pkg/reconciler/workload/resource/resource_controller.go:399:32: function "Enabled" should not be used, convert to contextual logging -/pkg/reconciler/workload/resource/resource_controller.go:399:32: function "V" should not be used, convert to contextual logging -/pkg/reconciler/workload/resource/resource_controller.go:399:8: function "Enabled" should not be used, convert to contextual logging -/pkg/reconciler/workload/resource/resource_controller.go:399:8: function "V" should not be used, convert to contextual logging -/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go:158:2: function "Infof" should not be used, convert to contextual logging -/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go:158:2: function "V" should not be used, convert to contextual logging -/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go:172:2: function "Infof" should not be used, convert to contextual logging -/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go:172:2: function "V" should not be used, convert to contextual logging -/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go:239:2: function "InfoS" should not be used, convert to contextual logging -/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go:240:8: function "InfoS" should not be used, convert to contextual logging -/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go:286:3: function "Errorf" should not be used, convert to contextual logging -/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go:290:2: function "Infof" should not be used, convert to contextual logging -/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go:345:3: function "Errorf" should not be used, convert to contextual logging -/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go:350:2: function "Infof" should not be used, convert to contextual logging -/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go:350:2: function "V" should not be used, convert to contextual logging -/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go:352:3: function "Errorf" should not be used, convert to contextual logging +/pkg/reconciler/workload/resource/resource_controller.go:398:32: function "Enabled" should not be used, convert to contextual logging +/pkg/reconciler/workload/resource/resource_controller.go:398:32: function "V" should not be used, convert to contextual logging +/pkg/reconciler/workload/resource/resource_controller.go:398:8: function "Enabled" should not be used, convert to contextual logging +/pkg/reconciler/workload/resource/resource_controller.go:398:8: function "V" should not be used, convert to contextual logging +/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go:157:2: function "Infof" should not be used, convert to contextual logging +/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go:157:2: function "V" should not be used, convert to contextual logging +/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go:171:2: function "Infof" should not be used, convert to contextual logging +/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go:171:2: function "V" should not be used, convert to contextual logging +/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go:238:2: function "InfoS" should not be used, 
convert to contextual logging +/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go:239:8: function "InfoS" should not be used, convert to contextual logging +/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go:285:3: function "Errorf" should not be used, convert to contextual logging +/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go:289:2: function "Infof" should not be used, convert to contextual logging +/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go:344:3: function "Errorf" should not be used, convert to contextual logging +/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go:349:2: function "Infof" should not be used, convert to contextual logging +/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go:349:2: function "V" should not be used, convert to contextual logging +/pkg/reconciler/workload/synctargetexports/synctargetexports_controller.go:351:3: function "Errorf" should not be used, convert to contextual logging /pkg/reconciler/workload/synctargetexports/synctargetexports_reconcile.go:49:4: function "Infof" should not be used, convert to contextual logging /pkg/reconciler/workload/synctargetexports/synctargetexports_reconcile.go:49:4: function "V" should not be used, convert to contextual logging /pkg/reconciler/workload/synctargetexports/synctargetexports_reconcile.go:60:5: function "Warningf" should not be used, convert to contextual logging /pkg/server/localproxy.go:137:4: function "Infof" should not be used, convert to contextual logging /pkg/server/localproxy.go:150:4: function "Infof" should not be used, convert to contextual logging /pkg/server/options/controllers.go:54:3: function "Fatal" should not be used, convert to contextual logging -/pkg/syncer/endpoints/endpoint_downstream_controller.go:128:2: Additional arguments to Info should always be Key Value pairs. Please check if there is any key or value missing. -/pkg/syncer/endpoints/endpoint_downstream_controller.go:152:11: Key positional arguments are expected to be inlined constant strings. Please replace &{logging NameKey} provided with string value. -/pkg/syncer/endpoints/endpoint_downstream_controller.go:152:11: Key positional arguments are expected to be inlined constant strings. Please replace &{logging NamespaceKey} provided with string value. +/pkg/syncer/endpoints/endpoint_downstream_controller.go:129:2: Additional arguments to Info should always be Key Value pairs. Please check if there is any key or value missing. +/pkg/syncer/endpoints/endpoint_downstream_controller.go:153:11: Key positional arguments are expected to be inlined constant strings. Please replace &{logging NameKey} provided with string value. +/pkg/syncer/endpoints/endpoint_downstream_controller.go:153:11: Key positional arguments are expected to be inlined constant strings. Please replace &{logging NamespaceKey} provided with string value. /pkg/syncer/namespace/namespace_downstream_process.go:45:11: Key positional arguments are expected to be inlined constant strings. Please replace DownstreamNamespace provided with string value. /pkg/syncer/namespace/namespace_downstream_process.go:89:11: Key positional arguments are expected to be inlined constant strings. Please replace &{logging NamespaceKey} provided with string value. /pkg/syncer/namespace/namespace_downstream_process.go:89:11: Key positional arguments are expected to be inlined constant strings. 
Please replace &{logging WorkspaceKey} provided with string value. -/pkg/syncer/spec/spec_controller.go:182:15: Key positional arguments are expected to be inlined constant strings. Please replace DownstreamName provided with string value. -/pkg/syncer/spec/spec_controller.go:182:15: Key positional arguments are expected to be inlined constant strings. Please replace DownstreamNamespace provided with string value. +/pkg/syncer/spec/spec_controller.go:184:15: Key positional arguments are expected to be inlined constant strings. Please replace DownstreamName provided with string value. +/pkg/syncer/spec/spec_controller.go:184:15: Key positional arguments are expected to be inlined constant strings. Please replace DownstreamNamespace provided with string value. /pkg/syncer/spec/spec_process.go:118:11: Key positional arguments are expected to be inlined constant strings. Please replace &{logging NameKey} provided with string value. /pkg/syncer/spec/spec_process.go:118:11: Key positional arguments are expected to be inlined constant strings. Please replace &{logging NamespaceKey} provided with string value. /pkg/syncer/spec/spec_process.go:118:11: Key positional arguments are expected to be inlined constant strings. Please replace &{logging WorkspaceKey} provided with string value. /pkg/syncer/spec/spec_process.go:136:4: Key positional arguments are expected to be inlined constant strings. Please replace DownstreamName provided with string value. /pkg/syncer/spec/spec_process.go:154:11: Key positional arguments are expected to be inlined constant strings. Please replace DownstreamNamespace provided with string value. /pkg/syncer/spec/spec_process.go:391:11: Key positional arguments are expected to be inlined constant strings. Please replace DownstreamName provided with string value. -/pkg/syncer/status/status_process.go:157:11: Key positional arguments are expected to be inlined constant strings. Please replace &{logging NameKey} provided with string value. -/pkg/syncer/status/status_process.go:157:11: Key positional arguments are expected to be inlined constant strings. Please replace &{logging NamespaceKey} provided with string value. -/pkg/syncer/status/status_process.go:157:11: Key positional arguments are expected to be inlined constant strings. Please replace &{logging WorkspaceKey} provided with string value. +/pkg/syncer/status/status_process.go:154:11: Key positional arguments are expected to be inlined constant strings. Please replace &{logging NameKey} provided with string value. +/pkg/syncer/status/status_process.go:154:11: Key positional arguments are expected to be inlined constant strings. Please replace &{logging NamespaceKey} provided with string value. +/pkg/syncer/status/status_process.go:154:11: Key positional arguments are expected to be inlined constant strings. Please replace &{logging WorkspaceKey} provided with string value. /pkg/syncer/status/status_process.go:70:11: Key positional arguments are expected to be inlined constant strings. Please replace DownstreamName provided with string value. /pkg/syncer/status/status_process.go:70:11: Key positional arguments are expected to be inlined constant strings. Please replace DownstreamNamespace provided with string value. -/pkg/syncer/syncer.go:101:11: Key positional arguments are expected to be inlined constant strings. Please replace SyncTargetName provided with string value. -/pkg/syncer/syncer.go:101:11: Key positional arguments are expected to be inlined constant strings. 
Please replace SyncTargetWorkspace provided with string value. -/pkg/syncer/syncer.go:212:11: Key positional arguments are expected to be inlined constant strings. Please replace SyncTargetKey provided with string value. -/pkg/tunneler/dialer.go:148:8: function "Infof" should not be used, convert to contextual logging -/pkg/tunneler/dialer.go:148:8: function "V" should not be used, convert to contextual logging +/pkg/syncer/syncer.go:196:11: Key positional arguments are expected to be inlined constant strings. Please replace SyncTargetKey provided with string value. +/pkg/syncer/syncer.go:85:11: Key positional arguments are expected to be inlined constant strings. Please replace SyncTargetName provided with string value. +/pkg/syncer/syncer.go:85:11: Key positional arguments are expected to be inlined constant strings. Please replace SyncTargetWorkspace provided with string value. +/pkg/tunneler/dialer.go:151:8: function "Infof" should not be used, convert to contextual logging +/pkg/tunneler/dialer.go:151:8: function "V" should not be used, convert to contextual logging /pkg/tunneler/listener.go:157:3: function "Infof" should not be used, convert to contextual logging /pkg/tunneler/listener.go:157:3: function "V" should not be used, convert to contextual logging /pkg/tunneler/listener.go:161:2: function "Infof" should not be used, convert to contextual logging @@ -131,20 +131,20 @@ /pkg/tunneler/listener.go:168:3: function "V" should not be used, convert to contextual logging /pkg/tunneler/listener.go:180:3: function "Infof" should not be used, convert to contextual logging /pkg/tunneler/listener.go:180:3: function "V" should not be used, convert to contextual logging -/pkg/tunneler/listener.go:211:2: function "Infof" should not be used, convert to contextual logging -/pkg/tunneler/listener.go:211:2: function "V" should not be used, convert to contextual logging +/pkg/tunneler/listener.go:210:2: function "Infof" should not be used, convert to contextual logging +/pkg/tunneler/listener.go:210:2: function "V" should not be used, convert to contextual logging /pkg/tunneler/listener.go:82:4: function "Infof" should not be used, convert to contextual logging /pkg/tunneler/listener.go:82:4: function "V" should not be used, convert to contextual logging -/pkg/tunneler/tunnel.go:146:3: function "InfoS" should not be used, convert to contextual logging -/pkg/tunneler/tunnel.go:146:3: function "V" should not be used, convert to contextual logging -/pkg/tunneler/tunnel.go:178:5: function "Infof" should not be used, convert to contextual logging -/pkg/tunneler/tunnel.go:178:5: function "V" should not be used, convert to contextual logging -/pkg/tunneler/tunnel.go:182:4: function "Infof" should not be used, convert to contextual logging -/pkg/tunneler/tunnel.go:182:4: function "V" should not be used, convert to contextual logging -/pkg/tunneler/tunnel.go:195:4: function "Infof" should not be used, convert to contextual logging -/pkg/tunneler/tunnel.go:195:4: function "V" should not be used, convert to contextual logging -/pkg/tunneler/tunnel.go:230:4: function "Infof" should not be used, convert to contextual logging -/pkg/tunneler/tunnel.go:230:4: function "V" should not be used, convert to contextual logging +/pkg/tunneler/tunnel.go:145:3: function "InfoS" should not be used, convert to contextual logging +/pkg/tunneler/tunnel.go:145:3: function "V" should not be used, convert to contextual logging +/pkg/tunneler/tunnel.go:177:5: function "Infof" should not be used, convert to contextual logging 
+/pkg/tunneler/tunnel.go:177:5: function "V" should not be used, convert to contextual logging +/pkg/tunneler/tunnel.go:181:4: function "Infof" should not be used, convert to contextual logging +/pkg/tunneler/tunnel.go:181:4: function "V" should not be used, convert to contextual logging +/pkg/tunneler/tunnel.go:194:4: function "Infof" should not be used, convert to contextual logging +/pkg/tunneler/tunnel.go:194:4: function "V" should not be used, convert to contextual logging +/pkg/tunneler/tunnel.go:229:4: function "Infof" should not be used, convert to contextual logging +/pkg/tunneler/tunnel.go:229:4: function "V" should not be used, convert to contextual logging /pkg/virtual/apiexport/builder/build.go:140:7: function "Errorf" should not be used, convert to contextual logging /pkg/virtual/framework/dynamic/apiserver/serving_info.go:185:3: function "Infof" should not be used, convert to contextual logging /pkg/virtual/framework/dynamic/apiserver/serving_info.go:185:3: function "V" should not be used, convert to contextual logging diff --git a/pkg/informer/informer.go b/pkg/informer/informer.go index 173dad8f047..1aa8268f65e 100644 --- a/pkg/informer/informer.go +++ b/pkg/informer/informer.go @@ -353,13 +353,13 @@ func (d *GenericDiscoveringDynamicSharedInformerFactory[Informer, Lister, Generi return inf } -// Listers returns a map of per-resource-type listers for all types that are +// Informers returns a map of per-resource-type generic informers for all types that are // known by this informer factory, and that are synced. // // If any informers aren't synced, their GVRs are returned so that they can be // checked and processed later. -func (d *GenericDiscoveringDynamicSharedInformerFactory[Informer, Lister, GenericInformer]) Listers() (listers map[schema.GroupVersionResource]Lister, notSynced []schema.GroupVersionResource) { - listers = map[schema.GroupVersionResource]Lister{} +func (d *GenericDiscoveringDynamicSharedInformerFactory[Informer, Lister, GenericInformer]) Informers() (informers map[schema.GroupVersionResource]GenericInformer, notSynced []schema.GroupVersionResource) { + informers = map[schema.GroupVersionResource]GenericInformer{} d.informersLock.RLock() defer d.informersLock.RUnlock() @@ -372,38 +372,10 @@ func (d *GenericDiscoveringDynamicSharedInformerFactory[Informer, Lister, Generi continue } - listers[gvr] = informer.Lister() + informers[gvr] = informer } - return listers, notSynced -} - -// Lister returns a lister for the given resource-type, whether it is -// known by this informer factory, and whether it is synced. -func (d *GenericDiscoveringDynamicSharedInformerFactory[Informer, Lister, GenericInformer]) Lister(gvr schema.GroupVersionResource) (lister Lister, known, synced bool) { - d.informersLock.RLock() - defer d.informersLock.RUnlock() - - informer, ok := d.informers[gvr] - if !ok { - return lister, false, false - } - - return informer.Lister(), true, informer.Informer().HasSynced() -} - -// Informer returns an informer for the given resource-type if it exists, whether it is -// known by this informer factory, and whether it is synced. 
-func (d *GenericDiscoveringDynamicSharedInformerFactory[Informer, Lister, GenericInformer]) Informer(gvr schema.GroupVersionResource) (informer Informer, known, synced bool) { - d.informersLock.RLock() - defer d.informersLock.RUnlock() - - genericInformer, ok := d.informers[gvr] - if !ok { - return informer, false, false - } - - return genericInformer.Informer(), true, genericInformer.Informer().HasSynced() + return informers, notSynced } // GVREventHandler is an event handler that includes the GroupVersionResource diff --git a/pkg/reconciler/apis/permissionclaimlabel/permissionclaimlabel_reconcile.go b/pkg/reconciler/apis/permissionclaimlabel/permissionclaimlabel_reconcile.go index acf307b9732..8c833cd5350 100644 --- a/pkg/reconciler/apis/permissionclaimlabel/permissionclaimlabel_reconcile.go +++ b/pkg/reconciler/apis/permissionclaimlabel/permissionclaimlabel_reconcile.go @@ -245,9 +245,9 @@ func claimFromSetKey(key string) apisv1alpha1.PermissionClaim { } func (c *controller) getInformerForGroupResource(group, resource string) (kcpkubernetesinformers.GenericClusterInformer, schema.GroupVersionResource, error) { - listers, _ := c.ddsif.Listers() + informers, _ := c.ddsif.Informers() - for gvr := range listers { + for gvr := range informers { if gvr.Group == group && gvr.Resource == resource { informer, err := c.ddsif.ForResource(gvr) // once we find one, return. diff --git a/pkg/reconciler/workload/resource/resource_controller.go b/pkg/reconciler/workload/resource/resource_controller.go index 6d6f2f5fd14..1e1fa3e220c 100644 --- a/pkg/reconciler/workload/resource/resource_controller.go +++ b/pkg/reconciler/workload/resource/resource_controller.go @@ -373,11 +373,11 @@ func (c *Controller) enqueueResourcesForNamespace(ns *corev1.Namespace) error { logger = logger.WithValues("nsLocations", nsLocations.List()) logger.V(4).Info("getting listers") - listers, notSynced := c.ddsif.Listers() + informers, notSynced := c.ddsif.Informers() var errs []error - for gvr, lister := range listers { + for gvr, informer := range informers { logger = logger.WithValues("gvr", gvr.String()) - objs, err := lister.ByCluster(clusterName).ByNamespace(ns.Name).List(labels.Everything()) + objs, err := informer.Lister().ByCluster(clusterName).ByNamespace(ns.Name).List(labels.Everything()) if err != nil { errs = append(errs, fmt.Errorf("error listing %q in %s|%s: %w", gvr, clusterName, ns.Name, err)) continue @@ -438,9 +438,9 @@ func (c *Controller) enqueueSyncTarget(obj interface{}) { func (c *Controller) enqueueSyncTargetKey(syncTargetKey string) { logger := logging.WithReconciler(klog.Background(), ControllerName).WithValues("syncTargetKey", syncTargetKey) - listers, _ := c.ddsif.Listers() + informers, _ := c.ddsif.Informers() queued := map[string]int{} - for gvr := range listers { + for gvr := range informers { inf, err := c.ddsif.ForResource(gvr) if err != nil { runtime.HandleError(err) diff --git a/pkg/syncer/controllermanager/controllermanager.go b/pkg/syncer/controllermanager/controllermanager.go index e34de3b6d02..fabab061137 100644 --- a/pkg/syncer/controllermanager/controllermanager.go +++ b/pkg/syncer/controllermanager/controllermanager.go @@ -28,34 +28,50 @@ import ( "k8s.io/klog/v2" "github.com/kcp-dev/kcp/pkg/logging" + "github.com/kcp-dev/kcp/pkg/syncer/shared" ) const ( ControllerNamePrefix = "syncer-controller-manager-" ) +// InformerSource is a dynamic source of informers per GVR, +// which notifies when informers are added or removed for some GVR. 
+// It is implemented by the DynamicSharedInformerFactory (in fact by +// both the scoped and cluster-aware variants). type InformerSource struct { + // Subscribe registers for informer change notifications, returning a channel to which change notifications are sent. + // The id argument is the identifier of the subscriber, since there might be several subscribers subscribing + // to receive events from this InformerSource. Subscribe func(id string) <-chan struct{} - Informer func(gvr schema.GroupVersionResource) (informer cache.SharedIndexInformer, known, synced bool) -} -type Controller interface { - Start(ctx context.Context, numThreads int) + // Informers returns a map of per-resource-type SharedIndexInformers for all types that are + // known by this informer source, and that are synced. + // + // It also returns the list of GVRs whose informers are known by this informer source, but still not synced. + Informers func() (informers map[schema.GroupVersionResource]cache.SharedIndexInformer, notSynced []schema.GroupVersionResource) } -type ControllerDefintion struct { +// ManagedController defines a controller that should be managed by a ControllerManager, +// to be started when the required GVRs are supported, and stopped when the required GVRs +// are not supported anymore. +type ManagedController struct { RequiredGVRs []schema.GroupVersionResource - NumThreads int - Create func(syncedInformers map[schema.GroupVersionResource]cache.SharedIndexInformer) (Controller, error) + Create CreateControllerFunc } -func NewControllerManager(ctx context.Context, suffix string, informerSource InformerSource, controllers map[string]ControllerDefintion) *ControllerManager { +type StartControllerFunc func(ctx context.Context) +type CreateControllerFunc func(ctx context.Context) (StartControllerFunc, error) + +// NewControllerManager creates a new ControllerManager which will manage (create/start/stop) GVR-specific controllers according to informers +// available in the provided InformerSource. +func NewControllerManager(ctx context.Context, suffix string, informerSource InformerSource, controllers map[string]ManagedController) *ControllerManager { controllerManager := ControllerManager{ - name: ControllerNamePrefix + suffix, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerNamePrefix+suffix), - informerSource: informerSource, - controllerDefinitions: controllers, - startedControllers: map[string]context.CancelFunc{}, + name: ControllerNamePrefix + suffix, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerNamePrefix+suffix), + informerSource: informerSource, + managedControllers: controllers, + startedControllers: map[string]context.CancelFunc{}, } apisChanged := informerSource.Subscribe(controllerManager.name) @@ -77,12 +93,19 @@ func NewControllerManager(ctx context.Context, suffix string, informerSource Inf return &controllerManager } +// ControllerManager is a component that manages (create/start/stop) GVR-specific controllers according to available GVRs. +// It reacts to the changes of supported GVRs in a DiscoveringDynamicSharedInformerFactory +// (the GVRs for which an informer has been automatically created, started and synced), +// and starts / stops registered GVR-specific controllers according to the GVRs they depend on.
+// +// For example, this allows starting PVC / PV controllers only when PVC / PV resources are exposed by the Syncer and UpSyncer +// virtual workspaces, and Informers for them have been started and synced by the corresponding ddsif. type ControllerManager struct { - name string - queue workqueue.RateLimitingInterface - informerSource InformerSource - controllerDefinitions map[string]ControllerDefintion - startedControllers map[string]context.CancelFunc + name string + queue workqueue.RateLimitingInterface + informerSource InformerSource + managedControllers map[string]ManagedController + startedControllers map[string]context.CancelFunc } // Start starts the controller, which stops when ctx.Done() is closed. @@ -110,79 +133,66 @@ func (c *ControllerManager) processNextWorkItem(ctx context.Context) bool { } defer c.queue.Done(key) - c.UpdateControllers(ctx) + c.process(ctx) c.queue.Forget(key) return true } -func (c *ControllerManager) UpdateControllers(ctx context.Context) { +func (c *ControllerManager) process(ctx context.Context) { logger := klog.FromContext(ctx) - controllersToStart := map[string]map[schema.GroupVersionResource]cache.SharedIndexInformer{} + controllersToStart := map[string]CreateControllerFunc{} + syncedInformers, notSynced := c.informerSource.Informers() controllerLoop: - for controllerName, controllerDefinition := range c.controllerDefinitions { - requiredGVRs := controllerDefinition.RequiredGVRs - informers := make(map[schema.GroupVersionResource]cache.SharedIndexInformer, len(requiredGVRs)) + for controllerName, managedController := range c.managedControllers { - requiredGVRs := managedController.RequiredGVRs for _, gvr := range requiredGVRs { - if informer, known, synced := c.informerSource.Informer(gvr); !known { - continue controllerLoop - } else if !synced { - logger.V(2).Info("waiting for the informer to be synced before starting controller", "gvr", gvr, "controller", controllerName) - c.queue.AddAfter("resync", time.Second) + informer := syncedInformers[gvr] + if informer == nil { + if shared.ContainsGVR(notSynced, gvr) { + logger.V(2).Info("waiting for the informer to be synced before starting controller", "gvr", gvr, "controller", controllerName) + c.queue.AddAfter("resync", time.Second) + continue controllerLoop + } + // The informer doesn't even exist for this GVR. + // Let's ignore this controller for now: one of the required GVRs has no informer started + // (because it has not been found on the SyncTarget in the supported resources to sync). + // If this required GVR is supported later on, the process() method will be called + // again after an API change notification comes through the informerSource.
continue controllerLoop - } else { - informers[gvr] = informer } } - controllersToStart[controllerName] = informers + controllersToStart[controllerName] = managedController.Create + } + + // Remove obsolete controllers that don't have their required GVRs anymore + for controllerName, cancelFunc := range c.startedControllers { + if _, ok := controllersToStart[controllerName]; ok { + // The controller is still expected => don't remove it + continue + } + // The controller should not be running + // Stop it and remove it from the list of started controllers + cancelFunc() + delete(c.startedControllers, controllerName) } // Create and start missing controllers that have their required GVRs synced - newlyStartedControllers := map[string]context.CancelFunc{} - for controllerName, informers := range controllersToStart { + for controllerName, create := range controllersToStart { if _, ok := c.startedControllers[controllerName]; ok { // The controller is already started continue } - controllerDefinition, ok := c.controllerDefinitions[controllerName] - if !ok { - logger.V(2).Info("cannot find controller definition", "controller", controllerName) - continue - } // Create the controller - controller, err := controllerDefinition.Create(informers) + start, err := create(ctx) if err != nil { - logger.Error(err, "error creating controller", "controller", controllerName) + logger.Error(err, "failed creating controller", "controller", controllerName) continue } - for _, informer := range informers { - if err := informer.GetStore().Resync(); err != nil { - logger.Error(err, "error resyncing informer controller", "controller", controllerName) - continue - } - } - // Start the controller controllerContext, cancelFunc := context.WithCancel(ctx) - go controller.Start(controllerContext, controllerDefinition.NumThreads) - newlyStartedControllers[c.name] = cancelFunc - } - - // Remove obsolete controllers that don't have their required GVRs anymore - for controllerName, cancelFunc := range c.startedControllers { - if _, ok := controllersToStart[controllerName]; ok { - // The controller is still expected => don't remove it - continue - } - // The controller should not be running - // Stop it and remove it from the list of started controllers - cancelFunc() - delete(c.startedControllers, controllerName) - } - - // Add missing controllers that were created and started above - for controllerName, cancelFunc := range newlyStartedControllers { + go start(controllerContext) c.startedControllers[controllerName] = cancelFunc } } diff --git a/pkg/syncer/endpoints/endpoint_downstream_controller.go b/pkg/syncer/endpoints/endpoint_downstream_controller.go index a8a05341c1a..feaa94b69e4 100644 --- a/pkg/syncer/endpoints/endpoint_downstream_controller.go +++ b/pkg/syncer/endpoints/endpoint_downstream_controller.go @@ -22,10 +22,7 @@ import ( "fmt" "time" - "github.com/go-logr/logr" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic" @@ -42,64 +39,68 @@ const ( ControllerName = "syncer-endpoint-controller" ) +// NewEndpointController returns a new controller that annotates Endpoints related to synced Services, so that those Endpoints +// are upsynced by the UpSyncer to the upstream KCP workspace. +// This is useful to enable components such as a Knative controller (running against the KCP workspace) to see the Endpoint, +// and confirm that the related Service is effective.
func NewEndpointController( - syncerLogger logr.Logger, downstreamClient dynamic.Interface, ddsifForDownstream *ddsif.GenericDiscoveringDynamicSharedInformerFactory[cache.SharedIndexInformer, cache.GenericLister, informers.GenericInformer], - syncedInformers map[schema.GroupVersionResource]cache.SharedIndexInformer, -) (*EndpointController, error) { +) (*controller, error) { endpointsGVR := corev1.SchemeGroupVersion.WithResource("endpoints") - c := &EndpointController{ + c := &controller{ queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName), } - logger := logging.WithReconciler(syncerLogger, ControllerName) - - endpointsInformer, ok := syncedInformers[endpointsGVR] + informers, _ := ddsifForDownstream.Informers() + endpointsInformer, ok := informers[endpointsGVR] if !ok { return nil, errors.New("endpoints informer should be available") } - endpointsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - c.AddToQueue(obj, logger) + c.enqueue(obj) }, UpdateFunc: func(oldObj, newObj interface{}) { - c.AddToQueue(newObj, logger) + c.enqueue(newObj) }, DeleteFunc: func(obj interface{}) { - c.AddToQueue(obj, logger) + c.enqueue(obj) }, }) return c, nil } -type EndpointController struct { +type controller struct { queue workqueue.RateLimitingInterface } -func (c *EndpointController) AddToQueue(obj interface{}, logger logr.Logger) { +func (c *controller) enqueue(obj interface{}) { key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { utilruntime.HandleError(err) return } - logging.WithQueueKey(logger, key).V(2).Info("queueing", "key", key) + logger := logging.WithQueueKey(logging.WithReconciler(klog.Background(), ControllerName), key) + logger.V(2).Info("queueing") c.queue.Add(key) } // Start starts N worker processes processing work items. -func (c *EndpointController) Start(ctx context.Context, numThreads int) { +func (c *controller) Start(ctx context.Context, numThreads int) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() logger := logging.WithReconciler(klog.FromContext(ctx), ControllerName) ctx = klog.NewContext(ctx, logger) - logger.Info("Starting syncer workers") - defer logger.Info("Stopping syncer workers") + logger.Info("Starting controller") + defer func() { + logger.Info("Shutting down controller") + }() for i := 0; i < numThreads; i++ { go wait.UntilWithContext(ctx, c.startWorker, time.Second) @@ -109,12 +110,12 @@ func (c *EndpointController) Start(ctx context.Context, numThreads int) { } // startWorker processes work items until stopCh is closed. 
-func (c *EndpointController) startWorker(ctx context.Context) { +func (c *controller) startWorker(ctx context.Context) { for c.processNextWorkItem(ctx) { } } -func (c *EndpointController) processNextWorkItem(ctx context.Context) bool { +func (c *controller) processNextWorkItem(ctx context.Context) bool { // Wait until there is a new item in the working queue key, quit := c.queue.Get() if quit { @@ -142,7 +143,7 @@ func (c *EndpointController) processNextWorkItem(ctx context.Context) bool { return true } -func (c *EndpointController) process(ctx context.Context, key string) error { +func (c *controller) process(ctx context.Context, key string) error { logger := klog.FromContext(ctx) namespace, name, err := cache.SplitMetaNamespaceKey(key) diff --git a/pkg/syncer/namespace/namespace_downstream_controller.go b/pkg/syncer/namespace/namespace_downstream_controller.go index 88519721cfc..27f9a00a096 100644 --- a/pkg/syncer/namespace/namespace_downstream_controller.go +++ b/pkg/syncer/namespace/namespace_downstream_controller.go @@ -18,7 +18,6 @@ package namespace import ( "context" - "errors" "fmt" "sync" "time" @@ -99,11 +98,12 @@ func NewDownstreamController( return downstreamClient.Resource(namespaceGVR).Delete(ctx, namespace, metav1.DeleteOptions{}) }, upstreamNamespaceExists: func(clusterName logicalcluster.Name, upstreamNamespaceName string) (bool, error) { - lister, known, synced := ddsifForUpstreamSyncer.Lister(namespaceGVR) - if !known || !synced { - return false, errors.New("informer should be up and synced for namespaces in the upstream syncer informer factory") + informer, err := ddsifForUpstreamSyncer.ForResource(namespaceGVR) + if err != nil { + return false, err } - _, err := lister.ByCluster(clusterName).Get(upstreamNamespaceName) + + _, err = informer.Lister().ByCluster(clusterName).Get(upstreamNamespaceName) if apierrors.IsNotFound(err) { return false, nil } @@ -113,31 +113,31 @@ func NewDownstreamController( return true, nil }, getDownstreamNamespace: func(downstreamNamespaceName string) (runtime.Object, error) { - lister, known, synced := ddsifForDownstream.Lister(namespaceGVR) - if !known || !synced { - return nil, errors.New("informer should be up and synced for namespaces in the downstream informer factory") + informer, err := ddsifForDownstream.ForResource(namespaceGVR) + if err != nil { + return nil, err } - return lister.Get(downstreamNamespaceName) + return informer.Lister().Get(downstreamNamespaceName) }, listDownstreamNamespaces: func() ([]runtime.Object, error) { - lister, known, synced := ddsifForDownstream.Lister(namespaceGVR) - if !known || !synced { - return nil, errors.New("informer should be up and synced for namespaces in the downstream informer factory") + informer, err := ddsifForDownstream.ForResource(namespaceGVR) + if err != nil { + return nil, err } - return lister.List(labels.Everything()) + return informer.Lister().List(labels.Everything()) }, isDowntreamNamespaceEmpty: func(ctx context.Context, namespace string) (bool, error) { - listers, notSynced := ddsifForDownstream.Listers() + informers, notSynced := ddsifForDownstream.Informers() if len(notSynced) > 0 { return false, fmt.Errorf("some informers are still not synced in the downstream informer factory") } - for gvr, lister := range listers { + for gvr, informer := range informers { // Skip namespaces.
- if gvr.Group == "" && gvr.Version == "v1" && gvr.Resource == "namespaces" { + if gvr == namespaceGVR { continue } - list, err := lister.ByNamespace(namespace).List(labels.Everything()) + list, err := informer.Lister().ByNamespace(namespace).List(labels.Everything()) if err != nil { return false, err } diff --git a/pkg/syncer/resourcesync/controller.go b/pkg/syncer/resourcesync/controller.go index 8d1a152a922..4f3604e52cb 100644 --- a/pkg/syncer/resourcesync/controller.go +++ b/pkg/syncer/resourcesync/controller.go @@ -51,7 +51,7 @@ import ( conditionsv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/third_party/conditions/apis/conditions/v1alpha1" "github.com/kcp-dev/kcp/pkg/apis/third_party/conditions/util/conditions" workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1" - clientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned" + workloadv1alpha1typed "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/typed/workload/v1alpha1" workloadv1alpha1informers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions/workload/v1alpha1" workloadv1alpha1listers "github.com/kcp-dev/kcp/pkg/client/listers/workload/v1alpha1" "github.com/kcp-dev/kcp/pkg/informer" @@ -60,56 +60,38 @@ import ( const ( resyncPeriod = 10 * time.Hour - controllerName = "kcp-syncer-resourcesync-controller" + controllerName = "kcp-syncer-synctarget-gvrsource-controller" ) -var _ informer.GVRSource = (*Controller)(nil) - -// controller is a control loop that watches synctarget. It starts/stops spec syncer and status syncer -// per gvr based on synctarget.Status.SyncedResources. -// All the spec/status syncer share the same downstreamNSInformer and upstreamSecretInformer. Informers -// for gvr is started separated for each syncer. -type Controller struct { - queue workqueue.RateLimitingInterface - downstreamKubeClient kubernetes.Interface - - cachedDiscovery discovery.CachedDiscoveryInterface - - syncTargetUID types.UID - syncTargetLister workloadv1alpha1listers.SyncTargetLister - synctargetInformerCacheSync cache.InformerSynced - kcpClient clientset.Interface - - gvrs map[schema.GroupVersionResource]informer.GVRPartialMetadata - - mutex sync.RWMutex - - // Support subscribers that want to know when Synced GVRs have changed. - subscribersLock sync.Mutex - subscribers []chan<- struct{} -} - -func NewController( +// NewSyncTargetGVRSource returns a controller watching a [workloadv1alpha1.SyncTarget] and maintaining, +// from the information contained in the SyncTarget status, +// a list of the GVRs that should be watched and synced. +// +// It implements the [informer.GVRSource] interface to provide the GVRs to sync, as well as +// a way to subscribe to changes in the GVR list. +// It will be used to feed the various [informer.DiscoveringDynamicSharedInformerFactory] instances +// (one for downstream and 2 for upstream, for syncing and upsyncing). 
+func NewSyncTargetGVRSource( syncerLogger logr.Logger, - discoveryClient discovery.DiscoveryInterface, + upstreamSyncerDiscovery discovery.DiscoveryInterface, upstreamDynamicClusterClient kcpdynamic.ClusterInterface, downstreamDynamicClient dynamic.Interface, downstreamKubeClient kubernetes.Interface, - kcpClient clientset.Interface, + syncTargetClient workloadv1alpha1typed.SyncTargetInterface, syncTargetInformer workloadv1alpha1informers.SyncTargetInformer, syncTargetName string, syncTargetClusterName logicalcluster.Name, syncTargetUID types.UID, -) (*Controller, error) { - c := &Controller{ - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerName), - cachedDiscovery: memory.NewMemCacheClient(discoveryClient), - downstreamKubeClient: downstreamKubeClient, - kcpClient: kcpClient, - syncTargetUID: syncTargetUID, - syncTargetLister: syncTargetInformer.Lister(), - synctargetInformerCacheSync: syncTargetInformer.Informer().HasSynced, - gvrs: map[schema.GroupVersionResource]informer.GVRPartialMetadata{}, +) (*controller, error) { + c := &controller{ + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerName), + upstreamSyncerDiscoveryClient: memory.NewMemCacheClient(upstreamSyncerDiscovery), + downstreamKubeClient: downstreamKubeClient, + syncTargetClient: syncTargetClient, + syncTargetUID: syncTargetUID, + syncTargetLister: syncTargetInformer.Lister(), + synctargetInformerHasSynced: syncTargetInformer.Informer().HasSynced, + gvrsToWatch: map[schema.GroupVersionResource]informer.GVRPartialMetadata{}, } logger := logging.WithReconciler(syncerLogger, controllerName) @@ -136,7 +118,75 @@ func NewController( return c, nil } -func (c *Controller) enqueueSyncTarget(obj interface{}, logger logr.Logger) { +var _ informer.GVRSource = (*controller)(nil) + +// controller is a control loop that watches synctarget. It starts/stops spec syncer and status syncer +// per gvr based on synctarget.Status.SyncedResources. +// All the spec/status syncer share the same downstreamNSInformer and upstreamSecretInformer. Informers +// for gvr is started separated for each syncer. +type controller struct { + queue workqueue.RateLimitingInterface + downstreamKubeClient kubernetes.Interface + + upstreamSyncerDiscoveryClient discovery.CachedDiscoveryInterface + + syncTargetUID types.UID + syncTargetLister workloadv1alpha1listers.SyncTargetLister + synctargetInformerHasSynced cache.InformerSynced + syncTargetClient workloadv1alpha1typed.SyncTargetInterface + + gvrsToWatchLock sync.RWMutex + gvrsToWatch map[schema.GroupVersionResource]informer.GVRPartialMetadata + + // Support subscribers that want to know when Synced GVRs have changed. + subscribersLock sync.Mutex + subscribers []chan<- struct{} +} + +// GVRs returns the required metadata (scope, kind, singular name) about all GVRs that should be synced. +// It implements [informer.GVRSource.GVRs]. 
+func (c *controller) GVRs() map[schema.GroupVersionResource]informer.GVRPartialMetadata { + c.gvrsToWatchLock.RLock() + defer c.gvrsToWatchLock.RUnlock() + + gvrs := make(map[schema.GroupVersionResource]informer.GVRPartialMetadata, len(c.gvrsToWatch)+len(builtinGVRs)+1) + gvrs[corev1.SchemeGroupVersion.WithResource("namespaces")] = informer.GVRPartialMetadata{ + Scope: apiextensionsv1.ClusterScoped, + Names: apiextensionsv1.CustomResourceDefinitionNames{ + Singular: "namespace", + Kind: "Namespace", + }, + } + for key, value := range builtinGVRs { + gvrs[key] = value + } + for key, value := range c.gvrsToWatch { + gvrs[key] = value + } + return gvrs +} + +// Ready returns true if the controller is ready to return the GVRs to sync. +// It implements [informer.GVRSource.Ready]. +func (c *controller) Ready() bool { + return c.synctargetInformerHasSynced() +} + +// Subscribe returns a new channel to which the controller writes whenever +// its list of GVRs has changed. +// It implements [informer.GVRSource.Subscribe]. +func (c *controller) Subscribe() <-chan struct{} { + c.subscribersLock.Lock() + defer c.subscribersLock.Unlock() + + // Use a buffered channel so we can always send at least 1, regardless of consumer status. + changes := make(chan struct{}, 1) + c.subscribers = append(c.subscribers, changes) + + return changes +} + +func (c *controller) enqueueSyncTarget(obj interface{}, logger logr.Logger) { key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { runtime.HandleError(err) @@ -149,7 +199,7 @@ func (c *Controller) enqueueSyncTarget(obj interface{}, logger logr.Logger) { } // Start starts the controller workers. -func (c *Controller) Start(ctx context.Context, numThreads int) { +func (c *controller) Start(ctx context.Context, numThreads int) { defer runtime.HandleCrash() defer c.queue.ShutDown() @@ -165,12 +215,12 @@ func (c *Controller) Start(ctx context.Context, numThreads int) { <-ctx.Done() } -func (c *Controller) startWorker(ctx context.Context) { +func (c *controller) startWorker(ctx context.Context) { for c.processNextWorkItem(ctx) { } } -func (c *Controller) processNextWorkItem(ctx context.Context) bool { +func (c *controller) processNextWorkItem(ctx context.Context) bool { // Wait until there is a new item in the working queue k, quit := c.queue.Get() if quit { @@ -196,7 +246,7 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool { return true } -func (c *Controller) process(ctx context.Context, key string) error { +func (c *controller) process(ctx context.Context, key string) error { logger := klog.FromContext(ctx) _, name, err := cache.SplitMetaNamespaceKey(key) @@ -207,7 +257,10 @@ func (c *Controller) process(ctx context.Context, key string) error { syncTarget, err := c.syncTargetLister.Get(name) if apierrors.IsNotFound(err) { - return c.removeUnusedGVRs(ctx, map[schema.GroupVersionResource]bool{}) + if updated := c.removeUnusedGVRs(ctx, map[schema.GroupVersionResource]bool{}); updated { + c.notifySubscribers(ctx) + } + return nil } if err != nil { @@ -220,10 +273,11 @@ func (c *Controller) process(ctx context.Context, key string) error { requiredGVRs := getAllGVRs(syncTarget) - c.cachedDiscovery.Invalidate() + c.upstreamSyncerDiscoveryClient.Invalidate() var errs []error var unauthorizedGVRs []string + notify := false for gvr := range requiredGVRs { logger := logger.WithValues("gvr", gvr.String()) ctx := klog.NewContext(ctx, logger) @@ -243,13 +297,19 @@ func (c *Controller) process(ctx context.Context, key string) error { 
continue } - if err := c.addGVR(ctx, gvr, syncTarget); err != nil { + if updated, err := c.addGVR(ctx, gvr); err != nil { return err + } else if updated { + notify = true } } - if err := c.removeUnusedGVRs(ctx, requiredGVRs); err != nil { - return err + if updated := c.removeUnusedGVRs(ctx, requiredGVRs); updated { + notify = true + } + + if notify { + c.notifySubscribers(ctx) } newSyncTarget := syncTarget.DeepCopy() @@ -273,7 +333,7 @@ func (c *Controller) process(ctx context.Context, key string) error { return errors.NewAggregate(errs) } -func (c *Controller) patchSyncTargetCondition(ctx context.Context, new, old *workloadv1alpha1.SyncTarget) error { +func (c *controller) patchSyncTargetCondition(ctx context.Context, new, old *workloadv1alpha1.SyncTarget) error { logger := klog.FromContext(ctx) // If the object being reconciled changed as a result, update it. if equality.Semantic.DeepEqual(old.Status.Conditions, new.Status.Conditions) { @@ -306,11 +366,11 @@ func (c *Controller) patchSyncTargetCondition(ctx context.Context, new, old *wor return fmt.Errorf("failed to create patch for syncTarget %s: %w", new.Name, err) } logger.V(2).Info("patching syncTarget", "patch", string(patchBytes)) - _, uerr := c.kcpClient.WorkloadV1alpha1().SyncTargets().Patch(ctx, new.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status") + _, uerr := c.syncTargetClient.Patch(ctx, new.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status") return uerr } -func (c *Controller) checkSSAR(ctx context.Context, gvr schema.GroupVersionResource) (bool, error) { +func (c *controller) checkSSAR(ctx context.Context, gvr schema.GroupVersionResource) (bool, error) { ssar := &authorizationv1.SelfSubjectAccessReview{ Spec: authorizationv1.SelfSubjectAccessReviewSpec{ ResourceAttributes: &authorizationv1.ResourceAttributes{ @@ -330,51 +390,49 @@ func (c *Controller) checkSSAR(ctx context.Context, gvr schema.GroupVersionResou return sar.Status.Allowed, nil } -// removeUnusedGVRs stop syncers for gvrs not in requiredGVRs. -func (c *Controller) removeUnusedGVRs(ctx context.Context, requiredGVRs map[schema.GroupVersionResource]bool) error { +// removeUnusedGVRs removes the GVRs which are not required anymore, and return `true` if GVRs were updated. +func (c *controller) removeUnusedGVRs(ctx context.Context, requiredGVRs map[schema.GroupVersionResource]bool) bool { logger := klog.FromContext(ctx) - c.mutex.Lock() - defer c.mutex.Unlock() + c.gvrsToWatchLock.Lock() + defer c.gvrsToWatchLock.Unlock() updated := false - for gvr := range c.gvrs { + for gvr := range c.gvrsToWatch { if _, ok := requiredGVRs[gvr]; !ok { logger.WithValues("gvr", gvr.String()).V(2).Info("Stop syncer for gvr") - delete(c.gvrs, gvr) + delete(c.gvrsToWatch, gvr) updated = true } } - - if updated { - c.notifySubscribers(ctx) - } - return nil + return updated } -func (c *Controller) GVRs() map[schema.GroupVersionResource]informer.GVRPartialMetadata { - c.mutex.RLock() - defer c.mutex.RUnlock() +// addGVR adds the given GVR if it isn't already in the list, and returns `true` if the GVR was added, +// `false` if it was already there. 
+func (c *controller) addGVR(ctx context.Context, gvr schema.GroupVersionResource) (bool, error) { + logger := klog.FromContext(ctx) - gvrs := make(map[schema.GroupVersionResource]informer.GVRPartialMetadata, len(c.gvrs)+len(builtinGVRs)+1) - gvrs[corev1.SchemeGroupVersion.WithResource("namespaces")] = informer.GVRPartialMetadata{ - Scope: apiextensionsv1.ClusterScoped, - Names: apiextensionsv1.CustomResourceDefinitionNames{ - Singular: "namespace", - Kind: "Namespace", - }, - } - for key, value := range builtinGVRs { - gvrs[key] = value + c.gvrsToWatchLock.Lock() + defer c.gvrsToWatchLock.Unlock() + + if _, ok := c.gvrsToWatch[gvr]; ok { + logger.V(2).Info("Informer is started already") + return false, nil } - for key, value := range c.gvrs { - gvrs[key] = value + + partialMetadata, err := c.getGVRPartialMetadata(gvr) + if err != nil { + return false, err } - return gvrs + + c.gvrsToWatch[gvr] = *partialMetadata + + return true, nil } -func (c *Controller) getGVRPartialMetadata(gvr schema.GroupVersionResource) (*informer.GVRPartialMetadata, error) { - apiResourceList, err := c.cachedDiscovery.ServerResourcesForGroupVersion(gvr.GroupVersion().String()) +func (c *controller) getGVRPartialMetadata(gvr schema.GroupVersionResource) (*informer.GVRPartialMetadata, error) { + apiResourceList, err := c.upstreamSyncerDiscoveryClient.ServerResourcesForGroupVersion(gvr.GroupVersion().String()) if err != nil { return nil, err } @@ -400,22 +458,7 @@ func (c *Controller) getGVRPartialMetadata(gvr schema.GroupVersionResource) (*in return nil, fmt.Errorf("unable to retrieve discovery for GVR: %s", gvr) } -func (c *Controller) Ready() bool { - return c.synctargetInformerCacheSync() -} - -func (c *Controller) Subscribe() <-chan struct{} { - c.subscribersLock.Lock() - defer c.subscribersLock.Unlock() - - // Use a buffered channel so we can always send at least 1, regardless of consumer status. 
- changes := make(chan struct{}, 1) - c.subscribers = append(c.subscribers, changes) - - return changes -} - -func (c *Controller) notifySubscribers(ctx context.Context) { +func (c *controller) notifySubscribers(ctx context.Context) { logger := klog.FromContext(ctx) c.subscribersLock.Lock() @@ -432,28 +475,6 @@ func (c *Controller) notifySubscribers(ctx context.Context) { } } -func (c *Controller) addGVR(ctx context.Context, gvr schema.GroupVersionResource, syncTarget *workloadv1alpha1.SyncTarget) error { - logger := klog.FromContext(ctx) - - c.mutex.Lock() - defer c.mutex.Unlock() - - if _, ok := c.gvrs[gvr]; ok { - logger.V(2).Info("Informer is started already") - return nil - } - - partialMetadata, err := c.getGVRPartialMetadata(gvr) - if err != nil { - return err - } - - c.gvrs[gvr] = *partialMetadata - - c.notifySubscribers(ctx) - return nil -} - var builtinGVRs = map[schema.GroupVersionResource]informer.GVRPartialMetadata{ { Version: "v1", diff --git a/pkg/syncer/shared/helpers.go b/pkg/syncer/shared/helpers.go index 2800013d786..79aafe80bff 100644 --- a/pkg/syncer/shared/helpers.go +++ b/pkg/syncer/shared/helpers.go @@ -71,3 +71,12 @@ func GetDNSID(clusterName logicalcluster.Name, syncTargetUID types.UID, syncTarg return fmt.Sprintf("kcp-dns-%s-%s-%s", syncTargetName, uid36hash[:8], workspace36hash[:8]) } + +func ContainsGVR(gvrs []schema.GroupVersionResource, gvr schema.GroupVersionResource) bool { + for _, item := range gvrs { + if gvr == item { + return true + } + } + return false +} diff --git a/pkg/syncer/shared/namespace.go b/pkg/syncer/shared/namespace.go index 751eb21c342..03450507b4a 100644 --- a/pkg/syncer/shared/namespace.go +++ b/pkg/syncer/shared/namespace.go @@ -25,7 +25,6 @@ import ( "github.com/kcp-dev/logicalcluster/v3" "github.com/martinlindhe/base36" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" ) @@ -106,7 +105,3 @@ func PhysicalClusterNamespaceName(l NamespaceLocator) (string, error) { // keep the namespaces short enough. 
return fmt.Sprintf("kcp-%s", base36hash[:12]), nil } - -func IsNamespace(gvr schema.GroupVersionResource) bool { - return gvr.Resource == "namespaces" && gvr.Group == "" && gvr.Version == "v1" -} diff --git a/pkg/syncer/spec/spec_controller.go b/pkg/syncer/spec/spec_controller.go index a8373a1d892..33bf5641702 100644 --- a/pkg/syncer/spec/spec_controller.go +++ b/pkg/syncer/spec/spec_controller.go @@ -46,9 +46,10 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "github.com/kcp-dev/kcp/pkg/indexers" ddsif "github.com/kcp-dev/kcp/pkg/informer" "github.com/kcp-dev/kcp/pkg/logging" - "github.com/kcp-dev/kcp/pkg/syncer/indexers" + syncerindexers "github.com/kcp-dev/kcp/pkg/syncer/indexers" "github.com/kcp-dev/kcp/pkg/syncer/shared" "github.com/kcp-dev/kcp/pkg/syncer/spec/dns" specmutators "github.com/kcp-dev/kcp/pkg/syncer/spec/mutators" @@ -72,7 +73,7 @@ type Controller struct { getUpstreamLister func(gvr schema.GroupVersionResource) (kcpcache.GenericClusterLister, error) getDownstreamLister func(gvr schema.GroupVersionResource) (cache.GenericLister, error) - listDownstreamNamespacesByLocator func(jsonLocator string) ([]interface{}, error) + listDownstreamNamespacesByLocator func(jsonLocator string) ([]*unstructured.Unstructured, error) downstreamNSCleaner shared.Cleaner syncTargetName string @@ -100,32 +101,34 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste downstreamNSCleaner: downstreamNSCleaner, getDownstreamLister: func(gvr schema.GroupVersionResource) (cache.GenericLister, error) { - lister, known, synced := ddsifForDownstream.Lister(gvr) - if !known { + informers, notSynced := ddsifForDownstream.Informers() + informer, ok := informers[gvr] + if !ok { + if shared.ContainsGVR(notSynced, gvr) { + return nil, fmt.Errorf("informer for gvr %v not synced in the downstream informer factory", gvr) + } return nil, fmt.Errorf("gvr %v should be known in the downstream informer factory", gvr) } - if !synced { - return nil, fmt.Errorf("informer for gvr %v not synced in the downstream informer factory", gvr) - } - return lister, nil + return informer.Lister(), nil }, getUpstreamLister: func(gvr schema.GroupVersionResource) (kcpcache.GenericClusterLister, error) { - lister, known, synced := ddsifForUpstreamSyncer.Lister(gvr) - if !known { + informers, notSynced := ddsifForUpstreamSyncer.Informers() + informer, ok := informers[gvr] + if !ok { + if shared.ContainsGVR(notSynced, gvr) { + return nil, fmt.Errorf("informer for gvr %v not synced in the downstream informer factory", gvr) + } return nil, fmt.Errorf("gvr %v should be known in the downstream informer factory", gvr) } - if !synced { - return nil, fmt.Errorf("informer for gvr %v not synced in the downstream informer factory", gvr) - } - return lister, nil + return informer.Lister(), nil }, - listDownstreamNamespacesByLocator: func(jsonLocator string) ([]interface{}, error) { + listDownstreamNamespacesByLocator: func(jsonLocator string) ([]*unstructured.Unstructured, error) { nsInformer, err := ddsifForDownstream.ForResource(namespaceGVR) if err != nil { return nil, err } - return nsInformer.Informer().GetIndexer().ByIndex(indexers.ByNamespaceLocatorIndexName, jsonLocator) + return indexers.ByIndex[*unstructured.Unstructured](nsInformer.Informer().GetIndexer(), syncerindexers.ByNamespaceLocatorIndexName, jsonLocator) }, syncTargetName: syncTargetName, syncTargetClusterName: syncTargetClusterName, @@ -136,16 +139,18 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste 
 	logger := logging.WithReconciler(syncerLogger, controllerName)
 
+	namespaceGVR := corev1.SchemeGroupVersion.WithResource("namespaces")
+
 	ddsifForUpstreamSyncer.AddEventHandler(
 		ddsif.GVREventHandlerFuncs{
 			AddFunc: func(gvr schema.GroupVersionResource, obj interface{}) {
-				if shared.IsNamespace(gvr) {
+				if gvr == namespaceGVR {
 					return
 				}
 				c.AddToQueue(gvr, obj, logger)
 			},
 			UpdateFunc: func(gvr schema.GroupVersionResource, oldObj, newObj interface{}) {
-				if shared.IsNamespace(gvr) {
+				if gvr == namespaceGVR {
 					return
 				}
 				oldUnstrob := oldObj.(*unstructured.Unstructured)
@@ -156,7 +161,7 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste
 				}
 			},
 			DeleteFunc: func(gvr schema.GroupVersionResource, obj interface{}) {
-				if shared.IsNamespace(gvr) {
+				if gvr == namespaceGVR {
 					return
 				}
 				c.AddToQueue(gvr, obj, logger)
@@ -167,7 +172,7 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste
 	ddsifForDownstream.AddEventHandler(
 		ddsif.GVREventHandlerFuncs{
 			DeleteFunc: func(gvr schema.GroupVersionResource, obj interface{}) {
-				if shared.IsNamespace(gvr) {
+				if gvr == namespaceGVR {
 					return
 				}
 				key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
@@ -187,9 +192,9 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste
 		// Handle namespaced resources
 		if namespace != "" {
 			// Use namespace lister
-			namespaceLister, known, synced := ddsifForDownstream.Lister(namespaceGVR)
-			if !known || !synced {
-				utilruntime.HandleError(errors.New("informer should be up and synced for namespaces in the upstream syncer informer factory"))
+			namespaceLister, err := c.getDownstreamLister(namespaceGVR)
+			if err != nil {
+				utilruntime.HandleError(err)
 				return
 			}
 
@@ -245,8 +250,8 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste
 	dnsServiceLister := syncerNamespaceInformerFactory.Core().V1().Services().Lister()
 
 	deploymentMutator := specmutators.NewDeploymentMutator(upstreamURL, func(clusterName logicalcluster.Name, namespace string) ([]runtime.Object, error) {
-		secretLister, known, synced := ddsifForUpstreamSyncer.Lister(corev1.SchemeGroupVersion.WithResource("secrets"))
-		if !known || !synced {
+		secretLister, err := c.getUpstreamLister(corev1.SchemeGroupVersion.WithResource("secrets"))
+		if err != nil {
 			return nil, errors.New("informer should be up and synced for namespaces in the upstream syncer informer factory")
 		}
 		return secretLister.ByCluster(clusterName).ByNamespace(namespace).List(labels.Everything())
diff --git a/pkg/syncer/spec/spec_process.go b/pkg/syncer/spec/spec_process.go
index 7f680f3a059..a7b52b7a724 100644
--- a/pkg/syncer/spec/spec_process.go
+++ b/pkg/syncer/spec/spec_process.go
@@ -132,14 +132,14 @@ func (c *Controller) process(ctx context.Context, gvr schema.GroupVersionResourc
 	}
 
 	if len(downstreamNamespaces) == 1 {
-		namespace := downstreamNamespaces[0].(*unstructured.Unstructured)
+		namespace := downstreamNamespaces[0]
 		logger.WithValues(DownstreamName, namespace.GetName()).V(4).Info("Found downstream namespace for upstream namespace")
 		downstreamNamespace = namespace.GetName()
 	} else if len(downstreamNamespaces) > 1 {
 		// This should never happen unless there's some namespace collision.
 		var namespacesCollisions []string
 		for _, namespace := range downstreamNamespaces {
-			namespacesCollisions = append(namespacesCollisions, namespace.(*unstructured.Unstructured).GetName())
+			namespacesCollisions = append(namespacesCollisions, namespace.GetName())
 		}
 		return nil, fmt.Errorf("(namespace collision) found multiple downstream namespaces: %s for upstream namespace %s|%s", strings.Join(namespacesCollisions, ","), clusterName, upstreamNamespace)
 	} else {
@@ -264,12 +264,12 @@ func (c *Controller) ensureDownstreamNamespaceExists(ctx context.Context, downst
 		})
 	}
 
-	// Check if the namespace already exists, if not create it.
 	namespaceLister, err := c.getDownstreamLister(namespaceGVR)
 	if err != nil {
 		return err
 	}
 
+	// Check if the namespace already exists, if not create it.
 	namespace, err := namespaceLister.Get(newNamespace.GetName())
 	if apierrors.IsNotFound(err) {
 		_, err = namespaces.Create(ctx, newNamespace, metav1.CreateOptions{})
diff --git a/pkg/syncer/spec/spec_process_test.go b/pkg/syncer/spec/spec_process_test.go
index 5f8ed3ffad2..0390e3a8f35 100644
--- a/pkg/syncer/spec/spec_process_test.go
+++ b/pkg/syncer/spec/spec_process_test.go
@@ -1179,13 +1179,13 @@ func TestSpecSyncerProcess(t *testing.T) {
 			<-downstreamDDSIFUpdated
 			t.Logf("%s: downstream ddsif synced", t.Name())
 
-			_, unsynced := ddsifForUpstreamSyncer.Listers()
+			_, unsynced := ddsifForUpstreamSyncer.Informers()
 			for _, gvr := range unsynced {
 				informer, _ := ddsifForUpstreamSyncer.ForResource(gvr)
 				cache.WaitForCacheSync(ctx.Done(), informer.Informer().HasSynced)
 				t.Logf("%s: upstream ddsif informer synced for gvr %s", t.Name(), gvr.String())
 			}
-			_, unsynced = ddsifForDownstream.Listers()
+			_, unsynced = ddsifForDownstream.Informers()
 			for _, gvr := range unsynced {
 				informer, _ := ddsifForDownstream.ForResource(gvr)
 				cache.WaitForCacheSync(ctx.Done(), informer.Informer().HasSynced)
diff --git a/pkg/syncer/status/status_controller.go b/pkg/syncer/status/status_controller.go
index 1194a5baa15..bd84dad4f53 100644
--- a/pkg/syncer/status/status_controller.go
+++ b/pkg/syncer/status/status_controller.go
@@ -26,6 +26,7 @@ import (
 	kcpdynamic "github.com/kcp-dev/client-go/dynamic"
 	"github.com/kcp-dev/logicalcluster/v3"
 
+	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
@@ -76,24 +77,26 @@ func NewStatusSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalclus
 		downstreamClient: downstreamClient,
 		getDownstreamLister: func(gvr schema.GroupVersionResource) (cache.GenericLister, error) {
-			lister, known, synced := ddsifForDownstream.Lister(gvr)
-			if !known {
+			informers, notSynced := ddsifForDownstream.Informers()
+			informer, ok := informers[gvr]
+			if !ok {
+				if shared.ContainsGVR(notSynced, gvr) {
+					return nil, fmt.Errorf("informer for gvr %v not synced in the downstream informer factory - should retry", gvr)
+				}
 				return nil, fmt.Errorf("gvr %v should be known in the downstream informer factory", gvr)
 			}
-			if !synced {
-				return nil, fmt.Errorf("informer for gvr %v not synced in the downstream informer factory - should retry", gvr)
-			}
-			return lister, nil
+			return informer.Lister(), nil
 		},
 		getUpstreamLister: func(gvr schema.GroupVersionResource) (kcpcache.GenericClusterLister, error) {
-			lister, known, synced := ddsifForUpstreamSyncer.Lister(gvr)
-			if !known {
+			informers, notSynced := ddsifForUpstreamSyncer.Informers()
+			informer, ok := informers[gvr]
+			if !ok {
+				if shared.ContainsGVR(notSynced, gvr) {
+					return nil, fmt.Errorf("informer for gvr %v not synced in the downstream informer factory - should retry", gvr)
+				}
 				return nil, fmt.Errorf("gvr %v should be known in the downstream informer factory", gvr)
 			}
-			if !synced {
-				return nil, fmt.Errorf("informer for gvr %v not synced in the downstream informer factory - should retry", gvr)
-			}
-			return lister, nil
+			return informer.Lister(), nil
 		},
 
 		syncTargetName: syncTargetName,
@@ -105,16 +108,18 @@ func NewStatusSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalclus
 	logger := logging.WithReconciler(syncerLogger, controllerName)
 
+	namespaceGVR := corev1.SchemeGroupVersion.WithResource("namespaces")
+
 	ddsifForDownstream.AddEventHandler(
 		ddsif.GVREventHandlerFuncs{
 			AddFunc: func(gvr schema.GroupVersionResource, obj interface{}) {
-				if shared.IsNamespace(gvr) {
+				if gvr == namespaceGVR {
 					return
 				}
 				c.AddToQueue(gvr, obj, logger)
 			},
 			UpdateFunc: func(gvr schema.GroupVersionResource, oldObj, newObj interface{}) {
-				if shared.IsNamespace(gvr) {
+				if gvr == namespaceGVR {
 					return
 				}
 				oldUnstrob := oldObj.(*unstructured.Unstructured)
@@ -125,7 +130,7 @@ func NewStatusSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalclus
 				}
 			},
 			DeleteFunc: func(gvr schema.GroupVersionResource, obj interface{}) {
-				if shared.IsNamespace(gvr) {
+				if gvr == namespaceGVR {
 					return
 				}
 				c.AddToQueue(gvr, obj, logger)
diff --git a/pkg/syncer/status/status_process.go b/pkg/syncer/status/status_process.go
index a34fd57bce8..6166a2c5d2f 100644
--- a/pkg/syncer/status/status_process.go
+++ b/pkg/syncer/status/status_process.go
@@ -30,7 +30,6 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
-	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/klog/v2"
@@ -69,27 +68,15 @@ func (c *Controller) process(ctx context.Context, gvr schema.GroupVersionResourc
 	logger = logger.WithValues(DownstreamNamespace, downstreamNamespace, DownstreamName, downstreamName)
 
-	// get the downstream object
 	downstreamLister, err := c.getDownstreamLister(gvr)
 	if err != nil {
 		return err
 	}
 
-	var resourceExists bool
-	var downstreamObj runtime.Object
-
-	// to upstream
 	var namespaceLocator *shared.NamespaceLocator
 	var locatorExists bool
 
 	if downstreamNamespace != "" {
-		downstreamObj, err = downstreamLister.ByNamespace(downstreamNamespace).Get(downstreamName)
-		if err == nil {
-			resourceExists = true
-		} else if !apierrors.IsNotFound(err) {
-			return err
-		}
-
 		downstreamNamespaceLister, err := c.getDownstreamLister(namespaceGVR)
 		if err != nil {
 			return err
@@ -116,22 +103,27 @@ func (c *Controller) process(ctx context.Context, gvr schema.GroupVersionResourc
 			// that syncers for multiple logical clusters can coexist.
 			return nil
 		}
-	} else {
-		downstreamObj, err = downstreamLister.Get(downstreamName)
-		if err == nil {
-			resourceExists = true
-		} else if !apierrors.IsNotFound(err) {
-			return err
-		} else {
+	}
+
+	var resourceExists bool
+	obj, err := downstreamLister.ByNamespace(downstreamNamespace).Get(downstreamName)
+	if err == nil {
+		resourceExists = true
+	} else if !apierrors.IsNotFound(err) {
+		return err
+	}
+
+	if downstreamNamespace == "" {
+		if !resourceExists {
 			// TODO(davidfestal): The downstream object doesn't exist, but we cannot remove the finalizer
 			// on the upstream resource since we dont have the locator to locate the upstream resource.
 			// That should be fixed.
 			return nil
 		}
-		objMeta, ok := downstreamObj.(metav1.Object)
+		objMeta, ok := obj.(metav1.Object)
 		if !ok {
-			logger.Info(fmt.Sprintf("Error: downstream cluster-wide resource expected to be metav1.Object, got %T", downstreamObj))
+			logger.Info(fmt.Sprintf("Error: downstream cluster-wide resource expected to be metav1.Object, got %T", obj))
 			return nil
 		}
 		namespaceLocator, locatorExists, err = shared.LocatorFromAnnotations(objMeta.GetAnnotations())
@@ -164,16 +156,13 @@ func (c *Controller) process(ctx context.Context, gvr schema.GroupVersionResourc
 
 	if !resourceExists {
 		logger.Info("Downstream object does not exist. Removing finalizer on upstream object")
-		if err != nil {
-			return err
-		}
 		return shared.EnsureUpstreamFinalizerRemoved(ctx, gvr, upstreamLister, c.upstreamClient, upstreamNamespace, c.syncTargetKey, upstreamClusterName, shared.GetUpstreamResourceName(gvr, downstreamName))
 	}
 
 	// update upstream status
-	u, ok := downstreamObj.(*unstructured.Unstructured)
+	u, ok := obj.(*unstructured.Unstructured)
 	if !ok {
-		return fmt.Errorf("object to synchronize is expected to be Unstructured, but is %T", downstreamObj)
+		return fmt.Errorf("object to synchronize is expected to be Unstructured, but is %T", obj)
 	}
 	return c.updateStatusInUpstream(ctx, gvr, upstreamLister, upstreamNamespace, upstreamName, upstreamClusterName, u)
 }
diff --git a/pkg/syncer/status/status_process_test.go b/pkg/syncer/status/status_process_test.go
index b946620ca75..0803ea6d777 100644
--- a/pkg/syncer/status/status_process_test.go
+++ b/pkg/syncer/status/status_process_test.go
@@ -636,13 +636,13 @@ func TestStatusSyncerProcess(t *testing.T) {
 			<-downstreamDDSIFUpdated
 			t.Logf("%s: downstream ddsif synced", t.Name())
 
-			_, unsynced := ddsifForUpstreamSyncer.Listers()
+			_, unsynced := ddsifForUpstreamSyncer.Informers()
 			for _, gvr := range unsynced {
 				informer, _ := ddsifForUpstreamSyncer.ForResource(gvr)
 				cache.WaitForCacheSync(ctx.Done(), informer.Informer().HasSynced)
 				t.Logf("%s: upstream ddsif informer synced for gvr %s", t.Name(), gvr.String())
 			}
-			_, unsynced = ddsifForDownstream.Listers()
+			_, unsynced = ddsifForDownstream.Informers()
 			for _, gvr := range unsynced {
 				informer, _ := ddsifForDownstream.ForResource(gvr)
 				cache.WaitForCacheSync(ctx.Done(), informer.Informer().HasSynced)
diff --git a/pkg/syncer/syncer.go b/pkg/syncer/syncer.go
index d471b07b6f2..95ff8f616ef 100644
--- a/pkg/syncer/syncer.go
+++ b/pkg/syncer/syncer.go
@@ -20,7 +20,6 @@ import (
 	"context"
 	"fmt"
 	"net/url"
-	"strings"
 	"time"
 
 	kcpdiscovery "github.com/kcp-dev/client-go/discovery"
@@ -66,23 +65,6 @@ const (
 	heartbeatInterval = 20 * time.Second
 )
 
-type FilteredGVRSource struct {
-	ddsif.GVRSource
-	skipGVR func(gvr schema.GroupVersionResource) bool
-}
-
-func (s *FilteredGVRSource) GVRs() map[schema.GroupVersionResource]ddsif.GVRPartialMetadata {
-	gvrs := s.GVRSource.GVRs()
-	filteredGVRs := make(map[schema.GroupVersionResource]ddsif.GVRPartialMetadata, len(gvrs))
-	for gvr, metadata := range gvrs {
-		if s.skipGVR(gvr) {
-			continue
-		}
-		filteredGVRs[gvr] = metadata
-	}
-	return filteredGVRs
-}
-
 // SyncerConfig defines the syncer configuration that is guaranteed to
 // vary across syncer deployments. Capturing these details in a struct
 // simplifies defining these details in test fixture.
@@ -124,6 +106,7 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
 	// TODO(david): But the complete implementation should setup a SyncTarget informer, create spec and status syncer for every URLs found in the
 	// TODO(david): Status.SyncerVirtualWorkspaceURLs slice, and update them each time this list changes.
 	var syncerVirtualWorkspaceURL string
+	var upsyncerVirtualWorkspaceURL string
 	// TODO(david): we need to provide user-facing details if this polling goes on forever. Blocking here is a bad UX.
 	// TODO(david): Also, any regressions in our code will make any e2e test that starts a syncer (at least in-process)
 	// TODO(david): block until it hits the 10 minute overall test timeout.
@@ -150,6 +133,7 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
 			logger.Error(fmt.Errorf("SyncTarget should not have several Syncer virtual workspace URLs: not supported for now, ignoring additional URLs"), "error processing SyncTarget")
 		}
 		syncerVirtualWorkspaceURL = syncTarget.Status.VirtualWorkspaces[0].SyncerURL
+		upsyncerVirtualWorkspaceURL = syncTarget.Status.VirtualWorkspaces[0].UpsyncerURL
 		return true, nil
 	})
 	if err != nil {
@@ -158,17 +142,15 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
 
 	upstreamConfig := rest.CopyConfig(cfg.UpstreamConfig)
 	upstreamConfig.Host = syncerVirtualWorkspaceURL
-	rest.AddUserAgent(upstreamConfig, "kcp#spec-syncer/"+kcpVersion)
-
+	rest.AddUserAgent(upstreamConfig, "kcp#syncing/"+kcpVersion)
 	upstreamSyncerClusterClient, err := kcpdynamic.NewForConfig(upstreamConfig)
 	if err != nil {
 		return err
 	}
 
 	upstreamUpsyncConfig := rest.CopyConfig(cfg.UpstreamConfig)
-	// Upsyncing to Virtual Workspace for upsyncing
-	upstreamUpsyncConfig.Host = strings.Replace(syncerVirtualWorkspaceURL, "/services/syncer", "/services/upsyncer", 1)
-	upstreamUpsyncConfig.UserAgent = "kcp#upsyncer/" + kcpVersion
+	upstreamUpsyncConfig.Host = upsyncerVirtualWorkspaceURL
+	rest.AddUserAgent(upstreamUpsyncConfig, "kcp#upsyncing/"+kcpVersion)
 	upstreamUpsyncerClusterClient, err := kcpdynamic.NewForConfig(upstreamUpsyncConfig)
 	if err != nil {
 		return err
@@ -216,18 +198,18 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
 	// syncerNamespaceInformerFactory to watch some DNS-related resources in the dns namespace
 	syncerNamespaceInformerFactory := kubernetesinformers.NewSharedInformerFactoryWithOptions(downstreamKubeClient, resyncPeriod, kubernetesinformers.WithNamespace(syncerNamespace))
 
-	discoveryClient, err := kcpdiscovery.NewForConfig(upstreamConfig)
+	upstreamSyncerDiscoveryClient, err := kcpdiscovery.NewForConfig(upstreamConfig)
 	if err != nil {
 		return err
 	}
 
-	resourceController, err := resourcesync.NewController(
+	syncTargetGVRSource, err := resourcesync.NewSyncTargetGVRSource(
 		logger,
-		discoveryClient.DiscoveryInterface,
+		upstreamSyncerDiscoveryClient.DiscoveryInterface,
 		upstreamSyncerClusterClient,
 		downstreamDynamicClient,
 		downstreamKubeClient,
-		kcpSyncTargetClient,
+		kcpSyncTargetClient.WorkloadV1alpha1().SyncTargets(),
 		kcpSyncTargetInformerFactory.Workload().V1alpha1().SyncTargets(),
 		cfg.SyncTargetName,
 		logicalcluster.From(syncTarget),
@@ -237,19 +219,18 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
 		return err
 	}
 
-	ddsifForUpstreamSyncer, err := ddsif.NewDiscoveringDynamicSharedInformerFactory(upstreamSyncerClusterClient, nil, nil, resourceController, cache.Indexers{})
+	ddsifForUpstreamSyncer, err := ddsif.NewDiscoveringDynamicSharedInformerFactory(upstreamSyncerClusterClient, nil, nil, syncTargetGVRSource, cache.Indexers{})
 	if err != nil {
 		return err
 	}
 
 	ddsifForUpstreamUpsyncer, err := ddsif.NewDiscoveringDynamicSharedInformerFactory(upstreamUpsyncerClusterClient, nil, nil,
-		&FilteredGVRSource{
-			resourceController,
-			func(gvr schema.GroupVersionResource) bool {
-				if gvr.Resource == "persistentvolumes" && gvr.Group == "" {
-					return false
-				}
-				return true
+		&filteredGVRSource{
+			GVRSource: syncTargetGVRSource,
+			keepGVR: func(gvr schema.GroupVersionResource) bool {
+				return gvr.Group == corev1.GroupName && (gvr.Resource == "persistentvolumes" ||
+					gvr.Resource == "pods" ||
+					gvr.Resource == "endpoints")
 			},
 		},
 		cache.Indexers{})
@@ -261,7 +242,7 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
 		func(o *metav1.ListOptions) {
 			o.LabelSelector = workloadv1alpha1.InternalDownstreamClusterLabel + "=" + syncTargetKey
 		},
-		resourceController,
+		syncTargetGVRSource,
 		cache.Indexers{
 			indexers.ByNamespaceLocatorIndexName: indexers.IndexByNamespaceLocator,
 		},
@@ -302,6 +283,7 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
 		return err
 	}
 
+	// Start and sync informer factories
 	var cacheSyncsForAlwaysRequiredGVRs []cache.InformerSynced
 	for _, alwaysRequired := range []string{"secrets", "namespaces"} {
 		gvr := corev1.SchemeGroupVersion.WithResource(alwaysRequired)
@@ -331,22 +313,29 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
 	go ddsifForUpstreamUpsyncer.StartWorker(ctx)
 	go ddsifForDownstream.StartWorker(ctx)
 
+	// Start static controllers
 	go apiImporter.Start(klog.NewContext(ctx, logger.WithValues("resources", resources)), importPollInterval)
-	go resourceController.Start(ctx, 1)
+	go syncTargetGVRSource.Start(ctx, 1)
 	go specSyncer.Start(ctx, numSyncerThreads)
 	go statusSyncer.Start(ctx, numSyncerThreads)
 	go downstreamNamespaceController.Start(ctx, numSyncerThreads)
 
+	// Create and start GVR-specific controllers through controller managers
 	upstreamSyncerControllerManager := controllermanager.NewControllerManager(ctx,
 		"upstream-syncer",
 		controllermanager.InformerSource{
 			Subscribe: ddsifForUpstreamSyncer.Subscribe,
-			Informer: func(gvr schema.GroupVersionResource) (cache.SharedIndexInformer, bool, bool) {
-				return ddsifForUpstreamSyncer.Informer(gvr)
+			Informers: func() (informers map[schema.GroupVersionResource]cache.SharedIndexInformer, notSynced []schema.GroupVersionResource) {
+				genericInformers, notSynced := ddsifForUpstreamSyncer.Informers()
+				informers = make(map[schema.GroupVersionResource]cache.SharedIndexInformer, len(genericInformers))
+				for gvr, inf := range genericInformers {
+					informers[gvr] = inf.Informer()
+				}
+				return informers, notSynced
 			},
 		},
-		map[string]controllermanager.ControllerDefintion{},
+		map[string]controllermanager.ManagedController{},
 	)
 
 	go upstreamSyncerControllerManager.Start(ctx)
@@ -354,35 +343,53 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
 		"upstream-upsyncer",
 		controllermanager.InformerSource{
 			Subscribe: ddsifForUpstreamUpsyncer.Subscribe,
-			Informer: func(gvr schema.GroupVersionResource) (cache.SharedIndexInformer, bool, bool) {
-				return ddsifForUpstreamUpsyncer.Informer(gvr)
+			Informers: func() (informers map[schema.GroupVersionResource]cache.SharedIndexInformer, notSynced []schema.GroupVersionResource) {
+				genericInformers, notSynced := ddsifForUpstreamUpsyncer.Informers()
+				informers = make(map[schema.GroupVersionResource]cache.SharedIndexInformer, len(genericInformers))
+				for gvr, inf := range genericInformers {
+					informers[gvr] = inf.Informer()
+				}
+				return informers, notSynced
 			},
 		},
-		map[string]controllermanager.ControllerDefintion{},
+		map[string]controllermanager.ManagedController{},
 	)
 	go upstreamUpsyncerControllerManager.Start(ctx)
 
 	downstreamSyncerControllerManager := controllermanager.NewControllerManager(ctx,
-		"downstream",
+		"downstream-syncer",
 		controllermanager.InformerSource{
 			Subscribe: ddsifForDownstream.Subscribe,
-			Informer: ddsifForDownstream.Informer,
+			Informers: func() (informers map[schema.GroupVersionResource]cache.SharedIndexInformer, notSynced []schema.GroupVersionResource) {
+				genericInformers, notSynced := ddsifForDownstream.Informers()
+				informers = make(map[schema.GroupVersionResource]cache.SharedIndexInformer, len(genericInformers))
+				for gvr, inf := range genericInformers {
+					informers[gvr] = inf.Informer()
+				}
+				return informers, notSynced
+			},
 		},
-		map[string]controllermanager.ControllerDefintion{
+		map[string]controllermanager.ManagedController{
 			endpoints.ControllerName: {
 				RequiredGVRs: []schema.GroupVersionResource{
 					corev1.SchemeGroupVersion.WithResource("services"),
 					corev1.SchemeGroupVersion.WithResource("endpoints"),
 				},
-				NumThreads: 2,
-				Create: func(syncedInformers map[schema.GroupVersionResource]cache.SharedIndexInformer) (controllermanager.Controller, error) {
-					return endpoints.NewEndpointController(logger, downstreamDynamicClient, ddsifForDownstream, syncedInformers)
+				Create: func(ctx context.Context) (controllermanager.StartControllerFunc, error) {
+					endpointController, err := endpoints.NewEndpointController(downstreamDynamicClient, ddsifForDownstream)
+					if err != nil {
+						return nil, err
+					}
+					return func(ctx context.Context) {
+						endpointController.Start(ctx, 2)
+					}, nil
 				},
 			},
 		},
 	)
 	go downstreamSyncerControllerManager.Start(ctx)
 
+	// Start tunneler for POD access
 	if kcpfeatures.DefaultFeatureGate.Enabled(kcpfeatures.SyncerTunnel) {
 		go startSyncerTunnel(ctx, upstreamConfig, downstreamConfig, logicalcluster.From(syncTarget), cfg.SyncTargetName)
 	}
@@ -410,3 +417,20 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
 
 	return nil
 }
+
+type filteredGVRSource struct {
+	ddsif.GVRSource
+	keepGVR func(gvr schema.GroupVersionResource) bool
+}
+
+func (s *filteredGVRSource) GVRs() map[schema.GroupVersionResource]ddsif.GVRPartialMetadata {
+	gvrs := s.GVRSource.GVRs()
+	filteredGVRs := make(map[schema.GroupVersionResource]ddsif.GVRPartialMetadata, len(gvrs))
+	for gvr, metadata := range gvrs {
+		if !s.keepGVR(gvr) {
+			continue
+		}
+		filteredGVRs[gvr] = metadata
+	}
+	return filteredGVRs
+}
diff --git a/test/e2e/conformance/cross_logical_cluster_list_test.go b/test/e2e/conformance/cross_logical_cluster_list_test.go
index 8321386b356..3ea5fbf4565 100644
--- a/test/e2e/conformance/cross_logical_cluster_list_test.go
+++ b/test/e2e/conformance/cross_logical_cluster_list_test.go
@@ -253,15 +253,15 @@ func TestCRDCrossLogicalClusterListPartialObjectMetadata(t *testing.T) {
 	t.Logf("Wait for the sheriff to show up in the informer")
 	// key := "default/" + client.ToClusterAwareKey(wsNormalCRD1a, "john-hicks-adams")
 	require.Eventually(t, func() bool {
-		listers, _ := informerFactory.Listers()
+		informers, _ := informerFactory.Informers()
 
-		lister := listers[sheriffsGVR]
-		if lister == nil {
+		informer := informers[sheriffsGVR]
+		if informer == nil {
 			t.Logf("Waiting for sheriffs to show up in dynamic informer")
 			return false
 		}
-		l, err := lister.List(labels.Everything())
+		l, err := informer.Lister().List(labels.Everything())
 		if err != nil {
 			t.Logf("Error listing sheriffs: %v", err)
 			return false