commit 3c9a111ab0ad16653e9eb519612182ba1f882098 Author: OMGeeky Date: Sat Oct 14 02:04:43 2023 +0200 fully implement downloading files diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..aa346cf --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/target +/.idea +/.cargo diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..01df2e3 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "downloader" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +backup-config = {version = "0.1.1", git = "https://github.com/OMGeeky/backup_config.git" } +local-db = {version = "0.1", git = "https://github.com/OMGeeky/twitch_backup.local_db.git"} +reqwest-backoff = {version = "0.1", git = "https://github.com/OMGeeky/twba_reqwest_backoff.git"} + + +tracing-subscriber = "0.3" +tracing = "0.1" +tokio = { version = "1.33", features = ["rt", "rt-multi-thread", "macros"] } + +thiserror = "1.0" +anyhow = "1.0" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +reqwest = "0.11" +chrono = "0.4" +futures = "0.3" +futures-util = "0.3" diff --git a/src/errors.rs b/src/errors.rs new file mode 100644 index 0000000..43849bc --- /dev/null +++ b/src/errors.rs @@ -0,0 +1,75 @@ +use reqwest_backoff::ReqwestBackoffError; +use std::path::PathBuf; + +#[derive(Debug, thiserror::Error)] +pub enum DownloadError { + #[error("Malformed playlist")] + MalformedPlaylist(#[from] MalformedPlaylistError), + + #[error("Backoff error")] + Backoff(#[from] ReqwestBackoffError), + #[error("Database Error")] + Database(#[from] local_db::re_exports::sea_orm::errors::DbErr), + + #[error("Reqwest error")] + Reqwest(#[from] reqwest::Error), + + #[error("Could not parse json to access token value and signature")] + AccessTokenJsonParse(#[source] serde_json::Error), + #[error("The server did not provide an access token")] + AccessTokenEmpty, + #[error("Got an error with the Filesystem")] + File(#[from] DownloadFileError), + #[error("Error while loading config")] + LoadConfig(#[source] anyhow::Error), +} + +#[derive(Debug, thiserror::Error)] +pub enum MalformedPlaylistError { + #[error("Playlist was empty/did not contain any useful information")] + Empty, + #[error("Playlist did not specify any qualities")] + NoQualities, + + #[error("Could not parse the playlist")] + Parse(#[from] PlaylistParseError), + #[error("Could not parse the url/the url did not contain the expected information")] + InvalidUrl, +} +#[derive(Debug, thiserror::Error)] +pub enum PlaylistParseError { + #[error("Unexpected end of file while parsing playlist")] + Eof, + #[error("Invalid time format in playlist")] + InvalidTimeFormat(#[source] chrono::ParseError), +} +#[derive(Debug, thiserror::Error)] +pub enum DownloadFileError { + #[error("The target folder is not empty {0:?}")] + TargetFolderIsNotEmpty(PathBuf), + #[error("The target folder is not a directory {0:?}")] + TargetFolderIsNotADirectory(PathBuf), + #[error("The target path already exists: {0:?}")] + TargetAlreadyExists(PathBuf), + #[error("Could not create the target folder")] + CouldNotCreateTargetFolder(#[source] std::io::Error), + #[error("Could not create a needed file")] + FileCreation(#[source] std::io::Error), + #[error("Could not read the folder/file")] + Read(#[source] std::io::Error), + #[error("Could not write the folder/file")] + Write(#[source] std::io::Error), + #[error("There was some error during a filesystem operation")] + Filesystem(#[source] tokio::io::Error), + + #[error("The ffmpeg command returned an error")] + Ffmpeg(#[source] tokio::io::Error), + + #[error("could not canonicalize path: {0:?}")] + Canonicalization(#[source] std::io::Error), + + #[error("could not download file: {0:?}")] + DownloadBackoff(#[source] ReqwestBackoffError), + #[error("Got an Error during a reqwest request (download)")] + DownloadReqwest(#[source] reqwest::Error), +} diff --git a/src/prelude.rs b/src/prelude.rs new file mode 100644 index 0000000..d17fa5c --- /dev/null +++ b/src/prelude.rs @@ -0,0 +1,16 @@ +pub use crate::errors::DownloadError; +// pub(crate) use anyhow::Result; +pub(crate) use backup_config::prelude::*; +use std::fmt::Debug; +pub(crate) use tracing::{debug, error, info, trace, warn}; + +pub(crate) use std::result::Result as StdResult; + +/// Just a wrapper around Into that implements Debug. +/// +/// This is just for convenience so we dont need to write +/// '`impl Into + Debug`' everywhere. +pub trait DIntoString: Into + Debug {} +impl DIntoString for T where T: Into + Debug {} + +pub type Result = StdResult; diff --git a/src/twitch/access_token.rs b/src/twitch/access_token.rs new file mode 100644 index 0000000..126c7ae --- /dev/null +++ b/src/twitch/access_token.rs @@ -0,0 +1,18 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Deserialize, Serialize)] +pub struct TwitchVideoAccessTokenResponse { + pub data: VideoAccessTokenResponseData, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct VideoAccessTokenResponseData { + #[serde(rename = "videoPlaybackAccessToken")] + pub video_playback_access_token: Option, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct VideoAccessTokenResponseDataAccessToken { + pub value: String, + pub signature: String, +} diff --git a/src/twitch/mod.rs b/src/twitch/mod.rs new file mode 100644 index 0000000..7b8eb7e --- /dev/null +++ b/src/twitch/mod.rs @@ -0,0 +1,555 @@ +use chrono::{NaiveDateTime, Utc}; +use futures_util::{StreamExt, TryStreamExt}; +use reqwest_backoff::ReqwestClient; +use serde_json::json; +use std::collections::HashMap; +use std::fmt::Debug; +use std::path::{Path, PathBuf}; +use tokio::fs; +use tokio::io::AsyncWriteExt; +use tokio::process::Command; +use tokio::time::Instant; +use tracing::instrument; + +use crate::errors::{DownloadError, DownloadFileError, MalformedPlaylistError, PlaylistParseError}; +use crate::prelude::*; + +mod access_token; +use access_token::TwitchVideoAccessTokenResponse; + +#[derive(Debug)] +pub struct TwitchClient { + client: ReqwestClient, + config: Conf, +} +//region public functions +impl TwitchClient { + #[tracing::instrument] + pub fn new(config: Conf) -> Self { + let client = reqwest::Client::new().into(); + Self { client, config } + } + #[tracing::instrument(skip(self))] + pub async fn download_video( + &self, + video_id: VideoId, + quality: QUALITY, + output_folder: &Path, + ) -> Result { + let video_id = video_id.into(); + let folder_path = output_folder.join(&video_id); + let final_path = output_folder.join(format!("{}.mp4", video_id)); + if final_path.exists() { + return Err(DownloadFileError::TargetAlreadyExists(final_path).into()); + } + if !folder_path.exists() { + std::fs::create_dir_all(&folder_path) + .map_err(DownloadFileError::CouldNotCreateTargetFolder)?; + } else if !folder_path.is_dir() { + return Err(DownloadFileError::TargetFolderIsNotADirectory(folder_path).into()); + } else { + // folder exists and is a directory + if folder_path + .read_dir() + .map_err(DownloadFileError::Read)? + .next() + .is_some() + { + // folder is not empty + return Err(DownloadFileError::TargetFolderIsNotEmpty(folder_path).into()); + } + } + + let mut parts = self + .download_all_parts(quality, &video_id, &folder_path) + .await?; + + sort_parts(&mut parts); + let mp4_file_path = combine_parts_to_mp4(&parts, &folder_path).await?; + + tokio::fs::rename(&mp4_file_path, &final_path) + .await + .map_err(DownloadFileError::Filesystem)?; + //clean up the leftover parts + tokio::fs::remove_dir_all(folder_path) + .await + .map_err(DownloadFileError::Filesystem)?; + Ok(final_path) + } +} +//endregion +impl TwitchClient { + async fn download_all_parts( + &self, + quality: QUALITY, + video_id: &String, + folder_path: &Path, + ) -> Result> { + let download_info = self.get_download_info(video_id, quality).await?; + let parts = download_info.parts; + let base_url = download_info.base_url; + let age = download_info.vod_age; + if parts.is_empty() { + return Err(MalformedPlaylistError::Empty.into()); + } + let try_unmute = age.unwrap_or(999) < 24; //hours i think + let amount_of_parts = parts.len() as u64; + let thread_count = self.config.twitch.downloader_thread_count; + let thread_count: u64 = if thread_count < 1 { + 1 + } else if thread_count > amount_of_parts { + amount_of_parts + } else { + thread_count + }; + + // todo!("maybe add a progress bar/indicator?"); + let it = parts + .into_iter() + .map(|part| { + let folder_path = folder_path.clone(); + let client = self.client.clone(); + let url = base_url.clone(); + async move { + // download + let result = download_part(part, url, folder_path, try_unmute, client).await; + // report progress + trace!("downloaded part: {:?}", result); + // return result + result + } + }) + .map(|x| async { + x.await.and_then(|x: PathBuf| { + x.canonicalize() + .map_err(DownloadFileError::Canonicalization) + }) + }); + let x = futures::stream::iter(it) + .buffer_unordered(thread_count as usize) + .try_collect::>() + .await?; + + Ok(x) + } + #[tracing::instrument(skip(self))] + async fn get_download_info( + &self, + video_id: ID, + quality: QUALITY, + ) -> Result { + let playlist = self.get_video_playlist(video_id, quality).await?; + let playlist_content = self + .client + .execute_with_backoff(self.client.get(&playlist).build()?) + .await? + .text() + .await?; + let base_url = &playlist[..playlist + .rfind('/') + .ok_or(MalformedPlaylistError::InvalidUrl)? + + 1]; + let parts = parse_playlist(playlist_content)?; + // dbg!(&parts); + Ok(DownloadInfo { + vod_age: parts.0, + parts: parts.1, + base_url: base_url.to_string(), + }) + } + + #[tracing::instrument(skip(self))] + async fn get_video_token_and_signature( + &self, + video_id: S, + ) -> Result<(String, String)> { + let video_id = video_id.into(); + trace!("Getting access token & signature for video {}", video_id,); + + const URL: &str = "https://gql.twitch.tv/gql"; + let json = json!({"operationName":"PlaybackAccessToken_Template", + "query": "query PlaybackAccessToken_Template($login: String!, $isLive: Boolean!, $vodID: ID!, $isVod: Boolean!, $playerType: String!) { streamPlaybackAccessToken(channelName: $login, params: {platform: \"web\", playerBackend: \"mediaplayer\", playerType: $playerType}) @include(if: $isLive) { value signature __typename } videoPlaybackAccessToken(id: $vodID, params: {platform: \"web\", playerBackend: \"mediaplayer\", playerType: $playerType}) @include(if: $isVod) { value signature __typename }}", + "variables": { + "isLive": false, + "login": "", + "isVod": true, + "vodID": video_id, + "playerType": "embed" + } + }).to_string(); + let request = self + .client + .post(URL) + .header("Client-ID", &self.config.twitch.downloader_id) + .body(json) + .build()?; + + let response = self.client.execute_with_backoff(request).await?; + let json = response.text().await?; + // trace!("Got json response: {}", json); + let token_response: TwitchVideoAccessTokenResponse = + serde_json::from_str(&json).map_err(DownloadError::AccessTokenJsonParse)?; + trace!( + "Got access token & signature for video {}=>{:?}", + video_id, + token_response + ); + let access_token = token_response + .data + .video_playback_access_token + .ok_or(DownloadError::AccessTokenEmpty)?; + + Ok((access_token.value, access_token.signature)) + } + + #[tracing::instrument(skip(self))] + async fn get_video_playlist( + &self, + video_id: ID, + quality: QUALITY, + ) -> Result { + let video_id = video_id.into(); + let quality = quality.into(); + + trace!( + "Getting video playlist with quality for video {} with quality {}", + video_id, + quality + ); + + let playlist = self.get_video_playlist_per_quality(&video_id).await?; + let playlist = get_playlist_from_quality_list(playlist, &quality)?; + + Ok(playlist) + } + + #[tracing::instrument(skip(self))] + async fn get_video_playlist_per_quality(&self, video_id: &str) -> Result { + let (token, signature) = self.get_video_token_and_signature(video_id).await?; + + let playlist_url = format!( + "https://usher.ttvnw.net/vod/{}?nauth={}&nauthsig={}&allow_source=true&player=twitchweb", + video_id, token, signature + ); + + let request = self.client.get(playlist_url).build()?; + let playlist = self.client.execute_with_backoff(request).await?; + let playlist = playlist.text().await?; + Ok(playlist) + } +} +#[instrument] +async fn download_part( + part: (String, f32), + base_url: String, + folder_path: &Path, + try_unmute: bool, + client: ReqwestClient, +) -> StdResult { + trace!("downloading part: {:?}", part); + let (part, _duration) = part; + + let part_url = format!("{}{}", base_url, part); + let part_url_unmuted = format!("{}{}", base_url, part.replace("-muted", "")); + + let try_unmute = try_unmute && part.contains("-muted"); + let target_path = folder_path.join(&part); + + if try_unmute { + trace!("trying to download unmuted part: {}", part_url_unmuted); + match try_download_part(part_url_unmuted, &target_path, &client).await { + Ok(path) => Ok(path), + Err(_) => { + trace!("failed to download unmuted part. trying muted part"); + try_download_part(part_url, folder_path, &client).await + } + } + } else { + trace!("not trying to unmute: {}", part_url); + try_download_part(part_url, &target_path, &client).await + } +} +async fn try_download_part( + url: String, + target_path: &Path, + client: &ReqwestClient, +) -> StdResult { + let request = client + .get(url) + .build() + .map_err(DownloadFileError::DownloadReqwest)?; + let mut response = client + .execute_with_backoff(request) + .await + .map_err(DownloadFileError::DownloadBackoff)?; + + let mut file = fs::File::create(target_path) + .await + .map_err(DownloadFileError::FileCreation)?; + + while let Some(chunk) = response + .chunk() + .await + .map_err(DownloadFileError::DownloadReqwest)? + { + file.write_all(&chunk) + .await + .map_err(DownloadFileError::Filesystem)?; + } + Ok(target_path.to_path_buf()) +} + +#[instrument] +async fn convert_ts_to_mp4(ts_file: &Path, mp4_file: &Path) -> Result<()> { + info!("converting to mp4"); + if mp4_file.exists() { + tokio::fs::remove_file(&mp4_file) + .await + .map_err(DownloadFileError::Filesystem)?; + } + debug!( + "running ffmpeg command: ffmpeg -i {} -c {}", + ts_file.display(), + mp4_file.display() + ); + let mut cmd = Command::new("ffmpeg"); + let start_time = Instant::now(); + cmd.arg("-i") + .arg(ts_file) + .arg("-c") + .arg("copy") + .arg(mp4_file); + let result = cmd.output().await; + let duration = Instant::now().duration_since(start_time); + debug!("ffmpeg command finished after duration: {:?}", duration); + result.map_err(DownloadFileError::Ffmpeg)?; + Ok(()) +} + +fn parse_playlist( + playlist: String, +) -> StdResult<(Option, HashMap), MalformedPlaylistError> { + info!("Parsing playlist"); + const STREAMED_DATE_IDENT: &str = "#ID3-EQUIV-TDTG:"; + + let mut age = None; + let mut parts = HashMap::new(); + dbg!(&playlist); + let mut lines = playlist.lines(); + loop { + let line = lines.next(); + trace!("line: {:?}", line); + if line.is_none() { + trace!("line is none. done parsing playlist"); + break; + } + let line = line.unwrap(); + if let Some(date) = line.strip_prefix(STREAMED_DATE_IDENT) { + let date = date.trim(); + let date: chrono::DateTime = convert_twitch_date(date)?; + let now = Utc::now(); + let duration = now.signed_duration_since(date); + age = Some(duration.num_hours() as usize); + continue; + } + if let Some(part_duration) = line.strip_prefix("#EXTINF:") { + let mut line = lines.next().ok_or(PlaylistParseError::Eof)?; + if line.starts_with("#EXT-X-BYTERANGE:") { + warn!("Found byterange, ignoring the line and moving on"); + line = lines.next().ok_or(PlaylistParseError::Eof)?; + } + + let part_duration: f32 = part_duration.trim_matches(',').parse().unwrap_or(0.0); + + parts.insert(line.trim().to_string(), part_duration); + } else { + //ignore everything but content lines + continue; + } + } + dbg!(&parts.len()); + Ok((age, parts)) +} + +/// Converts a twitch date string to a chrono::DateTime +/// Example: 2021-05-01T18:00:00 +pub fn convert_twitch_date(date: &str) -> StdResult, PlaylistParseError> { + let date = date.trim(); + let date = date.trim_matches('"'); + + //parse the date from a string like this: 2023-10-07T23:33:29 + NaiveDateTime::parse_from_str(date, "%Y-%m-%dT%H:%M:%S") + .map(|e| e.and_utc()) + .map_err(PlaylistParseError::InvalidTimeFormat) +} +#[cfg(test)] +mod tests { + use super::*; + use chrono::{Datelike, Timelike}; + #[test] + fn test_convert_twitch_date() { + let date = "2021-05-01T18:00:00"; + let date = convert_twitch_date(date).unwrap(); + assert_eq!(date.year(), 2021); + assert_eq!(date.month(), 5); + assert_eq!(date.day(), 1); + assert_eq!(date.hour(), 18); + assert_eq!(date.minute(), 0); + assert_eq!(date.second(), 0); + } +} +#[tracing::instrument(skip(playlist))] +fn get_playlist_from_quality_list(playlist: String, quality: &str) -> Result { + trace!("Parsing playlist:\n{}", playlist); + + let mut qualties = HashMap::new(); + + let mut highest_quality = String::new(); + let test: Vec<&str> = playlist.lines().collect(); + for (i, line) in test.iter().enumerate() { + if !line.contains("#EXT-X-MEDIA") { + continue; + } + + let found_quality = line.split("NAME=\"").collect::>()[1] + .split('"') + .collect::>()[0]; + + if qualties.get(found_quality).is_some() { + continue; + } + if qualties.is_empty() { + // the first one is the highest quality + highest_quality = found_quality.to_string(); + } + + let url = test[i + 2]; + qualties.insert(found_quality, url); + } + if let Some(quality) = qualties.get(quality) { + Ok(quality.to_string()) + } else { + warn!( + "Given quality not found ({}), using highest quality: {}", + quality, highest_quality + ); + Ok(qualties + .get(highest_quality.as_str()) + .ok_or(MalformedPlaylistError::NoQualities)? + .to_string()) + } +} +#[derive(Debug, Clone)] +struct DownloadInfo { + vod_age: Option, + parts: HashMap, + base_url: String, +} +#[cfg(test)] +mod abc { + use futures_util::{StreamExt, TryStreamExt}; + #[tokio::test] + async fn test1() { + let v = vec![1, 3, 5]; + let x1 = run(v).await; + assert!(x1.is_err()); + assert_eq!(x1.unwrap_err(), 5i64); + } + #[tokio::test] + async fn test2() { + let v = vec![1, 5, 1]; + let x1 = run(v).await; + assert!(x1.is_err()); + assert_eq!(x1.unwrap_err(), 5i64); + } + #[tokio::test] + async fn test3() { + let v = vec![1, 3, 2, 2]; + let x1 = run(v).await; + assert!(x1.is_ok()); + assert_eq!(x1.unwrap(), vec![1, 3, 2, 2]); + } + async fn run(v: Vec) -> Result, i64> { + async fn sample(part: i32) -> Result { + dbg!(part); + if part <= 3 { + Ok(part as i16) + } else { + Err(part as i64) + } + } + let thread_count = 2; + let it = v.into_iter().map(sample); + let x = futures::stream::iter(it); + let x1: Result, i64> = x.buffer_unordered(thread_count).try_collect().await; + dbg!(&x1); + x1 + } +} + +fn sort_parts(files: &mut [PathBuf]) { + files.sort_by_key(|path| { + let number = path + .file_stem() + .map(|x| { + x.to_str() + .unwrap_or("") + .replace("-muted", "") + .replace("-unmuted", "") + }) + .unwrap_or(String::from("0")); + match number.parse::() { + Ok(n) => n, + Err(e) => { + warn!( + "potentially catchable error while parsing the file number: {}\n{}", + number, e + ); + if !number.contains('-') { + error!("Error while parsing the file number: {}", number); + panic!("Error while parsing the file number: {}", number) + } + let number = number.split('-').collect::>()[1]; + number + .parse() + .unwrap_or_else(|_| panic!("Error while parsing the file number: {}", number)) + } + } + }); +} + +#[instrument(skip(files), fields(part_amount=files.len()))] +async fn combine_parts_to_single_ts(files: &[PathBuf], target: &Path) -> Result<()> { + debug!("combining all parts of video"); + debug!("part amount: {}", files.len()); + let mut target = fs::File::create(target) + .await + .map_err(DownloadFileError::FileCreation)?; + for file_path in files { + trace!("{:?}", file_path.file_name()); + let mut file = fs::File::open(&file_path) + .await + .map_err(DownloadFileError::Read)?; + tokio::io::copy(&mut file, &mut target) + .await + .map_err(DownloadFileError::Write)?; + tokio::fs::remove_file(&file_path) + .await + .map_err(DownloadFileError::Write)?; + } + + Ok(()) +} + +async fn combine_parts_to_mp4(parts: &[PathBuf], folder_path: &Path) -> Result { + let ts_file_path = folder_path.join("video.ts"); + let mp4_file_path = folder_path.join("video.mp4"); + + combine_parts_to_single_ts(parts, &ts_file_path).await?; + convert_ts_to_mp4(&ts_file_path, &mp4_file_path).await?; + tokio::fs::remove_file(ts_file_path) + .await + .map_err(DownloadFileError::Filesystem)?; + + Ok(mp4_file_path) +}