feat(cmn): implement query_transfer_status()

The delegate logic is implemented and seems sound.
It's somewhat funny that after all this back and forth, all we get
is a valid start position for the upload.
This commit is contained in:
Sebastian Thiel
2015-03-22 17:58:34 +01:00
parent 42a76e4655
commit 065753cc3a
5 changed files with 238 additions and 73 deletions

View File

@@ -3,14 +3,18 @@
use std::marker::MarkerTrait;
use std::io::{self, Read, Seek, Cursor, Write, SeekFrom};
use std;
use std::fmt::{self, Display};
use std::str::FromStr;
use std::thread::sleep;
use mime::{Mime, TopLevel, SubLevel, Attr, Value};
use oauth2;
use oauth2::TokenType;
use oauth2::{TokenType, Retry, self};
use hyper;
use hyper::header::{ContentType, ContentLength, Headers, UserAgent, Authorization};
use hyper::header::{ContentType, ContentLength, Headers, UserAgent, Authorization, Header,
HeaderFormat};
use hyper::http::LINE_ENDING;
use hyper::method::Method;
use hyper::status::StatusCode;
use serde;
@@ -108,8 +112,8 @@ pub trait Delegate {
/// Called whenever there is an [HttpError](http://hyperium.github.io/hyper/hyper/error/enum.HttpError.html), usually if there are network problems.
///
/// Return retry information.
fn http_error(&mut self, &hyper::HttpError) -> oauth2::Retry {
oauth2::Retry::Abort
fn http_error(&mut self, &hyper::HttpError) -> Retry {
Retry::Abort
}
/// Called whenever there is the need for your applications API key after
@@ -162,8 +166,8 @@ pub trait Delegate {
/// depends on the used API method.
/// The delegate should check the status, header and decoded json error to decide
/// whether to retry or not. In the latter case, the underlying call will fail.
fn http_failure(&mut self, _: &hyper::client::Response, JsonServerError) -> oauth2::Retry {
oauth2::Retry::Abort
fn http_failure(&mut self, _: &hyper::client::Response, Option<JsonServerError>) -> Retry {
Retry::Abort
}
/// Called prior to sending the main request of the given method. It can be used to time
@@ -363,35 +367,168 @@ impl_header!(XUploadContentType,
"X-Upload-Content-Type",
Mime);
#[derive(Clone, PartialEq, Debug)]
pub struct Chunk {
pub first: u64,
pub last: u64
}
impl fmt::Display for Chunk {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "{}-{}", self.first, self.last).ok();
Ok(())
}
}
impl FromStr for Chunk {
type Err = &'static str;
/// NOTE: only implements `%i-%i`, not `*`
fn from_str(s: &str) -> std::result::Result<Chunk, &'static str> {
let parts: Vec<&str> = s.split('-').collect();
if parts.len() != 2 {
return Err("Expected two parts: %i-%i")
}
Ok(
Chunk {
first: match FromStr::from_str(parts[0]) {
Ok(d) => d,
_ => return Err("Couldn't parse 'first' as digit")
},
last: match FromStr::from_str(parts[1]) {
Ok(d) => d,
_ => return Err("Couldn't parse 'last' as digit")
}
}
)
}
}
/// Implements the Content-Range header, for serialization only
#[derive(Clone, PartialEq, Debug)]
pub struct ContentRange {
pub range: Option<Chunk>,
pub total_length: u64,
}
impl Header for ContentRange {
fn header_name() -> &'static str {
"Content-Range"
}
/// We are not parsable, as parsing is done by the `Range` header
fn parse_header(raw: &[Vec<u8>]) -> Option<ContentRange> {
None
}
}
impl HeaderFormat for ContentRange {
fn fmt_header(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
try!(fmt.write_str("bytes "));
match self.range {
Some(ref c) => try!(c.fmt(fmt)),
None => try!(fmt.write_str("*"))
}
write!(fmt, "/{}", self.total_length).ok();
Ok(())
}
}
#[derive(Clone, PartialEq, Debug)]
pub struct RangeResponseHeader(pub Chunk);
impl Header for RangeResponseHeader {
fn header_name() -> &'static str {
"Range"
}
fn parse_header(raw: &[Vec<u8>]) -> Option<RangeResponseHeader> {
match raw {
[ref v] => {
if let Ok(s) = std::str::from_utf8(v) {
const PREFIX: &'static str = "bytes=";
if s.starts_with(PREFIX) {
let c: Chunk = match FromStr::from_str(&s[PREFIX.len()..]) {
Ok(c) => c,
_ => return None
};
return Some(RangeResponseHeader(c))
}
}
None
},
_ => None
}
}
}
impl HeaderFormat for RangeResponseHeader {
/// No implmentation necessary, we just need to parse
fn fmt_header(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
Err(fmt::Error)
}
}
/// A utility type to perform a resumable upload from start to end.
pub struct ResumableUploadHelper<'a, NC: 'a, A: 'a> {
pub client: &'a mut hyper::client::Client<NC>,
pub delegate: &'a mut Delegate,
pub start_at: Option<u64>,
pub auth: &'a mut A,
pub user_agent: &'a str,
pub auth_header: Authorization<oauth2::Scheme>,
pub url: &'a str,
pub reader: &'a mut ReadSeek,
pub media_type: Mime,
pub content_size: u64
pub content_length: u64
}
impl<'a, NC, A> ResumableUploadHelper<'a, NC, A>
where NC: hyper::net::NetworkConnector,
A: oauth2::GetToken {
fn query_transfer_status(&'a mut self) -> (u64, hyper::HttpResult<hyper::client::Response>) {
self.client.post(self.url)
.header(UserAgent(self.user_agent.to_string()))
.header(self.auth_header.clone());
(0, Err(hyper::error::HttpError::HttpStatusError))
fn query_transfer_status(&'a mut self) -> (Option<u64>, hyper::HttpResult<hyper::client::Response>) {
loop {
match self.client.post(self.url)
.header(UserAgent(self.user_agent.to_string()))
.header(ContentRange { range: None, total_length: self.content_length })
.header(self.auth_header.clone())
.send() {
Ok(r) => {
// 308 = resume-incomplete == PermanentRedirect
let headers = r.headers.clone();
let h: &RangeResponseHeader = match headers.get() {
Some(hh) if r.status == StatusCode::PermanentRedirect => hh,
None|Some(_) => {
if let Retry::After(d) = self.delegate.http_failure(&r, None) {
sleep(d);
continue;
}
return (None, Ok(r))
}
};
return (Some(h.0.last), Ok(r))
}
Err(err) => {
if let Retry::After(d) = self.delegate.http_error(&err) {
sleep(d);
continue;
}
return (None, Err(err))
}
}
}
}
pub fn upload(&'a mut self) -> hyper::HttpResult<hyper::client::Response> {
let (start, result) = self.query_transfer_status();
if let Err(_) = result {
return result
}
let start = match self.start_at {
Some(s) => s,
None => match self.query_transfer_status() {
(Some(s), _) => s,
(_, result) => return result
}
};
Err(hyper::error::HttpError::HttpStatusError)
}
}

View File

@@ -585,7 +585,7 @@ impl<'a, C, NC, A> ArchiveInsertCall<'a, C, NC, A> where NC: hyper::net::Network
let mut json_err = String::new();
res.read_to_string(&mut json_err).unwrap();
let error_info: cmn::JsonServerError = json::from_str(&json_err).unwrap();
if let oauth2::Retry::After(d) = dlg.http_failure(&res, error_info) {
if let oauth2::Retry::After(d) = dlg.http_failure(&res, Some(error_info)) {
sleep(d);
continue;
}
@@ -608,13 +608,14 @@ impl<'a, C, NC, A> ArchiveInsertCall<'a, C, NC, A> where NC: hyper::net::Network
cmn::ResumableUploadHelper {
client: &mut client.borrow_mut(),
delegate: dlg,
start_at: if upload_url_from_server { Some(0) } else { None },
auth: &mut *self.hub.auth.borrow_mut(),
user_agent: &self.hub._user_agent,
auth_header: auth_header.clone(),
url: url,
reader: &mut reader,
media_type: reader_mime_type.clone(),
content_size: size
content_length: size
}.upload()
};
match upload_result {

View File

@@ -765,7 +765,7 @@ else {
let mut json_err = String::new();
res.read_to_string(&mut json_err).unwrap();
let error_info: cmn::JsonServerError = json::from_str(&json_err).unwrap();
if let oauth2::Retry::After(d) = dlg.http_failure(&res, error_info) {
if let oauth2::Retry::After(d) = dlg.http_failure(&res, Some(error_info)) {
sleep(d);
continue;
}
@@ -785,6 +785,7 @@ else {
cmn::ResumableUploadHelper {
client: &mut client.borrow_mut(),
delegate: dlg,
start_at: if upload_url_from_server { Some(0) } else { None },
auth: &mut *self.hub.auth.borrow_mut(),
user_agent: &self.hub._user_agent,
auth_header: auth_header.clone(),

View File

@@ -1,17 +1,18 @@
use std::marker::MarkerTrait;
use std::io::{self, Read, Seek, Cursor, Write, SeekFrom};
use std;
use std::fmt;
use std::fmt::{self, Display};
use std::str::FromStr;
use std::thread::sleep;
use mime::{Mime, TopLevel, SubLevel, Attr, Value};
use oauth2;
use oauth2::TokenType;
use oauth2::{TokenType, Retry, self};
use hyper;
use hyper::header::{ContentType, ContentLength, Headers, UserAgent, Authorization, Header,
HeaderFormat};
use hyper::http::LINE_ENDING;
use hyper::method::Method;
use hyper::status::StatusCode;
use serde;
@@ -109,8 +110,8 @@ pub trait Delegate {
/// Called whenever there is an [HttpError](http://hyperium.github.io/hyper/hyper/error/enum.HttpError.html), usually if there are network problems.
///
/// Return retry information.
fn http_error(&mut self, &hyper::HttpError) -> oauth2::Retry {
oauth2::Retry::Abort
fn http_error(&mut self, &hyper::HttpError) -> Retry {
Retry::Abort
}
/// Called whenever there is the need for your applications API key after
@@ -163,8 +164,8 @@ pub trait Delegate {
/// depends on the used API method.
/// The delegate should check the status, header and decoded json error to decide
/// whether to retry or not. In the latter case, the underlying call will fail.
fn http_failure(&mut self, _: &hyper::client::Response, JsonServerError) -> oauth2::Retry {
oauth2::Retry::Abort
fn http_failure(&mut self, _: &hyper::client::Response, Option<JsonServerError>) -> Retry {
Retry::Abort
}
/// Called prior to sending the main request of the given method. It can be used to time
@@ -365,41 +366,38 @@ impl_header!(XUploadContentType,
Mime);
#[derive(Clone, PartialEq, Debug)]
pub enum ByteRange {
Any,
Chunk(u64, u64)
pub struct Chunk {
pub first: u64,
pub last: u64
}
impl fmt::Display for ByteRange {
impl fmt::Display for Chunk {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match *self {
ByteRange::Any => fmt.write_str("*").ok(),
ByteRange::Chunk(first, last) => write!(fmt, "{}-{}", first, last).ok()
};
write!(fmt, "{}-{}", self.first, self.last).ok();
Ok(())
}
}
impl FromStr for ByteRange {
impl FromStr for Chunk {
type Err = &'static str;
/// NOTE: only implements `%i-%i`, not `*`
fn from_str(s: &str) -> std::result::Result<ByteRange, &'static str> {
fn from_str(s: &str) -> std::result::Result<Chunk, &'static str> {
let parts: Vec<&str> = s.split('-').collect();
if parts.len() != 2 {
return Err("Expected two parts: %i-%i")
}
Ok(
ByteRange::Chunk(
match FromStr::from_str(parts[0]) {
Chunk {
first: match FromStr::from_str(parts[0]) {
Ok(d) => d,
_ => return Err("Couldn't parse 'first' as digit")
},
match FromStr::from_str(parts[1]) {
last: match FromStr::from_str(parts[1]) {
Ok(d) => d,
_ => return Err("Couldn't parse 'last' as digit")
}
)
}
)
}
}
@@ -407,7 +405,7 @@ impl FromStr for ByteRange {
/// Implements the Content-Range header, for serialization only
#[derive(Clone, PartialEq, Debug)]
pub struct ContentRange {
pub range: ByteRange,
pub range: Option<Chunk>,
pub total_length: u64,
}
@@ -425,13 +423,18 @@ impl Header for ContentRange {
impl HeaderFormat for ContentRange {
fn fmt_header(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "bytes {}/{}", self.range, self.total_length).ok();
try!(fmt.write_str("bytes "));
match self.range {
Some(ref c) => try!(c.fmt(fmt)),
None => try!(fmt.write_str("*"))
}
write!(fmt, "/{}", self.total_length).ok();
Ok(())
}
}
#[derive(Clone, PartialEq, Debug)]
pub struct RangeResponseHeader(pub ByteRange);
pub struct RangeResponseHeader(pub Chunk);
impl Header for RangeResponseHeader {
fn header_name() -> &'static str {
@@ -442,13 +445,13 @@ impl Header for RangeResponseHeader {
match raw {
[ref v] => {
if let Ok(s) = std::str::from_utf8(v) {
if s.starts_with("bytes=") {
return Some(RangeResponseHeader(
match FromStr::from_str(&s[6..]) {
Ok(br) => br,
_ => return None
}
))
const PREFIX: &'static str = "bytes=";
if s.starts_with(PREFIX) {
let c: Chunk = match FromStr::from_str(&s[PREFIX.len()..]) {
Ok(c) => c,
_ => return None
};
return Some(RangeResponseHeader(c))
}
}
None
@@ -469,6 +472,7 @@ impl HeaderFormat for RangeResponseHeader {
pub struct ResumableUploadHelper<'a, NC: 'a, A: 'a> {
pub client: &'a mut hyper::client::Client<NC>,
pub delegate: &'a mut Delegate,
pub start_at: Option<u64>,
pub auth: &'a mut A,
pub user_agent: &'a str,
pub auth_header: Authorization<oauth2::Scheme>,
@@ -482,19 +486,47 @@ impl<'a, NC, A> ResumableUploadHelper<'a, NC, A>
where NC: hyper::net::NetworkConnector,
A: oauth2::GetToken {
fn query_transfer_status(&'a mut self) -> (u64, hyper::HttpResult<hyper::client::Response>) {
self.client.post(self.url)
.header(UserAgent(self.user_agent.to_string()))
.header(ContentRange { range: ByteRange::Any, total_length: self.content_length } )
.header(self.auth_header.clone());
(0, Err(hyper::error::HttpError::HttpStatusError))
fn query_transfer_status(&'a mut self) -> (Option<u64>, hyper::HttpResult<hyper::client::Response>) {
loop {
match self.client.post(self.url)
.header(UserAgent(self.user_agent.to_string()))
.header(ContentRange { range: None, total_length: self.content_length })
.header(self.auth_header.clone())
.send() {
Ok(r) => {
// 308 = resume-incomplete == PermanentRedirect
let headers = r.headers.clone();
let h: &RangeResponseHeader = match headers.get() {
Some(hh) if r.status == StatusCode::PermanentRedirect => hh,
None|Some(_) => {
if let Retry::After(d) = self.delegate.http_failure(&r, None) {
sleep(d);
continue;
}
return (None, Ok(r))
}
};
return (Some(h.0.last), Ok(r))
}
Err(err) => {
if let Retry::After(d) = self.delegate.http_error(&err) {
sleep(d);
continue;
}
return (None, Err(err))
}
}
}
}
pub fn upload(&'a mut self) -> hyper::HttpResult<hyper::client::Response> {
let (start, result) = self.query_transfer_status();
if let Err(_) = result {
return result
}
let start = match self.start_at {
Some(s) => s,
None => match self.query_transfer_status() {
(Some(s), _) => s,
(_, result) => return result
}
};
Err(hyper::error::HttpError::HttpStatusError)
}
}

View File

@@ -128,8 +128,8 @@ bar\r\n\
#[test]
fn content_range() {
for &(ref c, ref expected) in
&[(ContentRange {range: ByteRange::Any, total_length: 50 }, "Content-Range: bytes */50\r\n"),
(ContentRange {range: ByteRange::Chunk(23, 40), total_length: 45},
&[(ContentRange {range: None, total_length: 50 }, "Content-Range: bytes */50\r\n"),
(ContentRange {range: Some(Chunk { first: 23, last: 40 }), total_length: 45},
"Content-Range: bytes 23-40/45\r\n")] {
let mut headers = hyper::header::Headers::new();
headers.set(c.clone());
@@ -139,20 +139,14 @@ bar\r\n\
#[test]
fn byte_range_from_str() {
assert_eq!(<ByteRange as FromStr>::from_str("2-42"),
Ok(ByteRange::Chunk(2, 42)))
assert_eq!(<Chunk as FromStr>::from_str("2-42"),
Ok(Chunk { first: 2, last: 42 }))
}
#[test]
fn parse_range_response() {
let r: RangeResponseHeader = hyper::header::Header::parse_header(&[b"bytes=2-42".to_vec()]).unwrap();
match r.0 {
ByteRange::Chunk(f, l) => {
assert_eq!(f, 2);
assert_eq!(l, 42);
}
_ => unreachable!()
}
assert_eq!(r.0.first, 2);
assert_eq!(r.0.last, 42);
}
}