From 8f6c15acc6de581ee25ac24072b0a097eadc95e6 Mon Sep 17 00:00:00 2001 From: OMGeeky Date: Sun, 21 May 2023 12:13:38 +0200 Subject: [PATCH] rustfmt --- src/async_helper.rs | 6 +- src/common.rs | 6 +- src/config/common_file_filter.rs | 7 +- src/config/mod.rs | 2 +- src/fs/drive/change.rs | 24 ++- src/fs/drive/file_uploader.rs | 97 ++++++--- src/fs/drive/filesystem.rs | 330 +++++++++++++++++++++---------- src/fs/drive/mod.rs | 4 +- src/fs/drive/settings.rs | 9 +- src/google_drive/drive.rs | 96 +++++---- src/google_drive/drive_id.rs | 5 +- src/google_drive/mod.rs | 1 - src/lib.rs | 47 +++-- src/main.rs | 5 +- src/prelude.rs | 2 +- 15 files changed, 421 insertions(+), 220 deletions(-) diff --git a/src/async_helper.rs b/src/async_helper.rs index f56d798..93d0378 100644 --- a/src/async_helper.rs +++ b/src/async_helper.rs @@ -8,8 +8,10 @@ use tracing::trace; /// This function will block the current thread until the provided future has run to completion. /// /// # Be careful with deadlocks -pub fn run_async_blocking(f: impl std::future::Future + Sized) -> T - where T: Debug { +pub fn run_async_blocking(f: impl std::future::Future + Sized) -> T +where + T: Debug, +{ trace!("run_async"); let handle = Handle::current(); let _enter_guard = handle.enter(); diff --git a/src/common.rs b/src/common.rs index d0d88b1..66c72ea 100644 --- a/src/common.rs +++ b/src/common.rs @@ -36,9 +36,9 @@ impl From<&OsString> for LocalPath { } impl AsRef for LocalPath - where - T: ?Sized, - ::Target: AsRef, +where + T: ?Sized, + ::Target: AsRef, { fn as_ref(&self) -> &T { self.0.deref().as_ref() diff --git a/src/config/common_file_filter.rs b/src/config/common_file_filter.rs index 2df68de..806e3ae 100644 --- a/src/config/common_file_filter.rs +++ b/src/config/common_file_filter.rs @@ -12,11 +12,8 @@ pub struct CommonFileFilter { impl CommonFileFilter { pub fn from_path(path: impl Into) -> Result { let path = path.into(); - let ignores = GitignoreBuilder::new(&path) - .build()?; - let s = Self { - filter: ignores, - }; + let ignores = GitignoreBuilder::new(&path).build()?; + let s = Self { filter: ignores }; Ok(s) } pub fn is_filter_matched(&self, path: &Path) -> Result { diff --git a/src/config/mod.rs b/src/config/mod.rs index fc2ab8e..e579f6b 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -1 +1 @@ -pub mod common_file_filter; \ No newline at end of file +pub mod common_file_filter; diff --git a/src/fs/drive/change.rs b/src/fs/drive/change.rs index 8985e48..82501fa 100644 --- a/src/fs/drive/change.rs +++ b/src/fs/drive/change.rs @@ -14,14 +14,23 @@ pub enum ChangeType { } impl ChangeType { - fn from_drive_change(change_type: Option, file: Option, drive: Option, removed: bool) -> anyhow::Result { + fn from_drive_change( + change_type: Option, + file: Option, + drive: Option, + removed: bool, + ) -> anyhow::Result { if removed { return Ok(Self::Removed); } if let Some(change_type) = change_type { match change_type.as_str() { - "drive" => Ok(Self::Drive(drive.context("no drive but change type was drive")?)), - "file" => Ok(Self::File(file.context("no file but change type was file")?)), + "drive" => Ok(Self::Drive( + drive.context("no drive but change type was drive")?, + )), + "file" => Ok(Self::File( + file.context("no file but change type was file")?, + )), _ => Err(anyhow!("invalid change type: {}", change_type)), } } else { @@ -50,9 +59,14 @@ impl TryFrom for Change { } Ok(Self { drive_id: DriveId::from(drive_id?), - kind: ChangeType::from_drive_change(drive_change.change_type, drive_change.file, drive_change.drive, removed)?, + kind: ChangeType::from_drive_change( + drive_change.change_type, + drive_change.file, + drive_change.drive, + removed, + )?, time: drive_change.time.context("time is missing")?, removed, }) } -} \ No newline at end of file +} diff --git a/src/fs/drive/file_uploader.rs b/src/fs/drive/file_uploader.rs index 2430c68..b4b9fd2 100644 --- a/src/fs/drive/file_uploader.rs +++ b/src/fs/drive/file_uploader.rs @@ -40,7 +40,6 @@ pub enum FileUploaderCommand { CreateFolder(FileCommand), CreateFile(FileCommand), Stop, - } #[derive(Debug)] @@ -60,10 +59,12 @@ pub struct DriveFileUploader { impl<'a> DriveFileUploader { #[instrument] - pub fn new(drive: GoogleDrive, - upload_filter: CommonFileFilter, - receiver: Receiver, - wait_time_before_upload: Duration) -> Self { + pub fn new( + drive: GoogleDrive, + upload_filter: CommonFileFilter, + receiver: Receiver, + wait_time_before_upload: Duration, + ) -> Self { Self { drive, upload_filter, @@ -89,23 +90,39 @@ impl<'a> DriveFileUploader { let file_metadata = file_command.file_metadata; if !self.upload_filter.is_filter_matched(&path).unwrap_or(false) { let drive = self.drive.clone(); - let drive_id = file_metadata.drive_id.clone().with_context(|| "no drive_id"); + let drive_id = file_metadata + .drive_id + .clone() + .with_context(|| "no drive_id"); if let Err(e) = drive_id { error!("failed to upload file: {:?} with error: {}", path, e); continue; } let drive_id = drive_id.unwrap(); - self.cancel_and_wait_for_running_upload_for_id(&drive_id).await; + self.cancel_and_wait_for_running_upload_for_id(&drive_id) + .await; info!("queuing upload of file: {:?}", path); let wait_time_before_upload = self.wait_time_before_upload.clone(); let (rx, rc) = channel(1); - let upload_handle = tokio::spawn(async move { Self::upload_file(drive, file_metadata, path, wait_time_before_upload, rc).await }); - self.running_uploads.insert(drive_id, RunningUpload { - join_handle: upload_handle, - stop_sender: rx, + let upload_handle = tokio::spawn(async move { + Self::upload_file( + drive, + file_metadata, + path, + wait_time_before_upload, + rc, + ) + .await }); + self.running_uploads.insert( + drive_id, + RunningUpload { + join_handle: upload_handle, + stop_sender: rx, + }, + ); } else { info!("skipping upload of file since it is ignored: {:?}", path); } @@ -114,11 +131,15 @@ impl<'a> DriveFileUploader { info!("received stop command: stopping file upload listener"); break; } - _ => { warn!("received unknown command: {:?}", command); } + _ => { + warn!("received unknown command: {:?}", command); + } }; } else { - warn!("received None command, meaning all senders have been dropped. \ - stopping file upload listener since no more commands will be received"); + warn!( + "received None command, meaning all senders have been dropped. \ + stopping file upload listener since no more commands will be received" + ); break; } } @@ -132,29 +153,44 @@ impl<'a> DriveFileUploader { debug!("checking for running uploads for file: {:?}", drive_id); let running_uploads: Option<&mut RunningUpload> = self.running_uploads.get_mut(drive_id); if let Some(running_upload) = running_uploads { - debug!("trying to send stop command to running upload for file: {:?}", drive_id); + debug!( + "trying to send stop command to running upload for file: {:?}", + drive_id + ); let send_stop = running_upload.stop_sender.send(()).await; if let Err(e) = send_stop { - error!("failed to send stop command to running upload for file: {:?} with error: {}", drive_id, e); + error!( + "failed to send stop command to running upload for file: {:?} with error: {}", + drive_id, e + ); } debug!("waiting for running upload for file: {:?}", drive_id); let x: &mut JoinHandle> = &mut running_upload.join_handle; let _join_res = tokio::join!(x); - debug!("finished waiting for running upload for file: {:?} ", drive_id); + debug!( + "finished waiting for running upload for file: {:?} ", + drive_id + ); debug!("removing running upload for file: {:?}", drive_id); self.running_uploads.remove(drive_id); } } #[instrument(skip(file_metadata, rc), fields(drive = % drive))] - async fn upload_file(drive: GoogleDrive, - file_metadata: File, - local_path: PathBuf, - wait_time_before_upload: Duration, - rc: Receiver<()>) -> anyhow::Result<()> { + async fn upload_file( + drive: GoogleDrive, + file_metadata: File, + local_path: PathBuf, + wait_time_before_upload: Duration, + rc: Receiver<()>, + ) -> anyhow::Result<()> { // debug!("uploading file: {:?}", local_path); - debug!("sleeping for {:?} before uploading {}", wait_time_before_upload, local_path.display()); + debug!( + "sleeping for {:?} before uploading {}", + wait_time_before_upload, + local_path.display() + ); tokio::select! { _ = Self::wait_for_cancel_signal(rc) => { debug!("received stop signal: stopping upload"); @@ -182,17 +218,24 @@ impl<'a> DriveFileUploader { Some(_v) => { debug!("received stop signal: stopping upload"); } - _ => { warn!("received None from cancel signal receiver") } + _ => { + warn!("received None from cancel signal receiver") + } } } - async fn upload_file_(drive: &GoogleDrive, file_metadata: File, local_path: &PathBuf) -> anyhow::Result<()> { + async fn upload_file_( + drive: &GoogleDrive, + file_metadata: File, + local_path: &PathBuf, + ) -> anyhow::Result<()> { debug!("uploading file: {:?}", local_path); let path = local_path.as_path(); - drive.upload_file_content_from_path(file_metadata, path).await?; + drive + .upload_file_content_from_path(file_metadata, path) + .await?; // let result = drive.list_files(DriveId::from("root")).await.with_context(|| format!("could not do it"))?; debug!("upload_file_: done"); Ok(()) } } - diff --git a/src/fs/drive/filesystem.rs b/src/fs/drive/filesystem.rs index b8d00b0..1eda846 100644 --- a/src/fs/drive/filesystem.rs +++ b/src/fs/drive/filesystem.rs @@ -1,3 +1,5 @@ +use std::fmt::{Debug, Formatter}; +use std::io::{stdout, Seek, SeekFrom, Write}; use std::{ collections::HashMap, ffi::{OsStr, OsString}, @@ -7,33 +9,27 @@ use std::{ path::{Path, PathBuf}, time::{SystemTime, UNIX_EPOCH}, }; -use std::fmt::{Debug, Formatter}; -use std::io::{Seek, SeekFrom, stdout, Write}; use anyhow::{anyhow, Context}; use bimap::BiMap; use drive3::api::{File, StartPageToken}; use fuser::{ - FileAttr, - Filesystem, - FileType, - FUSE_ROOT_ID, - KernelConfig, - ReplyAttr, - ReplyData, - ReplyDirectory, - ReplyEmpty, - ReplyEntry, - ReplyWrite, - Request, - TimeOrNow, + FileAttr, FileType, Filesystem, KernelConfig, ReplyAttr, ReplyData, ReplyDirectory, ReplyEmpty, + ReplyEntry, ReplyOpen, ReplyWrite, Request, TimeOrNow, FUSE_ROOT_ID, }; use libc::c_int; -use tracing::{debug, error, instrument, warn}; use tracing::field::debug; +use tracing::{debug, error, instrument, warn}; -use crate::{async_helper::run_async_blocking, common::LocalPath, fs::drive::DriveEntry, fs::inode::Inode, google_drive::{DriveId, GoogleDrive}, prelude::*}; use crate::fs::drive::{Change, ChangeType, FileCommand, FileUploaderCommand, SyncSettings}; +use crate::{ + async_helper::run_async_blocking, + common::LocalPath, + fs::drive::DriveEntry, + fs::inode::Inode, + google_drive::{DriveId, GoogleDrive}, + prelude::*, +}; #[derive(Debug)] enum ChecksumMatch { @@ -100,13 +96,15 @@ pub struct DriveFilesystem { impl Display for DriveFilesystem { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "DriveFilesystem {{ entries: {} }}", - // write!(f, "DriveFilesystem {{ entries: {}, gen: {}, settings: {} }}", - // self.root.display(), - // self.cache_dir.as_ref().map(|dir| dir.path().display()), - self.entries.len(), - // self.generation, - // self.settings, + write!( + f, + "DriveFilesystem {{ entries: {} }}", + // write!(f, "DriveFilesystem {{ entries: {}, gen: {}, settings: {} }}", + // self.root.display(), + // self.cache_dir.as_ref().map(|dir| dir.path().display()), + self.entries.len(), + // self.generation, + // self.settings, ) } } @@ -118,7 +116,11 @@ impl DriveFilesystem { let path = self.get_cache_path_for_entry(entry)?; let metadata = Self::create_drive_metadata_from_entry(entry)?; debug!("schedule_upload: sending path to file uploader..."); - self.file_uploader_sender.send(FileUploaderCommand::UploadChange(FileCommand::new(path, metadata))).await?; + self.file_uploader_sender + .send(FileUploaderCommand::UploadChange(FileCommand::new( + path, metadata, + ))) + .await?; debug!("schedule_upload: sent path to file uploader"); Ok(()) } @@ -141,22 +143,32 @@ impl DriveFilesystem { } fn get_drive_id_from_ino(&self, parent: impl Into) -> anyhow::Result<&DriveId> { - self.ino_drive_id.get_by_left(&parent.into()).context("could not get drive id for ino") + self.ino_drive_id + .get_by_left(&parent.into()) + .context("could not get drive id for ino") } fn get_ino_from_drive_id(&self, parent: impl Into) -> anyhow::Result<&Inode> { - self.ino_drive_id.get_by_right(&parent.into()).context("could not get drive id for ino") + self.ino_drive_id + .get_by_right(&parent.into()) + .context("could not get drive id for ino") } } // region general impl DriveFilesystem { #[instrument(skip(file_uploader_sender))] - pub async fn new(config_path: impl AsRef + Debug, - file_uploader_sender: tokio::sync::mpsc::Sender, - drive: GoogleDrive, cache_dir: PathBuf, - settings: SyncSettings) -> Result { + pub async fn new( + config_path: impl AsRef + Debug, + file_uploader_sender: tokio::sync::mpsc::Sender, + drive: GoogleDrive, + cache_dir: PathBuf, + settings: SyncSettings, + ) -> Result { let config_path = config_path.as_ref(); - debug!("DriveFilesystem::new(config_path: {})", config_path.display()); + debug!( + "DriveFilesystem::new(config_path: {})", + config_path.display() + ); // let upload_filter = CommonFileFilter::from_path(config_path)?; let mut entries = HashMap::new(); Self::add_root_entry(&mut entries); @@ -203,13 +215,7 @@ impl DriveFilesystem { let inode = Inode::from(FUSE_ROOT_ID); entries.insert( DriveId::root(), - DriveEntry::new( - inode, - "root".to_string(), - DriveId::root(), - root_attr, - None, - ), + DriveEntry::new(inode, "root".to_string(), DriveId::root(), root_attr, None), ); } #[instrument(fields(% self, inode))] @@ -222,7 +228,8 @@ impl DriveFilesystem { cache_dir.display() ); let entry = self - .entries.get(&inode) + .entries + .get(&inode) .ok_or(anyhow!("could not get entry"))?; debug!( "get_cache_dir_for_file: entry local_path: {:?}", @@ -239,9 +246,9 @@ impl DriveFilesystem { path.join(match entry.local_path.as_ref() { Some(x) => match x.parent() { Some(parent) => parent.to_path_buf(), - None => PathBuf::new() + None => PathBuf::new(), }, - None => PathBuf::new() + None => PathBuf::new(), }) } #[instrument(fields(% self))] @@ -251,7 +258,8 @@ impl DriveFilesystem { let mut entries = HashMap::new(); self.children = HashMap::new(); self.ino_drive_id = BiMap::new(); - self.ino_drive_id.insert(Inode::from(FUSE_ROOT_ID), DriveId::root()); + self.ino_drive_id + .insert(Inode::from(FUSE_ROOT_ID), DriveId::root()); let alternative_rood_id = self .source .get_metadata_for_file(DriveId::root()) @@ -266,12 +274,19 @@ impl DriveFilesystem { let entry = self.create_entry_from_drive_metadata(&metadata, inode); if let Ok(entry) = entry { let inode = entry.ino.clone(); - debug!("add_all_file_entries: adding entry: ({}) {:?}", inode, entry); + debug!( + "add_all_file_entries: adding entry: ({}) {:?}", + inode, entry + ); let drive_id = entry.drive_id.clone(); entries.insert(drive_id.clone(), entry); self.ino_drive_id.insert(inode, drive_id.clone()); if let Some(parents) = metadata.parents { - debug!("drive_id: {:<40} has parents: {}", drive_id.to_string(), parents.len()); + debug!( + "drive_id: {:<40} has parents: {}", + drive_id.to_string(), + parents.len() + ); let parents = parents.iter().map(|p| DriveId::from(p)); for parent in parents { if parent.to_string() == alternative_rood_id.to_string() { @@ -282,31 +297,54 @@ impl DriveFilesystem { self.add_child(drive_id.clone(), &parent); } } else { - debug!("drive_id: {:<40} does not have parents", drive_id.to_string()); + debug!( + "drive_id: {:<40} does not have parents", + drive_id.to_string() + ); //does not belong to any folder, add to root self.add_child(drive_id, &DriveId::root()); } } else { - warn!("add_all_file_entries: could not create entry! err: {:?} metadata:{:?}", entry, metadata); + warn!( + "add_all_file_entries: could not create entry! err: {:?} metadata:{:?}", + entry, metadata + ); } } - debug!("add_all_file_entries: entries: new len: {} old len: {}", entries.len(), old_len); + debug!( + "add_all_file_entries: entries: new len: {} old len: {}", + entries.len(), + old_len + ); self.entries = entries; debug!("build all local paths"); - self.get_entry_mut(&DriveId::root()).expect("The root entry has to exist by now").build_local_path(None); + self.get_entry_mut(&DriveId::root()) + .expect("The root entry has to exist by now") + .build_local_path(None); self.build_path_for_children(&DriveId::root()); Ok(()) } #[instrument(skip(self))] fn build_path_for_children(&mut self, parent_id: &DriveId) { - let parent = self.entries.get(parent_id).expect("parent entry has to exist"); - debug!("build_path_for_children: parent: {:<40} => {:?}", parent.drive_id.to_string(), parent.name); + let parent = self + .entries + .get(parent_id) + .expect("parent entry has to exist"); + debug!( + "build_path_for_children: parent: {:<40} => {:?}", + parent.drive_id.to_string(), + parent.name + ); if let Some(child_list) = self.children.get(&parent_id) { - debug!("build_path_for_children: ({}) child_list: {:?}", child_list.len(), child_list); + debug!( + "build_path_for_children: ({}) child_list: {:?}", + child_list.len(), + child_list + ); for child_id in child_list.clone() { let parent: Option = match self.entries.get(parent_id) { Some(e) => e.local_path.clone(), - None => None + None => None, }; let child = self.entries.get_mut(&child_id); if let Some(child) = child { @@ -314,9 +352,12 @@ impl DriveFilesystem { } else { warn!("add_all_file_entries: could not find child entry!"); } - debug!("build_path_for_children: child: {:?} parent: {:?}", child_id, parent_id); + debug!( + "build_path_for_children: child: {:?} parent: {:?}", + child_id, parent_id + ); self.build_path_for_children(&child_id); - }; + } } } @@ -324,10 +365,16 @@ impl DriveFilesystem { fn add_child(&mut self, drive_id: DriveId, parent: &DriveId) { let existing_child_list = self.children.get_mut(&parent); if let Some(existing_child_list) = existing_child_list { - debug!("add_child: adding child: {:?} to parent: {:?}", drive_id, parent); + debug!( + "add_child: adding child: {:?} to parent: {:?}", + drive_id, parent + ); existing_child_list.push(drive_id); } else { - debug!("add_child: adding child: {:?} to parent: {:?} (new)", drive_id, parent); + debug!( + "add_child: adding child: {:?} to parent: {:?} (new)", + drive_id, parent + ); let set = vec![drive_id]; self.children.insert(parent.clone(), set); } @@ -371,10 +418,9 @@ impl DriveFilesystem { } let ino = inode; let id = DriveId::from(metadata.id.as_ref().context("could not get id")?); - let mime_type = metadata - .mime_type - .as_ref() - .context("could not determine if this is a file or a folder since the mime type was empty")?; + let mime_type = metadata.mime_type.as_ref().context( + "could not determine if this is a file or a folder since the mime type was empty", + )?; let kind = match mime_type.as_str() { "application/vnd.google-apps.document" | "application/vnd.google-apps.spreadsheet" @@ -396,10 +442,19 @@ impl DriveFilesystem { ino: ino.into(), size: Self::get_size_from_drive_metadata(metadata).unwrap_or(0), blocks: 0, - atime: metadata.viewed_by_me_time.map(SystemTime::from).unwrap_or(UNIX_EPOCH), - mtime: metadata.modified_time.map(SystemTime::from).unwrap_or(UNIX_EPOCH), + atime: metadata + .viewed_by_me_time + .map(SystemTime::from) + .unwrap_or(UNIX_EPOCH), + mtime: metadata + .modified_time + .map(SystemTime::from) + .unwrap_or(UNIX_EPOCH), ctime: SystemTime::now(), - crtime: metadata.created_time.map(SystemTime::from).unwrap_or(UNIX_EPOCH), + crtime: metadata + .created_time + .map(SystemTime::from) + .unwrap_or(UNIX_EPOCH), kind, perm: permissions, nlink: 1, @@ -434,8 +489,9 @@ impl DriveFilesystem { let drive_id = entry.drive_id.clone(); let drive = &self.source; let cache_path = self.get_cache_path_for_entry(&entry)?; - let folder = cache_path.parent() - .ok_or(anyhow!("could not get the folder the cache file should be saved in"))?; + let folder = cache_path.parent().ok_or(anyhow!( + "could not get the folder the cache file should be saved in" + ))?; if !folder.exists() { debug!("creating folder: {}", folder.display()); std::fs::create_dir_all(folder)?; @@ -469,7 +525,10 @@ impl DriveFilesystem { let entry = self.entries.get_mut(drive_id); if let Some(entry) = entry { - debug!("updating entry metadata: {}, {:?} entry: {:?}",entry.ino, entry.md5_checksum, entry); + debug!( + "updating entry metadata: {}, {:?} entry: {:?}", + entry.ino, entry.md5_checksum, entry + ); let change_successful = Self::update_entry_metadata(file, entry); if let Err(e) = change_successful { warn!("got an err while update entry metadata: {}", e); @@ -478,7 +537,6 @@ impl DriveFilesystem { } } - updated_entries.push(change.drive_id); debug!("processed change"); continue; @@ -530,7 +588,11 @@ impl DriveFilesystem { Ok(path) } fn construct_cache_path_for_entry(cache_dir: &Path, entry: &DriveEntry) -> PathBuf { - debug!("construct_cache_path_for_entry: {} with cache_dir: {}", entry.ino, cache_dir.display()); + debug!( + "construct_cache_path_for_entry: {} with cache_dir: {}", + entry.ino, + cache_dir.display() + ); let path = Self::construct_cache_folder_path(cache_dir, entry).join(&entry.name); debug!( "get_cache_path_for_entry: {}: {}", @@ -540,7 +602,11 @@ impl DriveFilesystem { path } - fn set_entry_metadata_with_ino(&mut self, ino: impl Into, drive_metadata: File) -> anyhow::Result<()> { + fn set_entry_metadata_with_ino( + &mut self, + ino: impl Into, + drive_metadata: File, + ) -> anyhow::Result<()> { let entry = self.get_entry_mut(ino).context("no entry with ino")?; Self::update_entry_metadata(drive_metadata, entry) @@ -567,26 +633,39 @@ impl DriveFilesystem { let checksum_mismatch = Self::compare_checksums(&drive_metadata.md5_checksum, &entry); match checksum_mismatch { ChecksumMatch::Missing | ChecksumMatch::Unknown | ChecksumMatch::RemoteMismatch => { - debug!("md5_checksum mismatch: {:?} != {:?}", drive_metadata.md5_checksum, entry.md5_checksum); + debug!( + "md5_checksum mismatch: {:?} != {:?}", + drive_metadata.md5_checksum, entry.md5_checksum + ); entry.set_md5_checksum(drive_metadata.md5_checksum); entry.has_upstream_content_changes = true; - debug!("updated md5_checksum of {} to: {:?}", entry.ino, &entry.md5_checksum); + debug!( + "updated md5_checksum of {} to: {:?}", + entry.ino, &entry.md5_checksum + ); } ChecksumMatch::Match => { - debug!("md5_checksum match: {:?} == {:?}", drive_metadata.md5_checksum, &entry.md5_checksum); + debug!( + "md5_checksum match: {:?} == {:?}", + drive_metadata.md5_checksum, &entry.md5_checksum + ); entry.has_upstream_content_changes = false; } ChecksumMatch::CacheMismatch => { - debug!("the local checksum and the remote checksum match,\ - so we can assume the local changes have just been uploaded to the remote"); + debug!( + "the local checksum and the remote checksum match,\ + so we can assume the local changes have just been uploaded to the remote" + ); entry.has_upstream_content_changes = false; } ChecksumMatch::LocalMismatch => { - debug!("the local checksum does not match the remote or the cached \ - checksum, this means the local file has been modified"); + debug!( + "the local checksum does not match the remote or the cached \ + checksum, this means the local file has been modified" + ); entry.has_upstream_content_changes = false; } @@ -624,8 +703,14 @@ impl DriveFilesystem { debug!("md5_checksum match: (r) == (l) == (c): {:?} ", md5_checksum); return ChecksumMatch::Match; } - if md5_checksum != &entry.local_md5_checksum && md5_checksum != &entry.md5_checksum && entry.local_md5_checksum != entry.md5_checksum { - debug!("md5_checksum match: {:?} (r) != {:?} (l) != {:?} (c)", md5_checksum, entry.local_md5_checksum, entry.md5_checksum); + if md5_checksum != &entry.local_md5_checksum + && md5_checksum != &entry.md5_checksum + && entry.local_md5_checksum != entry.md5_checksum + { + debug!( + "md5_checksum match: {:?} (r) != {:?} (l) != {:?} (c)", + md5_checksum, entry.local_md5_checksum, entry.md5_checksum + ); return ChecksumMatch::Conflict; } @@ -638,7 +723,10 @@ impl DriveFilesystem { return ChecksumMatch::CacheMismatch; } if &entry.local_md5_checksum == &entry.md5_checksum { - debug!("md5_checksum match: (l) == (c): {:?} ", entry.local_md5_checksum); + debug!( + "md5_checksum match: (l) == (c): {:?} ", + entry.local_md5_checksum + ); return ChecksumMatch::RemoteMismatch; } warn!("how could I get here?"); @@ -676,7 +764,10 @@ impl DriveFilesystem { .collect(); self.last_checked_changes = SystemTime::now(); - debug!("checked for changes, found {} changes", changes.as_ref().unwrap_or(&Vec::::new()).len()); + debug!( + "checked for changes, found {} changes", + changes.as_ref().unwrap_or(&Vec::::new()).len() + ); changes } fn remove_entry(&mut self, id: &DriveId) -> anyhow::Result<()> { @@ -761,7 +852,8 @@ impl Filesystem for DriveFilesystem { #[instrument(fields(% self))] fn destroy(&mut self) { debug!("destroy"); - let stop_res = run_async_blocking(self.file_uploader_sender.send(FileUploaderCommand::Stop)); + let stop_res = + run_async_blocking(self.file_uploader_sender.send(FileUploaderCommand::Stop)); if let Err(e) = stop_res { error!("could not send stop command to file uploader: {}", e); } @@ -780,14 +872,20 @@ impl Filesystem for DriveFilesystem { let parent = parent.into(); let parent_drive_id = self.get_drive_id_from_ino(&parent); if parent_drive_id.is_err() { - warn!("lookup: could not get drive_id for {}: {:?}", parent, parent_drive_id); + warn!( + "lookup: could not get drive_id for {}: {:?}", + parent, parent_drive_id + ); reply.error(libc::ENOENT); return; } let parent_drive_id = parent_drive_id.unwrap(); let children = self.children.get(&parent_drive_id); if children.is_none() { - warn!("lookup: could not find children for {}: {}", parent, parent_drive_id); + warn!( + "lookup: could not find children for {}: {}", + parent, parent_drive_id + ); for (id, entry) in self.entries.iter() { debug!("entry: {:<40} => {:?}", id.to_string(), entry); } @@ -808,7 +906,7 @@ impl Filesystem for DriveFilesystem { let accepted = name.eq_ignore_ascii_case(&path); debug!( "entry: {}:(accepted={}),{:?}; {:?}; {:?}", - child_inode, accepted, entry.md5_checksum ,path, entry.attr + child_inode, accepted, entry.md5_checksum, path, entry.attr ); if accepted { reply.entry(&self.settings.time_to_live(), &entry.attr, self.generation); @@ -994,28 +1092,31 @@ impl Filesystem for DriveFilesystem { let mut buf = vec![0; size as usize]; debug!("reading file: {:?} at {} with size {}", &path, offset, size); - file.read_at(&mut buf, offset as u64).unwrap(); + file.read_exact_at(&mut buf, offset as u64).unwrap(); debug!("read file: {:?} at {}", &path, offset); reply.data(&buf); } //endregion //region write #[instrument(skip(_req, reply), fields(% self, data = data.len()))] - fn write(&mut self, - _req: &Request<'_>, - ino: u64, - fh: u64, - offset: i64, - data: &[u8], - write_flags: u32, - flags: i32, - lock_owner: Option, - reply: ReplyWrite) { + fn write( + &mut self, + _req: &Request<'_>, + ino: u64, + fh: u64, + offset: i64, + data: &[u8], + write_flags: u32, + flags: i32, + lock_owner: Option, + reply: ReplyWrite, + ) { debug!( "write: {}:{}:{}:{:#x?}:{:?}:{:#x?}:{:?}", ino, fh, offset, flags, lock_owner, write_flags, data, ); - let cache_update_success: Result = run_async_blocking(self.update_cache_if_needed(ino)); + let cache_update_success: Result = + run_async_blocking(self.update_cache_if_needed(ino)); match cache_update_success { Err(e) => { error!("write: could not update cache: {}", e); @@ -1031,9 +1132,7 @@ impl Filesystem for DriveFilesystem { } } } - let cache_dir = self.cache_dir - .as_ref() - .map(|s| s.to_path_buf()); + let cache_dir = self.cache_dir.as_ref().map(|s| s.to_path_buf()); if let None = cache_dir { error!("write: cache dir not set"); reply.error(libc::ENOENT); @@ -1074,18 +1173,25 @@ impl Filesystem for DriveFilesystem { } let mut file = file.unwrap(); - debug!("writing file: {:?} at {} with size {}", + debug!( + "writing file: {:?} at {} with size {}", &path, offset, data.len() - ); + ); file.seek(SeekFrom::Start(offset as u64)).unwrap(); file.write_all(data).unwrap(); let size = data.len(); - debug!("wrote file: {:?} at {}; wrote {} bytes", &path, offset, size); + debug!( + "wrote file: {:?} at {}; wrote {} bytes", + &path, offset, size + ); reply.written(size as u32); - debug!("updating size to {} for entry: {:?}", entry.attr.size, entry); + debug!( + "updating size to {} for entry: {:?}", + entry.attr.size, entry + ); let mut attr = &mut entry.attr; if truncate { attr.size = size as u64; @@ -1095,9 +1201,15 @@ impl Filesystem for DriveFilesystem { let now = SystemTime::now(); attr.mtime = now; attr.ctime = now; - debug!("updated size to {} for entry: {:?}", entry.attr.size, entry); + debug!( + "updated size to {} for entry: {:?}", + entry.attr.size, entry + ); entry.local_md5_checksum = Self::compute_md5_checksum(&path); - debug!("updated local md5 to {:?} for entry: {:?}", entry.local_md5_checksum, entry); + debug!( + "updated local md5 to {:?} for entry: {:?}", + entry.local_md5_checksum, entry + ); debug!("write done for entry: {:?}", entry); } @@ -1107,8 +1219,9 @@ impl Filesystem for DriveFilesystem { return; } let drive_id = drive_id.unwrap(); - let entry = self.get_entry_r(drive_id) - .expect("how could this happen to me. I swear it was there a second ago"); + let entry = self + .get_entry_r(drive_id) + .expect("how could this happen to me. I swear it was there a second ago"); let schedule_res = run_async_blocking(self.schedule_upload(&entry)); if let Err(e) = schedule_res { error!("read: could not schedule the upload: {}", e); @@ -1194,7 +1307,6 @@ impl Filesystem for DriveFilesystem { } //endregion - //TODOs: // TODO: implement rename/move // TODO: implement create diff --git a/src/fs/drive/mod.rs b/src/fs/drive/mod.rs index b1434b8..d19fef7 100644 --- a/src/fs/drive/mod.rs +++ b/src/fs/drive/mod.rs @@ -4,8 +4,8 @@ pub use file_uploader::*; pub use filesystem::*; pub use settings::*; +mod change; mod entry; -mod filesystem; mod file_uploader; +mod filesystem; mod settings; -mod change; \ No newline at end of file diff --git a/src/fs/drive/settings.rs b/src/fs/drive/settings.rs index e5bbe86..f01645d 100644 --- a/src/fs/drive/settings.rs +++ b/src/fs/drive/settings.rs @@ -43,8 +43,11 @@ impl SyncSettings { // endregion impl Display for SyncSettings { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "SyncSettings {{ ttl: {}s, cache_time: {}s }}", - self.time_to_live.as_secs(), - self.cache_time.as_secs()) + write!( + f, + "SyncSettings {{ ttl: {}s, cache_time: {}s }}", + self.time_to_live.as_secs(), + self.cache_time.as_secs() + ) } } diff --git a/src/google_drive/drive.rs b/src/google_drive/drive.rs index e2fb1d0..a70d006 100644 --- a/src/google_drive/drive.rs +++ b/src/google_drive/drive.rs @@ -5,17 +5,17 @@ use std::path::{Path, PathBuf}; // use drive3::api::Scope::File; use anyhow::{anyhow, Context}; -use drive3::{hyper_rustls, oauth2}; use drive3::api::{Change, File, Scope, StartPageToken}; -use drive3::DriveHub; -use drive3::hyper::{Body, Response}; use drive3::hyper::client::HttpConnector; +use drive3::hyper::{Body, Response}; use drive3::hyper_rustls::HttpsConnector; +use drive3::DriveHub; +use drive3::{hyper_rustls, oauth2}; use hyper::Client; use tokio::fs; use tracing::{debug, error, instrument, trace, warn}; -use crate::google_drive::{DriveId, helpers}; +use crate::google_drive::{helpers, DriveId}; use crate::prelude::*; const FIELDS_FILE: &str = "id, name, size, mimeType, kind, md5Checksum, parents,trashed, createdTime, modifiedTime, viewedByMeTime"; @@ -32,21 +32,15 @@ impl GoogleDrive { let mut page_token: Option = None; loop { debug!("list_files: page_token: {:?}", page_token); - let mut request = - self - .hub - .files() - .list() - .param( - "fields", - &format!("nextPageToken, files({})", FIELDS_FILE), - ); + let mut request = self + .hub + .files() + .list() + .param("fields", &format!("nextPageToken, files({})", FIELDS_FILE)); if let Some(page_token) = page_token { request = request.page_token(&page_token); } - let (_response, result) = request - .doit() - .await?; + let (_response, result) = request.doit().await?; let result_files = result.files.ok_or(anyhow!("no file list returned"))?; debug!("list_files: response: {:?}", result_files.len()); files.extend(result_files); @@ -62,35 +56,47 @@ impl GoogleDrive { impl GoogleDrive { #[instrument] pub(crate) async fn get_start_page_token(&self) -> anyhow::Result { - let (_response, start_page_token) = self.hub.changes().get_start_page_token().doit().await?; + let (_response, start_page_token) = + self.hub.changes().get_start_page_token().doit().await?; Ok(start_page_token) } } impl GoogleDrive { #[instrument] - pub(crate) async fn get_changes_since(&self, start_page_token: &mut StartPageToken) -> anyhow::Result> { + pub(crate) async fn get_changes_since( + &self, + start_page_token: &mut StartPageToken, + ) -> anyhow::Result> { let mut changes = vec![]; let mut page_token: Option = None; loop { - debug!("getting changes since {:?} page: {:?}", start_page_token, page_token); + debug!( + "getting changes since {:?} page: {:?}", + start_page_token, page_token + ); let file_spec = &format!("file({})", FIELDS_FILE); let mut request = self .hub .changes() - .list(&start_page_token - .start_page_token - .as_ref() - .context("no start_page_token")?) - .param("fields", &format!("changes({}, changeType, removed, fileId, \ - driveId, drive, time), newStartPageToken, nextPageToken", file_spec)); + .list( + &start_page_token + .start_page_token + .as_ref() + .context("no start_page_token")?, + ) + .param( + "fields", + &format!( + "changes({}, changeType, removed, fileId, \ + driveId, drive, time), newStartPageToken, nextPageToken", + file_spec + ), + ); if let Some(page_token) = &page_token { request = request.page_token(page_token); } - let response = request - .doit() - .await - .context("could not get changes"); + let response = request.doit().await.context("could not get changes"); if let Err(e) = &response { error!("error getting changes: {:?}", e); return Err(anyhow!("error getting changes: {:?}", e)); @@ -122,7 +128,8 @@ impl GoogleDrive { .files() .get(&drive_id) .param("fields", &FIELDS_FILE) - .doit().await?; + .doit() + .await?; Ok(file) } @@ -130,7 +137,11 @@ impl GoogleDrive { impl GoogleDrive { #[instrument(skip(file), fields(file_name = file.name, file_id = file.drive_id))] - pub async fn upload_file_content_from_path(&self, file: File, path: &Path) -> anyhow::Result<()> { + pub async fn upload_file_content_from_path( + &self, + file: File, + path: &Path, + ) -> anyhow::Result<()> { update_file_content_on_drive_from_path(&self, file, path).await?; Ok(()) } @@ -169,7 +180,7 @@ impl GoogleDrive { Some(parent_drive_id) => parent_drive_id, None => DriveId::from("root"), } - .into(); + .into(); let parent_drive_id = match parent_drive_id.into_string() { Ok(parent_drive_id) => parent_drive_id, Err(_) => return Err(anyhow!("invalid parent_drive_id")), @@ -226,9 +237,9 @@ impl GoogleDrive { auth, oauth2::InstalledFlowReturnMethod::HTTPRedirect, ) - .persist_tokens_to_disk("auth/tokens.json") - .build() - .await?; + .persist_tokens_to_disk("auth/tokens.json") + .build() + .await?; let http_client = Client::builder().build( hyper_rustls::HttpsConnectorBuilder::new() .with_native_roots() @@ -264,10 +275,7 @@ impl GoogleDrive { .hub .files() .list() - .param( - "fields", - &format!("nextPageToken, files({})", FIELDS_FILE), - ) + .param("fields", &format!("nextPageToken, files({})", FIELDS_FILE)) // .page_token(page_token.as_ref().map(String::as_str)) .q(format!("'{}' in parents", folder_id).as_str()) .doit() @@ -465,7 +473,10 @@ pub async fn update_file_content_on_drive_from_path( file: google_drive3::api::File, source_path: &Path, ) -> anyhow::Result<()> { - debug!("update_file_content_on_drive_from_path(): source_path: {:?}", source_path); + debug!( + "update_file_content_on_drive_from_path(): source_path: {:?}", + source_path + ); // { // debug!("reading content from file for testing"); // let content = std::fs::File::open(source_path)?; @@ -487,7 +498,10 @@ async fn update_file_content_on_drive( ) -> anyhow::Result<()> { let stream = content.into_std().await; let mime_type = helpers::get_mime_from_file_metadata(&file)?; - let id = file.drive_id.clone().with_context(|| "file metadata has no drive id")?; + let id = file + .drive_id + .clone() + .with_context(|| "file metadata has no drive id")?; debug!("starting upload"); let (response, file) = drive .hub diff --git a/src/google_drive/drive_id.rs b/src/google_drive/drive_id.rs index db2c149..1c1804d 100644 --- a/src/google_drive/drive_id.rs +++ b/src/google_drive/drive_id.rs @@ -24,7 +24,10 @@ impl TryFrom for DriveId { fn try_from(value: OsString) -> anyhow::Result { let result = value.into_string(); if let Err(e) = result { - return Err(anyhow::anyhow!("Failed to convert OsString to String: {:?}", e)); + return Err(anyhow::anyhow!( + "Failed to convert OsString to String: {:?}", + e + )); } Ok(DriveId::new(result.unwrap())) } diff --git a/src/google_drive/mod.rs b/src/google_drive/mod.rs index 8ec3bce..6f7ea59 100644 --- a/src/google_drive/mod.rs +++ b/src/google_drive/mod.rs @@ -7,4 +7,3 @@ mod helpers; mod drive; mod drive_id; - diff --git a/src/lib.rs b/src/lib.rs index 66b6327..b8f318a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,7 +2,6 @@ extern crate google_drive3 as drive3; - use std::path::Path; use std::time::Duration; @@ -19,15 +18,15 @@ use tracing::{debug, info}; use prelude::*; use crate::config::common_file_filter::CommonFileFilter; -use crate::fs::drive::{DriveFilesystem, DriveFileUploader, FileUploaderCommand, SyncSettings}; +use crate::fs::drive::{DriveFileUploader, DriveFilesystem, FileUploaderCommand, SyncSettings}; use crate::google_drive::GoogleDrive; pub mod async_helper; pub mod common; +pub mod config; pub mod fs; pub mod google_drive; pub mod prelude; -pub mod config; pub async fn sample_drive_fs() -> Result<()> { let mountpoint = "/tmp/fuse/3"; @@ -41,10 +40,12 @@ pub async fn sample_drive_fs() -> Result<()> { let drive = GoogleDrive::new().await?; // let file_uploader = FileUploader::new("config/credentials.json", "config/token.json"); let (file_uploader_sender, file_uploader_receiver) = mpsc::channel(1); - let mut file_uploader = DriveFileUploader::new(drive.clone(), - upload_ignore, - file_uploader_receiver, - Duration::from_secs(3)); + let mut file_uploader = DriveFileUploader::new( + drive.clone(), + upload_ignore, + file_uploader_receiver, + Duration::from_secs(3), + ); debug!("Mounting fuse filesystem at {}", mountpoint); let fs = DriveFilesystem::new( Path::new(""), @@ -52,12 +53,16 @@ pub async fn sample_drive_fs() -> Result<()> { drive, cache_dir.into_path(), sync_settings, - ).await?; + ) + .await?; let mount_options = vec![MountOption::RW]; - let uploader_handle: JoinHandle<()> = tokio::spawn(async move { file_uploader.listen().await; }); - let end_signal_handle: JoinHandle<()> = mount(fs, &mountpoint, &mount_options, file_uploader_sender).await?; + let uploader_handle: JoinHandle<()> = tokio::spawn(async move { + file_uploader.listen().await; + }); + let end_signal_handle: JoinHandle<()> = + mount(fs, &mountpoint, &mount_options, file_uploader_sender).await?; tokio::try_join!(uploader_handle, end_signal_handle)?; debug!("Exiting gracefully..."); @@ -76,25 +81,31 @@ fn get_cache_dir() -> Result { Ok(cache_dir) } -async fn mount(fs: DriveFilesystem, - mountpoint: &str, - options: &[MountOption], - sender: Sender) -> Result> { +async fn mount( + fs: DriveFilesystem, + mountpoint: &str, + options: &[MountOption], + sender: Sender, +) -> Result> { let mut session = Session::new(fs, mountpoint.as_ref(), options)?; let session_ender = session.unmount_callable(); let end_program_signal_handle = tokio::spawn(async move { let _ = end_program_signal_awaiter(sender, session_ender).await; }); - debug!("Mounting fuse filesystem" ); + debug!("Mounting fuse filesystem"); let _ = session.run(); debug!("Stopped with mounting"); // Ok(session_ender) Ok(end_program_signal_handle) } -async fn end_program_signal_awaiter(file_uploader_sender: Sender, - mut session_unmounter: SessionUnmounter) -> Result<()> { - tokio::signal::ctrl_c().await.expect("failed to listen for ctrl_c event"); +async fn end_program_signal_awaiter( + file_uploader_sender: Sender, + mut session_unmounter: SessionUnmounter, +) -> Result<()> { + tokio::signal::ctrl_c() + .await + .expect("failed to listen for ctrl_c event"); info!("got signal to end program"); file_uploader_sender.send(FileUploaderCommand::Stop).await?; diff --git a/src/main.rs b/src/main.rs index 0cd8ead..b2fdf36 100644 --- a/src/main.rs +++ b/src/main.rs @@ -44,7 +44,10 @@ async fn sample_logging() { use tracing::{debug, error, info, trace, warn}; info!("info"); debug!("debug"); - let s = span!(tracing::Level::TRACE, "span around trace and warn with stdin read"); + let s = span!( + tracing::Level::TRACE, + "span around trace and warn with stdin read" + ); { let _x = s.enter(); trace!("trace"); diff --git a/src/prelude.rs b/src/prelude.rs index 64a81f3..326a995 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -1,3 +1,3 @@ pub use anyhow::Result; -pub use drive3::api::File as DriveFileMetadata; \ No newline at end of file +pub use drive3::api::File as DriveFileMetadata;