Я не уверен, является ли kafka-streams правильным решением проблемы, которую я пытаюсь решить.Я хотел бы иметь возможность использовать его из-за параллелизма и отказоустойчивости, которые он обеспечивает, но я изо всех сил пытаюсь найти способ достижения желаемого конвейера обработки.
Конвейер - что-то вроде этого:
- Запись некоторого типа поступает в тему ввода
- Информация в этой записи используется для выполнения запроса к базе данных, который возвращает много результатов
Я хотел бы иметь возможность записывать каждый результат как отдельную запись с собственным ключом, а не как набор результатов в одной записи.
Игнорирование одной выходной записи для каждого требования к результату дляна мгновение у меня есть код, который выглядит следующим образом:
Serde<String> stringSerde = Serdes.String();
JsonSerde<MyInput> inputSerde = new JsonSerde<>();
JsonSerde<List<MyOutput>> outputSerde = new JsonSerde<>();
Consumed<String, MyInput> consumer = Consumed.with(stringSerde, inputSerde);
KStream<String, MyInput> receiver = builder.stream("input-topic", consumer);
KStream<String, List<MyOutput>> outputs = receiver.mapValues(this::mapInputToManyOutputs);
outputs.to("output-topic", Produced.with(stringSerde, outputSerde));
Это достаточно просто, 1 сообщение, 1 сообщение (хотя бы коллекция).
Что бы я хотелбыть в состоянии сделать что-то вроде:
Serde<String> stringSerde = Serdes.String();
JsonSerde<MyInput> inputSerde = new JsonSerde<>();
JsonSerde<MyOutput> outputSerde = new JsonSerde<>();
Consumed<String, MyInput> consumer = Consumed.with(stringSerde, inputSerde);
KStream<String, MyInput> receiver = builder.stream("input-topic", consumer);
KStream<String, List<MyOutput>> outputs = receiver.mapValues(this::mapInputToManyOutputs);
KStream<String, MyOutput> sink = outputs.???
sink.to("output-topic", Produced.with(stringSerde, outputSerde));
Я не могу придумать ничего разумного для операции или операций, выполняемых в потоке outputs
.
Есть предложения?Или kafka-streams, возможно, не является правильным решением такой проблемы?