Initial version (should work pretty well)

This commit is contained in:
OMGeeky
2023-10-15 13:23:53 +02:00
commit 083fb7f998
9 changed files with 557 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@@ -0,0 +1,4 @@
/target
/.idea
/Cargo.lock
/.cargo

21
Cargo.toml Normal file
View File

@@ -0,0 +1,21 @@
[package]
name = "splitter"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
backup-config = { version = "0.1", git="https://github.com/OMGeeky/backup_config.git" }
local-db = { version = "0.2", git="https://github.com/OMGeeky/twitch_backup.local_db.git" }
tokio = "1.33"
tracing = "0.1"
tracing-subscriber = "0.3"
thiserror = "1.0"
anyhow = "1.0"
chrono = "0.4"
shellexpand = "3.1"

157
src/client.rs Normal file
View File

@@ -0,0 +1,157 @@
use crate::errors::SplitterError;
use crate::prelude::*;
use backup_config::Conf;
use chrono::Duration;
use local_db::prelude::{Status, Videos, VideosColumn, VideosModel};
use local_db::re_exports::sea_orm::{
ActiveModelTrait, ActiveValue, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel,
QueryFilter,
};
use std::path::{Path, PathBuf};
use std::time::Instant;
use tokio::fs;
mod utils;
use utils::ffmpeg::run_ffmpeg_split;
pub struct SplitterClient {
conf: Conf,
db: DatabaseConnection,
}
impl SplitterClient {
pub fn new(conf: Conf, db: DatabaseConnection) -> Self {
Self { conf, db }
}
}
impl SplitterClient {
#[tracing::instrument(skip(self))]
async fn split_video(&self, video: VideosModel) -> Result<()> {
//
let id = video.twitch_id.clone();
let mut video = video.into_active_model();
video.status = ActiveValue::Set(Status::Splitting);
video.clone().update(&self.db).await?;
let result = self.inner_split_video(id.clone()).await;
match result {
Ok(_) => {
video.status = ActiveValue::Set(Status::Split);
video.clone().update(&self.db).await?;
}
Err(err) => {
video.status = ActiveValue::Set(Status::SplitFailed);
video.clone().update(&self.db).await?;
return Err(err);
}
}
Ok(())
}
async fn inner_split_video(&self, id: String) -> Result<()> {
let base_path = Path::new(&self.conf.download_folder_path);
let input_path = base_path.join(format!("{}.mp4", id));
let output_folder_path = base_path.join(&id);
info!("Splitting video with id: {}", id);
verify_paths(base_path, &input_path, &output_folder_path).await?;
let output_path_pattern = output_folder_path.join("%03d.mp4");
let output_path_pattern = output_path_pattern
.to_str()
.ok_or_else(|| SplitterError::PathToString(output_path_pattern.clone()))?
.to_string();
let split_playlist_path = output_folder_path.join("output.m3u8");
debug!("output_path_pattern: {}", output_path_pattern);
let duration_soft_cap = Duration::minutes(
self.conf
.google
.youtube
.default_video_length_minutes_soft_cap,
);
let duration_hard_cap = Duration::minutes(
self.conf
.google
.youtube
.default_video_length_minutes_hard_cap,
);
//todo: get a user specific soft and hard cap
info!("splitting video at path: {:?}", input_path);
let start_time = Instant::now();
run_ffmpeg_split(
&input_path,
&output_path_pattern,
&split_playlist_path,
&duration_soft_cap,
)
.await?;
let duration = Instant::now().duration_since(start_time);
info!("FFMPEG-Splitting took: {:?}", duration);
let split_info = utils::get_playlist_info(&split_playlist_path).await?;
tokio::fs::remove_file(&split_playlist_path)
.await
.map_err(SplitterError::Write)?;
trace!(
"total duration: {} in {} parts",
split_info.total_duration.to_string(),
split_info.parts.len()
);
let paths =
utils::join_last_parts_if_needed(split_info, &output_folder_path, duration_hard_cap)
.await?;
debug!("removing original file: {:?}", input_path);
tokio::fs::remove_file(&input_path)
.await
.map_err(SplitterError::Write)?;
let duration = Instant::now().duration_since(start_time);
info!("Done Splitting. Whole operation took: {:?}", duration);
debug!("paths: {:?}", paths);
Ok(())
}
#[tracing::instrument(skip(self))]
pub async fn split_videos(&self) -> Result<()> {
info!("Splitting videos");
let videos = Videos::find()
.filter(VideosColumn::Status.eq(Status::Downloaded))
.all(&self.db)
.await?;
for video in videos {
info!("Splitting video: {:?}", video);
let id = video.id;
let success = self.split_video(video).await;
if let Err(err) = success {
error!(
"Could not split video with id: {} because of err: {:?}",
id, err
);
} else {
info!("Split video with id: {}", id);
}
}
info!("Finished splitting videos");
Ok(())
}
}
async fn verify_paths(
base_path: &Path,
input_path: &Path,
output_folder_path: &PathBuf,
) -> Result<()> {
if !base_path.exists() || !input_path.exists() {
return Err(SplitterError::NotFound(input_path.to_path_buf()));
}
if !input_path.is_file() {
return Err(SplitterError::InvalidInputFile(input_path.to_path_buf()));
}
fs::create_dir_all(&output_folder_path)
.await
.map_err(SplitterError::CreateFolder)?;
Ok(())
}

