Я использую Apache Nifi, Spark и Kafka для отправки сообщений между ними. Прежде всего, я беру данные с Nifi и отправляю их в Spark для их обработки. Затем я снова отправляю данные из Spark в Nifi, чтобы вставить их в БД.
Моя проблема в том, что каждый раз, когда я запускаю Spark, я получаю одни и те же записи 3.142. У меня первая часть Nifi остановлена, вторая запущена, и каждый раз, когда я запускаю Spark, у меня одни и те же записи 3.142., И теперь я не могу понять эти данные.
Откуда это?
Я пытался проверить, есть ли у меня данные о Кафке-Очередь-I (от Нифи до Спарка) или Кафке-Очередь-II (от Искры до NiFi), но в обоих случаях ответ НЕТ. Только когда я запускаю Spark, появляется в записях Kafka-Queue-II 3.142, но это не происходит в Kafka-Queue-I ...
В Nifi, PublishKafka_1_0 1.7.0:
В Spark, KafkaConsumer:
val raw_data = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafka_servers)
.option("group.id", "spark_kafka_consumer")
.option("startingOffsets", "latest")
.option("enable.auto.commit", true)
option("failOnDataLoss", "false")
.option("subscribe", input_topic)
.load()
Это показывает мне много ложной тревоги ...
Некоторый процесс ...
var parsed_data = raw_data
.select(from_json(col("value").cast("string"), schema).alias("data"))
.select("data.*")
. ...
Источник Кафки в Spark
var query = parsed_data
.select(to_json(schema_out).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", kafka_servers)
.option("zookeeper.connect", zookeeper_servers)
.option("checkpointLocation", checkpoint_location)
.option("topic", output_topic)
.start()
query.awaitTermination()
И, KafkaConsumer 1.7.0 в NiFi