From 083fb7f998c7b6dc59aba74eebfceb90d573fe94 Mon Sep 17 00:00:00 2001
From: OMGeeky
Date: Sun, 15 Oct 2023 13:23:53 +0200
Subject: [PATCH] Initial version (should work pretty well)

---
 .gitignore                             |   4 +
 Cargo.toml                             |  21 +++
 src/client.rs                          | 169 ++++++++++++++++++
 src/client/utils.rs                    | 226 +++++++++++++++++++++++++
 src/client/utils/ffmpeg.rs             | 105 ++++++++++++
 src/errors.rs                          |  40 +++++
 src/main.rs                            |  40 +++++
 src/prelude.rs                         |   7 +
 tests/data/sample_output_playlist.m3u8 |  10 ++
 9 files changed, 622 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 Cargo.toml
 create mode 100644 src/client.rs
 create mode 100644 src/client/utils.rs
 create mode 100644 src/client/utils/ffmpeg.rs
 create mode 100644 src/errors.rs
 create mode 100644 src/main.rs
 create mode 100644 src/prelude.rs
 create mode 100644 tests/data/sample_output_playlist.m3u8

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..1dac2ce
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,4 @@
+/target
+/.idea
+/Cargo.lock
+/.cargo
\ No newline at end of file
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..80b2fee
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,21 @@
+[package]
+name = "splitter"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+backup-config = { version = "0.1", git="https://github.com/OMGeeky/backup_config.git" }
+local-db = { version = "0.2", git="https://github.com/OMGeeky/twitch_backup.local_db.git" }
+
+tokio = { version = "1.33", features = ["macros", "rt-multi-thread", "fs", "process"] }
+tracing = "0.1"
+tracing-subscriber = { version = "0.3", features = ["env-filter"] }
+
+thiserror = "1.0"
+anyhow = "1.0"
+
+chrono = "0.4"
+
+shellexpand = "3.1"
diff --git a/src/client.rs b/src/client.rs
new file mode 100644
index 0000000..3c675d5
--- /dev/null
+++ b/src/client.rs
@@ -0,0 +1,169 @@
+use crate::errors::SplitterError;
+use crate::prelude::*;
+use backup_config::Conf;
+use chrono::Duration;
+use local_db::prelude::{Status, Videos, VideosColumn, VideosModel};
+use local_db::re_exports::sea_orm::{
+    ActiveModelTrait, ActiveValue, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel,
+    QueryFilter,
+};
+use std::path::Path;
+use std::time::Instant;
+use tokio::fs;
+
+mod utils;
+use utils::ffmpeg::run_ffmpeg_split;
+
+/// Splits downloaded videos into parts and tracks the progress in the database.
+pub struct SplitterClient {
+    conf: Conf,
+    db: DatabaseConnection,
+}
+
+impl SplitterClient {
+    /// Creates a client from the loaded config and an open database connection.
+    pub fn new(conf: Conf, db: DatabaseConnection) -> Self {
+        Self { conf, db }
+    }
+}
+
+impl SplitterClient {
+    /// Splits a single video and stores the outcome in its `status` column.
+    ///
+    /// The status is `Splitting` while the work is running and `Split` or `SplitFailed`
+    /// afterwards. A split error is returned after the failed status has been stored.
+    #[tracing::instrument(skip(self))]
+    async fn split_video(&self, video: VideosModel) -> Result<()> {
+        let id = video.twitch_id.clone();
+        let mut video = video.into_active_model();
+        video.status = ActiveValue::Set(Status::Splitting);
+        video.clone().update(&self.db).await?;
+        let result = self.inner_split_video(id).await;
+
+        match result {
+            Ok(_) => {
+                video.status = ActiveValue::Set(Status::Split);
+                video.clone().update(&self.db).await?;
+            }
+            Err(err) => {
+                video.status = ActiveValue::Set(Status::SplitFailed);
+                video.clone().update(&self.db).await?;
+                return Err(err);
+            }
+        }
+        Ok(())
+    }
+
+    /// Splits `<download folder>/<id>.mp4` into parts inside `<download folder>/<id>/`.
+    ///
+    /// ffmpeg cuts the file at the configured soft cap, the last two parts are joined again
+    /// when they fit into the hard cap together, and the original file is removed at the end.
+    async fn inner_split_video(&self, id: String) -> Result<()> {
+        let base_path = Path::new(&self.conf.download_folder_path);
+        let input_path = base_path.join(format!("{}.mp4", id));
+        let output_folder_path = base_path.join(&id);
+
+        info!("Splitting video with id: {}", id);
+        verify_paths(base_path, &input_path, &output_folder_path).await?;
+        let output_path_pattern = output_folder_path.join("%03d.mp4");
+        let output_path_pattern = output_path_pattern
+            .to_str()
+            .ok_or_else(|| SplitterError::PathToString(output_path_pattern.clone()))?
+            .to_string();
+
+        let split_playlist_path = output_folder_path.join("output.m3u8");
+        debug!("output_path_pattern: {}", output_path_pattern);
+        let duration_soft_cap = Duration::minutes(
+            self.conf
+                .google
+                .youtube
+                .default_video_length_minutes_soft_cap,
+        );
+        let duration_hard_cap = Duration::minutes(
+            self.conf
+                .google
+                .youtube
+                .default_video_length_minutes_hard_cap,
+        );
+        //todo: get a user specific soft and hard cap
+        info!("splitting video at path: {:?}", input_path);
+        let start_time = Instant::now();
+        run_ffmpeg_split(
+            &input_path,
+            &output_path_pattern,
+            &split_playlist_path,
+            &duration_soft_cap,
+        )
+        .await?;
+
+        info!("FFMPEG-Splitting took: {:?}", start_time.elapsed());
+        let split_info = utils::get_playlist_info(&split_playlist_path).await?;
+        tokio::fs::remove_file(&split_playlist_path)
+            .await
+            .map_err(SplitterError::Write)?;
+        trace!(
+            "total duration: {} in {} parts",
+            split_info.total_duration,
+            split_info.parts.len()
+        );
+        let paths =
+            utils::join_last_parts_if_needed(split_info, &output_folder_path, duration_hard_cap)
+                .await?;
+
+        debug!("removing original file: {:?}", input_path);
+        tokio::fs::remove_file(&input_path)
+            .await
+            .map_err(SplitterError::Write)?;
+
+        info!("Done Splitting. Whole operation took: {:?}", start_time.elapsed());
+        debug!("paths: {:?}", paths);
+        Ok(())
+    }
+
+    /// Splits every video whose status is `Downloaded`.
+    ///
+    /// A video that fails to split is logged and skipped so it does not block the others.
+    #[tracing::instrument(skip(self))]
+    pub async fn split_videos(&self) -> Result<()> {
+        info!("Splitting videos");
+        let videos = Videos::find()
+            .filter(VideosColumn::Status.eq(Status::Downloaded))
+            .all(&self.db)
+            .await?;
+
+        for video in videos {
+            info!("Splitting video: {:?}", video);
+            let id = video.id;
+            let success = self.split_video(video).await;
+            if let Err(err) = success {
+                error!(
+                    "Could not split video with id: {} because of err: {:?}",
+                    id, err
+                );
+            } else {
+                info!("Split video with id: {}", id);
+            }
+        }
+
+        info!("Finished splitting videos");
+        Ok(())
+    }
+}
+
+/// Makes sure the input file exists and creates the output folder if it is missing.
+async fn verify_paths(
+    base_path: &Path,
+    input_path: &Path,
+    output_folder_path: &Path,
+) -> Result<()> {
+    if !base_path.exists() || !input_path.exists() {
+        return Err(SplitterError::NotFound(input_path.to_path_buf()));
+    }
+    if !input_path.is_file() {
+        return Err(SplitterError::InvalidInputFile(input_path.to_path_buf()));
+    }
+    fs::create_dir_all(output_folder_path)
+        .await
+        .map_err(SplitterError::CreateFolder)?;
+    Ok(())
+}
diff --git a/src/client/utils.rs b/src/client/utils.rs
new file mode 100644
index 0000000..efe9ee0
--- /dev/null
+++ b/src/client/utils.rs
@@ -0,0 +1,226 @@
+use crate::errors::SplitterError;
+use crate::prelude::*;
+use anyhow::Context;
+use chrono::Duration;
+use std::path::{Path, PathBuf};
+use tokio::process::Command;
+
+/// Converts a duration to a string that is usable for example in an ffmpeg command
+///
+/// Example:
+///
+/// ```ignore
+/// use chrono::Duration;
+/// let duration: Duration = Duration::seconds(20);
+/// let s = duration_to_string(&duration);
+/// assert_eq!(s, "00:00:20");
+/// ```
+pub fn duration_to_string(duration: &Duration) -> String {
+    trace!("duration to string for duration: {:?}", duration);
+    let seconds = duration.num_seconds();
+    let hours = seconds / 3600;
+    let minutes = (seconds % 3600) / 60;
+    let seconds = seconds % 60;
+    format!("{:02}:{:02}:{:02}", hours, minutes, seconds)
+}
+
+/// Joins the last two parts into one file when their combined duration fits into
+/// `duration_cap` and returns the paths of all remaining parts inside `base_folder`.
+pub(super) async fn join_last_parts_if_needed(
+    mut input_parts: PlaylistInfo,
+    base_folder: &Path,
+    duration_cap: Duration,
+) -> Result<Vec<PathBuf>> {
+    info!("joining last parts if needed");
+    let last_part = input_parts.last_part();
+    let second_last_part = input_parts.second_last_part();
+    if let Some(last_part) = last_part {
+        if let Some(second_last_path) = second_last_part {
+            let joined_duration = last_part.duration + second_last_path.duration;
+            if joined_duration <= duration_cap {
+                //join together
+
+                join_last_two_parts(&mut input_parts, base_folder).await?;
+                info!("joined last two parts together");
+            } else {
+                info!("last two parts are too long to join together");
+            }
+        } else {
+            info!("there is only one part, so we can't join anything");
+        }
+    } else {
+        warn!("there are no parts, so we can't join anything");
+    }
+
+    Ok(input_parts
+        .parts
+        .iter()
+        .map(|part| base_folder.join(&part.path))
+        .collect())
+}
+
+/// Concatenates the last two parts of `input_parts` into a single file.
+///
+/// The joined file takes the place of the second to last part, the last part is removed
+/// from the disk and from `input_parts`, and the durations of both parts are added up.
+async fn join_last_two_parts(input_parts: &mut PlaylistInfo, base_folder: &Path) -> Result<()> {
+    // check first, so a list with a single part is not emptied before the error is returned
+    if input_parts.parts.len() < 2 {
+        return Err(SplitterError::JoinRequiresAtLeastTwoParts);
+    }
+    let last_part = input_parts
+        .parts
+        .pop()
+        .ok_or(SplitterError::JoinRequiresAtLeastTwoParts)?;
+    let second_last_part = input_parts
+        .parts
+        .last_mut()
+        .ok_or(SplitterError::JoinRequiresAtLeastTwoParts)?;
+    let second_last_part_path = combine_path_as_string(base_folder, &second_last_part.path)?;
+    let last_part_path = combine_path_as_string(base_folder, &last_part.path)?;
+    // `canonicalize` fails for paths that do not exist yet, so only the folder is resolved and
+    // the names of the files that get created below are appended to it
+    let base_folder = base_folder
+        .canonicalize()
+        .map_err(SplitterError::Canonicalize)?;
+    let join_txt_path = base_folder.join("join.txt");
+    let join_out_tmp_path = base_folder.join("join_out_tmp.mp4");
+    tokio::fs::write(
+        &join_txt_path,
+        format!(
+            "file '{}'\nfile '{}'",
+            second_last_part_path, last_part_path
+        ),
+    )
+    .await
+    .map_err(SplitterError::Write)?;
+
+    ffmpeg::run_ffmpeg_concat(
+        join_txt_path
+            .to_str()
+            .ok_or_else(|| SplitterError::PathToString(join_txt_path.clone()))?
+            .to_string(),
+        join_out_tmp_path
+            .to_str()
+            .ok_or_else(|| SplitterError::PathToString(join_out_tmp_path.clone()))?
+            .to_string(),
+    )
+    .await?;
+    debug!(
+        "removing files: {:?}, {:?}, {:?}",
+        second_last_part_path, last_part_path, join_txt_path
+    );
+    // the paths in the playlist are relative to `base_folder`, so the resolved paths are used
+    tokio::fs::remove_file(&last_part_path)
+        .await
+        .map_err(SplitterError::Write)?;
+    tokio::fs::remove_file(&second_last_part_path)
+        .await
+        .map_err(SplitterError::Write)?;
+    tokio::fs::remove_file(&join_txt_path)
+        .await
+        .map_err(SplitterError::Write)?;
+    debug!(
+        "renaming file: {:?} to {:?}",
+        join_out_tmp_path, second_last_part_path
+    );
+    tokio::fs::rename(&join_out_tmp_path, &second_last_part_path)
+        .await
+        .map_err(SplitterError::Write)?;
+    second_last_part.duration = second_last_part.duration + last_part.duration;
+    Ok(())
+}
+
+/// Reads the m3u8 playlist written by the ffmpeg segment muxer and collects the duration and
+/// path of every part that is listed in it.
+///
+/// Returns [`SplitterError::PlaylistEmpty`] if the playlist does not contain any part.
+pub(crate) async fn get_playlist_info(playlist_path: &Path) -> Result<PlaylistInfo> {
+    let mut total_duration = Duration::zero();
+    let mut parts: Vec<PartInfo> = vec![];
+
+    let lines = tokio::fs::read_to_string(playlist_path)
+        .await
+        .map_err(SplitterError::Read)?;
+
+    let mut last_duration = None;
+    for line in lines.lines() {
+        let line = line.trim();
+        if line.is_empty() {
+            continue;
+        }
+        if let Some(time_str) = line.strip_prefix("#EXTINF:") {
+            // the tag has the form `#EXTINF:<seconds>,<title>`
+            let time_str = time_str.split(',').next().unwrap_or(time_str);
+            let time_str = time_str.trim();
+            let duration = Duration::milliseconds(
+                (1000.0
+                    * time_str
+                        .parse::<f64>()
+                        .context("could not parse the part duration")
+                        .map_err(SplitterError::PlaylistParse)?) as u64 as i64,
+            );
+            last_duration = Some(duration);
+            total_duration = total_duration + duration;
+        } else if line.starts_with("#EXT-X-ENDLIST") {
+            break;
+        } else if line.starts_with("#EXT") {
+            // this has to test the current `line`: the whole file starts with `#EXTM3U`, so
+            // testing the file content would skip every line, including the part paths
+            trace!("unknown line in playlist: {}", line);
+            continue;
+        } else if let Some(duration) = last_duration.take() {
+            parts.push(PartInfo {
+                duration,
+                path: PathBuf::from(line),
+            });
+        }
+    }
+    if parts.is_empty() {
+        return Err(SplitterError::PlaylistEmpty);
+    }
+
+    Ok(PlaylistInfo {
+        total_duration,
+        parts,
+    })
+}
+impl PlaylistInfo {
+    /// The last part of the playlist, if there is any.
+    pub(crate) fn last_part(&self) -> Option<&PartInfo> {
+        self.parts.last()
+    }
+    /// The part before the last one, if there are at least two parts.
+    pub(crate) fn second_last_part(&self) -> Option<&PartInfo> {
+        if self.parts.len() < 2 {
+            return None;
+        }
+        self.parts.get(self.parts.len() - 2)
+    }
+}
+/// The parts of a split video together with their summed up duration.
+#[derive(Debug)]
+pub(crate) struct PlaylistInfo {
+    pub total_duration: Duration,
+    pub parts: Vec<PartInfo>,
+}
+/// A single part of a split video.
+#[derive(Debug)]
+pub(crate) struct PartInfo {
+    pub duration: Duration,
+    /// path of the part exactly as it is listed in the playlist
+    pub path: PathBuf,
+}
+
+/// joins two paths together, canonicalizes them and returns them as a string
+fn combine_path_as_string(base: &Path, path: &Path) -> Result<String> {
+    let path = base.join(path);
+    let path = path.canonicalize().map_err(SplitterError::Canonicalize)?;
+    let path = path
+        .to_str()
+        .ok_or_else(|| SplitterError::PathToString(path.clone()))?
+        .to_string();
+    Ok(path)
+}
+
+pub mod ffmpeg;
diff --git a/src/client/utils/ffmpeg.rs b/src/client/utils/ffmpeg.rs
new file mode 100644
index 0000000..96aa669
--- /dev/null
+++ b/src/client/utils/ffmpeg.rs
@@ -0,0 +1,105 @@
+use super::*;
+
+/// Turns a failed ffmpeg run into an error that contains the exit status and stderr.
+///
+/// Without this check a failed run would go unnoticed until a later step fails, or (for a
+/// failed concat) until after the input parts have already been deleted.
+fn ensure_success(output: &std::process::Output) -> Result<()> {
+    if output.status.success() {
+        return Ok(());
+    }
+    let message = format!(
+        "ffmpeg failed ({}): {}",
+        output.status,
+        String::from_utf8_lossy(&output.stderr).trim()
+    );
+    Err(SplitterError::FfmpegCommand(std::io::Error::new(
+        std::io::ErrorKind::Other,
+        message,
+    )))
+}
+
+/// Concatenates the files listed in `join_txt_path` into `join_out_path` without re-encoding.
+pub(crate) async fn run_ffmpeg_concat(
+    join_txt_path: impl Into<String>,
+    join_out_path: impl Into<String>,
+) -> Result<()> {
+    let join_txt_path = join_txt_path.into();
+    let join_out_path = join_out_path.into();
+
+    debug!(
+        "Running ffmpeg command: ffmpeg -f concat -safe 0 -i {:?} -c copy {:?}",
+        join_txt_path, join_out_path
+    );
+    let output = Command::new("ffmpeg")
+        .args([
+            "-f",
+            "concat",
+            "-safe",
+            "0",
+            "-i",
+            &join_txt_path,
+            "-c",
+            "copy",
+            &join_out_path,
+        ])
+        .output()
+        .await
+        .map_err(SplitterError::FfmpegCommand)?;
+    ensure_success(&output)?;
+    debug!("Finished running ffmpeg command");
+    Ok(())
+}
+
+/// Splits `input` into segments of roughly `target_duration` without re-encoding.
+///
+/// The segments are written to `output_pattern` (a printf style pattern like `%03d.mp4`) and
+/// are listed in the m3u8 playlist that gets written to `output_playlist`.
+pub(crate) async fn run_ffmpeg_split(
+    input: &Path,
+    output_pattern: &str,
+    output_playlist: &Path,
+    target_duration: &Duration,
+) -> Result<()> {
+    let split_duration_str = duration_to_string(target_duration);
+    debug!(
+        "Running ffmpeg command: ffmpeg -i {:?} -c copy -map 0 -segment_time {} -reset_timestamps 1 \
+         -segment_list {} -segment_list_type m3u8 -avoid_negative_ts 1 -f segment {}",
+        input,
+        split_duration_str,
+        output_playlist.display(),
+        output_pattern
+    );
+    let output = Command::new("ffmpeg")
+        .args([
+            "-i",
+            input
+                .to_str()
+                .ok_or_else(|| SplitterError::PathToString(input.to_path_buf()))?,
+            "-c",
+            "copy",
+            "-map",
+            "0",
+            "-segment_time",
+            &split_duration_str,
+            "-reset_timestamps",
+            "1",
+            "-segment_list",
+            output_playlist
+                .to_str()
+                .ok_or_else(|| SplitterError::PathToString(output_playlist.to_path_buf()))?,
+            "-segment_list_type",
+            "m3u8",
+            "-avoid_negative_ts",
+            "1",
+            "-f",
+            "segment",
+            output_pattern,
+        ])
+        .output()
+        .await
+        .map_err(SplitterError::FfmpegCommand)?;
+    ensure_success(&output)?;
+    debug!("Finished running ffmpeg command");
+    Ok(())
+}
diff --git a/src/errors.rs b/src/errors.rs
new file mode 100644
index 0000000..30a7f68
--- /dev/null
+++ b/src/errors.rs
@@ -0,0 +1,40 @@
+use std::path::PathBuf;
+use tokio::io;
+
+/// All errors that can be returned while loading the config or splitting videos.
+#[derive(Debug, thiserror::Error)]
+pub enum SplitterError {
+    #[error("Could not load config")]
+    LoadConfig(#[source] anyhow::Error),
+
+    #[error("Some error with the database")]
+    OpenDatabase(#[from] local_db::re_exports::sea_orm::DbErr),
+
+    #[error("File or Folder not found or invalid: {0:?}")]
+    NotFound(PathBuf),
+    #[error("Input File was not a valid input: {0:?}")]
+    InvalidInputFile(PathBuf),
+
+    #[error("Could not create folder: {0:?}")]
+    CreateFolder(#[source] io::Error),
+    #[error("Could not read from filesystem: {0:?}")]
+    Read(#[source] io::Error),
+    #[error("Could not write to filesystem: {0:?}")]
+    Write(#[source] io::Error),
+
+    #[error("Path could not be canonicalized: {0:?}")]
+    Canonicalize(#[source] io::Error),
+    #[error("Could not convert path to string: {0:?}")]
+    PathToString(PathBuf),
+
+    #[error("Something went wrong during the ffmpeg command")]
+    FfmpegCommand(#[source] io::Error),
+
+    #[error("Could not parse the playlist")]
+    PlaylistParse(#[source] anyhow::Error),
+    #[error("Playlist was empty/did not contain any parts")]
+    PlaylistEmpty,
+
+    #[error("Joining two parts requires at least two parts in the list")]
+    JoinRequiresAtLeastTwoParts,
+}
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..ce61eca
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,40 @@
+use crate::errors::SplitterError;
+use backup_config::prelude::Config;
+use backup_config::Conf;
+use prelude::*;
+
+pub mod client;
+pub mod errors;
+pub mod prelude;
+/// Sets up logging and runs the splitter once.
+#[tokio::main]
+async fn main() -> Result<()> {
+    tracing_subscriber::fmt()
+        .with_max_level(tracing::Level::INFO)
+        .with_env_filter(
+            "sea_orm=warn,sea_orm_migration=warn,sqlx=warn,splitter=trace,local_db=warn,other=warn",
+        )
+        .init();
+    info!("Hello, world!");
+
+    run().await?;
+
+    info!("Bye");
+    Ok(())
+}
+/// Loads the config, opens and migrates the database and splits all downloaded videos.
+async fn run() -> Result<()> {
+    let conf = Conf::builder()
+        .env()
+        .file("./settings.toml")
+        .file(shellexpand::tilde("~/twba/config.toml").into_owned())
+        .load()
+        .map_err(|e| SplitterError::LoadConfig(e.into()))?;
+
+    let db = local_db::open_database(Some(&conf.db_url)).await?;
+    local_db::migrate_db(&db).await?;
+
+    let client = client::SplitterClient::new(conf, db);
+    client.split_videos().await?;
+    Ok(())
+}
diff --git a/src/prelude.rs b/src/prelude.rs
new file mode 100644
index 0000000..1a83a5a
--- /dev/null
+++ b/src/prelude.rs
@@ -0,0 +1,7 @@
+use crate::errors::SplitterError;
+pub(crate) use std::result::Result as StdResult;
+
+/// Result type used throughout the crate, with [`SplitterError`] as the error type.
+pub type Result<T> = StdResult<T, SplitterError>;
+
+pub(crate) use tracing::{debug, error, info, trace, warn};
diff --git a/tests/data/sample_output_playlist.m3u8 b/tests/data/sample_output_playlist.m3u8
new file mode 100644
index 0000000..584ea21
--- /dev/null
+++ b/tests/data/sample_output_playlist.m3u8
@@ -0,0 +1,10 @@
+#EXTM3U
+#EXT-X-VERSION:3
+#EXT-X-MEDIA-SEQUENCE:0
+#EXT-X-ALLOW-CACHE:YES
+#EXT-X-TARGETDURATION:18002
+#EXTINF:18001.720898,
+000.mp4
+#EXTINF:14633.040755,
+001.mp4
+#EXT-X-ENDLIST
\ No newline at end of file