Флинк не погружает данные в тему кафки - PullRequest
0 голосов
/ 30 августа 2018

Я написал код flink, который читает файл csv из папки и топит данные по теме kafka.

Вот моя работа на пороге:

final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

org.apache.flink.core.fs.Path filePath = new           
org.apache.flink.core.fs.Path(feedFileFolder);

RowCsvInputFormat format = new RowCsvInputFormat(filePath, 
FetchTypeInformation.getTypeInformation());


DataStream<Row> inputStream = env.readFile(format, feedFileFolder, 
FileProcessingMode.PROCESS_CONTINUOUSLY,
parseInt(folderLookupTime));


DataStream<String> speStream = inputStream.filter(new FilterFunction<Row> 
().map(new MapFunction<Row, String>() {
@Override
public String map(Row row) {

            ...............
            return resultingJsonString;
        }
    });

Properties props = Producer.getProducerConfig(propertiesFilePath);

speStream.addSink(new FlinkKafkaProducer011(kafkaTopicName, new
    KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props,
     FlinkKafkaProducer011.Semantic.EXACTLY_ONCE));

Выпуск

Когда я выполняю вышеуказанное задание Flink из Eclipse, оно работает абсолютно нормально. Если я поместил файл из 10 записей, я смогу увидеть на пользовательском интерфейсе flink, что flink job поглощает 10 записей по теме kafka.

Name           Bytes received   Records received    Records sent
Source:        0 B              0                   1       
Split Reader   1.12 KB          1                   10      
Sink: Unnamed  1.79 KB          10                  0   

Но когда я выполняю это (как jar) на сервере flink, задание не может получить данные по теме kafka. Интерфейс Flink выглядит следующим образом:

Name           Bytes received   Records received    Records sent
Source:        0 B              0                   1       
Split Reader   616 B            1                   0       
Sink: Unnamed  450 B            0                   0
...