Мне нужно написать Java-код, чтобы пользователь kafka мог потреблять сообщения. После того, как потребитель запускается из командной строки. Но не уверен насчет стандартного способа помешать потребителю переработку.
В моей локальной машине Windows я написал простого автономного производителя и потребителя. Теперь я остановлю автономного потребителя от дальнейшей обработки с использованием другого кода / скрипта.
`private static final String TOPIC =" conftest ";
частная конечная статическая строка BOOTSTRAP_SERVERS =
"Локальный: 9092";
private static Consumer<String, String> createConsumer() {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG,"test-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
// Create the consumer using props.
consumer = new KafkaConsumer<>(props);
// Subscribe to the topic.
consumer.subscribe(Collections.singletonList(TOPIC));
return consumer;
}
public static void main(String args[]) throws Exception {
runConsumer();
}
private static void runConsumer(){
Consumer<String, String> consumer = createConsumer();
try{
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
for (ConsumerRecord<String, String> record : consumerRecords) {
System.out.printf("partition = %d, offset = %d, key = %s, value = %s\n", record.partition(), record.offset(), record.key(), record.value());
}
consumer.commitAsync();
}
}catch(WakeupException wue){
System.out.println("Wake Up Exception Occured");
wue.printStackTrace();
}
catch(Exception e){
e.printStackTrace();
}finally {
consumer.close();
}
}`