diff --git a/Cargo.toml b/Cargo.toml index cbd22a4..5bcbcff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "twitch_data" -version = "0.2.1" +version = "0.2.2" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/src/lib.rs b/src/lib.rs index d3354d9..aba7df4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,7 +12,7 @@ use exponential_backoff::twitch::{ check_backoff_twitch_with_client, }; use futures::future::join_all; -use crate::prelude::*; +use futures::StreamExt; use reqwest; use serde::{Deserialize, Serialize}; use tokio::fs::File; @@ -25,6 +25,9 @@ use twitch_api::helix::videos::Video as TwitchVideo; use twitch_api::types::{Timestamp, VideoPrivacy}; use twitch_oauth2::{ClientId, ClientSecret}; pub use twitch_types::{UserId, VideoId}; + +use crate::prelude::*; + pub mod prelude; //region DownloadError #[derive(Debug, Clone)] @@ -564,40 +567,20 @@ impl<'a> TwitchClient<'a> { info!("downloading parts"); - let mut files = vec![]; - if amount_of_threads < 1 { amount_of_threads = 1 } else if amount_of_threads > amount_of_parts { amount_of_threads = amount_of_parts; } - let iterations_needed = amount_of_parts / amount_of_threads; - - let mut parts_iterator = parts.into_iter(); - for _ in 0..iterations_needed { - let mut downloader_futures = vec![]; - for _ in 0..amount_of_threads { - if let Some(part) = parts_iterator.next() { - let folder_path = folder_path.clone(); - let url = base_url.clone(); - let downloader = tokio::spawn(async move { - download_part(part, url, folder_path, try_unmute).await - }); - - downloader_futures.push(downloader); - } - } - // let mut result: Vec> = tokio::join!(downloader_futures); - let result: Vec, tokio::task::JoinError>> = - join_all(downloader_futures).await; - - for downloader in result.into_iter() { - let downloader = downloader?; - files.push(downloader); - } - } - + let files = futures::stream::iter(parts.into_iter().map(|part| { + let folder_path = folder_path.clone(); + let url = base_url.clone(); + download_part(part, url, folder_path, try_unmute) + })) + .buffer_unordered(amount_of_threads as usize) + .collect::>>(); + let files = files.await; info!("downloaded all parts of the video"); Ok(files) }