reorganize stuff & db interaction

This commit is contained in:
OMGeeky
2023-10-14 15:17:05 +02:00
parent 3c9a111ab0
commit e5cac5d585
7 changed files with 424 additions and 317 deletions

93
src/client.rs Normal file
View File

@@ -0,0 +1,93 @@
use crate::prelude::*;
use crate::twitch::TwitchClient;
use local_db::prelude::*;
use local_db::re_exports::sea_orm::ActiveValue::Set;
use local_db::re_exports::sea_orm::{
ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel, QueryFilter,
};
use std::path::Path;
#[derive(Debug)]
pub struct DownloaderClient {
db: DatabaseConnection,
pub twitch_client: TwitchClient,
}
impl DownloaderClient {
pub fn new(twitch_client: TwitchClient, db: DatabaseConnection) -> Self {
Self { twitch_client, db }
}
#[tracing::instrument(skip(self))]
pub async fn download_not_downloaded_videos(&self) -> Result<()> {
let output_folder: &Path =
Path::new(self.twitch_client.config.download_folder_path.as_str());
let videos = Videos::find()
.filter(VideosColumn::Status.eq(Status::NotStarted))
.all(&self.db)
.await?;
for video in videos {
let id = video.id;
let quality = "max";
let success = self.download_video(video, quality, output_folder).await;
if let Err(err) = success {
error!(
"Could not download video with id: {} because of err: {:?}",
id, err
);
} else {
info!("Downloaded video with id: {}", id);
}
}
Ok(())
}
pub async fn download_video_by_id<VideoId: DIntoString, Quality: DIntoString>(
&self,
video_id: VideoId,
quality: Quality,
output_folder: &Path,
) -> Result<()> {
let video_id = video_id.into();
let quality = quality.into();
let video = Videos::find()
.filter(VideosColumn::TwitchId.eq(&video_id))
.one(&self.db)
.await?
.ok_or_else(|| DownloaderError::VideoNotFound(video_id))?;
self.download_video(video, &quality, output_folder).await
}
pub async fn download_video(
&self,
video: VideosModel,
quality: &str,
output_folder: &Path,
) -> Result<()> {
let video_id = video.twitch_id.clone();
let mut video = video.into_active_model();
video.status = Set(Status::Downloading);
video.clone().update(&self.db).await?;
let download_result = self
.twitch_client
.download_video(video_id, quality, output_folder)
.await;
match download_result {
Ok(path) => {
info!("Downloaded video to {:?}", path);
video.status = Set(Status::Downloaded);
video.clone().update(&self.db).await?;
Ok(())
}
Err(err) => {
error!("Could not download video: {:?}", err);
video.status = Set(Status::Failed);
video.clone().update(&self.db).await?;
Err(err)
}
}
}
}

View File

