Я использую Reactor Kafka , чтобы и потреблять, и генерировать события Кафки. В случае событий, связанных с потреблением, мой потребитель работает медленно, и поэтому мне нужно справиться с противодавлением.
Тем не менее, я чувствую, что независимо от того, что я называю Subscription.request (), издатель немедленно публикует все события из этой темы, что подавляет потребителя.
Я использую пользовательского подписчика, задающего небольшое количество начальных запросов, вызывая Subscription.request (), когда я подписываюсь на KafkaReceiver.receive () для этого. Насколько я понимаю, именно так я сообщаю издателю, сколько событий первоначально хочет получить мой потребитель.
Мой подписчик:
public class KafkaEventSubscriber extends BaseSubscriber {
private final int numberOfItemsToRequestOnSubscribe;
private final int numberOfItemsToRequestOnNext;
public KafkaEventSubscriber(int numberOfItemsToRequestOnSubscribe,
int numberOfItemsToRequestOnNext) {
this.numberOfItemsToRequestOnSubscribe = numberOfItemsToRequestOnSubscribe;
this.numberOfItemsToRequestOnNext = numberOfItemsToRequestOnNext;
}
@Override
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(numberOfItemsToRequestOnSubscribe);
}
@Override
protected void hookOnNext(EnrichedMetadata value) {
request(numberOfItemsToRequestOnNext);
}
}
Как пользоваться абонентом:
kafkaReceiver.receive().map(ReceiverRecord::value).map(KafkaConsumer::acknowledge).subscribe(new KafkaEventSubscriber(10, 1));
Я ожидаю, что KafkaReceiver выведет 10 событий до того, как будет выполнен любой вызов метода подписчиков onNext (), но KafkaReceiver выведет все события, которые еще не получены ACK: ed из темы.
Мне кажется, что независимо от того, что мы называем Subscription.request (), издатель немедленно опубликует все события из этой темы, не принимая во внимание меры противодавления, которые я предпринимал.