Слив Hazelcast-Jet с проблемой синтаксиса - PullRequest
0 голосов
/ 30 января 2019

Я пытаюсь объединиться с помощью Jet, источник и приемник - это тема Kafka, требуется взять сообщения GPB (google proto buf) из источника и опубликовать сообщения GPB.Проблема в том, что я могу опубликовать Double, но не сообщение GPB, и оно дает мне ошибку компиляции.

Это прекрасно работает:

    Pipeline p = Pipeline.create();
    p.drawFrom(KafkaSources.<String, Balance> kafka(<properties>, <topic>)) 
    .map(s->s.getValue() ).groupingKey(x->x.account)
    .rollingAggregator(AggregateOperations.summingDouble(Balance::amount))
    .drainTo(KafkaSinks.kafka(<prop>,<sinktopic>));

Даже если приведенный выше код работает нормально, онпубликует double для топика, а мое требование - опубликовать GPB с атрибутом double для топика.Когда я пытаюсь сделать это, помещая map перед drainTo, это дает мне синтаксическую ошибку.Ниже я попробовал:

    .rollingAggregator(AggregateOperation.summingDouble(Balance::amount))
    .map(k->Amount.newBuilder().setAmount(k.getValue()).build())
    .drainTo(KafkaSinks.kafka(<prop>,<sinktopic>));

Сумма - это сообщение GPB, имеющее атрибут double.Это дает мне синтаксическую ошибку, которую я не понимаю.Не могли бы вы помочь мне разобраться с этим.

Не могли бы вы также поделиться некоторыми документами или ссылками, где есть разные агрегации для разных сценариев?Я просмотрел образцы Hazelcast, демонстрации, не все, но немногие, но не нашел своего варианта использования там.Большое спасибо.

1 Ответ

0 голосов
/ 08 февраля 2019

Я предполагаю, что синтаксическая ошибка была такой:

Несовместимые типы.Требуется Раковинано 'Кафка' была выведена на раковину>: не существует экземпляра (ов) переменных типа (s) K, V, поэтому String соответствует Entry

(В следующий раз, пожалуйста, поделитесь исключением, ваш код имеет зависимости от не-общих классов, и я не могу его скомпилировать.)

Это означает, что приемник Кафки ожидает java.util.Map.Entryна входе, но вы дали это Amount.Вам нужно map это так:

.map(entry-> Util.entry(entry.getKey(), Amount.newBuilder().setAmount(entry.getValue()).build()))
...