Skip to content

Commit

Permalink
fix for flux plugin: clean up old charts from chart cache after repo …
Browse files Browse the repository at this point in the history
…update #4115  (#5644)

fix for long-standing issue
#4115

plus DRYing up a bunch of existing test code

Not 100% sure who to ask for review. Its not urgent. If no-one
materializes in the next few days, I will ask

The only thing of interest here is really a new func call 
```
chartCache.PurgeObsoleteChartVersions(charts)
```
executed before an old call to 
```
SyncCharts()
```

There is a new integration test I added for this scenario.
Most everything else is test code cleanup and doesn't need any review
  • Loading branch information
gfichtenholt authored Nov 21, 2022
1 parent a36b8b5 commit e5fe86f
Show file tree
Hide file tree
Showing 13 changed files with 650 additions and 540 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,11 @@ func (c *ChartCache) SyncCharts(charts []models.Chart, downloadFn DownloadChartF
log.Warningf("Skipping chart [%s] due to empty version array", chart.ID)
continue
} else if len(chart.ChartVersions[0].URLs) == 0 {
log.Warningf("Chart: [%s], version: [%s] has no URLs", chart.ID, chart.ChartVersions[0].Version)
log.Warningf("Skipping chart [%s], version: [%s] has no URLs", chart.ID, chart.ChartVersions[0].Version)
continue
} else if chart.Repo == nil {
// shouldn't happen
log.Warningf("Skipping chart [%s] as it is not associated with any repo", chart.ID)
continue
}

Expand Down Expand Up @@ -259,10 +263,9 @@ func (c *ChartCache) processNextWorkItem(workerName string) bool {
return true
}

