diff --git a/CHANGELOG.md b/CHANGELOG.md index e83e6816702..a4a24580e6f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## dbt 0.18.0b2 (July 30, 2020) +### Features +- Added `--defer` and `--state` flags to `dbt run`, to defer to a previously generated manifest for unselected nodes in a run. ([#2527](/~https://github.com/fishtown-analytics/dbt/issues/2527), [#2656](/~https://github.com/fishtown-analytics/dbt/pull/2656)) + ### Breaking changes - Previously, dbt put macros from all installed plugins into the namespace. This version of dbt will not include adapter plugin macros unless they are from the currently-in-use adapter or one of its dependencies [#2590](/~https://github.com/fishtown-analytics/dbt/pull/2590) diff --git a/core/dbt/clients/system.py b/core/dbt/clients/system.py index 0b09edaa1b7..f93a5d939d9 100644 --- a/core/dbt/clients/system.py +++ b/core/dbt/clients/system.py @@ -162,6 +162,10 @@ def write_file(path: str, contents: str = '') -> bool: return True +def read_json(path: str) -> Dict[str, Any]: + return json.loads(load_file_contents(path)) + + def write_json(path: str, data: Dict[str, Any]) -> bool: return write_file(path, json.dumps(data, cls=dbt.utils.JSONEncoder)) diff --git a/core/dbt/contracts/graph/compiled.py b/core/dbt/contracts/graph/compiled.py index e68c72e2bb4..72138b53eb6 100644 --- a/core/dbt/contracts/graph/compiled.py +++ b/core/dbt/contracts/graph/compiled.py @@ -29,15 +29,16 @@ class InjectedCTE(JsonSchemaMixin, Replaceable): id: str sql: str -# for some frustrating reason, we can't subclass from ParsedNode directly, -# or typing.Union will flatten CompiledNode+ParsedNode into just ParsedNode. -# TODO: understand that issue and come up with some way for these two to share -# logic + +@dataclass +class CompiledNodeMixin(JsonSchemaMixin): + # this is a special mixin class to provide a required argument. If a node + # is missing a `compiled` flag entirely, it must not be a CompiledNode. + compiled: bool @dataclass -class CompiledNode(ParsedNode): - compiled: bool = False +class CompiledNode(ParsedNode, CompiledNodeMixin): compiled_sql: Optional[str] = None extra_ctes_injected: bool = False extra_ctes: List[InjectedCTE] = field(default_factory=list) diff --git a/core/dbt/contracts/graph/manifest.py b/core/dbt/contracts/graph/manifest.py index 4f9ba1d8192..a186e1189d0 100644 --- a/core/dbt/contracts/graph/manifest.py +++ b/core/dbt/contracts/graph/manifest.py @@ -4,7 +4,7 @@ import os from dataclasses import dataclass, field from datetime import datetime -from itertools import chain +from itertools import chain, islice from multiprocessing.synchronize import Lock from typing import ( Dict, List, Optional, Union, Mapping, MutableMapping, Any, Set, Tuple, @@ -22,7 +22,7 @@ ParsedMacro, ParsedDocumentation, ParsedNodePatch, ParsedMacroPatch, ParsedSourceDefinition ) -from dbt.contracts.util import Writable, Replaceable +from dbt.contracts.util import Readable, Writable, Replaceable from dbt.exceptions import ( raise_duplicate_resource_name, InternalException, raise_compiler_error, warn_or_error, raise_invalid_patch @@ -1011,25 +1011,52 @@ def resolve_doc( return result return None + def merge_from_artifact( + self, + other: 'WritableManifest', + selected: Set[UniqueID], + ) -> None: + """Given the selected unique IDs and a writable manifest, update this + manifest by replacing any unselected nodes with their counterpart. + + Only non-ephemeral refable nodes are examined. + """ + refables = set(NodeType.refable()) + merged = set() + for unique_id, node in other.nodes.items(): + if ( + node.resource_type in refables and + not node.is_ephemeral and + unique_id not in selected + ): + merged.add(unique_id) + self.nodes[unique_id] = node.replace(deferred=True) + + # log up to 5 items + sample = list(islice(merged, 5)) + logger.debug( + f'Merged {len(merged)} items from state (sample: {sample})' + ) + @dataclass -class WritableManifest(JsonSchemaMixin, Writable): - nodes: Mapping[str, NonSourceNode] = field( +class WritableManifest(JsonSchemaMixin, Writable, Readable): + nodes: Mapping[UniqueID, NonSourceNode] = field( metadata=dict(description=( 'The nodes defined in the dbt project and its dependencies' - )), + )) ) - sources: Mapping[str, ParsedSourceDefinition] = field( + sources: Mapping[UniqueID, ParsedSourceDefinition] = field( metadata=dict(description=( - 'The sources defined in the dbt project and its dependencies', + 'The sources defined in the dbt project and its dependencies' )) ) - macros: Mapping[str, ParsedMacro] = field( + macros: Mapping[UniqueID, ParsedMacro] = field( metadata=dict(description=( 'The macros defined in the dbt project and its dependencies' )) ) - docs: Mapping[str, ParsedDocumentation] = field( + docs: Mapping[UniqueID, ParsedDocumentation] = field( metadata=dict(description=( 'The docs defined in the dbt project and its dependencies' )) diff --git a/core/dbt/contracts/graph/parsed.py b/core/dbt/contracts/graph/parsed.py index 9ac99c84400..140671bcfa9 100644 --- a/core/dbt/contracts/graph/parsed.py +++ b/core/dbt/contracts/graph/parsed.py @@ -159,6 +159,7 @@ class ParsedNodeDefaults(ParsedNodeMandatory): docs: Docs = field(default_factory=Docs) patch_path: Optional[str] = None build_path: Optional[str] = None + deferred: bool = False def write_node(self, target_path: str, subdirectory: str, payload: str): if (os.path.basename(self.path) == diff --git a/core/dbt/contracts/util.py b/core/dbt/contracts/util.py index e2f2c257c2a..764ee1cd08e 100644 --- a/core/dbt/contracts/util.py +++ b/core/dbt/contracts/util.py @@ -1,7 +1,8 @@ import dataclasses from typing import List -from dbt.clients.system import write_json +from dbt.clients.system import write_json, read_json +from dbt.exceptions import RuntimeException def list_str() -> List[str]: @@ -76,3 +77,16 @@ def replace(self, **kwargs): @property def extra(self): return self._extra + + +class Readable: + @classmethod + def read(cls, path: str): + try: + data = read_json(path) + except (EnvironmentError, ValueError) as exc: + raise RuntimeException( + f'Could not read {cls.__name__} at "{path}" as JSON: {exc}' + ) from exc + + return cls.from_dict(data) # type: ignore diff --git a/core/dbt/flags.py b/core/dbt/flags.py index ffcb3958081..3f692a79d5c 100644 --- a/core/dbt/flags.py +++ b/core/dbt/flags.py @@ -1,5 +1,6 @@ import os import multiprocessing +from pathlib import Path from typing import Optional # initially all flags are set to None, the on-load call of reset() will set # them for their first time. @@ -22,9 +23,19 @@ def env_set_truthy(key: str) -> Optional[str]: return value +def env_set_path(key: str) -> Optional[Path]: + value = os.getenv(key) + if value is None: + return value + else: + return Path(value) + + SINGLE_THREADED_WEBSERVER = env_set_truthy('DBT_SINGLE_THREADED_WEBSERVER') SINGLE_THREADED_HANDLER = env_set_truthy('DBT_SINGLE_THREADED_HANDLER') MACRO_DEBUGGING = env_set_truthy('DBT_MACRO_DEBUGGING') +DEFER_MODE = env_set_truthy('DBT_DEFER_TO_STATE') +ARTIFACT_STATE_PATH = env_set_path('DBT_ARTIFACT_STATE_PATH') def _get_context(): diff --git a/core/dbt/main.py b/core/dbt/main.py index 726dca5cea8..d4344f1cbed 100644 --- a/core/dbt/main.py +++ b/core/dbt/main.py @@ -6,6 +6,7 @@ import sys import traceback from contextlib import contextmanager +from pathlib import Path import dbt.version import dbt.flags as flags @@ -31,7 +32,7 @@ from dbt.utils import ExitCodes from dbt.config import PROFILES_DIR, read_user_config -from dbt.exceptions import RuntimeException +from dbt.exceptions import RuntimeException, InternalException class DBTVersion(argparse.Action): @@ -62,6 +63,50 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.register('action', 'dbtversion', DBTVersion) + def add_optional_argument_inverse( + self, + name, + *, + enable_help=None, + disable_help=None, + dest=None, + no_name=None, + default=None, + ): + mutex_group = self.add_mutually_exclusive_group() + if not name.startswith('--'): + raise InternalException( + 'cannot handle optional argument without "--" prefix: ' + f'got "{name}"' + ) + if dest is None: + dest_name = name[2:].replace('-', '_') + else: + dest_name = dest + + if no_name is None: + no_name = f'--no-{name[2:]}' + + mutex_group.add_argument( + name, + action='store_const', + const=True, + dest=dest_name, + default=default, + help=enable_help, + ) + + mutex_group.add_argument( + f'--no-{name[2:]}', + action='store_const', + const=False, + dest=dest_name, + default=default, + help=disable_help, + ) + + return mutex_group + class RPCArgumentParser(DBTArgumentParser): def exit(self, status=0, message=None): @@ -279,6 +324,8 @@ def _build_base_subparser(): If set, bypass the adapter-level cache of database state ''', ) + + base_subparser.set_defaults(defer=None, state=None) return base_subparser @@ -395,15 +442,39 @@ def _build_run_subparser(subparsers, base_subparser): parents=[base_subparser], help=''' Compile SQL and execute against the current target database. - ''') + ''' + ) run_sub.add_argument( '-x', '--fail-fast', action='store_true', help=''' - Stop execution upon a first failure. - ''' + Stop execution upon a first failure. + ''' + ) + + # for now, this is a "dbt run"-only thing + run_sub.add_argument( + '--state', + help=''' + If set, use the given directory as the source for json files to compare + with this project. + ''', + type=Path, + default=flags.ARTIFACT_STATE_PATH, ) + run_sub.add_optional_argument_inverse( + '--defer', + enable_help=''' + If set, defer to the state variable for resolving unselected nodes. + ''', + disable_help=''' + If set, do not defer to the state variable for resolving unselected + nodes. + ''', + default=flags.DEFER_MODE, + ) + run_sub.set_defaults(cls=run_task.RunTask, which='run', rpc_method='run') return run_sub @@ -830,30 +901,17 @@ def parse_args(args, cls=DBTArgumentParser): ''' ) - partial_flag = p.add_mutually_exclusive_group() - partial_flag.add_argument( + p.add_optional_argument_inverse( '--partial-parse', - action='store_const', - const=True, - dest='partial_parse', - default=None, - help=''' + enable_help=''' Allow for partial parsing by looking for and writing to a pickle file in the target directory. This overrides the user configuration file. WARNING: This can result in unexpected behavior if you use env_var()! - ''' - ) - - partial_flag.add_argument( - '--no-partial-parse', - action='store_const', - const=False, - default=None, - dest='partial_parse', - help=''' + ''', + disable_help=''' Disallow partial parsing. This overrides the user configuration file. - ''' + ''', ) # if set, run dbt in single-threaded mode: thread count is ignored, and diff --git a/core/dbt/task/generate.py b/core/dbt/task/generate.py index 57fc8dde8ab..246a3336cb2 100644 --- a/core/dbt/task/generate.py +++ b/core/dbt/task/generate.py @@ -6,7 +6,6 @@ from hologram import ValidationError from .compile import CompileTask -from .runnable import write_manifest from dbt.adapters.factory import get_adapter from dbt.contracts.graph.compiled import CompileResultNode @@ -274,7 +273,7 @@ def run(self) -> CatalogResults: path = os.path.join(self.config.target_path, CATALOG_FILENAME) results.write(path) if self.args.compile: - write_manifest(self.config, self.manifest) + self.write_manifest() if exceptions: logger.error( diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 034a8cd9728..1e2fb304761 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -1,5 +1,6 @@ import functools import time +from pathlib import Path from typing import List, Dict, Any, Iterable, Set, Tuple, Optional from .compile import CompileRunner, CompileTask @@ -21,6 +22,7 @@ from dbt.compilation import compile_node from dbt.context.providers import generate_runtime_model from dbt.contracts.graph.compiled import CompileResultNode +from dbt.contracts.graph.manifest import WritableManifest from dbt.contracts.graph.model_config import Hook from dbt.contracts.graph.parsed import ParsedHookNode from dbt.contracts.results import RunModelResult @@ -245,6 +247,32 @@ def __init__(self, args, config): super().__init__(args, config) self.ran_hooks = [] self._total_executed = 0 + self.deferred_manifest: Optional[WritableManifest] = None + + def _get_state_path(self) -> Path: + if self.args.state is not None: + return self.args.state + else: + raise RuntimeException( + 'Received a --defer argument, but no value was provided ' + 'to --state' + ) + + def _get_deferred_manifest(self) -> Optional[WritableManifest]: + if not self.args.defer: + return None + + path = self._get_state_path() + + if not path.is_absolute(): + path = Path(self.config.project_root) / path + if path.exists() and not path.is_file(): + path = path / 'manifest.json' + if not path.exists(): + raise RuntimeException( + f'Could not find --state path: "{path}"' + ) + return WritableManifest.read(str(path)) def index_offset(self, value: int) -> int: return self._total_executed + value @@ -355,7 +383,24 @@ def print_results_line(self, results, execution_time): "Finished running {stat_line}{execution}." .format(stat_line=stat_line, execution=execution)) + def defer_to_manifest(self, selected_uids): + self.deferred_manifest = self._get_deferred_manifest() + if self.deferred_manifest is None: + return + if self.manifest is None: + raise InternalException( + 'Expected to defer to manifest, but there is no runtime ' + 'manifest to defer from!' + ) + self.manifest.merge_from_artifact( + other=self.deferred_manifest, + selected=selected_uids, + ) + # TODO: is it wrong to write the manifest here? I think it's right... + self.write_manifest() + def before_run(self, adapter, selected_uids): + self.defer_to_manifest(selected_uids) with adapter.connection_named('master'): self.create_schemas(adapter, selected_uids) self.populate_adapter_cache(adapter) diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index 693445bc071..401bb5ec15a 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -50,20 +50,20 @@ RUNNING_STATE = DbtProcessState('running') -def write_manifest(config, manifest): - if flags.WRITE_JSON: - manifest.write(os.path.join(config.target_path, MANIFEST_FILE_NAME)) - - class ManifestTask(ConfiguredTask): def __init__(self, args, config): super().__init__(args, config) self.manifest: Optional[Manifest] = None self.graph: Optional[Graph] = None + def write_manifest(self): + if flags.WRITE_JSON: + path = os.path.join(self.config.target_path, MANIFEST_FILE_NAME) + self.manifest.write(path) + def load_manifest(self): self.manifest = get_full_manifest(self.config) - write_manifest(self.config, self.manifest) + self.write_manifest() def compile_manifest(self): if self.manifest is None: diff --git a/core/setup.py b/core/setup.py index 3840791a21e..cfc50bf6864 100644 --- a/core/setup.py +++ b/core/setup.py @@ -64,7 +64,7 @@ def read(fname): 'json-rpc>=1.12,<2', 'werkzeug>=0.15,<0.17', 'dataclasses==0.6;python_version<"3.7"', - 'hologram==0.0.7', + 'hologram==0.0.8', 'logbook>=1.5,<1.6', 'typing-extensions>=3.7.4,<3.8', # the following are all to match snowflake-connector-python diff --git a/test/integration/029_docs_generate_tests/test_docs_generate.py b/test/integration/029_docs_generate_tests/test_docs_generate.py index bc7a79f0d61..82e2fc8a91d 100644 --- a/test/integration/029_docs_generate_tests/test_docs_generate.py +++ b/test/integration/029_docs_generate_tests/test_docs_generate.py @@ -966,6 +966,7 @@ def expected_seeded_manifest(self, model_database=None): 'config': model_config, 'schema': my_schema_name, 'database': model_database, + 'deferred': False, 'alias': 'model', 'description': 'The test model', 'columns': { @@ -1032,6 +1033,7 @@ def expected_seeded_manifest(self, model_database=None): 'config': second_config, 'schema': self.alternate_schema, 'database': self.default_database, + 'deferred': False, 'alias': 'second_model', 'description': 'The second test model', 'columns': { @@ -1115,6 +1117,7 @@ def expected_seeded_manifest(self, model_database=None): 'schema': my_schema_name, 'database': self.default_database, 'alias': 'seed', + 'deferred': False, 'description': 'The test seed', 'columns': { 'id': { @@ -1183,6 +1186,7 @@ def expected_seeded_manifest(self, model_database=None): 'macros': ['macro.dbt.test_not_null'], 'nodes': ['model.test.model'], }, + 'deferred': False, 'description': '', 'fqn': ['test', 'schema_test', 'not_null_model_id'], 'name': 'not_null_model_id', @@ -1237,6 +1241,7 @@ def expected_seeded_manifest(self, model_database=None): 'macros': ['macro.test.test_nothing'], 'nodes': ['model.test.model'], }, + 'deferred': False, 'description': '', 'fqn': ['test', 'schema_test', 'test_nothing_model_'], 'name': 'test_nothing_model_', @@ -1290,6 +1295,7 @@ def expected_seeded_manifest(self, model_database=None): 'macros': ['macro.dbt.test_unique'], 'nodes': ['model.test.model'], }, + 'deferred': False, 'description': '', 'fqn': ['test', 'schema_test', 'unique_model_id'], 'name': 'unique_model_id', @@ -1386,6 +1392,7 @@ def expected_postgres_references_manifest(self, model_database=None): 'macros': [], 'nodes': ['source.test.my_source.my_table'] }, + 'deferred': False, 'description': '', 'docs': {'show': True}, 'fqn': ['test', 'ephemeral_copy'], @@ -1448,6 +1455,7 @@ def expected_postgres_references_manifest(self, model_database=None): 'macros': [], 'nodes': ['model.test.ephemeral_copy'] }, + 'deferred': False, 'description': 'A summmary table of the ephemeral copy of the seed data', 'docs': {'show': True}, 'fqn': ['test', 'ephemeral_summary'], @@ -1512,6 +1520,7 @@ def expected_postgres_references_manifest(self, model_database=None): 'macros': [], 'nodes': ['model.test.ephemeral_summary'] }, + 'deferred': False, 'description': 'A view of the summary of the ephemeral copy of the seed data', 'docs': {'show': True}, 'fqn': ['test', 'view_summary'], @@ -1594,6 +1603,7 @@ def expected_postgres_references_manifest(self, model_database=None): }, 'sources': [], 'depends_on': {'macros': [], 'nodes': []}, + 'deferred': False, 'description': 'The test seed', 'docs': {'show': True}, 'fqn': ['test', 'seed'], @@ -1879,6 +1889,7 @@ def expected_bigquery_complex_manifest(self): 'tags': [], }, }, + 'deferred': False, 'description': 'A clustered and partitioned copy of the test model', 'patch_path': self.dir('bq_models/schema.yml'), 'docs': {'show': True}, @@ -1958,6 +1969,7 @@ def expected_bigquery_complex_manifest(self): 'tags': [], }, }, + 'deferred': False, 'description': 'A clustered and partitioned copy of the test model, clustered on multiple columns', 'patch_path': self.dir('bq_models/schema.yml'), 'docs': {'show': True}, @@ -2038,6 +2050,7 @@ def expected_bigquery_complex_manifest(self): 'tags': [], }, }, + 'deferred': False, 'description': 'The test model', 'patch_path': self.dir('bq_models/schema.yml'), 'docs': {'show': True}, @@ -2083,6 +2096,7 @@ def expected_bigquery_complex_manifest(self): 'meta': {}, 'unique_id': 'model.test.nested_table', 'columns': {}, + 'deferred': False, 'description': '', 'docs': {'show': True}, 'compiled': True, @@ -2164,6 +2178,7 @@ def expected_bigquery_complex_manifest(self): 'tags': [], }, }, + 'deferred': False, 'description': 'The test seed', 'docs': {'show': True}, 'compiled': True, @@ -2268,6 +2283,7 @@ def expected_redshift_incremental_view_manifest(self): 'schema': my_schema_name, 'database': self.default_database, 'alias': 'model', + 'deferred': False, 'description': 'The test model', 'columns': { 'id': { @@ -2387,6 +2403,7 @@ def expected_redshift_incremental_view_manifest(self): 'tags': [], }, }, + 'deferred': False, 'description': 'The test seed', 'docs': {'show': True}, 'compiled': True, @@ -2559,6 +2576,7 @@ def expected_run_results(self, quote_schema=True, quote_model=False, 'macros': [], 'nodes': ['seed.test.seed'] }, + 'deferred': False, 'description': 'The test model', 'docs': {'show': False}, 'extra_ctes': [], @@ -2640,6 +2658,7 @@ def expected_run_results(self, quote_schema=True, quote_model=False, 'macros': [], 'nodes': ['seed.test.seed'] }, + 'deferred': False, 'description': 'The second test model', 'docs': {'show': False}, 'extra_ctes': [], @@ -2728,6 +2747,7 @@ def expected_run_results(self, quote_schema=True, quote_model=False, }, 'sources': [], 'depends_on': {'macros': [], 'nodes': []}, + 'deferred': False, 'description': 'The test seed', 'docs': {'show': True}, 'extra_ctes': [], @@ -2784,6 +2804,7 @@ def expected_run_results(self, quote_schema=True, quote_model=False, 'macros': ['macro.dbt.test_not_null'], 'nodes': ['model.test.model'], }, + 'deferred': False, 'description': '', 'docs': {'show': True}, 'extra_ctes': [], @@ -2848,6 +2869,7 @@ def expected_run_results(self, quote_schema=True, quote_model=False, 'macros': ['macro.test.test_nothing'], 'nodes': ['model.test.model'], }, + 'deferred': False, 'description': '', 'docs': {'show': True}, 'extra_ctes': [], @@ -2911,6 +2933,7 @@ def expected_run_results(self, quote_schema=True, quote_model=False, 'macros': ['macro.dbt.test_unique'], 'nodes': ['model.test.model'], }, + 'deferred': False, 'description': '', 'docs': {'show': True}, 'extra_ctes': [], @@ -3016,6 +3039,7 @@ def expected_postgres_references_run_results(self): 'nodes': ['model.test.ephemeral_copy'], 'macros': [] }, + 'deferred': False, 'description': ( 'A summmary table of the ephemeral copy of the seed data' ), @@ -3097,6 +3121,7 @@ def expected_postgres_references_run_results(self): 'nodes': ['model.test.ephemeral_summary'], 'macros': [] }, + 'deferred': False, 'description': ( 'A view of the summary of the ephemeral copy of the ' 'seed data' @@ -3192,6 +3217,7 @@ def expected_postgres_references_run_results(self): }, 'sources': [], 'depends_on': {'macros': [], 'nodes': []}, + 'deferred': False, 'description': 'The test seed', 'docs': {'show': True}, 'extra_ctes': [], diff --git a/test/integration/062_defer_state_test/changed_models/ephemeral_model.sql b/test/integration/062_defer_state_test/changed_models/ephemeral_model.sql new file mode 100644 index 00000000000..2f976e3a9b5 --- /dev/null +++ b/test/integration/062_defer_state_test/changed_models/ephemeral_model.sql @@ -0,0 +1,2 @@ +{{ config(materialized='ephemeral') }} +select * from {{ ref('view_model') }} diff --git a/test/integration/062_defer_state_test/changed_models/schema.yml b/test/integration/062_defer_state_test/changed_models/schema.yml new file mode 100644 index 00000000000..1ec506d3d19 --- /dev/null +++ b/test/integration/062_defer_state_test/changed_models/schema.yml @@ -0,0 +1,9 @@ +version: 2 +models: + - name: view_model + columns: + - name: id + tests: + - unique + - not_null + - name: name diff --git a/test/integration/062_defer_state_test/changed_models/table_model.sql b/test/integration/062_defer_state_test/changed_models/table_model.sql new file mode 100644 index 00000000000..07fa333386f --- /dev/null +++ b/test/integration/062_defer_state_test/changed_models/table_model.sql @@ -0,0 +1,2 @@ +{{ config(materialized='table') }} +select * from {{ ref('ephemeral_model') }} diff --git a/test/integration/062_defer_state_test/changed_models/view_model.sql b/test/integration/062_defer_state_test/changed_models/view_model.sql new file mode 100644 index 00000000000..bddbbb23cc2 --- /dev/null +++ b/test/integration/062_defer_state_test/changed_models/view_model.sql @@ -0,0 +1 @@ +select * from no.such.table diff --git a/test/integration/062_defer_state_test/changed_models_bad/ephemeral_model.sql b/test/integration/062_defer_state_test/changed_models_bad/ephemeral_model.sql new file mode 100644 index 00000000000..5155dfa475e --- /dev/null +++ b/test/integration/062_defer_state_test/changed_models_bad/ephemeral_model.sql @@ -0,0 +1,2 @@ +{{ config(materialized='ephemeral') }} +select * from no.such.table diff --git a/test/integration/062_defer_state_test/changed_models_bad/schema.yml b/test/integration/062_defer_state_test/changed_models_bad/schema.yml new file mode 100644 index 00000000000..1ec506d3d19 --- /dev/null +++ b/test/integration/062_defer_state_test/changed_models_bad/schema.yml @@ -0,0 +1,9 @@ +version: 2 +models: + - name: view_model + columns: + - name: id + tests: + - unique + - not_null + - name: name diff --git a/test/integration/062_defer_state_test/changed_models_bad/table_model.sql b/test/integration/062_defer_state_test/changed_models_bad/table_model.sql new file mode 100644 index 00000000000..07fa333386f --- /dev/null +++ b/test/integration/062_defer_state_test/changed_models_bad/table_model.sql @@ -0,0 +1,2 @@ +{{ config(materialized='table') }} +select * from {{ ref('ephemeral_model') }} diff --git a/test/integration/062_defer_state_test/changed_models_bad/view_model.sql b/test/integration/062_defer_state_test/changed_models_bad/view_model.sql new file mode 100644 index 00000000000..bddbbb23cc2 --- /dev/null +++ b/test/integration/062_defer_state_test/changed_models_bad/view_model.sql @@ -0,0 +1 @@ +select * from no.such.table diff --git a/test/integration/062_defer_state_test/data/seed.csv b/test/integration/062_defer_state_test/data/seed.csv new file mode 100644 index 00000000000..1a728c8ab74 --- /dev/null +++ b/test/integration/062_defer_state_test/data/seed.csv @@ -0,0 +1,3 @@ +id,name +1,Alice +2,Bob diff --git a/test/integration/062_defer_state_test/models/ephemeral_model.sql b/test/integration/062_defer_state_test/models/ephemeral_model.sql new file mode 100644 index 00000000000..2f976e3a9b5 --- /dev/null +++ b/test/integration/062_defer_state_test/models/ephemeral_model.sql @@ -0,0 +1,2 @@ +{{ config(materialized='ephemeral') }} +select * from {{ ref('view_model') }} diff --git a/test/integration/062_defer_state_test/models/schema.yml b/test/integration/062_defer_state_test/models/schema.yml new file mode 100644 index 00000000000..1ec506d3d19 --- /dev/null +++ b/test/integration/062_defer_state_test/models/schema.yml @@ -0,0 +1,9 @@ +version: 2 +models: + - name: view_model + columns: + - name: id + tests: + - unique + - not_null + - name: name diff --git a/test/integration/062_defer_state_test/models/table_model.sql b/test/integration/062_defer_state_test/models/table_model.sql new file mode 100644 index 00000000000..07fa333386f --- /dev/null +++ b/test/integration/062_defer_state_test/models/table_model.sql @@ -0,0 +1,2 @@ +{{ config(materialized='table') }} +select * from {{ ref('ephemeral_model') }} diff --git a/test/integration/062_defer_state_test/models/view_model.sql b/test/integration/062_defer_state_test/models/view_model.sql new file mode 100644 index 00000000000..4b91aa0f2fa --- /dev/null +++ b/test/integration/062_defer_state_test/models/view_model.sql @@ -0,0 +1 @@ +select * from {{ ref('seed') }} diff --git a/test/integration/062_defer_state_test/snapshots/my_snapshot.sql b/test/integration/062_defer_state_test/snapshots/my_snapshot.sql new file mode 100644 index 00000000000..6a7d2b31bfa --- /dev/null +++ b/test/integration/062_defer_state_test/snapshots/my_snapshot.sql @@ -0,0 +1,14 @@ +{% snapshot my_cool_snapshot %} + + {{ + config( + target_database=database, + target_schema=schema, + unique_key='id', + strategy='check', + check_cols=['id'], + ) + }} + select * from {{ ref('view_model') }} + +{% endsnapshot %} diff --git a/test/integration/062_defer_state_test/test_defer_state.py b/test/integration/062_defer_state_test/test_defer_state.py new file mode 100644 index 00000000000..892ef863ee9 --- /dev/null +++ b/test/integration/062_defer_state_test/test_defer_state.py @@ -0,0 +1,135 @@ +from test.integration.base import DBTIntegrationTest, use_profile +import copy +import json +import os +import shutil + +import pytest + + +class TestDeferState(DBTIntegrationTest): + @property + def schema(self): + return "defer_state_062" + + @property + def models(self): + return "models" + + def setUp(self): + self.other_schema = None + super().setUp() + self._created_schemas.add(self.other_schema) + + @property + def project_config(self): + return { + 'config-version': 2, + 'seeds': { + 'test': { + 'quote_columns': True, + } + } + } + + def get_profile(self, adapter_type): + if self.other_schema is None: + self.other_schema = self.unique_schema() + '_other' + if self.adapter_type == 'snowflake': + self.other_schema = self.other_schema.upper() + profile = super().get_profile(adapter_type) + default_name = profile['test']['target'] + profile['test']['outputs']['otherschema'] = copy.deepcopy(profile['test']['outputs'][default_name]) + profile['test']['outputs']['otherschema']['schema'] = self.other_schema + return profile + + def copy_state(self): + assert not os.path.exists('state') + os.makedirs('state') + shutil.copyfile('target/manifest.json', 'state/manifest.json') + + def run_and_defer(self): + results = self.run_dbt(['seed']) + assert len(results) == 1 + assert not any(r.node.deferred for r in results) + results = self.run_dbt(['run']) + assert len(results) == 2 + assert not any(r.node.deferred for r in results) + + # copy files over from the happy times when we had a good target + self.copy_state() + + # no state, still fails + self.run_dbt(['run', '--target', 'otherschema'], expect_pass=False) + + # with state it should work though + results = self.run_dbt(['run', '-m', 'view_model', '--state', 'state', '--defer', '--target', 'otherschema']) + assert self.other_schema not in results[0].node.injected_sql + assert self.unique_schema() in results[0].node.injected_sql + + with open('target/manifest.json') as fp: + data = json.load(fp) + assert data['nodes']['seed.test.seed']['deferred'] + + assert len(results) == 1 + + def run_switchdirs_defer(self): + results = self.run_dbt(['seed']) + assert len(results) == 1 + results = self.run_dbt(['run']) + assert len(results) == 2 + + # copy files over from the happy times when we had a good target + self.copy_state() + + self.use_default_project({'source-paths': ['changed_models']}) + # the sql here is just wrong, so it should fail + self.run_dbt( + ['run', '-m', 'view_model', '--state', 'state', '--defer', '--target', 'otherschema'], + expect_pass=False, + ) + # but this should work since we just use the old happy model + self.run_dbt( + ['run', '-m', 'table_model', '--state', 'state', '--defer', '--target', 'otherschema'], + expect_pass=True, + ) + + self.use_default_project({'source-paths': ['changed_models_bad']}) + # this should fail because the table model refs a broken ephemeral + # model, which it should see + self.run_dbt( + ['run', '-m', 'table_model', '--state', 'state', '--defer', '--target', 'otherschema'], + expect_pass=False, + ) + + @use_profile('postgres') + def test_postgres_state_changetarget(self): + self.run_and_defer() + # these should work without --defer! + self.run_dbt(['test']) + self.run_dbt(['snapshot']) + # make sure these commands don't work with --defer + with pytest.raises(SystemExit): + self.run_dbt(['seed', '--defer']) + + with pytest.raises(SystemExit): + self.run_dbt(['test', '--defer']) + with pytest.raises(SystemExit): + self.run_dbt(['snapshot', '--defer']) + + @use_profile('postgres') + def test_postgres_stat_changedir(self): + self.run_switchdirs_defer() + + @use_profile('snowflake') + def test_snowflake_state_changetarget(self): + self.run_and_defer() + + @use_profile('redshift') + def test_redshift_state_changetarget(self): + self.run_and_defer() + + @use_profile('bigquery') + def test_bigquery_state_changetarget(self): + self.run_and_defer() + diff --git a/test/integration/base.py b/test/integration/base.py index 5065f1c1d09..c912133cf82 100644 --- a/test/integration/base.py +++ b/test/integration/base.py @@ -60,6 +60,7 @@ class FakeArgs: def __init__(self): self.threads = 1 self.data = False + self.defer = False self.schema = True self.full_refresh = False self.models = None diff --git a/test/unit/test_contracts_graph_compiled.py b/test/unit/test_contracts_graph_compiled.py index 4dc16616bff..b40aa563ee7 100644 --- a/test/unit/test_contracts_graph_compiled.py +++ b/test/unit/test_contracts_graph_compiled.py @@ -28,6 +28,7 @@ def _minimum(self): 'database': 'test_db', 'schema': 'test_schema', 'alias': 'bar', + 'compiled': False, } def test_basic_uncompiled(self): @@ -45,6 +46,7 @@ def test_basic_uncompiled(self): 'sources': [], 'depends_on': {'macros': [], 'nodes': []}, 'database': 'test_db', + 'deferred': False, 'description': '', 'schema': 'test_schema', 'alias': 'bar', @@ -80,6 +82,7 @@ def test_basic_uncompiled(self): refs=[], sources=[], depends_on=DependsOn(), + deferred=False, description='', database='test_db', schema='test_schema', @@ -116,6 +119,7 @@ def test_basic_compiled(self): 'sources': [], 'depends_on': {'macros': [], 'nodes': []}, 'database': 'test_db', + 'deferred': True, 'description': '', 'schema': 'test_schema', 'alias': 'bar', @@ -153,6 +157,7 @@ def test_basic_compiled(self): refs=[], sources=[], depends_on=DependsOn(), + deferred=True, description='', database='test_db', schema='test_schema', @@ -204,6 +209,7 @@ def _minimum(self): 'name': 'foo', 'kwargs': {}, }, + 'compiled': False, } def test_basic_uncompiled(self): @@ -237,6 +243,7 @@ def test_basic_uncompiled(self): 'vars': {}, 'severity': 'ERROR', }, + 'deferred': False, 'docs': {'show': True}, 'columns': {}, 'meta': {}, @@ -260,6 +267,7 @@ def test_basic_uncompiled(self): fqn=['test', 'models', 'foo'], refs=[], sources=[], + deferred=False, depends_on=DependsOn(), description='', database='test_db', @@ -297,6 +305,7 @@ def test_basic_compiled(self): 'refs': [], 'sources': [], 'depends_on': {'macros': [], 'nodes': []}, + 'deferred': False, 'database': 'test_db', 'description': '', 'schema': 'test_schema', @@ -314,6 +323,7 @@ def test_basic_compiled(self): 'vars': {}, 'severity': 'warn', }, + 'docs': {'show': True}, 'columns': {}, 'meta': {}, @@ -341,6 +351,7 @@ def test_basic_compiled(self): refs=[], sources=[], depends_on=DependsOn(), + deferred=False, description='', database='test_db', schema='test_schema', diff --git a/test/unit/test_contracts_graph_parsed.py b/test/unit/test_contracts_graph_parsed.py index 6324a3e4639..7716512b541 100644 --- a/test/unit/test_contracts_graph_parsed.py +++ b/test/unit/test_contracts_graph_parsed.py @@ -109,6 +109,7 @@ def _model_ok(self): 'tags': [], 'vars': {}, }, + 'deferred': False, 'docs': {'show': True}, 'columns': {}, 'meta': {}, @@ -176,6 +177,7 @@ def test_complex(self): 'sources': [], 'depends_on': {'macros': [], 'nodes': ['model.test.bar']}, 'database': 'test_db', + 'deferred': True, 'description': 'My parsed node', 'schema': 'test_schema', 'alias': 'bar', @@ -216,6 +218,7 @@ def test_complex(self): refs=[], sources=[], depends_on=DependsOn(nodes=['model.test.bar']), + deferred=True, description='My parsed node', database='test_db', schema='test_schema', @@ -297,6 +300,7 @@ def test_patch_ok(self): 'sources': [], 'depends_on': {'macros': [], 'nodes': []}, 'database': 'test_db', + 'deferred': False, 'description': 'The foo model', 'schema': 'test_schema', 'alias': 'bar', @@ -406,6 +410,7 @@ def _hook_ok(self): 'sources': [], 'depends_on': {'macros': [], 'nodes': []}, 'database': 'test_db', + 'deferred': False, 'description': '', 'schema': 'test_schema', 'alias': 'bar', @@ -443,6 +448,7 @@ def test_ok(self): sources=[], depends_on=DependsOn(), description='', + deferred=False, database='test_db', schema='test_schema', alias='bar', @@ -487,6 +493,7 @@ def test_complex(self): 'refs': [], 'sources': [], 'depends_on': {'macros': [], 'nodes': ['model.test.bar']}, + 'deferred': False, 'database': 'test_db', 'description': 'My parsed node', 'schema': 'test_schema', @@ -530,6 +537,7 @@ def test_complex(self): sources=[], depends_on=DependsOn(nodes=['model.test.bar']), description='My parsed node', + deferred=False, database='test_db', schema='test_schema', alias='bar', @@ -594,6 +602,7 @@ def _complex(self): 'sources': [], 'depends_on': {'macros': [], 'nodes': ['model.test.bar']}, 'database': 'test_db', + 'deferred': False, 'description': 'My parsed node', 'schema': 'test_schema', 'alias': 'bar', @@ -642,6 +651,7 @@ def test_ok(self): 'refs': [], 'sources': [], 'depends_on': {'macros': [], 'nodes': []}, + 'deferred': False, 'database': 'test_db', 'description': '', 'schema': 'test_schema', @@ -924,6 +934,7 @@ def _ts_ok(self): 'refs': [], 'sources': [], 'depends_on': {'macros': [], 'nodes': []}, + 'deferred': False, 'database': 'test_db', 'description': '', 'schema': 'test_schema', @@ -1034,6 +1045,7 @@ def test_check_ok(self): 'sources': [], 'depends_on': {'macros': [], 'nodes': []}, 'database': 'test_db', + 'deferred': False, 'description': '', 'schema': 'test_schema', 'alias': 'bar', diff --git a/test/unit/test_manifest.py b/test/unit/test_manifest.py index e7d25489ed4..9acc26a9edf 100644 --- a/test/unit/test_manifest.py +++ b/test/unit/test_manifest.py @@ -30,6 +30,7 @@ 'depends_on', 'database', 'schema', 'name', 'resource_type', 'package_name', 'root_path', 'path', 'original_file_path', 'raw_sql', 'description', 'columns', 'fqn', 'build_path', 'patch_path', 'docs', + 'deferred', }) REQUIRED_COMPILED_NODE_KEYS = frozenset(REQUIRED_PARSED_NODE_KEYS | {