Skip to content

Commit

Permalink
APIT-2476: revert (#477)
Browse files Browse the repository at this point in the history
  • Loading branch information
tmalikconfluent authored Oct 25, 2024
1 parent 056ee56 commit bafad00
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 80 deletions.
1 change: 0 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

**New features:**
* Added support for the new `latest_offsets` and `latest_offsets_timestamp` attributes of `confluent_flink_statement` [resource](https://registry.terraform.io/providers/confluentinc/confluent/latest/docs/resources/confluent_flink_statement) in a [General Availability lifecycle stage](https://docs.confluent.io/cloud/current/api.html#section/Versioning/API-Lifecycle-Policy).
* Added support for the new `properties_sensitive` block of `confluent_flink_statement` [resource](https://registry.terraform.io/providers/confluentinc/confluent/latest/docs/resources/confluent_flink_statement) in a [General Availability lifecycle stage](https://docs.confluent.io/cloud/current/api.html#section/Versioning/API-Lifecycle-Policy) to resolve [#397](/~https://github.com/confluentinc/terraform-provider-confluent/issues/397).
* Added support for the new `versions` block of `confluent_flink_artifact` [resource](https://registry.terraform.io/providers/confluentinc/confluent/latest/docs/resources/confluent_flink_artifact) and [data source](https://registry.terraform.io/providers/confluentinc/confluent/latest/docs/data-sources/confluent_flink_artifact) in a [General Availability lifecycle stage](https://docs.confluent.io/cloud/current/api.html#section/Versioning/API-Lifecycle-Policy).

**Examples:**
Expand Down
20 changes: 0 additions & 20 deletions docs/resources/confluent_flink_statement.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,6 @@ resource "confluent_flink_statement" "example" {
}
```

Example of `confluent_flink_statement` that creates a model:
```
resource "confluent_flink_statement" "example" {
statement = "CREATE MODEL `vector_encoding` INPUT (input STRING) OUTPUT (vector ARRAY<FLOAT>) WITH( 'TASK' = 'classification','PROVIDER' = 'OPENAI','OPENAI.ENDPOINT' = 'https://api.openai.com/v1/embeddings','OPENAI.API_KEY' = '{{sessionconfig/sql.secrets.openaikey}}');"
properties = {
"sql.current-catalog" = var.confluent_environment_display_name
"sql.current-database" = var.confluent_kafka_cluster_display_name
}
properties_sensitive = {
"sql.secrets.openaikey" : "***REDACTED***"
}
lifecycle {
prevent_destroy = true
}
}
```

<!-- schema generated by tfplugindocs -->
## Argument Reference

Expand Down Expand Up @@ -124,9 +107,6 @@ The following arguments are supported:
- `properties` - (Optional Map) The custom topic settings to set:
- `name` - (Required String) The setting name, for example, `sql.local-time-zone`.
- `value` - (Required String) The setting value, for example, `GMT-08:00`.
- `properties_sensitive` - (Optional Map) Block for sensitive statement properties:
- `name` - (Required String) The setting name, for example, `sql.secrets.openaikey`.
- `value` - (Required String) The setting value, for example, `s1234`.

- `stopped` - (Optional Boolean) The boolean flag to control whether the running Flink Statement should be stopped. Defaults to `false`. Update it to `true` to stop the statement. Subsequently, update it to `false` to resume the statement.

Expand Down
59 changes: 5 additions & 54 deletions internal/provider/resource_flink_statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ import (
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
"github.com/samber/lo"
"net/http"
"regexp"
"strings"
"time"
)

Expand All @@ -35,7 +33,6 @@ const (
paramStatement = "statement"
paramComputePool = "compute_pool"
paramProperties = "properties"
paramPropertiesSensitive = "properties_sensitive"
paramStopped = "stopped"
paramLatestOffsets = "latest_offsets"
paramLatestOffsetsTimestamp = "latest_offsets_timestamp"
Expand Down Expand Up @@ -84,16 +81,6 @@ func flinkStatementResource() *schema.Resource {
Optional: true,
Computed: true,
},
paramPropertiesSensitive: {
Type: schema.TypeMap,
Elem: &schema.Schema{
Type: schema.TypeString,
},
Sensitive: true,
Optional: true,
Computed: true,
ForceNew: false,
},
paramStopped: {
Type: schema.TypeBool,
Optional: true,
Expand Down Expand Up @@ -164,14 +151,11 @@ func flinkStatementCreate(ctx context.Context, d *schema.ResourceData, meta inte
}

statement := d.Get(paramStatement).(string)

mergedProperties, sensitiveProperties, _ := extractFlinkProperties(d)

tflog.Debug(ctx, fmt.Sprintf("SENSITIVE VALUES: %s", sensitiveProperties))
properties := convertToStringStringMap(d.Get(paramProperties).(map[string]interface{}))

spec := fgb.NewSqlV1StatementSpec()
spec.SetStatement(statement)
spec.SetProperties(mergedProperties)
spec.SetProperties(properties)
spec.SetComputePoolId(computePoolId)
spec.SetPrincipal(principalId)

Expand All @@ -183,11 +167,6 @@ func flinkStatementCreate(ctx context.Context, d *schema.ResourceData, meta inte
if err != nil {
return diag.Errorf("error creating Flink Statement: error marshaling %#v to json: %s", createFlinkStatementRequest, createDescriptiveError(err))
}

if err := d.Set(paramPropertiesSensitive, sensitiveProperties); err != nil {
return diag.FromErr(createDescriptiveError(err))
}

tflog.Debug(ctx, fmt.Sprintf("Creating new Flink Statement: %s", createFlinkStatementRequestJson))

createdFlinkStatement, _, err := executeFlinkStatementCreate(flinkRestClient.apiContext(ctx), flinkRestClient, createFlinkStatementRequest)
Expand Down Expand Up @@ -260,8 +239,8 @@ func flinkStatementUpdate(ctx context.Context, d *schema.ResourceData, meta inte
// Updating anything else is not supported at this moment
// stopped: false -> true to trigger flinkStatementStop
// stopped: true -> false to trigger flinkStatementResume
if d.HasChangesExcept(paramStopped, paramPropertiesSensitive) {
return diag.Errorf("error updating Flink Statement %q: only %q and %q attribute can be updated for Flink Statement", d.Id(), paramStopped, paramPropertiesSensitive)
if d.HasChangeExcept(paramStopped) {
return diag.Errorf(`error updating Flink Statement %q: only %q attribute can be updated for Flink Statement, "true" -> "false" to trigger resuming, "false" -> "true" to trigger stopping`, d.Id(), paramStopped)
}

if d.Get(paramStopped).(bool) == false {
Expand Down Expand Up @@ -380,7 +359,7 @@ func setFlinkStatementAttributes(d *schema.ResourceData, c *FlinkRestClient, sta
if err := d.Set(paramStatement, statement.Spec.GetStatement()); err != nil {
return nil, err
}
if err := d.Set(paramProperties, extractNonsensitiveProperties(statement.Spec.GetProperties())); err != nil {
if err := d.Set(paramProperties, statement.Spec.GetProperties()); err != nil {
return nil, err
}
if err := d.Set(paramStopped, statement.Spec.GetStopped()); err != nil {
Expand Down Expand Up @@ -418,21 +397,6 @@ func setFlinkStatementAttributes(d *schema.ResourceData, c *FlinkRestClient, sta
return d, nil
}

// extractNonsensitiveProperties returns a copy of properties with every
// sensitive setting removed, where "sensitive" means the key starts with
// "sql.secrets". Sensitive values are dropped so they are never written
// to Terraform state. The input map is not modified.
func extractNonsensitiveProperties(properties map[string]string) map[string]string {
	filtered := make(map[string]string, len(properties))

	for name, value := range properties {
		// Keep only settings that are safe to persist in TF state.
		if !strings.HasPrefix(name, "sql.secrets") {
			filtered[name] = value
		}
	}

	return filtered
}

func flinkStatementDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
tflog.Debug(ctx, fmt.Sprintf("Deleting Flink Statement %q", d.Id()), map[string]interface{}{flinkStatementLoggingKey: d.Id()})

Expand Down Expand Up @@ -650,19 +614,6 @@ func extractFlinkPrincipalId(client *Client, d *schema.ResourceData, isImportOpe
return "", fmt.Errorf("one of provider.flink_principal_id (defaults to FLINK_PRINCIPAL_ID environment variable) or resource.principal.id must be set")
}

// extractFlinkProperties reads both property maps from the resource data and
// returns (merged, sensitive, nonsensitive). The merged map contains all
// settings, with sensitive settings taking precedence on key collision —
// the same precedence as merging nonsensitive first, sensitive last.
func extractFlinkProperties(d *schema.ResourceData) (map[string]string, map[string]string, map[string]string) {
	sensitive := convertToStringStringMap(d.Get(paramPropertiesSensitive).(map[string]interface{}))
	nonsensitive := convertToStringStringMap(d.Get(paramProperties).(map[string]interface{}))

	// Build the combined view: copy the non-sensitive settings first, then
	// overlay the sensitive ones so they win on duplicate keys.
	merged := make(map[string]string, len(nonsensitive)+len(sensitive))
	for key, value := range nonsensitive {
		merged[key] = value
	}
	for key, value := range sensitive {
		merged[key] = value
	}

	return merged, sensitive, nonsensitive
}

// createFlinkStatementId builds the composite Terraform resource ID for a
// Flink statement in the form "<environmentId>/<computePoolId>/<statementName>".
func createFlinkStatementId(environmentId, computePoolId, statementName string) string {
	// fmt.Sprint concatenates adjacent string operands without inserting
	// spaces, so this yields the slash-separated triple directly.
	return fmt.Sprint(environmentId, "/", computePoolId, "/", statementName)
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ func TestAccFlinkStatementWithEnhancedProviderBlock(t *testing.T) {
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "latest_offsets_timestamp", latestOffsetsTimestampEmptyValueTest),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "properties.%", "1"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, fmt.Sprintf("properties.%s", flinkFirstPropertyKeyTest), flinkFirstPropertyValueTest),
resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "sql.secrets.openaikey"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.#", "0"),
resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.key"),
resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.secret"),
Expand Down Expand Up @@ -235,7 +234,6 @@ func TestAccFlinkStatementWithEnhancedProviderBlock(t *testing.T) {
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "latest_offsets_timestamp", latestOffsetsTimestampStoppedValueTest),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "properties.%", "1"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, fmt.Sprintf("properties.%s", flinkFirstPropertyKeyTest), flinkFirstPropertyValueTest),
resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "sql.secrets.openaikey"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.#", "0"),
resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.key"),
resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.secret"),
Expand Down
3 changes: 0 additions & 3 deletions internal/provider/resource_flink_statement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ func TestAccFlinkStatement(t *testing.T) {
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "latest_offsets_timestamp", latestOffsetsTimestampEmptyValueTest),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "properties.%", "1"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, fmt.Sprintf("properties.%s", flinkFirstPropertyKeyTest), flinkFirstPropertyValueTest),
resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "sql.secrets.openaikey"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.#", "1"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.%", "2"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.key", kafkaApiKey),
Expand Down Expand Up @@ -270,7 +269,6 @@ func TestAccFlinkStatement(t *testing.T) {
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "latest_offsets_timestamp", latestOffsetsTimestampStoppedValueTest),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "properties.%", "1"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, fmt.Sprintf("properties.%s", flinkFirstPropertyKeyTest), flinkFirstPropertyValueTest),
resource.TestCheckNoResourceAttr(fullFlinkStatementResourceLabel, "sql.secrets.openaikey"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.#", "1"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.%", "2"),
resource.TestCheckResourceAttr(fullFlinkStatementResourceLabel, "credentials.0.key", kafkaApiKey),
Expand Down Expand Up @@ -329,7 +327,6 @@ func testAccCheckFlinkStatement(confluentCloudBaseUrl, mockServerUrl string) str
properties = {
"%s" = "%s"
}
}
`, confluentCloudBaseUrl, flinkStatementResourceLabel, kafkaApiKey, kafkaApiSecret, mockServerUrl, flinkPrincipalIdTest,
flinkOrganizationIdTest, flinkEnvironmentIdTest, flinkComputePoolIdTest,
Expand Down

0 comments on commit bafad00

Please sign in to comment.