Move to std::futures to support async/await.

This commit is contained in:
Glenn Griffin
2019-11-07 14:23:40 -08:00
parent 01e7a7f437
commit 93cbd91341
16 changed files with 923 additions and 1148 deletions

View File

@@ -14,8 +14,8 @@ edition = "2018"
base64 = "0.10"
chrono = "0.4"
http = "0.1"
hyper = {version = "0.12", default-features = false}
hyper-rustls = "0.17"
hyper = {version = "0.13.0-alpha.4", features = ["unstable-stream"]}
hyper-rustls = "=0.18.0-alpha.2"
itertools = "0.8"
log = "0.3"
rustls = "0.16"
@@ -23,10 +23,9 @@ serde = "1.0"
serde_json = "1.0"
serde_derive = "1.0"
url = "1"
futures = "0.1"
tokio-threadpool = "0.1"
tokio = "0.1"
tokio-timer = "0.2"
futures-preview = "=0.3.0-alpha.19"
tokio = "=0.2.0-alpha.6"
futures-util-preview = "=0.3.0-alpha.19"
[dev-dependencies]
getopts = "0.2"

View File

@@ -6,7 +6,7 @@ edition = "2018"
[dependencies]
yup-oauth2 = { path = "../../" }
hyper = "0.12"
hyper-rustls = "0.17"
futures = "0.1"
tokio = "0.1"
hyper = {version = "0.13.0-alpha.4", features = ["unstable-stream"]}
hyper-rustls = "=0.18.0-alpha.2"
futures-preview = "=0.3.0-alpha.19"
tokio = "=0.2.0-alpha.6"

View File

