Przeglądaj źródła

tile_loader: Download tiles in separate threads

Johannes Hofmann 8 lat temu
rodzic
commit
7f3c28ca49
1 zmienionych plików z 138 dodań i 59 usunięć
  1. 138
    59
      src/tile_loader.rs

+ 138
- 59
src/tile_loader.rs Wyświetl plik

@@ -8,8 +8,8 @@ use std::collections::hash_set::HashSet;
8 8
 use std::fs::File;
9 9
 use std::io::Write;
10 10
 use std::path::{Path, PathBuf};
11
+use std::sync::{Arc, mpsc, Mutex};
11 12
 use std::sync::mpsc::TryRecvError;
12
-use std::sync::mpsc;
13 13
 use std::thread;
14 14
 use tile::Tile;
15 15
 use tile_source::TileSource;
@@ -49,85 +49,154 @@ impl TileLoader {
49 49
     )
50 50
         where F: Fn(Tile) + Sync + Send + 'static,
51 51
     {
52
-        let mut client_opt = None;
53
-        let mut queue: Vec<(Tile, String, PathBuf, bool)> = vec![];
52
+        let mut queue: Vec<TileRequest> = vec![];
53
+        let remote_queue: Arc<Mutex<Vec<TileRequest>>> = Arc::new(Mutex::new(vec![]));
54
+        let mut view_opt: Option<View> = None;
55
+
56
+        let arc_notice_func = Arc::new(notice_func);
57
+
58
+        let (remote_request_tx, remote_request_rx) = mpsc::channel();
59
+        {
60
+            let arc_request_rx = Arc::new(Mutex::new(remote_request_rx));
61
+            for id in 0..2 {
62
+                let remote_queue = Arc::clone(&remote_queue);
63
+                let arc_request_rx = Arc::clone(&arc_request_rx);
64
+                let result_tx = result_tx.clone();
65
+                let arc_notice_func = Arc::clone(&arc_notice_func);
66
+                thread::spawn(move || Self::work_remote(id, remote_queue, arc_request_rx, result_tx, arc_notice_func));
67
+            }
68
+        }
54 69
 
55 70
         'outer: while let Ok(message) = request_rx.recv() {
56
-            let mut view_opt: Option<View> = None;
71
+            let mut need_to_sort = true;
57 72
 
58 73
             match message {
59
-                LoaderMessage::SetViewLocation{view} => {
74
+                LoaderMessage::SetView(view) => {
60 75
                     view_opt = Some(view);
61 76
                 },
62
-                LoaderMessage::GetTile{tile, url, path, write_to_file} => {
63
-                    queue.push((tile, url, path, write_to_file));
77
+                LoaderMessage::GetTile(request) => {
78
+                    queue.push(request);
64 79
                 }
65 80
             }
66 81
 
67 82
             loop {
68 83
                 loop {
69
-                    match request_rx.try_recv() {
70
-                        Ok(LoaderMessage::SetViewLocation{view}) => {
84
+                    let message = request_rx.try_recv();
85
+
86
+                    match message {
87
+                        Ok(LoaderMessage::SetView(view)) => {
71 88
                             view_opt = Some(view);
89
+                            need_to_sort = true;
72 90
                         },
73
-                        Ok(LoaderMessage::GetTile{tile, url, path, write_to_file}) => {
74
-                            queue.push((tile, url, path, write_to_file));
91
+                        Ok(LoaderMessage::GetTile(request)) => {
92
+                            queue.push(request);
93
+                            need_to_sort = true;
75 94
                         },
76 95
                         Err(TryRecvError::Empty) => break,
77 96
                         Err(TryRecvError::Disconnected) => break 'outer,
78 97
                     }
79 98
                 }
80 99
 
81
-                if let Some(view) = view_opt {
82
-                    //TODO sort queue
83
-                    queue.as_mut_slice().sort_by(|&(tile_a, _, _, _), &(tile_b, _, _, _)| {
84
-                        compare_tiles(tile_a, tile_b, view)
85
-                    });
100
+                if need_to_sort {
101
+                    if let Some(view) = view_opt {
102
+                        need_to_sort = false;
103
+
104
+                        queue.as_mut_slice().sort_by(|a, b| {
105
+                            compare_tiles(a.tile, b.tile, view)
106
+                        });
107
+
108
+                        if let Ok(mut remote_queue) = remote_queue.lock() {
109
+                            remote_queue.as_mut_slice().sort_by(|a, b| {
110
+                                compare_tiles(a.tile, b.tile, view)
111
+                            });
112
+                        }
113
+                    }
86 114
                 }
87 115
 
88 116
                 match queue.pop() {
89 117
                     None => break,
90
-                    Some((tile, url, path, write_to_file)) => {
91
-                        println!("queue {:?} {:?}", tile, path);
92
-                        match image::open(&path) {
118
+                    Some(request) => {
119
+                        match image::open(&request.path) {
93 120
                             Ok(img) => {
94
-                                result_tx.send((tile, Some(img))).unwrap();
95
-                                notice_func(tile);
121
+                                if let Err(_) = result_tx.send((request.tile, Some(img))) {
122
+                                    break 'outer;
123
+                                }
124
+                                arc_notice_func(request.tile);
96 125
                                 continue;
97 126
                             },
98 127
                             Err(_) => {
99
-                                //TODO do not try to create a client every time when it failed before
100
-                                if client_opt.is_none() {
101
-                                    client_opt = Client::builder().build().ok();
102
-                                }
103
-
104
-                                if let Some(ref client) = client_opt {
105
-                                    println!("use client {:?}", tile);
106
-                                    if let Ok(mut response) = client.get(&url).send() {
107
-                                        let mut buf: Vec<u8> = vec![];
108
-                                        response.copy_to(&mut buf).unwrap();
109
-                                        if let Ok(img) = image::load_from_memory(&buf) {
110
-                                            result_tx.send((tile, Some(img))).unwrap();
111
-                                            notice_func(tile);
112
-
113
-                                            if write_to_file {
114
-                                                //TODO do something on write errors
115
-                                                let _ = Self::write_to_file(&path, &buf);
116
-                                            }
117
-
118
-                                            continue;
119
-                                        }
128
+                                if let Ok(mut remote_queue) = remote_queue.lock() {
129
+                                    //TODO restrict size of remote_queue
130
+                                    remote_queue.push(request);
131
+                                    if let Some(view) = view_opt {
132
+                                        remote_queue.as_mut_slice().sort_by(|a, b| {
133
+                                            compare_tiles(a.tile, b.tile, view)
134
+                                        });
135
+                                    }
136
+                                    if remote_request_tx.send(RemoteLoaderMessage::PopQueue).is_err() {
137
+                                        //TODO remote worker terminated
120 138
                                     }
121 139
                                 }
122 140
                             },
123 141
                         }
124
-                        result_tx.send((tile, None)).unwrap();
125 142
                     },
126 143
                 }
127 144
             }
128 145
         }
129 146
     }
130 147
 
148
+    fn work_remote<F>(
149
+        thread_id: u32,
150
+        queue: Arc<Mutex<Vec<TileRequest>>>,
151
+        request_rx: Arc<Mutex<mpsc::Receiver<RemoteLoaderMessage>>>,
152
+        result_tx: mpsc::Sender<(Tile, Option<DynamicImage>)>,
153
+        notice_func: Arc<F>,
154
+    )
155
+        where F: Fn(Tile) + Sync + Send + 'static,
156
+    {
157
+        let mut client_opt = None;
158
+
159
+        loop {
160
+            let message = request_rx.lock().ok().and_then(|r| r.recv().ok());
161
+            match message {
162
+                None => break,
163
+                Some(RemoteLoaderMessage::Terminate) => break,
164
+                Some(RemoteLoaderMessage::PopQueue) => {
165
+                    let ele: Option<TileRequest> = queue.lock().ok().and_then(|mut q| q.pop());
166
+
167
+                    if let Some(request) = ele {
168
+                        println!("thread {}: queue {:?} {:?}", thread_id, request.tile, request.path);
169
+                        if client_opt.is_none() {
170
+                            client_opt = Client::builder().build().ok();
171
+                        }
172
+
173
+                        if let Some(Ok(mut response)) = client_opt.as_ref().map(|c| c.get(&request.url).send()) {
174
+                            let mut buf: Vec<u8> = vec![];
175
+                            response.copy_to(&mut buf).unwrap();
176
+                            if let Ok(img) = image::load_from_memory(&buf) {
177
+                                if result_tx.send((request.tile, Some(img))).is_err() {
178
+                                    break;
179
+                                }
180
+
181
+                                notice_func(request.tile);
182
+
183
+                                if request.write_to_file {
184
+                                    //TODO do something on write errors
185
+                                    let _ = Self::write_to_file(&request.path, &buf);
186
+                                }
187
+
188
+                                continue;
189
+                            }
190
+                        }
191
+                        if result_tx.send((request.tile, None)).is_err() {
192
+                            break;
193
+                        }
194
+                    }
195
+                },
196
+            }
197
+        }
198
+    }
199
+
131 200
     pub fn async_request(&mut self, tile_coord: TileCoord, source: &TileSource, write_to_file: bool) {
132 201
         if tile_coord.zoom > source.max_tile_zoom() {
133 202
             return;
@@ -137,12 +206,14 @@ impl TileLoader {
137 206
 
138 207
         if !self.pending.contains(&tile) {
139 208
             self.pending.insert(tile);
140
-            self.request_tx.send(LoaderMessage::GetTile{
141
-                tile: tile,
142
-                url: source.remote_tile_url(tile_coord),
143
-                path: source.local_tile_path(tile_coord),
144
-                write_to_file: write_to_file,
145
-            }).unwrap();
209
+            self.request_tx.send(LoaderMessage::GetTile(
210
+                TileRequest {
211
+                    tile: tile,
212
+                    url: source.remote_tile_url(tile_coord),
213
+                    path: source.local_tile_path(tile_coord),
214
+                    write_to_file: write_to_file,
215
+                }
216
+            )).unwrap();
146 217
         }
147 218
     }
148 219
 
@@ -172,7 +243,6 @@ impl TileLoader {
172 243
                 }
173 244
 
174 245
                 if let Some(ref client) = self.client {
175
-                    println!("use client {:?}", tile);
176 246
                     if let Ok(mut response) = client.get(&source.remote_tile_url(tile)).send() {
177 247
                         let mut buf: Vec<u8> = vec![];
178 248
                         response.copy_to(&mut buf).unwrap();
@@ -196,28 +266,37 @@ impl TileLoader {
196 266
     }
197 267
 
198 268
     pub fn set_view_location(&mut self, view: View) {
199
-        self.request_tx.send(LoaderMessage::SetViewLocation{
200
-            view: view,
201
-        }).unwrap();
269
+        self.request_tx.send(LoaderMessage::SetView(view)).unwrap();
202 270
     }
203 271
 
204 272
     fn write_to_file<P: AsRef<Path>>(path: P, img_data: &[u8]) -> ::std::io::Result<()> {
205
-
206 273
         if let Some(dir) = path.as_ref().parent() {
207 274
             ::std::fs::create_dir_all(dir)?;
208 275
         }
209 276
 
210
-        //TODO remove
211
-        println!("write file {:?}", path.as_ref());
212
-
213 277
         let mut file = File::create(path)?;
214 278
         file.write_all(img_data)
215 279
     }
216 280
 }
217 281
 
282
+#[derive(Debug)]
283
+struct TileRequest {
284
+    pub tile: Tile,
285
+    pub url: String,
286
+    pub path: PathBuf,
287
+    pub write_to_file: bool,
288
+}
289
+
290
+#[derive(Debug)]
218 291
 enum LoaderMessage {
219
-    GetTile{tile: Tile, url: String, path: PathBuf, write_to_file: bool},
220
-    SetViewLocation{view: View},
292
+    GetTile(TileRequest),
293
+    SetView(View),
294
+}
295
+
296
+#[derive(Debug)]
297
+enum RemoteLoaderMessage {
298
+    PopQueue,
299
+    Terminate,
221 300
 }
222 301
 
223 302
 fn compare_tiles(a: Tile, b: Tile, view: View) -> Ordering {