pyspark соединяется с длинной строкой - PullRequest
0 голосов
/ 09 апреля 2020

Я выполняю внутреннее соединение в script_0.py , которое я не могу найти в script_1.py . Я нашел 657 464 в script_0.py , тогда как я нашел 279 636 в script_1.py Внутреннее соединение используется в качестве ключа соединения длинная строка, такая как +1BTjyLpbUXhz0HVuTeWD/Hc379UCAyIH8KOyklYq54=, поэтому я подумал о проблеме кодирования, которая может привести к такому результату: при перезагрузке sample_df и card_id_dict['train'] может возникнуть несоответствие из-за проблемы с кодированием. Данные записываются в формате паркета, мгновенное сжатие.

###############
# script_0.py #
###############

sample_df = extended_df.dropDuplicates(JOIN_ON)

train_ext_df_1 = sample_df.filter(F.col("TXN_DTTM").between(time.DATES["sequence_extension"][0],
                                                            time.DATES["train"][1]))  # 9 + 2 months


train_trans_df_1 = (train_ext_df_1.join(card_id_dict['train'], on=['SCR_CUS_ACC_XID'],
                                    how='inner'))

train_export_1 = train_trans_df_1.select('SCR_CUS_ACC_XID').distinct().count()
print(f'train_export_1h: {train_export_1}')  # 657 464

if train_export_1 == 657464:
    (sample_df
        .write
        .format("parquet")
        .mode('overwrite')
        .save(OUTPUT)
        )

    print_log('\nJob Done!')


###############
# script_1.py #
###############

# load back sample_df
sample_df = spark.read.parquet(OUTPUT)

train_ext_df_1 = sample_df.filter(F.col("TXN_DTTM").between(time.DATES["sequence_extension"][0],
                                                            time.DATES["train"][1]))  # 9 + 2 months

# loading train card ids: replace card_id_dict['train'] by train_card_id_df = 657 464
TRAIN_CARD_ID = "train_card_id.parquet"
train_card_id_df = spark.read.parquet(root_path + TRAIN_CARD_ID)


train_trans_df_1 = (train_ext_df_1.join(train_card_id_df, on=['SCR_CUS_ACC_XID'],
                                        how='inner'))

train_export_1 = train_trans_df_1.select('SCR_CUS_ACC_XID').distinct().count()
print(f'train_export_1h: {train_export_1}')  # 657 464 (in script_0.py) # 279 636 (here)
...