Я думаю, что вам нужно указать порядок элементов в вашем СДР, чтобы определить, как 2 элемента считаются "последовательными" друг для друга.Поскольку ваш RDD может состоять из нескольких разделов, поэтому spark не будет знать, является ли 1 элемент в partition_1 последовательным по отношению к другому элементу в partition_2.
Если вы заранее знаете свои данные, вы можете определить ключ, а такжекак 2 элемента являются «последовательными».Учитывая ваш пример, где rdd создается из списка, вы можете использовать индекс в качестве ключа и сделать соединение.
"""you want to shift arr by 1 to the left, then join back to arr. Calculation based on index"""
arr = ['a','b','c','d','e','f']
rdd = sc.parallelize(arr, 2).zipWithIndex().cache() #cache if rdd is small
original_rdd = rdd.map(lambda x: (x[1], x[0])) #create rdd with key=index, value=item in list
shifted_rdd = rdd.map(lambda x: (x[1]-1, x[0]))
results = original_rdd.join(shifted_rdd)
print(results.values().collect())
Чтобы добиться лучшей производительности в join
, вы можете использовать разделы диапазона для original_rdd
и shifted_rdd
.