diff --git a/.travis.yml b/.travis.yml index 42e492e..f63d99c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -22,7 +22,6 @@ rust: stages: - name: test - - name: examples - name: lint - name: coverage @@ -45,13 +44,6 @@ jobs: - cargo fmt --all -- --check - cargo clippy --all-targets --all-features -- -D warnings || true - - stage: examples - if: os = linux - rust: stable - script: - - cargo build --manifest-path examples/drive_example/Cargo.toml - - cargo build --manifest-path examples/service_account/Cargo.toml - - stage: coverage if: os = linux sudo: true diff --git a/Cargo.toml b/Cargo.toml index d7dd8ff..92ae597 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "yup-oauth2" -version = "2.0.2" +version = "3.0.0-alpha" authors = ["Sebastian Thiel ", "Lewin Bormann "] repository = "https://github.com/dermesser/yup-oauth2" description = "An oauth2 implementation, providing the 'device', 'service account' and 'installed' authorization flows" @@ -13,6 +13,7 @@ edition = "2018" [dependencies] base64 = "0.10" chrono = "0.4" +http = "0.1" hyper = {version = "0.12", default-features = false} hyper-tls = "0.3" itertools = "0.8" @@ -24,9 +25,15 @@ serde_derive = "1.0" url = "1" futures = "0.1" tokio-threadpool = "0.1" +tokio = "0.1" +tokio-timer = "0.2" [dev-dependencies] getopts = "0.2" open = "1.1" yup-hyper-mock = "3.14" -tokio = "0.1" +mockito = "0.17" +env_logger = "0.6" + +[workspace] +members = ["examples/test-installed/", "examples/test-svc-acct/", "examples/test-device/"] diff --git a/README.md b/README.md index 69ae42e..0751216 100644 --- a/README.md +++ b/README.md @@ -8,13 +8,17 @@ Status](https://travis-ci.org/dermesser/yup-oauth2.svg)](https://travis-ci.org/d (However, you're able to use it with raw HTTP requests as well; the flows are implemented as token sources yielding HTTP Bearer tokens). +The provider we have been testing the code against is also Google. However, the code itself is +generic, and any OAuth provider behaving like Google will work as well. If you find one that +doesn't, please let us know and/or contribute a fix! + ### Supported authorization types * Device flow (user enters code on authorization page) * Installed application flow (user visits URL, copies code to application, application uses code to obtain token). Used for services like GMail, Drive, ... -* Service account flow: Non-interactive for server-to-server communication based on public key - cryptography. Used for services like Cloud Pubsub, Cloud Storage, ... +* Service account flow: Non-interactive authorization of server-to-server communication based on + public key cryptography. Used for services like Cloud Pubsub, Cloud Storage, ... ### Usage diff --git a/examples/auth.rs b/examples/old/auth.rs similarity index 100% rename from examples/auth.rs rename to examples/old/auth.rs diff --git a/examples/test-device/Cargo.toml b/examples/test-device/Cargo.toml new file mode 100644 index 0000000..648213b --- /dev/null +++ b/examples/test-device/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "test-device" +version = "0.1.0" +authors = ["Lewin Bormann "] +edition = "2018" + +[dependencies] +yup-oauth2 = { path = "../../" } +hyper = "0.12" +hyper-tls = "0.3" +futures = "0.1" +tokio = "0.1" diff --git a/examples/test-device/src/main.rs b/examples/test-device/src/main.rs new file mode 100644 index 0000000..c24a032 --- /dev/null +++ b/examples/test-device/src/main.rs @@ -0,0 +1,36 @@ +use futures::prelude::*; +use yup_oauth2::{self, Authenticator, GetToken}; + +use hyper::client::Client; +use hyper_tls::HttpsConnector; +use std::path; +use std::time::Duration; +use tokio; + +fn main() { + let creds = yup_oauth2::read_application_secret(path::Path::new("clientsecret.json")) + .expect("clientsecret"); + let https = HttpsConnector::new(1).expect("tls"); + let client = Client::builder() + .keep_alive(false) + .build::<_, hyper::Body>(https); + let scopes = &["https://www.googleapis.com/auth/youtube.readonly".to_string()]; + + let ad = yup_oauth2::DefaultFlowDelegate; + let mut df = yup_oauth2::DeviceFlow::new::(client.clone(), creds, ad, None); + df.set_wait_duration(Duration::from_secs(120)); + let mut auth = Authenticator::new_disk( + client, + df, + yup_oauth2::DefaultAuthenticatorDelegate, + "tokenstorage.json", + ) + .expect("authenticator"); + + let mut rt = tokio::runtime::Runtime::new().unwrap(); + let fut = auth + .token(scopes.iter()) + .and_then(|tok| Ok(println!("{:?}", tok))); + + println!("{:?}", rt.block_on(fut)); +} diff --git a/examples/test-installed/Cargo.toml b/examples/test-installed/Cargo.toml new file mode 100644 index 0000000..104183a --- /dev/null +++ b/examples/test-installed/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "test-installed" +version = "0.1.0" +authors = ["Lewin Bormann "] +edition = "2018" + +[dependencies] +yup-oauth2 = { path = "../../" } +hyper = "0.12" +hyper-tls = "0.3" +futures = "0.1" +tokio = "0.1" diff --git a/examples/test-installed/src/main.rs b/examples/test-installed/src/main.rs new file mode 100644 index 0000000..d11df4e --- /dev/null +++ b/examples/test-installed/src/main.rs @@ -0,0 +1,41 @@ +use futures::prelude::*; +use yup_oauth2::GetToken; +use yup_oauth2::{Authenticator, InstalledFlow}; + +use hyper::client::Client; +use hyper_tls::HttpsConnector; + +use std::path::Path; + +fn main() { + let https = HttpsConnector::new(1).expect("tls"); + let client = Client::builder() + .keep_alive(false) + .build::<_, hyper::Body>(https); + let ad = yup_oauth2::DefaultFlowDelegate; + let secret = yup_oauth2::read_application_secret(Path::new("clientsecret.json")) + .expect("clientsecret.json"); + let inf = InstalledFlow::new( + client.clone(), + ad, + secret, + yup_oauth2::InstalledFlowReturnMethod::HTTPRedirect(8081), + ); + let mut auth = Authenticator::new_disk( + client, + inf, + yup_oauth2::DefaultAuthenticatorDelegate, + "tokencache.json", + ) + .unwrap(); + let s = "https://www.googleapis.com/auth/drive.file".to_string(); + let scopes = vec![s]; + + let tok = auth.token(scopes.iter()); + let fut = tok.map_err(|e| println!("error: {:?}", e)).and_then(|t| { + println!("The token is {:?}", t); + Ok(()) + }); + + tokio::run(fut) +} diff --git a/examples/test-svc-acct/Cargo.toml b/examples/test-svc-acct/Cargo.toml new file mode 100644 index 0000000..3b58bfd --- /dev/null +++ b/examples/test-svc-acct/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "test-svc-acct" +version = "0.1.0" +authors = ["Lewin Bormann "] +edition = "2018" + +[dependencies] +yup-oauth2 = { path = "../../" } +hyper = "0.12" +hyper-tls = "0.3" +futures = "0.1" +tokio = "0.1" diff --git a/examples/test-svc-acct/src/main.rs b/examples/test-svc-acct/src/main.rs new file mode 100644 index 0000000..f9a9ee0 --- /dev/null +++ b/examples/test-svc-acct/src/main.rs @@ -0,0 +1,36 @@ +use yup_oauth2; + +use futures::prelude::*; +use yup_oauth2::GetToken; + +use hyper::client::Client; +use hyper_tls::HttpsConnector; +use tokio; + +use std::path; + +fn main() { + let creds = + yup_oauth2::service_account_key_from_file(path::Path::new("serviceaccount.json")).unwrap(); + let https = HttpsConnector::new(1).expect("tls"); + let client = Client::builder() + .keep_alive(false) + .build::<_, hyper::Body>(https); + + let mut sa = yup_oauth2::ServiceAccountAccess::new(creds, client); + + let fut = sa + .token(["https://www.googleapis.com/auth/pubsub"].iter()) + .and_then(|tok| { + println!("token is: {:?}", tok); + Ok(()) + }); + let fut2 = sa + .token(["https://www.googleapis.com/auth/pubsub"].iter()) + .and_then(|tok| { + println!("cached token is {:?} and should be identical", tok); + Ok(()) + }); + let all = fut.join(fut2).then(|_| Ok(())); + tokio::run(all) +} diff --git a/src/authenticator.rs b/src/authenticator.rs index 9f7c642..fa68076 100644 --- a/src/authenticator.rs +++ b/src/authenticator.rs @@ -1,382 +1,215 @@ -use std::cmp::min; -use std::collections::hash_map::DefaultHasher; -use std::convert::From; +use crate::authenticator_delegate::{AuthenticatorDelegate, Retry}; +use crate::refresh::RefreshFlow; +use crate::storage::{hash_scopes, DiskTokenStorage, MemoryStorage, TokenStorage}; +use crate::types::{ApplicationSecret, GetToken, RefreshResult, RequestError, Token}; + +use futures::{future, prelude::*}; +use tokio_timer; + use std::error::Error; -use std::hash::{Hash, Hasher}; -use std::iter::IntoIterator; -use std::thread::sleep; -use std::time::Duration; +use std::io; +use std::sync::{Arc, Mutex}; -use crate::authenticator_delegate::{AuthenticatorDelegate, PollError, PollInformation}; -use crate::device::{DeviceFlow, GOOGLE_DEVICE_CODE_URL}; -use crate::installed::{InstalledFlow, InstalledFlowReturnMethod}; -use crate::refresh::{RefreshFlow, RefreshResult}; -use crate::storage::TokenStorage; -use crate::types::{ApplicationSecret, FlowType, RequestError, StringError, Token}; - -use hyper; - -/// A generalized authenticator which will keep tokens valid and store them. +/// Authenticator abstracts different `GetToken` implementations behind one type and handles +/// caching received tokens. It's important to use it (instead of the flows directly) because +/// otherwise the user needs to be asked for new authorization every time a token is generated. /// -/// It is the go-to helper to deal with any kind of supported authentication flow, -/// which will be kept valid and usable. +/// `ServiceAccountAccess` does not need (and does not work) with `Authenticator`, given that it +/// does not require interaction and implements its own caching. Use it directly. /// -/// # Device Flow -/// This involves polling the authentication server in the given intervals -/// until there is a definitive result. -/// -/// These results will be passed the `DeviceFlowHelperDelegate` implementation to deal with -/// * presenting the user code -/// * inform the user about the progress or errors -/// * abort the operation -/// -/// # Usage -/// Please have a look at the library's landing page. -pub struct Authenticator { - flow_type: FlowType, - delegate: D, - storage: S, - client: hyper::Client, - secret: ApplicationSecret, -} - -/// A provider for authorization tokens, yielding tokens valid for a given scope. -/// The `api_key()` method is an alternative in case there are no scopes or -/// if no user is involved. -pub trait GetToken { - fn token<'b, I, T>(&mut self, scopes: I) -> Result> - where - T: AsRef + Ord + 'b, - I: IntoIterator; - - fn api_key(&mut self) -> Option; -} - -impl<'a, D, S, C: 'static> Authenticator -where - D: AuthenticatorDelegate, +/// NOTE: It is recommended to use a client constructed like this in order to prevent functions +/// like `hyper::run()` from hanging: `let client = hyper::Client::builder().keep_alive(false);`. +/// Due to token requests being rare, this should not result in a too bad performance problem. +pub struct Authenticator< + T: GetToken, S: TokenStorage, + AD: AuthenticatorDelegate, C: hyper::client::connect::Connect, +> { + client: hyper::Client, + inner: Arc>, + store: Arc>, + delegate: AD, +} + +impl + Authenticator { - /// Returns a new `Authenticator` instance - /// - /// # Arguments - /// * `secret` - usually obtained from a client secret file produced by the - /// [developer console][dev-con] - /// * `delegate` - Used to further refine the flow of the authentication. - /// * `client` - used for all authentication https requests. - /// * `storage` - used to cache authorization tokens tokens permanently. However, - /// the implementation doesn't have any particular semantic requirement, which - /// is why `NullStorage` and `MemoryStorage` can be used as well. - /// * `flow_type` - the kind of authentication to use to obtain a token for the - /// required scopes. If unset, it will be derived from the secret. - /// - /// NOTE: It is recommended to use a client constructed like this in order to prevent functions - /// like `hyper::run()` from hanging: `let client = hyper::Client::builder().keep_alive(false);`. - /// Due to token requests being rare, this should not result in a too bad performance problem. - /// [dev-con]: https://console.developers.google.com + /// Create an Authenticator caching tokens for the duration of this authenticator. pub fn new( - secret: &ApplicationSecret, - delegate: D, - client: hyper::Client, - storage: S, - flow_type: Option, - ) -> Authenticator { + client: hyper::Client, + inner: T, + delegate: AD, + ) -> Authenticator { Authenticator { - flow_type: flow_type.unwrap_or(FlowType::Device(GOOGLE_DEVICE_CODE_URL.to_string())), - delegate: delegate, - storage: storage, client: client, - secret: secret.clone(), - } - } - - fn do_installed_flow(&mut self, scopes: &Vec<&str>) -> Result> { - let installed_type; - - match self.flow_type { - FlowType::InstalledInteractive => { - installed_type = Some(InstalledFlowReturnMethod::Interactive) - } - FlowType::InstalledRedirect(port) => { - installed_type = Some(InstalledFlowReturnMethod::HTTPRedirect(port)) - } - _ => installed_type = None, - } - - let mut flow = InstalledFlow::new(self.client.clone(), installed_type); - flow.obtain_token(&mut self.delegate, &self.secret, scopes.iter()) - } - - fn retrieve_device_token( - &mut self, - scopes: &Vec<&str>, - code_url: String, - ) -> Result> { - let mut flow = DeviceFlow::new(self.client.clone(), &self.secret, &code_url); - - // PHASE 1: REQUEST CODE - let pi: PollInformation; - loop { - let res = flow.request_code(scopes.iter()); - - pi = match res { - Err(res_err) => { - match res_err { - RequestError::ClientError(err) => match self.delegate.client_error(&err) { - Retry::Abort | Retry::Skip => { - return Err(Box::new(StringError::from(&err as &dyn Error))); - } - Retry::After(d) => sleep(d), - }, - RequestError::HttpError(err) => { - match self.delegate.connection_error(&err) { - Retry::Abort | Retry::Skip => { - return Err(Box::new(StringError::from(&err as &dyn Error))); - } - Retry::After(d) => sleep(d), - } - } - RequestError::InvalidClient - | RequestError::NegativeServerResponse(_, _) - | RequestError::InvalidScope(_) => { - let serr = StringError::from(res_err.to_string()); - self.delegate.request_failure(res_err); - return Err(Box::new(serr)); - } - }; - continue; - } - Ok(pi) => { - self.delegate.present_user_code(&pi); - pi - } - }; - break; - } - - // PHASE 1: POLL TOKEN - loop { - match flow.poll_token() { - Err(ref poll_err) => { - let pts = poll_err.to_string(); - match poll_err { - &&PollError::HttpError(ref err) => match self.delegate.client_error(err) { - Retry::Abort | Retry::Skip => { - return Err(Box::new(StringError::from(err as &dyn Error))); - } - Retry::After(d) => sleep(d), - }, - &&PollError::Expired(ref t) => { - self.delegate.expired(t); - return Err(Box::new(StringError::from(pts))); - } - &&PollError::AccessDenied => { - self.delegate.denied(); - return Err(Box::new(StringError::from(pts))); - } - }; // end match poll_err - } - Ok(None) => match self.delegate.pending(&pi) { - Retry::Abort | Retry::Skip => { - return Err(Box::new(StringError::new( - "Pending authentication aborted".to_string(), - None, - ))); - } - Retry::After(d) => sleep(min(d, pi.interval)), - }, - Ok(Some(token)) => return Ok(token), - } + inner: Arc::new(Mutex::new(inner)), + store: Arc::new(Mutex::new(MemoryStorage::new())), + delegate: delegate, } } } -impl GetToken for Authenticator -where - D: AuthenticatorDelegate, - S: TokenStorage, - C: hyper::client::connect::Connect, +impl + Authenticator { - /// Blocks until a token was retrieved from storage, from the server, or until the delegate - /// decided to abort the attempt, or the user decided not to authorize the application. - /// In any failure case, the delegate will be provided with additional information, and - /// the caller will be informed about storage related errors. - /// Otherwise it is guaranteed to be valid for the given scopes. - fn token<'b, I, T>(&mut self, scopes: I) -> Result> + /// Create an Authenticator using the store at `path`. + pub fn new_disk>( + client: hyper::Client, + inner: T, + delegate: AD, + token_storage_path: P, + ) -> io::Result> { + Ok(Authenticator { + client: client, + inner: Arc::new(Mutex::new(inner)), + store: Arc::new(Mutex::new(DiskTokenStorage::new(token_storage_path)?)), + delegate: delegate, + }) + } +} + +impl< + GT: 'static + GetToken + Send, + S: 'static + TokenStorage + Send, + AD: 'static + AuthenticatorDelegate + Send, + C: 'static + hyper::client::connect::Connect + Clone + Send, + > GetToken for Authenticator +{ + /// Returns the API Key of the inner flow. + fn api_key(&mut self) -> Option { + self.inner.lock().unwrap().api_key() + } + /// Returns the application secret of the inner flow. + fn application_secret(&self) -> ApplicationSecret { + self.inner.lock().unwrap().application_secret() + } + + fn token<'b, I, T>( + &mut self, + scopes: I, + ) -> Box + Send> where T: AsRef + Ord + 'b, - I: IntoIterator, + I: Iterator, { - let (scope_key, scopes) = { - let mut sv: Vec<&str> = scopes - .into_iter() - .map(|s| s.as_ref()) - .collect::>(); - sv.sort(); - let mut sh = DefaultHasher::new(); - &sv.hash(&mut sh); - let sv = sv; - (sh.finish(), sv) - }; - - // Get cached token. Yes, let's do an explicit return - loop { - return match self.storage.get(scope_key, &scopes) { - Ok(Some(mut t)) => { - // t needs refresh ? - if t.expired() { - let mut rf = RefreshFlow::new(self.client.clone()); - loop { - match *rf.refresh_token( - self.flow_type.clone(), - &self.secret, - &t.refresh_token, - ) { - RefreshResult::Uninitialized => { - panic!("Token flow should never get here"); + let (scope_key, scopes) = hash_scopes(scopes); + let store = self.store.clone(); + let mut delegate = self.delegate.clone(); + let client = self.client.clone(); + let appsecret = self.inner.lock().unwrap().application_secret(); + let gettoken = self.inner.clone(); + let loopfn = move |()| -> Box< + dyn Future, Error = RequestError> + Send, + > { + // How well does this work with tokio? + match store.lock().unwrap().get( + scope_key.clone(), + &scopes.iter().map(|s| s.as_str()).collect(), + ) { + Ok(Some(t)) => { + if !t.expired() { + return Box::new(Ok(future::Loop::Break(t)).into_future()); + } + // Implement refresh flow. + let refresh_token = t.refresh_token.clone(); + let mut delegate = delegate.clone(); + let store = store.clone(); + let scopes = scopes.clone(); + let refresh_fut = RefreshFlow::refresh_token( + client.clone(), + appsecret.clone(), + refresh_token, + ) + .and_then(move |rr| -> Box, Error=RequestError> + Send> { + match rr { + RefreshResult::Error(ref e) => { + delegate.token_refresh_failed( + format!("{}", e.description().to_string()), + &Some("the request has likely timed out".to_string()), + ); + Box::new(Err(RequestError::Refresh(rr)).into_future()) } - RefreshResult::Error(ref err) => { - match self.delegate.client_error(err) { - Retry::Abort | Retry::Skip => { - return Err(Box::new(StringError::new( - err.description().to_string(), - None, - ))); + RefreshResult::RefreshError(ref s, ref ss) => { + delegate.token_refresh_failed( + format!("{} {}", s, ss.clone().map(|s| format!("({})", s)).unwrap_or("".to_string())), + &Some("the refresh token is likely invalid and your authorization has been revoked".to_string()), + ); + Box::new(Err(RequestError::Refresh(rr)).into_future()) + } + RefreshResult::Success(t) => { + if let Err(e) = store.lock().unwrap().set(scope_key, &scopes.iter().map(|s| s.as_str()).collect(), Some(t.clone())) { + match delegate.token_storage_failure(true, &e) { + Retry::Skip => Box::new(Ok(future::Loop::Break(t)).into_future()), + Retry::Abort => Box::new(Err(RequestError::Cache(Box::new(e))).into_future()), + Retry::After(d) => Box::new( + tokio_timer::sleep(d) + .then(|_| Ok(future::Loop::Continue(()))), + ) + as Box< + dyn Future< + Item = future::Loop, + Error = RequestError> + Send>, } - Retry::After(d) => sleep(d), + } else { + Box::new(Ok(future::Loop::Break(t)).into_future()) } - } - RefreshResult::RefreshError(ref err_str, ref err_description) => { - self.delegate.token_refresh_failed(err_str, err_description); - let storage_err = - match self.storage.set(scope_key, &scopes, None) { - Ok(_) => String::new(), - Err(err) => err.to_string(), - }; - return Err(Box::new(StringError::new( - &(storage_err + err_str), - err_description.as_ref(), - ))); - } - RefreshResult::Success(ref new_t) => { - t = new_t.clone(); - loop { - if let Err(err) = - self.storage.set(scope_key, &scopes, Some(t.clone())) - { - match self.delegate.token_storage_failure(true, &err) { - Retry::Skip => break, - Retry::Abort => return Err(Box::new(err)), - Retry::After(d) => { - sleep(d); - continue; - } - } - } - break; // .set() - } - break; // refresh_token loop - } - } // RefreshResult handling - } // refresh loop - } // handle expiration - Ok(t) + }, + } + }); + Box::new(refresh_fut) } Ok(None) => { - // Nothing was in storage - get a new token - // get new token. The respective sub-routine will do all the logic. - match match self.flow_type.clone() { - FlowType::Device(url) => self.retrieve_device_token(&scopes, url), - FlowType::InstalledInteractive => self.do_installed_flow(&scopes), - FlowType::InstalledRedirect(_) => self.do_installed_flow(&scopes), - } { - Ok(token) => { - loop { - if let Err(err) = - self.storage.set(scope_key, &scopes, Some(token.clone())) - { - match self.delegate.token_storage_failure(true, &err) { - Retry::Skip => break, - Retry::Abort => return Err(Box::new(err)), - Retry::After(d) => { - sleep(d); - continue; + let store = store.clone(); + let scopes = scopes.clone(); + let mut delegate = delegate.clone(); + Box::new( + gettoken + .lock() + .unwrap() + .token(scopes.iter()) + .and_then(move |t| { + if let Err(e) = store.lock().unwrap().set( + scope_key, + &scopes.iter().map(|s| s.as_str()).collect(), + Some(t.clone()), + ) { + match delegate.token_storage_failure(true, &e) { + Retry::Skip => { + Box::new(Ok(future::Loop::Break(t)).into_future()) } + Retry::Abort => Box::new( + Err(RequestError::Cache(Box::new(e))).into_future(), + ), + Retry::After(d) => Box::new( + tokio_timer::sleep(d) + .then(|_| Ok(future::Loop::Continue(()))), + ) + as Box< + dyn Future< + Item = future::Loop, + Error = RequestError, + > + Send, + >, } + } else { + Box::new(Ok(future::Loop::Break(t)).into_future()) } - break; - } // end attempt to save - Ok(token) - } - Err(err) => Err(err), - } // end match token retrieve result + }), + ) } - Err(err) => match self.delegate.token_storage_failure(false, &err) { - Retry::Abort | Retry::Skip => Err(Box::new(err)), + Err(err) => match delegate.token_storage_failure(false, &err) { + Retry::Abort | Retry::Skip => { + return Box::new(Err(RequestError::Cache(Box::new(err))).into_future()) + } Retry::After(d) => { - sleep(d); - continue; + return Box::new( + tokio_timer::sleep(d).then(|_| Ok(future::Loop::Continue(()))), + ) } }, - }; // end match - } // end loop - } - - fn api_key(&mut self) -> Option { - if self.secret.client_id.len() == 0 { - return None; - } - Some(self.secret.client_id.clone()) - } -} - -/// A utility type to indicate how operations DeviceFlowHelper operations should be retried -pub enum Retry { - /// Signal you don't want to retry - Abort, - /// Signals you want to retry after the given duration - After(Duration), - /// Instruct the caller to attempt to keep going, or choose an alternate path. - /// If this is not supported, it will have the same effect as `Abort` - Skip, -} - -#[cfg(test)] -mod tests { - use super::super::device::tests::MockGoogleAuth; - use super::super::types::tests::SECRET; - use super::super::types::ConsoleApplicationSecret; - use super::*; - use crate::authenticator_delegate::DefaultAuthenticatorDelegate; - use crate::storage::MemoryStorage; - use hyper; - use std::default::Default; - - #[test] - fn test_flow() { - use serde_json as json; - - let runtime = tokio::runtime::Runtime::new().unwrap(); - let secret = json::from_str::(SECRET) - .unwrap() - .installed - .unwrap(); - let client = hyper::Client::builder() - .executor(runtime.executor()) - .build(MockGoogleAuth::default()); - let res = Authenticator::new( - &secret, - DefaultAuthenticatorDelegate, - client, - ::default(), - None, - ) - .token(&["https://www.googleapis.com/auth/youtube.upload"]); - - match res { - Ok(t) => assert_eq!(t.access_token, "1/fFAGRNJru1FTz70BzhT3Zg"), - Err(err) => panic!("Expected to retrieve token in one go: {}", err), - } + } + }; + Box::new(future::loop_fn((), loopfn)) } } diff --git a/src/authenticator_delegate.rs b/src/authenticator_delegate.rs index b1c793b..9077f6f 100644 --- a/src/authenticator_delegate.rs +++ b/src/authenticator_delegate.rs @@ -4,12 +4,25 @@ use std::error::Error; use std::fmt; use std::io; -use crate::authenticator::Retry; -use crate::types::RequestError; +use crate::types::{PollError, RequestError}; use chrono::{DateTime, Local, Utc}; use std::time::Duration; +use futures::{future, prelude::*}; +use tokio::io as tio; + +/// A utility type to indicate how operations DeviceFlowHelper operations should be retried +pub enum Retry { + /// Signal you don't want to retry + Abort, + /// Signals you want to retry after the given duration + After(Duration), + /// Instruct the caller to attempt to keep going, or choose an alternate path. + /// If this is not supported, it will have the same effect as `Abort` + Skip, +} + /// Contains state of pending authentication requests #[derive(Clone, Debug, PartialEq)] pub struct PollInformation { @@ -32,23 +45,23 @@ impl fmt::Display for PollInformation { } } -/// Encapsulates all possible results of a `poll_token(...)` operation -#[derive(Debug)] -pub enum PollError { - /// Connection failure - retry if you think it's worth it - HttpError(hyper::Error), - /// indicates we are expired, including the expiration date - Expired(DateTime), - /// Indicates that the user declined access. String is server response - AccessDenied, -} - impl fmt::Display for PollError { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { match *self { PollError::HttpError(ref err) => err.fmt(f), PollError::Expired(ref date) => writeln!(f, "Authentication expired at {}", date), PollError::AccessDenied => "Access denied by user".fmt(f), + PollError::TimedOut => "Timed out waiting for token".fmt(f), + PollError::Other(ref s) => format!("Unknown server error: {}", s).fmt(f), + } + } +} + +impl Error for PollError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match *self { + PollError::HttpError(ref e) => Some(e), + _ => None, } } } @@ -57,7 +70,7 @@ impl fmt::Display for PollError { /// /// The only method that needs to be implemented manually is `present_user_code(...)`, /// as no assumptions are made on how this presentation should happen. -pub trait AuthenticatorDelegate { +pub trait AuthenticatorDelegate: Clone { /// Called whenever there is an client, usually if there are network problems. /// /// Return retry information. @@ -65,13 +78,6 @@ pub trait AuthenticatorDelegate { Retry::Abort } - /// Called whenever there is an HttpError, usually if there are network problems. - /// - /// Return retry information. - fn connection_error(&mut self, _: &hyper::http::Error) -> Retry { - Retry::Abort - } - /// Called whenever we failed to retrieve a token or set a token due to a storage error. /// You may use it to either ignore the incident or retry. /// This can be useful if the underlying `TokenStorage` may fail occasionally. @@ -85,15 +91,6 @@ pub trait AuthenticatorDelegate { /// The server denied the attempt to obtain a request code fn request_failure(&mut self, _: RequestError) {} - /// Called if the request code is expired. You will have to start over in this case. - /// This will be the last call the delegate receives. - /// Given `DateTime` is the expiration date - fn expired(&mut self, _: &DateTime) {} - - /// Called if the user denied access. You would have to start over. - /// This will be the last call the delegate receives. - fn denied(&mut self) {} - /// Called if we could not acquire a refresh token for a reason possibly specified /// by the server. /// This call is made for the delegate's information only. @@ -109,6 +106,19 @@ pub trait AuthenticatorDelegate { let _ = error_description; } } +} + +/// FlowDelegate methods are called when an OAuth flow needs to ask the application what to do in +/// certain cases. +pub trait FlowDelegate: Clone { + /// Called if the request code is expired. You will have to start over in this case. + /// This will be the last call the delegate receives. + /// Given `DateTime` is the expiration date + fn expired(&mut self, _: &DateTime) {} + + /// Called if the user denied access. You would have to start over. + /// This will be the last call the delegate receives. + fn denied(&mut self) {} /// Called as long as we are waiting for the user to authorize us. /// Can be used to print progress information, or decide to time-out. @@ -125,7 +135,6 @@ pub trait AuthenticatorDelegate { fn redirect_uri(&self) -> Option { None } - /// The server has returned a `user_code` which must be shown to the user, /// along with the `verification_url`. /// # Notes @@ -151,7 +160,7 @@ pub trait AuthenticatorDelegate { &mut self, url: S, need_code: bool, - ) -> Option { + ) -> Box, Error = Box> + Send> { if need_code { println!( "Please direct your browser to {}, follow the instructions and enter the \ @@ -159,24 +168,33 @@ pub trait AuthenticatorDelegate { url ); - let mut code = String::new(); - io::stdin().read_line(&mut code).ok().map(|_| { - // Remove newline - code.pop(); - code - }) + Box::new( + tio::lines(io::BufReader::new(tio::stdin())) + .into_future() + .map_err(|(e, _)| { + println!("{:?}", e); + Box::new(e) as Box + }) + .and_then(|(l, _)| Ok(l)), + ) } else { println!( "Please direct your browser to {} and follow the instructions displayed \ there.", url ); - None + Box::new(future::ok(None)) } } } /// Uses all default implementations by AuthenticatorDelegate, and makes the trait's /// implementation usable in the first place. +#[derive(Clone)] pub struct DefaultAuthenticatorDelegate; impl AuthenticatorDelegate for DefaultAuthenticatorDelegate {} + +/// Uses all default implementations in the FlowDelegate trait. +#[derive(Clone)] +pub struct DefaultFlowDelegate; +impl FlowDelegate for DefaultFlowDelegate {} diff --git a/src/device.rs b/src/device.rs index a4d72bd..c4b60df 100644 --- a/src/device.rs +++ b/src/device.rs @@ -1,71 +1,167 @@ -use std::default::Default; -use std::iter::IntoIterator; +use std::iter::{FromIterator, IntoIterator}; use std::time::Duration; use chrono::{self, Utc}; use futures::stream::Stream; -use futures::Future; +use futures::{future, prelude::*}; +use http; use hyper; use hyper::header; use itertools::Itertools; use serde_json as json; +use tokio_timer; use url::form_urlencoded; -use crate::authenticator_delegate::{PollError, PollInformation}; -use crate::types::{ApplicationSecret, Flow, FlowType, JsonError, RequestError, Token}; +use crate::authenticator_delegate::{FlowDelegate, PollInformation, Retry}; +use crate::types::{ + ApplicationSecret, Flow, FlowType, GetToken, JsonError, PollError, RequestError, Token, +}; pub const GOOGLE_DEVICE_CODE_URL: &'static str = "https://accounts.google.com/o/oauth2/device/code"; -/// Encapsulates all possible states of the Device Flow -enum DeviceFlowState { - /// We failed to poll a result - Error, - /// We received poll information and will periodically poll for a token - Pending(PollInformation), - /// The flow finished successfully, providing token information - Success(Token), -} - /// Implements the [Oauth2 Device Flow](https://developers.google.com/youtube/v3/guides/authentication#devices) /// It operates in two steps: /// * obtain a code to show to the user /// * (repeatedly) poll for the user to authenticate your application -pub struct DeviceFlow { +pub struct DeviceFlow { client: hyper::Client, - device_code: String, - state: Option, - error: Option, application_secret: ApplicationSecret, + /// Usually GOOGLE_DEVICE_CODE_URL device_code_url: String, + fd: FD, + wait: Duration, } -impl Flow for DeviceFlow { +impl Flow for DeviceFlow { fn type_id() -> FlowType { FlowType::Device(String::new()) } } -impl DeviceFlow +impl< + FD: FlowDelegate + Clone + Send + 'static, + C: hyper::client::connect::Connect + Sync + 'static, + > GetToken for DeviceFlow +{ + fn token<'b, I, T>( + &mut self, + scopes: I, + ) -> Box + Send> + where + T: AsRef + Ord + 'b, + I: Iterator, + { + self.retrieve_device_token(Vec::from_iter(scopes.map(|s| s.as_ref().to_string()))) + } + fn api_key(&mut self) -> Option { + None + } + fn application_secret(&self) -> ApplicationSecret { + self.application_secret.clone() + } +} + +impl DeviceFlow where C: hyper::client::connect::Connect + Sync + 'static, C::Transport: 'static, C::Future: 'static, + FD: FlowDelegate + Clone + Send + 'static, { - pub fn new>( + pub fn new>( client: hyper::Client, - secret: &ApplicationSecret, - device_code_url: S, - ) -> DeviceFlow { + secret: ApplicationSecret, + fd: FD, + device_code_url: Option, + ) -> DeviceFlow { DeviceFlow { client: client, - device_code: Default::default(), - application_secret: secret.clone(), - device_code_url: device_code_url.as_ref().to_string(), - state: None, - error: None, + application_secret: secret, + device_code_url: device_code_url + .as_ref() + .map(|s| s.as_ref().to_string()) + .unwrap_or(GOOGLE_DEVICE_CODE_URL.to_string()), + fd: fd, + wait: Duration::from_secs(1200), } } + /// Set the time to wait for the user to authorize us. The default is 120 seconds. + pub fn set_wait_duration(&mut self, wait: Duration) { + self.wait = wait; + } + + /// Essentially what `GetToken::token` does: Retrieve a token for the given scopes without + /// caching. + pub fn retrieve_device_token<'a>( + &mut self, + scopes: Vec, + ) -> Box + Send> { + let application_secret = self.application_secret.clone(); + let client = self.client.clone(); + let wait = self.wait; + let mut fd = self.fd.clone(); + let request_code = Self::request_code( + application_secret.clone(), + client.clone(), + self.device_code_url.clone(), + scopes, + ) + .and_then(move |(pollinf, device_code)| { + fd.present_user_code(&pollinf); + Ok((pollinf, device_code)) + }); + let fd = self.fd.clone(); + Box::new(request_code.and_then(move |(pollinf, device_code)| { + future::loop_fn(0, move |i| { + // Make a copy of everything every time, because the loop function needs to be + // repeatable, i.e. we can't move anything out. + let pt = Self::poll_token( + application_secret.clone(), + client.clone(), + device_code.clone(), + pollinf.clone(), + fd.clone(), + ); + let maxn = wait.as_secs() / pollinf.interval.as_secs(); + let mut fd = fd.clone(); + let pollinf = pollinf.clone(); + tokio_timer::sleep(pollinf.interval) + .then(|_| pt) + .then(move |r| match r { + Ok(None) if i < maxn => match fd.pending(&pollinf) { + Retry::Abort | Retry::Skip => { + Box::new(Err(RequestError::Poll(PollError::TimedOut)).into_future()) + } + Retry::After(d) => Box::new( + tokio_timer::sleep(d) + .then(move |_| Ok(future::Loop::Continue(i + 1))), + ) + as Box< + dyn Future< + Item = future::Loop, + Error = RequestError, + > + Send, + >, + }, + Ok(Some(tok)) => Box::new(Ok(future::Loop::Break(tok)).into_future()), + Err(e @ PollError::AccessDenied) + | Err(e @ PollError::TimedOut) + | Err(e @ PollError::Expired(_)) => { + Box::new(Err(RequestError::Poll(e)).into_future()) + } + Err(_) if i < maxn => { + Box::new(Ok(future::Loop::Continue(i + 1)).into_future()) + } + // Too many attempts. + Ok(None) | Err(_) => { + Box::new(Err(RequestError::Poll(PollError::TimedOut)).into_future()) + } + }) + }) + })) + } + /// The first step involves asking the server for a code that the user /// can type into a field at a specified URL. It is called only once, assuming /// there was no connection error. Otherwise, it may be called again until @@ -81,26 +177,22 @@ where /// * If called after a successful result was returned at least once. /// # Examples /// See test-cases in source code for a more complete example. - pub fn request_code<'b, T, I>(&mut self, scopes: I) -> Result - where - T: AsRef + 'b, - I: IntoIterator, - { - if self.state.is_some() { - panic!("Must not be called after we have obtained a token and have no error"); - } - + fn request_code( + application_secret: ApplicationSecret, + client: hyper::Client, + device_code_url: String, + scopes: Vec, + ) -> impl Future { // note: cloned() shouldn't be needed, see issue // https://github.com/servo/rust-url/issues/81 let req = form_urlencoded::Serializer::new(String::new()) .extend_pairs(&[ - ("client_id", &self.application_secret.client_id), + ("client_id", application_secret.client_id.clone()), ( "scope", - &scopes + scopes .into_iter() - .map(|s| s.as_ref()) - .intersperse(" ") + .intersperse(" ".to_string()) .collect::(), ), ]) @@ -108,54 +200,60 @@ where // note: works around bug in rustlang // https://github.com/rust-lang/rust/issues/22252 - let request = hyper::Request::post(&self.device_code_url) + let request = hyper::Request::post(device_code_url) .header(header::CONTENT_TYPE, "application/x-www-form-urlencoded") - .body(hyper::Body::from(req))?; + .body(hyper::Body::from(req)) + .into_future(); + request + .then( + move |request: Result, http::Error>| { + let request = request.unwrap(); + client.request(request) + }, + ) + .then( + |r: Result, hyper::error::Error>| { + match r { + Err(err) => { + return Err(RequestError::ClientError(err)); + } + Ok(res) => { + #[derive(Deserialize)] + struct JsonData { + device_code: String, + user_code: String, + verification_url: String, + expires_in: i64, + interval: i64, + } - // TODO: move the ? on request - let ret = match self.client.request(request).wait() { - Err(err) => { - return Err(RequestError::ClientError(err)); // TODO: failed here - } - Ok(res) => { - #[derive(Deserialize)] - struct JsonData { - device_code: String, - user_code: String, - verification_url: String, - expires_in: i64, - interval: i64, - } + let json_str: String = res + .into_body() + .concat2() + .wait() + .map(|c| String::from_utf8(c.into_bytes().to_vec()).unwrap()) + .unwrap(); // TODO: error handling - let json_str: String = res - .into_body() - .concat2() - .wait() - .map(|c| String::from_utf8(c.into_bytes().to_vec()).unwrap()) - .unwrap(); // TODO: error handling + // check for error + match json::from_str::(&json_str) { + Err(_) => {} // ignore, move on + Ok(res) => return Err(RequestError::from(res)), + } - // check for error - match json::from_str::(&json_str) { - Err(_) => {} // ignore, move on - Ok(res) => return Err(RequestError::from(res)), - } + let decoded: JsonData = json::from_str(&json_str).unwrap(); - let decoded: JsonData = json::from_str(&json_str).unwrap(); - - self.device_code = decoded.device_code; - let pi = PollInformation { - user_code: decoded.user_code, - verification_url: decoded.verification_url, - expires_at: Utc::now() + chrono::Duration::seconds(decoded.expires_in), - interval: Duration::from_secs(i64::abs(decoded.interval) as u64), - }; - self.state = Some(DeviceFlowState::Pending(pi.clone())); - - Ok(pi) - } - }; - - ret + let pi = PollInformation { + user_code: decoded.user_code, + verification_url: decoded.verification_url, + expires_at: Utc::now() + + chrono::Duration::seconds(decoded.expires_in), + interval: Duration::from_secs(i64::abs(decoded.interval) as u64), + }; + Ok((pi, decoded.device_code)) + } + } + }, + ) } /// If the first call is successful, this method may be called. @@ -168,150 +266,216 @@ where /// /// Do not call after `PollError::Expired|PollError::AccessDenied` was among the /// `Err(PollError)` variants as the flow will not do anything anymore. - /// Thus in any unsuccessful case which is not `PollError::HttpError`, you will have to start /// over the entire flow, which requires a new instance of this type. + /// Thus in any unsuccessful case which is not `PollError::HttpError`, you will have to start + /// over the entire flow, which requires a new instance of this type. /// /// > ⚠️ **Warning**: We assume the caller doesn't call faster than `interval` and are not /// > protected against this kind of mis-use. /// /// # Examples /// See test-cases in source code for a more complete example. - pub fn poll_token(&mut self) -> Result, &PollError> { - // clone, as we may re-assign our state later - let pi = match self.state { - Some(ref s) => match *s { - DeviceFlowState::Pending(ref pi) => pi.clone(), - DeviceFlowState::Error => return Err(self.error.as_ref().unwrap()), - DeviceFlowState::Success(ref t) => return Ok(Some(t.clone())), - }, - _ => panic!("You have to call request_code() beforehand"), + fn poll_token<'a>( + application_secret: ApplicationSecret, + client: hyper::Client, + device_code: String, + pi: PollInformation, + mut fd: FD, + ) -> impl Future, Error = PollError> { + let expired = if pi.expires_at <= Utc::now() { + fd.expired(&pi.expires_at); + Err(PollError::Expired(pi.expires_at)).into_future() + } else { + Ok(()).into_future() }; - if pi.expires_at <= Utc::now() { - self.error = Some(PollError::Expired(pi.expires_at)); - self.state = Some(DeviceFlowState::Error); - return Err(&self.error.as_ref().unwrap()); - } - // We should be ready for a new request let req = form_urlencoded::Serializer::new(String::new()) .extend_pairs(&[ - ("client_id", &self.application_secret.client_id[..]), - ("client_secret", &self.application_secret.client_secret), - ("code", &self.device_code), + ("client_id", &application_secret.client_id[..]), + ("client_secret", &application_secret.client_secret), + ("code", &device_code), ("grant_type", "http://oauth.net/grant_type/device/1.0"), ]) .finish(); - let request = hyper::Request::post(&self.application_secret.token_uri) + let request = hyper::Request::post(&application_secret.token_uri) .header(header::CONTENT_TYPE, "application/x-www-form-urlencoded") .body(hyper::Body::from(req)) .unwrap(); // TODO: Error checking - let json_str: String = match self.client.request(request).wait() { - Err(err) => { - self.error = Some(PollError::HttpError(err)); - return Err(self.error.as_ref().unwrap()); - } - Ok(res) => { + expired + .and_then(move |_| client.request(request).map_err(|e| PollError::HttpError(e))) + .map(|res| { res.into_body() .concat2() .wait() .map(|c| String::from_utf8(c.into_bytes().to_vec()).unwrap()) .unwrap() // TODO: error handling - } - }; + }) + .and_then(move |json_str: String| { + #[derive(Deserialize)] + struct JsonError { + error: String, + } - #[derive(Deserialize)] - struct JsonError { - error: String, - } - - match json::from_str::(&json_str) { - Err(_) => {} // ignore, move on, it's not an error - Ok(res) => { - match res.error.as_ref() { - "access_denied" => { - self.error = Some(PollError::AccessDenied); - self.state = Some(DeviceFlowState::Error); - return Err(self.error.as_ref().unwrap()); + match json::from_str::(&json_str) { + Err(_) => {} // ignore, move on, it's not an error + Ok(res) => { + match res.error.as_ref() { + "access_denied" => { + fd.denied(); + return Err(PollError::AccessDenied); + } + "authorization_pending" => return Ok(None), + s => { + return Err(PollError::Other(format!( + "server message '{}' not understood", + s + ))) + } + }; } - "authorization_pending" => return Ok(None), - _ => panic!("server message '{}' not understood", res.error), - }; - } - } + } - // yes, we expect that ! - let mut t: Token = json::from_str(&json_str).unwrap(); - t.set_expiry_absolute(); + // yes, we expect that ! + let mut t: Token = json::from_str(&json_str).unwrap(); + t.set_expiry_absolute(); - let res = Ok(Some(t.clone())); - self.state = Some(DeviceFlowState::Success(t)); - return res; + Ok(Some(t.clone())) + }) } } #[cfg(test)] -pub mod tests { +mod tests { + use hyper; + use hyper_tls::HttpsConnector; + use mockito; + use tokio; + use super::*; - - mock_connector_in_order!(MockGoogleAuth { - "HTTP/1.1 200 OK\r\n\ - Server: BOGUS\r\n\ - \r\n\ - {\r\n\ - \"device_code\" : \"4/L9fTtLrhY96442SEuf1Rl3KLFg3y\",\r\n\ - \"user_code\" : \"a9xfwk9c\",\r\n\ - \"verification_url\" : \"http://www.google.com/device\",\r\n\ - \"expires_in\" : 1800,\r\n\ - \"interval\" : 0\r\n\ - }" - "HTTP/1.1 200 OK\r\n\ - Server: BOGUS\r\n\ - \r\n\ - {\r\n\ - \"error\" : \"authorization_pending\"\r\n\ - }" - "HTTP/1.1 200 OK\r\nServer: \ - BOGUS\r\n\r\n{\r\n\"access_token\":\"1/fFAGRNJru1FTz70BzhT3Zg\",\ - \r\n\"expires_in\":3920,\r\n\"token_type\":\"Bearer\",\ - \r\n\"refresh_token\":\ - \"1/6BMfW9j53gdGImsixUH6kU5RsR4zwI9lUVX-tqf8JXQ\"\r\n}" - }); - - const TEST_APP_SECRET: &'static str = r#"{"installed":{"client_id":"384278056379-tr5pbot1mil66749n639jo54i4840u77.apps.googleusercontent.com","project_id":"sanguine-rhythm-105020","auth_uri":"https://accounts.google.com/o/oauth2/auth","token_uri":"https://accounts.google.com/o/oauth2/token","auth_provider_x509_cert_url":"https://www.googleapis.com/oauth2/v1/certs","client_secret":"QeQUnhzsiO4t--ZGmj9muUAu","redirect_uris":["urn:ietf:wg:oauth:2.0:oob","http://localhost"]}}"#; + use crate::helper::parse_application_secret; #[test] - fn working_flow() { - use crate::helper::parse_application_secret; - - let runtime = tokio::runtime::Runtime::new().unwrap(); - let appsecret = parse_application_secret(&TEST_APP_SECRET.to_string()).unwrap(); - let client = hyper::Client::builder() - .executor(runtime.executor()) - .build(MockGoogleAuth::default()); - - let mut flow = DeviceFlow::new(client, &appsecret, GOOGLE_DEVICE_CODE_URL); - - match flow.request_code(&["https://www.googleapis.com/auth/youtube.upload"]) { - Ok(pi) => assert_eq!(pi.interval, Duration::from_secs(0)), - Err(err) => assert!(false, "request_code failed: {}", err), - } - - match flow.poll_token() { - Ok(None) => {} - _ => unreachable!(), - } - - let t = match flow.poll_token() { - Ok(Some(t)) => { - assert_eq!(t.access_token, "1/fFAGRNJru1FTz70BzhT3Zg"); - t + fn test_device_end2end() { + #[derive(Clone)] + struct FD; + impl FlowDelegate for FD { + fn present_user_code(&mut self, pi: &PollInformation) { + assert_eq!("https://example.com/verify", pi.verification_url); } - _ => unreachable!(), - }; + } - // from now on, all calls will yield the same result - // As our mock has only 3 items, we would panic on this call - assert_eq!(flow.poll_token().unwrap(), Some(t)); + let server_url = mockito::server_url(); + let app_secret = r#"{"installed":{"client_id":"902216714886-k2v9uei3p1dk6h686jbsn9mo96tnbvto.apps.googleusercontent.com","project_id":"yup-test-243420","auth_uri":"https://accounts.google.com/o/oauth2/auth","token_uri":"https://oauth2.googleapis.com/token","auth_provider_x509_cert_url":"https://www.googleapis.com/oauth2/v1/certs","client_secret":"iuMPN6Ne1PD7cos29Tk9rlqH","redirect_uris":["urn:ietf:wg:oauth:2.0:oob","http://localhost"]}}"#; + let mut app_secret = parse_application_secret(app_secret).unwrap(); + app_secret.token_uri = format!("{}/token", server_url); + let device_code_url = format!("{}/code", server_url); + + let https = HttpsConnector::new(1).expect("tls"); + let client = hyper::Client::builder() + .keep_alive(false) + .build::<_, hyper::Body>(https); + + let mut flow = DeviceFlow::new(client.clone(), app_secret, FD, Some(device_code_url)); + + let mut rt = tokio::runtime::Builder::new() + .core_threads(1) + .panic_handler(|e| std::panic::resume_unwind(e)) + .build() + .unwrap(); + + // Successful path + { + let code_response = r#"{"device_code": "devicecode", "user_code": "usercode", "verification_url": "https://example.com/verify", "expires_in": 1234567, "interval": 1}"#; + let _m = mockito::mock("POST", "/code") + .match_body(mockito::Matcher::Regex( + ".*client_id=902216714886.*".to_string(), + )) + .with_status(200) + .with_body(code_response) + .create(); + let token_response = r#"{"access_token": "accesstoken", "refresh_token": "refreshtoken", "token_type": "Bearer", "expires_in": 1234567}"#; + let _m = mockito::mock("POST", "/token") + .match_body(mockito::Matcher::Regex( + ".*client_secret=iuMPN6Ne1PD7cos29Tk9rlqH&code=devicecode.*".to_string(), + )) + .with_status(200) + .with_body(token_response) + .create(); + + let fut = flow + .token(vec!["https://www.googleapis.com/scope/1"].iter()) + .then(|token| { + let token = token.unwrap(); + assert_eq!("accesstoken", token.access_token); + Ok(()) as Result<(), ()> + }); + rt.block_on(fut).expect("block_on"); + + _m.assert(); + } + // Code is not delivered. + { + let code_response = + r#"{"error": "invalid_client_id", "error_description": "description"}"#; + let _m = mockito::mock("POST", "/code") + .match_body(mockito::Matcher::Regex( + ".*client_id=902216714886.*".to_string(), + )) + .with_status(400) + .with_body(code_response) + .create(); + let token_response = r#"{"access_token": "accesstoken", "refresh_token": "refreshtoken", "token_type": "Bearer", "expires_in": 1234567}"#; + let _m = mockito::mock("POST", "/token") + .match_body(mockito::Matcher::Regex( + ".*client_secret=iuMPN6Ne1PD7cos29Tk9rlqH&code=devicecode.*".to_string(), + )) + .with_status(200) + .with_body(token_response) + .expect(0) // Never called! + .create(); + + let fut = flow + .token(vec!["https://www.googleapis.com/scope/1"].iter()) + .then(|token| { + assert!(token.is_err()); + assert!(format!("{}", token.unwrap_err()).contains("invalid_client_id")); + Ok(()) as Result<(), ()> + }); + rt.block_on(fut).expect("block_on"); + + _m.assert(); + } + // Token is not delivered. + { + let code_response = r#"{"device_code": "devicecode", "user_code": "usercode", "verification_url": "https://example.com/verify", "expires_in": 1234567, "interval": 1}"#; + let _m = mockito::mock("POST", "/code") + .match_body(mockito::Matcher::Regex( + ".*client_id=902216714886.*".to_string(), + )) + .with_status(200) + .with_body(code_response) + .create(); + let token_response = r#"{"error": "access_denied"}"#; + let _m = mockito::mock("POST", "/token") + .match_body(mockito::Matcher::Regex( + ".*client_secret=iuMPN6Ne1PD7cos29Tk9rlqH&code=devicecode.*".to_string(), + )) + .with_status(400) + .with_body(token_response) + .expect(1) + .create(); + + let fut = flow + .token(vec!["https://www.googleapis.com/scope/1"].iter()) + .then(|token| { + assert!(token.is_err()); + assert!(format!("{}", token.unwrap_err()).contains("Access denied by user")); + Ok(()) as Result<(), ()> + }); + rt.block_on(fut).expect("block_on"); + + _m.assert(); + } } } diff --git a/src/installed.rs b/src/installed.rs index 8f5db98..d734887 100644 --- a/src/installed.rs +++ b/src/installed.rs @@ -3,22 +3,18 @@ // Refer to the project root for licensing information. // use std::convert::AsRef; -use std::error::Error; -use std::io; use std::sync::{Arc, Mutex}; -use futures; +use futures::prelude::*; use futures::stream::Stream; use futures::sync::oneshot; -use futures::Future; use hyper; use hyper::{header, StatusCode, Uri}; -use serde_json::error; use url::form_urlencoded; use url::percent_encoding::{percent_encode, QUERY_ENCODE_SET}; -use crate::authenticator_delegate::AuthenticatorDelegate; -use crate::types::{ApplicationSecret, Token}; +use crate::authenticator_delegate::FlowDelegate; +use crate::types::{ApplicationSecret, GetToken, RequestError, Token}; const OOB_REDIRECT_URI: &'static str = "urn:ietf:wg:oauth:2.0:oob"; @@ -47,6 +43,7 @@ where url.push_str(auth_uri); vec![ format!("?scope={}", scopes_string), + format!("&access_type=offline"), format!( "&redirect_uri={}", redirect_uri.unwrap_or(OOB_REDIRECT_URI.to_string()) @@ -61,9 +58,37 @@ where }) } -pub struct InstalledFlow { - client: hyper::Client, - server: Option, +impl + GetToken for InstalledFlow +{ + fn token<'b, I, T>( + &mut self, + scopes: I, + ) -> Box + Send> + where + T: AsRef + Ord + 'b, + I: Iterator, + { + Box::new(self.obtain_token(scopes.into_iter().map(|s| s.as_ref().to_string()).collect())) + } + fn api_key(&mut self) -> Option { + None + } + fn application_secret(&self) -> ApplicationSecret { + self.appsecret.clone() + } +} + +/// InstalledFlow provides tokens for services that follow the "Installed" OAuth flow. (See +/// https://www.oauth.com/oauth2-servers/authorization/, +/// https://developers.google.com/identity/protocols/OAuth2InstalledApp). You should use it wrapped +/// inside an `Authenticator` to benefit from refreshing tokens and caching previously obtained +/// authorization. +pub struct InstalledFlow { + method: InstalledFlowReturnMethod, + client: hyper::client::Client, + fd: FD, + appsecret: ApplicationSecret, } /// cf. https://developers.google.com/identity/protocols/OAuth2InstalledApp#choosingredirecturi @@ -77,153 +102,193 @@ pub enum InstalledFlowReturnMethod { HTTPRedirect(u16), } -impl InstalledFlow -where - C: hyper::client::connect::Connect, +impl<'c, FD: 'static + FlowDelegate + Clone + Send, C: 'c + hyper::client::connect::Connect> + InstalledFlow { /// Starts a new Installed App auth flow. /// If HTTPRedirect is chosen as method and the server can't be started, the flow falls /// back to Interactive. pub fn new( - client: hyper::Client, - method: Option, - ) -> InstalledFlow { - let default = InstalledFlow { + client: hyper::client::Client, + fd: FD, + secret: ApplicationSecret, + method: InstalledFlowReturnMethod, + ) -> InstalledFlow { + InstalledFlow { + method: method, + fd: fd, + appsecret: secret, client: client, - server: None, - }; - match method { - None => default, - Some(InstalledFlowReturnMethod::Interactive) => default, - // Start server on localhost to accept auth code. - Some(InstalledFlowReturnMethod::HTTPRedirect(port)) => { - match InstalledFlowServer::new(port) { - Result::Err(_) => default, - Result::Ok(server) => InstalledFlow { - client: default.client, - server: Some(server), - }, - } - } } } /// Handles the token request flow; it consists of the following steps: - /// . Obtain a auhorization code with user cooperation or internal redirect. + /// . Obtain a authorization code with user cooperation or internal redirect. /// . Obtain a token and refresh token using that code. /// . Return that token /// - /// It's recommended not to use the DefaultAuthenticatorDelegate, but a specialized one. - pub fn obtain_token<'a, AD: AuthenticatorDelegate, S, T>( + /// It's recommended not to use the DefaultFlowDelegate, but a specialized one. + pub fn obtain_token<'a>( &mut self, - auth_delegate: &mut AD, + scopes: Vec, // Note: I haven't found a better way to give a list of strings here, due to ownership issues with futures. + ) -> impl 'a + Future + Send { + let rduri = self.fd.redirect_uri(); + // Start server on localhost to accept auth code. + let server = if let InstalledFlowReturnMethod::HTTPRedirect(port) = self.method { + match InstalledFlowServer::new(port) { + Result::Err(e) => Err(RequestError::ClientError(e)), + Result::Ok(server) => Ok(Some(server)), + } + } else { + Ok(None) + }; + let port = if let Ok(Some(ref srv)) = server { + Some(srv.port) + } else { + None + }; + let client = self.client.clone(); + let (appsecclone, appsecclone2) = (self.appsecret.clone(), self.appsecret.clone()); + let auth_delegate = self.fd.clone(); + server + .into_future() + // First: Obtain authorization code from user. + .and_then(move |server| { + Self::ask_authorization_code(server, auth_delegate, &appsecclone, scopes.iter()) + }) + // Exchange the authorization code provided by Google/the provider for a refresh and an + // access token. + .and_then(move |authcode| { + let request = Self::request_token(appsecclone2, authcode, rduri, port); + let result = client.request(request); + // Handle result here, it makes ownership tracking easier. + result + .and_then(move |r| { + r.into_body() + .concat2() + .map(|c| String::from_utf8(c.into_bytes().to_vec()).unwrap()) // TODO: error handling + }) + .then(|body_or| { + let resp = match body_or { + Err(e) => return Err(RequestError::ClientError(e)), + Ok(s) => s, + }; + + let token_resp: Result = + serde_json::from_str(&resp); + + match token_resp { + Err(e) => { + return Err(RequestError::JSONError(e)); + } + Ok(tok) => { + if tok.error.is_some() { + Err(RequestError::NegativeServerResponse( + tok.error.unwrap(), + tok.error_description, + )) + } else { + Ok(tok) + } + } + } + }) + }) + // Return the combined token. + .and_then(|tokens| { + // Successful response + if tokens.access_token.is_some() { + let mut token = Token { + access_token: tokens.access_token.unwrap(), + refresh_token: tokens.refresh_token.unwrap(), + token_type: tokens.token_type.unwrap(), + expires_in: tokens.expires_in, + expires_in_timestamp: None, + }; + + token.set_expiry_absolute(); + Ok(token) + } else { + Err(RequestError::NegativeServerResponse( + tokens.error.unwrap(), + tokens.error_description, + )) + } + }) + } + + fn ask_authorization_code<'a, S, T>( + server: Option, + mut auth_delegate: FD, appsecret: &ApplicationSecret, scopes: S, - ) -> Result> + ) -> Box + Send> where T: AsRef + 'a, S: Iterator, { - let authcode = self.get_authorization_code(auth_delegate, &appsecret, scopes)?; - let tokens = self.request_token(&appsecret, &authcode, auth_delegate.redirect_uri())?; - - // Successful response - if tokens.access_token.is_some() { - let mut token = Token { - access_token: tokens.access_token.unwrap(), - refresh_token: tokens.refresh_token.unwrap(), - token_type: tokens.token_type.unwrap(), - expires_in: tokens.expires_in, - expires_in_timestamp: None, - }; - - token.set_expiry_absolute(); - Result::Ok(token) - } else { - let err = io::Error::new( - io::ErrorKind::Other, - format!( - "Token API error: {} {}", - tokens.error.unwrap_or("".to_string()), - tokens.error_description.unwrap_or("".to_string()) - ) - .as_str(), + if server.is_none() { + let url = build_authentication_request_url( + &appsecret.auth_uri, + &appsecret.client_id, + scopes, + auth_delegate.redirect_uri(), ); - Result::Err(Box::new(err)) + Box::new( + auth_delegate + .present_user_url(&url, true /* need_code */) + .then(|r| { + match r { + Ok(Some(mut code)) => { + // Partial backwards compatibilty in case an implementation adds a new line + // due to previous behaviour. + let ends_with_newline = + code.chars().last().map(|c| c == '\n').unwrap_or(false); + if ends_with_newline { + code.pop(); + } + Ok(code) + } + _ => Err(RequestError::UserError("couldn't read code".to_string())), + } + }), + ) + } else { + let mut server = server.unwrap(); + // The redirect URI must be this very localhost URL, otherwise authorization is refused + // by certain providers. + let url = build_authentication_request_url( + &appsecret.auth_uri, + &appsecret.client_id, + scopes, + auth_delegate + .redirect_uri() + .or_else(|| Some(format!("http://localhost:{}", server.port))), + ); + Box::new( + auth_delegate + .present_user_url(&url, false /* need_code */) + .then(move |_| server.block_till_auth()) + .map_err(|e| { + RequestError::UserError(format!( + "could not obtain token via redirect: {}", + e + )) + }), + ) } } - /// Obtains an authorization code either interactively or via HTTP redirect (see - /// InstalledFlowReturnMethod). - fn get_authorization_code<'a, AD: AuthenticatorDelegate, S, T>( - &mut self, - auth_delegate: &mut AD, - appsecret: &ApplicationSecret, - scopes: S, - ) -> Result> - where - T: AsRef + 'a, - S: Iterator, - { - let server = self.server.take(); // Will shutdown the server if present when goes out of scope - let result: Result> = match server { - None => { - let url = build_authentication_request_url( - &appsecret.auth_uri, - &appsecret.client_id, - scopes, - auth_delegate.redirect_uri(), - ); - match auth_delegate.present_user_url(&url, true /* need_code */) { - None => Result::Err(Box::new(io::Error::new( - io::ErrorKind::UnexpectedEof, - "couldn't read code", - ))), - Some(mut code) => { - // Partial backwards compatibilty in case an implementation adds a new line - // due to previous behaviour. - let ends_with_newline = - code.chars().last().map(|c| c == '\n').unwrap_or(false); - if ends_with_newline { - code.pop(); - } - Result::Ok(code) - } - } - } - Some(mut server) => { - // The redirect URI must be this very localhost URL, otherwise Google refuses - // authorization. - let url = build_authentication_request_url( - &appsecret.auth_uri, - &appsecret.client_id, - scopes, - auth_delegate - .redirect_uri() - .or_else(|| Some(format!("http://localhost:{}", server.port))), - ); - auth_delegate.present_user_url(&url, false /* need_code */); - - match server.block_till_auth() { - Result::Err(e) => Result::Err(Box::new(e)), - Result::Ok(s) => Result::Ok(s), - } - } - }; - - result - } - /// Sends the authorization code to the provider in order to obtain access and refresh tokens. - fn request_token( - &self, - appsecret: &ApplicationSecret, - authcode: &str, + fn request_token<'a>( + appsecret: ApplicationSecret, + authcode: String, custom_redirect_uri: Option, - ) -> Result> { - let redirect_uri = custom_redirect_uri.unwrap_or_else(|| match &self.server { + port: Option, + ) -> hyper::Request { + let redirect_uri = custom_redirect_uri.unwrap_or_else(|| match port { None => OOB_REDIRECT_URI.to_string(), - Some(server) => format!("http://localhost:{}", server.port), + Some(port) => format!("http://localhost:{}", port), }); let body = form_urlencoded::Serializer::new(String::new()) @@ -236,35 +301,11 @@ where ]) .finish(); - let request = hyper::Request::post(&appsecret.token_uri) + let request = hyper::Request::post(appsecret.token_uri) .header(header::CONTENT_TYPE, "application/x-www-form-urlencoded") .body(hyper::Body::from(body)) .unwrap(); // TODO: error check - - let result = self.client.request(request).wait(); - - let resp = match result { - Result::Err(e) => return Result::Err(Box::new(e)), - Result::Ok(res) => { - let result = res - .into_body() - .concat2() - .wait() - .map(|c| String::from_utf8(c.into_bytes().to_vec()).unwrap()); // TODO: error handling - - match result { - Result::Err(e) => return Result::Err(Box::new(e)), - Result::Ok(s) => s, - } - } - }; - - let token_resp: Result = serde_json::from_str(&resp); - - match token_resp { - Result::Err(e) => return Result::Err(Box::new(e)), - Result::Ok(tok) => Result::Ok(tok) as Result>, - } + request } } @@ -287,37 +328,32 @@ struct InstalledFlowServer { } impl InstalledFlowServer { - fn new(port: u16) -> Result { - let bound_port = hyper::server::conn::AddrIncoming::bind(&([127, 0, 0, 1], port).into()); - match bound_port { - Result::Err(_) => Result::Err(()), - Result::Ok(bound_port) => { - let port = bound_port.local_addr().port(); + fn new(port: u16) -> Result { + let (auth_code_tx, auth_code_rx) = oneshot::channel::(); + let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); - let (auth_code_tx, auth_code_rx) = oneshot::channel::(); - let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); + let threadpool = tokio_threadpool::Builder::new() + .pool_size(1) + .name_prefix("InstalledFlowServer-") + .build(); + let service_maker = InstalledFlowServiceMaker::new(auth_code_tx); - let threadpool = tokio_threadpool::Builder::new() - .pool_size(1) - .name_prefix("InstalledFlowServer-") - .build(); - let service_maker = InstalledFlowServiceMaker::new(auth_code_tx); - let server = hyper::server::Server::builder(bound_port) - .http1_only(true) - .serve(service_maker) - .with_graceful_shutdown(shutdown_rx) - .map_err(|err| panic!("Failed badly: {}", err)); // TODO: Error handling + let addr = format!("127.0.0.1:{}", port); + let builder = hyper::server::Server::try_bind(&addr.parse().unwrap())?; + let server = builder.http1_only(true).serve(service_maker); + let port = server.local_addr().port(); + let server_future = server + .with_graceful_shutdown(shutdown_rx) + .map_err(|err| panic!("Failed badly: {}", err)); - threadpool.spawn(server); + threadpool.spawn(server_future); - Result::Ok(InstalledFlowServer { - port, - shutdown_tx: Option::Some(shutdown_tx), - auth_code_rx: Option::Some(auth_code_rx), - threadpool: Option::Some(threadpool), - }) - } - } + Result::Ok(InstalledFlowServer { + port: port, + shutdown_tx: Some(shutdown_tx), + auth_code_rx: Some(auth_code_rx), + threadpool: Some(threadpool), + }) } fn block_till_auth(&mut self) -> Result { @@ -466,9 +502,9 @@ impl hyper::service::Service for InstalledFlowService { impl InstalledFlowService { fn handle_url(&mut self, url: hyper::Uri) { - // Google redirects to the specified localhost URL, appending the authorization + // The provider redirects to the specified localhost URL, appending the authorization // code, like this: http://localhost:8080/xyz/?code=4/731fJ3BheyCouCniPufAd280GHNV5Ju35yYcGs - // We take that code and send it to the get_authorization_code() function that + // We take that code and send it to the ask_authorization_code() function that // waits for it. for (param, val) in form_urlencoded::parse(url.query().unwrap_or("").as_bytes()) { if param == "code".to_string() { @@ -489,13 +525,173 @@ impl InstalledFlowService { #[cfg(test)] mod tests { + use std::error::Error; + use std::fmt; + use std::str::FromStr; + + use hyper; + use hyper::client::connect::HttpConnector; + use hyper_tls::HttpsConnector; + use mockito::{self, mock}; + use tokio; + use super::*; + use crate::authenticator_delegate::FlowDelegate; + use crate::helper::*; + use crate::types::StringError; + + #[test] + fn test_end2end() { + #[derive(Clone)] + struct FD( + String, + hyper::Client, hyper::Body>, + ); + impl FlowDelegate for FD { + /// Depending on need_code, return the pre-set code or send the code to the server at + /// the redirect_uri given in the url. + fn present_user_url + fmt::Display>( + &mut self, + url: S, + need_code: bool, + ) -> Box, Error = Box> + Send> + { + if need_code { + Box::new(Ok(Some(self.0.clone())).into_future()) + } else { + // Parse presented url to obtain redirect_uri with location of local + // code-accepting server. + let uri = Uri::from_str(url.as_ref()).unwrap(); + let query = uri.query().unwrap(); + let parsed = form_urlencoded::parse(query.as_bytes()).into_owned(); + let mut rduri = None; + for (k, v) in parsed { + if k == "redirect_uri" { + rduri = Some(v); + break; + } + } + if rduri.is_none() { + return Box::new( + Err(Box::new(StringError::new("no redirect uri!", None)) + as Box) + .into_future(), + ); + } + let mut rduri = rduri.unwrap(); + rduri.push_str(&format!("?code={}", self.0)); + let rduri = Uri::from_str(rduri.as_ref()).unwrap(); + // Hit server. + return Box::new( + self.1 + .get(rduri) + .map_err(|e| Box::new(e) as Box) + .map(|_| None), + ); + } + } + } + + let server_url = mockito::server_url(); + let app_secret = r#"{"installed":{"client_id":"902216714886-k2v9uei3p1dk6h686jbsn9mo96tnbvto.apps.googleusercontent.com","project_id":"yup-test-243420","auth_uri":"https://accounts.google.com/o/oauth2/auth","token_uri":"https://oauth2.googleapis.com/token","auth_provider_x509_cert_url":"https://www.googleapis.com/oauth2/v1/certs","client_secret":"iuMPN6Ne1PD7cos29Tk9rlqH","redirect_uris":["urn:ietf:wg:oauth:2.0:oob","http://localhost"]}}"#; + let mut app_secret = parse_application_secret(app_secret).unwrap(); + app_secret.token_uri = format!("{}/token", server_url); + + let https = HttpsConnector::new(1).expect("tls"); + let client = hyper::Client::builder() + .keep_alive(false) + .build::<_, hyper::Body>(https); + + let fd = FD("authorizationcode".to_string(), client.clone()); + let mut inf = InstalledFlow::new( + client.clone(), + fd, + app_secret.clone(), + InstalledFlowReturnMethod::Interactive, + ); + + let mut rt = tokio::runtime::Builder::new() + .core_threads(1) + .panic_handler(|e| std::panic::resume_unwind(e)) + .build() + .unwrap(); + + // Successful path. + { + let _m = mock("POST", "/token") + .match_body(mockito::Matcher::Regex(".*code=authorizationcode.*client_id=9022167.*".to_string())) + .with_body(r#"{"access_token": "accesstoken", "refresh_token": "refreshtoken", "token_type": "Bearer", "expires_in": 12345678}"#) + .expect(1) + .create(); + + let fut = inf + .token(vec!["https://googleapis.com/some/scope"].iter()) + .and_then(|tok| { + assert_eq!("accesstoken", tok.access_token); + assert_eq!("refreshtoken", tok.refresh_token); + assert_eq!("Bearer", tok.token_type); + Ok(()) + }); + rt.block_on(fut).expect("block on"); + _m.assert(); + } + // Successful path with HTTP redirect. + { + let mut inf = InstalledFlow::new( + client.clone(), + FD( + "authorizationcodefromlocalserver".to_string(), + client.clone(), + ), + app_secret, + InstalledFlowReturnMethod::HTTPRedirect(8081), + ); + let _m = mock("POST", "/token") + .match_body(mockito::Matcher::Regex(".*code=authorizationcodefromlocalserver.*client_id=9022167.*".to_string())) + .with_body(r#"{"access_token": "accesstoken", "refresh_token": "refreshtoken", "token_type": "Bearer", "expires_in": 12345678}"#) + .expect(1) + .create(); + + let fut = inf + .token(vec!["https://googleapis.com/some/scope"].iter()) + .and_then(|tok| { + assert_eq!("accesstoken", tok.access_token); + assert_eq!("refreshtoken", tok.refresh_token); + assert_eq!("Bearer", tok.token_type); + Ok(()) + }); + rt.block_on(fut).expect("block on"); + _m.assert(); + } + // Error from server. + { + let _m = mock("POST", "/token") + .match_body(mockito::Matcher::Regex( + ".*code=authorizationcode.*client_id=9022167.*".to_string(), + )) + .with_status(400) + .with_body(r#"{"error": "invalid_code"}"#) + .expect(1) + .create(); + + let fut = inf + .token(vec!["https://googleapis.com/some/scope"].iter()) + .then(|tokr| { + assert!(tokr.is_err()); + assert!(format!("{}", tokr.unwrap_err()).contains("invalid_code")); + Ok(()) as Result<(), ()> + }); + rt.block_on(fut).expect("block on"); + _m.assert(); + } + rt.shutdown_on_idle().wait().expect("shutdown"); + } #[test] fn test_request_url_builder() { assert_eq!( "https://accounts.google.\ - com/o/oauth2/auth?scope=email%20profile&redirect_uri=urn:ietf:wg:oauth:2.0:\ + com/o/oauth2/auth?scope=email%20profile&access_type=offline&redirect_uri=urn:ietf:wg:oauth:2.0:\ oob&response_type=code&client_id=812741506391-h38jh0j4fv0ce1krdkiq0hfvt6n5amr\ f.apps.googleusercontent.com", build_authentication_request_url( diff --git a/src/lib.rs b/src/lib.rs index 12d5e46..26b59cb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,41 +34,74 @@ //! The returned `Token` is stored permanently in the given token storage in order to //! authorize future API requests to the same scopes. //! +//! The following example, which is derived from the (actual and runnable) example in +//! `examples/test-installed/`, shows the basics of using this crate: +//! //! ```test_harness,no_run -//! #[macro_use] -//! extern crate serde_derive; +//! use futures::prelude::*; +//! use yup_oauth2::GetToken; +//! use yup_oauth2::{Authenticator, InstalledFlow}; //! -//! use yup_oauth2::{Authenticator, DefaultAuthenticatorDelegate, PollInformation, ConsoleApplicationSecret, MemoryStorage, GetToken}; -//! use serde_json as json; -//! use std::default::Default; -//! use hyper::Client; +//! use hyper::client::Client; //! use hyper_tls::HttpsConnector; -//! # const SECRET: &'static str = "{\"installed\":{\"auth_uri\":\"https://accounts.google.com/o/oauth2/auth\",\"client_secret\":\"UqkDJd5RFwnHoiG5x5Rub8SI\",\"token_uri\":\"https://accounts.google.com/o/oauth2/token\",\"client_email\":\"\",\"redirect_uris\":[\"urn:ietf:wg:oauth:2.0:oob\",\"oob\"],\"client_x509_cert_url\":\"\",\"client_id\":\"14070749909-vgip2f1okm7bkvajhi9jugan6126io9v.apps.googleusercontent.com\",\"auth_provider_x509_cert_url\":\"https://www.googleapis.com/oauth2/v1/certs\"}}"; //! -//! # #[test] fn device() { -//! let secret = json::from_str::(SECRET).unwrap().installed.unwrap(); -//! let res = Authenticator::new(&secret, DefaultAuthenticatorDelegate, -//! Client::builder().build(HttpsConnector::new(4).unwrap()), -//! ::default(), None) -//! .token(&["https://www.googleapis.com/auth/youtube.upload"]); -//! match res { -//! Ok(t) => { -//! // now you can use t.access_token to authenticate API calls within your -//! // given scopes. It will not be valid forever, but Authenticator will automatically -//! // refresh the token for you. -//! }, -//! Err(err) => println!("Failed to acquire token: {}", err), +//! use std::path::Path; +//! +//! fn main() { +//! // Boilerplate: Set up hyper HTTP client and TLS. +//! let https = HttpsConnector::new(1).expect("tls"); +//! let client = Client::builder() +//! .keep_alive(false) +//! .build::<_, hyper::Body>(https); +//! +//! // Read application secret from a file. Sometimes it's easier to compile it directly into +//! // the binary. The clientsecret file contains JSON like `{"installed":{"client_id": ... }}` +//! let secret = yup_oauth2::read_application_secret(Path::new("clientsecret.json")) +//! .expect("clientsecret.json"); +//! +//! // There are two types of delegates; FlowDelegate and AuthenticatorDelegate. See the +//! // respective documentation; all you need to know here is that they determine how the user +//! // is asked to visit the OAuth flow URL or how to read back the provided code. +//! let ad = yup_oauth2::DefaultFlowDelegate; +//! +//! // InstalledFlow handles OAuth flows of that type. They are usually the ones where a user +//! // grants access to their personal account (think Google Drive, Github API, etc.). +//! let inf = InstalledFlow::new( +//! client.clone(), +//! ad, +//! secret, +//! yup_oauth2::InstalledFlowReturnMethod::HTTPRedirect(8081), +//! ); +//! // You could already use InstalledFlow by itself, but usually you want to cache tokens and +//! // refresh them, rather than ask the user every time to log in again. Authenticator wraps +//! // other flows and handles these. +//! // This type of authenticator caches tokens in a JSON file on disk. +//! let mut auth = Authenticator::new_disk( +//! client, +//! inf, +//! yup_oauth2::DefaultAuthenticatorDelegate, +//! "tokencache.json", +//! ) +//! .unwrap(); +//! let s = "https://www.googleapis.com/auth/drive.file".to_string(); +//! let scopes = vec![s]; +//! +//! // token() is the one important function of this crate; it does everything to +//! // obtain a token that can be sent e.g. as Bearer token. +//! let tok = auth.token(scopes.iter()); +//! // Finally we print the token. +//! let fut = tok.map_err(|e| println!("error: {:?}", e)).and_then(|t| { +//! println!("The token is {:?}", t); +//! Ok(()) +//! }); +//! +//! tokio::run(fut) //! } -//! # } //! ``` //! #[macro_use] extern crate serde_derive; -#[cfg(test)] -#[macro_use] -extern crate yup_hyper_mock as hyper_mock; - mod authenticator; mod authenticator_delegate; mod device; @@ -79,16 +112,17 @@ mod service_account; mod storage; mod types; -pub use crate::authenticator::{Authenticator, GetToken, Retry}; +pub use crate::authenticator::Authenticator; pub use crate::authenticator_delegate::{ - AuthenticatorDelegate, DefaultAuthenticatorDelegate, PollError, PollInformation, + AuthenticatorDelegate, DefaultAuthenticatorDelegate, DefaultFlowDelegate, FlowDelegate, + PollInformation, }; pub use crate::device::{DeviceFlow, GOOGLE_DEVICE_CODE_URL}; pub use crate::helper::*; pub use crate::installed::{InstalledFlow, InstalledFlowReturnMethod}; -pub use crate::refresh::{RefreshFlow, RefreshResult}; pub use crate::service_account::*; pub use crate::storage::{DiskTokenStorage, MemoryStorage, NullStorage, TokenStorage}; pub use crate::types::{ - ApplicationSecret, ConsoleApplicationSecret, FlowType, Scheme, Token, TokenType, + ApplicationSecret, ConsoleApplicationSecret, FlowType, GetToken, PollError, RefreshResult, + RequestError, Scheme, Token, TokenType, }; diff --git a/src/refresh.rs b/src/refresh.rs index 2eef3f1..ac1fc72 100644 --- a/src/refresh.rs +++ b/src/refresh.rs @@ -1,4 +1,4 @@ -use crate::types::{ApplicationSecret, FlowType, JsonError}; +use crate::types::{ApplicationSecret, JsonError, RefreshResult, RequestError}; use super::Token; use chrono::Utc; @@ -14,34 +14,9 @@ use url::form_urlencoded; /// Refresh an expired access token, as obtained by any other authentication flow. /// This flow is useful when your `Token` is expired and allows to obtain a new /// and valid access token. -pub struct RefreshFlow { - client: hyper::Client, - result: RefreshResult, -} - -/// All possible outcomes of the refresh flow -pub enum RefreshResult { - // Indicates no attempt has been made to refresh yet - Uninitialized, - /// Indicates connection failure - Error(hyper::Error), - /// The server did not answer with a new token, providing the server message - RefreshError(String, Option), - /// The refresh operation finished successfully, providing a new `Token` - Success(Token), -} - -impl RefreshFlow -where - C: hyper::client::connect::Connect, -{ - pub fn new(client: hyper::Client) -> RefreshFlow { - RefreshFlow { - client: client, - result: RefreshResult::Uninitialized, - } - } +pub struct RefreshFlow; +impl RefreshFlow { /// Attempt to refresh the given token, and obtain a new, valid one. /// If the `RefreshResult` is `RefreshResult::Error`, you may retry within an interval /// of your choice. If it is `RefreshResult:RefreshError`, your refresh token is invalid @@ -56,121 +31,155 @@ where /// /// # Examples /// Please see the crate landing page for an example. - pub fn refresh_token( - &mut self, - _flow_type: FlowType, - client_secret: &ApplicationSecret, - refresh_token: &str, - ) -> &RefreshResult { - if let RefreshResult::Success(_) = self.result { - return &self.result; - } - + pub fn refresh_token<'a, C: 'static + hyper::client::connect::Connect>( + client: hyper::Client, + client_secret: ApplicationSecret, + refresh_token: String, + ) -> impl 'a + Future { let req = form_urlencoded::Serializer::new(String::new()) .extend_pairs(&[ - ("client_id", client_secret.client_id.as_ref()), - ("client_secret", client_secret.client_secret.as_ref()), - ("refresh_token", refresh_token), - ("grant_type", "refresh_token"), + ("client_id", client_secret.client_id.clone()), + ("client_secret", client_secret.client_secret.clone()), + ("refresh_token", refresh_token.to_string()), + ("grant_type", "refresh_token".to_string()), ]) .finish(); - let request = hyper::Request::post(&client_secret.token_uri) + let request = hyper::Request::post(client_secret.token_uri.clone()) .header(header::CONTENT_TYPE, "application/x-www-form-urlencoded") .body(hyper::Body::from(req)) .unwrap(); // TODO: error handling - let json_str: String = match self.client.request(request).wait() { - Err(err) => { - self.result = RefreshResult::Error(err); - return &self.result; - } - Ok(res) => { - res.into_body() - .concat2() - .wait() - .map(|c| String::from_utf8(c.into_bytes().to_vec()).unwrap()) - .unwrap() // TODO: error handling - } - }; + client + .request(request) + .then(|r| { + match r { + Err(err) => return Err(RefreshResult::Error(err)), + Ok(res) => { + Ok(res + .into_body() + .concat2() + .wait() + .map(|c| String::from_utf8(c.into_bytes().to_vec()).unwrap()) + .unwrap()) // TODO: error handling + } + } + }) + .then(move |maybe_json_str: Result| { + if let Err(e) = maybe_json_str { + return Ok(e); + } + let json_str = maybe_json_str.unwrap(); + #[derive(Deserialize)] + struct JsonToken { + access_token: String, + token_type: String, + expires_in: i64, + } - #[derive(Deserialize)] - struct JsonToken { - access_token: String, - token_type: String, - expires_in: i64, - } + match json::from_str::(&json_str) { + Err(_) => {} + Ok(res) => { + return Ok(RefreshResult::RefreshError( + res.error, + res.error_description, + )) + } + } - match json::from_str::(&json_str) { - Err(_) => {} - Ok(res) => { - self.result = RefreshResult::RefreshError(res.error, res.error_description); - return &self.result; - } - } - - let t: JsonToken = json::from_str(&json_str).unwrap(); - self.result = RefreshResult::Success(Token { - access_token: t.access_token, - token_type: t.token_type, - refresh_token: refresh_token.to_string(), - expires_in: None, - expires_in_timestamp: Some(Utc::now().timestamp() + t.expires_in), - }); - - &self.result + let t: JsonToken = json::from_str(&json_str).unwrap(); + Ok(RefreshResult::Success(Token { + access_token: t.access_token, + token_type: t.token_type, + refresh_token: refresh_token.to_string(), + expires_in: None, + expires_in_timestamp: Some(Utc::now().timestamp() + t.expires_in), + })) + }) + .map_err(RequestError::Refresh) } } #[cfg(test)] mod tests { use super::*; - use crate::device::GOOGLE_DEVICE_CODE_URL; - use crate::helper::parse_application_secret; + use crate::helper; - mock_connector!(MockGoogleRefresh { - "https://accounts.google.com" => - "HTTP/1.1 200 OK\r\n\ - Server: BOGUS\r\n\ - \r\n\ - {\r\n\ - \"access_token\":\"1/fFAGRNJru1FTz70BzhT3Zg\",\r\n\ - \"expires_in\":3920,\r\n\ - \"token_type\":\"Bearer\"\r\n\ - }" - }); - - const TEST_APP_SECRET: &'static str = r#"{"installed":{"client_id":"384278056379-tr5pbot1mil66749n639jo54i4840u77.apps.googleusercontent.com","project_id":"sanguine-rhythm-105020","auth_uri":"https://accounts.google.com/o/oauth2/auth","token_uri":"https://accounts.google.com/o/oauth2/token","auth_provider_x509_cert_url":"https://www.googleapis.com/oauth2/v1/certs","client_secret":"QeQUnhzsiO4t--ZGmj9muUAu","redirect_uris":["urn:ietf:wg:oauth:2.0:oob","http://localhost"]}}"#; + use hyper; + use hyper_tls::HttpsConnector; + use mockito; + use tokio; #[test] - fn refresh_flow() { - let appsecret = parse_application_secret(TEST_APP_SECRET).unwrap(); + fn test_refresh_end2end() { + let server_url = mockito::server_url(); - let runtime = tokio::runtime::Runtime::new().unwrap(); + let app_secret = r#"{"installed":{"client_id":"902216714886-k2v9uei3p1dk6h686jbsn9mo96tnbvto.apps.googleusercontent.com","project_id":"yup-test-243420","auth_uri":"https://accounts.google.com/o/oauth2/auth","token_uri":"https://oauth2.googleapis.com/token","auth_provider_x509_cert_url":"https://www.googleapis.com/oauth2/v1/certs","client_secret":"iuMPN6Ne1PD7cos29Tk9rlqH","redirect_uris":["urn:ietf:wg:oauth:2.0:oob","http://localhost"]}}"#; + let mut app_secret = helper::parse_application_secret(app_secret).unwrap(); + app_secret.token_uri = format!("{}/token", server_url); + let refresh_token = "my-refresh-token".to_string(); + + let https = HttpsConnector::new(1).unwrap(); let client = hyper::Client::builder() - .executor(runtime.executor()) - .build(MockGoogleRefresh::default()); - let mut flow = RefreshFlow::new(client); - let device_flow = FlowType::Device(GOOGLE_DEVICE_CODE_URL.to_string()); + .keep_alive(false) + .build::<_, hyper::Body>(https); - match flow.refresh_token(device_flow, &appsecret, "bogus_refresh_token") { - RefreshResult::Success(ref t) => { - assert_eq!(t.access_token, "1/fFAGRNJru1FTz70BzhT3Zg"); - assert!(!t.expired()); - } - RefreshResult::Error(err) => { - assert!(false, "Refresh flow failed: RefreshResult::Error({})", err); - } - RefreshResult::RefreshError(msg, err) => { - assert!( - false, - "Refresh flow failed: RefreshResult::RefreshError({}, {:?})", - msg, err - ); - } - RefreshResult::Uninitialized => { - assert!(false, "Refresh flow failed: RefreshResult::Uninitialized"); - } + let mut rt = tokio::runtime::Builder::new() + .core_threads(1) + .panic_handler(|e| std::panic::resume_unwind(e)) + .build() + .unwrap(); + + // Success + { + let _m = mockito::mock("POST", "/token") + .match_body( + mockito::Matcher::Regex(".*client_id=902216714886-k2v9uei3p1dk6h686jbsn9mo96tnbvto.apps.googleusercontent.com.*refresh_token=my-refresh-token.*".to_string())) + .with_status(200) + .with_body(r#"{"access_token": "new-access-token", "token_type": "Bearer", "expires_in": 1234567}"#) + .create(); + let fut = RefreshFlow::refresh_token( + client.clone(), + app_secret.clone(), + refresh_token.clone(), + ) + .then(|rr| { + let rr = rr.unwrap(); + match rr { + RefreshResult::Success(tok) => { + assert_eq!("new-access-token", tok.access_token); + assert_eq!("Bearer", tok.token_type); + } + _ => panic!(format!("unexpected RefreshResult {:?}", rr)), + } + Ok(()) as Result<(), ()> + }); + + rt.block_on(fut).expect("block_on"); + _m.assert(); + } + // Refresh error. + { + let _m = mockito::mock("POST", "/token") + .match_body( + mockito::Matcher::Regex(".*client_id=902216714886-k2v9uei3p1dk6h686jbsn9mo96tnbvto.apps.googleusercontent.com.*refresh_token=my-refresh-token.*".to_string())) + .with_status(400) + .with_body(r#"{"error": "invalid_token"}"#) + .create(); + + let fut = RefreshFlow::refresh_token(client, app_secret, refresh_token).then(|rr| { + let rr = rr.unwrap(); + match rr { + RefreshResult::RefreshError(e, None) => { + assert_eq!(e, "invalid_token"); + } + _ => panic!(format!("unexpected RefreshResult {:?}", rr)), + } + Ok(()) + }); + + tokio::run(fut); + _m.assert(); } } } diff --git a/src/service_account.rs b/src/service_account.rs index b927a76..74dee21 100644 --- a/src/service_account.rs +++ b/src/service_account.rs @@ -12,17 +12,13 @@ //! use std::default::Default; -use std::error; -use std::result; -use std::str; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, Mutex}; -use crate::authenticator::GetToken; use crate::storage::{hash_scopes, MemoryStorage, TokenStorage}; -use crate::types::{StringError, Token}; +use crate::types::{ApplicationSecret, GetToken, JsonError, RequestError, StringError, Token}; use futures::stream::Stream; -use futures::Future; +use futures::{future, prelude::*}; use hyper::header; use url::form_urlencoded; @@ -42,12 +38,13 @@ use serde_json; const GRANT_TYPE: &'static str = "urn:ietf:params:oauth:grant-type:jwt-bearer"; const GOOGLE_RS256_HEAD: &'static str = "{\"alg\":\"RS256\",\"typ\":\"JWT\"}"; -// Encodes s as Base64 +/// Encodes s as Base64 fn encode_base64>(s: T) -> String { base64::encode_config(s.as_ref(), base64::URL_SAFE) } -fn decode_rsa_key(pem_pkcs8: &str) -> Result> { +/// Decode a PKCS8 formatted RSA key. +fn decode_rsa_key(pem_pkcs8: &str) -> Result { let private = pem_pkcs8.to_string().replace("\\n", "\n").into_bytes(); let mut private_reader: &[u8] = private.as_ref(); let private_keys = pemfile::pkcs8_private_keys(&mut private_reader); @@ -56,16 +53,16 @@ fn decode_rsa_key(pem_pkcs8: &str) -> Result 0 { Ok(pk[0].clone()) } else { - Err(Box::new(io::Error::new( + Err(io::Error::new( io::ErrorKind::InvalidInput, "Not enough private keys in PEM", - ))) + )) } } else { - Err(Box::new(io::Error::new( + Err(io::Error::new( io::ErrorKind::InvalidInput, "Error reading key from PEM", - ))) + )) } } @@ -89,6 +86,8 @@ pub struct ServiceAccountKey { pub client_x509_cert_url: Option, } +/// Permissions requested for a JWT. +/// See https://developers.google.com/identity/protocols/OAuth2ServiceAccount#authorizingrequests. #[derive(Serialize, Debug)] struct Claims { iss: String, @@ -99,20 +98,31 @@ struct Claims { scope: String, } +/// A JSON Web Token ready for signing. struct JWT { + /// The value of GOOGLE_RS256_HEAD. header: String, + /// A Claims struct, expressing the set of desired permissions etc. claims: Claims, } impl JWT { + /// Create a new JWT from claims. fn new(claims: Claims) -> JWT { JWT { header: GOOGLE_RS256_HEAD.to_string(), claims: claims, } } - // Encodes the first two parts (header and claims) to base64 and assembles them into a form - // ready to be signed. + + /// Set JWT header. Default is `{"alg":"RS256","typ":"JWT"}`. + #[allow(dead_code)] + pub fn set_header(&mut self, head: String) { + self.header = head; + } + + /// Encodes the first two parts (header and claims) to base64 and assembles them into a form + /// ready to be signed. fn encode_claims(&self) -> String { let mut head = encode_base64(&self.header); let claims = encode_base64(serde_json::to_string(&self.claims).unwrap()); @@ -122,7 +132,8 @@ impl JWT { head } - fn sign(&self, private_key: &str) -> Result> { + /// Sign a JWT base string with `private_key`, which is a PKCS8 string. + fn sign(&self, private_key: &str) -> Result { let mut jwt_head = self.encode_claims(); let key = decode_rsa_key(private_key)?; let signing_key = sign::RSASigningKey::new(&key).map_err(|_| { @@ -136,11 +147,10 @@ impl JWT { .ok_or(io::Error::new( io::ErrorKind::Other, "Couldn't choose signing scheme", - )) - .map_err(|e| Box::new(e) as Box)?; + ))?; let signature = signer .sign(jwt_head.as_bytes()) - .map_err(|e| Box::new(e) as Box)?; + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, format!("{}", e)))?; let signature_b64 = encode_base64(signature); jwt_head.push_str("."); @@ -150,6 +160,8 @@ impl JWT { } } +/// Set `iss`, `aud`, `exp`, `iat`, `scope` field in the returned `Claims`. `scopes` is an iterator +/// yielding strings with OAuth scopes. fn init_claims_from_key<'a, I, T>(key: &ServiceAccountKey, scopes: I) -> Claims where T: AsRef + 'a, @@ -175,19 +187,14 @@ where } } -/// See "Additional claims" at https://developers.google.com/identity/protocols/OAuth2ServiceAccount -#[allow(dead_code)] -fn set_sub_claim(mut claims: Claims, sub: String) -> Claims { - claims.sub = Some(sub); - claims -} - /// A token source (`GetToken`) yielding OAuth tokens for services that use ServiceAccount authorization. -/// This token source caches token and automatically renews expired ones. +/// This token source caches token and automatically renews expired ones, meaning you do not need +/// (and you also should not) use this with `Authenticator`. Just use it directly. +#[derive(Clone)] pub struct ServiceAccountAccess { client: hyper::Client, key: ServiceAccountKey, - cache: MemoryStorage, + cache: Arc>, sub: Option, } @@ -213,10 +220,7 @@ impl TokenResponse { } } -impl<'a, C: 'static> ServiceAccountAccess -where - C: hyper::client::connect::Connect, -{ +impl<'a, C: 'static + hyper::client::connect::Connect> ServiceAccountAccess { /// Returns a new `ServiceAccountAccess` token source. #[allow(dead_code)] pub fn new( @@ -226,11 +230,13 @@ where ServiceAccountAccess { client: client, key: key, - cache: MemoryStorage::default(), + cache: Arc::new(Mutex::new(MemoryStorage::default())), sub: None, } } + /// Set `sub` claim in new `ServiceAccountKey` (see + /// https://developers.google.com/identity/protocols/OAuth2ServiceAccount#authorizingrequests). pub fn with_sub( key: ServiceAccountKey, client: hyper::Client, @@ -239,81 +245,73 @@ where ServiceAccountAccess { client: client, key: key, - cache: MemoryStorage::default(), + cache: Arc::new(Mutex::new(MemoryStorage::default())), sub: Some(sub), } } + /// Send a request for a new Bearer token to the OAuth provider. fn request_token( - &mut self, - scopes: &Vec<&str>, - ) -> result::Result> { - let mut claims = init_claims_from_key(&self.key, scopes); - claims.sub = self.sub.clone(); - let signed = JWT::new(claims).sign(self.key.private_key.as_ref().unwrap())?; - - let body = form_urlencoded::Serializer::new(String::new()) - .extend_pairs(vec![ - ("grant_type".to_string(), GRANT_TYPE.to_string()), - ("assertion".to_string(), signed), - ]) - .finish(); - - let request = hyper::Request::post(self.key.token_uri.as_ref().unwrap()) - .header(header::CONTENT_TYPE, "application/x-www-form-urlencoded") - .body(hyper::Body::from(body)) - .unwrap(); // TOOD: error handling - - let body = { - let result: Arc>> = - Arc::new(RwLock::new(Ok(Default::default()))); - let ok = Arc::clone(&result); - let err = Arc::clone(&result); - hyper::rt::run( - self.client - .request(request) - .and_then(|response| response.into_body().concat2()) - .map(move |body| { - *ok.write().unwrap_or_else(|e| unreachable!("{}", e)) = Ok(body); - - () - }) - .map_err(move |e| { - *err.write().unwrap_or_else(|e| unreachable!("{}", e)) = Err(e); - - () - }), - ); - - Arc::try_unwrap(result) - .unwrap_or_else(|e| unreachable!("{:?}", e)) - .into_inner() - .unwrap_or_else(|e| unreachable!("{}", e)) - }; - - let json_str = body + client: hyper::client::Client, + sub: Option, + key: ServiceAccountKey, + scopes: Vec, + ) -> impl Future { + let mut claims = init_claims_from_key(&key, &scopes); + claims.sub = sub.clone(); + let signed = JWT::new(claims) + .sign(key.private_key.as_ref().unwrap()) + .into_future(); + signed + .map_err(RequestError::LowLevelError) + .map(|signed| { + form_urlencoded::Serializer::new(String::new()) + .extend_pairs(vec![ + ("grant_type".to_string(), GRANT_TYPE.to_string()), + ("assertion".to_string(), signed), + ]) + .finish() + }) + .map(|rqbody| { + hyper::Request::post(key.token_uri.unwrap()) + .header(header::CONTENT_TYPE, "application/x-www-form-urlencoded") + .body(hyper::Body::from(rqbody)) + .unwrap() + }) + .and_then(move |request| client.request(request).map_err(RequestError::ClientError)) + .and_then(|response| { + response + .into_body() + .concat2() + .map_err(RequestError::ClientError) + }) .map(|c| String::from_utf8(c.into_bytes().to_vec()).unwrap()) - .unwrap(); // TODO: error handling - - let token: Result = - serde_json::from_str(&json_str); - - match token { - Err(e) => return Err(Box::new(e)), - Ok(token) => { - if token.access_token.is_none() - || token.token_type.is_none() - || token.expires_in.is_none() - { - Err(Box::new(StringError::new( - "Token response lacks fields", - Some(format!("{:?}", token).as_str()), - ))) + .and_then(|s| { + if let Ok(jse) = serde_json::from_str::(&s) { + Err(RequestError::NegativeServerResponse( + jse.error, + jse.error_description, + )) } else { - Ok(token.to_oauth_token()) + serde_json::from_str(&s).map_err(RequestError::JSONError) } - } - } + }) + .then(|token: Result| match token { + Err(e) => return Err(e), + Ok(token) => { + if token.access_token.is_none() + || token.token_type.is_none() + || token.expires_in.is_none() + { + Err(RequestError::BadServerResponse(format!( + "Token response lacks fields: {:?}", + token + ))) + } else { + Ok(token.to_oauth_token()) + } + } + }) } } @@ -321,27 +319,67 @@ impl GetToken for ServiceAccountAccess where C: hyper::client::connect::Connect, { - fn token<'b, I, T>(&mut self, scopes: I) -> Result> + fn token<'b, I, T>( + &mut self, + scopes: I, + ) -> Box + Send> where T: AsRef + Ord + 'b, - I: IntoIterator, + I: Iterator, { - let (hash, scps) = hash_scopes(scopes); + let (hash, scps0) = hash_scopes(scopes); + let cache = self.cache.clone(); + let scps = scps0.clone(); - if let Some(token) = self - .cache - .get(hash, &scps) - .map_err(|e| Box::new(e) as Box)? - { - if !token.expired() { - return Ok(token); + let cache_lookup = futures::lazy(move || { + match cache + .lock() + .unwrap() + .get(hash, &scps.iter().map(|s| s.as_str()).collect()) + { + Ok(Some(token)) => { + if !token.expired() { + return Ok(token); + } + return Err(StringError::new("expired token in cache", None)); + } + Err(e) => return Err(StringError::new(format!("cache lookup error: {}", e), None)), + Ok(None) => return Err(StringError::new("no token in cache", None)), } - } + }); - let token = self.request_token(&scps)?; - let _ = self.cache.set(hash, &scps, Some(token.clone())); + let cache = self.cache.clone(); + let req_token = Self::request_token( + self.client.clone(), + self.sub.clone(), + self.key.clone(), + scps0.iter().map(|s| s.to_string()).collect(), + ) + .then(move |r| match r { + Ok(token) => { + let _ = cache.lock().unwrap().set( + hash, + &scps0.iter().map(|s| s.as_str()).collect(), + Some(token.clone()), + ); + Box::new(future::ok(token)) + } + Err(e) => Box::new(future::err(e)), + }); - Ok(token) + Box::new(cache_lookup.then(|r| match r { + Ok(t) => Box::new(Ok(t).into_future()) + as Box + Send>, + Err(_) => { + Box::new(req_token) as Box + Send> + } + })) + } + + /// Returns an empty ApplicationSecret as tokens for service accounts don't need to be + /// refreshed (they are simply reissued). + fn application_secret(&self) -> ApplicationSecret { + Default::default() } fn api_key(&mut self) -> Option { @@ -352,12 +390,114 @@ where #[cfg(test)] mod tests { use super::*; - use crate::authenticator::GetToken; use crate::helper::service_account_key_from_file; + use crate::types::GetToken; + use hyper; use hyper_tls::HttpsConnector; + use mockito::{self, mock}; + use tokio; - // This is a valid but deactivated key. + #[test] + fn test_mocked_http() { + env_logger::try_init().unwrap(); + let server_url = &mockito::server_url(); + let client_secret = r#"{ + "type": "service_account", + "project_id": "yup-test-243420", + "private_key_id": "26de294916614a5ebdf7a065307ed3ea9941902b", + "private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDemmylrvp1KcOn\n9yTAVVKPpnpYznvBvcAU8Qjwr2fSKylpn7FQI54wCk5VJVom0jHpAmhxDmNiP8yv\nHaqsef+87Oc0n1yZ71/IbeRcHZc2OBB33/LCFqf272kThyJo3qspEqhuAw0e8neg\nLQb4jpm9PsqR8IjOoAtXQSu3j0zkXemMYFy93PWHjVpPEUX16NGfsWH7oxspBHOk\n9JPGJL8VJdbiAoDSDgF0y9RjJY5I52UeHNhMsAkTYs6mIG4kKXt2+T9tAyHw8aho\nwmuytQAfydTflTfTG8abRtliF3nil2taAc5VB07dP1b4dVYy/9r6M8Z0z4XM7aP+\nNdn2TKm3AgMBAAECggEAWi54nqTlXcr2M5l535uRb5Xz0f+Q/pv3ceR2iT+ekXQf\n+mUSShOr9e1u76rKu5iDVNE/a7H3DGopa7ZamzZvp2PYhSacttZV2RbAIZtxU6th\n7JajPAM+t9klGh6wj4jKEcE30B3XVnbHhPJI9TCcUyFZoscuPXt0LLy/z8Uz0v4B\nd5JARwyxDMb53VXwukQ8nNY2jP7WtUig6zwE5lWBPFMbi8GwGkeGZOruAK5sPPwY\nGBAlfofKANI7xKx9UXhRwisB4+/XI1L0Q6xJySv9P+IAhDUI6z6kxR+WkyT/YpG3\nX9gSZJc7qEaxTIuDjtep9GTaoEqiGntjaFBRKoe+VQKBgQDzM1+Ii+REQqrGlUJo\nx7KiVNAIY/zggu866VyziU6h5wjpsoW+2Npv6Dv7nWvsvFodrwe50Y3IzKtquIal\nVd8aa50E72JNImtK/o5Nx6xK0VySjHX6cyKENxHRDnBmNfbALRM+vbD9zMD0lz2q\nmns/RwRGq3/98EqxP+nHgHSr9QKBgQDqUYsFAAfvfT4I75Glc9svRv8IsaemOm07\nW1LCwPnj1MWOhsTxpNF23YmCBupZGZPSBFQobgmHVjQ3AIo6I2ioV6A+G2Xq/JCF\nmzfbvZfqtbbd+nVgF9Jr1Ic5T4thQhAvDHGUN77BpjEqZCQLAnUWJx9x7e2xvuBl\n1A6XDwH/ewKBgQDv4hVyNyIR3nxaYjFd7tQZYHTOQenVffEAd9wzTtVbxuo4sRlR\nNM7JIRXBSvaATQzKSLHjLHqgvJi8LITLIlds1QbNLl4U3UVddJbiy3f7WGTqPFfG\nkLhUF4mgXpCpkMLxrcRU14Bz5vnQiDmQRM4ajS7/kfwue00BZpxuZxst3QKBgQCI\nRI3FhaQXyc0m4zPfdYYVc4NjqfVmfXoC1/REYHey4I1XetbT9Nb/+ow6ew0UbgSC\nUZQjwwJ1m1NYXU8FyovVwsfk9ogJ5YGiwYb1msfbbnv/keVq0c/Ed9+AG9th30qM\nIf93hAfClITpMz2mzXIMRQpLdmQSR4A2l+E4RjkSOwKBgQCB78AyIdIHSkDAnCxz\nupJjhxEhtQ88uoADxRoEga7H/2OFmmPsqfytU4+TWIdal4K+nBCBWRvAX1cU47vH\nJOlSOZI0gRKe0O4bRBQc8GXJn/ubhYSxI02IgkdGrIKpOb5GG10m85ZvqsXw3bKn\nRVHMD0ObF5iORjZUqD0yRitAdg==\n-----END PRIVATE KEY-----\n", + "client_email": "yup-test-sa-1@yup-test-243420.iam.gserviceaccount.com", + "client_id": "102851967901799660408", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "", + "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", + "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/yup-test-sa-1%40yup-test-243420.iam.gserviceaccount.com" +}"#; + let mut key: ServiceAccountKey = serde_json::from_str(client_secret).unwrap(); + key.token_uri = Some(format!("{}/token", server_url)); + + let json_response = r#"{ + "access_token": "ya29.c.ElouBywiys0LyNaZoLPJcp1Fdi2KjFMxzvYKLXkTdvM-rDfqKlvEq6PiMhGoGHx97t5FAvz3eb_ahdwlBjSStxHtDVQB4ZPRJQ_EOi-iS7PnayahU2S9Jp8S6rk", + "expires_in": 3600, + "token_type": "Bearer" +}"#; + let bad_json_response = r#"{ + "access_token": "ya29.c.ElouBywiys0LyNaZoLPJcp1Fdi2KjFMxzvYKLXkTdvM-rDfqKlvEq6PiMhGoGHx97t5FAvz3eb_ahdwlBjSStxHtDVQB4ZPRJQ_EOi-iS7PnayahU2S9Jp8S6rk", + "token_type": "Bearer" +}"#; + + let https = HttpsConnector::new(1).unwrap(); + let client = hyper::Client::builder() + .keep_alive(false) + .build::<_, hyper::Body>(https); + let mut rt = tokio::runtime::Builder::new() + .core_threads(1) + .panic_handler(|e| std::panic::resume_unwind(e)) + .build() + .unwrap(); + + // Successful path. + { + let _m = mock("POST", "/token") + .with_status(200) + .with_header("content-type", "text/json") + .with_body(json_response) + .expect(1) + .create(); + let mut acc = ServiceAccountAccess::new(key.clone(), client.clone()); + let fut = acc + .token(vec!["https://www.googleapis.com/auth/pubsub"].iter()) + .and_then(|tok| { + assert!(tok.access_token.contains("ya29.c.ElouBywiys0Ly")); + assert_eq!(Some(3600), tok.expires_in); + Ok(()) + }); + rt.block_on(fut).expect("block_on"); + + assert!(acc + .cache + .lock() + .unwrap() + .get( + 3502164897243251857, + &vec!["https://www.googleapis.com/auth/pubsub"] + ) + .unwrap() + .is_some()); + // Test that token is in cache (otherwise mock will tell us) + let fut = acc + .token(vec!["https://www.googleapis.com/auth/pubsub"].iter()) + .and_then(|tok| { + assert!(tok.access_token.contains("ya29.c.ElouBywiys0Ly")); + assert_eq!(Some(3600), tok.expires_in); + Ok(()) + }); + rt.block_on(fut).expect("block_on 2"); + + _m.assert(); + } + // Malformed response. + { + let _m = mock("POST", "/token") + .with_status(200) + .with_header("content-type", "text/json") + .with_body(bad_json_response) + .create(); + let mut acc = ServiceAccountAccess::new(key.clone(), client.clone()); + let fut = acc + .token(vec!["https://www.googleapis.com/auth/pubsub"].iter()) + .then(|result| { + assert!(result.is_err()); + Ok(()) as Result<(), ()> + }); + rt.block_on(fut).expect("block_on"); + _m.assert(); + } + rt.shutdown_on_idle().wait().expect("shutdown"); + } + + // Valid but deactivated key. const TEST_PRIVATE_KEY_PATH: &'static str = "examples/Sanguine-69411a0c0eea.json"; // Uncomment this test to verify that we can successfully obtain tokens. @@ -373,8 +513,8 @@ mod tests { let mut acc = ServiceAccountAccess::new(key, client); println!( "{:?}", - acc.token(vec![&"https://www.googleapis.com/auth/pubsub"]) - .unwrap() + acc.token(vec!["https://www.googleapis.com/auth/pubsub"].iter()) + .wait() ); } diff --git a/src/storage.rs b/src/storage.rs index 2dd79f6..30eabe9 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -35,7 +35,7 @@ pub trait TokenStorage { } /// Calculate a hash value describing the scopes, and return a sorted Vec of the scopes. -pub fn hash_scopes<'a, I, T>(scopes: I) -> (u64, Vec<&'a str>) +pub fn hash_scopes<'a, I, T>(scopes: I) -> (u64, Vec) where T: AsRef + Ord + 'a, I: IntoIterator, @@ -47,7 +47,7 @@ where sv.sort(); let mut sh = DefaultHasher::new(); &sv.hash(&mut sh); - let sv = sv; + let sv = sv.iter().map(|s| s.to_string()).collect(); (sh.finish(), sv) } @@ -80,11 +80,17 @@ impl TokenStorage for NullStorage { } /// A storage that remembers values for one session only. -#[derive(Default)] +#[derive(Debug, Default)] pub struct MemoryStorage { pub tokens: HashMap, } +impl MemoryStorage { + pub fn new() -> MemoryStorage { + Default::default() + } +} + impl TokenStorage for MemoryStorage { type Error = NullError; diff --git a/src/types.rs b/src/types.rs index fe3dd1c..a4d200e 100644 --- a/src/types.rs +++ b/src/types.rs @@ -2,26 +2,54 @@ use chrono::{DateTime, TimeZone, Utc}; use hyper; use std::error::Error; use std::fmt; +use std::io; use std::str::FromStr; +use futures::prelude::*; + /// A marker trait for all Flows pub trait Flow { fn type_id() -> FlowType; } -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] pub struct JsonError { pub error: String, pub error_description: Option, pub error_uri: Option, } -/// Encapsulates all possible results of the `request_token(...)` operation +/// All possible outcomes of the refresh flow +#[derive(Debug)] +pub enum RefreshResult { + /// Indicates connection failure + Error(hyper::Error), + /// The server did not answer with a new token, providing the server message + RefreshError(String, Option), + /// The refresh operation finished successfully, providing a new `Token` + Success(Token), +} + +/// Encapsulates all possible results of a `poll_token(...)` operation in the Device flow. +#[derive(Debug)] +pub enum PollError { + /// Connection failure - retry if you think it's worth it + HttpError(hyper::Error), + /// Indicates we are expired, including the expiration date + Expired(DateTime), + /// Indicates that the user declined access. String is server response + AccessDenied, + /// Indicates that too many attempts failed. + TimedOut, + /// Other type of error. + Other(String), +} + +/// Encapsulates all possible results of the `token(...)` operation +#[derive(Debug)] pub enum RequestError { /// Indicates connection failure ClientError(hyper::Error), - /// Indicates HTTP status failure - HttpError(hyper::http::Error), /// The OAuth client was not found InvalidClient, /// Some requested scopes were invalid. String contains the scopes as part of @@ -30,6 +58,20 @@ pub enum RequestError { /// A 'catch-all' variant containing the server error and description /// First string is the error code, the second may be a more detailed description NegativeServerResponse(String, Option), + /// A malformed server response. + BadServerResponse(String), + /// Error while decoding a JSON response. + JSONError(serde_json::error::Error), + /// Error within user input. + UserError(String), + /// A lower level IO error. + LowLevelError(io::Error), + /// A poll error occurred in the DeviceFlow. + Poll(PollError), + /// An error occurred while refreshing tokens. + Refresh(RefreshResult), + /// Error in token cache layer + Cache(Box), } impl From for RequestError { @@ -38,12 +80,6 @@ impl From for RequestError { } } -impl From for RequestError { - fn from(error: hyper::http::Error) -> RequestError { - RequestError::HttpError(error) - } -} - impl From for RequestError { fn from(value: JsonError) -> RequestError { match &*value.error { @@ -62,7 +98,6 @@ impl fmt::Display for RequestError { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { match *self { RequestError::ClientError(ref err) => err.fmt(f), - RequestError::HttpError(ref err) => err.fmt(f), RequestError::InvalidClient => "Invalid Client".fmt(f), RequestError::InvalidScope(ref scope) => writeln!(f, "Invalid Scope: '{}'", scope), RequestError::NegativeServerResponse(ref error, ref desc) => { @@ -72,6 +107,28 @@ impl fmt::Display for RequestError { } "\n".fmt(f) } + RequestError::BadServerResponse(ref s) => s.fmt(f), + RequestError::JSONError(ref e) => format!( + "JSON Error; this might be a bug with unexpected server responses! {}", + e + ) + .fmt(f), + RequestError::UserError(ref s) => s.fmt(f), + RequestError::LowLevelError(ref e) => e.fmt(f), + RequestError::Poll(ref pe) => pe.fmt(f), + RequestError::Refresh(ref rr) => format!("{:?}", rr).fmt(f), + RequestError::Cache(ref e) => e.fmt(f), + } + } +} + +impl Error for RequestError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match *self { + RequestError::ClientError(ref err) => Some(err), + RequestError::LowLevelError(ref err) => Some(err), + RequestError::JSONError(ref err) => Some(err), + _ => None, } } } @@ -179,6 +236,25 @@ impl FromStr for Scheme { } } +/// A provider for authorization tokens, yielding tokens valid for a given scope. +/// The `api_key()` method is an alternative in case there are no scopes or +/// if no user is involved. +pub trait GetToken { + fn token<'b, I, T>( + &mut self, + scopes: I, + ) -> Box + Send> + where + T: AsRef + Ord + 'b, + I: Iterator; + + fn api_key(&mut self) -> Option; + + /// Return an application secret with at least token_uri, client_secret, and client_id filled + /// in. This is used for refreshing tokens without interaction from the flow. + fn application_secret(&self) -> ApplicationSecret; +} + /// Represents a token as returned by OAuth2 servers. /// /// It is produced by all authentication flows.