@@ -2,14 +2,20 @@ use reqwest_backoff::ReqwestBackoffError;
use std::path::PathBuf;
#[derive(Debug, thiserror::Error)]
pub enum DownloadError {
pub enum DownloaderError {
#[error("Video not found: {0}")]
VideoNotFound(String),
#[error("User not found: {0}")]
UserNotFound(i32),
#[error("Malformed playlist")]
MalformedPlaylist(#[from] MalformedPlaylistError),
#[error("Backoff error")]
Backoff(#[from] ReqwestBackoffError),
#[error("Database Error")]
Database(#[from] local_db::re_exports::sea_orm::errors::DbErr),
Database(#[from] local_db::re_exports::sea_orm::DbErr),
#[error("Reqwest error")]
Reqwest(#[from] reqwest::Error),

44
src/main.rs Normal file
View File

@@ -0,0 +1,44 @@
pub mod prelude;
use prelude::*;
pub mod client;
mod errors;
pub mod twitch;
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_env_filter(
"sea_orm=warn,sea_orm_migration=warn,sqlx=warn,downloader=trace,local_db=warn,reqwest-backoff=warn",
)
.init();
info!("Hello, world!");
run().await?;
info!("Bye");
Ok(())
}
#[tracing::instrument]
async fn run() -> Result<()> {
let conf = Conf::builder()
.env()
.file("./settings.toml")
.file("/home/omgeeky/twba/config.toml")
.load()
.map_err(|e| DownloaderError::LoadConfig(e.into()))?;
let db = local_db::open_database(Some(&conf.db_url)).await?;
local_db::migrate_db(&db).await?;
// local_db::print_db(&db).await?;
// dbg!(&conf);
let twitch_client = twitch::TwitchClient::new(conf);
let client = client::DownloaderClient::new(twitch_client, db);
client.download_not_downloaded_videos().await?;
Ok(())
}

View File

@@ -1,4 +1,4 @@
pub use crate::errors::DownloadError;
pub use crate::errors::DownloaderError;
// pub(crate) use anyhow::Result;
pub(crate) use backup_config::prelude::*;
use std::fmt::Debug;
@@ -13,4 +13,4 @@ pub(crate) use std::result::Result as StdResult;
pub trait DIntoString: Into<String> + Debug {}
impl<T> DIntoString for T where T: Into<String> + Debug {}
pub type Result<T> = StdResult<T, DownloadError>;
pub type Result<T> = StdResult<T, DownloaderError>;

View File

@@ -1,4 +1,3 @@
use chrono::{NaiveDateTime, Utc};
use futures_util::{StreamExt, TryStreamExt};
use reqwest_backoff::ReqwestClient;
use serde_json::json;
@@ -11,16 +10,21 @@ use tokio::process::Command;
use tokio::time::Instant;
use tracing::instrument;
use crate::errors::{DownloadError, DownloadFileError, MalformedPlaylistError, PlaylistParseError};
use crate::errors::*;
use crate::prelude::*;
mod access_token;
use crate::twitch::parts_util::*;
use crate::twitch::twitch_utils::*;
use access_token::TwitchVideoAccessTokenResponse;
mod parts_util;
pub mod twitch_utils;
#[derive(Debug)]
pub struct TwitchClient {
client: ReqwestClient,
config: Conf,
pub config: Conf,
}
//region public functions
impl TwitchClient {
@@ -188,7 +192,7 @@ impl TwitchClient {
let json = response.text().await?;
// trace!("Got json response: {}", json);
let token_response: TwitchVideoAccessTokenResponse =
serde_json::from_str(&json).map_err(DownloadError::AccessTokenJsonParse)?;
serde_json::from_str(&json).map_err(DownloaderError::AccessTokenJsonParse)?;
trace!(
"Got access token & signature for video {}=>{:?}",
video_id,
@@ -197,7 +201,7 @@ impl TwitchClient {
let access_token = token_response
.data
.video_playback_access_token
.ok_or(DownloadError::AccessTokenEmpty)?;
.ok_or(DownloaderError::AccessTokenEmpty)?;
Ok((access_token.value, access_token.signature))
}
@@ -238,318 +242,10 @@ impl TwitchClient {
Ok(playlist)
}
}
#[instrument]
async fn download_part(
part: (String, f32),
base_url: String,
folder_path: &Path,
try_unmute: bool,
client: ReqwestClient,
) -> StdResult<PathBuf, DownloadFileError> {
trace!("downloading part: {:?}", part);
let (part, _duration) = part;
let part_url = format!("{}{}", base_url, part);
let part_url_unmuted = format!("{}{}", base_url, part.replace("-muted", ""));
let try_unmute = try_unmute && part.contains("-muted");
let target_path = folder_path.join(&part);
if try_unmute {
trace!("trying to download unmuted part: {}", part_url_unmuted);
match try_download_part(part_url_unmuted, &target_path, &client).await {
Ok(path) => Ok(path),
Err(_) => {
trace!("failed to download unmuted part. trying muted part");
try_download_part(part_url, folder_path, &client).await
}
}
} else {
trace!("not trying to unmute: {}", part_url);
try_download_part(part_url, &target_path, &client).await
}
}
async fn try_download_part(
url: String,
target_path: &Path,
client: &ReqwestClient,
) -> StdResult<PathBuf, DownloadFileError> {
let request = client
.get(url)
.build()
.map_err(DownloadFileError::DownloadReqwest)?;
let mut response = client
.execute_with_backoff(request)
.await
.map_err(DownloadFileError::DownloadBackoff)?;
let mut file = fs::File::create(target_path)
.await
.map_err(DownloadFileError::FileCreation)?;
while let Some(chunk) = response
.chunk()
.await
.map_err(DownloadFileError::DownloadReqwest)?
{
file.write_all(&chunk)
.await
.map_err(DownloadFileError::Filesystem)?;
}
Ok(target_path.to_path_buf())
}
#[instrument]
async fn convert_ts_to_mp4(ts_file: &Path, mp4_file: &Path) -> Result<()> {
info!("converting to mp4");
if mp4_file.exists() {
tokio::fs::remove_file(&mp4_file)
.await
.map_err(DownloadFileError::Filesystem)?;
}
debug!(
"running ffmpeg command: ffmpeg -i {} -c {}",
ts_file.display(),
mp4_file.display()
);
let mut cmd = Command::new("ffmpeg");
let start_time = Instant::now();
cmd.arg("-i")
.arg(ts_file)
.arg("-c")
.arg("copy")
.arg(mp4_file);
let result = cmd.output().await;
let duration = Instant::now().duration_since(start_time);
debug!("ffmpeg command finished after duration: {:?}", duration);
result.map_err(DownloadFileError::Ffmpeg)?;
Ok(())
}
fn parse_playlist(
playlist: String,
) -> StdResult<(Option<usize>, HashMap<String, f32>), MalformedPlaylistError> {
info!("Parsing playlist");
const STREAMED_DATE_IDENT: &str = "#ID3-EQUIV-TDTG:";
let mut age = None;
let mut parts = HashMap::new();
dbg!(&playlist);
let mut lines = playlist.lines();
loop {
let line = lines.next();
trace!("line: {:?}", line);
if line.is_none() {
trace!("line is none. done parsing playlist");
break;
}
let line = line.unwrap();
if let Some(date) = line.strip_prefix(STREAMED_DATE_IDENT) {
let date = date.trim();
let date: chrono::DateTime<Utc> = convert_twitch_date(date)?;
let now = Utc::now();
let duration = now.signed_duration_since(date);
age = Some(duration.num_hours() as usize);
continue;
}
if let Some(part_duration) = line.strip_prefix("#EXTINF:") {
let mut line = lines.next().ok_or(PlaylistParseError::Eof)?;
if line.starts_with("#EXT-X-BYTERANGE:") {
warn!("Found byterange, ignoring the line and moving on");
line = lines.next().ok_or(PlaylistParseError::Eof)?;
}
let part_duration: f32 = part_duration.trim_matches(',').parse().unwrap_or(0.0);
parts.insert(line.trim().to_string(), part_duration);
} else {
//ignore everything but content lines
continue;
}
}
dbg!(&parts.len());
Ok((age, parts))
}
/// Converts a twitch date string to a chrono::DateTime<Utc>
/// Example: 2021-05-01T18:00:00
pub fn convert_twitch_date(date: &str) -> StdResult<chrono::DateTime<Utc>, PlaylistParseError> {
let date = date.trim();
let date = date.trim_matches('"');
//parse the date from a string like this: 2023-10-07T23:33:29
NaiveDateTime::parse_from_str(date, "%Y-%m-%dT%H:%M:%S")
.map(|e| e.and_utc())
.map_err(PlaylistParseError::InvalidTimeFormat)
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::{Datelike, Timelike};
#[test]
fn test_convert_twitch_date() {
let date = "2021-05-01T18:00:00";
let date = convert_twitch_date(date).unwrap();
assert_eq!(date.year(), 2021);
assert_eq!(date.month(), 5);
assert_eq!(date.day(), 1);
assert_eq!(date.hour(), 18);
assert_eq!(date.minute(), 0);
assert_eq!(date.second(), 0);
}
}
#[tracing::instrument(skip(playlist))]
fn get_playlist_from_quality_list(playlist: String, quality: &str) -> Result<String> {
trace!("Parsing playlist:\n{}", playlist);
let mut qualties = HashMap::new();
let mut highest_quality = String::new();
let test: Vec<&str> = playlist.lines().collect();
for (i, line) in test.iter().enumerate() {
if !line.contains("#EXT-X-MEDIA") {
continue;
}
let found_quality = line.split("NAME=\"").collect::<Vec<&str>>()[1]
.split('"')
.collect::<Vec<&str>>()[0];
if qualties.get(found_quality).is_some() {
continue;
}
if qualties.is_empty() {
// the first one is the highest quality
highest_quality = found_quality.to_string();
}
let url = test[i + 2];
qualties.insert(found_quality, url);
}
if let Some(quality) = qualties.get(quality) {
Ok(quality.to_string())
} else {
warn!(
"Given quality not found ({}), using highest quality: {}",
quality, highest_quality
);
Ok(qualties
.get(highest_quality.as_str())
.ok_or(MalformedPlaylistError::NoQualities)?
.to_string())
}
}
#[derive(Debug, Clone)]
struct DownloadInfo {
vod_age: Option<usize>,
parts: HashMap<String, f32>,
base_url: String,
}
#[cfg(test)]
mod abc {
use futures_util::{StreamExt, TryStreamExt};
#[tokio::test]
async fn test1() {
let v = vec![1, 3, 5];
let x1 = run(v).await;
assert!(x1.is_err());
assert_eq!(x1.unwrap_err(), 5i64);
}
#[tokio::test]
async fn test2() {
let v = vec![1, 5, 1];
let x1 = run(v).await;
assert!(x1.is_err());
assert_eq!(x1.unwrap_err(), 5i64);
}
#[tokio::test]
async fn test3() {
let v = vec![1, 3, 2, 2];
let x1 = run(v).await;
assert!(x1.is_ok());
assert_eq!(x1.unwrap(), vec![1, 3, 2, 2]);
}
async fn run(v: Vec<i32>) -> Result<Vec<i16>, i64> {
async fn sample(part: i32) -> Result<i16, i64> {
dbg!(part);
if part <= 3 {
Ok(part as i16)
} else {
Err(part as i64)
}
}
let thread_count = 2;
let it = v.into_iter().map(sample);
let x = futures::stream::iter(it);
let x1: Result<Vec<i16>, i64> = x.buffer_unordered(thread_count).try_collect().await;
dbg!(&x1);
x1
}
}
fn sort_parts(files: &mut [PathBuf]) {
files.sort_by_key(|path| {
let number = path
.file_stem()
.map(|x| {
x.to_str()
.unwrap_or("")
.replace("-muted", "")
.replace("-unmuted", "")
})
.unwrap_or(String::from("0"));
match number.parse::<u32>() {
Ok(n) => n,
Err(e) => {
warn!(
"potentially catchable error while parsing the file number: {}\n{}",
number, e
);
if !number.contains('-') {
error!("Error while parsing the file number: {}", number);
panic!("Error while parsing the file number: {}", number)
}
let number = number.split('-').collect::<Vec<&str>>()[1];
number
.parse()
.unwrap_or_else(|_| panic!("Error while parsing the file number: {}", number))
}
}
});
}
#[instrument(skip(files), fields(part_amount=files.len()))]
async fn combine_parts_to_single_ts(files: &[PathBuf], target: &Path) -> Result<()> {
debug!("combining all parts of video");
debug!("part amount: {}", files.len());
let mut target = fs::File::create(target)
.await
.map_err(DownloadFileError::FileCreation)?;
for file_path in files {
trace!("{:?}", file_path.file_name());
let mut file = fs::File::open(&file_path)
.await
.map_err(DownloadFileError::Read)?;
tokio::io::copy(&mut file, &mut target)
.await
.map_err(DownloadFileError::Write)?;
tokio::fs::remove_file(&file_path)
.await
.map_err(DownloadFileError::Write)?;
}
Ok(())
}
async fn combine_parts_to_mp4(parts: &[PathBuf], folder_path: &Path) -> Result<PathBuf> {
let ts_file_path = folder_path.join("video.ts");
let mp4_file_path = folder_path.join("video.mp4");
combine_parts_to_single_ts(parts, &ts_file_path).await?;
convert_ts_to_mp4(&ts_file_path, &mp4_file_path).await?;
tokio::fs::remove_file(ts_file_path)
.await
.map_err(DownloadFileError::Filesystem)?;
Ok(mp4_file_path)
}

163
src/twitch/parts_util.rs Normal file
View File

@@ -0,0 +1,163 @@
use super::*;
/// Sorts the parts by their number.
///
/// The parts must be named like this: `1.ts`, `2.ts`, `3-muted.ts`, `4-unmuted.ts`, etc.
///
/// Optionally if the number contains a single `-` like this: `1094734-1.ts`, `1094734-2.ts`, `1094734-3-muted.ts`, `1094734-4-unmuted.ts`, etc.
/// everything before the `-` will be ignored and it will try to parse the rest as a number.
///
/// If that all fails, it will panic!
pub fn sort_parts(files: &mut [PathBuf]) {
files.sort_by_key(|path| {
let number = path
.file_stem()
.map(|x| {
x.to_str()
.unwrap_or("")
.replace("-muted", "")
.replace("-unmuted", "")
})
.unwrap_or(String::from("0"));
match number.parse::<u32>() {
Ok(n) => n,
Err(e) => {
warn!(
"potentially catchable error while parsing the file number: {}\n{}",
number, e
);
if !number.contains('-') {
error!("Error while parsing the file number: {}", number);
panic!("Error while parsing the file number: {}", number)
}
let number = number.split('-').collect::<Vec<&str>>()[1];
number
.parse()
.unwrap_or_else(|_| panic!("Error while parsing the file number: {}", number))
}
}
});
}
#[instrument(skip(files), fields(part_amount=files.len()))]
pub async fn combine_parts_to_single_ts(files: &[PathBuf], target: &Path) -> Result<()> {
debug!("combining all parts of video");
debug!("part amount: {}", files.len());
let mut target = fs::File::create(target)
.await
.map_err(DownloadFileError::FileCreation)?;
for file_path in files {
trace!("{:?}", file_path.file_name());
let mut file = fs::File::open(&file_path)
.await
.map_err(DownloadFileError::Read)?;
tokio::io::copy(&mut file, &mut target)
.await
.map_err(DownloadFileError::Write)?;
tokio::fs::remove_file(&file_path)
.await
.map_err(DownloadFileError::Write)?;
}
Ok(())
}
pub async fn combine_parts_to_mp4(parts: &[PathBuf], folder_path: &Path) -> Result<PathBuf> {
let ts_file_path = folder_path.join("video.ts");
let mp4_file_path = folder_path.join("video.mp4");
combine_parts_to_single_ts(parts, &ts_file_path).await?;
convert_ts_to_mp4(&ts_file_path, &mp4_file_path).await?;
tokio::fs::remove_file(ts_file_path)
.await
.map_err(DownloadFileError::Filesystem)?;
Ok(mp4_file_path)
}
#[instrument]
pub async fn convert_ts_to_mp4(ts_file: &Path, mp4_file: &Path) -> Result<()> {
info!("converting to mp4");
if mp4_file.exists() {
tokio::fs::remove_file(&mp4_file)
.await
.map_err(DownloadFileError::Filesystem)?;
}
debug!(
"running ffmpeg command: ffmpeg -i {} -c {}",
ts_file.display(),
mp4_file.display()
);
let mut cmd = Command::new("ffmpeg");
let start_time = Instant::now();
cmd.arg("-i")
.arg(ts_file)
.arg("-c")
.arg("copy")
.arg(mp4_file);
let result = cmd.output().await;
let duration = Instant::now().duration_since(start_time);
debug!("ffmpeg command finished after duration: {:?}", duration);
result.map_err(DownloadFileError::Ffmpeg)?;
Ok(())
}
#[instrument]
pub async fn download_part(
part: (String, f32),
base_url: String,
folder_path: &Path,
try_unmute: bool,
client: ReqwestClient,
) -> StdResult<PathBuf, DownloadFileError> {
trace!("downloading part: {:?}", part);
let (part, _duration) = part;
let part_url = format!("{}{}", base_url, part);
let part_url_unmuted = format!("{}{}", base_url, part.replace("-muted", ""));
let try_unmute = try_unmute && part.contains("-muted");
let target_path = folder_path.join(&part);
if try_unmute {
trace!("trying to download unmuted part: {}", part_url_unmuted);
match try_download_part(part_url_unmuted, &target_path, &client).await {
Ok(path) => Ok(path),
Err(_) => {
trace!("failed to download unmuted part. trying muted part");
try_download_part(part_url, folder_path, &client).await
}
}
} else {
trace!("not trying to unmute: {}", part_url);
try_download_part(part_url, &target_path, &client).await
}
}
pub async fn try_download_part(
url: String,
target_path: &Path,
client: &ReqwestClient,
) -> StdResult<PathBuf, DownloadFileError> {
let request = client
.get(url)
.build()
.map_err(DownloadFileError::DownloadReqwest)?;
let mut response = client
.execute_with_backoff(request)
.await
.map_err(DownloadFileError::DownloadBackoff)?;
let mut file = fs::File::create(target_path)
.await
.map_err(DownloadFileError::FileCreation)?;
while let Some(chunk) = response
.chunk()
.await
.map_err(DownloadFileError::DownloadReqwest)?
{
file.write_all(&chunk)
.await
.map_err(DownloadFileError::Filesystem)?;
}
Ok(target_path.to_path_buf())
}

105
src/twitch/twitch_utils.rs Normal file
View File

@@ -0,0 +1,105 @@
use crate::errors::{MalformedPlaylistError, PlaylistParseError};
use crate::prelude::StdResult;
use crate::prelude::*;
use chrono::{NaiveDateTime, Utc};
use std::collections::HashMap;
/// Converts a twitch date string to a chrono::DateTime<Utc>
///
/// Example: 2023-10-07T23:33:29
pub fn convert_twitch_date(date: &str) -> StdResult<chrono::DateTime<Utc>, PlaylistParseError> {
let date = date.trim();
let date = date.trim_matches('"');
//parse the date from a string like this: 2023-10-07T23:33:29
NaiveDateTime::parse_from_str(date, "%Y-%m-%dT%H:%M:%S")
.map(|x| x.and_utc())
.map_err(PlaylistParseError::InvalidTimeFormat)
}
pub fn parse_playlist(
playlist: String,
) -> StdResult<(Option<usize>, HashMap<String, f32>), MalformedPlaylistError> {
info!("Parsing playlist");
const STREAMED_DATE_IDENT: &str = "#ID3-EQUIV-TDTG:";
let mut age = None;
let mut parts = HashMap::new();
dbg!(&playlist);
let mut lines = playlist.lines();
loop {
let line = lines.next();
trace!("line: {:?}", line);
if line.is_none() {
trace!("line is none. done parsing playlist");
break;
}
let line = line.unwrap();
if let Some(date) = line.strip_prefix(STREAMED_DATE_IDENT) {
let date = date.trim();
let date: chrono::DateTime<Utc> = convert_twitch_date(date)?;
let now = Utc::now();
let duration = now.signed_duration_since(date);
age = Some(duration.num_hours() as usize);
continue;
}
if let Some(part_duration) = line.strip_prefix("#EXTINF:") {
let mut line = lines.next().ok_or(PlaylistParseError::Eof)?;
if line.starts_with("#EXT-X-BYTERANGE:") {
warn!("Found byterange, ignoring the line and moving on");
line = lines.next().ok_or(PlaylistParseError::Eof)?;
}
let part_duration: f32 = part_duration.trim_matches(',').parse().unwrap_or(0.0);
parts.insert(line.trim().to_string(), part_duration);
} else {
//ignore everything but content lines
continue;
}
}
dbg!(&parts.len());
Ok((age, parts))
}
#[tracing::instrument(skip(playlist))]
pub fn get_playlist_from_quality_list(playlist: String, quality: &str) -> Result<String> {
trace!("Parsing playlist:\n{}", playlist);
let mut qualties = HashMap::new();
let mut highest_quality = String::new();
let test: Vec<&str> = playlist.lines().collect();
for (i, line) in test.iter().enumerate() {
if !line.contains("#EXT-X-MEDIA") {
continue;
}
let found_quality = line.split("NAME=\"").collect::<Vec<&str>>()[1]
.split('"')
.collect::<Vec<&str>>()[0];
if qualties.get(found_quality).is_some() {
continue;
}
if qualties.is_empty() {
// the first one is the highest quality
highest_quality = found_quality.to_string();
}
let url = test[i + 2];
qualties.insert(found_quality, url);
}
if let Some(quality) = qualties.get(quality) {
Ok(quality.to_string())
} else {
warn!(
"Given quality not found ({}), using highest quality: {}",
quality, highest_quality
);
Ok(qualties
.get(highest_quality.as_str())
.ok_or(MalformedPlaylistError::NoQualities)?
.to_string())
}
}