5 Commits

Autor SHA1 Nachricht Datum
  Johannes Hofmann 31a48ba07f Use binary search for inner loop in IndexedReader vor 5 Jahren
  Johannes Hofmann bd0846d555 Use BTreeSet to check node IDs in IndexedReader vor 5 Jahren
  Johannes Hofmann 5290c2041b Extract reading headers to speed up index creation vor 5 Jahren
  Johannes Hofmann 243e9a9eac Add IndexedReader example vor 5 Jahren
  Johannes Hofmann e5fd662654 Add IndexedReader vor 5 Jahren
5 geänderte Dateien mit 463 neuen und 36 gelöschten Zeilen
  1. 39
    0
      examples/indexed.rs
  2. 107
    36
      src/blob.rs
  3. 286
    0
      src/indexed.rs
  4. 2
    0
      src/lib.rs
  5. 29
    0
      tests/read.rs

+ 39
- 0
examples/indexed.rs Datei anzeigen

@@ -0,0 +1,39 @@
1
+// Count the number of buildings and their nodes from a PBF file
2
+// given as the first command line argument.
3
+
4
+extern crate osmpbf;
5
+
6
+use osmpbf::{IndexedReader, Element};
7
+use std::error::Error;
8
+
9
+fn main() -> Result<(), Box<dyn Error>> {
10
+    // Read command line argument and create IndexedReader
11
+    let arg = std::env::args_os()
12
+        .nth(1)
13
+        .ok_or("need a *.osm.pbf file as argument")?;
14
+    let mut reader = IndexedReader::from_path(&arg)?;
15
+
16
+    println!("Counting...");
17
+    let mut ways = 0;
18
+    let mut nodes = 0;
19
+
20
+    reader.read_ways_and_deps(
21
+        |way| {
22
+            // Filter ways. Return true if tags contain "building": "yes".
23
+            way.tags().any(|key_value| key_value == ("building", "yes"))
24
+        },
25
+        |element| {
26
+            // Increment counter for ways and nodes
27
+            match element {
28
+                Element::Way(_way) => ways += 1,
29
+                Element::Node(_node) => nodes += 1,
30
+                Element::DenseNode(_dense_node) => nodes += 1,
31
+                Element::Relation(_) => {}, // should not occur
32
+            }
33
+        },
34
+    )?;
35
+
36
+    // Print result
37
+    println!("ways:  {}\nnodes: {}", ways, nodes);
38
+    Ok(())
39
+}

+ 107
- 36
src/blob.rs Datei anzeigen

@@ -121,6 +121,35 @@ impl Blob {
121 121
     }
122 122
 }
123 123
 
124
+/// A blob header.
125
+///
126
+/// Just contains information about the size and type of the following `Blob`.
127
+#[derive(Clone, Debug)]
128
+pub struct BlobHeader {
129
+    header: fileformat::BlobHeader,
130
+}
131
+
132
+impl BlobHeader {
133
+    fn new(header: fileformat::BlobHeader) -> Self {
134
+        BlobHeader { header }
135
+    }
136
+
137
+    /// Returns the type of the following blob.
138
+    pub fn blob_type(&self) -> BlobType {
139
+        match self.header.get_field_type() {
140
+            "OSMHeader" => BlobType::OsmHeader,
141
+            "OSMData" => BlobType::OsmData,
142
+            x => BlobType::Unknown(x),
143
+        }
144
+    }
145
+
146
+    /// Returns the size of the following blob in bytes.
147
+    pub fn get_blob_size(&self) -> i32 {
148
+        self.header.get_datasize()
149
+    }
150
+}
151
+
152
+
124 153
 /// A reader for PBF files that allows iterating over `Blob`s.
125 154
 #[derive(Clone, Debug)]
126 155
 pub struct BlobReader<R: Read> {
@@ -154,6 +183,51 @@ impl<R: Read> BlobReader<R> {
154 183
             last_blob_ok: true,
155 184
         }
156 185
     }
