Как распечатать данные темы кафки в программе Flink? - PullRequest
0 голосов
/ 05 ноября 2018

Я создал тему с помощью этой инструкции:

C:\kafka_2.12-0.10.2.1>.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test < C:\User11\Desktop\Data.csv

Тогда я проверил тему, правильно ли эти данные. После этого я хотел напечатать тему в программе Flink. Моя программа такова:

 try{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    DataStream<String> stream = env
            .addSource(new FlinkKafkaConsumer09<String>("test", new SimpleStringSchema(),properties));

           stream.print();
    env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }

Но я получил эту ИНФОРМАЦИЮ (потому что ИНФО слишком длинная, мне пришлось ее написать):

[main] INFO org.apache.flink.streaming.api.environment.LocalStreamEnvironment - Выполнение задания на локальном встроенном мини-кластере Flink [главная] ИНФОРМАЦИЯ org.apache.flink.runtime.minicluster.MiniCluster - запуск Flink Mini Cluster [main] ИНФОРМАЦИЯ org.apache.flink.runtime.minicluster.MiniCluster - Начальный реестр метрик [main] INFO org.apache.flink.runtime.metrics.MetricRegistryImpl - Репортер метрик не настроен, метрики не будут отображаться / сообщаться. [main] ИНФОРМАЦИЯ org.apache.flink.runtime.minicluster.MiniCluster - Запуск служб RPC [flink-akka.actor.default-dispatcher-2] ИНФОРМАЦИЯ akka.event.slf4j.Slf4jLogger - запущен Slf4jLogger [main] ИНФОРМАЦИЯ org.apache.flink.runtime.minicluster.MiniCluster - Запуск служб высокой доступности [main] INFO org.apache.flink.runtime.blob.BlobServer - Создан каталог хранилища BLOB-сервера C: \ Users \ user11 \ AppData \ Local \ Temp \ blobStore-a02ff126-35cc-4c1b-b300-8689d19ff5d2 [main] INFO org.apache.flink.runtime.blob.BlobServer - Запущен BLOB-сервер с 0.0.0.0:57907 - максимальное количество одновременных запросов: 50 - максимальное отставание: 1000

Кроме того, я тоже видел эту ссылку, и она не решила мою проблему: Как получить доступ / прочитать данные темы кафки из flink?

Скажите, пожалуйста, в чем проблема?

Спасибо.

1 Ответ

0 голосов
/ 23 июня 2019

Проблема решена. Сначала я заполнил тему Кафки этой командой:

/home/kafka_2.11-2.0.0/bin/kafka-console-producer.sh --broker-list 10.32.0.2:9092,10.32.0.3:9092,10.32.0.4:9092 --topic flinkTopic < transactions2.csv

Затем, используя этот код, я мог напечатать тему Кафки:

 final StreamExecutionEnvironment env = 
 StreamExecutionEnvironment.getExecutionEnvironment();
 Properties prop = new Properties();
 prop.setProperty("bootstrap.servers", 
 "10.32.0.2:9092,10.32.0.3:9092,10.32.0.4:9092");
 prop.setProperty("group.id", "test");
    FlinkKafkaConsumer<String> myConsumer= new FlinkKafkaConsumer<> 
  ("flinkTopic", new SimpleStringSchema(),prop);
    myConsumer.setStartFromEarliest();
    DataStream<String> stream = env.addSource(myConsumer);
    stream.print();
    env.execute("Flink Streaming Java API Skeleton");

Надеюсь, это было полезно для других.

...