func (c *ChartCache) DeleteChartsForRepo(repo *types.NamespacedName) error {
log.Infof("+DeleteChartsForRepo(%s)", repo)
defer log.Infof("-DeleteChartsForRepo(%s)", repo)

// will clear out the cache of charts for a given repo except the charts specified by
// keepThese argument, which may be nil.
func (c *ChartCache) deleteChartsHelper(repo *types.NamespacedName, keepThese sets.String) error {
// need to get a list of all charts/versions for this repo that are either:
// a. already in the cache OR
// b. being processed
Expand All @@ -287,6 +290,7 @@ func (c *ChartCache) DeleteChartsForRepo(repo *types.NamespacedName) error {
if err != nil {
return err
}
log.Infof("Redis [SCAN %d %s]: %d keys", cursor, match, len(keys))
for _, k := range keys {
redisKeysToDelete.Insert(k)
}
Expand All @@ -308,7 +312,7 @@ func (c *ChartCache) DeleteChartsForRepo(repo *types.NamespacedName) error {
}
}

for k := range redisKeysToDelete {
for k := range redisKeysToDelete.Difference(keepThese) {
if namespace, chartID, chartVersion, err := c.fromKey(k); err != nil {
log.Errorf("%+v", err)
} else {
Expand All @@ -329,6 +333,52 @@ func (c *ChartCache) DeleteChartsForRepo(repo *types.NamespacedName) error {
return nil
}

func (c *ChartCache) DeleteChartsForRepo(repo *types.NamespacedName) error {
log.Infof("+DeleteChartsForRepo(%s)", repo)
defer log.Infof("-DeleteChartsForRepo(%s)", repo)

return c.deleteChartsHelper(repo, sets.String{})
}

// this function is called when re-importing charts after an update to the repo,
// so keepThese is actually populated from the new data, meaning that if the new
// data no longer includes a certain version, it'll get purged here
func (c *ChartCache) PurgeObsoleteChartVersions(keepThese []models.Chart) error {
log.Infof("+PurgeObsoleteChartVersions()")
defer log.Infof("-PurgeObsoleteChartVersions")

repos := map[types.NamespacedName]sets.String{}
for _, ch := range keepThese {
if ch.Repo == nil {
// shouldn't happen
log.Warningf("Skipping chart [%s] as it is not associated with any repo", ch.ID)
continue
}
n := types.NamespacedName{
Name: ch.Repo.Name,
Namespace: ch.Repo.Namespace,
}
a, ok := repos[n]
if a == nil || !ok {
a = sets.String{}
}
for _, cv := range ch.ChartVersions {
if key, err := c.KeyFor(ch.Repo.Namespace, ch.ID, cv.Version); err != nil {
return err
} else {
repos[n] = a.Insert(key)
}
}
}

for repo, keep := range repos {
if err := c.deleteChartsHelper(&repo, keep); err != nil {
return err
}
}
return nil
}

func (c *ChartCache) OnResync() error {
log.Infof("+OnResync(), queue: [%s], size: [%d]", c.queue.Name(), c.queue.Len())
c.resyncCond.L.Lock()
Expand Down Expand Up @@ -620,7 +670,7 @@ func ChartCacheComputeValue(chartID, chartUrl, chartVersion string, downloadFn D
return nil, err
}

log.Infof("Successfully fetched details for chart: [%s], version: [%s], url: [%s], details: [%d] bytes",
log.V(4).Infof("Successfully fetched details for chart: [%s], version: [%s], url: [%s], details: [%d] bytes",
chartID, chartVersion, chartUrl, len(chartTgz))

cacheEntryValue := chartCacheEntryValue{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (s *Server) availableChartDetail(ctx context.Context, packageRef *corev1.Av
}

if byteArray == nil {
return nil, status.Errorf(codes.Internal, "failed to load details for chart [%s]", chartModel.ID)
return nil, status.Errorf(codes.Internal, "failed to load details for chart [%s], version [%s]", chartModel.ID, chartVersion)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,8 @@ import (
"time"

sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
corev1 "github.com/vmware-tanzu/kubeapps/cmd/kubeapps-apis/gen/core/packages/v1alpha1"
plugins "github.com/vmware-tanzu/kubeapps/cmd/kubeapps-apis/gen/core/plugins/v1alpha1"
fluxplugin "github.com/vmware-tanzu/kubeapps/cmd/kubeapps-apis/gen/plugins/fluxv2/packages/v1alpha1"
"github.com/vmware-tanzu/kubeapps/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/common"
"golang.org/x/sync/semaphore"
Expand Down Expand Up @@ -854,22 +851,10 @@ func testKindClusterAvailablePackageEndpointsForOCIHelper(
t.Fatal(err)
}

opt1 := cmpopts.IgnoreUnexported(
corev1.GetAvailablePackageSummariesResponse{},
corev1.AvailablePackageSummary{},
corev1.AvailablePackageReference{},
corev1.Context{},
plugins.Plugin{},
corev1.PackageAppVersion{})
opt2 := cmpopts.SortSlices(lessAvailablePackageFunc)
if !tc.unauthenticated {
if got, want := resp, expected_oci_stefanprodan_podinfo_available_summaries(repoName.Name); !cmp.Equal(got, want, opt1, opt2) {
t.Errorf("mismatch (-want +got):\n%s", cmp.Diff(want, got, opt1, opt2))
}
compareAvailablePackageSummaries(t, resp, expected_oci_stefanprodan_podinfo_available_summaries(repoName.Name))
} else {
if got, want := resp, no_available_summaries(repoName.Name); !cmp.Equal(got, want, opt1, opt2) {
t.Errorf("mismatch (-want +got):\n%s", cmp.Diff(want, got, opt1, opt2))
}
compareAvailablePackageSummaries(t, resp, no_available_summaries(repoName.Name))
return // nothing more to check
}

Expand All @@ -889,12 +874,7 @@ func testKindClusterAvailablePackageEndpointsForOCIHelper(
if err != nil {
t.Fatal(err)
}
opts := cmpopts.IgnoreUnexported(
corev1.GetAvailablePackageVersionsResponse{},
corev1.PackageAppVersion{})
if got, want := resp2, expected_versions_stefanprodan_podinfo; !cmp.Equal(want, got, opts) {
t.Errorf("mismatch (-want +got):\n%s", cmp.Diff(want, got, opts))
}
compareAvailablePackageVersions(t, resp2, expected_versions_stefanprodan_podinfo)

hour, minute, second = time.Now().Clock()
t.Logf("[%d:%d:%d] Calling GetAvailablePackageDetail(latest version) blocking for up to [%s]...",
Expand All @@ -914,7 +894,7 @@ func testKindClusterAvailablePackageEndpointsForOCIHelper(
t.Fatal(err)
}

compareActualVsExpectedAvailablePackageDetail(
compareAvailablePackageDetail(
t,
resp3.AvailablePackageDetail,
expected_detail_oci_stefanprodan_podinfo(repoName.Name, tc.registryUrl).AvailablePackageDetail)
Expand All @@ -936,7 +916,7 @@ func testKindClusterAvailablePackageEndpointsForOCIHelper(
t.Fatal(err)
}

compareActualVsExpectedAvailablePackageDetail(
compareAvailablePackageDetail(
t,
resp4.AvailablePackageDetail,
expected_detail_oci_stefanprodan_podinfo_2(repoName.Name, tc.registryUrl).AvailablePackageDetail)
Expand Down Expand Up @@ -1026,17 +1006,134 @@ func TestKindClusterAvailablePackageEndpointsOCIRepo2Charts(t *testing.T) {
t.Fatal(err)
}

opt1 := cmpopts.IgnoreUnexported(
corev1.GetAvailablePackageSummariesResponse{},
corev1.AvailablePackageSummary{},
corev1.AvailablePackageReference{},
corev1.Context{},
plugins.Plugin{},
corev1.PackageAppVersion{})
opt2 := cmpopts.SortSlices(lessAvailablePackageFunc)
if got, want := resp, expected_oci_repo_with_2_charts_available_summaries(repoName.Name); !cmp.Equal(got, want, opt1, opt2) {
t.Errorf("mismatch (-want +got):\n%s", cmp.Diff(want, got, opt1, opt2))
compareAvailablePackageSummaries(t, resp, expected_oci_repo_with_2_charts_available_summaries(repoName.Name))
})
}
}

// The goal of this integration test is to ensure that when the contents of remote HTTP helm repo is changed,
// that fact is recorded locally and processed properly (repo/chart cache is updated with latest, etc.)
func TestKindClusterAddRemovePackageVersionsInHttpRepo(t *testing.T) {
fluxPluginPackagesClient, _, err := checkEnv(t)
if err != nil {
t.Fatal(err)
}

adminAcctName := types.NamespacedName{
Name: "test-add-remove-versions-repo-admin-" + randSeq(4),
Namespace: "default",
}
grpcContext, err := newGrpcAdminContext(t, adminAcctName)
if err != nil {
t.Fatal(err)
}

repoName := types.NamespacedName{
Name: "podinfo",
Namespace: "test-" + randSeq(4),
}
if err := kubeCreateNamespaceAndCleanup(t, repoName.Namespace); err != nil {
t.Fatal(err)
}

if err = kubeAddHelmRepositoryAndCleanup(t, repoName, "", podinfo_repo_url, "", 10*time.Second); err != nil {
t.Fatal(err)
}

pkgRef := availableRef(fmt.Sprintf("%s/%s", repoName.Name, "podinfo"), repoName.Namespace)

// need to wait until repo is indexed by flux plugin
const maxWait = 25
var pkgDetail *corev1.GetAvailablePackageDetailResponse
for i := 0; i <= maxWait; i++ {
grpcContext, cancel := context.WithTimeout(grpcContext, defaultContextTimeout)
defer cancel()

pkgDetail, err = fluxPluginPackagesClient.GetAvailablePackageDetail(
grpcContext,
&corev1.GetAvailablePackageDetailRequest{AvailablePackageRef: pkgRef})
if err == nil {
break
} else if i == maxWait {
if repo, err2 := kubeGetHelmRepository(t, repoName); err2 == nil && repo != nil {
t.Fatalf("Timed out waiting for available package [%s], last response: %v, last error: [%v],\nhelm repository:%s",
pkgRef, pkgDetail, err, common.PrettyPrint(repo))
} else {
t.Fatalf("Timed out waiting for available package [%s], last response: %v, last error: [%v]",
pkgRef, pkgDetail, err)
}
} else {
t.Logf("Waiting 1s for repository [%s] to be indexed, attempt [%d/%d]...", repoName, i+1, maxWait)
time.Sleep(1 * time.Second)
}
}
compareAvailablePackageDetail(
t,
pkgDetail.AvailablePackageDetail,
expected_detail_podinfo(repoName.Name, repoName.Namespace).AvailablePackageDetail)

podName, err := getFluxPluginTestdataPodName()
if err != nil {
t.Fatal(err)
}
t.Logf("podName = [%s]", podName)

if err = kubeCopyFileToPod(
t,
testTgz("podinfo-6.0.3.tgz"),
*podName,
"/usr/share/nginx/html/podinfo/podinfo-6.0.3.tgz"); err != nil {
t.Fatal(err)
}
if err = kubeCopyFileToPod(
t,
testYaml("podinfo-index-updated.yaml"),
*podName,
"/usr/share/nginx/html/podinfo/index.yaml"); err != nil {
t.Fatal(err)
}

SleepWithCountdown(t, 20)

pkgDetail, err = fluxPluginPackagesClient.GetAvailablePackageDetail(
grpcContext,
&corev1.GetAvailablePackageDetailRequest{AvailablePackageRef: pkgRef})
if err != nil {
t.Fatal(err)
}
compareAvailablePackageDetail(
t,
pkgDetail.AvailablePackageDetail,
expected_detail_podinfo_after_update_1(repoName.Name, repoName.Namespace).AvailablePackageDetail)

if err = kubeCopyFileToPod(
t,
testYaml("podinfo-index.yaml"),
*podName,
"/usr/share/nginx/html/podinfo/index.yaml"); err != nil {
t.Logf("Error reverting to previous podinfo index: %v", err)
}

SleepWithCountdown(t, 20)

pkgDetail, err = fluxPluginPackagesClient.GetAvailablePackageDetail(
grpcContext,
&corev1.GetAvailablePackageDetailRequest{AvailablePackageRef: pkgRef})
if err != nil {
t.Fatal(err)
}
compareAvailablePackageDetail(
t,
pkgDetail.AvailablePackageDetail,
expected_detail_podinfo(repoName.Name, repoName.Namespace).AvailablePackageDetail)

_, err = fluxPluginPackagesClient.GetAvailablePackageDetail(
grpcContext,
&corev1.GetAvailablePackageDetailRequest{
AvailablePackageRef: pkgRef,
PkgVersion: "6.0.3",
})
if status.Code(err) != codes.Internal {
t.Fatalf("Expected Internal, got: %v", err)
}
}
Loading

0 comments on commit e5fe86f

Please sign in to comment.