mirror of
https://github.com/OMGeeky/google-apis-rs.git
synced 2026-02-23 15:49:49 +01:00
feat(cmn): Resumable upload implemented
With all bells and whisles. For now, we don't have a good return value to indicate that the operation was cancelled, which needs fixing.
This commit is contained in:
@@ -27,7 +27,7 @@ ${lib.docs(c)}
|
||||
// We don't warn about this, as depending on the API, some data structures or facilities are never used.
|
||||
// Instead of pre-determining this, we just disable the lint. It's manually tuned to not have any
|
||||
// unused imports in fully featured APIs. Same with unused_mut ... .
|
||||
#![allow(unused_imports, unused_mut)]
|
||||
#![allow(unused_imports, unused_mut, dead_code)]
|
||||
// Required for serde annotations
|
||||
#![feature(custom_derive, custom_attribute, plugin)]
|
||||
#![plugin(serde_macros)]
|
||||
|
||||
@@ -478,10 +478,6 @@ match result {
|
||||
% if URL_ENCODE in special_cases:
|
||||
use url::{percent_encode, FORM_URLENCODED_ENCODE_SET};
|
||||
% endif
|
||||
## TODO: IntoBody is called explicilty, even though it should be working implicitly.
|
||||
## However, the compiler complains about
|
||||
## "the trait `core::marker::Sized` is not implemented for the type `std::io::Read`"
|
||||
use hyper::client::IntoBody;
|
||||
use std::io::{Read, Seek};
|
||||
use hyper::header::{ContentType, ContentLength, Authorization, UserAgent, Location};
|
||||
let mut dd = DefaultDelegate;
|
||||
@@ -720,11 +716,11 @@ else {
|
||||
|
||||
.header(ContentType(json_mime_type.clone()))
|
||||
.header(ContentLength(request_size as u64))
|
||||
.body(request_value_reader.into_body())\
|
||||
.body(&mut request_value_reader)\
|
||||
% else:
|
||||
|
||||
.header(content_type)
|
||||
.body(body_reader.into_body())\
|
||||
.body(&mut body_reader)\
|
||||
% endif ## not simple_media_param
|
||||
% endif
|
||||
;
|
||||
@@ -733,7 +729,7 @@ else {
|
||||
${READER_SEEK | indent_all_but_first_by(4)}
|
||||
req = req.header(ContentType(reader_mime_type.clone()))
|
||||
.header(ContentLength(size))
|
||||
.body(reader.into_body());
|
||||
.body(&mut reader);
|
||||
}
|
||||
% endif ## media upload handling
|
||||
% if resumable_media_param:
|
||||
|
||||
@@ -70,13 +70,13 @@ pub struct JsonServerError {
|
||||
pub struct DummyNetworkStream;
|
||||
|
||||
impl Read for DummyNetworkStream {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
|
||||
Ok(0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Write for DummyNetworkStream {
|
||||
fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
|
||||
fn write(&mut self, _: &[u8]) -> io::Result<usize> {
|
||||
Ok(0)
|
||||
}
|
||||
|
||||
@@ -173,6 +173,21 @@ pub trait Delegate {
|
||||
/// It's also useful as you can be sure that a request will definitely be made.
|
||||
fn pre_request(&mut self) { }
|
||||
|
||||
/// Return the size of each chunk of a resumable upload.
|
||||
/// Must be a power of two, with 1<<18 being the smallest allowed chunk size.
|
||||
/// Will be called once before starting any resumable upload.
|
||||
fn chunk_size(&mut self) -> u64 {
|
||||
1 << 23
|
||||
}
|
||||
|
||||
/// Called before the given chunk is uploaded to the server.
|
||||
/// If true is returned, the upload will be interrupted.
|
||||
/// However, it may be resumable if you stored the upload URL in a previous call
|
||||
/// to `store_upload_url()`
|
||||
fn cancel_chunk_upload(&mut self, chunk: &ContentRange) -> bool {
|
||||
let _ = chunk;
|
||||
false
|
||||
}
|
||||
|
||||
/// Called before the API request method returns, in every case. It can be used to clean up
|
||||
/// internal state between calls to the API.
|
||||
@@ -415,7 +430,7 @@ impl Header for ContentRange {
|
||||
}
|
||||
|
||||
/// We are not parsable, as parsing is done by the `Range` header
|
||||
fn parse_header(raw: &[Vec<u8>]) -> Option<ContentRange> {
|
||||
fn parse_header(_: &[Vec<u8>]) -> Option<ContentRange> {
|
||||
None
|
||||
}
|
||||
}
|
||||
@@ -463,7 +478,7 @@ impl Header for RangeResponseHeader {
|
||||
|
||||
impl HeaderFormat for RangeResponseHeader {
|
||||
/// No implmentation necessary, we just need to parse
|
||||
fn fmt_header(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
fn fmt_header(&self, _: &mut fmt::Formatter) -> fmt::Result {
|
||||
Err(fmt::Error)
|
||||
}
|
||||
}
|
||||
@@ -486,7 +501,7 @@ impl<'a, NC, A> ResumableUploadHelper<'a, NC, A>
|
||||
where NC: hyper::net::NetworkConnector,
|
||||
A: oauth2::GetToken {
|
||||
|
||||
fn query_transfer_status(&'a mut self) -> (Option<u64>, hyper::HttpResult<hyper::client::Response>) {
|
||||
fn query_transfer_status(&mut self) -> (Option<u64>, hyper::HttpResult<hyper::client::Response>) {
|
||||
loop {
|
||||
match self.client.post(self.url)
|
||||
.header(UserAgent(self.user_agent.to_string()))
|
||||
@@ -519,14 +534,63 @@ impl<'a, NC, A> ResumableUploadHelper<'a, NC, A>
|
||||
}
|
||||
}
|
||||
|
||||
pub fn upload(&'a mut self) -> hyper::HttpResult<hyper::client::Response> {
|
||||
let start = match self.start_at {
|
||||
pub fn upload(&mut self) -> hyper::HttpResult<hyper::client::Response> {
|
||||
let mut start = match self.start_at {
|
||||
Some(s) => s,
|
||||
None => match self.query_transfer_status() {
|
||||
(Some(s), _) => s,
|
||||
(_, result) => return result
|
||||
}
|
||||
};
|
||||
Err(hyper::error::HttpError::HttpStatusError)
|
||||
|
||||
const MIN_CHUNK_SIZE: u64 = 1 << 18;
|
||||
let chunk_size = match self.delegate.chunk_size() {
|
||||
cs if cs > MIN_CHUNK_SIZE => cs,
|
||||
_ => MIN_CHUNK_SIZE
|
||||
};
|
||||
|
||||
loop {
|
||||
let request_size = match self.content_length - start {
|
||||
rs if rs > chunk_size => chunk_size,
|
||||
rs => rs
|
||||
};
|
||||
|
||||
self.reader.seek(SeekFrom::Start(start)).unwrap();
|
||||
let mut section_reader = self.reader.take(request_size);
|
||||
let range_header = ContentRange {
|
||||
range: Some(Chunk {first: start, last: start + request_size - 1}),
|
||||
total_length: self.content_length
|
||||
};
|
||||
start += request_size;
|
||||
if self.delegate.cancel_chunk_upload(&range_header) {
|
||||
return Err(hyper::error::HttpError::HttpStatusError)
|
||||
}
|
||||
match self.client.post(self.url)
|
||||
.header(range_header)
|
||||
.header(ContentType(self.media_type.clone()))
|
||||
.header(UserAgent(self.user_agent.to_string()))
|
||||
.body(&mut section_reader)
|
||||
.send() {
|
||||
Ok(res) => {
|
||||
if res.status == StatusCode::PermanentRedirect {
|
||||
continue
|
||||
}
|
||||
if res.status != StatusCode::Ok {
|
||||
if let Retry::After(d) = self.delegate.http_failure(&res, None) {
|
||||
sleep(d);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
return Ok(res)
|
||||
},
|
||||
Err(err) => {
|
||||
if let Retry::After(d) = self.delegate.http_error(&err) {
|
||||
sleep(d);
|
||||
continue;
|
||||
}
|
||||
return Err(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user