Разбиение Фрейма Данных в Pyspark с использованием Custom Partitioner - PullRequest
0 голосов
/ 13 октября 2018

Нужна информация об использовании пользовательского разделителя в Pyspark.У меня есть датафрейм, содержащий данные по странам для разных стран.Так что, если я сделаю перераспределение по столбцу страны, он распределит мои данные по n разделам и сохранит данные о странах по отдельным разделам.Это создает перекос данных раздела, когда я вижу, используя glom() метод.

В некоторых странах, таких как США и CHN, имеется огромное количество данных в конкретном фрейме данных.Я хочу перераспределить мой фрейм данных таким образом, чтобы, если страны были США и CHN, он далее разделился на 10 разделов, в противном случае разделы остались бы такими же для других стран, таких как IND, THA, AUS и т. Д. Можем ли мы расширить класс разделителей в коде Pyspark.

Я прочитал эту ссылку ниже, что мы можем расширить класс разделителя scala в приложении Spark Scala и изменить класс разделителя, чтобы использовать собственную логику для перераспределения наших данных на основе требований.Как и тот, который у меня есть .. пожалуйста, помогите найти решение в Pyspark. См. Ссылку ниже Какой эффективный способ разделения по столбцам, но с фиксированным числом разделов?


Я использую Spark версии 2.3.0.2 и ниже моя структура данных:

datadf= spark.sql("""
    SELECT    
        ID_NUMBER ,SENDER_NAME ,SENDER_ADDRESS ,REGION_CODE ,COUNTRY_CODE
    from udb.sometable
""");

Входящие данные имеют данные для шести стран, такие как AUS, IND,THA, RUS, CHN и USA.CHN и USA имеют перекос данных.

, поэтому, если я сделаю repartition для COUNTRY_CODE, два раздела содержат много данных, тогда как с другими все в порядке.Я проверил это, используя метод glom().

newdf = datadf.repartition("COUNTRY_CODE")

from pyspark.sql import SparkSession
from pyspark.sql import  HiveContext, DataFrameWriter, DataFrame

newDF = datadf.repartitionByRange(3,"COUNTRY_CODE","USA")

Я пытался перераспределить свои данные еще на 3 раздела для стран USA и CHN и хотел бы сохранить данные других стран в один раздел.

This is what I am expecting 
AUS- one partition
IND- one partition
THA- one partition
RUS- one partition
CHN- three partition
USA- three partition

Traceback (последний вызов был последним): файл "", строка 1, в файле "/ usr / hdp / current / spark2-client / python / pyspark / sql / dataframe.py ", строка 1182, в getattr " Объект "% s" не имеет атрибута "% s" "% (self. class . name , name)) AttributeError: у объекта «DataFrame» нет атрибута «repartitionByRange»

Ответы [ 2 ]

0 голосов
/ 28 марта 2019

В структурированном API нет настраиваемого разделителя, поэтому, чтобы использовать настраиваемый разделитель, вам нужно перейти к RDD API.3 простых шага:

  1. Преобразование структурированного API в API RDD
dataRDD = dataDF.rdd
Применение настраиваемого разделителя в RDD API
import random

# Extract key from Row object
dataRDD = dataRDD.map(lambda r: (r[0], r))

def partitioner(key):
    if key == "CHN":
        return random.randint(1, 10)
    elif key == "USA":
        return random.randint(11, 20)
    else:
        # distinctCountryDict is a dict mapping distinct countries to distinct integers
        # these distinct integers should not overlap with range(1, 20)
        return distinctCountryDict[key]

numPartitions = 100
dataRDD = dataRDD.partitionBy(numPartitions, partitioner)

# Remove key extracted previously
dataRDD = dataRDD.map(lambda r: r[1])
Преобразование RDD API обратно в структурированный API
dataDF = dataRDD.toDF()

Таким образом, вы получаете лучшее из обоих миров, типов Spark и оптимизированного физического плана в структурированном API, а также настраиваемый разделитель в низком уровне.RDD APIИ мы переходим к низкоуровневому API только тогда, когда это абсолютно необходимо.

0 голосов
/ 13 октября 2018

Попробуйте что-то вроде этого с хэшированием:

newDf = oldDf.repartition(N, $"col1", $"coln")

или для метода ранжирования:

newDF = oldDF.repartitionByRange(N, $"col1", $"coln")

Для DF пока нет пользовательских разделов.

ВВ твоем случае я бы пошел на хеширование, но нет никаких гарантий.

Но если ваши данные искажены, вам может потребоваться дополнительная работа, например, 2 столбца для разделения - это самый простой подход.

Например, существующий или новый столбец - в этом случае столбец, который применяет группирование к данной стране, например, 1 .. N, и разделение на два столбца.

Для стран с большим количеством групп вы получаете N синтетических подразделений;для других с низкой кардинальностью, только с 1 таким номером группы.Не слишком сложно.Оба разбиения могут занимать более 1 цв.

На мой взгляд, заполнение разделов с помощью единого номера требует больших усилий и не совсем достижимо, но следующего лучшего подхода, как здесь, может быть достаточно.Достигает настраиваемого разбиения до степени.

В противном случае, используя .withColumn на DF, вы можете смоделировать настраиваемое разбиение с этими правилами и заполнение нового столбца DF, а затем применить repartitionByRange.Тоже не так сложно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...