Framed Data
 
05_join
20160601

Join data, SQL style

Data entities used: user and transaction (see section 2a, data create).

merge()

merge(udf, tdf, by.x="id", by.y="uid")

     id                name  xid amount                date
1  9000    Gerd Abrahamsson 5016    -95 2016-05-09 10:26:05
2  9001     Hanna Andersson 5015   -404 2016-02-20 22:08:23
3  9001     Hanna Andersson 5019   -462 2016-03-09 20:32:35
4  9002     August Bergsten 5006   -153 2016-02-18 17:58:33
5  9002     August Bergsten 5018   -549 2016-04-15 21:44:49
6  9003        Arvid Bohlin 5001    268 2016-01-17 13:37:38
7  9003        Arvid Bohlin 5017    428 2016-03-27 15:30:47
8  9003        Arvid Bohlin 5002    621 2016-02-24 15:36:53
9  9004     Edvard Marklund 5004    720 2016-05-14 16:29:54
10 9004     Edvard Marklund 5020   -339 2016-05-03 17:11:21
11 9005 Ragnhild Brännström 5013   -114 2016-02-06 14:55:10
12 9005 Ragnhild Brännström 5008   -250 2016-02-24 23:14:52
13 9005 Ragnhild Brännström 5014    819 2016-01-18 10:50:20
14 9006        Börje Wallin 5010    549 2016-02-16 14:37:25
15 9007        Otto Byström 5003   -401 2016-01-14 16:43:27
16 9007        Otto Byström 5005   -492 2016-02-24 23:58:57
17 9008     Elise Dahlström 5007    272 2016-05-26 12:00:00
18 9008     Elise Dahlström 5000    498 2016-02-21 06:28:49
19 9008     Elise Dahlström 5009     82 2016-04-20 18:33:25
20 9008     Elise Dahlström 5011   -571 2016-02-28 13:05:33
21 9008     Elise Dahlström 5012    814 2016-03-20 13:29:11

pd.merge()

pd.merge( tdf, udf, how='inner', left_on='uid', right_on='uid')

     xid   uid  amount                 date                 name
0   5000  9008     498  2016-02-21 06:28:49      Elise Dahlström
1   5007  9008     272  2016-05-26 12:00:00      Elise Dahlström
2   5009  9008      82  2016-04-20 18:33:25      Elise Dahlström
3   5011  9008    -571  2016-02-28 13:05:33      Elise Dahlström
4   5012  9008     814  2016-03-20 13:29:11      Elise Dahlström
5   5001  9003     268  2016-01-17 13:37:38         Arvid Bohlin
6   5002  9003     621  2016-02-24 15:36:53         Arvid Bohlin
7   5017  9003     428  2016-03-27 15:30:47         Arvid Bohlin
8   5003  9007    -401  2016-01-14 16:43:27         Otto Byström
9   5005  9007    -492  2016-02-24 23:58:57         Otto Byström
10  5004  9004     720  2016-05-14 16:29:54      Edvard Marklund
11  5020  9004    -339  2016-05-03 17:11:21      Edvard Marklund
12  5006  9002    -153  2016-02-18 17:58:33      August Bergsten
13  5018  9002    -549  2016-04-15 21:44:49      August Bergsten
14  5008  9005    -250  2016-02-24 23:14:52  Ragnhild Brännström
15  5013  9005    -114  2016-02-06 14:55:10  Ragnhild Brännström
16  5014  9005     819  2016-01-18 10:50:20  Ragnhild Brännström
17  5010  9006     549  2016-02-16 14:37:25         Börje Wallin
18  5015  9001    -404  2016-02-20 22:08:23      Hanna Andersson
19  5019  9001    -462  2016-03-09 20:32:35      Hanna Andersson
20  5016  9000     -95  2016-05-09 10:26:05     Gerd Abrahamsson
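
Since the key column is named uid on both sides, pd.merge also accepts on='uid' in place of the left_on/right_on pair. A minimal sketch, using hypothetical two-row mini-frames in place of the full udf and tdf:

```python
import pandas as pd

# Hypothetical mini-frames standing in for udf and tdf.
udf = pd.DataFrame({'uid': [9000, 9001],
                    'name': ['Gerd Abrahamsson', 'Hanna Andersson']})
tdf = pd.DataFrame({'xid': [5016, 5015],
                    'uid': [9000, 9001],
                    'amount': [-95, -404]})

# on='uid' joins on the shared column name and keeps a single uid column.
jn = pd.merge(tdf, udf, how='inner', on='uid')
```

With left_on/right_on the result also has one uid column here, but only because both names happen to be 'uid'; on= makes that intent explicit.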

SQL join

dmn=> select t.xid, t.amount, t.date, t.uid, u.name 
      from t_transaction t join t_user u 
      on t.uid=u.uid;

 xid  | amount |        date         | uid  |        name         
------+--------+---------------------+------+---------------------
 5000 |    498 | 2016-02-21 06:28:49 | 9008 | Elise Dahlström
 5001 |    268 | 2016-01-17 13:37:38 | 9003 | Arvid Bohlin
 5002 |    621 | 2016-02-24 15:36:53 | 9003 | Arvid Bohlin
 5003 |   -401 | 2016-01-14 16:43:27 | 9007 | Otto Byström
 5004 |    720 | 2016-05-14 16:29:54 | 9004 | Edvard Marklund
 5005 |   -492 | 2016-02-24 23:58:57 | 9007 | Otto Byström
 5006 |   -153 | 2016-02-18 17:58:33 | 9002 | August Bergsten
 5007 |    272 | 2016-05-26 12:00:00 | 9008 | Elise Dahlström
 5008 |   -250 | 2016-02-24 23:14:52 | 9005 | Ragnhild Brännström
 5009 |     82 | 2016-04-20 18:33:25 | 9008 | Elise Dahlström
 5010 |    549 | 2016-02-16 14:37:25 | 9006 | Börje Wallin
 5011 |   -571 | 2016-02-28 13:05:33 | 9008 | Elise Dahlström
 5012 |    814 | 2016-03-20 13:29:11 | 9008 | Elise Dahlström
 5013 |   -114 | 2016-02-06 14:55:10 | 9005 | Ragnhild Brännström
 5014 |    819 | 2016-01-18 10:50:20 | 9005 | Ragnhild Brännström
 5015 |   -404 | 2016-02-20 22:08:23 | 9001 | Hanna Andersson
 5016 |    -95 | 2016-05-09 10:26:05 | 9000 | Gerd Abrahamsson
 5017 |    428 | 2016-03-27 15:30:47 | 9003 | Arvid Bohlin
 5018 |   -549 | 2016-04-15 21:44:49 | 9002 | August Bergsten
 5019 |   -462 | 2016-03-09 20:32:35 | 9001 | Hanna Andersson
 5020 |   -339 | 2016-05-03 17:11:21 | 9004 | Edvard Marklund

Join: RDD.cartesian().filter()

(see section 2a on how to create the RDDs: udf_rdd and tdf_rdd)

Create the Cartesian product of the two RDDs, then filter it down to the pairs whose uids match:

val jn_rdd=udf_rdd.cartesian(tdf_rdd).filter( r => r._1.uid==r._2.uid )

This creates an RDD of this type:

jn_rdd: org.apache.spark.rdd.RDD[(Udf, Tdf)] 

Result

jn_rdd.collect().foreach(println)

(Udf(9002,August Bergsten),Tdf(5006,9002,-153.0,2016-02-18 17:58:33.0))
(Udf(9003,Arvid Bohlin),Tdf(5001,9003,268.0,2016-01-17 13:37:38.0))
(Udf(9003,Arvid Bohlin),Tdf(5002,9003,621.0,2016-02-24 15:36:53.0))
(Udf(9004,Edvard Marklund),Tdf(5004,9004,720.0,2016-05-14 16:29:54.0))
(Udf(9000,Gerd Abrahamsson),Tdf(5016,9000,-95.0,2016-05-09 10:26:05.0))
(Udf(9001,Hanna Andersson),Tdf(5015,9001,-404.0,2016-02-20 22:08:23.0))
(Udf(9001,Hanna Andersson),Tdf(5019,9001,-462.0,2016-03-09 20:32:35.0))
(Udf(9002,August Bergsten),Tdf(5018,9002,-549.0,2016-04-15 21:44:49.0))
(Udf(9003,Arvid Bohlin),Tdf(5017,9003,428.0,2016-03-27 15:30:47.0))
(Udf(9004,Edvard Marklund),Tdf(5020,9004,-339.0,2016-05-03 17:11:21.0))
(Udf(9005,Ragnhild Brännström),Tdf(5008,9005,-250.0,2016-02-24 23:14:52.0))
(Udf(9006,Börje Wallin),Tdf(5010,9006,549.0,2016-02-16 14:37:25.0))
(Udf(9007,Otto Byström),Tdf(5003,9007,-401.0,2016-01-14 16:43:27.0))
(Udf(9007,Otto Byström),Tdf(5005,9007,-492.0,2016-02-24 23:58:57.0))
(Udf(9008,Elise Dahlström),Tdf(5000,9008,498.0,2016-02-21 06:28:49.0))
(Udf(9008,Elise Dahlström),Tdf(5007,9008,272.0,2016-05-26 12:00:00.0))
(Udf(9008,Elise Dahlström),Tdf(5009,9008,82.0,2016-04-20 18:33:25.0))
(Udf(9005,Ragnhild Brännström),Tdf(5013,9005,-114.0,2016-02-06 14:55:10.0))
(Udf(9005,Ragnhild Brännström),Tdf(5014,9005,819.0,2016-01-18 10:50:20.0))
(Udf(9008,Elise Dahlström),Tdf(5011,9008,-571.0,2016-02-28 13:05:33.0))
(Udf(9008,Elise Dahlström),Tdf(5012,9008,814.0,2016-03-20 13:29:11.0))
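
The cartesian-then-filter idea can be sketched in plain Python, with hypothetical tuples standing in for the Udf and Tdf case classes:

```python
from itertools import product

# Hypothetical rows: (uid, name) for users, (xid, uid, amount) for transactions.
udf_rows = [(9000, 'Gerd Abrahamsson'), (9003, 'Arvid Bohlin')]
tdf_rows = [(5016, 9000, -95.0), (5001, 9003, 268.0), (5002, 9003, 621.0)]

# Full cartesian product (2 x 3 = 6 pairs), then keep matching uids only.
jn = [(u, t) for u, t in product(udf_rows, tdf_rows) if u[0] == t[1]]
```

Note that this materializes all n*m pairs before filtering; it is fine for small data, but at scale RDD's built-in join avoids building the full product.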

Dataframe.join()

val jn_df=udf_df.join( tdf_df, udf_df("uid")===tdf_df("uid") )

Type:

org.apache.spark.sql.DataFrame = 
    [uid: int, name: string, xid: int, uid: int, amount: double, date: timestamp]

Note the duplicate uid column! Joining on the column name instead, e.g. udf_df.join(tdf_df, "uid"), would keep a single uid column.
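
For contrast, pandas resolves this kind of collision by suffixing rather than duplicating: with on='uid' the key appears once, and overlapping non-key column names get _x/_y suffixes. A minimal sketch with hypothetical frames that clash on a non-key column:

```python
import pandas as pd

# Hypothetical frames sharing both the key 'uid' and a non-key column 'name'.
left = pd.DataFrame({'uid': [9000], 'name': ['Gerd Abrahamsson']})
right = pd.DataFrame({'uid': [9000], 'name': ['G. Abrahamsson'], 'amount': [-95.0]})

jn = pd.merge(left, right, on='uid')
# the key appears once; the clashing 'name' columns become name_x and name_y
```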

Content of the dataframe:

jn_df.collect().foreach(println)

[9000,Gerd Abrahamsson,5016,9000,-95.0,2016-05-09 10:26:05.0]                   
[9001,Hanna Andersson,5015,9001,-404.0,2016-02-20 22:08:23.0]
[9001,Hanna Andersson,5019,9001,-462.0,2016-03-09 20:32:35.0]
[9002,August Bergsten,5006,9002,-153.0,2016-02-18 17:58:33.0]
[9002,August Bergsten,5018,9002,-549.0,2016-04-15 21:44:49.0]
[9003,Arvid Bohlin,5001,9003,268.0,2016-01-17 13:37:38.0]
[9003,Arvid Bohlin,5002,9003,621.0,2016-02-24 15:36:53.0]
[9003,Arvid Bohlin,5017,9003,428.0,2016-03-27 15:30:47.0]
[9004,Edvard Marklund,5004,9004,720.0,2016-05-14 16:29:54.0]
[9004,Edvard Marklund,5020,9004,-339.0,2016-05-03 17:11:21.0]
[9005,Ragnhild Brännström,5008,9005,-250.0,2016-02-24 23:14:52.0]
[9005,Ragnhild Brännström,5013,9005,-114.0,2016-02-06 14:55:10.0]
[9005,Ragnhild Brännström,5014,9005,819.0,2016-01-18 10:50:20.0]
[9006,Börje Wallin,5010,9006,549.0,2016-02-16 14:37:25.0]
[9007,Otto Byström,5003,9007,-401.0,2016-01-14 16:43:27.0]
[9007,Otto Byström,5005,9007,-492.0,2016-02-24 23:58:57.0]
[9008,Elise Dahlström,5000,9008,498.0,2016-02-21 06:28:49.0]
[9008,Elise Dahlström,5007,9008,272.0,2016-05-26 12:00:00.0]
[9008,Elise Dahlström,5009,9008,82.0,2016-04-20 18:33:25.0]
[9008,Elise Dahlström,5011,9008,-571.0,2016-02-28 13:05:33.0]
[9008,Elise Dahlström,5012,9008,814.0,2016-03-20 13:29:11.0]

Spark SQL

Very similar to the plain SQL query above:

val rs= sx.sql("""
      select t.xid, t.amount, t.date, t.uid, u.name 
      from t_tdf t join t_udf u 
      on t.uid=u.uid
      """)

Type:

org.apache.spark.sql.DataFrame = 
    [xid: int, amount: double, date: timestamp, uid: int, name: string]

Content:

rs.collect().foreach(println)

[5016,-95.0,2016-05-09 10:26:05.0,9000,Gerd Abrahamsson]
[5015,-404.0,2016-02-20 22:08:23.0,9001,Hanna Andersson]
[5019,-462.0,2016-03-09 20:32:35.0,9001,Hanna Andersson]
[5006,-153.0,2016-02-18 17:58:33.0,9002,August Bergsten]
[5018,-549.0,2016-04-15 21:44:49.0,9002,August Bergsten]
[5001,268.0,2016-01-17 13:37:38.0,9003,Arvid Bohlin]
[5002,621.0,2016-02-24 15:36:53.0,9003,Arvid Bohlin]
[5017,428.0,2016-03-27 15:30:47.0,9003,Arvid Bohlin]
[5004,720.0,2016-05-14 16:29:54.0,9004,Edvard Marklund]
[5020,-339.0,2016-05-03 17:11:21.0,9004,Edvard Marklund]
[5008,-250.0,2016-02-24 23:14:52.0,9005,Ragnhild Brännström]
[5013,-114.0,2016-02-06 14:55:10.0,9005,Ragnhild Brännström]
[5014,819.0,2016-01-18 10:50:20.0,9005,Ragnhild Brännström]
[5010,549.0,2016-02-16 14:37:25.0,9006,Börje Wallin]
[5003,-401.0,2016-01-14 16:43:27.0,9007,Otto Byström]
[5005,-492.0,2016-02-24 23:58:57.0,9007,Otto Byström]
[5000,498.0,2016-02-21 06:28:49.0,9008,Elise Dahlström]
[5007,272.0,2016-05-26 12:00:00.0,9008,Elise Dahlström]
[5009,82.0,2016-04-20 18:33:25.0,9008,Elise Dahlström]
[5011,-571.0,2016-02-28 13:05:33.0,9008,Elise Dahlström]
[5012,814.0,2016-03-20 13:29:11.0,9008,Elise Dahlström]


Aggregate with date range

Join the user and transaction entities, sum the transaction amounts per user for February 2016, and order the output by amount.

R:

# merge
jdf=merge(udf,tdf, by.x="uid", by.y="uid")

# range of dates
startdate<-as.POSIXct("2016-02-01")
enddate<-as.POSIXct("2016-02-29")

# subset 
sb<-jdf[jdf$date>=startdate & jdf$date<=enddate,c('name','amount')]

# aggregate
ag<-aggregate(sb$amount, by=list(group=sb$name),sum)
ag[order(ag$x),]

Result:

                group    x
6        Otto Byström -492
5     Hanna Andersson -404
7 Ragnhild Brännström -364
2     August Bergsten -153
4     Elise Dahlström  -73
3        Börje Wallin  549
1        Arvid Bohlin  621
Python:

from datetime import datetime

# join
jdf=pd.merge( tdf, udf, how='inner', left_on='uid', right_on='uid')

# date range
startdate=datetime.strptime('2016-02-01','%Y-%m-%d')
enddate=datetime.strptime('2016-02-29','%Y-%m-%d')

# aggregate
ag=jdf[(jdf.date >= startdate) & (jdf.date <= enddate)][['name','amount']].groupby('name').sum()

# sort by amount  
ag.sort_values(by='amount') 

Result:

name                 amount
Otto Byström           -492
Hanna Andersson        -404
Ragnhild Brännström    -364
August Bergsten        -153
Elise Dahlström         -73
Börje Wallin            549
Arvid Bohlin            621

SQL query:

select u.name,sum(t.amount) as sum
from t_transaction t join t_user u on t.uid=u.uid
where t.date between '2016-02-01' and '2016-02-29'
group by u.name
order by sum;

Result:

        name         | sum  
---------------------+------
 Otto Byström        | -492
 Hanna Andersson     | -404
 Ragnhild Brännström | -364
 August Bergsten     | -153
 Elise Dahlström     |  -73
 Börje Wallin        |  549
 Arvid Bohlin        |  621
(7 rows)

Spark RDD join:

val jn_rdd=udf_rdd.cartesian(tdf_rdd).filter( r => r._1.uid==r._2.uid )

Date-range:

val fmt=new java.text.SimpleDateFormat("yyyy-MM-dd") 
val t1=new java.sql.Timestamp( fmt.parse("2016-02-01").getTime()) 
val t2=new java.sql.Timestamp( fmt.parse("2016-02-29").getTime()) 

Aggregate:

jn_rdd.filter( r=> ( r._2.date.after(t1) && r._2.date.before(t2)) ).
       map( r => ( r._1.name, r._2.amount) ).
       reduceByKey( (_+_) ).
       sortBy( _._2 ).collect().foreach(println) 

Result:

(Otto Byström,-492.0)
(Hanna Andersson,-404.0)
(Ragnhild Brännström,-364.0)
(August Bergsten,-153.0)
(Elise Dahlström,-73.0)
(Börje Wallin,549.0)
(Arvid Bohlin,621.0)
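
The reduceByKey/sortBy pipeline amounts to a group-sum followed by a sort on the summed value. A plain-Python sketch, with hypothetical (name, amount) pairs standing in for the output of the map() step:

```python
# Hypothetical (name, amount) pairs, as produced by the map() step.
pairs = [('Otto Byström', -401.0), ('Otto Byström', -492.0),
         ('Arvid Bohlin', 268.0), ('Arvid Bohlin', 621.0)]

# reduceByKey( (_+_) ): sum the amounts per name.
sums = {}
for name, amount in pairs:
    sums[name] = sums.get(name, 0.0) + amount

# sortBy( _._2 ): order the (name, total) pairs by the summed amount.
result = sorted(sums.items(), key=lambda kv: kv[1])
```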
 
Notes by Data Munging Ninja. Generated on nini:sync/20151223_datamungingninja/frameddata at 2016-10-18 07:18