Load the data from a CSV file
Load data from a CSV file containing city names, country codes, locations (latitude, longitude), elevation and population, as provided by geonames.org: download.geonames.org/export/dump
- the dataset cities1000.txt can be downloaded as a zip file from geonames.org.
- the file contains one line for every city with a population greater than 1000. For more information see the 'geoname' table stub at the link above.
- fields are separated by tabs
- some fields will be ignored
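For reference, the tab-separated layout and the field positions used on this page (0, 1, 2, 4, 5, 8, 14, 16, counting from 0) can be checked with a few lines of plain Python; the sample values are those of the first record of the file:

```python
# Minimal sketch: pick the fields used on this page out of one
# tab-separated geonames line (19 columns in the full format).
# Only the columns we actually read are filled in here.
fields = [""] * 19
fields[0]  = "3039154"      # geonameid
fields[1]  = "El Tarter"    # name
fields[2]  = "El Tarter"    # asciiname
fields[4]  = "42.57952"     # latitude
fields[5]  = "1.65362"      # longitude
fields[8]  = "AD"           # country code
fields[14] = "1052"         # population
fields[16] = "1721"         # elevation
line = "\t".join(fields)

spl = line.split("\t")
city = (int(spl[0]), spl[1], spl[2],
        float(spl[4]), float(spl[5]),
        spl[8], int(spl[14]), int(spl[16]))
print(city)
# → (3039154, 'El Tarter', 'El Tarter', 42.57952, 1.65362, 'AD', 1052, 1721)
```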
Start up the Spark shell, and load the data file into an RDD[String]:
$ spark-shell
val tx = sc.textFile("file:///home/dmn/city_data/cities1000.txt")
tx.count()
Long = 145725
Define a case class and a parse function:
case class City(
  geonameid: Int,
  name: String,
  asciiname: String,
  latitude: Double, longitude: Double,
  country: String,
  population: Int,
  elevation: Int)

def parse(line: String) = {
  val spl = line.split("\t")
  val geonameid = spl(0).toInt
  val name = spl(1)
  val asciiname = spl(2)
  val latitude = spl(4).toDouble
  val longitude = spl(5).toDouble
  val country = spl(8)
  val population = spl(14).toInt
  val elevation = spl(16).toInt
  City(geonameid, name, asciiname, latitude, longitude, country, population, elevation)
}
Try to parse one line:
parse(tx.take(1)(0))
City = City(3039154,El Tarter,El Tarter,42.57952,1.65362,AD,1052,1721)
Success! Now let's parse the complete text file into City records:
val ct = tx.map(parse)
Check:
ct.count
Long = 145725
Spotcheck: list all cities above 3500m with a population of more than 100000, ordered by descending elevation:
val chk = ct.filter(rec => rec.elevation > 3500 && rec.population > 100000).collect()
chk.sortWith((x, y) => x.elevation > y.elevation).foreach(println)
City(3907584,Potosí,Potosi,-19.58361,-65.75306,BO,141251,3967)
City(3909234,Oruro,Oruro,-17.98333,-67.15,BO,208684,3936)
City(3937513,Juliaca,Juliaca,-15.5,-70.13333,PE,245675,3834)
City(3931276,Puno,Puno,-15.8422,-70.0199,PE,116552,3825)
City(3911925,La Paz,La Paz,-16.5,-68.15,BO,812799,3782)
City(1280737,Lhasa,Lhasa,29.65,91.1,CN,118721,3651)
Sidenote: a case class can have only a limited number of parameters; with too many, you'll get this error:
Implementation restriction: case classes cannot have more than 22 parameters.
Spark DataFrame
Start up the Spark shell, and load the data file into an RDD[String]:
$ spark-shell
val tx = sc.textFile("file:///home/dmn/city_data/cities1000.txt")
tx.count()
Long = 145725
Import some needed types:
import org.apache.spark.sql.types.{StructType,StructField,StringType,LongType,DoubleType};
import org.apache.spark.sql.Row;
Create the SQL context:
val sqlctx= new org.apache.spark.sql.SQLContext(sc)
Define the schema:
val schema = StructType(Array(
  StructField("geonameid", LongType, false),
  StructField("name", StringType, true),
  StructField("asciiname", StringType, true),
  StructField("latitude", DoubleType, true),
  StructField("longitude", DoubleType, true),
  StructField("country", StringType, true),
  StructField("population", LongType, true),
  StructField("elevation", LongType, true)
))
Turn the text-line RDD into a row RDD:
val rowrdd = tx.map(_.split("\t")).map(p =>
  Row(p(0).toLong, p(1), p(2),
      p(4).toDouble, p(5).toDouble,
      p(8), p(14).toLong, p(16).toLong))
Create the dataframe:
val city_df=sqlctx.createDataFrame(rowrdd,schema)
Spotcheck:
import org.apache.spark.sql.functions.desc
city_df.filter("elevation > 3500 and population > 100000").
orderBy(desc("elevation")).
collect().
foreach(println)
[3907584,Potosí,Potosi,-19.58361,-65.75306,BO,141251,3967]
[3909234,Oruro,Oruro,-17.98333,-67.15,BO,208684,3936]
[3937513,Juliaca,Juliaca,-15.5,-70.13333,PE,245675,3834]
[3931276,Puno,Puno,-15.8422,-70.0199,PE,116552,3825]
[3911925,La Paz,La Paz,-16.5,-68.15,BO,812799,3782]
[1280737,Lhasa,Lhasa,29.65,91.1,CN,118721,3651]
For future use, save the dataframe as a parquet file ..
city_df.write.parquet("hdfs:///user/dmn/cities/city_parquet")
.. for easy retrieval in the future:
val city_df= sqlctx.read.parquet("hdfs:///user/dmn/cities/city_parquet")
Spark SQL
See the Spark DataFrame section above for how to create the dataframe city_df.
Then register the table:
city_df.registerTempTable("city")
And Bob's your uncle.
Spotcheck:
sqlctx.sql("select * from city where elevation>3500 and population>100000 order by elevation desc").
collect().foreach(println)
[3907584,Potosí,Potosi,-19.58361,-65.75306,BO,141251,3967]
[3909234,Oruro,Oruro,-17.98333,-67.15,BO,208684,3936]
[3937513,Juliaca,Juliaca,-15.5,-70.13333,PE,245675,3834]
[3931276,Puno,Puno,-15.8422,-70.0199,PE,116552,3825]
[3911925,La Paz,La Paz,-16.5,-68.15,BO,812799,3782]
[1280737,Lhasa,Lhasa,29.65,91.1,CN,118721,3651]
Note: if you run the above without the collect(), the ordering may be incorrect: foreach(println) then runs on the executors, and each partition prints its rows independently.
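A plain-Python illustration (not Spark; the two-way split below is a hypothetical distribution of rows over partitions) of why output that is printed per partition loses the global order:

```python
# A globally sorted list of elevations (the spot-check results above) ...
elevations = [3967, 3936, 3834, 3825, 3782, 3651]

# ... hypothetically spread round-robin over two partitions:
partitions = [elevations[0::2], elevations[1::2]]

# Each partition prints independently; the concatenated output is no
# longer globally sorted:
printed = [e for part in partitions for e in part]
print(printed)  # [3967, 3834, 3782, 3936, 3825, 3651]
```

Collecting first brings all rows back to the driver, where the global order is preserved.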
Read the tab-separated file into an R data.frame:
df <- read.table("/home/dmn/city_data/cities1000.txt", sep="\t",
                 quote="", stringsAsFactors=FALSE,
                 na.strings="")[, c(1, 2, 3, 5, 6, 9, 15, 17)]
names(df)<-c("geonameid","name","asciiname","latitude","longitude",
"country","population","elevation")
options(width=200)
Note: if you don't set na.strings = "" here, all Namibia records (country code 'NA') are flagged as having NA for country. Check: table(is.na(df$country)) should give only FALSE and no TRUE.
Dimensions:
dim(df)
145725 8
Spotcheck: cities above 3500m with a population of more than 100000:
tf <- df[(df$elevation > 3500) & (df$population > 100000), ]
tf[order(-tf$elevation), ]
geonameid name asciiname latitude longitude country population elevation
7015 3907584 Potosí Potosi -19.58361 -65.75306 BO 141251 3967
7026 3909234 Oruro Oruro -17.98333 -67.15000 BO 208684 3936
101370 3937513 Juliaca Juliaca -15.50000 -70.13333 PE 245675 3834
101162 3931276 Puno Puno -15.84220 -70.01990 PE 116552 3825
7044 3911925 La Paz La Paz -16.50000 -68.15000 BO 812799 3782
12297 1280737 Lhasa Lhasa 29.65000 91.10000 CN 118721 3651
Read tab-separated file into a data.table
Read:
library(data.table)
dt<-fread("/home/dmn/city_data/cities1000.txt", header=FALSE,
na.strings = "",
select=c(1, 2, 3, 5, 6, 9, 15, 17) )
setnames(dt,c("geonameid","name","asciiname","latitude","longitude",
"country","population","elevation"))
options(width=200)
Spotcheck: cities above 3500m with a population of more than 100000:
dt[elevation > 3500 & population > 100000][order(-elevation)]
geonameid name asciiname latitude longitude country population elevation
1: 3907584 Potosí Potosi -19.58361 -65.75306 BO 141251 3967
2: 3909234 Oruro Oruro -17.98333 -67.15000 BO 208684 3936
3: 3937513 Juliaca Juliaca -15.50000 -70.13333 PE 245675 3834
4: 3931276 Puno Puno -15.84220 -70.01990 PE 116552 3825
5: 3911925 La Paz La Paz -16.50000 -68.15000 BO 812799 3782
6: 1280737 Lhasa Lhasa 29.65000 91.10000 CN 118721 3651
Pandas
See also: http://pandas.pydata.org/pandas-docs/stable/io.html
import pandas as pd
import csv
colnames = ["geonameid","name","asciiname","latitude","longitude",
            "country","population","elevation"]
# keep_default_na=False plus na_values=[""] keeps Namibia's country
# code 'NA' as a string (the same pitfall as noted in the R section)
df = pd.read_csv("/home/dmn/city_data/cities1000.txt",
                 sep="\t", header=None, names=colnames,
                 quoting=csv.QUOTE_NONE,
                 usecols=[0, 1, 2, 4, 5, 8, 14, 16],
                 keep_default_na=False, na_values=[""])
pd.set_option('display.width', 200)
Number of records:
len(df)
145725
Spotcheck: cities above 3500m with a population of more than 100000:
df[(df.elevation > 3500) & (df.population > 100000)].sort_values("elevation", ascending=False)
geonameid name asciiname latitude longitude country population elevation
7014 3907584 Potosí Potosi -19.58361 -65.75306 BO 141251 3967
7025 3909234 Oruro Oruro -17.98333 -67.15000 BO 208684 3936
100330 3937513 Juliaca Juliaca -15.50000 -70.13333 PE 245675 3834
100122 3931276 Puno Puno -15.84220 -70.01990 PE 116552 3825
7043 3911925 La Paz La Paz -16.50000 -68.15000 BO 812799 3782
12296 1280737 Lhasa Lhasa 29.65000 91.10000 CN 118721 3651
Preprocessing
The data will be loaded into a Postgres table using the copy command. With COPY it's not possible to load only a subset of the columns in the input file, therefore we use R to preprocess the file.
Start up R, read the selected columns of the input, then write the data out to the file "cities.csv":
# na.strings="" keeps Namibia's country code 'NA' (see the note in the R section)
df <- read.table("/home/dmn/city_data/cities1000.txt", sep="\t",
                 quote="", stringsAsFactors=FALSE,
                 na.strings="")[, c(1, 2, 3, 5, 6, 9, 15, 17)]
# corrections for city names with a double quote in their name:
df[df$V1==1682560, c("V2","V3")] <- c("T-boli","T-boli")
df[df$V1==688022, c("V2","V3")] <- c("Yur-yivka","Yur-yivka")
write.table(df, "cities.csv", sep="\t", row.names=FALSE, col.names=FALSE, na="", quote=FALSE)
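Alternatively, the column selection could be sketched in plain Python (a hypothetical equivalent of the read.table/write.table step, demonstrated on an in-memory sample line; for the real file, the StringIO objects would be replaced with open(...) handles):

```python
import io

# Keep only the eight columns used on this page:
# geonameid, name, asciiname, latitude, longitude, country, population, elevation
USED = [0, 1, 2, 4, 5, 8, 14, 16]

# One sample geonames line (19 tab-separated columns, unused ones empty).
sample_line = "\t".join(
    ["3039154", "El Tarter", "El Tarter", "", "42.57952", "1.65362",
     "", "", "AD", "", "", "", "", "", "1052", "", "1721", "", ""])

src = io.StringIO(sample_line + "\n")   # stands in for the input file
dst = io.StringIO()                     # stands in for cities.csv
for line in src:
    cols = line.rstrip("\n").split("\t")
    dst.write("\t".join(cols[i] for i in USED) + "\n")

print(dst.getvalue(), end="")
```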
Sidenote: create a Postgres database user
On the Linux command line:
sudo -u postgres psql template1
template1=# create user dmn encrypted password 'dmn';
template1=# create DATABASE dmn WITH TEMPLATE = template0 ENCODING = 'UTF8' owner dmn;
template1=# \q
Start up psql:
psql -h localhost -U dmn
Load into Postgres
Start up psql, and create the table:
create table t_city (
geonameid int primary key
,name varchar(128)
,asciiname varchar(128)
,latitude numeric
,longitude numeric
,country varchar(5)
,population int
,elevation int
);
Load the data:
\copy t_city from 'cities.csv' with DELIMITER E'\t' CSV
Count:
select count(1) from t_city;
count
--------
145725
Spotcheck: cities above 3500m and having a population of more than 100000:
select *
from t_city
where elevation>3500 and population>100000
order by elevation desc;
geonameid | name | asciiname | latitude | longitude | country | population | elevation
-----------+---------+-----------+-----------+-----------+---------+------------+-----------
3907584 | Potosí | Potosi | -19.58361 | -65.75306 | BO | 141251 | 3967
3909234 | Oruro | Oruro | -17.98333 | -67.15 | BO | 208684 | 3936
3937513 | Juliaca | Juliaca | -15.5 | -70.13333 | PE | 245675 | 3834
3931276 | Puno | Puno | -15.8422 | -70.0199 | PE | 116552 | 3825
3911925 | La Paz | La Paz | -16.5 | -68.15 | BO | 812799 | 3782
1280737 | Lhasa | Lhasa | 29.65 | 91.1 | CN | 118721 | 3651
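As a cross-check that needs no Postgres installation, the same query can be tried with Python's built-in sqlite3, seeded with the six result rows above:

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("""create table t_city (
    geonameid int primary key, name text, asciiname text,
    latitude real, longitude real, country text,
    population int, elevation int)""")

# The six qualifying rows from the spot-check above.
rows = [
    (3907584, "Potosí",  "Potosi",  -19.58361, -65.75306, "BO", 141251, 3967),
    (3909234, "Oruro",   "Oruro",   -17.98333, -67.15,    "BO", 208684, 3936),
    (3937513, "Juliaca", "Juliaca", -15.5,     -70.13333, "PE", 245675, 3834),
    (3931276, "Puno",    "Puno",    -15.8422,  -70.0199,  "PE", 116552, 3825),
    (3911925, "La Paz",  "La Paz",  -16.5,     -68.15,    "BO", 812799, 3782),
    (1280737, "Lhasa",   "Lhasa",   29.65,     91.1,      "CN", 118721, 3651),
]
con.executemany("insert into t_city values (?,?,?,?,?,?,?,?)", rows)

for row in con.execute("""select name, elevation from t_city
                          where elevation > 3500 and population > 100000
                          order by elevation desc"""):
    print(row)
```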
That concludes the loading!