Spark RDD объединяется с Cassandra Table - PullRequest
0 голосов
/ 13 марта 2020

Я присоединяюсь к Spark RDD с Cassandra table (поиск), но не в состоянии понять некоторые вещи.

  1. Запустит все записи между range_start и range_end из Cassandra table, а затем присоединится это с RDD в искровой памяти, или он вытянет все значения из СДР в Кассандру и * 108 * выполнит соединение там
  2. Где будет применяться предел (1)? (Cassandra или Spark)
  3. Будет ли Spark всегда извлекать одинаковое количество записей из Cassandra независимо от того, какой предел применяется (1 или 1000)?

Код ниже:

//creating dataframe with fields required for join with cassandra table
//and converting same to rdd
val df_for_join = src_df.select(src_df("col1"),src_df("col2"))
val rdd_for_join = df_for_join.rdd

val result_rdd = rdd_for_join
.joinWithCassandraTable("my_keyspace", "my_table"
,selectedColumns = SomeColumns("col1","col2","col3","col4")
,SomeColumns("col1", "col2")
).where("created_at >''range_start'' and created_at<= range_end")
.clusteringOrder(Ascending).limit(1)

Подробности таблицы Кассандры -

PRIMARY KEY ((col1, col2), created_at) WITH CLUSTERING ORDER BY (created_at ASC)

1 Ответ

2 голосов
/ 13 марта 2020

joinWithCassandra таблица извлекает значения разделов / первичных ключей из переданного RDD и преобразует их в отдельные запросы к разделам в Cassandra. Затем, в дополнение к этому, S CC может применить дополнительную фильтрацию, как будто вы where условие. Если я правильно помню, но могу ошибаться, предел не будет полностью передан Cassandra - он все равно может извлекать limit строк на каждый раздел.

Вы всегда можете проверить, где происходит соединение, выполнив result_rdd.toDebugString. Для моего кода:

val df_for_join = Seq((2, 5),(5, 2)).toDF("col1", "col2")
val rdd_for_join = df_for_join.rdd

val result_rdd = rdd_for_join
.joinWithCassandraTable("test", "jt"
,selectedColumns = SomeColumns("col1","col2", "v")
,SomeColumns("col1", "col2")
).where("created_at >'2020-03-13T00:00:00Z' and created_at<= '2020-03-14T00:00:00Z'")
.limit(1)

это дает следующее:

scala> result_rdd.toDebugString
res7: String =
(2) CassandraJoinRDD[14] at RDD at CassandraRDD.scala:19 []
 |  MapPartitionsRDD[2] at rdd at <console>:45 []
 |  MapPartitionsRDD[1] at rdd at <console>:45 []
 |  ParallelCollectionRDD[0] at rdd at <console>:45 []

, в то время как при обычном соединении вы получите следующее:

scala> val rdd1 = sc.parallelize(Seq((2, 5),(5, 2)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[21] at parallelize at <console>:44
scala> val ct = sc.cassandraTable[(Int, Int)]("test", "jt").select("col1", "col2")
ct: com.datastax.spark.connector.rdd.CassandraTableScanRDD[(Int, Int)] = CassandraTableScanRDD[31] at RDD at CassandraRDD.scala:19

scala> rdd1.join(ct)
res15: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[34] at join at <console>:49
scala> rdd1.join(ct).toDebugString
res16: String =
(6) MapPartitionsRDD[37] at join at <console>:49 []
 |  MapPartitionsRDD[36] at join at <console>:49 []
 |  CoGroupedRDD[35] at join at <console>:49 []
 +-(3) ParallelCollectionRDD[21] at parallelize at <console>:44 []
 +-(6) CassandraTableScanRDD[31] at RDD at CassandraRDD.scala:19 []

Более подробная информация доступна в соответствующем разделе документации S CC .

...