Add S3ToRedshift example dag and system test #8877
Conversation
Force-pushed cdbdff7 to 3abe736
Force-pushed 3abe736 to 1e977e1
Ready for review. PTAL @ashb @potiuk Maybe @xinbinhuang @mustafagok @baolsen if you have time :)
```python
# [START howto_operator_s3_to_redshift_env_variables]
S3_BUCKET = getenv("S3_BUCKET", "bucket")
S3_KEY = getenv("S3_KEY", "key")
```
Why are we relying on env vars, rather than Airflow Variables + templates? `{{ var.s3_key }}`
We have this in all of our system tests so that we can easily pass values to the example dags. See /~https://github.com/apache/airflow/blob/master/TESTING.rst#environment-for-system-tests
I think it makes sense to switch to Airflow Variables + templates like Ash said and set them via env variables: https://airflow.apache.org/docs/stable/concepts.html?highlight=xcom#storing-variables-in-environment-variables. So we only need to prefix all env vars with `AIRFLOW_VAR_`. For example: set `AIRFLOW_VAR_S3_KEY=key` in `variables.env` and access it via `{{ var.s3_key }}` here. WDYT @potiuk @turbaszek?
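For reference, a minimal sketch of that proposal, not code from this PR. It assumes the provider import path implied by this PR's file list and that `s3_bucket`/`s3_key` are templated fields of the operator; note also that the actual Jinja accessor for Airflow Variables is `{{ var.value.<name> }}`, not `{{ var.<name> }}`:

```python
# Sketch of the Variables-via-env-vars approach (assumptions noted above).
from airflow import DAG
from airflow.providers.amazon.aws.operators.s3_to_redshift import S3ToRedshiftOperator
from airflow.utils.dates import days_ago

# Exported before Airflow starts, e.g. in variables.env:
#   AIRFLOW_VAR_S3_BUCKET=bucket
#   AIRFLOW_VAR_S3_KEY=key

with DAG("example_s3_to_redshift", start_date=days_ago(1), schedule_interval=None) as dag:
    transfer = S3ToRedshiftOperator(
        task_id="transfer_s3_to_redshift",
        s3_bucket="{{ var.value.s3_bucket }}",  # rendered at task run time, not at parse time
        s3_key="{{ var.value.s3_key }}",
        schema="PUBLIC",
        table="test_table",
        copy_options=["csv"],
    )
```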
But how do we then define defaults in case these `var`s are not set? Or should we make all vars required then?
I think this is fine, and when we automate it, we should define files where we define ENV variables.
Using Airflow Variables in example DAGs is a very bad idea. People use these as, well, examples, and using Airflow Variables in DAGs this way is very bad from a database-load point of view. Until we avoid all entities parsing the DAGs frequently, we should not use Variables this way (we even have a best practice about it): every time a DAG is parsed, DB communication occurs and an extra connection is made. That is quite bad for scalability and not obvious at all, so if people take these examples literally (which they did in the past), they will implement bad practices.
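To make the database-load point concrete, a short sketch contrasting the anti-pattern with the `getenv` pattern this PR uses (variable names are illustrative):

```python
from os import getenv

from airflow.models import Variable

# Anti-pattern: this line runs on EVERY parse of the DAG file, so the
# scheduler and webserver open a metadata-DB connection just to build
# the DAG object.
S3_KEY_BAD = Variable.get("s3_key", default_var="key")

# Pattern used in this PR: a plain environment lookup with a default.
# No DB round-trip at parse time, and system tests can inject real values.
S3_KEY = getenv("S3_KEY", "key")
```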
tests/providers/amazon/aws/operators/test_s3_to_redshift_system.py (outdated review thread, resolved)
Force-pushed 398fa82 to ad3a6fc
Force-pushed 138645f to 4fd0a69
First try (this works 🎉 ) - I switched to Terraform for provisioning the infrastructure. WDYT? @potiuk @ashb @mik-laj @turbaszek @kaxil
airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py (outdated review thread, resolved)
tests/test_utils/terraform.py (outdated)

```python
class Terraform(SystemTest):
    # Directory containing the *.tf files; must be set by subclasses.
    TERRAFORM_DIR: str

    def setUp(self) -> None:
        # Provision the test infrastructure before the system test runs.
        self.execute_cmd(["terraform", "init", "-input=false", self.TERRAFORM_DIR])
        self.execute_cmd(["terraform", "plan", "-input=false", self.TERRAFORM_DIR])
        self.execute_cmd(["terraform", "apply", "-input=false", "-auto-approve", self.TERRAFORM_DIR])

    def get_tf_output(self, name):
        # Read a single Terraform output value, stripping Windows line endings.
        output = self.check_output(["terraform", "output", name]).decode('utf-8').replace("\r\n", "")
        self.log.info(output)
        return output

    def tearDown(self) -> None:
        # Tear the provisioned infrastructure down again after the test.
        self.execute_cmd(["terraform", "plan", "-destroy", "-input=false", self.TERRAFORM_DIR])
        self.execute_cmd(["terraform", "destroy", "-input=false", "-auto-approve", self.TERRAFORM_DIR])
```
I am not quite happy with it. This can probably be structured better.
We can consider using/taking a look at /~https://github.com/beelit94/python-terraform
I checked this out, but didn't find it really useful to pull in an extra package only to wrap Bash inside Python. It also seems not to be compatible with the latest Terraform 0.12.x releases.
Then I would suggest adding `init`, `plan`, and `apply` as methods/functions to reduce the amount of duplicated `self.execute_cmd` code. WDYT?
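A sketch of that refactor, under the assumption that the wrappers simply forward to `self.execute_cmd` (method names follow the suggestion above, not code from this PR):

```python
# Sketch: one thin wrapper per Terraform sub-command, so that
# setUp()/tearDown() read as intent rather than shell plumbing.
class Terraform(SystemTest):
    TERRAFORM_DIR: str

    def _terraform(self, *args: str) -> None:
        self.execute_cmd(["terraform", *args, self.TERRAFORM_DIR])

    def init(self) -> None:
        self._terraform("init", "-input=false")

    def plan(self, destroy: bool = False) -> None:
        self._terraform("plan", *(["-destroy"] if destroy else []), "-input=false")

    def apply(self) -> None:
        self._terraform("apply", "-input=false", "-auto-approve")

    def destroy(self) -> None:
        self._terraform("destroy", "-input=false", "-auto-approve")

    def setUp(self) -> None:
        self.init()
        self.plan()
        self.apply()

    def tearDown(self) -> None:
        self.plan(destroy=True)
        self.destroy()
```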
I could do that.
The real issue I am having with this is that `super().setUp()` and `super().tearDown()` are not explicit enough. Terraform is very much hidden behind them. You can easily forget to call them when you override these methods, which in the case of `super().tearDown()` could be dramatic, since the infrastructure would never be destroyed. But maybe we could add a static check for that?!
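Such a static check could plausibly be done with a small AST walk; a hypothetical, intentionally simplistic sketch (not part of this PR):

```python
# Hypothetical static check: flag Terraform subclasses that override
# setUp/tearDown without calling super().
import ast
import sys


def missing_super_calls(source: str):
    tree = ast.parse(source)
    for node in ast.walk(tree):
        if not isinstance(node, ast.ClassDef):
            continue
        if not any(getattr(base, "id", None) == "Terraform" for base in node.bases):
            continue
        for func in node.body:
            if isinstance(func, ast.FunctionDef) and func.name in ("setUp", "tearDown"):
                calls_super = any(
                    isinstance(n, ast.Call)
                    and isinstance(n.func, ast.Attribute)
                    and n.func.attr == func.name
                    for n in ast.walk(func)
                )
                if not calls_super:
                    yield f"{node.name}.{func.name} does not call super().{func.name}()"


if __name__ == "__main__":
    for path in sys.argv[1:]:
        with open(path) as source_file:
            for problem in missing_super_calls(source_file.read()):
                print(f"{path}: {problem}")
```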
Force-pushed 4fd0a69 to 16bb5c7
The variables I used for testing (not committed) are in `variables.env` and `terraform.tfvars`. I added `terraform.tfvars` to `.gitignore` because I was not sure if we should commit it, since it contains "sensitive information", even though we only use it for testing. The full example dag takes about 7 mins to run.
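Since these files are not committed, the following contents are purely illustrative of what they might hold; the values are invented for this example, and `master_username`/`master_password` are standard inputs of Terraform's `aws_redshift_cluster` resource:

```
# variables.env -- values invented for illustration
S3_BUCKET=s3-to-redshift-system-test
S3_KEY=data.csv
```

```
# terraform.tfvars -- kept out of git because it contains credentials
master_username = "awsuser"
master_password = "<redacted>"
```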
Force-pushed ea8c668 to c9406b0
- add howto docs for S3ToRedshift example dag
- add terraform which runs terraform CLI commands in an isolated docker container

NOTE: This system test uses terraform to provide the infrastructure needed to run this example dag.
Force-pushed c9406b0 to 3c93c08
NOTE: This system test uses terraform to provide the infrastructure needed to run this example dag.
Make sure to mark the boxes below before creating PR: [x]
- In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
- In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
- In case of backwards incompatible changes please leave a note in UPDATING.md.

Read the Pull Request Guidelines for more information.