В пакетных заданиях спарк обычно у меня есть источник данных JSON, записанный в файл, и я могу использовать функции поврежденных столбцов считывателя DataFrame для записи поврежденных данных в отдельном месте, а другой считыватель - для записи действительных данных из одного и того же источника.работа.(Данные записаны как паркет)
Но в Spark Structred Streaming я сначала читаю поток через kafka в виде строки, а затем использую from_json, чтобы получить мой DataFrame.Затем from_json использует JsonToStructs, который использует режим FailFast в анализаторе и не возвращает непарсированную строку в столбец в DataFrame.(см. примечание в статье). Как я могу записать поврежденные данные, которые не соответствуют моей схеме и, возможно, неверный JSON, в другое место, используя SSS?
Наконец, в пакетном задании одно и то же задание может записать оба кадра данных.Но Spark Structured Streaming требует специальной обработки для нескольких приемников.Затем в Spark 2.3.1 (моя текущая версия) мы должны включить подробности о том, как правильно записывать как поврежденные, так и недействительные потоки ...
Ref: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-Expression-JsonToStructs.html
val rawKafkaDataFrame=spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", config.broker)
.option("kafka.ssl.truststore.location", path.toString)
.option("kafka.ssl.truststore.password", config.pass)
.option("kafka.ssl.truststore.type", "JKS")
.option("kafka.security.protocol", "SSL")
.option("subscribe", config.topic)
.option("startingOffsets", "earliest")
.load()
val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
// does not provide a corrupt column or way to work with corrupt
jsonDataFrame.select(from_json(col("value"), schema)).select("jsontostructs(value).*")