В Beam элементы PCollection неупорядочены.Я бы сохранил результаты в базе данных и выполнил бы сортировку там.
Не уверен насчет вашего варианта использования и если действительно необходимо выполнить сортировку в Beam, но обходной путь может заключаться в группировке всех строк на фиктивномключ, используйте GroupByKey и выполните сортировку сгруппированных данных следующим образом:
word_count_list = [
('itself', 16),
('grey', 1),
('senses', 4),
('repair', 1),
('me', 228),
]
def addKey(row):
return (1, row)
def sortGroupedData(row):
(keyNumber, sortData) = row
sortData.sort(key=lambda x: x[1], reverse=True)
return sortData[0:3]
word_count = (p
| 'CreateWordCountColl' >> beam.Create(word_count_list)
| 'AddKey' >> beam.Map(addKey)
| 'GroupByKey' >> beam.GroupByKey()
| 'SortGroupedData' >> beam.Map(sortGroupedData)
| 'Write' >> WriteToText('./sorting_results')
)
Возвращает первые 3 в списке одной строки.
[('me', 228), ('itself', 16), ('senses', 4)]
Однако учтите, чтоВы бы отказались от параллельной обработки набора данных.