Я создал тему с помощью этой инструкции:
C:\kafka_2.12-0.10.2.1>.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test < C:\User11\Desktop\Data.csv
Тогда я проверил тему, правильно ли эти данные. После этого я хотел напечатать тему в программе Flink. Моя программа такова:
try{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer09<String>("test", new SimpleStringSchema(),properties));
stream.print();
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
Но я получил эту ИНФОРМАЦИЮ (потому что ИНФО слишком длинная, мне пришлось ее написать):
[main] INFO org.apache.flink.streaming.api.environment.LocalStreamEnvironment - Выполнение задания на локальном встроенном мини-кластере Flink
[главная] ИНФОРМАЦИЯ org.apache.flink.runtime.minicluster.MiniCluster - запуск Flink Mini Cluster
[main] ИНФОРМАЦИЯ org.apache.flink.runtime.minicluster.MiniCluster - Начальный реестр метрик
[main] INFO org.apache.flink.runtime.metrics.MetricRegistryImpl - Репортер метрик не настроен, метрики не будут отображаться / сообщаться.
[main] ИНФОРМАЦИЯ org.apache.flink.runtime.minicluster.MiniCluster - Запуск служб RPC
[flink-akka.actor.default-dispatcher-2] ИНФОРМАЦИЯ akka.event.slf4j.Slf4jLogger - запущен Slf4jLogger
[main] ИНФОРМАЦИЯ org.apache.flink.runtime.minicluster.MiniCluster - Запуск служб высокой доступности
[main] INFO org.apache.flink.runtime.blob.BlobServer - Создан каталог хранилища BLOB-сервера C: \ Users \ user11 \ AppData \ Local \ Temp \ blobStore-a02ff126-35cc-4c1b-b300-8689d19ff5d2
[main] INFO org.apache.flink.runtime.blob.BlobServer - Запущен BLOB-сервер с 0.0.0.0:57907 - максимальное количество одновременных запросов: 50 - максимальное отставание: 1000
Кроме того, я тоже видел эту ссылку, и она не решила мою проблему:
Как получить доступ / прочитать данные темы кафки из flink?
Скажите, пожалуйста, в чем проблема?
Спасибо.