Terentia Flores
 
04_ingest
20151226

Ingest

From PBF to Redis: pb2redis.go

The Go program reads the OSM PBF file, keeps only the nodes that carry tags (the waypoints), and inserts these in chunks into the Redis database.

There are 3 Redis objects:

  • TR_BUFFER: a hash holding the actual data, stored under a key of your choice (here the chunk's file name, e.g. 0.csv, is used as key)
  • TR_QUEUE: after a chunk is put in the buffer, its key is left-pushed onto this list, to tell the other side (the pulling side) that there is something to pull
  • TR_FLAG: signals to the pulling side that it should stay connected and keep checking back, because more chunks are on their way.

On startup, the first thing pb2redis does is raise TR_FLAG. In Redis command language it looks like this:

SADD TR_FLAG RAISED 

At the end of the run, when all data has been handled, the flag is lowered like this:

DEL TR_FLAG 

Snippets of the Go code

Main loop over elements of the OSM PB-file:

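    for {
        v, err := d.Decode()
        if err == io.EOF {
            break
        } else if err != nil {
            fmt.Fprintf(os.Stderr, "Error: %v\n", err)
            os.Exit(1)
        }
        switch v := v.(type) {
        case *osmpbf.Node:
            // build the tags string "key:value key:value ..."
            // (tk/tv so that the node variable v is not shadowed)
            tgs := ""
            for tk, tv := range v.Tags {
                tgs = tgs + " " + tk + ":" + tv
            }
            // tabs and newlines would break the tab-separated, one-line-per-node format
            tgs = strings.TrimSpace(strings.NewReplacer("\t", " ", "\n", " ").Replace(tgs))
            // only nodes that carry tags (the waypoints) are written
            if len(tgs) > 0 {
                fmt.Fprintf(&buf, "%f\t%f\t%s\n", v.Lat, v.Lon, tgs)
            }
            // once the buffer exceeds the chunk size, ship it to Redis
            if buf.Len() > c_chunk_size {
                if err := insert_in_redis(conn, buf, fmt.Sprintf("%d.csv", seq)); err != nil {
                    fmt.Fprintf(os.Stderr, "Error: %v\n", err)
                    os.Exit(1)
                }
                buf.Reset()
                seq++
            }
            nc++
        case *osmpbf.Way:
            // Way: ignored, only counted
            wc++
        case *osmpbf.Relation:
            // Relation: ignored, only counted
            rc++
        default:
            fmt.Fprintf(os.Stderr, "unknown type %T\n", v)
            os.Exit(1)
        }
    }
    // handle the last, partially filled buffer
    if buf.Len() > 0 {
        if err := insert_in_redis(conn, buf, fmt.Sprintf("%d.csv", seq)); err != nil {
            fmt.Fprintf(os.Stderr, "Error: %v\n", err)
            os.Exit(1)
        }
    }
    fmt.Printf("# Nodes: %d, Ways: %d, Relations: %d\n", nc, wc, rc)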
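The line format produced for each tagged node can be isolated in a small function. This is a sketch rather than the original code: `formatNode` is a hypothetical name, and it sorts the tag keys, whereas the loop above emits them in Go's randomized map iteration order.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// formatNode renders one tagged node as "lat<TAB>lon<TAB>k1:v1 k2:v2\n".
// Tag keys are sorted so the output is deterministic. ok is false for
// nodes without tags, which are skipped.
func formatNode(lat, lon float64, tags map[string]string) (line string, ok bool) {
	keys := make([]string, 0, len(tags))
	for k := range tags {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	parts := make([]string, 0, len(keys))
	for _, k := range keys {
		parts = append(parts, k+":"+tags[k])
	}
	// tabs and newlines inside tags would break the tab-separated lines
	tgs := strings.TrimSpace(strings.NewReplacer("\t", " ", "\n", " ").Replace(strings.Join(parts, " ")))
	if tgs == "" {
		return "", false
	}
	return fmt.Sprintf("%f\t%f\t%s\n", lat, lon, tgs), true
}

func main() {
	line, _ := formatNode(51.0543, 3.7174, map[string]string{"name": "Gent", "amenity": "cafe"})
	// prints 51.054300, 3.717400 and "amenity:cafe name:Gent", tab-separated
	fmt.Print(line)
}
```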

Function to insert into Redis:

func insert_in_redis(conn redis.Conn, buf bytes.Buffer, key string) (err error) {
    var val int
    // --- sleep while TR_QUEUE holds too many unconsumed chunks ----------
    for {
        val, err = redis.Int(conn.Do("LLEN", "TR_QUEUE"))
        if err != nil {
            return
        }
        if val > c_max_num_chunk {
            fmt.Printf(".")
            time.Sleep(time.Duration(c_sleeptime_millis) * time.Millisecond)
        } else {
            fmt.Println()
            break
        }
    }

    // --- put in TR_BUFFER -----------------------------------------------
    // HSET returns 1 for a new field and 0 if the field already existed
    // (in which case its old value has just been overwritten)
    val, err = redis.Int(conn.Do("HSET", "TR_BUFFER", key, buf.String()))
    if err != nil {
        return
    }
    if val == 0 {
        err = errors.New("chunk " + key + " was already present in TR_BUFFER")
        return
    }

    // --- left push the key onto TR_QUEUE --------------------------------
    // LPUSH replies with the new length of the list, so only err needs checking
    if _, err = conn.Do("LPUSH", "TR_QUEUE", key); err != nil {
        return
    }
    fmt.Printf("Chunk %s handled.\n", key)
    return
}

Environment variable REDIS_URL

Both the ingest and the exude programs read the environment variable REDIS_URL to get the host name and port of your Redis server. If it is not set, "localhost:6379" is used.

Define it as follows on the command line:

export REDIS_URL="myredishost:6379"

Redis2hdfs.java

This program runs in an endless loop: whenever TR_QUEUE holds keys, it pops one, reads the corresponding value from TR_BUFFER, writes that value to a file on HDFS, and then removes the entry from TR_BUFFER.

The loop stops once TR_FLAG has been lowered (i.e. deleted).

The loop:

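        // keep reading from Redis until the flag is lowered AND the queue
        // is drained, so chunks pushed just before the flag went down are
        // not left behind
        do {
            long qlen = jedis.llen("TR_QUEUE");
            if (qlen > 0) {
                pop(jedis, filesystem, outputPath);
            } else {
                System.out.print(".");
                sleep(5);
            }
        } while (jedis.exists("TR_FLAG") || jedis.llen("TR_QUEUE") > 0);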
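The stop condition deserves care: if the flag is lowered while keys are still queued, those chunks must still be handled. Here is a toy Go model of the loop (hypothetical `broker` and `consume` names, no real Redis involved) that only stops once the flag is down and the queue is empty.

```go
package main

import "fmt"

// broker is a toy stand-in for the Redis server: a queue of chunk keys
// (index 0 is the left end) plus the TR_FLAG state.
type broker struct {
	queue []string
	flag  bool
}

// consume handles a key while the queue is not empty, otherwise idles.
// It stops only when the flag is lowered AND the queue is drained.
func consume(b *broker, handle func(key string), idle func()) {
	for b.flag || len(b.queue) > 0 {
		if len(b.queue) > 0 {
			key := b.queue[len(b.queue)-1] // RPOP: take from the right end
			b.queue = b.queue[:len(b.queue)-1]
			handle(key)
		} else {
			idle() // in reality: sleep, then poll again
		}
	}
}

func main() {
	// the producer pushed three chunks and has already lowered the flag
	b := &broker{queue: []string{"2.csv", "1.csv", "0.csv"}}
	consume(b, func(key string) { fmt.Println("wrote", key) }, func() {})
	// prints "wrote 0.csv", "wrote 1.csv", "wrote 2.csv" on separate lines
}
```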

The pop-from-Redis, write-to-HDFS function:

    public static void pop(Jedis jedis,
                           FileSystem filesystem,
                           String outputPath
                           ) throws IOException {

        // pop 1 key from the queue (rpop returns null if the queue is empty)
        String key = jedis.rpop("TR_QUEUE");
        if (key == null || key.isEmpty()) {
            System.err.println("Received empty key");
            return;
        }

        // read the corresponding buffer
        String value = jedis.hget("TR_BUFFER", key);
        if (value == null || value.isEmpty()) {
            System.err.println("Received empty value for key " + key);
            return;
        }

        // write value to file on HDFS, refusing to overwrite an existing file
        Path outFile = new Path(outputPath + "/" + key);
        if (filesystem.exists(outFile))
            throw new IOException("Output already exists: " + outFile);
        try (
            FSDataOutputStream out = filesystem.create(outFile);
            BufferedWriter bw = new BufferedWriter(
                    new OutputStreamWriter(out, java.nio.charset.StandardCharsets.UTF_8))
        ) {
            bw.write(value);
        }
        System.out.println("Wrote: " + outFile);

        // the key/value can now be removed from the buffer
        long jv = jedis.hdel("TR_BUFFER", key);
        if (jv != 1) {
            System.err.println("HDEL returned " + jv);
        }
    } //end_pop

Running it

To do the actual data transfer from PBF to HDFS, open two terminal windows, and in each of them first define the Redis URL as follows:

export REDIS_URL="myredishost:6379"

Terminal 1

Then, in the first terminal, start the pb2redis process:

./pb2redis <YOUR_OSM_FILE>.pbf


Terminal 2

...and in the second terminal, run the 'compile_run_redis2hdfs.sh' script, passing as first argument the HDFS directory where the files should be stored:

compile_run_redis2hdfs.sh '/user/dmn/europe' 


 
Notes by Data Munging Ninja. Generated on nini:sync/20151223_datamungingninja/terentiaflores at 2016-10-18 07:18