Reactor Kafka - At-Least-Once - обработка сбоев и смещений в нескольких разделах - PullRequest
0 голосов
/ 04 марта 2019

Ниже приведен код потребителя для получения и обработки сообщений из раздела kafka (8 разделов).

    @Component
    public class MessageConsumer {

        private static final String TOPIC = "mytopic.t";
        private static final String GROUP_ID = "mygroup";
        private final ReceiverOptions consumerSettings;
        private static final Logger LOG = LoggerFactory.getLogger(MessageConsumer.class);

        @Autowired
        public MessageConsumer(@Qualifier("consumerSettings") ReceiverOptions consumerSettings)
        {
            this.consumerSettings=consumerSettings;
            consumerMessage();
        }

        private void consumerMessage()
        {

        KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions(Collections.singleton(TOPIC)));

        Scheduler scheduler = Schedulers.newElastic("FLUX_DEFER", 10, true);

        Flux.defer(receiver::receive)
                .groupBy(m -> m.receiverOffset().topicPartition())
                .flatMap(partitionFlux ->
                        partitionFlux.publishOn(scheduler)
                                .concatMap(m -> {
                                    LOG.info("message received from kafka : " + "key : " + m.key()+ " partition: " + m.partition());
                                    return process(m.key(), m.value())
                                            .thenEmpty(m.receiverOffset().commit());
                                }))
                .retryBackoff(5, Duration.ofSeconds(2), Duration.ofHours(2))
                .doOnError(err -> {
                    handleError(err);
                }).retry()
                .doOnCancel(() -> close()).subscribe();

    }

    private void close() {
    }

    private void handleError(Throwable err) {
        LOG.error("kafka stream error : ",err);
    }

    private Mono<Void> process(String key, String value)
    {
        if(key.equals("error"))
            return Mono.error(new Exception("process error : "));

        LOG.error("message consumed : "+key);
        return Mono.empty();
    }


    public ReceiverOptions<String, String> receiverOptions(Collection<String> topics) {
        return consumerSettings
                .commitInterval(Duration.ZERO)
                .commitBatchSize(0)
                .addAssignListener(p -> LOG.info("Group {} partitions assigned {}", GROUP_ID, p))
                .addRevokeListener(p -> LOG.info("Group {} partitions assigned {}", GROUP_ID, p))
                .subscription(topics);
    }


}
    @Bean(name="consumerSettings")
    public ReceiverOptions<String, String> getConsumerSettings() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put("max.block.ms", "3000");
        props.put("request.timeout.ms", "3000");

        return ReceiverOptions.create(props);
    }

При получении каждого сообщения логика обработки возвращается к пустому моно, если обработанное сообщение успешно обработано.

Все работает, как и ожидалось, если в логике обработки не возвращена ошибка.

Но если я выдавал ошибку, имитирующую поведение исключения в моей логике обработки для конкретного сообщения, то я пропускалобработать то сообщение, которое вызвало исключение.Поток перемещается к следующему сообщению.

Что я хочу добиться, это обработать текущее сообщение и зафиксировать смещение, если оно успешно, а затем перейти к следующей записи.

Если при обработке возникнет исключениесообщение не фиксирует текущее смещение и повторяет одно и то же сообщение до его успешного завершения.Не переходите к следующему сообщению до тех пор, пока текущее сообщение не будет успешным.

Пожалуйста, дайте мне знать, как обрабатывать сбои процесса, не пропуская сообщение, и заставить поток начинаться со смещения, в которое выдается исключение.

С уважением,

Винот

Ответы [ 2 ]

0 голосов
/ 10 марта 2019

Код ниже работает для меня.Идея состоит в том, чтобы повторить неудачное сообщение настроенное количество раз, а если оно все равно не удалось, переместите его в очередь с ошибками и зафиксируйте сообщение.В то же время обрабатывайте сообщения из других разделов одновременно.

Если сообщение из определенного раздела терпит неудачу заданное количество раз, то перезапустите поток после задержки, чтобы мы могли обрабатывать сбои зависимостей, не прерывая их непрерывно.

@Autowired
public ReactiveMessageConsumer(@Qualifier("consumerSettings") ReceiverOptions consumerSettings,MessageProducer producer)
{
    this.consumerSettings=consumerSettings;
    this.fraudCheckService=fraudCheckService;
    this.producer=producer;
    consumerMessage();
}

