Включение распараллеливания в Spark с разделением Pushdown в MemSQL - PullRequest
0 голосов
/ 26 февраля 2019

У меня есть таблица columnstore в MemSQL, схема которой аналогична приведенной ниже:

CREATE TABLE key_metrics (
source_id TEXT,
date TEXT,
metric1 FLOAT,
metric2 FLOAT,
…
SHARD KEY (source_id, date) USING CLUSTERED COLUMNSTORE
);

У меня есть приложение Spark (работающее с Spark Job Server), которое запрашивает таблицу MemSQL.Ниже приведена упрощенная форма операции типа Dataframe, которую я выполняю (в Scala):

sparkSession
.read
.format(“com.memsql.spark.connector”)
.options( Map (“path” -> “dbName.key_metrics”))
.load()
.filter(col(“source_id”).equalTo(“12345678”)
.filter(col(“date”)).isin(Seq(“2019-02-01”, “2019-02-02”, “2019-02-03”))

Я на физическом плане подтвердил, что эти предикаты фильтра передаются в MemSQL.

Я также проверил, что в таблице довольно равномерное распределение разделов:

±--------------±----------------±-------------±-------±-----------+
| DATABASE_NAME | TABLE_NAME | PARTITION_ID | ROWS | MEMORY_USE |
±--------------±----------------±-------------±-------±-----------+
| dbName        | key_metrics |           0 | 784012 |        0 |
| dbName        | key_metrics |           1 | 778441 |        0 |
| dbName        | key_metrics |           2 | 671606 |        0 |
| dbName        | key_metrics |           3 | 748569 |        0 |
| dbName        | key_metrics |           4 | 622241 |        0 |
| dbName        | key_metrics |           5 | 739029 |        0 |
| dbName        | key_metrics |           6 | 955205 |        0 |
| dbName        | key_metrics |           7 | 751677 |        0 |
±--------------±----------------±-------------±-------±-----------+

Мой вопрос касается нажатия на разделы.Насколько я понимаю, с его помощью мы можем использовать все ядра машин и использовать параллелизм для массовой загрузки.Согласно документации, это делается путем создания столько задач Spark, сколько существует разделов базы данных MemSQL.

Однако при запуске конвейера Spark и наблюдении за пользовательским интерфейсом Spark кажется, что существует только одна задача Spark, котораясоздан, который делает один запрос к базе данных, которая работает на одном ядре.

Я убедился, что также установлены следующие свойства:

spark.memsql.disablePartitionPushdown = false
spark.memsql.defaultDatabase = “dbName”

Это мое пониманиеневерный раздел?Есть ли какая-то другая конфигурация, которую мне не хватает?

Буду признателен за ваш вклад в это.

Спасибо!

...