прочитать таблицу cassandra с UDT с нулевыми значениями и сопоставить с классом Scala case в Spark - PullRequest
0 голосов
/ 05 мая 2020

Ошибка показывает:

Вызвано: java .lang.NullPointerException: запрошен TypeTag для GettableToMappedTypeConverter, который не может десериализовать TypeTags из-за Scala 2.10 ограничения TypeTag. Они возвращаются как пустые значения, и поэтому вы видите этот NPE.

gradle.build

dependencies {
    implementation group: 'org.scala-lang', name: 'scala-library', version: '2.12.11'
    implementation group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.5'
    implementation group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.5'
    implementation group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.12', version: '2.5.0'
    implementation group: 'org.apache.spark', name: 'spark-mllib_2.12', version: '2.4.5'
    implementation group: 'log4j', name: 'log4j', version: '1.2.17'
    implementation group: 'org.scalaj', name: 'scalaj-http_2.12', version: '2.4.2'
}

Scala объект

object SparkModule {
    case class UDTCaseClass(a: Int = 0, b: Float = 0f, c: Int = 0, d: Int = 0)
    case class TableCaseClass(id: UUID, col1: Boolean, list: List[UDTCaseClass])

    val spark = SparkSession.builder
        .master("local[2]")
        .appName("App")
        .config("spark.cassandra.connection.host", "127.0.0.1")
        .config("spark.cassandra.connection.port", "9042")
        .config("spark.executor.cores", "1")
        .getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("WARN")

    val cassandraRDD = sc.cassandraTable[TableCaseClass](
        "keyspace", "table"
    ).limit(20)

    println(cassandraRDD.count())
}

Сначала иногда ошибка показывалась, а иногда нет, пока я не сузил ее и не понял, что она показывает, когда любое из полей UDT имеет значение null, в противном случае все работает нормально. Например, если таблица содержит одну из следующих строк, возникает ошибка:

f39b5201-1e96-44a8-946 c -d959c217f174 | Ложь | [{a: 123, b: 2.3, c: 33, d: null }]
f39b5201-1e96-44a8-946 c -d959c217f174 | Ложь | [{a: 123, b: 2.3, c: null , d: 34}]
f39b5201-1e96-44a8-946 c -d959c217f174 | Ложь | [{a: 123, b: null , c: 33, d: 12}]
f39b5201-1e96-44a8-946 c -d959c217f174 | Ложь | [{a: null , b: 2.3, c: 33, d: 22}]

тогда как, например, этот:

f39b5201-1e96-44a8-946 c -d959c217f174 | Ложь | null

отлично читается cassandraTable.

Я пробовал использовать Option вот так: case class UDTCaseClass(a: Option[Int] = None, b: Option[Float] = None, c: Option[Int] = None, d: Option[Int] = None), но появляется та же ошибка.

Я всегда мог просто вставить 0 вместо null, но можно ли этого избежать?

Спасибо

1 Ответ

0 голосов
/ 08 мая 2020

Прекрасно работает с Spark 2.4.2 / Scala 2.12 и S CC 2.5.0.

Для следующих UDT / таблиц и данных:

CREATE TYPE test.udt (
  id int,
  t1 int,
  t2 int,
  a2 int
);

CREATE TABLE test.u3 (
    id int PRIMARY KEY,
    u list<frozen<udt>>
);
insert into test.u3(id, u) values (5, [{id: 1, t1: 3}]);

следующих Scala код работает нормально:

case class UDT(id: Int, t1: Int, t2: Option[Int], a2: Option[Int])
case class U3(id: Int, u: List[UDT])

import com.datastax.spark.connector._
val d = sc.cassandraTable[U3]("test", "u3")
d.collect

он возвращает: Array(U3(5,List(UDT(1,3,None,None)))) как и ожидалось.

Ваша ошибка может возникнуть из-за проблемы, из-за которой вы не перекомпилировали код или он каким-то образом кэширован. ..

PS, как я указал в комментарии, если вы только начинаете, вместо этого лучше использовать Dataframe API - он полностью поддерживается S CC.

...