From 986a9dba83282034e48a1aabdceac662525bab08 Mon Sep 17 00:00:00 2001 From: OMGeeky Date: Sat, 1 Apr 2023 14:45:17 +0200 Subject: [PATCH] builds on stable --- Cargo.toml | 3 +- src/bigquery.rs | 8 +-- src/lib.rs | 26 ++++++--- src/twitch.rs | 45 ++++++++++----- src/youtube.rs | 149 ++++++++++++++++++++---------------------------- 5 files changed, 112 insertions(+), 119 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e83e126..9d94d14 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,5 @@ tokio = "1.24.2" reqwest = "0.11.14" chrono = "0.4.23" google-youtube3 = "4.0" -mime = "0.2.6" serde_json = "1.0.91" -rand = "0.8.5" \ No newline at end of file +rand = "0.8.5" diff --git a/src/bigquery.rs b/src/bigquery.rs index 5252774..b9267dc 100644 --- a/src/bigquery.rs +++ b/src/bigquery.rs @@ -1,9 +1,3 @@ - use std::error::Error; - -// pub async fn check_backoff_bigquery(res: Result> )->Result>{ -// -// -// Ok(res) -// } +//TODO: implement bigquery backoff diff --git a/src/lib.rs b/src/lib.rs index 44a83fa..44550dc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,3 @@ -#![feature(async_closure)] use rand::Rng; const EXTRA_BUFFER_TIME: u64 = 100; @@ -9,20 +8,29 @@ pub enum Api { Bigquery, } -pub mod errors; -pub mod youtube; -pub mod twitch; pub mod bigquery; +pub mod errors; +pub mod twitch; +pub mod youtube; - +/// Sleeps for the given backoff time, plus some extra buffer time, plus some random extra time. +/// backoff_time is in seconds. +/// with_extra_buffer_time is a bool that determines whether or not to add the extra buffer time. async fn sleep_for_backoff_time(backoff_time: u64, with_extra_buffer_time: bool) { let extra_buffer_time = match with_extra_buffer_time { true => EXTRA_BUFFER_TIME, - false => 0 + false => 0, }; - let backoff_time = backoff_time * 1000 as u64; - // let random_extra = rand::thread_rng().gen_range(0..100); + //convert to milliseconds + let backoff_time = backoff_time * 1000; + + //add some random extra time for good measure (in milliseconds) let random_extra = rand::thread_rng().gen_range(0..100); - tokio::time::sleep(std::time::Duration::from_millis(backoff_time + extra_buffer_time + random_extra)).await; + let total_millis = backoff_time + extra_buffer_time + random_extra; + println!( + "sleep_for_backoff_time->Sleeping for {} milliseconds", + total_millis + ); + tokio::time::sleep(std::time::Duration::from_millis(total_millis)).await; } diff --git a/src/twitch.rs b/src/twitch.rs index ca76297..d2c76ef 100644 --- a/src/twitch.rs +++ b/src/twitch.rs @@ -1,15 +1,14 @@ use std::error::Error; use chrono::NaiveDateTime; -use reqwest::{Body, Client, IntoUrl, Request, RequestBuilder, Response}; use reqwest::header::{HeaderMap, HeaderValue}; +use reqwest::{Body, Client, IntoUrl, Request, RequestBuilder, Response}; use crate::errors::BackoffError; use crate::sleep_for_backoff_time; enum ErrorTypes { E429(String), - } //region reqwest //region convenience functions @@ -20,15 +19,23 @@ pub async fn check_backoff_twitch_get(url: T) -> Result(url: T, client: &Client) -> Result> { - check_backoff_twitch_with_client(Request::new(reqwest::Method::GET, url.into_url()?), client).await +pub async fn check_backoff_twitch_get_with_client( + url: T, + client: &Client, +) -> Result> { + check_backoff_twitch_with_client(Request::new(reqwest::Method::GET, url.into_url()?), client) + .await } //endregion //region POST -pub async fn check_backoff_twitch_post>(url: T, headers: Option, body: Option) -> Result> { +pub async fn check_backoff_twitch_post>( + url: T, + headers: Option, + body: Option, +) -> Result> { let client = Client::new(); check_backoff_twitch_post_with_client(url, headers, body, &client).await } @@ -37,7 +44,7 @@ pub async fn check_backoff_twitch_post_with_client>( url: T, headers: Option, body: Option, - client: &Client + client: &Client, ) -> Result> { let mut request = client.post(url.into_url()?); @@ -60,9 +67,14 @@ pub async fn check_backoff_twitch(request: Request) -> Result Result> { +pub async fn check_backoff_twitch_with_client( + request: Request, + client: &Client, +) -> Result> { loop { - let r: Request = request.try_clone().ok_or::("Request is None".into())?; + let r: Request = request + .try_clone() + .ok_or::("Request is None".into())?; // Some(v) => Ok(v), // None => Err("Request is None".into()), // }?; @@ -72,10 +84,13 @@ pub async fn check_backoff_twitch_with_client(request: Request, client: &Client) match status_code.as_u16() { 200 => return Ok(response), 429 => { - let x = &request.headers().get("Ratelimit-Reset").ok_or(BackoffError::new("No rate limit reset given"))?; + let x = &request + .headers() + .get("Ratelimit-Reset") + .ok_or(BackoffError::new("No rate limit reset given"))?; let value: String = x.to_str()?.to_string(); handle_e429(value).await?; - }, + } _ => return Ok(response), // _ => todo!("Handle other errors or "), @@ -85,18 +100,18 @@ pub async fn check_backoff_twitch_with_client(request: Request, client: &Client) async fn handle_e429(value: String) -> Result<(), Box> { let value = value.parse::()?; - let timestamp = NaiveDateTime::from_timestamp_opt(value, 0) - .ok_or(BackoffError::new(format!("Could not convert the provided timestamp: {}", value)))?; + let timestamp = NaiveDateTime::from_timestamp_opt(value, 0).ok_or(BackoffError::new( + format!("Could not convert the provided timestamp: {}", value), + ))?; let now = chrono::Local::now().naive_local(); if timestamp < now { - sleep_for_backoff_time(1000, true).await; + sleep_for_backoff_time(1, true).await; return Ok(()); } let duration = timestamp - now; - let duration = duration.num_milliseconds() as u64; + let duration = duration.num_seconds() as u64; println!("Sleeping for {} seconds", duration); sleep_for_backoff_time(duration, true).await; - // tokio::time::sleep(tokio::time::Duration::from_secs(duration)).await; //TODO: test this somehow Ok(()) } diff --git a/src/youtube.rs b/src/youtube.rs index b5adeac..e98f831 100644 --- a/src/youtube.rs +++ b/src/youtube.rs @@ -9,38 +9,35 @@ use google_youtube3::hyper::client::HttpConnector; use google_youtube3::hyper_rustls::HttpsConnector; use google_youtube3::YouTube; +use crate::sleep_for_backoff_time; + +/// the base number for the backoff +/// +/// gets used as base^n where n is the amount of backoffs const YOUTUBE_DEFAULT_BACKOFF_TIME_S: u64 = 2; +/// the max amount a single backoff can be const YOUTUBE_MAX_BACKOFF_TIME_S: u64 = 3600; +/// the max amount of backoffs that can be done +/// +/// after this amount of backoffs, the method will return Err() +const YOUTUBE_MAX_TRIES: u64 = 50;//should result in ~39 hours of maximum backoff time -struct UploadParameters { - video: Video, - path: PathBuf, - mime_type: mime::Mime -} -//TODO: implement backoff for other youtube calls - -async fn generic_check_backoff_youtube< +pub async fn generic_check_backoff_youtube<'a, 'b, 'c, T, - // Fut: Future, T)>, Box>> , + Para, Fut: Future, T), google_youtube3::Error>>, - Para> +> ( - client: &YouTube>, - para: &Para, - function: impl Fn(&YouTube>, &Para) -> Fut + client: &'a YouTube>, + para: &'b Para, + function: impl Fn(&'a YouTube>, &'b Para) -> Fut ) -> Result, T)>, Box> -// where Fut: Future, T)>> { let mut backoff = 0; let mut res: google_youtube3::Result<(Response, T)>; 'try_upload: loop { - // let stream = tokio::fs::File::open(&path).await?; - // let stream = stream.into_std().await; - // println!("Uploading video ({}): {:?}", backoff, path.as_ref().to_str()); - // - // res = client.videos().insert(video.clone()).upload(stream, mime_type.clone()).await; res = function(&client, para).await; match res { Ok(_) => break 'try_upload, @@ -48,17 +45,13 @@ async fn generic_check_backoff_youtube< println!("Error: {}", e); if let BadRequest(e1) = &e { let is_quota_error = get_is_quota_error(&e1); - backoff += 1; - println!("is_quota_error: {}", is_quota_error); if is_quota_error { - let backoff_time = YOUTUBE_DEFAULT_BACKOFF_TIME_S.pow(backoff); - println!("backoff_time: {}", backoff_time); - if backoff_time > YOUTUBE_MAX_BACKOFF_TIME_S { + println!("quota_error: {}", e); + backoff += 1; + if !wait_for_backoff(backoff).await { return Err(e.into()); } - //TODO: test this backoff - tokio::time::sleep(std::time::Duration::from_millis(backoff_time * 1000)).await; } else { return Err(e.into()); } @@ -73,69 +66,51 @@ async fn generic_check_backoff_youtube< Ok(res) } -pub async fn check_backoff_youtube_upload(client: &YouTube>, - video: Video, - path: impl AsRef, - mime_type: mime::Mime) - -> Result, Video)>, Box> -{ - let params = UploadParameters { - video: video.clone(), - path: path.as_ref().into(), - mime_type: mime_type.clone() - }; - - async fn function(client: &YouTube>, para: &UploadParameters) - -> Result<(Response, Video), google_youtube3::Error> { - // let para = para.get_parameters(); - // let stream = tokio::fs::File::open(¶.path).await?; - // let stream = stream.into_std().await; - let stream = std::fs::File::open(¶.path)?; - // println!("Uploading video ({}): {:?}", backoff, path.as_ref().to_str()); - client.videos().insert(para.video.clone()).upload(stream, para.mime_type.clone()).await +async fn wait_for_backoff<'a>(backoff: u32) -> bool { + let mut backoff_time = YOUTUBE_DEFAULT_BACKOFF_TIME_S.pow(backoff); + println!("backoff_time: {}", backoff_time); + if backoff as u64 > YOUTUBE_MAX_TRIES { + return false; } - let res = generic_check_backoff_youtube(client, ¶ms, function).await??; - - - // let mut backoff = 0; - // let mut res: google_youtube3::Result<(Response, Video)>; - // 'try_upload: loop { - // let stream = tokio::fs::File::open(&path).await?; - // let stream = stream.into_std().await; - // println!("Uploading video ({}): {:?}", backoff, path.as_ref().to_str()); - // res = client.videos().insert(video.clone()).upload(stream, mime_type.clone()).await; - // match res { - // Ok(_) => break 'try_upload, - // Err(e) => { - // println!("Error: {}", e); - // if let BadRequest(e1) = &e { - // let is_quota_error = get_is_quota_error(&e1); - // backoff += 1; - // - // println!("is_quota_error: {}", is_quota_error); - // if is_quota_error { - // let backoff_time = YOUTUBE_DEFAULT_BACKOFF_TIME_S.pow(backoff); - // println!("backoff_time: {}", backoff_time); - // if backoff_time > YOUTUBE_MAX_BACKOFF_TIME_S { - // return Err(e.into()); - // } - // //TODO: test this backoff - // tokio::time::sleep(std::time::Duration::from_millis(backoff_time * 1000)).await; - // } else { - // return Err(e.into()); - // } - // } else { - // return Err(e.into()); - // } - // } - // } - // } - // - - let res: google_youtube3::Result<(Response, Video)> = Ok(res); - Ok(res) + if backoff_time > YOUTUBE_MAX_BACKOFF_TIME_S { + backoff_time = YOUTUBE_MAX_BACKOFF_TIME_S; + } + sleep_for_backoff_time(backoff_time, false).await; + true } +// +// pub async fn check_backoff_youtube_upload(client: &YouTube>, +// video: Video, +// path: impl AsRef, +// mime_type: mime::Mime) +// -> Result, Video)>, Box> +// { +// struct UploadParameters { +// video: Video, +// path: PathBuf, +// mime_type: mime::Mime +// } +// +// let params = UploadParameters { +// video: video.clone(), +// path: path.as_ref().into(), +// mime_type: mime_type.clone() +// }; +// +// async fn function(client: &YouTube>, para: &UploadParameters) +// -> Result<(Response, Video), google_youtube3::Error> { +// let stream = tokio::fs::File::open(¶.path).await?; +// let stream = stream.into_std().await; +// client.videos().insert(para.video.clone()).upload(stream, para.mime_type.clone()).await +// } +// let res = generic_check_backoff_youtube:: +// (client, ¶ms, function).await??; +// +// let res: google_youtube3::Result<(Response, Video)> = Ok(res); +// Ok(res) +// } + fn get_is_quota_error(e: &serde_json::value::Value) -> bool { let is_quota_error = e.get("error") .and_then(|e| e.get("errors")) @@ -145,6 +120,8 @@ fn get_is_quota_error(e: &serde_json::value::Value) -> bool { .and_then(|e| if e == "quotaExceeded" { Some(()) + } else if e == "uploadLimitExceeded" { + Some(()) } else { None }