From 121180e15b955afbf612c12ff66524cb35aa5733 Mon Sep 17 00:00:00 2001 From: OMGeeky Date: Thu, 18 May 2023 18:50:54 +0200 Subject: [PATCH] implement writing to google drive with caching, switch to tracing for logs, change overall caching behaviour --- .cargo/config.toml | 2 + Cargo.toml | 9 +- src/async_helper.rs | 8 +- src/config/common_file_filter.rs | 22 + src/config/mod.rs | 1 + src/fs/common.rs | 5 +- src/fs/drive/entry.rs | 9 +- src/fs/drive/file_uploader.rs | 208 +++++++ src/fs/drive/filesystem.rs | 980 +++++++++++++++++++++++++++++++ src/fs/drive/mod.rs | 744 +---------------------- src/fs/drive/settings.rs | 51 ++ src/fs/sample.rs | 6 +- src/google_drive/drive.rs | 132 ++++- src/google_drive/drive_id.rs | 10 +- src/google_drive/helpers.rs | 42 +- src/lib.rs | 159 +++-- src/main.rs | 48 +- 17 files changed, 1591 insertions(+), 845 deletions(-) create mode 100644 .cargo/config.toml create mode 100644 src/config/common_file_filter.rs create mode 100644 src/config/mod.rs create mode 100644 src/fs/drive/file_uploader.rs create mode 100644 src/fs/drive/filesystem.rs create mode 100644 src/fs/drive/settings.rs diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..bff29e6 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,2 @@ +[build] +rustflags = ["--cfg", "tokio_unstable"] diff --git a/Cargo.toml b/Cargo.toml index 1c69a9e..d14eddb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,8 +7,7 @@ edition = "2021" [dependencies] google-drive3 = "5.0" -tokio = { version = "1.28", features = ["full"] } -log = "0.4" +tokio = { version = "1.28", features = ["full", "tracing"] } env_logger = "0.10" #nix = { version = "0.26", features = ["mount"] } tempfile = "3.5.0" @@ -23,4 +22,8 @@ futures = "0.3.28" hyper = { version = "0.14.24", features = ["full", "stream"] } mime = "0.3" anyhow = "1.0" -async-recursion = "1.0.4" \ No newline at end of file +async-recursion = "1.0.4" +ignore = "0.4.20" +tracing = "0.1.37" +tracing-subscriber = "0.3.17" +console-subscriber = "0.1.9" diff --git a/src/async_helper.rs b/src/async_helper.rs index 62ff21f..62d395e 100644 --- a/src/async_helper.rs +++ b/src/async_helper.rs @@ -1,4 +1,5 @@ -use log::{debug, trace}; +use std::fmt::Debug; +use tracing::{debug, trace}; use std::future::Future; use tokio::runtime::{Handle, Runtime}; @@ -7,12 +8,13 @@ use tokio::runtime::{Handle, Runtime}; /// This function will block the current thread until the provided future has run to completion. /// /// # Be careful with deadlocks -pub fn run_async_blocking(f: impl std::future::Future + Sized) -> T { +pub fn run_async_blocking(f: impl std::future::Future + Sized) -> T +where T: Debug { trace!("run_async"); let handle = Handle::current(); handle.enter(); trace!("run_async: entered handle"); let result = futures::executor::block_on(f); - trace!("run_async: got result"); + trace!("run_async: got result: {:?}", result); result } diff --git a/src/config/common_file_filter.rs b/src/config/common_file_filter.rs new file mode 100644 index 0000000..29220df --- /dev/null +++ b/src/config/common_file_filter.rs @@ -0,0 +1,22 @@ +use std::path::{Path, PathBuf}; +use crate::prelude::*; +use ignore::gitignore; +use ignore::gitignore::{Gitignore, GitignoreBuilder}; +#[derive(Debug)] +pub struct CommonFileFilter { + pub filter: Gitignore, +} +impl CommonFileFilter{ + pub fn from_path(path: impl Into) -> Result { + let path = path.into(); + let ignores = GitignoreBuilder::new(&path) + .build()?; + let s = Self { + filter: ignores, + }; + Ok(s) + } + pub fn is_filter_matched(&self, path: &Path) -> Result { + Ok(self.filter.matched(path, path.is_dir()).is_ignore()) + } +} diff --git a/src/config/mod.rs b/src/config/mod.rs new file mode 100644 index 0000000..fc2ab8e --- /dev/null +++ b/src/config/mod.rs @@ -0,0 +1 @@ +pub mod common_file_filter; \ No newline at end of file diff --git a/src/fs/common.rs b/src/fs/common.rs index c4fbd02..e08db5e 100644 --- a/src/fs/common.rs +++ b/src/fs/common.rs @@ -6,9 +6,10 @@ use crate::prelude::*; use anyhow::anyhow; use async_trait::async_trait; use fuser::{FileAttr, FileType, TimeOrNow, FUSE_ROOT_ID}; -use log::debug; +use tracing::debug; use std::collections::HashMap; use std::ffi::{OsStr, OsString}; +use std::fmt::Debug; use std::path::{Path, PathBuf}; use std::time::SystemTime; @@ -145,7 +146,7 @@ pub trait CommonFilesystem { name: &OsStr, mode: u16, file_type: FileType, - parent_ino: impl Into + Send, + parent_ino: impl Into + Send+ Debug, size: u64, ) -> Result; diff --git a/src/fs/drive/entry.rs b/src/fs/drive/entry.rs index e4b14f7..6fad042 100644 --- a/src/fs/drive/entry.rs +++ b/src/fs/drive/entry.rs @@ -3,7 +3,7 @@ use crate::fs::{CommonEntry, Inode}; use crate::google_drive::DriveId; use fuser::FileAttr; use std::ffi::{OsStr, OsString}; -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone)] pub struct DriveEntry { pub ino: Inode, pub drive_id: DriveId, @@ -12,6 +12,9 @@ pub struct DriveEntry { // pub drive_path: OsString, pub local_path: LocalPath, pub attr: FileAttr, + pub metadata_cache_time: Option, + pub content_cache_time: Option, + pub drive_metadata: Option, } impl DriveEntry { pub fn new( @@ -21,6 +24,7 @@ impl DriveEntry { local_path: impl Into, attr: FileAttr, + drive_metadata: Option, ) -> Self { let name = name.into(); let path = local_path.into(); @@ -31,6 +35,9 @@ impl DriveEntry { // drive_path: path.clone().into(), local_path: path, attr, + metadata_cache_time: None, + content_cache_time: None, + drive_metadata, } } } diff --git a/src/fs/drive/file_uploader.rs b/src/fs/drive/file_uploader.rs new file mode 100644 index 0000000..bb8c834 --- /dev/null +++ b/src/fs/drive/file_uploader.rs @@ -0,0 +1,208 @@ +use std::collections::HashMap; +use std::error::Error; +use std::fmt::{Debug, Display, Formatter}; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::thread::sleep; +use std::time::Duration; + +use anyhow::anyhow; +use anyhow::Context; +use drive3::api::File; +use futures::TryFutureExt; +use tokio::io::{AsyncWriteExt, stdout}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::task::JoinHandle; +use tracing::{debug, error, info, instrument, warn}; + +use crate::config::common_file_filter::CommonFileFilter; +use crate::google_drive::{DriveId, GoogleDrive}; +use crate::prelude::*; + +#[derive(Debug, Clone)] +pub struct FileCommand { + path: PathBuf, + file_metadata: File, +} + +impl FileCommand { + pub fn new(path: PathBuf, file_metadata: File) -> Self { + Self { + path, + file_metadata, + } + } +} + +#[derive(Debug)] +struct RunningUpload { + join_handle: JoinHandle>, + stop_sender: Sender<()>, +} + +#[derive(Debug)] +pub enum FileUploaderCommand { + UploadChange(FileCommand), + CreateFolder(FileCommand), + CreateFile(FileCommand), + Stop, + +} + +#[derive(Debug)] +pub struct DriveFileUploader { + drive: GoogleDrive, + + /// the filter to apply when uploading files + upload_filter: CommonFileFilter, + + /// the queue of files to upload + upload_queue: Vec, + receiver: Receiver, + cache_dir: PathBuf, + wait_time_before_upload: Duration, + + running_uploads: HashMap, +} + +impl<'a> DriveFileUploader { + #[instrument] + pub fn new(drive: GoogleDrive, + upload_filter: CommonFileFilter, + receiver: Receiver, + cache_dir: PathBuf, + wait_time_before_upload: Duration) -> Self { + Self { + drive, + upload_filter, + upload_queue: Vec::new(), + receiver, + cache_dir, + wait_time_before_upload, + running_uploads: HashMap::new(), + } + } + #[instrument(skip(self), fields(self.upload_queue = self.upload_queue.len(), + self.upload_filter = self.upload_filter.filter.num_ignores()))] + pub async fn listen(&mut self) { + info!("listening for file upload requests"); + loop { + // while let Some(command) = self.receiver.recv().await { + let command = self.receiver.recv().await; + if let Some(command) = command { + debug!("received path: {:?}", command); + debug!("received path: {:?}", command); + match command { + FileUploaderCommand::UploadChange(file_command) => { + let path = file_command.path; + let file_metadata = file_command.file_metadata; + if !self.upload_filter.is_filter_matched(&path).unwrap_or(false) { + let drive = self.drive.clone(); + let drive_id = file_metadata.drive_id.clone().with_context(|| "no drive_id"); + if let Err(e) = drive_id { + error!("failed to upload file: {:?} with error: {}", path, e); + continue; + } + let drive_id = drive_id.unwrap(); + + self.cancel_and_wait_for_running_upload_for_id(&drive_id).await; + + info!("queuing upload of file: {:?}", path); + let wait_time_before_upload = self.wait_time_before_upload.clone(); + let (rx, rc) = channel(1); + let upload_handle = tokio::spawn(async move { Self::upload_file(drive, file_metadata, path, wait_time_before_upload, rc).await }); + self.running_uploads.insert(drive_id, RunningUpload { + join_handle: upload_handle, + stop_sender: rx, + }); + } else { + info!("skipping upload of file since it is ignored: {:?}", path); + } + } + FileUploaderCommand::Stop => { + info!("received stop command: stopping file upload listener"); + break; + } + _ => { warn!("received unknown command: {:?}", command); } + }; + } else { + warn!("received None command, meaning all senders have been dropped. \ + stopping file upload listener since no more commands will be received"); + break; + } + } + + info!("file upload listener stopped"); + } + + /// this function checks if there are any running uploads for the given drive_id + /// and if there are, it sends a stop command to all of them and then awaits for them to finish + async fn cancel_and_wait_for_running_upload_for_id(&mut self, drive_id: &String) { + debug!("checking for running uploads for file: {:?}", drive_id); + let running_uploads: Option<&mut RunningUpload> = self.running_uploads.get_mut(drive_id); + if let Some(running_upload) = running_uploads { + debug!("trying to send stop command to running upload for file: {:?}", drive_id); + running_upload.stop_sender.send(()).await; + + debug!("waiting for running upload for file: {:?}", drive_id); + let x: &mut JoinHandle> = &mut running_upload.join_handle; + tokio::join!(x); + debug!("finished waiting for running upload for file: {:?} ", drive_id); + + debug!("removing running upload for file: {:?}", drive_id); + self.running_uploads.remove(drive_id); + } + } + #[instrument(skip(file_metadata))] + async fn upload_file(drive: GoogleDrive, + file_metadata: drive3::api::File, + local_path: PathBuf, + wait_time_before_upload: Duration, + rc: Receiver<()>) -> anyhow::Result<()> { + // debug!("uploading file: {:?}", local_path); + debug!("sleeping for {:?} before uploading {}", wait_time_before_upload, local_path.display()); + tokio::select! { + _ = Self::wait_for_cancel_signal(rc) => { + debug!("received stop signal: stopping upload"); + return Ok(()); + }, + _ = tokio::time::sleep(wait_time_before_upload)=> { + debug!("done sleeping"); + return Self::upload_file_(&drive, file_metadata, &local_path, wait_time_before_upload) + .await + .map_err(|e| { + error!("error uploading file: {:?}: {:?}", local_path, e); + // FileUploadError { + // path: local_path, + // error: anyhow!(e), + anyhow!(e) + // } + }); + } + } + } + + #[instrument(skip(rc))] + async fn wait_for_cancel_signal(mut rc: Receiver<()>) { + match rc.recv().await { + Some(v) => { + debug!("received stop signal: stopping upload"); + } + _ => { warn!("received None from cancel signal receiver") } + } + } + async fn upload_file_(drive: &GoogleDrive, file_metadata: File, local_path: &PathBuf, wait_duration: Duration) -> anyhow::Result<()> { + debug!("uploading file: {:?}", local_path); + let path = local_path.as_path(); + drive.upload_file_content_from_path(file_metadata, path).await?; + // let result = drive.list_files(DriveId::from("root")).await.with_context(|| format!("could not do it"))?; + debug!("upload_file_: done"); + + Ok(()) + } +} + +pub struct FileUploadError { + path: PathBuf, + error: anyhow::Error, +} diff --git a/src/fs/drive/filesystem.rs b/src/fs/drive/filesystem.rs new file mode 100644 index 0000000..4efb5b5 --- /dev/null +++ b/src/fs/drive/filesystem.rs @@ -0,0 +1,980 @@ +use std::{ + any::Any, + collections::HashMap, + ffi::{OsStr, OsString}, + fmt::Display, + fs::OpenOptions, + os::unix::prelude::*, + path::{Path, PathBuf}, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; +use std::fmt::{Debug, Formatter}; +use std::future::Future; +use std::io::{Seek, SeekFrom, Write}; +use std::ops::Deref; + +use anyhow::{anyhow, Context, Error}; +use async_recursion::async_recursion; +use drive3::api::File; +use fuser::{ + FileAttr, + Filesystem, + FileType, + FUSE_ROOT_ID, + KernelConfig, + ReplyAttr, + ReplyData, + ReplyDirectory, + ReplyEmpty, + ReplyEntry, + ReplyIoctl, + ReplyLock, + ReplyLseek, + ReplyOpen, + ReplyStatfs, + ReplyWrite, + ReplyXattr, + Request, + TimeOrNow, +}; +use futures::TryFutureExt; +use libc::c_int; +use mime::Mime; +use tempfile::TempDir; +use tokio::{ + io::{AsyncBufReadExt, stdin}, + runtime::Runtime, +}; +use tracing::{debug, error, instrument, warn}; + +use crate::{async_helper::run_async_blocking, common::LocalPath, config::common_file_filter::CommonFileFilter, fs::common::CommonFilesystem, fs::CommonEntry, fs::drive::DriveEntry, fs::inode::Inode, google_drive::{DriveId, GoogleDrive}, google_drive, prelude::*}; +use crate::fs::drive::{FileCommand, FileUploaderCommand, SyncSettings}; + +enum CacheState { + Missing, + UpToDate, + RefreshNeeded, +} + +#[derive(Debug)] +pub struct DriveFilesystem { + // runtime: Runtime, + /// the point where the filesystem is mounted + root: PathBuf, + /// the source dir to read from and write to + source: GoogleDrive, + /// the cache dir to store the files in + cache_dir: Option, + + /// the filter to apply when uploading files + // upload_filter: CommonFileFilter, + + entries: HashMap, + + children: HashMap>, + + /// with this we can send a path to the file uploader + /// to tell it to upload certain files. + file_uploader_sender: tokio::sync::mpsc::Sender, + + /// The generation of the filesystem + /// This is used to invalidate the cache + /// when the filesystem is remounted + generation: u64, + + settings: SyncSettings, +} + +impl Display for DriveFilesystem { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "DriveFilesystem {{ entries: {}, gen: {}, settings: {} }}", + // self.root.display(), + // self.cache_dir.as_ref().map(|dir| dir.path().display()), + self.entries.len(), + self.generation, + self.settings, + ) + } +} + +impl DriveFilesystem { + #[instrument(fields(% self, entry))] + async fn schedule_upload(&self, entry: &DriveEntry) -> Result<()> { + debug!("DriveFilesystem::schedule_upload(entry: {:?})", entry); + let path = self.get_cache_path_for_entry(entry)?; + let metadata = Self::create_drive_metadata_from_entry(entry)?; + debug!("schedule_upload: sending path to file uploader..."); + self.file_uploader_sender.send(FileUploaderCommand::UploadChange(FileCommand::new(path, metadata))).await?; + debug!("schedule_upload: sent path to file uploader"); + Ok(()) + } + + fn create_drive_metadata_from_entry(entry: &DriveEntry) -> Result { + Ok(File { + drive_id: match entry.drive_id.clone().into_string() { + Ok(v) => Some(v), + Err(_) => None + }, + // name: match entry.name.clone().into_string() { + // Ok(v) => Some(v), + // Err(_) => None + // }, + // size: Some(entry.attr.size as i64), + // modified_time: Some(entry.attr.mtime.into()), + // file_extension: match entry.local_path.extension().clone() { + // Some(v) => v.to_str().map(|v| v.to_string()), + // None => None + // }, + ..Default::default() + }) + } +} + +// region general +impl DriveFilesystem { + #[instrument(skip(file_uploader_sender))] + pub async fn new(root: impl AsRef + Debug, + config_path: impl AsRef + Debug, + file_uploader_sender: tokio::sync::mpsc::Sender, + drive: GoogleDrive, cache_dir: PathBuf, + settings: SyncSettings) -> Result { + let root = root.as_ref(); + let config_path = config_path.as_ref(); + debug!("DriveFilesystem::new(root:{}; config_path: {})", root.display(), config_path.display()); + // let upload_filter = CommonFileFilter::from_path(config_path)?; + let mut entries = HashMap::new(); + let now = SystemTime::now(); + // Add root directory with inode number 1 + let root_attr = FileAttr { + ino: FUSE_ROOT_ID, + size: 0, + blocks: 0, + atime: now, + mtime: now, + ctime: now, + crtime: now, + kind: FileType::Directory, + perm: 0o755, + nlink: 2, + uid: 0, + gid: 0, + rdev: 0, + blksize: 4096, + flags: 0, + }; + let inode = Inode::from(FUSE_ROOT_ID); + entries.insert( + inode, + DriveEntry::new( + inode, + "root".to_string(), + DriveId::root(), + LocalPath::from(Path::new("")), + root_attr, + None, + ), + ); + + let mut s = Self { + root: root.to_path_buf(), + source: drive, + cache_dir: Some(cache_dir), + entries, + file_uploader_sender, + /*TODO: implement a way to increase this if necessary*/ + generation: 0, + children: HashMap::new(), + settings, + }; + // + // let root = s.root.to_path_buf(); + // s.add_dir_entry(&root, Inode::from(FUSE_ROOT_ID), true) + // .await; + + Ok(s) + } + #[instrument(fields(% self, inode))] + fn get_cache_dir_for_file(&self, inode: Inode) -> Result { + debug!("get_cache_dir_for_file: {}", inode); + let cache_dir = self.cache_dir.as_ref().ok_or(anyhow!("no cache dir"))?; + debug!( + "get_cache_dir_for_file: {}, cache_dir: {}", + inode, + cache_dir.display() + ); + let entry = self + .get_entry(inode) + .ok_or(anyhow!("could not get entry"))?; + debug!( + "get_cache_dir_for_file: entry local_path: {}", + entry.local_path.display() + ); + let path = Self::construct_cache_folder_path(cache_dir, entry); + debug!("get_cache_dir_for_file: {}: {}", inode, path.display()); + Ok(path) + } + + #[instrument] + fn construct_cache_folder_path(cache_dir: &Path, entry: &DriveEntry) -> PathBuf { + let folder_path = match entry.local_path.parent() { + Some(p) => p.as_os_str(), + None => OsStr::new(""), + }; + debug!("construct_cache_folder_path: folder_path: {:?}", folder_path); + let path = cache_dir.join(folder_path); + debug!("construct_cache_folder_path: {}", path.display()); + path + } + #[async_recursion::async_recursion] + #[instrument(fields(% self, folder_path, parent_ino, inline_self))] + async fn add_dir_entry( + &mut self, + folder_path: &Path, + parent_ino: Inode, + inline_self: bool, + ) -> Result<()> { + let ino; + debug!( + "add_dir_entry: {:?}; parent: {}; inline_self: {} ", + folder_path, parent_ino, inline_self + ); + if self.root == folder_path { + debug!("add_dir_entry: root folder"); + ino = parent_ino; + } else if inline_self { + debug!("add_dir_entry: inlining self entry for {:?}", folder_path); + ino = parent_ino; + } else { + debug!("add_dir_entry: adding entry for {:?}", folder_path); + ino = self + .add_entry( + folder_path.file_name().ok_or(anyhow!("invalid filename"))?, + /*TODO: correct permissions*/ + 0o755, + FileType::Directory, + parent_ino, + /*TODO: implement size for folders*/ 0, + ) + .await?; + } + + let drive = &self.source; + + let folder_drive_id: DriveId = self + .get_drive_id(ino) + .ok_or(anyhow!("could not find dir drive_id"))?; + debug!( + "add_dir_entry: getting files for '{:50?}' {}", + folder_drive_id, + folder_path.display() + ); + let files; + { + let files_res = self.source.list_files(folder_drive_id).await; + if let Err(e) = files_res { + warn!("could not get files: {}", e); + return Ok(()); + } + files = files_res.unwrap(); + } + debug!("got {} files", files.len()); + // let d = std::fs::read_dir(folder_path); + + for entry in files { + debug!("entry: {:?}", entry); + let name = entry.name.as_ref().ok_or_else(|| "no name"); + if let Err(e) = name { + warn!("could not get name: {}", e); + continue; + } + let name = name.as_ref().unwrap(); + if name.contains("/") || name.contains("\\") || name.contains(":") { + warn!("invalid name: {}", name); + continue; + } + let path = folder_path.join(&name); + + if let None = &entry.mime_type { + warn!("could not get mime_type"); + continue; + } + + let mime_type = entry.mime_type.as_ref().unwrap(); + if mime_type == "application/vnd.google-apps.document" + || mime_type == "application/vnd.google-apps.spreadsheet" + || mime_type == "application/vnd.google-apps.drawing" + || mime_type == "application/vnd.google-apps.form" + || mime_type == "application/vnd.google-apps.presentation" + || mime_type == "application/vnd.google-apps.drive-sdk" + || mime_type == "application/vnd.google-apps.script" + //TODO: add all relevant mime types + { + debug!( + "skipping google file: mime_type: '{}' entry: {:?}", + mime_type, entry + ); + continue; + } else if mime_type == "application/vnd.google-apps.folder" { + debug!("adding folder: {:?}", path); + let res = self.add_dir_entry(&path, ino, false).await; + if let Err(e) = res { + warn!("could not add folder: {}", e); + continue; + } + } else { + debug!("adding file: '{}' {:?}", mime_type, path); + let size = match Self::get_size_from_drive_metadata(&entry) { + Some(value) => value, + None => continue, + }; + let mode = 0o644; //TODO: get mode from settings + + self.add_file_entry(ino, &OsString::from(&name), mode as u16, size) + .await; + } + } + + Ok(()) + } + + #[instrument] + fn get_size_from_drive_metadata(entry: &File) -> Option { + let size = entry.size.ok_or_else(|| 0); + if let Err(e) = size { + warn!("could not get size: {}", e); + return None; + } + let size = size.unwrap(); + if size < 0 { + warn!("invalid size: {}", size); + return None; + } + let size = size as u64; + Some(size) + } + #[instrument(fields(% self, ino))] + fn get_drive_id(&self, ino: impl Into) -> Option { + self.get_entry(ino).map(|e| e.drive_id.clone()) + } +} +// endregion + +// region caching +impl DriveFilesystem { + async fn download_file_to_cache(&mut self, ino: impl Into) -> Result { + let ino = ino.into(); + debug!("download_file_to_cache: {}", ino); + let entry = self.get_entry_r(ino)?; + let drive_id = entry.drive_id.clone(); + let drive = &self.source; + let cache_path = self.get_cache_path_for_entry(&entry)?; + let folder = cache_path.parent() + .ok_or(anyhow!("could not get the folder the cache file should be saved in"))?; + if !folder.exists() { + debug!("creating folder: {}", folder.display()); + std::fs::create_dir_all(folder)?; + } + debug!("downloading file: {}", cache_path.display()); + drive.download_file(drive_id, &cache_path).await?; + debug!("downloaded file: {}", cache_path.display()); + let size = std::fs::metadata(&cache_path)?.len(); + self.set_entry_size(ino, size); + self.set_entry_cached(ino)?; + //TODO: check if any other things need to be updated for the entry + Ok(cache_path) + } + + fn set_entry_cached(&mut self, ino: Inode) -> Result<()> { + let mut entry = self.get_entry_mut(ino).ok_or(anyhow!("could not get entry"))?; + entry.content_cache_time = Some(SystemTime::now()); + Ok(()) + } + fn check_if_file_is_cached(&self, ino: impl Into + Debug) -> Result { + let entry = self.get_entry_r(ino)?; + let path = self.get_cache_path_for_entry(&entry)?; + let exists = path.exists(); + Ok(exists) + } + #[instrument(fields(% self))] + async fn update_entry_metadata_cache_if_needed(&mut self, ino: impl Into + Debug) -> Result<()> { + //TODO: do something that uses the changes api so not every file needs to check for + // itself if it needs to update, rather it gets checked once and then updates all the + // cache times for all files + + let ino = ino.into(); + let entry = self.get_entry_r(ino)?; + let refresh_cache = self.get_cache_state(&entry.metadata_cache_time); + match refresh_cache { + CacheState::RefreshNeeded | CacheState::Missing => { + debug!("refreshing metadata cache for drive_id: {:?}", entry.drive_id); + let metadata = self.source.get_metadata_for_file(entry.drive_id.clone()).await?; + self.update_entry_metadata(ino, &metadata)?; + self.set_entry_metadata_cached(ino)?; + } + CacheState::UpToDate => { + debug!("metadata cache is up to date"); + } + _ => { + debug!("unknown cache state"); + } + } + Ok(()) + } + #[instrument(fields(% self))] + async fn update_cache_if_needed(&mut self, ino: impl Into + Debug) -> Result<()> { + let ino = ino.into(); + self.update_entry_metadata_cache_if_needed(ino).await?; + let entry = match self.get_entry_r(ino) { + Ok(entry) => entry, + Err(e) => { + warn!("could not get entry: {}", e); + return Err(e); + } + }; + let refresh_cache = self.get_cache_state(&entry.content_cache_time); + match refresh_cache { + CacheState::Missing => { + debug!("no local cache for: {}, downloading...", ino); + self.download_file_to_cache(ino).await?; + } + CacheState::RefreshNeeded => { + debug!("cache needs refresh for: {}, checking for updated version...", ino); + let remote_mod_time: SystemTime = self.get_modified_time_on_remote(ino).await?; + debug!("remote_mod_time: {:?}", remote_mod_time); + let local_mod_time = self.get_entry_r(ino)?.attr.mtime; + debug!("local_mod_time: {:?}", local_mod_time); + if remote_mod_time > local_mod_time { + debug!("updating cached file since remote_mod_time: {:?} > local_mod_time: {:?}", remote_mod_time, local_mod_time); + self.download_file_to_cache(ino).await?; + } else { + debug!("local file is up to date: remote_mod_time: {:?} <= local_mod_time: {:?}", remote_mod_time, local_mod_time); + } + } + CacheState::UpToDate => { + debug!("Cache up to date for {} since {:?} > {}", ino, entry.content_cache_time.unwrap(), self.settings.cache_time().as_secs()); + } + } + Ok(()) + } + + fn get_cache_state(&self, cache_time: &Option) -> CacheState { + let refresh_cache: CacheState = match cache_time { + Some(cache_time) => { + let now = SystemTime::now(); + let duration = now.duration_since(*cache_time).unwrap(); + // let seconds = duration.as_secs(); + if duration > self.settings.cache_time() { + CacheState::RefreshNeeded + } else { + CacheState::UpToDate + } + } + None => CacheState::Missing, + }; + refresh_cache + } + + fn get_cache_path_for_entry(&self, entry: &DriveEntry) -> Result { + debug!("get_cache_path_for_entry: {}", entry.ino); + let cache_folder = match self.cache_dir.as_ref() { + Some(x) => x, + None => return Err(anyhow!("cache_dir is None").into()), + }; + let path = Self::construct_cache_path_for_entry(&cache_folder, entry); + Ok(path) + } + fn construct_cache_path_for_entry(cache_dir: &Path, entry: &DriveEntry) -> PathBuf { + debug!("construct_cache_path_for_entry: {} with cache_dir: {}", entry.ino, cache_dir.display()); + let path = Self::construct_cache_folder_path(cache_dir, entry).join(&entry.name); + debug!( + "get_cache_path_for_entry: {}: {}", + entry.ino, + path.display() + ); + path + } + async fn get_modified_time_on_remote(&self, ino: Inode) -> Result { + let entry = self.get_entry_r(ino)?; + let drive_id = entry.drive_id.clone(); + let drive = &self.source; + let modified_time = drive.get_modified_time(drive_id).await?; + Ok(modified_time) + } + fn set_entry_size(&mut self, ino: Inode, size: u64) -> anyhow::Result<()> { + self.get_entry_mut(ino).context("no entry for ino")?.attr.size = size; + Ok(()) + } + fn update_entry_metadata(&mut self, ino: Inode, drive_metadata: &google_drive3::api::File) -> anyhow::Result<()> { + let entry = self.get_entry_mut(ino).context("no entry with ino")?; + if let Some(name) = drive_metadata.name.as_ref() { + entry.name = OsString::from(name); + } + if let Some(size) = drive_metadata.size.as_ref() { + entry.attr.size = *size as u64; + } + if let Some(modified_time) = drive_metadata.modified_time.as_ref() { + entry.attr.mtime = (*modified_time).into(); + } + if let Some(created_time) = drive_metadata.created_time.as_ref() { + entry.attr.ctime = (*created_time).into(); + } + if let Some(viewed_by_me) = drive_metadata.viewed_by_me_time.as_ref(){ + entry.attr.atime = (*viewed_by_me).into(); + } + + Ok(()) + } + fn set_entry_metadata_cached(&mut self, ino: Inode) -> anyhow::Result<()> { + let mut entry = self.get_entry_mut(ino).context("no entry with ino")?; + entry.metadata_cache_time = Some(SystemTime::now()); + Ok(()) + } +} + +// endregion + +// region common +#[async_trait::async_trait] +impl CommonFilesystem for DriveFilesystem { + fn get_entries(&self) -> &HashMap { + &self.entries + } + + fn get_entries_mut(&mut self) -> &mut HashMap { + &mut self.entries + } + + fn get_children(&self) -> &HashMap> { + &self.children + } + + fn get_children_mut(&mut self) -> &mut HashMap> { + &mut self.children + } + + fn get_root_path(&self) -> LocalPath { + self.root.clone().into() + } + + #[instrument(fields(% self, name, mode, file_type, parent_ino, size))] + async fn add_entry( + &mut self, + name: &OsStr, + mode: u16, + file_type: FileType, + parent_ino: impl Into + Send + Debug, + size: u64, + ) -> Result { + let parent_ino = parent_ino.into(); + debug!("add_entry: (0) name:{:20?}; parent: {}", name, parent_ino); + let ino = self.generate_ino(); // Generate a new inode number + let now = std::time::SystemTime::now(); + //TODO: write the actual creation and modification time, not just now + let attr = FileAttr { + ino: ino.into(), + size: size, + /* TODO: set block size to something usefull. + maybe set it to 0 but when the file is cached set it to however big the + file in the cache is? that way it shows the actual size in blocks that are + used*/ + blocks: 0, + atime: now, + mtime: now, + ctime: now, + crtime: now, + kind: file_type, + perm: mode, + nlink: 1, + uid: 0, + gid: 0, + rdev: 0, + /*TODO: set the actual block size?*/ + blksize: 4096, + flags: 0, + }; + + let parent_drive_id = self.get_drive_id(parent_ino); + let drive_id: DriveId = self.source.get_id(name, parent_drive_id).await?; + debug!("add_entry: (1) drive_id: {:?}", drive_id); + + let parent_local_path = self.get_path_from_ino(parent_ino); + let parent_path: PathBuf = parent_local_path + .ok_or(anyhow!("could not get local path"))? + .into(); + + self.get_entries_mut().insert( + ino, + DriveEntry::new(ino, name, drive_id, parent_path.join(name), attr, None), + ); + + self.add_child(parent_ino, &ino); + debug!("add_entry: (2) after adding count: {}", self.entries.len()); + Ok(ino) + } +} + +// endregion + +//region some convenience functions/implementations +// fn check_if_entry_is_cached + +//endregion + +//region filesystem +impl Filesystem for DriveFilesystem { + //region init + #[instrument(skip(_req, _config), fields(% self))] + fn init( + &mut self, + _req: &Request<'_>, + _config: &mut KernelConfig, + ) -> std::result::Result<(), c_int> { + debug!("init"); + + let root = self.root.to_path_buf(); + let x = run_async_blocking(self.add_dir_entry(&root, Inode::from(FUSE_ROOT_ID), true)); + if let Err(e) = x { + error!("could not add root entry: {}", e); + } + Ok(()) + } + //endregion + //region destroy + #[instrument(fields(% self))] + fn destroy(&mut self) { + debug!("destroy"); + self.file_uploader_sender.send(FileUploaderCommand::Stop); + } + //endregion + //region lookup + #[instrument(skip(_req, reply), fields(% self))] + fn lookup(&mut self, _req: &Request<'_>, parent: u64, name: &OsStr, reply: ReplyEntry) { + debug!("lookup: {}:{:?}", parent, name); + let parent = parent.into(); + let children = self.children.get(&parent); + if children.is_none() { + warn!("lookup: could not find children for {}", parent); + reply.error(libc::ENOENT); + return; + } + let children = children.unwrap().clone(); + debug!("lookup: children: {:?}", children); + for child_inode in children { + + run_async_blocking(self.update_entry_metadata_cache_if_needed(child_inode)); + let entry = self.entries.get(&child_inode); + if entry.is_none() { + warn!("lookup: could not find entry for {}", child_inode); + continue; + } + let entry = entry.unwrap(); + + let path: PathBuf = entry.name.clone().into(); + let accepted = name.eq_ignore_ascii_case(&path); + debug!( + "entry: {}:(accepted={}){:?}; {:?}", + child_inode, accepted, path, entry.attr + ); + if accepted { + reply.entry(&self.settings.time_to_live(), &entry.attr, self.generation); + return; + } + } + warn!("lookup: could not find entry for {:?}", name); + + reply.error(libc::ENOENT); + } + //endregion + //region getattr + #[instrument(skip(_req, reply), fields(% self))] + fn getattr(&mut self, _req: &Request<'_>, ino: u64, reply: ReplyAttr) { + debug!("getattr: {}", ino); + run_async_blocking(self.update_entry_metadata_cache_if_needed(ino)); + let entry = self.entries.get(&ino.into()); + if let Some(entry) = entry { + reply.attr(&self.settings.time_to_live(), &entry.attr); + } else { + reply.error(libc::ENOENT); + } + } + //endregion + //region setattr + #[instrument(skip(_req, reply), fields(% self))] + fn setattr( + &mut self, + _req: &Request<'_>, + ino: u64, + mode: Option, + uid: Option, + gid: Option, + size: Option, + _atime: Option, + _mtime: Option, + _ctime: Option, + /*TODO: check if this change need to be implemented*/ + fh: Option, + _crtime: Option, + /*TODO: check if this change need to be implemented*/ + _chgtime: Option, + /*TODO: check if this change need to be implemented*/ + _bkuptime: Option, + flags: Option, + reply: ReplyAttr, + ) { + // debug!("setattr: {}", ino); + + debug!( + "setattr: {}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}", + ino, + mode, + uid, + gid, + size, + _atime, + _mtime, + _ctime, + fh, + _crtime, + _chgtime, + _bkuptime, + flags + ); + let ttl = self.settings.time_to_live(); + let entry = self.get_entry_mut(ino); + if let None = entry { + error!("setattr: could not find entry for {}", ino); + reply.error(libc::ENOENT); + return; + } + let mut entry = entry.unwrap(); + let attr = &mut entry.attr; + + if let Some(mode) = mode { + attr.perm = mode as u16; + } + if let Some(uid) = uid { + attr.uid = uid; + } + if let Some(gid) = gid { + attr.gid = gid; + } + if let Some(size) = size { + attr.size = size; + } + if let Some(flags) = flags { + attr.flags = flags; + } + reply.attr(&ttl, &attr); + //TODO: update file on drive if necessary + } + //endregion + //region write + #[instrument(skip(_req, reply), fields(% self, data = data.len()))] + fn write(&mut self, + _req: &Request<'_>, + ino: u64, + fh: u64, + offset: i64, + data: &[u8], + write_flags: u32, + flags: i32, + lock_owner: Option, + reply: ReplyWrite) { + debug!( + "write: {}:{}:{}:{:#x?}:{:?}:{:#x?}:{:?}", + ino, fh, offset, flags, lock_owner, write_flags, data, + ); + let cache_update_success: Result<()> = run_async_blocking(self.update_cache_if_needed(ino)); + if let Err(e) = cache_update_success { + error!("write: could not update cache: {}", e); + reply.error(libc::EIO); + return; + } + let cache_dir = self.cache_dir + .as_ref() + .map(|s| s.to_path_buf()); + if let None = cache_dir { + error!("write: cache dir not set"); + reply.error(libc::ENOENT); + return; + } + let cache_dir = cache_dir.unwrap(); + { + let entry = self.get_entry_mut(ino); + if let None = entry { + error!("write: could not find entry for {}", ino); + reply.error(libc::ENOENT); + return; + } + let mut entry = entry.unwrap(); + //TODO: queue uploads on a separate thread + + let path = Self::construct_cache_path_for_entry(&cache_dir, &entry); + // let path = entry.local_path.to_path_buf(); + debug!("opening file: {:?}", &path); + let file = OpenOptions::new() + .truncate(false) + .create(true) + .write(true) + .open(&path); + if let Err(e) = file { + error!("write: could not open file: {:?}: {}", path, e); + reply.error(libc::ENOENT); + return; + } + let mut file = file.unwrap(); + + debug!("writing file: {:?} at {} with size {}", + &path, + offset, + data.len() + ); + + file.seek(SeekFrom::Start(offset as u64)).unwrap(); + file.write_all(data).unwrap(); + let size = data.len(); + // let size = file.write_at(data, offset as u64); + // if let Err(e) = size { + // error!("write: could not write file: {:?}: {}", path, e); + // reply.error(libc::ENOENT); + // return; + // } + // let size = size.unwrap(); + debug!("wrote file: {:?} at {}; wrote {} bits", &path, offset, size); + reply.written(size as u32); + //TODO: update size in entry if necessary + debug!("updating size to {} for entry: {:?}", entry.attr.size, entry); + let mut attr = &mut entry.attr; + attr.size = attr.size.max(offset as u64 + size as u64); + let now = SystemTime::now(); + attr.mtime = now; + attr.ctime = now; + debug!("updated size to {} for entry: {:?}", entry.attr.size, entry); + debug!("write done for entry: {:?}", entry); + } + let entry = self.get_entry_r(&ino.into()) + .expect("how could this happen to me. I swear it was there a second ago"); + run_async_blocking(self.schedule_upload(&entry)); + } + //endregion + //region read + #[instrument(skip(_req, reply), fields(% self))] + fn read( + &mut self, + _req: &Request<'_>, + ino: u64, + fh: u64, + offset: i64, + size: u32, + flags: i32, + lock_owner: Option, + reply: ReplyData, + ) { + debug!( + "read: {:10}:{:2}:{:3}:{:10}:{:10X}:{:?}", + ino, fh, offset, size, flags, lock_owner + ); + + run_async_blocking(self.update_entry_metadata_cache_if_needed(ino)); + let x: Result<()> = run_async_blocking(self.update_cache_if_needed(ino)); + if let Err(e) = x { + error!("read: could not update cache: {}", e); + reply.error(libc::EIO); + return; + } + // let is_cached = self.check_if_file_is_cached(ino); + // if !is_cached.unwrap_or(false) { + // debug!("read: file is not cached: {}", ino); + // let x: Result = run_async_blocking(self.download_file_to_cache(ino)); + // + // if let Err(e) = x { + // error!("read: could not download file: {}", e); + // reply.error(libc::ENOENT); + // return; + // } + // } + + let entry = self.get_entry_r(&ino.into()); + if let Err(e) = entry { + error!("read: could not find entry for {}: {}", ino, e); + reply.error(libc::ENOENT); + return; + } + let entry = entry.unwrap(); + + let path = self.get_cache_path_for_entry(&entry); + if let Err(e) = path { + error!("read: could not get cache path: {}", e); + reply.error(libc::ENOENT); + return; + } + let path = path.unwrap(); + + debug!("read: path: {:?}", path); + let file = std::fs::File::open(&path); + if let Err(e) = file { + error!("read: could not open file: {}", e); + reply.error(libc::EIO); + return; + } + let mut file = file.unwrap(); + + let mut buf = vec![0; size as usize]; + debug!("reading file: {:?} at {} with size {}", &path, offset, size); + file.read_at(&mut buf, offset as u64).unwrap(); + debug!("read file: {:?} at {}", &path, offset); + reply.data(&buf); + } + //endregion + //region readdir + #[instrument(skip(_req, reply), fields(% self, ino, fh, offset))] + fn readdir( + &mut self, + _req: &Request<'_>, + ino: u64, + fh: u64, + mut offset: i64, + mut reply: ReplyDirectory, + ) { + debug!("readdir: {}:{}:{:?}", ino, fh, offset); + run_async_blocking(self.update_entry_metadata_cache_if_needed(ino)); + let children = self.children.get(&ino.into()); + if let Some(attr) = self.get_entries().get(&ino.into()).map(|entry| entry.attr) { + if attr.kind != FileType::Directory { + reply.error(libc::ENOTDIR); + return; + } + } + if children.is_none() { + reply.error(libc::ENOENT); + return; + } + + let children = children.unwrap(); + debug!("children ({}): {:?}", children.len(), children); + for child_inode in children.iter().skip(offset as usize) { + let entry = self.entries.get(child_inode).unwrap(); + let path: PathBuf = entry.local_path.clone().into(); + let attr = entry.attr; + let inode = (*child_inode).into(); + // Increment the offset for each processed entry + offset += 1; + debug!("entry: {}:{:?}; {:?}", inode, path, attr); + if reply.add(inode, offset, attr.kind, &entry.name) { + // If the buffer is full, we need to stop + debug!("readdir: buffer full"); + break; + } + } + debug!("readdir: ok"); + reply.ok(); + } + //endregion + //region access + #[instrument(fields(% self, ino, mask))] + fn access(&mut self, _req: &Request<'_>, ino: u64, mask: i32, reply: ReplyEmpty) { + reply.ok(); //TODO: implement this correctly + } + //endregion +} +//endregion diff --git a/src/fs/drive/mod.rs b/src/fs/drive/mod.rs index 7a44072..e3b32d5 100644 --- a/src/fs/drive/mod.rs +++ b/src/fs/drive/mod.rs @@ -1,741 +1,9 @@ -use crate::{ - google_drive::{DriveId, GoogleDrive}, - fs::CommonEntry, - fs::inode::Inode, - fs::common::CommonFilesystem, - common::LocalPath, - prelude::*, - async_helper::run_async_blocking, -}; -use anyhow::{anyhow, Error}; -use async_recursion::async_recursion; -use drive3::api::File; -use fuser::{ - FileAttr, FileType, Filesystem, KernelConfig, ReplyAttr, ReplyData, ReplyDirectory, ReplyEmpty, - ReplyEntry, ReplyOpen, ReplyStatfs, ReplyWrite, ReplyXattr, Request, TimeOrNow, FUSE_ROOT_ID, -}; -use futures::TryFutureExt; -use libc::c_int; -use log::{debug, error, warn}; -use mime::Mime; -use std::{ - any::Any, - collections::HashMap, - ffi::{OsStr, OsString}, - fmt::Display, - fs::OpenOptions, - os::unix::prelude::*, - path::{Path, PathBuf}, - time::{Duration, SystemTime, UNIX_EPOCH}, -}; -use std::io::{Seek, SeekFrom, Write}; -use tempfile::TempDir; -use tokio::{ - io::{stdin, AsyncBufReadExt}, - runtime::Runtime, -}; pub use entry::*; - +pub use filesystem::*; +pub use file_uploader::*; +pub use settings::*; mod entry; - -#[derive(Debug)] -pub struct DriveFilesystem { - // runtime: Runtime, - /// the point where the filesystem is mounted - root: PathBuf, - /// the source dir to read from and write to - source: GoogleDrive, - /// the cache dir to store the files in - cache_dir: Option, - - /// How long the responses can/should be cached - time_to_live: Duration, - - entries: HashMap, - - children: HashMap>, - - /// The generation of the filesystem - /// This is used to invalidate the cache - /// when the filesystem is remounted - generation: u64, -} - -// region general -impl DriveFilesystem { - pub async fn new(root: impl AsRef) -> Result { - debug!("new: {:?};", root.as_ref()); - let mut entries = HashMap::new(); - let now = SystemTime::now(); - // Add root directory with inode number 1 - let root_attr = FileAttr { - ino: FUSE_ROOT_ID, - size: 0, - blocks: 0, - atime: now, - mtime: now, - ctime: now, - crtime: now, - kind: FileType::Directory, - perm: 0o755, - nlink: 2, - uid: 0, - gid: 0, - rdev: 0, - blksize: 4096, - flags: 0, - }; - let inode = FUSE_ROOT_ID.into(); - entries.insert( - inode, - DriveEntry { - ino: inode, - name: "root".into(), - local_path: LocalPath::from(Path::new("")), - drive_id: DriveId::root(), - // drive_path: "/".into(), - attr: root_attr, - }, - ); - - let cache_dir = tempfile::tempdir()?; - debug!("cache_dir: {:?}", cache_dir.path()); - if !cache_dir.path().exists() { - debug!("creating cache dir: {:?}", cache_dir.path()); - std::fs::create_dir_all(cache_dir.path())?; - } else { - debug!("cache dir exists: {}", cache_dir.path().display()); - } - let mut s = Self { - root: root.as_ref().to_path_buf(), - source: GoogleDrive::new().await?, - cache_dir: Some(cache_dir), - time_to_live: Duration::from_secs(2), - entries, - /*TODO: implement a way to increase this if necessary*/ - generation: 0, - children: HashMap::new(), - }; - // - // let root = s.root.to_path_buf(); - // s.add_dir_entry(&root, Inode::from(FUSE_ROOT_ID), true) - // .await; - - Ok(s) - } - fn get_cache_dir_for_file(&self, inode: Inode) -> Result { - debug!("get_cache_dir_for_file: {}", inode); - let cache_dir = self.cache_dir.as_ref().ok_or(anyhow!("no cache dir"))?; - debug!( - "get_cache_dir_for_file: {}, cache_dir: {}", - inode, - cache_dir.path().display() - ); - let entry = self - .get_entry(inode) - .ok_or(anyhow!("could not get entry"))?; - debug!( - "get_cache_dir_for_file: entry local_path: {}", - entry.local_path.display() - ); - let path = Self::construct_cache_folder_path(cache_dir.path(), entry); - debug!("get_cache_dir_for_file: {}: {}", inode, path.display()); - Ok(path) - } - - fn construct_cache_folder_path(cache_dir: &Path, entry: &DriveEntry) -> PathBuf { - let folder_path = match entry.local_path.parent() { - Some(p) => p.as_os_str(), - None => OsStr::new(""), - }; - debug!("construct_cache_folder_path: folder_path: {:?}", folder_path); - let path = cache_dir.join(folder_path); - debug!("construct_cache_folder_path: {}", path.display()); - path - } - #[async_recursion::async_recursion] - async fn add_dir_entry( - &mut self, - folder_path: &Path, - parent_ino: Inode, - skip_self: bool, - ) -> Result<()> { - let ino; - debug!( - "add_dir_entry: {:?}; parent: {}; skip_self: {} ", - folder_path, parent_ino, skip_self - ); - if self.root == folder_path { - ino = parent_ino; - } else { - debug!("add_dir_entry: adding entry for {:?}", folder_path); - ino = self - .add_entry( - folder_path.file_name().ok_or(anyhow!("invalid filename"))?, - /*TODO: correct permissions*/ - 0o755, - FileType::Directory, - parent_ino, - /*TODO: implement size for folders*/ 0, - ) - .await?; - } - - let drive = &self.source; - - let folder_drive_id: DriveId = self - .get_drive_id(ino) - .ok_or(anyhow!("could not find dir drive_id"))?; - debug!( - "add_dir_entry: getting files for '{:50?}' {}", - folder_drive_id, - folder_path.display() - ); - let files; - { - let files_res = self.source.list_files(folder_drive_id).await; - if let Err(e) = files_res { - warn!("could not get files: {}", e); - return Ok(()); - } - files = files_res.unwrap(); - } - debug!("got {} files", files.len()); - // let d = std::fs::read_dir(folder_path); - - for entry in files { - debug!("entry: {:?}", entry); - let name = entry.name.as_ref().ok_or_else(|| "no name"); - if let Err(e) = name { - warn!("could not get name: {}", e); - continue; - } - let name = name.as_ref().unwrap(); - if name.contains("/") || name.contains("\\") || name.contains(":") { - warn!("invalid name: {}", name); - continue; - } - let path = folder_path.join(&name); - - if let None = &entry.mime_type { - warn!("could not get mime_type"); - continue; - } - - let mime_type = entry.mime_type.as_ref().unwrap(); - if mime_type == "application/vnd.google-apps.document" - || mime_type == "application/vnd.google-apps.spreadsheet" - || mime_type == "application/vnd.google-apps.drawing" - || mime_type == "application/vnd.google-apps.form" - || mime_type == "application/vnd.google-apps.presentation" - || mime_type == "application/vnd.google-apps.drive-sdk" - || mime_type == "application/vnd.google-apps.script" - //TODO: add all relevant mime types - { - debug!( - "skipping google file: mime_type: '{}' entry: {:?}", - mime_type, entry - ); - continue; - } else if mime_type == "application/vnd.google-apps.folder" { - debug!("adding folder: {:?}", path); - let res = self.add_dir_entry(&path, ino, false).await; - if let Err(e) = res { - warn!("could not add folder: {}", e); - continue; - } - // } else if metadata.is_file() { - } else { - debug!("adding file: '{}' {:?}", mime_type, path); - let size = match Self::get_size_from_drive_metadata(&entry) { - Some(value) => value, - None => continue, - }; - let mode = 0o644; //TODO: get mode from settings - self.add_file_entry(ino, &OsString::from(&name), mode as u16, size) - .await; - } - } - - Ok(()) - } - - fn get_size_from_drive_metadata(entry: &File) -> Option { - let size = entry.size.ok_or_else(|| 0); - if let Err(e) = size { - warn!("could not get size: {}", e); - return None; - } - let size = size.unwrap(); - if size < 0 { - warn!("invalid size: {}", size); - return None; - } - let size = size as u64; - Some(size) - } - fn get_drive_id(&self, ino: impl Into) -> Option { - self.get_entry(ino).map(|e| e.drive_id.clone()) - } -} -// endregion - -// region caching -impl DriveFilesystem { - async fn download_file_to_cache(&self, ino: impl Into) -> Result { - let ino = ino.into(); - debug!("download_file_to_cache: {}", ino); - let entry = self.get_entry_r(ino)?; - let drive_id = entry.drive_id.clone(); - let drive = &self.source; - let path = self.get_cache_path_for_entry(&entry)?; - let folder = path.parent().unwrap(); - if !folder.exists() { - debug!("creating folder: {}", folder.display()); - std::fs::create_dir_all(folder)?; - } - debug!("downloading file: {}", path.display()); - drive.download_file(drive_id, &path).await?; - Ok(path) - } - fn check_if_file_is_cached(&self, ino: impl Into) -> Result { - let entry = self.get_entry_r(ino)?; - let path = self.get_cache_path_for_entry(&entry)?; - let exists = path.exists(); - Ok(exists) - } - - fn get_cache_path_for_entry(&self, entry: &DriveEntry) -> Result { - debug!("get_cache_path_for_entry: {}", entry.ino); - let cache_folder = match self.cache_dir.as_ref() { - Some(x) => x.path() , - None => return Err(anyhow!("cache_dir is None").into()), - }; - let path = Self::construct_cache_path_for_entry(&cache_folder, entry); - Ok(path) - } - fn construct_cache_path_for_entry(cache_dir: &Path, entry: &DriveEntry) -> PathBuf { - debug!("construct_cache_path_for_entry: {} with cache_dir: {}", entry.ino, cache_dir.display()); - let path = Self::construct_cache_folder_path(cache_dir, entry).join(&entry.name); - debug!( - "get_cache_path_for_entry: {}: {}", - entry.ino, - path.display() - ); - path - } -} - -// endregion - -// region common -#[async_trait::async_trait] -impl CommonFilesystem for DriveFilesystem { - fn get_entries(&self) -> &HashMap { - &self.entries - } - - fn get_entries_mut(&mut self) -> &mut HashMap { - &mut self.entries - } - - fn get_children(&self) -> &HashMap> { - &self.children - } - - fn get_children_mut(&mut self) -> &mut HashMap> { - &mut self.children - } - - fn get_root_path(&self) -> LocalPath { - self.root.clone().into() - } - - async fn add_entry( - &mut self, - name: &OsStr, - mode: u16, - file_type: FileType, - parent_ino: impl Into + Send, - size: u64, - ) -> Result { - let parent_ino = parent_ino.into(); - debug!("add_entry: (0) name:{:20?}; parent: {}", name, parent_ino); - let ino = self.generate_ino(); // Generate a new inode number - let now = std::time::SystemTime::now(); - //TODO: write the actual creation and modification time, not just now - let attr = FileAttr { - ino: ino.into(), - size: size, - /* TODO: set block size to something usefull. - maybe set it to 0 but when the file is cached set it to however big the - file in the cache is? that way it shows the actual size in blocks that are - used*/ - blocks: 0, - atime: now, - mtime: now, - ctime: now, - crtime: now, - kind: file_type, - perm: mode, - nlink: 1, - uid: 0, - gid: 0, - rdev: 0, - /*TODO: set the actual block size?*/ - blksize: 4096, - flags: 0, - }; - - let parent_drive_id = self.get_drive_id(parent_ino); - let drive_id: DriveId = self.source.get_id(name, parent_drive_id).await?; - debug!("add_entry: (1) drive_id: {:?}", drive_id); - - let parent_local_path = self.get_path_from_ino(parent_ino); - let parent_path: PathBuf = parent_local_path - .ok_or(anyhow!("could not get local path"))? - .into(); - - self.get_entries_mut().insert( - ino, - DriveEntry::new(ino, name, drive_id, parent_path.join(name), attr), - ); - - self.add_child(parent_ino, &ino); - debug!("add_entry: (2) after adding count: {}", self.entries.len()); - Ok(ino) - } -} - -// endregion - -//region some convenience functions/implementations -impl Display for DriveFilesystem { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "DriveFilesystem at: '/{}'", self.root.display()) - } -} - -//endregion - -//region filesystem -impl Filesystem for DriveFilesystem { - //region init - fn init( - &mut self, - _req: &Request<'_>, - _config: &mut KernelConfig, - ) -> std::result::Result<(), c_int> { - debug!("init"); - - let root = self.root.to_path_buf(); - let x = run_async_blocking(self.add_dir_entry(&root, Inode::from(FUSE_ROOT_ID), true)); - if let Err(e) = x { - error!("could not add root entry: {}", e); - } - Ok(()) - } - //endregion - //region destroy - fn destroy(&mut self) { - debug!("destroy"); - debug!("destroy: removing cache dir: {:?}", self.cache_dir); - self.cache_dir = None; - } - //endregion - //region lookup - fn lookup(&mut self, _req: &Request<'_>, parent: u64, name: &OsStr, reply: ReplyEntry) { - debug!("lookup: {}:{:?}", parent, name); - let parent = parent.into(); - let children = self.children.get(&parent); - if children.is_none() { - warn!("lookup: could not find children for {}", parent); - reply.error(libc::ENOENT); - return; - } - let children = children.unwrap(); - debug!("lookup: children: {:?}", children); - for child_inode in children { - let entry = self.entries.get(child_inode); - if entry.is_none() { - warn!("lookup: could not find entry for {}", child_inode); - continue; - } - let entry = entry.unwrap(); - - let path: PathBuf = entry.name.clone().into(); - let accepted = name.eq_ignore_ascii_case(&path); - debug!( - "entry: {}:(accepted={}){:?}; {:?}", - child_inode, accepted, path, entry.attr - ); - if accepted { - reply.entry(&self.time_to_live, &entry.attr, self.generation); - return; - } - } - warn!("lookup: could not find entry for {:?}", name); - - reply.error(libc::ENOENT); - } - //endregion - //region getattr - fn getattr(&mut self, _req: &Request<'_>, ino: u64, reply: ReplyAttr) { - debug!("getattr: {}", ino); - let entry = self.entries.get(&ino.into()); - if let Some(entry) = entry { - reply.attr(&self.time_to_live, &entry.attr); - } else { - reply.error(libc::ENOENT); - } - } - //endregion - //region setattr - fn setattr( - &mut self, - _req: &Request<'_>, - ino: u64, - mode: Option, - uid: Option, - gid: Option, - size: Option, - _atime: Option, - _mtime: Option, - _ctime: Option, - /*TODO: check if this change need to be implemented*/ - fh: Option, - _crtime: Option, - /*TODO: check if this change need to be implemented*/ - _chgtime: Option, - /*TODO: check if this change need to be implemented*/ - _bkuptime: Option, - flags: Option, - reply: ReplyAttr, - ) { - // debug!("setattr: {}", ino); - - debug!( - "setattr: {}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}", - ino, - mode, - uid, - gid, - size, - _atime, - _mtime, - _ctime, - fh, - _crtime, - _chgtime, - _bkuptime, - flags - ); - let ttl = self.time_to_live.clone(); - let entry = self.get_entry_mut(ino); - if let None = entry { - error!("setattr: could not find entry for {}", ino); - reply.error(libc::ENOENT); - return; - } - let mut entry = entry.unwrap(); - let attr = &mut entry.attr; - - if let Some(mode) = mode { - attr.perm = mode as u16; - } - if let Some(uid) = uid { - attr.uid = uid; - } - if let Some(gid) = gid { - attr.gid = gid; - } - if let Some(size) = size { - attr.size = size; - } - if let Some(flags) = flags { - attr.flags = flags; - } - reply.attr(&ttl, &attr); - //TODO: update file on drive if necessary - } - //endregion - //region write - fn write(&mut self, _req: &Request<'_>, ino: u64, fh: u64, offset: i64, data: &[u8], write_flags: u32, flags: i32, lock_owner: Option, reply: ReplyWrite) { - debug!( - "write: {}:{}:{}:{:#x?}:{:?}:{:#x?}:{:?}", - ino, fh, offset, flags, lock_owner, write_flags, data, - ); - let cache_dir = self.cache_dir - .as_ref() - .map(|c| c.path().to_path_buf()) - .clone(); - if let None = cache_dir { - error!("write: cache dir not set"); - reply.error(libc::ENOENT); - return; - } - let cache_dir = cache_dir.unwrap(); - let entry = self.get_entry_mut(&ino.into()); - if let None = entry { - error!("write: could not find entry for {}", ino); - reply.error(libc::ENOENT); - return; - } - let mut entry = entry.unwrap(); - //TODO: queue uploads on a separate thread - - let path = Self::construct_cache_path_for_entry(&cache_dir, &entry); - // let path = entry.local_path.to_path_buf(); - debug!("opening file: {:?}", &path); - let file = OpenOptions::new() - .truncate(false) - .create(true) - .write(true) - .open(&path); - if let Err(e) = file { - error!("write: could not open file: {:?}: {}", path, e); - reply.error(libc::ENOENT); - return; - } - let mut file = file.unwrap(); - - debug!("writing file: {:?} at {} with size {}", - &path, - offset, - data.len() - ); - - file.seek(SeekFrom::Start(offset as u64)).unwrap(); - file.write_all(data).unwrap(); - let size = data.len(); - // let size = file.write_at(data, offset as u64); - // if let Err(e) = size { - // error!("write: could not write file: {:?}: {}", path, e); - // reply.error(libc::ENOENT); - // return; - // } - // let size = size.unwrap(); - debug!("wrote file: {:?} at {}; wrote {} bits", &path, offset, size); - reply.written(size as u32); - //TODO: update size in entry if necessary - debug!("updating size to {} for entry: {:?}", entry.attr.size, entry); - let mut attr = &mut entry.attr; - attr.size = attr.size.max(offset as u64 + size as u64); - let now = SystemTime::now(); - attr.mtime = now; - attr.ctime = now; - debug!("updated size to {} for entry: {:?}", entry.attr.size, entry); - debug!("write done for entry: {:?}", entry); - } - //endregion - //region read - fn read( - &mut self, - _req: &Request<'_>, - ino: u64, - fh: u64, - offset: i64, - size: u32, - flags: i32, - lock_owner: Option, - reply: ReplyData, - ) { - debug!( - "read: {:10}:{:2}:{:3}:{:10}:{:10X}:{:?}", - ino, fh, offset, size, flags, lock_owner - ); - - let entry = self.get_entry_r(&ino.into()); - if let Err(e) = entry { - error!("read: could not find entry for {}: {}", ino, e); - reply.error(libc::ENOENT); - return; - } - let entry = entry.unwrap(); - - let is_cached = self.check_if_file_is_cached(ino); - if !is_cached.unwrap_or(false) { - debug!("read: file is not cached: {}", ino); - let x: Result = run_async_blocking(self.download_file_to_cache(ino)); - - if let Err(e) = x { - error!("read: could not download file: {}", e); - reply.error(libc::ENOENT); - return; - } - } - - let path = self.get_cache_path_for_entry(&entry); - if let Err(e) = path { - error!("read: could not get cache path: {}", e); - reply.error(libc::ENOENT); - return; - } - let path = path.unwrap(); - - debug!("read: path: {:?}", path); - let file = std::fs::File::open(&path); - if let Err(e) = file { - error!("read: could not open file: {}", e); - reply.error(libc::ENOENT); - return; - } - let mut file = file.unwrap(); - - let mut buf = vec![0; size as usize]; - debug!("reading file: {:?} at {} with size {}", &path, offset, size); - file.read_at(&mut buf, offset as u64).unwrap(); - debug!("read file: {:?} at {}", &path, offset); - reply.data(&buf); - } - //endregion - //region readdir - fn readdir( - &mut self, - _req: &Request<'_>, - ino: u64, - fh: u64, - mut offset: i64, - mut reply: ReplyDirectory, - ) { - debug!("readdir: {}:{}:{:?}", ino, fh, offset); - let children = self.children.get(&ino.into()); - if let Some(attr) = self.get_entries().get(&ino.into()).map(|entry| entry.attr) { - if attr.kind != FileType::Directory { - reply.error(libc::ENOTDIR); - return; - } - } - if children.is_none() { - reply.error(libc::ENOENT); - return; - } - - let children = children.unwrap(); - debug!("children ({}): {:?}", children.len(), children); - for child_inode in children.iter().skip(offset as usize) { - let entry = self.entries.get(child_inode).unwrap(); - let path: PathBuf = entry.local_path.clone().into(); - let attr = entry.attr; - let inode = (*child_inode).into(); - // Increment the offset for each processed entry - offset += 1; - debug!("entry: {}:{:?}; {:?}", inode, path, attr); - if reply.add(inode, offset, attr.kind, &entry.name) { - // If the buffer is full, we need to stop - debug!("readdir: buffer full"); - break; - } - } - debug!("readdir: ok"); - reply.ok(); - } - //endregion - //region access - fn access(&mut self, _req: &Request<'_>, ino: u64, mask: i32, reply: ReplyEmpty) { - reply.ok(); //TODO: implement this correctly - } - //endregion -} -//endregion +mod filesystem; +mod file_uploader; +mod settings; \ No newline at end of file diff --git a/src/fs/drive/settings.rs b/src/fs/drive/settings.rs new file mode 100644 index 0000000..2eaccba --- /dev/null +++ b/src/fs/drive/settings.rs @@ -0,0 +1,51 @@ +use std::fmt::{Display, Formatter}; +use std::path::Path; +use std::time::Duration; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SyncSettings { + /// How long the responses can/should be cached + time_to_live: Duration, + /// How long the files should be cached before checking + /// for updates + /// + /// this does not necessarily mean that the file will + /// be downloaded again, it just checks the modified time + /// on the remote against the local file + cache_time: Duration, +} + +impl SyncSettings { + pub fn new(time_to_live: Duration, cache_time: Duration) -> Self { + Self { + time_to_live, + cache_time, + } + } + // pub fn from_path(path: &Path)-> Self{ + // let s = Self{ + // time_to_live: Duration::from_secs(60), + // cache_time: None, + // }; + // s + // } +} + +// region getters +impl SyncSettings { + pub fn time_to_live(&self) -> Duration { + self.time_to_live + } + pub fn cache_time(&self) -> Duration { + self.cache_time + } +} + +// endregion +impl Display for SyncSettings { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "SyncSettings {{ ttl: {}s, cache_time: {}s }}", + self.time_to_live.as_secs(), + self.cache_time.as_secs()) + } +} diff --git a/src/fs/sample.rs b/src/fs/sample.rs index f0d228d..b9d4d51 100644 --- a/src/fs/sample.rs +++ b/src/fs/sample.rs @@ -10,10 +10,10 @@ use fuser::{ ReplyOpen, ReplyStatfs, ReplyWrite, ReplyXattr, Request, TimeOrNow, FUSE_ROOT_ID, }; use libc::c_int; -use log::{debug, warn}; +use tracing::{debug, warn}; use std::collections::HashMap; use std::ffi::{OsStr, OsString}; -use std::fmt::Display; +use std::fmt::{Debug, Display}; use std::fs::OpenOptions; use std::os::unix::prelude::*; use std::path::{Path, PathBuf}; @@ -157,7 +157,7 @@ impl CommonFilesystem for SampleFilesystem { name: &OsStr, mode: u16, file_type: FileType, - parent_ino: impl Into + Send, + parent_ino: impl Into + Send+ Debug, size: u64, ) -> Result { let parent_ino = parent_ino.into(); diff --git a/src/google_drive/drive.rs b/src/google_drive/drive.rs index 59f4464..c3fe30c 100644 --- a/src/google_drive/drive.rs +++ b/src/google_drive/drive.rs @@ -1,33 +1,72 @@ -use crate::google_drive::{drive, helpers, DriveId}; -use crate::prelude::*; use std::ffi::{OsStr, OsString}; -// use drive3::api::Scope::File; -use anyhow::anyhow; -use drive3::api::{File, Scope}; -use drive3::client::ReadSeek; -use drive3::hyper::body::HttpBody; -use drive3::hyper::client::HttpConnector; -use drive3::hyper::{body, Body, Response}; -use drive3::hyper_rustls::HttpsConnector; -use drive3::DriveHub; -use drive3::{hyper_rustls, oauth2}; -use futures::{Stream, StreamExt}; -use hyper::Client; -use log::{debug, trace, warn}; -use mime::{FromStrError, Mime}; -use std::fmt::{Debug, Error}; +use std::fmt::{Debug, Display, Error}; use std::io::Write; use std::path::{Path, PathBuf}; use std::str::FromStr; +use std::time::SystemTime; + +// use drive3::api::Scope::File; +use anyhow::{anyhow, Context}; +use drive3::{hyper_rustls, oauth2}; +use drive3::api::{File, Scope}; +use drive3::client::ReadSeek; +use drive3::DriveHub; +use drive3::hyper::{body, Body, Response}; +use drive3::hyper::body::HttpBody; +use drive3::hyper::client::HttpConnector; +use drive3::hyper_rustls::HttpsConnector; +use futures::{Stream, StreamExt}; +use hyper::Client; +use mime::{FromStrError, Mime}; +use tokio::{fs, io}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt}; use tokio::runtime::Runtime; -use tokio::{fs, io}; +use tracing::{debug, instrument, trace, warn}; +use tracing::field::debug; +use crate::google_drive::{drive, DriveId, helpers}; +use crate::prelude::*; + +#[derive(Clone)] pub struct GoogleDrive { hub: DriveHub>, } impl GoogleDrive { + #[instrument] + pub(crate) async fn get_metadata_for_file(&self, drive_id: DriveId) -> anyhow::Result { + let drive_id = drive_id.into_string().map_err(|_| anyhow!("invalid drive_id"))?; + let (response, file) = self + .hub + .files() + .get(&drive_id) + .param("fields", "id, name, modifiedTime, driveId, size, createdTime, viewedByMeTime") + .doit().await?; + + Ok(file) + } +} + +impl GoogleDrive { + #[instrument(skip(file), fields(file_name = file.name, file_id = file.drive_id))] + pub async fn upload_file_content_from_path(&self, file: File, path: &Path) -> anyhow::Result<()> { + update_file_content_on_drive_from_path(&self, file, path).await?; + Ok(()) + } +} + +impl GoogleDrive { + pub(crate) async fn get_modified_time(&self, drive_id: DriveId) -> Result { + let drive_id: OsString = drive_id.into(); + let drive_id = drive_id.into_string().map_err(|_| anyhow!("invalid drive_id"))?; + let (response, file) = self.hub.files().get(&drive_id).param("fields", "modifiedTime").doit().await?; + let x = file.modified_time.ok_or_else(|| anyhow!("modified_time not found"))?; + Ok(x.into()) + } +} + +impl GoogleDrive { + #[instrument] pub async fn download_file(&self, file_id: DriveId, target_file: &PathBuf) -> Result<()> { debug!( "download_file: file_id: {:50?} to {}", @@ -50,6 +89,7 @@ impl GoogleDrive { } impl GoogleDrive { + #[instrument] pub async fn get_id(&self, path: &OsStr, parent_drive_id: Option) -> Result { debug!("Get ID of '{:?}' with parent: {:?}", path, parent_drive_id); let path: OsString = path.into(); @@ -61,7 +101,7 @@ impl GoogleDrive { Some(parent_drive_id) => parent_drive_id, None => DriveId::from("root"), } - .into(); + .into(); let parent_drive_id = match parent_drive_id.into_string() { Ok(parent_drive_id) => parent_drive_id, Err(_) => return Err("invalid parent_drive_id".into()), @@ -110,6 +150,7 @@ impl GoogleDrive { } impl GoogleDrive { + #[instrument] pub(crate) async fn new() -> Result { let auth = drive3::oauth2::read_application_secret("auth/client_secret.json").await?; @@ -117,9 +158,9 @@ impl GoogleDrive { auth, oauth2::InstalledFlowReturnMethod::HTTPRedirect, ) - .persist_tokens_to_disk("auth/tokens.json") - .build() - .await?; + .persist_tokens_to_disk("auth/tokens.json") + .build() + .await?; let http_client = Client::builder().build( hyper_rustls::HttpsConnectorBuilder::new() .with_native_roots() @@ -133,21 +174,24 @@ impl GoogleDrive { let mut drive = GoogleDrive { hub }; Ok(drive) } - pub async fn list_files(&mut self, folder_id: DriveId) -> Result> { + #[instrument] + pub async fn list_files(&self, folder_id: DriveId) -> anyhow::Result> { + debug!("list_files: folder_id: {:?}", folder_id); let folder_id: OsString = folder_id.into(); let folder_id = match folder_id.into_string() { Ok(folder_id) => folder_id, - Err(_) => return Err("invalid folder_id".into()), + Err(_) => return Err(anyhow!("invalid folder_id")), }; if folder_id.is_empty() { - return Err("folder_id is empty".into()); + return Err(anyhow!("folder_id is empty")); } if folder_id.contains('\'') { - return Err("folder_id contains invalid character".into()); + return Err(anyhow!("folder_id contains invalid character")); } let mut files = Vec::new(); let mut page_token = None; loop { + debug!("list_files: page_token: {:?}", page_token); let (response, result) = self .hub .files() @@ -160,7 +204,9 @@ impl GoogleDrive { .q(format!("'{}' in parents", folder_id).as_str()) .doit() .await?; - files.extend(result.files.ok_or("no file list returned")?); + let result_files = result.files.ok_or(anyhow!("no file list returned"))?; + debug!("list_files: response: {:?}", result_files.len()); + files.extend(result_files); page_token = result.next_page_token; if page_token.is_none() { break; @@ -169,11 +215,19 @@ impl GoogleDrive { Ok(files) } } + impl Debug for GoogleDrive { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "GoogleDrive") } } + +impl Display for GoogleDrive { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "GoogleDrive") + } +} + pub async fn sample() -> Result<()> { debug!("sample"); @@ -340,27 +394,45 @@ async fn create_file_on_drive( debug!("create_file(): file: {:?}", file); Ok(file) } + +#[instrument(skip(file), fields(drive_id = file.drive_id))] pub async fn update_file_content_on_drive_from_path( drive: &GoogleDrive, file: google_drive3::api::File, source_path: &Path, -) -> Result<()> { +) -> anyhow::Result<()> { + debug!("update_file_content_on_drive_from_path(): source_path: {:?}", source_path); + // { + // debug!("reading content from file for testing"); + // let content = std::fs::File::open(source_path)?; + // let mut content = tokio::fs::File::from_std(content); + // let mut s = String::new(); + // content.read_to_string(&mut s).await?; + // debug!("update_file_content_on_drive_from_path(): content: {:?}", s); + // } + let content = fs::File::open(source_path).await?; + update_file_content_on_drive(drive, file, content).await?; Ok(()) } + +#[instrument(skip(file, content))] async fn update_file_content_on_drive( drive: &GoogleDrive, file: google_drive3::api::File, content: fs::File, -) -> Result<()> { +) -> anyhow::Result<()> { let stream = content.into_std().await; + // let stream = content; let mime_type = helpers::get_mime_from_file_metadata(&file)?; - let id = file.id.clone().unwrap(); + let id = file.drive_id.clone().with_context(|| "file metadata has no drive id")?; + debug!("starting upload"); let (response, file) = drive .hub .files() .update(file, &id) .upload(stream, mime_type) .await?; + debug!("upload done!"); debug!("update_file_on_drive(): response: {:?}", response); debug!("update_file_on_drive(): file: {:?}", file); Ok(()) diff --git a/src/google_drive/drive_id.rs b/src/google_drive/drive_id.rs index b4f752b..18f94e1 100644 --- a/src/google_drive/drive_id.rs +++ b/src/google_drive/drive_id.rs @@ -1,5 +1,5 @@ use std::ffi::OsString; -use std::fmt::Display; +use std::fmt::{Display, Pointer}; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct DriveId(OsString); @@ -7,6 +7,12 @@ impl DriveId { pub(crate) fn root() -> DriveId { DriveId(OsString::from("root")) } + pub fn as_str(&self) -> Option<&str> { + self.0.to_str() + } + pub fn into_string(self) -> Result { + self.0.into_string() + } } impl Into for DriveId { @@ -40,4 +46,4 @@ impl DriveId { pub fn new(id: impl Into) -> Self { Self(id.into()) } -} +} \ No newline at end of file diff --git a/src/google_drive/helpers.rs b/src/google_drive/helpers.rs index ebc72c4..3f85dba 100644 --- a/src/google_drive/helpers.rs +++ b/src/google_drive/helpers.rs @@ -4,11 +4,11 @@ use crate::google_drive::{DriveId, GoogleDrive}; use crate::prelude::*; use anyhow::anyhow; use drive3::api::File; -use log::debug; +use tracing::debug; use mime::Mime; use std::path::{Path, PathBuf}; use std::str::FromStr; -pub fn get_mime_from_file_metadata(file: &File) -> Result { +pub fn get_mime_from_file_metadata(file: &File) -> anyhow::Result { Ok(Mime::from_str( &file.mime_type.as_ref().unwrap_or(&"*/*".to_string()), )?) @@ -47,23 +47,23 @@ pub fn get_drive_id_from_local_path(drive: &DriveFilesystem, path: &Path) -> Res } mod test { use super::*; - #[tokio::test] - async fn test_get_drive_id_from_local_path() { - crate::init_logger(); - let path = Path::new("/drive1"); - let drive = DriveFilesystem::new(path).await; - let drive_mount_point = Path::new("/drive1"); - - let drive_id = get_drive_id_from_local_path(&drive, path).unwrap(); - assert_eq!(drive_id, "root".into()); - - let path = Path::new("/drive1/"); - let drive_id = get_drive_id_from_local_path(&drive, path).unwrap(); - assert_eq!(drive_id, "root".into()); - - let path = Path::new("/drive1/dir1/dir2/file1.txt"); - let drive_id = get_drive_id_from_local_path(&drive, path).unwrap(); - todo!("create assert for this test"); - // assert_eq!(drive_id, "TODO".into()); - } + // #[tokio::test] + // async fn test_get_drive_id_from_local_path() { + // crate::init_logger(); + // let path = Path::new("/drive1"); + // let drive = DriveFilesystem::new(path, GoogleDrive::new().await?).await; + // let drive_mount_point = Path::new("/drive1"); + // + // let drive_id = get_drive_id_from_local_path(&drive, path).unwrap(); + // assert_eq!(drive_id, "root".into()); + // + // let path = Path::new("/drive1/"); + // let drive_id = get_drive_id_from_local_path(&drive, path).unwrap(); + // assert_eq!(drive_id, "root".into()); + // + // let path = Path::new("/drive1/dir1/dir2/file1.txt"); + // let drive_id = get_drive_id_from_local_path(&drive, path).unwrap(); + // todo!("create assert for this test"); + // // assert_eq!(drive_id, "TODO".into()); + // } } diff --git a/src/lib.rs b/src/lib.rs index c44a1d3..4cff914 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,33 +1,48 @@ #![allow(dead_code, unused)] -use google_drive3::oauth2::read_application_secret; -use std::error::Error; -use std::time::{Duration, SystemTime}; - extern crate google_drive3 as drive3; + +use std::error::Error; +use std::ffi::OsStr; +use std::path::Path; +use std::time::{Duration, SystemTime}; +use std::time::UNIX_EPOCH; + +use async_trait::async_trait; +use drive3::{DriveHub, hyper, hyper_rustls, oauth2}; use drive3::api::Channel; -use drive3::{hyper, hyper_rustls, oauth2, DriveHub}; -use log::{debug, error, info, trace, warn}; +use fuser::{FileAttr, Filesystem, FileType, FUSE_ROOT_ID, MountOption, ReplyAttr, ReplyData, ReplyDirectory, ReplyEmpty, ReplyEntry, ReplyOpen, ReplyWrite, ReplyXattr, Request, Session, SessionUnmounter, TimeOrNow}; +use google_drive3::oauth2::read_application_secret; // use nix; -use notify::{recommended_watcher, INotifyWatcher, RecommendedWatcher}; -use tokio::io::{stdin, AsyncReadExt}; -use tokio::sync::mpsc::channel; +use notify::{INotifyWatcher, recommended_watcher, RecommendedWatcher}; +use tempfile::TempDir; +use tokio::io::{AsyncReadExt, stdin}; +use tokio::runtime::Runtime; +use tokio::sync::mpsc; +use tokio::sync::mpsc::{channel, Sender}; +use tokio::task::JoinHandle; +use tracing::{debug, error, info, trace, warn}; + +use prelude::*; + +use crate::config::common_file_filter::CommonFileFilter; +use crate::fs::drive::{DriveFilesystem, DriveFileUploader, FileUploaderCommand, SyncSettings}; +use crate::fs::sample::SampleFilesystem; +use crate::google_drive::GoogleDrive; + pub mod async_helper; pub mod common; pub mod fs; pub mod google_drive; pub mod prelude; - -use prelude::*; +pub mod config; #[cfg(test)] mod tests { use super::*; + fn init_logger() { - let _ = env_logger::builder() - .filter_level(log::LevelFilter::Trace) - .is_test(true) - .try_init(); + todo!("init logger (tracing)") } #[tokio::test] @@ -37,16 +52,11 @@ mod tests { } } -pub fn init_logger() { - env_logger::builder() - .filter_level(log::LevelFilter::Debug) - .try_init(); -} - pub async fn sample() -> Result<()> { //Test file id: "1IotISYu3cF7JrOdfFPKNOkgYg1-ii5Qs" list_files().await } + async fn list_files() -> Result<()> { debug!("Hello, world!"); let secret: oauth2::ApplicationSecret = read_application_secret("auth/client_secret.json") @@ -56,9 +66,9 @@ async fn list_files() -> Result<()> { secret, oauth2::InstalledFlowReturnMethod::HTTPRedirect, ) - .persist_tokens_to_disk("auth/token_store.json") - .build() - .await?; + .persist_tokens_to_disk("auth/token_store.json") + .build() + .await?; let hub = DriveHub::new( hyper::Client::builder().build( @@ -93,13 +103,6 @@ async fn list_files() -> Result<()> { ); Ok(()) } -use fuser::{ - FileAttr, FileType, Filesystem, MountOption, ReplyAttr, ReplyData, ReplyDirectory, ReplyEmpty, - ReplyEntry, ReplyOpen, ReplyWrite, ReplyXattr, Request, TimeOrNow, FUSE_ROOT_ID, -}; -use std::ffi::OsStr; -use std::path::Path; -use std::time::UNIX_EPOCH; #[derive(Default)] struct MyFS { @@ -117,16 +120,12 @@ struct MyFS { main_name: String, } -use crate::fs::drive::DriveFilesystem; -use crate::fs::sample::SampleFilesystem; -use async_trait::async_trait; -use tokio::runtime::Runtime; - struct DirEntry { ino: u64, name: String, file_type: FileType, } + impl MyFS { fn get_attr(&self, ino: u64) -> Option { // Get the file attributes based on the inode number @@ -264,6 +263,7 @@ impl MyFS { } } } + #[async_trait] impl Filesystem for MyFS { fn open(&mut self, _req: &Request<'_>, _ino: u64, _flags: i32, reply: ReplyOpen) { @@ -381,6 +381,7 @@ impl Filesystem for MyFS { } } } + pub async fn watch_file_reading() -> Result<()> { let mountpoint = "/tmp/fuse/1"; let options = vec![ @@ -402,11 +403,12 @@ pub async fn watch_file_reading() -> Result<()> { mountpoint, &options, ) - .unwrap(); + .unwrap(); debug!("Exiting..."); Ok(()) } + pub async fn sample_fs() -> Result<()> { let mountpoint = "/tmp/fuse/1"; let source = "/tmp/fuse/2"; @@ -419,18 +421,92 @@ pub async fn sample_fs() -> Result<()> { debug!("Exiting..."); Ok(()) } + pub async fn sample_drive_fs() -> Result<()> { let mountpoint = "/tmp/fuse/3"; + let upload_ignore_path = Path::new("config/.upload_ignore"); + let settings_path = Path::new("config/settings.json"); + + let cache_dir = get_cache_dir()?; + let upload_ignore = CommonFileFilter::from_path(upload_ignore_path)?; + let sync_settings = SyncSettings::new(Duration::from_secs(2), Duration::from_secs(20)); // let source = "/tmp/fuse/2"; - let options = vec![MountOption::RW]; + let drive = GoogleDrive::new().await?; + // let file_uploader = FileUploader::new("config/credentials.json", "config/token.json"); + let (file_uploader_sender, file_uploader_receiver) = mpsc::channel(1); + let mut file_uploader = DriveFileUploader::new(drive.clone(), + upload_ignore, + file_uploader_receiver, + cache_dir.path().to_path_buf(), + Duration::from_secs(3)); debug!("Mounting fuse filesystem at {}", mountpoint); - let fs = DriveFilesystem::new(mountpoint).await?; + let fs = DriveFilesystem::new(mountpoint, + Path::new(""), + file_uploader_sender.clone(), + drive, + cache_dir.into_path(), + sync_settings, + ).await?; - fuser::mount2(fs, mountpoint, &options).unwrap(); + // let session_unmounter = + let mount_options = vec![MountOption::RW]; - debug!("Exiting..."); + let uploader_handle: JoinHandle<()> = tokio::spawn(async move { file_uploader.listen().await; }); + let end_signal_handle: JoinHandle<()> = mount(fs, &mountpoint, &mount_options, file_uploader_sender).await?; + tokio::try_join!(uploader_handle, end_signal_handle)?; + + // tokio::spawn(async move { + // end_program_signal_awaiter(file_uploader_sender, session_unmounter).await?; + // }); + // fuser::mount2(fs, &mountpoint, &options).unwrap(); + + + debug!("Exiting gracefully..."); Ok(()) } + +fn get_cache_dir() -> Result { + let cache_dir = tempfile::tempdir()?; + debug!("cache_dir: {:?}", cache_dir.path()); + if !cache_dir.path().exists() { + debug!("creating cache dir: {:?}", cache_dir.path()); + std::fs::create_dir_all(cache_dir.path())?; + } else { + debug!("cache dir exists: {}", cache_dir.path().display()); + } + Ok(cache_dir) +} + +async fn mount(fs: DriveFilesystem, + mountpoint: &str, + options: &[MountOption], + sender: Sender) -> Result> { + let mut session = Session::new(fs, mountpoint.as_ref(), options)?; + let session_ender = session.unmount_callable(); + let end_program_signal_handle = tokio::spawn(async move { + end_program_signal_awaiter(sender, session_ender).await; + }); + debug!("Mounting fuse filesystem" ); + session.run(); + debug!("Finished with mounting"); + // Ok(session_ender) + Ok(end_program_signal_handle) +} + +async fn end_program_signal_awaiter(file_uploader_sender: Sender, + mut session_unmounter: SessionUnmounter) -> Result<()> { + tokio::signal::ctrl_c().await.expect("failed to listen for ctrl_c event"); + + info!("got signal to end program"); + file_uploader_sender.send(FileUploaderCommand::Stop).await?; + info!("sent stop command to file uploader"); + info!("unmounting..."); + session_unmounter.unmount()?; + info!("unmounted"); + Ok(()) +} + +/* // pub async fn watch_file_reading() -> Result<()> { // let temp_file = tempfile::NamedTempFile::new()?; // let file_path = temp_file.path(); @@ -474,3 +550,4 @@ pub async fn sample_drive_fs() -> Result<()> { // info!("Done (nix)"); // Ok(()) // } +*/ diff --git a/src/main.rs b/src/main.rs index 28fccb6..316a4cf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,57 @@ +use tokio::io::AsyncReadExt; +use tracing::instrument::WithSubscriber; +use tracing::span; + #[tokio::main] async fn main() { - drive_syncer::init_logger(); + // drive_syncer::init_logger(); + init_tracing(); // drive_syncer::sample().await.unwrap(); // drive_syncer::google_drive::sample().await.unwrap(); // drive_syncer::watch_file_reading().await.unwrap(); // drive_syncer::sample_nix().await.unwrap(); // drive_syncer::sample_fs().await.unwrap(); + + sample_logging().await; drive_syncer::sample_drive_fs().await.unwrap(); } + +fn init_tracing() { + // use tracing::Level; + // use tracing_subscriber::fmt; + // use tracing_subscriber::EnvFilter; + // // Create a new subscriber with the default configuration + // let subscriber = fmt::Subscriber::builder() + // + // // .with_thread_ids(true) + // .with_env_filter(EnvFilter::from_default_env()) + // .with_max_level(Level::DEBUG) + // .with_line_number(true) + // .with_target(true) + // .with_file(true) + // // .with_span_events(fmt::format::FmtSpan::NONE) + // .finish(); + // + // // Install the subscriber as the default for this thread + // tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); + console_subscriber::init(); + tracing::info!("tracing initialized"); +} + +#[tracing::instrument] +async fn sample_logging() { + use tracing::{debug, error, info, trace, warn}; + info!("info"); + debug!("debug"); + let s = span!(tracing::Level::TRACE, "span around trace and warn with stdin read"); + { + let _x = s.enter(); + trace!("trace"); + let mut string = [0u8; 1]; + // info!("press any key to continue"); + // tokio::io::stdin().read(&mut string).await.expect("failed to read stdin"); + warn!("warn"); + } + error!("error"); +}