Я определил приложение Spark для потребителей и производителей Kafka. Однако сообщения не попадают в Spark. Поэтому я не уверен, где именно проблема - на стороне потребителя или производителя. Я развернул Kafka с Kubernetes и определил службы NodePort для реестра схемы и Kafka, ссылки на которые я определил как серверы начальной загрузки и URL-адрес реестра схемы.
В чем может быть причина этой ошибки?
Конфигурация Кафки:
val props = new Properties()
props.put("bootstrap.servers", "192.168.99.100:32400")
props.put("client.id", "avro_data")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
props.put("schema.registry.url", "http://192.168.99.100:32500")
val producer = new KafkaProducer[String, Object](props)
val data = new ProducerRecord[String, Object]("test_topic", getRandomAvroRecord())
producer.send(data)
Конфигурация искры:
def kafkaParams() = Map[String, Object](
"bootstrap.servers" -> "192.168.99.100:32400",
"schema.registry.url" -> "http://192.168.99.100:32500",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[KafkaAvroDeserializer],
"group.id" -> "avro_data",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val lines = KafkaUtils.createDirectStream[String, Object](
streamingContext,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, Object](Array("test_topic"), kafkaParams())
)
Определение службы порта узла для Kafka (аналогично для реестра схемы):
kind: Service
apiVersion: v1
metadata:
name: kafka-service
spec:
selector:
app: cp-kafka
release: my-confluent-oss
ports:
- protocol: TCP
targetPort: 9092
port: 32400
nodePort: 32400
type: NodePort
ОБНОВЛЕНИЕ: я вижу такие сообщения в журнале Spark Application:
INFO ConsumerCoordinator: [Consumer clientId=consumer-1, groupId=avro_data] Revoking previously assigned partitions []
INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=avro_data] (Re-)joining group
WARN NetworkClient: [Consumer clientId=consumer-1, groupId=avro_data] Connection to node 2147483645 (/172.17.0.17:9092) could not be established. Broker may not be available.
INFO AbstractCoordinator: [Consumer clientId=consumer-1, groupId=avro_data] Group coordinator 172.17.0.17:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery
WARN NetworkClient: [Consumer clientId=consumer-1, groupId=avro_data] Connection to node 1 (/172.17.0.14:9092) could not be established. Broker may not be available.
WARN NetworkClient: [Consumer clientId=consumer-1, groupId=avro_data] Connection to node 2 (/172.17.0.17:9092) could not be established. Broker may not be available.
Конфигурации Kafka StatefulSet:
Command:
sh
-exc
export KAFKA_BROKER_ID=${HOSTNAME##*-} && \
export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${POD_IP}:9092,EXTERNAL://${HOST_IP}:$((31090 + ${KAFKA_BROKER_ID})) && \
exec /etc/confluent/docker/run
Environment:
POD_IP: (v1:status.podIP)
HOST_IP: (v1:status.hostIP)
KAFKA_ZOOKEEPER_CONNECT: my-confluent-oss-cp-zookeeper-headless:2181
KAFKA_ADVERTISED_LISTENERS: EXTERNAL://${HOST_IP}:$((31090 + ${KAFKA_BROKER_ID}))
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT