Browse Source

Speed up searching

* Decode/search in parallel
* Start decoding the first Blob as soon as possible
* Update multiple markers at once
Johannes Hofmann 7 years ago
parent
commit
2120e9b16f
4 changed files with 164 additions and 43 deletions
  1. 2
    0
      Cargo.lock
  2. 3
    1
      Cargo.toml
  3. 20
    7
      src/main.rs
  4. 139
    35
      src/search.rs

+ 2
- 0
Cargo.lock View File

285
  "lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
285
  "lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
286
  "linked-hash-map 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
286
  "linked-hash-map 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
287
  "log 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
287
  "log 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
288
+ "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
288
  "osmpbf 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)",
289
  "osmpbf 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)",
289
  "regex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
290
  "regex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
290
  "reqwest 0.8.6 (registry+https://github.com/rust-lang/crates.io-index)",
291
  "reqwest 0.8.6 (registry+https://github.com/rust-lang/crates.io-index)",
292
+ "scoped_threadpool 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)",
291
  "toml 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
293
  "toml 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
292
 ]
294
 ]
293
 
295
 

+ 3
- 1
Cargo.toml View File

13
 [dependencies]
13
 [dependencies]
14
 cgmath = "0.16"
14
 cgmath = "0.16"
15
 clap = "2.29"
15
 clap = "2.29"
16
+directories = "0.10"
16
 env_logger = "0.5.0-rc.2"
17
 env_logger = "0.5.0-rc.2"
17
 gl = "0.10"
18
 gl = "0.10"
18
 glutin = "0.16"
19
 glutin = "0.16"
20
 lazy_static = "1.0"
21
 lazy_static = "1.0"
21
 linked-hash-map = "0.5.0"
22
 linked-hash-map = "0.5.0"
22
 log = "0.4"
23
 log = "0.4"
24
+num_cpus = "1.0"
23
 osmpbf = "0.1"
25
 osmpbf = "0.1"
24
 regex = "1.0"
26
 regex = "1.0"
25
 reqwest = "0.8"
27
 reqwest = "0.8"
28
+scoped_threadpool = "0.1"
26
 toml = "0.4"
29
 toml = "0.4"
27
-directories = "0.10"
28
 
30
 
29
 [build-dependencies]
31
 [build-dependencies]
30
 gl_generator = "0.9"
32
 gl_generator = "0.9"

+ 20
- 7
src/main.rs View File

10
 extern crate linked_hash_map;
10
 extern crate linked_hash_map;
11
 #[macro_use]
11
 #[macro_use]
12
 extern crate log;
12
 extern crate log;
13
+extern crate num_cpus;
13
 extern crate osmpbf;
14
 extern crate osmpbf;
14
 extern crate regex;
15
 extern crate regex;
15
 extern crate reqwest;
16
 extern crate reqwest;
17
+extern crate scoped_threadpool;
16
 extern crate toml;
18
 extern crate toml;
17
 
19
 
18
 pub mod args;
20
 pub mod args;
76
     map: &mut MapViewGl,
78
     map: &mut MapViewGl,
77
     input_state: &mut InputState,
79
     input_state: &mut InputState,
78
     sources: &mut TileSources,
80
     sources: &mut TileSources,
