Merge branch 'futures'

This is #94.
This commit is contained in:
Lewin Bormann
2019-06-22 22:13:26 +02:00
19 changed files with 1691 additions and 1063 deletions

View File

@@ -22,7 +22,6 @@ rust:
stages:
- name: test
- name: examples
- name: lint
- name: coverage
@@ -45,13 +44,6 @@ jobs:
- cargo fmt --all -- --check
- cargo clippy --all-targets --all-features -- -D warnings || true
- stage: examples
if: os = linux
rust: stable
script:
- cargo build --manifest-path examples/drive_example/Cargo.toml
- cargo build --manifest-path examples/service_account/Cargo.toml
- stage: coverage
if: os = linux
sudo: true

View File

@@ -1,7 +1,7 @@
[package]
name = "yup-oauth2"
version = "2.0.2"
version = "3.0.0-alpha"
authors = ["Sebastian Thiel <byronimo@gmail.com>", "Lewin Bormann <lbo@spheniscida.de>"]
repository = "https://github.com/dermesser/yup-oauth2"
description = "An oauth2 implementation, providing the 'device', 'service account' and 'installed' authorization flows"
@@ -13,6 +13,7 @@ edition = "2018"
[dependencies]
base64 = "0.10"
chrono = "0.4"
http = "0.1"
hyper = {version = "0.12", default-features = false}
hyper-tls = "0.3"
itertools = "0.8"
@@ -24,9 +25,15 @@ serde_derive = "1.0"
url = "1"
futures = "0.1"
tokio-threadpool = "0.1"
tokio = "0.1"
tokio-timer = "0.2"
[dev-dependencies]
getopts = "0.2"
open = "1.1"
yup-hyper-mock = "3.14"
tokio = "0.1"
mockito = "0.17"
env_logger = "0.6"
[workspace]
members = ["examples/test-installed/", "examples/test-svc-acct/", "examples/test-device/"]

View File

