ВЫБЕРИТЕ РАЗЛИЧНЫЕ Кассандры в Искре - PullRequest
0 голосов
/ 27 апреля 2018

Мне нужен запрос, в котором перечислены уникальные Составные ключи разделов внутри Spark.
Запрос в CASSANDRA: SELECT DISTINCT key1, key2, key3 FROM schema.table; довольно быстрый, однако использование фильтра данных того же типа в RDD или spark.sql приводит к невероятно медленным результатам в сравнении.

например.

---- SPARK ----
var t1 = sc.cassandraTable("schema","table").select("key1", "key2", "key3").distinct()
var t2 = spark.sql("SELECT DISTINCT key1, key2, key3 FROM schema.table")

t1.count // takes 20 minutes
t2.count // takes 20 minutes

---- CASSANDRA ----
// takes < 1 minute while also printing out all results
SELECT DISTINCT key1, key2, key3 FROM schema.table; 

где формат таблицы:

CREATE TABLE schema.table (
    key1 text,
    key2 text,
    key3 text,
    ckey1 text,
    ckey2 text,
    v1 int,
    PRIMARY KEY ((key1, key2, key3), ckey1, ckey2)
);

Разве Spark не использует оптимизацию Кассандры в своих запросах?
Как эффективно получить эту информацию?

Ответы [ 3 ]

0 голосов
/ 27 апреля 2018

Быстрые ответы

Разве Spark не использует оптимизацию Кассандры в своих запросах?

Да. Но с SparkSQL только сокращение столбца и предикаты нажатия. В СДР это руководство.

Как эффективно получить эту информацию?

Поскольку ваш запрос возвращается достаточно быстро, я бы просто использовал драйвер Java напрямую, чтобы получить этот набор результатов.


Длинные ответы

Несмотря на то, что Spark SQL может обеспечить некоторые оптимизации на основе C *, они обычно ограничиваются предикатами нажатия при использовании интерфейса DataFrame. Это связано с тем, что структура предоставляет только ограниченную информацию для источника данных. Мы можем убедиться в этом, выполнив объяснение в запросе, который вы написали.

Давайте начнем с примера SparkSQL

scala> spark.sql("SELECT DISTINCT key1, key2, key3 FROM test.tab").explain
== Physical Plan ==
*HashAggregate(keys=[key1#30, key2#31, key3#32], functions=[])
+- Exchange hashpartitioning(key1#30, key2#31, key3#32, 200)
   +- *HashAggregate(keys=[key1#30, key2#31, key3#32], functions=[])
      +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation test.tab[key1#30,key2#31,key3#32] ReadSchema: struct<key1:string,key2:string,key3:string>

Итак, ваш пример Spark будет разбит на несколько шагов.

  1. Сканирование: прочитать все данные из этой таблицы. Это означает сериализацию каждого значения от машины C до JVM Spark Executor, другими словами много работы.
  2. * HashAggregate / Exchange / Hash Aggregate: возьмите значения от каждого исполнителя, хэшируйте их локально, затем обменивайтесь данными между машинами и снова хешируйте, чтобы обеспечить уникальность. С точки зрения непрофессионала это означает создание больших хеш-структур, их сериализацию, запуск сложной распределенной сортировки, а затем запуск снова хэш (Дорого)

Почему ничего из этого не перенесено в C *? Это связано с тем, что Datasource (в данном случае CassandraSourceRelation) не предоставляется информация о Distinct части запроса. Это только часть того, как в настоящее время работает Spark. Документы о том, что можно поставить

Так что насчет версии RDD?

С помощью RDDS мы даем Spark прямой набор инструкций. Это означает, что если вы хотите нажать что-то вниз, это должно быть , указанное вручную . Давайте посмотрим на отладочный вывод запроса RDD

scala> sc.cassandraTable("test","tab").distinct.toDebugString
res2: String =
(13) MapPartitionsRDD[7] at distinct at <console>:45 []
 |   ShuffledRDD[6] at distinct at <console>:45 []
 +-(13) MapPartitionsRDD[5] at distinct at <console>:45 []
    |   CassandraTableScanRDD[4] at RDD at CassandraRDD.scala:19 []

Здесь проблема в том, что ваш «отдельный» вызов является общей операцией для RDD и не относится к Cassandra. Поскольку RDD требуют, чтобы все оптимизации были явными (то, что вы печатаете, это то, что вы получаете), Cassandra никогда не слышит об этой необходимости в «Distinct», и мы получаем план, который почти идентичен нашей версии Spark SQL. Сделайте полное сканирование, сериализуйте все данные от Cassandra до Spark. Сделайте Shuffle и затем верните результаты.

Так что мы можем с этим поделать?

С SparkSQL это почти так же хорошо, как мы можем получить, не добавляя новые правила в Catalyst (Оптимизатор SparkSQL / Dataframes), чтобы дать ему понять, что Cassandra может обрабатывать некоторые отдельные вызовы на уровне сервера. Затем его необходимо реализовать для подклассов CassandraRDD.

Для СДР нам необходимо добавить функцию, подобную уже существующим where, select и limit, вызовам СДР Cassandra. Новый Distinct вызов может быть добавлен здесь , хотя это будет допустимо только в определенных ситуациях. Это функция, которая в настоящее время не существует в SCC, но может быть добавлена ​​относительно легко, поскольку все, что она должна сделать, это добавить DISTINCT к запросам и, возможно, добавить некоторую проверку, чтобы убедиться, что это DISTINCT в этом есть смысл.

Что мы можем сделать прямо сейчас без изменения базового соединителя?

Поскольку нам известен точный запрос CQL, который мы хотели бы сделать, мы всегда можем напрямую использовать драйвер Cassandra для получения этой информации. Разъем Spark Cassandra предоставляет пул драйверов, который мы можем использовать, или мы могли бы просто использовать драйвер Java изначально. Чтобы использовать пул, мы бы сделали что-то вроде

import com.datastax.spark.connector.cql.CassandraConnector
CassandraConnector(sc.getConf).withSessionDo{ session => 
  session.execute("SELECT DISTINCT key1, key2, key3 FROM test.tab;").all()
}

А затем распараллелить результаты, если они необходимы для дальнейшей работы Spark. Если бы мы действительно хотели распространить это, то, скорее всего, было бы необходимо добавить функцию в Spark Cassandra Connector, как я описал выше.

0 голосов
/ 23 мая 2019

Пока мы выбираем ключ раздела, мы можем использовать функцию .perPartitionLimit CassandraRDD:

val partition_keys = sc.cassandraTable("schema","table").select("key1", "key2", "key3").perPartitionLimit(1)

Это работает, потому что, за SPARKC-436

select key from some_table per partition limit 1

дает тот же результат, что и

select distinct key from some_table

Эта функция была добавлена ​​в разъем для искрового кассандры 2.0.0-RC1 и требует не менее C * 3,6

0 голосов
/ 27 апреля 2018

Distinct имеет плохую производительность. Здесь есть хороший ответ с некоторыми альтернативами: Как эффективно выбирать отдельные строки в СДР на основе подмножества его столбцов`

Вы можете использовать toDebugString, чтобы иметь представление о том, сколько данных ваш код перемешивает.

...