Я не конфликтую с шаблоном потока "Точно один раз", потому что это красота Kafka Stream, однако можно использовать Kafka Stream, не создавая другие темы.
Шаблон потока "ровно один раз" - это просто возможность выполнитьоперация чтения-процесса-записи ровно один раз. Означает, что вы потребляете одно сообщение за раз, получаете процесс и публикуетесь в другой теме и фиксируете. Таким образом, commit будет обрабатываться Stream автоматически по одному сообщению за раз. Для этого kafka Stream устанавливает следующие параметры, которые нельзя перезаписать
- изоляция. Уровень: (read_committed) - Потребители всегда будут читать только зафиксированные данные
- enable.idempotence:(true) - у производителя всегда будет включена идемпотентность
- max.in.flight.requests.per.connection "(5) - у производителя всегда будет один запрос в полете на соединение
В случае, если какая-либо ошибка в потоке kafka потребителя или производителя всегда повторяет определенное сконфигурированное число попыток KafkaStream не гарантирует внутри логики обработки, которую мы все еще должны обработать, например, есть требование для работы с БД, и если соединение с БД не удалось вв этом случае Кафка не знает, поэтому вам нужно справиться с этим самостоятельно.
В соответствии с определением шаблона, да, нам нужна тема для потребителей, процессов и производителей, но в целом это не останавливает вас, если вы не выводите на другойТем не менее, вы можете потреблять ровно один элемент за один раз с фиксацией интервала времени по умолчанию (DEFAULT_COMMIT_INTERVAL_MS) и снова вам нужно самостоятельно обработать свой сбой логической транзакции. Я приведу несколько примеров.
StreamsBuilder builder = new StreamsBuilder();
Properties props = getStreamProperties();
KStream<String, String> textLines = builder.stream(Pattern.compile("topic"));
textLines.process(() -> new ProcessInternal());
KafkaStreams streams = new KafkaStreams(builder.build(), props);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("Completed VQM stream");
streams.close();
}));
logger.info("Streaming start...");
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
class ProcessInternal implements Processor<String, String> {
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void close() {
// Any code for clean up would go here.
}
@Override
public void process(String key, String value) {
///Your transactional process business logic
}
}