на ноутбуке spark jupyter не отображается вывод консоли scala - PullRequest
0 голосов
/ 05 июля 2018

1) Я изучаю потоковую передачу и сталкиваюсь с проблемами, когда ничего не отображается (println через sendEVent) на консоли (scala). Далее я попытался внедрить строку println ("xyz") и обнаружил, что она печатается только в том случае, если они не встроены в блок блока while ... в противном случае она не будет напечатана даже перед циклом while. Я поместил еще несколько строк этих println ("xyz") и обнаружил, что некоторые могут быть заблокированы ... и распечатана только последняя.

Ранее я также дважды подключал два разных кода для потоковой передачи Storm: ничего не печаталось из ноутбука Jupyter, но на Scala Shell все было в порядке.

2) Также мне интересно в тех awaitTermination (), таких как: messages.writeStream.outputMode ("append"). format ("console"). option ("truncate", false) .start (). awaitTermination () (я также не получаю вывод из консоли)

или те "бесконечные петли", как показано ниже: закончено = ложь пока (! закончено) {................. ..}

они ждут тяжелого перерыва, как остановка или [CTR] C ... или как их правильно сломать? поэтому следующая строка будет выполнена. Я так растерялся, что автор, пишущий образцы / учебники, ничего не объяснил по этому поводу.

enter code here

import java.util._
import scala.collection.JavaConverters._
import java.util.concurrent._

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.eventhubs.ConnectionStringBuilder

// Event hub configurations
// Replace values below with yours        
val eventHubName = "<Event hub name>"
val eventHubNSConnStr = "<Event hub namespace connection string>"
val connStr = ConnectionStringBuilder (eventHubNSConnStr)
.setEventHubName(eventHubName).build 

import com.microsoft.azure.eventhubs._
val pool = Executors.newFixedThreadPool(1)
val eventHubClient = EventHubClient.create(connStr.toString(), pool)

def sendEvent(message: String) = {
  val messageData = EventData.create(message.getBytes("UTF-8"))
  eventHubClient.get().send(messageData)
  println("Sent event: " + message + "\n")
}


import twitter4j._
import twitter4j.TwitterFactory
import twitter4j.Twitter
import twitter4j.conf.ConfigurationBuilder

// Twitter application configurations
// Replace values below with yours   
val twitterConsumerKey = "<CONSUMER KEY>"
val twitterConsumerSecret = "<CONSUMER SECRET>"
val twitterOauthAccessToken = "<ACCESS TOKEN>"
val twitterOauthTokenSecret = "<TOKEN SECRET>"

val cb = new ConfigurationBuilder()
cb.setDebugEnabled
(true).setOAuthConsumerKeywitterConsumerKey).setOAuthConsumerSecret
(twitterConsumerSecret).setOAuthAccessToken
(twitterOauthAccessToken).setOAuthAccessTokenSecret(twitterOauthTokenSecret)

val twitterFactory = new TwitterFactory(cb.build())
val twitter = twitterFactory.getInstance()

//Getting tweets with keyword "Azure" and sending them to Event Hub realtime 

val query = new Query(" #Azure ")
query.setCount(100)
query.lang("en")
var finished = false
while (!finished) {
  val result = twitter.search(query)
  val statuses = result.getTweets()
  var lowestStatusId = Long.MaxValue
  for (status <- statuses.asScala) {
  if(!status.isRetweet()){
  sendEvent(status.getText())
}
  lowestStatusId = Math.min(status.getId(), lowestStatusId)
  Thread.sleep(2000)
}
query.setMaxId(lowestStatusId - 1)
}

// Closing connection to the Event Hub
eventHubClient.get().close()
...