Implement writing to Google Drive with caching, switch to tracing for logging, and change the overall caching behaviour

OMGeeky
2023-05-18 18:50:54 +02:00
parent baa94b2053
commit 121180e15b
17 changed files with 1591 additions and 845 deletions

2
.cargo/config.toml Normal file
View File

@@ -0,0 +1,2 @@
[build]
rustflags = ["--cfg", "tokio_unstable"]

View File

@@ -7,8 +7,7 @@ edition = "2021"
[dependencies]
google-drive3 = "5.0"
tokio = { version = "1.28", features = ["full"] }
log = "0.4"
tokio = { version = "1.28", features = ["full", "tracing"] }
env_logger = "0.10"
#nix = { version = "0.26", features = ["mount"] }
tempfile = "3.5.0"
@@ -23,4 +22,8 @@ futures = "0.3.28"
hyper = { version = "0.14.24", features = ["full", "stream"] }
mime = "0.3"
anyhow = "1.0"
async-recursion = "1.0.4"
async-recursion = "1.0.4"
ignore = "0.4.20"
tracing = "0.1.37"
tracing-subscriber = "0.3.17"
console-subscriber = "0.1.9"
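These dependency changes wire the binary for tracing and tokio-console: tracing, tracing-subscriber, and console-subscriber are added here, and the new .cargo/config.toml above sets the tokio_unstable cfg that console-subscriber needs. A minimal sketch of how they could be initialised at startup (the corresponding startup code is not shown in this section, so the function name and layer setup here are assumptions):

use tracing_subscriber::prelude::*;

fn init_logging() {
    // console_subscriber::spawn() starts the tokio-console instrumentation and
    // returns a layer; the fmt layer keeps human-readable log output alongside it.
    let console_layer = console_subscriber::spawn();
    tracing_subscriber::registry()
        .with(console_layer)
        .with(tracing_subscriber::fmt::layer())
        .init();
}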

View File

@@ -1,4 +1,5 @@
use log::{debug, trace};
use std::fmt::Debug;
use tracing::{debug, trace};
use std::future::Future;
use tokio::runtime::{Handle, Runtime};
@@ -7,12 +8,13 @@ use tokio::runtime::{Handle, Runtime};
/// This function will block the current thread until the provided future has run to completion.
///
/// # Be careful with deadlocks
pub fn run_async_blocking<T>(f: impl std::future::Future<Output = T> + Sized) -> T {
pub fn run_async_blocking<T>(f: impl std::future::Future<Output = T> + Sized) -> T
where T: Debug {
trace!("run_async");
let handle = Handle::current();
handle.enter();
trace!("run_async: entered handle");
let result = futures::executor::block_on(f);
trace!("run_async: got result");
trace!("run_async: got result: {:?}", result);
result
}
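The new Debug bound exists so the result can be logged in the added trace line. Because the function blocks on futures::executor::block_on while borrowing the ambient Tokio handle, the deadlock caveat applies whenever it is called from a task that the blocked-on future itself depends on. A minimal, hypothetical usage sketch (not part of this diff), assuming a runtime has been entered:

fn example() -> anyhow::Result<()> {
    let rt = tokio::runtime::Runtime::new()?;
    let _guard = rt.enter(); // Handle::current() inside run_async_blocking needs an active runtime
    let answer = run_async_blocking(async { 40 + 2 }); // i32 satisfies the new T: Debug bound
    assert_eq!(answer, 42);
    Ok(())
}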

View File

@@ -0,0 +1,22 @@
use std::path::{Path, PathBuf};
use crate::prelude::*;
use ignore::gitignore;
use ignore::gitignore::{Gitignore, GitignoreBuilder};
#[derive(Debug)]
pub struct CommonFileFilter {
pub filter: Gitignore,
}
impl CommonFileFilter{
pub fn from_path(path: impl Into<PathBuf>) -> Result<Self> {
let path = path.into();
let ignores = GitignoreBuilder::new(&path)
.build()?;
let s = Self {
filter: ignores,
};
Ok(s)
}
pub fn is_filter_matched(&self, path: &Path) -> Result<bool> {
Ok(self.filter.matched(path, path.is_dir()).is_ignore())
}
}
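A short usage sketch (hypothetical paths, not part of this diff) showing how the filter is meant to be consulted before uploading, mirroring how DriveFileUploader::listen uses it below:

use std::path::Path;

fn should_upload(filter: &CommonFileFilter, path: &Path) -> bool {
    // is_filter_matched returns true when an ignore rule matches,
    // so a match means the file should be skipped.
    !filter.is_filter_matched(path).unwrap_or(false)
}

// let filter = CommonFileFilter::from_path("/some/config/dir")?; // illustrative path
// let upload = should_upload(&filter, Path::new("/some/config/dir/notes.txt"));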

1
src/config/mod.rs Normal file
View File

@@ -0,0 +1 @@
pub mod common_file_filter;

View File

@@ -6,9 +6,10 @@ use crate::prelude::*;
use anyhow::anyhow;
use async_trait::async_trait;
use fuser::{FileAttr, FileType, TimeOrNow, FUSE_ROOT_ID};
use log::debug;
use tracing::debug;
use std::collections::HashMap;
use std::ffi::{OsStr, OsString};
use std::fmt::Debug;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
@@ -145,7 +146,7 @@ pub trait CommonFilesystem<Entry: CommonEntry> {
name: &OsStr,
mode: u16,
file_type: FileType,
parent_ino: impl Into<Inode> + Send,
parent_ino: impl Into<Inode> + Send + Debug,
size: u64,
) -> Result<Inode>;

View File

@@ -3,7 +3,7 @@ use crate::fs::{CommonEntry, Inode};
use crate::google_drive::DriveId;
use fuser::FileAttr;
use std::ffi::{OsStr, OsString};
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone)]
pub struct DriveEntry {
pub ino: Inode,
pub drive_id: DriveId,
@@ -12,6 +12,9 @@ pub struct DriveEntry {
// pub drive_path: OsString,
pub local_path: LocalPath,
pub attr: FileAttr,
pub metadata_cache_time: Option<std::time::SystemTime>,
pub content_cache_time: Option<std::time::SystemTime>,
pub drive_metadata: Option<drive3::api::File>,
}
impl DriveEntry {
pub fn new(
@@ -21,6 +24,7 @@ impl DriveEntry {
local_path: impl Into<LocalPath>,
attr: FileAttr,
drive_metadata: Option<drive3::api::File>,
) -> Self {
let name = name.into();
let path = local_path.into();
@@ -31,6 +35,9 @@ impl DriveEntry {
// drive_path: path.clone().into(),
local_path: path,
attr,
metadata_cache_time: None,
content_cache_time: None,
drive_metadata,
}
}
}

View File

@@ -0,0 +1,208 @@
use std::collections::HashMap;
use std::error::Error;
use std::fmt::{Debug, Display, Formatter};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
use anyhow::anyhow;
use anyhow::Context;
use drive3::api::File;
use futures::TryFutureExt;
use tokio::io::{AsyncWriteExt, stdout};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::task::JoinHandle;
use tracing::{debug, error, info, instrument, warn};
use crate::config::common_file_filter::CommonFileFilter;
use crate::google_drive::{DriveId, GoogleDrive};
use crate::prelude::*;
#[derive(Debug, Clone)]
pub struct FileCommand {
path: PathBuf,
file_metadata: File,
}
impl FileCommand {
pub fn new(path: PathBuf, file_metadata: File) -> Self {
Self {
path,
file_metadata,
}
}
}
#[derive(Debug)]
struct RunningUpload {
join_handle: JoinHandle<anyhow::Result<()>>,
stop_sender: Sender<()>,
}
#[derive(Debug)]
pub enum FileUploaderCommand {
UploadChange(FileCommand),
CreateFolder(FileCommand),
CreateFile(FileCommand),
Stop,
}
#[derive(Debug)]
pub struct DriveFileUploader {
drive: GoogleDrive,
/// the filter to apply when uploading files
upload_filter: CommonFileFilter,
/// the queue of files to upload
upload_queue: Vec<PathBuf>,
receiver: Receiver<FileUploaderCommand>,
cache_dir: PathBuf,
wait_time_before_upload: Duration,
running_uploads: HashMap<String, RunningUpload>,
}
impl DriveFileUploader {
#[instrument]
pub fn new(drive: GoogleDrive,
upload_filter: CommonFileFilter,
receiver: Receiver<FileUploaderCommand>,
cache_dir: PathBuf,
wait_time_before_upload: Duration) -> Self {
Self {
drive,
upload_filter,
upload_queue: Vec::new(),
receiver,
cache_dir,
wait_time_before_upload,
running_uploads: HashMap::new(),
}
}
#[instrument(skip(self), fields(self.upload_queue = self.upload_queue.len(),
self.upload_filter = self.upload_filter.filter.num_ignores()))]
pub async fn listen(&mut self) {
info!("listening for file upload requests");
loop {
// while let Some(command) = self.receiver.recv().await {
let command = self.receiver.recv().await;
if let Some(command) = command {
debug!("received path: {:?}", command);
debug!("received path: {:?}", command);
match command {
FileUploaderCommand::UploadChange(file_command) => {
let path = file_command.path;
let file_metadata = file_command.file_metadata;
if !self.upload_filter.is_filter_matched(&path).unwrap_or(false) {
let drive = self.drive.clone();
let drive_id = file_metadata.drive_id.clone().with_context(|| "no drive_id");
if let Err(e) = drive_id {
error!("failed to upload file: {:?} with error: {}", path, e);
continue;
}
let drive_id = drive_id.unwrap();
self.cancel_and_wait_for_running_upload_for_id(&drive_id).await;
info!("queuing upload of file: {:?}", path);
let wait_time_before_upload = self.wait_time_before_upload.clone();
let (rx, rc) = channel(1);
let upload_handle = tokio::spawn(async move { Self::upload_file(drive, file_metadata, path, wait_time_before_upload, rc).await });
self.running_uploads.insert(drive_id, RunningUpload {
join_handle: upload_handle,
stop_sender: rx,
});
} else {
info!("skipping upload of file since it is ignored: {:?}", path);
}
}
FileUploaderCommand::Stop => {
info!("received stop command: stopping file upload listener");
break;
}
_ => { warn!("received unknown command: {:?}", command); }
};
} else {
warn!("received None command, meaning all senders have been dropped. \
stopping file upload listener since no more commands will be received");
break;
}
}
info!("file upload listener stopped");
}
/// This function checks whether there is a running upload for the given drive_id
/// and, if there is, sends it a stop command and waits for it to finish
async fn cancel_and_wait_for_running_upload_for_id(&mut self, drive_id: &String) {
debug!("checking for running uploads for file: {:?}", drive_id);
let running_uploads: Option<&mut RunningUpload> = self.running_uploads.get_mut(drive_id);
if let Some(running_upload) = running_uploads {
debug!("trying to send stop command to running upload for file: {:?}", drive_id);
running_upload.stop_sender.send(()).await;
debug!("waiting for running upload for file: {:?}", drive_id);
let x: &mut JoinHandle<anyhow::Result<()>> = &mut running_upload.join_handle;
tokio::join!(x);
debug!("finished waiting for running upload for file: {:?} ", drive_id);
debug!("removing running upload for file: {:?}", drive_id);
self.running_uploads.remove(drive_id);
}
}
#[instrument(skip(file_metadata))]
async fn upload_file(drive: GoogleDrive,
file_metadata: drive3::api::File,
local_path: PathBuf,
wait_time_before_upload: Duration,
rc: Receiver<()>) -> anyhow::Result<()> {
// debug!("uploading file: {:?}", local_path);
debug!("sleeping for {:?} before uploading {}", wait_time_before_upload, local_path.display());
tokio::select! {
_ = Self::wait_for_cancel_signal(rc) => {
debug!("received stop signal: stopping upload");
return Ok(());
},
_ = tokio::time::sleep(wait_time_before_upload)=> {
debug!("done sleeping");
return Self::upload_file_(&drive, file_metadata, &local_path, wait_time_before_upload)
.await
.map_err(|e| {
error!("error uploading file: {:?}: {:?}", local_path, e);
// FileUploadError {
// path: local_path,
// error: anyhow!(e),
anyhow!(e)
// }
});
}
}
}
#[instrument(skip(rc))]
async fn wait_for_cancel_signal(mut rc: Receiver<()>) {
match rc.recv().await {
Some(v) => {
debug!("received stop signal: stopping upload");
}
_ => { warn!("received None from cancel signal receiver") }
}
}
async fn upload_file_(drive: &GoogleDrive, file_metadata: File, local_path: &PathBuf, wait_duration: Duration) -> anyhow::Result<()> {
debug!("uploading file: {:?}", local_path);
let path = local_path.as_path();
drive.upload_file_content_from_path(file_metadata, path).await?;
// let result = drive.list_files(DriveId::from("root")).await.with_context(|| format!("could not do it"))?;
debug!("upload_file_: done");
Ok(())
}
}
pub struct FileUploadError {
path: PathBuf,
error: anyhow::Error,
}
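The uploader is driven entirely through the FileUploaderCommand channel: the filesystem keeps the Sender half and a dedicated task owns the Receiver. A hypothetical wiring sketch (channel capacity and wait time are illustrative; the actual setup lives outside this file):

let (tx, rx) = tokio::sync::mpsc::channel::<FileUploaderCommand>(16);
let mut uploader = DriveFileUploader::new(
    drive.clone(),                     // GoogleDrive
    filter,                            // CommonFileFilter
    rx,
    cache_dir.clone(),                 // PathBuf
    std::time::Duration::from_secs(5), // debounce before each upload
);
let uploader_task = tokio::spawn(async move { uploader.listen().await });

// later, e.g. when unmounting:
tx.send(FileUploaderCommand::Stop).await?;
uploader_task.await?;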

980
src/fs/drive/filesystem.rs Normal file
View File

@@ -0,0 +1,980 @@
use std::{
any::Any,
collections::HashMap,
ffi::{OsStr, OsString},
fmt::Display,
fs::OpenOptions,
os::unix::prelude::*,
path::{Path, PathBuf},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::io::{Seek, SeekFrom, Write};
use std::ops::Deref;
use anyhow::{anyhow, Context, Error};
use async_recursion::async_recursion;
use drive3::api::File;
use fuser::{
FileAttr,
Filesystem,
FileType,
FUSE_ROOT_ID,
KernelConfig,
ReplyAttr,
ReplyData,
ReplyDirectory,
ReplyEmpty,
ReplyEntry,
ReplyIoctl,
ReplyLock,
ReplyLseek,
ReplyOpen,
ReplyStatfs,
ReplyWrite,
ReplyXattr,
Request,
TimeOrNow,
};
use futures::TryFutureExt;
use libc::c_int;
use mime::Mime;
use tempfile::TempDir;
use tokio::{
io::{AsyncBufReadExt, stdin},
runtime::Runtime,
};
use tracing::{debug, error, instrument, warn};
use crate::{async_helper::run_async_blocking, common::LocalPath, config::common_file_filter::CommonFileFilter, fs::common::CommonFilesystem, fs::CommonEntry, fs::drive::DriveEntry, fs::inode::Inode, google_drive::{DriveId, GoogleDrive}, google_drive, prelude::*};
use crate::fs::drive::{FileCommand, FileUploaderCommand, SyncSettings};
enum CacheState {
Missing,
UpToDate,
RefreshNeeded,
}
#[derive(Debug)]
pub struct DriveFilesystem {
// runtime: Runtime,
/// the point where the filesystem is mounted
root: PathBuf,
/// the source dir to read from and write to
source: GoogleDrive,
/// the cache dir to store the files in
cache_dir: Option<PathBuf>,
/// the filter to apply when uploading files
// upload_filter: CommonFileFilter,
entries: HashMap<Inode, DriveEntry>,
children: HashMap<Inode, Vec<Inode>>,
/// with this we can send a path to the file uploader
/// to tell it to upload certain files.
file_uploader_sender: tokio::sync::mpsc::Sender<FileUploaderCommand>,
/// The generation of the filesystem
/// This is used to invalidate the cache
/// when the filesystem is remounted
generation: u64,
settings: SyncSettings,
}
impl Display for DriveFilesystem {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "DriveFilesystem {{ entries: {}, gen: {}, settings: {} }}",
// self.root.display(),
// self.cache_dir.as_ref().map(|dir| dir.path().display()),
self.entries.len(),
self.generation,
self.settings,
)
}
}
impl DriveFilesystem {
#[instrument(fields(% self, entry))]
async fn schedule_upload(&self, entry: &DriveEntry) -> Result<()> {
debug!("DriveFilesystem::schedule_upload(entry: {:?})", entry);
let path = self.get_cache_path_for_entry(entry)?;
let metadata = Self::create_drive_metadata_from_entry(entry)?;
debug!("schedule_upload: sending path to file uploader...");
self.file_uploader_sender.send(FileUploaderCommand::UploadChange(FileCommand::new(path, metadata))).await?;
debug!("schedule_upload: sent path to file uploader");
Ok(())
}
fn create_drive_metadata_from_entry(entry: &DriveEntry) -> Result<File> {
Ok(File {
drive_id: match entry.drive_id.clone().into_string() {
Ok(v) => Some(v),
Err(_) => None
},
// name: match entry.name.clone().into_string() {
// Ok(v) => Some(v),
// Err(_) => None
// },
// size: Some(entry.attr.size as i64),
// modified_time: Some(entry.attr.mtime.into()),
// file_extension: match entry.local_path.extension().clone() {
// Some(v) => v.to_str().map(|v| v.to_string()),
// None => None
// },
..Default::default()
})
}
}
// region general
impl DriveFilesystem {
#[instrument(skip(file_uploader_sender))]
pub async fn new(root: impl AsRef<Path> + Debug,
config_path: impl AsRef<Path> + Debug,
file_uploader_sender: tokio::sync::mpsc::Sender<FileUploaderCommand>,
drive: GoogleDrive, cache_dir: PathBuf,
settings: SyncSettings) -> Result<DriveFilesystem> {
let root = root.as_ref();
let config_path = config_path.as_ref();
debug!("DriveFilesystem::new(root:{}; config_path: {})", root.display(), config_path.display());
// let upload_filter = CommonFileFilter::from_path(config_path)?;
let mut entries = HashMap::new();
let now = SystemTime::now();
// Add root directory with inode number 1
let root_attr = FileAttr {
ino: FUSE_ROOT_ID,
size: 0,
blocks: 0,
atime: now,
mtime: now,
ctime: now,
crtime: now,
kind: FileType::Directory,
perm: 0o755,
nlink: 2,
uid: 0,
gid: 0,
rdev: 0,
blksize: 4096,
flags: 0,
};
let inode = Inode::from(FUSE_ROOT_ID);
entries.insert(
inode,
DriveEntry::new(
inode,
"root".to_string(),
DriveId::root(),
LocalPath::from(Path::new("")),
root_attr,
None,
),
);
let mut s = Self {
root: root.to_path_buf(),
source: drive,
cache_dir: Some(cache_dir),
entries,
file_uploader_sender,
/*TODO: implement a way to increase this if necessary*/
generation: 0,
children: HashMap::new(),
settings,
};
//
// let root = s.root.to_path_buf();
// s.add_dir_entry(&root, Inode::from(FUSE_ROOT_ID), true)
// .await;
Ok(s)
}
#[instrument(fields(% self, inode))]
fn get_cache_dir_for_file(&self, inode: Inode) -> Result<PathBuf> {
debug!("get_cache_dir_for_file: {}", inode);
let cache_dir = self.cache_dir.as_ref().ok_or(anyhow!("no cache dir"))?;
debug!(
"get_cache_dir_for_file: {}, cache_dir: {}",
inode,
cache_dir.display()
);
let entry = self
.get_entry(inode)
.ok_or(anyhow!("could not get entry"))?;
debug!(
"get_cache_dir_for_file: entry local_path: {}",
entry.local_path.display()
);
let path = Self::construct_cache_folder_path(cache_dir, entry);
debug!("get_cache_dir_for_file: {}: {}", inode, path.display());
Ok(path)
}
#[instrument]
fn construct_cache_folder_path(cache_dir: &Path, entry: &DriveEntry) -> PathBuf {
let folder_path = match entry.local_path.parent() {
Some(p) => p.as_os_str(),
None => OsStr::new(""),
};
debug!("construct_cache_folder_path: folder_path: {:?}", folder_path);
let path = cache_dir.join(folder_path);
debug!("construct_cache_folder_path: {}", path.display());
path
}
#[async_recursion::async_recursion]
#[instrument(fields(% self, folder_path, parent_ino, inline_self))]
async fn add_dir_entry(
&mut self,
folder_path: &Path,
parent_ino: Inode,
inline_self: bool,
) -> Result<()> {
let ino;
debug!(
"add_dir_entry: {:?}; parent: {}; inline_self: {} ",
folder_path, parent_ino, inline_self
);
if self.root == folder_path {
debug!("add_dir_entry: root folder");
ino = parent_ino;
} else if inline_self {
debug!("add_dir_entry: inlining self entry for {:?}", folder_path);
ino = parent_ino;
} else {
debug!("add_dir_entry: adding entry for {:?}", folder_path);
ino = self
.add_entry(
folder_path.file_name().ok_or(anyhow!("invalid filename"))?,
/*TODO: correct permissions*/
0o755,
FileType::Directory,
parent_ino,
/*TODO: implement size for folders*/ 0,
)
.await?;
}
let drive = &self.source;
let folder_drive_id: DriveId = self
.get_drive_id(ino)
.ok_or(anyhow!("could not find dir drive_id"))?;
debug!(
"add_dir_entry: getting files for '{:50?}' {}",
folder_drive_id,
folder_path.display()
);
let files;
{
let files_res = self.source.list_files(folder_drive_id).await;
if let Err(e) = files_res {
warn!("could not get files: {}", e);
return Ok(());
}
files = files_res.unwrap();
}
debug!("got {} files", files.len());
// let d = std::fs::read_dir(folder_path);
for entry in files {
debug!("entry: {:?}", entry);
let name = entry.name.as_ref().ok_or_else(|| "no name");
if let Err(e) = name {
warn!("could not get name: {}", e);
continue;
}
let name = name.as_ref().unwrap();
if name.contains("/") || name.contains("\\") || name.contains(":") {
warn!("invalid name: {}", name);
continue;
}
let path = folder_path.join(&name);
if let None = &entry.mime_type {
warn!("could not get mime_type");
continue;
}
let mime_type = entry.mime_type.as_ref().unwrap();
if mime_type == "application/vnd.google-apps.document"
|| mime_type == "application/vnd.google-apps.spreadsheet"
|| mime_type == "application/vnd.google-apps.drawing"
|| mime_type == "application/vnd.google-apps.form"
|| mime_type == "application/vnd.google-apps.presentation"
|| mime_type == "application/vnd.google-apps.drive-sdk"
|| mime_type == "application/vnd.google-apps.script"
//TODO: add all relevant mime types
{
debug!(
"skipping google file: mime_type: '{}' entry: {:?}",
mime_type, entry
);
continue;
} else if mime_type == "application/vnd.google-apps.folder" {
debug!("adding folder: {:?}", path);
let res = self.add_dir_entry(&path, ino, false).await;
if let Err(e) = res {
warn!("could not add folder: {}", e);
continue;
}
} else {
debug!("adding file: '{}' {:?}", mime_type, path);
let size = match Self::get_size_from_drive_metadata(&entry) {
Some(value) => value,
None => continue,
};
let mode = 0o644; //TODO: get mode from settings
self.add_file_entry(ino, &OsString::from(&name), mode as u16, size)
.await;
}
}
Ok(())
}
#[instrument]
fn get_size_from_drive_metadata(entry: &File) -> Option<u64> {
let size = entry.size.ok_or_else(|| 0);
if let Err(e) = size {
warn!("could not get size: {}", e);
return None;
}
let size = size.unwrap();
if size < 0 {
warn!("invalid size: {}", size);
return None;
}
let size = size as u64;
Some(size)
}
#[instrument(fields(% self, ino))]
fn get_drive_id(&self, ino: impl Into<Inode>) -> Option<DriveId> {
self.get_entry(ino).map(|e| e.drive_id.clone())
}
}
// endregion
// region caching
impl DriveFilesystem {
async fn download_file_to_cache(&mut self, ino: impl Into<Inode>) -> Result<PathBuf> {
let ino = ino.into();
debug!("download_file_to_cache: {}", ino);
let entry = self.get_entry_r(ino)?;
let drive_id = entry.drive_id.clone();
let drive = &self.source;
let cache_path = self.get_cache_path_for_entry(&entry)?;
let folder = cache_path.parent()
.ok_or(anyhow!("could not get the folder the cache file should be saved in"))?;
if !folder.exists() {
debug!("creating folder: {}", folder.display());
std::fs::create_dir_all(folder)?;
}
debug!("downloading file: {}", cache_path.display());
drive.download_file(drive_id, &cache_path).await?;
debug!("downloaded file: {}", cache_path.display());
let size = std::fs::metadata(&cache_path)?.len();
self.set_entry_size(ino, size);
self.set_entry_cached(ino)?;
//TODO: check if any other things need to be updated for the entry
Ok(cache_path)
}
fn set_entry_cached(&mut self, ino: Inode) -> Result<()> {
let mut entry = self.get_entry_mut(ino).ok_or(anyhow!("could not get entry"))?;
entry.content_cache_time = Some(SystemTime::now());
Ok(())
}
fn check_if_file_is_cached(&self, ino: impl Into<Inode> + Debug) -> Result<bool> {
let entry = self.get_entry_r(ino)?;
let path = self.get_cache_path_for_entry(&entry)?;
let exists = path.exists();
Ok(exists)
}
#[instrument(fields(% self))]
async fn update_entry_metadata_cache_if_needed(&mut self, ino: impl Into<Inode> + Debug) -> Result<()> {
//TODO: use the changes API so that not every file has to check for itself
// whether it needs an update; instead check once and then update the
// cache times for all files
let ino = ino.into();
let entry = self.get_entry_r(ino)?;
let refresh_cache = self.get_cache_state(&entry.metadata_cache_time);
match refresh_cache {
CacheState::RefreshNeeded | CacheState::Missing => {
debug!("refreshing metadata cache for drive_id: {:?}", entry.drive_id);
let metadata = self.source.get_metadata_for_file(entry.drive_id.clone()).await?;
self.update_entry_metadata(ino, &metadata)?;
self.set_entry_metadata_cached(ino)?;
}
CacheState::UpToDate => {
debug!("metadata cache is up to date");
}
_ => {
debug!("unknown cache state");
}
}
Ok(())
}
#[instrument(fields(% self))]
async fn update_cache_if_needed(&mut self, ino: impl Into<Inode> + Debug) -> Result<()> {
let ino = ino.into();
self.update_entry_metadata_cache_if_needed(ino).await?;
let entry = match self.get_entry_r(ino) {
Ok(entry) => entry,
Err(e) => {
warn!("could not get entry: {}", e);
return Err(e);
}
};
let refresh_cache = self.get_cache_state(&entry.content_cache_time);
match refresh_cache {
CacheState::Missing => {
debug!("no local cache for: {}, downloading...", ino);
self.download_file_to_cache(ino).await?;
}
CacheState::RefreshNeeded => {
debug!("cache needs refresh for: {}, checking for updated version...", ino);
let remote_mod_time: SystemTime = self.get_modified_time_on_remote(ino).await?;
debug!("remote_mod_time: {:?}", remote_mod_time);
let local_mod_time = self.get_entry_r(ino)?.attr.mtime;
debug!("local_mod_time: {:?}", local_mod_time);
if remote_mod_time > local_mod_time {
debug!("updating cached file since remote_mod_time: {:?} > local_mod_time: {:?}", remote_mod_time, local_mod_time);
self.download_file_to_cache(ino).await?;
} else {
debug!("local file is up to date: remote_mod_time: {:?} <= local_mod_time: {:?}", remote_mod_time, local_mod_time);
}
}
CacheState::UpToDate => {
debug!("Cache up to date for {} since {:?} > {}", ino, entry.content_cache_time.unwrap(), self.settings.cache_time().as_secs());
}
}
Ok(())
}
fn get_cache_state(&self, cache_time: &Option<SystemTime>) -> CacheState {
let refresh_cache: CacheState = match cache_time {
Some(cache_time) => {
let now = SystemTime::now();
let duration = now.duration_since(*cache_time).unwrap();
// let seconds = duration.as_secs();
if duration > self.settings.cache_time() {
CacheState::RefreshNeeded
} else {
CacheState::UpToDate
}
}
None => CacheState::Missing,
};
refresh_cache
}
fn get_cache_path_for_entry(&self, entry: &DriveEntry) -> Result<PathBuf> {
debug!("get_cache_path_for_entry: {}", entry.ino);
let cache_folder = match self.cache_dir.as_ref() {
Some(x) => x,
None => return Err(anyhow!("cache_dir is None").into()),
};
let path = Self::construct_cache_path_for_entry(&cache_folder, entry);
Ok(path)
}
fn construct_cache_path_for_entry(cache_dir: &Path, entry: &DriveEntry) -> PathBuf {
debug!("construct_cache_path_for_entry: {} with cache_dir: {}", entry.ino, cache_dir.display());
let path = Self::construct_cache_folder_path(cache_dir, entry).join(&entry.name);
debug!(
"get_cache_path_for_entry: {}: {}",
entry.ino,
path.display()
);
path
}
async fn get_modified_time_on_remote(&self, ino: Inode) -> Result<SystemTime> {
let entry = self.get_entry_r(ino)?;
let drive_id = entry.drive_id.clone();
let drive = &self.source;
let modified_time = drive.get_modified_time(drive_id).await?;
Ok(modified_time)
}
fn set_entry_size(&mut self, ino: Inode, size: u64) -> anyhow::Result<()> {
self.get_entry_mut(ino).context("no entry for ino")?.attr.size = size;
Ok(())
}
fn update_entry_metadata(&mut self, ino: Inode, drive_metadata: &google_drive3::api::File) -> anyhow::Result<()> {
let entry = self.get_entry_mut(ino).context("no entry with ino")?;
if let Some(name) = drive_metadata.name.as_ref() {
entry.name = OsString::from(name);
}
if let Some(size) = drive_metadata.size.as_ref() {
entry.attr.size = *size as u64;
}
if let Some(modified_time) = drive_metadata.modified_time.as_ref() {
entry.attr.mtime = (*modified_time).into();
}
if let Some(created_time) = drive_metadata.created_time.as_ref() {
entry.attr.ctime = (*created_time).into();
}
if let Some(viewed_by_me) = drive_metadata.viewed_by_me_time.as_ref(){
entry.attr.atime = (*viewed_by_me).into();
}
Ok(())
}
fn set_entry_metadata_cached(&mut self, ino: Inode) -> anyhow::Result<()> {
let mut entry = self.get_entry_mut(ino).context("no entry with ino")?;
entry.metadata_cache_time = Some(SystemTime::now());
Ok(())
}
}
// endregion
// region common
#[async_trait::async_trait]
impl CommonFilesystem<DriveEntry> for DriveFilesystem {
fn get_entries(&self) -> &HashMap<Inode, DriveEntry> {
&self.entries
}
fn get_entries_mut(&mut self) -> &mut HashMap<Inode, DriveEntry> {
&mut self.entries
}
fn get_children(&self) -> &HashMap<Inode, Vec<Inode>> {
&self.children
}
fn get_children_mut(&mut self) -> &mut HashMap<Inode, Vec<Inode>> {
&mut self.children
}
fn get_root_path(&self) -> LocalPath {
self.root.clone().into()
}
#[instrument(fields(% self, name, mode, file_type, parent_ino, size))]
async fn add_entry(
&mut self,
name: &OsStr,
mode: u16,
file_type: FileType,
parent_ino: impl Into<Inode> + Send + Debug,
size: u64,
) -> Result<Inode> {
let parent_ino = parent_ino.into();
debug!("add_entry: (0) name:{:20?}; parent: {}", name, parent_ino);
let ino = self.generate_ino(); // Generate a new inode number
let now = std::time::SystemTime::now();
//TODO: write the actual creation and modification time, not just now
let attr = FileAttr {
ino: ino.into(),
size: size,
/* TODO: set block size to something useful.
Maybe set it to 0, but once the file is cached set it to the size of the
file in the cache; that way it shows the actual number of blocks that are
used. */
blocks: 0,
atime: now,
mtime: now,
ctime: now,
crtime: now,
kind: file_type,
perm: mode,
nlink: 1,
uid: 0,
gid: 0,
rdev: 0,
/*TODO: set the actual block size?*/
blksize: 4096,
flags: 0,
};
let parent_drive_id = self.get_drive_id(parent_ino);
let drive_id: DriveId = self.source.get_id(name, parent_drive_id).await?;
debug!("add_entry: (1) drive_id: {:?}", drive_id);
let parent_local_path = self.get_path_from_ino(parent_ino);
let parent_path: PathBuf = parent_local_path
.ok_or(anyhow!("could not get local path"))?
.into();
self.get_entries_mut().insert(
ino,
DriveEntry::new(ino, name, drive_id, parent_path.join(name), attr, None),
);
self.add_child(parent_ino, &ino);
debug!("add_entry: (2) after adding count: {}", self.entries.len());
Ok(ino)
}
}
// endregion
//region some convenience functions/implementations
// fn check_if_entry_is_cached
//endregion
//region filesystem
impl Filesystem for DriveFilesystem {
//region init
#[instrument(skip(_req, _config), fields(% self))]
fn init(
&mut self,
_req: &Request<'_>,
_config: &mut KernelConfig,
) -> std::result::Result<(), c_int> {
debug!("init");
let root = self.root.to_path_buf();
let x = run_async_blocking(self.add_dir_entry(&root, Inode::from(FUSE_ROOT_ID), true));
if let Err(e) = x {
error!("could not add root entry: {}", e);
}
Ok(())
}
//endregion
//region destroy
#[instrument(fields(% self))]
fn destroy(&mut self) {
debug!("destroy");
let _ = run_async_blocking(self.file_uploader_sender.send(FileUploaderCommand::Stop));
}
//endregion
//region lookup
#[instrument(skip(_req, reply), fields(% self))]
fn lookup(&mut self, _req: &Request<'_>, parent: u64, name: &OsStr, reply: ReplyEntry) {
debug!("lookup: {}:{:?}", parent, name);
let parent = parent.into();
let children = self.children.get(&parent);
if children.is_none() {
warn!("lookup: could not find children for {}", parent);
reply.error(libc::ENOENT);
return;
}
let children = children.unwrap().clone();
debug!("lookup: children: {:?}", children);
for child_inode in children {
run_async_blocking(self.update_entry_metadata_cache_if_needed(child_inode));
let entry = self.entries.get(&child_inode);
if entry.is_none() {
warn!("lookup: could not find entry for {}", child_inode);
continue;
}
let entry = entry.unwrap();
let path: PathBuf = entry.name.clone().into();
let accepted = name.eq_ignore_ascii_case(&path);
debug!(
"entry: {}:(accepted={}){:?}; {:?}",
child_inode, accepted, path, entry.attr
);
if accepted {
reply.entry(&self.settings.time_to_live(), &entry.attr, self.generation);
return;
}
}
warn!("lookup: could not find entry for {:?}", name);
reply.error(libc::ENOENT);
}
//endregion
//region getattr
#[instrument(skip(_req, reply), fields(% self))]
fn getattr(&mut self, _req: &Request<'_>, ino: u64, reply: ReplyAttr) {
debug!("getattr: {}", ino);
run_async_blocking(self.update_entry_metadata_cache_if_needed(ino));
let entry = self.entries.get(&ino.into());
if let Some(entry) = entry {
reply.attr(&self.settings.time_to_live(), &entry.attr);
} else {
reply.error(libc::ENOENT);
}
}
//endregion
//region setattr
#[instrument(skip(_req, reply), fields(% self))]
fn setattr(
&mut self,
_req: &Request<'_>,
ino: u64,
mode: Option<u32>,
uid: Option<u32>,
gid: Option<u32>,
size: Option<u64>,
_atime: Option<TimeOrNow>,
_mtime: Option<TimeOrNow>,
_ctime: Option<SystemTime>,
/*TODO: check if this change needs to be implemented*/
fh: Option<u64>,
_crtime: Option<SystemTime>,
/*TODO: check if this change needs to be implemented*/
_chgtime: Option<SystemTime>,
/*TODO: check if this change needs to be implemented*/
_bkuptime: Option<SystemTime>,
flags: Option<u32>,
reply: ReplyAttr,
) {
// debug!("setattr: {}", ino);
debug!(
"setattr: {}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}",
ino,
mode,
uid,
gid,
size,
_atime,
_mtime,
_ctime,
fh,
_crtime,
_chgtime,
_bkuptime,
flags
);
let ttl = self.settings.time_to_live();
let entry = self.get_entry_mut(ino);
if let None = entry {
error!("setattr: could not find entry for {}", ino);
reply.error(libc::ENOENT);
return;
}
let mut entry = entry.unwrap();
let attr = &mut entry.attr;
if let Some(mode) = mode {
attr.perm = mode as u16;
}
if let Some(uid) = uid {
attr.uid = uid;
}
if let Some(gid) = gid {
attr.gid = gid;
}
if let Some(size) = size {
attr.size = size;
}
if let Some(flags) = flags {
attr.flags = flags;
}
reply.attr(&ttl, &attr);
//TODO: update file on drive if necessary
}
//endregion
//region write
#[instrument(skip(_req, reply), fields(% self, data = data.len()))]
fn write(&mut self,
_req: &Request<'_>,
ino: u64,
fh: u64,
offset: i64,
data: &[u8],
write_flags: u32,
flags: i32,
lock_owner: Option<u64>,
reply: ReplyWrite) {
debug!(
"write: {}:{}:{}:{:#x?}:{:?}:{:#x?}:{:?}",
ino, fh, offset, flags, lock_owner, write_flags, data,
);
let cache_update_success: Result<()> = run_async_blocking(self.update_cache_if_needed(ino));
if let Err(e) = cache_update_success {
error!("write: could not update cache: {}", e);
reply.error(libc::EIO);
return;
}
let cache_dir = self.cache_dir
.as_ref()
.map(|s| s.to_path_buf());
if let None = cache_dir {
error!("write: cache dir not set");
reply.error(libc::ENOENT);
return;
}
let cache_dir = cache_dir.unwrap();
{
let entry = self.get_entry_mut(ino);
if let None = entry {
error!("write: could not find entry for {}", ino);
reply.error(libc::ENOENT);
return;
}
let mut entry = entry.unwrap();
//TODO: queue uploads on a separate thread
let path = Self::construct_cache_path_for_entry(&cache_dir, &entry);
// let path = entry.local_path.to_path_buf();
debug!("opening file: {:?}", &path);
let file = OpenOptions::new()
.truncate(false)
.create(true)
.write(true)
.open(&path);
if let Err(e) = file {
error!("write: could not open file: {:?}: {}", path, e);
reply.error(libc::ENOENT);
return;
}
let mut file = file.unwrap();
debug!("writing file: {:?} at {} with size {}",
&path,
offset,
data.len()
);
file.seek(SeekFrom::Start(offset as u64)).unwrap();
file.write_all(data).unwrap();
let size = data.len();
// let size = file.write_at(data, offset as u64);
// if let Err(e) = size {
// error!("write: could not write file: {:?}: {}", path, e);
// reply.error(libc::ENOENT);
// return;
// }
// let size = size.unwrap();
debug!("wrote file: {:?} at {}; wrote {} bits", &path, offset, size);
reply.written(size as u32);
//TODO: update size in entry if necessary
debug!("updating size to {} for entry: {:?}", entry.attr.size, entry);
let mut attr = &mut entry.attr;
attr.size = attr.size.max(offset as u64 + size as u64);
let now = SystemTime::now();
attr.mtime = now;
attr.ctime = now;
debug!("updated size to {} for entry: {:?}", entry.attr.size, entry);
debug!("write done for entry: {:?}", entry);
}
let entry = self.get_entry_r(&ino.into())
.expect("how could this happen to me. I swear it was there a second ago");
run_async_blocking(self.schedule_upload(&entry));
}
//endregion
//region read
#[instrument(skip(_req, reply), fields(% self))]
fn read(
&mut self,
_req: &Request<'_>,
ino: u64,
fh: u64,
offset: i64,
size: u32,
flags: i32,
lock_owner: Option<u64>,
reply: ReplyData,
) {
debug!(
"read: {:10}:{:2}:{:3}:{:10}:{:10X}:{:?}",
ino, fh, offset, size, flags, lock_owner
);
run_async_blocking(self.update_entry_metadata_cache_if_needed(ino));
let x: Result<()> = run_async_blocking(self.update_cache_if_needed(ino));
if let Err(e) = x {
error!("read: could not update cache: {}", e);
reply.error(libc::EIO);
return;
}
// let is_cached = self.check_if_file_is_cached(ino);
// if !is_cached.unwrap_or(false) {
// debug!("read: file is not cached: {}", ino);
// let x: Result<PathBuf> = run_async_blocking(self.download_file_to_cache(ino));
//
// if let Err(e) = x {
// error!("read: could not download file: {}", e);
// reply.error(libc::ENOENT);
// return;
// }
// }
let entry = self.get_entry_r(&ino.into());
if let Err(e) = entry {
error!("read: could not find entry for {}: {}", ino, e);
reply.error(libc::ENOENT);
return;
}
let entry = entry.unwrap();
let path = self.get_cache_path_for_entry(&entry);
if let Err(e) = path {
error!("read: could not get cache path: {}", e);
reply.error(libc::ENOENT);
return;
}
let path = path.unwrap();
debug!("read: path: {:?}", path);
let file = std::fs::File::open(&path);
if let Err(e) = file {
error!("read: could not open file: {}", e);
reply.error(libc::EIO);
return;
}
let mut file = file.unwrap();
let mut buf = vec![0; size as usize];
debug!("reading file: {:?} at {} with size {}", &path, offset, size);
file.read_at(&mut buf, offset as u64).unwrap();
debug!("read file: {:?} at {}", &path, offset);
reply.data(&buf);
}
//endregion
//region readdir
#[instrument(skip(_req, reply), fields(% self, ino, fh, offset))]
fn readdir(
&mut self,
_req: &Request<'_>,
ino: u64,
fh: u64,
mut offset: i64,
mut reply: ReplyDirectory,
) {
debug!("readdir: {}:{}:{:?}", ino, fh, offset);
run_async_blocking(self.update_entry_metadata_cache_if_needed(ino));
let children = self.children.get(&ino.into());
if let Some(attr) = self.get_entries().get(&ino.into()).map(|entry| entry.attr) {
if attr.kind != FileType::Directory {
reply.error(libc::ENOTDIR);
return;
}
}
if children.is_none() {
reply.error(libc::ENOENT);
return;
}
let children = children.unwrap();
debug!("children ({}): {:?}", children.len(), children);
for child_inode in children.iter().skip(offset as usize) {
let entry = self.entries.get(child_inode).unwrap();
let path: PathBuf = entry.local_path.clone().into();
let attr = entry.attr;
let inode = (*child_inode).into();
// Increment the offset for each processed entry
offset += 1;
debug!("entry: {}:{:?}; {:?}", inode, path, attr);
if reply.add(inode, offset, attr.kind, &entry.name) {
// If the buffer is full, we need to stop
debug!("readdir: buffer full");
break;
}
}
debug!("readdir: ok");
reply.ok();
}
//endregion
//region access
#[instrument(fields(% self, ino, mask))]
fn access(&mut self, _req: &Request<'_>, ino: u64, mask: i32, reply: ReplyEmpty) {
reply.ok(); //TODO: implement this correctly
}
//endregion
}
//endregion
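Taken together with the uploader above, a mount might be wired roughly like this (a hypothetical sketch: paths, the fuser::mount2 call, and error handling are assumptions, and mount2 blocks until the filesystem is unmounted):

let fs = DriveFilesystem::new(
    "/mnt/drive",              // root / mount point (illustrative)
    "config/upload-ignore",    // config_path for the upload filter (illustrative)
    tx.clone(),                // Sender<FileUploaderCommand> from the uploader sketch
    drive.clone(),
    PathBuf::from("cache"),    // cache_dir (illustrative)
    settings,                  // SyncSettings (see src/fs/drive/settings.rs below)
)
.await?;
fuser::mount2(fs, "/mnt/drive", &[fuser::MountOption::FSName("drive-fs".to_string())])?;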

View File

@@ -1,741 +1,9 @@
use crate::{
google_drive::{DriveId, GoogleDrive},
fs::CommonEntry,
fs::inode::Inode,
fs::common::CommonFilesystem,
common::LocalPath,
prelude::*,
async_helper::run_async_blocking,
};
use anyhow::{anyhow, Error};
use async_recursion::async_recursion;
use drive3::api::File;
use fuser::{
FileAttr, FileType, Filesystem, KernelConfig, ReplyAttr, ReplyData, ReplyDirectory, ReplyEmpty,
ReplyEntry, ReplyOpen, ReplyStatfs, ReplyWrite, ReplyXattr, Request, TimeOrNow, FUSE_ROOT_ID,
};
use futures::TryFutureExt;
use libc::c_int;
use log::{debug, error, warn};
use mime::Mime;
use std::{
any::Any,
collections::HashMap,
ffi::{OsStr, OsString},
fmt::Display,
fs::OpenOptions,
os::unix::prelude::*,
path::{Path, PathBuf},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use std::io::{Seek, SeekFrom, Write};
use tempfile::TempDir;
use tokio::{
io::{stdin, AsyncBufReadExt},
runtime::Runtime,
};
pub use entry::*;
pub use filesystem::*;
pub use file_uploader::*;
pub use settings::*;
mod entry;
#[derive(Debug)]
pub struct DriveFilesystem {
// runtime: Runtime,
/// the point where the filesystem is mounted
root: PathBuf,
/// the source dir to read from and write to
source: GoogleDrive,
/// the cache dir to store the files in
cache_dir: Option<TempDir>,
/// How long the responses can/should be cached
time_to_live: Duration,
entries: HashMap<Inode, DriveEntry>,
children: HashMap<Inode, Vec<Inode>>,
/// The generation of the filesystem
/// This is used to invalidate the cache
/// when the filesystem is remounted
generation: u64,
}
// region general
impl DriveFilesystem {
pub async fn new(root: impl AsRef<Path>) -> Result<Self> {
debug!("new: {:?};", root.as_ref());
let mut entries = HashMap::new();
let now = SystemTime::now();
// Add root directory with inode number 1
let root_attr = FileAttr {
ino: FUSE_ROOT_ID,
size: 0,
blocks: 0,
atime: now,
mtime: now,
ctime: now,
crtime: now,
kind: FileType::Directory,
perm: 0o755,
nlink: 2,
uid: 0,
gid: 0,
rdev: 0,
blksize: 4096,
flags: 0,
};
let inode = FUSE_ROOT_ID.into();
entries.insert(
inode,
DriveEntry {
ino: inode,
name: "root".into(),
local_path: LocalPath::from(Path::new("")),
drive_id: DriveId::root(),
// drive_path: "/".into(),
attr: root_attr,
},
);
let cache_dir = tempfile::tempdir()?;
debug!("cache_dir: {:?}", cache_dir.path());
if !cache_dir.path().exists() {
debug!("creating cache dir: {:?}", cache_dir.path());
std::fs::create_dir_all(cache_dir.path())?;
} else {
debug!("cache dir exists: {}", cache_dir.path().display());
}
let mut s = Self {
root: root.as_ref().to_path_buf(),
source: GoogleDrive::new().await?,
cache_dir: Some(cache_dir),
time_to_live: Duration::from_secs(2),
entries,
/*TODO: implement a way to increase this if necessary*/
generation: 0,
children: HashMap::new(),
};
//
// let root = s.root.to_path_buf();
// s.add_dir_entry(&root, Inode::from(FUSE_ROOT_ID), true)
// .await;
Ok(s)
}
fn get_cache_dir_for_file(&self, inode: Inode) -> Result<PathBuf> {
debug!("get_cache_dir_for_file: {}", inode);
let cache_dir = self.cache_dir.as_ref().ok_or(anyhow!("no cache dir"))?;
debug!(
"get_cache_dir_for_file: {}, cache_dir: {}",
inode,
cache_dir.path().display()
);
let entry = self
.get_entry(inode)
.ok_or(anyhow!("could not get entry"))?;
debug!(
"get_cache_dir_for_file: entry local_path: {}",
entry.local_path.display()
);
let path = Self::construct_cache_folder_path(cache_dir.path(), entry);
debug!("get_cache_dir_for_file: {}: {}", inode, path.display());
Ok(path)
}
fn construct_cache_folder_path(cache_dir: &Path, entry: &DriveEntry) -> PathBuf {
let folder_path = match entry.local_path.parent() {
Some(p) => p.as_os_str(),
None => OsStr::new(""),
};
debug!("construct_cache_folder_path: folder_path: {:?}", folder_path);
let path = cache_dir.join(folder_path);
debug!("construct_cache_folder_path: {}", path.display());
path
}
#[async_recursion::async_recursion]
async fn add_dir_entry(
&mut self,
folder_path: &Path,
parent_ino: Inode,
skip_self: bool,
) -> Result<()> {
let ino;
debug!(
"add_dir_entry: {:?}; parent: {}; skip_self: {} ",
folder_path, parent_ino, skip_self
);
if self.root == folder_path {
ino = parent_ino;
} else {
debug!("add_dir_entry: adding entry for {:?}", folder_path);
ino = self
.add_entry(
folder_path.file_name().ok_or(anyhow!("invalid filename"))?,
/*TODO: correct permissions*/
0o755,
FileType::Directory,
parent_ino,
/*TODO: implement size for folders*/ 0,
)
.await?;
}
let drive = &self.source;
let folder_drive_id: DriveId = self
.get_drive_id(ino)
.ok_or(anyhow!("could not find dir drive_id"))?;
debug!(
"add_dir_entry: getting files for '{:50?}' {}",
folder_drive_id,
folder_path.display()
);
let files;
{
let files_res = self.source.list_files(folder_drive_id).await;
if let Err(e) = files_res {
warn!("could not get files: {}", e);
return Ok(());
}
files = files_res.unwrap();
}
debug!("got {} files", files.len());
// let d = std::fs::read_dir(folder_path);
for entry in files {
debug!("entry: {:?}", entry);
let name = entry.name.as_ref().ok_or_else(|| "no name");
if let Err(e) = name {
warn!("could not get name: {}", e);
continue;
}
let name = name.as_ref().unwrap();
if name.contains("/") || name.contains("\\") || name.contains(":") {
warn!("invalid name: {}", name);
continue;
}
let path = folder_path.join(&name);
if let None = &entry.mime_type {
warn!("could not get mime_type");
continue;
}
let mime_type = entry.mime_type.as_ref().unwrap();
if mime_type == "application/vnd.google-apps.document"
|| mime_type == "application/vnd.google-apps.spreadsheet"
|| mime_type == "application/vnd.google-apps.drawing"
|| mime_type == "application/vnd.google-apps.form"
|| mime_type == "application/vnd.google-apps.presentation"
|| mime_type == "application/vnd.google-apps.drive-sdk"
|| mime_type == "application/vnd.google-apps.script"
//TODO: add all relevant mime types
{
debug!(
"skipping google file: mime_type: '{}' entry: {:?}",
mime_type, entry
);
continue;
} else if mime_type == "application/vnd.google-apps.folder" {
debug!("adding folder: {:?}", path);
let res = self.add_dir_entry(&path, ino, false).await;
if let Err(e) = res {
warn!("could not add folder: {}", e);
continue;
}
// } else if metadata.is_file() {
} else {
debug!("adding file: '{}' {:?}", mime_type, path);
let size = match Self::get_size_from_drive_metadata(&entry) {
Some(value) => value,
None => continue,
};
let mode = 0o644; //TODO: get mode from settings
self.add_file_entry(ino, &OsString::from(&name), mode as u16, size)
.await;
}
}
Ok(())
}
fn get_size_from_drive_metadata(entry: &File) -> Option<u64> {
let size = entry.size.ok_or_else(|| 0);
if let Err(e) = size {
warn!("could not get size: {}", e);
return None;
}
let size = size.unwrap();
if size < 0 {
warn!("invalid size: {}", size);
return None;
}
let size = size as u64;
Some(size)
}
fn get_drive_id(&self, ino: impl Into<Inode>) -> Option<DriveId> {
self.get_entry(ino).map(|e| e.drive_id.clone())
}
}
// endregion
// region caching
impl DriveFilesystem {
async fn download_file_to_cache(&self, ino: impl Into<Inode>) -> Result<PathBuf> {
let ino = ino.into();
debug!("download_file_to_cache: {}", ino);
let entry = self.get_entry_r(ino)?;
let drive_id = entry.drive_id.clone();
let drive = &self.source;
let path = self.get_cache_path_for_entry(&entry)?;
let folder = path.parent().unwrap();
if !folder.exists() {
debug!("creating folder: {}", folder.display());
std::fs::create_dir_all(folder)?;
}
debug!("downloading file: {}", path.display());
drive.download_file(drive_id, &path).await?;
Ok(path)
}
fn check_if_file_is_cached(&self, ino: impl Into<Inode>) -> Result<bool> {
let entry = self.get_entry_r(ino)?;
let path = self.get_cache_path_for_entry(&entry)?;
let exists = path.exists();
Ok(exists)
}
fn get_cache_path_for_entry(&self, entry: &DriveEntry) -> Result<PathBuf> {
debug!("get_cache_path_for_entry: {}", entry.ino);
let cache_folder = match self.cache_dir.as_ref() {
Some(x) => x.path() ,
None => return Err(anyhow!("cache_dir is None").into()),
};
let path = Self::construct_cache_path_for_entry(&cache_folder, entry);
Ok(path)
}
fn construct_cache_path_for_entry(cache_dir: &Path, entry: &DriveEntry) -> PathBuf {
debug!("construct_cache_path_for_entry: {} with cache_dir: {}", entry.ino, cache_dir.display());
let path = Self::construct_cache_folder_path(cache_dir, entry).join(&entry.name);
debug!(
"get_cache_path_for_entry: {}: {}",
entry.ino,
path.display()
);
path
}
}
// endregion
// region common
#[async_trait::async_trait]
impl CommonFilesystem<DriveEntry> for DriveFilesystem {
fn get_entries(&self) -> &HashMap<Inode, DriveEntry> {
&self.entries
}
fn get_entries_mut(&mut self) -> &mut HashMap<Inode, DriveEntry> {
&mut self.entries
}
fn get_children(&self) -> &HashMap<Inode, Vec<Inode>> {
&self.children
}
fn get_children_mut(&mut self) -> &mut HashMap<Inode, Vec<Inode>> {
&mut self.children
}
fn get_root_path(&self) -> LocalPath {
self.root.clone().into()
}
async fn add_entry(
&mut self,
name: &OsStr,
mode: u16,
file_type: FileType,
parent_ino: impl Into<Inode> + Send,
size: u64,
) -> Result<Inode> {
let parent_ino = parent_ino.into();
debug!("add_entry: (0) name:{:20?}; parent: {}", name, parent_ino);
let ino = self.generate_ino(); // Generate a new inode number
let now = std::time::SystemTime::now();
//TODO: write the actual creation and modification time, not just now
let attr = FileAttr {
ino: ino.into(),
size: size,
/* TODO: set block size to something usefull.
maybe set it to 0 but when the file is cached set it to however big the
file in the cache is? that way it shows the actual size in blocks that are
used*/
blocks: 0,
atime: now,
mtime: now,
ctime: now,
crtime: now,
kind: file_type,
perm: mode,
nlink: 1,
uid: 0,
gid: 0,
rdev: 0,
/*TODO: set the actual block size?*/
blksize: 4096,
flags: 0,
};
let parent_drive_id = self.get_drive_id(parent_ino);
let drive_id: DriveId = self.source.get_id(name, parent_drive_id).await?;
debug!("add_entry: (1) drive_id: {:?}", drive_id);
let parent_local_path = self.get_path_from_ino(parent_ino);
let parent_path: PathBuf = parent_local_path
.ok_or(anyhow!("could not get local path"))?
.into();
self.get_entries_mut().insert(
ino,
DriveEntry::new(ino, name, drive_id, parent_path.join(name), attr),
);
self.add_child(parent_ino, &ino);
debug!("add_entry: (2) after adding count: {}", self.entries.len());
Ok(ino)
}
}
// endregion
//region some convenience functions/implementations
impl Display for DriveFilesystem {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "DriveFilesystem at: '/{}'", self.root.display())
}
}
//endregion
//region filesystem
impl Filesystem for DriveFilesystem {
//region init
fn init(
&mut self,
_req: &Request<'_>,
_config: &mut KernelConfig,
) -> std::result::Result<(), c_int> {
debug!("init");
let root = self.root.to_path_buf();
let x = run_async_blocking(self.add_dir_entry(&root, Inode::from(FUSE_ROOT_ID), true));
if let Err(e) = x {
error!("could not add root entry: {}", e);
}
Ok(())
}
//endregion
//region destroy
fn destroy(&mut self) {
debug!("destroy");
debug!("destroy: removing cache dir: {:?}", self.cache_dir);
self.cache_dir = None;
}
//endregion
//region lookup
fn lookup(&mut self, _req: &Request<'_>, parent: u64, name: &OsStr, reply: ReplyEntry) {
debug!("lookup: {}:{:?}", parent, name);
let parent = parent.into();
let children = self.children.get(&parent);
if children.is_none() {
warn!("lookup: could not find children for {}", parent);
reply.error(libc::ENOENT);
return;
}
let children = children.unwrap();
debug!("lookup: children: {:?}", children);
for child_inode in children {
let entry = self.entries.get(child_inode);
if entry.is_none() {
warn!("lookup: could not find entry for {}", child_inode);
continue;
}
let entry = entry.unwrap();
let path: PathBuf = entry.name.clone().into();
let accepted = name.eq_ignore_ascii_case(&path);
debug!(
"entry: {}:(accepted={}){:?}; {:?}",
child_inode, accepted, path, entry.attr
);
if accepted {
reply.entry(&self.time_to_live, &entry.attr, self.generation);
return;
}
}
warn!("lookup: could not find entry for {:?}", name);
reply.error(libc::ENOENT);
}
//endregion
//region getattr
fn getattr(&mut self, _req: &Request<'_>, ino: u64, reply: ReplyAttr) {
debug!("getattr: {}", ino);
let entry = self.entries.get(&ino.into());
if let Some(entry) = entry {
reply.attr(&self.time_to_live, &entry.attr);
} else {
reply.error(libc::ENOENT);
}
}
//endregion
//region setattr
fn setattr(
&mut self,
_req: &Request<'_>,
ino: u64,
mode: Option<u32>,
uid: Option<u32>,
gid: Option<u32>,
size: Option<u64>,
_atime: Option<TimeOrNow>,
_mtime: Option<TimeOrNow>,
_ctime: Option<SystemTime>,
/*TODO: check if this change need to be implemented*/
fh: Option<u64>,
_crtime: Option<SystemTime>,
/*TODO: check if this change need to be implemented*/
_chgtime: Option<SystemTime>,
/*TODO: check if this change need to be implemented*/
_bkuptime: Option<SystemTime>,
flags: Option<u32>,
reply: ReplyAttr,
) {
// debug!("setattr: {}", ino);
debug!(
"setattr: {}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}",
ino,
mode,
uid,
gid,
size,
_atime,
_mtime,
_ctime,
fh,
_crtime,
_chgtime,
_bkuptime,
flags
);
let ttl = self.time_to_live.clone();
let entry = self.get_entry_mut(ino);
if let None = entry {
error!("setattr: could not find entry for {}", ino);
reply.error(libc::ENOENT);
return;
}
let mut entry = entry.unwrap();
let attr = &mut entry.attr;
if let Some(mode) = mode {
attr.perm = mode as u16;
}
if let Some(uid) = uid {
attr.uid = uid;
}
if let Some(gid) = gid {
attr.gid = gid;
}
if let Some(size) = size {
attr.size = size;
}
if let Some(flags) = flags {
attr.flags = flags;
}
reply.attr(&ttl, &attr);
//TODO: update file on drive if necessary
}
//endregion
//region write
fn write(&mut self, _req: &Request<'_>, ino: u64, fh: u64, offset: i64, data: &[u8], write_flags: u32, flags: i32, lock_owner: Option<u64>, reply: ReplyWrite) {
debug!(
"write: {}:{}:{}:{:#x?}:{:?}:{:#x?}:{:?}",
ino, fh, offset, flags, lock_owner, write_flags, data,
);
let cache_dir = self.cache_dir
.as_ref()
.map(|c| c.path().to_path_buf())
.clone();
if let None = cache_dir {
error!("write: cache dir not set");
reply.error(libc::ENOENT);
return;
}
let cache_dir = cache_dir.unwrap();
let entry = self.get_entry_mut(&ino.into());
if let None = entry {
error!("write: could not find entry for {}", ino);
reply.error(libc::ENOENT);
return;
}
let mut entry = entry.unwrap();
//TODO: queue uploads on a separate thread
let path = Self::construct_cache_path_for_entry(&cache_dir, &entry);
// let path = entry.local_path.to_path_buf();
debug!("opening file: {:?}", &path);
let file = OpenOptions::new()
.truncate(false)
.create(true)
.write(true)
.open(&path);
if let Err(e) = file {
error!("write: could not open file: {:?}: {}", path, e);
reply.error(libc::ENOENT);
return;
}
let mut file = file.unwrap();
debug!("writing file: {:?} at {} with size {}",
&path,
offset,
data.len()
);
file.seek(SeekFrom::Start(offset as u64)).unwrap();
file.write_all(data).unwrap();
let size = data.len();
// let size = file.write_at(data, offset as u64);
// if let Err(e) = size {
// error!("write: could not write file: {:?}: {}", path, e);
// reply.error(libc::ENOENT);
// return;
// }
// let size = size.unwrap();
debug!("wrote file: {:?} at {}; wrote {} bits", &path, offset, size);
reply.written(size as u32);
//TODO: update size in entry if necessary
debug!("updating size to {} for entry: {:?}", entry.attr.size, entry);
let mut attr = &mut entry.attr;
attr.size = attr.size.max(offset as u64 + size as u64);
let now = SystemTime::now();
attr.mtime = now;
attr.ctime = now;
debug!("updated size to {} for entry: {:?}", entry.attr.size, entry);
debug!("write done for entry: {:?}", entry);
}
//endregion
//region read
fn read(
&mut self,
_req: &Request<'_>,
ino: u64,
fh: u64,
offset: i64,
size: u32,
flags: i32,
lock_owner: Option<u64>,
reply: ReplyData,
) {
debug!(
"read: {:10}:{:2}:{:3}:{:10}:{:10X}:{:?}",
ino, fh, offset, size, flags, lock_owner
);
let entry = self.get_entry_r(&ino.into());
if let Err(e) = entry {
error!("read: could not find entry for {}: {}", ino, e);
reply.error(libc::ENOENT);
return;
}
let entry = entry.unwrap();
let is_cached = self.check_if_file_is_cached(ino);
if !is_cached.unwrap_or(false) {
debug!("read: file is not cached: {}", ino);
let x: Result<PathBuf> = run_async_blocking(self.download_file_to_cache(ino));
if let Err(e) = x {
error!("read: could not download file: {}", e);
reply.error(libc::ENOENT);
return;
}
}
let path = self.get_cache_path_for_entry(&entry);
if let Err(e) = path {
error!("read: could not get cache path: {}", e);
reply.error(libc::ENOENT);
return;
}
let path = path.unwrap();
debug!("read: path: {:?}", path);
let file = std::fs::File::open(&path);
if let Err(e) = file {
error!("read: could not open file: {}", e);
reply.error(libc::ENOENT);
return;
}
let mut file = file.unwrap();
let mut buf = vec![0; size as usize];
debug!("reading file: {:?} at {} with size {}", &path, offset, size);
file.read_at(&mut buf, offset as u64).unwrap();
debug!("read file: {:?} at {}", &path, offset);
reply.data(&buf);
}
//endregion
//region readdir
fn readdir(
&mut self,
_req: &Request<'_>,
ino: u64,
fh: u64,
mut offset: i64,
mut reply: ReplyDirectory,
) {
debug!("readdir: {}:{}:{:?}", ino, fh, offset);
let children = self.children.get(&ino.into());
if let Some(attr) = self.get_entries().get(&ino.into()).map(|entry| entry.attr) {
if attr.kind != FileType::Directory {
reply.error(libc::ENOTDIR);
return;
}
}
if children.is_none() {
reply.error(libc::ENOENT);
return;
}
let children = children.unwrap();
debug!("children ({}): {:?}", children.len(), children);
for child_inode in children.iter().skip(offset as usize) {
let entry = self.entries.get(child_inode).unwrap();
let path: PathBuf = entry.local_path.clone().into();
let attr = entry.attr;
let inode = (*child_inode).into();
// Increment the offset for each processed entry
offset += 1;
debug!("entry: {}:{:?}; {:?}", inode, path, attr);
if reply.add(inode, offset, attr.kind, &entry.name) {
// If the buffer is full, we need to stop
debug!("readdir: buffer full");
break;
}
}
debug!("readdir: ok");
reply.ok();
}
//endregion
//region access
fn access(&mut self, _req: &Request<'_>, ino: u64, mask: i32, reply: ReplyEmpty) {
reply.ok(); //TODO: implement this correctly
}
//endregion
}
//endregion
mod filesystem;
mod file_uploader;
mod settings;

51
src/fs/drive/settings.rs Normal file
View File

@@ -0,0 +1,51 @@
use std::fmt::{Display, Formatter};
use std::path::Path;
use std::time::Duration;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncSettings {
/// How long the responses can/should be cached
time_to_live: Duration,
/// How long the files should be cached before checking
/// for updates
///
/// This does not necessarily mean that the file will be
/// downloaded again; it only compares the modified time
/// on the remote against the local file
cache_time: Duration,
}
impl SyncSettings {
pub fn new(time_to_live: Duration, cache_time: Duration) -> Self {
Self {
time_to_live,
cache_time,
}
}
// pub fn from_path(path: &Path)-> Self{
// let s = Self{
// time_to_live: Duration::from_secs(60),
// cache_time: None,
// };
// s
// }
}
// region getters
impl SyncSettings {
pub fn time_to_live(&self) -> Duration {
self.time_to_live
}
pub fn cache_time(&self) -> Duration {
self.cache_time
}
}
// endregion
impl Display for SyncSettings {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "SyncSettings {{ ttl: {}s, cache_time: {}s }}",
self.time_to_live.as_secs(),
self.cache_time.as_secs())
}
}
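A brief construction example (values are illustrative, not defaults from this commit): time_to_live is the TTL handed back to the kernel in lookup/getattr replies, and cache_time is the interval after which get_cache_state reports RefreshNeeded.

use std::time::Duration;

let settings = SyncSettings::new(
    Duration::from_secs(2),  // time_to_live for kernel replies
    Duration::from_secs(30), // cache_time before re-checking the remote
);
assert_eq!(settings.cache_time(), Duration::from_secs(30));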

View File

@@ -10,10 +10,10 @@ use fuser::{
ReplyOpen, ReplyStatfs, ReplyWrite, ReplyXattr, Request, TimeOrNow, FUSE_ROOT_ID,
};
use libc::c_int;
use log::{debug, warn};
use tracing::{debug, warn};
use std::collections::HashMap;
use std::ffi::{OsStr, OsString};
use std::fmt::Display;
use std::fmt::{Debug, Display};
use std::fs::OpenOptions;
use std::os::unix::prelude::*;
use std::path::{Path, PathBuf};
@@ -157,7 +157,7 @@ impl CommonFilesystem<SampleEntry> for SampleFilesystem {
name: &OsStr,
mode: u16,
file_type: FileType,
parent_ino: impl Into<Inode> + Send,
parent_ino: impl Into<Inode> + Send + Debug,
size: u64,
) -> Result<Inode> {
let parent_ino = parent_ino.into();

View File

@@ -1,33 +1,72 @@
use crate::google_drive::{drive, helpers, DriveId};
use crate::prelude::*;
use std::ffi::{OsStr, OsString};
// use drive3::api::Scope::File;
use anyhow::anyhow;
use drive3::api::{File, Scope};
use drive3::client::ReadSeek;
use drive3::hyper::body::HttpBody;
use drive3::hyper::client::HttpConnector;
use drive3::hyper::{body, Body, Response};
use drive3::hyper_rustls::HttpsConnector;
use drive3::DriveHub;
use drive3::{hyper_rustls, oauth2};
use futures::{Stream, StreamExt};
use hyper::Client;
use log::{debug, trace, warn};
use mime::{FromStrError, Mime};
use std::fmt::{Debug, Error};
use std::fmt::{Debug, Display, Error};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::time::SystemTime;
// use drive3::api::Scope::File;
use anyhow::{anyhow, Context};
use drive3::{hyper_rustls, oauth2};
use drive3::api::{File, Scope};
use drive3::client::ReadSeek;
use drive3::DriveHub;
use drive3::hyper::{body, Body, Response};
use drive3::hyper::body::HttpBody;
use drive3::hyper::client::HttpConnector;
use drive3::hyper_rustls::HttpsConnector;
use futures::{Stream, StreamExt};
use hyper::Client;
use mime::{FromStrError, Mime};
use tokio::{fs, io};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
use tokio::runtime::Runtime;
use tokio::{fs, io};
use tracing::{debug, instrument, trace, warn};
use tracing::field::debug;
use crate::google_drive::{drive, DriveId, helpers};
use crate::prelude::*;
#[derive(Clone)]
pub struct GoogleDrive {
hub: DriveHub<HttpsConnector<HttpConnector>>,
}
impl GoogleDrive {
#[instrument]
pub(crate) async fn get_metadata_for_file(&self, drive_id: DriveId) -> anyhow::Result<File> {
let drive_id = drive_id.into_string().map_err(|_| anyhow!("invalid drive_id"))?;
let (response, file) = self
.hub
.files()
.get(&drive_id)
.param("fields", "id, name, modifiedTime, driveId, size, createdTime, viewedByMeTime")
.doit().await?;
Ok(file)
}
}
impl GoogleDrive {
#[instrument(skip(file), fields(file_name = file.name, file_id = file.drive_id))]
pub async fn upload_file_content_from_path(&self, file: File, path: &Path) -> anyhow::Result<()> {
update_file_content_on_drive_from_path(self, file, path).await?;
Ok(())
}
}
impl GoogleDrive {
pub(crate) async fn get_modified_time(&self, drive_id: DriveId) -> Result<SystemTime> {
let drive_id: OsString = drive_id.into();
let drive_id = drive_id.into_string().map_err(|_| anyhow!("invalid drive_id"))?;
let (response, file) = self.hub.files().get(&drive_id).param("fields", "modifiedTime").doit().await?;
let x = file.modified_time.ok_or_else(|| anyhow!("modified_time not found"))?;
Ok(x.into())
}
}
impl GoogleDrive {
#[instrument]
pub async fn download_file(&self, file_id: DriveId, target_file: &PathBuf) -> Result<()> {
debug!(
"download_file: file_id: {:50?} to {}",
@@ -50,6 +89,7 @@ impl GoogleDrive {
}
impl GoogleDrive {
#[instrument]
pub async fn get_id(&self, path: &OsStr, parent_drive_id: Option<DriveId>) -> Result<DriveId> {
debug!("Get ID of '{:?}' with parent: {:?}", path, parent_drive_id);
let path: OsString = path.into();
@@ -61,7 +101,7 @@ impl GoogleDrive {
Some(parent_drive_id) => parent_drive_id,
None => DriveId::from("root"),
}
.into();
.into();
let parent_drive_id = match parent_drive_id.into_string() {
Ok(parent_drive_id) => parent_drive_id,
Err(_) => return Err("invalid parent_drive_id".into()),
@@ -110,6 +150,7 @@ impl GoogleDrive {
}
impl GoogleDrive {
#[instrument]
pub(crate) async fn new() -> Result<Self> {
let auth = drive3::oauth2::read_application_secret("auth/client_secret.json").await?;
@@ -117,9 +158,9 @@ impl GoogleDrive {
auth,
oauth2::InstalledFlowReturnMethod::HTTPRedirect,
)
.persist_tokens_to_disk("auth/tokens.json")
.build()
.await?;
.persist_tokens_to_disk("auth/tokens.json")
.build()
.await?;
let http_client = Client::builder().build(
hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()
@@ -133,21 +174,24 @@ impl GoogleDrive {
let mut drive = GoogleDrive { hub };
Ok(drive)
}
pub async fn list_files(&mut self, folder_id: DriveId) -> Result<Vec<File>> {
#[instrument]
pub async fn list_files(&self, folder_id: DriveId) -> anyhow::Result<Vec<File>> {
debug!("list_files: folder_id: {:?}", folder_id);
let folder_id: OsString = folder_id.into();
let folder_id = match folder_id.into_string() {
Ok(folder_id) => folder_id,
Err(_) => return Err("invalid folder_id".into()),
Err(_) => return Err(anyhow!("invalid folder_id")),
};
if folder_id.is_empty() {
return Err("folder_id is empty".into());
return Err(anyhow!("folder_id is empty"));
}
if folder_id.contains('\'') {
return Err("folder_id contains invalid character".into());
return Err(anyhow!("folder_id contains invalid character"));
}
let mut files = Vec::new();
let mut page_token = None;
loop {
debug!("list_files: page_token: {:?}", page_token);
let (response, result) = self
.hub
.files()
@@ -160,7 +204,9 @@ impl GoogleDrive {
.q(format!("'{}' in parents", folder_id).as_str())
.doit()
.await?;
files.extend(result.files.ok_or("no file list returned")?);
let result_files = result.files.ok_or(anyhow!("no file list returned"))?;
debug!("list_files: response: {:?}", result_files.len());
files.extend(result_files);
page_token = result.next_page_token;
if page_token.is_none() {
break;
@@ -169,11 +215,19 @@ impl GoogleDrive {
Ok(files)
}
}
impl Debug for GoogleDrive {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "GoogleDrive")
}
}
impl Display for GoogleDrive {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "GoogleDrive")
}
}
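A hedged usage sketch (not part of the commit), written as crate-internal code since get_metadata_for_file is pub(crate); the field names on drive3::api::File are the ones already used above.

// Sketch only: list the root folder and log each entry's remote modified time.
async fn log_root_folder(drive: &GoogleDrive) -> anyhow::Result<()> {
    let files = drive.list_files(DriveId::root()).await?;
    for file in &files {
        if let Some(id) = file.id.as_deref() {
            let meta = drive.get_metadata_for_file(DriveId::new(id)).await?;
            tracing::debug!("{:?} modified at {:?}", meta.name, meta.modified_time);
        }
    }
    Ok(())
}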
pub async fn sample() -> Result<()> {
debug!("sample");
@@ -340,27 +394,45 @@ async fn create_file_on_drive(
debug!("create_file(): file: {:?}", file);
Ok(file)
}
#[instrument(skip(file), fields(drive_id = file.drive_id))]
pub async fn update_file_content_on_drive_from_path(
drive: &GoogleDrive,
file: google_drive3::api::File,
source_path: &Path,
) -> Result<()> {
) -> anyhow::Result<()> {
debug!("update_file_content_on_drive_from_path(): source_path: {:?}", source_path);
// {
// debug!("reading content from file for testing");
// let content = std::fs::File::open(source_path)?;
// let mut content = tokio::fs::File::from_std(content);
// let mut s = String::new();
// content.read_to_string(&mut s).await?;
// debug!("update_file_content_on_drive_from_path(): content: {:?}", s);
// }
let content = fs::File::open(source_path).await?;
update_file_content_on_drive(drive, file, content).await?;
Ok(())
}
#[instrument(skip(file, content))]
async fn update_file_content_on_drive(
drive: &GoogleDrive,
file: google_drive3::api::File,
content: fs::File,
) -> Result<()> {
) -> anyhow::Result<()> {
let stream = content.into_std().await;
// let stream = content;
let mime_type = helpers::get_mime_from_file_metadata(&file)?;
let id = file.id.clone().unwrap();
let id = file.drive_id.clone().with_context(|| "file metadata has no drive id")?;
debug!("starting upload");
let (response, file) = drive
.hub
.files()
.update(file, &id)
.upload(stream, mime_type)
.await?;
debug!("upload done!");
debug!("update_file_on_drive(): response: {:?}", response);
debug!("update_file_on_drive(): file: {:?}", file);
Ok(())
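A hedged sketch (not part of the commit) of how the pieces above compose: fetch the current metadata, then push the locally cached copy. push_local_copy is a hypothetical helper name, not something this commit adds.

// Sketch only (hypothetical helper): re-upload a locally cached file.
async fn push_local_copy(
    drive: &GoogleDrive,
    id: DriveId,
    local_path: &std::path::Path,
) -> anyhow::Result<()> {
    // The update call needs the file's current metadata to address it on Drive.
    let metadata = drive.get_metadata_for_file(id).await?;
    drive.upload_file_content_from_path(metadata, local_path).await?;
    Ok(())
}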

View File

@@ -1,5 +1,5 @@
use std::ffi::OsString;
use std::fmt::Display;
use std::fmt::{Display, Pointer};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DriveId(OsString);
@@ -7,6 +7,12 @@ impl DriveId {
pub(crate) fn root() -> DriveId {
DriveId(OsString::from("root"))
}
pub fn as_str(&self) -> Option<&str> {
self.0.to_str()
}
pub fn into_string(self) -> Result<String, OsString> {
self.0.into_string()
}
}
impl Into<OsString> for DriveId {
@@ -40,4 +46,4 @@ impl DriveId {
pub fn new(id: impl Into<OsString>) -> Self {
Self(id.into())
}
}
}
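A small sketch (not part of the commit) of the accessors added here; the id string is the test file id already noted in lib.rs.

// Sketch only: round-trip a DriveId through the new accessors.
fn drive_id_roundtrip() {
    let id = DriveId::new("1IotISYu3cF7JrOdfFPKNOkgYg1-ii5Qs");
    assert_eq!(id.as_str(), Some("1IotISYu3cF7JrOdfFPKNOkgYg1-ii5Qs"));
    let raw: String = id.into_string().expect("id is valid UTF-8");
    assert_eq!(raw, "1IotISYu3cF7JrOdfFPKNOkgYg1-ii5Qs");
}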

View File

@@ -4,11 +4,11 @@ use crate::google_drive::{DriveId, GoogleDrive};
use crate::prelude::*;
use anyhow::anyhow;
use drive3::api::File;
use log::debug;
use tracing::debug;
use mime::Mime;
use std::path::{Path, PathBuf};
use std::str::FromStr;
pub fn get_mime_from_file_metadata(file: &File) -> Result<Mime> {
pub fn get_mime_from_file_metadata(file: &File) -> anyhow::Result<Mime> {
Ok(Mime::from_str(
&file.mime_type.as_ref().unwrap_or(&"*/*".to_string()),
)?)
@@ -47,23 +47,23 @@ pub fn get_drive_id_from_local_path(drive: &DriveFilesystem, path: &Path) -> Res
}
mod test {
use super::*;
#[tokio::test]
async fn test_get_drive_id_from_local_path() {
crate::init_logger();
let path = Path::new("/drive1");
let drive = DriveFilesystem::new(path).await;
let drive_mount_point = Path::new("/drive1");
let drive_id = get_drive_id_from_local_path(&drive, path).unwrap();
assert_eq!(drive_id, "root".into());
let path = Path::new("/drive1/");
let drive_id = get_drive_id_from_local_path(&drive, path).unwrap();
assert_eq!(drive_id, "root".into());
let path = Path::new("/drive1/dir1/dir2/file1.txt");
let drive_id = get_drive_id_from_local_path(&drive, path).unwrap();
todo!("create assert for this test");
// assert_eq!(drive_id, "TODO".into());
}
// #[tokio::test]
// async fn test_get_drive_id_from_local_path() {
// crate::init_logger();
// let path = Path::new("/drive1");
// let drive = DriveFilesystem::new(path, GoogleDrive::new().await?).await;
// let drive_mount_point = Path::new("/drive1");
//
// let drive_id = get_drive_id_from_local_path(&drive, path).unwrap();
// assert_eq!(drive_id, "root".into());
//
// let path = Path::new("/drive1/");
// let drive_id = get_drive_id_from_local_path(&drive, path).unwrap();
// assert_eq!(drive_id, "root".into());
//
// let path = Path::new("/drive1/dir1/dir2/file1.txt");
// let drive_id = get_drive_id_from_local_path(&drive, path).unwrap();
// todo!("create assert for this test");
// // assert_eq!(drive_id, "TODO".into());
// }
}

View File

@@ -1,33 +1,48 @@
#![allow(dead_code, unused)]
use google_drive3::oauth2::read_application_secret;
use std::error::Error;
use std::time::{Duration, SystemTime};
extern crate google_drive3 as drive3;
use std::error::Error;
use std::ffi::OsStr;
use std::path::Path;
use std::time::{Duration, SystemTime};
use std::time::UNIX_EPOCH;
use async_trait::async_trait;
use drive3::{DriveHub, hyper, hyper_rustls, oauth2};
use drive3::api::Channel;
use drive3::{hyper, hyper_rustls, oauth2, DriveHub};
use log::{debug, error, info, trace, warn};
use fuser::{FileAttr, Filesystem, FileType, FUSE_ROOT_ID, MountOption, ReplyAttr, ReplyData, ReplyDirectory, ReplyEmpty, ReplyEntry, ReplyOpen, ReplyWrite, ReplyXattr, Request, Session, SessionUnmounter, TimeOrNow};
use google_drive3::oauth2::read_application_secret;
// use nix;
use notify::{recommended_watcher, INotifyWatcher, RecommendedWatcher};
use tokio::io::{stdin, AsyncReadExt};
use tokio::sync::mpsc::channel;
use notify::{INotifyWatcher, recommended_watcher, RecommendedWatcher};
use tempfile::TempDir;
use tokio::io::{AsyncReadExt, stdin};
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{channel, Sender};
use tokio::task::JoinHandle;
use tracing::{debug, error, info, trace, warn};
use prelude::*;
use crate::config::common_file_filter::CommonFileFilter;
use crate::fs::drive::{DriveFilesystem, DriveFileUploader, FileUploaderCommand, SyncSettings};
use crate::fs::sample::SampleFilesystem;
use crate::google_drive::GoogleDrive;
pub mod async_helper;
pub mod common;
pub mod fs;
pub mod google_drive;
pub mod prelude;
use prelude::*;
pub mod config;
#[cfg(test)]
mod tests {
use super::*;
fn init_logger() {
let _ = env_logger::builder()
.filter_level(log::LevelFilter::Trace)
.is_test(true)
.try_init();
todo!("init logger (tracing)")
}
#[tokio::test]
@@ -37,16 +52,11 @@ mod tests {
}
}
pub fn init_logger() {
env_logger::builder()
.filter_level(log::LevelFilter::Debug)
.try_init();
}
pub async fn sample() -> Result<()> {
//Test file id: "1IotISYu3cF7JrOdfFPKNOkgYg1-ii5Qs"
list_files().await
}
async fn list_files() -> Result<()> {
debug!("Hello, world!");
let secret: oauth2::ApplicationSecret = read_application_secret("auth/client_secret.json")
@@ -56,9 +66,9 @@ async fn list_files() -> Result<()> {
secret,
oauth2::InstalledFlowReturnMethod::HTTPRedirect,
)
.persist_tokens_to_disk("auth/token_store.json")
.build()
.await?;
.persist_tokens_to_disk("auth/token_store.json")
.build()
.await?;
let hub = DriveHub::new(
hyper::Client::builder().build(
@@ -93,13 +103,6 @@ async fn list_files() -> Result<()> {
);
Ok(())
}
use fuser::{
FileAttr, FileType, Filesystem, MountOption, ReplyAttr, ReplyData, ReplyDirectory, ReplyEmpty,
ReplyEntry, ReplyOpen, ReplyWrite, ReplyXattr, Request, TimeOrNow, FUSE_ROOT_ID,
};
use std::ffi::OsStr;
use std::path::Path;
use std::time::UNIX_EPOCH;
#[derive(Default)]
struct MyFS {
@@ -117,16 +120,12 @@ struct MyFS {
main_name: String,
}
use crate::fs::drive::DriveFilesystem;
use crate::fs::sample::SampleFilesystem;
use async_trait::async_trait;
use tokio::runtime::Runtime;
struct DirEntry {
ino: u64,
name: String,
file_type: FileType,
}
impl MyFS {
fn get_attr(&self, ino: u64) -> Option<FileAttr> {
// Get the file attributes based on the inode number
@@ -264,6 +263,7 @@ impl MyFS {
}
}
}
#[async_trait]
impl Filesystem for MyFS {
fn open(&mut self, _req: &Request<'_>, _ino: u64, _flags: i32, reply: ReplyOpen) {
@@ -381,6 +381,7 @@ impl Filesystem for MyFS {
}
}
}
pub async fn watch_file_reading() -> Result<()> {
let mountpoint = "/tmp/fuse/1";
let options = vec![
@@ -402,11 +403,12 @@ pub async fn watch_file_reading() -> Result<()> {
mountpoint,
&options,
)
.unwrap();
.unwrap();
debug!("Exiting...");
Ok(())
}
pub async fn sample_fs() -> Result<()> {
let mountpoint = "/tmp/fuse/1";
let source = "/tmp/fuse/2";
@@ -419,18 +421,92 @@ pub async fn sample_fs() -> Result<()> {
debug!("Exiting...");
Ok(())
}
pub async fn sample_drive_fs() -> Result<()> {
let mountpoint = "/tmp/fuse/3";
let upload_ignore_path = Path::new("config/.upload_ignore");
let settings_path = Path::new("config/settings.json");
let cache_dir = get_cache_dir()?;
let upload_ignore = CommonFileFilter::from_path(upload_ignore_path)?;
let sync_settings = SyncSettings::new(Duration::from_secs(2), Duration::from_secs(20));
// let source = "/tmp/fuse/2";
let options = vec![MountOption::RW];
let drive = GoogleDrive::new().await?;
// let file_uploader = FileUploader::new("config/credentials.json", "config/token.json");
let (file_uploader_sender, file_uploader_receiver) = mpsc::channel(1);
let mut file_uploader = DriveFileUploader::new(drive.clone(),
upload_ignore,
file_uploader_receiver,
cache_dir.path().to_path_buf(),
Duration::from_secs(3));
debug!("Mounting fuse filesystem at {}", mountpoint);
let fs = DriveFilesystem::new(mountpoint).await?;
let fs = DriveFilesystem::new(mountpoint,
Path::new(""),
file_uploader_sender.clone(),
drive,
cache_dir.into_path(),
sync_settings,
).await?;
fuser::mount2(fs, mountpoint, &options).unwrap();
// let session_unmounter =
let mount_options = vec![MountOption::RW];
debug!("Exiting...");
let uploader_handle: JoinHandle<()> = tokio::spawn(async move { file_uploader.listen().await; });
let end_signal_handle: JoinHandle<()> = mount(fs, &mountpoint, &mount_options, file_uploader_sender).await?;
tokio::try_join!(uploader_handle, end_signal_handle)?;
// tokio::spawn(async move {
// end_program_signal_awaiter(file_uploader_sender, session_unmounter).await?;
// });
// fuser::mount2(fs, &mountpoint, &options).unwrap();
debug!("Exiting gracefully...");
Ok(())
}
fn get_cache_dir() -> Result<TempDir> {
let cache_dir = tempfile::tempdir()?;
debug!("cache_dir: {:?}", cache_dir.path());
if !cache_dir.path().exists() {
debug!("creating cache dir: {:?}", cache_dir.path());
std::fs::create_dir_all(cache_dir.path())?;
} else {
debug!("cache dir exists: {}", cache_dir.path().display());
}
Ok(cache_dir)
}
async fn mount(fs: DriveFilesystem,
mountpoint: &str,
options: &[MountOption],
sender: Sender<FileUploaderCommand>) -> Result<JoinHandle<()>> {
let mut session = Session::new(fs, mountpoint.as_ref(), options)?;
let session_ender = session.unmount_callable();
let end_program_signal_handle = tokio::spawn(async move {
end_program_signal_awaiter(sender, session_ender).await;
});
debug!("Mounting fuse filesystem" );
session.run();
debug!("Finished with mounting");
// Ok(session_ender)
Ok(end_program_signal_handle)
}
async fn end_program_signal_awaiter(file_uploader_sender: Sender<FileUploaderCommand>,
mut session_unmounter: SessionUnmounter) -> Result<()> {
tokio::signal::ctrl_c().await.expect("failed to listen for ctrl_c event");
info!("got signal to end program");
file_uploader_sender.send(FileUploaderCommand::Stop).await?;
info!("sent stop command to file uploader");
info!("unmounting...");
session_unmounter.unmount()?;
info!("unmounted");
Ok(())
}
/*
// pub async fn watch_file_reading() -> Result<()> {
// let temp_file = tempfile::NamedTempFile::new()?;
// let file_path = temp_file.path();
@@ -474,3 +550,4 @@ pub async fn sample_drive_fs() -> Result<()> {
// info!("Done (nix)");
// Ok(())
// }
*/

View File

@@ -1,11 +1,57 @@
use tokio::io::AsyncReadExt;
use tracing::instrument::WithSubscriber;
use tracing::span;
#[tokio::main]
async fn main() {
drive_syncer::init_logger();
// drive_syncer::init_logger();
init_tracing();
// drive_syncer::sample().await.unwrap();
// drive_syncer::google_drive::sample().await.unwrap();
// drive_syncer::watch_file_reading().await.unwrap();
// drive_syncer::sample_nix().await.unwrap();
// drive_syncer::sample_fs().await.unwrap();
sample_logging().await;
drive_syncer::sample_drive_fs().await.unwrap();
}
fn init_tracing() {
// use tracing::Level;
// use tracing_subscriber::fmt;
// use tracing_subscriber::EnvFilter;
// // Create a new subscriber with the default configuration
// let subscriber = fmt::Subscriber::builder()
//
// // .with_thread_ids(true)
// .with_env_filter(EnvFilter::from_default_env())
// .with_max_level(Level::DEBUG)
// .with_line_number(true)
// .with_target(true)
// .with_file(true)
// // .with_span_events(fmt::format::FmtSpan::NONE)
// .finish();
//
// // Install the subscriber as the default for this thread
// tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
console_subscriber::init();
tracing::info!("tracing initialized");
}
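A hedged alternative sketch (not part of the commit), assuming the console-subscriber 0.1 and tracing-subscriber 0.3 versions pinned in Cargo.toml: the tokio-console layer can be combined with an fmt layer instead of calling console_subscriber::init() on its own.

// Sketch only: tokio-console layer plus human-readable fmt output.
// Without an additional filter this prints every event, which can be noisy.
fn init_tracing_with_fmt() {
    use tracing_subscriber::prelude::*;

    let console_layer = console_subscriber::spawn();
    let fmt_layer = tracing_subscriber::fmt::layer()
        .with_target(true)
        .with_line_number(true);

    tracing_subscriber::registry()
        .with(console_layer)
        .with(fmt_layer)
        .init();
}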
#[tracing::instrument]
async fn sample_logging() {
use tracing::{debug, error, info, trace, warn};
info!("info");
debug!("debug");
let s = span!(tracing::Level::TRACE, "span around trace and warn with stdin read");
{
let _x = s.enter();
trace!("trace");
let mut string = [0u8; 1];
// info!("press any key to continue");
// tokio::io::stdin().read(&mut string).await.expect("failed to read stdin");
warn!("warn");
}
error!("error");
}