private void consumerMessage() {

    int numRetries=3;

    Scheduler scheduler = Schedulers.newElastic("FLUX_DEFER", 10, true);

    KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions(Collections.singleton(TOPIC)));

    Flux<GroupedFlux<TopicPartition, ReceiverRecord<String, String>>> f = Flux.defer(receiver::receive)
            .groupBy(m -> m.receiverOffset().topicPartition());

    Flux f1 = f.publishOn(scheduler).flatMap(r -> r.publishOn(scheduler).concatMap(b ->
            Flux.just(b)
                    .concatMap(a -> {
                        LOG.error("processing message - order: {} offset: {} partition: {}",a.key(),a.receiverOffset().offset(),a.receiverOffset().topicPartition().partition());

                        return process(a.key(), a.value()).
                                then(a.receiverOffset().commit())
                                .doOnSuccess(d -> LOG.info("committing  order {}: offset: {} partition: {} ",a.key(),a.receiverOffset().offset(),a.receiverOffset().topicPartition().partition()))
                                .doOnError(d -> LOG.info("committing offset failed for order {}: offset: {} partition: {} ",a.key(),a.receiverOffset().offset(),a.receiverOffset().topicPartition().partition()));
                    })
                    .retryWhen(companion -> companion
                            .doOnNext(s -> LOG.info(" --> Exception processing message for order {}: offset: {} partition: {} message: {} " , b.key() , b.receiverOffset().offset(),b.receiverOffset().topicPartition().partition(),s.getMessage()))
                            .zipWith(Flux.range(1, numRetries), (error, index) -> {
                                if (index < numRetries) {
                                    LOG.info(" --> Retying {} order: {} offset: {} partition: {} ", index, b.key(),b.receiverOffset().offset(),b.receiverOffset().topicPartition().partition());
                                    return index;
                                } else {
                                    LOG.info(" --> Retries Exhausted: {} - order: {} offset: {} partition: {}. Message moved to error queue. Commit and proceed to next", index, b.key(),b.receiverOffset().offset(),b.receiverOffset().topicPartition().partition());
                                    producer.sendMessages(ERROR_TOPIC,b.key(),b.value());
                                    b.receiverOffset().commit();
                                    //return index;
                                    throw Exceptions.propagate(error);
                                }
                            })
                            .flatMap(index -> Mono.delay(Duration.ofSeconds((long) Math.pow(1.5, index - 1) * 3)))
                            .doOnNext(s -> LOG.info(" --> Retried at: {} ", LocalTime.now()))
                    ))
    );

    f1.doOnError(a ->  {
                LOG.info("Moving to next message because of : ", a);
                try {

                    Thread.sleep(5000); // configurable
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

    ).retry().subscribe(); 

}

public ReceiverOptions<String, String> receiverOptions(Collection<String> topics) {
    return consumerSettings
            .commitInterval(Duration.ZERO)
            .commitBatchSize(0)
            .addAssignListener(p -> LOG.info("Group {} partitions assigned {}", GROUP_ID, p))
            .addRevokeListener(p -> LOG.info("Group {} partitions assigned {}", GROUP_ID, p))
            .subscription(topics);
}

private Mono<Void> process(OrderId orderId, TraceId traceId)
{
    try {

        Thread.sleep(500); // simulate slow response
    } catch (InterruptedException e) {
        // Causes the restart
        e.printStackTrace();
    }

   if(orderId.getId().startsWith("error")) // simulate error scenario
        return Mono.error(new Exception("processing message failed for order: " + orderId.getId()));

    return Mono.empty();
}
0 голосов
/ 04 марта 2019

Создание разных групп потребителей.

Каждая группа потребителей будет связана с одной базой данных.

Создайте своего потребителя так, чтобы он обрабатывал только соответствующее событие и отправлял его в соответствующую базу данных.Если база данных не работает, настройте потребителя на , повторять бесконечное количество времени .По любой причине, если ваш потребитель умирает, убедитесь, что он начинается с того места, где раньше ушел потребитель.Существует небольшая вероятность того, что ваш потребитель умрет сразу после передачи данных в базу данных и отправки подтверждения брокеру kafka.Вам необходимо обновить потребительский код, чтобы обеспечить точную обработку сообщений (если это необходимо).

...