From d86540ab1eebc48e2ea32a654b819234265c7780 Mon Sep 17 00:00:00 2001 From: OMGeeky Date: Wed, 5 Apr 2023 18:56:22 +0200 Subject: [PATCH] logs & multiple awaitable tasks for downloading (test) --- Cargo.toml | 3 +- src/lib.rs | 112 ++++++++++++++++++++++++++++++++++------------------ src/main.rs | 12 +++++- 3 files changed, 86 insertions(+), 41 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e3589dc..56cbb77 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "twitch_data" -version = "0.1.4" +version = "0.1.5" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -25,3 +25,4 @@ indicatif = "0.17" futures = "0.3" async-recursion = "1.0.0" log = "0.4.17" +simplelog = "0.12.1" \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 4a8ea5e..7f75f1d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -446,24 +446,19 @@ impl<'a> TwitchClient<'a> { //combine parts - let mut files1: Vec = vec![]; - for file in files { - let v = file - .ok_or(DownloadError::new("Error while downloading all parts"))? - .canonicalize()?; - // println!("file: {:?}", v); - files1.push(v); + for file in files.iter_mut() { + let file = file; + if file.is_some() { + let x = file.as_mut().unwrap(); + let y = x.canonicalize()?; + *x = y; + } else { + return Err(DownloadError::new("Error while downloading all parts").into()); + } } - let mut files = files1; - // - // let mut files: Vec = files - // .iter_mut() - // .map(|f| { - // let x = f.as_mut()?.clone(); - // x - // }) - // .collect(); + let mut files: Vec = files.into_iter().map(|f| f.unwrap()).collect(); + files.sort_by_key(|f| { let number = f .file_name() @@ -542,34 +537,69 @@ impl<'a> TwitchClient<'a> { url: &String, folder_path: &PathBuf, ) -> Result>> { + let mut amount_of_threads = 10; + trace!("downloading all parts of video: {}", url); let base_url = get_base_url(&url); - println!("getting parts"); + info!("getting parts"); let (age, parts) = self.get_parts(&url).await?; let try_unmute = age < 24; - println!("...Done"); + info!("getting parts ...Done"); + + let amount_of_parts = parts.len(); + info!("part count: {}", amount_of_parts); + if amount_of_parts < 1 { + return Err(DownloadError::new("No parts found").into()); + } + //download parts std::fs::create_dir_all(&folder_path)?; info!("downloading parts"); - // let runtime: tokio::runtime::Runtime = Builder::new_multi_thread() - // .worker_threads(4) - // .enable_all() - // .thread_name("downloader-worker-thread") - // .build() - // .unwrap(); - info!("part count: {}", parts.len()); let mut files = vec![]; - for part in parts { - // println!("downloading part: {} : {}", part.0, part.1); - let downloader = - download_part(part, base_url.clone(), folder_path.clone(), try_unmute).await; - files.push(downloader); + + if amount_of_threads < 1 { + amount_of_threads = 1 + } else if amount_of_threads > amount_of_parts { + amount_of_threads = amount_of_parts; } + + let iterations_needed = amount_of_parts / amount_of_threads; + + let mut parts_iterator = parts.into_iter(); + for _ in 0..iterations_needed { + let mut downloader_futures = vec![]; + for _ in 0..amount_of_threads { + if let Some(part) = parts_iterator.next() { + let folder_path = folder_path.clone(); + let url = base_url.clone(); + let downloader = tokio::spawn(async move { + download_part(part, url, folder_path, try_unmute).await + }); + + downloader_futures.push(downloader); + } + } + // let mut result: Vec> = tokio::join!(downloader_futures); + let result: Vec, tokio::task::JoinError>> = + join_all(downloader_futures).await; + for downloader in result.into_iter() { + let downloader = downloader?; + files.push(downloader); + } + } + // + // + // for part in parts { + // // info!("downloading part: {} : {}", part.0, part.1); + // let downloader = + // download_part(part, base_url.clone(), folder_path.clone(), try_unmute).await; + // files.push(downloader); + // } // TODO: make this work in multiple threads (maybe about 10?) // let mut downloaders = vec![]; // for part in parts { - // // println!("downloading part: {} : {}", part.0, part.1); + // // info!("downloading part: {} : {}", part.0, part.1); // // let downloader = Self::download_part(part, &base_url, &folder_path, try_unmute); // // TODO: make this not take up all the resources of the server/have a specific amount of threads // let downloader = tokio::spawn!(download_part( @@ -589,7 +619,7 @@ impl<'a> TwitchClient<'a> { // files1.push(v); // } // let files = files1; - info!("...Done"); + info!("downloaded all parts of video: "); Ok(files) } @@ -599,7 +629,7 @@ impl<'a> TwitchClient<'a> { let response = check_backoff_twitch_get_with_client(url, &self.reqwest_client).await?; let video_chunks = response.text().await?; trace!("got parts: {}", video_chunks.len()); - // println!("video_chunks: \n\n{}\n", video_chunks); + // info!("video_chunks: \n\n{}\n", video_chunks); let lines = video_chunks.lines().collect::>(); let mut age = 25; @@ -618,7 +648,7 @@ impl<'a> TwitchClient<'a> { let mut parts = HashMap::new(); for i in 0..lines.len() { - // println!("line: {}", i); + // info!("line: {}", i); let l0 = lines[i]; if !l0.contains("#EXTINF") { continue; @@ -628,7 +658,7 @@ impl<'a> TwitchClient<'a> { if !l1.contains("#EXT-X-BYTERANGE") { let v = l0[8..].strip_suffix(",").unwrap().parse::().unwrap(); parts.insert(l1.to_string(), v); - // println!("no byterange found: {i}"); + // info!("no byterange found: {i}"); continue; } @@ -639,7 +669,7 @@ impl<'a> TwitchClient<'a> { let v = l0[8..].strip_suffix(",").unwrap().parse::().unwrap(); parts.insert(l2.to_string(), v); } - println!( + info!( "i: {}; videoChunks[i + 2]: {}; videoChunks[i]: {}", i, l2, l0 ) @@ -675,7 +705,7 @@ async fn download_part( async fn try_download(main_path: &PathBuf, part: &String, part_url: &String) -> Option { trace!("trying to download part: {}", part_url); - // println!("Downloading part: {}", part_url); + // info!("Downloading part: {}", part_url); let path = Path::join(main_path, part); let mut res = check_backoff_twitch_get(part_url).await.ok()?; @@ -717,11 +747,12 @@ fn get_base_url(url: &str) -> String { fn convert_twitch_timestamp(time: Timestamp) -> DateTime { let time1: String = time.take(); + trace!("convert_twitch_timestamp: {}", time1); convert_twitch_time(&time1) } fn convert_twitch_duration(duration: &str) -> chrono::Duration { - // println!("duration: {}", duration); + trace!("convert_twitch_duration: {}", duration); // todo!("Parse duration that comes in like '2h49m47s'"); let duration = duration @@ -779,6 +810,11 @@ fn convert_twitch_time(time: &str) -> DateTime { } fn convert_twitch_time_info(res: String, fmt: &str) -> DateTime { + trace!( + "convert_twitch_time: time: '{}' with format: '{}'", + res, + fmt + ); let mut res = res; if res.ends_with("Z") { res = format!("{}+00:00", res.strip_suffix("Z").unwrap()); diff --git a/src/main.rs b/src/main.rs index 281cf0b..afcf626 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +use log::info; use std::error::Error; use std::path::Path; @@ -7,6 +8,13 @@ use twitch_data::get_client; #[tokio::main] async fn main() -> Result<(), Box> { + simplelog::TermLogger::init( + simplelog::LevelFilter::Info, + simplelog::Config::default(), + simplelog::TerminalMode::Mixed, + simplelog::ColorChoice::Auto, + ) + .expect("Failed to initialize logger"); // async fn main() -> Result<(), Box> { println!("Starting!"); sample().await?; @@ -20,7 +28,7 @@ async fn main() -> Result<(), Box> { println!("Done! 4"); // get_video_playlist("1677206253").await?; println!("Done! 5"); - download_video("1677206253").await?; + download_video("1768835851").await?; println!("Done! 6"); println!("Done! 7"); println!("\n\nDone!"); @@ -38,6 +46,6 @@ async fn download_video(video_id: &str) -> Result<(), Box> { async fn sample() -> Result<(), Box> { let client = get_client().await?; let title = client.get_channel_title_from_login("bananabrea").await?; - println!("Title: {}", title); + info!("Title: {}", title); Ok(()) }