diff --git a/pkg/resources/pipe.go b/pkg/resources/pipe.go index d47ea05736..1676d31d56 100644 --- a/pkg/resources/pipe.go +++ b/pkg/resources/pipe.go @@ -200,8 +200,9 @@ func ReadPipe(d *schema.ResourceData, meta interface{}) error { } if strings.Contains(pipe.NotificationChannel, "arn:aws:sns:") { - err = d.Set("aws_sns_topic_arn", pipe.NotificationChannel) - return err + if err := d.Set("aws_sns_topic_arn", pipe.NotificationChannel); err != nil { + return err + } } if err := d.Set("error_integration", pipe.ErrorIntegration); err != nil { @@ -236,10 +237,10 @@ func UpdatePipe(d *schema.ResourceData, meta interface{}) error { if d.HasChange("error_integration") { if errorIntegration, ok := d.GetOk("error_integration"); ok { runSetStatement = true - pipeSet.Comment = sdk.String(errorIntegration.(string)) + pipeSet.ErrorIntegration = sdk.String(errorIntegration.(string)) } else { runUnsetStatement = true - pipeUnset.Comment = sdk.Bool(true) + pipeUnset.ErrorIntegration = sdk.Bool(true) } } diff --git a/pkg/sdk/pipes.go b/pkg/sdk/pipes.go index 928d01992a..6306b3f3db 100644 --- a/pkg/sdk/pipes.go +++ b/pkg/sdk/pipes.go @@ -56,6 +56,7 @@ type PipeSet struct { } type PipeUnset struct { + ErrorIntegration *bool `ddl:"keyword" sql:"ERROR_INTEGRATION"` PipeExecutionPaused *bool `ddl:"keyword" sql:"PIPE_EXECUTION_PAUSED"` Comment *bool `ddl:"keyword" sql:"COMMENT"` } diff --git a/pkg/sdk/pipes_test.go b/pkg/sdk/pipes_test.go index f6ccd7fdeb..489f2b8023 100644 --- a/pkg/sdk/pipes_test.go +++ b/pkg/sdk/pipes_test.go @@ -93,7 +93,7 @@ func TestPipesAlter(t *testing.T) { t.Run("validation: no property to unset", func(t *testing.T) { opts := defaultOpts() opts.Unset = &PipeUnset{} - assertOptsInvalidJoinedErrors(t, opts, errAtLeastOneOf("AlterPipeOptions.Unset", "PipeExecutionPaused", "Comment")) + assertOptsInvalidJoinedErrors(t, opts, errAtLeastOneOf("AlterPipeOptions.Unset", "ErrorIntegration", "PipeExecutionPaused", "Comment")) }) t.Run("set tag: single", func(t *testing.T) { @@ -154,10 +154,11 @@ func TestPipesAlter(t *testing.T) { opts := defaultOpts() opts.IfExists = Bool(true) opts.Unset = &PipeUnset{ + ErrorIntegration: Bool(true), PipeExecutionPaused: Bool(true), Comment: Bool(true), } - assertOptsValidAndSQLEquals(t, opts, `ALTER PIPE IF EXISTS %s UNSET PIPE_EXECUTION_PAUSED, COMMENT`, id.FullyQualifiedName()) + assertOptsValidAndSQLEquals(t, opts, `ALTER PIPE IF EXISTS %s UNSET ERROR_INTEGRATION, PIPE_EXECUTION_PAUSED, COMMENT`, id.FullyQualifiedName()) }) t.Run("refresh", func(t *testing.T) { diff --git a/pkg/sdk/pipes_validations.go b/pkg/sdk/pipes_validations.go index f87272f466..fe40944e94 100644 --- a/pkg/sdk/pipes_validations.go +++ b/pkg/sdk/pipes_validations.go @@ -49,8 +49,8 @@ func (opts *AlterPipeOptions) validate() error { } } if unset := opts.Unset; valueSet(unset) { - if !anyValueSet(unset.PipeExecutionPaused, unset.Comment) { - errs = append(errs, errAtLeastOneOf("AlterPipeOptions.Unset", "PipeExecutionPaused", "Comment")) + if !anyValueSet(unset.ErrorIntegration, unset.PipeExecutionPaused, unset.Comment) { + errs = append(errs, errAtLeastOneOf("AlterPipeOptions.Unset", "ErrorIntegration", "PipeExecutionPaused", "Comment")) } } return errors.Join(errs...)