Datasource V2 Reader (Spark Structured Streaming) - смещения не в порядке - PullRequest
0 голосов
/ 22 мая 2019

В настоящее время я использую два пользовательских ридера, использующих API V2 для работы с потоковой структурой с искрой. После выполнения задания в течение ~ 30-60 минут оно бомбит:

Caused by: java.lang.RuntimeException: Offsets committed out of order: 608799 followed by 2982

Я повторно использую найденные примеры здесь , и это бомбардировка на линии: 206.

Вместо того, чтобы использовать поток Twitter, который представлен в примере, я реализую его для JMS & SQS.

Мой вопрос: кто-нибудь сталкивался с этой проблемой? Или что-то не так с этой реализацией?

Фрагмент кода:

override def commit(end: Offset): Unit = {
    internalLog(s"** commit($end) lastOffsetCommitted: $lastOffsetCommitted")

    val newOffset = TwitterOffset.convert(end).getOrElse(
      sys.error(s"TwitterStreamMicroBatchReader.commit() received an offset ($end) that did not " +
        s"originate with an instance of this class")
    )

    val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt

    if (offsetDiff < 0) {
      sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
    }

    tweetList.trimStart(offsetDiff)
    lastOffsetCommitted = newOffset
}

Я не могу найти ответ с моими обычными выходами. Я, однако, вижу этот . Был сделан вывод об удалении данных контрольных точек, что не представляется приемлемым решением в производственной системе. Другой был то, что исходная система не поддерживает информацию о смещении? У меня сложилось впечатление, что искра будет сама обрабатывать информацию о смещении. Если проблема заключается во втором пункте, как я могу убедиться, что исходная система обрабатывает эту парадигму.

Пожалуйста, дайте мне знать, если я смогу предоставить больше информации.

Редактировать: Глядя на интерфейс MicroBatchReader, документация для коммита гласит:

    /**
     * Informs the source that Spark has completed processing all data for offsets less than or
     * equal to `end` and will only request offsets greater than `end` in the future.
     */
    void commit(Offset end);

Таким образом, возникает вопрос, почему искра посылает мне смещения коммитов, которые уже были зафиксированы?

1 Ответ

0 голосов
/ 30 мая 2019

Отвечая на мой собственный вопрос, если кому-то это поможет,

Мне следовало добавить дополнительную информацию к вопросу - это задание выполняется в EMR и использует EFS для проверки данных.

Проблема возникла, когда я использовал Amazon 100 * для монтирования EFS.По какой-то причине каждый работник не мог видеть операции чтения и записи других работников - как будто EFS не монтировалась.

Решением было переключиться на nfs-utils для монтирования EFS (согласно инструкциям AWS), чтобы каждый работник мог точно прочитать данные контрольной точки.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...