Ниже приведен код потребителя для получения и обработки сообщений из раздела kafka (8 разделов).
@Component
public class MessageConsumer {
private static final String TOPIC = "mytopic.t";
private static final String GROUP_ID = "mygroup";
private final ReceiverOptions consumerSettings;
private static final Logger LOG = LoggerFactory.getLogger(MessageConsumer.class);
@Autowired
public MessageConsumer(@Qualifier("consumerSettings") ReceiverOptions consumerSettings)
{
this.consumerSettings=consumerSettings;
consumerMessage();
}
private void consumerMessage()
{
KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions(Collections.singleton(TOPIC)));
Scheduler scheduler = Schedulers.newElastic("FLUX_DEFER", 10, true);
Flux.defer(receiver::receive)
.groupBy(m -> m.receiverOffset().topicPartition())
.flatMap(partitionFlux ->
partitionFlux.publishOn(scheduler)
.concatMap(m -> {
LOG.info("message received from kafka : " + "key : " + m.key()+ " partition: " + m.partition());
return process(m.key(), m.value())
.thenEmpty(m.receiverOffset().commit());
}))
.retryBackoff(5, Duration.ofSeconds(2), Duration.ofHours(2))
.doOnError(err -> {
handleError(err);
}).retry()
.doOnCancel(() -> close()).subscribe();
}
private void close() {
}
private void handleError(Throwable err) {
LOG.error("kafka stream error : ",err);
}
private Mono<Void> process(String key, String value)
{
if(key.equals("error"))
return Mono.error(new Exception("process error : "));
LOG.error("message consumed : "+key);
return Mono.empty();
}
public ReceiverOptions<String, String> receiverOptions(Collection<String> topics) {
return consumerSettings
.commitInterval(Duration.ZERO)
.commitBatchSize(0)
.addAssignListener(p -> LOG.info("Group {} partitions assigned {}", GROUP_ID, p))
.addRevokeListener(p -> LOG.info("Group {} partitions assigned {}", GROUP_ID, p))
.subscription(topics);
}
}
@Bean(name="consumerSettings")
public ReceiverOptions<String, String> getConsumerSettings() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put("max.block.ms", "3000");
props.put("request.timeout.ms", "3000");
return ReceiverOptions.create(props);
}
При получении каждого сообщения логика обработки возвращается к пустому моно, если обработанное сообщение успешно обработано.
Все работает, как и ожидалось, если в логике обработки не возвращена ошибка.
Но если я выдавал ошибку, имитирующую поведение исключения в моей логике обработки для конкретного сообщения, то я пропускалобработать то сообщение, которое вызвало исключение.Поток перемещается к следующему сообщению.
Что я хочу добиться, это обработать текущее сообщение и зафиксировать смещение, если оно успешно, а затем перейти к следующей записи.
Если при обработке возникнет исключениесообщение не фиксирует текущее смещение и повторяет одно и то же сообщение до его успешного завершения.Не переходите к следующему сообщению до тех пор, пока текущее сообщение не будет успешным.
Пожалуйста, дайте мне знать, как обрабатывать сбои процесса, не пропуская сообщение, и заставить поток начинаться со смещения, в которое выдается исключение.
С уважением,
Винот