implement writing (only to local cache) & fix some weird behaviour

This commit is contained in:
OMGeeky
2023-05-15 18:29:44 +02:00
parent b600cbb94c
commit baa94b2053
4 changed files with 179 additions and 27 deletions

View File

@@ -114,6 +114,9 @@ pub trait CommonFilesystem<Entry: CommonEntry> {
fn get_entry(&self, ino: impl Into<Inode>) -> Option<&Entry> {
self.get_entries().get(&ino.into())
}
fn get_entry_mut(&mut self, ino: impl Into<Inode>) -> Option<&mut Entry> {
self.get_entries_mut().get_mut(&ino.into())
}
fn get_entry_r(&self, ino: impl Into<Inode>) -> Result<&Entry> {
self.get_entries()
.get(&ino.into())

View File

@@ -28,6 +28,7 @@ use std::{
path::{Path, PathBuf},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use std::io::{Seek, SeekFrom, Write};
use tempfile::TempDir;
use tokio::{
io::{stdin, AsyncBufReadExt},
@@ -132,21 +133,26 @@ impl DriveFilesystem {
cache_dir.path().display()
);
let entry = self
.entries
.get(&inode)
.get_entry(inode)
.ok_or(anyhow!("could not get entry"))?;
debug!(
"get_cache_dir_for_file: entry local_path: {}",
entry.local_path.display()
);
let path = Self::construct_cache_folder_path(cache_dir.path(), entry);
debug!("get_cache_dir_for_file: {}: {}", inode, path.display());
Ok(path)
}
fn construct_cache_folder_path(cache_dir: &Path, entry: &DriveEntry) -> PathBuf {
let folder_path = match entry.local_path.parent() {
Some(p) => p.as_os_str(),
None => OsStr::new(""),
};
debug!("get_cache_dir_for_file: folder_path: {:?}", folder_path);
let path = cache_dir.path().join(folder_path);
debug!("get_cache_dir_for_file: {}: {}", inode, path.display());
Ok(path)
debug!("construct_cache_folder_path: folder_path: {:?}", folder_path);
let path = cache_dir.join(folder_path);
debug!("construct_cache_folder_path: {}", path.display());
path
}
#[async_recursion::async_recursion]
async fn add_dir_entry(
@@ -300,16 +306,24 @@ impl DriveFilesystem {
Ok(exists)
}
fn get_cache_path_for_entry(&self, entry: &&DriveEntry) -> Result<PathBuf> {
fn get_cache_path_for_entry(&self, entry: &DriveEntry) -> Result<PathBuf> {
debug!("get_cache_path_for_entry: {}", entry.ino);
let folder = self.get_cache_dir_for_file(entry.ino)?;
let path = folder.join(&entry.name);
let cache_folder = match self.cache_dir.as_ref() {
Some(x) => x.path() ,
None => return Err(anyhow!("cache_dir is None").into()),
};
let path = Self::construct_cache_path_for_entry(&cache_folder, entry);
Ok(path)
}
fn construct_cache_path_for_entry(cache_dir: &Path, entry: &DriveEntry) -> PathBuf {
debug!("construct_cache_path_for_entry: {} with cache_dir: {}", entry.ino, cache_dir.display());
let path = Self::construct_cache_folder_path(cache_dir, entry).join(&entry.name);
debug!(
"get_cache_path_for_entry: {}: {}",
entry.ino,
path.display()
);
Ok(path)
path
}
}
@@ -477,6 +491,144 @@ impl Filesystem for DriveFilesystem {
}
}
//endregion
//region setattr
fn setattr(
&mut self,
_req: &Request<'_>,
ino: u64,
mode: Option<u32>,
uid: Option<u32>,
gid: Option<u32>,
size: Option<u64>,
_atime: Option<TimeOrNow>,
_mtime: Option<TimeOrNow>,
_ctime: Option<SystemTime>,
/*TODO: check if this change need to be implemented*/
fh: Option<u64>,
_crtime: Option<SystemTime>,
/*TODO: check if this change need to be implemented*/
_chgtime: Option<SystemTime>,
/*TODO: check if this change need to be implemented*/
_bkuptime: Option<SystemTime>,
flags: Option<u32>,
reply: ReplyAttr,
) {
// debug!("setattr: {}", ino);
debug!(
"setattr: {}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}:{:?}",
ino,
mode,
uid,
gid,
size,
_atime,
_mtime,
_ctime,
fh,
_crtime,
_chgtime,
_bkuptime,
flags
);
let ttl = self.time_to_live.clone();
let entry = self.get_entry_mut(ino);
if let None = entry {
error!("setattr: could not find entry for {}", ino);
reply.error(libc::ENOENT);
return;
}
let mut entry = entry.unwrap();
let attr = &mut entry.attr;
if let Some(mode) = mode {
attr.perm = mode as u16;
}
if let Some(uid) = uid {
attr.uid = uid;
}
if let Some(gid) = gid {
attr.gid = gid;
}
if let Some(size) = size {
attr.size = size;
}
if let Some(flags) = flags {
attr.flags = flags;
}
reply.attr(&ttl, &attr);
//TODO: update file on drive if necessary
}
//endregion
//region write
fn write(&mut self, _req: &Request<'_>, ino: u64, fh: u64, offset: i64, data: &[u8], write_flags: u32, flags: i32, lock_owner: Option<u64>, reply: ReplyWrite) {
debug!(
"write: {}:{}:{}:{:#x?}:{:?}:{:#x?}:{:?}",
ino, fh, offset, flags, lock_owner, write_flags, data,
);
let cache_dir = self.cache_dir
.as_ref()
.map(|c| c.path().to_path_buf())
.clone();
if let None = cache_dir {
error!("write: cache dir not set");
reply.error(libc::ENOENT);
return;
}
let cache_dir = cache_dir.unwrap();
let entry = self.get_entry_mut(&ino.into());
if let None = entry {
error!("write: could not find entry for {}", ino);
reply.error(libc::ENOENT);
return;
}
let mut entry = entry.unwrap();
//TODO: queue uploads on a separate thread
let path = Self::construct_cache_path_for_entry(&cache_dir, &entry);
// let path = entry.local_path.to_path_buf();
debug!("opening file: {:?}", &path);
let file = OpenOptions::new()
.truncate(false)
.create(true)
.write(true)
.open(&path);
if let Err(e) = file {
error!("write: could not open file: {:?}: {}", path, e);
reply.error(libc::ENOENT);
return;
}
let mut file = file.unwrap();
debug!("writing file: {:?} at {} with size {}",
&path,
offset,
data.len()
);
file.seek(SeekFrom::Start(offset as u64)).unwrap();
file.write_all(data).unwrap();
let size = data.len();
// let size = file.write_at(data, offset as u64);
// if let Err(e) = size {
// error!("write: could not write file: {:?}: {}", path, e);
// reply.error(libc::ENOENT);
// return;
// }
// let size = size.unwrap();
debug!("wrote file: {:?} at {}; wrote {} bits", &path, offset, size);
reply.written(size as u32);
//TODO: update size in entry if necessary
debug!("updating size to {} for entry: {:?}", entry.attr.size, entry);
let mut attr = &mut entry.attr;
attr.size = attr.size.max(offset as u64 + size as u64);
let now = SystemTime::now();
attr.mtime = now;
attr.ctime = now;
debug!("updated size to {} for entry: {:?}", entry.attr.size, entry);
debug!("write done for entry: {:?}", entry);
}
//endregion
//region read
fn read(
&mut self,

View File

@@ -70,6 +70,7 @@ impl CommonEntry for SampleEntry {
&self.attr
}
}
#[derive(Debug, Default)]
pub struct SampleFilesystem {
/// the point where the filesystem is mounted
@@ -89,6 +90,7 @@ pub struct SampleFilesystem {
/// when the filesystem is remounted
generation: u64,
}
impl SampleFilesystem {
pub fn new(root: impl AsRef<Path>, source: impl AsRef<Path>) -> Self {
debug!("new: {:?}; {:?}", root.as_ref(), source.as_ref());
@@ -132,6 +134,7 @@ impl SampleFilesystem {
}
}
}
#[async_trait::async_trait]
impl CommonFilesystem<SampleEntry> for SampleFilesystem {
fn get_entries(&self) -> &HashMap<Inode, SampleEntry> {
@@ -185,6 +188,7 @@ impl CommonFilesystem<SampleEntry> for SampleFilesystem {
Ok(ino)
}
}
impl SampleFilesystem {
async fn add_dir_entry(
&mut self,
@@ -221,8 +225,7 @@ impl SampleFilesystem {
} else if metadata.is_file() {
let mode = metadata.mode();
let size = metadata.size();
//TODO: async call
// self.add_file_entry(ino, name.as_os_str(), mode as u16, size);
self.add_file_entry(ino, name.as_os_str(), mode as u16, size).await;
}
}
}
@@ -242,9 +245,13 @@ impl fuser::Filesystem for SampleFilesystem {
// self.add_file_entry(1, "hello.txt".as_ref(), 0o644);
let source = self.source.clone();
debug!("init: add_dir_entry");
run_async_blocking(async {
debug!("init: add_dir_entry (async)");
self.add_dir_entry(&source, FUSE_ROOT_ID, true).await;
debug!("init: add_dir_entry done (async)");
});
debug!("init: add_dir_entry done");
// self.add_dir_entry(&source, FUSE_ROOT_ID, true);
Ok(())
}
@@ -286,8 +293,7 @@ impl fuser::Filesystem for SampleFilesystem {
return;
}
}
if !(children.is_none()) {
} else {
if !(children.is_none()) {} else {
reply.error(libc::ENOENT);
return;
}
@@ -299,7 +305,8 @@ impl fuser::Filesystem for SampleFilesystem {
let path: PathBuf = entry.local_path.clone().into();
let attr = entry.attr;
let inode = (*child_inode).into();
offset += 1; // Increment the offset for each processed entry
// Increment the offset for each processed entry
offset += 1;
debug!("entry: {}:{:?}; {:?}", inode, path, attr);
if !reply.add(inode, offset, attr.kind, path) {
break;

View File

@@ -1,21 +1,11 @@
#[tokio::main]
async fn main() {
drive_syncer::init_logger();
// use tokio::runtime::Runtime;
// let rt = Runtime::new().unwrap();
// let filesystem_runtime = Runtime::new().unwrap();
//
// let handle = rt.handle();
// handle.block_on(async {
// drive_syncer::sample().await.unwrap();
// drive_syncer::sample_fs().await.unwrap();
// drive_syncer::google_drive::sample().await.unwrap();
// drive_syncer::watch_file_reading().await.unwrap();
// drive_syncer::sample_nix().await.unwrap();
// drive_syncer::sample_fs().await.unwrap();
drive_syncer::sample_drive_fs().await.unwrap();
// });
// RUNTIME.block_on(async {
// //test
// });
}