[AIRFLOW-XXX] Fix syntax docs errors
Co-authored-by: Jarek Potiuk <jarek.potiuk@polidea.com>
Kamil Breguła and potiuk committed Jan 1, 2000
1 parent 0044260 commit 9ed86ef
Showing 14 changed files with 100 additions and 96 deletions.
22 changes: 11 additions & 11 deletions airflow/contrib/example_dags/example_gcp_function.py
@@ -23,22 +23,22 @@
This DAG relies on the following OS environment variables
https://airflow.apache.org/concepts.html#variables
* GCP_PROJECT_ID - Google Cloud Project to use for the Cloud Function.
* GCP_LOCATION - Google Cloud Functions region where the function should be
created.
* GCF_ENTRYPOINT - Name of the executable function in the source code.
* and one of the below:
- GCF_SOURCE_ARCHIVE_URL - Path to the zipped source in Google Cloud Storage
or
(
- GCF_SOURCE_UPLOAD_URL - Generated upload URL for the zipped source
and
- GCF_ZIP_PATH - Local path to the zipped source archive
)
or
- GCF_SOURCE_REPOSITORY - The URL pointing to the hosted repository where the function
is defined in a supported Cloud Source Repository URL format
https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions#SourceRepository
* GCF_SOURCE_ARCHIVE_URL - Path to the zipped source in Google Cloud Storage
* GCF_SOURCE_UPLOAD_URL - Generated upload URL for the zipped source and GCF_ZIP_PATH - Local path to
the zipped source archive
* GCF_SOURCE_REPOSITORY - The URL pointing to the hosted repository where the function
is defined in a supported Cloud Source Repository URL format
https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions#SourceRepository
"""

import os
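
For context, a minimal sketch of how an example DAG like this one can pick these variables up from the environment; the defaults shown are illustrative placeholders, not values from the commit:

```python
import os

# Required settings (defaults here are illustrative placeholders).
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_LOCATION = os.environ.get('GCP_LOCATION', 'europe-west1')
GCF_ENTRYPOINT = os.environ.get('GCF_ENTRYPOINT', 'helloWorld')

# Exactly one of the source variants should be provided; the rest stay unset.
GCF_SOURCE_ARCHIVE_URL = os.environ.get('GCF_SOURCE_ARCHIVE_URL', '')
GCF_SOURCE_UPLOAD_URL = os.environ.get('GCF_SOURCE_UPLOAD_URL', '')
GCF_ZIP_PATH = os.environ.get('GCF_ZIP_PATH', '')
GCF_SOURCE_REPOSITORY = os.environ.get('GCF_SOURCE_REPOSITORY', '')
```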
9 changes: 4 additions & 5 deletions airflow/contrib/example_dags/example_gcp_spanner.py
@@ -25,13 +25,12 @@
* GCP_SPANNER_INSTANCE_ID - Cloud Spanner instance ID.
* GCP_SPANNER_DATABASE_ID - Cloud Spanner database ID.
* GCP_SPANNER_CONFIG_NAME - The name of the instance's configuration. Values are of the
form projects/<gcp_project>/instanceConfigs/<configuration>.
See also:
https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instanceConfigs#InstanceConfig
https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instanceConfigs/list#google.spanner.admin.instance.v1.InstanceAdmin.ListInstanceConfigs
form ``projects/<gcp_project>/instanceConfigs/<configuration>``. See also:
https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instanceConfigs#InstanceConfig
https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instanceConfigs/list#google.spanner.admin.instance.v1.InstanceAdmin.ListInstanceConfigs
* GCP_SPANNER_NODE_COUNT - Number of nodes allocated to the instance.
* GCP_SPANNER_DISPLAY_NAME - The descriptive name for this instance as it appears in UIs.
Must be unique per project and between 4 and 30 characters in length.
Must be unique per project and between 4 and 30 characters in length.
"""

import os
31 changes: 18 additions & 13 deletions airflow/contrib/hooks/gcp_vision_hook.py
@@ -30,13 +30,18 @@ class NameDeterminer:
"""
Class used for checking if the entity has the 'name' attribute set.
- If so, no action is taken.
- If not, and the name can be constructed from other parameters provided, it is created and filled in
* If so, no action is taken.
* If not, and the name can be constructed from other parameters provided, it is created and filled in
the entity.
- If both the entity's 'name' attribute is set and the name can be constructed from other parameters
* If both the entity's 'name' attribute is set and the name can be constructed from other parameters
provided:
- If they are the same: no action is taken.
- If they are different: an exception is thrown.
* If they are the same - no action is taken.
* If they are different - an exception is thrown.
"""

def __init__(self, label, id_label, get_path):
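
The rules in this docstring amount to a small reconciliation step; the following is a hedged sketch of that logic (function and variable names are invented for illustration, this is not the hook's actual code):

```python
class AirflowException(Exception):
    """Stand-in for airflow.exceptions.AirflowException."""


def reconcile_name(entity, constructed_name):
    """Apply the naming rules above to a dict-like entity."""
    explicit = entity.get('name')
    if explicit and not constructed_name:
        return entity                      # name already set: no action
    if constructed_name and not explicit:
        entity['name'] = constructed_name  # fill the constructed name in
        return entity
    if explicit and constructed_name and explicit != constructed_name:
        raise AirflowException('The name provided conflicts with the name '
                               'built from other parameters.')
    return entity                          # both equal (or both empty): no action
```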
@@ -122,7 +127,7 @@ def create_product_set(
):
"""
For the documentation see:
:py:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetCreateOperator`
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetCreateOperator`
"""
client = self.get_conn()
parent = ProductSearchClient.location_path(project_id, location)
@@ -152,7 +157,7 @@ def get_product_set(
):
"""
For the documentation see:
:py:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetGetOperator`
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetGetOperator`
"""
client = self.get_conn()
name = ProductSearchClient.product_set_path(project_id, location, product_set_id)
@@ -182,7 +187,7 @@ def update_product_set(
):
"""
For the documentation see:
:py:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetUpdateOperator`
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetUpdateOperator`
"""
client = self.get_conn()
product_set = self.product_set_name_determiner.get_entity_with_name(
@@ -207,7 +212,7 @@ def delete_product_set(
):
"""
For the documentation see:
:py:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetDeleteOperator`
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetDeleteOperator`
"""
client = self.get_conn()
name = ProductSearchClient.product_set_path(project_id, location, product_set_id)
@@ -228,7 +233,7 @@ def create_product(
):
"""
For the documentation see:
:py:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductCreateOperator`
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductCreateOperator`
"""
client = self.get_conn()
parent = ProductSearchClient.location_path(project_id, location)
@@ -256,7 +261,7 @@ def create_product(
def get_product(self, location, product_id, project_id=None, retry=None, timeout=None, metadata=None):
"""
For the documentation see:
:py:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductGetOperator`
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductGetOperator`
"""
client = self.get_conn()
name = ProductSearchClient.product_path(project_id, location, product_id)
@@ -286,7 +291,7 @@ def update_product(
):
"""
For the documentation see:
:py:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductUpdateOperator`
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductUpdateOperator`
"""
client = self.get_conn()
product = self.product_name_determiner.get_entity_with_name(product, product_id, location, project_id)
@@ -307,7 +312,7 @@ def update_product(
def delete_product(self, location, product_id, project_id=None, retry=None, timeout=None, metadata=None):
"""
For the documentation see:
:py:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductDeleteOperator`
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductDeleteOperator`
"""
client = self.get_conn()
name = ProductSearchClient.product_path(project_id, location, product_id)
airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -66,12 +66,11 @@ def create(self, pod):


class ExtractXcomPodRequestFactory(KubernetesRequestFactory):

XCOM_MOUNT_PATH = '/airflow/xcom'
SIDECAR_CONTAINER_NAME = 'airflow-xcom-sidecar'
"""
Request generator for a pod with sidecar container.
"""
XCOM_MOUNT_PATH = '/airflow/xcom'
SIDECAR_CONTAINER_NAME = 'airflow-xcom-sidecar'
_yaml = """apiVersion: v1
kind: Pod
metadata:
2 changes: 1 addition & 1 deletion airflow/contrib/operators/jenkins_job_trigger_operator.py
@@ -45,7 +45,7 @@ def jenkins_request_with_headers(jenkins_server, req):
:param jenkins_server: The server to query
:param req: The request to execute
:return: Dict containing the response body (key body)
and the headers coming along (headers)
and the headers coming along (headers)
"""
try:
response = jenkins_server.jenkins_request(req)
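
Condensed, the happy path of this function reads roughly as below; `jenkins_request` is the python-jenkins call shown in the hunk, and the real function wraps it in error handling:

```python
def jenkins_request_with_headers(jenkins_server, req):
    # jenkins_request returns a requests.Response; unpack it into the
    # documented dict shape with 'body' and 'headers' keys.
    response = jenkins_server.jenkins_request(req)
    return {'body': response.content.decode('utf-8'),
            'headers': response.headers}
```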
33 changes: 17 additions & 16 deletions airflow/contrib/utils/gcp_field_validator.py
@@ -45,21 +45,20 @@
Typically (for clarity and in order to aid syntax highlighting) the array of
dicts should be defined as a series of dict() executions. A fragment of an example
specification might look as follows:
```
SPECIFICATION = [
    dict(name="an_union", type="union", optional=True, fields=[
        dict(name="variant_1", type="dict"),
        dict(name="variant_2", regexp=r'^.+$', api_version='v1beta2'),
    ]),
    dict(name="an_union", type="dict", fields=[
        dict(name="field_1", type="dict"),
        dict(name="field_2", regexp=r'^.+$'),
    ]),
    ...
]
```
specification might look as follows::

    SPECIFICATION = [
        dict(name="an_union", type="union", optional=True, fields=[
            dict(name="variant_1", type="dict"),
            dict(name="variant_2", regexp=r'^.+$', api_version='v1beta2'),
        ]),
        dict(name="an_union", type="dict", fields=[
            dict(name="field_1", type="dict"),
            dict(name="field_2", regexp=r'^.+$'),
        ]),
        ...
    ]
Each field should have a "name" key indicating the field name. The field can be one of the
following types:
Expand Down Expand Up @@ -311,14 +310,15 @@ def _validate_field(self, validation_spec, dictionary_to_validate, parent=None,
force_optional=False):
"""
Validates if field is OK.
:param validation_spec: specification of the field
:type validation_spec: dict
:param dictionary_to_validate: dictionary where the field should be present
:type dictionary_to_validate: dict
:param parent: full path of parent field
:type parent: str
:param force_optional: forces the field to be optional
(all union fields have force_optional set to True)
(all union fields have force_optional set to True)
:type force_optional: bool
:return: True if the field is present
"""
@@ -413,6 +413,7 @@ def validate(self, body_to_validate):
instantiated with. Raises ValidationSpecificationException or
ValidationFieldException in case of problems with specification or the
body not conforming to the specification respectively.
:param body_to_validate: body that must follow the specification
:type body_to_validate: dict
:return: None
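
Putting the pieces of this file together, a hedged usage sketch: a specification in the style shown earlier is handed to the `GcpBodyFieldValidator` defined in this module and applied with `validate()` (the spec fields below are invented for illustration):

```python
from airflow.contrib.utils.gcp_field_validator import GcpBodyFieldValidator

# A tiny spec in the same style as the SPECIFICATION example above.
spec = [
    dict(name="name", regexp=r'^projects/.+$'),
    dict(name="description", optional=True),
]

validator = GcpBodyFieldValidator(spec, api_version='v1')
validator.validate({'name': 'projects/example-project'})  # conforms: returns None
# A body without the required 'name' field would raise a validation exception.
```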
14 changes: 7 additions & 7 deletions airflow/lineage/__init__.py
@@ -85,13 +85,13 @@ def wrapper(self, context, *args, **kwargs):

def prepare_lineage(func):
"""
Prepares the lineage inlets and outlets
inlets can be:
"auto" -> picks up any outlets from direct upstream tasks that have outlets
defined, as such that if A -> B -> C and B does not have outlets but A does,
these are provided as inlets.
"list of task_ids" -> picks up outlets from the upstream task_ids
"list of datasets" -> manually defined list of DataSet
Prepares the lineage inlets and outlets. Inlets can be:
* "auto" -> picks up any outlets from direct upstream tasks that have outlets defined, as such that
if A -> B -> C and B does not have outlets but A does, these are provided as inlets.
* "list of task_ids" -> picks up outlets from the upstream task_ids
* "list of datasets" -> manually defined list of DataSet
"""
@wraps(func)
def wrapper(self, context, *args, **kwargs):
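
As a usage illustration for the dict-based lineage of this Airflow line, a hedged sketch (the file path, task ids, and the `dag` object are placeholders):

```python
from airflow.lineage.datasets import File
from airflow.operators.bash_operator import BashOperator

f_in = File("/tmp/input_file.txt")

first = BashOperator(
    task_id="produce",
    bash_command="echo 1",
    dag=dag,                       # assumes an existing DAG object named `dag`
    outlets={"datasets": [f_in]},  # manually declared dataset outlet
)

second = BashOperator(
    task_id="consume",
    bash_command="echo 2",
    dag=dag,
    inlets={"auto": True},         # inherits outlets of direct upstream tasks
)
first >> second
```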
3 changes: 1 addition & 2 deletions airflow/models/__init__.py
@@ -24,7 +24,6 @@

import copy
from collections import defaultdict, namedtuple, OrderedDict

from builtins import ImportError as BuiltinImportError, bytes, object, str
from future.standard_library import install_aliases

@@ -3941,7 +3940,7 @@ def run(
:param local: True to run the tasks using the LocalExecutor
:type local: bool
:param executor: The executor instance to run the tasks
:type executor: BaseExecutor
:type executor: airflow.executor.BaseExecutor
:param donot_pickle: True to avoid pickling DAG object and send to workers
:type donot_pickle: bool
:param ignore_task_deps: True to skip upstream tasks
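
A hedged invocation sketch for `DAG.run` with the parameters documented above (`dag` and the date range are placeholders, and defaults are assumed where omitted):

```python
from datetime import datetime

dag.run(                     # `dag` is an already-constructed DAG object
    start_date=datetime(2016, 1, 1),
    end_date=datetime(2016, 1, 3),
    local=True,              # run with the LocalExecutor
    donot_pickle=True,       # do not pickle the DAG and send it to workers
    ignore_task_deps=False,  # respect upstream task dependencies
)
```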
11 changes: 5 additions & 6 deletions airflow/task/task_runner/base_task_runner.py
@@ -102,13 +102,12 @@ def _read_task_logs(self, stream):

def run_command(self, run_with=None, join_args=False):
"""
Run the task command
Run the task command.
:param run_with: list of tokens to run the task command with
E.g. ['bash', '-c']
:param run_with: list of tokens to run the task command with e.g. ``['bash', '-c']``
:type run_with: list
:param join_args: whether to concatenate the list of command tokens
E.g. ['airflow', 'run'] vs ['airflow run']
:param join_args: whether to concatenate the list of command tokens e.g. ``['airflow', 'run']`` vs
``['airflow run']``
:type join_args: bool
:return: the process that was run
:rtype: subprocess.Popen
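
How the two parameters combine can be sketched as below; this shows only the command assembly, not the process spawning and log handling the method also does (the helper name is invented):

```python
def build_full_cmd(task_cmd, run_with=None, join_args=False):
    # Prefix tokens first, then the task command either joined into a
    # single token or kept as separate tokens.
    run_with = run_with or []
    cmd = [' '.join(task_cmd)] if join_args else task_cmd
    return run_with + cmd

build_full_cmd(['airflow', 'run'], run_with=['bash', '-c'], join_args=True)
# -> ['bash', '-c', 'airflow run']
```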
@@ -146,7 +145,7 @@ def start(self):
def return_code(self):
"""
:return: The return code associated with running the task instance or
None if the task is not yet done.
None if the task is not yet done.
:rtype: int
"""
raise NotImplementedError()
1 change: 1 addition & 0 deletions airflow/ti_deps/dep_context.py
@@ -37,6 +37,7 @@ class DepContext(object):

For example there could be a SomeRunContext that subclasses this class which has
dependencies for:

- Making sure there are slots available on the infrastructure to run the task instance
- A task-instance's task-specific dependencies are met (e.g. the previous task
instance completed successfully)
2 changes: 1 addition & 1 deletion airflow/utils/asciiart.py
@@ -17,7 +17,7 @@
# specific language governing permissions and limitations
# under the License.
#
bug = r"""\
bug = r"""
=, .=
=.| ,---. |.=
=.| "-(:::::)-" |.=
26 changes: 15 additions & 11 deletions airflow/utils/dates.py
@@ -41,8 +41,21 @@
def date_range(start_date, end_date=None, num=None, delta=None):
"""
Get a set of dates as a list based on a start, end and delta, delta
can be something that can be added to ``datetime.datetime``
or a cron expression as a ``str``
can be something that can be added to `datetime.datetime`
or a cron expression as a `str`

:Example::

date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta=timedelta(1))
[datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 1, 2, 0, 0),
datetime.datetime(2016, 1, 3, 0, 0)]
date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta='0 0 * * *')
[datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 1, 2, 0, 0),
datetime.datetime(2016, 1, 3, 0, 0)]
date_range(datetime(2016, 1, 1), datetime(2016, 3, 3), delta="0 0 0 * *")
[datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 2, 1, 0, 0),
datetime.datetime(2016, 3, 1, 0, 0)]

:param start_date: anchor date to start the series from
:type start_date: datetime.datetime
:param end_date: right boundary for the date range
@@ -51,15 +64,6 @@ def date_range(start_date, end_date=None, num=None, delta=None):
number of entries you want in the range. This number can be negative,
output will always be sorted regardless
:type num: int
>>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta=timedelta(1))
[datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 1, 2, 0, 0),
datetime.datetime(2016, 1, 3, 0, 0)]
>>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta='0 0 * * *')
[datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 1, 2, 0, 0),
datetime.datetime(2016, 1, 3, 0, 0)]
>>> date_range(datetime(2016, 1, 1), datetime(2016, 3, 3), delta="0 0 0 * *")
[datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 2, 1, 0, 0),
datetime.datetime(2016, 3, 1, 0, 0)]
"""
if not delta:
return []
2 changes: 2 additions & 0 deletions airflow/utils/sqlalchemy.py
@@ -134,6 +134,7 @@ class UtcDateTime(TypeDecorator):
"""
Almost equivalent to :class:`~sqlalchemy.types.DateTime` with
``timezone=True`` option, but it differs from that by:

- Never silently take naive :class:`~datetime.datetime`, instead it
always raise :exc:`ValueError` unless time zone aware value.
- :class:`~datetime.datetime` value's :attr:`~datetime.datetime.tzinfo`
Expand All @@ -142,6 +143,7 @@ class UtcDateTime(TypeDecorator):
it never return naive :class:`~datetime.datetime`, but time zone
aware value, even with SQLite or MySQL.
- Always returns DateTime in UTC

"""

impl = DateTime(timezone=True)
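
An illustrative column declaration and the naive-datetime failure mode described above; the model and column names are invented for the example:

```python
from datetime import datetime, timezone

from sqlalchemy import Column, Integer
from sqlalchemy.ext.declarative import declarative_base

from airflow.utils.sqlalchemy import UtcDateTime

Base = declarative_base()


class Event(Base):
    __tablename__ = 'event'
    id = Column(Integer, primary_key=True)
    created_at = Column(UtcDateTime)  # stored and returned time zone aware, in UTC


aware = datetime(2019, 1, 1, tzinfo=timezone.utc)  # accepted, kept aware
naive = datetime(2019, 1, 1)  # binding this raises ValueError, per the docstring
```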