
Top-2 ranking elements within a group
Problem
Suppose you have weekly data, classified by a key (e.g. payload = sales and key = store):
week  key  payload
----  ---  -------
5  A  71.02
5  B  70.93
5  B  71.16
5  E  71.77
6  B  69.66
6  F  68.67
7  B  72.45
7  C  69.91
7  D  68.22
7  F  63.73
7  G  71.7
8  B  69.86
8  C  64.04
8  E  72
9  A  70.33
10  A  71.7
10  C  66.41
10  E  62.96
11  A  71.11
11  C  70.02
11  E  69.3
11  F  70.97
12  D  68.81
12  F  66
You want the last and penultimate payload value for every key, like this:
key  last  penultimate
---  -----  -----------
A  71.11  71.7
B  69.86  72.45
C  70.02  66.41
D  68.81  68.22
E  69.3  62.96
F  66  70.97
Note:
- not every 'week' has an entry for every 'key'
- key G is missing from the final result, because it only has 1 entry
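All the solutions below follow the same recipe: rank the rows within each key by week (descending), pick ranks 1 and 2, and drop keys that never reach rank 2. As a compact sketch of the idea (plain pandas, on a hypothetical miniature of the data, and ignoring ties within a single week):

```python
import pandas as pd

# Hypothetical miniature of the table above
df = pd.DataFrame({"week":    [5, 6, 7, 7],
                   "key":     ["B", "B", "B", "G"],
                   "payload": [70.93, 69.66, 72.45, 71.7]})

# Keep the two most recent rows per key...
top2 = df.sort_values("week", ascending=False).groupby("key").head(2)
# ...and drop keys that have fewer than two entries (here: G)
top2 = top2[top2.groupby("key")["week"].transform("size") >= 2]
```

Here `top2` ends up holding only B's week-7 and week-6 rows; G survives `head(2)` but is removed by the size filter.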
Solution
Database used: Postgres.
Table creation
create table t_weekly ( week int, key varchar(3), payload numeric);
insert into t_weekly(week,key,payload) values(5,'A',71.02);
insert into t_weekly(week,key,payload) values(5,'B',70.93);
insert into t_weekly(week,key,payload) values(5,'B',71.16);
insert into t_weekly(week,key,payload) values(5,'E',71.77);
insert into t_weekly(week,key,payload) values(6,'B',69.66);
insert into t_weekly(week,key,payload) values(6,'F',68.67);
insert into t_weekly(week,key,payload) values(7,'B',72.45);
insert into t_weekly(week,key,payload) values(7,'C',69.91);
insert into t_weekly(week,key,payload) values(7,'D',68.22);
insert into t_weekly(week,key,payload) values(7,'F',63.73);
insert into t_weekly(week,key,payload) values(7,'G',71.7);
insert into t_weekly(week,key,payload) values(8,'B',69.86);
insert into t_weekly(week,key,payload) values(8,'C',64.04);
insert into t_weekly(week,key,payload) values(8,'E',72);
insert into t_weekly(week,key,payload) values(9,'A',70.33);
insert into t_weekly(week,key,payload) values(10,'A',71.7);
insert into t_weekly(week,key,payload) values(10,'C',66.41);
insert into t_weekly(week,key,payload) values(10,'E',62.96);
insert into t_weekly(week,key,payload) values(11,'A',71.11);
insert into t_weekly(week,key,payload) values(11,'C',70.02);
insert into t_weekly(week,key,payload) values(11,'E',69.3);
insert into t_weekly(week,key,payload) values(11,'F',70.97);
insert into t_weekly(week,key,payload) values(12,'D',68.81);
insert into t_weekly(week,key,payload) values(12,'F',66);
Solution
Using the 'rank() over' window functionality:
with ranked as (
  select week, key, payload,
         rank() over (partition by key order by week desc)
  from t_weekly
)
select r1.key,
       r1.payload as last,
       r2.payload as penultim
from ranked r1 join ranked r2 on r1.key = r2.key
where r1.rank = 1
  and r2.rank = 2
order by key;
Result
key  last  penultim
---  -----  --------
A  71.11  71.7
B  69.86  72.45
C  70.02  66.41
D  68.81  68.22
E  69.3  62.96
F  66  70.97
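The same rank-then-self-join query is not Postgres-specific; any engine with window functions will do. As a quick cross-check, here it is run on a subset of the rows via Python's built-in sqlite3 module (this assumes SQLite ≥ 3.25, the first version with window functions; the window column is given an explicit alias `rnk`):

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute('create table t_weekly (week int, "key" text, payload real)')
# A subset of the rows above, enough to show ranking, a tied week,
# and the dropped single-entry key G
con.executemany("insert into t_weekly values (?,?,?)",
                [(5, 'B', 70.93), (5, 'B', 71.16), (6, 'B', 69.66),
                 (7, 'B', 72.45), (7, 'G', 71.7)])

rows = con.execute("""
    with ranked as (
      select week, key, payload,
             rank() over (partition by key order by week desc) as rnk
      from t_weekly
    )
    select r1.key, r1.payload, r2.payload
    from ranked r1 join ranked r2 on r1.key = r2.key
    where r1.rnk = 1 and r2.rnk = 2
    order by r1.key
""").fetchall()
```

On this subset `rows` contains a single tuple for B (latest week 7, penultimate week 6); G is dropped because it never reaches rank 2.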
Spark
Dataframe creation
val week_df=sx.createDataFrame(Seq(
(5,"A",71.02), (5,"B",70.93), (5,"B",71.16), (5,"E",71.77), (6,"B",69.66),
(6,"F",68.67), (7,"B",72.45), (7,"C",69.91), (7,"D",68.22), (7,"F",63.73),
(7,"G",71.7), (8,"B",69.86), (8,"C",64.04), (8,"E",72.0), (9,"A",70.33),
(10,"A",71.7), (10,"C",66.41), (10,"E",62.96), (11,"A",71.11), (11,"C",70.02),
(11,"E",69.3), (11,"F",70.97), (12,"D",68.81), (12,"F",66.0))
).toDF( "week","key","payload")
Attempt 1: HiveContext needed for window functions?
Experimenting with Window.partitionBy("key").orderBy("week") threw up this error:
Could not resolve window function 'rank'. Note that, using window functions currently
requires a HiveContext;
Hmm, we don't want to be dependent on Hive. Let's try to solve it a different way.
Attempt 2
Get the top ranking values:
val toprank=week_df.groupBy("key").agg( last("week") ).
withColumnRenamed("last(week)()","toprank")
Now filter these top-ranking records out of the original dataframe, and repeat the same aggregation to get the 'secondrank':
val week_excltop_df=week_df.join(toprank,Seq("key"),"leftouter").filter("week!=toprank")
Get the secondranking values:
val secondrank=week_excltop_df.groupBy("key").agg( last("week") ).
withColumnRenamed("last(week)()","secondrank")
Turn the ranks into values:
val s1=week_df.join( toprank,Seq("key"),"leftouter").
where("week=toprank").
select("key","payload").
withColumnRenamed("payload","final")
val s2=week_df.join( secondrank,Seq("key"),"leftouter").
where("week=secondrank").
select("key","payload").
withColumnRenamed("payload","penultim")
And join s1 and s2 together, to get the final result:
s1.join(s2, Seq("key"),"inner").show()
+---+-----+--------+
|key|final|penultim|
+---+-----+--------+
|  A|71.11|    71.7|
|  B|69.86|   72.45|
|  C|70.02|   66.41|
|  D|68.81|   68.22|
|  E| 69.3|   62.96|
|  F| 66.0|   70.97|
+---+-----+--------+
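The exclude-and-reaggregate trick above can also be sketched in pandas, for anyone without a Spark shell handy. Note one deliberate swap: `max("week")` is used here where the Spark code relies on `last("week")`, which only yields the latest week because the rows happen to arrive in week order; `max` is deterministic. Again a hypothetical miniature of the data:

```python
import pandas as pd

df = pd.DataFrame({"week":    [5, 6, 7, 8, 7],
                   "key":     ["B", "B", "B", "B", "G"],
                   "payload": [70.93, 69.66, 72.45, 69.86, 71.7]})

# Top-ranking week per key (max is deterministic, unlike last())
toprank = df.groupby("key")["week"].max().rename("toprank")
# Exclude those rows, then repeat to get the second-ranking week
rest = df.join(toprank, on="key").query("week != toprank")
secondrank = rest.groupby("key")["week"].max().rename("secondrank")

# Turn the ranks back into payload values and join
s1 = (df.join(toprank, on="key").query("week == toprank")
        [["key", "payload"]].rename(columns={"payload": "final"}))
s2 = (df.join(secondrank, on="key").query("week == secondrank")
        [["key", "payload"]].rename(columns={"payload": "penultim"}))
res = s1.merge(s2, on="key")   # inner join drops single-entry keys like G
```

As in the Spark version, the inner join at the end is what removes key G.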
Pandas
Dataframe creation
df= pd.DataFrame( [ { "week":5, "key":"A", "payload":71.02},
{ "week":5, "key":"B", "payload":70.93}, { "week":5, "key":"B", "payload":71.16},
{ "week":5, "key":"E", "payload":71.77}, { "week":6, "key":"B", "payload":69.66},
{ "week":6, "key":"F", "payload":68.67}, { "week":7, "key":"B", "payload":72.45},
{ "week":7, "key":"C", "payload":69.91}, { "week":7, "key":"D", "payload":68.22},
{ "week":7, "key":"F", "payload":63.73}, { "week":7, "key":"G", "payload":71.7},
{ "week":8, "key":"B", "payload":69.86}, { "week":8, "key":"C", "payload":64.04},
{ "week":8, "key":"E", "payload":72}, { "week":9, "key":"A", "payload":70.33},
{ "week":10, "key":"A", "payload":71.7}, { "week":10, "key":"C", "payload":66.41},
{ "week":10, "key":"E", "payload":62.96}, { "week":11, "key":"A", "payload":71.11},
{ "week":11, "key":"C", "payload":70.02}, { "week":11, "key":"E", "payload":69.3},
{ "week":11, "key":"F", "payload":70.97}, { "week":12, "key":"D", "payload":68.81},
{ "week":12, "key":"F", "payload":66} ] )
Result
Put the rank column into the dataframe:
gb=df.groupby( ['key'])
df['rank']=gb['week'].rank(method='min',ascending=False)
Intermediate result (first 16 rows shown):
key payload week rank
0 A 71.02 5 4.0
1 B 70.93 5 4.0
2 B 71.16 5 4.0
3 E 71.77 5 4.0
4 B 69.66 6 3.0
5 F 68.67 6 4.0
6 B 72.45 7 2.0
7 C 69.91 7 4.0
8 D 68.22 7 2.0
9 F 63.73 7 3.0
10 G 71.70 7 1.0
11 B 69.86 8 1.0
12 C 64.04 8 3.0
13 E 72.00 8 3.0
14 A 70.33 9 3.0
15 A 71.70 10 2.0
Top rank:
s1=df[df['rank']==1][['key','payload']]
s1.index=s1['key']
s1.columns=['key','last']
Second rank:
s2=df[df['rank']==2][['key','payload']]
s2.index=s2['key']
s2.drop('key',axis=1,inplace=True) # to avoid it appearing twice in final result
s2.columns=['penultimate']
Join to get the final result:
rs=pd.concat( [s1,s2], axis=1, join='inner')
rs.sort_values(by='key')
Final result:
key last penultimate
key
A A 71.11 71.70
B B 69.86 72.45
C C 70.02 66.41
D D 68.81 68.22
E E 69.30 62.96
F F 66.00 70.97
R
The solution shown here uses the dplyr package.
Dataframe creation
df<-read.table(text= gsub( '@','\n', paste0(
"'week','key','payload'@5,'A',71.02@5,'B',70.93@5,'B',71.16@5,'E',71.77@6,'B',69.66@",
"6,'F',68.67@7,'B',72.45@7,'C',69.91@7,'D',68.22@7,'F',63.73@7,'G',71.7@8,'B',69.86@",
"8,'C',64.04@8,'E',72@9,'A',70.33@10,'A',71.7@10,'C',66.41@10,'E',62.96@11,'A',71.11@",
"11,'C',70.02@11,'E',69.3@11,'F',70.97@12,'D',68.81@12,'F',66@")),sep=",",header=T)
Solution
library(dplyr)
dfr=df %>% group_by(key) %>% mutate(rank = min_rank(desc(week)))
s1=dfr[ dfr$rank==1,]
s2=dfr[ dfr$rank==2,]
res=merge (s1,s2, by.x='key',by.y='key')[,c('key','payload.x','payload.y')]
names(res)=c('key','last','penultimate')
Result:
key last penultimate
1 A 71.11 71.70
2 B 69.86 72.45
3 C 70.02 66.41
4 D 68.81 68.22
5 E 69.30 62.96
6 F 66.00 70.97