Я пытаюсь использовать базовый c вариант получения twitter
данных и помещения их в kafka topic
. Я ссылался на различные сообщения и, наконец, смог создать код для этого, но, к сожалению, не смог прочитать сообщения через потребителя. Ниже приведен мой искровой код
import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
import twitter4j.Status
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord }
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object Testing {
def main(args: Array[String]) {
val appName = "TwitterData"
val conf = new SparkConf()
conf.set("spark.master", "local")
conf.set("spark.app.name", appName)
val sc = new SparkContext(conf)
//create context
val ssc = new StreamingContext(sc, Seconds(10))
// values of Twitter API.
val consumerKey = "" // Your consumerKey
val consumerSecret = "" // your API secret
val accessToken = "" // your access token
val accessTokenSecret = "" // your token secret
//Connection to Twitter API
val cb = new ConfigurationBuilder
cb.setDebugEnabled(true).setOAuthConsumerKey(consumerKey).setOAuthConsumerSecret(consumerSecret).setOAuthAccessToken(accessToken).setOAuthAccessTokenSecret(accessTokenSecret)
val auth = new OAuthAuthorization(cb.build)
val tweets = TwitterUtils.createStream(ssc, Some(auth))
val englishTweets = tweets.filter(_.getLang() == "en")
val statuses = englishTweets.map(status => (status.getText(), status.getUser.getName(), status.getUser.getScreenName(), status.getCreatedAt.toString))
statuses.foreachRDD { (rdd, time) =>
print("INSIDE ForEACH")
rdd.foreachPartition { partitionIter =>
val props = new Properties()
val bootstrap = "localhost:9092" //-- your external ip of GCP VM, example: 10.0.0.1:9092
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("bootstrap.servers", bootstrap)
val producer = new KafkaProducer[String, String](props)
partitionIter.foreach { elem =>
val dat = elem.toString()
println("before data....")
print(dat)
val data = new ProducerRecord[String, String]("twitterData", null,dat) // "twitterData" is the name of Kafka topic
producer.send(data)
}
producer.flush()
producer.close()
}
}
ssc.start()
ssc.awaitTermination()
. Я использовал приведенную ниже команду для запуска consumer
для чтения сообщений, но не смог прочитать ни одного сообщения. Он не выдает никакой ошибки, но сообщения не отображаются в окне потребителя.
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic twitterData
Я попытался протестировать свой код и обнаружил, что он никогда не дойдет до утверждения "before data..."
, и я думаю, поэтому его не публикуют в топи c.
Ниже приведен пример сообщений консоли Eclipse, которые я получаю, когда начинаю выполнять этот код:
20/01/08 17:54:03 INFO MemoryStore: Block input-0-1578486242800 stored as values in memory (estimated size 88.0 KB, free 1971.2 MB)
20/01/08 17:54:03 INFO BlockManagerInfo: Added input-0-1578486242800 in memory on Siddhe:62039 (size: 88.0 KB, free: 1971.2 MB)
20/01/08 17:54:03 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
20/01/08 17:54:03 WARN BlockManager: Block input-0-1578486242800 replicated to only 0 peer(s) instead of 1 peers
20/01/08 17:54:03 INFO BlockGenerator: Pushed block input-0-1578486242800
20/01/08 17:54:03 INFO MemoryStore: Block input-0-1578486243000 stored as values in memory (estimated size 4.6 KB, free 1971.2 MB)
20/01/08 17:54:03 INFO BlockManagerInfo: Added input-0-1578486243000 in memory on Siddhe:62039 (size: 4.6 KB, free: 1971.2 MB)
20/01/08 17:54:03 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
20/01/08 17:54:03 WARN BlockManager: Block input-0-1578486243000 replicated to only 0 peer(s) instead of 1 peers
20/01/08 17:54:03 INFO BlockGenerator: Pushed block input-0-1578486243000
20/01/08 17:54:04 INFO MemoryStore: Block input-0-1578486243800 stored as values in memory (estimated size 112.1 KB, free 1971.1 MB)
20/01/08 17:54:04 INFO BlockManagerInfo: Added input-0-1578486243800 in memory on Siddhe:62039 (size: 112.1 KB, free: 1971.1 MB)
20/01/08 17:54:04 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
20/01/08 17:54:04 WARN BlockManager: Block input-0-1578486243800 replicated to only 0 peer(s) instead of 1 peers
20/01/08 17:54:04 INFO BlockGenerator: Pushed block input-0-1578486243800
Дайте мне знать, чего мне здесь не хватает.