This commit is contained in:
OMGeeky
2023-05-21 12:13:38 +02:00
parent ca41a177b0
commit 8f6c15acc6
15 changed files with 421 additions and 220 deletions

View File

@@ -8,8 +8,10 @@ use tracing::trace;
/// This function will block the current thread until the provided future has run to completion.
///
/// # Be careful with deadlocks
pub fn run_async_blocking<T>(f: impl std::future::Future<Output=T> + Sized) -> T
where T: Debug {
pub fn run_async_blocking<T>(f: impl std::future::Future<Output = T> + Sized) -> T
where
T: Debug,
{
trace!("run_async");
let handle = Handle::current();
let _enter_guard = handle.enter();

View File

@@ -36,9 +36,9 @@ impl From<&OsString> for LocalPath {
}
impl<T> AsRef<T> for LocalPath
where
T: ?Sized,
<PathBuf as Deref>::Target: AsRef<T>,
where
T: ?Sized,
<PathBuf as Deref>::Target: AsRef<T>,
{
fn as_ref(&self) -> &T {
self.0.deref().as_ref()

View File

@@ -12,11 +12,8 @@ pub struct CommonFileFilter {
impl CommonFileFilter {
pub fn from_path(path: impl Into<PathBuf>) -> Result<Self> {
let path = path.into();
let ignores = GitignoreBuilder::new(&path)
.build()?;
let s = Self {
filter: ignores,
};
let ignores = GitignoreBuilder::new(&path).build()?;
let s = Self { filter: ignores };
Ok(s)
}
pub fn is_filter_matched(&self, path: &Path) -> Result<bool> {

View File

@@ -1 +1 @@
pub mod common_file_filter;
pub mod common_file_filter;

View File

@@ -14,14 +14,23 @@ pub enum ChangeType {
}
impl ChangeType {
fn from_drive_change(change_type: Option<String>, file: Option<File>, drive: Option<Drive>, removed: bool) -> anyhow::Result<ChangeType> {
fn from_drive_change(
change_type: Option<String>,
file: Option<File>,
drive: Option<Drive>,
removed: bool,
) -> anyhow::Result<ChangeType> {
if removed {
return Ok(Self::Removed);
}
if let Some(change_type) = change_type {
match change_type.as_str() {
"drive" => Ok(Self::Drive(drive.context("no drive but change type was drive")?)),
"file" => Ok(Self::File(file.context("no file but change type was file")?)),
"drive" => Ok(Self::Drive(
drive.context("no drive but change type was drive")?,
)),
"file" => Ok(Self::File(
file.context("no file but change type was file")?,
)),
_ => Err(anyhow!("invalid change type: {}", change_type)),
}
} else {
@@ -50,9 +59,14 @@ impl TryFrom<DriveChange> for Change {
}
Ok(Self {
drive_id: DriveId::from(drive_id?),
kind: ChangeType::from_drive_change(drive_change.change_type, drive_change.file, drive_change.drive, removed)?,
kind: ChangeType::from_drive_change(
drive_change.change_type,
drive_change.file,
drive_change.drive,
removed,
)?,
time: drive_change.time.context("time is missing")?,
removed,
})
}
}
}

View File

@@ -40,7 +40,6 @@ pub enum FileUploaderCommand {
CreateFolder(FileCommand),
CreateFile(FileCommand),
Stop,
}
#[derive(Debug)]
@@ -60,10 +59,12 @@ pub struct DriveFileUploader {
impl<'a> DriveFileUploader {
#[instrument]
pub fn new(drive: GoogleDrive,
upload_filter: CommonFileFilter,
receiver: Receiver<FileUploaderCommand>,
wait_time_before_upload: Duration) -> Self {
pub fn new(
drive: GoogleDrive,
upload_filter: CommonFileFilter,
receiver: Receiver<FileUploaderCommand>,
wait_time_before_upload: Duration,
) -> Self {
Self {
drive,
upload_filter,
@@ -89,23 +90,39 @@ impl<'a> DriveFileUploader {
let file_metadata = file_command.file_metadata;
if !self.upload_filter.is_filter_matched(&path).unwrap_or(false) {
let drive = self.drive.clone();
let drive_id = file_metadata.drive_id.clone().with_context(|| "no drive_id");
let drive_id = file_metadata
.drive_id
.clone()
.with_context(|| "no drive_id");
if let Err(e) = drive_id {
error!("failed to upload file: {:?} with error: {}", path, e);
continue;
}
let drive_id = drive_id.unwrap();
self.cancel_and_wait_for_running_upload_for_id(&drive_id).await;
self.cancel_and_wait_for_running_upload_for_id(&drive_id)
.await;
info!("queuing upload of file: {:?}", path);
let wait_time_before_upload = self.wait_time_before_upload.clone();
let (rx, rc) = channel(1);
let upload_handle = tokio::spawn(async move { Self::upload_file(drive, file_metadata, path, wait_time_before_upload, rc).await });
self.running_uploads.insert(drive_id, RunningUpload {
join_handle: upload_handle,
stop_sender: rx,
let upload_handle = tokio::spawn(async move {
Self::upload_file(
drive,
file_metadata,
path,
wait_time_before_upload,
rc,
)
.await
});
self.running_uploads.insert(
drive_id,
RunningUpload {
join_handle: upload_handle,
stop_sender: rx,
},
);
} else {
info!("skipping upload of file since it is ignored: {:?}", path);
}
@@ -114,11 +131,15 @@ impl<'a> DriveFileUploader {
info!("received stop command: stopping file upload listener");
break;
}
_ => { warn!("received unknown command: {:?}", command); }
_ => {
warn!("received unknown command: {:?}", command);
}
};
} else {
warn!("received None command, meaning all senders have been dropped. \
stopping file upload listener since no more commands will be received");
warn!(
"received None command, meaning all senders have been dropped. \
stopping file upload listener since no more commands will be received"
);
break;
}
}
@@ -132,29 +153,44 @@ impl<'a> DriveFileUploader {
debug!("checking for running uploads for file: {:?}", drive_id);
let running_uploads: Option<&mut RunningUpload> = self.running_uploads.get_mut(drive_id);
if let Some(running_upload) = running_uploads {
debug!("trying to send stop command to running upload for file: {:?}", drive_id);
debug!(
"trying to send stop command to running upload for file: {:?}",
drive_id
);
let send_stop = running_upload.stop_sender.send(()).await;
if let Err(e) = send_stop {
error!("failed to send stop command to running upload for file: {:?} with error: {}", drive_id, e);
error!(
"failed to send stop command to running upload for file: {:?} with error: {}",
drive_id, e
);
}
debug!("waiting for running upload for file: {:?}", drive_id);
let x: &mut JoinHandle<anyhow::Result<()>> = &mut running_upload.join_handle;
let _join_res = tokio::join!(x);
debug!("finished waiting for running upload for file: {:?} ", drive_id);
debug!(
"finished waiting for running upload for file: {:?} ",
drive_id
);
debug!("removing running upload for file: {:?}", drive_id);
self.running_uploads.remove(drive_id);
}
}
#[instrument(skip(file_metadata, rc), fields(drive = % drive))]
async fn upload_file(drive: GoogleDrive,
file_metadata: File,
local_path: PathBuf,
wait_time_before_upload: Duration,
rc: Receiver<()>) -> anyhow::Result<()> {
async fn upload_file(
drive: GoogleDrive,
file_metadata: File,
local_path: PathBuf,
wait_time_before_upload: Duration,
rc: Receiver<()>,
) -> anyhow::Result<()> {
// debug!("uploading file: {:?}", local_path);
debug!("sleeping for {:?} before uploading {}", wait_time_before_upload, local_path.display());
debug!(
"sleeping for {:?} before uploading {}",
wait_time_before_upload,
local_path.display()
);
tokio::select! {
_ = Self::wait_for_cancel_signal(rc) => {
debug!("received stop signal: stopping upload");
@@ -182,17 +218,24 @@ impl<'a> DriveFileUploader {
Some(_v) => {
debug!("received stop signal: stopping upload");
}
_ => { warn!("received None from cancel signal receiver") }
_ => {
warn!("received None from cancel signal receiver")
}
}
}
async fn upload_file_(drive: &GoogleDrive, file_metadata: File, local_path: &PathBuf) -> anyhow::Result<()> {
async fn upload_file_(
drive: &GoogleDrive,
file_metadata: File,
local_path: &PathBuf,
) -> anyhow::Result<()> {
debug!("uploading file: {:?}", local_path);
let path = local_path.as_path();
drive.upload_file_content_from_path(file_metadata, path).await?;
drive
.upload_file_content_from_path(file_metadata, path)
.await?;
// let result = drive.list_files(DriveId::from("root")).await.with_context(|| format!("could not do it"))?;
debug!("upload_file_: done");
Ok(())
}
}

View File

@@ -1,3 +1,5 @@
use std::fmt::{Debug, Formatter};
use std::io::{stdout, Seek, SeekFrom, Write};
use std::{
collections::HashMap,
ffi::{OsStr, OsString},
@@ -7,33 +9,27 @@ use std::{
path::{Path, PathBuf},
time::{SystemTime, UNIX_EPOCH},
};
use std::fmt::{Debug, Formatter};
use std::io::{Seek, SeekFrom, stdout, Write};
use anyhow::{anyhow, Context};
use bimap::BiMap;
use drive3::api::{File, StartPageToken};
use fuser::{
FileAttr,
Filesystem,
FileType,
FUSE_ROOT_ID,
KernelConfig,
ReplyAttr,
ReplyData,
ReplyDirectory,
ReplyEmpty,
ReplyEntry,
ReplyWrite,
Request,
TimeOrNow,
FileAttr, FileType, Filesystem, KernelConfig, ReplyAttr, ReplyData, ReplyDirectory, ReplyEmpty,
ReplyEntry, ReplyOpen, ReplyWrite, Request, TimeOrNow, FUSE_ROOT_ID,
};
use libc::c_int;
use tracing::{debug, error, instrument, warn};
use tracing::field::debug;
use tracing::{debug, error, instrument, warn};
use crate::{async_helper::run_async_blocking, common::LocalPath, fs::drive::DriveEntry, fs::inode::Inode, google_drive::{DriveId, GoogleDrive}, prelude::*};
use crate::fs::drive::{Change, ChangeType, FileCommand, FileUploaderCommand, SyncSettings};
use crate::{
async_helper::run_async_blocking,
common::LocalPath,
fs::drive::DriveEntry,
fs::inode::Inode,
google_drive::{DriveId, GoogleDrive},
prelude::*,
};
#[derive(Debug)]
enum ChecksumMatch {
@@ -100,13 +96,15 @@ pub struct DriveFilesystem {
impl Display for DriveFilesystem {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "DriveFilesystem {{ entries: {} }}",
// write!(f, "DriveFilesystem {{ entries: {}, gen: {}, settings: {} }}",
// self.root.display(),
// self.cache_dir.as_ref().map(|dir| dir.path().display()),
self.entries.len(),
// self.generation,
// self.settings,
write!(
f,
"DriveFilesystem {{ entries: {} }}",
// write!(f, "DriveFilesystem {{ entries: {}, gen: {}, settings: {} }}",
// self.root.display(),
// self.cache_dir.as_ref().map(|dir| dir.path().display()),
self.entries.len(),
// self.generation,
// self.settings,
)
}
}
@@ -118,7 +116,11 @@ impl DriveFilesystem {
let path = self.get_cache_path_for_entry(entry)?;
let metadata = Self::create_drive_metadata_from_entry(entry)?;
debug!("schedule_upload: sending path to file uploader...");
self.file_uploader_sender.send(FileUploaderCommand::UploadChange(FileCommand::new(path, metadata))).await?;
self.file_uploader_sender
.send(FileUploaderCommand::UploadChange(FileCommand::new(
path, metadata,
)))
.await?;
debug!("schedule_upload: sent path to file uploader");
Ok(())
}
@@ -141,22 +143,32 @@ impl DriveFilesystem {
}
fn get_drive_id_from_ino(&self, parent: impl Into<Inode>) -> anyhow::Result<&DriveId> {
self.ino_drive_id.get_by_left(&parent.into()).context("could not get drive id for ino")
self.ino_drive_id
.get_by_left(&parent.into())
.context("could not get drive id for ino")
}
fn get_ino_from_drive_id(&self, parent: impl Into<DriveId>) -> anyhow::Result<&Inode> {
self.ino_drive_id.get_by_right(&parent.into()).context("could not get drive id for ino")
self.ino_drive_id
.get_by_right(&parent.into())
.context("could not get drive id for ino")
}
}
// region general
impl DriveFilesystem {
#[instrument(skip(file_uploader_sender))]
pub async fn new(config_path: impl AsRef<Path> + Debug,
file_uploader_sender: tokio::sync::mpsc::Sender<FileUploaderCommand>,
drive: GoogleDrive, cache_dir: PathBuf,
settings: SyncSettings) -> Result<DriveFilesystem> {
pub async fn new(
config_path: impl AsRef<Path> + Debug,
file_uploader_sender: tokio::sync::mpsc::Sender<FileUploaderCommand>,
drive: GoogleDrive,
cache_dir: PathBuf,
settings: SyncSettings,
) -> Result<DriveFilesystem> {
let config_path = config_path.as_ref();
debug!("DriveFilesystem::new(config_path: {})", config_path.display());
debug!(
"DriveFilesystem::new(config_path: {})",
config_path.display()
);
// let upload_filter = CommonFileFilter::from_path(config_path)?;
let mut entries = HashMap::new();
Self::add_root_entry(&mut entries);
@@ -203,13 +215,7 @@ impl DriveFilesystem {
let inode = Inode::from(FUSE_ROOT_ID);
entries.insert(
DriveId::root(),
DriveEntry::new(
inode,
"root".to_string(),
DriveId::root(),
root_attr,
None,
),
DriveEntry::new(inode, "root".to_string(), DriveId::root(), root_attr, None),
);
}
#[instrument(fields(% self, inode))]
@@ -222,7 +228,8 @@ impl DriveFilesystem {
cache_dir.display()
);
let entry = self
.entries.get(&inode)
.entries
.get(&inode)
.ok_or(anyhow!("could not get entry"))?;
debug!(
"get_cache_dir_for_file: entry local_path: {:?}",
@@ -239,9 +246,9 @@ impl DriveFilesystem {
path.join(match entry.local_path.as_ref() {
Some(x) => match x.parent() {
Some(parent) => parent.to_path_buf(),
None => PathBuf::new()
None => PathBuf::new(),
},
None => PathBuf::new()
None => PathBuf::new(),
})
}
#[instrument(fields(% self))]
@@ -251,7 +258,8 @@ impl DriveFilesystem {
let mut entries = HashMap::new();
self.children = HashMap::new();
self.ino_drive_id = BiMap::new();
self.ino_drive_id.insert(Inode::from(FUSE_ROOT_ID), DriveId::root());
self.ino_drive_id
.insert(Inode::from(FUSE_ROOT_ID), DriveId::root());
let alternative_rood_id = self
.source
.get_metadata_for_file(DriveId::root())
@@ -266,12 +274,19 @@ impl DriveFilesystem {
let entry = self.create_entry_from_drive_metadata(&metadata, inode);
if let Ok(entry) = entry {
let inode = entry.ino.clone();
debug!("add_all_file_entries: adding entry: ({}) {:?}", inode, entry);
debug!(
"add_all_file_entries: adding entry: ({}) {:?}",
inode, entry
);
let drive_id = entry.drive_id.clone();
entries.insert(drive_id.clone(), entry);
self.ino_drive_id.insert(inode, drive_id.clone());
if let Some(parents) = metadata.parents {
debug!("drive_id: {:<40} has parents: {}", drive_id.to_string(), parents.len());
debug!(
"drive_id: {:<40} has parents: {}",
drive_id.to_string(),
parents.len()
);
let parents = parents.iter().map(|p| DriveId::from(p));
for parent in parents {
if parent.to_string() == alternative_rood_id.to_string() {
@@ -282,31 +297,54 @@ impl DriveFilesystem {
self.add_child(drive_id.clone(), &parent);
}
} else {
debug!("drive_id: {:<40} does not have parents", drive_id.to_string());
debug!(
"drive_id: {:<40} does not have parents",
drive_id.to_string()
);
//does not belong to any folder, add to root
self.add_child(drive_id, &DriveId::root());
}
} else {
warn!("add_all_file_entries: could not create entry! err: {:?} metadata:{:?}", entry, metadata);
warn!(
"add_all_file_entries: could not create entry! err: {:?} metadata:{:?}",
entry, metadata
);
}
}
debug!("add_all_file_entries: entries: new len: {} old len: {}", entries.len(), old_len);
debug!(
"add_all_file_entries: entries: new len: {} old len: {}",
entries.len(),
old_len
);
self.entries = entries;
debug!("build all local paths");
self.get_entry_mut(&DriveId::root()).expect("The root entry has to exist by now").build_local_path(None);
self.get_entry_mut(&DriveId::root())
.expect("The root entry has to exist by now")
.build_local_path(None);
self.build_path_for_children(&DriveId::root());
Ok(())
}
#[instrument(skip(self))]
fn build_path_for_children(&mut self, parent_id: &DriveId) {
let parent = self.entries.get(parent_id).expect("parent entry has to exist");
debug!("build_path_for_children: parent: {:<40} => {:?}", parent.drive_id.to_string(), parent.name);
let parent = self
.entries
.get(parent_id)
.expect("parent entry has to exist");
debug!(
"build_path_for_children: parent: {:<40} => {:?}",
parent.drive_id.to_string(),
parent.name
);
if let Some(child_list) = self.children.get(&parent_id) {
debug!("build_path_for_children: ({}) child_list: {:?}", child_list.len(), child_list);
debug!(
"build_path_for_children: ({}) child_list: {:?}",
child_list.len(),
child_list
);
for child_id in child_list.clone() {
let parent: Option<LocalPath> = match self.entries.get(parent_id) {
Some(e) => e.local_path.clone(),
None => None
None => None,
};
let child = self.entries.get_mut(&child_id);
if let Some(child) = child {
@@ -314,9 +352,12 @@ impl DriveFilesystem {
} else {
warn!("add_all_file_entries: could not find child entry!");
}
debug!("build_path_for_children: child: {:?} parent: {:?}", child_id, parent_id);
debug!(
"build_path_for_children: child: {:?} parent: {:?}",
child_id, parent_id
);
self.build_path_for_children(&child_id);
};
}
}
}
@@ -324,10 +365,16 @@ impl DriveFilesystem {
fn add_child(&mut self, drive_id: DriveId, parent: &DriveId) {
let existing_child_list = self.children.get_mut(&parent);
if let Some(existing_child_list) = existing_child_list {
debug!("add_child: adding child: {:?} to parent: {:?}", drive_id, parent);
debug!(
"add_child: adding child: {:?} to parent: {:?}",
drive_id, parent
);
existing_child_list.push(drive_id);
} else {
debug!("add_child: adding child: {:?} to parent: {:?} (new)", drive_id, parent);
debug!(
"add_child: adding child: {:?} to parent: {:?} (new)",
drive_id, parent
);
let set = vec![drive_id];
self.children.insert(parent.clone(), set);
}
@@ -371,10 +418,9 @@ impl DriveFilesystem {
}
let ino = inode;
let id = DriveId::from(metadata.id.as_ref().context("could not get id")?);
let mime_type = metadata
.mime_type
.as_ref()
.context("could not determine if this is a file or a folder since the mime type was empty")?;
let mime_type = metadata.mime_type.as_ref().context(
"could not determine if this is a file or a folder since the mime type was empty",
)?;
let kind = match mime_type.as_str() {
"application/vnd.google-apps.document"
| "application/vnd.google-apps.spreadsheet"
@@ -396,10 +442,19 @@ impl DriveFilesystem {
ino: ino.into(),
size: Self::get_size_from_drive_metadata(metadata).unwrap_or(0),
blocks: 0,
atime: metadata.viewed_by_me_time.map(SystemTime::from).unwrap_or(UNIX_EPOCH),
mtime: metadata.modified_time.map(SystemTime::from).unwrap_or(UNIX_EPOCH),
atime: metadata
.viewed_by_me_time
.map(SystemTime::from)
.unwrap_or(UNIX_EPOCH),
mtime: metadata
.modified_time
.map(SystemTime::from)
.unwrap_or(UNIX_EPOCH),
ctime: SystemTime::now(),
crtime: metadata.created_time.map(SystemTime::from).unwrap_or(UNIX_EPOCH),
crtime: metadata
.created_time
.map(SystemTime::from)
.unwrap_or(UNIX_EPOCH),
kind,
perm: permissions,
nlink: 1,
@@ -434,8 +489,9 @@ impl DriveFilesystem {
let drive_id = entry.drive_id.clone();
let drive = &self.source;
let cache_path = self.get_cache_path_for_entry(&entry)?;
let folder = cache_path.parent()
.ok_or(anyhow!("could not get the folder the cache file should be saved in"))?;
let folder = cache_path.parent().ok_or(anyhow!(
"could not get the folder the cache file should be saved in"
))?;
if !folder.exists() {
debug!("creating folder: {}", folder.display());
std::fs::create_dir_all(folder)?;
@@ -469,7 +525,10 @@ impl DriveFilesystem {
let entry = self.entries.get_mut(drive_id);
if let Some(entry) = entry {
debug!("updating entry metadata: {}, {:?} entry: {:?}",entry.ino, entry.md5_checksum, entry);
debug!(
"updating entry metadata: {}, {:?} entry: {:?}",
entry.ino, entry.md5_checksum, entry
);
let change_successful = Self::update_entry_metadata(file, entry);
if let Err(e) = change_successful {
warn!("got an err while update entry metadata: {}", e);
@@ -478,7 +537,6 @@ impl DriveFilesystem {
}
}
updated_entries.push(change.drive_id);
debug!("processed change");
continue;
@@ -530,7 +588,11 @@ impl DriveFilesystem {
Ok(path)
}
fn construct_cache_path_for_entry(cache_dir: &Path, entry: &DriveEntry) -> PathBuf {
debug!("construct_cache_path_for_entry: {} with cache_dir: {}", entry.ino, cache_dir.display());
debug!(
"construct_cache_path_for_entry: {} with cache_dir: {}",
entry.ino,
cache_dir.display()
);
let path = Self::construct_cache_folder_path(cache_dir, entry).join(&entry.name);
debug!(
"get_cache_path_for_entry: {}: {}",
@@ -540,7 +602,11 @@ impl DriveFilesystem {
path
}
fn set_entry_metadata_with_ino(&mut self, ino: impl Into<DriveId>, drive_metadata: File) -> anyhow::Result<()> {
fn set_entry_metadata_with_ino(
&mut self,
ino: impl Into<DriveId>,
drive_metadata: File,
) -> anyhow::Result<()> {
let entry = self.get_entry_mut(ino).context("no entry with ino")?;
Self::update_entry_metadata(drive_metadata, entry)
@@ -567,26 +633,39 @@ impl DriveFilesystem {
let checksum_mismatch = Self::compare_checksums(&drive_metadata.md5_checksum, &entry);
match checksum_mismatch {
ChecksumMatch::Missing | ChecksumMatch::Unknown | ChecksumMatch::RemoteMismatch => {
debug!("md5_checksum mismatch: {:?} != {:?}", drive_metadata.md5_checksum, entry.md5_checksum);
debug!(
"md5_checksum mismatch: {:?} != {:?}",
drive_metadata.md5_checksum, entry.md5_checksum
);
entry.set_md5_checksum(drive_metadata.md5_checksum);
entry.has_upstream_content_changes = true;
debug!("updated md5_checksum of {} to: {:?}", entry.ino, &entry.md5_checksum);
debug!(
"updated md5_checksum of {} to: {:?}",
entry.ino, &entry.md5_checksum
);
}
ChecksumMatch::Match => {
debug!("md5_checksum match: {:?} == {:?}", drive_metadata.md5_checksum, &entry.md5_checksum);
debug!(
"md5_checksum match: {:?} == {:?}",
drive_metadata.md5_checksum, &entry.md5_checksum
);
entry.has_upstream_content_changes = false;
}
ChecksumMatch::CacheMismatch => {
debug!("the local checksum and the remote checksum match,\
so we can assume the local changes have just been uploaded to the remote");
debug!(
"the local checksum and the remote checksum match,\
so we can assume the local changes have just been uploaded to the remote"
);
entry.has_upstream_content_changes = false;
}
ChecksumMatch::LocalMismatch => {
debug!("the local checksum does not match the remote or the cached \
checksum, this means the local file has been modified");
debug!(
"the local checksum does not match the remote or the cached \
checksum, this means the local file has been modified"
);
entry.has_upstream_content_changes = false;
}
@@ -624,8 +703,14 @@ impl DriveFilesystem {
debug!("md5_checksum match: (r) == (l) == (c): {:?} ", md5_checksum);
return ChecksumMatch::Match;
}
if md5_checksum != &entry.local_md5_checksum && md5_checksum != &entry.md5_checksum && entry.local_md5_checksum != entry.md5_checksum {
debug!("md5_checksum match: {:?} (r) != {:?} (l) != {:?} (c)", md5_checksum, entry.local_md5_checksum, entry.md5_checksum);
if md5_checksum != &entry.local_md5_checksum
&& md5_checksum != &entry.md5_checksum
&& entry.local_md5_checksum != entry.md5_checksum
{
debug!(
"md5_checksum match: {:?} (r) != {:?} (l) != {:?} (c)",
md5_checksum, entry.local_md5_checksum, entry.md5_checksum
);
return ChecksumMatch::Conflict;
}
@@ -638,7 +723,10 @@ impl DriveFilesystem {
return ChecksumMatch::CacheMismatch;
}
if &entry.local_md5_checksum == &entry.md5_checksum {
debug!("md5_checksum match: (l) == (c): {:?} ", entry.local_md5_checksum);
debug!(
"md5_checksum match: (l) == (c): {:?} ",
entry.local_md5_checksum
);
return ChecksumMatch::RemoteMismatch;
}
warn!("how could I get here?");
@@ -676,7 +764,10 @@ impl DriveFilesystem {
.collect();
self.last_checked_changes = SystemTime::now();
debug!("checked for changes, found {} changes", changes.as_ref().unwrap_or(&Vec::<Change>::new()).len());
debug!(
"checked for changes, found {} changes",
changes.as_ref().unwrap_or(&Vec::<Change>::new()).len()
);
changes
}
fn remove_entry(&mut self, id: &DriveId) -> anyhow::Result<()> {
@@ -761,7 +852,8 @@ impl Filesystem for DriveFilesystem {
#[instrument(fields(% self))]
fn destroy(&mut self) {
debug!("destroy");
let stop_res = run_async_blocking(self.file_uploader_sender.send(FileUploaderCommand::Stop));
let stop_res =
run_async_blocking(self.file_uploader_sender.send(FileUploaderCommand::Stop));
if let Err(e) = stop_res {
error!("could not send stop command to file uploader: {}", e);
}
@@ -780,14 +872,20 @@ impl Filesystem for DriveFilesystem {
let parent = parent.into();
let parent_drive_id = self.get_drive_id_from_ino(&parent);
if parent_drive_id.is_err() {
warn!("lookup: could not get drive_id for {}: {:?}", parent, parent_drive_id);
warn!(
"lookup: could not get drive_id for {}: {:?}",
parent, parent_drive_id
);
reply.error(libc::ENOENT);
return;
}
let parent_drive_id = parent_drive_id.unwrap();
let children = self.children.get(&parent_drive_id);
if children.is_none() {
warn!("lookup: could not find children for {}: {}", parent, parent_drive_id);
warn!(
"lookup: could not find children for {}: {}",
parent, parent_drive_id
);
for (id, entry) in self.entries.iter() {
debug!("entry: {:<40} => {:?}", id.to_string(), entry);
}
@@ -808,7 +906,7 @@ impl Filesystem for DriveFilesystem {
let accepted = name.eq_ignore_ascii_case(&path);
debug!(
"entry: {}:(accepted={}),{:?}; {:?}; {:?}",
child_inode, accepted, entry.md5_checksum ,path, entry.attr
child_inode, accepted, entry.md5_checksum, path, entry.attr
);
if accepted {
reply.entry(&self.settings.time_to_live(), &entry.attr, self.generation);
@@ -994,28 +1092,31 @@ impl Filesystem for DriveFilesystem {
let mut buf = vec![0; size as usize];
debug!("reading file: {:?} at {} with size {}", &path, offset, size);
file.read_at(&mut buf, offset as u64).unwrap();
file.read_exact_at(&mut buf, offset as u64).unwrap();
debug!("read file: {:?} at {}", &path, offset);
reply.data(&buf);
}
//endregion
//region write
#[instrument(skip(_req, reply), fields(% self, data = data.len()))]
fn write(&mut self,
_req: &Request<'_>,
ino: u64,
fh: u64,
offset: i64,
data: &[u8],
write_flags: u32,
flags: i32,
lock_owner: Option<u64>,
reply: ReplyWrite) {
fn write(
&mut self,
_req: &Request<'_>,
ino: u64,
fh: u64,
offset: i64,
data: &[u8],
write_flags: u32,
flags: i32,
lock_owner: Option<u64>,
reply: ReplyWrite,
) {
debug!(
"write: {}:{}:{}:{:#x?}:{:?}:{:#x?}:{:?}",
ino, fh, offset, flags, lock_owner, write_flags, data,
);
let cache_update_success: Result<bool> = run_async_blocking(self.update_cache_if_needed(ino));
let cache_update_success: Result<bool> =
run_async_blocking(self.update_cache_if_needed(ino));
match cache_update_success {
Err(e) => {
error!("write: could not update cache: {}", e);
@@ -1031,9 +1132,7 @@ impl Filesystem for DriveFilesystem {
}
}
}
let cache_dir = self.cache_dir
.as_ref()
.map(|s| s.to_path_buf());
let cache_dir = self.cache_dir.as_ref().map(|s| s.to_path_buf());
if let None = cache_dir {
error!("write: cache dir not set");
reply.error(libc::ENOENT);
@@ -1074,18 +1173,25 @@ impl Filesystem for DriveFilesystem {
}
let mut file = file.unwrap();
debug!("writing file: {:?} at {} with size {}",
debug!(
"writing file: {:?} at {} with size {}",
&path,
offset,
data.len()
);
);
file.seek(SeekFrom::Start(offset as u64)).unwrap();
file.write_all(data).unwrap();
let size = data.len();
debug!("wrote file: {:?} at {}; wrote {} bytes", &path, offset, size);
debug!(
"wrote file: {:?} at {}; wrote {} bytes",
&path, offset, size
);
reply.written(size as u32);
debug!("updating size to {} for entry: {:?}", entry.attr.size, entry);
debug!(
"updating size to {} for entry: {:?}",
entry.attr.size, entry
);
let mut attr = &mut entry.attr;
if truncate {
attr.size = size as u64;
@@ -1095,9 +1201,15 @@ impl Filesystem for DriveFilesystem {
let now = SystemTime::now();
attr.mtime = now;
attr.ctime = now;
debug!("updated size to {} for entry: {:?}", entry.attr.size, entry);
debug!(
"updated size to {} for entry: {:?}",
entry.attr.size, entry
);
entry.local_md5_checksum = Self::compute_md5_checksum(&path);
debug!("updated local md5 to {:?} for entry: {:?}", entry.local_md5_checksum, entry);
debug!(
"updated local md5 to {:?} for entry: {:?}",
entry.local_md5_checksum, entry
);
debug!("write done for entry: {:?}", entry);
}
@@ -1107,8 +1219,9 @@ impl Filesystem for DriveFilesystem {
return;
}
let drive_id = drive_id.unwrap();
let entry = self.get_entry_r(drive_id)
.expect("how could this happen to me. I swear it was there a second ago");
let entry = self
.get_entry_r(drive_id)
.expect("how could this happen to me. I swear it was there a second ago");
let schedule_res = run_async_blocking(self.schedule_upload(&entry));
if let Err(e) = schedule_res {
error!("read: could not schedule the upload: {}", e);
@@ -1194,7 +1307,6 @@ impl Filesystem for DriveFilesystem {
}
//endregion
//TODOs:
// TODO: implement rename/move
// TODO: implement create

View File

@@ -4,8 +4,8 @@ pub use file_uploader::*;
pub use filesystem::*;
pub use settings::*;
mod change;
mod entry;
mod filesystem;
mod file_uploader;
mod filesystem;
mod settings;
mod change;

View File

@@ -43,8 +43,11 @@ impl SyncSettings {
// endregion
impl Display for SyncSettings {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "SyncSettings {{ ttl: {}s, cache_time: {}s }}",
self.time_to_live.as_secs(),
self.cache_time.as_secs())
write!(
f,
"SyncSettings {{ ttl: {}s, cache_time: {}s }}",
self.time_to_live.as_secs(),
self.cache_time.as_secs()
)
}
}

View File

@@ -5,17 +5,17 @@ use std::path::{Path, PathBuf};
// use drive3::api::Scope::File;
use anyhow::{anyhow, Context};
use drive3::{hyper_rustls, oauth2};
use drive3::api::{Change, File, Scope, StartPageToken};
use drive3::DriveHub;
use drive3::hyper::{Body, Response};
use drive3::hyper::client::HttpConnector;
use drive3::hyper::{Body, Response};
use drive3::hyper_rustls::HttpsConnector;
use drive3::DriveHub;
use drive3::{hyper_rustls, oauth2};
use hyper::Client;
use tokio::fs;
use tracing::{debug, error, instrument, trace, warn};
use crate::google_drive::{DriveId, helpers};
use crate::google_drive::{helpers, DriveId};
use crate::prelude::*;
const FIELDS_FILE: &str = "id, name, size, mimeType, kind, md5Checksum, parents,trashed, createdTime, modifiedTime, viewedByMeTime";
@@ -32,21 +32,15 @@ impl GoogleDrive {
let mut page_token: Option<String> = None;
loop {
debug!("list_files: page_token: {:?}", page_token);
let mut request =
self
.hub
.files()
.list()
.param(
"fields",
&format!("nextPageToken, files({})", FIELDS_FILE),
);
let mut request = self
.hub
.files()
.list()
.param("fields", &format!("nextPageToken, files({})", FIELDS_FILE));
if let Some(page_token) = page_token {
request = request.page_token(&page_token);
}
let (_response, result) = request
.doit()
.await?;
let (_response, result) = request.doit().await?;
let result_files = result.files.ok_or(anyhow!("no file list returned"))?;
debug!("list_files: response: {:?}", result_files.len());
files.extend(result_files);
@@ -62,35 +56,47 @@ impl GoogleDrive {
impl GoogleDrive {
#[instrument]
pub(crate) async fn get_start_page_token(&self) -> anyhow::Result<StartPageToken> {
let (_response, start_page_token) = self.hub.changes().get_start_page_token().doit().await?;
let (_response, start_page_token) =
self.hub.changes().get_start_page_token().doit().await?;
Ok(start_page_token)
}
}
impl GoogleDrive {
#[instrument]
pub(crate) async fn get_changes_since(&self, start_page_token: &mut StartPageToken) -> anyhow::Result<Vec<Change>> {
pub(crate) async fn get_changes_since(
&self,
start_page_token: &mut StartPageToken,
) -> anyhow::Result<Vec<Change>> {
let mut changes = vec![];
let mut page_token: Option<String> = None;
loop {
debug!("getting changes since {:?} page: {:?}", start_page_token, page_token);
debug!(
"getting changes since {:?} page: {:?}",
start_page_token, page_token
);
let file_spec = &format!("file({})", FIELDS_FILE);
let mut request = self
.hub
.changes()
.list(&start_page_token
.start_page_token
.as_ref()
.context("no start_page_token")?)
.param("fields", &format!("changes({}, changeType, removed, fileId, \
driveId, drive, time), newStartPageToken, nextPageToken", file_spec));
.list(
&start_page_token
.start_page_token
.as_ref()
.context("no start_page_token")?,
)
.param(
"fields",
&format!(
"changes({}, changeType, removed, fileId, \
driveId, drive, time), newStartPageToken, nextPageToken",
file_spec
),
);
if let Some(page_token) = &page_token {
request = request.page_token(page_token);
}
let response = request
.doit()
.await
.context("could not get changes");
let response = request.doit().await.context("could not get changes");
if let Err(e) = &response {
error!("error getting changes: {:?}", e);
return Err(anyhow!("error getting changes: {:?}", e));
@@ -122,7 +128,8 @@ impl GoogleDrive {
.files()
.get(&drive_id)
.param("fields", &FIELDS_FILE)
.doit().await?;
.doit()
.await?;
Ok(file)
}
@@ -130,7 +137,11 @@ impl GoogleDrive {
impl GoogleDrive {
#[instrument(skip(file), fields(file_name = file.name, file_id = file.drive_id))]
pub async fn upload_file_content_from_path(&self, file: File, path: &Path) -> anyhow::Result<()> {
pub async fn upload_file_content_from_path(
&self,
file: File,
path: &Path,
) -> anyhow::Result<()> {
update_file_content_on_drive_from_path(&self, file, path).await?;
Ok(())
}
@@ -169,7 +180,7 @@ impl GoogleDrive {
Some(parent_drive_id) => parent_drive_id,
None => DriveId::from("root"),
}
.into();
.into();
let parent_drive_id = match parent_drive_id.into_string() {
Ok(parent_drive_id) => parent_drive_id,
Err(_) => return Err(anyhow!("invalid parent_drive_id")),
@@ -226,9 +237,9 @@ impl GoogleDrive {
auth,
oauth2::InstalledFlowReturnMethod::HTTPRedirect,
)
.persist_tokens_to_disk("auth/tokens.json")
.build()
.await?;
.persist_tokens_to_disk("auth/tokens.json")
.build()
.await?;
let http_client = Client::builder().build(
hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()
@@ -264,10 +275,7 @@ impl GoogleDrive {
.hub
.files()
.list()
.param(
"fields",
&format!("nextPageToken, files({})", FIELDS_FILE),
)
.param("fields", &format!("nextPageToken, files({})", FIELDS_FILE))
// .page_token(page_token.as_ref().map(String::as_str))
.q(format!("'{}' in parents", folder_id).as_str())
.doit()
@@ -465,7 +473,10 @@ pub async fn update_file_content_on_drive_from_path(
file: google_drive3::api::File,
source_path: &Path,
) -> anyhow::Result<()> {
debug!("update_file_content_on_drive_from_path(): source_path: {:?}", source_path);
debug!(
"update_file_content_on_drive_from_path(): source_path: {:?}",
source_path
);
// {
// debug!("reading content from file for testing");
// let content = std::fs::File::open(source_path)?;
@@ -487,7 +498,10 @@ async fn update_file_content_on_drive(
) -> anyhow::Result<()> {
let stream = content.into_std().await;
let mime_type = helpers::get_mime_from_file_metadata(&file)?;
let id = file.drive_id.clone().with_context(|| "file metadata has no drive id")?;
let id = file
.drive_id
.clone()
.with_context(|| "file metadata has no drive id")?;
debug!("starting upload");
let (response, file) = drive
.hub

View File

@@ -24,7 +24,10 @@ impl TryFrom<OsString> for DriveId {
fn try_from(value: OsString) -> anyhow::Result<Self> {
let result = value.into_string();
if let Err(e) = result {
return Err(anyhow::anyhow!("Failed to convert OsString to String: {:?}", e));
return Err(anyhow::anyhow!(
"Failed to convert OsString to String: {:?}",
e
));
}
Ok(DriveId::new(result.unwrap()))
}

View File

@@ -7,4 +7,3 @@ mod helpers;
mod drive;
mod drive_id;

View File

@@ -2,7 +2,6 @@
extern crate google_drive3 as drive3;
use std::path::Path;
use std::time::Duration;
@@ -19,15 +18,15 @@ use tracing::{debug, info};
use prelude::*;
use crate::config::common_file_filter::CommonFileFilter;
use crate::fs::drive::{DriveFilesystem, DriveFileUploader, FileUploaderCommand, SyncSettings};
use crate::fs::drive::{DriveFileUploader, DriveFilesystem, FileUploaderCommand, SyncSettings};
use crate::google_drive::GoogleDrive;
pub mod async_helper;
pub mod common;
pub mod config;
pub mod fs;
pub mod google_drive;
pub mod prelude;
pub mod config;
pub async fn sample_drive_fs() -> Result<()> {
let mountpoint = "/tmp/fuse/3";
@@ -41,10 +40,12 @@ pub async fn sample_drive_fs() -> Result<()> {
let drive = GoogleDrive::new().await?;
// let file_uploader = FileUploader::new("config/credentials.json", "config/token.json");
let (file_uploader_sender, file_uploader_receiver) = mpsc::channel(1);
let mut file_uploader = DriveFileUploader::new(drive.clone(),
upload_ignore,
file_uploader_receiver,
Duration::from_secs(3));
let mut file_uploader = DriveFileUploader::new(
drive.clone(),
upload_ignore,
file_uploader_receiver,
Duration::from_secs(3),
);
debug!("Mounting fuse filesystem at {}", mountpoint);
let fs = DriveFilesystem::new(
Path::new(""),
@@ -52,12 +53,16 @@ pub async fn sample_drive_fs() -> Result<()> {
drive,
cache_dir.into_path(),
sync_settings,
).await?;
)
.await?;
let mount_options = vec![MountOption::RW];
let uploader_handle: JoinHandle<()> = tokio::spawn(async move { file_uploader.listen().await; });
let end_signal_handle: JoinHandle<()> = mount(fs, &mountpoint, &mount_options, file_uploader_sender).await?;
let uploader_handle: JoinHandle<()> = tokio::spawn(async move {
file_uploader.listen().await;
});
let end_signal_handle: JoinHandle<()> =
mount(fs, &mountpoint, &mount_options, file_uploader_sender).await?;
tokio::try_join!(uploader_handle, end_signal_handle)?;
debug!("Exiting gracefully...");
@@ -76,25 +81,31 @@ fn get_cache_dir() -> Result<TempDir> {
Ok(cache_dir)
}
async fn mount(fs: DriveFilesystem,
mountpoint: &str,
options: &[MountOption],
sender: Sender<FileUploaderCommand>) -> Result<JoinHandle<()>> {
async fn mount(
fs: DriveFilesystem,
mountpoint: &str,
options: &[MountOption],
sender: Sender<FileUploaderCommand>,
) -> Result<JoinHandle<()>> {
let mut session = Session::new(fs, mountpoint.as_ref(), options)?;
let session_ender = session.unmount_callable();
let end_program_signal_handle = tokio::spawn(async move {
let _ = end_program_signal_awaiter(sender, session_ender).await;
});
debug!("Mounting fuse filesystem" );
debug!("Mounting fuse filesystem");
let _ = session.run();
debug!("Stopped with mounting");
// Ok(session_ender)
Ok(end_program_signal_handle)
}
async fn end_program_signal_awaiter(file_uploader_sender: Sender<FileUploaderCommand>,
mut session_unmounter: SessionUnmounter) -> Result<()> {
tokio::signal::ctrl_c().await.expect("failed to listen for ctrl_c event");
async fn end_program_signal_awaiter(
file_uploader_sender: Sender<FileUploaderCommand>,
mut session_unmounter: SessionUnmounter,
) -> Result<()> {
tokio::signal::ctrl_c()
.await
.expect("failed to listen for ctrl_c event");
info!("got signal to end program");
file_uploader_sender.send(FileUploaderCommand::Stop).await?;

View File

@@ -44,7 +44,10 @@ async fn sample_logging() {
use tracing::{debug, error, info, trace, warn};
info!("info");
debug!("debug");
let s = span!(tracing::Level::TRACE, "span around trace and warn with stdin read");
let s = span!(
tracing::Level::TRACE,
"span around trace and warn with stdin read"
);
{
let _x = s.enter();
trace!("trace");

View File

@@ -1,3 +1,3 @@
pub use anyhow::Result;
pub use drive3::api::File as DriveFileMetadata;
pub use drive3::api::File as DriveFileMetadata;