Src folder in multimodule (#1)
rafafrdz authored Dec 25, 2024
1 parent 3b04fd0 commit f4d47f7
Showing 3 changed files with 101 additions and 0 deletions.
@@ -0,0 +1,37 @@
package $organization$.$name$

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Use this to test the app locally, from sbt:
 *   sbt "run inputFile.txt outputFile.txt"
 * (+ select CountingLocalApp when prompted)
 */
object CountingLocalApp extends App {
  val (inputFile, outputFile): (String, String) = (args(0), args(1))
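
  // "local" runs Spark in-process on a single thread; "local[*]" would use all available cores.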
  val conf: SparkConf = new SparkConf()
    .setMaster("local")
    .setAppName("my awesome app")

  Runner.run(conf, inputFile, outputFile)
}

/**
 * Use this when submitting the app to a cluster with spark-submit
 */
object CountingApp extends App {
  val (inputFile, outputFile): (String, String) = (args(0), args(1))

  // spark-submit command should supply all necessary config elements
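  // For example (illustrative; the jar path and master URL depend on your build and cluster):
  //   spark-submit --class $organization$.$name$.CountingApp --master <master-url> \
  //     path/to/app-assembly.jar inputFile.txt outputFile.txt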
  Runner.run(new SparkConf(), inputFile, outputFile)
}

object Runner {
  def run(conf: SparkConf, inputFile: String, outputFile: String): Unit = {
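    // Classic wordcount pipeline: read the input file, count the filtered words, write the results.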
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[String] = sc.textFile(inputFile)
    val counts: RDD[(String, Int)] = WordCount.withStopWordsFiltered(rdd)
    counts.saveAsTextFile(outputFile)
  }
}
@@ -0,0 +1,29 @@
package $organization$.$name$

import org.apache.spark.rdd._

/**
 * Everyone's favourite wordcount example.
 */
object WordCount {

  /**
   * A slightly more complex wordcount than the usual example, with optional
   * separators and stopWords. It splits on the provided separators, removes
   * the stop words, and converts everything to lower case.
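   *
   * Example usage (illustrative, given a `SparkContext` named `sc`):
   * {{{
   *   val counts = WordCount.withStopWordsFiltered(sc.textFile("input.txt"))
   * }}}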
   */
  def withStopWordsFiltered(
      rdd: RDD[String],
      separators: Array[Char] = " ".toCharArray,
      stopWords: Set[String] = Set("the")
  ): RDD[(String, Int)] = {

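    // Tokenize on the separators, lower-case, drop stop words and empty tokens, then count.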
    val tokens: RDD[String] = rdd.flatMap(_.split(separators).map(_.trim.toLowerCase))
    val lcStopWords: Set[String] = stopWords.map(_.trim.toLowerCase)
    val words: RDD[String] = tokens.filter(token => !lcStopWords.contains(token) && token.nonEmpty)
    val wordPairs: RDD[(String, Int)] = words.map((_, 1))
    val wordCounts: RDD[(String, Int)] = wordPairs.reduceByKey(_ + _)
    wordCounts
  }
}
@@ -0,0 +1,35 @@
package $organization$.$name$

import com.holdenkarau.spark.testing.SharedSparkContext
import org.apache.spark.rdd.RDD
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.matchers.should.Matchers

/**
 * A simple test for everyone's favourite wordcount example.
 */
class WordCountTest extends AnyFlatSpecLike with Matchers with SharedSparkContext {

"WordCount" should "count words with Stop Words Removed" in {
val linesRDD: RDD[String] = sc.parallelize(
Seq(
"How happy was the panda? You ask.",
"Panda is the most happy panda in all the#!?ing land!"
)
)

val stopWords: Set[String] = Set("a", "the", "in", "was", "there", "she", "he")
val splitTokens: Array[Char] = "#%?!. ".toCharArray

val wordCounts: RDD[(String, Int)] =
WordCount.withStopWordsFiltered(linesRDD, splitTokens, stopWords)
val wordCountsAsMap: collection.Map[String, Int] = wordCounts.collectAsMap()

assert(!wordCountsAsMap.contains("the"))
assert(!wordCountsAsMap.contains("?"))
assert(!wordCountsAsMap.contains("#!?ing"))
assert(wordCountsAsMap.contains("ing"))
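    // Splitting on "#%?!. " breaks "the#!?ing" apart, so "ing" survives as its own
    // token, and "panda" appears three times across the two lines once lower-cased.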
    wordCountsAsMap("panda") should be(3)
  }
}
