Как создать несколько задач Spark для запроса разделов Cassandra - PullRequest
0 голосов
/ 07 февраля 2019

У меня есть приложение, которое использует Spark (с Spark Job Server), которое использует хранилище Cassandra.Моя текущая настройка - режим client, работающий с master=local[*].Таким образом, существует единственный исполнитель Spark, который также является процессом драйвера, который использует все 8 ядер машины.У меня есть экземпляр Cassandra, работающий на той же машине.

Таблицы Cassandra имеют первичный ключ формы ((datasource_id, date), clustering_col_1 ... clustering_col_n), где date - это один день формы "2019-02-07 "и является частью составного ключа раздела.

В моем приложении Spark я выполняю запрос, подобный следующему:

df.filter(col("date").isin(days: _*))

В физическом плане Spark яобратите внимание, что эти фильтры вместе с фильтром для ключа раздела «datasource_id» передаются в CQL-запрос Cassandra.

Для наших самых больших источников данных я знаю, что разделы имеют размер около 30 МБ.Таким образом, у меня есть следующий параметр в конфигурации сервера заданий Spark:

spark.cassandra.input.split.size_in_mb = 1

Однако я замечаю, что на этапе загрузки Cassandra нет распараллеливания.Хотя существует несколько разделов Cassandra размером более 1 МБ, дополнительные искровые разделы не создаются.Существует только одна задача, которая выполняет все запросы на одном ядре, поэтому для загрузки данных за 1-месячный диапазон дат, который соответствует ~ 1 миллиону строк, требуется ~ 20 секунд.

Я пробовал альтернативный подходниже:

  df union days.foldLeft(df)((df: DataFrame, day: String) => {
    df.filter(col("date").equalTo(day))
  })

Это действительно создает спарк-раздел (или задачу) для каждого «дневного» раздела в Кассандре.Однако для небольших источников данных, где разделы кассандры намного меньше по размеру, этот метод оказывается довольно дорогим с точки зрения создания чрезмерных задач и накладных расходов из-за их координации.Для этих источников данных было бы совершенно нормально объединить много разделов кассандры в одну искровую часть.Поэтому я подумал, что использование конфигурации spark.cassandra.input.split.size_in_mb окажется полезным при работе как с маленькими, так и с большими источниками данных.

Неужели мое понимание неверно?Есть ли что-то еще, что мне не хватает, чтобы эта конфигурация вступила в силу?

PS Я также прочитал ответы об использовании joinWithCassandraTable.Тем не менее, наш код опирается на использование DataFrame.Кроме того, преобразование из CassandraRDD в DataFrame для нас не очень целесообразно, поскольку наша схема является динамической и не может быть указана с использованием классов case.

...