СПАРК: Повторная обработка ошибочных записей с новыми изменениями кода для обработки этих сбоев? - PullRequest
0 голосов
/ 06 декабря 2018

У меня есть задание структурированной потоковой передачи Spark с включенной контрольной точкой, например

df.writeStream .option("checkpointLocation", "s3://path/to/bucket/") .forEachWriter(customForEachWriterImp) .start()

. foreachwriter предназначен для пропуска неверных записей, и мы создаем панель мониторинга на основе AWS Cloudtrail дляотслеживать пропущенные ошибочные записи, чтобы мы могли внести необходимые изменения в код и повторно развернуть, но поскольку смещение этой неверной записи уже проверено, Spark не будет пытаться снова прочитать этот объект из S3, хотя у нас есть новый код для обработки этихизначально неверные данные.

Причина в том, что мы не хотим пропустить обработку каких-либо записей, если только данные s3 не являются полностью плохими, для которых мы даже не будем повторно развертывать изменения кода (и игнорируем это какшум).

Так, например, в S3 json объект record1, если поле a предполагается равным integer в соответствии с исходной схемой, тогда пользовательский Spark ForEachWriter не сможет это сделатьзапись плохая, но логически это не плохая запись, поэтому мы хотим исправить код для обработки этого поля как double, который также будет соответствовать исходной integer и double,поэтому мы повторно развернем код.

Теперь, когда мы повторно развернем, мы хотим, чтобы старая плохая запись на основе double была повторно обработана, даже если ее смещение уже задано контрольной точкой в ​​S3.

Задание Spark запущено наAmazon EMR, чтение из Amazon S3.

1 Ответ

0 голосов
/ 07 декабря 2018

Единственный известный мне способ повторной обработки после контрольной точки - это запуск без контрольной точки или установка нового пустого каталога контрольных точек.Это переработает все.

...