Как это может быть возможно? дубликаты записей в очереди Кафки? - PullRequest
0 голосов
/ 13 февраля 2020

Я использую Apache Nifi, Spark и Kafka для отправки сообщений между ними. Прежде всего, я беру данные с Nifi и отправляю их в Spark для их обработки. Затем я снова отправляю данные из Spark в Nifi, чтобы вставить их в БД.

Моя проблема в том, что каждый раз, когда я запускаю Spark, я получаю одни и те же записи 3.142. У меня первая часть Nifi остановлена, вторая запущена, и каждый раз, когда я запускаю Spark, у меня одни и те же записи 3.142., И теперь я не могу понять эти данные.

Откуда это?

Я пытался проверить, есть ли у меня данные о Кафке-Очередь-I (от Нифи до Спарка) или Кафке-Очередь-II (от Искры до NiFi), но в обоих случаях ответ НЕТ. Только когда я запускаю Spark, появляется в записях Kafka-Queue-II 3.142, но это не происходит в Kafka-Queue-I ...

В Nifi, PublishKafka_1_0 1.7.0:

PublishKafka_1_0 1.7.0

В Spark, KafkaConsumer:

val raw_data = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafka_servers)
  .option("group.id", "spark_kafka_consumer")
  .option("startingOffsets", "latest")
  .option("enable.auto.commit", true)
  option("failOnDataLoss", "false")
  .option("subscribe", input_topic)
  .load()

Это показывает мне много ложной тревоги ... False Alarm

Некоторый процесс ...

var parsed_data = raw_data
  .select(from_json(col("value").cast("string"), schema).alias("data"))
  .select("data.*")
  . ...

Источник Кафки в Spark

var query = parsed_data
  .select(to_json(schema_out).alias("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafka_servers)
  .option("zookeeper.connect", zookeeper_servers)
  .option("checkpointLocation", checkpoint_location)
  .option("topic", output_topic)
  .start()
query.awaitTermination()

И, KafkaConsumer 1.7.0 в NiFi

NiFi_Kafka_Consumer

1 Ответ

0 голосов
/ 13 февраля 2020

Я подозреваю, что вы используете новую (автоматически сгенерированную) группу потребителей каждый раз, когда запускаете Spark, и у вас самая ранняя политика сброса смещения. В результате Spark запускается с начала topi c каждый раз.

Kafka не удаляет сообщения из topi c при его использовании (в отличие от других систем pub-sub). Чтобы не видеть старые сообщения, вам нужно установить группу потребителей и заставить Spark фиксировать смещения в процессе обработки. Эти смещения сохраняются, и затем в следующий раз, когда потребитель начинает с этой группы, он извлечет из последнего сохраненного смещения для этой группы.

Я также хотел бы отметить, что Кафка, за пределами некоторых очень специфических c шаблоны использования и технологии выбора, не обещает "точно один раз" обмен сообщениями, а вместо этого "по крайней мере один раз"; Мой общий совет - стараться быть терпимым к дублирующимся записям.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...