Spark Cassandra оптимизированное соединение - PullRequest
0 голосов
/ 03 февраля 2020

Я хочу оптимизировать соединение RDD с Cassandra через Spark. Я пытаюсь прочитать данные и присоединиться к данным Кассандры. Я пытался использовать разъем dasastax cassandra для этого. Но это дает мне ошибку - Неверный размер строки: 6 вместо 4. Вот подробности

import com.datastax.spark.connector.cql.CassandraConnector
val ip15M = sqlContext.read.parquet("/home/hadoop/work/data").toDF();

ip15M.dtypes
res8: Array[(String, String)] = Array((key1,StringType),(key2,StringType), (key3,StringType), (column1,StringType),
(fact1,StringType),(fact2,StringType)

val joinWithRDD = ip15M.rdd.joinWithCassandraTable("key","tabl1").on(SomeColumns("key1","key2","key3","column1"))
joinWithRDD.take(10).foreach(println)

У меня есть следующая таблица Кассандры:

CREATE TABLE key.tabl1 (
key1 text,
key2 text,
key3 text,
column1 text,
value1 text,
value2 text,
PRIMARY KEY ((key1, key2, key3), column1)
) WITH CLUSTERING ORDER BY (column1 ASC)
AND bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99p';

Я получаю приведенная ниже ошибка

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 4 times, most recent failure: Lost task 0.3 in stage 9.0 (TID 332, mr25p01if-ingx03030701.mr.if.apple.com, executor 146): java.lang.IllegalArgumentException: requirement failed: Invalid row size: 6 instead of 4.

Я полагаю, что ошибка связана с тем, что в RDD 6 столбцов, а в таблице Cassandra 4 первичных ключа. Мне нужны столбцы фактов в СДР, поскольку мне нужно обновить значения, основанные на соединении. Я не уверен, как решить эту проблему.

Я попытался запустить с и без .on, но все еще та же ошибка. Исходя из того, что я вижу, «.on» предназначен для боковых столбцов Кассандры, а не для СДР

Дайте мне знать, нужны ли какие-либо другие входные данные

Обновления: если я создаю СДР с помощью распараллелить, соединение, кажется, работает. Кажется, когда я читаю файл и меняю его на RDD, он теряет схему

Любая помощь приветствуется

...