Это интересный и не очень распространенный вариант использования, поскольку, как говорит @chamikara, в Dataflow нет гарантии заказа.Однако я подумал о реализации решения, в котором вы перетасовываете входную коллекцию PC, а затем соединяете последовательные элементы на основе состояние .Я нашел несколько предостережений на этом пути, но я подумал, что в любом случае стоит поделиться.
Во-первых, я использовал Python SDK, но Dataflow Runner пока не поддерживает DoFn с отслеживанием состояния.Он работает с Direct Runner, но: 1) он не масштабируется и 2) трудно перетасовать записи без многопоточности.Конечно, простым решением для последнего является подача уже перетасованной коллекции PC в конвейер (мы можем использовать другое задание для предварительной обработки данных).В противном случае мы можем адаптировать этот пример к Java SDK.
Пока я решил попробовать перемешать и связать его с одним конвейером.Я действительно не знаю, помогает ли это или делает вещи более сложными, но код можно найти здесь .
Вкратце, DoFn с отслеживанием состояния смотрит на буфер и, если он пуст, вставляеттекущий элемент.В противном случае он извлекает предыдущий элемент из буфера и выводит кортеж (previous_element, current_element):
class PairRecordsFn(beam.DoFn):
"""Pairs two consecutive elements after shuffle"""
BUFFER = BagStateSpec('buffer', PickleCoder())
def process(self, element, buffer=beam.DoFn.StateParam(BUFFER)):
try:
previous_element = list(buffer.read())[0]
except:
previous_element = []
unused_key, value = element
if previous_element:
yield (previous_element, value)
buffer.clear()
else:
buffer.add(value)
Конвейер добавляет ключи к элементам ввода, как требуется для использования DoFn с состоянием.Здесь будет компромисс, потому что вы потенциально можете назначить один и тот же ключ всем элементам с помощью beam.Map(lambda x: (1, x))
.Это не будет хорошо распараллеливать, но это не проблема, так как мы все равно используем Direct Runner (имейте это в виду, если используете Java SDK).Тем не менее, это не будет перемешивать записи.Если вместо этого мы перетасуем большое количество ключей, мы получим большее количество «потерянных» элементов, которые не могут быть спарены (так как состояние сохраняется для ключа и мы назначаем их случайным образом, мы можем иметь нечетное количество записейна ключ):
pairs = (p
| 'Create Events' >> beam.Create(data)
| 'Add Keys' >> beam.Map(lambda x: (randint(1,4), x))
| 'Pair Records' >> beam.ParDo(PairRecordsFn())
| 'Check Results' >> beam.ParDo(LogFn()))
В моем случае я получил что-то вроде:
INFO:root:('one', 'three')
INFO:root:('two', 'five')
INFO:root:('zero', 'six')
INFO:root:('four', 'seven')
INFO:root:('ten', 'twelve')
INFO:root:('nine', 'thirteen')
INFO:root:('eight', 'fourteen')
INFO:root:('eleven', 'sixteen')
...
РЕДАКТИРОВАТЬ: я подумал о другом способе сделать это с помощью сумматора Sample.FixedSizeGlobally
.Хорошо, что он лучше перетасовывает данные, но вам нужно знать количество элементов априори (в противном случае нам понадобится начальная передача данных), и кажется, что все элементы возвращаются вместе.Вкратце, я дважды инициализирую одну и ту же коллекцию PC, но применяю разные порядки случайного порядка и назначаю индексы в DoFn с состоянием.Это гарантирует, что индексы уникальны для элементов одной и той же коллекции ПК (даже если порядок не гарантирован).В моем случае обе PCollections будут иметь ровно одну запись для каждой клавиши в диапазоне [0, 31].Преобразование CoGroupByKey объединит обе PCollections по одному и тому же индексу, таким образом, имея случайные пары элементов:
pc1 = (p
| 'Create Events 1' >> beam.Create(data)
| 'Sample 1' >> combine.Sample.FixedSizeGlobally(NUM_ELEMENTS)
| 'Split Sample 1' >> beam.ParDo(SplitFn())
| 'Add Dummy Key 1' >> beam.Map(lambda x: (1, x))
| 'Assign Index 1' >> beam.ParDo(IndexAssigningStatefulDoFn()))
pc2 = (p
| 'Create Events 2' >> beam.Create(data)
| 'Sample 2' >> combine.Sample.FixedSizeGlobally(NUM_ELEMENTS)
| 'Split Sample 2' >> beam.ParDo(SplitFn())
| 'Add Dummy Key 2' >> beam.Map(lambda x: (2, x))
| 'Assign Index 2' >> beam.ParDo(IndexAssigningStatefulDoFn()))
zipped = ((pc1, pc2)
| 'Zip Shuffled PCollections' >> beam.CoGroupByKey()
| 'Drop Index' >> beam.Map(lambda (x, y):y)
| 'Check Results' >> beam.ParDo(LogFn()))
Полный код здесь
Результаты:
INFO:root:(['ten'], ['nineteen'])
INFO:root:(['twenty-three'], ['seven'])
INFO:root:(['twenty-five'], ['twenty'])
INFO:root:(['twelve'], ['twenty-one'])
INFO:root:(['twenty-six'], ['twenty-five'])
INFO:root:(['zero'], ['twenty-three'])
...