diff --git a/gen/groupsmigration1/src/cmn.rs b/gen/groupsmigration1/src/cmn.rs index 01becfcb71..ffcc9b3fd1 100644 --- a/gen/groupsmigration1/src/cmn.rs +++ b/gen/groupsmigration1/src/cmn.rs @@ -3,14 +3,18 @@ use std::marker::MarkerTrait; use std::io::{self, Read, Seek, Cursor, Write, SeekFrom}; use std; +use std::fmt::{self, Display}; +use std::str::FromStr; +use std::thread::sleep; use mime::{Mime, TopLevel, SubLevel, Attr, Value}; -use oauth2; -use oauth2::TokenType; +use oauth2::{TokenType, Retry, self}; use hyper; -use hyper::header::{ContentType, ContentLength, Headers, UserAgent, Authorization}; +use hyper::header::{ContentType, ContentLength, Headers, UserAgent, Authorization, Header, + HeaderFormat}; use hyper::http::LINE_ENDING; use hyper::method::Method; +use hyper::status::StatusCode; use serde; @@ -108,8 +112,8 @@ pub trait Delegate { /// Called whenever there is an [HttpError](http://hyperium.github.io/hyper/hyper/error/enum.HttpError.html), usually if there are network problems. /// /// Return retry information. - fn http_error(&mut self, &hyper::HttpError) -> oauth2::Retry { - oauth2::Retry::Abort + fn http_error(&mut self, &hyper::HttpError) -> Retry { + Retry::Abort } /// Called whenever there is the need for your applications API key after @@ -162,8 +166,8 @@ pub trait Delegate { /// depends on the used API method. /// The delegate should check the status, header and decoded json error to decide /// whether to retry or not. In the latter case, the underlying call will fail. - fn http_failure(&mut self, _: &hyper::client::Response, JsonServerError) -> oauth2::Retry { - oauth2::Retry::Abort + fn http_failure(&mut self, _: &hyper::client::Response, Option) -> Retry { + Retry::Abort } /// Called prior to sending the main request of the given method. It can be used to time @@ -363,35 +367,168 @@ impl_header!(XUploadContentType, "X-Upload-Content-Type", Mime); +#[derive(Clone, PartialEq, Debug)] +pub struct Chunk { + pub first: u64, + pub last: u64 +} + +impl fmt::Display for Chunk { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{}-{}", self.first, self.last).ok(); + Ok(()) + } +} + +impl FromStr for Chunk { + type Err = &'static str; + + /// NOTE: only implements `%i-%i`, not `*` + fn from_str(s: &str) -> std::result::Result { + let parts: Vec<&str> = s.split('-').collect(); + if parts.len() != 2 { + return Err("Expected two parts: %i-%i") + } + Ok( + Chunk { + first: match FromStr::from_str(parts[0]) { + Ok(d) => d, + _ => return Err("Couldn't parse 'first' as digit") + }, + last: match FromStr::from_str(parts[1]) { + Ok(d) => d, + _ => return Err("Couldn't parse 'last' as digit") + } + } + ) + } +} + +/// Implements the Content-Range header, for serialization only +#[derive(Clone, PartialEq, Debug)] +pub struct ContentRange { + pub range: Option, + pub total_length: u64, +} + +impl Header for ContentRange { + fn header_name() -> &'static str { + "Content-Range" + } + + /// We are not parsable, as parsing is done by the `Range` header + fn parse_header(raw: &[Vec]) -> Option { + None + } +} + + +impl HeaderFormat for ContentRange { + fn fmt_header(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + try!(fmt.write_str("bytes ")); + match self.range { + Some(ref c) => try!(c.fmt(fmt)), + None => try!(fmt.write_str("*")) + } + write!(fmt, "/{}", self.total_length).ok(); + Ok(()) + } +} + +#[derive(Clone, PartialEq, Debug)] +pub struct RangeResponseHeader(pub Chunk); + +impl Header for RangeResponseHeader { + fn header_name() -> &'static str { + "Range" + } + + fn parse_header(raw: &[Vec]) -> Option { + match raw { + [ref v] => { + if let Ok(s) = std::str::from_utf8(v) { + const PREFIX: &'static str = "bytes="; + if s.starts_with(PREFIX) { + let c: Chunk = match FromStr::from_str(&s[PREFIX.len()..]) { + Ok(c) => c, + _ => return None + }; + return Some(RangeResponseHeader(c)) + } + } + None + }, + _ => None + } + } +} + +impl HeaderFormat for RangeResponseHeader { + /// No implmentation necessary, we just need to parse + fn fmt_header(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + Err(fmt::Error) + } +} + /// A utility type to perform a resumable upload from start to end. pub struct ResumableUploadHelper<'a, NC: 'a, A: 'a> { pub client: &'a mut hyper::client::Client, pub delegate: &'a mut Delegate, + pub start_at: Option, pub auth: &'a mut A, pub user_agent: &'a str, pub auth_header: Authorization, pub url: &'a str, pub reader: &'a mut ReadSeek, pub media_type: Mime, - pub content_size: u64 + pub content_length: u64 } impl<'a, NC, A> ResumableUploadHelper<'a, NC, A> where NC: hyper::net::NetworkConnector, A: oauth2::GetToken { - fn query_transfer_status(&'a mut self) -> (u64, hyper::HttpResult) { - self.client.post(self.url) - .header(UserAgent(self.user_agent.to_string())) - .header(self.auth_header.clone()); - (0, Err(hyper::error::HttpError::HttpStatusError)) + fn query_transfer_status(&'a mut self) -> (Option, hyper::HttpResult) { + loop { + match self.client.post(self.url) + .header(UserAgent(self.user_agent.to_string())) + .header(ContentRange { range: None, total_length: self.content_length }) + .header(self.auth_header.clone()) + .send() { + Ok(r) => { + // 308 = resume-incomplete == PermanentRedirect + let headers = r.headers.clone(); + let h: &RangeResponseHeader = match headers.get() { + Some(hh) if r.status == StatusCode::PermanentRedirect => hh, + None|Some(_) => { + if let Retry::After(d) = self.delegate.http_failure(&r, None) { + sleep(d); + continue; + } + return (None, Ok(r)) + } + }; + return (Some(h.0.last), Ok(r)) + } + Err(err) => { + if let Retry::After(d) = self.delegate.http_error(&err) { + sleep(d); + continue; + } + return (None, Err(err)) + } + } + } } pub fn upload(&'a mut self) -> hyper::HttpResult { - let (start, result) = self.query_transfer_status(); - if let Err(_) = result { - return result - } + let start = match self.start_at { + Some(s) => s, + None => match self.query_transfer_status() { + (Some(s), _) => s, + (_, result) => return result + } + }; Err(hyper::error::HttpError::HttpStatusError) } } \ No newline at end of file diff --git a/gen/groupsmigration1/src/lib.rs b/gen/groupsmigration1/src/lib.rs index f1728dd762..55580b306b 100644 --- a/gen/groupsmigration1/src/lib.rs +++ b/gen/groupsmigration1/src/lib.rs @@ -585,7 +585,7 @@ impl<'a, C, NC, A> ArchiveInsertCall<'a, C, NC, A> where NC: hyper::net::Network let mut json_err = String::new(); res.read_to_string(&mut json_err).unwrap(); let error_info: cmn::JsonServerError = json::from_str(&json_err).unwrap(); - if let oauth2::Retry::After(d) = dlg.http_failure(&res, error_info) { + if let oauth2::Retry::After(d) = dlg.http_failure(&res, Some(error_info)) { sleep(d); continue; } @@ -608,13 +608,14 @@ impl<'a, C, NC, A> ArchiveInsertCall<'a, C, NC, A> where NC: hyper::net::Network cmn::ResumableUploadHelper { client: &mut client.borrow_mut(), delegate: dlg, + start_at: if upload_url_from_server { Some(0) } else { None }, auth: &mut *self.hub.auth.borrow_mut(), user_agent: &self.hub._user_agent, auth_header: auth_header.clone(), url: url, reader: &mut reader, media_type: reader_mime_type.clone(), - content_size: size + content_length: size }.upload() }; match upload_result { diff --git a/src/mako/lib/mbuild.mako b/src/mako/lib/mbuild.mako index 62c5d5887d..dbed1da986 100644 --- a/src/mako/lib/mbuild.mako +++ b/src/mako/lib/mbuild.mako @@ -765,7 +765,7 @@ else { let mut json_err = String::new(); res.read_to_string(&mut json_err).unwrap(); let error_info: cmn::JsonServerError = json::from_str(&json_err).unwrap(); - if let oauth2::Retry::After(d) = dlg.http_failure(&res, error_info) { + if let oauth2::Retry::After(d) = dlg.http_failure(&res, Some(error_info)) { sleep(d); continue; } @@ -785,6 +785,7 @@ else { cmn::ResumableUploadHelper { client: &mut client.borrow_mut(), delegate: dlg, + start_at: if upload_url_from_server { Some(0) } else { None }, auth: &mut *self.hub.auth.borrow_mut(), user_agent: &self.hub._user_agent, auth_header: auth_header.clone(), diff --git a/src/rust/cmn.rs b/src/rust/cmn.rs index 6d6d061e46..3243f2c5d1 100644 --- a/src/rust/cmn.rs +++ b/src/rust/cmn.rs @@ -1,17 +1,18 @@ use std::marker::MarkerTrait; use std::io::{self, Read, Seek, Cursor, Write, SeekFrom}; use std; -use std::fmt; +use std::fmt::{self, Display}; use std::str::FromStr; +use std::thread::sleep; use mime::{Mime, TopLevel, SubLevel, Attr, Value}; -use oauth2; -use oauth2::TokenType; +use oauth2::{TokenType, Retry, self}; use hyper; use hyper::header::{ContentType, ContentLength, Headers, UserAgent, Authorization, Header, HeaderFormat}; use hyper::http::LINE_ENDING; use hyper::method::Method; +use hyper::status::StatusCode; use serde; @@ -109,8 +110,8 @@ pub trait Delegate { /// Called whenever there is an [HttpError](http://hyperium.github.io/hyper/hyper/error/enum.HttpError.html), usually if there are network problems. /// /// Return retry information. - fn http_error(&mut self, &hyper::HttpError) -> oauth2::Retry { - oauth2::Retry::Abort + fn http_error(&mut self, &hyper::HttpError) -> Retry { + Retry::Abort } /// Called whenever there is the need for your applications API key after @@ -163,8 +164,8 @@ pub trait Delegate { /// depends on the used API method. /// The delegate should check the status, header and decoded json error to decide /// whether to retry or not. In the latter case, the underlying call will fail. - fn http_failure(&mut self, _: &hyper::client::Response, JsonServerError) -> oauth2::Retry { - oauth2::Retry::Abort + fn http_failure(&mut self, _: &hyper::client::Response, Option) -> Retry { + Retry::Abort } /// Called prior to sending the main request of the given method. It can be used to time @@ -365,41 +366,38 @@ impl_header!(XUploadContentType, Mime); #[derive(Clone, PartialEq, Debug)] -pub enum ByteRange { - Any, - Chunk(u64, u64) +pub struct Chunk { + pub first: u64, + pub last: u64 } -impl fmt::Display for ByteRange { +impl fmt::Display for Chunk { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - match *self { - ByteRange::Any => fmt.write_str("*").ok(), - ByteRange::Chunk(first, last) => write!(fmt, "{}-{}", first, last).ok() - }; + write!(fmt, "{}-{}", self.first, self.last).ok(); Ok(()) } } -impl FromStr for ByteRange { +impl FromStr for Chunk { type Err = &'static str; /// NOTE: only implements `%i-%i`, not `*` - fn from_str(s: &str) -> std::result::Result { + fn from_str(s: &str) -> std::result::Result { let parts: Vec<&str> = s.split('-').collect(); if parts.len() != 2 { return Err("Expected two parts: %i-%i") } Ok( - ByteRange::Chunk( - match FromStr::from_str(parts[0]) { + Chunk { + first: match FromStr::from_str(parts[0]) { Ok(d) => d, _ => return Err("Couldn't parse 'first' as digit") }, - match FromStr::from_str(parts[1]) { + last: match FromStr::from_str(parts[1]) { Ok(d) => d, _ => return Err("Couldn't parse 'last' as digit") } - ) + } ) } } @@ -407,7 +405,7 @@ impl FromStr for ByteRange { /// Implements the Content-Range header, for serialization only #[derive(Clone, PartialEq, Debug)] pub struct ContentRange { - pub range: ByteRange, + pub range: Option, pub total_length: u64, } @@ -425,13 +423,18 @@ impl Header for ContentRange { impl HeaderFormat for ContentRange { fn fmt_header(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "bytes {}/{}", self.range, self.total_length).ok(); + try!(fmt.write_str("bytes ")); + match self.range { + Some(ref c) => try!(c.fmt(fmt)), + None => try!(fmt.write_str("*")) + } + write!(fmt, "/{}", self.total_length).ok(); Ok(()) } } #[derive(Clone, PartialEq, Debug)] -pub struct RangeResponseHeader(pub ByteRange); +pub struct RangeResponseHeader(pub Chunk); impl Header for RangeResponseHeader { fn header_name() -> &'static str { @@ -442,13 +445,13 @@ impl Header for RangeResponseHeader { match raw { [ref v] => { if let Ok(s) = std::str::from_utf8(v) { - if s.starts_with("bytes=") { - return Some(RangeResponseHeader( - match FromStr::from_str(&s[6..]) { - Ok(br) => br, - _ => return None - } - )) + const PREFIX: &'static str = "bytes="; + if s.starts_with(PREFIX) { + let c: Chunk = match FromStr::from_str(&s[PREFIX.len()..]) { + Ok(c) => c, + _ => return None + }; + return Some(RangeResponseHeader(c)) } } None @@ -469,6 +472,7 @@ impl HeaderFormat for RangeResponseHeader { pub struct ResumableUploadHelper<'a, NC: 'a, A: 'a> { pub client: &'a mut hyper::client::Client, pub delegate: &'a mut Delegate, + pub start_at: Option, pub auth: &'a mut A, pub user_agent: &'a str, pub auth_header: Authorization, @@ -482,19 +486,47 @@ impl<'a, NC, A> ResumableUploadHelper<'a, NC, A> where NC: hyper::net::NetworkConnector, A: oauth2::GetToken { - fn query_transfer_status(&'a mut self) -> (u64, hyper::HttpResult) { - self.client.post(self.url) - .header(UserAgent(self.user_agent.to_string())) - .header(ContentRange { range: ByteRange::Any, total_length: self.content_length } ) - .header(self.auth_header.clone()); - (0, Err(hyper::error::HttpError::HttpStatusError)) + fn query_transfer_status(&'a mut self) -> (Option, hyper::HttpResult) { + loop { + match self.client.post(self.url) + .header(UserAgent(self.user_agent.to_string())) + .header(ContentRange { range: None, total_length: self.content_length }) + .header(self.auth_header.clone()) + .send() { + Ok(r) => { + // 308 = resume-incomplete == PermanentRedirect + let headers = r.headers.clone(); + let h: &RangeResponseHeader = match headers.get() { + Some(hh) if r.status == StatusCode::PermanentRedirect => hh, + None|Some(_) => { + if let Retry::After(d) = self.delegate.http_failure(&r, None) { + sleep(d); + continue; + } + return (None, Ok(r)) + } + }; + return (Some(h.0.last), Ok(r)) + } + Err(err) => { + if let Retry::After(d) = self.delegate.http_error(&err) { + sleep(d); + continue; + } + return (None, Err(err)) + } + } + } } pub fn upload(&'a mut self) -> hyper::HttpResult { - let (start, result) = self.query_transfer_status(); - if let Err(_) = result { - return result - } + let start = match self.start_at { + Some(s) => s, + None => match self.query_transfer_status() { + (Some(s), _) => s, + (_, result) => return result + } + }; Err(hyper::error::HttpError::HttpStatusError) } } \ No newline at end of file diff --git a/src/rust/lib.rs b/src/rust/lib.rs index c139cd3377..6906bb37a5 100644 --- a/src/rust/lib.rs +++ b/src/rust/lib.rs @@ -128,8 +128,8 @@ bar\r\n\ #[test] fn content_range() { for &(ref c, ref expected) in - &[(ContentRange {range: ByteRange::Any, total_length: 50 }, "Content-Range: bytes */50\r\n"), - (ContentRange {range: ByteRange::Chunk(23, 40), total_length: 45}, + &[(ContentRange {range: None, total_length: 50 }, "Content-Range: bytes */50\r\n"), + (ContentRange {range: Some(Chunk { first: 23, last: 40 }), total_length: 45}, "Content-Range: bytes 23-40/45\r\n")] { let mut headers = hyper::header::Headers::new(); headers.set(c.clone()); @@ -139,20 +139,14 @@ bar\r\n\ #[test] fn byte_range_from_str() { - assert_eq!(::from_str("2-42"), - Ok(ByteRange::Chunk(2, 42))) + assert_eq!(::from_str("2-42"), + Ok(Chunk { first: 2, last: 42 })) } #[test] fn parse_range_response() { let r: RangeResponseHeader = hyper::header::Header::parse_header(&[b"bytes=2-42".to_vec()]).unwrap(); - - match r.0 { - ByteRange::Chunk(f, l) => { - assert_eq!(f, 2); - assert_eq!(l, 42); - } - _ => unreachable!() - } + assert_eq!(r.0.first, 2); + assert_eq!(r.0.last, 42); } } \ No newline at end of file