У меня есть приложение, которое использует Spark (с Spark Job Server), которое использует хранилище Cassandra.Моя текущая настройка - режим client
, работающий с master=local[*]
.Таким образом, существует единственный исполнитель Spark, который также является процессом драйвера, который использует все 8 ядер машины.У меня есть экземпляр Cassandra, работающий на той же машине.
Таблицы Cassandra имеют первичный ключ формы ((datasource_id, date), clustering_col_1 ... clustering_col_n), где date - это один день формы "2019-02-07 "и является частью составного ключа раздела.
В моем приложении Spark я выполняю запрос, подобный следующему:
df.filter(col("date").isin(days: _*))
В физическом плане Spark яобратите внимание, что эти фильтры вместе с фильтром для ключа раздела «datasource_id» передаются в CQL-запрос Cassandra.
Для наших самых больших источников данных я знаю, что разделы имеют размер около 30 МБ.Таким образом, у меня есть следующий параметр в конфигурации сервера заданий Spark:
spark.cassandra.input.split.size_in_mb = 1
Однако я замечаю, что на этапе загрузки Cassandra нет распараллеливания.Хотя существует несколько разделов Cassandra размером более 1 МБ, дополнительные искровые разделы не создаются.Существует только одна задача, которая выполняет все запросы на одном ядре, поэтому для загрузки данных за 1-месячный диапазон дат, который соответствует ~ 1 миллиону строк, требуется ~ 20 секунд.
Я пробовал альтернативный подходниже:
df union days.foldLeft(df)((df: DataFrame, day: String) => {
df.filter(col("date").equalTo(day))
})
Это действительно создает спарк-раздел (или задачу) для каждого «дневного» раздела в Кассандре.Однако для небольших источников данных, где разделы кассандры намного меньше по размеру, этот метод оказывается довольно дорогим с точки зрения создания чрезмерных задач и накладных расходов из-за их координации.Для этих источников данных было бы совершенно нормально объединить много разделов кассандры в одну искровую часть.Поэтому я подумал, что использование конфигурации spark.cassandra.input.split.size_in_mb
окажется полезным при работе как с маленькими, так и с большими источниками данных.
Неужели мое понимание неверно?Есть ли что-то еще, что мне не хватает, чтобы эта конфигурация вступила в силу?
PS Я также прочитал ответы об использовании joinWithCassandraTable.Тем не менее, наш код опирается на использование DataFrame.Кроме того, преобразование из CassandraRDD в DataFrame для нас не очень целесообразно, поскольку наша схема является динамической и не может быть указана с использованием классов case.