logs & single thread download

This commit is contained in:
OMGeeky
2023-04-04 22:29:24 +02:00
parent 4169eec22b
commit 769404ce78
2 changed files with 57 additions and 60 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "twitch_data"
version = "0.1.2"
version = "0.1.3"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

View File

@@ -1,10 +1,9 @@
use std::collections::HashMap;
use std::error::Error;
use std::fmt;
use std::fmt::{Debug, Display, Formatter, Pointer};
use std::fmt::{Debug, Display, Formatter};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::time::Duration;
use chrono::{DateTime, Utc};
use downloader_config::load_config;
@@ -13,13 +12,12 @@ use exponential_backoff::twitch::{
check_backoff_twitch_with_client,
};
use futures::future::join_all;
use log::{debug, error, info, trace, warn};
use log::{debug, info, trace};
use reqwest;
use serde::{Deserialize, Serialize};
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
use tokio::runtime::Builder;
use tokio::time::Instant;
use twitch_api;
use twitch_api::helix::channels::ChannelInformation;
@@ -58,7 +56,7 @@ pub struct Video {
pub title: String,
pub description: String,
pub user_login: String,
pub created_at: chrono::DateTime<chrono::Utc>,
pub created_at: DateTime<Utc>,
pub url: String,
pub viewable: String,
pub language: String,
@@ -148,10 +146,7 @@ impl<'a> TwitchClient<'a> {
//region channels
pub async fn get_channel_id_from_login(
&self,
channel_login: &str,
) -> Result<twitch_types::UserId> {
pub async fn get_channel_id_from_login(&self, channel_login: &str) -> Result<UserId> {
let info = self.get_channel_info_from_login(channel_login).await?;
let info = info.unwrap();
@@ -482,8 +477,8 @@ impl<'a> TwitchClient<'a> {
Ok(n) => n,
Err(e) => {
println!(
"potentially catchable error while parsing the file number: {}",
number
"potentially catchable error while parsing the file number: {}\n{}",
number, e
);
if !number.starts_with(&format!("{}v", video_id)) || !number.contains("-") {
panic!("Error while parsing the file number: {}", number)
@@ -554,8 +549,7 @@ impl<'a> TwitchClient<'a> {
//download parts
std::fs::create_dir_all(&folder_path)?;
println!("downloading parts");
let mut downloaders = vec![];
info!("downloading parts");
// let runtime: tokio::runtime::Runtime = Builder::new_multi_thread()
// .worker_threads(4)
@@ -563,37 +557,47 @@ impl<'a> TwitchClient<'a> {
// .thread_name("downloader-worker-thread")
// .build()
// .unwrap();
println!("part count: {}", parts.len());
info!("part count: {}", parts.len());
let mut files = vec![];
for part in parts {
// println!("downloading part: {} : {}", part.0, part.1);
// let downloader = Self::download_part(part, &base_url, &folder_path, try_unmute);
let downloader = tokio::spawn(download_part(
part,
base_url.clone(),
folder_path.clone(),
try_unmute,
));
downloaders.push(downloader);
let downloader =
download_part(part, base_url.clone(), folder_path.clone(), try_unmute).await;
files.push(downloader);
}
let mut files = join_all(downloaders).await;
// runtime.shutdown_timeout(Duration::from_secs(10));
let mut files1 = vec![];
for file in files {
let v = file?;
files1.push(v);
}
let files = files1;
println!("...Done");
// TODO: make this work in multiple threads (maybe about 10?)
// let mut downloaders = vec![];
// for part in parts {
// // println!("downloading part: {} : {}", part.0, part.1);
// // let downloader = Self::download_part(part, &base_url, &folder_path, try_unmute);
// // TODO: make this not take up all the resources of the server/have a specific amount of threads
// let downloader = tokio::spawn!(download_part(
// part,
// base_url.clone(),
// folder_path.clone(),
// try_unmute,
// ));
// downloaders.push(downloader);
// }
//
// let mut files = join_all(downloaders).await;
// // runtime.shutdown_timeout(Duration::from_secs(10));
// let mut files1 = vec![];
// for file in files {
// let v = file?;
// files1.push(v);
// }
// let files = files1;
info!("...Done");
Ok(files)
}
async fn get_parts(&self, url: &String) -> Result<(u64, HashMap<String, f32>)> {
// let response = self.reqwest_client.get(url).send().await?;
// println!("getting parts from url: {}", url);
trace!("getting parts from url: {}", url);
let response = check_backoff_twitch_get_with_client(url, &self.reqwest_client).await?;
let video_chunks = response.text().await?;
// println!("got parts:\n{}", video_chunks);
trace!("got parts: {}", video_chunks.len());
// println!("video_chunks: \n\n{}\n", video_chunks);
let lines = video_chunks.lines().collect::<Vec<&str>>();
@@ -604,7 +608,7 @@ impl<'a> TwitchClient<'a> {
}
let time = line.split("ID3-EQUIV-TDTG:").collect::<Vec<&str>>()[1];
let time = convert_twitch_time(time);
let now = chrono::Utc::now();
let now = Utc::now();
let diff = now - time;
age = diff.num_seconds() as u64 / 3600;
break;
@@ -649,21 +653,27 @@ async fn download_part(
main_path: PathBuf,
try_unmute: bool,
) -> Option<PathBuf> {
trace!("downloading part: {:?}", part);
let (part, _duration) = part;
let part_url = format!("{}{}", url, part);
let part_url_unmuted = format!("{}{}", url, part.replace("-muted", ""));
if !try_unmute {
trace!("not to unmute part: {}", part_url);
return try_download(&main_path, &part, &part_url).await;
}
trace!("trying to download unmuted part: {}", part_url_unmuted);
match try_download(&main_path, &part, &part_url_unmuted).await {
Some(path) => Some(path),
None => try_download(&main_path, &part, &part_url).await, //TODO: check if this is the right error for a failed unmute
None => {
trace!("failed to download unmuted part. downloading muted part");
try_download(&main_path, &part, &part_url).await //TODO: check if this is the right error for a failed unmute
}
}
}
async fn try_download(main_path: &PathBuf, part: &String, part_url: &String) -> Option<PathBuf> {
trace!("trying to download part: {}", part_url);
// println!("Downloading part: {}", part_url);
let path = Path::join(main_path, part);
@@ -704,7 +714,7 @@ fn get_base_url(url: &str) -> String {
base_url
}
fn convert_twitch_timestamp(time: Timestamp) -> chrono::DateTime<chrono::Utc> {
fn convert_twitch_timestamp(time: Timestamp) -> DateTime<Utc> {
let time1: String = time.take();
convert_twitch_time(&time1)
}
@@ -763,35 +773,22 @@ fn convert_twitch_duration(duration: &str) -> chrono::Duration {
// end_time - start_time
}
fn convert_twitch_time(time: &str) -> chrono::DateTime<chrono::Utc> {
let mut res = time.to_string();
fn convert_twitch_time(time: &str) -> DateTime<Utc> {
return convert_twitch_time_info(time.to_string(), "%Y-%m-%dT%H:%M:%S%z");
}
fn convert_twitch_time_info(res: String, fmt: &str) -> DateTime<Utc> {
let mut res = res;
if res.ends_with("Z") {
res = format!("{}+00:00", res.strip_suffix("Z").unwrap());
} else if !res.contains("+") {
res = format!("{}{}", res, "+00:00");
}
// println!("convert_twitch_time: time: {}", res);
let res = chrono::DateTime::parse_from_str(&res, "%Y-%m-%dT%H:%M:%S%z")
.expect(format!("Failed to parse time: {}", res).as_str());
res.with_timezone(&chrono::Utc)
}
fn convert_twitch_time_info(res: String, fmt: &str) -> DateTime<Utc> {
let mut res = res.to_string();
if res.ends_with("Z") {
res = format!("{}+00:00", res.strip_suffix("Z").unwrap());
}
if !res.ends_with("Z") && !res.contains("+") {
res = format!("{}{}", res, "+00:00");
} else {
res = res.to_string();
}
// println!("time: {}", res);
debug!("convert_twitch_time: time: {} with format: {}", res, fmt);
let res = chrono::DateTime::parse_from_str(&res, fmt)
.expect(format!("Failed to parse time: {}", res).as_str());
res.with_timezone(&chrono::Utc)
res.with_timezone(&Utc)
}
//endregion