A simple map viewer

tile_loader.rs 11KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. use coord::{TileCoord, View};
  2. use image::DynamicImage;
  3. use image;
  4. use reqwest::Client;
  5. use std::cmp::Ordering;
  6. use std::cmp;
  7. use std::collections::hash_set::HashSet;
  8. use std::fs::File;
  9. use std::io::Write;
  10. use std::path::{Path, PathBuf};
  11. use std::sync::{Arc, mpsc, Mutex};
  12. use std::sync::mpsc::TryRecvError;
  13. use std::thread;
  14. use tile::Tile;
  15. use tile_source::TileSource;
  16. //TODO remember failed loading attempts
  17. #[derive(Debug)]
  18. pub struct TileLoader {
  19. client: Option<Client>,
  20. join_handle: thread::JoinHandle<()>,
  21. request_tx: mpsc::Sender<LoaderMessage>,
  22. result_rx: mpsc::Receiver<(Tile, Option<DynamicImage>)>,
  23. pending: HashSet<Tile>,
  24. }
  25. impl TileLoader {
  26. pub fn new<F>(notice_func: F) -> Self
  27. where F: Fn(Tile) + Sync + Send + 'static,
  28. {
  29. let (request_tx, request_rx) = mpsc::channel();
  30. let (result_tx, result_rx) = mpsc::channel();
  31. TileLoader {
  32. client: None,
  33. join_handle: thread::spawn(move || Self::work(request_rx, result_tx, notice_func)),
  34. request_tx: request_tx,
  35. result_rx: result_rx,
  36. pending: HashSet::new(),
  37. }
  38. }
  39. fn work<F>(
  40. request_rx: mpsc::Receiver<LoaderMessage>,
  41. result_tx: mpsc::Sender<(Tile, Option<DynamicImage>)>,
  42. notice_func: F,
  43. )
  44. where F: Fn(Tile) + Sync + Send + 'static,
  45. {
  46. let mut queue: Vec<TileRequest> = vec![];
  47. let remote_queue: Arc<Mutex<Vec<TileRequest>>> = Arc::new(Mutex::new(vec![]));
  48. let mut view_opt: Option<View> = None;
  49. let arc_notice_func = Arc::new(notice_func);
  50. let (remote_request_tx, remote_request_rx) = mpsc::channel();
  51. {
  52. let arc_request_rx = Arc::new(Mutex::new(remote_request_rx));
  53. for id in 0..2 {
  54. let remote_queue = Arc::clone(&remote_queue);
  55. let arc_request_rx = Arc::clone(&arc_request_rx);
  56. let result_tx = result_tx.clone();
  57. let arc_notice_func = Arc::clone(&arc_notice_func);
  58. thread::spawn(move || Self::work_remote(id, &remote_queue, &arc_request_rx, &result_tx, &arc_notice_func));
  59. }
  60. }
  61. 'outer: while let Ok(message) = request_rx.recv() {
  62. let mut need_to_sort = true;
  63. match message {
  64. LoaderMessage::SetView(view) => {
  65. view_opt = Some(view);
  66. },
  67. LoaderMessage::GetTile(request) => {
  68. queue.push(request);
  69. }
  70. }
  71. loop {
  72. loop {
  73. let message = request_rx.try_recv();
  74. match message {
  75. Ok(LoaderMessage::SetView(view)) => {
  76. view_opt = Some(view);
  77. need_to_sort = true;
  78. },
  79. Ok(LoaderMessage::GetTile(request)) => {
  80. queue.push(request);
  81. need_to_sort = true;
  82. },
  83. Err(TryRecvError::Empty) => break,
  84. Err(TryRecvError::Disconnected) => break 'outer,
  85. }
  86. }
  87. if need_to_sort {
  88. if let Some(view) = view_opt {
  89. need_to_sort = false;
  90. queue.as_mut_slice().sort_by(|a, b| {
  91. compare_tiles(a.tile, b.tile, view)
  92. });
  93. if let Ok(mut remote_queue) = remote_queue.lock() {
  94. remote_queue.as_mut_slice().sort_by(|a, b| {
  95. compare_tiles(a.tile, b.tile, view)
  96. });
  97. }
  98. }
  99. }
  100. match queue.pop() {
  101. None => break,
  102. Some(request) => {
  103. match image::open(&request.path) {
  104. Ok(img) => {
  105. if let Err(_) = result_tx.send((request.tile, Some(img))) {
  106. break 'outer;
  107. }
  108. arc_notice_func(request.tile);
  109. continue;
  110. },
  111. Err(_) => {
  112. if let Ok(mut remote_queue) = remote_queue.lock() {
  113. //TODO restrict size of remote_queue
  114. remote_queue.push(request);
  115. if let Some(view) = view_opt {
  116. remote_queue.as_mut_slice().sort_by(|a, b| {
  117. compare_tiles(a.tile, b.tile, view)
  118. });
  119. }
  120. if remote_request_tx.send(RemoteLoaderMessage::PopQueue).is_err() {
  121. //TODO remote worker terminated
  122. }
  123. }
  124. },
  125. }
  126. },
  127. }
  128. }
  129. }
  130. }
  131. fn work_remote<F>(
  132. thread_id: u32,
  133. queue: &Arc<Mutex<Vec<TileRequest>>>,
  134. request_rx: &Arc<Mutex<mpsc::Receiver<RemoteLoaderMessage>>>,
  135. result_tx: &mpsc::Sender<(Tile, Option<DynamicImage>)>,
  136. notice_func: &Arc<F>,
  137. )
  138. where F: Fn(Tile) + Sync + Send + 'static,
  139. {
  140. let mut client_opt = None;
  141. loop {
  142. let message = request_rx.lock().ok().and_then(|r| r.recv().ok());
  143. match message {
  144. None | Some(RemoteLoaderMessage::Terminate) => break,
  145. Some(RemoteLoaderMessage::PopQueue) => {
  146. let ele: Option<TileRequest> = queue.lock().ok().and_then(|mut q| q.pop());
  147. if let Some(request) = ele {
  148. if client_opt.is_none() {
  149. client_opt = Client::builder().build().ok();
  150. }
  151. if let Some(Ok(mut response)) = client_opt.as_ref().map(|c| c.get(&request.url).send()) {
  152. let mut buf: Vec<u8> = vec![];
  153. if response.copy_to(&mut buf).is_ok() {
  154. if let Ok(img) = image::load_from_memory(&buf) {
  155. // successfully loaded tile
  156. if result_tx.send((request.tile, Some(img))).is_err() {
  157. break;
  158. }
  159. notice_func(request.tile);
  160. if request.write_to_file {
  161. //TODO do something on write errors
  162. let _ = Self::write_to_file(&request.path, &buf);
  163. }
  164. continue;
  165. }
  166. }
  167. }
  168. // failed not load tile
  169. if result_tx.send((request.tile, None)).is_err() {
  170. break;
  171. }
  172. }
  173. },
  174. }
  175. }
  176. }
  177. pub fn async_request(&mut self, tile_coord: TileCoord, source: &TileSource, write_to_file: bool) {
  178. if tile_coord.zoom > source.max_tile_zoom() {
  179. return;
  180. }
  181. let tile = Tile::new(tile_coord, source.id());
  182. if !self.pending.contains(&tile) {
  183. if self.request_tx.send(LoaderMessage::GetTile(
  184. TileRequest {
  185. tile: tile,
  186. url: source.remote_tile_url(tile_coord),
  187. path: source.local_tile_path(tile_coord),
  188. write_to_file: write_to_file,
  189. }
  190. )).is_ok()
  191. {
  192. self.pending.insert(tile);
  193. }
  194. }
  195. }
  196. pub fn async_result(&mut self) -> Option<(Tile, DynamicImage)> {
  197. match self.result_rx.try_recv() {
  198. Err(_) => None,
  199. Ok((tile, None)) => {
  200. self.pending.remove(&tile);
  201. None
  202. },
  203. Ok((tile, Some(img))) => {
  204. self.pending.remove(&tile);
  205. Some((tile, img))
  206. },
  207. }
  208. }
  209. pub fn get_sync(&mut self, tile: TileCoord, source: &TileSource, write_to_file: bool) -> Option<DynamicImage> {
  210. match image::open(source.local_tile_path(tile)) {
  211. Ok(img) => {
  212. Some(img)
  213. },
  214. Err(_) => {
  215. //TODO do not try to create a client every time when it failed before
  216. if self.client.is_none() {
  217. self.client = Client::builder().build().ok();
  218. }
  219. if let Some(ref client) = self.client {
  220. if let Ok(mut response) = client.get(&source.remote_tile_url(tile)).send() {
  221. let mut buf: Vec<u8> = vec![];
  222. if response.copy_to(&mut buf).is_ok() {
  223. if let Ok(img) = image::load_from_memory(&buf) {
  224. if write_to_file {
  225. let path = source.local_tile_path(tile);
  226. let _ = Self::write_to_file(path, &buf);
  227. }
  228. return Some(img);
  229. }
  230. }
  231. }
  232. }
  233. None
  234. },
  235. }
  236. }
  237. pub fn set_view_location(&mut self, view: View) {
  238. let _ = self.request_tx.send(LoaderMessage::SetView(view));
  239. }
  /// Writes `img_data` to `path`, creating missing parent directories first.
  ///
  /// An existing file is overwritten. Returns the first I/O error from
  /// creating the directories, creating the file, or writing the data.
  fn write_to_file<P: AsRef<Path>>(path: P, img_data: &[u8]) -> ::std::io::Result<()> {
      let target = path.as_ref();
      match target.parent() {
          Some(dir) => ::std::fs::create_dir_all(dir)?,
          None => {},
      }
      File::create(target).and_then(|mut file| file.write_all(img_data))
  }
  247. }
  248. #[derive(Debug)]
  249. struct TileRequest {
  250. pub tile: Tile,
  251. pub url: String,
  252. pub path: PathBuf,
  253. pub write_to_file: bool,
  254. }
  255. #[derive(Debug)]
  256. enum LoaderMessage {
  257. GetTile(TileRequest),
  258. SetView(View),
  259. }
  /// Messages sent to the download threads.
  #[derive(Debug)]
  enum RemoteLoaderMessage {
      /// Take one request from the shared queue and download it.
      PopQueue,
      /// Stop the download thread that receives this message.
      Terminate,
  }
  265. fn compare_tiles(a: Tile, b: Tile, view: View) -> Ordering {
  266. let source_a = view.source_id == a.source_id;
  267. let source_b = view.source_id == b.source_id;
  268. match (source_a, source_b) {
  269. (true, false) => Ordering::Greater,
  270. (false, true) => Ordering::Less,
  271. _ => {
  272. let zoom_diff_a = cmp::max(a.coord.zoom, view.zoom) - cmp::min(a.coord.zoom, view.zoom);
  273. let zoom_diff_b = cmp::max(b.coord.zoom, view.zoom) - cmp::min(b.coord.zoom, view.zoom);
  274. if zoom_diff_a < zoom_diff_b {
  275. Ordering::Greater
  276. } else if zoom_diff_a > zoom_diff_b {
  277. Ordering::Less
  278. } else {
  279. let map_a = a.coord.map_coord_center();
  280. let map_b = b.coord.map_coord_center();
  281. let center_diff_a = (view.center.x - map_a.x).hypot(view.center.y - map_a.y);
  282. let center_diff_b = (view.center.x - map_b.x).hypot(view.center.y - map_b.y);
  283. center_diff_b.partial_cmp(&center_diff_a).unwrap_or(Ordering::Equal)
  284. }
  285. },
  286. }
  287. }