79
-    marker_rx: &mpsc::Receiver<LatLon>,
81
+    marker_rx: &mpsc::Receiver<Vec<LatLon>>,
80
 ) -> Action {
82
 ) -> Action {
81
     match *event {
83
     match *event {
82
         Event::Awakened => {
84
         Event::Awakened => {
83
-            for pos in marker_rx.try_iter() {
85
+            for pos in marker_rx.try_iter().flat_map(|c| c.into_iter()) {
84
                 map.add_marker(pos.into());
86
                 map.add_marker(pos.into());
85
             }
87
             }
86
             Action::Redraw
88
             Action::Redraw
247
     if let (Some(path), Some(pattern)) = (config.pbf_path(), config.search_pattern()) {
249
     if let (Some(path), Some(pattern)) = (config.pbf_path(), config.search_pattern()) {
248
         let proxy = events_loop.create_proxy();
250
         let proxy = events_loop.create_proxy();
249
 
251
 
250
-        search::search_pbf(
252
+        search::par_search(
251
             path,
253
             path,
252
             pattern,
254
             pattern,
253
-            move |latlon| {
254
-                if marker_tx.send(latlon).is_err() {
255
-                    return search::ControlFlow::Break;
255
+            move |coords| {
256
+                if coords.is_empty() {
257
+                    search::ControlFlow::Continue
258
+                } else {
259
+                    if marker_tx.send(coords).is_err() {
260
+                        return search::ControlFlow::Break;
261
+                    }
262
+                    proxy.wakeup().into()
263
+                }
264
+            },
265
+            move |result| {
266
+                if let Err(err) = result {
267
+                    println!("search error: {}", err);
268
+                } else {
269
+                    info!("finished searching");
256
                 }
270
                 }
257
-                proxy.wakeup().into()
258
             },
271
             },
259
         )?;
272
         )?;
260
     }
273
     }

+ 139
- 35
src/search.rs View File

1
-use osmpbf::{Element, ElementReader};
1
+use scoped_threadpool::Pool;
2
+use coord::LatLon;
3
+use osmpbf::{Blob, BlobDecode, BlobReader};
2
 use regex::Regex;
4
 use regex::Regex;
3
 use std::path::{Path, PathBuf};
5
 use std::path::{Path, PathBuf};
6
+use std::sync::mpsc::sync_channel;
4
 use std::thread;
7
 use std::thread;
5
-use coord::LatLon;
6
 
8
 
7
 
9
 
8
 #[derive(Debug, Eq, PartialEq)]
10
 #[derive(Debug, Eq, PartialEq)]
21
     }
23
     }
22
 }
24
 }
23
 
25
 
24
-//TODO Add callbacks for other events: search finished, on error, ...
25
-pub fn search_pbf<P, F>(
26
+enum WorkerMessage {
27
+    PleaseStop,
28
+    DoBlob(Blob),
29
+}
30
+
31
+pub fn par_search<P, F, G>(
26
     pbf_path: P,
32
     pbf_path: P,
27
     search_pattern: &str,
33
     search_pattern: &str,
28
-    update_func: F,
34
+    found_func: F,
35
+    finished_func: G,
29
 ) -> Result<thread::JoinHandle<()>, String>
36
 ) -> Result<thread::JoinHandle<()>, String>
30
 where P: AsRef<Path>,
37
 where P: AsRef<Path>,
31
-      F: Fn(LatLon) -> ControlFlow + Send + 'static,
38
+      F: Fn(Vec<LatLon>) -> ControlFlow + Send + 'static,
39
+      G: Fn(Result<(), String>) + Send + 'static,
32
 {
40
 {
33
-    let pathbuf = PathBuf::from(pbf_path.as_ref());
34
-    let re = Regex::new(search_pattern)
35
-        .map_err(|e| format!("{}", e))?;
36
-    let reader = ElementReader::from_path(&pathbuf)
37
-        .map_err(|e| format!("Failed to read PBF file {:?}: {}", pbf_path.as_ref(), e))?;
38
-
41
+    let pbf_path = PathBuf::from(pbf_path.as_ref());
42
+    let search_pattern = search_pattern.to_string();
39
     let handle = thread::spawn(move|| {
43
     let handle = thread::spawn(move|| {
40
-        reader.for_each(|element| {
41
-            match element {
42
-                Element::Node(node) => {
43
-                    for (_key, val) in node.tags() {
44
-                        if re.is_match(val) {
45
-                            let pos = LatLon::new(node.lat(), node.lon());
46
-                            if update_func(pos) == ControlFlow::Break {
47
-                                return;
44
+        let res = par_search_blocking(pbf_path, &search_pattern, found_func);
45
+        finished_func(res);
46
+    });
47
+
48
+    Ok(handle)
49
+}
50
+
51
+pub fn par_search_blocking<P, F>(
52
+    pbf_path: P,
53
+    search_pattern: &str,
54
+    found_func: F,
55
+) -> Result<(), String>
56
+where P: AsRef<Path>,
57
+      F: Fn(Vec<LatLon>) -> ControlFlow + Send + 'static,
58
+{
59
+    let num_threads = ::num_cpus::get();
60
+    let mut pool = Pool::new(num_threads as u32);
61
+
62
+    pool.scoped(|scope| {
63
+        let re = Regex::new(search_pattern)
64
+            .map_err(|e| format!("{}", e))?;
65
+        let mut reader = BlobReader::from_path(&pbf_path)
66
+            .map_err(|e| format!("{}", e))?;
67
+
68
+        let mut chans = Vec::with_capacity(num_threads);
69
+        let (result_tx, result_rx) = sync_channel::<(usize, Result<Vec<LatLon>, String>)>(0);
70
+
71
+        for thread_id in 0..num_threads {
72
+            let re = re.clone();
73
+            let result_tx = result_tx.clone();
74
+
75
+            let (request_tx, request_rx) = sync_channel::<WorkerMessage>(0);
76
+            chans.push(request_tx);
77
+
78
+            scope.execute(move || {
79
+                for request in request_rx.iter() {
80
+                    match request {
81
+                        WorkerMessage::PleaseStop => return,
82
+                        WorkerMessage::DoBlob(blob) => {
83
+                            let mut matches = vec![];
84
+                            let block = match blob.decode() {
85
+                                Ok(b) => b,
86
+                                Err(err) => {
87
+                                    let _ = result_tx.send((thread_id, Err(format!("{}", err))));
88
+                                    return;
89
+                                }
90
+                            };
91
+                            if let BlobDecode::OsmData(block) = block {
92
+                                for node in block.groups().flat_map(|g| g.nodes()) {
93
+                                    for (_key, val) in node.tags() {
94
+                                        if re.is_match(val) {
95
+                                            let pos = LatLon::new(node.lat(), node.lon());
96
+                                            matches.push(pos);
97
+                                            break;
98
+                                        }
99
+                                    }
100
+                                }
101
+
102
+                                for node in block.groups().flat_map(|g| g.dense_nodes()) {
103
+                                    for (_key, val) in node.tags() {
104
+                                        if re.is_match(val) {
105
+                                            let pos = LatLon::new(node.lat(), node.lon());
106
+                                            matches.push(pos);
107
+                                            break;
108
+                                        }
109
+                                    }
110
+                                }
48
                             }
111
                             }
49
-                            break;
50
-                        }
51
-                    }
52
-                },
53
-                Element::DenseNode(node) => {
54
-                    for (_key, val) in node.tags() {
55
-                        if re.is_match(val) {
56
-                            let pos = LatLon::new(node.lat(), node.lon());
57
-                            if update_func(pos) == ControlFlow::Break {
112
+                            if result_tx.send((thread_id, Ok(matches))).is_err() {
58
                                 return;
113
                                 return;
59
                             }
114
                             }
60
-                            break;
61
                         }
115
                         }
62
-                    }
116
+                    };
117
+                }
118
+            });
119
+        }
120
+
121
+        let mut stopped_threads = 0;
122
+
123
+        // send initial message to each worker thread
124
+        for thread_id in 0..num_threads {
125
+            match reader.next() {
126
+                Some(Ok(blob)) => {
127
+                    chans[thread_id].send(WorkerMessage::DoBlob(blob))
128
+                        .map_err(|e| format!("{}", e))?;
129
+
130
+                },
131
+                Some(Err(err)) => {
132
+                    return Err(format!("{}", err));
133
+                },
134
+                None => {
135
+                    chans[thread_id].send(WorkerMessage::PleaseStop)
136
+                        .map_err(|e| format!("{}", e))?;
137
+                    stopped_threads += 1;
63
                 },
138
                 },
64
-                _ => {},
65
             }
139
             }
66
-        }).unwrap();
67
-    });
140
+        }
68
 
141
 
69
-    Ok(handle)
142
+        if stopped_threads == num_threads {
143
+            return Ok(());
144
+        }
145
+
146
+        for (thread_id, matches) in result_rx.iter() {
147
+            let matches = matches?;
148
+
149
+            if found_func(matches) == ControlFlow::Break {
150
+                break;
151
+            }
152
+
153
+            match reader.next() {
154
+                Some(Ok(blob)) => {
155
+                    chans[thread_id].send(WorkerMessage::DoBlob(blob))
156
+                        .map_err(|e| format!("{}", e))?;
157
+                },
158
+                Some(Err(err)) => {
159
+                    return Err(format!("{}", err));
160
+                },
161
+                None => {
162
+                    chans[thread_id].send(WorkerMessage::PleaseStop)
163
+                        .map_err(|e| format!("{}", e))?;
164
+                    stopped_threads += 1;
165
+                    if stopped_threads == num_threads {
166
+                        break;
167
+                    }
168
+                }
169
+            }
170
+        }
171
+
172
+        Ok(())
173
+    })
70
 }
174
 }