first basic implementation of functionality

(mounting, & getting changes)
This commit is contained in:
OMGeeky
2024-04-14 02:09:18 +02:00
parent 76ef182989
commit 3c37a55991
11 changed files with 1287 additions and 77 deletions

1
.gitignore vendored
View File

@@ -1,2 +1,3 @@
/target
.idea
/auth/

811
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,20 +1,20 @@
[workspace]
members = [
"gdriver-common",
"gdriver-backend",
"gdriver-client",
"gdriver-common",
"gdriver-backend",
"gdriver-client",
]
resolver="2"
resolver = "2"
[workspace.dependencies]
tracing="0.1"
tokio={ version = "1.35", features = ["rt-multi-thread", "tracing", "fs", "macros"] }
serde={ version = "1.0", features = ["serde_derive"] }
tracing = "0.1"
tokio = { version = "1.35", features = ["rt-multi-thread", "tracing", "fs", "macros", "signal"] }
serde = { version = "1.0", features = ["serde_derive"] }
tarpc = { version = "0.34", features = ["full"] }
futures="0.3"
lazy_static="1.4"
chrono="0.4"
futures = "0.3"
lazy_static = "1.4"
chrono = "0.4"
[patch.crates-io]
#tarpc = {path = "../../Documents/git/OMGeeky/tarpc/tarpc/"}
tarpc = {git = "https://github.com/google/tarpc.git"}
tarpc = { git = "https://github.com/google/tarpc.git" }

View File

