У меня есть задание структурированной потоковой передачи Spark с включенной контрольной точкой, например
df.writeStream
.option("checkpointLocation", "s3://path/to/bucket/")
.forEachWriter(customForEachWriterImp)
.start()
. foreachwriter
предназначен для пропуска неверных записей, и мы создаем панель мониторинга на основе AWS Cloudtrail дляотслеживать пропущенные ошибочные записи, чтобы мы могли внести необходимые изменения в код и повторно развернуть, но поскольку смещение этой неверной записи уже проверено, Spark не будет пытаться снова прочитать этот объект из S3, хотя у нас есть новый код для обработки этихизначально неверные данные.
Причина в том, что мы не хотим пропустить обработку каких-либо записей, если только данные s3 не являются полностью плохими, для которых мы даже не будем повторно развертывать изменения кода (и игнорируем это какшум).
Так, например, в S3 json объект record1
, если поле a
предполагается равным integer
в соответствии с исходной схемой, тогда пользовательский Spark ForEachWriter
не сможет это сделатьзапись плохая, но логически это не плохая запись, поэтому мы хотим исправить код для обработки этого поля как double
, который также будет соответствовать исходной integer
и double
,поэтому мы повторно развернем код.
Теперь, когда мы повторно развернем, мы хотим, чтобы старая плохая запись на основе double
была повторно обработана, даже если ее смещение уже задано контрольной точкой в S3.
Задание Spark запущено наAmazon EMR, чтение из Amazon S3.