Отправка потоковых данных в формате JSON в Java / Scala - PullRequest
0 голосов
/ 05 января 2019

Я привык к Python и использую библиотеки Scala Spark Streaming для обработки потоковых данных Twitter в реальном времени. Прямо сейчас я могу отправить в виде строки, однако мой потоковый сервис требует JSON. Есть ли способ, которым я могу легко адаптировать свой код для отправки в виде словаря JSON вместо строки?

%scala

import scala.collection.JavaConverters._
import com.microsoft.azure.eventhubs._
import java.util.concurrent._

val namespaceName = "hubnamespace"
val eventHubName = "hubname"
val sasKeyName = "RootManageSharedAccessKey"
val sasKey = "key"
val connStr = new ConnectionStringBuilder()
            .setNamespaceName(namespaceName)
            .setEventHubName(eventHubName)
            .setSasKeyName(sasKeyName)
            .setSasKey(sasKey)

val pool = Executors.newFixedThreadPool(1)
val eventHubClient = EventHubClient.create(connStr.toString(), pool)

def sendEvent(message: String) = {
  val messageData = EventData.create(message.getBytes("UTF-8"))
  // CONVERT IT HERE?
  eventHubClient.get().send(messageData)
  System.out.println("Sent event: " + message + "\n")
}

import twitter4j._
import twitter4j.TwitterFactory
import twitter4j.Twitter
import twitter4j.conf.ConfigurationBuilder

val twitterConsumerKey = "key"
val twitterConsumerSecret = "key"
val twitterOauthAccessToken = "key"
val twitterOauthTokenSecret = "key"

val cb = new ConfigurationBuilder()
  cb.setDebugEnabled(true)
  .setOAuthConsumerKey(twitterConsumerKey)
  .setOAuthConsumerSecret(twitterConsumerSecret)
  .setOAuthAccessToken(twitterOauthAccessToken)
  .setOAuthAccessTokenSecret(twitterOauthTokenSecret)

val twitterFactory = new TwitterFactory(cb.build())
val twitter = twitterFactory.getInstance()

val query = new Query(" #happynewyear ")
query.setCount(100)
query.lang("en")
var finished = false
while (!finished) {
  val result = twitter.search(query)
  val statuses = result.getTweets()
  var lowestStatusId = Long.MaxValue
  for (status <- statuses.asScala) {
    if(!status.isRetweet()){
      sendEvent(status.getText())
    }
    lowestStatusId = Math.min(status.getId(), lowestStatusId)
    Thread.sleep(2000)
  }
  query.setMaxId(lowestStatusId - 1)
}

 eventHubClient.get().close()

1 Ответ

0 голосов
/ 05 января 2019

В Scala нет собственного способа преобразования строки в Json, вам нужно использовать внешнюю библиотеку. Я рекомендую использовать Jackson. Если вы используете gradle, вы можете добавить такую ​​зависимость: compile("com.fasterxml.jackson.module:jackson-module-scala_2.12"). (Используйте соответствующую версию Scala)

Затем вы можете просто преобразовать ваш объект данных в JSON следующим образом:

val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)

val json = valueToTree(messageData)

Я настоятельно рекомендую вам приложить усилия к Джексону, вам это сильно понадобится, если вы будете работать с JSON.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...