Улей Инкрементальный на Секционированном столе - PullRequest
0 голосов
/ 23 сентября 2018

Я работаю над внедрением пошагового процесса в таблицу кустов A;Таблица A - уже создана в кусте с разбивкой на YearMonth (столбец YYYYMM) с полным объемом.

В текущем режиме мы планируем импортировать обновления / вставки из источника и записывать их в Delta Table куста;

, как показано на рисунке ниже, таблица Delta указывает, что новые обновления относятся к разделам (201804/201611/201705).

Для инкрементного процесса я планирую

  1. Выберите 3 раздела из исходной таблицы, на которые влияют.

INSERT INTO delta2 выберите YYYYMM из таблицы, где YYYYMM in (выберите отличный YYYYMM из Delta);

Объединить эти 3 раздела из таблицы Delta с соответствующими разделами из исходной таблицы.(Я могу следовать 4-шаговой стратегии Хортона, чтобы применить обновления)

    Merge Delta2 + Delta : = new 3 partitions.

Удалить 3 раздела из исходной таблицы

Alter Table Drop partitions 201804 / 201611 / 201705

Добавить вновь объединенные разделы обратно в исходную таблицу (с новыми обновлениями)

Мне нужно автоматизировать эти сценарии - Можете ли вы предложить, как поставить выше логику в кусте QL или искре - Определить разделыи выкиньте их из оригинальной таблицы.

enter image description here

1 Ответ

0 голосов
/ 26 декабря 2018

вы можете создать решение, используя pyspark.Я объясняю этот подход на некотором базовом примере.вы можете изменить его в соответствии с вашими бизнес-требованиями.

Предположим, у вас есть секционированная таблица в кусте ниже конфигурации.

CREATE TABLE IF NOT EXISTS udb.emp_partition_Load_tbl (
 emp_id                 smallint
,emp_name               VARCHAR(30)
,emp_city               VARCHAR(10)
,emp_dept               VARCHAR(30)
,emp_salary             BIGINT
)
PARTITIONED BY (Year String, Month String)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
STORED AS ORC;

и у вас есть некоторый CSV-файл с некоторыми входными записями, которые выхотите загрузить в вашу многораздельную таблицу

1|vikrant singh rana|Gurgaon|Information Technology|20000

dataframe = spark.read.format("com.databricks.spark.csv") \
  .option("mode", "DROPMALFORMED") \
  .option("header", "false") \
  .option("inferschema", "true") \
  .schema(userschema) \
  .option("delimiter", "|").load("file:///filelocation/userinput")

newdf = dataframe.withColumn('year', lit('2018')).withColumn('month',lit('01'))

+------+------------------+--------+----------------------+----------+----+-----+
|emp-id|emp-name          |emp-city|emp-department        |emp-salary|year|month|
+------+------------------+--------+----------------------+----------+----+-----+
|1     |vikrant singh rana|Gurgaon |Information Technology|20000     |2018|01   |
+------+------------------+--------+----------------------+----------+----+-----+

, указав нижеприведенные свойства, чтобы перезаписывать только данные определенных разделов.

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
spark.sql("set spark.hadoop.hive.exec.dynamic.partition=true");
spark.sql("set spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict");

newdf.write.format('orc').mode("overwrite").insertInto('udb.emp_partition_Load_tbl')

Допустим, вы получили другой набор данных и хотите вставить в некоторые другиеразделы

+------+--------+--------+--------------+----------+----+-----+
|emp-id|emp-name|emp-city|emp-department|emp-salary|year|month|
+------+--------+--------+--------------+----------+----+-----+
|     2|     ABC| Gurgaon|HUMAN RESOURCE|     10000|2018|   02|
+------+--------+--------+--------------+----------+----+-----+
newdf.write.format('orc').mode("overwrite").insertInto('udb.emp_partition_Load_tbl')

> show partitions udb.emp_partition_Load_tbl;
+---------------------+--+
|      partition      |
+---------------------+--+
| year=2018/month=01  |
| year=2018/month=02  |
+---------------------+--+

при условии, что у вас есть другой набор записей, относящихся к существующему разделу.

3|XYZ|Gurgaon|HUMAN RESOURCE|80000

newdf = dataframe.withColumn('year', lit('2018')).withColumn('month',lit('02'))
+------+--------+--------+--------------+----------+----+-----+
|emp-id|emp-name|emp-city|emp-department|emp-salary|year|month|
+------+--------+--------+--------------+----------+----+-----+
|     3|     XYZ| Gurgaon|HUMAN RESOURCE|     80000|2018|   02|
+------+--------+--------+--------------+----------+----+-----+

newdf.write.format('orc').mode("overwrite").insertInto('udb.emp_partition_Load_tbl')


 select * from udb.emp_partition_Load_tbl where year ='2018' and month ='02';
+---------+-----------+-----------+-----------------+-------------+-------+--------+--+
| emp_id  | emp_name  | emp_city  |    emp_dept     | emp_salary  | year  | month  |
+---------+-----------+-----------+-----------------+-------------+-------+--------+--+
| 3       | XYZ       | Gurgaon   | HUMAN RESOURCE  | 80000       | 2018  | 02     |
| 2       | ABC       | Gurgaon   | HUMAN RESOURCE  | 10000       | 2018  | 02     |
+---------+-----------+-----------+-----------------+-------------+-------+--------+--+

ниже вы можете видеть, что другие данные о разделах не были затронуты.

> select * from udb.emp_partition_Load_tbl where year ='2018' and month ='01';

+---------+---------------------+-----------+-------------------------+-------------+-------+--------+--+
| emp_id  |      emp_name       | emp_city  |        emp_dept         | emp_salary  | year  | month  |
+---------+---------------------+-----------+-------------------------+-------------+-------+--------+--+
| 1       | vikrant singh rana  | Gurgaon   | Information Technology  | 20000       | 2018  | 01     |
+---------+---------------------+-----------+-------------------------+-------------+-------+--------+--+
...