Различное поведение метода кэширования для фреймов данных PySpark в Spark 2.3 - PullRequest
0 голосов
/ 21 ноября 2018

После обновления Spark с 2.1 до 2.3 у меня возникли проблемы с кэшированными фреймами данных PySpark.В Spark 2.1 метод cache () работал для меня как глубокое копирование, хотя он не должен работать так, как описано в документации.

Пример:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, HiveContext
from pyspark.sql import functions as spark_func
from pyspark.sql import Window 

sparkSession = (SparkSession
               .builder
               .appName('process_name')
               .enableHiveSupport()
               .getOrCreate())
src_tbl = sparkSession.sql("SELECT * FROM src_tbl") 
dst_tbl = sparkSession.sql("SELECT * FROM snpsht_tbl") 
delta = src_tbl.subtract(dst_tbl)  # find the difference 

# find new records based on delta
new_records = delta.join(dst_tbl, how='left_anti', on=join_field).cache()
# create snpsht df
snpsht_tbl = dst_tbl.union(new_records) 
# create incremental df
snpsht_tbl_tmp = snpsht_tbl.withColumn("row_nbr", spark_func.row_number(). \                                        
  over(Window.partitionBy(join_field). \                                                    
  orderBy(spark_func.desc("last_modified_date"))))
inc_tbl = snpsht_tbl_tmp.filter("row_nbr = 1").drop("row_nbr")

inc_tbl.filter(spark_func.col("last_modified_date").isin(dt_today)).count() # 100 records    

# save final tables to DB
snpsht_tbl_name = 'snpsht'
snpsht_tbl.write.mode("overwrite").saveAsTable(snpsht_table_name_tmp)
sparkSession.sql("""INSERT OVERWRITE TABLE snpsht_tbl  + 
                    SELECT * FROM snpsht_table_name_tmp""")

inc_tbl.filter(spark_func.col("last_modified_date").isin(dt_today)).count()  # 0 records     

inc_tbl_name = 'inc'
inc_tbl.write.mode("overwrite").saveAsTable(inc_table_name_tmp)
sparkSession.sql("""INSERT OVERWRITE TABLE inc_tbl  + 
                    SELECT * FROM inc_table_name_tmp""")

Это минимальный пример для решения моей проблемы.

А теперь в Spark 2.1 inc_tbl был просто сохранен в inc_tbl со всеми новыми записями (с текущего дня) с данными, которые были там в момент использования метода кэширования, и эточто я хочу иметьВ Spark 2.3 есть что-то, что снова вычисляет все преобразования с начала, поэтому проверяет, что в таблице snpsht_tbl уже есть записи с текущей даты, поэтому просто вставляет записи, которые были там до обработки.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...