Присоединение KStream с KTable из промежуточной темы приводит к исключению - PullRequest
0 голосов
/ 06 февраля 2019

Я пытаюсь присоединиться к KStream с KTable.Без объединения у меня нет проблем с чтением из промежуточной темы "book-attribute-by-id".

Пример сообщения для KTable:

{key: {id: 1}
 value: {id: 1, attribute_name: "weight"}}

Пример сообщения для KStream:

{key: {id: 1},
 value: {id: 1, book_id: 1, attribute_id: 1, value: 200}}

Требуемый вывод в тему «окончательного агрегирования»:

{key: {id: 1}, 
 value: {book_id: 1, attribute_name: "weight", value: 200}}
{key: {id: 1}, 
 value: {book_id: 1, attribute_name: "number_of_pages", value: 450}}

Вот код

    KStream<DefaultId, BookAttribute> bookAttributeStream = builder.stream(bookAttributeTopic, Consumed.with(defaultIdSerde, bookAttributeSerde));
    KStream<DefaultId, BookValueInt> bookValueIntStream = builder.stream(bookValueIntTopic, Consumed.with(defaultIdSerde, bookValueIntSerde));

    bookAttributeStream
        .selectKey((k, v) -> k.getId())
        .to("book-attribute-by-id", Produced.with(Serdes.Integer(), bookAttributeSerde));

    KTable<Integer, BookAttribute> bookAttributeByIdTable = builder.table("book-attribute-by-id", Consumed.with(Serdes.Integer(), bookAttributeSerde));

    // when the snippet below is commented out, consuming "book-attribute-by-id" works. 
    bookValueIntStream
        .selectKey((k, v) -> v.getAttribute_id())
        .join(bookAttributeByIdTable, (intValue, attribute) -> {
                System.out.println("intValue: " + intValue);
                System.out.println("attribute: " + attribute);
                return new BookAttributeValue(intValue, attribute);
            });

Исключение при присоединении к KStream & KTable:

Исключение в потоке "xxx-StreamThread-1" org.apache.kafka.streams.errors.TopologyBuilderException: недопустимое построение топологии: stream-thread [xxx-StreamThread-1] Тема не найдена: book-attribute-by-id на org.apache.kafka.streams.processor.internals.StreamPartitionAssignor $ CopartitionedTopicsValidator.validate (StreamPartorition.java: 792)

1 Ответ

0 голосов
/ 06 февраля 2019

Полагаю, вы используете kafka-streams 1.0.0

Проблема заключается в том, что вам нужно создавать входные темы для ваших потоков.

В вашем случае темы: book-attribute-by-idи те, которые являются значениями переменных: bookAttributeTopic, bookValueIntTopic.

Для объединений Kafka Streams должен убедиться, что количество разделов в объединяемых темах равно.Исключение выдается при попытке получить метаданные для темы: book-attribute-by-id.

Перед запуском приложения вы должны вручную создать book-attribute-by-id topic

В более новой версии существования kafka-streamsтемы проверяются перед проверкой количества разделов.

...