mirror of
https://github.com/OMGeeky/google_bigquery_v2.git
synced 2025-12-27 06:29:38 +01:00
implement update & some fixes
This commit is contained in:
@@ -68,13 +68,6 @@ fn implement_big_query_table_base(
|
||||
#impl_set_field_value
|
||||
#impl_get_field_value
|
||||
#impl_from_query_result_row
|
||||
|
||||
async fn insert(&mut self) -> Result<()>{
|
||||
todo!()
|
||||
}
|
||||
async fn update(&mut self) -> Result<()>{
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
use std::error::Error;
|
||||
use std::fmt::Debug;
|
||||
|
||||
use google_bigquery2::{hyper, hyper_rustls, oauth2};
|
||||
use google_bigquery2::Bigquery;
|
||||
use google_bigquery2::hyper::client::HttpConnector;
|
||||
use google_bigquery2::hyper_rustls::HttpsConnector;
|
||||
use google_bigquery2::Bigquery;
|
||||
use google_bigquery2::{hyper, hyper_rustls, oauth2};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct BigqueryClient {
|
||||
@@ -87,7 +87,7 @@ async fn get_internal_client<S: Into<String>>(
|
||||
"Failed to read service account key from file. {}",
|
||||
service_account_path
|
||||
)
|
||||
.as_str(),
|
||||
.as_str(),
|
||||
);
|
||||
let auth = oauth2::ServiceAccountAuthenticator::builder(secret)
|
||||
.build()
|
||||
|
||||
@@ -11,8 +11,10 @@ use log::trace;
|
||||
use serde_json::Value;
|
||||
|
||||
use crate::client::BigqueryClient;
|
||||
use crate::data::param_conversion::{
|
||||
convert_value_to_string, BigDataValueType,
|
||||
use crate::data::param_conversion::{convert_value_to_string, BigDataValueType};
|
||||
use crate::data::query_builder::{
|
||||
NoClient, NoStartingData, QueryBuilder, QueryResultType, QueryTypeInsert, QueryTypeNoType,
|
||||
QueryTypeSelect, QueryTypeUpdate, QueryWasNotBuilt,
|
||||
};
|
||||
use crate::prelude::*;
|
||||
|
||||
@@ -61,18 +63,29 @@ pub trait BigQueryTableBase {
|
||||
|
||||
//endregion
|
||||
|
||||
//region insert
|
||||
|
||||
async fn insert(&mut self) -> Result<()>;
|
||||
async fn update(&mut self) -> Result<()>;
|
||||
|
||||
//endregion
|
||||
|
||||
//endregion
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait BigQueryTable: BigQueryTableBase {
|
||||
fn select() -> QueryBuilder<Self, QueryTypeSelect, NoClient, QueryWasNotBuilt, NoStartingData>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
QueryBuilder::<Self, QueryTypeNoType, NoClient, QueryWasNotBuilt, NoStartingData>::select()
|
||||
}
|
||||
fn insert() -> QueryBuilder<Self, QueryTypeInsert, NoClient, QueryWasNotBuilt, NoStartingData>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
QueryBuilder::<Self, QueryTypeNoType, NoClient, QueryWasNotBuilt, NoStartingData>::insert()
|
||||
}
|
||||
fn update() -> QueryBuilder<Self, QueryTypeUpdate, NoClient, QueryWasNotBuilt, NoStartingData>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
QueryBuilder::<Self, QueryTypeNoType, NoClient, QueryWasNotBuilt, NoStartingData>::update()
|
||||
}
|
||||
fn get_parameter<T>(value: &T, param_name: &String) -> Result<QueryParameter>
|
||||
where
|
||||
T: BigDataValueType + Debug,
|
||||
@@ -86,16 +99,25 @@ pub trait BigQueryTable: BigQueryTableBase {
|
||||
};
|
||||
debug!("param_type: {:?}", param_type);
|
||||
debug!("param_value: {:?}", value);
|
||||
let param_value = convert_value_to_string(value)?;
|
||||
let param_value = convert_value_to_string(value);
|
||||
debug!("param_value: {:?}", param_value);
|
||||
let param_value = QueryParameterValue {
|
||||
value: Some(param_value),
|
||||
..Default::default()
|
||||
let param_value = match param_value {
|
||||
Ok(param_value) => Some(QueryParameterValue {
|
||||
value: Some(param_value),
|
||||
..Default::default()
|
||||
}),
|
||||
Err(_) => todo!(
|
||||
"a parameter value probably of sort null is not yet \
|
||||
implemented. Does this even make sense or should the code that's \
|
||||
calling this react if there is an error returned from this function \
|
||||
and modify the where to be 'is null' instead of '== @__PARAM_x'?"
|
||||
),
|
||||
};
|
||||
debug!("param_value: {:?}", param_value);
|
||||
|
||||
let param = QueryParameter {
|
||||
parameter_type: Some(param_type),
|
||||
parameter_value: Some(param_value),
|
||||
parameter_value: param_value,
|
||||
name: Some(param_name.clone()),
|
||||
};
|
||||
Ok(param)
|
||||
@@ -133,36 +155,44 @@ pub trait BigQueryTable: BigQueryTableBase {
|
||||
async fn get_by_pk<PK>(client: BigqueryClient, pk_value: &PK) -> Result<Self>
|
||||
where
|
||||
PK: BigDataValueType + Send + Sync + 'static,
|
||||
Self: Sized,
|
||||
Self: Sized + Debug,
|
||||
{
|
||||
trace!("get_by_pk({:?}, {:?})", client, pk_value);
|
||||
let pk_field_name = Self::get_pk_field_name();
|
||||
let pk_db_name = Self::get_pk_db_name();
|
||||
let result = Self::query(client)
|
||||
let result = Self::select()
|
||||
.with_client(client)
|
||||
.add_where_eq(&pk_field_name, Some(pk_value))?
|
||||
.build_query()?
|
||||
.run()
|
||||
.await;
|
||||
match result {
|
||||
Ok(mut v) => {
|
||||
if v.len() == 0 {
|
||||
Err(format!("No entry found for {} = {:?}", pk_db_name, pk_value).into())
|
||||
} else if v.len() > 1 {
|
||||
Err(format!(
|
||||
"More than one entry found for {} = {:?}",
|
||||
pk_db_name, pk_value
|
||||
)
|
||||
.into())
|
||||
} else {
|
||||
Ok(v.remove(0))
|
||||
}
|
||||
.await?;
|
||||
let mut rows = match result {
|
||||
QueryResultType::WithRowData(data) => data,
|
||||
QueryResultType::WithoutRowData(success) => {
|
||||
return Err(format!(
|
||||
"something went wrong when getting for {} = {:?};\tresult: {:?}",
|
||||
pk_field_name, pk_value, success
|
||||
)
|
||||
.into());
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
};
|
||||
|
||||
if rows.len() == 0 {
|
||||
Err(format!("No entry found for {} = {:?}", pk_db_name, pk_value).into())
|
||||
} else if rows.len() > 1 {
|
||||
Err(format!(
|
||||
"More than one entry found for {} = {:?}",
|
||||
pk_db_name, pk_value
|
||||
)
|
||||
.into())
|
||||
} else {
|
||||
Ok(rows.remove(0))
|
||||
}
|
||||
}
|
||||
|
||||
async fn upsert(&mut self) -> Result<()>
|
||||
where
|
||||
Self: Sized + Clone + Send + Sync,
|
||||
Self: Sized + Clone + Send + Sync + Debug + Default,
|
||||
{
|
||||
trace!("upsert()");
|
||||
|
||||
@@ -174,14 +204,42 @@ pub trait BigQueryTable: BigQueryTableBase {
|
||||
}
|
||||
Err(_) => {
|
||||
debug!("Inserting new entry.");
|
||||
self.insert().await
|
||||
Self::insert()
|
||||
.with_client(self.get_client().clone())
|
||||
.set_data(self.clone())
|
||||
.build_query()?
|
||||
.run()
|
||||
.await?
|
||||
.map_err_without_data("upsert should not return data.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// proxy for update
|
||||
async fn save(&mut self) -> Result<()> {
|
||||
self.update().await
|
||||
async fn save(&mut self) -> Result<()>
|
||||
where
|
||||
Self: Sized + Clone + Send + Sync + Debug + Default,
|
||||
{
|
||||
trace!("save(): {:?}", self);
|
||||
let result = Self::update()
|
||||
.with_client(self.get_client().clone())
|
||||
.set_data(self.clone())
|
||||
.build_query()?
|
||||
.run()
|
||||
.await?;
|
||||
trace!("save() result: {:?}", result);
|
||||
let count = result
|
||||
.expect_with_data("save should return empty data.")
|
||||
.len();
|
||||
if count == 0 {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(format!(
|
||||
"save should return empty data, but returned {} rows.",
|
||||
count
|
||||
)
|
||||
.into())
|
||||
}
|
||||
}
|
||||
|
||||
/// updates the current instance from another instance.
|
||||
@@ -193,16 +251,6 @@ pub trait BigQueryTable: BigQueryTableBase {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn query<Table>(client: BigqueryClient) -> BigQueryBuilder<Table>
|
||||
where
|
||||
Table: BigQueryTable,
|
||||
{
|
||||
BigQueryBuilder {
|
||||
client: Some(client),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> BigQueryTable for T where T: BigQueryTableBase {}
|
||||
|
||||
@@ -139,7 +139,9 @@ pub fn convert_value_to_string(value: Value) -> Result<String> {
|
||||
Ok(value::from_value(value)?)
|
||||
} else {
|
||||
warn!("Unknown type: {:?}", value);
|
||||
|
||||
if value == Value::Null {
|
||||
return Err("Value is Null".into());
|
||||
}
|
||||
//TODO: check if this is correct with for example 'DATETIME' values
|
||||
// Err(format!("Unknown type: {:?}", value).into())
|
||||
let string = value.to_string();
|
||||
|
||||
@@ -1,21 +1,21 @@
|
||||
use std::error::Error;
|
||||
use std::fmt::{Debug, Display, Formatter};
|
||||
|
||||
pub use convert_bigquery_params::{
|
||||
convert_value_to_string, ConvertBigQueryParams,
|
||||
};
|
||||
pub use convert_bigquery_params::{convert_value_to_string, ConvertBigQueryParams};
|
||||
pub use convert_type_to_big_query_type::ConvertTypeToBigQueryType;
|
||||
|
||||
mod convert_bigquery_params;
|
||||
mod convert_type_to_big_query_type;
|
||||
|
||||
pub trait BigDataValueType:
|
||||
ConvertTypeToBigQueryType + ConvertBigQueryParams + Debug + Send + Sync
|
||||
{}
|
||||
ConvertTypeToBigQueryType + ConvertBigQueryParams + Debug + Send + Sync
|
||||
{
|
||||
}
|
||||
|
||||
impl<T: ConvertTypeToBigQueryType + ConvertBigQueryParams + Debug + Send + Sync> BigDataValueType
|
||||
for T
|
||||
{}
|
||||
for T
|
||||
{
|
||||
}
|
||||
|
||||
//region ConversionError
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -11,6 +11,7 @@ use serde_json::Value;
|
||||
use crate::data::param_conversion::BigDataValueType;
|
||||
use crate::prelude::*;
|
||||
|
||||
//region BigqueryError
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct BigqueryError {
|
||||
pub message: String,
|
||||
@@ -34,12 +35,63 @@ impl Display for BigqueryError {
|
||||
|
||||
impl Error for BigqueryError {}
|
||||
|
||||
//endregion
|
||||
|
||||
//region typestate
|
||||
//region QueryResultType
|
||||
#[derive(Debug)]
|
||||
pub enum QueryResultType<Table> {
|
||||
WithRowData(Vec<Table>),
|
||||
WithoutRowData(Result<()>),
|
||||
}
|
||||
|
||||
impl<T> QueryResultType<T> {
|
||||
pub fn map_err_with_data(self, message: impl Into<String>) -> Result<Vec<T>> {
|
||||
match self {
|
||||
QueryResultType::WithRowData(data) => Ok(data),
|
||||
QueryResultType::WithoutRowData(_) => {
|
||||
Err(format!("map_err_with_data message:{}", message.into()).into())
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn map_err_without_data(self, message: impl Into<String>) -> Result<()> {
|
||||
match self {
|
||||
QueryResultType::WithoutRowData(result) => result,
|
||||
QueryResultType::WithRowData(_) => {
|
||||
Err(format!("map_err_without_data message:{}", message.into()).into())
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn expect_with_data(self, message: impl Into<String>) -> Vec<T> {
|
||||
match self {
|
||||
QueryResultType::WithRowData(data) => data,
|
||||
QueryResultType::WithoutRowData(_) => {
|
||||
panic!("expect_with_data message:{}", message.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn expect_without_data(self, message: impl Into<String>) -> Result<()> {
|
||||
match self {
|
||||
QueryResultType::WithoutRowData(result) => result,
|
||||
QueryResultType::WithRowData(_) => {
|
||||
panic!("expect_without_data message:{}", message.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn is_with_row_data(&self) -> bool {
|
||||
match self {
|
||||
QueryResultType::WithRowData(_) => true,
|
||||
QueryResultType::WithoutRowData(_) => false,
|
||||
}
|
||||
}
|
||||
pub fn is_without_row_data(&self) -> bool {
|
||||
match self {
|
||||
QueryResultType::WithRowData(_) => false,
|
||||
QueryResultType::WithoutRowData(_) => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
//endregion
|
||||
//region typestate structs
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
@@ -81,16 +133,22 @@ pub struct QueryTypeSelect;
|
||||
impl HasQueryType for QueryTypeSelect {}
|
||||
|
||||
//endregion
|
||||
// pub struct QueryTypeNoUpdate;
|
||||
// pub struct QueryTypeUpdate;
|
||||
// struct QueryTypeNoDelete;
|
||||
// struct QueryTypeDelete;
|
||||
//region update
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct QueryTypeUpdate;
|
||||
|
||||
impl HasQueryType for QueryTypeUpdate {}
|
||||
|
||||
//endregion
|
||||
|
||||
//endregion
|
||||
|
||||
pub trait HasQueryType {}
|
||||
|
||||
pub trait HasNoQueryType {}
|
||||
//endregion
|
||||
|
||||
//region QueryBuilder
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct QueryBuilder<Table, QueryType, Client, QueryBuilt, StartingData> {
|
||||
client: Client,
|
||||
@@ -107,8 +165,9 @@ pub struct QueryBuilder<Table, QueryType, Client, QueryBuilt, StartingData> {
|
||||
table: PhantomData<Table>,
|
||||
}
|
||||
|
||||
//region default implementation for QueryBuilder
|
||||
impl<Table, QueryType, Client: Default, QueryBuilt, StartingData: Default> Default
|
||||
for QueryBuilder<Table, QueryType, Client, QueryBuilt, StartingData>
|
||||
for QueryBuilder<Table, QueryType, Client, QueryBuilt, StartingData>
|
||||
{
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
@@ -126,8 +185,11 @@ for QueryBuilder<Table, QueryType, Client, QueryBuilt, StartingData>
|
||||
}
|
||||
}
|
||||
|
||||
//endregion
|
||||
//region general QueryBuilder
|
||||
//region functions for all queries
|
||||
impl<Table: BigQueryTable, UnknownQueryType, Client, QueryBuilt, StartingData>
|
||||
QueryBuilder<Table, UnknownQueryType, Client, QueryBuilt, StartingData>
|
||||
QueryBuilder<Table, UnknownQueryType, Client, QueryBuilt, StartingData>
|
||||
{
|
||||
fn get_sorted_selected_fields(&self) -> Vec<(String, String)> {
|
||||
trace!("get_sorted_selected_fields()");
|
||||
@@ -138,6 +200,7 @@ QueryBuilder<Table, UnknownQueryType, Client, QueryBuilt, StartingData>
|
||||
}
|
||||
|
||||
fn get_fields_string(&self) -> String {
|
||||
trace!("get_fields_string()");
|
||||
let mut fields = self.get_sorted_selected_fields();
|
||||
fields
|
||||
.into_iter()
|
||||
@@ -147,14 +210,46 @@ QueryBuilder<Table, UnknownQueryType, Client, QueryBuilt, StartingData>
|
||||
}
|
||||
}
|
||||
|
||||
impl<Table: BigQueryTable, UnknownQueryType, Client, StartingData>
|
||||
QueryBuilder<Table, UnknownQueryType, Client, QueryWasNotBuilt, StartingData>
|
||||
//endregion
|
||||
//region functions for not built queries
|
||||
//region with Starting data
|
||||
impl<Table: BigQueryTable + Default, UnknownQueryType, Client>
|
||||
QueryBuilder<Table, UnknownQueryType, Client, QueryWasNotBuilt, HasStartingData<Table>>
|
||||
{
|
||||
pub fn add_field_where(self, field: &str) -> Result<Self> {
|
||||
trace!("add_field_where(field: {})", field);
|
||||
|
||||
let field_db_name = Table::get_field_db_name(field)?;
|
||||
let param = Table::get_parameter_from_field(&self.starting_data.0, &field)?;
|
||||
let has_param_value = param.parameter_value.is_some();
|
||||
let mut params = self.params;
|
||||
|
||||
let mut wheres = self.where_clauses;
|
||||
if has_param_value {
|
||||
let param_name = param.name.as_ref().unwrap().to_string();
|
||||
params.push(param);
|
||||
wheres.push(format!("{} = @{}", field_db_name, param_name));
|
||||
} else {
|
||||
wheres.push(format!("{} is NULL", field_db_name));
|
||||
}
|
||||
Ok(Self {
|
||||
where_clauses: wheres,
|
||||
params,
|
||||
..self
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
//endregion
|
||||
impl<Table: BigQueryTable + Debug, UnknownQueryType: Debug, Client: Debug, StartingData: Debug>
|
||||
QueryBuilder<Table, UnknownQueryType, Client, QueryWasNotBuilt, StartingData>
|
||||
{
|
||||
//region set query content
|
||||
pub fn add_where_eq<T>(self, column: &str, value: Option<&T>) -> Result<Self>
|
||||
where
|
||||
T: BigDataValueType + Debug,
|
||||
where
|
||||
T: BigDataValueType + Debug,
|
||||
{
|
||||
trace!("add_where_eq({:?}, {:?})", column, value);
|
||||
let column = Table::get_field_db_name(column)?;
|
||||
let mut wheres = self.where_clauses;
|
||||
|
||||
@@ -183,6 +278,7 @@ QueryBuilder<Table, UnknownQueryType, Client, QueryWasNotBuilt, StartingData>
|
||||
}
|
||||
|
||||
pub fn set_limit(self, limit: u32) -> Self {
|
||||
trace!("set_limit({:?})", limit);
|
||||
Self {
|
||||
limit: Some(limit),
|
||||
..self
|
||||
@@ -192,6 +288,7 @@ QueryBuilder<Table, UnknownQueryType, Client, QueryWasNotBuilt, StartingData>
|
||||
|
||||
//region build query
|
||||
fn build_where_string(&self) -> String {
|
||||
trace!("build_where_string: {:?}", self);
|
||||
let mut where_string = String::new();
|
||||
if !self.where_clauses.is_empty() {
|
||||
where_string.push_str(" WHERE ");
|
||||
@@ -199,22 +296,23 @@ QueryBuilder<Table, UnknownQueryType, Client, QueryWasNotBuilt, StartingData>
|
||||
}
|
||||
where_string
|
||||
}
|
||||
fn build_order_by_string(&self) -> String {
|
||||
fn build_order_by_string(&self) -> Result<String> {
|
||||
trace!("build_order_by_string: {:?}", self);
|
||||
let mut order_by_string = String::new();
|
||||
if !self.order_by.is_empty() {
|
||||
order_by_string.push_str(" ORDER BY ");
|
||||
order_by_string.push_str(
|
||||
&self
|
||||
.order_by
|
||||
.iter()
|
||||
.map(|(column, direction)| format!("{} {}", column, direction.to_query_str()))
|
||||
.collect::<Vec<String>>()
|
||||
.join(", "),
|
||||
);
|
||||
let mut order_by = vec![];
|
||||
for (column, direction) in &self.order_by {
|
||||
let column = Table::get_field_db_name(&column)?;
|
||||
order_by.push(format!("{} {}", column, direction.to_query_str()));
|
||||
}
|
||||
|
||||
order_by_string.push_str(&order_by.join(", "));
|
||||
}
|
||||
order_by_string
|
||||
Ok(order_by_string)
|
||||
}
|
||||
fn build_limit_string(&self) -> String {
|
||||
trace!("build_limit_string: {:?}", self);
|
||||
let mut limit_string = String::new();
|
||||
if let Some(limit) = self.limit {
|
||||
limit_string.push_str(" LIMIT ");
|
||||
@@ -225,13 +323,17 @@ QueryBuilder<Table, UnknownQueryType, Client, QueryWasNotBuilt, StartingData>
|
||||
//endregion
|
||||
}
|
||||
|
||||
impl<Table: BigQueryTable + Default, QueryType: HasQueryType, Client: Default>
|
||||
QueryBuilder<Table, QueryType, Client, QueryWasNotBuilt, NoStartingData>
|
||||
//endregion
|
||||
//endregion
|
||||
//region set_data
|
||||
impl<Table: BigQueryTable + Default + Debug, QueryType: HasQueryType, Client: Default>
|
||||
QueryBuilder<Table, QueryType, Client, QueryWasNotBuilt, NoStartingData>
|
||||
{
|
||||
pub fn set_data(
|
||||
self,
|
||||
data: Table,
|
||||
) -> QueryBuilder<Table, QueryType, Client, QueryWasNotBuilt, HasStartingData<Table>> {
|
||||
trace!("set_data({:?})", data);
|
||||
QueryBuilder {
|
||||
starting_data: HasStartingData(data),
|
||||
query_built: PhantomData,
|
||||
@@ -247,11 +349,14 @@ QueryBuilder<Table, QueryType, Client, QueryWasNotBuilt, NoStartingData>
|
||||
}
|
||||
}
|
||||
|
||||
//endregion
|
||||
//region QueryTypeNoType
|
||||
impl<Table: BigQueryTable, Client: Default, StartingData: Default>
|
||||
QueryBuilder<Table, QueryTypeNoType, Client, QueryWasNotBuilt, StartingData>
|
||||
QueryBuilder<Table, QueryTypeNoType, Client, QueryWasNotBuilt, StartingData>
|
||||
{
|
||||
pub fn select() -> QueryBuilder<Table, QueryTypeSelect, NoClient, QueryWasNotBuilt, StartingData>
|
||||
{
|
||||
trace!("select()");
|
||||
QueryBuilder {
|
||||
query: String::from("SELECT "),
|
||||
..Default::default()
|
||||
@@ -259,6 +364,15 @@ QueryBuilder<Table, QueryTypeNoType, Client, QueryWasNotBuilt, StartingData>
|
||||
}
|
||||
pub fn insert() -> QueryBuilder<Table, QueryTypeInsert, NoClient, QueryWasNotBuilt, StartingData>
|
||||
{
|
||||
trace!("insert()");
|
||||
QueryBuilder {
|
||||
query: String::from("INSERT INTO "),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
pub fn update() -> QueryBuilder<Table, QueryTypeUpdate, NoClient, QueryWasNotBuilt, StartingData>
|
||||
{
|
||||
trace!("update()");
|
||||
QueryBuilder {
|
||||
query: String::from("INSERT INTO "),
|
||||
..Default::default()
|
||||
@@ -266,8 +380,10 @@ QueryBuilder<Table, QueryTypeNoType, Client, QueryWasNotBuilt, StartingData>
|
||||
}
|
||||
}
|
||||
|
||||
//endregion
|
||||
//region QueryTypeInsert
|
||||
impl<Table: BigQueryTable + Default + Debug>
|
||||
QueryBuilder<Table, QueryTypeInsert, HasClient, QueryWasNotBuilt, HasStartingData<Table>>
|
||||
QueryBuilder<Table, QueryTypeInsert, HasClient, QueryWasNotBuilt, HasStartingData<Table>>
|
||||
{
|
||||
pub fn build_query(
|
||||
self,
|
||||
@@ -325,8 +441,82 @@ QueryBuilder<Table, QueryTypeInsert, HasClient, QueryWasNotBuilt, HasStartingDat
|
||||
}
|
||||
}
|
||||
|
||||
//endregion
|
||||
//region QueryTypeUpdate
|
||||
impl<Table: BigQueryTable + Default + Debug>
|
||||
QueryBuilder<Table, QueryTypeUpdate, HasClient, QueryWasNotBuilt, HasStartingData<Table>>
|
||||
{
|
||||
pub fn build_query(
|
||||
mut self,
|
||||
) -> Result<
|
||||
QueryBuilder<Table, QueryTypeUpdate, HasClient, QueryWasBuilt, HasStartingData<Table>>,
|
||||
> {
|
||||
trace!("build_query: update: {:?}", self);
|
||||
let table_identifier = Table::get_table_identifier_from_client(&self.client.0);
|
||||
let fields_str = self.build_update_fields_string()?;
|
||||
if self.where_clauses.is_empty() {
|
||||
trace!("no where clause, adding pk field to where clause");
|
||||
self = self.add_field_where(&Table::get_pk_field_name())?;
|
||||
}
|
||||
let where_clause = self.build_where_string();
|
||||
let params = &self.params;
|
||||
log::warn!("params are not used in update query: {:?}", params);
|
||||
let mut params = vec![];
|
||||
let local_fields = Table::get_query_fields(true);
|
||||
let starting_data = &self.starting_data.0;
|
||||
for (local_field_name, _) in local_fields {
|
||||
let para = Table::get_parameter_from_field(starting_data, &local_field_name)?;
|
||||
params.push(para);
|
||||
}
|
||||
|
||||
let query = format!(
|
||||
"update {} set {} {}",
|
||||
table_identifier, fields_str, where_clause
|
||||
);
|
||||
Ok(QueryBuilder {
|
||||
query,
|
||||
params,
|
||||
where_clauses: self.where_clauses,
|
||||
order_by: self.order_by,
|
||||
limit: self.limit,
|
||||
client: self.client,
|
||||
table: self.table,
|
||||
starting_data: self.starting_data,
|
||||
query_type: self.query_type,
|
||||
query_built: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
fn build_update_fields_string(&mut self) -> Result<String> {
|
||||
trace!("build_update_fields_string");
|
||||
let result = self
|
||||
.get_value_parameter_names()?
|
||||
.into_iter()
|
||||
.map(|(f, p)| format!("{} = @{}", f, p).to_string())
|
||||
.collect::<Vec<String>>()
|
||||
.join(", ");
|
||||
trace!("build_update_fields_string: result: {}", result);
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
fn get_value_parameter_names(&self) -> Result<Vec<(String, String)>> {
|
||||
let mut values = self.get_sorted_selected_fields();
|
||||
let mut res = vec![];
|
||||
for (field, _) in values.iter_mut() {
|
||||
res.push((
|
||||
Table::get_field_db_name(field)?,
|
||||
Table::get_field_param_name(field)?,
|
||||
));
|
||||
}
|
||||
Ok(res)
|
||||
}
|
||||
}
|
||||
|
||||
//endregion
|
||||
//region QueryTypeSelect
|
||||
//region client not needed
|
||||
impl<Table: BigQueryTable + Debug, Client: Debug, StartingData: Debug>
|
||||
QueryBuilder<Table, QueryTypeSelect, Client, QueryWasNotBuilt, StartingData>
|
||||
QueryBuilder<Table, QueryTypeSelect, Client, QueryWasNotBuilt, StartingData>
|
||||
{
|
||||
pub fn add_order_by(
|
||||
mut self,
|
||||
@@ -338,24 +528,26 @@ QueryBuilder<Table, QueryTypeSelect, Client, QueryWasNotBuilt, StartingData>
|
||||
}
|
||||
}
|
||||
|
||||
//endregion
|
||||
//region client needed
|
||||
impl<Table: BigQueryTable + Debug, StartingData: Debug>
|
||||
QueryBuilder<Table, QueryTypeSelect, HasClient, QueryWasNotBuilt, StartingData>
|
||||
QueryBuilder<Table, QueryTypeSelect, HasClient, QueryWasNotBuilt, StartingData>
|
||||
{
|
||||
pub fn build_query(
|
||||
self,
|
||||
) -> QueryBuilder<Table, QueryTypeSelect, HasClient, QueryWasBuilt, StartingData> {
|
||||
) -> Result<QueryBuilder<Table, QueryTypeSelect, HasClient, QueryWasBuilt, StartingData>> {
|
||||
trace!("build_query: select: {:?}", self);
|
||||
|
||||
let table_identifier = Table::get_table_identifier_from_client(&self.client.0);
|
||||
let fields_str = self.get_fields_string();
|
||||
let where_clause = self.build_where_string();
|
||||
let order_by_clause = self.build_order_by_string();
|
||||
let order_by_clause = self.build_order_by_string()?;
|
||||
let limit_clause = self.build_limit_string();
|
||||
let query = format!(
|
||||
"SELECT {} FROM {}{}{}{}",
|
||||
fields_str, table_identifier, where_clause, order_by_clause, limit_clause
|
||||
);
|
||||
QueryBuilder {
|
||||
Ok(QueryBuilder {
|
||||
query,
|
||||
where_clauses: self.where_clauses,
|
||||
order_by: self.order_by,
|
||||
@@ -366,12 +558,15 @@ QueryBuilder<Table, QueryTypeSelect, HasClient, QueryWasNotBuilt, StartingData>
|
||||
starting_data: self.starting_data,
|
||||
query_type: self.query_type,
|
||||
query_built: PhantomData,
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
//endregion
|
||||
//endregion
|
||||
//region with_client
|
||||
impl<Table: BigQueryTable, QueryType, StartingData>
|
||||
QueryBuilder<Table, QueryType, NoClient, QueryWasNotBuilt, StartingData>
|
||||
QueryBuilder<Table, QueryType, NoClient, QueryWasNotBuilt, StartingData>
|
||||
{
|
||||
pub fn with_client(
|
||||
self,
|
||||
@@ -392,8 +587,10 @@ QueryBuilder<Table, QueryType, NoClient, QueryWasNotBuilt, StartingData>
|
||||
}
|
||||
}
|
||||
|
||||
//endregion
|
||||
//region un_build & get query string
|
||||
impl<Table: BigQueryTable, QueryType, Client, StartingData>
|
||||
QueryBuilder<Table, QueryType, Client, QueryWasBuilt, StartingData>
|
||||
QueryBuilder<Table, QueryType, Client, QueryWasBuilt, StartingData>
|
||||
{
|
||||
pub fn un_build(
|
||||
self,
|
||||
@@ -411,10 +608,15 @@ QueryBuilder<Table, QueryType, Client, QueryWasBuilt, StartingData>
|
||||
query_built: PhantomData,
|
||||
}
|
||||
}
|
||||
pub fn get_query_string(&self) -> &str {
|
||||
&self.query
|
||||
}
|
||||
}
|
||||
|
||||
//endregion
|
||||
//region run
|
||||
impl<Table: BigQueryTable, QueryType: HasQueryType, StartingData>
|
||||
QueryBuilder<Table, QueryType, HasClient, QueryWasBuilt, StartingData>
|
||||
QueryBuilder<Table, QueryType, HasClient, QueryWasBuilt, StartingData>
|
||||
{
|
||||
pub async fn run(self) -> Result<QueryResultType<Table>> {
|
||||
trace!("run query: {}", self.query);
|
||||
@@ -435,6 +637,7 @@ QueryBuilder<Table, QueryType, HasClient, QueryWasBuilt, StartingData>
|
||||
..Default::default()
|
||||
};
|
||||
let client = self.client.0;
|
||||
debug!("query_request: {:?}", query_request);
|
||||
let (_, query_response) = run_query_with_client(&client, query_request).await?;
|
||||
// if let Some(errors) = query_response.errors {
|
||||
// return Err(BigqueryError::new("Query returned errors", Some(errors)).into());
|
||||
@@ -460,7 +663,10 @@ QueryBuilder<Table, QueryType, HasClient, QueryWasBuilt, StartingData>
|
||||
Ok(QueryResultType::WithRowData(result))
|
||||
}
|
||||
}
|
||||
//endregion
|
||||
//endregion
|
||||
|
||||
//region extra helper functions
|
||||
async fn run_query_with_client(
|
||||
client: &BigqueryClient,
|
||||
request: QueryRequest,
|
||||
@@ -480,119 +686,4 @@ async fn run_query_with_client(
|
||||
Ok((response, query_response))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::collections::HashMap;
|
||||
|
||||
use serde_json::Value;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
struct TestTable {
|
||||
client: BigqueryClient,
|
||||
row_id: i64,
|
||||
info1: Option<String>,
|
||||
info3: Option<String>,
|
||||
info4i: Option<i32>,
|
||||
info4b: Option<bool>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl BigQueryTableBase for TestTable {
|
||||
fn get_all_params(&self) -> Result<Vec<QueryParameter>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_parameter_from_field(&self, field_name: &str) -> Result<QueryParameter> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_table_name() -> String {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_client(&self) -> &BigqueryClient {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn set_client(&mut self, client: BigqueryClient) {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_pk_field_name() -> String {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_pk_db_name() -> String {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_pk_value(&self) -> &(dyn BigDataValueType + Send + Sync) {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_query_fields(include_pk: bool) -> HashMap<String, String> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn reload(&mut self) -> Result<()> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn set_field_value(&mut self, field_name: &str, value: &Value) -> Result<()> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_field_value(&self, field_name: &str) -> Result<Value> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn new_from_query_result_row(
|
||||
client: BigqueryClient,
|
||||
row: &HashMap<String, Value>,
|
||||
) -> Result<Self>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn insert(&mut self) -> Result<()> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn update(&mut self) -> Result<()> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl TestTable {
|
||||
fn select() -> QueryBuilder<Self, QueryTypeSelect, NoClient, QueryWasNotBuilt, NoStartingData>
|
||||
{
|
||||
QueryBuilder::<Self, QueryTypeNoType, NoClient, QueryWasNotBuilt, NoStartingData>::select()
|
||||
}
|
||||
fn insert() -> QueryBuilder<Self, QueryTypeInsert, NoClient, QueryWasNotBuilt, HasStartingData<Self>>
|
||||
{
|
||||
QueryBuilder::<Self, QueryTypeNoType, NoClient, QueryWasNotBuilt, HasStartingData<Self>>::insert()
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test1() {
|
||||
let client = BigqueryClient::new("test", "", None).await.unwrap();
|
||||
let query_builder = TestTable::select().with_client(client.clone());
|
||||
println!("{:?}", query_builder);
|
||||
let query_builder = query_builder.build_query();
|
||||
|
||||
println!("query: {:?}", query_builder);
|
||||
let query_builder = TestTable::insert();
|
||||
println!("{:?}", query_builder);
|
||||
let query_builder = query_builder.with_client(client);
|
||||
let query_builder = query_builder
|
||||
.build_query()
|
||||
.expect("build of insert query failed");
|
||||
let result = query_builder.clone().run().await;
|
||||
println!("query: {:?}", query_builder);
|
||||
}
|
||||
}
|
||||
//endregion
|
||||
|
||||
122
tests/tests.rs
122
tests/tests.rs
@@ -2,8 +2,8 @@ use log::{debug, info, LevelFilter};
|
||||
use nameof::name_of;
|
||||
|
||||
use google_bigquery_v2::data::query_builder::{
|
||||
HasStartingData, NoClient, NoStartingData, QueryBuilder, QueryTypeInsert, QueryTypeNoType,
|
||||
QueryTypeSelect, QueryWasNotBuilt,
|
||||
HasStartingData, NoClient, NoStartingData, QueryBuilder, QueryResultType, QueryTypeInsert,
|
||||
QueryTypeNoType, QueryTypeSelect, QueryTypeUpdate, QueryWasNotBuilt,
|
||||
};
|
||||
use google_bigquery_v2::prelude::*;
|
||||
|
||||
@@ -15,41 +15,32 @@ pub struct DbInfos {
|
||||
#[primary_key]
|
||||
#[db_name("Id")]
|
||||
row_id: i64,
|
||||
info1: Option::<String>,
|
||||
info1: Option<String>,
|
||||
#[db_name("info")]
|
||||
info2: Option::<String>,
|
||||
info3: Option::<String>,
|
||||
info4i: Option::<i32>,
|
||||
info2: Option<String>,
|
||||
info3: Option<String>,
|
||||
info4i: Option<i32>,
|
||||
#[db_name("yes")]
|
||||
info4b: Option::<bool>,
|
||||
info4b: Option<bool>,
|
||||
}
|
||||
pub struct DbInfos2{
|
||||
|
||||
pub struct DbInfos2 {
|
||||
client: BigqueryClient,
|
||||
row_id: i64,
|
||||
info1: Option::<String>,
|
||||
info1: Option<String>,
|
||||
info2: Option<String>,
|
||||
info3: Option<String>,
|
||||
info4i: Option<i32>,
|
||||
info4b: Option<bool>,
|
||||
}
|
||||
|
||||
//TODO: outsource this impl into the derive
|
||||
impl DbInfos {
|
||||
fn select() -> QueryBuilder<Self, QueryTypeSelect, NoClient, QueryWasNotBuilt, NoStartingData> {
|
||||
QueryBuilder::<Self, QueryTypeNoType, NoClient, QueryWasNotBuilt, NoStartingData>::select()
|
||||
}
|
||||
fn insert() -> QueryBuilder<Self, QueryTypeInsert, NoClient, QueryWasNotBuilt, NoStartingData> {
|
||||
QueryBuilder::<Self, QueryTypeNoType, NoClient, QueryWasNotBuilt, NoStartingData>::insert()
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test1() {
|
||||
init_logger();
|
||||
let client = get_test_client().await;
|
||||
let query_builder = DbInfos::select().with_client(client.clone());
|
||||
debug!("{:?}", query_builder);
|
||||
let query_builder = query_builder.build_query();
|
||||
let query_builder = query_builder.build_query().unwrap();
|
||||
|
||||
debug!("query: {:?}", query_builder);
|
||||
let result = query_builder.clone().run().await;
|
||||
@@ -75,6 +66,28 @@ async fn test1() {
|
||||
println!("result: {:?}", result);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_save() {
|
||||
init_logger();
|
||||
let client = get_test_client().await;
|
||||
let mut entry = DbInfos::get_by_pk(client.clone(), &123123)
|
||||
.await
|
||||
.expect("get_by_pk failed");
|
||||
entry.info1 = Some("test1".to_string());
|
||||
entry.info2 = Some("test2".to_string());
|
||||
entry.info3 = Some("test3".to_string());
|
||||
entry.info4i = Some(1);
|
||||
entry.info4b = Some(true);
|
||||
log::debug!("entry: {:?}", entry);
|
||||
debug!("========================================================================");
|
||||
debug!("starting save");
|
||||
debug!("========================================================================");
|
||||
entry.save().await.expect("save failed");
|
||||
debug!("========================================================================");
|
||||
debug!("save done");
|
||||
debug!("========================================================================");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_table_name() {
|
||||
init_logger();
|
||||
@@ -101,35 +114,43 @@ async fn test_get_query_fields() {
|
||||
async fn test_query_builder_1() {
|
||||
init_logger();
|
||||
let client = get_test_client().await;
|
||||
let query_builder: BigQueryBuilder<DbInfos> = DbInfos::query(client);
|
||||
let query_builder: BigQueryBuilder<DbInfos> = query_builder
|
||||
let query_builder = DbInfos::select().with_client(client);
|
||||
let query_builder = query_builder
|
||||
.add_where_eq::<String>(name_of!(info1 in DbInfos), None)
|
||||
.unwrap()
|
||||
.add_where_eq(name_of!(info3 in DbInfos), Some(&"cc".to_string()))
|
||||
.unwrap()
|
||||
.add_order_by(name_of!(info2 in DbInfos), OrderDirection::Ascending);
|
||||
let query_string = query_builder.clone().build_query_string();
|
||||
let expected_query_string = String::from(
|
||||
"SELECT Id, info, info1, info3, info4i, yes \
|
||||
FROM `testrustproject-372221.test1.Infos` \
|
||||
WHERE info1 is NULL AND info3 = @__PARAM_0 \
|
||||
ORDER BY info ASC LIMIT 1000",
|
||||
);
|
||||
let query_string = query_builder
|
||||
.clone()
|
||||
.build_query()
|
||||
.unwrap()
|
||||
.get_query_string()
|
||||
.to_string();
|
||||
let expected_query_string =
|
||||
"SELECT info1, info, info3, yes, info4i, Id FROM `testrustproject-372221.test1.Infos` WHERE info1 is NULL AND info3 = @__PARAM_0 ORDER BY info ASC".to_string()
|
||||
;
|
||||
log::debug!("query : {}", query_string);
|
||||
log::debug!("expected: {}", expected_query_string);
|
||||
log::debug!("request: {:?}", query_builder.clone().build_query_request());
|
||||
log::debug!("request: {:?}", query_builder.clone().build_query());
|
||||
|
||||
assert_eq!(query_string, expected_query_string);
|
||||
assert_eq!(
|
||||
query_builder
|
||||
.clone()
|
||||
.build_query_request()
|
||||
.query_parameters
|
||||
.unwrap()
|
||||
.len(),
|
||||
1
|
||||
);
|
||||
let res = query_builder.clone().run().await.unwrap();
|
||||
// assert_eq!(
|
||||
// query_builder
|
||||
// .clone()
|
||||
// .build_query_request()
|
||||
// .query_parameters
|
||||
// .unwrap()
|
||||
// .len(),
|
||||
// 1
|
||||
// );
|
||||
let res = query_builder
|
||||
.clone()
|
||||
.build_query()
|
||||
.unwrap()
|
||||
.run()
|
||||
.await
|
||||
.unwrap();
|
||||
log::debug!("res: {:?}", res);
|
||||
}
|
||||
|
||||
@@ -143,11 +164,18 @@ async fn get_test_client() -> BigqueryClient {
|
||||
async fn simple_query() {
|
||||
init_logger();
|
||||
let client = get_test_client().await;
|
||||
let q: Vec<DbInfos> = DbInfos::query(client)
|
||||
let q = DbInfos::select()
|
||||
.with_client(client)
|
||||
.add_order_by(name_of!(row_id in DbInfos), OrderDirection::Descending)
|
||||
.build_query()
|
||||
.unwrap()
|
||||
.run()
|
||||
.await
|
||||
.unwrap();
|
||||
let q = match q {
|
||||
QueryResultType::WithRowData(q) => q,
|
||||
QueryResultType::WithoutRowData(e) => panic!("no data: {:?}", e),
|
||||
};
|
||||
let mut last_num = 999999999999999999;
|
||||
for line in q {
|
||||
info!("line: {:?}", line);
|
||||
@@ -161,13 +189,21 @@ async fn simple_query() {
|
||||
async fn test_select_limit_1() {
|
||||
init_logger();
|
||||
let client = get_test_client().await;
|
||||
let q: Vec<DbInfos> = DbInfos::query(client).set_limit(1).run().await.unwrap();
|
||||
let q: Vec<DbInfos> = DbInfos::select()
|
||||
.with_client(client)
|
||||
.set_limit(1)
|
||||
.build_query()
|
||||
.unwrap()
|
||||
.run()
|
||||
.await
|
||||
.unwrap()
|
||||
.expect_with_data("no data");
|
||||
assert_eq!(q.len(), 1);
|
||||
}
|
||||
|
||||
fn init_logger() {
|
||||
let global_level = LevelFilter::Info;
|
||||
let own_level = LevelFilter::Debug;
|
||||
let own_level = LevelFilter::Trace;
|
||||
let _ = env_logger::builder()
|
||||
.is_test(true)
|
||||
.filter_level(global_level)
|
||||
|
||||
Reference in New Issue
Block a user