From 827baf8eec1bbe5cabc95656501ccad599184c12 Mon Sep 17 00:00:00 2001 From: neri Date: Thu, 9 Jul 2020 19:27:24 +0200 Subject: [PATCH] finish it i guess --- Cargo.lock | 1 - Cargo.toml | 2 +- src/deleter.rs | 45 ++++++++ src/file_kind.rs | 28 +++++ src/main.rs | 181 +++++++++++++++++++------------ src/multipart.rs | 119 ++++++++++++++++++-- static/index.css | 28 +++++ static/index.html | 18 --- template/index.html | 35 ++++++ {static => template}/upload.html | 3 +- template/view.html | 14 +++ 11 files changed, 373 insertions(+), 101 deletions(-) create mode 100644 src/deleter.rs create mode 100644 src/file_kind.rs delete mode 100644 static/index.html create mode 100644 template/index.html rename {static => template}/upload.html (82%) create mode 100644 template/view.html diff --git a/Cargo.lock b/Cargo.lock index 2918273..633e049 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -76,7 +76,6 @@ dependencies = [ "derive_more", "either", "encoding_rs", - "failure", "flate2", "futures-channel", "futures-core", diff --git a/Cargo.toml b/Cargo.toml index 6bba9f2..c756f00 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -actix-web = "2.0.0" +actix-web = { version = "2.0.0", default-features = false, features = [ "compress" ] } sqlx = { version = "0.3.5", default-features = false, features = [ "runtime-async-std", "macros", "postgres", "chrono" ] } actix-rt = "1.1.1" env_logger = "0.7.1" diff --git a/src/deleter.rs b/src/deleter.rs new file mode 100644 index 0000000..90042e9 --- /dev/null +++ b/src/deleter.rs @@ -0,0 +1,45 @@ +use async_std::{fs, path::PathBuf, sync::Receiver, task}; +use chrono::{prelude::*, Duration}; +use futures::future::FutureExt; +use sqlx::postgres::PgPool; + +pub(crate) async fn delete_old_files(receiver: Receiver<()>, db: PgPool) { + loop { + wait_for_file_expiry(&receiver, &db).await; + let now = Local::now().naive_local(); + let expired_files = + sqlx::query!("SELECT file_id FROM files WHERE files.valid_till < $1", now) + .fetch_all(&db) + .await + .unwrap(); + for expired_file in expired_files { + let path = PathBuf::from(&format!("files/{}", expired_file.file_id)); + if path.exists().await { + log::info!("delete file {}", expired_file.file_id); + fs::remove_file(&path).await.expect("could not delete file"); + } + } + sqlx::query!("DELETE FROM files WHERE valid_till < $1", now) + .execute(&db) + .await + .expect("could not delete expired files from database"); + } +} + +async fn wait_for_file_expiry(receiver: &Receiver<()>, db: &PgPool) { + let row = sqlx::query!("SELECT MIN(valid_till) as min from files") + .fetch_one(db) + .await + .expect("could not fetch expiring file from database"); + let next_timeout = match row.min { + Some(min) => min.signed_duration_since(Local::now().naive_local()), + None => Duration::days(1), + }; + let positive_timeout = next_timeout + .to_std() + .unwrap_or_else(|_| std::time::Duration::from_secs(0)); + futures::select! { + _ = task::sleep(positive_timeout).fuse() => {} + _ = receiver.recv().fuse() => {} + } +} diff --git a/src/file_kind.rs b/src/file_kind.rs new file mode 100644 index 0000000..51a796e --- /dev/null +++ b/src/file_kind.rs @@ -0,0 +1,28 @@ +use std::{fmt::Display, str::FromStr}; + +#[derive(Debug)] +pub(crate) enum FileKind { + TEXT, + BINARY, +} + +impl Display for FileKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + FileKind::TEXT => write!(f, "text"), + FileKind::BINARY => write!(f, "binary"), + } + } +} + +impl FromStr for FileKind { + type Err = String; + fn from_str(s: &str) -> Result { + match s.to_lowercase().as_str() { + "text" => Ok(FileKind::TEXT), + "binary" => Ok(FileKind::BINARY), + _ => Err(format!("unknown kind {}", s)), + } + } +} + diff --git a/src/main.rs b/src/main.rs index 317f6e5..e1c9f46 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,16 +1,24 @@ +mod deleter; +mod file_kind; mod multipart; -use actix_files::Files; +use actix_files::{Files, NamedFile}; use actix_multipart::Multipart; -use actix_web::{error, middleware, web, App, Error, HttpResponse, HttpServer}; -use async_std::prelude::*; -use chrono::{prelude::*, Duration}; -use futures::{StreamExt, TryStreamExt}; +use actix_web::{ + error, + http::header::{ContentDisposition, DispositionParam, DispositionType}, + middleware, + web::{self, Bytes}, + App, Error, FromRequest, HttpRequest, HttpResponse, HttpServer, +}; +use async_std::{fs, path::PathBuf, sync::Sender, task}; +use file_kind::FileKind; use sqlx::postgres::PgPool; use std::env; -const INDEX_HTML: &str = include_str!("../static/index.html"); -const UPLOAD_HTML: &str = include_str!("../static/upload.html"); +const INDEX_HTML: &str = include_str!("../template/index.html"); +const UPLOAD_HTML: &str = include_str!("../template/upload.html"); +const VIEW_HTML: &str = include_str!("../template/view.html"); async fn index() -> Result { Ok(HttpResponse::Ok() @@ -18,105 +26,140 @@ async fn index() -> Result { .body(INDEX_HTML)) } -async fn upload(mut payload: Multipart, db: web::Data) -> Result { - let id = format!("{:x?}", rand::random::()); - let filename = format!("files/{}", id); - let mut timeout: Option = None; - let mut kind: Option = None; +async fn upload( + payload: Multipart, + db: web::Data, + sender: web::Data>, +) -> Result { + let file_id = format!("{:x?}", rand::random::()); + let filename = PathBuf::from(format!("files/{}", file_id)); - while let Ok(Some(mut field)) = payload.try_next().await { - let name = multipart::get_field_name(&field)?; - match name.as_str() { - "validity_secs" => { - timeout = multipart::read_string(field) - .await - .map(Some) - .map_err(error::ErrorInternalServerError)?; - } - "kind" => { - kind = multipart::read_string(field) - .await - .map(Some) - .map_err(error::ErrorInternalServerError)?; - } - "content" => { - let mut file = async_std::fs::File::create(&filename) - .await - .map_err(error::ErrorInternalServerError)?; - - while let Some(chunk) = field.next().await { - let data = chunk.unwrap(); - file = file.write_all(&data).await.map(|_| file)?; + let (original_name, valid_till, kind) = + match multipart::parse_multipart(payload, &file_id, &filename).await { + Ok(data) => data, + Err(err) => { + if filename.exists().await { + fs::remove_file(filename) + .await + .map_err(|_| error::ErrorInternalServerError("could not remove file"))?; } + return Err(err); } - _ => {} }; - } - println!("timeout = {:?}, kind = {:?}", timeout, kind); + sqlx::query!( + "INSERT INTO Files (file_id, file_name, valid_till, kind) VALUES ($1, $2, $3, $4)", + file_id, + original_name.unwrap_or_else(|| file_id.clone()), + valid_till.naive_local(), + kind.to_string() + ) + .execute(db.as_ref()) + .await + .map_err(|_| error::ErrorInternalServerError("could not insert file into database"))?; - if timeout == None || kind == None { - async_std::fs::remove_file(&filename) - .await - .expect("could not delete file"); - return Ok(HttpResponse::BadRequest().body("timeout or kind not specified")); - } + log::info!( + "create new file {} (valid_till: {}, kind: {})", + file_id, + valid_till, + kind + ); - let validity_secs = timeout - .unwrap() - .parse::() - .expect("could not parse validity as int"); - let valid_till = Local::now() + Duration::seconds(validity_secs); - let kind = kind.unwrap(); - - sqlx::query("INSERT INTO Files (valid_till, kind) VALUES ($1, $2)") - .bind(valid_till) - .bind(kind) - .execute(db.as_ref()) - .await - .expect("could not insert"); + sender.send(()).await; Ok(HttpResponse::Found() - .header("location", format!("/upload/{}", id)) + .header("location", format!("/upload/{}", file_id)) .finish()) } async fn uploaded(id: web::Path) -> Result { - let upload_html = UPLOAD_HTML.replace("{id}", &*id); + let upload_html = UPLOAD_HTML.replace("{id}", id.as_ref()); Ok(HttpResponse::Ok() .content_type("text/html") .body(upload_html)) } +async fn download( + req: HttpRequest, + id: web::Path, + db: web::Data, +) -> Result { + let row = sqlx::query!( + "SELECT file_id, file_name, kind from files WHERE file_id = $1", + *id + ) + .fetch_one(db.as_ref()) + .await + .map_err(|_| error::ErrorNotFound("could not find file"))?; + let path: PathBuf = PathBuf::from(format!("files/{}", row.file_id)); + + if row.kind == FileKind::TEXT.to_string() { + let content = fs::read_to_string(path).await?; + let view_html = VIEW_HTML.replace("{text}", &content); + let response = HttpResponse::Ok().content_type("text/html").body(view_html); + Ok(response) + } else { + let file = NamedFile::open(path)?.set_content_disposition(ContentDisposition { + disposition: DispositionType::Attachment, + parameters: vec![DispositionParam::Filename(row.file_name)], + }); + file.into_response(&req) + } +} + +async fn setup_db() -> PgPool { + let pool = PgPool::builder() + .max_size(5) + .build(&env::var("DATABASE_URL").expect("DATABASE_URL environement variable not set")) + .await + .expect("could not create db pool"); + + sqlx::query!( + " + CREATE TABLE IF NOT EXISTS files ( + id serial, + file_id varchar(255) not null, + file_name varchar(255) not null, + valid_till timestamp not null, + kind varchar(255) not null, + primary key (id) + ) + " + ) + .execute(&pool) + .await + .expect("could not create table Files"); + + pool +} + #[actix_rt::main] async fn main() -> std::io::Result<()> { std::env::set_var("RUST_LOG", "warn,datatrash=info,actix_web=info"); std::env::set_var("DATABASE_URL", "postgresql://localhost"); env_logger::init(); - let pool: PgPool = PgPool::builder() - .max_size(5) // maximum number of connections in the pool - .build(&env::var("DATABASE_URL").expect("DATABASE_URL environement variable not set")) - .await - .expect("could not create db pool"); - sqlx::query!("CREATE TABLE IF NOT EXISTS Files ( id serial, valid_till timestamp, kind varchar(255), primary key (id) )") - .execute(&pool) - .await - .expect("could not create table Files"); + let pool: PgPool = setup_db().await; log::info!("omnomnom"); + let (send, recv) = async_std::sync::channel::<()>(1); + task::spawn(deleter::delete_old_files(recv, pool.clone())); + let db = web::Data::new(pool); + let send = web::Data::new(send); HttpServer::new(move || { App::new() .wrap(middleware::Logger::default()) .app_data(db.clone()) + .app_data(send.clone()) + .app_data(Bytes::configure(|cfg| cfg.limit(8_388_608))) .service(web::resource("/").route(web::get().to(index))) .service(web::resource("/upload").route(web::post().to(upload))) .service(web::resource("/upload/{id}").route(web::get().to(uploaded))) + .service(web::resource("/file/{id}").route(web::get().to(download))) .service(Files::new("/static", "static").disable_content_disposition()) - .service(Files::new("/file", "files")) }) .bind("0.0.0.0:8000")? .run() diff --git a/src/multipart.rs b/src/multipart.rs index c316553..6624195 100644 --- a/src/multipart.rs +++ b/src/multipart.rs @@ -1,21 +1,118 @@ -use actix_multipart::Field; -use futures::StreamExt; +use crate::file_kind::FileKind; +use actix_multipart::{Field, Multipart}; +use actix_web::{error, http::header::DispositionParam}; +use async_std::{fs, fs::File, path::Path, prelude::*}; +use chrono::{prelude::*, Duration}; +use futures::{StreamExt, TryStreamExt}; -pub fn get_field_name(field: &Field) -> Result { - field +pub(crate) async fn parse_multipart( + mut payload: Multipart, + file_id: &str, + filename: &Path, +) -> Result<(Option, DateTime, FileKind), error::Error> { + let mut original_name: Option = None; + let mut timeout: Option = None; + let mut kind: Option = None; + + while let Ok(Some(field)) = payload.try_next().await { + let name = get_field_name(&field)?; + let name = name.as_str(); + match name { + "validity_secs" => { + timeout = Some(parse_string(name, field).await?); + } + "content" => { + let file_original_name = get_original_filename(&field); + if file_original_name == None || file_original_name.as_deref() == Some("") { + continue; + } + println!("got content"); + original_name = file_original_name; + kind = Some(FileKind::BINARY); + let mut file = fs::File::create(&filename) + .await + .map_err(|_| error::ErrorInternalServerError("could not create file"))?; + write_to_file(&mut file, field) + .await + .map_err(|_| error::ErrorInternalServerError("could not write file"))?; + } + "text_content" => { + if original_name.is_some() { + continue; + } + println!("got text content"); + original_name = Some(format!("{}.txt", file_id)); + kind = Some(FileKind::TEXT); + let mut file = fs::File::create(&filename) + .await + .map_err(|_| error::ErrorInternalServerError("could not create file"))?; + write_to_file(&mut file, field) + .await + .map_err(|_| error::ErrorInternalServerError("could not write file"))?; + } + _ => {} + }; + } + + if let Some(original_name) = &original_name { + if original_name.len() > 255 { + return Err(error::ErrorBadRequest("filename is too long")); + } + } + + let validity_secs = timeout + .ok_or_else(|| error::ErrorBadRequest("field validity_secs not set"))? + .parse() + .map_err(|e| { + error::ErrorBadRequest(format!("field validity_secs is not a number: {}", e)) + })?; + let valid_till = Local::now() + Duration::seconds(validity_secs); + let kind = kind.ok_or_else(|| error::ErrorBadRequest("no content found"))?; + Ok((original_name, valid_till, kind)) +} + +fn get_field_name(field: &Field) -> Result { + Ok(field .content_disposition() - .ok_or_else(|| actix_web::error::ParseError::Incomplete)? + .ok_or_else(|| error::ParseError::Incomplete)? .get_name() .map(|s| s.to_owned()) - .ok_or_else(|| actix_web::error::ParseError::Incomplete) + .ok_or_else(|| error::ParseError::Incomplete)?) } -pub async fn read_string( - mut field: actix_multipart::Field, -) -> Result { +async fn parse_string(name: &str, field: actix_multipart::Field) -> Result { + let data = read_content(field).await?; + String::from_utf8(data) + .map_err(|_| error::ErrorBadRequest(format!("could not parse field {} as utf-8", name))) +} + +async fn read_content(mut field: actix_multipart::Field) -> Result, error::Error> { let mut data = Vec::new(); while let Some(chunk) = field.next().await { - data.extend(chunk.unwrap()); + data.extend(chunk.map_err(error::ErrorBadRequest)?); } - String::from_utf8(data) + Ok(data) +} + +async fn write_to_file( + file: &mut File, + mut field: actix_multipart::Field, +) -> Result<(), error::Error> { + while let Some(chunk) = field.next().await { + file.write_all(chunk.map_err(error::ErrorBadRequest)?.as_ref()) + .await?; + } + Ok(()) +} + +fn get_original_filename(field: &actix_multipart::Field) -> Option { + field.content_disposition().and_then(|content_disposition| { + content_disposition + .parameters + .into_iter() + .find_map(|param| match param { + DispositionParam::Filename(filename) => Some(filename), + _ => None, + }) + }) } diff --git a/static/index.css b/static/index.css index 9db1d1d..6f24b1f 100644 --- a/static/index.css +++ b/static/index.css @@ -8,3 +8,31 @@ main { max-width: 1200px; margin: 0 auto; } + +h1 > a, +h1 > a:visited { + color: #dddddd; +} + +a { + color: cornflowerblue; +} + +a:visited { + color: mediumorchid; +} + +input, +select, +textarea { + background-color: #222222; + color: #dddddd; + padding: 0.5rem; + border: 2px solid #dddddd; + border-radius: 5px; + margin-bottom: 1rem; +} + +input[type='submit'] { + background-color: green; +} diff --git a/static/index.html b/static/index.html deleted file mode 100644 index 8cb83cc..0000000 --- a/static/index.html +++ /dev/null @@ -1,18 +0,0 @@ - - - - datatrash - - - -
-

datatrash

-
- - - - -
-
- - diff --git a/template/index.html b/template/index.html new file mode 100644 index 0000000..e09c383 --- /dev/null +++ b/template/index.html @@ -0,0 +1,35 @@ + + + + datatrash + + + + +
+

datatrash

+
+ +
+ +
+ Gültig für + +
+ +
+
+ + diff --git a/static/upload.html b/template/upload.html similarity index 82% rename from static/upload.html rename to template/upload.html index c2ab0b5..5923e9f 100644 --- a/static/upload.html +++ b/template/upload.html @@ -2,11 +2,12 @@ datatrash +
-

datatrash

+

datatrash

Uploaded diff --git a/template/view.html b/template/view.html new file mode 100644 index 0000000..4cd0d3e --- /dev/null +++ b/template/view.html @@ -0,0 +1,14 @@ + + + + datatrash + + + + +

+

datatrash

+ +
+ +