From d99ed623bc3bec816812e3fe294309404b192329 Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Wed, 18 Nov 2020 16:59:59 +0100
Subject: [PATCH 1/9] Enable create or replace sql syntax

With Delta we can atomically replace the current version of the table
with a new version, using the create or replace syntax.
---
 dbt/adapters/spark/column.py                        | 3 ++-
 dbt/adapters/spark/impl.py                          | 7 +++++--
 dbt/adapters/spark/relation.py                      | 3 +++
 dbt/include/spark/macros/adapters.sql               | 6 +++++-
 dbt/include/spark/macros/materializations/table.sql | 4 +++-
 5 files changed, 18 insertions(+), 5 deletions(-)

diff --git a/dbt/adapters/spark/column.py b/dbt/adapters/spark/column.py
index 2f5e851dc..6512efed9 100644
--- a/dbt/adapters/spark/column.py
+++ b/dbt/adapters/spark/column.py
@@ -2,6 +2,7 @@
 from typing import TypeVar, Optional, Dict, Any
 
 from dbt.adapters.base.column import Column
+from hologram import JsonDict
 
 Self = TypeVar('Self', bound='SparkColumn')
 
@@ -54,7 +55,7 @@ def convert_table_stats(raw_stats: Optional[str]) -> Dict[str, Any]:
                 table_stats[f'stats:{key}:include'] = True
         return table_stats
 
-    def to_dict(self, omit_none=False):
+    def to_dict(self, omit_none: bool = True, validate: bool = False) -> JsonDict:
         original_dict = super().to_dict(omit_none=omit_none)
         # If there are stats, merge them into the root of the dict
         original_stats = original_dict.pop('table_stats')
diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py
index 15f89f7df..86955eb76 100644
--- a/dbt/adapters/spark/impl.py
+++ b/dbt/adapters/spark/impl.py
@@ -2,6 +2,7 @@
 from dataclasses import dataclass
 from typing import Optional, List, Dict, Any, Union, Iterable
 
 import agate
+from dbt.contracts.relation import RelationType
 
 import dbt
 import dbt.exceptions
@@ -131,11 +132,13 @@ def list_relations_without_caching(
                     f'got {len(row)} values, expected 4'
                 )
             _schema, name, _, information = row
-            rel_type = ('view' if 'Type: VIEW' in information else 'table')
+            rel_type = (RelationType.View if 'Type: VIEW' in information else RelationType.Table)
+            is_delta = 'Provider: delta' in information
             relation = self.Relation.create(
                 schema=_schema,
                 identifier=name,
-                type=rel_type
+                type=rel_type,
+                is_delta=is_delta
             )
             relations.append(relation)
diff --git a/dbt/adapters/spark/relation.py b/dbt/adapters/spark/relation.py
index 4aa06f820..507f51d3b 100644
--- a/dbt/adapters/spark/relation.py
+++ b/dbt/adapters/spark/relation.py
@@ -1,3 +1,5 @@
+from typing import Optional
+
 from dataclasses import dataclass
 
 from dbt.adapters.base.relation import BaseRelation, Policy
@@ -23,6 +25,7 @@ class SparkRelation(BaseRelation):
     quote_policy: SparkQuotePolicy = SparkQuotePolicy()
     include_policy: SparkIncludePolicy = SparkIncludePolicy()
     quote_character: str = '`'
+    is_delta: Optional[bool] = None
 
     def __post_init__(self):
         if self.database != self.schema and self.database:
diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql
index 02253fe5e..a45b0d1ae 100644
--- a/dbt/include/spark/macros/adapters.sql
+++ b/dbt/include/spark/macros/adapters.sql
@@ -77,7 +77,11 @@
   {% if temporary -%}
     {{ create_temporary_view(relation, sql) }}
   {%- else -%}
-    create table {{ relation }}
+    {% if config.get('file_format', validator=validation.any[basestring]) == 'delta' %}
+      create or replace table {{ relation }}
+    {% else %}
+      create table {{ relation }}
+    {% endif %}
     {{ file_format_clause() }}
     {{ partition_cols(label="partitioned by") }}
    {{ clustered_cols(label="clustered by") }}
diff --git a/dbt/include/spark/macros/materializations/table.sql b/dbt/include/spark/macros/materializations/table.sql
index d772a5548..adfdb7a3c 100644
--- a/dbt/include/spark/macros/materializations/table.sql
+++ b/dbt/include/spark/macros/materializations/table.sql
@@ -11,7 +11,9 @@
   {{ run_hooks(pre_hooks) }}
 
   -- setup: if the target relation already exists, drop it
-  {% if old_relation -%}
+  -- if both the existing and the future table are delta, do a create or
+  -- replace instead of a drop, so the table never becomes unavailable
+  {% if old_relation and not (old_relation.is_delta and config.get('file_format', validator=validation.any[basestring]) == 'delta') -%}
    {{ adapter.drop_relation(old_relation) }}
   {%- endif %}
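For context, a minimal sketch (not part of the patch) of what the new branch in spark__create_table_as buys us on a Delta-backed warehouse; it assumes a running SparkSession named spark with the Delta extensions enabled:

    # Replacing a Delta table writes a new table version atomically,
    # so concurrent readers never observe a missing table...
    spark.sql("""
        create or replace table my_schema.my_table
        using delta
        as select 1 as id
    """)

    # ...and the previous versions stay visible in the table history.
    spark.sql("describe history my_schema.my_table").show()

A plain create table after a drop table, by contrast, leaves a window in which the table does not exist at all.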
From 880db896cf6f4759ead578c655e2956053162892 Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Wed, 18 Nov 2020 19:54:49 +0100
Subject: [PATCH 2/9] Trim the whitespace

---
 test/unit/test_macros.py | 71 ++++++++++++++++------------------------
 1 file changed, 28 insertions(+), 43 deletions(-)

diff --git a/test/unit/test_macros.py b/test/unit/test_macros.py
index 325e80b6c..e8d51e703 100644
--- a/test/unit/test_macros.py
+++ b/test/unit/test_macros.py
@@ -8,104 +8,86 @@ class TestSparkMacros(unittest.TestCase):
 
     def setUp(self):
         self.jinja_env = Environment(loader=FileSystemLoader('dbt/include/spark/macros'),
-                                     extensions=['jinja2.ext.do',])
+                                     extensions=['jinja2.ext.do', ])
 
         self.config = {}
-
-        self.default_context = {}
-        self.default_context['validation'] = mock.Mock()
-        self.default_context['model'] = mock.Mock()
-        self.default_context['exceptions'] = mock.Mock()
-        self.default_context['config'] = mock.Mock()
+        self.default_context = {
+            'validation': mock.Mock(),
+            'model': mock.Mock(),
+            'exceptions': mock.Mock(),
+            'config': mock.Mock()
+        }
         self.default_context['config'].get = lambda key, default=None, **kwargs: self.config.get(key, default)
 
-
     def __get_template(self, template_filename):
         return self.jinja_env.get_template(template_filename, globals=self.default_context)
 
-
     def __run_macro(self, template, name, temporary, relation, sql):
         self.default_context['model'].alias = relation
         value = getattr(template.module, name)(temporary, relation, sql)
         return re.sub(r'\s\s+', ' ', value)
 
-
     def test_macros_load(self):
         self.jinja_env.get_template('adapters.sql')
 
-
     def test_macros_create_table_as(self):
         template = self.__get_template('adapters.sql')
+        sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip()
 
-        self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'),
-                         "create table my_table as select 1")
-
+        self.assertEqual(sql, "create table my_table as select 1")
 
     def test_macros_create_table_as_file_format(self):
         template = self.__get_template('adapters.sql')
-
         self.config['file_format'] = 'delta'
-        self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'),
-                         "create table my_table using delta as select 1")
-
+        sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip()
+        self.assertEqual(sql, "create table my_table using delta as select 1")
 
     def test_macros_create_table_as_partition(self):
         template = self.__get_template('adapters.sql')
-
         self.config['partition_by'] = 'partition_1'
-        self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'),
-                         "create table my_table partitioned by (partition_1) as select 1")
-
+        sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip()
+        self.assertEqual(sql, "create table my_table partitioned by (partition_1) as select 1")
 
     def test_macros_create_table_as_partitions(self):
         template = self.__get_template('adapters.sql')
-
         self.config['partition_by'] = ['partition_1', 'partition_2']
-        self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'),
+        sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip()
+        self.assertEqual(sql,
                          "create table my_table partitioned by (partition_1,partition_2) as select 1")
 
-
     def test_macros_create_table_as_cluster(self):
         template = self.__get_template('adapters.sql')
-
         self.config['clustered_by'] = 'cluster_1'
         self.config['buckets'] = '1'
-        self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'),
-                         "create table my_table clustered by (cluster_1) into 1 buckets as select 1")
-
+        sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip()
+        self.assertEqual(sql, "create table my_table clustered by (cluster_1) into 1 buckets as select 1")
 
     def test_macros_create_table_as_clusters(self):
         template = self.__get_template('adapters.sql')
-
         self.config['clustered_by'] = ['cluster_1', 'cluster_2']
         self.config['buckets'] = '1'
-        self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'),
-                         "create table my_table clustered by (cluster_1,cluster_2) into 1 buckets as select 1")
-
+        sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip()
+        self.assertEqual(sql, "create table my_table clustered by (cluster_1,cluster_2) into 1 buckets as select 1")
 
     def test_macros_create_table_as_location(self):
         template = self.__get_template('adapters.sql')
-
         self.config['location_root'] = '/mnt/root'
-        self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'),
-                         "create table my_table location '/mnt/root/my_table' as select 1")
-
+        sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip()
+        self.assertEqual(sql, "create table my_table location '/mnt/root/my_table' as select 1")
 
     def test_macros_create_table_as_comment(self):
         template = self.__get_template('adapters.sql')
-
         self.config['persist_docs'] = {'relation': True}
         self.default_context['model'].description = 'Description Test'
-        self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'),
-                         "create table my_table comment 'Description Test' as select 1")
-
+        sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip()
+        self.assertEqual(sql, "create table my_table comment 'Description Test' as select 1")
 
     def test_macros_create_table_as_all(self):
         template = self.__get_template('adapters.sql')
@@ -118,5 +100,8 @@ def test_macros_create_table_as_all(self):
         self.config['persist_docs'] = {'relation': True}
         self.default_context['model'].description = 'Description Test'
 
-        self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'),
-                         "create table my_table using delta partitioned by (partition_1,partition_2) clustered by (cluster_1,cluster_2) into 1 buckets location '/mnt/root/my_table' comment 'Description Test' as select 1")
+        sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip()
+        self.assertEqual(
+            sql,
+            "create table my_table using delta partitioned by (partition_1,partition_2) clustered by (cluster_1,cluster_2) into 1 buckets location '/mnt/root/my_table' comment 'Description Test' as select 1"
+        )
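These tests exercise the macros without any live Spark connection. A condensed, self-contained sketch of the pattern, lifted from the test file above (paths assume the repository root as the working directory):

    import re
    from unittest import mock
    from jinja2 import Environment, FileSystemLoader

    env = Environment(loader=FileSystemLoader('dbt/include/spark/macros'),
                      extensions=['jinja2.ext.do'])
    config = {'file_format': 'delta'}
    context = {'validation': mock.Mock(), 'model': mock.Mock(),
               'exceptions': mock.Mock(), 'config': mock.Mock()}
    # stub dbt's config.get so the macros read from a plain dict
    context['config'].get = lambda key, default=None, **kwargs: config.get(key, default)

    template = env.get_template('adapters.sql', globals=context)
    context['model'].alias = 'my_table'
    sql = template.module.spark__create_table_as(False, 'my_table', 'select 1')
    print(re.sub(r'\s\s+', ' ', sql).strip())

Rendering the macro as a Jinja module and mocking the dbt context keeps the unit tests fast and independent of a warehouse.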
From b5b69366f1906b7cccc9c14f6afd8446cd04e54b Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Wed, 18 Nov 2020 20:06:16 +0100
Subject: [PATCH 3/9] Fix the test to create or replace

---
 test/unit/test_macros.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/test/unit/test_macros.py b/test/unit/test_macros.py
index e8d51e703..5c5e3f8cf 100644
--- a/test/unit/test_macros.py
+++ b/test/unit/test_macros.py
@@ -41,7 +41,7 @@ def test_macros_create_table_as_file_format(self):
 
         self.config['file_format'] = 'delta'
         sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip()
-        self.assertEqual(sql, "create table my_table using delta as select 1")
+        self.assertEqual(sql, "create or replace table my_table using delta as select 1")
 
     def test_macros_create_table_as_partition(self):
         template = self.__get_template('adapters.sql')
@@ -103,5 +103,5 @@ def test_macros_create_table_as_all(self):
         sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip()
         self.assertEqual(
             sql,
-            "create table my_table using delta partitioned by (partition_1,partition_2) clustered by (cluster_1,cluster_2) into 1 buckets location '/mnt/root/my_table' comment 'Description Test' as select 1"
+            "create or replace table my_table using delta partitioned by (partition_1,partition_2) clustered by (cluster_1,cluster_2) into 1 buckets location '/mnt/root/my_table' comment 'Description Test' as select 1"
         )

From 8cb6be56174edf6cc8d4c5c720b5d05a468de165 Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Wed, 18 Nov 2020 20:22:00 +0100
Subject: [PATCH 4/9] Make flake8 happy

---
 dbt/adapters/spark/column.py | 4 +++-
 dbt/adapters/spark/impl.py   | 3 ++-
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/dbt/adapters/spark/column.py b/dbt/adapters/spark/column.py
index 6512efed9..d8292f6ef 100644
--- a/dbt/adapters/spark/column.py
+++ b/dbt/adapters/spark/column.py
@@ -55,7 +55,9 @@ def convert_table_stats(raw_stats: Optional[str]) -> Dict[str, Any]:
                 table_stats[f'stats:{key}:include'] = True
         return table_stats
 
-    def to_dict(self, omit_none: bool = True, validate: bool = False) -> JsonDict:
+    def to_dict(
+        self, omit_none: bool = True, validate: bool = False
+    ) -> JsonDict:
         original_dict = super().to_dict(omit_none=omit_none)
         # If there are stats, merge them into the root of the dict
         original_stats = original_dict.pop('table_stats')
diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py
index 86955eb76..0e8ec7b78 100644
--- a/dbt/adapters/spark/impl.py
+++ b/dbt/adapters/spark/impl.py
@@ -132,7 +132,8 @@ def list_relations_without_caching(
                     f'got {len(row)} values, expected 4'
                 )
             _schema, name, _, information = row
-            rel_type = (RelationType.View if 'Type: VIEW' in information else RelationType.Table)
+            rel_type = RelationType.View \
+                if 'Type: VIEW' in information else RelationType.Table
             is_delta = 'Provider: delta' in information
             relation = self.Relation.create(
                 schema=_schema,
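The classification reformatted above works by scanning the information blob that Spark returns per relation. A standalone sketch of the same logic; the blob contents here are an assumed, trimmed example of `show table extended` output, for illustration only:

    from dbt.contracts.relation import RelationType

    information = (
        'Database: my_schema\n'
        'Table: my_table\n'
        'Provider: delta\n'
        'Type: MANAGED\n'
    )
    # view vs. table, and whether the provider is Delta
    rel_type = RelationType.View \
        if 'Type: VIEW' in information else RelationType.Table
    is_delta = 'Provider: delta' in information
    print(rel_type, is_delta)  # -> table True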
From 8794c896937e2ac6bf46b9c3d5252d13d06bbf4a Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Fri, 11 Dec 2020 11:12:30 +0100
Subject: [PATCH 5/9] Use is_delta instead of file_format == 'delta'

---
 .../macros/materializations/incremental.sql   | 21 +++++++++----------
 .../macros/materializations/snapshot.sql      | 14 ++++++-------
 2 files changed, 17 insertions(+), 18 deletions(-)

diff --git a/dbt/include/spark/macros/materializations/incremental.sql b/dbt/include/spark/macros/materializations/incremental.sql
index 000659a8f..2fd4f9d37 100644
--- a/dbt/include/spark/macros/materializations/incremental.sql
+++ b/dbt/include/spark/macros/materializations/incremental.sql
@@ -24,7 +24,7 @@
   {% do return(file_format) %}
 {% endmacro %}
 
-{% macro dbt_spark_validate_get_incremental_strategy(file_format) %}
+{% macro dbt_spark_validate_get_incremental_strategy(relation) %}
   {#-- Find and validate the incremental strategy #}
   {%- set strategy = config.get("incremental_strategy", default="insert_overwrite") -%}
 
@@ -41,7 +41,7 @@
   {% if strategy not in ['merge', 'insert_overwrite'] %}
     {% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
   {%-else %}
-    {% if strategy == 'merge' and file_format != 'delta' %}
+    {% if strategy == 'merge' and not relation.is_delta %}
      {% do exceptions.raise_compiler_error(invalid_merge_msg) %}
    {% endif %}
   {% endif %}
@@ -49,15 +49,14 @@
   {% do return(strategy) %}
 {% endmacro %}
 
-{% macro dbt_spark_validate_merge(file_format) %}
+{% macro dbt_spark_validate_merge(relation) %}
   {% set invalid_file_format_msg -%}
     You can only choose the 'merge' incremental_strategy when file_format is set to 'delta'
   {%- endset %}
 
-  {% if file_format != 'delta' %}
+  {% if not relation.is_delta %}
     {% do exceptions.raise_compiler_error(invalid_file_format_msg) %}
   {% endif %}
-
 {% endmacro %}
 
 
@@ -84,20 +83,20 @@
 
 {% materialization incremental, adapter='spark' -%}
 
+  {% set target_relation = this %}
+  {% set existing_relation = load_relation(this) %}
+  {% set tmp_relation = make_temp_relation(this) %}
+
   {#-- Validate early so we don't run SQL if the file_format is invalid --#}
   {% set file_format = dbt_spark_validate_get_file_format() -%}
   {#-- Validate early so we don't run SQL if the strategy is invalid --#}
-  {% set strategy = dbt_spark_validate_get_incremental_strategy(file_format) -%}
+  {% set strategy = dbt_spark_validate_get_incremental_strategy(target_relation) -%}
 
   {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}
 
-  {% set target_relation = this %}
-  {% set existing_relation = load_relation(this) %}
-  {% set tmp_relation = make_temp_relation(this) %}
-
   {% if strategy == 'merge' %}
     {%- set unique_key = config.require('unique_key') -%}
-    {% do dbt_spark_validate_merge(file_format) %}
+    {% do dbt_spark_validate_merge(target_relation) %}
   {% endif %}
 
   {% if config.get('partition_by') %}
diff --git a/dbt/include/spark/macros/materializations/snapshot.sql b/dbt/include/spark/macros/materializations/snapshot.sql
index 78214641b..fc91329fe 100644
--- a/dbt/include/spark/macros/materializations/snapshot.sql
+++ b/dbt/include/spark/macros/materializations/snapshot.sql
@@ -80,8 +80,14 @@
     Invalid file format: {{ file_format }}
     Snapshot functionality requires file_format be set to 'delta'
   {%- endset %}
+
+  {% set target_relation_exists, target_relation = get_or_create_relation(
+          database=none,
+          schema=model.schema,
+          identifier=target_table,
+          type='table') -%}
 
-  {%- if file_format != 'delta' -%}
+  {%- if not target_relation_exists.is_delta -%}
     {% do exceptions.raise_compiler_error(invalid_format_msg) %}
   {% endif %}
 
@@ -89,12 +95,6 @@
   {% if not adapter.check_schema_exists(model.database, model.schema) %}
     {% do create_schema(model.database, model.schema) %}
   {% endif %}
 
-  {% set target_relation_exists, target_relation = get_or_create_relation(
-          database=none,
-          schema=model.schema,
-          identifier=target_table,
-          type='table') -%}
-
   {%- if not target_relation.is_table -%}
     {% do exceptions.relation_wrong_type(target_relation, 'table') %}
   {%- endif -%}
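In plain Python, a sketch of the shift this patch makes (illustrative names, not the macros themselves): validation now asks the relation what it actually is, instead of trusting what the model is configured to be:

    def validate_merge_by_config(file_format):
        # old behaviour: trusts dbt's file_format config
        if file_format != 'delta':
            raise ValueError("merge requires file_format: delta")

    def validate_merge_by_relation(relation):
        # new behaviour: checks the relation discovered in the warehouse,
        # catching tables created earlier with a different provider
        if not relation.is_delta:
            raise ValueError("merge requires an existing Delta table")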
From a9478eb4831d68662f1eca25ce2a76ce9975bba6 Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Wed, 23 Dec 2020 20:09:40 +0100
Subject: [PATCH 6/9] Update dbt/include/spark/macros/materializations/snapshot.sql

Co-authored-by: Jeremy Cohen
---
 dbt/include/spark/macros/materializations/snapshot.sql | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/dbt/include/spark/macros/materializations/snapshot.sql b/dbt/include/spark/macros/materializations/snapshot.sql
index fc91329fe..d2c72d78a 100644
--- a/dbt/include/spark/macros/materializations/snapshot.sql
+++ b/dbt/include/spark/macros/materializations/snapshot.sql
@@ -87,7 +87,10 @@
           identifier=target_table,
           type='table') -%}
 
-  {%- if not target_relation_exists.is_delta -%}
+{%- if not (target_relation.is_delta or (
+    not target_relation_exists and
+    config.get('file_format', validator=validation.any[basestring]) == 'delta'
+)) -%}
     {% do exceptions.raise_compiler_error(invalid_format_msg) %}
   {% endif %}

From 589afedfa951360a5b6b4cd74a9515e99fcfc4cb Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Wed, 23 Dec 2020 21:39:02 +0100
Subject: [PATCH 7/9] Clean up error messages when doing snapshots

---
 .../macros/materializations/snapshot.sql      | 25 +++++++++++--------
 1 file changed, 15 insertions(+), 10 deletions(-)

diff --git a/dbt/include/spark/macros/materializations/snapshot.sql b/dbt/include/spark/macros/materializations/snapshot.sql
index d2c72d78a..cd43253e1 100644
--- a/dbt/include/spark/macros/materializations/snapshot.sql
+++ b/dbt/include/spark/macros/materializations/snapshot.sql
@@ -75,25 +75,30 @@
   {%- set strategy_name = config.get('strategy') -%}
   {%- set unique_key = config.get('unique_key') %}
   {%- set file_format = config.get('file_format', 'parquet') -%}
-
-  {% set invalid_format_msg -%}
-    Invalid file format: {{ file_format }}
-    Snapshot functionality requires file_format be set to 'delta'
-  {%- endset %}
 
   {% set target_relation_exists, target_relation = get_or_create_relation(
           database=none,
           schema=model.schema,
          identifier=target_table,
          type='table') -%}
-
-{%- if not (target_relation.is_delta or (
-    not target_relation_exists and
-    config.get('file_format', validator=validation.any[basestring]) == 'delta'
-)) -%}
+
+  {%- if file_format != 'delta' -%}
+    {% set invalid_format_msg -%}
+      Invalid file format: {{ file_format }}
+      Snapshot functionality requires file_format be set to 'delta'
+    {%- endset %}
     {% do exceptions.raise_compiler_error(invalid_format_msg) %}
   {% endif %}
 
+  {%- if target_relation_exists -%}
+    {%- if not target_relation.is_delta -%}
+      {% set invalid_format_msg -%}
+        The existing table {{ model.schema }}.{{ target_table }} is in another format than 'delta'
+      {%- endset %}
+      {% do exceptions.raise_compiler_error(invalid_format_msg) %}
+    {% endif %}
+  {% endif %}
+
   {% if not adapter.check_schema_exists(model.database, model.schema) %}
     {% do create_schema(model.database, model.schema) %}
   {% endif %}
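The cleanup separates the two failure modes into distinct, specific messages. As a plain-Python sketch of the resulting control flow (illustrative, not the macro itself):

    def validate_snapshot(file_format, target_exists, target_is_delta):
        # misconfigured model: snapshots only work on Delta
        if file_format != 'delta':
            raise ValueError(
                f"Invalid file format: {file_format}\n"
                "Snapshot functionality requires file_format be set to 'delta'")
        # correctly configured model, but the table on disk predates it
        if target_exists and not target_is_delta:
            raise ValueError(
                "The existing table is in another format than 'delta'")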
From 1f8febd3babfa74ed48e23b071b03c40b297bf76 Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Wed, 23 Dec 2020 21:43:18 +0100
Subject: [PATCH 8/9] Revert incremental for now

---
 .../macros/materializations/incremental.sql   | 21 ++++++++++---------
 1 file changed, 11 insertions(+), 10 deletions(-)

diff --git a/dbt/include/spark/macros/materializations/incremental.sql b/dbt/include/spark/macros/materializations/incremental.sql
index 2fd4f9d37..000659a8f 100644
--- a/dbt/include/spark/macros/materializations/incremental.sql
+++ b/dbt/include/spark/macros/materializations/incremental.sql
@@ -24,7 +24,7 @@
   {% do return(file_format) %}
 {% endmacro %}
 
-{% macro dbt_spark_validate_get_incremental_strategy(relation) %}
+{% macro dbt_spark_validate_get_incremental_strategy(file_format) %}
   {#-- Find and validate the incremental strategy #}
   {%- set strategy = config.get("incremental_strategy", default="insert_overwrite") -%}
 
@@ -41,7 +41,7 @@
   {% if strategy not in ['merge', 'insert_overwrite'] %}
     {% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
   {%-else %}
-    {% if strategy == 'merge' and not relation.is_delta %}
+    {% if strategy == 'merge' and file_format != 'delta' %}
      {% do exceptions.raise_compiler_error(invalid_merge_msg) %}
    {% endif %}
   {% endif %}
@@ -49,14 +49,15 @@
   {% do return(strategy) %}
 {% endmacro %}
 
-{% macro dbt_spark_validate_merge(relation) %}
+{% macro dbt_spark_validate_merge(file_format) %}
  {% set invalid_file_format_msg -%}
    You can only choose the 'merge' incremental_strategy when file_format is set to 'delta'
  {%- endset %}
 
-  {% if not relation.is_delta %}
+  {% if file_format != 'delta' %}
    {% do exceptions.raise_compiler_error(invalid_file_format_msg) %}
  {% endif %}
+
 {% endmacro %}
 
 
@@ -83,20 +84,20 @@
 
 {% materialization incremental, adapter='spark' -%}
 
-  {% set target_relation = this %}
-  {% set existing_relation = load_relation(this) %}
-  {% set tmp_relation = make_temp_relation(this) %}
-
   {#-- Validate early so we don't run SQL if the file_format is invalid --#}
   {% set file_format = dbt_spark_validate_get_file_format() -%}
   {#-- Validate early so we don't run SQL if the strategy is invalid --#}
-  {% set strategy = dbt_spark_validate_get_incremental_strategy(target_relation) -%}
+  {% set strategy = dbt_spark_validate_get_incremental_strategy(file_format) -%}
 
   {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}
 
+  {% set target_relation = this %}
+  {% set existing_relation = load_relation(this) %}
+  {% set tmp_relation = make_temp_relation(this) %}
+
   {% if strategy == 'merge' %}
     {%- set unique_key = config.require('unique_key') -%}
-    {% do dbt_spark_validate_merge(target_relation) %}
+    {% do dbt_spark_validate_merge(file_format) %}
   {% endif %}
 
   {% if config.get('partition_by') %}
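A likely reason for this revert, sketched under that assumption: for a model that has never been built, there is no relation in the warehouse to inspect, so a relation-based check has no is_delta to look at before the table exists:

    existing_relation = None  # what load_relation yields on a first run
    strategy = 'merge'
    # a relation-based check would reject a correctly configured delta
    # model simply because the table is not there yet
    if strategy == 'merge' and not (existing_relation and existing_relation.is_delta):
        print('validation fails before the first build')

The config-based check restored above does not have this problem, since file_format is known before anything is created.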
From 3ebb8709b7c580724a24962b30d30749aa2f1b23 Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Thu, 31 Dec 2020 13:17:35 +0100
Subject: [PATCH 9/9] Add me to the contributors list

---
 CHANGELOG.md | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f0ecd95ab..af33934d6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,8 +4,12 @@
 - Users of the `http` and `thrift` connection methods need to install extra requirements: `pip install dbt-spark[PyHive]` ([#109](/~https://github.com/fishtown-analytics/dbt-spark/pull/109), [#126](/~https://github.com/fishtown-analytics/dbt-spark/pull/126))
 
 ### Under the hood
+- Enable `create or replace` SQL syntax when using Delta. Instead of dropping and recreating the table, the existing table is kept and a new version is added, as supported by Delta. This keeps the table available while the pipeline runs, and the table history remains traceable.
 - Add changelog, issue templates ([#119](/~https://github.com/fishtown-analytics/dbt-spark/pull/119), [#120](/~https://github.com/fishtown-analytics/dbt-spark/pull/120))
 
+### Contributors
+- [@Fokko](/~https://github.com/Fokko) ([#125](/~https://github.com/fishtown-analytics/dbt-spark/pull/125))
+
 ## dbt-spark 0.18.1.1 (November 13, 2020)
 
 ### Fixes