203
src/client/utils.rs Normal file
View File

@@ -0,0 +1,203 @@
use crate::errors::SplitterError;
use crate::prelude::*;
use anyhow::Context;
use chrono::Duration;
use std::path::{Path, PathBuf};
use tokio::process::Command;
/// Converts a duration to a string that is usable for example in an ffmpeg command
///
/// Example:
///
/// ```
/// use chrono::Duration;
/// let duration: Duration = Duration::seconds(20);
/// let s = downloader::duration_to_string(&duration);
/// assert_eq!(s, "00:00:20");
/// ```
pub fn duration_to_string(duration: &Duration) -> String {
trace!("duration to string for duration: {:?}", duration);
let seconds = duration.num_seconds();
let hours = seconds / 3600;
let minutes = (seconds % 3600) / 60;
let seconds = seconds % 60;
format!("{:02}:{:02}:{:02}", hours, minutes, seconds)
}
pub(super) async fn join_last_parts_if_needed(
mut input_parts: PlaylistInfo,
base_folder: &Path,
duration_cap: Duration,
) -> Result<Vec<PathBuf>> {
info!("joining last parts if needed");
let last_part = input_parts.last_part();
let second_last_part = input_parts.second_last_part();
if let Some(last_part) = last_part {
if let Some(second_last_path) = second_last_part {
let joined_duration = last_part.duration + second_last_path.duration;
if joined_duration <= duration_cap {
//join together
join_last_two_parts(&mut input_parts, base_folder).await?;
info!("joined last two parts together");
} else {
info!("last two parts are too long to join together");
}
} else {
info!("there is only one part, so we can't join anything");
}
} else {
warn!("there are no parts, so we can't join anything");
}
input_parts
.parts
.iter()
.map(|part| Ok(base_folder.join(&part.path)))
.collect()
}
async fn join_last_two_parts(input_parts: &mut PlaylistInfo, base_folder: &Path) -> Result<()> {
let last_part = input_parts
.parts
.pop()
.ok_or(SplitterError::JoinRequiresAtLeastTwoParts)?;
let second_last_part = input_parts
.parts
.last_mut()
.ok_or(SplitterError::JoinRequiresAtLeastTwoParts)?;
second_last_part.duration = second_last_part.duration + last_part.duration;
let second_last_part_path = combine_path_as_string(base_folder, &second_last_part.path)?;
let last_part_path = combine_path_as_string(base_folder, &last_part.path)?;
let join_txt_path = base_folder
.join("join.txt")
.canonicalize()
.map_err(SplitterError::Canonicalize)?;
let join_out_tmp_path = base_folder
.join("join_out_tmp.mp4")
.canonicalize()
.map_err(SplitterError::Canonicalize)?;
tokio::fs::write(
&join_txt_path,
format!(
"file '{}'\nfile '{}'",
second_last_part_path, last_part_path
),
)
.await
.map_err(SplitterError::Write)?;
run_ffmpeg_concat(
join_txt_path
.to_str()
.ok_or_else(|| SplitterError::PathToString(join_txt_path.clone()))?
.to_string(),
join_out_tmp_path
.to_str()
.ok_or_else(|| SplitterError::PathToString(join_out_tmp_path.clone()))?
.to_string(),
)
.await?;
debug!(
"removing files: {:?}, {:?}, {:?}",
second_last_part.path, last_part.path, join_txt_path
);
tokio::fs::remove_file(last_part.path)
.await
.map_err(SplitterError::Write)?;
tokio::fs::remove_file(&second_last_part.path)
.await
.map_err(SplitterError::Write)?;
tokio::fs::remove_file(join_txt_path)
.await
.map_err(SplitterError::Write)?;
debug!(
"renaming file: {:?} to {:?}",
join_out_tmp_path, second_last_part.path
);
tokio::fs::rename(join_out_tmp_path, &second_last_part.path)
.await
.map_err(SplitterError::Write)?;
Ok(())
}
pub(crate) async fn get_playlist_info(playlist_path: &PathBuf) -> Result<PlaylistInfo> {
let mut total_duration = Duration::zero();
let mut parts: Vec<PartInfo> = vec![];
let lines = tokio::fs::read_to_string(playlist_path)
.await
.map_err(SplitterError::Read)?;
let mut last_duration = None;
for line in lines.lines() {
if line.starts_with("#EXTINF:") {
let time_str = line
.strip_prefix("#EXTINF:")
.context("could not strip prefix")
.map_err(SplitterError::PlaylistParse)?;
let time_str = time_str.split(',').next().unwrap_or(time_str);
let time_str = time_str.trim();
let duration = Duration::milliseconds(
(1000.0
* time_str
.parse::<f64>()
.context("could not parse the part duration")
.map_err(SplitterError::PlaylistParse)?) as u64 as i64,
);
last_duration = Some(duration);
total_duration = total_duration + duration;
} else if line.starts_with("#EXT-X-ENDLIST") {
break;
} else if lines.starts_with("#EXT") {
trace!("unknown line in playlist: {}", line);
continue;
} else if let Some(duration) = last_duration {
let path = PathBuf::from(line.trim().to_string());
parts.push(PartInfo { duration, path });
last_duration = None;
}
}
if parts.is_empty() {
return Err(SplitterError::PlaylistEmpty);
}
Ok(PlaylistInfo {
total_duration,
parts,
})
}
impl PlaylistInfo {
pub(crate) fn last_part(&self) -> Option<&PartInfo> {
self.parts.last()
}
pub(crate) fn second_last_part(&self) -> Option<&PartInfo> {
if self.parts.len() < 2 {
return None;
}
self.parts.get(self.parts.len() - 2)
}
}
#[derive(Debug)]
pub(crate) struct PlaylistInfo {
pub total_duration: Duration,
pub parts: Vec<PartInfo>,
}
#[derive(Debug)]
pub(crate) struct PartInfo {
pub duration: Duration,
pub path: PathBuf,
}
/// joins two paths together, canonicalizes them and returns them as a string
fn combine_path_as_string(base: &Path, path: &Path) -> Result<String> {
let path = base.join(path);
let path = path.canonicalize().map_err(SplitterError::Canonicalize)?;
let path = path
.to_str()
.ok_or_else(|| SplitterError::PathToString(path.clone()))?
.to_string();
Ok(path)
}
pub mod ffmpeg;

