Как читать с конца темы, независимо от зафиксированного смещения группы - PullRequest
0 голосов
/ 15 мая 2019

Я использую следующие пакеты для приема сообщений kafka

compile 'org.springframework.boot:spring-boot-starter-webflux'
compile("org.springframework.boot:spring-boot-starter-web")
// tag::actuator[]
compile("org.springframework.boot:spring-boot-starter-actuator")
compile('org.springframework.kafka:spring-kafka:2.1.7.RELEASE')
compile 'io.projectreactor.kafka:reactor-kafka:1.0.0.RELEASE'

Я хочу получать сообщения из конца темы, независимо от зафиксированного смещения группы

При поиске я обнаружил, что мы можем использовать следующий код

consumer = new KafkaConsumer<>(properties);
consumer.seekToEnd(Collections.emptySet());

Но я не могу найти, как использовать приведенный выше код при весенней загрузке webflux

@Component
public class EventConsumer
{
    private final EmitterProcessor<ServerSentEvent<String>> emitter = EmitterProcessor.create();

    public Flux<ServerSentEvent<String>> get()
    {
        return emitter;
    }

    @KafkaListener(topics = "${kafka.zone.status.topic.name}")
    public void receive(String data)
    {
        //System.out.println(data);
        emitter.onNext(ServerSentEvent.builder(data).id(UUID.randomUUID().toString()).build());
    }
}

1 Ответ

0 голосов
/ 15 мая 2019

См. документацию .

Реализуйте ConsumerSeekAware и выполните поиск в методе onPartitionsAssigned.

@Component
public class EventConsumer implements ConsumerSeekAware {

    private final EmitterProcessor<ServerSentEvent<String>> emitter = EmitterProcessor.create();

    public Flux<ServerSentEvent<String>> get() {
        return emitter;
    }

    @KafkaListener(topics = "${kafka.zone.status.topic.name}")
    public void receive(String data) {
        // System.out.println(data);
        emitter.onNext(ServerSentEvent.builder(data).id(UUID.randomUUID().toString()).build());
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {

    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.keySet().forEach(tp -> callback.seekToEnd(tp.topic(), tp.partition()));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {

    }

}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...