Таким образом, мой вариант использования - использовать сообщения от Kafka в приложении Spring Webflux при программировании в реактивном стиле с использованием Project Reactor и выполнять неблокирующую операцию для каждого сообщения в том же порядке, в котором сообщения были получены от Кафка. Система также должна восстанавливаться самостоятельно.
Вот фрагмент кода, который настроен для использования из:
Flux<ReceiverRecord<Integer, DataDocument>> messages = Flux.defer(() -> {
KafkaReceiver<Integer, DataDocument> receiver = KafkaReceiver.create(options);
return receiver.receive();
});
messages.map(this::transformToOutputFormat)
.map(this::performAction)
.flatMapSequential(receiverRecordMono -> receiverRecordMono)
.doOnNext(record -> record.receiverOffset().acknowledge())
.doOnError(error -> logger.error("Error receiving record", error))
.retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
.subscribe();
Как видите, я делаю следующее: беру сообщение от Кафки, превращаю его в объект, предназначенный для нового пункта назначения, затем отправляю его в пункт назначения и затем подтверждаю смещение, чтобы пометить сообщение как использованное и обработанное , Крайне важно подтвердить смещение в том же порядке, что и сообщения, получаемые от Kafka, чтобы мы не перемещали смещение за пределы сообщений, которые не были полностью обработаны (включая отправку некоторых данных в место назначения). Поэтому я использую flatMapSequential
, чтобы гарантировать это.
Для простоты предположим, что метод transformToOutputFormat()
является тождественным преобразованием.
public ReceiverRecord<Integer, DataDocument> transformToOutputFormat(ReceiverRecord<Integer, DataDocument> record) {
return record;
}
Метод performAction()
должен что-то делать по сети, например, вызывать HTTP REST API. Таким образом, соответствующие API возвращают Mono, что означает, что на цепочку необходимо подписаться. Кроме того, мне нужно, чтобы ReceiverRecord
был возвращен этим методом, чтобы смещение могло быть подтверждено оператором flatMapSequential () выше. Поскольку мне нужно подписаться на Mono, я использую flatMapSequential
выше. Если нет, я мог бы использовать map
вместо.
public Mono<ReceiverRecord<Integer, DataDocument>> performAction(ReceiverRecord<Integer, DataDocument> record) {
return Mono.just(record)
.flatMap(receiverRecord ->
HttpClient.create()
.port(3000)
.get()
.uri("/makeCall?data=" + receiverRecord.value().getData())
.responseContent()
.aggregate()
.asString()
)
.retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
.then(Mono.just(record));
У меня есть две конфликтующие потребности в этом методе:
1. Подписаться на цепочку, которая делает HTTP-вызов
2. Верните ReceiverRecord
Использование flatMap () означает, что мой тип возврата меняется на Mono. Использование doOnNext () в том же месте сохранит ReceiverRecord в цепочке, но не позволит автоматически подписаться на ответ HttpClient.
Я не могу добавить .subscribe()
после asString()
, потому что я хочу дождаться, пока ответ HTTP будет полностью получен, прежде чем смещение будет подтверждено.
Я также не могу использовать .block()
, так как он работает в параллельном потоке.
В результате мне нужно обмануть и вернуть объект record
из области видимости метода.
Другое дело, что при повторной попытке внутри performAction
он переключает потоки. Поскольку flatMapSequential () охотно подписывается на каждый Mono во внешнем потоке, это означает, что хотя подтверждение смещений может быть гарантировано по порядку, мы не можем гарантировать, что HTTP-вызов в performAction
будет выполняться в том же порядке.
Итак, у меня два вопроса.
- Возможно ли вернуть
record
естественным путем, а не вернуть объект области метода?
- Можно ли гарантировать, что как HTTP-вызов, так и подтверждение смещения выполняются в том же порядке, что и сообщения, для которых выполняются эти операции?