Skip to content

Commit

Permalink
Add documentation and warnings related to using different regions for…
Browse files Browse the repository at this point in the history
… Redshift and S3

Redshift throws extremely confusing errors when the Redshift cluster and S3 bucket are in different AWS regions. This patch expands the documentation to discuss how to fix these errors and adds logic to attempt to automatically detect and warn users when this case occurs.

Fixes #87.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #285 from JoshRosen/cross-region-s3.
  • Loading branch information
JoshRosen committed Oct 20, 2016
1 parent cdf192a commit d508d3e
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ env:
- secure: "cuyemI1bqPkWBD5B1FqIKDJb5g/SX5x8lrzkO0J/jkyGY0VLbHxrl5j/9PrKFuvraBK3HC56HEP1Zg+IMvh+uv0D+p5y14C97fAzE33uNgR2aVkamOo92zHvxvXe7zBtqc8rztWsJb1pgkrY7SdgSXgQc88ohey+XecDh4TahTY="
# AWS_S3_SCRATCH_SPACE
- secure: "LvndQIW6dHs6nyaMHtblGI/oL+s460lOezFs2BoD0Isenb/O/IM+nY5K9HepTXjJIcq8qvUYnojZX1FCrxxOXX2/+/Iihiq7GzJYdmdMC6hLg9bJYeAFk0dWYT88/AwadrJCBOa3ockRLhiO3dkai7Ki5+M1erfaFiAHHMpJxYQ="
# AWS_S3_CROSS_REGION_SCRATCH_SPACE
- secure: "esYmBqt256Dc77HT68zoaE/vtsFGk2N+Kt+52RlR0cjHPY1q5801vxLbeOlpYb2On3x8YckE++HadjL40gwSBsca0ffoogq6zTlfbJYDSQkQG1evxXWJZLcafB0igfBs/UbEUo7EaxoAJQcLgiWWwUdO0a0iU1ciSVyogZPagL0="

