If you only want to run the Scala code interactively in the Spark shell, this would suffice:
// load the data
val taxi_file = sc.textFile("path-to-your-data-files")

// for every line in the file (except the header), split it into fields,
// and 'emit' a tuple containing (1, distance, num_passengers):
val ride = taxi_file.filter( !_.startsWith("VendorID") ).
  map( line => {
    val spl = line.split(",")
    // 1, meter_miles, num_passenger
    ( 1, spl(4).toDouble, spl(3).toInt )
  })

// sum up
val tuple = ride.reduce( (a,b) => (a._1+b._1, a._2+b._2, a._3+b._3) )
println(s"Totals: ${tuple}")
Output:
(146112989,1.9195264796499913E9,245566747)
That's 146 million taxi rides, covering almost 2 billion miles and carrying 245 million passengers.
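To make the reduce step concrete, here is a tiny self-contained sketch (my own illustration, not part of the original; the two rows are made up, with column 3 as the passenger count and column 4 as the trip distance, matching the query above):

// two fabricated rows, just to illustrate the (count, miles, passengers) aggregation
val sample = sc.parallelize(Seq(
  "2,2016-01-01 00:00:00,2016-01-01 00:10:00,1,2.5,extra",
  "1,2016-01-01 00:05:00,2016-01-01 00:25:00,3,7.0,extra"
))
val t = sample.map { line =>
  val spl = line.split(",")
  ( 1, spl(4).toDouble, spl(3).toInt )
}.reduce( (a,b) => (a._1+b._1, a._2+b._2, a._3+b._3) )
// t == (2, 9.5, 4): two rides, 9.5 miles, 4 passengers in total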
To be submittable as a job on the cluster, the code needs to be encapsulated as follows:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import java.time.LocalDate
import java.time.format.DateTimeFormatter

object Taxi {
  def main(arg: Array[String]) {
    val logger = Logger.getLogger(this.getClass())

    // Arguments
    if (arg.length < 1) {
      logger.error("No input path!")
      System.err.println("No input path!")
      System.exit(1)
    }
    val inpath = arg(0)

    // setup SparkContext
    val jobname = "Taxi"
    val conf = new SparkConf().setAppName(jobname)
    val sc = new SparkContext(conf)
    logger.info(s"Job: ${jobname} Path: ${inpath}")

    // the query
    val taxi_file = sc.textFile(inpath)
    val ride = taxi_file.filter( !_.startsWith("VendorID") ).
      map( line => {
        val spl = line.split(",")
        // 1, meter_miles, num_passenger
        ( 1, spl(4).toDouble, spl(3).toInt )
      })
    val tuple = ride.reduce( (a,b) => (a._1+b._1, a._2+b._2, a._3+b._3) )
    println(s"Totals: ${tuple}")
  }
}
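One caveat with the plain split/toDouble parsing: a single malformed line will throw an exception and fail the whole job. If that turns out to be a problem with your data, a defensive variant of the map step could look like this (a sketch of mine, not in the original code):

import scala.util.Try
// skip lines that don't parse, instead of letting one bad record kill the job
val ride = taxi_file.filter( !_.startsWith("VendorID") ).flatMap { line =>
  val spl = line.split(",")
  Try( ( 1, spl(4).toDouble, spl(3).toInt ) ).toOption
}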
SBT
We're building with sbt (the Scala build tool). Download and install it on whichever system you build on; it will pull in all the necessary dependencies.
The code file and the skeleton files for the sbt build can be found in this zip: sbt_taxi.zip
Files:
Taxi.scala
build.sbt
project/assembly.sbt
File: Taxi.scala
The Scala query code, as shown above.
File: build.sbt
Just plain sbt stuff:
mainClass in assembly := Some("Taxi")
jarName in assembly := "taxi.jar"

lazy val root = (project in file(".")).
  settings(
    name := "taxi",
    version := "1.0"
  )

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.0" % "provided"
)
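Note that the file doesn't pin a Scala version, so sbt falls back to its own default (2.10 for sbt 0.13, which is why the jar ends up under target/scala-2.10 below, and which matches the pre-built Spark 1.6 binaries). If your setup defaults to something else, you could pin it explicitly; a one-line addition of mine, not in the original file:

scalaVersion := "2.10.6"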
File: project/assembly.sbt
It only declares the assembly plugin for sbt. Aim: build a fat jar with all of the dependencies.
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
For more info: github.com/sbt/sbt-assembly
Compile
In the root dir of your project, run "sbt assembly". Go for coffee: the very first time this takes quite a while and requires quite a few downloads. If an error occurs, first try rerunning "sbt assembly"; it may or may not help.
Like this:
$ unzip ~/Downloads/sbt_taxi.zip .
Archive: sbt_taxi.zip
inflating: Taxi.scala
inflating: project/assembly.sbt
inflating: build.sbt
$ sbt assembly
[info] Loading project definition from /home/wildadm/20160428_scala_sbt3/project
[info] Updating {file:/home/wildadm/20160428_scala_sbt3/project/}root-20160428_scala_sbt3-build...
[info] Resolving org.pantsbuild#jarjar;1.6.0 ...
..
(first time? wait a long while)
..
If all goes well you end up with this beauty:
target/scala-2.10/taxi.jar
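From here the jar can be handed to the cluster with spark-submit, along these lines (the master and the data path are placeholders for your own setup):

$ spark-submit --class Taxi --master yarn target/scala-2.10/taxi.jar "path-to-your-data-files"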
Troubleshooting
When trying to build on one system, I kept getting build errors (due to duplicate classes) until I added this section to the build file. I removed it again afterwards.
mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last
    case PathList("javax", "activation", xs @ _*) => MergeStrategy.last
    case PathList("org", "apache", xs @ _*) => MergeStrategy.last
    case PathList("com", "google", xs @ _*) => MergeStrategy.last
    case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last
    case PathList("com", "codahale", xs @ _*) => MergeStrategy.last
    case PathList("com", "yammer", xs @ _*) => MergeStrategy.last
    case "about.html" => MergeStrategy.rename
    case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last
    case "META-INF/mailcap" => MergeStrategy.last
    case "META-INF/mimetypes.default" => MergeStrategy.last
    case "plugin.properties" => MergeStrategy.last
    case "log4j.properties" => MergeStrategy.last
    case x => old(x)
  }
}
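In newer sbt-assembly releases the same idea is expressed against the assemblyMergeStrategy key instead of the deprecated <<= syntax; a partial sketch of mine (only the first case shown):

assemblyMergeStrategy in assembly := {
  case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}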