tl; dr Вы должны выполнить эту логику проверки самостоятельно, используя foreach
или foreachBatch
операторы.
Оказывается, я ошибался, думая, что columnNameOfCorruptRecord
Вариант может быть ответом. Он не будет работать.
Во-первых, он не будет работать из-за этого :
case _: BadRecordException => null
А во-вторых, из-за этого , который простоотключает любые другие режимы синтаксического анализа (включая PERMISSIVE
, который, кажется, используется вместе с опцией columnNameOfCorruptRecord
):
new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get))
Другими словами, ваш единственный вариант - использовать второй элемент в вашем списке, т.е. foreach
или foreachBatch
и обрабатывать поврежденные записи самостоятельно.
Решение может использовать from_json
при сохранении исходного столбца body
. Любая запись с неправильным JSON будет заканчиваться столбцом результата null
и foreach*
будет перехватывать ее, например,
def handleCorruptRecords:
// if json == null the body was corrupt
// handle it
df_stream_input = (spark
.readStream
.format("eventhubs")
.options(**ehConfInput)
.load()
.select("body", from_json(col("body").cast("string"), schema).as("json"))
).foreach(handleCorruptRecords).start()