Framed Data
 
02b_data_load
20160601

Load the data from a csv file

Load data, from a CSV file containing city-names, country-code, the location (latitude, longitude), elevation and population, as provided by geonames.org: download.geonames.org/export/dump

  • the dataset cities1000.txt can be downloaded in a zipfile from geonames.org.
  • this file contains one line for every city with a population greater than 1000. For more information see 'geoname' table stub on above link
  • fields are separated by tabs
  • some fields will be ignored

Startup the Spark shell, and load the data file, into an RDD[String]:

$ spark-shell

var tx=sc.textFile("file:///home/dmn/city_data/cities1000.txt")  

tx.count()
Long = 145725

Define a case class, and a parse function :

case class City(
        geonameid: Int, 
        name: String, 
        asciiname: String, 
        latitude: Double, longitude: Double, 
        country: String, 
        population: Int, 
        elevation: Int) 

def parse(line: String) = { 
  val spl=line.split("\t") 
  val geonameid=spl(0).toInt
  val name=spl(1)
  val asciiname=spl(2)
  val latitude=spl(4).toDouble
  val longitude=spl(5).toDouble
  val country=spl(8)
  val population=spl(14).toInt
  val elevation=spl(16).toInt
  City(geonameid, name, asciiname, latitude, longitude, country, population, elevation)
}

Try and parse 1 line:

parse(tx.take(1)(0))
City = City(3039154,El Tarter,El Tarter,42.57952,1.65362,AD,1052,1721)

Success! Now let's parse the complete text file into City records:

var ct=tx.map(parse(_))

Check:

ct.count
Long = 145725

Spot-check: list all cities above 3500m and having a population of more than 100000, ordered by descending elevation:

var chk=ct.filter( rec => ( rec.elevation>3500) && (rec.population>100000)).collect()
chk.sortWith( (x,y) => (x.elevation>y.elevation) ).foreach(println)

City(3907584,Potosí,Potosi,-19.58361,-65.75306,BO,141251,3967)
City(3909234,Oruro,Oruro,-17.98333,-67.15,BO,208684,3936)
City(3937513,Juliaca,Juliaca,-15.5,-70.13333,PE,245675,3834)
City(3931276,Puno,Puno,-15.8422,-70.0199,PE,116552,3825)
City(3911925,La Paz,La Paz,-16.5,-68.15,BO,812799,3782)
City(1280737,Lhasa,Lhasa,29.65,91.1,CN,118721,3651)

Sidenote: a case class has a restricted number of parameters, if there are too many, you'll get this error:

Implementation restriction: case classes cannot have more than 22 parameters.

Startup the Spark shell, and load the data file, into an RDD[String]:

$ spark-shell

var tx=sc.textFile("file:///home/dmn/city_data/cities1000.txt")  

tx.count()
Long = 145725

Import some needed types:

import org.apache.spark.sql.types.{StructType,StructField,StringType,LongType,DoubleType};
import org.apache.spark.sql.Row;

Create the SQL context:

val sqlctx= new org.apache.spark.sql.SQLContext(sc)

Define the schema:

val schema = StructType(Array(
    StructField("geonameid",LongType,false),
    StructField("name",StringType,true),
    StructField("asciiname",StringType,true),
    StructField("latitude",DoubleType,true),
    StructField("longitude",DoubleType,true),
    StructField("country",StringType,true),
    StructField("population",LongType,true),
    StructField("elevation",LongType,true)
    ) )

Turn the text-line RDD into a row RDD:

val rowrdd = tx.map(_.split("\t")).map(p =>
        Row(p(0).toLong,p(1), p(2),
            p(4).toDouble,p(5).toDouble,
            p(8),p(14).toLong,p(16).toLong) )

Create the dataframe:

val city_df=sqlctx.createDataFrame(rowrdd,schema)

Spotcheck:

city_df.filter(" elevation>3500 and population>100000 ").
        orderBy(desc("elevation")).
        collect().
        foreach(println)