@@ -1,20 +1,20 @@
use futures::prelude::*;
use yup_oauth2::{self, Authenticator, DeviceFlow, GetToken};
use std::path;
use tokio;
fn main() {
#[tokio::main]
async fn main() {
let creds = yup_oauth2::read_application_secret(path::Path::new("clientsecret.json"))
.expect("clientsecret");
let mut auth = Authenticator::new(DeviceFlow::new(creds))
let auth = Authenticator::new(DeviceFlow::new(creds))
.persist_tokens_to_disk("tokenstorage.json")
.build()
.expect("authenticator");
let scopes = vec!["https://www.googleapis.com/auth/youtube.readonly"];
let mut rt = tokio::runtime::Runtime::new().unwrap();
let fut = auth.token(scopes).and_then(|tok| Ok(println!("{:?}", tok)));
println!("{:?}", rt.block_on(fut));
match auth.token(scopes).await {
Err(e) => println!("error: {:?}", e),
Ok(t) => println!("token: {:?}", t),
}
}

View File

@@ -6,7 +6,7 @@ edition = "2018"
[dependencies]
yup-oauth2 = { path = "../../" }
hyper = "0.12"
hyper-rustls = "0.17"
futures = "0.1"
tokio = "0.1"
hyper = {version = "0.13.0-alpha.4", features = ["unstable-stream"]}
hyper-rustls = "=0.18.0-alpha.2"
futures-preview = "=0.3.0-alpha.19"
tokio = "=0.2.0-alpha.6"

View File

@@ -1,24 +1,16 @@
use futures::prelude::*;
use yup_oauth2::GetToken;
use yup_oauth2::{Authenticator, InstalledFlow};
use hyper::client::Client;
use hyper_rustls::HttpsConnector;
use std::path::Path;
fn main() {
let https = HttpsConnector::new(1);
let client = Client::builder()
.keep_alive(false)
.build::<_, hyper::Body>(https);
let ad = yup_oauth2::DefaultFlowDelegate;
#[tokio::main]
async fn main() {
let secret = yup_oauth2::read_application_secret(Path::new("clientsecret.json"))
.expect("clientsecret.json");
let mut auth = Authenticator::new(InstalledFlow::new(
let auth = Authenticator::new(InstalledFlow::new(
secret,
yup_oauth2::InstalledFlowReturnMethod::HTTPRedirect(8081),
yup_oauth2::InstalledFlowReturnMethod::HTTPRedirectEphemeral,
))
.persist_tokens_to_disk("tokencache.json")
.build()
@@ -26,11 +18,8 @@ fn main() {
let s = "https://www.googleapis.com/auth/drive.file".to_string();
let scopes = vec![s];
let tok = auth.token(scopes);
let fut = tok.map_err(|e| println!("error: {:?}", e)).and_then(|t| {
println!("The token is {:?}", t);
Ok(())
});
tokio::run(fut)
match auth.token(scopes).await {
Err(e) => println!("error: {:?}", e),
Ok(t) => println!("The token is {:?}", t),
}
}

View File

@@ -6,7 +6,7 @@ edition = "2018"
[dependencies]
yup-oauth2 = { path = "../../" }
hyper = "0.12"
hyper-rustls = "0.17"
futures = "0.1"
tokio = "0.1"
hyper = {version = "0.13.0-alpha.4", features = ["unstable-stream"]}
hyper-rustls = "=0.18.0-alpha.2"
futures-preview = "=0.3.0-alpha.19"
tokio = "=0.2.0-alpha.6"

View File

@@ -1,29 +1,22 @@
use std::path;
use tokio;
use yup_oauth2;
use futures::prelude::*;
use yup_oauth2::GetToken;
use tokio;
use std::path;
fn main() {
#[tokio::main]
async fn main() {
let creds =
yup_oauth2::service_account_key_from_file(path::Path::new("serviceaccount.json")).unwrap();
let mut sa = yup_oauth2::ServiceAccountAccess::new(creds).build();
let sa = yup_oauth2::ServiceAccountAccess::new(creds).build();
let fut = sa
let tok = sa
.token(vec!["https://www.googleapis.com/auth/pubsub"])
.and_then(|tok| {
.await
.unwrap();
println!("token is: {:?}", tok);
Ok(())
});
let fut2 = sa
let tok = sa
.token(vec!["https://www.googleapis.com/auth/pubsub"])
.and_then(|tok| {
.await
.unwrap();
println!("cached token is {:?} and should be identical", tok);
Ok(())
});
let all = fut.join(fut2).then(|_| Ok(()));
tokio::run(all)
}

View File

@@ -3,13 +3,13 @@ use crate::refresh::RefreshFlow;
use crate::storage::{hash_scopes, DiskTokenStorage, MemoryStorage, TokenStorage};
use crate::types::{ApplicationSecret, GetToken, RefreshResult, RequestError, Token};
use futures::{future, prelude::*};
use tokio_timer;
use futures::prelude::*;
use std::error::Error;
use std::io;
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::pin::Pin;
use std::sync::Arc;
/// Authenticator abstracts different `GetToken` implementations behind one type and handles
/// caching received tokens. It's important to use it (instead of the flows directly) because
@@ -28,8 +28,8 @@ struct AuthenticatorImpl<
C: hyper::client::connect::Connect,
> {
client: hyper::Client<C>,
inner: Arc<Mutex<T>>,
store: Arc<Mutex<S>>,
inner: Arc<T>,
store: Arc<S>,
delegate: AD,
}
@@ -48,7 +48,7 @@ impl HyperClientBuilder for DefaultHyperClient {
fn build_hyper_client(self) -> hyper::Client<Self::Connector> {
hyper::Client::builder()
.keep_alive(false)
.build::<_, hyper::Body>(hyper_rustls::HttpsConnector::new(1))
.build::<_, hyper::Body>(hyper_rustls::HttpsConnector::new())
}
}
@@ -169,14 +169,12 @@ where
where
T::TokenGetter: 'static + GetToken + Send,
S: 'static + Send,
AD: 'static + Send,
AD: 'static + Send + Sync,
C::Connector: 'static + Clone + Send,
{
let client = self.client.build_hyper_client();
let store = Arc::new(Mutex::new(self.store?));
let inner = Arc::new(Mutex::new(
self.token_getter.build_token_getter(client.clone()),
));
let store = Arc::new(self.store?);
let inner = Arc::new(self.token_getter.build_token_getter(client.clone()));
Ok(AuthenticatorImpl {
client,
@@ -187,147 +185,125 @@ where
}
}
impl<
GT: 'static + GetToken + Send,
S: 'static + TokenStorage + Send,
AD: 'static + AuthenticatorDelegate + Send,
impl<GT, S, AD, C> AuthenticatorImpl<GT, S, AD, C>
where
GT: 'static + GetToken,
S: 'static + TokenStorage,
AD: 'static + AuthenticatorDelegate + Send + Sync,
C: 'static + hyper::client::connect::Connect + Clone + Send,
> GetToken for AuthenticatorImpl<GT, S, AD, C>
{
/// Returns the API Key of the inner flow.
fn api_key(&mut self) -> Option<String> {
self.inner.lock().unwrap().api_key()
}
/// Returns the application secret of the inner flow.
fn application_secret(&self) -> ApplicationSecret {
self.inner.lock().unwrap().application_secret()
}
fn token<I, T>(
&mut self,
scopes: I,
) -> Box<dyn Future<Item = Token, Error = RequestError> + Send>
where
T: Into<String>,
I: IntoIterator<Item = T>,
{
let (scope_key, scopes) = hash_scopes(scopes);
async fn get_token(&self, scope_key: u64, scopes: Vec<String>) -> Result<Token, RequestError> {
let store = self.store.clone();
let mut delegate = self.delegate.clone();
let client = self.client.clone();
let appsecret = self.inner.lock().unwrap().application_secret();
let appsecret = self.inner.application_secret();
let gettoken = self.inner.clone();
let loopfn = move |()| -> Box<
dyn Future<Item = future::Loop<Token, ()>, Error = RequestError> + Send,
> {
// How well does this work with tokio?
match store.lock().unwrap().get(
loop {
match store.get(
scope_key.clone(),
&scopes.iter().map(|s| s.as_str()).collect(),
) {
Ok(Some(t)) => {
if !t.expired() {
return Box::new(Ok(future::Loop::Break(t)).into_future());
return Ok(t);
}
// Implement refresh flow.
let refresh_token = t.refresh_token.clone();
let mut delegate = delegate.clone();
let store = store.clone();
let scopes = scopes.clone();
let refresh_fut = RefreshFlow::refresh_token(
let rr = RefreshFlow::refresh_token(
client.clone(),
appsecret.clone(),
refresh_token.unwrap(),
)
.and_then(move |rr| -> Box<dyn Future<Item=future::Loop<Token, ()>, Error=RequestError> + Send> {
.await?;
match rr {
RefreshResult::Error(ref e) => {
delegate.token_refresh_failed(
format!("{}", e.description().to_string()),
&Some("the request has likely timed out".to_string()),
);
Box::new(Err(RequestError::Refresh(rr)).into_future())
return Err(RequestError::Refresh(rr));
}
RefreshResult::RefreshError(ref s, ref ss) => {
delegate.token_refresh_failed(
format!("{} {}", s, ss.clone().map(|s| format!("({})", s)).unwrap_or("".to_string())),
&Some("the refresh token is likely invalid and your authorization has been revoked".to_string()),
);
Box::new(Err(RequestError::Refresh(rr)).into_future())
return Err(RequestError::Refresh(rr));
}
RefreshResult::Success(t) => {
if let Err(e) = store.lock().unwrap().set(scope_key, &scopes.iter().map(|s| s.as_str()).collect(), Some(t.clone())) {
let x = store.set(
scope_key,
&scopes.iter().map(|s| s.as_str()).collect(),
Some(t.clone()),
);
if let Err(e) = x {
match delegate.token_storage_failure(true, &e) {
Retry::Skip => Box::new(Ok(future::Loop::Break(t)).into_future()),
Retry::Abort => Box::new(Err(RequestError::Cache(Box::new(e))).into_future()),
Retry::After(d) => Box::new(
tokio_timer::sleep(d)
.then(|_| Ok(future::Loop::Continue(()))),
)
as Box<
dyn Future<
Item = future::Loop<Token, ()>,
Error = RequestError> + Send>,
Retry::Skip => return Ok(t),
Retry::Abort => return Err(RequestError::Cache(Box::new(e))),
Retry::After(d) => tokio::timer::delay_for(d).await,
}
} else {
Box::new(Ok(future::Loop::Break(t)).into_future())
return Ok(t);
}
}
},
}
});
Box::new(refresh_fut)
}
Ok(None) => {
let store = store.clone();
let scopes = scopes.clone();
let mut delegate = delegate.clone();
Box::new(
gettoken
.lock()
.unwrap()
.token(scopes.clone())
.and_then(move |t| {
if let Err(e) = store.lock().unwrap().set(
let t = gettoken.token(scopes.clone()).await?;
if let Err(e) = store.set(
scope_key,
&scopes.iter().map(|s| s.as_str()).collect(),
Some(t.clone()),
) {
match delegate.token_storage_failure(true, &e) {
Retry::Skip => {
Box::new(Ok(future::Loop::Break(t)).into_future())
}
Retry::Abort => Box::new(
Err(RequestError::Cache(Box::new(e))).into_future(),
),
Retry::After(d) => Box::new(
tokio_timer::sleep(d)
.then(|_| Ok(future::Loop::Continue(()))),
)
as Box<
dyn Future<
Item = future::Loop<Token, ()>,
Error = RequestError,
> + Send,
>,
Retry::Skip => return Ok(t),
Retry::Abort => return Err(RequestError::Cache(Box::new(e))),
Retry::After(d) => tokio::timer::delay_for(d).await,
}
} else {
Box::new(Ok(future::Loop::Break(t)).into_future())
return Ok(t);
}
}),
)
}
Err(err) => match delegate.token_storage_failure(false, &err) {
Retry::Abort | Retry::Skip => {
return Box::new(Err(RequestError::Cache(Box::new(err))).into_future())
}
Retry::After(d) => {
return Box::new(
tokio_timer::sleep(d).then(|_| Ok(future::Loop::Continue(()))),
)
}
Retry::Abort | Retry::Skip => return Err(RequestError::Cache(Box::new(err))),
Retry::After(d) => tokio::timer::delay_for(d).await,
},
}
};
Box::new(future::loop_fn((), loopfn))
}
}
}
impl<
GT: 'static + GetToken,
S: 'static + TokenStorage,
AD: 'static + AuthenticatorDelegate + Send + Sync,
C: 'static + hyper::client::connect::Connect + Clone + Send,
> GetToken for AuthenticatorImpl<GT, S, AD, C>
{
/// Returns the API Key of the inner flow.
fn api_key(&self) -> Option<String> {
self.inner.api_key()
}
/// Returns the application secret of the inner flow.
fn application_secret(&self) -> ApplicationSecret {
self.inner.application_secret()
}
fn token<'a, I, T>(
&'a self,
scopes: I,
) -> Pin<Box<dyn Future<Output = Result<Token, RequestError>> + Send + 'a>>
where
T: Into<String>,
I: IntoIterator<Item = T>,
{
let (scope_key, scopes) = hash_scopes(scopes);
Box::pin(self.get_token(scope_key, scopes))
}
}

View File

@@ -2,14 +2,15 @@ use hyper;
use std::error::Error;
use std::fmt;
use std::io;
use std::pin::Pin;
use crate::types::{PollError, RequestError};
use chrono::{DateTime, Local, Utc};
use std::time::Duration;
use futures::{future, prelude::*};
use futures::prelude::*;
use tio::AsyncBufReadExt;
use tokio::io as tio;
/// A utility type to indicate how operations DeviceFlowHelper operations should be retried
@@ -83,7 +84,7 @@ pub trait AuthenticatorDelegate: Clone {
/// This can be useful if the underlying `TokenStorage` may fail occasionally.
/// if `is_set` is true, the failure resulted from `TokenStorage.set(...)`. Otherwise,
/// it was `TokenStorage.get(...)`
fn token_storage_failure(&mut self, is_set: bool, _: &dyn Error) -> Retry {
fn token_storage_failure(&mut self, is_set: bool, _: &(dyn Error + Send + Sync)) -> Retry {
let _ = is_set;
Retry::Abort
}
@@ -114,11 +115,11 @@ pub trait FlowDelegate: Clone {
/// Called if the request code is expired. You will have to start over in this case.
/// This will be the last call the delegate receives.
/// Given `DateTime` is the expiration date
fn expired(&mut self, _: &DateTime<Utc>) {}
fn expired(&self, _: &DateTime<Utc>) {}
/// Called if the user denied access. You would have to start over.
/// This will be the last call the delegate receives.
fn denied(&mut self) {}
fn denied(&self) {}
/// Called as long as we are waiting for the user to authorize us.
/// Can be used to print progress information, or decide to time-out.
@@ -127,7 +128,7 @@ pub trait FlowDelegate: Clone {
/// # Notes
/// * Only used in `DeviceFlow`. Return value will only be used if it
/// is larger than the interval desired by the server.
fn pending(&mut self, _: &PollInformation) -> Retry {
fn pending(&self, _: &PollInformation) -> Retry {
Retry::After(Duration::from_secs(5))
}
@@ -140,7 +141,7 @@ pub trait FlowDelegate: Clone {
/// # Notes
/// * Will be called exactly once, provided we didn't abort during `request_code` phase.
/// * Will only be called if the Authenticator's flow_type is `FlowType::Device`.
fn present_user_code(&mut self, pi: &PollInformation) {
fn present_user_code(&self, pi: &PollInformation) {
println!(
"Please enter {} at {} and grant access to this application",
pi.user_code, pi.verification_url
@@ -156,35 +157,44 @@ pub trait FlowDelegate: Clone {
/// We need the user to navigate to a URL using their browser and potentially paste back a code
/// (or maybe not). Whether they have to enter a code depends on the InstalledFlowReturnMethod
/// used.
fn present_user_url<S: AsRef<str> + fmt::Display>(
&mut self,
fn present_user_url<'a, S: AsRef<str> + fmt::Display + Send + Sync + 'a>(
&'a self,
url: S,
need_code: bool,
) -> Box<dyn Future<Item = Option<String>, Error = Box<dyn Error + Send>> + Send> {
) -> Pin<Box<dyn Future<Output = Result<String, Box<dyn Error + Send + Sync>>> + Send + 'a>>
{
Box::pin(present_user_url(url, need_code))
}
}
async fn present_user_url<S: AsRef<str> + fmt::Display>(
url: S,
need_code: bool,
) -> Result<String, Box<dyn Error + Send + Sync>> {
if need_code {
println!(
"Please direct your browser to {}, follow the instructions and enter the \
code displayed here: ",
url
);
Box::new(
tio::lines(io::BufReader::new(tio::stdin()))
.into_future()
.map_err(|(e, _)| {
println!("{:?}", e);
Box::new(e) as Box<dyn Error + Send>
})
.and_then(|(l, _)| Ok(l)),
)
let mut user_input = String::new();
match tio::BufReader::new(tio::stdin())
.read_line(&mut user_input)
.await
{
Err(err) => {
println!("{:?}", err);
Err(Box::new(err) as Box<dyn Error + Send + Sync>)
}
Ok(_) => Ok(user_input),
}
} else {
println!(
"Please direct your browser to {} and follow the instructions displayed \
there.",
url
);
Box::new(future::ok(None))
}
Ok(String::new())
}
}

View File

@@ -1,16 +1,14 @@
use std::iter::{FromIterator, IntoIterator};
use std::pin::Pin;
use std::time::Duration;
use ::log::{error, log};
use chrono::{self, Utc};
use futures::stream::Stream;
use futures::{future, prelude::*};
use http;
use futures::{prelude::*};
use hyper;
use hyper::header;
use itertools::Itertools;
use serde_json as json;
use tokio_timer;
use url::form_urlencoded;
use crate::authenticator_delegate::{DefaultFlowDelegate, FlowDelegate, PollInformation, Retry};
@@ -75,7 +73,7 @@ impl<FD> DeviceFlow<FD> {
impl<FD, C> crate::authenticator::AuthFlow<C> for DeviceFlow<FD>
where
FD: FlowDelegate + Send + 'static,
FD: FlowDelegate + Send + Sync + 'static,
C: hyper::client::connect::Connect + 'static,
{
type TokenGetter = DeviceFlowImpl<FD, C>;
@@ -108,21 +106,21 @@ impl<FD, C> Flow for DeviceFlowImpl<FD, C> {
}
impl<
FD: FlowDelegate + Clone + Send + 'static,
FD: FlowDelegate + Clone + Send + Sync + 'static,
C: hyper::client::connect::Connect + Sync + 'static,
> GetToken for DeviceFlowImpl<FD, C>
{
fn token<I, T>(
&mut self,
fn token<'a, I, T>(
&'a self,
scopes: I,
) -> Box<dyn Future<Item = Token, Error = RequestError> + Send>
) -> Pin<Box<dyn Future<Output = Result<Token, RequestError>> + Send + 'a>>
where
T: Into<String>,
I: IntoIterator<Item = T>,
{
self.retrieve_device_token(Vec::from_iter(scopes.into_iter().map(Into::into)))
Box::pin(self.retrieve_device_token(Vec::from_iter(scopes.into_iter().map(Into::into))))
}
fn api_key(&mut self) -> Option<String> {
fn api_key(&self) -> Option<String> {
None
}
fn application_secret(&self) -> ApplicationSecret {
@@ -139,75 +137,51 @@ where
{
/// Essentially what `GetToken::token` does: Retrieve a token for the given scopes without
/// caching.
fn retrieve_device_token<'a>(
&mut self,
pub async fn retrieve_device_token<'a>(
&self,
scopes: Vec<String>,
) -> Box<dyn Future<Item = Token, Error = RequestError> + Send> {
) -> Result<Token, RequestError> {
let application_secret = self.application_secret.clone();
let client = self.client.clone();
let wait = self.wait;
let mut fd = self.fd.clone();
let request_code = Self::request_code(
let fd = self.fd.clone();
let (pollinf, device_code) = Self::request_code(
application_secret.clone(),
client.clone(),
self.device_code_url.clone(),
scopes,
)
.and_then(move |(pollinf, device_code)| {
.await?;
fd.present_user_code(&pollinf);
Ok((pollinf, device_code))
});
let fd = self.fd.clone();
Box::new(request_code.and_then(move |(pollinf, device_code)| {
future::loop_fn(0, move |i| {
// Make a copy of everything every time, because the loop function needs to be
// repeatable, i.e. we can't move anything out.
let pt = Self::poll_token(
let maxn = wait.as_secs() / pollinf.interval.as_secs();
for _ in 0..maxn {
let fd = fd.clone();
let pollinf = pollinf.clone();
tokio::timer::delay_for(pollinf.interval).await;
let r = Self::poll_token(
application_secret.clone(),
client.clone(),
device_code.clone(),
pollinf.clone(),
fd.clone(),
);
let maxn = wait.as_secs() / pollinf.interval.as_secs();
let mut fd = fd.clone();
let pollinf = pollinf.clone();
tokio_timer::sleep(pollinf.interval)
.then(|_| pt)
.then(move |r| match r {
Ok(None) if i < maxn => match fd.pending(&pollinf) {
Retry::Abort | Retry::Skip => {
Box::new(Err(RequestError::Poll(PollError::TimedOut)).into_future())
}
Retry::After(d) => Box::new(
tokio_timer::sleep(d)
.then(move |_| Ok(future::Loop::Continue(i + 1))),
)
as Box<
dyn Future<
Item = future::Loop<Token, u64>,
Error = RequestError,
> + Send,
>,
.await;
match r {
Ok(None) => match fd.pending(&pollinf) {
Retry::Abort | Retry::Skip => {
return Err(RequestError::Poll(PollError::TimedOut))
}
Retry::After(d) => tokio::timer::delay_for(d).await,
},
Ok(Some(tok)) => Box::new(Ok(future::Loop::Break(tok)).into_future()),
Ok(Some(tok)) => return Ok(tok),
Err(e @ PollError::AccessDenied)
| Err(e @ PollError::TimedOut)
| Err(e @ PollError::Expired(_)) => {
Box::new(Err(RequestError::Poll(e)).into_future())
| Err(e @ PollError::Expired(_)) => return Err(RequestError::Poll(e)),
Err(ref e) => error!("Unknown error from poll token api: {}", e),
}
Err(ref e) if i < maxn => {
error!("Unknown error from poll token api: {}", e);
Box::new(Ok(future::Loop::Continue(i + 1)).into_future())
}
// Too many attempts.
Ok(None) | Err(_) => {
error!("Too many poll attempts");
Box::new(Err(RequestError::Poll(PollError::TimedOut)).into_future())
}
})
})
}))
Err(RequestError::Poll(PollError::TimedOut))
}
/// The first step involves asking the server for a code that the user
@@ -225,12 +199,12 @@ where
/// * If called after a successful result was returned at least once.
/// # Examples
/// See test-cases in source code for a more complete example.
fn request_code(
async fn request_code(
application_secret: ApplicationSecret,
client: hyper::Client<C>,
device_code_url: String,
scopes: Vec<String>,
) -> impl Future<Item = (PollInformation, String), Error = RequestError> {
) -> Result<(PollInformation, String), RequestError> {
// note: cloned() shouldn't be needed, see issue
// https://github.com/servo/rust-url/issues/81
let req = form_urlencoded::Serializer::new(String::new())
@@ -248,24 +222,14 @@ where
// note: works around bug in rustlang
// https://github.com/rust-lang/rust/issues/22252
let request = hyper::Request::post(device_code_url)
let req = hyper::Request::post(device_code_url)
.header(header::CONTENT_TYPE, "application/x-www-form-urlencoded")
.body(hyper::Body::from(req))
.into_future();
request
.then(
move |request: Result<hyper::Request<hyper::Body>, http::Error>| {
let request = request.unwrap();
client.request(request)
},
)
.then(
|r: Result<hyper::Response<hyper::Body>, hyper::error::Error>| {
match r {
Err(err) => {
return Err(RequestError::ClientError(err));
}
Ok(res) => {
.unwrap();
let resp = client
.request(req)
.await
.map_err(|e| RequestError::ClientError(e))?;
// This return type is defined in https://tools.ietf.org/html/draft-ietf-oauth-device-flow-15#section-3.2
// The alias is present as Google use a non-standard name for verification_uri.
// According to the standard interval is optional, however, all tested implementations provide it.
@@ -280,20 +244,16 @@ where
interval: i64,
}
let json_str: String = res
.into_body()
.concat2()
.wait()
.map(|c| String::from_utf8(c.into_bytes().to_vec()).unwrap())
.unwrap(); // TODO: error handling
let json_bytes = resp.into_body().try_concat().await?;
// check for error
match json::from_str::<JsonError>(&json_str) {
match json::from_slice::<JsonError>(&json_bytes) {
Err(_) => {} // ignore, move on
Ok(res) => return Err(RequestError::from(res)),
}
let decoded: JsonData = json::from_str(&json_str).unwrap();
let decoded: JsonData =
json::from_slice(&json_bytes).map_err(|e| RequestError::JSONError(e))?;
let expires_in = decoded.expires_in.unwrap_or(60 * 60);
@@ -305,10 +265,6 @@ where
};
Ok((pi, decoded.device_code))
}
}
},
)
}
/// If the first call is successful, this method may be called.
/// As long as we are waiting for authentication, it will return `Ok(None)`.
@@ -328,19 +284,17 @@ where
///
/// # Examples
/// See test-cases in source code for a more complete example.
fn poll_token<'a>(
async fn poll_token<'a>(
application_secret: ApplicationSecret,
client: hyper::Client<C>,
device_code: String,
pi: PollInformation,
mut fd: FD,
) -> impl Future<Item = Option<Token>, Error = PollError> {
let expired = if pi.expires_at <= Utc::now() {
fd: FD,
) -> Result<Option<Token>, PollError> {
if pi.expires_at <= Utc::now() {
fd.expired(&pi.expires_at);
Err(PollError::Expired(pi.expires_at)).into_future()
} else {
Ok(()).into_future()
};
return Err(PollError::Expired(pi.expires_at));
}
// We should be ready for a new request
let req = form_urlencoded::Serializer::new(String::new())
@@ -356,22 +310,21 @@ where
.header(header::CONTENT_TYPE, "application/x-www-form-urlencoded")
.body(hyper::Body::from(req))
.unwrap(); // TODO: Error checking
expired
.and_then(move |_| client.request(request).map_err(|e| PollError::HttpError(e)))
.map(|res| {
res.into_body()
.concat2()
.wait()
.map(|c| String::from_utf8(c.into_bytes().to_vec()).unwrap())
.unwrap() // TODO: error handling
})
.and_then(move |json_str: String| {
let res = client
.request(request)
.await
.map_err(|e| PollError::HttpError(e))?;
let body = res
.into_body()
.try_concat()
.await
.map_err(|e| PollError::HttpError(e))?;
#[derive(Deserialize)]
struct JsonError {
error: String,
}
match json::from_str::<JsonError>(&json_str) {
match json::from_slice::<JsonError>(&body) {
Err(_) => {} // ignore, move on, it's not an error
Ok(res) => {
match res.error.as_ref() {
@@ -391,11 +344,10 @@ where
}
// yes, we expect that !
let mut t: Token = json::from_str(&json_str).unwrap();
let mut t: Token = json::from_slice(&body).unwrap();
t.set_expiry_absolute();
Ok(Some(t.clone()))
})
Ok(Some(t))
}
}
@@ -415,7 +367,7 @@ mod tests {
#[derive(Clone)]
struct FD;
impl FlowDelegate for FD {
fn present_user_code(&mut self, pi: &PollInformation) {
fn present_user_code(&self, pi: &PollInformation) {
assert_eq!("https://example.com/verify", pi.verification_url);
}
}
@@ -426,17 +378,17 @@ mod tests {
app_secret.token_uri = format!("{}/token", server_url);
let device_code_url = format!("{}/code", server_url);
let https = HttpsConnector::new(1);
let https = HttpsConnector::new();
let client = hyper::Client::builder()
.keep_alive(false)
.build::<_, hyper::Body>(https);
let mut flow = DeviceFlow::new(app_secret)
let flow = DeviceFlow::new(app_secret)
.delegate(FD)
.device_code_url(device_code_url)
.build_token_getter(client);
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(1)
.panic_handler(|e| std::panic::resume_unwind(e))
.build()
@@ -461,13 +413,14 @@ mod tests {
.with_body(token_response)
.create();
let fut = flow
let fut = async {
let token = flow
.token(vec!["https://www.googleapis.com/scope/1"])
.then(|token| {
let token = token.unwrap();
.await
.unwrap();
assert_eq!("accesstoken", token.access_token);
Ok(()) as Result<(), ()>
});
};
rt.block_on(fut).expect("block_on");
_m.assert();
@@ -493,13 +446,12 @@ mod tests {
.expect(0) // Never called!
.create();
let fut = flow
.token(vec!["https://www.googleapis.com/scope/1"])
.then(|token| {
assert!(token.is_err());
assert!(format!("{}", token.unwrap_err()).contains("invalid_client_id"));
let fut = async {
let res = flow.token(vec!["https://www.googleapis.com/scope/1"]).await;
assert!(res.is_err());
assert!(format!("{}", res.unwrap_err()).contains("invalid_client_id"));
Ok(()) as Result<(), ()>
});
};
rt.block_on(fut).expect("block_on");
_m.assert();
@@ -524,13 +476,12 @@ mod tests {
.expect(1)
.create();
let fut = flow
.token(vec!["https://www.googleapis.com/scope/1"])
.then(|token| {
assert!(token.is_err());
assert!(format!("{}", token.unwrap_err()).contains("Access denied by user"));
let fut = async {
let res = flow.token(vec!["https://www.googleapis.com/scope/1"]).await;
assert!(res.is_err());
assert!(format!("{}", res.unwrap_err()).contains("Access denied by user"));
Ok(()) as Result<(), ()>
});
};
rt.block_on(fut).expect("block_on");
_m.assert();

View File

@@ -3,13 +3,16 @@
// Refer to the project root for licensing information.
//
use std::convert::AsRef;
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use futures::prelude::*;
use futures::stream::Stream;
use futures::sync::oneshot;
use futures::future::FutureExt;
use futures_util::try_stream::TryStreamExt;
use hyper;
use hyper::{header, StatusCode, Uri};
use hyper::header;
use tokio::sync::oneshot;
use url::form_urlencoded;
use url::percent_encoding::{percent_encode, QUERY_ENCODE_SET};
@@ -58,20 +61,22 @@ where
})
}
impl<FD: FlowDelegate + 'static + Send + Clone, C: hyper::client::connect::Connect + 'static>
GetToken for InstalledFlowImpl<FD, C>
impl<
FD: FlowDelegate + 'static + Send + Sync + Clone,
C: hyper::client::connect::Connect + 'static,
> GetToken for InstalledFlowImpl<FD, C>
{
fn token<I, T>(
&mut self,
fn token<'a, I, T>(
&'a self,
scopes: I,
) -> Box<dyn Future<Item = Token, Error = RequestError> + Send>
) -> Pin<Box<dyn Future<Output = Result<Token, RequestError>> + Send + 'a>>
where
T: Into<String>,
I: IntoIterator<Item = T>,
{
Box::new(self.obtain_token(scopes.into_iter().map(Into::into).collect()))
Box::pin(self.obtain_token(scopes.into_iter().map(Into::into).collect()))
}
fn api_key(&mut self) -> Option<String> {
fn api_key(&self) -> Option<String> {
None
}
fn application_secret(&self) -> ApplicationSecret {
@@ -140,7 +145,7 @@ where
impl<FD, C> crate::authenticator::AuthFlow<C> for InstalledFlow<FD>
where
FD: FlowDelegate + Send + 'static,
FD: FlowDelegate + Send + Sync + 'static,
C: hyper::client::connect::Connect + 'static,
{
type TokenGetter = InstalledFlowImpl<FD, C>;
@@ -164,139 +169,69 @@ impl<'c, FD: 'static + FlowDelegate + Clone + Send, C: 'c + hyper::client::conne
/// . Return that token
///
/// It's recommended not to use the DefaultFlowDelegate, but a specialized one.
fn obtain_token<'a>(
&mut self,
async fn obtain_token<'a>(
&self,
scopes: Vec<String>, // Note: I haven't found a better way to give a list of strings here, due to ownership issues with futures.
) -> impl 'a + Future<Item = Token, Error = RequestError> + Send {
let rduri = self.fd.redirect_uri();
// Start server on localhost to accept auth code.
let server_bind_port = match self.method {
InstalledFlowReturnMethod::HTTPRedirect(port) => Some(port),
InstalledFlowReturnMethod::HTTPRedirectEphemeral => Some(0),
_ => None,
};
let server = if let Some(port) = server_bind_port {
match InstalledFlowServer::new(port) {
Result::Err(e) => Err(RequestError::ClientError(e)),
Result::Ok(server) => Ok(Some(server)),
) -> Result<Token, RequestError> {
match self.method {
InstalledFlowReturnMethod::HTTPRedirect(port) => {
self.ask_auth_code_via_http(scopes.iter(), port).await
}
} else {
Ok(None)
};
let port = if let Ok(Some(ref srv)) = server {
Some(srv.port)
} else {
None
};
let client = self.client.clone();
let (appsecclone, appsecclone2) = (self.appsecret.clone(), self.appsecret.clone());
let auth_delegate = self.fd.clone();
server
.into_future()
// First: Obtain authorization code from user.
.and_then(move |server| {
Self::ask_authorization_code(server, auth_delegate, &appsecclone, scopes.iter())
})
// Exchange the authorization code provided by Google/the provider for a refresh and an
// access token.
.and_then(move |authcode| {
let request = Self::request_token(appsecclone2, authcode, rduri, port);
let result = client.request(request);
// Handle result here, it makes ownership tracking easier.
result
.and_then(move |r| {
r.into_body()
.concat2()
.map(|c| String::from_utf8(c.into_bytes().to_vec()).unwrap())
// TODO: error handling
})
.then(|body_or| {
let resp = match body_or {
Err(e) => return Err(RequestError::ClientError(e)),
Ok(s) => s,
};
let token_resp: Result<JSONTokenResponse, serde_json::Error> =
serde_json::from_str(&resp);
match token_resp {
Err(e) => {
return Err(RequestError::JSONError(e));
InstalledFlowReturnMethod::HTTPRedirectEphemeral => {
self.ask_auth_code_via_http(scopes.iter(), 0).await
}
Ok(tok) => {
if tok.error.is_some() {
Err(RequestError::NegativeServerResponse(
tok.error.unwrap(),
tok.error_description,
))
} else {
Ok(tok)
InstalledFlowReturnMethod::Interactive => {
self.ask_auth_code_interactively(scopes.iter()).await
}
}
}
})
})
// Return the combined token.
.and_then(|tokens| {
// Successful response
if tokens.access_token.is_some() {
let mut token = Token {
access_token: tokens.access_token.unwrap(),
refresh_token: Some(tokens.refresh_token.unwrap()),
token_type: tokens.token_type.unwrap(),
expires_in: tokens.expires_in,
expires_in_timestamp: None,
};
token.set_expiry_absolute();
Ok(token)
} else {
Err(RequestError::NegativeServerResponse(
tokens.error.unwrap(),
tokens.error_description,
))
}
})
}
fn ask_authorization_code<'a, S, T>(
server: Option<InstalledFlowServer>,
mut auth_delegate: FD,
appsecret: &ApplicationSecret,
scopes: S,
) -> Box<dyn Future<Item = String, Error = RequestError> + Send>
async fn ask_auth_code_interactively<'a, S, T>(&self, scopes: S) -> Result<Token, RequestError>
where
T: AsRef<str> + 'a,
S: Iterator<Item = &'a T>,
{
if server.is_none() {
let auth_delegate = &self.fd;
let appsecret = &self.appsecret;
let url = build_authentication_request_url(
&appsecret.auth_uri,
&appsecret.client_id,
scopes,
auth_delegate.redirect_uri(),
);
Box::new(
auth_delegate
.present_user_url(&url, true /* need_code */)
.then(|r| {
match r {
Ok(Some(mut code)) => {
let authcode = match auth_delegate
.present_user_url(&url, true /* need code */)
.await
{
Ok(mut code) => {
// Partial backwards compatibility in case an implementation adds a new line
// due to previous behaviour.
let ends_with_newline =
code.chars().last().map(|c| c == '\n').unwrap_or(false);
let ends_with_newline = code.chars().last().map(|c| c == '\n').unwrap_or(false);
if ends_with_newline {
code.pop();
}
Ok(code)
code
}
_ => Err(RequestError::UserError("couldn't read code".to_string())),
_ => return Err(RequestError::UserError("couldn't read code".to_string())),
};
self.exchange_auth_code(authcode, None).await
}
}),
)
} else {
let mut server = server.unwrap();
async fn ask_auth_code_via_http<'a, S, T>(
&self,
scopes: S,
desired_port: u16,
) -> Result<Token, RequestError>
where
T: AsRef<str> + 'a,
S: Iterator<Item = &'a T>,
{
let auth_delegate = &self.fd;
let appsecret = &self.appsecret;
let server = InstalledFlowServer::run(desired_port)?;
let bound_port = server.local_addr().port();
// Present url to user.
// The redirect URI must be this very localhost URL, otherwise authorization is refused
// by certain providers.
let url = build_authentication_request_url(
@@ -305,19 +240,65 @@ impl<'c, FD: 'static + FlowDelegate + Clone + Send, C: 'c + hyper::client::conne
scopes,
auth_delegate
.redirect_uri()
.or_else(|| Some(format!("http://localhost:{}", server.port))),
.or_else(|| Some(format!("http://localhost:{}", bound_port))),
);
Box::new(
auth_delegate
.present_user_url(&url, false /* need_code */)
.then(move |_| server.block_till_auth())
.map_err(|e| {
RequestError::UserError(format!(
"could not obtain token via redirect: {}",
e
))
}),
)
let _ = auth_delegate
.present_user_url(&url, false /* need code */)
.await;
let auth_code = server.wait_for_auth_code().await;
self.exchange_auth_code(auth_code, Some(bound_port)).await
}
async fn exchange_auth_code(
&self,
authcode: String,
port: Option<u16>,
) -> Result<Token, RequestError> {
let appsec = &self.appsecret;
let redirect_uri = &self.fd.redirect_uri();
let request = Self::request_token(appsec.clone(), authcode, redirect_uri.clone(), port);
let resp = self
.client
.request(request)
.await
.map_err(|e| RequestError::ClientError(e))?;
let body = resp
.into_body()
.try_concat()
.await
.map_err(|e| RequestError::ClientError(e))?;
let tokens: JSONTokenResponse =
serde_json::from_slice(&body).map_err(|e| RequestError::JSONError(e))?;
match tokens {
JSONTokenResponse {
error: Some(err),
error_description,
..
} => Err(RequestError::NegativeServerResponse(err, error_description)),
JSONTokenResponse {
access_token: Some(access_token),
refresh_token,
token_type: Some(token_type),
expires_in,
..
} => {
let mut token = Token {
access_token,
refresh_token,
token_type,
expires_in,
expires_in_timestamp: None,
};
token.set_expiry_absolute();
Ok(token)
}
JSONTokenResponse {
error_description, ..
} => Err(RequestError::NegativeServerResponse(
"".to_owned(),
error_description,
)),
}
}
@@ -362,124 +343,86 @@ struct JSONTokenResponse {
error_description: Option<String>,
}
fn spawn_with_handle<F>(f: F) -> impl Future<Output = ()>
where
F: Future<Output = ()> + 'static + Send,
{
let (tx, rx) = oneshot::channel();
tokio::spawn(f.map(move |_| tx.send(()).unwrap()));
async {
let _ = rx.await;
}
}
struct InstalledFlowServer {
port: u16,
shutdown_tx: Option<oneshot::Sender<()>>,
auth_code_rx: Option<oneshot::Receiver<String>>,
threadpool: Option<tokio_threadpool::ThreadPool>,
addr: SocketAddr,
auth_code_rx: oneshot::Receiver<String>,
trigger_shutdown_tx: oneshot::Sender<()>,
shutdown_complete: Pin<Box<dyn Future<Output = ()> + Send>>,
}
impl InstalledFlowServer {
fn new(port: u16) -> Result<InstalledFlowServer, hyper::error::Error> {
fn run(desired_port: u16) -> Result<Self, RequestError> {
use hyper::service::{make_service_fn, service_fn};
let (auth_code_tx, auth_code_rx) = oneshot::channel::<String>();
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let (trigger_shutdown_tx, trigger_shutdown_rx) = oneshot::channel::<()>();
let auth_code_tx = Arc::new(Mutex::new(Some(auth_code_tx)));
let threadpool = tokio_threadpool::Builder::new()
.pool_size(1)
.name_prefix("InstalledFlowServer-")
.build();
let service_maker = InstalledFlowServiceMaker::new(auth_code_tx);
let addr: std::net::SocketAddr = ([127, 0, 0, 1], port).into();
let builder = hyper::server::Server::try_bind(&addr)?;
let server = builder.http1_only(true).serve(service_maker);
let port = server.local_addr().port();
let server_future = server
.with_graceful_shutdown(shutdown_rx)
.map_err(|err| panic!("Failed badly: {}", err));
threadpool.spawn(server_future);
Result::Ok(InstalledFlowServer {
port: port,
shutdown_tx: Some(shutdown_tx),
auth_code_rx: Some(auth_code_rx),
threadpool: Some(threadpool),
let service = make_service_fn(move |_| {
let auth_code_tx = auth_code_tx.clone();
async move {
use std::convert::Infallible;
Ok::<_, Infallible>(service_fn(move |req| {
installed_flow_server::handle_req(req, auth_code_tx.clone())
}))
}
});
let addr: std::net::SocketAddr = ([127, 0, 0, 1], desired_port).into();
let server = hyper::server::Server::try_bind(&addr)?;
let server = server.http1_only(true).serve(service);
let addr = server.local_addr();
let shutdown_complete = spawn_with_handle(async {
let _ = server
.with_graceful_shutdown(async move {
let _ = trigger_shutdown_rx.await;
})
.await;
});
Ok(InstalledFlowServer {
addr,
auth_code_rx,
trigger_shutdown_tx,
shutdown_complete: Box::pin(shutdown_complete),
})
}
fn block_till_auth(&mut self) -> Result<String, oneshot::Canceled> {
match self.auth_code_rx.take() {
Some(auth_code_rx) => auth_code_rx.wait(),
None => Result::Err(oneshot::Canceled),
fn local_addr(&self) -> SocketAddr {
self.addr
}
async fn wait_for_auth_code(self) -> String {
// Wait for the auth code from the server.
let auth_code = self
.auth_code_rx
.await
.expect("server shutdown while waiting for auth_code");
// auth code received. shutdown the server
let _ = self.trigger_shutdown_tx.send(());
self.shutdown_complete.await;
auth_code
}
}
impl std::ops::Drop for InstalledFlowServer {
fn drop(&mut self) {
self.shutdown_tx.take().map(|tx| tx.send(()));
self.auth_code_rx.take().map(|mut rx| rx.close());
self.threadpool.take();
}
}
mod installed_flow_server {
use hyper::{Body, Request, Response, StatusCode, Uri};
use std::sync::{Arc, Mutex};
use tokio::sync::oneshot;
use url::form_urlencoded;
pub struct InstalledFlowHandlerResponseFuture {
inner: Box<
dyn futures::Future<Item = hyper::Response<hyper::Body>, Error = hyper::http::Error> + Send,
>,
}
impl InstalledFlowHandlerResponseFuture {
fn new(
fut: Box<
dyn futures::Future<Item = hyper::Response<hyper::Body>, Error = hyper::http::Error>
+ Send,
>,
) -> Self {
Self { inner: fut }
}
}
impl futures::Future for InstalledFlowHandlerResponseFuture {
type Item = hyper::Response<hyper::Body>;
type Error = hyper::http::Error;
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
self.inner.poll()
}
}
/// Creates InstalledFlowService on demand
struct InstalledFlowServiceMaker {
pub(super) async fn handle_req(
req: Request<Body>,
auth_code_tx: Arc<Mutex<Option<oneshot::Sender<String>>>>,
}
impl InstalledFlowServiceMaker {
fn new(auth_code_tx: oneshot::Sender<String>) -> InstalledFlowServiceMaker {
let auth_code_tx = Arc::new(Mutex::new(Option::Some(auth_code_tx)));
InstalledFlowServiceMaker { auth_code_tx }
}
}
impl<Ctx> hyper::service::MakeService<Ctx> for InstalledFlowServiceMaker {
type ReqBody = hyper::Body;
type ResBody = hyper::Body;
type Error = hyper::http::Error;
type Service = InstalledFlowService;
type Future = futures::future::FutureResult<Self::Service, Self::Error>;
type MakeError = hyper::http::Error;
fn make_service(&mut self, _ctx: Ctx) -> Self::Future {
let service = InstalledFlowService {
auth_code_tx: self.auth_code_tx.clone(),
};
futures::future::ok(service)
}
}
/// HTTP service handling the redirect from the provider.
struct InstalledFlowService {
auth_code_tx: Arc<Mutex<Option<oneshot::Sender<String>>>>,
}
impl hyper::service::Service for InstalledFlowService {
type ReqBody = hyper::Body;
type ResBody = hyper::Body;
type Error = hyper::http::Error;
type Future = InstalledFlowHandlerResponseFuture;
fn call(&mut self, req: hyper::Request<Self::ReqBody>) -> Self::Future {
) -> Result<Response<Body>, http::Error> {
match req.uri().path_and_query() {
Some(path_and_query) => {
// We use a fake URL because the redirect goes to a URL, meaning we
@@ -491,77 +434,44 @@ impl hyper::service::Service for InstalledFlowService {
.path_and_query(path_and_query.clone())
.build();
if url.is_err() {
let response = hyper::Response::builder()
match url {
Err(_) => hyper::Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(hyper::Body::from("Unparseable URL"));
match response {
Ok(response) => InstalledFlowHandlerResponseFuture::new(Box::new(
futures::future::ok(response),
)),
Err(err) => InstalledFlowHandlerResponseFuture::new(Box::new(
futures::future::err(err),
)),
.body(hyper::Body::from("Unparseable URL")),
Ok(url) => match auth_code_from_url(url) {
Some(auth_code) => {
if let Some(sender) = auth_code_tx.lock().unwrap().take() {
let _ = sender.send(auth_code);
}
} else {
self.handle_url(url.unwrap());
let response =
hyper::Response::builder()
.status(StatusCode::OK)
.body(hyper::Body::from(
hyper::Response::builder().status(StatusCode::OK).body(
hyper::Body::from(
"<html><head><title>Success</title></head><body>You may now \
close this window.</body></html>",
));
match response {
Ok(response) => InstalledFlowHandlerResponseFuture::new(Box::new(
futures::future::ok(response),
)),
Err(err) => InstalledFlowHandlerResponseFuture::new(Box::new(
futures::future::err(err),
)),
),
)
}
}
}
None => {
let response = hyper::Response::builder()
None => hyper::Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(hyper::Body::from("Invalid Request!"));
.body(hyper::Body::from("No `code` in URL")),
},
}
}
None => hyper::Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(hyper::Body::from("Invalid Request!")),
}
}
match response {
Ok(response) => InstalledFlowHandlerResponseFuture::new(Box::new(
futures::future::ok(response),
)),
Err(err) => {
InstalledFlowHandlerResponseFuture::new(Box::new(futures::future::err(err)))
}
}
}
}
}
}
impl InstalledFlowService {
fn handle_url(&mut self, url: hyper::Uri) {
fn auth_code_from_url(url: hyper::Uri) -> Option<String> {
// The provider redirects to the specified localhost URL, appending the authorization
// code, like this: http://localhost:8080/xyz/?code=4/731fJ3BheyCouCniPufAd280GHNV5Ju35yYcGs
// We take that code and send it to the ask_authorization_code() function that
// waits for it.
for (param, val) in form_urlencoded::parse(url.query().unwrap_or("").as_bytes()) {
if param == "code".to_string() {
let mut auth_code_tx = self.auth_code_tx.lock().unwrap();
match auth_code_tx.take() {
Some(auth_code_tx) => {
let _ = auth_code_tx.send(val.to_owned().to_string());
}
None => {
// call to the server after a previous call. Each server is only designed
// to receive a single request.
}
};
}
form_urlencoded::parse(url.query().unwrap_or("").as_bytes()).find_map(|(param, val)| {
if param == "code" {
Some(val.into_owned())
} else {
None
}
})
}
}
@@ -571,7 +481,7 @@ mod tests {
use std::fmt;
use std::str::FromStr;
use hyper;
use hyper::Uri;
use hyper::client::connect::HttpConnector;
use hyper_rustls::HttpsConnector;
use mockito::{self, mock};
@@ -593,14 +503,16 @@ mod tests {
impl FlowDelegate for FD {
/// Depending on need_code, return the pre-set code or send the code to the server at
/// the redirect_uri given in the url.
fn present_user_url<S: AsRef<str> + fmt::Display>(
&mut self,
fn present_user_url<'a, S: AsRef<str> + fmt::Display + Send + Sync + 'a>(
&'a self,
url: S,
need_code: bool,
) -> Box<dyn Future<Item = Option<String>, Error = Box<dyn Error + Send>> + Send>
{
) -> Pin<
Box<dyn Future<Output = Result<String, Box<dyn Error + Send + Sync>>> + Send + 'a>,
> {
Box::pin(async move {
if need_code {
Box::new(Ok(Some(self.0.clone())).into_future())
Ok(self.0.clone())
} else {
// Parse presented url to obtain redirect_uri with location of local
// code-accepting server.
@@ -615,23 +527,20 @@ mod tests {
}
}
if rduri.is_none() {
return Box::new(
Err(Box::new(StringError::new("no redirect uri!", None))
as Box<dyn Error + Send>)
.into_future(),
);
return Err(Box::new(StringError::new("no redirect uri!", None))
as Box<dyn Error + Send + Sync>);
}
let mut rduri = rduri.unwrap();
rduri.push_str(&format!("?code={}", self.0));
let rduri = Uri::from_str(rduri.as_ref()).unwrap();
// Hit server.
return Box::new(
self.1
.get(rduri)
.map_err(|e| Box::new(e) as Box<dyn Error + Send>)
.map(|_| None),
);
.await
.map_err(|e| Box::new(e) as Box<dyn Error + Send + Sync>)
.map(|_| "".to_string())
}
})
}
}
@@ -640,18 +549,18 @@ mod tests {
let mut app_secret = parse_application_secret(app_secret).unwrap();
app_secret.token_uri = format!("{}/token", server_url);
let https = HttpsConnector::new(1);
let https = HttpsConnector::new();
let client = hyper::Client::builder()
.keep_alive(false)
.build::<_, hyper::Body>(https);
let fd = FD("authorizationcode".to_string(), client.clone());
let mut inf =
let inf =
InstalledFlow::new(app_secret.clone(), InstalledFlowReturnMethod::Interactive)
.delegate(fd)
.build_token_getter(client.clone());
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(1)
.panic_handler(|e| std::panic::resume_unwind(e))
.build()
@@ -665,20 +574,24 @@ mod tests {
.expect(1)
.create();
let fut = inf
let fut = || {
async {
let tok = inf
.token(vec!["https://googleapis.com/some/scope"])
.and_then(|tok| {
.await
.map_err(|_| ())?;
assert_eq!("accesstoken", tok.access_token);
assert_eq!("refreshtoken", tok.refresh_token.unwrap());
assert_eq!("Bearer", tok.token_type);
Ok(())
});
rt.block_on(fut).expect("block on");
Ok(()) as Result<(), ()>
}
};
rt.block_on(fut()).expect("block on");
_m.assert();
}
// Successful path with HTTP redirect.
{
let mut inf =
let inf =
InstalledFlow::new(app_secret, InstalledFlowReturnMethod::HTTPRedirect(8081))
.delegate(FD(
"authorizationcodefromlocalserver".to_string(),
@@ -691,14 +604,16 @@ mod tests {
.expect(1)
.create();
let fut = inf
let fut = async {
let tok = inf
.token(vec!["https://googleapis.com/some/scope"])
.and_then(|tok| {
.await
.map_err(|_| ())?;
assert_eq!("accesstoken", tok.access_token);
assert_eq!("refreshtoken", tok.refresh_token.unwrap());
assert_eq!("Bearer", tok.token_type);
Ok(())
});
Ok(()) as Result<(), ()>
};
rt.block_on(fut).expect("block on");
_m.assert();
}
@@ -713,17 +628,16 @@ mod tests {
.expect(1)
.create();
let fut = inf
.token(vec!["https://googleapis.com/some/scope"])
.then(|tokr| {
let fut = async {
let tokr = inf.token(vec!["https://googleapis.com/some/scope"]).await;
assert!(tokr.is_err());
assert!(format!("{}", tokr.unwrap_err()).contains("invalid_code"));
Ok(()) as Result<(), ()>
});
};
rt.block_on(fut).expect("block on");
_m.assert();
}
rt.shutdown_on_idle().wait().expect("shutdown");
rt.shutdown_on_idle();
}
#[test]
@@ -743,41 +657,53 @@ mod tests {
);
}
#[test]
fn test_server_random_local_port() {
let addr1 = InstalledFlowServer::new(0).unwrap();
let addr2 = InstalledFlowServer::new(0).unwrap();
assert_ne!(addr1.port, addr2.port);
#[tokio::test]
async fn test_server_random_local_port() {
let addr1 = InstalledFlowServer::run(0).unwrap().local_addr();
let addr2 = InstalledFlowServer::run(0).unwrap().local_addr();
assert_ne!(addr1.port(), addr2.port());
}
#[test]
fn test_http_handle_url() {
#[tokio::test]
async fn test_http_handle_url() {
let (tx, rx) = oneshot::channel();
let mut handler = InstalledFlowService {
auth_code_tx: Arc::new(Mutex::new(Option::Some(tx))),
};
// URLs are usually a bit botched
let url: Uri = "http://example.com:1234/?code=ab/c%2Fd#".parse().unwrap();
handler.handle_url(url);
assert_eq!(rx.wait().unwrap(), "ab/c/d".to_string());
let req = hyper::Request::get(url)
.body(hyper::body::Body::empty())
.unwrap();
installed_flow_server::handle_req(req, Arc::new(Mutex::new(Some(tx))))
.await
.unwrap();
assert_eq!(rx.await.unwrap().as_str(), "ab/c/d");
}
#[test]
fn test_server() {
let runtime = tokio::runtime::Runtime::new().unwrap();
#[tokio::test]
async fn test_server() {
let client: hyper::Client<hyper::client::HttpConnector, hyper::Body> =
hyper::Client::builder()
.executor(runtime.executor())
.build_http();
let mut server = InstalledFlowServer::new(0).unwrap();
hyper::Client::builder().build_http();
let server = InstalledFlowServer::run(0).unwrap();
let response = client
.get(format!("http://{}/", server.local_addr()).parse().unwrap())
.await;
match response {
Result::Ok(_response) => {
// TODO: Do we really want this to assert success?
//assert!(response.status().is_success());
}
Result::Err(err) => {
assert!(false, "Failed to request from local server: {:?}", err);
}
}
let response = client
.get(
format!("http://127.0.0.1:{}/", server.port)
format!("http://{}/?code=ab/c%2Fd#", server.local_addr())
.parse()
.unwrap(),
)
.wait();
.await;
match response {
Result::Ok(response) => {
assert!(response.status().is_success());
@@ -787,29 +713,6 @@ mod tests {
}
}
let response = client
.get(
format!("http://127.0.0.1:{}/?code=ab/c%2Fd#", server.port)
.parse()
.unwrap(),
)
.wait();
match response {
Result::Ok(response) => {
assert!(response.status().is_success());
}
Result::Err(err) => {
assert!(false, "Failed to request from local server: {:?}", err);
}
}
match server.block_till_auth() {
Result::Ok(response) => {
assert_eq!(response, "ab/c/d".to_string());
}
Result::Err(err) => {
assert!(false, "Server failed to pass on the message: {:?}", err);
}
}
assert_eq!(server.wait_for_auth_code().await.as_str(), "ab/c/d");
}
}

View File

@@ -47,7 +47,8 @@
//!
//! use std::path::Path;
//!
//! fn main() {
//! #[tokio::main]
//! async fn main() {
//! // Read application secret from a file. Sometimes it's easier to compile it directly into
//! // the binary. The clientsecret file contains JSON like `{"installed":{"client_id": ... }}`
//! let secret = yup_oauth2::read_application_secret(Path::new("clientsecret.json"))
@@ -69,14 +70,10 @@
//!
//! // token(<scopes>) is the one important function of this crate; it does everything to
//! // obtain a token that can be sent e.g. as Bearer token.
//! let tok = auth.token(scopes);
//! // Finally we print the token.
//! let fut = tok.map_err(|e| println!("error: {:?}", e)).and_then(|t| {
//! println!("The token is {:?}", t);
//! Ok(())
//! });
//!
//! tokio::run(fut)
//! match auth.token(scopes).await {
//! Ok(token) => println!("The token is {:?}", token),
//! Err(e) => println!("error: {:?}", e),
//! }
//! }
//! ```
//!

View File

@@ -2,8 +2,7 @@ use crate::types::{ApplicationSecret, JsonError, RefreshResult, RequestError};
use super::Token;
use chrono::Utc;
use futures::stream::Stream;
use futures::Future;
use futures_util::try_stream::TryStreamExt;
use hyper;
use hyper::header;
use serde_json as json;
@@ -31,11 +30,12 @@ impl RefreshFlow {
///
/// # Examples
/// Please see the crate landing page for an example.
pub fn refresh_token<'a, C: 'static + hyper::client::connect::Connect>(
pub async fn refresh_token<C: 'static + hyper::client::connect::Connect>(
client: hyper::Client<C>,
client_secret: ApplicationSecret,
refresh_token: String,
) -> impl 'a + Future<Item = RefreshResult, Error = RequestError> {
) -> Result<RefreshResult, RequestError> {
// TODO: Does this function ever return RequestError? Maybe have it just return RefreshResult.
let req = form_urlencoded::Serializer::new(String::new())
.extend_pairs(&[
("client_id", client_secret.client_id.clone()),
@@ -50,44 +50,35 @@ impl RefreshFlow {
.body(hyper::Body::from(req))
.unwrap(); // TODO: error handling
client
.request(request)
.then(|r| {
match r {
Err(err) => return Err(RefreshResult::Error(err)),
Ok(res) => {
Ok(res
.into_body()
.concat2()
.wait()
.map(|c| String::from_utf8(c.into_bytes().to_vec()).unwrap())
.unwrap()) // TODO: error handling
let resp = match client.request(request).await {
Ok(resp) => resp,
Err(err) => return Ok(RefreshResult::Error(err)),
};
let body = match resp.into_body().try_concat().await {
Ok(body) => body,
Err(err) => return Ok(RefreshResult::Error(err)),
};
if let Ok(json_err) = json::from_slice::<JsonError>(&body) {
return Ok(RefreshResult::RefreshError(
json_err.error,
json_err.error_description,
));
}
}
})
.then(move |maybe_json_str: Result<String, RefreshResult>| {
if let Err(e) = maybe_json_str {
return Ok(e);
}
let json_str = maybe_json_str.unwrap();
#[derive(Deserialize)]
struct JsonToken {
access_token: String,
token_type: String,
expires_in: i64,
}
match json::from_str::<JsonError>(&json_str) {
Err(_) => {}
Ok(res) => {
let t: JsonToken = match json::from_slice(&body) {
Err(_) => {
return Ok(RefreshResult::RefreshError(
res.error,
res.error_description,
"failed to deserialized json token from refresh response".to_owned(),
None,
))
}
}
let t: JsonToken = json::from_str(&json_str).unwrap();
Ok(token) => token,
};
Ok(RefreshResult::Success(Token {
access_token: t.access_token,
token_type: t.token_type,
@@ -95,8 +86,6 @@ impl RefreshFlow {
expires_in: None,
expires_in_timestamp: Some(Utc::now().timestamp() + t.expires_in),
}))
})
.map_err(RequestError::Refresh)
}
}
@@ -119,12 +108,12 @@ mod tests {
app_secret.token_uri = format!("{}/token", server_url);
let refresh_token = "my-refresh-token".to_string();
let https = HttpsConnector::new(1);
let https = HttpsConnector::new();
let client = hyper::Client::builder()
.keep_alive(false)
.build::<_, hyper::Body>(https);
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(1)
.panic_handler(|e| std::panic::resume_unwind(e))
.build()
@@ -138,13 +127,14 @@ mod tests {
.with_status(200)
.with_body(r#"{"access_token": "new-access-token", "token_type": "Bearer", "expires_in": 1234567}"#)
.create();
let fut = RefreshFlow::refresh_token(
let fut = async {
let rr = RefreshFlow::refresh_token(
client.clone(),
app_secret.clone(),
refresh_token.clone(),
)
.then(|rr| {
let rr = rr.unwrap();
.await
.unwrap();
match rr {
RefreshResult::Success(tok) => {
assert_eq!("new-access-token", tok.access_token);
@@ -153,7 +143,7 @@ mod tests {
_ => panic!(format!("unexpected RefreshResult {:?}", rr)),
}
Ok(()) as Result<(), ()>
});
};
rt.block_on(fut).expect("block_on");
_m.assert();
@@ -167,18 +157,20 @@ mod tests {
.with_body(r#"{"error": "invalid_token"}"#)
.create();
let fut = RefreshFlow::refresh_token(client, app_secret, refresh_token).then(|rr| {
let rr = rr.unwrap();
let fut = async {
let rr = RefreshFlow::refresh_token(client, app_secret, refresh_token)
.await
.unwrap();
match rr {
RefreshResult::RefreshError(e, None) => {
assert_eq!(e, "invalid_token");
}
_ => panic!(format!("unexpected RefreshResult {:?}", rr)),
}
Ok(())
});
Ok(()) as Result<(), ()>
};
tokio::run(fut);
rt.block_on(fut).expect("block_on");
_m.assert();
}
}

View File

@@ -12,14 +12,14 @@
//!
use std::default::Default;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use crate::authenticator::{DefaultHyperClient, HyperClientBuilder};
use crate::storage::{hash_scopes, MemoryStorage, TokenStorage};
use crate::types::{ApplicationSecret, GetToken, JsonError, RequestError, StringError, Token};
use crate::types::{ApplicationSecret, GetToken, JsonError, RequestError, Token};
use futures::stream::Stream;
use futures::{future, prelude::*};
use futures::prelude::*;
use hyper::header;
use url::form_urlencoded;
@@ -266,83 +266,95 @@ struct TokenResponse {
expires_in: Option<i64>,
}
impl TokenResponse {
fn to_oauth_token(self) -> Token {
let expires_ts = chrono::Utc::now().timestamp() + self.expires_in.unwrap_or(0);
Token {
access_token: self.access_token.unwrap(),
token_type: self.token_type.unwrap(),
refresh_token: Some(String::new()),
expires_in: self.expires_in,
expires_in_timestamp: Some(expires_ts),
}
}
}
impl<'a, C: 'static + hyper::client::connect::Connect> ServiceAccountAccessImpl<C> {
/// Send a request for a new Bearer token to the OAuth provider.
fn request_token(
async fn request_token(
client: hyper::client::Client<C>,
sub: Option<String>,
key: ServiceAccountKey,
scopes: Vec<String>,
) -> impl Future<Item = Token, Error = RequestError> {
) -> Result<Token, RequestError> {
let mut claims = init_claims_from_key(&key, &scopes);
claims.sub = sub.clone();
let signed = JWT::new(claims)
.sign(key.private_key.as_ref().unwrap())
.into_future();
signed
.map_err(RequestError::LowLevelError)
.map(|signed| {
form_urlencoded::Serializer::new(String::new())
.map_err(RequestError::LowLevelError)?;
let rqbody = form_urlencoded::Serializer::new(String::new())
.extend_pairs(vec![
("grant_type".to_string(), GRANT_TYPE.to_string()),
("assertion".to_string(), signed),
])
.finish()
})
.map(|rqbody| {
hyper::Request::post(key.token_uri.unwrap())
.finish();
let request = hyper::Request::post(key.token_uri.unwrap())
.header(header::CONTENT_TYPE, "application/x-www-form-urlencoded")
.body(hyper::Body::from(rqbody))
.unwrap()
})
.and_then(move |request| client.request(request).map_err(RequestError::ClientError))
.and_then(|response| {
response
.unwrap();
let response = client
.request(request)
.await
.map_err(RequestError::ClientError)?;
let body = response
.into_body()
.concat2()
.map_err(RequestError::ClientError)
})
.map(|c| String::from_utf8(c.into_bytes().to_vec()).unwrap())
.and_then(|s| {
if let Ok(jse) = serde_json::from_str::<JsonError>(&s) {
Err(RequestError::NegativeServerResponse(
.try_concat()
.await
.map_err(RequestError::ClientError)?;
if let Ok(jse) = serde_json::from_slice::<JsonError>(&body) {
return Err(RequestError::NegativeServerResponse(
jse.error,
jse.error_description,
))
} else {
serde_json::from_str(&s).map_err(RequestError::JSONError)
));
}
})
.then(|token: Result<TokenResponse, RequestError>| match token {
Err(e) => return Err(e),
Ok(token) => {
if token.access_token.is_none()
|| token.token_type.is_none()
|| token.expires_in.is_none()
{
Err(RequestError::BadServerResponse(format!(
let token: TokenResponse =
serde_json::from_slice(&body).map_err(RequestError::JSONError)?;
let token = match token {
TokenResponse {
access_token: Some(access_token),
token_type: Some(token_type),
expires_in: Some(expires_in),
..
} => {
let expires_ts = chrono::Utc::now().timestamp() + expires_in;
Token {
access_token,
token_type,
refresh_token: None,
expires_in: Some(expires_in),
expires_in_timestamp: Some(expires_ts),
}
}
_ => {
return Err(RequestError::BadServerResponse(format!(
"Token response lacks fields: {:?}",
token
)))
} else {
Ok(token.to_oauth_token())
}
};
Ok(token)
}
})
async fn get_token(&self, hash: u64, scopes: Vec<String>) -> Result<Token, RequestError> {
let cache = self.cache.clone();
match cache
.lock()
.unwrap()
.get(hash, &scopes.iter().map(|s| s.as_str()).collect())
{
Ok(Some(token)) if !token.expired() => return Ok(token),
_ => {}
}
let token = Self::request_token(
self.client.clone(),
self.sub.clone(),
self.key.clone(),
scopes.iter().map(|s| s.to_string()).collect(),
)
.await?;
let _ = cache.lock().unwrap().set(
hash,
&scopes.iter().map(|s| s.as_str()).collect(),
Some(token.clone()),
);
Ok(token)
}
}
@@ -350,61 +362,16 @@ impl<C: 'static> GetToken for ServiceAccountAccessImpl<C>
where
C: hyper::client::connect::Connect,
{
fn token<I, T>(
&mut self,
fn token<'a, I, T>(
&'a self,
scopes: I,
) -> Box<dyn Future<Item = Token, Error = RequestError> + Send>
) -> Pin<Box<dyn Future<Output = Result<Token, RequestError>> + Send + 'a>>
where
T: Into<String>,
I: IntoIterator<Item = T>,
{
let (hash, scps0) = hash_scopes(scopes);
let cache = self.cache.clone();
let scps = scps0.clone();
let cache_lookup = futures::lazy(move || {
match cache
.lock()
.unwrap()
.get(hash, &scps.iter().map(|s| s.as_str()).collect())
{
Ok(Some(token)) => {
if !token.expired() {
return Ok(token);
}
return Err(StringError::new("expired token in cache", None));
}
Err(e) => return Err(StringError::new(format!("cache lookup error: {}", e), None)),
Ok(None) => return Err(StringError::new("no token in cache", None)),
}
});
let cache = self.cache.clone();
let req_token = Self::request_token(
self.client.clone(),
self.sub.clone(),
self.key.clone(),
scps0.iter().map(|s| s.to_string()).collect(),
)
.then(move |r| match r {
Ok(token) => {
let _ = cache.lock().unwrap().set(
hash,
&scps0.iter().map(|s| s.as_str()).collect(),
Some(token.clone()),
);
Box::new(future::ok(token))
}
Err(e) => Box::new(future::err(e)),
});
Box::new(cache_lookup.then(|r| match r {
Ok(t) => Box::new(Ok(t).into_future())
as Box<dyn Future<Item = Token, Error = RequestError> + Send>,
Err(_) => {
Box::new(req_token) as Box<dyn Future<Item = Token, Error = RequestError> + Send>
}
}))
Box::pin(self.get_token(hash, scps0))
}
/// Returns an empty ApplicationSecret as tokens for service accounts don't need to be
@@ -413,7 +380,7 @@ where
Default::default()
}
fn api_key(&mut self) -> Option<String> {
fn api_key(&self) -> Option<String> {
None
}
}
@@ -458,11 +425,11 @@ mod tests {
"token_type": "Bearer"
}"#;
let https = HttpsConnector::new(1);
let https = HttpsConnector::new();
let client = hyper::Client::builder()
.keep_alive(false)
.build::<_, hyper::Body>(https);
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(1)
.panic_handler(|e| std::panic::resume_unwind(e))
.build()
@@ -476,14 +443,15 @@ mod tests {
.with_body(json_response)
.expect(1)
.create();
let mut acc = ServiceAccountAccessImpl::new(client.clone(), key.clone(), None);
let fut = acc
let acc = ServiceAccountAccessImpl::new(client.clone(), key.clone(), None);
let fut = async {
let tok = acc
.token(vec!["https://www.googleapis.com/auth/pubsub"])
.and_then(|tok| {
.await?;
assert!(tok.access_token.contains("ya29.c.ElouBywiys0Ly"));
assert_eq!(Some(3600), tok.expires_in);
Ok(())
});
Ok(()) as Result<(), RequestError>
};
rt.block_on(fut).expect("block_on");
assert!(acc
@@ -497,13 +465,14 @@ mod tests {
.unwrap()
.is_some());
// Test that token is in cache (otherwise mock will tell us)
let fut = acc
let fut = async {
let tok = acc
.token(vec!["https://www.googleapis.com/auth/pubsub"])
.and_then(|tok| {
.await?;
assert!(tok.access_token.contains("ya29.c.ElouBywiys0Ly"));
assert_eq!(Some(3600), tok.expires_in);
Ok(())
});
Ok(()) as Result<(), RequestError>
};
rt.block_on(fut).expect("block_on 2");
_m.assert();
@@ -515,19 +484,20 @@ mod tests {
.with_header("content-type", "text/json")
.with_body(bad_json_response)
.create();
let mut acc = ServiceAccountAccess::new(key.clone())
let acc = ServiceAccountAccess::new(key.clone())
.hyper_client(client.clone())
.build();
let fut = acc
let fut = async {
let result = acc
.token(vec!["https://www.googleapis.com/auth/pubsub"])
.then(|result| {
.await;
assert!(result.is_err());
Ok(()) as Result<(), ()>
});
};
rt.block_on(fut).expect("block_on");
_m.assert();
}
rt.shutdown_on_idle().wait().expect("shutdown");
rt.shutdown_on_idle();
}
// Valid but deactivated key.
@@ -538,17 +508,21 @@ mod tests {
#[allow(dead_code)]
fn test_service_account_e2e() {
let key = service_account_key_from_file(&TEST_PRIVATE_KEY_PATH.to_string()).unwrap();
let https = HttpsConnector::new(4);
let runtime = tokio::runtime::Runtime::new().unwrap();
let client = hyper::Client::builder()
.executor(runtime.executor())
.build(https);
let mut acc = ServiceAccountAccess::new(key).hyper_client(client).build();
let https = HttpsConnector::new();
let client = hyper::Client::builder().build(https);
let acc = ServiceAccountAccess::new(key).hyper_client(client).build();
let rt = tokio::runtime::Builder::new()
.core_threads(1)
.panic_handler(|e| std::panic::resume_unwind(e))
.build()
.unwrap();
rt.block_on(async {
println!(
"{:?}",
acc.token(vec!["https://www.googleapis.com/auth/pubsub"])
.wait()
.await
);
});
}
#[test]

View File

@@ -10,7 +10,9 @@ use std::fmt;
use std::fs;
use std::hash::{Hash, Hasher};
use std::io;
use std::io::{Read, Write};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use crate::types::Token;
use itertools::Itertools;
@@ -20,13 +22,13 @@ use itertools::Itertools;
/// should be stored or retrieved.
/// For completeness, the underlying, sorted scopes are provided as well. They might be
/// useful for presentation to the user.
pub trait TokenStorage {
pub trait TokenStorage: Send + Sync {
type Error: 'static + Error + Send + Sync;
/// If `token` is None, it is invalid or revoked and should be removed from storage.
/// Otherwise, it should be saved.
fn set(
&mut self,
&self,
scope_hash: u64,
scopes: &Vec<&str>,
token: Option<Token>,
@@ -69,7 +71,7 @@ impl fmt::Display for NullError {
impl TokenStorage for NullStorage {
type Error = NullError;
fn set(&mut self, _: u64, _: &Vec<&str>, _: Option<Token>) -> Result<(), NullError> {
fn set(&self, _: u64, _: &Vec<&str>, _: Option<Token>) -> Result<(), NullError> {
Ok(())
}
fn get(&self, _: u64, _: &Vec<&str>) -> Result<Option<Token>, NullError> {
@@ -80,7 +82,7 @@ impl TokenStorage for NullStorage {
/// A storage that remembers values for one session only.
#[derive(Debug, Default)]
pub struct MemoryStorage {
tokens: Vec<JSONToken>,
tokens: Mutex<Vec<JSONToken>>,
}
impl MemoryStorage {
@@ -93,19 +95,20 @@ impl TokenStorage for MemoryStorage {
type Error = NullError;
fn set(
&mut self,
&self,
scope_hash: u64,
scopes: &Vec<&str>,
token: Option<Token>,
) -> Result<(), NullError> {
let matched = self.tokens.iter().find_position(|x| x.hash == scope_hash);
if let Some(_) = matched {
let mut tokens = self.tokens.lock().expect("poisoned mutex");
let matched = tokens.iter().find_position(|x| x.hash == scope_hash);
if let Some((idx, _)) = matched {
self.tokens.retain(|x| x.hash != scope_hash);
}
match token {
Some(t) => {
self.tokens.push(JSONToken {
tokens.push(JSONToken {
hash: scope_hash,
scopes: Some(scopes.iter().map(|x| x.to_string()).collect()),
token: t.clone(),
@@ -120,7 +123,8 @@ impl TokenStorage for MemoryStorage {
fn get(&self, scope_hash: u64, scopes: &Vec<&str>) -> Result<Option<Token>, NullError> {
let scopes: Vec<_> = scopes.iter().sorted().unique().collect();
for t in &self.tokens {
let tokens = self.tokens.lock().expect("poisoned mutex");
for t in tokens.iter() {
if let Some(token_scopes) = &t.scopes {
let matched = token_scopes
.iter()
@@ -174,59 +178,33 @@ struct JSONTokens {
/// Serializes tokens to a JSON file on disk.
#[derive(Default)]
pub struct DiskTokenStorage {
location: String,
tokens: Vec<JSONToken>,
location: PathBuf,
tokens: Mutex<Vec<JSONToken>>,
}
impl DiskTokenStorage {
pub fn new<S: AsRef<str>>(location: S) -> Result<DiskTokenStorage, io::Error> {
let mut dts = DiskTokenStorage {
location: location.as_ref().to_owned(),
tokens: Vec::new(),
pub fn new<S: Into<PathBuf>>(location: S) -> Result<DiskTokenStorage, io::Error> {
let filename = location.into();
let tokens = match load_from_file(&filename) {
Ok(tokens) => tokens,
Err(e) if e.kind() == io::ErrorKind::NotFound => Vec::new(),
Err(e) => return Err(e),
};
// best-effort
let read_result = dts.load_from_file();
match read_result {
Result::Ok(()) => Result::Ok(dts),
Result::Err(e) => {
match e.kind() {
io::ErrorKind::NotFound => Result::Ok(dts), // File not found; ignore and create new one
_ => Result::Err(e), // e.g. PermissionDenied
}
}
}
Ok(DiskTokenStorage {
location: filename,
tokens: Mutex::new(tokens),
})
}
fn load_from_file(&mut self) -> Result<(), io::Error> {
let mut f = fs::OpenOptions::new().read(true).open(&self.location)?;
let mut contents = String::new();
match f.read_to_string(&mut contents) {
Result::Err(e) => return Result::Err(e),
Result::Ok(_sz) => (),
}
let tokens: JSONTokens;
match serde_json::from_str(&contents) {
Result::Err(e) => return Result::Err(io::Error::new(io::ErrorKind::InvalidData, e)),
Result::Ok(t) => tokens = t,
}
for t in tokens.tokens {
self.tokens.push(t);
}
return Result::Ok(());
}
pub fn dump_to_file(&mut self) -> Result<(), io::Error> {
pub fn dump_to_file(&self) -> Result<(), io::Error> {
let mut jsontokens = JSONTokens { tokens: Vec::new() };
for token in self.tokens.iter() {
{
let tokens = self.tokens.lock().expect("mutex poisoned");
for token in tokens.iter() {
jsontokens.tokens.push((*token).clone());
}
}
let serialized;
@@ -235,6 +213,7 @@ impl DiskTokenStorage {
Result::Ok(s) => serialized = s,
}
// TODO: Write to disk asynchronously so that we don't stall the eventloop if invoked in async context.
let mut f = fs::OpenOptions::new()
.create(true)
.write(true)
@@ -244,23 +223,32 @@ impl DiskTokenStorage {
}
}
fn load_from_file(filename: &Path) -> Result<Vec<JSONToken>, io::Error> {
let contents = std::fs::read_to_string(filename)?;
let container: JSONTokens = serde_json::from_str(&contents)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
Ok(container.tokens)
}
impl TokenStorage for DiskTokenStorage {
type Error = io::Error;
fn set(
&mut self,
&self,
scope_hash: u64,
scopes: &Vec<&str>,
token: Option<Token>,
) -> Result<(), Self::Error> {
let matched = self.tokens.iter().find_position(|x| x.hash == scope_hash);
if let Some(_) = matched {
{
let mut tokens = self.tokens.lock().expect("poisoned mutex");
let matched = tokens.iter().find_position(|x| x.hash == scope_hash);
if let Some((idx, _)) = matched {
self.tokens.retain(|x| x.hash != scope_hash);
}
match token {
None => (),
Some(t) => {
self.tokens.push(JSONToken {
tokens.push(JSONToken {
hash: scope_hash,
scopes: Some(scopes.iter().map(|x| x.to_string()).collect()),
token: t.clone(),
@@ -268,12 +256,14 @@ impl TokenStorage for DiskTokenStorage {
()
}
}
}
self.dump_to_file()
}
fn get(&self, scope_hash: u64, scopes: &Vec<&str>) -> Result<Option<Token>, Self::Error> {
let scopes: Vec<_> = scopes.iter().sorted().unique().collect();
for t in &self.tokens {
let tokens = self.tokens.lock().expect("poisoned mutex");
for t in tokens.iter() {
if let Some(token_scopes) = &t.scopes {
let matched = token_scopes
.iter()

View File

@@ -3,6 +3,7 @@ use hyper;
use std::error::Error;
use std::fmt;
use std::io;
use std::pin::Pin;
use std::str::FromStr;
use futures::prelude::*;
@@ -239,16 +240,16 @@ impl FromStr for Scheme {
/// A provider for authorization tokens, yielding tokens valid for a given scope.
/// The `api_key()` method is an alternative in case there are no scopes or
/// if no user is involved.
pub trait GetToken {
fn token<I, T>(
&mut self,
pub trait GetToken: Send + Sync {
fn token<'a, I, T>(
&'a self,
scopes: I,
) -> Box<dyn Future<Item = Token, Error = RequestError> + Send>
) -> Pin<Box<dyn Future<Output = Result<Token, RequestError>> + Send + 'a>>
where
T: Into<String>,
I: IntoIterator<Item = T>;
fn api_key(&mut self) -> Option<String>;
fn api_key(&self) -> Option<String>;
/// Return an application secret with at least token_uri, client_secret, and client_id filled
/// in. This is used for refreshing tokens without interaction from the flow.