All Cylinders
 

If you only ran the Scala code in the Spark shell, then this would suffice:

    // load the data
    val taxi_file = sc.textFile("path-to-your-data-files")

    // for every line in the file (except the header), split it into fields,
    // and 'emit' a tuple containing `(1, distance, num_passengers)`:
    val ride = taxi_file.filter( !_.startsWith("VendorID") ).
                         map( line => {
                             val spl = line.split(",")

                             // (ride count, metered miles, number of passengers)
                             ( 1, spl(4).toDouble, spl(3).toInt )
                         })

    // sum the three fields element-wise over all rides
    val tuple = ride.reduce( (a,b) => (a._1+b._1, a._2+b._2, a._3+b._3) )
    println(s"Totals: ${tuple}")

Output:

(146112989,1.9195264796499913E9,245566747)

Which is 146 million taxi rides, covering nearly 2 billion miles and carrying 245 million passengers.
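
The trick is the constant 1 emitted for every line: summing that first field counts the rides, while the second and third fields accumulate miles and passengers. Here is a minimal sketch of the same three-way tuple reduction on a plain Scala collection, with made-up sample values:

    // three hypothetical rides as (count, miles, passengers) tuples
    val rides  = List( (1, 2.5, 1), (1, 10.0, 3), (1, 0.8, 2) )
    val totals = rides.reduce( (a,b) => (a._1+b._1, a._2+b._2, a._3+b._3) )
    // totals is (3, 13.3, 6): 3 rides, 13.3 miles, 6 passengers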

To be submittable as a job on the cluster, the code needs to be encapsulated as follows:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.log4j.Logger

object Taxi {
    def main(arg: Array[String]): Unit = {
        val logger = Logger.getLogger(this.getClass())

        // Arguments
        if (arg.length < 1) {
            logger.error("No input path!")
            System.err.println("No input path!")
            System.exit(1)
        }
        val inpath = arg(0)

        // set up the SparkContext
        val jobname = "Taxi"
        val conf = new SparkConf().setAppName(jobname)
        val sc = new SparkContext(conf)

        logger.info(s"Job: ${jobname} Path: ${inpath}")

        // the query (identical to the shell version above)
        val taxi_file = sc.textFile(inpath)

        val ride = taxi_file.filter( !_.startsWith("VendorID") ).
                             map( line => {
                                 val spl = line.split(",")

                                 // (ride count, metered miles, number of passengers)
                                 ( 1, spl(4).toDouble, spl(3).toInt )
                             })

        val tuple = ride.reduce( (a,b) => (a._1+b._1, a._2+b._2, a._3+b._3) )

        println(s"Totals: ${tuple}")

        // shut down the context cleanly
        sc.stop()
    }
}
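
Note the main structural difference from the shell version: spark-shell creates the SparkContext (sc) for you, whereas a standalone job has to build its own SparkConf and SparkContext, and reads its input path from the command-line arguments.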

SBT

We're building with sbt (the Scala build tool). Download and install it on the system where you build; on first use it will download all the necessary dependencies.

The code file and the skeleton files for the sbt build can be found in this zip: sbt_taxi.zip

Files:

Taxi.scala              
build.sbt               
project/assembly.sbt    

File: Taxi.scala

The Scala query code; see the previous section.

File: build.sbt

Just plain sbt stuff:

mainClass in assembly := Some("Taxi")
jarName in assembly := "taxi.jar"

lazy val root = (project in file(".")).
  settings(
    name := "taxi",
    version := "1.0"
)

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.0" % "provided"
)
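
Two things worth noting: the %% operator makes sbt append the Scala binary version to the artifact name, so this dependency resolves to spark-core_2.10 (matching the target/scala-2.10 output path further down); and the "provided" scope keeps spark-core out of the fat jar, since the cluster supplies Spark at runtime.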

File: project/assembly.sbt

Contains only the reference to the assembly plugin for sbt, whose aim is to build a fat jar containing all of the dependencies.

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

For more info: github.com/sbt/sbt-assembly

Compile

In the root dir of your project, run "sbt assembly" and go for coffee: the very first time this takes quite a while and requires quite a few downloads. If an error occurs, first try rerunning "sbt assembly"; it may or may not help.

Like this:

$ unzip ~/Downloads/sbt_taxi.zip
Archive:  sbt_taxi.zip
  inflating: Taxi.scala              
  inflating: project/assembly.sbt    
  inflating: build.sbt               

$ sbt assembly
[info] Loading project definition from /home/wildadm/20160428_scala_sbt3/project
[info] Updating {file:/home/wildadm/20160428_scala_sbt3/project/}root-20160428_scala_sbt3-build...
[info] Resolving org.pantsbuild#jarjar;1.6.0 ...
..
(first time? wait a long while) 
..

If all goes well you end up with this beauty:

target/scala-2.10/taxi.jar
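
Before shipping it, a quick sanity check verifies that the Taxi class actually made it into the fat jar (assuming the JDK's jar tool is on your PATH):

$ jar tf target/scala-2.10/taxi.jar | grep Taxi

This should list Taxi.class (and its companion Taxi$.class) among the entries.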

Troubleshooting

When trying to build on one system I kept getting errors due to duplicate classes, until I added the section below to the build file; I removed it again afterwards. (Duplicates arise because the fat jar pulls the same file in from several dependency jars, and the merge strategy tells assembly which copy to keep.)

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last
    case PathList("javax", "activation", xs @ _*) => MergeStrategy.last
    case PathList("org", "apache", xs @ _*) => MergeStrategy.last
    case PathList("com", "google", xs @ _*) => MergeStrategy.last
    case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last
    case PathList("com", "codahale", xs @ _*) => MergeStrategy.last
    case PathList("com", "yammer", xs @ _*) => MergeStrategy.last
    case "about.html" => MergeStrategy.rename
    case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last
    case "META-INF/mailcap" => MergeStrategy.last
    case "META-INF/mimetypes.default" => MergeStrategy.last
    case "plugin.properties" => MergeStrategy.last
    case "log4j.properties" => MergeStrategy.last
    case x => old(x)
  }
}

Transfer the taxi.jar resulting from "sbt assembly" to the server hosting Spark.
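
A minimal submit command might then look like this; the master URL, deploy mode, and input path are assumptions that depend on your cluster setup:

$ spark-submit --class Taxi --master yarn --deploy-mode client \
               taxi.jar  hdfs:///path/to/your/taxi-data

The trailing argument ends up in arg(0), exactly as the main method above expects.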

 