186
+
187
+    fn read_blob_header(&mut self) -> Option<Result<fileformat::BlobHeader>> {
188
+        let header_size: u64 = match self.reader.read_u32::<byteorder::BigEndian>() {
189
+            Ok(n) => {
190
+                self.offset = self.offset.map(|x| ByteOffset(x.0 + 4));
191
+                u64::from(n)
192
+            }
193
+            Err(e) => {
194
+                self.offset = None;
195
+                match e.kind() {
196
+                    ::std::io::ErrorKind::UnexpectedEof => {
197
+                        //TODO This also accepts corrupted files in the case of 1-3 available bytes
198
+                        return None;
199
+                    }
200
+                    _ => {
201
+                        self.last_blob_ok = false;
202
+                        return Some(Err(new_blob_error(BlobError::InvalidHeaderSize)));
203
+                    }
204
+                }
205
+            }
206
+        };
207
+
208
+        if header_size >= MAX_BLOB_HEADER_SIZE {
209
+            self.last_blob_ok = false;
210
+            return Some(Err(new_blob_error(BlobError::HeaderTooBig {
211
+                size: header_size,
212
+            })));
213
+        }
214
+
215
+        let header: fileformat::BlobHeader =
216
+            match parse_message_from_reader(&mut self.reader.by_ref().take(header_size)) {
217
+                Ok(header) => header,
218
+                Err(e) => {
219
+                    self.offset = None;
220
+                    self.last_blob_ok = false;
221
+                    return Some(Err(new_protobuf_error(e, "blob header")));
222
+                }
223
+            };
224
+
225
+        self.offset = self
226
+            .offset
227
+            .map(|x| ByteOffset(x.0 + header_size));
228
+
229
+        Some(Ok(header))
230
+    }
157 231
 }
158 232
 
