Running my first PySpark app in CDH5.
Spark's infrastructure has many key components, but only a few basics are covered here: partitioning, caching, serialization, and the shuffle operation.
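Partitioning determines how an RDD's records are split across the cluster, and therefore how much parallelism a job gets. A minimal sketch of inspecting and changing it (the HDFS path here is hypothetical, just for illustration):

# Check how many partitions Spark created for the input file.
rdd = sc.textFile('hdfs:///tmp/sample.txt')   # hypothetical path
print(rdd.getNumPartitions())
# repartition() reshuffles the data into the requested number of partitions;
# coalesce() shrinks the partition count without a full shuffle.
rdd4 = rdd.repartition(4)
rdd2 = rdd4.coalesce(2)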
Since Spark follows lazy evaluation, transformations are not executed as they are defined; Spark waits until an action is called and only then creates a single job for it.
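As a quick aside, here is a minimal sketch of lazy evaluation, assuming myfile is the same RDD used in the word count below: the transformations only build up a lineage, and nothing runs on the cluster until the action at the end.

# flatMap and map are lazy transformations -- no job is submitted here.
words = myfile.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
# Only when an action such as count() is called does Spark submit a job.
pairs.count()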
The word-count job was further divided into two stages: Stage 0 for reduceByKey and Stage 1 for collect (our last call). reduceByKey triggers a shuffle, so in the future try to minimize the number of shuffles and the amount of data shuffled. Shuffles are expensive operations: all shuffle data must be written to disk and then transferred over the network.
This clearly shows that a PythonRDD object is a Scala object under the hood. It reads the 2.1 KB of shuffled data, maps it together, and returns an RDD.
from operator import add

# Word count with reduceByKey: split each line into words, pair each word with 1,
# then sum the counts per word. collect() is the action that triggers the job.
count = myfile.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(add)
count.collect()
# The same word count, spelling out the reduce function explicitly.
count_reduceByKey = myfile.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda w1, w2: w1 + w2)
count_reduceByKey.collect()
# The groupByKey variant: every (word, 1) pair is shuffled before the counts are summed.
count_groupByKey = myfile.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).groupByKey().map(lambda wc: (wc[0], sum(wc[1])))
count_groupByKey.collect()
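To see where the shuffle introduces a stage boundary without opening the Spark UI, you can print the RDD lineage with toDebugString(); the indentation in the output marks the shuffle boundaries that split the job into stages.

# Inspect the lineage of both variants built above.
print(count_reduceByKey.toDebugString())
print(count_groupByKey.toDebugString())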
In the Spark UI, under the Stages tab, you can check the shuffle read/write performed by our job.
- This job is divided into two stages
- Stage 0 for groupByKey()
- Stage 1 for collect()
- Together they shuffled 2.5 KB of data, compared to 2.1 KB with reduceByKey()
textbook = sc.textFile('hdfs://devtoolsRnD/data/everyone/work/hive/SparkCookbook.txt')

# Sort all the word counts first, then keep only the words that appear more than 600 times.
textbook.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(add).sortBy(lambda x: x[1], ascending=False).filter(lambda x: x[1] > 600).collect()
[('the', 2516), ('to', 1157), ('a', 928), ('is', 896), ('of', 895), ('and', 782), ('in', 624)]

# Same result, but filtering before sorting means far less data goes through the sort's shuffle.
textbook.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(add).filter(lambda x: x[1] > 600).sortBy(lambda x: x[1], ascending=False).collect()
from pyspark import StorageLevel

# Cache with the default storage level (MEMORY_ONLY):
myfile.cache()
OR
# Persist explicitly with a chosen storage level:
myfile.persist(StorageLevel.MEMORY_AND_DISK_SER)
Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read. Partitions that don't fit in memory are spilled to disk instead of being recomputed on the fly each time they're needed.
myfile.persist(StorageLevel.MEMORY_AND_DISK)
Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.
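A small sketch of checking which storage level is currently in effect and releasing the cached blocks once they are no longer needed (assuming myfile was persisted as above):

# getStorageLevel() reports how the RDD is persisted; unpersist() frees the blocks.
print(myfile.getStorageLevel())
myfile.unpersist()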
Spark is a framework that lets users write parallel data-processing code and then distributes that code across a cluster of machines, executing tasks as close to where the data lives as possible (i.e. minimizing data transfer).
E.g. pushing down predicates, as sketched below.
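As a minimal sketch of predicate pushdown with the DataFrame API (the Parquet path and the year column are hypothetical; on CDH5 this needs Spark SQL, and the exact plan output varies by Spark version):

# The filter on year is pushed down to the Parquet reader, so only matching
# rows are read; explain(True) prints the physical plan, where the pushed
# filters show up.
df = sqlContext.read.parquet('hdfs:///tmp/events.parquet')
df.filter(df['year'] > 2014).explain(True)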