Skip to content
This repository has been archived by the owner on Oct 20, 2022. It is now read-only.

Commit

Permalink
Add option to Rename a table when restoring it (#293)
Browse files Browse the repository at this point in the history
* Add Rename to CassandraRestore and Update image

* Add Example and remove restorationStrategyType option

* Update CRD

* Fix unit test

* Reformat entities provided and support spaces w/wo commas

* Remove unused code and use INIT in mock

* Indent Makefile

* Update Icarus sidecar

* Create event when restore fails and some refactoring
  • Loading branch information
cscetbon authored Feb 2, 2021
1 parent 8a12e02 commit e5f1695
Show file tree
Hide file tree
Showing 17 changed files with 114 additions and 49 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,9 @@ define debug_telepresence
sudo mkdir -p /var/run/secrets/kubernetes.io ; \
sudo ln -s /tmp/known/var/run/secrets/kubernetes.io/serviceaccount /var/run/secrets/kubernetes.io/ || true ; \
tdep=$(shell kubectl get deployment -l app=cassandra-operator -o jsonpath='{.items[0].metadata.name}') ; \
echo kubectl get deployment -l app=cassandra-operator -o jsonpath='{.items[0].metadata.name}' ; \
echo kubectl get deployment -l app=cassandra-operator -o jsonpath='{.items[0].metadata.name}' ; \
echo telepresence --swap-deployment $$tdep --mount=/tmp/known --env-file cassandra-operator.env $1 $2 ; \
telepresence --swap-deployment $$tdep --mount=/tmp/known --env-file cassandra-operator.env $1 $2
telepresence --swap-deployment $$tdep --mount=/tmp/known --env-file cassandra-operator.env $1 $2
endef

debug-telepresence:
Expand Down
1 change: 0 additions & 1 deletion deploy/crds/db.orange.com_cassandraclusters_crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ spec:
to load along with each C* pod
properties:
image:
default: gcr.io/cassandra-operator/instaclustr-icarus:1.0.5
description: Image of backup/restore sidecar
type: string
imagePullPolicy:
Expand Down
4 changes: 4 additions & 0 deletions deploy/crds/db.orange.com_cassandrarestores_crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ spec:
description: When set do not delete truncated SSTables after they've
been restored during CLEANUP phase. Defaults to false
type: boolean
rename:
additionalProperties:
type: string
type: object
schemaVersion:
description: Version of the schema to restore from. Upon backup, a
schema version is automatically appended to a snapshot name and
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32
github.com/go-openapi/spec v0.19.4
github.com/golang/protobuf v1.4.2 // indirect
github.com/instaclustr/instaclustr-icarus-go-client v0.0.0-20201103172213-3e9c434b280c
github.com/instaclustr/instaclustr-icarus-go-client v0.0.0-20210115143219-869427dc2595
github.com/jarcoal/httpmock v1.0.4
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.1.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,8 @@ github.com/instaclustr/instaclustr-icarus-go-client v0.0.0-20201030135811-4e3bc4
github.com/instaclustr/instaclustr-icarus-go-client v0.0.0-20201030135811-4e3bc40abf96/go.mod h1:2+9I3yZFu2UU6G+fRrnJqUH9tl1iq2W3dCUkjzQTMBM=
github.com/instaclustr/instaclustr-icarus-go-client v0.0.0-20201103172213-3e9c434b280c h1:4jJ028DTFj1UOXAjn8tSffmyLabdY/QSTlGOahZrRMI=
github.com/instaclustr/instaclustr-icarus-go-client v0.0.0-20201103172213-3e9c434b280c/go.mod h1:2+9I3yZFu2UU6G+fRrnJqUH9tl1iq2W3dCUkjzQTMBM=
github.com/instaclustr/instaclustr-icarus-go-client v0.0.0-20210115143219-869427dc2595 h1:wbD83uZxDmoTmO4F4kmSL0yba6vYgHsyvjEIRWxSdzU=
github.com/instaclustr/instaclustr-icarus-go-client v0.0.0-20210115143219-869427dc2595/go.mod h1:2+9I3yZFu2UU6G+fRrnJqUH9tl1iq2W3dCUkjzQTMBM=
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733/go.mod h1:WrMFNQdiFJ80sQsxDoMokWK1W5TQtxBFNpzWTD84ibQ=
github.com/jackc/pgx v3.2.0+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I=
github.com/jarcoal/httpmock v1.0.4 h1:jp+dy/+nonJE4g4xbVtl9QdrUNbn6/3hDT5R4nDIZnA=
Expand Down
18 changes: 9 additions & 9 deletions pkg/apis/db/v1alpha1/cassandracluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ const (
DefaultReadinessHealthCheckTimeout int32 = 10
DefaultReadinessHealthCheckPeriod int32 = 10

defaultCassandraImage = "cassandra:3.11"
defaultBootstrapImage = "orangeopensource/cassandra-bootstrap:0.1.7"
defaultServiceAccountName = "cassandra-cluster-node"
InitContainerCmd = "cp -vr /etc/cassandra/* /bootstrap"
defaultMaxPodUnavailable = 1
defaultNumTokens = 256
defaultImagePullPolicy = v1.PullAlways
defaultCassandraImage = "cassandra:3.11"
defaultBootstrapImage = "orangeopensource/cassandra-bootstrap:0.1.7"
DefaultBackRestImage = "gcr.io/cassandra-operator/instaclustr-icarus:1.0.7"
defaultServiceAccountName = "cassandra-cluster-node"
InitContainerCmd = "cp -vr /etc/cassandra/* /bootstrap"
defaultMaxPodUnavailable = 1
defaultNumTokens = 256
defaultImagePullPolicy = v1.PullAlways

DefaultCassandraDC = "dc1"
DefaultCassandraRack = "rack1"
Expand Down Expand Up @@ -170,7 +171,7 @@ func (cc *CassandraCluster) CheckDefaults() {

// BackupRestore default config
if ccs.BackRestSidecar == nil {
ccs.BackRestSidecar = &BackRestSidecar{}
ccs.BackRestSidecar = &BackRestSidecar{Image: DefaultBackRestImage}
}
}

Expand Down Expand Up @@ -954,7 +955,6 @@ type ServicePolicy struct {
// BackRestSidecar defines details about cassandra-sidecar to load along with each C* pod
type BackRestSidecar struct {
// Image of backup/restore sidecar
// +kubebuilder:default:="gcr.io/cassandra-operator/instaclustr-icarus:1.0.5"
Image string `json:"image,omitempty"`
// ImagePullPolicy define the pull policy for backrest sidecar docker image
ImagePullPolicy v1.PullPolicy `json:"imagePullPolicy,omitempty"`
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/db/v1alpha1/cassandrarestore_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type CassandraRestoreSpec struct {
// keyspace, e.g. 'k1,k2' if one wants to backup whole keyspaces or 'ks1.t1,ks2.t2' if one wants to restore specific
// tables. These formats are mutually exclusive so 'k1,k2.t2' is invalid. An empty field will restore all keyspaces
Entities string `json:"entities,omitempty"`
Rename map[string]string `json:"rename,omitempty"`
// Name of Secret to use when accessing cloud storage providers
Secret string `json:"secret,omitempty"`
}
Expand Down
1 change: 0 additions & 1 deletion pkg/apis/db/v1alpha1/cassandrarestore_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ func TestComputeStatusFromRestoreOperation(t *testing.T) {
operationID,
k8sSecretName,
storageLocation,
"HARDLINKS",
"TRUNCATE",
schemaVersion), &restoreOperation)

Expand Down
3 changes: 1 addition & 2 deletions pkg/apis/db/v1alpha1/common/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ func MockRestoreResponse(
operationId,
k8sSecretName,
storageLocation,
restorationStrategyType,
restorationPhase,
schemaVersion string) map[string]interface{} {

Expand All @@ -25,7 +24,7 @@ func MockRestoreResponse(
"cassandraDirectory": "/var/lib/cassandra",
"snapshotTag": snapshotTag,
"entities": "",
"restorationStrategyType": restorationStrategyType,
"restorationStrategyType": "HARDLINKS",
"restorationPhase": restorationPhase,
"noDeleteDownloads": noDeleteDownloads,
"schemaVersion": schemaVersion,
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/db/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 26 additions & 2 deletions pkg/backrest/backrest.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

var (
regexSpaceOrComma = regexp.MustCompile("[\\s,]+")
)


type Client struct {
client cassandrabackup.Client
CoordinatorMember string
Expand All @@ -29,6 +34,22 @@ func NewClient(client client.Client, cc *api.CassandraCluster, pod *corev1.Pod)
return &Client{client: csClient, CoordinatorMember: pod.Name}, nil
}

func filterEmptyStrings(input []string) []string {
output := input[:0]

for _, x := range input {
if len(x) > 0 {
output = append(output, x)
}
}

return output
}

func formatEntities(entities string) string{
return strings.Join(filterEmptyStrings(regexSpaceOrComma.Split(strings.TrimSpace(entities), -1)), ",")
}

func (c *Client) PerformRestore(restore *api.CassandraRestore,
backup *api.CassandraBackup) (*api.BackRestStatus, error) {
restoreOperationRequest := &icarus.RestoreOperationRequest {
Expand All @@ -37,13 +58,14 @@ func (c *Client) PerformRestore(restore *api.CassandraRestore,
SnapshotTag: backup.Spec.SnapshotTag,
NoDeleteTruncates: restore.Spec.NoDeleteTruncates,
ExactSchemaVersion: restore.Spec.ExactSchemaVersion,
RestorationPhase: "DOWNLOAD",
RestorationPhase: "INIT",
GlobalRequest: true,
Import_: &icarus.AllOfRestoreOperationRequestImport_{
Type_: "import",
SourceDir: "/var/lib/cassandra/downloadedsstables",
},
Entities: restore.Spec.Entities,
Rename: restore.Spec.Rename,
K8sSecretName: restore.Spec.Secret,
CassandraDirectory: restore.Spec.CassandraDirectory,
SchemaVersion: restore.Spec.SchemaVersion,
Expand All @@ -59,6 +81,8 @@ func (c *Client) PerformRestore(restore *api.CassandraRestore,
restoreOperationRequest.Entities = backup.Spec.Entities
}

restoreOperationRequest.Entities = formatEntities(restoreOperationRequest.Entities)

if len(restore.Spec.Secret) == 0 {
restoreOperationRequest.K8sSecretName = backup.Spec.Secret
}
Expand All @@ -85,7 +109,7 @@ func (c *Client) PerformBackup(backup *api.CassandraBackup) (string, error) {
Duration: backup.Spec.Duration,
Bandwidth: bandwidthDataRate,
ConcurrentConnections: backup.Spec.ConcurrentConnections,
Entities: backup.Spec.Entities,
Entities: formatEntities(backup.Spec.Entities),
K8sSecretName: backup.Spec.Secret,
Dc: backup.Spec.Datacenter,
GlobalRequest: true,
Expand Down
14 changes: 13 additions & 1 deletion pkg/backrest/backrest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestPerformRestore(t *testing.T) {
StorageLocation: "gcp://backup-casskop-aguitton/cassandra-bgl/dc1/cassandra-bgl-dc1-rack1-0",
SnapshotTag: "SnapshotTag1",
Secret: "cloud-backup-secrets",
Entities: "ks1,ks2",
Entities: "ks1 ks2",
},
}

Expand Down Expand Up @@ -127,4 +127,16 @@ func TestParseBandwidth(t *testing.T) {
value, err = dataRateFromBandwidth("0.25M")
assert.NotNil(err)
assert.Nil(value)
}

func TestFormatEntities(t *testing.T) {
assert := assert.New(t)
expected := "k1,k2"

assert.Equal(expected, formatEntities("k1 k2"))
assert.Equal(expected, formatEntities(" k1 k2 "))
assert.Equal(expected, formatEntities(" k1 , k2 "))
assert.Equal(expected, formatEntities(" k1,k2 "))
assert.Equal(expected, formatEntities(" k1, k2 "))
assert.Equal(expected, formatEntities(" k1,, k2, "))
}
2 changes: 0 additions & 2 deletions pkg/cassandrabackup/mock_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ func (m *mockCassandraBackupClient) PerformRestoreOperation(restoreOperation ica
operationID,
restoreOperation.K8sSecretName,
restoreOperation.StorageLocation,
restoreOperation.RestorationStrategyType,
restoreOperation.RestorationPhase,
restoreOperation.SchemaVersion), &restoreOp)

Expand All @@ -113,7 +112,6 @@ func (m *mockCassandraBackupClient) RestoreOperationByID(operationId string) (*i
operationId,
k8sSecretName,
storageLocation,
"HARDLINKS",
"TRUNCATE",
schemaVersion), &restoreOperation)
return &restoreOperation, nil
Expand Down
6 changes: 2 additions & 4 deletions pkg/cassandrabackup/operations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ func performRestoreMock(codeStatus int) (*icarus.RestoreOperationResponse, error
operationID,
restoreOperationRequest.K8sSecretName,
restoreOperationRequest.StorageLocation,
"HARDLINKS",
"DOWNLOAD",
"INIT",
restoreOperationRequest.SchemaVersion))
})

Expand All @@ -103,8 +102,7 @@ func getRestoreMock(codeStatus int) (*icarus.RestoreOperationResponse, error) {
operationID,
k8sSecretName,
storageLocation,
"HARDLINKS",
"DOWNLOAD",
"INIT",
schemaVersion))
})

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/cassandracluster/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func TestGenerateCassandraStatefulSet(t *testing.T) {
checkDefaultInitContainerResources(t, stsDefault.Spec.Template.Spec.InitContainers)
resources := generateResourceList(defaultBackRestContainerRequestsCPU, defaultBackRestContainerRequestsMemory)
checkBackRestSidecar(t, stsDefault.Spec.Template.Spec.Containers,
"",
api.DefaultBackRestImage,
"",
v1.ResourceRequirements{
Requests: resources,
Expand Down
48 changes: 25 additions & 23 deletions pkg/controller/cassandrarestore/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,7 @@ func (r ReconcileCassandraRestore) Reconcile(request reconcile.Request) (reconci
r.recorder.Event(cassandraRestore,
v1.EventTypeNormal,
"RestoreRequired",
fmt.Sprintf("Restore task required from cassandraBackup of datacenter %s of cluster %s to %s under snapshot %s. Restore operation on pod %s",
cassandraBackup.Spec.Datacenter, cassandraBackup.Spec.CassandraCluster,
cassandraBackup.Spec.StorageLocation, cassandraBackup.Spec.SnapshotTag,
cassandraRestore.Status.CoordinatorMember))
r.restoreEventMessage(cassandraBackup, ""))
return common.Reconciled()
}

Expand All @@ -119,9 +116,7 @@ func (r ReconcileCassandraRestore) Reconcile(request reconcile.Request) (reconci
cassandraRestore,
v1.EventTypeWarning,
"PerformRestoreOperationFailed",
fmt.Sprintf("Restore task from cassandraBackup of datacenter %s of cluster %s to %s under snapshot %s failed to run, will retry. Restore operation on pod %s", cassandraBackup.Spec.Datacenter, cassandraBackup.Spec.CassandraCluster,
cassandraBackup.Spec.StorageLocation, cassandraBackup.Spec.SnapshotTag,
cassandraRestore.Status.CoordinatorMember))
r.restoreEventMessage(cassandraBackup, " failed to run, will retry"))
return controllerruntime.Result{
RequeueAfter: time.Duration(15) * time.Second,
}, nil
Expand All @@ -132,10 +127,7 @@ func (r ReconcileCassandraRestore) Reconcile(request reconcile.Request) (reconci
r.recorder.Event(cassandraRestore,
v1.EventTypeNormal,
"RestoreInitiated",
fmt.Sprintf("Restore task initiated from cassandraBackup of datacenter %s of cluster %s to %s under snapshot %s. Restore operation %v on pod %s.",
cassandraBackup.Spec.Datacenter, cassandraBackup.Spec.CassandraCluster,
cassandraBackup.Spec.StorageLocation, cassandraBackup.Spec.SnapshotTag,
cassandraRestore.Status.ID, cassandraRestore.Status.CoordinatorMember))
r.restoreEventMessage(cassandraBackup, ""))

return common.Reconciled()
}
Expand All @@ -153,24 +145,31 @@ func (r ReconcileCassandraRestore) Reconcile(request reconcile.Request) (reconci
RequeueAfter: time.Duration(20) * time.Second,
}, nil
case errorfactory.CassandraBackupOperationFailure:
return controllerruntime.Result{
RequeueAfter: time.Duration(20) * time.Second,
}, nil
r.recorder.Event(cassandraRestore,
v1.EventTypeNormal,
"RestoreFailed",
r.restoreEventMessage(cassandraBackup, err.Error()))
return common.Reconciled()
default:
return common.RequeueWithError(reqLogger, err.Error(), err)
}
}
r.recorder.Event(cassandraRestore,
v1.EventTypeNormal,
"RestoreCompleted",
fmt.Sprintf("Restore task from cassandraBackup of datacenter %s of cluster %s to %s under snapshot %s is completed. Restore operation %v on pod %s.",
cassandraBackup.Spec.Datacenter, cassandraBackup.Spec.CassandraCluster,
cassandraBackup.Spec.StorageLocation, cassandraBackup.Spec.SnapshotTag,
cassandraRestore.Status.ID, cassandraRestore.Status.CoordinatorMember))
r.restoreEventMessage(cassandraBackup, ""))
}
return common.Reconciled()
}

