logs & multiple awaitable tasks for downloading (test)

This commit is contained in:
OMGeeky
2023-04-05 18:56:22 +02:00
parent 1e0e6f48ac
commit d86540ab1e
3 changed files with 86 additions and 41 deletions

View File

@@ -446,24 +446,19 @@ impl<'a> TwitchClient<'a> {
//combine parts
let mut files1: Vec<PathBuf> = vec![];
for file in files {
let v = file
.ok_or(DownloadError::new("Error while downloading all parts"))?
.canonicalize()?;
// println!("file: {:?}", v);
files1.push(v);
for file in files.iter_mut() {
let file = file;
if file.is_some() {
let x = file.as_mut().unwrap();
let y = x.canonicalize()?;
*x = y;
} else {
return Err(DownloadError::new("Error while downloading all parts").into());
}
}
let mut files = files1;
//
// let mut files: Vec<PathBuf> = files
// .iter_mut()
// .map(|f| {
// let x = f.as_mut()?.clone();
// x
// })
// .collect();
let mut files: Vec<PathBuf> = files.into_iter().map(|f| f.unwrap()).collect();
files.sort_by_key(|f| {
let number = f
.file_name()
@@ -542,34 +537,69 @@ impl<'a> TwitchClient<'a> {
url: &String,
folder_path: &PathBuf,
) -> Result<Vec<Option<PathBuf>>> {
let mut amount_of_threads = 10;
trace!("downloading all parts of video: {}", url);
let base_url = get_base_url(&url);
println!("getting parts");
info!("getting parts");
let (age, parts) = self.get_parts(&url).await?;
let try_unmute = age < 24;
println!("...Done");
info!("getting parts ...Done");
let amount_of_parts = parts.len();
info!("part count: {}", amount_of_parts);
if amount_of_parts < 1 {
return Err(DownloadError::new("No parts found").into());
}
//download parts
std::fs::create_dir_all(&folder_path)?;
info!("downloading parts");
// let runtime: tokio::runtime::Runtime = Builder::new_multi_thread()
// .worker_threads(4)
// .enable_all()
// .thread_name("downloader-worker-thread")
// .build()
// .unwrap();
info!("part count: {}", parts.len());
let mut files = vec![];
for part in parts {
// println!("downloading part: {} : {}", part.0, part.1);
let downloader =
download_part(part, base_url.clone(), folder_path.clone(), try_unmute).await;
files.push(downloader);
if amount_of_threads < 1 {
amount_of_threads = 1
} else if amount_of_threads > amount_of_parts {
amount_of_threads = amount_of_parts;
}
let iterations_needed = amount_of_parts / amount_of_threads;
let mut parts_iterator = parts.into_iter();
for _ in 0..iterations_needed {
let mut downloader_futures = vec![];
for _ in 0..amount_of_threads {
if let Some(part) = parts_iterator.next() {
let folder_path = folder_path.clone();
let url = base_url.clone();
let downloader = tokio::spawn(async move {
download_part(part, url, folder_path, try_unmute).await
});
downloader_futures.push(downloader);
}
}
// let mut result: Vec<Option<PathBuf>> = tokio::join!(downloader_futures);
let result: Vec<std::result::Result<Option<PathBuf>, tokio::task::JoinError>> =
join_all(downloader_futures).await;
for downloader in result.into_iter() {
let downloader = downloader?;
files.push(downloader);
}
}
//
//
// for part in parts {
// // info!("downloading part: {} : {}", part.0, part.1);
// let downloader =
// download_part(part, base_url.clone(), folder_path.clone(), try_unmute).await;
// files.push(downloader);
// }
// TODO: make this work in multiple threads (maybe about 10?)
// let mut downloaders = vec![];
// for part in parts {
// // println!("downloading part: {} : {}", part.0, part.1);
// // info!("downloading part: {} : {}", part.0, part.1);
// // let downloader = Self::download_part(part, &base_url, &folder_path, try_unmute);
// // TODO: make this not take up all the resources of the server/have a specific amount of threads
// let downloader = tokio::spawn!(download_part(
@@ -589,7 +619,7 @@ impl<'a> TwitchClient<'a> {
// files1.push(v);
// }
// let files = files1;
info!("...Done");
info!("downloaded all parts of video: ");
Ok(files)
}
@@ -599,7 +629,7 @@ impl<'a> TwitchClient<'a> {
let response = check_backoff_twitch_get_with_client(url, &self.reqwest_client).await?;
let video_chunks = response.text().await?;
trace!("got parts: {}", video_chunks.len());
// println!("video_chunks: \n\n{}\n", video_chunks);
// info!("video_chunks: \n\n{}\n", video_chunks);
let lines = video_chunks.lines().collect::<Vec<&str>>();
let mut age = 25;
@@ -618,7 +648,7 @@ impl<'a> TwitchClient<'a> {
let mut parts = HashMap::new();
for i in 0..lines.len() {
// println!("line: {}", i);
// info!("line: {}", i);
let l0 = lines[i];
if !l0.contains("#EXTINF") {
continue;
@@ -628,7 +658,7 @@ impl<'a> TwitchClient<'a> {
if !l1.contains("#EXT-X-BYTERANGE") {
let v = l0[8..].strip_suffix(",").unwrap().parse::<f32>().unwrap();
parts.insert(l1.to_string(), v);
// println!("no byterange found: {i}");
// info!("no byterange found: {i}");
continue;
}
@@ -639,7 +669,7 @@ impl<'a> TwitchClient<'a> {
let v = l0[8..].strip_suffix(",").unwrap().parse::<f32>().unwrap();
parts.insert(l2.to_string(), v);
}
println!(
info!(
"i: {}; videoChunks[i + 2]: {}; videoChunks[i]: {}",
i, l2, l0
)
@@ -675,7 +705,7 @@ async fn download_part(
async fn try_download(main_path: &PathBuf, part: &String, part_url: &String) -> Option<PathBuf> {
trace!("trying to download part: {}", part_url);
// println!("Downloading part: {}", part_url);
// info!("Downloading part: {}", part_url);
let path = Path::join(main_path, part);
let mut res = check_backoff_twitch_get(part_url).await.ok()?;
@@ -717,11 +747,12 @@ fn get_base_url(url: &str) -> String {
fn convert_twitch_timestamp(time: Timestamp) -> DateTime<Utc> {
let time1: String = time.take();
trace!("convert_twitch_timestamp: {}", time1);
convert_twitch_time(&time1)
}
fn convert_twitch_duration(duration: &str) -> chrono::Duration {
// println!("duration: {}", duration);
trace!("convert_twitch_duration: {}", duration);
// todo!("Parse duration that comes in like '2h49m47s'");
let duration = duration
@@ -779,6 +810,11 @@ fn convert_twitch_time(time: &str) -> DateTime<Utc> {
}
fn convert_twitch_time_info(res: String, fmt: &str) -> DateTime<Utc> {
trace!(
"convert_twitch_time: time: '{}' with format: '{}'",
res,
fmt
);
let mut res = res;
if res.ends_with("Z") {
res = format!("{}+00:00", res.strip_suffix("Z").unwrap());

View File

@@ -1,3 +1,4 @@
use log::info;
use std::error::Error;
use std::path::Path;
@@ -7,6 +8,13 @@ use twitch_data::get_client;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
simplelog::TermLogger::init(
simplelog::LevelFilter::Info,
simplelog::Config::default(),
simplelog::TerminalMode::Mixed,
simplelog::ColorChoice::Auto,
)
.expect("Failed to initialize logger");
// async fn main() -> Result<(), Box<dyn Error>> {
println!("Starting!");
sample().await?;
@@ -20,7 +28,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
println!("Done! 4");
// get_video_playlist("1677206253").await?;
println!("Done! 5");
download_video("1677206253").await?;
download_video("1768835851").await?;
println!("Done! 6");
println!("Done! 7");
println!("\n\nDone!");
@@ -38,6 +46,6 @@ async fn download_video(video_id: &str) -> Result<(), Box<dyn Error>> {
async fn sample() -> Result<(), Box<dyn Error>> {
let client = get_client().await?;
let title = client.get_channel_title_from_login("bananabrea").await?;
println!("Title: {}", title);
info!("Title: {}", title);
Ok(())
}