pyspark - режим перезаписи в паркете удаляет другие разделы - PullRequest
1 голос
/ 24 января 2020

Я использую pyspark для перезаписи паркетных перегородок в корзине s3. Ниже показано, как выглядят мои разделенные папки:

parent_folder
      -> year=2019 
            -->month=1
                ---->date=2019-01-01
                ---->date=2019-01-02
            -->month=2
                 ........
      -> year=2020
            -->month=1
                    ---->date=2020-01-01
                    ---->date=2020-01-02
            -->month=2
                    ........

Теперь, когда я запускаю сценарий spark, которому нужно перезаписать только указанные c разделы, используя строку ниже, скажем, разделы для year = 2020 и month = 1 и даты = 2020-01-01 и 2020-01-02:

df_final.write.partitionBy([["year","month","date"]"]).mode("overwrite").format("parquet").save(output_dir_path)

В приведенной выше строке удаляются все другие разделы и записываются данные, которые присутствуют только в конечном кадре данных - df_final . Я также установил динамическую c модель перезаписи, используя ниже, но, похоже, не работает:

conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

Мои вопросы, есть ли способ перезаписать только определенные c разделы (более один ) . Любая помощь будет высоко ценится. Заранее спасибо.

1 Ответ

0 голосов
/ 25 января 2020

Полагаю, вы ищете решение, в котором пользователь может вставить и перезаписать существующий раздел в паркетной таблице, используя spark sql, и надеяться, что в конце паркет будет ссылаться на таблицу разделенных кустов.

Вы можете создать Искры sql контекст с включением поддержки улья к нему, ниже шаг для того же, этот - что-то на точном коде, но как код sudo для того же самого,

spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

sqlCtx = SQLContext(spark)

try:
   df = spark.read.parquet("filename.parquet")
   df.createOrReplaceTempView("temp")

   insertQuery = """INSERT OVERWRITE INTO TABLE {}.{} PARTITION (part1, part2) \
                    SELECT *
                    FROM temp a""".format(hiveSchema, hiveTable)
   sqlCtx.sql(insertQuery)
except:
   logger.error("Error while loading data into table..")
   exit()

Я написал пример один для того же самого. Ниже приведена функция перезаписи вставки из улья, которую вы можете выбрать для себя:

Hive extension (multiple inserts):
FROM from_statement
INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) [IF NOT EXISTS]] select_statement1
[INSERT OVERWRITE TABLE tablename2 [PARTITION ... [IF NOT EXISTS]] select_statement2]
[INSERT INTO TABLE tablename2 [PARTITION ...] select_statement2] ...;
FROM from_statement
INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1
[INSERT INTO TABLE tablename2 [PARTITION ...] select_statement2]
[INSERT OVERWRITE TABLE tablename2 [PARTITION ... [IF NOT EXISTS]] select_statement2] ...;

Hive extension (dynamic partition inserts):
INSERT OVERWRITE TABLE tablename PARTITION (partcol1[=val1], partcol2[=val2] ...) select_statement FROM from_statement;
INSERT INTO TABLE tablename PARTITION (partcol1[=val1], partcol2[=val2] ...) select_statement FROM from_statement;
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...