func (r ReconcileCassandraRestore) restoreEventMessage(cassandraBackup *v1alpha1.CassandraBackup,
message string) string {
return fmt.Sprintf("Restore of backup %s of datacenter %s of cluster %s to %s " +
"under snapshot %s. %s", cassandraBackup.Name,
cassandraBackup.Spec.Datacenter, cassandraBackup.Spec.CassandraCluster, cassandraBackup.Spec.StorageLocation,
cassandraBackup.Spec.SnapshotTag, message)
}

// requiredRestore select restore coordinator on a specific member of a Cluster
func (r *ReconcileCassandraRestore) requiredRestore(restore *v1alpha1.CassandraRestore, cc *v1alpha1.CassandraCluster,
backup *v1alpha1.CassandraBackup, reqLogger *logrus.Entry) error {
Expand Down Expand Up @@ -251,7 +250,7 @@ func (r *ReconcileCassandraRestore) checkRestoreOperationState(restore *v1alpha1
reqLogger.Info("cassandra backup sidecar communication error checking running Operation", "OperationId",
restoreId)
return errorfactory.New(errorfactory.CassandraBackupSidecarNotReady{}, err,
"cassandra backup sidecar communication error")
"Icarus sidecar communication error")
}

status, err := sr.RestoreStatusByID(restoreId)
Expand All @@ -261,7 +260,7 @@ func (r *ReconcileCassandraRestore) checkRestoreOperationState(restore *v1alpha1
reqLogger.Info("cassandra backup sidecar communication error checking running Operation",
"OperationId", restoreId)
return errorfactory.New(errorfactory.CassandraBackupSidecarNotReady{}, err,
"cassandra backup sidecar communication error")
"Icarus sidecar communication error")
}

if err := UpdateRestoreStatus(r.client, restore, *status, reqLogger); err != nil {
Expand All @@ -272,10 +271,13 @@ func (r *ReconcileCassandraRestore) checkRestoreOperationState(restore *v1alpha1
restoreConditionType := v1alpha1.RestoreConditionType(restore.Status.Condition.Type)

// Restore operation failed or canceled,
// TODO : reschedule it by marking restore Condition.State = RestoreRequired ?
if restoreConditionType.IsInError() {
return errorfactory.New(errorfactory.CassandraBackupOperationFailure{}, err,
"cassandra backup sidecar Operation failed", fmt.Sprintf("restore operation id : %s", restoreId))
errorMessage := ""
if len(restore.Status.Condition.FailureCause) > 0 {
errorMessage = restore.Status.Condition.FailureCause[0].Message
}
return errorfactory.New(errorfactory.CassandraBackupOperationFailure{}, errors.New(errorMessage),
"Restore operation failed")
}

// Restore operation completed successfully
Expand Down
Loading

0 comments on commit e5f1695

Please sign in to comment.