Parcourir la source

Extract header reading to speed up index creation

When creating an index (`IndexedReader::create_index`), the unused `Blob`s are no longer copied into memory, which makes this operation about 5x faster.
Johannes Hofmann il y a 5 ans
Parent
révision
5290c2041b
2 fichiers modifiés avec 113 ajouts et 42 suppressions
  1. 107
    36
      src/blob.rs
  2. 6
    6
      src/indexed.rs

+ 107
- 36
src/blob.rs Voir le fichier

@@ -121,6 +121,35 @@ impl Blob {
121 121
     }
122 122
 }
123 123
 
124
+/// A blob header.
125
+///
126
+/// Just contains information about the size and type of the following `Blob`.
127
+#[derive(Clone, Debug)]
128
+pub struct BlobHeader {
129
+    header: fileformat::BlobHeader,
130
+}
131
+
132
+impl BlobHeader {
133
+    fn new(header: fileformat::BlobHeader) -> Self {
134
+        BlobHeader { header }
135
+    }
136
+
137
+    /// Returns the type of the following blob.
138
+    pub fn blob_type(&self) -> BlobType {
139
+        match self.header.get_field_type() {
140
+            "OSMHeader" => BlobType::OsmHeader,
141
+            "OSMData" => BlobType::OsmData,
142
+            x => BlobType::Unknown(x),
143
+        }
144
+    }
145
+
146
+    /// Returns the size of the following blob in bytes.
147
+    pub fn get_blob_size(&self) -> i32 {
148
+        self.header.get_datasize()
149
+    }
150
+}
151
+
152
+
124 153
 /// A reader for PBF files that allows iterating over `Blob`s.
125 154
 #[derive(Clone, Debug)]
126 155
 pub struct BlobReader<R: Read> {
@@ -154,6 +183,51 @@ impl<R: Read> BlobReader<R> {
154 183
             last_blob_ok: true,
155 184
         }
156 185
     }
186
+
187
+    fn read_blob_header(&mut self) -> Option<Result<fileformat::BlobHeader>> {
188
+        let header_size: u64 = match self.reader.read_u32::<byteorder::BigEndian>() {
189
+            Ok(n) => {
190
+                self.offset = self.offset.map(|x| ByteOffset(x.0 + 4));
191
+                u64::from(n)
192
+            }
193
+            Err(e) => {
194
+                self.offset = None;
195
+                match e.kind() {
196
+                    ::std::io::ErrorKind::UnexpectedEof => {
197
+                        //TODO This also accepts corrupted files in the case of 1-3 available bytes
198
+                        return None;
199
+                    }
200
+                    _ => {
201
+                        self.last_blob_ok = false;
202
+                        return Some(Err(new_blob_error(BlobError::InvalidHeaderSize)));
203
+                    }
204
+                }
205
+            }
206
+        };
207
+
208
+        if header_size >= MAX_BLOB_HEADER_SIZE {
209
+            self.last_blob_ok = false;
210
+            return Some(Err(new_blob_error(BlobError::HeaderTooBig {
211
+                size: header_size,
212
+            })));
213
+        }
214
+
215
+        let header: fileformat::BlobHeader =
216
+            match parse_message_from_reader(&mut self.reader.by_ref().take(header_size)) {
217
+                Ok(header) => header,
218
+                Err(e) => {
219
+                    self.offset = None;
220
+                    self.last_blob_ok = false;
221
+                    return Some(Err(new_protobuf_error(e, "blob header")));
222
+                }
223
+            };
224
+
225
+        self.offset = self
226
+            .offset
227
+            .map(|x| ByteOffset(x.0 + header_size));
228
+
229
+        Some(Ok(header))
230
+    }
157 231
 }
158 232
 
