Потребитель Spark не читает сообщения Кафки Scala - PullRequest
0 голосов
/ 22 января 2019

Я определил приложение Spark для потребителей и производителей Kafka. Однако сообщения не попадают в Spark. Поэтому я не уверен, где именно проблема - на стороне потребителя или производителя. Я развернул Kafka с Kubernetes и определил службы NodePort для реестра схемы и Kafka, ссылки на которые я определил как серверы начальной загрузки и URL-адрес реестра схемы. В чем может быть причина этой ошибки?

Конфигурация Кафки:

  val props = new Properties()
  props.put("bootstrap.servers", "192.168.99.100:32400")
  props.put("client.id", "avro_data")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("schema.registry.url", "http://192.168.99.100:32500")
val producer = new KafkaProducer[String, Object](props)

val data = new ProducerRecord[String, Object]("test_topic", getRandomAvroRecord())
producer.send(data)

Конфигурация искры:

def kafkaParams() = Map[String, Object](
      "bootstrap.servers" -> "192.168.99.100:32400",
      "schema.registry.url" -> "http://192.168.99.100:32500",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[KafkaAvroDeserializer],
      "group.id" -> "avro_data",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

   val lines = KafkaUtils.createDirectStream[String, Object](
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, Object](Array("test_topic"), kafkaParams())
    )

Определение службы порта узла для Kafka (аналогично для реестра схемы):

kind: Service
apiVersion: v1
metadata:
  name: kafka-service
spec:
  selector:
    app: cp-kafka
    release: my-confluent-oss
  ports:
    - protocol: TCP
      targetPort: 9092
      port: 32400
      nodePort: 32400
  type: NodePort

ОБНОВЛЕНИЕ: я вижу такие сообщения в журнале Spark Application:

INFO ConsumerCoordinator: [Consumer clientId=consumer-1, groupId=avro_data] Revoking previously assigned partitions []
INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=avro_data] (Re-)joining group
WARN NetworkClient: [Consumer clientId=consumer-1, groupId=avro_data] Connection to node 2147483645 (/172.17.0.17:9092) could not be established. Broker may not be available.
INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=avro_data] Group coordinator 172.17.0.17:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery
WARN NetworkClient: [Consumer clientId=consumer-1, groupId=avro_data] Connection to node 1 (/172.17.0.14:9092) could not be established. Broker may not be available.
WARN NetworkClient: [Consumer clientId=consumer-1, groupId=avro_data] Connection to node 2 (/172.17.0.17:9092) could not be established. Broker may not be available.

Конфигурации Kafka StatefulSet:

 Command:
      sh
      -exc
      export KAFKA_BROKER_ID=${HOSTNAME##*-} && \
      export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${POD_IP}:9092,EXTERNAL://${HOST_IP}:$((31090 + ${KAFKA_BROKER_ID})) && \
      exec /etc/confluent/docker/run

    Environment:
      POD_IP:                                   (v1:status.podIP)
      HOST_IP:                                  (v1:status.hostIP)
      KAFKA_ZOOKEEPER_CONNECT:                 my-confluent-oss-cp-zookeeper-headless:2181
      KAFKA_ADVERTISED_LISTENERS:              EXTERNAL://${HOST_IP}:$((31090 + ${KAFKA_BROKER_ID}))
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:    PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...