View File

@@ -0,0 +1,79 @@
use super::*;
pub(crate) async fn run_ffmpeg_concat(
join_txt_path: impl Into<String>,
join_out_path: impl Into<String>,
) -> Result<()> {
let join_txt_path = join_txt_path.into();
let join_out_path = join_out_path.into();
debug!(
"Running ffmpeg command: ffmpeg -f concat -safe 0 -i {:?} -c copy {:?}",
join_txt_path, join_out_path
);
Command::new("ffmpeg")
.args([
"-f",
"concat",
"-safe",
"0",
"-i",
&join_txt_path,
"-c",
"copy",
&join_out_path,
])
.output()
.await
.map_err(SplitterError::FfmpegCommand)?;
debug!("Finished running ffmpeg command");
Ok(())
}
pub(crate) async fn run_ffmpeg_split(
input: &Path,
output_pattern: &String,
output_playlist: &Path,
target_duration: &Duration,
) -> Result<()> {
let split_duration_str = duration_to_string(target_duration);
debug!(
"Running ffmpeg command: ffmpeg -i {:?} -c copy -map 0 -segment_time {} -reset_timestamps 1 \
-segment_list {} -segment_list_type m3u8 -avoid_negative_ts 1 -f segment {}",
input,
split_duration_str,
output_playlist.display(),
output_pattern
);
Command::new("ffmpeg")
.args([
"-i",
input
.to_str()
.ok_or_else(|| SplitterError::PathToString(input.to_path_buf()))?,
"-c",
"copy",
"-map",
"0",
"-segment_time",
&split_duration_str,
"-reset_timestamps",
"1",
"-segment_list",
output_playlist
.to_str()
.ok_or_else(|| SplitterError::PathToString(output_playlist.to_path_buf()))?,
"-segment_list_type",
"m3u8",
"-avoid_negative_ts",
"1",
"-f",
"segment",
output_pattern,
])
.output()
.await
.map_err(SplitterError::FfmpegCommand)?;
debug!("Finished running ffmpeg command");
Ok(())
}

