У меня есть задача спроектировать объединение новых данных в медленно изменяющуюся таблицу размеров в кусте с pyspark. Включение функции слияния кустов не является политически возможным вариантом.
Итак, я сделал это:
Данные хранятся в таблице, разделенной суррогатной версией. Существует поле бизнес-ключа, которое идентифицирует запись.
Изначально (версия = 0) старые данные хранятся в упорядоченном по ключевому полю ИЛИ C
Алгоритм:
1) получить ежедневное приращение из внешней таблицы, дедуплицировать его, преобразовать в ту же схему, что и старые данные
2) записать приращение, как указано в ключевом поле ИЛИ C файл в следующий раздел (версия 1)
3) прочитать всю таблицу (версия = 1 + версия = 2) как информационный кадр, создать из нее временное представление:
df= sqlContext.sql("""select {fields}, version from {table} order by {key_columns}""")
df.createOrReplaceTempView("view_union")
4) очистить ее с помощью
merge_df = sqlContext.sql("""
select {fields} from
(
select
{fields}, vers, max(version) over (partition by {key_columns} ) mvers
from
view_union) un2
where version=mvers
""")
5) записать merge_df в версию = 3
6) удалить разделы для версии = 0 и версии = 1
, что приводит к упорядочению по ключевому полю ИЛИ C, работает нормально. Из-за жирного порядка по обмен на функцию max отсутствует, и данные остаются отсортированными до тех пор, пока порядок сохранения * по ключевому полю значительно не уменьшит размер ИЛИ C
Но если я посмотрю на Spark Пользовательский интерфейс я вижу это:
Первый этап: размер ввода = целая таблица, без вывода, без случайной записи
Второй этап: размер ввода = целая таблица, без вывода, случайная запись = x2 входной размер (обычный)
Третий этап: случайное чтение = stage_2.shuffle_write, output = some_normal_value
Вопрос в том, какова цель первого этапа? можно ли этого избежать? У меня есть предположение, что это определить, как разделить по диапазону. Тогда, если я прав, есть ли способ намекнуть что-то напрямую разделителю диапазонов, чтобы избежать этого чтения? Я могу хранить любые метаданные о таблице и передавать в pyspark. Может быть, некоторые параметры таблицы улья будут работать?
Этапы
Детали запроса. Здесь число выходных строк в первом HiveTableScan удваивается