Я написал код flink, который читает файл csv из папки и топит данные по теме kafka.
Вот моя работа на пороге:
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
org.apache.flink.core.fs.Path filePath = new
org.apache.flink.core.fs.Path(feedFileFolder);
RowCsvInputFormat format = new RowCsvInputFormat(filePath,
FetchTypeInformation.getTypeInformation());
DataStream<Row> inputStream = env.readFile(format, feedFileFolder,
FileProcessingMode.PROCESS_CONTINUOUSLY,
parseInt(folderLookupTime));
DataStream<String> speStream = inputStream.filter(new FilterFunction<Row>
().map(new MapFunction<Row, String>() {
@Override
public String map(Row row) {
...............
return resultingJsonString;
}
});
Properties props = Producer.getProducerConfig(propertiesFilePath);
speStream.addSink(new FlinkKafkaProducer011(kafkaTopicName, new
KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props,
FlinkKafkaProducer011.Semantic.EXACTLY_ONCE));
Выпуск
Когда я выполняю вышеуказанное задание Flink из Eclipse, оно работает абсолютно нормально.
Если я поместил файл из 10 записей, я смогу увидеть на пользовательском интерфейсе flink, что flink job поглощает 10 записей по теме kafka.
Name Bytes received Records received Records sent
Source: 0 B 0 1
Split Reader 1.12 KB 1 10
Sink: Unnamed 1.79 KB 10 0
Но когда я выполняю это (как jar) на сервере flink, задание не может получить данные по теме kafka. Интерфейс Flink выглядит следующим образом:
Name Bytes received Records received Records sent
Source: 0 B 0 1
Split Reader 616 B 1 0
Sink: Unnamed 450 B 0 0