159 233
 impl BlobReader<BufReader<File>> {
@@ -195,43 +269,12 @@ impl<R: Read> Iterator for BlobReader<R> {
195 269
 
196 270
         let prev_offset = self.offset;
197 271
 
198
-        let header_size: u64 = match self.reader.read_u32::<byteorder::BigEndian>() {
199
-            Ok(n) => {
200
-                self.offset = self.offset.map(|x| ByteOffset(x.0 + 4));
201
-                u64::from(n)
202
-            }
203
-            Err(e) => {
204
-                self.offset = None;
205
-                match e.kind() {
206
-                    ::std::io::ErrorKind::UnexpectedEof => {
207
-                        //TODO This also accepts corrupted files in the case of 1-3 available bytes
208
-                        return None;
209
-                    }
210
-                    _ => {
211
-                        self.last_blob_ok = false;
212
-                        return Some(Err(new_blob_error(BlobError::InvalidHeaderSize)));
213
-                    }
214
-                }
215
-            }
272
+        let header = match self.read_blob_header() {
273
+            Some(Ok(header)) => header,
274
+            Some(Err(err)) => return Some(Err(err)),
275
+            None => return None,
216 276
         };
217 277
 
218
-        if header_size >= MAX_BLOB_HEADER_SIZE {
219
-            self.last_blob_ok = false;
220
-            return Some(Err(new_blob_error(BlobError::HeaderTooBig {
221
-                size: header_size,
222
-            })));
223
-        }
224
-
225
-        let header: fileformat::BlobHeader =
226
-            match parse_message_from_reader(&mut self.reader.by_ref().take(header_size)) {
227
-                Ok(header) => header,
228
-                Err(e) => {
229
-                    self.offset = None;
230
-                    self.last_blob_ok = false;
231
-                    return Some(Err(new_protobuf_error(e, "blob header")));
232
-                }
233
-            };
234
-
235 278
         let blob: fileformat::Blob = match parse_message_from_reader(
236 279
             &mut self.reader.by_ref().take(header.get_datasize() as u64),
237 280
         ) {
@@ -245,7 +288,7 @@ impl<R: Read> Iterator for BlobReader<R> {
245 288
 
246 289
         self.offset = self
247 290
             .offset
248
-            .map(|x| ByteOffset(x.0 + header_size + header.get_datasize() as u64));
291
+            .map(|x| ByteOffset(x.0 + header.get_datasize() as u64));
249 292
 
250 293
         Some(Ok(Blob::new(header, blob, prev_offset)))
251 294
     }
@@ -326,6 +369,34 @@ impl<R: Read + Seek> BlobReader<R> {
326 369
             }
327 370
         }
328 371
     }
372
+
373
+    /// Read and return next `BlobHeader` but skip the following `Blob`. This allows really fast
374
+    /// iteration of the PBF structure if only the byte offset and `BlobType` are important.
375
+    /// On success, returns the `BlobHeader` and the byte offset of the header which can also be
376
+    /// used as an offset for reading the entire `Blob` (including header).
377
+    pub fn next_header_skip_blob(&mut self) -> Option<Result<(BlobHeader, Option<ByteOffset>)>> {
378
+        // Stop iteration if there was an error.
379
+        if !self.last_blob_ok {
380
+            return None;
381
+        }
382
+
383
+        let prev_offset = self.offset;
384
+
385
+        // read header
386
+        let header = match self.read_blob_header() {
387
+            Some(Ok(header)) => header,
388
+            Some(Err(err)) => return Some(Err(err)),
389
+            None => return None,
390
+        };
391
+
392
+        // skip blob (which also adjusts self.offset)
393
+        if let Err(err) = self.seek_raw(SeekFrom::Current(header.get_datasize() as i64)) {
394
+            self.last_blob_ok = false;
395
+            return Some(Err(err));
396
+        }
397
+
398
+        Some(Ok((BlobHeader::new(header), prev_offset)))
399
+    }
329 400
 }
330 401
 
331 402
 impl BlobReader<BufReader<File>> {

+ 286
- 0
src/indexed.rs Datei anzeigen

@@ -0,0 +1,286 @@
1
+//! Speed up searches by using an index
2
+
3
+use error::Result;
4
+use std::collections::BTreeSet;
5
+use std::fs::File;
6
+use std::io::{Read, Seek};
7
+use std::ops::RangeInclusive;
8
+use std::path::Path;
9
+use {BlobReader, BlobType, ByteOffset, Element, Way};
10
+
11
+/// Stores the minimum and maximum id of every element type.
12
+#[derive(Debug)]
13
+pub struct IdRanges {
14
+    node_ids: Option<RangeInclusive<i64>>,
15
+    way_ids: Option<RangeInclusive<i64>>,
16
+    relation_ids: Option<RangeInclusive<i64>>,
17
+}
18
+
19
+/// Returns true if the given set contains at least one value that is inside the given range.
20
+fn range_included(range: RangeInclusive<i64>, node_ids: &BTreeSet<i64>) -> bool {
21
+    node_ids.range(range).next().is_some()
22
+}
23
+
24
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
25
+enum SimpleBlobType {
26
+    Header,
27
+    Primitive,
28
+    Unknown,
29
+}
30
+
31
+#[derive(Debug)]
32
+struct BlobInfo {
33
+    offset: ByteOffset,
34
+    blob_type: SimpleBlobType,
35
+    id_ranges: Option<IdRanges>,
36
+}
37
+
38
+/// Allows filtering elements and iterating over their dependencies.
39
+/// It chooses an efficient method for navigating the PBF structure to achieve this in reasonable
40
+/// time and with reasonable memory.
41
+pub struct IndexedReader<R: Read + Seek> {
42
+    reader: BlobReader<R>,
43
+    index: Vec<BlobInfo>,
44
+}
45
+
46
+impl<R: Read + Seek> IndexedReader<R> {
47
+    /// Creates a new `IndexedReader`.
48
+    ///
49
+    /// # Example
50
+    /// ```
51
+    /// use osmpbf::*;
52
+    ///
53
+    /// # fn foo() -> Result<()> {
54
+    /// let f = std::fs::File::open("tests/test.osm.pbf")?;
55
+    /// let buf_reader = std::io::BufReader::new(f);
56
+    ///
57
+    /// let reader = IndexedReader::new(buf_reader)?;
58
+    ///
59
+    /// # Ok(())
60
+    /// # }
61
+    /// # foo().unwrap();
62
+    /// ```
63
+    pub fn new(reader: R) -> Result<Self> {
64
+        let reader = BlobReader::new_seekable(reader)?;
65
+        Ok(Self {
66
+            reader,
67
+            index: vec![],
68
+        })
69
+    }
70
+
71
+    pub fn create_index(&mut self) -> Result<()> {
72
+        // remove old items
73
+        self.index.clear();
74
+
75
+        while let Some(result) = self.reader.next_header_skip_blob() {
76
+            let (header, offset) = result?;
77
+            // Reader is seekable, so offset should be Some(ByteOffset)
78
+            let offset = offset.unwrap();
79
+            let blob_type = match header.blob_type() {
80
+                BlobType::OsmHeader => SimpleBlobType::Header,
81
+                BlobType::OsmData => SimpleBlobType::Primitive,
82
+                BlobType::Unknown(_) => SimpleBlobType::Unknown,
83
+            };
84
+
85
+            self.index.push(BlobInfo {
86
+                offset,
87
+                blob_type,
88
+                id_ranges: None,
89
+            });
90
+        }
91
+
92
+        Ok(())
93
+    }
94
+
95
+    /// Filter ways using a closure and return matching ways and their dependent nodes (`Node`s and
96
+    /// `DenseNode`s) in another closure.
97
+    ///
98
+    /// # Example
99
+    /// ```
100
+    /// use osmpbf::*;
101
+    ///
102
+    /// # fn foo() -> Result<()> {
103
+    /// let mut reader = IndexedReader::from_path("tests/test.osm.pbf")?;
104
+    /// let mut ways = 0;
105
+    /// let mut nodes = 0;
106
+    ///
107
+    /// // Filter all ways that are buildings and count their nodes.
108
+    /// reader.read_ways_and_deps(
109
+    ///     |way| {
110
+    ///         // Filter ways. Return true if tags contain "building": "yes".
111
+    ///         way.tags().any(|key_value| key_value == ("building", "yes"))
112
+    ///     },
113
+    ///     |element| {
114
+    ///         // Increment counter
115
+    ///         match element {
116
+    ///             Element::Way(way) => ways += 1,
117
+    ///             Element::Node(node) => nodes += 1,
118
+    ///             Element::DenseNode(dense_node) => nodes += 1,
119
+    ///             Element::Relation(_) => (), // should not occur
120
+    ///         }
121
+    ///     },
122
+    /// )?;
123
+    ///
124
+    /// println!("ways:  {}\nnodes: {}", ways, nodes);
125
+    ///
126
+    /// # assert_eq!(ways, 1);
127
+    /// # assert_eq!(nodes, 3);
128
+    /// # Ok(())
129
+    /// # }
130
+    /// # foo().unwrap();
131
+    /// ```
132
+    pub fn read_ways_and_deps<F, E>(
133
+        &mut self,
134
+        mut filter: F,
135
+        mut element_callback: E,
136
+    ) -> Result<()>
137
+    where
138
+        F: for<'a> FnMut(&Way<'a>) -> bool,
139
+        E: for<'a> FnMut(&Element<'a>),
140
+    {
141
+        // Create index
142
+        if self.index.is_empty() {
143
+            self.create_index()?;
144
+        }
145
+
146
+        let mut node_ids: BTreeSet<i64> = BTreeSet::new();
147
+
148
+        // First pass:
149
+        //   * Filter ways and store their dependencies as node IDs
150
+        //   * Store range of node IDs (min and max value) of each block
151
+        for info in &mut self.index {
152
+            //TODO do something useful with header blocks
153
+            if info.blob_type == SimpleBlobType::Primitive {
154
+                self.reader.seek(info.offset)?;
155
+                let blob = self.reader.next().ok_or_else(|| {
156
+                    ::std::io::Error::new(
157
+                        ::std::io::ErrorKind::UnexpectedEof,
158
+                        "could not read next blob",
159
+                    )
160
+                })??;
161
+                let block = blob.to_primitiveblock()?;
162
+                let mut min_node_id: Option<i64> = None;
163
+                let mut max_node_id: Option<i64> = None;
164
+                for group in block.groups() {
165
+                    // filter ways and record node IDs
166
+                    for way in group.ways() {
167
+                        if filter(&way) {
168
+                            let refs = way.refs();
169
+
170
+                            node_ids.extend(refs);
171
+
172
+                            // Return way
173
+                            element_callback(&Element::Way(way));
174
+                        }
175
+                    }
176
+
177
+                    // Check node IDs of this block, record min and max
178
+
179
+                    let mut check_min_max = |id| {
180
+                        min_node_id = Some(min_node_id.map_or(id, |x| x.min(id)));
181
+                        max_node_id = Some(max_node_id.map_or(id, |x| x.max(id)));
182
+                    };
183
+
184
+                    for node in group.nodes() {
185
+                        check_min_max(node.id())
186
+                    }
187
+                    for node in group.dense_nodes() {
188
+                        check_min_max(node.id)
189
+                    }
190
+                }
191
+                if let (Some(min), Some(max)) = (min_node_id, max_node_id) {
192
+                    info.id_ranges = Some(IdRanges {
193
+                        node_ids: Some(RangeInclusive::new(min, max)),
194
+                        way_ids: None,
195
+                        relation_ids: None,
196
+                    });
197
+                }
198
+            }
199
+        }
200
+
201
+        // Second pass:
202
+        //   * Iterate only over blobs that may include the node IDs we're searching for
203
+        for info in &mut self.index {
204
+            if info.blob_type == SimpleBlobType::Primitive {
205
+                if let Some(node_id_range) = info.id_ranges.as_ref().and_then(|r| r.node_ids.as_ref()) {
206
+                    if range_included(node_id_range.clone(), &node_ids) {
207
+                        //TODO Only collect into Vec if range has a reasonable size
208
+                        let node_ids: Vec<i64> = node_ids.range(node_id_range.clone()).map(|x| *x).collect();
209
+                        self.reader.seek(info.offset)?;
210
+                        let blob = self.reader.next().ok_or_else(|| {
211
+                            ::std::io::Error::new(
212
+                                ::std::io::ErrorKind::UnexpectedEof,
213
+                                "could not read next blob",
214
+                            )
215
+                        })??;
216
+                        let block = blob.to_primitiveblock()?;
217
+                        for group in block.groups() {
218
+                            for node in group.nodes() {
219
+                                if node_ids.binary_search(&node.id()).is_ok() {
220
+                                    // ID found, return node
221
+                                    element_callback(&Element::Node(node));
222
+                                }
223
+                            }
224
+                            for node in group.dense_nodes() {
225
+                                if node_ids.binary_search(&node.id).is_ok() {
226
+                                    // ID found, return dense node
227
+                                    element_callback(&Element::DenseNode(node));
228
+                                }
229
+                            }
230
+                        }
231
+                    }
232
+                }
233
+            }
234
+        }
235
+
236
+        Ok(())
237
+    }
238
+}
239
+
240
+impl IndexedReader<File> {
241
+    /// Creates a new `IndexedReader` from a given path.
242
+    ///
243
+    /// # Example
244
+    /// ```
245
+    /// use osmpbf::*;
246
+    ///
247
+    /// # fn foo() -> Result<()> {
248
+    /// let reader = IndexedReader::from_path("tests/test.osm.pbf")?;
249
+    ///
250
+    /// # Ok(())
251
+    /// # }
252
+    /// # foo().unwrap();
253
+    /// ```
254
+    pub fn from_path<P: AsRef<Path>>(path: P) -> Result<Self> {
255
+        //TODO take some more measurements to determine if `BufReader` should be used here
256
+        let f = File::open(path)?;
257
+        Self::new(f)
258
+    }
259
+}
260
+
261
+#[cfg(test)]
262
+mod tests {
263
+    use super::*;
264
+
265
+    #[test]
266
+    fn test_range_included_set() {
267
+        let mut set = BTreeSet::<i64>::new();
268
+        set.extend(&[1,2,6]);
269
+
270
+        assert_eq!(range_included(RangeInclusive::new(0, 0), &set), false);
271
+        assert_eq!(range_included(RangeInclusive::new(1, 1), &set), true);
272
+        assert_eq!(range_included(RangeInclusive::new(2, 2), &set), true);
273
+        assert_eq!(range_included(RangeInclusive::new(3, 3), &set), false);
274
+        assert_eq!(range_included(RangeInclusive::new(3, 5), &set), false);
275
+        assert_eq!(range_included(RangeInclusive::new(3, 6), &set), true);
276
+        assert_eq!(range_included(RangeInclusive::new(6, 6), &set), true);
277
+        assert_eq!(range_included(RangeInclusive::new(7, 7), &set), false);
278
+        assert_eq!(range_included(RangeInclusive::new(0, 1), &set), true);
279
+        assert_eq!(range_included(RangeInclusive::new(6, 7), &set), true);
280
+        assert_eq!(range_included(RangeInclusive::new(2, 3), &set), true);
281
+        assert_eq!(range_included(RangeInclusive::new(5, 6), &set), true);
282
+        assert_eq!(range_included(RangeInclusive::new(5, 8), &set), true);
283
+        assert_eq!(range_included(RangeInclusive::new(0, 8), &set), true);
284
+        assert_eq!(range_included(RangeInclusive::new(0, 4), &set), true);
285
+    }
286
+}

