Join data, SQL style
Data entities used: user and transaction (see section 2a, data creation).
merge()
merge(udf,tdf, by.x="id", by.y="uid")
id name xid amount date
1 9000 Gerd Abrahamsson 5016 -95 2016-05-09 10:26:05
2 9001 Hanna Andersson 5015 -404 2016-02-20 22:08:23
3 9001 Hanna Andersson 5019 -462 2016-03-09 20:32:35
4 9002 August Bergsten 5006 -153 2016-02-18 17:58:33
5 9002 August Bergsten 5018 -549 2016-04-15 21:44:49
6 9003 Arvid Bohlin 5001 268 2016-01-17 13:37:38
7 9003 Arvid Bohlin 5017 428 2016-03-27 15:30:47
8 9003 Arvid Bohlin 5002 621 2016-02-24 15:36:53
9 9004 Edvard Marklund 5004 720 2016-05-14 16:29:54
10 9004 Edvard Marklund 5020 -339 2016-05-03 17:11:21
11 9005 Ragnhild Brännström 5013 -114 2016-02-06 14:55:10
12 9005 Ragnhild Brännström 5008 -250 2016-02-24 23:14:52
13 9005 Ragnhild Brännström 5014 819 2016-01-18 10:50:20
14 9006 Börje Wallin 5010 549 2016-02-16 14:37:25
15 9007 Otto Byström 5003 -401 2016-01-14 16:43:27
16 9007 Otto Byström 5005 -492 2016-02-24 23:58:57
17 9008 Elise Dahlström 5007 272 2016-05-26 12:00:00
18 9008 Elise Dahlström 5000 498 2016-02-21 06:28:49
19 9008 Elise Dahlström 5009 82 2016-04-20 18:33:25
20 9008 Elise Dahlström 5011 -571 2016-02-28 13:05:33
21 9008 Elise Dahlström 5012 814 2016-03-20 13:29:11
pd.merge()
pd.merge( tdf, udf, how='inner', left_on='uid', right_on='uid')
xid uid amount date name
0 5000 9008 498 2016-02-21 06:28:49 Elise Dahlström
1 5007 9008 272 2016-05-26 12:00:00 Elise Dahlström
2 5009 9008 82 2016-04-20 18:33:25 Elise Dahlström
3 5011 9008 -571 2016-02-28 13:05:33 Elise Dahlström
4 5012 9008 814 2016-03-20 13:29:11 Elise Dahlström
5 5001 9003 268 2016-01-17 13:37:38 Arvid Bohlin
6 5002 9003 621 2016-02-24 15:36:53 Arvid Bohlin
7 5017 9003 428 2016-03-27 15:30:47 Arvid Bohlin
8 5003 9007 -401 2016-01-14 16:43:27 Otto Byström
9 5005 9007 -492 2016-02-24 23:58:57 Otto Byström
10 5004 9004 720 2016-05-14 16:29:54 Edvard Marklund
11 5020 9004 -339 2016-05-03 17:11:21 Edvard Marklund
12 5006 9002 -153 2016-02-18 17:58:33 August Bergsten
13 5018 9002 -549 2016-04-15 21:44:49 August Bergsten
14 5008 9005 -250 2016-02-24 23:14:52 Ragnhild Brännström
15 5013 9005 -114 2016-02-06 14:55:10 Ragnhild Brännström
16 5014 9005 819 2016-01-18 10:50:20 Ragnhild Brännström
17 5010 9006 549 2016-02-16 14:37:25 Börje Wallin
18 5015 9001 -404 2016-02-20 22:08:23 Hanna Andersson
19 5019 9001 -462 2016-03-09 20:32:35 Hanna Andersson
20 5016 9000 -95 2016-05-09 10:26:05 Gerd Abrahamsson
SQL join
dmn=> select t.xid, t.amount, t.date, t.uid, u.name
from t_transaction t join t_user u
on t.uid=u.uid;
xid | amount | date | uid | name
------+--------+---------------------+------+---------------------
5000 | 498 | 2016-02-21 06:28:49 | 9008 | Elise Dahlström
5001 | 268 | 2016-01-17 13:37:38 | 9003 | Arvid Bohlin
5002 | 621 | 2016-02-24 15:36:53 | 9003 | Arvid Bohlin
5003 | -401 | 2016-01-14 16:43:27 | 9007 | Otto Byström
5004 | 720 | 2016-05-14 16:29:54 | 9004 | Edvard Marklund
5005 | -492 | 2016-02-24 23:58:57 | 9007 | Otto Byström
5006 | -153 | 2016-02-18 17:58:33 | 9002 | August Bergsten
5007 | 272 | 2016-05-26 12:00:00 | 9008 | Elise Dahlström
5008 | -250 | 2016-02-24 23:14:52 | 9005 | Ragnhild Brännström
5009 | 82 | 2016-04-20 18:33:25 | 9008 | Elise Dahlström
5010 | 549 | 2016-02-16 14:37:25 | 9006 | Börje Wallin
5011 | -571 | 2016-02-28 13:05:33 | 9008 | Elise Dahlström
5012 | 814 | 2016-03-20 13:29:11 | 9008 | Elise Dahlström
5013 | -114 | 2016-02-06 14:55:10 | 9005 | Ragnhild Brännström
5014 | 819 | 2016-01-18 10:50:20 | 9005 | Ragnhild Brännström
5015 | -404 | 2016-02-20 22:08:23 | 9001 | Hanna Andersson
5016 | -95 | 2016-05-09 10:26:05 | 9000 | Gerd Abrahamsson
5017 | 428 | 2016-03-27 15:30:47 | 9003 | Arvid Bohlin
5018 | -549 | 2016-04-15 21:44:49 | 9002 | August Bergsten
5019 | -462 | 2016-03-09 20:32:35 | 9001 | Hanna Andersson
5020 | -339 | 2016-05-03 17:11:21 | 9004 | Edvard Marklund
Join: RDD.cartesian().filter()
(see section 2a on how to create the RDDs: udf_rdd and tdf_rdd)
Create the Cartesian product of the two RDDs, then keep only the pairs whose uid values match:
val jn_rdd=udf_rdd.cartesian(tdf_rdd).filter( r => r._1.uid==r._2.uid )
This creates an RDD of this type:
jn_rdd: org.apache.spark.rdd.RDD[(Udf, Tdf)]
Result
jn_rdd.collect().foreach(println)
(Udf(9002,August Bergsten),Tdf(5006,9002,-153.0,2016-02-18 17:58:33.0))
(Udf(9003,Arvid Bohlin),Tdf(5001,9003,268.0,2016-01-17 13:37:38.0))
(Udf(9003,Arvid Bohlin),Tdf(5002,9003,621.0,2016-02-24 15:36:53.0))
(Udf(9004,Edvard Marklund),Tdf(5004,9004,720.0,2016-05-14 16:29:54.0))
(Udf(9000,Gerd Abrahamsson),Tdf(5016,9000,-95.0,2016-05-09 10:26:05.0))
(Udf(9001,Hanna Andersson),Tdf(5015,9001,-404.0,2016-02-20 22:08:23.0))
(Udf(9001,Hanna Andersson),Tdf(5019,9001,-462.0,2016-03-09 20:32:35.0))
(Udf(9002,August Bergsten),Tdf(5018,9002,-549.0,2016-04-15 21:44:49.0))
(Udf(9003,Arvid Bohlin),Tdf(5017,9003,428.0,2016-03-27 15:30:47.0))
(Udf(9004,Edvard Marklund),Tdf(5020,9004,-339.0,2016-05-03 17:11:21.0))
(Udf(9005,Ragnhild Brännström),Tdf(5008,9005,-250.0,2016-02-24 23:14:52.0))
(Udf(9006,Börje Wallin),Tdf(5010,9006,549.0,2016-02-16 14:37:25.0))
(Udf(9007,Otto Byström),Tdf(5003,9007,-401.0,2016-01-14 16:43:27.0))
(Udf(9007,Otto Byström),Tdf(5005,9007,-492.0,2016-02-24 23:58:57.0))
(Udf(9008,Elise Dahlström),Tdf(5000,9008,498.0,2016-02-21 06:28:49.0))
(Udf(9008,Elise Dahlström),Tdf(5007,9008,272.0,2016-05-26 12:00:00.0))
(Udf(9008,Elise Dahlström),Tdf(5009,9008,82.0,2016-04-20 18:33:25.0))
(Udf(9005,Ragnhild Brännström),Tdf(5013,9005,-114.0,2016-02-06 14:55:10.0))
(Udf(9005,Ragnhild Brännström),Tdf(5014,9005,819.0,2016-01-18 10:50:20.0))
(Udf(9008,Elise Dahlström),Tdf(5011,9008,-571.0,2016-02-28 13:05:33.0))
(Udf(9008,Elise Dahlström),Tdf(5012,9008,814.0,2016-03-20 13:29:11.0))
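The Cartesian product grows with the product of the two RDD sizes, so a more scalable alternative is to key both RDDs by uid and use the pair-RDD join. A minimal sketch, assuming the same udf_rdd and tdf_rdd (jn2_rdd is just an illustrative name):
// key both RDDs by uid, join, and drop the key again
val jn2_rdd=udf_rdd.keyBy(_.uid).join(tdf_rdd.keyBy(_.uid)).values
This again yields an RDD[(Udf, Tdf)], without building every user/transaction pair first.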
DataFrame.join()
val jn_df=udf_df.join( tdf_df, udf_df("uid")===tdf_df("uid") )
Type:
org.apache.spark.sql.DataFrame =
[uid: int, name: string, xid: int, uid: int, amount: double, date: timestamp]
Note the duplicate uid column! (A way to avoid it is sketched after the listing below.)
Content of the dataframe:
jn_df.collect().foreach(println)
[9000,Gerd Abrahamsson,5016,9000,-95.0,2016-05-09 10:26:05.0]
[9001,Hanna Andersson,5015,9001,-404.0,2016-02-20 22:08:23.0]
[9001,Hanna Andersson,5019,9001,-462.0,2016-03-09 20:32:35.0]
[9002,August Bergsten,5006,9002,-153.0,2016-02-18 17:58:33.0]
[9002,August Bergsten,5018,9002,-549.0,2016-04-15 21:44:49.0]
[9003,Arvid Bohlin,5001,9003,268.0,2016-01-17 13:37:38.0]
[9003,Arvid Bohlin,5002,9003,621.0,2016-02-24 15:36:53.0]
[9003,Arvid Bohlin,5017,9003,428.0,2016-03-27 15:30:47.0]
[9004,Edvard Marklund,5004,9004,720.0,2016-05-14 16:29:54.0]
[9004,Edvard Marklund,5020,9004,-339.0,2016-05-03 17:11:21.0]
[9005,Ragnhild Brännström,5008,9005,-250.0,2016-02-24 23:14:52.0]
[9005,Ragnhild Brännström,5013,9005,-114.0,2016-02-06 14:55:10.0]
[9005,Ragnhild Brännström,5014,9005,819.0,2016-01-18 10:50:20.0]
[9006,Börje Wallin,5010,9006,549.0,2016-02-16 14:37:25.0]
[9007,Otto Byström,5003,9007,-401.0,2016-01-14 16:43:27.0]
[9007,Otto Byström,5005,9007,-492.0,2016-02-24 23:58:57.0]
[9008,Elise Dahlström,5000,9008,498.0,2016-02-21 06:28:49.0]
[9008,Elise Dahlström,5007,9008,272.0,2016-05-26 12:00:00.0]
[9008,Elise Dahlström,5009,9008,82.0,2016-04-20 18:33:25.0]
[9008,Elise Dahlström,5011,9008,-571.0,2016-02-28 13:05:33.0]
[9008,Elise Dahlström,5012,9008,814.0,2016-03-20 13:29:11.0]
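As noted above, joining with an expression keeps both uid columns. On Spark 1.4 or later the join can instead be expressed with the name of the common column, which keeps a single uid; a minimal sketch (jn2_df is just an illustrative name):
// join on the common column name; the result has a single uid column
val jn2_df=udf_df.join( tdf_df, "uid" )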
Spark SQL join
With Spark SQL the query is very similar to plain SQL:
val rs= sx.sql("""
select t.xid, t.amount, t.date, t.uid, u.name
from t_tdf t join t_udf u
on t.uid=u.uid
""")
Type:
org.apache.spark.sql.DataFrame =
[xid: int, amount: double, date: timestamp, uid: int, name: string]
Content:
rs.collect().foreach(println)
[5016,-95.0,2016-05-09 10:26:05.0,9000,Gerd Abrahamsson]
[5015,-404.0,2016-02-20 22:08:23.0,9001,Hanna Andersson]
[5019,-462.0,2016-03-09 20:32:35.0,9001,Hanna Andersson]
[5006,-153.0,2016-02-18 17:58:33.0,9002,August Bergsten]
[5018,-549.0,2016-04-15 21:44:49.0,9002,August Bergsten]
[5001,268.0,2016-01-17 13:37:38.0,9003,Arvid Bohlin]
[5002,621.0,2016-02-24 15:36:53.0,9003,Arvid Bohlin]
[5017,428.0,2016-03-27 15:30:47.0,9003,Arvid Bohlin]
[5004,720.0,2016-05-14 16:29:54.0,9004,Edvard Marklund]
[5020,-339.0,2016-05-03 17:11:21.0,9004,Edvard Marklund]
[5008,-250.0,2016-02-24 23:14:52.0,9005,Ragnhild Brännström]
[5013,-114.0,2016-02-06 14:55:10.0,9005,Ragnhild Brännström]
[5014,819.0,2016-01-18 10:50:20.0,9005,Ragnhild Brännström]
[5010,549.0,2016-02-16 14:37:25.0,9006,Börje Wallin]
[5003,-401.0,2016-01-14 16:43:27.0,9007,Otto Byström]
[5005,-492.0,2016-02-24 23:58:57.0,9007,Otto Byström]
[5000,498.0,2016-02-21 06:28:49.0,9008,Elise Dahlström]
[5007,272.0,2016-05-26 12:00:00.0,9008,Elise Dahlström]
[5009,82.0,2016-04-20 18:33:25.0,9008,Elise Dahlström]
[5011,-571.0,2016-02-28 13:05:33.0,9008,Elise Dahlström]
[5012,814.0,2016-03-20 13:29:11.0,9008,Elise Dahlström]
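The table names t_tdf and t_udf assume that the DataFrames were registered as temporary tables, presumably in section 2a. If not, a registration along these lines is needed first (a sketch using the Spark 1.x API):
// illustrative registration of the two DataFrames as temp tables
udf_df.registerTempTable("t_udf")
tdf_df.registerTempTable("t_tdf")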
Aggregate with date range
Join the user and transaction entities, sum the transaction amounts per user for February 2016, and order the output by amount. The same query follows in R, pandas, SQL, and Spark (RDD), in that order.
# merge
jdf=merge(udf,tdf, by.x="uid", by.y="uid")
# range of dates
startdate<-as.POSIXct("2016-02-01")
enddate<-as.POSIXct("2016-02-29")
# subset
sb<-jdf[jdf$date>=startdate & jdf$date<=enddate,c('name','amount')]
# aggregate
ag<-aggregate(sb$amount, by=list(group=sb$name),sum)
ag[order(ag$x),]
Result:
group x
6 Otto Byström -492
5 Hanna Andersson -404
7 Ragnhild Brännström -364
2 August Bergsten -153
4 Elise Dahlström -73
3 Börje Wallin 549
1 Arvid Bohlin 621
from datetime import datetime
# join
jdf=pd.merge( tdf, udf, how='inner', left_on='uid', right_on='uid')
# date range
startdate=datetime.strptime('2016-02-01','%Y-%m-%d')
enddate=datetime.strptime('2016-02-29','%Y-%m-%d')
# aggregate
ag=jdf[((jdf.date >= startdate) & (jdf.date <=enddate))][[ 'name','amount']].groupby('name').sum()
# sort by amount
ag.sort_values(by='amount')
Result:
name amount
Otto Byström -492
Hanna Andersson -404
Ragnhild Brännström -364
August Bergsten -153
Elise Dahlström -73
Börje Wallin 549
Arvid Bohlin 621
Query:
select u.name,sum(t.amount) as sum
from t_transaction t join t_user u on t.uid=u.uid
where t.date between '2016-02-01' and '2016-02-29'
group by u.name
order by sum;
Result:
name | sum
---------------------+------
Otto Byström | -492
Hanna Andersson | -404
Ragnhild Brännström | -364
August Bergsten | -153
Elise Dahlström | -73
Börje Wallin | 549
Arvid Bohlin | 621
(7 rows)
Join:
val jn_rdd=udf_rdd.cartesian(tdf_rdd).filter( r => r._1.uid==r._2.uid )
Date-range:
val fmt=new java.text.SimpleDateFormat("yyyy-MM-dd")
val t1=new java.sql.Timestamp( fmt.parse("2016-02-01").getTime())
val t2=new java.sql.Timestamp( fmt.parse("2016-02-29").getTime())
Aggregate:
jn_rdd.filter( r=> ( r._2.date.after(t1) && r._2.date.before(t2)) ).
map( r => ( r._1.name, r._2.amount) ).
reduceByKey( (_+_) ).
sortBy( _._2 ).collect().foreach(println)
Result:
(Otto Byström,-492.0)
(Hanna Andersson,-404.0)
(Ragnhild Brännström,-364.0)
(August Bergsten,-153.0)
(Elise Dahlström,-73.0)
(Börje Wallin,549.0)
(Arvid Bohlin,621.0)
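For comparison, the same February aggregate can also be written against the DataFrame API. A minimal sketch, assuming jn_df from the DataFrame join above, t1 and t2 from the date-range code, and Spark 1.4+; note that Column.between is inclusive on both ends, which gives the same result on this data:
import org.apache.spark.sql.functions.sum
// keep February rows, sum amount per name, order by the summed amount
jn_df.filter( jn_df("date").between(t1,t2) ).
  groupBy("name").
  agg( sum("amount").as("sum") ).
  orderBy("sum").
  collect().foreach(println)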