builds on stable

This commit is contained in:
OMGeeky
2023-04-01 14:45:17 +02:00
parent fc0346a790
commit 986a9dba83
5 changed files with 112 additions and 119 deletions

View File

@@ -10,6 +10,5 @@ tokio = "1.24.2"
reqwest = "0.11.14"
chrono = "0.4.23"
google-youtube3 = "4.0"
mime = "0.2.6"
serde_json = "1.0.91"
rand = "0.8.5"
rand = "0.8.5"

View File

@@ -1,9 +1,3 @@
use std::error::Error;
// pub async fn check_backoff_bigquery<T>(res: Result<T, Box<dyn Error>> )->Result<T, Box<dyn Error>>{
//
//
// Ok(res)
// }
//TODO: implement bigquery backoff

View File

@@ -1,4 +1,3 @@
#![feature(async_closure)]
use rand::Rng;
const EXTRA_BUFFER_TIME: u64 = 100;
@@ -9,20 +8,29 @@ pub enum Api {
Bigquery,
}
pub mod errors;
pub mod youtube;
pub mod twitch;
pub mod bigquery;
pub mod errors;
pub mod twitch;
pub mod youtube;
/// Sleeps for the given backoff time, plus some extra buffer time, plus some random extra time.
/// backoff_time is in seconds.
/// with_extra_buffer_time is a bool that determines whether or not to add the extra buffer time.
async fn sleep_for_backoff_time(backoff_time: u64, with_extra_buffer_time: bool) {
let extra_buffer_time = match with_extra_buffer_time {
true => EXTRA_BUFFER_TIME,
false => 0
false => 0,
};
let backoff_time = backoff_time * 1000 as u64;
// let random_extra = rand::thread_rng().gen_range(0..100);
//convert to milliseconds
let backoff_time = backoff_time * 1000;
//add some random extra time for good measure (in milliseconds)
let random_extra = rand::thread_rng().gen_range(0..100);
tokio::time::sleep(std::time::Duration::from_millis(backoff_time + extra_buffer_time + random_extra)).await;
let total_millis = backoff_time + extra_buffer_time + random_extra;
println!(
"sleep_for_backoff_time->Sleeping for {} milliseconds",
total_millis
);
tokio::time::sleep(std::time::Duration::from_millis(total_millis)).await;
}

View File

