change bigquery lib

This commit is contained in:
OMGeeky
2023-04-15 16:29:02 +02:00
parent 85e689f413
commit 6af27c8454
7 changed files with 185 additions and 295 deletions

135
Cargo.lock generated
View File

@@ -364,15 +364,15 @@ dependencies = [
[[package]] [[package]]
name = "downloader" name = "downloader"
version = "0.1.4" version = "0.2.0"
dependencies = [ dependencies = [
"anyhow",
"chrono", "chrono",
"downloader_config", "downloader_config",
"env_logger", "env_logger",
"google-bigquery2", "google_bigquery_v2",
"google_bigquery",
"google_youtube", "google_youtube",
"log 0.4.17", "log",
"log-panics", "log-panics",
"log4rs", "log4rs",
"nameof", "nameof",
@@ -387,7 +387,7 @@ name = "downloader_config"
version = "0.3.0" version = "0.3.0"
source = "git+https://github.com/OMGeeky/downloader_config#a484c2ece44554f47d72a3e8541abc0dda9c78bb" source = "git+https://github.com/OMGeeky/downloader_config#a484c2ece44554f47d72a3e8541abc0dda9c78bb"
dependencies = [ dependencies = [
"log 0.4.17", "log",
] ]
[[package]] [[package]]
@@ -419,7 +419,7 @@ checksum = "85cdab6a89accf66733ad5a1693a4dcced6aeff64602b634530dd73c1f3ee9f0"
dependencies = [ dependencies = [
"humantime", "humantime",
"is-terminal", "is-terminal",
"log 0.4.17", "log",
"regex", "regex",
"termcolor", "termcolor",
] ]
@@ -452,7 +452,7 @@ source = "git+https://github.com/OMGeeky/exponential_backoff#3cc1a85122df47af0e5
dependencies = [ dependencies = [
"chrono", "chrono",
"google-youtube3", "google-youtube3",
"log 0.4.17", "log",
"rand", "rand",
"reqwest", "reqwest",
"serde_json", "serde_json",
@@ -635,34 +635,34 @@ dependencies = [
"http", "http",
"hyper", "hyper",
"itertools", "itertools",
"mime 0.3.17", "mime",
"serde", "serde",
"serde_json", "serde_json",
"serde_with", "serde_with",
"tokio", "tokio",
"tower-service", "tower-service",
"url 1.7.2", "url 1.7.2",
"yup-oauth2 8.1.1", "yup-oauth2",
] ]
[[package]] [[package]]
name = "google-bigquery2" name = "google-bigquery2"
version = "4.0.1+20220222" version = "5.0.2+20230114"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c66a8ea4e74f9512ea85b935e4cb30d7e75c5a01826b817b5f11c7d68a67bf15" checksum = "fc3eb8b026c61ff94b9b1badbdcc64e1fd330c0ee8d981aa180853a3d402e1a3"
dependencies = [ dependencies = [
"anyhow",
"google-apis-common",
"http", "http",
"hyper", "hyper",
"hyper-rustls", "hyper-rustls",
"itertools", "itertools",
"mime 0.2.6", "mime",
"serde", "serde",
"serde_derive",
"serde_json", "serde_json",
"tokio", "tokio",
"tower-service", "tower-service",
"url 1.7.2", "url 1.7.2",
"yup-oauth2 7.0.1",
] ]
[[package]] [[package]]
@@ -677,7 +677,7 @@ dependencies = [
"hyper", "hyper",
"hyper-rustls", "hyper-rustls",
"itertools", "itertools",
"mime 0.3.17", "mime",
"serde", "serde",
"serde_json", "serde_json",
"tokio", "tokio",
@@ -686,28 +686,29 @@ dependencies = [
] ]
[[package]] [[package]]
name = "google_bigquery" name = "google_bigquery_v2"
version = "0.1.0" version = "0.2.1"
source = "git+https://github.com/OMGeeky/google_bigquery#95c14859aa6f712d1f28e97d6aa26ced845dc383" source = "git+https://github.com/OMGeeky/google_bigquery_v2#fcbaf6926c58250e10706551a462020015cce002"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"chrono", "chrono",
"env_logger",
"google-bigquery2", "google-bigquery2",
"google_bigquery_derive", "google_bigquery_v2_derive",
"reqwest", "log",
"serde", "nameof",
"serde_json", "serde_json",
"tokio", "tokio",
] ]
[[package]] [[package]]
name = "google_bigquery_derive" name = "google_bigquery_v2_derive"
version = "0.0.1" version = "0.0.1"
source = "git+https://github.com/OMGeeky/google_bigquery#95c14859aa6f712d1f28e97d6aa26ced845dc383" source = "git+https://github.com/OMGeeky/google_bigquery_v2#fcbaf6926c58250e10706551a462020015cce002"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 1.0.109", "syn 2.0.13",
] ]
[[package]] [[package]]
@@ -719,7 +720,7 @@ dependencies = [
"downloader_config", "downloader_config",
"exponential_backoff", "exponential_backoff",
"google-youtube3", "google-youtube3",
"log 0.4.17", "log",
"reqwest", "reqwest",
"serde", "serde",
"serde_json", "serde_json",
@@ -855,7 +856,7 @@ checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c"
dependencies = [ dependencies = [
"http", "http",
"hyper", "hyper",
"log 0.4.17", "log",
"rustls", "rustls",
"rustls-native-certs", "rustls-native-certs",
"tokio", "tokio",
@@ -1054,15 +1055,6 @@ dependencies = [
"scopeguard", "scopeguard",
] ]
[[package]]
name = "log"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e19e8d5c34a3e0e2223db8e060f9e8264aeeb5c5fc64a4ee9965c062211c024b"
dependencies = [
"log 0.4.17",
]
[[package]] [[package]]
name = "log" name = "log"
version = "0.4.17" version = "0.4.17"
@@ -1086,7 +1078,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68f9dd8546191c1850ecf67d22f5ff00a935b890d0e84713159a55495cc2ac5f" checksum = "68f9dd8546191c1850ecf67d22f5ff00a935b890d0e84713159a55495cc2ac5f"
dependencies = [ dependencies = [
"backtrace", "backtrace",
"log 0.4.17", "log",
] ]
[[package]] [[package]]
@@ -1103,7 +1095,7 @@ dependencies = [
"fnv", "fnv",
"humantime", "humantime",
"libc", "libc",
"log 0.4.17", "log",
"log-mdc", "log-mdc",
"parking_lot", "parking_lot",
"serde", "serde",
@@ -1128,15 +1120,6 @@ version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
[[package]]
name = "mime"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba626b8a6de5da682e1caa06bdb42a335aee5a84db8e5046a3e8ab17ba0a3ae0"
dependencies = [
"log 0.3.9",
]
[[package]] [[package]]
name = "mime" name = "mime"
version = "0.3.17" version = "0.3.17"
@@ -1159,7 +1142,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b9d9a46eff5b4ff64b45a9e316a6d1e0bc719ef429cbec4dc630684212bfdf9" checksum = "5b9d9a46eff5b4ff64b45a9e316a6d1e0bc719ef429cbec4dc630684212bfdf9"
dependencies = [ dependencies = [
"libc", "libc",
"log 0.4.17", "log",
"wasi 0.11.0+wasi-snapshot-preview1", "wasi 0.11.0+wasi-snapshot-preview1",
"windows-sys 0.45.0", "windows-sys 0.45.0",
] ]
@@ -1178,7 +1161,7 @@ checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e"
dependencies = [ dependencies = [
"lazy_static", "lazy_static",
"libc", "libc",
"log 0.4.17", "log",
"openssl", "openssl",
"openssl-probe", "openssl-probe",
"openssl-sys", "openssl-sys",
@@ -1472,8 +1455,8 @@ dependencies = [
"hyper-tls", "hyper-tls",
"ipnet", "ipnet",
"js-sys", "js-sys",
"log 0.4.17", "log",
"mime 0.3.17", "mime",
"native-tls", "native-tls",
"once_cell", "once_cell",
"percent-encoding 2.2.0", "percent-encoding 2.2.0",
@@ -1534,7 +1517,7 @@ version = "0.20.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f" checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f"
dependencies = [ dependencies = [
"log 0.4.17", "log",
"ring", "ring",
"sct", "sct",
"webpki", "webpki",
@@ -1547,20 +1530,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0167bac7a9f490495f3c33013e7722b53cb087ecbe082fb0c6387c96f634ea50" checksum = "0167bac7a9f490495f3c33013e7722b53cb087ecbe082fb0c6387c96f634ea50"
dependencies = [ dependencies = [
"openssl-probe", "openssl-probe",
"rustls-pemfile 1.0.2", "rustls-pemfile",
"schannel", "schannel",
"security-framework", "security-framework",
] ]
[[package]]
name = "rustls-pemfile"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ee86d63972a7c661d1536fefe8c3c8407321c3df668891286de28abcd087360"
dependencies = [
"base64 0.13.1",
]
[[package]] [[package]]
name = "rustls-pemfile" name = "rustls-pemfile"
version = "1.0.2" version = "1.0.2"
@@ -1775,7 +1749,7 @@ version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acee08041c5de3d5048c8b3f6f13fafb3026b24ba43c6a695a0c76179b844369" checksum = "acee08041c5de3d5048c8b3f6f13fafb3026b24ba43c6a695a0c76179b844369"
dependencies = [ dependencies = [
"log 0.4.17", "log",
"termcolor", "termcolor",
"time 0.3.20", "time 0.3.20",
] ]
@@ -2106,7 +2080,7 @@ dependencies = [
"exponential_backoff", "exponential_backoff",
"futures", "futures",
"indicatif", "indicatif",
"log 0.4.17", "log",
"reqwest", "reqwest",
"serde", "serde",
"serde_json", "serde_json",
@@ -2250,7 +2224,7 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0"
dependencies = [ dependencies = [
"log 0.4.17", "log",
"try-lock", "try-lock",
] ]
@@ -2283,7 +2257,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95ce90fd5bcc06af55a641a86428ee4229e44e07033963a2290a8e241607ccb9" checksum = "95ce90fd5bcc06af55a641a86428ee4229e44e07033963a2290a8e241607ccb9"
dependencies = [ dependencies = [
"bumpalo", "bumpalo",
"log 0.4.17", "log",
"once_cell", "once_cell",
"proc-macro2", "proc-macro2",
"quote", "quote",
@@ -2570,33 +2544,6 @@ dependencies = [
"linked-hash-map", "linked-hash-map",
] ]
[[package]]
name = "yup-oauth2"
version = "7.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "98748970d2ddf05253e6525810d989740334aa7509457864048a829902db76f3"
dependencies = [
"anyhow",
"async-trait",
"base64 0.13.1",
"futures",
"http",
"hyper",
"hyper-rustls",
"itertools",
"log 0.4.17",
"percent-encoding 2.2.0",
"rustls",
"rustls-pemfile 0.3.0",
"seahash",
"serde",
"serde_json",
"time 0.3.20",
"tokio",
"tower-service",
"url 2.3.1",
]
[[package]] [[package]]
name = "yup-oauth2" name = "yup-oauth2"
version = "8.1.1" version = "8.1.1"
@@ -2611,10 +2558,10 @@ dependencies = [
"hyper", "hyper",
"hyper-rustls", "hyper-rustls",
"itertools", "itertools",
"log 0.4.17", "log",
"percent-encoding 2.2.0", "percent-encoding 2.2.0",
"rustls", "rustls",
"rustls-pemfile 1.0.2", "rustls-pemfile",
"seahash", "seahash",
"serde", "serde",
"serde_json", "serde_json",

View File

@@ -1,12 +1,12 @@
[package] [package]
name = "downloader" name = "downloader"
version = "0.1.4" version = "0.2.0"
edition = "2021" edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
google_bigquery = { git = "https://github.com/OMGeeky/google_bigquery" } google_bigquery_v2 = { version="0.2.1", git = "https://github.com/OMGeeky/google_bigquery_v2" }
google_youtube = { version="0.1.1",git = "https://github.com/OMGeeky/google_youtube" } google_youtube = { version="0.1.1",git = "https://github.com/OMGeeky/google_youtube" }
twitch_data = { version="0.1.7", git = "https://github.com/OMGeeky/twitch_data" } twitch_data = { version="0.1.7", git = "https://github.com/OMGeeky/twitch_data" }
downloader_config = { version="0.3.0", git = "https://github.com/OMGeeky/downloader_config" } downloader_config = { version="0.3.0", git = "https://github.com/OMGeeky/downloader_config" }

View File

@@ -4,35 +4,21 @@ use std::fmt::Debug;
use chrono::DateTime; use chrono::DateTime;
use chrono::Utc; use chrono::Utc;
use google_bigquery::utils::ConvertValueToBigqueryParamValue; use google_bigquery_v2::data::query_builder::QueryResultType;
use google_bigquery::{ use google_bigquery_v2::prelude::*;
BigDataTable, BigDataTableBase, BigDataTableBaseConvenience, BigDataTableDerive,
BigDataTableHasPk, BigqueryClient, HasBigQueryClient, HasBigQueryClientDerive,
};
#[derive(BigDataTableDerive, HasBigQueryClientDerive)] #[derive(BigDataTableDerive, Debug, Default, Clone)]
#[db_name("streamers")] #[db_name("streamers")]
pub struct Streamers<'a> { pub struct Streamers {
#[primary_key] #[primary_key]
#[required] #[required]
pub login: String, pub login: String,
#[client] #[client]
pub client: Option<&'a BigqueryClient>, pub client: BigqueryClient,
pub display_name: Option<String>, pub display_name: Option::<String>,
pub watched: Option<bool>, pub watched: Option::<bool>,
pub youtube_user: Option<String>, pub youtube_user: Option::<String>,
pub public_videos_default: Option<bool>, pub public_videos_default: Option::<bool>,
}
impl Debug for Streamers<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Streamers")
.field("login", &self.login)
.field("display_name", &self.display_name)
.field("watched", &self.watched)
.field("youtube_user", &self.youtube_user)
.finish()
}
} }
/*impl<'a> Streamers<'a> { /*impl<'a> Streamers<'a> {
@@ -45,147 +31,59 @@ impl Debug for Streamers<'_> {
} }
}*/ }*/
impl Default for Streamers<'_> { #[derive(BigDataTableDerive, Debug, Default, Clone)]
fn default() -> Self {
Self {
login: "".to_string(),
client: None,
display_name: None,
watched: None,
youtube_user: None,
public_videos_default: None,
}
}
}
#[derive(BigDataTableDerive, HasBigQueryClientDerive)]
#[db_name("videos")] #[db_name("videos")]
pub struct Videos<'a> { pub struct Videos {
#[primary_key] #[primary_key]
#[required] #[required]
pub video_id: i64, pub video_id: i64,
#[client] #[client]
pub client: Option<&'a BigqueryClient>, pub client: BigqueryClient,
pub title: Option<String>, pub title: Option::<String>,
pub description: Option<String>, pub description: Option::<String>,
pub bool_test: Option<bool>, pub bool_test: Option::<bool>,
pub user_login: Option<String>, pub user_login: Option::<String>,
pub created_at: Option<DateTime<Utc>>, pub created_at: Option::<DateTime<Utc>>,
pub url: Option<String>, pub url: Option::<String>,
pub viewable: Option<String>, pub viewable: Option::<String>,
pub language: Option<String>, pub language: Option::<String>,
pub view_count: Option<i64>, pub view_count: Option::<i64>,
pub video_type: Option<String>, pub video_type: Option::<String>,
pub duration: Option<i64>, pub duration: Option::<i64>,
pub thumbnail_url: Option<String>, pub thumbnail_url: Option::<String>,
} }
impl Debug for Videos<'_> { #[derive(BigDataTableDerive, Debug, Default, Clone)]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Videos")
.field("video_id", &self.video_id)
.field("title", &self.title)
.field("description", &self.description)
.field("bool_test", &self.bool_test)
.field("user_login", &self.user_login)
.field("created_at", &self.created_at)
.field("url", &self.url)
.field("viewable", &self.viewable)
.field("language", &self.language)
.field("view_count", &self.view_count)
.field("video_type", &self.video_type)
.field("duration", &self.duration)
.field("thumbnail_url", &self.thumbnail_url)
.finish()
}
}
impl Default for Videos<'_> {
fn default() -> Self {
Self {
video_id: -9999,
client: None,
title: None,
description: None,
bool_test: None,
user_login: None,
created_at: None,
url: None,
viewable: None,
language: None,
view_count: None,
video_type: None,
duration: None,
thumbnail_url: None,
}
}
}
#[derive(BigDataTableDerive, HasBigQueryClientDerive)]
#[db_name("video_metadata")] #[db_name("video_metadata")]
pub struct VideoMetadata<'a> { pub struct VideoMetadata {
#[primary_key] #[primary_key]
#[required] #[required]
pub video_id: i64, pub video_id: i64,
#[client] #[client]
pub client: Option<&'a BigqueryClient>, pub client: BigqueryClient,
pub backed_up: Option<bool>, pub backed_up: Option::<bool>,
pub total_clips_amount: Option<i64>, pub total_clips_amount: Option::<i64>,
pub parts_backed_up_id: Option<i64>, pub parts_backed_up_id: Option::<i64>,
pub parts_size: Option<i64>, pub parts_size: Option::<i64>,
pub error: Option<String>, pub error: Option::<String>,
pub download_playlist_url: Option<String>, pub download_playlist_url: Option::<String>,
pub youtube_playlist_url: Option<String>, pub youtube_playlist_url: Option::<String>,
}
impl Debug for VideoMetadata<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("VideoMetadata")
.field("video_id", &self.video_id)
.field("backed_up", &self.backed_up)
.field("total_clips_amount", &self.total_clips_amount)
.field("parts_backed_up_id", &self.parts_backed_up_id)
.field("parts_size", &self.parts_size)
.field("error", &self.error)
.field("download_playlist_url", &self.download_playlist_url)
.field("youtube_playlist_url", &self.youtube_playlist_url)
.finish()
}
}
impl Default for VideoMetadata<'_> {
fn default() -> Self {
Self {
video_id: -9999,
client: None,
error: None,
backed_up: None,
total_clips_amount: None,
parts_backed_up_id: None,
parts_size: None,
download_playlist_url: None,
youtube_playlist_url: None,
}
}
} }
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct VideoData<'a> { pub struct VideoData {
pub video: Videos<'a>, pub video: Videos,
pub metadata: VideoMetadata<'a>, pub metadata: VideoMetadata,
pub streamer: Streamers<'a>, pub streamer: Streamers,
} }
impl<'a> VideoData<'a> { impl VideoData {
pub fn from_twitch_video( pub fn from_twitch_video(video: &twitch_data::Video, client: BigqueryClient) -> Result<Self> {
video: &twitch_data::Video,
client: &'a BigqueryClient,
) -> Result<Self, Box<dyn Error>> {
Ok(Self { Ok(Self {
video: Videos { video: Videos {
video_id: video.id.parse::<i64>()?, video_id: video.id.parse::<i64>()?,
client: Some(client), client: client.clone(),
title: Some(video.title.clone()), title: Some(video.title.clone()),
description: Some(video.description.clone()), description: Some(video.description.clone()),
bool_test: Some(true), bool_test: Some(true),
@@ -201,7 +99,7 @@ impl<'a> VideoData<'a> {
}, },
metadata: VideoMetadata { metadata: VideoMetadata {
video_id: video.id.parse::<i64>()?, video_id: video.id.parse::<i64>()?,
client: Some(client), client: client.clone(),
backed_up: Some(false), backed_up: Some(false),
..Default::default() ..Default::default()
}, },

View File

@@ -6,10 +6,11 @@ use std::io::stdin;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::process::Stdio; use std::process::Stdio;
use anyhow::{anyhow, Result};
use chrono::{Datelike, Duration}; use chrono::{Datelike, Duration};
use downloader_config; use downloader_config;
use downloader_config::Config; use downloader_config::Config;
use google_bigquery::{BigDataTable, BigqueryClient}; use google_bigquery_v2::prelude::*;
use google_youtube::{scopes, PrivacyStatus, YoutubeClient}; use google_youtube::{scopes, PrivacyStatus, YoutubeClient};
use log::{debug, error, info, trace, warn}; use log::{debug, error, info, trace, warn};
use nameof::name_of; use nameof::name_of;
@@ -19,10 +20,10 @@ use tokio::process::Command;
use twitch_data::{TwitchClient, Video}; use twitch_data::{TwitchClient, Video};
use crate::data::{Streamers, VideoData}; use crate::data::{Streamers, VideoData};
use crate::prelude::*;
pub mod data; pub mod data;
pub mod prelude;
type Result<T> = std::result::Result<T, Box<dyn Error>>;
async fn check_for_new_videos<'a>( async fn check_for_new_videos<'a>(
db_client: &BigqueryClient, db_client: &BigqueryClient,
@@ -38,10 +39,11 @@ async fn check_for_new_videos<'a>(
let videos = get_twitch_videos_from_streamer(&streamer, &twitch_client).await?; let videos = get_twitch_videos_from_streamer(&streamer, &twitch_client).await?;
info!("Got {} videos for {}", videos.len(), streamer.login); info!("Got {} videos for {}", videos.len(), streamer.login);
for video in videos { for video in videos {
let video_id = video.id.parse()?; let video_id: &i64 = &video.id.parse()?;
let loaded_video = data::Videos::load_from_pk(db_client, video_id).await?; let loaded_video = data::Videos::get_by_pk(db_client.clone(), video_id).await;
if loaded_video.is_none() { if loaded_video.is_err() {
let video = data::VideoData::from_twitch_video(&video, &db_client)?; let mut video = data::VideoData::from_twitch_video(&video, db_client.clone())
.map_err(|e| anyhow::anyhow!("{}", e))?;
info!( info!(
"Video {} is not in the database, adding it: {}", "Video {} is not in the database, adding it: {}",
video_id, video_id,
@@ -51,8 +53,8 @@ async fn check_for_new_videos<'a>(
.as_ref() .as_ref()
.unwrap_or(&"TITLE NOT FOUND".to_string()) .unwrap_or(&"TITLE NOT FOUND".to_string())
); );
video.video.save_to_bigquery().await?; video.video.save().await.map_err(|e| anyhow!("{}", e))?;
video.metadata.save_to_bigquery().await?; video.metadata.save().await;
} }
} }
} }
@@ -60,21 +62,33 @@ async fn check_for_new_videos<'a>(
} }
async fn get_twitch_videos_from_streamer<'a>( async fn get_twitch_videos_from_streamer<'a>(
streamer: &'a Streamers<'a>, streamer: &Streamers,
twitch_client: &TwitchClient<'a>, twitch_client: &TwitchClient<'a>,
) -> Result<Vec<Video>> { ) -> Result<Vec<Video>> {
trace!("Getting videos from streamer {}", streamer.login); trace!("Getting videos from streamer {}", streamer.login);
let videos = twitch_client let videos = twitch_client
.get_videos_from_login(&streamer.login, None) .get_videos_from_login(&streamer.login, None)
.await?; .await
.map_err(|e| anyhow!("{}", e))?;
Ok(videos) Ok(videos)
} }
async fn get_watched_streamers(client: &BigqueryClient) -> Result<Vec<Streamers>> { async fn get_watched_streamers(client: &BigqueryClient) -> Result<Vec<Streamers>> {
trace!("Getting watched streamers"); trace!("Getting watched streamers");
let watched = let watched = Streamers::select()
Streamers::load_by_field(client, name_of!(watched in Streamers), Some(true), 1000).await?; .with_client(client.clone())
.add_where_eq(name_of!(watched in Streamers), Some(&true))
.map_err(|e| anyhow!("{}", e))?
.set_limit(1000)
.build_query()
.map_err(|e| anyhow!("{}", e))?
.run()
.await
.map_err(|e| anyhow!("{}", e))?;
let watched = watched
.map_err_with_data("Error getting watched streamers")
.map_err(|e| anyhow!("{}", e))?;
Ok(watched) Ok(watched)
} }
@@ -88,9 +102,13 @@ pub async fn start_backup() -> Result<()> {
let youtube_client_secret = &config.youtube_client_secret_path.as_str(); let youtube_client_secret = &config.youtube_client_secret_path.as_str();
info!("creating BigqueryClient"); info!("creating BigqueryClient");
let client = BigqueryClient::new(project_id, dataset_id, Some(service_account_path)).await?; let client = BigqueryClient::new(project_id, dataset_id, Some(service_account_path))
.await
.map_err(|e| anyhow!("{}", e))?;
info!("creating twitch client"); info!("creating twitch client");
let twitch_client = twitch_data::get_client().await?; let twitch_client = twitch_data::get_client()
.await
.map_err(|e| anyhow!("{}", e))?;
info!("Starting main loop"); info!("Starting main loop");
'main_loop: loop { 'main_loop: loop {
trace!("Beginning of main loop"); trace!("Beginning of main loop");
@@ -98,7 +116,9 @@ pub async fn start_backup() -> Result<()> {
trace!("Checking for new videos"); trace!("Checking for new videos");
check_for_new_videos(&client, &twitch_client).await?; check_for_new_videos(&client, &twitch_client).await?;
trace!("backing up not downloaded videos"); trace!("backing up not downloaded videos");
backup_not_downloaded_videos(&client, &twitch_client, &config).await?; backup_not_downloaded_videos(&client, &twitch_client, &config)
.await
.map_err(|e| anyhow!("{}", e))?;
//sleep for an hour //sleep for an hour
info!("Sleeping for an hour"); info!("Sleeping for an hour");
@@ -109,16 +129,21 @@ pub async fn start_backup() -> Result<()> {
async fn get_not_downloaded_videos_from_db( async fn get_not_downloaded_videos_from_db(
client: &BigqueryClient, client: &BigqueryClient,
) -> Result<impl Iterator<Item = impl Future<Output = Result<Option<data::VideoData>>>>> { ) -> Result<impl Iterator<Item = impl Future<Output = Result<Option<data::VideoData>>>+ '_>+ '_> {
//TODO: make sure that this is sorted by date (oldest first) //TODO: make sure that this is sorted by date (oldest first)
info!("getting not downloaded videos from db (metadata)"); info!("getting not downloaded videos from db (metadata)");
let mut video_metadata_list = data::VideoMetadata::load_by_field(
&client, let mut video_metadata_list = data::VideoMetadata::select()
name_of!(backed_up in data::VideoMetadata), .with_client(client.clone())
Some(false), .add_where_eq(name_of!(backed_up in data::VideoMetadata), Some(&false))
1000, .map_err(|e| anyhow!("{}", e))?
) .set_limit(1000)
.await?; .build_query()
.map_err(|e| anyhow!("{}", e))?
.run()
.await
.map_err(|e| anyhow!("{}", e))?
.map_err_with_data("Error getting not downloaded videos from db").map_err(|e|anyhow!("{}",e))?;
info!("getting not downloaded videos from db (videos)"); info!("getting not downloaded videos from db (videos)");
let amount = video_metadata_list.len(); let amount = video_metadata_list.len();
info!("got about {} videos", amount); info!("got about {} videos", amount);
@@ -131,17 +156,19 @@ async fn get_not_downloaded_videos_from_db(
i + 1, i + 1,
amount amount
); );
let v = data::Videos::load_from_pk(client, metadata.video_id).await?; let video_id = metadata.video_id.clone();
let v = data::Videos::get_by_pk(client.clone(), &video_id.clone()).await;
if let Some(video) = v { if let Ok(video) = v {
info!( info!(
"getting not downloaded videos from db (streamer): {}/{}", "getting not downloaded videos from db (streamer): {}/{}",
i + 1, i + 1,
amount amount
); );
let user_login = video.user_login.clone().unwrap().to_lowercase(); let user_login = video.user_login.clone().unwrap().to_lowercase();
let streamer = data::Streamers::load_from_pk(client, user_login.clone()).await?; let streamer =
if streamer.is_none() { data::Streamers::get_by_pk(client.clone(), &user_login.clone()).await;
if streamer.is_ok() {
// .expect(format!("Streamer with login not found: {}", user_login).as_str()); // .expect(format!("Streamer with login not found: {}", user_login).as_str());
warn!("Streamer with login not found: {}", user_login); warn!("Streamer with login not found: {}", user_login);
return Ok(None); return Ok(None);
@@ -216,13 +243,14 @@ async fn backup_not_downloaded_videos<'a>(
.as_str(), .as_str(),
), ),
) )
.await?; .await
.map_err(|e| anyhow!("{}", e))?;
info!("Uploading video to youtube"); info!("Uploading video to youtube");
let res = upload_video_to_youtube(&video_parts, &mut video, &youtube_client, config).await; let res = upload_video_to_youtube(&video_parts, &mut video, &youtube_client, config).await;
if let Err(e) = res { if let Err(e) = res {
info!("Error uploading video: {}", e); info!("Error uploading video: {}", e);
video.metadata.error = Some(e.to_string()); video.metadata.error = Some(e.to_string());
video.metadata.save_to_bigquery().await?; video.metadata.save().await.map_err(|e| anyhow!("{}", e))?;
} else { } else {
info!( info!(
"Video uploaded successfully: {}: {}", "Video uploaded successfully: {}: {}",
@@ -230,7 +258,7 @@ async fn backup_not_downloaded_videos<'a>(
video.video.title.as_ref().unwrap() video.video.title.as_ref().unwrap()
); );
video.metadata.backed_up = Some(true); video.metadata.backed_up = Some(true);
video.metadata.save_to_bigquery().await?; video.metadata.save().await.map_err(|e| anyhow!("{}", e))?;
} }
info!("Cleaning up video parts"); info!("Cleaning up video parts");
cleanup_video_parts(video_parts).await?; cleanup_video_parts(video_parts).await?;
@@ -251,7 +279,7 @@ async fn cleanup_video_parts(video_parts: Vec<PathBuf>) -> Result<()> {
async fn upload_video_to_youtube<'a>( async fn upload_video_to_youtube<'a>(
video_path: &Vec<PathBuf>, video_path: &Vec<PathBuf>,
mut video: &mut VideoData<'a>, mut video: &mut VideoData,
youtube_client: &YoutubeClient, youtube_client: &YoutubeClient,
config: &Config, config: &Config,
) -> Result<()> { ) -> Result<()> {
@@ -288,15 +316,18 @@ async fn upload_video_to_youtube<'a>(
config.youtube_tags.clone(), config.youtube_tags.clone(),
privacy, privacy,
) )
.await?; .await
.map_err(|e| anyhow!("{}", e))?;
let playlist_title = get_playlist_title_from_twitch_video(&video)?; let playlist_title = get_playlist_title_from_twitch_video(&video)?;
let playlist = youtube_client let playlist = youtube_client
.find_playlist_or_create_by_name(&playlist_title) .find_playlist_or_create_by_name(&playlist_title)
.await?; .await
.map_err(|e| anyhow!("{}", e))?;
youtube_client youtube_client
.add_video_to_playlist(&youtube_video, &playlist) .add_video_to_playlist(&youtube_video, &playlist)
.await?; .await
.map_err(|e| anyhow!("{}", e))?;
video.metadata.youtube_playlist_url = playlist.id; video.metadata.youtube_playlist_url = playlist.id;
} }
@@ -609,7 +640,7 @@ pub fn get_video_title_from_twitch_video(
), ),
}; };
let title = video.video.title.as_ref().ok_or("Video has no title")?; let title = video.video.title.as_ref().ok_or("Video has no title").map_err(|e|anyhow!("{}",e))?;
let title = cap_long_title(title)?; let title = cap_long_title(title)?;
let res = format!("{}{}", prefix, title); let res = format!("{}{}", prefix, title);
@@ -621,7 +652,7 @@ const PREFIX_LENGTH: usize = 24;
pub fn get_playlist_title_from_twitch_video(video: &data::VideoData) -> Result<String> { pub fn get_playlist_title_from_twitch_video(video: &data::VideoData) -> Result<String> {
trace!("get playlist title from twitch video"); trace!("get playlist title from twitch video");
let title = video.video.title.as_ref().ok_or("Video has no title")?; let title = video.video.title.as_ref().ok_or("Video has no title").map_err(|e|anyhow!("{}",e))?;
let date_str = get_date_string_from_video(video)?; let date_str = get_date_string_from_video(video)?;
let title = format!("{} {}", date_str, title,); let title = format!("{} {}", date_str, title,);
let title = cap_long_title(title)?; let title = cap_long_title(title)?;
@@ -658,7 +689,7 @@ fn get_date_string_from_video(video: &VideoData) -> Result<String> {
let created_at = video let created_at = video
.video .video
.created_at .created_at
.ok_or(format!("Video has no created_at time: {:?}", video.video).as_str())?; .ok_or(format!("Video has no created_at time: {:?}", video.video).as_str()).map_err(|e|anyhow!("{}",e))?;
// let created_at = created_at.format("%Y-%m-%d"); // let created_at = created_at.format("%Y-%m-%d");
let res = format!( let res = format!(
"[{:0>4}-{:0>2}-{:0>2}]", "[{:0>4}-{:0>2}-{:0>2}]",

View File

@@ -4,8 +4,8 @@ use std::error::Error;
use std::fmt::Debug; use std::fmt::Debug;
use std::path::Path; use std::path::Path;
use google_bigquery; use anyhow::{anyhow, Result};
use google_bigquery::{BigDataTable, BigqueryClient}; use google_bigquery_v2::prelude::*;
use google_youtube::scopes; use google_youtube::scopes;
use google_youtube::YoutubeClient; use google_youtube::YoutubeClient;
use log::{debug, error, info, trace, warn}; use log::{debug, error, info, trace, warn};
@@ -189,20 +189,33 @@ async fn sample_twitch<'a>(client: &TwitchClient<'a>) -> Result<(), Box<dyn Erro
async fn sample_bigquery<'a>(client: &'a BigqueryClient) -> Result<(), Box<dyn Error>> { async fn sample_bigquery<'a>(client: &'a BigqueryClient) -> Result<(), Box<dyn Error>> {
// let x = VideoMetadata::from_pk(&client, 1638184921).await?; // let x = VideoMetadata::from_pk(&client, 1638184921).await?;
let video_metadata = VideoMetadata::create_and_load_from_pk(&client, 1638184921).await?; // let video_metadata = VideoMetadata::create_and_load_from_pk(&client, 1638184921).await?;
info!("got video_metadata by id: {:?}", video_metadata); // info!("got video_metadata by id: {:?}", video_metadata);
let video_metadata = VideoMetadata::load_by_field( let video_metadata = VideoMetadata::select()
&client, .with_client(client.clone())
name_of!(backed_up in VideoMetadata), .add_where_eq(name_of!(backed_up in VideoMetadata), Some(&true)).map_err(|e|anyhow!("{}",e))?
Some(true), .set_limit(10)
10, .build_query()
) .map_err(|e| anyhow!("{}", e))?
.await?; .run()
.await
.map_err(|e| anyhow!("{}", e))?
.map_err_with_data("Select has to return data")
.map_err(|e| anyhow!("{}", e))?;
print_vec_sample("got video_metadata by backed_up:", video_metadata); print_vec_sample("got video_metadata by backed_up:", video_metadata);
let watched_streamers = let watched_streamers = Streamers::select()
Streamers::load_by_field(&client, name_of!(watched in Streamers), Some(true), 100).await?; .with_client(client.clone())
.add_where_eq(name_of!(watched in Streamers), Some(&true)).map_err(|e|anyhow!("{}",e))?
.set_limit(100)
.build_query()
.map_err(|e| anyhow!("{}", e))?
.run()
.await
.map_err(|e| anyhow!("{}", e))?
.map_err_with_data("Select has to return data")
.map_err(|e| anyhow!("{}", e))?;
print_vec_sample("got watched_streamers:", watched_streamers); print_vec_sample("got watched_streamers:", watched_streamers);
fn print_vec_sample<T: Debug>(message: &str, watched_streamers: Vec<T>) { fn print_vec_sample<T: Debug>(message: &str, watched_streamers: Vec<T>) {

1
src/prelude.rs Normal file
View File

@@ -0,0 +1 @@

View File

@@ -2,7 +2,7 @@ use std::path::{Path, PathBuf};
use chrono::{DateTime, NaiveDateTime, Utc}; use chrono::{DateTime, NaiveDateTime, Utc};
// use bigquery_googleapi::BigqueryClient; // use bigquery_googleapi::BigqueryClient;
use google_bigquery::BigqueryClient; use google_bigquery_v2::prelude::*;
use log::LevelFilter; use log::LevelFilter;
use log::{debug, info}; use log::{debug, info};
@@ -35,7 +35,7 @@ fn get_sample_video(client: &BigqueryClient) -> VideoData {
video: Videos { video: Videos {
created_at: Some(get_utc_from_string("2021-01-01T00:00:00")), created_at: Some(get_utc_from_string("2021-01-01T00:00:00")),
video_id: 1, video_id: 1,
client: Some(client), client: client.clone(),
title: Some("Test Video".to_string()), title: Some("Test Video".to_string()),
description: Some("Test Description".to_string()), description: Some("Test Description".to_string()),
bool_test: Some(true), bool_test: Some(true),
@@ -50,14 +50,14 @@ fn get_sample_video(client: &BigqueryClient) -> VideoData {
}, },
metadata: VideoMetadata { metadata: VideoMetadata {
video_id: 1, video_id: 1,
client: Some(client), client: client.clone(),
backed_up: Some(false), backed_up: Some(false),
..Default::default() ..Default::default()
}, },
streamer: Streamers { streamer: Streamers {
display_name: Some("NoPixel VODs".to_string()), display_name: Some("NoPixel VODs".to_string()),
login: "nopixelvods".to_string(), login: "nopixelvods".to_string(),
client: Some(client), client: client.clone(),
youtube_user: Some("NoPixel VODs".to_string()), youtube_user: Some("NoPixel VODs".to_string()),
watched: Some(true), watched: Some(true),
public_videos_default: Some(false), public_videos_default: Some(false),