Как прочитать таблицу на scala spark, используя partitionColumn типа данных varchar? - PullRequest
0 голосов
/ 24 января 2020

Можно ли дать раздел Столбец типа данных varchar? Таблица, которую я хочу прочитать, не имеет первичного ключа, и все столбцы имеют тип данных varchar. Есть ли способ прочитать из jdb c с partitionColumn как тип varchar?

var finaldataframe = spark.read.format("jdbc")
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
.option("database",db)
.option("url", url)
.option("dbtable", table_name)
.option("numPartitions", partitions)
.option("partitionColumn", pm_key)
.option("lowerbound", w_minLogID)
.option("upperbound", w_maxLogID)
.load() 

В документах spark https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html сказано, что:

partitionColumn должен быть числовым c, столбцом даты или метки времени из рассматриваемой таблицы.

Но есть ли какое-то решение этой проблемы?

1 Ответ

0 голосов
/ 24 января 2020

TL; DR Это необязательное поле для повышения производительности. Если ваш набор данных маленький, вы можете пропустить его. Аналогичные функциональные возможности могут быть достигнуты с помощью аргумента predicates: Array[String] для строковых столбцов.

val connectionProperties = new Properties()
connectionProperties.put("database", db)
connectionProperties.put("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")

var finaldataframe = spark.read.jdbc(
    url = url,
    table = table_name,
    predicates = Array[String]("columnA like 'A%'", "columnB like 'B%'" )
    connectionProperties = connectionProperties
)

Чтобы понять, почему для partitionColumn допускается только столбец, похожий на цифру c, посмотрите, как спарк читает из базы данных.

По умолчанию spark будет использовать одного исполнителя для подключения к базе данных, выполнения запроса и начала извлечения результатов. Этот подход не подходит, если вы хотите извлечь большие наборы данных из базы данных, потому что

  • только один исполнитель извлекает данные.
  • Извлеченные данные должны быть перераспределены / перетасованы другим исполнителям.

Это можно улучшить, если несколько исполнителей извлекают часть данных из базы данных параллельно. Предположим, что вы хотите выполнить запрос select id, prod_name from products, и эта таблица содержит миллион строк.
Вы бы указали spark numPartitions как 4, partitionColumn как id, что является целым числом, lowerbound 1 и upperbound as 1000000.
Теперь spark рассчитает, что каждый исполнитель должен извлечь 250K записей.

(верхний - нижний +1) / numPartitions
= (1000000 - 1 +1 ) / 4
= 250000

С помощью этого первый исполнитель запустит запрос

select * 
from 
  (
    select id, prod_name 
    from products
  ) 
where id >= 1 and id<=250000

от второго исполнителя

select * 
from 
  (
    select id, prod_name 
    from products
  ) 
where id >= 250001 and id<=500000

и т. Д. , Это работает только в том случае, если paritionColumns имеет естественный порядок, который можно использовать для вычисления разделов (в данном случае (1, 250000), (250001, 500000), (500001, 750000) и (750001, 1000000)). Эти разделы используются для вычисления предикатов. Этот подход нельзя использовать со строковыми столбцами, поскольку между строковыми значениями нет естественного упорядочения.

Эту проблему можно преодолеть, если пользователь предоставляет предикаты напрямую, и спарк не должен ничего вычислять. Например, если вы хотите выполнить запрос select name, score from people и указать предикаты Array("name like 'a%'", "name like 'b%'"), тогда spark запустит запросы

select *
from 
  (
    select name, score 
    from people
  ) where "name like 'a%'"

и

select *
from 
  (
    select name, score 
    from people
  ) where "name like 'b%'"
...