Я искал целую вечность, пытаясь выяснить, в чем здесь проблема.
У меня есть фрейм данных pyspark и разбить его следующим образом:
data.registerTempTable('data')
query = """
SELECT *
From data
DISTRIBUTE BY id1, id2, id3
SORT BY id1, id2, id3, date
"""
data = self.sql(query, store=False)
Затем я хочу применить некоторыекод для разделов,
data_list = data.rdd.mapPartitions(lambda x:func_1(form_set(x)))
Во внутренней функции я делаю
def form_set(partition):
tracker_index =0
for row in partition:
tracker_index+=1
##some actions##
print("id1:{}, id2:{}, id3:{}, ind:{}".format(id1,id2,id3,tracker_index)
return partition
Затем внешняя функция, которая просто принимает выходные данные form_set
def func_1(partition):
#actions#
return partition
Действия в функциях основаны на данных, находящихся в правильном разделе, то есть все данные с одинаковыми идентификаторами id1, id2 и id3 должны находиться в одном разделе.Однако, когда я смотрю журналы печати, они показывают
id1:1, id2:2, id3:3, ind:1
id1:1, id2:2, id3:3, ind:1
id1:2, id2:2, id3:3, ind:2
id1:1, id2:2, id3:3, ind:1
id1:2, id2:2, id3:3, ind:2
, тогда как я ожидал, что это будет
id1:1, id2:2, id3:3, ind:1
id1:1, id2:2, id3:3, ind:2
id1:2, id2:2, id3:3, ind:1
id1:1, id2:2, id3:3, ind:3
id1:2, id2:2, id3:3, ind:2
Я действительно растерялся относительно того, как это исправить.Я довольно новичок в pyspark, но я долго искал и не могу найти ответ.Любая помощь будет принята с благодарностью.Спасибо