39
src/errors.rs Normal file
View File

@@ -0,0 +1,39 @@
use std::path::PathBuf;
use tokio::io;
#[derive(Debug, thiserror::Error)]
pub enum SplitterError {
#[error("Could not load config")]
LoadConfig(#[source] anyhow::Error),
#[error("Some error with the database")]
OpenDatabase(#[from] local_db::re_exports::sea_orm::DbErr),
#[error("File or Folder not found or invalid: {0:?}")]
NotFound(PathBuf),
#[error("Input File was not a valid input: {0:?}")]
InvalidInputFile(PathBuf),
#[error("Could not create folder: {0:?}")]
CreateFolder(#[source] io::Error),
#[error("Could not read from filesystem: {0:?}")]
Read(#[source] io::Error),
#[error("Could not write to filesystem: {0:?}")]
Write(#[source] io::Error),
#[error("Path could not be canonicalized: {0:?}")]
Canonicalize(#[source] io::Error),
#[error("Could not convert path to string: {0:?}")]
PathToString(PathBuf),
#[error("Something went wrong during the ffmpeg command")]
FfmpegCommand(#[source] io::Error),
#[error("Could not parse the playlist")]
PlaylistParse(#[source] anyhow::Error),
#[error("Playlist was empty/did not contain any parts")]
PlaylistEmpty,
#[error("Joining two parts requires at least two parts in the list")]
JoinRequiresAtLeastTwoParts,
}

38
src/main.rs Normal file
View File

@@ -0,0 +1,38 @@
use crate::errors::SplitterError;
use backup_config::prelude::Config;
use backup_config::Conf;
use prelude::*;
pub mod client;
pub mod errors;
pub mod prelude;
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_env_filter(
"sea_orm=warn,sea_orm_migration=warn,sqlx=warn,splitter=trace,local_db=warn,other=warn",
)
.init();
info!("Hello, world!");
run().await?;
info!("Bye");
Ok(())
}
async fn run() -> Result<()> {
let conf = Conf::builder()
.env()
.file("./settings.toml")
.file(shellexpand::tilde("~/twba/config.toml").into_owned())
.load()
.map_err(|e| SplitterError::LoadConfig(e.into()))?;
let db = local_db::open_database(Some(&conf.db_url)).await?;
local_db::migrate_db(&db).await?;
let client = client::SplitterClient::new(conf, db);
client.split_videos().await?;
Ok(())
}

6
src/prelude.rs Normal file
View File

@@ -0,0 +1,6 @@
use crate::errors::SplitterError;
pub(crate) use std::result::Result as StdResult;
pub type Result<T> = StdResult<T, SplitterError>;
pub(crate) use tracing::{debug, error, info, trace, warn};

View File

@@ -0,0 +1,10 @@
#EXTM3U
#EXT-X-VERSION:3
#EXT-X-MEDIA-SEQUENCE:0
#EXT-X-ALLOW-CACHE:YES
#EXT-X-TARGETDURATION:18002
#EXTINF:18001.720898,
000.mp4
#EXTINF:14633.040755,
001.mp4
#EXT-X-ENDLIST