запись поврежденных данных из источника данных kafka / json в структурированной потоковой передаче - PullRequest
0 голосов
/ 27 декабря 2018

В пакетных заданиях спарк обычно у меня есть источник данных JSON, записанный в файл, и я могу использовать функции поврежденных столбцов считывателя DataFrame для записи поврежденных данных в отдельном месте, а другой считыватель - для записи действительных данных из одного и того же источника.работа.(Данные записаны как паркет)

Но в Spark Structred Streaming я сначала читаю поток через kafka в виде строки, а затем использую from_json, чтобы получить мой DataFrame.Затем from_json использует JsonToStructs, который использует режим FailFast в анализаторе и не возвращает непарсированную строку в столбец в DataFrame.(см. примечание в статье). Как я могу записать поврежденные данные, которые не соответствуют моей схеме и, возможно, неверный JSON, в другое место, используя SSS?

Наконец, в пакетном задании одно и то же задание может записать оба кадра данных.Но Spark Structured Streaming требует специальной обработки для нескольких приемников.Затем в Spark 2.3.1 (моя текущая версия) мы должны включить подробности о том, как правильно записывать как поврежденные, так и недействительные потоки ...

Ref: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-Expression-JsonToStructs.html

val rawKafkaDataFrame=spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", config.broker)
  .option("kafka.ssl.truststore.location", path.toString)
  .option("kafka.ssl.truststore.password", config.pass)
  .option("kafka.ssl.truststore.type", "JKS")
  .option("kafka.security.protocol", "SSL")
  .option("subscribe", config.topic)
  .option("startingOffsets", "earliest")

  .load()

val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))

// does not provide a corrupt column or way to work with corrupt
jsonDataFrame.select(from_json(col("value"), schema)).select("jsontostructs(value).*")

1 Ответ

0 голосов
/ 27 декабря 2018

Когда вы преобразуете в json из строки, и если он не сможет проанализировать предоставленную схему, он вернет ноль.Вы можете отфильтровать нулевые значения и выбрать строку.Как то так.

val jsonDF =  jsonDataFrame.withColumn("json", from_json(col("value"), schema))
val invalidJsonDF = jsonDF.filter(col("json").isNull).select("value")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...