Add S3ToRedshift example dag and system test
- add howto docs for S3ToRedshift example dag
- add terraform which runs terraform CLI commands in an isolated docker container

NOTE: This system test uses terraform to provide the infrastructure needed to run this example dag.
feluelle committed Jun 9, 2020
1 parent d8e5490 commit 3c93c08
Showing 13 changed files with 409 additions and 4 deletions.
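
As a rough sketch, the resulting system test flow looks like this (the directory below is hypothetical; adjust it to wherever the Terraform configuration actually lives):

    cd path/to/s3_to_redshift/terraform   # hypothetical location of the *.tf files
    terraform init
    terraform apply                       # provisions the S3 bucket and the Redshift cluster
    # ... run the example dag / system test against that infrastructure ...
    terraform destroy                     # tear everything down again
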
10 changes: 10 additions & 0 deletions .gitignore
@@ -187,3 +187,13 @@ dmypy.json
/.inputrc
log.txt*
/backport_packages/CHANGELOG.txt

# Local .terraform directories
**/.terraform/*

# .tfstate files
*.tfstate
*.tfstate.*

# Terraform variables
*.tfvars
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -46,7 +46,7 @@ repos:
- license-templates/LICENSE.txt
- --fuzzy-match-generates-todo
files: >
\.properties$|\.cfg$|\.conf$|\.ini$|\.ldif$|\.readthedocs$|\.service$|^Dockerfile.*$
\.properties$|\.cfg$|\.conf$|\.ini$|\.ldif$|\.readthedocs$|\.service$|\.tf$|^Dockerfile.*$
- id: insert-license
name: Add license for all rst files
exclude: ^\.github/.*$
@@ -0,0 +1,91 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
This is an example dag for using `S3ToRedshiftTransferOperator` to copy an S3 key into a Redshift table.
"""

from os import getenv

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3_to_redshift import S3ToRedshiftTransferOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.dates import days_ago

# [START howto_operator_s3_to_redshift_env_variables]
S3_BUCKET = getenv("S3_BUCKET")
S3_KEY = getenv("S3_KEY", "key")
REDSHIFT_TABLE = getenv("REDSHIFT_TABLE", "test_table")
# [END howto_operator_s3_to_redshift_env_variables]

default_args = {"start_date": days_ago(1)}


def _add_sample_data_to_s3():
    s3_hook = S3Hook()
    s3_hook.load_string("0,Airflow", f'{S3_KEY}/{REDSHIFT_TABLE}', S3_BUCKET, replace=True)


def _remove_sample_data_from_s3():
    s3_hook = S3Hook()
    if s3_hook.check_for_key(f'{S3_KEY}/{REDSHIFT_TABLE}', S3_BUCKET):
        s3_hook.delete_objects(S3_BUCKET, f'{S3_KEY}/{REDSHIFT_TABLE}')


with DAG(
    dag_id="example_s3_to_redshift",
    default_args=default_args,
    schedule_interval=None,
    tags=['example']
) as dag:
    setup__task_add_sample_data_to_s3 = PythonOperator(
        python_callable=_add_sample_data_to_s3,
        task_id='setup__add_sample_data_to_s3'
    )
    setup__task_create_table = PostgresOperator(
        sql=f'CREATE TABLE IF NOT EXISTS {REDSHIFT_TABLE}(Id int, Name varchar)',
        postgres_conn_id='redshift_default',
        task_id='setup__create_table'
    )
    # [START howto_operator_s3_to_redshift_task_1]
    task_transfer_s3_to_redshift = S3ToRedshiftTransferOperator(
        s3_bucket=S3_BUCKET,
        s3_key=S3_KEY,
        schema="PUBLIC",
        table=REDSHIFT_TABLE,
        copy_options=['csv'],
        task_id='transfer_s3_to_redshift'
    )
    # [END howto_operator_s3_to_redshift_task_1]
    teardown__task_drop_table = PostgresOperator(
        sql=f'DROP TABLE IF EXISTS {REDSHIFT_TABLE}',
        postgres_conn_id='redshift_default',
        task_id='teardown__drop_table'
    )
    teardown__task_remove_sample_data_from_s3 = PythonOperator(
        python_callable=_remove_sample_data_from_s3,
        task_id='teardown__remove_sample_data_from_s3'
    )
    [
        setup__task_add_sample_data_to_s3,
        setup__task_create_table
    ] >> task_transfer_s3_to_redshift >> [
        teardown__task_drop_table,
        teardown__task_remove_sample_data_from_s3
    ]
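
The `PostgresOperator` tasks above point at a `redshift_default` connection. A minimal sketch of supplying it through an environment variable, with placeholders that would come from the Terraform outputs added below:

    export AIRFLOW_CONN_REDSHIFT_DEFAULT='postgres://<master_username>:<master_password>@<redshift_host>:5439/<database_name>'
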
79 changes: 79 additions & 0 deletions docs/howto/operator/amazon/aws/s3_to_redshift.rst
@@ -0,0 +1,79 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
.. _howto/operator:S3ToRedshiftTransferOperator:

S3 To Redshift Transfer Operator
================================

.. contents::
  :depth: 1
  :local:

Overview
--------

The ``S3ToRedshiftTransferOperator`` copies data from an S3 bucket into a Redshift table.

The example dag provided showcases the
:class:`~airflow.providers.amazon.aws.operators.s3_to_redshift.S3ToRedshiftTransferOperator`
in action.

- example_s3_to_redshift.py

example_s3_to_redshift.py
-------------------------

Purpose
"""""""

This is a basic example dag for using ``S3ToRedshiftTransferOperator`` to copy data from an S3 bucket into a Redshift table.

Environment variables
"""""""""""""""""""""

This example relies on the following variables, which can be passed via OS environment variables.

.. exampleinclude:: ../../../../../airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
    :language: python
    :start-after: [START howto_operator_s3_to_redshift_env_variables]
    :end-before: [END howto_operator_s3_to_redshift_env_variables]

You need to set at least the ``S3_BUCKET``.
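
For example (the bucket name is just a placeholder; ``S3_KEY`` and ``REDSHIFT_TABLE`` fall back to the defaults shown above when unset):

.. code-block:: bash

    export S3_BUCKET=my-airflow-test-bucket
    export S3_KEY=key
    export REDSHIFT_TABLE=test_table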

Copy S3 key into Redshift table
"""""""""""""""""""""""""""""""

In the following code we are copying the S3 key ``s3://{S3_BUCKET}/{S3_KEY}/{REDSHIFT_TABLE}`` into the Redshift table
``PUBLIC.{REDSHIFT_TABLE}``.

.. exampleinclude:: ../../../../../airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
    :language: python
    :start-after: [START howto_operator_s3_to_redshift_task_1]
    :end-before: [END howto_operator_s3_to_redshift_task_1]

You can find more information about the ``COPY`` command used
`here <https://docs.aws.amazon.com/us_en/redshift/latest/dg/copy-parameters-data-source-s3.html>`__.
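
For illustration, with the default values above the operator ends up issuing a ``COPY`` statement roughly equivalent to the following (bucket name and credentials are placeholders):

.. code-block:: sql

    COPY PUBLIC.test_table
    FROM 's3://my-airflow-test-bucket/key/test_table'
    WITH CREDENTIALS 'aws_access_key_id=<access_key>;aws_secret_access_key=<secret_key>'
    csv;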

Reference
---------

For further information, look at:

* `AWS COPY from Amazon S3 Documentation <https://docs.aws.amazon.com/us_en/redshift/latest/dg/copy-parameters-data-source-s3.html>`__
* `AWS boto3 Library Documentation for S3 <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html>`__
2 changes: 1 addition & 1 deletion docs/operators-and-hooks-ref.rst
@@ -539,7 +539,7 @@ These integrations allow you to copy data from/to Amazon Web Services.

* - `Amazon Simple Storage Service (S3) <https://aws.amazon.com/s3/>`_
- `Amazon Redshift <https://aws.amazon.com/redshift/>`__
-
- :doc:`How to use <howto/operator/amazon/aws/s3_to_redshift>`
- :mod:`airflow.providers.amazon.aws.operators.s3_to_redshift`

* - `Amazon Simple Storage Service (S3) <https://aws.amazon.com/s3/>`_
@@ -48,7 +48,7 @@ else
fi

echo
echo Installing all packages at once in Airlfow 1.10
echo Installing all packages at once in Airflow 1.10
echo

# Install all packages at once
2 changes: 1 addition & 1 deletion scripts/ci/kubernetes/docker/bootstrap.sh
@@ -35,7 +35,7 @@ echo
echo "Uninstalling pre-installed airflow"
echo

# Uninstall preinstalled Apache Airlfow
# Uninstall preinstalled Apache Airflow
pip uninstall -y apache-airflow


43 changes: 43 additions & 0 deletions scripts/ci/prepare_tool_scripts.sh
@@ -62,3 +62,46 @@ prepare_tool_script "mcr.microsoft.com/azure-cli:latest" ".azure" az az
prepare_tool_script "${GCLOUD_IMAGE}" ".config/gcloud" bq bq
prepare_tool_script "${GCLOUD_IMAGE}" ".config/gcloud" gcloud gcloud
prepare_tool_script "${GCLOUD_IMAGE}" ".config/gcloud" gsutil gsutil

function prepare_terraform_script() {
TOOL="terraform"
IMAGE="hashicorp/terraform:latest"

TARGET_TOOL_PATH="/usr/bin/${TOOL}"
TARGET_TOOL_UPDATE_PATH="/usr/bin/${TOOL}-update"

cat >"${TARGET_TOOL_PATH}" <<EOF
#!/usr/bin/env bash
docker run --rm -it \
-v "\${HOST_AIRFLOW_SOURCES}/tmp:/tmp" \
-v "\${HOST_AIRFLOW_SOURCES}/files:/files" \
-v "\${HOST_AIRFLOW_SOURCES}:/opt/airflow" \
-v "\${HOST_HOME}/.aws:/root/.aws" \
-v "\${HOST_HOME}/.azure:/root/.azure" \
-v "\${HOST_HOME}/.config/gcloud:/root/.config/gcloud" \
-w /opt/airflow \
--env-file <(env | grep TF) \
"${IMAGE}" "\$@"
RES=\$?
if [[ \${HOST_OS} == "Linux" ]]; then
docker run --rm \
-v "\${HOST_AIRFLOW_SOURCES}/tmp:/tmp" \
-v "\${HOST_AIRFLOW_SOURCES}/files:/files" \
-v "\${HOST_HOME}/.aws:/root/.aws" \
-v "\${HOST_HOME}/.azure:/root/.azure" \
-v "\${HOST_HOME}/.config/gcloud:/root/.config/gcloud" \
"\${AIRFLOW_CI_IMAGE}" bash -c \
"find '/tmp/' '/files/' '/root/.aws' '/root/.azure' '/root/.config/gcloud' -user root -print0 | xargs --null chown '\${HOST_USER_ID}.\${HOST_GROUP_ID}' --no-dereference" >/dev/null 2>&1
fi
exit \${RES}
EOF

cat >"${TARGET_TOOL_UPDATE_PATH}" <<EOF
#!/usr/bin/env bash
docker pull "${IMAGE}"
EOF

chmod a+x "${TARGET_TOOL_PATH}" "${TARGET_TOOL_UPDATE_PATH}"
}

prepare_terraform_script
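
The generated wrapper behaves like a local `terraform` binary, but each call runs inside the `hashicorp/terraform` container with the Airflow sources mounted at `/opt/airflow` as the working directory, and environment variables matching `TF` (e.g. `TF_VAR_*`, `TF_LOG`) are forwarded into it. A hedged usage sketch:

    terraform-update                        # refresh the hashicorp/terraform image
    export TF_VAR_s3_bucket=my-test-bucket  # TF-prefixed variables are passed into the container
    terraform init path/to/instance         # hypothetical directory holding the *.tf files, relative to the sources;
    terraform apply path/to/instance        # newer Terraform releases would use -chdir instead
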
@@ -0,0 +1,38 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

output "redshift_endpoint" {
value = aws_redshift_cluster.redshift.endpoint
description = "The redshift endpoint which is needed to create an airflow connection."
}

output "redshift_database_name" {
value = aws_redshift_cluster.redshift.database_name
description = "The redshift database name which is needed to create an airflow connection."
}

output "redshift_master_username" {
value = aws_redshift_cluster.redshift.master_username
description = "The redshift username which is needed to create an airflow connection."
sensitive = true
}

output "redshift_master_password" {
value = aws_redshift_cluster.redshift.master_password
description = "The redshift password which is needed to create an airflow connection."
sensitive = true
}
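
After `terraform apply`, these outputs can be read back to fill in the `redshift_default` connection used by the example dag, e.g.:

    terraform output redshift_endpoint         # host:port of the provisioned cluster
    terraform output redshift_database_name
    terraform output redshift_master_username  # marked sensitive, so redacted in the apply summary
    terraform output redshift_master_password
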
@@ -0,0 +1,35 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

provider "aws" {
region = var.aws_region
}

resource "aws_s3_bucket" "s3" {
bucket = var.s3_bucket
force_destroy = true
}

resource "aws_redshift_cluster" "redshift" {
cluster_identifier = var.redshift_cluster_identifier
database_name = var.redshift_database_name
master_username = var.redshift_master_username
master_password = var.redshift_master_password
node_type = "dc1.large"
cluster_type = "single-node"
skip_final_snapshot = true
}
@@ -0,0 +1,25 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

variable "aws_region" {}

variable "s3_bucket" {}

variable "redshift_cluster_identifier" {}
variable "redshift_database_name" {}
variable "redshift_master_username" {}
variable "redshift_master_password" {}