use futures_util::TryStreamExt; use time::OffsetDateTime; use sqlx::{postgres::PgPool, Row}; use std::cmp::max; use std::path::{Path, PathBuf}; use time::ext::NumericalStdDuration; use tokio::fs; use tokio::sync::mpsc::Receiver; use tokio::time::timeout; pub(crate) async fn delete_old_files(mut receiver: Receiver<()>, db: PgPool, files_dir: PathBuf) { loop { wait_for_file_expiry(&mut receiver, &db).await; let now = OffsetDateTime::now_utc(); let mut rows = sqlx::query("SELECT file_id FROM files WHERE files.valid_till < $1") .bind(now) .fetch(&db); while let Some(row) = rows.try_next().await.expect("could not load expired files") { let file_id: String = row.try_get("file_id").expect("we selected this column"); delete_content(&file_id, &files_dir) .await .expect("could not delete file"); } sqlx::query("DELETE FROM files WHERE valid_till < $1") .bind(now) .execute(&db) .await .expect("could not delete expired files from database"); } } pub(crate) async fn delete_by_id( db: &PgPool, file_id: &str, files_dir: &Path, ) -> Result<(), sqlx::Error> { delete_content(file_id, files_dir).await?; sqlx::query("DELETE FROM files WHERE file_id = $1") .bind(file_id) .execute(db) .await?; Ok(()) } async fn delete_content(file_id: &str, files_dir: &Path) -> Result<(), std::io::Error> { let path = files_dir.join(file_id); if fs::remove_file(&path).await.is_ok() { log::info!("delete file {}", file_id); } Ok(()) } async fn wait_for_file_expiry(receiver: &mut Receiver<()>, db: &PgPool) { let valid_till: (Option,) = sqlx::query_as("SELECT MIN(valid_till) as min from files") .fetch_one(db) .await .expect("could not fetch expiring files from database"); let next_timeout: std::time::Duration = match valid_till.0 { Some(valid_till) => (max( 0, valid_till.unix_timestamp() - OffsetDateTime::now_utc().unix_timestamp(), ) as u64) .std_seconds(), None => 1_u64.std_days(), }; let _ = timeout(next_timeout, receiver.recv()).await; }