Как я могу обработать исключение при потоковой обработке с помощью quarkus + kafka + smallrye?
Мой код очень похож на пример обязательного производителя в руководстве по quarkus (https://quarkus.io/guides/kafka#imperative -usage )
import io.smallrye.reactive.messaging.annotations.Channel;
import io.smallrye.reactive.messaging.annotations.Emitter;
import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Consumes;
import javax.ws.rs.core.MediaType;
@Path("/prices")
public class PriceResource {
@Inject @Channel("price-create") Emitter<Double> priceEmitter;
@POST
@Consumes(MediaType.TEXT_PLAIN)
public void addPrice(Double price) {
priceEmitter.send(price);
}
}
Я хотел что-то похожее на ванильную библиотеку Кафки, которая дает возможность обрабатывать обратный вызов каждой записи, которую требуется отправить.
ProducerRecord<String, String> record = new ProducerRecord<>("topic-name", key, value);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
logger.info(record.toString());
if (exception != null) {
logger.error("Producer exception", exception);
}
}
});
Tks