Skip to content

Commit

Permalink
Merge pull request #2656 from fishtown-analytics/feature/defer-to-prod
Browse files Browse the repository at this point in the history
Feature/defer to prod
  • Loading branch information
beckjake authored Jul 31, 2020
2 parents 618d491 + 2ed9764 commit 1fb8c9c
Show file tree
Hide file tree
Showing 32 changed files with 455 additions and 47 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## dbt 0.18.0b2 (July 30, 2020)

### Features
- Added `--defer` and `--state` flags to `dbt run`, to defer to a previously generated manifest for unselected nodes in a run. ([#2527](/~https://github.com/fishtown-analytics/dbt/issues/2527), [#2656](/~https://github.com/fishtown-analytics/dbt/pull/2656))


### Breaking changes
- Previously, dbt put macros from all installed plugins into the namespace. This version of dbt will not include adapter plugin macros unless they are from the currently-in-use adapter or one of its dependencies [#2590](/~https://github.com/fishtown-analytics/dbt/pull/2590)
Expand Down
4 changes: 4 additions & 0 deletions core/dbt/clients/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ def write_file(path: str, contents: str = '') -> bool:
return True


def read_json(path: str) -> Dict[str, Any]:
return json.loads(load_file_contents(path))


def write_json(path: str, data: Dict[str, Any]) -> bool:
return write_file(path, json.dumps(data, cls=dbt.utils.JSONEncoder))

Expand Down
13 changes: 7 additions & 6 deletions core/dbt/contracts/graph/compiled.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@ class InjectedCTE(JsonSchemaMixin, Replaceable):
id: str
sql: str

# for some frustrating reason, we can't subclass from ParsedNode directly,
# or typing.Union will flatten CompiledNode+ParsedNode into just ParsedNode.
# TODO: understand that issue and come up with some way for these two to share
# logic

@dataclass
class CompiledNodeMixin(JsonSchemaMixin):
# this is a special mixin class to provide a required argument. If a node
# is missing a `compiled` flag entirely, it must not be a CompiledNode.
compiled: bool


@dataclass
class CompiledNode(ParsedNode):
compiled: bool = False
class CompiledNode(ParsedNode, CompiledNodeMixin):
compiled_sql: Optional[str] = None
extra_ctes_injected: bool = False
extra_ctes: List[InjectedCTE] = field(default_factory=list)
Expand Down
45 changes: 36 additions & 9 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import os
from dataclasses import dataclass, field
from datetime import datetime
from itertools import chain
from itertools import chain, islice
from multiprocessing.synchronize import Lock
from typing import (
Dict, List, Optional, Union, Mapping, MutableMapping, Any, Set, Tuple,
Expand All @@ -22,7 +22,7 @@
ParsedMacro, ParsedDocumentation, ParsedNodePatch, ParsedMacroPatch,
ParsedSourceDefinition
)
from dbt.contracts.util import Writable, Replaceable
from dbt.contracts.util import Readable, Writable, Replaceable
from dbt.exceptions import (
raise_duplicate_resource_name, InternalException, raise_compiler_error,
warn_or_error, raise_invalid_patch
Expand Down Expand Up @@ -1011,25 +1011,52 @@ def resolve_doc(
return result
return None

def merge_from_artifact(
self,
other: 'WritableManifest',
selected: Set[UniqueID],
) -> None:
"""Given the selected unique IDs and a writable manifest, update this
manifest by replacing any unselected nodes with their counterpart.
Only non-ephemeral refable nodes are examined.
"""
refables = set(NodeType.refable())
merged = set()
for unique_id, node in other.nodes.items():
if (
node.resource_type in refables and
not node.is_ephemeral and
unique_id not in selected
):
merged.add(unique_id)
self.nodes[unique_id] = node.replace(deferred=True)

# log up to 5 items
sample = list(islice(merged, 5))
logger.debug(
f'Merged {len(merged)} items from state (sample: {sample})'
)


@dataclass
class WritableManifest(JsonSchemaMixin, Writable):
nodes: Mapping[str, NonSourceNode] = field(
class WritableManifest(JsonSchemaMixin, Writable, Readable):
nodes: Mapping[UniqueID, NonSourceNode] = field(
metadata=dict(description=(
'The nodes defined in the dbt project and its dependencies'
)),
))
)
sources: Mapping[str, ParsedSourceDefinition] = field(
sources: Mapping[UniqueID, ParsedSourceDefinition] = field(
metadata=dict(description=(
'The sources defined in the dbt project and its dependencies',
'The sources defined in the dbt project and its dependencies'
))
)
macros: Mapping[str, ParsedMacro] = field(
macros: Mapping[UniqueID, ParsedMacro] = field(
metadata=dict(description=(
'The macros defined in the dbt project and its dependencies'
))
)
docs: Mapping[str, ParsedDocumentation] = field(
docs: Mapping[UniqueID, ParsedDocumentation] = field(
metadata=dict(description=(
'The docs defined in the dbt project and its dependencies'
))
Expand Down
1 change: 1 addition & 0 deletions core/dbt/contracts/graph/parsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ class ParsedNodeDefaults(ParsedNodeMandatory):
docs: Docs = field(default_factory=Docs)
patch_path: Optional[str] = None
build_path: Optional[str] = None
deferred: bool = False

def write_node(self, target_path: str, subdirectory: str, payload: str):
if (os.path.basename(self.path) ==
Expand Down
16 changes: 15 additions & 1 deletion core/dbt/contracts/util.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import dataclasses
from typing import List

from dbt.clients.system import write_json
from dbt.clients.system import write_json, read_json
from dbt.exceptions import RuntimeException


def list_str() -> List[str]:
Expand Down Expand Up @@ -76,3 +77,16 @@ def replace(self, **kwargs):
@property
def extra(self):
return self._extra


class Readable:
@classmethod
def read(cls, path: str):
try:
data = read_json(path)
except (EnvironmentError, ValueError) as exc:
raise RuntimeException(
f'Could not read {cls.__name__} at "{path}" as JSON: {exc}'
) from exc

return cls.from_dict(data) # type: ignore
11 changes: 11 additions & 0 deletions core/dbt/flags.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import multiprocessing
from pathlib import Path
from typing import Optional
# initially all flags are set to None, the on-load call of reset() will set
# them for their first time.
Expand All @@ -22,9 +23,19 @@ def env_set_truthy(key: str) -> Optional[str]:
return value


def env_set_path(key: str) -> Optional[Path]:
value = os.getenv(key)
if value is None:
return value
else:
return Path(value)


SINGLE_THREADED_WEBSERVER = env_set_truthy('DBT_SINGLE_THREADED_WEBSERVER')
SINGLE_THREADED_HANDLER = env_set_truthy('DBT_SINGLE_THREADED_HANDLER')
MACRO_DEBUGGING = env_set_truthy('DBT_MACRO_DEBUGGING')
DEFER_MODE = env_set_truthy('DBT_DEFER_TO_STATE')
ARTIFACT_STATE_PATH = env_set_path('DBT_ARTIFACT_STATE_PATH')


def _get_context():
Expand Down
102 changes: 80 additions & 22 deletions core/dbt/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import sys
import traceback
from contextlib import contextmanager
from pathlib import Path

import dbt.version
import dbt.flags as flags
Expand All @@ -31,7 +32,7 @@

from dbt.utils import ExitCodes
from dbt.config import PROFILES_DIR, read_user_config
from dbt.exceptions import RuntimeException
from dbt.exceptions import RuntimeException, InternalException


class DBTVersion(argparse.Action):
Expand Down Expand Up @@ -62,6 +63,50 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.register('action', 'dbtversion', DBTVersion)

def add_optional_argument_inverse(
self,
name,
*,
enable_help=None,
disable_help=None,
dest=None,
no_name=None,
default=None,
):
mutex_group = self.add_mutually_exclusive_group()
if not name.startswith('--'):
raise InternalException(
'cannot handle optional argument without "--" prefix: '
f'got "{name}"'
)
if dest is None:
dest_name = name[2:].replace('-', '_')
else:
dest_name = dest

if no_name is None:
no_name = f'--no-{name[2:]}'

mutex_group.add_argument(
name,
action='store_const',
const=True,
dest=dest_name,
default=default,
help=enable_help,
)

mutex_group.add_argument(
f'--no-{name[2:]}',
action='store_const',
const=False,
dest=dest_name,
default=default,
help=disable_help,
)

return mutex_group


class RPCArgumentParser(DBTArgumentParser):
def exit(self, status=0, message=None):
Expand Down Expand Up @@ -279,6 +324,8 @@ def _build_base_subparser():
If set, bypass the adapter-level cache of database state
''',
)

base_subparser.set_defaults(defer=None, state=None)
return base_subparser


Expand Down Expand Up @@ -395,15 +442,39 @@ def _build_run_subparser(subparsers, base_subparser):
parents=[base_subparser],
help='''
Compile SQL and execute against the current target database.
''')
'''
)
run_sub.add_argument(
'-x',
'--fail-fast',
action='store_true',
help='''
Stop execution upon a first failure.
'''
Stop execution upon a first failure.
'''
)

# for now, this is a "dbt run"-only thing
run_sub.add_argument(
'--state',
help='''
If set, use the given directory as the source for json files to compare
with this project.
''',
type=Path,
default=flags.ARTIFACT_STATE_PATH,
)
run_sub.add_optional_argument_inverse(
'--defer',
enable_help='''
If set, defer to the state variable for resolving unselected nodes.
''',
disable_help='''
If set, do not defer to the state variable for resolving unselected
nodes.
''',
default=flags.DEFER_MODE,
)

run_sub.set_defaults(cls=run_task.RunTask, which='run', rpc_method='run')
return run_sub

Expand Down Expand Up @@ -830,30 +901,17 @@ def parse_args(args, cls=DBTArgumentParser):
'''
)

partial_flag = p.add_mutually_exclusive_group()
partial_flag.add_argument(
p.add_optional_argument_inverse(
'--partial-parse',
action='store_const',
const=True,
dest='partial_parse',
default=None,
help='''
enable_help='''
Allow for partial parsing by looking for and writing to a pickle file
in the target directory. This overrides the user configuration file.
WARNING: This can result in unexpected behavior if you use env_var()!
'''
)

partial_flag.add_argument(
'--no-partial-parse',
action='store_const',
const=False,
default=None,
dest='partial_parse',
help='''
''',
disable_help='''
Disallow partial parsing. This overrides the user configuration file.
'''
''',
)

# if set, run dbt in single-threaded mode: thread count is ignored, and
Expand Down
3 changes: 1 addition & 2 deletions core/dbt/task/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from hologram import ValidationError

from .compile import CompileTask
from .runnable import write_manifest

from dbt.adapters.factory import get_adapter
from dbt.contracts.graph.compiled import CompileResultNode
Expand Down Expand Up @@ -274,7 +273,7 @@ def run(self) -> CatalogResults:
path = os.path.join(self.config.target_path, CATALOG_FILENAME)
results.write(path)
if self.args.compile:
write_manifest(self.config, self.manifest)
self.write_manifest()

if exceptions:
logger.error(
Expand Down
Loading

0 comments on commit 1fb8c9c

Please sign in to comment.