Как мне написать скрипт / код, чтобы запретить потребителю apache kafka принимать сообщения? - PullRequest
0 голосов
/ 13 января 2019

Мне нужно написать Java-код, чтобы пользователь kafka мог потреблять сообщения. После того, как потребитель запускается из командной строки. Но не уверен насчет стандартного способа помешать потребителю переработку.

В моей локальной машине Windows я написал простого автономного производителя и потребителя. Теперь я остановлю автономного потребителя от дальнейшей обработки с использованием другого кода / скрипта. `private static final String TOPIC =" conftest "; частная конечная статическая строка BOOTSTRAP_SERVERS = "Локальный: 9092";

private static Consumer<String, String> createConsumer() {
    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG,"test-consumer-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    // Create the consumer using props.
    consumer = new KafkaConsumer<>(props);
    // Subscribe to the topic.
    consumer.subscribe(Collections.singletonList(TOPIC));
    return consumer;
}

public static void main(String args[]) throws Exception {
    runConsumer();
}

private static void runConsumer(){
    Consumer<String, String> consumer = createConsumer();
    try{
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : consumerRecords) {
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s\n", record.partition(), record.offset(), record.key(), record.value()); 
            }
            consumer.commitAsync();
        }
    }catch(WakeupException wue){
        System.out.println("Wake Up Exception Occured");
        wue.printStackTrace();
    }
    catch(Exception e){
        e.printStackTrace();
    }finally {
        consumer.close();
    }
}`

1 Ответ

0 голосов
/ 13 января 2019

Используйте метод close, чтобы закрыть KafkaConsumer Просто позвоните consumer.close();

public void close ()

Закройте получателя, ожидая до времени ожидания по умолчанию 30 секунд для любой необходимой очистки. Если автоматическая фиксация включена, это будет фиксировать текущие смещения, если это возможно, в течение времени ожидания по умолчанию. Смотрите close (long, TimeUnit) для подробностей. Обратите внимание, что wakeup () нельзя использовать для прерывания закрытия.

закрытие публичной пустоты (длительное время ожидания, java.util.concurrent.TimeUnit timeUnit) с указанным временем

Пытается аккуратно закрыть потребителя в течение указанного времени ожидания. Этот метод ожидает истечения времени ожидания для потребителя, чтобы завершить ожидающие фиксации и покинуть группу. Если автоматическая фиксация включена, это будет фиксировать текущие смещения, если это возможно в течение тайм-аута. Если потребитель не может выполнить коммиты смещения и грациозно покинуть группу до истечения времени ожидания, он принудительно закрывается. Обратите внимание, что wakeup () нельзя использовать для прерывания закрытия.

...