mirror of
https://github.com/OMGeeky/twba.uploader.git
synced 2026-02-23 15:49:58 +01:00
Initial prototype
This commit is contained in:
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
||||
/target
|
||||
/.idea
|
||||
3543
Cargo.lock
generated
Normal file
3543
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
34
Cargo.toml
Normal file
34
Cargo.toml
Normal file
@@ -0,0 +1,34 @@
|
||||
[package]
|
||||
name = "uploader"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
backup-config = { version = "0.1.2", git = "https://github.com/OMGeeky/backup_config.git" }
|
||||
local-db = { version = "0.2", git = "https://github.com/OMGeeky/twitch_backup.local_db.git" }
|
||||
reqwest-backoff = { version = "0.1", git = "https://github.com/OMGeeky/twba_reqwest_backoff.git" }
|
||||
|
||||
|
||||
tracing-subscriber = "0.3"
|
||||
shellexpand = "3.1"
|
||||
|
||||
tracing = "0.1"
|
||||
tokio = { version = "1.33", features = ["rt", "rt-multi-thread", "macros"] }
|
||||
|
||||
thiserror = "1.0"
|
||||
anyhow = "1.0"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
reqwest = "0.11"
|
||||
chrono = "0.4"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
|
||||
google-youtube3 = "5.0"
|
||||
yup-oauth2 = "8.3"
|
||||
strfmt = "0.2"
|
||||
|
||||
|
||||
lazy_static = "1.4"
|
||||
276
src/client.rs
Normal file
276
src/client.rs
Normal file
@@ -0,0 +1,276 @@
|
||||
use crate::prelude::*;
|
||||
use crate::CONF;
|
||||
use anyhow::{anyhow, Context};
|
||||
use google_youtube3::api::Scope;
|
||||
use lazy_static::lazy_static;
|
||||
use local_db::entities::video_upload::{ActiveModel as VideoUploadActiveModel, UploadStatus};
|
||||
use local_db::prelude::*;
|
||||
use local_db::re_exports::sea_orm::{
|
||||
ActiveModelTrait, ActiveValue, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel,
|
||||
Order, QueryFilter, QueryOrder,
|
||||
};
|
||||
use std::collections::HashMap;
|
||||
use std::ffi::OsStr;
|
||||
use std::path::{Path, PathBuf};
|
||||
use tracing::instrument;
|
||||
use tracing_subscriber::fmt::format;
|
||||
|
||||
mod youtube;
|
||||
|
||||
lazy_static! {
|
||||
static ref YOUTUBE_DEFAULT_SCOPES: Vec<Scope> =
|
||||
vec![Scope::Upload, Scope::Readonly, Scope::Full];
|
||||
}
|
||||
#[derive(Debug)]
|
||||
pub struct UploaderClient {
|
||||
db: DatabaseConnection,
|
||||
reqwest_client: reqwest::Client,
|
||||
youtube_client: HashMap<String, youtube::YoutubeClient>,
|
||||
}
|
||||
|
||||
impl UploaderClient {
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub(crate) async fn upload_videos(&self) -> Result<()> {
|
||||
let videos = Videos::find()
|
||||
.filter(VideosColumn::Status.eq(Status::Split))
|
||||
.order_by(VideosColumn::CreatedAt, Order::Asc)
|
||||
.all(&self.db)
|
||||
.await?;
|
||||
let count = videos.len();
|
||||
info!("got {} videos to upload", count);
|
||||
|
||||
'video_loop: for video in videos {
|
||||
match self.upload_video(&video).await {
|
||||
Ok(_) => {
|
||||
info!("Uploaded video: {}: {}", video.id, video.name);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Error while uploading the video: {}: {}", video.id, e);
|
||||
|
||||
{
|
||||
let fail_count = video.fail_count + 1;
|
||||
let previous_fails = video
|
||||
.fail_reason
|
||||
.as_ref()
|
||||
.unwrap_or(&String::new())
|
||||
.to_string();
|
||||
let mut video = video.clone().into_active_model();
|
||||
video.fail_count = ActiveValue::Set(fail_count);
|
||||
video.fail_reason = ActiveValue::Set(Some(format!(
|
||||
"{}: {}\n\n{}",
|
||||
fail_count, e, previous_fails
|
||||
)));
|
||||
}
|
||||
// self.set_video_status_on_db(&video, Status::UploadFailed)
|
||||
// .await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//todo: maybe add some log to the db when videos were last uploaded?
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip(self, video), fields(id=video.id))]
|
||||
async fn upload_video(&self, video: &VideosModel) -> Result<()> {
|
||||
let video_id = video.id;
|
||||
trace!("uploading video: {:?}", video);
|
||||
let client_for_video = self.get_client_for_video(video)?;
|
||||
|
||||
self.set_video_status_on_db(video, Status::Uploading)
|
||||
.await?;
|
||||
|
||||
let part_count = video.part_count;
|
||||
let parts_folder_path = Path::new(&CONF.download_folder_path).join(video_id.to_string());
|
||||
let parts = get_part_files(&parts_folder_path, part_count).await?;
|
||||
dbg!(&parts);
|
||||
|
||||
let playlist_id = client_for_video.create_playlist(video).await?;
|
||||
self.set_playlist_id_for_video(video, playlist_id.clone())
|
||||
.await?;
|
||||
|
||||
'part_loop: for (part, part_number) in parts {
|
||||
let mut video_upload = self
|
||||
.insert_video_upload(video_id, part_number)
|
||||
.await?
|
||||
.into_active_model();
|
||||
|
||||
let upload = client_for_video
|
||||
.upload_video_part(video, &part, part_number)
|
||||
.await;
|
||||
match upload {
|
||||
Ok(uploaded_video_id) => {
|
||||
dbg!(&uploaded_video_id);
|
||||
client_for_video
|
||||
.add_video_to_playlist(uploaded_video_id.clone(), playlist_id.clone())
|
||||
.await?;
|
||||
video_upload.upload_status = ActiveValue::Set(UploadStatus::Uploaded);
|
||||
video_upload.youtube_video_id = ActiveValue::Set(Some(uploaded_video_id));
|
||||
video_upload = video_upload.update(&self.db).await?.into_active_model();
|
||||
}
|
||||
Err(e) => {
|
||||
error!("could not upload part: {}", e);
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
|
||||
self.set_video_status_on_db(video, Status::PartiallyUploaded)
|
||||
.await?;
|
||||
}
|
||||
|
||||
self.set_video_status_on_db(video, Status::Uploaded).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn insert_video_upload(
|
||||
&self,
|
||||
video_id: i32,
|
||||
part_number: usize,
|
||||
) -> Result<VideoUploadModel> {
|
||||
let video_upload = VideoUploadModel {
|
||||
video_id,
|
||||
part: part_number as i32,
|
||||
upload_status: UploadStatus::Uploading,
|
||||
youtube_video_id: None,
|
||||
}
|
||||
.into_active_model();
|
||||
let x = VideoUpload::insert(video_upload);
|
||||
let x = x.exec_with_returning(&self.db).await?;
|
||||
Ok(x)
|
||||
}
|
||||
|
||||
async fn set_playlist_id_for_video(
|
||||
&self,
|
||||
video: &VideosModel,
|
||||
playlist_id: String,
|
||||
) -> Result<()> {
|
||||
let mut video = video.clone().into_active_model();
|
||||
video.youtube_playlist_id = ActiveValue::Set(Some(playlist_id));
|
||||
video.update(&self.db).await?;
|
||||
Ok(())
|
||||
}
|
||||
async fn add_video_to_playlist(&self, video: &VideosModel, playlist_id: String) -> Result<()> {
|
||||
let mut video = video.clone().into_active_model();
|
||||
video.youtube_playlist_id = ActiveValue::Set(Some(playlist_id));
|
||||
video.update(&self.db).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self, video))]
|
||||
async fn set_video_status_on_db(&self, video: &VideosModel, status: Status) -> Result<()> {
|
||||
trace!("setting status of video {} to {:?}", video.id, status);
|
||||
let mut active_video = video.clone().into_active_model();
|
||||
active_video.status = ActiveValue::Set(status);
|
||||
active_video
|
||||
.update(&self.db)
|
||||
.await
|
||||
.context("could not save video status")?;
|
||||
Ok(())
|
||||
}
|
||||
#[tracing::instrument(skip(self, video_upload))]
|
||||
async fn set_video_upload_status_on_db(
|
||||
&self,
|
||||
video_upload: &VideoUploadModel,
|
||||
status: UploadStatus,
|
||||
) -> Result<()> {
|
||||
trace!(
|
||||
"setting status of video upload {}:{} to {:?}",
|
||||
video_upload.video_id,
|
||||
video_upload.part,
|
||||
status
|
||||
);
|
||||
let mut active_video = video_upload.clone().into_active_model();
|
||||
active_video.upload_status = ActiveValue::Set(status);
|
||||
active_video
|
||||
.update(&self.db)
|
||||
.await
|
||||
.context("could not save video upload status")?;
|
||||
Ok(())
|
||||
}
|
||||
fn get_client_for_video(&self, video: &VideosModel) -> Result<&youtube::YoutubeClient> {
|
||||
let c = self
|
||||
.youtube_client
|
||||
.get(&video.id.to_string())
|
||||
.context("could not get youtube client for video")?;
|
||||
Ok(c)
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_part_files(folder_path: &Path, count: i32) -> Result<Vec<(PathBuf, usize)>> {
|
||||
let mut parts = Vec::new();
|
||||
trace!(
|
||||
"getting {} parts from folder '{}'",
|
||||
count,
|
||||
folder_path.display()
|
||||
);
|
||||
let x = folder_path
|
||||
.read_dir()
|
||||
.context("could not read parts folder")?;
|
||||
for path in x {
|
||||
let path = path.context("could not read path")?;
|
||||
let path = path.path();
|
||||
let part_number = get_part_number_from_path(&path)?;
|
||||
dbg!(part_number);
|
||||
parts.push((path, part_number));
|
||||
}
|
||||
if parts.len() != count as usize {
|
||||
return Err(anyhow!(
|
||||
"part count does not match: expected: {}, got: {}",
|
||||
count,
|
||||
parts.len()
|
||||
)
|
||||
.into());
|
||||
}
|
||||
parts.sort_by_key(|a| a.1);
|
||||
Ok(parts)
|
||||
}
|
||||
|
||||
fn get_part_number_from_path(path: &Path) -> Result<usize> {
|
||||
match path.extension() {
|
||||
None => {
|
||||
warn!("path has no extension: {:?}", path);
|
||||
}
|
||||
Some(e) => {
|
||||
if e == OsStr::new("mp4") {
|
||||
let part_number = path
|
||||
.file_stem()
|
||||
.context("could not get file stem")?
|
||||
.to_str()
|
||||
.context("could not convert path to string")?
|
||||
.to_string();
|
||||
let part_number = part_number
|
||||
.parse::<usize>()
|
||||
.context("could not parse path")?;
|
||||
return Ok(part_number);
|
||||
}
|
||||
warn!("path has not the expected extension (.mp4): {:?}", path);
|
||||
}
|
||||
}
|
||||
Err(anyhow!("wrong file extension").into())
|
||||
}
|
||||
|
||||
impl UploaderClient {
|
||||
pub async fn new(db: DatabaseConnection) -> Result<Self> {
|
||||
let reqwest_client = reqwest::Client::new();
|
||||
|
||||
let mut clients = HashMap::new();
|
||||
|
||||
let users = local_db::get_watched_users(&db).await?;
|
||||
for user in users {
|
||||
let user_id = user.id.to_string();
|
||||
let client = youtube::YoutubeClient::new(&YOUTUBE_DEFAULT_SCOPES, Some(user)).await?;
|
||||
clients.insert(user_id, client);
|
||||
}
|
||||
if clients.is_empty() {
|
||||
//insert default user/client
|
||||
let client = youtube::YoutubeClient::new(&YOUTUBE_DEFAULT_SCOPES, None).await?;
|
||||
clients.insert("unknown".into(), client);
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
db,
|
||||
reqwest_client,
|
||||
youtube_client: clients,
|
||||
})
|
||||
}
|
||||
}
|
||||
318
src/client/youtube.rs
Normal file
318
src/client/youtube.rs
Normal file
@@ -0,0 +1,318 @@
|
||||
use anyhow::Context;
|
||||
use chrono::{Datelike, NaiveDateTime, ParseResult, Utc};
|
||||
use std::fmt::{Debug, Formatter};
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use crate::prelude::*;
|
||||
use google_youtube3::api::enums::{PlaylistStatuPrivacyStatusEnum, VideoStatuPrivacyStatusEnum};
|
||||
use google_youtube3::api::{
|
||||
Playlist, PlaylistSnippet, PlaylistStatus, Scope, VideoSnippet, VideoStatus,
|
||||
};
|
||||
use google_youtube3::api::{PlaylistItem, PlaylistItemSnippet, ResourceId, Video};
|
||||
use google_youtube3::{
|
||||
hyper,
|
||||
hyper::client::HttpConnector,
|
||||
hyper::Client,
|
||||
hyper_rustls::{HttpsConnector, HttpsConnectorBuilder},
|
||||
Error as YoutubeError,
|
||||
};
|
||||
use local_db::entities::videos::Model;
|
||||
use local_db::prelude::{UsersModel, VideosModel};
|
||||
use tokio::fs;
|
||||
use tracing::instrument;
|
||||
|
||||
mod auth;
|
||||
mod flow_delegate;
|
||||
|
||||
pub struct YoutubeClient {
|
||||
//TODO: change this to a thing that does exponential backoff when possible
|
||||
client: google_youtube3::YouTube<HttpsConnector<HttpConnector>>,
|
||||
user: Option<UsersModel>,
|
||||
}
|
||||
|
||||
impl YoutubeClient {
|
||||
#[instrument(skip(self, video, path))]
|
||||
pub(crate) async fn upload_video_part(
|
||||
&self,
|
||||
video: &VideosModel,
|
||||
path: &Path,
|
||||
part_num: usize,
|
||||
) -> Result<String> {
|
||||
trace!(
|
||||
"uploading part {} for video: {} from path: {}",
|
||||
part_num,
|
||||
video.id,
|
||||
path.display()
|
||||
);
|
||||
let title = create_youtube_title(video, TitleLocation::VideoTitle(part_num))?;
|
||||
let description = format!(
|
||||
"default description for video: {}",
|
||||
create_youtube_title(video, TitleLocation::Descriptions)?
|
||||
);
|
||||
let tags = vec![];
|
||||
let privacy_status = VideoStatuPrivacyStatusEnum::Private;
|
||||
self.upload_youtube_video_resumable(title, description, tags, privacy_status, path)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn upload_youtube_video_resumable(
|
||||
&self,
|
||||
title: impl Into<String>,
|
||||
description: impl Into<String>,
|
||||
tags: impl Into<Vec<String>>,
|
||||
privacy_status: VideoStatuPrivacyStatusEnum,
|
||||
path: &Path,
|
||||
) -> Result<String> {
|
||||
let video = Video {
|
||||
snippet: Some(VideoSnippet {
|
||||
title: Some(title.into()),
|
||||
description: Some(description.into()),
|
||||
category_id: Some("20".to_string()),
|
||||
tags: Some(tags.into()),
|
||||
..Default::default()
|
||||
}),
|
||||
status: Some(VideoStatus {
|
||||
privacy_status: Some(privacy_status),
|
||||
public_stats_viewable: Some(true),
|
||||
embeddable: Some(true),
|
||||
self_declared_made_for_kids: Some(false),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let stream = fs::File::open(path).await.context("could not open file")?;
|
||||
|
||||
let insert_call = self.client.videos().insert(video);
|
||||
trace!("Starting resumable upload");
|
||||
let upload = insert_call
|
||||
.upload_resumable(stream.into_std().await, "video/mp4".parse().unwrap())
|
||||
.await;
|
||||
trace!("Resumable upload finished");
|
||||
let result_str = if upload.is_ok() { "Ok" } else { "Error" };
|
||||
info!("upload request done with result: {}", result_str);
|
||||
upload?.1.id.ok_or(UploaderError::Tmp2)
|
||||
}
|
||||
}
|
||||
|
||||
impl YoutubeClient {
|
||||
#[instrument(skip(self))]
|
||||
pub(crate) async fn add_video_to_playlist(
|
||||
&self,
|
||||
uploaded_video_id: String,
|
||||
playlist_id: String,
|
||||
) -> Result<()> {
|
||||
let playlist_item = PlaylistItem {
|
||||
snippet: Some(PlaylistItemSnippet {
|
||||
playlist_id: Some(playlist_id),
|
||||
resource_id: Some(ResourceId {
|
||||
kind: Some("youtube#video".to_string()),
|
||||
video_id: Some(uploaded_video_id),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
};
|
||||
self.client
|
||||
.playlist_items()
|
||||
.insert(playlist_item)
|
||||
.doit()
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
#[instrument(skip(self, video))]
|
||||
pub(crate) async fn create_playlist(&self, video: &VideosModel) -> Result<String> {
|
||||
trace!("creating playlist for video: {:?}", video);
|
||||
let title = create_youtube_title(video, TitleLocation::PlaylistTitle)?;
|
||||
trace!("title: {}", title);
|
||||
let description: Option<String> = None;
|
||||
trace!("description: {:?}", description);
|
||||
let privacy_status = PlaylistStatuPrivacyStatusEnum::Private; //TODO: Get setting per user from db
|
||||
trace!("privacy: {:?}", privacy_status);
|
||||
|
||||
let playlist = Playlist {
|
||||
snippet: Some(PlaylistSnippet {
|
||||
title: Some(title),
|
||||
description,
|
||||
..Default::default()
|
||||
}),
|
||||
status: Some(PlaylistStatus {
|
||||
privacy_status: Some(privacy_status),
|
||||
}),
|
||||
..Default::default()
|
||||
};
|
||||
let playlist_insert_call = self.client.playlists().insert(playlist);
|
||||
let (x, playlist) = playlist_insert_call
|
||||
.doit()
|
||||
.await
|
||||
// .context("could not create playlist")
|
||||
// ?
|
||||
.unwrap()
|
||||
//test
|
||||
;
|
||||
|
||||
Ok(playlist
|
||||
.id
|
||||
.context("playlist creation did not return an ID")?)
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for YoutubeClient {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("YoutubeClient").finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl YoutubeClient {
|
||||
#[tracing::instrument]
|
||||
pub async fn new(scopes: &Vec<Scope>, user: Option<UsersModel>) -> Result<Self> {
|
||||
let hyper_client = Self::create_hyper_client();
|
||||
let application_secret_path = &crate::CONF.google.youtube.client_secret_path;
|
||||
|
||||
let auth = auth::get_auth(
|
||||
application_secret_path,
|
||||
scopes,
|
||||
user.as_ref().map(|x| &x.youtube_id),
|
||||
)
|
||||
.await?;
|
||||
let client = google_youtube3::YouTube::new(hyper_client, auth);
|
||||
Ok(Self { client, user })
|
||||
}
|
||||
|
||||
fn create_hyper_client() -> Client<HttpsConnector<HttpConnector>> {
|
||||
hyper::Client::builder().build(
|
||||
HttpsConnectorBuilder::new()
|
||||
.with_native_roots()
|
||||
.https_or_http()
|
||||
.enable_http1()
|
||||
.enable_http2()
|
||||
.build(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
enum TitleLocation {
|
||||
VideoTitle(usize),
|
||||
PlaylistTitle,
|
||||
Descriptions,
|
||||
}
|
||||
fn create_youtube_title(video: &VideosModel, target: TitleLocation) -> Result<String> {
|
||||
const YOUTUBE_TITLE_MAX_LENGTH: usize = 100;
|
||||
let max = video.part_count as usize;
|
||||
let date = parse_date(&video.created_at)
|
||||
.context(format!("could not parse date: {}", &video.created_at))?;
|
||||
let title = match target {
|
||||
TitleLocation::VideoTitle(current) => {
|
||||
let date_prefix = get_date_prefix(date.date());
|
||||
let part_prefix = if current != max {
|
||||
format_progress(max, current)
|
||||
} else {
|
||||
String::new()
|
||||
};
|
||||
shorten_string_if_needed(
|
||||
&format!("{}{} {}", date_prefix, part_prefix, video.name),
|
||||
YOUTUBE_TITLE_MAX_LENGTH,
|
||||
)
|
||||
}
|
||||
TitleLocation::PlaylistTitle => {
|
||||
let prefix = get_date_prefix(date.date());
|
||||
shorten_string_if_needed(
|
||||
&format!("{} {}", prefix, &video.name),
|
||||
YOUTUBE_TITLE_MAX_LENGTH,
|
||||
)
|
||||
}
|
||||
TitleLocation::Descriptions => format!("\"{}\"", video.name),
|
||||
};
|
||||
Ok(title)
|
||||
}
|
||||
|
||||
fn format_progress(max: usize, current: usize) -> String {
|
||||
let width = (max.checked_ilog10().unwrap_or(0) + 1) as usize;
|
||||
format!("[{:0width$}/{:0width$}]", current, max, width = width)
|
||||
}
|
||||
|
||||
const DATETIME_FORMAT: &str = "%Y-%m-%dT%H:%M:%S";
|
||||
fn parse_date(date: &str) -> ParseResult<NaiveDateTime> {
|
||||
chrono::NaiveDateTime::parse_from_str(&date, DATETIME_FORMAT)
|
||||
}
|
||||
|
||||
fn get_date_prefix(date: chrono::NaiveDate) -> String {
|
||||
format!(
|
||||
"[{:0>4}-{:0>2}-{:0>2}]",
|
||||
date.year(),
|
||||
date.month(),
|
||||
date.day()
|
||||
)
|
||||
}
|
||||
|
||||
fn shorten_string_if_needed(s: &str, target_len: usize) -> String {
|
||||
const SHORTEN_CHARS: &str = "...";
|
||||
if target_len < SHORTEN_CHARS.len() {
|
||||
return SHORTEN_CHARS[..target_len].to_string();
|
||||
}
|
||||
if s.len() > target_len {
|
||||
let s = &s[..target_len - SHORTEN_CHARS.len()];
|
||||
let result = s.to_string() + SHORTEN_CHARS;
|
||||
assert_eq!(result.len(), target_len);
|
||||
result
|
||||
} else {
|
||||
s.to_string()
|
||||
}
|
||||
}
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use crate::client::youtube::{create_youtube_title, TitleLocation};
|
||||
use local_db::prelude::{Status, VideosModel};
|
||||
|
||||
#[test]
|
||||
fn test_shorten_string() {
|
||||
let test = super::shorten_string_if_needed("123456789", 50);
|
||||
assert_eq!("123456789", test);
|
||||
let test = super::shorten_string_if_needed("123456789", 5);
|
||||
assert_eq!("12...", test);
|
||||
let test = super::shorten_string_if_needed("123456789", 3);
|
||||
assert_eq!("...", test);
|
||||
let test = super::shorten_string_if_needed("123456789", 2);
|
||||
assert_eq!("..", test);
|
||||
let test = super::shorten_string_if_needed("123456789", 0);
|
||||
assert_eq!("", test);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create_youtube_title() {
|
||||
let mut x = VideosModel {
|
||||
part_count: 4,
|
||||
name: "wow".to_string(),
|
||||
created_at: "2023-10-09T19:33:59".to_string(),
|
||||
//the rest is just dummy data
|
||||
id: 3,
|
||||
status: Status::Uploading,
|
||||
user_id: 0,
|
||||
twitch_id: String::new(),
|
||||
twitch_preview_image_url: None,
|
||||
twitch_download_url: None,
|
||||
duration: 0,
|
||||
youtube_id: None,
|
||||
youtube_playlist_name: String::new(),
|
||||
youtube_preview_image_url: None,
|
||||
youtube_playlist_id: None,
|
||||
youtube_playlist_created_at: None,
|
||||
fail_count: 0,
|
||||
fail_reason: None,
|
||||
};
|
||||
|
||||
let description = create_youtube_title(&x, TitleLocation::Descriptions).unwrap();
|
||||
assert_eq!("\"wow\"", description);
|
||||
|
||||
let playlist = create_youtube_title(&x, TitleLocation::PlaylistTitle).unwrap();
|
||||
assert_eq!("[2023-10-09] wow", playlist);
|
||||
|
||||
let video = create_youtube_title(&x, TitleLocation::VideoTitle(2)).unwrap();
|
||||
assert_eq!("[2023-10-09][2/4] wow", video);
|
||||
|
||||
x.part_count = 14;
|
||||
let video = create_youtube_title(&x, TitleLocation::VideoTitle(2)).unwrap();
|
||||
assert_eq!("[2023-10-09][02/14] wow", video);
|
||||
}
|
||||
}
|
||||
109
src/client/youtube/auth.rs
Normal file
109
src/client/youtube/auth.rs
Normal file
@@ -0,0 +1,109 @@
|
||||
use crate::client::youtube::flow_delegate::CustomFlowDelegate;
|
||||
use crate::prelude::*;
|
||||
use anyhow::{anyhow, Context};
|
||||
use google_youtube3::api::Scope;
|
||||
use google_youtube3::{hyper::client::HttpConnector, hyper_rustls::HttpsConnector, oauth2};
|
||||
use std::collections::HashMap;
|
||||
use std::path::{Path, PathBuf};
|
||||
use tokio::fs;
|
||||
use tracing::instrument;
|
||||
use yup_oauth2::authenticator::Authenticator;
|
||||
#[instrument]
|
||||
pub(super) async fn get_auth<USER: EasyString>(
|
||||
application_secret_path: &String,
|
||||
scopes: &Vec<Scope>,
|
||||
user: Option<USER>,
|
||||
) -> Result<Authenticator<HttpsConnector<HttpConnector>>> {
|
||||
trace!(
|
||||
"getting auth for user: {:?} with scopes: {:?} and secret_path: {:?}",
|
||||
user,
|
||||
scopes,
|
||||
application_secret_path
|
||||
);
|
||||
|
||||
let app_secret = oauth2::read_application_secret(application_secret_path)
|
||||
.await
|
||||
.context("could not read application secret from path")?;
|
||||
|
||||
let persistent_path =
|
||||
get_and_validate_persistent_path(&crate::CONF.google.path_auth_cache, user.clone()).await?;
|
||||
trace!(
|
||||
"persistent path for auth for user: {:?}: {:?}",
|
||||
user,
|
||||
&persistent_path
|
||||
);
|
||||
|
||||
trace!("creating authenticator");
|
||||
let user = user.map(|x| x.into());
|
||||
let method = oauth2::InstalledFlowReturnMethod::Interactive;
|
||||
let auth = oauth2::InstalledFlowAuthenticator::builder(app_secret, method)
|
||||
.flow_delegate(Box::new(CustomFlowDelegate::new(user, &crate::CONF)))
|
||||
.persist_tokens_to_disk(persistent_path)
|
||||
.force_account_selection(true)
|
||||
.build()
|
||||
.await
|
||||
.context("error creating authenticator")?;
|
||||
|
||||
trace!("got authenticator, requesting scopes");
|
||||
let access_token = auth
|
||||
.token(scopes)
|
||||
.await
|
||||
.context("could not get access to the requested scopes")?;
|
||||
trace!("got scope access: {:?}", access_token);
|
||||
Ok(auth)
|
||||
}
|
||||
|
||||
async fn get_and_validate_persistent_path<TEMPLATE: EasyString, USER: EasyString>(
|
||||
persistent_path_template: TEMPLATE,
|
||||
user: Option<USER>,
|
||||
) -> Result<PathBuf> {
|
||||
let persistent_path = get_persistent_path(persistent_path_template, user.clone())?;
|
||||
let persistent_path = Path::new(&persistent_path);
|
||||
info!(
|
||||
"Persistent auth path for user:{:?} => {}",
|
||||
user,
|
||||
persistent_path.display()
|
||||
);
|
||||
|
||||
if persistent_path.is_dir() {
|
||||
warn!("persistent path is a dir: {}", persistent_path.display());
|
||||
}
|
||||
|
||||
let persistent_path_parent_folder = persistent_path
|
||||
.parent()
|
||||
.context("could not get parent folder")?;
|
||||
if !persistent_path_parent_folder.exists() {
|
||||
debug!(
|
||||
"persistent path parent folder does not exist, creating it: {}",
|
||||
persistent_path_parent_folder.display()
|
||||
);
|
||||
fs::create_dir_all(persistent_path_parent_folder)
|
||||
.await
|
||||
.context("could not create dirs")?;
|
||||
} else if !persistent_path_parent_folder.is_dir() {
|
||||
error!(
|
||||
"persistent path parent folder is not a dir: {}",
|
||||
persistent_path_parent_folder.display()
|
||||
);
|
||||
return Err(anyhow!(
|
||||
"persistent path parent folder is not a dir: {}",
|
||||
persistent_path_parent_folder.display()
|
||||
)
|
||||
.into());
|
||||
}
|
||||
Ok(persistent_path.to_path_buf())
|
||||
}
|
||||
|
||||
fn get_persistent_path<TEMPLATE: EasyString, USER: EasyString>(
|
||||
persistent_path_template: TEMPLATE,
|
||||
user: Option<USER>,
|
||||
) -> Result<String> {
|
||||
let user: String = match user {
|
||||
Some(user) => user.into(),
|
||||
None => "unknown".to_string(),
|
||||
};
|
||||
let vars: HashMap<String, String> = HashMap::from([("user".to_string(), user)]);
|
||||
let persistent_path = strfmt::strfmt(&persistent_path_template.into(), &vars)
|
||||
.context("could not replace user in persistent path")?;
|
||||
Ok(persistent_path)
|
||||
}
|
||||
124
src/client/youtube/flow_delegate.rs
Normal file
124
src/client/youtube/flow_delegate.rs
Normal file
@@ -0,0 +1,124 @@
|
||||
use crate::prelude::*;
|
||||
use anyhow::anyhow;
|
||||
use backup_config::Conf;
|
||||
use std::{
|
||||
fmt::{Debug, Formatter},
|
||||
future::Future,
|
||||
path::Path,
|
||||
pin::Pin,
|
||||
};
|
||||
use tracing::instrument;
|
||||
use yup_oauth2::authenticator_delegate::InstalledFlowDelegate;
|
||||
|
||||
pub struct CustomFlowDelegate<USER: EasyString> {
|
||||
user: Option<USER>,
|
||||
}
|
||||
|
||||
impl<USER: EasyString> Debug for CustomFlowDelegate<USER> {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("CustomFlowDelegate")
|
||||
.field("user", &self.user)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
impl<USER: EasyString> CustomFlowDelegate<USER> {
|
||||
pub(crate) fn new(user: Option<USER>, config: &'static Conf) -> Self {
|
||||
Self { user }
|
||||
}
|
||||
}
|
||||
impl<USER: EasyString> InstalledFlowDelegate for CustomFlowDelegate<USER> {
|
||||
#[tracing::instrument]
|
||||
fn redirect_uri(&self) -> Option<&str> {
|
||||
if !(&crate::CONF.google.local_auth_redirect) {
|
||||
let url = "https://game-omgeeky.de:7443/googleapi/auth";
|
||||
trace!("server redirect uri: {}", url);
|
||||
Some(url)
|
||||
} else {
|
||||
let url = "http://localhost:8080/googleapi/auth";
|
||||
trace!("local redirect uri: {}", url);
|
||||
Some(url)
|
||||
}
|
||||
}
|
||||
fn present_user_url<'a>(
|
||||
&'a self,
|
||||
url: &'a str,
|
||||
need_code: bool,
|
||||
) -> Pin<Box<dyn Future<Output = StdResult<String, String>> + Send + 'a>> {
|
||||
Box::pin(self.present_user_url(url, need_code))
|
||||
}
|
||||
}
|
||||
impl<USER: EasyString> CustomFlowDelegate<USER> {
|
||||
#[tracing::instrument]
|
||||
async fn present_user_url(&self, url: &str, need_code: bool) -> StdResult<String, String> {
|
||||
let user: String = self
|
||||
.user
|
||||
.clone()
|
||||
.map(|x| x.into())
|
||||
.unwrap_or_else(|| "unknown".into());
|
||||
let message = format!(
|
||||
"Please open this URL in your browser to authenticate for {}:\n{}\n",
|
||||
user, url
|
||||
);
|
||||
println!("{}", message);
|
||||
info!("{}", message);
|
||||
if need_code {
|
||||
let mut code = String::new();
|
||||
if crate::CONF.google.use_file_auth_response {
|
||||
code = get_auth_code().await.unwrap_or("".to_string());
|
||||
}
|
||||
if code.is_empty() {
|
||||
println!("Please enter the code provided: ");
|
||||
match std::io::stdin().read_line(&mut code) {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("Error reading line: {}", e);
|
||||
return Err("".into());
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(code)
|
||||
} else {
|
||||
Ok("".to_string())
|
||||
}
|
||||
}
|
||||
}
|
||||
#[instrument]
|
||||
async fn get_auth_code() -> Result<String> {
|
||||
let code: String;
|
||||
|
||||
let path = Path::new(&crate::CONF.google.path_auth_code);
|
||||
if let Err(e) = std::fs::remove_file(path) {
|
||||
if e.kind() != std::io::ErrorKind::NotFound {
|
||||
println!("Error removing file: {}", e);
|
||||
error!("Error removing file: {}", e);
|
||||
return Err(anyhow!("Error removing file: {}", e).into());
|
||||
}
|
||||
}
|
||||
let message = format!("Waiting for auth code in file: {}", path.display());
|
||||
println!("{}", message);
|
||||
info!(message);
|
||||
loop {
|
||||
let res = std::fs::read_to_string(path);
|
||||
if let Ok(content) = res {
|
||||
let line = content.lines().next();
|
||||
let line = match line {
|
||||
Some(s) => s.to_string(),
|
||||
None => {
|
||||
let message = "No code found in file";
|
||||
println!("{}", message);
|
||||
info!(message);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
code = line;
|
||||
break;
|
||||
}
|
||||
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(
|
||||
crate::CONF.google.auth_file_read_timeout,
|
||||
))
|
||||
.await;
|
||||
}
|
||||
|
||||
Ok(code)
|
||||
}
|
||||
21
src/errors.rs
Normal file
21
src/errors.rs
Normal file
@@ -0,0 +1,21 @@
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum UploaderError {
|
||||
#[error("Could not load config")]
|
||||
LoadConfig(#[source] anyhow::Error),
|
||||
|
||||
#[error("Some error with the database")]
|
||||
OpenDatabase(#[from] local_db::re_exports::sea_orm::DbErr),
|
||||
|
||||
#[error("Error with some Youtube operation: {0} ")]
|
||||
YoutubeError(#[source] google_youtube3::Error),
|
||||
|
||||
#[error("Temporary error. Remove for production, {0}")]
|
||||
//TODO: Remove this error
|
||||
Tmp1(#[from] anyhow::Error),
|
||||
#[error("Temporary error. Remove for production, {0}")]
|
||||
//TODO: Remove this error
|
||||
Tmp3(#[from] google_youtube3::Error),
|
||||
#[error("Temporary error. Remove for production")]
|
||||
//TODO: Remove this error
|
||||
Tmp2,
|
||||
}
|
||||
56
src/main.rs
Normal file
56
src/main.rs
Normal file
@@ -0,0 +1,56 @@
|
||||
#![allow(unused)]
|
||||
use backup_config::prelude::*;
|
||||
use lazy_static::lazy_static;
|
||||
|
||||
use prelude::*;
|
||||
|
||||
mod client;
|
||||
pub mod errors;
|
||||
pub mod prelude;
|
||||
|
||||
lazy_static! {
|
||||
pub(crate) static ref CONF: Conf = Conf::builder()
|
||||
.env()
|
||||
.file("./settings.toml")
|
||||
.file(shellexpand::tilde("~/twba/config.toml").into_owned())
|
||||
.load()
|
||||
.map_err(|e| UploaderError::LoadConfig(e.into()))
|
||||
.expect("to load config");
|
||||
}
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_max_level(tracing::Level::INFO)
|
||||
.with_env_filter("warn,uploader=trace")
|
||||
.init();
|
||||
let args = std::env::args().collect::<Vec<_>>();
|
||||
let presentation_mode = args.len() > 1;
|
||||
info!("Hello, world!");
|
||||
|
||||
run().await?;
|
||||
|
||||
info!("Bye");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument]
|
||||
async fn run() -> Result<()> {
|
||||
trace!("run");
|
||||
let x = &CONF.google;
|
||||
debug!("{:?}", x);
|
||||
|
||||
trace!("creating db-connection");
|
||||
let db = local_db::open_database(Some(&CONF.db_url)).await?;
|
||||
trace!("migrating db");
|
||||
local_db::migrate_db(&db).await?;
|
||||
// local_db::print_db(&db).await?;
|
||||
|
||||
trace!("creating client");
|
||||
// dbg!(&conf);
|
||||
|
||||
let client = client::UploaderClient::new(db).await?;
|
||||
trace!("uploading videos");
|
||||
client.upload_videos().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
11
src/prelude.rs
Normal file
11
src/prelude.rs
Normal file
@@ -0,0 +1,11 @@
|
||||
pub use crate::errors::UploaderError;
|
||||
use std::fmt::Debug;
|
||||
pub(crate) use std::result::Result as StdResult;
|
||||
|
||||
pub type Result<T> = StdResult<T, UploaderError>;
|
||||
|
||||
pub(crate) use tracing::{debug, error, info, trace, warn};
|
||||
|
||||
pub trait EasyString: Into<String> + Clone + Debug + Send + Sync {}
|
||||
|
||||
impl<T> EasyString for T where T: Into<String> + Clone + Debug + Send + Sync {}
|
||||
Reference in New Issue
Block a user