From 3c93c08d416b13eac22f00f9f2cd9860b79fe511 Mon Sep 17 00:00:00 2001
From: feluelle
Date: Tue, 9 Jun 2020 15:11:21 +0200
Subject: [PATCH] Add S3ToRedshift example dag and system test

- add howto docs for S3ToRedshift example dag
- add terraform which runs terraform CLI commands in an isolated docker
  container

NOTE: This system test uses terraform to provide the infrastructure
needed to run this example dag.
---
 .gitignore                                    | 10 ++
 .pre-commit-config.yaml                       |  2 +-
 .../example_dags/example_s3_to_redshift.py    | 91 +++++++++++++++++++
 .../operator/amazon/aws/s3_to_redshift.rst    | 79 ++++++++++++++++
 docs/operators-and-hooks-ref.rst              |  2 +-
 .../run_test_package_import_all_classes.sh    |  2 +-
 scripts/ci/kubernetes/docker/bootstrap.sh     |  2 +-
 scripts/ci/prepare_tool_scripts.sh            | 43 +++++++++
 .../example_s3_to_redshift/outputs.tf         | 38 ++++++++
 .../example_s3_to_redshift/resources.tf       | 35 +++++++
 .../example_s3_to_redshift/variables.tf       | 25 +++++
 .../operators/test_s3_to_redshift_system.py   | 50 ++++++++++
 tests/test_utils/terraform.py                 | 34 +++++++
 13 files changed, 409 insertions(+), 4 deletions(-)
 create mode 100644 airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
 create mode 100644 docs/howto/operator/amazon/aws/s3_to_redshift.rst
 create mode 100644 tests/providers/amazon/aws/infrastructure/example_s3_to_redshift/outputs.tf
 create mode 100644 tests/providers/amazon/aws/infrastructure/example_s3_to_redshift/resources.tf
 create mode 100644 tests/providers/amazon/aws/infrastructure/example_s3_to_redshift/variables.tf
 create mode 100644 tests/providers/amazon/aws/operators/test_s3_to_redshift_system.py
 create mode 100644 tests/test_utils/terraform.py

diff --git a/.gitignore b/.gitignore
index 6aea11496fb2a..b3314d2bbbb18 100644
--- a/.gitignore
+++ b/.gitignore
@@ -187,3 +187,13 @@ dmypy.json
 /.inputrc
 log.txt*
 /backport_packages/CHANGELOG.txt
+
+# Local .terraform directories
+**/.terraform/*
+
+# .tfstate files
+*.tfstate
+*.tfstate.*
+
+# Terraform variables
+*.tfvars
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 0bec8342e925a..dc30879148e63 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -46,7 +46,7 @@ repos:
           - license-templates/LICENSE.txt
           - --fuzzy-match-generates-todo
         files: >
-          \.properties$|\.cfg$|\.conf$|\.ini$|\.ldif$|\.readthedocs$|\.service$|^Dockerfile.*$
+          \.properties$|\.cfg$|\.conf$|\.ini$|\.ldif$|\.readthedocs$|\.service$|\.tf$|^Dockerfile.*$
       - id: insert-license
         name: Add license for all rst files
         exclude: ^\.github/.*$
diff --git a/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py b/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
new file mode 100644
index 0000000000000..c36d443788738
--- /dev/null
+++ b/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+This is an example dag for using `S3ToRedshiftTransferOperator` to copy an S3 key into a Redshift table.
+"""
+
+from os import getenv
+
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.amazon.aws.operators.s3_to_redshift import S3ToRedshiftTransferOperator
+from airflow.providers.postgres.operators.postgres import PostgresOperator
+from airflow.utils.dates import days_ago
+
+# [START howto_operator_s3_to_redshift_env_variables]
+S3_BUCKET = getenv("S3_BUCKET")
+S3_KEY = getenv("S3_KEY", "key")
+REDSHIFT_TABLE = getenv("REDSHIFT_TABLE", "test_table")
+# [END howto_operator_s3_to_redshift_env_variables]
+
+default_args = {"start_date": days_ago(1)}
+
+
+def _add_sample_data_to_s3():
+    s3_hook = S3Hook()
+    s3_hook.load_string("0,Airflow", f'{S3_KEY}/{REDSHIFT_TABLE}', S3_BUCKET, replace=True)
+
+
+def _remove_sample_data_from_s3():
+    s3_hook = S3Hook()
+    if s3_hook.check_for_key(f'{S3_KEY}/{REDSHIFT_TABLE}', S3_BUCKET):
+        s3_hook.delete_objects(S3_BUCKET, f'{S3_KEY}/{REDSHIFT_TABLE}')
+
+
+with DAG(
+    dag_id="example_s3_to_redshift",
+    default_args=default_args,
+    schedule_interval=None,
+    tags=['example']
+) as dag:
+    setup__task_add_sample_data_to_s3 = PythonOperator(
+        python_callable=_add_sample_data_to_s3,
+        task_id='setup__add_sample_data_to_s3'
+    )
+    setup__task_create_table = PostgresOperator(
+        sql=f'CREATE TABLE IF NOT EXISTS {REDSHIFT_TABLE}(Id int, Name varchar)',
+        postgres_conn_id='redshift_default',
+        task_id='setup__create_table'
+    )
+    # [START howto_operator_s3_to_redshift_task_1]
+    task_transfer_s3_to_redshift = S3ToRedshiftTransferOperator(
+        s3_bucket=S3_BUCKET,
+        s3_key=S3_KEY,
+        schema="PUBLIC",
+        table=REDSHIFT_TABLE,
+        copy_options=['csv'],
+        task_id='transfer_s3_to_redshift'
+    )
+    # [END howto_operator_s3_to_redshift_task_1]
+    teardown__task_drop_table = PostgresOperator(
+        sql=f'DROP TABLE IF EXISTS {REDSHIFT_TABLE}',
+        postgres_conn_id='redshift_default',
+        task_id='teardown__drop_table'
+    )
+    teardown__task_remove_sample_data_from_s3 = PythonOperator(
+        python_callable=_remove_sample_data_from_s3,
+        task_id='teardown__remove_sample_data_from_s3'
+    )
+    [
+        setup__task_add_sample_data_to_s3,
+        setup__task_create_table
+    ] >> task_transfer_s3_to_redshift >> [
+        teardown__task_drop_table,
+        teardown__task_remove_sample_data_from_s3
+    ]
diff --git a/docs/howto/operator/amazon/aws/s3_to_redshift.rst b/docs/howto/operator/amazon/aws/s3_to_redshift.rst
new file mode 100644
index 0000000000000..7a634da2838c8
--- /dev/null
+++ b/docs/howto/operator/amazon/aws/s3_to_redshift.rst
@@ -0,0 +1,79 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+ ..  http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+.. _howto/operator:S3ToRedshiftTransferOperator:
+
+S3 To Redshift Transfer Operator
+================================
+
+.. contents::
+  :depth: 1
+  :local:
+
+Overview
+--------
+
+The ``S3ToRedshiftTransferOperator`` copies data from an S3 bucket into a Redshift table.
+
+The example dag provided showcases the
+:class:`~airflow.providers.amazon.aws.operators.s3_to_redshift.S3ToRedshiftTransferOperator`
+in action.
+
+ - example_s3_to_redshift.py
+
+example_s3_to_redshift.py
+-------------------------
+
+Purpose
+"""""""
+
+This is a basic example dag for using ``S3ToRedshiftTransferOperator`` to copy data from an S3 bucket into a Redshift table.
+
+Environment variables
+"""""""""""""""""""""
+
+This example relies on the following variables, which can be passed via OS environment variables.
+
+.. exampleinclude:: ../../../../../airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
+    :language: python
+    :start-after: [START howto_operator_s3_to_redshift_env_variables]
+    :end-before: [END howto_operator_s3_to_redshift_env_variables]
+
+You need to set at least the ``S3_BUCKET``.
+
+Copy S3 key into Redshift table
+"""""""""""""""""""""""""""""""
+
+In the following code we are copying the S3 key ``s3://{S3_BUCKET}/{S3_KEY}/{REDSHIFT_TABLE}`` into the Redshift table
+``PUBLIC.{REDSHIFT_TABLE}``.
+
+.. exampleinclude:: ../../../../../airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
+    :language: python
+    :start-after: [START howto_operator_s3_to_redshift_task_1]
+    :end-before: [END howto_operator_s3_to_redshift_task_1]
+
+You can find more information about the ``COPY`` command used
+`here `__.
+
+Reference
+---------
+
+For further information, look at:
+
+* `AWS COPY from Amazon S3 Documentation `__
+* `AWS boto3 Library Documentation for S3 `__
diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst
index 6f9377ba588f6..46f5f1d013925 100644
--- a/docs/operators-and-hooks-ref.rst
+++ b/docs/operators-and-hooks-ref.rst
@@ -539,7 +539,7 @@ These integrations allow you to copy data from/to Amazon Web Services.
    * - `Amazon Simple Storage Service (S3) `_
      - `Amazon Redshift `__
-     -
+     - :doc:`How to use `
      - :mod:`airflow.providers.amazon.aws.operators.s3_to_redshift`

    * - `Amazon Simple Storage Service (S3) `_
diff --git a/scripts/ci/in_container/run_test_package_import_all_classes.sh b/scripts/ci/in_container/run_test_package_import_all_classes.sh
index 4ad80c8e0405b..863e48b82bf0e 100755
--- a/scripts/ci/in_container/run_test_package_import_all_classes.sh
+++ b/scripts/ci/in_container/run_test_package_import_all_classes.sh
@@ -48,7 +48,7 @@ else
 fi
 
 echo
-echo Installing all packages at once in Airlfow 1.10
+echo Installing all packages at once in Airflow 1.10
 echo
 
 # Install all packages at once
diff --git a/scripts/ci/kubernetes/docker/bootstrap.sh b/scripts/ci/kubernetes/docker/bootstrap.sh
index 9099b6b5328fa..d8d99c2ff5a06 100755
--- a/scripts/ci/kubernetes/docker/bootstrap.sh
+++ b/scripts/ci/kubernetes/docker/bootstrap.sh
@@ -35,7 +35,7 @@ echo
 echo "Uninstalling pre-installed airflow"
 echo
 
-# Uninstall preinstalled Apache Airlfow
+# Uninstall preinstalled Apache Airflow
 pip uninstall -y apache-airflow
 
diff --git a/scripts/ci/prepare_tool_scripts.sh b/scripts/ci/prepare_tool_scripts.sh
index 7a98c501b86c7..ecdde3cc1f39d 100755
--- a/scripts/ci/prepare_tool_scripts.sh
+++ b/scripts/ci/prepare_tool_scripts.sh
@@ -62,3 +62,46 @@ prepare_tool_script "mcr.microsoft.com/azure-cli:latest" ".azure" az az
 prepare_tool_script "${GCLOUD_IMAGE}" ".config/gcloud" bq bq
 prepare_tool_script "${GCLOUD_IMAGE}" ".config/gcloud" gcloud gcloud
 prepare_tool_script "${GCLOUD_IMAGE}" ".config/gcloud" gsutil gsutil
+
+function prepare_terraform_script() {
+    TOOL="terraform"
+    IMAGE="hashicorp/terraform:latest"
+
+    TARGET_TOOL_PATH="/usr/bin/${TOOL}"
+    TARGET_TOOL_UPDATE_PATH="/usr/bin/${TOOL}-update"
+
+    cat >"${TARGET_TOOL_PATH}" </dev/null 2>&1
+fi
+exit \${RES}
+EOF
+
+    cat >"${TARGET_TOOL_UPDATE_PATH}" <
+    def setUp(self) -> None:
+        super().setUp()
+        host, port = self.get_tf_output("redshift_endpoint").split(':')
+        schema = self.get_tf_output("redshift_database_name")
+        login = self.get_tf_output("redshift_master_username")
+        password = self.get_tf_output("redshift_master_password")
+        db.merge_conn(Connection("redshift_default", "postgres", host, login, password, schema, port))
+
+    def test_run_example_dag_s3_to_redshift(self):
+        self.run_dag('example_s3_to_redshift', AWS_DAG_FOLDER)
+
+    def tearDown(self) -> None:
+        super().tearDown()
+        with create_session() as session:
+            session.query(Connection).filter(Connection.conn_id == "redshift_default").delete()
diff --git a/tests/test_utils/terraform.py b/tests/test_utils/terraform.py
new file mode 100644
index 0000000000000..a85d4fc6c2a57
--- /dev/null
+++ b/tests/test_utils/terraform.py
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from tests.test_utils.system_tests_class import SystemTest
+
+
+class Terraform(SystemTest):
+    TERRAFORM_DIR: str
+
+    def setUp(self) -> None:
+        self.execute_cmd(["terraform", "init", "-input=false", self.TERRAFORM_DIR])
+        self.execute_cmd(["terraform", "plan", "-input=false", self.TERRAFORM_DIR])
+        self.execute_cmd(["terraform", "apply", "-input=false", "-auto-approve", self.TERRAFORM_DIR])
+
+    def get_tf_output(self, name):
+        return self.check_output(["terraform", "output", name]).decode('utf-8').replace("\r\n", "")
+
+    def tearDown(self) -> None:
+        self.execute_cmd(["terraform", "plan", "-destroy", "-input=false", self.TERRAFORM_DIR])
+        self.execute_cmd(["terraform", "destroy", "-input=false", "-auto-approve", self.TERRAFORM_DIR])
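
The Terraform base class in tests/test_utils/terraform.py is meant to be subclassed by a system test: setUp provisions whatever the *.tf files in TERRAFORM_DIR describe before the example dag runs, and tearDown destroys it again afterwards. The sketch below is only a rough illustration of that usage pattern, not code from this patch; the class name, TERRAFORM_DIR value, and dag folder are placeholders chosen to mirror the paths listed in the diffstat.

from tests.test_utils.terraform import Terraform


class ExampleS3ToRedshiftSystemTest(Terraform):
    # Hypothetical values for illustration only; the real system test in this
    # patch defines its own constants for these locations.
    TERRAFORM_DIR = "tests/providers/amazon/aws/infrastructure/example_s3_to_redshift"

    def test_run_example_dag(self):
        # By the time this runs, setUp() has already executed
        # `terraform init`, `plan` and `apply`, so the AWS resources the dag
        # needs (here, the Redshift cluster it writes to) are in place;
        # tearDown() runs `terraform destroy` once the dag has finished.
        self.run_dag("example_s3_to_redshift", "airflow/providers/amazon/aws/example_dags")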