Подключение к брокеру AWS AppSync MQTT с помощью Paho - PullRequest
5 голосов
/ 31 мая 2019

Сначала код выполняет HTTP-запрос с запросом GraphQL, чтобы получить URL-адрес подписки MQTT, идентификатор клиента и тему для подписки. Проект использует Paho 1.1.0, затем с этими данными пытается подключиться к брокеру MQTT. Когда код запускается, я могу получить только URL MQTT, но когда он пытается подключиться к AWS AppSync Broker, он генерирует исключение NullPointerException:

Exception in thread "main" MqttException (0) - java.lang.NullPointerException, compiling:(/tmp/form-init1329614074410829203.clj:1:73)
        at clojure.lang.Compiler.load(Compiler.java:7391)
        at clojure.lang.Compiler.loadFile(Compiler.java:7317)
        at clojure.main$load_script.invokeStatic(main.clj:275)
        at clojure.main$init_opt.invokeStatic(main.clj:277)
        at clojure.main$init_opt.invoke(main.clj:277)
        at clojure.main$initialize.invokeStatic(main.clj:308)
        at clojure.main$null_opt.invokeStatic(main.clj:342)
        at clojure.main$null_opt.invoke(main.clj:339)
        at clojure.main$main.invokeStatic(main.clj:421)
        at clojure.main$main.doInvoke(main.clj:384)
        at clojure.lang.RestFn.invoke(RestFn.java:421)
        at clojure.lang.Var.invoke(Var.java:383)
        at clojure.lang.AFn.applyToHelper(AFn.java:156)
        at clojure.lang.Var.applyTo(Var.java:700)
        at clojure.main.main(main.java:37)
Caused by: MqttException (0) - java.lang.NullPointerException
        at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:38)
        at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:664)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at org.eclipse.paho.client.mqttv3.internal.websocket.WebSocketHandshake.receiveHandshakeResponse(WebSocketHandshake.java:133)
        at org.eclipse.paho.client.mqttv3.internal.websocket.WebSocketHandshake.execute(WebSocketHandshake.java:74)
        at org.eclipse.paho.client.mqttv3.internal.websocket.WebSocketSecureNetworkModule.start(WebSocketSecureNetworkModule.java:77)
        at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:650)
        ... 1 more

Отладка Я понял, что брокер на самом деле возвращает 403 Forbidden

код:

project.clj

(defproject gateway-clj "0.1.0-SNAPSHOT"
  :description "test"
  :url "http://example.com/FIXME"
  :main gateway-clj.core
  :debug true
  :uberjar  {:aot :all}
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :dependencies [[org.clojure/clojure "1.8.0"]
                 [clojurewerkz/urly "1.0.0"]
                 [clj-http "3.10.0"]
                 [cheshire "5.8.1"]
                 [org.eclipse.paho/org.eclipse.paho.client.mqttv3 "1.1.0"]])

core.clj

   (ns gateway-clj.core
  (:refer-clojure :exclude [resolve])
  (:use clojurewerkz.urly.core)
  (require
   [clj-http.client :as client]
   [cheshire.core :refer :all])
  (import  [org.eclipse.paho.client.mqttv3 MqttCallback MqttAsyncClient MqttConnectOptions MqttTopic MqttException MqttDeliveryToken MqttClientPersistence]
           [org.eclipse.paho.client.mqttv3.persist MemoryPersistence]
           [java.util Properties]
           [java.net URI URL]
           [javax.net SocketFactory]))

(def data {:api-url "https://xxxxx.appsync-api.us-east-2.amazonaws.com/graphql"
           :request-conf {:headers {"X-Api-Key" "xxxxxx"}
                          :content-type :json
                          :body "{\"query\":\"mutation createRoom ($input: CreateRoomInput!) {\\n  createRoom(input:$input){\\n    id\\n  }\\n}\\n\\nquery listMessages{\\n  listMessages {\\n    items{\\n      id\\n      content\\n    }\\n  }\\n}\\n\\nquery listRooms {\\n  listRooms {\\n    items {\\n      id\\n      createdAt\\n    }\\n  }\\n}\\n\\nsubscription OnCreateMessage {\\n    onCreateMessage {\\n      __typename\\n      id\\n      when\\n      content\\n      roomId\\n    }\\n  }\\n \",\"variables\":{\"input\":{\"id\":\"room2\"}},\"operationName\":\"OnCreateMessage\"}"}})


(defn ^MqttClientPersistence new-memory-persister
  "Returns new in-memory persister"
  []
  (MemoryPersistence.))

(defn- mqtt-callback
  "Function called after delivery confirmation"
  []
  (reify MqttCallback
    (connectionLost [_ cause]
      (println (.toString cause)))
    (messageArrived [_ topic message]
      (println "Topic: " (.getName topic))
      (println "Message: " (.getPayload message)))
    (deliveryComplete [_ token]
      (println "*** DELIVERY COMPLETE ***"))))

(defn- mqtt-connect
  [topic broker-url client-id]
  (let [mqtt-conn-options (MqttConnectOptions.)
        url-map   (as-map (url-like broker-url))
        mqtt-client (MqttAsyncClient. broker-url client-id (new-memory-persister))]
    (println client-id)
    (println "----")
    (println broker-url)
    (doto mqtt-conn-options
      (.setCleanSession false)
      (.setKeepAliveInterval 30)
      (.setMqttVersion 0))

    (.setCallback mqtt-client (mqtt-callback))
    (let [mqtt-token (.connect mqtt-client mqtt-conn-options)]
      (println mqtt-token)
      (. mqtt-token waitForCompletion)
      mqtt-client)))

(defn -main [] 
  (let [res (client/post (get data :api-url) (get data :request-conf))
        body (parse-string (get res :body) true)
        mqtt-data (first (get-in body [:extensions :subscription :mqttConnections]))

        {mqtt-url :url
         mqtt-client-id :client
         mqtt-topic  :topic} mqtt-data
        mqtt-client (mqtt-connect mqtt-topic mqtt-url  mqtt-client-id)]
     (. mqtt-client subscribe mqtt-topic 0)))
...