diff --git a/src/otarta/core.cljs b/src/otarta/core.cljs index 8fc44624774f2919e36e27602b99a410861b9747..2b5531918f6f598995ec0899df13ebeb6750c337 100644 --- a/src/otarta/core.cljs +++ b/src/otarta/core.cljs @@ -9,9 +9,12 @@ [haslett.format :as ws-fmt] [huon.log :refer [debug info warn error]] [lambdaisland.uri :as uri] + [otarta.errify :refer [errify errify>> errify> + err-through> err-through>> ] + :refer-macros [err-> err->> ]] [otarta.format :as fmt :refer [PayloadFormat]] [otarta.packet :as packet] - [otarta.util :as util :refer-macros [ err-> err->>]])) + [otarta.util :as util])) (defn timeout-channels-wont-prevent-nodejs-exit! @@ -229,25 +232,24 @@ This function ensures that if there's no other activity keeping the event loop r (let [{:keys [control-ch stop-status] :as pinger} {:control-ch (async/promise-chan) :stop-status (async/promise-chan)}] - (go - (go-loop [sleep-seconds delay] - (info :pinger {:sleep-seconds sleep-seconds}) - (let [sleep (async/timeout (* 1000 sleep-seconds)) - stop! (fn [] - (info :pinger :stopping) - (async/put! stop-status {:reason :stopped})) - failed! (fn [err] - (error :pinger err) - (async/put! stop-status {:reason :failed :error err})) - [v ch] (async/alts! [control-ch sleep])] - (if v - (stop!) - (let [[err _] ( stream deref :close-status) pinger-stopped? (-> pinger deref :stop-status)] (go - (go - (let [[v ch] (async/alts! [control-ch stream-closed? pinger-stopped?])] - (info :connection-watcher :received {:v v}) - (condp = ch - stream-closed? (do (error :connection-watcher :stream-closed) - (stop-pinger client)) - pinger-stopped? (do (error :connection-watcher :pinger-stopped) - (stream-disconnect client)) - control-ch (info :connection-watcher :intentional-stop) - (error :connection-watcher :unkown-event)))) - (update client :connection-watcher reset! watcher) - [nil client]))) + (let [[v ch] (async/alts! [control-ch stream-closed? pinger-stopped?])] + (info :connection-watcher :received {:v v}) + (condp = ch + stream-closed? (do (error :connection-watcher :stream-closed) + (stop-pinger client)) + pinger-stopped? (do (error :connection-watcher :pinger-stopped) + (stream-disconnect client)) + control-ch (info :connection-watcher :intentional-stop) + (error :connection-watcher :unkown-event)))) + (update client :connection-watcher reset! watcher) + client)) (defn- stop-connection-watcher [{watcher :connection-watcher :as client}] @@ -347,8 +348,8 @@ WARNING: Connecting with a client-id that's already in use results in the existi ( client (stream-connect) (mqtt-connect) - (start-pinger {:delay (:keep-alive config)}) - (start-connection-watcher)))) + ( #(start-pinger % {:delay (:keep-alive config)})) + ( start-connection-watcher)))) (defn- publish* [{stream :stream :as client} app-topic payload {:keys [format retain?] :or {format :string retain? false}}] @@ -358,12 +359,11 @@ WARNING: Connecting with a client-id that's already in use results in the existi to-publish {:topic (app-topic->broker-topic client app-topic) :retain? retain? :payload payload - :empty? (empty-payload? payload)} - [fmt-err formatted] (fmt/write format to-publish)] - (if fmt-err - [fmt-err nil] - (do (>! sink (packet/publish formatted)) - [nil {}]))))) + :empty? (empty-payload? payload)}] + (err-> format + (fmt/write to-publish) + (errify> packet/publish) + (err-through> (partial async/put! sink)))))) (defn- did-connect! [client] @@ -414,9 +414,9 @@ WARNING: Connecting with a client-id that's already in use results in the existi :topic (broker-topic->app-topic client topic)}) subscription-xf (comp pkts-for-topic-filter (map pkt->msg) - (map (comp second msg-reader)) - (remove nil?))] - [nil (capture-all-packets source subscription-xf)])) + (map msg-reader) + (remove (comp nil? :payload)))] + (capture-all-packets source subscription-xf))) (defn- send-subscribe-and-await-suback [{stream :stream :as client} subscriptions] @@ -442,7 +442,8 @@ WARNING: Connecting with a client-id that's already in use results in the existi (let [topic-filter (app-topic->broker-topic client app-topic-filter) [sub-err sub-ch] (err->> format fmt/read - (subscription-chan client topic-filter))] + (errify>> + (partial subscription-chan client topic-filter)))] (if sub-err [sub-err nil] (let [sub {:topic-filter topic-filter diff --git a/src/otarta/errify.cljc b/src/otarta/errify.cljc new file mode 100644 index 0000000000000000000000000000000000000000..394778487917de52b4394317771ff99f8d6bbc07 --- /dev/null +++ b/src/otarta/errify.cljc @@ -0,0 +1,214 @@ +(ns otarta.errify + #?(:cljs (:require [cljs.core.async :as async])) + #?(:clj (:require [clojure.core.async :as async]))) + +(defn errify + "Turns `op` into function that can be used in err-*. + + When no error-opts given, will wrap result as success: `[nil result]`. + When provided `:err-when` and `:err`/`:err-fn`, will yield `[err nil]` when `(err-when result)` is true. + + ;; simple: make inc comply with convention: + ((errify inc) 1) ;; => [nil 2] + + ;; some results are wrong though: + ((errify inc {:err-when #{2}) 1) ;; => [2 nil] + + ;; ...and should yield a specific error: + ((errify inc {:err-when #{2} :err :some-error}) 1) ;; => [:some-error nil] + + ;; ...or an error based on the outcome: + ((errify inc {:err-when #(> % 10) + :err-fn #(str \"Too large: \" %)}) 20) ;; => [\"Too large: 21\" nil] + + + Use it in `err->`: + (let [my-inc (errify inc {:err-when #{2} :err :some-err})] + (err-> 0 my-inc) ;; => [nil 1] + (err-> 1 my-inc)) ;; => [:some-err nil] + + To 'inline' a function in err-* use errify> and errify>>: + (err-> 1 (errify> inc)) ;; => [nil 2] + (err-> 1 (errify> inc {:err-when #{2} :err :darn})) ;; => [:darn nil] + + ;; validate-email assoc-es :error when :email is not valid. + ;; To 'shortcut' our pipeline based on this key: + (err-> person + (errify> validate-email {:err-when :error :err :invalid-email})) +" + ([op] + (errify op {})) + ([op {:keys [err-when err err-fn] :or {err-when (constantly false) + err-fn identity}}] + (let [err-fn (if err (constantly err) err-fn)] + (fn [& args] + (let [res (apply op args)] + (if (err-when res) + [(err-fn res) nil] + [nil res])))))) + + +(defn errify> + "Like `errify` but for inlining in `err->`. + + Example: + ;; *will not* work (as `errify` would receive 2 arguments): + (err-> 1 (errify inc)) ;; FAILS + + ;; `errify>` handles this: + (err-> 1 (errify> inc)) ;; => [nil 2] + (err-> 2 (errify> inc {:err-when #{3} :err :some-err})) ;; => [:some-err nil]" + ([v op] + (errify> v op {})) + ([v op opts] + ((errify op opts) v))) + + +(defn errify>> + "Like `errify` but for inlining in `err->>`. See `errify>`" + ([op v] + (errify>> op {} v)) + ([op opts v] + ((errify op opts) v))) + + +(comment + ;; future ideas + (err-> person + (validate-email) + (errify> send-invite {:err-when nil? + :err-fn-pre #(vector :invite-failed (:email %))}))) + + +(defn err-through> + "Like `errify>` but passes input through when no errors occur. + + Examples: + ;; send-invite is for side-effects only and it's result should not + ;; be propagated in the pipeline: + (err-> person + (err-through> send-invite)) ;; => [nil person] + + ;; ...though we can shortcut on a faulty result: + (err-> person + (err-through> send-invite {:err-when nil? :err :invite-failed})) + + ;; yields: + ;; [:invite-failed nil] when (nil? (send-invite person)) + ;; [nil person] otherwise +" + ([v op] + (err-through> v op {})) + ([v op opts] + (let [[err _] ((errify op opts) v)] + (if err + [err nil] + [nil v])))) + + +(defn err-through>> + ([op v] + (err-through>> op v {})) + ([op opts v] + (let [[err _] ((errify op opts) v)] + (if err + [err nil] + [nil v])))) + + +(defn + ([op v] + ( op v {})) + ([op opts v] + (async/go (err-through> op opts v)))) + + +#?(:clj + (defmacro ^:private ~init ~placeholder ~@(for [expr exprs] + `(let [result# (cljs.core.async/ + "Each of the forms should yield a tuple [err result] wrapped + in an async-channel. As long as err is nil, result is passed onto + the next form as the first argument. As soon as an err is not + nil (or there are no more forms), the tuple is returned. + + Example: + (go + (let [[err result] ( {:url \"some-url\"} + (connect {:keep-alive 10}) + (do-query {:some :selector})))] + (if err + (println \"Something went wrong:\" err) + (println \"Query result:\" result)))) +" + [x & forms] + `(cljs.core.async.macros/go (cljs.core.async/ (cljs.core.async.macros/go [nil ~x]) ~@forms))))) + + +#?(:clj + (defmacro > + "Each of the forms should yield a tuple [err result] wrapped + in an async-channel. As long as err is nil, result is passed onto + the next form as the last argument. As soon as err is not + nil (or there are no more forms), the tuple is returned. + + Example: + (go + (let [[err result] (> connect-opts + (connect client)))] + (if err + (println \"Something went wrong:\" err) + (println \"Connect result:\" result)))) +" + [x & forms] + `(cljs.core.async.macros/go (cljs.core.async/> (cljs.core.async.macros/go [nil ~x]) ~@forms))))) + + +#?(:clj + (defmacro ^:private err-* + "Internal, use err-> or err->>." + [threading init & exprs] + (let [placeholder (gensym)] + `(as-> ~init ~placeholder ~@(for [expr exprs] + `(cond-> ~placeholder + (not (first ~placeholder)) + (~threading second ~expr))))))) + + +#?(:clj + (defmacro err-> + "Threads the expr through the forms. Inserts x as the + second item in the first form, making a list of it if it is not a + list already. If there are more forms, inserts the first form as the + second item in second form, etc. + + Each form should return a tuple, where the left value is either nil + or an error, and the right value is a correct result. Threading + stops as soon as a form returns an error." + [x & forms] + `(err-* -> [nil ~x] ~@forms))) + + +#?(:clj + (defmacro err->> + "Threads the expr through the forms. Inserts x as the last item in + the first form, making a list of it if it is not a list already. If + there are more forms, inserts the first form as the second item in + second form, etc. + + Each form should return a tuple, where the left value is either nil + or an error, and the right value is a correct result. Threading + stops as soon as a form returns an error." + [x & forms] + `(err-* ->> [nil ~x] ~@forms))) diff --git a/src/otarta/format.cljs b/src/otarta/format.cljs index 35a784ff00a386d5586b690bf8926dff9bdb1f34..b86e73c17bc0b699756af441be52271e8ade8676 100644 --- a/src/otarta/format.cljs +++ b/src/otarta/format.cljs @@ -4,7 +4,7 @@ [cljs.reader :as reader] [cognitect.transit :as transit] [goog.crypt :as crypt] - [otarta.util :refer-macros [err-> err->>]] + [otarta.errify :refer [errify errify>> errify>] :refer-macros [err-> err->>]] [huon.log :refer [debug info warn error]])) @@ -33,7 +33,7 @@ (def string (reify PayloadFormat (-read [_ buff] - (info :read-string) + (info :read-string {:buff buff}) (crypt/utf8ByteArrayToString buff)) (-write [_ v] (info :write-string {:value v}) @@ -104,20 +104,15 @@ This makes writing records impossible." (defn msg-formatter [rw format] - (if-let [payload-format (find-payload-format format)] - (let [formatter (fn [{e? :empty? :as msg}] - (info :formatter) - (let [try-format #(try (rw payload-format %) - (catch js/Error _ - (error :format-error) - nil)) - format-fn (if e? (partial rw empty) try-format) - formatted-payload (-> msg :payload format-fn)] - (if (nil? formatted-payload) - [:format-error nil] - [nil (assoc msg :payload formatted-payload)])))] - [nil formatter]) - [:unkown-format nil])) + (when-let [payload-format (find-payload-format format)] + (fn [{e? :empty? :as msg}] + (info :formatter) + (let [try-format #(try (rw payload-format %) + (catch js/Error _ + (error :format-error) + nil)) + format-fn (if e? (partial rw empty) try-format)] + (update msg :payload format-fn))))) (defn write @@ -139,12 +134,16 @@ This makes writing records impossible." ;; => [nil {:payload #object[Uint8Array ]}] " ([format] - (err->> format - (msg-formatter -write))) + (let [acquire-writer (partial msg-formatter -write)] + (err->> format + (errify>> acquire-writer {:err-when nil? + :err :unknown-format})))) ([format msg] - (err-> format - (write) - (apply (list msg))))) + (let [apply-writer #(% msg)] + (err-> format + (write) + (errify> apply-writer {:err-when (comp nil? :payload) + :err :format-error}))))) (defn read @@ -166,9 +165,13 @@ This makes writing records impossible." ;; => [nil {:empty? true :payload \"\"}] " ([format] - (err->> format - (msg-formatter -read))) + (let [acquire-reader (partial msg-formatter -read)] + (err->> format + (errify>> acquire-reader {:err-when nil? + :err :unknown-format})))) ([format msg] - (err-> format - read - (apply (list msg))))) + (let [apply-reader #(apply % (list msg))] + (err-> format + read + (errify> apply-reader {:err-when (comp nil? :payload) + :err :format-error}))))) diff --git a/src/otarta/util.clj b/src/otarta/util.clj deleted file mode 100644 index 75f9221bfa5f54307b1711302e64a1120031d620..0000000000000000000000000000000000000000 --- a/src/otarta/util.clj +++ /dev/null @@ -1,86 +0,0 @@ -(ns otarta.util) - - -(defmacro ^:private ~init ~placeholder ~@(for [expr exprs] - `(let [result# (cljs.core.async/ - "Each of the forms should yield a tuple [err result] wrapped - in an async-channel. As long as err is nil, result is passed onto - the next form as the first argument. As soon as an err is not - nil (or there are no more forms), the tuple is returned. - - Example: - (go - (let [[err result] ( {:url \"some-url\"} - (connect {:keep-alive 10}) - (do-query {:some :selector})))] - (if err - (println \"Something went wrong:\" err) - (println \"Query result:\" result)))) -" - [x & forms] - `(cljs.core.async.macros/go (cljs.core.async/ (cljs.core.async.macros/go [nil ~x]) ~@forms)))) - - -(defmacro > - "Each of the forms should yield a tuple [err result] wrapped - in an async-channel. As long as err is nil, result is passed onto - the next form as the last argument. As soon as err is not - nil (or there are no more forms), the tuple is returned. - - Example: - (go - (let [[err result] (> connect-opts - (connect client)))] - (if err - (println \"Something went wrong:\" err) - (println \"Connect result:\" result)))) -" - [x & forms] - `(cljs.core.async.macros/go (cljs.core.async/> (cljs.core.async.macros/go [nil ~x]) ~@forms)))) - - -(defmacro ^:private err-* - "Internal, use err-> or err->>." - [threading init & exprs] - (let [placeholder (gensym)] - `(as-> ~init ~placeholder ~@(for [expr exprs] - `(cond-> ~placeholder - (not (first ~placeholder)) - (~threading second ~expr)))))) - - -(defmacro err-> - "Threads the expr through the forms. Inserts x as the - second item in the first form, making a list of it if it is not a - list already. If there are more forms, inserts the first form as the - second item in second form, etc. - - Each form should return a tuple, where the left value is either nil - or an error, and the right value is a correct result. Threading - stops as soon as a form returns an error." - [x & forms] - `(err-* -> [nil ~x] ~@forms)) - - -(defmacro err->> - "Threads the expr through the forms. Inserts x as the last item in - the first form, making a list of it if it is not a list already. If - there are more forms, inserts the first form as the second item in - second form, etc. - - Each form should return a tuple, where the left value is either nil - or an error, and the right value is a correct result. Threading - stops as soon as a form returns an error." - [x & forms] - `(err-* ->> [nil ~x] ~@forms)) diff --git a/test/otarta/core_test.cljs b/test/otarta/core_test.cljs index daf443f572faf00c526094f0ac1105c160d7e71f..4dcebf27e069aea1a2d38ca0eec74fd2d306e711 100644 --- a/test/otarta/core_test.cljs +++ b/test/otarta/core_test.cljs @@ -9,7 +9,7 @@ [otarta.core :as sut] [otarta.format :as fmt :refer [PayloadFormat]] [otarta.packet :as pkt] - [otarta.util :refer-macros [err-> err->>]] + [otarta.errify :refer-macros [err-> err->>]] [otarta.test-helpers :as helpers :refer [test-async sub?]])) #_(log/enable!) @@ -130,17 +130,16 @@ {:empty? (= msg "") :topic topic :payload (str->uint8array msg)}))) - subscribe! #(-> %3 - (err->> (fmt/read) - (sut/subscription-chan %1 %2)) - second) + subscribe! (fn [client tfilter format] + (sut/subscription-chan client tfilter + (second (fmt/read format)))) messages-received (fn [ch] (async/close! ch) (async/into [] ch)) payloads-received #(go (map :payload ( err->]])) + +(deftest err->-test + (testing "chains operations" + (letfn [(err-inc [x] [nil (inc x)])] + (is (= [nil 3] + (err-> 1 + (err-inc) + (err-inc)))))) + + (testing "stops chaining on error" + (letfn [(err-inc-<-10 [x] (if (< x 10) + [nil (inc x)] + [:too-large nil]))] + (is (= [:too-large nil] + (err-> 9 + (err-inc-<-10) + (err-inc-<-10)))))) + + (testing "passes result as first argument" + (letfn [(err-div [x y] [nil (/ x y)])] + (is (= [nil 6] + (err-> 6 + (err-div 1))))))) + + +(deftest errify-test + (testing "with no conditions: [nil result]" + (let [fut (sut/errify inc)] + (is (= [nil 2] + (fut 1))))) + + (testing "with error conditions: [result nil]" + (let [errs-eq-2 (sut/errify inc {:err-when #{2}})] + (are [input expected] (= expected (errs-eq-2 input)) + 0 [nil 1] + 1 [2 nil] + 2 [nil 3]))) + + (testing "providing :err" + (let [errs-lt-10 (sut/errify inc {:err-when #(< % 10) :err :too-small})] + (are [input expected] (= expected (errs-lt-10 input)) + 0 [:too-small nil] + 8 [:too-small nil] + 9 [nil 10]))) + + (testing "providing :err-fn" + (let [errs-gt-10 (sut/errify inc {:err-when #(> % 10) + :err-fn (partial str "Too large: ")})] + (are [input expected] (= expected (errs-gt-10 input)) + 0 [nil 1] + 10 ["Too large: 11" nil])))) + + +(deftest errify>-test + (testing "inlining in err->" + (is (= [nil 3] + (err-> 1 + (sut/errify> inc) + (sut/errify> inc)))) + (is (= [:some-error nil] + (err-> 1 (sut/errify> inc {:err-when #{2} :err :some-error})))) + (is (= [:err-on-2 nil] + (err-> 1 + (sut/errify> inc {:err-when #{2} :err :err-on-2}) + (sut/errify> inc {:err-when #{3} :err :err-on-3})))) + (is (= [:err-on-5 nil] + (err-> 1 + (sut/errify> (partial + 2) {:err-when #{2} :err :err-on-2}) + (sut/errify> (partial + 2) {:err-when #{5} :err :err-on-5})))))) diff --git a/test/otarta/format_test.cljs b/test/otarta/format_test.cljs index bde071d9303ba5edb52d708904810e6af4bed491..ee6722b21c568e4e7a426444a71c8562a4dab067 100644 --- a/test/otarta/format_test.cljs +++ b/test/otarta/format_test.cljs @@ -106,7 +106,7 @@ ;; read message (deftest read-test (testing "yields error for unknown format" - (is (sub? [:unkown-format] + (is (sub? [:unknown-format] (sut/read :foo)))) (testing "yields no error for known formats" @@ -144,7 +144,7 @@ (deftest write-test (testing "yields error for unknown format" - (is (sub? [:unkown-format] + (is (sub? [:unknown-format] (sut/write :foo)))) (testing "yields no error for known formats"