[go: up one dir, main page]

worker/r2/
mod.rs

1use std::{collections::HashMap, convert::TryInto};
2
3pub use builder::*;
4
5use js_sys::{JsString, Reflect, Uint8Array};
6use wasm_bindgen::{JsCast, JsValue};
7use wasm_bindgen_futures::JsFuture;
8use worker_sys::{
9    FixedLengthStream as EdgeFixedLengthStream, R2Bucket as EdgeR2Bucket, R2Checksums,
10    R2MultipartUpload as EdgeR2MultipartUpload, R2Object as EdgeR2Object,
11    R2ObjectBody as EdgeR2ObjectBody, R2Objects as EdgeR2Objects,
12    R2UploadedPart as EdgeR2UploadedPart,
13};
14
15use crate::{
16    env::EnvBinding, ByteStream, Date, Error, FixedLengthStream, Headers, ResponseBody, Result,
17};
18
19mod builder;
20
21/// An instance of the R2 bucket binding.
22#[derive(Clone)]
23pub struct Bucket {
24    inner: EdgeR2Bucket,
25}
26
27impl Bucket {
28    /// Retrieves the [Object] for the given key containing only object metadata, if the key exists.
29    pub async fn head(&self, key: impl Into<String>) -> Result<Option<Object>> {
30        let head_promise = self.inner.head(key.into())?;
31        let value = JsFuture::from(head_promise).await?;
32
33        if value.is_null() {
34            return Ok(None);
35        }
36
37        Ok(Some(Object {
38            inner: ObjectInner::NoBody(value.into()),
39        }))
40    }
41
42    /// Retrieves the [Object] for the given key containing object metadata and the object body if
43    /// the key exists. In the event that a precondition specified in options fails, get() returns
44    /// an [Object] with no body.
45    pub fn get(&self, key: impl Into<String>) -> GetOptionsBuilder {
46        GetOptionsBuilder {
47            edge_bucket: &self.inner,
48            key: key.into(),
49            only_if: None,
50            range: None,
51        }
52    }
53
54    /// Stores the given `value` and metadata under the associated `key`. Once the write succeeds,
55    /// returns an [Object] containing metadata about the stored Object.
56    ///
57    /// R2 writes are strongly consistent. Once the future resolves, all subsequent read operations
58    /// will see this key value pair globally.
59    pub fn put(&self, key: impl Into<String>, value: impl Into<Data>) -> PutOptionsBuilder {
60        PutOptionsBuilder {
61            edge_bucket: &self.inner,
62            key: key.into(),
63            value: value.into(),
64            http_metadata: None,
65            custom_metadata: None,
66            checksum: None,
67            checksum_algorithm: "md5".into(),
68        }
69    }
70
71    /// Deletes the given value and metadata under the associated key. Once the delete succeeds,
72    /// returns void.
73    ///
74    /// R2 deletes are strongly consistent. Once the Promise resolves, all subsequent read
75    /// operations will no longer see this key value pair globally.
76    pub async fn delete(&self, key: impl Into<String>) -> Result<()> {
77        let delete_promise = self.inner.delete(key.into())?;
78        JsFuture::from(delete_promise).await?;
79        Ok(())
80    }
81
82    /// Returns an [Objects] containing a list of [Objects]s contained within the bucket. By
83    /// default, returns the first 1000 entries.
84    pub fn list(&self) -> ListOptionsBuilder {
85        ListOptionsBuilder {
86            edge_bucket: &self.inner,
87            limit: None,
88            prefix: None,
89            cursor: None,
90            delimiter: None,
91            include: None,
92        }
93    }
94
95    /// Creates a multipart upload.
96    ///
97    /// Returns a [MultipartUpload] value representing the newly created multipart upload.
98    /// Once the multipart upload has been created, the multipart upload can be immediately
99    /// interacted with globally, either through the Workers API, or through the S3 API.
100    pub fn create_multipart_upload(
101        &self,
102        key: impl Into<String>,
103    ) -> CreateMultipartUploadOptionsBuilder {
104        CreateMultipartUploadOptionsBuilder {
105            edge_bucket: &self.inner,
106            key: key.into(),
107            http_metadata: None,
108            custom_metadata: None,
109        }
110    }
111
112    /// Returns an object representing a multipart upload with the given `key` and `uploadId`.
113    ///
114    /// The operation does not perform any checks to ensure the validity of the `uploadId`,
115    /// nor does it verify the existence of a corresponding active multipart upload.
116    /// This is done to minimize latency before being able to call subsequent operations on the returned object.
117    pub fn resume_multipart_upload(
118        &self,
119        key: impl Into<String>,
120        upload_id: impl Into<String>,
121    ) -> Result<MultipartUpload> {
122        Ok(MultipartUpload {
123            inner: self
124                .inner
125                .resume_multipart_upload(key.into(), upload_id.into())?
126                .into(),
127        })
128    }
129}
130
impl EnvBinding for Bucket {
    // Name of the JavaScript type backing this binding.
    // NOTE(review): presumably used by `EnvBinding` to validate the binding's runtime type —
    // confirm against the trait definition.
    const TYPE_NAME: &'static str = "R2Bucket";
}
134
135impl JsCast for Bucket {
136    fn instanceof(val: &JsValue) -> bool {
137        val.is_instance_of::<EdgeR2Bucket>()
138    }
139
140    fn unchecked_from_js(val: JsValue) -> Self {
141        Self { inner: val.into() }
142    }
143
144    fn unchecked_from_js_ref(val: &JsValue) -> &Self {
145        unsafe { &*(val as *const JsValue as *const Self) }
146    }
147}
148
149impl From<Bucket> for JsValue {
150    fn from(bucket: Bucket) -> Self {
151        JsValue::from(bucket.inner)
152    }
153}
154
155impl AsRef<JsValue> for Bucket {
156    fn as_ref(&self) -> &JsValue {
157        &self.inner
158    }
159}
160
/// [Object] is created when you [put](Bucket::put) an object into a [Bucket]. [Object] represents
/// the metadata of an object based on the information provided by the uploader. Every object that
/// you [put](Bucket::put) into a [Bucket] will have an [Object] created.
///
/// An [Object] may or may not carry a body; see [Object::body].
pub struct Object {
    // Either a metadata-only handle or a handle that also exposes the object body.
    inner: ObjectInner,
}
167
168impl Object {
169    pub fn key(&self) -> String {
170        match &self.inner {
171            ObjectInner::NoBody(inner) => inner.key().unwrap(),
172            ObjectInner::Body(inner) => inner.key().unwrap(),
173        }
174    }
175
176    pub fn version(&self) -> String {
177        match &self.inner {
178            ObjectInner::NoBody(inner) => inner.version().unwrap(),
179            ObjectInner::Body(inner) => inner.version().unwrap(),
180        }
181    }
182
183    pub fn size(&self) -> u64 {
184        let size = match &self.inner {
185            ObjectInner::NoBody(inner) => inner.size().unwrap(),
186            ObjectInner::Body(inner) => inner.size().unwrap(),
187        };
188        size.round() as u64
189    }
190
191    pub fn etag(&self) -> String {
192        match &self.inner {
193            ObjectInner::NoBody(inner) => inner.etag().unwrap(),
194            ObjectInner::Body(inner) => inner.etag().unwrap(),
195        }
196    }
197
198    pub fn http_etag(&self) -> String {
199        match &self.inner {
200            ObjectInner::NoBody(inner) => inner.http_etag().unwrap(),
201            ObjectInner::Body(inner) => inner.http_etag().unwrap(),
202        }
203    }
204
205    pub fn uploaded(&self) -> Date {
206        match &self.inner {
207            ObjectInner::NoBody(inner) => inner.uploaded().unwrap(),
208            ObjectInner::Body(inner) => inner.uploaded().unwrap(),
209        }
210        .into()
211    }
212
213    pub fn http_metadata(&self) -> HttpMetadata {
214        match &self.inner {
215            ObjectInner::NoBody(inner) => inner.http_metadata().unwrap(),
216            ObjectInner::Body(inner) => inner.http_metadata().unwrap(),
217        }
218        .into()
219    }
220
221    pub fn checksum(&self) -> R2Checksums {
222        match &self.inner {
223            ObjectInner::NoBody(inner) => inner.checksums().unwrap(),
224            ObjectInner::Body(inner) => inner.checksums().unwrap(),
225        }
226        .into()
227    }
228
229    pub fn custom_metadata(&self) -> Result<HashMap<String, String>> {
230        let metadata = match &self.inner {
231            ObjectInner::NoBody(inner) => inner.custom_metadata().unwrap(),
232            ObjectInner::Body(inner) => inner.custom_metadata().unwrap(),
233        };
234
235        let keys = js_sys::Object::keys(&metadata).to_vec();
236        let mut map = HashMap::with_capacity(keys.len());
237
238        for key in keys {
239            let key = key.unchecked_into::<JsString>();
240            let value = Reflect::get(&metadata, &key)?.dyn_into::<JsString>()?;
241            map.insert(key.into(), value.into());
242        }
243
244        Ok(map)
245    }
246
247    pub fn range(&self) -> Result<Range> {
248        match &self.inner {
249            ObjectInner::NoBody(inner) => inner.range().unwrap(),
250            ObjectInner::Body(inner) => inner.range().unwrap(),
251        }
252        .try_into()
253    }
254
255    pub fn body(&self) -> Option<ObjectBody> {
256        match &self.inner {
257            ObjectInner::NoBody(_) => None,
258            ObjectInner::Body(body) => Some(ObjectBody { inner: body }),
259        }
260    }
261
262    pub fn body_used(&self) -> Option<bool> {
263        match &self.inner {
264            ObjectInner::NoBody(_) => None,
265            ObjectInner::Body(inner) => Some(inner.body_used().unwrap()),
266        }
267    }
268
269    pub fn write_http_metadata(&self, headers: Headers) -> Result<()> {
270        match &self.inner {
271            ObjectInner::NoBody(inner) => inner.write_http_metadata(headers.0)?,
272            ObjectInner::Body(inner) => inner.write_http_metadata(headers.0)?,
273        };
274
275        Ok(())
276    }
277}
278
/// The data contained within an [Object].
///
/// Borrows the body-carrying handle from the [Object] it was obtained from (see
/// [Object::body]), so it cannot outlive that [Object].
pub struct ObjectBody<'body> {
    // Borrowed handle that exposes both the metadata and the body of the object.
    inner: &'body EdgeR2ObjectBody,
}
283
284impl ObjectBody<'_> {
285    /// Reads the data in the [Object] via a [ByteStream].
286    pub fn stream(self) -> Result<ByteStream> {
287        if self.inner.body_used()? {
288            return Err(Error::BodyUsed);
289        }
290
291        let stream = self.inner.body()?;
292        let stream = wasm_streams::ReadableStream::from_raw(stream.unchecked_into());
293        Ok(ByteStream {
294            inner: stream.into_stream(),
295        })
296    }
297
298    /// Returns a [ResponseBody] containing the data in the [Object].
299    ///
300    /// This function can be used to hand off the [Object] data to the workers runtime for streaming
301    /// to the client in a [crate::Response]. This ensures that the worker does not consume CPU time
302    /// while the streaming occurs, which can be significant if instead [ObjectBody::stream] is used.
303    pub fn response_body(self) -> Result<ResponseBody> {
304        if self.inner.body_used()? {
305            return Err(Error::BodyUsed);
306        }
307
308        Ok(ResponseBody::Stream(self.inner.body()?))
309    }
310
311    pub async fn bytes(self) -> Result<Vec<u8>> {
312        let js_buffer = JsFuture::from(self.inner.array_buffer()?).await?;
313        let js_buffer = Uint8Array::new(&js_buffer);
314        let mut bytes = vec![0; js_buffer.length() as usize];
315        js_buffer.copy_to(&mut bytes);
316
317        Ok(bytes)
318    }
319
320    pub async fn text(self) -> Result<String> {
321        String::from_utf8(self.bytes().await?).map_err(|e| Error::RustError(e.to_string()))
322    }
323}
324
/// [UploadedPart] represents a part that has been uploaded.
/// [UploadedPart] objects are returned from [upload_part](MultipartUpload::upload_part) operations
/// and must be passed to the [complete](MultipartUpload::complete) operation.
pub struct UploadedPart {
    // Underlying JavaScript object holding the part number and etag.
    inner: EdgeR2UploadedPart,
}
331
332impl UploadedPart {
333    pub fn new(part_number: u16, etag: String) -> Self {
334        let obj = js_sys::Object::new();
335        Reflect::set(
336            &obj,
337            &JsValue::from_str("partNumber"),
338            &JsValue::from_f64(part_number as f64),
339        )
340        .unwrap();
341        Reflect::set(&obj, &JsValue::from_str("etag"), &JsValue::from_str(&etag)).unwrap();
342
343        let val: JsValue = obj.into();
344        Self { inner: val.into() }
345    }
346
347    pub fn part_number(&self) -> u16 {
348        self.inner.part_number().unwrap()
349    }
350
351    pub fn etag(&self) -> String {
352        self.inner.etag().unwrap()
353    }
354}
355
/// A handle to a multipart upload, obtained by creating a new multipart upload or by
/// [resuming](Bucket::resume_multipart_upload) an existing one.
pub struct MultipartUpload {
    // Underlying JavaScript multipart upload object.
    inner: EdgeR2MultipartUpload,
}
359
360impl MultipartUpload {
361    /// Uploads a single part with the specified part number to this multipart upload.
362    ///
363    /// Returns an [UploadedPart] object containing the etag and part number.
364    /// These [UploadedPart] objects are required when completing the multipart upload.
365    ///
366    /// Getting hold of a value of this type does not guarantee that there is an active
367    /// underlying multipart upload corresponding to that object.
368    ///
369    /// A multipart upload can be completed or aborted at any time, either through the S3 API,
370    /// or by a parallel invocation of your Worker.
371    /// Therefore it is important to add the necessary error handling code around each operation
372    /// on the [MultipartUpload] object in case the underlying multipart upload no longer exists.
373    pub async fn upload_part(
374        &self,
375        part_number: u16,
376        value: impl Into<Data>,
377    ) -> Result<UploadedPart> {
378        let uploaded_part =
379            JsFuture::from(self.inner.upload_part(part_number, value.into().into())?).await?;
380        Ok(UploadedPart {
381            inner: uploaded_part.into(),
382        })
383    }
384
385    /// Request the upload id.
386    pub async fn upload_id(&self) -> String {
387        self.inner.upload_id().unwrap()
388    }
389
390    /// Aborts the multipart upload.
391    pub async fn abort(&self) -> Result<()> {
392        JsFuture::from(self.inner.abort()?).await?;
393        Ok(())
394    }
395
396    /// Completes the multipart upload with the given parts.
397    /// When the future is ready, the object is immediately accessible globally by any subsequent read operation.
398    pub async fn complete(
399        self,
400        uploaded_parts: impl IntoIterator<Item = UploadedPart>,
401    ) -> Result<Object> {
402        let object = JsFuture::from(
403            self.inner.complete(
404                uploaded_parts
405                    .into_iter()
406                    .map(|part| part.inner.into())
407                    .collect(),
408            )?,
409        )
410        .await?;
411        Ok(Object {
412            inner: ObjectInner::Body(object.into()),
413        })
414    }
415}
416
/// A series of [Object]s returned by [list](Bucket::list).
pub struct Objects {
    // Underlying JavaScript listing result.
    inner: EdgeR2Objects,
}
421
422impl Objects {
423    /// An [Vec] of [Object] matching the [list](Bucket::list) request.
424    pub fn objects(&self) -> Vec<Object> {
425        self.inner
426            .objects()
427            .unwrap()
428            .into_iter()
429            .map(|raw| Object {
430                inner: ObjectInner::NoBody(raw),
431            })
432            .collect()
433    }
434
435    /// If true, indicates there are more results to be retrieved for the current
436    /// [list](Bucket::list) request.
437    pub fn truncated(&self) -> bool {
438        self.inner.truncated().unwrap()
439    }
440
441    /// A token that can be passed to future [list](Bucket::list) calls to resume listing from that
442    /// point. Only present if truncated is true.
443    pub fn cursor(&self) -> Option<String> {
444        self.inner.cursor().unwrap()
445    }
446
447    /// If a delimiter has been specified, contains all prefixes between the specified prefix and
448    /// the next occurrence of the delimiter.
449    ///
450    /// For example, if no prefix is provided and the delimiter is '/', `foo/bar/baz` would return
451    /// `foo` as a delimited prefix. If `foo/` was passed as a prefix with the same structure and
452    /// delimiter, `foo/bar` would be returned as a delimited prefix.
453    pub fn delimited_prefixes(&self) -> Vec<String> {
454        self.inner
455            .delimited_prefixes()
456            .unwrap()
457            .into_iter()
458            .map(Into::into)
459            .collect()
460    }
461}
462
/// The two kinds of underlying handle an [Object] can wrap.
#[derive(Clone)]
pub(crate) enum ObjectInner {
    /// Metadata-only handle; no body is available.
    NoBody(EdgeR2Object),
    /// Handle that exposes the object body in addition to its metadata.
    Body(EdgeR2ObjectBody),
}
468
/// The payload accepted when storing data, e.g. via [put](Bucket::put) or
/// [upload_part](MultipartUpload::upload_part).
pub enum Data {
    /// A raw JavaScript readable stream, passed through as-is.
    ReadableStream(web_sys::ReadableStream),
    /// A [FixedLengthStream]; its readable side is what gets passed on.
    Stream(FixedLengthStream),
    /// Text, passed on as a JavaScript string.
    Text(String),
    /// Raw bytes, copied into a JavaScript `Uint8Array`.
    Bytes(Vec<u8>),
    /// No data; passed on as JavaScript `null`.
    Empty,
}
476
477impl From<web_sys::ReadableStream> for Data {
478    fn from(stream: web_sys::ReadableStream) -> Self {
479        Data::ReadableStream(stream)
480    }
481}
482
483impl From<FixedLengthStream> for Data {
484    fn from(stream: FixedLengthStream) -> Self {
485        Data::Stream(stream)
486    }
487}
488
489impl From<String> for Data {
490    fn from(value: String) -> Self {
491        Data::Text(value)
492    }
493}
494
495impl From<Vec<u8>> for Data {
496    fn from(value: Vec<u8>) -> Self {
497        Data::Bytes(value)
498    }
499}
500
501impl From<Data> for JsValue {
502    fn from(data: Data) -> Self {
503        match data {
504            Data::ReadableStream(stream) => stream.into(),
505            Data::Stream(stream) => {
506                let stream_sys: EdgeFixedLengthStream = stream.into();
507                stream_sys.readable().into()
508            }
509            Data::Text(text) => JsString::from(text).into(),
510            Data::Bytes(bytes) => {
511                let arr = Uint8Array::new_with_length(bytes.len() as u32);
512                arr.copy_from(&bytes);
513                arr.into()
514            }
515            Data::Empty => JsValue::NULL,
516        }
517    }
518}