Framed Data
 
06_rank
20160601

Top-2 ranking elements within a group

Problem

Suppose you have weekly data, classified by a key (eg. payload=sales and key=store)

 week | key | payload 
------+-----+---------
    5 | A   |   71.02
    5 | B   |   70.93
    5 | B   |   71.16
    5 | E   |   71.77
    6 | B   |   69.66
    6 | F   |   68.67
    7 | B   |   72.45
    7 | C   |   69.91
    7 | D   |   68.22
    7 | F   |   63.73
    7 | G   |    71.7
    8 | B   |   69.86
    8 | C   |   64.04
    8 | E   |      72
    9 | A   |   70.33
   10 | A   |    71.7
   10 | C   |   66.41
   10 | E   |   62.96
   11 | A   |   71.11
   11 | C   |   70.02
   11 | E   |    69.3
   11 | F   |   70.97
   12 | D   |   68.81
   12 | F   |      66

You want the last and penultimate payload value for every key, like this:

 key | last  | penultimate
-----+-------+-------------
 A   | 71.11 |        71.7
 B   | 69.86 |       72.45
 C   | 70.02 |       66.41
 D   | 68.81 |       68.22
 E   |  69.3 |       62.96
 F   |    66 |       70.97

Note:

  • not every 'week' has an entry for every 'key'
  • key G is missing from the final result, because it only has 1 entry

Solution

Database used: Postgres.

Table creation

create table t_weekly ( week int, key varchar(3), payload numeric);

insert into t_weekly(week,key,payload) values(5,'A',71.02);
insert into t_weekly(week,key,payload) values(5,'B',70.93);
insert into t_weekly(week,key,payload) values(5,'B',71.16);
insert into t_weekly(week,key,payload) values(5,'E',71.77);
insert into t_weekly(week,key,payload) values(6,'B',69.66);
insert into t_weekly(week,key,payload) values(6,'F',68.67);
insert into t_weekly(week,key,payload) values(7,'B',72.45);
insert into t_weekly(week,key,payload) values(7,'C',69.91);
insert into t_weekly(week,key,payload) values(7,'D',68.22);
insert into t_weekly(week,key,payload) values(7,'F',63.73);
insert into t_weekly(week,key,payload) values(7,'G',71.7);
insert into t_weekly(week,key,payload) values(8,'B',69.86);
insert into t_weekly(week,key,payload) values(8,'C',64.04);
insert into t_weekly(week,key,payload) values(8,'E',72);
insert into t_weekly(week,key,payload) values(9,'A',70.33);
insert into t_weekly(week,key,payload) values(10,'A',71.7);
insert into t_weekly(week,key,payload) values(10,'C',66.41);
insert into t_weekly(week,key,payload) values(10,'E',62.96);
insert into t_weekly(week,key,payload) values(11,'A',71.11);
insert into t_weekly(week,key,payload) values(11,'C',70.02);
insert into t_weekly(week,key,payload) values(11,'E',69.3);
insert into t_weekly(week,key,payload) values(11,'F',70.97);
insert into t_weekly(week,key,payload) values(12,'D',68.81);
insert into t_weekly(week,key,payload) values(12,'F',66);

Solution

Using the 'rank() over' functionality :

with ranked as ( 
    select week,key,payload,rank() over (partition by key order by week desc) 
    from t_weekly 
) select  r1.key,
          r1.payload as last, 
          r2.payload as penultim 
from   ranked r1 join ranked r2 on r1.key=r2.key 
where  r1.rank=1 
and    r2.rank=2
order by key

Result

 key | last  | penultim 
-----+-------+----------
 A   | 71.11 |     71.7
 B   | 69.86 |    72.45
 C   | 70.02 |    66.41
 D   | 68.81 |    68.22
 E   |  69.3 |    62.96
 F   |    66 |    70.97

Dataframe creation

val week_df=sx.createDataFrame(Seq(
    (5,"A",71.02), (5,"B",70.93), (5,"B",71.16), (5,"E",71.77), (6,"B",69.66), 
    (6,"F",68.67), (7,"B",72.45), (7,"C",69.91), (7,"D",68.22), (7,"F",63.73), 
    (7,"G",71.7), (8,"B",69.86), (8,"C",64.04), (8,"E",72.0), (9,"A",70.33),
    (10,"A",71.7), (10,"C",66.41), (10,"E",62.96), (11,"A",71.11), (11,"C",70.02),
    (11,"E",69.3), (11,"F",70.97), (12,"D",68.81), (12,"F",66.0))
    ).toDF( "week","key","payload")

Attempt 1: HiveContext needed for window functions?

Experimenting with Window.partitionBy("key").orderBy("week") threw up this error:

Could not resolve window function 'rank'. Note that, using window functions currently 
requires a HiveContext;

Hmm, we don't want to be dependent on Hive. Let's try and solve it a different way.

Attempt 2

Get the top ranking values:

val toprank=week_df.groupBy("key").agg( last("week") ).
                    withColumnRenamed("last(week)()","toprank")

Now filter out these records from the original dataframe, and then do the 'toprank' again to get the 'secondrank':

val week_excltop_df=week_df.join(toprank,Seq("key"),"leftouter").filter("week!=toprank")

