diff --git a/src/fs/common.rs b/src/fs/common.rs index f90e4aa..c4fbd02 100644 --- a/src/fs/common.rs +++ b/src/fs/common.rs @@ -114,6 +114,9 @@ pub trait CommonFilesystem { fn get_entry(&self, ino: impl Into) -> Option<&Entry> { self.get_entries().get(&ino.into()) } + fn get_entry_mut(&mut self, ino: impl Into) -> Option<&mut Entry> { + self.get_entries_mut().get_mut(&ino.into()) + } fn get_entry_r(&self, ino: impl Into) -> Result<&Entry> { self.get_entries() .get(&ino.into()) diff --git a/src/fs/drive/mod.rs b/src/fs/drive/mod.rs index 689954c..7a44072 100644 --- a/src/fs/drive/mod.rs +++ b/src/fs/drive/mod.rs @@ -28,6 +28,7 @@ use std::{ path::{Path, PathBuf}, time::{Duration, SystemTime, UNIX_EPOCH}, }; +use std::io::{Seek, SeekFrom, Write}; use tempfile::TempDir; use tokio::{ io::{stdin, AsyncBufReadExt}, @@ -132,21 +133,26 @@ impl DriveFilesystem { cache_dir.path().display() ); let entry = self - .entries - .get(&inode) + .get_entry(inode) .ok_or(anyhow!("could not get entry"))?; debug!( "get_cache_dir_for_file: entry local_path: {}", entry.local_path.display() ); + let path = Self::construct_cache_folder_path(cache_dir.path(), entry); + debug!("get_cache_dir_for_file: {}: {}", inode, path.display()); + Ok(path) + } + + fn construct_cache_folder_path(cache_dir: &Path, entry: &DriveEntry) -> PathBuf { let folder_path = match entry.local_path.parent() { Some(p) => p.as_os_str(), None => OsStr::new(""), }; - debug!("get_cache_dir_for_file: folder_path: {:?}", folder_path); - let path = cache_dir.path().join(folder_path); - debug!("get_cache_dir_for_file: {}: {}", inode, path.display()); - Ok(path) + debug!("construct_cache_folder_path: folder_path: {:?}", folder_path); + let path = cache_dir.join(folder_path); + debug!("construct_cache_folder_path: {}", path.display()); + path } #[async_recursion::async_recursion] async fn add_dir_entry( @@ -300,16 +306,24 @@ impl DriveFilesystem { Ok(exists) } - fn get_cache_path_for_entry(&self, entry: &&DriveEntry) -> Result { + fn get_cache_path_for_entry(&self, entry: &DriveEntry) -> Result { debug!("get_cache_path_for_entry: {}", entry.ino); - let folder = self.get_cache_dir_for_file(entry.ino)?; - let path = folder.join(&entry.name); + let cache_folder = match self.cache_dir.as_ref() { + Some(x) => x.path() , + None => return Err(anyhow!("cache_dir is None").into()), + }; + let path = Self::construct_cache_path_for_entry(&cache_folder, entry); + Ok(path) + } + fn construct_cache_path_for_entry(cache_dir: &Path, entry: &DriveEntry) -> PathBuf { + debug!("construct_cache_path_for_entry: {} with cache_dir: {}", entry.ino, cache_dir.display()); + let path = Self::construct_cache_folder_path(cache_dir, entry).join(&entry.name); debug!( "get_cache_path_for_entry: {}: {}", entry.ino, path.display() ); - Ok(path) + path } } @@ -477,6 +491,144 @@ impl Filesystem for DriveFilesystem { } } //endregion + //region setattr + fn setattr( + &mut self, + _req: &Request<'_>, + ino: u64, + mode: Option, + uid: Option, + gid: Option, + size: Option, + _atime: Option, + _mtime: Option, + _ctime: Option, + /*TODO: check if this change need to be implemented*/ + fh: Option, + _crtime: Option, + /*TODO: check if this change need to be implemented*/ + _chgtime: Option, + /*TODO: check if this change need to be implemented*/ + _bkuptime: Option, + flags: Option, + reply: ReplyAttr, + ) { + // debug!("setattr: {}", ino); + + debug!( + "setattr: {}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}", + ino, + mode, + uid, + gid, + size, + _atime, + _mtime, + _ctime, + fh, + _crtime, + _chgtime, + _bkuptime, + flags + ); + let ttl = self.time_to_live.clone(); + let entry = self.get_entry_mut(ino); + if let None = entry { + error!("setattr: could not find entry for {}", ino); + reply.error(libc::ENOENT); + return; + } + let mut entry = entry.unwrap(); + let attr = &mut entry.attr; + + if let Some(mode) = mode { + attr.perm = mode as u16; + } + if let Some(uid) = uid { + attr.uid = uid; + } + if let Some(gid) = gid { + attr.gid = gid; + } + if let Some(size) = size { + attr.size = size; + } + if let Some(flags) = flags { + attr.flags = flags; + } + reply.attr(&ttl, &attr); + //TODO: update file on drive if necessary + } + //endregion + //region write + fn write(&mut self, _req: &Request<'_>, ino: u64, fh: u64, offset: i64, data: &[u8], write_flags: u32, flags: i32, lock_owner: Option, reply: ReplyWrite) { + debug!( + "write: {}:{}:{}:{:#x?}:{:?}:{:#x?}:{:?}", + ino, fh, offset, flags, lock_owner, write_flags, data, + ); + let cache_dir = self.cache_dir + .as_ref() + .map(|c| c.path().to_path_buf()) + .clone(); + if let None = cache_dir { + error!("write: cache dir not set"); + reply.error(libc::ENOENT); + return; + } + let cache_dir = cache_dir.unwrap(); + let entry = self.get_entry_mut(&ino.into()); + if let None = entry { + error!("write: could not find entry for {}", ino); + reply.error(libc::ENOENT); + return; + } + let mut entry = entry.unwrap(); + //TODO: queue uploads on a separate thread + + let path = Self::construct_cache_path_for_entry(&cache_dir, &entry); + // let path = entry.local_path.to_path_buf(); + debug!("opening file: {:?}", &path); + let file = OpenOptions::new() + .truncate(false) + .create(true) + .write(true) + .open(&path); + if let Err(e) = file { + error!("write: could not open file: {:?}: {}", path, e); + reply.error(libc::ENOENT); + return; + } + let mut file = file.unwrap(); + + debug!("writing file: {:?} at {} with size {}", + &path, + offset, + data.len() + ); + + file.seek(SeekFrom::Start(offset as u64)).unwrap(); + file.write_all(data).unwrap(); + let size = data.len(); + // let size = file.write_at(data, offset as u64); + // if let Err(e) = size { + // error!("write: could not write file: {:?}: {}", path, e); + // reply.error(libc::ENOENT); + // return; + // } + // let size = size.unwrap(); + debug!("wrote file: {:?} at {}; wrote {} bits", &path, offset, size); + reply.written(size as u32); + //TODO: update size in entry if necessary + debug!("updating size to {} for entry: {:?}", entry.attr.size, entry); + let mut attr = &mut entry.attr; + attr.size = attr.size.max(offset as u64 + size as u64); + let now = SystemTime::now(); + attr.mtime = now; + attr.ctime = now; + debug!("updated size to {} for entry: {:?}", entry.attr.size, entry); + debug!("write done for entry: {:?}", entry); + } + //endregion //region read fn read( &mut self, diff --git a/src/fs/sample.rs b/src/fs/sample.rs index 84d7c21..f0d228d 100644 --- a/src/fs/sample.rs +++ b/src/fs/sample.rs @@ -70,6 +70,7 @@ impl CommonEntry for SampleEntry { &self.attr } } + #[derive(Debug, Default)] pub struct SampleFilesystem { /// the point where the filesystem is mounted @@ -89,6 +90,7 @@ pub struct SampleFilesystem { /// when the filesystem is remounted generation: u64, } + impl SampleFilesystem { pub fn new(root: impl AsRef, source: impl AsRef) -> Self { debug!("new: {:?}; {:?}", root.as_ref(), source.as_ref()); @@ -132,6 +134,7 @@ impl SampleFilesystem { } } } + #[async_trait::async_trait] impl CommonFilesystem for SampleFilesystem { fn get_entries(&self) -> &HashMap { @@ -185,6 +188,7 @@ impl CommonFilesystem for SampleFilesystem { Ok(ino) } } + impl SampleFilesystem { async fn add_dir_entry( &mut self, @@ -221,8 +225,7 @@ impl SampleFilesystem { } else if metadata.is_file() { let mode = metadata.mode(); let size = metadata.size(); - //TODO: async call - // self.add_file_entry(ino, name.as_os_str(), mode as u16, size); + self.add_file_entry(ino, name.as_os_str(), mode as u16, size).await; } } } @@ -242,9 +245,13 @@ impl fuser::Filesystem for SampleFilesystem { // self.add_file_entry(1, "hello.txt".as_ref(), 0o644); let source = self.source.clone(); + debug!("init: add_dir_entry"); run_async_blocking(async { + debug!("init: add_dir_entry (async)"); self.add_dir_entry(&source, FUSE_ROOT_ID, true).await; + debug!("init: add_dir_entry done (async)"); }); + debug!("init: add_dir_entry done"); // self.add_dir_entry(&source, FUSE_ROOT_ID, true); Ok(()) } @@ -286,8 +293,7 @@ impl fuser::Filesystem for SampleFilesystem { return; } } - if !(children.is_none()) { - } else { + if !(children.is_none()) {} else { reply.error(libc::ENOENT); return; } @@ -299,7 +305,8 @@ impl fuser::Filesystem for SampleFilesystem { let path: PathBuf = entry.local_path.clone().into(); let attr = entry.attr; let inode = (*child_inode).into(); - offset += 1; // Increment the offset for each processed entry + // Increment the offset for each processed entry + offset += 1; debug!("entry: {}:{:?}; {:?}", inode, path, attr); if !reply.add(inode, offset, attr.kind, path) { break; diff --git a/src/main.rs b/src/main.rs index a18cca7..28fccb6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,21 +1,11 @@ #[tokio::main] async fn main() { drive_syncer::init_logger(); - // use tokio::runtime::Runtime; - - // let rt = Runtime::new().unwrap(); - // let filesystem_runtime = Runtime::new().unwrap(); - // - // let handle = rt.handle(); - // handle.block_on(async { // drive_syncer::sample().await.unwrap(); - // drive_syncer::sample_fs().await.unwrap(); // drive_syncer::google_drive::sample().await.unwrap(); // drive_syncer::watch_file_reading().await.unwrap(); // drive_syncer::sample_nix().await.unwrap(); + + // drive_syncer::sample_fs().await.unwrap(); drive_syncer::sample_drive_fs().await.unwrap(); - // }); - // RUNTIME.block_on(async { - // //test - // }); }