@@ -1,15 +1,14 @@
use std::error::Error;
use chrono::NaiveDateTime;
use reqwest::{Body, Client, IntoUrl, Request, RequestBuilder, Response};
use reqwest::header::{HeaderMap, HeaderValue};
use reqwest::{Body, Client, IntoUrl, Request, RequestBuilder, Response};
use crate::errors::BackoffError;
use crate::sleep_for_backoff_time;
enum ErrorTypes {
E429(String),
}
//region reqwest
//region convenience functions
@@ -20,15 +19,23 @@ pub async fn check_backoff_twitch_get<T: IntoUrl>(url: T) -> Result<Response, Bo
check_backoff_twitch(Request::new(reqwest::Method::GET, url.into_url()?)).await
}
pub async fn check_backoff_twitch_get_with_client<T: IntoUrl>(url: T, client: &Client) -> Result<Response, Box<dyn Error>> {
check_backoff_twitch_with_client(Request::new(reqwest::Method::GET, url.into_url()?), client).await
pub async fn check_backoff_twitch_get_with_client<T: IntoUrl>(
url: T,
client: &Client,
) -> Result<Response, Box<dyn Error>> {
check_backoff_twitch_with_client(Request::new(reqwest::Method::GET, url.into_url()?), client)
.await
}
//endregion
//region POST
pub async fn check_backoff_twitch_post<T: IntoUrl, B: Into<Body>>(url: T, headers: Option<HeaderMap>, body: Option<B>) -> Result<Response, Box<dyn Error>> {
pub async fn check_backoff_twitch_post<T: IntoUrl, B: Into<Body>>(
url: T,
headers: Option<HeaderMap>,
body: Option<B>,
) -> Result<Response, Box<dyn Error>> {
let client = Client::new();
check_backoff_twitch_post_with_client(url, headers, body, &client).await
}
@@ -37,7 +44,7 @@ pub async fn check_backoff_twitch_post_with_client<T: IntoUrl, B: Into<Body>>(
url: T,
headers: Option<HeaderMap>,
body: Option<B>,
client: &Client
client: &Client,
) -> Result<Response, Box<dyn Error>> {
let mut request = client.post(url.into_url()?);
@@ -60,9 +67,14 @@ pub async fn check_backoff_twitch(request: Request) -> Result<Response, Box<dyn
//endregion
pub async fn check_backoff_twitch_with_client(request: Request, client: &Client) -> Result<Response, Box<dyn Error>> {
pub async fn check_backoff_twitch_with_client(
request: Request,
client: &Client,
) -> Result<Response, Box<dyn Error>> {
loop {
let r: Request = request.try_clone().ok_or::<BackoffError>("Request is None".into())?;
let r: Request = request
.try_clone()
.ok_or::<BackoffError>("Request is None".into())?;
// Some(v) => Ok(v),
// None => Err("Request is None".into()),
// }?;
@@ -72,10 +84,13 @@ pub async fn check_backoff_twitch_with_client(request: Request, client: &Client)
match status_code.as_u16() {
200 => return Ok(response),
429 => {
let x = &request.headers().get("Ratelimit-Reset").ok_or(BackoffError::new("No rate limit reset given"))?;
let x = &request
.headers()
.get("Ratelimit-Reset")
.ok_or(BackoffError::new("No rate limit reset given"))?;
let value: String = x.to_str()?.to_string();
handle_e429(value).await?;
},
}
_ => return Ok(response),
// _ => todo!("Handle other errors or "),
@@ -85,18 +100,18 @@ pub async fn check_backoff_twitch_with_client(request: Request, client: &Client)
async fn handle_e429(value: String) -> Result<(), Box<dyn Error>> {
let value = value.parse::<i64>()?;
let timestamp = NaiveDateTime::from_timestamp_opt(value, 0)
.ok_or(BackoffError::new(format!("Could not convert the provided timestamp: {}", value)))?;
let timestamp = NaiveDateTime::from_timestamp_opt(value, 0).ok_or(BackoffError::new(
format!("Could not convert the provided timestamp: {}", value),
))?;
let now = chrono::Local::now().naive_local();
if timestamp < now {
sleep_for_backoff_time(1000, true).await;
sleep_for_backoff_time(1, true).await;
return Ok(());
}
let duration = timestamp - now;
let duration = duration.num_milliseconds() as u64;
let duration = duration.num_seconds() as u64;
println!("Sleeping for {} seconds", duration);
sleep_for_backoff_time(duration, true).await;
// tokio::time::sleep(tokio::time::Duration::from_secs(duration)).await;
//TODO: test this somehow
Ok(())
}

View File

@@ -9,38 +9,35 @@ use google_youtube3::hyper::client::HttpConnector;
use google_youtube3::hyper_rustls::HttpsConnector;
use google_youtube3::YouTube;
use crate::sleep_for_backoff_time;
/// the base number for the backoff
///
/// gets used as base^n where n is the amount of backoffs
const YOUTUBE_DEFAULT_BACKOFF_TIME_S: u64 = 2;
/// the max amount a single backoff can be
const YOUTUBE_MAX_BACKOFF_TIME_S: u64 = 3600;
/// the max amount of backoffs that can be done
///
/// after this amount of backoffs, the method will return Err()
const YOUTUBE_MAX_TRIES: u64 = 50;//should result in ~39 hours of maximum backoff time
struct UploadParameters {
video: Video,
path: PathBuf,
mime_type: mime::Mime
}
//TODO: implement backoff for other youtube calls
async fn generic_check_backoff_youtube<
pub async fn generic_check_backoff_youtube<'a, 'b, 'c,
T,
// Fut: Future<Output=Result<google_youtube3::Result<(Response<Body>, T)>, Box<dyn Error>>> ,
Para,
Fut: Future<Output=Result<(Response<Body>, T), google_youtube3::Error>>,
Para>
>
(
client: &YouTube<HttpsConnector<HttpConnector>>,
para: &Para,
function: impl Fn(&YouTube<HttpsConnector<HttpConnector>>, &Para) -> Fut
client: &'a YouTube<HttpsConnector<HttpConnector>>,
para: &'b Para,
function: impl Fn(&'a YouTube<HttpsConnector<HttpConnector>>, &'b Para) -> Fut
)
-> Result<google_youtube3::Result<(Response<Body>, T)>, Box<dyn Error>>
// where Fut: Future<Output=google_youtube3::Result<(Response<Body>, T)>>
{
let mut backoff = 0;
let mut res: google_youtube3::Result<(Response<Body>, T)>;
'try_upload: loop {
// let stream = tokio::fs::File::open(&path).await?;
// let stream = stream.into_std().await;
// println!("Uploading video ({}): {:?}", backoff, path.as_ref().to_str());
//
// res = client.videos().insert(video.clone()).upload(stream, mime_type.clone()).await;
res = function(&client, para).await;
match res {
Ok(_) => break 'try_upload,
@@ -48,17 +45,13 @@ async fn generic_check_backoff_youtube<
println!("Error: {}", e);
if let BadRequest(e1) = &e {
let is_quota_error = get_is_quota_error(&e1);
backoff += 1;
println!("is_quota_error: {}", is_quota_error);
if is_quota_error {
let backoff_time = YOUTUBE_DEFAULT_BACKOFF_TIME_S.pow(backoff);
println!("backoff_time: {}", backoff_time);
if backoff_time > YOUTUBE_MAX_BACKOFF_TIME_S {
println!("quota_error: {}", e);
backoff += 1;
if !wait_for_backoff(backoff).await {
return Err(e.into());
}
//TODO: test this backoff
tokio::time::sleep(std::time::Duration::from_millis(backoff_time * 1000)).await;
} else {
return Err(e.into());
}
@@ -73,69 +66,51 @@ async fn generic_check_backoff_youtube<
Ok(res)
}
pub async fn check_backoff_youtube_upload(client: &YouTube<HttpsConnector<HttpConnector>>,
video: Video,
path: impl AsRef<Path>,
mime_type: mime::Mime)
-> Result<google_youtube3::Result<(Response<Body>, Video)>, Box<dyn Error>>
{
let params = UploadParameters {
video: video.clone(),
path: path.as_ref().into(),
mime_type: mime_type.clone()
};
async fn function(client: &YouTube<HttpsConnector<HttpConnector>>, para: &UploadParameters)
-> Result<(Response<Body>, Video), google_youtube3::Error> {
// let para = para.get_parameters();
// let stream = tokio::fs::File::open(&para.path).await?;
// let stream = stream.into_std().await;
let stream = std::fs::File::open(&para.path)?;
// println!("Uploading video ({}): {:?}", backoff, path.as_ref().to_str());
client.videos().insert(para.video.clone()).upload(stream, para.mime_type.clone()).await
async fn wait_for_backoff<'a>(backoff: u32) -> bool {
let mut backoff_time = YOUTUBE_DEFAULT_BACKOFF_TIME_S.pow(backoff);
println!("backoff_time: {}", backoff_time);
if backoff as u64 > YOUTUBE_MAX_TRIES {
return false;
}
let res = generic_check_backoff_youtube(client, &params, function).await??;
// let mut backoff = 0;
// let mut res: google_youtube3::Result<(Response<Body>, Video)>;
// 'try_upload: loop {
// let stream = tokio::fs::File::open(&path).await?;
// let stream = stream.into_std().await;
// println!("Uploading video ({}): {:?}", backoff, path.as_ref().to_str());
// res = client.videos().insert(video.clone()).upload(stream, mime_type.clone()).await;
// match res {
// Ok(_) => break 'try_upload,
// Err(e) => {
// println!("Error: {}", e);
// if let BadRequest(e1) = &e {
// let is_quota_error = get_is_quota_error(&e1);
// backoff += 1;
//
// println!("is_quota_error: {}", is_quota_error);
// if is_quota_error {
// let backoff_time = YOUTUBE_DEFAULT_BACKOFF_TIME_S.pow(backoff);
// println!("backoff_time: {}", backoff_time);
// if backoff_time > YOUTUBE_MAX_BACKOFF_TIME_S {
// return Err(e.into());
// }
// //TODO: test this backoff
// tokio::time::sleep(std::time::Duration::from_millis(backoff_time * 1000)).await;
// } else {
// return Err(e.into());
// }
// } else {
// return Err(e.into());
// }
// }
// }
// }
//
let res: google_youtube3::Result<(Response<Body>, Video)> = Ok(res);
Ok(res)
if backoff_time > YOUTUBE_MAX_BACKOFF_TIME_S {
backoff_time = YOUTUBE_MAX_BACKOFF_TIME_S;
}
sleep_for_backoff_time(backoff_time, false).await;
true
}
//
// pub async fn check_backoff_youtube_upload(client: &YouTube<HttpsConnector<HttpConnector>>,
// video: Video,
// path: impl AsRef<Path>,
// mime_type: mime::Mime)
// -> Result<google_youtube3::Result<(Response<Body>, Video)>, Box<dyn Error>>
// {
// struct UploadParameters {
// video: Video,
// path: PathBuf,
// mime_type: mime::Mime
// }
//
// let params = UploadParameters {
// video: video.clone(),
// path: path.as_ref().into(),
// mime_type: mime_type.clone()
// };
//
// async fn function(client: &YouTube<HttpsConnector<HttpConnector>>, para: &UploadParameters)
// -> Result<(Response<Body>, Video), google_youtube3::Error> {
// let stream = tokio::fs::File::open(&para.path).await?;
// let stream = stream.into_std().await;
// client.videos().insert(para.video.clone()).upload(stream, para.mime_type.clone()).await
// }
// let res = generic_check_backoff_youtube::<Video, UploadParameters, _>
// (client, &params, function).await??;
//
// let res: google_youtube3::Result<(Response<Body>, Video)> = Ok(res);
// Ok(res)
// }
fn get_is_quota_error(e: &serde_json::value::Value) -> bool {
let is_quota_error = e.get("error")
.and_then(|e| e.get("errors"))
@@ -145,6 +120,8 @@ fn get_is_quota_error(e: &serde_json::value::Value) -> bool {
.and_then(|e|
if e == "quotaExceeded" {
Some(())
} else if e == "uploadLimitExceeded" {
Some(())
} else {
None
}