diff --git a/Cargo.toml b/Cargo.toml index 80b5201f71..e6102dc7aa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ name = "client" path = "src/rust/lib.rs" [dependencies] +google-apis-common = { version = "4.0", path = "google-apis-common" } clap = "2" http = "^0.2" hyper = "0.14" diff --git a/google-apis-common/Cargo.toml b/google-apis-common/Cargo.toml new file mode 100644 index 0000000000..f10eb52cee --- /dev/null +++ b/google-apis-common/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "google-apis-common" +version = "4.0.0" +authors = ["Sebastian Thiel "] +repository = "https://github.com/Byron/google-apis-rs" +homepage = "https://github.com/Byron/google-apis-rs/google-apis-common" +documentation = "https://docs.rs/google-apis-common" +license = "MIT" +keywords = ["google", "web", "api", "common"] +edition = "2021" + + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +mime = "^ 0.2.0" +serde = "^ 1.0" +serde_json = "^ 1.0" +## TODO: Make yup-oauth2 optional +## yup-oauth2 = { version = "^ 7.0", optional = true } +yup-oauth2 = "^ 7.0" +itertools = "^ 0.10" +hyper = "^ 0.14" +http = "^0.2" +tokio = "^1.0" +tower-service = "^0.3.1" + + diff --git a/google-apis-common/src/lib.rs b/google-apis-common/src/lib.rs new file mode 100644 index 0000000000..b593198145 --- /dev/null +++ b/google-apis-common/src/lib.rs @@ -0,0 +1,863 @@ +use std::error; +use std::error::Error as StdError; +use std::fmt::{self, Display}; +use std::future::Future; +use std::io::{self, Cursor, Read, Seek, SeekFrom, Write}; +use std::pin::Pin; +use std::str::FromStr; +use std::thread::sleep; +use std::time::Duration; + +use itertools::Itertools; + +use hyper::http::Uri; + +use hyper::header::{HeaderMap, AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE, USER_AGENT}; +use hyper::Method; +use hyper::StatusCode; + +use mime::{Attr, Mime, SubLevel, TopLevel, Value}; + +use serde_json as json; + +use tokio::io::{AsyncRead, AsyncWrite}; +use tower_service; + +pub use yup_oauth2 as oauth2; + +const LINE_ENDING: &str = "\r\n"; + +pub enum Retry { + /// Signal you don't want to retry + Abort, + /// Signals you want to retry after the given duration + After(Duration), +} + +/// Identifies the Hub. There is only one per library, this trait is supposed +/// to make intended use more explicit. +/// The hub allows to access all resource methods more easily. +pub trait Hub {} + +/// Identifies types for building methods of a particular resource type +pub trait MethodsBuilder {} + +/// Identifies types which represent builders for a particular resource method +pub trait CallBuilder {} + +/// Identifies types which can be inserted and deleted. +/// Types with this trait are most commonly used by clients of this API. +pub trait Resource {} + +/// Identifies types which are used in API responses. +pub trait ResponseResult {} + +/// Identifies types which are used in API requests. +pub trait RequestValue {} + +/// Identifies types which are not actually used by the API +/// This might be a bug within the google API schema. +pub trait UnusedType {} + +/// Identifies types which are only used as part of other types, which +/// usually are carrying the `Resource` trait. +pub trait Part {} + +/// Identifies types which are only used by other types internally. +/// They have no special meaning, this trait just marks them for completeness. +pub trait NestedType {} + +/// A utility to specify reader types which provide seeking capabilities too +pub trait ReadSeek: Seek + Read + Send {} +impl ReadSeek for T {} + +/// A trait for all types that can convert themselves into a *parts* string +pub trait ToParts { + fn to_parts(&self) -> String; +} + +/// A trait specifying functionality to help controlling any request performed by the API. +/// The trait has a conservative default implementation. +/// +/// It contains methods to deal with all common issues, as well with the ones related to +/// uploading media +pub trait Delegate: Send { + /// Called at the beginning of any API request. The delegate should store the method + /// information if he is interesting in knowing more context when further calls to it + /// are made. + /// The matching `finished()` call will always be made, no matter whether or not the API + /// request was successful. That way, the delegate may easily maintain a clean state + /// between various API calls. + fn begin(&mut self, _info: MethodInfo) {} + + /// Called whenever there is an [HttpError](hyper::Error), usually if there are network problems. + /// + /// If you choose to retry after a duration, the duration should be chosen using the + /// [exponential backoff algorithm](http://en.wikipedia.org/wiki/Exponential_backoff). + /// + /// Return retry information. + fn http_error(&mut self, _err: &hyper::Error) -> Retry { + Retry::Abort + } + + /// Called whenever there is the need for your applications API key after + /// the official authenticator implementation didn't provide one, for some reason. + /// If this method returns None as well, the underlying operation will fail + fn api_key(&mut self) -> Option { + None + } + + // TODO: Remove oauth2::Error + /// Called whenever the Authenticator didn't yield a token. The delegate + /// may attempt to provide one, or just take it as a general information about the + /// impending failure. + /// The given Error provides information about why the token couldn't be acquired in the + /// first place + fn token(&mut self, err: &oauth2::Error) -> Option { + let _ = err; + None + } + + /// Called during resumable uploads to provide a URL for the impending upload. + /// It was saved after a previous call to `store_upload_url(...)`, and if not None, + /// will be used instead of asking the server for a new upload URL. + /// This is useful in case a previous resumable upload was aborted/canceled, but should now + /// be resumed. + /// The returned URL will be used exactly once - if it fails again and the delegate allows + /// to retry, we will ask the server for a new upload URL. + fn upload_url(&mut self) -> Option { + None + } + + /// Called after we have retrieved a new upload URL for a resumable upload to store it + /// in case we fail or cancel. That way, we can attempt to resume the upload later, + /// see `upload_url()`. + /// It will also be called with None after a successful upload, which allows the delegate + /// to forget the URL. That way, we will not attempt to resume an upload that has already + /// finished. + fn store_upload_url(&mut self, url: Option<&str>) { + let _ = url; + } + + /// Called whenever a server response could not be decoded from json. + /// It's for informational purposes only, the caller will return with an error + /// accordingly. + /// + /// # Arguments + /// + /// * `json_encoded_value` - The json-encoded value which failed to decode. + /// * `json_decode_error` - The decoder error + fn response_json_decode_error( + &mut self, + json_encoded_value: &str, + json_decode_error: &json::Error, + ) { + let _ = json_encoded_value; + let _ = json_decode_error; + } + + /// Called whenever the http request returns with a non-success status code. + /// This can involve authentication issues, or anything else that very much + /// depends on the used API method. + /// The delegate should check the status, header and decoded json error to decide + /// whether to retry or not. In the latter case, the underlying call will fail. + /// + /// If you choose to retry after a duration, the duration should be chosen using the + /// [exponential backoff algorithm](http://en.wikipedia.org/wiki/Exponential_backoff). + fn http_failure( + &mut self, + _: &hyper::Response, + _err: Option, + ) -> Retry { + Retry::Abort + } + + /// Called prior to sending the main request of the given method. It can be used to time + /// the call or to print progress information. + /// It's also useful as you can be sure that a request will definitely be made. + fn pre_request(&mut self) {} + + /// Return the size of each chunk of a resumable upload. + /// Must be a power of two, with 1<<18 being the smallest allowed chunk size. + /// Will be called once before starting any resumable upload. + fn chunk_size(&mut self) -> u64 { + 1 << 23 + } + + /// Called before the given chunk is uploaded to the server. + /// If true is returned, the upload will be interrupted. + /// However, it may be resumable if you stored the upload URL in a previous call + /// to `store_upload_url()` + fn cancel_chunk_upload(&mut self, chunk: &ContentRange) -> bool { + let _ = chunk; + false + } + + /// Called before the API request method returns, in every case. It can be used to clean up + /// internal state between calls to the API. + /// This call always has a matching call to `begin(...)`. + /// + /// # Arguments + /// + /// * `is_success` - a true value indicates the operation was successful. If false, you should + /// discard all values stored during `store_upload_url`. + fn finished(&mut self, is_success: bool) { + let _ = is_success; + } +} + +/// A delegate with a conservative default implementation, which is used if no other delegate is +/// set. +#[derive(Default)] +pub struct DefaultDelegate; + +impl Delegate for DefaultDelegate {} + +#[derive(Debug)] +pub enum Error { + /// The http connection failed + HttpError(hyper::Error), + + /// An attempt was made to upload a resource with size stored in field `.0` + /// even though the maximum upload size is what is stored in field `.1`. + UploadSizeLimitExceeded(u64, u64), + + /// Represents information about a request that was not understood by the server. + /// Details are included. + BadRequest(serde_json::Value), + + /// We needed an API key for authentication, but didn't obtain one. + /// Neither through the authenticator, nor through the Delegate. + MissingAPIKey, + + // TODO: Remove oauth2::Error + /// We required a Token, but didn't get one from the Authenticator + MissingToken(oauth2::Error), + + /// The delgate instructed to cancel the operation + Cancelled, + + /// An additional, free form field clashed with one of the built-in optional ones + FieldClash(&'static str), + + /// Shows that we failed to decode the server response. + /// This can happen if the protocol changes in conjunction with strict json decoding. + JsonDecodeError(String, json::Error), + + /// Indicates an HTTP repsonse with a non-success status code + Failure(hyper::Response), + + /// An IO error occurred while reading a stream into memory + Io(std::io::Error), +} + +impl Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + Error::Io(ref err) => err.fmt(f), + Error::HttpError(ref err) => err.fmt(f), + Error::UploadSizeLimitExceeded(ref resource_size, ref max_size) => writeln!( + f, + "The media size {} exceeds the maximum allowed upload size of {}", + resource_size, max_size + ), + Error::MissingAPIKey => { + (writeln!( + f, + "The application's API key was not found in the configuration" + )) + .ok(); + writeln!( + f, + "It is used as there are no Scopes defined for this method." + ) + } + Error::BadRequest(ref message) => { + writeln!(f, "Bad Request: {}", message)?; + Ok(()) + } + // TODO: Remove oauth2::Error + Error::MissingToken(ref err) => { + writeln!(f, "Token retrieval failed with error: {}", err) + } + Error::Cancelled => writeln!(f, "Operation cancelled by delegate"), + Error::FieldClash(field) => writeln!( + f, + "The custom parameter '{}' is already provided natively by the CallBuilder.", + field + ), + Error::JsonDecodeError(ref json_str, ref err) => writeln!(f, "{}: {}", err, json_str), + Error::Failure(ref response) => { + writeln!(f, "Http status indicates failure: {:?}", response) + } + } + } +} + +impl error::Error for Error { + fn source(&self) -> Option<&(dyn error::Error + 'static)> { + match *self { + Error::HttpError(ref err) => err.source(), + Error::JsonDecodeError(_, ref err) => err.source(), + _ => None, + } + } +} + +impl From for Error { + fn from(err: std::io::Error) -> Self { + Error::Io(err) + } +} + +/// A universal result type used as return for all calls. +pub type Result = std::result::Result; + +/// Contains information about an API request. +pub struct MethodInfo { + pub id: &'static str, + pub http_method: Method, +} + +const BOUNDARY: &str = "MDuXWGyeE33QFXGchb2VFWc4Z7945d"; + +/// Provides a `Read` interface that converts multiple parts into the protocol +/// identified by [RFC2387](https://tools.ietf.org/html/rfc2387). +/// **Note**: This implementation is just as rich as it needs to be to perform uploads +/// to google APIs, and might not be a fully-featured implementation. +#[derive(Default)] +pub struct MultiPartReader<'a> { + raw_parts: Vec<(HeaderMap, &'a mut (dyn Read + Send))>, + current_part: Option<(Cursor>, &'a mut (dyn Read + Send))>, + last_part_boundary: Option>>, +} + +impl<'a> MultiPartReader<'a> { + /// Reserve memory for exactly the given amount of parts + pub fn reserve_exact(&mut self, cap: usize) { + self.raw_parts.reserve_exact(cap); + } + + /// Add a new part to the queue of parts to be read on the first `read` call. + /// + /// # Arguments + /// + /// `headers` - identifying the body of the part. It's similar to the header + /// in an ordinary single-part call, and should thus contain the + /// same information. + /// `reader` - a reader providing the part's body + /// `size` - the amount of bytes provided by the reader. It will be put onto the header as + /// content-size. + /// `mime` - It will be put onto the content type + pub fn add_part( + &mut self, + reader: &'a mut (dyn Read + Send), + size: u64, + mime_type: Mime, + ) -> &mut MultiPartReader<'a> { + let mut headers = HeaderMap::new(); + headers.insert( + CONTENT_TYPE, + hyper::header::HeaderValue::from_str(&format!("{}", mime_type)).unwrap(), + ); + headers.insert(CONTENT_LENGTH, size.into()); + self.raw_parts.push((headers, reader)); + self + } + + /// Returns the mime-type representing our multi-part message. + /// Use it with the ContentType header. + pub fn mime_type(&self) -> Mime { + Mime( + TopLevel::Multipart, + SubLevel::Ext("related".to_string()), + vec![( + Attr::Ext("boundary".to_string()), + Value::Ext(BOUNDARY.to_string()), + )], + ) + } + + /// Returns true if we are totally used + fn is_depleted(&self) -> bool { + self.raw_parts.is_empty() + && self.current_part.is_none() + && self.last_part_boundary.is_none() + } + + /// Returns true if we are handling our last part + fn is_last_part(&self) -> bool { + self.raw_parts.is_empty() && self.current_part.is_some() + } +} + +impl<'a> Read for MultiPartReader<'a> { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + match ( + self.raw_parts.len(), + self.current_part.is_none(), + self.last_part_boundary.is_none(), + ) { + (_, _, false) => { + let br = self + .last_part_boundary + .as_mut() + .unwrap() + .read(buf) + .unwrap_or(0); + if br < buf.len() { + self.last_part_boundary = None; + } + return Ok(br); + } + (0, true, true) => return Ok(0), + (n, true, _) if n > 0 => { + let (headers, reader) = self.raw_parts.remove(0); + let mut c = Cursor::new(Vec::::new()); + // TODO: The first line ending should be omitted for the first part, + // fortunately Google's API serves don't seem to mind. + (write!( + &mut c, + "{}--{}{}{}{}{}", + LINE_ENDING, + BOUNDARY, + LINE_ENDING, + headers + .iter() + .map(|(k, v)| format!("{}: {}", k, v.to_str().unwrap())) + .join(LINE_ENDING), + LINE_ENDING, + LINE_ENDING, + )) + .unwrap(); + c.seek(SeekFrom::Start(0)).unwrap(); + self.current_part = Some((c, reader)); + } + _ => {} + } + + // read headers as long as possible + let (hb, rr) = { + let &mut (ref mut c, ref mut reader) = self.current_part.as_mut().unwrap(); + let b = c.read(buf).unwrap_or(0); + (b, reader.read(&mut buf[b..])) + }; + + match rr { + Ok(bytes_read) => { + if hb < buf.len() && bytes_read == 0 { + if self.is_last_part() { + // before clearing the last part, we will add the boundary that + // will be written last + self.last_part_boundary = Some(Cursor::new( + format!("{}--{}--{}", LINE_ENDING, BOUNDARY, LINE_ENDING).into_bytes(), + )) + } + // We are depleted - this can trigger the next part to come in + self.current_part = None; + } + let mut total_bytes_read = hb + bytes_read; + while total_bytes_read < buf.len() && !self.is_depleted() { + match self.read(&mut buf[total_bytes_read..]) { + Ok(br) => total_bytes_read += br, + Err(err) => return Err(err), + } + } + Ok(total_bytes_read) + } + Err(err) => { + // fail permanently + self.current_part = None; + self.last_part_boundary = None; + self.raw_parts.clear(); + Err(err) + } + } + } +} + +/// The `X-Upload-Content-Type` header. +/// +/// Generated via rustc --pretty expanded -Z unstable-options, and manually +/// processed to be more readable. +#[derive(PartialEq, Debug, Clone)] +pub struct XUploadContentType(pub Mime); + +impl ::std::ops::Deref for XUploadContentType { + type Target = Mime; + fn deref(&self) -> &Mime { + &self.0 + } +} +impl ::std::ops::DerefMut for XUploadContentType { + fn deref_mut(&mut self) -> &mut Mime { + &mut self.0 + } +} +impl Display for XUploadContentType { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } +} + +#[derive(Clone, PartialEq, Debug)] +pub struct Chunk { + pub first: u64, + pub last: u64, +} + +impl fmt::Display for Chunk { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + (write!(fmt, "{}-{}", self.first, self.last)).ok(); + Ok(()) + } +} + +impl FromStr for Chunk { + type Err = &'static str; + + /// NOTE: only implements `%i-%i`, not `*` + fn from_str(s: &str) -> std::result::Result { + let parts: Vec<&str> = s.split('-').collect(); + if parts.len() != 2 { + return Err("Expected two parts: %i-%i"); + } + Ok(Chunk { + first: match FromStr::from_str(parts[0]) { + Ok(d) => d, + _ => return Err("Couldn't parse 'first' as digit"), + }, + last: match FromStr::from_str(parts[1]) { + Ok(d) => d, + _ => return Err("Couldn't parse 'last' as digit"), + }, + }) + } +} + +/// Implements the Content-Range header, for serialization only +#[derive(Clone, PartialEq, Debug)] +pub struct ContentRange { + pub range: Option, + pub total_length: u64, +} + +impl ContentRange { + pub fn header_value(&self) -> String { + format!( + "bytes {}/{}", + match self.range { + Some(ref c) => format!("{}", c), + None => "*".to_string(), + }, + self.total_length + ) + } +} + +#[derive(Clone, PartialEq, Debug)] +pub struct RangeResponseHeader(pub Chunk); + +impl RangeResponseHeader { + fn from_bytes(raw: &[u8]) -> Self { + if !raw.is_empty() { + if let Ok(s) = std::str::from_utf8(raw) { + const PREFIX: &str = "bytes "; + if let Some(stripped) = s.strip_prefix(PREFIX) { + if let Ok(c) = ::from_str(&stripped) { + return RangeResponseHeader(c); + } + } + } + } + + panic!("Unable to parse Range header {:?}", raw) + } +} + +/// A utility type to perform a resumable upload from start to end. +pub struct ResumableUploadHelper<'a, A: 'a, S> +where + S: tower_service::Service + Clone + Send + Sync + 'static, + S::Response: hyper::client::connect::Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static, + S::Future: Send + Unpin + 'static, + S::Error: Into>, +{ + pub client: &'a hyper::client::Client< + S, + hyper::body::Body, + >, + pub delegate: &'a mut dyn Delegate, + pub start_at: Option, + pub auth: &'a A, + pub user_agent: &'a str, + pub auth_header: String, + pub url: &'a str, + pub reader: &'a mut dyn ReadSeek, + pub media_type: Mime, + pub content_length: u64, +} + +impl<'a, A, S> ResumableUploadHelper<'a, A, S> +where + S: tower_service::Service + Clone + Send + Sync + 'static, + S::Response: hyper::client::connect::Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static, + S::Future: Send + Unpin + 'static, + S::Error: Into>, +{ + async fn query_transfer_status( + &mut self, + ) -> std::result::Result>> { + loop { + match self + .client + .request( + hyper::Request::builder() + .method(hyper::Method::POST) + .uri(self.url) + .header(USER_AGENT, self.user_agent.to_string()) + .header( + "Content-Range", + ContentRange { + range: None, + total_length: self.content_length, + } + .header_value(), + ) + .header(AUTHORIZATION, self.auth_header.clone()) + .body(hyper::body::Body::empty()) + .unwrap(), + ) + .await + { + Ok(r) => { + // 308 = resume-incomplete == PermanentRedirect + let headers = r.headers().clone(); + let h: RangeResponseHeader = match headers.get("Range") { + Some(hh) if r.status() == StatusCode::PERMANENT_REDIRECT => { + RangeResponseHeader::from_bytes(hh.as_bytes()) + } + None | Some(_) => { + if let Retry::After(d) = self.delegate.http_failure(&r, None) { + sleep(d); + continue; + } + return Err(Ok(r)); + } + }; + return Ok(h.0.last); + } + Err(err) => { + if let Retry::After(d) = self.delegate.http_error(&err) { + sleep(d); + continue; + } + return Err(Err(err)); + } + } + } + } + + /// returns None if operation was cancelled by delegate, or the HttpResult. + /// It can be that we return the result just because we didn't understand the status code - + /// caller should check for status himself before assuming it's OK to use + pub async fn upload(&mut self) -> Option>> { + let mut start = match self.start_at { + Some(s) => s, + None => match self.query_transfer_status().await { + Ok(s) => s, + Err(result) => return Some(result), + }, + }; + + const MIN_CHUNK_SIZE: u64 = 1 << 18; + let chunk_size = match self.delegate.chunk_size() { + cs if cs > MIN_CHUNK_SIZE => cs, + _ => MIN_CHUNK_SIZE, + }; + + loop { + self.reader.seek(SeekFrom::Start(start)).unwrap(); + + let request_size = match self.content_length - start { + rs if rs > chunk_size => chunk_size, + rs => rs, + }; + + let mut section_reader = self.reader.take(request_size); + let mut req_bytes = vec![]; + section_reader.read_to_end(&mut req_bytes).unwrap(); + let range_header = ContentRange { + range: Some(Chunk { + first: start, + last: start + request_size - 1, + }), + total_length: self.content_length, + }; + if self.delegate.cancel_chunk_upload(&range_header) { + return None; + } + let res = self + .client + .request( + hyper::Request::builder() + .uri(self.url) + .method(hyper::Method::POST) + .header("Content-Range", range_header.header_value()) + .header(CONTENT_TYPE, format!("{}", self.media_type)) + .header(USER_AGENT, self.user_agent.to_string()) + .body(hyper::body::Body::from(req_bytes)) + .unwrap(), + ) + .await; + match res { + Ok(res) => { + start += request_size; + + if res.status() == StatusCode::PERMANENT_REDIRECT { + continue; + } + + let (res_parts, res_body) = res.into_parts(); + let res_body = match hyper::body::to_bytes(res_body).await { + Ok(res_body) => res_body.into_iter().collect(), + Err(err) => return Some(Err(err)), + }; + let res_body_string: String = String::from_utf8(res_body).unwrap(); + let reconstructed_result = + hyper::Response::from_parts(res_parts, res_body_string.clone().into()); + + if !reconstructed_result.status().is_success() { + if let Retry::After(d) = self.delegate.http_failure( + &reconstructed_result, + json::from_str(&res_body_string).ok(), + ) { + sleep(d); + continue; + } + } + return Some(Ok(reconstructed_result)); + } + Err(err) => { + if let Retry::After(d) = self.delegate.http_error(&err) { + sleep(d); + continue; + } + return Some(Err(err)); + } + } + } + } +} + +// TODO(ST): Allow sharing common code between program types +pub fn remove_json_null_values(value: &mut json::value::Value) { + match *value { + json::value::Value::Object(ref mut map) => { + let mut for_removal = Vec::new(); + + for (key, mut value) in map.iter_mut() { + if value.is_null() { + for_removal.push(key.clone()); + } else { + remove_json_null_values(&mut value); + } + } + + for key in &for_removal { + map.remove(key); + } + } + json::value::Value::Array(ref mut arr) => { + let mut i = 0; + while i < arr.len() { + if arr[i].is_null() { + arr.remove(i); + } else { + remove_json_null_values(&mut arr[i]); + i += 1; + } + } + } + _ => {} + } +} + +// Borrowing the body object as mutable and converts it to a string +pub async fn get_body_as_string(res_body: &mut hyper::Body) -> String { + let res_body_buf = hyper::body::to_bytes(res_body).await.unwrap(); + let res_body_string = String::from_utf8_lossy(&res_body_buf); + res_body_string.to_string() +} + +// TODO: Simplify this to Option +type TokenResult = std::result::Result, oauth2::Error>; + +pub trait GetToken: GetTokenClone { + /// Called whenever there is the need for an oauth token after + /// the official authenticator implementation didn't provide one, for some reason. + /// If this method returns None as well, the underlying operation will fail + fn get_token<'a>(&'a self, _scopes: &'a [&str]) -> Pin + 'a>> { + Box::pin(async move { Ok(None) }) + } +} + +pub trait GetTokenClone { + fn clone_box(&self) -> Box; +} + +impl GetTokenClone for T +where + T: 'static + GetToken + Clone, +{ + fn clone_box(&self) -> Box { + Box::new(self.clone()) + } +} + +impl Clone for Box { + fn clone(&self) -> Box { + self.clone_box() + } +} + +impl GetToken for String { + fn get_token<'a>(&'a self, _scopes: &'a [&str]) -> Pin + 'a>> { + Box::pin(async move { Ok(Some(self.clone())) }) + } +} + +/// In the event that the API endpoint does not require an oauth2 token, `NoToken` should be provided to the hub to avoid specifying an +/// authenticator. +#[derive(Default, Clone)] +pub struct NoToken; + +impl GetToken for NoToken {} + +// TODO: Make this optional +// #[cfg(feature = "yup-oauth2")] +mod yup_oauth2_impl { + use core::future::Future; + use core::pin::Pin; + + use super::{GetToken, TokenResult}; + + use tower_service::Service; + use yup_oauth2::authenticator::Authenticator; + use tokio::io::{AsyncRead, AsyncWrite}; + use http::Uri; + use hyper::client::connect::Connection; + + + impl GetToken for Authenticator where + S: Service + Clone + Send + Sync + 'static, + S::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static, + S::Future: Send + Unpin + 'static, + S::Error: Into> { + fn get_token<'a>(&'a self, scopes: &'a [&str]) -> Pin + 'a>> { + Box::pin(async move { + self.token(scopes).await.map(|t| Some(t.as_str().to_owned())) + }) + } + } +} diff --git a/src/generator/templates/Cargo.toml.mako b/src/generator/templates/Cargo.toml.mako index 7885c43435..584e9755fe 100644 --- a/src/generator/templates/Cargo.toml.mako +++ b/src/generator/templates/Cargo.toml.mako @@ -31,13 +31,12 @@ anyhow = "^ 1.0" hyper-rustls = "0.23.0" ## Must match the one hyper uses, otherwise there are duplicate similarly named `Mime` structs mime = "^ 0.2.0" -serde = "^ 1.0" +serde = { version = "^ 1.0", features = ["derive"] } serde_json = "^ 1.0" -serde_derive = "^ 1.0" -## TODO: Make yup-oauth2 optional -## yup-oauth2 = { version = "^ 7.0", optional = true } -yup-oauth2 = "^ 7.0" itertools = "^ 0.10" +% if 'is_executable' not in cargo: +google-apis-common = { path = "../../google-apis-common", version = "4.0" } +% endif % for dep in cargo.get('dependencies', list()): ${dep} % endfor @@ -50,7 +49,6 @@ ${dep} crate_name_we_depend_on = library_to_crate_name(api_name, suffix=make.depends_on_suffix) %>\ - % if make.depends_on_suffix is not None: [dependencies.${crate_name_we_depend_on}] diff --git a/src/generator/templates/api/api.rs.mako b/src/generator/templates/api/api.rs.mako index cc81bcf07d..4de2146c9c 100644 --- a/src/generator/templates/api/api.rs.mako +++ b/src/generator/templates/api/api.rs.mako @@ -30,7 +30,9 @@ use http::Uri; use hyper::client::connect; use tokio::io::{AsyncRead, AsyncWrite}; use tower_service; -use crate::{client, client::GetToken}; +use serde::{Serialize, Deserialize}; + +use crate::{client, client::GetToken, client::oauth2}; // ############## // UTILITIES ### diff --git a/src/generator/templates/api/lib.rs.mako b/src/generator/templates/api/lib.rs.mako index b6062b2cdf..1bf1ef0faa 100644 --- a/src/generator/templates/api/lib.rs.mako +++ b/src/generator/templates/api/lib.rs.mako @@ -40,22 +40,14 @@ ${lib.docs(c)} <%util:gen_info source="${self.uri}" />\ -#[macro_use] -extern crate serde_derive; - // Re-export the hyper and hyper_rustls crate, they are required to build the hub -pub extern crate hyper; -pub extern crate hyper_rustls; -extern crate serde; -extern crate serde_json; -// Re-export the yup_oauth2 crate, that is required to call some methods of the hub and the client -pub extern crate yup_oauth2 as oauth2; -extern crate mime; -extern crate url; +pub use hyper; +pub use hyper_rustls; +pub extern crate google_apis_common as client; pub mod api; -pub mod client; // Re-export the hub type and some basic client structs pub use api::${hub_type}; -pub use client::{Result, Error, Delegate}; +// Re-export the yup_oauth2 crate, that is required to call some methods of the hub and the client +pub use client::{Result, Error, Delegate, oauth2}; diff --git a/src/rust/api/client.rs b/src/rust/api/client.rs index 25b2459830..c7ad4dd216 100644 --- a/src/rust/api/client.rs +++ b/src/rust/api/client.rs @@ -1,864 +1,2 @@ -use std::error; -use std::error::Error as StdError; -use std::fmt::{self, Display}; -use std::future::Future; -use std::io::{self, Cursor, Read, Seek, SeekFrom, Write}; -use std::pin::Pin; -use std::str::FromStr; -use std::thread::sleep; -use std::time::Duration; - -use itertools::Itertools; - -use hyper::http::Uri; - -use hyper::body::Buf; -use hyper::client::connect; -use hyper::header::{HeaderMap, AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE, USER_AGENT}; -use hyper::Method; -use hyper::StatusCode; - -use mime::{Attr, Mime, SubLevel, TopLevel, Value}; - -use serde_json as json; - -use tokio::io::{AsyncRead, AsyncWrite}; -use tower_service; - -const LINE_ENDING: &str = "\r\n"; - -pub enum Retry { - /// Signal you don't want to retry - Abort, - /// Signals you want to retry after the given duration - After(Duration), -} - -/// Identifies the Hub. There is only one per library, this trait is supposed -/// to make intended use more explicit. -/// The hub allows to access all resource methods more easily. -pub trait Hub {} - -/// Identifies types for building methods of a particular resource type -pub trait MethodsBuilder {} - -/// Identifies types which represent builders for a particular resource method -pub trait CallBuilder {} - -/// Identifies types which can be inserted and deleted. -/// Types with this trait are most commonly used by clients of this API. -pub trait Resource {} - -/// Identifies types which are used in API responses. -pub trait ResponseResult {} - -/// Identifies types which are used in API requests. -pub trait RequestValue {} - -/// Identifies types which are not actually used by the API -/// This might be a bug within the google API schema. -pub trait UnusedType {} - -/// Identifies types which are only used as part of other types, which -/// usually are carrying the `Resource` trait. -pub trait Part {} - -/// Identifies types which are only used by other types internally. -/// They have no special meaning, this trait just marks them for completeness. -pub trait NestedType {} - -/// A utility to specify reader types which provide seeking capabilities too -pub trait ReadSeek: Seek + Read + Send {} -impl ReadSeek for T {} - -/// A trait for all types that can convert themselves into a *parts* string -pub trait ToParts { - fn to_parts(&self) -> String; -} - -/// A trait specifying functionality to help controlling any request performed by the API. -/// The trait has a conservative default implementation. -/// -/// It contains methods to deal with all common issues, as well with the ones related to -/// uploading media -pub trait Delegate: Send { - /// Called at the beginning of any API request. The delegate should store the method - /// information if he is interesting in knowing more context when further calls to it - /// are made. - /// The matching `finished()` call will always be made, no matter whether or not the API - /// request was successful. That way, the delegate may easily maintain a clean state - /// between various API calls. - fn begin(&mut self, _info: MethodInfo) {} - - /// Called whenever there is an [HttpError](hyper::Error), usually if there are network problems. - /// - /// If you choose to retry after a duration, the duration should be chosen using the - /// [exponential backoff algorithm](http://en.wikipedia.org/wiki/Exponential_backoff). - /// - /// Return retry information. - fn http_error(&mut self, _err: &hyper::Error) -> Retry { - Retry::Abort - } - - /// Called whenever there is the need for your applications API key after - /// the official authenticator implementation didn't provide one, for some reason. - /// If this method returns None as well, the underlying operation will fail - fn api_key(&mut self) -> Option { - None - } - - // TODO: Remove oauth2::Error - /// Called whenever the Authenticator didn't yield a token. The delegate - /// may attempt to provide one, or just take it as a general information about the - /// impending failure. - /// The given Error provides information about why the token couldn't be acquired in the - /// first place - fn token(&mut self, err: &oauth2::Error) -> Option { - let _ = err; - None - } - - /// Called during resumable uploads to provide a URL for the impending upload. - /// It was saved after a previous call to `store_upload_url(...)`, and if not None, - /// will be used instead of asking the server for a new upload URL. - /// This is useful in case a previous resumable upload was aborted/canceled, but should now - /// be resumed. - /// The returned URL will be used exactly once - if it fails again and the delegate allows - /// to retry, we will ask the server for a new upload URL. - fn upload_url(&mut self) -> Option { - None - } - - /// Called after we have retrieved a new upload URL for a resumable upload to store it - /// in case we fail or cancel. That way, we can attempt to resume the upload later, - /// see `upload_url()`. - /// It will also be called with None after a successful upload, which allows the delegate - /// to forget the URL. That way, we will not attempt to resume an upload that has already - /// finished. - fn store_upload_url(&mut self, url: Option<&str>) { - let _ = url; - } - - /// Called whenever a server response could not be decoded from json. - /// It's for informational purposes only, the caller will return with an error - /// accordingly. - /// - /// # Arguments - /// - /// * `json_encoded_value` - The json-encoded value which failed to decode. - /// * `json_decode_error` - The decoder error - fn response_json_decode_error( - &mut self, - json_encoded_value: &str, - json_decode_error: &json::Error, - ) { - let _ = json_encoded_value; - let _ = json_decode_error; - } - - /// Called whenever the http request returns with a non-success status code. - /// This can involve authentication issues, or anything else that very much - /// depends on the used API method. - /// The delegate should check the status, header and decoded json error to decide - /// whether to retry or not. In the latter case, the underlying call will fail. - /// - /// If you choose to retry after a duration, the duration should be chosen using the - /// [exponential backoff algorithm](http://en.wikipedia.org/wiki/Exponential_backoff). - fn http_failure( - &mut self, - _: &hyper::Response, - _err: Option, - ) -> Retry { - Retry::Abort - } - - /// Called prior to sending the main request of the given method. It can be used to time - /// the call or to print progress information. - /// It's also useful as you can be sure that a request will definitely be made. - fn pre_request(&mut self) {} - - /// Return the size of each chunk of a resumable upload. - /// Must be a power of two, with 1<<18 being the smallest allowed chunk size. - /// Will be called once before starting any resumable upload. - fn chunk_size(&mut self) -> u64 { - 1 << 23 - } - - /// Called before the given chunk is uploaded to the server. - /// If true is returned, the upload will be interrupted. - /// However, it may be resumable if you stored the upload URL in a previous call - /// to `store_upload_url()` - fn cancel_chunk_upload(&mut self, chunk: &ContentRange) -> bool { - let _ = chunk; - false - } - - /// Called before the API request method returns, in every case. It can be used to clean up - /// internal state between calls to the API. - /// This call always has a matching call to `begin(...)`. - /// - /// # Arguments - /// - /// * `is_success` - a true value indicates the operation was successful. If false, you should - /// discard all values stored during `store_upload_url`. - fn finished(&mut self, is_success: bool) { - let _ = is_success; - } -} - -/// A delegate with a conservative default implementation, which is used if no other delegate is -/// set. -#[derive(Default)] -pub struct DefaultDelegate; - -impl Delegate for DefaultDelegate {} - -#[derive(Debug)] -pub enum Error { - /// The http connection failed - HttpError(hyper::Error), - - /// An attempt was made to upload a resource with size stored in field `.0` - /// even though the maximum upload size is what is stored in field `.1`. - UploadSizeLimitExceeded(u64, u64), - - /// Represents information about a request that was not understood by the server. - /// Details are included. - BadRequest(serde_json::Value), - - /// We needed an API key for authentication, but didn't obtain one. - /// Neither through the authenticator, nor through the Delegate. - MissingAPIKey, - - // TODO: Remove oauth2::Error - /// We required a Token, but didn't get one from the Authenticator - MissingToken(oauth2::Error), - - /// The delgate instructed to cancel the operation - Cancelled, - - /// An additional, free form field clashed with one of the built-in optional ones - FieldClash(&'static str), - - /// Shows that we failed to decode the server response. - /// This can happen if the protocol changes in conjunction with strict json decoding. - JsonDecodeError(String, json::Error), - - /// Indicates an HTTP repsonse with a non-success status code - Failure(hyper::Response), - - /// An IO error occurred while reading a stream into memory - Io(std::io::Error), -} - -impl Display for Error { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match *self { - Error::Io(ref err) => err.fmt(f), - Error::HttpError(ref err) => err.fmt(f), - Error::UploadSizeLimitExceeded(ref resource_size, ref max_size) => writeln!( - f, - "The media size {} exceeds the maximum allowed upload size of {}", - resource_size, max_size - ), - Error::MissingAPIKey => { - (writeln!( - f, - "The application's API key was not found in the configuration" - )) - .ok(); - writeln!( - f, - "It is used as there are no Scopes defined for this method." - ) - } - Error::BadRequest(ref message) => { - writeln!(f, "Bad Request: {}", message)?; - Ok(()) - } - // TODO: Remove oauth2::Error - Error::MissingToken(ref err) => { - writeln!(f, "Token retrieval failed with error: {}", err) - } - Error::Cancelled => writeln!(f, "Operation cancelled by delegate"), - Error::FieldClash(field) => writeln!( - f, - "The custom parameter '{}' is already provided natively by the CallBuilder.", - field - ), - Error::JsonDecodeError(ref json_str, ref err) => writeln!(f, "{}: {}", err, json_str), - Error::Failure(ref response) => { - writeln!(f, "Http status indicates failure: {:?}", response) - } - } - } -} - -impl error::Error for Error { - fn source(&self) -> Option<&(dyn error::Error + 'static)> { - match *self { - Error::HttpError(ref err) => err.source(), - Error::JsonDecodeError(_, ref err) => err.source(), - _ => None, - } - } -} - -impl From for Error { - fn from(err: std::io::Error) -> Self { - Error::Io(err) - } -} - -/// A universal result type used as return for all calls. -pub type Result = std::result::Result; - -/// Contains information about an API request. -pub struct MethodInfo { - pub id: &'static str, - pub http_method: Method, -} - -const BOUNDARY: &str = "MDuXWGyeE33QFXGchb2VFWc4Z7945d"; - -/// Provides a `Read` interface that converts multiple parts into the protocol -/// identified by [RFC2387](https://tools.ietf.org/html/rfc2387). -/// **Note**: This implementation is just as rich as it needs to be to perform uploads -/// to google APIs, and might not be a fully-featured implementation. -#[derive(Default)] -pub struct MultiPartReader<'a> { - raw_parts: Vec<(HeaderMap, &'a mut (dyn Read + Send))>, - current_part: Option<(Cursor>, &'a mut (dyn Read + Send))>, - last_part_boundary: Option>>, -} - -impl<'a> MultiPartReader<'a> { - /// Reserve memory for exactly the given amount of parts - pub fn reserve_exact(&mut self, cap: usize) { - self.raw_parts.reserve_exact(cap); - } - - /// Add a new part to the queue of parts to be read on the first `read` call. - /// - /// # Arguments - /// - /// `headers` - identifying the body of the part. It's similar to the header - /// in an ordinary single-part call, and should thus contain the - /// same information. - /// `reader` - a reader providing the part's body - /// `size` - the amount of bytes provided by the reader. It will be put onto the header as - /// content-size. - /// `mime` - It will be put onto the content type - pub fn add_part( - &mut self, - reader: &'a mut (dyn Read + Send), - size: u64, - mime_type: Mime, - ) -> &mut MultiPartReader<'a> { - let mut headers = HeaderMap::new(); - headers.insert( - CONTENT_TYPE, - hyper::header::HeaderValue::from_str(&format!("{}", mime_type)).unwrap(), - ); - headers.insert(CONTENT_LENGTH, size.into()); - self.raw_parts.push((headers, reader)); - self - } - - /// Returns the mime-type representing our multi-part message. - /// Use it with the ContentType header. - pub fn mime_type(&self) -> Mime { - Mime( - TopLevel::Multipart, - SubLevel::Ext("related".to_string()), - vec![( - Attr::Ext("boundary".to_string()), - Value::Ext(BOUNDARY.to_string()), - )], - ) - } - - /// Returns true if we are totally used - fn is_depleted(&self) -> bool { - self.raw_parts.is_empty() - && self.current_part.is_none() - && self.last_part_boundary.is_none() - } - - /// Returns true if we are handling our last part - fn is_last_part(&self) -> bool { - self.raw_parts.is_empty() && self.current_part.is_some() - } -} - -impl<'a> Read for MultiPartReader<'a> { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - match ( - self.raw_parts.len(), - self.current_part.is_none(), - self.last_part_boundary.is_none(), - ) { - (_, _, false) => { - let br = self - .last_part_boundary - .as_mut() - .unwrap() - .read(buf) - .unwrap_or(0); - if br < buf.len() { - self.last_part_boundary = None; - } - return Ok(br); - } - (0, true, true) => return Ok(0), - (n, true, _) if n > 0 => { - let (headers, reader) = self.raw_parts.remove(0); - let mut c = Cursor::new(Vec::::new()); - // TODO: The first line ending should be omitted for the first part, - // fortunately Google's API serves don't seem to mind. - (write!( - &mut c, - "{}--{}{}{}{}{}", - LINE_ENDING, - BOUNDARY, - LINE_ENDING, - headers - .iter() - .map(|(k, v)| format!("{}: {}", k, v.to_str().unwrap())) - .join(LINE_ENDING), - LINE_ENDING, - LINE_ENDING, - )) - .unwrap(); - c.seek(SeekFrom::Start(0)).unwrap(); - self.current_part = Some((c, reader)); - } - _ => {} - } - - // read headers as long as possible - let (hb, rr) = { - let &mut (ref mut c, ref mut reader) = self.current_part.as_mut().unwrap(); - let b = c.read(buf).unwrap_or(0); - (b, reader.read(&mut buf[b..])) - }; - - match rr { - Ok(bytes_read) => { - if hb < buf.len() && bytes_read == 0 { - if self.is_last_part() { - // before clearing the last part, we will add the boundary that - // will be written last - self.last_part_boundary = Some(Cursor::new( - format!("{}--{}--{}", LINE_ENDING, BOUNDARY, LINE_ENDING).into_bytes(), - )) - } - // We are depleted - this can trigger the next part to come in - self.current_part = None; - } - let mut total_bytes_read = hb + bytes_read; - while total_bytes_read < buf.len() && !self.is_depleted() { - match self.read(&mut buf[total_bytes_read..]) { - Ok(br) => total_bytes_read += br, - Err(err) => return Err(err), - } - } - Ok(total_bytes_read) - } - Err(err) => { - // fail permanently - self.current_part = None; - self.last_part_boundary = None; - self.raw_parts.clear(); - Err(err) - } - } - } -} - -/// The `X-Upload-Content-Type` header. -/// -/// Generated via rustc --pretty expanded -Z unstable-options, and manually -/// processed to be more readable. -#[derive(PartialEq, Debug, Clone)] -pub struct XUploadContentType(pub Mime); - -impl ::std::ops::Deref for XUploadContentType { - type Target = Mime; - fn deref(&self) -> &Mime { - &self.0 - } -} -impl ::std::ops::DerefMut for XUploadContentType { - fn deref_mut(&mut self) -> &mut Mime { - &mut self.0 - } -} -impl Display for XUploadContentType { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - fmt::Display::fmt(&**self, f) - } -} - -#[derive(Clone, PartialEq, Debug)] -pub struct Chunk { - pub first: u64, - pub last: u64, -} - -impl fmt::Display for Chunk { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - (write!(fmt, "{}-{}", self.first, self.last)).ok(); - Ok(()) - } -} - -impl FromStr for Chunk { - type Err = &'static str; - - /// NOTE: only implements `%i-%i`, not `*` - fn from_str(s: &str) -> std::result::Result { - let parts: Vec<&str> = s.split('-').collect(); - if parts.len() != 2 { - return Err("Expected two parts: %i-%i"); - } - Ok(Chunk { - first: match FromStr::from_str(parts[0]) { - Ok(d) => d, - _ => return Err("Couldn't parse 'first' as digit"), - }, - last: match FromStr::from_str(parts[1]) { - Ok(d) => d, - _ => return Err("Couldn't parse 'last' as digit"), - }, - }) - } -} - -/// Implements the Content-Range header, for serialization only -#[derive(Clone, PartialEq, Debug)] -pub struct ContentRange { - pub range: Option, - pub total_length: u64, -} - -impl ContentRange { - pub fn header_value(&self) -> String { - format!( - "bytes {}/{}", - match self.range { - Some(ref c) => format!("{}", c), - None => "*".to_string(), - }, - self.total_length - ) - } -} - -#[derive(Clone, PartialEq, Debug)] -pub struct RangeResponseHeader(pub Chunk); - -impl RangeResponseHeader { - fn from_bytes(raw: &[u8]) -> Self { - if !raw.is_empty() { - if let Ok(s) = std::str::from_utf8(raw) { - const PREFIX: &str = "bytes "; - if let Some(stripped) = s.strip_prefix(PREFIX) { - if let Ok(c) = ::from_str(&stripped) { - return RangeResponseHeader(c); - } - } - } - } - - panic!("Unable to parse Range header {:?}", raw) - } -} - -/// A utility type to perform a resumable upload from start to end. -pub struct ResumableUploadHelper<'a, A: 'a, S> -where - S: tower_service::Service + Clone + Send + Sync + 'static, - S::Response: hyper::client::connect::Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static, - S::Future: Send + Unpin + 'static, - S::Error: Into>, -{ - pub client: &'a hyper::client::Client< - S, - hyper::body::Body, - >, - pub delegate: &'a mut dyn Delegate, - pub start_at: Option, - pub auth: &'a A, - pub user_agent: &'a str, - pub auth_header: String, - pub url: &'a str, - pub reader: &'a mut dyn ReadSeek, - pub media_type: Mime, - pub content_length: u64, -} - -impl<'a, A, S> ResumableUploadHelper<'a, A, S> -where - S: tower_service::Service + Clone + Send + Sync + 'static, - S::Response: hyper::client::connect::Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static, - S::Future: Send + Unpin + 'static, - S::Error: Into>, -{ - async fn query_transfer_status( - &mut self, - ) -> std::result::Result>> { - loop { - match self - .client - .request( - hyper::Request::builder() - .method(hyper::Method::POST) - .uri(self.url) - .header(USER_AGENT, self.user_agent.to_string()) - .header( - "Content-Range", - ContentRange { - range: None, - total_length: self.content_length, - } - .header_value(), - ) - .header(AUTHORIZATION, self.auth_header.clone()) - .body(hyper::body::Body::empty()) - .unwrap(), - ) - .await - { - Ok(r) => { - // 308 = resume-incomplete == PermanentRedirect - let headers = r.headers().clone(); - let h: RangeResponseHeader = match headers.get("Range") { - Some(hh) if r.status() == StatusCode::PERMANENT_REDIRECT => { - RangeResponseHeader::from_bytes(hh.as_bytes()) - } - None | Some(_) => { - if let Retry::After(d) = self.delegate.http_failure(&r, None) { - sleep(d); - continue; - } - return Err(Ok(r)); - } - }; - return Ok(h.0.last); - } - Err(err) => { - if let Retry::After(d) = self.delegate.http_error(&err) { - sleep(d); - continue; - } - return Err(Err(err)); - } - } - } - } - - /// returns None if operation was cancelled by delegate, or the HttpResult. - /// It can be that we return the result just because we didn't understand the status code - - /// caller should check for status himself before assuming it's OK to use - pub async fn upload(&mut self) -> Option>> { - let mut start = match self.start_at { - Some(s) => s, - None => match self.query_transfer_status().await { - Ok(s) => s, - Err(result) => return Some(result), - }, - }; - - const MIN_CHUNK_SIZE: u64 = 1 << 18; - let chunk_size = match self.delegate.chunk_size() { - cs if cs > MIN_CHUNK_SIZE => cs, - _ => MIN_CHUNK_SIZE, - }; - - loop { - self.reader.seek(SeekFrom::Start(start)).unwrap(); - - let request_size = match self.content_length - start { - rs if rs > chunk_size => chunk_size, - rs => rs, - }; - - let mut section_reader = self.reader.take(request_size); - let mut req_bytes = vec![]; - section_reader.read_to_end(&mut req_bytes).unwrap(); - let range_header = ContentRange { - range: Some(Chunk { - first: start, - last: start + request_size - 1, - }), - total_length: self.content_length, - }; - if self.delegate.cancel_chunk_upload(&range_header) { - return None; - } - let res = self - .client - .request( - hyper::Request::builder() - .uri(self.url) - .method(hyper::Method::POST) - .header("Content-Range", range_header.header_value()) - .header(CONTENT_TYPE, format!("{}", self.media_type)) - .header(USER_AGENT, self.user_agent.to_string()) - .body(hyper::body::Body::from(req_bytes)) - .unwrap(), - ) - .await; - match res { - Ok(res) => { - start += request_size; - - if res.status() == StatusCode::PERMANENT_REDIRECT { - continue; - } - - let (res_parts, res_body) = res.into_parts(); - let res_body = match hyper::body::to_bytes(res_body).await { - Ok(res_body) => res_body.into_iter().collect(), - Err(err) => return Some(Err(err)), - }; - let res_body_string: String = String::from_utf8(res_body).unwrap(); - let reconstructed_result = - hyper::Response::from_parts(res_parts, res_body_string.clone().into()); - - if !reconstructed_result.status().is_success() { - if let Retry::After(d) = self.delegate.http_failure( - &reconstructed_result, - json::from_str(&res_body_string).ok(), - ) { - sleep(d); - continue; - } - } - return Some(Ok(reconstructed_result)); - } - Err(err) => { - if let Retry::After(d) = self.delegate.http_error(&err) { - sleep(d); - continue; - } - return Some(Err(err)); - } - } - } - } -} - -// Copy of src/rust/cli/client.rs -// TODO(ST): Allow sharing common code between program types -pub fn remove_json_null_values(value: &mut json::value::Value) { - match *value { - json::value::Value::Object(ref mut map) => { - let mut for_removal = Vec::new(); - - for (key, mut value) in map.iter_mut() { - if value.is_null() { - for_removal.push(key.clone()); - } else { - remove_json_null_values(&mut value); - } - } - - for key in &for_removal { - map.remove(key); - } - } - json::value::Value::Array(ref mut arr) => { - let mut i = 0; - while i < arr.len() { - if arr[i].is_null() { - arr.remove(i); - } else { - remove_json_null_values(&mut arr[i]); - i += 1; - } - } - } - _ => {} - } -} - -// Borrowing the body object as mutable and converts it to a string -pub async fn get_body_as_string(res_body: &mut hyper::Body) -> String { - let res_body_buf = hyper::body::to_bytes(res_body).await.unwrap(); - let res_body_string = String::from_utf8_lossy(&res_body_buf); - res_body_string.to_string() -} - -// TODO: Simplify this to Option -type TokenResult = std::result::Result, oauth2::Error>; - -pub trait GetToken: GetTokenClone { - /// Called whenever there is the need for an oauth token after - /// the official authenticator implementation didn't provide one, for some reason. - /// If this method returns None as well, the underlying operation will fail - fn get_token<'a>(&'a self, _scopes: &'a [&str]) -> Pin + 'a>> { - Box::pin(async move { Ok(None) }) - } -} - -pub trait GetTokenClone { - fn clone_box(&self) -> Box; -} - -impl GetTokenClone for T -where - T: 'static + GetToken + Clone, -{ - fn clone_box(&self) -> Box { - Box::new(self.clone()) - } -} - -impl Clone for Box { - fn clone(&self) -> Box { - self.clone_box() - } -} - -impl GetToken for String { - fn get_token<'a>(&'a self, _scopes: &'a [&str]) -> Pin + 'a>> { - Box::pin(async move { Ok(Some(self.clone())) }) - } -} - -/// In the event that the API endpoint does not require an oauth2 token, `NoToken` should be provided to the hub to avoid specifying an -/// authenticator. -#[derive(Default, Clone)] -pub struct NoToken; - -impl GetToken for NoToken {} - -// TODO: Make this optional -// #[cfg(feature = "yup-oauth2")] -mod yup_oauth2_impl { - use core::future::Future; - use core::pin::Pin; - - use super::{GetToken, TokenResult}; - - use tower_service::Service; - use yup_oauth2::authenticator::Authenticator; - use tokio::io::{AsyncRead, AsyncWrite}; - use http::Uri; - use hyper::client::connect::Connection; - - - impl GetToken for Authenticator where - S: Service + Clone + Send + Sync + 'static, - S::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static, - S::Future: Send + Unpin + 'static, - S::Error: Into> { - fn get_token<'a>(&'a self, scopes: &'a [&str]) -> Pin + 'a>> { - Box::pin(async move { - self.token(scopes).await.map(|t| Some(t.as_str().to_owned())) - }) - } - } -} +//! This file serves on purpose and is an artifact of the buildsystem. +//! Its content can now be found in the `google-apis-common` crate. \ No newline at end of file diff --git a/src/rust/api/mod.rs b/src/rust/api/mod.rs deleted file mode 100644 index b9babe5bc1..0000000000 --- a/src/rust/api/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod client; diff --git a/src/rust/lib.rs b/src/rust/lib.rs index 378d92243c..185b0c27b7 100644 --- a/src/rust/lib.rs +++ b/src/rust/lib.rs @@ -21,13 +21,13 @@ extern crate serde_derive; extern crate strsim; // just pull it in the check if it compiles -mod api; mod cli; +use google_apis_common as api; /// This module is for testing only, its code is used in mako templates #[cfg(test)] mod test_api { - use super::api::client::*; + use super::api::*; use hyper; use std::default::Default; use std::io::Read;