[3907584,Potosí,Potosi,-19.58361,-65.75306,BO,141251,3967]
[3909234,Oruro,Oruro,-17.98333,-67.15,BO,208684,3936]
[3937513,Juliaca,Juliaca,-15.5,-70.13333,PE,245675,3834]
[3931276,Puno,Puno,-15.8422,-70.0199,PE,116552,3825]
[3911925,La Paz,La Paz,-16.5,-68.15,BO,812799,3782]
[1280737,Lhasa,Lhasa,29.65,91.1,CN,118721,3651]

For future use

Save the dataframe as a parquet file ..

city_df.write.parquet("hdfs:///user/dmn/cities/city_parquet") 

.. for easy retrieval in the future:

val city_df= sqlctx.read.parquet("hdfs:///user/dmn/cities/city_parquet") 

See tab Spark Dataframe, on how to create the dataframe city_df :

Then register the table:

city_df.registerTempTable("city")

And Bob's your uncle.

Spotcheck:

sqlctx.sql("select * from city where elevation>3500 and population>100000 order by elevation desc").
       collect().foreach(println)

[3907584,Potosí,Potosi,-19.58361,-65.75306,BO,141251,3967]
[3909234,Oruro,Oruro,-17.98333,-67.15,BO,208684,3936]
[3937513,Juliaca,Juliaca,-15.5,-70.13333,PE,245675,3834]
[3931276,Puno,Puno,-15.8422,-70.0199,PE,116552,3825]
[3911925,La Paz,La Paz,-16.5,-68.15,BO,812799,3782]
[1280737,Lhasa,Lhasa,29.65,91.1,CN,118721,3651]

Note: if you run above without the collect() then the ordering may be incorrect.

R:

df<-read.table("/home/dmn/city_data/cities1000.txt",sep="\t"
                 ,quote="",stringsAsFactors=F, na.strings = "",
                 )[,c(1, 2, 3, 5, 6, 9, 15, 17)]
names(df)<-c("geonameid","name","asciiname","latitude","longitude",
             "country","population","elevation")

options(width=200)

Note: if you don't set na.strings = "" in this case, then all Namibia (code 'NA') records are flagged as having NA for country. Check: table(is.na(df$country)) should give all F's and no T's.

Dimensions:

dim(df)
145725      8

Spotcheck: cities above 3500m and having a population of more than 100000:

tf=df[(df$elevation>3500)&(df$population>100000),]
tf[order(-tf$elevation),] 

       geonameid    name asciiname  latitude longitude country population elevation
7015     3907584  Potosí    Potosi -19.58361 -65.75306      BO     141251      3967
7026     3909234   Oruro     Oruro -17.98333 -67.15000      BO     208684      3936
101370   3937513 Juliaca   Juliaca -15.50000 -70.13333      PE     245675      3834
101162   3931276    Puno      Puno -15.84220 -70.01990      PE     116552      3825
7044     3911925  La Paz    La Paz -16.50000 -68.15000      BO     812799      3782
12297    1280737   Lhasa     Lhasa  29.65000  91.10000      CN     118721      3651

Read tab-separated file into a data.table

Read:

library(data.table)
dt<-fread("/home/dmn/city_data/cities1000.txt", header=FALSE,
                na.strings = "",
                select=c(1, 2, 3, 5, 6, 9, 15, 17) )
setnames(dt,c("geonameid","name","asciiname","latitude","longitude",
              "country","population","elevation"))

options(width=200)

Spotcheck: cities above 3500m and having a population of more than 100000:

dt[(dt$elevation>3500)&(dt$population>100000),][order(-elevation),]

   geonameid    name asciiname  latitude longitude country population elevation
1:   3907584  Potosí    Potosi -19.58361 -65.75306      BO     141251      3967
2:   3909234   Oruro     Oruro -17.98333 -67.15000      BO     208684      3936
3:   3937513 Juliaca   Juliaca -15.50000 -70.13333      PE     245675      3834
4:   3931276    Puno      Puno -15.84220 -70.01990      PE     116552      3825
5:   3911925  La Paz    La Paz -16.50000 -68.15000      BO     812799      3782
6:   1280737   Lhasa     Lhasa  29.65000  91.10000      CN     118721      3651

