Load the data from a CSV file
Load the data from a CSV file containing city name, country code, location (latitude, longitude), elevation and population, as provided by geonames.org: download.geonames.org/export/dump
  
- the dataset cities1000.txt can be downloaded as a zipfile from geonames.org.

- this file contains one line for every city with a population greater than 1000. For more information, see the 'geoname' table stub at the above link.

- fields are separated by tabs

- some fields will be ignored
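To make the field positions concrete, here is a minimal Python sketch that picks the wanted fields out of one tab-separated line. The sample line is abridged and constructed for illustration; the indices match the ones used in the parsers below:

```python
# Pick the wanted fields out of one tab-separated geonames line.
# Abridged sample; field positions as used in this document:
# 0 geonameid, 1 name, 2 asciiname, 4 latitude, 5 longitude,
# 8 country code, 14 population, 16 elevation.
sample = "\t".join([
    "3907584", "Potosí", "Potosi", "",
    "-19.58361", "-65.75306", "P", "PPLA",
    "BO", "", "", "", "", "",
    "141251", "", "3967", "", ""
])

fields = sample.split("\t")
city = {
    "geonameid":  int(fields[0]),
    "name":       fields[1],
    "asciiname":  fields[2],
    "latitude":   float(fields[4]),
    "longitude":  float(fields[5]),
    "country":    fields[8],
    "population": int(fields[14]),
    "elevation":  int(fields[16]),
}
print(city["name"], city["elevation"])   # Potosí 3967
```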
 
 
Start up the Spark shell, and load the data file into an RDD[String]:
$ spark-shell
var tx=sc.textFile("file:///home/dmn/city_data/cities1000.txt")  
tx.count()
Long = 145725
 
Define a case class and a parse function:
case class City(
        geonameid: Int, 
        name: String, 
        asciiname: String, 
        latitude: Double, longitude: Double, 
        country: String, 
        population: Int, 
        elevation: Int) 
def parse(line: String) = { 
  val spl=line.split("\t") 
  val geonameid=spl(0).toInt
  val name=spl(1)
  val asciiname=spl(2)
  val latitude=spl(4).toDouble
  val longitude=spl(5).toDouble
  val country=spl(8)
  val population=spl(14).toInt
  val elevation=spl(16).toInt
  City(geonameid, name, asciiname, latitude, longitude, country, population, elevation)
}
 
Try to parse one line:
parse(tx.take(1)(0))
City = City(3039154,El Tarter,El Tarter,42.57952,1.65362,AD,1052,1721)
 
Success! Now let's parse the complete text file into City records: 
var ct=tx.map(parse(_))
 
Check: 
ct.count
Long = 145725
 
Spot-check: list all cities above 3500m and having a population of more than 100000, ordered by descending elevation: 
var chk=ct.filter( rec => ( rec.elevation>3500) && (rec.population>100000)).collect()
chk.sortWith( (x,y) => (x.elevation>y.elevation) ).foreach(println)
City(3907584,Potosí,Potosi,-19.58361,-65.75306,BO,141251,3967)
City(3909234,Oruro,Oruro,-17.98333,-67.15,BO,208684,3936)
City(3937513,Juliaca,Juliaca,-15.5,-70.13333,PE,245675,3834)
City(3931276,Puno,Puno,-15.8422,-70.0199,PE,116552,3825)
City(3911925,La Paz,La Paz,-16.5,-68.15,BO,812799,3782)
City(1280737,Lhasa,Lhasa,29.65,91.1,CN,118721,3651)
 
Sidenote: a case class has a restricted number of parameters; if there are too many, you'll get this error:
Implementation restriction: case classes cannot have more than 22 parameters.
 
Start up the Spark shell, and load the data file into an RDD[String]:
$ spark-shell
var tx=sc.textFile("file:///home/dmn/city_data/cities1000.txt")  
tx.count()
Long = 145725
 
Import some needed types: 
import org.apache.spark.sql.types.{StructType,StructField,StringType,LongType,DoubleType};
import org.apache.spark.sql.Row;
 
Create the SQL context: 
val sqlctx= new org.apache.spark.sql.SQLContext(sc)
 
Define the schema: 
val schema = StructType(Array(
    StructField("geonameid",LongType,false),
    StructField("name",StringType,true),
    StructField("asciiname",StringType,true),
    StructField("latitude",DoubleType,true),
    StructField("longitude",DoubleType,true),
    StructField("country",StringType,true),
    StructField("population",LongType,true),
    StructField("elevation",LongType,true)
    ) )
 
Turn the text-line RDD into a row RDD: 
val rowrdd = tx.map(_.split("\t")).map(p =>
        Row(p(0).toLong,p(1), p(2),
            p(4).toDouble,p(5).toDouble,
            p(8),p(14).toLong,p(16).toLong) )
 
Create the dataframe: 
val city_df=sqlctx.createDataFrame(rowrdd,schema)
 
Spotcheck: 
city_df.filter(" elevation>3500 and population>100000 ").
        orderBy(desc("elevation")).
        collect().
        foreach(println)
[3907584,Potosí,Potosi,-19.58361,-65.75306,BO,141251,3967]
[3909234,Oruro,Oruro,-17.98333,-67.15,BO,208684,3936]
[3937513,Juliaca,Juliaca,-15.5,-70.13333,PE,245675,3834]
[3931276,Puno,Puno,-15.8422,-70.0199,PE,116552,3825]
[3911925,La Paz,La Paz,-16.5,-68.15,BO,812799,3782]
[1280737,Lhasa,Lhasa,29.65,91.1,CN,118721,3651]
 
For future use
Save the dataframe as a parquet file .. 
city_df.write.parquet("hdfs:///user/dmn/cities/city_parquet") 
 
.. for easy retrieval in the future: 
val city_df= sqlctx.read.parquet("hdfs:///user/dmn/cities/city_parquet") 
 
See the tab Spark Dataframe on how to create the dataframe city_df.
Then register the table:
city_df.registerTempTable("city")
 
And Bob's your uncle. 
Spotcheck: 
sqlctx.sql("select * from city where elevation>3500 and population>100000 order by elevation desc").
       collect().foreach(println)
[3907584,Potosí,Potosi,-19.58361,-65.75306,BO,141251,3967]
[3909234,Oruro,Oruro,-17.98333,-67.15,BO,208684,3936]
[3937513,Juliaca,Juliaca,-15.5,-70.13333,PE,245675,3834]
[3931276,Puno,Puno,-15.8422,-70.0199,PE,116552,3825]
[3911925,La Paz,La Paz,-16.5,-68.15,BO,812799,3782]
[1280737,Lhasa,Lhasa,29.65,91.1,CN,118721,3651]
 
Note: if you run the above without the collect(), the ordering may be incorrect.
R: 
df<-read.table("/home/dmn/city_data/cities1000.txt",sep="\t"
                 ,quote="",stringsAsFactors=F, na.strings = "",
                 )[,c(1, 2, 3, 5, 6, 9, 15, 17)]
names(df)<-c("geonameid","name","asciiname","latitude","longitude",
             "country","population","elevation")
options(width=200)
 
Note: if you don't set na.strings = "" in this case, then all Namibia (country code 'NA') records are flagged as having NA for country. Check: table(is.na(df$country)) should show only FALSE, no TRUE.
Dimensions: 
dim(df)
145725      8
 
Spotcheck: cities above 3500m and having a population of more than 100000: 
tf=df[(df$elevation>3500)&(df$population>100000),]
tf[order(-tf$elevation),] 
       geonameid    name asciiname  latitude longitude country population elevation
7015     3907584  Potosí    Potosi -19.58361 -65.75306      BO     141251      3967
7026     3909234   Oruro     Oruro -17.98333 -67.15000      BO     208684      3936
101370   3937513 Juliaca   Juliaca -15.50000 -70.13333      PE     245675      3834
101162   3931276    Puno      Puno -15.84220 -70.01990      PE     116552      3825
7044     3911925  La Paz    La Paz -16.50000 -68.15000      BO     812799      3782
12297    1280737   Lhasa     Lhasa  29.65000  91.10000      CN     118721      3651
 
Read the tab-separated file into a data.table
Read: 
library(data.table)
dt<-fread("/home/dmn/city_data/cities1000.txt", header=FALSE,
                na.strings = "",
                select=c(1, 2, 3, 5, 6, 9, 15, 17) )
setnames(dt,c("geonameid","name","asciiname","latitude","longitude",
              "country","population","elevation"))
options(width=200)
 
Spotcheck: cities above 3500m and having a population of more than 100000: 
dt[(dt$elevation>3500)&(dt$population>100000),][order(-elevation),]
   geonameid    name asciiname  latitude longitude country population elevation
1:   3907584  Potosí    Potosi -19.58361 -65.75306      BO     141251      3967
2:   3909234   Oruro     Oruro -17.98333 -67.15000      BO     208684      3936
3:   3937513 Juliaca   Juliaca -15.50000 -70.13333      PE     245675      3834
4:   3931276    Puno      Puno -15.84220 -70.01990      PE     116552      3825
5:   3911925  La Paz    La Paz -16.50000 -68.15000      BO     812799      3782
6:   1280737   Lhasa     Lhasa  29.65000  91.10000      CN     118721      3651
 
See also: http://pandas.pydata.org/pandas-docs/stable/io.html : 
import pandas as pd
import csv   
colnames= [ "geonameid","name","asciiname","latitude","longitude",
            "country","population","elevation" ]
df=pd.read_csv("/home/dmn/city_data/cities1000.txt",
               sep="\t", header=None, names=colnames,
               quoting=csv.QUOTE_NONE, usecols=[0, 1, 2, 4, 5, 8, 14, 16],
               # keep Namibia's country code 'NA' instead of treating it as missing:
               keep_default_na=False, na_values=[""])
pd.set_option('display.width', 200)
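Sidenote, analogous to the na.strings remark in the R section: by default pandas treats the literal string "NA" (Namibia's country code) as a missing value. A minimal, self-contained demonstration of the pitfall and the fix, using a two-row inline sample:

```python
import io
import pandas as pd

# Two sample rows: Windhoek (country code NA = Namibia) and Lhasa (CN).
sample = "Windhoek\tNA\nLhasa\tCN\n"

# Default behaviour: the string "NA" is read as missing (NaN).
naive = pd.read_csv(io.StringIO(sample), sep="\t", header=None,
                    names=["name", "country"])
print(naive["country"].isna().tolist())   # [True, False]

# keep_default_na=False (plus na_values=[""] for genuinely empty fields)
# preserves Namibia's country code.
safe = pd.read_csv(io.StringIO(sample), sep="\t", header=None,
                   names=["name", "country"],
                   keep_default_na=False, na_values=[""])
print(safe["country"].tolist())           # ['NA', 'CN']
```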
 
Number of records: 
len(df)
145725
 
Spotcheck: cities above 3500m and having a population of more than 100000: 
df[(df.elevation>3500) & (df.population>100000)].sort_values("elevation", ascending=False)
        geonameid     name asciiname  latitude  longitude country  population  elevation
7014      3907584   Potosí    Potosi -19.58361  -65.75306      BO      141251       3967
7025      3909234    Oruro     Oruro -17.98333  -67.15000      BO      208684       3936
100330    3937513  Juliaca   Juliaca -15.50000  -70.13333      PE      245675       3834
100122    3931276     Puno      Puno -15.84220  -70.01990      PE      116552       3825
7043      3911925   La Paz    La Paz -16.50000  -68.15000      BO      812799       3782
12296     1280737    Lhasa     Lhasa  29.65000   91.10000      CN      118721       3651
 
Preprocessing
The data will be loaded into a Postgres table using the copy command. copy cannot skip columns in the input file, so we use R to preprocess the file and keep only the wanted columns.
Start R, read the selected columns of the input, then write the data out to the file "cities.csv":
df<-read.table("/home/dmn/city_data/cities1000.txt", sep="\t",
               quote="",stringsAsFactors=F)[,c(1, 2, 3, 5, 6, 9, 15, 17)]
# corrections for city names with a double quote in their name:
df[df$V1==1682560,c("V2","V3")]=c("T-boli","T-boli")
df[df$V1==688022,c("V2","V3")]=c("Yur-yivka","Yur-yivka")
write.table(df,"cities.csv", sep="\t",row.names=F,col.names=F,na="",quote=F) 
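The same preprocessing step can be sketched in Python with pandas. This is a hedged alternative, not the method used above; the sample input file written here (cities1000_sample.txt) is a tiny stand-in for the real dump, and the same caveats about quoting and the 'NA' country code apply:

```python
import csv
import pandas as pd

# Stand-in for the real cities1000.txt: two abridged geonames lines,
# 19 tab-separated fields each (hypothetical sample data).
lines = [
    "\t".join(["3907584", "Potosí", "Potosi", "", "-19.58361", "-65.75306",
               "P", "PPLA", "BO", "", "", "", "", "", "141251", "", "3967", "", ""]),
    "\t".join(["1280737", "Lhasa", "Lhasa", "", "29.65", "91.1",
               "P", "PPLA", "CN", "", "", "", "", "", "118721", "", "3651", "", ""]),
]
with open("cities1000_sample.txt", "w", encoding="utf-8") as f:
    f.write("\n".join(lines) + "\n")

# Read only the wanted columns; keep "NA" (Namibia) as a literal
# country code rather than a missing value.
df = pd.read_csv("cities1000_sample.txt", sep="\t", header=None,
                 usecols=[0, 1, 2, 4, 5, 8, 14, 16],
                 quoting=csv.QUOTE_NONE,
                 keep_default_na=False, na_values=[""])

# Write tab-separated, without header or index, empty string for
# missing values: the shape Postgres' copy command expects.
df.to_csv("cities.csv", sep="\t", header=False, index=False)
```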
 
Sidenote: create a Postgres database user
On the Linux CLI:
sudo  -u postgres psql template1
template1=# create user dmn encrypted password 'dmn';
template1=# create DATABASE dmn WITH TEMPLATE = template0 ENCODING = 'UTF8' owner dmn;
template1=# \q
 
Start up psql:
psql -h localhost -U dmn
 
Load into postgres
Start up psql and create the table:
create table t_city (
     geonameid int  primary key
    ,name      varchar(128) 
    ,asciiname varchar(128) 
    ,latitude  numeric 
    ,longitude  numeric
    ,country   varchar(5) 
    ,population int
    ,elevation int
);
 
Load the data: 
 \copy t_city from 'cities.csv' with DELIMITER E'\t' CSV
 
Count: 
select count(1) from t_city;
count  
--------
 145725
 
Spotcheck: cities above 3500m and having a population of more than 100000: 
select * 
from t_city 
where elevation>3500 and population>100000
order by elevation desc;
 geonameid |  name   | asciiname | latitude  | longitude | country | population | elevation 
-----------+---------+-----------+-----------+-----------+---------+------------+-----------
   3907584 | Potosí  | Potosi    | -19.58361 | -65.75306 | BO      |     141251 |      3967
   3909234 | Oruro   | Oruro     | -17.98333 |    -67.15 | BO      |     208684 |      3936
   3937513 | Juliaca | Juliaca   |     -15.5 | -70.13333 | PE      |     245675 |      3834
   3931276 | Puno    | Puno      |  -15.8422 |  -70.0199 | PE      |     116552 |      3825
   3911925 | La Paz  | La Paz    |     -16.5 |    -68.15 | BO      |     812799 |      3782
   1280737 | Lhasa   | Lhasa     |     29.65 |      91.1 | CN      |     118721 |      3651
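To experiment with the same schema and spot-check query without a Postgres server, SQLite (bundled with Python) works as a stand-in; the SQL below mirrors the table and query above, over a few rows copied from the spot-checks:

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("""
    create table t_city (
         geonameid  int primary key
        ,name       varchar(128)
        ,asciiname  varchar(128)
        ,latitude   numeric
        ,longitude  numeric
        ,country    varchar(5)
        ,population int
        ,elevation  int
    )""")

# A few of the rows from the spot-checks above, plus one low-lying city.
rows = [
    (3907584, "Potosí", "Potosi", -19.58361, -65.75306, "BO", 141251, 3967),
    (3911925, "La Paz", "La Paz", -16.5, -68.15, "BO", 812799, 3782),
    (1280737, "Lhasa", "Lhasa", 29.65, 91.1, "CN", 118721, 3651),
    (2988507, "Paris", "Paris", 48.85341, 2.3488, "FR", 2138551, 42),
]
con.executemany("insert into t_city values (?,?,?,?,?,?,?,?)", rows)

# Same spot-check: above 3500m, population over 100000, highest first.
for rec in con.execute("""select name, elevation from t_city
                          where elevation>3500 and population>100000
                          order by elevation desc"""):
    print(rec)
```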
 
That concludes the loading! 
  