changes api

This commit is contained in:
OMGeeky
2023-05-19 01:18:52 +02:00
parent 0c88eebf3d
commit 327e0e1564
6 changed files with 250 additions and 87 deletions

51
src/fs/drive/change.rs Normal file
View File

@@ -0,0 +1,51 @@
use anyhow::{anyhow, Context};
use drive3::api::{Drive, File};
use drive3::chrono::{DateTime, Utc};
use google_drive3::api::Change as DriveChange;
use crate::google_drive::DriveId;
#[derive(Debug)]
pub enum ChangeType {
Drive(Drive),
File(File),
Removed,
}
impl ChangeType {
fn from_drive_change(change_type: Option<String>, file: Option<File>, drive: Option<Drive>, removed: bool) -> anyhow::Result<ChangeType> {
if removed {
return Ok(Self::Removed);
}
if let Some(change_type) = change_type {
match change_type.as_str() {
"drive" => Ok(Self::Drive(drive.context("no drive but change type was drive")?)),
"file" => Ok(Self::File(file.context("no file but change type was file")?)),
_ => Err(anyhow!("invalid change type: {}", change_type)),
}
} else {
Err(anyhow!("change type is missing"))
}
}
}
#[derive(Debug)]
pub struct Change {
pub drive_id: DriveId,
pub kind: ChangeType,
pub time: DateTime<Utc>,
pub removed: bool,
}
impl TryFrom<DriveChange> for Change {
type Error = anyhow::Error;
fn try_from(drive_change: DriveChange) -> anyhow::Result<Self> {
let removed = drive_change.removed.unwrap_or(false);
Ok(Self {
drive_id: DriveId::from(drive_change.drive_id.context("drive_id is missing")?),
kind: ChangeType::from_drive_change(drive_change.change_type, drive_change.file, drive_change.drive, removed)?,
time: drive_change.time.context("time is missing")?,
removed,
})
}
}

View File

@@ -12,9 +12,8 @@ pub struct DriveEntry {
// pub drive_path: OsString,
pub local_path: LocalPath,
pub attr: FileAttr,
pub metadata_cache_time: Option<std::time::SystemTime>,
pub content_cache_time: Option<std::time::SystemTime>,
pub drive_metadata: Option<drive3::api::File>,
pub has_upstream_content_changes: bool,
}
impl DriveEntry {
pub fn new(
@@ -35,9 +34,8 @@ impl DriveEntry {
// drive_path: path.clone().into(),
local_path: path,
attr,
metadata_cache_time: None,
content_cache_time: None,
drive_metadata,
has_upstream_content_changes: true,
}
}
}

View File

