mirror of
https://github.com/OMGeeky/google_bigquery.git
synced 2026-01-06 19:29:39 +01:00
pretty good progress
This commit is contained in:
29
src/data/big_data_table_base.rs
Normal file
29
src/data/big_data_table_base.rs
Normal file
@@ -0,0 +1,29 @@
|
||||
use std::collections::HashMap;
|
||||
use std::error::Error;
|
||||
use std::str::FromStr;
|
||||
|
||||
use crate::client::{BigqueryClient, HasBigQueryClient};
|
||||
use crate::utils::BigDataValueType;
|
||||
|
||||
pub trait BigDataTableBase<'a, TABLE, TPK>: HasBigQueryClient<'a>
|
||||
where TPK: BigDataValueType + FromStr + std::fmt::Debug {
|
||||
fn get_pk_name() -> String;
|
||||
fn get_field_name(field_name: &str) -> Result<String, Box<dyn Error>>;
|
||||
fn get_query_fields() -> HashMap<String, String>;
|
||||
fn get_table_name() -> String;
|
||||
fn create_with_pk(client: &'a BigqueryClient, pk: TPK) -> TABLE;
|
||||
fn write_from_table_row(&mut self,
|
||||
row: &google_bigquery2::api::TableRow,
|
||||
index_to_name_mapping: &HashMap<String, usize>)
|
||||
-> Result<(), Box<dyn Error>>;
|
||||
fn get_pk_value(&self) -> TPK;
|
||||
fn get_query_fields_update_str(&self) -> String;
|
||||
fn get_all_query_parameters(&self) -> Vec<google_bigquery2::api::QueryParameter>;
|
||||
|
||||
fn create_from_table_row(client: &'a BigqueryClient,
|
||||
row: &google_bigquery2::api::TableRow,
|
||||
index_to_name_mapping: &HashMap<String, usize>)
|
||||
-> Result<Self, Box<dyn Error>>
|
||||
where
|
||||
Self: Sized;
|
||||
}
|
||||
213
src/data/big_data_table_base_convenience.rs
Normal file
213
src/data/big_data_table_base_convenience.rs
Normal file
@@ -0,0 +1,213 @@
|
||||
use std::collections::HashMap;
|
||||
use std::error::Error;
|
||||
use std::fmt::Debug;
|
||||
use std::str::FromStr;
|
||||
|
||||
use google_bigquery2::api::{QueryParameter, QueryParameterType, QueryParameterValue};
|
||||
|
||||
use crate::client::BigqueryClient;
|
||||
use crate::data::BigDataTableBase;
|
||||
use crate::utils::BigDataValueType;
|
||||
|
||||
pub trait BigDataTableBaseConvenience<'a, TABLE, TPK>
|
||||
: BigDataTableBase<'a, TABLE, TPK>
|
||||
where TPK: BigDataValueType + FromStr + Debug {
|
||||
fn get_pk_param(&self) -> google_bigquery2::api::QueryParameter;
|
||||
fn get_query_fields_str() -> String;
|
||||
fn get_query_fields_insert_str() -> String;
|
||||
|
||||
fn get_where_part(field_name: &str, is_comparing_to_null: bool) -> String;
|
||||
|
||||
async fn run_get_query(&self, query: &str, project_id: &str)
|
||||
-> Result<google_bigquery2::api::QueryResponse, Box<dyn Error>>;
|
||||
|
||||
async fn run_get_query_with_params(&self,
|
||||
query: &str,
|
||||
parameters: Vec<google_bigquery2::api::QueryParameter>,
|
||||
project_id: &str)
|
||||
-> Result<google_bigquery2::api::QueryResponse, Box<dyn Error>>;
|
||||
|
||||
async fn run_get_query_with_params_on_client(client: &'a BigqueryClient,
|
||||
query: &str,
|
||||
parameters: Vec<google_bigquery2::api::QueryParameter>,
|
||||
project_id: &str)
|
||||
-> Result<google_bigquery2::api::QueryResponse, Box<dyn Error>>;
|
||||
|
||||
// async fn get_identifier_and_base_where(&self) -> Result<(String, String), Box<dyn Error>>;
|
||||
async fn get_identifier(&self) -> Result<String, Box<dyn Error>>;
|
||||
async fn get_identifier_from_client(client: &'a BigqueryClient) -> Result<String, Box<dyn Error>>;
|
||||
fn get_base_where() -> String;
|
||||
|
||||
// async fn get_identifier_and_base_where_from_client(client: &'a BigqueryClient, pk_name: &str, table_name: &str) -> Result<(String, String), Box<dyn Error>>;
|
||||
|
||||
fn get_query_param<TField: BigDataValueType>(field_name: &str, field_value: &Option<TField>)
|
||||
-> google_bigquery2::api::QueryParameter;
|
||||
|
||||
fn parse_value_to_parameter<TValue>(value: &TValue) -> String
|
||||
where TValue: std::fmt::Display + BigDataValueType;
|
||||
|
||||
// fn create_from_table_row(client: &'a BigqueryClient,
|
||||
// row: &google_bigquery2::api::TableRow,
|
||||
// index_to_name_mapping: &HashMap<String, usize>)
|
||||
// -> Result<Self, Box<dyn Error>>
|
||||
// where
|
||||
// Self: Sized;
|
||||
}
|
||||
|
||||
impl<'a, TABLE, TPK> BigDataTableBaseConvenience<'a, TABLE, TPK> for TABLE
|
||||
where
|
||||
TABLE: BigDataTableBase<'a, TABLE, TPK>,
|
||||
TPK: BigDataValueType + FromStr + Debug,
|
||||
<TPK as FromStr>::Err: Debug,
|
||||
{
|
||||
fn get_pk_param(&self) -> QueryParameter {
|
||||
Self::get_query_param(&Self::get_pk_name(), &Some(self.get_pk_value()))
|
||||
// QueryParameter {
|
||||
// name: Some(format!("__{}",Self::get_pk_name())),
|
||||
// parameter_type: Some(QueryParameterType {
|
||||
// array_type: None,
|
||||
// struct_types: None,
|
||||
// type_: Some(TPK::to_bigquery_type()),
|
||||
// }),
|
||||
// parameter_value: Some(QueryParameterValue {
|
||||
// value: Some(self.get_pk_value().to_bigquery_param_value()),
|
||||
// ..Default::default()
|
||||
// }),
|
||||
// }
|
||||
}
|
||||
fn get_query_fields_str() -> String {
|
||||
Self::get_query_fields().values().into_iter()
|
||||
.map(|v| format!("{}", v))
|
||||
.collect::<Vec<String>>()
|
||||
.join(", ")
|
||||
}
|
||||
|
||||
fn get_query_fields_insert_str() -> String {
|
||||
Self::get_query_fields()
|
||||
.values()
|
||||
.into_iter()
|
||||
.map(|v| format!("@__{}", v))
|
||||
.collect::<Vec<String>>()
|
||||
.join(", ")
|
||||
}
|
||||
|
||||
fn get_where_part(field_name: &str, is_comparing_to_null: bool) -> String {
|
||||
if is_comparing_to_null {
|
||||
format!("{} IS NULL", field_name)
|
||||
} else {
|
||||
format!("{} = @__{}", field_name, field_name)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
async fn run_get_query(&self, query: &str, project_id: &str)
|
||||
-> Result<google_bigquery2::api::QueryResponse, Box<dyn Error>> {
|
||||
let parameters = vec![self.get_pk_param()];//default parameters (pk)
|
||||
self.run_get_query_with_params(query, parameters, project_id).await
|
||||
}
|
||||
|
||||
|
||||
async fn run_get_query_with_params(&self,
|
||||
query: &str,
|
||||
parameters: Vec<google_bigquery2::api::QueryParameter>,
|
||||
project_id: &str)
|
||||
-> Result<google_bigquery2::api::QueryResponse, Box<dyn Error>> {
|
||||
let client = self.get_client();
|
||||
Self::run_get_query_with_params_on_client(client, query, parameters, project_id).await
|
||||
}
|
||||
|
||||
async fn run_get_query_with_params_on_client(client: &'a BigqueryClient,
|
||||
query: &str,
|
||||
parameters: Vec<google_bigquery2::api::QueryParameter>,
|
||||
project_id: &str)
|
||||
-> Result<google_bigquery2::api::QueryResponse, Box<dyn Error>> {
|
||||
println!("Query: {}", query);
|
||||
println!("Parameters: {}", parameters.len());
|
||||
for (i, param) in parameters.iter().enumerate() {
|
||||
println!("{:2}: {:?}", i, param);
|
||||
}
|
||||
println!();
|
||||
let req = google_bigquery2::api::QueryRequest {
|
||||
query: Some(query.to_string()),
|
||||
query_parameters: Some(parameters),
|
||||
use_legacy_sql: Some(false),
|
||||
..Default::default()
|
||||
};
|
||||
let (res, query_res) = client.get_client().jobs().query(req, project_id)
|
||||
.doit().await?;
|
||||
|
||||
if res.status() != 200 {
|
||||
return Err(format!("Wrong status code returned! ({})", res.status()).into());
|
||||
}
|
||||
Ok(query_res)
|
||||
}
|
||||
// async fn get_identifier_and_base_where(&self)
|
||||
// -> Result<(String, String), Box<dyn Error>> {
|
||||
// let pk_name = Self::get_pk_name();
|
||||
// let table_name = Self::get_table_name();
|
||||
// Ok(Self::get_identifier_and_base_where_from_client(&self.get_client(), &pk_name, &table_name).await?)
|
||||
// }
|
||||
async fn get_identifier(&self) -> Result<String, Box<dyn Error>> {
|
||||
let client = self.get_client();
|
||||
Self::get_identifier_from_client(&client).await
|
||||
}
|
||||
async fn get_identifier_from_client(client: &'a BigqueryClient) -> Result<String, Box<dyn Error>> {
|
||||
let dataset_id = client.get_dataset_id();
|
||||
let table_name = Self::get_table_name();
|
||||
let table_identifier = format!("{}.{}", dataset_id, table_name);
|
||||
Ok(table_identifier)
|
||||
}
|
||||
|
||||
fn get_base_where() -> String {
|
||||
let pk_name = Self::get_pk_name();
|
||||
Self::get_where_part(&pk_name, false)
|
||||
}
|
||||
|
||||
default fn get_query_param<TField: BigDataValueType>(field_name: &str, field_value: &Option<TField>) -> google_bigquery2::api::QueryParameter
|
||||
{
|
||||
let type_to_string: String = TField::to_bigquery_type();
|
||||
let value: Option<google_bigquery2::api::QueryParameterValue> = match field_value {
|
||||
Some(value) => Some(google_bigquery2::api::QueryParameterValue {
|
||||
value: Some(value.to_bigquery_param_value()),//TODO: maybe add a way to use array types
|
||||
..Default::default()
|
||||
}),
|
||||
None => None,
|
||||
};
|
||||
|
||||
google_bigquery2::api::QueryParameter {
|
||||
name: Some(format!("__{}", field_name.clone())),
|
||||
parameter_type: Some(google_bigquery2::api::QueryParameterType {
|
||||
type_: Some(type_to_string),
|
||||
..Default::default()
|
||||
}),
|
||||
parameter_value: value,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
fn parse_value_to_parameter<TValue>(value: &TValue) -> String
|
||||
where TValue: std::fmt::Display + BigDataValueType
|
||||
{
|
||||
return value.to_bigquery_param_value();
|
||||
}
|
||||
|
||||
//
|
||||
// fn create_from_table_row(client: &'a BigqueryClient,
|
||||
// row: &google_bigquery2::api::TableRow,
|
||||
// index_to_name_mapping: &HashMap<String, usize>)
|
||||
// -> Result<Self, Box<dyn Error>>
|
||||
// where
|
||||
// Self: Sized
|
||||
// {
|
||||
// let pk_index = *index_to_name_mapping.get(&Self::get_pk_name()).unwrap();
|
||||
// let pk = row
|
||||
// .f.as_ref()
|
||||
// .unwrap()[pk_index]
|
||||
// .v.as_ref()
|
||||
// .unwrap()
|
||||
// .parse::<TPK>()
|
||||
// .unwrap();
|
||||
// let mut res = Self::create_with_pk(client, pk);
|
||||
// res.write_from_table_row(row, index_to_name_mapping)?;
|
||||
// Ok(res)
|
||||
// }
|
||||
}
|
||||
202
src/data/mod.rs
Normal file
202
src/data/mod.rs
Normal file
@@ -0,0 +1,202 @@
|
||||
use std::collections::HashMap;
|
||||
use std::error::Error;
|
||||
use std::fmt::{Debug, Display};
|
||||
use std::str::FromStr;
|
||||
|
||||
use google_bigquery2::api::{QueryParameter, TableSchema};
|
||||
|
||||
pub use big_data_table_base::BigDataTableBase;
|
||||
pub use big_data_table_base_convenience::BigDataTableBaseConvenience;
|
||||
|
||||
use crate::client::{BigqueryClient, HasBigQueryClient};
|
||||
use crate::utils::BigDataValueType;
|
||||
|
||||
mod big_data_table_base_convenience;
|
||||
mod big_data_table_base;
|
||||
|
||||
// pub trait BigDataTable<'a, TABLE, TPK: BigDataValueType + FromStr + Debug>: HasBigQueryClient<'a> + BigDataTableBaseConvenience<'a, TABLE, TPK> + BigDataTableBase<'a, TABLE, TPK> {
|
||||
pub trait BigDataTable<'a, TABLE, TPK>
|
||||
: HasBigQueryClient<'a>
|
||||
+ BigDataTableBaseConvenience<'a, TABLE, TPK>
|
||||
+ BigDataTableBase<'a, TABLE, TPK>
|
||||
where TPK: BigDataValueType + FromStr + Debug {
|
||||
async fn from_pk(
|
||||
client: &'a BigqueryClient,
|
||||
pk: TPK,
|
||||
) -> Result<Self, Box<dyn Error>>
|
||||
where
|
||||
Self: Sized;
|
||||
async fn save_to_bigquery(&self) -> Result<(), Box<dyn Error>>;
|
||||
async fn load_from_bigquery(&mut self) -> Result<(), Box<dyn Error>>;
|
||||
async fn load_by_field<T: BigDataValueType>(client: &'a BigqueryClient, field_name: &str, field_value: Option<T>, max_amount: usize)
|
||||
-> Result<Vec<TABLE>, Box<dyn Error>>;
|
||||
|
||||
async fn load_by_custom_query(client: &'a BigqueryClient, query: &str, parameters: Vec<QueryParameter>, max_amount: usize)
|
||||
-> Result<Vec<TABLE>, Box<dyn Error>>;
|
||||
}
|
||||
|
||||
impl<'a, TABLE, TPK> BigDataTable<'a, TABLE, TPK> for TABLE
|
||||
where
|
||||
TABLE: HasBigQueryClient<'a> + BigDataTableBaseConvenience<'a, TABLE, TPK>,
|
||||
TPK: BigDataValueType + FromStr + Debug,
|
||||
<TPK as FromStr>::Err: Debug
|
||||
{
|
||||
async fn from_pk(client: &'a BigqueryClient, pk: TPK) -> Result<Self, Box<dyn Error>> where Self: Sized {
|
||||
let mut res = Self::create_with_pk(client, pk);
|
||||
res.load_from_bigquery().await?;
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
async fn save_to_bigquery(&self) -> Result<(), Box<dyn Error>> {
|
||||
let project_id = self.get_client().get_project_id();
|
||||
|
||||
let table_identifier = self.get_identifier().await?;
|
||||
let w = Self::get_base_where();
|
||||
// region check for existing data
|
||||
let exists_row: bool;
|
||||
let existing_count = format!("select count(*) from {} where {} limit 1", table_identifier, w);
|
||||
|
||||
let req = google_bigquery2::api::QueryRequest {
|
||||
query: Some(existing_count),
|
||||
query_parameters: Some(vec![self.get_pk_param()]),
|
||||
use_legacy_sql: Some(false),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
|
||||
let (res, query_res) = self.get_client().get_client().jobs().query(req, project_id)
|
||||
.doit().await?;
|
||||
|
||||
if res.status() != 200 {
|
||||
return Err(format!("Wrong status code returned! ({})", res.status()).into());
|
||||
}
|
||||
|
||||
if let None = &query_res.rows {
|
||||
return Err("No rows returned!".into());
|
||||
}
|
||||
|
||||
let rows = query_res.rows.unwrap();
|
||||
|
||||
if rows.len() != 1 {
|
||||
return Err(format!("Wrong amount of data returned! ({})", rows.len()).into());
|
||||
}
|
||||
|
||||
let row = &rows[0];
|
||||
let amount: i32 = row.f.as_ref().unwrap()[0].clone().v.unwrap().parse().unwrap();
|
||||
|
||||
if amount == 0 {
|
||||
exists_row = false;
|
||||
} else if amount == 1 {
|
||||
exists_row = true;
|
||||
} else {
|
||||
panic!("Too many rows with key!");
|
||||
}
|
||||
|
||||
// endregion
|
||||
|
||||
|
||||
// region update or insert
|
||||
|
||||
let query = match exists_row {
|
||||
true => format!("update {} set {} where {}", table_identifier, self.get_query_fields_update_str(), w),
|
||||
false => format!("insert into {} ({}, {}) values(@__{}, {})", table_identifier,
|
||||
Self::get_pk_name(),
|
||||
Self::get_query_fields_str(),
|
||||
Self::get_pk_name(),
|
||||
Self::get_query_fields_insert_str()),
|
||||
};
|
||||
|
||||
let mut query_parameters = self.get_all_query_parameters();
|
||||
// query_parameters.push(self.get_pk_param()); // todo: check if this is needed
|
||||
let req = google_bigquery2::api::QueryRequest {
|
||||
query: Some(query),
|
||||
query_parameters: Some(query_parameters),
|
||||
use_legacy_sql: Some(false),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let (res, _) = self.get_client().get_client().jobs().query(req, project_id)
|
||||
.doit().await?;
|
||||
|
||||
if res.status() != 200 {
|
||||
return Err(format!("Wrong status code returned! ({})", res.status()).into());
|
||||
}
|
||||
|
||||
//endregion
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn load_from_bigquery(&mut self) -> Result<(), Box<dyn Error>> {
|
||||
let project_id = self.get_client().get_project_id();
|
||||
let table_identifier = self.get_identifier().await?;
|
||||
let where_clause = Self::get_base_where();
|
||||
|
||||
let query = format!("select {} from {} where {} limit 1", Self::get_query_fields_str(), table_identifier, where_clause);
|
||||
let query_res = self.run_get_query(&query, project_id).await?;
|
||||
|
||||
if let None = &query_res.rows {
|
||||
return Err("No rows returned!".into());
|
||||
}
|
||||
|
||||
let rows = query_res.rows.unwrap();
|
||||
|
||||
if rows.len() != 1 {
|
||||
return Err(format!("Wrong amount of data returned! ({})", rows.len()).into());
|
||||
}
|
||||
let mut index_to_name_mapping: HashMap<String, usize> = get_name_index_mapping(query_res.schema);
|
||||
|
||||
let row = &rows[0];
|
||||
self.write_from_table_row(row, &index_to_name_mapping)
|
||||
}
|
||||
|
||||
async fn load_by_field<T: BigDataValueType>(client: &'a BigqueryClient, field_name: &str, field_value: Option<T>, max_amount: usize)
|
||||
-> Result<Vec<TABLE>, Box<dyn Error>> {
|
||||
let field_name: String = field_name.into();
|
||||
let field_name = Self::get_field_name(&field_name).expect(format!("Field '{}' not found!", field_name).as_str());
|
||||
let where_clause = Self::get_where_part(&field_name, field_value.is_none());
|
||||
// let where_clause = format!(" {} = @__{}", field_name, field_name);
|
||||
let table_identifier = Self::get_identifier_from_client(client).await?;
|
||||
let query = format!("select {} from {} where {} limit {}", Self::get_query_fields_str(), table_identifier, where_clause, max_amount);
|
||||
|
||||
let mut params = vec![];
|
||||
if !(field_value.is_none()) {
|
||||
params.push(Self::get_query_param(&field_name, &field_value));
|
||||
}
|
||||
Self::load_by_custom_query(client, &query, params, max_amount).await
|
||||
}
|
||||
|
||||
async fn load_by_custom_query(client: &'a BigqueryClient, query: &str, parameters: Vec<QueryParameter>, max_amount: usize)
|
||||
-> Result<Vec<TABLE>, Box<dyn Error>> {
|
||||
|
||||
let project_id = client.get_project_id();
|
||||
let query_res: google_bigquery2::api::QueryResponse = Self::run_get_query_with_params_on_client(client, &query, parameters, project_id).await?;
|
||||
|
||||
if let None = &query_res.rows {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
let rows: Vec<google_bigquery2::api::TableRow> = query_res.rows.unwrap();
|
||||
let mut result: Vec<TABLE> = vec![];
|
||||
|
||||
let mut index_to_name_mapping: HashMap<String, usize> = get_name_index_mapping(query_res.schema);
|
||||
|
||||
for row in rows.iter() {
|
||||
for cell in row.f.iter() {
|
||||
//create a new object and write the values to each field
|
||||
let obj = Self::create_from_table_row(client, row, &index_to_name_mapping)?;
|
||||
result.push(obj);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
fn get_name_index_mapping(schema: Option<TableSchema>) -> HashMap<String, usize> {
|
||||
let mut index_to_name_mapping: HashMap<String, usize> = HashMap::new();
|
||||
for (i, x) in schema.unwrap().fields.unwrap().iter().enumerate() {
|
||||
index_to_name_mapping.insert(x.name.clone().unwrap(), i);
|
||||
}
|
||||
index_to_name_mapping
|
||||
}
|
||||
Reference in New Issue
Block a user