mirror of
https://github.com/OMGeeky/drive_syncer.git
synced 2026-01-01 01:09:57 +01:00
restructure threads&tasks, start implementing changes api, cleanup
This commit is contained in:
@@ -41,10 +41,9 @@ impl ChangeType {
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Change {
|
||||
pub drive_id: DriveId,
|
||||
pub id: DriveId,
|
||||
pub kind: ChangeType,
|
||||
pub time: DateTime<Utc>,
|
||||
pub removed: bool,
|
||||
}
|
||||
|
||||
impl TryFrom<DriveChange> for Change {
|
||||
@@ -52,13 +51,9 @@ impl TryFrom<DriveChange> for Change {
|
||||
#[instrument]
|
||||
fn try_from(drive_change: DriveChange) -> anyhow::Result<Self> {
|
||||
let removed = drive_change.removed.unwrap_or(false);
|
||||
let drive_id = drive_change.file_id.context("drive_id is missing");
|
||||
if let Err(e) = drive_id {
|
||||
error!("drive_id is missing: {:?}", e);
|
||||
return Err(anyhow!("drive_id is missing: {:?}", e));
|
||||
}
|
||||
let drive_id = drive_change.file_id.context("file_id is missing")?;
|
||||
Ok(Self {
|
||||
drive_id: DriveId::from(drive_id?),
|
||||
id: DriveId::from(drive_id),
|
||||
kind: ChangeType::from_drive_change(
|
||||
drive_change.change_type,
|
||||
drive_change.file,
|
||||
@@ -66,7 +61,6 @@ impl TryFrom<DriveChange> for Change {
|
||||
removed,
|
||||
)?,
|
||||
time: drive_change.time.context("time is missing")?,
|
||||
removed,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -517,12 +517,12 @@ impl DriveFilesystem {
|
||||
ChangeType::Drive(drive) => {
|
||||
warn!("im not sure how to handle drive changes: {:?}", drive);
|
||||
|
||||
updated_entries.push(change.drive_id);
|
||||
updated_entries.push(change.id);
|
||||
continue;
|
||||
}
|
||||
ChangeType::File(file) => {
|
||||
debug!("file change: {:?}", file);
|
||||
let drive_id = &change.drive_id;
|
||||
let drive_id = &change.id;
|
||||
|
||||
let entry = self.entries.get_mut(drive_id);
|
||||
if let Some(entry) = entry {
|
||||
@@ -533,20 +533,20 @@ impl DriveFilesystem {
|
||||
let change_successful = Self::update_entry_metadata(file, entry);
|
||||
if let Err(e) = change_successful {
|
||||
warn!("got an err while update entry metadata: {}", e);
|
||||
updated_entries.push(change.drive_id);
|
||||
updated_entries.push(change.id);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
updated_entries.push(change.drive_id);
|
||||
updated_entries.push(change.id);
|
||||
debug!("processed change");
|
||||
continue;
|
||||
}
|
||||
ChangeType::Removed => {
|
||||
debug!("removing entry: {:?}", change);
|
||||
//TODO: actually delete the entry
|
||||
self.remove_entry(&change.drive_id)?;
|
||||
updated_entries.push(change.drive_id);
|
||||
self.remove_entry(&change.id)?;
|
||||
updated_entries.push(change.id);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
use std::collections::HashMap;
|
||||
use std::ffi::OsStr;
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::time::{Duration, SystemTime};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
ffi::OsStr,
|
||||
fmt::{Display, Formatter},
|
||||
sync::mpsc::{channel, Receiver, Sender},
|
||||
time::{Duration, SystemTime},
|
||||
};
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
use bimap::BiMap;
|
||||
@@ -10,10 +13,8 @@ use fuser::{
|
||||
ReplyEntry, ReplyOpen, ReplyWrite, Request, TimeOrNow,
|
||||
};
|
||||
use libc::c_int;
|
||||
use std::sync::mpsc::{channel, Receiver, Sender};
|
||||
use tokio::fs::File;
|
||||
use tracing::field::debug;
|
||||
use tracing::{debug, error, instrument, trace};
|
||||
use tracing::{debug, error, field::debug, instrument, trace};
|
||||
|
||||
pub use handle_flags::HandleFlags;
|
||||
|
||||
|
||||
@@ -1,35 +1,37 @@
|
||||
use std::collections::HashMap;
|
||||
use std::ffi::{OsStr, OsString};
|
||||
use std::fmt::{Debug, Formatter};
|
||||
use std::fs::Permissions;
|
||||
use std::io::SeekFrom;
|
||||
use std::os::unix::prelude::{MetadataExt, PermissionsExt};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
use anyhow::{anyhow, Context, Error};
|
||||
use fuser::{FileAttr, FileType};
|
||||
use libc::c_int;
|
||||
use tokio::fs::{File, OpenOptions};
|
||||
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
|
||||
use tokio::sync::mpsc::{Receiver, Sender};
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::sleep;
|
||||
use tokio::{fs, join};
|
||||
use tracing::{debug, error, info, instrument, trace, warn};
|
||||
|
||||
use crate::fs::drive2::HandleFlags;
|
||||
use crate::fs::drive_file_provider::{
|
||||
FileMetadata, ProviderLookupRequest, ProviderMetadataRequest, ProviderOpenFileRequest,
|
||||
ProviderReadContentRequest, ProviderReadDirRequest, ProviderReadDirResponse,
|
||||
ProviderReleaseFileRequest, ProviderRequest, ProviderRequestStruct, ProviderResponse,
|
||||
ProviderSetAttrRequest, ProviderWriteContentRequest,
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
fmt::{Debug, Formatter},
|
||||
io::SeekFrom,
|
||||
os::unix::prelude::MetadataExt,
|
||||
path::PathBuf,
|
||||
time::{Duration, SystemTime, UNIX_EPOCH},
|
||||
};
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
use fuser::{FileAttr, FileType};
|
||||
use google_drive3::api::StartPageToken;
|
||||
use tokio::{
|
||||
fs,
|
||||
fs::{File, OpenOptions},
|
||||
io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
|
||||
sync::mpsc::{Receiver, Sender},
|
||||
task::JoinHandle,
|
||||
};
|
||||
use tracing::{debug, error, instrument, trace, warn};
|
||||
|
||||
use crate::{
|
||||
fs::drive::{Change, ChangeType},
|
||||
fs::drive2::HandleFlags,
|
||||
fs::drive_file_provider::{
|
||||
FileMetadata, ProviderLookupRequest, ProviderMetadataRequest, ProviderOpenFileRequest,
|
||||
ProviderReadContentRequest, ProviderReadDirRequest, ProviderReadDirResponse,
|
||||
ProviderReleaseFileRequest, ProviderRequest, ProviderResponse, ProviderSetAttrRequest,
|
||||
ProviderWriteContentRequest,
|
||||
},
|
||||
google_drive::{DriveId, GoogleDrive},
|
||||
prelude::*,
|
||||
send_error_response, send_response,
|
||||
};
|
||||
use crate::google_drive::{DriveId, GoogleDrive};
|
||||
use crate::prelude::*;
|
||||
use crate::{send_error_response, send_response};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ProviderCommand {
|
||||
@@ -80,6 +82,10 @@ pub struct DriveFileProvider {
|
||||
|
||||
file_handles: HashMap<u64, FileHandleData>,
|
||||
next_fh: u64,
|
||||
|
||||
changes_start_token: StartPageToken,
|
||||
last_checked_for_changes: SystemTime,
|
||||
allowed_cache_time: Duration,
|
||||
}
|
||||
impl Debug for DriveFileProvider {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
@@ -100,6 +106,7 @@ impl DriveFileProvider {
|
||||
drive: GoogleDrive,
|
||||
cache_dir: PathBuf,
|
||||
perma_dir: PathBuf,
|
||||
changes_start_token: StartPageToken,
|
||||
// file_request_receiver: std::sync::mpsc::Receiver<ProviderRequest>,
|
||||
) -> Self {
|
||||
Self {
|
||||
@@ -114,6 +121,10 @@ impl DriveFileProvider {
|
||||
children: HashMap::new(),
|
||||
file_handles: HashMap::new(),
|
||||
next_fh: 111,
|
||||
|
||||
changes_start_token,
|
||||
last_checked_for_changes: SystemTime::UNIX_EPOCH,
|
||||
allowed_cache_time: Duration::from_secs(10),
|
||||
}
|
||||
}
|
||||
fn add_parent_child_relation(&mut self, parent_id: DriveId, child_id: DriveId) {
|
||||
@@ -172,10 +183,7 @@ impl DriveFileProvider {
|
||||
// //TODO: implement waiting for the stop signal instead of just waiting for 10 days
|
||||
}
|
||||
#[instrument(skip(self, rx))]
|
||||
pub async fn listen_for_file_requests(
|
||||
&mut self,
|
||||
rx: tokio::sync::mpsc::Receiver<ProviderRequest>,
|
||||
) {
|
||||
pub async fn listen_for_file_requests(&mut self, rx: Receiver<ProviderRequest>) {
|
||||
debug!("initializing entries");
|
||||
let init_res = self.initialize_entries().await;
|
||||
if let Err(e) = init_res {
|
||||
@@ -186,6 +194,7 @@ impl DriveFileProvider {
|
||||
let mut rx = rx;
|
||||
while let Some(file_request) = rx.recv().await {
|
||||
debug!("got file request: {:?}", file_request);
|
||||
self.check_and_apply_changes().await;
|
||||
let result = match file_request {
|
||||
ProviderRequest::OpenFile(r) => self.open_file(r).await,
|
||||
ProviderRequest::ReleaseFile(r) => self.release_file(r).await,
|
||||
@@ -196,7 +205,10 @@ impl DriveFileProvider {
|
||||
ProviderRequest::Lookup(r) => self.lookup(r).await,
|
||||
ProviderRequest::SetAttr(r) => self.set_attr(r).await,
|
||||
_ => {
|
||||
error!("DriveFileProvider::listen_for_file_requests() received unknown request: {:?}", file_request);
|
||||
error!(
|
||||
"DriveFileProvider::listen_for_file_requests() received unknown request: {:?}",
|
||||
file_request
|
||||
);
|
||||
todo!("handle this unknown request")
|
||||
}
|
||||
};
|
||||
@@ -207,6 +219,18 @@ impl DriveFileProvider {
|
||||
}
|
||||
debug!("Received None from file request receiver, that means all senders have been dropped. Ending listener");
|
||||
}
|
||||
|
||||
async fn check_and_apply_changes(&mut self) {
|
||||
let changes = self.get_changes().await;
|
||||
if let Ok(changes) = changes {
|
||||
for change in changes {
|
||||
let change_applied_successful = self.process_change(change);
|
||||
if let Err(e) = change_applied_successful {
|
||||
error!("got an error while applying change: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
//endregion
|
||||
|
||||
//region request handlers
|
||||
@@ -239,7 +263,7 @@ impl DriveFileProvider {
|
||||
}
|
||||
}
|
||||
}
|
||||
info!("could not find file: {} in {}", name, parent_id);
|
||||
debug!("could not find file: {} in {}", name, parent_id);
|
||||
let response = ProviderResponse::Lookup(None);
|
||||
return send_response!(request, response);
|
||||
}
|
||||
@@ -327,11 +351,11 @@ impl DriveFileProvider {
|
||||
if let Err(e) = wait_res {
|
||||
return send_error_response!(request, e, libc::EIO);
|
||||
}
|
||||
let entry = self.entries.get(file_id).context("could not get entry");
|
||||
if let Err(e) = entry {
|
||||
return send_error_response!(request, e, libc::EIO);
|
||||
}
|
||||
let entry = entry.unwrap();
|
||||
// let entry = self.entries.get(file_id).context("could not get entry");
|
||||
// if let Err(e) = entry {
|
||||
// return send_error_response!(request, e, libc::EIO);
|
||||
// }
|
||||
// let entry = entry.unwrap();
|
||||
let file_handle = self
|
||||
.file_handles
|
||||
.remove(&request.fh)
|
||||
@@ -489,7 +513,7 @@ impl DriveFileProvider {
|
||||
}
|
||||
if !was_applied {
|
||||
let target_path = self.construct_path(&file_id)?;
|
||||
fs::OpenOptions::new()
|
||||
OpenOptions::new()
|
||||
.write(true)
|
||||
.open(target_path)
|
||||
.await
|
||||
@@ -570,9 +594,9 @@ impl DriveFileProvider {
|
||||
let opened_file = opened_file.unwrap();
|
||||
file_handle.file = Some(opened_file);
|
||||
file_handle.marked_for_open = false;
|
||||
} else {
|
||||
error!("File handle does not have a file");
|
||||
return Err(anyhow!("File handle does not have a file"));
|
||||
// } else {
|
||||
// error!("File handle does not have a file");
|
||||
// return Err(anyhow!("File handle does not have a file"));
|
||||
}
|
||||
Ok(file_handle)
|
||||
}
|
||||
@@ -592,7 +616,7 @@ impl DriveFileProvider {
|
||||
"writing to file at local path: {}",
|
||||
file_handle.path.display()
|
||||
);
|
||||
let file: &mut tokio::fs::File = file;
|
||||
let file: &mut File = file;
|
||||
trace!("seeking position: {}", request.offset);
|
||||
file.seek(SeekFrom::Start(request.offset)).await?;
|
||||
trace!("writing data: {:?}", request.data);
|
||||
@@ -642,7 +666,7 @@ impl DriveFileProvider {
|
||||
trace!("reading to buffer: size: {}", request.size);
|
||||
let size_read = file.read(&mut buf).await?;
|
||||
if size_read != request.size {
|
||||
warn!(
|
||||
debug!(
|
||||
"did not read the targeted size: target size: {}, actual size: {}",
|
||||
request.size, size_read
|
||||
);
|
||||
@@ -670,6 +694,28 @@ impl DriveFileProvider {
|
||||
//endregion
|
||||
|
||||
//region drive helpers
|
||||
#[instrument]
|
||||
async fn get_changes(&mut self) -> Result<Vec<Change>> {
|
||||
if self.last_checked_for_changes + self.allowed_cache_time > SystemTime::now() {
|
||||
debug!("not checking for changes since we already checked recently");
|
||||
return Ok(vec![]);
|
||||
}
|
||||
debug!("checking for changes...");
|
||||
let changes: Result<Vec<Change>> = self
|
||||
.drive
|
||||
.get_changes_since(&mut self.changes_start_token)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(Change::try_from)
|
||||
.collect();
|
||||
|
||||
self.last_checked_for_changes = SystemTime::now();
|
||||
debug!(
|
||||
"checked for changes, found {} changes",
|
||||
changes.as_ref().unwrap_or(&Vec::<Change>::new()).len()
|
||||
);
|
||||
changes
|
||||
}
|
||||
|
||||
/// starts a download of the specified file and puts it in the running_requests map
|
||||
///
|
||||
@@ -793,7 +839,7 @@ impl DriveFileProvider {
|
||||
let id = DriveId::from(id);
|
||||
let attr = self.create_file_attr_from_metadata(&entry);
|
||||
if attr.is_err() {
|
||||
error!(
|
||||
warn!(
|
||||
"error while creating FileAttr from metadata: {:?} entry: {:?}",
|
||||
attr, entry
|
||||
);
|
||||
@@ -822,9 +868,9 @@ impl DriveFileProvider {
|
||||
self.entries.insert(id, entry_data);
|
||||
}
|
||||
}
|
||||
for (i, (id, data)) in self.entries.iter().enumerate() {
|
||||
println!("entry {:3} id: {:>40} data: {:?}", i, id, data);
|
||||
}
|
||||
// for (i, (id, data)) in self.entries.iter().enumerate() {
|
||||
// info!("entry {:3} id: {:>40} data: {:?}", i, id, data);
|
||||
// }
|
||||
Ok(())
|
||||
}
|
||||
fn create_file_attr_from_metadata(&self, metadata: &DriveFileMetadata) -> Result<FileAttr> {
|
||||
@@ -890,6 +936,64 @@ impl DriveFileProvider {
|
||||
}
|
||||
return id;
|
||||
}
|
||||
fn process_change(&mut self, change: Change) -> Result<()> {
|
||||
let id = change.id;
|
||||
let id = self.get_correct_id(id);
|
||||
|
||||
let entry = self.entries.get_mut(&id);
|
||||
if let Some(entry) = entry {
|
||||
match change.kind {
|
||||
ChangeType::Drive(drive) => {
|
||||
todo!("drive changes are not supported yet: {:?}", drive);
|
||||
}
|
||||
ChangeType::File(file_change) => {
|
||||
//TODO: check if local has changes that conflict (content)
|
||||
//TODO: check if the content was changed (checksum) and schedule
|
||||
// a download if it is a local/perm file or mark it for download on next open
|
||||
process_file_change(entry, file_change)?;
|
||||
}
|
||||
ChangeType::Removed => {
|
||||
todo!("remove local file/dir since it was deleted on the remote");
|
||||
}
|
||||
}
|
||||
return Ok(());
|
||||
} else {
|
||||
todo!("there was a file/dir added on the remote since this ID is unknown")
|
||||
}
|
||||
}
|
||||
}
|
||||
#[instrument]
|
||||
fn process_file_change(entry: &mut FileData, change: DriveFileMetadata) -> Result<()> {
|
||||
if let Some(size) = change.size {
|
||||
entry.metadata.size = Some(size);
|
||||
entry.attr.size = size as u64;
|
||||
//TODO1: set the size of the cached file if necessary
|
||||
}
|
||||
if let Some(name) = change.name {
|
||||
entry.metadata.name = Some(name);
|
||||
}
|
||||
if let Some(parents) = change.parents {
|
||||
if Some(&parents) != entry.metadata.parents.as_ref() {
|
||||
//TODO1: change the parent child relations
|
||||
warn!(
|
||||
"parents changed from {:?}: {:?}",
|
||||
entry.metadata.parents,
|
||||
Some(parents)
|
||||
)
|
||||
}
|
||||
}
|
||||
if let Some(description) = change.description {
|
||||
entry.metadata.description = Some(description);
|
||||
}
|
||||
if let Some(thumbnail_link) = change.thumbnail_link {
|
||||
entry.metadata.thumbnail_link = Some(thumbnail_link);
|
||||
}
|
||||
warn!("not all changes have been implemented");
|
||||
//TODO2: implement all other needed changes!
|
||||
// if let Some() = change.{
|
||||
// entry.metadata. = ;
|
||||
// }
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn remove_volatile_metadata(metadata: DriveFileMetadata) -> DriveFileMetadata {
|
||||
|
||||
@@ -89,8 +89,8 @@ impl GoogleDrive {
|
||||
.param(
|
||||
"fields",
|
||||
&format!(
|
||||
"changes({}, changeType, removed, fileId, \
|
||||
driveId, drive, time), newStartPageToken, nextPageToken",
|
||||
"changes({}, changeType, removed, fileId, driveId, drive, time),\
|
||||
newStartPageToken, nextPageToken",
|
||||
file_spec
|
||||
),
|
||||
);
|
||||
@@ -344,8 +344,7 @@ async fn download_file_by_id(
|
||||
.param("alt", "media")
|
||||
.doit()
|
||||
.await?;
|
||||
//TODO: bigger files don't get downloaded. it just starts and then hangs at ~1.3MB forever
|
||||
// => check if this still happens
|
||||
|
||||
debug!("download_file_by_id(): response: {:?}", response);
|
||||
debug!("download_file_by_id(): content: {:?}", content);
|
||||
write_body_to_file(response, target_path).await?;
|
||||
|
||||
165
src/lib.rs
165
src/lib.rs
@@ -1,27 +1,27 @@
|
||||
#![allow(dead_code, unused)]
|
||||
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::mpsc::Receiver;
|
||||
use std::time::Duration;
|
||||
// #![allow(dead_code, unused)]
|
||||
|
||||
use fuser::{MountOption, Session, SessionUnmounter};
|
||||
// use nix;
|
||||
use std::{
|
||||
path::{Path, PathBuf},
|
||||
time::Duration,
|
||||
};
|
||||
use tempfile::TempDir;
|
||||
// use tokio::io::{AsyncReadExt, stdin};
|
||||
// use tokio::runtime::Runtime;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::mpsc::{channel, Sender};
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::field::debug;
|
||||
use tracing::{debug, info};
|
||||
use tokio::{
|
||||
select,
|
||||
sync::mpsc::{channel, Receiver, Sender},
|
||||
task::JoinHandle,
|
||||
};
|
||||
use tracing::{debug, error, info};
|
||||
|
||||
use prelude::*;
|
||||
|
||||
use crate::config::common_file_filter::CommonFileFilter;
|
||||
use crate::fs::drive::{DriveFileUploader, DriveFilesystem, FileUploaderCommand, SyncSettings};
|
||||
use crate::fs::drive_file_provider::{ProviderCommand, ProviderRequest};
|
||||
use crate::fs::{drive2, drive_file_provider};
|
||||
use crate::google_drive::GoogleDrive;
|
||||
use crate::{
|
||||
config::common_file_filter::CommonFileFilter,
|
||||
fs::drive::{DriveFileUploader, DriveFilesystem, FileUploaderCommand, SyncSettings},
|
||||
fs::drive_file_provider::{ProviderCommand, ProviderRequest},
|
||||
fs::{drive2, drive_file_provider},
|
||||
google_drive::GoogleDrive,
|
||||
};
|
||||
|
||||
pub mod async_helper;
|
||||
pub mod common;
|
||||
@@ -30,14 +30,107 @@ pub mod fs;
|
||||
pub mod google_drive;
|
||||
mod macros;
|
||||
pub mod prelude;
|
||||
//region drive2 full example
|
||||
pub async fn sample_drive2() -> Result<()> {
|
||||
let mountpoint = Path::new("/tmp/fuse/3");
|
||||
let perma_dir = Path::new("/tmp/fuse/2");
|
||||
let cache_dir = get_cache_dir()?;
|
||||
|
||||
let (provider_command_tx, provider_command_rx) = channel(1);
|
||||
let (provider_request_tx, provider_request_rx) = channel(1);
|
||||
|
||||
let (filesystem_handle, unmount_callable) =
|
||||
filesystem_thread_starter(provider_request_tx, mountpoint).await?;
|
||||
let provider_handle = provider_thread_starter(
|
||||
provider_command_rx,
|
||||
provider_request_rx,
|
||||
unmount_callable,
|
||||
cache_dir.path(),
|
||||
perma_dir,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let program_end_handle = ctrl_c_thread_starter().await?;
|
||||
select! {
|
||||
_= filesystem_handle => {
|
||||
info!("filesystem thread finished first!");
|
||||
let x = provider_command_tx.send(ProviderCommand::Stop).await;
|
||||
info!("send stop to provider: {:?}", x);
|
||||
},
|
||||
_= program_end_handle => {
|
||||
info!("filesystem thread finished first!");
|
||||
let x = provider_command_tx.send(ProviderCommand::Stop).await;
|
||||
info!("send stop to provider: {:?}", x);
|
||||
},
|
||||
}
|
||||
provider_handle.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn filesystem_thread_starter(
|
||||
provider_request_tx: Sender<ProviderRequest>,
|
||||
mountpoint: impl Into<&Path>,
|
||||
) -> Result<(JoinHandle<()>, SessionUnmounter)> {
|
||||
let filesystem = drive2::DriveFilesystem::new(provider_request_tx);
|
||||
let mount_options = vec![
|
||||
MountOption::RW, /*TODO: make a start parameter that can change the mount to read only*/
|
||||
];
|
||||
let mut mount = Session::new(filesystem, mountpoint.into(), &mount_options)?;
|
||||
let session_unmounter = mount.unmount_callable();
|
||||
let join_handle = tokio::spawn(async move {
|
||||
let mount_res = mount.run();
|
||||
debug!("mount finished with result: {:?}", mount_res);
|
||||
if let Err(e) = mount_res {
|
||||
error!("mount finished with error: {:?}", e);
|
||||
}
|
||||
});
|
||||
Ok((join_handle, session_unmounter))
|
||||
}
|
||||
|
||||
async fn provider_thread_starter(
|
||||
provider_command_rx: Receiver<ProviderCommand>,
|
||||
provider_request_rx: Receiver<ProviderRequest>,
|
||||
mut unmount_callable: SessionUnmounter,
|
||||
cache_dir: &Path,
|
||||
perma_dir: &Path,
|
||||
) -> Result<JoinHandle<()>> {
|
||||
let drive = GoogleDrive::new().await?;
|
||||
|
||||
let changes_start_token = drive
|
||||
.get_start_page_token()
|
||||
.await
|
||||
.expect("could not initialize the changes api start page token");
|
||||
let mut provider = drive_file_provider::DriveFileProvider::new(
|
||||
drive,
|
||||
cache_dir.to_path_buf(),
|
||||
perma_dir.to_path_buf(),
|
||||
changes_start_token,
|
||||
);
|
||||
|
||||
Ok(tokio::spawn(async move {
|
||||
provider
|
||||
.listen(provider_request_rx, provider_command_rx)
|
||||
.await;
|
||||
unmount_callable.unmount().expect("failed to unmount");
|
||||
}))
|
||||
}
|
||||
async fn ctrl_c_thread_starter() -> Result<JoinHandle<()>> {
|
||||
Ok(tokio::spawn(async move {
|
||||
tokio::signal::ctrl_c()
|
||||
.await
|
||||
.expect("failed to listen for ctrl_c event");
|
||||
|
||||
info!("got signal to end program");
|
||||
}))
|
||||
}
|
||||
//endregion
|
||||
|
||||
//region old examples
|
||||
pub async fn sample_drive2_fs() -> Result<()> {
|
||||
// let mountpoint = "/tmp/fuse/3";
|
||||
let mountpoint = Path::new("/tmp/fuse/3");
|
||||
let perma_dir = "/tmp/fuse/2";
|
||||
use crate::fs::drive2;
|
||||
use crate::fs::drive_file_provider;
|
||||
use std::sync::mpsc::channel;
|
||||
|
||||
let cache_dir = get_cache_dir()?;
|
||||
|
||||
@@ -48,12 +141,13 @@ pub async fn sample_drive2_fs() -> Result<()> {
|
||||
debug!("entry: {:?}", entry);
|
||||
}
|
||||
debug!("test!");
|
||||
let (provider_tx, provider_rx) = tokio::sync::mpsc::channel(1);
|
||||
let (provider_tx, provider_rx) = channel(1);
|
||||
let filesystem = drive2::DriveFilesystem::new(provider_tx);
|
||||
let mount_options = vec![MountOption::RW];
|
||||
let mut mount = fuser::Session::new(filesystem, &mountpoint, &mount_options)?;
|
||||
let (command_tx, command_rx) = tokio::sync::mpsc::channel(1);
|
||||
let mut mount = Session::new(filesystem, &mountpoint, &mount_options)?;
|
||||
let mut session_unmounter = mount.unmount_callable();
|
||||
|
||||
let (command_tx, command_rx) = channel(1);
|
||||
let provider_join_handle: JoinHandle<()> = tokio::spawn(drive2_provider(
|
||||
drive,
|
||||
cache_dir.path().to_path_buf(),
|
||||
@@ -61,9 +155,8 @@ pub async fn sample_drive2_fs() -> Result<()> {
|
||||
provider_rx,
|
||||
command_rx,
|
||||
));
|
||||
let mut session_unmounter = mount.unmount_callable();
|
||||
debug!("running mount and listener");
|
||||
tokio::select!(
|
||||
select!(
|
||||
_= async move {mount.run()} => {
|
||||
debug!("mount.run finished first!");
|
||||
let _ = command_tx.send(ProviderCommand::Stop);
|
||||
@@ -88,7 +181,7 @@ pub async fn sample_drive_fs() -> Result<()> {
|
||||
// let source = "/tmp/fuse/2";
|
||||
let drive = GoogleDrive::new().await?;
|
||||
// let file_uploader = FileUploader::new("config/credentials.json", "config/token.json");
|
||||
let (file_uploader_sender, file_uploader_receiver) = mpsc::channel(1);
|
||||
let (file_uploader_sender, file_uploader_receiver) = channel(1);
|
||||
let mut file_uploader = DriveFileUploader::new(
|
||||
drive.clone(),
|
||||
upload_ignore,
|
||||
@@ -168,13 +261,23 @@ async fn drive2_provider(
|
||||
drive: GoogleDrive,
|
||||
cache_dir: PathBuf,
|
||||
perma_dir: PathBuf,
|
||||
provider_rx: tokio::sync::mpsc::Receiver<ProviderRequest>,
|
||||
command_rx: tokio::sync::mpsc::Receiver<ProviderCommand>,
|
||||
provider_rx: Receiver<ProviderRequest>,
|
||||
command_rx: Receiver<ProviderCommand>,
|
||||
) {
|
||||
use std::sync::mpsc::channel;
|
||||
let mut provider = drive_file_provider::DriveFileProvider::new(drive, cache_dir, perma_dir);
|
||||
let changes_start_token = drive
|
||||
.get_start_page_token()
|
||||
.await
|
||||
.expect("could not initialize the changes api start page token");
|
||||
let mut provider = drive_file_provider::DriveFileProvider::new(
|
||||
drive,
|
||||
cache_dir,
|
||||
perma_dir,
|
||||
changes_start_token,
|
||||
);
|
||||
provider.listen(provider_rx, command_rx).await;
|
||||
}
|
||||
//endregion
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod tests {
|
||||
pub fn init_logs() {
|
||||
|
||||
@@ -20,12 +20,12 @@ macro_rules! match_provider_response {
|
||||
#[macro_export]
|
||||
macro_rules! receive_response {
|
||||
($rx: ident, $response: ident, $reply: ident) => {
|
||||
tracing::info!("receiving response");
|
||||
tracing::trace!("receiving response");
|
||||
// let $response = run_async_blocking($rx.recv());
|
||||
|
||||
let sync_code = std::thread::spawn(move || $rx.blocking_recv());
|
||||
let $response = sync_code.join().unwrap();
|
||||
tracing::info!("received response");
|
||||
tracing::trace!("received response");
|
||||
// $rx.close();
|
||||
// tracing::info!("closed receiver");
|
||||
|
||||
@@ -41,7 +41,7 @@ macro_rules! receive_response {
|
||||
#[macro_export]
|
||||
macro_rules! send_request {
|
||||
($tx: expr, $data:ident, $reply: ident) => {
|
||||
tracing::info!("sending request");
|
||||
tracing::trace!("sending request");
|
||||
{
|
||||
let sender = $tx.clone();
|
||||
let send_res = std::thread::spawn(move || sender.blocking_send($data));
|
||||
@@ -53,6 +53,6 @@ macro_rules! send_request {
|
||||
"Failed to send ProviderRequest",
|
||||
);
|
||||
}
|
||||
tracing::info!("sent request");
|
||||
tracing::trace!("sent request");
|
||||
};
|
||||
}
|
||||
|
||||
@@ -22,13 +22,13 @@ macro_rules! send_response {
|
||||
};
|
||||
|
||||
($request:ident, $response:expr) => {{
|
||||
tracing::info!("sending response");
|
||||
tracing::trace!("sending response");
|
||||
let result_send_response = $request.response_sender.send($response).await;
|
||||
if let Err(e) = result_send_response {
|
||||
error!("Failed to send result response: {:?}", e);
|
||||
return Err(anyhow!("Failed to send result response: {:?}", e));
|
||||
}
|
||||
tracing::info!("sent response");
|
||||
tracing::trace!("sent response");
|
||||
Ok(())
|
||||
}};
|
||||
}
|
||||
|
||||
@@ -1,7 +1,3 @@
|
||||
// use tokio::io::AsyncReadExt;
|
||||
// use tracing::instrument::WithSubscriber;
|
||||
use tracing::span;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
// drive_syncer::init_logger();
|
||||
@@ -15,7 +11,8 @@ async fn main() {
|
||||
// drive_syncer::sample_fs().await.unwrap();
|
||||
|
||||
// drive_syncer::sample_drive_fs().await.unwrap();
|
||||
drive_syncer::sample_drive2_fs().await.unwrap();
|
||||
// drive_syncer::sample_drive2_fs().await.unwrap();
|
||||
drive_syncer::sample_drive2().await.unwrap();
|
||||
}
|
||||
|
||||
fn init_tracing() {
|
||||
@@ -45,7 +42,7 @@ async fn sample_logging() {
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
info!("info");
|
||||
debug!("debug");
|
||||
let s = span!(
|
||||
let s = tracing::span!(
|
||||
tracing::Level::TRACE,
|
||||
"span around trace and warn with stdin read"
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user