Я использую Ubuntu 18 и использую Java (intellij IDE) и пишу базовое приложение для kafka. Я пытаюсь использовать базовый пример из здесь , и для потоковой передачи информации в приложение, а затем распечатать что-то на экран, я запускаю приложение с помощью команды intellij «run».
Когда я подключаю приложение к выходному потоку, оно работает нормально, и мне удается вывести информацию на терминал.
Я попытался добавить System.out.println()
в методах foreach, в методах применения, он не работает, я добавляю точки останова внутри них и запускаю режим отладки, и он не добирается, я думаю, что поток не добирается во время бежать.
Я передаю информацию в приложение, используя соответствующую тему, и распечатки, которые я помещаю в приложение вне приложения и foreach, работают нормально.
Как заставить приложение печатать что-либо каждый раз, когда я передаю на него информацию? основная идея состоит в том, чтобы что-то обрабатывать и выводить результаты на монитор, а не в поток kafka
Вот код:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.kstream.Printed;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class main {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
// Note: To re-run the demo, you need to use the offset reset tool:
// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("streams-plaintext-input");
source.foreach(new ForeachAction<String, String>() {
@Override
public void apply(String key, String value) {
System.out.println("yeah");
}
});
KTable<String, Long> counts = source
.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
}
})
.groupBy(new KeyValueMapper<String, String, String>() {
@Override
public String apply(String key, String value) {
System.out.println("what");
return value;
}
})
.count();
//System.exit(0);
// need to override value serde to Long type
System.out.println("what");
//counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(2);
}
System.exit(0);
}
}