Ingest
From PBF to Redis: pb2redis.go
The Go program digests the PBF file, picks out the waypoints (the tagged nodes), and inserts these in chunks into the Redis database.
There are 3 Redis objects:
- TR_BUFFER: a hash that contains the actual data of each chunk file, stored under a key of your choice (here we take the chunk's filename as key)
- TR_QUEUE: after a chunk is put in the buffer, its key is left-pushed onto this queue, to indicate to the other side (the pulling side) that there is something to pull
- TR_FLAG: signals to the pulling side that it should stay connected and check back in a minute, because more files are on the way.
On startup, the first thing the program does is raise the TR_FLAG. In Redis command language it looks like this:
SADD TR_FLAG RAISED
At the end of the run, when all data has been handled, the flag is lowered as such:
DEL TR_FLAG
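In the Go program both commands are plain conn.Do calls. Below is a minimal sketch, not the original code, assuming the redigo client (github.com/gomodule/redigo/redis) that the snippets further down appear to use:

    package main

    import (
        "fmt"
        "os"

        "github.com/gomodule/redigo/redis"
    )

    func main() {
        // connect to a local Redis; pb2redis itself derives the address from REDIS_URL
        conn, err := redis.Dial("tcp", "localhost:6379")
        if err != nil {
            fmt.Fprintf(os.Stderr, "Error: %v\n", err)
            os.Exit(1)
        }
        defer conn.Close()

        // SADD TR_FLAG RAISED : tell the pulling side that more data is coming
        if _, err := conn.Do("SADD", "TR_FLAG", "RAISED"); err != nil {
            fmt.Fprintf(os.Stderr, "Error: %v\n", err)
            os.Exit(1)
        }

        // ... the actual ingest work happens here ...

        // DEL TR_FLAG : signal that all data has been handled
        if _, err := conn.Do("DEL", "TR_FLAG"); err != nil {
            fmt.Fprintf(os.Stderr, "Error: %v\n", err)
            os.Exit(1)
        }
    }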
Snippets of the Go code
Main loop over the elements of the OSM PBF file:
    for {
        if v, err := d.Decode(); err == io.EOF {
            break
        } else if err != nil {
            fmt.Fprintf(os.Stderr, "Error: %v\n", err)
            os.Exit(1)
        } else {
            switch v := v.(type) {
            case *osmpbf.Node:
                tgs := ""
                // build the tags string
                for k, v := range v.Tags {
                    tgs = tgs + " " + k + ":" + v
                }
                tgs = strings.TrimSpace(strings.Replace(tgs, "\t", " ", -1))
                if len(tgs) > 0 {
                    buf.WriteString(fmt.Sprintf("%f\t%f\t%s\n", v.Lat, v.Lon, tgs))
                }
                if buf.Len() > c_chunk_size {
                    insert_in_redis(conn, buf, fmt.Sprintf("%d.csv", seq))
                    buf.Reset()
                    seq += 1
                }
                nc++
            case *osmpbf.Way:
                // Way : ignored
                wc++
            case *osmpbf.Relation:
                // Relation : ignored
                rc++
            default:
                fmt.Fprintf(os.Stderr, "unknown type %T\n", v)
                os.Exit(1)
            }
        }
    }
    // handle the last buffer
    if buf.Len() > 0 {
        insert_in_redis(conn, buf, fmt.Sprintf("%d.csv", seq))
    }
    fmt.Printf("# Nodes: %d, Ways: %d, Relations: %d\n", nc, wc, rc)
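The loop relies on a decoder d, a Redis connection conn, a bytes.Buffer buf, a chunk sequence number seq, and the constants c_chunk_size, c_max_num_chunk and c_sleeptime_millis, all of which are set up earlier in the program and not shown here. The stand-alone sketch below illustrates only the decoder part; the import path github.com/qedus/osmpbf is an assumption, and the program merely counts the elements:

    package main

    import (
        "fmt"
        "io"
        "os"
        "runtime"

        "github.com/qedus/osmpbf" // assumed import path for the osmpbf decoder
    )

    func main() {
        f, err := os.Open(os.Args[1])
        if err != nil {
            fmt.Fprintf(os.Stderr, "Error: %v\n", err)
            os.Exit(1)
        }
        defer f.Close()

        // the decoder hands back *osmpbf.Node, *osmpbf.Way and *osmpbf.Relation values
        d := osmpbf.NewDecoder(f)
        if err := d.Start(runtime.GOMAXPROCS(-1)); err != nil {
            fmt.Fprintf(os.Stderr, "Error: %v\n", err)
            os.Exit(1)
        }

        var nc, wc, rc int64
        for {
            v, err := d.Decode()
            if err == io.EOF {
                break
            } else if err != nil {
                fmt.Fprintf(os.Stderr, "Error: %v\n", err)
                os.Exit(1)
            }
            switch v.(type) {
            case *osmpbf.Node:
                nc++
            case *osmpbf.Way:
                wc++
            case *osmpbf.Relation:
                rc++
            }
        }
        fmt.Printf("# Nodes: %d, Ways: %d, Relations: %d\n", nc, wc, rc)
    }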
Function to insert into Redis:
    func insert_in_redis(conn redis.Conn, buf bytes.Buffer, key string) (err error) {
        var val int
        // --- sleep if too many entries in TR_QUEUE --------------------------
        for {
            val, err = redis.Int(conn.Do("LLEN", "TR_QUEUE"))
            if err != nil {
                return
            }
            if val > c_max_num_chunk {
                fmt.Printf(".")
                time.Sleep(time.Duration(c_sleeptime_millis) * time.Millisecond)
            } else {
                fmt.Println()
                break
            }
        }
        // --- put in TR_BUFFER ------------------------------------------------
        val, err = redis.Int(conn.Do("HSET", "TR_BUFFER", key, buf.String()))
        if err != nil {
            return
        }
        if val == 0 {
            err = errors.New("Error writing chunk to redis")
            return
        }
        // --- left push the key onto the TR_QUEUE -------------------------------
        val, err = redis.Int(conn.Do("LPUSH", "TR_QUEUE", key))
        if err != nil {
            fmt.Printf("Error: %v\n", err)
            return
        }
        if val == 0 {
            err = errors.New("Error pushing key on queue in redis")
            return
        }
        fmt.Printf("Chunk %s handled.\n", key)
        return
    }
Environment variable REDIS_URL
Both the ingest and exude programs consult the environment variable REDIS_URL to pick up the host name and port of your Redis server. If the variable is not set, "localhost:6379" is used.
Define it as follows on the command line:
export REDIS_URL="myredishost:6379"
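On the Go side this boils down to an os.Getenv call before dialing. A sketch, assuming the redigo client; the connect helper name is illustrative, and the actual connection code of pb2redis is not shown in this article:

    package main

    import (
        "fmt"
        "os"

        "github.com/gomodule/redigo/redis"
    )

    // connect picks up REDIS_URL, falling back to localhost:6379 when it is not set
    func connect() (redis.Conn, error) {
        addr := os.Getenv("REDIS_URL")
        if addr == "" {
            addr = "localhost:6379"
        }
        return redis.Dial("tcp", addr)
    }

    func main() {
        conn, err := connect()
        if err != nil {
            fmt.Fprintf(os.Stderr, "Error: %v\n", err)
            os.Exit(1)
        }
        defer conn.Close()
        fmt.Println("Connected to Redis")
    }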
Redis2hdfs.java
This program goes into an eternal loop: it reads keys from the TR_QUEUE, if any, and pops the corresponding values from the TR_BUFFER. Each value is then written to a file on HDFS.
The eternal loop is broken when the TR_FLAG is lowered (i.e. gets deleted).
The loop:
    // keep reading from Redis until the flag is lowered
    do {
        long qlen = jedis.llen("TR_QUEUE");
        if (qlen > 0) {
            pop(jedis, filesystem, outputPath);
        } else {
            System.out.print(".");
            sleep(5);
        }
    } while (jedis.exists("TR_FLAG"));
The pop-from-Redis, write-to-HDFS function:
    public static void pop(Jedis jedis,
                           FileSystem filesystem,
                           String outputPath
                          ) throws java.io.IOException {
        // pop 1 value from the queue
        String key = jedis.rpop("TR_QUEUE");
        if ((key == null) || (key.length() < 1)) {
            System.err.println("Received empty key");
            return;
        }
        // read the corresponding buffer
        String value = jedis.hget("TR_BUFFER", key);
        if ((value == null) || (value.length() < 1)) {
            System.err.println("Received empty value");
            return;
        }
        // write value to file on HDFS
        Path outFile = new Path(outputPath + "/" + key);
        if (filesystem.exists(outFile))
            throw new IOException("Output already exists");
        try (
            FSDataOutputStream out = filesystem.create(outFile);
            BufferedWriter bw = new BufferedWriter(
                                    new OutputStreamWriter(out));
        ) {
            bw.write(value);
        }
        System.out.println("Wrote: " + outFile);
        // the key/value can now be removed from the buffer
        long jv = jedis.hdel("TR_BUFFER", key);
        if (jv != 1) {
            System.err.println("HDEL returned " + jv);
            return;
        }
    } // end pop
Running it
To do the actual data transfer from PBF to HDFS, open up 2 terminal windows, and in both of them first define the Redis URL as follows:
export REDIS_URL="myredishost:6379"
Terminal 1
Then, in one terminal, start up the pb2redis process:
./pb2redis <YOUR_OSM_FILE>.pbf
Terminal 2
... and in the other one, run the 'compile_run_redis2hdfs.sh' script, passing the directory on HDFS where to store the files as first argument:
compile_run_redis2hdfs.sh '/user/dmn/europe'