+ 2
- 0
src/lib.rs Datei anzeigen

@@ -81,6 +81,7 @@ pub use block::*;
81 81
 pub use dense::*;
82 82
 pub use elements::*;
83 83
 pub use error::{BlobError, Error, ErrorKind, Result};
84
+pub use indexed::*;
84 85
 pub use mmap_blob::*;
85 86
 pub use reader::*;
86 87
 
@@ -89,6 +90,7 @@ pub mod block;
89 90
 pub mod dense;
90 91
 pub mod elements;
91 92
 mod error;
93
+pub mod indexed;
92 94
 pub mod mmap_blob;
93 95
 mod proto;
94 96
 pub mod reader;

+ 29
- 0
tests/read.rs Datei anzeigen

@@ -198,3 +198,32 @@ fn par_read_elements() {
198 198
         assert_eq!(elements, 5);
199 199
     }
200 200
 }
201
+
202
+#[test]
203
+fn read_ways_and_deps() {
204
+    for path in &TEST_FILE_PATHS {
205
+        let mut reader = IndexedReader::from_path(path).unwrap();
206
+
207
+        let mut ways = 0;
208
+        let mut nodes = 0;
209
+
210
+        reader.read_ways_and_deps(
211
+            |way| {
212
+                way.tags()
213
+                   .find(|&key_value| key_value == ("building", "yes"))
214
+                   .is_some()
215
+            },
216
+            |element| {
217
+                match element {
218
+                    Element::Way(_) => ways += 1,
219
+                    Element::Node(_) => nodes += 1,
220
+                    Element::DenseNode(_) => nodes += 1,
221
+                    Element::Relation(_) => panic!(), // should not occur
222
+                }
223
+            },
224
+        ).unwrap();
225
+
226
+        assert_eq!(ways, 1);
227
+        assert_eq!(nodes, 3);
228
+    }
229
+}