Как я могу обработать старые данные в теме кафки? - PullRequest
0 голосов
/ 26 ноября 2018

Я начинаю использовать потоковую структурированную искру.

Я получаю readStream из темы kafka (startOffset: latest) с waterMark, группирую по времени события с продолжительностью окна и пишу в тему kafka.

У меня вопрос: как я могу обработать данные, записанные в теме kafka перед заданием структурированного потокового вещания с помощью spark?

Сначала я попытался запустить с помощью `startOffset: ранние версии '.но данные в теме kafka слишком велики, поэтому процесс потоковой передачи не запускается из-за истечения времени ожидания пряжи.(хотя я увеличиваю значение тайм-аута)

1.Если я просто создаю пакетное задание и фильтрую по конкретному диапазону данных.результат не отражается в текущем состоянии искрового потока, кажется, есть проблема с согласованностью и точностью результата.

Я пытался сбросить каталог контрольных точек, но он не работал.

Как я могу обработать старые и большие данные?Помоги мне.

1 Ответ

0 голосов
/ 27 ноября 2018

вы можете попробовать параметр maxOffsetsPerTrigger для Kafka + Structured Streaming для получения старых данных от Kafka.Установите для этого параметра значение количества записей, которые вы хотите получить от Kafka за один раз.

Использование:

sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test-name")
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", 1)
      .option("group.id", "2")
      .option("auto.offset.reset", "earliest")
      .load()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...