Kafka Spark Streaming: Брокер может быть недоступен [Докер] - PullRequest
0 голосов
/ 17 февраля 2019

Я новичок в докере.Я пытаюсь запустить потоковое приложение с использованием докера.

У меня есть приложение kafka и потоковая передача Spark, работающее отдельно в 2 контейнерах.

Мой сервис kafka запущен и работает нормально.Я тестировал с $ KAFKA_HOME / bin / kafka-console-producer.sh и $ KAFKA_HOME / bin / kafka-console-consumer.sh.Я могу получать сообщения.

Но когда я запускаю мое приложение для потокового воспроизведения, оно показывает:

[Consumer clientId=consumer-1, groupId=consumer-spark] Connection to node -1 could not be established. Broker may not be available.

Итак, я не могуиспользовать сообщения.

kafka: docker-compose.yml

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    build: .
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENERS: PLAINTEXT://:9092
    depends_on: 
      - zookeeper
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

Код потоковой передачи Spark:

val sparkConf = new SparkConf().setAppName("Twitter Ingest Data")
    sparkConf.setIfMissing("spark.master", "local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    val kafkaTopics = "sentiment"
    val kafkaBroker = "kafka:9092"

    val topics : Set[String] = kafkaTopics.split(",").map(_.trim).toSet
    val kafkaParams = Map[String,Object](
      "bootstrap.servers" -> kafkaBroker,
              "group.id" -> "consumer-spark",
              "key.deserializer" -> classOf[StringDeserializer],
              "value.deserializer" -> classOf[StringDeserializer]
    )

    logger.info("Connecting to broker...")
    logger.info(s"kafkaParams: $kafkaParams")

    val tweetStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams))

Я не уверен, что что-то упустил.

Любая помощь будет высоко оценена !!

1 Ответ

0 голосов
/ 18 февраля 2019

Если вы новичок в Docker, я не рекомендовал бы, чтобы Kafka или Spark были первыми вещами, с которыми вы пытаетесь это сделать.Кроме того, похоже, что вы только что скопировали пример wurstmeister, не читая README о его настройке ... (что я могу сказать, потому что вам не нужно свойство build: ., потому что этот контейнер уже существует в DockerHub)

По сути, Kafka доступен только в вашей сети Docker через эту конфигурацию

KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092

Вам нужно будет отредактировать это, чтобы переадресация портов работала должным образом из-за пределов сети Docker Compose по умолчанию,или вы также должны запустить свой код Spark в контейнере.

Если код Spark отсутствует в контейнере, то указание на kafka:9092 не будет работать вообще

Ссылка. Слушатели Кафки объяснили

И множество предыдущих вопросов с похожими проблемами (проблема связана не только с Spark)

...