Как ускорить перетасовку после прочтения отсортированного или c таблицы в спарке - PullRequest
0 голосов
/ 03 марта 2020

У меня есть задача спроектировать объединение новых данных в медленно изменяющуюся таблицу размеров в кусте с pyspark. Включение функции слияния кустов не является политически возможным вариантом.

Итак, я сделал это:

Данные хранятся в таблице, разделенной суррогатной версией. Существует поле бизнес-ключа, которое идентифицирует запись.

Изначально (версия = 0) старые данные хранятся в упорядоченном по ключевому полю ИЛИ C

Алгоритм:

1) получить ежедневное приращение из внешней таблицы, дедуплицировать его, преобразовать в ту же схему, что и старые данные

2) записать приращение, как указано в ключевом поле ИЛИ C файл в следующий раздел (версия 1)

3) прочитать всю таблицу (версия = 1 + версия = 2) как информационный кадр, создать из нее временное представление:

df= sqlContext.sql("""select {fields}, version from   {table}  order by {key_columns}""")        
    df.createOrReplaceTempView("view_union")

4) очистить ее с помощью

merge_df = sqlContext.sql("""
        select  {fields} from 
         (
        select 
        {fields}, vers, max(version) over (partition by {key_columns} ) mvers
        from 
        view_union) un2
        where version=mvers        
        """)

5) записать merge_df в версию = 3

6) удалить разделы для версии = 0 и версии = 1

, что приводит к упорядочению по ключевому полю ИЛИ C, работает нормально. Из-за жирного порядка по обмен на функцию max отсутствует, и данные остаются отсортированными до тех пор, пока порядок сохранения * по ключевому полю значительно не уменьшит размер ИЛИ C

Но если я посмотрю на Spark Пользовательский интерфейс я вижу это:

Первый этап: размер ввода = целая таблица, без вывода, без случайной записи

Второй этап: размер ввода = целая таблица, без вывода, случайная запись = x2 входной размер (обычный)

Третий этап: случайное чтение = stage_2.shuffle_write, output = some_normal_value

Вопрос в том, какова цель первого этапа? можно ли этого избежать? У меня есть предположение, что это определить, как разделить по диапазону. Тогда, если я прав, есть ли способ намекнуть что-то напрямую разделителю диапазонов, чтобы избежать этого чтения? Я могу хранить любые метаданные о таблице и передавать в pyspark. Может быть, некоторые параметры таблицы улья будут работать?

Этапы

Детали запроса. Здесь число выходных строк в первом HiveTableScan удваивается

...