У меня процессор Kafka с таким определением.
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Bean
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions
import reactor.kafka.receiver.ReceiverRecord
import reactor.kotlin.core.publisher.toMono
import reactor.util.retry.Retry
import java.time.Duration
import java.util.*
@Component
class KafkaProcessor {
private val logger = LoggerFactory.getLogger(javaClass)
private val consumerProps = hashMapOf(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::javaClass,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::javaClass,
ConsumerConfig.GROUP_ID_CONFIG to "groupId",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092"
)
private val receiverOptions = ReceiverOptions.create<String, String>(consumerProps)
.subscription(Collections.singleton("some-topic"))
.commitInterval(Duration.ofSeconds(1))
.commitBatchSize(1000)
.maxCommitAttempts(1)
private val kafkaReceiver: KafkaReceiver<String, String> = KafkaReceiver.create(receiverOptions)
@Bean
fun processKafkaMessages(): Unit {
kafkaReceiver.receive()
.groupBy { m -> m.receiverOffset().topicPartition() }
.flatMap { partitionFlux ->
partitionFlux.publishOn(Schedulers.elastic())
.concatMap { receiverRecord ->
processRecord(receiverRecord)
.map { it.receiverOffset().acknowledge() }
}
}
.retryWhen(
Retry.backoff(3, Duration.ofSeconds(1))
.maxBackoff(Duration.ofSeconds(3))
.doBeforeRetry { rs ->
logger.warn("Retrying: ${rs.totalRetries() + 1}/3 due to ${rs.failure()}")
}
.onRetryExhaustedThrow { _, u ->
logger.error("All ${u.totalRetries() + 1} attempts failed with the last exception: ${u.failure()}")
u.failure()
}
)
.subscribe()
}
private fun processRecord(record: ReceiverRecord<String, String>): Mono<ReceiverRecord<String, String>> {
return record.toMono()
}
}
Иногда я получаю эту ошибку.
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.TimeoutException: The request timed out.
Первая попытка выглядит так.
Retrying: 1/3 due to org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets
Второй и третий выглядят так.
Retrying: 2/3 due to reactor.core.Exceptions$ReactorRejectedExecutionException: Scheduler unavailable
Retrying: 3/3 due to reactor.core.Exceptions$ReactorRejectedExecutionException: Scheduler unavailable
И как только все 3 попытки исчерпаны, сообщение выглядит так:
All 4 attempts failed with the last exception: reactor.core.Exceptions$ReactorRejectedExecutionException: Scheduler unavailable
Когда я получаю Эта ошибка, мне нужно перезапустить приложение, чтобы повторно подключиться к брокеру Kafka и зафиксировать запись.
Я знаю, что установка maxCommitAttempts
на 1
означает, что как только он достигает RetriableCommitFailedException
, он не повторит попытку. Я думал, что предложение retryWhen
, которое я поставил в конце функции processKafkaMessages()
, поможет восстановить конвейер сам по себе.
Я установил maxCommitAttempts
потому, что он нет повторных попыток с отсрочкой, как описано здесь , а максимальное количество попыток фиксации по умолчанию 100 выполняется в течение 10 мс. Итак, я подумал, что должен написать свой собственный журнал повторных попыток c с откатом.
Вопрос в том, как мне правильно выполнить повтор с откатом для автоматической фиксации? И можно ли написать для этого модульный тест, используя EmbeddedKafka
?
Язык: Kotlin
Библиотека Reactor Kafka: io.projectreactor.kafka:reactor-kafka:1.2.2.RELEASE