Framed Data
 
01_intro
20160329

Intro

The intention of this article is to show how the same operations are applied to the same data on different platforms: SQL on Postgres, R, Python Pandas and Spark Scala.

02a_data_create
20160601

Create dataframe from arrays

Here's how to create dataframes udf and tdf (aka tables t_user and t_transaction) from simple arrays.

Create dataframe udf

In R, create a dataframe from arrays:

id_v=c(9000, 9001, 9002, 9003, 9004, 9005, 9006, 9007, 9008)
name_v=c('Gerd Abrahamsson', 'Hanna Andersson', 'August Bergsten',
      'Arvid Bohlin', 'Edvard Marklund', 'Ragnhild Brännström',
      'Börje Wallin', 'Otto Byström','Elise Dahlström')

udf=data.frame( uid=id_v, name=name_v)

Content of udf:

   uid                name
1 9000    Gerd Abrahamsson
2 9001     Hanna Andersson
3 9002     August Bergsten
4 9003        Arvid Bohlin
5 9004     Edvard Marklund
6 9005 Ragnhild Brännström
7 9006        Börje Wallin
8 9007        Otto Byström
9 9008     Elise Dahlström

Create dataframe tdf

xid_v <- c( 5000, 5001, 5002, 5003, 5004, 5005, 5006, 5007, 5008, 5009, 5010, 5011, 5012,
            5013, 5014, 5015, 5016, 5017, 5018, 5019, 5020)

uid_v <- c( 9008, 9003, 9003, 9007, 9004, 9007, 9002, 9008, 9005, 9008, 9006, 9008, 9008,
            9005, 9005, 9001, 9000, 9003, 9002, 9001, 9004)

amount_v <- c(498, 268, 621, -401, 720, -492, -153, 272, -250, 82, 549, -571, 814, -114,
              819, -404, -95, 428, -549, -462, -339)

date_v <- c('2016-02-21 06:28:49', '2016-01-17 13:37:38', '2016-02-24 15:36:53',
            '2016-01-14 16:43:27', '2016-05-14 16:29:54', '2016-02-24 23:58:57',
            '2016-02-18 17:58:33', '2016-05-26 12:00:00', '2016-02-24 23:14:52',
            '2016-04-20 18:33:25', '2016-02-16 14:37:25', '2016-02-28 13:05:33',
            '2016-03-20 13:29:11', '2016-02-06 14:55:10', '2016-01-18 10:50:20',
            '2016-02-20 22:08:23', '2016-05-09 10:26:05', '2016-03-27 15:30:47',
            '2016-04-15 21:44:49', '2016-03-09 20:32:35', '2016-05-03 17:11:21')

tdf<-data.frame( xid=xid_v, uid=uid_v, amount=amount_v, date=as.POSIXct(date_v) )

Content of tdf:

    xid  uid amount                date
1  5000 9008    498 2016-02-21 06:28:49
2  5001 9003    268 2016-01-17 13:37:38
3  5002 9003    621 2016-02-24 15:36:53
4  5003 9007   -401 2016-01-14 16:43:27
5  5004 9004    720 2016-05-14 16:29:54
6  5005 9007   -492 2016-02-24 23:58:57
7  5006 9002   -153 2016-02-18 17:58:33
8  5007 9008    272 2016-05-26 12:00:00
9  5008 9005   -250 2016-02-24 23:14:52
10 5009 9008     82 2016-04-20 18:33:25
11 5010 9006    549 2016-02-16 14:37:25
12 5011 9008   -571 2016-02-28 13:05:33
13 5012 9008    814 2016-03-20 13:29:11
14 5013 9005   -114 2016-02-06 14:55:10
15 5014 9005    819 2016-01-18 10:50:20
16 5015 9001   -404 2016-02-20 22:08:23
17 5016 9000    -95 2016-05-09 10:26:05
18 5017 9003    428 2016-03-27 15:30:47
19 5018 9002   -549 2016-04-15 21:44:49
20 5019 9001   -462 2016-03-09 20:32:35
21 5020 9004   -339 2016-05-03 17:11:21

Create dataframe udf

In Python Pandas, create a dataframe from arrays:

import pandas as pd

uid_v= [9000, 9001, 9002, 9003, 9004, 9005, 9006, 9007, 9008]
name_v=[u'Gerd Abrahamsson', u'Hanna Andersson', u'August Bergsten',
      u'Arvid Bohlin', u'Edvard Marklund', u'Ragnhild Br\xe4nnstr\xf6m',
      u'B\xf6rje Wallin', u'Otto Bystr\xf6m',u'Elise Dahlstr\xf6m']


udf=pd.DataFrame(uid_v, columns=['uid'])
udf['name']=name_v

Content of udf:

    uid                 name
0  9000     Gerd Abrahamsson
1  9001      Hanna Andersson
2  9002      August Bergsten
3  9003         Arvid Bohlin
4  9004      Edvard Marklund
5  9005  Ragnhild Brännström
6  9006         Börje Wallin
7  9007         Otto Byström
8  9008      Elise Dahlström

Create dataframe tdf

xid_v=[ 5000, 5001, 5002, 5003, 5004, 5005, 5006, 5007, 5008, 5009, 5010, 5011, 5012,
        5013, 5014, 5015, 5016, 5017, 5018, 5019, 5020]

uid_v=[ 9008, 9003, 9003, 9007, 9004, 9007, 9002, 9008, 9005, 9008, 9006, 9008, 9008,
        9005, 9005, 9001, 9000, 9003, 9002, 9001, 9004]

amount_v= [498, 268, 621, -401, 720, -492, -153, 272, -250, 82, 549, -571, 814, -114,
        819, -404, -95, 428, -549, -462, -339]

date_v=['2016-02-21 06:28:49', '2016-01-17 13:37:38', '2016-02-24 15:36:53',
        '2016-01-14 16:43:27', '2016-05-14 16:29:54', '2016-02-24 23:58:57',
        '2016-02-18 17:58:33', '2016-05-26 12:00:00', '2016-02-24 23:14:52',
        '2016-04-20 18:33:25', '2016-02-16 14:37:25', '2016-02-28 13:05:33',
        '2016-03-20 13:29:11', '2016-02-06 14:55:10', '2016-01-18 10:50:20',
        '2016-02-20 22:08:23', '2016-05-09 10:26:05', '2016-03-27 15:30:47',
        '2016-04-15 21:44:49', '2016-03-09 20:32:35', '2016-05-03 17:11:21']

tdf=pd.DataFrame(xid_v, columns=['xid'])
tdf['uid']=uid_v
tdf['amount']=amount_v
tdf['date']=pd.to_datetime(date_v)

Content of tdf:

     xid   uid  amount                date
0   5000  9008     498 2016-02-21 06:28:49
1   5001  9003     268 2016-01-17 13:37:38
2   5002  9003     621 2016-02-24 15:36:53
3   5003  9007    -401 2016-01-14 16:43:27
4   5004  9004     720 2016-05-14 16:29:54
5   5005  9007    -492 2016-02-24 23:58:57
6   5006  9002    -153 2016-02-18 17:58:33
7   5007  9008     272 2016-05-26 12:00:00
8   5008  9005    -250 2016-02-24 23:14:52
9   5009  9008      82 2016-04-20 18:33:25
10  5010  9006     549 2016-02-16 14:37:25
11  5011  9008    -571 2016-02-28 13:05:33
12  5012  9008     814 2016-03-20 13:29:11
13  5013  9005    -114 2016-02-06 14:55:10
14  5014  9005     819 2016-01-18 10:50:20
15  5015  9001    -404 2016-02-20 22:08:23
16  5016  9000     -95 2016-05-09 10:26:05
17  5017  9003     428 2016-03-27 15:30:47
18  5018  9002    -549 2016-04-15 21:44:49
19  5019  9001    -462 2016-03-09 20:32:35
20  5020  9004    -339 2016-05-03 17:11:21

Example: select data from a certain date range

from datetime import datetime
startdate=datetime.strptime('2016-02-01','%Y-%m-%d')
enddate=datetime.strptime('2016-02-29','%Y-%m-%d')
tdf[(tdf.date >= startdate) & (tdf.date <= enddate)]
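
An equivalent filter (a sketch): Series.between is inclusive on both ends, so this selects the same rows:

tdf[tdf.date.between(startdate, enddate)]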

The easiest way to generate the SQL statements is to use Python Pandas to export the dataframes to an SQLite database, like this:

import sqlite3
con=sqlite3.connect('db.sqlite') 
udf.to_sql(name='t_user', con=con, index=False)
tdf.to_sql(name='t_transaction', con=con, index=False)
con.close()

Then issue this command on the CLI:

sqlite3 db.sqlite .dump > create.sql 

.. to give you these SQL statements:

PRAGMA foreign_keys=OFF;
BEGIN TRANSACTION;
CREATE TABLE "t_user" (
"uid" INTEGER,
  "name" TEXT
);
INSERT INTO "t_user" VALUES(9000,'Gerd Abrahamsson');
INSERT INTO "t_user" VALUES(9001,'Hanna Andersson');
INSERT INTO "t_user" VALUES(9002,'August Bergsten');
INSERT INTO "t_user" VALUES(9003,'Arvid Bohlin');
INSERT INTO "t_user" VALUES(9004,'Edvard Marklund');
INSERT INTO "t_user" VALUES(9005,'Ragnhild Brännström');
INSERT INTO "t_user" VALUES(9006,'Börje Wallin');
INSERT INTO "t_user" VALUES(9007,'Otto Byström');
INSERT INTO "t_user" VALUES(9008,'Elise Dahlström');
CREATE TABLE "t_transaction" (
"xid" INTEGER,
  "uid" INTEGER,
  "amount" INTEGER,
  "date" TIMESTAMP
);
INSERT INTO "t_transaction" VALUES(5000,9008,498,'2016-02-21 06:28:49');
INSERT INTO "t_transaction" VALUES(5001,9003,268,'2016-01-17 13:37:38');
INSERT INTO "t_transaction" VALUES(5002,9003,621,'2016-02-24 15:36:53');
INSERT INTO "t_transaction" VALUES(5003,9007,-401,'2016-01-14 16:43:27');
INSERT INTO "t_transaction" VALUES(5004,9004,720,'2016-05-14 16:29:54');
INSERT INTO "t_transaction" VALUES(5005,9007,-492,'2016-02-24 23:58:57');
INSERT INTO "t_transaction" VALUES(5006,9002,-153,'2016-02-18 17:58:33');
INSERT INTO "t_transaction" VALUES(5007,9008,272,'2016-05-26 12:00:00');
INSERT INTO "t_transaction" VALUES(5008,9005,-250,'2016-02-24 23:14:52');
INSERT INTO "t_transaction" VALUES(5009,9008,82,'2016-04-20 18:33:25');
INSERT INTO "t_transaction" VALUES(5010,9006,549,'2016-02-16 14:37:25');
INSERT INTO "t_transaction" VALUES(5011,9008,-571,'2016-02-28 13:05:33');
INSERT INTO "t_transaction" VALUES(5012,9008,814,'2016-03-20 13:29:11');
INSERT INTO "t_transaction" VALUES(5013,9005,-114,'2016-02-06 14:55:10');
INSERT INTO "t_transaction" VALUES(5014,9005,819,'2016-01-18 10:50:20');
INSERT INTO "t_transaction" VALUES(5015,9001,-404,'2016-02-20 22:08:23');
INSERT INTO "t_transaction" VALUES(5016,9000,-95,'2016-05-09 10:26:05');
INSERT INTO "t_transaction" VALUES(5017,9003,428,'2016-03-27 15:30:47');
INSERT INTO "t_transaction" VALUES(5018,9002,-549,'2016-04-15 21:44:49');
INSERT INTO "t_transaction" VALUES(5019,9001,-462,'2016-03-09 20:32:35');
INSERT INTO "t_transaction" VALUES(5020,9004,-339,'2016-05-03 17:11:21');
COMMIT;

Add these index creation statements for completeness:

CREATE INDEX "ix_t_user_uid" ON "t_user" ("uid");
CREATE INDEX "ix_t_transaction_xid" ON "t_transaction" ("xid");

Spark is a bit different from the prior tabs: instead of creating the dataframe from columns, we create it from a sequence of rows.

Run the following in the Spark shell. Here sx is an SQLContext:

val sx= new org.apache.spark.sql.SQLContext(sc)

val udf_df=sx.createDataFrame(Seq(
    (9000,"Gerd Abrahamsson"),
    (9001,"Hanna Andersson"),
    (9002,"August Bergsten"),
    (9003,"Arvid Bohlin"),
    (9004,"Edvard Marklund"),
    (9005,"Ragnhild Brännström"),
    (9006,"Börje Wallin"),
    (9007,"Otto Byström"),
    (9008,"Elise Dahlström"))).toDF( "uid","name")



import java.sql.Timestamp
import org.apache.spark.sql.functions.udf

val date_format=new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val udfToDate=udf[Timestamp,String]{ (s) => 
        new java.sql.Timestamp(date_format.parse(s).getTime()) }



val tdf_df=sx.createDataFrame(Seq(  
    (5000,9008,498,"2016-02-21 06:28:49"),
    (5001,9003,268,"2016-01-17 13:37:38"),
    (5002,9003,621,"2016-02-24 15:36:53"),
    (5003,9007,-401,"2016-01-14 16:43:27"),
    (5004,9004,720,"2016-05-14 16:29:54"),
    (5005,9007,-492,"2016-02-24 23:58:57"),
    (5006,9002,-153,"2016-02-18 17:58:33"),
    (5007,9008,272,"2016-05-26 12:00:00"),
    (5008,9005,-250,"2016-02-24 23:14:52"),
    (5009,9008,82,"2016-04-20 18:33:25"),
    (5010,9006,549,"2016-02-16 14:37:25"),
    (5011,9008,-571,"2016-02-28 13:05:33"),
    (5012,9008,814,"2016-03-20 13:29:11"),
    (5013,9005,-114,"2016-02-06 14:55:10"),
    (5014,9005,819,"2016-01-18 10:50:20"),
    (5015,9001,-404,"2016-02-20 22:08:23"),
    (5016,9000,-95,"2016-05-09 10:26:05"),
    (5017,9003,428,"2016-03-27 15:30:47"),
    (5018,9002,-549,"2016-04-15 21:44:49"),
    (5019,9001,-462,"2016-03-09 20:32:35"),
    (5020,9004,-339,"2016-05-03 17:11:21"))
  ).toDF( "xid","uid","amount","date").
   .withColumn("date", udfToDate($"date"))

Et voila: the dataframes udf_df and tdf_df.
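
For comparison, the same row-wise construction is also possible in pandas, e.g. with DataFrame.from_records (a sketch; only the first rows are shown and udf_rows is just an illustrative name):

import pandas as pd

rows=[ (9000, u'Gerd Abrahamsson'),
       (9001, u'Hanna Andersson'),
       (9002, u'August Bergsten') ]   # ... remaining rows omitted
udf_rows=pd.DataFrame.from_records(rows, columns=['uid','name'])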

..

Sidenote: create fake data

The above user and transaction data was created using the following script, which employs the fake-factory package ('pip install fake-factory') to generate random data.

The python code: 

from faker import Factory
import pandas as pd
import random

fake = Factory.create('sv_SE')

# create user data
id_v=[]
name_v=[]
for i in range(0,9):
    id_v.append(9000+i)       # the user ids, referenced below as id_v
    name_v.append(fake.name())

# create transaction data
xid_v=[]
uid_v=[]
amount_v=[]
date_v=[]
sign=[-1,1]
for i in range(0,21):
    xid_v.append(5000+i)
    amount_v.append(sign[random.randint(0,1)]*random.randint(80,900))
    uid_v.append(id_v[random.randint(0,len(id_v)-1)])
    date_v.append(str(fake.date_time_this_year()))
02b_data_load
20160601

Load the data from a CSV file

Load data from a CSV file containing city names, country code, location (latitude, longitude), elevation and population, as provided by geonames.org: download.geonames.org/export/dump

  • the dataset cities1000.txt can be downloaded in a zipfile from geonames.org.
  • this file contains one line for every city with a population greater than 1000; for more information, see the 'geoname' table stub at the above link
  • fields are separated by tabs
  • some fields will be ignored

Start up the Spark shell, and load the data file into an RDD[String]:

$ spark-shell

var tx=sc.textFile("file:///home/dmn/city_data/cities1000.txt")  

tx.count()
Long = 145725

Define a case class and a parse function:

case class City(
        geonameid: Int, 
        name: String, 
        asciiname: String, 
        latitude: Double, longitude: Double, 
        country: String, 
        population: Int, 
        elevation: Int) 

def parse(line: String) = { 
  val spl=line.split("\t") 
  val geonameid=spl(0).toInt
  val name=spl(1)
  val asciiname=spl(2)
  val latitude=spl(4).toDouble
  val longitude=spl(5).toDouble
  val country=spl(8)
  val population=spl(14).toInt
  val elevation=spl(16).toInt
  City(geonameid, name, asciiname, latitude, longitude, country, population, elevation)
}

Try to parse one line:

parse(tx.take(1)(0))
City = City(3039154,El Tarter,El Tarter,42.57952,1.65362,AD,1052,1721)

Success! Now let's parse the complete text file into City records:

var ct=tx.map(parse(_))

Check:

ct.count
Long = 145725

Spot-check: list all cities above 3500m and having a population of more than 100000, ordered by descending elevation:

var chk=ct.filter( rec => ( rec.elevation>3500) && (rec.population>100000)).collect()
chk.sortWith( (x,y) => (x.elevation>y.elevation) ).foreach(println)

City(3907584,Potosí,Potosi,-19.58361,-65.75306,BO,141251,3967)
City(3909234,Oruro,Oruro,-17.98333,-67.15,BO,208684,3936)
City(3937513,Juliaca,Juliaca,-15.5,-70.13333,PE,245675,3834)
City(3931276,Puno,Puno,-15.8422,-70.0199,PE,116552,3825)
City(3911925,La Paz,La Paz,-16.5,-68.15,BO,812799,3782)
City(1280737,Lhasa,Lhasa,29.65,91.1,CN,118721,3651)

Sidenote: a case class has a restricted number of parameters; if there are too many, you'll get this error:

Implementation restriction: case classes cannot have more than 22 parameters.

Start up the Spark shell, and load the data file into an RDD[String]:

$ spark-shell

var tx=sc.textFile("file:///home/dmn/city_data/cities1000.txt")  

tx.count()
Long = 145725

Import some needed types:

import org.apache.spark.sql.types.{StructType,StructField,StringType,LongType,DoubleType};
import org.apache.spark.sql.Row;

Create the SQL context:

val sqlctx= new org.apache.spark.sql.SQLContext(sc)

Define the schema:

val schema = StructType(Array(
    StructField("geonameid",LongType,false),
    StructField("name",StringType,true),
    StructField("asciiname",StringType,true),
    StructField("latitude",DoubleType,true),
    StructField("longitude",DoubleType,true),
    StructField("country",StringType,true),
    StructField("population",LongType,true),
    StructField("elevation",LongType,true)
    ) )

Turn the text-line RDD into a row RDD:

val rowrdd = tx.map(_.split("\t")).map(p =>
        Row(p(0).toLong,p(1), p(2),
            p(4).toDouble,p(5).toDouble,
            p(8),p(14).toLong,p(16).toLong) )

Create the dataframe:

val city_df=sqlctx.createDataFrame(rowrdd,schema)

Spotcheck:

city_df.filter(" elevation>3500 and population>100000 ").
        orderBy(desc("elevation")).
        collect().
        foreach(println)

[3907584,Potosí,Potosi,-19.58361,-65.75306,BO,141251,3967]
[3909234,Oruro,Oruro,-17.98333,-67.15,BO,208684,3936]
[3937513,Juliaca,Juliaca,-15.5,-70.13333,PE,245675,3834]
[3931276,Puno,Puno,-15.8422,-70.0199,PE,116552,3825]
[3911925,La Paz,La Paz,-16.5,-68.15,BO,812799,3782]
[1280737,Lhasa,Lhasa,29.65,91.1,CN,118721,3651]

For future use

Save the dataframe as a parquet file ..

city_df.write.parquet("hdfs:///user/dmn/cities/city_parquet") 

.. for easy retrieval in the future:

val city_df= sqlctx.read.parquet("hdfs:///user/dmn/cities/city_parquet") 

See the tab Spark Dataframe on how to create the dataframe city_df.

Then register the table:

city_df.registerTempTable("city")

And Bob's your uncle.

Spotcheck:

sqlctx.sql("select * from city where elevation>3500 and population>100000 order by elevation desc").
       collect().foreach(println)

[3907584,Potosí,Potosi,-19.58361,-65.75306,BO,141251,3967]
[3909234,Oruro,Oruro,-17.98333,-67.15,BO,208684,3936]
[3937513,Juliaca,Juliaca,-15.5,-70.13333,PE,245675,3834]
[3931276,Puno,Puno,-15.8422,-70.0199,PE,116552,3825]
[3911925,La Paz,La Paz,-16.5,-68.15,BO,812799,3782]
[1280737,Lhasa,Lhasa,29.65,91.1,CN,118721,3651]

Note: if you run the above without the collect(), the ordering of the printed rows may be incorrect, because foreach then runs on the partitions in parallel.

R:

df<-read.table("/home/dmn/city_data/cities1000.txt",sep="\t"
                 ,quote="",stringsAsFactors=F, na.strings = "",
                 )[,c(1, 2, 3, 5, 6, 9, 15, 17)]
names(df)<-c("geonameid","name","asciiname","latitude","longitude",
             "country","population","elevation")

options(width=200)

Note: if you don't set na.strings = "" in this case, then all Namibia (code 'NA') records are flagged as having NA for country. Check: table(is.na(df$country)) should give all F's and no T's.

Dimensions:

dim(df)
145725      8

Spotcheck: cities above 3500m and having a population of more than 100000:

tf=df[(df$elevation>3500)&(df$population>100000),]
tf[order(-tf$elevation),] 

       geonameid    name asciiname  latitude longitude country population elevation
7015     3907584  Potosí    Potosi -19.58361 -65.75306      BO     141251      3967
7026     3909234   Oruro     Oruro -17.98333 -67.15000      BO     208684      3936
101370   3937513 Juliaca   Juliaca -15.50000 -70.13333      PE     245675      3834
101162   3931276    Puno      Puno -15.84220 -70.01990      PE     116552      3825
7044     3911925  La Paz    La Paz -16.50000 -68.15000      BO     812799      3782
12297    1280737   Lhasa     Lhasa  29.65000  91.10000      CN     118721      3651

Read tab-separated file into a data.table

Read:

library(data.table)
dt<-fread("/home/dmn/city_data/cities1000.txt", header=FALSE,
                na.strings = "",
                select=c(1, 2, 3, 5, 6, 9, 15, 17) )
setnames(dt,c("geonameid","name","asciiname","latitude","longitude",
              "country","population","elevation"))

options(width=200)

Spotcheck: cities above 3500m and having a population of more than 100000:

dt[(dt$elevation>3500)&(dt$population>100000),][order(-elevation),]

   geonameid    name asciiname  latitude longitude country population elevation
1:   3907584  Potosí    Potosi -19.58361 -65.75306      BO     141251      3967
2:   3909234   Oruro     Oruro -17.98333 -67.15000      BO     208684      3936
3:   3937513 Juliaca   Juliaca -15.50000 -70.13333      PE     245675      3834
4:   3931276    Puno      Puno -15.84220 -70.01990      PE     116552      3825
5:   3911925  La Paz    La Paz -16.50000 -68.15000      BO     812799      3782
6:   1280737   Lhasa     Lhasa  29.65000  91.10000      CN     118721      3651

Python Pandas (see also http://pandas.pydata.org/pandas-docs/stable/io.html):

import pandas as pd
import csv   

colnames= [ "geonameid","name","asciiname","latitude","longitude",
            "country","population","elevation" ]

df=pd.io.parsers.read_table("/home/dmn/city_data/cities1000.txt",
                sep="\t", header=None, names= colnames,
                quoting=csv.QUOTE_NONE,usecols=[ 0, 1, 2, 4, 5, 8, 14, 16])

pd.set_option('display.width', 200)
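
Note: the Namibia gotcha from the R section applies here as well: by default pandas treats the string 'NA' as missing, so the country of Namibian cities ends up as NaN. One way around it (a sketch, not needed for the examples below) is to switch off the default NA handling:

df=pd.io.parsers.read_table("/home/dmn/city_data/cities1000.txt",
                sep="\t", header=None, names=colnames,
                quoting=csv.QUOTE_NONE, usecols=[ 0, 1, 2, 4, 5, 8, 14, 16],
                keep_default_na=False, na_values=[''])

df[df.country=='NA'].head()   # Namibian cities now keep their country code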

Number of records:

len(df)
145725

Spotcheck: cities above 3500m and having a population of more than 100000:

df[(df.elevation>3500) & (df.population>100000)].sort("elevation",ascending=False)

        geonameid     name asciiname  latitude  longitude country  population  elevation
7014      3907584   Potosí    Potosi -19.58361  -65.75306      BO      141251       3967
7025      3909234    Oruro     Oruro -17.98333  -67.15000      BO      208684       3936
100330    3937513  Juliaca   Juliaca -15.50000  -70.13333      PE      245675       3834
100122    3931276     Puno      Puno -15.84220  -70.01990      PE      116552       3825
7043      3911925   La Paz    La Paz -16.50000  -68.15000      BO      812799       3782
12296     1280737    Lhasa     Lhasa  29.65000   91.10000      CN      118721       3651

Preprocessing

The data will be loaded into a Postgres table using the copy command. With this command it is not possible to load only a subset of the columns, so we use R to preprocess the file.

Start up R, read the selected columns of the input, then write the data out to the file "cities.csv":

df<-read.table("/home/dmn/city_data/cities1000.txt", sep="\t",
               quote="",stringsAsFactors=F)[,c(1, 2, 3, 5, 6, 9, 15, 17)]

# corrections for city names with a double quote in their name:
df[df$V1==1682560,c("V2","V3")]=c("T-boli","T-boli")
df[df$V1==688022,c("V2","V3")]=c("Yur-yivka","Yur-yivka")

write.table(df,"cities.csv", sep="\t",row.names=F,col.names=F,na="",quote=F) 
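
If you prefer to stay in Python, roughly the same preprocessing can be done with pandas (a sketch mirroring the R steps above, not battle-tested against the full file):

import csv
import pandas as pd

df=pd.read_table("/home/dmn/city_data/cities1000.txt", sep="\t", header=None,
                 quoting=csv.QUOTE_NONE, usecols=[ 0, 1, 2, 4, 5, 8, 14, 16],
                 keep_default_na=False, na_values=[''])

# same corrections for the two city names containing a double quote
df.loc[df[0]==1682560, [1,2]]="T-boli"
df.loc[df[0]==688022, [1,2]]="Yur-yivka"

df.to_csv("cities.csv", sep="\t", header=False, index=False, na_rep="",
          quoting=csv.QUOTE_NONE)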

Sidenote: create a postgres database user

On the Linux CLI:

sudo  -u postgres psql template1
template1=# create user dmn encrypted password 'dmn';
template1=# create DATABASE dmn WITH TEMPLATE = template0 ENCODING = 'UTF8' owner dmn;
template1=# \q

Start up psql:

psql -h localhost -U dmn

Load into Postgres

Start up psql and create the table:

create table t_city (
     geonameid int  primary key
    ,name      varchar(128) 
    ,asciiname varchar(128) 
    ,latitude  numeric 
    ,longitude  numeric
    ,country   varchar(5) 
    ,population int
    ,elevation int
);

Load the data:

 \copy t_city from 'cities.csv' with DELIMITER E'\t' CSV

Count:

select count(1) from t_city 

count  
--------
 145725

Spotcheck: cities above 3500m and having a population of more than 100000:

select * 
from t_city 
where elevation>3500 and population>100000
order by elevation desc;

 geonameid |  name   | asciiname | latitude  | longitude | country | population | elevation 
-----------+---------+-----------+-----------+-----------+---------+------------+-----------
   3907584 | Potosí  | Potosi    | -19.58361 | -65.75306 | BO      |     141251 |      3967
   3909234 | Oruro   | Oruro     | -17.98333 |    -67.15 | BO      |     208684 |      3936
   3937513 | Juliaca | Juliaca   |     -15.5 | -70.13333 | PE      |     245675 |      3834
   3931276 | Puno    | Puno      |  -15.8422 |  -70.0199 | PE      |     116552 |      3825
   3911925 | La Paz  | La Paz    |     -16.5 |    -68.15 | BO      |     812799 |      3782
   1280737 | Lhasa   | Lhasa     |     29.65 |      91.1 | CN      |     118721 |      3651

That concludes the loading!

03_aggregate
20160601

Aggregate

Simple aggregate: sum the population of the cities for the EU28 countries.

Data entity used: city (see section 2b data load ).

Spark Scala

Prep:

val eu28=List("AT", "BE", "BG", "CY", "CZ", "DE", "DK", "EE", "ES", "FI", "FR",
              "GB", "GR", "HR", "HU", "IE", "IT", "LT", "LU", "LV", "MT", "NL",
              "PL", "PT", "RO", "SE", "SI", "SK", "AN")

Query:

ct.filter( r => (eu28 contains r.country)).  // retain cities of the 28 EU countries
   map( r => (r.country, r.population)).     // create (country, population) tuples
   reduceByKey( (_ + _)).                    // sum up
   sortBy(-_._2).                            // sort by value, negative sign for desc
   foreach( println)

(DE,85880759)
(GB,63628475)
(FR,52643435)
(IT,52376411)
(ES,49756525)
(PL,28776423)
(RO,25000006)
(NL,15022475)
(HU,10263483)
(BE,10117760)
(CZ,8720141)
(GR,8562837)
(SE,7803509)
(PT,7097218)
(BG,5457463)
(FI,5178753)
(AT,4924993)
(DK,4475046)
(HR,3744956)
(IE,3548735)
(SK,2971938)
(LT,2755868)
(LV,1735119)
(SI,1183740)
(EE,995124)
(CY,797327)
(MT,398419)
(LU,358224)

Spark SQL query:

sqlctx.sql("""select country, sum(population) as sum_pop 
              from city  
              where country in ('AT','BE','BG','CY','CZ','DE','DK','EE','ES','FI','FR',
                         'GB','GR','HR','HU','IE','IT','LT','LU','LV','MT','NL',
                         'PL','PT','RO','SE','SI','SK','AN')
              group by country
              order by sum_pop desc""").collect().foreach(println)

Result:

[DE,85880759]
[GB,63628475]
[FR,52643435]
[IT,52376411]
[ES,49756525]
[PL,28776423]
[RO,25000006]
[NL,15022475]
[HU,10263483]
[BE,10117760]
[CZ,8720141]
[GR,8562837]
[SE,7803509]
[PT,7097218]
[BG,5457463]
[FI,5178753]
[AT,4924993]
[DK,4475046]
[HR,3744956]
[IE,3548735]
[SK,2971938]
[LT,2755868]
[LV,1735119]
[SI,1183740]
[EE,995124]
[CY,797327]
[MT,398419]
[LU,358224]

R: the list of countries that make up the EU28:

eu28=c("AT", "BE", "BG", "CY", "CZ", "DE", "DK", "EE", "ES", "FI", "FR",
       "GB", "GR", "HR", "HU", "IE", "IT", "LT", "LU", "LV", "MT", "NL",
       "PL", "PT", "RO", "SE", "SI", "SK", "AN")

Create a subset:

eudf=df[df$country %in% eu28,]

dim(eudf)
57251     8

Aggregate:

aggregate( eudf$population, by=list(country=eudf$country),sum)  

   country        x
1       AT  4924993
2       BE 10117760
3       BG  5457463
4       CY   797327
5       CZ  8720141
6       DE 85880759
7       DK  4475046
8       EE   995124
9       ES 49756525
10      FI  5178753
11      FR 52643435
12      GB 63628475
13      GR  8562837
14      HR  3744956
15      HU 10263483
16      IE  3548735
17      IT 52376411
18      LT  2755868
19      LU   358224
20      LV  1735119
21      MT   398419
22      NL 15022475
23      PL 28776423
24      PT  7097218
25      RO 25000006
26      SE  7803509
27      SI  1183740
28      SK  2971938

Prep:

eu28=c("AT", "BE", "BG", "CY", "CZ", "DE", "DK", "EE", "ES", "FI", "FR",
       "GB", "GR", "HR", "HU", "IE", "IT", "LT", "LU", "LV", "MT", "NL",
       "PL", "PT", "RO", "SE", "SI", "SK", "AN")

For data.table, both operations (subsetting and aggregation) can be written as a one-liner:

 dt[country %in% eu28][,sum(population),by=country]

    country       V1
 1:      AT  4924993
 2:      BE 10117760
 3:      BG  5457463
 4:      CY   797327
 5:      CZ  8720141
 6:      DE 85880759
 7:      DK  4475046
 8:      EE   995124
 9:      ES 49756525
10:      FI  5178753
11:      FR 52643435
12:      GB 63628475
13:      GR  8562837
14:      HR  3744956
15:      HU 10263483
16:      IE  3548735
17:      IT 52376411
18:      LT  2755868
19:      LU   358224
20:      LV  1735119
21:      MT   398419
22:      NL 15022475
23:      PL 28776423
24:      PT  7097218
25:      RO 25000006
26:      SE  7803509
27:      SI  1183740
28:      SK  2971938

Python

Prep:

eu28=set(["AT", "BE", "BG", "CY", "CZ", "DE", "DK", "EE", "ES", "FI", "FR",
          "GB", "GR", "HR", "HU", "IE", "IT", "LT", "LU", "LV", "MT", "NL",
          "PL", "PT", "RO", "SE", "SI", "SK", "AN"])

Agg:

df[df.country.isin(eu28)][["country","population"]].groupby(['country']).sum()

country  population
AT          4924993
BE         10117760
BG          5457463
CY           797327
CZ          8720141
DE         85880759
DK          4475046
EE           995124
ES         49756525
FI          5178753
FR         52643435
GB         50690526
GR          8562837
HR          3744956
HU         10263483
IE          3548735
IT         52376411
LT          2755868
LU           358224
LV          1735119
MT           398419
NL         15022475
PL         28776423
PT          7097218
RO         25000006
SE          7803509
SI          1183740
SK          2971938
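
To get the same ordering as the SQL and Spark results (largest summed population first), the aggregate can be sorted (a sketch; sort_values needs pandas >= 0.17, and agg is just an illustrative name):

agg=df[df.country.isin(eu28)][["country","population"]].groupby(['country']).sum()
agg.sort_values(by='population', ascending=False)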

Postgres:

select country, sum(population) as sum_pop
from t_city 
where country in ('AT', 'BE', 'BG', 'CY', 'CZ', 'DE', 'DK', 'EE', 'ES', 'FI', 'FR',
         'GB', 'GR', 'HR', 'HU', 'IE', 'IT', 'LT', 'LU', 'LV', 'MT', 'NL',
         'PL', 'PT', 'RO', 'SE', 'SI', 'SK', 'AN')
group by country
order by 2 desc;

Result:

 country | sum_pop  
---------+----------
 DE      | 85880759
 GB      | 63628475
 FR      | 52643435
 IT      | 52376411
 ES      | 49756525
 PL      | 28776423
 RO      | 25000006
 NL      | 15022475
 HU      | 10263483
 BE      | 10117760
 CZ      |  8720141
 GR      |  8562837
 SE      |  7803509
 PT      |  7097218
 BG      |  5457463
 FI      |  5178753
 AT      |  4924993
 DK      |  4475046
 HR      |  3744956
 IE      |  3548735
 SK      |  2971938
 LT      |  2755868
 LV      |  1735119
 SI      |  1183740
 EE      |   995124
 CY      |   797327
 MT      |   398419
 LU      |   358224
(28 rows)
04_frequency
20160601

Frequency

Count the occurrences of the city names and list the top 20. Additional condition: the population has to be greater than 100.

Data entity used: city (see section 2b data load ).

Spark RDD query:

ct.filter( r => r.population>100).
   map( r => (r.asciiname, 1) ).
   reduceByKey( (_ + _)).
   sortBy(-_._2).
   take(20).
   foreach(println)

Result:

(San Antonio,31)
(San Miguel,31)
(San Francisco,28)
(San Jose,26)
(San Isidro,25)
(Santa Cruz,25)
(Buenavista,24)
(Clinton,24)
(Newport,24)
(San Vicente,23)
(Victoria,23)
(Santa Maria,23)
(Richmond,22)
(San Carlos,21)
(Santa Ana,21)
(Georgetown,21)
(San Pedro,20)
(Springfield,20)
(Franklin,20)
(Salem,19)

Another way is to use the RDD.countByValue function, which for an RDD[T] returns a Map[T,Long]. BUT this turns our RDD into a scala.collection.Map[String,Long], i.e. it's now a 'local' collection, and no longer distributed.

ct.filter( r => r.population>100).
   map( r => r.asciiname ).
   countByValue().
   toList.
   sortBy(-_._2).
   take(20).foreach(println)

Spark dataframe query:

city_df.filter(" population>100 ").
        groupBy("asciiname").count().
        orderBy(desc("count")).
        take(20).
        foreach(println)

Result:

[San Miguel,31]                                                                 
[San Antonio,31]
[San Francisco,28]
[San Jose,26]
[San Isidro,25]
[Santa Cruz,25]
[Clinton,24]
[Buenavista,24]
[Newport,24]
[Victoria,23]
[Santa Maria,23]
[San Vicente,23]
[Richmond,22]
[Santa Ana,21]
[San Carlos,21]
[Georgetown,21]
[Springfield,20]
[Franklin,20]
[San Pedro,20]
[Greenville,19]

Spark SQL query:

sqlctx.sql("""
select asciiname, count(1) as cnt
from city   
where population>100
group by asciiname 
order by cnt desc
limit 20
""").collect().foreach(println)

Result:

[San Antonio,31]                                                                
[San Miguel,31]
[San Francisco,28]
[San Jose,26]
[San Isidro,25]
[Santa Cruz,25]
[Clinton,24]
[Buenavista,24]
[Newport,24]
[Santa Maria,23]
[Victoria,23]
[San Vicente,23]
[Richmond,22]
[San Carlos,21]
[Georgetown,21]
[Santa Ana,21]
[Franklin,20]
[Springfield,20]
[San Pedro,20]
[La Union,19]

R: making use of the count function of plyr:

library(plyr)

ff=count(df[df$population>100,], "asciiname")
ff[order(-ff$freq)[1:20],]

          asciiname freq
75375   San Antonio   31
76514    San Miguel   31
75784 San Francisco   28
76042      San Jose   26
76000    San Isidro   25
77019    Santa Cruz   25
12900    Buenavista   24
19349       Clinton   24
59090       Newport   24
77184   Santa Maria   23
77720   San Vicente   23
92062      Victoria   23
71507      Richmond   22
30887    Georgetown   21
75521    San Carlos   21
76926     Santa Ana   21
29256      Franklin   20
76679     San Pedro   20
82832   Springfield   20
32755    Greenville   19

Using data.table:

dt[population>100][,.N,by=asciiname][order(-N)][1:20]

        asciiname  N
 1:    San Miguel 31
 2:   San Antonio 31
 3: San Francisco 28
 4:      San Jose 26
 5:    San Isidro 25
 6:    Santa Cruz 25
 7:       Newport 24
 8:    Buenavista 24
 9:       Clinton 24
10:   San Vicente 23
11:   Santa Maria 23
12:      Victoria 23
13:      Richmond 22
14:     Santa Ana 21
15:    San Carlos 21
16:    Georgetown 21
17:     San Pedro 20
18:      Franklin 20
19:   Springfield 20
20:       Windsor 19

Python: applying value_counts() to a dataframe column returns a series.

df[df.population>100]["asciiname"].value_counts()[:20]

San Antonio      31
San Miguel       31
San Francisco    28
San Jose         26
Santa Cruz       25
San Isidro       25
Newport          24
Clinton          24
Buenavista       24
Santa Maria      23
San Vicente      23
Victoria         23
Richmond         22
San Carlos       21
Santa Ana        21
Georgetown       21
San Pedro        20
Franklin         20
Springfield      20
Greenville       19

Another way is to use the collections.Counter:

import collections
ctr=collections.Counter( df[df.population>100]["asciiname"])
ctr.most_common(20) 

[('San Miguel', 31),
 ('San Antonio', 31),
 ('San Francisco', 28),
 ('San Jose', 26),
 ('Santa Cruz', 25),
 ('San Isidro', 25),
 ('Buenavista', 24),
 ('Newport', 24),
 ('Clinton', 24),
 ('Santa Maria', 23),
 ('San Vicente', 23),
 ('Victoria', 23),
 ('Richmond', 22),
 ('San Carlos', 21),
 ('Santa Ana', 21),
 ('Georgetown', 21),
 ('San Pedro', 20),
 ('Franklin', 20),
 ('Springfield', 20),
 ('Salem', 19)]
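
A groupby-based variant that stays close to the SQL formulation (a sketch):

df[df.population>100].groupby('asciiname').size().sort_values(ascending=False)[:20]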

Postgres:

select asciiname,count(1) 
from t_city 
where population>100
group by asciiname 
order by 2 desc 
limit 20

   asciiname   | count 
---------------+-------
 San Antonio   |    31
 San Miguel    |    31
 San Francisco |    28
 San Jose      |    26
 San Isidro    |    25
 Santa Cruz    |    25
 Buenavista    |    24
 Clinton       |    24
 Newport       |    24
 San Vicente   |    23
 Santa Maria   |    23
 Victoria      |    23
 Richmond      |    22
 Santa Ana     |    21
 Georgetown    |    21
 San Carlos    |    21
 Franklin      |    20
 Springfield   |    20
 San Pedro     |    20
 Greenville    |    19
(20 rows)
05_join
20160601

Join data SQL style

Data entities used: user and transaction (see section 2a data create ).

merge()

merge(udf,tdf, by.x="uid", by.y="uid")

    uid                name  xid amount                date
1  9000    Gerd Abrahamsson 5016    -95 2016-05-09 10:26:05
2  9001     Hanna Andersson 5015   -404 2016-02-20 22:08:23
3  9001     Hanna Andersson 5019   -462 2016-03-09 20:32:35
4  9002     August Bergsten 5006   -153 2016-02-18 17:58:33
5  9002     August Bergsten 5018   -549 2016-04-15 21:44:49
6  9003        Arvid Bohlin 5001    268 2016-01-17 13:37:38
7  9003        Arvid Bohlin 5017    428 2016-03-27 15:30:47
8  9003        Arvid Bohlin 5002    621 2016-02-24 15:36:53
9  9004     Edvard Marklund 5004    720 2016-05-14 16:29:54
10 9004     Edvard Marklund 5020   -339 2016-05-03 17:11:21
11 9005 Ragnhild Brännström 5013   -114 2016-02-06 14:55:10
12 9005 Ragnhild Brännström 5008   -250 2016-02-24 23:14:52
13 9005 Ragnhild Brännström 5014    819 2016-01-18 10:50:20
14 9006        Börje Wallin 5010    549 2016-02-16 14:37:25
15 9007        Otto Byström 5003   -401 2016-01-14 16:43:27
16 9007        Otto Byström 5005   -492 2016-02-24 23:58:57
17 9008     Elise Dahlström 5007    272 2016-05-26 12:00:00
18 9008     Elise Dahlström 5000    498 2016-02-21 06:28:49
19 9008     Elise Dahlström 5009     82 2016-04-20 18:33:25
20 9008     Elise Dahlström 5011   -571 2016-02-28 13:05:33
21 9008     Elise Dahlström 5012    814 2016-03-20 13:29:11

pd.merge()

pd.merge( tdf, udf, how='inner', left_on='uid', right_on='uid')

     xid   uid  amount                 date                 name
0   5000  9008     498  2016-02-21 06:28:49      Elise Dahlström
1   5007  9008     272  2016-05-26 12:00:00      Elise Dahlström
2   5009  9008      82  2016-04-20 18:33:25      Elise Dahlström
3   5011  9008    -571  2016-02-28 13:05:33      Elise Dahlström
4   5012  9008     814  2016-03-20 13:29:11      Elise Dahlström
5   5001  9003     268  2016-01-17 13:37:38         Arvid Bohlin
6   5002  9003     621  2016-02-24 15:36:53         Arvid Bohlin
7   5017  9003     428  2016-03-27 15:30:47         Arvid Bohlin
8   5003  9007    -401  2016-01-14 16:43:27         Otto Byström
9   5005  9007    -492  2016-02-24 23:58:57         Otto Byström
10  5004  9004     720  2016-05-14 16:29:54      Edvard Marklund
11  5020  9004    -339  2016-05-03 17:11:21      Edvard Marklund
12  5006  9002    -153  2016-02-18 17:58:33      August Bergsten
13  5018  9002    -549  2016-04-15 21:44:49      August Bergsten
14  5008  9005    -250  2016-02-24 23:14:52  Ragnhild Brännström
15  5013  9005    -114  2016-02-06 14:55:10  Ragnhild Brännström
16  5014  9005     819  2016-01-18 10:50:20  Ragnhild Brännström
17  5010  9006     549  2016-02-16 14:37:25         Börje Wallin
18  5015  9001    -404  2016-02-20 22:08:23      Hanna Andersson
19  5019  9001    -462  2016-03-09 20:32:35      Hanna Andersson
20  5016  9000     -95  2016-05-09 10:26:05     Gerd Abrahamsson

Postgres join

dmn=> select t.xid, t.amount, t.date, t.uid, u.name 
      from t_transaction t join t_user u 
      on t.uid=u.uid;

 xid  | amount |        date         | uid  |        name         
------+--------+---------------------+------+---------------------
 5000 |    498 | 2016-02-21 06:28:49 | 9008 | Elise Dahlström
 5001 |    268 | 2016-01-17 13:37:38 | 9003 | Arvid Bohlin
 5002 |    621 | 2016-02-24 15:36:53 | 9003 | Arvid Bohlin
 5003 |   -401 | 2016-01-14 16:43:27 | 9007 | Otto Byström
 5004 |    720 | 2016-05-14 16:29:54 | 9004 | Edvard Marklund
 5005 |   -492 | 2016-02-24 23:58:57 | 9007 | Otto Byström
 5006 |   -153 | 2016-02-18 17:58:33 | 9002 | August Bergsten
 5007 |    272 | 2016-05-26 12:00:00 | 9008 | Elise Dahlström
 5008 |   -250 | 2016-02-24 23:14:52 | 9005 | Ragnhild Brännström
 5009 |     82 | 2016-04-20 18:33:25 | 9008 | Elise Dahlström
 5010 |    549 | 2016-02-16 14:37:25 | 9006 | Börje Wallin
 5011 |   -571 | 2016-02-28 13:05:33 | 9008 | Elise Dahlström
 5012 |    814 | 2016-03-20 13:29:11 | 9008 | Elise Dahlström
 5013 |   -114 | 2016-02-06 14:55:10 | 9005 | Ragnhild Brännström
 5014 |    819 | 2016-01-18 10:50:20 | 9005 | Ragnhild Brännström
 5015 |   -404 | 2016-02-20 22:08:23 | 9001 | Hanna Andersson
 5016 |    -95 | 2016-05-09 10:26:05 | 9000 | Gerd Abrahamsson
 5017 |    428 | 2016-03-27 15:30:47 | 9003 | Arvid Bohlin
 5018 |   -549 | 2016-04-15 21:44:49 | 9002 | August Bergsten
 5019 |   -462 | 2016-03-09 20:32:35 | 9001 | Hanna Andersson
 5020 |   -339 | 2016-05-03 17:11:21 | 9004 | Edvard Marklund

Join : RDD.cartesian().filter()

(see the outtake section below on how to create the RDDs udf_rdd and tdf_rdd)

Create the Cartesian product of the two RDD's, and then filter out the records with corresponding id's:

val jn_rdd=udf_rdd.cartesian(tdf_rdd).filter( r => r._1.uid==r._2.uid )

This creates an RDD of this type:

jn_rdd: org.apache.spark.rdd.RDD[(Udf, Tdf)] 

Result

jn_rdd.collect().foreach(println)

(Udf(9002,August Bergsten),Tdf(5006,9002,-153.0,2016-02-18 17:58:33.0))
(Udf(9003,Arvid Bohlin),Tdf(5001,9003,268.0,2016-01-17 13:37:38.0))
(Udf(9003,Arvid Bohlin),Tdf(5002,9003,621.0,2016-02-24 15:36:53.0))
(Udf(9004,Edvard Marklund),Tdf(5004,9004,720.0,2016-05-14 16:29:54.0))
(Udf(9000,Gerd Abrahamsson),Tdf(5016,9000,-95.0,2016-05-09 10:26:05.0))
(Udf(9001,Hanna Andersson),Tdf(5015,9001,-404.0,2016-02-20 22:08:23.0))
(Udf(9001,Hanna Andersson),Tdf(5019,9001,-462.0,2016-03-09 20:32:35.0))
(Udf(9002,August Bergsten),Tdf(5018,9002,-549.0,2016-04-15 21:44:49.0))
(Udf(9003,Arvid Bohlin),Tdf(5017,9003,428.0,2016-03-27 15:30:47.0))
(Udf(9004,Edvard Marklund),Tdf(5020,9004,-339.0,2016-05-03 17:11:21.0))
(Udf(9005,Ragnhild Brännström),Tdf(5008,9005,-250.0,2016-02-24 23:14:52.0))
(Udf(9006,Börje Wallin),Tdf(5010,9006,549.0,2016-02-16 14:37:25.0))
(Udf(9007,Otto Byström),Tdf(5003,9007,-401.0,2016-01-14 16:43:27.0))
(Udf(9007,Otto Byström),Tdf(5005,9007,-492.0,2016-02-24 23:58:57.0))
(Udf(9008,Elise Dahlström),Tdf(5000,9008,498.0,2016-02-21 06:28:49.0))
(Udf(9008,Elise Dahlström),Tdf(5007,9008,272.0,2016-05-26 12:00:00.0))
(Udf(9008,Elise Dahlström),Tdf(5009,9008,82.0,2016-04-20 18:33:25.0))
(Udf(9005,Ragnhild Brännström),Tdf(5013,9005,-114.0,2016-02-06 14:55:10.0))
(Udf(9005,Ragnhild Brännström),Tdf(5014,9005,819.0,2016-01-18 10:50:20.0))
(Udf(9008,Elise Dahlström),Tdf(5011,9008,-571.0,2016-02-28 13:05:33.0))
(Udf(9008,Elise Dahlström),Tdf(5012,9008,814.0,2016-03-20 13:29:11.0))

Dataframe.join()

val jn_df=udf_df.join( tdf_df, udf_df("uid")===tdf_df("uid") )

Type:

org.apache.spark.sql.DataFrame = 
    [uid: int, name: string, xid: int, uid: int, amount: double, date: timestamp]

Note the duplicate uid column!

Content of the dataframe:

jn_df.collect().foreach(println)

[9000,Gerd Abrahamsson,5016,9000,-95.0,2016-05-09 10:26:05.0]                   
[9001,Hanna Andersson,5015,9001,-404.0,2016-02-20 22:08:23.0]
[9001,Hanna Andersson,5019,9001,-462.0,2016-03-09 20:32:35.0]
[9002,August Bergsten,5006,9002,-153.0,2016-02-18 17:58:33.0]
[9002,August Bergsten,5018,9002,-549.0,2016-04-15 21:44:49.0]
[9003,Arvid Bohlin,5001,9003,268.0,2016-01-17 13:37:38.0]
[9003,Arvid Bohlin,5002,9003,621.0,2016-02-24 15:36:53.0]
[9003,Arvid Bohlin,5017,9003,428.0,2016-03-27 15:30:47.0]
[9004,Edvard Marklund,5004,9004,720.0,2016-05-14 16:29:54.0]
[9004,Edvard Marklund,5020,9004,-339.0,2016-05-03 17:11:21.0]
[9005,Ragnhild Brännström,5008,9005,-250.0,2016-02-24 23:14:52.0]
[9005,Ragnhild Brännström,5013,9005,-114.0,2016-02-06 14:55:10.0]
[9005,Ragnhild Brännström,5014,9005,819.0,2016-01-18 10:50:20.0]
[9006,Börje Wallin,5010,9006,549.0,2016-02-16 14:37:25.0]
[9007,Otto Byström,5003,9007,-401.0,2016-01-14 16:43:27.0]
[9007,Otto Byström,5005,9007,-492.0,2016-02-24 23:58:57.0]
[9008,Elise Dahlström,5000,9008,498.0,2016-02-21 06:28:49.0]
[9008,Elise Dahlström,5007,9008,272.0,2016-05-26 12:00:00.0]
[9008,Elise Dahlström,5009,9008,82.0,2016-04-20 18:33:25.0]
[9008,Elise Dahlström,5011,9008,-571.0,2016-02-28 13:05:33.0]
[9008,Elise Dahlström,5012,9008,814.0,2016-03-20 13:29:11.0]

Spark SQL join

Very similar to plain SQL (the temp tables t_udf and t_tdf are registered in the outtake section):

val rs= sx.sql("""
      select t.xid, t.amount, t.date, t.uid, u.name 
      from t_tdf t join t_udf u 
      on t.uid=u.uid
      """)

Type:

org.apache.spark.sql.DataFrame = 
    [xid: int, amount: double, date: timestamp, uid: int, name: string]

Content:

rs.collect().foreach(println)

[5016,-95.0,2016-05-09 10:26:05.0,9000,Gerd Abrahamsson]
[5015,-404.0,2016-02-20 22:08:23.0,9001,Hanna Andersson]
[5019,-462.0,2016-03-09 20:32:35.0,9001,Hanna Andersson]
[5006,-153.0,2016-02-18 17:58:33.0,9002,August Bergsten]
[5018,-549.0,2016-04-15 21:44:49.0,9002,August Bergsten]
[5001,268.0,2016-01-17 13:37:38.0,9003,Arvid Bohlin]
[5002,621.0,2016-02-24 15:36:53.0,9003,Arvid Bohlin]
[5017,428.0,2016-03-27 15:30:47.0,9003,Arvid Bohlin]
[5004,720.0,2016-05-14 16:29:54.0,9004,Edvard Marklund]
[5020,-339.0,2016-05-03 17:11:21.0,9004,Edvard Marklund]
[5008,-250.0,2016-02-24 23:14:52.0,9005,Ragnhild Brännström]
[5013,-114.0,2016-02-06 14:55:10.0,9005,Ragnhild Brännström]
[5014,819.0,2016-01-18 10:50:20.0,9005,Ragnhild Brännström]
[5010,549.0,2016-02-16 14:37:25.0,9006,Börje Wallin]
[5003,-401.0,2016-01-14 16:43:27.0,9007,Otto Byström]
[5005,-492.0,2016-02-24 23:58:57.0,9007,Otto Byström]
[5000,498.0,2016-02-21 06:28:49.0,9008,Elise Dahlström]
[5007,272.0,2016-05-26 12:00:00.0,9008,Elise Dahlström]
[5009,82.0,2016-04-20 18:33:25.0,9008,Elise Dahlström]
[5011,-571.0,2016-02-28 13:05:33.0,9008,Elise Dahlström]
[5012,814.0,2016-03-20 13:29:11.0,9008,Elise Dahlström]

..

Aggregate with date range

Join the user entities and transaction entities, and sum up the transaction amounts per user for February. Order the output by amount.

R:

# merge
jdf=merge(udf,tdf, by.x="uid", by.y="uid")

# range of dates
startdate<-as.POSIXct("2016-02-01")
enddate<-as.POSIXct("2016-02-29")

# subset 
sb<-jdf[jdf$date>=startdate & jdf$date<=enddate,c('name','amount')]

# aggregate
ag<-aggregate(sb$amount, by=list(group=sb$name),sum)
ag[order(ag$x),]

Result:

                group    x
6        Otto Byström -492
5     Hanna Andersson -404
7 Ragnhild Brännström -364
2     August Bergsten -153
4     Elise Dahlström  -73
3        Börje Wallin  549
1        Arvid Bohlin  621

Python:

from datetime import datetime

# join
jdf=pd.merge( tdf, udf, how='inner', left_on='uid', right_on='uid')

# date range
startdate=datetime.strptime('2016-02-01','%Y-%m-%d')
enddate=datetime.strptime('2016-02-29','%Y-%m-%d')

# aggregate
ag=jdf[((jdf.date >= startdate) & (jdf.date <=enddate))][[ 'name','amount']].groupby('name').sum()

# sort by amount  
ag.sort_values(by='amount') 

Result:

name                 amount
Otto Byström           -492
Hanna Andersson        -404
Ragnhild Brännström    -364
August Bergsten        -153
Elise Dahlström         -73
Börje Wallin            549
Arvid Bohlin            621
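
The February filter can also be written with the .dt accessor instead of explicit start and end timestamps (a sketch; feb is just an illustrative name):

feb=jdf[(jdf.date.dt.year==2016) & (jdf.date.dt.month==2)]
feb[['name','amount']].groupby('name').sum().sort_values(by='amount')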

Postgres query:

select u.name,sum(t.amount) as sum
from t_transaction t join t_user u on t.uid=u.uid
where t.date between '2016-02-01' and '2016-02-29'
group by u.name
order by sum;

Result:

        name         | sum  
---------------------+------
 Otto Byström        | -492
 Hanna Andersson     | -404
 Ragnhild Brännström | -364
 August Bergsten     | -153
 Elise Dahlström     |  -73
 Börje Wallin        |  549
 Arvid Bohlin        |  621
(7 rows)

Spark RDD join:

val jn_rdd=udf_rdd.cartesian(tdf_rdd).filter( r => r._1.uid==r._2.uid )

Date-range:

val fmt=new java.text.SimpleDateFormat("yyyy-MM-dd") 
val t1=new java.sql.Timestamp( fmt.parse("2016-02-01").getTime()) 
val t2=new java.sql.Timestamp( fmt.parse("2016-02-29").getTime()) 

Aggregate:

jn_rdd.filter( r=> ( r._2.date.after(t1) && r._2.date.before(t2)) ).
       map( r => ( r._1.name, r._2.amount) ).
       reduceByKey( (_+_) ).
       sortBy( _._2 ).collect().foreach(println) 

Result:

(Otto Byström,-492.0)
(Hanna Andersson,-404.0)
(Ragnhild Brännström,-364.0)
(August Bergsten,-153.0)
(Elise Dahlström,-73.0)
(Börje Wallin,549.0)
(Arvid Bohlin,621.0)
06_rank
20160601

Top-2 ranking elements within a group

Problem

Suppose you have weekly data, classified by a key (e.g. payload=sales and key=store):

 week | key | payload 
------+-----+---------
    5 | A   |   71.02
    5 | B   |   70.93
    5 | B   |   71.16
    5 | E   |   71.77
    6 | B   |   69.66
    6 | F   |   68.67
    7 | B   |   72.45
    7 | C   |   69.91
    7 | D   |   68.22
    7 | F   |   63.73
    7 | G   |    71.7
    8 | B   |   69.86
    8 | C   |   64.04
    8 | E   |      72
    9 | A   |   70.33
   10 | A   |    71.7
   10 | C   |   66.41
   10 | E   |   62.96
   11 | A   |   71.11
   11 | C   |   70.02
   11 | E   |    69.3
   11 | F   |   70.97
   12 | D   |   68.81
   12 | F   |      66

You want the last and penultimate payload value for every key, like this:

 key | last  | penultimate
-----+-------+-------------
 A   | 71.11 |        71.7
 B   | 69.86 |       72.45
 C   | 70.02 |       66.41
 D   | 68.81 |       68.22
 E   |  69.3 |       62.96
 F   |    66 |       70.97

Note:

  • not every 'week' has an entry for every 'key'
  • key G is missing from the final result, because it only has 1 entry

Solution

Database used: Postgres.

Table creation

create table t_weekly ( week int, key varchar(3), payload numeric);

insert into t_weekly(week,key,payload) values(5,'A',71.02);
insert into t_weekly(week,key,payload) values(5,'B',70.93);
insert into t_weekly(week,key,payload) values(5,'B',71.16);
insert into t_weekly(week,key,payload) values(5,'E',71.77);
insert into t_weekly(week,key,payload) values(6,'B',69.66);
insert into t_weekly(week,key,payload) values(6,'F',68.67);
insert into t_weekly(week,key,payload) values(7,'B',72.45);
insert into t_weekly(week,key,payload) values(7,'C',69.91);
insert into t_weekly(week,key,payload) values(7,'D',68.22);
insert into t_weekly(week,key,payload) values(7,'F',63.73);
insert into t_weekly(week,key,payload) values(7,'G',71.7);
insert into t_weekly(week,key,payload) values(8,'B',69.86);
insert into t_weekly(week,key,payload) values(8,'C',64.04);
insert into t_weekly(week,key,payload) values(8,'E',72);
insert into t_weekly(week,key,payload) values(9,'A',70.33);
insert into t_weekly(week,key,payload) values(10,'A',71.7);
insert into t_weekly(week,key,payload) values(10,'C',66.41);
insert into t_weekly(week,key,payload) values(10,'E',62.96);
insert into t_weekly(week,key,payload) values(11,'A',71.11);
insert into t_weekly(week,key,payload) values(11,'C',70.02);
insert into t_weekly(week,key,payload) values(11,'E',69.3);
insert into t_weekly(week,key,payload) values(11,'F',70.97);
insert into t_weekly(week,key,payload) values(12,'D',68.81);
insert into t_weekly(week,key,payload) values(12,'F',66);

Solution

Using the 'rank() over' functionality:

with ranked as ( 
    select week,key,payload,rank() over (partition by key order by week desc) 
    from t_weekly 
) select  r1.key,
          r1.payload as last, 
          r2.payload as penultim 
from   ranked r1 join ranked r2 on r1.key=r2.key 
where  r1.rank=1 
and    r2.rank=2
order by key

Result

 key | last  | penultim 
-----+-------+----------
 A   | 71.11 |     71.7
 B   | 69.86 |    72.45
 C   | 70.02 |    66.41
 D   | 68.81 |    68.22
 E   |  69.3 |    62.96
 F   |    66 |    70.97

Spark dataframe creation

val week_df=sx.createDataFrame(Seq(
    (5,"A",71.02), (5,"B",70.93), (5,"B",71.16), (5,"E",71.77), (6,"B",69.66), 
    (6,"F",68.67), (7,"B",72.45), (7,"C",69.91), (7,"D",68.22), (7,"F",63.73), 
    (7,"G",71.7), (8,"B",69.86), (8,"C",64.04), (8,"E",72.0), (9,"A",70.33),
    (10,"A",71.7), (10,"C",66.41), (10,"E",62.96), (11,"A",71.11), (11,"C",70.02),
    (11,"E",69.3), (11,"F",70.97), (12,"D",68.81), (12,"F",66.0))
    ).toDF( "week","key","payload")

Attempt 1: HiveContext needed for window functions?

Experimenting with Window.partitionBy("key").orderBy("week") threw up this error:

Could not resolve window function 'rank'. Note that, using window functions currently 
requires a HiveContext;

Hmm, we don't want to be dependent on Hive. Let's try and solve it a different way.

Attempt 2

Get the top ranking values:

val toprank=week_df.groupBy("key").agg( last("week") ).
                    withColumnRenamed("last(week)()","toprank")

Now filter out these records from the original dataframe, and then do the 'toprank' again to get the 'secondrank':

val week_excltop_df=week_df.join(toprank,Seq("key"),"leftouter").filter("week!=toprank")

Get the second-ranking values:

val secondrank=week_excltop_df.groupBy("key").agg( last("week") ).
                               withColumnRenamed("last(week)()","secondrank")

Turn the ranks into values:

val s1=week_df.join( toprank,Seq("key"),"leftouter").
               where("week=toprank").
               select("key","payload"). 
               withColumnRenamed("payload","final")

val s2=week_df.join( secondrank,Seq("key"),"leftouter").
               where("week=secondrank").
               select("key","payload").
               withColumnRenamed("payload","penultim")

And join s1 and s2 together, to get the final result:

s1.join(s2, Seq("key"),"inner").show()

+---+-----+--------+                                                            
|key|final|penultim|
+---+-----+--------+
|  A|71.11|    71.7|
|  B|69.86|   72.45|
|  C|70.02|   66.41|
|  D|68.81|   68.22|
|  E| 69.3|   62.96|
|  F| 66.0|   70.97|
+---+-----+--------+

Pandas dataframe creation

df= pd.DataFrame( [ { "week":5, "key":"A", "payload":71.02},
 { "week":5, "key":"B", "payload":70.93}, { "week":5, "key":"B", "payload":71.16},
 { "week":5, "key":"E", "payload":71.77}, { "week":6, "key":"B", "payload":69.66},
 { "week":6, "key":"F", "payload":68.67}, { "week":7, "key":"B", "payload":72.45},
 { "week":7, "key":"C", "payload":69.91}, { "week":7, "key":"D", "payload":68.22},
 { "week":7, "key":"F", "payload":63.73}, { "week":7, "key":"G", "payload":71.7},
 { "week":8, "key":"B", "payload":69.86}, { "week":8, "key":"C", "payload":64.04},
 { "week":8, "key":"E", "payload":72}, { "week":9, "key":"A", "payload":70.33},
 { "week":10, "key":"A", "payload":71.7}, { "week":10, "key":"C", "payload":66.41},
 { "week":10, "key":"E", "payload":62.96}, { "week":11, "key":"A", "payload":71.11},
 { "week":11, "key":"C", "payload":70.02}, { "week":11, "key":"E", "payload":69.3},
 { "week":11, "key":"F", "payload":70.97}, { "week":12, "key":"D", "payload":68.81},
 { "week":12, "key":"F", "payload":66} ] )

Solution

Put the rank column into the dataframe:

gb=df.groupby( ['key'])
df['rank']=gb['week'].rank(method='min',ascending=False)

Intermediate result:

   key  payload  week  rank
0    A    71.02     5   4.0
1    B    70.93     5   4.0
2    B    71.16     5   4.0
3    E    71.77     5   4.0
4    B    69.66     6   3.0
5    F    68.67     6   4.0
6    B    72.45     7   2.0
7    C    69.91     7   4.0
8    D    68.22     7   2.0
9    F    63.73     7   3.0
10   G    71.70     7   1.0
11   B    69.86     8   1.0
12   C    64.04     8   3.0
13   E    72.00     8   3.0
14   A    70.33     9   3.0
15   A    71.70    10   2.0

Toprank:

s1=df[df['rank']==1][['key','payload']]
s1.index=s1['key']
s1.columns=['key','last']

Second rank:

s2=df[df['rank']==2][['key','payload']]
s2.index=s2['key']
s2.drop('key',axis=1,inplace=True)  # to avoid it appearing twice in final result
s2.columns=['penultimate']

Join to get the final result:

rs=pd.concat( [s1,s2], axis=1, join='inner')
rs.sort_values(by='key')

Final result:

    key   last  penultimate
key                        
A     A  71.11        71.70
B     B  69.86        72.45
C     C  70.02        66.41
D     D  68.81        68.22
E     E  69.30        62.96
F     F  66.00        70.97
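
An alternative pandas sketch: sort each key by week descending, take the first and second payload per group with nth, and drop keys that only have a single entry (top2 and alt are just illustrative names):

top2=df.sort_values(['key','week'], ascending=[True,False]).groupby('key')['payload']
alt=pd.DataFrame({'last': top2.nth(0), 'penultimate': top2.nth(1)}).dropna()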

The R solution shown here uses the dplyr package.

Dataframe creation

df<-read.table(text= gsub( '@','\n', paste0(
    "'week','key','payload'@5,'A',71.02@5,'B',70.93@5,'B',71.16@5,'E',71.77@6,'B',69.66@",
    "6,'F',68.67@7,'B',72.45@7,'C',69.91@7,'D',68.22@7,'F',63.73@7,'G',71.7@8,'B',69.86@",
    "8,'C',64.04@8,'E',72@9,'A',70.33@10,'A',71.7@10,'C',66.41@10,'E',62.96@11,'A',71.11@",
    "11,'C',70.02@11,'E',69.3@11,'F',70.97@12,'D',68.81@12,'F',66@")),sep=",",header=T)

Solution

library(dplyr)

dfr=df %>% group_by(key) %>% mutate(rank = min_rank(desc(week)))

s1=dfr[ dfr$rank==1,]
s2=dfr[ dfr$rank==2,]
res=merge (s1,s2, by.x='key',by.y='key')[,c('key','payload.x','payload.y')]
names(res)=c('key','last','penultimate') 

Result:

  key  last penultimate
1   A 71.11       71.70
2   B 69.86       72.45
3   C 70.02       66.41
4   D 68.81       68.22
5   E 69.30       62.96
6   F 66.00       70.97
98_outtake
20160601

OUTTAKE

This was first filed under '02a create data', back when I was convinced that it was not possible to create Spark objects from columns of values; it is, however, possible to create them from rows of values.

This section is just kept for 'historical reference'...

Create dataframe from arrays

Here's how to create dataframes udf and tdf from CSV files (and not from arrays, as shown in section 2a).

Unfortunately there is no way to create Spark data objects (RDDs, dataframes) from arrays (it is, however, possible to create Spark objects from sequences of rows!).

The easiest way is to load them from CSV files. So load the arrays into Python, as shown in the Python tab, and then save them to file like this:

udf.to_csv('udf.csv',header=False, index=False,encoding='utf-8')
tdf.to_csv('tdf.csv',header=False, index=False,encoding='utf-8')

Then start up the Spark shell and run this code to load the RDDs:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.log4j.Logger

import java.time.LocalDate
import java.time.format.DateTimeFormatter

case class Udf( uid:Int, name:String)
case class Tdf( xid:Int, uid:Int, amount:Double, date:java.sql.Timestamp)

val dtfmt0= new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

def parse_udf(line:String) = {
    val _field=line.split(',')
    val uid = if (_field(0).isEmpty) 0 else _field(0).toInt
    val name = if (_field(1).isEmpty) "" else _field(1)
    Udf ( uid, name)
}

def parse_tdf(line:String) = {
    val _field=line.split(',')
    val xid = if (_field(0).isEmpty) 0 else _field(0).toInt
    val uid = if (_field(1).isEmpty) 0 else _field(1).toInt
    val amount = if (_field(2).isEmpty) 0.0 else _field(2).toDouble
    val date = if (_field(3).isEmpty) new java.sql.Timestamp(0l) else new java.sql.Timestamp(dtfmt0.parse(_field(3)).getTime())
    Tdf ( xid, uid, amount, date)
}

// udf ---------------------------------------------
val udf_rdd=sc.textFile("file:///home/dmn/20160613_framed_data/udf.csv").
    map(parse_udf(_))

// tdf ---------------------------------------------
val tdf_rdd=sc.textFile("file:///home/dmn/20160613_framed_data/tdf.csv").
    map(parse_tdf(_))

Same remark as for the Spark RDD: unfortunately there is no way to create Spark data objects from arrays (it is, however, possible to create Spark objects from sequences of rows!). The easiest way is to load them from CSV files. So load the arrays into Python, as shown in the Python tab, and then save them to file like this:

udf.to_csv('udf.csv',header=False, index=False,encoding='utf-8')
tdf.to_csv('tdf.csv',header=False, index=False,encoding='utf-8')

Run the following in the Spark shell:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.log4j.Logger

import java.time.LocalDate
import java.time.format.DateTimeFormatter

import org.apache.spark.sql.Row
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val udf_schema = StructType(
    StructField("uid",IntegerType,false)  ::
    StructField("name",StringType,false) :: Nil
    )

val tdf_schema = StructType(
    StructField("xid",IntegerType,false)  ::
    StructField("uid",IntegerType,false)  ::
    StructField("amount",DoubleType,false)  ::
    StructField("date",TimestampType,false) :: Nil
    )

val dtfmt0= new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

def parse_udf(line:String) = {
    val _field=line.split(',')
    val uid = if (_field(0).isEmpty) 0 else _field(0).toInt
    val name = if (_field(1).isEmpty) "" else _field(1)
    Row ( uid, name) 
}

def parse_tdf(line:String) = {
    val _field=line.split(',')
    val xid = if (_field(0).isEmpty) 0 else _field(0).toInt
    val uid = if (_field(1).isEmpty) 0 else _field(1).toInt
    val amount = if (_field(2).isEmpty) 0.0 else _field(2).toDouble
    val date = if (_field(3).isEmpty) new java.sql.Timestamp(0l) 
               else new java.sql.Timestamp(dtfmt0.parse(_field(3)).getTime())
    Row ( xid, uid, amount, date)
}


val sx= new org.apache.spark.sql.SQLContext(sc)

// udf ---------------------------------------------
val in_udf_rdd=sc.textFile("file:///home/dmn/20160613_framed_data/udf.csv").
    map(parse_udf(_))
val udf_df = sx.createDataFrame(in_udf_rdd, udf_schema)
udf_df.registerTempTable("t_udf")


// tdf ---------------------------------------------
val in_tdf_rdd=sc.textFile("file:///home/dmn/20160613_framed_data/tdf.csv").
    map(parse_tdf(_))
val tdf_df = sx.createDataFrame(in_tdf_rdd, tdf_schema)
tdf_df.registerTempTable("t_tdf")

Et voila: the dataframes udf_df and tdf_df and the relational tables t_udf and t_tdf.

..

Sidenote: create fake data

The above user and transaction data was created using the following script, which employs the fake-factory package ('pip install fake-factory') to generate random data.

The python code: 

from faker import Factory
import pandas as pd
import random

fake = Factory.create('sv_SE')

# create user data
id_v=[]
name_v=[]
for i in range(0,9):
    id_v.append(9000+i)       # the user ids, referenced below as id_v
    name_v.append(fake.name())

# create transaction data
xid_v=[]
uid_v=[]
amount_v=[]
date_v=[]
sign=[-1,1]
for i in range(0,21):
    xid_v.append(5000+i)
    amount_v.append(sign[random.randint(0,1)]*random.randint(80,900))
    uid_v.append(id_v[random.randint(0,len(id_v)-1)])
    date_v.append(str(fake.date_time_this_year()))
99_shorties
20160609

Shorties

Environment options

Options for the environment.

  • list all options: pd.describe_option()
  • set display width: pd.set_option('display.width', 200)
  • set max column width: pd.set_option('display.max_colwidth', 100) (default=50)
  • run a script within IPython's namespace: %run -i yourscript.py

Load a script in the spark-shell

Within spark shell:

:load yourcode.scala

On the CLI:

spark-shell -i yourcode.scala

or:

cat yourcode.scala | spark-shell

History

:history 
 
Notes by Data Munging Ninja. Generated on nini:sync/20151223_datamungingninja/frameddata at 2016-10-18 07:18