From 236478ff8c5d5ea275a923449a944dd48a36506b Mon Sep 17 00:00:00 2001
From: OMGeeky
Date: Sun, 19 Feb 2023 02:32:53 +0100
Subject: [PATCH] pretty good progress

---
 .cargo/config.toml                            |   6 +
 .gitignore                                    |   4 +
 Cargo.toml                                    |  17 +
 google_bigquery_derive/.gitignore             |   4 +
 google_bigquery_derive/Cargo.toml             |  15 +
 google_bigquery_derive/src/lib.rs             | 438 ++++++++++++++++++
 src/client.rs                                 |  52 +++
 src/data/big_data_table_base.rs               |  29 ++
 src/data/big_data_table_base_convenience.rs   | 213 +++++++++
 src/data/mod.rs                               | 202 ++++++++
 src/googlebigquery.rs                         |  33 ++
 src/lib.rs                                    |  17 +
 src/tests.rs                                  | 248 ++++++++++
 src/utils/convert_type_to_big_query_type.rs   |  35 ++
 .../convert_value_to_bigquery_param_value.rs  |  21 +
 src/utils/mod.rs                              |   8 +
 16 files changed, 1342 insertions(+)
 create mode 100644 .cargo/config.toml
 create mode 100644 .gitignore
 create mode 100644 Cargo.toml
 create mode 100644 google_bigquery_derive/.gitignore
 create mode 100644 google_bigquery_derive/Cargo.toml
 create mode 100644 google_bigquery_derive/src/lib.rs
 create mode 100644 src/client.rs
 create mode 100644 src/data/big_data_table_base.rs
 create mode 100644 src/data/big_data_table_base_convenience.rs
 create mode 100644 src/data/mod.rs
 create mode 100644 src/googlebigquery.rs
 create mode 100644 src/lib.rs
 create mode 100644 src/tests.rs
 create mode 100644 src/utils/convert_type_to_big_query_type.rs
 create mode 100644 src/utils/convert_value_to_bigquery_param_value.rs
 create mode 100644 src/utils/mod.rs

diff --git a/.cargo/config.toml b/.cargo/config.toml
new file mode 100644
index 0000000..491c46e
--- /dev/null
+++ b/.cargo/config.toml
@@ -0,0 +1,6 @@
+[build]
+
+rustflags = [
+#    "--cfg", "man_impl=\"true\"",
+#    "--cfg", "man_impl_has_client=\"false\""
+] # custom flags to pass to all compiler invocations
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..43b3fc1
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,4 @@
+/target
+/Cargo.lock
+/.idea
+/auth
\ No newline at end of file
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..365795a
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,17 @@
+[package]
+name = "google_bigquery"
+version = "0.1.0"
+edition = "2021"
+
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+
+reqwest = { version = "0.11.13", features = ["default", "json"] }
+tokio = { version = "1.23.0", features = ["full"] }
+serde = { version = "1.0.130", features = ["derive", "default"] }
+serde_json = "1.0"
+google-bigquery2 = "4.0.1"
+async-trait = "0.1.60"
+google_bigquery_derive = { path = "./google_bigquery_derive" }
diff --git a/google_bigquery_derive/.gitignore b/google_bigquery_derive/.gitignore
new file mode 100644
index 0000000..97d204a
--- /dev/null
+++ b/google_bigquery_derive/.gitignore
@@ -0,0 +1,4 @@
+/target
+**/*.rs.bk
+Cargo.lock
+/.idea
\ No newline at end of file
diff --git a/google_bigquery_derive/Cargo.toml b/google_bigquery_derive/Cargo.toml
new file mode 100644
index 0000000..2156f26
--- /dev/null
+++ b/google_bigquery_derive/Cargo.toml
@@ -0,0 +1,15 @@
+[package]
+name = "google_bigquery_derive"
+version = "0.0.0"
+authors = ["OMGeeky"]
+description = "Derive macros (BigDataTable, HasBigQueryClient) for the google_bigquery crate"
+keywords = ["bigquery", "proc_macro", "derive"]
+edition = "2018"
+
+[lib]
+proc-macro = true
+
+[dependencies]
+quote = "1"
+proc-macro2 = "1.0"
+syn = { version = "1.0", features = ["extra-traits", "parsing", "full"] }
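Note: the commented-out --cfg flags in .cargo/config.toml above drive the cfg_attr switches used in src/tests.rs further down in this patch: left commented, the derive macros are active; enabled, hand-written impls take over. A minimal sketch of the pattern, on an illustrative struct that is not part of the patch:

    // Default build: the derive runs. Building with the rustflags above
    // (i.e. --cfg man_impl="true") skips it, and an impl gated behind
    // #[cfg(man_impl = "true")] is compiled instead.
    #[cfg_attr(not(man_impl = "true"), derive(BigDataTable))]
    pub struct Example<'a> {
        #[cfg_attr(not(man_impl = "true"), primary_key)]
        row_id: i64,
        #[cfg_attr(not(man_impl = "true"), client)]
        client: &'a BigqueryClient,
    }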
diff --git a/google_bigquery_derive/src/lib.rs b/google_bigquery_derive/src/lib.rs
new file mode 100644
index 0000000..10cf189
--- /dev/null
+++ b/google_bigquery_derive/src/lib.rs
@@ -0,0 +1,438 @@
+#![allow(unused)]
+extern crate proc_macro;
+
+use std::any::Any;
+
+use proc_macro2::{Ident, TokenStream};
+use quote::quote;
+use syn::{DeriveInput, parse_macro_input};
+
+struct Field {
+    field_ident: quote::__private::Ident,
+    db_name: std::string::String,
+    local_name: std::string::String,
+    ty: syn::Type,
+    required: bool,
+}
+
+#[proc_macro_derive(BigDataTable,
+attributes(primary_key, client, db_name, db_ignore, required))]
+pub fn big_data_table(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
+    let ast = syn::parse(input).unwrap();
+    let tokens = implement_derive(&ast);
+    tokens.into()
+}
+
+#[proc_macro_derive(HasBigQueryClient, attributes(client))]
+pub fn has_big_query_client(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
+    let ast = syn::parse(input).unwrap();
+    let tokens = implement_derive_has_big_query_client(&ast);
+    tokens.into()
+}
+
+fn implement_derive_has_big_query_client(ast: &DeriveInput) -> TokenStream {
+    let table_ident = &ast.ident;
+    let client = get_client_field(&ast);
+    let implementation_has_bigquery_client = implement_has_bigquery_client_trait(table_ident, &client.field_ident);
+    quote! {
+        #implementation_has_bigquery_client;
+    }
+}
+
+fn implement_derive(ast: &DeriveInput) -> TokenStream {
+    let table_ident = &ast.ident;
+    let pk = get_pk_field(&ast);
+    let implementation_big_data_table_base = implement_big_data_table_base_trait(table_ident, &pk, ast);
+
+    let tokens = quote! {
+        #implementation_big_data_table_base;
+    };
+    tokens
+}
+
+fn implement_big_data_table_base_trait(table_ident: &Ident, primary_key: &Field, ast: &DeriveInput) -> TokenStream {
+    let pk_ty = &primary_key.ty;
+    let client_field = get_client_field(&ast);
+
+    let mut db_fields = get_fields(&ast.data);
+    // drop the client field before generating any per-field code
+    // (it is not a database column)
+    db_fields.retain(|f| f.local_name != client_field.local_name);
+
+    let get_pk_name = get_get_pk_name(primary_key);
+    let get_pk_value = get_get_pk_value(primary_key);
+    let get_field_name = get_get_field_name(ast, &db_fields);
+
+    let write_from_table_row = get_write_from_table_row(&db_fields);
+    let get_query_fields = get_get_query_fields(ast);
+    let get_table_name = get_get_table_name(&table_ident);
+    let create_with_pk = get_create_with_pk(ast, pk_ty);
+    let create_from_table_row = get_create_from_table_row(ast);
+    let get_query_fields_update_str = get_get_query_fields_update_str(ast);
+    let get_all_query_parameters = get_get_all_query_parameters(ast);
+    quote! {
+        impl<'a> BigDataTableBase<'a, #table_ident<'a>, #pk_ty> for #table_ident<'a> {
+            #get_pk_name
+            #get_field_name
+            #get_query_fields
+            #get_table_name
+            #create_with_pk
+            #create_from_table_row
+            #write_from_table_row
+            #get_pk_value
+            #get_query_fields_update_str
+            #get_all_query_parameters
+        }
+    }
+}
+
+//region BigDataTableBase functions
+
+fn get_get_pk_name(primary_key_field: &Field) -> TokenStream {
+    // get_field_name() expects the *local* field name, and #pk_name already
+    // interpolates as a string literal, so no stringify! here
+    let pk_name = &primary_key_field.local_name;
+    quote! {
+        fn get_pk_name() -> String {
+            Self::get_field_name(#pk_name).unwrap()
+        }
+    }
+}
+
+fn get_get_pk_value(pk_field: &Field) -> TokenStream {
+    let (pk_ident, pk_ty) = (&pk_field.field_ident, &pk_field.ty);
+    quote! {
+        fn get_pk_value(&self) -> #pk_ty {
+            self.#pk_ident
+        }
+    }
+}
+
+fn get_get_query_fields_update_str(ast: &DeriveInput) -> TokenStream {
+    quote! {
+        fn get_query_fields_update_str(&self) -> String {
+            todo!(); //TODO get_query_fields_update_str
+        }
+    }
+}
+
+fn get_get_all_query_parameters(ast: &DeriveInput) -> TokenStream {
+    quote! {
+        fn get_all_query_parameters(&self) -> Vec<google_bigquery2::api::QueryParameter> {
+            todo!(); //TODO get_all_query_parameters
+        }
+    }
+}
+
+fn get_write_from_table_row(db_fields: &Vec<Field>) -> TokenStream {
+    fn get_write_from_table_row_single_field(field: &Field) -> TokenStream {
+        let field_ident = &field.field_ident;
+        // get_field_name() maps local -> remote, so hand it the local name
+        let field_name = &field.local_name;
+        if field.required {
+            /*
+            let pk_index = *index_to_name_mapping.get(&Self::get_pk_name()).unwrap();
+            let pk = row
+                .f.as_ref()
+                .unwrap()[pk_index]
+                .v.as_ref()
+                .unwrap()
+                .parse::<i64>()
+                .unwrap();
+            */
+            quote! {
+                let index = *index_to_name_mapping.get(Self::get_field_name(#field_name)?.as_str()).unwrap();
+                self.#field_ident = row.f.as_ref()
+                    .unwrap()[index]
+                    .v.as_ref()
+                    .unwrap()
+                    .parse()
+                    .unwrap();
+            }
+        } else {
+            /*
+            let info1 = *index_to_name_mapping.get(Self::get_field_name(stringify!(info1))?.as_str()).unwrap();
+            self.info1 = match cell[info1].v.as_ref() {
+                Some(v) => Some(v.parse()?),
+                None => None
+            };
+            */
+            quote! {
+                let index = *index_to_name_mapping.get(Self::get_field_name(#field_name)?.as_str()).unwrap();
+                self.#field_ident = match row.f.as_ref().unwrap()[index].v.as_ref() {
+                    Some(v) => Some(v.parse()?),
+                    None => None
+                };
+            }
+        }
+    }
+
+    let tokens: Vec<TokenStream> = db_fields.iter().map(|field| get_write_from_table_row_single_field(field)).collect();
+    quote! {
+        fn write_from_table_row(&mut self, row: &TableRow, index_to_name_mapping: &HashMap<String, usize>) -> Result<(), Box<dyn Error>> {
+            #(#tokens)*
+            Ok(())
+        }
+    }
+}
+
+fn get_create_from_table_row(ast: &DeriveInput) -> TokenStream {
+    quote! {
+
+        fn create_from_table_row(client: &'a BigqueryClient,
+            row: &google_bigquery2::api::TableRow,
+            index_to_name_mapping: &HashMap<String, usize>)
+            -> Result<Self, Box<dyn Error>>
+            where
+                Self: Sized {
+            todo!(); //TODO create_from_table_row
+        }
+    }
+}
+
+fn get_create_with_pk(ast: &DeriveInput, pk_ty: &syn::Type) -> TokenStream {
+    quote! {
+        fn create_with_pk(client: &'a BigqueryClient, pk: #pk_ty) -> Self {
+            todo!(); //TODO create_with_pk
+        }
+    }
+}
+
+fn get_get_table_name(table_ident: &Ident) -> TokenStream {
+    quote! {
+        fn get_table_name() -> String {
+            stringify!(#table_ident).to_string()
+        }
+    }
+}
+
+fn get_get_query_fields(ast: &DeriveInput) -> TokenStream {
+    quote! {
+        fn get_query_fields() -> HashMap<String, String> {
+            todo!(); //TODO get_query_fields
+        }
+    }
+}
+
+fn get_get_field_name(ast: &DeriveInput, db_fields: &Vec<Field>) -> TokenStream {
+    let mut mapping: Vec<(String, String)> = Vec::new();
+    for db_field in db_fields {
+        let field_name_local = &db_field.local_name;
+        let field_name_remote = &db_field.db_name;
+        mapping.push((field_name_local.to_string(), field_name_remote.to_string()));
+    }
+
+    let mapping_tok: Vec<TokenStream> = mapping.iter().map(|(field_name_local, field_name_remote)| {
+        // #field_name_local is a String here, so it interpolates as a string
+        // literal and the arm compares names instead of binding a variable
+        quote! {
+            #field_name_local => Ok(#field_name_remote.to_string()),
+        }
+    }).collect();
+
+
+    quote! {
+        fn get_field_name(field_name: &str) -> Result<String, Box<dyn Error>> {
+            match field_name {
+                //ex.: "row_id" => Ok("Id".to_string()),
+                #(#mapping_tok)*
+                _ => Err("Field not found".into()),
+            }
+        }
+    }
+}
+
+//endregion
+
+fn implement_has_bigquery_client_trait(table_ident: &Ident, client_ident: &Ident) -> TokenStream {
+    let implementation_has_bigquery_client = quote! {
+        impl<'a> HasBigQueryClient<'a> for #table_ident<'a> {
+            fn get_client(&self) -> &'a BigqueryClient {
+                self.#client_ident
+            }
+        }
+    };
+    implementation_has_bigquery_client
+}
+
+//region Helper functions
+
+fn get_helper_fields(ast: &syn::DeriveInput) -> (Field, Field) {
+    let pk = get_pk_field(&ast);
+    let client = get_client_field(&ast);
+    (pk, client)
+}
+
+fn get_pk_field(ast: &&DeriveInput) -> Field {
+    let mut pk_fields = get_attributed_fields(&ast.data, "primary_key");
+    if pk_fields.len() != 1 {
+        panic!("Exactly one primary key field must be specified");
+    }
+    let pk = pk_fields.remove(0);
+    pk
+}
+
+fn get_client_field(ast: &&DeriveInput) -> Field {
+//region client
+    let mut client_fields = get_attributed_fields(&ast.data, "client");
+    if client_fields.len() != 1 {
+        panic!("Exactly one client field must be specified");
+    }
+    let client = client_fields.remove(0);
+    //endregion
+    client
+}
+
+
+fn get_fields(data: &syn::Data) -> Vec<Field> {
+    let mut res = vec![];
+
+    match data {
+        syn::Data::Struct(ref data_struct) => match data_struct.fields {
+            syn::Fields::Named(ref fields_named) => {
+                'field_loop: for field in fields_named.named.iter() {
+                    if let Some(ident) = &field.ident {
+                        let mut name = None;
+                        let mut required = false;
+                        let attrs = &field.attrs;
+                        for attribute in attrs {
+                            if attribute.path.is_ident("db_ignore") {
+                                continue 'field_loop; //skip this field completely
+                            }
+                            if attribute.path.is_ident("db_name") {
+                                let args: syn::LitStr =
+                                    attribute.parse_args().expect("Failed to parse target name");
+                                let args = args.value();
+                                name = Some(args);
+                            }
+                            if attribute.path.is_ident("required") {
+                                required = true;
+                            }
+                        }
+
+                        let local_name = ident.to_string();
+                        let name = match name {
+                            None => local_name.clone(),
+                            Some(n) => n,
+                        };
+                        // let name: String = "".to_string();
+                        res.push(Field {
+                            field_ident: ident.clone(),
+                            local_name,
+                            db_name: name,
+                            ty: field.ty.clone(),
+                            required,
+                        });
+                    }
+                }
+            }
+            _ => (),
+        },
+        _ => panic!("Must be a struct!"),
+    };
+
+    return res;
+}
+
+
+fn get_attributed_fields(data: &syn::Data, attribute_name: &str) -> Vec<Field> {
+    let mut res = vec![];
+    match data {
+        // Only process structs
+        syn::Data::Struct(ref data_struct) => {
+            // Check the kind of fields the struct contains
+            match data_struct.fields {
+                // Structs with named fields
+                syn::Fields::Named(ref fields_named) => {
+                    // Iterate over the fields
+                    'field_loop: for field in fields_named.named.iter() {
+                        if let Some(ident) = &field.ident {
+                            // Get attributes #[..] on each field
+                            for attr in field.attrs.iter() {
+                                // Parse the attribute
+                                if attr.path.is_ident(attribute_name) {
+                                    let mut name = None;
+                                    let mut required = false;
+                                    let attrs = &field.attrs;
+                                    for attribute in attrs {
+                                        if attribute.path.is_ident("db_ignore") {
+                                            continue 'field_loop; //skip this field completely
+                                        } else if attribute.path.is_ident("db_name") {
+                                            let args: syn::LitStr = attribute
+                                                .parse_args()
+                                                .expect("Failed to parse target name");
+                                            let args = args.value();
+                                            name = Some(args);
+                                        } else if attribute.path.is_ident("required") {
+                                            required = true;
+                                        } else if attribute.path.is_ident("primary_key") {
+                                            required = true;
+                                        }
+                                    }
+
+                                    let local_name = ident.to_string();
+                                    let name = match name {
+                                        None => local_name.clone(),
+                                        Some(n) => n,
+                                    };
+
+                                    let item = field.clone();
+                                    res.push(Field {
+                                        field_ident: item.ident.unwrap(),
+                                        local_name,
+                                        ty: item.ty,
+                                        db_name: name,
+                                        required,
+                                    });
+                                }
+                            }
+                        }
+                    }
+                }
+
+                // Struct with unnamed fields
+                _ => (),
+            }
+        }
+
+        // Panic when we don't have a struct
+        _ => panic!("Must be a struct"),
+    }
+
+    // let res = res.iter();//.map(|(ident, ty)| (ident)).collect();
+    //     .fold(quote!(), |es, (name, ty)| (name, ty));
+    return res;
+}
+
+//endregion
+
+/*
+/// Example of [function-like procedural macro][1].
+///
+/// [1]: https://doc.rust-lang.org/reference/procedural-macros.html#function-like-procedural-macros
+#[proc_macro]
+pub fn my_macro(input: TokenStream) -> TokenStream {
+    let input = parse_macro_input!(input as DeriveInput);
+
+    let tokens = quote! {
+        #input
+
+        struct Hello;
+    };
+
+    tokens.into()
+}
+*/
+
+/*
+/// Example of user-defined [procedural macro attribute][1].
+///
+/// [1]: https://doc.rust-lang.org/reference/procedural-macros.html#attribute-macros
+#[proc_macro_attribute]
+pub fn my_attribute(_args: TokenStream, input: TokenStream) -> TokenStream {
+    let input = parse_macro_input!(input as DeriveInput);
+
+    let tokens = quote! {
+        #input
+
+        struct Hello;
+    };
+
+    tokens.into()
+}
+*/
\ No newline at end of file
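Note: for the Infos struct defined in src/tests.rs below, the derive above is intended to expand to roughly the following impl (a sketch; the generators for get_query_fields, create_with_pk and a few other methods still emit todo!()):

    impl<'a> BigDataTableBase<'a, Infos<'a>, i64> for Infos<'a> {
        fn get_pk_name() -> String { Self::get_field_name("row_id").unwrap() }
        fn get_table_name() -> String { "Infos".to_string() }
        fn get_pk_value(&self) -> i64 { self.row_id }
        // get_field_name, write_from_table_row, ... are generated per field
    }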
diff --git a/src/client.rs b/src/client.rs
new file mode 100644
index 0000000..52e0e37
--- /dev/null
+++ b/src/client.rs
@@ -0,0 +1,52 @@
+use std::error::Error;
+use std::fmt::{Debug, Display, Formatter};
+
+use google_bigquery2::Bigquery;
+use google_bigquery2::hyper::client::HttpConnector;
+use google_bigquery2::hyper_rustls::HttpsConnector;
+
+use crate::googlebigquery;
+
+pub struct BigqueryClient {
+    client: Bigquery<HttpsConnector<HttpConnector>>,
+    project_id: String,
+    dataset_id: String,
+}
+
+impl BigqueryClient {
+    pub async fn new<S: Into<String>>(
+        project_id: S,
+        dataset_id: S,
+        service_account_path: Option<S>,
+    ) -> Result<BigqueryClient, Box<dyn Error>> {
+        let client = googlebigquery::get_client(service_account_path).await?;
+        Ok(BigqueryClient {
+            client,
+            project_id: project_id.into(),
+            dataset_id: dataset_id.into(),
+        })
+    }
+
+    pub fn get_client(&self) -> &Bigquery<HttpsConnector<HttpConnector>> {
+        &self.client
+    }
+    pub fn get_project_id(&self) -> &str {
+        &self.project_id
+    }
+    pub fn get_dataset_id(&self) -> &str {
+        &self.dataset_id
+    }
+}
+
+impl Debug for BigqueryClient {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("BigqueryClient")
+            .field("project_id", &self.project_id)
+            .field("dataset_id", &self.dataset_id)
+            .finish()
+    }
+}
+
+pub trait HasBigQueryClient<'a> {
+    fn get_client(&self) -> &'a BigqueryClient;
+}
\ No newline at end of file
diff --git a/src/data/big_data_table_base.rs b/src/data/big_data_table_base.rs
new file mode 100644
index 0000000..be918b3
--- /dev/null
+++ b/src/data/big_data_table_base.rs
@@ -0,0 +1,29 @@
+use std::collections::HashMap;
+use std::error::Error;
+use std::str::FromStr;
+
+use crate::client::{BigqueryClient, HasBigQueryClient};
+use crate::utils::BigDataValueType;
+
+pub trait BigDataTableBase<'a, TABLE, TPK>: HasBigQueryClient<'a>
+    where TPK: BigDataValueType + FromStr + std::fmt::Debug {
+    fn get_pk_name() -> String;
+    fn get_field_name(field_name: &str) -> Result<String, Box<dyn Error>>;
+    fn get_query_fields() -> HashMap<String, String>;
+    fn get_table_name() -> String;
+    fn create_with_pk(client: &'a BigqueryClient, pk: TPK) -> TABLE;
+    fn write_from_table_row(&mut self,
+        row: &google_bigquery2::api::TableRow,
+        index_to_name_mapping: &HashMap<String, usize>)
+        -> Result<(), Box<dyn Error>>;
+    fn get_pk_value(&self) -> TPK;
+    fn get_query_fields_update_str(&self) -> String;
+    fn get_all_query_parameters(&self) -> Vec<google_bigquery2::api::QueryParameter>;
+
+    fn create_from_table_row(client: &'a BigqueryClient,
+        row: &google_bigquery2::api::TableRow,
+        index_to_name_mapping: &HashMap<String, usize>)
+        -> Result<TABLE, Box<dyn Error>>
+        where
+            Self: Sized;
+}
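Note: a minimal usage sketch for the client above (project, dataset and key path are placeholders, not values from this patch):

    let client = BigqueryClient::new("my-project", "my_dataset", Some("auth/service_account.json")).await?;
    assert_eq!(client.get_dataset_id(), "my_dataset");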
diff --git a/src/data/big_data_table_base_convenience.rs b/src/data/big_data_table_base_convenience.rs
new file mode 100644
index 0000000..ba1d9ff
--- /dev/null
+++ b/src/data/big_data_table_base_convenience.rs
@@ -0,0 +1,213 @@
+use std::collections::HashMap;
+use std::error::Error;
+use std::fmt::Debug;
+use std::str::FromStr;
+
+use google_bigquery2::api::{QueryParameter, QueryParameterType, QueryParameterValue};
+
+use crate::client::BigqueryClient;
+use crate::data::BigDataTableBase;
+use crate::utils::BigDataValueType;
+
+pub trait BigDataTableBaseConvenience<'a, TABLE, TPK>
+: BigDataTableBase<'a, TABLE, TPK>
+    where TPK: BigDataValueType + FromStr + Debug {
+    fn get_pk_param(&self) -> google_bigquery2::api::QueryParameter;
+    fn get_query_fields_str() -> String;
+    fn get_query_fields_insert_str() -> String;
+
+    fn get_where_part(field_name: &str, is_comparing_to_null: bool) -> String;
+
+    async fn run_get_query(&self, query: &str, project_id: &str)
+        -> Result<google_bigquery2::api::QueryResponse, Box<dyn Error>>;
+
+    async fn run_get_query_with_params(&self,
+        query: &str,
+        parameters: Vec<QueryParameter>,
+        project_id: &str)
+        -> Result<google_bigquery2::api::QueryResponse, Box<dyn Error>>;
+
+    async fn run_get_query_with_params_on_client(client: &'a BigqueryClient,
+        query: &str,
+        parameters: Vec<QueryParameter>,
+        project_id: &str)
+        -> Result<google_bigquery2::api::QueryResponse, Box<dyn Error>>;
+
+    // async fn get_identifier_and_base_where(&self) -> Result<(String, String), Box<dyn Error>>;
+    async fn get_identifier(&self) -> Result<String, Box<dyn Error>>;
+    async fn get_identifier_from_client(client: &'a BigqueryClient) -> Result<String, Box<dyn Error>>;
+    fn get_base_where() -> String;
+
+    // async fn get_identifier_and_base_where_from_client(client: &'a BigqueryClient, pk_name: &str, table_name: &str) -> Result<(String, String), Box<dyn Error>>;
+
+    fn get_query_param<TField: BigDataValueType>(field_name: &str, field_value: &Option<TField>)
+        -> google_bigquery2::api::QueryParameter;
+
+    fn parse_value_to_parameter<TValue>(value: &TValue) -> String
+        where TValue: std::fmt::Display + BigDataValueType;
+
+    // fn create_from_table_row(client: &'a BigqueryClient,
+    //     row: &google_bigquery2::api::TableRow,
+    //     index_to_name_mapping: &HashMap<String, usize>)
+    //     -> Result<TABLE, Box<dyn Error>>
+    //     where
+    //         Self: Sized;
+}
+
+impl<'a, TABLE, TPK> BigDataTableBaseConvenience<'a, TABLE, TPK> for TABLE
+    where
+        TABLE: BigDataTableBase<'a, TABLE, TPK>,
+        TPK: BigDataValueType + FromStr + Debug,
+        <TPK as FromStr>::Err: Debug,
+{
+    fn get_pk_param(&self) -> QueryParameter {
+        Self::get_query_param(&Self::get_pk_name(), &Some(self.get_pk_value()))
+        // QueryParameter {
+        //     name: Some(format!("__{}", Self::get_pk_name())),
+        //     parameter_type: Some(QueryParameterType {
+        //         array_type: None,
+        //         struct_types: None,
+        //         type_: Some(TPK::to_bigquery_type()),
+        //     }),
+        //     parameter_value: Some(QueryParameterValue {
+        //         value: Some(self.get_pk_value().to_bigquery_param_value()),
+        //         ..Default::default()
+        //     }),
+        // }
+    }
+    fn get_query_fields_str() -> String {
+        Self::get_query_fields().values().into_iter()
+            .map(|v| format!("{}", v))
+            .collect::<Vec<String>>()
+            .join(", ")
+    }
+
+    fn get_query_fields_insert_str() -> String {
+        Self::get_query_fields()
+            .values()
+            .into_iter()
+            .map(|v| format!("@__{}", v))
+            .collect::<Vec<String>>()
+            .join(", ")
+    }
+
+    fn get_where_part(field_name: &str, is_comparing_to_null: bool) -> String {
+        if is_comparing_to_null {
+            format!("{} IS NULL", field_name)
+        } else {
+            format!("{} = @__{}", field_name, field_name)
+        }
+    }
+
+
+    async fn run_get_query(&self, query: &str, project_id: &str)
+        -> Result<google_bigquery2::api::QueryResponse, Box<dyn Error>> {
+        let parameters = vec![self.get_pk_param()]; //default parameters (pk)
+        self.run_get_query_with_params(query, parameters, project_id).await
+    }
+
+
+    async fn run_get_query_with_params(&self,
+        query: &str,
+        parameters: Vec<QueryParameter>,
+        project_id: &str)
+        -> Result<google_bigquery2::api::QueryResponse, Box<dyn Error>> {
+        let client = self.get_client();
+        Self::run_get_query_with_params_on_client(client, query, parameters, project_id).await
+    }
+
+    async fn run_get_query_with_params_on_client(client: &'a BigqueryClient,
+        query: &str,
+        parameters: Vec<QueryParameter>,
+        project_id: &str)
+        -> Result<google_bigquery2::api::QueryResponse, Box<dyn Error>> {
+        println!("Query: {}", query);
+        println!("Parameters: {}", parameters.len());
+        for (i, param) in parameters.iter().enumerate() {
+            println!("{:2}: {:?}", i, param);
+        }
+        println!();
+        let req = google_bigquery2::api::QueryRequest {
+            query: Some(query.to_string()),
+            query_parameters: Some(parameters),
+            use_legacy_sql: Some(false),
+            ..Default::default()
+        };
+        let (res, query_res) = client.get_client().jobs().query(req, project_id)
+            .doit().await?;
+
+        if res.status() != 200 {
+            return Err(format!("Wrong status code returned! ({})", res.status()).into());
+        }
+        Ok(query_res)
+    }
+    // async fn get_identifier_and_base_where(&self)
+    //     -> Result<(String, String), Box<dyn Error>> {
+    //     let pk_name = Self::get_pk_name();
+    //     let table_name = Self::get_table_name();
+    //     Ok(Self::get_identifier_and_base_where_from_client(&self.get_client(), &pk_name, &table_name).await?)
+    // }
+    async fn get_identifier(&self) -> Result<String, Box<dyn Error>> {
+        let client = self.get_client();
+        Self::get_identifier_from_client(&client).await
+    }
+    async fn get_identifier_from_client(client: &'a BigqueryClient) -> Result<String, Box<dyn Error>> {
+        let dataset_id = client.get_dataset_id();
+        let table_name = Self::get_table_name();
+        let table_identifier = format!("{}.{}", dataset_id, table_name);
+        Ok(table_identifier)
+    }
+
+    fn get_base_where() -> String {
+        let pk_name = Self::get_pk_name();
+        Self::get_where_part(&pk_name, false)
+    }
+
+    default fn get_query_param<TField: BigDataValueType>(field_name: &str, field_value: &Option<TField>) -> google_bigquery2::api::QueryParameter
+    {
+        let type_to_string: String = TField::to_bigquery_type();
+        let value: Option<QueryParameterValue> = match field_value {
+            Some(value) => Some(google_bigquery2::api::QueryParameterValue {
+                value: Some(value.to_bigquery_param_value()), //TODO: maybe add a way to use array types
+                ..Default::default()
+            }),
+            None => None,
+        };
+
+        google_bigquery2::api::QueryParameter {
+            name: Some(format!("__{}", field_name)),
+            parameter_type: Some(google_bigquery2::api::QueryParameterType {
+                type_: Some(type_to_string),
+                ..Default::default()
+            }),
+            parameter_value: value,
+            ..Default::default()
+        }
+    }
+    fn parse_value_to_parameter<TValue>(value: &TValue) -> String
+        where TValue: std::fmt::Display + BigDataValueType
+    {
+        return value.to_bigquery_param_value();
+    }
+
+    //
+    // fn create_from_table_row(client: &'a BigqueryClient,
+    //     row: &google_bigquery2::api::TableRow,
+    //     index_to_name_mapping: &HashMap<String, usize>)
+    //     -> Result<TABLE, Box<dyn Error>>
+    //     where
+    //         Self: Sized
+    // {
+    //     let pk_index = *index_to_name_mapping.get(&Self::get_pk_name()).unwrap();
+    //     let pk = row
+    //         .f.as_ref()
+    //         .unwrap()[pk_index]
+    //         .v.as_ref()
+    //         .unwrap()
+    //         .parse::<TPK>()
+    //         .unwrap();
+    //     let mut res = Self::create_with_pk(client, pk);
+    //     res.write_from_table_row(row, index_to_name_mapping)?;
+    //     Ok(res)
+    // }
+}
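Note: get_where_part and get_query_param above rely on a shared naming convention: a column C is referenced as @__C in the SQL text, and the matching QueryParameter is registered under the name __C (BigQuery supplies the leading @ itself). Illustrated with the info1 column from the test table:

    // Self::get_where_part("info1", false) == "info1 = @__info1"
    // Self::get_query_param("info1", &Some("a")) yields a QueryParameter with
    //     name = "__info1", type_ = "STRING", value = "a"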
diff --git a/src/data/mod.rs b/src/data/mod.rs
new file mode 100644
index 0000000..0fa1469
--- /dev/null
+++ b/src/data/mod.rs
@@ -0,0 +1,202 @@
+use std::collections::HashMap;
+use std::error::Error;
+use std::fmt::{Debug, Display};
+use std::str::FromStr;
+
+use google_bigquery2::api::{QueryParameter, TableSchema};
+
+pub use big_data_table_base::BigDataTableBase;
+pub use big_data_table_base_convenience::BigDataTableBaseConvenience;
+
+use crate::client::{BigqueryClient, HasBigQueryClient};
+use crate::utils::BigDataValueType;
+
+mod big_data_table_base_convenience;
+mod big_data_table_base;
+
+// pub trait BigDataTable<'a, TABLE, TPK: BigDataValueType + FromStr + Debug>: HasBigQueryClient<'a> + BigDataTableBaseConvenience<'a, TABLE, TPK> + BigDataTableBase<'a, TABLE, TPK> {
+pub trait BigDataTable<'a, TABLE, TPK>
+: HasBigQueryClient<'a>
++ BigDataTableBaseConvenience<'a, TABLE, TPK>
++ BigDataTableBase<'a, TABLE, TPK>
+    where TPK: BigDataValueType + FromStr + Debug {
+    async fn from_pk(
+        client: &'a BigqueryClient,
+        pk: TPK,
+    ) -> Result<TABLE, Box<dyn Error>>
+        where
+            Self: Sized;
+    async fn save_to_bigquery(&self) -> Result<(), Box<dyn Error>>;
+    async fn load_from_bigquery(&mut self) -> Result<(), Box<dyn Error>>;
+    async fn load_by_field<TField: BigDataValueType>(client: &'a BigqueryClient, field_name: &str, field_value: Option<TField>,
+        max_amount: usize)
+        -> Result<Vec<TABLE>, Box<dyn Error>>;
+
+    async fn load_by_custom_query(client: &'a BigqueryClient, query: &str, parameters: Vec<QueryParameter>, max_amount: usize)
+        -> Result<Vec<TABLE>, Box<dyn Error>>;
+}
+
+impl<'a, TABLE, TPK> BigDataTable<'a, TABLE, TPK> for TABLE
+where
+    TABLE: HasBigQueryClient<'a> + BigDataTableBaseConvenience<'a, TABLE, TPK>,
+    TPK: BigDataValueType + FromStr + Debug,
+    <TPK as FromStr>::Err: Debug
+{
+    async fn from_pk(client: &'a BigqueryClient, pk: TPK) -> Result<TABLE, Box<dyn Error>> where Self: Sized {
+        let mut res = Self::create_with_pk(client, pk);
+        res.load_from_bigquery().await?;
+        Ok(res)
+    }
+
+    async fn save_to_bigquery(&self) -> Result<(), Box<dyn Error>> {
+        let project_id = self.get_client().get_project_id();
+
+        let table_identifier = self.get_identifier().await?;
+        let w = Self::get_base_where();
+        // region check for existing data
+        let exists_row: bool;
+        let existing_count = format!("select count(*) from {} where {} limit 1", table_identifier, w);
+
+        let req = google_bigquery2::api::QueryRequest {
+            query: Some(existing_count),
+            query_parameters: Some(vec![self.get_pk_param()]),
+            use_legacy_sql: Some(false),
+            ..Default::default()
+        };
+
+
+        let (res, query_res) = self.get_client().get_client().jobs().query(req, project_id)
+            .doit().await?;
+
+        if res.status() != 200 {
+            return Err(format!("Wrong status code returned! ({})", res.status()).into());
+        }
+
+        if let None = &query_res.rows {
+            return Err("No rows returned!".into());
+        }
+
+        let rows = query_res.rows.unwrap();
+
+        if rows.len() != 1 {
+            return Err(format!("Wrong amount of data returned! ({})", rows.len()).into());
+        }
+
+        let row = &rows[0];
+        let amount: i32 = row.f.as_ref().unwrap()[0].clone().v.unwrap().parse().unwrap();
+
+        if amount == 0 {
+            exists_row = false;
+        } else if amount == 1 {
+            exists_row = true;
+        } else {
+            panic!("Too many rows with key!");
+        }
+
+        // endregion
+
+
+        // region update or insert
+
+        let query = match exists_row {
+            true => format!("update {} set {} where {}", table_identifier, self.get_query_fields_update_str(), w),
+            false => format!("insert into {} ({}, {}) values(@__{}, {})", table_identifier,
+                Self::get_pk_name(),
+                Self::get_query_fields_str(),
+                Self::get_pk_name(),
+                Self::get_query_fields_insert_str()),
+        };
+
+        let mut query_parameters = self.get_all_query_parameters();
+        // query_parameters.push(self.get_pk_param()); // todo: check if this is needed
+        let req = google_bigquery2::api::QueryRequest {
+            query: Some(query),
+            query_parameters: Some(query_parameters),
+            use_legacy_sql: Some(false),
+            ..Default::default()
+        };
+
+        let (res, _) = self.get_client().get_client().jobs().query(req, project_id)
+            .doit().await?;
+
+        if res.status() != 200 {
+            return Err(format!("Wrong status code returned! ({})", res.status()).into());
+        }
+
+        //endregion
+
+        Ok(())
+    }
+
+    async fn load_from_bigquery(&mut self) -> Result<(), Box<dyn Error>> {
+        let project_id = self.get_client().get_project_id();
+        let table_identifier = self.get_identifier().await?;
+        let where_clause = Self::get_base_where();
+
+        let query = format!("select {} from {} where {} limit 1", Self::get_query_fields_str(), table_identifier, where_clause);
+        let query_res = self.run_get_query(&query, project_id).await?;
+
+        if let None = &query_res.rows {
+            return Err("No rows returned!".into());
+        }
+
+        let rows = query_res.rows.unwrap();
+
+        if rows.len() != 1 {
+            return Err(format!("Wrong amount of data returned! ({})", rows.len()).into());
+        }
+        let index_to_name_mapping: HashMap<String, usize> = get_name_index_mapping(query_res.schema);
+
+        let row = &rows[0];
+        self.write_from_table_row(row, &index_to_name_mapping)
+    }
+
+    async fn load_by_field<TField: BigDataValueType>(client: &'a BigqueryClient, field_name: &str, field_value: Option<TField>, max_amount: usize)
+        -> Result<Vec<TABLE>, Box<dyn Error>> {
+        let field_name: String = field_name.into();
+        let field_name = Self::get_field_name(&field_name).expect(format!("Field '{}' not found!", field_name).as_str());
+        let where_clause = Self::get_where_part(&field_name, field_value.is_none());
+        // let where_clause = format!(" {} = @__{}", field_name, field_name);
+        let table_identifier = Self::get_identifier_from_client(client).await?;
+        let query = format!("select {} from {} where {} limit {}", Self::get_query_fields_str(), table_identifier, where_clause, max_amount);
+
+        let mut params = vec![];
+        if !(field_value.is_none()) {
+            params.push(Self::get_query_param(&field_name, &field_value));
+        }
+        Self::load_by_custom_query(client, &query, params, max_amount).await
+    }
+
+    async fn load_by_custom_query(client: &'a BigqueryClient, query: &str, parameters: Vec<QueryParameter>, max_amount: usize)
+        -> Result<Vec<TABLE>, Box<dyn Error>> {
+
+        let project_id = client.get_project_id();
+        let query_res: google_bigquery2::api::QueryResponse = Self::run_get_query_with_params_on_client(client, &query, parameters, project_id).await?;
+
+        if let None = &query_res.rows {
+            return Ok(vec![]);
+        }
+
+        let rows: Vec<google_bigquery2::api::TableRow> = query_res.rows.unwrap();
+        let mut result: Vec<TABLE> = vec![];
+
+        let index_to_name_mapping: HashMap<String, usize> = get_name_index_mapping(query_res.schema);
+
+        for row in rows.iter() {
+            //create a new object and write the values to each field
+            //(one object per row; the old inner loop over row.f built one per cell)
+            let obj = Self::create_from_table_row(client, row, &index_to_name_mapping)?;
+            result.push(obj);
+        }
+
+        Ok(result)
+    }
+}
+
+fn get_name_index_mapping(schema: Option<TableSchema>) -> HashMap<String, usize> {
+    let mut index_to_name_mapping: HashMap<String, usize> = HashMap::new();
+    for (i, x) in schema.unwrap().fields.unwrap().iter().enumerate() {
+        index_to_name_mapping.insert(x.name.clone().unwrap(), i);
+    }
+    index_to_name_mapping
+}
diff --git a/src/googlebigquery.rs b/src/googlebigquery.rs
new file mode 100644
index 0000000..eb05b41
--- /dev/null
+++ b/src/googlebigquery.rs
@@ -0,0 +1,33 @@
+use std::error::Error;
+
+use google_bigquery2::{hyper, hyper_rustls, oauth2, Bigquery};
+// use google_bigquery2::api::QueryRequest;
+use google_bigquery2::hyper::client::HttpConnector;
+use google_bigquery2::hyper_rustls::HttpsConnector;
+
+pub async fn get_client<S: Into<String>>(
+    service_account_path: Option<S>,
+) -> Result<Bigquery<HttpsConnector<HttpConnector>>, Box<dyn Error>> {
+    let hyper_client = hyper::Client::builder().build(
+        hyper_rustls::HttpsConnectorBuilder::new()
+            .with_native_roots()
+            .https_or_http()
+            .enable_http1()
+            .enable_http2()
+            .build(),
+    );
+    let service_account_path = match service_account_path {
+        None => "auth/service_account2.json".to_string(),
+        Some(s) => s.into(),
+    };
+    let secret = oauth2::read_service_account_key(service_account_path)
+        .await
+        .unwrap();
+    let auth = oauth2::ServiceAccountAuthenticator::builder(secret)
+        .build()
+        .await
+        .unwrap();
+    let client: Bigquery<HttpsConnector<HttpConnector>> = Bigquery::new(hyper_client, auth);
+
+    Ok(client)
+}
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..4049d36
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,17 @@
+#![feature(async_fn_in_trait)]
+#![feature(specialization)]
+#![allow(unused)]
+#![allow(incomplete_features)]
+// #![feature(impl_trait_projections)]
+pub mod client;
+mod googlebigquery;
+mod data;
+mod utils;
+
+pub use google_bigquery_derive;
+pub fn add(left: usize, right: usize) -> usize {
+    left + right
+}
+
+#[cfg(test)]
+mod tests;
diff --git a/src/tests.rs b/src/tests.rs
new file mode 100644
index 0000000..be7fa98
--- /dev/null
+++ b/src/tests.rs
@@ -0,0 +1,248 @@
+use std::cmp::Ordering;
+use std::collections::HashMap;
+use std::error::Error;
+use std::fmt::Display;
+
+use google_bigquery2::api::{QueryParameter, QueryParameterType, QueryResponse, TableRow};
+use google_bigquery_derive::BigDataTable;
+use google_bigquery_derive::HasBigQueryClient;
+
+use crate::client::{BigqueryClient, HasBigQueryClient};
+use crate::data::{BigDataTable, BigDataTableBase, BigDataTableBaseConvenience};
+use crate::utils::{BigDataValueType, ConvertTypeToBigQueryType, ConvertValueToBigqueryParamValue};
+
+use super::*;
+
+// #[test]
+// fn it_works() {
+//     let result = add(2, 2);
+//     assert_eq!(result, 4);
+// }
+
+#[tokio::test]
+async fn load_by_field() {
+    let client = get_test_client().await;
+
+    let q = Infos::load_by_field(&client, stringify!(info1), Some("a"), 10).await.unwrap();
+    assert_eq!(q.len(), 1);
+
+    let i1 = &q[0];
+    assert_eq!(i1.row_id, 3);
+    assert_eq!(i1.info3, Some("c".to_string()));
+
+    let mut q = Infos::load_by_field(&client, stringify!(yes), Some(true), 10).await.unwrap();
+    // q.sort_by(|a, b| a.row_id.cmp(&b.row_id));
+    assert_eq!(q.len(), 3);
+
+    let i2 = &q[0];
+    assert_eq!(i2.row_id, 1);
+    assert_eq!(i2.info3, Some("x3".to_string()));
+
+    let i3 = &q[1];
+    assert_eq!(i3.row_id, 19);
+    assert_eq!(i3.info3, Some("cc".to_string()));
+
+    let i4 = &q[2];
+    assert_eq!(i4.row_id, 123123);
+    assert_eq!(i4.info3, Some("cc".to_string()));
+
+
+    let q = Infos::load_by_field(&client, stringify!(info1), Some("aosdinsofnpsngusn"), 10).await.unwrap();
+    assert_eq!(q.len(), 0);
+}
+
+#[tokio::test]
+async fn load_by_field_none_param() {
+    let client = get_test_client().await;
+    let q = Infos::load_by_field::<bool>(&client, stringify!(yes), None, 10).await.unwrap();
+    assert_eq!(q.len(), 1);
+}
+
+#[tokio::test]
+async fn from_pk() {
+    let client = get_test_client().await;
+    let i1 = Infos::from_pk(&client, 3).await.unwrap();
+    assert_eq!(i1.row_id, 3);
+    assert_eq!(i1.info1, Some("a".to_string()));
+    assert_eq!(i1.info3, Some("c".to_string()));
+    assert_eq!(i1.int_info4, None);
+    assert_eq!(i1.info2, None);
+    assert_eq!(i1.yes, None);
+}
+
+async fn get_test_client() -> BigqueryClient {
+    let client = BigqueryClient::new("testrustproject-372221", "test1", None).await.unwrap();
+    client
+}
+
+#[derive(Debug)]
+#[cfg_attr(not(man_impl_has_client="false"), derive(HasBigQueryClient))]
+#[cfg_attr(not(man_impl="true"), derive(BigDataTable))]
+pub struct Infos<'a> {
+    #[cfg_attr(not(man_impl="true"), primary_key)]
+    #[cfg_attr(not(man_impl="true"), required)]
+    #[cfg_attr(not(man_impl="true"), db_name("Id"))]
+    row_id: i64,
+    #[cfg_attr(any(not(man_impl="true"), not(man_impl_has_client="false")), client)]
+    client: &'a BigqueryClient,
+    info1: Option<String>,
+    // #[cfg_attr(not(man_impl="true"), db_name("info"))]
+    info2: Option<String>,
+    info3: Option<String>,
+    // #[cfg_attr(not(man_impl="true"), db_name("info4i"))]
+    int_info4: Option<i64>,
+    yes: Option<bool>,
+}
+
+
+// #[cfg(any(man_impl="true", not(man_impl_has_client="false")))]
+// impl<'a> HasBigQueryClient<'a> for Infos<'a> {
+//     fn get_client(&self) -> &'a BigqueryClient {
+//         self.client
+//     }
+// }
+
+#[cfg(man_impl="true")]
+impl<'a> BigDataTableBase<'a, Infos<'a>, i64> for Infos<'a> {
+    fn get_pk_name() -> String {
+        Self::get_field_name(stringify!(row_id)).unwrap()
+    }
+
+    fn get_field_name(field_name: &str) -> Result<String, Box<dyn Error>> {
+        match field_name {
+            "row_id" => Ok("Id".to_string()),
+            "info1" => Ok("info1".to_string()),
+            "info2" => Ok("info".to_string()),
+            "info3" => Ok("info3".to_string()),
+            "int_info4" => Ok("info4i".to_string()),
+            "yes" => Ok("yes".to_string()),
+            _ => Err("Field not found".into()),
+        }
+    }
+
+    fn get_query_fields() -> HashMap<String, String> {
+        let mut fields = HashMap::new();
+        fields.insert(stringify!(info1).to_string(), Self::get_field_name(&stringify!(info1).to_string()).unwrap());
+        fields.insert(stringify!(info2).to_string(), Self::get_field_name(&stringify!(info2).to_string()).unwrap());
+        fields.insert(stringify!(info3).to_string(), Self::get_field_name(&stringify!(info3).to_string()).unwrap());
+        fields.insert(stringify!(int_info4).to_string(), Self::get_field_name(&stringify!(int_info4).to_string()).unwrap());
+        fields.insert(stringify!(yes).to_string(), Self::get_field_name(&stringify!(yes).to_string()).unwrap());
+
+        //TODO: decide if the primary key should be included in the query fields
+        fields.insert(stringify!(row_id).to_string(), Self::get_field_name(&stringify!(row_id).to_string()).unwrap());
+
+        fields
+    }
+
+    fn get_table_name() -> String {
+        stringify!(Infos).to_string()
+    }
+
+    fn create_with_pk(client: &'a BigqueryClient, pk: i64) -> Self {
+        let mut res = Self {
+            row_id: pk,
+            client,
+            info1: None,
+            info2: None,
+            info3: None,
+            int_info4: None,
+            yes: None,
+        };
+        res
+    }
+
+
+    fn write_from_table_row(&mut self,
+        row: &google_bigquery2::api::TableRow,
+        index_to_name_mapping: &HashMap<String, usize>)
+        -> Result<(), Box<dyn Error>> {
+        let cell = row.f.as_ref().unwrap();
+
+        let info1 = *index_to_name_mapping.get(Self::get_field_name(stringify!(info1))?.as_str()).unwrap();
+        self.info1 = match cell[info1].v.as_ref() {
+            Some(v) => Some(v.parse()?),
+            None => None
+        };
+
+        let info2 = *index_to_name_mapping.get(Self::get_field_name(stringify!(info2))?.as_str()).unwrap();
+        self.info2 = match cell[info2].v.as_ref() {
+            Some(v) => Some(v.parse()?),
+            None => None
+        };
+
+        let info3 = *index_to_name_mapping.get(Self::get_field_name(stringify!(info3))?.as_str()).unwrap();
+        self.info3 = match cell[info3].v.as_ref() {
+            Some(v) => Some(v.parse()?),
+            None => None
+        };
+
+        let int_info4 = *index_to_name_mapping.get(Self::get_field_name(stringify!(int_info4))?.as_str()).unwrap();
+        self.int_info4 = match cell[int_info4].v.as_ref() {
+            Some(v) => Some(v.parse()?),
+            None => None
+        };
+
+        let yes = *index_to_name_mapping.get(Self::get_field_name(stringify!(yes))?.as_str()).unwrap();
+        self.yes = match cell[yes].v.as_ref() {
+            Some(v) => Some(v.parse()?),
+            None => None
+        };
+
+        Ok(())
+    }
+
+    fn get_pk_value(&self) -> i64 {
+        self.row_id
+    }
+
+
+    fn get_query_fields_update_str(&self) -> String {
+        let mut fields = String::new();
+        let info1 = Self::get_field_name(stringify!(info1)).unwrap();
+        fields.push_str(&format!("{} = @__{}, ", info1, info1));
+        let info2 = Self::get_field_name(stringify!(info2)).unwrap();
+        fields.push_str(&format!("{} = @__{}, ", info2, info2));
+        let info3 = Self::get_field_name(stringify!(info3)).unwrap();
+        fields.push_str(&format!("{} = @__{}, ", info3, info3));
+        let int_info4 = Self::get_field_name(stringify!(int_info4)).unwrap();
+        fields.push_str(&format!("{} = @__{}, ", int_info4, int_info4));
+        let yes = Self::get_field_name(stringify!(yes)).unwrap();
+        fields.push_str(&format!("{} = @__{}", yes, yes));
+        fields
+    }
+
+    fn get_all_query_parameters(&self) -> Vec<QueryParameter> {
+        let mut parameters = Vec::new();
+
+        parameters.push(Self::get_query_param(&Self::get_field_name(stringify!(info1)).unwrap(), &self.info1));
+        parameters.push(Self::get_query_param(&Self::get_field_name(stringify!(info2)).unwrap(), &self.info2));
+        parameters.push(Self::get_query_param(&Self::get_field_name(stringify!(info3)).unwrap(), &self.info3));
+        parameters.push(Self::get_query_param(&Self::get_field_name(stringify!(int_info4)).unwrap(), &self.int_info4));
+        parameters.push(Self::get_query_param(&Self::get_field_name(stringify!(yes)).unwrap(), &self.yes));
+        //TODO: decide if the primary key should be included in this list
+        parameters.push(Self::get_query_param(&Self::get_field_name(stringify!(row_id)).unwrap(), &Some(self.row_id)));
+        parameters
+    }
+
+
+    fn create_from_table_row(client: &'a BigqueryClient,
+        row: &google_bigquery2::api::TableRow,
+        index_to_name_mapping: &HashMap<String, usize>)
+        -> Result<Self, Box<dyn Error>>
+        where
+            Self: Sized
+    {
+        let pk_index = *index_to_name_mapping.get(&Self::get_pk_name()).unwrap();
+        let pk = row
+            .f.as_ref()
+            .unwrap()[pk_index]
+            .v.as_ref()
+            .unwrap()
+            .parse::<i64>()
+            .unwrap();
+        let mut res = Self::create_with_pk(client, pk);
+        res.write_from_table_row(row, index_to_name_mapping)?;
+        Ok(res)
+    }
+}
+
diff --git a/src/utils/convert_type_to_big_query_type.rs b/src/utils/convert_type_to_big_query_type.rs
new file mode 100644
index 0000000..01652b9
--- /dev/null
+++ b/src/utils/convert_type_to_big_query_type.rs
@@ -0,0 +1,35 @@
+use std::fmt::Display;
+
+pub trait ConvertTypeToBigQueryType {
+    fn to_bigquery_type() -> String;
+}
+
+impl ConvertTypeToBigQueryType for bool {
+    fn to_bigquery_type() -> String {
+        "BOOL".to_string()
+    }
+}
+
+impl ConvertTypeToBigQueryType for i32 {
+    fn to_bigquery_type() -> String {
+        "INT64".to_string()
+    }
+}
+
+impl ConvertTypeToBigQueryType for i64 {
+    fn to_bigquery_type() -> String {
+        "INT64".to_string()
+    }
+}
+
+impl ConvertTypeToBigQueryType for String {
+    fn to_bigquery_type() -> String {
+        "STRING".to_string()
+    }
+}
+
+impl ConvertTypeToBigQueryType for &str {
+    fn to_bigquery_type() -> String {
+        "STRING".to_string()
+    }
+}
diff --git a/src/utils/convert_value_to_bigquery_param_value.rs b/src/utils/convert_value_to_bigquery_param_value.rs
new file mode 100644
index 0000000..b9a66e0
--- /dev/null
+++ b/src/utils/convert_value_to_bigquery_param_value.rs
@@ -0,0 +1,21 @@
+use std::fmt::Display;
+
+pub trait ConvertValueToBigqueryParamValue {
+    fn to_bigquery_param_value(&self) -> String;
+}
+
+impl ConvertValueToBigqueryParamValue for bool {
+    fn to_bigquery_param_value(&self) -> String {
+        match self.to_string().as_str() {
+            "true" => "TRUE".to_string(),
+            "false" => "FALSE".to_string(),
+            _ => panic!("Invalid value for bool"),
+        }
+    }
+}
+
+impl<R: Display> ConvertValueToBigqueryParamValue for R {
+    default fn to_bigquery_param_value(&self) -> String {
+        format!("{}", self)
+    }
+}
\ No newline at end of file
diff --git a/src/utils/mod.rs b/src/utils/mod.rs
new file mode 100644
index 0000000..6969d17
--- /dev/null
+++ b/src/utils/mod.rs
@@ -0,0 +1,8 @@
+pub use convert_type_to_big_query_type::ConvertTypeToBigQueryType;
+pub use convert_value_to_bigquery_param_value::ConvertValueToBigqueryParamValue;
+
+mod convert_type_to_big_query_type;
+mod convert_value_to_bigquery_param_value;
+
+pub trait BigDataValueType: ConvertTypeToBigQueryType + ConvertValueToBigqueryParamValue {}
+impl<T: ConvertTypeToBigQueryType + ConvertValueToBigqueryParamValue> BigDataValueType for T {}
\ No newline at end of file
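Note: taken together, the intended call pattern for this patch reads as follows (a sketch based on the tests above; save_to_bigquery still depends on the todo!() generators in the derive crate):

    let client = BigqueryClient::new("testrustproject-372221", "test1", None).await?;
    let mut row = Infos::from_pk(&client, 3).await?;
    row.info1 = Some("updated".to_string());
    row.save_to_bigquery().await?;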