1use std::{
2 convert::{TryFrom, TryInto},
3 marker::PhantomData,
4};
5
6use crate::{env::EnvBinding, Date, Error, Result};
7use js_sys::Array;
8use serde::{de::DeserializeOwned, Serialize};
9use wasm_bindgen::{prelude::*, JsCast};
10use wasm_bindgen_futures::JsFuture;
11use worker_sys::{Message as MessageSys, MessageBatch as MessageBatchSys, Queue as EdgeQueue};
12
13pub struct MessageBatch<T> {
15 inner: MessageBatchSys,
16 phantom: PhantomData<T>,
17}
18
19impl<T> MessageBatch<T> {
20 pub fn queue(&self) -> String {
22 self.inner.queue().unwrap().into()
23 }
24
25 pub fn retry_all(&self) {
27 self.inner.retry_all(JsValue::null()).unwrap();
28 }
29
30 pub fn retry_all_with_options(&self, queue_retry_options: &QueueRetryOptions) {
32 self.inner
33 .retry_all(serde_wasm_bindgen::to_value(&queue_retry_options).unwrap())
35 .unwrap();
36 }
37
38 pub fn ack_all(&self) {
40 self.inner.ack_all().unwrap();
41 }
42
43 pub fn raw_iter(&self) -> RawMessageIter {
45 let messages = self.inner.messages().unwrap();
46 RawMessageIter {
47 range: 0..messages.length(),
48 array: messages,
49 }
50 }
51}
52
53impl<T: DeserializeOwned> MessageBatch<T> {
54 pub fn messages(&self) -> Result<Vec<Message<T>>> {
56 self.iter().collect()
57 }
58
59 pub fn iter(&self) -> MessageIter<T> {
61 let messages = self.inner.messages().unwrap();
62 MessageIter {
63 range: 0..messages.length(),
64 array: messages,
65 marker: PhantomData,
66 }
67 }
68}
69
70impl<T> From<MessageBatchSys> for MessageBatch<T> {
71 fn from(value: MessageBatchSys) -> Self {
72 Self {
73 inner: value,
74 phantom: PhantomData,
75 }
76 }
77}
78
79pub struct Message<T> {
81 inner: MessageSys,
82 body: T,
83}
84
85impl<T> Message<T> {
86 pub fn body(&self) -> &T {
88 &self.body
89 }
90
91 pub fn into_body(self) -> T {
93 self.body
94 }
95
96 pub fn raw_body(&self) -> JsValue {
98 self.inner().body().unwrap()
99 }
100}
101
102impl<T> TryFrom<RawMessage> for Message<T>
103where
104 T: DeserializeOwned,
105{
106 type Error = Error;
107
108 fn try_from(value: RawMessage) -> std::result::Result<Self, Self::Error> {
109 let body = serde_wasm_bindgen::from_value(value.body())?;
110 Ok(Self {
111 inner: value.inner,
112 body,
113 })
114 }
115}
116
117pub struct RawMessage {
119 inner: MessageSys,
120}
121
122impl RawMessage {
123 pub fn body(&self) -> JsValue {
125 self.inner.body().unwrap()
126 }
127}
128
129impl From<MessageSys> for RawMessage {
130 fn from(value: MessageSys) -> Self {
131 Self { inner: value }
132 }
133}
134
135trait MessageSysInner {
136 fn inner(&self) -> &MessageSys;
137}
138
139impl MessageSysInner for RawMessage {
140 fn inner(&self) -> &MessageSys {
141 &self.inner
142 }
143}
144
145impl<T> MessageSysInner for Message<T> {
146 fn inner(&self) -> &MessageSys {
147 &self.inner
148 }
149}
150
151#[derive(Serialize)]
152#[serde(rename_all = "camelCase")]
153pub struct QueueRetryOptions {
155 delay_seconds: Option<u32>,
156}
157
158pub struct QueueRetryOptionsBuilder {
159 delay_seconds: Option<u32>,
160}
161
162impl QueueRetryOptionsBuilder {
163 pub fn new() -> Self {
165 Self {
166 delay_seconds: None,
167 }
168 }
169
170 #[must_use]
171 pub fn with_delay_seconds(mut self, delay_seconds: u32) -> Self {
173 self.delay_seconds = Some(delay_seconds);
174 self
175 }
176
177 pub fn build(self) -> QueueRetryOptions {
179 QueueRetryOptions {
180 delay_seconds: self.delay_seconds,
181 }
182 }
183}
184
185pub trait MessageExt {
186 fn id(&self) -> String;
188
189 fn timestamp(&self) -> Date;
191
192 fn retry(&self);
194
195 fn retry_with_options(&self, queue_retry_options: &QueueRetryOptions);
197
198 fn ack(&self);
200}
201
202impl<T: MessageSysInner> MessageExt for T {
203 fn id(&self) -> String {
205 self.inner().id().unwrap().into()
206 }
207
208 fn timestamp(&self) -> Date {
210 Date::from(self.inner().timestamp().unwrap())
211 }
212
213 fn retry(&self) {
215 self.inner().retry(JsValue::null()).unwrap();
216 }
217
218 fn retry_with_options(&self, queue_retry_options: &QueueRetryOptions) {
220 self.inner()
221 .retry(serde_wasm_bindgen::to_value(&queue_retry_options).unwrap())
223 .unwrap();
224 }
225
226 fn ack(&self) {
228 self.inner().ack().unwrap();
229 }
230}
231
232pub struct MessageIter<T> {
233 range: std::ops::Range<u32>,
234 array: Array,
235 marker: PhantomData<T>,
236}
237
238impl<T> std::iter::Iterator for MessageIter<T>
239where
240 T: DeserializeOwned,
241{
242 type Item = Result<Message<T>>;
243
244 fn next(&mut self) -> Option<Self::Item> {
245 let index = self.range.next()?;
246 let value = self.array.get(index);
247 let raw_message = RawMessage::from(MessageSys::from(value));
248 Some(raw_message.try_into())
249 }
250
251 #[inline]
252 fn size_hint(&self) -> (usize, Option<usize>) {
253 self.range.size_hint()
254 }
255}
256
257impl<T> std::iter::DoubleEndedIterator for MessageIter<T>
258where
259 T: DeserializeOwned,
260{
261 fn next_back(&mut self) -> Option<Self::Item> {
262 let index = self.range.next_back()?;
263 let value = self.array.get(index);
264 let raw_message = RawMessage::from(MessageSys::from(value));
265 Some(raw_message.try_into())
266 }
267}
268
269impl<T> std::iter::FusedIterator for MessageIter<T> where T: DeserializeOwned {}
270
271impl<T> std::iter::ExactSizeIterator for MessageIter<T> where T: DeserializeOwned {}
272
273pub struct RawMessageIter {
274 range: std::ops::Range<u32>,
275 array: Array,
276}
277
278impl std::iter::Iterator for RawMessageIter {
279 type Item = RawMessage;
280
281 fn next(&mut self) -> Option<Self::Item> {
282 let index = self.range.next()?;
283 let value = self.array.get(index);
284 Some(MessageSys::from(value).into())
285 }
286
287 #[inline]
288 fn size_hint(&self) -> (usize, Option<usize>) {
289 self.range.size_hint()
290 }
291}
292
293impl std::iter::DoubleEndedIterator for RawMessageIter {
294 fn next_back(&mut self) -> Option<Self::Item> {
295 let index = self.range.next_back()?;
296 let value = self.array.get(index);
297 Some(MessageSys::from(value).into())
298 }
299}
300
301impl std::iter::FusedIterator for RawMessageIter {}
302
303impl std::iter::ExactSizeIterator for RawMessageIter {}
304
305#[derive(Clone)]
306pub struct Queue(EdgeQueue);
307
308unsafe impl Send for Queue {}
309unsafe impl Sync for Queue {}
310
311impl EnvBinding for Queue {
312 const TYPE_NAME: &'static str = "WorkerQueue";
313}
314
315impl JsCast for Queue {
316 fn instanceof(val: &JsValue) -> bool {
317 val.is_instance_of::<Queue>()
318 }
319
320 fn unchecked_from_js(val: JsValue) -> Self {
321 Self(val.into())
322 }
323
324 fn unchecked_from_js_ref(val: &JsValue) -> &Self {
325 unsafe { &*(val as *const JsValue as *const Self) }
326 }
327}
328
329impl From<Queue> for JsValue {
330 fn from(queue: Queue) -> Self {
331 JsValue::from(queue.0)
332 }
333}
334
335impl AsRef<JsValue> for Queue {
336 fn as_ref(&self) -> &JsValue {
337 &self.0
338 }
339}
340
341#[derive(Clone, Copy, Debug)]
342pub enum QueueContentType {
343 Json,
345 Text,
347 V8,
349}
350
351impl Serialize for QueueContentType {
352 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
353 where
354 S: serde::Serializer,
355 {
356 serializer.serialize_str(match self {
357 Self::Json => "json",
358 Self::Text => "text",
359 Self::V8 => "v8",
360 })
361 }
362}
363
364#[derive(Serialize)]
365#[serde(rename_all = "camelCase")]
366pub struct QueueSendOptions {
367 content_type: Option<QueueContentType>,
368 delay_seconds: Option<u32>,
369}
370
371pub struct MessageBuilder<T> {
372 message: T,
373 delay_seconds: Option<u32>,
374 content_type: QueueContentType,
375}
376
377impl<T: Serialize> MessageBuilder<T> {
378 pub fn new(message: T) -> Self {
380 Self {
381 message,
382 delay_seconds: None,
383 content_type: QueueContentType::Json,
384 }
385 }
386
387 #[must_use]
388 pub fn delay_seconds(mut self, delay_seconds: u32) -> Self {
390 self.delay_seconds = Some(delay_seconds);
391 self
392 }
393
394 #[must_use]
395 pub fn content_type(mut self, content_type: QueueContentType) -> Self {
398 self.content_type = content_type;
399 self
400 }
401
402 pub fn build(self) -> SendMessage<T> {
404 SendMessage {
405 message: self.message,
406 options: Some(QueueSendOptions {
407 content_type: Some(self.content_type),
408 delay_seconds: self.delay_seconds,
409 }),
410 }
411 }
412}
413
414pub struct RawMessageBuilder {
415 message: JsValue,
416 delay_seconds: Option<u32>,
417}
418
419impl RawMessageBuilder {
420 pub fn new(message: JsValue) -> Self {
422 Self {
423 message,
424 delay_seconds: None,
425 }
426 }
427
428 #[must_use]
429 pub fn delay_seconds(mut self, delay_seconds: u32) -> Self {
431 self.delay_seconds = Some(delay_seconds);
432 self
433 }
434
435 pub fn build_with_content_type(self, content_type: QueueContentType) -> SendMessage<JsValue> {
437 SendMessage {
438 message: self.message,
439 options: Some(QueueSendOptions {
440 content_type: Some(content_type),
441 delay_seconds: self.delay_seconds,
442 }),
443 }
444 }
445}
446
447pub struct SendMessage<T> {
453 message: T,
457
458 options: Option<QueueSendOptions>,
460}
461
462impl<T: Serialize> SendMessage<T> {
463 fn into_raw_send_message(self) -> Result<SendMessage<JsValue>> {
464 Ok(SendMessage {
465 message: serde_wasm_bindgen::to_value(&self.message)?,
466 options: self.options,
467 })
468 }
469}
470
471impl<T: Serialize> From<T> for SendMessage<T> {
472 fn from(message: T) -> Self {
473 Self {
474 message,
475 options: Some(QueueSendOptions {
476 content_type: Some(QueueContentType::Json),
477 delay_seconds: None,
478 }),
479 }
480 }
481}
482
483pub struct BatchSendMessage<T> {
484 body: Vec<SendMessage<T>>,
485 options: Option<QueueSendBatchOptions>,
486}
487
488#[derive(Serialize)]
489#[serde(rename_all = "camelCase")]
490pub struct QueueSendBatchOptions {
491 delay_seconds: Option<u32>,
492}
493
494pub struct BatchMessageBuilder<T> {
495 messages: Vec<SendMessage<T>>,
496 delay_seconds: Option<u32>,
497}
498
499impl<T> BatchMessageBuilder<T> {
500 pub fn new() -> Self {
502 Self {
503 messages: Vec::new(),
504 delay_seconds: None,
505 }
506 }
507
508 #[must_use]
509 pub fn message<U: Into<SendMessage<T>>>(mut self, message: U) -> Self {
511 self.messages.push(message.into());
512 self
513 }
514
515 #[must_use]
516 pub fn messages<U, V>(mut self, messages: U) -> Self
518 where
519 U: IntoIterator<Item = V>,
520 V: Into<SendMessage<T>>,
521 {
522 self.messages
523 .extend(messages.into_iter().map(std::convert::Into::into));
524 self
525 }
526
527 #[must_use]
528 pub fn delay_seconds(mut self, delay_seconds: u32) -> Self {
530 self.delay_seconds = Some(delay_seconds);
531 self
532 }
533
534 pub fn build(self) -> BatchSendMessage<T> {
535 BatchSendMessage {
536 body: self.messages,
537 options: self
538 .delay_seconds
539 .map(|delay_seconds| QueueSendBatchOptions {
540 delay_seconds: Some(delay_seconds),
541 }),
542 }
543 }
544}
545
546impl<T, U, V> From<U> for BatchSendMessage<T>
547where
548 U: IntoIterator<Item = V>,
549 V: Into<SendMessage<T>>,
550{
551 fn from(value: U) -> Self {
552 Self {
553 body: value.into_iter().map(std::convert::Into::into).collect(),
554 options: None,
555 }
556 }
557}
558
559impl<T: Serialize> BatchSendMessage<T> {
560 fn into_raw_batch_send_message(self) -> Result<BatchSendMessage<JsValue>> {
561 Ok(BatchSendMessage {
562 body: self
563 .body
564 .into_iter()
565 .map(SendMessage::into_raw_send_message)
566 .collect::<Result<_>>()?,
567 options: self.options,
568 })
569 }
570}
571
572impl Queue {
573 pub async fn send<T, U: Into<SendMessage<T>>>(&self, message: U) -> Result<()>
589 where
590 T: Serialize,
591 {
592 let message: SendMessage<T> = message.into();
593 let serialized_message = message.into_raw_send_message()?;
594 self.send_raw(serialized_message).await
595 }
596
597 pub async fn send_raw<T: Into<SendMessage<JsValue>>>(&self, message: T) -> Result<()> {
601 let message: SendMessage<JsValue> = message.into();
602 let options = match message.options {
603 Some(options) => serde_wasm_bindgen::to_value(&options)?,
604 None => JsValue::null(),
605 };
606
607 let fut: JsFuture = self.0.send(message.message, options)?.into();
608 fut.await.map_err(Error::from)?;
609 Ok(())
610 }
611
612 pub async fn send_batch<T: Serialize, U: Into<BatchSendMessage<T>>>(
628 &self,
629 messages: U,
630 ) -> Result<()> {
631 let messages: BatchSendMessage<T> = messages.into();
632 let serialized_messages = messages.into_raw_batch_send_message()?;
633 self.send_raw_batch(serialized_messages).await
634 }
635
636 pub async fn send_raw_batch<T: Into<BatchSendMessage<JsValue>>>(
642 &self,
643 messages: T,
644 ) -> Result<()> {
645 let messages: BatchSendMessage<JsValue> = messages.into();
646 let batch_send_options = serde_wasm_bindgen::to_value(&messages.options)?;
647
648 let messages = messages
649 .body
650 .into_iter()
651 .map(|message: SendMessage<JsValue>| {
652 let body = message.message;
653 let message_send_request = js_sys::Object::new();
654
655 js_sys::Reflect::set(&message_send_request, &"body".into(), &body)?;
656 js_sys::Reflect::set(
657 &message_send_request,
658 &"contentType".into(),
659 &serde_wasm_bindgen::to_value(
660 &message.options.as_ref().map(|o| o.content_type),
661 )?,
662 )?;
663 js_sys::Reflect::set(
664 &message_send_request,
665 &"delaySeconds".into(),
666 &serde_wasm_bindgen::to_value(
667 &message.options.as_ref().map(|o| o.delay_seconds),
668 )?,
669 )?;
670
671 Ok::<JsValue, Error>(message_send_request.into())
672 })
673 .collect::<Result<js_sys::Array>>()?;
674
675 let fut: JsFuture = self.0.send_batch(messages, batch_send_options)?.into();
676 fut.await.map_err(Error::from)?;
677 Ok(())
678 }
679}