use coord::{TileCoord, View}; use image::DynamicImage; use image; use reqwest::Client; use std::cmp::Ordering; use std::cmp; use std::collections::hash_set::HashSet; use std::fs::File; use std::io::Write; use std::path::{Path, PathBuf}; use std::sync::mpsc::TryRecvError; use std::sync::{Arc, mpsc, Mutex}; use std::thread; use tile::Tile; use tile_source::TileSource; //TODO remember failed loading attempts #[derive(Debug)] pub struct TileLoader { client: Option, join_handle: thread::JoinHandle<()>, request_tx: mpsc::Sender, result_rx: mpsc::Receiver<(Tile, Option)>, pending: HashSet, use_network: bool, } impl TileLoader { pub fn new(notice_func: F, use_network: bool) -> Self where F: Fn(Tile) + Sync + Send + 'static, { let (request_tx, request_rx) = mpsc::channel(); let (result_tx, result_rx) = mpsc::channel(); TileLoader { client: None, join_handle: thread::spawn(move || Self::work(&request_rx, &result_tx, notice_func, use_network)), request_tx, result_rx, pending: HashSet::new(), use_network, } } fn work( request_rx: &mpsc::Receiver, result_tx: &mpsc::Sender<(Tile, Option)>, notice_func: F, use_network: bool, ) where F: Fn(Tile) + Sync + Send + 'static, { let mut queue: Vec = vec![]; let remote_queue: Arc>> = Arc::new(Mutex::new(vec![])); let mut view_opt: Option = None; let arc_notice_func = Arc::new(notice_func); let (remote_request_tx, remote_request_rx) = mpsc::channel(); { let arc_request_rx = Arc::new(Mutex::new(remote_request_rx)); for id in 0..2 { let remote_queue = Arc::clone(&remote_queue); let arc_request_rx = Arc::clone(&arc_request_rx); let result_tx = result_tx.clone(); let arc_notice_func = Arc::clone(&arc_notice_func); thread::spawn(move || Self::work_remote(id, &remote_queue, &arc_request_rx, &result_tx, &arc_notice_func)); } } 'outer: while let Ok(message) = request_rx.recv() { let mut need_to_sort = true; match message { LoaderMessage::SetView(view) => { view_opt = Some(view); }, LoaderMessage::GetTile(request) => { queue.push(request); } } loop { loop { let message = request_rx.try_recv(); match message { Ok(LoaderMessage::SetView(view)) => { view_opt = Some(view); need_to_sort = true; }, Ok(LoaderMessage::GetTile(request)) => { queue.push(request); need_to_sort = true; }, Err(TryRecvError::Empty) => break, Err(TryRecvError::Disconnected) => break 'outer, } } if need_to_sort { if let Some(view) = view_opt { need_to_sort = false; queue.as_mut_slice().sort_by(|a, b| { compare_tiles(a.tile, b.tile, view) }); if let Ok(mut remote_queue) = remote_queue.lock() { remote_queue.as_mut_slice().sort_by(|a, b| { compare_tiles(a.tile, b.tile, view) }); } } } match queue.pop() { None => break, Some(request) => { match image::open(&request.path) { Ok(img) => { if result_tx.send((request.tile, Some(img))).is_err() { break 'outer; } arc_notice_func(request.tile); continue; }, Err(_) => { if use_network { if let Ok(mut remote_queue) = remote_queue.lock() { //TODO restrict size of remote_queue remote_queue.push(request); if let Some(view) = view_opt { remote_queue.as_mut_slice().sort_by(|a, b| { compare_tiles(a.tile, b.tile, view) }); } if let Err(e) = remote_request_tx.send(RemoteLoaderMessage::PopQueue) { //TODO what now? restart worker? error!("remote worker terminated, {}", e); } } } else if result_tx.send((request.tile, None)).is_err() { break 'outer; } }, } }, } } } } fn work_remote( thread_id: u32, queue: &Arc>>, request_rx: &Arc>>, result_tx: &mpsc::Sender<(Tile, Option)>, notice_func: &Arc, ) where F: Fn(Tile) + Sync + Send + 'static, { let mut client_opt = None; loop { let message = request_rx.lock().ok().and_then(|r| r.recv().ok()); match message { None => break, Some(RemoteLoaderMessage::PopQueue) => { let ele: Option = queue.lock().ok().and_then(|mut q| q.pop()); if let Some(request) = ele { if client_opt.is_none() { client_opt = Client::builder().build().ok(); } info!("thread {}, download {:?}", thread_id, request.url); if let Some(Ok(mut response)) = client_opt.as_ref().map(|c| c.get(&request.url).send()) { let mut buf: Vec = vec![]; if response.copy_to(&mut buf).is_ok() { if let Ok(img) = image::load_from_memory(&buf) { // successfully loaded tile if result_tx.send((request.tile, Some(img))).is_err() { break; } notice_func(request.tile); if request.write_to_file { if let Err(e) = Self::write_to_file(&request.path, &buf) { warn!("could not write file {}, {}", request.path.display(), e); } } continue; } } } // failed not load tile info!("thread {}, fail {:?}", thread_id, request.url); if result_tx.send((request.tile, None)).is_err() { break; } } }, } } } pub fn async_request(&mut self, tile_coord: TileCoord, source: &TileSource, write_to_file: bool) { if tile_coord.zoom > source.max_tile_zoom() || tile_coord.zoom < source.min_tile_zoom() { return; } let tile = Tile::new(tile_coord, source.id()); if !self.pending.contains(&tile) { if let Some(url) = source.remote_tile_url(tile_coord) { if self.request_tx.send(LoaderMessage::GetTile( TileRequest { tile, url, path: source.local_tile_path(tile_coord), write_to_file, } )).is_ok() { self.pending.insert(tile); } } } } pub fn async_result(&mut self) -> Option<(Tile, DynamicImage)> { match self.result_rx.try_recv() { Err(_) => None, Ok((tile, None)) => { self.pending.remove(&tile); debug!("async_result none, pending.len: {}, {:?}", self.pending.len(), tile); None }, Ok((tile, Some(img))) => { self.pending.remove(&tile); debug!("async_result some, pending.len: {}, {:?}", self.pending.len(), tile); Some((tile, img)) }, } } pub fn get_sync(&mut self, tile: TileCoord, source: &TileSource, write_to_file: bool) -> Option { if tile.zoom > source.max_tile_zoom() || tile.zoom < source.min_tile_zoom() { return None; } match image::open(source.local_tile_path(tile)) { Ok(img) => { debug!("sync ok from path {:?}", tile); Some(img) }, Err(_) => { if self.use_network { //TODO do not try to create a client every time when it failed before if self.client.is_none() { self.client = Client::builder().build().ok(); } if let (Some(client), Some(url)) = (self.client.as_ref(), source.remote_tile_url(tile)) { if let Ok(mut response) = client.get(&url).send() { let mut buf: Vec = vec![]; if response.copy_to(&mut buf).is_ok() { if let Ok(img) = image::load_from_memory(&buf) { if write_to_file { let path = source.local_tile_path(tile); if let Err(e) = Self::write_to_file(&path, &buf) { warn!("could not write file {}, {}", &path.display(), e); } } debug!("sync ok from network {:?}", tile); return Some(img); } } } } debug!("sync fail from network {:?}", tile); None } else { debug!("sync fail from path {:?}", tile); None } }, } } pub fn set_view_location(&mut self, view: View) { let _ = self.request_tx.send(LoaderMessage::SetView(view)); } fn write_to_file>(path: P, img_data: &[u8]) -> ::std::io::Result<()> { if let Some(dir) = path.as_ref().parent() { ::std::fs::create_dir_all(dir)?; } let mut file = File::create(path)?; file.write_all(img_data) } } #[derive(Debug)] struct TileRequest { pub tile: Tile, pub url: String, pub path: PathBuf, pub write_to_file: bool, } #[derive(Debug)] enum LoaderMessage { GetTile(TileRequest), SetView(View), } #[derive(Debug)] enum RemoteLoaderMessage { PopQueue, } fn compare_tiles(a: Tile, b: Tile, view: View) -> Ordering { let source_a = view.source_id == a.source_id; let source_b = view.source_id == b.source_id; match (source_a, source_b) { (true, false) => Ordering::Greater, (false, true) => Ordering::Less, _ => { let zoom_diff_a = cmp::max(a.coord.zoom, view.zoom) - cmp::min(a.coord.zoom, view.zoom); let zoom_diff_b = cmp::max(b.coord.zoom, view.zoom) - cmp::min(b.coord.zoom, view.zoom); if zoom_diff_a < zoom_diff_b { Ordering::Greater } else if zoom_diff_a > zoom_diff_b { Ordering::Less } else { let map_a = a.coord.map_coord_center(); let map_b = b.coord.map_coord_center(); let diff_xa = (view.center.x - map_a.x).abs(); let diff_xa = if diff_xa > 0.5 { 1.0 - diff_xa } else { diff_xa }; let diff_xb = (view.center.x - map_b.x).abs(); let diff_xb = if diff_xb > 0.5 { 1.0 - diff_xb } else { diff_xb }; let diff_ya = view.center.y - map_a.y; let diff_yb = view.center.y - map_b.y; let center_diff_a = (diff_xa * diff_xa) + (diff_ya * diff_ya); let center_diff_b = (diff_xb * diff_xb) + (diff_yb * diff_yb); center_diff_b.partial_cmp(¢er_diff_a).unwrap_or(Ordering::Equal) } }, } }