Intro
You have a development Spark cluster, running on 4 Xen virtual machines (named nk01, nk02, ...) that all live on the same dom0 host (nk00).
You want to write a Scala Spark job, submit it to the cluster, and monitor the load to see that all 4 nodes are pulling their weight!
This article shows:
- how to package a Scala Spark job using sbt and submit it to the cluster
- how to monitor the load on the nodes of your Xen virtual systems, using a 'shoestring-budget' monitoring method
- how to use R for visualization
Software used:
- spark-1.6.0 for hadoop
- hadoop-2.7.1 (hdfs/yarn)
- R version 3.2.4
- xentop on dom0.
Data: NYC taxi rides of 2015
The Scala job in question is going to parse the New York City taxi data of 2015 and tally up the following:
- how many rides
- how many miles
- how many passengers
Go ahead and download the yellow cab trip sheet data from www.nyc.gov/html/tlc/html/about/trip_record_data.shtml and put it on your HDFS.
Before you blow up your data pipeline, a little warning about size: every file is between 1.7 GB and 2.0 GB, which brings the total to roughly 22 GB.
For a detailed description of the data fields, see: www.nyc.gov/html/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf
A quick cut from the July file:
VID pickup_datetime dropoff_datetime #psngr dist pickup_longitude pickup_latitude ..
1 2015-07-01 00:00:00 2015-07-01 00:15:26 1 3.50 -73.994155883789063 40.751125335693359
1 2015-07-01 00:00:00 2015-07-01 00:22:22 1 3.90 -73.984657287597656 40.768486022949219
1 2015-07-01 00:00:00 2015-07-01 00:07:42 1 2.30 -73.978889465332031 40.762287139892578
1 2015-07-01 00:00:00 2015-07-01 00:39:37 1 9.20 -73.992790222167969 40.742759704589844
1 2015-07-01 00:00:00 2015-07-01 00:05:34 1 1.10 -73.912429809570313 40.769809722900391
1 2015-07-01 00:00:00 2015-07-01 00:06:46 2 1.00 -73.959159851074219 40.773429870605469
2 2015-07-01 00:00:00 2015-07-01 00:36:57 2 19.12 -73.789459228515625 40.647258758544922
2 2015-07-01 00:00:00 2015-07-01 06:30:15 1 .00 0 0
2 2015-07-01 00:00:00 2015-07-01 11:27:07 1 2.58 -73.998931884765625 40.744678497314453
2 2015-07-01 00:00:00 2015-07-01 00:00:00 1 1.07 -73.99383544921875 40.735431671142578
We are interested in fields:
- passenger_count (#psngr)
- trip_distance (dist)
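Before writing the full job, it doesn't hurt to peek at a few raw lines from the Spark shell and verify that passenger_count and trip_distance really sit at (zero-based) positions 3 and 4. A minimal sketch; the path and file name below are just an example, point it at whatever you put on HDFS:
// print a few raw lines to verify the field layout
// (example path/file name; adapt to your own HDFS location)
val sample = sc.textFile("hdfs:///user/dmn/20160421_nyc_taxi/yellow_tripdata_2015-07.csv")
sample.take(3).foreach(println)
// list each header field with its zero-based index
sample.first().split(",").zipWithIndex.foreach { case (name, i) => println(s"$i: $name") }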
If you were only going to run the Scala code in the Spark shell, this would suffice:
// load the data
val taxi_file = sc.textFile("path-to-your-data-files")

// for every line in the file (except the header), split it into fields,
// and 'emit' a tuple containing (1, distance, num_passengers):
val ride = taxi_file.filter( !_.startsWith("VendorID") ).
  map( line => {
    val spl = line.split(",")
    // 1, meter_miles, num_passengers
    ( 1, spl(4).toDouble, spl(3).toInt )
  })

// sum up
val tuple = ride.reduce( (a,b) => (a._1+b._1, a._2+b._2, a._3+b._3) )
println(s"Totals: ${tuple}")
Output:
(146112989,1.9195264796499913E9,245566747)
Which is 146 million taxi-rides, covering 2 billion miles, carrying 245 million passengers.
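Aside: the raw files do contain the occasional odd or empty line. If the parse ever dies on a NumberFormatException, a slightly more defensive version of the map step (just a sketch, not what was run here) silently drops the lines that don't parse:
import scala.util.Try

// defensive variant: drop the header and any line whose fields don't parse
val ride = taxi_file.filter( !_.startsWith("VendorID") ).
  flatMap( line => {
    val spl = line.split(",")
    Try( ( 1, spl(4).toDouble, spl(3).toInt ) ).toOption
  })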
To be submittable as a job on the cluster, the code needs to be encapsulated as follows:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import java.time.LocalDate
import java.time.format.DateTimeFormatter

object Taxi {
  def main(arg: Array[String]) {
    val logger = Logger.getLogger(this.getClass())

    // Arguments
    if (arg.length < 1) {
      logger.error("No input path!")
      System.err.println("No input path!")
      System.exit(1)
    }
    val inpath = arg(0)

    // setup sparkcontext
    val jobname = "Taxi"
    val conf = new SparkConf().setAppName(jobname)
    val sc = new SparkContext(conf)
    logger.info(s"Job: ${jobname} Path: ${inpath}")

    // the query
    val taxi_file = sc.textFile(inpath)
    val ride = taxi_file.filter( !_.startsWith("VendorID") ).
      map( line => {
        val spl = line.split(",")
        // 1, meter_miles, num_passengers
        ( 1, spl(4).toDouble, spl(3).toInt )
      })
    val tuple = ride.reduce( (a,b) => (a._1+b._1, a._2+b._2, a._3+b._3) )
    println(s"Totals: ${tuple}")
  }
}
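One caveat with running this in yarn-cluster mode (as we do below): the driver runs inside a YARN container, so the println output ends up in that container's log, not on your terminal. If you would rather have the totals land on HDFS, main() could end with something like the following sketch (the output path is just an example, and it must not exist yet):
// write the totals to HDFS instead of relying on the driver log
val outpath = "hdfs:///user/dmn/taxi_totals"   // example path
sc.parallelize(Seq(tuple.toString), 1).saveAsTextFile(outpath)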
SBT
We're building with sbt (the Scala build tool). Download and install it on any of your systems; it will pull in all the necessary dependencies.
The code file and the skeleton files for the sbt build can be found in this zip: sbt_taxi.zip
Files:
Taxi.scala
build.sbt
project/assembly.sbt
File: Taxi.scala
The Scala query code; see the listing above.
File: build.sbt
Just plain sbt stuff:
mainClass in assembly := Some("Taxi")
jarName in assembly := "taxi.jar"
lazy val root = (project in file(".")).
settings(
name := "taxi",
version := "1.0"
)
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.6.0" % "provided"
)
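One optional addition: the pre-built Spark 1.6.0 binaries are compiled against Scala 2.10, so if sbt happens to pick a different Scala version on your machine you can pin it (this also matches the target/scala-2.10 path you will see further down):
// optional: build against Scala 2.10, the version used by the pre-built Spark 1.6.0
scalaVersion := "2.10.6"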
File: project/assembly.sbt
Only contains the link to the assembly plugin for SBT. Aim: build a fat jar with all of the dependencies.
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
For more info: github.com/sbt/sbt-assembly
Compile
In the root directory of your project, run "sbt assembly" and go for coffee: the very first time this takes quite a while and requires quite a few downloads. If an error occurs, first try rerunning "sbt assembly"; it may or may not help.
Like this:
$ unzip ~/Downloads/sbt_taxi.zip
Archive: sbt_taxi.zip
inflating: Taxi.scala
inflating: project/assembly.sbt
inflating: build.sbt
$ sbt assembly
[info] Loading project definition from /home/wildadm/20160428_scala_sbt3/project
[info] Updating {file:/home/wildadm/20160428_scala_sbt3/project/}root-20160428_scala_sbt3-build...
[info] Resolving org.pantsbuild#jarjar;1.6.0 ...
..
(first time? wait a long while)
..
If all goes well you end up with this beauty:
target/scala-2.10/taxi.jar
Troubleshooting
When trying to build on one system I kept getting build errors (due to duplicate classes), until I added this section to the build file. I removed it again afterwards.
mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last
    case PathList("javax", "activation", xs @ _*) => MergeStrategy.last
    case PathList("org", "apache", xs @ _*) => MergeStrategy.last
    case PathList("com", "google", xs @ _*) => MergeStrategy.last
    case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last
    case PathList("com", "codahale", xs @ _*) => MergeStrategy.last
    case PathList("com", "yammer", xs @ _*) => MergeStrategy.last
    case "about.html" => MergeStrategy.rename
    case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last
    case "META-INF/mailcap" => MergeStrategy.last
    case "META-INF/mimetypes.default" => MergeStrategy.last
    case "plugin.properties" => MergeStrategy.last
    case "log4j.properties" => MergeStrategy.last
    case x => old(x)
  }
}
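If sbt warns that the <<= syntax is deprecated, sbt-assembly 0.14.x also accepts a newer-style key; something along these lines (shortened here to two of the cases above), roughly following the sbt-assembly README:
assemblyMergeStrategy in assembly := {
  case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last
  case "log4j.properties" => MergeStrategy.last
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}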
Transfer the taxi.jar resulting from "sbt assembly" to the server hosting Spark.
Launch
Here you need to do two things, nearly at the same time:
- kick off your monitoring-data collection process (on nk00, the Xen dom0)
- submit the job on the Spark cluster (on nk01, the Xen VM running Spark)
Detail
Before you submit the Spark job, you want to kick off your virtual-server monitoring.
1. Collect monitoring data
Log on to your domain zero (dom0) host, i.e. the hypervisor that hosts your Xen virtual machines.
If you haven't got it yet, install xentop via your distribution's package manager (it ships with the Xen tools).
Then run it as follows (more details in the next section):
sudo xentop -b -d 1 -i 500 > xt.log
2. Submit the Spark job
Meanwhile, on your Spark system, submit the taxi job to the cluster as follows:
$SPARK_HOME/bin/spark-submit --master yarn-cluster --num-executors 12 \
taxi.jar hdfs:///user/dmn/20160421_nyc_taxi
For the above you need your freshly created taxi.jar and the location of the NYC taxi-ride csv files on your HDFS cluster.
Check how your job is faring via Hadoop's web interface. In this case that is on node 1: http://nk01:8088/cluster/apps
Plot the load of your Xen cluster
Step 1: data gathering
In the prior section you were told to run xentop on dom0 as follows:
sudo xentop -b -d 1 -i 500 > xt.log
Note: 500 is the number of iterations; combined with the one-second delay (-d 1), that is roughly 500 seconds of data collection. You may need to increase or decrease this value depending on how long your job runs.
Once the job has run, filter the data (note: this greps on 'nk', the common prefix of the node names):
cat xt.log | cut -c-37 | grep nk > xt_data.txt
This gives the following text file:
nk01 --b--- 111312 0.0
nk02 --b--- 30264 0.0
nk03 --b--- 30472 0.0
nk04 --b--- 30425 0.0
nk01 --b--- 111312 5.7
nk02 --b--- 30264 2.1
Step 2: load data into R
Start up R and load the data:
nw=read.table("xt_data.txt",header=F)
cluster=data.frame( nk01=nw[nw$V1=='nk01','V4']
, nk02=nw[nw$V1=='nk02','V4']
, nk03=nw[nw$V1=='nk03','V4']
, nk04=nw[nw$V1=='nk04','V4']
)
Admittedly, there should be a better way to translate from the narrow to the wide format, but since we only have a few nodes...
head(cluster)
nk01 nk02 nk03 nk04
1 0.0 0.0 0.0 0.0
2 5.7 2.1 2.2 2.4
3 5.5 2.2 2.1 2.1
4 61.6 2.1 2.1 2.0
5 165.7 2.3 2.3 2.1
6 167.2 2.1 2.2 2.2
Plot:
par(mar=c(2, 4, 1, 1) ) # bottom, left, top, right
par(mfrow=c(4,1) )
lim=c(0,400)
for ( node in c('nk01','nk02','nk03','nk04') ) {
plot( nw[nw$V1==node,'V4'], col="blue",type="l", ylim=lim, xlab='', ylab=node)
}
Result
Admire the plot in the next section.
Plot
[Figure: CPU load (%) over time for nk01 through nk04, one panel per node]
Conclusion
As you can tell from the chart, after a short burst on the namenode (nk01) the load gets distributed equally over the 4 nodes. In other words: Spark works as advertised on the box!
And we found out that the yellow cabs in New York City clocked about 2 billion miles in 2015, carrying 245 million passengers over 146 million taxi rides.