From fbb8f5a7c56b0d126aabda5ebef1a69b67d65613 Mon Sep 17 00:00:00 2001 From: neri Date: Wed, 24 May 2023 10:53:07 +0200 Subject: [PATCH] refactor: use results for deletion failures --- src/deleter.rs | 64 +++++++++++++++++++++++++++++++++++++++++--------- src/main.rs | 6 +++-- 2 files changed, 57 insertions(+), 13 deletions(-) diff --git a/src/deleter.rs b/src/deleter.rs index da738d7..1c3c986 100644 --- a/src/deleter.rs +++ b/src/deleter.rs @@ -1,6 +1,8 @@ use futures_util::TryStreamExt; use sqlx::{postgres::PgPool, Row}; use std::cmp::max; +use std::error::Error; +use std::fmt::Display; use std::path::{Path, PathBuf}; use time::ext::NumericalStdDuration; use time::OffsetDateTime; @@ -8,26 +10,27 @@ use tokio::fs; use tokio::sync::mpsc::Receiver; use tokio::time::timeout; -pub(crate) async fn delete_old_files(mut receiver: Receiver<()>, db: PgPool, files_dir: PathBuf) { +pub(crate) async fn delete_old_files( + mut receiver: Receiver<()>, + db: PgPool, + files_dir: PathBuf, +) -> Result<(), DeletionError> { loop { - wait_for_file_expiry(&mut receiver, &db).await; + wait_for_file_expiry(&mut receiver, &db).await?; let now = OffsetDateTime::now_utc(); let mut rows = sqlx::query("SELECT file_id FROM files WHERE files.valid_till < $1") .bind(now) .fetch(&db); - while let Some(row) = rows.try_next().await.expect("could not load expired files") { + while let Some(row) = rows.try_next().await? { let file_id: String = row.try_get("file_id").expect("we selected this column"); - delete_content(&file_id, &files_dir) - .await - .expect("could not delete file"); + delete_content(&file_id, &files_dir).await? } sqlx::query("DELETE FROM files WHERE valid_till < $1") .bind(now) .execute(&db) - .await - .expect("could not delete expired files from database"); + .await?; } } @@ -55,12 +58,14 @@ async fn delete_content(file_id: &str, files_dir: &Path) -> Result<(), std::io:: Ok(()) } -async fn wait_for_file_expiry(receiver: &mut Receiver<()>, db: &PgPool) { +async fn wait_for_file_expiry( + receiver: &mut Receiver<()>, + db: &PgPool, +) -> Result<(), DeletionError> { let valid_till: (Option,) = sqlx::query_as("SELECT MIN(valid_till) as min from files") .fetch_one(db) - .await - .expect("could not fetch expiring files from database"); + .await?; let next_timeout = match valid_till.0 { Some(valid_till) => (max( 0, @@ -70,4 +75,41 @@ async fn wait_for_file_expiry(receiver: &mut Receiver<()>, db: &PgPool) { None => 1_u64.std_days(), }; let _ = timeout(next_timeout, receiver.recv()).await; + Ok(()) +} + +#[derive(Debug)] +pub enum DeletionError { + Db(sqlx::Error), + Fs(std::io::Error), +} + +impl From for DeletionError { + fn from(value: sqlx::Error) -> Self { + DeletionError::Db(value) + } +} + +impl From for DeletionError { + fn from(value: std::io::Error) -> Self { + DeletionError::Fs(value) + } +} + +impl Display for DeletionError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + DeletionError::Db(_) => write!(f, "Failed to fetch expired files from database"), + DeletionError::Fs(_) => write!(f, "Failed to delete file from filesystem"), + } + } +} + +impl Error for DeletionError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match self { + DeletionError::Db(err) => Some(err), + DeletionError::Fs(err) => Some(err), + } + } } diff --git a/src/main.rs b/src/main.rs index 605317d..a01f416 100644 --- a/src/main.rs +++ b/src/main.rs @@ -123,9 +123,11 @@ async fn main() -> std::io::Result<()> { .bind(bind_address)? .run(); - // exit when http_server exits OR when deleter panics + // exit when http_server exits OR when deleter errors tokio::select! { result = http_server => result, - _ = deleter => panic!("deleter never returns") + result = deleter => { + result?.map(|_| unreachable!("deletion runs infinitely")).expect("deletion may not fail") + }, } }