Skip to content

Commit

Permalink
Fix PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: David Festal <dfestal@redhat.com>
  • Loading branch information
davidfestal committed Jan 10, 2023
1 parent 7f76f5e commit a22427d
Show file tree
Hide file tree
Showing 18 changed files with 475 additions and 444 deletions.
90 changes: 45 additions & 45 deletions hack/logcheck.out

Large diffs are not rendered by default.

38 changes: 5 additions & 33 deletions pkg/informer/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,13 +353,13 @@ func (d *GenericDiscoveringDynamicSharedInformerFactory[Informer, Lister, Generi
return inf
}

// Listers returns a map of per-resource-type listers for all types that are
// Informers returns a map of per-resource-type generic informers for all types that are
// known by this informer factory, and that are synced.
//
// If any informers aren't synced, their GVRs are returned so that they can be
// checked and processed later.
func (d *GenericDiscoveringDynamicSharedInformerFactory[Informer, Lister, GenericInformer]) Listers() (listers map[schema.GroupVersionResource]Lister, notSynced []schema.GroupVersionResource) {
listers = map[schema.GroupVersionResource]Lister{}
func (d *GenericDiscoveringDynamicSharedInformerFactory[Informer, Lister, GenericInformer]) Informers() (informers map[schema.GroupVersionResource]GenericInformer, notSynced []schema.GroupVersionResource) {
informers = map[schema.GroupVersionResource]GenericInformer{}

d.informersLock.RLock()
defer d.informersLock.RUnlock()
Expand All @@ -372,38 +372,10 @@ func (d *GenericDiscoveringDynamicSharedInformerFactory[Informer, Lister, Generi
continue
}

listers[gvr] = informer.Lister()
informers[gvr] = informer
}

return listers, notSynced
}

// Lister returns a lister for the given resource-type, whether it is
// known by this informer factory, and whether it is synced.
func (d *GenericDiscoveringDynamicSharedInformerFactory[Informer, Lister, GenericInformer]) Lister(gvr schema.GroupVersionResource) (lister Lister, known, synced bool) {
d.informersLock.RLock()
defer d.informersLock.RUnlock()

informer, ok := d.informers[gvr]
if !ok {
return lister, false, false
}

return informer.Lister(), true, informer.Informer().HasSynced()
}

// Informer returns an informer for the given resource-type if it exists, whether it is
// known by this informer factory, and whether it is synced.
func (d *GenericDiscoveringDynamicSharedInformerFactory[Informer, Lister, GenericInformer]) Informer(gvr schema.GroupVersionResource) (informer Informer, known, synced bool) {
d.informersLock.RLock()
defer d.informersLock.RUnlock()

genericInformer, ok := d.informers[gvr]
if !ok {
return informer, false, false
}

return genericInformer.Informer(), true, genericInformer.Informer().HasSynced()
return informers, notSynced
}

// GVREventHandler is an event handler that includes the GroupVersionResource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,9 @@ func claimFromSetKey(key string) apisv1alpha1.PermissionClaim {
}

func (c *controller) getInformerForGroupResource(group, resource string) (kcpkubernetesinformers.GenericClusterInformer, schema.GroupVersionResource, error) {
listers, _ := c.ddsif.Listers()
informers, _ := c.ddsif.Informers()

for gvr := range listers {
for gvr := range informers {
if gvr.Group == group && gvr.Resource == resource {
informer, err := c.ddsif.ForResource(gvr)
// once we find one, return.
Expand Down
10 changes: 5 additions & 5 deletions pkg/reconciler/workload/resource/resource_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,11 +373,11 @@ func (c *Controller) enqueueResourcesForNamespace(ns *corev1.Namespace) error {
logger = logger.WithValues("nsLocations", nsLocations.List())

logger.V(4).Info("getting listers")
listers, notSynced := c.ddsif.Listers()
informers, notSynced := c.ddsif.Informers()
var errs []error
for gvr, lister := range listers {
for gvr, informer := range informers {
logger = logger.WithValues("gvr", gvr.String())
objs, err := lister.ByCluster(clusterName).ByNamespace(ns.Name).List(labels.Everything())
objs, err := informer.Lister().ByCluster(clusterName).ByNamespace(ns.Name).List(labels.Everything())
if err != nil {
errs = append(errs, fmt.Errorf("error listing %q in %s|%s: %w", gvr, clusterName, ns.Name, err))
continue
Expand Down Expand Up @@ -438,9 +438,9 @@ func (c *Controller) enqueueSyncTarget(obj interface{}) {
func (c *Controller) enqueueSyncTargetKey(syncTargetKey string) {
logger := logging.WithReconciler(klog.Background(), ControllerName).WithValues("syncTargetKey", syncTargetKey)

listers, _ := c.ddsif.Listers()
informers, _ := c.ddsif.Informers()
queued := map[string]int{}
for gvr := range listers {
for gvr := range informers {
inf, err := c.ddsif.ForResource(gvr)
if err != nil {
runtime.HandleError(err)
Expand Down
142 changes: 76 additions & 66 deletions pkg/syncer/controllermanager/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,34 +28,50 @@ import (
"k8s.io/klog/v2"

"github.com/kcp-dev/kcp/pkg/logging"
"github.com/kcp-dev/kcp/pkg/syncer/shared"
)

const (
ControllerNamePrefix = "syncer-controller-manager-"
)

// InformerSource is a dynamic source of informers per GVR,
// which notifies when informers are added or removed for some GVR.
// It is implemented by the DynamicSharedInformerFactory (in fact by
// both the scoped or cluster-aware variants).
type InformerSource struct {
// Subscribe registers for informer change notifications, returning a channel to which change notifications are sent.
// The id argument is the identifier of the subscriber, since there might be several subscribers subscribing
// to receive events from this InformerSource.
Subscribe func(id string) <-chan struct{}
Informer func(gvr schema.GroupVersionResource) (informer cache.SharedIndexInformer, known, synced bool)
}

type Controller interface {
Start(ctx context.Context, numThreads int)
// Informers returns a map of per-resource-type SharedIndexInformers for all types that are
// known by this informer source, and that are synced.
//
// It also returns the list of informers that are known by this informer source, but sill not synced.
Informers func() (informers map[schema.GroupVersionResource]cache.SharedIndexInformer, notSynced []schema.GroupVersionResource)
}

type ControllerDefintion struct {
// ManagedController defines a controller that should be managed by a ControllerManager,
// to be started when the required GVRs are supported, and stopped when the required GVRs
// are not supported anymore.
type ManagedController struct {
RequiredGVRs []schema.GroupVersionResource
NumThreads int
Create func(syncedInformers map[schema.GroupVersionResource]cache.SharedIndexInformer) (Controller, error)
Create CreateControllerFunc
}

func NewControllerManager(ctx context.Context, suffix string, informerSource InformerSource, controllers map[string]ControllerDefintion) *ControllerManager {
type StartControllerFunc func(ctx context.Context)
type CreateControllerFunc func(ctx context.Context) (StartControllerFunc, error)

// NewControllerManager creates a new ControllerManager which will manage (create/start/stop) GVR-specific controllers according to informers
// available in the provided InformerSource.
func NewControllerManager(ctx context.Context, suffix string, informerSource InformerSource, controllers map[string]ManagedController) *ControllerManager {
controllerManager := ControllerManager{
name: ControllerNamePrefix + suffix,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerNamePrefix+suffix),
informerSource: informerSource,
controllerDefinitions: controllers,
startedControllers: map[string]context.CancelFunc{},
name: ControllerNamePrefix + suffix,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerNamePrefix+suffix),
informerSource: informerSource,
managedControllers: controllers,
startedControllers: map[string]context.CancelFunc{},
}

apisChanged := informerSource.Subscribe(controllerManager.name)
Expand All @@ -77,12 +93,19 @@ func NewControllerManager(ctx context.Context, suffix string, informerSource Inf
return &controllerManager
}

// ControllerManager is a component that manages (create/start/stop) GVR-specific controllers according to available GVRs.
// It reacts to the changes of supported GVRs in a DiscoveringDynamicSharedInformerFactory
// (the GVRs for which an informer has been automatically created, started and synced),
// and starts / stops registered GVRs-specific controllers according to the GVRs they depend on.
//
// For example this allows starting PVC / PV controllers only when PVC / PV resources are exposed by the Syncer and UpSyncer
// virtual workspaces, and Informers for them have been started and synced by the corresponding ddsif.
type ControllerManager struct {
name string
queue workqueue.RateLimitingInterface
informerSource InformerSource
controllerDefinitions map[string]ControllerDefintion
startedControllers map[string]context.CancelFunc
name string
queue workqueue.RateLimitingInterface
informerSource InformerSource
managedControllers map[string]ManagedController
startedControllers map[string]context.CancelFunc
}

// Start starts the controller, which stops when ctx.Done() is closed.
Expand Down Expand Up @@ -110,79 +133,66 @@ func (c *ControllerManager) processNextWorkItem(ctx context.Context) bool {
}
defer c.queue.Done(key)

c.UpdateControllers(ctx)
c.process(ctx)
c.queue.Forget(key)
return true
}

func (c *ControllerManager) UpdateControllers(ctx context.Context) {
func (c *ControllerManager) process(ctx context.Context) {
logger := klog.FromContext(ctx)
controllersToStart := map[string]map[schema.GroupVersionResource]cache.SharedIndexInformer{}
controllersToStart := map[string]CreateControllerFunc{}
syncedInformers, notSynced := c.informerSource.Informers()
controllerLoop:
for controllerName, controllerDefinition := range c.controllerDefinitions {
requiredGVRs := controllerDefinition.RequiredGVRs
informers := make(map[schema.GroupVersionResource]cache.SharedIndexInformer, len(requiredGVRs))
for controllerName, managedController := range c.managedControllers {
requiredGVRs := managedController.RequiredGVRs
for _, gvr := range requiredGVRs {
if informer, known, synced := c.informerSource.Informer(gvr); !known {
continue controllerLoop
} else if !synced {
logger.V(2).Info("waiting for the informer to be synced before starting controller", "gvr", gvr, "controller", controllerName)
c.queue.AddAfter("resync", time.Second)
informer := syncedInformers[gvr]
if informer == nil {
if shared.ContainsGVR(notSynced, gvr) {
logger.V(2).Info("waiting for the informer to be synced before starting controller", "gvr", gvr, "controller", controllerName)
c.queue.AddAfter("resync", time.Second)
continue controllerLoop
}
// The informer doesn't even exist for this GVR.
// Let's ignore this controller for now: one of the required GVRs has no informer started
// (because it has not been found on the SyncTarget in the supported resources to sync).
// If this required GVR is supported later on, the updateControllers() method will be called
// again after an API change notification comes through the informerSource.
continue controllerLoop
} else {
informers[gvr] = informer
}
}
controllersToStart[controllerName] = informers
controllersToStart[controllerName] = managedController.Create
}

// Remove obsolete controllers that don't have their required GVRs anymore
for controllerName, cancelFunc := range c.startedControllers {
if _, ok := controllersToStart[controllerName]; ok {
// The controller is still expected => don't remove it
continue
}
// The controller should not be running
// Stop it and remove it from the list of started controllers
cancelFunc()
delete(c.startedControllers, controllerName)
}

// Create and start missing controllers that have their required GVRs synced
newlyStartedControllers := map[string]context.CancelFunc{}
for controllerName, informers := range controllersToStart {
for controllerName, create := range controllersToStart {
if _, ok := c.startedControllers[controllerName]; ok {
// The controller is already started
continue
}
controllerDefinition, ok := c.controllerDefinitions[controllerName]
if !ok {
logger.V(2).Info("cannot find controller definition", "controller", controllerName)
continue
}

// Create the controller
controller, err := controllerDefinition.Create(informers)
start, err := create(ctx)
if err != nil {
logger.Error(err, "error creating controller", "controller", controllerName)
logger.Error(err, "failed creating controller", "controller", controllerName)
continue
}

for _, informer := range informers {
if err := informer.GetStore().Resync(); err != nil {
logger.Error(err, "error resyncing informer controller", "controller", controllerName)
continue
}
}

// Start the controller
controllerContext, cancelFunc := context.WithCancel(ctx)
go controller.Start(controllerContext, controllerDefinition.NumThreads)
newlyStartedControllers[c.name] = cancelFunc
}

// Remove obsolete controllers that don't have their required GVRs anymore
for controllerName, cancelFunc := range c.startedControllers {
if _, ok := controllersToStart[controllerName]; ok {
// The controller is still expected => don't remove it
continue
}
// The controller should not be running
// Stop it and remove it from the list of started controllers
cancelFunc()
delete(c.startedControllers, controllerName)
}

// Add missing controllers that were created and started above
for controllerName, cancelFunc := range newlyStartedControllers {
go start(controllerContext)
c.startedControllers[controllerName] = cancelFunc
}
}
Loading

0 comments on commit a22427d

Please sign in to comment.