script:
- ./dev/run-tests-travis.sh
Expand Down
40 changes: 40 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ This library is more suited to ETL than interactive queries, since large amounts
- [Configuring column encoding](#configuring-column-encoding)
- [Setting descriptions on columns](#setting-descriptions-on-columns)
- [Transactional Guarantees](#transactional-guarantees)
- [Common problems and solutions](#common-problems-and-solutions)
- [S3 bucket and Redshift cluster are in different AWS regions](#s3-bucket-and-redshift-cluster-are-in-different-aws-regions)
- [Migration Guide](#migration-guide)

## Installation
Expand Down Expand Up @@ -527,6 +529,44 @@ If the deprecated `usestagingtable` setting is set to `false` then this library

**Querying Redshift tables**: Queries use Redshift's [`UNLOAD`](https://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html) command to execute a query and save its results to S3 and use [manifests](https://docs.aws.amazon.com/redshift/latest/dg/loading-data-files-using-manifest.html) to guard against certain eventually-consistent S3 operations. As a result, queries from Redshift data source for Spark should have the same consistency properties as regular Redshift queries.

## Common problems and solutions

### S3 bucket and Redshift cluster are in different AWS regions

By default, S3 <-> Redshift copies will not work if the S3 bucket and Redshift cluster are in different AWS regions.

If you attempt to perform a read of a Redshift table and the regions are mismatched then you may see a confusing error, such as

```
java.sql.SQLException: [Amazon](500310) Invalid operation: S3ServiceException:The bucket you are attempting to access must be addressed using the specified endpoint. Please send all future requests to this endpoint.
```

Similarly, attempting to write to Redshift using a S3 bucket in a different region may cause the following error:

```
error: Problem reading manifest file - S3ServiceException:The bucket you are attempting to access must be addressed using the specified endpoint. Please send all future requests to this endpoint.,Status 301,Error PermanentRedirect
```

**For writes:** Redshift's `COPY` command allows the S3 bucket's region to be explicitly specified, so you can make writes to Redshift work properly in these cases by adding

```
region 'the-region-name'
```

to the `extracopyoptions` setting. For example, with a bucket in the US East (Virginia) region and the Scala API, use

```
.option("extracopyoptions", "region 'us-east-1'")
```

**For reads:** According to [its documentation](http://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html), the Redshift `UNLOAD` command does not support writing to a bucket in a different region:

> **Important**
>
> The Amazon S3 bucket where Amazon Redshift will write the output files must reside in the same region as your cluster.
As a result, this use-case is not supported by this library. The only workaround is to use a new bucket in the same region as your Redshift cluster.

## Migration Guide

- Version 2.0 removed a number of deprecated APIs; for details, see /~https://github.com/databricks/spark-redshift/pull/239
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2016 Databricks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.databricks.spark.redshift

import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.s3.AmazonS3Client
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

/**
* Integration tests where the Redshift cluster and the S3 bucket are in different AWS regions.
*/
class CrossRegionIntegrationSuite extends IntegrationSuiteBase {

protected val AWS_S3_CROSS_REGION_SCRATCH_SPACE: String =
loadConfigFromEnv("AWS_S3_CROSS_REGION_SCRATCH_SPACE")
require(AWS_S3_CROSS_REGION_SCRATCH_SPACE.contains("s3n"), "must use s3n:// URL")

override protected val tempDir: String = AWS_S3_CROSS_REGION_SCRATCH_SPACE + randomSuffix + "/"

test("write") {
val bucketRegion = Utils.getRegionForS3Bucket(
tempDir,
new AmazonS3Client(new BasicAWSCredentials(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY))).get
val df = sqlContext.createDataFrame(sc.parallelize(Seq(Row(1)), 1),
StructType(StructField("foo", IntegerType) :: Nil))
val tableName = s"roundtrip_save_and_load_$randomSuffix"
try {
df.write
.format("com.databricks.spark.redshift")
.option("url", jdbcUrl)
.option("dbtable", tableName)
.option("tempdir", tempDir)
.option("extracopyoptions", s"region '$bucketRegion'")
.save()
// Check that the table exists. It appears that creating a table in one connection then
// immediately querying for existence from another connection may result in spurious "table
// doesn't exist" errors; this caused the "save with all empty partitions" test to become
// flaky (see #146). To work around this, add a small sleep and check again:
if (!DefaultJDBCWrapper.tableExists(conn, tableName)) {
Thread.sleep(1000)
assert(DefaultJDBCWrapper.tableExists(conn, tableName))
}
} finally {
conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
conn.commit()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,21 @@ private[redshift] case class RedshiftRelation(

override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
val creds = AWSCredentialsUtils.load(params, sqlContext.sparkContext.hadoopConfiguration)
for (
redshiftRegion <- Utils.getRegionForRedshiftCluster(params.jdbcUrl);
s3Region <- Utils.getRegionForS3Bucket(params.rootTempDir, s3ClientFactory(creds))
) {
if (redshiftRegion != s3Region) {
// We don't currently support `extraunloadoptions`, so even if Amazon _did_ add a `region`
// option for this we wouldn't be able to pass in the new option. However, we choose to
// err on the side of caution and don't throw an exception because we don't want to break
// existing workloads in case the region detection logic is wrong.
log.error("The Redshift cluster and S3 bucket are in different regions " +
s"($redshiftRegion and $s3Region, respectively). Redshift's UNLOAD command requires " +
s"that the Redshift cluster and Amazon S3 bucket be located in the same region, so " +
s"this read will fail.")
}
}
Utils.checkThatBucketHasObjectLifecycleConfiguration(params.rootTempDir, s3ClientFactory(creds))
if (requiredColumns.isEmpty) {
// In the special case where no columns were requested, issue a `count(*)` against Redshift
Expand Down
14 changes: 14 additions & 0 deletions src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,20 @@ private[redshift] class RedshiftWriter(
val creds: AWSCredentialsProvider =
AWSCredentialsUtils.load(params, sqlContext.sparkContext.hadoopConfiguration)

for (
redshiftRegion <- Utils.getRegionForRedshiftCluster(params.jdbcUrl);
s3Region <- Utils.getRegionForS3Bucket(params.rootTempDir, s3ClientFactory(creds))
) {
val regionIsSetInExtraCopyOptions =
params.extraCopyOptions.contains(s3Region) && params.extraCopyOptions.contains("region")
if (redshiftRegion != s3Region && !regionIsSetInExtraCopyOptions) {
log.error("The Redshift cluster and S3 bucket are in different regions " +
s"($redshiftRegion and $s3Region, respectively). In order to perform this cross-region " +
s"""write, you must add "region '$s3Region'" to the extracopyoptions parameter. """ +
"For more details on cross-region usage, see the README.")
}
}

Utils.assertThatFileSystemIsNotS3BlockFileSystem(
new URI(params.rootTempDir), sqlContext.sparkContext.hadoopConfiguration)

Expand Down
34 changes: 34 additions & 0 deletions src/main/scala/com/databricks/spark/redshift/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,38 @@ private[redshift] object Utils {
"use a s3n:// or s3a:// scheme.")
}
}

/**
* Attempts to retrieve the region of the S3 bucket.
*/
def getRegionForS3Bucket(tempDir: String, s3Client: AmazonS3Client): Option[String] = {
try {
val s3URI = createS3URI(Utils.fixS3Url(tempDir))
val bucket = s3URI.getBucket
assert(bucket != null, "Could not get bucket from S3 URI")
val region = s3Client.getBucketLocation(bucket) match {
// Map "US Standard" to us-east-1
case null | "US" => "us-east-1"
case other => other
}
Some(region)
} catch {
case NonFatal(e) =>
log.warn("An error occurred while trying to determine the S3 bucket's region", e)
None
}
}

/**
* Attempts to determine the region of a Redshift cluster based on its URL. It may not be possible
* to determine the region in some cases, such as when the Redshift cluster is placed behind a
* proxy.
*/
def getRegionForRedshiftCluster(url: String): Option[String] = {
val regionRegex = """.*\.([^.]+)\.redshift\.amazonaws\.com.*""".r
url match {
case regionRegex(region) => Some(region)
case _ => None
}
}
}
7 changes: 7 additions & 0 deletions src/test/scala/com/databricks/spark/redshift/UtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,11 @@ class UtilsSuite extends FunSuite with Matchers {
removeCreds("s3n://ACCESSKEY:SECRETKEY@bucket/path/to/temp/dir") ===
"s3n://bucket/path/to/temp/dir")
}

test("getRegionForRedshiftCluster") {
val redshiftUrl =
"jdbc:redshift://example.secret.us-west-2.redshift.amazonaws.com:5439/database"
assert(Utils.getRegionForRedshiftCluster("mycluster.example.com") === None)
assert(Utils.getRegionForRedshiftCluster(redshiftUrl) === Some("us-west-2"))
}
}

0 comments on commit d508d3e

Please sign in to comment.