Terentia Flores
 
01_intro
20151226

Introduction

Intention

Write a few short programs to transfer waypoint data from an Openstreetmap protobuf file to a Hadoop Distributed File System. Then use Hive to write simple SQL queries on this data. Use the 'user-defined-function' feature of Hive to write custom Java functions that can be used in Hive-SQL.

The road map for this article.

General principle: the further you go, the deeper the detail.

  • Architecture: this high level overview sketches how the data is read from the protobuf file and piped into Redis. Another process then reads the data from Redis and moves it into HDFS. After that the spotlight is on writing Hive queries.
  • Running Hive Queries
  • Ingest: more detailed explanation of the ingestion pipeline, and how to execute it.
  • Hive UDF: how to incorporate User Defined Functions into Hive
  • All the source

Prerequisites

  • a Hadoop cluster and Hive, and a recent version of Java
  • a Redis installation, anywhere on your network
  • a Linux system (anywhere on your network) that can hold a big Openstreetmap PBF file and has a Go language compiler. It does not need to be superfast.

Versions of software used:

  • Debian Linux 8.2
  • Java 1.8.0_66
  • hadoop 2.7.1
  • hive 1.2.1
  • redis 2.8.14

Github repository

The source code can be cloned from github.com/dtmngngnnj/terentiaflores

02_architecture
20151226

Architecture

A light-weight client written in the Go language reads the Openstreetmap protobuf file, and pumps only the waypoints into a Redis database. This can happen on a system on which neither Hadoop nor Java is installed.

Another process (a Java program) reads the data from Redis and places it on HDFS.
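
In short, the data flows as follows:

    OSM PBF file  -->  pb2redis (Go)  -->  Redis  -->  Redis2hdfs (Java)  -->  HDFS  -->  Hive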

Once the data is on HDFS, we are ready to fire off some SQL queries on our Hive instance!

When dealing with waypoints, we'd like to know the approximate distance between two waypoints, and for that purpose we write a custom Java function:

    public double evaluate(double lat1, double lon1, double lat2, double lon2) {
        // convert to radians
        lat1 = lat1 * Math.PI / 180.0;
        lon1 = lon1 * Math.PI / 180.0;
        lat2 = lat2 * Math.PI / 180.0;
        lon2 = lon2 * Math.PI / 180.0;

        double r = 6371.0; // radius of the earth in kilometer 
        double x = (lon2 - lon1) * Math.cos((lat1+lat2)/2.0);
        double y = (lat2 - lat1);
        return r*Math.sqrt(x*x+y*y);
    } 

It's very easy to incorporate functions such as the one above into Hive (see further).
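
For a quick sanity check outside Hive, the method can also be exercised from a throw-away main method (an illustrative snippet, not part of the repository; compiling the class needs hive-exec on the classpath):

    UdfRoughDistance udf = new UdfRoughDistance();
    // two points at the same latitude, 0.01 degree of longitude apart (near Rome);
    // the equirectangular approximation gives roughly 0.8 km here
    System.out.println(udf.evaluate(41.89, 12.48, 41.89, 12.49));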

Why the Redis middle-man?

If you look at the data flow sketched above, you may ask: why not skip the scenic tour via the Redis database and transfer the data directly from the protobuf file to HDFS?

The main reason is that the big protobuf-file (Europe OSM PB-file is 17G) resides on a slow system with a big disk and no Hadoop installation.

Other reasons are:

  • the PB-file is read by a Go-language program, for which a neat, lightweight protobuf-reading library exists. Since it is 'go', the executable has a small footprint and there is no need to install 169 jar files to have it running.
  • Redis is an in-memory database, is blazing fast, and supports hash-sets and queues (and a bunch of other types)
  • in the proposed setup, Redis acts as a buffer, and it is easy to detect which of the two processes is the slower one: the ingest (the Go process pumping into Redis) or the exude (the Java process pumping out). With that knowledge we can try to increase the performance of the slow leg (eg. make it multi-threaded); a minimal monitoring sketch follows this list.
  • the intention is to re-use the code developed in this article in the future, where the data source system may range from a tiny Raspberry Pi or mid-range laptop up to big rack servers with the fastest SAS disks.
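
A minimal way to watch that buffer from the outside is to poll the length of the TR_QUEUE list (the Redis structures are described in the Ingest section): if the queue sits near its maximum most of the time, the Java leg is the bottleneck; if it is mostly empty, the Go leg is. A rough sketch using Jedis (hypothetical helper class, not part of the repository):

import redis.clients.jedis.Jedis;

public class QueueWatch {
    public static void main(String[] args) throws InterruptedException {
        // adapt host/port (or read the REDIS_URL convention) to match your setup
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            // as long as the producer keeps TR_FLAG raised, report the queue depth
            while (jedis.exists("TR_FLAG")) {
                long depth = jedis.llen("TR_QUEUE");
                System.out.println("chunks waiting in TR_QUEUE: " + depth);
                Thread.sleep(10000);
            }
        }
    }
}
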
03_query
20151226

Hive queries

Table with all data: t_europe

For this we assume that the ingest has taken place (details: see next section), and the CSV files have been put under directory '/user/dmn/europe' on HDFS.

First create the table, linked to the CSV files on HDFS:

create external table t_europe 
(
    lat float,
    lon float,
    name string
)
row format delimited
fields terminated by '\t'
stored as textfile
location '/user/dmn/europe'
;
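
Note that the third column holds more than a name: the ingest writes the complete tag string (space-separated key:value pairs) into it, as the query results further down show. A line of the ingested files looks roughly like this (tab-separated, values invented for illustration):

41.890123    12.484997    name:Fontana amenity:fountain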

First query: how many waypoints do we have?

select count(1) from t_europe;
..
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 19  Reduce: 1   Cumulative CPU: 171.6 sec   ..
Total MapReduce CPU Time Spent: 2 minutes 51 seconds 600 msec
..
64759895

The 'rough distance' function

Hive has no standard function for the distance between two waypoints, so we write a User Defined Function in Java, compile it, and put it in a jar named 'udf.jar' (more details in the 'Hive UDF' section). Then we incorporate it into Hive as follows:

ADD JAR udf.jar;
CREATE TEMPORARY FUNCTION UDF_ROUGH_DISTANCE as 'UdfRoughDistance';

Now we can make use of the function, as shown in this query that answers how many waypoints we have within a 10 km radius of the Palatine Hill (lat,lon = 41.8898803,12.4849976) in Rome:

select count(1) from t_europe
where lat is not null
and lon is not null
and udf_rough_distance(41.8898803,12.4849976, lat,lon) < 10
;

Answer:

37189

Table with subset of data: t_palantine

Let's put the waypoints close to the Palatine Hill in a separate table. We also want a column 'dist_km' holding the distance between the Palatine Hill and each point, and at the same time we want to filter on that distance:

create table t_palantine as
select lat,
    lon,
    udf_rough_distance(41.8898803,12.4849976, lat,lon) as dist_km,
    name
from t_europe
where lat is not null
and   lon is not null
and  udf_rough_distance(41.8898803,12.4849976, lat,lon)<10.0
;

This takes a considerable amount of time: 7 minutes 17 seconds 410 msec. Maybe that is because the distance has to be calculated twice? Let's use a CTE (common table expression) in our SQL to avoid that.

drop table t_palantine;

create table t_palantine as
with ct as (
    select lat,lon,
        udf_rough_distance(41.8898803,12.4849976, lat,lon) as dist_km,
        name
    from t_europe
    where lat is not null
    and   lon is not null
) 
select lat,lon,dist_km,name 
from ct
where dist_km<10
;

This takes only 73.209 seconds. A big improvement!

Flowers for Terentia

Let's now answer the question: which flower shops are near Cicero's house on the Palatine Hill?

select round(dist_km,2) as dist_km, name 
from t_palantine 
where lower(name) like '%flor%'
order by dist_km asc
limit 10
;

The result:

0.88    name:Pizzeria Florida shop:bakery phone:+39 06 68803236 operator:Fiori ..
1.19    name:Flor gelato italiano amenity:ice_cream website:www.gelatiflor.it a..
2.16    name:Hotel Floridia tourism:hotel
2.16    website:http://www.hotelfloraroma.com/ operator:Marriott addr:street:Vi..
2.17    name:florian's amenity:restaurant
2.29    name:Vivaio piante shop:florist
3.21    shop:florist
3.3     shop:florist
3.41    shop:florist
3.44    shop:florist
Time taken: 24.486 seconds, Fetched: 10 row(s)

His best bet would be 'Vivaio Piante' at 2.29 km:

(picture belongs to Google maps)

04_ingest
20151226

Ingest

From PBF to Redis: pb2redis.go

The Go program digests the PB-file, filters out the waypoints, and inserts these in chunks into the Redis database.

There are 3 Redis objects:

  • TR_BUFFER: contains the actual data of the file, stored under a key of your choice (here we take the filename as key)
  • TR_QUEUE: after the file is put in the buffer, the key is left-pushed onto the queue, to indicate to the other side (the pulling side) that there is something to pull
  • TR_FLAG: this signals to the pulling side that it should stay connected and check back shortly, because more files are on the way.

On startup, the first thing done is to raise the TR_FLAG. In Redis command language it looks like this:

SADD TR_FLAG RAISED 

At the end of the run, when all data has been handled, the flag is lowered as such:

DEL TR_FLAG 

Snippets of the Go code

Main loop over elements of the OSM PB-file:

    for {
        if v, err := d.Decode(); err == io.EOF {
            break
        } else if err != nil {
            fmt.Fprintf(os.Stderr,"Error: %v\n", err)
            os.Exit(1)
        } else {
            switch v := v.(type) {
            case *osmpbf.Node:
                tgs := ""
                // build the tags string
                for k, v := range v.Tags {
                    tgs = tgs + " " + k + ":" + v
                }
                tgs=strings.TrimSpace( strings.Replace(tgs, "\t"," ",-1) )
                if len(tgs)>0 {
                    buf.WriteString(fmt.Sprintf("%f\t%f\t%s\n", v.Lat, v.Lon, tgs) )
                }
                if (buf.Len()>c_chunk_size) {
                    insert_in_redis(conn,buf, fmt.Sprintf("%d.csv",seq))
                    buf.Reset()
                    seq+=1
                }
                nc++
            case *osmpbf.Way:
                // Way : ignored
                wc++
            case *osmpbf.Relation:
                // Relation : ignored
                rc++
            default:
                fmt.Fprintf(os.Stderr, "unknown type %T\n", v)
                os.Exit(1)
            }
        }
    }
    // handle the last buffer
    if (buf.Len()>0) {
        insert_in_redis(conn,buf, fmt.Sprintf("%d.csv",seq))
    }
    fmt.Printf("# Nodes: %d, Ways: %d, Relations: %d\n", nc, wc, rc)

Function to insert into Redis:

func insert_in_redis(conn redis.Conn, buf bytes.Buffer, key string) (err error) {
    var val int
    // --- sleep if too many entries in TR_QUEUE --------------------------
    for {
        val, err = redis.Int(conn.Do("LLEN", "TR_QUEUE"))
        if (err!=nil) { return }
        if (val>c_max_num_chunk) {
            fmt.Printf(".")
            time.Sleep(time.Duration(c_sleeptime_millis) * time.Millisecond)
        } else {
            fmt.Println()
            break
        }
    }

    // --- put in TR_BUFFER -----------------------------------------------
    val, err = redis.Int(conn.Do("HSET", "TR_BUFFER", key, buf.String()))
    if (err!=nil) { return }
    if (val==0) {
        err=errors.New("Error writing chunk to redis")
        return
    }

    // --- left push the key onto the TR_QUEUE ------------------------------
    val, err = redis.Int(conn.Do("LPUSH", "TR_QUEUE", key))
    if (err!=nil) { fmt.Printf("Error: %v\n",err); return }
    if (val==0) {
        err=errors.New("Error pushing key on queue in redis") 
        return
    }
    fmt.Printf("Chunk %s handled.\n", key)
    return
}

Environment variable REDIS_URL

Both the ingest and exude programs consult the environment variable REDIS_URL to pick up the host name and port of your Redis server. If it is not set, "localhost:6379" is used.

Define it as follows on the command line:

export REDIS_URL="myredishost:6379"

Redis2hdfs.java

This program goes into an eternal loop: it reads keys from the TR_QUEUE, if any, and pops the corresponding values from the TR_BUFFER. Each value is then written to HDFS.

The eternal loop is broken when the TR_FLAG is lowered (i.e. deleted).

The loop:

        // keep reading from Redis until the flag is lowered
        do {
            long qlen=jedis.llen("TR_QUEUE");
            if (qlen>0) { 
                pop(jedis,filesystem,outputPath); 
            } else { 
                System.out.print(".");
                sleep(5); 
            } 
        }
        while( jedis.exists("TR_FLAG") );  

The pop-from-Redis, write-to-HDFS function:

    public static void pop(Jedis jedis, 
                           FileSystem filesystem, 
                           String outputPath
                           ) throws java.io.IOException { 

        // pop 1 value from the queue
        String key=jedis.rpop("TR_QUEUE"); 
        if (key.length()<1) {
            System.err.println("Received empty key");
            return;
        }
        
        // read the corresponding buffer 
        String value=jedis.hget("TR_BUFFER",key);
        if ( (value==null) || (value.length()<1) ) {
            System.err.println("Received empty value");
            return;
        }

        // write value to file on HDFS
        Path outFile = new Path(outputPath+"/"+key);
        if (filesystem.exists(outFile)) 
            throw new IOException("Output already exists");
        try( 
            FSDataOutputStream out = filesystem.create(outFile); 
            BufferedWriter bw = new BufferedWriter(
                                    new OutputStreamWriter(out));
        ) { 
            bw.write(value);
        }
        System.out.println("Wrote: " + outFile);

        // the key/value can now be removed from the buffer 
        long jv=jedis.hdel("TR_BUFFER",key);
        if (jv!=1) {
            System.err.println("HDEL returned "+jv);
            return;
        }
    } //end_pop

Running it

To do the actual data transfer from PBF to HDFS, open up 2 terminal windows, and in each of them first define the Redis URL as follows:

export REDIS_URL="myredishost:6379"

Terminal 1

Then, in one terminal, start up the pb2redis process:

./pb2redis <YOUR_OSM_FILE>.pbf

Terminal 2

.. and in the other one, run the 'compile_run_redis2hdfs.sh' script, passing the HDFS directory where the files should be stored as the first argument:

compile_run_redis2hdfs.sh '/user/dmn/europe' 

05_Hive_UDF
20151226

User Defined Functions in Hive

In Hive it is very easy to define your own function:

  • write some Java code
  • wrap it into a JAR
  • add the jar in Hive
  • define a function name that points to your UDF class

The user defined Hive function, UdfRoughDistance.java:

import java.lang.Math;
import org.apache.hadoop.hive.ql.exec.UDF;

public class UdfRoughDistance extends UDF {

    /** Calculate the approximate distance between two points */ 
    public double evaluate(double lat1, double lon1, double lat2, double lon2) {

        // convert to radians
        lat1 = lat1 * Math.PI / 180.0;
        lon1 = lon1 * Math.PI / 180.0;
        lat2 = lat2 * Math.PI / 180.0;
        lon2 = lon2 * Math.PI / 180.0;

        double r = 6371.0; // radius of the earth in kilometer 
        double x = (lon2 - lon1) * Math.cos((lat1+lat2)/2.0);
        double y = (lat2 - lat1);
        return r*Math.sqrt(x*x+y*y);

    } // end evaluate

    /* The above formulas are called the "equirectangular approximation", 
     * to be used for small distances, if performance is more important 
     * than accuracy. 
     * See: http://www.movable-type.co.uk/scripts/latlong.html
     */
}

Once you have set up the proper classpath, just compile your Java file:

javac UdfRoughDistance.java

.. and create a jar file:

jar cvf udf.jar UdfRoughDistance.class

.. which you incorporate into hive as follows:

ADD JAR udf.jar;
CREATE TEMPORARY FUNCTION UDF_ROUGH_DISTANCE as 'UdfRoughDistance';

My classpath is defined as follows (from script: compile_jar_udf.sh) :

export HH=/opt/hadoop-2.7.1/share/hadoop
export HI=/opt/apache-hive-1.2.1-bin
export CLASSPATH=$CLASSPATH\
:$HH/common/hadoop-common-2.7.1.jar\
:$HH/hdfs/hadoop-hdfs-2.7.1.jar\
:$HH/mapreduce/lib/*\
:$HH/common/lib/*\
:$HH/tools/lib/*\
:$HI/lib/hive-common-1.2.1.jar\
:$HI/lib/hive-contrib-1.2.1.jar\
:$HI/lib/hive-exec-1.2.1.jar
06_all_source
20151226

All sourcecode

Clone the following source files from this repository: github.com/dtmngngnnj/terentiaflores

pb2redis.go

Prerequisites:

go get github.com/garyburd/redigo/redis
go get github.com/qedus/osmpbf

Compile instructions:

go build pb2redis.go
package main

// BUILD instructions:
//   1. prerequisites
//        go get github.com/garyburd/redigo/redis
//        go get github.com/qedus/osmpbf
//   2. go build pb2redis.go

import (
    "bytes"
    "errors"
    "fmt"
    "github.com/garyburd/redigo/redis"
    "github.com/qedus/osmpbf"
    "io"
    "os"
    "runtime"
    "strings"
    "time"
)

const (
    c_max_num_chunk   = 5         // sleep if there are more than x chunks in the redis db
    c_sleeptime_millis= 30000     // sleeptime: half a minute
    c_chunk_size      = 128000000 // size of a chunk: a bit less than 128 MiB
                                  // (128 MiB actually is 134217728 bytes)
)

func main() {

    // need 1 input argument: the OSM pb file
    if len(os.Args)<2 {
        fmt.Fprintf(os.Stderr,"No input filename given!\n")
        os.Exit(1)
    }
    pb_filename := os.Args[1]
    if len(pb_filename)<1 {
        fmt.Fprintf(os.Stderr,"No input filename given!\n")
        os.Exit(1)
    }

    // if no REDIS_URL defined in the env, then use localhost
    redisUrl:=os.Getenv("REDIS_URL")
    if len(redisUrl)==0 {
        redisUrl="localhost:6379"
    }

     // connect to redis
    conn, err := redis.Dial("tcp", redisUrl)
    if err != nil {
        fmt.Fprintf(os.Stderr,"Error: %v\n", err)
        os.Exit(1)
    }
    defer conn.Close()

    // refuse to start if already data in buffer!
    var val int
    val, err = redis.Int(conn.Do("LLEN", "TR_QUEUE"))
    if (err!=nil) {
        fmt.Fprintf(os.Stderr,"Error: %v\n", err)
        os.Exit(1)
    }
    if (val>0) {
        fmt.Fprintf(os.Stderr,"Error: buffer already contains values!\n")
        os.Exit(1)
    }


    // raise flag, to tell the chunk reader on the other side to stay connected
    // --- put in TR_BUFFER -----------------------------------------------
    _, err = conn.Do("SADD", "TR_FLAG", "RAISE" ) 
    if err != nil {
        fmt.Fprintf(os.Stderr,"Error: %v\n", err)
        os.Exit(1)
    }

    // start reading the file
    f, err := os.Open(pb_filename)
    if err != nil {
        fmt.Fprintf(os.Stderr,"Error: %v\n", err)
        os.Exit(1)
    }
    defer f.Close()

    d := osmpbf.NewDecoder(f)
    err = d.Start(runtime.GOMAXPROCS(-1))
    if err != nil {
        fmt.Fprintf(os.Stderr,"Error: %v\n", err)
        os.Exit(1)
    }
    var buf bytes.Buffer
    var nc, wc, rc, seq uint64
    seq=100000
    for {
        if v, err := d.Decode(); err == io.EOF {
            break
        } else if err != nil {
            fmt.Fprintf(os.Stderr,"Error: %v\n", err)
            os.Exit(1)
        } else {
            switch v := v.(type) {
            case *osmpbf.Node:
                tgs := ""
                // build the tags string
                for k, v := range v.Tags {
                    tgs = tgs + " " + k + ":" + v
                }
                tgs=strings.TrimSpace( strings.Replace(tgs, "\t"," ",-1) )
                if len(tgs)>0 {
                    buf.WriteString(fmt.Sprintf("%f\t%f\t%s\n", v.Lat, v.Lon, tgs) )
                }
                if (buf.Len()>c_chunk_size) {
                    insert_in_redis(conn,buf, fmt.Sprintf("%d.csv",seq))
                    buf.Reset()
                    seq+=1
                }
                nc++
            case *osmpbf.Way:
                // Way : ignored
                wc++
            case *osmpbf.Relation:
                // Relation : ignored
                rc++
            default:
                fmt.Fprintf(os.Stderr, "unknown type %T\n", v)
                os.Exit(1)
            }
        }
    }
    // handle the last buffer
    if (buf.Len()>0) {
        insert_in_redis(conn,buf, fmt.Sprintf("%d.csv",seq))
    }
    fmt.Printf("# Nodes: %d, Ways: %d, Relations: %d\n", nc, wc, rc)

    // tell client to stay connected no longer
    _, err = conn.Do("DEL", "TR_FLAG" )
    if err != nil {
        fmt.Fprintf(os.Stderr,"Error: %v\n", err)
    }
}


func insert_in_redis(conn redis.Conn, buf bytes.Buffer, key string) (err error) {
    var val int
    // --- sleep if too many entries in TR_QUEUE --------------------------
    for {
        val, err = redis.Int(conn.Do("LLEN", "TR_QUEUE"))
        if (err!=nil) { return }
        if (val>c_max_num_chunk) {
            fmt.Printf(".")
            time.Sleep(time.Duration(c_sleeptime_millis) * time.Millisecond)
        } else {
            fmt.Println()
            break
        }
    }

    // --- put in TR_BUFFER -----------------------------------------------
    val, err = redis.Int(conn.Do("HSET", "TR_BUFFER", key, buf.String()))
    if (err!=nil) { return }
    if (val==0) {
        err=errors.New("Error writing chunk to redis")
        return
    }

    // --- left push the key onto the TR_QUEUE ------------------------------
    val, err = redis.Int(conn.Do("LPUSH", "TR_QUEUE", key))
    if (err!=nil) { fmt.Printf("Error: %v\n",err); return }
    if (val==0) {
        err=errors.New("Error pushing key on queue in redis") 
        return
    }
    fmt.Printf("Chunk %s handled.\n", key)
    return
}

Redis2hdfs.java

Script to compile and run the Java file. You may need to edit the classpath pointers to jedis (a Redis client for Java) and Hadoop.

#!/bin/bash 

#S=$1
S="Redis2hdfs.java"
T=${S%.java}.class
E=${S%.java}

# java:  compile a java file, and run it
export HH=/opt/hadoop-2.7.1/share/hadoop
export CLASSPATH=$CLASSPATH\
:$HH/common/hadoop-common-2.7.1.jar\
:$HH/hdfs/hadoop-hdfs-2.7.1.jar\
:$HH/mapreduce/lib/*\
:$HH/common/lib/*\
:$HH/tools/lib/*\
:/opt/jedis/jedis-2.8.0.jar

echo "."; echo "."; echo "."

# compile
javac $S

# check if class file was produced
if [ ! -e $T ] 
then
    echo "## '$T' doesn't exist, can't run it." 
    exit 1
fi

# if class is younger than source, then execute
S_AGE=`stat -c %Y $S`
T_AGE=`stat -c %Y $T`

if [ $S_AGE -le $T_AGE ] 
then 
    java -cp .:$CLASSPATH $E $*
fi

Java source:

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.BufferedWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import redis.clients.jedis.Jedis;

public class Redis2hdfs { 

    public static void main(String[] args) throws java.io.IOException { 

        if (args.length<1) { 
            System.err.println("Need an output path!"); 
            System.exit(1);
        }
        String outputPath=args[0];

        int redisPort=6379;
        String redisHost="localhost";
        String redisUrl = System.getenv("REDIS_URL");
        if (redisUrl!=null) { 
            redisHost=redisUrl.replaceAll(":.*$","");
            redisPort=Integer.parseInt( redisUrl.replaceAll("^.*:",""));
        }
        Jedis jedis=new Jedis(redisHost,redisPort) ; // connect to Redis

        // HDFS setup
        Configuration conf = new Configuration();
        conf.addResource(new Path("/opt/hadoop/etc/hadoop/core-site.xml"));
        conf.addResource(new Path("/opt/hadoop/etc/hadoop/hdfs-site.xml"));
        FileSystem filesystem = FileSystem.get(conf);

        // keep reading from Redis until the flag is lowered
        do {
            long qlen=jedis.llen("TR_QUEUE");
            if (qlen>0) { 
                pop(jedis,filesystem,outputPath); 
            } else { 
                System.out.print(".");
                sleep(5); 
            } 
        }
        while( jedis.exists("TR_FLAG") );  
        jedis.close();
    }

    public static void sleep(int time) { 
        try {
            Thread.sleep( 1000* time) ; 
        } catch (InterruptedException ie) {
            System.err.println("An InterruptedException was thrown in my sleep!");
        }
    }

    // Pop a key from the queue, fetch the corresponding 
    // value from the buffer and write it to HDFS.
    public static void pop(Jedis jedis, 
                           FileSystem filesystem, 
                           String outputPath
                           ) throws java.io.IOException { 

        // pop 1 value from the queue
        String key=jedis.rpop("TR_QUEUE"); 
        if (key.length()<1) {
            System.err.println("Received empty key");
            return;
        }
        
        // read the corresponding buffer 
        String value=jedis.hget("TR_BUFFER",key);
        if ( (value==null) || (value.length()<1) ) {
            System.err.println("Received empty value");
            return;
        }

        // write value to file on HDFS
        Path outFile = new Path(outputPath+"/"+key);
        if (filesystem.exists(outFile)) 
            throw new IOException("Output already exists");
        try( 
            FSDataOutputStream out = filesystem.create(outFile); 
            BufferedWriter bw = new BufferedWriter(
                                    new OutputStreamWriter(out));
        ) { 
            bw.write(value);
        }
        System.out.println("Wrote: " + outFile);

        // the key/value can now be removed from the buffer 
        long jv=jedis.hdel("TR_BUFFER",key);
        if (jv!=1) {
            System.err.println("HDEL returned "+jv);
            return;
        }
    } //end_pop
}

UdfRoughDistance.java

Script to compile the Java file and wrap it into a jar. You may need to edit the classpath pointers to Hadoop and hive.

#!/bin/bash 

S=UdfRoughDistance.java
T=${S%.java}.class
E=${S%.java}

export HH=/opt/hadoop-2.7.1/share/hadoop
export HI=/opt/apache-hive-1.2.1-bin
export CLASSPATH=$CLASSPATH\
:$HH/common/hadoop-common-2.7.1.jar\
:$HH/hdfs/hadoop-hdfs-2.7.1.jar\
:$HH/mapreduce/lib/*\
:$HH/common/lib/*\
:$HH/tools/lib/*\
:$HI/lib/hive-common-1.2.1.jar\
:$HI/lib/hive-contrib-1.2.1.jar\
:$HI/lib/hive-exec-1.2.1.jar

echo "."; echo "."; echo "."

# compile
javac $S

# check if class file was produced
if [ ! -e $T ] 
then
    echo "## '$T' doesn't exist, can't JAR it." 
    exit 1
fi

# if class is younger than source, then jar it
S_AGE=`stat -c %Y $S`
T_AGE=`stat -c %Y $T`

if [ $S_AGE -le $T_AGE ] 
then 
    jar cvf udf.jar $T
else
    echo "## class file is older than java source!"
    exit 1
fi

Java source:

import java.lang.Math;
import org.apache.hadoop.hive.ql.exec.UDF;

public class UdfRoughDistance extends UDF {

    /** Calculate the approximate distance between two points */ 
    public double evaluate(double lat1, double lon1, double lat2, double lon2) {

        // convert to radians
        lat1 = lat1 * Math.PI / 180.0;
        lon1 = lon1 * Math.PI / 180.0;
        lat2 = lat2 * Math.PI / 180.0;
        lon2 = lon2 * Math.PI / 180.0;

        double r = 6371.0; // radius of the earth in kilometer 
        double x = (lon2 - lon1) * Math.cos((lat1+lat2)/2.0);
        double y = (lat2 - lat1);
        return r*Math.sqrt(x*x+y*y);

    } // end evaluate

    /* The above formulas are called the "equirectangular approximation", 
     * to be used for small distances, if performance is more important 
     * than accuracy. 
     * See: http://www.movable-type.co.uk/scripts/latlong.html
     */
}
 
Notes by Data Munging Ninja.