From c457949169bc64b263791909f19772298324dc8d Mon Sep 17 00:00:00 2001 From: Gert Goet Date: Thu, 27 Sep 2018 00:53:27 +0200 Subject: [PATCH 1/4] Sleep before sending first PINGREQ --- src/otarta/core.cljs | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/src/otarta/core.cljs b/src/otarta/core.cljs index e113aee..70b4e5d 100644 --- a/src/otarta/core.cljs +++ b/src/otarta/core.cljs @@ -202,20 +202,18 @@ control-ch (async/promise-chan)] (info :start-pinger) (go-loop [n 0] - (info :start-pinger :sending-ping n) - (let [next-pingresp (capture-first-packet source - (packet-filter {[:first-byte :type] :pingresp})) - [err resp] ( Date: Thu, 27 Sep 2018 00:53:51 +0200 Subject: [PATCH 2/4] Formatting --- src/otarta/core.cljs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/otarta/core.cljs b/src/otarta/core.cljs index 70b4e5d..9f3f6f8 100644 --- a/src/otarta/core.cljs +++ b/src/otarta/core.cljs @@ -234,11 +234,14 @@ The root-topic is prepended to all subscribes/publishes and ensures that the cli [{:keys [broker-url default-root-topic] :as opts}] {:pre [broker-url]} (let [default-opts {:keep-alive 60 :client-id (str "otarta-" (random-uuid))} - config (-> broker-url - (parse-broker-url {:default-root-topic default-root-topic}) - (merge default-opts) - (merge (select-keys opts [:client-id :keep-alive])))] - {:config config :stream (atom nil) :pinger (atom nil) :packet-identifier (atom 0)})) + config (-> broker-url + (parse-broker-url {:default-root-topic default-root-topic}) + (merge default-opts) + (merge (select-keys opts [:client-id :keep-alive])))] + {:config config + :packet-identifier (atom 0) + :pinger (atom nil) + :stream (atom nil)})) (defn connect @@ -309,7 +312,7 @@ The root-topic is prepended to all subscribes/publishes and ensures that the cli (defn- subscription-chan [{stream :stream :as client} topic-filter payload-formatter] - (let [{source :source} @stream + (let [{source :source} @stream pkts-for-topic-filter (packet-filter {[:remaining-bytes :topic] (partial topic-filter-matches-topic? topic-filter)}) -- GitLab From fc5b992eb02bdb4b64a09c3bf30a6a5d05a417c2 Mon Sep 17 00:00:00 2001 From: Gert Goet Date: Thu, 27 Sep 2018 13:06:20 +0200 Subject: [PATCH 3/4] Allow for pinger to be instructed to skip --- src/otarta/core.cljs | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/src/otarta/core.cljs b/src/otarta/core.cljs index 9f3f6f8..a7e2f51 100644 --- a/src/otarta/core.cljs +++ b/src/otarta/core.cljs @@ -199,16 +199,21 @@ {keep-alive :keep-alive} :config :as client}] (go (let [{:keys [sink source]} @stream - control-ch (async/promise-chan)] + control-ch (async/chan (async/sliding-buffer 1))] (info :start-pinger) (go-loop [n 0] (let [[v ch] (async/alts! [control-ch (async/timeout (* 1000 keep-alive))]) + stop? (= v :stop) + skip? (= v :skip) next-pingresp (capture-first-packet source (packet-filter {[:first-byte :type] :pingresp}))] - (if v + (if stop? (info :pinger :stopping) - (let [[err resp] (! @pinger :stop)) [nil client])) +(defn- skip-ping [{pinger :pinger :as client}] + (info :skip-ping {:pinger pinger}) + (go + (and @pinger (>! @pinger :skip)) + [nil client])) + + (defn client "Accepts the following parameters: - broker-url (required) - url of the form ws(s)://(user:password@)host:12345/path(#some/root-topic). @@ -294,8 +306,10 @@ The root-topic is prepended to all subscribes/publishes and ensures that the cli (#(apply % (list to-publish))))] (if fmt-err [fmt-err nil] - (do (>! sink (packet/publish formatted)) - [nil {}]))))) + (do + (>! sink (packet/publish formatted)) + (skip-ping client) + [nil {}]))))) (defn publish -- GitLab From b1d62b0e1e18a2eb05390dbfdb3033a523f570f4 Mon Sep 17 00:00:00 2001 From: Gert Goet Date: Thu, 27 Sep 2018 13:28:43 +0200 Subject: [PATCH 4/4] Improve commented example --- src/otarta/main.cljs | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/src/otarta/main.cljs b/src/otarta/main.cljs index fc91e58..59987d5 100644 --- a/src/otarta/main.cljs +++ b/src/otarta/main.cljs @@ -2,6 +2,7 @@ (:require-macros [cljs.core.async.macros :refer [go go-loop]]) (:require + [cljs.core.async :as async] [goog.crypt :as crypt] [huon.log :as log :refer [debug info warn error]] [otarta.core :as mqtt] @@ -58,21 +59,32 @@ (comment - (def client (mqtt/client {:broker-url "ws://localhost:9001"})) + (def client2 (mqtt/client {:broker-url "ws://localhost:9001#some/root/topic" :keep-alive 10})) + (def sub1 (atom nil)) + + (go - (let [[err {sub-ch :ch}] (