Browse Source

Track blob offset, add BlobReader::seek* methods

Johannes Hofmann 7 years ago
parent
commit
31e0488700
1 changed files with 118 additions and 8 deletions
  1. 118
    8
      src/blob.rs

+ 118
- 8
src/blob.rs View File

8
 use error::{BlobError, Result, new_blob_error, new_protobuf_error};
8
 use error::{BlobError, Result, new_blob_error, new_protobuf_error};
9
 use proto::fileformat;
9
 use proto::fileformat;
10
 use std::fs::File;
10
 use std::fs::File;
11
-use std::io::{BufReader, Read};
11
+use std::io::{BufReader, Read, Seek, SeekFrom};
12
 use std::path::Path;
12
 use std::path::Path;
13
 use util::{parse_message_from_bytes, parse_message_from_reader};
13
 use util::{parse_message_from_bytes, parse_message_from_reader};
14
 
14
 
51
     Unknown(&'a str),
51
     Unknown(&'a str),
52
 }
52
 }
53
 
53
 
54
+/// The offset of a blob in bytes from stream start.
55
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
56
+pub struct ByteOffset(pub u64);
57
+
54
 /// A blob.
58
 /// A blob.
55
 ///
59
 ///
56
 /// A PBF file consists of a sequence of blobs. This type supports decoding the content of a blob
60
 /// A PBF file consists of a sequence of blobs. This type supports decoding the content of a blob
59
 pub struct Blob {
63
 pub struct Blob {
60
     header: fileformat::BlobHeader,
64
     header: fileformat::BlobHeader,
61
     blob: fileformat::Blob,
65
     blob: fileformat::Blob,
66
+    offset: Option<ByteOffset>,
62
 }
67
 }
63
 
68
 
64
 impl Blob {
69
 impl Blob {
65
-    fn new(header: fileformat::BlobHeader, blob: fileformat::Blob) -> Blob {
70
+    fn new(
71
+        header: fileformat::BlobHeader,
72
+        blob: fileformat::Blob,
73
+        offset: Option<ByteOffset>,
74
+    ) -> Blob {
66
         Blob {
75
         Blob {
67
             header,
76
             header,
68
-            blob
77
+            blob,
78
+            offset,
69
         }
79
         }
70
     }
80
     }
71
 
81
 
95
         }
105
         }
96
     }
106
     }
97
 
107
 
108
+    /// Returns the byte offset of the blob from the start of its source stream.
109
+    /// This is `None` if the reader's position was unknown, e.g. when it was created with `BlobReader::new`.
110
+    pub fn offset(&self) -> Option<ByteOffset> {
111
+        self.offset
112
+    }
113
+
98
     /// Tries to decode the blob to a `HeaderBlock`. This operation might involve an expensive
114
     /// Tries to decode the blob to a `HeaderBlock`. This operation might involve an expensive
99
     /// decompression step.
115
     /// decompression step.
100
     pub fn to_headerblock(&self) -> Result<HeaderBlock> {
116
     pub fn to_headerblock(&self) -> Result<HeaderBlock> {
114
 #[derive(Clone, Debug)]
130
 #[derive(Clone, Debug)]
115
 pub struct BlobReader<R: Read> {
131
 pub struct BlobReader<R: Read> {
116
     reader: R,
132
     reader: R,
133
+    /// Current reader offset in bytes from the start of the stream.
134
+    offset: Option<ByteOffset>,
117
     last_blob_ok: bool,
135
     last_blob_ok: bool,
118
 }
136
 }
119
 
137
 
120
 impl<R: Read> BlobReader<R> {
138
 impl<R: Read> BlobReader<R> {
121
-    /// Creates a new `ElementReader`.
139
+    /// Creates a new `BlobReader`.
122
     ///
140
     ///
123
     /// # Example
141
     /// # Example
124
     /// ```
142
     /// ```
128
     /// let f = std::fs::File::open("tests/test.osm.pbf")?;
146
     /// let f = std::fs::File::open("tests/test.osm.pbf")?;
129
     /// let buf_reader = std::io::BufReader::new(f);
147
     /// let buf_reader = std::io::BufReader::new(f);
130
     ///
148
     ///
131
-    /// let reader = ElementReader::new(buf_reader);
149
+    /// let reader = BlobReader::new(buf_reader);
132
     ///
150
     ///
133
     /// # Ok(())
151
     /// # Ok(())
134
     /// # }
152
     /// # }
136
     pub fn new(reader: R) -> BlobReader<R> {
154
     pub fn new(reader: R) -> BlobReader<R> {
137
         BlobReader {
155
         BlobReader {
138
             reader,
156
             reader,
157
+            offset: None,
139
             last_blob_ok: true,
158
             last_blob_ok: true,
140
         }
159
         }
141
     }
160
     }
161
         let f = File::open(path)?;
180
         let f = File::open(path)?;
162
         let reader = BufReader::new(f);
181
         let reader = BufReader::new(f);
163
 
182
 
164
-        Ok(BlobReader::new(reader))
183
+        Ok(BlobReader {
184
+            reader,
185
+            offset: Some(ByteOffset(0)),
186
+            last_blob_ok: true,
187
+        })
165
     }
188
     }
166
 }
189
 }
167
 
190
 
174
             return None;
197
             return None;
175
         }
198
         }
176
 
199
 
