Spark Structured Streaming + Kafka: остановка сбоя запроса, когда сообщение Kafka не соответствует схеме JSON - PullRequest
0 голосов
/ 07 сентября 2018

В некотором роде пост , который у меня был месяц назад.У меня есть искробезопасное приложение для парения, которое я читаю от Kafka.Вот основная структура моего кода.

Я создаю сеанс искры.

val spark = SparkSession
  .builder
  .appName("app_name")
  .getOrCreate()

Затем я читаю из потока

val data_stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server_list")
  .option("subscribe", "topic")
  .load()

В записи Кафки яприведите «значение» в виде строки.Он преобразует из двоичного в строку.

val df = data_stream
    .select($"value".cast("string") as "json")

Основываясь на заранее определенной схеме, я пытаюсь разбить структуру json на столбцы.Однако проблема здесь в том, что если данные «плохие» или имеют другой формат, то они не соответствуют определенной схеме.Мне нужно отфильтровать строки, которые не соответствуют моей схеме.Независимо от того, являются ли они нулем, числа, какой-нибудь случайный текст, такой как «привет».Если это не json, то он не должен переходить к следующему процессу фрейма данных

val df2 = df.select(from_json($"json", schema) as "data")
  .select("data.*")

, если я передаю пустое сообщение kafka через производителя консоли, происходит сбой запроса Spark, дающий

java.util.NoSuchElementException: глава пустого списка в scala.collection.immutable.Nil $ .head (List.scala: 420) в scala.collection.immutable.Nil $ .head (List.scala: 417) в орг.apache.spark.sql.catalyst.expressions.JsonToStruct.nullSafeEval (jsonExpressions.scala: 500) в org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval (Expression.scala: 325) в org.apache.spark.sql.catalyst.expressions.GeneratedClass $ SpecificPredicate.eval (неизвестный источник) по адресу org.apache.spark.sql.execution.FilterExec $$ anonfun $ 17 $$ anonfun $ применить $ 2.apply (basicPhysicalOperators.scala: 219) в org.apache.spark.sql.execution.FilterExec $$ anonfun $ 17 $$ anonfun $ apply $ 2.apply (basicPhysicalOperators.scala: 218) в scala.collection.Iterator $$ anon $ 13.hasNext (Iterator.scala: 463) в scala.collection.Iterator $$ Анон $ 11.hasNext (Iterator.scala: 408) в org.apache.spark.sql.execution.streaming.ForeachSink $$ anonfun $ addBatch $ 1.apply (ForeachSink.scala: 52) в org.apache.spark.sql.execution.streaming.ForeachSink $$ anonfun $ addBatch $ 1.apply (ForeachSink.scala: 49) в org.apache.spark.rdd.RDD $$ anonfun $ foreachPartition $ 1 $$ anonfun $ применить $ 29.apply (RDD.scala: 925) в орг.apache.spark.rdd.RDD $$ anonfun $ foreachPartition $ 1 $$ anonfun $ apply $ 29.apply (RDD.scala: 925) в org.apache.spark.SparkContext $$ anonfun $ runJob $ 5.apply (SparkContext.scala: 1944) в org.apache.spark.SparkContext $$ anonfun $ runJob $ 5.apply (SparkContext.scala: 1944) в org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 87) в org.apache.spark.scheduler.Task.run (Task.scala: 99)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...