@@ -15,7 +15,7 @@ use std::ops::Deref;
use anyhow::{anyhow, Context, Error};
use async_recursion::async_recursion;
use drive3::api::File;
use drive3::api::{File, StartPageToken};
use fuser::{
FileAttr,
Filesystem,
@@ -48,7 +48,7 @@ use tokio::{
use tracing::{debug, error, instrument, warn};
use crate::{async_helper::run_async_blocking, common::LocalPath, config::common_file_filter::CommonFileFilter, fs::common::CommonFilesystem, fs::CommonEntry, fs::drive::DriveEntry, fs::inode::Inode, google_drive::{DriveId, GoogleDrive}, google_drive, prelude::*};
use crate::fs::drive::{FileCommand, FileUploaderCommand, SyncSettings};
use crate::fs::drive::{Change, ChangeType, FileCommand, FileUploaderCommand, SyncSettings};
enum CacheState {
Missing,
@@ -58,7 +58,6 @@ enum CacheState {
#[derive(Debug)]
pub struct DriveFilesystem {
// runtime: Runtime,
/// the point where the filesystem is mounted
root: PathBuf,
/// the source dir to read from and write to
@@ -66,9 +65,6 @@ pub struct DriveFilesystem {
/// the cache dir to store the files in
cache_dir: Option<PathBuf>,
/// the filter to apply when uploading files
// upload_filter: CommonFileFilter,
entries: HashMap<Inode, DriveEntry>,
children: HashMap<Inode, Vec<Inode>>,
@@ -83,6 +79,20 @@ pub struct DriveFilesystem {
generation: u64,
settings: SyncSettings,
/// the token to use when requesting changes
/// from the google drive api
///
/// this should be initialized as soon as tracking starts
/// and should be updated after retrieving every changelist
changes_start_token: StartPageToken,
/// the time when it was last checked for changes
///
/// if this is longer ago than the configured duration
/// the filesystem will check for changes with
/// the changes_start_token on the google drive api
last_checked_changes: SystemTime,
}
impl Display for DriveFilesystem {
@@ -174,6 +184,7 @@ impl DriveFilesystem {
None,
),
);
let changes_start_token = drive.get_start_page_token().await?;
let mut s = Self {
root: root.to_path_buf(),
@@ -185,6 +196,8 @@ impl DriveFilesystem {
generation: 0,
children: HashMap::new(),
settings,
changes_start_token,
last_checked_changes: UNIX_EPOCH,
};
//
// let root = s.root.to_path_buf();
@@ -375,18 +388,16 @@ impl DriveFilesystem {
std::fs::create_dir_all(folder)?;
}
debug!("downloading file: {}", cache_path.display());
drive.download_file(drive_id, &cache_path).await?;
let metadata = drive.download_file(drive_id, &cache_path).await?;
debug!("downloaded file: {}", cache_path.display());
let size = std::fs::metadata(&cache_path)?.len();
self.set_entry_size(ino, size);
self.set_entry_cached(ino)?;
//TODO: check if any other things need to be updated for the entry
self.set_entry_metadata_with_ino(ino, &metadata)?;
self.set_entry_content_up_to_date(ino)?;
Ok(cache_path)
}
fn set_entry_cached(&mut self, ino: Inode) -> Result<()> {
let mut entry = self.get_entry_mut(ino).ok_or(anyhow!("could not get entry"))?;
entry.content_cache_time = Some(SystemTime::now());
fn set_entry_content_up_to_date(&mut self, ino: Inode) -> Result<()> {
let mut entry = self.get_entry_mut(ino).context("could not get entry")?;
entry.has_upstream_content_changes = false;
Ok(())
}
fn check_if_file_is_cached(&self, ino: impl Into<Inode> + Debug) -> Result<bool> {
@@ -396,34 +407,71 @@ impl DriveFilesystem {
Ok(exists)
}
#[instrument(fields(% self))]
async fn update_entry_metadata_cache_if_needed(&mut self, ino: impl Into<Inode> + Debug) -> Result<()> {
//TODO: do something that uses the changes api so not every file needs to check for
// itself if it needs to update, rather it gets checked once and then updates all the
// cache times for all files
let ino = ino.into();
let entry = self.get_entry_r(ino)?;
let refresh_cache = self.get_cache_state(&entry.metadata_cache_time);
match refresh_cache {
CacheState::RefreshNeeded | CacheState::Missing => {
debug!("refreshing metadata cache for drive_id: {:?}", entry.drive_id);
let metadata = self.source.get_metadata_for_file(entry.drive_id.clone()).await?;
self.update_entry_metadata(ino, &metadata)?;
self.set_entry_metadata_cached(ino)?;
}
CacheState::UpToDate => {
debug!("metadata cache is up to date");
}
_ => {
debug!("unknown cache state");
async fn update_entry_metadata_cache_if_needed(&mut self) -> Result<()> {
//TODO: figure out how to prevent this from downloading changes that have been just uploaded
debug!("getting changes...");
let changes = self.get_changes().await?;
debug!("got changes: {}", changes.len());
for change in changes {
debug!("processing change: {:?}", change);
if let ChangeType::Drive(drive) = change.kind {
warn!("im not sure how to handle drive changes: {:?}", drive);
continue;
} else if let ChangeType::File(file) = change.kind {
debug!("file change: {:?}", file);
if let Some(drive_id) = &file.drive_id {
let searched_ino = self.find_entry_by_drive_id(drive_id);
if let Some(ino) = searched_ino {
let entry = self.get_entry_mut(ino).context("could not get entry")?;
Self::update_entry_metadata(&file, entry)?;
}
}
debug!("processed change: {:?}", file);
continue;
} else if let ChangeType::Removed = change.kind {
debug!("removing entry: {:?}", change);
let drive_id = change.drive_id.as_str().context("could not get drive id")?;
let ino = self.find_entry_by_drive_id(drive_id);
if let Some(ino) = ino {
self.remove_entry(ino)?;
} else {
warn!("could not find entry for drive id: {}", drive_id);
continue;
}
continue;
}
}
debug!("updated entry metadata cache");
Ok(())
}
fn find_entry_by_drive_id(&self, drive_id: impl Into<String>) -> Option<Inode> {
let drive_id = drive_id.into();
let mut searched_entry = None;
for (ino, entry) in self.entries.iter() {
let entry_drive_id = entry.drive_id.as_str();
if let None = entry_drive_id {
warn!("could not get drive id for entry: {:?}", entry);
continue;
}
if entry_drive_id.unwrap() == drive_id {
debug!("updating entry: {}", ino);
searched_entry = Some(entry.ino);
break;
}
}
searched_entry
}
/// Updates the entry from the drive if needed
///
/// returns true if the entry was updated from the drive
#[instrument(fields(% self))]
async fn update_cache_if_needed(&mut self, ino: impl Into<Inode> + Debug) -> Result<()> {
async fn update_cache_if_needed(&mut self, ino: impl Into<Inode> + Debug) -> Result<bool> {
let ino = ino.into();
self.update_entry_metadata_cache_if_needed(ino).await?;
self.update_entry_metadata_cache_if_needed().await?;
let entry = match self.get_entry_r(ino) {
Ok(entry) => entry,
Err(e) => {
@@ -431,30 +479,12 @@ impl DriveFilesystem {
return Err(e);
}
};
let refresh_cache = self.get_cache_state(&entry.content_cache_time);
match refresh_cache {
CacheState::Missing => {
debug!("no local cache for: {}, downloading...", ino);
self.download_file_to_cache(ino).await?;
}
CacheState::RefreshNeeded => {
debug!("cache needs refresh for: {}, checking for updated version...", ino);
let remote_mod_time: SystemTime = self.get_modified_time_on_remote(ino).await?;
debug!("remote_mod_time: {:?}", remote_mod_time);
let local_mod_time = self.get_entry_r(ino)?.attr.mtime;
debug!("local_mod_time: {:?}", local_mod_time);
if remote_mod_time > local_mod_time {
debug!("updating cached file since remote_mod_time: {:?} > local_mod_time: {:?}", remote_mod_time, local_mod_time);
self.download_file_to_cache(ino).await?;
} else {
debug!("local file is up to date: remote_mod_time: {:?} <= local_mod_time: {:?}", remote_mod_time, local_mod_time);
}
}
CacheState::UpToDate => {
debug!("Cache up to date for {} since {:?} > {}", ino, entry.content_cache_time.unwrap(), self.settings.cache_time().as_secs());
}
if entry.has_upstream_content_changes {
debug!("entry has upstream changes: {}, downloading...", ino);
self.download_file_to_cache(ino).await?;
return Ok(true);
}
Ok(())
Ok(false)
}
fn get_cache_state(&self, cache_time: &Option<SystemTime>) -> CacheState {
@@ -504,8 +534,13 @@ impl DriveFilesystem {
self.get_entry_mut(ino).context("no entry for ino")?.attr.size = size;
Ok(())
}
fn update_entry_metadata(&mut self, ino: Inode, drive_metadata: &google_drive3::api::File) -> anyhow::Result<()> {
fn set_entry_metadata_with_ino(&mut self, ino: Inode, drive_metadata: &google_drive3::api::File) -> anyhow::Result<()> {
let entry = self.get_entry_mut(ino).context("no entry with ino")?;
Self::update_entry_metadata(drive_metadata, entry)
}
fn update_entry_metadata(drive_metadata: &File, entry: &mut DriveEntry) -> anyhow::Result<()> {
if let Some(name) = drive_metadata.name.as_ref() {
entry.name = OsString::from(name);
}
@@ -518,15 +553,35 @@ impl DriveFilesystem {
if let Some(created_time) = drive_metadata.created_time.as_ref() {
entry.attr.ctime = (*created_time).into();
}
if let Some(viewed_by_me) = drive_metadata.viewed_by_me_time.as_ref(){
if let Some(viewed_by_me) = drive_metadata.viewed_by_me_time.as_ref() {
entry.attr.atime = (*viewed_by_me).into();
}
entry.has_upstream_content_changes = true;
Ok(())
}
fn set_entry_metadata_cached(&mut self, ino: Inode) -> anyhow::Result<()> {
let mut entry = self.get_entry_mut(ino).context("no entry with ino")?;
entry.metadata_cache_time = Some(SystemTime::now());
async fn get_changes(&mut self) -> anyhow::Result<Vec<Change>> {
if self.last_checked_changes + self.settings.cache_time() > SystemTime::now() {
debug!("not checking for changes since we already checked recently");
return Ok(vec![]);
}
debug!("checking for changes...");
let changes: anyhow::Result<Vec<Change>> = self
.source
.get_changes_since(&mut self.changes_start_token)
.await?
.into_iter()
.map(Change::try_from)
.collect();
self.last_checked_changes = SystemTime::now();
debug!("checked for changes, found {} changes", changes.as_ref().unwrap_or(&Vec::<Change>::new()).len());
changes
}
fn remove_entry(&mut self, ino: Inode) -> anyhow::Result<()> {
let entry = self.entries.remove_entry(&ino);
//TODO: remove from children
//TODO: remove from cache if it exists
Ok(())
}
}
@@ -660,8 +715,7 @@ impl Filesystem for DriveFilesystem {
let children = children.unwrap().clone();
debug!("lookup: children: {:?}", children);
for child_inode in children {
run_async_blocking(self.update_entry_metadata_cache_if_needed(child_inode));
run_async_blocking(self.update_entry_metadata_cache_if_needed());
let entry = self.entries.get(&child_inode);
if entry.is_none() {
warn!("lookup: could not find entry for {}", child_inode);
@@ -689,7 +743,8 @@ impl Filesystem for DriveFilesystem {
#[instrument(skip(_req, reply), fields(% self))]
fn getattr(&mut self, _req: &Request<'_>, ino: u64, reply: ReplyAttr) {
debug!("getattr: {}", ino);
run_async_blocking(self.update_entry_metadata_cache_if_needed(ino));
run_async_blocking(self.update_entry_metadata_cache_if_needed());
debug!("getattr: after update_entry_metadata_cache_if_needed");
let entry = self.entries.get(&ino.into());
if let Some(entry) = entry {
reply.attr(&self.settings.time_to_live(), &entry.attr);
@@ -784,11 +839,21 @@ impl Filesystem for DriveFilesystem {
"write: {}:{}:{}:{:#x?}:{:?}:{:#x?}:{:?}",
ino, fh, offset, flags, lock_owner, write_flags, data,
);
let cache_update_success: Result<()> = run_async_blocking(self.update_cache_if_needed(ino));
if let Err(e) = cache_update_success {
error!("write: could not update cache: {}", e);
reply.error(libc::EIO);
return;
let cache_update_success: Result<bool> = run_async_blocking(self.update_cache_if_needed(ino));
match cache_update_success {
Err(e) => {
error!("write: could not update cache: {}", e);
reply.error(libc::EIO);
return;
}
Ok(cache_updated) => {
if cache_updated {
error!("conflict detected, upstream had a change, cache was updated, aborting write");
//TODO: maybe output a message to the user?
reply.error(libc::EIO);
return;
}
}
}
let cache_dir = self.cache_dir
.as_ref()
@@ -875,8 +940,8 @@ impl Filesystem for DriveFilesystem {
ino, fh, offset, size, flags, lock_owner
);
run_async_blocking(self.update_entry_metadata_cache_if_needed(ino));
let x: Result<()> = run_async_blocking(self.update_cache_if_needed(ino));
run_async_blocking(self.update_entry_metadata_cache_if_needed());
let x: Result<bool> = run_async_blocking(self.update_cache_if_needed(ino));
if let Err(e) = x {
error!("read: could not update cache: {}", e);
reply.error(libc::EIO);
@@ -937,7 +1002,7 @@ impl Filesystem for DriveFilesystem {
mut reply: ReplyDirectory,
) {
debug!("readdir: {}:{}:{:?}", ino, fh, offset);
run_async_blocking(self.update_entry_metadata_cache_if_needed(ino));
run_async_blocking(self.update_entry_metadata_cache_if_needed());
let children = self.children.get(&ino.into());
if let Some(attr) = self.get_entries().get(&ino.into()).map(|entry| entry.attr) {
if attr.kind != FileType::Directory {

View File

@@ -3,7 +3,9 @@ pub use entry::*;
pub use filesystem::*;
pub use file_uploader::*;
pub use settings::*;
pub use change::*;
mod entry;
mod filesystem;
mod file_uploader;
mod settings;
mod settings;
mod change;

View File

@@ -8,7 +8,8 @@ use std::time::SystemTime;
// use drive3::api::Scope::File;
use anyhow::{anyhow, Context};
use drive3::{hyper_rustls, oauth2};
use drive3::api::{File, Scope};
use drive3::api::{Change, File, Scope, StartPageToken};
use drive3::chrono::{DateTime, Utc};
use drive3::client::ReadSeek;
use drive3::DriveHub;
use drive3::hyper::{body, Body, Response};
@@ -21,7 +22,7 @@ use mime::{FromStrError, Mime};
use tokio::{fs, io};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
use tokio::runtime::Runtime;
use tracing::{debug, instrument, trace, warn};
use tracing::{debug, error, instrument, trace, warn};
use tracing::field::debug;
use crate::google_drive::{drive, DriveId, helpers};
@@ -32,6 +33,52 @@ pub struct GoogleDrive {
hub: DriveHub<HttpsConnector<HttpConnector>>,
}
impl GoogleDrive {
#[instrument]
pub(crate) async fn get_start_page_token(&self) -> anyhow::Result<StartPageToken> {
let (_response, start_page_token) = self.hub.changes().get_start_page_token().doit().await?;
Ok(start_page_token)
}
}
impl GoogleDrive {
#[instrument]
pub(crate) async fn get_changes_since(&self, start_page_token: &mut StartPageToken) -> anyhow::Result<Vec<Change>> {
let mut changes = vec![];
let mut page_token: Option<String> = None;
loop {
debug!("getting changes since {:?} page: {:?}", start_page_token, page_token);
let mut request = self
.hub
.changes()
.list(&start_page_token
.start_page_token
.as_ref()
.context("no start_page_token")?);
if let Some(page_token) = &page_token {
request = request.page_token(page_token);
}
let (_response, change_list) = request
.doit()
.await
.context("could not get changes")?;
if let Some(change_list) = change_list.changes {
changes.extend(change_list);
}
if let Some(next_page_token) = change_list.next_page_token {
page_token = Some(next_page_token);
} else if let Some(new_start_page_token) = change_list.new_start_page_token {
start_page_token.start_page_token = Some(new_start_page_token);
break;
} else {
error!("no next_page_token or new_start_page_token");
break;
}
}
Ok(changes)
}
}
impl GoogleDrive {
#[instrument]
pub(crate) async fn get_metadata_for_file(&self, drive_id: DriveId) -> anyhow::Result<File> {
@@ -67,7 +114,7 @@ impl GoogleDrive {
impl GoogleDrive {
#[instrument]
pub async fn download_file(&self, file_id: DriveId, target_file: &PathBuf) -> Result<()> {
pub async fn download_file(&self, file_id: DriveId, target_file: &PathBuf) -> Result<File> {
debug!(
"download_file: file_id: {:50?} to {}",
file_id,
@@ -78,13 +125,13 @@ impl GoogleDrive {
Err(e) => return Err(anyhow!("invalid file_id: {:?}", e).into()),
};
let x = download_file_by_id(&self, file_id, target_file.as_path()).await;
let file = download_file_by_id(&self, file_id, target_file.as_path()).await;
debug!("download_file: completed");
let x = x?;
let file = file?;
debug!("download_file: success");
Ok(())
Ok(file)
}
}

View File

@@ -429,7 +429,7 @@ pub async fn sample_drive_fs() -> Result<()> {
let cache_dir = get_cache_dir()?;
let upload_ignore = CommonFileFilter::from_path(upload_ignore_path)?;
let sync_settings = SyncSettings::new(Duration::from_secs(2), Duration::from_secs(20));
let sync_settings = SyncSettings::new(Duration::from_secs(2), Duration::from_secs(5));
// let source = "/tmp/fuse/2";
let drive = GoogleDrive::new().await?;
// let file_uploader = FileUploader::new("config/credentials.json", "config/token.json");