add progress indicator for downloading video parts

This commit is contained in:
OMGeeky
2023-07-15 13:48:28 +02:00
parent 720cd60a00
commit c54b0f50d9
2 changed files with 66 additions and 16 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "twitch_data"
version = "0.2.4"
version = "0.2.5"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

View File

@@ -1,6 +1,13 @@
#[allow(unused, dead_code)]
//^^ hides some warnings while developing TODO: remove at release
use crate::prelude::*;
use std::{
collections::HashMap,
error::Error as StdError,
fmt::Debug,
path::{Path, PathBuf},
result::Result as StdResult,
sync::atomic::{AtomicUsize, Ordering},
sync::Arc,
};
use chrono::{DateTime, Utc};
use downloader_config::load_config;
use exponential_backoff::twitch::{
@@ -9,24 +16,23 @@ use exponential_backoff::twitch::{
};
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::error::Error as StdError;
use std::fmt::Debug;
use std::path::{Path, PathBuf};
use std::result::Result as StdResult;
use thiserror::Error;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use tokio::time::Instant;
use twitch_api::helix::channels::ChannelInformation;
use twitch_api::helix::videos::Video as TwitchVideo;
use twitch_api::types::{Timestamp, VideoPrivacy};
use twitch_oauth2::{ClientId, ClientSecret};
pub use twitch_types::{UserId, VideoId};
#[allow(unused, dead_code)]
//^^ hides some warnings while developing TODO: remove at release
use crate::prelude::*;
pub mod prelude;
//region DownloadError
@@ -491,7 +497,7 @@ impl<'a> TwitchClient<'a> {
) -> Result<Vec<Option<PathBuf>>> {
trace!("downloading all parts of video: {}", url);
let config = load_config();
let mut amount_of_threads: u64 = config.twitch_downloader_thread_count;
let mut amount_of_threads = config.twitch_downloader_thread_count as usize;
let base_url = get_base_url(&url);
info!("getting parts");
let (age, parts) = self.get_parts(&url).await?;
@@ -499,14 +505,14 @@ impl<'a> TwitchClient<'a> {
info!("getting parts ...Done");
let amount_of_parts = parts.len();
let amount_of_parts = amount_of_parts as u64;
let amount_of_parts = amount_of_parts;
info!("part count: {}", amount_of_parts);
if amount_of_parts < 1 {
return Err(Box::new(DownloadError::NoParts));
}
//download parts
std::fs::create_dir_all(&folder_path)?;
std::fs::create_dir_all(folder_path)?;
info!("downloading parts");
@@ -515,19 +521,63 @@ impl<'a> TwitchClient<'a> {
} else if amount_of_threads > amount_of_parts {
amount_of_threads = amount_of_parts;
}
let (completed, progress_handle) = Self::create_progress_indicator(
amount_of_parts,
Duration::from_secs(5),
"Downloading Parts",
);
let files = futures::stream::iter(parts.into_iter().map(|part| {
let folder_path = folder_path.clone();
let url = base_url.clone();
download_part(part, url, folder_path, try_unmute)
async {
let result = download_part(part, url, folder_path, try_unmute).await;
completed.fetch_add(1, Ordering::Relaxed);
result
}
}))
.buffer_unordered(amount_of_threads as usize)
.buffer_unordered(amount_of_threads)
.collect::<Vec<Option<PathBuf>>>();
let files = files.await;
// Once we're done downloading, we need to ensure the progress reporter also finishes.
let _ = progress_handle.await;
info!("downloaded all parts of the video");
Ok(files)
}
fn create_progress_indicator(
amount_of_parts: usize,
report_frequency: Duration,
title: impl Into<&str>,
) -> (Arc<AtomicUsize>, JoinHandle<()>) {
let completed = Arc::new(AtomicUsize::new(0));
let progress_handle = {
let completed = Arc::clone(&completed);
tokio::spawn(async move {
while Arc::strong_count(&completed) > 1 {
// Using strong_count to check the arc's reference count to determine when all tasks are done
let current_progress = completed.load(Ordering::Relaxed);
info!(
"{}: {:>6.2}% ({}/{})",
title.into(),
(current_progress as f64 / amount_of_parts as f64) * 100.0,
current_progress,
amount_of_parts
);
tokio::time::sleep(report_frequency).await;
// sleep for a while
}
info!(
"Completed: {}/{}",
completed.load(Ordering::Relaxed),
amount_of_parts
);
})
};
(completed, progress_handle)
}
async fn get_parts(&self, url: &String) -> Result<(u64, HashMap<String, f32>)> {
// let response = self.reqwest_client.get(url).send().await?;
trace!("getting parts from url: {}", url);