From a13cb385dabe5728902fa3e7e1ba4355d8accba9 Mon Sep 17 00:00:00 2001 From: OMGeeky Date: Thu, 1 Jun 2023 18:33:32 +0200 Subject: [PATCH] restructure threads&tasks, start implementing changes api, cleanup --- src/fs/drive/change.rs | 12 +- src/fs/drive/filesystem.rs | 12 +- src/fs/drive2/filesystem/mod.rs | 15 +- src/fs/drive_file_provider/provider/mod.rs | 208 +++++++++++++++------ src/google_drive/drive.rs | 7 +- src/lib.rs | 165 +++++++++++++--- src/macros/filesystem_side.rs | 8 +- src/macros/provider_side.rs | 4 +- src/main.rs | 9 +- 9 files changed, 319 insertions(+), 121 deletions(-) diff --git a/src/fs/drive/change.rs b/src/fs/drive/change.rs index 68f0548..afcf439 100644 --- a/src/fs/drive/change.rs +++ b/src/fs/drive/change.rs @@ -41,10 +41,9 @@ impl ChangeType { #[derive(Debug)] pub struct Change { - pub drive_id: DriveId, + pub id: DriveId, pub kind: ChangeType, pub time: DateTime, - pub removed: bool, } impl TryFrom for Change { @@ -52,13 +51,9 @@ impl TryFrom for Change { #[instrument] fn try_from(drive_change: DriveChange) -> anyhow::Result { let removed = drive_change.removed.unwrap_or(false); - let drive_id = drive_change.file_id.context("drive_id is missing"); - if let Err(e) = drive_id { - error!("drive_id is missing: {:?}", e); - return Err(anyhow!("drive_id is missing: {:?}", e)); - } + let drive_id = drive_change.file_id.context("file_id is missing")?; Ok(Self { - drive_id: DriveId::from(drive_id?), + id: DriveId::from(drive_id), kind: ChangeType::from_drive_change( drive_change.change_type, drive_change.file, @@ -66,7 +61,6 @@ impl TryFrom for Change { removed, )?, time: drive_change.time.context("time is missing")?, - removed, }) } } diff --git a/src/fs/drive/filesystem.rs b/src/fs/drive/filesystem.rs index 06ddc36..25f71ea 100644 --- a/src/fs/drive/filesystem.rs +++ b/src/fs/drive/filesystem.rs @@ -517,12 +517,12 @@ impl DriveFilesystem { ChangeType::Drive(drive) => { warn!("im not sure how to handle drive changes: {:?}", drive); - updated_entries.push(change.drive_id); + updated_entries.push(change.id); continue; } ChangeType::File(file) => { debug!("file change: {:?}", file); - let drive_id = &change.drive_id; + let drive_id = &change.id; let entry = self.entries.get_mut(drive_id); if let Some(entry) = entry { @@ -533,20 +533,20 @@ impl DriveFilesystem { let change_successful = Self::update_entry_metadata(file, entry); if let Err(e) = change_successful { warn!("got an err while update entry metadata: {}", e); - updated_entries.push(change.drive_id); + updated_entries.push(change.id); continue; } } - updated_entries.push(change.drive_id); + updated_entries.push(change.id); debug!("processed change"); continue; } ChangeType::Removed => { debug!("removing entry: {:?}", change); //TODO: actually delete the entry - self.remove_entry(&change.drive_id)?; - updated_entries.push(change.drive_id); + self.remove_entry(&change.id)?; + updated_entries.push(change.id); continue; } } diff --git a/src/fs/drive2/filesystem/mod.rs b/src/fs/drive2/filesystem/mod.rs index e674be7..1500c24 100644 --- a/src/fs/drive2/filesystem/mod.rs +++ b/src/fs/drive2/filesystem/mod.rs @@ -1,7 +1,10 @@ -use std::collections::HashMap; -use std::ffi::OsStr; -use std::fmt::{Display, Formatter}; -use std::time::{Duration, SystemTime}; +use std::{ + collections::HashMap, + ffi::OsStr, + fmt::{Display, Formatter}, + sync::mpsc::{channel, Receiver, Sender}, + time::{Duration, SystemTime}, +}; use anyhow::{anyhow, Context}; use bimap::BiMap; @@ -10,10 +13,8 @@ use fuser::{ ReplyEntry, ReplyOpen, ReplyWrite, Request, TimeOrNow, }; use libc::c_int; -use std::sync::mpsc::{channel, Receiver, Sender}; use tokio::fs::File; -use tracing::field::debug; -use tracing::{debug, error, instrument, trace}; +use tracing::{debug, error, field::debug, instrument, trace}; pub use handle_flags::HandleFlags; diff --git a/src/fs/drive_file_provider/provider/mod.rs b/src/fs/drive_file_provider/provider/mod.rs index 46f3c4b..6ff3857 100644 --- a/src/fs/drive_file_provider/provider/mod.rs +++ b/src/fs/drive_file_provider/provider/mod.rs @@ -1,35 +1,37 @@ -use std::collections::HashMap; -use std::ffi::{OsStr, OsString}; -use std::fmt::{Debug, Formatter}; -use std::fs::Permissions; -use std::io::SeekFrom; -use std::os::unix::prelude::{MetadataExt, PermissionsExt}; -use std::path::PathBuf; -use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; - -use anyhow::{anyhow, Context, Error}; -use fuser::{FileAttr, FileType}; -use libc::c_int; -use tokio::fs::{File, OpenOptions}; -use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; -use tokio::sync::mpsc::{Receiver, Sender}; -use tokio::sync::Mutex; -use tokio::task::JoinHandle; -use tokio::time::sleep; -use tokio::{fs, join}; -use tracing::{debug, error, info, instrument, trace, warn}; - -use crate::fs::drive2::HandleFlags; -use crate::fs::drive_file_provider::{ - FileMetadata, ProviderLookupRequest, ProviderMetadataRequest, ProviderOpenFileRequest, - ProviderReadContentRequest, ProviderReadDirRequest, ProviderReadDirResponse, - ProviderReleaseFileRequest, ProviderRequest, ProviderRequestStruct, ProviderResponse, - ProviderSetAttrRequest, ProviderWriteContentRequest, +use std::{ + collections::HashMap, + fmt::{Debug, Formatter}, + io::SeekFrom, + os::unix::prelude::MetadataExt, + path::PathBuf, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; + +use anyhow::{anyhow, Context}; +use fuser::{FileAttr, FileType}; +use google_drive3::api::StartPageToken; +use tokio::{ + fs, + fs::{File, OpenOptions}, + io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}, + sync::mpsc::{Receiver, Sender}, + task::JoinHandle, +}; +use tracing::{debug, error, instrument, trace, warn}; + +use crate::{ + fs::drive::{Change, ChangeType}, + fs::drive2::HandleFlags, + fs::drive_file_provider::{ + FileMetadata, ProviderLookupRequest, ProviderMetadataRequest, ProviderOpenFileRequest, + ProviderReadContentRequest, ProviderReadDirRequest, ProviderReadDirResponse, + ProviderReleaseFileRequest, ProviderRequest, ProviderResponse, ProviderSetAttrRequest, + ProviderWriteContentRequest, + }, + google_drive::{DriveId, GoogleDrive}, + prelude::*, + send_error_response, send_response, }; -use crate::google_drive::{DriveId, GoogleDrive}; -use crate::prelude::*; -use crate::{send_error_response, send_response}; #[derive(Debug)] pub enum ProviderCommand { @@ -80,6 +82,10 @@ pub struct DriveFileProvider { file_handles: HashMap, next_fh: u64, + + changes_start_token: StartPageToken, + last_checked_for_changes: SystemTime, + allowed_cache_time: Duration, } impl Debug for DriveFileProvider { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { @@ -100,6 +106,7 @@ impl DriveFileProvider { drive: GoogleDrive, cache_dir: PathBuf, perma_dir: PathBuf, + changes_start_token: StartPageToken, // file_request_receiver: std::sync::mpsc::Receiver, ) -> Self { Self { @@ -114,6 +121,10 @@ impl DriveFileProvider { children: HashMap::new(), file_handles: HashMap::new(), next_fh: 111, + + changes_start_token, + last_checked_for_changes: SystemTime::UNIX_EPOCH, + allowed_cache_time: Duration::from_secs(10), } } fn add_parent_child_relation(&mut self, parent_id: DriveId, child_id: DriveId) { @@ -172,10 +183,7 @@ impl DriveFileProvider { // //TODO: implement waiting for the stop signal instead of just waiting for 10 days } #[instrument(skip(self, rx))] - pub async fn listen_for_file_requests( - &mut self, - rx: tokio::sync::mpsc::Receiver, - ) { + pub async fn listen_for_file_requests(&mut self, rx: Receiver) { debug!("initializing entries"); let init_res = self.initialize_entries().await; if let Err(e) = init_res { @@ -186,6 +194,7 @@ impl DriveFileProvider { let mut rx = rx; while let Some(file_request) = rx.recv().await { debug!("got file request: {:?}", file_request); + self.check_and_apply_changes().await; let result = match file_request { ProviderRequest::OpenFile(r) => self.open_file(r).await, ProviderRequest::ReleaseFile(r) => self.release_file(r).await, @@ -196,7 +205,10 @@ impl DriveFileProvider { ProviderRequest::Lookup(r) => self.lookup(r).await, ProviderRequest::SetAttr(r) => self.set_attr(r).await, _ => { - error!("DriveFileProvider::listen_for_file_requests() received unknown request: {:?}", file_request); + error!( + "DriveFileProvider::listen_for_file_requests() received unknown request: {:?}", + file_request + ); todo!("handle this unknown request") } }; @@ -207,6 +219,18 @@ impl DriveFileProvider { } debug!("Received None from file request receiver, that means all senders have been dropped. Ending listener"); } + + async fn check_and_apply_changes(&mut self) { + let changes = self.get_changes().await; + if let Ok(changes) = changes { + for change in changes { + let change_applied_successful = self.process_change(change); + if let Err(e) = change_applied_successful { + error!("got an error while applying change: {:?}", e); + } + } + } + } //endregion //region request handlers @@ -239,7 +263,7 @@ impl DriveFileProvider { } } } - info!("could not find file: {} in {}", name, parent_id); + debug!("could not find file: {} in {}", name, parent_id); let response = ProviderResponse::Lookup(None); return send_response!(request, response); } @@ -327,11 +351,11 @@ impl DriveFileProvider { if let Err(e) = wait_res { return send_error_response!(request, e, libc::EIO); } - let entry = self.entries.get(file_id).context("could not get entry"); - if let Err(e) = entry { - return send_error_response!(request, e, libc::EIO); - } - let entry = entry.unwrap(); + // let entry = self.entries.get(file_id).context("could not get entry"); + // if let Err(e) = entry { + // return send_error_response!(request, e, libc::EIO); + // } + // let entry = entry.unwrap(); let file_handle = self .file_handles .remove(&request.fh) @@ -489,7 +513,7 @@ impl DriveFileProvider { } if !was_applied { let target_path = self.construct_path(&file_id)?; - fs::OpenOptions::new() + OpenOptions::new() .write(true) .open(target_path) .await @@ -570,9 +594,9 @@ impl DriveFileProvider { let opened_file = opened_file.unwrap(); file_handle.file = Some(opened_file); file_handle.marked_for_open = false; - } else { - error!("File handle does not have a file"); - return Err(anyhow!("File handle does not have a file")); + // } else { + // error!("File handle does not have a file"); + // return Err(anyhow!("File handle does not have a file")); } Ok(file_handle) } @@ -592,7 +616,7 @@ impl DriveFileProvider { "writing to file at local path: {}", file_handle.path.display() ); - let file: &mut tokio::fs::File = file; + let file: &mut File = file; trace!("seeking position: {}", request.offset); file.seek(SeekFrom::Start(request.offset)).await?; trace!("writing data: {:?}", request.data); @@ -642,7 +666,7 @@ impl DriveFileProvider { trace!("reading to buffer: size: {}", request.size); let size_read = file.read(&mut buf).await?; if size_read != request.size { - warn!( + debug!( "did not read the targeted size: target size: {}, actual size: {}", request.size, size_read ); @@ -670,6 +694,28 @@ impl DriveFileProvider { //endregion //region drive helpers + #[instrument] + async fn get_changes(&mut self) -> Result> { + if self.last_checked_for_changes + self.allowed_cache_time > SystemTime::now() { + debug!("not checking for changes since we already checked recently"); + return Ok(vec![]); + } + debug!("checking for changes..."); + let changes: Result> = self + .drive + .get_changes_since(&mut self.changes_start_token) + .await? + .into_iter() + .map(Change::try_from) + .collect(); + + self.last_checked_for_changes = SystemTime::now(); + debug!( + "checked for changes, found {} changes", + changes.as_ref().unwrap_or(&Vec::::new()).len() + ); + changes + } /// starts a download of the specified file and puts it in the running_requests map /// @@ -793,7 +839,7 @@ impl DriveFileProvider { let id = DriveId::from(id); let attr = self.create_file_attr_from_metadata(&entry); if attr.is_err() { - error!( + warn!( "error while creating FileAttr from metadata: {:?} entry: {:?}", attr, entry ); @@ -822,9 +868,9 @@ impl DriveFileProvider { self.entries.insert(id, entry_data); } } - for (i, (id, data)) in self.entries.iter().enumerate() { - println!("entry {:3} id: {:>40} data: {:?}", i, id, data); - } + // for (i, (id, data)) in self.entries.iter().enumerate() { + // info!("entry {:3} id: {:>40} data: {:?}", i, id, data); + // } Ok(()) } fn create_file_attr_from_metadata(&self, metadata: &DriveFileMetadata) -> Result { @@ -890,6 +936,64 @@ impl DriveFileProvider { } return id; } + fn process_change(&mut self, change: Change) -> Result<()> { + let id = change.id; + let id = self.get_correct_id(id); + + let entry = self.entries.get_mut(&id); + if let Some(entry) = entry { + match change.kind { + ChangeType::Drive(drive) => { + todo!("drive changes are not supported yet: {:?}", drive); + } + ChangeType::File(file_change) => { + //TODO: check if local has changes that conflict (content) + //TODO: check if the content was changed (checksum) and schedule + // a download if it is a local/perm file or mark it for download on next open + process_file_change(entry, file_change)?; + } + ChangeType::Removed => { + todo!("remove local file/dir since it was deleted on the remote"); + } + } + return Ok(()); + } else { + todo!("there was a file/dir added on the remote since this ID is unknown") + } + } +} +#[instrument] +fn process_file_change(entry: &mut FileData, change: DriveFileMetadata) -> Result<()> { + if let Some(size) = change.size { + entry.metadata.size = Some(size); + entry.attr.size = size as u64; + //TODO1: set the size of the cached file if necessary + } + if let Some(name) = change.name { + entry.metadata.name = Some(name); + } + if let Some(parents) = change.parents { + if Some(&parents) != entry.metadata.parents.as_ref() { + //TODO1: change the parent child relations + warn!( + "parents changed from {:?}: {:?}", + entry.metadata.parents, + Some(parents) + ) + } + } + if let Some(description) = change.description { + entry.metadata.description = Some(description); + } + if let Some(thumbnail_link) = change.thumbnail_link { + entry.metadata.thumbnail_link = Some(thumbnail_link); + } + warn!("not all changes have been implemented"); + //TODO2: implement all other needed changes! + // if let Some() = change.{ + // entry.metadata. = ; + // } + Ok(()) } fn remove_volatile_metadata(metadata: DriveFileMetadata) -> DriveFileMetadata { diff --git a/src/google_drive/drive.rs b/src/google_drive/drive.rs index 8e1b86c..4d30ac0 100644 --- a/src/google_drive/drive.rs +++ b/src/google_drive/drive.rs @@ -89,8 +89,8 @@ impl GoogleDrive { .param( "fields", &format!( - "changes({}, changeType, removed, fileId, \ - driveId, drive, time), newStartPageToken, nextPageToken", + "changes({}, changeType, removed, fileId, driveId, drive, time),\ + newStartPageToken, nextPageToken", file_spec ), ); @@ -344,8 +344,7 @@ async fn download_file_by_id( .param("alt", "media") .doit() .await?; - //TODO: bigger files don't get downloaded. it just starts and then hangs at ~1.3MB forever - // => check if this still happens + debug!("download_file_by_id(): response: {:?}", response); debug!("download_file_by_id(): content: {:?}", content); write_body_to_file(response, target_path).await?; diff --git a/src/lib.rs b/src/lib.rs index aa248ae..9a81d52 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,27 +1,27 @@ -#![allow(dead_code, unused)] - -use std::path::{Path, PathBuf}; -use std::sync::mpsc::Receiver; -use std::time::Duration; +// #![allow(dead_code, unused)] use fuser::{MountOption, Session, SessionUnmounter}; -// use nix; +use std::{ + path::{Path, PathBuf}, + time::Duration, +}; use tempfile::TempDir; -// use tokio::io::{AsyncReadExt, stdin}; -// use tokio::runtime::Runtime; -use tokio::sync::mpsc; -use tokio::sync::mpsc::{channel, Sender}; -use tokio::task::JoinHandle; -use tracing::field::debug; -use tracing::{debug, info}; +use tokio::{ + select, + sync::mpsc::{channel, Receiver, Sender}, + task::JoinHandle, +}; +use tracing::{debug, error, info}; use prelude::*; -use crate::config::common_file_filter::CommonFileFilter; -use crate::fs::drive::{DriveFileUploader, DriveFilesystem, FileUploaderCommand, SyncSettings}; -use crate::fs::drive_file_provider::{ProviderCommand, ProviderRequest}; -use crate::fs::{drive2, drive_file_provider}; -use crate::google_drive::GoogleDrive; +use crate::{ + config::common_file_filter::CommonFileFilter, + fs::drive::{DriveFileUploader, DriveFilesystem, FileUploaderCommand, SyncSettings}, + fs::drive_file_provider::{ProviderCommand, ProviderRequest}, + fs::{drive2, drive_file_provider}, + google_drive::GoogleDrive, +}; pub mod async_helper; pub mod common; @@ -30,14 +30,107 @@ pub mod fs; pub mod google_drive; mod macros; pub mod prelude; +//region drive2 full example +pub async fn sample_drive2() -> Result<()> { + let mountpoint = Path::new("/tmp/fuse/3"); + let perma_dir = Path::new("/tmp/fuse/2"); + let cache_dir = get_cache_dir()?; + let (provider_command_tx, provider_command_rx) = channel(1); + let (provider_request_tx, provider_request_rx) = channel(1); + + let (filesystem_handle, unmount_callable) = + filesystem_thread_starter(provider_request_tx, mountpoint).await?; + let provider_handle = provider_thread_starter( + provider_command_rx, + provider_request_rx, + unmount_callable, + cache_dir.path(), + perma_dir, + ) + .await?; + + let program_end_handle = ctrl_c_thread_starter().await?; + select! { + _= filesystem_handle => { + info!("filesystem thread finished first!"); + let x = provider_command_tx.send(ProviderCommand::Stop).await; + info!("send stop to provider: {:?}", x); + }, + _= program_end_handle => { + info!("filesystem thread finished first!"); + let x = provider_command_tx.send(ProviderCommand::Stop).await; + info!("send stop to provider: {:?}", x); + }, + } + provider_handle.await?; + + Ok(()) +} + +async fn filesystem_thread_starter( + provider_request_tx: Sender, + mountpoint: impl Into<&Path>, +) -> Result<(JoinHandle<()>, SessionUnmounter)> { + let filesystem = drive2::DriveFilesystem::new(provider_request_tx); + let mount_options = vec![ + MountOption::RW, /*TODO: make a start parameter that can change the mount to read only*/ + ]; + let mut mount = Session::new(filesystem, mountpoint.into(), &mount_options)?; + let session_unmounter = mount.unmount_callable(); + let join_handle = tokio::spawn(async move { + let mount_res = mount.run(); + debug!("mount finished with result: {:?}", mount_res); + if let Err(e) = mount_res { + error!("mount finished with error: {:?}", e); + } + }); + Ok((join_handle, session_unmounter)) +} + +async fn provider_thread_starter( + provider_command_rx: Receiver, + provider_request_rx: Receiver, + mut unmount_callable: SessionUnmounter, + cache_dir: &Path, + perma_dir: &Path, +) -> Result> { + let drive = GoogleDrive::new().await?; + + let changes_start_token = drive + .get_start_page_token() + .await + .expect("could not initialize the changes api start page token"); + let mut provider = drive_file_provider::DriveFileProvider::new( + drive, + cache_dir.to_path_buf(), + perma_dir.to_path_buf(), + changes_start_token, + ); + + Ok(tokio::spawn(async move { + provider + .listen(provider_request_rx, provider_command_rx) + .await; + unmount_callable.unmount().expect("failed to unmount"); + })) +} +async fn ctrl_c_thread_starter() -> Result> { + Ok(tokio::spawn(async move { + tokio::signal::ctrl_c() + .await + .expect("failed to listen for ctrl_c event"); + + info!("got signal to end program"); + })) +} +//endregion + +//region old examples pub async fn sample_drive2_fs() -> Result<()> { // let mountpoint = "/tmp/fuse/3"; let mountpoint = Path::new("/tmp/fuse/3"); let perma_dir = "/tmp/fuse/2"; - use crate::fs::drive2; - use crate::fs::drive_file_provider; - use std::sync::mpsc::channel; let cache_dir = get_cache_dir()?; @@ -48,12 +141,13 @@ pub async fn sample_drive2_fs() -> Result<()> { debug!("entry: {:?}", entry); } debug!("test!"); - let (provider_tx, provider_rx) = tokio::sync::mpsc::channel(1); + let (provider_tx, provider_rx) = channel(1); let filesystem = drive2::DriveFilesystem::new(provider_tx); let mount_options = vec![MountOption::RW]; - let mut mount = fuser::Session::new(filesystem, &mountpoint, &mount_options)?; - let (command_tx, command_rx) = tokio::sync::mpsc::channel(1); + let mut mount = Session::new(filesystem, &mountpoint, &mount_options)?; + let mut session_unmounter = mount.unmount_callable(); + let (command_tx, command_rx) = channel(1); let provider_join_handle: JoinHandle<()> = tokio::spawn(drive2_provider( drive, cache_dir.path().to_path_buf(), @@ -61,9 +155,8 @@ pub async fn sample_drive2_fs() -> Result<()> { provider_rx, command_rx, )); - let mut session_unmounter = mount.unmount_callable(); debug!("running mount and listener"); - tokio::select!( + select!( _= async move {mount.run()} => { debug!("mount.run finished first!"); let _ = command_tx.send(ProviderCommand::Stop); @@ -88,7 +181,7 @@ pub async fn sample_drive_fs() -> Result<()> { // let source = "/tmp/fuse/2"; let drive = GoogleDrive::new().await?; // let file_uploader = FileUploader::new("config/credentials.json", "config/token.json"); - let (file_uploader_sender, file_uploader_receiver) = mpsc::channel(1); + let (file_uploader_sender, file_uploader_receiver) = channel(1); let mut file_uploader = DriveFileUploader::new( drive.clone(), upload_ignore, @@ -168,13 +261,23 @@ async fn drive2_provider( drive: GoogleDrive, cache_dir: PathBuf, perma_dir: PathBuf, - provider_rx: tokio::sync::mpsc::Receiver, - command_rx: tokio::sync::mpsc::Receiver, + provider_rx: Receiver, + command_rx: Receiver, ) { - use std::sync::mpsc::channel; - let mut provider = drive_file_provider::DriveFileProvider::new(drive, cache_dir, perma_dir); + let changes_start_token = drive + .get_start_page_token() + .await + .expect("could not initialize the changes api start page token"); + let mut provider = drive_file_provider::DriveFileProvider::new( + drive, + cache_dir, + perma_dir, + changes_start_token, + ); provider.listen(provider_rx, command_rx).await; } +//endregion + #[cfg(test)] pub mod tests { pub fn init_logs() { diff --git a/src/macros/filesystem_side.rs b/src/macros/filesystem_side.rs index 6c47da1..bead641 100644 --- a/src/macros/filesystem_side.rs +++ b/src/macros/filesystem_side.rs @@ -20,12 +20,12 @@ macro_rules! match_provider_response { #[macro_export] macro_rules! receive_response { ($rx: ident, $response: ident, $reply: ident) => { - tracing::info!("receiving response"); + tracing::trace!("receiving response"); // let $response = run_async_blocking($rx.recv()); let sync_code = std::thread::spawn(move || $rx.blocking_recv()); let $response = sync_code.join().unwrap(); - tracing::info!("received response"); + tracing::trace!("received response"); // $rx.close(); // tracing::info!("closed receiver"); @@ -41,7 +41,7 @@ macro_rules! receive_response { #[macro_export] macro_rules! send_request { ($tx: expr, $data:ident, $reply: ident) => { - tracing::info!("sending request"); + tracing::trace!("sending request"); { let sender = $tx.clone(); let send_res = std::thread::spawn(move || sender.blocking_send($data)); @@ -53,6 +53,6 @@ macro_rules! send_request { "Failed to send ProviderRequest", ); } - tracing::info!("sent request"); + tracing::trace!("sent request"); }; } diff --git a/src/macros/provider_side.rs b/src/macros/provider_side.rs index eb15c3e..2d5cf2a 100644 --- a/src/macros/provider_side.rs +++ b/src/macros/provider_side.rs @@ -22,13 +22,13 @@ macro_rules! send_response { }; ($request:ident, $response:expr) => {{ - tracing::info!("sending response"); + tracing::trace!("sending response"); let result_send_response = $request.response_sender.send($response).await; if let Err(e) = result_send_response { error!("Failed to send result response: {:?}", e); return Err(anyhow!("Failed to send result response: {:?}", e)); } - tracing::info!("sent response"); + tracing::trace!("sent response"); Ok(()) }}; } diff --git a/src/main.rs b/src/main.rs index e15966c..7cc0c40 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,3 @@ -// use tokio::io::AsyncReadExt; -// use tracing::instrument::WithSubscriber; -use tracing::span; - #[tokio::main] async fn main() { // drive_syncer::init_logger(); @@ -15,7 +11,8 @@ async fn main() { // drive_syncer::sample_fs().await.unwrap(); // drive_syncer::sample_drive_fs().await.unwrap(); - drive_syncer::sample_drive2_fs().await.unwrap(); + // drive_syncer::sample_drive2_fs().await.unwrap(); + drive_syncer::sample_drive2().await.unwrap(); } fn init_tracing() { @@ -45,7 +42,7 @@ async fn sample_logging() { use tracing::{debug, error, info, trace, warn}; info!("info"); debug!("debug"); - let s = span!( + let s = tracing::span!( tracing::Level::TRACE, "span around trace and warn with stdin read" );