Я пытаюсь объединиться с помощью Jet, источник и приемник - это тема Kafka, требуется взять сообщения GPB (google proto buf) из источника и опубликовать сообщения GPB.Проблема в том, что я могу опубликовать Double
, но не сообщение GPB, и оно дает мне ошибку компиляции.
Это прекрасно работает:
Pipeline p = Pipeline.create();
p.drawFrom(KafkaSources.<String, Balance> kafka(<properties>, <topic>))
.map(s->s.getValue() ).groupingKey(x->x.account)
.rollingAggregator(AggregateOperations.summingDouble(Balance::amount))
.drainTo(KafkaSinks.kafka(<prop>,<sinktopic>));
Даже если приведенный выше код работает нормально, онпубликует double
для топика, а мое требование - опубликовать GPB с атрибутом double
для топика.Когда я пытаюсь сделать это, помещая map
перед drainTo
, это дает мне синтаксическую ошибку.Ниже я попробовал:
.rollingAggregator(AggregateOperation.summingDouble(Balance::amount))
.map(k->Amount.newBuilder().setAmount(k.getValue()).build())
.drainTo(KafkaSinks.kafka(<prop>,<sinktopic>));
Сумма - это сообщение GPB, имеющее атрибут double
.Это дает мне синтаксическую ошибку, которую я не понимаю.Не могли бы вы помочь мне разобраться с этим.
Не могли бы вы также поделиться некоторыми документами или ссылками, где есть разные агрегации для разных сценариев?Я просмотрел образцы Hazelcast, демонстрации, не все, но немногие, но не нашел своего варианта использования там.Большое спасибо.