Skip to content

Commit

Permalink
fix: snowpipe error integration (#2227)
Browse files Browse the repository at this point in the history
  • Loading branch information
sonya authored Dec 1, 2023
1 parent 8fbc5cf commit 0b388bf
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 8 deletions.
9 changes: 5 additions & 4 deletions pkg/resources/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,9 @@ func ReadPipe(d *schema.ResourceData, meta interface{}) error {
}

if strings.Contains(pipe.NotificationChannel, "arn:aws:sns:") {
err = d.Set("aws_sns_topic_arn", pipe.NotificationChannel)
return err
if err := d.Set("aws_sns_topic_arn", pipe.NotificationChannel); err != nil {
return err
}
}

if err := d.Set("error_integration", pipe.ErrorIntegration); err != nil {
Expand Down Expand Up @@ -236,10 +237,10 @@ func UpdatePipe(d *schema.ResourceData, meta interface{}) error {
if d.HasChange("error_integration") {
if errorIntegration, ok := d.GetOk("error_integration"); ok {
runSetStatement = true
pipeSet.Comment = sdk.String(errorIntegration.(string))
pipeSet.ErrorIntegration = sdk.String(errorIntegration.(string))
} else {
runUnsetStatement = true
pipeUnset.Comment = sdk.Bool(true)
pipeUnset.ErrorIntegration = sdk.Bool(true)
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sdk/pipes.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type PipeSet struct {
}

type PipeUnset struct {
ErrorIntegration *bool `ddl:"keyword" sql:"ERROR_INTEGRATION"`
PipeExecutionPaused *bool `ddl:"keyword" sql:"PIPE_EXECUTION_PAUSED"`
Comment *bool `ddl:"keyword" sql:"COMMENT"`
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/sdk/pipes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestPipesAlter(t *testing.T) {
t.Run("validation: no property to unset", func(t *testing.T) {
opts := defaultOpts()
opts.Unset = &PipeUnset{}
assertOptsInvalidJoinedErrors(t, opts, errAtLeastOneOf("AlterPipeOptions.Unset", "PipeExecutionPaused", "Comment"))
assertOptsInvalidJoinedErrors(t, opts, errAtLeastOneOf("AlterPipeOptions.Unset", "ErrorIntegration", "PipeExecutionPaused", "Comment"))
})

t.Run("set tag: single", func(t *testing.T) {
Expand Down Expand Up @@ -154,10 +154,11 @@ func TestPipesAlter(t *testing.T) {
opts := defaultOpts()
opts.IfExists = Bool(true)
opts.Unset = &PipeUnset{
ErrorIntegration: Bool(true),
PipeExecutionPaused: Bool(true),
Comment: Bool(true),
}
assertOptsValidAndSQLEquals(t, opts, `ALTER PIPE IF EXISTS %s UNSET PIPE_EXECUTION_PAUSED, COMMENT`, id.FullyQualifiedName())
assertOptsValidAndSQLEquals(t, opts, `ALTER PIPE IF EXISTS %s UNSET ERROR_INTEGRATION, PIPE_EXECUTION_PAUSED, COMMENT`, id.FullyQualifiedName())
})

t.Run("refresh", func(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sdk/pipes_validations.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func (opts *AlterPipeOptions) validate() error {
}
}
if unset := opts.Unset; valueSet(unset) {
if !anyValueSet(unset.PipeExecutionPaused, unset.Comment) {
errs = append(errs, errAtLeastOneOf("AlterPipeOptions.Unset", "PipeExecutionPaused", "Comment"))
if !anyValueSet(unset.ErrorIntegration, unset.PipeExecutionPaused, unset.Comment) {
errs = append(errs, errAtLeastOneOf("AlterPipeOptions.Unset", "ErrorIntegration", "PipeExecutionPaused", "Comment"))
}
}
return errors.Join(errs...)
Expand Down

0 comments on commit 0b388bf

Please sign in to comment.