Update Spark incremental strategies #569

Merged (2 commits) on Feb 26, 2021
@@ -156,7 +156,7 @@ the reliability of your `unique_key`, or the availability of certain features.

* [Snowflake](snowflake-configs#merge-behavior-incremental-models): `merge` (default), `delete+insert` (optional)
* [BigQuery](bigquery-configs#merge-behavior-incremental-models): `merge` (default), `insert_overwrite` (optional)
* [Spark](spark-configs#incremental-models): `append` (default), `insert_overwrite` (optional), `merge` (optional, Delta-only)

### Configuring incremental strategy

105 changes (88 additions, 17 deletions) in website/docs/reference/resource-configs/spark-configs.md

@@ -15,7 +15,7 @@ To-do:

## Configuring tables

When materializing a model as `table`, you may include several optional configs that are specific to the dbt-spark plugin, in addition to the standard [model configs](model-configs).

| Option | Description | Required? | Example |
|---------|----------------------------------------------------|-------------------------|--------------------------|
@@ -27,15 +27,86 @@ When materializing a model as `table`, you may include several optional configs:

## Incremental models

<Changelog>

- `dbt-spark==0.19.0`: Added the `append` strategy as default for all platforms, file types, and connection methods.

</Changelog>

dbt seeks to offer useful, intuitive modeling abstractions by means of its built-in configurations and materializations. Because there is so much variance between Apache Spark clusters out in the world—not to mention the powerful features offered to Databricks users by the Delta file format and custom runtime—making sense of all the available options is an undertaking in its own right.

For that reason, the dbt-spark plugin leans heavily on the [`incremental_strategy` config](configuring-incremental-models#what-is-an-incremental_strategy). This config tells the incremental materialization how to build models in runs beyond their first. It can be set to one of three values:
- **`append`** (default): Insert new records without updating or overwriting any existing data.
- **`insert_overwrite`**: If `partition_by` is specified, overwrite partitions in the table with new data. If no `partition_by` is specified, overwrite the entire table with new data.
- **`merge`** (Delta Lake only): Match records based on a `unique_key`; update old records, insert new ones. (If no `unique_key` is specified, all new data is inserted, similar to `append`.)

Each of these strategies has its pros and cons, which we'll discuss below. As with any model config, `incremental_strategy` may be specified in `dbt_project.yml` or within a model file's `config()` block.
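For example, here's a sketch of setting a folder-wide default in `dbt_project.yml` (the project and folder names are hypothetical):

<File name='dbt_project.yml'>

```yml
models:
  my_project:
    spark_models:
      # default for every incremental model in models/spark_models/
      +incremental_strategy: merge
      +file_format: delta
```

</File>

A strategy set in an individual model's `config()` block takes precedence over the project-level default.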

### The `append` strategy

Following the `append` strategy, dbt will perform an `insert into` statement with all new data. The appeal of this strategy is that it is straightforward and functional across all platforms, file types, connection methods, and Apache Spark versions. However, this strategy _cannot_ update, overwrite, or delete existing data, so it is likely to insert duplicate records for many data sources.

Specifying `append` as the incremental strategy is optional, since it's the default strategy used when none is specified.

<Tabs
defaultValue="source"
values={[
{ label: 'Source code', value: 'source', },
{ label: 'Run code', value: 'run', },
]
}>
<TabItem value="source">

<File name='spark_incremental.sql'>

```sql
{{ config(
materialized='incremental',
incremental_strategy='append',
) }}

-- All rows returned by this query will be appended to the existing table

select * from {{ ref('events') }}
{% if is_incremental() %}
where event_ts > (select max(event_ts) from {{ this }})
{% endif %}
```
</File>
</TabItem>
<TabItem value="run">

<File name='spark_incremental.sql'>

```sql
create temporary view spark_incremental__dbt_tmp as

select * from analytics.events

where event_ts > (select max(event_ts) from analytics.spark_incremental)

;

insert into table analytics.spark_incremental
select * from spark_incremental__dbt_tmp
```

</File>
</TabItem>
</Tabs>

### The `insert_overwrite` strategy

This strategy is most effective when specified alongside a `partition_by` clause in your model config. dbt will run an [atomic `insert overwrite` statement](https://spark.apache.org/docs/3.0.0-preview/sql-ref-syntax-dml-insert-overwrite-table.html) that dynamically replaces all partitions included in your query. Be sure to re-select _all_ of the relevant data for a partition when using this incremental strategy.

If no `partition_by` is specified, then the `insert_overwrite` strategy will atomically replace all contents of the table, overriding all existing data with only the new records. The column schema of the table remains the same, however. This can be desirable in some limited circumstances, since it minimizes downtime while the table contents are overwritten. The operation is comparable to running `truncate` + `insert` on other databases. For atomic replacement of Delta-formatted tables, use the `table` materialization (which runs `create or replace`) instead.

**Usage notes:**
- This strategy is not supported for tables with `file_format: delta`.
- This strategy is not available when connecting via Databricks SQL endpoints (`method: odbc` + `endpoint`).
- If connecting via a Databricks cluster + ODBC driver (`method: odbc` + `cluster`), you **must** include `set spark.sql.sources.partitionOverwriteMode DYNAMIC` in the [cluster Spark Config](https://docs.databricks.com/clusters/configure.html#spark-config) in order for dynamic partition replacement to work (`incremental_strategy: insert_overwrite` + `partition_by`).

<Lightbox src="/img/reference/databricks-cluster-sparkconfig-partition-overwrite.png" title="Databricks cluster: Spark Config" />
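Putting these notes together, here is a minimal sketch of a model using this strategy, assuming a hypothetical `date_day` partition column on an `events` source:

<File name='spark_incremental.sql'>

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partition_by=['date_day'],
    file_format='parquet'
) }}

-- Every partition returned by this query is replaced in full,
-- so re-select all of the rows belonging to each partition
select
    date_day,
    count(*) as users
from {{ ref('events') }}
{% if is_incremental() %}
  -- using >= reprocesses the most recent partition in full, so a
  -- partially loaded day is replaced rather than duplicated
  where date_day >= (select max(date_day) from {{ this }})
{% endif %}
group by 1
```

</File>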

<Tabs
defaultValue="source"
@@ -117,18 +117,18 @@ insert overwrite table analytics.spark_incremental

### The `merge` strategy

<Changelog>

- `dbt-spark==0.15.3`: Introduced `merge` incremental strategy

</Changelog>

**Usage notes:** The `merge` incremental strategy requires:
- `file_format: delta`
- Databricks Runtime 5.1 and above

dbt will run an [atomic `merge` statement](https://docs.databricks.com/spark/latest/spark-sql/language-manual/merge-into.html) which looks nearly identical to the default merge behavior on Snowflake and BigQuery. If a `unique_key` is specified (recommended), dbt will update old records with values from new records that match on the key column. If a `unique_key` is not specified, dbt will forgo match criteria and simply insert all new records (similar to the `append` strategy).
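As a sketch, a `merge` model with a `unique_key` might look like this (the `user_snapshots` source, `user_id` key, and `updated_at` column are hypothetical):

<File name='spark_incremental.sql'>

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    file_format='delta',
    unique_key='user_id'
) }}

-- Rows whose user_id matches an existing record are updated;
-- all other rows are inserted
select * from {{ ref('user_snapshots') }}
{% if is_incremental() %}
  -- only merge users whose records have changed since the last run
  where updated_at > (select max(updated_at) from {{ this }})
{% endif %}
```

</File>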

<Tabs
defaultValue="source"
@@ -218,11 +218,11 @@ or `show table extended in [database] like '*'`.

## Always `schema`, never `database`

<Changelog>

- `dbt-spark==0.17.0` ended use of `database` in all cases.

</Changelog>

Apache Spark uses the terms "schema" and "database" interchangeably. dbt understands
`database` to exist at a higher level than `schema`. As such, you should _never_
use or set `database` as a node config or in the target profile when running dbt-spark.
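In practice, that means connection profiles set `schema` only. A minimal sketch of a `profiles.yml` target (host, port, and names are illustrative):

<File name='profiles.yml'>

```yml
spark:
  target: dev
  outputs:
    dev:
      type: spark
      method: thrift
      host: 127.0.0.1
      port: 10001
      schema: analytics   # always `schema`, never `database`
```

</File>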