feat: remote fs host bridge

This commit is contained in:
veeso
2024-10-05 19:25:20 +02:00
parent 87f9369041
commit abec2d5747
7 changed files with 378 additions and 17 deletions

View File

@@ -61,5 +61,12 @@ pub trait HostBridge {
fn open_file(&mut self, file: &Path) -> HostResult<Box<dyn Read + Send>>;
/// Open file for writing
fn create_file(&mut self, file: &Path) -> HostResult<Box<dyn Write + Send>>;
fn create_file(
&mut self,
file: &Path,
metadata: &Metadata,
) -> HostResult<Box<dyn Write + Send>>;
/// Finalize write operation
fn finalize_write(&mut self, writer: Box<dyn Write + Send>) -> HostResult<()>;
}

View File

@@ -491,7 +491,11 @@ impl HostBridge for Localhost {
}
}
fn create_file(&mut self, file: &Path) -> HostResult<Box<dyn Write + Send>> {
fn create_file(
&mut self,
file: &Path,
_metadata: &Metadata,
) -> HostResult<Box<dyn Write + Send>> {
let file: PathBuf = self.to_path(file);
info!("Opening file {} for write", file.display());
match OpenOptions::new()
@@ -518,6 +522,11 @@ impl HostBridge for Localhost {
}
}
}
fn finalize_write(&mut self, _writer: Box<dyn Write + Send>) -> HostResult<()> {
// no-op
Ok(())
}
}
#[cfg(test)]
@@ -651,7 +660,7 @@ mod tests {
let mut host: Localhost = Localhost::new(PathBuf::from("/dev")).ok().unwrap();
// Create temp file
let file: tempfile::NamedTempFile = create_sample_file();
assert!(host.create_file(file.path()).is_ok());
assert!(host.create_file(file.path(), &Metadata::default()).is_ok());
}
#[test]
@@ -662,7 +671,7 @@ mod tests {
//let mut perms = fs::metadata(file.path())?.permissions();
fs::set_permissions(file.path(), PermissionsExt::from_mode(0o444)).unwrap();
//fs::set_permissions(file.path(), perms)?;
assert!(host.create_file(file.path()).is_err());
assert!(host.create_file(file.path(), &Metadata::default()).is_err());
}
#[cfg(unix)]

View File

