Невозможно прочитать данные из твиттера, опубликованные в kafka topi c, используя spark scala - PullRequest
0 голосов
/ 08 января 2020

Я пытаюсь использовать базовый c вариант получения twitter данных и помещения их в kafka topic. Я ссылался на различные сообщения и, наконец, смог создать код для этого, но, к сожалению, не смог прочитать сообщения через потребителя. Ниже приведен мой искровой код

import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import twitter4j.Status
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord }
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object Testing {

  def main(args: Array[String]) {
    val appName = "TwitterData"
    val conf = new SparkConf()
    conf.set("spark.master", "local")
    conf.set("spark.app.name", appName)
    val sc = new SparkContext(conf)
    //create context
    val ssc = new StreamingContext(sc, Seconds(10))

    // values of Twitter API.
    val consumerKey = "" // Your consumerKey
    val consumerSecret = "" // your API secret
    val accessToken = "" // your access token
    val accessTokenSecret = "" // your token secret

    //Connection to Twitter API
    val cb = new ConfigurationBuilder
    cb.setDebugEnabled(true).setOAuthConsumerKey(consumerKey).setOAuthConsumerSecret(consumerSecret).setOAuthAccessToken(accessToken).setOAuthAccessTokenSecret(accessTokenSecret)

    val auth = new OAuthAuthorization(cb.build)
    val tweets = TwitterUtils.createStream(ssc, Some(auth))
    val englishTweets = tweets.filter(_.getLang() == "en")

    val statuses = englishTweets.map(status => (status.getText(), status.getUser.getName(), status.getUser.getScreenName(), status.getCreatedAt.toString))

    statuses.foreachRDD { (rdd, time) =>
    print("INSIDE ForEACH")
      rdd.foreachPartition { partitionIter =>
        val props = new Properties()
        val  bootstrap = "localhost:9092" //-- your external ip of GCP VM, example: 10.0.0.1:9092
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("bootstrap.servers", bootstrap)
        val producer = new KafkaProducer[String, String](props)
        partitionIter.foreach { elem =>
          val dat = elem.toString()
          println("before data....")
          print(dat)
          val data = new ProducerRecord[String, String]("twitterData", null,dat) // "twitterData" is the name of Kafka topic
          producer.send(data)
        }
        producer.flush()
        producer.close()
      }
    }
    ssc.start()
    ssc.awaitTermination()

. Я использовал приведенную ниже команду для запуска consumer для чтения сообщений, но не смог прочитать ни одного сообщения. Он не выдает никакой ошибки, но сообщения не отображаются в окне потребителя.

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic twitterData  

Я попытался протестировать свой код и обнаружил, что он никогда не дойдет до утверждения "before data...", и я думаю, поэтому его не публикуют в топи c.

Ниже приведен пример сообщений консоли Eclipse, которые я получаю, когда начинаю выполнять этот код:

20/01/08 17:54:03 INFO MemoryStore: Block input-0-1578486242800 stored as values in memory (estimated size 88.0 KB, free 1971.2 MB)
20/01/08 17:54:03 INFO BlockManagerInfo: Added input-0-1578486242800 in memory on Siddhe:62039 (size: 88.0 KB, free: 1971.2 MB)
20/01/08 17:54:03 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
20/01/08 17:54:03 WARN BlockManager: Block input-0-1578486242800 replicated to only 0 peer(s) instead of 1 peers
20/01/08 17:54:03 INFO BlockGenerator: Pushed block input-0-1578486242800
20/01/08 17:54:03 INFO MemoryStore: Block input-0-1578486243000 stored as values in memory (estimated size 4.6 KB, free 1971.2 MB)
20/01/08 17:54:03 INFO BlockManagerInfo: Added input-0-1578486243000 in memory on Siddhe:62039 (size: 4.6 KB, free: 1971.2 MB)
20/01/08 17:54:03 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
20/01/08 17:54:03 WARN BlockManager: Block input-0-1578486243000 replicated to only 0 peer(s) instead of 1 peers
20/01/08 17:54:03 INFO BlockGenerator: Pushed block input-0-1578486243000
20/01/08 17:54:04 INFO MemoryStore: Block input-0-1578486243800 stored as values in memory (estimated size 112.1 KB, free 1971.1 MB)
20/01/08 17:54:04 INFO BlockManagerInfo: Added input-0-1578486243800 in memory on Siddhe:62039 (size: 112.1 KB, free: 1971.1 MB)
20/01/08 17:54:04 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
20/01/08 17:54:04 WARN BlockManager: Block input-0-1578486243800 replicated to only 0 peer(s) instead of 1 peers
20/01/08 17:54:04 INFO BlockGenerator: Pushed block input-0-1578486243800

Дайте мне знать, чего мне здесь не хватает.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...