[go: up one dir, main page]

worker/
queue.rs

1use std::{
2    convert::{TryFrom, TryInto},
3    marker::PhantomData,
4};
5
6use crate::{env::EnvBinding, Date, Error, Result};
7use js_sys::Array;
8use serde::{de::DeserializeOwned, Serialize};
9use wasm_bindgen::{prelude::*, JsCast};
10use wasm_bindgen_futures::JsFuture;
11use worker_sys::{Message as MessageSys, MessageBatch as MessageBatchSys, Queue as EdgeQueue};
12
13/// A batch of messages that are sent to a consumer Worker.
14pub struct MessageBatch<T> {
15    inner: MessageBatchSys,
16    phantom: PhantomData<T>,
17}
18
19impl<T> MessageBatch<T> {
20    /// The name of the Queue that belongs to this batch.
21    pub fn queue(&self) -> String {
22        self.inner.queue().unwrap().into()
23    }
24
25    /// Marks every message to be retried in the next batch.
26    pub fn retry_all(&self) {
27        self.inner.retry_all(JsValue::null()).unwrap();
28    }
29
30    /// Marks every message to be retried in the next batch with options.
31    pub fn retry_all_with_options(&self, queue_retry_options: &QueueRetryOptions) {
32        self.inner
33            // SAFETY: QueueRetryOptions is controlled by this module and all data in it is serializable to a js value.
34            .retry_all(serde_wasm_bindgen::to_value(&queue_retry_options).unwrap())
35            .unwrap();
36    }
37
38    /// Marks every message acknowledged in the batch.
39    pub fn ack_all(&self) {
40        self.inner.ack_all().unwrap();
41    }
42
43    /// Iterator for raw messages in the message batch. Ordering of messages is not guaranteed.
44    pub fn raw_iter(&self) -> RawMessageIter {
45        let messages = self.inner.messages().unwrap();
46        RawMessageIter {
47            range: 0..messages.length(),
48            array: messages,
49        }
50    }
51}
52
53impl<T: DeserializeOwned> MessageBatch<T> {
54    /// An array of messages in the batch. Ordering of messages is not guaranteed.
55    pub fn messages(&self) -> Result<Vec<Message<T>>> {
56        self.iter().collect()
57    }
58
59    /// Iterator for messages in the message batch. Ordering of messages is not guaranteed.
60    pub fn iter(&self) -> MessageIter<T> {
61        let messages = self.inner.messages().unwrap();
62        MessageIter {
63            range: 0..messages.length(),
64            array: messages,
65            marker: PhantomData,
66        }
67    }
68}
69
70impl<T> From<MessageBatchSys> for MessageBatch<T> {
71    fn from(value: MessageBatchSys) -> Self {
72        Self {
73            inner: value,
74            phantom: PhantomData,
75        }
76    }
77}
78
79/// A message that is sent to a consumer Worker.
80pub struct Message<T> {
81    inner: MessageSys,
82    body: T,
83}
84
85impl<T> Message<T> {
86    /// The body of the message.
87    pub fn body(&self) -> &T {
88        &self.body
89    }
90
91    /// The body of the message.
92    pub fn into_body(self) -> T {
93        self.body
94    }
95
96    /// The raw body of the message.
97    pub fn raw_body(&self) -> JsValue {
98        self.inner().body().unwrap()
99    }
100}
101
102impl<T> TryFrom<RawMessage> for Message<T>
103where
104    T: DeserializeOwned,
105{
106    type Error = Error;
107
108    fn try_from(value: RawMessage) -> std::result::Result<Self, Self::Error> {
109        let body = serde_wasm_bindgen::from_value(value.body())?;
110        Ok(Self {
111            inner: value.inner,
112            body,
113        })
114    }
115}
116
117/// A message that is sent to a consumer Worker.
118pub struct RawMessage {
119    inner: MessageSys,
120}
121
122impl RawMessage {
123    /// The body of the message.
124    pub fn body(&self) -> JsValue {
125        self.inner.body().unwrap()
126    }
127}
128
129impl From<MessageSys> for RawMessage {
130    fn from(value: MessageSys) -> Self {
131        Self { inner: value }
132    }
133}
134
135trait MessageSysInner {
136    fn inner(&self) -> &MessageSys;
137}
138
139impl MessageSysInner for RawMessage {
140    fn inner(&self) -> &MessageSys {
141        &self.inner
142    }
143}
144
145impl<T> MessageSysInner for Message<T> {
146    fn inner(&self) -> &MessageSys {
147        &self.inner
148    }
149}
150
151#[derive(Serialize)]
152#[serde(rename_all = "camelCase")]
153/// Optional configuration when marking a message or a batch of messages for retry.
154pub struct QueueRetryOptions {
155    delay_seconds: Option<u32>,
156}
157
158pub struct QueueRetryOptionsBuilder {
159    delay_seconds: Option<u32>,
160}
161
162impl QueueRetryOptionsBuilder {
163    /// Creates a new retry options builder.
164    pub fn new() -> Self {
165        Self {
166            delay_seconds: None,
167        }
168    }
169
170    #[must_use]
171    /// The number of seconds to delay a message for within the queue, before it can be delivered to a consumer
172    pub fn with_delay_seconds(mut self, delay_seconds: u32) -> Self {
173        self.delay_seconds = Some(delay_seconds);
174        self
175    }
176
177    /// Build the retry options.
178    pub fn build(self) -> QueueRetryOptions {
179        QueueRetryOptions {
180            delay_seconds: self.delay_seconds,
181        }
182    }
183}
184
185pub trait MessageExt {
186    /// A unique, system-generated ID for the message.
187    fn id(&self) -> String;
188
189    /// A timestamp when the message was sent.
190    fn timestamp(&self) -> Date;
191
192    /// Marks message to be retried.
193    fn retry(&self);
194
195    /// Marks message to be retried with options.
196    fn retry_with_options(&self, queue_retry_options: &QueueRetryOptions);
197
198    /// Marks message acknowledged.
199    fn ack(&self);
200}
201
202impl<T: MessageSysInner> MessageExt for T {
203    /// A unique, system-generated ID for the message.
204    fn id(&self) -> String {
205        self.inner().id().unwrap().into()
206    }
207
208    /// A timestamp when the message was sent.
209    fn timestamp(&self) -> Date {
210        Date::from(self.inner().timestamp().unwrap())
211    }
212
213    /// Marks message to be retried.
214    fn retry(&self) {
215        self.inner().retry(JsValue::null()).unwrap();
216    }
217
218    /// Marks message to be retried with options.
219    fn retry_with_options(&self, queue_retry_options: &QueueRetryOptions) {
220        self.inner()
221            // SAFETY: QueueRetryOptions is controlled by this module and all data in it is serializable to a js value.
222            .retry(serde_wasm_bindgen::to_value(&queue_retry_options).unwrap())
223            .unwrap();
224    }
225
226    /// Marks message acknowledged.
227    fn ack(&self) {
228        self.inner().ack().unwrap();
229    }
230}
231
232pub struct MessageIter<T> {
233    range: std::ops::Range<u32>,
234    array: Array,
235    marker: PhantomData<T>,
236}
237
238impl<T> std::iter::Iterator for MessageIter<T>
239where
240    T: DeserializeOwned,
241{
242    type Item = Result<Message<T>>;
243
244    fn next(&mut self) -> Option<Self::Item> {
245        let index = self.range.next()?;
246        let value = self.array.get(index);
247        let raw_message = RawMessage::from(MessageSys::from(value));
248        Some(raw_message.try_into())
249    }
250
251    #[inline]
252    fn size_hint(&self) -> (usize, Option<usize>) {
253        self.range.size_hint()
254    }
255}
256
257impl<T> std::iter::DoubleEndedIterator for MessageIter<T>
258where
259    T: DeserializeOwned,
260{
261    fn next_back(&mut self) -> Option<Self::Item> {
262        let index = self.range.next_back()?;
263        let value = self.array.get(index);
264        let raw_message = RawMessage::from(MessageSys::from(value));
265        Some(raw_message.try_into())
266    }
267}
268
269impl<T> std::iter::FusedIterator for MessageIter<T> where T: DeserializeOwned {}
270
271impl<T> std::iter::ExactSizeIterator for MessageIter<T> where T: DeserializeOwned {}
272
273pub struct RawMessageIter {
274    range: std::ops::Range<u32>,
275    array: Array,
276}
277
278impl std::iter::Iterator for RawMessageIter {
279    type Item = RawMessage;
280
281    fn next(&mut self) -> Option<Self::Item> {
282        let index = self.range.next()?;
283        let value = self.array.get(index);
284        Some(MessageSys::from(value).into())
285    }
286
287    #[inline]
288    fn size_hint(&self) -> (usize, Option<usize>) {
289        self.range.size_hint()
290    }
291}
292
293impl std::iter::DoubleEndedIterator for RawMessageIter {
294    fn next_back(&mut self) -> Option<Self::Item> {
295        let index = self.range.next_back()?;
296        let value = self.array.get(index);
297        Some(MessageSys::from(value).into())
298    }
299}
300
301impl std::iter::FusedIterator for RawMessageIter {}
302
303impl std::iter::ExactSizeIterator for RawMessageIter {}
304
305#[derive(Clone)]
306pub struct Queue(EdgeQueue);
307
308unsafe impl Send for Queue {}
309unsafe impl Sync for Queue {}
310
311impl EnvBinding for Queue {
312    const TYPE_NAME: &'static str = "WorkerQueue";
313}
314
315impl JsCast for Queue {
316    fn instanceof(val: &JsValue) -> bool {
317        val.is_instance_of::<Queue>()
318    }
319
320    fn unchecked_from_js(val: JsValue) -> Self {
321        Self(val.into())
322    }
323
324    fn unchecked_from_js_ref(val: &JsValue) -> &Self {
325        unsafe { &*(val as *const JsValue as *const Self) }
326    }
327}
328
329impl From<Queue> for JsValue {
330    fn from(queue: Queue) -> Self {
331        JsValue::from(queue.0)
332    }
333}
334
335impl AsRef<JsValue> for Queue {
336    fn as_ref(&self) -> &JsValue {
337        &self.0
338    }
339}
340
341#[derive(Clone, Copy, Debug)]
342pub enum QueueContentType {
343    /// Send a JavaScript object that can be JSON-serialized. This content type can be previewed from the Cloudflare dashboard.
344    Json,
345    /// Send a String. This content type can be previewed with the List messages from the dashboard feature.
346    Text,
347    /// Send a JavaScript object that cannot be JSON-serialized but is supported by structured clone (for example Date and Map). This content type cannot be previewed from the Cloudflare dashboard and will display as Base64-encoded.
348    V8,
349}
350
351impl Serialize for QueueContentType {
352    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
353    where
354        S: serde::Serializer,
355    {
356        serializer.serialize_str(match self {
357            Self::Json => "json",
358            Self::Text => "text",
359            Self::V8 => "v8",
360        })
361    }
362}
363
364#[derive(Serialize)]
365#[serde(rename_all = "camelCase")]
366pub struct QueueSendOptions {
367    content_type: Option<QueueContentType>,
368    delay_seconds: Option<u32>,
369}
370
371pub struct MessageBuilder<T> {
372    message: T,
373    delay_seconds: Option<u32>,
374    content_type: QueueContentType,
375}
376
377impl<T: Serialize> MessageBuilder<T> {
378    /// Creates a new message builder. The message must be `serializable`.
379    pub fn new(message: T) -> Self {
380        Self {
381            message,
382            delay_seconds: None,
383            content_type: QueueContentType::Json,
384        }
385    }
386
387    #[must_use]
388    /// The number of seconds to delay a message for within the queue, before it can be delivered to a consumer
389    pub fn delay_seconds(mut self, delay_seconds: u32) -> Self {
390        self.delay_seconds = Some(delay_seconds);
391        self
392    }
393
394    #[must_use]
395    /// The content type of the message.
396    /// Default is `QueueContentType::Json`.
397    pub fn content_type(mut self, content_type: QueueContentType) -> Self {
398        self.content_type = content_type;
399        self
400    }
401
402    /// Build the message.
403    pub fn build(self) -> SendMessage<T> {
404        SendMessage {
405            message: self.message,
406            options: Some(QueueSendOptions {
407                content_type: Some(self.content_type),
408                delay_seconds: self.delay_seconds,
409            }),
410        }
411    }
412}
413
414pub struct RawMessageBuilder {
415    message: JsValue,
416    delay_seconds: Option<u32>,
417}
418
419impl RawMessageBuilder {
420    /// Creates a new raw message builder. The message must be a `JsValue`.
421    pub fn new(message: JsValue) -> Self {
422        Self {
423            message,
424            delay_seconds: None,
425        }
426    }
427
428    #[must_use]
429    /// The number of seconds to delay a message for within the queue, before it can be delivered to a consumer
430    pub fn delay_seconds(mut self, delay_seconds: u32) -> Self {
431        self.delay_seconds = Some(delay_seconds);
432        self
433    }
434
435    /// Build the message with a content type.
436    pub fn build_with_content_type(self, content_type: QueueContentType) -> SendMessage<JsValue> {
437        SendMessage {
438            message: self.message,
439            options: Some(QueueSendOptions {
440                content_type: Some(content_type),
441                delay_seconds: self.delay_seconds,
442            }),
443        }
444    }
445}
446
447/// A wrapper type used for sending message.
448///
449/// This type can't be constructed directly.
450///
451/// It should be constructed using the `MessageBuilder`, `RawMessageBuilder` or by calling `.into()` on a struct that is `serializable`.
452pub struct SendMessage<T> {
453    /// The body of the message.
454    ///
455    /// Can be either a serializable struct or a `JsValue`.
456    message: T,
457
458    /// Options to apply to the current message, including content type and message delay settings.
459    options: Option<QueueSendOptions>,
460}
461
462impl<T: Serialize> SendMessage<T> {
463    fn into_raw_send_message(self) -> Result<SendMessage<JsValue>> {
464        Ok(SendMessage {
465            message: serde_wasm_bindgen::to_value(&self.message)?,
466            options: self.options,
467        })
468    }
469}
470
471impl<T: Serialize> From<T> for SendMessage<T> {
472    fn from(message: T) -> Self {
473        Self {
474            message,
475            options: Some(QueueSendOptions {
476                content_type: Some(QueueContentType::Json),
477                delay_seconds: None,
478            }),
479        }
480    }
481}
482
483pub struct BatchSendMessage<T> {
484    body: Vec<SendMessage<T>>,
485    options: Option<QueueSendBatchOptions>,
486}
487
488#[derive(Serialize)]
489#[serde(rename_all = "camelCase")]
490pub struct QueueSendBatchOptions {
491    delay_seconds: Option<u32>,
492}
493
494pub struct BatchMessageBuilder<T> {
495    messages: Vec<SendMessage<T>>,
496    delay_seconds: Option<u32>,
497}
498
499impl<T> BatchMessageBuilder<T> {
500    /// Creates a new batch message builder.
501    pub fn new() -> Self {
502        Self {
503            messages: Vec::new(),
504            delay_seconds: None,
505        }
506    }
507
508    #[must_use]
509    /// Adds a message to the batch.
510    pub fn message<U: Into<SendMessage<T>>>(mut self, message: U) -> Self {
511        self.messages.push(message.into());
512        self
513    }
514
515    #[must_use]
516    /// Adds messages to the batch.
517    pub fn messages<U, V>(mut self, messages: U) -> Self
518    where
519        U: IntoIterator<Item = V>,
520        V: Into<SendMessage<T>>,
521    {
522        self.messages
523            .extend(messages.into_iter().map(std::convert::Into::into));
524        self
525    }
526
527    #[must_use]
528    /// The number of seconds to delay a message for within the queue, before it can be delivered to a consumer
529    pub fn delay_seconds(mut self, delay_seconds: u32) -> Self {
530        self.delay_seconds = Some(delay_seconds);
531        self
532    }
533
534    pub fn build(self) -> BatchSendMessage<T> {
535        BatchSendMessage {
536            body: self.messages,
537            options: self
538                .delay_seconds
539                .map(|delay_seconds| QueueSendBatchOptions {
540                    delay_seconds: Some(delay_seconds),
541                }),
542        }
543    }
544}
545
546impl<T, U, V> From<U> for BatchSendMessage<T>
547where
548    U: IntoIterator<Item = V>,
549    V: Into<SendMessage<T>>,
550{
551    fn from(value: U) -> Self {
552        Self {
553            body: value.into_iter().map(std::convert::Into::into).collect(),
554            options: None,
555        }
556    }
557}
558
559impl<T: Serialize> BatchSendMessage<T> {
560    fn into_raw_batch_send_message(self) -> Result<BatchSendMessage<JsValue>> {
561        Ok(BatchSendMessage {
562            body: self
563                .body
564                .into_iter()
565                .map(SendMessage::into_raw_send_message)
566                .collect::<Result<_>>()?,
567            options: self.options,
568        })
569    }
570}
571
572impl Queue {
573    /// Sends a message to the Queue.
574    ///
575    /// Accepts a struct that is `serializable`.
576    ///
577    /// If message options are needed use the `MessageBuilder` to create the message.
578    ///
579    /// ## Example
580    /// ```no_run
581    /// #[derive(Serialize)]
582    /// pub struct MyMessage {
583    ///     my_data: u32,
584    /// }
585    ///
586    /// queue.send(MyMessage{ my_data: 1}).await?;
587    /// ```
588    pub async fn send<T, U: Into<SendMessage<T>>>(&self, message: U) -> Result<()>
589    where
590        T: Serialize,
591    {
592        let message: SendMessage<T> = message.into();
593        let serialized_message = message.into_raw_send_message()?;
594        self.send_raw(serialized_message).await
595    }
596
597    /// Sends a raw `JsValue` to the Queue.
598    ///
599    /// Use the `RawMessageBuilder` to create the message.
600    pub async fn send_raw<T: Into<SendMessage<JsValue>>>(&self, message: T) -> Result<()> {
601        let message: SendMessage<JsValue> = message.into();
602        let options = match message.options {
603            Some(options) => serde_wasm_bindgen::to_value(&options)?,
604            None => JsValue::null(),
605        };
606
607        let fut: JsFuture = self.0.send(message.message, options)?.into();
608        fut.await.map_err(Error::from)?;
609        Ok(())
610    }
611
612    /// Sends a batch of messages to the Queue.
613    ///
614    /// Accepts an iterator that produces structs that are `serializable`.
615    ///
616    /// If message options are needed use the `BatchMessageBuilder` to create the batch.
617    ///
618    /// ## Example
619    /// ```no_run
620    /// #[derive(Serialize)]
621    /// pub struct MyMessage {
622    ///     my_data: u32,
623    /// }
624    ///
625    /// queue.send_batch(vec![MyMessage{ my_data: 1}]).await?;
626    /// ```
627    pub async fn send_batch<T: Serialize, U: Into<BatchSendMessage<T>>>(
628        &self,
629        messages: U,
630    ) -> Result<()> {
631        let messages: BatchSendMessage<T> = messages.into();
632        let serialized_messages = messages.into_raw_batch_send_message()?;
633        self.send_raw_batch(serialized_messages).await
634    }
635
636    /// Sends a batch of raw messages to the Queue.
637    ///
638    /// Accepts an iterator that produces structs that are `serializable`.
639    ///
640    /// If message options are needed use the `BatchMessageBuilder` to create the batch.
641    pub async fn send_raw_batch<T: Into<BatchSendMessage<JsValue>>>(
642        &self,
643        messages: T,
644    ) -> Result<()> {
645        let messages: BatchSendMessage<JsValue> = messages.into();
646        let batch_send_options = serde_wasm_bindgen::to_value(&messages.options)?;
647
648        let messages = messages
649            .body
650            .into_iter()
651            .map(|message: SendMessage<JsValue>| {
652                let body = message.message;
653                let message_send_request = js_sys::Object::new();
654
655                js_sys::Reflect::set(&message_send_request, &"body".into(), &body)?;
656                js_sys::Reflect::set(
657                    &message_send_request,
658                    &"contentType".into(),
659                    &serde_wasm_bindgen::to_value(
660                        &message.options.as_ref().map(|o| o.content_type),
661                    )?,
662                )?;
663                js_sys::Reflect::set(
664                    &message_send_request,
665                    &"delaySeconds".into(),
666                    &serde_wasm_bindgen::to_value(
667                        &message.options.as_ref().map(|o| o.delay_seconds),
668                    )?,
669                )?;
670
671                Ok::<JsValue, Error>(message_send_request.into())
672            })
673            .collect::<Result<js_sys::Array>>()?;
674
675        let fut: JsFuture = self.0.send_batch(messages, batch_send_options)?.into();
676        fut.await.map_err(Error::from)?;
677        Ok(())
678    }
679}