В некотором роде пост , который у меня был месяц назад.У меня есть искробезопасное приложение для парения, которое я читаю от Kafka.Вот основная структура моего кода.
Я создаю сеанс искры.
val spark = SparkSession
.builder
.appName("app_name")
.getOrCreate()
Затем я читаю из потока
val data_stream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "server_list")
.option("subscribe", "topic")
.load()
В записи Кафки яприведите «значение» в виде строки.Он преобразует из двоичного в строку.
val df = data_stream
.select($"value".cast("string") as "json")
Основываясь на заранее определенной схеме, я пытаюсь разбить структуру json на столбцы.Однако проблема здесь в том, что если данные «плохие» или имеют другой формат, то они не соответствуют определенной схеме.Мне нужно отфильтровать строки, которые не соответствуют моей схеме.Независимо от того, являются ли они нулем, числа, какой-нибудь случайный текст, такой как «привет».Если это не json, то он не должен переходить к следующему процессу фрейма данных
val df2 = df.select(from_json($"json", schema) as "data")
.select("data.*")
, если я передаю пустое сообщение kafka через производителя консоли, происходит сбой запроса Spark, дающий
java.util.NoSuchElementException: глава пустого списка в scala.collection.immutable.Nil $ .head (List.scala: 420) в scala.collection.immutable.Nil $ .head (List.scala: 417) в орг.apache.spark.sql.catalyst.expressions.JsonToStruct.nullSafeEval (jsonExpressions.scala: 500) в org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval (Expression.scala: 325) в org.apache.spark.sql.catalyst.expressions.GeneratedClass $ SpecificPredicate.eval (неизвестный источник) по адресу org.apache.spark.sql.execution.FilterExec $$ anonfun $ 17 $$ anonfun $ применить $ 2.apply (basicPhysicalOperators.scala: 219) в org.apache.spark.sql.execution.FilterExec $$ anonfun $ 17 $$ anonfun $ apply $ 2.apply (basicPhysicalOperators.scala: 218) в scala.collection.Iterator $$ anon $ 13.hasNext (Iterator.scala: 463) в scala.collection.Iterator $$ Анон $ 11.hasNext (Iterator.scala: 408) в org.apache.spark.sql.execution.streaming.ForeachSink $$ anonfun $ addBatch $ 1.apply (ForeachSink.scala: 52) в org.apache.spark.sql.execution.streaming.ForeachSink $$ anonfun $ addBatch $ 1.apply (ForeachSink.scala: 49) в org.apache.spark.rdd.RDD $$ anonfun $ foreachPartition $ 1 $$ anonfun $ применить $ 29.apply (RDD.scala: 925) в орг.apache.spark.rdd.RDD $$ anonfun $ foreachPartition $ 1 $$ anonfun $ apply $ 29.apply (RDD.scala: 925) в org.apache.spark.SparkContext $$ anonfun $ runJob $ 5.apply (SparkContext.scala: 1944) в org.apache.spark.SparkContext $$ anonfun $ runJob $ 5.apply (SparkContext.scala: 1944) в org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 87) в org.apache.spark.scheduler.Task.run (Task.scala: 99)