@@ -1,18 +1,26 @@
use std::collections::HashMap;
use crate::drive::google_drive::GoogleDrive;
use crate::path_resolver::PathResolver;
use chrono::{DateTime, Utc};
use crate::prelude::*;
mod google_drive;
pub struct Drive {
tracked_files: HashMap<DriveId, DateTime<Utc>>,
pub path_resolver: PathResolver,
google_drive: GoogleDrive,
}
impl Drive {
pub fn new() -> Self {
Self {
#[instrument()]
pub async fn new() -> Result<Self> {
Ok(Self {
tracked_files: HashMap::new(),
}
path_resolver: PathResolver::new(),
google_drive: GoogleDrive::new().await?,
})
}
#[instrument(skip(self))]
pub fn get_file_tracking_state(&self, id: &DriveId) -> TrackingState {
let file = self.tracked_files.get(id);
match file {
@@ -20,6 +28,24 @@ impl Drive {
None => TrackingState::Untracked,
}
}
#[instrument(skip(self))]
pub async fn update(&mut self) -> Result<()> {
let changes = self.google_drive.get_changes().await?;
if changes.is_empty() {
info!("No changes");
return Ok(());
}
for change in changes {
dbg!(change);
}
Err("Not implemented".into())
// Ok(()) //TODO: implement this
}
#[instrument(skip(self))]
pub async fn ping(&self) -> Result<()> {
self.google_drive.ping().await
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TrackingState {

View File

@@ -0,0 +1,164 @@
use crate::prelude::*;
use const_format::formatcp;
use gdriver_common::ipc::gdriver_service::SETTINGS;
use gdriver_common::prelude::*;
use google_drive3::api::{Change, File, Scope, StartPageToken};
use google_drive3::hyper::client::HttpConnector;
use google_drive3::hyper::{Body, Client, Response};
use google_drive3::hyper_rustls::HttpsConnector;
use google_drive3::DriveHub;
use google_drive3::{hyper_rustls, oauth2};
use std::any::type_name;
use std::fmt::{Debug, Display, Formatter};
use tokio::fs;
const FIELDS_FILE: &'static str = "id, name, size, mimeType, kind, md5Checksum, parents, trashed, createdTime, modifiedTime, viewedByMeTime";
const FIELDS_CHANGE: &str = formatcp!("changes(removed, fileId, changeType, file({FIELDS_FILE}))");
#[derive(Clone)]
pub struct GoogleDrive {
hub: DriveHub<HttpsConnector<HttpConnector>>,
changes_start_page_token: Option<String>,
}
impl GoogleDrive {
#[instrument]
pub(crate) async fn new() -> Result<Self> {
trace!("Initializing GoogleDrive client.");
let auth = oauth2::read_application_secret("auth/client_secret.json").await?;
let auth = oauth2::InstalledFlowAuthenticator::builder(
auth,
oauth2::InstalledFlowReturnMethod::HTTPRedirect,
)
.persist_tokens_to_disk("auth/tokens.json")
.build()
.await?;
let http_client = Client::builder().build(
hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()
.https_or_http()
.enable_http1()
// .enable_http2()
.build(),
);
let hub = DriveHub::new(http_client, auth);
let drive = GoogleDrive {
hub,
changes_start_page_token: None,
};
trace!("Successfully initialized {}", drive);
Ok(drive)
}
#[instrument]
pub(crate) async fn ping(&self) -> Result<()> {
let (response, body) = self
.hub
.about()
.get()
.param("fields", "user(emailAddress)")
.add_scope(Scope::Readonly)
.doit()
.await?;
let status_code = response.status();
let email = body
.user
.unwrap_or_default()
.email_address
.unwrap_or_default();
trace!("response status: {}, email: '{}'", status_code, email);
if status_code.is_success() {
return Ok(());
}
error!(
"Did not get expected result on ping: {} {:?}",
status_code, response
);
Err("Did not get expected result on ping".into())
}
//region changes
#[instrument]
pub async fn get_changes(&mut self) -> Result<Vec<Change>> {
let mut page_token = Some(self.change_start_token().await?);
let mut changes = Vec::new();
while let Some(current_page_token) = page_token {
info!("Getting changes with page token: {}", current_page_token);
let (response, body) = self
.hub
.changes()
.list(current_page_token.as_str())
.param("fields", FIELDS_CHANGE)
.page_size(2) //TODO: Change this to a more reasonable value
.include_corpus_removals(true) //TODO3: Check if this is useful
.supports_all_drives(false)
.restrict_to_my_drive(true)
.include_removed(true)
.include_items_from_all_drives(false)
.doit()
.await?;
self.changes_start_page_token = body.new_start_page_token;
if response.status().is_success() {
changes.extend(body.changes.unwrap_or_default());
page_token = body.next_page_token;
} else {
error!("Could not get changes: {:?}", response);
return Err("Could not get changes".into());
}
}
trace!("Got {} changes", changes.len());
Ok(changes)
}
async fn change_start_token(&mut self) -> Result<String> {
Ok(match &self.changes_start_page_token {
None => {
//
let token = fs::read_to_string(SETTINGS.get_changes_file_path())
.await
.unwrap_or_default();
self.changes_start_page_token = if !token.is_empty() {
Some(token)
} else {
Some(self.get_changes_start_token_from_api().await?)
};
self.changes_start_page_token
.clone()
.expect("We just set it")
}
Some(start_token) => start_token.clone(),
})
}
async fn get_changes_start_token_from_api(&self) -> Result<String> {
let (response, body) = self
.hub
.changes()
.get_start_page_token()
.supports_all_drives(false)
.doit()
.await?;
if response.status().is_success() {
let start_page_token = body.start_page_token.unwrap_or_default();
Ok(start_page_token)
} else {
Err("Could not get start page token".into())
}
}
//endregion
}
//region debug & display traits
impl Debug for GoogleDrive {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct(type_name::<GoogleDrive>())
.field("changes_start_page_token", &self.changes_start_page_token)
.finish()
}
}
impl Display for GoogleDrive {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", type_name::<GoogleDrive>())
}
}
//endregion

View File

@@ -8,6 +8,7 @@ use tarpc::{
};
mod drive;
mod path_resolver;
mod prelude;
mod sample;
mod service;

View File

@@ -0,0 +1,61 @@
use crate::drive::Drive;
use crate::prelude::*;
use gdriver_common::ipc::gdriver_service::ReadDirResult;
use gdriver_common::prelude::*;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::Path;
#[derive(Eq, PartialEq, Debug, Clone)]
pub struct PathResolver {
parents: HashMap<DriveId, Vec<DriveId>>,
children: HashMap<DriveId, Vec<ReadDirResult>>,
}
impl PathResolver {
pub fn new() -> Self {
Self {
parents: HashMap::new(),
children: HashMap::new(),
}
}
pub async fn get_id_from_path(&mut self, path: &Path, drive: &Drive) -> Result<DriveId> {
let segments: Vec<_> = path
.to_str()
.ok_or(PathResolveError::InvalidPath)?
.split('/')
.collect();
let mut current = ROOT_ID.clone();
self.update_from_drive(drive).await?;
for segment in segments {
current = self
.get_id_from_parent_and_name(segment, &current)
.ok_or("path-segment not found")?;
}
return Ok(current);
}
pub fn get_id_from_parent_and_name(&self, name: &str, parent: &DriveId) -> Option<DriveId> {
if let Some(children) = self.children.get(parent) {
if let Some(x) = children.into_iter().find(|x| x.name.eq(name)) {
return Some(x.id.clone());
}
}
None
}
async fn update_from_drive(&mut self, drive: &Drive) -> Result<()> {
todo!()
}
pub(crate) fn add_relationship(&mut self, parent: DriveId, entry: ReadDirResult) {
todo!()
}
pub(crate) fn remove_relationship(&mut self, parent: DriveId, entry: ReadDirResult) {
todo!()
}
}
#[derive(Debug, Serialize, Deserialize, thiserror::Error)]
pub enum PathResolveError {
#[error("The path provided was invalid")]
InvalidPath,
}

View File

@@ -20,8 +20,21 @@ impl GDriverService for GdriverServer {
// todo!()
// }
async fn get_file_by_name(self, context: Context, name: OsString, parent: DriveId) -> StdResult<DriveId, GetFileByPathError> {
todo!()
async fn get_file_by_name(
self,
context: Context,
name: OsString,
parent: DriveId,
) -> StdResult<DriveId, GetFileByPathError> {
let mut drive_lock = self.drive.lock().await;
let x = drive_lock.path_resolver.get_id_from_parent_and_name(
name.to_str().ok_or(GetFileByPathError::InvalidName)?,
&parent,
);
match x {
None => Err(GetFileByPathError::NotFound),
Some(id) => Ok(id),
}
}
async fn get_file_by_path(
@@ -29,7 +42,7 @@ impl GDriverService for GdriverServer {
context: Context,
path: PathBuf,
) -> StdResult<DriveId, GetFileByPathError> {
todo!()
Err(GetFileByPathError::Other)
}
async fn write_local_change(
@@ -37,7 +50,7 @@ impl GDriverService for GdriverServer {
context: Context,
id: DriveId,
) -> StdResult<(), WriteLocalChangeError> {
todo!()
Err(WriteLocalChangeError::Other)
}
async fn get_metadata_for_file(
@@ -45,7 +58,7 @@ impl GDriverService for GdriverServer {
context: Context,
id: DriveId,
) -> StdResult<(), GetMetadataError> {
todo!()
Err(GetMetadataError::Other)
}
async fn download_content_for_file(
@@ -53,7 +66,7 @@ impl GDriverService for GdriverServer {
context: Context,
id: DriveId,
) -> StdResult<(), GetContentError> {
todo!()
Err(GetContentError::Other)
}
async fn list_files_in_directory(
@@ -78,7 +91,7 @@ impl GDriverService for GdriverServer {
context: Context,
id: DriveId,
) -> StdResult<(), MarkFileAsDeletedError> {
todo!()
Err(MarkFileAsDeletedError::Other)
}
async fn mark_file_for_keeping_local(
@@ -86,7 +99,7 @@ impl GDriverService for GdriverServer {
context: Context,
id: DriveId,
) -> StdResult<(), MarkFileForKeepingLocalError> {
todo!()
Err(MarkFileForKeepingLocalError::Other)
}
async fn unmark_file_for_keeping_local(
@@ -94,7 +107,7 @@ impl GDriverService for GdriverServer {
context: Context,
id: DriveId,
) -> StdResult<(), UnmarkFileForKeepingLocalError> {
todo!()
Err(UnmarkFileForKeepingLocalError::Other)
}
#[doc = " Returns true if the file was had remote changes and was updadet"]
@@ -103,7 +116,7 @@ impl GDriverService for GdriverServer {
context: Context,
id: DriveId,
) -> StdResult<bool, UpdateChangesError> {
todo!()
Err(UpdateChangesError::Other)
}
async fn update_changes(self, context: Context) -> StdResult<(), UpdateChangesError> {
@@ -174,7 +187,16 @@ pub async fn start() -> Result<()> {
let config = &CONFIGURATION;
info!("Config: {:?}", **config);
let drive = Drive::new();
let drive = Drive::new().await?;
match drive.ping().await {
Ok(_) => {
info!("Can reach google drive api.");
}
Err(e) => {
error!("Cannot reach google drive api.");
return Err(e);
}
}
let m = Arc::new(Mutex::new(drive));
let server_addr = (config.ip, config.port);

View File

@@ -1,24 +1,22 @@
use crate::filesystem::attributes::{read_inode_attributes_from_meta_file, ConvertFileType};
use crate::filesystem::errors::FilesystemError;
use crate::prelude::macros::*;
use crate::prelude::*;
use anyhow::anyhow;
use bimap::BiMap;
use fuser::{KernelConfig, ReplyAttr, ReplyDirectory, ReplyEntry, Request};
use gdriver_common::drive_structure::drive_id::DriveId;
use gdriver_common::drive_structure::drive_id::ROOT_ID;
use gdriver_common::ipc::gdriver_service::errors::GDriverServiceError;
use gdriver_common::ipc::gdriver_service::GDriverServiceClient;
use gdriver_common::ipc::gdriver_service::SETTINGS;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::ffi::{OsStr, OsString};
use std::os::raw::c_int;
use std::time::Duration;
use anyhow::anyhow;
use bimap::BiMap;
use fuser::{KernelConfig, ReplyEntry, Request};
use tracing::*;
use gdriver_common::drive_structure::drive_id::{DriveId, ROOT_ID};
use gdriver_common::drive_structure::meta::read_metadata_file;
use gdriver_common::ipc::gdriver_service::{
errors::GDriverServiceError, GDriverServiceClient, GDriverSettings,
};
use crate::filesystem::attributes::read_inode_attributes_from_meta_file;
use crate::filesystem::errors::FilesystemError;
use crate::prelude::macros::*;
use crate::prelude::*;
use tarpc::context::current as current_context;
use tokio::sync::mpsc::Receiver;
mod macros;
@@ -32,26 +30,33 @@ struct FileIdentifier {
parent: Inode,
name: OsString,
}
#[derive(Debug, Copy, Clone, Serialize, Deserialize, Hash, Ord, PartialOrd, Eq, PartialEq)]
pub enum ShutdownRequest {
Gracefully,
Force,
}
pub struct Filesystem {
gdriver_client: GDriverServiceClient,
entry_ids: BiMap<Inode, DriveId>,
ino_to_file_handles: HashMap<Inode, Vec<u64>>,
next_ino: u64,
gdriver_settings: GDriverSettings,
entry_name_parent_to_ino: BiMap<FileIdentifier, Inode>,
shutdown_signal_receiver: Receiver<ShutdownRequest>,
}
impl Filesystem {
pub fn new(gdriver_client: GDriverServiceClient) -> Self {
pub fn new(
gdriver_client: GDriverServiceClient,
shutdown_signal_receiver: Receiver<ShutdownRequest>,
) -> Self {
Self {
gdriver_client,
entry_ids: BiMap::new(),
ino_to_file_handles: HashMap::new(),
next_ino: 222,
gdriver_settings: GDriverSettings::default(),
entry_name_parent_to_ino: BiMap::new(),
shutdown_signal_receiver,
}
}
fn generate_ino(&mut self) -> Inode {
@@ -93,23 +98,26 @@ mod attributes;
impl fuser::Filesystem for Filesystem {
//region init
#[instrument(skip(self, _req, _config))]
fn init(&mut self, _req: &Request<'_>, _config: &mut KernelConfig) -> StdResult<(), c_int> {
self.entry_ids.insert(1, ROOT_ID.clone());
self.gdriver_settings = send_request!(self.gdriver_client.get_settings(current_context()))
send_request!(self.gdriver_client.update_changes(current_context()))
.map_err(|e| {
error!("Got a connection error while fetching settings: {e}");
libc::ECONNREFUSED
error!("Got a connection error while updating changes for on init. ");
dbg!(e);
libc::ECANCELED
})?
.map_err(|e| {
error!("Got an error while fetching settings: {e}");
trace!("details: {e:?}");
libc::EBADMSG
error!("Error while updating changes on init");
dbg!(e);
libc::EIO
})?;
Ok(())
}
//endregion
//region lookup
#[instrument(skip(self, _req, reply))]
fn lookup(&mut self, _req: &Request<'_>, parent: u64, name: &OsStr, reply: ReplyEntry) {
let metadata = utils::lookup::lookup(self, parent, name.to_os_string());
match metadata {
@@ -133,6 +141,67 @@ impl fuser::Filesystem for Filesystem {
}
}
//endregion
#[instrument(skip(self, _req, reply))]
fn getattr(&mut self, _req: &Request<'_>, ino: u64, reply: ReplyAttr) {
let id = self.get_id_from_ino(ino);
info!("Reading dir: {id:?}/{ino}");
match id {
None => {}
Some(id) => {
let result = utils::get_attributes(self, id, ino);
match result {
Ok(attr) => {
reply.attr(&TTL, &attr.into());
}
Err(e) => {
error!("Got an error during readdir: {}", e);
dbg!(e);
reply.error(libc::EIO);
}
}
}
}
}
#[instrument(skip(self, _req, reply))]
fn readdir(
&mut self,
_req: &Request<'_>,
ino: u64,
_fh: u64,
offset: i64,
mut reply: ReplyDirectory,
) {
let id = self.get_id_from_ino(ino);
info!("Reading dir: {id:?}/{ino}");
match id {
None => {}
Some(id) => {
let result = utils::readdir::readdir(self, id.clone(), offset as u64);
match result {
Ok(entries) => {
let mut counter = 0;
for entry in entries {
let ino = self.get_ino_from_id(entry.id);
counter += 1;
let buffer_full =
reply.add(ino, counter, entry.kind.into_ft(), entry.name);
if buffer_full {
debug!("Buffer full after {counter}");
break;
}
}
debug!("sending ok");
reply.ok();
}
Err(e) => {
error!("Got an error during readdir: {}", e);
dbg!(e);
reply.error(libc::EIO);
}
}
}
}
}
}
mod errors {
use gdriver_common::ipc::gdriver_service::errors::GDriverServiceError;
@@ -155,6 +224,7 @@ mod errors {
}
mod utils {
use super::*;
use crate::filesystem::attributes::InodeAttributes;
pub mod lookup {
use super::*;
use crate::filesystem::attributes::InodeAttributes;
@@ -205,16 +275,39 @@ mod utils {
.clone();
}
}
let open_file_handles =
fs.ino_to_file_handles.get(&ino).map(Vec::len).unwrap_or(0) as u64;
send_request!(fs
.gdriver_client
.get_metadata_for_file(current_context(), id.clone()))?
get_attributes(fs, &id, ino)
}
}
pub(crate) fn get_attributes(
fs: &Filesystem,
id: &DriveId,
ino: Inode,
) -> StdResult<InodeAttributes, FilesystemError> {
let open_file_handles = fs.ino_to_file_handles.get(&ino).map(Vec::len).unwrap_or(0) as u64;
send_request!(fs
.gdriver_client
.get_metadata_for_file(current_context(), id.clone()))?
.map_err(GDriverServiceError::from)?;
let meta_path = SETTINGS.get_metadata_file_path(&id);
let metadata = read_inode_attributes_from_meta_file(&meta_path, ino, open_file_handles)
.map_err(FilesystemError::IO)?;
Ok(metadata)
}
pub mod readdir {
use super::*;
pub fn readdir(
fs: &mut Filesystem,
id: DriveId,
offset: u64,
) -> StdResult<Vec<gdriver_common::ipc::gdriver_service::ReadDirResult>, FilesystemError>
{
let res = send_request!(fs.gdriver_client.list_files_in_directory_with_offset(
current_context(),
id,
offset as u64
))?
.map_err(GDriverServiceError::from)?;
let meta_path = fs.gdriver_settings.get_metadata_file_path(&id);
let metadata = read_inode_attributes_from_meta_file(&meta_path, ino, open_file_handles)
.map_err(FilesystemError::IO)?;
Ok(metadata)
Ok(res)
}
}
}

View File

@@ -1,13 +1,27 @@
use fuser::{MountOption, Session, SessionUnmounter};
use std::{error::Error, net::IpAddr, result::Result as StdResult};
use tokio::sync::mpsc::{channel, Sender};
use crate::filesystem::{Filesystem, ShutdownRequest};
use gdriver_common::{ipc::sample::*, prelude::*};
use tarpc::{client, tokio_serde::formats::Json};
use tokio::task::JoinHandle;
type Result<T> = StdResult<T, Box<dyn Error>>;
#[tokio::main]
async fn main() -> Result<()> {
service::start().await?;
gdriver_common::tracing_setup::init_tracing();
// service::start().await?;
let mount_options = &[MountOption::RW];
let (tx, rx) = channel(1);
let f = Filesystem::new(
service::create_client(CONFIGURATION.ip, CONFIGURATION.port).await?,
rx,
);
mount(f, &"/var/tmp/gdriver2_mount", mount_options, tx)
.await?
.await?;
Ok(())
}
pub mod prelude;
@@ -15,3 +29,40 @@ mod sample;
mod filesystem;
mod service;
async fn mount(
fs: Filesystem,
mountpoint: &str,
options: &[MountOption],
sender: Sender<ShutdownRequest>,
) -> Result<JoinHandle<()>> {
let mut session = Session::new(fs, mountpoint.as_ref(), options)?;
let session_ender = session.unmount_callable();
let end_program_signal_handle = tokio::spawn(async move {
let _ = end_program_signal_awaiter(sender, session_ender).await;
});
debug!("Mounting fuse filesystem");
let _ = session.run();
debug!("Stopped with mounting");
// Ok(session_ender)
Ok(end_program_signal_handle)
}
async fn end_program_signal_awaiter(
sender: Sender<ShutdownRequest>,
mut session_unmounter: SessionUnmounter,
) -> Result<()> {
info!("Waiting for Ctrl-C");
println!("Waiting for Ctrl-C");
tokio::signal::ctrl_c()
.await
.expect("failed to listen for ctrl_c event");
println!(); //to not have ^C on the same line as the next log if it is directly in a console
info!("got signal to end program");
sender.send(ShutdownRequest::Gracefully).await?;
info!("sent stop command to file uploader");
info!("unmounting...");
session_unmounter.unmount()?;
info!("unmounted");
Ok(())
}

View File

@@ -7,14 +7,16 @@ edition = "2021"
[dependencies]
serde.workspace = true
tracing.workspace = true
tarpc.workspace = true
tokio.workspace = true
futures.workspace = true
lazy_static.workspace = true
confique={ version = "0.2" }
confique = { version = "0.2" }
thiserror = "1.0"
anyhow = "1.0.79"
anyhow = "1.0"
directories = "5.0"
serde_json = "1.0.111"
serde_json = "1.0"
tracing-subscriber = "0.3"
#[patch.crates-io]
#confique = {path="~/Documents/git/OMGeeky/confique "}