Как записать некорректные (поврежденные) записи JSON в (Py) структурированной потоковой передаче Spark? - PullRequest
2 голосов
/ 10 ноября 2019

У меня есть Azure Eventhub , который выполняет потоковую передачу данных (в формате JSON). Я читаю его как фрейм данных Spark, анализирую входящее "тело" с from_json(col("body"), schema), где schema предопределено. В коде это выглядит следующим образом:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *

schema = StructType().add(...) # define the incoming JSON schema 

df_stream_input = (spark
.readStream
.format("eventhubs")
.options(**ehConfInput)
.load()
.select(from_json(col("body").cast("string"), schema)
)

И теперь = если есть некоторая несогласованность между входящей схемой JSON и определенной схемой (например, исходный концентратор событий начинает отправлять данные в новом форматебез уведомления), функции from_json() не будут выдавать ошибку = вместо этого они будут помещать NULL в поля, которые присутствуют в моем определении schema, но отсутствуют в отправляемом JSONs концентраторе событий .

Я хочу получить эту информацию и записать ее где-нибудь (журнал Spark log4j, Azure Monitor, электронное письмо с предупреждением, ...).

Мой вопрос: как лучше всего этого добиться?

Некоторые мои мысли:

  1. Перваяя могу думать о том, чтобы иметь UDF, который проверяет NULLs и, если есть какие-либо проблемы, вызывает исключение. Я полагаю, что там невозможно отправить логи в log4j через PySpark, так как контекст «искры» не может быть инициирован в UDF (на рабочих), и каждый хочет использовать значение по умолчанию:

    log4jLogger = sc. _jvm.org.apache.log4j logger = log4jLogger.LogManager.getLogger ('PySpark Logger')

  2. Второе, о чем я могу подумать, это использовать функцию «foreach / foreachBatch» и поместить этопроверь логику там.

Но я чувствую, что оба эти подхода похожи ... слишком уж обычны - я надеялся, что в Spark есть что-то встроенное для этих целей.

1 Ответ

1 голос
/ 10 ноября 2019

tl; dr Вы должны выполнить эту логику проверки самостоятельно, используя foreach или foreachBatch операторы.


Оказывается, я ошибался, думая, что columnNameOfCorruptRecordВариант может быть ответом. Он не будет работать.

Во-первых, он не будет работать из-за этого :

case _: BadRecordException => null

А во-вторых, из-за этого , который простоотключает любые другие режимы синтаксического анализа (включая PERMISSIVE, который, кажется, используется вместе с опцией columnNameOfCorruptRecord):

new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get))

Другими словами, ваш единственный вариант - использовать второй элемент в вашем списке, т.е. foreach или foreachBatch и обрабатывать поврежденные записи самостоятельно.

Решение может использовать from_json при сохранении исходного столбца body. Любая запись с неправильным JSON будет заканчиваться столбцом результата null и foreach* будет перехватывать ее, например,

def handleCorruptRecords:
  // if json == null the body was corrupt
  // handle it

df_stream_input = (spark
  .readStream
  .format("eventhubs")
  .options(**ehConfInput)
  .load()
  .select("body", from_json(col("body").cast("string"), schema).as("json"))
).foreach(handleCorruptRecords).start()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...