remove dead & unused code

This commit is contained in:
OMGeeky
2023-05-20 14:43:10 +02:00
parent a8171c5fca
commit 2da59302b4
18 changed files with 160 additions and 1044 deletions

View File

@@ -1,18 +1,18 @@
use std::fmt::Debug;
use tracing::{debug, trace};
use std::future::Future;
use tokio::runtime::{Handle, Runtime};
use tokio::runtime::Handle;
use tracing::trace;
/// Run a future to completion on the current thread.
/// This is useful when you want to run a future in a blocking context.
/// This function will block the current thread until the provided future has run to completion.
///
/// # Be careful with deadlocks
pub fn run_async_blocking<T>(f: impl std::future::Future<Output = T> + Sized) -> T
where T: Debug {
pub fn run_async_blocking<T>(f: impl std::future::Future<Output=T> + Sized) -> T
where T: Debug {
trace!("run_async");
let handle = Handle::current();
handle.enter();
let _enter_guard = handle.enter();
trace!("run_async: entered handle");
let result = futures::executor::block_on(f);
trace!("run_async: got result: {:?}", result);

View File

@@ -28,6 +28,7 @@ impl From<OsString> for LocalPath {
Self::from(&path)
}
}
impl From<&OsString> for LocalPath {
fn from(path: &OsString) -> Self {
Path::new(path).into()
@@ -35,14 +36,15 @@ impl From<&OsString> for LocalPath {
}
impl<T> AsRef<T> for LocalPath
where
T: ?Sized,
<PathBuf as Deref>::Target: AsRef<T>,
where
T: ?Sized,
<PathBuf as Deref>::Target: AsRef<T>,
{
fn as_ref(&self) -> &T {
self.0.deref().as_ref()
}
}
impl Deref for LocalPath {
type Target = PathBuf;
@@ -57,11 +59,13 @@ impl Into<PathBuf> for LocalPath {
self.0
}
}
impl Into<OsString> for LocalPath {
fn into(self) -> OsString {
self.0.into_os_string()
}
}
impl<'a> Into<&'a Path> for &'a LocalPath {
fn into(self) -> &'a Path {
&self.0

View File

@@ -1,12 +1,15 @@
use std::path::{Path, PathBuf};
use crate::prelude::*;
use ignore::gitignore;
use ignore::gitignore::{Gitignore, GitignoreBuilder};
use crate::prelude::*;
#[derive(Debug)]
pub struct CommonFileFilter {
pub filter: Gitignore,
}
impl CommonFileFilter{
impl CommonFileFilter {
pub fn from_path(path: impl Into<PathBuf>) -> Result<Self> {
let path = path.into();
let ignores = GitignoreBuilder::new(&path)

View File

@@ -1,33 +1,27 @@
use crate::async_helper::run_async_blocking;
use crate::common::LocalPath;
use crate::fs::inode::Inode;
use crate::google_drive::DriveId;
use crate::prelude::*;
use std::collections::HashMap;
use std::ffi::OsStr;
use std::fmt::Debug;
use std::path::PathBuf;
use std::time::SystemTime;
use anyhow::anyhow;
use async_trait::async_trait;
use fuser::{FileAttr, FileType, TimeOrNow, FUSE_ROOT_ID};
use fuser::{FileAttr, FileType, FUSE_ROOT_ID, TimeOrNow};
use tracing::debug;
use std::collections::HashMap;
use std::ffi::{OsStr, OsString};
use std::fmt::Debug;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
use crate::common::LocalPath;
use crate::fs::inode::Inode;
use crate::prelude::*;
pub trait CommonEntry {
fn get_ino(&self) -> Inode;
fn get_name(&self) -> &OsStr;
fn get_local_path(&self) -> &LocalPath;
fn get_attr(&self) -> &FileAttr;
// fn new(
// ino: impl Into<Inode>,
// name: impl Into<OsString>,
// local_path: impl Into<LocalPath>,
// attr: FileAttr,
// ) -> Self;
}
#[async_trait]
pub trait CommonFilesystem<Entry: CommonEntry > {
pub trait CommonFilesystem<Entry: CommonEntry> {
fn get_entries(&self) -> &HashMap<Inode, Entry>;
fn get_entries_mut(&mut self) -> &mut HashMap<Inode, Entry>;
fn get_children(&self) -> &HashMap<Inode, Vec<Inode>>;
@@ -146,7 +140,7 @@ pub trait CommonFilesystem<Entry: CommonEntry > {
name: &OsStr,
mode: u16,
file_type: FileType,
parent_ino: impl Into<Inode> + Send+ Debug,
parent_ino: impl Into<Inode> + Send + Debug,
size: u64,
) -> Result<Inode>;
@@ -155,10 +149,10 @@ pub trait CommonFilesystem<Entry: CommonEntry > {
entry: Entry,
parent_ino: impl Into<Inode> + Debug,
) -> Inode
where Entry: Debug{
where Entry: Debug {
let ino = entry.get_ino();
self.get_entries_mut().insert(
ino,entry,
ino, entry,
);
self.add_child(parent_ino, &ino);

View File

@@ -1,8 +1,8 @@
use tracing::{error, instrument};
use anyhow::{anyhow, Context};
use drive3::api::{Drive, File};
use drive3::chrono::{DateTime, Utc};
use google_drive3::api::Change as DriveChange;
use tracing::{error, instrument};
use crate::google_drive::DriveId;

View File

@@ -1,11 +1,13 @@
use crate::common::LocalPath;
use crate::fs::{CommonEntry, Inode};
use crate::google_drive::DriveId;
use fuser::FileAttr;
use std::ffi::{OsStr, OsString};
use std::ffi::OsString;
use std::path::PathBuf;
use fuser::FileAttr;
use tracing::instrument;
use crate::common::LocalPath;
use crate::fs::Inode;
use crate::google_drive::DriveId;
#[derive(Debug, Clone)]
pub struct DriveEntry {
pub ino: Inode,
@@ -50,15 +52,15 @@ impl DriveEntry {
attr,
drive_metadata,
has_upstream_content_changes: true,
md5_checksum:None,
local_md5_checksum:None,
md5_checksum: None,
local_md5_checksum: None,
}
}
pub fn build_local_path(&mut self, parent: Option<LocalPath>){
pub fn build_local_path(&mut self, parent: Option<LocalPath>) {
if let Some(parent_path) = parent {
let path = parent_path.join(&self.name);
self.local_path = Some(LocalPath::from(path));
}else{
} else {
self.local_path = Some(LocalPath::from(PathBuf::from("")));
}
}

View File

@@ -1,23 +1,17 @@
use std::collections::HashMap;
use std::error::Error;
use std::fmt::{Debug, Display, Formatter};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::thread::sleep;
use std::fmt::Debug;
use std::path::PathBuf;
use std::time::Duration;
use anyhow::anyhow;
use anyhow::Context;
use drive3::api::File;
use futures::TryFutureExt;
use tokio::io::{AsyncWriteExt, stdout};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::task::JoinHandle;
use tracing::{debug, error, info, instrument, warn};
use crate::config::common_file_filter::CommonFileFilter;
use crate::google_drive::{DriveId, GoogleDrive};
use crate::prelude::*;
use crate::google_drive::GoogleDrive;
#[derive(Debug, Clone)]
pub struct FileCommand {
@@ -59,7 +53,6 @@ pub struct DriveFileUploader {
/// the queue of files to upload
upload_queue: Vec<PathBuf>,
receiver: Receiver<FileUploaderCommand>,
cache_dir: PathBuf,
wait_time_before_upload: Duration,
running_uploads: HashMap<String, RunningUpload>,
@@ -70,14 +63,12 @@ impl<'a> DriveFileUploader {
pub fn new(drive: GoogleDrive,
upload_filter: CommonFileFilter,
receiver: Receiver<FileUploaderCommand>,
cache_dir: PathBuf,
wait_time_before_upload: Duration) -> Self {
Self {
drive,
upload_filter,
upload_queue: Vec::new(),
receiver,
cache_dir,
wait_time_before_upload,
running_uploads: HashMap::new(),
}
@@ -142,20 +133,23 @@ impl<'a> DriveFileUploader {
let running_uploads: Option<&mut RunningUpload> = self.running_uploads.get_mut(drive_id);
if let Some(running_upload) = running_uploads {
debug!("trying to send stop command to running upload for file: {:?}", drive_id);
running_upload.stop_sender.send(()).await;
let send_stop = running_upload.stop_sender.send(()).await;
if let Err(e) = send_stop {
error!("failed to send stop command to running upload for file: {:?} with error: {}", drive_id, e);
}
debug!("waiting for running upload for file: {:?}", drive_id);
let x: &mut JoinHandle<anyhow::Result<()>> = &mut running_upload.join_handle;
tokio::join!(x);
let _join_res = tokio::join!(x);
debug!("finished waiting for running upload for file: {:?} ", drive_id);
debug!("removing running upload for file: {:?}", drive_id);
self.running_uploads.remove(drive_id);
}
}
#[instrument(skip(file_metadata, rc), fields(drive=%drive))]
#[instrument(skip(file_metadata, rc), fields(drive = % drive))]
async fn upload_file(drive: GoogleDrive,
file_metadata: drive3::api::File,
file_metadata: File,
local_path: PathBuf,
wait_time_before_upload: Duration,
rc: Receiver<()>) -> anyhow::Result<()> {
@@ -168,7 +162,7 @@ impl<'a> DriveFileUploader {
},
_ = tokio::time::sleep(wait_time_before_upload)=> {
debug!("done sleeping");
return Self::upload_file_(&drive, file_metadata, &local_path, wait_time_before_upload)
return Self::upload_file_(&drive, file_metadata, &local_path)
.await
.map_err(|e| {
error!("error uploading file: {:?}: {:?}", local_path, e);
@@ -185,13 +179,13 @@ impl<'a> DriveFileUploader {
#[instrument(skip(rc))]
async fn wait_for_cancel_signal(mut rc: Receiver<()>) {
match rc.recv().await {
Some(v) => {
Some(_v) => {
debug!("received stop signal: stopping upload");
}
_ => { warn!("received None from cancel signal receiver") }
}
}
async fn upload_file_(drive: &GoogleDrive, file_metadata: File, local_path: &PathBuf, wait_duration: Duration) -> anyhow::Result<()> {
async fn upload_file_(drive: &GoogleDrive, file_metadata: File, local_path: &PathBuf) -> anyhow::Result<()> {
debug!("uploading file: {:?}", local_path);
let path = local_path.as_path();
drive.upload_file_content_from_path(file_metadata, path).await?;
@@ -202,7 +196,3 @@ impl<'a> DriveFileUploader {
}
}
pub struct FileUploadError {
path: PathBuf,
error: anyhow::Error,
}

View File

@@ -1,20 +1,16 @@
use std::{
any::Any,
collections::HashMap,
ffi::{OsStr, OsString},
fmt::Display,
fs::OpenOptions,
os::unix::prelude::*,
path::{Path, PathBuf},
time::{Duration, SystemTime, UNIX_EPOCH},
time::{SystemTime, UNIX_EPOCH},
};
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::io::{Seek, SeekFrom, stdout, Write};
use std::ops::Deref;
use anyhow::{anyhow, Context, Error};
use async_recursion::async_recursion;
use anyhow::{anyhow, Context};
use bimap::BiMap;
use drive3::api::{File, StartPageToken};
use fuser::{
@@ -28,37 +24,17 @@ use fuser::{
ReplyDirectory,
ReplyEmpty,
ReplyEntry,
ReplyIoctl,
ReplyLock,
ReplyLseek,
ReplyOpen,
ReplyStatfs,
ReplyWrite,
ReplyXattr,
Request,
TimeOrNow,
};
use futures::TryFutureExt;
use libc::{c_int, clone};
use md5::digest::typenum::private::Trim;
use mime::Mime;
use tempfile::TempDir;
use tokio::{
io::{AsyncBufReadExt, stdin},
runtime::Runtime,
};
use libc::c_int;
use tracing::{debug, error, instrument, warn};
use tracing::field::debug;
use crate::{async_helper::run_async_blocking, common::LocalPath, config::common_file_filter::CommonFileFilter, fs::common::CommonFilesystem, fs::CommonEntry, fs::drive::DriveEntry, fs::inode::Inode, google_drive::{DriveId, GoogleDrive}, google_drive, prelude::*};
use crate::{async_helper::run_async_blocking, common::LocalPath, fs::drive::DriveEntry, fs::inode::Inode, google_drive::{DriveId, GoogleDrive}, prelude::*};
use crate::fs::drive::{Change, ChangeType, FileCommand, FileUploaderCommand, SyncSettings};
enum CacheState {
Missing,
UpToDate,
RefreshNeeded,
}
#[derive(Debug)]
enum ChecksumMatch {
/// when the local, the cache and the remote checksum match
@@ -87,8 +63,6 @@ enum ChecksumMatch {
#[derive(Debug)]
pub struct DriveFilesystem {
/// the point where the filesystem is mounted
root: PathBuf,
/// the source dir to read from and write to
source: GoogleDrive,
/// the cache dir to store the files in
@@ -177,14 +151,12 @@ impl DriveFilesystem {
// region general
impl DriveFilesystem {
#[instrument(skip(file_uploader_sender))]
pub async fn new(root: impl AsRef<Path> + Debug,
config_path: impl AsRef<Path> + Debug,
pub async fn new(config_path: impl AsRef<Path> + Debug,
file_uploader_sender: tokio::sync::mpsc::Sender<FileUploaderCommand>,
drive: GoogleDrive, cache_dir: PathBuf,
settings: SyncSettings) -> Result<DriveFilesystem> {
let root = root.as_ref();
let config_path = config_path.as_ref();
debug!("DriveFilesystem::new(root:{}; config_path: {})", root.display(), config_path.display());
debug!("DriveFilesystem::new(config_path: {})", config_path.display());
// let upload_filter = CommonFileFilter::from_path(config_path)?;
let mut entries = HashMap::new();
Self::add_root_entry(&mut entries);
@@ -192,7 +164,6 @@ impl DriveFilesystem {
let changes_start_token = drive.get_start_page_token().await?;
let mut s = Self {
root: root.to_path_buf(),
source: drive,
cache_dir: Some(cache_dir),
entries,
@@ -206,11 +177,6 @@ impl DriveFilesystem {
ino_drive_id: BiMap::new(),
};
s.ino_drive_id.insert(FUSE_ROOT_ID.into(), DriveId::root());
//
// let root = s.root.to_path_buf();
// s.add_dir_entry(&root, Inode::from(FUSE_ROOT_ID), true)
// .await;
Ok(s)
}
@@ -256,7 +222,7 @@ impl DriveFilesystem {
cache_dir.display()
);
let entry = self
.get_entry(&inode)
.entries.get(&inode)
.ok_or(anyhow!("could not get entry"))?;
debug!(
"get_cache_dir_for_file: entry local_path: {:?}",
@@ -269,7 +235,7 @@ impl DriveFilesystem {
#[instrument]
fn construct_cache_folder_path(cache_dir: &Path, entry: &DriveEntry) -> PathBuf {
let mut path = cache_dir.to_path_buf();
let path = cache_dir.to_path_buf();
path.join(match entry.local_path.as_ref() {
Some(x) => match x.parent() {
Some(parent) => parent.to_path_buf(),
@@ -277,14 +243,6 @@ impl DriveFilesystem {
},
None => PathBuf::new()
})
// let folder_path = match entry.local_path.parent() {
// Some(p) => p.as_os_str(),
// None => OsStr::new(""),
// };
// debug!("construct_cache_folder_path: folder_path: {:?}", folder_path);
// let path = cache_dir.join(folder_path);
// debug!("construct_cache_folder_path: {}", path.display());
// path
}
#[instrument(fields(% self))]
async fn add_all_file_entries(&mut self) -> anyhow::Result<()> {
@@ -337,10 +295,6 @@ impl DriveFilesystem {
debug!("build all local paths");
self.get_entry_mut(&DriveId::root()).expect("The root entry has to exist by now").build_local_path(None);
self.build_path_for_children(&DriveId::root());
// for (drive_id, child_id) in self.children.iter(){
// self.entries.get_mut()
// }
Ok(())
}
#[instrument(skip(self))]
@@ -374,7 +328,7 @@ impl DriveFilesystem {
existing_child_list.push(drive_id);
} else {
debug!("add_child: adding child: {:?} to parent: {:?} (new)", drive_id, parent);
let mut set = vec![drive_id];
let set = vec![drive_id];
self.children.insert(parent.clone(), set);
}
}
@@ -429,7 +383,7 @@ impl DriveFilesystem {
| "application/vnd.google-apps.presentation"
| "application/vnd.google-apps.drive-sdk"
| "application/vnd.google-apps.script"
//TODO: add all relevant mime types
//TODO: add all relevant mime types or match only the start or something
=> return Err(anyhow!("google app files are not supported (docs, sheets, etc)")),
"application/vnd.google-apps.folder" => FileType::Directory,
_ => FileType::RegularFile,
@@ -461,7 +415,7 @@ impl DriveFilesystem {
Ok(entry)
}
#[instrument(fields(% self))]
fn get_file_permissions(&self, drive_id: &DriveId, file_kind: &FileType) -> u16 {
fn get_file_permissions(&self, _drive_id: &DriveId, file_kind: &FileType) -> u16 {
//TODO: actually get the permissions from a default or some config for each file etc, not just these hardcoded ones
if file_kind == &FileType::Directory {
return 0o755;
@@ -494,13 +448,6 @@ impl DriveFilesystem {
Ok(cache_path)
}
fn check_if_file_is_cached(&self, drive_id: impl Into<DriveId> + Debug) -> Result<bool> {
let drive_id = drive_id.into();
let entry = self.get_entry_r(&drive_id)?;
let path = self.get_cache_path_for_entry(&entry)?;
let exists = path.exists();
Ok(exists)
}
#[instrument(fields(% self))]
async fn update_entry_metadata_cache_if_needed(&mut self) -> Result<Vec<DriveId>> {
debug!("getting changes...");
@@ -549,20 +496,6 @@ impl DriveFilesystem {
Ok(updated_entries)
}
fn find_entry_by_drive_id(&self, drive_id: impl Into<String>) -> Option<Inode> {
let drive_id = drive_id.into();
let mut searched_entry = None;
for (ino, entry) in self.entries.iter() {
let entry_drive_id = entry.drive_id.as_str();
if entry_drive_id == drive_id {
debug!("updating entry: {}", ino);
searched_entry = Some(entry.ino);
break;
}
}
searched_entry
}
/// Updates the entry from the drive if needed
///
/// returns true if the entry's metadata was updated from the drive
@@ -587,23 +520,6 @@ impl DriveFilesystem {
Ok(metadata_updated)
}
fn get_cache_state(&self, cache_time: &Option<SystemTime>) -> CacheState {
let refresh_cache: CacheState = match cache_time {
Some(cache_time) => {
let now = SystemTime::now();
let duration = now.duration_since(*cache_time).unwrap();
// let seconds = duration.as_secs();
if duration > self.settings.cache_time() {
CacheState::RefreshNeeded
} else {
CacheState::UpToDate
}
}
None => CacheState::Missing,
};
refresh_cache
}
fn get_cache_path_for_entry(&self, entry: &DriveEntry) -> Result<PathBuf> {
debug!("get_cache_path_for_entry: {}", entry.ino);
let cache_folder = match self.cache_dir.as_ref() {
@@ -623,22 +539,13 @@ impl DriveFilesystem {
);
path
}
async fn get_modified_time_on_remote(&self, ino: DriveId) -> Result<SystemTime> {
let entry = self.get_entry_r(&ino)?;
let drive_id = entry.drive_id.clone();
let drive = &self.source;
let modified_time = drive.get_modified_time(drive_id).await?;
Ok(modified_time)
}
fn set_entry_size(&mut self, ino: DriveId, size: u64) -> anyhow::Result<()> {
self.get_entry_mut(ino).context("no entry for ino")?.attr.size = size;
Ok(())
}
fn set_entry_metadata_with_ino(&mut self, ino: impl Into<DriveId>, drive_metadata: google_drive3::api::File) -> anyhow::Result<()> {
fn set_entry_metadata_with_ino(&mut self, ino: impl Into<DriveId>, drive_metadata: File) -> anyhow::Result<()> {
let entry = self.get_entry_mut(ino).context("no entry with ino")?;
Self::update_entry_metadata(drive_metadata, entry)
}
#[instrument]
fn update_entry_metadata(drive_metadata: File, entry: &mut DriveEntry) -> anyhow::Result<()> {
if let Some(name) = drive_metadata.name {
@@ -737,90 +644,8 @@ impl DriveFilesystem {
warn!("how could I get here?");
debug(md5_checksum);
debug(entry);
//TODO: make sure this case does not happen
return ChecksumMatch::Unknown;
/*
// let checksum_match: ChecksumMatch = match md5_checksum {
// Some(remote_md5_checksum) => {
// if let Some(cache_checksum) = &entry.md5_checksum {
// if cache_checksum != remote_md5_checksum {
// debug!("md5_checksum mismatch: {} != {}", cache_checksum, remote_md5_checksum);
// if let Some(local_md5_checksum) = &entry.local_md5_checksum {
// if local_md5_checksum == remote_md5_checksum {
// debug!("md5_checksum local match: {} == {} (local)", local_md5_checksum, remote_md5_checksum);
// ChecksumMatch::CacheMismatch
// } else {
// debug!("md5_checksum local mismatch: {} != {} (local)", local_md5_checksum, remote_md5_checksum);
// ChecksumMatch::Conflict
// }
// } else {
// debug!("md5_checksum local missing, can't compare, treating as a mismatch");
// ChecksumMatch::Mismatch
// }
// } else {
// if let Some(local_md5_checksum) = &entry.local_md5_checksum {
// if local_md5_checksum == remote_md5_checksum {
// debug!("md5_checksum local match: {} == {} (local)", local_md5_checksum, remote_md5_checksum);
// ChecksumMatch::Match
// } else {
// debug!("md5_checksum local mismatch: {} != {} (local)", local_md5_checksum, remote_md5_checksum);
// ChecksumMatch::LocalMismatch
// }
// } else {
// debug!("md5_checksum match: {} == {}", cache_checksum, remote_md5_checksum);
// ChecksumMatch::LocalMismatch
// }
// }
// } else {
// if let Some(local_md5_checksum) = &entry.local_md5_checksum {
// if local_md5_checksum == remote_md5_checksum {
// debug!("md5_checksum local match: {} == {} (local)", local_md5_checksum, remote_md5_checksum);
// ChecksumMatch::CacheMismatch
// } else {
// debug!("md5_checksum local mismatch: {} != {} (local)", local_md5_checksum, remote_md5_checksum);
// ChecksumMatch::Conflict
// }
// } else {
// debug!("local and cache md5_checksum missing but remote exists");
// ChecksumMatch::RemoteMismatch
// }
// }
// }
// None => {
// warn!("no remote md5_checksum, can't compare, treating as a missing");
// ChecksumMatch::Missing
// }
// };
// return checksum_match;
// if let Some(md5_checksum) = md5_checksum {
// if let Some(entry_checksum) = &entry.md5_checksum {
// if entry_checksum != md5_checksum {
// debug!("md5_checksum mismatch: {} != {}", entry_checksum, md5_checksum);
// if let Some(local_md5_checksum) = &entry.local_md5_checksum {
// if local_md5_checksum == md5_checksum {
// debug!("md5_checksum local match: {} == {} (local)", local_md5_checksum, md5_checksum);
// checksum_match = ChecksumMatch::CacheMismatch;
// } else {
// debug!("md5_checksum local mismatch: {} != {} (local)", local_md5_checksum, md5_checksum);
// checksum_match = ChecksumMatch::Mismatch;
// }
// } else {
// debug!("no local_md5_checksum in entry: {}", entry.ino);
// checksum_match = ChecksumMatch::Mismatch;
// }
// } else {
// debug!("md5_checksum match: {} == {}", entry_checksum, md5_checksum);
// checksum_match = ChecksumMatch::Match;
// }
// } else {
// debug!("no md5_checksum in entry: {}", entry.ino);
// checksum_match = ChecksumMatch::Missing;
// }
// } else {
// debug!("no md5_checksum in drive_metadata");
// checksum_match = ChecksumMatch::Missing;
// }
// checksum_match
*/
}
#[instrument]
@@ -855,7 +680,7 @@ impl DriveFilesystem {
changes
}
fn remove_entry(&mut self, id: &DriveId) -> anyhow::Result<()> {
let entry = self.entries.remove_entry(&id);
let _entry = self.entries.remove_entry(&id);
//TODO: remove from children
//TODO: remove from cache if it exists
@@ -863,7 +688,7 @@ impl DriveFilesystem {
}
fn get_input_from_user(message: &str, options: Vec<&str>) -> String {
let mut input = String::new();
loop{
loop {
Self::print_message_to_user(message);
let size_read = std::io::stdin().read_line(&mut input);
if let Ok(size_read) = size_read {
@@ -874,15 +699,14 @@ impl DriveFilesystem {
}
}
Self::print_message_to_user("invalid input, please try again");
}
else{
} else {
error!("could not read input from user: {:?}", size_read);
}
}
}
fn print_message_to_user(message: &str) {
let x = stdout().write_all(format!("{}\n",message).as_bytes());
let x = stdout().flush();
let _x = stdout().write_all(format!("{}\n", message).as_bytes());
let _x = stdout().flush();
}
}
@@ -890,31 +714,14 @@ impl DriveFilesystem {
// region common
impl DriveFilesystem {
pub(crate) fn get_root_path(&self) -> LocalPath {
self.root.clone().into()
}
fn generate_ino(&self) -> Inode {
self.generate_ino_with_offset(0)
}
fn generate_ino_with_offset(&self, offset: usize) -> Inode {
Inode::new((self.entries.len() + 10 + offset) as u64)
}
fn convert_to_system_time(mtime: TimeOrNow) -> SystemTime {
let mtime = match mtime {
TimeOrNow::SpecificTime(t) => t,
TimeOrNow::Now => SystemTime::now(),
};
mtime
}
fn get_entry(&self, ino: impl Into<DriveId>) -> Option<&DriveEntry> {
self.entries.get(&ino.into())
}
fn get_entry_mut(&mut self, ino: impl Into<DriveId>) -> Option<&mut DriveEntry> {
self.entries.get_mut(&ino.into())
}
fn get_entry_r<'a>(&self, ino: impl Into<&'a DriveId>) -> Result<&DriveEntry> {
let ino = ino.into();
self.entries
@@ -954,14 +761,22 @@ impl Filesystem for DriveFilesystem {
#[instrument(fields(% self))]
fn destroy(&mut self) {
debug!("destroy");
self.file_uploader_sender.send(FileUploaderCommand::Stop);
let stop_res = run_async_blocking(self.file_uploader_sender.send(FileUploaderCommand::Stop));
if let Err(e) = stop_res {
error!("could not send stop command to file uploader: {}", e);
}
}
//endregion
//region lookup
#[instrument(skip(_req, reply), fields(% self))]
fn lookup(&mut self, _req: &Request<'_>, parent: u64, name: &OsStr, reply: ReplyEntry) {
debug!("lookup: {}:{:?}", parent, name);
run_async_blocking(self.update_entry_metadata_cache_if_needed());
let update_res = run_async_blocking(self.update_entry_metadata_cache_if_needed());
if let Err(e) = update_res {
error!("read: could not update metadata cache: {}", e);
reply.error(libc::EIO);
return;
}
let parent = parent.into();
let parent_drive_id = self.get_drive_id_from_ino(&parent);
if parent_drive_id.is_err() {
@@ -1009,7 +824,12 @@ impl Filesystem for DriveFilesystem {
#[instrument(skip(_req, reply), fields(% self))]
fn getattr(&mut self, _req: &Request<'_>, ino: u64, reply: ReplyAttr) {
debug!("getattr: {}", ino);
run_async_blocking(self.update_entry_metadata_cache_if_needed());
let update_res = run_async_blocking(self.update_entry_metadata_cache_if_needed());
if let Err(e) = update_res {
error!("read: could not update metadata cache: {}", e);
reply.error(libc::EIO);
return;
}
debug!("getattr: after update_entry_metadata_cache_if_needed");
let drive_id = self.get_drive_id_from_ino(&ino.into());
if drive_id.is_err() {
@@ -1082,7 +902,7 @@ impl Filesystem for DriveFilesystem {
reply.error(libc::ENOENT);
return;
}
let mut entry = entry.unwrap();
let entry = entry.unwrap();
let attr = &mut entry.attr;
if let Some(mode) = mode {
@@ -1193,16 +1013,8 @@ impl Filesystem for DriveFilesystem {
file.seek(SeekFrom::Start(offset as u64)).unwrap();
file.write_all(data).unwrap();
let size = data.len();
// let size = file.write_at(data, offset as u64);
// if let Err(e) = size {
// error!("write: could not write file: {:?}: {}", path, e);
// reply.error(libc::ENOENT);
// return;
// }
// let size = size.unwrap();
debug!("wrote file: {:?} at {}; wrote {} bytes", &path, offset, size);
reply.written(size as u32);
//TODO: update size in entry if necessary
debug!("updating size to {} for entry: {:?}", entry.attr.size, entry);
let mut attr = &mut entry.attr;
if truncate {
@@ -1227,7 +1039,11 @@ impl Filesystem for DriveFilesystem {
let drive_id = drive_id.unwrap();
let entry = self.get_entry_r(drive_id)
.expect("how could this happen to me. I swear it was there a second ago");
run_async_blocking(self.schedule_upload(&entry));
let schedule_res = run_async_blocking(self.schedule_upload(&entry));
if let Err(e) = schedule_res {
error!("read: could not schedule the upload: {}", e);
return;
}
}
//endregion
//region read
@@ -1248,24 +1064,18 @@ impl Filesystem for DriveFilesystem {
ino, fh, offset, size, flags, lock_owner
);
run_async_blocking(self.update_entry_metadata_cache_if_needed());
let update_res = run_async_blocking(self.update_entry_metadata_cache_if_needed());
if let Err(e) = update_res {
error!("read: could not update metadata cache: {}", e);
reply.error(libc::EIO);
return;
}
let x: Result<bool> = run_async_blocking(self.update_cache_if_needed(ino));
if let Err(e) = x {
error!("read: could not update cache: {}", e);
reply.error(libc::EIO);
return;
}
// let is_cached = self.check_if_file_is_cached(ino);
// if !is_cached.unwrap_or(false) {
// debug!("read: file is not cached: {}", ino);
// let x: Result<PathBuf> = run_async_blocking(self.download_file_to_cache(ino));
//
// if let Err(e) = x {
// error!("read: could not download file: {}", e);
// reply.error(libc::ENOENT);
// return;
// }
// }
let drive_id = self.get_drive_id_from_ino(&ino.into());
if drive_id.is_err() {
@@ -1297,7 +1107,7 @@ impl Filesystem for DriveFilesystem {
reply.error(libc::EIO);
return;
}
let mut file = file.unwrap();
let file = file.unwrap();
let mut buf = vec![0; size as usize];
debug!("reading file: {:?} at {} with size {}", &path, offset, size);
@@ -1317,7 +1127,12 @@ impl Filesystem for DriveFilesystem {
mut reply: ReplyDirectory,
) {
debug!("readdir: {}:{}:{:?}", ino, fh, offset);
run_async_blocking(self.update_entry_metadata_cache_if_needed());
let update_res = run_async_blocking(self.update_entry_metadata_cache_if_needed());
if let Err(e) = update_res {
error!("read: could not update metadata cache: {}", e);
reply.error(libc::EIO);
return;
}
let drive_id = self.get_drive_id_from_ino(&ino.into());
if drive_id.is_err() {
warn!("readdir: could not get drive id for ino: {}", ino);
@@ -1372,7 +1187,7 @@ impl Filesystem for DriveFilesystem {
//endregion
//region access
#[instrument(fields(% self, ino, mask))]
fn access(&mut self, _req: &Request<'_>, ino: u64, mask: i32, reply: ReplyEmpty) {
fn access(&mut self, _req: &Request<'_>, _ino: u64, _mask: i32, reply: ReplyEmpty) {
reply.ok(); //TODO: implement this correctly
}
//endregion

View File

@@ -1,9 +1,9 @@
pub use entry::*;
pub use filesystem::*;
pub use file_uploader::*;
pub use settings::*;
pub use change::*;
pub use entry::*;
pub use file_uploader::*;
pub use filesystem::*;
pub use settings::*;
mod entry;
mod filesystem;
mod file_uploader;

View File

@@ -1,5 +1,4 @@
use std::fmt::{Display, Formatter};
use std::path::Path;
use std::time::Duration;
#[derive(Debug, Clone, PartialEq, Eq)]

View File

@@ -24,12 +24,6 @@ impl Into<u64> for Inode {
}
}
// impl Into<Inode> for Inode{
// fn into(self) -> Inode {
// self
// }
// }
impl TryInto<u32> for Inode {
type Error = std::num::TryFromIntError;
@@ -43,6 +37,7 @@ impl From<u64> for Inode {
Inode(value)
}
}
impl From<u32> for Inode {
fn from(value: u32) -> Inode {
Inode(value as u64)

View File

@@ -1,6 +1,7 @@
mod common;
mod inode;
pub use common::*;
pub use inode::*;
mod common;
mod inode;
pub mod drive;
pub mod sample;

View File

@@ -1,481 +0,0 @@
// use crate::async_helper::run_async_in_sync;
use crate::async_helper::run_async_blocking;
use crate::common::LocalPath;
use crate::fs::common::CommonFilesystem;
use crate::fs::inode::Inode;
use crate::fs::CommonEntry;
use crate::prelude::*;
use fuser::{
FileAttr, FileType, KernelConfig, ReplyAttr, ReplyData, ReplyDirectory, ReplyEmpty, ReplyEntry,
ReplyOpen, ReplyStatfs, ReplyWrite, ReplyXattr, Request, TimeOrNow, FUSE_ROOT_ID,
};
use libc::c_int;
use tracing::{debug, warn};
use std::collections::HashMap;
use std::ffi::{OsStr, OsString};
use std::fmt::{Debug, Display};
use std::fs::OpenOptions;
use std::os::unix::prelude::*;
use std::path::{Path, PathBuf};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
#[derive(Debug)]
struct SampleEntry {
pub ino: Inode,
pub name: OsString,
pub local_path: LocalPath,
pub attr: FileAttr,
}
impl SampleEntry {
// fn new(ino: impl Into<Inode>, local_path: OsString, attr: FileAttr) -> Self {
// Self {
// ino: ino.into(),
// name: OsString::new(),
// local_path: LocalPath::from(Path::new(&local_path)),
// attr,
// }
// }
fn new(
ino: impl Into<Inode>,
name: impl Into<OsString>,
local_path: impl Into<LocalPath>,
attr: FileAttr,
) -> Self {
Self {
ino: ino.into(),
name: name.into(),
local_path: local_path.into(),
attr,
}
}
}
impl CommonEntry for SampleEntry {
fn get_ino(&self) -> Inode {
self.ino
}
fn get_name(&self) -> &OsStr {
self.name.as_os_str()
}
fn get_local_path(&self) -> &LocalPath {
&self.local_path
}
fn get_attr(&self) -> &FileAttr {
&self.attr
}
}
#[derive(Debug, Default)]
pub struct SampleFilesystem {
/// the point where the filesystem is mounted
root: PathBuf,
/// the source dir to read from and write to
source: PathBuf,
/// How long the responses can/should be cached
time_to_live: Duration,
entries: HashMap<Inode, SampleEntry>,
children: HashMap<Inode, Vec<Inode>>,
/// The generation of the filesystem
/// This is used to invalidate the cache
/// when the filesystem is remounted
generation: u64,
}
impl SampleFilesystem {
pub fn new(root: impl AsRef<Path>, source: impl AsRef<Path>) -> Self {
debug!("new: {:?}; {:?}", root.as_ref(), source.as_ref());
let mut entries = HashMap::new();
// Add root directory with inode number 1
let root_attr = FileAttr {
ino: FUSE_ROOT_ID,
size: 0,
blocks: 0,
atime: UNIX_EPOCH,
mtime: UNIX_EPOCH,
ctime: UNIX_EPOCH,
crtime: UNIX_EPOCH,
kind: FileType::Directory,
perm: 0o755,
nlink: 2,
uid: 0,
gid: 0,
rdev: 0,
blksize: 4096,
flags: 0,
};
entries.insert(
FUSE_ROOT_ID.into(),
SampleEntry::new(
FUSE_ROOT_ID,
"root",
LocalPath::from(root.as_ref()),
root_attr,
),
);
Self {
root: root.as_ref().to_path_buf(),
source: source.as_ref().to_path_buf(),
time_to_live: Duration::from_secs(2),
entries,
/*TODO: implement a way to increase this if necessary*/
generation: 0,
children: HashMap::new(),
}
}
}
#[async_trait::async_trait]
impl CommonFilesystem<SampleEntry> for SampleFilesystem {
fn get_entries(&self) -> &HashMap<Inode, SampleEntry> {
&self.entries
}
fn get_entries_mut(&mut self) -> &mut HashMap<Inode, SampleEntry> {
&mut self.entries
}
fn get_children(&self) -> &HashMap<Inode, Vec<Inode>> {
&self.children
}
fn get_children_mut(&mut self) -> &mut HashMap<Inode, Vec<Inode>> {
&mut self.children
}
fn get_root_path(&self) -> LocalPath {
self.source.clone().into()
}
async fn add_entry_new(
&mut self,
name: &OsStr,
mode: u16,
file_type: FileType,
parent_ino: impl Into<Inode> + Send+ Debug,
size: u64,
) -> Result<Inode> {
let parent_ino = parent_ino.into();
let ino = self.generate_ino(); // Generate a new inode number
let now = std::time::SystemTime::now();
let attr = FileAttr {
ino: ino.into(),
size: size,
blocks: 0,
atime: now,
mtime: now,
ctime: now,
crtime: now,
kind: file_type,
perm: mode,
nlink: 1,
uid: 0,
gid: 0,
rdev: 0,
blksize: 4096,
flags: 0,
};
self.get_entries_mut()
.insert(ino, SampleEntry::new(ino, name, OsString::from(name), attr));
self.add_child(parent_ino, &ino);
Ok(ino)
}
}
impl SampleFilesystem {
async fn add_dir_entry(
&mut self,
folder_path: &Path,
parent_ino: impl Into<Inode>,
skip_self: bool,
) -> Result<()> {
let parent_ino = parent_ino.into();
let ino: Inode;
if skip_self {
ino = parent_ino;
} else {
ino = self
.add_entry_new(
folder_path.file_name().unwrap(),
/*TODO: correct permissions*/
0o755,
FileType::Directory,
parent_ino,
/*TODO: implement size for folders*/ 0,
)
.await?;
}
let d = std::fs::read_dir(folder_path);
if let Ok(d) = d {
for entry in d {
if let Ok(entry) = entry {
let path = entry.path();
let name = entry.file_name();
let metadata = entry.metadata();
if let Ok(metadata) = metadata {
if metadata.is_dir() {
self.add_dir_entry(&path, ino, false);
} else if metadata.is_file() {
let mode = metadata.mode();
let size = metadata.size();
self.add_file_entry(ino, name.as_os_str(), mode as u16, size).await;
}
}
}
}
}
Ok(())
}
}
impl fuser::Filesystem for SampleFilesystem {
fn init(
&mut self,
_req: &Request<'_>,
_config: &mut KernelConfig,
) -> std::result::Result<(), c_int> {
debug!("init");
// self.add_file_entry(1, "hello.txt".as_ref(), 0o644);
let source = self.source.clone();
debug!("init: add_dir_entry");
run_async_blocking(async {
debug!("init: add_dir_entry (async)");
self.add_dir_entry(&source, FUSE_ROOT_ID, true).await;
debug!("init: add_dir_entry done (async)");
});
debug!("init: add_dir_entry done");
// self.add_dir_entry(&source, FUSE_ROOT_ID, true);
Ok(())
}
fn lookup(&mut self, _req: &Request<'_>, parent: u64, name: &OsStr, reply: ReplyEntry) {
debug!("lookup: {}:{:?}", parent, name);
for (inode, entry) in self.entries.iter() {
let path: PathBuf = entry.local_path.clone().into();
let accepted = name.eq_ignore_ascii_case(&path);
debug!(
"entry: {}:(accepted={}){:?}; {:?}",
inode, accepted, path, entry.attr
);
if accepted {
reply.entry(&self.time_to_live, &entry.attr, self.generation);
return;
}
}
reply.error(libc::ENOENT);
}
fn getattr(&mut self, _req: &Request<'_>, ino: u64, reply: ReplyAttr) {
self.entries.get(&ino.into()).map(|entry| {
reply.attr(&self.time_to_live, &entry.attr);
});
}
fn readdir(
&mut self,
_req: &Request<'_>,
ino: u64,
fh: u64,
mut offset: i64,
mut reply: ReplyDirectory,
) {
debug!("readdir: {}:{}:{:?}", ino, fh, offset);
let children = self.children.get(&ino.into());
if let Some(attr) = self.get_entries().get(&ino.into()).map(|entry| entry.attr) {
if attr.kind != FileType::Directory {
reply.error(libc::ENOTDIR);
return;
}
}
if !(children.is_none()) {} else {
reply.error(libc::ENOENT);
return;
}
let children = children.unwrap();
debug!("children ({}): {:?}", children.len(), children);
for child_inode in children.iter().skip(offset as usize) {
let entry = self.entries.get(child_inode).unwrap();
let path: PathBuf = entry.local_path.clone().into();
let attr = entry.attr;
let inode = (*child_inode).into();
// Increment the offset for each processed entry
offset += 1;
debug!("entry: {}:{:?}; {:?}", inode, path, attr);
if !reply.add(inode, offset, attr.kind, path) {
break;
}
}
debug!("readdir: ok");
reply.ok();
}
fn read(
&mut self,
_req: &Request<'_>,
ino: u64,
fh: u64,
offset: i64,
size: u32,
flags: i32,
lock_owner: Option<u64>,
reply: ReplyData,
) {
debug!(
"read: {}:{}:{}:{}:{:#x?}:{:?}",
ino, fh, offset, size, flags, lock_owner
);
let data = self.get_entry(ino).map(|entry| entry.attr);
if let Some(attr) = data {
if attr.kind != FileType::RegularFile {
reply.error(libc::EISDIR);
return;
}
let path = self.get_full_path_from_ino(ino);
debug!("opening file: {:?}", &path);
let mut file = std::fs::File::open::<PathBuf>(path.clone().unwrap().into()).unwrap();
let mut buf = vec![0; size as usize];
debug!("reading file: {:?} at {} with size {}", &path, offset, size);
file.read_at(&mut buf, offset as u64).unwrap();
debug!("read file: {:?} at {}", &path, offset);
reply.data(&buf);
} else {
reply.error(libc::ENOENT);
}
}
fn write(
&mut self,
_req: &Request<'_>,
ino: u64,
fh: u64,
offset: i64,
data: &[u8],
write_flags: u32,
flags: i32,
lock_owner: Option<u64>,
reply: ReplyWrite,
) {
debug!(
"write: {}:{}:{}:{:#x?}:{:?}:{:#x?}:{:?}",
ino, fh, offset, flags, lock_owner, write_flags, data,
);
let attr = self.get_entry(ino).map(|entry| entry.attr);
if let Some(attr) = attr {
if attr.kind != FileType::RegularFile {
warn!(
"write: not a file, writing is not supported: kind:{:?}; attr:{:?}",
attr.kind, attr
);
reply.error(libc::EISDIR);
return;
}
let path = self.get_full_path_from_ino(ino);
debug!("opening file: {:?}", &path);
let mut file = OpenOptions::new()
.write(true)
.create(true)
.open::<PathBuf>(path.clone().unwrap().into())
.unwrap();
debug!(
"writing file: {:?} at {} with size {}",
&path,
offset,
data.len()
);
let size = file.write_at(data, offset as u64).unwrap();
debug!("wrote file: {:?} at {}; wrote {} bits", &path, offset, size);
reply.written(size as u32);
} else {
reply.error(libc::ENOENT);
}
}
fn setattr(
&mut self,
_req: &Request<'_>,
ino: u64,
mode: Option<u32>,
uid: Option<u32>,
gid: Option<u32>,
size: Option<u64>,
_atime: Option<TimeOrNow>,
_mtime: Option<TimeOrNow>,
_ctime: Option<SystemTime>,
/*TODO: check if this change need to be implemented*/
fh: Option<u64>,
_crtime: Option<SystemTime>,
/*TODO: check if this change need to be implemented*/
_chgtime: Option<SystemTime>,
/*TODO: check if this change need to be implemented*/
_bkuptime: Option<SystemTime>,
flags: Option<u32>,
reply: ReplyAttr,
) {
debug!(
"setattr: {}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}",
ino,
mode,
uid,
gid,
size,
_atime,
_mtime,
_ctime,
fh,
_crtime,
_chgtime,
_bkuptime,
flags
);
let attr = self
.entries
.get_mut(&ino.into())
.map(|entry| &mut entry.attr);
if attr.is_none() {
reply.error(libc::ENOENT);
return;
}
let mut attr = attr.unwrap();
if let Some(mode) = mode {
attr.perm = mode as u16;
}
if let Some(uid) = uid {
attr.uid = uid;
}
if let Some(gid) = gid {
attr.gid = gid;
}
if let Some(size) = size {
attr.size = size;
}
if let Some(atime) = _atime {
attr.atime = Self::convert_to_system_time(atime);
}
if let Some(mtime) = _mtime {
attr.mtime = Self::convert_to_system_time(mtime);
}
if let Some(ctime) = _ctime {
attr.ctime = ctime;
}
if let Some(crtime) = _crtime {
attr.crtime = crtime;
}
if let Some(flags) = flags {
attr.flags = flags;
}
reply.attr(&self.time_to_live, attr);
}
fn access(&mut self, _req: &Request<'_>, ino: u64, mask: i32, reply: ReplyEmpty) {
reply.ok(); //TODO: implement this a bit better/more useful
}
}

View File

@@ -1,31 +1,21 @@
use std::ffi::{OsStr, OsString};
use std::fmt::{Debug, Display, Error};
use std::fmt::{Debug, Display};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::time::SystemTime;
// use drive3::api::Scope::File;
use anyhow::{anyhow, Context};
use drive3::{hyper_rustls, oauth2};
use drive3::api::{Change, File, Scope, StartPageToken};
use drive3::chrono::{DateTime, Utc};
use drive3::client::ReadSeek;
use drive3::DriveHub;
use drive3::hyper::{body, Body, Response};
use drive3::hyper::body::HttpBody;
use drive3::hyper::{Body, Response};
use drive3::hyper::client::HttpConnector;
use drive3::hyper_rustls::HttpsConnector;
use futures::{Stream, StreamExt};
use hyper::Client;
use mime::{FromStrError, Mime};
use tokio::{fs, io};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
use tokio::runtime::Runtime;
use tokio::fs;
use tracing::{debug, error, instrument, trace, warn};
use tracing::field::debug;
use crate::google_drive::{drive, DriveId, helpers};
use crate::google_drive::{DriveId, helpers};
use crate::prelude::*;
const FIELDS_FILE: &str = "id, name, size, mimeType, kind, md5Checksum, parents,trashed, createdTime, modifiedTime, viewedByMeTime";
@@ -54,7 +44,7 @@ impl GoogleDrive {
if let Some(page_token) = page_token {
request = request.page_token(&page_token);
}
let (response, result) = request
let (_response, result) = request
.doit()
.await?;
let result_files = result.files.ok_or(anyhow!("no file list returned"))?;
@@ -67,47 +57,6 @@ impl GoogleDrive {
}
Ok(files)
}
//
// #[instrument]
// pub async fn list_files(&self, folder_id: DriveId) -> anyhow::Result<Vec<File>> {
// debug!("list_files: folder_id: {:?}", folder_id);
// let folder_id: OsString = folder_id.into();
// let folder_id = match folder_id.into_string() {
// Ok(folder_id) => folder_id,
// Err(_) => return Err(anyhow!("invalid folder_id")),
// };
// if folder_id.is_empty() {
// return Err(anyhow!("folder_id is empty"));
// }
// if folder_id.contains('\'') {
// return Err(anyhow!("folder_id contains invalid character"));
// }
// let mut files = Vec::new();
// let mut page_token = None;
// loop {
// debug!("list_files: page_token: {:?}", page_token);
// let (response, result) = self
// .hub
// .files()
// .list()
// .param(
// "fields",
// &format!("nextPageToken, files({})", FIELDS_FILE),
// )
// // .page_token(page_token.as_ref().map(String::as_str))
// .q(format!("'{}' in parents", folder_id).as_str())
// .doit()
// .await?;
// let result_files = result.files.ok_or(anyhow!("no file list returned"))?;
// debug!("list_files: response: {:?}", result_files.len());
// files.extend(result_files);
// page_token = result.next_page_token;
// if page_token.is_none() {
// break;
// }
// }
// Ok(files)
// }
}
impl GoogleDrive {
@@ -168,7 +117,7 @@ impl GoogleDrive {
#[instrument]
pub(crate) async fn get_metadata_for_file(&self, drive_id: DriveId) -> anyhow::Result<File> {
let drive_id = drive_id.to_string();
let (response, file) = self
let (_response, file) = self
.hub
.files()
.get(&drive_id)
@@ -187,16 +136,6 @@ impl GoogleDrive {
}
}
impl GoogleDrive {
pub(crate) async fn get_modified_time(&self, drive_id: DriveId) -> Result<SystemTime> {
let drive_id: OsString = drive_id.into();
let drive_id = drive_id.into_string().map_err(|_| anyhow!("invalid drive_id"))?;
let (response, file) = self.hub.files().get(&drive_id).param("fields", "modifiedTime").doit().await?;
let x = file.modified_time.ok_or_else(|| anyhow!("modified_time not found"))?;
Ok(x.into())
}
}
impl GoogleDrive {
#[instrument]
pub async fn download_file(&self, file_id: DriveId, target_file: &PathBuf) -> Result<File> {
@@ -250,7 +189,7 @@ impl GoogleDrive {
.param("fields", "files(id)")
.doit()
.await;
let (response, files) = match req {
let (_response, files) = match req {
Ok((response, files)) => (response, files),
Err(e) => {
warn!("get_id: Error: {}", e);
@@ -300,7 +239,7 @@ impl GoogleDrive {
);
let hub = DriveHub::new(http_client, auth);
let mut drive = GoogleDrive { hub };
let drive = GoogleDrive { hub };
Ok(drive)
}
#[instrument]
@@ -321,7 +260,7 @@ impl GoogleDrive {
let mut page_token = None;
loop {
debug!("list_files: page_token: {:?}", page_token);
let (response, result) = self
let (_response, result) = self
.hub
.files()
.list()
@@ -381,25 +320,11 @@ pub async fn sample() -> Result<()> {
Ok(())
}
async fn download_file(
hub: &GoogleDrive,
file: &drive3::api::File,
target_path: &Path,
) -> Result<File> {
if let Some(id) = &file.id {
download_file_by_id(hub, id, target_path).await
} else {
Err("file id not found".into())
}
}
async fn download_file_by_id(
hub: &GoogleDrive,
id: impl Into<String>,
target_path: &Path,
) -> Result<File> {
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
let id = id.into();
let (response, content): (Response<Body>, google_drive3::api::File) = hub
.hub
@@ -434,7 +359,7 @@ async fn write_body_to_file(response: Response<Body>, target_path: &Path) -> Res
let mut file = std::fs::File::create(target_path)?;
let mut stream = response.into_body();
let mut buffer = bytes::BytesMut::new();
let _buffer = bytes::BytesMut::new();
let mut counter = 0;
while let Some(chunk) = stream.next().await {
let chunk = chunk?;
@@ -448,7 +373,7 @@ async fn write_body_to_file(response: Response<Body>, target_path: &Path) -> Res
async fn get_file_header_by_id(hub: &GoogleDrive, id: &str) -> Result<File> {
debug!("get_file_header_by_id(): id: {:?}", id);
let (response, content) = hub.hub.files().get(id).doit().await?;
let (_response, content) = hub.hub.files().get(id).doit().await?;
Ok(content)
}
@@ -505,7 +430,7 @@ async fn sample_list_files(drive: &GoogleDrive) -> Result<()> {
Ok(())
}
async fn create_file_on_drive_from_path(
pub async fn create_file_on_drive_from_path(
drive: &GoogleDrive,
file: File,
path: &Path,
@@ -516,7 +441,7 @@ async fn create_file_on_drive_from_path(
Ok(())
}
async fn create_file_on_drive(
pub async fn create_file_on_drive(
drive: &GoogleDrive,
file: google_drive3::api::File,
mime_type: mime::Mime,
@@ -557,11 +482,10 @@ pub async fn update_file_content_on_drive_from_path(
#[instrument(skip(file, content))]
async fn update_file_content_on_drive(
drive: &GoogleDrive,
file: google_drive3::api::File,
file: File,
content: fs::File,
) -> anyhow::Result<()> {
let stream = content.into_std().await;
// let stream = content;
let mime_type = helpers::get_mime_from_file_metadata(&file)?;
let id = file.drive_id.clone().with_context(|| "file metadata has no drive id")?;
debug!("starting upload");

View File

@@ -1,7 +1,5 @@
use std::ffi::OsString;
use std::fmt::{Display, Pointer};
use anyhow::Context;
use std::fmt::Display;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DriveId(String);

View File

@@ -1,69 +1,10 @@
use crate::fs::drive::DriveFilesystem;
use crate::fs::{CommonFilesystem, Inode};
use crate::google_drive::{DriveId, GoogleDrive};
use crate::prelude::*;
use anyhow::anyhow;
use drive3::api::File;
use tracing::debug;
use mime::Mime;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use drive3::api::File;
use mime::Mime;
pub fn get_mime_from_file_metadata(file: &File) -> anyhow::Result<Mime> {
Ok(Mime::from_str(
&file.mime_type.as_ref().unwrap_or(&"*/*".to_string()),
)?)
}
// pub fn get_drive_id_from_local_path(drive: &DriveFilesystem, path: &Path) -> Result<DriveId> {
// let drive_mount_point: &PathBuf = &drive.get_root_path().into();
// debug!("get_drive_id_from_path(): (0) path: '{}'", path.display());
// let path = match path.strip_prefix(drive_mount_point) {
// Err(e) => {
// return Err(anyhow!(
// "Path {:?} is not a prefix of {:?}",
// drive_mount_point,
// path
// ))?
// }
// Ok(path) => path,
// };
// debug!("get_drive_id_from_path(): (1) path: '{}'", path.display());
// if path == Path::new("/") || path == Path::new("") {
// debug!(
// "get_drive_id_from_path(): (1) path is root: '{}'",
// path.display()
// );
// return Ok("root".into());
// }
//
// let mut parent_ino: Inode = 5u32.into();
// // let mut parent_ino : Inode =Inode::from(5u32);//.into();
// for part in path.iter() {
// debug!("get_drive_id_from_path(): (2..) path: '{:?}'", part);
//
// let children = drive.get_children().get(&parent_ino);
// debug!("get_drive_id_from_path(): (2..) children: '{:?}'", children);
// }
// todo!("get_drive_id_from_path()")
// }
mod test {
use super::*;
// #[tokio::test]
// async fn test_get_drive_id_from_local_path() {
// crate::init_logger();
// let path = Path::new("/drive1");
// let drive = DriveFilesystem::new(path, GoogleDrive::new().await?).await;
// let drive_mount_point = Path::new("/drive1");
//
// let drive_id = get_drive_id_from_local_path(&drive, path).unwrap();
// assert_eq!(drive_id, "root".into());
//
// let path = Path::new("/drive1/");
// let drive_id = get_drive_id_from_local_path(&drive, path).unwrap();
// assert_eq!(drive_id, "root".into());
//
// let path = Path::new("/drive1/dir1/dir2/file1.txt");
// let drive_id = get_drive_id_from_local_path(&drive, path).unwrap();
// todo!("create assert for this test");
// // assert_eq!(drive_id, "TODO".into());
// }
}

View File

@@ -1,8 +1,10 @@
mod helpers;
pub use drive::*;
pub use drive_id::*;
pub use helpers::*;
mod helpers;
mod drive;
mod drive_id;
pub use drive_id::*;

View File

@@ -4,26 +4,22 @@ extern crate google_drive3 as drive3;
use std::path::Path;
use std::time::{Duration};
use std::time::Duration;
use fuser::{Session, SessionUnmounter,MountOption};
use fuser::{MountOption, Session, SessionUnmounter};
// use nix;
use tempfile::TempDir;
// use tokio::io::{AsyncReadExt, stdin};
// use tokio::runtime::Runtime;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{ Sender};
use tokio::sync::mpsc::Sender;
use tokio::task::JoinHandle;
use tracing::{debug, info};
use tracing::{debug, info};
use prelude::*;
use crate::config::common_file_filter::CommonFileFilter;
use crate::fs::drive::{DriveFilesystem, DriveFileUploader, FileUploaderCommand, SyncSettings};
use crate::fs::sample::SampleFilesystem;
use crate::google_drive::GoogleDrive;
pub mod async_helper;
@@ -33,19 +29,6 @@ pub mod google_drive;
pub mod prelude;
pub mod config;
pub async fn sample_fs() -> Result<()> {
let mountpoint = "/tmp/fuse/1";
let source = "/tmp/fuse/2";
let options = vec![MountOption::RW];
debug!("Mounting fuse filesystem at {}", mountpoint);
let fs = SampleFilesystem::new(mountpoint, source);
fuser::mount2(fs, mountpoint, &options).unwrap();
debug!("Exiting...");
Ok(())
}
pub async fn sample_drive_fs() -> Result<()> {
let mountpoint = "/tmp/fuse/3";
let upload_ignore_path = Path::new("config/.upload_ignore");
@@ -61,30 +44,22 @@ pub async fn sample_drive_fs() -> Result<()> {
let mut file_uploader = DriveFileUploader::new(drive.clone(),
upload_ignore,
file_uploader_receiver,
cache_dir.path().to_path_buf(),
Duration::from_secs(3));
debug!("Mounting fuse filesystem at {}", mountpoint);
let fs = DriveFilesystem::new(mountpoint,
Path::new(""),
file_uploader_sender.clone(),
drive,
cache_dir.into_path(),
sync_settings,
let fs = DriveFilesystem::new(
Path::new(""),
file_uploader_sender.clone(),
drive,
cache_dir.into_path(),
sync_settings,
).await?;
// let session_unmounter =
let mount_options = vec![MountOption::RW];
let uploader_handle: JoinHandle<()> = tokio::spawn(async move { file_uploader.listen().await; });
let end_signal_handle: JoinHandle<()> = mount(fs, &mountpoint, &mount_options, file_uploader_sender).await?;
tokio::try_join!(uploader_handle, end_signal_handle)?;
// tokio::spawn(async move {
// end_program_signal_awaiter(file_uploader_sender, session_unmounter).await?;
// });
// fuser::mount2(fs, &mountpoint, &options).unwrap();
debug!("Exiting gracefully...");
Ok(())
}
@@ -129,49 +104,3 @@ async fn end_program_signal_awaiter(file_uploader_sender: Sender<FileUploaderCom
info!("unmounted");
Ok(())
}
/*
// pub async fn watch_file_reading() -> Result<()> {
// let temp_file = tempfile::NamedTempFile::new()?;
// let file_path = temp_file.path();
// info!("File path: {:?}", file_path);
// use notify::{recommended_watcher, RecursiveMode, Watcher};
// let mut config = notify::Config::default();
// let mut watcher: INotifyWatcher = Watcher::new(MyReadHandler, config).unwrap();
// watcher
// .watch(file_path, RecursiveMode::NonRecursive)
// .unwrap();
//
// info!("Press any key to exit...");
// let x = &mut [0u8; 1];
// stdin().read(x).await?;
// debug!("Done");
// Ok(())
// }
// struct MyReadHandler;
// impl notify::EventHandler for MyReadHandler {
// fn handle_event(&mut self, event: std::result::Result<notify::Event, notify::Error>) {
// debug!("File read: {:?}", event);
// }
// }
//
// pub async fn sample_nix() -> Result<()> {
// info!("Hello, world! (nix)");
// let tmppath = tempfile::tempdir()?;
// nix::mount::mount(
// // Some("/home/omgeeky/Documents/testmount/"),
// None::<&str>,
// tmppath.path(),
// Some("tmpfs"),
// nix::mount::MsFlags::empty(),
// None::<&str>,
// );
// info!("Mounted tmpfs at {:?}", tmppath.path());
// info!("Press any key to exit (nix)...");
// // block execution until keyboard input is received
// nix::unistd::read(0, &mut [0])?;
// nix::mount::umount(tmppath.path()).unwrap();
// info!("Done (nix)");
// Ok(())
// }
*/