@@ -4,6 +4,7 @@
mod bridge;
mod localhost;
mod remote_bridged;
use std::path::{Path, PathBuf};
@@ -48,6 +49,12 @@ pub struct HostError {
path: Option<PathBuf>,
}
impl From<remotefs::RemoteError> for HostError {
fn from(value: remotefs::RemoteError) -> Self {
HostError::from(HostErrorType::RemoteFs(value))
}
}
impl HostError {
/// Instantiates a new HostError
pub(crate) fn new(error: HostErrorType, errno: Option<std::io::Error>, p: &Path) -> Self {

194
src/host/remote_bridged.rs Normal file
View File

@@ -0,0 +1,194 @@
mod temp_mapped_file;
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use remotefs::fs::{Metadata, UnixPex};
use remotefs::{File, RemoteError, RemoteErrorType, RemoteFs};
use self::temp_mapped_file::TempMappedFile;
use super::{HostBridge, HostError, HostResult};
struct WriteStreamOp {
path: PathBuf,
metadata: Metadata,
tempfile: TempMappedFile,
}
/// A remote host bridged over the local host
pub struct RemoteBridged {
/// Remote fs client
remote: Box<dyn RemoteFs>,
/// Reminder used to finalize write stream
write_stream_op: Option<WriteStreamOp>,
}
impl RemoteBridged {
fn open_file_from_temp(&mut self, file: &Path) -> HostResult<Box<dyn Read + Send>> {
let mut temp_file = TempMappedFile::new()?;
self.remote
.open_file(file, Box::new(temp_file.clone()))
.map_err(HostError::from)?;
// Sync changes
temp_file.sync()?;
// now return as read
Ok(Box::new(temp_file))
}
}
impl From<Box<dyn RemoteFs>> for RemoteBridged {
fn from(remote: Box<dyn RemoteFs>) -> Self {
RemoteBridged {
remote,
write_stream_op: None,
}
}
}
impl HostBridge for RemoteBridged {
fn pwd(&mut self) -> HostResult<PathBuf> {
todo!()
}
fn change_wrkdir(&mut self, new_dir: &Path) -> HostResult<PathBuf> {
debug!("Changing working directory to {:?}", new_dir);
self.remote.change_dir(new_dir).map_err(HostError::from)
}
fn mkdir_ex(&mut self, dir_name: &Path, ignore_existing: bool) -> HostResult<()> {
debug!("Creating directory {:?}", dir_name);
match self.remote.create_dir(dir_name, UnixPex::from(0o755)) {
Ok(_) => Ok(()),
Err(remotefs::RemoteError {
kind: RemoteErrorType::DirectoryAlreadyExists,
..
}) if ignore_existing => Ok(()),
Err(e) => Err(HostError::from(e)),
}
}
fn remove(&mut self, entry: &File) -> HostResult<()> {
debug!("Removing {:?}", entry.path());
if entry.is_dir() {
self.remote
.remove_dir_all(entry.path())
.map_err(HostError::from)
} else {
self.remote
.remove_file(entry.path())
.map_err(HostError::from)
}
}
fn rename(&mut self, entry: &File, dst_path: &Path) -> HostResult<()> {
debug!("Renaming {:?} to {:?}", entry.path(), dst_path);
self.remote
.mov(entry.path(), dst_path)
.map_err(HostError::from)
}
fn copy(&mut self, entry: &File, dst: &Path) -> HostResult<()> {
debug!("Copying {:?} to {:?}", entry.path(), dst);
self.remote.copy(entry.path(), dst).map_err(HostError::from)
}
fn stat(&mut self, path: &Path) -> HostResult<File> {
debug!("Statting {:?}", path);
self.remote.stat(path).map_err(HostError::from)
}
fn exists(&mut self, path: &Path) -> HostResult<bool> {
debug!("Checking existence of {:?}", path);
self.remote.exists(path).map_err(HostError::from)
}
fn list_dir(&mut self, path: &Path) -> HostResult<Vec<File>> {
debug!("Listing directory {:?}", path);
self.remote.list_dir(path).map_err(HostError::from)
}
fn setstat(&mut self, path: &Path, metadata: &Metadata) -> HostResult<()> {
debug!("Setting metadata for {:?}", path);
self.remote
.setstat(path, metadata.clone())
.map_err(HostError::from)
}
fn exec(&mut self, cmd: &str) -> HostResult<String> {
debug!("Executing command: {}", cmd);
self.remote
.exec(cmd)
.map(|(_, stdout)| stdout)
.map_err(HostError::from)
}
fn symlink(&mut self, src: &Path, dst: &Path) -> HostResult<()> {
debug!("Creating symlink from {:?} to {:?}", src, dst);
self.remote.symlink(src, dst).map_err(HostError::from)
}
fn chmod(&mut self, path: &Path, pex: UnixPex) -> HostResult<()> {
debug!("Changing permissions of {:?} to {:?}", path, pex);
let stat = self.remote.stat(path).map_err(HostError::from)?;
let mut metadata = stat.metadata.clone();
metadata.mode = Some(pex);
self.setstat(path, &metadata)
}
fn open_file(&mut self, file: &Path) -> HostResult<Box<dyn Read + Send>> {
// try to use stream, otherwise download to a temporary file and return a reader
match self.remote.open(file) {
Ok(stream) => Ok(Box::new(stream)),
Err(RemoteError {
kind: RemoteErrorType::UnsupportedFeature,
..
}) => self.open_file_from_temp(file),
Err(e) => Err(HostError::from(e)),
}
}
fn create_file(
&mut self,
file: &Path,
metadata: &Metadata,
) -> HostResult<Box<dyn Write + Send>> {
// try to use stream, otherwise download to a temporary file and return a reader
match self.remote.create(file, metadata) {
Ok(stream) => Ok(Box::new(stream)),
Err(RemoteError {
kind: RemoteErrorType::UnsupportedFeature,
..
}) => {
let tempfile = TempMappedFile::new()?;
self.write_stream_op = Some(WriteStreamOp {
path: file.to_path_buf(),
metadata: metadata.clone(),
tempfile: tempfile.clone(),
});
Ok(Box::new(tempfile))
}
Err(e) => Err(HostError::from(e)),
}
}
fn finalize_write(&mut self, _writer: Box<dyn Write + Send>) -> HostResult<()> {
if let Some(WriteStreamOp {
path,
metadata,
mut tempfile,
}) = self.write_stream_op.take()
{
// sync
tempfile.sync()?;
// write file
self.remote
.create_file(&path, &metadata, Box::new(tempfile))?;
}
Ok(())
}
}

View File

@@ -0,0 +1,120 @@
use std::fs::File;
use std::io::{self, Read, Write};
use std::sync::{Arc, Mutex};
use tempfile::NamedTempFile;
use crate::host::{HostError, HostErrorType, HostResult};
/// A temporary file mapped to a remote file which has been transferred to local
/// and which supports read/write operations
#[derive(Debug, Clone)]
pub struct TempMappedFile {
tempfile: Arc<NamedTempFile>,
handle: Arc<Mutex<Option<File>>>,
}
impl Write for TempMappedFile {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let rc = self.write_hnd()?;
let mut ref_mut = rc.lock().unwrap();
ref_mut.as_mut().unwrap().write(buf)
}
fn flush(&mut self) -> std::io::Result<()> {
let rc = self.write_hnd()?;
let mut ref_mut = rc.lock().unwrap();
ref_mut.as_mut().unwrap().flush()
}
}
impl Read for TempMappedFile {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let rc = self.read_hnd()?;
let mut ref_mut = rc.lock().unwrap();
ref_mut.as_mut().unwrap().read(buf)
}
}
impl TempMappedFile {
pub fn new() -> HostResult<Self> {
NamedTempFile::new()
.map(|tempfile| TempMappedFile {
tempfile: Arc::new(tempfile),
handle: Arc::new(Mutex::new(None)),
})
.map_err(|e| {
HostError::new(
HostErrorType::CouldNotCreateFile,
Some(e),
std::path::Path::new(""),
)
})
}
/// Syncs the file to disk and frees the file handle.
///
/// Must be called
pub fn sync(&mut self) -> HostResult<()> {
{
let mut lock = self.handle.lock().unwrap();
if let Some(hnd) = lock.take() {
hnd.sync_all().map_err(|e| {
HostError::new(
HostErrorType::FileNotAccessible,
Some(e),
self.tempfile.path(),
)
})?;
}
}
Ok(())
}
fn write_hnd(&mut self) -> io::Result<Arc<Mutex<Option<File>>>> {
{
let mut lock = self.handle.lock().unwrap();
if lock.is_none() {
let hnd = File::create(self.tempfile.path())?;
lock.replace(hnd);
}
}
Ok(self.handle.clone())
}
fn read_hnd(&mut self) -> io::Result<Arc<Mutex<Option<File>>>> {
{
let mut lock = self.handle.lock().unwrap();
if lock.is_none() {
let hnd = File::open(self.tempfile.path())?;
lock.replace(hnd);
}
}
Ok(self.handle.clone())
}
}
#[cfg(test)]
mod test {
use pretty_assertions::assert_eq;
use super::*;
#[test]
fn test_should_write_and_read_file() {
let mut file = TempMappedFile::new().unwrap();
file.write_all(b"Hello, World!").unwrap();
file.sync().unwrap();
let mut buf = Vec::new();
file.read_to_end(&mut buf).unwrap();
assert_eq!(buf, b"Hello, World!");
}
}

