Я новичок в докере.Я пытаюсь запустить потоковое приложение с использованием докера.
У меня есть приложение kafka и потоковая передача Spark, работающее отдельно в 2 контейнерах.
Мой сервис kafka запущен и работает нормально.Я тестировал с $ KAFKA_HOME / bin / kafka-console-producer.sh и $ KAFKA_HOME / bin / kafka-console-consumer.sh.Я могу получать сообщения.
Но когда я запускаю мое приложение для потокового воспроизведения, оно показывает:
[Consumer clientId=consumer-1, groupId=consumer-spark] Connection to node -1 could not be established. Broker may not be available.
Итак, я не могуиспользовать сообщения.
kafka: docker-compose.yml
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
build: .
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_LISTENERS: PLAINTEXT://:9092
depends_on:
- zookeeper
volumes:
- /var/run/docker.sock:/var/run/docker.sock
Код потоковой передачи Spark:
val sparkConf = new SparkConf().setAppName("Twitter Ingest Data")
sparkConf.setIfMissing("spark.master", "local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val kafkaTopics = "sentiment"
val kafkaBroker = "kafka:9092"
val topics : Set[String] = kafkaTopics.split(",").map(_.trim).toSet
val kafkaParams = Map[String,Object](
"bootstrap.servers" -> kafkaBroker,
"group.id" -> "consumer-spark",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer]
)
logger.info("Connecting to broker...")
logger.info(s"kafkaParams: $kafkaParams")
val tweetStream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams))
Я не уверен, что что-то упустил.
Любая помощь будет высоко оценена !!