mirror of
https://github.com/OMGeeky/google_bigquery.git
synced 2026-01-06 19:29:39 +01:00
the derive is working with the tests so far
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
use std::error::Error;
|
||||
use std::fmt::{Debug, Display, Formatter};
|
||||
|
||||
use google_bigquery2::Bigquery;
|
||||
use google_bigquery2::{Bigquery, hyper, hyper_rustls, oauth2};
|
||||
use google_bigquery2::hyper::client::HttpConnector;
|
||||
use google_bigquery2::hyper_rustls::HttpsConnector;
|
||||
|
||||
@@ -13,6 +13,28 @@ pub struct BigqueryClient {
|
||||
dataset_id: String,
|
||||
}
|
||||
|
||||
impl BigqueryClient {
|
||||
pub(crate) fn empty() -> &'static BigqueryClient {
|
||||
todo!("Implement BigqueryClient::empty() or throw an error if it's not possible or something.");
|
||||
// let hyper_client = hyper::Client::builder().build(
|
||||
// hyper_rustls::HttpsConnectorBuilder::new()
|
||||
// .with_native_roots()
|
||||
// .https_or_http()
|
||||
// .enable_http1()
|
||||
// .enable_http2()
|
||||
// .build(),
|
||||
// );
|
||||
//
|
||||
// let auth = oauth2::ServiceAccountAuthenticator::with_client();
|
||||
// let client = Bigquery::new(hyper_client,auth);
|
||||
// Self {
|
||||
// dataset_id: Default::default(),
|
||||
// project_id: Default::default(),
|
||||
// client,
|
||||
// }
|
||||
}
|
||||
}
|
||||
|
||||
impl BigqueryClient {
|
||||
pub async fn new<S: Into<String>>(
|
||||
project_id: S,
|
||||
|
||||
@@ -17,7 +17,7 @@ pub trait BigDataTableBase<'a, TABLE, TPK>: HasBigQueryClient<'a>
|
||||
index_to_name_mapping: &HashMap<String, usize>)
|
||||
-> Result<(), Box<dyn Error>>;
|
||||
fn get_pk_value(&self) -> TPK;
|
||||
fn get_query_fields_update_str(&self) -> String;
|
||||
// fn get_query_fields_update_str(&self) -> String;
|
||||
fn get_all_query_parameters(&self) -> Vec<google_bigquery2::api::QueryParameter>;
|
||||
|
||||
fn create_from_table_row(client: &'a BigqueryClient,
|
||||
|
||||
@@ -3,7 +3,8 @@ use std::error::Error;
|
||||
use std::fmt::Debug;
|
||||
use std::str::FromStr;
|
||||
|
||||
use google_bigquery2::api::{QueryParameter, QueryParameterType, QueryParameterValue};
|
||||
use google_bigquery2::api::{QueryParameter, QueryParameterType, QueryParameterValue, QueryRequest};
|
||||
use google_bigquery2::hyper::{Body, Response};
|
||||
|
||||
use crate::client::BigqueryClient;
|
||||
use crate::data::BigDataTableBase;
|
||||
@@ -15,9 +16,19 @@ pub trait BigDataTableBaseConvenience<'a, TABLE, TPK>
|
||||
fn get_pk_param(&self) -> google_bigquery2::api::QueryParameter;
|
||||
fn get_query_fields_str() -> String;
|
||||
fn get_query_fields_insert_str() -> String;
|
||||
|
||||
fn get_query_fields_update_str(&self) -> String;
|
||||
fn get_where_part(field_name: &str, is_comparing_to_null: bool) -> String;
|
||||
//region run query
|
||||
async fn run_query(&self, req: QueryRequest, project_id: &str)
|
||||
-> Result<(Response<Body>, google_bigquery2::api::QueryResponse), Box<dyn Error>>;
|
||||
|
||||
async fn run_query_on_client(client: &'a BigqueryClient,
|
||||
req: QueryRequest,
|
||||
project_id: &str)
|
||||
-> Result<(Response<Body>, google_bigquery2::api::QueryResponse), Box<dyn Error>>;
|
||||
//endregion run query
|
||||
|
||||
//region run get query
|
||||
async fn run_get_query(&self, query: &str, project_id: &str)
|
||||
-> Result<google_bigquery2::api::QueryResponse, Box<dyn Error>>;
|
||||
|
||||
@@ -32,6 +43,8 @@ pub trait BigDataTableBaseConvenience<'a, TABLE, TPK>
|
||||
parameters: Vec<google_bigquery2::api::QueryParameter>,
|
||||
project_id: &str)
|
||||
-> Result<google_bigquery2::api::QueryResponse, Box<dyn Error>>;
|
||||
//endregion
|
||||
|
||||
|
||||
// async fn get_identifier_and_base_where(&self) -> Result<(String, String), Box<dyn Error>>;
|
||||
async fn get_identifier(&self) -> Result<String, Box<dyn Error>>;
|
||||
@@ -91,6 +104,17 @@ impl<'a, TABLE, TPK> BigDataTableBaseConvenience<'a, TABLE, TPK> for TABLE
|
||||
.join(", ")
|
||||
}
|
||||
|
||||
fn get_query_fields_update_str(&self) -> String {
|
||||
let x = Self::get_query_fields();
|
||||
let pk_name = Self::get_pk_name();
|
||||
let mut vec = x.values()
|
||||
.filter(|k| *k != &pk_name)
|
||||
.map(|k| format!("{} = @__{}", k, k))
|
||||
.collect::<Vec<String>>();
|
||||
// vec.sort();
|
||||
let update_str = vec.join(", ");
|
||||
update_str
|
||||
}
|
||||
fn get_where_part(field_name: &str, is_comparing_to_null: bool) -> String {
|
||||
if is_comparing_to_null {
|
||||
format!("{} IS NULL", field_name)
|
||||
@@ -99,6 +123,42 @@ impl<'a, TABLE, TPK> BigDataTableBaseConvenience<'a, TABLE, TPK> for TABLE
|
||||
}
|
||||
}
|
||||
|
||||
//region run query
|
||||
|
||||
async fn run_query(&self, req: QueryRequest, project_id: &str)
|
||||
-> Result<(Response<Body>, google_bigquery2::api::QueryResponse), Box<dyn Error>> {
|
||||
Self::run_query_on_client(self.get_client(), req, project_id).await
|
||||
}
|
||||
|
||||
async fn run_query_on_client(client: &'a BigqueryClient,
|
||||
req: QueryRequest,
|
||||
project_id: &str)
|
||||
-> Result<(Response<Body>, google_bigquery2::api::QueryResponse), Box<dyn Error>> {
|
||||
#[cfg(debug_assertions="true")]
|
||||
{
|
||||
println!("Query: {}", &req.query.as_ref().unwrap());//There has to be a query, this would not make any sense otherwise
|
||||
if let Some(parameters) = &req.query_parameters {
|
||||
println!("Parameters: {}", parameters.len());
|
||||
for (i, param) in parameters.iter().enumerate() {
|
||||
println!("{:2}: {:?}", i, param);
|
||||
}
|
||||
} else {
|
||||
println!("Parameters: None");
|
||||
}
|
||||
println!();
|
||||
}
|
||||
|
||||
|
||||
let (res, query_res) = client.get_client().jobs().query(req, project_id)
|
||||
.doit().await?;
|
||||
|
||||
if res.status() != 200 {
|
||||
return Err(format!("Wrong status code returned! ({})", res.status()).into());
|
||||
}
|
||||
|
||||
Ok((res, query_res))
|
||||
}
|
||||
//endregion run query
|
||||
|
||||
async fn run_get_query(&self, query: &str, project_id: &str)
|
||||
-> Result<google_bigquery2::api::QueryResponse, Box<dyn Error>> {
|
||||
@@ -121,24 +181,19 @@ impl<'a, TABLE, TPK> BigDataTableBaseConvenience<'a, TABLE, TPK> for TABLE
|
||||
parameters: Vec<google_bigquery2::api::QueryParameter>,
|
||||
project_id: &str)
|
||||
-> Result<google_bigquery2::api::QueryResponse, Box<dyn Error>> {
|
||||
println!("Query: {}", query);
|
||||
println!("Parameters: {}", parameters.len());
|
||||
for (i, param) in parameters.iter().enumerate() {
|
||||
println!("{:2}: {:?}", i, param);
|
||||
}
|
||||
println!();
|
||||
let req = google_bigquery2::api::QueryRequest {
|
||||
query: Some(query.to_string()),
|
||||
query_parameters: Some(parameters),
|
||||
use_legacy_sql: Some(false),
|
||||
..Default::default()
|
||||
};
|
||||
let (res, query_res) = client.get_client().jobs().query(req, project_id)
|
||||
.doit().await?;
|
||||
|
||||
if res.status() != 200 {
|
||||
return Err(format!("Wrong status code returned! ({})", res.status()).into());
|
||||
}
|
||||
let (_, query_res) = Self::run_query_on_client(client, req, project_id).await?;
|
||||
// let (res, query_res) = client.get_client().jobs().query(req, project_id)
|
||||
// .doit().await?;
|
||||
//
|
||||
// if res.status() != 200 {
|
||||
// return Err(format!("Wrong status code returned! ({})", res.status()).into());
|
||||
// }
|
||||
Ok(query_res)
|
||||
}
|
||||
// async fn get_identifier_and_base_where(&self)
|
||||
@@ -166,13 +221,13 @@ impl<'a, TABLE, TPK> BigDataTableBaseConvenience<'a, TABLE, TPK> for TABLE
|
||||
default fn get_query_param<TField: BigDataValueType>(field_name: &str, field_value: &Option<TField>) -> google_bigquery2::api::QueryParameter
|
||||
{
|
||||
let type_to_string: String = TField::to_bigquery_type();
|
||||
let value: Option<google_bigquery2::api::QueryParameterValue> = match field_value {
|
||||
Some(value) => Some(google_bigquery2::api::QueryParameterValue {
|
||||
value: Some(value.to_bigquery_param_value()),//TODO: maybe add a way to use array types
|
||||
..Default::default()
|
||||
}),
|
||||
None => None,
|
||||
};
|
||||
let value: Option<google_bigquery2::api::QueryParameterValue> = Some(google_bigquery2::api::QueryParameterValue {
|
||||
value: match field_value {
|
||||
Some(value) =>Some(value.to_bigquery_param_value()),//TODO: maybe add a way to use array types
|
||||
None => None,
|
||||
},
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
google_bigquery2::api::QueryParameter {
|
||||
name: Some(format!("__{}", field_name.clone())),
|
||||
|
||||
@@ -19,6 +19,7 @@ pub trait BigDataTable<'a, TABLE, TPK>
|
||||
: HasBigQueryClient<'a>
|
||||
+ BigDataTableBaseConvenience<'a, TABLE, TPK>
|
||||
+ BigDataTableBase<'a, TABLE, TPK>
|
||||
+ Default
|
||||
where TPK: BigDataValueType + FromStr + Debug {
|
||||
async fn from_pk(
|
||||
client: &'a BigqueryClient,
|
||||
@@ -37,7 +38,7 @@ pub trait BigDataTable<'a, TABLE, TPK>
|
||||
|
||||
impl<'a, TABLE, TPK> BigDataTable<'a, TABLE, TPK> for TABLE
|
||||
where
|
||||
TABLE: HasBigQueryClient<'a> + BigDataTableBaseConvenience<'a, TABLE, TPK>,
|
||||
TABLE: HasBigQueryClient<'a> + BigDataTableBaseConvenience<'a, TABLE, TPK> + Default,
|
||||
TPK: BigDataValueType + FromStr + Debug,
|
||||
<TPK as FromStr>::Err: Debug
|
||||
{
|
||||
@@ -51,10 +52,10 @@ where
|
||||
let project_id = self.get_client().get_project_id();
|
||||
|
||||
let table_identifier = self.get_identifier().await?;
|
||||
let w = Self::get_base_where();
|
||||
let where_clause = Self::get_base_where();
|
||||
// region check for existing data
|
||||
let exists_row: bool;
|
||||
let existing_count = format!("select count(*) from {} where {} limit 1", table_identifier, w);
|
||||
let existing_count = format!("select count(*) from {} where {} limit 1", table_identifier, where_clause);
|
||||
|
||||
let req = google_bigquery2::api::QueryRequest {
|
||||
query: Some(existing_count),
|
||||
@@ -63,13 +64,13 @@ where
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
|
||||
let (res, query_res) = self.get_client().get_client().jobs().query(req, project_id)
|
||||
.doit().await?;
|
||||
|
||||
if res.status() != 200 {
|
||||
return Err(format!("Wrong status code returned! ({})", res.status()).into());
|
||||
}
|
||||
let (_, query_res) = self.run_query(req, project_id).await?;
|
||||
// let (res, query_res) = self.get_client().get_client().jobs().query(req, project_id)
|
||||
// .doit().await?;
|
||||
//
|
||||
// if res.status() != 200 {
|
||||
// return Err(format!("Wrong status code returned! ({})", res.status()).into());
|
||||
// }
|
||||
|
||||
if let None = &query_res.rows {
|
||||
return Err("No rows returned!".into());
|
||||
@@ -98,7 +99,7 @@ where
|
||||
// region update or insert
|
||||
|
||||
let query = match exists_row {
|
||||
true => format!("update {} set {} where {}", table_identifier, self.get_query_fields_update_str(), w),
|
||||
true => format!("update {} set {} where {}", table_identifier, self.get_query_fields_update_str(), where_clause),
|
||||
false => format!("insert into {} ({}, {}) values(@__{}, {})", table_identifier,
|
||||
Self::get_pk_name(),
|
||||
Self::get_query_fields_str(),
|
||||
@@ -115,12 +116,14 @@ where
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let (res, _) = self.get_client().get_client().jobs().query(req, project_id)
|
||||
.doit().await?;
|
||||
|
||||
if res.status() != 200 {
|
||||
return Err(format!("Wrong status code returned! ({})", res.status()).into());
|
||||
}
|
||||
let (_, _) = self.run_query(req, project_id).await?;
|
||||
// let (res, _) = self.get_client().get_client().jobs().query(req, project_id)
|
||||
// .doit().await?;
|
||||
//
|
||||
// if res.status() != 200 {
|
||||
// return Err(format!("Wrong status code returned! ({})", res.status()).into());
|
||||
// }
|
||||
|
||||
//endregion
|
||||
|
||||
@@ -145,6 +148,7 @@ where
|
||||
return Err(format!("Wrong amount of data returned! ({})", rows.len()).into());
|
||||
}
|
||||
let mut index_to_name_mapping: HashMap<String, usize> = get_name_index_mapping(query_res.schema);
|
||||
println!("index_to_name_mapping: {:?}", index_to_name_mapping);
|
||||
|
||||
let row = &rows[0];
|
||||
self.write_from_table_row(row, &index_to_name_mapping)
|
||||
|
||||
137
src/tests.rs
137
src/tests.rs
@@ -19,6 +19,37 @@ use super::*;
|
||||
// assert_eq!(result, 4);
|
||||
// }
|
||||
|
||||
#[tokio::test]
|
||||
async fn save() {
|
||||
let client = get_test_client().await;
|
||||
|
||||
let mut q = Infos::load_by_field(&client, stringify!(info1), Some("a"), 10).await.unwrap();
|
||||
assert_eq!(q.len(), 1);
|
||||
|
||||
let mut i1 = &mut q[0];
|
||||
assert_eq!(i1.row_id, 3);
|
||||
assert_eq!(i1.info3, Some("c".to_string()));
|
||||
assert_eq!(i1.info2, None);
|
||||
|
||||
i1.info2 = Some("b".to_string());
|
||||
i1.save_to_bigquery().await.unwrap();
|
||||
|
||||
assert_eq!(i1.info2, Some("b".to_string()));
|
||||
i1.info2 = Some("c".to_string());
|
||||
assert_eq!(i1.info2, Some("c".to_string()));
|
||||
|
||||
i1.load_from_bigquery().await.unwrap();
|
||||
assert_eq!(i1.info2, Some("b".to_string()));
|
||||
|
||||
i1.info2 = None;
|
||||
i1.save_to_bigquery().await.unwrap();
|
||||
i1.load_from_bigquery().await.unwrap();
|
||||
|
||||
assert_eq!(i1.row_id, 3);
|
||||
assert_eq!(i1.info3, Some("c".to_string()));
|
||||
assert_eq!(i1.info2, None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn load_by_field() {
|
||||
let client = get_test_client().await;
|
||||
@@ -31,7 +62,7 @@ async fn load_by_field() {
|
||||
assert_eq!(i1.info3, Some("c".to_string()));
|
||||
|
||||
let mut q = Infos::load_by_field(&client, stringify!(yes), Some(true), 10).await.unwrap();
|
||||
// q.sort_by(|a, b| a.row_id.cmp(&b.row_id));
|
||||
q.sort_by(|a, b| a.row_id.cmp(&b.row_id));
|
||||
assert_eq!(q.len(), 3);
|
||||
|
||||
let i2 = &q[0];
|
||||
@@ -76,33 +107,49 @@ async fn get_test_client() -> BigqueryClient {
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[cfg_attr(not(man_impl_has_client="false"), derive(HasBigQueryClient))]
|
||||
#[cfg_attr(not(man_impl="true"), derive(BigDataTable))]
|
||||
#[cfg_attr(man_impl_has_client = "false", derive(HasBigQueryClient))]
|
||||
#[cfg_attr(not(man_impl = "true"), derive(BigDataTable))]
|
||||
pub struct Infos<'a> {
|
||||
#[cfg_attr(not(man_impl="true"), primary_key)]
|
||||
#[cfg_attr(not(man_impl="true"), required)]
|
||||
#[cfg_attr(not(man_impl="true"), db_name("Id"))]
|
||||
#[cfg_attr(not(man_impl = "true"), primary_key)]
|
||||
#[cfg_attr(not(man_impl = "true"), required)]
|
||||
#[cfg_attr(not(man_impl = "true"), db_name("Id"))]
|
||||
row_id: i64,
|
||||
#[cfg_attr(any(not(man_impl="true"), not(man_impl_has_client="false")), client)]
|
||||
client: &'a BigqueryClient,
|
||||
#[cfg_attr(any(not(man_impl = "true"), man_impl_has_client = "false"), client)]
|
||||
/// This client should never be left as None, doing so will cause a panic when trying to use it
|
||||
client: Option<&'a BigqueryClient>,
|
||||
info1: Option<String>,
|
||||
// #[cfg_attr(not(man_impl="true"), db_name("info"))]
|
||||
#[cfg_attr(not(man_impl="true"), db_name("info"))]
|
||||
info2: Option<String>,
|
||||
info3: Option<String>,
|
||||
// #[cfg_attr(not(man_impl="true"), db_name("info4i"))]
|
||||
#[cfg_attr(not(man_impl="true"), db_name("info4i"))]
|
||||
int_info4: Option<i64>,
|
||||
yes: Option<bool>,
|
||||
}
|
||||
|
||||
|
||||
// #[cfg(any(man_impl="true", not(man_impl_has_client="false")))]
|
||||
// impl<'a> HasBigQueryClient<'a> for Infos<'a> {
|
||||
// fn get_client(&self) -> &'a BigqueryClient {
|
||||
// self.client
|
||||
// }
|
||||
// }
|
||||
#[cfg(not(man_impl_has_client="false"))]
|
||||
impl<'a> HasBigQueryClient<'a> for Infos<'a> {
|
||||
fn get_client(&self) -> &'a BigqueryClient {
|
||||
self.client.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(man_impl="true")]
|
||||
impl<'a> Default for Infos<'a> {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
// client: &BigqueryClient::new("none", "none", None).await.unwrap(),
|
||||
client: None,
|
||||
row_id: -9999,
|
||||
info1: Default::default(),
|
||||
info2: Default::default(),
|
||||
info3: Default::default(),
|
||||
int_info4: Default::default(),
|
||||
yes: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(man_impl = "true")]
|
||||
impl<'a> BigDataTableBase<'a, Infos<'a>, i64> for Infos<'a> {
|
||||
fn get_pk_name() -> String {
|
||||
Self::get_field_name(stringify!(row_id)).unwrap()
|
||||
@@ -131,6 +178,7 @@ impl<'a> BigDataTableBase<'a, Infos<'a>, i64> for Infos<'a> {
|
||||
//TODO: decide if the primary key should be included in the query fields
|
||||
fields.insert(stringify!(row_id).to_string(), Self::get_field_name(&stringify!(row_id).to_string()).unwrap());
|
||||
|
||||
println!("get_query_fields: fields: {:?}", fields);
|
||||
fields
|
||||
}
|
||||
|
||||
@@ -139,19 +187,13 @@ impl<'a> BigDataTableBase<'a, Infos<'a>, i64> for Infos<'a> {
|
||||
}
|
||||
|
||||
fn create_with_pk(client: &'a BigqueryClient, pk: i64) -> Self {
|
||||
let mut res = Self {
|
||||
Self {
|
||||
row_id: pk,
|
||||
client,
|
||||
info1: None,
|
||||
info2: None,
|
||||
info3: None,
|
||||
int_info4: None,
|
||||
yes: None,
|
||||
};
|
||||
res
|
||||
client: Some(client),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
fn write_from_table_row(&mut self,
|
||||
row: &google_bigquery2::api::TableRow,
|
||||
index_to_name_mapping: &HashMap<String, usize>)
|
||||
@@ -195,21 +237,34 @@ impl<'a> BigDataTableBase<'a, Infos<'a>, i64> for Infos<'a> {
|
||||
self.row_id
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
fn get_query_fields_update_str(&self) -> String {
|
||||
let mut fields = String::new();
|
||||
let info1 = Self::get_field_name(stringify!(info1)).unwrap();
|
||||
fields.push_str(&format!("{} = @__{}, ", info1, info1));
|
||||
let info2 = Self::get_field_name(stringify!(info2)).unwrap();
|
||||
fields.push_str(&format!("{} = @__{}, ", info2, info2));
|
||||
let info3 = Self::get_field_name(stringify!(info3)).unwrap();
|
||||
fields.push_str(&format!("{} = @__{}, ", info3, info3));
|
||||
let int_info4 = Self::get_field_name(stringify!(int_info4)).unwrap();
|
||||
fields.push_str(&format!("{} = @__{}, ", int_info4, int_info4));
|
||||
let yes = Self::get_field_name(stringify!(yes)).unwrap();
|
||||
fields.push_str(&format!("{} = @__{}", yes, yes));
|
||||
fields
|
||||
}
|
||||
let x = Self::get_query_fields();
|
||||
let pk_name = Self::get_pk_name();
|
||||
let mut vec = x.values()
|
||||
.filter(|k| *k != &pk_name)
|
||||
.map(|k| format!("{} = @__{}", k, k))
|
||||
.collect::<Vec<String>>();
|
||||
vec.sort();
|
||||
let x = vec
|
||||
.join(", ");
|
||||
|
||||
// let mut fields = String::new();
|
||||
// let info1 = Self::get_field_name(stringify!(info1)).unwrap();
|
||||
// fields.push_str(&format!("{} = @__{}, ", info1, info1));
|
||||
// let info2 = Self::get_field_name(stringify!(info2)).unwrap();
|
||||
// fields.push_str(&format!("{} = @__{}, ", info2, info2));
|
||||
// let info3 = Self::get_field_name(stringify!(info3)).unwrap();
|
||||
// fields.push_str(&format!("{} = @__{}, ", info3, info3));
|
||||
// let int_info4 = Self::get_field_name(stringify!(int_info4)).unwrap();
|
||||
// fields.push_str(&format!("{} = @__{}, ", int_info4, int_info4));
|
||||
// let yes = Self::get_field_name(stringify!(yes)).unwrap();
|
||||
// fields.push_str(&format!("{} = @__{}", yes, yes));
|
||||
// println!("fields: {}", fields);
|
||||
println!("x : {}", x);
|
||||
// fields
|
||||
x
|
||||
}*/
|
||||
|
||||
fn get_all_query_parameters(&self) -> Vec<QueryParameter> {
|
||||
let mut parameters = Vec::new();
|
||||
|
||||
Reference in New Issue
Block a user