200
+        let prev_offset = self.offset;
201
+
177
         let header_size: u64 = match self.reader.read_u32::<byteorder::BigEndian>() {
202
         let header_size: u64 = match self.reader.read_u32::<byteorder::BigEndian>() {
178
-            Ok(n) => u64::from(n),
203
+            Ok(n) => {
204
+                self.offset = self.offset.map(|x| ByteOffset(x.0 + 4));
205
+                u64::from(n)
206
+            },
179
             Err(e) => {
207
             Err(e) => {
208
+                self.offset = None;
180
                 match e.kind() {
209
                 match e.kind() {
181
                     ::std::io::ErrorKind::UnexpectedEof => {
210
                     ::std::io::ErrorKind::UnexpectedEof => {
182
                         //TODO This also accepts corrupted files in the case of 1-3 available bytes
211
                         //TODO This also accepts corrupted files in the case of 1-3 available bytes
198
         let header: fileformat::BlobHeader = match parse_message_from_reader(&mut self.reader.by_ref().take(header_size)) {
227
         let header: fileformat::BlobHeader = match parse_message_from_reader(&mut self.reader.by_ref().take(header_size)) {
199
             Ok(header) => header,
228
             Ok(header) => header,
200
             Err(e) => {
229
             Err(e) => {
230
+                self.offset = None;
201
                 self.last_blob_ok = false;
231
                 self.last_blob_ok = false;
202
                 return Some(Err(new_protobuf_error(e, "blob header")));
232
                 return Some(Err(new_protobuf_error(e, "blob header")));
203
             },
233
             },
206
         let blob: fileformat::Blob = match parse_message_from_reader(&mut self.reader.by_ref().take(header.get_datasize() as u64)) {
236
         let blob: fileformat::Blob = match parse_message_from_reader(&mut self.reader.by_ref().take(header.get_datasize() as u64)) {
207
             Ok(blob) => blob,
237
             Ok(blob) => blob,
208
             Err(e) => {
238
             Err(e) => {
239
+                self.offset = None;
209
                 self.last_blob_ok = false;
240
                 self.last_blob_ok = false;
210
                 return Some(Err(new_protobuf_error(e, "blob content")));
241
                 return Some(Err(new_protobuf_error(e, "blob content")));
211
             },
242
             },
212
         };
243
         };
213
 
244
 
214
-        Some(Ok(Blob::new(header, blob)))
245
+        self.offset = self.offset.map(|x| ByteOffset(
246
+            x.0 + header_size + header.get_datasize() as u64
247
+        ));
248
+
249
+        Some(Ok(Blob::new(header, blob, prev_offset)))
250
+    }
251
+}
252
+
253
+impl<R: Read + Seek> BlobReader<R> {
254
+    /// Creates a new `BlobReader` from the given reader that is seekable and will be initialized
255
+    /// with a valid offset.
256
+    ///
257
+    /// # Example
258
+    /// ```
259
+    /// use osmpbf::*;
260
+    ///
261
+    /// # fn foo() -> Result<()> {
262
+    /// let f = std::fs::File::open("tests/test.osm.pbf")?;
263
+    /// let buf_reader = std::io::BufReader::new(f);
264
+    ///
265
+    /// let mut reader = BlobReader::new_seekable(buf_reader)?;
266
+    /// let first_blob = reader.next().unwrap()?;
267
+    ///
268
+    /// assert_eq!(first_blob.offset(), Some(ByteOffset(0)));
269
+    /// # Ok(())
270
+    /// # }
271
+    /// ```
272
+    pub fn new_seekable(mut reader: R) -> Result<BlobReader<R>> {
273
+        let pos = reader.seek(SeekFrom::Current(0))?;
274
+
275
+        Ok(BlobReader {
276
+            reader,
277
+            offset: Some(ByteOffset(pos)),
278
+            last_blob_ok: true,
279
+        })
280
+    }
281
+
282
+    /// Seek to an offset in bytes from the start of the stream.
283
+    ///
284
+    /// # Example
285
+    /// ```
286
+    /// use osmpbf::*;
287
+    ///
288
+    /// # fn foo() -> Result<()> {
289
+    /// let mut reader = BlobReader::from_path("tests/test.osm.pbf")?;
290
+    /// let first_blob = reader.next().unwrap()?;
291
+    /// let second_blob = reader.next().unwrap()?;
292
+    ///
293
+    /// reader.seek(first_blob.offset().unwrap())?;
294
+    ///
295
+    /// let first_blob_again = reader.next().unwrap()?;
296
+    /// assert_eq!(first_blob.offset(), first_blob_again.offset());
297
+    /// # Ok(())
298
+    /// # }
299
+    /// ```
300
+    pub fn seek(&mut self, pos: ByteOffset) -> Result<()> {
301
+        match self.reader.seek(SeekFrom::Start(pos.0)) {
302
+            Ok(offset) => {
303
+                self.offset = Some(ByteOffset(offset));
304
+                Ok(())
305
+            },
306
+            Err(e) => {
307
+                self.offset = None;
308
+                Err(e.into())
309
+            },
310
+        }
311
+    }
312
+
313
+    /// Seek to an offset in bytes. (See `std::io::Seek`)
314
+    pub fn seek_raw(&mut self, pos: SeekFrom) -> Result<u64> {
315
+        match self.reader.seek(pos) {
316
+            Ok(offset) => {
317
+                self.offset = Some(ByteOffset(offset));
318
+                Ok(offset)
319
+            },
320
+            Err(e) => {
321
+                self.offset = None;
322
+                Err(e.into())
323
+            },
324
+        }
215
     }
325
     }
216
 }
326
 }
217
 
327