Чтение с Кафки с Scala Spark2 Streaming - PullRequest
0 голосов
/ 20 февраля 2020

Мне нужно подключиться к Kafka и прочитать данные из него (после этого я должен записать в базу данных ElasticSearch), но сейчас я просто хочу читать и печатать данные ..

Я новичок ie и с Кафкой, и с Scala, и с чтением в целых числах rnet Я закодировал это:

//spark
import org.apache.spark._
import org.apache.spark.streaming._

//kafka
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe



object Main extends App{

val master = "local[2]"
val hostname = ""

val conf = new SparkConf().setAppName("KafkaConnection").setMaster(master)
val sc = SparkContext.getOrCreate(conf)
val ssc = new StreamingContext(sc, Seconds(1))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "IRC",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,  PreferConsistent,  Subscribe[String, String](topics, kafkaParams))

stream.map(record => (record.key, record.value))

val offsetRanges = Array(
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange("test", 0, 0, 100),
  OffsetRange("test", 1, 0, 100)
)

val rdd = KafkaUtils.createRDD[String, String](
    ssc, kafkaParams, offsetRanges, PreferConsistent)

}

Но я не знаю, как продолжить. Что мне сейчас нужно? Кроме того, знаете ли вы какой-либо Publi c Kafka Broker / topi c, который я могу использовать для чтения из него?

Заранее спасибо!

1 Ответ

0 голосов
/ 20 февраля 2020

Что мне сейчас нужно?

Попробуйте запустить код. spark-submit или запустите основной метод.

знаете ли вы какие-либо публикации c Kafka Broker / topi c, которые я могу использовать для чтения из него?

Это было бы небезопасно, так что нет. Начните работу с собственными брокерами, следуя официальным руководствам Kafka.

Ваш код в настоящее время читает из топи c с именем test

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...