Чтобы понять причину сбоя, вы должны признать, что DStreamFunctions.saveToCassandra
, как и DStream
операции вывода в общем, не является действием в строгом смысле.На практике просто вызывает foreachRDD
:
dstream.foreachRDD(rdd => rdd.sparkContext.runJob(rdd, writer.write _))
, что в свою очередь :
Применитьфункция для каждого RDD в этом DStream.Это оператор вывода, поэтому «этот» DStream будет зарегистрирован как поток вывода и, следовательно, материализован.
Разница незначительная, но важная - операция зарегистрирована, но фактическое выполнение происходит в другом контексте, в более поздний момент времени.
Это означает, что во время выполнения не происходит сбоевпойман в точке, которую вы вызываете saveToCassandra
.
Как уже указывалось, try
или Try
будет содержать исключение драйвера, если оно применяется непосредственно к действию.Таким образом, вы, например, повторно внедрили бы saveToCassandra
как
dstream.foreachRDD(rdd => try {
rdd.sparkContext.runJob(rdd, writer.write _)
} catch {
case e: Exception => log.error("Error in saving data in Cassandra" + e. getMessage, e)
})
поток должен быть в состоянии продолжить, хотя текущий пакет будет полностью или частично потерян.
Это важноотметить, что это не то же самое, что перехват исходного исключения, которое будет выброшено, необнаружено и видно в журнале.Чтобы поймать проблему в ее источнике, вам нужно применить блок try
/ catch
непосредственно в средстве записи, и это, очевидно, не вариант при выполнении кода, над которым у вас нет контроля.
Уберите сообщение (уже указано в этой ветке) - обязательно очистите ваши данные, чтобы избежать известных источников сбоя.