У меня есть следующий код для сохранения JSON на Кассандре с помощью искры
ss.read().json("test_data.json").write()
.format("org.apache.spark.sql.cassandra")
.mode(SaveMode.Append)
.option("table", table)
.option("keyspace", KEY_SPACE)
.option("confirm.truncate", true)
.save();
таблица имеет первичный ключ, а когда запись имеет нулевое значение для первичного ключа, save()
выдает исключение TypeConversionException Cannot convert object [null,null,null,null,null,n..., "test text test text"
type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema to List[AnyRef]
для меня ясно, что эта запись должна быть либо отфильтрована, либо для записи в журнал, когда произошло исключение. для меня проблема в Я не могу найти способ поймать это исключение , и тогда я могу записать грязную запись.
sc.read().json("test_data.json").na().drop()
не помогает, потому что в записи есть данные.
Я обнаружил, что в коннекторе cassandra есть метод saveToCassandra()
, который может иметь способ реализовать обработчик исключений, но я не смог найти его в своей SparkSession.
SparkSession ss = SparkSession
.builder()
.config("spark.cassandra.connection.host", cassandraHost)
.config("spark.master", "local")
.getOrCreate();
Я использую последнюю версию Spark 2.3.2.