У меня есть конвейер потока данных, который читает из раздела pubsub, выполняет преобразования и пишет в BigTable. Я хочу, чтобы элементы, считанные из pubsub, обрабатывались в порядке их порядкового номера.
Я использую фиксированное окно продолжительностью 2 минуты, а затем применяю к нему GroupByKey. После GBK я использую преобразование SortValues, которое сортирует Iterable по SequenceNumber. Я наблюдаю, что время стены шага GroupByKey будет высоким, так как все элементы в окне обрабатываются одним и тем же рабочим. Есть ли эффективный способ сортировки элементов в фиксированном окне?
Ниже приведен код моего конвейера:
PCollection<PubsubMessage> pubsubRecords = p.apply(PubsubIO.readMessagesWithAttributes()
.fromTopic(StaticValueProvider.of(topic)));
PCollection<KV<BigInteger, JSONObject>> window = pubsubRecords.apply("Raw to String", ParDo.of(new LogsFn()))
.apply("Window", Window
.<KV<BigInteger, JSONObject>>into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(Repeatedly
.forever(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.StandardMinutes(2))
)
)
.withAllowedLateness(Duration.ZERO).discardingFiredPanes()
);
PCollection<KV<String, KV<BigInteger, JSONObject>>> keyedWindow = window
.apply(WithKeys.of(new SerializableFunction<KV<BigInteger, JSONObject>,String>() {
@Override
public String apply(KV<BigInteger, JSONObject> row) {
return "key";
}
}));
PCollection<KV<String, Iterable<KV<BigInteger, JSONObject>>>> groupedWindow = keyedWindow
.apply(GroupByKey.<String, KV<BigInteger, JSONObject>>create()).apply(
SortValues.<String, BigInteger, JSONObject>create(BufferedExternalSorter.options()));