commit 8705de191d1564cd9bac472f5c2f9557eb52fed5 Author: asonix Date: Mon May 25 14:47:10 2020 -0500 Actix FS impl (needs docs) diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3e4d38b --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +/target +Cargo.lock +/tests/write.txt +/tests/test.txt diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..50742a3 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "actix-fs" +version = "0.1.0" +authors = ["asonix "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +actix-threadpool = "0.3.2" +bytes = "0.5.4" +futures = "0.3.5" +thiserror = "1.0" + +[dev-dependencies] +actix-rt = "1.1.1" +anyhow = "1.0" diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..cd1fc96 --- /dev/null +++ b/LICENSE @@ -0,0 +1,417 @@ +Actix FS +Copyright Riley Trautman 2020 + +COOPERATIVE SOFTWARE LICENSE + +THE WORK (AS DEFINED BELOW) IS PROVIDED UNDER THE TERMS OF THIS +COPYFARLEFT PUBLIC LICENSE ("LICENSE"). THE WORK IS PROTECTED BY +COPYRIGHT AND ALL OTHER APPLICABLE LAWS. ANY USE OF THE WORK OTHER THAN +AS AUTHORIZED UNDER THIS LICENSE OR COPYRIGHT LAW IS PROHIBITED. BY +EXERCISING ANY RIGHTS TO THE WORK PROVIDED IN THIS LICENSE, YOU AGREE +TO BE BOUND BY THE TERMS OF THIS LICENSE. TO THE EXTENT THIS LICENSE +MAY BE CONSIDERED TO BE A CONTRACT, THE LICENSOR GRANTS YOU THE RIGHTS +CONTAINED HERE IN AS CONSIDERATION FOR ACCEPTING THE TERMS AND +CONDITIONS OF THIS LICENSE AND FOR AGREEING TO BE BOUND BY THE TERMS +AND CONDITIONS OF THIS LICENSE. + +1. DEFINITIONS + + a. "Adaptation" means a work based upon the Work, or upon the + Work and other pre-existing works, such as a translation, + adaptation, derivative work, arrangement of music or other + alterations of a literary or artistic work, or phonogram or + performance and includes cinematographic adaptations or any + other form in which the Work may be recast, transformed, or + adapted including in any form recognizably derived from the + original, except that a work that constitutes a Collection will + not be considered an Adaptation for the purpose of this License. + For the avoidance of doubt, where the Work is a musical work, + performance or phonogram, the synchronization of the Work in + timed-relation with a moving image ("synching") will be + considered an Adaptation for the purpose of this License. + + b. "Collection" means a collection of literary or artistic + works, such as encyclopedias and anthologies, or performances, + phonograms or broadcasts, or other works or subject matter other + than works listed in Section 1(f) below, which, by reason of the + selection and arrangement of their contents, constitute + intellectual creations, in which the Work is included in its + entirety in unmodified form along with one or more other + contributions, each constituting separate and independent works + in themselves, which together are assembled into a collective + whole. A work that constitutes a Collection will not be + considered an Adaptation (as defined above) for the purposes of + this License. + + c. "Distribute" means to make available to the public the + original and copies of the Work or Adaptation, as appropriate, + through sale, gift or any other transfer of possession or + ownership. + + d. "Licensor" means the individual, individuals, entity or + entities that offer(s) the Work under the terms of this License. + + e. "Original Author" means, in the case of a literary or + artistic work, the individual, individuals, entity or entities + who created the Work or if no individual or entity can be + identified, the publisher; and in addition (i) in the case of a + performance the actors, singers, musicians, dancers, and other + persons who act, sing, deliver, declaim, play in, interpret or + otherwise perform literary or artistic works or expressions of + folklore; (ii) in the case of a phonogram the producer being the + person or legal entity who first fixes the sounds of a + performance or other sounds; and, (iii) in the case of + broadcasts, the organization that transmits the broadcast. + + f. "Work" means the literary and/or artistic work offered under + the terms of this License including without limitation any + production in the literary, scientific and artistic domain, + whatever may be the mode or form of its expression including + digital form, such as a book, pamphlet and other writing; a + lecture, address, sermon or other work of the same nature; a + dramatic or dramatico-musical work; a choreographic work or + entertainment in dumb show; a musical composition with or + without words; a cinematographic work to which are assimilated + works expressed by a process analogous to cinematography; a work + of drawing, painting, architecture, sculpture, engraving or + lithography; a photographic work to which are assimilated works + expressed by a process analogous to photography; a work of + applied art; an illustration, map, plan, sketch or + three-dimensional work relative to geography, topography, + architecture or science; a performance; a broadcast; a + phonogram; a compilation of data to the extent it is protected + as a copyrightable work; or a work performed by a variety or + circus performer to the extent it is not otherwise considered a + literary or artistic work. + + g. "You" means an individual or entity exercising rights under + this License who has not previously violated the terms of this + License with respect to the Work, or who has received express + permission from the Licensor to exercise rights under this + License despite a previous violation. + + h. "Publicly Perform" means to perform public recitations of the + Work and to communicate to the public those public recitations, + by any means or process, including by wire or wireless means or + public digital performances; to make available to the public + Works in such a way that members of the public may access these + Works from a place and at a place individually chosen by them; + to perform the Work to the public by any means or process and + the communication to the public of the performances of the Work, + including by public digital performance; to broadcast and + rebroadcast the Work by any means including signs, sounds or + images. + + i. "Reproduce" means to make copies of the Work by any means + including without limitation by sound or visual recordings and + the right of fixation and reproducing fixations of the Work, + including storage of a protected performance or phonogram in + digital form or other electronic medium. + + j. "Software" means any digital Work which, through use of a + third-party piece of Software or through the direct usage of + itself on a computer system, the memory of the computer is + modified dynamically or semi-dynamically. "Software", + secondly, processes or interprets information. + + k. "Source Code" means the human-readable form of Software + through which the Original Author and/or Distributor originally + created, derived, and/or modified it. + + l. "Web Service" means the use of a piece of Software to + interpret or modify information that is subsequently and directly + served to users over the Internet. + +2. FAIR DEALING RIGHTS + + Nothing in this License is intended to reduce, limit, or restrict any + uses free from copyright or rights arising from limitations or + exceptions that are provided for in connection with the copyright + protection under copyright law or other applicable laws. + +3. LICENSE GRANT + + Subject to the terms and conditions of this License, Licensor hereby + grants You a worldwide, royalty-free, non-exclusive, perpetual (for the + duration of the applicable copyright) license to exercise the rights in + the Work as stated below: + + a. to Reproduce the Work, to incorporate the Work into one or + more Collections, and to Reproduce the Work as incorporated in + the Collections; + + b. to create and Reproduce Adaptations provided that any such + Adaptation, including any translation in any medium, takes + reasonable steps to clearly label, demarcate or otherwise + identify that changes were made to the original Work. For + example, a translation could be marked "The original work was + translated from English to Spanish," or a modification could + indicate "The original work has been modified."; + + c. to Distribute and Publicly Perform the Work including as + incorporated in Collections; and, + + d. to Distribute and Publicly Perform Adaptations. The above + rights may be exercised in all media and formats whether now + known or hereafter devised. The above rights include the right + to make such modifications as are technically necessary to + exercise the rights in other media and formats. Subject to + Section 8(g), all rights not expressly granted by Licensor are + hereby reserved, including but not limited to the rights set + forth in Section 4(h). + +4. RESTRICTIONS + + The license granted in Section 3 above is expressly made subject to and + limited by the following restrictions: + + a. You may Distribute or Publicly Perform the Work only under + the terms of this License. You must include a copy of, or the + Uniform Resource Identifier (URI) for, this License with every + copy of the Work You Distribute or Publicly Perform. You may not + offer or impose any terms on the Work that restrict the terms of + this License or the ability of the recipient of the Work to + exercise the rights granted to that recipient under the terms of + the License. You may not sublicense the Work. You must keep + intact all notices that refer to this License and to the + disclaimer of warranties with every copy of the Work You + Distribute or Publicly Perform. When You Distribute or Publicly + Perform the Work, You may not impose any effective technological + measures on the Work that restrict the ability of a recipient of + the Work from You to exercise the rights granted to that + recipient under the terms of the License. This Section 4(a) + applies to the Work as incorporated in a Collection, but this + does not require the Collection apart from the Work itself to be + made subject to the terms of this License. If You create a + Collection, upon notice from any Licensor You must, to the + extent practicable, remove from the Collection any credit as + required by Section 4(f), as requested. If You create an + Adaptation, upon notice from any Licensor You must, to the + extent practicable, remove from the Adaptation any credit as + required by Section 4(f), as requested. + + b. Subject to the exception in Section 4(e), you may not + exercise any of the rights granted to You in Section 3 above in + any manner that is primarily intended for or directed toward + commercial advantage or private monetary compensation. The + exchange of the Work for other copyrighted works by means of + digital file-sharing or otherwise shall not be considered to be + intended for or directed toward commercial advantage or private + monetary compensation, provided there is no payment of any + monetary compensation in connection with the exchange of + copyrighted works. + + c. If the Work meets the definition of Software, You may exercise + the rights granted in Section 3 only if You provide a copy of the + corresponding Source Code from which the Work was derived in digital + form, or You provide a URI for the corresponding Source Code of + the Work, to any recipients upon request. + + d. If the Work is used as or for a Web Service, You may exercise + the rights granted in Section 3 only if You provide a copy of the + corresponding Source Code from which the Work was derived in digital + form, or You provide a URI for the corresponding Source Code to the + Work, to any recipients of the data served or modified by the Web + Service. + + e. You may exercise the rights granted in Section 3 for + commercial purposes only if you satisfy any of the following: + + i. You are a worker-owned business or worker-owned + collective; and + ii. after tax, all financial gain, surplus, profits and + benefits produced by the business or collective are + distributed among the worker-owners + iii. You are not using such rights on behalf of a business + other than those specified in 4(e.i) and elaborated upon in + 4(e.ii), nor are using such rights as a proxy on behalf of a + business with the intent to circumvent the aforementioned + restrictions on such a business. + + f. Any use by a business that is privately owned and managed, + and that seeks to generate profit from the labor of employees + paid by salary or other wages, is not permitted under this + license. + + g. If You Distribute, or Publicly Perform the Work or any + Adaptations or Collections, You must, unless a request has been + made pursuant to Section 4(a), keep intact all copyright notices + for the Work and provide, reasonable to the medium or means You + are utilizing: (i) the name of the Original Author (or + pseudonym, if applicable) if supplied, and/or if the Original + Author and/or Licensor designate another party or parties (e.g., + a sponsor institute, publishing entity, journal) for attribution + ("Attribution Parties") in Licensor!s copyright notice, terms of + service or by other reasonable means, the name of such party or + parties; (ii) the title of the Work if supplied; (iii) to the + extent reasonably practicable, the URI, if any, that Licensor + specifies to be associated with the Work, unless such URI does + not refer to the copyright notice or licensing information for + the Work; and, (iv) consistent with Section 3(b), in the case of + an Adaptation, a credit identifying the use of the Work in the + Adaptation (e.g., "French translation of the Work by Original + Author," or "Screenplay based on original Work by Original + Author"). The credit required by this Section 4(f) may be + implemented in any reasonable manner; provided, however, that in + the case of a Adaptation or Collection, at a minimum such credit + will appear, if a credit for all contributing authors of the + Adaptation or Collection appears, then as part of these credits + and in a manner at least as prominent as the credits for the + other contributing authors. For the avoidance of doubt, You may + only use the credit required by this Section for the purpose of + attribution in the manner set out above and, by exercising Your + rights under this License, You may not implicitly or explicitly + assert or imply any connection with, sponsorship or endorsement + by the Original Author, Licensor and/or Attribution Parties, as + appropriate, of You or Your use of the Work, without the + separate, express prior written permission of the Original + Author, Licensor and/or Attribution Parties. + + h. For the avoidance of doubt: + + i. Non-waivable Compulsory License Schemes. In those + jurisdictions in which the right to collect royalties + through any statutory or compulsory licensing scheme + cannot be waived, the Licensor reserves the exclusive + right to collect such royalties for any exercise by You of + the rights granted under this License; + + ii. Waivable Compulsory License Schemes. In those + jurisdictions in which the right to collect royalties + through any statutory or compulsory licensing scheme can + be waived, the Licensor reserves the exclusive right to + collect such royalties for any exercise by You of the + rights granted under this License if Your exercise of such + rights is for a purpose or use which is otherwise than + noncommercial as permitted under Section 4(b) and + otherwise waives the right to collect royalties through + any statutory or compulsory licensing scheme; and, + iii.Voluntary License Schemes. The Licensor reserves the + right to collect royalties, whether individually or, in + the event that the Licensor is a member of a collecting + society that administers voluntary licensing schemes, via + that society, from any exercise by You of the rights + granted under this License that is for a purpose or use + which is otherwise than noncommercial as permitted under + Section 4(b). + + i. Except as otherwise agreed in writing by the Licensor or as + may be otherwise permitted by applicable law, if You Reproduce, + Distribute or Publicly Perform the Work either by itself or as + part of any Adaptations or Collections, You must not distort, + mutilate, modify or take other derogatory action in relation to + the Work which would be prejudicial to the Original Author's + honor or reputation. Licensor agrees that in those jurisdictions + (e.g. Japan), in which any exercise of the right granted in + Section 3(b) of this License (the right to make Adaptations) + would be deemed to be a distortion, mutilation, modification or + other derogatory action prejudicial to the Original Author's + honor and reputation, the Licensor will waive or not assert, as + appropriate, this Section, to the fullest extent permitted by + the applicable national law, to enable You to reasonably + exercise Your right under Section 3(b) of this License (right to + make Adaptations) but not otherwise. + +5. REPRESENTATIONS, WARRANTIES AND DISCLAIMER + + UNLESS OTHERWISE MUTUALLY AGREED TO BY THE PARTIES IN WRITING, LICENSOR + OFFERS THE WORK AS-IS AND MAKES NO REPRESENTATIONS OR WARRANTIES OF ANY + KIND CONCERNING THE WORK, EXPRESS, IMPLIED, STATUTORY OR OTHERWISE, + INCLUDING, WITHOUT LIMITATION, WARRANTIES OF TITLE, MERCHANTIBILITY, + FITNESS FOR A PARTICULAR PURPOSE, NONINFRINGEMENT, OR THE ABSENCE OF + LATENT OR OTHER DEFECTS, ACCURACY, OR THE PRESENCE OF ABSENCE OF + ERRORS, WHETHER OR NOT DISCOVERABLE. SOME JURISDICTIONS DO NOT ALLOW + THE EXCLUSION OF IMPLIED WARRANTIES, SO SUCH EXCLUSION MAY NOT APPLY TO + YOU. + +6. LIMITATION ON LIABILITY + + EXCEPT TO THE EXTENT REQUIRED BY APPLICABLE LAW, IN NO EVENT WILL + LICENSOR BE LIABLE TO YOU ON ANY LEGAL THEORY FOR ANY SPECIAL, + INCIDENTAL, CONSEQUENTIAL, PUNITIVE OR EXEMPLARY DAMAGES ARISING OUT OF + THIS LICENSE OR THE USE OF THE WORK, EVEN IF LICENSOR HAS BEEN ADVISED + OF THE POSSIBILITY OF SUCH DAMAGES. + +7. TERMINATION + + a. This License and the rights granted hereunder will terminate + automatically upon any breach by You of the terms of this + License. Individuals or entities who have received Adaptations + or Collections from You under this License, however, will not + have their licenses terminated provided such individuals or + entities remain in full compliance with those licenses. Sections + 1, 2, 5, 6, 7, and 8 will survive any termination of this + License. + + b. Subject to the above terms and conditions, the license + granted here is perpetual (for the duration of the applicable + copyright in the Work). Notwithstanding the above, Licensor + reserves the right to release the Work under different license + terms or to stop distributing the Work at any time; provided, + however that any such election will not serve to withdraw this + License (or any other license that has been, or is required to + be, granted under the terms of this License), and this License + will continue in full force and effect unless terminated as + stated above. + +8. MISCELLANEOUS + + a. Each time You Distribute or Publicly Perform the Work or a + Collection, the Licensor offers to the recipient a license to + the Work on the same terms and conditions as the license granted + to You under this License. + + b. Each time You Distribute or Publicly Perform an Adaptation, + Licensor offers to the recipient a license to the original Work + on the same terms and conditions as the license granted to You + under this License. + + c. If the Work is classified as Software, each time You Distribute + or Publicly Perform an Adaptation, Licensor offers to the recipient + a copy and/or URI of the corresponding Source Code on the same + terms and conditions as the license granted to You under this License. + + d. If the Work is used as a Web Service, each time You Distribute + or Publicly Perform an Adaptation, or serve data derived from the + Software, the Licensor offers to any recipients of the data a copy + and/or URI of the corresponding Source Code on the same terms and + conditions as the license granted to You under this License. + + e. If any provision of this License is invalid or unenforceable + under applicable law, it shall not affect the validity or + enforceability of the remainder of the terms of this License, + and without further action by the parties to this agreement, + such provision shall be reformed to the minimum extent necessary + to make such provision valid and enforceable. + + f. No term or provision of this License shall be deemed waived + and no breach consented to unless such waiver or consent shall + be in writing and signed by the party to be charged with such + waiver or consent. + + g. This License constitutes the entire agreement between the + parties with respect to the Work licensed here. There are no + understandings, agreements or representations with respect to + the Work not specified here. Licensor shall not be bound by any + additional provisions that may appear in any communication from + You. This License may not be modified without the mutual written + agreement of the Licensor and You. + + h. The rights granted under, and the subject matter referenced, + in this License were drafted utilizing the terminology of the + Berne Convention for the Protection of Literary and Artistic + Works (as amended on September 28, 1979), the Rome Convention of + 1961, the WIPO Copyright Treaty of 1996, the WIPO Performances + and Phonograms Treaty of 1996 and the Universal Copyright + Convention (as revised on July 24, 1971). These rights and + subject matter take effect in the relevant jurisdiction in which + the License terms are sought to be enforced according to the + corresponding provisions of the implementation of those treaty + provisions in the applicable national law. If the standard suite + of rights granted under applicable copyright law includes + additional rights not granted under this License, such + additional rights are deemed to be included in the License; this + License is not intended to restrict the license of any rights + under applicable law. + + diff --git a/README.md b/README.md new file mode 100644 index 0000000..73d2279 --- /dev/null +++ b/README.md @@ -0,0 +1,37 @@ +# Actix FS +_Asyncronous filesystem operations for actix-based systems_ + +## Usage + +```rust +use std::io::SeekFrom; + +#[actix_rt::main] +async fn main() -> Result<(), anyhow::Error> { + let file = actix_fs::open("tests/read.txt").await?; + let (file, position) = actix_fs::seek(file, SeekFrom::Start(7)).await?; + let bytes = actix_fs::read_bytes(file).await?; + + assert!(position == 7); + assert!(bytes.as_ref() == b"World!\n"); + Ok(()) +} +``` + +### Contributing +Unless otherwise stated, all contributions to this project will be licensed under the CSL with +the exceptions listed in the License section of this file. + +### License +This work is licensed under the Cooperative Software License. This is not a Free Software +License, but may be considered a "source-available License." For most hobbyists, self-employed +developers, worker-owned companies, and cooperatives, this software can be used in most +projects so long as this software is distributed under the terms of the CSL. For more +information, see the provided LICENSE file. If none exists, the license can be found online +[here](https://lynnesbian.space/csl/). If you are a free software project and wish to use this +software under the terms of the GNU Affero General Public License, please contact me at +[asonix@asonix.dog](mailto:asonix@asonix.dog) and we can sort that out. If you wish to use this +project under any other license, especially in proprietary software, the answer is likely no. + +Actix FS is currently licensed under the AGPL to the Lemmy project, found +at [github.com/LemmyNet/lemmy](https://github.com/LemmyNet/lemmy) diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..e17451d --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,422 @@ +//! # Actix FS +//! _Asyncronous filesystem operations for actix-based systems_ +//! +//! ## Usage +//! +//! ```rust +//! use std::io::SeekFrom; +//! +//! #[actix_rt::main] +//! async fn main() -> Result<(), anyhow::Error> { +//! let file = actix_fs::open("tests/read.txt").await?; +//! let (file, position) = actix_fs::seek(file, SeekFrom::Start(7)).await?; +//! let bytes = actix_fs::read_bytes(file).await?; +//! +//! assert!(position == 7); +//! assert!(bytes.as_ref() == b"World!\n"); +//! Ok(()) +//! } +//! ``` +//! +//! ### Contributing +//! Unless otherwise stated, all contributions to this project will be licensed under the CSL with +//! the exceptions listed in the License section of this file. +//! +//! ### License +//! This work is licensed under the Cooperative Software License. This is not a Free Software +//! License, but may be considered a "source-available License." For most hobbyists, self-employed +//! developers, worker-owned companies, and cooperatives, this software can be used in most +//! projects so long as this software is distributed under the terms of the CSL. For more +//! information, see the provided LICENSE file. If none exists, the license can be found online +//! [here](https://lynnesbian.space/csl/). If you are a free software project and wish to use this +//! software under the terms of the GNU Affero General Public License, please contact me at +//! [asonix@asonix.dog](mailto:asonix@asonix.dog) and we can sort that out. If you wish to use this +//! project under any other license, especially in proprietary software, the answer is likely no. +//! +//! Actix FS is currently licensed under the AGPL to the Lemmy project, found +//! at [github.com/LemmyNet/lemmy](https://github.com/LemmyNet/lemmy) +use actix_threadpool::BlockingError; +use bytes::{Bytes, BytesMut}; +use futures::{ + future::{FutureExt, LocalBoxFuture}, + sink::{Sink, SinkExt}, + stream::{Stream, StreamExt}, +}; +use std::{ + fs::{File, Metadata}, + future::Future, + io::{self, prelude::*}, + marker::PhantomData, + path::Path, + pin::Pin, + task::{Context, Poll}, +}; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("{0}")] + Io(#[from] io::Error), + + #[error("Task canceled")] + Canceled, +} + +pub struct FileStream { + chunk_size: u64, + size: u64, + offset: u64, + file: Option, + fut: Option>>>, +} + +impl FileStream { + async fn new(file: File) -> Result { + let (file, offset) = seek(file, io::SeekFrom::Current(0)).await?; + let (file, metadata) = metadata(file).await?; + + Ok(FileStream { + chunk_size: 65_356, + size: metadata.len(), + offset, + file: Some(file), + fut: None, + }) + } + + pub fn chunk_size(mut self, chunk_size: u64) -> Self { + self.chunk_size = chunk_size; + self + } +} + +struct FileSink { + file: Option, + fut: Option>>>, + chunk_size: u64, + closing: bool, + _error: PhantomData, +} + +impl FileSink { + fn new(file: File) -> Self { + FileSink { + file: Some(file), + fut: None, + chunk_size: 0, + closing: false, + _error: PhantomData, + } + } +} + +pub async fn open

(path: P) -> Result +where + P: AsRef + Send + 'static, +{ + let file = actix_threadpool::run(move || File::open(path)).await?; + + Ok(file) +} + +pub async fn create

(path: P) -> Result +where + P: AsRef + Send + 'static, +{ + let file = actix_threadpool::run(move || File::create(path)).await?; + + Ok(file) +} + +pub async fn remove

(path: P) -> Result<(), Error> +where + P: AsRef + Send + 'static, +{ + actix_threadpool::run(move || std::fs::remove_file(path)).await?; + + Ok(()) +} + +pub async fn seek(mut file: File, seek: io::SeekFrom) -> Result<(File, u64), Error> { + let tup = actix_threadpool::run(move || { + let pos = file.seek(seek)?; + Ok((file, pos)) as Result<_, io::Error> + }) + .await?; + + Ok(tup) +} + +pub async fn metadata(file: File) -> Result<(File, Metadata), Error> { + let tup = actix_threadpool::run(move || { + let metadata = file.metadata()?; + + Ok((file, metadata)) as Result<_, io::Error> + }) + .await?; + + Ok(tup) +} + +pub async fn read_stream(file: File) -> Result { + FileStream::new(file).await +} + +pub async fn read_bytes(file: File) -> Result { + let mut stream = FileStream::new(file).await?; + let mut bytes_mut = BytesMut::new(); + + while let Some(res) = stream.next().await { + bytes_mut.extend(res?); + } + + Ok(bytes_mut.freeze()) +} + +pub async fn write_stream(file: File, mut stream: S) -> Result<(), E> +where + S: Stream> + Unpin, + E: From + Unpin, +{ + let mut sink = FileSink::::new(file); + + sink.send_all(&mut stream).await?; + sink.close().await?; + Ok(()) +} + +pub async fn write_bytes(file: File, bytes: Bytes) -> Result<(), Error> { + let mut sink = FileSink::::new(file); + + sink.send(bytes).await?; + sink.close().await?; + Ok(()) +} + +impl Stream for FileStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + if let Some(ref mut fut) = self.fut { + return match Pin::new(fut).poll(cx) { + Poll::Ready(Ok((file, bytes, offset))) => { + self.fut.take(); + self.file = Some(file); + self.offset = offset as u64; + Poll::Ready(Some(Ok(bytes))) + } + Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e.into()))), + Poll::Pending => Poll::Pending, + }; + } + + let size = self.size; + let offset = self.offset; + let chunk_size = self.chunk_size; + + if size == offset { + return Poll::Ready(None); + } + + let mut file = self.file.take().expect("Use after completion"); + self.fut = Some( + actix_threadpool::run(move || { + let max_bytes: usize; + max_bytes = std::cmp::min(size.saturating_sub(offset), chunk_size) as usize; + let mut buf = Vec::with_capacity(max_bytes); + let pos = file.seek(io::SeekFrom::Start(offset))?; + let nbytes = Read::by_ref(&mut file) + .take(max_bytes as u64) + .read_to_end(&mut buf)?; + if nbytes == 0 { + return Err(io::ErrorKind::UnexpectedEof.into()); + } + Ok((file, Bytes::from(buf), pos as usize + nbytes)) + }) + .boxed_local(), + ); + self.poll_next(cx) + } +} + +impl Sink for FileSink +where + E: From + Unpin, +{ + type Error = E; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + if let Some(ref mut fut) = self.fut { + return match Pin::new(fut).poll(cx) { + Poll::Ready(Ok(file)) => { + self.fut.take(); + self.file = Some(file); + self.chunk_size = 0; + + Poll::Ready(Ok(())) + } + Poll::Ready(Err(e)) => Poll::Ready(Err(Error::from(e).into())), + Poll::Pending => Poll::Pending, + }; + } + + Poll::Ready(Ok(())) + } + + fn start_send(mut self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> { + let mut file = self.file.take().expect("Use after completion"); + self.chunk_size = item.len() as u64; + + self.fut = Some( + actix_threadpool::run(move || { + file.write_all(item.as_ref())?; + Ok(file) as Result<_, io::Error> + }) + .boxed_local(), + ); + + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.poll_ready(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + if !self.closing { + if let Some(ref mut fut) = self.fut { + match Pin::new(fut).poll(cx) { + Poll::Ready(Ok(file)) => { + self.file = Some(file); + self.chunk_size = 0; + } + Poll::Ready(Err(e)) => return Poll::Ready(Err(Error::from(e).into())), + Poll::Pending => return Poll::Pending, + }; + } + + let mut file = self.file.take().expect("Use after completion"); + self.closing = true; + + self.fut = Some( + actix_threadpool::run(move || { + file.flush()?; + Ok(file) as Result<_, io::Error> + }) + .boxed_local(), + ); + } + + self.poll_ready(cx) + } +} + +impl From> for Error { + fn from(e: BlockingError) -> Self { + match e { + BlockingError::Error(e) => e.into(), + _ => Error::Canceled, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const READ_FILE: &str = "tests/read.txt"; + const WRITE_FILE: &str = "tests/write.txt"; + const TEST_FILE: &str = "tests/test.txt"; + + #[test] + fn stream_file() { + run(async move { + let read_file = open(READ_FILE).await?; + let stream = read_stream(read_file).await?; + let write_file = create(WRITE_FILE).await?; + + write_stream(write_file, stream).await?; + + let read_file = open(READ_FILE).await?; + let write_file = open(WRITE_FILE).await?; + let read = read_bytes(read_file).await?; + let written = read_bytes(write_file).await?; + + assert!(written.as_ref() == read.as_ref()); + + remove(WRITE_FILE).await?; + + Ok(()) as Result<_, Error> + }) + .unwrap() + } + + #[test] + fn read_write_file() { + let bytes_to_be_written = b"abcdefg"; + run(async move { + let file = create(TEST_FILE).await?; + write_bytes(file, bytes_to_be_written.to_vec().into()).await?; + + let file = open(TEST_FILE).await?; + let bytes = read_bytes(file).await?; + + assert!(bytes.as_ref() == bytes_to_be_written); + + remove(TEST_FILE).await?; + + Ok(()) as Result<_, Error> + }) + .unwrap(); + } + + #[test] + fn read_file() { + run(async move { + let file = open(READ_FILE).await?; + let bytes = read_bytes(file).await?; + + assert!(bytes.as_ref() == b"Hello, World!\n"); + Ok(()) as Result<_, Error> + }) + .unwrap(); + } + + #[test] + fn seek_file() { + run(async move { + let file = open(READ_FILE).await?; + let (file, pos) = seek(file, io::SeekFrom::Start(7)).await?; + assert!(pos == 7); + + let bytes = read_bytes(file).await?; + + assert!(bytes.as_ref() == b"World!\n"); + Ok(()) as Result<_, Error> + }) + .unwrap(); + } + + #[test] + fn small_chunks() { + run(async move { + let file = open(READ_FILE).await?; + + let mut bytes_mut = BytesMut::new(); + let (file, _) = seek(file, io::SeekFrom::Start(7)).await?; + let mut stream = read_stream(file).await?.chunk_size(2); + + while let Some(res) = stream.next().await { + bytes_mut.extend(res?); + } + let bytes = bytes_mut.freeze(); + + assert!(bytes.as_ref() == b"World!\n"); + Ok(()) as Result<_, Error> + }) + .unwrap(); + } + + fn run(f: F) -> F::Output { + actix_rt::System::new("test-system").block_on(f) + } +} diff --git a/tests/read.txt b/tests/read.txt new file mode 100644 index 0000000..8ab686e --- /dev/null +++ b/tests/read.txt @@ -0,0 +1 @@ +Hello, World!