Потребитель Spark не читает сообщения производителя Kafka Scala - PullRequest
0 голосов
/ 25 апреля 2018

Я пытаюсь создать производителя Kafka, связанного с потребителем Spark. Производитель работает нормально, однако потребитель в Spark по какой-то причине не читает данные из темы. Я запускаю kafka с помощью изображения spotify / kafka в docker-compose.

Вот мой потребитель:

object SparkConsumer {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("KafkaSparkStreaming")
      .master("local[*]")
      .getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Seconds(3))
    val topic1 = "topic1"

    def kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group1",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val lines = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set(topic1), kafkaParams)
    )
    lines.print()
}

Производитель Кафки выглядит так:

object KafkaProducer {

  def main(args: Array[String]) {

    val events = 10
    val topic = "topic1"
    val brokers = "localhost:9092"
    val random = new Random()
    val props = new Properties()

    props.put("bootstrap.servers", brokers)
    props.put("client.id", "KafkaProducerExample")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    val t = System.currentTimeMillis()
    for (nEvents <- Range(0, events)) {
      val key = null
      val values = "2017-11-07 04:06:03"
      val data = new ProducerRecord[String, String](topic, key, values)

      producer.send(data)
      System.out.println("sent : " + data.value())
    }

    System.out.println("sent per second: " + events * 1000 / (System.currentTimeMillis() - t))
    producer.close()
  }
}

UPDATE:

Мой файл docker-compose с Kafka:

version: '3.3'
services:
  kafka:
      image: spotify/kafka
      ports:
        - "9092:9092"

1 Ответ

0 голосов
/ 25 апреля 2018

Это распространенная проблема при использовании Kafka с Docker. Во-первых, вы должны проверить, какая конфигурация в zookeeper для вашей темы. Вы можете использовать скрипты Zookeeper внутри контейнера Kafka. Вероятно, когда ваша тема создана, ADVERTISED_HOST - это название вашей службы. Поэтому, когда потребитель пытается подключиться к брокеру, он возвращает «kafka» в качестве местоположения брокера. Поскольку вы работаете с потребителем вне сети докеров, ваш потребитель никогда не подключится к брокеру для потребления. Попробуйте установить env для вашего контейнера kafka с ADVERTISED_HOST = localhost.

...