Я реализовал пакетирование по N элементам, как описано в этом ответе: Может ли вход хранилища данных в конвейере потока данных Google обрабатываться в пакете из N записей одновременно?
package com.example.dataflow.transform;
import com.example.dataflow.event.ClickEvent;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.joda.time.Instant;
import java.util.ArrayList;
import java.util.List;
public class ClickToClicksPack extends DoFn> {
public static final int BATCH_SIZE = 10;
private List accumulator;
@StartBundle
public void startBundle() {
accumulator = new ArrayList(BATCH_SIZE);
}
@ProcessElement
public void processElement(ProcessContext c) {
ClickEvent clickEvent = c.element();
accumulator.add(clickEvent);
if (accumulator.size() >= BATCH_SIZE) {
c.output(accumulator);
accumulator = new ArrayList(BATCH_SIZE);
}
}
@FinishBundle
public void finishBundle(FinishBundleContext c) {
if (accumulator.size() > 0) {
ClickEvent clickEvent = accumulator.get(0);
long time = clickEvent.getClickTimestamp().getTime();
c.output(accumulator, new Instant(time), GlobalWindow.INSTANCE);
}
}
}
Но когда я запускаю конвейер в потоковом режиме, есть много пакетов только с 1 или 2 элементами.Как я понимаю, это из-за небольших размеров пачек.После одного дня работы среднее количество элементов в пакете составляет примерно 4. Мне действительно нужно, чтобы оно было ближе к 10 для лучшей производительности следующих шагов.
Есть ли способ контролировать размер пакетов?Или я должен использовать преобразование "GroupIntoBatches" для этой цели.В этом случае для меня не ясно, что должно быть выбрано в качестве ключа.
ОБНОВЛЕНИЕ: является ли хорошей идеей использовать идентификатор потока Java или имя хоста ВМ для ключа, чтобы применить преобразование "GroupIntoBatches"?