diff --git a/Cargo.lock b/Cargo.lock index 32c182b..f24c16f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -395,6 +395,7 @@ dependencies = [ name = "google_bigquery_v2" version = "0.1.0" dependencies = [ + "async-trait", "chrono", "env_logger", "google-bigquery2", @@ -407,7 +408,7 @@ dependencies = [ [[package]] name = "google_bigquery_v2_derive" -version = "0.0.0" +version = "0.0.1" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 7e512ba..2ee84fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,8 +9,9 @@ edition = "2021" google-bigquery2 = "5.0.2" serde_json = "1.0.95" tokio = "1.0.2" -google_bigquery_v2_derive = { version = "0.0.0", path = "./google_bigquery_v2_derive" } +google_bigquery_v2_derive = { version = "0.0.1", path = "./google_bigquery_v2_derive" } chrono = "0.4.24" log = "0.4.17" nameof = "1.2.2" -env_logger = "0.10.0" \ No newline at end of file +env_logger = "0.10.0" +async-trait = "0.1.68" diff --git a/google_bigquery_v2_derive/Cargo.toml b/google_bigquery_v2_derive/Cargo.toml index 1f84a7b..09035cb 100644 --- a/google_bigquery_v2_derive/Cargo.toml +++ b/google_bigquery_v2_derive/Cargo.toml @@ -1,9 +1,7 @@ [package] name = "google_bigquery_v2_derive" -version = "0.0.0" +version = "0.0.1" authors = ["OMGeeky "] -description = "A `cargo generate` template for quick-starting a procedural macro crate" -keywords = ["template", "proc_macro", "procmacro"] edition = "2021" [lib] diff --git a/google_bigquery_v2_derive/src/lib.rs b/google_bigquery_v2_derive/src/lib.rs index d498b72..fb36191 100644 --- a/google_bigquery_v2_derive/src/lib.rs +++ b/google_bigquery_v2_derive/src/lib.rs @@ -1,7 +1,7 @@ #[allow(unused)] extern crate proc_macro; -use proc_macro2::TokenStream; +use proc_macro2::{Ident, TokenStream}; use syn::DeriveInput; struct Field { @@ -28,7 +28,7 @@ pub fn big_query_table_derive(input: proc_macro::TokenStream) -> proc_macro::Tok fn impl_big_query_table_derive(ast: &syn::DeriveInput) -> proc_macro2::TokenStream { let pk_field = get_pk_field(&ast); - let client_field = get_client_field(&ast); + let client_field = get_client_field(&ast.data); implement_big_query_table_base(&ast, &pk_field, &client_field) } @@ -39,6 +39,8 @@ fn implement_big_query_table_base( ) -> proc_macro2::TokenStream { let table_ident = &ast.ident; let table_name = get_table_name(&ast); + let impl_get_all_params = implement_get_all_params(&ast, &table_ident); + let impl_get_parameter_from_field = implement_get_parameter_from_field(&ast, &table_ident); let impl_get_client = implement_get_client(&client_field); let impl_set_client = implement_set_client(&client_field); let impl_get_pk_field_name = implement_get_pk_field_name(&pk_field); @@ -46,10 +48,15 @@ fn implement_big_query_table_base( let impl_get_pk_value = implement_get_pk_value(&pk_field); let impl_get_query_fields = implement_get_query_fields(&ast); let impl_get_table_name = implement_impl_get_table_name(&table_name); + let impl_reload = implement_reload(&pk_field); let impl_set_field_value = implement_set_field_value(&ast); + let impl_get_field_value = implement_get_field_value(&ast); let impl_from_query_result_row = implement_from_query_result_row(&ast); quote::quote! { - impl<'a> BigQueryTableBase<'a> for #table_ident<'a> { + #[async_trait::async_trait] + impl BigQueryTableBase for #table_ident { + #impl_get_all_params + #impl_get_parameter_from_field #impl_get_client #impl_set_client #impl_get_pk_field_name @@ -57,19 +64,76 @@ fn implement_big_query_table_base( #impl_get_pk_value #impl_get_query_fields #impl_get_table_name + #impl_reload #impl_set_field_value + #impl_get_field_value #impl_from_query_result_row + + async fn insert(&mut self) -> Result<()>{ + todo!() + } + async fn update(&mut self) -> Result<()>{ + todo!() + } } } } +fn implement_get_all_params(ast: &DeriveInput, table_ident: &Ident) -> TokenStream { + fn get_param_from_field(f: Field, table_ident: &Ident) -> TokenStream { + let field_ident = f.field_ident; + let field_name = f.local_name; + quote::quote! { + #table_ident::get_parameter(&self.#field_ident, &#table_ident::get_field_param_name(&#field_name.to_string())?)? + } + } + let table_ident = &ast.ident; + let fields = get_fields_without_client(&ast.data); + let fields = fields + .into_iter() + .map(|f| get_param_from_field(f, &table_ident)); + + quote::quote! { + fn get_all_params(&self) -> google_bigquery_v2::prelude::Result> { + log::trace!("get_all_params() self:{:?}", self); + Ok(vec![ + #(#fields),* + ]) + } + } +} + +fn implement_get_parameter_from_field(ast: &DeriveInput, table_ident: &Ident) -> TokenStream { + fn get_param_from_field(f: Field, table_ident: &Ident) -> TokenStream { + let field_ident = f.field_ident; + let field_name = f.local_name; + quote::quote! { + #field_name => #table_ident::get_parameter(&self.#field_ident, &#table_ident::get_field_param_name(&#field_name.to_string())?), + } + } + let table_ident = &ast.ident; + let fields = get_fields_without_client(&ast.data); + let fields = fields + .into_iter() + .map(|f| get_param_from_field(f, &table_ident)); + + quote::quote! { + fn get_parameter_from_field(&self, field_name: &str) -> google_bigquery_v2::prelude::Result { + log::trace!("get_parameter_from_field(); field_name: '{}' self:{:?}", field_name, self); + match field_name { + #(#fields)* + _ => Err(format!("Field {} not found", field_name).into()), + } + } + } +} //region method implementations fn implement_get_client(client_field: &Field) -> TokenStream { let client_ident = client_field.field_ident.clone(); quote::quote! { - fn get_client(&self) -> &'a BigqueryClient { + fn get_client(&self) -> &BigqueryClient { log::trace!("get_client() self={:?}", self); &self.#client_ident } @@ -79,7 +143,7 @@ fn implement_get_client(client_field: &Field) -> TokenStream { fn implement_set_client(client_field: &Field) -> TokenStream { let client_ident = client_field.field_ident.clone(); quote::quote! { - fn set_client(&mut self, client: &'a BigqueryClient) { + fn set_client(&mut self, client: BigqueryClient) { log::trace!("set_client() self={:?}", self); self.#client_ident = client; } @@ -109,7 +173,7 @@ fn implement_get_pk_db_name(pk_field: &Field) -> TokenStream { fn implement_get_pk_value(pk_field: &Field) -> TokenStream { let pk_ident = &pk_field.field_ident; quote::quote! { - fn get_pk_value(&self) -> &dyn google_bigquery_v2::data::param_conversion::BigDataValueType { + fn get_pk_value(&self) -> &(dyn google_bigquery_v2::data::param_conversion::BigDataValueType + Send + Sync) { log::trace!("get_pk_value() self={:?}", self); &self.#pk_ident } @@ -124,14 +188,11 @@ fn implement_get_query_fields(ast: &DeriveInput) -> TokenStream { map.insert(String::from(#local_name),String::from(#db_name)); } } - let fields = get_fields(&ast.data); + let fields = get_fields_without_client(&ast.data); let pk_field = get_pk_field(&ast); - let client_ident = get_client_field(&ast).field_ident; let fields: Vec = fields .into_iter() - .filter(|f| { - f.field_ident != client_ident && f.field_ident != pk_field.field_ident - }) + .filter(|f| f.field_ident != pk_field.field_ident) .map(implement_map_insert) .collect(); @@ -168,15 +229,8 @@ fn implement_set_field_value(ast: &DeriveInput) -> TokenStream { #local_name => self.#field_ident = #field_type::from_param(value)?, } } - let client_ident = get_client_field(&ast).field_ident; - let fields = get_fields(&ast.data); - let fields: Vec = fields - .into_iter() - .filter(|f| { - f.field_ident != client_ident - }) - .map(write_set_field_value) - .collect(); + let fields = get_fields_without_client(&ast.data); + let fields: Vec = fields.into_iter().map(write_set_field_value).collect(); quote::quote! { fn set_field_value(&mut self, field_name: &str, value: &serde_json::Value) -> Result<()>{ @@ -190,6 +244,28 @@ fn implement_set_field_value(ast: &DeriveInput) -> TokenStream { } } } +fn implement_get_field_value(ast: &DeriveInput) -> TokenStream { + fn write_get_field_value(f: Field) -> TokenStream { + let field_ident = f.field_ident; + let local_name = f.local_name; + quote::quote! { + #local_name => Ok(ConvertBigQueryParams::to_param(&self.#field_ident)), + } + } + let fields = get_fields_without_client(&ast.data); + let fields: Vec = fields.into_iter().map(write_get_field_value).collect(); + + quote::quote! { + fn get_field_value(&self, field_name: &str) -> Result { + log::trace!("get_field_value() self={:?} field_name={}", self, field_name); + use google_bigquery_v2::data::param_conversion::ConvertBigQueryParams; + match field_name { + #(#fields)* + _ => return Err(google_bigquery_v2::data::param_conversion::ConversionError::new(format!("Field '{}' not found", field_name)).into()) + } + } + } +} fn implement_from_query_result_row(ast: &DeriveInput) -> TokenStream { fn set_field_value(f: Field) -> TokenStream { @@ -200,18 +276,12 @@ fn implement_from_query_result_row(ast: &DeriveInput) -> TokenStream { #field_ident: #field_type::from_param(&row[#db_name])?, } } - let client_ident = get_client_field(&ast).field_ident; - let fields = get_fields(&ast.data); - let fields: Vec = fields - .into_iter() - .filter(|f| { - f.field_ident != client_ident - }) - .map(set_field_value) - .collect(); + let client_ident = get_client_field(&ast.data).field_ident; + let fields = get_fields_without_client(&ast.data); + let fields: Vec = fields.into_iter().map(set_field_value).collect(); quote::quote! { fn new_from_query_result_row( - client: &'a BigqueryClient, + client: BigqueryClient, row: &std::collections::HashMap, ) -> Result where Self: Sized { @@ -225,6 +295,22 @@ fn implement_from_query_result_row(ast: &DeriveInput) -> TokenStream { } } } + +fn implement_reload(pk_field: &Field) -> TokenStream { + let pk_value = &pk_field.field_ident; + quote::quote! { + async fn reload(&mut self) -> Result<()> + where + Self: Sized + Send + Sync, + { + log::trace!("reload()"); + let value = &self.#pk_value;//TODO: this is the problem!. it just does not want to work + Self::get_by_pk(self.get_client().clone(), value).await.map(|mut t| { + *self = t; + }) + } + } +} //endregion //endregion @@ -250,9 +336,9 @@ fn get_pk_field(ast: &syn::DeriveInput) -> Field { pk } -fn get_client_field(ast: &syn::DeriveInput) -> Field { +fn get_client_field(data: &syn::Data) -> Field { //region client - let mut client_fields = get_fields_with_attribute(&ast.data, "client"); + let mut client_fields = get_fields_with_attribute(&data, "client"); if client_fields.len() != 1 { panic!("Exactly one client field must be specified"); } @@ -276,7 +362,16 @@ fn get_struct_attributes(ast: &syn::DeriveInput) -> Vec { } res } - +fn get_fields_without_client(data: &syn::Data) -> Vec { + let mut res = vec![]; + let client_ident = get_client_field(&data).field_ident; + for field in get_fields(&data) { + if field.field_ident != client_ident { + res.push(field); + } + } + res +} fn get_fields(data: &syn::Data) -> Vec { let mut res = vec![]; diff --git a/src/client.rs b/src/client.rs index 96c2e42..a25adae 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,24 +1,36 @@ use std::error::Error; use std::fmt::Debug; +use google_bigquery2::{hyper, hyper_rustls, oauth2}; +use google_bigquery2::Bigquery; use google_bigquery2::hyper::client::HttpConnector; use google_bigquery2::hyper_rustls::HttpsConnector; -use google_bigquery2::Bigquery; -use google_bigquery2::{hyper, hyper_rustls, oauth2}; - +#[derive(Clone)] pub struct BigqueryClient { client: Bigquery>, project_id: String, dataset_id: String, } +impl Default for BigqueryClient { + fn default() -> Self { + BigqueryClient::empty() + } +} + impl BigqueryClient { pub fn empty() -> BigqueryClient { todo!() } } +//TODO: check if this unsafe impl is needed +unsafe impl Send for BigqueryClient {} + +//TODO: check if this unsafe impl is needed +unsafe impl Sync for BigqueryClient {} + impl BigqueryClient { pub async fn new>( project_id: S, @@ -75,7 +87,7 @@ async fn get_internal_client>( "Failed to read service account key from file. {}", service_account_path ) - .as_str(), + .as_str(), ); let auth = oauth2::ServiceAccountAuthenticator::builder(secret) .build() diff --git a/src/data/bigquery_builder.rs b/src/data/bigquery_builder.rs index aa6dec5..472dc42 100644 --- a/src/data/bigquery_builder.rs +++ b/src/data/bigquery_builder.rs @@ -15,285 +15,19 @@ use crate::prelude::*; //region OrderDirection -#[derive(Debug, Clone)] -pub enum OrderDirection { - Ascending, - Descending, -} - -impl OrderDirection { - pub(crate) fn to_query_str(&self) -> String { - match self { - OrderDirection::Ascending => String::from("ASC"), - OrderDirection::Descending => String::from("DESC"), - } - } -} //endregion -//region BigQueryBuilder - -#[derive(Debug, Clone)] -pub struct BigQueryBuilder<'a, Table> { - client: Option<&'a BigqueryClient>, - required_params: Vec, - selected_fields: Option>, - wheres: Vec, - limit: Option, - order_bys: Vec<(String, OrderDirection)>, - - _table_type_marker: PhantomData>, -} - -impl<'a, Table> BigQueryBuilder<'a, Table> - where - Table: BigQueryTable<'a>, -{ - //region build methods - - pub async fn run(self) -> Result> { - trace!("BigQueryBuilder::run()"); - //TODO: maybe return an iterator instead of a vector. - // this would allow for lazy loading of the data. - // it would also make it possible that additional - // data is loaded (if the set limit is higher than - // the number of rows returned) - let client = self.client.unwrap(); - let fields = self.get_sorted_selected_fields(); - let req = self.build_query_request(); - - debug!("req: {:?}", req); - let (res, query_res) = client - .get_client() - .jobs() - .query(req, client.get_project_id()) - .doit() - .await?; - - if res.status() != 200 { - return Err(format!("Wrong status code returned! ({})", res.status()).into()); - } - - let query_res = query_res.rows.unwrap(); - log::debug!("query_res: {:?}", query_res); - let mut result: Vec = Vec::new(); - for row in query_res { - let row = row.f.unwrap(); - let mut row_data: HashMap = HashMap::new(); - for (i, field) in row.into_iter().enumerate() { - let field = field.v.unwrap_or(Value::Null); - log::debug!("{}: {}", fields[i], field); - row_data.insert(fields[i].clone(), field); - } - let data = Table::new_from_query_result_row(client, &row_data)?; - result.push(data); - } - - return Ok(result); - } - - pub fn build_query_request(self) -> QueryRequest { - QueryRequest { - query: Some(self.build_query_string()), - query_parameters: Some(self.required_params), - use_legacy_sql: Some(false), - ..Default::default() - } - } - pub fn build_query_string(&self) -> String { - let where_clause = if self.wheres.is_empty() { - String::new() - } else { - format!("WHERE {}", self.wheres.join(" AND ")) - }; - let order_by_clause = if self.order_bys.is_empty() { - String::new() - } else { - format!("ORDER BY {}", self.order_bys - .iter() - .map(|(key, dir)| format!("{} {}", key, dir.to_query_str())) - .collect::>() - .join(", ")) - }; - format!( - "SELECT {} FROM {} {} {} LIMIT {}", - self.get_sorted_selected_fields() - .join(", "), - Table::get_table_identifier_from_client(self.client.unwrap()), - where_clause, - order_by_clause, - self.limit.unwrap_or(1000) - ) - } - - //endregion - - - //region add content - - pub fn set_limit(self, limit: usize) -> Self { - Self { - limit: Some(limit), - ..self - } - } - - fn set_select_fields(self, fields: Vec) -> Result { - //TODO: this method probably does not work since the logic does - // not work if (at least the required) fields are not selected - // since the parser will not be able to create the struct instance. - let selected_fields = self.selected_fields; - let mut selected_fields = match selected_fields { - Some(selected_fields) => selected_fields, - None => Vec::new(), - }; - - for field in fields { - let field_name = Table::get_field_db_name(&field) - .map_err(|e| format!("Error while selecting field '{}': {}", field, e))?; - selected_fields.push(field_name); - } - - Ok(Self { - selected_fields: Some(selected_fields), - ..self - }) - } - - pub fn add_where_eq(self, column: &str, value: Option<&T>) -> Result - where - T: BigDataValueType + Debug, - { - let column = Table::get_field_db_name(column)?; - let mut wheres = self.wheres; - - if let Some(value) = value { - let param_name = format!("__PARAM_{}", self.required_params.len()); - - let param = get_parameter(value, ¶m_name); - - let mut required_params = self.required_params; - required_params.push(param); - - wheres.push(format!("{} = @{}", column, param_name)); - - return Ok(Self { - wheres, - required_params, - ..self - }); - } - - wheres.push(format!("{} is NULL", column)); - Ok(Self { wheres, ..self }) - } - - pub fn add_order_by(self, column: &str, direction: OrderDirection) -> Self { - let column = Table::get_field_db_name(column).unwrap(); - let mut order_bys = self.order_bys; - order_bys.push((column.to_string(), direction)); - Self { order_bys, ..self } - } - //endregion - - fn get_sorted_selected_fields(&self) -> Vec { - trace!("get_sorted_selected_fields()"); - let mut fields: Vec = match &self.selected_fields { - Some(fields) => fields.clone(), - None => { - Table::get_query_fields(true) - .into_iter() - .map(|f| f.1)//get the db name - .collect() - } - }; - log::debug!("fields: {:?}", fields); - fields.sort(); - fields - } -} -//region implement some convenience traits for BigQueryBuilder - -impl<'a, Table> Default for BigQueryBuilder<'a, Table> { - fn default() -> Self { - Self { - client: None, - required_params: vec![], - selected_fields: None, - wheres: vec![], - limit: None, - order_bys: vec![], - _table_type_marker: PhantomData, - } - } -} - -impl<'a, Table: BigQueryTable<'a>> Display for BigQueryBuilder<'a, Table> { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.write_fmt(format_args!( - "BigQueryBuilder: {}\t\ - wheres: 'where {}'\t\ - order by's: 'order by {}'\t\ - limit: {:?}\t\ - params: {:?}", - Table::get_table_name(), - self.wheres.join(" AND "), - self.order_bys - .iter() - .map(|(key, dir)| format!("{} {}", key, dir.to_query_str())) - .collect::>() - .join(", "), - self.limit, - self.required_params - )) - } -} - -//endregion - -//endregion - -fn get_parameter(value: &T, param_name: &String) -> QueryParameter - where - T: BigDataValueType + Debug, -{ - let param_value = serde_json::from_value(value.to_param()).unwrap(); - let param_value = QueryParameterValue { - value: Some(param_value), - ..Default::default() - }; - - let param_type = T::convert_type_to_bigquery_type(); - let param_type = QueryParameterType { - type_: Some(param_type), - ..Default::default() - }; - - let param = QueryParameter { - parameter_type: Some(param_type), - parameter_value: Some(param_value), - name: Some(param_name.clone()), - }; - param -} - //region BigQueryBuilderAvailable -pub trait BigQueryBuilderAvailable<'a, Table> { - fn query(client: &'a BigqueryClient) -> BigQueryBuilder<'a, Table>; -} - -impl<'a, Table> BigQueryBuilderAvailable<'a, Table> for Table - where - Table: BigQueryTable<'a>, -{ - fn query(client: &'a BigqueryClient) -> BigQueryBuilder<'a, Table> { - BigQueryBuilder { - client: Some(client), - ..Default::default() - } - } -} +// pub trait BigQueryBuilderAvailable<'a, Table> { +// fn query(client: &'a BigqueryClient) -> BigQueryBuilder<'a, Table>; +// } +// +// impl<'a, Table> BigQueryBuilderAvailable<'a, Table> for Table +// where +// Table: BigQueryTable<'a>, +// { +// } //endregion diff --git a/src/data/bigquery_table.rs b/src/data/bigquery_table.rs index 7661b6b..0c0c7b2 100644 --- a/src/data/bigquery_table.rs +++ b/src/data/bigquery_table.rs @@ -1,33 +1,110 @@ use std::collections::HashMap; +use std::fmt::{Debug, Display, Formatter}; +use std::marker::PhantomData; +use async_trait::async_trait; +pub use google_bigquery2::api::QueryParameter; +use google_bigquery2::api::QueryRequest; +pub use google_bigquery2::api::{QueryParameterType, QueryParameterValue}; +use log::debug; use log::trace; use serde_json::Value; use crate::client::BigqueryClient; -use crate::data::param_conversion::{BigDataValueType}; - +use crate::data::param_conversion::{ + convert_value_to_string, BigDataValueType, +}; use crate::prelude::*; - -pub trait BigQueryTableBase<'a> { +#[async_trait] +pub trait BigQueryTableBase { + fn get_all_params(&self) -> Result>; + fn get_parameter_from_field(&self, field_name: &str) -> Result; + //region get infos + /// Returns the name of the table in the database. fn get_table_name() -> String; - fn get_client(&self) -> &'a BigqueryClient; - fn set_client(&mut self, client: &'a BigqueryClient); + /// Returns the bigquery-client for the struct. + fn get_client(&self) -> &BigqueryClient; + /// Sets the bigquery-client for the struct. + fn set_client(&mut self, client: BigqueryClient); + /// Returns the name of the primary key field in the struct. fn get_pk_field_name() -> String; + /// Returns the name of the primary key field in the database. fn get_pk_db_name() -> String; - fn get_pk_value(&self) -> &dyn BigDataValueType; + /// Returns the value of the primary key. + fn get_pk_value(&self) -> &(dyn BigDataValueType + Send + Sync); + /// Returns a HashMap with the field name as key and the db name as value. fn get_query_fields(include_pk: bool) -> HashMap; - fn set_field_value(&mut self, field_name: &str, value: &Value) -> Result<()>; + async fn reload(&mut self) -> Result<()>; + //endregion + //region set infos + /// Sets the value of a field by its db name. + fn set_field_value(&mut self, field_name: &str, value: &Value) -> Result<()>; + fn get_field_value(&self, field_name: &str) -> Result; + /// creates a new instance of the struct from a query result row and a bigquery-client. + /// + /// # Arguments + /// * `client` - The bigquery-client to use. + /// * `row` - The query result row. The keys are the db names of the fields. fn new_from_query_result_row( - client: &'a BigqueryClient, + client: BigqueryClient, row: &HashMap, ) -> Result - where - Self: Sized; + where + Self: Sized; + + //region update + + //TODO: fn update(&mut self) -> Result<()>; + //TODO: fn delete(&mut self) -> Result<()>; + + //endregion + + //region insert + + async fn insert(&mut self) -> Result<()>; + async fn update(&mut self) -> Result<()>; + + //endregion + + //endregion } -pub trait BigQueryTable<'a>: BigQueryTableBase<'a> { +#[async_trait] +pub trait BigQueryTable: BigQueryTableBase { + fn get_parameter(value: &T, param_name: &String) -> Result + where + T: BigDataValueType + Debug, + { + trace!("get_parameter({:?}, {})", value, param_name); + let value = value.to_param(); + let param_type = T::convert_type_to_bigquery_type(); + let param_type = QueryParameterType { + type_: Some(param_type), + ..Default::default() + }; + debug!("param_type: {:?}", param_type); + debug!("param_value: {:?}", value); + let param_value = convert_value_to_string(value)?; + debug!("param_value: {:?}", param_value); + let param_value = QueryParameterValue { + value: Some(param_value), + ..Default::default() + }; + + let param = QueryParameter { + parameter_type: Some(param_type), + parameter_value: Some(param_value), + name: Some(param_name.clone()), + }; + Ok(param) + } + fn get_field_param_name(field_name: &str) -> Result { + trace!("get_field_param_name({})", field_name); + let db_name = Self::get_field_db_name(field_name)?; + Ok(format!("__PARAM_{}", db_name)) + } fn get_field_db_name(field_name: &str) -> Result { trace!("get_field_db_name({})", field_name); let query_fields = Self::get_query_fields(true); @@ -43,7 +120,7 @@ pub trait BigQueryTable<'a>: BigQueryTableBase<'a> { Self::get_table_identifier_from_client(self.get_client()) } - fn get_table_identifier_from_client(client: &'a BigqueryClient) -> String { + fn get_table_identifier_from_client(client: &BigqueryClient) -> String { trace!("get_table_identifier_from_client({:?})", client); format!( "`{}.{}.{}`", @@ -52,6 +129,95 @@ pub trait BigQueryTable<'a>: BigQueryTableBase<'a> { Self::get_table_name() ) } + + async fn get_by_pk(client: BigqueryClient, pk_value: &PK) -> Result + where + PK: BigDataValueType + Send + Sync + 'static, + Self: Sized, + { + trace!("get_by_pk({:?}, {:?})", client, pk_value); + let pk_field_name = Self::get_pk_field_name(); + let pk_db_name = Self::get_pk_db_name(); + let result = Self::query(client) + .add_where_eq(&pk_field_name, Some(pk_value))? + .run() + .await; + match result { + Ok(mut v) => { + if v.len() == 0 { + Err(format!("No entry found for {} = {:?}", pk_db_name, pk_value).into()) + } else if v.len() > 1 { + Err(format!( + "More than one entry found for {} = {:?}", + pk_db_name, pk_value + ) + .into()) + } else { + Ok(v.remove(0)) + } + } + Err(e) => Err(e), + } + } + + async fn upsert(&mut self) -> Result<()> + where + Self: Sized + Clone + Send + Sync, + { + trace!("upsert()"); + + let exists = self.clone().reload().await; //TODO: this is not very efficient + match exists { + Ok(_) => { + debug!("Updating entry on db."); + self.save().await + } + Err(_) => { + debug!("Inserting new entry."); + self.insert().await + } + } + } + + /// proxy for update + async fn save(&mut self) -> Result<()> { + self.update().await + } + + /// updates the current instance from another instance. + /// Does not save the changes to the database. + fn update_from(&mut self, other: &Self) -> Result<()> { + for (field_name, _) in Self::get_query_fields(true) { + let value = other.get_field_value(&field_name)?; + self.set_field_value(&field_name, &value)?; + } + Ok(()) + } + + fn query
(client: BigqueryClient) -> BigQueryBuilder
+ where + Table: BigQueryTable, + { + BigQueryBuilder { + client: Some(client), + ..Default::default() + } + } } -impl<'a, T> BigQueryTable<'a> for T where T: BigQueryTableBase<'a> {} +impl BigQueryTable for T where T: BigQueryTableBase {} + +#[derive(Debug, Clone)] +pub enum OrderDirection { + Ascending, + Descending, +} + +impl OrderDirection { + pub(crate) fn to_query_str(&self) -> String { + match self { + OrderDirection::Ascending => String::from("ASC"), + OrderDirection::Descending => String::from("DESC"), + } + } +} diff --git a/src/data/mod.rs b/src/data/mod.rs index 7c7a96b..457fb8f 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -2,5 +2,5 @@ pub use bigquery_table::*; // pub use bigquery_builder::*; mod bigquery_table; -pub mod bigquery_builder; -pub mod param_conversion; \ No newline at end of file +pub mod param_conversion; +pub mod query_builder; diff --git a/src/data/param_conversion/convert_bigquery_params.rs b/src/data/param_conversion/convert_bigquery_params.rs index 054f3c3..0329559 100644 --- a/src/data/param_conversion/convert_bigquery_params.rs +++ b/src/data/param_conversion/convert_bigquery_params.rs @@ -1,19 +1,21 @@ use std::fmt::Debug; use chrono::{NaiveDateTime, Utc}; -use log::trace; -use serde_json::Value; +use log::{trace, warn}; +use serde_json::{value, Value}; use crate::prelude::*; pub trait ConvertBigQueryParams { - fn from_param(value: &Value) -> Result where Self: Sized; + fn from_param(value: &Value) -> Result + where + Self: Sized; fn to_param(&self) -> Value; } impl ConvertBigQueryParams for i64 { fn from_param(value: &Value) -> Result { - let string:String = serde_json::from_value(value.clone())?; + let string: String = serde_json::from_value(value.clone())?; Ok(string.parse()?) } fn to_param(&self) -> Value { @@ -23,7 +25,7 @@ impl ConvertBigQueryParams for i64 { impl ConvertBigQueryParams for i32 { fn from_param(value: &Value) -> Result { - let string:String = serde_json::from_value(value.clone())?; + let string: String = serde_json::from_value(value.clone())?; Ok(string.parse()?) } fn to_param(&self) -> Value { @@ -52,7 +54,7 @@ impl ConvertBigQueryParams for bool { impl ConvertBigQueryParams for String { fn from_param(value: &Value) -> Result { - let string:String = serde_json::from_value(value.clone())?; + let string: String = serde_json::from_value(value.clone())?; Ok(string.parse()?) } fn to_param(&self) -> Value { @@ -71,26 +73,44 @@ impl ConvertBigQueryParams for f64 { impl ConvertBigQueryParams for chrono::DateTime { fn from_param(value: &Value) -> Result { - trace!("ConvertValueToBigqueryParamValue::from_param DateTime -> in: {:?}", value); + trace!( + "ConvertValueToBigqueryParamValue::from_param DateTime -> in: {:?}", + value + ); let value: String = serde_json::from_value(value.clone())?; let value = value.replace("T", " ").replace("Z", ""); let value = NaiveDateTime::parse_from_str(&value, "%Y-%m-%d %H:%M:%S")?; let time = chrono::DateTime::::from_utc(value, Utc); - trace!("ConvertValueToBigqueryParamValue::from_param DateTime -> out: {:?}", time); + trace!( + "ConvertValueToBigqueryParamValue::from_param DateTime -> out: {:?}", + time + ); Ok(time) } fn to_param(&self) -> Value { - trace!("ConvertValueToBigqueryParamValue::to_param DateTime -> in: {:?}", self); + trace!( + "ConvertValueToBigqueryParamValue::to_param DateTime -> in: {:?}", + self + ); let value: String = self.to_rfc3339_opts(chrono::SecondsFormat::Secs, true); let value: String = value.replace("Z", "").replace("T", " "); - trace!("ConvertValueToBigqueryParamValue::to_param DateTime -> out: {:?}", value); + trace!( + "ConvertValueToBigqueryParamValue::to_param DateTime -> out: {:?}", + value + ); serde_json::to_value(value).unwrap() } } impl ConvertBigQueryParams for Option { - fn from_param(value: &Value) -> Result where Self: Sized { - trace!("ConvertValueToBigqueryParamValue::from_param Option: {:?}", value); + fn from_param(value: &Value) -> Result + where + Self: Sized, + { + trace!( + "ConvertValueToBigqueryParamValue::from_param Option: {:?}", + value + ); match value { Value::Null => Ok(None), _ => Ok(Some(T::from_param(value)?)), @@ -98,10 +118,31 @@ impl ConvertBigQueryParams for Option { } fn to_param(&self) -> Value { - trace!("ConvertValueToBigqueryParamValue::to_param Option: {:?}", self); + trace!( + "ConvertValueToBigqueryParamValue::to_param Option: {:?}", + self + ); match self { Some(value) => value.to_param(), None => Value::Null, } } } + +pub fn convert_value_to_string(value: Value) -> Result { + trace!( + "ConvertValueToBigqueryParamValue::convert_value_to_string: {:?}", + value + ); + return if value.is_string() { + trace!("ConvertValueToBigqueryParamValue::convert_value_type_to_bigquery_type: String"); + Ok(value::from_value(value)?) + } else { + warn!("Unknown type: {:?}", value); + + //TODO: check if this is correct with for example 'DATETIME' values + // Err(format!("Unknown type: {:?}", value).into()) + let string = value.to_string(); + Ok(string) + }; +} diff --git a/src/data/param_conversion/convert_type_to_big_query_type.rs b/src/data/param_conversion/convert_type_to_big_query_type.rs index bc7f4d4..f4acdcb 100644 --- a/src/data/param_conversion/convert_type_to_big_query_type.rs +++ b/src/data/param_conversion/convert_type_to_big_query_type.rs @@ -1,7 +1,14 @@ -use std::fmt::Display; +use log::warn; +use std::fmt::{Debug, Display}; + +use serde_json::Value; + +use crate::data::param_conversion::ConvertBigQueryParams; pub trait ConvertTypeToBigQueryType { - fn convert_type_to_bigquery_type() -> String where Self: Sized; + fn convert_type_to_bigquery_type() -> String + where + Self: Sized; } impl ConvertTypeToBigQueryType for bool { @@ -22,6 +29,18 @@ impl ConvertTypeToBigQueryType for i64 { } } +impl ConvertTypeToBigQueryType for u64 { + fn convert_type_to_bigquery_type() -> String { + "INT64".to_string() + } +} + +impl ConvertTypeToBigQueryType for f64 { + fn convert_type_to_bigquery_type() -> String { + "DOUBLE".to_string() //TODO: check if this is correct + } +} + impl ConvertTypeToBigQueryType for String { fn convert_type_to_bigquery_type() -> String { "STRING".to_string() @@ -35,7 +54,9 @@ impl ConvertTypeToBigQueryType for &str { } impl ConvertTypeToBigQueryType for chrono::DateTime - where T: chrono::TimeZone + Display + Send + Sync + 'static { +where + T: chrono::TimeZone + Display + Send + Sync + 'static, +{ fn convert_type_to_bigquery_type() -> String { "DATETIME".to_string() } diff --git a/src/data/param_conversion/mod.rs b/src/data/param_conversion/mod.rs index 6511999..5da96f0 100644 --- a/src/data/param_conversion/mod.rs +++ b/src/data/param_conversion/mod.rs @@ -1,15 +1,21 @@ use std::error::Error; use std::fmt::{Debug, Display, Formatter}; -pub use convert_bigquery_params::ConvertBigQueryParams; +pub use convert_bigquery_params::{ + convert_value_to_string, ConvertBigQueryParams, +}; pub use convert_type_to_big_query_type::ConvertTypeToBigQueryType; mod convert_bigquery_params; mod convert_type_to_big_query_type; -pub trait BigDataValueType: ConvertTypeToBigQueryType + ConvertBigQueryParams + Debug {} +pub trait BigDataValueType: +ConvertTypeToBigQueryType + ConvertBigQueryParams + Debug + Send + Sync +{} -impl BigDataValueType for T {} +impl BigDataValueType +for T +{} //region ConversionError #[derive(Debug)] @@ -39,4 +45,4 @@ impl ConversionError { } } -//endregion \ No newline at end of file +//endregion diff --git a/src/data/query_builder.rs b/src/data/query_builder.rs new file mode 100644 index 0000000..aead980 --- /dev/null +++ b/src/data/query_builder.rs @@ -0,0 +1,598 @@ +use std::collections::HashMap; +use std::error::Error; +use std::fmt::{Debug, Display, Formatter}; +use std::marker::PhantomData; + +use google_bigquery2::api::{ErrorProto, QueryParameter, QueryRequest}; +use google_bigquery2::hyper::{Body, Response}; +use log::{debug, trace}; +use serde_json::Value; + +use crate::data::param_conversion::BigDataValueType; +use crate::prelude::*; + +#[derive(Debug, Clone)] +pub struct BigqueryError { + pub message: String, + pub errors: Option>, +} + +impl BigqueryError { + fn new(message: &str, errors: Option>) -> Self { + Self { + message: message.to_string(), + errors, + } + } +} + +impl Display for BigqueryError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "BigqueryError: {}", self.message) + } +} + +impl Error for BigqueryError {} + +#[derive(Debug)] +pub enum QueryResultType
{ + WithRowData(Vec
), + WithoutRowData(Result<()>), +} + +//region typestate structs + +#[derive(Debug, Default, Clone)] +pub struct HasStartingData(Table); + +#[derive(Debug, Default, Clone)] +pub struct NoStartingData; + +#[derive(Debug, Default, Clone)] +pub struct HasClient(BigqueryClient); + +#[derive(Debug, Default, Clone)] +pub struct NoClient; + +#[derive(Debug, Default, Clone)] +pub struct QueryWasBuilt; + +#[derive(Debug, Default, Clone)] +pub struct QueryWasNotBuilt; + +#[derive(Debug, Default, Clone)] +pub struct QueryTypeNoType; + +impl HasNoQueryType for QueryTypeNoType {} + +//region insert + +#[derive(Debug, Clone)] +pub struct QueryTypeInsert; + +impl HasQueryType for QueryTypeInsert {} + +//endregion +//region select + +#[derive(Debug, Clone)] +pub struct QueryTypeSelect; + +impl HasQueryType for QueryTypeSelect {} + +//endregion +// pub struct QueryTypeNoUpdate; +// pub struct QueryTypeUpdate; +// struct QueryTypeNoDelete; +// struct QueryTypeDelete; +//endregion + +pub trait HasQueryType {} + +pub trait HasNoQueryType {} + +#[derive(Debug, Clone)] +pub struct QueryBuilder { + client: Client, + query: String, + params: Vec, + where_clauses: Vec, + order_by: Vec<(String, OrderDirection)>, + limit: Option, + + starting_data: StartingData, + + query_type: PhantomData, + query_built: PhantomData, + table: PhantomData
, +} + +impl Default +for QueryBuilder +{ + fn default() -> Self { + Self { + client: Client::default(), + query: String::new(), + params: Vec::new(), + where_clauses: Vec::new(), + order_by: Vec::new(), + limit: None, + starting_data: Default::default(), + query_type: PhantomData, + query_built: PhantomData, + table: PhantomData, + } + } +} + +impl +QueryBuilder +{ + fn get_sorted_selected_fields(&self) -> Vec<(String, String)> { + trace!("get_sorted_selected_fields()"); + let mut fields: Vec<(String, String)> = Table::get_query_fields(true).into_iter().collect(); + log::debug!("fields: {:?}", fields); + fields.sort(); + fields + } + + fn get_fields_string(&self) -> String { + let mut fields = self.get_sorted_selected_fields(); + fields + .into_iter() + .map(|f| f.1) + .collect::>() + .join(", ") + } +} + +impl +QueryBuilder +{ + //region set query content + pub fn add_where_eq(self, column: &str, value: Option<&T>) -> Result + where + T: BigDataValueType + Debug, + { + let column = Table::get_field_db_name(column)?; + let mut wheres = self.where_clauses; + + if let Some(value) = value { + let param_name = format!("__PARAM_{}", self.params.len()); + + let param = Table::get_parameter(value, ¶m_name)?; + + let mut required_params = self.params; + required_params.push(param); + + wheres.push(format!("{} = @{}", column, param_name)); + + return Ok(Self { + where_clauses: wheres, + params: required_params, + ..self + }); + } + + wheres.push(format!("{} is NULL", column)); + Ok(Self { + where_clauses: wheres, + ..self + }) + } + + pub fn set_limit(self, limit: u32) -> Self { + Self { + limit: Some(limit), + ..self + } + } + //endregion + + //region build query + fn build_where_string(&self) -> String { + let mut where_string = String::new(); + if !self.where_clauses.is_empty() { + where_string.push_str(" WHERE "); + where_string.push_str(&self.where_clauses.join(" AND ")); + } + where_string + } + fn build_order_by_string(&self) -> String { + let mut order_by_string = String::new(); + if !self.order_by.is_empty() { + order_by_string.push_str(" ORDER BY "); + order_by_string.push_str( + &self + .order_by + .iter() + .map(|(column, direction)| format!("{} {}", column, direction.to_query_str())) + .collect::>() + .join(", "), + ); + } + order_by_string + } + fn build_limit_string(&self) -> String { + let mut limit_string = String::new(); + if let Some(limit) = self.limit { + limit_string.push_str(" LIMIT "); + limit_string.push_str(&limit.to_string()); + } + limit_string + } + //endregion +} + +impl +QueryBuilder +{ + pub fn set_data( + self, + data: Table, + ) -> QueryBuilder> { + QueryBuilder { + starting_data: HasStartingData(data), + query_built: PhantomData, + params: self.params, + where_clauses: self.where_clauses, + order_by: self.order_by, + limit: self.limit, + query_type: PhantomData, + table: PhantomData, + client: self.client, + query: self.query, + } + } +} + +impl +QueryBuilder +{ + pub fn select() -> QueryBuilder + { + QueryBuilder { + query: String::from("SELECT "), + ..Default::default() + } + } + pub fn insert() -> QueryBuilder + { + QueryBuilder { + query: String::from("INSERT INTO "), + ..Default::default() + } + } +} + +impl +QueryBuilder> +{ + pub fn build_query( + self, + ) -> Result< + QueryBuilder>, + > { + trace!("build_query: insert: {:?}", self); + let table_identifier = Table::get_table_identifier_from_client(&self.client.0); + let fields = self.get_fields_string(); + let values = self.get_values_params_string()?; + let params = &self.params; + log::warn!("params are not used in insert query: {:?}", params); + let mut params = vec![]; + let local_fields = Table::get_query_fields(true); + let starting_data = &self.starting_data.0; + for (local_field_name, _) in local_fields { + let para = Table::get_parameter_from_field(starting_data, &local_field_name)?; + params.push(para); + } + + let query = format!( + "insert into {} ({}) values({})", + table_identifier, fields, values + ); + Ok(QueryBuilder { + query, + params, + where_clauses: self.where_clauses, + order_by: self.order_by, + limit: self.limit, + client: self.client, + table: self.table, + starting_data: self.starting_data, + query_type: self.query_type, + query_built: PhantomData, + }) + } + + fn get_values_params_string(&self) -> Result { + let values = self.get_value_parameter_names()?; + Ok(values + .iter() + .map(|v| format!("@{}", v)) + .collect::>() + .join(", ")) + } + + fn get_value_parameter_names(&self) -> Result> { + let mut values = self.get_sorted_selected_fields(); + let res = values + .iter_mut() + .map(|(field, _)| Table::get_field_param_name(field)) + .collect::>>()?; + Ok(res) + } +} + +impl +QueryBuilder +{ + pub fn add_order_by( + mut self, + column_name: impl Into, + direction: OrderDirection, + ) -> Self { + self.order_by.push((column_name.into(), direction)); + self + } +} + +impl +QueryBuilder +{ + pub fn build_query( + self, + ) -> QueryBuilder { + trace!("build_query: select: {:?}", self); + + let table_identifier = Table::get_table_identifier_from_client(&self.client.0); + let fields_str = self.get_fields_string(); + let where_clause = self.build_where_string(); + let order_by_clause = self.build_order_by_string(); + let limit_clause = self.build_limit_string(); + let query = format!( + "SELECT {} FROM {}{}{}{}", + fields_str, table_identifier, where_clause, order_by_clause, limit_clause + ); + QueryBuilder { + query, + where_clauses: self.where_clauses, + order_by: self.order_by, + limit: self.limit, + client: self.client, + params: self.params, + table: self.table, + starting_data: self.starting_data, + query_type: self.query_type, + query_built: PhantomData, + } + } +} + +impl +QueryBuilder +{ + pub fn with_client( + self, + client: BigqueryClient, + ) -> QueryBuilder { + QueryBuilder { + client: HasClient(client), + table: self.table, + query_type: self.query_type, + query_built: self.query_built, + query: self.query, + where_clauses: self.where_clauses, + order_by: self.order_by, + limit: self.limit, + params: self.params, + starting_data: self.starting_data, + } + } +} + +impl +QueryBuilder +{ + pub fn un_build( + self, + ) -> QueryBuilder { + QueryBuilder { + client: self.client, + table: self.table, + query_type: self.query_type, + query: self.query, + where_clauses: self.where_clauses, + order_by: self.order_by, + limit: self.limit, + params: self.params, + starting_data: self.starting_data, + query_built: PhantomData, + } + } +} + +impl +QueryBuilder +{ + pub async fn run(self) -> Result> { + trace!("run query: {}", self.query); + debug!( + "Running query with params: {}\t params: {:?}", + self.query, self.params + ); + let sorted_fields = self.get_sorted_selected_fields(); + let query = Some(self.query); + let query_parameters = match self.params.is_empty() { + true => None, + false => Some(self.params), + }; + let query_request = QueryRequest { + query, + query_parameters, + use_legacy_sql: Some(false), + ..Default::default() + }; + let client = self.client.0; + let (_, query_response) = run_query_with_client(&client, query_request).await?; + // if let Some(errors) = query_response.errors { + // return Err(BigqueryError::new("Query returned errors", Some(errors)).into()); + // } + debug!( + "total rows returned: {}", + query_response.total_rows.unwrap_or(0) + ); + //TODO: pagination is not implemented + let mut result: Vec
= vec![]; + for row in query_response.rows.unwrap_or_default() { + let mut row_result: HashMap = HashMap::new(); + for (i, field) in row.f.unwrap_or_default().into_iter().enumerate() { + let field_db_name = sorted_fields[i].1.clone(); + let field_value = field.v.unwrap_or(Value::Null); + row_result.insert(field_db_name, field_value); + } + let row_result = Table::new_from_query_result_row(client.clone(), &row_result)?; + result.push(row_result); + } + debug!("total rows parsed: {}", result.len()); + + Ok(QueryResultType::WithRowData(result)) + } +} + +async fn run_query_with_client( + client: &BigqueryClient, + request: QueryRequest, +) -> Result<(Response, google_bigquery2::api::QueryResponse)> { + let project_id = client.get_project_id(); + let (response, query_response) = client + .get_client() + .jobs() + .query(request, project_id) + .doit() + .await?; + + if response.status() != 200 { + return Err(format!("Wrong status code returned! ({})", response.status()).into()); + } + + Ok((response, query_response)) +} + +#[cfg(test)] +mod test { + use std::collections::HashMap; + + use serde_json::Value; + + use super::*; + + #[derive(Debug, Default, Clone)] + struct TestTable { + client: BigqueryClient, + row_id: i64, + info1: Option, + info3: Option, + info4i: Option, + info4b: Option, + } + + #[async_trait::async_trait] + impl BigQueryTableBase for TestTable { + fn get_all_params(&self) -> Result> { + todo!() + } + + fn get_parameter_from_field(&self, field_name: &str) -> Result { + todo!() + } + + fn get_table_name() -> String { + todo!() + } + + fn get_client(&self) -> &BigqueryClient { + todo!() + } + + fn set_client(&mut self, client: BigqueryClient) { + todo!() + } + + fn get_pk_field_name() -> String { + todo!() + } + + fn get_pk_db_name() -> String { + todo!() + } + + fn get_pk_value(&self) -> &(dyn BigDataValueType + Send + Sync) { + todo!() + } + + fn get_query_fields(include_pk: bool) -> HashMap { + todo!() + } + + async fn reload(&mut self) -> Result<()> { + todo!() + } + + fn set_field_value(&mut self, field_name: &str, value: &Value) -> Result<()> { + todo!() + } + + fn get_field_value(&self, field_name: &str) -> Result { + todo!() + } + + fn new_from_query_result_row( + client: BigqueryClient, + row: &HashMap, + ) -> Result + where + Self: Sized, + { + todo!() + } + + async fn insert(&mut self) -> Result<()> { + todo!() + } + + async fn update(&mut self) -> Result<()> { + todo!() + } + } + + impl TestTable { + fn select() -> QueryBuilder + { + QueryBuilder::::select() + } + fn insert() -> QueryBuilder> + { + QueryBuilder::>::insert() + } + } + + #[tokio::test] + async fn test1() { + let client = BigqueryClient::new("test", "", None).await.unwrap(); + let query_builder = TestTable::select().with_client(client.clone()); + println!("{:?}", query_builder); + let query_builder = query_builder.build_query(); + + println!("query: {:?}", query_builder); + let query_builder = TestTable::insert(); + println!("{:?}", query_builder); + let query_builder = query_builder.with_client(client); + let query_builder = query_builder + .build_query() + .expect("build of insert query failed"); + let result = query_builder.clone().run().await; + println!("query: {:?}", query_builder); + } +} diff --git a/src/lib.rs b/src/lib.rs index 0e055f3..28ecbc3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ pub mod client; pub mod data; -pub mod utils; pub mod prelude; +pub mod utils; diff --git a/src/prelude.rs b/src/prelude.rs index e582d97..217a349 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -1,12 +1,6 @@ pub use google_bigquery_v2_derive::BigDataTableDerive; pub use crate::client::BigqueryClient; -pub use crate::data::{bigquery_builder::{ - BigQueryBuilder, - BigQueryBuilderAvailable, - OrderDirection, -}, - BigQueryTable, - BigQueryTableBase}; +pub use crate::data::{BigQueryTable, BigQueryTableBase, OrderDirection}; pub type Result = std::result::Result>; diff --git a/tests/tests.rs b/tests/tests.rs index 80bb960..8fe2846 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -1,13 +1,17 @@ use log::{debug, info, LevelFilter}; use nameof::name_of; +use google_bigquery_v2::data::query_builder::{ + HasStartingData, NoClient, NoStartingData, QueryBuilder, QueryTypeInsert, QueryTypeNoType, + QueryTypeSelect, QueryWasNotBuilt, +}; use google_bigquery_v2::prelude::*; -#[derive(BigDataTableDerive, Debug, Clone)] +#[derive(BigDataTableDerive, Debug, Default, Clone)] #[db_name("Infos")] -pub struct DbInfos<'a> { +pub struct DbInfos { #[client] - client: &'a BigqueryClient, + client: BigqueryClient, #[primary_key] #[db_name("Id")] row_id: i64, @@ -19,6 +23,57 @@ pub struct DbInfos<'a> { #[db_name("yes")] info4b: Option::, } +pub struct DbInfos2{ + client: BigqueryClient, + row_id: i64, + info1: Option::, + info2: Option, + info3: Option, + info4i: Option, + info4b: Option, +} + +//TODO: outsource this impl into the derive +impl DbInfos { + fn select() -> QueryBuilder { + QueryBuilder::::select() + } + fn insert() -> QueryBuilder { + QueryBuilder::::insert() + } +} + +#[tokio::test] +async fn test1() { + init_logger(); + let client = get_test_client().await; + let query_builder = DbInfos::select().with_client(client.clone()); + debug!("{:?}", query_builder); + let query_builder = query_builder.build_query(); + + debug!("query: {:?}", query_builder); + let result = query_builder.clone().run().await; + debug!("select result: {:?}", result); + let sample_data = DbInfos { + client: client.clone(), + row_id: 1, + info1: Some("test1".to_string()), + info2: Some("test2".to_string()), + info3: Some("test3".to_string()), + info4i: Some(1), + info4b: Some(true), + }; + + let query_builder = DbInfos::insert(); + println!("{:?}", query_builder); + let query_builder = query_builder.with_client(client); + let query_builder = query_builder.set_data(sample_data); + let query_builder = query_builder.build_query().expect("query builder failed"); + println!("query: {:?}", query_builder); + let result = query_builder.clone().run().await; + println!("query: {:?}", query_builder); + println!("result: {:?}", result); +} #[tokio::test] async fn test_get_table_name() { @@ -34,8 +89,8 @@ async fn test_get_query_fields() { let fields = DbInfos::get_query_fields(true); log::debug!("fields: {:?}", fields); assert_eq!(6, fields.len(), "fields length is not correct"); - assert_eq!("Id", fields.get("row_id").unwrap(), ); - assert_eq!("info1", fields.get("info1").unwrap(), ); + assert_eq!("Id", fields.get("row_id").unwrap(),); + assert_eq!("info1", fields.get("info1").unwrap(),); assert_eq!("info", fields.get("info2").unwrap()); assert_eq!("info3", fields.get("info3").unwrap()); assert_eq!("info4i", fields.get("info4i").unwrap()); @@ -46,7 +101,7 @@ async fn test_get_query_fields() { async fn test_query_builder_1() { init_logger(); let client = get_test_client().await; - let query_builder: BigQueryBuilder = DbInfos::query(&client); + let query_builder: BigQueryBuilder = DbInfos::query(client); let query_builder: BigQueryBuilder = query_builder .add_where_eq::(name_of!(info1 in DbInfos), None) .unwrap() @@ -88,33 +143,36 @@ async fn get_test_client() -> BigqueryClient { async fn simple_query() { init_logger(); let client = get_test_client().await; - let q = DbInfos::query(&client) + let q: Vec = DbInfos::query(client) .add_order_by(name_of!(row_id in DbInfos), OrderDirection::Descending) - .run().await.unwrap(); + .run() + .await + .unwrap(); let mut last_num = 999999999999999999; for line in q { info!("line: {:?}", line); - debug!("row_id > last: {} <= {}",line.row_id, last_num); + debug!("row_id > last: {} <= {}", line.row_id, last_num); assert!(line.row_id <= last_num); last_num = line.row_id; } } + #[tokio::test] async fn test_select_limit_1() { init_logger(); let client = get_test_client().await; - let q = DbInfos::query(&client) - .set_limit(1) - .run().await.unwrap(); + let q: Vec = DbInfos::query(client).set_limit(1).run().await.unwrap(); assert_eq!(q.len(), 1); } fn init_logger() { + let global_level = LevelFilter::Info; + let own_level = LevelFilter::Debug; let _ = env_logger::builder() .is_test(true) - .filter_level(LevelFilter::Info) - .filter_module("google_bigquery_v2", LevelFilter::Trace) - .filter_module("google_bigquery_v2_derive", LevelFilter::Trace) - .filter_module("tests", LevelFilter::Trace) + .filter_level(global_level) + .filter_module("google_bigquery_v2", own_level) + .filter_module("google_bigquery_v2_derive", own_level) + .filter_module("tests", own_level) .try_init(); }