浏览代码

Merge branch 'substack-non-buffering'

Johannes Hofmann 5 年前
父节点
当前提交
f1ebefa045
共有 3 个文件被更改,包括 13 次插入17 次删除
  1. 4
    4
      src/blob.rs
  2. 2
    2
      src/indexed.rs
  3. 7
    11
      src/reader.rs

+ 4
- 4
src/blob.rs 查看文件

152
 
152
 
153
 /// A reader for PBF files that allows iterating over `Blob`s.
153
 /// A reader for PBF files that allows iterating over `Blob`s.
154
 #[derive(Clone, Debug)]
154
 #[derive(Clone, Debug)]
155
-pub struct BlobReader<R: Read> {
155
+pub struct BlobReader<R: Read + Send> {
156
     reader: R,
156
     reader: R,
157
     /// Current reader offset in bytes from the start of the stream.
157
     /// Current reader offset in bytes from the start of the stream.
158
     offset: Option<ByteOffset>,
158
     offset: Option<ByteOffset>,
159
     last_blob_ok: bool,
159
     last_blob_ok: bool,
160
 }
160
 }
161
 
161
 
162
-impl<R: Read> BlobReader<R> {
162
+impl<R: Read + Send> BlobReader<R> {
163
     /// Creates a new `BlobReader`.
163
     /// Creates a new `BlobReader`.
164
     ///
164
     ///
165
     /// # Example
165
     /// # Example
258
     }
258
     }
259
 }
259
 }
260
 
260
 
261
-impl<R: Read> Iterator for BlobReader<R> {
261
+impl<R: Read + Send> Iterator for BlobReader<R> {
262
     type Item = Result<Blob>;
262
     type Item = Result<Blob>;
263
 
263
 
264
     fn next(&mut self) -> Option<Self::Item> {
264
     fn next(&mut self) -> Option<Self::Item> {
294
     }
294
     }
295
 }
295
 }
296
 
296
 
297
-impl<R: Read + Seek> BlobReader<R> {
297
+impl<R: Read + Seek + Send> BlobReader<R> {
298
     /// Creates a new `BlobReader` from the given reader that is seekable and will be initialized
298
     /// Creates a new `BlobReader` from the given reader that is seekable and will be initialized
299
     /// with a valid offset.
299
     /// with a valid offset.
300
     ///
300
     ///

+ 2
- 2
src/indexed.rs 查看文件

38
 /// Allows filtering elements and iterating over their dependencies.
38
 /// Allows filtering elements and iterating over their dependencies.
39
 /// It chooses an efficient method for navigating the PBF structure to achieve this in reasonable
39
 /// It chooses an efficient method for navigating the PBF structure to achieve this in reasonable
40
 /// time and with reasonable memory.
40
 /// time and with reasonable memory.
41
-pub struct IndexedReader<R: Read + Seek> {
41
+pub struct IndexedReader<R: Read + Seek + Send> {
42
     reader: BlobReader<R>,
42
     reader: BlobReader<R>,
43
     index: Vec<BlobInfo>,
43
     index: Vec<BlobInfo>,
44
 }
44
 }
45
 
45
 
46
-impl<R: Read + Seek> IndexedReader<R> {
46
+impl<R: Read + Seek + Send> IndexedReader<R> {
47
     /// Creates a new `IndexedReader`.
47
     /// Creates a new `IndexedReader`.
48
     ///
48
     ///
49
     /// # Example
49
     /// # Example

+ 7
- 11
src/reader.rs 查看文件

10
 
10
 
11
 /// A reader for PBF files that gives access to the stored elements: nodes, ways and relations.
11
 /// A reader for PBF files that gives access to the stored elements: nodes, ways and relations.
12
 #[derive(Clone, Debug)]
12
 #[derive(Clone, Debug)]
13
-pub struct ElementReader<R: Read> {
13
+pub struct ElementReader<R: Read + Send> {
14
     blob_iter: BlobReader<R>,
14
     blob_iter: BlobReader<R>,
15
 }
15
 }
16
 
16
 
17
-impl<R: Read> ElementReader<R> {
17
+impl<R: Read + Send> ElementReader<R> {
18
     /// Creates a new `ElementReader`.
18
     /// Creates a new `ElementReader`.
19
     ///
19
     ///
20
     /// # Example
20
     /// # Example
68
     where
68
     where
69
         F: for<'a> FnMut(Element<'a>),
69
         F: for<'a> FnMut(Element<'a>),
70
     {
70
     {
71
-        let blobs = self.blob_iter.collect::<Result<Vec<_>>>()?;
72
-
73
         //TODO do something useful with header blocks
71
         //TODO do something useful with header blocks
74
-        for blob in &blobs {
75
-            match blob.decode() {
72
+        for blob in self.blob_iter {
73
+            match blob?.decode() {
76
                 Ok(BlobDecode::OsmHeader(_)) | Ok(BlobDecode::Unknown(_)) => {}
74
                 Ok(BlobDecode::OsmHeader(_)) | Ok(BlobDecode::Unknown(_)) => {}
77
                 Ok(BlobDecode::OsmData(block)) => {
75
                 Ok(BlobDecode::OsmData(block)) => {
78
                     block.for_each_element(&mut f);
76
                     block.for_each_element(&mut f);
125
         ID: Fn() -> T + Sync + Send,
123
         ID: Fn() -> T + Sync + Send,
126
         T: Send,
124
         T: Send,
127
     {
125
     {
128
-        let blobs = self.blob_iter.collect::<Result<Vec<_>>>()?;
129
-
130
-        blobs
131
-            .into_par_iter()
132
-            .map(|blob| match blob.decode() {
126
+        self.blob_iter
127
+            .par_bridge()
128
+            .map(|blob| match blob?.decode() {
133
                 Ok(BlobDecode::OsmHeader(_)) | Ok(BlobDecode::Unknown(_)) => Ok(identity()),
129
                 Ok(BlobDecode::OsmHeader(_)) | Ok(BlobDecode::Unknown(_)) => Ok(identity()),
134
                 Ok(BlobDecode::OsmData(block)) => Ok(block
130
                 Ok(BlobDecode::OsmData(block)) => Ok(block
135
                     .elements()
131
                     .elements()