[EXE-1558] Setup build and deploy; add pushdown (#1)
dorisZ017 authored May 24, 2023
1 parent 8fdc563 commit 5cfff18
Showing 35 changed files with 2,364 additions and 146 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -6,3 +6,4 @@ project/target
build/*.jar
aws_variables.env
derby.log
.bsp/
5 changes: 1 addition & 4 deletions .jvmopts
@@ -1,7 +1,4 @@
-Dfile.encoding=UTF8
-Xms1024M
-Xmx1024M
-Xss6M
-XX:MaxPermSize=512m
-XX:+CMSClassUnloadingEnabled
-XX:+UseConcMarkSweepGC
-Xss6M
45 changes: 18 additions & 27 deletions build.sbt
@@ -14,49 +14,47 @@
* limitations under the License.
*/

import com.typesafe.sbt.pgp.PgpKeys
import org.scalastyle.sbt.ScalastylePlugin.rawScalastyleSettings
import sbt.Keys._
import sbt._
import sbtrelease.ReleasePlugin.autoImport.ReleaseTransformations._
import sbtrelease.ReleasePlugin.autoImport._
import scoverage.ScoverageKeys

val sparkVersion = "3.2.0"
val sparkVersion = "3.3.2"

// Define a custom test configuration so that unit test helper classes can be re-used under
// the integration tests configuration; see http://stackoverflow.com/a/20635808.
lazy val IntegrationTest = config("it") extend Test
val testSparkVersion = sys.props.get("spark.testVersion").getOrElse(sparkVersion)
val testHadoopVersion = sys.props.get("hadoop.testVersion").getOrElse("3.2.1")
val testHadoopVersion = sys.props.get("hadoop.testVersion").getOrElse("3.3.2")
// DON'T UPGRADE AWS-SDK-JAVA if not compatible with hadoop version
val testAWSJavaSDKVersion = sys.props.get("aws.testVersion").getOrElse("1.11.1033")
val testAWSJavaSDKVersion = sys.props.get("aws.testVersion").getOrElse("1.12.31")


lazy val root = Project("spark-redshift", file("."))
.configs(IntegrationTest)
.settings(net.virtualvoid.sbt.graph.Plugin.graphSettings: _*)
.settings(Project.inConfig(IntegrationTest)(rawScalastyleSettings()): _*)
.settings(Defaults.coreDefaultSettings: _*)
.settings(Defaults.itSettings: _*)
.enablePlugins(PublishToArtifactory)
.settings(
name := "spark-redshift",
organization := "io.github.spark-redshift-community",
scalaVersion := "2.12.15",
licenses += "Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0"),
credentials += Credentials(Path.userHome / ".sbt" / ".credentials"),
scalacOptions ++= Seq("-target:jvm-1.8"),
javacOptions ++= Seq("-source", "1.8", "-target", "1.8"),
scalacOptions ++= Seq("-release", "17"),
javacOptions ++= Seq("-source", "17", "-target", "17"),
libraryDependencies ++= Seq(
"org.slf4j" % "slf4j-api" % "1.7.32",
"com.eclipsesource.minimal-json" % "minimal-json" % "0.9.4",

// A Redshift-compatible JDBC driver must be present on the classpath for spark-redshift to work.
// For testing, we use an Amazon driver, which is available from
// http://docs.aws.amazon.com/redshift/latest/mgmt/configure-jdbc-connection.html
"com.amazon.redshift" % "jdbc41" % "1.2.27.1051" % "test" from "https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.27.1051/RedshiftJDBC41-no-awssdk-1.2.27.1051.jar",
"com.amazon.redshift" % "jdbc42" % "2.1.0.14" % "test" from "https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/2.1.0.14/redshift-jdbc42-2.1.0.14.jar",

"com.google.guava" % "guava" % "27.0.1-jre" % "test",
"com.google.guava" % "guava" % "20.0",
"org.scalatest" %% "scalatest" % "3.0.5" % "test",
"org.mockito" % "mockito-core" % "1.10.19" % "test",

@@ -70,7 +68,7 @@ lazy val root = Project("spark-redshift", file("."))
"org.apache.hadoop" % "hadoop-aws" % testHadoopVersion excludeAll
(ExclusionRule(organization = "com.fasterxml.jackson.core"))
exclude("org.apache.hadoop", "hadoop-common")
exclude("com.amazonaws", "aws-java-sdk-s3") force(),
exclude("com.amazonaws", "aws-java-sdk-bundle") force(), // load from provided aws-java-sdk-* instead of bundle

"org.apache.spark" %% "spark-core" % testSparkVersion % "provided" exclude("org.apache.hadoop", "hadoop-client") force(),
"org.apache.spark" %% "spark-sql" % testSparkVersion % "provided" exclude("org.apache.hadoop", "hadoop-client") force(),
@@ -82,24 +80,19 @@ lazy val root = Project("spark-redshift", file("."))
// Display full-length stacktraces from ScalaTest:
testOptions in Test += Tests.Argument("-oF"),
fork in Test := true,
javaOptions in Test ++= Seq("-Xms512M", "-Xmx2048M", "-XX:MaxPermSize=2048M"),

/********************
* Release settings *
********************/

publishTo := {
val nexus = "https://oss.sonatype.org/"
if (isSnapshot.value)
Some("snapshots" at nexus + "content/repositories/snapshots")
else
Some("releases" at nexus + "service/local/staging/deploy/maven2")
},
javaOptions in Test ++= Seq(
"-Xms512M", "-Xmx2048M",
"--add-opens=java.base/java.nio=ALL-UNNAMED",
"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
"--add-opens=java.base/java.lang=ALL-UNNAMED",
"--add-opens=java.base/java.lang.invoke=ALL-UNNAMED",
"--add-opens=java.base/sun.util.calendar=ALL-UNNAMED",
"--add-opens=java.base/java.util=ALL-UNNAMED",
),

publishMavenStyle := true,
releaseCrossBuild := true,
licenses += ("Apache-2.0", url("http://www.apache.org/licenses/LICENSE-2.0")),
releasePublishArtifactsAction := PgpKeys.publishSigned.value,

pomExtra :=
<url>https://github.com:spark_redshift_community/spark.redshift</url>
@@ -130,8 +123,6 @@ lazy val root = Project("spark-redshift", file("."))
</developer>
</developers>,

bintrayReleaseOnPublish in ThisBuild := false,

// Add publishing to spark packages as another step.
releaseProcess := Seq[ReleaseStep](
checkSnapshotDependencies,
18 changes: 18 additions & 0 deletions project/ArtifactoryCredentials.scala
@@ -0,0 +1,18 @@
import sbt.Keys.credentials
import sbt.{AutoPlugin, Credentials, Def, PluginTrigger}

// Mirrors aiq - ArtifactoryCredentials
object ArtifactoryCredentials extends AutoPlugin {

lazy val credentialSettings: Seq[Def.Setting[_]] = Seq(
// Attempts to use environment variables if available to configure Artifactory
credentials ++= sys.env.get("ARTIFACTORY_ACCESS_TOKEN").toList.map { token =>
Credentials("Artifactory Realm", "actioniq.jfrog.io", sys.env("ARTIFACTORY_USER"), token)
}
)

// Applying this to all builds automatically for now. trigger = allRequirements with no requirements
override def trigger: PluginTrigger = allRequirements

override def projectSettings: Seq[Def.Setting[_]] = credentialSettings
}
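
For local builds where ARTIFACTORY_ACCESS_TOKEN and ARTIFACTORY_USER are not exported, the same credential entry can be supplied by hand. A minimal sketch, assuming the realm and host strings must match what Artifactory reports; the user and token values below are placeholders:

// In build.sbt or a local .sbt file: the manual equivalent of what
// ArtifactoryCredentials derives from the environment. Placeholder values.
credentials += Credentials(
  "Artifactory Realm",   // realm string, as used by the plugin above
  "actioniq.jfrog.io",   // host, as used by the plugin above
  "ci-user",             // placeholder user name
  "<access-token>"       // placeholder token
)
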
24 changes: 24 additions & 0 deletions project/PublishToArtifactory.scala
@@ -0,0 +1,24 @@
import sbt.Keys.{isSnapshot, packageDoc, packageSrc, publishArtifact, publishConfiguration, publishLocalConfiguration, publishM2Configuration, publishMavenStyle, publishTo}
import sbt.{Def, _}

// Mirrors aiq - PublishToArtifactory
object PublishToArtifactory extends AutoPlugin {

lazy val baseSettings: Seq[Def.Setting[_]] = Seq(
publishTo := Some("Artifactory Realm".at("https://actioniq.jfrog.io/artifactory/aiq-sbt-local")),
publishMavenStyle := true,
publishConfiguration := publishConfiguration.value.withOverwrite(isSnapshot.value),
publishLocalConfiguration := publishLocalConfiguration.value.withOverwrite(isSnapshot.value),
publishM2Configuration := publishM2Configuration.value.withOverwrite(isSnapshot.value),
Compile / packageSrc / publishArtifact := false,
Compile / packageDoc / publishArtifact := false,
)

override def requires: Plugins = ArtifactoryCredentials

override def trigger: PluginTrigger = noTrigger

// a group of settings that are automatically added to projects.
override val projectSettings: Seq[Def.Setting[_]] = baseSettings

}
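
Together these two plugins route publishing to Artifactory: ArtifactoryCredentials adds credentials from the environment on every build, while PublishToArtifactory has noTrigger and must be enabled explicitly, which the .enablePlugins(PublishToArtifactory) line in build.sbt above does. A sketch of how a downstream sbt build might then resolve the published artifact; the resolver name and version are placeholders, and the coordinates follow the organization and name set in build.sbt:

// Downstream build.sbt sketch (not part of this commit); placeholder version.
resolvers += "AIQ Artifactory" at "https://actioniq.jfrog.io/artifactory/aiq-sbt-local"
libraryDependencies +=
  "io.github.spark-redshift-community" %% "spark-redshift" % "<published-version>"
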
2 changes: 1 addition & 1 deletion project/build.properties
@@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
sbt.version=0.13.18
sbt.version=1.4.9
14 changes: 4 additions & 10 deletions project/plugins.sbt
@@ -1,15 +1,9 @@
addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.6.0")
addDependencyTreePlugin

addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.5")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.1")

addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.0")
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")

addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.8.0")

addSbtPlugin("me.lessis" % "bintray-sbt" % "0.3.0")

addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.0")

addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.11")

libraryDependencies += "org.apache.maven" % "maven-artifact" % "3.3.9"
IntegrationSuiteBase.scala
@@ -26,6 +26,8 @@ import org.apache.spark.sql._
import org.apache.spark.sql.types.StructType
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Matchers}

import io.github.spark_redshift_community.spark.redshift.Parameters.MergedParameters

import scala.util.Random


@@ -62,6 +64,16 @@ trait IntegrationSuiteBase
s"$AWS_REDSHIFT_JDBC_URL?user=$AWS_REDSHIFT_USER&password=$AWS_REDSHIFT_PASSWORD&ssl=true"
}

protected def param: MergedParameters = {
MergedParameters(
Map(
"url" -> jdbcUrlNoUserPassword,
"user" -> AWS_REDSHIFT_USER,
"password" -> AWS_REDSHIFT_PASSWORD
)
)
}

protected def jdbcUrlNoUserPassword: String = {
s"$AWS_REDSHIFT_JDBC_URL?ssl=true"
}
@@ -91,7 +103,7 @@ trait IntegrationSuiteBase
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", AWS_SECRET_ACCESS_KEY)
sc.hadoopConfiguration.set("fs.s3a.access.key", AWS_ACCESS_KEY_ID)
sc.hadoopConfiguration.set("fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)
conn = DefaultJDBCWrapper.getConnector(None, jdbcUrl, None)
conn = DefaultJDBCWrapper.getConnector(param)
}

override def afterAll(): Unit = {
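
DefaultJDBCWrapper.getConnector now takes the merged parameter map instead of separate driver, URL, and credential arguments, which is why the call sites above change to getConnector(param). A minimal sketch of the new call, assuming code living in the same io.github.spark_redshift_community.spark.redshift package as the suites; the endpoint and credentials are placeholders:

package io.github.spark_redshift_community.spark.redshift

import io.github.spark_redshift_community.spark.redshift.Parameters.MergedParameters

object GetConnectorSketch {
  def main(args: Array[String]): Unit = {
    // Build the merged parameters directly, as the `param` helper above does.
    val params = MergedParameters(Map(
      "url" -> "jdbc:redshift://example-host:5439/dev?ssl=true", // placeholder endpoint
      "user" -> "example_user",                                  // placeholder
      "password" -> "example_password"                           // placeholder
    ))
    // New single-argument signature.
    val conn = DefaultJDBCWrapper.getConnector(params)
    try {
      println(conn.getMetaData.getDatabaseProductName)
    } finally {
      conn.close()
    }
  }
}
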
PostgresDriverIntegrationSuite.scala
@@ -30,7 +30,7 @@ class PostgresDriverIntegrationSuite extends IntegrationSuiteBase {

// TODO (luca|issue #9) Fix tests when using postgresql driver
ignore("postgresql driver takes precedence for jdbc:postgresql:// URIs") {
val conn = DefaultJDBCWrapper.getConnector(None, jdbcUrl, None)
val conn = DefaultJDBCWrapper.getConnector(param)
try {
assert(conn.getClass.getName === "org.postgresql.jdbc4.Jdbc4Connection")
} finally {
DefaultSource.scala
@@ -19,6 +19,9 @@ package io.github.spark_redshift_community.spark.redshift
import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.services.s3.AmazonS3Client
import io.github.spark_redshift_community.spark.redshift
import io.github.spark_redshift_community.spark.redshift.pushdowns.RedshiftPushDownStrategy
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
@@ -41,6 +44,13 @@ class DefaultSource(
*/
def this() = this(DefaultJDBCWrapper, awsCredentials => new AmazonS3Client(awsCredentials))

private def appendRedshiftPushDownStrategy(
sqlContext: SQLContext,
): Unit = {
sqlContext.sparkSession.experimental.extraStrategies ++= Seq(
new RedshiftPushDownStrategy(sqlContext.sparkContext.getConf)
)
}
/**
* Create a new RedshiftRelation instance using parameters from Spark SQL DDL. Resolves the schema
* using JDBC connection over provided URL, which must contain credentials.
@@ -49,7 +59,9 @@
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
val params = Parameters.mergeParameters(parameters)
redshift.RedshiftRelation(jdbcWrapper, s3ClientFactory, params, None)(sqlContext)
val jdbcOptions = new JDBCOptions(CaseInsensitiveMap(parameters))
appendRedshiftPushDownStrategy(sqlContext)
redshift.RedshiftRelation(jdbcWrapper, s3ClientFactory, params, None, jdbcOptions)(sqlContext)
}

/**
@@ -60,7 +72,14 @@
parameters: Map[String, String],
schema: StructType): BaseRelation = {
val params = Parameters.mergeParameters(parameters)
redshift.RedshiftRelation(jdbcWrapper, s3ClientFactory, params, Some(schema))(sqlContext)
val jdbcOptions = new JDBCOptions(CaseInsensitiveMap(parameters))
appendRedshiftPushDownStrategy(sqlContext)
redshift.RedshiftRelation(jdbcWrapper,
s3ClientFactory,
params,
Some(schema),
jdbcOptions
)(sqlContext)
}

/**
@@ -78,7 +97,7 @@
}

def tableExists: Boolean = {
val conn = jdbcWrapper.getConnector(params.jdbcDriver, params.jdbcUrl, params.credentials)
val conn = jdbcWrapper.getConnector(params)
try {
jdbcWrapper.tableExists(conn, table.toString)
} finally {
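
With appendRedshiftPushDownStrategy, every createRelation call also registers RedshiftPushDownStrategy in the session's experimental.extraStrategies, so eligible parts of a query plan can be translated into Redshift SQL rather than executed in Spark. A usage sketch, assuming the connector's usual dbtable and tempdir options; the cluster endpoint, table, and S3 bucket below are placeholders:

import org.apache.spark.sql.SparkSession

object PushDownReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("redshift-pushdown-sketch").getOrCreate()

    // Reading through DefaultSource.createRelation also appends the pushdown strategy.
    val events = spark.read
      .format("io.github.spark_redshift_community.spark.redshift")
      .option("url", "jdbc:redshift://example-host:5439/dev?user=u&password=p&ssl=true") // placeholder
      .option("dbtable", "public.events")             // placeholder table
      .option("tempdir", "s3a://example-bucket/tmp/") // placeholder staging dir
      .load()

    // Filters and aggregates like these are candidates for the strategy to push down.
    events.filter(events("event_type") === "click")
      .groupBy("event_type")
      .count()
      .show()

    spark.stop()
  }
}
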