159 233
 impl BlobReader<BufReader<File>> {
@@ -195,43 +269,12 @@ impl<R: Read> Iterator for BlobReader<R> {
195 269
 
196 270
         let prev_offset = self.offset;
197 271
 
198
-        let header_size: u64 = match self.reader.read_u32::<byteorder::BigEndian>() {
199
-            Ok(n) => {
200
-                self.offset = self.offset.map(|x| ByteOffset(x.0 + 4));
201
-                u64::from(n)
202
-            }
203
-            Err(e) => {
204
-                self.offset = None;
205
-                match e.kind() {
206
-                    ::std::io::ErrorKind::UnexpectedEof => {
207
-                        //TODO This also accepts corrupted files in the case of 1-3 available bytes
208
-                        return None;
209
-                    }
210
-                    _ => {
211
-                        self.last_blob_ok = false;
212
-                        return Some(Err(new_blob_error(BlobError::InvalidHeaderSize)));
213
-                    }
214
-                }
215
-            }
272
+        let header = match self.read_blob_header() {
273
+            Some(Ok(header)) => header,
274
+            Some(Err(err)) => return Some(Err(err)),
275
+            None => return None,
216 276
         };
217 277
 
218
-        if header_size >= MAX_BLOB_HEADER_SIZE {
219
-            self.last_blob_ok = false;
220
-            return Some(Err(new_blob_error(BlobError::HeaderTooBig {
221
-                size: header_size,
222
-            })));
223
-        }
224
-
225
-        let header: fileformat::BlobHeader =
226
-            match parse_message_from_reader(&mut self.reader.by_ref().take(header_size)) {
227
-                Ok(header) => header,
228
-                Err(e) => {
229
-                    self.offset = None;
230
-                    self.last_blob_ok = false;
231
-                    return Some(Err(new_protobuf_error(e, "blob header")));
232
-                }
233
-            };
234
-
235 278
         let blob: fileformat::Blob = match parse_message_from_reader(
236 279
             &mut self.reader.by_ref().take(header.get_datasize() as u64),
237 280
         ) {
@@ -245,7 +288,7 @@ impl<R: Read> Iterator for BlobReader<R> {
245 288
 
246 289
         self.offset = self
247 290
             .offset
248
-            .map(|x| ByteOffset(x.0 + header_size + header.get_datasize() as u64));
291
+            .map(|x| ByteOffset(x.0 + header.get_datasize() as u64));
249 292
 
250 293
         Some(Ok(Blob::new(header, blob, prev_offset)))
251 294
     }
@@ -326,6 +369,34 @@ impl<R: Read + Seek> BlobReader<R> {
326 369
             }
327 370
         }
328 371
     }
372
+
373
+    /// Read and return next `BlobHeader` but skip the following `Blob`. This allows really fast
374
+    /// iteration of the PBF structure if only the byte offset and `BlobType` are important.
375
+    /// On success, returns the `BlobHeader` and the byte offset of the header which can also be
376
+    /// used as an offset for reading the entire `Blob` (including header).
377
+    pub fn next_header_skip_blob(&mut self) -> Option<Result<(BlobHeader, Option<ByteOffset>)>> {
378
+        // Stop iteration if there was an error.
379
+        if !self.last_blob_ok {
380
+            return None;
381
+        }
382
+
383
+        let prev_offset = self.offset;
384
+
385
+        // read header
386
+        let header = match self.read_blob_header() {
387
+            Some(Ok(header)) => header,
388
+            Some(Err(err)) => return Some(Err(err)),
389
+            None => return None,
390
+        };
391
+
392
+        // skip blob (which also adjusts self.offset)
393
+        if let Err(err) = self.seek_raw(SeekFrom::Current(header.get_datasize() as i64)) {
394
+            self.last_blob_ok = false;
395
+            return Some(Err(err));
396
+        }
397
+
398
+        Some(Ok((BlobHeader::new(header), prev_offset)))
399
+    }
329 400
 }
330 401
 
331 402
 impl BlobReader<BufReader<File>> {

+ 6
- 6
src/indexed.rs Voir le fichier

@@ -81,15 +81,15 @@ impl<R: Read + Seek> IndexedReader<R> {
81 81
         })
82 82
     }
83 83
 
84
-    fn create_index(&mut self) -> Result<()> {
84
+    pub fn create_index(&mut self) -> Result<()> {
85 85
         // remove old items
86 86
         self.index.clear();
87 87
 
88
-        for blob in &mut self.reader {
89
-            let blob = blob?;
90
-            // Reader is seekable, so offset should return Some(ByteOffset)
91
-            let offset = blob.offset().unwrap();
92
-            let blob_type = match blob.get_type() {
88
+        while let Some(result) = self.reader.next_header_skip_blob() {
89
+            let (header, offset) = result?;
90
+            // Reader is seekable, so offset should be Some(ByteOffset)
91
+            let offset = offset.unwrap();
92
+            let blob_type = match header.blob_type() {
93 93
                 BlobType::OsmHeader => SimpleBlobType::Header,
94 94
                 BlobType::OsmData => SimpleBlobType::Primitive,
95 95
                 BlobType::Unknown(_) => SimpleBlobType::Unknown,