diff --git a/infra/feast-operator/api/v1alpha1/featurestore_types.go b/infra/feast-operator/api/v1alpha1/featurestore_types.go index ee491c2e592..440518db747 100644 --- a/infra/feast-operator/api/v1alpha1/featurestore_types.go +++ b/infra/feast-operator/api/v1alpha1/featurestore_types.go @@ -105,6 +105,7 @@ type OnlineStorePersistence struct { // OnlineStoreFilePersistence configures the file-based persistence for the offline store service // +kubebuilder:validation:XValidation:rule="(!has(self.pvc) && has(self.path)) ? self.path.startsWith('/') : true",message="Ephemeral stores must have absolute paths." // +kubebuilder:validation:XValidation:rule="(has(self.pvc) && has(self.path)) ? !self.path.startsWith('/') : true",message="PVC path must be a file name only, with no slashes." +// +kubebuilder:validation:XValidation:rule="has(self.path) && !self.path.startsWith('s3://') && !self.path.startsWith('gs://')",message="Online store does not support S3 or GS buckets." type OnlineStoreFilePersistence struct { Path string `json:"path,omitempty"` PvcConfig *PvcConfig `json:"pvc,omitempty"` @@ -122,11 +123,14 @@ type RegistryPersistence struct { } // RegistryFilePersistence configures the file-based persistence for the registry service -// +kubebuilder:validation:XValidation:rule="(!has(self.pvc) && has(self.path)) ? self.path.startsWith('/') : true",message="Ephemeral stores must have absolute paths." +// +kubebuilder:validation:XValidation:rule="(!has(self.pvc) && has(self.path)) ? (self.path.startsWith('/') || self.path.startsWith('s3://') || self.path.startsWith('gs://')) : true",message="Registry files must use absolute paths or be S3 ('s3://') or GS ('gs://') object store URIs." // +kubebuilder:validation:XValidation:rule="(has(self.pvc) && has(self.path)) ? !self.path.startsWith('/') : true",message="PVC path must be a file name only, with no slashes." +// +kubebuilder:validation:XValidation:rule="(has(self.pvc) && has(self.path)) ? !(self.path.startsWith('s3://') || self.path.startsWith('gs://')) : true",message="PVC persistence does not support S3 or GS object store URIs." +// +kubebuilder:validation:XValidation:rule="(has(self.s3_additional_kwargs) && has(self.path)) ? self.path.startsWith('s3://') : true",message="Additional S3 settings are available only for S3 object store URIs." type RegistryFilePersistence struct { - Path string `json:"path,omitempty"` - PvcConfig *PvcConfig `json:"pvc,omitempty"` + Path string `json:"path,omitempty"` + PvcConfig *PvcConfig `json:"pvc,omitempty"` + S3AdditionalKwargs *map[string]string `json:"s3_additional_kwargs,omitempty"` } // PvcConfig defines the settings for a persistent file store based on PVCs. diff --git a/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go b/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go index 59f44c406cf..c020af11216 100644 --- a/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go +++ b/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go @@ -452,6 +452,17 @@ func (in *RegistryFilePersistence) DeepCopyInto(out *RegistryFilePersistence) { *out = new(PvcConfig) (*in).DeepCopyInto(*out) } + if in.S3AdditionalKwargs != nil { + in, out := &in.S3AdditionalKwargs, &out.S3AdditionalKwargs + *out = new(map[string]string) + if **in != nil { + in, out := *in, *out + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RegistryFilePersistence. diff --git a/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml b/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml index 5e470296a9e..99d9104b953 100644 --- a/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml +++ b/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml @@ -545,6 +545,9 @@ spec: slashes. rule: '(has(self.pvc) && has(self.path)) ? !self.path.startsWith(''/'') : true' + - message: Online store does not support S3 or GS buckets. + rule: has(self.path) && !self.path.startsWith('s3://') + && !self.path.startsWith('gs://') type: object resources: description: ResourceRequirements describes the compute resource @@ -816,15 +819,30 @@ spec: - message: Mount path must start with '/' and must not contain ':' rule: self.mountPath.matches('^/[^:]*$') + s3_additional_kwargs: + additionalProperties: + type: string + type: object type: object x-kubernetes-validations: - - message: Ephemeral stores must have absolute paths. - rule: '(!has(self.pvc) && has(self.path)) ? self.path.startsWith(''/'') + - message: Registry files must use absolute paths + or be S3 ('s3://') or GS ('gs://') object store + URIs. + rule: '(!has(self.pvc) && has(self.path)) ? (self.path.startsWith(''/'') + || self.path.startsWith(''s3://'') || self.path.startsWith(''gs://'')) : true' - message: PVC path must be a file name only, with no slashes. rule: '(has(self.pvc) && has(self.path)) ? !self.path.startsWith(''/'') : true' + - message: PVC persistence does not support S3 or + GS object store URIs. + rule: '(has(self.pvc) && has(self.path)) ? !(self.path.startsWith(''s3://'') + || self.path.startsWith(''gs://'')) : true' + - message: Additional S3 settings are available only + for S3 object store URIs. + rule: '(has(self.s3_additional_kwargs) && has(self.path)) + ? self.path.startsWith(''s3://'') : true' type: object resources: description: ResourceRequirements describes the compute @@ -1427,6 +1445,10 @@ spec: no slashes. rule: '(has(self.pvc) && has(self.path)) ? !self.path.startsWith(''/'') : true' + - message: Online store does not support S3 or GS + buckets. + rule: has(self.path) && !self.path.startsWith('s3://') + && !self.path.startsWith('gs://') type: object resources: description: ResourceRequirements describes the compute @@ -1705,16 +1727,30 @@ spec: - message: Mount path must start with '/' and must not contain ':' rule: self.mountPath.matches('^/[^:]*$') + s3_additional_kwargs: + additionalProperties: + type: string + type: object type: object x-kubernetes-validations: - - message: Ephemeral stores must have absolute - paths. + - message: Registry files must use absolute paths + or be S3 ('s3://') or GS ('gs://') object + store URIs. rule: '(!has(self.pvc) && has(self.path)) ? - self.path.startsWith(''/'') : true' + (self.path.startsWith(''/'') || self.path.startsWith(''s3://'') + || self.path.startsWith(''gs://'')) : true' - message: PVC path must be a file name only, with no slashes. rule: '(has(self.pvc) && has(self.path)) ? !self.path.startsWith(''/'') : true' + - message: PVC persistence does not support S3 + or GS object store URIs. + rule: '(has(self.pvc) && has(self.path)) ? !(self.path.startsWith(''s3://'') + || self.path.startsWith(''gs://'')) : true' + - message: Additional S3 settings are available + only for S3 object store URIs. + rule: '(has(self.s3_additional_kwargs) && has(self.path)) + ? self.path.startsWith(''s3://'') : true' type: object resources: description: ResourceRequirements describes the compute diff --git a/infra/feast-operator/config/samples/v1alpha1_featurestore_objectstore_persistence.yaml b/infra/feast-operator/config/samples/v1alpha1_featurestore_objectstore_persistence.yaml new file mode 100644 index 00000000000..45f12a67a18 --- /dev/null +++ b/infra/feast-operator/config/samples/v1alpha1_featurestore_objectstore_persistence.yaml @@ -0,0 +1,24 @@ +apiVersion: feast.dev/v1alpha1 +kind: FeatureStore +metadata: + name: sample-s3-registry +spec: + feastProject: my_project + services: + onlineStore: + persistence: + file: + path: /data/online_store.db + offlineStore: + persistence: + file: + type: dask + registry: + local: + persistence: + file: + path: s3://bucket/registry.db + s3_additional_kwargs: + ServerSideEncryption: AES256 + ACL: bucket-owner-full-control + CacheControl: max-age=3600 diff --git a/infra/feast-operator/dist/install.yaml b/infra/feast-operator/dist/install.yaml index 1be34a9e088..7f208f548f6 100644 --- a/infra/feast-operator/dist/install.yaml +++ b/infra/feast-operator/dist/install.yaml @@ -553,6 +553,9 @@ spec: slashes. rule: '(has(self.pvc) && has(self.path)) ? !self.path.startsWith(''/'') : true' + - message: Online store does not support S3 or GS buckets. + rule: has(self.path) && !self.path.startsWith('s3://') + && !self.path.startsWith('gs://') type: object resources: description: ResourceRequirements describes the compute resource @@ -824,15 +827,30 @@ spec: - message: Mount path must start with '/' and must not contain ':' rule: self.mountPath.matches('^/[^:]*$') + s3_additional_kwargs: + additionalProperties: + type: string + type: object type: object x-kubernetes-validations: - - message: Ephemeral stores must have absolute paths. - rule: '(!has(self.pvc) && has(self.path)) ? self.path.startsWith(''/'') + - message: Registry files must use absolute paths + or be S3 ('s3://') or GS ('gs://') object store + URIs. + rule: '(!has(self.pvc) && has(self.path)) ? (self.path.startsWith(''/'') + || self.path.startsWith(''s3://'') || self.path.startsWith(''gs://'')) : true' - message: PVC path must be a file name only, with no slashes. rule: '(has(self.pvc) && has(self.path)) ? !self.path.startsWith(''/'') : true' + - message: PVC persistence does not support S3 or + GS object store URIs. + rule: '(has(self.pvc) && has(self.path)) ? !(self.path.startsWith(''s3://'') + || self.path.startsWith(''gs://'')) : true' + - message: Additional S3 settings are available only + for S3 object store URIs. + rule: '(has(self.s3_additional_kwargs) && has(self.path)) + ? self.path.startsWith(''s3://'') : true' type: object resources: description: ResourceRequirements describes the compute @@ -1435,6 +1453,10 @@ spec: no slashes. rule: '(has(self.pvc) && has(self.path)) ? !self.path.startsWith(''/'') : true' + - message: Online store does not support S3 or GS + buckets. + rule: has(self.path) && !self.path.startsWith('s3://') + && !self.path.startsWith('gs://') type: object resources: description: ResourceRequirements describes the compute @@ -1713,16 +1735,30 @@ spec: - message: Mount path must start with '/' and must not contain ':' rule: self.mountPath.matches('^/[^:]*$') + s3_additional_kwargs: + additionalProperties: + type: string + type: object type: object x-kubernetes-validations: - - message: Ephemeral stores must have absolute - paths. + - message: Registry files must use absolute paths + or be S3 ('s3://') or GS ('gs://') object + store URIs. rule: '(!has(self.pvc) && has(self.path)) ? - self.path.startsWith(''/'') : true' + (self.path.startsWith(''/'') || self.path.startsWith(''s3://'') + || self.path.startsWith(''gs://'')) : true' - message: PVC path must be a file name only, with no slashes. rule: '(has(self.pvc) && has(self.path)) ? !self.path.startsWith(''/'') : true' + - message: PVC persistence does not support S3 + or GS object store URIs. + rule: '(has(self.pvc) && has(self.path)) ? !(self.path.startsWith(''s3://'') + || self.path.startsWith(''gs://'')) : true' + - message: Additional S3 settings are available + only for S3 object store URIs. + rule: '(has(self.s3_additional_kwargs) && has(self.path)) + ? self.path.startsWith(''s3://'') : true' type: object resources: description: ResourceRequirements describes the compute diff --git a/infra/feast-operator/internal/controller/featurestore_controller_objectstore_test.go b/infra/feast-operator/internal/controller/featurestore_controller_objectstore_test.go new file mode 100644 index 00000000000..cedcba6124b --- /dev/null +++ b/infra/feast-operator/internal/controller/featurestore_controller_objectstore_test.go @@ -0,0 +1,419 @@ +/* +Copyright 2024 Feast Community. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "encoding/base64" + "fmt" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "gopkg.in/yaml.v3" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + apimeta "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/feast-dev/feast/infra/feast-operator/api/feastversion" + feastdevv1alpha1 "github.com/feast-dev/feast/infra/feast-operator/api/v1alpha1" + "github.com/feast-dev/feast/infra/feast-operator/internal/controller/services" +) + +var _ = Describe("FeatureStore Controller-Ephemeral services", func() { + Context("When deploying a resource with all ephemeral services", func() { + const resourceName = "services-object-store" + var pullPolicy = corev1.PullAlways + var testEnvVarName = "testEnvVarName" + var testEnvVarValue = "testEnvVarValue" + + ctx := context.Background() + + typeNamespacedName := types.NamespacedName{ + Name: resourceName, + Namespace: "default", + } + featurestore := &feastdevv1alpha1.FeatureStore{} + registryPath := "s3://bucket/registry.db" + + s3AdditionalKwargs := map[string]string{ + "key1": "value1", + "key2": "value2", + } + + BeforeEach(func() { + By("creating the custom resource for the Kind FeatureStore") + err := k8sClient.Get(ctx, typeNamespacedName, featurestore) + if err != nil && errors.IsNotFound(err) { + resource := createFeatureStoreResource(resourceName, image, pullPolicy, &[]corev1.EnvVar{{Name: testEnvVarName, Value: testEnvVarValue}, + {Name: "fieldRefName", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{APIVersion: "v1", FieldPath: "metadata.namespace"}}}}) + resource.Spec.Services.OnlineStore = nil + resource.Spec.Services.OfflineStore = nil + resource.Spec.Services.Registry = &feastdevv1alpha1.Registry{ + Local: &feastdevv1alpha1.LocalRegistryConfig{ + Persistence: &feastdevv1alpha1.RegistryPersistence{ + FilePersistence: &feastdevv1alpha1.RegistryFilePersistence{ + Path: registryPath, + S3AdditionalKwargs: &s3AdditionalKwargs, + }, + }, + }, + } + + Expect(k8sClient.Create(ctx, resource)).To(Succeed()) + } + }) + AfterEach(func() { + resource := &feastdevv1alpha1.FeatureStore{} + err := k8sClient.Get(ctx, typeNamespacedName, resource) + Expect(err).NotTo(HaveOccurred()) + + By("Cleanup the specific resource instance FeatureStore") + Expect(k8sClient.Delete(ctx, resource)).To(Succeed()) + }) + + It("should successfully reconcile the resource", func() { + By("Reconciling the created resource") + controllerReconciler := &FeatureStoreReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + } + + _, err := controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + resource := &feastdevv1alpha1.FeatureStore{} + err = k8sClient.Get(ctx, typeNamespacedName, resource) + Expect(err).NotTo(HaveOccurred()) + + feast := services.FeastServices{ + Client: controllerReconciler.Client, + Context: ctx, + Scheme: controllerReconciler.Scheme, + FeatureStore: resource, + } + Expect(resource.Status).NotTo(BeNil()) + Expect(resource.Status.FeastVersion).To(Equal(feastversion.FeastVersion)) + Expect(resource.Status.ClientConfigMap).To(Equal(feast.GetFeastServiceName(services.ClientFeastType))) + Expect(resource.Status.Applied.FeastProject).To(Equal(resource.Spec.FeastProject)) + Expect(resource.Status.Applied.Services).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.OfflineStore).To(BeNil()) + Expect(resource.Status.Applied.Services.OnlineStore).To(BeNil()) + Expect(resource.Status.Applied.Services.Registry).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.Registry.Local).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.Registry.Local.Persistence).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.Registry.Local.Persistence.FilePersistence).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.Registry.Local.Persistence.FilePersistence.Path).To(Equal(registryPath)) + Expect(resource.Status.Applied.Services.Registry.Local.Persistence.FilePersistence.S3AdditionalKwargs).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.Registry.Local.Persistence.FilePersistence.S3AdditionalKwargs).To(Equal(&s3AdditionalKwargs)) + Expect(resource.Status.Applied.Services.Registry.Local.Persistence.FilePersistence.PvcConfig).To(BeNil()) + Expect(resource.Status.Applied.Services.Registry.Local.ImagePullPolicy).To(BeNil()) + Expect(resource.Status.Applied.Services.Registry.Local.Resources).To(BeNil()) + Expect(resource.Status.Applied.Services.Registry.Local.Image).To(Equal(&services.DefaultImage)) + + Expect(resource.Status.ServiceHostnames.OfflineStore).To(BeEmpty()) + Expect(resource.Status.ServiceHostnames.OnlineStore).To(BeEmpty()) + Expect(resource.Status.ServiceHostnames.Registry).To(Equal(feast.GetFeastServiceName(services.RegistryFeastType) + "." + resource.Namespace + domain)) + + Expect(resource.Status.Conditions).NotTo(BeEmpty()) + cond := apimeta.FindStatusCondition(resource.Status.Conditions, feastdevv1alpha1.ReadyType) + Expect(cond).ToNot(BeNil()) + Expect(cond.Status).To(Equal(metav1.ConditionTrue)) + Expect(cond.Reason).To(Equal(feastdevv1alpha1.ReadyReason)) + Expect(cond.Type).To(Equal(feastdevv1alpha1.ReadyType)) + Expect(cond.Message).To(Equal(feastdevv1alpha1.ReadyMessage)) + + cond = apimeta.FindStatusCondition(resource.Status.Conditions, feastdevv1alpha1.RegistryReadyType) + Expect(cond).ToNot(BeNil()) + Expect(cond.Status).To(Equal(metav1.ConditionTrue)) + Expect(cond.Reason).To(Equal(feastdevv1alpha1.ReadyReason)) + Expect(cond.Type).To(Equal(feastdevv1alpha1.RegistryReadyType)) + Expect(cond.Message).To(Equal(feastdevv1alpha1.RegistryReadyMessage)) + + cond = apimeta.FindStatusCondition(resource.Status.Conditions, feastdevv1alpha1.ClientReadyType) + Expect(cond).ToNot(BeNil()) + Expect(cond.Status).To(Equal(metav1.ConditionTrue)) + Expect(cond.Reason).To(Equal(feastdevv1alpha1.ReadyReason)) + Expect(cond.Type).To(Equal(feastdevv1alpha1.ClientReadyType)) + Expect(cond.Message).To(Equal(feastdevv1alpha1.ClientReadyMessage)) + + cond = apimeta.FindStatusCondition(resource.Status.Conditions, feastdevv1alpha1.OfflineStoreReadyType) + Expect(cond).To(BeNil()) + + cond = apimeta.FindStatusCondition(resource.Status.Conditions, feastdevv1alpha1.OnlineStoreReadyType) + Expect(cond).To(BeNil()) + + Expect(resource.Status.Phase).To(Equal(feastdevv1alpha1.ReadyPhase)) + + // check offline deployment + deploy := &appsv1.Deployment{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.OfflineFeastType), + Namespace: resource.Namespace, + }, + deploy) + Expect(err).To(HaveOccurred()) + Expect(errors.IsNotFound(err)).To(BeTrue()) + + // check online deployment + deploy = &appsv1.Deployment{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.OnlineFeastType), + Namespace: resource.Namespace, + }, + deploy) + Expect(err).To(HaveOccurred()) + Expect(errors.IsNotFound(err)).To(BeTrue()) + + // check registry deployment + deploy = &appsv1.Deployment{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.RegistryFeastType), + Namespace: resource.Namespace, + }, + deploy) + Expect(err).NotTo(HaveOccurred()) + Expect(deploy.Spec.Replicas).To(Equal(&services.DefaultReplicas)) + Expect(controllerutil.HasControllerReference(deploy)).To(BeTrue()) + Expect(deploy.Spec.Template.Spec.Containers).To(HaveLen(1)) + Expect(deploy.Spec.Template.Spec.Volumes).To(HaveLen(0)) + Expect(deploy.Spec.Template.Spec.Containers[0].VolumeMounts).To(HaveLen(0)) + + // update S3 additional args and reconcile + resourceNew := resource.DeepCopy() + newS3AdditionalKwargs := make(map[string]string) + for k, v := range s3AdditionalKwargs { + newS3AdditionalKwargs[k] = v + } + newS3AdditionalKwargs["key3"] = "value3" + resourceNew.Spec.Services.Registry.Local.Persistence.FilePersistence.S3AdditionalKwargs = &newS3AdditionalKwargs + err = k8sClient.Update(ctx, resourceNew) + Expect(err).NotTo(HaveOccurred()) + _, err = controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + resource = &feastdevv1alpha1.FeatureStore{} + err = k8sClient.Get(ctx, typeNamespacedName, resource) + Expect(err).NotTo(HaveOccurred()) + feast.FeatureStore = resource + Expect(resource.Status.Applied.Services.Registry.Local.Persistence.FilePersistence.S3AdditionalKwargs).NotTo(BeNil()) + Expect(resource.Status.Applied.Services.Registry.Local.Persistence.FilePersistence.S3AdditionalKwargs).NotTo(Equal(&s3AdditionalKwargs)) + Expect(resource.Status.Applied.Services.Registry.Local.Persistence.FilePersistence.S3AdditionalKwargs).To(Equal(&newS3AdditionalKwargs)) + + // check registry deployment + deploy = &appsv1.Deployment{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.RegistryFeastType), + Namespace: resource.Namespace, + }, + deploy) + Expect(err).NotTo(HaveOccurred()) + Expect(deploy.Spec.Template.Spec.Volumes).To(HaveLen(0)) + Expect(deploy.Spec.Template.Spec.Containers[0].VolumeMounts).To(HaveLen(0)) + + }) + + It("should properly encode a feature_store.yaml config", func() { + By("Reconciling the created resource") + controllerReconciler := &FeatureStoreReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + } + + _, err := controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + resource := &feastdevv1alpha1.FeatureStore{} + err = k8sClient.Get(ctx, typeNamespacedName, resource) + Expect(err).NotTo(HaveOccurred()) + + req, err := labels.NewRequirement(services.NameLabelKey, selection.Equals, []string{resource.Name}) + Expect(err).NotTo(HaveOccurred()) + labelSelector := labels.NewSelector().Add(*req) + listOpts := &client.ListOptions{Namespace: resource.Namespace, LabelSelector: labelSelector} + deployList := appsv1.DeploymentList{} + err = k8sClient.List(ctx, &deployList, listOpts) + Expect(err).NotTo(HaveOccurred()) + Expect(deployList.Items).To(HaveLen(1)) + + svcList := corev1.ServiceList{} + err = k8sClient.List(ctx, &svcList, listOpts) + Expect(err).NotTo(HaveOccurred()) + Expect(svcList.Items).To(HaveLen(1)) + + cmList := corev1.ConfigMapList{} + err = k8sClient.List(ctx, &cmList, listOpts) + Expect(err).NotTo(HaveOccurred()) + Expect(cmList.Items).To(HaveLen(1)) + + feast := services.FeastServices{ + Client: controllerReconciler.Client, + Context: ctx, + Scheme: controllerReconciler.Scheme, + FeatureStore: resource, + } + + // check registry deployment + deploy := &appsv1.Deployment{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.RegistryFeastType), + Namespace: resource.Namespace, + }, + deploy) + Expect(err).NotTo(HaveOccurred()) + Expect(deploy.Spec.Template.Spec.Containers).To(HaveLen(1)) + Expect(deploy.Spec.Template.Spec.Containers[0].Env).To(HaveLen(1)) + env := getFeatureStoreYamlEnvVar(deploy.Spec.Template.Spec.Containers[0].Env) + Expect(env).NotTo(BeNil()) + + // check registry config + fsYamlStr, err := feast.GetServiceFeatureStoreYamlBase64(services.RegistryFeastType) + Expect(err).NotTo(HaveOccurred()) + Expect(fsYamlStr).To(Equal(env.Value)) + + envByte, err := base64.StdEncoding.DecodeString(env.Value) + Expect(err).NotTo(HaveOccurred()) + repoConfig := &services.RepoConfig{} + err = yaml.Unmarshal(envByte, repoConfig) + Expect(err).NotTo(HaveOccurred()) + testConfig := &services.RepoConfig{ + Project: feastProject, + Provider: services.LocalProviderType, + EntityKeySerializationVersion: feastdevv1alpha1.SerializationVersion, + Registry: services.RegistryConfig{ + RegistryType: services.RegistryFileConfigType, + Path: registryPath, + S3AdditionalKwargs: &s3AdditionalKwargs, + }, + } + Expect(repoConfig).To(Equal(testConfig)) + + // check offline deployment + deploy = &appsv1.Deployment{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.OfflineFeastType), + Namespace: resource.Namespace, + }, + deploy) + Expect(err).To(HaveOccurred()) + Expect(errors.IsNotFound(err)).To(BeTrue()) + + // check online deployment + deploy = &appsv1.Deployment{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.OnlineFeastType), + Namespace: resource.Namespace, + }, + deploy) + Expect(err).To(HaveOccurred()) + Expect(errors.IsNotFound(err)).To(BeTrue()) + + // check client config + cm := &corev1.ConfigMap{} + name := feast.GetFeastServiceName(services.ClientFeastType) + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: name, + Namespace: resource.Namespace, + }, + cm) + Expect(err).NotTo(HaveOccurred()) + repoConfigClient := &services.RepoConfig{} + err = yaml.Unmarshal([]byte(cm.Data[services.FeatureStoreYamlCmKey]), repoConfigClient) + Expect(err).NotTo(HaveOccurred()) + clientConfig := &services.RepoConfig{ + Project: feastProject, + Provider: services.LocalProviderType, + EntityKeySerializationVersion: feastdevv1alpha1.SerializationVersion, + Registry: services.RegistryConfig{ + RegistryType: services.RegistryRemoteConfigType, + Path: fmt.Sprintf("feast-%s-registry.default.svc.cluster.local:80", resourceName), + }, + } + Expect(repoConfigClient).To(Equal(clientConfig)) + + // remove S3 additional keywords and reconcile + resourceNew := resource.DeepCopy() + resourceNew.Spec.Services.Registry.Local.Persistence.FilePersistence.S3AdditionalKwargs = nil + err = k8sClient.Update(ctx, resourceNew) + Expect(err).NotTo(HaveOccurred()) + _, err = controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + resource = &feastdevv1alpha1.FeatureStore{} + err = k8sClient.Get(ctx, typeNamespacedName, resource) + Expect(err).NotTo(HaveOccurred()) + feast.FeatureStore = resource + + // check registry config + deploy = &appsv1.Deployment{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.RegistryFeastType), + Namespace: resource.Namespace, + }, + deploy) + Expect(err).NotTo(HaveOccurred()) + env = getFeatureStoreYamlEnvVar(deploy.Spec.Template.Spec.Containers[0].Env) + Expect(env).NotTo(BeNil()) + fsYamlStr, err = feast.GetServiceFeatureStoreYamlBase64(services.RegistryFeastType) + Expect(err).NotTo(HaveOccurred()) + Expect(fsYamlStr).To(Equal(env.Value)) + + envByte, err = base64.StdEncoding.DecodeString(env.Value) + Expect(err).NotTo(HaveOccurred()) + repoConfig = &services.RepoConfig{} + err = yaml.Unmarshal(envByte, repoConfig) + Expect(err).NotTo(HaveOccurred()) + testConfig.Registry.S3AdditionalKwargs = nil + Expect(repoConfig).To(Equal(testConfig)) + + // check offline deployment + deploy = &appsv1.Deployment{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.OfflineFeastType), + Namespace: resource.Namespace, + }, + deploy) + Expect(err).To(HaveOccurred()) + Expect(errors.IsNotFound(err)).To(BeTrue()) + + // check online deployment + deploy = &appsv1.Deployment{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.OnlineFeastType), + Namespace: resource.Namespace, + }, + deploy) + Expect(err).To(HaveOccurred()) + Expect(errors.IsNotFound(err)).To(BeTrue()) + }) + }) +}) diff --git a/infra/feast-operator/internal/controller/services/repo_config.go b/infra/feast-operator/internal/controller/services/repo_config.go index 7341271c673..34b62ac30ad 100644 --- a/infra/feast-operator/internal/controller/services/repo_config.go +++ b/infra/feast-operator/internal/controller/services/repo_config.go @@ -88,14 +88,17 @@ func getServiceRepoConfig(feastType FeastServiceType, featureStore *feastdevv1al // Registry server only has a `registry` section if isLocalRegistry { path := DefaultRegistryEphemeralPath + var s3AdditionalKwargs *map[string]string if services != nil && services.Registry != nil && services.Registry.Local != nil && services.Registry.Local.Persistence != nil && services.Registry.Local.Persistence.FilePersistence != nil { filePersistence := services.Registry.Local.Persistence.FilePersistence path = getActualPath(filePersistence.Path, filePersistence.PvcConfig) + s3AdditionalKwargs = filePersistence.S3AdditionalKwargs } repoConfig.Registry = RegistryConfig{ - RegistryType: RegistryFileConfigType, - Path: path, + RegistryType: RegistryFileConfigType, + Path: path, + S3AdditionalKwargs: s3AdditionalKwargs, } repoConfig.OfflineStore = OfflineStoreConfig{} repoConfig.OnlineStore = OnlineStoreConfig{} diff --git a/infra/feast-operator/internal/controller/services/services_types.go b/infra/feast-operator/internal/controller/services/services_types.go index e32c5cf6cae..f67f8c0e465 100644 --- a/infra/feast-operator/internal/controller/services/services_types.go +++ b/infra/feast-operator/internal/controller/services/services_types.go @@ -185,8 +185,9 @@ type OnlineStoreConfig struct { // RegistryConfig is the configuration that relates to reading from and writing to the Feast registry. type RegistryConfig struct { - Path string `yaml:"path,omitempty"` - RegistryType RegistryConfigType `yaml:"registry_type,omitempty"` + Path string `yaml:"path,omitempty"` + RegistryType RegistryConfigType `yaml:"registry_type,omitempty"` + S3AdditionalKwargs *map[string]string `json:"s3_additional_kwargs,omitempty"` } type deploymentSettings struct { diff --git a/infra/feast-operator/test/api/featurestore_types_test.go b/infra/feast-operator/test/api/featurestore_types_test.go index 7e8a448f199..ac690bb7c2c 100644 --- a/infra/feast-operator/test/api/featurestore_types_test.go +++ b/infra/feast-operator/test/api/featurestore_types_test.go @@ -67,6 +67,24 @@ func onlineStoreWithRelativePathForEphemeral(featureStore *feastdevv1alpha1.Feat return copy } +func onlineStoreWithObjectStoreBucketForPvc(path string, featureStore *feastdevv1alpha1.FeatureStore) *feastdevv1alpha1.FeatureStore { + copy := featureStore.DeepCopy() + copy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + OnlineStore: &feastdevv1alpha1.OnlineStore{ + Persistence: &feastdevv1alpha1.OnlineStorePersistence{ + FilePersistence: &feastdevv1alpha1.OnlineStoreFilePersistence{ + Path: path, + PvcConfig: &feastdevv1alpha1.PvcConfig{ + Create: &feastdevv1alpha1.PvcCreate{}, + MountPath: "/data/online", + }, + }, + }, + }, + } + return copy +} + func offlineStoreWithUnmanagedFileType(featureStore *feastdevv1alpha1.FeatureStore) *feastdevv1alpha1.FeatureStore { copy := featureStore.DeepCopy() copy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ @@ -111,6 +129,58 @@ func registryWithRelativePathForEphemeral(featureStore *feastdevv1alpha1.Feature } return copy } +func registryWithObjectStoreBucketForPvc(path string, featureStore *feastdevv1alpha1.FeatureStore) *feastdevv1alpha1.FeatureStore { + copy := featureStore.DeepCopy() + copy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + Registry: &feastdevv1alpha1.Registry{ + Local: &feastdevv1alpha1.LocalRegistryConfig{ + Persistence: &feastdevv1alpha1.RegistryPersistence{ + FilePersistence: &feastdevv1alpha1.RegistryFilePersistence{ + Path: path, + PvcConfig: &feastdevv1alpha1.PvcConfig{ + Create: &feastdevv1alpha1.PvcCreate{}, + MountPath: "/data/registry", + }, + }, + }, + }, + }, + } + return copy +} +func registryWithS3AdditionalKeywordsForFile(featureStore *feastdevv1alpha1.FeatureStore) *feastdevv1alpha1.FeatureStore { + copy := featureStore.DeepCopy() + copy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + Registry: &feastdevv1alpha1.Registry{ + Local: &feastdevv1alpha1.LocalRegistryConfig{ + Persistence: &feastdevv1alpha1.RegistryPersistence{ + FilePersistence: &feastdevv1alpha1.RegistryFilePersistence{ + Path: "/data/online_store.db", + S3AdditionalKwargs: &map[string]string{}, + }, + }, + }, + }, + } + return copy +} +func registryWithS3AdditionalKeywordsForGsBucket(featureStore *feastdevv1alpha1.FeatureStore) *feastdevv1alpha1.FeatureStore { + copy := featureStore.DeepCopy() + copy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ + Registry: &feastdevv1alpha1.Registry{ + Local: &feastdevv1alpha1.LocalRegistryConfig{ + Persistence: &feastdevv1alpha1.RegistryPersistence{ + FilePersistence: &feastdevv1alpha1.RegistryFilePersistence{ + Path: "gs://online_store.db", + S3AdditionalKwargs: &map[string]string{}, + }, + }, + }, + }, + } + return copy +} + func pvcConfigWithNeitherRefNorCreate(featureStore *feastdevv1alpha1.FeatureStore) *feastdevv1alpha1.FeatureStore { copy := featureStore.DeepCopy() copy.Spec.Services = &feastdevv1alpha1.FeatureStoreServices{ @@ -239,6 +309,10 @@ var _ = Describe("FeatureStore API", func() { It("should fail when ephemeral persistence has relative path", func() { attemptInvalidCreationAndAsserts(ctx, onlineStoreWithRelativePathForEphemeral(featurestore), "Ephemeral stores must have absolute paths") }) + It("should fail when PVC persistence has object store bucket", func() { + attemptInvalidCreationAndAsserts(ctx, onlineStoreWithObjectStoreBucketForPvc("s3://bucket/online_store.db", featurestore), "Online store does not support S3 or GS") + attemptInvalidCreationAndAsserts(ctx, onlineStoreWithObjectStoreBucketForPvc("gs://bucket/online_store.db", featurestore), "Online store does not support S3 or GS") + }) }) Context("When creating an invalid Offline Store", func() { @@ -256,7 +330,15 @@ var _ = Describe("FeatureStore API", func() { attemptInvalidCreationAndAsserts(ctx, registryWithAbsolutePathForPvc(featurestore), "PVC path must be a file name only") }) It("should fail when ephemeral persistence has relative path", func() { - attemptInvalidCreationAndAsserts(ctx, registryWithRelativePathForEphemeral(featurestore), "Ephemeral stores must have absolute paths") + attemptInvalidCreationAndAsserts(ctx, registryWithRelativePathForEphemeral(featurestore), "Registry files must use absolute paths or be S3 ('s3://') or GS ('gs://')") + }) + It("should fail when PVC persistence has object store bucket", func() { + attemptInvalidCreationAndAsserts(ctx, registryWithObjectStoreBucketForPvc("s3://bucket/registry.db", featurestore), "PVC persistence does not support S3 or GS object store URIs") + attemptInvalidCreationAndAsserts(ctx, registryWithObjectStoreBucketForPvc("gs://bucket/registry.db", featurestore), "PVC persistence does not support S3 or GS object store URIs") + }) + It("should fail when additional S3 settings are provided to non S3 bucket", func() { + attemptInvalidCreationAndAsserts(ctx, registryWithS3AdditionalKeywordsForFile(featurestore), "Additional S3 settings are available only for S3 object store URIs") + attemptInvalidCreationAndAsserts(ctx, registryWithS3AdditionalKeywordsForGsBucket(featurestore), "Additional S3 settings are available only for S3 object store URIs") }) })