feat(mbuild): resumable-upload infrastructure

Layout the `ResumableUploadHelper` and implement the entire logic
with the mbuild renerator.

All that's left to be done is to implement the 'chunked upload' method.

The borrow checker helped me to prevent a bug as well.
This commit is contained in:
Sebastian Thiel
2015-03-21 13:55:15 +01:00
parent 98f4bbab47
commit 307d3f487c
5 changed files with 755 additions and 351 deletions

View File

@@ -32,6 +32,7 @@ ${lib.docs(c)}
#![feature(custom_derive, custom_attribute, plugin)]
#![plugin(serde_macros)]
#[macro_use]
extern crate hyper;
extern crate serde;
extern crate "yup-oauth2" as oauth2;

View File

@@ -400,6 +400,7 @@ match result {
possible_urls = [m.path]
simple_media_param = None
resumable_media_param = None
if media_params:
stripped = lambda s: s.strip().strip(',')
qualifier = ''
@@ -410,6 +411,8 @@ match result {
possible_urls.append(p.path)
if p.protocol == 'simple':
simple_media_param = p
elif p.protocol == 'resumable':
resumable_media_param = p
# end for each param
where = ' where ' + stripped(where)
type_params = '<' + stripped(type_params) + '>'
@@ -655,7 +658,6 @@ else {
request_value_reader.seek(io::SeekFrom::Start(0)).unwrap();
% endif
let mut client = &mut *self.hub.client.borrow_mut();
loop {
% if supports_scopes(auth):
let mut token = ${auth_call}.token(self.${api.properties.scopes}.keys());
@@ -686,36 +688,46 @@ else {
};
% endif
let mut req = client.borrow_mut().request(${method_name_to_variant(m.httpMethod)}, url.as_slice())
.header(UserAgent(self.hub._user_agent.clone()))\
% if supports_scopes(auth):
let mut req_result = {
let mut client = &mut *self.hub.client.borrow_mut();
let mut req = client.borrow_mut().request(${method_name_to_variant(m.httpMethod)}, url.as_slice())
.header(UserAgent(self.hub._user_agent.clone()))\
% if supports_scopes(auth):
.header(auth_header)\
.header(auth_header)\
% endif
% if request_value:
% if not simple_media_param:
.header(ContentType(json_mime_type.clone()))
.header(ContentLength(request_size as u64))
.body(request_value_reader.into_body())\
% else:
.header(content_type)
.body(body_reader.into_body())\
% endif ## not simple_media_param
% endif
;
% if simple_media_param and not request_value:
if let Some(&mut (ref mut reader, ref mime)) = ${simple_media_param.type.arg_name}.as_mut() {
${READER_SEEK | indent_all_but_first_by(4)}
req = req.header(ContentType(mime.clone()))
.header(ContentLength(size))
.body(reader.into_body());
}
% endif ## media upload handling
% if resumable_media_param:
if let Some(&mut (_, ref mime)) = ${resumable_media_param.type.arg_name}.as_mut() {
req = req.header(cmn::XUploadContentType(mime.clone()));
}
% endif
% if request_value:
% if not simple_media_param:
.header(ContentType(json_mime_type.clone()))
.header(ContentLength(request_size as u64))
.body(request_value_reader.into_body())\
% else:
dlg.pre_request();
req.send()
};
.header(content_type)
.body(body_reader.into_body())\
% endif ## not simple_media_param
% endif
;
% if simple_media_param and not request_value:
if let Some(&mut (ref mut reader, ref mime)) = ${simple_media_param.type.arg_name}.as_mut() {
${READER_SEEK | indent_all_but_first_by(4)}
req = req.header(ContentType(mime.clone()))
.header(ContentLength(size))
.body(reader.into_body());
}
% endif ## media upload handling
dlg.pre_request();
match req.send() {
match req_result {
Err(err) => {
if let oauth2::Retry::After(d) = dlg.http_error(&err) {
sleep(d);
@@ -736,6 +748,30 @@ else {
${delegate_finish}
return Result::Failure(res)
}
% if resumable_media_param:
if let Some((ref mut reader, ref mime)) = ${resumable_media_param.type.arg_name} {
let request_size = reader.seek(io::SeekFrom::End(0)).unwrap();
reader.seek(io::SeekFrom::Start(0)).unwrap();
let mut client = &mut *self.hub.client.borrow_mut();
match (cmn::ResumableUploadHelper {
client: &mut client.borrow_mut(),
delegate: dlg,
url: &res.headers.get::<hyper::header::Location>().expect("Location header is part of protocol").0,
reader: reader,
media_type: mime.clone(),
content_size: request_size
}.upload()) {
Err(err) => {
## Do not ask the delgate again, as it was asked by the helper !
${delegate_finish}
return Result::HttpError(err)
}
## Now the result contains the actual resource, if any ... it will be
## decoded next
Ok(upload_result) => res = upload_result,
}
}
% endif
% if response_schema:
## If 'alt' is not json, we cannot attempt to decode the response
let result_value = \
@@ -760,6 +796,7 @@ if enable_resource_parsing \
% else:
let result_value = res;
% endif
${delegate_finish}
return Result::Success(result_value)
}

View File

@@ -290,4 +290,30 @@ impl<'a> Read for MultiPartReader<'a> {
}
}
}
}
/// The `X-Upload-Content-Type` header.
#[derive(Clone, PartialEq, Debug)]
pub struct XUploadContentType(pub Mime);
impl_header!(XUploadContentType,
"X-Upload-Content-Type",
Mime);
/// A utility type to perform a resumable upload from start to end.
pub struct ResumableUploadHelper<'a, NC: 'a> {
pub client: &'a mut hyper::client::Client<NC>,
pub delegate: &'a mut Delegate,
pub url: &'a str,
pub reader: &'a mut ReadSeek,
pub media_type: Mime,
pub content_size: u64
}
impl<'a, NC> ResumableUploadHelper<'a, NC>
where NC: hyper::net::NetworkConnector {
pub fn upload(&'a mut self) -> hyper::HttpResult<hyper::client::Response> {
Err(hyper::error::HttpError::HttpStatusError)
}
}