View File

@@ -6,6 +6,8 @@
use std::fs::File as StdFile;
use std::path::PathBuf;
use remotefs::fs::Metadata;
use super::{File, FileTransferActivity, LogLevel};
impl FileTransferActivity {
@@ -21,19 +23,35 @@ impl FileTransferActivity {
self.log_and_alert(LogLevel::Warn, format!("File \"{input}\" already exists",));
return;
}
// Create file
let file_path: PathBuf = PathBuf::from(input.as_str());
if let Err(err) = self.host.create_file(file_path.as_path()) {
let writer = match self
.host
.create_file(file_path.as_path(), &Metadata::default())
{
Ok(f) => f,
Err(err) => {
self.log_and_alert(
LogLevel::Error,
format!("Could not create file \"{}\": {}", file_path.display(), err),
);
return;
}
};
// finalize write
if let Err(err) = self.host.finalize_write(writer) {
self.log_and_alert(
LogLevel::Error,
format!("Could not create file \"{}\": {}", file_path.display(), err),
);
} else {
self.log(
LogLevel::Info,
format!("Created file \"{}\"", file_path.display()),
format!("Could not write file \"{}\": {}", file_path.display(), err),
);
return;
}
self.log(
LogLevel::Info,
format!("Created file \"{}\"", file_path.display()),
);
}
pub(crate) fn action_remote_newfile(&mut self, input: String) {

View File

@@ -890,13 +890,12 @@ impl FileTransferActivity {
}
// Try to open local file
match self.host.create_file(local) {
Ok(local_file) => {
match self.host.create_file(local, &remote.metadata) {
Ok(writer) => {
// Download file from remote
match self.client.open(remote.path.as_path()) {
Ok(rhnd) => self.filetransfer_recv_one_with_stream(
local, remote, file_name, rhnd, local_file,
),
Ok(rhnd) => self
.filetransfer_recv_one_with_stream(local, remote, file_name, rhnd, writer),
Err(err) if err.kind == RemoteErrorType::UnsupportedFeature => {
self.filetransfer_recv_one_wno_stream(local, remote, file_name)
}
@@ -985,6 +984,12 @@ impl FileTransferActivity {
if self.transfer.aborted() {
return Err(TransferErrorReason::Abrupted);
}
// finalize write
self.host
.finalize_write(writer)
.map_err(TransferErrorReason::HostError)?;
// Apply file mode to file
if let Err(err) = self.host.setstat(local, remote.metadata()) {
self.log(
@@ -1008,6 +1013,7 @@ impl FileTransferActivity {
ByteSize(self.transfer.partial.calc_bytes_per_second()),
),
);
Ok(())
}
@@ -1021,7 +1027,7 @@ impl FileTransferActivity {
// Open local file
let reader = self
.host
.create_file(local)
.create_file(local, &remote.metadata)
.map_err(TransferErrorReason::HostError)
.map(Box::new)?;
// Init transfer