Разобрать Spark RDD после присоединения Кассандры - PullRequest
0 голосов
/ 07 февраля 2020

Я хочу прочитать файл паркета и присоединиться к некоторым колонкам в Кассандре. Мне удалось присоединиться и получить СДР, но я не могу разобрать полученную СДР. Вот дополнительные сведения

case class IP (key: String, key2: String,key3: String,key4: String,key5: String,key6: String,key7: String,key8: String,key9: String,key10: String,key11: String,key12: String,key13: String,key14: String,key15: String,column1:String,column2:String,column3:String,column4:String,column5:String,value1:String)

val a = cs_cube2_2_6.rdd.map(p => IP(p(0).toString, p(1).toString, p(2).toString, p(3).toString, p(4).toString, p(5).toString, p(6).toString, p(7).toString, p(8).toString, p(9).toString, p(10).toString, p(11).toString, p(12).toString, p(13).toString, p(14).toString, p(15).toString, p(16).toString, p(17).toString, p(18).toString, p(19).toString, p(20).toString))


val joinWithRDD = a.joinWithCassandraTable("key","tbl").on(SomeColumns("key","key2","key3","key4","key5","key6","key7","key8","key9","key10","key11","key12","key13","key14","key15")).select("value1")

scala> joinWithRDD: com.datastax.spark.connector.rdd.CassandraJoinRDD[IP,com.datastax.spark.connector.CassandraRow] = CassandraJoinRDD[15] at RDD at CassandraRDD.scala:19

Результирующая схема RDD показана выше. Вывод RDD выглядит примерно так:

(IP(2_2_6,AA,FF,14-12-07 23,false,true,-,-,-,-,-,-,-,-,-,-,-,-,-,3580),CassandraRow{value1: 3580})

(IP(2_2_6,BB,GG,143,2019-12-07 00,false,true,-,-,-,-,-,-,-,-,-,-,-,-,-,154),CassandraRow{value1: 154})

Я не уверен, как анализировать этот RDD. Я хочу подвести итог последнего столбца по IP и столбца value1 из строки Кассандры.

Дайте мне знать, если понадобятся какие-либо дополнительные сведения. И оцените помощь

Вот схема таблицы Cassandra

CREATE TABLE aa.tbl (
key text,
key2 text,
key3 text,
key4 text,
key5 text,
key6 text,
key7 text,
key8 text,
key9 text,
key10 text,
key11 text,
key12 text,
key13 text,
key14 text,
key15 text,
column1 text,
column2 text,
column3 text,
column4 text,
column5 text,
value1 text,
value2 blob,
value3 text,
PRIMARY KEY ((key, key2, key3, key4, key5, key6, key7, key8, key9, key10, key11, key12, key13, key14, key15), column1, column2, column3, column4, column5)
) WITH CLUSTERING ORDER BY (column1 ASC, column2 ASC, column3 ASC, column4 ASC, column5 ASC)
AND bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99p';

Ответы [ 2 ]

1 голос
/ 07 февраля 2020

Вам нужно сделать что-то вроде этого (код не проверялся, но он должен работать с небольшими корректировками - я предположил, что value1 имеет целочисленный тип в Кассандре):

joinWithRDD.map { case (ip, row) => 
   val newVal = ip.value1.toInteger + row.getInt("value1")
   IP(ip.key, key2, .... newVal.toString)
}

joinWithCassandraTable возвращает кортеж ваших данных как _1, а данные, найденные в Кассандре, как _2. При доступе к данным Cassandra вы можете использовать функции получения getInt, getString, et c., Или вы можете отобразить Row в класс дел, как описано в документации для Spark Cassandra Connector .

0 голосов
/ 08 февраля 2020

Я смог сделать эту работу. Благодаря вкладу от Алекса

val b = joinWithRDD.map { case (ip, row) => IP(ip.key, ip.key2, ip.key3,ip.key4,ip.key5,ip.key6,ip.key7,ip.key8,ip.key9,ip.key10,ip.key11,ip.key12,ip.key13,ip.key14,ip.key15,ip.column1,ip.column2,ip.column3,ip.column4,ip.column5,ip.value1 + row.getString("value1")) }
...