В настоящее время я использую два пользовательских ридера, использующих API V2 для работы с потоковой структурой с искрой. После выполнения задания в течение ~ 30-60 минут оно бомбит:
Caused by: java.lang.RuntimeException: Offsets committed out of order: 608799 followed by 2982
Я повторно использую найденные примеры здесь , и это бомбардировка на линии: 206.
Вместо того, чтобы использовать поток Twitter, который представлен в примере, я реализую его для JMS & SQS.
Мой вопрос: кто-нибудь сталкивался с этой проблемой? Или что-то не так с этой реализацией?
Фрагмент кода:
override def commit(end: Offset): Unit = {
internalLog(s"** commit($end) lastOffsetCommitted: $lastOffsetCommitted")
val newOffset = TwitterOffset.convert(end).getOrElse(
sys.error(s"TwitterStreamMicroBatchReader.commit() received an offset ($end) that did not " +
s"originate with an instance of this class")
)
val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
if (offsetDiff < 0) {
sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
}
tweetList.trimStart(offsetDiff)
lastOffsetCommitted = newOffset
}
Я не могу найти ответ с моими обычными выходами. Я, однако, вижу этот . Был сделан вывод об удалении данных контрольных точек, что не представляется приемлемым решением в производственной системе. Другой был то, что исходная система не поддерживает информацию о смещении? У меня сложилось впечатление, что искра будет сама обрабатывать информацию о смещении. Если проблема заключается во втором пункте, как я могу убедиться, что исходная система обрабатывает эту парадигму.
Пожалуйста, дайте мне знать, если я смогу предоставить больше информации.
Редактировать: Глядя на интерфейс MicroBatchReader, документация для коммита гласит:
/**
* Informs the source that Spark has completed processing all data for offsets less than or
* equal to `end` and will only request offsets greater than `end` in the future.
*/
void commit(Offset end);
Таким образом, возникает вопрос, почему искра посылает мне смещения коммитов, которые уже были зафиксированы?