@@ -8,13 +8,17 @@ Status](https://travis-ci.org/dermesser/yup-oauth2.svg)](https://travis-ci.org/d
(However, you're able to use it with raw HTTP requests as well; the flows are implemented as token
sources yielding HTTP Bearer tokens).
The provider we have been testing the code against is also Google. However, the code itself is
generic, and any OAuth provider behaving like Google will work as well. If you find one that
doesn't, please let us know and/or contribute a fix!
### Supported authorization types
* Device flow (user enters code on authorization page)
* Installed application flow (user visits URL, copies code to application, application uses
code to obtain token). Used for services like GMail, Drive, ...
* Service account flow: Non-interactive for server-to-server communication based on public key
cryptography. Used for services like Cloud Pubsub, Cloud Storage, ...
* Service account flow: Non-interactive authorization of server-to-server communication based on
public key cryptography. Used for services like Cloud Pubsub, Cloud Storage, ...
### Usage

View File

@@ -0,0 +1,12 @@
[package]
name = "test-device"
version = "0.1.0"
authors = ["Lewin Bormann <lewin@lewin-bormann.info>"]
edition = "2018"
[dependencies]
yup-oauth2 = { path = "../../" }
hyper = "0.12"
hyper-tls = "0.3"
futures = "0.1"
tokio = "0.1"

View File

@@ -0,0 +1,36 @@
use futures::prelude::*;
use yup_oauth2::{self, Authenticator, GetToken};
use hyper::client::Client;
use hyper_tls::HttpsConnector;
use std::path;
use std::time::Duration;
use tokio;
fn main() {
let creds = yup_oauth2::read_application_secret(path::Path::new("clientsecret.json"))
.expect("clientsecret");
let https = HttpsConnector::new(1).expect("tls");
let client = Client::builder()
.keep_alive(false)
.build::<_, hyper::Body>(https);
let scopes = &["https://www.googleapis.com/auth/youtube.readonly".to_string()];
let ad = yup_oauth2::DefaultFlowDelegate;
let mut df = yup_oauth2::DeviceFlow::new::<String>(client.clone(), creds, ad, None);
df.set_wait_duration(Duration::from_secs(120));
let mut auth = Authenticator::new_disk(
client,
df,
yup_oauth2::DefaultAuthenticatorDelegate,
"tokenstorage.json",
)
.expect("authenticator");
let mut rt = tokio::runtime::Runtime::new().unwrap();
let fut = auth
.token(scopes.iter())
.and_then(|tok| Ok(println!("{:?}", tok)));
println!("{:?}", rt.block_on(fut));
}

View File

@@ -0,0 +1,12 @@
[package]
name = "test-installed"
version = "0.1.0"
authors = ["Lewin Bormann <lewin@lewin-bormann.info>"]
edition = "2018"
[dependencies]
yup-oauth2 = { path = "../../" }
hyper = "0.12"
hyper-tls = "0.3"
futures = "0.1"
tokio = "0.1"

View File

@@ -0,0 +1,41 @@
use futures::prelude::*;
use yup_oauth2::GetToken;
use yup_oauth2::{Authenticator, InstalledFlow};
use hyper::client::Client;
use hyper_tls::HttpsConnector;
use std::path::Path;
fn main() {
let https = HttpsConnector::new(1).expect("tls");
let client = Client::builder()
.keep_alive(false)
.build::<_, hyper::Body>(https);
let ad = yup_oauth2::DefaultFlowDelegate;
let secret = yup_oauth2::read_application_secret(Path::new("clientsecret.json"))
.expect("clientsecret.json");
let inf = InstalledFlow::new(
client.clone(),
ad,
secret,
yup_oauth2::InstalledFlowReturnMethod::HTTPRedirect(8081),
);
let mut auth = Authenticator::new_disk(
client,
inf,
yup_oauth2::DefaultAuthenticatorDelegate,
"tokencache.json",
)
.unwrap();
let s = "https://www.googleapis.com/auth/drive.file".to_string();
let scopes = vec![s];
let tok = auth.token(scopes.iter());
let fut = tok.map_err(|e| println!("error: {:?}", e)).and_then(|t| {
println!("The token is {:?}", t);
Ok(())
});
tokio::run(fut)
}

View File

@@ -0,0 +1,12 @@
[package]
name = "test-svc-acct"
version = "0.1.0"
authors = ["Lewin Bormann <lewin@lewin-bormann.info>"]
edition = "2018"
[dependencies]
yup-oauth2 = { path = "../../" }
hyper = "0.12"
hyper-tls = "0.3"
futures = "0.1"
tokio = "0.1"

View File

@@ -0,0 +1,36 @@
use yup_oauth2;
use futures::prelude::*;
use yup_oauth2::GetToken;
use hyper::client::Client;
use hyper_tls::HttpsConnector;
use tokio;
use std::path;
fn main() {
let creds =
yup_oauth2::service_account_key_from_file(path::Path::new("serviceaccount.json")).unwrap();
let https = HttpsConnector::new(1).expect("tls");
let client = Client::builder()
.keep_alive(false)
.build::<_, hyper::Body>(https);
let mut sa = yup_oauth2::ServiceAccountAccess::new(creds, client);
let fut = sa
.token(["https://www.googleapis.com/auth/pubsub"].iter())
.and_then(|tok| {
println!("token is: {:?}", tok);
Ok(())
});
let fut2 = sa
.token(["https://www.googleapis.com/auth/pubsub"].iter())
.and_then(|tok| {
println!("cached token is {:?} and should be identical", tok);
Ok(())
});
let all = fut.join(fut2).then(|_| Ok(()));
tokio::run(all)
}

View File

@@ -1,382 +1,215 @@
use std::cmp::min;
use std::collections::hash_map::DefaultHasher;
use std::convert::From;
use crate::authenticator_delegate::{AuthenticatorDelegate, Retry};
use crate::refresh::RefreshFlow;
use crate::storage::{hash_scopes, DiskTokenStorage, MemoryStorage, TokenStorage};
use crate::types::{ApplicationSecret, GetToken, RefreshResult, RequestError, Token};
use futures::{future, prelude::*};
use tokio_timer;
use std::error::Error;
use std::hash::{Hash, Hasher};
use std::iter::IntoIterator;
use std::thread::sleep;
use std::time::Duration;
use std::io;
use std::sync::{Arc, Mutex};
use crate::authenticator_delegate::{AuthenticatorDelegate, PollError, PollInformation};
use crate::device::{DeviceFlow, GOOGLE_DEVICE_CODE_URL};
use crate::installed::{InstalledFlow, InstalledFlowReturnMethod};
use crate::refresh::{RefreshFlow, RefreshResult};
use crate::storage::TokenStorage;
use crate::types::{ApplicationSecret, FlowType, RequestError, StringError, Token};
use hyper;
/// A generalized authenticator which will keep tokens valid and store them.
/// Authenticator abstracts different `GetToken` implementations behind one type and handles
/// caching received tokens. It's important to use it (instead of the flows directly) because
/// otherwise the user needs to be asked for new authorization every time a token is generated.
///
/// It is the go-to helper to deal with any kind of supported authentication flow,
/// which will be kept valid and usable.
/// `ServiceAccountAccess` does not need (and does not work) with `Authenticator`, given that it
/// does not require interaction and implements its own caching. Use it directly.
///
/// # Device Flow
/// This involves polling the authentication server in the given intervals
/// until there is a definitive result.
///
/// These results will be passed the `DeviceFlowHelperDelegate` implementation to deal with
/// * presenting the user code
/// * inform the user about the progress or errors
/// * abort the operation
///
/// # Usage
/// Please have a look at the library's landing page.
pub struct Authenticator<D, S, C> {
flow_type: FlowType,
delegate: D,
storage: S,
client: hyper::Client<C, hyper::Body>,
secret: ApplicationSecret,
}
/// A provider for authorization tokens, yielding tokens valid for a given scope.
/// The `api_key()` method is an alternative in case there are no scopes or
/// if no user is involved.
pub trait GetToken {
fn token<'b, I, T>(&mut self, scopes: I) -> Result<Token, Box<dyn Error + Send>>
where
T: AsRef<str> + Ord + 'b,
I: IntoIterator<Item = &'b T>;
fn api_key(&mut self) -> Option<String>;
}
impl<'a, D, S, C: 'static> Authenticator<D, S, C>
where
D: AuthenticatorDelegate,
/// NOTE: It is recommended to use a client constructed like this in order to prevent functions
/// like `hyper::run()` from hanging: `let client = hyper::Client::builder().keep_alive(false);`.
/// Due to token requests being rare, this should not result in a too bad performance problem.
pub struct Authenticator<
T: GetToken,
S: TokenStorage,
AD: AuthenticatorDelegate,
C: hyper::client::connect::Connect,
> {
client: hyper::Client<C>,
inner: Arc<Mutex<T>>,
store: Arc<Mutex<S>>,
delegate: AD,
}
impl<T: GetToken, AD: AuthenticatorDelegate, C: hyper::client::connect::Connect>
Authenticator<T, MemoryStorage, AD, C>
{
/// Returns a new `Authenticator` instance
///
/// # Arguments
/// * `secret` - usually obtained from a client secret file produced by the
/// [developer console][dev-con]
/// * `delegate` - Used to further refine the flow of the authentication.
/// * `client` - used for all authentication https requests.
/// * `storage` - used to cache authorization tokens tokens permanently. However,
/// the implementation doesn't have any particular semantic requirement, which
/// is why `NullStorage` and `MemoryStorage` can be used as well.
/// * `flow_type` - the kind of authentication to use to obtain a token for the
/// required scopes. If unset, it will be derived from the secret.
///
/// NOTE: It is recommended to use a client constructed like this in order to prevent functions
/// like `hyper::run()` from hanging: `let client = hyper::Client::builder().keep_alive(false);`.
/// Due to token requests being rare, this should not result in a too bad performance problem.
/// [dev-con]: https://console.developers.google.com
/// Create an Authenticator caching tokens for the duration of this authenticator.
pub fn new(
secret: &ApplicationSecret,
delegate: D,
client: hyper::Client<C, hyper::Body>,
storage: S,
flow_type: Option<FlowType>,
) -> Authenticator<D, S, C> {
client: hyper::Client<C>,
inner: T,
delegate: AD,
) -> Authenticator<T, MemoryStorage, AD, C> {
Authenticator {
flow_type: flow_type.unwrap_or(FlowType::Device(GOOGLE_DEVICE_CODE_URL.to_string())),
delegate: delegate,
storage: storage,
client: client,
secret: secret.clone(),
}
}
fn do_installed_flow(&mut self, scopes: &Vec<&str>) -> Result<Token, Box<dyn Error + Send>> {
let installed_type;
match self.flow_type {
FlowType::InstalledInteractive => {
installed_type = Some(InstalledFlowReturnMethod::Interactive)
}
FlowType::InstalledRedirect(port) => {
installed_type = Some(InstalledFlowReturnMethod::HTTPRedirect(port))
}
_ => installed_type = None,
}
let mut flow = InstalledFlow::new(self.client.clone(), installed_type);
flow.obtain_token(&mut self.delegate, &self.secret, scopes.iter())
}
fn retrieve_device_token(
&mut self,
scopes: &Vec<&str>,
code_url: String,
) -> Result<Token, Box<dyn Error + Send>> {
let mut flow = DeviceFlow::new(self.client.clone(), &self.secret, &code_url);
// PHASE 1: REQUEST CODE
let pi: PollInformation;
loop {
let res = flow.request_code(scopes.iter());
pi = match res {
Err(res_err) => {
match res_err {
RequestError::ClientError(err) => match self.delegate.client_error(&err) {
Retry::Abort | Retry::Skip => {
return Err(Box::new(StringError::from(&err as &dyn Error)));
}
Retry::After(d) => sleep(d),
},
RequestError::HttpError(err) => {
match self.delegate.connection_error(&err) {
Retry::Abort | Retry::Skip => {
return Err(Box::new(StringError::from(&err as &dyn Error)));
}
Retry::After(d) => sleep(d),
}
}
RequestError::InvalidClient
| RequestError::NegativeServerResponse(_, _)
| RequestError::InvalidScope(_) => {
let serr = StringError::from(res_err.to_string());
self.delegate.request_failure(res_err);
return Err(Box::new(serr));
}
};
continue;
}
Ok(pi) => {
self.delegate.present_user_code(&pi);
pi
}
};
break;
}
// PHASE 1: POLL TOKEN
loop {
match flow.poll_token() {
Err(ref poll_err) => {
let pts = poll_err.to_string();
match poll_err {
&&PollError::HttpError(ref err) => match self.delegate.client_error(err) {
Retry::Abort | Retry::Skip => {
return Err(Box::new(StringError::from(err as &dyn Error)));
}
Retry::After(d) => sleep(d),
},
&&PollError::Expired(ref t) => {
self.delegate.expired(t);
return Err(Box::new(StringError::from(pts)));
}
&&PollError::AccessDenied => {
self.delegate.denied();
return Err(Box::new(StringError::from(pts)));
}
}; // end match poll_err
}
Ok(None) => match self.delegate.pending(&pi) {
Retry::Abort | Retry::Skip => {
return Err(Box::new(StringError::new(
"Pending authentication aborted".to_string(),
None,
)));
}
Retry::After(d) => sleep(min(d, pi.interval)),
},
Ok(Some(token)) => return Ok(token),
}
inner: Arc::new(Mutex::new(inner)),
store: Arc::new(Mutex::new(MemoryStorage::new())),
delegate: delegate,
}
}
}
impl<D, S, C: 'static> GetToken for Authenticator<D, S, C>
where
D: AuthenticatorDelegate,
S: TokenStorage,
C: hyper::client::connect::Connect,
impl<T: GetToken, AD: AuthenticatorDelegate, C: hyper::client::connect::Connect>
Authenticator<T, DiskTokenStorage, AD, C>
{
/// Blocks until a token was retrieved from storage, from the server, or until the delegate
/// decided to abort the attempt, or the user decided not to authorize the application.
/// In any failure case, the delegate will be provided with additional information, and
/// the caller will be informed about storage related errors.
/// Otherwise it is guaranteed to be valid for the given scopes.
fn token<'b, I, T>(&mut self, scopes: I) -> Result<Token, Box<dyn Error + Send>>
/// Create an Authenticator using the store at `path`.
pub fn new_disk<P: AsRef<str>>(
client: hyper::Client<C>,
inner: T,
delegate: AD,
token_storage_path: P,
) -> io::Result<Authenticator<T, DiskTokenStorage, AD, C>> {
Ok(Authenticator {
client: client,
inner: Arc::new(Mutex::new(inner)),
store: Arc::new(Mutex::new(DiskTokenStorage::new(token_storage_path)?)),
delegate: delegate,
})
}
}
impl<
GT: 'static + GetToken + Send,
S: 'static + TokenStorage + Send,
AD: 'static + AuthenticatorDelegate + Send,
C: 'static + hyper::client::connect::Connect + Clone + Send,
> GetToken for Authenticator<GT, S, AD, C>
{
/// Returns the API Key of the inner flow.
fn api_key(&mut self) -> Option<String> {
self.inner.lock().unwrap().api_key()
}
/// Returns the application secret of the inner flow.
fn application_secret(&self) -> ApplicationSecret {
self.inner.lock().unwrap().application_secret()
}
fn token<'b, I, T>(
&mut self,
scopes: I,
) -> Box<dyn Future<Item = Token, Error = RequestError> + Send>
where
T: AsRef<str> + Ord + 'b,
I: IntoIterator<Item = &'b T>,
I: Iterator<Item = &'b T>,
{
let (scope_key, scopes) = {
let mut sv: Vec<&str> = scopes
.into_iter()
.map(|s| s.as_ref())
.collect::<Vec<&str>>();
sv.sort();
let mut sh = DefaultHasher::new();
&sv.hash(&mut sh);
let sv = sv;
(sh.finish(), sv)
};
// Get cached token. Yes, let's do an explicit return
loop {
return match self.storage.get(scope_key, &scopes) {
Ok(Some(mut t)) => {
// t needs refresh ?
if t.expired() {
let mut rf = RefreshFlow::new(self.client.clone());
loop {
match *rf.refresh_token(
self.flow_type.clone(),
&self.secret,
&t.refresh_token,
) {
RefreshResult::Uninitialized => {
panic!("Token flow should never get here");
let (scope_key, scopes) = hash_scopes(scopes);
let store = self.store.clone();
let mut delegate = self.delegate.clone();
let client = self.client.clone();
let appsecret = self.inner.lock().unwrap().application_secret();
let gettoken = self.inner.clone();
let loopfn = move |()| -> Box<
dyn Future<Item = future::Loop<Token, ()>, Error = RequestError> + Send,
> {
// How well does this work with tokio?
match store.lock().unwrap().get(
scope_key.clone(),
&scopes.iter().map(|s| s.as_str()).collect(),
) {
Ok(Some(t)) => {
if !t.expired() {
return Box::new(Ok(future::Loop::Break(t)).into_future());
}
// Implement refresh flow.
let refresh_token = t.refresh_token.clone();
let mut delegate = delegate.clone();
let store = store.clone();
let scopes = scopes.clone();
let refresh_fut = RefreshFlow::refresh_token(
client.clone(),
appsecret.clone(),
refresh_token,
)
.and_then(move |rr| -> Box<dyn Future<Item=future::Loop<Token, ()>, Error=RequestError> + Send> {
match rr {
RefreshResult::Error(ref e) => {
delegate.token_refresh_failed(
format!("{}", e.description().to_string()),
&Some("the request has likely timed out".to_string()),
);
Box::new(Err(RequestError::Refresh(rr)).into_future())
}
RefreshResult::Error(ref err) => {
match self.delegate.client_error(err) {
Retry::Abort | Retry::Skip => {
return Err(Box::new(StringError::new(
err.description().to_string(),
None,
)));
RefreshResult::RefreshError(ref s, ref ss) => {
delegate.token_refresh_failed(
format!("{} {}", s, ss.clone().map(|s| format!("({})", s)).unwrap_or("".to_string())),
&Some("the refresh token is likely invalid and your authorization has been revoked".to_string()),
);
Box::new(Err(RequestError::Refresh(rr)).into_future())
}
RefreshResult::Success(t) => {
if let Err(e) = store.lock().unwrap().set(scope_key, &scopes.iter().map(|s| s.as_str()).collect(), Some(t.clone())) {
match delegate.token_storage_failure(true, &e) {
Retry::Skip => Box::new(Ok(future::Loop::Break(t)).into_future()),
Retry::Abort => Box::new(Err(RequestError::Cache(Box::new(e))).into_future()),
Retry::After(d) => Box::new(
tokio_timer::sleep(d)
.then(|_| Ok(future::Loop::Continue(()))),
)
as Box<
dyn Future<
Item = future::Loop<Token, ()>,
Error = RequestError> + Send>,
}
Retry::After(d) => sleep(d),
} else {
Box::new(Ok(future::Loop::Break(t)).into_future())
}
}
RefreshResult::RefreshError(ref err_str, ref err_description) => {
self.delegate.token_refresh_failed(err_str, err_description);
let storage_err =
match self.storage.set(scope_key, &scopes, None) {
Ok(_) => String::new(),
Err(err) => err.to_string(),
};
return Err(Box::new(StringError::new(
&(storage_err + err_str),
err_description.as_ref(),
)));
}
RefreshResult::Success(ref new_t) => {
t = new_t.clone();
loop {
if let Err(err) =
self.storage.set(scope_key, &scopes, Some(t.clone()))
{
match self.delegate.token_storage_failure(true, &err) {
Retry::Skip => break,
Retry::Abort => return Err(Box::new(err)),
Retry::After(d) => {
sleep(d);
continue;
}
}
}
break; // .set()
}
break; // refresh_token loop
}
} // RefreshResult handling
} // refresh loop
} // handle expiration
Ok(t)
},
}
});
Box::new(refresh_fut)
}
Ok(None) => {
// Nothing was in storage - get a new token
// get new token. The respective sub-routine will do all the logic.
match match self.flow_type.clone() {
FlowType::Device(url) => self.retrieve_device_token(&scopes, url),
FlowType::InstalledInteractive => self.do_installed_flow(&scopes),
FlowType::InstalledRedirect(_) => self.do_installed_flow(&scopes),
} {
Ok(token) => {
loop {
if let Err(err) =
self.storage.set(scope_key, &scopes, Some(token.clone()))
{
match self.delegate.token_storage_failure(true, &err) {
Retry::Skip => break,
Retry::Abort => return Err(Box::new(err)),
Retry::After(d) => {
sleep(d);
continue;
let store = store.clone();
let scopes = scopes.clone();
let mut delegate = delegate.clone();
Box::new(
gettoken
.lock()
.unwrap()
.token(scopes.iter())
.and_then(move |t| {
if let Err(e) = store.lock().unwrap().set(
scope_key,
&scopes.iter().map(|s| s.as_str()).collect(),
Some(t.clone()),
) {
match delegate.token_storage_failure(true, &e) {
Retry::Skip => {
Box::new(Ok(future::Loop::Break(t)).into_future())
}
Retry::Abort => Box::new(
Err(RequestError::Cache(Box::new(e))).into_future(),
),
Retry::After(d) => Box::new(
tokio_timer::sleep(d)
.then(|_| Ok(future::Loop::Continue(()))),
)
as Box<
dyn Future<
Item = future::Loop<Token, ()>,
Error = RequestError,
> + Send,
>,
}
} else {
Box::new(Ok(future::Loop::Break(t)).into_future())
}
break;
} // end attempt to save
Ok(token)
}
Err(err) => Err(err),
} // end match token retrieve result
}),
)
}
Err(err) => match self.delegate.token_storage_failure(false, &err) {
Retry::Abort | Retry::Skip => Err(Box::new(err)),
Err(err) => match delegate.token_storage_failure(false, &err) {
Retry::Abort | Retry::Skip => {
return Box::new(Err(RequestError::Cache(Box::new(err))).into_future())
}
Retry::After(d) => {
sleep(d);
continue;
return Box::new(
tokio_timer::sleep(d).then(|_| Ok(future::Loop::Continue(()))),
)
}
},
}; // end match
} // end loop
}
fn api_key(&mut self) -> Option<String> {
if self.secret.client_id.len() == 0 {
return None;
}
Some(self.secret.client_id.clone())
}
}
/// A utility type to indicate how operations DeviceFlowHelper operations should be retried
pub enum Retry {
/// Signal you don't want to retry
Abort,
/// Signals you want to retry after the given duration
After(Duration),
/// Instruct the caller to attempt to keep going, or choose an alternate path.
/// If this is not supported, it will have the same effect as `Abort`
Skip,
}
#[cfg(test)]
mod tests {
use super::super::device::tests::MockGoogleAuth;
use super::super::types::tests::SECRET;
use super::super::types::ConsoleApplicationSecret;
use super::*;
use crate::authenticator_delegate::DefaultAuthenticatorDelegate;
use crate::storage::MemoryStorage;
use hyper;
use std::default::Default;
#[test]
fn test_flow() {
use serde_json as json;
let runtime = tokio::runtime::Runtime::new().unwrap();
let secret = json::from_str::<ConsoleApplicationSecret>(SECRET)
.unwrap()
.installed
.unwrap();
let client = hyper::Client::builder()
.executor(runtime.executor())
.build(MockGoogleAuth::default());
let res = Authenticator::new(
&secret,
DefaultAuthenticatorDelegate,
client,
<MemoryStorage as Default>::default(),
None,
)
.token(&["https://www.googleapis.com/auth/youtube.upload"]);
match res {
Ok(t) => assert_eq!(t.access_token, "1/fFAGRNJru1FTz70BzhT3Zg"),
Err(err) => panic!("Expected to retrieve token in one go: {}", err),
}
}
};
Box::new(future::loop_fn((), loopfn))
}
}

View File

@@ -4,12 +4,25 @@ use std::error::Error;
use std::fmt;
use std::io;
use crate::authenticator::Retry;
use crate::types::RequestError;
use crate::types::{PollError, RequestError};
use chrono::{DateTime, Local, Utc};
use std::time::Duration;
use futures::{future, prelude::*};
use tokio::io as tio;
/// A utility type to indicate how operations DeviceFlowHelper operations should be retried
pub enum Retry {
/// Signal you don't want to retry
Abort,
/// Signals you want to retry after the given duration
After(Duration),
/// Instruct the caller to attempt to keep going, or choose an alternate path.
/// If this is not supported, it will have the same effect as `Abort`
Skip,
}
/// Contains state of pending authentication requests
#[derive(Clone, Debug, PartialEq)]
pub struct PollInformation {
@@ -32,23 +45,23 @@ impl fmt::Display for PollInformation {
}
}
/// Encapsulates all possible results of a `poll_token(...)` operation
#[derive(Debug)]
pub enum PollError {
/// Connection failure - retry if you think it's worth it
HttpError(hyper::Error),
/// indicates we are expired, including the expiration date
Expired(DateTime<Utc>),
/// Indicates that the user declined access. String is server response
AccessDenied,
}
impl fmt::Display for PollError {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match *self {
PollError::HttpError(ref err) => err.fmt(f),
PollError::Expired(ref date) => writeln!(f, "Authentication expired at {}", date),
PollError::AccessDenied => "Access denied by user".fmt(f),
PollError::TimedOut => "Timed out waiting for token".fmt(f),
PollError::Other(ref s) => format!("Unknown server error: {}", s).fmt(f),
}
}
}
impl Error for PollError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match *self {
PollError::HttpError(ref e) => Some(e),
_ => None,
}
}
}
@@ -57,7 +70,7 @@ impl fmt::Display for PollError {
///
/// The only method that needs to be implemented manually is `present_user_code(...)`,
/// as no assumptions are made on how this presentation should happen.
pub trait AuthenticatorDelegate {
pub trait AuthenticatorDelegate: Clone {
/// Called whenever there is an client, usually if there are network problems.
///
/// Return retry information.
@@ -65,13 +78,6 @@ pub trait AuthenticatorDelegate {
Retry::Abort
}
/// Called whenever there is an HttpError, usually if there are network problems.
///
/// Return retry information.
fn connection_error(&mut self, _: &hyper::http::Error) -> Retry {
Retry::Abort
}
/// Called whenever we failed to retrieve a token or set a token due to a storage error.
/// You may use it to either ignore the incident or retry.
/// This can be useful if the underlying `TokenStorage` may fail occasionally.
@@ -85,15 +91,6 @@ pub trait AuthenticatorDelegate {
/// The server denied the attempt to obtain a request code
fn request_failure(&mut self, _: RequestError) {}
/// Called if the request code is expired. You will have to start over in this case.
/// This will be the last call the delegate receives.
/// Given `DateTime` is the expiration date
fn expired(&mut self, _: &DateTime<Utc>) {}
/// Called if the user denied access. You would have to start over.
/// This will be the last call the delegate receives.
fn denied(&mut self) {}
/// Called if we could not acquire a refresh token for a reason possibly specified
/// by the server.
/// This call is made for the delegate's information only.
@@ -109,6 +106,19 @@ pub trait AuthenticatorDelegate {
let _ = error_description;
}
}
}
/// FlowDelegate methods are called when an OAuth flow needs to ask the application what to do in
/// certain cases.
pub trait FlowDelegate: Clone {
/// Called if the request code is expired. You will have to start over in this case.
/// This will be the last call the delegate receives.
/// Given `DateTime` is the expiration date
fn expired(&mut self, _: &DateTime<Utc>) {}
/// Called if the user denied access. You would have to start over.
/// This will be the last call the delegate receives.
fn denied(&mut self) {}
/// Called as long as we are waiting for the user to authorize us.
/// Can be used to print progress information, or decide to time-out.
@@ -125,7 +135,6 @@ pub trait AuthenticatorDelegate {
fn redirect_uri(&self) -> Option<String> {
None
}
/// The server has returned a `user_code` which must be shown to the user,
/// along with the `verification_url`.
/// # Notes
@@ -151,7 +160,7 @@ pub trait AuthenticatorDelegate {
&mut self,
url: S,
need_code: bool,
) -> Option<String> {
) -> Box<dyn Future<Item = Option<String>, Error = Box<dyn Error + Send>> + Send> {
if need_code {
println!(
"Please direct your browser to {}, follow the instructions and enter the \
@@ -159,24 +168,33 @@ pub trait AuthenticatorDelegate {
url
);
let mut code = String::new();
io::stdin().read_line(&mut code).ok().map(|_| {
// Remove newline
code.pop();
code
})
Box::new(
tio::lines(io::BufReader::new(tio::stdin()))
.into_future()
.map_err(|(e, _)| {
println!("{:?}", e);
Box::new(e) as Box<dyn Error + Send>
})
.and_then(|(l, _)| Ok(l)),
)
} else {
println!(
"Please direct your browser to {} and follow the instructions displayed \
there.",
url
);
None
Box::new(future::ok(None))
}
}
}
/// Uses all default implementations by AuthenticatorDelegate, and makes the trait's
/// implementation usable in the first place.
#[derive(Clone)]
pub struct DefaultAuthenticatorDelegate;
impl AuthenticatorDelegate for DefaultAuthenticatorDelegate {}
/// Uses all default implementations in the FlowDelegate trait.
#[derive(Clone)]
pub struct DefaultFlowDelegate;
impl FlowDelegate for DefaultFlowDelegate {}

View File

@@ -1,71 +1,167 @@
use std::default::Default;
use std::iter::IntoIterator;
use std::iter::{FromIterator, IntoIterator};
use std::time::Duration;
use chrono::{self, Utc};
use futures::stream::Stream;
use futures::Future;
use futures::{future, prelude::*};
use http;
use hyper;
use hyper::header;
use itertools::Itertools;
use serde_json as json;
use tokio_timer;
use url::form_urlencoded;
use crate::authenticator_delegate::{PollError, PollInformation};
use crate::types::{ApplicationSecret, Flow, FlowType, JsonError, RequestError, Token};
use crate::authenticator_delegate::{FlowDelegate, PollInformation, Retry};
use crate::types::{
ApplicationSecret, Flow, FlowType, GetToken, JsonError, PollError, RequestError, Token,
};
pub const GOOGLE_DEVICE_CODE_URL: &'static str = "https://accounts.google.com/o/oauth2/device/code";
/// Encapsulates all possible states of the Device Flow
enum DeviceFlowState {
/// We failed to poll a result
Error,
/// We received poll information and will periodically poll for a token
Pending(PollInformation),
/// The flow finished successfully, providing token information
Success(Token),
}
/// Implements the [Oauth2 Device Flow](https://developers.google.com/youtube/v3/guides/authentication#devices)
/// It operates in two steps:
/// * obtain a code to show to the user
/// * (repeatedly) poll for the user to authenticate your application
pub struct DeviceFlow<C> {
pub struct DeviceFlow<FD, C> {
client: hyper::Client<C, hyper::Body>,
device_code: String,
state: Option<DeviceFlowState>,
error: Option<PollError>,
application_secret: ApplicationSecret,
/// Usually GOOGLE_DEVICE_CODE_URL
device_code_url: String,
fd: FD,
wait: Duration,
}
impl<C> Flow for DeviceFlow<C> {
impl<FD, C> Flow for DeviceFlow<FD, C> {
fn type_id() -> FlowType {
FlowType::Device(String::new())
}
}
impl<C> DeviceFlow<C>
impl<
FD: FlowDelegate + Clone + Send + 'static,
C: hyper::client::connect::Connect + Sync + 'static,
> GetToken for DeviceFlow<FD, C>
{
fn token<'b, I, T>(
&mut self,
scopes: I,
) -> Box<dyn Future<Item = Token, Error = RequestError> + Send>
where
T: AsRef<str> + Ord + 'b,
I: Iterator<Item = &'b T>,
{
self.retrieve_device_token(Vec::from_iter(scopes.map(|s| s.as_ref().to_string())))
}
fn api_key(&mut self) -> Option<String> {
None
}
fn application_secret(&self) -> ApplicationSecret {
self.application_secret.clone()
}
}
impl<FD, C> DeviceFlow<FD, C>
where
C: hyper::client::connect::Connect + Sync + 'static,
C::Transport: 'static,
C::Future: 'static,
FD: FlowDelegate + Clone + Send + 'static,
{
pub fn new<S: AsRef<str>>(
pub fn new<S: 'static + AsRef<str>>(
client: hyper::Client<C, hyper::Body>,
secret: &ApplicationSecret,
device_code_url: S,
) -> DeviceFlow<C> {
secret: ApplicationSecret,
fd: FD,
device_code_url: Option<S>,
) -> DeviceFlow<FD, C> {
DeviceFlow {
client: client,
device_code: Default::default(),
application_secret: secret.clone(),
device_code_url: device_code_url.as_ref().to_string(),
state: None,
error: None,
application_secret: secret,
device_code_url: device_code_url
.as_ref()
.map(|s| s.as_ref().to_string())
.unwrap_or(GOOGLE_DEVICE_CODE_URL.to_string()),
fd: fd,
wait: Duration::from_secs(1200),
}
}
/// Set the time to wait for the user to authorize us. The default is 120 seconds.
pub fn set_wait_duration(&mut self, wait: Duration) {
self.wait = wait;
}
/// Essentially what `GetToken::token` does: Retrieve a token for the given scopes without
/// caching.
pub fn retrieve_device_token<'a>(
&mut self,
scopes: Vec<String>,
) -> Box<dyn Future<Item = Token, Error = RequestError> + Send> {
let application_secret = self.application_secret.clone();
let client = self.client.clone();
let wait = self.wait;
let mut fd = self.fd.clone();
let request_code = Self::request_code(
application_secret.clone(),
client.clone(),
self.device_code_url.clone(),
scopes,
)
.and_then(move |(pollinf, device_code)| {
fd.present_user_code(&pollinf);
Ok((pollinf, device_code))
});
let fd = self.fd.clone();
Box::new(request_code.and_then(move |(pollinf, device_code)| {
future::loop_fn(0, move |i| {
// Make a copy of everything every time, because the loop function needs to be
// repeatable, i.e. we can't move anything out.
let pt = Self::poll_token(
application_secret.clone(),
client.clone(),
device_code.clone(),
pollinf.clone(),
fd.clone(),
);
let maxn = wait.as_secs() / pollinf.interval.as_secs();
let mut fd = fd.clone();
let pollinf = pollinf.clone();
tokio_timer::sleep(pollinf.interval)
.then(|_| pt)
.then(move |r| match r {
Ok(None) if i < maxn => match fd.pending(&pollinf) {
Retry::Abort | Retry::Skip => {
Box::new(Err(RequestError::Poll(PollError::TimedOut)).into_future())
}
Retry::After(d) => Box::new(
tokio_timer::sleep(d)
.then(move |_| Ok(future::Loop::Continue(i + 1))),
)
as Box<
dyn Future<
Item = future::Loop<Token, u64>,
Error = RequestError,
> + Send,
>,
},
Ok(Some(tok)) => Box::new(Ok(future::Loop::Break(tok)).into_future()),
Err(e @ PollError::AccessDenied)
| Err(e @ PollError::TimedOut)
| Err(e @ PollError::Expired(_)) => {
Box::new(Err(RequestError::Poll(e)).into_future())
}
Err(_) if i < maxn => {
Box::new(Ok(future::Loop::Continue(i + 1)).into_future())
}
// Too many attempts.
Ok(None) | Err(_) => {
Box::new(Err(RequestError::Poll(PollError::TimedOut)).into_future())
}
})
})
}))
}
/// The first step involves asking the server for a code that the user
/// can type into a field at a specified URL. It is called only once, assuming
/// there was no connection error. Otherwise, it may be called again until
@@ -81,26 +177,22 @@ where
/// * If called after a successful result was returned at least once.
/// # Examples
/// See test-cases in source code for a more complete example.
pub fn request_code<'b, T, I>(&mut self, scopes: I) -> Result<PollInformation, RequestError>
where
T: AsRef<str> + 'b,
I: IntoIterator<Item = &'b T>,
{
if self.state.is_some() {
panic!("Must not be called after we have obtained a token and have no error");
}
fn request_code(
application_secret: ApplicationSecret,
client: hyper::Client<C>,
device_code_url: String,
scopes: Vec<String>,
) -> impl Future<Item = (PollInformation, String), Error = RequestError> {
// note: cloned() shouldn't be needed, see issue
// https://github.com/servo/rust-url/issues/81
let req = form_urlencoded::Serializer::new(String::new())
.extend_pairs(&[
("client_id", &self.application_secret.client_id),
("client_id", application_secret.client_id.clone()),
(
"scope",
&scopes
scopes
.into_iter()
.map(|s| s.as_ref())
.intersperse(" ")
.intersperse(" ".to_string())
.collect::<String>(),
),
])
@@ -108,54 +200,60 @@ where
// note: works around bug in rustlang
// https://github.com/rust-lang/rust/issues/22252
let request = hyper::Request::post(&self.device_code_url)
let request = hyper::Request::post(device_code_url)
.header(header::CONTENT_TYPE, "application/x-www-form-urlencoded")
.body(hyper::Body::from(req))?;
.body(hyper::Body::from(req))
.into_future();
request
.then(
move |request: Result<hyper::Request<hyper::Body>, http::Error>| {
let request = request.unwrap();
client.request(request)
},
)
.then(
|r: Result<hyper::Response<hyper::Body>, hyper::error::Error>| {
match r {
Err(err) => {
return Err(RequestError::ClientError(err));
}
Ok(res) => {
#[derive(Deserialize)]
struct JsonData {
device_code: String,
user_code: String,
verification_url: String,
expires_in: i64,
interval: i64,
}
// TODO: move the ? on request
let ret = match self.client.request(request).wait() {
Err(err) => {
return Err(RequestError::ClientError(err)); // TODO: failed here
}
Ok(res) => {
#[derive(Deserialize)]
struct JsonData {
device_code: String,
user_code: String,
verification_url: String,
expires_in: i64,
interval: i64,
}
let json_str: String = res
.into_body()
.concat2()
.wait()
.map(|c| String::from_utf8(c.into_bytes().to_vec()).unwrap())
.unwrap(); // TODO: error handling
let json_str: String = res
.into_body()
.concat2()
.wait()
.map(|c| String::from_utf8(c.into_bytes().to_vec()).unwrap())
.unwrap(); // TODO: error handling
// check for error
match json::from_str::<JsonError>(&json_str) {
Err(_) => {} // ignore, move on
Ok(res) => return Err(RequestError::from(res)),
}
// check for error
match json::from_str::<JsonError>(&json_str) {
Err(_) => {} // ignore, move on
Ok(res) => return Err(RequestError::from(res)),
}
let decoded: JsonData = json::from_str(&json_str).unwrap();
let decoded: JsonData = json::from_str(&json_str).unwrap();
self.device_code = decoded.device_code;
let pi = PollInformation {
user_code: decoded.user_code,
verification_url: decoded.verification_url,
expires_at: Utc::now() + chrono::Duration::seconds(decoded.expires_in),
interval: Duration::from_secs(i64::abs(decoded.interval) as u64),
};
self.state = Some(DeviceFlowState::Pending(pi.clone()));
Ok(pi)
}
};
ret
let pi = PollInformation {
user_code: decoded.user_code,
verification_url: decoded.verification_url,
expires_at: Utc::now()
+ chrono::Duration::seconds(decoded.expires_in),
interval: Duration::from_secs(i64::abs(decoded.interval) as u64),
};
Ok((pi, decoded.device_code))
}
}
},
)
}
/// If the first call is successful, this method may be called.
@@ -168,150 +266,216 @@ where
///
/// Do not call after `PollError::Expired|PollError::AccessDenied` was among the
/// `Err(PollError)` variants as the flow will not do anything anymore.
/// Thus in any unsuccessful case which is not `PollError::HttpError`, you will have to start /// over the entire flow, which requires a new instance of this type.
/// Thus in any unsuccessful case which is not `PollError::HttpError`, you will have to start
/// over the entire flow, which requires a new instance of this type.
///
/// > ⚠️ **Warning**: We assume the caller doesn't call faster than `interval` and are not
/// > protected against this kind of mis-use.
///
/// # Examples
/// See test-cases in source code for a more complete example.
pub fn poll_token(&mut self) -> Result<Option<Token>, &PollError> {
// clone, as we may re-assign our state later
let pi = match self.state {
Some(ref s) => match *s {
DeviceFlowState::Pending(ref pi) => pi.clone(),
DeviceFlowState::Error => return Err(self.error.as_ref().unwrap()),
DeviceFlowState::Success(ref t) => return Ok(Some(t.clone())),
},
_ => panic!("You have to call request_code() beforehand"),
fn poll_token<'a>(
application_secret: ApplicationSecret,
client: hyper::Client<C>,
device_code: String,
pi: PollInformation,
mut fd: FD,
) -> impl Future<Item = Option<Token>, Error = PollError> {
let expired = if pi.expires_at <= Utc::now() {
fd.expired(&pi.expires_at);
Err(PollError::Expired(pi.expires_at)).into_future()
} else {
Ok(()).into_future()
};
if pi.expires_at <= Utc::now() {
self.error = Some(PollError::Expired(pi.expires_at));
self.state = Some(DeviceFlowState::Error);
return Err(&self.error.as_ref().unwrap());
}
// We should be ready for a new request
let req = form_urlencoded::Serializer::new(String::new())
.extend_pairs(&[
("client_id", &self.application_secret.client_id[..]),
("client_secret", &self.application_secret.client_secret),
("code", &self.device_code),
("client_id", &application_secret.client_id[..]),
("client_secret", &application_secret.client_secret),
("code", &device_code),
("grant_type", "http://oauth.net/grant_type/device/1.0"),
])
.finish();
let request = hyper::Request::post(&self.application_secret.token_uri)
let request = hyper::Request::post(&application_secret.token_uri)
.header(header::CONTENT_TYPE, "application/x-www-form-urlencoded")
.body(hyper::Body::from(req))
.unwrap(); // TODO: Error checking
let json_str: String = match self.client.request(request).wait() {
Err(err) => {
self.error = Some(PollError::HttpError(err));
return Err(self.error.as_ref().unwrap());
}
Ok(res) => {
expired
.and_then(move |_| client.request(request).map_err(|e| PollError::HttpError(e)))
.map(|res| {
res.into_body()
.concat2()
.wait()
.map(|c| String::from_utf8(c.into_bytes().to_vec()).unwrap())
.unwrap() // TODO: error handling
}
};
})
.and_then(move |json_str: String| {
#[derive(Deserialize)]
struct JsonError {
error: String,
}
#[derive(Deserialize)]
struct JsonError {
error: String,
}
match json::from_str::<JsonError>(&json_str) {
Err(_) => {} // ignore, move on, it's not an error
Ok(res) => {
match res.error.as_ref() {
"access_denied" => {
self.error = Some(PollError::AccessDenied);
self.state = Some(DeviceFlowState::Error);
return Err(self.error.as_ref().unwrap());
match json::from_str::<JsonError>(&json_str) {
Err(_) => {} // ignore, move on, it's not an error
Ok(res) => {
match res.error.as_ref() {
"access_denied" => {
fd.denied();
return Err(PollError::AccessDenied);
}
"authorization_pending" => return Ok(None),
s => {
return Err(PollError::Other(format!(
"server message '{}' not understood",
s
)))
}
};
}
"authorization_pending" => return Ok(None),
_ => panic!("server message '{}' not understood", res.error),
};
}
}
}
// yes, we expect that !
let mut t: Token = json::from_str(&json_str).unwrap();
t.set_expiry_absolute();
// yes, we expect that !
let mut t: Token = json::from_str(&json_str).unwrap();
t.set_expiry_absolute();
let res = Ok(Some(t.clone()));
self.state = Some(DeviceFlowState::Success(t));
return res;
Ok(Some(t.clone()))
})
}
}
#[cfg(test)]
pub mod tests {
mod tests {
use hyper;
use hyper_tls::HttpsConnector;
use mockito;
use tokio;
use super::*;
mock_connector_in_order!(MockGoogleAuth {
"HTTP/1.1 200 OK\r\n\
Server: BOGUS\r\n\
\r\n\
{\r\n\
\"device_code\" : \"4/L9fTtLrhY96442SEuf1Rl3KLFg3y\",\r\n\
\"user_code\" : \"a9xfwk9c\",\r\n\
\"verification_url\" : \"http://www.google.com/device\",\r\n\
\"expires_in\" : 1800,\r\n\
\"interval\" : 0\r\n\
}"
"HTTP/1.1 200 OK\r\n\
Server: BOGUS\r\n\
\r\n\
{\r\n\
\"error\" : \"authorization_pending\"\r\n\
}"
"HTTP/1.1 200 OK\r\nServer: \
BOGUS\r\n\r\n{\r\n\"access_token\":\"1/fFAGRNJru1FTz70BzhT3Zg\",\
\r\n\"expires_in\":3920,\r\n\"token_type\":\"Bearer\",\
\r\n\"refresh_token\":\
\"1/6BMfW9j53gdGImsixUH6kU5RsR4zwI9lUVX-tqf8JXQ\"\r\n}"
});
const TEST_APP_SECRET: &'static str = r#"{"installed":{"client_id":"384278056379-tr5pbot1mil66749n639jo54i4840u77.apps.googleusercontent.com","project_id":"sanguine-rhythm-105020","auth_uri":"https://accounts.google.com/o/oauth2/auth","token_uri":"https://accounts.google.com/o/oauth2/token","auth_provider_x509_cert_url":"https://www.googleapis.com/oauth2/v1/certs","client_secret":"QeQUnhzsiO4t--ZGmj9muUAu","redirect_uris":["urn:ietf:wg:oauth:2.0:oob","http://localhost"]}}"#;
use crate::helper::parse_application_secret;
#[test]
fn working_flow() {
use crate::helper::parse_application_secret;
let runtime = tokio::runtime::Runtime::new().unwrap();
let appsecret = parse_application_secret(&TEST_APP_SECRET.to_string()).unwrap();
let client = hyper::Client::builder()
.executor(runtime.executor())
.build(MockGoogleAuth::default());
let mut flow = DeviceFlow::new(client, &appsecret, GOOGLE_DEVICE_CODE_URL);
match flow.request_code(&["https://www.googleapis.com/auth/youtube.upload"]) {
Ok(pi) => assert_eq!(pi.interval, Duration::from_secs(0)),
Err(err) => assert!(false, "request_code failed: {}", err),
}
match flow.poll_token() {
Ok(None) => {}
_ => unreachable!(),
}
let t = match flow.poll_token() {
Ok(Some(t)) => {
assert_eq!(t.access_token, "1/fFAGRNJru1FTz70BzhT3Zg");
t
fn test_device_end2end() {
#[derive(Clone)]
struct FD;
impl FlowDelegate for FD {
fn present_user_code(&mut self, pi: &PollInformation) {
assert_eq!("https://example.com/verify", pi.verification_url);
}
_ => unreachable!(),
};
}
// from now on, all calls will yield the same result
// As our mock has only 3 items, we would panic on this call
assert_eq!(flow.poll_token().unwrap(), Some(t));
let server_url = mockito::server_url();
let app_secret = r#"{"installed":{"client_id":"902216714886-k2v9uei3p1dk6h686jbsn9mo96tnbvto.apps.googleusercontent.com","project_id":"yup-test-243420","auth_uri":"https://accounts.google.com/o/oauth2/auth","token_uri":"https://oauth2.googleapis.com/token","auth_provider_x509_cert_url":"https://www.googleapis.com/oauth2/v1/certs","client_secret":"iuMPN6Ne1PD7cos29Tk9rlqH","redirect_uris":["urn:ietf:wg:oauth:2.0:oob","http://localhost"]}}"#;
let mut app_secret = parse_application_secret(app_secret).unwrap();
app_secret.token_uri = format!("{}/token", server_url);
let device_code_url = format!("{}/code", server_url);
let https = HttpsConnector::new(1).expect("tls");
let client = hyper::Client::builder()
.keep_alive(false)
.build::<_, hyper::Body>(https);
let mut flow = DeviceFlow::new(client.clone(), app_secret, FD, Some(device_code_url));
let mut rt = tokio::runtime::Builder::new()
.core_threads(1)
.panic_handler(|e| std::panic::resume_unwind(e))
.build()
.unwrap();
// Successful path
{
let code_response = r#"{"device_code": "devicecode", "user_code": "usercode", "verification_url": "https://example.com/verify", "expires_in": 1234567, "interval": 1}"#;
let _m = mockito::mock("POST", "/code")
.match_body(mockito::Matcher::Regex(
".*client_id=902216714886.*".to_string(),
))
.with_status(200)
.with_body(code_response)
.create();
let token_response = r#"{"access_token": "accesstoken", "refresh_token": "refreshtoken", "token_type": "Bearer", "expires_in": 1234567}"#;
let _m = mockito::mock("POST", "/token")
.match_body(mockito::Matcher::Regex(
".*client_secret=iuMPN6Ne1PD7cos29Tk9rlqH&code=devicecode.*".to_string(),
))
.with_status(200)
.with_body(token_response)
.create();
let fut = flow
.token(vec!["https://www.googleapis.com/scope/1"].iter())
.then(|token| {
let token = token.unwrap();
assert_eq!("accesstoken", token.access_token);
Ok(()) as Result<(), ()>
});
rt.block_on(fut).expect("block_on");
_m.assert();
}
// Code is not delivered.
{
let code_response =
r#"{"error": "invalid_client_id", "error_description": "description"}"#;
let _m = mockito::mock("POST", "/code")
.match_body(mockito::Matcher::Regex(
".*client_id=902216714886.*".to_string(),
))
.with_status(400)
.with_body(code_response)
.create();
let token_response = r#"{"access_token": "accesstoken", "refresh_token": "refreshtoken", "token_type": "Bearer", "expires_in": 1234567}"#;
let _m = mockito::mock("POST", "/token")
.match_body(mockito::Matcher::Regex(
".*client_secret=iuMPN6Ne1PD7cos29Tk9rlqH&code=devicecode.*".to_string(),
))
.with_status(200)
.with_body(token_response)
.expect(0) // Never called!
.create();
let fut = flow
.token(vec!["https://www.googleapis.com/scope/1"].iter())
.then(|token| {
assert!(token.is_err());
assert!(format!("{}", token.unwrap_err()).contains("invalid_client_id"));
Ok(()) as Result<(), ()>
});
rt.block_on(fut).expect("block_on");
_m.assert();
}
// Token is not delivered.
{
let code_response = r#"{"device_code": "devicecode", "user_code": "usercode", "verification_url": "https://example.com/verify", "expires_in": 1234567, "interval": 1}"#;
let _m = mockito::mock("POST", "/code")
.match_body(mockito::Matcher::Regex(
".*client_id=902216714886.*".to_string(),
))
.with_status(200)
.with_body(code_response)
.create();
let token_response = r#"{"error": "access_denied"}"#;
let _m = mockito::mock("POST", "/token")
.match_body(mockito::Matcher::Regex(
".*client_secret=iuMPN6Ne1PD7cos29Tk9rlqH&code=devicecode.*".to_string(),
))
.with_status(400)
.with_body(token_response)
.expect(1)
.create();
let fut = flow
.token(vec!["https://www.googleapis.com/scope/1"].iter())
.then(|token| {
assert!(token.is_err());
assert!(format!("{}", token.unwrap_err()).contains("Access denied by user"));
Ok(()) as Result<(), ()>
});
rt.block_on(fut).expect("block_on");
_m.assert();
}
}
}

View File

@@ -3,22 +3,18 @@
// Refer to the project root for licensing information.
//
use std::convert::AsRef;
use std::error::Error;
use std::io;
use std::sync::{Arc, Mutex};
use futures;
use futures::prelude::*;
use futures::stream::Stream;
use futures::sync::oneshot;
use futures::Future;
use hyper;
use hyper::{header, StatusCode, Uri};
use serde_json::error;
use url::form_urlencoded;
use url::percent_encoding::{percent_encode, QUERY_ENCODE_SET};
use crate::authenticator_delegate::AuthenticatorDelegate;
use crate::types::{ApplicationSecret, Token};
use crate::authenticator_delegate::FlowDelegate;
use crate::types::{ApplicationSecret, GetToken, RequestError, Token};
const OOB_REDIRECT_URI: &'static str = "urn:ietf:wg:oauth:2.0:oob";
@@ -47,6 +43,7 @@ where
url.push_str(auth_uri);
vec![
format!("?scope={}", scopes_string),
format!("&access_type=offline"),
format!(
"&redirect_uri={}",
redirect_uri.unwrap_or(OOB_REDIRECT_URI.to_string())
@@ -61,9 +58,37 @@ where
})
}
pub struct InstalledFlow<C> {
client: hyper::Client<C, hyper::Body>,
server: Option<InstalledFlowServer>,
impl<FD: FlowDelegate + 'static + Send + Clone, C: hyper::client::connect::Connect + 'static>
GetToken for InstalledFlow<FD, C>
{
fn token<'b, I, T>(
&mut self,
scopes: I,
) -> Box<dyn Future<Item = Token, Error = RequestError> + Send>
where
T: AsRef<str> + Ord + 'b,
I: Iterator<Item = &'b T>,
{
Box::new(self.obtain_token(scopes.into_iter().map(|s| s.as_ref().to_string()).collect()))
}
fn api_key(&mut self) -> Option<String> {
None
}
fn application_secret(&self) -> ApplicationSecret {
self.appsecret.clone()
}
}
/// InstalledFlow provides tokens for services that follow the "Installed" OAuth flow. (See
/// https://www.oauth.com/oauth2-servers/authorization/,
/// https://developers.google.com/identity/protocols/OAuth2InstalledApp). You should use it wrapped
/// inside an `Authenticator` to benefit from refreshing tokens and caching previously obtained
/// authorization.
pub struct InstalledFlow<FD: FlowDelegate, C: hyper::client::connect::Connect + 'static> {
method: InstalledFlowReturnMethod,
client: hyper::client::Client<C, hyper::Body>,
fd: FD,
appsecret: ApplicationSecret,
}
/// cf. https://developers.google.com/identity/protocols/OAuth2InstalledApp#choosingredirecturi
@@ -77,153 +102,193 @@ pub enum InstalledFlowReturnMethod {
HTTPRedirect(u16),
}
impl<C: 'static> InstalledFlow<C>
where
C: hyper::client::connect::Connect,
impl<'c, FD: 'static + FlowDelegate + Clone + Send, C: 'c + hyper::client::connect::Connect>
InstalledFlow<FD, C>
{
/// Starts a new Installed App auth flow.
/// If HTTPRedirect is chosen as method and the server can't be started, the flow falls
/// back to Interactive.
pub fn new(
client: hyper::Client<C, hyper::Body>,
method: Option<InstalledFlowReturnMethod>,
) -> InstalledFlow<C> {
let default = InstalledFlow {
client: hyper::client::Client<C, hyper::Body>,
fd: FD,
secret: ApplicationSecret,
method: InstalledFlowReturnMethod,
) -> InstalledFlow<FD, C> {
InstalledFlow {
method: method,
fd: fd,
appsecret: secret,
client: client,
server: None,
};
match method {
None => default,
Some(InstalledFlowReturnMethod::Interactive) => default,
// Start server on localhost to accept auth code.
Some(InstalledFlowReturnMethod::HTTPRedirect(port)) => {
match InstalledFlowServer::new(port) {
Result::Err(_) => default,
Result::Ok(server) => InstalledFlow {
client: default.client,
server: Some(server),
},
}
}
}
}
/// Handles the token request flow; it consists of the following steps:
/// . Obtain a auhorization code with user cooperation or internal redirect.
/// . Obtain a authorization code with user cooperation or internal redirect.
/// . Obtain a token and refresh token using that code.
/// . Return that token
///
/// It's recommended not to use the DefaultAuthenticatorDelegate, but a specialized one.
pub fn obtain_token<'a, AD: AuthenticatorDelegate, S, T>(
/// It's recommended not to use the DefaultFlowDelegate, but a specialized one.
pub fn obtain_token<'a>(
&mut self,
auth_delegate: &mut AD,
scopes: Vec<String>, // Note: I haven't found a better way to give a list of strings here, due to ownership issues with futures.
) -> impl 'a + Future<Item = Token, Error = RequestError> + Send {
let rduri = self.fd.redirect_uri();
// Start server on localhost to accept auth code.
let server = if let InstalledFlowReturnMethod::HTTPRedirect(port) = self.method {
match InstalledFlowServer::new(port) {
Result::Err(e) => Err(RequestError::ClientError(e)),
Result::Ok(server) => Ok(Some(server)),
}
} else {
Ok(None)
};
let port = if let Ok(Some(ref srv)) = server {
Some(srv.port)
} else {
None
};
let client = self.client.clone();
let (appsecclone, appsecclone2) = (self.appsecret.clone(), self.appsecret.clone());
let auth_delegate = self.fd.clone();
server
.into_future()
// First: Obtain authorization code from user.
.and_then(move |server| {
Self::ask_authorization_code(server, auth_delegate, &appsecclone, scopes.iter())
})
// Exchange the authorization code provided by Google/the provider for a refresh and an
// access token.
.and_then(move |authcode| {
let request = Self::request_token(appsecclone2, authcode, rduri, port);
let result = client.request(request);
// Handle result here, it makes ownership tracking easier.
result
.and_then(move |r| {
r.into_body()
.concat2()
.map(|c| String::from_utf8(c.into_bytes().to_vec()).unwrap()) // TODO: error handling
})
.then(|body_or| {
let resp = match body_or {
Err(e) => return Err(RequestError::ClientError(e)),
Ok(s) => s,
};
let token_resp: Result<JSONTokenResponse, serde_json::Error> =
serde_json::from_str(&resp);
match token_resp {
Err(e) => {
return Err(RequestError::JSONError(e));
}
Ok(tok) => {
if tok.error.is_some() {
Err(RequestError::NegativeServerResponse(
tok.error.unwrap(),
tok.error_description,
))
} else {
Ok(tok)
}
}
}
})
})
// Return the combined token.
.and_then(|tokens| {
// Successful response
if tokens.access_token.is_some() {
let mut token = Token {
access_token: tokens.access_token.unwrap(),
refresh_token: tokens.refresh_token.unwrap(),
token_type: tokens.token_type.unwrap(),
expires_in: tokens.expires_in,
expires_in_timestamp: None,
};
token.set_expiry_absolute();
Ok(token)
} else {
Err(RequestError::NegativeServerResponse(
tokens.error.unwrap(),
tokens.error_description,
))
}
})
}
fn ask_authorization_code<'a, S, T>(
server: Option<InstalledFlowServer>,
mut auth_delegate: FD,
appsecret: &ApplicationSecret,
scopes: S,
) -> Result<Token, Box<dyn Error + Send>>
) -> Box<dyn Future<Item = String, Error = RequestError> + Send>
where
T: AsRef<str> + 'a,
S: Iterator<Item = &'a T>,
{
let authcode = self.get_authorization_code(auth_delegate, &appsecret, scopes)?;
let tokens = self.request_token(&appsecret, &authcode, auth_delegate.redirect_uri())?;
// Successful response
if tokens.access_token.is_some() {
let mut token = Token {
access_token: tokens.access_token.unwrap(),
refresh_token: tokens.refresh_token.unwrap(),
token_type: tokens.token_type.unwrap(),
expires_in: tokens.expires_in,
expires_in_timestamp: None,
};
token.set_expiry_absolute();
Result::Ok(token)
} else {
let err = io::Error::new(
io::ErrorKind::Other,
format!(
"Token API error: {} {}",
tokens.error.unwrap_or("<unknown err>".to_string()),
tokens.error_description.unwrap_or("".to_string())
)
.as_str(),
if server.is_none() {
let url = build_authentication_request_url(
&appsecret.auth_uri,
&appsecret.client_id,
scopes,
auth_delegate.redirect_uri(),
);
Result::Err(Box::new(err))
Box::new(
auth_delegate
.present_user_url(&url, true /* need_code */)
.then(|r| {
match r {
Ok(Some(mut code)) => {
// Partial backwards compatibilty in case an implementation adds a new line
// due to previous behaviour.
let ends_with_newline =
code.chars().last().map(|c| c == '\n').unwrap_or(false);
if ends_with_newline {
code.pop();
}
Ok(code)
}
_ => Err(RequestError::UserError("couldn't read code".to_string())),
}
}),
)
} else {
let mut server = server.unwrap();
// The redirect URI must be this very localhost URL, otherwise authorization is refused
// by certain providers.
let url = build_authentication_request_url(
&appsecret.auth_uri,
&appsecret.client_id,
scopes,
auth_delegate
.redirect_uri()
.or_else(|| Some(format!("http://localhost:{}", server.port))),
);
Box::new(
auth_delegate
.present_user_url(&url, false /* need_code */)
.then(move |_| server.block_till_auth())
.map_err(|e| {
RequestError::UserError(format!(
"could not obtain token via redirect: {}",
e
))
}),
)
}
}
/// Obtains an authorization code either interactively or via HTTP redirect (see
/// InstalledFlowReturnMethod).
fn get_authorization_code<'a, AD: AuthenticatorDelegate, S, T>(
&mut self,
auth_delegate: &mut AD,
appsecret: &ApplicationSecret,
scopes: S,
) -> Result<String, Box<dyn Error + Send>>
where
T: AsRef<str> + 'a,
S: Iterator<Item = &'a T>,
{
let server = self.server.take(); // Will shutdown the server if present when goes out of scope
let result: Result<String, Box<dyn Error + Send>> = match server {
None => {
let url = build_authentication_request_url(
&appsecret.auth_uri,
&appsecret.client_id,
scopes,
auth_delegate.redirect_uri(),
);
match auth_delegate.present_user_url(&url, true /* need_code */) {
None => Result::Err(Box::new(io::Error::new(
io::ErrorKind::UnexpectedEof,
"couldn't read code",
))),
Some(mut code) => {
// Partial backwards compatibilty in case an implementation adds a new line
// due to previous behaviour.
let ends_with_newline =
code.chars().last().map(|c| c == '\n').unwrap_or(false);
if ends_with_newline {
code.pop();
}
Result::Ok(code)
}
}
}
Some(mut server) => {
// The redirect URI must be this very localhost URL, otherwise Google refuses
// authorization.
let url = build_authentication_request_url(
&appsecret.auth_uri,
&appsecret.client_id,
scopes,
auth_delegate
.redirect_uri()
.or_else(|| Some(format!("http://localhost:{}", server.port))),
);
auth_delegate.present_user_url(&url, false /* need_code */);
match server.block_till_auth() {
Result::Err(e) => Result::Err(Box::new(e)),
Result::Ok(s) => Result::Ok(s),
}
}
};
result
}
/// Sends the authorization code to the provider in order to obtain access and refresh tokens.
fn request_token(
&self,
appsecret: &ApplicationSecret,
authcode: &str,
fn request_token<'a>(
appsecret: ApplicationSecret,
authcode: String,
custom_redirect_uri: Option<String>,
) -> Result<JSONTokenResponse, Box<dyn Error + Send>> {
let redirect_uri = custom_redirect_uri.unwrap_or_else(|| match &self.server {
port: Option<u16>,
) -> hyper::Request<hyper::Body> {
let redirect_uri = custom_redirect_uri.unwrap_or_else(|| match port {
None => OOB_REDIRECT_URI.to_string(),
Some(server) => format!("http://localhost:{}", server.port),
Some(port) => format!("http://localhost:{}", port),
});
let body = form_urlencoded::Serializer::new(String::new())
@@ -236,35 +301,11 @@ where
])
.finish();
let request = hyper::Request::post(&appsecret.token_uri)
let request = hyper::Request::post(appsecret.token_uri)
.header(header::CONTENT_TYPE, "application/x-www-form-urlencoded")
.body(hyper::Body::from(body))
.unwrap(); // TODO: error check
let result = self.client.request(request).wait();
let resp = match result {
Result::Err(e) => return Result::Err(Box::new(e)),
Result::Ok(res) => {
let result = res
.into_body()
.concat2()
.wait()
.map(|c| String::from_utf8(c.into_bytes().to_vec()).unwrap()); // TODO: error handling
match result {
Result::Err(e) => return Result::Err(Box::new(e)),
Result::Ok(s) => s,
}
}
};
let token_resp: Result<JSONTokenResponse, error::Error> = serde_json::from_str(&resp);
match token_resp {
Result::Err(e) => return Result::Err(Box::new(e)),
Result::Ok(tok) => Result::Ok(tok) as Result<JSONTokenResponse, Box<dyn Error + Send>>,
}
request
}
}
@@ -287,37 +328,32 @@ struct InstalledFlowServer {
}
impl InstalledFlowServer {
fn new(port: u16) -> Result<InstalledFlowServer, ()> {
let bound_port = hyper::server::conn::AddrIncoming::bind(&([127, 0, 0, 1], port).into());
match bound_port {
Result::Err(_) => Result::Err(()),
Result::Ok(bound_port) => {
let port = bound_port.local_addr().port();
fn new(port: u16) -> Result<InstalledFlowServer, hyper::error::Error> {
let (auth_code_tx, auth_code_rx) = oneshot::channel::<String>();
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let (auth_code_tx, auth_code_rx) = oneshot::channel::<String>();
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let threadpool = tokio_threadpool::Builder::new()
.pool_size(1)
.name_prefix("InstalledFlowServer-")
.build();
let service_maker = InstalledFlowServiceMaker::new(auth_code_tx);
let threadpool = tokio_threadpool::Builder::new()
.pool_size(1)
.name_prefix("InstalledFlowServer-")
.build();
let service_maker = InstalledFlowServiceMaker::new(auth_code_tx);
let server = hyper::server::Server::builder(bound_port)
.http1_only(true)
.serve(service_maker)
.with_graceful_shutdown(shutdown_rx)
.map_err(|err| panic!("Failed badly: {}", err)); // TODO: Error handling
let addr = format!("127.0.0.1:{}", port);
let builder = hyper::server::Server::try_bind(&addr.parse().unwrap())?;
let server = builder.http1_only(true).serve(service_maker);
let port = server.local_addr().port();
let server_future = server
.with_graceful_shutdown(shutdown_rx)
.map_err(|err| panic!("Failed badly: {}", err));
threadpool.spawn(server);
threadpool.spawn(server_future);
Result::Ok(InstalledFlowServer {
port,
shutdown_tx: Option::Some(shutdown_tx),
auth_code_rx: Option::Some(auth_code_rx),
threadpool: Option::Some(threadpool),
})
}
}
Result::Ok(InstalledFlowServer {
port: port,
shutdown_tx: Some(shutdown_tx),
auth_code_rx: Some(auth_code_rx),
threadpool: Some(threadpool),
})
}
fn block_till_auth(&mut self) -> Result<String, oneshot::Canceled> {
@@ -466,9 +502,9 @@ impl hyper::service::Service for InstalledFlowService {
impl InstalledFlowService {
fn handle_url(&mut self, url: hyper::Uri) {
// Google redirects to the specified localhost URL, appending the authorization
// The provider redirects to the specified localhost URL, appending the authorization
// code, like this: http://localhost:8080/xyz/?code=4/731fJ3BheyCouCniPufAd280GHNV5Ju35yYcGs
// We take that code and send it to the get_authorization_code() function that
// We take that code and send it to the ask_authorization_code() function that
// waits for it.
for (param, val) in form_urlencoded::parse(url.query().unwrap_or("").as_bytes()) {
if param == "code".to_string() {
@@ -489,13 +525,173 @@ impl InstalledFlowService {
#[cfg(test)]
mod tests {
use std::error::Error;
use std::fmt;
use std::str::FromStr;
use hyper;
use hyper::client::connect::HttpConnector;
use hyper_tls::HttpsConnector;
use mockito::{self, mock};
use tokio;
use super::*;
use crate::authenticator_delegate::FlowDelegate;
use crate::helper::*;
use crate::types::StringError;
#[test]
fn test_end2end() {
#[derive(Clone)]
struct FD(
String,
hyper::Client<HttpsConnector<HttpConnector>, hyper::Body>,
);
impl FlowDelegate for FD {
/// Depending on need_code, return the pre-set code or send the code to the server at
/// the redirect_uri given in the url.
fn present_user_url<S: AsRef<str> + fmt::Display>(
&mut self,
url: S,
need_code: bool,
) -> Box<dyn Future<Item = Option<String>, Error = Box<dyn Error + Send>> + Send>
{
if need_code {
Box::new(Ok(Some(self.0.clone())).into_future())
} else {
// Parse presented url to obtain redirect_uri with location of local
// code-accepting server.
let uri = Uri::from_str(url.as_ref()).unwrap();
let query = uri.query().unwrap();
let parsed = form_urlencoded::parse(query.as_bytes()).into_owned();
let mut rduri = None;
for (k, v) in parsed {
if k == "redirect_uri" {
rduri = Some(v);
break;
}
}
if rduri.is_none() {
return Box::new(
Err(Box::new(StringError::new("no redirect uri!", None))
as Box<dyn Error + Send>)
.into_future(),
);
}
let mut rduri = rduri.unwrap();
rduri.push_str(&format!("?code={}", self.0));
let rduri = Uri::from_str(rduri.as_ref()).unwrap();
// Hit server.
return Box::new(
self.1
.get(rduri)
.map_err(|e| Box::new(e) as Box<dyn Error + Send>)
.map(|_| None),
);
}
}
}
let server_url = mockito::server_url();
let app_secret = r#"{"installed":{"client_id":"902216714886-k2v9uei3p1dk6h686jbsn9mo96tnbvto.apps.googleusercontent.com","project_id":"yup-test-243420","auth_uri":"https://accounts.google.com/o/oauth2/auth","token_uri":"https://oauth2.googleapis.com/token","auth_provider_x509_cert_url":"https://www.googleapis.com/oauth2/v1/certs","client_secret":"iuMPN6Ne1PD7cos29Tk9rlqH","redirect_uris":["urn:ietf:wg:oauth:2.0:oob","http://localhost"]}}"#;
let mut app_secret = parse_application_secret(app_secret).unwrap();
app_secret.token_uri = format!("{}/token", server_url);
let https = HttpsConnector::new(1).expect("tls");
let client = hyper::Client::builder()
.keep_alive(false)
.build::<_, hyper::Body>(https);
let fd = FD("authorizationcode".to_string(), client.clone());
let mut inf = InstalledFlow::new(
client.clone(),
fd,
app_secret.clone(),
InstalledFlowReturnMethod::Interactive,
);
let mut rt = tokio::runtime::Builder::new()
.core_threads(1)
.panic_handler(|e| std::panic::resume_unwind(e))
.build()
.unwrap();
// Successful path.
{
let _m = mock("POST", "/token")
.match_body(mockito::Matcher::Regex(".*code=authorizationcode.*client_id=9022167.*".to_string()))
.with_body(r#"{"access_token": "accesstoken", "refresh_token": "refreshtoken", "token_type": "Bearer", "expires_in": 12345678}"#)
.expect(1)
.create();
let fut = inf
.token(vec!["https://googleapis.com/some/scope"].iter())
.and_then(|tok| {
assert_eq!("accesstoken", tok.access_token);
assert_eq!("refreshtoken", tok.refresh_token);
assert_eq!("Bearer", tok.token_type);
Ok(())
});
rt.block_on(fut).expect("block on");
_m.assert();
}
// Successful path with HTTP redirect.
{
let mut inf = InstalledFlow::new(
client.clone(),
FD(
"authorizationcodefromlocalserver".to_string(),
client.clone(),
),
app_secret,
InstalledFlowReturnMethod::HTTPRedirect(8081),
);
let _m = mock("POST", "/token")
.match_body(mockito::Matcher::Regex(".*code=authorizationcodefromlocalserver.*client_id=9022167.*".to_string()))
.with_body(r#"{"access_token": "accesstoken", "refresh_token": "refreshtoken", "token_type": "Bearer", "expires_in": 12345678}"#)
.expect(1)
.create();
let fut = inf
.token(vec!["https://googleapis.com/some/scope"].iter())
.and_then(|tok| {
assert_eq!("accesstoken", tok.access_token);
assert_eq!("refreshtoken", tok.refresh_token);
assert_eq!("Bearer", tok.token_type);
Ok(())
});
rt.block_on(fut).expect("block on");
_m.assert();
}
// Error from server.
{
let _m = mock("POST", "/token")
.match_body(mockito::Matcher::Regex(
".*code=authorizationcode.*client_id=9022167.*".to_string(),
))
.with_status(400)
.with_body(r#"{"error": "invalid_code"}"#)
.expect(1)
.create();
let fut = inf
.token(vec!["https://googleapis.com/some/scope"].iter())
.then(|tokr| {
assert!(tokr.is_err());
assert!(format!("{}", tokr.unwrap_err()).contains("invalid_code"));
Ok(()) as Result<(), ()>
});
rt.block_on(fut).expect("block on");
_m.assert();
}
rt.shutdown_on_idle().wait().expect("shutdown");
}
#[test]
fn test_request_url_builder() {
assert_eq!(
"https://accounts.google.\
com/o/oauth2/auth?scope=email%20profile&redirect_uri=urn:ietf:wg:oauth:2.0:\
com/o/oauth2/auth?scope=email%20profile&access_type=offline&redirect_uri=urn:ietf:wg:oauth:2.0:\
oob&response_type=code&client_id=812741506391-h38jh0j4fv0ce1krdkiq0hfvt6n5amr\
f.apps.googleusercontent.com",
build_authentication_request_url(

View File

@@ -34,41 +34,74 @@
//! The returned `Token` is stored permanently in the given token storage in order to
//! authorize future API requests to the same scopes.
//!
//! The following example, which is derived from the (actual and runnable) example in
//! `examples/test-installed/`, shows the basics of using this crate:
//!
//! ```test_harness,no_run
//! #[macro_use]
//! extern crate serde_derive;
//! use futures::prelude::*;
//! use yup_oauth2::GetToken;
//! use yup_oauth2::{Authenticator, InstalledFlow};
//!
//! use yup_oauth2::{Authenticator, DefaultAuthenticatorDelegate, PollInformation, ConsoleApplicationSecret, MemoryStorage, GetToken};
//! use serde_json as json;
//! use std::default::Default;
//! use hyper::Client;
//! use hyper::client::Client;
//! use hyper_tls::HttpsConnector;
//! # const SECRET: &'static str = "{\"installed\":{\"auth_uri\":\"https://accounts.google.com/o/oauth2/auth\",\"client_secret\":\"UqkDJd5RFwnHoiG5x5Rub8SI\",\"token_uri\":\"https://accounts.google.com/o/oauth2/token\",\"client_email\":\"\",\"redirect_uris\":[\"urn:ietf:wg:oauth:2.0:oob\",\"oob\"],\"client_x509_cert_url\":\"\",\"client_id\":\"14070749909-vgip2f1okm7bkvajhi9jugan6126io9v.apps.googleusercontent.com\",\"auth_provider_x509_cert_url\":\"https://www.googleapis.com/oauth2/v1/certs\"}}";
//!
//! # #[test] fn device() {
//! let secret = json::from_str::<ConsoleApplicationSecret>(SECRET).unwrap().installed.unwrap();
//! let res = Authenticator::new(&secret, DefaultAuthenticatorDelegate,
//! Client::builder().build(HttpsConnector::new(4).unwrap()),
//! <MemoryStorage as Default>::default(), None)
//! .token(&["https://www.googleapis.com/auth/youtube.upload"]);
//! match res {
//! Ok(t) => {
//! // now you can use t.access_token to authenticate API calls within your
//! // given scopes. It will not be valid forever, but Authenticator will automatically
//! // refresh the token for you.
//! },
//! Err(err) => println!("Failed to acquire token: {}", err),
//! use std::path::Path;
//!
//! fn main() {
//! // Boilerplate: Set up hyper HTTP client and TLS.
//! let https = HttpsConnector::new(1).expect("tls");
//! let client = Client::builder()
//! .keep_alive(false)
//! .build::<_, hyper::Body>(https);
//!
//! // Read application secret from a file. Sometimes it's easier to compile it directly into
//! // the binary. The clientsecret file contains JSON like `{"installed":{"client_id": ... }}`
//! let secret = yup_oauth2::read_application_secret(Path::new("clientsecret.json"))
//! .expect("clientsecret.json");
//!
//! // There are two types of delegates; FlowDelegate and AuthenticatorDelegate. See the
//! // respective documentation; all you need to know here is that they determine how the user
//! // is asked to visit the OAuth flow URL or how to read back the provided code.
//! let ad = yup_oauth2::DefaultFlowDelegate;
//!
//! // InstalledFlow handles OAuth flows of that type. They are usually the ones where a user
//! // grants access to their personal account (think Google Drive, Github API, etc.).
//! let inf = InstalledFlow::new(
//! client.clone(),
//! ad,
//! secret,
//! yup_oauth2::InstalledFlowReturnMethod::HTTPRedirect(8081),
//! );
//! // You could already use InstalledFlow by itself, but usually you want to cache tokens and
//! // refresh them, rather than ask the user every time to log in again. Authenticator wraps
//! // other flows and handles these.
//! // This type of authenticator caches tokens in a JSON file on disk.
//! let mut auth = Authenticator::new_disk(
//! client,
//! inf,
//! yup_oauth2::DefaultAuthenticatorDelegate,
//! "tokencache.json",
//! )
//! .unwrap();
//! let s = "https://www.googleapis.com/auth/drive.file".to_string();
//! let scopes = vec![s];
//!
//! // token(<scopes>) is the one important function of this crate; it does everything to
//! // obtain a token that can be sent e.g. as Bearer token.
//! let tok = auth.token(scopes.iter());
//! // Finally we print the token.
//! let fut = tok.map_err(|e| println!("error: {:?}", e)).and_then(|t| {
//! println!("The token is {:?}", t);
//! Ok(())
//! });
//!
//! tokio::run(fut)
//! }
//! # }
//! ```
//!
#[macro_use]
extern crate serde_derive;
#[cfg(test)]
#[macro_use]
extern crate yup_hyper_mock as hyper_mock;
mod authenticator;
mod authenticator_delegate;
mod device;
@@ -79,16 +112,17 @@ mod service_account;
mod storage;
mod types;
pub use crate::authenticator::{Authenticator, GetToken, Retry};
pub use crate::authenticator::Authenticator;
pub use crate::authenticator_delegate::{
AuthenticatorDelegate, DefaultAuthenticatorDelegate, PollError, PollInformation,
AuthenticatorDelegate, DefaultAuthenticatorDelegate, DefaultFlowDelegate, FlowDelegate,
PollInformation,
};
pub use crate::device::{DeviceFlow, GOOGLE_DEVICE_CODE_URL};
pub use crate::helper::*;
pub use crate::installed::{InstalledFlow, InstalledFlowReturnMethod};
pub use crate::refresh::{RefreshFlow, RefreshResult};
pub use crate::service_account::*;
pub use crate::storage::{DiskTokenStorage, MemoryStorage, NullStorage, TokenStorage};
pub use crate::types::{
ApplicationSecret, ConsoleApplicationSecret, FlowType, Scheme, Token, TokenType,
ApplicationSecret, ConsoleApplicationSecret, FlowType, GetToken, PollError, RefreshResult,
RequestError, Scheme, Token, TokenType,
};

View File

@@ -1,4 +1,4 @@
use crate::types::{ApplicationSecret, FlowType, JsonError};
use crate::types::{ApplicationSecret, JsonError, RefreshResult, RequestError};
use super::Token;
use chrono::Utc;
@@ -14,34 +14,9 @@ use url::form_urlencoded;
/// Refresh an expired access token, as obtained by any other authentication flow.
/// This flow is useful when your `Token` is expired and allows to obtain a new
/// and valid access token.
pub struct RefreshFlow<C> {
client: hyper::Client<C, hyper::Body>,
result: RefreshResult,
}
/// All possible outcomes of the refresh flow
pub enum RefreshResult {
// Indicates no attempt has been made to refresh yet
Uninitialized,
/// Indicates connection failure
Error(hyper::Error),
/// The server did not answer with a new token, providing the server message
RefreshError(String, Option<String>),
/// The refresh operation finished successfully, providing a new `Token`
Success(Token),
}
impl<C: 'static> RefreshFlow<C>
where
C: hyper::client::connect::Connect,
{
pub fn new(client: hyper::Client<C, hyper::Body>) -> RefreshFlow<C> {
RefreshFlow {
client: client,
result: RefreshResult::Uninitialized,
}
}
pub struct RefreshFlow;
impl RefreshFlow {
/// Attempt to refresh the given token, and obtain a new, valid one.
/// If the `RefreshResult` is `RefreshResult::Error`, you may retry within an interval
/// of your choice. If it is `RefreshResult:RefreshError`, your refresh token is invalid
@@ -56,121 +31,155 @@ where
///
/// # Examples
/// Please see the crate landing page for an example.
pub fn refresh_token(
&mut self,
_flow_type: FlowType,
client_secret: &ApplicationSecret,
refresh_token: &str,
) -> &RefreshResult {
if let RefreshResult::Success(_) = self.result {
return &self.result;
}
pub fn refresh_token<'a, C: 'static + hyper::client::connect::Connect>(
client: hyper::Client<C>,
client_secret: ApplicationSecret,
refresh_token: String,
) -> impl 'a + Future<Item = RefreshResult, Error = RequestError> {
let req = form_urlencoded::Serializer::new(String::new())
.extend_pairs(&[
("client_id", client_secret.client_id.as_ref()),
("client_secret", client_secret.client_secret.as_ref()),
("refresh_token", refresh_token),
("grant_type", "refresh_token"),
("client_id", client_secret.client_id.clone()),
("client_secret", client_secret.client_secret.clone()),
("refresh_token", refresh_token.to_string()),
("grant_type", "refresh_token".to_string()),
])
.finish();
let request = hyper::Request::post(&client_secret.token_uri)
let request = hyper::Request::post(client_secret.token_uri.clone())
.header(header::CONTENT_TYPE, "application/x-www-form-urlencoded")
.body(hyper::Body::from(req))
.unwrap(); // TODO: error handling
let json_str: String = match self.client.request(request).wait() {
Err(err) => {
self.result = RefreshResult::Error(err);
return &self.result;
}
Ok(res) => {
res.into_body()
.concat2()
.wait()
.map(|c| String::from_utf8(c.into_bytes().to_vec()).unwrap())
.unwrap() // TODO: error handling
}
};
client
.request(request)
.then(|r| {
match r {
Err(err) => return Err(RefreshResult::Error(err)),
Ok(res) => {
Ok(res
.into_body()
.concat2()
.wait()
.map(|c| String::from_utf8(c.into_bytes().to_vec()).unwrap())
.unwrap()) // TODO: error handling
}
}
})
.then(move |maybe_json_str: Result<String, RefreshResult>| {
if let Err(e) = maybe_json_str {
return Ok(e);
}
let json_str = maybe_json_str.unwrap();
#[derive(Deserialize)]
struct JsonToken {
access_token: String,
token_type: String,
expires_in: i64,
}
#[derive(Deserialize)]
struct JsonToken {
access_token: String,
token_type: String,
expires_in: i64,
}
match json::from_str::<JsonError>(&json_str) {
Err(_) => {}
Ok(res) => {
return Ok(RefreshResult::RefreshError(
res.error,
res.error_description,
))
}
}
match json::from_str::<JsonError>(&json_str) {
Err(_) => {}
Ok(res) => {
self.result = RefreshResult::RefreshError(res.error, res.error_description);
return &self.result;
}
}
let t: JsonToken = json::from_str(&json_str).unwrap();
self.result = RefreshResult::Success(Token {
access_token: t.access_token,
token_type: t.token_type,
refresh_token: refresh_token.to_string(),
expires_in: None,
expires_in_timestamp: Some(Utc::now().timestamp() + t.expires_in),
});
&self.result
let t: JsonToken = json::from_str(&json_str).unwrap();
Ok(RefreshResult::Success(Token {
access_token: t.access_token,
token_type: t.token_type,
refresh_token: refresh_token.to_string(),
expires_in: None,
expires_in_timestamp: Some(Utc::now().timestamp() + t.expires_in),
}))
})
.map_err(RequestError::Refresh)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::device::GOOGLE_DEVICE_CODE_URL;
use crate::helper::parse_application_secret;
use crate::helper;
mock_connector!(MockGoogleRefresh {
"https://accounts.google.com" =>
"HTTP/1.1 200 OK\r\n\
Server: BOGUS\r\n\
\r\n\
{\r\n\
\"access_token\":\"1/fFAGRNJru1FTz70BzhT3Zg\",\r\n\
\"expires_in\":3920,\r\n\
\"token_type\":\"Bearer\"\r\n\
}"
});
const TEST_APP_SECRET: &'static str = r#"{"installed":{"client_id":"384278056379-tr5pbot1mil66749n639jo54i4840u77.apps.googleusercontent.com","project_id":"sanguine-rhythm-105020","auth_uri":"https://accounts.google.com/o/oauth2/auth","token_uri":"https://accounts.google.com/o/oauth2/token","auth_provider_x509_cert_url":"https://www.googleapis.com/oauth2/v1/certs","client_secret":"QeQUnhzsiO4t--ZGmj9muUAu","redirect_uris":["urn:ietf:wg:oauth:2.0:oob","http://localhost"]}}"#;
use hyper;
use hyper_tls::HttpsConnector;
use mockito;
use tokio;
#[test]
fn refresh_flow() {
let appsecret = parse_application_secret(TEST_APP_SECRET).unwrap();
fn test_refresh_end2end() {
let server_url = mockito::server_url();
let runtime = tokio::runtime::Runtime::new().unwrap();
let app_secret = r#"{"installed":{"client_id":"902216714886-k2v9uei3p1dk6h686jbsn9mo96tnbvto.apps.googleusercontent.com","project_id":"yup-test-243420","auth_uri":"https://accounts.google.com/o/oauth2/auth","token_uri":"https://oauth2.googleapis.com/token","auth_provider_x509_cert_url":"https://www.googleapis.com/oauth2/v1/certs","client_secret":"iuMPN6Ne1PD7cos29Tk9rlqH","redirect_uris":["urn:ietf:wg:oauth:2.0:oob","http://localhost"]}}"#;
let mut app_secret = helper::parse_application_secret(app_secret).unwrap();
app_secret.token_uri = format!("{}/token", server_url);
let refresh_token = "my-refresh-token".to_string();
let https = HttpsConnector::new(1).unwrap();
let client = hyper::Client::builder()
.executor(runtime.executor())
.build(MockGoogleRefresh::default());
let mut flow = RefreshFlow::new(client);
let device_flow = FlowType::Device(GOOGLE_DEVICE_CODE_URL.to_string());
.keep_alive(false)
.build::<_, hyper::Body>(https);
match flow.refresh_token(device_flow, &appsecret, "bogus_refresh_token") {
RefreshResult::Success(ref t) => {
assert_eq!(t.access_token, "1/fFAGRNJru1FTz70BzhT3Zg");
assert!(!t.expired());
}
RefreshResult::Error(err) => {
assert!(false, "Refresh flow failed: RefreshResult::Error({})", err);
}
RefreshResult::RefreshError(msg, err) => {
assert!(
false,
"Refresh flow failed: RefreshResult::RefreshError({}, {:?})",
msg, err
);
}
RefreshResult::Uninitialized => {
assert!(false, "Refresh flow failed: RefreshResult::Uninitialized");
}
let mut rt = tokio::runtime::Builder::new()
.core_threads(1)
.panic_handler(|e| std::panic::resume_unwind(e))
.build()
.unwrap();
// Success
{
let _m = mockito::mock("POST", "/token")
.match_body(
mockito::Matcher::Regex(".*client_id=902216714886-k2v9uei3p1dk6h686jbsn9mo96tnbvto.apps.googleusercontent.com.*refresh_token=my-refresh-token.*".to_string()))
.with_status(200)
.with_body(r#"{"access_token": "new-access-token", "token_type": "Bearer", "expires_in": 1234567}"#)
.create();
let fut = RefreshFlow::refresh_token(
client.clone(),
app_secret.clone(),
refresh_token.clone(),
)
.then(|rr| {
let rr = rr.unwrap();
match rr {
RefreshResult::Success(tok) => {
assert_eq!("new-access-token", tok.access_token);
assert_eq!("Bearer", tok.token_type);
}
_ => panic!(format!("unexpected RefreshResult {:?}", rr)),
}
Ok(()) as Result<(), ()>
});
rt.block_on(fut).expect("block_on");
_m.assert();
}
// Refresh error.
{
let _m = mockito::mock("POST", "/token")
.match_body(
mockito::Matcher::Regex(".*client_id=902216714886-k2v9uei3p1dk6h686jbsn9mo96tnbvto.apps.googleusercontent.com.*refresh_token=my-refresh-token.*".to_string()))
.with_status(400)
.with_body(r#"{"error": "invalid_token"}"#)
.create();
let fut = RefreshFlow::refresh_token(client, app_secret, refresh_token).then(|rr| {
let rr = rr.unwrap();
match rr {
RefreshResult::RefreshError(e, None) => {
assert_eq!(e, "invalid_token");
}
_ => panic!(format!("unexpected RefreshResult {:?}", rr)),
}
Ok(())
});
tokio::run(fut);
_m.assert();
}
}
}

View File

@@ -12,17 +12,13 @@
//!
use std::default::Default;
use std::error;
use std::result;
use std::str;
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Mutex};
use crate::authenticator::GetToken;
use crate::storage::{hash_scopes, MemoryStorage, TokenStorage};
use crate::types::{StringError, Token};
use crate::types::{ApplicationSecret, GetToken, JsonError, RequestError, StringError, Token};
use futures::stream::Stream;
use futures::Future;
use futures::{future, prelude::*};
use hyper::header;
use url::form_urlencoded;
@@ -42,12 +38,13 @@ use serde_json;
const GRANT_TYPE: &'static str = "urn:ietf:params:oauth:grant-type:jwt-bearer";
const GOOGLE_RS256_HEAD: &'static str = "{\"alg\":\"RS256\",\"typ\":\"JWT\"}";
// Encodes s as Base64
/// Encodes s as Base64
fn encode_base64<T: AsRef<[u8]>>(s: T) -> String {
base64::encode_config(s.as_ref(), base64::URL_SAFE)
}
fn decode_rsa_key(pem_pkcs8: &str) -> Result<PrivateKey, Box<dyn error::Error + Send>> {
/// Decode a PKCS8 formatted RSA key.
fn decode_rsa_key(pem_pkcs8: &str) -> Result<PrivateKey, io::Error> {
let private = pem_pkcs8.to_string().replace("\\n", "\n").into_bytes();
let mut private_reader: &[u8] = private.as_ref();
let private_keys = pemfile::pkcs8_private_keys(&mut private_reader);
@@ -56,16 +53,16 @@ fn decode_rsa_key(pem_pkcs8: &str) -> Result<PrivateKey, Box<dyn error::Error +
if pk.len() > 0 {
Ok(pk[0].clone())
} else {
Err(Box::new(io::Error::new(
Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Not enough private keys in PEM",
)))
))
}
} else {
Err(Box::new(io::Error::new(
Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Error reading key from PEM",
)))
))
}
}
@@ -89,6 +86,8 @@ pub struct ServiceAccountKey {
pub client_x509_cert_url: Option<String>,
}
/// Permissions requested for a JWT.
/// See https://developers.google.com/identity/protocols/OAuth2ServiceAccount#authorizingrequests.
#[derive(Serialize, Debug)]
struct Claims {
iss: String,
@@ -99,20 +98,31 @@ struct Claims {
scope: String,
}
/// A JSON Web Token ready for signing.
struct JWT {
/// The value of GOOGLE_RS256_HEAD.
header: String,
/// A Claims struct, expressing the set of desired permissions etc.
claims: Claims,
}
impl JWT {
/// Create a new JWT from claims.
fn new(claims: Claims) -> JWT {
JWT {
header: GOOGLE_RS256_HEAD.to_string(),
claims: claims,
}
}
// Encodes the first two parts (header and claims) to base64 and assembles them into a form
// ready to be signed.
/// Set JWT header. Default is `{"alg":"RS256","typ":"JWT"}`.
#[allow(dead_code)]
pub fn set_header(&mut self, head: String) {
self.header = head;
}
/// Encodes the first two parts (header and claims) to base64 and assembles them into a form
/// ready to be signed.
fn encode_claims(&self) -> String {
let mut head = encode_base64(&self.header);
let claims = encode_base64(serde_json::to_string(&self.claims).unwrap());
@@ -122,7 +132,8 @@ impl JWT {
head
}
fn sign(&self, private_key: &str) -> Result<String, Box<dyn error::Error + Send>> {
/// Sign a JWT base string with `private_key`, which is a PKCS8 string.
fn sign(&self, private_key: &str) -> Result<String, io::Error> {
let mut jwt_head = self.encode_claims();
let key = decode_rsa_key(private_key)?;
let signing_key = sign::RSASigningKey::new(&key).map_err(|_| {
@@ -136,11 +147,10 @@ impl JWT {
.ok_or(io::Error::new(
io::ErrorKind::Other,
"Couldn't choose signing scheme",
))
.map_err(|e| Box::new(e) as Box<dyn error::Error + Send>)?;
))?;
let signature = signer
.sign(jwt_head.as_bytes())
.map_err(|e| Box::new(e) as Box<dyn error::Error + Send>)?;
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, format!("{}", e)))?;
let signature_b64 = encode_base64(signature);
jwt_head.push_str(".");
@@ -150,6 +160,8 @@ impl JWT {
}
}
/// Set `iss`, `aud`, `exp`, `iat`, `scope` field in the returned `Claims`. `scopes` is an iterator
/// yielding strings with OAuth scopes.
fn init_claims_from_key<'a, I, T>(key: &ServiceAccountKey, scopes: I) -> Claims
where
T: AsRef<str> + 'a,
@@ -175,19 +187,14 @@ where
}
}
/// See "Additional claims" at https://developers.google.com/identity/protocols/OAuth2ServiceAccount
#[allow(dead_code)]
fn set_sub_claim(mut claims: Claims, sub: String) -> Claims {
claims.sub = Some(sub);
claims
}
/// A token source (`GetToken`) yielding OAuth tokens for services that use ServiceAccount authorization.
/// This token source caches token and automatically renews expired ones.
/// This token source caches token and automatically renews expired ones, meaning you do not need
/// (and you also should not) use this with `Authenticator`. Just use it directly.
#[derive(Clone)]
pub struct ServiceAccountAccess<C> {
client: hyper::Client<C, hyper::Body>,
key: ServiceAccountKey,
cache: MemoryStorage,
cache: Arc<Mutex<MemoryStorage>>,
sub: Option<String>,
}
@@ -213,10 +220,7 @@ impl TokenResponse {
}
}
impl<'a, C: 'static> ServiceAccountAccess<C>
where
C: hyper::client::connect::Connect,
{
impl<'a, C: 'static + hyper::client::connect::Connect> ServiceAccountAccess<C> {
/// Returns a new `ServiceAccountAccess` token source.
#[allow(dead_code)]
pub fn new(
@@ -226,11 +230,13 @@ where
ServiceAccountAccess {
client: client,
key: key,
cache: MemoryStorage::default(),
cache: Arc::new(Mutex::new(MemoryStorage::default())),
sub: None,
}
}
/// Set `sub` claim in new `ServiceAccountKey` (see
/// https://developers.google.com/identity/protocols/OAuth2ServiceAccount#authorizingrequests).
pub fn with_sub(
key: ServiceAccountKey,
client: hyper::Client<C, hyper::Body>,
@@ -239,81 +245,73 @@ where
ServiceAccountAccess {
client: client,
key: key,
cache: MemoryStorage::default(),
cache: Arc::new(Mutex::new(MemoryStorage::default())),
sub: Some(sub),
}
}
/// Send a request for a new Bearer token to the OAuth provider.
fn request_token(
&mut self,
scopes: &Vec<&str>,
) -> result::Result<Token, Box<dyn error::Error + Send>> {
let mut claims = init_claims_from_key(&self.key, scopes);
claims.sub = self.sub.clone();
let signed = JWT::new(claims).sign(self.key.private_key.as_ref().unwrap())?;
let body = form_urlencoded::Serializer::new(String::new())
.extend_pairs(vec![
("grant_type".to_string(), GRANT_TYPE.to_string()),
("assertion".to_string(), signed),
])
.finish();
let request = hyper::Request::post(self.key.token_uri.as_ref().unwrap())
.header(header::CONTENT_TYPE, "application/x-www-form-urlencoded")
.body(hyper::Body::from(body))
.unwrap(); // TOOD: error handling
let body = {
let result: Arc<RwLock<Result<hyper::Chunk, hyper::Error>>> =
Arc::new(RwLock::new(Ok(Default::default())));
let ok = Arc::clone(&result);
let err = Arc::clone(&result);
hyper::rt::run(
self.client
.request(request)
.and_then(|response| response.into_body().concat2())
.map(move |body| {
*ok.write().unwrap_or_else(|e| unreachable!("{}", e)) = Ok(body);
()
})
.map_err(move |e| {
*err.write().unwrap_or_else(|e| unreachable!("{}", e)) = Err(e);
()
}),
);
Arc::try_unwrap(result)
.unwrap_or_else(|e| unreachable!("{:?}", e))
.into_inner()
.unwrap_or_else(|e| unreachable!("{}", e))
};
let json_str = body
client: hyper::client::Client<C>,
sub: Option<String>,
key: ServiceAccountKey,
scopes: Vec<String>,
) -> impl Future<Item = Token, Error = RequestError> {
let mut claims = init_claims_from_key(&key, &scopes);
claims.sub = sub.clone();
let signed = JWT::new(claims)
.sign(key.private_key.as_ref().unwrap())
.into_future();
signed
.map_err(RequestError::LowLevelError)
.map(|signed| {
form_urlencoded::Serializer::new(String::new())
.extend_pairs(vec![
("grant_type".to_string(), GRANT_TYPE.to_string()),
("assertion".to_string(), signed),
])
.finish()
})
.map(|rqbody| {
hyper::Request::post(key.token_uri.unwrap())
.header(header::CONTENT_TYPE, "application/x-www-form-urlencoded")
.body(hyper::Body::from(rqbody))
.unwrap()
})
.and_then(move |request| client.request(request).map_err(RequestError::ClientError))
.and_then(|response| {
response
.into_body()
.concat2()
.map_err(RequestError::ClientError)
})
.map(|c| String::from_utf8(c.into_bytes().to_vec()).unwrap())
.unwrap(); // TODO: error handling
let token: Result<TokenResponse, serde_json::error::Error> =
serde_json::from_str(&json_str);
match token {
Err(e) => return Err(Box::new(e)),
Ok(token) => {
if token.access_token.is_none()
|| token.token_type.is_none()
|| token.expires_in.is_none()
{
Err(Box::new(StringError::new(
"Token response lacks fields",
Some(format!("{:?}", token).as_str()),
)))
.and_then(|s| {
if let Ok(jse) = serde_json::from_str::<JsonError>(&s) {
Err(RequestError::NegativeServerResponse(
jse.error,
jse.error_description,
))
} else {
Ok(token.to_oauth_token())
serde_json::from_str(&s).map_err(RequestError::JSONError)
}
}
}
})
.then(|token: Result<TokenResponse, RequestError>| match token {
Err(e) => return Err(e),
Ok(token) => {
if token.access_token.is_none()
|| token.token_type.is_none()
|| token.expires_in.is_none()
{
Err(RequestError::BadServerResponse(format!(
"Token response lacks fields: {:?}",
token
)))
} else {
Ok(token.to_oauth_token())
}
}
})
}
}
@@ -321,27 +319,67 @@ impl<C: 'static> GetToken for ServiceAccountAccess<C>
where
C: hyper::client::connect::Connect,
{
fn token<'b, I, T>(&mut self, scopes: I) -> Result<Token, Box<dyn error::Error + Send>>
fn token<'b, I, T>(
&mut self,
scopes: I,
) -> Box<dyn Future<Item = Token, Error = RequestError> + Send>
where
T: AsRef<str> + Ord + 'b,
I: IntoIterator<Item = &'b T>,
I: Iterator<Item = &'b T>,
{
let (hash, scps) = hash_scopes(scopes);
let (hash, scps0) = hash_scopes(scopes);
let cache = self.cache.clone();
let scps = scps0.clone();
if let Some(token) = self
.cache
.get(hash, &scps)
.map_err(|e| Box::new(e) as Box<dyn error::Error + Send>)?
{
if !token.expired() {
return Ok(token);
let cache_lookup = futures::lazy(move || {
match cache
.lock()
.unwrap()
.get(hash, &scps.iter().map(|s| s.as_str()).collect())
{
Ok(Some(token)) => {
if !token.expired() {
return Ok(token);
}
return Err(StringError::new("expired token in cache", None));
}
Err(e) => return Err(StringError::new(format!("cache lookup error: {}", e), None)),
Ok(None) => return Err(StringError::new("no token in cache", None)),
}
}
});
let token = self.request_token(&scps)?;
let _ = self.cache.set(hash, &scps, Some(token.clone()));
let cache = self.cache.clone();
let req_token = Self::request_token(
self.client.clone(),
self.sub.clone(),
self.key.clone(),
scps0.iter().map(|s| s.to_string()).collect(),
)
.then(move |r| match r {
Ok(token) => {
let _ = cache.lock().unwrap().set(
hash,
&scps0.iter().map(|s| s.as_str()).collect(),
Some(token.clone()),
);
Box::new(future::ok(token))
}
Err(e) => Box::new(future::err(e)),
});
Ok(token)
Box::new(cache_lookup.then(|r| match r {
Ok(t) => Box::new(Ok(t).into_future())
as Box<dyn Future<Item = Token, Error = RequestError> + Send>,
Err(_) => {
Box::new(req_token) as Box<dyn Future<Item = Token, Error = RequestError> + Send>
}
}))
}
/// Returns an empty ApplicationSecret as tokens for service accounts don't need to be
/// refreshed (they are simply reissued).
fn application_secret(&self) -> ApplicationSecret {
Default::default()
}
fn api_key(&mut self) -> Option<String> {
@@ -352,12 +390,114 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::authenticator::GetToken;
use crate::helper::service_account_key_from_file;
use crate::types::GetToken;
use hyper;
use hyper_tls::HttpsConnector;
use mockito::{self, mock};
use tokio;
// This is a valid but deactivated key.
#[test]
fn test_mocked_http() {
env_logger::try_init().unwrap();
let server_url = &mockito::server_url();
let client_secret = r#"{
"type": "service_account",
"project_id": "yup-test-243420",
"private_key_id": "26de294916614a5ebdf7a065307ed3ea9941902b",
"private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDemmylrvp1KcOn\n9yTAVVKPpnpYznvBvcAU8Qjwr2fSKylpn7FQI54wCk5VJVom0jHpAmhxDmNiP8yv\nHaqsef+87Oc0n1yZ71/IbeRcHZc2OBB33/LCFqf272kThyJo3qspEqhuAw0e8neg\nLQb4jpm9PsqR8IjOoAtXQSu3j0zkXemMYFy93PWHjVpPEUX16NGfsWH7oxspBHOk\n9JPGJL8VJdbiAoDSDgF0y9RjJY5I52UeHNhMsAkTYs6mIG4kKXt2+T9tAyHw8aho\nwmuytQAfydTflTfTG8abRtliF3nil2taAc5VB07dP1b4dVYy/9r6M8Z0z4XM7aP+\nNdn2TKm3AgMBAAECggEAWi54nqTlXcr2M5l535uRb5Xz0f+Q/pv3ceR2iT+ekXQf\n+mUSShOr9e1u76rKu5iDVNE/a7H3DGopa7ZamzZvp2PYhSacttZV2RbAIZtxU6th\n7JajPAM+t9klGh6wj4jKEcE30B3XVnbHhPJI9TCcUyFZoscuPXt0LLy/z8Uz0v4B\nd5JARwyxDMb53VXwukQ8nNY2jP7WtUig6zwE5lWBPFMbi8GwGkeGZOruAK5sPPwY\nGBAlfofKANI7xKx9UXhRwisB4+/XI1L0Q6xJySv9P+IAhDUI6z6kxR+WkyT/YpG3\nX9gSZJc7qEaxTIuDjtep9GTaoEqiGntjaFBRKoe+VQKBgQDzM1+Ii+REQqrGlUJo\nx7KiVNAIY/zggu866VyziU6h5wjpsoW+2Npv6Dv7nWvsvFodrwe50Y3IzKtquIal\nVd8aa50E72JNImtK/o5Nx6xK0VySjHX6cyKENxHRDnBmNfbALRM+vbD9zMD0lz2q\nmns/RwRGq3/98EqxP+nHgHSr9QKBgQDqUYsFAAfvfT4I75Glc9svRv8IsaemOm07\nW1LCwPnj1MWOhsTxpNF23YmCBupZGZPSBFQobgmHVjQ3AIo6I2ioV6A+G2Xq/JCF\nmzfbvZfqtbbd+nVgF9Jr1Ic5T4thQhAvDHGUN77BpjEqZCQLAnUWJx9x7e2xvuBl\n1A6XDwH/ewKBgQDv4hVyNyIR3nxaYjFd7tQZYHTOQenVffEAd9wzTtVbxuo4sRlR\nNM7JIRXBSvaATQzKSLHjLHqgvJi8LITLIlds1QbNLl4U3UVddJbiy3f7WGTqPFfG\nkLhUF4mgXpCpkMLxrcRU14Bz5vnQiDmQRM4ajS7/kfwue00BZpxuZxst3QKBgQCI\nRI3FhaQXyc0m4zPfdYYVc4NjqfVmfXoC1/REYHey4I1XetbT9Nb/+ow6ew0UbgSC\nUZQjwwJ1m1NYXU8FyovVwsfk9ogJ5YGiwYb1msfbbnv/keVq0c/Ed9+AG9th30qM\nIf93hAfClITpMz2mzXIMRQpLdmQSR4A2l+E4RjkSOwKBgQCB78AyIdIHSkDAnCxz\nupJjhxEhtQ88uoADxRoEga7H/2OFmmPsqfytU4+TWIdal4K+nBCBWRvAX1cU47vH\nJOlSOZI0gRKe0O4bRBQc8GXJn/ubhYSxI02IgkdGrIKpOb5GG10m85ZvqsXw3bKn\nRVHMD0ObF5iORjZUqD0yRitAdg==\n-----END PRIVATE KEY-----\n",
"client_email": "yup-test-sa-1@yup-test-243420.iam.gserviceaccount.com",
"client_id": "102851967901799660408",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/yup-test-sa-1%40yup-test-243420.iam.gserviceaccount.com"
}"#;
let mut key: ServiceAccountKey = serde_json::from_str(client_secret).unwrap();
key.token_uri = Some(format!("{}/token", server_url));
let json_response = r#"{
"access_token": "ya29.c.ElouBywiys0LyNaZoLPJcp1Fdi2KjFMxzvYKLXkTdvM-rDfqKlvEq6PiMhGoGHx97t5FAvz3eb_ahdwlBjSStxHtDVQB4ZPRJQ_EOi-iS7PnayahU2S9Jp8S6rk",
"expires_in": 3600,
"token_type": "Bearer"
}"#;
let bad_json_response = r#"{
"access_token": "ya29.c.ElouBywiys0LyNaZoLPJcp1Fdi2KjFMxzvYKLXkTdvM-rDfqKlvEq6PiMhGoGHx97t5FAvz3eb_ahdwlBjSStxHtDVQB4ZPRJQ_EOi-iS7PnayahU2S9Jp8S6rk",
"token_type": "Bearer"
}"#;
let https = HttpsConnector::new(1).unwrap();
let client = hyper::Client::builder()
.keep_alive(false)
.build::<_, hyper::Body>(https);
let mut rt = tokio::runtime::Builder::new()
.core_threads(1)
.panic_handler(|e| std::panic::resume_unwind(e))
.build()
.unwrap();
// Successful path.
{
let _m = mock("POST", "/token")
.with_status(200)
.with_header("content-type", "text/json")
.with_body(json_response)
.expect(1)
.create();
let mut acc = ServiceAccountAccess::new(key.clone(), client.clone());
let fut = acc
.token(vec!["https://www.googleapis.com/auth/pubsub"].iter())
.and_then(|tok| {
assert!(tok.access_token.contains("ya29.c.ElouBywiys0Ly"));
assert_eq!(Some(3600), tok.expires_in);
Ok(())
});
rt.block_on(fut).expect("block_on");
assert!(acc
.cache
.lock()
.unwrap()
.get(
3502164897243251857,
&vec!["https://www.googleapis.com/auth/pubsub"]
)
.unwrap()
.is_some());
// Test that token is in cache (otherwise mock will tell us)
let fut = acc
.token(vec!["https://www.googleapis.com/auth/pubsub"].iter())
.and_then(|tok| {
assert!(tok.access_token.contains("ya29.c.ElouBywiys0Ly"));
assert_eq!(Some(3600), tok.expires_in);
Ok(())
});
rt.block_on(fut).expect("block_on 2");
_m.assert();
}
// Malformed response.
{
let _m = mock("POST", "/token")
.with_status(200)
.with_header("content-type", "text/json")
.with_body(bad_json_response)
.create();
let mut acc = ServiceAccountAccess::new(key.clone(), client.clone());
let fut = acc
.token(vec!["https://www.googleapis.com/auth/pubsub"].iter())
.then(|result| {
assert!(result.is_err());
Ok(()) as Result<(), ()>
});
rt.block_on(fut).expect("block_on");
_m.assert();
}
rt.shutdown_on_idle().wait().expect("shutdown");
}
// Valid but deactivated key.
const TEST_PRIVATE_KEY_PATH: &'static str = "examples/Sanguine-69411a0c0eea.json";
// Uncomment this test to verify that we can successfully obtain tokens.
@@ -373,8 +513,8 @@ mod tests {
let mut acc = ServiceAccountAccess::new(key, client);
println!(
"{:?}",
acc.token(vec![&"https://www.googleapis.com/auth/pubsub"])
.unwrap()
acc.token(vec!["https://www.googleapis.com/auth/pubsub"].iter())
.wait()
);
}

View File

@@ -35,7 +35,7 @@ pub trait TokenStorage {
}
/// Calculate a hash value describing the scopes, and return a sorted Vec of the scopes.
pub fn hash_scopes<'a, I, T>(scopes: I) -> (u64, Vec<&'a str>)
pub fn hash_scopes<'a, I, T>(scopes: I) -> (u64, Vec<String>)
where
T: AsRef<str> + Ord + 'a,
I: IntoIterator<Item = &'a T>,
@@ -47,7 +47,7 @@ where
sv.sort();
let mut sh = DefaultHasher::new();
&sv.hash(&mut sh);
let sv = sv;
let sv = sv.iter().map(|s| s.to_string()).collect();
(sh.finish(), sv)
}
@@ -80,11 +80,17 @@ impl TokenStorage for NullStorage {
}
/// A storage that remembers values for one session only.
#[derive(Default)]
#[derive(Debug, Default)]
pub struct MemoryStorage {
pub tokens: HashMap<u64, Token>,
}
impl MemoryStorage {
pub fn new() -> MemoryStorage {
Default::default()
}
}
impl TokenStorage for MemoryStorage {
type Error = NullError;

View File

@@ -2,26 +2,54 @@ use chrono::{DateTime, TimeZone, Utc};
use hyper;
use std::error::Error;
use std::fmt;
use std::io;
use std::str::FromStr;
use futures::prelude::*;
/// A marker trait for all Flows
pub trait Flow {
fn type_id() -> FlowType;
}
#[derive(Deserialize)]
#[derive(Deserialize, Debug)]
pub struct JsonError {
pub error: String,
pub error_description: Option<String>,
pub error_uri: Option<String>,
}
/// Encapsulates all possible results of the `request_token(...)` operation
/// All possible outcomes of the refresh flow
#[derive(Debug)]
pub enum RefreshResult {
/// Indicates connection failure
Error(hyper::Error),
/// The server did not answer with a new token, providing the server message
RefreshError(String, Option<String>),
/// The refresh operation finished successfully, providing a new `Token`
Success(Token),
}
/// Encapsulates all possible results of a `poll_token(...)` operation in the Device flow.
#[derive(Debug)]
pub enum PollError {
/// Connection failure - retry if you think it's worth it
HttpError(hyper::Error),
/// Indicates we are expired, including the expiration date
Expired(DateTime<Utc>),
/// Indicates that the user declined access. String is server response
AccessDenied,
/// Indicates that too many attempts failed.
TimedOut,
/// Other type of error.
Other(String),
}
/// Encapsulates all possible results of the `token(...)` operation
#[derive(Debug)]
pub enum RequestError {
/// Indicates connection failure
ClientError(hyper::Error),
/// Indicates HTTP status failure
HttpError(hyper::http::Error),
/// The OAuth client was not found
InvalidClient,
/// Some requested scopes were invalid. String contains the scopes as part of
@@ -30,6 +58,20 @@ pub enum RequestError {
/// A 'catch-all' variant containing the server error and description
/// First string is the error code, the second may be a more detailed description
NegativeServerResponse(String, Option<String>),
/// A malformed server response.
BadServerResponse(String),
/// Error while decoding a JSON response.
JSONError(serde_json::error::Error),
/// Error within user input.
UserError(String),
/// A lower level IO error.
LowLevelError(io::Error),
/// A poll error occurred in the DeviceFlow.
Poll(PollError),
/// An error occurred while refreshing tokens.
Refresh(RefreshResult),
/// Error in token cache layer
Cache(Box<dyn Error + Send>),
}
impl From<hyper::Error> for RequestError {
@@ -38,12 +80,6 @@ impl From<hyper::Error> for RequestError {
}
}
impl From<hyper::http::Error> for RequestError {
fn from(error: hyper::http::Error) -> RequestError {
RequestError::HttpError(error)
}
}
impl From<JsonError> for RequestError {
fn from(value: JsonError) -> RequestError {
match &*value.error {
@@ -62,7 +98,6 @@ impl fmt::Display for RequestError {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match *self {
RequestError::ClientError(ref err) => err.fmt(f),
RequestError::HttpError(ref err) => err.fmt(f),
RequestError::InvalidClient => "Invalid Client".fmt(f),
RequestError::InvalidScope(ref scope) => writeln!(f, "Invalid Scope: '{}'", scope),
RequestError::NegativeServerResponse(ref error, ref desc) => {
@@ -72,6 +107,28 @@ impl fmt::Display for RequestError {
}
"\n".fmt(f)
}
RequestError::BadServerResponse(ref s) => s.fmt(f),
RequestError::JSONError(ref e) => format!(
"JSON Error; this might be a bug with unexpected server responses! {}",
e
)
.fmt(f),
RequestError::UserError(ref s) => s.fmt(f),
RequestError::LowLevelError(ref e) => e.fmt(f),
RequestError::Poll(ref pe) => pe.fmt(f),
RequestError::Refresh(ref rr) => format!("{:?}", rr).fmt(f),
RequestError::Cache(ref e) => e.fmt(f),
}
}
}
impl Error for RequestError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match *self {
RequestError::ClientError(ref err) => Some(err),
RequestError::LowLevelError(ref err) => Some(err),
RequestError::JSONError(ref err) => Some(err),
_ => None,
}
}
}
@@ -179,6 +236,25 @@ impl FromStr for Scheme {
}
}
/// A provider for authorization tokens, yielding tokens valid for a given scope.
/// The `api_key()` method is an alternative in case there are no scopes or
/// if no user is involved.
pub trait GetToken {
fn token<'b, I, T>(
&mut self,
scopes: I,
) -> Box<dyn Future<Item = Token, Error = RequestError> + Send>
where
T: AsRef<str> + Ord + 'b,
I: Iterator<Item = &'b T>;
fn api_key(&mut self) -> Option<String>;
/// Return an application secret with at least token_uri, client_secret, and client_id filled
/// in. This is used for refreshing tokens without interaction from the flow.
fn application_secret(&self) -> ApplicationSecret;
}
/// Represents a token as returned by OAuth2 servers.
///
/// It is produced by all authentication flows.