
Spark example

What?

This example is about running a Spark Scala job on the cluster.

The same NYC taxi data is used as in the article 'All Cylinders', but this time to calculate the average tip per ride per weekday. Also see that article for more information about the 'build.sbt' and 'assembly.sbt' files.
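The heart of the computation is the classic count-and-sum trick for averaging in a single reduce pass: map every ride to a (1, tip_amount) pair, add the pairs up per weekday, and only divide at the very end. Below is a minimal local sketch of that pattern in plain Scala (no cluster needed); the ride values are made up for illustration:

object AvgTipSketch {
    def main(args: Array[String]): Unit = {
        // hypothetical rides: (day-of-week key, tip amount)
        val rides = Seq(("3-WED", 2.50), ("3-WED", 1.00), ("5-FRI", 4.00))

        // map each ride to (key, (count=1, tip)), then sum both fields per key
        val sums = rides
            .map { case (dow, tip) => (dow, (1, tip)) }
            .groupBy(_._1)
            .map { case (dow, group) =>
                val (n, total) = group.map(_._2)
                    .reduce((a, b) => (a._1 + b._1, a._2 + b._2))
                (dow, (n, total))
            }

        // divide sum-of-tips by ride count to get the average per weekday
        sums.toSeq.sorted.foreach { case (dow, (n, total)) =>
            println(s"$dow,${total / n}")   // prints e.g. 3-WED,1.75
        }
    }
}

The Spark job below does exactly the same, with 'map' and 'reduceByKey' on an RDD instead of a local collection.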

The final bar chart produced looks like this:

[Figure: bar chart "Average tip per ride" per weekday, MON through SUN, for 2015]

Prerequisites

The Scala build tool ('sbt') has been installed on your system, as well as Spark, a Hadoop client (for accessing HDFS), and R for plotting. And of course aardvark.

Go aardvark

Grab this aardvark.code file:

wget http://data.munging.ninja/aardvarkcode/spark/aardvark.code

The two most important files are at the top:

  1. the Scala query
  2. the plotting of the chart using R

Execute

Execute aardvark. Go for coffee. Come back. Have a look at the chart:

$ display barchart.png 

The aardvark.code

##======================================================================== 
##== $query_scala

// input is 'in_rdd', output is 'out_rdd'

// for every line in the file (except the header), split it into fields,
// and 'emit' a tuple containing
//     key:   day-of-week (prefixed with its number for sorting, e.g. "3-WED")
//     value: (1, tip_amount)
val ride=in_rdd.filter( !_.startsWith("VendorID") ).
    map( line => {
            val spl=line.split(",")
            val dateFmt= DateTimeFormatter.ofPattern("yyyy-MM-dd")
            val dt=LocalDate.parse( spl(1).substring(0,10), dateFmt)

            val dows=dt.getDayOfWeek().toString().substring(0,3)
            val down=dt.getDayOfWeek().getValue()
            ( s"$down-$dows", (1, spl(15).toDouble) )
       })

// sum up, per day-of-week
val tuple=ride.reduceByKey( (a,b) => (a._1+b._1, a._2+b._2))
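// e.g. ("3-WED",(1,2.50)) + ("3-WED",(1,1.00))  =>  ("3-WED",(2,3.50))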
    
// output: divide tips by num-rides, to get average
val out_rdd=tuple.map( r => {       
    val (k,v)=(r._1,r._2)
    if (v._1!=0) (k, v._2/v._1.toDouble)
    else (k, 0.0)   // 0.0 (not 0) keeps both branches of type Double
    } )

##======================================================================== 
##== plot.R

png('barchart.png',width=800, height=400) 
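# each line of output.txt looks like "3-WED,1.75" (see PART 3 of aardvark.sh)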
df<-read.table('output.txt', sep=',', header=F)  
names(df)<-c("dow","val") 
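# sort on the leading weekday number, then drop the "N-" prefix for the labels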
dfo=df[order(df$dow),]
dfo$dow=sub('^..','',dfo$dow)
barplot( dfo$val, names.arg=dfo$dow, 
         main="Average tip per ride",sub="2015" )
dev.off()

##======================================================================== 
##== Taxi.scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.log4j.Logger

import java.time.LocalDate
import java.time.format.DateTimeFormatter

object Taxi {

    def main(arg: Array[String]) {
        val logger = Logger.getLogger(this.getClass())

        // Arguments
        if (arg.length < 2) {
            logger.error("No input/output path!")
            System.err.println("No input/output path!")
            System.exit(1)
        }
        val inpath = arg(0)
        val outpath = arg(1)

        // setup sparkcontext
        val jobname = "Taxi"
        val conf = new SparkConf().setAppName(jobname)
        val sc = new SparkContext(conf)

        logger.info(s"Job=${jobname} Inpath=${inpath} Outpath=${outpath} " )

        val in_rdd=sc.textFile(inpath) // the taxi file
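        // aardvark pastes the '$query_scala' section from the top of this file here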
[[$query_scala]]
        out_rdd.saveAsTextFile(outpath)
    }
}


##======================================================================== 
##== build.sbt

mainClass in assembly := Some("Taxi")
assemblyJarName in assembly := "taxi.jar"

lazy val root = (project in file(".")).
  settings(
    name := "taxi",
    version := "1.0",
    scalaVersion := "2.10.6"  // spark 1.6 and aardvark.sh expect scala 2.10
)

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.0" % "provided"
)

##======================================================================== 
##== project/assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

##======================================================================== 
##== aardvark.sh 
#!/bin/bash

# *********************************************************
# *** PART 0: checks before running ***********************

if [ -z "$HADOOP_HOME" ]; then
    echo "Variable 'HADOOP_HOME' is not set!"
    exit 1
fi

if [ -z "$SPARK_HOME" ]; then
    echo "Variable 'SPARK_HOME' is not set!"
    exit 1
fi
 
# *********************************************************
# *** PART 1: assemble the jar file ***********************
# compare age of source (scala file) and target (jar file) 
S_DATE=`stat -c %Y Taxi.scala`
T_DATE=0
JARFILE=`ls target/scala*/taxi.jar 2>/dev/null`
if [ ! -z "$JARFILE" ]
then
    T_DATE=`stat -c %Y "$JARFILE"`
fi
if [ $T_DATE -le $S_DATE ]
then
    echo "*** sbt assembly ***"
    echo "(if this is the first run, go for a coffee break)"
    sbt assembly 
fi 

# *********************************************************
# *** PART 2: launch jar on the spark cluster *************
# condition 1: the jarfile should exist
JARFILE=`ls target/scala*/taxi.jar 2>/dev/null`
if [ ! -f "$JARFILE" ]
then
    echo "'$JARFILE' doesn't exist, can't run it." 
    exit 1
fi

# condition 2: the jar file should be younger than 
#              the scala sourcefile
S_DATE=`stat -c %Y Taxi.scala`
T_DATE=`stat -c %Y "$JARFILE"`

if [ $T_DATE -le $S_DATE ]
then
    echo "'$JARFILE' is older than source, not running" 
    exit 1
fi

# define job input/output paths
OUTPUT_PATH=hdfs:///user/wildadm/tip_per_ride
INPUT_PATH=hdfs:///user/wildadm/20160421_nyc_taxi
#INPUT_PATH=hdfs:///user/wildadm/20160421_nyc_taxi_subset

# PRE-LAUNCH: delete the output directory 
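# (spark refuses to write into an output directory that already exists)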
$HADOOP_HOME/bin/hdfs dfs -rm -r $OUTPUT_PATH

# LAUNCH
$SPARK_HOME/bin/spark-submit --master yarn-cluster \
    --num-executors 12 \
    "$JARFILE" \
    $INPUT_PATH $OUTPUT_PATH


# *********************************************************
# *** PART 3: post-run, fetch data from hdfs **************
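# spark saved each tuple as "(3-WED,1.75)"; strip the parens to get plain CSV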
$HADOOP_HOME/bin/hdfs dfs -cat $OUTPUT_PATH/part* |\
    sed -e 's/^(//' -e 's/)$//' > output.txt


# *********************************************************
# *** PART 4: plot the output *****************************
/usr/bin/R --slave --vanilla --quiet -f ./plot.R


# *********************************************************
# *** THE END *********************************************
echo "Done!"
 