From fcacf9a198dd373606bc8bb44786ac6b17169801 Mon Sep 17 00:00:00 2001 From: Gert Goet Date: Tue, 16 Oct 2018 19:27:52 +0200 Subject: [PATCH 1/7] Add errify --- src/otarta/errify.cljc | 154 +++++++++++++++++++++++++++++++++++ test/otarta/errify_test.cljs | 73 +++++++++++++++++ 2 files changed, 227 insertions(+) create mode 100644 src/otarta/errify.cljc create mode 100644 test/otarta/errify_test.cljs diff --git a/src/otarta/errify.cljc b/src/otarta/errify.cljc new file mode 100644 index 0000000..258cfe6 --- /dev/null +++ b/src/otarta/errify.cljc @@ -0,0 +1,154 @@ +(ns otarta.errify + #?(:cljs (:require-macros + [otarta.errify :refer [err-> err->>]]))) + +(defn errify + "Turns `op` into function that can be used in err-*. + + When no error-opts given, will wrap result as success: `[nil result]`. + When provided `:err-when` and `:err`/`:err-fn`, will yield `[err nil]` when `(err-when result)` is true. + + ;; simple: make inc comply with convention: + ((errify inc) 1) ;; => [nil 2] + + ;; some results are wrong though: + ((errify inc {:err-when #{2}) 1) ;; => [2 nil] + + ;; ...and should yield a specific error: + ((errify inc {:err-when #{2} :err :some-error}) 1) ;; => [:some-error nil] + + ;; ...or an error based on the outcome: + ((errify inc {:err-when #(> % 10) + :err-fn #(str \"Too large: \" %)}) 20) ;; => [\"Too large: 21\" nil] + + + Use it in `err->`: + (let [my-inc (errify inc {:err-when #{2} :err :some-err})] + (err-> 0 my-inc) ;; => [nil 1] + (err-> 1 my-inc)) ;; => [:some-err nil] + + To 'inline' a function in err-* use errify> and errify>>: + (err-> 1 (errify> inc)) ;; => [nil 2] + (err-> 1 (errify> inc {:err-when #{2} :err :darn})) ;; => [:darn nil] + + ;; validate-email assoc-es :error when :email is not valid. + ;; To 'shortcut' our pipeline based on this key: + (err-> person + (errify> validate-email {:err-when :error :err :invalid-email})) +" + ([op] + (errify op {})) + ([op {:keys [err-when err err-fn] :or {err-when (constantly false) + err-fn identity}}] + (let [err-fn (if err (constantly err) err-fn)] + (fn [& args] + (let [res (apply op args)] + (if (err-when res) + [(err-fn res) nil] + [nil res])))))) + + +(defn errify> + "Like `errify` but for inlining in `err->`. + + Example: + ;; *will not* work (as `errify` would receive 2 arguments): + (err-> 1 (errify inc)) ;; FAILS + + ;; `errify>` handles this: + (err-> 1 (errify> inc)) ;; => [nil 2] + (err-> 2 (errify> inc {:err-when #{3} :err :some-err})) ;; => [:some-err nil]" + ([v op] + (errify> v op {})) + ([v op opts] + ((errify op opts) v))) + + +(defn errify>> + "Like `errify` but for inlining in `err->>`. See `errify>`" + ([op v] + (errify>> op {} v)) + ([op opts v] + ((errify op opts) v))) + + +(comment + ;; future ideas + (err-> person + (validate-email) + (errify> send-invite {:err-when nil? + :err-fn-pre #(vector :invite-failed (:email %))}))) + +(defn err-through> + "Like `errify>` but passes input through when no errors occur. + + Examples: + ;; send-invite is for side-effects only and it's result should not + ;; be propagated in the pipeline: + (err-> person + (err-through> send-invite)) ;; => [nil person] + + ;; ...though we can shortcut on a faulty result: + (err-> person + (err-through> send-invite {:err-when nil? :err :invite-failed})) + + ;; yields: + ;; [:invite-failed nil] when (nil? (send-invite person)) + ;; [nil person] otherwise +" + ([v op] + (err-through> v op {})) + ([v op opts] + (let [[err _] ((errify op opts) v)] + (if err + [err nil] + [nil v])))) + + +(defn err-through>> + ([op v] + (err-through>> op v {})) + ([op opts v] + (let [[err _] ((errify op opts) v)] + (if err + [err nil] + [nil v])))) + + +#?(:clj + (defmacro ^:private err-* + "Internal, use err-> or err->>." + [threading init & exprs] + (let [placeholder (gensym)] + `(as-> ~init ~placeholder ~@(for [expr exprs] + `(cond-> ~placeholder + (not (first ~placeholder)) + (~threading second ~expr))))))) + + +#?(:clj + (defmacro err-> + "Threads the expr through the forms. Inserts x as the + second item in the first form, making a list of it if it is not a + list already. If there are more forms, inserts the first form as the + second item in second form, etc. + + Each form should return a tuple, where the left value is either nil + or an error, and the right value is a correct result. Threading + stops as soon as a form returns an error." + [x & forms] + `(err-* -> [nil ~x] ~@forms))) + + +#?(:clj + (defmacro err->> + "Threads the expr through the forms. Inserts x as the last item in + the first form, making a list of it if it is not a list already. If + there are more forms, inserts the first form as the second item in + second form, etc. + + Each form should return a tuple, where the left value is either nil + or an error, and the right value is a correct result. Threading + stops as soon as a form returns an error." + [x & forms] + `(err-* ->> [nil ~x] ~@forms))) diff --git a/test/otarta/errify_test.cljs b/test/otarta/errify_test.cljs new file mode 100644 index 0000000..9d526b4 --- /dev/null +++ b/test/otarta/errify_test.cljs @@ -0,0 +1,73 @@ +(ns otarta.errify-test + (:require + [cljs.test :refer [deftest is testing are]] + [otarta.errify :as sut :refer-macros [err-> err->]])) + +(deftest err->-test + (testing "chains operations" + (letfn [(err-inc [x] [nil (inc x)])] + (is (= [nil 3] + (err-> 1 + (err-inc) + (err-inc)))))) + + (testing "stops chaining on error" + (letfn [(err-inc-<-10 [x] (if (< x 10) + [nil (inc x)] + [:too-large nil]))] + (is (= [:too-large nil] + (err-> 9 + (err-inc-<-10) + (err-inc-<-10)))))) + + (testing "passes result as first argument" + (letfn [(err-div [x y] [nil (/ x y)])] + (is (= [nil 6] + (err-> 6 + (err-div 1))))))) + + +(deftest errify-test + (testing "with no conditions: [nil result]" + (let [fut (sut/errify inc)] + (is (= [nil 2] + (fut 1))))) + + (testing "with error conditions: [result nil]" + (let [errs-eq-2 (sut/errify inc {:err-when #{2}})] + (are [input expected] (= expected (errs-eq-2 input)) + 0 [nil 1] + 1 [2 nil] + 2 [nil 3]))) + + (testing "providing :err" + (let [errs-lt-10 (sut/errify inc {:err-when #(< % 10) :err :too-small})] + (are [input expected] (= expected (errs-lt-10 input)) + 0 [:too-small nil] + 8 [:too-small nil] + 9 [nil 10]))) + + (testing "providing :err-fn" + (let [errs-gt-10 (sut/errify inc {:err-when #(> % 10) + :err-fn (partial str "Too large: ")})] + (are [input expected] (= expected (errs-gt-10 input)) + 0 [nil 1] + 10 ["Too large: 11" nil])))) + + +(deftest errify>-test + (testing "inlining in err->" + (is (= [nil 3] + (err-> 1 + (sut/errify> inc) + (sut/errify> inc)))) + (is (= [:some-error nil] + (err-> 1 (sut/errify> inc {:err-when #{2} :err :some-error})))) + (is (= [:err-on-2 nil] + (err-> 1 + (sut/errify> inc {:err-when #{2} :err :err-on-2}) + (sut/errify> inc {:err-when #{3} :err :err-on-3})))) + (is (= [:err-on-5 nil] + (err-> 1 + (sut/errify> (partial + 2) {:err-when #{2} :err :err-on-2}) + (sut/errify> (partial + 2) {:err-when #{5} :err :err-on-5})))))) -- GitLab From 8e9863ea13a8be9afc720a3f6ea2f42f233746c4 Mon Sep 17 00:00:00 2001 From: Gert Goet Date: Tue, 16 Oct 2018 19:27:57 +0200 Subject: [PATCH 2/7] Apply errify --- src/otarta/core.cljs | 17 +++++++------ src/otarta/errify.cljc | 1 + src/otarta/format.cljs | 55 ++++++++++++++++++++++-------------------- src/otarta/main.cljs | 4 +-- 4 files changed, 41 insertions(+), 36 deletions(-) diff --git a/src/otarta/core.cljs b/src/otarta/core.cljs index 8fc4462..716b080 100644 --- a/src/otarta/core.cljs +++ b/src/otarta/core.cljs @@ -11,7 +11,9 @@ [lambdaisland.uri :as uri] [otarta.format :as fmt :refer [PayloadFormat]] [otarta.packet :as packet] - [otarta.util :as util :refer-macros [ err-> err->>]])) + [otarta.util :as util :refer-macros []] + [otarta.errify :refer [errify errify>> errify> err-through> err-through>>] + :refer-macros [err-> err->>]])) (defn timeout-channels-wont-prevent-nodejs-exit! @@ -358,12 +360,11 @@ WARNING: Connecting with a client-id that's already in use results in the existi to-publish {:topic (app-topic->broker-topic client app-topic) :retain? retain? :payload payload - :empty? (empty-payload? payload)} - [fmt-err formatted] (fmt/write format to-publish)] - (if fmt-err - [fmt-err nil] - (do (>! sink (packet/publish formatted)) - [nil {}]))))) + :empty? (empty-payload? payload)}] + (err-> format + (fmt/write to-publish) + (errify> packet/publish) + (err-through> (partial async/put! sink)))))) (defn- did-connect! [client] @@ -414,7 +415,7 @@ WARNING: Connecting with a client-id that's already in use results in the existi :topic (broker-topic->app-topic client topic)}) subscription-xf (comp pkts-for-topic-filter (map pkt->msg) - (map (comp second msg-reader)) + (map msg-reader) (remove nil?))] [nil (capture-all-packets source subscription-xf)])) diff --git a/src/otarta/errify.cljc b/src/otarta/errify.cljc index 258cfe6..3b30457 100644 --- a/src/otarta/errify.cljc +++ b/src/otarta/errify.cljc @@ -79,6 +79,7 @@ (errify> send-invite {:err-when nil? :err-fn-pre #(vector :invite-failed (:email %))}))) + (defn err-through> "Like `errify>` but passes input through when no errors occur. diff --git a/src/otarta/format.cljs b/src/otarta/format.cljs index 35a784f..b86e73c 100644 --- a/src/otarta/format.cljs +++ b/src/otarta/format.cljs @@ -4,7 +4,7 @@ [cljs.reader :as reader] [cognitect.transit :as transit] [goog.crypt :as crypt] - [otarta.util :refer-macros [err-> err->>]] + [otarta.errify :refer [errify errify>> errify>] :refer-macros [err-> err->>]] [huon.log :refer [debug info warn error]])) @@ -33,7 +33,7 @@ (def string (reify PayloadFormat (-read [_ buff] - (info :read-string) + (info :read-string {:buff buff}) (crypt/utf8ByteArrayToString buff)) (-write [_ v] (info :write-string {:value v}) @@ -104,20 +104,15 @@ This makes writing records impossible." (defn msg-formatter [rw format] - (if-let [payload-format (find-payload-format format)] - (let [formatter (fn [{e? :empty? :as msg}] - (info :formatter) - (let [try-format #(try (rw payload-format %) - (catch js/Error _ - (error :format-error) - nil)) - format-fn (if e? (partial rw empty) try-format) - formatted-payload (-> msg :payload format-fn)] - (if (nil? formatted-payload) - [:format-error nil] - [nil (assoc msg :payload formatted-payload)])))] - [nil formatter]) - [:unkown-format nil])) + (when-let [payload-format (find-payload-format format)] + (fn [{e? :empty? :as msg}] + (info :formatter) + (let [try-format #(try (rw payload-format %) + (catch js/Error _ + (error :format-error) + nil)) + format-fn (if e? (partial rw empty) try-format)] + (update msg :payload format-fn))))) (defn write @@ -139,12 +134,16 @@ This makes writing records impossible." ;; => [nil {:payload #object[Uint8Array ]}] " ([format] - (err->> format - (msg-formatter -write))) + (let [acquire-writer (partial msg-formatter -write)] + (err->> format + (errify>> acquire-writer {:err-when nil? + :err :unknown-format})))) ([format msg] - (err-> format - (write) - (apply (list msg))))) + (let [apply-writer #(% msg)] + (err-> format + (write) + (errify> apply-writer {:err-when (comp nil? :payload) + :err :format-error}))))) (defn read @@ -166,9 +165,13 @@ This makes writing records impossible." ;; => [nil {:empty? true :payload \"\"}] " ([format] - (err->> format - (msg-formatter -read))) + (let [acquire-reader (partial msg-formatter -read)] + (err->> format + (errify>> acquire-reader {:err-when nil? + :err :unknown-format})))) ([format msg] - (err-> format - read - (apply (list msg))))) + (let [apply-reader #(apply % (list msg))] + (err-> format + read + (errify> apply-reader {:err-when (comp nil? :payload) + :err :format-error}))))) diff --git a/src/otarta/main.cljs b/src/otarta/main.cljs index 50cee76..b419d17 100644 --- a/src/otarta/main.cljs +++ b/src/otarta/main.cljs @@ -26,7 +26,7 @@ (go-loop [] ;; TODO dist. between empty?/verbose when printing? (when-let [{:keys [payload empty?] :as m} ( Date: Tue, 16 Oct 2018 20:20:13 +0200 Subject: [PATCH 3/7] Apply some more --- src/otarta/core.cljs | 71 +++++++++++++++++++++--------------------- src/otarta/errify.cljc | 11 +++++-- src/otarta/main.cljs | 2 +- 3 files changed, 45 insertions(+), 39 deletions(-) diff --git a/src/otarta/core.cljs b/src/otarta/core.cljs index 716b080..2a519d3 100644 --- a/src/otarta/core.cljs +++ b/src/otarta/core.cljs @@ -12,7 +12,8 @@ [otarta.format :as fmt :refer [PayloadFormat]] [otarta.packet :as packet] [otarta.util :as util :refer-macros []] - [otarta.errify :refer [errify errify>> errify> err-through> err-through>>] + [otarta.errify :refer [errify errify>> errify> + err-through> err-through>> ] :refer-macros [err-> err->>]])) @@ -231,25 +232,24 @@ This function ensures that if there's no other activity keeping the event loop r (let [{:keys [control-ch stop-status] :as pinger} {:control-ch (async/promise-chan) :stop-status (async/promise-chan)}] - (go - (go-loop [sleep-seconds delay] - (info :pinger {:sleep-seconds sleep-seconds}) - (let [sleep (async/timeout (* 1000 sleep-seconds)) - stop! (fn [] - (info :pinger :stopping) - (async/put! stop-status {:reason :stopped})) - failed! (fn [err] - (error :pinger err) - (async/put! stop-status {:reason :failed :error err})) - [v ch] (async/alts! [control-ch sleep])] - (if v - (stop!) - (let [[err _] ( stream deref :close-status) pinger-stopped? (-> pinger deref :stop-status)] (go - (go - (let [[v ch] (async/alts! [control-ch stream-closed? pinger-stopped?])] - (info :connection-watcher :received {:v v}) - (condp = ch - stream-closed? (do (error :connection-watcher :stream-closed) - (stop-pinger client)) - pinger-stopped? (do (error :connection-watcher :pinger-stopped) - (stream-disconnect client)) - control-ch (info :connection-watcher :intentional-stop) - (error :connection-watcher :unkown-event)))) - (update client :connection-watcher reset! watcher) - [nil client]))) + (let [[v ch] (async/alts! [control-ch stream-closed? pinger-stopped?])] + (info :connection-watcher :received {:v v}) + (condp = ch + stream-closed? (do (error :connection-watcher :stream-closed) + (stop-pinger client)) + pinger-stopped? (do (error :connection-watcher :pinger-stopped) + (stream-disconnect client)) + control-ch (info :connection-watcher :intentional-stop) + (error :connection-watcher :unkown-event)))) + (update client :connection-watcher reset! watcher))) (defn- stop-connection-watcher [{watcher :connection-watcher :as client}] @@ -349,8 +347,8 @@ WARNING: Connecting with a client-id that's already in use results in the existi ( client (stream-connect) (mqtt-connect) - (start-pinger {:delay (:keep-alive config)}) - (start-connection-watcher)))) + ( #(start-pinger % {:delay (:keep-alive config)})) + ( start-connection-watcher)))) (defn- publish* [{stream :stream :as client} app-topic payload {:keys [format retain?] :or {format :string retain? false}}] @@ -417,7 +415,7 @@ WARNING: Connecting with a client-id that's already in use results in the existi (map pkt->msg) (map msg-reader) (remove nil?))] - [nil (capture-all-packets source subscription-xf)])) + (capture-all-packets source subscription-xf))) (defn- send-subscribe-and-await-suback [{stream :stream :as client} subscriptions] @@ -443,7 +441,8 @@ WARNING: Connecting with a client-id that's already in use results in the existi (let [topic-filter (app-topic->broker-topic client app-topic-filter) [sub-err sub-ch] (err->> format fmt/read - (subscription-chan client topic-filter))] + (errify>> + (partial subscription-chan client topic-filter)))] (if sub-err [sub-err nil] (let [sub {:topic-filter topic-filter diff --git a/src/otarta/errify.cljc b/src/otarta/errify.cljc index 3b30457..c1c8da2 100644 --- a/src/otarta/errify.cljc +++ b/src/otarta/errify.cljc @@ -1,6 +1,6 @@ (ns otarta.errify - #?(:cljs (:require-macros - [otarta.errify :refer [err-> err->>]]))) + #?(:cljs (:require [cljs.core.async :as async])) + #?(:clj (:require [clojure.core.async :as async]))) (defn errify "Turns `op` into function that can be used in err-*. @@ -116,6 +116,13 @@ [nil v])))) +(defn + ([op v] + ( op v {})) + ([op opts v] + (async/go (err-through> op opts v)))) + + #?(:clj (defmacro ^:private err-* "Internal, use err-> or err->>." diff --git a/src/otarta/main.cljs b/src/otarta/main.cljs index b419d17..04c6919 100644 --- a/src/otarta/main.cljs +++ b/src/otarta/main.cljs @@ -18,7 +18,7 @@ (defn handle-sub [broker-url topic-filter] (info :handle-sub :broker-url broker-url :topic-filter topic-filter) (go - (reset! mqtt-client (mqtt/client broker-url)) + (reset! mqtt-client (mqtt/client broker-url {:keep-alive 10})) (let [[err {sub-ch :ch}] ( Date: Tue, 16 Oct 2018 21:12:24 +0200 Subject: [PATCH 4/7] Remove util.clj --- src/otarta/core.cljs | 11 +++--- src/otarta/errify.cljc | 52 +++++++++++++++++++++++++ src/otarta/util.clj | 86 ------------------------------------------ 3 files changed, 58 insertions(+), 91 deletions(-) delete mode 100644 src/otarta/util.clj diff --git a/src/otarta/core.cljs b/src/otarta/core.cljs index 2a519d3..458a784 100644 --- a/src/otarta/core.cljs +++ b/src/otarta/core.cljs @@ -9,12 +9,12 @@ [haslett.format :as ws-fmt] [huon.log :refer [debug info warn error]] [lambdaisland.uri :as uri] - [otarta.format :as fmt :refer [PayloadFormat]] - [otarta.packet :as packet] - [otarta.util :as util :refer-macros []] [otarta.errify :refer [errify errify>> errify> err-through> err-through>> ] - :refer-macros [err-> err->>]])) + :refer-macros [err-> err->> ]] + [otarta.format :as fmt :refer [PayloadFormat]] + [otarta.packet :as packet] + [otarta.util :as util])) (defn timeout-channels-wont-prevent-nodejs-exit! @@ -292,7 +292,8 @@ This function ensures that if there's no other activity keeping the event loop r (stream-disconnect client)) control-ch (info :connection-watcher :intentional-stop) (error :connection-watcher :unkown-event)))) - (update client :connection-watcher reset! watcher))) + (update client :connection-watcher reset! watcher) + client)) (defn- stop-connection-watcher [{watcher :connection-watcher :as client}] diff --git a/src/otarta/errify.cljc b/src/otarta/errify.cljc index c1c8da2..3947784 100644 --- a/src/otarta/errify.cljc +++ b/src/otarta/errify.cljc @@ -123,6 +123,58 @@ (async/go (err-through> op opts v)))) +#?(:clj + (defmacro ^:private ~init ~placeholder ~@(for [expr exprs] + `(let [result# (cljs.core.async/ + "Each of the forms should yield a tuple [err result] wrapped + in an async-channel. As long as err is nil, result is passed onto + the next form as the first argument. As soon as an err is not + nil (or there are no more forms), the tuple is returned. + + Example: + (go + (let [[err result] ( {:url \"some-url\"} + (connect {:keep-alive 10}) + (do-query {:some :selector})))] + (if err + (println \"Something went wrong:\" err) + (println \"Query result:\" result)))) +" + [x & forms] + `(cljs.core.async.macros/go (cljs.core.async/ (cljs.core.async.macros/go [nil ~x]) ~@forms))))) + + +#?(:clj + (defmacro > + "Each of the forms should yield a tuple [err result] wrapped + in an async-channel. As long as err is nil, result is passed onto + the next form as the last argument. As soon as err is not + nil (or there are no more forms), the tuple is returned. + + Example: + (go + (let [[err result] (> connect-opts + (connect client)))] + (if err + (println \"Something went wrong:\" err) + (println \"Connect result:\" result)))) +" + [x & forms] + `(cljs.core.async.macros/go (cljs.core.async/> (cljs.core.async.macros/go [nil ~x]) ~@forms))))) + + #?(:clj (defmacro ^:private err-* "Internal, use err-> or err->>." diff --git a/src/otarta/util.clj b/src/otarta/util.clj deleted file mode 100644 index 75f9221..0000000 --- a/src/otarta/util.clj +++ /dev/null @@ -1,86 +0,0 @@ -(ns otarta.util) - - -(defmacro ^:private ~init ~placeholder ~@(for [expr exprs] - `(let [result# (cljs.core.async/ - "Each of the forms should yield a tuple [err result] wrapped - in an async-channel. As long as err is nil, result is passed onto - the next form as the first argument. As soon as an err is not - nil (or there are no more forms), the tuple is returned. - - Example: - (go - (let [[err result] ( {:url \"some-url\"} - (connect {:keep-alive 10}) - (do-query {:some :selector})))] - (if err - (println \"Something went wrong:\" err) - (println \"Query result:\" result)))) -" - [x & forms] - `(cljs.core.async.macros/go (cljs.core.async/ (cljs.core.async.macros/go [nil ~x]) ~@forms)))) - - -(defmacro > - "Each of the forms should yield a tuple [err result] wrapped - in an async-channel. As long as err is nil, result is passed onto - the next form as the last argument. As soon as err is not - nil (or there are no more forms), the tuple is returned. - - Example: - (go - (let [[err result] (> connect-opts - (connect client)))] - (if err - (println \"Something went wrong:\" err) - (println \"Connect result:\" result)))) -" - [x & forms] - `(cljs.core.async.macros/go (cljs.core.async/> (cljs.core.async.macros/go [nil ~x]) ~@forms)))) - - -(defmacro ^:private err-* - "Internal, use err-> or err->>." - [threading init & exprs] - (let [placeholder (gensym)] - `(as-> ~init ~placeholder ~@(for [expr exprs] - `(cond-> ~placeholder - (not (first ~placeholder)) - (~threading second ~expr)))))) - - -(defmacro err-> - "Threads the expr through the forms. Inserts x as the - second item in the first form, making a list of it if it is not a - list already. If there are more forms, inserts the first form as the - second item in second form, etc. - - Each form should return a tuple, where the left value is either nil - or an error, and the right value is a correct result. Threading - stops as soon as a form returns an error." - [x & forms] - `(err-* -> [nil ~x] ~@forms)) - - -(defmacro err->> - "Threads the expr through the forms. Inserts x as the last item in - the first form, making a list of it if it is not a list already. If - there are more forms, inserts the first form as the second item in - second form, etc. - - Each form should return a tuple, where the left value is either nil - or an error, and the right value is a correct result. Threading - stops as soon as a form returns an error." - [x & forms] - `(err-* ->> [nil ~x] ~@forms)) -- GitLab From 4cb389f8ab72d4265856721df0790549f5133ec7 Mon Sep 17 00:00:00 2001 From: Gert Goet Date: Tue, 16 Oct 2018 21:43:46 +0200 Subject: [PATCH 5/7] Fix tests --- test/otarta/core_test.cljs | 11 +++++------ test/otarta/format_test.cljs | 4 ++-- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/test/otarta/core_test.cljs b/test/otarta/core_test.cljs index daf443f..4dcebf2 100644 --- a/test/otarta/core_test.cljs +++ b/test/otarta/core_test.cljs @@ -9,7 +9,7 @@ [otarta.core :as sut] [otarta.format :as fmt :refer [PayloadFormat]] [otarta.packet :as pkt] - [otarta.util :refer-macros [err-> err->>]] + [otarta.errify :refer-macros [err-> err->>]] [otarta.test-helpers :as helpers :refer [test-async sub?]])) #_(log/enable!) @@ -130,17 +130,16 @@ {:empty? (= msg "") :topic topic :payload (str->uint8array msg)}))) - subscribe! #(-> %3 - (err->> (fmt/read) - (sut/subscription-chan %1 %2)) - second) + subscribe! (fn [client tfilter format] + (sut/subscription-chan client tfilter + (second (fmt/read format)))) messages-received (fn [ch] (async/close! ch) (async/into [] ch)) payloads-received #(go (map :payload ( Date: Tue, 16 Oct 2018 21:44:19 +0200 Subject: [PATCH 6/7] Revert main --- src/otarta/main.cljs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/otarta/main.cljs b/src/otarta/main.cljs index 04c6919..50cee76 100644 --- a/src/otarta/main.cljs +++ b/src/otarta/main.cljs @@ -18,7 +18,7 @@ (defn handle-sub [broker-url topic-filter] (info :handle-sub :broker-url broker-url :topic-filter topic-filter) (go - (reset! mqtt-client (mqtt/client broker-url {:keep-alive 10})) + (reset! mqtt-client (mqtt/client broker-url)) (let [[err {sub-ch :ch}] ( Date: Tue, 16 Oct 2018 21:54:58 +0200 Subject: [PATCH 7/7] Fix test --- src/otarta/core.cljs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/otarta/core.cljs b/src/otarta/core.cljs index 458a784..2b55319 100644 --- a/src/otarta/core.cljs +++ b/src/otarta/core.cljs @@ -415,7 +415,7 @@ WARNING: Connecting with a client-id that's already in use results in the existi subscription-xf (comp pkts-for-topic-filter (map pkt->msg) (map msg-reader) - (remove nil?))] + (remove (comp nil? :payload)))] (capture-all-packets source subscription-xf))) -- GitLab