| ##========================================================================
##== $query_scala
// input is 'in_rdd', output is 'out_rdd'
// for every line in the file (except the header), split it into fields,
// and 'emit' a tuple containing
// key: day-of-week, prefixed with its number for sorting (e.g. "3-WED")
// value: (1, tip_amount)
val ride=in_rdd.filter( !_.startsWith("VendorID") ).
map( line => {
val spl=line.split(",")
val dateFmt= DateTimeFormatter.ofPattern("yyyy-MM-dd")
val dt=LocalDate.parse( spl(1).substring(0,10), dateFmt)
val dows=dt.getDayOfWeek().toString().substring(0,3)
val down=dt.getDayOfWeek().getValue()
( s"$down-$dows", (1, spl(15).toDouble) )
})
// sum up, per day-of-week
val tuple=ride.reduceByKey( (a,b) => (a._1+b._1, a._2+b._2))
// output: divide the summed tips by the number of rides, to get the average
val out_rdd=tuple.map( r => {
val (k,v)=(r._1,r._2)
if (v._1!=0) (k, v._2/v._1.toDouble)
else (k, 0.0)
} )
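// Illustrative alternative (a sketch only, not used by the job): the same
// per-day average can be computed with aggregateByKey, folding each tip
// straight into a (count, sum) accumulator instead of emitting a (1, tip)
// pair per ride. Field indices 1 (pickup datetime) and 15 (tip_amount) are
// assumed to follow the same 2015 yellow-cab schema as above.
val perDowSketch = in_rdd.filter( !_.startsWith("VendorID") ).
map( line => {
val spl = line.split(",")
val dt = LocalDate.parse( spl(1).substring(0,10), DateTimeFormatter.ofPattern("yyyy-MM-dd") )
( s"${dt.getDayOfWeek().getValue()}-${dt.getDayOfWeek().toString().substring(0,3)}", spl(15).toDouble )
}).
aggregateByKey( (0, 0.0) )(
(acc, tip) => (acc._1 + 1, acc._2 + tip),   // fold one ride's tip into (count, sum)
(a, b) => (a._1 + b._1, a._2 + b._2) ).     // merge partial (count, sum) pairs
mapValues( cs => if (cs._1 != 0) cs._2 / cs._1 else 0.0 )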
##========================================================================
##== plot.R
# read the (dow,value) pairs written by the Spark job, order them by the
# numeric day-of-week prefix, strip the prefix, and draw a bar chart
png('barchart.png',width=800, height=400)
df<-read.table('output.txt', sep=',', header=F)
names(df)<-c("dow","val")
dfo<-df[order(df$dow),]
dfo$dow<-sub('^..','',dfo$dow)   # drop the "n-" sort prefix, keep "MON".."SUN"
barplot( dfo$val, names.arg=dfo$dow,
main="Average tip per ride",sub="2015" )
dev.off()
##========================================================================
##== Taxi.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import java.time.LocalDate
import java.time.format.DateTimeFormatter
object Taxi {
def main(arg: Array[String]): Unit = {
val logger = Logger.getLogger(this.getClass())
// Arguments
if (arg.length < 2) {
logger.error("No input/output path!")
System.err.println("No input/output path!")
System.exit(1)
}
val inpath = arg(0)
val outpath = arg(1)
// setup sparkcontext
val jobname = "Taxi"
val conf = new SparkConf().setAppName(jobname)
val sc = new SparkContext(conf)
logger.info(s"Job=${jobname} Inpath=${inpath} Outpath=${outpath} " )
val in_rdd=sc.textFile(inpath) // the taxi file
[[$query_scala]]
out_rdd.saveAsTextFile(outpath)
}
}
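// Local smoke test (a sketch / assumption, not part of the cluster run): the
// same job can be exercised without YARN by building the context with
//   new SparkConf().setAppName(jobname).setMaster("local[*]")
// and pointing inpath at a small local CSV sample. A master set in code takes
// precedence over spark-submit's --master flag, which is why the code above
// leaves setMaster out and lets aardvark.sh pick yarn-cluster.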
##========================================================================
##== build.sbt
mainClass in assembly := Some("Taxi")
jarName in assembly := "taxi.jar"
lazy val root = (project in file(".")).
settings(
name := "taxi",
version := "1.0",
// pin the Scala version to match the Scala 2.10 build of Spark 1.6
// assumed on the cluster
scalaVersion := "2.10.6"
)
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.6.0" % "provided"
)
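// "provided" keeps spark-core out of the assembled fat jar; the cluster's own
// Spark installation supplies those classes at run time, so taxi.jar stays small.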
##========================================================================
##== project/assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
##========================================================================
##== aardvark.sh
#!/bin/bash
# *********************************************************
# *** PART 0: checks before running ***********************
if [ -z "$HADOOP_HOME" ]; then
echo "Variable 'HADOOP_HOME' is not set!"
exit 1
fi
if [ -z "$SPARK_HOME" ]; then
echo "Variable 'SPARK_HOME' is not set!"
exit 1
fi
# *********************************************************
# *** PART 1: assemble the jar file ***********************
# compare age of source (scala file) and target (jar file)
S_DATE=`stat -c %Y Taxi.scala`
T_DATE=0
JARFILE=`ls target/scala*/taxi.jar 2>/dev/null`
if [ ! -z "$JARFILE" ]
then
T_DATE=`stat -c %Y "$JARFILE"`
fi
if [ $T_DATE -le $S_DATE ]
then
echo "*** sbt assembly ***"
echo "(if this is the first run, go for a coffee break)"
sbt assembly
fi
# *********************************************************
# *** PART 2: launch jar on the spark cluster *************
# condition 1: the jarfile should exist
JARFILE=`ls target/scala*/taxi.jar 2>/dev/null`
if [ ! -f "$JARFILE" ]
then
echo "'$JARFILE' doesn't exist, can't run it."
exit 1
fi
# condition 2: the jar file should be younger than
# the scala sourcefile
S_DATE=`stat -c %Y Taxi.scala`
T_DATE=`stat -c %Y "$JARFILE"`
if [ $T_DATE -le $S_DATE ]
then
echo "'$JARFILE' is older than source, not running"
exit 1
fi
# define job input/output paths
OUTPUT_PATH=hdfs:///user/wildadm/tip_per_ride
INPUT_PATH=hdfs:///user/wildadm/20160421_nyc_taxi
#INPUT_PATH=hdfs:///user/wildadm/20160421_nyc_taxi_subset
# PRE-LAUNCH: delete the output directory
$HADOOP_HOME/bin/hdfs dfs -rm -r $OUTPUT_PATH
# LAUNCH
$SPARK_HOME/bin/spark-submit --master yarn-cluster \
--num-executors 12 \
$JARFILE \
$INPUT_PATH $OUTPUT_PATH
# *********************************************************
# *** PART 3: post-run, fetch data from hdfs **************
$HADOOP_HOME/bin/hdfs dfs -cat $OUTPUT_PATH/part* |\
sed -e 's/^(//' -e 's/)$//' > output.txt
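# saveAsTextFile writes one tuple per line, e.g. "(3-WED,2.07)" (value purely
# illustrative); the sed above strips the surrounding parentheses so that
# plot.R can read the result as plain "dow,value" CSV pairs.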
# *********************************************************
# *** PART 4: plot the output *****************************
/usr/bin/R --slave --vanilla --quiet -f ./plot.R
# *********************************************************
# *** THE END *********************************************
echo "Done!"