Быстрые ответы
Разве Spark не использует оптимизацию Кассандры в своих запросах?
Да. Но с SparkSQL только сокращение столбца и предикаты нажатия. В СДР это руководство.
Как эффективно получить эту информацию?
Поскольку ваш запрос возвращается достаточно быстро, я бы просто использовал драйвер Java напрямую, чтобы получить этот набор результатов.
Длинные ответы
Несмотря на то, что Spark SQL может обеспечить некоторые оптимизации на основе C *, они обычно ограничиваются предикатами нажатия при использовании интерфейса DataFrame. Это связано с тем, что структура предоставляет только ограниченную информацию для источника данных. Мы можем убедиться в этом, выполнив объяснение в запросе, который вы написали.
Давайте начнем с примера SparkSQL
scala> spark.sql("SELECT DISTINCT key1, key2, key3 FROM test.tab").explain
== Physical Plan ==
*HashAggregate(keys=[key1#30, key2#31, key3#32], functions=[])
+- Exchange hashpartitioning(key1#30, key2#31, key3#32, 200)
+- *HashAggregate(keys=[key1#30, key2#31, key3#32], functions=[])
+- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation test.tab[key1#30,key2#31,key3#32] ReadSchema: struct<key1:string,key2:string,key3:string>
Итак, ваш пример Spark будет разбит на несколько шагов.
- Сканирование: прочитать все данные из этой таблицы. Это означает сериализацию каждого значения от машины C до JVM Spark Executor, другими словами много работы.
- * HashAggregate / Exchange / Hash Aggregate: возьмите значения от каждого исполнителя, хэшируйте их локально, затем обменивайтесь данными между машинами и снова хешируйте, чтобы обеспечить уникальность. С точки зрения непрофессионала это означает создание больших хеш-структур, их сериализацию, запуск сложной распределенной сортировки, а затем запуск
снова хэш (Дорого)
Почему ничего из этого не перенесено в C *? Это связано с тем, что Datasource (в данном случае CassandraSourceRelation) не предоставляется информация о Distinct части запроса. Это только часть того, как в настоящее время работает Spark. Документы о том, что можно поставить
Так что насчет версии RDD?
С помощью RDDS мы даем Spark прямой набор инструкций. Это означает, что если вы хотите нажать что-то вниз, это должно быть , указанное вручную . Давайте посмотрим на отладочный вывод запроса RDD
scala> sc.cassandraTable("test","tab").distinct.toDebugString
res2: String =
(13) MapPartitionsRDD[7] at distinct at <console>:45 []
| ShuffledRDD[6] at distinct at <console>:45 []
+-(13) MapPartitionsRDD[5] at distinct at <console>:45 []
| CassandraTableScanRDD[4] at RDD at CassandraRDD.scala:19 []
Здесь проблема в том, что ваш «отдельный» вызов является общей операцией для RDD и не относится к Cassandra. Поскольку RDD требуют, чтобы все оптимизации были явными (то, что вы печатаете, это то, что вы получаете), Cassandra никогда не слышит об этой необходимости в «Distinct», и мы получаем план, который почти идентичен нашей версии Spark SQL. Сделайте полное сканирование, сериализуйте все данные от Cassandra до Spark. Сделайте Shuffle и затем верните результаты.
Так что мы можем с этим поделать?
С SparkSQL это почти так же хорошо, как мы можем получить, не добавляя новые правила в Catalyst (Оптимизатор SparkSQL / Dataframes), чтобы дать ему понять, что Cassandra может обрабатывать некоторые отдельные вызовы на уровне сервера. Затем его необходимо реализовать для подклассов CassandraRDD.
Для СДР нам необходимо добавить функцию, подобную уже существующим where
, select
и limit
, вызовам СДР Cassandra. Новый Distinct
вызов может быть добавлен здесь , хотя это будет допустимо только в определенных ситуациях. Это функция, которая в настоящее время не существует в SCC, но может быть добавлена относительно легко, поскольку все, что она должна сделать, это добавить DISTINCT
к запросам и, возможно, добавить некоторую проверку, чтобы убедиться, что это DISTINCT
в этом есть смысл.
Что мы можем сделать прямо сейчас без изменения базового соединителя?
Поскольку нам известен точный запрос CQL, который мы хотели бы сделать, мы всегда можем напрямую использовать драйвер Cassandra для получения этой информации. Разъем Spark Cassandra предоставляет пул драйверов, который мы можем использовать, или мы могли бы просто использовать драйвер Java изначально. Чтобы использовать пул, мы бы сделали что-то вроде
import com.datastax.spark.connector.cql.CassandraConnector
CassandraConnector(sc.getConf).withSessionDo{ session =>
session.execute("SELECT DISTINCT key1, key2, key3 FROM test.tab;").all()
}
А затем распараллелить результаты, если они необходимы для дальнейшей работы Spark. Если бы мы действительно хотели распространить это, то, скорее всего, было бы необходимо добавить функцию в Spark Cassandra Connector, как я описал выше.