Возможно ли для приложения kafka steams записать несколько выходов с одного входа? - PullRequest
0 голосов
/ 17 ноября 2018

Я не уверен, является ли kafka-streams правильным решением проблемы, которую я пытаюсь решить.Я хотел бы иметь возможность использовать его из-за параллелизма и отказоустойчивости, которые он обеспечивает, но я изо всех сил пытаюсь найти способ достижения желаемого конвейера обработки.

Конвейер - что-то вроде этого:

  1. Запись некоторого типа поступает в тему ввода
  2. Информация в этой записи используется для выполнения запроса к базе данных, который возвращает много результатов

Я хотел бы иметь возможность записывать каждый результат как отдельную запись с собственным ключом, а не как набор результатов в одной записи.

Игнорирование одной выходной записи для каждого требования к результату дляна мгновение у меня есть код, который выглядит следующим образом:

Serde<String> stringSerde = Serdes.String();
JsonSerde<MyInput> inputSerde = new JsonSerde<>();
JsonSerde<List<MyOutput>> outputSerde = new JsonSerde<>();
Consumed<String, MyInput> consumer = Consumed.with(stringSerde, inputSerde);

KStream<String, MyInput> receiver = builder.stream("input-topic", consumer);
KStream<String, List<MyOutput>> outputs = receiver.mapValues(this::mapInputToManyOutputs);
outputs.to("output-topic", Produced.with(stringSerde, outputSerde));

Это достаточно просто, 1 сообщение, 1 сообщение (хотя бы коллекция).

Что бы я хотелбыть в состоянии сделать что-то вроде:

Serde<String> stringSerde = Serdes.String();
JsonSerde<MyInput> inputSerde = new JsonSerde<>();
JsonSerde<MyOutput> outputSerde = new JsonSerde<>();
Consumed<String, MyInput> consumer = Consumed.with(stringSerde, inputSerde);

KStream<String, MyInput> receiver = builder.stream("input-topic", consumer);
KStream<String, List<MyOutput>> outputs = receiver.mapValues(this::mapInputToManyOutputs);
KStream<String, MyOutput> sink = outputs.???
sink.to("output-topic", Produced.with(stringSerde, outputSerde));

Я не могу придумать ничего разумного для операции или операций, выполняемых в потоке outputs.

Есть предложения?Или kafka-streams, возможно, не является правильным решением такой проблемы?

Ответы [ 2 ]

0 голосов
/ 19 ноября 2018

Спасибо, Василий, flatMap действительно было то, что мне было нужно.Я смотрел на это раньше, думал, что это правильная операция, но затем запутался и по ошибке отбросил ее.

Комбинируя то, что у меня было раньше, с вашим предложением, следующие работы, предполагая, что MyOutput реализует метод под названием getKey():

Serde<String> stringSerde = Serdes.String();
JsonSerde<MyInput> inputSerde = new JsonSerde<>();
JsonSerde<MyOutput> outputSerde = new JsonSerde<>();
Consumed<String, MyInput> consumer = Consumed.with(stringSerde, inputSerde);

KStream<String, MyInput> receiver = builder.stream("input-topic", consumer);
KStream<String, List<MyOutput>> outputs = receiver.mapValues(this::mapInputToManyOutputs);
KStream<String, MyOutput> sink = outputs.flatMap(((key, value) -> 
    value.stream().map(o -> new KeyValue<>(o.getKey(), o)).collect(Collectors.toList())));
sink.to("output-topic", Produced.with(stringSerde, outputSerde));
0 голосов
/ 17 ноября 2018

Да, это возможно, для этого вам нужно использовать преобразование KStream flatMap. FlatMap преобразует каждую запись входного потока в ноль или более записей в выходном потоке (как ключ, так и тип значения могут быть изменены произвольно)

kStream = kStream.flatMap(
        (key, value) -> {
            List<KeyValue<String, MyOutput>> result = new ArrayList<>();
            // do your logic here
            return result;
        });
kStream.to("output-topic", Produced.with(stringSerde, outputSerde));
...