Отображение UUID в разъеме Spark Cassandra - PullRequest
0 голосов
/ 12 декабря 2018

У меня есть следующий код для сохранения СДР на Кассандре:

 JavaRDD<UserByID> mapped = ......

CassandraJavaUtil.javaFunctions(mapped)
.writerBuilder("mykeyspace", "user_by_id", mapToRow(UserByID.class)).saveToCassandra();

И UserByID - это обычный сериализуемый POJO со следующей переменной с геттерами и сеттерами

private UUID userid;

Кассандратаблица имеет точно такие же имена переменных класса UserByID, а идентификатор пользователя имеет тип uuid в таблице Cassandra, я успешно загружаю данные из таблицы с использованием того же сопоставления классов.

CassandraJavaRDD<UserByID> UserByIDRDD = javaFunctions(spark)
 .cassandraTable("mykeyspace", "user_by_id", mapRowTo(UserByID.class));

однако, когда я вызываюsaveToCassandra выше, я получаю следующее исключение:

org.apache.spark.SparkException: Job aborted due to stage failure: Task
0 in stage 227.0 failed 1 times, most recent failure: Lost task 0.0
in stage 227.0 (TID 12721, localhost, executor driver): 
java.lang.IllegalArgumentException: 
The value (4e22e71a-a387-4de8-baf1-0ef6e65fe33e) of the type 
(java.util.UUID) cannot be converted to 
struct<leastSignificantBits:bigint,mostSignificantBits:bigint> 

Чтобы решить проблему, я зарегистрировал кодек UUID, но это не помогло, я использую spark-cassandra-connector_2.11 версию 2.4.0 ита же версия для spark-core_2.11 есть предложения?

моя ссылка здесь , но в ней нет примера Java UUID, ваша помощь приветствуется.

1 Ответ

0 голосов
/ 28 декабря 2018

Это действительно странная ошибка - она ​​прекрасно работает с разъемом 2.4.0 и Spark 2.2.1 со следующим примером:

Определение таблицы:

CREATE TABLE test.utest (
    id int PRIMARY KEY,
    u uuid
);

Класс POJO :

public class UUIDData {
    private UUID u;
    private int id;
    ...
    // getters/setters
};

Spark job :

public static void main(String[] args) {
    SparkSession spark = SparkSession
            .builder()
            .appName("UUIDTest")
            .getOrCreate();

    CassandraJavaRDD<UUIDData> uuids = javaFunctions(spark.sparkContext())
            .cassandraTable("test", "utest", mapRowTo(UUIDData.class));

    JavaRDD<UUIDData> uuids2 = uuids.map(x -> new UUIDData(x.getId() + 10, x.getU()));

    CassandraJavaUtil.javaFunctions(uuids2)
            .writerBuilder("test", "utest", mapToRow(UUIDData.class))
            .saveToCassandra();
}

Я заметил, что в вашем коде вы используете функции mapRowTo и mapToRowбез вызова .class в POJO - вы уверены, что ваш код скомпилирован, и вы не запускаете какую-либо старую версию кода?

...