diff --git a/src/fs/drive/change.rs b/src/fs/drive/change.rs new file mode 100644 index 0000000..2df7d0c --- /dev/null +++ b/src/fs/drive/change.rs @@ -0,0 +1,51 @@ +use anyhow::{anyhow, Context}; +use drive3::api::{Drive, File}; +use drive3::chrono::{DateTime, Utc}; +use google_drive3::api::Change as DriveChange; + +use crate::google_drive::DriveId; + +#[derive(Debug)] +pub enum ChangeType { + Drive(Drive), + File(File), + Removed, +} + +impl ChangeType { + fn from_drive_change(change_type: Option, file: Option, drive: Option, removed: bool) -> anyhow::Result { + if removed { + return Ok(Self::Removed); + } + if let Some(change_type) = change_type { + match change_type.as_str() { + "drive" => Ok(Self::Drive(drive.context("no drive but change type was drive")?)), + "file" => Ok(Self::File(file.context("no file but change type was file")?)), + _ => Err(anyhow!("invalid change type: {}", change_type)), + } + } else { + Err(anyhow!("change type is missing")) + } + } +} + +#[derive(Debug)] +pub struct Change { + pub drive_id: DriveId, + pub kind: ChangeType, + pub time: DateTime, + pub removed: bool, +} + +impl TryFrom for Change { + type Error = anyhow::Error; + fn try_from(drive_change: DriveChange) -> anyhow::Result { + let removed = drive_change.removed.unwrap_or(false); + Ok(Self { + drive_id: DriveId::from(drive_change.drive_id.context("drive_id is missing")?), + kind: ChangeType::from_drive_change(drive_change.change_type, drive_change.file, drive_change.drive, removed)?, + time: drive_change.time.context("time is missing")?, + removed, + }) + } +} \ No newline at end of file diff --git a/src/fs/drive/entry.rs b/src/fs/drive/entry.rs index 6fad042..e5deb0e 100644 --- a/src/fs/drive/entry.rs +++ b/src/fs/drive/entry.rs @@ -12,9 +12,8 @@ pub struct DriveEntry { // pub drive_path: OsString, pub local_path: LocalPath, pub attr: FileAttr, - pub metadata_cache_time: Option, - pub content_cache_time: Option, pub drive_metadata: Option, + pub has_upstream_content_changes: bool, } impl DriveEntry { pub fn new( @@ -35,9 +34,8 @@ impl DriveEntry { // drive_path: path.clone().into(), local_path: path, attr, - metadata_cache_time: None, - content_cache_time: None, drive_metadata, + has_upstream_content_changes: true, } } } diff --git a/src/fs/drive/filesystem.rs b/src/fs/drive/filesystem.rs index 4efb5b5..df0160d 100644 --- a/src/fs/drive/filesystem.rs +++ b/src/fs/drive/filesystem.rs @@ -15,7 +15,7 @@ use std::ops::Deref; use anyhow::{anyhow, Context, Error}; use async_recursion::async_recursion; -use drive3::api::File; +use drive3::api::{File, StartPageToken}; use fuser::{ FileAttr, Filesystem, @@ -48,7 +48,7 @@ use tokio::{ use tracing::{debug, error, instrument, warn}; use crate::{async_helper::run_async_blocking, common::LocalPath, config::common_file_filter::CommonFileFilter, fs::common::CommonFilesystem, fs::CommonEntry, fs::drive::DriveEntry, fs::inode::Inode, google_drive::{DriveId, GoogleDrive}, google_drive, prelude::*}; -use crate::fs::drive::{FileCommand, FileUploaderCommand, SyncSettings}; +use crate::fs::drive::{Change, ChangeType, FileCommand, FileUploaderCommand, SyncSettings}; enum CacheState { Missing, @@ -58,7 +58,6 @@ enum CacheState { #[derive(Debug)] pub struct DriveFilesystem { - // runtime: Runtime, /// the point where the filesystem is mounted root: PathBuf, /// the source dir to read from and write to @@ -66,9 +65,6 @@ pub struct DriveFilesystem { /// the cache dir to store the files in cache_dir: Option, - /// the filter to apply when uploading files - // upload_filter: CommonFileFilter, - entries: HashMap, children: HashMap>, @@ -83,6 +79,20 @@ pub struct DriveFilesystem { generation: u64, settings: SyncSettings, + + /// the token to use when requesting changes + /// from the google drive api + /// + /// this should be initialized as soon as tracking starts + /// and should be updated after retrieving every changelist + changes_start_token: StartPageToken, + + /// the time when it was last checked for changes + /// + /// if this is longer ago than the configured duration + /// the filesystem will check for changes with + /// the changes_start_token on the google drive api + last_checked_changes: SystemTime, } impl Display for DriveFilesystem { @@ -174,6 +184,7 @@ impl DriveFilesystem { None, ), ); + let changes_start_token = drive.get_start_page_token().await?; let mut s = Self { root: root.to_path_buf(), @@ -185,6 +196,8 @@ impl DriveFilesystem { generation: 0, children: HashMap::new(), settings, + changes_start_token, + last_checked_changes: UNIX_EPOCH, }; // // let root = s.root.to_path_buf(); @@ -375,18 +388,16 @@ impl DriveFilesystem { std::fs::create_dir_all(folder)?; } debug!("downloading file: {}", cache_path.display()); - drive.download_file(drive_id, &cache_path).await?; + let metadata = drive.download_file(drive_id, &cache_path).await?; debug!("downloaded file: {}", cache_path.display()); - let size = std::fs::metadata(&cache_path)?.len(); - self.set_entry_size(ino, size); - self.set_entry_cached(ino)?; - //TODO: check if any other things need to be updated for the entry + self.set_entry_metadata_with_ino(ino, &metadata)?; + self.set_entry_content_up_to_date(ino)?; Ok(cache_path) } - fn set_entry_cached(&mut self, ino: Inode) -> Result<()> { - let mut entry = self.get_entry_mut(ino).ok_or(anyhow!("could not get entry"))?; - entry.content_cache_time = Some(SystemTime::now()); + fn set_entry_content_up_to_date(&mut self, ino: Inode) -> Result<()> { + let mut entry = self.get_entry_mut(ino).context("could not get entry")?; + entry.has_upstream_content_changes = false; Ok(()) } fn check_if_file_is_cached(&self, ino: impl Into + Debug) -> Result { @@ -396,34 +407,71 @@ impl DriveFilesystem { Ok(exists) } #[instrument(fields(% self))] - async fn update_entry_metadata_cache_if_needed(&mut self, ino: impl Into + Debug) -> Result<()> { - //TODO: do something that uses the changes api so not every file needs to check for - // itself if it needs to update, rather it gets checked once and then updates all the - // cache times for all files - - let ino = ino.into(); - let entry = self.get_entry_r(ino)?; - let refresh_cache = self.get_cache_state(&entry.metadata_cache_time); - match refresh_cache { - CacheState::RefreshNeeded | CacheState::Missing => { - debug!("refreshing metadata cache for drive_id: {:?}", entry.drive_id); - let metadata = self.source.get_metadata_for_file(entry.drive_id.clone()).await?; - self.update_entry_metadata(ino, &metadata)?; - self.set_entry_metadata_cached(ino)?; - } - CacheState::UpToDate => { - debug!("metadata cache is up to date"); - } - _ => { - debug!("unknown cache state"); + async fn update_entry_metadata_cache_if_needed(&mut self) -> Result<()> { + //TODO: figure out how to prevent this from downloading changes that have been just uploaded + debug!("getting changes..."); + let changes = self.get_changes().await?; + debug!("got changes: {}", changes.len()); + for change in changes { + debug!("processing change: {:?}", change); + if let ChangeType::Drive(drive) = change.kind { + warn!("im not sure how to handle drive changes: {:?}", drive); + continue; + } else if let ChangeType::File(file) = change.kind { + debug!("file change: {:?}", file); + if let Some(drive_id) = &file.drive_id { + let searched_ino = self.find_entry_by_drive_id(drive_id); + if let Some(ino) = searched_ino { + let entry = self.get_entry_mut(ino).context("could not get entry")?; + Self::update_entry_metadata(&file, entry)?; + } + } + + debug!("processed change: {:?}", file); + continue; + } else if let ChangeType::Removed = change.kind { + debug!("removing entry: {:?}", change); + let drive_id = change.drive_id.as_str().context("could not get drive id")?; + + let ino = self.find_entry_by_drive_id(drive_id); + if let Some(ino) = ino { + self.remove_entry(ino)?; + } else { + warn!("could not find entry for drive id: {}", drive_id); + continue; + } + continue; } } + debug!("updated entry metadata cache"); Ok(()) } + + fn find_entry_by_drive_id(&self, drive_id: impl Into) -> Option { + let drive_id = drive_id.into(); + let mut searched_entry = None; + for (ino, entry) in self.entries.iter() { + let entry_drive_id = entry.drive_id.as_str(); + if let None = entry_drive_id { + warn!("could not get drive id for entry: {:?}", entry); + continue; + } + if entry_drive_id.unwrap() == drive_id { + debug!("updating entry: {}", ino); + searched_entry = Some(entry.ino); + break; + } + } + searched_entry + } + + /// Updates the entry from the drive if needed + /// + /// returns true if the entry was updated from the drive #[instrument(fields(% self))] - async fn update_cache_if_needed(&mut self, ino: impl Into + Debug) -> Result<()> { + async fn update_cache_if_needed(&mut self, ino: impl Into + Debug) -> Result { let ino = ino.into(); - self.update_entry_metadata_cache_if_needed(ino).await?; + self.update_entry_metadata_cache_if_needed().await?; let entry = match self.get_entry_r(ino) { Ok(entry) => entry, Err(e) => { @@ -431,30 +479,12 @@ impl DriveFilesystem { return Err(e); } }; - let refresh_cache = self.get_cache_state(&entry.content_cache_time); - match refresh_cache { - CacheState::Missing => { - debug!("no local cache for: {}, downloading...", ino); - self.download_file_to_cache(ino).await?; - } - CacheState::RefreshNeeded => { - debug!("cache needs refresh for: {}, checking for updated version...", ino); - let remote_mod_time: SystemTime = self.get_modified_time_on_remote(ino).await?; - debug!("remote_mod_time: {:?}", remote_mod_time); - let local_mod_time = self.get_entry_r(ino)?.attr.mtime; - debug!("local_mod_time: {:?}", local_mod_time); - if remote_mod_time > local_mod_time { - debug!("updating cached file since remote_mod_time: {:?} > local_mod_time: {:?}", remote_mod_time, local_mod_time); - self.download_file_to_cache(ino).await?; - } else { - debug!("local file is up to date: remote_mod_time: {:?} <= local_mod_time: {:?}", remote_mod_time, local_mod_time); - } - } - CacheState::UpToDate => { - debug!("Cache up to date for {} since {:?} > {}", ino, entry.content_cache_time.unwrap(), self.settings.cache_time().as_secs()); - } + if entry.has_upstream_content_changes { + debug!("entry has upstream changes: {}, downloading...", ino); + self.download_file_to_cache(ino).await?; + return Ok(true); } - Ok(()) + Ok(false) } fn get_cache_state(&self, cache_time: &Option) -> CacheState { @@ -504,8 +534,13 @@ impl DriveFilesystem { self.get_entry_mut(ino).context("no entry for ino")?.attr.size = size; Ok(()) } - fn update_entry_metadata(&mut self, ino: Inode, drive_metadata: &google_drive3::api::File) -> anyhow::Result<()> { + fn set_entry_metadata_with_ino(&mut self, ino: Inode, drive_metadata: &google_drive3::api::File) -> anyhow::Result<()> { let entry = self.get_entry_mut(ino).context("no entry with ino")?; + + Self::update_entry_metadata(drive_metadata, entry) + } + + fn update_entry_metadata(drive_metadata: &File, entry: &mut DriveEntry) -> anyhow::Result<()> { if let Some(name) = drive_metadata.name.as_ref() { entry.name = OsString::from(name); } @@ -518,15 +553,35 @@ impl DriveFilesystem { if let Some(created_time) = drive_metadata.created_time.as_ref() { entry.attr.ctime = (*created_time).into(); } - if let Some(viewed_by_me) = drive_metadata.viewed_by_me_time.as_ref(){ + if let Some(viewed_by_me) = drive_metadata.viewed_by_me_time.as_ref() { entry.attr.atime = (*viewed_by_me).into(); } + entry.has_upstream_content_changes = true; Ok(()) } - fn set_entry_metadata_cached(&mut self, ino: Inode) -> anyhow::Result<()> { - let mut entry = self.get_entry_mut(ino).context("no entry with ino")?; - entry.metadata_cache_time = Some(SystemTime::now()); + async fn get_changes(&mut self) -> anyhow::Result> { + if self.last_checked_changes + self.settings.cache_time() > SystemTime::now() { + debug!("not checking for changes since we already checked recently"); + return Ok(vec![]); + } + debug!("checking for changes..."); + let changes: anyhow::Result> = self + .source + .get_changes_since(&mut self.changes_start_token) + .await? + .into_iter() + .map(Change::try_from) + .collect(); + + self.last_checked_changes = SystemTime::now(); + debug!("checked for changes, found {} changes", changes.as_ref().unwrap_or(&Vec::::new()).len()); + changes + } + fn remove_entry(&mut self, ino: Inode) -> anyhow::Result<()> { + let entry = self.entries.remove_entry(&ino); + //TODO: remove from children + //TODO: remove from cache if it exists Ok(()) } } @@ -660,8 +715,7 @@ impl Filesystem for DriveFilesystem { let children = children.unwrap().clone(); debug!("lookup: children: {:?}", children); for child_inode in children { - - run_async_blocking(self.update_entry_metadata_cache_if_needed(child_inode)); + run_async_blocking(self.update_entry_metadata_cache_if_needed()); let entry = self.entries.get(&child_inode); if entry.is_none() { warn!("lookup: could not find entry for {}", child_inode); @@ -689,7 +743,8 @@ impl Filesystem for DriveFilesystem { #[instrument(skip(_req, reply), fields(% self))] fn getattr(&mut self, _req: &Request<'_>, ino: u64, reply: ReplyAttr) { debug!("getattr: {}", ino); - run_async_blocking(self.update_entry_metadata_cache_if_needed(ino)); + run_async_blocking(self.update_entry_metadata_cache_if_needed()); + debug!("getattr: after update_entry_metadata_cache_if_needed"); let entry = self.entries.get(&ino.into()); if let Some(entry) = entry { reply.attr(&self.settings.time_to_live(), &entry.attr); @@ -784,11 +839,21 @@ impl Filesystem for DriveFilesystem { "write: {}:{}:{}:{:#x?}:{:?}:{:#x?}:{:?}", ino, fh, offset, flags, lock_owner, write_flags, data, ); - let cache_update_success: Result<()> = run_async_blocking(self.update_cache_if_needed(ino)); - if let Err(e) = cache_update_success { - error!("write: could not update cache: {}", e); - reply.error(libc::EIO); - return; + let cache_update_success: Result = run_async_blocking(self.update_cache_if_needed(ino)); + match cache_update_success { + Err(e) => { + error!("write: could not update cache: {}", e); + reply.error(libc::EIO); + return; + } + Ok(cache_updated) => { + if cache_updated { + error!("conflict detected, upstream had a change, cache was updated, aborting write"); + //TODO: maybe output a message to the user? + reply.error(libc::EIO); + return; + } + } } let cache_dir = self.cache_dir .as_ref() @@ -875,8 +940,8 @@ impl Filesystem for DriveFilesystem { ino, fh, offset, size, flags, lock_owner ); - run_async_blocking(self.update_entry_metadata_cache_if_needed(ino)); - let x: Result<()> = run_async_blocking(self.update_cache_if_needed(ino)); + run_async_blocking(self.update_entry_metadata_cache_if_needed()); + let x: Result = run_async_blocking(self.update_cache_if_needed(ino)); if let Err(e) = x { error!("read: could not update cache: {}", e); reply.error(libc::EIO); @@ -937,7 +1002,7 @@ impl Filesystem for DriveFilesystem { mut reply: ReplyDirectory, ) { debug!("readdir: {}:{}:{:?}", ino, fh, offset); - run_async_blocking(self.update_entry_metadata_cache_if_needed(ino)); + run_async_blocking(self.update_entry_metadata_cache_if_needed()); let children = self.children.get(&ino.into()); if let Some(attr) = self.get_entries().get(&ino.into()).map(|entry| entry.attr) { if attr.kind != FileType::Directory { diff --git a/src/fs/drive/mod.rs b/src/fs/drive/mod.rs index e3b32d5..6bfabcb 100644 --- a/src/fs/drive/mod.rs +++ b/src/fs/drive/mod.rs @@ -3,7 +3,9 @@ pub use entry::*; pub use filesystem::*; pub use file_uploader::*; pub use settings::*; +pub use change::*; mod entry; mod filesystem; mod file_uploader; -mod settings; \ No newline at end of file +mod settings; +mod change; \ No newline at end of file diff --git a/src/google_drive/drive.rs b/src/google_drive/drive.rs index c3fe30c..949c97e 100644 --- a/src/google_drive/drive.rs +++ b/src/google_drive/drive.rs @@ -8,7 +8,8 @@ use std::time::SystemTime; // use drive3::api::Scope::File; use anyhow::{anyhow, Context}; use drive3::{hyper_rustls, oauth2}; -use drive3::api::{File, Scope}; +use drive3::api::{Change, File, Scope, StartPageToken}; +use drive3::chrono::{DateTime, Utc}; use drive3::client::ReadSeek; use drive3::DriveHub; use drive3::hyper::{body, Body, Response}; @@ -21,7 +22,7 @@ use mime::{FromStrError, Mime}; use tokio::{fs, io}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt}; use tokio::runtime::Runtime; -use tracing::{debug, instrument, trace, warn}; +use tracing::{debug, error, instrument, trace, warn}; use tracing::field::debug; use crate::google_drive::{drive, DriveId, helpers}; @@ -32,6 +33,52 @@ pub struct GoogleDrive { hub: DriveHub>, } +impl GoogleDrive { + #[instrument] + pub(crate) async fn get_start_page_token(&self) -> anyhow::Result { + let (_response, start_page_token) = self.hub.changes().get_start_page_token().doit().await?; + Ok(start_page_token) + } +} + +impl GoogleDrive { + #[instrument] + pub(crate) async fn get_changes_since(&self, start_page_token: &mut StartPageToken) -> anyhow::Result> { + let mut changes = vec![]; + let mut page_token: Option = None; + loop { + debug!("getting changes since {:?} page: {:?}", start_page_token, page_token); + let mut request = self + .hub + .changes() + .list(&start_page_token + .start_page_token + .as_ref() + .context("no start_page_token")?); + if let Some(page_token) = &page_token { + request = request.page_token(page_token); + } + let (_response, change_list) = request + .doit() + .await + .context("could not get changes")?; + if let Some(change_list) = change_list.changes { + changes.extend(change_list); + } + if let Some(next_page_token) = change_list.next_page_token { + page_token = Some(next_page_token); + } else if let Some(new_start_page_token) = change_list.new_start_page_token { + start_page_token.start_page_token = Some(new_start_page_token); + break; + } else { + error!("no next_page_token or new_start_page_token"); + break; + } + } + Ok(changes) + } +} + impl GoogleDrive { #[instrument] pub(crate) async fn get_metadata_for_file(&self, drive_id: DriveId) -> anyhow::Result { @@ -67,7 +114,7 @@ impl GoogleDrive { impl GoogleDrive { #[instrument] - pub async fn download_file(&self, file_id: DriveId, target_file: &PathBuf) -> Result<()> { + pub async fn download_file(&self, file_id: DriveId, target_file: &PathBuf) -> Result { debug!( "download_file: file_id: {:50?} to {}", file_id, @@ -78,13 +125,13 @@ impl GoogleDrive { Err(e) => return Err(anyhow!("invalid file_id: {:?}", e).into()), }; - let x = download_file_by_id(&self, file_id, target_file.as_path()).await; + let file = download_file_by_id(&self, file_id, target_file.as_path()).await; debug!("download_file: completed"); - let x = x?; + let file = file?; debug!("download_file: success"); - Ok(()) + Ok(file) } } diff --git a/src/lib.rs b/src/lib.rs index 4cff914..d7cabfb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -429,7 +429,7 @@ pub async fn sample_drive_fs() -> Result<()> { let cache_dir = get_cache_dir()?; let upload_ignore = CommonFileFilter::from_path(upload_ignore_path)?; - let sync_settings = SyncSettings::new(Duration::from_secs(2), Duration::from_secs(20)); + let sync_settings = SyncSettings::new(Duration::from_secs(2), Duration::from_secs(5)); // let source = "/tmp/fuse/2"; let drive = GoogleDrive::new().await?; // let file_uploader = FileUploader::new("config/credentials.json", "config/token.json");