Get the second-ranking values:

val secondrank=week_excltop_df.groupBy("key").agg( last("week") ).
                               withColumnRenamed("last(week)()","secondrank")

Turn the ranks into values:

val s1=week_df.join( toprank,Seq("key"),"leftouter").
               where("week=toprank").
               select("key","payload"). 
               withColumnRenamed("payload","final")

val s2=week_df.join( secondrank,Seq("key"),"leftouter").
               where("week=secondrank").
               select("key","payload").
               withColumnRenamed("payload","penultim")

And join s1 and s2 together, to get the final result:

s1.join(s2, Seq("key"),"inner").show()

+---+-----+--------+                                                            
|key|final|penultim|
+---+-----+--------+
|  A|71.11|    71.7|
|  B|69.86|   72.45|
|  C|70.02|   66.41|
|  D|68.81|   68.22|
|  E| 69.3|   62.96|
|  F| 66.0|   70.97|
+---+-----+--------+

Dataframe creation

df= pd.DataFrame( [ { "week":5, "key":"A", "payload":71.02},
 { "week":5, "key":"B", "payload":70.93}, { "week":5, "key":"B", "payload":71.16},
 { "week":5, "key":"E", "payload":71.77}, { "week":6, "key":"B", "payload":69.66},
 { "week":6, "key":"F", "payload":68.67}, { "week":7, "key":"B", "payload":72.45},
 { "week":7, "key":"C", "payload":69.91}, { "week":7, "key":"D", "payload":68.22},
 { "week":7, "key":"F", "payload":63.73}, { "week":7, "key":"G", "payload":71.7},
 { "week":8, "key":"B", "payload":69.86}, { "week":8, "key":"C", "payload":64.04},
 { "week":8, "key":"E", "payload":72}, { "week":9, "key":"A", "payload":70.33},
 { "week":10, "key":"A", "payload":71.7}, { "week":10, "key":"C", "payload":66.41},
 { "week":10, "key":"E", "payload":62.96}, { "week":11, "key":"A", "payload":71.11},
 { "week":11, "key":"C", "payload":70.02}, { "week":11, "key":"E", "payload":69.3},
 { "week":11, "key":"F", "payload":70.97}, { "week":12, "key":"D", "payload":68.81},
 { "week":12, "key":"F", "payload":66} ] )

Result

Put the rank column into the dataframe:

gb=df.groupby( ['key'])
df['rank']=gb['week'].rank(method='min',ascending=False)

Intermediate result:

   key  payload  week  rank
0    A    71.02     5   4.0
1    B    70.93     5   4.0
2    B    71.16     5   4.0
3    E    71.77     5   4.0
4    B    69.66     6   3.0
5    F    68.67     6   4.0
6    B    72.45     7   2.0
7    C    69.91     7   4.0
8    D    68.22     7   2.0
9    F    63.73     7   3.0
10   G    71.70     7   1.0
11   B    69.86     8   1.0
12   C    64.04     8   3.0
13   E    72.00     8   3.0
14   A    70.33     9   3.0
15   A    71.70    10   2.0

Toprank:

s1=df[df['rank']==1][['key','payload']]
s1.index=s1['key']
s1.columns=['key','last']

Second rank:

s2=df[df['rank']==2][['key','payload']]
s2.index=s2['key']
s2.drop('key',axis=1,inplace=True)  # to avoid it appearing twice in final result
s2.columns=['penultimate']

Join to get the final result:

rs=pd.concat( [s1,s2], axis=1, join='inner')
rs.sort_values(by='key')

Final result:

    key   last  penultimate
key                        
A     A  71.11        71.70
B     B  69.86        72.45
C     C  70.02        66.41
D     D  68.81        68.22
E     E  69.30        62.96
F     F  66.00        70.97

The way shown here is using the dplyr package.

Dataframe creation

df<-read.table(text= gsub( '@','\n', paste0(
    "'week','key','payload'@5,'A',71.02@5,'B',70.93@5,'B',71.16@5,'E',71.77@6,'B',69.66@",
    "6,'F',68.67@7,'B',72.45@7,'C',69.91@7,'D',68.22@7,'F',63.73@7,'G',71.7@8,'B',69.86@",
    "8,'C',64.04@8,'E',72@9,'A',70.33@10,'A',71.7@10,'C',66.41@10,'E',62.96@11,'A',71.11@",
    "11,'C',70.02@11,'E',69.3@11,'F',70.97@12,'D',68.81@12,'F',66@")),sep=",",header=T)

Solution

library(dplyr)

dfr=df %>% group_by(key) %>% mutate(rank = min_rank(week))

s1=dfr[ dfr$rank==1,]
s2=dfr[ dfr$rank==2,]
res=merge (s1,s2, by.x='key',by.y='key')[,c('key','payload.x','payload.y')]
names(res)=c('key','last','penultimate') 

Result:

  key  last penultimate
1   A 71.02       70.33
2   C 69.91       64.04
3   D 68.22       68.81
4   E 71.77       72.00
5   F 68.67       63.73
 
Notes by Data Munging Ninja. Generated on nini:sync/20151223_datamungingninja/frameddata at 2016-10-18 07:18