Вот еще один подход, использующий mapPartitions из RDD API. Мы перебираем элементы каждого раздела, пока не достигнем конца. Я ожидаю, что эта итерация будет очень быстрой. Вот код:
df = spark.createDataFrame([
["Tom", "a"],
["Dick", "b"],
["Harry", "c"],
["Elvis", "d"],
["Elton", "e"],
["Sandra", "f"]
], ["name", "toy"])
def get_first_last(it):
first = last = next(it)
for last in it:
pass
# Attention: if first equals last by reference return only one!
if first is last:
return [first]
return [first, last]
# coalesce here is just for demonstration
first_last_rdd = df.coalesce(2).rdd.mapPartitions(get_first_last)
spark.createDataFrame(first_last_rdd, ["name", "toy"]).show()
# +------+---+
# | name|toy|
# +------+---+
# | Tom| a|
# | Harry| c|
# | Elvis| d|
# |Sandra| f|
# +------+---+
PS: Нечетные позиции будут содержать первый элемент раздела, а четные - последний элемент. Также обратите внимание, что число результатов будет (numPartitions * 2) - numPartitionsWithOneItem
, которое, как я ожидаю, будет относительно небольшим, поэтому не стоит беспокоиться о стоимости нового оператора createDataFrame
.