Исключение не перехватывается блоком try catch - PullRequest
0 голосов
/ 19 февраля 2019

Я экономлю DStream Кассандре.В Кассандре есть столбец с типом данных map<text, text>.Cassandra не поддерживает значение null в Map, но в потоке может появиться значение NULL.

Я добавил try catch, если что-то пошло не так, но программа остановилась, несмотря на это, и я не вижу сообщения об ошибке в журнале:

   try {
      cassandraStream.saveToCassandra("table", "keyspace")
    } catch {
      case e: Exception => log.error("Error in saving data in Cassandra" + e.getMessage, e)
    }

Исключение

Caused by: java.lang.NullPointerException: Map values cannot be null
    at com.datastax.driver.core.TypeCodec$AbstractMapCodec.serialize(TypeCodec.java:2026)
    at com.datastax.driver.core.TypeCodec$AbstractMapCodec.serialize(TypeCodec.java:1909)
    at com.datastax.driver.core.AbstractData.set(AbstractData.java:530)
    at com.datastax.driver.core.AbstractData.set(AbstractData.java:536)
    at com.datastax.driver.core.BoundStatement.set(BoundStatement.java:870)
    at com.datastax.spark.connector.writer.BoundStatementBuilder.com$datastax$spark$connector$writer$BoundStatementBuilder$$bindColumnUnset(BoundStatementBuilder.scala:73)
    at com.datastax.spark.connector.writer.BoundStatementBuilder$$anonfun$6.apply(BoundStatementBuilder.scala:84)
    at com.datastax.spark.connector.writer.BoundStatementBuilder$$anonfun$6.apply(BoundStatementBuilder.scala:84)
    at com.datastax.spark.connector.writer.BoundStatementBuilder$$anonfun$bind$1.apply$mcVI$sp(BoundStatementBuilder.scala:106)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:101)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:106)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:233)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:210)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
    at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
    at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:197)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:183)
    at com.datastax.spark.connector.streaming.DStreamFunctions$$anonfun$saveToCassandra$1$$anonfun$apply$1.apply(DStreamFunctions.scala:54)
    at com.datastax.spark.connector.streaming.DStreamFunctions$$anonfun$saveToCassandra$1$$anonfun$apply$1.apply(DStreamFunctions.scala:54)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    ... 3 more

Я хотел бы знать, почему программа остановилась, несмотря на блок try / catch.Почему исключение не поймано?

Ответы [ 2 ]

0 голосов
/ 19 февраля 2019

Чтобы понять причину сбоя, вы должны признать, что DStreamFunctions.saveToCassandra, как и DStream операции вывода в общем, не является действием в строгом смысле.На практике просто вызывает foreachRDD:

dstream.foreachRDD(rdd => rdd.sparkContext.runJob(rdd, writer.write _))

, что в свою очередь :

Применитьфункция для каждого RDD в этом DStream.Это оператор вывода, поэтому «этот» DStream будет зарегистрирован как поток вывода и, следовательно, материализован.

Разница незначительная, но важная - операция зарегистрирована, но фактическое выполнение происходит в другом контексте, в более поздний момент времени.

Это означает, что во время выполнения не происходит сбоевпойман в точке, которую вы вызываете saveToCassandra.

Как уже указывалось, try или Try будет содержать исключение драйвера, если оно применяется непосредственно к действию.Таким образом, вы, например, повторно внедрили бы saveToCassandra как

dstream.foreachRDD(rdd => try { 
  rdd.sparkContext.runJob(rdd, writer.write _) 
} catch {
  case e: Exception => log.error("Error in saving data in Cassandra" + e. getMessage, e)
})

поток должен быть в состоянии продолжить, хотя текущий пакет будет полностью или частично потерян.

Это важноотметить, что это не то же самое, что перехват исходного исключения, которое будет выброшено, необнаружено и видно в журнале.Чтобы поймать проблему в ее источнике, вам нужно применить блок try / catch непосредственно в средстве записи, и это, очевидно, не вариант при выполнении кода, над которым у вас нет контроля.

Уберите сообщение (уже указано в этой ветке) - обязательно очистите ваши данные, чтобы избежать известных источников сбоя.

0 голосов
/ 19 февраля 2019

Проблема в том, что вы не уловили исключение, которое, как вы думаете, вы делаете.Код, который вы получите, будет перехватывать исключение драйвера, и фактически код, структурированный таким образом, сделает это.

Это, однако, не означает, что

программа никогда не должна останавливаться.

Хотя сбой драйвера, который может быть следствием фатального сбоя исполнителя, содержится, и драйвер может корректно завершить работу, поток как таковой уже исчез.Поэтому ваш код завершается, потому что поток больше не запускается.

Если рассматриваемый код находился под вашим контролем, обработка исключений должна быть делегирована задаче, но в случае стороннего кода неттакая опция.

Вместо этого вы должны проверить свои данные и удалить проблемные записи, прежде чем они будут переданы saveToCassandra.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...