Вы не можете присоединиться к Sink.ignore, потому что вы уже подключили Commiter.Sink.Но вы можете отбросить материализованные значения.
В примере используется toMat с Keep.both для сохранения обоих материализованных значений: элемента управления из источника и будущего [Готово] из приемника.С обоими значениями он создает DrainingControl в mapMaterializedValue, который позволяет вам останавливать поток или сливать поток до его остановки или получать уведомления, когда поток останавливается.
Если вас не волнует этот элемент управления (хотя вам следует)Вы можете использовать
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(
1,
msg ->
business(msg.record().key(), msg.record().value())
.thenApply(done -> msg.committableOffset()))
.to(Committer.sink(committerSettings.withMaxBatch(1)))
.run(materializer);