Разделение данных при чтении из кустов / hdfs на основе значений столбцов в Spark - PullRequest
0 голосов
/ 08 октября 2018

У меня есть 2 кадра данных искры, которые я читаю из улья, используя sqlContext.Давайте назовем эти кадры данных как df1 и df2.Данные в обоих фреймах данных сортируются на Column, называемом PolicyNumber на уровне улья.PolicyNumber также является первичным ключом для обоих кадров данных.Ниже приведены примерные значения для обоих фреймов данных, хотя в действительности оба моих фрейма данных огромны и распределены по 5 исполнителям в виде 5 разделов. Для простоты я предполагаю, что в каждом разделе будет одна запись.

Sample df1 PolicyNumber FirstName 1 A 2 B 3 C 4 D 5 E

Sample df2 PolicyNumber PremiumAmount 1 450 2 890 3 345 4 563 5 2341

Теперь я хочу присоединиться к df1 и df2 в столбце PolicyNumber.Я могу запустить приведенный ниже фрагмент кода и получить требуемый результат:

df1.join(df2,df1.PolicyNumber=df2.PolicyNumber)

Теперь я хочу избежать как можно большего количества перемешивания, чтобы сделать это соединение эффективным.Поэтому, чтобы избежать случайного воспроизведения, при чтении из улья я хочу разбить df1 на основе значений PolicyNumber Column таким образом, чтобы строка с PolicyNumber 1 перешла на Executor 1, строка с PolicyNumber 2перейдет к Executor 2, строка с PolicyNumber 3 перейдет к Executor 3 и так далее.И я хочу разделить df2 точно так же, как я сделал для df1.

Таким образом, Executor 1 теперь будет иметь строку из df1 с PolicyNumber=1, а такжестрока из df2 с PolicyNumber=1.Аналогично, Executor 2 будет иметь строку из df1 с PolicyNumber=2, а также строку из df2 с PolicyNumber=2 и так далее.

Таким образом, не будет необходимости в перетасовке, как сейчас, данные локальны для этого исполнителя.

Мой вопрос, есть ли способ управления разделами в этой детализации?И если да, то как мне это сделать.

1 Ответ

0 голосов
/ 08 октября 2018

К сожалению, нет прямого контроля над данными, которые плавают в каждом исполнителе, однако, пока вы читаете данные в каждый фрейм данных, используйте столбец CLUSTER BY при соединении, который помогает сортировать данные, распределяемые по нужным исполнителям.

ex: 
df1 = sqlContext.sql("select * from CLSUTER BY JOIN_COLUMN")
df2 = sqlContext.sql("SELECT * FROM TABLE2 CLSUTER BY JOIN_COLUMN")

надеюсь, это поможет.

...