See also: http://pandas.pydata.org/pandas-docs/stable/io.html :

import pandas as pd
import csv   

colnames= [ "geonameid","name","asciiname","latitude","longitude",
            "country","population","elevation" ]

df=pd.io.parsers.read_table("/home/dmn/city_data/cities1000.txt",
                sep="\t", header=None, names= colnames,
                quoting=csv.QUOTE_NONE,usecols=[ 0, 1, 2, 4, 5, 8, 14, 16])

pd.set_option('display.width', 200)

Number of records:

len(df)
145725

Spotcheck: cities above 3500m and having a population of more than 100000:

df[(df.elevation>3500) & (df.population>100000)].sort("elevation",ascending=False)

        geonameid     name asciiname  latitude  longitude country  population  elevation
7014      3907584   Potosí    Potosi -19.58361  -65.75306      BO      141251       3967
7025      3909234    Oruro     Oruro -17.98333  -67.15000      BO      208684       3936
100330    3937513  Juliaca   Juliaca -15.50000  -70.13333      PE      245675       3834
100122    3931276     Puno      Puno -15.84220  -70.01990      PE      116552       3825
7043      3911925   La Paz    La Paz -16.50000  -68.15000      BO      812799       3782
12296     1280737    Lhasa     Lhasa  29.65000   91.10000      CN      118721       3651

Preprocessing

The data will be loaded into a Postgres table using the copy command. With this command, it's not possible to only load a subset of columns, therefore we are using R to preprocess the file.

Startup R, read selected columns of the input, then copy the data out into file "cities.csv" :

df<-read.table("/home/dmn/city_data/cities1000.txt", sep="\t",
               quote="",stringsAsFactors=F)[,c(1, 2, 3, 5, 6, 9, 15, 17)]

# corrections for city names with a double quote in their name:
df[df$V1==1682560,c("V2","V3")]=c("T-boli","T-boli")
df[df$V1==688022,c("V2","V3")]=c("Yur-yivka","Yur-yivka")

write.table(df,"cities.csv", sep="\t",row.names=F,col.names=F,na="",quote=F) 

Sidenote: create a postgres database user

On the linux CLI :

sudo  -u postgres psql template1
template1=# create user dmn encrypted password 'dmn';
template1=# create DATABASE dmn WITH TEMPLATE = template0 ENCODING = 'UTF8' owner dmn;
template1=# \q

Startup psql :

psql -h localhost -U dmn

Load into postgres

Startup psql, and create the table:

create table t_city (
     geonameid int  primary key
    ,name      varchar(128) 
    ,asciiname varchar(128) 
    ,latitude  numeric 
    ,longitude  numeric
    ,country   varchar(5) 
    ,population int
    ,elevation int
);

Load the data:

 \copy t_city from 'cities.csv' with DELIMITER E'\t' CSV

Count:

select count(1) from t_city 

count  
--------
 145725

Spotcheck: cities above 3500m and having a population of more than 100000:

select * 
from t_city 
where elevation>3500 and population>100000
order by elevation desc;

 geonameid |  name   | asciiname | latitude  | longitude | country | population | elevation 
-----------+---------+-----------+-----------+-----------+---------+------------+-----------
   3907584 | Potosí  | Potosi    | -19.58361 | -65.75306 | BO      |     141251 |      3967
   3909234 | Oruro   | Oruro     | -17.98333 |    -67.15 | BO      |     208684 |      3936
   3937513 | Juliaca | Juliaca   |     -15.5 | -70.13333 | PE      |     245675 |      3834
   3931276 | Puno    | Puno      |  -15.8422 |  -70.0199 | PE      |     116552 |      3825
   3911925 | La Paz  | La Paz    |     -16.5 |    -68.15 | BO      |     812799 |      3782
   1280737 | Lhasa   | Lhasa     |     29.65 |      91.1 | CN      |     118721 |      3651

That concludes the loading!

 
Notes by Data Munging Ninja. Generated on nini:sync/20151223_datamungingninja/frameddata at 2016-10-18 07:18