Я выполняю внутреннее соединение в script_0.py , которое я не могу найти в script_1.py . Я нашел 657 464 в script_0.py , тогда как я нашел 279 636 в script_1.py Внутреннее соединение используется в качестве ключа соединения длинная строка, такая как +1BTjyLpbUXhz0HVuTeWD/Hc379UCAyIH8KOyklYq54=
, поэтому я подумал о проблеме кодирования, которая может привести к такому результату: при перезагрузке sample_df
и card_id_dict['train']
может возникнуть несоответствие из-за проблемы с кодированием. Данные записываются в формате паркета, мгновенное сжатие.
###############
# script_0.py #
###############
sample_df = extended_df.dropDuplicates(JOIN_ON)
train_ext_df_1 = sample_df.filter(F.col("TXN_DTTM").between(time.DATES["sequence_extension"][0],
time.DATES["train"][1])) # 9 + 2 months
train_trans_df_1 = (train_ext_df_1.join(card_id_dict['train'], on=['SCR_CUS_ACC_XID'],
how='inner'))
train_export_1 = train_trans_df_1.select('SCR_CUS_ACC_XID').distinct().count()
print(f'train_export_1h: {train_export_1}') # 657 464
if train_export_1 == 657464:
(sample_df
.write
.format("parquet")
.mode('overwrite')
.save(OUTPUT)
)
print_log('\nJob Done!')
###############
# script_1.py #
###############
# load back sample_df
sample_df = spark.read.parquet(OUTPUT)
train_ext_df_1 = sample_df.filter(F.col("TXN_DTTM").between(time.DATES["sequence_extension"][0],
time.DATES["train"][1])) # 9 + 2 months
# loading train card ids: replace card_id_dict['train'] by train_card_id_df = 657 464
TRAIN_CARD_ID = "train_card_id.parquet"
train_card_id_df = spark.read.parquet(root_path + TRAIN_CARD_ID)
train_trans_df_1 = (train_ext_df_1.join(train_card_id_df, on=['SCR_CUS_ACC_XID'],
how='inner'))
train_export_1 = train_trans_df_1.select('SCR_CUS_ACC_XID').distinct().count()
print(f'train_export_1h: {train_export_1}') # 657 464 (in script_0.py) # 279 636 (here)