diff --git a/docs/resources/stream_on_directory_table.md b/docs/resources/stream_on_directory_table.md
index 490ba2eae1..ec6abfdb5e 100644
--- a/docs/resources/stream_on_directory_table.md
+++ b/docs/resources/stream_on_directory_table.md
@@ -73,6 +73,7 @@ resource "snowflake_stream_on_directory_table" "stream" {
- `id` (String) The ID of this resource.
- `show_output` (List of Object) Outputs the result of `SHOW STREAMS` for the given stream. (see [below for nested schema](#nestedatt--show_output))
- `stale` (Boolean) Indicated if the stream is stale. When Terraform detects that the stream is stale, the stream is recreated with `CREATE OR REPLACE`. Read more on stream staleness in Snowflake [docs](https://docs.snowflake.com/en/user-guide/streams-intro#data-retention-period-and-staleness).
+- `stream_type` (String) Specifies a type for the stream. This field is used for checking external changes and recreating the resources if needed.
### Nested Schema for `describe_output`
diff --git a/docs/resources/stream_on_external_table.md b/docs/resources/stream_on_external_table.md
index 3b773cbfcd..e885a93a91 100644
--- a/docs/resources/stream_on_external_table.md
+++ b/docs/resources/stream_on_external_table.md
@@ -90,6 +90,7 @@ resource "snowflake_stream_on_external_table" "stream" {
- `id` (String) The ID of this resource.
- `show_output` (List of Object) Outputs the result of `SHOW STREAMS` for the given stream. (see [below for nested schema](#nestedatt--show_output))
- `stale` (Boolean) Indicated if the stream is stale. When Terraform detects that the stream is stale, the stream is recreated with `CREATE OR REPLACE`. Read more on stream staleness in Snowflake [docs](https://docs.snowflake.com/en/user-guide/streams-intro#data-retention-period-and-staleness).
+- `stream_type` (String) Specifies a type for the stream. This field is used for checking external changes and recreating the resources if needed.
### Nested Schema for `at`
diff --git a/docs/resources/stream_on_table.md b/docs/resources/stream_on_table.md
index 3af9472bd3..99ec11a4be 100644
--- a/docs/resources/stream_on_table.md
+++ b/docs/resources/stream_on_table.md
@@ -75,6 +75,7 @@ resource "snowflake_stream_on_table" "stream" {
- `id` (String) The ID of this resource.
- `show_output` (List of Object) Outputs the result of `SHOW STREAMS` for the given stream. (see [below for nested schema](#nestedatt--show_output))
- `stale` (Boolean) Indicated if the stream is stale. When Terraform detects that the stream is stale, the stream is recreated with `CREATE OR REPLACE`. Read more on stream staleness in Snowflake [docs](https://docs.snowflake.com/en/user-guide/streams-intro#data-retention-period-and-staleness).
+- `stream_type` (String) Specifies a type for the stream. This field is used for checking external changes and recreating the resources if needed.
### Nested Schema for `at`
diff --git a/docs/resources/stream_on_view.md b/docs/resources/stream_on_view.md
index b8e5d908e7..6795301683 100644
--- a/docs/resources/stream_on_view.md
+++ b/docs/resources/stream_on_view.md
@@ -79,6 +79,7 @@ resource "snowflake_stream_on_view" "stream" {
- `id` (String) The ID of this resource.
- `show_output` (List of Object) Outputs the result of `SHOW STREAMS` for the given stream. (see [below for nested schema](#nestedatt--show_output))
- `stale` (Boolean) Indicated if the stream is stale. When Terraform detects that the stream is stale, the stream is recreated with `CREATE OR REPLACE`. Read more on stream staleness in Snowflake [docs](https://docs.snowflake.com/en/user-guide/streams-intro#data-retention-period-and-staleness).
+- `stream_type` (String) Specifies a type for the stream. This field is used for checking external changes and recreating the resources if needed.
### Nested Schema for `at`
diff --git a/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_directory_table_resource_gen.go b/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_directory_table_resource_gen.go
index 8e4cd10770..f72e461915 100644
--- a/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_directory_table_resource_gen.go
+++ b/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_directory_table_resource_gen.go
@@ -115,3 +115,8 @@ func (s *StreamOnDirectoryTableResourceAssert) HasNoStale() *StreamOnDirectoryTa
s.AddAssertion(assert.ValueNotSet("stale"))
return s
}
+
+func (s *StreamOnDirectoryTableResourceAssert) HasStreamTypeString(expected string) *StreamOnDirectoryTableResourceAssert {
+ s.AddAssertion(assert.ValueSet("stream_type", expected))
+ return s
+}
diff --git a/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_external_table_resource_gen.go b/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_external_table_resource_gen.go
index 78b963026d..6b61c0b45f 100644
--- a/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_external_table_resource_gen.go
+++ b/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_external_table_resource_gen.go
@@ -145,3 +145,8 @@ func (s *StreamOnExternalTableResourceAssert) HasNoStale() *StreamOnExternalTabl
s.AddAssertion(assert.ValueNotSet("stale"))
return s
}
+
+func (s *StreamOnExternalTableResourceAssert) HasStreamTypeString(expected string) *StreamOnExternalTableResourceAssert {
+ s.AddAssertion(assert.ValueSet("stream_type", expected))
+ return s
+}
diff --git a/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_table_resource_gen.go b/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_table_resource_gen.go
index 4175844bf4..e9fe89e6b8 100644
--- a/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_table_resource_gen.go
+++ b/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_table_resource_gen.go
@@ -155,3 +155,8 @@ func (s *StreamOnTableResourceAssert) HasNoTable() *StreamOnTableResourceAssert
s.AddAssertion(assert.ValueNotSet("table"))
return s
}
+
+func (s *StreamOnTableResourceAssert) HasStreamTypeString(expected string) *StreamOnTableResourceAssert {
+ s.AddAssertion(assert.ValueSet("stream_type", expected))
+ return s
+}
diff --git a/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_view_resource_gen.go b/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_view_resource_gen.go
index 10ba5a153f..8db3b75211 100644
--- a/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_view_resource_gen.go
+++ b/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_view_resource_gen.go
@@ -155,3 +155,8 @@ func (s *StreamOnViewResourceAssert) HasNoView() *StreamOnViewResourceAssert {
s.AddAssertion(assert.ValueNotSet("view"))
return s
}
+
+func (s *StreamOnViewResourceAssert) HasStreamTypeString(expected string) *StreamOnViewResourceAssert {
+ s.AddAssertion(assert.ValueSet("stream_type", expected))
+ return s
+}
diff --git a/pkg/acceptance/helpers/stream_client.go b/pkg/acceptance/helpers/stream_client.go
index 9403063c9c..c027299800 100644
--- a/pkg/acceptance/helpers/stream_client.go
+++ b/pkg/acceptance/helpers/stream_client.go
@@ -43,6 +43,19 @@ func (c *StreamClient) CreateOnTableWithRequest(t *testing.T, req *sdk.CreateOnT
return stream, c.DropFunc(t, req.GetName())
}
+func (c *StreamClient) CreateOnViewWithRequest(t *testing.T, req *sdk.CreateOnViewStreamRequest) (*sdk.Stream, func()) {
+ t.Helper()
+ ctx := context.Background()
+
+ err := c.client().CreateOnView(ctx, req)
+ require.NoError(t, err)
+
+ stream, err := c.client().ShowByID(ctx, req.GetName())
+ require.NoError(t, err)
+
+ return stream, c.DropFunc(t, req.GetName())
+}
+
func (c *StreamClient) Update(t *testing.T, request *sdk.AlterStreamRequest) {
t.Helper()
ctx := context.Background()
diff --git a/pkg/resources/custom_diffs.go b/pkg/resources/custom_diffs.go
index f884fc7e9b..5d2ac7de3c 100644
--- a/pkg/resources/custom_diffs.go
+++ b/pkg/resources/custom_diffs.go
@@ -228,6 +228,38 @@ func RecreateWhenSecretTypeChangedExternally(secretType sdk.SecretType) schema.C
}
}
+// RecreateWhenStreamTypeChangedExternally recreates a stream when argument streamType is different than in the state.
+func RecreateWhenStreamTypeChangedExternally(streamType sdk.StreamSourceType) schema.CustomizeDiffFunc {
+ return RecreateWhenResourceTypeChangedExternally("stream_type", streamType, sdk.ToStreamSourceType)
+}
+
+// RecreateWhenResourceTypeChangedExternally recreates a resource when argument wantType is different than the value in typeField.
+func RecreateWhenResourceTypeChangedExternally[T ~string](typeField string, wantType T, toType func(string) (T, error)) schema.CustomizeDiffFunc {
+ return func(_ context.Context, diff *schema.ResourceDiff, _ interface{}) error {
+ if n := diff.Get(typeField); n != nil {
+ logging.DebugLogger.Printf("[DEBUG] new external value for %s: %s\n", typeField, n.(string))
+
+ gotTypeRaw := n.(string)
+ // if the type is empty, the state is empty - do not recreate
+ if gotTypeRaw == "" {
+ return nil
+ }
+
+ gotType, err := toType(gotTypeRaw)
+ if err != nil {
+ return fmt.Errorf("unknown type: %w", err)
+ }
+ if gotType != wantType {
+ // we have to set here a value instead of just SetNewComputed
+ // because with empty value (default snowflake behavior for type) ForceNew fails
+ // because there are no changes (at least from the SDKv2 point of view) for typeField
+ return errors.Join(diff.SetNew(typeField, ""), diff.ForceNew(typeField))
+ }
+ }
+ return nil
+ }
+}
+
// RecreateWhenStreamIsStale detects when the stream is stale, and sets a `false` value for `stale` field.
// This means that the provider can detect that change in `stale` from `true` to `false`, where `false` is our desired state.
func RecreateWhenStreamIsStale() schema.CustomizeDiffFunc {
diff --git a/pkg/resources/stream_common.go b/pkg/resources/stream_common.go
index b928bc743e..dde23c0ea8 100644
--- a/pkg/resources/stream_common.go
+++ b/pkg/resources/stream_common.go
@@ -52,6 +52,11 @@ var streamCommonSchema = map[string]*schema.Schema{
Optional: true,
Description: "Specifies a comment for the stream.",
},
+ "stream_type": {
+ Type: schema.TypeString,
+ Computed: true,
+ Description: "Specifies a type for the stream. This field is used for checking external changes and recreating the resources if needed.",
+ },
ShowOutputAttributeName: {
Type: schema.TypeList,
Computed: true,
@@ -203,6 +208,7 @@ func handleStreamRead(d *schema.ResourceData,
) error {
return errors.Join(
d.Set("comment", stream.Comment),
+ d.Set("stream_type", stream.SourceType),
d.Set(ShowOutputAttributeName, []map[string]any{schemas.StreamToSchema(stream)}),
d.Set(DescribeOutputAttributeName, []map[string]any{schemas.StreamDescriptionToSchema(*streamDescription)}),
d.Set(FullyQualifiedNameAttributeName, id.FullyQualifiedName()),
diff --git a/pkg/resources/stream_on_directory_table.go b/pkg/resources/stream_on_directory_table.go
index f96d41ca56..9277c90616 100644
--- a/pkg/resources/stream_on_directory_table.go
+++ b/pkg/resources/stream_on_directory_table.go
@@ -41,6 +41,7 @@ func StreamOnDirectoryTable() *schema.Resource {
ComputedIfAnyAttributeChanged(streamOnDirectoryTableSchema, ShowOutputAttributeName, "stage", "comment"),
ComputedIfAnyAttributeChanged(streamOnDirectoryTableSchema, DescribeOutputAttributeName, "stage", "comment"),
RecreateWhenStreamIsStale(),
+ RecreateWhenStreamTypeChangedExternally(sdk.StreamSourceTypeStage),
),
Schema: streamOnDirectoryTableSchema,
diff --git a/pkg/resources/stream_on_directory_table_acceptance_test.go b/pkg/resources/stream_on_directory_table_acceptance_test.go
index bb82e8745c..0f61963f93 100644
--- a/pkg/resources/stream_on_directory_table_acceptance_test.go
+++ b/pkg/resources/stream_on_directory_table_acceptance_test.go
@@ -20,6 +20,7 @@ import (
"github.com/hashicorp/terraform-plugin-testing/helper/resource"
"github.com/hashicorp/terraform-plugin-testing/plancheck"
"github.com/hashicorp/terraform-plugin-testing/tfversion"
+ "github.com/stretchr/testify/require"
)
func TestAcc_StreamOnDirectoryTable_Basic(t *testing.T) {
@@ -459,3 +460,57 @@ func TestAcc_StreamOnDirectoryTable_InvalidConfiguration(t *testing.T) {
},
})
}
+
+func TestAcc_StreamOnDirectoryTable_ExternalStreamTypeChange(t *testing.T) {
+ id := acc.TestClient().Ids.RandomSchemaObjectIdentifier()
+ acc.TestAccPreCheck(t)
+ stage, cleanupStage := acc.TestClient().Stage.CreateStageWithDirectory(t)
+ t.Cleanup(cleanupStage)
+ model := model.StreamOnDirectoryTable("test", id.DatabaseName(), id.Name(), id.SchemaName(), stage.ID().FullyQualifiedName())
+
+ resource.Test(t, resource.TestCase{
+ ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories,
+ TerraformVersionChecks: []tfversion.TerraformVersionCheck{
+ tfversion.RequireAbove(tfversion.Version1_5_0),
+ },
+ CheckDestroy: acc.CheckDestroy(t, resources.StreamOnDirectoryTable),
+ Steps: []resource.TestStep{
+ {
+ Config: config.FromModel(t, model),
+ Check: resource.ComposeTestCheckFunc(
+ assert.AssertThat(t,
+ resourceassert.StreamOnDirectoryTableResource(t, model.ResourceReference()).
+ HasStreamTypeString(string(sdk.StreamSourceTypeStage)),
+ resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()).
+ HasSourceType(sdk.StreamSourceTypeStage),
+ ),
+ ),
+ },
+ // external change with a different type
+ {
+ PreConfig: func() {
+ table, cleanupTable := acc.TestClient().Table.CreateWithChangeTracking(t)
+ t.Cleanup(cleanupTable)
+ acc.TestClient().Stream.DropFunc(t, id)()
+ externalChangeStream, cleanup := acc.TestClient().Stream.CreateOnTableWithRequest(t, sdk.NewCreateOnTableStreamRequest(id, table.ID()))
+ t.Cleanup(cleanup)
+ require.Equal(t, sdk.StreamSourceTypeTable, *externalChangeStream.SourceType)
+ },
+ Config: config.FromModel(t, model),
+ ConfigPlanChecks: resource.ConfigPlanChecks{
+ PreApply: []plancheck.PlanCheck{
+ plancheck.ExpectResourceAction(model.ResourceReference(), plancheck.ResourceActionDestroyBeforeCreate),
+ },
+ },
+ Check: resource.ComposeTestCheckFunc(
+ assert.AssertThat(t,
+ resourceassert.StreamOnDirectoryTableResource(t, model.ResourceReference()).
+ HasStreamTypeString(string(sdk.StreamSourceTypeStage)),
+ resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()).
+ HasSourceType(sdk.StreamSourceTypeStage),
+ ),
+ ),
+ },
+ },
+ })
+}
diff --git a/pkg/resources/stream_on_external_table.go b/pkg/resources/stream_on_external_table.go
index 9262e3595e..73748790eb 100644
--- a/pkg/resources/stream_on_external_table.go
+++ b/pkg/resources/stream_on_external_table.go
@@ -52,6 +52,7 @@ func StreamOnExternalTable() *schema.Resource {
ComputedIfAnyAttributeChanged(streamOnExternalTableSchema, ShowOutputAttributeName, "external_table", "insert_only", "comment"),
ComputedIfAnyAttributeChanged(streamOnExternalTableSchema, DescribeOutputAttributeName, "external_table", "insert_only", "comment"),
RecreateWhenStreamIsStale(),
+ RecreateWhenStreamTypeChangedExternally(sdk.StreamSourceTypeExternalTable),
),
Schema: streamOnExternalTableSchema,
diff --git a/pkg/resources/stream_on_external_table_acceptance_test.go b/pkg/resources/stream_on_external_table_acceptance_test.go
index 292c383ee3..f3d9feb28f 100644
--- a/pkg/resources/stream_on_external_table_acceptance_test.go
+++ b/pkg/resources/stream_on_external_table_acceptance_test.go
@@ -25,6 +25,7 @@ import (
"github.com/hashicorp/terraform-plugin-testing/helper/resource"
"github.com/hashicorp/terraform-plugin-testing/plancheck"
"github.com/hashicorp/terraform-plugin-testing/tfversion"
+ "github.com/stretchr/testify/require"
)
func TestAcc_StreamOnExternalTable_Basic(t *testing.T) {
@@ -890,3 +891,62 @@ func TestAcc_StreamOnExternalTable_InvalidConfiguration(t *testing.T) {
},
})
}
+
+func TestAcc_StreamOnExternalTable_ExternalStreamTypeChange(t *testing.T) {
+ id := acc.TestClient().Ids.RandomSchemaObjectIdentifier()
+ acc.TestAccPreCheck(t)
+ stageID := acc.TestClient().Ids.RandomSchemaObjectIdentifier()
+ stageLocation := fmt.Sprintf("@%s", stageID.FullyQualifiedName())
+ _, stageCleanup := acc.TestClient().Stage.CreateStageWithURL(t, stageID)
+ t.Cleanup(stageCleanup)
+
+ externalTable, externalTableCleanup := acc.TestClient().ExternalTable.CreateWithLocation(t, stageLocation)
+ t.Cleanup(externalTableCleanup)
+ model := model.StreamOnExternalTableBase("test", id, externalTable.ID())
+
+ resource.Test(t, resource.TestCase{
+ ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories,
+ TerraformVersionChecks: []tfversion.TerraformVersionCheck{
+ tfversion.RequireAbove(tfversion.Version1_5_0),
+ },
+ CheckDestroy: acc.CheckDestroy(t, resources.StreamOnDirectoryTable),
+ Steps: []resource.TestStep{
+ {
+ Config: config.FromModel(t, model),
+ Check: resource.ComposeTestCheckFunc(
+ assert.AssertThat(t,
+ resourceassert.StreamOnExternalTableResource(t, model.ResourceReference()).
+ HasStreamTypeString(string(sdk.StreamSourceTypeExternalTable)),
+ resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()).
+ HasSourceType(sdk.StreamSourceTypeExternalTable),
+ ),
+ ),
+ },
+ // external change with a different type
+ {
+ PreConfig: func() {
+ table, cleanupTable := acc.TestClient().Table.CreateWithChangeTracking(t)
+ t.Cleanup(cleanupTable)
+ acc.TestClient().Stream.DropFunc(t, id)()
+ externalChangeStream, cleanup := acc.TestClient().Stream.CreateOnTableWithRequest(t, sdk.NewCreateOnTableStreamRequest(id, table.ID()))
+ t.Cleanup(cleanup)
+ require.Equal(t, sdk.StreamSourceTypeTable, *externalChangeStream.SourceType)
+ },
+ Config: config.FromModel(t, model),
+ ConfigPlanChecks: resource.ConfigPlanChecks{
+ PreApply: []plancheck.PlanCheck{
+ plancheck.ExpectResourceAction(model.ResourceReference(), plancheck.ResourceActionDestroyBeforeCreate),
+ },
+ },
+ Check: resource.ComposeTestCheckFunc(
+ assert.AssertThat(t,
+ resourceassert.StreamOnExternalTableResource(t, model.ResourceReference()).
+ HasStreamTypeString(string(sdk.StreamSourceTypeExternalTable)),
+ resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()).
+ HasSourceType(sdk.StreamSourceTypeExternalTable),
+ ),
+ ),
+ },
+ },
+ })
+}
diff --git a/pkg/resources/stream_on_table.go b/pkg/resources/stream_on_table.go
index 4fdcceba23..7dfaced2b1 100644
--- a/pkg/resources/stream_on_table.go
+++ b/pkg/resources/stream_on_table.go
@@ -59,6 +59,7 @@ func StreamOnTable() *schema.Resource {
ComputedIfAnyAttributeChanged(streamOnTableSchema, ShowOutputAttributeName, "table", "append_only", "comment"),
ComputedIfAnyAttributeChanged(streamOnTableSchema, DescribeOutputAttributeName, "table", "append_only", "comment"),
RecreateWhenStreamIsStale(),
+ RecreateWhenStreamTypeChangedExternally(sdk.StreamSourceTypeTable),
),
Schema: streamOnTableSchema,
diff --git a/pkg/resources/stream_on_table_acceptance_test.go b/pkg/resources/stream_on_table_acceptance_test.go
index 7f18495559..bc71c43560 100644
--- a/pkg/resources/stream_on_table_acceptance_test.go
+++ b/pkg/resources/stream_on_table_acceptance_test.go
@@ -25,6 +25,7 @@ import (
"github.com/hashicorp/terraform-plugin-testing/helper/resource"
"github.com/hashicorp/terraform-plugin-testing/plancheck"
"github.com/hashicorp/terraform-plugin-testing/tfversion"
+ "github.com/stretchr/testify/require"
)
func TestAcc_StreamOnTable_Basic(t *testing.T) {
@@ -835,3 +836,59 @@ func TestAcc_StreamOnTable_InvalidConfiguration(t *testing.T) {
},
})
}
+
+func TestAcc_StreamOnTable_ExternalStreamTypeChange(t *testing.T) {
+ id := acc.TestClient().Ids.RandomSchemaObjectIdentifier()
+ acc.TestAccPreCheck(t)
+ table, cleanupTable := acc.TestClient().Table.CreateWithChangeTracking(t)
+ t.Cleanup(cleanupTable)
+
+ model := model.StreamOnTableBase("test", id, table.ID())
+
+ resource.Test(t, resource.TestCase{
+ ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories,
+ TerraformVersionChecks: []tfversion.TerraformVersionCheck{
+ tfversion.RequireAbove(tfversion.Version1_5_0),
+ },
+ CheckDestroy: acc.CheckDestroy(t, resources.StreamOnDirectoryTable),
+ Steps: []resource.TestStep{
+ {
+ Config: config.FromModel(t, model),
+ Check: resource.ComposeTestCheckFunc(
+ assert.AssertThat(t,
+ resourceassert.StreamOnTableResource(t, model.ResourceReference()).
+ HasStreamTypeString(string(sdk.StreamSourceTypeTable)),
+ resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()).
+ HasSourceType(sdk.StreamSourceTypeTable),
+ ),
+ ),
+ },
+ // external change with a different type
+ {
+ PreConfig: func() {
+ statement := fmt.Sprintf("SELECT * FROM %s", table.ID().FullyQualifiedName())
+ view, cleanupView := acc.TestClient().View.CreateView(t, statement)
+ t.Cleanup(cleanupView)
+ acc.TestClient().Stream.DropFunc(t, id)()
+ externalChangeStream, cleanup := acc.TestClient().Stream.CreateOnViewWithRequest(t, sdk.NewCreateOnViewStreamRequest(id, view.ID()))
+ t.Cleanup(cleanup)
+ require.Equal(t, sdk.StreamSourceTypeView, *externalChangeStream.SourceType)
+ },
+ Config: config.FromModel(t, model),
+ ConfigPlanChecks: resource.ConfigPlanChecks{
+ PreApply: []plancheck.PlanCheck{
+ plancheck.ExpectResourceAction(model.ResourceReference(), plancheck.ResourceActionDestroyBeforeCreate),
+ },
+ },
+ Check: resource.ComposeTestCheckFunc(
+ assert.AssertThat(t,
+ resourceassert.StreamOnTableResource(t, model.ResourceReference()).
+ HasStreamTypeString(string(sdk.StreamSourceTypeTable)),
+ resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()).
+ HasSourceType(sdk.StreamSourceTypeTable),
+ ),
+ ),
+ },
+ },
+ })
+}
diff --git a/pkg/resources/stream_on_view.go b/pkg/resources/stream_on_view.go
index bf570fa5c3..c9e1bcb3a6 100644
--- a/pkg/resources/stream_on_view.go
+++ b/pkg/resources/stream_on_view.go
@@ -59,6 +59,7 @@ func StreamOnView() *schema.Resource {
ComputedIfAnyAttributeChanged(StreamOnViewSchema, ShowOutputAttributeName, "view", "append_only", "comment"),
ComputedIfAnyAttributeChanged(StreamOnViewSchema, DescribeOutputAttributeName, "view", "append_only", "comment"),
RecreateWhenStreamIsStale(),
+ RecreateWhenStreamTypeChangedExternally(sdk.StreamSourceTypeView),
),
Schema: StreamOnViewSchema,
diff --git a/pkg/resources/stream_on_view_acceptance_test.go b/pkg/resources/stream_on_view_acceptance_test.go
index fef512a7ef..279fdb1371 100644
--- a/pkg/resources/stream_on_view_acceptance_test.go
+++ b/pkg/resources/stream_on_view_acceptance_test.go
@@ -25,6 +25,7 @@ import (
"github.com/hashicorp/terraform-plugin-testing/helper/resource"
"github.com/hashicorp/terraform-plugin-testing/plancheck"
"github.com/hashicorp/terraform-plugin-testing/tfversion"
+ "github.com/stretchr/testify/require"
)
func TestAcc_StreamOnView_Basic(t *testing.T) {
@@ -861,3 +862,61 @@ func TestAcc_StreamOnView_InvalidConfiguration(t *testing.T) {
},
})
}
+
+func TestAcc_StreamOnView_ExternalStreamTypeChange(t *testing.T) {
+ id := acc.TestClient().Ids.RandomSchemaObjectIdentifier()
+ acc.TestAccPreCheck(t)
+ table, cleanupTable := acc.TestClient().Table.CreateWithChangeTracking(t)
+ t.Cleanup(cleanupTable)
+ statement := fmt.Sprintf("SELECT * FROM %s", table.ID().FullyQualifiedName())
+ view, cleanupView := acc.TestClient().View.CreateView(t, statement)
+ t.Cleanup(cleanupView)
+
+ model := model.StreamOnView("test", id.DatabaseName(), id.Name(), id.SchemaName(), view.ID().FullyQualifiedName())
+
+ resource.Test(t, resource.TestCase{
+ ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories,
+ TerraformVersionChecks: []tfversion.TerraformVersionCheck{
+ tfversion.RequireAbove(tfversion.Version1_5_0),
+ },
+ CheckDestroy: acc.CheckDestroy(t, resources.StreamOnDirectoryTable),
+ Steps: []resource.TestStep{
+ {
+ Config: config.FromModel(t, model),
+ Check: resource.ComposeTestCheckFunc(
+ assert.AssertThat(t,
+ resourceassert.StreamOnViewResource(t, model.ResourceReference()).
+ HasStreamTypeString(string(sdk.StreamSourceTypeView)),
+ resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()).
+ HasSourceType(sdk.StreamSourceTypeView),
+ ),
+ ),
+ },
+ // external change with a different type
+ {
+ PreConfig: func() {
+ table, cleanupTable := acc.TestClient().Table.CreateWithChangeTracking(t)
+ t.Cleanup(cleanupTable)
+ acc.TestClient().Stream.DropFunc(t, id)()
+ externalChangeStream, cleanup := acc.TestClient().Stream.CreateOnTableWithRequest(t, sdk.NewCreateOnTableStreamRequest(id, table.ID()))
+ t.Cleanup(cleanup)
+ require.Equal(t, sdk.StreamSourceTypeTable, *externalChangeStream.SourceType)
+ },
+ Config: config.FromModel(t, model),
+ ConfigPlanChecks: resource.ConfigPlanChecks{
+ PreApply: []plancheck.PlanCheck{
+ plancheck.ExpectResourceAction(model.ResourceReference(), plancheck.ResourceActionDestroyBeforeCreate),
+ },
+ },
+ Check: resource.ComposeTestCheckFunc(
+ assert.AssertThat(t,
+ resourceassert.StreamOnViewResource(t, model.ResourceReference()).
+ HasStreamTypeString(string(sdk.StreamSourceTypeView)),
+ resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()).
+ HasSourceType(sdk.StreamSourceTypeView),
+ ),
+ ),
+ },
+ },
+ })
+}
diff --git a/pkg/sdk/streams_dto_gen.go b/pkg/sdk/streams_dto_gen.go
index 25201a7d11..d657571efc 100644
--- a/pkg/sdk/streams_dto_gen.go
+++ b/pkg/sdk/streams_dto_gen.go
@@ -79,6 +79,10 @@ type CreateOnViewStreamRequest struct {
Comment *string
}
+func (r *CreateOnViewStreamRequest) GetName() SchemaObjectIdentifier {
+ return r.name
+}
+
type CloneStreamRequest struct {
OrReplace *bool
name SchemaObjectIdentifier // required