- Overview
- Installation Instructions
- Models
- Macros
- Materialization
- Contributions
## Overview

This dbt package contains Snowflake macros and models that can be (re)used across dbt projects with Snowflake as the target database.
## Installation Instructions

- Add the package into your project. Example `packages.yml`:

  ```yaml
  packages:
    - git: "/~https://github.com/entechlog/dbt-snow-utils.git"
      revision: 0.1.0

    - package: entechlog/dbt_snow_utils
      version: 0.1.0
  ```

  ✅ Packages can be added to your project using either of the above options.

  ✅ Please refer to the releases of this repo/dbt Hub for the latest version; the version number mentioned above may not be the latest.
- Install the package by running the command below:

  ```shell
  dbt deps
  ```
- Add the following model configuration under the `models` section of `dbt_project.yml`. This allows you to customize the target database and schema; edit them as needed.

  ```yaml
  models:
    dbt_snow_utils:
      staging:
        database: "DEMO_DB"
        schema: staging
      marts:
        database: "DEMO_DB"
        schema: marts
  ```
## Models

### Snowpipe

- Snowpipe is Snowflake's continuous data ingestion service. Currently there is no consolidated dashboard in Snowflake that shows a summary of Snowpipe activity.

- Copy history in Snowsight gives a dashboard for table-level copy history.

- The table functions `INFORMATION_SCHEMA.PIPE_USAGE_HISTORY` and `INFORMATION_SCHEMA.COPY_HISTORY` hold the copy history, but it is retained for only 14 days. The table functions avoid the 10,000 row limitation of the `LOAD_HISTORY` view but are also slow operations, so adjust the SQL predicates to filter the data based on your volume.

- This process materializes data from `PIPE_USAGE_HISTORY` and `COPY_HISTORY` into Snowflake tables. The target tables can be used to visualize the Snowpipe copy history and usage history with the help of the dbt macro `get_snowpipe_details` and the dbt models selected by `+tag:snowpipe`.
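  For context, a quick ad-hoc query against the underlying `COPY_HISTORY` table function (shown with a placeholder database and table, and the package's default 36-hour window) looks roughly like this:

  ```sql
  -- Illustrative only: recent copy history for one table.
  -- MY_DB and MY_SCHEMA.MY_TABLE are placeholders; replace with your own objects.
  SELECT file_name,
         table_name,
         pipe_name,
         last_load_time,
         row_count,
         error_count,
         status
  FROM TABLE(
      MY_DB.INFORMATION_SCHEMA.COPY_HISTORY(
          TABLE_NAME => 'MY_SCHEMA.MY_TABLE',
          START_TIME => DATEADD('hour', -36, CURRENT_TIMESTAMP())
      )
  );
  ```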
- Add the following variables under the `vars` section of `dbt_project.yml`. This allows you to customize the data retrieval filters.

  ```yaml
  vars:
    dbt_snow_utils:
      pipe_databases: "ALL"
      filter_by_date:
      pipe_copy_history_filter_key: "hours"
      pipe_copy_history_filter_value: -36
      pipe_usage_history_filter_key: "day"
      pipe_usage_history_filter_value: -2
  ```

  - `pipe_databases` (optional): The database name(s) with Snowpipes. Valid values are the string "ALL" or a list of databases.
  - `filter_by_date` (optional): The date for filtering data in incremental loads. Should be specified in `YYYY-MM-DD` format; if none is specified, the process uses the current date.
  - `pipe_copy_history_filter_key` (optional): The filter key for the table function `COPY_HISTORY`. Some valid values are `day`, `hour`, `minute`, `second`, etc. See the Snowflake documentation for the list of supported date and time parts.
  - `pipe_copy_history_filter_value` (optional): The filter value for the table function `COPY_HISTORY`. Should be a negative value compatible with the filter key.
  - `pipe_usage_history_filter_key` (optional): The filter key for the table function `PIPE_USAGE_HISTORY`. Some valid values are `day`, `hour`, `minute`, `second`, etc. See the Snowflake documentation for the list of supported date and time parts.
  - `pipe_usage_history_filter_value` (optional): The filter value for the table function `PIPE_USAGE_HISTORY`. Should be a negative value compatible with the filter key.
- Run the models using one of the commands below:

  ```shell
  dbt run --select +tag:snowpipe --vars '{"filter_by_date": "2022-03-22"}'
  ```

  OR

  ```shell
  dbt run --select +tag:snowpipe --full-refresh
  ```
- This should create `snowpipe__usage_history` and `snowpipe__copy_history`, which can be integrated with BI tools to build Snowpipe monitoring dashboards.
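  As a starting point for such a dashboard, a summary query like the sketch below can be built on top of `snowpipe__copy_history`. The database/schema follow the sample model configuration above, and the column names are assumed to mirror the `COPY_HISTORY` output, so verify them against the generated table:

  ```sql
  -- Sketch of a monitoring query; adjust object and column names to match your build.
  SELECT table_name,
         DATE_TRUNC('day', last_load_time) AS load_date,
         COUNT(*)                          AS files_loaded,
         SUM(row_count)                    AS rows_loaded,
         SUM(error_count)                  AS errors
  FROM demo_db.marts.snowpipe__copy_history
  GROUP BY table_name, DATE_TRUNC('day', last_load_time)
  ORDER BY load_date DESC;
  ```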
### Query History

- This process materializes data from `QUERY_HISTORY` into a Snowflake table.

- The role used by dbt should have MONITOR access so it can fetch queries executed by all users:

  ```sql
  GRANT MONITOR ON WAREHOUSE <ALL-SNOWFLAKE-WAREHOUSE> TO ROLE <DBT-ROLE>;
  ```
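  For reference, the `QUERY_HISTORY` table function can also be queried ad hoc; the 2-hour window and 1,000-row limit below are arbitrary example values:

  ```sql
  -- Illustrative only: recent query history via the INFORMATION_SCHEMA table function.
  SELECT query_id,
         user_name,
         warehouse_name,
         execution_status,
         total_elapsed_time,
         start_time
  FROM TABLE(
      INFORMATION_SCHEMA.QUERY_HISTORY(
          END_TIME_RANGE_START => DATEADD('hour', -2, CURRENT_TIMESTAMP()),
          RESULT_LIMIT => 1000
      )
  )
  ORDER BY start_time DESC;
  ```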
- Add the following variables under the `vars` section of `dbt_project.yml`. This allows you to customize the data retrieval filters.

  ```yaml
  vars:
    dbt_snow_utils:
      filter_by_date:
      query_history_filter_key: "hours"
      query_history_filter_value: -144
      query_history_result_limit: 10000
  ```

  - `filter_by_date` (optional): The date for filtering data in incremental loads. Should be specified in `YYYY-MM-DD` format; if none is specified, the process uses the current date.
  - `query_history_filter_key` (optional): The filter key for the table function `QUERY_HISTORY`. Some valid values are `day`, `hour`, `minute`, `second`, etc. See the Snowflake documentation for the list of supported date and time parts.
  - `query_history_filter_value` (optional): The filter value for the table function `QUERY_HISTORY`. Should be a negative value compatible with the filter key.
- Run the models using one of the commands below:

  ```shell
  dbt run --select +tag:snowflake --vars '{"filter_by_date": "2022-03-30"}'
  ```

  OR

  ```shell
  dbt run --select +tag:snowflake --full-refresh
  ```
- This should create `snowflake__query_history`, which can be integrated with BI tools to build Snowflake monitoring dashboards.
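  For example, a simple warehouse-usage query over the last 7 days could look like the sketch below. The database/schema follow the sample model configuration above, and the column names are assumed to mirror the `QUERY_HISTORY` output, so verify them against the generated table:

  ```sql
  -- Sketch of a monitoring query; adjust object and column names to match your build.
  SELECT warehouse_name,
         COUNT(*)                       AS query_count,
         AVG(total_elapsed_time) / 1000 AS avg_elapsed_seconds
  FROM demo_db.marts.snowflake__query_history
  WHERE start_time >= DATEADD('day', -7, CURRENT_TIMESTAMP())
  GROUP BY warehouse_name
  ORDER BY query_count DESC;
  ```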
## Macros

### clone_schemas

This macro clones the source schema(s) into the destination database.

- `source_schemas` (required): The list of source schema names
- `destination_postfix` (required): The postfix appended to the destination schema names
- `source_database` (optional): The source database name
- `destination_database` (optional): The destination database name

```shell
dbt run-operation dbt_snow_utils.clone_schemas --args "{'source_database': 'demo_db', 'source_schemas': ['dim', 'fact', 'utils'], 'destination_database': 'demo_db', 'destination_postfix': '_20220323_01'}"
```

```
pre_hook="{{ dbt_snow_utils.clone_schemas(['dim', 'fact', 'utils'], '_backup', 'demo_db', this.database) }}"
```
### clone_table

This macro clones the source table into the destination database/schema.

- `source_table` (required): The source table name
- `destination_table` (required): The destination table name
- `source_database` (optional): The source database name
- `source_schema` (optional): The source schema name
- `destination_database` (optional): The destination database name
- `destination_schema` (optional): The destination schema name

```shell
dbt run-operation clone_table --args '{"source_table": "COUNTRY_CODE", "destination_table": "COUNTRY_CODE_BKP"}'
```

```
post_hook="{{ dbt_snow_utils.clone_table(this.identifier, this.identifier~'_temp', this.database, this.schema, this.database, this.schema) }}"
```
### clone_tables

This macro clones all the tables from the source database/schema into the destination database/schema. It also provides an option to truncate the tables after cloning if you only need the table structure and not the data.

- `source_schemas` (required): The list of source schema names
- `source_database` (optional): The source database name
- `destination_database` (optional): The destination database name
- `truncate_table_flag` (optional): Flag to truncate data after cloning; when enabled, only the table structure is copied, not the data

```shell
dbt run-operation clone_tables --args "{'source_database': 'DEV_ENTECHLOG_DW_DB', 'source_schemas': ['dim', 'fact', 'utils'], 'destination_database': 'DEV_ENTECHLOG_DEMO_DB', 'truncate_table_flag': 'True'}"
```

```
pre_hook="{{ dbt_snow_utils.clone_tables(['dim', 'fact', 'utils'], 'DEV_ENTECHLOG_DW_DB', 'DEV_ENTECHLOG_DEMO_DB', 'True') }}"
```
### delete_records_by_column

This macro deletes data from a table based on a WHERE clause. It is often used as a pre-hook in incremental loads to delete data before reloading it.

- `del_key` (required): The column name used in the WHERE clause of the delete
- `del_value` (required): The value for that column in the WHERE clause of the delete
- `database` (optional): The database name
- `schema` (optional): The schema name
- `table` (optional): The table name

```shell
dbt run-operation delete_records_by_column --args '{"del_key": "payment_date", "del_value": "2005-05-25", "database": "DBT_DEMO", "schema": "MARTS", "table": "tmp_store_revenue"}'
```

```
post_hook="{{ dbt_snow_utils.delete_records_by_column('payment_date', '2005-05-24') }}"
post_hook="{{ dbt_snow_utils.delete_records_by_column('payment_date', var('start_date')) }}"
```
### delete_orphaned_tables

This macro deletes orphaned tables/views, that is, relations that exist in the database but are no longer part of the dbt project.

- `databases_list` (optional): A list of databases (and optionally schemas in the format `database.schema`) to search for orphaned tables/views. Defaults to the target database defined in your dbt profile.
- `dry_run` (optional): If set to `True`, the macro logs the tables/views that would be deleted without actually performing the deletion. Defaults to `False`.

- To perform a dry run (log but do not delete) on specific databases:

  ```shell
  dbt run-operation delete_orphaned_tables --args "{databases_list: ['DATABASE1', 'DATABASE2'], dry_run: True}"
  ```

- To delete orphaned tables/views in the default target database:

  ```shell
  dbt run-operation delete_orphaned_tables --args "{dry_run: False}"
  ```

- To delete orphaned tables/views in specific databases (e.g., 'DATABASE1' and 'DATABASE2'):

  ```shell
  dbt run-operation delete_orphaned_tables --args "{databases_list: ['DATABASE1', 'DATABASE2'], dry_run: False}"
  ```

- To delete orphaned tables/views in specific databases/schemas (e.g., 'DATABASE1' and 'DATABASE2.SCHEMA1'):

  ```shell
  dbt run-operation delete_orphaned_tables --args "{databases_list: ['DATABASE1', 'DATABASE2.SCHEMA1'], dry_run: False}"
  ```
## Materialization

### stage2table

This materialization strategy loads data from a configured external stage (cloud storage location) directly into a Snowflake table using the COPY INTO command or an INSERT statement.

- `stage_name` (optional): The name of the Snowflake stage object to be used or created. Defaults to the model's identifier with a `_stage` postfix.
- `url` (required): The URL of the external stage (cloud storage path).
- `file_format` (optional): The file format option for loading data. Defaults to `(type = PARQUET)` if not specified.
- `mode` (optional): The loading mode, either `COPY` or `INSERT`. Defaults to `COPY`.
- `pattern` (optional): A regex pattern to match files in the external stage.
Configure this materialization in your model's configuration block in `dbt_project.yml` or within the model SQL file itself using `{{ config(materialized = 'stage2table', ...) }}`. For example:

```sql
{{
    config(
        materialized="stage2table",
        url="s3://" ~ var("s3_bucket") ~ "/cricsheet/all_player_data/",
        file_format="(TYPE = CSV SKIP_HEADER = 1 TRIM_SPACE = TRUE ESCAPE_UNENCLOSED_FIELD = NONE)",
        mode="INSERT",
        tags=["source", "cricsheet"],
        pre_hook="{{ delete_data('FILE_LAST_MODIFIED_DT', var('batch_cycle_date'), this) }}",
    )
}}
```
## Contributions

Contributions to this package are welcome. Please create issues for bugs, feature requests for enhancement ideas, or PRs for any enhancement contributions.