joinWithCassandra
таблица извлекает значения разделов / первичных ключей из переданного RDD и преобразует их в отдельные запросы к разделам в Cassandra. Затем, в дополнение к этому, S CC может применить дополнительную фильтрацию, как будто вы where
условие. Если я правильно помню, но могу ошибаться, предел не будет полностью передан Cassandra - он все равно может извлекать limit
строк на каждый раздел.
Вы всегда можете проверить, где происходит соединение, выполнив result_rdd.toDebugString
. Для моего кода:
val df_for_join = Seq((2, 5),(5, 2)).toDF("col1", "col2")
val rdd_for_join = df_for_join.rdd
val result_rdd = rdd_for_join
.joinWithCassandraTable("test", "jt"
,selectedColumns = SomeColumns("col1","col2", "v")
,SomeColumns("col1", "col2")
).where("created_at >'2020-03-13T00:00:00Z' and created_at<= '2020-03-14T00:00:00Z'")
.limit(1)
это дает следующее:
scala> result_rdd.toDebugString
res7: String =
(2) CassandraJoinRDD[14] at RDD at CassandraRDD.scala:19 []
| MapPartitionsRDD[2] at rdd at <console>:45 []
| MapPartitionsRDD[1] at rdd at <console>:45 []
| ParallelCollectionRDD[0] at rdd at <console>:45 []
, в то время как при обычном соединении вы получите следующее:
scala> val rdd1 = sc.parallelize(Seq((2, 5),(5, 2)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[21] at parallelize at <console>:44
scala> val ct = sc.cassandraTable[(Int, Int)]("test", "jt").select("col1", "col2")
ct: com.datastax.spark.connector.rdd.CassandraTableScanRDD[(Int, Int)] = CassandraTableScanRDD[31] at RDD at CassandraRDD.scala:19
scala> rdd1.join(ct)
res15: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[34] at join at <console>:49
scala> rdd1.join(ct).toDebugString
res16: String =
(6) MapPartitionsRDD[37] at join at <console>:49 []
| MapPartitionsRDD[36] at join at <console>:49 []
| CoGroupedRDD[35] at join at <console>:49 []
+-(3) ParallelCollectionRDD[21] at parallelize at <console>:44 []
+-(6) CassandraTableScanRDD[31] at RDD at CassandraRDD.scala:19 []
Более подробная информация доступна в соответствующем разделе документации S CC .