Terentia Flores
 
06_all_source
20151226

All source code

The following source files can be obtained by cloning this repository: github.com/dtmngngnnj/terentiaflores

pb2redis.go

Prerequisites:

go get github.com/garyburd/redigo/redis
go get github.com/qedus/osmpbf

Compile instructions:

go build pb2redis.go
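Once built, the resulting pb2redis binary takes the OSM protobuf extract as its only argument and reads the Redis address from the REDIS_URL environment variable, falling back to localhost:6379. A typical invocation could look as follows; the .osm.pbf filename is only an example:

REDIS_URL=localhost:6379 ./pb2redis belgium-latest.osm.pbf

Go source:
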
package main

// BUILD instructions:
//   1. prerequisites
//        go get github.com/garyburd/redigo/redis
//        go get github.com/qedus/osmpbf
//   2. go build pb2redis.go

import (
    "bytes"
    "errors"
    "fmt"
    "github.com/garyburd/redigo/redis"
    "github.com/qedus/osmpbf"
    "io"
    "os"
    "runtime"
    "strings"
    "time"
)

const (
    c_max_num_chunk    = 5         // sleep if there are more than x chunks in the redis db
    c_sleeptime_millis = 30000     // sleeptime: half a minute
    c_chunk_size       = 128000000 // size of a chunk: a bit less than 128 MiB
                                   // (128 MiB is actually 134,217,728 bytes)
)

func main() {

    // need 1 input argument: the OSM pb file
    if len(os.Args)<2 {
        fmt.Fprintf(os.Stderr,"No input filename given!\n")
        os.Exit(1)
    }
    pb_filename := os.Args[1]
    if len(pb_filename)<1 {
        fmt.Fprintf(os.Stderr,"No input filename given!\n")
        os.Exit(1)
    }

    // if no REDIS_URL defined in the env, then use localhost
    redisUrl:=os.Getenv("REDIS_URL")
    if len(redisUrl)==0 {
        redisUrl="localhost:6379"
    }

    // connect to redis
    conn, err := redis.Dial("tcp", redisUrl)
    if err != nil {
        fmt.Fprintf(os.Stderr,"Error: %v\n", err)
        os.Exit(1)
    }
    defer conn.Close()

    // refuse to start if the queue already contains data!
    var val int
    val, err = redis.Int(conn.Do("LLEN", "TR_QUEUE"))
    if (err!=nil) {
        fmt.Fprintf(os.Stderr,"Error: %v\n", err)
        os.Exit(1)
    }
    if (val>0) {
        fmt.Fprintf(os.Stderr,"Error: TR_QUEUE already contains values!\n")
        os.Exit(1)
    }


    // raise the flag, to tell the chunk reader on the other side to stay connected
    // --- add "RAISE" to the TR_FLAG set ----------------------------------
    _, err = conn.Do("SADD", "TR_FLAG", "RAISE")
    if err != nil {
        fmt.Fprintf(os.Stderr,"Error: %v\n", err)
        os.Exit(1)
    }

    // start reading the file
    f, err := os.Open(pb_filename)
    if err != nil {
        fmt.Fprintf(os.Stderr,"Error: %v\n", err)
        os.Exit(1)
    }
    defer f.Close()

    d := osmpbf.NewDecoder(f)
    err = d.Start(runtime.GOMAXPROCS(-1))
    if err != nil {
        fmt.Fprintf(os.Stderr,"Error: %v\n", err)
        os.Exit(1)
    }
    var buf bytes.Buffer
    var nc, wc, rc, seq uint64
    seq=100000
    for {
        if v, err := d.Decode(); err == io.EOF {
            break
        } else if err != nil {
            fmt.Fprintf(os.Stderr,"Error: %v\n", err)
            os.Exit(1)
        } else {
            switch v := v.(type) {
            case *osmpbf.Node:
                tgs := ""
                // build the tags string
                for k, v := range v.Tags {
                    tgs = tgs + " " + k + ":" + v
                }
                tgs=strings.TrimSpace( strings.Replace(tgs, "\t"," ",-1) )
                if len(tgs)>0 {
                    buf.WriteString(fmt.Sprintf("%f\t%f\t%s\n", v.Lat, v.Lon, tgs) )
                }
                if (buf.Len()>c_chunk_size) {
                    insert_in_redis(conn,buf, fmt.Sprintf("%d.csv",seq))
                    buf.Reset()
                    seq+=1
                }
                nc++
            case *osmpbf.Way:
                // Way : ignored
                wc++
            case *osmpbf.Relation:
                // Relation : ignored
                rc++
            default:
                fmt.Fprintf(os.Stderr, "unknown type %T\n", v)
                os.Exit(1)
            }
        }
    }
    // handle the last buffer
    if (buf.Len()>0) {
        insert_in_redis(conn,buf, fmt.Sprintf("%d.csv",seq))
    }
    fmt.Printf("# Nodes: %d, Ways: %d, Relations: %d\n", nc, wc, rc)

    // lower the flag: the client no longer needs to stay connected
    _, err = conn.Do("DEL", "TR_FLAG" )
    if err != nil {
        fmt.Fprintf(os.Stderr,"Error: %v\n", err)
    }
}


func insert_in_redis(conn redis.Conn, buf bytes.Buffer, key string) (err error) {
    var val int
    // --- sleep if too many entries in TR_QUEUE --------------------------
    for {
        val, err = redis.Int(conn.Do("LLEN", "TR_QUEUE"))
        if (err!=nil) { return }
        if (val>c_max_num_chunk) {
            fmt.Printf(".")
            time.Sleep(time.Duration(c_sleeptime_millis) * time.Millisecond)
        } else {
            fmt.Println()
            break
        }
    }

    // --- put the chunk in the TR_BUFFER hash ------------------------------
    // (HSET returns 1 for a new field; 0 means the key already existed)
    val, err = redis.Int(conn.Do("HSET", "TR_BUFFER", key, buf.String()))
    if (err!=nil) { return }
    if (val==0) {
        err=errors.New("Error writing chunk to redis")
        return
    }

    // --- left push the key onto the TR_QUEUE ------------------------------
    val, err = redis.Int(conn.Do("LPUSH", "TR_QUEUE", key))
    if (err!=nil) { fmt.Printf("Error: %v\n",err); return }
    if (val==0) {
        err=errors.New("Error pushing key on queue in redis") 
        return
    }
    fmt.Printf("Chunk %s handled.\n", key)
    return
}
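While pb2redis and the HDFS writer (next section) are both running, the hand-over can be watched with plain redis-cli commands against the keys used above:

redis-cli LLEN TR_QUEUE     # number of chunk keys waiting to be consumed
redis-cli HLEN TR_BUFFER    # number of chunk payloads currently parked in the hash
redis-cli EXISTS TR_FLAG    # 1 while the producer is still running, 0 once it has finished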

Redis2hdfs.java

Script to compile and run the Java file. You may need to edit the classpath pointers to Jedis (a Redis client for Java) and Hadoop.

#!/bin/bash 

#S=$1
S="Redis2hdfs.java"
T=${S%.java}.class
E=${S%.java}

# compile the Java file, and run it
export HH=/opt/hadoop-2.7.1/share/hadoop
export CLASSPATH=$CLASSPATH\
:$HH/common/hadoop-common-2.7.1.jar\
:$HH/hdfs/hadoop-hdfs-2.7.1.jar\
:$HH/mapreduce/lib/*\
:$HH/common/lib/*\
:$HH/tools/lib/*\
:/opt/jedis/jedis-2.8.0.jar

echo "."; echo "."; echo "."

# compile
javac $S

# check if class file was produced
if [ ! -e $T ] 
then
    echo "## '$T' doesn't exist, can't run it." 
    exit 1
fi

# if the class is at least as new as the source, execute it
S_AGE=`stat -c %Y $S`
T_AGE=`stat -c %Y $T`

if [ $S_AGE -le $T_AGE ] 
then 
    java -cp .:$CLASSPATH $E $*
fi

Java source:

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.BufferedWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import redis.clients.jedis.Jedis;

public class Redis2hdfs { 

    public static void main(String[] args) throws java.io.IOException { 

        if (args.length<1) { 
            System.err.println("Need an output path!"); 
            System.exit(1);
        }
        String outputPath=args[0];

        int redisPort=6379;
        String redisHost="localhost";
        String redisUrl = System.getenv("REDIS_URL");
        if (redisUrl!=null) { 
            redisHost=redisUrl.replaceAll(":.*$","");
            redisPort=Integer.parseInt( redisUrl.replaceAll("^.*:",""));
        }
        Jedis jedis=new Jedis(redisHost,redisPort) ; // connect to Redis

        // HDFS setup
        Configuration conf = new Configuration();
        conf.addResource(new Path("/opt/hadoop/etc/hadoop/core-site.xml"));
        conf.addResource(new Path("/opt/hadoop/etc/hadoop/hdfs-site.xml"));
        FileSystem filesystem = FileSystem.get(conf);

        // keep reading from Redis until the flag is lowered
        do {
            long qlen=jedis.llen("TR_QUEUE");
            if (qlen>0) { 
                pop(jedis,filesystem,outputPath); 
            } else { 
                System.out.print(".");
                sleep(5); 
            } 
        }
        while( jedis.exists("TR_FLAG") );  
        jedis.close();
    }

    public static void sleep(int time) { 
        try {
            Thread.sleep( 1000* time) ; 
        } catch (InterruptedException ie) {
            System.err.println("An InterruptedException was thrown in my sleep!");
        }
    }

    // Pop a key from the queue, fetch the corresponding 
    // value from the buffer and write it to HDFS.
    public static void pop(Jedis jedis, 
                           FileSystem filesystem, 
                           String outputPath
                           ) throws java.io.IOException { 

        // pop 1 value from the queue
        String key=jedis.rpop("TR_QUEUE"); 
        if ( (key==null) || (key.length()<1) ) {
            System.err.println("Received empty key");
            return;
        }
        
        // read the corresponding buffer 
        String value=jedis.hget("TR_BUFFER",key);
        if ( (value==null) || (value.length()<1) ) {
            System.err.println("Received empty value");
            return;
        }

        // write value to file on HDFS
        Path outFile = new Path(outputPath+"/"+key);
        if (filesystem.exists(outFile)) 
            throw new IOException("Output already exists");
        try( 
            FSDataOutputStream out = filesystem.create(outFile); 
            BufferedWriter bw = new BufferedWriter(
                                    new OutputStreamWriter(out));
        ) { 
            bw.write(value);
        }
        System.out.println("Wrote: " + outFile);

        // the key/value can now be removed from the buffer 
        long jv=jedis.hdel("TR_BUFFER",key);
        if (jv!=1) {
            System.err.println("HDEL returned "+jv);
            return;
        }
    } //end_pop
}
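With the classpath set up as in the script above, the consumer takes the HDFS output directory as its single argument; the directory name below is just an example, and REDIS_URL is optional (it defaults to localhost:6379):

export REDIS_URL=localhost:6379
java -cp .:$CLASSPATH Redis2hdfs /user/dmn/osm_chunks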

UdfRoughDistance.java

Script to compile the Java file and wrap it into a jar. You may need to edit the classpath pointers to Hadoop and Hive.

#!/bin/bash 

S=UdfRoughDistance.java
T=${S%.java}.class
E=${S%.java}

export HH=/opt/hadoop-2.7.1/share/hadoop
export HI=/opt/apache-hive-1.2.1-bin
export CLASSPATH=$CLASSPATH\
:$HH/common/hadoop-common-2.7.1.jar\
:$HH/hdfs/hadoop-hdfs-2.7.1.jar\
:$HH/mapreduce/lib/*\
:$HH/common/lib/*\
:$HH/tools/lib/*\
:$HI/lib/hive-common-1.2.1.jar\
:$HI/lib/hive-contrib-1.2.1.jar\
:$HI/lib/hive-exec-1.2.1.jar

echo "."; echo "."; echo "."

# compile
javac $S

# check if class file was produced
if [ ! -e $T ] 
then
    echo "## '$T' doesn't exist, can't JAR it." 
    exit 1
fi

# if the class is at least as new as the source, jar it
S_AGE=`stat -c %Y $S`
T_AGE=`stat -c %Y $T`

if [ $S_AGE -le $T_AGE ] 
then 
    jar cvf udf.jar $T
else
    echo "## class file is older than java source!"
    exit 1
fi

Java source:

import java.lang.Math;
import org.apache.hadoop.hive.ql.exec.UDF;

public class UdfRoughDistance extends UDF {

    /** Calculate the approximate distance (in km) between two points given as lat/lon in degrees. */
    public double evaluate(double lat1, double lon1, double lat2, double lon2) {

        // convert to radians
        lat1 = lat1 * Math.PI / 180.0;
        lon1 = lon1 * Math.PI / 180.0;
        lat2 = lat2 * Math.PI / 180.0;
        lon2 = lon2 * Math.PI / 180.0;

        double r = 6371.0; // mean radius of the earth in kilometers
        double x = (lon2 - lon1) * Math.cos((lat1+lat2)/2.0);
        double y = (lat2 - lat1);
        return r*Math.sqrt(x*x+y*y);

    } // end evaluate

    /* The above formulas are called the "equirectangular approximation", 
     * to be used for small distances, if performance is more important 
     * than accuracy. 
     * See: http://www.movable-type.co.uk/scripts/latlong.html
     */
}
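To try the UDF out from Hive, register the jar built above in a session and call the function on a pair of coordinates. The function name rough_distance is arbitrary, the hive command is assumed to be run from the directory containing udf.jar, and the two points below (roughly Brussels and Antwerp) should come out at somewhere around 41 km:

hive -e "
ADD JAR udf.jar;
CREATE TEMPORARY FUNCTION rough_distance AS 'UdfRoughDistance';
SELECT rough_distance(50.85, 4.35, 51.22, 4.40);
"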
 
Notes by Data Munging Ninja. Generated on nini:sync/20151223_datamungingninja/terentiaflores at 2016-10-18 07:18