use async_std::{ channel::Receiver, fs, path::{Path, PathBuf}, task, }; use chrono::{prelude::*, Duration}; use futures::{future::FutureExt, TryStreamExt}; use sqlx::{postgres::PgPool, Row}; pub(crate) async fn delete_old_files(receiver: Receiver<()>, db: PgPool, files_dir: PathBuf) { loop { wait_for_file_expiry(&receiver, &db).await; let now = Local::now().naive_local(); let mut rows = sqlx::query("SELECT file_id FROM files WHERE files.valid_till < $1") .bind(now) .fetch(&db); while let Some(row) = rows.try_next().await.expect("could not load expired files") { let file_id: String = row.try_get("file_id").expect("we selected this column"); delete_content(&file_id, &files_dir) .await .expect("could not delete file"); } sqlx::query("DELETE FROM files WHERE valid_till < $1") .bind(now) .execute(&db) .await .expect("could not delete expired files from database"); } } pub(crate) async fn delete_by_id( db: &PgPool, file_id: &str, files_dir: &Path, ) -> Result<(), sqlx::Error> { delete_content(file_id, &files_dir).await?; sqlx::query("DELETE FROM files WHERE file_id = $1") .bind(file_id) .execute(db) .await?; Ok(()) } async fn delete_content(file_id: &str, files_dir: &Path) -> Result<(), std::io::Error> { let path = files_dir.join(file_id); if path.exists().await { log::info!("delete file {}", file_id); fs::remove_file(&path).await?; } Ok(()) } async fn wait_for_file_expiry(receiver: &Receiver<()>, db: &PgPool) { let mut rows = sqlx::query("SELECT MIN(valid_till) as min from files").fetch(db); let row = rows .try_next() .await .expect("could not fetch expiring files from database") .expect("postgres min did not return any row"); let valid_till: Option = row.get("min"); let next_timeout = match valid_till { Some(valid_till) => valid_till.signed_duration_since(Local::now().naive_local()), None => Duration::days(1), }; let positive_timeout = next_timeout .to_std() .unwrap_or_else(|_| std::time::Duration::from_secs(0)); futures::select! { _ = task::sleep(positive_timeout).fuse() => {} _ = receiver.recv().fuse() => {} } }