Получение первого ненулевого значения в сгруппированном кадре данных детерминистическим способом - PullRequest
0 голосов
/ 11 апреля 2019

Я пытаюсь получить первое ненулевое значение в кадре данных, которое должно быть сгруппировано по определенному ключу и отсортировано по другому ключу.

Я пробовал 2 подхода;первый создает оконную функцию и для каждого столбца выполняется F.first (column_name, True) .over (ws).Это оказывается ужасно медленным, если количество столбцов велико.Я говорю 15 минут для 20 столбцов и 6 строк.

Второй подход заключается в вызове reduByKey для RDD и выполнении сравнения в функции сокращения, которая страдает от несколько меньшего, но все же значительного снижения производительности.Это также не так красиво.

Версия spark - 2.3.1

Использование windows:

PK = FileColumn.id_column_for(entity)

ws = Window.partitionBy(df[PK]) \
    .orderBy(df[sort_value].desc())

columns = [c for c in sorted(df.columns) if c != PK]

for column in columns:
    df = df.withColumn('first:{}'.format(column), func.first(column, True).over(ws))

df = df.drop(*columns)
df = df.dropDuplicates()

for column in columns:
    df = df.withColumnRenamed('first:{}'.format(column), column)

План выполнения для этого выполняет большое количество оконных функций, по одной для каждого столбца.

Это версия, использующая reduByKey

rdd = mergeable_records.rdd.map(lambda x: (x[key_column], x))

def merge_record(r1, r2):
    values = {}

    if r1['sort_value'] > r2['sort_value']:
        primary_record, secondary_record = r1, r2
    else:
        primary_record, secondary_record = r2, r1

    for column in columns:
        values[column] = primary_record[column] \
        if primary_record[column].isNotNull() \
        else secondary_record[column]
    return Row(**values)

rdd = rdd.reduceByKey(merge_record)
rdd = rdd.map(lambda record: record[1])

Мне нужно найти эффективный способ группировки этих записей вместе.Функционально он работает нормально, но при высокой производительности.Что мне здесь не хватает?

...