Spark структурированный поток с контрольной точки Kafka и подтверждение - PullRequest
0 голосов
/ 24 апреля 2019

В моем приложении структурированного потокового вещания я читаю сообщения от Кафки, фильтрую их и, наконец, сохраняю в Кассандре. Я использую спарк 2.4.1. Из структурированной потоковой документации

Семантика отказоустойчивости Предоставление сквозной семантики "точно один раз" было одной из ключевых целей разработки структурированного потокового вещания. Чтобы достичь этого, мы разработали источники структурированной потоковой передачи, приемники и механизм выполнения для надежного отслеживания точного хода обработки, чтобы он мог обрабатывать любые виды сбоев путем перезапуска и / или повторной обработки. Предполагается, что каждый потоковый источник имеет смещения (аналогичные смещениям Кафки или порядковым номерам Kinesis) для отслеживания позиции чтения в потоке. Движок использует журналы контрольных точек и записи с опережением для записи диапазона смещения данных, обрабатываемых в каждом триггере. Потоковые приемники спроектированы так, чтобы быть идепотентными для обработки переработки. Совместно, используя воспроизводимые источники и идемпотентные приемники, структурированная потоковая передача может обеспечить сквозную семантику ровно один раз при любом сбое.

Но я не уверен, как Спарк на самом деле этого добивается. В моем случае, если кластер Cassandra не работает, что приводит к сбоям в операции записи, контрольная точка для Kafka не записывает эти смещения.

Основано ли смещение контрольной точки Kafka только на успешных чтениях из Kafka, или вся операция, включая запись, рассматривается для каждого сообщения?

Ответы [ 2 ]

1 голос
/ 24 апреля 2019

Spark использует несколько файлов журнала для обеспечения отказоустойчивости.Для вашего запроса важны журнал смещения и журнал фиксации.из класса StreamExecution doc:

 /**
   * A write-ahead-log that records the offsets that are present in each batch. In order to ensure
   * that a given batch will always consist of the same data, we write to this log *before* any
   * processing is done.  Thus, the Nth record in this log indicated data that is currently being
   * processed and the N-1th entry indicates which offsets have been durably committed to the sink.
   */
  val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))

  /**
   * A log that records the batch ids that have completed. This is used to check if a batch was
   * fully processed, and its output was committed to the sink, hence no need to process it again.
   * This is used (for instance) during restart, to help identify which batch to run next.
   */
  val commitLog = new CommitLog(sparkSession, checkpointFile("commits"))

, поэтому при чтении из Kafka записывает смещения в offsetLog и только после обработки данных и записи в приемник (втвой случай Кассандра) он записывает смещения в commitLog.

0 голосов
/ 24 апреля 2019

Spark Structured Streaming не передает смещения на kafka, как это сделал бы «обычный» потребитель kafka.Spark управляет смещениями внутренне с помощью механизма контрольных точек.

Посмотрите на первый ответ на следующий вопрос, который дает хорошее объяснение того, как состояние управляется с помощью контрольных точек и журнала коммитов: Как получить Кафкусмещения для структурированного запроса для ручного и надежного управления смещениями?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...