All Cylinders
 
01_intro
20160428

Intro

You have a development Spark cluster, running on 4 Xen virtual machines (named nk01, nk02, ...) that all live on the same dom0 host (nk00).

You want to write a Scala Spark job, submit it to the cluster, and monitor the load to see that all 4 nodes are pulling their weight!

This article shows:

  • how to package a Scala Spark job using sbt and submit it to the cluster
  • how to monitor the load on the nodes of your Xen virtual systems, using a 'shoestring-budget' monitoring method
  • how to use R for visualization

Software used:

  • spark-1.6.0 for hadoop
  • hadoop-2.7.1 (hdfs/yarn)
  • R version 3.2.4
  • xentop on dom0.
02_data
20160428

Data: NYC taxi rides of 2015

The Scala job in question is going to parse the New York City taxi data of 2015 and tally up the following:

  • how many rides
  • how many miles
  • how many passengers

Go ahead and download the yellow cab trip sheet data from www.nyc.gov/html/tlc/html/about/trip_record_data.shtml and put it on your HDFS.

Before you blow up your data pipeline, a small warning about size: every file is between 1.7 GB and 2.0 GB, which brings the total to about 22 GB.

For a detailed description of the data fields, see: www.nyc.gov/html/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf

A quick cut from the January file:

VID      pickup_datetime      dropoff_datetime #psngr dist  pickup_longitude      pickup_latitude    ..
1    2015-07-01 00:00:00   2015-07-01 00:15:26   1    3.50  -73.994155883789063   40.751125335693359
1    2015-07-01 00:00:00   2015-07-01 00:22:22   1    3.90  -73.984657287597656   40.768486022949219
1    2015-07-01 00:00:00   2015-07-01 00:07:42   1    2.30  -73.978889465332031   40.762287139892578
1    2015-07-01 00:00:00   2015-07-01 00:39:37   1    9.20  -73.992790222167969   40.742759704589844
1    2015-07-01 00:00:00   2015-07-01 00:05:34   1    1.10  -73.912429809570313   40.769809722900391
1    2015-07-01 00:00:00   2015-07-01 00:06:46   2    1.00  -73.959159851074219   40.773429870605469
2    2015-07-01 00:00:00   2015-07-01 00:36:57   2   19.12  -73.789459228515625   40.647258758544922
2    2015-07-01 00:00:00   2015-07-01 06:30:15   1     .00    0                    0
2    2015-07-01 00:00:00   2015-07-01 11:27:07   1    2.58  -73.998931884765625   40.744678497314453
2    2015-07-01 00:00:00   2015-07-01 00:00:00   1    1.07  -73.99383544921875    40.735431671142578

We are interested in fields:

  • passenger_count (#psngr)
  • trip_distance (dist)
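
In the raw CSV these two sit right after the drop-off timestamp: after splitting a line on commas, passenger_count is field index 3 and trip_distance is field index 4 (0-based), which is what the indices in the next section's code refer to. A minimal sketch, with a sample line adapted from the extract above:

// sample row, same column order as the yellow-cab CSV shown above
val line = "2,2015-07-01 00:00:00,2015-07-01 00:36:57,2,19.12,-73.789459,40.647258"

val spl = line.split(",")
val passengers = spl(3).toInt     // passenger_count -> 2
val distance   = spl(4).toDouble  // trip_distance   -> 19.12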
03_code
20160428

If you were only going to run the Scala code in the Spark shell, the following would suffice:

    // load the data
    val taxi_file=sc.textFile("path-to-your-data-files") 

    // for every line in the file (except the header), split it into fields, 
    // and 'emit' a tuple containing `(1, distance, num_passengers)` : 
    val ride=taxi_file.filter( !_.startsWith("VendorID") ).
                       map( line => {
                            val spl=line.split(",")

                            // 1, meter_miles, num_passenger
                            ( 1, spl(4).toDouble, spl(3).toInt ) 
                       })  

    // sum up
    val tuple=ride.reduce( (a,b) => (a._1+b._1, a._2+b._2, a._3+b._3))
    println(s"Totals: ${tuple}") 

Output:

(146112989,1.9195264796499913E9,245566747)

That is 146 million taxi rides, covering close to 2 billion miles, carrying 245 million passengers.
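
As a quick sanity check you can derive per-ride averages straight from that tuple in the Spark shell. A small sketch (the variable names are just illustrative):

// per-ride averages derived from the totals above
val (rides, miles, passengers) = tuple
println(f"avg miles/ride: ${miles / rides}%.2f")                      // ~13.1
println(f"avg passengers/ride: ${passengers.toDouble / rides}%.2f")   // ~1.7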

To be submittable as a job on the cluster, the code needs to be encapsulated as follows:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.log4j.Logger

import java.time.LocalDate
import java.time.format.DateTimeFormatter

object Taxi {
    def main(arg: Array[String]) {
        val logger = Logger.getLogger(this.getClass)

        // Arguments
        if (arg.length < 1) {
            logger.error("No input path!")
            System.err.println("No input path!")
            System.exit(1)
        }
        val inpath = arg(0)

        // setup sparkcontext
        val jobname = "Taxi"
        val conf = new SparkConf().setAppName(jobname)
        val sc = new SparkContext(conf)

        logger.info(s"Job: ${jobname} Path: ${inpath}")

        // the query 
        val taxi_file=sc.textFile(inpath)

        val ride=taxi_file.filter( !_.startsWith("VendorID") ).
                           map( line => {
                                val spl=line.split(",")

                                // 1, meter_miles, num_passenger
                                ( 1, spl(4).toDouble, spl(3).toInt ) 
                           })  

        val tuple=ride.reduce( (a,b) => (a._1+b._1, a._2+b._2, a._3+b._3))
    
        println(s"Totals: ${tuple}") 
    
    }
}
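
One caveat on the parsing: the map above assumes every non-header line splits into well-formed numeric fields. If your copy of the data contains blank or malformed lines, the job will fail with a NumberFormatException. A hedged variant (a sketch, not part of the original job) that silently skips unparsable rows could look like this:

import scala.util.Try

// drop-in replacement for 'val ride = ...': emit the tuple only when both fields parse
val ride = taxi_file.filter( !_.startsWith("VendorID") ).
                     flatMap( line => {
                         val spl = line.split(",")
                         Try( (1, spl(4).toDouble, spl(3).toInt) ).toOption
                     })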

SBT

We're building with sbt (the Scala build tool). Download and install it on any of your systems; it will fetch all the necessary dependencies.

The code file and the skeleton files for the sbt build can be found in this zip: sbt_taxi.zip

Files:

Taxi.scala              
build.sbt               
project/assembly.sbt    

File: Taxi.scala

The Scala query code; see the prior section.

File: build.sbt

Just plain sbt stuff:

mainClass in assembly := Some("Taxi")
jarName in assembly := "taxi.jar"

lazy val root = (project in file(".")).
  settings(
    name := "taxi",
    version := "1.0"
)

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.0" % "provided"
)
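
Not part of the original build file, but worth knowing: the prebuilt Spark 1.6.0 binaries are compiled against Scala 2.10 (hence the target/scala-2.10 path you will see after the build), so if your sbt picks a different default Scala version you may want to pin it explicitly, for example:

scalaVersion := "2.10.6"   // match the Scala version your Spark distribution was built with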

File: project/assembly.sbt

It only contains the declaration of the sbt-assembly plugin. Aim: build a fat jar with all of the dependencies.

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

For more info: github.com/sbt/sbt-assembly

Compile

In the root dir of your project, run "sbt assembly" and go for coffee: the very first time this takes quite a while and requires quite a few downloads. If an error occurs, first try rerunning "sbt assembly"; it may or may not help.

Like this:

$ unzip ~/Downloads/sbt_taxi.zip
Archive:  sbt_taxi.zip
  inflating: Taxi.scala              
  inflating: project/assembly.sbt    
  inflating: build.sbt               

$ sbt assembly
[info] Loading project definition from /home/wildadm/20160428_scala_sbt3/project
[info] Updating {file:/home/wildadm/20160428_scala_sbt3/project/}root-20160428_scala_sbt3-build...
[info] Resolving org.pantsbuild#jarjar;1.6.0 ...
..
(first time? wait a long while) 
..

If all goes well you end up with this beauty:

target/scala-2.10/taxi.jar

Troubleshooting

When trying to build on one system I kept getting build errors (due to duplicate classes) until I added the section below to the build file. I removed it again afterwards.

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last
    case PathList("javax", "activation", xs @ _*) => MergeStrategy.last
    case PathList("org", "apache", xs @ _*) => MergeStrategy.last
    case PathList("com", "google", xs @ _*) => MergeStrategy.last
    case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last
    case PathList("com", "codahale", xs @ _*) => MergeStrategy.last
    case PathList("com", "yammer", xs @ _*) => MergeStrategy.last
    case "about.html" => MergeStrategy.rename
    case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last
    case "META-INF/mailcap" => MergeStrategy.last
    case "META-INF/mimetypes.default" => MergeStrategy.last
    case "plugin.properties" => MergeStrategy.last
    case "log4j.properties" => MergeStrategy.last
    case x => old(x)
  }
}

The code file and the skeleton files for sbt build: sbt_taxi.zip

Transfer the taxi.jar resulting from "sbt assembly" to the server hosting Spark.

04_launch
20160428

Launch

Here you need to do two things, nearly at the same time:

  1. kick off your monitoring data collection process (on nk00, the Xen dom0)
  2. submit the job on the Spark cluster (on nk01, the Xen VM running Spark)

Detail

Before you submit the Spark job, you want to kick off your virtual server monitoring.

1. Collect monitoring data

Log on to your domain zero (dom0) host, i.e. the hypervisor that hosts your Xen virtual machines.

If you haven't installed it yet: xentop ships with the Xen management tools, so install those with your distribution's package manager.

Then run it as follows (more details in the next section):

sudo xentop -b -d 1 -i 500 > xt.log

2. Submit spark job

Meanwhile, on your Spark system, submit the taxi job on the cluster as follows:

$SPARK_HOME/bin/spark-submit  --master yarn-cluster --num-executors 12   \
        taxi.jar hdfs:///user/dmn/20160421_nyc_taxi

For the above you need your freshly created taxi.jar and the location of the NYC taxi-ride csv files on your HDFS cluster.

Check via Hadoop's web interface how your job is faring. In this case that is on node 1: http://nk01:8088/cluster/apps

05_plot
20160428

Plot the load of your Xen cluster

Step 1: data gathering

In the prior section you were told to run xentop on domain 0 as follows:

sudo xentop -b -d 1 -i 500 > xt.log

Note: -d 1 sets a one-second delay between batch updates and -i 500 the number of iterations, so this collects roughly 500 seconds of data. You may need to increase or decrease these values to cover the runtime of your job.

Once the job has run, filter the data. The cut keeps the first 37 characters of each line (the NAME, STATE, CPU(sec) and CPU(%) columns), and the grep keeps only the lines of the nodes, whose names all start with 'nk':

cat xt.log |  cut -c-37  | grep nk > xt_data.txt

This gives the following text file:

  nk01 --b---     111312    0.0  
  nk02 --b---      30264    0.0  
  nk03 --b---      30472    0.0  
  nk04 --b---      30425    0.0  
  nk01 --b---     111312    5.7  
  nk02 --b---      30264    2.1  

Step 2: load data into R

Start up R and load the data:

nw=read.table("xt_data.txt",header=F)
cluster=data.frame( nk01=nw[nw$V1=='nk01','V4']
         , nk02=nw[nw$V1=='nk02','V4']
         , nk03=nw[nw$V1=='nk03','V4']
         , nk04=nw[nw$V1=='nk04','V4']
)

Admittedly, there should be a better way to translate from the narrow to the wide format, but since we only have a few nodes...

head(cluster)

   nk01 nk02 nk03 nk04
1   0.0  0.0  0.0  0.0
2   5.7  2.1  2.2  2.4
3   5.5  2.2  2.1  2.1
4  61.6  2.1  2.1  2.0
5 165.7  2.3  2.3  2.1
6 167.2  2.1  2.2  2.2

Plot:

par(mar=c(2, 4, 1, 1) ) # bottom, left, top, right
par(mfrow=c(4,1) )
lim=c(0,400)
for ( node in c('nk01','nk02','nk03','nk04') ) {
    plot( nw[nw$V1==node,'V4'], col="blue",type="l", ylim=lim, xlab='', ylab=node)
}

Result

Admire the plot in the next section.

06_conclusion
20160428

Plot

(Chart: xentop %CPU over time for nk01..nk04, one panel per node, as produced by the R plotting code in the previous section.)

Conclusion

As you can tell from the chart, after a short burst on the namenode (nk01) the load gets distributed equally over the 4 nodes. In other words: Spark works as advertised on the box!

And we found out that the yellow cabs in New York City clocked close to 2 billion miles in 2015, carrying 245 million passengers over 146 million taxi rides.

 
Notes by Data Munging Ninja.