Я пытаюсь получить первое ненулевое значение в кадре данных, которое должно быть сгруппировано по определенному ключу и отсортировано по другому ключу.
Я пробовал 2 подхода;первый создает оконную функцию и для каждого столбца выполняется F.first (column_name, True) .over (ws).Это оказывается ужасно медленным, если количество столбцов велико.Я говорю 15 минут для 20 столбцов и 6 строк.
Второй подход заключается в вызове reduByKey для RDD и выполнении сравнения в функции сокращения, которая страдает от несколько меньшего, но все же значительного снижения производительности.Это также не так красиво.
Версия spark - 2.3.1
Использование windows:
PK = FileColumn.id_column_for(entity)
ws = Window.partitionBy(df[PK]) \
.orderBy(df[sort_value].desc())
columns = [c for c in sorted(df.columns) if c != PK]
for column in columns:
df = df.withColumn('first:{}'.format(column), func.first(column, True).over(ws))
df = df.drop(*columns)
df = df.dropDuplicates()
for column in columns:
df = df.withColumnRenamed('first:{}'.format(column), column)
План выполнения для этого выполняет большое количество оконных функций, по одной для каждого столбца.
Это версия, использующая reduByKey
rdd = mergeable_records.rdd.map(lambda x: (x[key_column], x))
def merge_record(r1, r2):
values = {}
if r1['sort_value'] > r2['sort_value']:
primary_record, secondary_record = r1, r2
else:
primary_record, secondary_record = r2, r1
for column in columns:
values[column] = primary_record[column] \
if primary_record[column].isNotNull() \
else secondary_record[column]
return Row(**values)
rdd = rdd.reduceByKey(merge_record)
rdd = rdd.map(lambda record: record[1])
Мне нужно найти эффективный способ группировки этих записей вместе.Функционально он работает нормально, но при высокой производительности.Что мне здесь не хватает?