Последовательный запуск неблокирующих операций при использовании потока, включая повторные попытки - PullRequest
0 голосов
/ 10 января 2019

Таким образом, мой вариант использования - использовать сообщения от Kafka в приложении Spring Webflux при программировании в реактивном стиле с использованием Project Reactor и выполнять неблокирующую операцию для каждого сообщения в том же порядке, в котором сообщения были получены от Кафка. Система также должна восстанавливаться самостоятельно.

Вот фрагмент кода, который настроен для использования из:

    Flux<ReceiverRecord<Integer, DataDocument>> messages = Flux.defer(() -> {
        KafkaReceiver<Integer, DataDocument> receiver = KafkaReceiver.create(options);
        return receiver.receive();
    });

    messages.map(this::transformToOutputFormat)
            .map(this::performAction)
            .flatMapSequential(receiverRecordMono -> receiverRecordMono)
            .doOnNext(record -> record.receiverOffset().acknowledge())
            .doOnError(error -> logger.error("Error receiving record", error))
            .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
            .subscribe();

Как видите, я делаю следующее: беру сообщение от Кафки, превращаю его в объект, предназначенный для нового пункта назначения, затем отправляю его в пункт назначения и затем подтверждаю смещение, чтобы пометить сообщение как использованное и обработанное , Крайне важно подтвердить смещение в том же порядке, что и сообщения, получаемые от Kafka, чтобы мы не перемещали смещение за пределы сообщений, которые не были полностью обработаны (включая отправку некоторых данных в место назначения). Поэтому я использую flatMapSequential, чтобы гарантировать это.

Для простоты предположим, что метод transformToOutputFormat() является тождественным преобразованием.

public ReceiverRecord<Integer, DataDocument> transformToOutputFormat(ReceiverRecord<Integer, DataDocument> record) {
    return record;
}

Метод performAction() должен что-то делать по сети, например, вызывать HTTP REST API. Таким образом, соответствующие API возвращают Mono, что означает, что на цепочку необходимо подписаться. Кроме того, мне нужно, чтобы ReceiverRecord был возвращен этим методом, чтобы смещение могло быть подтверждено оператором flatMapSequential () выше. Поскольку мне нужно подписаться на Mono, я использую flatMapSequential выше. Если нет, я мог бы использовать map вместо.

public Mono<ReceiverRecord<Integer, DataDocument>> performAction(ReceiverRecord<Integer, DataDocument> record) {
    return Mono.just(record)
            .flatMap(receiverRecord ->
                    HttpClient.create()
                            .port(3000)
                            .get()
                            .uri("/makeCall?data=" + receiverRecord.value().getData())
                            .responseContent()
                            .aggregate()
                            .asString()
            )
            .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
            .then(Mono.just(record));

У меня есть две конфликтующие потребности в этом методе: 1. Подписаться на цепочку, которая делает HTTP-вызов 2. Верните ReceiverRecord

Использование flatMap () означает, что мой тип возврата меняется на Mono. Использование doOnNext () в том же месте сохранит ReceiverRecord в цепочке, но не позволит автоматически подписаться на ответ HttpClient.

Я не могу добавить .subscribe() после asString(), потому что я хочу дождаться, пока ответ HTTP будет полностью получен, прежде чем смещение будет подтверждено.

Я также не могу использовать .block(), так как он работает в параллельном потоке.

В результате мне нужно обмануть и вернуть объект record из области видимости метода.

Другое дело, что при повторной попытке внутри performAction он переключает потоки. Поскольку flatMapSequential () охотно подписывается на каждый Mono во внешнем потоке, это означает, что хотя подтверждение смещений может быть гарантировано по порядку, мы не можем гарантировать, что HTTP-вызов в performAction будет выполняться в том же порядке.

Итак, у меня два вопроса.

  1. Возможно ли вернуть record естественным путем, а не вернуть объект области метода?
  2. Можно ли гарантировать, что как HTTP-вызов, так и подтверждение смещения выполняются в том же порядке, что и сообщения, для которых выполняются эти операции?

1 Ответ

0 голосов
/ 14 января 2019

Вот решение, которое я придумала.

Flux<ReceiverRecord<Integer, DataDocument>> messages = Flux.defer(() -> {
    KafkaReceiver<Integer, DataDocument> receiver = KafkaReceiver.create(options);
    return receiver.receive();
});

messages.map(this::transformToOutputFormat)
        .delayUntil(this::performAction)
        .doOnNext(record -> record.receiverOffset().acknowledge())
        .doOnError(error -> logger.error("Error receiving record", error))
        .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
        .subscribe();

Вместо того, чтобы использовать flatMapSequential для подписки на executeAction Mono и сохранения последовательности, вместо этого я отложил запрос большего количества сообщений от получателя Kafka, пока действие не будет выполнено. Это позволяет мне обрабатывать по одному.

В результате executeAction не нужно возвращать Mono of ReceiverRecord. Я также упростил это до следующего:

public Mono<String> performAction(ReceiverRecord<Integer, DataDocument> record) {
    HttpClient.create()
        .port(3000)
        .get()
        .uri("/makeCall?data=" + receiverRecord.value().getData())
        .responseContent()
        .aggregate()
        .asString()
        .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5));
}
...