как разбить список на несколько разделов и отправить исполнителям - PullRequest
0 голосов
/ 04 февраля 2019

Когда мы используем spark для чтения данных из csv для БД следующим образом, он автоматически разбивает данные на несколько разделов и отправляет их исполнителям

spark
  .read
  .option("delimiter", ",")
  .option("header", "true")
  .option("mergeSchema", "true")
  .option("codec", properties.getProperty("sparkCodeC"))
  .format(properties.getProperty("fileFormat"))
  .load(inputFile)

В настоящее время у меня есть список идентификаторов как:

[1,2,3,4,5,6,7,8,9,...1000]

Что я хочу сделать, это разделить этот список на несколько разделов и отправить исполнителям, в каждом исполнителе запустить sql как

ids.foreach(id => {    
select * from table where id = id
})

Когда мы загружаем данные из Кассандры, соединительсгенерирует запрос sql следующим образом:

select columns from table where Token(k) >= ? and Token(k) <= ? 

это означает, что соединитель будет сканировать всю базу данных, практически, мне не нужно сканировать всю таблицу, я просто хочу получить все данные из таблицыгде k (ключ раздела) в списке идентификаторов.

схема таблицы в виде:

CREATE TABLE IF NOT EXISTS tab.events (
    k int,
    o text,
    event text
    PRIMARY KEY (k,o)
);

или как я могу использовать spark для загрузки данных из Кассандры, используя предварительно определенный оператор SQL без сканированиявесь стол?

1 Ответ

0 голосов
/ 04 февраля 2019

Вам просто нужно использовать функцию joinWithCassandra для выбора только тех данных, которые необходимы для вашей операции.Но имейте в виду, что эта функция доступна только через RDD API.

Примерно так:

val joinWithRDD = your_df.rdd.joinWithCassandraTable("tab","events")

Необходимо убедиться, что имя столбца в вашем DataFrame соответствует имени ключа раздела в Cassandra - для получения дополнительной информации см. Документацию.

Реализация DataFrame доступна только в DSE-версии Spark Cassandra Connector, как описано в после публикации в блоге .

...